SDK Integration & Custom Transforms

Overview

Monad lacks native connectors for ISE, FTD, and traditional network devices. This is an opportunity: build custom transforms using the SDK, own the integration, and create reusable assets for the team.

Value Proposition

Challenge                  Opportunity
-----------------------    -------------------------------------------------
No native ISE connector    Build ISE transform library - become the expert
No native FTD connector    Build FTD transform library - own the integration
Team uses UI clicks        You use pipeline-as-code - DevOps credibility
One-off configs            Reusable modules - team multiplier

Architecture

                     YOUR CONTRIBUTION
                            │
┌───────────────────────────┴────────────────────────────┐
│                                                         │
│   ┌─────────────┐    ┌─────────────┐    ┌───────────┐  │
│   │  ISE Logs   │    │  FTD Logs   │    │  Network  │  │
│   └──────┬──────┘    └──────┬──────┘    └─────┬─────┘  │
│          │                  │                  │        │
│          ▼                  ▼                  ▼        │
│   ┌─────────────────────────────────────────────────┐  │
│   │           Syslog Collector (rsyslog)            │  │
│   └─────────────────────┬───────────────────────────┘  │
│                         │                               │
│                         ▼                               │
│   ┌─────────────────────────────────────────────────┐  │
│   │              S3 Bucket (staging)                │  │
│   └─────────────────────┬───────────────────────────┘  │
│                         │                               │
│   ══════════════════════╪═══════════════════════════   │
│         MONAD BOUNDARY  │                               │
│   ══════════════════════╪═══════════════════════════   │
│                         ▼                               │
│   ┌─────────────────────────────────────────────────┐  │
│   │              Monad S3 Input                     │  │
│   └─────────────────────┬───────────────────────────┘  │
│                         │                               │
│          ┌──────────────┼──────────────┐               │
│          ▼              ▼              ▼               │
│   ┌────────────┐ ┌────────────┐ ┌────────────┐        │
│   │ ISE Parse  │ │ FTD Parse  │ │ Net Parse  │        │
│   │ Transform  │ │ Transform  │ │ Transform  │        │
│   └─────┬──────┘ └─────┬──────┘ └─────┬──────┘        │
│         │              │              │                │
│         ▼              ▼              ▼                │
│   ┌─────────────────────────────────────────────────┐  │
│   │           Filter Transform (Critical?)          │  │
│   └──────────┬─────────────────────────┬────────────┘  │
│              │                         │               │
│         CRITICAL                     BULK              │
│              ▼                         ▼               │
│   ┌──────────────────┐      ┌──────────────────┐      │
│   │ Sentinel Output  │      │   S3 Archive     │      │
│   └──────────────────┘      └──────────────────┘      │
│                                                         │
└─────────────────────────────────────────────────────────┘

Repository Structure

Scalable layout for pipeline-as-code:

monad-pipelines/
├── README.md
├── pyproject.toml              # Python package config
│
├── monad_chla/                 # Your reusable library
│   ├── __init__.py
│   ├── client.py               # API client wrapper
│   ├── transforms/
│   │   ├── __init__.py
│   │   ├── ise.py              # ISE parsing & filtering
│   │   ├── ftd.py              # FTD parsing & filtering
│   │   ├── network.py          # Generic network devices
│   │   └── common.py           # Shared utilities
│   ├── inputs/
│   │   ├── __init__.py
│   │   └── s3.py               # S3 input configs
│   └── outputs/
│       ├── __init__.py
│       ├── sentinel.py         # Sentinel output config
│       └── s3_archive.py       # S3 archive config
│
├── pipelines/                  # Pipeline definitions (JSON/YAML)
│   ├── ise-radius/
│   │   ├── config.yaml
│   │   └── transforms.yaml
│   ├── ise-tacacs/
│   │   ├── config.yaml
│   │   └── transforms.yaml
│   ├── ftd-firewall/
│   │   ├── config.yaml
│   │   └── transforms.yaml
│   └── network-devices/
│       ├── config.yaml
│       └── transforms.yaml
│
├── scripts/
│   ├── deploy.py               # CI/CD deployment script
│   ├── validate.py             # Config validation
│   └── rollback.py             # Rollback script
│
├── tests/
│   ├── test_ise_transforms.py
│   ├── test_ftd_transforms.py
│   └── fixtures/               # Sample log data
│       ├── ise_samples.log
│       └── ftd_samples.log
│
└── .github/
    └── workflows/
        └── deploy.yaml         # GitHub Actions CI/CD

ISE Transform Module

Message ID Reference

ID       Description                 Severity    Route
-----    ------------------------    --------    ----------
5200     Authentication succeeded    Info        S3 Archive
5400     Authentication failed       Critical    Sentinel
5405     RADIUS request dropped      Critical    Sentinel
5440     Endpoint abandoned EAP      Warning     Sentinel
5449     Endpoint failed EAP         Critical    Sentinel
12508    EAP-TLS handshake failed    Critical    Sentinel
24015    EAP-TLS began               Debug       S3 Archive
86009    Guest login                 Info        S3 Archive
86010    Guest login failed          Warning     Sentinel
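
One way to keep this table and the code from drifting apart is to hold the table as data and derive the ID lists from it. The sketch below is only an illustration: the names ISE_MESSAGE_ROUTES, route_for, and sentinel_ids are hypothetical, and sending unknown IDs to the archive is an assumed default, not something this page specifies.

```python
from typing import Dict, List, Tuple

# Hypothetical lookup table: message ID -> (severity, route), copied from the
# reference table above.
ISE_MESSAGE_ROUTES: Dict[int, Tuple[str, str]] = {
    5200: ("info", "s3_archive"),
    5400: ("critical", "sentinel"),
    5405: ("critical", "sentinel"),
    5440: ("warning", "sentinel"),
    5449: ("critical", "sentinel"),
    12508: ("critical", "sentinel"),
    24015: ("debug", "s3_archive"),
    86009: ("info", "s3_archive"),
    86010: ("warning", "sentinel"),
}


def route_for(message_id: int, default: str = "s3_archive") -> str:
    """Return the route for a message ID; unknown IDs fall back to `default`."""
    entry = ISE_MESSAGE_ROUTES.get(message_id)
    return entry[1] if entry else default


def sentinel_ids() -> List[int]:
    """Derive the Sentinel-bound ID list from the table, in ascending order."""
    return sorted(
        mid for mid, (_, route) in ISE_MESSAGE_ROUTES.items() if route == "sentinel"
    )
```

Deriving the list this way means a new row in the table is the only change needed when a message ID is added.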

ISE Transform Code

# monad_chla/transforms/ise.py

from dataclasses import dataclass
from typing import List, Dict, Any

# Message ID classifications
ISE_CRITICAL_IDS = [
    5400,   # Auth failed
    5405,   # RADIUS dropped
    5440,   # Endpoint abandoned
    5449,   # Endpoint EAP failed
    12508,  # EAP-TLS failed
    86010,  # Guest failed
]

ISE_BULK_IDS = [
    5200,   # Auth passed
    24015,  # EAP started
    24016,  # EAP continued
    86009,  # Guest login
]

@dataclass
class ISEParserConfig:
    """Configuration for ISE syslog parser transform."""
    name: str = "ise-syslog-parser"

    # Regex patterns for ISE syslog fields
    patterns: Dict[str, str] = None

    def __post_init__(self):
        # Only fill in defaults; keep any patterns the caller supplied
        if self.patterns is None:
            self.patterns = {
                "message_id": r"(\d{4,5})\s+NOTICE",
                "username": r"UserName=([^,\s]+)",
                "mac_address": r"Calling-Station-ID=([0-9A-Fa-f:-]+)",
                "ip_address": r"Framed-IP-Address=([0-9.]+)",
                "nas_ip": r"NAS-IP-Address=([0-9.]+)",
                "nas_port": r"NAS-Port=(\d+)",
                "failure_reason": r"FailureReason=([^,]+)",
                "auth_policy": r"AuthenticationPolicy=([^,]+)",
                "authz_policy": r"AuthorizationPolicy=([^,]+)",
                "selected_profile": r"SelectedAuthorizationProfiles=([^,]+)",
            }

    def to_monad_config(self) -> Dict[str, Any]:
        """Convert to Monad API format."""
        return {
            "name": self.name,
            "type": "parse",
            "config": {
                "rules": [
                    {"field": k, "pattern": v}
                    for k, v in self.patterns.items()
                ]
            }
        }


@dataclass
class ISEFilterConfig:
    """Configuration for ISE critical/bulk filter transform."""
    name: str = "ise-critical-filter"
    critical_ids: List[int] = None

    def __post_init__(self):
        if self.critical_ids is None:
            # Copy so per-instance edits never mutate the module-level list
            self.critical_ids = list(ISE_CRITICAL_IDS)

    def to_monad_config(self) -> Dict[str, Any]:
        """Convert to Monad API format."""
        return {
            "name": self.name,
            "type": "filter",
            "config": {
                "condition": f"message_id in {self.critical_ids}"
            }
        }


class ISETransformBuilder:
    """Builder for ISE transform pipeline."""

    def __init__(self, api_client, org_id: str):
        self.api = api_client
        self.org_id = org_id

    def create_parser(self, config: ISEParserConfig = None) -> str:
        """Create ISE parser transform, return transform ID."""
        config = config or ISEParserConfig()
        response = self.api.V1OrganizationIdTransformsPost(
            self.org_id,
            config.to_monad_config()
        )
        return response.id

    def create_critical_filter(self, config: ISEFilterConfig = None) -> str:
        """Create ISE critical filter transform, return transform ID."""
        config = config or ISEFilterConfig()
        response = self.api.V1OrganizationIdTransformsPost(
            self.org_id,
            config.to_monad_config()
        )
        return response.id

    def create_bulk_filter(self) -> str:
        """Create ISE bulk filter matching the known bulk IDs (not a strict inverse of critical)."""
        config = {
            "name": "ise-bulk-filter",
            "type": "filter",
            "config": {
                "condition": f"message_id in {ISE_BULK_IDS}"
            }
        }
        response = self.api.V1OrganizationIdTransformsPost(
            self.org_id,
            config
        )
        return response.id
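
Before creating transforms through the API, it can help to run the regex rules against sample lines locally. The sketch below assumes each rule behaves roughly like Python's re.search with the first capture group kept; that behavior is an assumption, not something this page confirms about Monad. PATTERNS is a subset copied from ISEParserConfig so the example runs on its own, parse_line is a hypothetical helper, and the sample line is stitched together from the unit-test samples used elsewhere on this page.

```python
import re
from typing import Dict, Optional

# Subset of ISEParserConfig.patterns, duplicated so this sketch is standalone.
PATTERNS: Dict[str, str] = {
    "message_id": r"(\d{4,5})\s+NOTICE",
    "username": r"UserName=([^,\s]+)",
    "mac_address": r"Calling-Station-ID=([0-9A-Fa-f:-]+)",
    "failure_reason": r"FailureReason=([^,]+)",
}


def parse_line(line: str, patterns: Dict[str, str] = PATTERNS) -> Dict[str, Optional[str]]:
    """Apply each pattern once; fields whose pattern does not match are None."""
    parsed: Dict[str, Optional[str]] = {}
    for field_name, pattern in patterns.items():
        match = re.search(pattern, line)
        parsed[field_name] = match.group(1) if match else None
    return parsed


# Synthetic sample built from the test strings on this page, not a real ISE log.
sample = "5400 NOTICE Failed-Attempt: UserName=jsmith, Calling-Station-ID=AA:BB:CC:DD:EE:FF"
result = parse_line(sample)
```

Fields that come back as None point at patterns that need adjusting for the log format actually seen in the staging bucket.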

FTD Transform Module

Syslog ID Reference

ID        Description                     Severity    Route
------    ----------------------------    --------    ------------------
106001    Inbound connection permitted    Info        S3 Archive
106006    Denied inbound                  Warning     Sentinel
106007    Denied inbound (no xlate)       Warning     Sentinel
106014    Denied inbound (ACL)            Warning     Sentinel
106015    Denied inbound (no route)       Warning     Sentinel
106023    Denied by ACL                   Critical    Sentinel
106100    ACL permitted/denied            Varies      Parse action field
430003    IPS/Snort drop                  Critical    Sentinel
733100    Threat detected                 Critical    Sentinel
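
The 106100 row is the only one whose route depends on a parsed field rather than on the ID alone. A minimal sketch of that decision follows, assuming the action field has already been extracted by the parser; route_ftd_event and the set names are hypothetical, and treating a missing action as archive-only is an assumed default.

```python
from typing import Optional

# IDs the table above sends straight to Sentinel.
FTD_SENTINEL_IDS = {
    "106006", "106007", "106014", "106015", "106023", "430003", "733100",
}
# Action values treated as a deny, compared case-insensitively.
DENY_ACTIONS = {"denied", "deny", "drop"}


def route_ftd_event(syslog_id: str, action: Optional[str] = None) -> str:
    """Route by syslog ID; for 106100, decide from the parsed action field."""
    if syslog_id in FTD_SENTINEL_IDS:
        return "sentinel"
    if syslog_id == "106100":
        is_deny = (action or "").lower() in DENY_ACTIONS
        return "sentinel" if is_deny else "s3_archive"
    return "s3_archive"
```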

FTD Transform Code

# monad_chla/transforms/ftd.py

from dataclasses import dataclass
from typing import List, Dict, Any

FTD_CRITICAL_IDS = [
    "106006", "106007", "106014", "106015", "106023",
    "430003", "733100"
]

FTD_BULK_IDS = [
    "106001",  # Permitted
    "302013",  # Built connection
    "302014",  # Teardown connection
]

@dataclass
class FTDParserConfig:
    """Configuration for FTD syslog parser transform."""
    name: str = "ftd-syslog-parser"

    patterns: Dict[str, str] = None

    def __post_init__(self):
        # Only fill in defaults; keep any patterns the caller supplied
        if self.patterns is None:
            self.patterns = {
                "syslog_id": r"%FTD-\d-(\d{6})",
                "action": r"(permitted|denied|Deny|Drop)",
                "src_ip": r"(\d+\.\d+\.\d+\.\d+)/\d+",
                "dst_ip": r"->.*?(\d+\.\d+\.\d+\.\d+)/\d+",
                # One capture group per rule: match the IP, capture only the port
                "src_port": r"\d+\.\d+\.\d+\.\d+/(\d+)",
                "dst_port": r"->.*?\d+\.\d+\.\d+\.\d+/(\d+)",
                "protocol": r"(TCP|UDP|ICMP|GRE)",
                "acl_name": r"access-group\s+(\S+)",
                "interface": r"interface\s+(\S+)",
            }

    def to_monad_config(self) -> Dict[str, Any]:
        return {
            "name": self.name,
            "type": "parse",
            "config": {
                "rules": [
                    {"field": k, "pattern": v}
                    for k, v in self.patterns.items()
                ]
            }
        }


class FTDTransformBuilder:
    """Builder for FTD transform pipeline."""

    def __init__(self, api_client, org_id: str):
        self.api = api_client
        self.org_id = org_id

    def create_parser(self, config: FTDParserConfig = None) -> str:
        config = config or FTDParserConfig()
        response = self.api.V1OrganizationIdTransformsPost(
            self.org_id,
            config.to_monad_config()
        )
        return response.id

    def create_critical_filter(self) -> str:
        config = {
            "name": "ftd-critical-filter",
            "type": "filter",
            "config": {
                "condition": f"syslog_id in {FTD_CRITICAL_IDS} or action in ['denied', 'Deny', 'Drop']"
            }
        }
        response = self.api.V1OrganizationIdTransformsPost(
            self.org_id,
            config
        )
        return response.id

jq Transforms

Monad uses jq for JSON transformations. Master these patterns to manipulate log data at the pipeline level.

ISE jq Patterns

Filter by Message ID (Critical Only)

# Keep only critical ISE events
select(.message_id | IN(5400, 5405, 5440, 5449, 12508, 86010))

Extract Authentication Fields

# Parse ISE syslog into structured JSON
{
  timestamp: .timestamp,
  message_id: (.raw | (capture("(?<id>\\d{4,5})\\s+NOTICE").id | tonumber) // null),
  username: (.raw | capture("UserName=(?<u>[^,\\s]+)").u // null),
  mac: (.raw | capture("Calling-Station-ID=(?<m>[0-9A-Fa-f:-]+)").m // null),
  ip: (.raw | capture("Framed-IP-Address=(?<ip>[0-9.]+)").ip // null),
  nas_ip: (.raw | capture("NAS-IP-Address=(?<nas>[0-9.]+)").nas // null),
  failure_reason: (.raw | capture("FailureReason=(?<r>[^,]+)").r // null),
  policy: (.raw | capture("AuthorizationPolicy=(?<p>[^,]+)").p // null)
}

Route by Severity

# Add routing tag based on message ID
. + {
  route: (
    if .message_id | IN(5400, 5405, 5440, 5449, 12508, 86010) then "sentinel"
    elif .message_id | IN(5200, 24015, 86009) then "archive"
    else "unknown"
    end
  )
}

FTD jq Patterns

Filter Denied Traffic

# Keep only denied/dropped traffic
select(.action | IN("denied", "Deny", "Drop"))

Extract Firewall 5-Tuple

# Parse FTD syslog into structured JSON
{
  timestamp: .timestamp,
  syslog_id: (.raw | capture("%FTD-\\d-(?<id>\\d{6})").id // null),
  action: (.raw | capture("(?<a>permitted|denied|Deny|Drop)").a // "unknown"),
  protocol: (.raw | capture("(?<p>TCP|UDP|ICMP|GRE)").p // null),
  src_ip: (.raw | capture("(?<ip>\\d+\\.\\d+\\.\\d+\\.\\d+)/\\d+").ip // null),
  dst_ip: (.raw | capture("->.*?(?<ip>\\d+\\.\\d+\\.\\d+\\.\\d+)/\\d+").ip // null),
  src_port: (.raw | capture("(?<ip>\\d+\\.\\d+\\.\\d+\\.\\d+)/(?<port>\\d+)").port // null),
  dst_port: (.raw | capture("->.*?(?<ip>\\d+\\.\\d+\\.\\d+\\.\\d+)/(?<port>\\d+)").port // null)
}

IPS Event Classification

# Tag IPS events with severity
. + {
  severity: (
    if .syslog_id | IN("430003", "733100") then "critical"
    elif .syslog_id | IN("106023", "106006", "106007") then "high"
    elif .syslog_id | IN("106014", "106015") then "medium"
    else "low"
    end
  )
}

JSON Flattening

Flatten Nested Objects

# Flatten nested ISE attributes for Sentinel
{
  timestamp,
  message_id,
  username,
  "endpoint.mac": .endpoint.mac,
  "endpoint.ip": .endpoint.ip,
  "endpoint.os": .endpoint.os,
  "policy.authn": .policy.authentication,
  "policy.authz": .policy.authorization,
  "policy.profile": .policy.selected_profile
}

Array to Delimited String

# Convert array fields to pipe-delimited strings
. + {
  applied_acls: ((.acls // []) | join("|")),
  user_groups: ((.groups // []) | join("|"))
}
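
The two jq patterns above hard-code every field. For unit tests or ad hoc checks, a generic version of the same idea can be sketched in Python. The flatten helper below is hypothetical and not part of any SDK; unlike the jq example it keeps the original key names (policy.authentication rather than policy.authn).

```python
from typing import Any, Dict


def flatten(
    record: Dict[str, Any], parent: str = "", sep: str = ".", list_sep: str = "|"
) -> Dict[str, Any]:
    """Flatten nested dicts into dotted keys and join lists into delimited strings."""
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        path = f"{parent}{sep}{key}" if parent else key
        if isinstance(value, dict):
            # Recurse, carrying the dotted prefix down
            flat.update(flatten(value, path, sep, list_sep))
        elif isinstance(value, list):
            flat[path] = list_sep.join(str(item) for item in value)
        else:
            flat[path] = value
    return flat


event = {
    "username": "jsmith",
    "endpoint": {"mac": "AA:BB:CC:DD:EE:FF", "ip": "10.0.0.5"},
    "groups": ["staff", "vpn"],
}
flat_event = flatten(event)
```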

OCSF Normalization

ISE to OCSF Authentication (Class 3002)

# Map ISE to OCSF Authentication schema
{
  class_uid: 3002,
  class_name: "Authentication",
  category_uid: 3,
  category_name: "Identity & Access Management",
  # OCSF models a failed logon as activity Logon (1) with status Failure (2);
  # activity_id 2 means Logoff
  activity_id: 1,
  activity_name: "Logon",
  severity_id: (if .message_id | IN(5400, 5405, 5449) then 4 else 1 end),
  status_id: (if .message_id == 5200 then 1 else 2 end),
  # OCSF time is epoch milliseconds; fromdateiso8601 returns seconds
  time: (.timestamp | fromdateiso8601 * 1000),
  user: {
    name: .username,
    uid: .username
  },
  src_endpoint: {
    ip: .ip,
    mac: .mac
  },
  dst_endpoint: {
    ip: .nas_ip,
    name: .nas_hostname
  },
  auth_protocol: "RADIUS",
  logon_type_id: 9,
  message: .raw,
  metadata: {
    product: { name: "Cisco ISE", vendor_name: "Cisco" },
    version: "1.0.0"
  }
}

FTD to OCSF Network Activity (Class 4001)

# Map FTD to OCSF Network Activity schema
{
  class_uid: 4001,
  class_name: "Network Activity",
  category_uid: 4,
  category_name: "Network Activity",
  # OCSF Network Activity: 6 = Traffic, 5 = Refuse
  activity_id: (if .action | IN("permitted", "allow") then 6 else 5 end),
  activity_name: (if .action | IN("permitted", "allow") then "Traffic" else "Refuse" end),
  severity_id: (
    if .syslog_id | IN("430003", "733100") then 5
    elif .action | IN("denied", "Deny", "Drop") then 3
    else 1
    end
  ),
  status_id: (if .action | IN("permitted", "allow") then 1 else 2 end),
  # OCSF time is epoch milliseconds; fromdateiso8601 returns seconds
  time: (.timestamp | fromdateiso8601 * 1000),
  src_endpoint: {
    ip: .src_ip,
    port: (.src_port | tonumber? // null)
  },
  dst_endpoint: {
    ip: .dst_ip,
    port: (.dst_port | tonumber? // null)
  },
  connection_info: {
    protocol_name: .protocol
  },
  disposition_id: (if .action | IN("permitted", "allow") then 1 else 2 end),
  message: .raw,
  metadata: {
    product: { name: "Cisco FTD", vendor_name: "Cisco" },
    version: "1.0.0"
  }
}
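
Neither mapping above sets type_uid, which OCSF derives as class_uid * 100 + activity_id. A small post-processing check can add it and catch missing attributes before events reach Sentinel. The sketch below is hypothetical: finalize_ocsf_event and REQUIRED_KEYS are illustrative names, and the key list is a working subset for these two mappings rather than the full OCSF requirement set.

```python
from typing import Any, Dict

# Working subset of attributes both mappings above are expected to produce.
REQUIRED_KEYS = (
    "class_uid", "category_uid", "activity_id", "severity_id", "time", "metadata",
)


def finalize_ocsf_event(event: Dict[str, Any]) -> Dict[str, Any]:
    """Reject events with missing attributes, then add the derived type_uid."""
    missing = [key for key in REQUIRED_KEYS if event.get(key) is None]
    if missing:
        raise ValueError(f"OCSF event missing required keys: {missing}")
    type_uid = event["class_uid"] * 100 + event["activity_id"]
    return {**event, "type_uid": type_uid}


auth_event = finalize_ocsf_event({
    "class_uid": 3002,
    "category_uid": 3,
    "activity_id": 1,
    "severity_id": 4,
    "time": 1700000000000,
    "metadata": {"product": {"name": "Cisco ISE", "vendor_name": "Cisco"}},
})
```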

jq Quick Reference

Pattern                            Purpose
-------------------------------    -------------------------
select(.field | IN(val1, val2))    Filter by value list
capture("regex").group             Extract with named groups
.field // default                  Null coalescing
if cond then a else b end          Conditional
. + {new_field: value}             Add field
del(.field)                        Remove field
{a, b: .nested.b}                  Reshape object
.array | join(",")                 Array to string
tonumber, tostring                 Type conversion
fromdateiso8601                    Parse ISO timestamp

Skills Development

Go/Python Learning Opportunity

This project doubles as hands-on practice in Python, Go, and jq.

Language    Monad Use Case                               Learning Path
--------    -----------------------------------------    ----------------------------------------------
Python      SDK automation, pipeline-as-code, testing    Start here - SDK examples are Python-first
Go          High-performance transforms, SDK native      After Python basics - Go SDK is fully featured
jq          JSON transformations in pipelines            Essential - used in every transform

Learning Progression

Week 1-2: jq fundamentals
├── Filter, select, capture
├── Reshape objects
└── Type conversions

Week 3-4: Python SDK
├── API client wrapper
├── Transform builders
└── Pipeline deployment scripts

Week 5-6: Go SDK (optional)
├── Convert Python transforms to Go
├── Performance-critical pipelines
└── Native Monad integration

Why This Matters for Your Career

  1. jq → Universal log/JSON skill (Splunk, ELK, any SIEM)

  2. Python SDK → Automation credibility (DevSecOps)

  3. Go SDK → Cloud-native skillset (Kubernetes, observability tools)

  4. OCSF → Emerging standard (Microsoft, Splunk, AWS Security Lake)

Real Deliverables

What you’ll build that goes on your resume:

  • Custom ISE transform library (Python)

  • Custom FTD transform library (Python)

  • Pipeline-as-code repository

  • CI/CD deployment automation

  • OCSF normalization transforms

  • Unit test suite with fixture data

No one else on your team will have this. They’ll click buttons in the UI; you’ll have code.

Pipeline Deployment

Configuration File

# pipelines/ise-radius/config.yaml

pipeline:
  name: ise-radius-to-sentinel
  description: ISE RADIUS logs - critical to Sentinel, bulk to S3

input:
  type: s3
  config:
    bucket: chla-syslog-staging
    prefix: ise/radius/
    region: us-west-2

transforms:
  - name: ise-parser
    type: custom
    module: monad_chla.transforms.ise
    class: ISEParserConfig

  - name: ise-critical-filter
    type: custom
    module: monad_chla.transforms.ise
    class: ISEFilterConfig
    params:
      critical_ids: [5400, 5405, 5440, 5449, 12508, 86010]

outputs:
  critical:
    type: sentinel
    condition: "passes critical filter"
    config:
      workspace_id: ${SENTINEL_WORKSPACE_ID}
      shared_key: ${SENTINEL_SHARED_KEY}
      log_type: ISESecurityLogs

  archive:
    type: s3
    condition: "all logs"
    config:
      bucket: chla-log-archive
      prefix: ise/radius/
      region: us-west-2

Deployment Script

#!/usr/bin/env python3
# scripts/deploy.py

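import argparse
import importlib
import os

import yaml

from monad_chla.client import MonadClient

def load_config(pipeline_path: str) -> dict:
    """Load pipeline configuration from YAML, expanding ${VAR} references."""
    with open(f"pipelines/{pipeline_path}/config.yaml") as f:
        # yaml.safe_load leaves ${SENTINEL_WORKSPACE_ID} etc. as literal text,
        # so substitute environment variables before parsing.
        return yaml.safe_load(os.path.expandvars(f.read()))

def build_custom_transform(t: dict) -> dict:
    """Render the Monad payload for a custom transform declared in YAML."""
    config_cls = getattr(importlib.import_module(t["module"]), t["class"])
    return config_cls(name=t["name"], **t.get("params", {})).to_monad_config()

def build_edges(input_id: str, transform_ids: list, output_ids: list) -> list:
    """Chain input -> transforms in order, then fan out to every output.

    The (source, target) tuple format is a placeholder; adapt it to whatever
    MonadClient.create_pipeline expects. Per-output conditions from the YAML
    are not applied here.
    """
    chain = [input_id] + transform_ids
    edges = list(zip(chain, chain[1:]))
    edges += [(chain[-1], output_id) for output_id in output_ids]
    return edges

def deploy_pipeline(client: MonadClient, config: dict) -> str:
    """Deploy a complete pipeline, return pipeline ID."""

    # Create input
    input_id = client.create_input(config["input"])
    print(f"Created input: {input_id}")

    # Create transforms
    transform_ids = []
    for t in config["transforms"]:
        # Custom transforms are rendered by the config class named in the YAML
        payload = build_custom_transform(t) if t["type"] == "custom" else t
        transform_id = client.create_transform(payload)
        transform_ids.append(transform_id)
        print(f"Created transform: {transform_id}")

    # Create outputs
    output_ids = []
    for name, output_config in config["outputs"].items():
        output_id = client.create_output(output_config)
        output_ids.append(output_id)
        print(f"Created output ({name}): {output_id}")

    # Wire together as pipeline
    pipeline_id = client.create_pipeline(
        name=config["pipeline"]["name"],
        description=config["pipeline"]["description"],
        nodes=[input_id] + transform_ids + output_ids,
        edges=build_edges(input_id, transform_ids, output_ids)
    )

    print(f"Created pipeline: {pipeline_id}")
    return pipeline_id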

def main():
    parser = argparse.ArgumentParser(description="Deploy Monad pipeline")
    parser.add_argument("pipeline", help="Pipeline to deploy (e.g., ise-radius)")
    parser.add_argument("--dry-run", action="store_true", help="Validate only")
    args = parser.parse_args()

    client = MonadClient(
        api_key=os.environ["MONAD_API_KEY"],
        org_id=os.environ["MONAD_ORG_ID"]
    )

    config = load_config(args.pipeline)

    if args.dry_run:
        print(f"Would deploy: {config['pipeline']['name']}")
        return

    pipeline_id = deploy_pipeline(client, config)
    print(f"Successfully deployed: {pipeline_id}")

if __name__ == "__main__":
    main()

CI/CD Integration

GitHub Actions Workflow

# .github/workflows/deploy.yaml

name: Deploy Monad Pipelines

on:
  push:
    branches: [main]
    paths:
      - 'pipelines/**'
      - 'monad_chla/**'
  workflow_dispatch:
    inputs:
      pipeline:
        description: 'Pipeline to deploy'
        required: true
        type: choice
        options:
          - ise-radius
          - ise-tacacs
          - ftd-firewall
          - network-devices
          - all

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install -e .
      - run: python scripts/validate.py

  deploy:
    needs: validate
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install -e .
      - name: Deploy pipeline
        env:
          MONAD_API_KEY: ${{ secrets.MONAD_API_KEY }}
          MONAD_ORG_ID: ${{ secrets.MONAD_ORG_ID }}
          SENTINEL_WORKSPACE_ID: ${{ secrets.SENTINEL_WORKSPACE_ID }}
          SENTINEL_SHARED_KEY: ${{ secrets.SENTINEL_SHARED_KEY }}
          # Push events carry no inputs, so fall back to deploying everything
          PIPELINE: ${{ github.event.inputs.pipeline || 'all' }}
        run: |
          if [ "$PIPELINE" = "all" ]; then
            for p in ise-radius ise-tacacs ftd-firewall network-devices; do
              python scripts/deploy.py "$p"
            done
          else
            python scripts/deploy.py "$PIPELINE"
          fi
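
The workflow above redeploys whichever pipeline is named in the manual input. On push, a finer-grained option is to deploy only the pipelines whose files changed. The sketch below shows one way to map changed paths to pipeline names. The helper name is hypothetical, how the changed-path list is obtained (for example from git diff) is left out, and treating any library change as "redeploy everything" is an assumed policy.

```python
from pathlib import PurePosixPath
from typing import Iterable, List

ALL_PIPELINES = ["ise-radius", "ise-tacacs", "ftd-firewall", "network-devices"]


def pipelines_to_deploy(changed_paths: Iterable[str]) -> List[str]:
    """Map changed repository paths to the pipelines that need redeploying."""
    selected = set()
    for raw_path in changed_paths:
        parts = PurePosixPath(raw_path).parts
        if not parts:
            continue
        # A change to the shared library can affect every pipeline
        if parts[0] == "monad_chla":
            return list(ALL_PIPELINES)
        if parts[0] == "pipelines" and len(parts) > 2 and parts[1] in ALL_PIPELINES:
            selected.add(parts[1])
    # Preserve the canonical ordering
    return [name for name in ALL_PIPELINES if name in selected]
```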

Testing

Unit Tests

# tests/test_ise_transforms.py

import re

import pytest
from monad_chla.transforms.ise import ISEParserConfig, ISEFilterConfig

class TestISEParser:

    def test_parser_extracts_message_id(self):
        config = ISEParserConfig()
        # Verify regex pattern matches ISE log format
        sample = "5400 NOTICE Failed-Attempt: UserName=jsmith"
        match = re.search(config.patterns["message_id"], sample)
        assert match is not None
        assert match.group(1) == "5400"

    def test_parser_extracts_username(self):
        config = ISEParserConfig()
        sample = "UserName=john.doe, Calling-Station-ID=AA:BB:CC:DD:EE:FF"
        match = re.search(config.patterns["username"], sample)
        assert match is not None
        assert match.group(1) == "john.doe"

    def test_critical_filter_includes_failures(self):
        config = ISEFilterConfig()
        assert 5400 in config.critical_ids
        assert 5405 in config.critical_ids

    def test_critical_filter_excludes_success(self):
        config = ISEFilterConfig()
        assert 5200 not in config.critical_ids

class TestISEFilterConfig:

    def test_monad_config_format(self):
        config = ISEFilterConfig()
        monad_config = config.to_monad_config()

        assert monad_config["name"] == "ise-critical-filter"
        assert monad_config["type"] == "filter"
        assert "condition" in monad_config["config"]

Scaling Checklist

As you add more sources:

  • Create new module in monad_chla/transforms/

  • Document message IDs / syslog codes in module

  • Add pipeline config in pipelines/<source>/

  • Add tests in tests/test_<source>_transforms.py

  • Add sample logs in tests/fixtures/

  • Update CI/CD workflow options

  • Document in this page
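
The first checklist item gets cheaper if common.py carries a shared base class, so a new source only has to supply its patterns. The sketch below is hypothetical: SyslogParserConfig and ios_parser are illustrative names, and the payload shape simply mirrors to_monad_config from the ISE and FTD modules above. The IOS patterns rely on the standard %FACILITY-SEVERITY-MNEMONIC: message prefix.

```python
import re
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class SyslogParserConfig:
    """Shared base: a named set of one-capture-group regex rules."""
    name: str
    patterns: Dict[str, str] = field(default_factory=dict)

    def to_monad_config(self) -> Dict[str, Any]:
        # Same payload shape as the ISE and FTD parser configs on this page
        return {
            "name": self.name,
            "type": "parse",
            "config": {
                "rules": [{"field": k, "pattern": v} for k, v in self.patterns.items()]
            },
        }


def ios_parser() -> SyslogParserConfig:
    """Example new source: Cisco IOS %FACILITY-SEVERITY-MNEMONIC prefix."""
    return SyslogParserConfig(
        name="ios-syslog-parser",
        patterns={
            "facility": r"%([A-Z0-9_]+)-\d-[A-Z0-9_]+:",
            "severity": r"%[A-Z0-9_]+-(\d)-[A-Z0-9_]+:",
            "mnemonic": r"%[A-Z0-9_]+-\d-([A-Z0-9_]+):",
        },
    )


line = "%LINK-3-UPDOWN: Interface GigabitEthernet0/1, changed state to up"
fields = {k: re.search(p, line).group(1) for k, p in ios_parser().patterns.items()}
```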

Next Sources to Add

Source              Priority    Notes
----------------    --------    ----------------------------------------
ISE TACACS          High        Similar to RADIUS, different message IDs
Network switches    Medium      Cisco IOS syslog format
WLC                 Medium      Wireless controller events
DNS (BIND)          Low         Query logs if needed