Infrastructure Patterns

Production patterns extracted from netapi. Copy these for your own infrastructure automation.

API Client Architecture

Base Client

from abc import ABC, abstractmethod
from dataclasses import dataclass
import httpx

@dataclass
class ClientConfig:
    """Configuration for API clients."""
    host: str
    username: str
    password: str
    verify_ssl: bool = True
    timeout: float = 30.0
    max_retries: int = 3

class BaseAPIClient(ABC):
    """Abstract base for all vendor API clients."""

    def __init__(self, config: ClientConfig):
        self.config = config
        self._client: httpx.Client | None = None

    @property
    @abstractmethod
    def base_url(self) -> str:
        """Return the base URL for API requests."""
        pass

    @property
    def client(self) -> httpx.Client:
        """Lazy-initialize HTTP client."""
        if self._client is None:
            self._client = httpx.Client(
                base_url=self.base_url,
                auth=(self.config.username, self.config.password),
                verify=self.config.verify_ssl,
                timeout=self.config.timeout,
                headers=self._default_headers()
            )
        return self._client

    def _default_headers(self) -> dict[str, str]:
        return {
            "Accept": "application/json",
            "Content-Type": "application/json"
        }

    def request(self, method: str, path: str, **kwargs) -> dict:
        """Make request with retry logic."""
        import time
        last_error = None

        for attempt in range(self.config.max_retries):
            try:
                response = self.client.request(method, path, **kwargs)
                response.raise_for_status()
                return response.json() if response.content else {}
            except httpx.TransportError as e:
                last_error = e
                if attempt < self.config.max_retries - 1:
                    time.sleep(2 ** attempt)
            except httpx.HTTPStatusError as e:
                if e.response.status_code in (502, 503, 504):
                    last_error = e
                    if attempt < self.config.max_retries - 1:
                        time.sleep(2 ** attempt)
                else:
                    raise

        raise last_error

    def get(self, path: str, **kwargs) -> dict:
        return self.request("GET", path, **kwargs)

    def post(self, path: str, **kwargs) -> dict:
        return self.request("POST", path, **kwargs)

    def put(self, path: str, **kwargs) -> dict:
        return self.request("PUT", path, **kwargs)

    def delete(self, path: str, **kwargs) -> dict:
        return self.request("DELETE", path, **kwargs)

    def close(self):
        if self._client:
            self._client.close()
            self._client = None

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

Vendor Implementation

class ISEClient(BaseAPIClient):
    """Cisco ISE ERS API client."""

    @property
    def base_url(self) -> str:
        return f"https://{self.config.host}:9060/ers/config"

    def get_endpoints(self, page_size: int = 100) -> list[dict]:
        """Get all endpoints with pagination."""
        endpoints = []
        page = 1

        while True:
            data = self.get("/endpoint", params={"page": page, "size": page_size})
            resources = data.get("SearchResult", {}).get("resources", [])

            for resource in resources:
                detail = self.get(f"/endpoint/{resource['id']}")
                endpoints.append(detail.get("ERSEndPoint", detail))

            total = data.get("SearchResult", {}).get("total", 0)
            if page * page_size >= total:
                break
            page += 1

        return endpoints

    def get_endpoint_by_mac(self, mac: str) -> dict | None:
        """Look up endpoint by MAC."""
        data = self.get("/endpoint", params={"filter": f"mac.EQ.{mac}"})
        resources = data.get("SearchResult", {}).get("resources", [])
        if resources:
            detail = self.get(f"/endpoint/{resources[0]['id']}")
            return detail.get("ERSEndPoint", detail)
        return None

    def create_endpoint(self, mac: str, group_name: str, **kwargs) -> str:
        """Create endpoint, return ID."""
        group_id = self._get_group_id(group_name)
        payload = {
            "ERSEndPoint": {
                "mac": mac,
                "groupId": group_id,
                **kwargs
            }
        }
        response = self.client.post("/endpoint", json=payload)
        response.raise_for_status()
        return response.headers["Location"].split("/")[-1]

    def _get_group_id(self, name: str) -> str:
        data = self.get("/endpointgroup", params={"filter": f"name.EQ.{name}"})
        resources = data.get("SearchResult", {}).get("resources", [])
        if not resources:
            raise ValueError(f"Group not found: {name}")
        return resources[0]["id"]


class WLCClient(BaseAPIClient):
    """Cisco WLC RESTCONF client."""

    @property
    def base_url(self) -> str:
        return f"https://{self.config.host}/restconf/data"

    def _default_headers(self) -> dict[str, str]:
        return {
            "Accept": "application/yang-data+json",
            "Content-Type": "application/yang-data+json"
        }

    def get_access_points(self) -> list[dict]:
        data = self.get("/Cisco-IOS-XE-wireless-access-point-oper:access-point-oper-data")
        return data.get("ap-name-mac-map", [])

