SDK Integration & Custom Transforms
Overview
Monad lacks native connectors for ISE, FTD, and traditional network devices. This is an opportunity: build custom transforms using the SDK, own the integration, and create reusable assets for the team.
Value Proposition
| Challenge | Opportunity |
|---|---|
| No native ISE connector | Build ISE transform library - become the expert |
| No native FTD connector | Build FTD transform library - own the integration |
| Team uses UI clicks | You use pipeline-as-code - DevOps credibility |
| One-off configs | Reusable modules - team multiplier |
Architecture
                     YOUR CONTRIBUTION
                             │
┌────────────────────────────┴────────────────────────────┐
│                                                         │
│   ┌─────────────┐   ┌─────────────┐   ┌───────────┐     │
│   │  ISE Logs   │   │  FTD Logs   │   │  Network  │     │
│   └──────┬──────┘   └──────┬──────┘   └─────┬─────┘     │
│          │                 │                │           │
│          ▼                 ▼                ▼           │
│   ┌─────────────────────────────────────────────────┐   │
│   │           Syslog Collector (rsyslog)            │   │
│   └─────────────────────┬───────────────────────────┘   │
│                         │                               │
│                         ▼                               │
│   ┌─────────────────────────────────────────────────┐   │
│   │               S3 Bucket (staging)               │   │
│   └─────────────────────┬───────────────────────────┘   │
│                         │                               │
│   ══════════════════════╪═══════════════════════════    │
│   MONAD BOUNDARY        │                               │
│   ══════════════════════╪═══════════════════════════    │
│                         ▼                               │
│   ┌─────────────────────────────────────────────────┐   │
│   │                 Monad S3 Input                  │   │
│   └─────────────────────┬───────────────────────────┘   │
│                         │                               │
│          ┌──────────────┼──────────────┐                │
│          ▼              ▼              ▼                │
│    ┌────────────┐ ┌────────────┐ ┌────────────┐         │
│    │ ISE Parse  │ │ FTD Parse  │ │ Net Parse  │         │
│    │ Transform  │ │ Transform  │ │ Transform  │         │
│    └─────┬──────┘ └─────┬──────┘ └─────┬──────┘         │
│          │              │              │                │
│          ▼              ▼              ▼                │
│   ┌─────────────────────────────────────────────────┐   │
│   │          Filter Transform (Critical?)           │   │
│   └──────────┬─────────────────────────┬────────────┘   │
│              │                         │                │
│          CRITICAL                    BULK               │
│              ▼                         ▼                │
│    ┌──────────────────┐       ┌──────────────────┐      │
│    │ Sentinel Output  │       │    S3 Archive    │      │
│    └──────────────────┘       └──────────────────┘      │
│                                                         │
└─────────────────────────────────────────────────────────┘
Repository Structure
Scalable layout for pipeline-as-code:
monad-pipelines/
├── README.md
├── pyproject.toml # Python package config
│
├── monad_chla/ # Your reusable library
│ ├── __init__.py
│ ├── client.py # API client wrapper
│ ├── transforms/
│ │ ├── __init__.py
│ │ ├── ise.py # ISE parsing & filtering
│ │ ├── ftd.py # FTD parsing & filtering
│ │ ├── network.py # Generic network devices
│ │ └── common.py # Shared utilities
│ ├── inputs/
│ │ ├── __init__.py
│ │ └── s3.py # S3 input configs
│ └── outputs/
│ ├── __init__.py
│ ├── sentinel.py # Sentinel output config
│ └── s3_archive.py # S3 archive config
│
├── pipelines/ # Pipeline definitions (JSON/YAML)
│ ├── ise-radius/
│ │ ├── config.yaml
│ │ └── transforms.yaml
│ ├── ise-tacacs/
│ │ ├── config.yaml
│ │ └── transforms.yaml
│ ├── ftd-firewall/
│ │ ├── config.yaml
│ │ └── transforms.yaml
│ └── network-devices/
│ ├── config.yaml
│ └── transforms.yaml
│
├── scripts/
│ ├── deploy.py # CI/CD deployment script
│ ├── validate.py # Config validation
│ └── rollback.py # Rollback script
│
├── tests/
│ ├── test_ise_transforms.py
│ ├── test_ftd_transforms.py
│ └── fixtures/ # Sample log data
│ ├── ise_samples.log
│ └── ftd_samples.log
│
└── .github/
    └── workflows/
        └── deploy.yaml # GitHub Actions CI/CD
ISE Transform Module
Message ID Reference
| ID | Description | Severity | Route |
|---|---|---|---|
| 5200 | Authentication succeeded | Info | S3 Archive |
| 5400 | Authentication failed | Critical | Sentinel |
| 5405 | RADIUS request dropped | Critical | Sentinel |
| 5440 | Endpoint abandoned EAP | Warning | Sentinel |
| 5449 | Endpoint failed EAP | Critical | Sentinel |
| 12508 | EAP-TLS handshake failed | Critical | Sentinel |
| 24015 | EAP-TLS began | Debug | S3 Archive |
| 86009 | Guest login | Info | S3 Archive |
| 86010 | Guest login failed | Warning | Sentinel |
ISE Transform Code
# monad_chla/transforms/ise.py
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

# Message ID classifications
ISE_CRITICAL_IDS = [
    5400,   # Auth failed
    5405,   # RADIUS dropped
    5440,   # Endpoint abandoned
    5449,   # Endpoint EAP failed
    12508,  # EAP-TLS failed
    86010,  # Guest failed
]

ISE_BULK_IDS = [
    5200,   # Auth passed
    24015,  # EAP started
    24016,  # EAP continued
    86009,  # Guest login
]


@dataclass
class ISEParserConfig:
    """Configuration for ISE syslog parser transform."""
    name: str = "ise-syslog-parser"
    # Regex patterns for ISE syslog fields (defaults applied in __post_init__)
    patterns: Optional[Dict[str, str]] = None

    def __post_init__(self):
        # Keep caller-supplied patterns; only fall back to the defaults
        if self.patterns is None:
            self.patterns = {
                "message_id": r"(\d{4,5})\s+NOTICE",
                "username": r"UserName=([^,\s]+)",
                "mac_address": r"Calling-Station-ID=([0-9A-Fa-f:-]+)",
                "ip_address": r"Framed-IP-Address=([0-9.]+)",
                "nas_ip": r"NAS-IP-Address=([0-9.]+)",
                "nas_port": r"NAS-Port=(\d+)",
                "failure_reason": r"FailureReason=([^,]+)",
                "auth_policy": r"AuthenticationPolicy=([^,]+)",
                "authz_policy": r"AuthorizationPolicy=([^,]+)",
                "selected_profile": r"SelectedAuthorizationProfiles=([^,]+)",
            }

    def to_monad_config(self) -> Dict[str, Any]:
        """Convert to Monad API format."""
        return {
            "name": self.name,
            "type": "parse",
            "config": {
                "rules": [
                    {"field": k, "pattern": v}
                    for k, v in self.patterns.items()
                ]
            },
        }


@dataclass
class ISEFilterConfig:
    """Configuration for ISE critical/bulk filter transform."""
    name: str = "ise-critical-filter"
    critical_ids: Optional[List[int]] = None

    def __post_init__(self):
        if self.critical_ids is None:
            # Copy so later edits to one config never mutate the module-level list
            self.critical_ids = list(ISE_CRITICAL_IDS)

    def to_monad_config(self) -> Dict[str, Any]:
        """Convert to Monad API format."""
        return {
            "name": self.name,
            "type": "filter",
            "config": {
                "condition": f"message_id in {self.critical_ids}"
            },
        }


class ISETransformBuilder:
    """Builder for ISE transform pipeline."""

    def __init__(self, api_client, org_id: str):
        self.api = api_client
        self.org_id = org_id

    def create_parser(self, config: Optional[ISEParserConfig] = None) -> str:
        """Create ISE parser transform, return transform ID."""
        config = config or ISEParserConfig()
        response = self.api.V1OrganizationIdTransformsPost(
            self.org_id,
            config.to_monad_config()
        )
        return response.id

    def create_critical_filter(self, config: Optional[ISEFilterConfig] = None) -> str:
        """Create ISE critical filter transform, return transform ID."""
        config = config or ISEFilterConfig()
        response = self.api.V1OrganizationIdTransformsPost(
            self.org_id,
            config.to_monad_config()
        )
        return response.id

    def create_bulk_filter(self) -> str:
        """Create ISE bulk filter matching the IDs in ISE_BULK_IDS."""
        config = {
            "name": "ise-bulk-filter",
            "type": "filter",
            "config": {
                "condition": f"message_id in {ISE_BULK_IDS}"
            },
        }
        response = self.api.V1OrganizationIdTransformsPost(
            self.org_id,
            config
        )
        return response.id
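Before creating the parser in Monad, it can help to run the same regex rules locally against fixture lines. The sketch below assumes the parse transform applies each rule independently and keeps the first capture group; that is an assumption about Monad's behavior, not something confirmed here. `apply_patterns` is a hypothetical helper, and the sample line is illustrative rather than a captured ISE log.

```python
import re
from typing import Dict, Optional

# Subset of the ISEParserConfig patterns, copied so this sketch runs on its own
PATTERNS: Dict[str, str] = {
    "message_id": r"(\d{4,5})\s+NOTICE",
    "username": r"UserName=([^,\s]+)",
    "mac_address": r"Calling-Station-ID=([0-9A-Fa-f:-]+)",
    "failure_reason": r"FailureReason=([^,]+)",
}


def apply_patterns(raw: str, patterns: Dict[str, str]) -> Dict[str, Optional[str]]:
    """Hypothetical local helper: first capture group per rule, None on no match."""
    fields: Dict[str, Optional[str]] = {}
    for name, pattern in patterns.items():
        match = re.search(pattern, raw)
        fields[name] = match.group(1) if match else None
    return fields


# Illustrative line only; swap in real lines from tests/fixtures/ise_samples.log
sample = (
    "5400 NOTICE Failed-Attempt: Authentication failed, "
    "UserName=jsmith, Calling-Station-ID=AA-BB-CC-DD-EE-FF, "
    "FailureReason=22056 Subject not found"
)
parsed = apply_patterns(sample, PATTERNS)
```

Fields that do not appear in a line come back as `None`, which makes gaps in the pattern set easy to spot when you loop over a fixture file.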
FTD Transform Module
Syslog ID Reference
| ID | Description | Severity | Route |
|---|---|---|---|
| 106001 | Inbound connection permitted | Info | S3 Archive |
| 106006 | Denied inbound | Warning | Sentinel |
| 106007 | Denied inbound (no xlate) | Warning | Sentinel |
| 106014 | Denied inbound (ACL) | Warning | Sentinel |
| 106015 | Denied inbound (no route) | Warning | Sentinel |
| 106023 | Denied by ACL | Critical | Sentinel |
| 106100 | ACL permitted/denied | Varies | Parse action field |
| 430003 | IPS/Snort drop | Critical | Sentinel |
| 733100 | Threat detected | Critical | Sentinel |
FTD Transform Code
# monad_chla/transforms/ftd.py
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

FTD_CRITICAL_IDS = [
    "106006", "106007", "106014", "106015", "106023",
    "430003", "733100",
]

FTD_BULK_IDS = [
    "106001",  # Permitted
    "302013",  # Built connection
    "302014",  # Teardown connection
]


@dataclass
class FTDParserConfig:
    """Configuration for FTD syslog parser transform."""
    name: str = "ftd-syslog-parser"
    patterns: Optional[Dict[str, str]] = None

    def __post_init__(self):
        # Keep caller-supplied patterns; only fall back to the defaults
        if self.patterns is None:
            self.patterns = {
                "syslog_id": r"%FTD-\d-(\d{6})",
                "action": r"(permitted|denied|Deny|Drop)",
                "src_ip": r"(\d+\.\d+\.\d+\.\d+)/\d+",
                "dst_ip": r"->.*?(\d+\.\d+\.\d+\.\d+)/\d+",
                # Port patterns use a single capture group so that group 1 is
                # the port rather than the IP address
                "src_port": r"\d+\.\d+\.\d+\.\d+/(\d+)",
                "dst_port": r"->.*?\d+\.\d+\.\d+\.\d+/(\d+)",
                "protocol": r"(TCP|UDP|ICMP|GRE)",
                "acl_name": r"access-group\s+(\S+)",
                "interface": r"interface\s+(\S+)",
            }

    def to_monad_config(self) -> Dict[str, Any]:
        """Convert to Monad API format."""
        return {
            "name": self.name,
            "type": "parse",
            "config": {
                "rules": [
                    {"field": k, "pattern": v}
                    for k, v in self.patterns.items()
                ]
            },
        }


class FTDTransformBuilder:
    """Builder for FTD transform pipeline."""

    def __init__(self, api_client, org_id: str):
        self.api = api_client
        self.org_id = org_id

    def create_parser(self, config: Optional[FTDParserConfig] = None) -> str:
        """Create FTD parser transform, return transform ID."""
        config = config or FTDParserConfig()
        response = self.api.V1OrganizationIdTransformsPost(
            self.org_id,
            config.to_monad_config()
        )
        return response.id

    def create_critical_filter(self) -> str:
        """Create FTD critical filter: critical syslog IDs or any deny/drop action."""
        config = {
            "name": "ftd-critical-filter",
            "type": "filter",
            "config": {
                "condition": f"syslog_id in {FTD_CRITICAL_IDS} or action in ['denied', 'Deny', 'Drop']"
            },
        }
        response = self.api.V1OrganizationIdTransformsPost(
            self.org_id,
            config
        )
        return response.id
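The condition string in `create_critical_filter` is opaque until Monad evaluates it, so a plain-Python mirror is handy for unit-testing routing decisions first. This is a minimal sketch: `is_critical` and `DENY_ACTIONS` are hypothetical names, and the logic only restates the condition above; it does not reproduce how Monad evaluates filters.

```python
from typing import Any, Dict

# Copied from ftd.py so the sketch runs on its own
FTD_CRITICAL_IDS = [
    "106006", "106007", "106014", "106015", "106023",
    "430003", "733100",
]
DENY_ACTIONS = {"denied", "Deny", "Drop"}


def is_critical(event: Dict[str, Any]) -> bool:
    """Hypothetical local mirror of the ftd-critical-filter condition."""
    return (
        event.get("syslog_id") in FTD_CRITICAL_IDS
        or event.get("action") in DENY_ACTIONS
    )


events = [
    {"syslog_id": "430003", "action": None},         # critical by syslog ID
    {"syslog_id": "106100", "action": "denied"},     # critical by action
    {"syslog_id": "106100", "action": "permitted"},  # bulk
    {"syslog_id": "302013", "action": None},         # bulk
]
routes = ["sentinel" if is_critical(e) else "archive" for e in events]
```

Note how 106100 lands on either side depending on the parsed action, which matches the "Parse action field" note in the reference table.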
jq Transforms
Monad uses jq for JSON transformations. Master these patterns to manipulate log data at the pipeline level.
ISE jq Patterns
Filter by Message ID (Critical Only)
# Keep only critical ISE events
select(.message_id | IN(5400, 5405, 5440, 5449, 12508, 86010))
Extract Authentication Fields
# Parse ISE syslog into structured JSON
{
  timestamp: .timestamp,
  message_id: (.raw | capture("(?<id>\\d{4,5})\\s+NOTICE").id | tonumber),
  username: (.raw | capture("UserName=(?<u>[^,\\s]+)").u // null),
  mac: (.raw | capture("Calling-Station-ID=(?<m>[0-9A-Fa-f:-]+)").m // null),
  ip: (.raw | capture("Framed-IP-Address=(?<ip>[0-9.]+)").ip // null),
  nas_ip: (.raw | capture("NAS-IP-Address=(?<nas>[0-9.]+)").nas // null),
  failure_reason: (.raw | capture("FailureReason=(?<r>[^,]+)").r // null),
  policy: (.raw | capture("AuthorizationPolicy=(?<p>[^,]+)").p // null)
}
Route by Severity
# Add routing tag based on message ID
# (86010 is included so the list matches the critical filter and the ID table)
. + {
  route: (
    if .message_id | IN(5400, 5405, 5440, 5449, 12508, 86010) then "sentinel"
    elif .message_id | IN(5200, 24015, 86009) then "archive"
    else "unknown"
    end
  )
}
FTD jq Patterns
Filter Denied Traffic
# Keep only denied/dropped traffic
select(.action | IN("denied", "Deny", "Drop"))
Extract Firewall 5-Tuple
# Parse FTD syslog into structured JSON
{
  timestamp: .timestamp,
  syslog_id: (.raw | capture("%FTD-\\d-(?<id>\\d{6})").id),
  action: (.raw | capture("(?<a>permitted|denied|Deny|Drop)").a // "unknown"),
  protocol: (.raw | capture("(?<p>TCP|UDP|ICMP|GRE)").p // null),
  src_ip: (.raw | capture("(?<ip>\\d+\\.\\d+\\.\\d+\\.\\d+)/\\d+").ip // null),
  dst_ip: (.raw | capture("->.*?(?<ip>\\d+\\.\\d+\\.\\d+\\.\\d+)/\\d+").ip // null),
  src_port: (.raw | capture("(?<ip>\\d+\\.\\d+\\.\\d+\\.\\d+)/(?<port>\\d+)").port // null),
  dst_port: (.raw | capture("->.*?(?<ip>\\d+\\.\\d+\\.\\d+\\.\\d+)/(?<port>\\d+)").port // null)
}
IPS Event Classification
# Tag IPS events with severity
. + {
  severity: (
    if .syslog_id | IN("430003", "733100") then "critical"
    elif .syslog_id | IN("106023", "106006", "106007") then "high"
    elif .syslog_id | IN("106014", "106015") then "medium"
    else "low"
    end
  )
}
JSON Flattening
Flatten Nested Objects
# Flatten nested ISE attributes for Sentinel
{
  timestamp,
  message_id,
  username,
  "endpoint.mac": .endpoint.mac,
  "endpoint.ip": .endpoint.ip,
  "endpoint.os": .endpoint.os,
  "policy.authn": .policy.authentication,
  "policy.authz": .policy.authorization,
  "policy.profile": .policy.selected_profile
}
Array to Delimited String
# Convert array fields to pipe-delimited strings
# (// [] guards against a missing field, since join errors on null)
. + {
  applied_acls: (.acls // [] | join("|")),
  user_groups: (.groups // [] | join("|"))
}
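To check flattened output in unit tests without running a pipeline, you can mirror both flattening patterns in Python. The sketch below is a generic version under stated assumptions: `flatten` is a hypothetical helper that keeps the original key names (the jq above renames some fields, such as `policy.authn`), builds dotted paths for nested objects, and joins lists with `|`.

```python
from typing import Any, Dict


def flatten(obj: Dict[str, Any], prefix: str = "", sep: str = ".") -> Dict[str, Any]:
    """Flatten nested dicts into dotted keys; join lists into pipe-delimited strings."""
    flat: Dict[str, Any] = {}
    for key, value in obj.items():
        path = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse, carrying the dotted path built so far
            flat.update(flatten(value, path, sep))
        elif isinstance(value, list):
            flat[path] = "|".join(str(item) for item in value)
        else:
            flat[path] = value
    return flat


event = {
    "message_id": 5400,
    "endpoint": {"mac": "AA:BB:CC:DD:EE:FF", "ip": "10.0.0.5"},
    "acls": ["PERMIT_ALL", "GUEST_ACL"],
}
flat_event = flatten(event)
```

The explicit jq mapping is still the better choice inside the pipeline, because it pins the output column names; the generic helper is mainly useful for generating expected values in tests.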
OCSF Normalization
ISE to OCSF Authentication (Class 3002)
# Map ISE to OCSF Authentication schema
{
  class_uid: 3002,
  class_name: "Authentication",
  category_uid: 3,
  category_name: "Identity & Access Management",
  # OCSF activity_id 1 = Logon (2 = Logoff); a failed logon keeps activity 1
  # and is flagged through status_id below
  activity_id: 1,
  activity_name: "Logon",
  severity_id: (if .message_id | IN(5400, 5405, 5449) then 4 else 1 end),
  status_id: (if .message_id == 5200 then 1 else 2 end),
  # OCSF timestamps are epoch milliseconds; fromdateiso8601 returns seconds
  time: (.timestamp | fromdateiso8601 * 1000),
  user: {
    name: .username,
    uid: .username
  },
  src_endpoint: {
    ip: .ip,
    mac: .mac
  },
  dst_endpoint: {
    ip: .nas_ip,
    name: .nas_hostname
  },
  auth_protocol: "RADIUS",
  logon_type_id: 9,
  message: .raw,
  metadata: {
    product: { name: "Cisco ISE", vendor_name: "Cisco" },
    version: "1.0.0"
  }
}
FTD to OCSF Network Activity (Class 4001)
# Map FTD to OCSF Network Activity schema
{
  class_uid: 4001,
  class_name: "Network Activity",
  category_uid: 4,
  category_name: "Network Activity",
  # OCSF Network Activity: activity_id 6 = Traffic, 5 = Refuse
  activity_id: (if .action | IN("permitted", "allow") then 6 else 5 end),
  activity_name: (if .action | IN("permitted", "allow") then "Traffic" else "Refuse" end),
  severity_id: (
    if .syslog_id | IN("430003", "733100") then 5
    elif .action | IN("denied", "Deny", "Drop") then 3
    else 1
    end
  ),
  status_id: (if .action | IN("permitted", "allow") then 1 else 2 end),
  # OCSF timestamps are epoch milliseconds; fromdateiso8601 returns seconds
  time: (.timestamp | fromdateiso8601 * 1000),
  src_endpoint: {
    ip: .src_ip,
    # tonumber? // null keeps the event when the port is missing (e.g. ICMP)
    port: (.src_port | tonumber? // null)
  },
  dst_endpoint: {
    ip: .dst_ip,
    port: (.dst_port | tonumber? // null)
  },
  connection_info: {
    protocol_name: .protocol
  },
  disposition_id: (if .action | IN("permitted", "allow") then 1 else 2 end),
  message: .raw,
  metadata: {
    product: { name: "Cisco FTD", vendor_name: "Cisco" },
    version: "1.0.0"
  }
}
jq Quick Reference
| Pattern | Purpose |
|---|---|
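| `select(.field \| IN(a, b))` | Filter by value list |
| `capture("(?<name>regex)")` | Extract with named groups |
| `.field // null` | Null coalescing |
| `if A then B else C end` | Conditional |
| `. + {key: value}` | Add field |
| `del(.field)` | Remove field |
| `{a, b: .c}` | Reshape object |
| `join("\|")` | Array to string |
| `tonumber` | Type conversion |
| `fromdateiso8601` | Parse ISO timestamp |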
Skills Development
Go/Python Learning Opportunity
Yes, this project provides real-world language learning.
| Language | Monad Use Case | Learning Path |
|---|---|---|
| Python | SDK automation, pipeline-as-code, testing | Start here - SDK examples are Python-first |
| Go | High-performance transforms, SDK native | After Python basics - Go SDK is fully featured |
| jq | JSON transformations in pipelines | Essential - used in every transform |
Learning Progression
Week 1-2: jq fundamentals
├── Filter, select, capture
├── Reshape objects
└── Type conversions
Week 3-4: Python SDK
├── API client wrapper
├── Transform builders
└── Pipeline deployment scripts
Week 5-6: Go SDK (optional)
├── Convert Python transforms to Go
├── Performance-critical pipelines
└── Native Monad integration
Why This Matters for Your Career
- jq → Universal log/JSON skill (Splunk, ELK, any SIEM)
- Python SDK → Automation credibility (DevSecOps)
- Go SDK → Cloud-native skillset (Kubernetes, observability tools)
- OCSF → Emerging standard (Microsoft, Splunk, AWS Security Lake)
Real Deliverables
What you’ll build that goes on your resume:
- Custom ISE transform library (Python)
- Custom FTD transform library (Python)
- Pipeline-as-code repository
- CI/CD deployment automation
- OCSF normalization transforms
- Unit test suite with fixture data
No one else on your team will have this. They’ll click buttons in the UI; you’ll have code.
Pipeline Deployment
Configuration File
# pipelines/ise-radius/config.yaml
pipeline:
  name: ise-radius-to-sentinel
  description: ISE RADIUS logs - critical to Sentinel, bulk to S3

input:
  type: s3
  config:
    bucket: chla-syslog-staging
    prefix: ise/radius/
    region: us-west-2

transforms:
  - name: ise-parser
    type: custom
    module: monad_chla.transforms.ise
    class: ISEParserConfig
  - name: ise-critical-filter
    type: custom
    module: monad_chla.transforms.ise
    class: ISEFilterConfig
    params:
      critical_ids: [5400, 5405, 5440, 5449, 12508]

outputs:
  critical:
    type: sentinel
    condition: "passes critical filter"
    config:
      workspace_id: ${SENTINEL_WORKSPACE_ID}
      shared_key: ${SENTINEL_SHARED_KEY}
      log_type: ISESecurityLogs
  archive:
    type: s3
    condition: "all logs"
    config:
      bucket: chla-log-archive
      prefix: ise/radius/
      region: us-west-2
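One thing to watch: `yaml.safe_load` does not expand `${SENTINEL_WORKSPACE_ID}`-style references, so they arrive as literal strings unless something downstream resolves them. The sketch below shows one way to resolve them after loading. `expand_env` is a hypothetical helper, and failing loudly on an unset variable is a design assumption, chosen so a missing secret never gets deployed as a literal placeholder.

```python
import os
import re
from typing import Any, Mapping, Optional

# Matches ${VAR_NAME} references inside string values
_ENV_REF = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env(value: Any, env: Optional[Mapping[str, str]] = None) -> Any:
    """Recursively replace ${VAR} in strings; raise KeyError if VAR is unset."""
    env = os.environ if env is None else env
    if isinstance(value, str):
        def _lookup(match: "re.Match[str]") -> str:
            name = match.group(1)
            if name not in env:
                raise KeyError(f"environment variable {name} is not set")
            return env[name]
        return _ENV_REF.sub(_lookup, value)
    if isinstance(value, dict):
        return {k: expand_env(v, env) for k, v in value.items()}
    if isinstance(value, list):
        return [expand_env(v, env) for v in value]
    return value  # numbers, booleans, None pass through unchanged


# Stand-in for the dict that load_config() would return
config = {
    "outputs": {
        "critical": {
            "config": {
                "workspace_id": "${SENTINEL_WORKSPACE_ID}",
                "log_type": "ISESecurityLogs",
            }
        }
    }
}
resolved = expand_env(config, env={"SENTINEL_WORKSPACE_ID": "ws-123"})
```

Passing `env` explicitly keeps the helper testable; in the deploy script you would call it with no `env` argument so it reads `os.environ`.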
Deployment Script
#!/usr/bin/env python3
# scripts/deploy.py
import argparse
import os

import yaml

from monad_chla.client import MonadClient
from monad_chla.transforms.ise import ISETransformBuilder
from monad_chla.transforms.ftd import FTDTransformBuilder

# NOTE: get_transform_builder() and build_edges() are called below but are not
# defined in this listing; they must be implemented (or imported) before this
# script will run. The builders also need a create_from_config() method, which
# ISETransformBuilder and FTDTransformBuilder do not define yet.


def load_config(pipeline_path: str) -> dict:
    """Load pipeline configuration from YAML."""
    with open(f"pipelines/{pipeline_path}/config.yaml") as f:
        return yaml.safe_load(f)


def deploy_pipeline(client: MonadClient, config: dict) -> str:
    """Deploy a complete pipeline, return pipeline ID."""
    # Create input
    input_id = client.create_input(config["input"])
    print(f"Created input: {input_id}")

    # Create transforms
    transform_ids = []
    for t in config["transforms"]:
        if t["type"] == "custom":
            # Use our custom builders
            builder = get_transform_builder(client, t["module"])
            transform_id = builder.create_from_config(t)
        else:
            transform_id = client.create_transform(t)
        transform_ids.append(transform_id)
        print(f"Created transform: {transform_id}")

    # Create outputs
    output_ids = []
    for name, output_config in config["outputs"].items():
        output_id = client.create_output(output_config)
        output_ids.append(output_id)
        print(f"Created output ({name}): {output_id}")

    # Wire together as pipeline
    pipeline_id = client.create_pipeline(
        name=config["pipeline"]["name"],
        description=config["pipeline"]["description"],
        nodes=[input_id] + transform_ids + output_ids,
        edges=build_edges(input_id, transform_ids, output_ids, config)
    )
    print(f"Created pipeline: {pipeline_id}")
    return pipeline_id


def main():
    parser = argparse.ArgumentParser(description="Deploy Monad pipeline")
    parser.add_argument("pipeline", help="Pipeline to deploy (e.g., ise-radius)")
    parser.add_argument("--dry-run", action="store_true", help="Validate only")
    args = parser.parse_args()

    config = load_config(args.pipeline)

    # Check the dry-run flag before building the client so that validation
    # works without MONAD_API_KEY / MONAD_ORG_ID being set
    if args.dry_run:
        print(f"Would deploy: {config['pipeline']['name']}")
        return

    client = MonadClient(
        api_key=os.environ["MONAD_API_KEY"],
        org_id=os.environ["MONAD_ORG_ID"]
    )
    pipeline_id = deploy_pipeline(client, config)
    print(f"Successfully deployed: {pipeline_id}")


if __name__ == "__main__":
    main()
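The deployment script calls `build_edges` without showing it. A minimal sketch follows, under loud assumptions: the `{"from": ..., "to": ...}` edge shape is invented for illustration and is not Monad's documented format, and the wiring is a simple linear chain that fans out from the last transform to every output. Conditional routing (critical versus all logs) is deliberately left out.

```python
from typing import Dict, List, Optional


def build_edges(
    input_id: str,
    transform_ids: List[str],
    output_ids: List[str],
    config: Optional[dict] = None,
) -> List[Dict[str, str]]:
    """Chain input -> transforms in order, then fan out to every output.

    `config` is accepted only to match the call in deploy_pipeline(); this
    sketch does not read it. The edge dict shape is an assumption.
    """
    chain = [input_id] + list(transform_ids)
    # Pair each node with its successor: (input, t1), (t1, t2), ...
    edges = [{"from": a, "to": b} for a, b in zip(chain, chain[1:])]
    last = chain[-1]  # the input itself when there are no transforms
    edges.extend({"from": last, "to": out} for out in output_ids)
    return edges


edges = build_edges("in-1", ["t-parse", "t-filter"], ["out-sentinel", "out-s3"])
```

Keeping this a pure function of IDs makes it easy to unit test without an API client; a real version would likely read the `condition` fields in `config["outputs"]` to decide where each output attaches.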
CI/CD Integration
GitHub Actions Workflow
# .github/workflows/deploy.yaml
name: Deploy Monad Pipelines

on:
  push:
    branches: [main]
    paths:
      - 'pipelines/**'
      - 'monad_chla/**'
  workflow_dispatch:
    inputs:
      pipeline:
        description: 'Pipeline to deploy'
        required: true
        type: choice
        options:
          - ise-radius
          - ise-tacacs
          - ftd-firewall
          - network-devices
          - all

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install -e .
      - run: python scripts/validate.py

  deploy:
    needs: validate
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install -e .
      - name: Deploy pipeline
        env:
          MONAD_API_KEY: ${{ secrets.MONAD_API_KEY }}
          MONAD_ORG_ID: ${{ secrets.MONAD_ORG_ID }}
          SENTINEL_WORKSPACE_ID: ${{ secrets.SENTINEL_WORKSPACE_ID }}
          SENTINEL_SHARED_KEY: ${{ secrets.SENTINEL_SHARED_KEY }}
          # Push events carry no workflow_dispatch input, so fall back to "all"
          PIPELINE: ${{ github.event.inputs.pipeline || 'all' }}
        run: |
          if [ "$PIPELINE" = "all" ]; then
            for p in ise-radius ise-tacacs ftd-firewall network-devices; do
              python scripts/deploy.py "$p"
            done
          else
            python scripts/deploy.py "$PIPELINE"
          fi
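The workflow runs `scripts/validate.py`, which is listed in the repository layout but not shown. As a rough sketch of what its core check could look like, the function below validates an already-loaded config dict against the structure used in `config.yaml`. `validate_config` and the specific rules are assumptions for illustration; extend them to whatever your pipelines actually require.

```python
from typing import Any, Dict, List

REQUIRED_TOP_LEVEL = ("pipeline", "input", "transforms", "outputs")


def validate_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the config passed."""
    errors: List[str] = []
    for key in REQUIRED_TOP_LEVEL:
        if key not in config:
            errors.append(f"missing top-level key: {key}")
    if "name" not in config.get("pipeline", {}):
        errors.append("pipeline.name is required")
    for i, transform in enumerate(config.get("transforms", [])):
        for field in ("name", "type"):
            if field not in transform:
                errors.append(f"transforms[{i}] is missing {field}")
        # Custom transforms must say which module and class to load
        if transform.get("type") == "custom":
            for field in ("module", "class"):
                if field not in transform:
                    errors.append(f"transforms[{i}] (custom) is missing {field}")
    for name, output in config.get("outputs", {}).items():
        if "type" not in output:
            errors.append(f"outputs.{name} is missing type")
    return errors


good = {
    "pipeline": {"name": "ise-radius-to-sentinel"},
    "input": {"type": "s3"},
    "transforms": [{"name": "ise-parser", "type": "custom",
                    "module": "monad_chla.transforms.ise",
                    "class": "ISEParserConfig"}],
    "outputs": {"archive": {"type": "s3"}},
}
bad = {
    "pipeline": {},
    "transforms": [{"name": "x", "type": "custom"}],
    "outputs": {"a": {}},
}
good_errors = validate_config(good)
bad_errors = validate_config(bad)
```

In a script, you would load each `pipelines/*/config.yaml`, print any returned errors, and exit non-zero if the list is non-empty so the `validate` job blocks the deploy.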
Testing
Unit Tests
# tests/test_ise_transforms.py
import re

import pytest

from monad_chla.transforms.ise import ISEParserConfig, ISEFilterConfig


class TestISEParser:
    def test_parser_extracts_message_id(self):
        config = ISEParserConfig()
        # Verify regex pattern matches ISE log format
        sample = "5400 NOTICE Failed-Attempt: UserName=jsmith"
        match = re.search(config.patterns["message_id"], sample)
        assert match.group(1) == "5400"

    def test_parser_extracts_username(self):
        config = ISEParserConfig()
        sample = "UserName=john.doe, Calling-Station-ID=AA:BB:CC:DD:EE:FF"
        match = re.search(config.patterns["username"], sample)
        assert match.group(1) == "john.doe"

    def test_critical_filter_includes_failures(self):
        config = ISEFilterConfig()
        assert 5400 in config.critical_ids
        assert 5405 in config.critical_ids

    def test_critical_filter_excludes_success(self):
        config = ISEFilterConfig()
        assert 5200 not in config.critical_ids


class TestISEFilterConfig:
    def test_monad_config_format(self):
        config = ISEFilterConfig()
        monad_config = config.to_monad_config()
        assert monad_config["name"] == "ise-critical-filter"
        assert monad_config["type"] == "filter"
        assert "condition" in monad_config["config"]
Scaling Checklist
As you add more sources:
- Create new module in monad_chla/transforms/
- Document message IDs / syslog codes in module
- Add pipeline config in pipelines/<source>/
- Add tests in tests/test_<source>_transforms.py
- Add sample logs in tests/fixtures/
- Update CI/CD workflow options
- Document in this page
Next Sources to Add
| Source | Priority | Notes |
|---|---|---|
| ISE TACACS | High | Similar to RADIUS, different message IDs |
| Network switches | Medium | Cisco IOS syslog format |
| WLC | Medium | Wireless controller events |
| DNS (BIND) | Low | Query logs if needed |