Infrastructure Patterns
Production patterns extracted from netapi. Copy these for your own infrastructure automation.
API Client Architecture
Base Client
from abc import ABC, abstractmethod
from dataclasses import dataclass, field

import httpx
@dataclass
class ClientConfig:
    """Connection settings shared by all API clients.

    The password is excluded from the generated ``repr`` so that logging or
    printing a config object does not leak the credential.
    """

    # Hostname or address of the target system.
    host: str
    # Credentials passed to the HTTP client as the auth pair.
    username: str
    password: str = field(repr=False)
    # Whether TLS certificates are verified.
    verify_ssl: bool = True
    # Timeout handed to the HTTP client, in seconds.
    timeout: float = 30.0
    # Number of attempts made by the retrying request helper.
    max_retries: int = 3
class BaseAPIClient(ABC):
    """Abstract base for all vendor API clients.

    Subclasses provide ``base_url`` (and may override ``_default_headers``).
    The underlying ``httpx.Client`` is created on first use and released by
    ``close()`` or by leaving a ``with`` block.
    """

    # HTTP statuses treated as transient and therefore retried.
    RETRYABLE_STATUS_CODES = (502, 503, 504)

    def __init__(self, config: ClientConfig):
        self.config = config
        self._client: httpx.Client | None = None

    @property
    @abstractmethod
    def base_url(self) -> str:
        """Return the base URL for API requests."""

    @property
    def client(self) -> httpx.Client:
        """Return the HTTP client, creating it on first access."""
        if self._client is None:
            self._client = httpx.Client(
                base_url=self.base_url,
                auth=(self.config.username, self.config.password),
                verify=self.config.verify_ssl,
                timeout=self.config.timeout,
                headers=self._default_headers(),
            )
        return self._client

    def _default_headers(self) -> dict[str, str]:
        """Return the headers sent with every request (JSON by default)."""
        return {
            "Accept": "application/json",
            "Content-Type": "application/json",
        }

    def request(self, method: str, path: str, **kwargs) -> dict:
        """Send a request, retrying transient failures with exponential backoff.

        Transport errors and responses with a status in
        ``RETRYABLE_STATUS_CODES`` are retried, sleeping ``2 ** attempt``
        seconds between attempts. Any other error status is raised
        immediately.

        Args:
            method: HTTP method name.
            path: Request path, resolved against ``base_url``.
            **kwargs: Passed through to ``httpx.Client.request``.

        Returns:
            The decoded JSON body, or ``{}`` when the response has no content.

        Raises:
            httpx.TransportError: The final attempt failed at transport level.
            httpx.HTTPStatusError: A non-retryable error status was returned,
                or the final attempt returned a retryable one.
        """
        import time

        # Always make at least one attempt: with max_retries <= 0 the loop
        # would otherwise be skipped and there would be no error to raise.
        attempts = max(1, self.config.max_retries)
        for attempt in range(attempts):
            final_attempt = attempt == attempts - 1
            try:
                response = self.client.request(method, path, **kwargs)
                response.raise_for_status()
            except httpx.TransportError:
                if final_attempt:
                    raise
            except httpx.HTTPStatusError as exc:
                retryable = exc.response.status_code in self.RETRYABLE_STATUS_CODES
                if final_attempt or not retryable:
                    raise
            else:
                return response.json() if response.content else {}
            # Only reached after a retryable failure with attempts remaining.
            time.sleep(2 ** attempt)
        # Every iteration returns, raises, or continues; the last one cannot
        # continue, so this line is a safeguard only.
        raise RuntimeError("retry loop exited without a result")

    def get(self, path: str, **kwargs) -> dict:
        """Send a GET request through ``request``."""
        return self.request("GET", path, **kwargs)

    def post(self, path: str, **kwargs) -> dict:
        """Send a POST request through ``request``."""
        return self.request("POST", path, **kwargs)

    def put(self, path: str, **kwargs) -> dict:
        """Send a PUT request through ``request``."""
        return self.request("PUT", path, **kwargs)

    def delete(self, path: str, **kwargs) -> dict:
        """Send a DELETE request through ``request``."""
        return self.request("DELETE", path, **kwargs)

    def close(self):
        """Close the HTTP client if one was created."""
        if self._client:
            self._client.close()
            self._client = None

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
Vendor Implementation
class ISEClient(BaseAPIClient):
    """Client for the Cisco ISE ERS configuration API."""

    @property
    def base_url(self) -> str:
        """Return the ERS config URL (port 9060) for the configured host."""
        host = self.config.host
        return f"https://{host}:9060/ers/config"

    def get_endpoints(self, page_size: int = 100) -> list[dict]:
        """Fetch every endpoint, walking the paginated listing.

        Each listed resource is fetched individually to obtain its detail
        record, so this issues one extra request per endpoint.
        """
        collected = []
        page_number = 1
        while True:
            listing = self.get(
                "/endpoint", params={"page": page_number, "size": page_size}
            )
            search_result = listing.get("SearchResult", {})
            for summary in search_result.get("resources", []):
                record = self.get(f"/endpoint/{summary['id']}")
                collected.append(record.get("ERSEndPoint", record))
            # Stop once the pages requested so far cover the reported total.
            if page_number * page_size >= search_result.get("total", 0):
                return collected
            page_number += 1

    def get_endpoint_by_mac(self, mac: str) -> dict | None:
        """Return the detail record of the first endpoint matching *mac*, or None."""
        listing = self.get("/endpoint", params={"filter": f"mac.EQ.{mac}"})
        matches = listing.get("SearchResult", {}).get("resources", [])
        if not matches:
            return None
        record = self.get(f"/endpoint/{matches[0]['id']}")
        return record.get("ERSEndPoint", record)

    def create_endpoint(self, mac: str, group_name: str, **kwargs) -> str:
        """Create an endpoint in the named group and return its new ID.

        Extra keyword arguments are merged into the ``ERSEndPoint`` payload.
        The ID is taken from the last path segment of the ``Location``
        response header.
        """
        body = {"mac": mac, "groupId": self._get_group_id(group_name), **kwargs}
        # Uses the raw HTTP client because the response headers are needed.
        response = self.client.post("/endpoint", json={"ERSEndPoint": body})
        response.raise_for_status()
        location = response.headers["Location"]
        return location.rsplit("/", 1)[-1]

    def _get_group_id(self, name: str) -> str:
        """Resolve an endpoint group name to its ID; raise ValueError if absent."""
        listing = self.get("/endpointgroup", params={"filter": f"name.EQ.{name}"})
        matches = listing.get("SearchResult", {}).get("resources", [])
        if matches:
            return matches[0]["id"]
        raise ValueError(f"Group not found: {name}")