CLI Architecture

Main Entry Point

# cli/main.py
import click
from rich.console import Console
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path

class OutputFormat(str, Enum):
    TABLE = "table"
    JSON = "json"
    CSV = "csv"

@dataclass
class AppContext:
    """Shared CLI context."""
    console: Console = field(default_factory=Console)
    format: OutputFormat = OutputFormat.TABLE
    debug: bool = False
    config_path: Path = Path("~/.config/netapi/config.yaml").expanduser()

pass_ctx = click.make_pass_decorator(AppContext, ensure=True)

@click.group()
@click.option("--format", "-f",
    type=click.Choice([f.value for f in OutputFormat]),
    default="table", help="Output format")
@click.option("--debug/--no-debug", default=False)
@click.option("--config", "-c",
    type=click.Path(path_type=Path),
    default=Path("~/.config/netapi/config.yaml"),
    help="Config file path")
@click.version_option(prog_name="netapi")
@pass_ctx
def cli(ctx: AppContext, format: str, debug: bool, config: Path):
    """netapi - Infrastructure automation CLI."""
    ctx.format = OutputFormat(format)
    ctx.debug = debug
    ctx.config_path = config.expanduser()

# Register command groups
from netapi.cli import ise, vault, wlc
cli.add_command(ise.ise)
cli.add_command(vault.vault)
cli.add_command(wlc.wlc)

Command Group

# cli/ise.py
import click
from rich.table import Table
import json

from netapi.cli.main import pass_ctx, AppContext, OutputFormat
from netapi.vendors.cisco.ise import ISEClient, ClientConfig

def get_ise_client(ctx: AppContext) -> ISEClient:
    """Create ISE client from context."""
    # Load from config or env
    config = ClientConfig(
        host=ctx.obj.get("ise_host", "ise-01"),
        username=ctx.obj.get("ise_user", "admin"),
        password=ctx.obj.get("ise_pass", ""),
        verify_ssl=False
    )
    return ISEClient(config)

@click.group()
@click.option("--host", "-h", envvar="ISE_HOST", help="ISE hostname")
@click.option("--username", "-u", envvar="ISE_USERNAME", default="admin")
@click.option("--password", "-p", envvar="ISE_PASSWORD", prompt=True, hide_input=True)
@click.pass_context
def ise(ctx, host: str, username: str, password: str):
    """ISE operations."""
    ctx.ensure_object(dict)
    ctx.obj["ise_host"] = host
    ctx.obj["ise_user"] = username
    ctx.obj["ise_pass"] = password

@ise.command()
@click.option("--limit", "-l", type=int, default=100)
@pass_ctx
def endpoints(ctx: AppContext, limit: int):
    """List ISE endpoints."""
    with get_ise_client(ctx) as client:
        data = client.get_endpoints()[:limit]

    output(ctx, data, columns=["mac", "groupId", "staticGroupAssignment"])

@ise.command()
@click.argument("mac")
@pass_ctx
def lookup(ctx: AppContext, mac: str):
    """Look up endpoint by MAC."""
    with get_ise_client(ctx) as client:
        endpoint = client.get_endpoint_by_mac(mac)

    if endpoint:
        output(ctx, endpoint)
    else:
        ctx.console.print(f"[red]Endpoint not found: {mac}[/red]")
        raise SystemExit(1)

Output Formatting

# cli/output.py
import json
import csv
import io
from rich.table import Table
from rich.json import JSON

from netapi.cli.main import AppContext, OutputFormat

def output(ctx: AppContext, data: dict | list, columns: list[str] | None = None):
    """Format and output data based on context."""
    match ctx.format:
        case OutputFormat.JSON:
            output_json(ctx, data)
        case OutputFormat.CSV:
            output_csv(ctx, data, columns)
        case OutputFormat.TABLE:
            output_table(ctx, data, columns)

def output_json(ctx: AppContext, data):
    """Output as JSON."""
    ctx.console.print_json(data=data)

def output_csv(ctx: AppContext, data, columns: list[str] | None = None):
    """Output as CSV."""
    if isinstance(data, dict):
        data = [data]

    if not data:
        return

    columns = columns or list(data[0].keys())

    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=columns, extrasaction="ignore")
    writer.writeheader()
    writer.writerows(data)

    ctx.console.print(output.getvalue())

