jq Data Engineering Reference

Overview

jq is the core tool for security data engineering - filtering, transforming, and enriching JSON before SIEM ingestion. These patterns apply to:

  • Monad transforms (QRadar → Sentinel migration)

  • kubectl JSON processing (k8s observability)

  • netapi ISE/Wazuh (security telemetry)

  • API responses (any JSON source)

Core Patterns

Filter (select)

Keep only what matters - reduce SIEM ingestion costs.

# Filter failures only (critical → Sentinel)
jq '.[] | select(.passed == false)'

# Filter by severity (high/critical → Sentinel, rest → local)
jq '.[] | select(.severity >= 7)'

# Filter non-Running pods
jq '.items[] | select(.status.phase != "Running")'

# Multiple conditions (AND)
jq '.[] | select(.severity >= 5 and .category == "auth")'

# Multiple conditions (OR)
jq '.[] | select(.type == "login_failure" or .type == "privilege_escalation")'
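
These filters can be exercised without any source system; a minimal sketch on inline sample events (the records and field values are invented):

```shell
# Two fabricated events piped straight into jq; only the high-severity one survives
echo '[{"type":"login_failure","severity":8},
       {"type":"heartbeat","severity":1}]' |
  jq -c '.[] | select(.severity >= 7)'
# → {"type":"login_failure","severity":8}
```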

Transform (reshape)

Normalize data to a common schema (OCSF, CEF, etc.).

# Reshape to SIEM-friendly format
jq '.[] | {
  timestamp: .acs_timestamp,
  event_type: "radius_auth",
  outcome: (if .passed then "success" else "failure" end),
  user: .user_name,
  source_ip: .framed_ip_address
}'

# Rename fields (source → target schema)
jq '.[] | {
  "@timestamp": .timestamp,
  "event.category": "authentication",
  "event.outcome": .result,
  "user.name": .username
}'

# Add static enrichment
jq '.[] | . + {pipeline: "monad", environment: "production"}'
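
A quick self-contained check of the reshape pattern, on one fabricated record (field names mirror the ISE examples above but the values are made up):

```shell
# Reshape a single sample record; jq preserves the key order you write
echo '{"acs_timestamp":"2024-01-01T00:00:00Z","passed":false,"user_name":"alice"}' |
  jq -c '{time: .acs_timestamp,
          outcome: (if .passed then "success" else "failure" end),
          user: .user_name}'
# → {"time":"2024-01-01T00:00:00Z","outcome":"failure","user":"alice"}
```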

Aggregate (group_by, count)

Summarize for dashboards and alerts.

# Count by field
jq '[.[].namespace] | group_by(.) | map({key: .[0], count: length})'

# Count by multiple fields
jq 'group_by(.user, .action) | map({user: .[0].user, action: .[0].action, count: length})'

# Top N (most frequent)
jq '[.[].source_ip] | group_by(.) | map({ip: .[0], count: length}) | sort_by(-.count) | .[0:10]'
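
The count-by-field idiom can be verified inline (sample values are invented). Note that group_by(.) sorts groups by key, so sort_by(-.count) is what puts the most frequent value first:

```shell
# Count occurrences of each value, most frequent first
echo '["auth","dns","auth","auth"]' |
  jq -c 'group_by(.) | map({key: .[0], count: length}) | sort_by(-.count)'
# → [{"key":"auth","count":3},{"key":"dns","count":1}]
```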

Enrich (add context)

Add context before SIEM ingestion.

# Add severity label
jq '.[] | . + {
  severity_label: (
    if .severity >= 9 then "critical"
    elif .severity >= 7 then "high"
    elif .severity >= 4 then "medium"
    else "low"
    end
  )
}'

# Add geo context (from lookup)
jq --slurpfile geo /tmp/ip-geo.json '
  .[] | . + {country: ($geo[0][.source_ip] // "unknown")}
'
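
A runnable sketch of the lookup-file enrichment, writing a throwaway /tmp/ip-geo.json on the spot (the IPs and countries are made up). --slurpfile wraps the file contents in an array, hence the $geo[0] indexing:

```shell
# Hypothetical lookup table: source IP → country code
echo '{"10.0.0.5":"DE"}' > /tmp/ip-geo.json

# Enrich each event; unknown IPs fall back via //
echo '[{"source_ip":"10.0.0.5"},{"source_ip":"10.0.0.9"}]' |
  jq -c --slurpfile geo /tmp/ip-geo.json \
    '.[] | . + {country: ($geo[0][.source_ip] // "unknown")}'
# → {"source_ip":"10.0.0.5","country":"DE"}
# → {"source_ip":"10.0.0.9","country":"unknown"}
```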

kubectl Patterns

Pod Analysis

# Get pods as JSON
kubectl get pods -A -o json > /tmp/pods.json

# Find non-Running pods
jq '.items[] | select(.status.phase != "Running" and .status.phase != "Succeeded") | {
  namespace: .metadata.namespace,
  name: .metadata.name,
  phase: .status.phase
}' /tmp/pods.json

# Count by namespace
jq '[.items[].metadata.namespace] | group_by(.) | map({namespace: .[0], count: length})' /tmp/pods.json

# Find crashlooping pods (any() avoids emitting a pod once per matching container,
# and max reports the worst offender rather than always container [0])
jq '.items[] | select(any(.status.containerStatuses[]?; .restartCount > 5)) | {
  name: .metadata.name,
  restarts: ([.status.containerStatuses[].restartCount] | max)
}' /tmp/pods.json

# Deep status inspection (container level)
jq '.items[] | select(.metadata.name == "POD_NAME") | {
  phase: .status.phase,
  conditions: .status.conditions,
  containerStatuses: .status.containerStatuses
}' /tmp/pods.json

Service Analysis

# Get services as JSON
kubectl get svc -A -o json > /tmp/svc.json

# Find LoadBalancer services with external IPs
jq '.items[] | select(.spec.type == "LoadBalancer") | {
  namespace: .metadata.namespace,
  name: .metadata.name,
  externalIP: .status.loadBalancer.ingress[0].ip,
  ports: [.spec.ports[] | "\(.port)/\(.protocol)"]
}' /tmp/svc.json

# List non-headless services with their selectors (jq alone can't join against
# endpoints; cross-check with kubectl get endpoints to find services with no backing pods)
jq '.items[] | select(.spec.clusterIP != "None") | {
  name: .metadata.name,
  selector: .spec.selector
}' /tmp/svc.json

netapi ISE Patterns

Authentication Analysis

# Get sessions as JSON
netapi ise --format json mnt sessions > /tmp/sessions.json

# Filter failures
jq '.[] | select(.passed == false) | {
  time: .acs_timestamp,
  user: .user_name,
  mac: .calling_station_id,
  reason: .failure_reason
}' /tmp/sessions.json

# Transform for SIEM
jq '.[] | {
  "@timestamp": .acs_timestamp,
  "event.category": "authentication",
  "event.outcome": (if .passed then "success" else "failure" end),
  "user.name": .user_name,
  "source.mac": .calling_station_id,
  "network.vlan.id": .selected_azn_profiles
}' /tmp/sessions.json

# Top failed users
jq '[.[] | select(.passed == false) | .user_name] | group_by(.) | map({user: .[0], failures: length}) | sort_by(-.failures) | .[0:10]' /tmp/sessions.json

Policy Analysis

# Get policy sets
netapi ise --format json get-policy-sets > /tmp/policies.json

# Active policies with hit counts
jq '.response[] | select(.state == "enabled") | {
  name: .name,
  hitCounts: .hitCounts,
  rank: .rank
}' /tmp/policies.json

Monad-Style Pipeline

Simulate a Monad transform pipeline locally:

# Step 1: Fetch raw data
netapi ise --format json mnt sessions > /tmp/raw.json

# Step 2: Filter (critical only → Sentinel)
# (parentheses required: | binds looser than or, so without them the
# contains() would be applied to the whole or-expression)
jq '[.[] | select(.passed == false or (.selected_azn_profiles | contains("Admin")))]' /tmp/raw.json > /tmp/critical.json

# Step 3: Transform to OCSF/CEF
jq '.[] | {
  class_uid: 3002,
  class_name: "Authentication",
  activity_id: (if .passed then 1 else 2 end),
  activity_name: (if .passed then "Logon" else "Logon Failed" end),
  time: .acs_timestamp,
  user: {name: .user_name},
  src_endpoint: {mac: .calling_station_id},
  dst_endpoint: {ip: .nas_ip_address},
  severity_id: (if .passed then 1 else 4 end)
}' /tmp/critical.json > /tmp/ocsf.json

# Step 4: Count for metrics
jq '{
  total: length,
  failures: [.[] | select(.activity_id == 2)] | length,
  success_rate: (if length > 0
                 then ([.[] | select(.activity_id == 1)] | length) / length * 100
                 else 0 end)
}' /tmp/ocsf.json
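
Without ISE access, steps 2–3 can still be exercised end to end on fabricated sessions (every field value below is invented):

```shell
# Step 1 stand-in: fabricated sessions instead of netapi
echo '[{"passed":false,"user_name":"bob","selected_azn_profiles":"Employee"},
       {"passed":true,"user_name":"eve","selected_azn_profiles":"Admin"}]' > /tmp/raw.json

# Steps 2-3 combined: filter critical records, then map to OCSF activity IDs
jq -c '[.[] | select(.passed == false or (.selected_azn_profiles | contains("Admin")))]
       | .[] | {activity_id: (if .passed then 1 else 2 end), user: .user_name}' /tmp/raw.json
# → {"activity_id":2,"user":"bob"}
# → {"activity_id":1,"user":"eve"}
```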

Quick Reference

Operation      Pattern                     Example
Filter         select(.field == value)     select(.passed == false)
Project        {new: .old}                 {user: .user_name, ip: .ip}
Transform      if-then-else                if .passed then "ok" else "fail" end
Add field      . + {field: value}          . + {pipeline: "monad"}
Group          group_by(.field)            group_by(.user_name)
Count          length                      [.[] | select(…)] | length
Sort           sort_by(.field)             sort_by(-.count) (descending)
Top N          sort_by(-.field) | .[0:N]   sort_by(-.count) | .[0:10]
Flatten        [.items[].field]            [.items[].metadata.name]
Null handling  // default                  .field // "unknown"

Gotchas

Issue            Solution
Empty output     Check array vs object: use .[] for arrays, .field for objects
null errors      Use // "default" or select(. != null)
Parse errors     Validate JSON first: jq empty file.json
Shell pollution  Filter before jq: cmd | grep -v "noise" | jq …
Large files      Stream with --stream, or test on a head sample first
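
The parse-error check in practice: jq empty reads the input, prints nothing, and signals validity purely through its exit status, which makes it a cheap pre-flight step in pipelines:

```shell
# Exit status 0 means the file parsed cleanly; malformed JSON exits non-zero
echo '{"ok": true}' > /tmp/sample.json
jq empty /tmp/sample.json && echo "valid JSON"
# → valid JSON
```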