class WLCClient(BaseAPIClient):
    """Client for the Cisco WLC RESTCONF interface."""

    @property
    def base_url(self) -> str:
        """Return the RESTCONF data root for the configured host."""
        return "https://{}/restconf/data".format(self.config.host)

    def _default_headers(self) -> dict[str, str]:
        """Return headers requesting and sending YANG-data JSON."""
        media_type = "application/yang-data+json"
        return dict.fromkeys(("Accept", "Content-Type"), media_type)

    def get_access_points(self) -> list[dict]:
        """Return the ``ap-name-mac-map`` entries from the AP operational data."""
        oper_data = self.get(
            "/Cisco-IOS-XE-wireless-access-point-oper:access-point-oper-data"
        )
        access_points = oper_data.get("ap-name-mac-map", [])
        return access_points
CLI Architecture
Main Entry Point
# cli/main.py
import click
from rich.console import Console
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
class OutputFormat(str, Enum):
    """Output formats selectable on the command line.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    TABLE = "table"
    JSON = "json"
    CSV = "csv"
@dataclass
class AppContext:
    """State shared between the root CLI group and its commands."""

    # Console used for all user-facing output; a fresh one per context.
    console: Console = field(default_factory=Console)
    # Selected output format (table unless overridden).
    format: OutputFormat = OutputFormat.TABLE
    # Debug flag set from the --debug/--no-debug option.
    debug: bool = False
    # Config file location; the default is expanded when the class is defined.
    config_path: Path = Path("~/.config/netapi/config.yaml").expanduser()
# Decorator that injects the shared AppContext, creating it when missing.
pass_ctx = click.make_pass_decorator(AppContext, ensure=True)


@click.group()
@click.option(
    "--format",
    "-f",
    type=click.Choice([member.value for member in OutputFormat]),
    default="table",
    help="Output format",
)
@click.option("--debug/--no-debug", default=False)
@click.option(
    "--config",
    "-c",
    type=click.Path(path_type=Path),
    default=Path("~/.config/netapi/config.yaml"),
    help="Config file path",
)
@click.version_option(prog_name="netapi")
@pass_ctx
def cli(ctx: AppContext, format: str, debug: bool, config: Path):
    """netapi - Infrastructure automation CLI."""
    # Copy the global options onto the shared context for the subcommands.
    ctx.config_path = config.expanduser()
    ctx.debug = debug
    ctx.format = OutputFormat(format)
# Register command groups
# The import sits below the definition of `cli`; cli/ise.py imports names from
# this module, so this ordering presumably avoids a circular-import failure.
from netapi.cli import ise, vault, wlc
# Attach each vendor group as a subcommand of the root `cli` group.
cli.add_command(ise.ise)
cli.add_command(vault.vault)
cli.add_command(wlc.wlc)
Command Group
# cli/ise.py
import click
from rich.table import Table
import json
from netapi.cli.main import pass_ctx, AppContext, OutputFormat
from netapi.vendors.cisco.ise import ISEClient, ClientConfig
def get_ise_client(ctx: AppContext) -> ISEClient:
    """Create an ISE client from the connection options of the ``ise`` group.

    The ``ise`` group stores host/username/password in a plain dict on the
    Click context, whereas commands receive the shared ``AppContext``, which
    has no ``obj`` attribute. The dict is therefore looked up on the active
    Click context instead of on *ctx*.

    Args:
        ctx: Shared application context. An ``obj`` dict on it is honoured
            when present.

    Returns:
        An ``ISEClient`` configured from the stored options. Missing or
        empty values fall back to ``ise-01`` / ``admin`` / empty password.
    """
    settings = getattr(ctx, "obj", None)
    if not isinstance(settings, dict):
        click_ctx = click.get_current_context(silent=True)
        settings = click_ctx.find_object(dict) if click_ctx is not None else None
    settings = settings or {}

    # The group always stores the keys, with None when an option was not
    # given, so `.get(key, default)` would never apply the default.
    config = ClientConfig(
        host=settings.get("ise_host") or "ise-01",
        username=settings.get("ise_user") or "admin",
        password=settings.get("ise_pass") or "",
        # NOTE(review): TLS verification is disabled unconditionally here;
        # confirm this is intended outside of lab use.
        verify_ssl=False,
    )
    return ISEClient(config)
@click.group()
@click.option("--host", "-h", envvar="ISE_HOST", help="ISE hostname")
@click.option("--username", "-u", envvar="ISE_USERNAME", default="admin")
@click.option("--password", "-p", envvar="ISE_PASSWORD", prompt=True, hide_input=True)
@click.pass_context
def ise(ctx, host: str, username: str, password: str):
    """ISE operations."""
    # Keep the connection options in a dict on the Click context so the
    # subcommands can build a client from them.
    ctx.ensure_object(dict)
    ctx.obj.update(
        {
            "ise_host": host,
            "ise_user": username,
            "ise_pass": password,
        }
    )
@ise.command()
@click.option("--limit", "-l", type=int, default=100)
@pass_ctx
def endpoints(ctx: AppContext, limit: int):
    """List ISE endpoints."""
    # `output` is defined in cli/output.py and is not imported at the top of
    # this module; importing it here also keeps module import order simple.
    from netapi.cli.output import output

    with get_ise_client(ctx) as client:
        # NOTE: all endpoints are fetched before the limit is applied.
        data = client.get_endpoints()[:limit]
        output(ctx, data, columns=["mac", "groupId", "staticGroupAssignment"])
@ise.command()
@click.argument("mac")
@pass_ctx
def lookup(ctx: AppContext, mac: str):
    """Look up endpoint by MAC."""
    # `output` is defined in cli/output.py and is not imported at the top of
    # this module; importing it here also keeps module import order simple.
    from netapi.cli.output import output

    with get_ise_client(ctx) as client:
        endpoint = client.get_endpoint_by_mac(mac)
        if endpoint:
            output(ctx, endpoint)
        else:
            ctx.console.print(f"[red]Endpoint not found: {mac}[/red]")
            # Exit status 1 signals "not found" to calling scripts.
            raise SystemExit(1)
Output Formatting
# cli/output.py
import json
import csv
import io
from rich.table import Table
from rich.json import JSON
from netapi.cli.main import AppContext, OutputFormat
def output(ctx: AppContext, data: dict | list, columns: list[str] | None = None):
    """Render *data* using the formatter selected by ``ctx.format``.

    ``columns`` is forwarded to the CSV and table formatters; the JSON
    formatter ignores it. An unrecognised format produces no output.
    """
    selected = ctx.format
    # Equality comparisons mirror value-pattern matching on the enum members.
    if selected == OutputFormat.JSON:
        output_json(ctx, data)
    elif selected == OutputFormat.CSV:
        output_csv(ctx, data, columns)
    elif selected == OutputFormat.TABLE:
        output_table(ctx, data, columns)
def output_json(ctx: AppContext, data):
    """Print *data* as JSON through the context's console."""
    console = ctx.console
    console.print_json(data=data)