def output_table(ctx: AppContext, data, columns: list[str] | None = None):
    """Output as Rich table."""
    if isinstance(data, dict):
        # Single item - key/value table
        table = Table(show_header=False)
        table.add_column("Key", style="cyan")
        table.add_column("Value")
        for key, value in data.items():
            table.add_row(key, str(value))
        ctx.console.print(table)
        return

    if not data:
        ctx.console.print("[dim]No results[/dim]")
        return

    columns = columns or list(data[0].keys())

    table = Table()
    for col in columns:
        table.add_column(col.replace("_", " ").title())

    for item in data:
        row = [str(item.get(col, "")) for col in columns]
        table.add_row(*row)

    ctx.console.print(table)

Configuration Management

Config File

# config.py
from dataclasses import dataclass, field, asdict
from pathlib import Path
import yaml

@dataclass
class ISEConfig:
    host: str = ""
    username: str = "admin"
    verify_ssl: bool = True

@dataclass
class VaultConfig:
    addr: str = "https://vault.example.com:8200"
    token: str = ""
    namespace: str = ""

@dataclass
class AppConfig:
    ise: ISEConfig = field(default_factory=ISEConfig)
    vault: VaultConfig = field(default_factory=VaultConfig)
    default_format: str = "table"

    @classmethod
    def load(cls, path: Path) -> "AppConfig":
        """Load config from YAML file."""
        if not path.exists():
            return cls()

        data = yaml.safe_load(path.read_text())
        return cls(
            ise=ISEConfig(**data.get("ise", {})),
            vault=VaultConfig(**data.get("vault", {})),
            default_format=data.get("default_format", "table")
        )

    def save(self, path: Path) -> None:
        """Save config to YAML file."""
        path.parent.mkdir(parents=True, exist_ok=True)
        data = {
            "ise": asdict(self.ise),
            "vault": asdict(self.vault),
            "default_format": self.default_format
        }
        path.write_text(yaml.dump(data, default_flow_style=False))

# Usage
config = AppConfig.load(Path("~/.config/netapi/config.yaml").expanduser())
print(config.ise.host)

Environment Variables

import os
from dataclasses import dataclass

@dataclass
class EnvConfig:
    """Configuration from environment variables."""
    ise_host: str = ""
    ise_username: str = "admin"
    ise_password: str = ""
    vault_addr: str = ""
    vault_token: str = ""

    @classmethod
    def from_env(cls) -> "EnvConfig":
        return cls(
            ise_host=os.environ.get("ISE_HOST", ""),
            ise_username=os.environ.get("ISE_USERNAME", "admin"),
            ise_password=os.environ.get("ISE_PASSWORD", ""),
            vault_addr=os.environ.get("VAULT_ADDR", ""),
            vault_token=os.environ.get("VAULT_TOKEN", "")
        )

# Priority: CLI option > Environment > Config file > Default
def get_value(cli_value, env_key, config_value, default):
    if cli_value:
        return cli_value
    if env_value := os.environ.get(env_key):
        return env_value
    if config_value:
        return config_value
    return default

Repository Pattern

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TypeVar, Generic

T = TypeVar("T")

@dataclass
class Endpoint:
    id: str
    mac: str
    group: str
    status: str

class Repository(ABC, Generic[T]):
    """Abstract repository for CRUD operations."""

    @abstractmethod
    def get(self, id: str) -> T | None: ...

    @abstractmethod
    def list(self, **filters) -> list[T]: ...

    @abstractmethod
    def create(self, entity: T) -> str: ...

    @abstractmethod
    def update(self, entity: T) -> None: ...

    @abstractmethod
    def delete(self, id: str) -> bool: ...

class ISEEndpointRepository(Repository[Endpoint]):
    """ISE API implementation of endpoint repository."""

    def __init__(self, client: ISEClient):
        self.client = client

    def get(self, id: str) -> Endpoint | None:
        data = self.client.get(f"/endpoint/{id}")
        if data:
            return self._to_endpoint(data.get("ERSEndPoint", data))
        return None

    def list(self, **filters) -> list[Endpoint]:
        filter_str = "&".join(f"{k}.EQ.{v}" for k, v in filters.items())
        params = {"filter": filter_str} if filter_str else {}
        data = self.client.get("/endpoint", params=params)
        resources = data.get("SearchResult", {}).get("resources", [])
        return [self._to_endpoint(self.client.get(f"/endpoint/{r['id']}").get("ERSEndPoint", {}))
                for r in resources]

    def create(self, endpoint: Endpoint) -> str:
        return self.client.create_endpoint(endpoint.mac, endpoint.group)

    def update(self, endpoint: Endpoint) -> None:
        self.client.put(f"/endpoint/{endpoint.id}", json={"ERSEndPoint": {
            "mac": endpoint.mac,
            "groupId": self.client._get_group_id(endpoint.group)
        }})

    def delete(self, id: str) -> bool:
        response = self.client.client.delete(f"/endpoint/{id}")
        return response.status_code == 204

    def _to_endpoint(self, data: dict) -> Endpoint:
        return Endpoint(
            id=data.get("id", ""),
            mac=data.get("mac", ""),
            group=data.get("groupId", ""),
            status=data.get("staticGroupAssignment", "unknown")
        )

