jq Transforms

The jq transform is Monad’s most powerful tool: it applies a jq expression to every record, giving you full JSON transformation. Use it for complex routing, OCSF normalization, and selective filtering.

jq Transform Syntax

{
  "operation": "jq",
  "arguments": {
    "query": "<jq expression>"
  }
}

The expression receives each log record as input (.) and must output the modified record as JSON, or nothing at all (empty) to drop the record.
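To make the input/output contract concrete, here is a minimal sketch that runs a query against one record with the standalone jq CLI and then wraps the same query in the config shape shown above. The sample record and the `query` variable are illustrative assumptions, and building the config with `jq -n --arg` is only one convenient way to get the quotes inside the query escaped correctly for JSON.

```shell
# Hypothetical query and sample record, for illustration only
query='. + {"destination": "sentinel"}'
record='{"severity": "critical", "src_ip": "10.0.0.5"}'

# The record is the input (.); the output is the modified record
output=$(printf '%s' "$record" | jq -c "$query")
echo "$output"

# Wrap the query in the transform config; --arg takes care of JSON string escaping
config=$(jq -n -c --arg query "$query" '{operation: "jq", arguments: {query: $query}}')
echo "$config"
```

The second command is useful mainly because jq expressions are full of double quotes, which are easy to get wrong when pasted by hand into a JSON string.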

Basic Patterns

Add Field

. + {"destination": "sentinel"}

Add Multiple Fields

. + {
  "destination": "sentinel",
  "processed_at": (now | todate),
  "pipeline": "ise-radius"
}

Conditional Field

if .severity == "critical" then
  . + {"route": "sentinel"}
else
  . + {"route": "s3"}
end

Remove Field

del(.password)

Rename Field

.source_ip = .src_ip | del(.src_ip)
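One caveat with the rename above: when src_ip is absent, the assignment still creates source_ip with a null value. The following is a minimal sketch of a guarded rename, assuming the standalone jq CLI is available locally; the sample records are made up for illustration.

```shell
# Hypothetical sample records: the second has no src_ip field
records='{"src_ip": "10.0.0.5"}
{"user": "jdoe"}'

# Rename only when the field exists; otherwise pass the record through unchanged
renamed=$(printf '%s\n' "$records" | jq -c '
  if has("src_ip") then .source_ip = .src_ip | del(.src_ip) else . end
')
echo "$renamed"
```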

Routing Patterns

Route by Severity

if (.severity == "critical" or .severity == "high") then
  . + {"route": "sentinel"}
else
  . + {"route": "s3_archive"}
end

Route by Event Type

if .event_type == "auth_failure" or
   .event_type == "intrusion_detected" or
   .event_type == "policy_violation" then
  . + {"route": "sentinel", "priority": "high"}
else
  . + {"route": "s3_archive", "priority": "low"}
end

Route by Multiple Conditions

# Complex routing: severity OR event type OR source
if (
  (.severity | IN("critical", "high", "emergency")) or
  (.event_type | IN("auth_failure", "intrusion", "malware")) or
  ((.source // "") | test("tacacs|admin|config"; "i"))
) then
  . + {"route": "sentinel"}
else
  . + {"route": "s3"}
end
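A quick way to check a multi-condition rule like this is to feed it a few records that each exercise one branch. The sketch below assumes the standalone jq CLI (1.6 or later, for IN) and uses invented sample records; it defaults a missing .source to an empty string because test raises an error on null input.

```shell
# Hypothetical sample records, one JSON object per line
records='{"severity": "low", "event_type": "flow", "source": "edge-fw"}
{"severity": "low", "event_type": "flow", "source": "TACACS-server"}
{"severity": "high", "event_type": "flow"}'

# Apply the routing rule, then print only the chosen route for each record
routes=$(printf '%s\n' "$records" | jq -r '
  if (
    (.severity | IN("critical", "high", "emergency")) or
    (.event_type | IN("auth_failure", "intrusion", "malware")) or
    ((.source // "") | test("tacacs|admin|config"; "i"))
  ) then
    . + {"route": "sentinel"}
  else
    . + {"route": "s3"}
  end | .route
')
echo "$routes"
```

The first record matches no condition, the second matches only the case-insensitive source pattern, and the third matches on severity before the source check is ever reached.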

MITRE ATT&CK Tagging

Single Technique

if .event_type == "auth_failure" then
  . + {"mitre_technique": "T1078", "mitre_tactic": "initial_access"}
else
  .
end

Technique Mapping

# Map event types to MITRE techniques
(
  {
    "auth_failure": {"technique": "T1078", "name": "Valid Accounts"},
    "brute_force": {"technique": "T1110", "name": "Brute Force"},
    "command_exec": {"technique": "T1059", "name": "Command and Scripting Interpreter"},
    "lateral_movement": {"technique": "T1021", "name": "Remote Services"},
    "exfiltration": {"technique": "T1041", "name": "Exfiltration Over C2 Channel"},
    "privilege_escalation": {"technique": "T1068", "name": "Exploitation for Privilege Escalation"}
  }[.event_type] // null
) as $mitre |
if $mitre then
  . + {"mitre": $mitre}
else
  .
end
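The lookup-table approach can be exercised locally with one mapped and one unmapped event type. This is a sketch under assumptions: the two-entry table and the sample records are trimmed-down illustrations, not the full mapping.

```shell
# Hypothetical sample records: one mapped event type, one unmapped
records='{"event_type": "brute_force"}
{"event_type": "dns_query"}'

# Look up the technique; records with no mapping pass through untagged
tagged=$(printf '%s\n' "$records" | jq -r '
  ({
    "brute_force": {"technique": "T1110", "name": "Brute Force"},
    "lateral_movement": {"technique": "T1021", "name": "Remote Services"}
  }[.event_type] // null) as $mitre |
  (if $mitre then . + {"mitre": $mitre} else . end) |
  "\(.event_type): \(.mitre.technique // "untagged")"
')
echo "$tagged"
```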

Full MITRE Enrichment

# Comprehensive MITRE tagging for ISE/FTD logs
def mitre_tag:
  if .event_type == "auth_failure" then
    {"id": "T1078", "tactic": "initial_access", "name": "Valid Accounts"}
  elif .event_type == "brute_force" then
    {"id": "T1110", "tactic": "credential_access", "name": "Brute Force"}
  elif .event_type == "posture_failure" then
    {"id": "T1190", "tactic": "initial_access", "name": "Exploit Public-Facing Application"}
  elif .action == "deny" and .threat_signature then
    {"id": "T1071", "tactic": "command_and_control", "name": "Application Layer Protocol"}
  elif .event_type == "config_change" then
    {"id": "T1059", "tactic": "execution", "name": "Command and Scripting Interpreter"}
  else
    null
  end;

mitre_tag as $m |
if $m then . + {"mitre": $m} else . end

ISE-Specific Patterns

Categorize ISE RADIUS Events

# ISE RADIUS event categorization
if .ise_auth_result == "FAILED" then
  . + {
    "category": "auth_failure",
    "route": "sentinel",
    "mitre_technique": "T1078"
  }
elif .ise_auth_result == "PASSED" and .ise_posture_status == "NonCompliant" then
  . + {
    "category": "posture_failure",
    "route": "sentinel",
    "mitre_technique": "T1190"
  }
elif .ise_auth_result == "PASSED" then
  . + {
    "category": "auth_success",
    "route": "s3_archive"
  }
else
  . + {"route": "s3_archive"}
end

Parse ISE Syslog Message

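# Extract fields from the ISE syslog message.
# capture emits nothing when the pattern does not match, which would drop the
# whole record, so each lookup falls back to an empty object.
((.message // "") | capture("User-Name=(?<username>[^,]+)") // {}) as $user |
((.message // "") | capture("Calling-Station-ID=(?<mac>[^,]+)") // {}) as $mac |
((.message // "") | capture("Framed-IP-Address=(?<ip>[^,]+)") // {}) as $ip |
. + {
  "user": ($user.username // null),
  "endpoint_mac": ($mac.mac // null),
  "endpoint_ip": ($ip.ip // null)
}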
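Field extraction is worth testing against a message that lacks one of the attributes, because capture produces no output at all on a non-match. The sketch below assumes the standalone jq CLI; the message strings are invented and only loosely shaped like ISE output, and the capture group name v is arbitrary.

```shell
# Hypothetical messages: the second has no Framed-IP-Address attribute
records='{"message": "User-Name=jdoe, Calling-Station-ID=AA-BB-CC-DD-EE-FF, Framed-IP-Address=10.1.2.3, NAS-Port=1"}
{"message": "User-Name=asmith, Calling-Station-ID=11-22-33-44-55-66"}'

# "capture(...).v // null" yields null instead of nothing when the pattern is absent
parsed=$(printf '%s\n' "$records" | jq -c '
  (.message // "") as $msg |
  {
    "user": (($msg | capture("User-Name=(?<v>[^,]+)").v) // null),
    "endpoint_mac": (($msg | capture("Calling-Station-ID=(?<v>[^,]+)").v) // null),
    "endpoint_ip": (($msg | capture("Framed-IP-Address=(?<v>[^,]+)").v) // null)
  }
')
echo "$parsed"
```

Both records survive; the second simply carries a null endpoint_ip.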

FTD/Firewall Patterns

Categorize FTD Events

# FTD firewall event routing
if .action == "deny" then
  if .threat_signature then
    . + {"category": "threat", "route": "sentinel", "mitre_technique": "T1071"}
  else
    . + {"category": "block", "route": "sentinel"}
  end
elif .action == "intrusion_detected" then
  . + {"category": "ids_alert", "route": "sentinel", "mitre_technique": "T1190"}
elif .action == "malware_detected" then
  . + {"category": "malware", "route": "sentinel", "mitre_technique": "T1204"}
elif .action == "permit" or .action == "allow" then
  . + {"category": "flow", "route": "s3_archive"}
else
  . + {"route": "s3_archive"}
end

Extract Firewall Fields

# Normalize firewall log to OCSF-like schema
{
  "source": {
    "ip": .src_ip,
    "port": .src_port,
    "zone": .src_zone
  },
  "destination": {
    "ip": .dst_ip,
    "port": .dst_port,
    "zone": .dst_zone
  },
  "action": .action,
  "protocol": .protocol,
  "rule": .rule_name,
  "bytes": (.bytes_in + .bytes_out),
  "timestamp": .timestamp,
  "observer": {
    "hostname": .firewall_name,
    "type": "firewall"
  }
} + (if .threat_signature then {"threat": {"signature": .threat_signature}} else {} end)

Volume Reduction Patterns

Drop Info-Level Logs

if .severity | IN("info", "informational", "debug", "notice") then
  empty  # Drops the record
else
  .
end
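To see what empty does to a stream, it helps to count records before and after the filter. A minimal sketch, assuming the standalone jq CLI (1.6 or later, for IN) and made-up records; note that a record with no severity field is kept, since null matches none of the listed values.

```shell
# Hypothetical sample records: three in, two expected out
records='{"severity": "info", "msg": "link up"}
{"severity": "critical", "msg": "auth bypass"}
{"msg": "no severity field"}'

# Records matching the low-severity list produce no output and disappear
kept=$(printf '%s\n' "$records" | jq -c '
  if .severity | IN("info", "informational", "debug", "notice") then empty else . end
')
echo "$kept"

# Count the survivors by slurping them into an array
kept_count=$(printf '%s\n' "$kept" | jq -s 'length')
echo "kept $kept_count of 3"
```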

Drop by Rate (Sample)

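# Keep roughly 10% of flow logs. This is time-based, not random: standard jq has
# no random function, so flow logs pass only during one second out of every ten.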
if .category == "flow" and (now | . % 10 != 0) then
  empty
else
  .
end

Drop Health Checks

if .event_type | IN("heartbeat", "keepalive", "health_check", "ping") then
  empty
else
  .
end

Aggregate Similar Events

# Add bucket for aggregation (by minute)
. + {
  "time_bucket": (.timestamp | fromdateiso8601 | . - (. % 60) | todate),
  "agg_key": "\(.source_ip)|\(.destination_ip)|\(.action)"
}
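The effect of the bucketing is easiest to see with two events from the same minute, which should end up with identical bucket and key values. This sketch assumes the standalone jq CLI and timestamps in the exact form fromdateiso8601 accepts (UTC with a trailing Z, no fractional seconds); the records are invented.

```shell
# Hypothetical records: same flow, 43 seconds apart within one minute
records='{"timestamp": "2024-01-15T10:23:45Z", "source_ip": "10.0.0.5", "destination_ip": "10.0.0.9", "action": "deny"}
{"timestamp": "2024-01-15T10:23:02Z", "source_ip": "10.0.0.5", "destination_ip": "10.0.0.9", "action": "deny"}'

# Round each timestamp down to the minute and build the aggregation key
buckets=$(printf '%s\n' "$records" | jq -r '
  . + {
    "time_bucket": (.timestamp | fromdateiso8601 | . - (. % 60) | todate),
    "agg_key": "\(.source_ip)|\(.destination_ip)|\(.action)"
  } | "\(.time_bucket) \(.agg_key)"
')
echo "$buckets"
```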

OCSF Normalization

Basic OCSF Schema

# Transform to OCSF Authentication event
{
  "class_uid": 3002,
  "class_name": "Authentication",
  # OCSF models a failed logon as activity Logon (1) with status Failure (2);
  # activity_id 2 means Logoff.
  "activity_id": 1,
  "activity_name": "Logon",
  "status_id": (if .auth_result == "success" then 1 else 2 end),
  "severity_id": (if .auth_result == "success" then 1 else 4 end),
  "time": .timestamp,
  "actor": {
    "user": {
      "name": .username,
      "domain": .domain
    }
  },
  "src_endpoint": {
    "ip": .source_ip,
    "mac": .source_mac
  },
  "auth_protocol": .auth_method,
  "status": .auth_result,
  "status_detail": .failure_reason
}

Error Handling

Safe Field Access

# Handle missing fields with defaults
. + {
  "severity": (.severity // "unknown"),
  "source_ip": (.source_ip // .src_ip // "0.0.0.0"),
  "username": (.user.name // .username // "anonymous")
}

Try-Catch Pattern

# Wrap risky operations
(try (.timestamp | fromdateiso8601) catch null) as $ts |
. + {"parsed_time": $ts}
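The try/catch wrapper can be checked against a well-formed timestamp, a malformed one, and a missing one. A minimal sketch, assuming the standalone jq CLI and invented records; it prints only the type of parsed_time so the result does not depend on the exact epoch value.

```shell
# Hypothetical records: valid timestamp, wrong format, field missing
records='{"timestamp": "2024-01-15T10:23:45Z"}
{"timestamp": "15/01/2024 10:23"}
{"msg": "no timestamp"}'

# Parsing errors are caught and replaced with null, so every record survives
types=$(printf '%s\n' "$records" | jq -r '
  (try (.timestamp | fromdateiso8601) catch null) as $ts |
  . + {"parsed_time": $ts} |
  .parsed_time | type
')
echo "$types"
```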

Performance Tips

Minimize Operations

# Bad - multiple passes
. | del(.temp) | del(.debug) | del(.raw)

# Good - single pass
del(.temp, .debug, .raw)

Short-Circuit Evaluation

# Branches are checked top to bottom; evaluation stops at the first match,
# so put the most common or cheapest conditions first
if .severity == "critical" then
  . + {"route": "sentinel"}
elif .severity == "high" then
  . + {"route": "sentinel"}
# ... etc
else
  .
end

Avoid Complex Regex

# Bad - complex regex on every record
select(.message | test("complex|pattern|here"; "i"))

# Better - compare an already-parsed field
select(.event_type == "auth_failure")

Testing jq Expressions

Local Testing

# Test expression locally before deploying
echo '{"severity": "critical", "event_type": "auth_failure"}' | jq '
  if .severity == "critical" then
    . + {"route": "sentinel"}
  else
    . + {"route": "s3"}
  end
'

Test with Sample Logs

# Use synthetic log generator
./examples/monad/testing/synthetic-logs.sh test 1 | jq '
  # Your transform here
'
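Manual inspection can be turned into pass/fail checks with jq's -e (--exit-status) flag, which makes the exit code reflect whether the last output was truthy. The following is a sketch only: the transform variable and the check helper are hypothetical names, and the transform is the simple severity router from the local testing example.

```shell
# Hypothetical transform under test
transform='if .severity == "critical" then . + {"route": "sentinel"} else . + {"route": "s3"} end'

# check INPUT EXPECTED_ROUTE: succeeds only if the transform yields that route
check() {
  printf '%s' "$1" | jq -e --arg want "$2" "($transform) | .route == \$want" > /dev/null
}

check '{"severity": "critical"}' sentinel && echo "PASS critical routes to sentinel"
check '{"severity": "info"}' s3 && echo "PASS info routes to s3"
```

Because check returns a normal exit status, the same helper can be dropped into a CI script or a pre-deploy hook.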

Common Mistakes

Mistake 1: Forgetting Output

# Wrong - non-critical records produce no output and are silently dropped
if .severity == "critical" then
  . + {"route": "sentinel"}
else
  empty
end

# Correct - always return the record
if .severity == "critical" then
  . + {"route": "sentinel"}
else
  .
end

Mistake 2: Breaking JSON

# Wrong - outputs only the severity string, not the record
.severity

# Correct - full record
. + {"normalized_severity": .severity}

Mistake 3: Not Handling Null

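# Wrong - errors if .user is a string or number, and gives no default if .user is missing
.user.name

# Correct - ? suppresses the type error, // supplies a default
.user.name? // "unknown"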
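The difference between a missing parent and a parent of the wrong type shows up clearly when both are run through the same expression. A minimal sketch, assuming the standalone jq CLI and invented records; it combines the ? operator (which suppresses the type error) with // (which supplies the default).

```shell
# Hypothetical records: user as object, user missing, user as plain string
records='{"user": {"name": "jdoe"}}
{"host": "sw-01"}
{"user": "jdoe"}'

# ? turns the "cannot index string" error into no output; // then fills in a default
names=$(printf '%s\n' "$records" | jq -r '.user.name? // "unknown"')
echo "$names"
```

Without the ?, the third record would abort the expression with an indexing error rather than fall through to the default.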

Key Takeaways

  1. jq is the most powerful transform - Full JSON manipulation

  2. Always return valid JSON - Modified record or empty to drop

  3. Use conditionals for routing - if-then-else pattern

  4. Handle missing fields - Use // null or // "default"

  5. Test locally first - Before deploying to pipeline

  6. MITRE tagging enables threat hunting - Critical for Sentinel

Next Module

GJSON Paths - UI-based transforms using GJSON queries.