def output_csv(ctx: AppContext, data, columns: list[str] | None = None):
    """Print *data* as CSV through the context's console.

    Args:
        ctx: Context providing the console to print to.
        data: A list of dict rows, or a single dict (treated as one row).
        columns: Column names to emit. Defaults to the keys of the first
            row; keys not listed are ignored.

    Nothing is printed when *data* is empty.
    """
    rows = [data] if isinstance(data, dict) else data
    if not rows:
        return
    fieldnames = columns or list(rows[0].keys())

    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, extrasaction="ignore")
    writer.writeheader()
    writer.writerows(rows)

    # CSV is machine-readable: print it verbatim. Markup and highlighting
    # would reinterpret bracketed values, wrapping would split long rows, and
    # the writer already terminates the last row.
    ctx.console.print(
        buffer.getvalue(),
        markup=False,
        highlight=False,
        soft_wrap=True,
        end="",
    )
def output_table(ctx: AppContext, data, columns: list[str] | None = None):
    """Print *data* as a Rich table through the context's console.

    A single dict is rendered as a two-column key/value table. A list of
    dicts is rendered with one column per entry in *columns* (defaulting to
    the keys of the first row). An empty list prints a "No results" notice.

    Cell text is escaped so that values containing square brackets are shown
    literally instead of being parsed as Rich markup.
    """
    from rich.markup import escape

    if isinstance(data, dict):
        # Single item - key/value table
        table = Table(show_header=False)
        table.add_column("Key", style="cyan")
        table.add_column("Value")
        for key, value in data.items():
            table.add_row(escape(str(key)), escape(str(value)))
        ctx.console.print(table)
        return

    if not data:
        ctx.console.print("[dim]No results[/dim]")
        return

    columns = columns or list(data[0].keys())
    table = Table()
    for col in columns:
        table.add_column(escape(col.replace("_", " ").title()))
    for item in data:
        table.add_row(*(escape(str(item.get(col, ""))) for col in columns))
    ctx.console.print(table)
