jq Data Engineering Reference
Overview
jq is the core tool for security data engineering - filtering, transforming, and enriching JSON before SIEM ingestion. These patterns apply to:
-
Monad transforms (QRadar → Sentinel migration)
-
kubectl JSON processing (k8s observability)
-
netapi ISE/Wazuh (security telemetry)
-
API responses (any JSON source)
Core Patterns
Filter (select)
Keep only what matters - reduce SIEM ingestion costs.
# Filter failures only (critical → Sentinel)
jq '.[] | select(.passed == false)'
# Filter by severity (high/critical → Sentinel, rest → local)
jq '.[] | select(.severity >= 7)'
# Filter non-Running pods
jq '.items[] | select(.status.phase != "Running")'
# Multiple conditions (AND)
jq '.[] | select(.severity >= 5 and .category == "auth")'
# Multiple conditions (OR)
jq '.[] | select(.type == "login_failure" or .type == "privilege_escalation")'
Transform (reshape)
Normalize data to common schema (OCSF, CEF, etc.)
# Reshape to SIEM-friendly format
jq '.[] | {
timestamp: .acs_timestamp,
event_type: "radius_auth",
outcome: (if .passed then "success" else "failure" end),
user: .user_name,
source_ip: .framed_ip_address
}'
# Rename fields (source → target schema)
jq '.[] | {
"@timestamp": .timestamp,
"event.category": "authentication",
"event.outcome": .result,
"user.name": .username
}'
# Add static enrichment
jq '.[] | . + {pipeline: "monad", environment: "production"}'
Aggregate (group_by, count)
Summarize for dashboards and alerts.
# Count by field
jq '[.[].namespace] | group_by(.) | map({key: .[0], count: length})'
# Count by multiple fields
jq 'group_by(.user, .action) | map({user: .[0].user, action: .[0].action, count: length})'
# Top N (most frequent)
jq '[.[].source_ip] | group_by(.) | map({ip: .[0], count: length}) | sort_by(-.count) | .[0:10]'
Enrich (add context)
Add context before SIEM ingestion.
# Add severity label
jq '.[] | . + {
severity_label: (
if .severity >= 9 then "critical"
elif .severity >= 7 then "high"
elif .severity >= 4 then "medium"
else "low"
end
)
}'
# Add geo context (from lookup)
jq --slurpfile geo /tmp/ip-geo.json '
.[] | . + {country: ($geo[0][.source_ip] // "unknown")}
'
kubectl Patterns
Pod Analysis
# Get pods as JSON
kubectl get pods -A -o json > /tmp/pods.json
# Find non-Running pods
jq '.items[] | select(.status.phase != "Running" and .status.phase != "Succeeded") | {
namespace: .metadata.namespace,
name: .metadata.name,
phase: .status.phase
}' /tmp/pods.json
# Count by namespace
jq '[.items[].metadata.namespace] | group_by(.) | map({namespace: .[0], count: length})' /tmp/pods.json
# Find crashlooping
jq '.items[] | select(.status.containerStatuses[]?.restartCount > 5) | {
name: .metadata.name,
restarts: .status.containerStatuses[0].restartCount
}' /tmp/pods.json
# Deep status inspection (container level)
jq '.items[] | select(.metadata.name == "POD_NAME") | {
phase: .status.phase,
conditions: .status.conditions,
containerStatuses: .status.containerStatuses
}' /tmp/pods.json
Service Analysis
# Get services as JSON
kubectl get svc -A -o json > /tmp/svc.json
# Find LoadBalancer services with external IPs
jq '.items[] | select(.spec.type == "LoadBalancer") | {
namespace: .metadata.namespace,
name: .metadata.name,
externalIP: .status.loadBalancer.ingress[0].ip,
ports: [.spec.ports[] | "\(.port)/\(.protocol)"]
}' /tmp/svc.json
# Find services without endpoints
jq '.items[] | select(.spec.clusterIP != "None") | {
name: .metadata.name,
selector: .spec.selector
}' /tmp/svc.json
netapi ISE Patterns
Authentication Analysis
# Get sessions as JSON
netapi ise --format json mnt sessions > /tmp/sessions.json
# Filter failures
jq '.[] | select(.passed == false) | {
time: .acs_timestamp,
user: .user_name,
mac: .calling_station_id,
reason: .failure_reason
}' /tmp/sessions.json
# Transform for SIEM
jq '.[] | {
"@timestamp": .acs_timestamp,
"event.category": "authentication",
"event.outcome": (if .passed then "success" else "failure" end),
"user.name": .user_name,
"source.mac": .calling_station_id,
"network.vlan.id": .selected_azn_profiles
}' /tmp/sessions.json
# Top failed users
jq '[.[] | select(.passed == false) | .user_name] | group_by(.) | map({user: .[0], failures: length}) | sort_by(-.failures) | .[0:10]' /tmp/sessions.json
Policy Analysis
# Get policy sets
netapi ise --format json get-policy-sets > /tmp/policies.json
# Active policies with hit counts
jq '.response[] | select(.state == "enabled") | {
name: .name,
hitCounts: .hitCounts,
rank: .rank
}' /tmp/policies.json
Monad-Style Pipeline
Simulate Monad transform pipeline locally:
# Step 1: Fetch raw data
netapi ise --format json mnt sessions > /tmp/raw.json
# Step 2: Filter (critical only → Sentinel)
jq '[.[] | select(.passed == false or .selected_azn_profiles | contains("Admin"))]' /tmp/raw.json > /tmp/critical.json
# Step 3: Transform to OCSF/CEF
jq '.[] | {
class_uid: 3002,
class_name: "Authentication",
activity_id: (if .passed then 1 else 2 end),
activity_name: (if .passed then "Logon" else "Logon Failed" end),
time: .acs_timestamp,
user: {name: .user_name},
src_endpoint: {mac: .calling_station_id},
dst_endpoint: {ip: .nas_ip_address},
severity_id: (if .passed then 1 else 4 end)
}' /tmp/critical.json > /tmp/ocsf.json
# Step 4: Count for metrics
jq '{
total: length,
failures: [.[] | select(.activity_id == 2)] | length,
success_rate: (([.[] | select(.activity_id == 1)] | length) / length * 100)
}' /tmp/ocsf.json
Quick Reference
| Operation | Pattern | Example |
|---|---|---|
Filter |
|
|
Project |
|
|
Transform |
|
|
Add field |
|
|
Group |
|
|
Count |
|
|
Sort |
|
|
Top N |
|
|
Flatten |
|
|
Null handling |
|
|
Gotchas
| Issue | Solution |
|---|---|
Empty output |
Check if array vs object: use |
null errors |
Use |
Parse errors |
Validate JSON first: |
Shell pollution |
Filter before jq: |
Large files |
Stream with |
Related
-
jq Favorites - Quick commands
-
kubectl Favorites - k8s patterns
-
API jq Transforms - API-specific patterns