jq Transforms
The jq transform is Monad’s most powerful tool: it applies a full jq expression to every record. Master it for complex routing, OCSF normalization, and intelligent filtering.
jq Transform Syntax
{
"operation": "jq",
"arguments": {
"query": "<jq expression>"
}
}
The expression receives each log record as its input (.) and must output the transformed JSON record, or empty to drop it.
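Because the expression lives inside a JSON string, every double quote in it has to be escaped. One way to avoid escaping by hand is to let jq build the transform object. This is a minimal sketch that assumes jq is installed locally; the query and the variable names are illustrative only.

```shell
# Illustrative query; any expression from this page could be used instead.
query='if .severity == "critical" then . + {"route": "sentinel"} else . + {"route": "s3"} end'

# --arg passes the expression in as a string, so jq escapes the inner quotes.
transform=$(jq -cn --arg query "$query" '{operation: "jq", arguments: {query: $query}}')
echo "$transform"
```

The printed object has the same shape as the syntax block above, with the quotes inside the query already escaped.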
Basic Patterns
Add Field
. + {"destination": "sentinel"}
Add Multiple Fields
. + {
"destination": "sentinel",
"processed_at": (now | todate),
"pipeline": "ise-radius"
}
Conditional Field
if .severity == "critical" then
. + {"route": "sentinel"}
else
. + {"route": "s3"}
end
Remove Field
del(.password)
Rename Field
.source_ip = .src_ip | del(.src_ip)
Routing Patterns
Route by Severity
if (.severity == "critical" or .severity == "high") then
. + {"route": "sentinel"}
else
. + {"route": "s3_archive"}
end
Route by Event Type
if .event_type == "auth_failure" or
.event_type == "intrusion_detected" or
.event_type == "policy_violation" then
. + {"route": "sentinel", "priority": "high"}
else
. + {"route": "s3_archive", "priority": "low"}
end
Route by Multiple Conditions
# Complex routing: severity OR event type OR source
if (
(.severity | IN("critical", "high", "emergency")) or
(.event_type | IN("auth_failure", "intrusion", "malware")) or
((.source // "") | test("tacacs|admin|config"; "i"))
) then
. + {"route": "sentinel"}
else
. + {"route": "s3"}
end
MITRE ATT&CK Tagging
Single Technique
if .event_type == "auth_failure" then
. + {"mitre_technique": "T1078", "mitre_tactic": "initial_access"}
else
.
end
Technique Mapping
# Map event types to MITRE techniques
# A missing event_type falls back to "" so the lookup yields null instead of an error
(
{
"auth_failure": {"technique": "T1078", "name": "Valid Accounts"},
"brute_force": {"technique": "T1110", "name": "Brute Force"},
"command_exec": {"technique": "T1059", "name": "Command and Scripting Interpreter"},
"lateral_movement": {"technique": "T1021", "name": "Remote Services"},
"exfiltration": {"technique": "T1041", "name": "Exfiltration Over C2 Channel"},
"privilege_escalation": {"technique": "T1068", "name": "Exploitation for Privilege Escalation"}
}[.event_type // ""]
) as $mitre |
if $mitre then
. + {"mitre": $mitre}
else
.
end
Full MITRE Enrichment
# Comprehensive MITRE tagging for ISE/FTD logs
def mitre_tag:
if .event_type == "auth_failure" then
{"id": "T1078", "tactic": "initial_access", "name": "Valid Accounts"}
elif .event_type == "brute_force" then
{"id": "T1110", "tactic": "credential_access", "name": "Brute Force"}
elif .event_type == "posture_failure" then
{"id": "T1190", "tactic": "initial_access", "name": "Exploit Public-Facing Application"}
elif .action == "deny" and .threat_signature then
{"id": "T1071", "tactic": "command_control", "name": "Application Layer Protocol"}
elif .event_type == "config_change" then
{"id": "T1059", "tactic": "execution", "name": "Command and Scripting Interpreter"}
else
null
end;
mitre_tag as $m |
if $m then . + {"mitre": $m} else . end
ISE-Specific Patterns
Categorize ISE RADIUS Events
# ISE RADIUS event categorization
if .ise_auth_result == "FAILED" then
. + {
"category": "auth_failure",
"route": "sentinel",
"mitre_technique": "T1078"
}
elif .ise_auth_result == "PASSED" and .ise_posture_status == "NonCompliant" then
. + {
"category": "posture_failure",
"route": "sentinel",
"mitre_technique": "T1190"
}
elif .ise_auth_result == "PASSED" then
. + {
"category": "auth_success",
"route": "s3_archive"
}
else
. + {"route": "s3_archive"}
end
Parse ISE Syslog Message
# Extract fields from ISE syslog message
# capture emits nothing when the pattern does not match, which would drop the record;
# the // {} fallback keeps the record and leaves the field null instead
((.message // "") | capture("User-Name=(?<username>[^,]+)") // {}) as $user |
((.message // "") | capture("Calling-Station-ID=(?<mac>[^,]+)") // {}) as $mac |
((.message // "") | capture("Framed-IP-Address=(?<ip>[^,]+)") // {}) as $ip |
. + {
"user": $user.username,
"endpoint_mac": $mac.mac,
"endpoint_ip": $ip.ip
}
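A quick local check helps here, because capture produces no output at all when its pattern does not match, and a variable binding over no output drops the whole record. The sketch below uses a made-up message that has no Framed-IP-Address attribute; real ISE messages carry many more fields. It adds a // {} fallback so the record survives with a null field.

```shell
# Hypothetical sample record: a User-Name is present, Framed-IP-Address is not.
sample='{"message": "User-Name=jdoe, Calling-Station-ID=AA-BB-CC-DD-EE-FF, NAS-Port=1"}'

parsed=$(echo "$sample" | jq -c '
  ((.message // "") | capture("User-Name=(?<username>[^,]+)") // {}) as $user |
  ((.message // "") | capture("Framed-IP-Address=(?<ip>[^,]+)") // {}) as $ip |
  {"user": $user.username, "endpoint_ip": $ip.ip}
')
echo "$parsed"
```

The result should report jdoe as the user and null for endpoint_ip, rather than producing no output.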
FTD/Firewall Patterns
Categorize FTD Events
# FTD firewall event routing
if .action == "deny" then
if .threat_signature then
. + {"category": "threat", "route": "sentinel", "mitre_technique": "T1071"}
else
. + {"category": "block", "route": "sentinel"}
end
elif .action == "intrusion_detected" then
. + {"category": "ids_alert", "route": "sentinel", "mitre_technique": "T1190"}
elif .action == "malware_detected" then
. + {"category": "malware", "route": "sentinel", "mitre_technique": "T1204"}
elif .action == "permit" or .action == "allow" then
. + {"category": "flow", "route": "s3_archive"}
else
. + {"route": "s3_archive"}
end
Extract Firewall Fields
# Normalize firewall log to OCSF-like schema
{
"source": {
"ip": .src_ip,
"port": .src_port,
"zone": .src_zone
},
"destination": {
"ip": .dst_ip,
"port": .dst_port,
"zone": .dst_zone
},
"action": .action,
"protocol": .protocol,
"rule": .rule_name,
"bytes": (.bytes_in + .bytes_out),
"timestamp": .timestamp,
"observer": {
"hostname": .firewall_name,
"type": "firewall"
}
} + (if .threat_signature then {"threat": {"signature": .threat_signature}} else {} end)
Volume Reduction Patterns
Drop Info-Level Logs
if .severity | IN("info", "informational", "debug", "notice") then
empty # Drops the record
else
.
end
Drop by Rate (Sample)
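# Keep roughly 10% of flow logs: only those processed while the clock second is divisible by 10
# (time-based sampling on the processing clock, not truly random)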
if .category == "flow" and (now | . % 10 != 0) then
empty
else
.
end
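Because now is the processing clock, the filter above keeps or drops flow records in one-second bursts, and replaying the same logs can keep a different subset. A possible variant keys the decision on each record's own timestamp so the result is repeatable. This is a sketch only: it assumes .timestamp is an ISO 8601 UTC string such as 2024-01-01T00:00:10Z, and the variable name and sample records are made up.

```shell
# Keep a flow record only when its own epoch second is divisible by 10.
flow_filter='
  if .category == "flow" and ((.timestamp | fromdateiso8601) % 10 != 0) then
    empty
  else
    .
  end
'

# Three hypothetical records: two flows one second apart, and one threat event.
kept=$(printf '%s\n' \
  '{"category": "flow", "timestamp": "2024-01-01T00:00:10Z"}' \
  '{"category": "flow", "timestamp": "2024-01-01T00:00:11Z"}' \
  '{"category": "threat", "timestamp": "2024-01-01T00:00:11Z"}' \
  | jq -c "$flow_filter" | jq -s 'length')
echo "$kept records kept"
```

Only the first flow record and the threat record pass through. Non-flow records are never sampled.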
Drop Health Checks
if .event_type | IN("heartbeat", "keepalive", "health_check", "ping") then
empty
else
.
end
Aggregate Similar Events
# Add bucket for aggregation (by minute)
. + {
"time_bucket": (.timestamp | fromdateiso8601 | . - (. % 60) | todate),
"agg_key": "\(.source_ip)|\(.destination_ip)|\(.action)"
}
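The bucketing arithmetic can be checked locally before deploying. This sketch runs the expression against a made-up record. It assumes the timestamp has the exact form fromdateiso8601 accepts, which is whole seconds with a trailing Z.

```shell
# Hypothetical record, 47 seconds past the minute.
record='{"timestamp": "2024-01-01T00:00:47Z", "source_ip": "10.0.0.5", "destination_ip": "10.0.0.9", "action": "permit"}'

bucketed=$(echo "$record" | jq -c '
  . + {
    "time_bucket": (.timestamp | fromdateiso8601 | . - (. % 60) | todate),
    "agg_key": "\(.source_ip)|\(.destination_ip)|\(.action)"
  }
')
echo "$bucketed" | jq -r '.time_bucket, .agg_key'
# prints:
# 2024-01-01T00:00:00Z
# 10.0.0.5|10.0.0.9|permit
```

Records with the same agg_key and time_bucket can then be grouped downstream.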
OCSF Normalization
Basic OCSF Schema
# Transform to OCSF Authentication event
# In OCSF a failed logon is still activity_id 1 (Logon); activity_id 2 means Logoff.
# The outcome is carried by status_id (1 = Success, 2 = Failure).
{
"class_uid": 3002,
"class_name": "Authentication",
"activity_id": 1,
"activity_name": "Logon",
"severity_id": (if .auth_result == "success" then 1 else 4 end),
"status_id": (if .auth_result == "success" then 1 else 2 end),
"time": .timestamp,
"actor": {
"user": {
"name": .username,
"domain": .domain
}
},
"src_endpoint": {
"ip": .source_ip,
"mac": .source_mac
},
"auth_protocol": .auth_method,
"status": (if .auth_result == "success" then "Success" else "Failure" end),
"status_detail": .failure_reason
}
Error Handling
Safe Field Access
# Handle missing fields with defaults
. + {
"severity": (.severity // "unknown"),
"source_ip": (.source_ip // .src_ip // "0.0.0.0"),
"username": (.user.name // .username // "anonymous")
}
Try-Catch Pattern
# Wrap risky operations
(try (.timestamp | fromdateiso8601) catch null) as $ts |
. + {"parsed_time": $ts}
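To see the pattern behave on good and bad input, the sketch below wraps it in a small helper and feeds it one valid and one invalid timestamp. The helper name parse_time and both inputs are illustrative.

```shell
# parse_time TIMESTAMP: build a one-field record and apply the try/catch pattern.
parse_time() {
  jq -cn --arg ts "$1" '
    {"timestamp": $ts}
    | (try (.timestamp | fromdateiso8601) catch null) as $parsed
    | . + {"parsed_time": $parsed}
  '
}

parse_time '2024-01-01T00:00:00Z'   # parsed_time is the epoch value 1704067200
parse_time 'not-a-date'             # parsed_time is null, and the record is still emitted
```

Without the try, the second call would fail with an error instead of emitting a record with a null parsed_time.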
Performance Tips
Minimize Operations
# Bad - multiple passes
. | del(.temp) | del(.debug) | del(.raw)
# Good - single pass
del(.temp, .debug, .raw)
Short-Circuit Evaluation
# Branches are tested top to bottom; evaluation stops at the first match
if .severity == "critical" then
. + {"route": "sentinel"}
elif .severity == "high" then
. + {"route": "sentinel"}
# ... further elif branches as needed
else
.
end
Avoid Complex Regex
# Bad - complex regex on every record
select(.message | test("complex|pattern|here"; "i"))
# Better - use a simple comparison on an already-parsed field
select(.event_type == "auth_failure")
Testing jq Expressions
Local Testing
# Test expression locally before deploying
echo '{"severity": "critical", "event_type": "auth_failure"}' | jq '
if .severity == "critical" then
. + {"route": "sentinel"}
else
. + {"route": "s3"}
end
'
Test with Sample Logs
# Use synthetic log generator
./examples/monad/testing/synthetic-logs.sh test 1 | jq '
# Your transform here
'
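Manual inspection can be turned into repeatable checks with jq's -e flag, which makes the exit status reflect the last output (non-zero when it is false or null). This is a sketch under stated assumptions: the check helper and the sample records are made up, and the transform is the routing example from this section.

```shell
transform='if .severity == "critical" then . + {"route": "sentinel"} else . + {"route": "s3"} end'

# check RECORD EXPECTED_ROUTE: succeeds only if the transform yields that route.
check() {
  echo "$1" | jq -e --arg want "$2" "($transform) | .route == \$want" > /dev/null
}

check '{"severity": "critical"}' sentinel &&
  check '{"severity": "info"}' s3 &&
  echo "all checks passed"
```

A failing check returns a non-zero status, so the same helper can gate a deployment script.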
Common Mistakes
Mistake 1: Forgetting Output
# Wrong - records that are not critical produce no output and are dropped
select(.severity == "critical") | . + {"route": "sentinel"}
# Correct - always return the record
if .severity == "critical" then
. + {"route": "sentinel"}
else
.
end
Mistake 2: Breaking JSON
# Wrong - outputs only the severity string and discards the rest of the record
.severity
# Correct - full record
. + {"normalized_severity": .severity}
Mistake 3: Not Handling Null
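# Wrong - a missing field silently becomes null, and .user.name errors if .user is a string or number
.user.name
# Correct - suppress the error with ? and supply a default
.user.name? // "unknown"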
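It helps to check how field access behaves for different shapes of .user. In jq, indexing null returns null. It is a non-object value such as a string that raises an error, and the ? operator suppresses that error. The sketch below uses made-up records and an illustrative variable name.

```shell
safe_name='.user.name? // "unknown"'

echo '{"user": {"name": "jdoe"}}' | jq -r "$safe_name"   # prints: jdoe
echo '{"user": null}'             | jq -r "$safe_name"   # prints: unknown
echo '{"user": "jdoe"}'           | jq -r "$safe_name"   # prints: unknown (plain .user.name would error here)
```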
Key Takeaways
- jq is the most powerful transform: full JSON manipulation
- Always return valid JSON: the modified record, or empty to drop it
- Use conditionals for routing: the if-then-else pattern
- Handle missing fields: use // null or // "default"
- Test locally first, before deploying to the pipeline
- MITRE tagging enables threat hunting: critical for Sentinel
Next Module
GJSON Paths - UI-based transforms using GJSON queries.