Configuration Management
Config File
# config.py
from dataclasses import dataclass, field, asdict
from pathlib import Path
import yaml
@dataclass
class ISEConfig:
    """ISE section of the application config file."""

    # ISE hostname; empty when not configured.
    host: str = ""
    # Account name used for ISE.
    username: str = "admin"
    # Whether TLS certificates are verified.
    verify_ssl: bool = True
@dataclass
class VaultConfig:
    """Vault section of the application config file.

    The token is excluded from the generated ``repr`` so that logging or
    printing a config object does not leak it. ``asdict`` still includes it,
    so saving the config is unaffected.
    """

    # Vault server address.
    addr: str = "https://vault.example.com:8200"
    # Vault token; empty when not configured.
    token: str = field(default="", repr=False)
    # Vault namespace; empty when not configured.
    namespace: str = ""
@dataclass
class AppConfig:
    """Application configuration persisted as a YAML file."""

    ise: ISEConfig = field(default_factory=ISEConfig)
    vault: VaultConfig = field(default_factory=VaultConfig)
    default_format: str = "table"

    @classmethod
    def load(cls, path: Path) -> "AppConfig":
        """Load config from a YAML file.

        Args:
            path: Location of the config file.

        Returns:
            The parsed config. Defaults are used when the file is missing or
            empty, and for any section that is absent or null.

        Raises:
            ValueError: The file's top level is not a mapping.
        """
        if not path.exists():
            return cls()
        # An empty file parses to None; treat it like an empty mapping.
        data = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
        if not isinstance(data, dict):
            raise ValueError(f"Config file must contain a mapping: {path}")
        # `or {}` also covers sections that are present but null.
        return cls(
            ise=ISEConfig(**(data.get("ise") or {})),
            vault=VaultConfig(**(data.get("vault") or {})),
            default_format=data.get("default_format") or "table",
        )

    def save(self, path: Path) -> None:
        """Save config to a YAML file, creating parent directories as needed.

        NOTE(review): the Vault token is written in clear text with default
        file permissions; consider restricting access to the file.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        data = {
            "ise": asdict(self.ise),
            "vault": asdict(self.vault),
            "default_format": self.default_format,
        }
        path.write_text(yaml.dump(data, default_flow_style=False), encoding="utf-8")
# Usage
# Load the config from the per-user path; `load` returns defaults when the
# file does not exist.
config = AppConfig.load(Path("~/.config/netapi/config.yaml").expanduser())
# Nested sections are plain dataclass attributes.
print(config.ise.host)
Environment Variables
import os
from dataclasses import dataclass
@dataclass
class EnvConfig:
    """Settings read from process environment variables."""

    ise_host: str = ""
    ise_username: str = "admin"
    ise_password: str = ""
    vault_addr: str = ""
    vault_token: str = ""

    @classmethod
    def from_env(cls) -> "EnvConfig":
        """Build a config from ISE_* and VAULT_* variables.

        Unset variables fall back to the same values as the field defaults.
        """
        read = os.environ.get
        return cls(
            ise_host=read("ISE_HOST", ""),
            ise_username=read("ISE_USERNAME", "admin"),
            ise_password=read("ISE_PASSWORD", ""),
            vault_addr=read("VAULT_ADDR", ""),
            vault_token=read("VAULT_TOKEN", ""),
        )
# Priority: CLI option > Environment > Config file > Default
def get_value(cli_value, env_key, config_value, default):
    """Return the first truthy candidate in priority order, else *default*.

    The environment is only consulted when *cli_value* is falsy; empty
    strings and other falsy values count as "not set" at every level.
    """
    return cli_value or os.environ.get(env_key) or config_value or default
Repository Pattern
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TypeVar, Generic
T = TypeVar("T")
@dataclass
class Endpoint:
    """Domain record for a network endpoint."""

    # Identifier of the endpoint in the backing store.
    id: str
    # MAC address of the endpoint.
    mac: str
    # Group the endpoint belongs to.
    group: str
    # Status string for the endpoint.
    status: str
class Repository(ABC, Generic[T]):
    """Abstract CRUD interface over entities of type ``T``."""

    @abstractmethod
    def get(self, id: str) -> T | None:
        """Return the entity with the given ID, or None."""

    @abstractmethod
    def list(self, **filters) -> list[T]:
        """Return the entities matching the given filters."""

    @abstractmethod
    def create(self, entity: T) -> str:
        """Store a new entity and return its ID."""

    @abstractmethod
    def update(self, entity: T) -> None:
        """Persist changes to an existing entity."""

    @abstractmethod
    def delete(self, id: str) -> bool:
        """Delete the entity with the given ID and report success."""
class ISEEndpointRepository(Repository[Endpoint]):
    """ISE API implementation of the endpoint repository.

    NOTE(review): ``_to_endpoint`` fills ``Endpoint.group`` from the API's
    ``groupId``, while ``create`` and ``update`` treat ``Endpoint.group`` as
    a group *name*. Confirm which one callers are expected to supply.
    """

    def __init__(self, client: ISEClient):
        self.client = client

    def get(self, id: str) -> Endpoint | None:
        """Return the endpoint with the given ID, or None when it is absent.

        The client raises on error statuses, so a 404 is translated to None
        here to honour the repository contract. Other errors propagate.
        """
        import httpx

        try:
            data = self.client.get(f"/endpoint/{id}")
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code == 404:
                return None
            raise
        if not data:
            return None
        return self._to_endpoint(data.get("ERSEndPoint", data))

    def list(self, **filters) -> list[Endpoint]:
        """Return endpoints matching every ``field=value`` filter.

        Each filter is sent as its own ``filter`` query parameter. Joining
        them with "&" inside a single value would be URL-encoded and sent as
        one malformed filter. Detail records are fetched one by one.
        """
        criteria = [f"{key}.EQ.{value}" for key, value in filters.items()]
        params = {"filter": criteria} if criteria else {}
        data = self.client.get("/endpoint", params=params)
        resources = data.get("SearchResult", {}).get("resources", [])
        return [
            self._to_endpoint(
                self.client.get(f"/endpoint/{resource['id']}").get("ERSEndPoint", {})
            )
            for resource in resources
        ]

    def create(self, endpoint: Endpoint) -> str:
        """Create the endpoint and return the ID assigned by ISE."""
        return self.client.create_endpoint(endpoint.mac, endpoint.group)

    def update(self, endpoint: Endpoint) -> None:
        """Replace the endpoint's MAC and group assignment."""
        payload = {
            "ERSEndPoint": {
                "mac": endpoint.mac,
                "groupId": self.client._get_group_id(endpoint.group),
            }
        }
        self.client.put(f"/endpoint/{endpoint.id}", json=payload)

    def delete(self, id: str) -> bool:
        """Delete the endpoint; return True only on a 204 response.

        Uses the raw HTTP client so that error statuses yield False instead
        of raising.
        """
        response = self.client.client.delete(f"/endpoint/{id}")
        return response.status_code == 204

    def _to_endpoint(self, data: dict) -> Endpoint:
        """Map an ``ERSEndPoint`` payload to an ``Endpoint`` record."""
        return Endpoint(
            id=data.get("id", ""),
            mac=data.get("mac", ""),
            group=data.get("groupId", ""),
            status=data.get("staticGroupAssignment", "unknown"),
        )