Batch Operations

from typing import Callable, TypeVar, Iterator
from dataclasses import dataclass
import time

T = TypeVar("T")
R = TypeVar("R")

@dataclass
class BatchResult:
    success: list
    failed: list[tuple[object, Exception]]

def batch_process(
    items: list[T],
    processor: Callable[[T], R],
    batch_size: int = 100,
    delay: float = 0.1,
    on_progress: Callable[[int, int], None] | None = None
) -> BatchResult:
    """Process items in batches with error handling."""
    success = []
    failed = []

    for i, item in enumerate(items):
        try:
            result = processor(item)
            success.append(result)
        except Exception as e:
            failed.append((item, e))

        if on_progress:
            on_progress(i + 1, len(items))

        if (i + 1) % batch_size == 0:
            time.sleep(delay)

    return BatchResult(success=success, failed=failed)

# Usage with progress bar
from rich.progress import Progress

def update_endpoints(endpoints: list[dict], new_group: str):
    with Progress() as progress:
        task = progress.add_task("Updating...", total=len(endpoints))

        def process(ep):
            client.update_endpoint(ep["id"], group=new_group)
            return ep["id"]

        def on_progress(current, total):
            progress.update(task, completed=current)

        result = batch_process(
            endpoints,
            process,
            batch_size=50,
            delay=0.5,
            on_progress=on_progress
        )

    print(f"Success: {len(result.success)}, Failed: {len(result.failed)}")

Error Handling

from dataclasses import dataclass

# Custom exceptions
class NetAPIError(Exception):
    """Base exception for netapi."""
    pass

class AuthenticationError(NetAPIError):
    """Authentication failed."""
    pass

class ResourceNotFoundError(NetAPIError):
    """Resource not found."""
    def __init__(self, resource_type: str, identifier: str):
        self.resource_type = resource_type
        self.identifier = identifier
        super().__init__(f"{resource_type} not found: {identifier}")

class APIError(NetAPIError):
    """API request failed."""
    def __init__(self, status_code: int, message: str, response: dict | None = None):
        self.status_code = status_code
        self.response = response
        super().__init__(f"HTTP {status_code}: {message}")

# Usage in client
def get_endpoint(self, id: str) -> dict:
    try:
        response = self.client.get(f"/endpoint/{id}")
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 401:
            raise AuthenticationError("Invalid credentials")
        if e.response.status_code == 404:
            raise ResourceNotFoundError("Endpoint", id)
        raise APIError(
            e.response.status_code,
            e.response.text,
            e.response.json() if e.response.content else None
        )

# Usage in CLI
@ise.command()
@click.argument("id")
@pass_ctx
def get(ctx: AppContext, id: str):
    try:
        endpoint = client.get_endpoint(id)
        output(ctx, endpoint)
    except ResourceNotFoundError as e:
        ctx.console.print(f"[red]Not found: {e.identifier}[/red]")
        raise SystemExit(1)
    except AuthenticationError:
        ctx.console.print("[red]Authentication failed. Check credentials.[/red]")
        raise SystemExit(2)
    except APIError as e:
        ctx.console.print(f"[red]API error: {e}[/red]")
        if ctx.debug and e.response:
            ctx.console.print_json(data=e.response)
        raise SystemExit(3)

Logging

import logging
from rich.logging import RichHandler

def setup_logging(debug: bool = False) -> logging.Logger:
    """Configure logging with Rich handler."""
    level = logging.DEBUG if debug else logging.INFO

    logging.basicConfig(
        level=level,
        format="%(message)s",
        datefmt="[%X]",
        handlers=[RichHandler(rich_tracebacks=True)]
    )

    # Silence noisy libraries
    logging.getLogger("httpx").setLevel(logging.WARNING)
    logging.getLogger("httpcore").setLevel(logging.WARNING)

    return logging.getLogger("netapi")

# Usage
logger = setup_logging(debug=True)
logger.info("Starting operation")
logger.debug("Request: %s", request_data)
logger.error("Operation failed: %s", error)

Summary

These patterns form the foundation of production infrastructure automation:

  1. Base Client: Reusable HTTP client with retry logic

  2. CLI Architecture: Click groups with shared context

  3. Output Formatting: Table/JSON/CSV with Rich

  4. Configuration: File + environment + CLI priority

  5. Repository Pattern: Abstract data access

  6. Batch Operations: Progress tracking, error handling

  7. Custom Exceptions: Meaningful error types

  8. Logging: Structured logging with Rich

Apply these patterns to build professional-grade automation tools.