Batch Operations
from typing import Callable, TypeVar, Iterator
from dataclasses import dataclass
import time
T = TypeVar("T")
R = TypeVar("R")
@dataclass
class BatchResult:
success: list
failed: list[tuple[object, Exception]]
def batch_process(
items: list[T],
processor: Callable[[T], R],
batch_size: int = 100,
delay: float = 0.1,
on_progress: Callable[[int, int], None] | None = None
) -> BatchResult:
"""Process items in batches with error handling."""
success = []
failed = []
for i, item in enumerate(items):
try:
result = processor(item)
success.append(result)
except Exception as e:
failed.append((item, e))
if on_progress:
on_progress(i + 1, len(items))
if (i + 1) % batch_size == 0:
time.sleep(delay)
return BatchResult(success=success, failed=failed)
# Usage with progress bar
from rich.progress import Progress
def update_endpoints(endpoints: list[dict], new_group: str):
    """Move every endpoint to *new_group*, showing a progress bar.

    Relies on a ``client`` object from the enclosing scope; it is not
    defined in this function. Prints a success/failure summary at the end.
    """
    with Progress() as progress:
        task = progress.add_task("Updating...", total=len(endpoints))

        def apply_group(ep):
            # Returns the ID so that successes can be counted and identified.
            client.update_endpoint(ep["id"], group=new_group)
            return ep["id"]

        result = batch_process(
            endpoints,
            apply_group,
            batch_size=50,
            delay=0.5,
            on_progress=lambda current, total: progress.update(task, completed=current),
        )

    print(f"Success: {len(result.success)}, Failed: {len(result.failed)}")
Error Handling
from dataclasses import dataclass
# Custom exceptions
class NetAPIError(Exception):
"""Base exception for netapi."""
pass
class AuthenticationError(NetAPIError):
"""Authentication failed."""
pass
class ResourceNotFoundError(NetAPIError):
"""Resource not found."""
def __init__(self, resource_type: str, identifier: str):
self.resource_type = resource_type
self.identifier = identifier
super().__init__(f"{resource_type} not found: {identifier}")
class APIError(NetAPIError):
"""API request failed."""
def __init__(self, status_code: int, message: str, response: dict | None = None):
self.status_code = status_code
self.response = response
super().__init__(f"HTTP {status_code}: {message}")
# Usage in client
def get_endpoint(self, id: str) -> dict:
    """Fetch one endpoint, translating HTTP errors to netapi exceptions.

    Args:
        id: Endpoint identifier.

    Returns:
        The decoded JSON body of the response.

    Raises:
        AuthenticationError: The server answered 401.
        ResourceNotFoundError: The server answered 404.
        APIError: Any other error status; carries the decoded body when the
            error response contained valid JSON.
    """
    try:
        response = self.client.get(f"/endpoint/{id}")
        response.raise_for_status()
    except httpx.HTTPStatusError as e:
        status = e.response.status_code
        if status == 401:
            raise AuthenticationError("Invalid credentials") from e
        if status == 404:
            raise ResourceNotFoundError("Endpoint", id) from e
        # Error bodies are not always JSON; a decode failure here must not
        # mask the API error being reported.
        try:
            body = e.response.json() if e.response.content else None
        except ValueError:
            body = None
        raise APIError(status, e.response.text, body) from e
    return response.json()
# Usage in CLI
@ise.command()
@click.argument("id")
@pass_ctx
def get(ctx: AppContext, id: str):
    """Get an endpoint by ID."""
    # `output` is defined in cli/output.py; imported here because it is not
    # imported at the top of this module.
    from netapi.cli.output import output

    # Exit codes: 1 = not found, 2 = authentication failure, 3 = other API error.
    try:
        # Build the client the same way the sibling commands do; the snippet
        # otherwise referenced an undefined `client`.
        with get_ise_client(ctx) as client:
            endpoint = client.get_endpoint(id)
        output(ctx, endpoint)
    except ResourceNotFoundError as e:
        ctx.console.print(f"[red]Not found: {e.identifier}[/red]")
        raise SystemExit(1)
    except AuthenticationError:
        ctx.console.print("[red]Authentication failed. Check credentials.[/red]")
        raise SystemExit(2)
    except APIError as e:
        ctx.console.print(f"[red]API error: {e}[/red]")
        # The raw response body is only shown in debug mode.
        if ctx.debug and e.response:
            ctx.console.print_json(data=e.response)
        raise SystemExit(3)
Logging
import logging
from rich.logging import RichHandler
def setup_logging(debug: bool = False) -> logging.Logger:
    """Set up root logging through a Rich handler and return the app logger.

    The root level is DEBUG when *debug* is true, otherwise INFO. The
    ``httpx`` and ``httpcore`` loggers are raised to WARNING.
    """
    if debug:
        level = logging.DEBUG
    else:
        level = logging.INFO

    rich_handler = RichHandler(rich_tracebacks=True)
    logging.basicConfig(
        level=level,
        format="%(message)s",
        datefmt="[%X]",
        handlers=[rich_handler],
    )

    # Silence noisy libraries
    for noisy_name in ("httpx", "httpcore"):
        logging.getLogger(noisy_name).setLevel(logging.WARNING)

    return logging.getLogger("netapi")
# Usage
# Configure logging once and keep the returned "netapi" logger.
logger = setup_logging(debug=True)
logger.info("Starting operation")
# `request_data` and `error` are placeholders: they are not defined in this
# snippet and must come from the surrounding code.
# Arguments are passed separately so formatting happens only when emitted.
logger.debug("Request: %s", request_data)
logger.error("Operation failed: %s", error)
Summary
These patterns form the foundation of production infrastructure automation:
- Base Client: Reusable HTTP client with retry logic
- CLI Architecture: Click groups with shared context
- Output Formatting: Table/JSON/CSV with Rich
- Configuration: File + environment + CLI priority
- Repository Pattern: Abstract data access
- Batch Operations: Progress tracking, error handling
- Custom Exceptions: Meaningful error types
- Logging: Structured logging with Rich
Apply these patterns to build professional-grade automation tools.