Python Async

Asynchronous programming with asyncio coroutines, tasks, and event loops.

Core Concepts

async def and await — define a coroutine and wait for its result
import asyncio

async def fetch_config(host: str, port: int = 22) -> str:
    """Open a TCP connection to *host* and return one read of data as text.

    Calling this function only creates a coroutine object; nothing runs
    until it is awaited (or handed to ``asyncio.run``).

    Args:
        host: Hostname or IP address to connect to.
        port: TCP port to connect to. Defaults to 22.

    Returns:
        Up to 4096 bytes from a single read, decoded with the default
        codec (UTF-8).

    Raises:
        OSError: If the connection cannot be established.
        UnicodeDecodeError: If the received bytes are not valid UTF-8.
    """
    reader, writer = await asyncio.open_connection(host, port)
    try:
        data = await reader.read(4096)
    finally:
        # Close the transport even if the read fails or the coroutine is
        # cancelled; otherwise the socket stays open until it is collected.
        writer.close()
        await writer.wait_closed()
    return data.decode()

# Run from synchronous code: asyncio.run() starts an event loop, runs the
# coroutine to completion, and returns its result.
result = asyncio.run(fetch_config("10.50.1.10"))
Why async matters for network automation — one thread, many connections
# Synchronous: 50 devices x 2 seconds each = 100 seconds
# Asynchronous: 50 devices concurrently = ~2 seconds

# The event loop switches between coroutines during I/O waits.
# No threads and no preemption: shared state can only change at an await,
# so most locks are unnecessary (races across awaits are still possible).

Running Coroutines

asyncio.run — entry point from synchronous code; creates an event loop and runs the coroutine until it completes
async def main():
    """Fetch the configuration of ``sw-core-01`` and print it."""
    print(await fetch_config("sw-core-01"))

asyncio.run(main())    # blocks the calling thread until main() finishes
await — pause this coroutine until the awaited one completes
async def backup_device(host: str):
    """Fetch one device's config, save it, and return a size summary.

    Each ``await`` is a suspension point where the event loop may run
    other coroutines.

    Args:
        host: Device to back up.

    Returns:
        A string of the form ``"<host>: <n> bytes"``.
    """
    running_config = await fetch_config(host)   # suspended while fetching
    await save_to_file(host, running_config)    # suspended while saving
    size = len(running_config)
    return f"{host}: {size} bytes"

Concurrent Execution

asyncio.gather — run multiple coroutines concurrently, collect all results
async def backup_all(hosts: list[str]) -> list[str]:
    """Fetch the config of every host concurrently.

    Args:
        hosts: Devices to contact.

    Returns:
        One config per host, in the same order as *hosts*.
    """
    pending = (fetch_config(target) for target in hosts)
    return await asyncio.gather(*pending)

# gather() runs all the coroutines concurrently and returns the results in
# the same order as the input list.
configs = asyncio.run(backup_all(["sw1", "sw2", "sw3", "rtr1"]))
asyncio.gather with error handling — return_exceptions=True returns each failure as a result instead of raising the first exception and losing the other results
async def backup_all_safe(hosts: list[str]):
    """Fetch every host's config concurrently and print a per-host outcome.

    With ``return_exceptions=True`` a failing fetch does not raise out of
    ``gather``; its exception object is placed in the result list at that
    host's position instead.

    Args:
        hosts: Devices to contact.
    """
    results = await asyncio.gather(
        *(fetch_config(h) for h in hosts),
        return_exceptions=True,
    )
    for host, result in zip(hosts, results):
        # Check BaseException, not Exception: a cancelled child is reported
        # as CancelledError, which does not derive from Exception and would
        # otherwise fall through to len() and raise TypeError.
        if isinstance(result, BaseException):
            print(f"{host}: FAILED - {result}")
        else:
            print(f"{host}: {len(result)} bytes")
asyncio.create_task — schedule a coroutine to run in the background
async def monitor(hosts: list[str]):
    """Ping every host in the background and print UP/DOWN for each.

    The pings are scheduled as tasks first, so they make progress while
    ``process_something_else()`` is awaited.

    Args:
        hosts: Devices to ping; each task is named after its host.
    """
    # Start all pings as background tasks
    tasks = [asyncio.create_task(ping(host), name=host) for host in hosts]

    # Do other work while pings run...
    await process_something_else()

    # Now collect results
    for task in tasks:
        result = await task
        # ping() as defined in this file returns (host, is_up). A non-empty
        # tuple is always truthy, so test the flag itself; a plain bool
        # result is still accepted.
        is_up = result[1] if isinstance(result, tuple) else bool(result)
        print(f"{task.get_name()}: {'UP' if is_up else 'DOWN'}")

Timeouts and Cancellation

asyncio.wait_for — cancel a coroutine if it takes too long
async def fetch_with_timeout(host: str) -> str | None:
    """Fetch a host's config, giving up after 10 seconds.

    Args:
        host: Device to contact.

    Returns:
        The config text, or ``None`` if the fetch timed out (a message is
        printed in that case).
    """
    bounded_fetch = asyncio.wait_for(fetch_config(host), timeout=10.0)
    try:
        config = await bounded_fetch
    except asyncio.TimeoutError:
        print(f"{host}: timed out after 10s")
        return None
    return config
asyncio.TaskGroup (Python 3.11+) — structured concurrency, auto-cancels on failure
async def backup_all(hosts: list[str]):
    results = {}
    async with asyncio.TaskGroup() as tg:
        for host in hosts:
            tg.create_task(fetch_and_store(host, results))
    # All tasks complete or all canceled if one raises
    return results

aiohttp

HTTP GET with aiohttp — async HTTP client for API calls
import aiohttp

async def get_ise_endpoints() -> list[dict]:
    """Return the endpoint resources listed by the ISE ERS endpoint API.

    Returns:
        The ``SearchResult.resources`` list from the JSON response.

    Raises:
        aiohttp.ClientResponseError: If the server answers with an HTTP
            error status (400 or above).
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(
            "https://ise-01:9060/ers/config/endpoint",
            headers={"Accept": "application/json"},
            ssl=False,  # NOTE(review): disables TLS certificate verification
        ) as resp:
            # Fail with the HTTP status rather than a KeyError or decode
            # error when the server returns an error body.
            resp.raise_for_status()
            data = await resp.json()
            return data["SearchResult"]["resources"]
Multiple API calls concurrently — fetch from several sources at once
async def fetch_all_apis():
    """Call the three fetch helpers concurrently over one shared session.

    Returns:
        A dict with keys ``"endpoints"``, ``"groups"`` and ``"policies"``
        holding the corresponding helper's result.
    """
    async with aiohttp.ClientSession() as session:
        endpoints, groups, policies = await asyncio.gather(
            fetch_endpoints(session),
            fetch_groups(session),
            fetch_policies(session),
        )
        return {"endpoints": endpoints, "groups": groups, "policies": policies}

async def fetch_endpoints(session: aiohttp.ClientSession) -> dict:
    """GET ``url`` on the given session and return the decoded JSON body."""
    # NOTE(review): `url` and `headers` are not defined in this snippet;
    # presumably module-level values. Confirm before use.
    request = session.get(url, headers=headers, ssl=False)
    async with request as resp:
        return await resp.json()
POST with aiohttp — send data to an API
async def create_endpoint(mac: str, group_id: str):
    """POST a new endpoint to the ISE ERS API.

    Args:
        mac: Sent as the ``mac`` field of the ``ERSEndPoint`` object.
        group_id: Sent as the ``groupId`` field.

    Returns:
        The decoded JSON response body when the server answers 201.

    Raises:
        RuntimeError: If the response status is anything other than 201.
    """
    body = {"ERSEndPoint": {"mac": mac, "groupId": group_id}}
    async with aiohttp.ClientSession() as session:
        request = session.post(
            "https://ise-01:9060/ers/config/endpoint",
            json=body,
            headers={"Content-Type": "application/json"},
            ssl=False,
        )
        async with request as resp:
            if resp.status != 201:
                raise RuntimeError(f"Failed: {resp.status}")
            return await resp.json()

Async Context Managers

async with — resource management for async operations
async def process():
    """Fetch ``url`` and decode its JSON body inside async context managers."""
    # Two async context managers in one statement: the session is entered
    # first, then the response; they are exited in reverse order.
    async with aiohttp.ClientSession() as session, session.get(url) as response:
        payload = await response.json()
    # Both the response and the session are closed here, even if an
    # exception occurred.
Custom async context manager — __aenter__ and __aexit__ for async resources
class AsyncSSHSession:
    """Async context manager that opens an SSH connection on entry.

    Usage: ``async with AsyncSSHSession(host) as conn: ...``; ``conn`` is
    the object returned by ``asyncssh.connect``.
    """

    def __init__(self, host: str):
        """Remember the target host; no connection is made yet."""
        self.host = host

    async def __aenter__(self):
        """Connect to the host and return the connection object."""
        self.conn = await asyncssh.connect(self.host)
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the connection and wait until it is fully shut down.

        Returns:
            ``False``, so an exception raised in the ``async with`` body
            propagates to the caller.
        """
        self.conn.close()
        # close() only starts the shutdown; wait for it to finish so the
        # connection is not left half-closed when the block exits.
        await self.conn.wait_closed()
        return False

async def backup(host: str):
    """Run ``show running-config`` on *host* and return its stdout.

    Args:
        host: Device to connect to through ``AsyncSSHSession``.
    """
    async with AsyncSSHSession(host) as conn:
        completed = await conn.run("show running-config")
        output = completed.stdout
        return output
@asynccontextmanager — generator-based, cleaner than class-based
from contextlib import asynccontextmanager

@asynccontextmanager
async def ssh_session(host: str):
    """Yield an SSH connection to *host* and close it when the block exits.

    Args:
        host: Device to connect to with ``asyncssh.connect``.

    Yields:
        The connection object returned by ``asyncssh.connect``.
    """
    conn = await asyncssh.connect(host)
    try:
        yield conn
    finally:
        conn.close()
        # close() only starts the shutdown; wait for it to finish so the
        # connection is fully released before the caller moves on.
        await conn.wait_closed()

async def backup(host: str):
    """Run ``show running-config`` on *host* and return its stdout.

    Args:
        host: Device to connect to through ``ssh_session``.

    Returns:
        The ``stdout`` attribute of the command result.
    """
    async with ssh_session(host) as conn:
        result = await conn.run("show running-config")
        # Return the output, matching the class-based backup() example;
        # previously the result was computed and then discarded.
        return result.stdout

Async Iteration

async for — iterate over async data sources
async def read_log_stream(host: str):
    """Send an alert for every streamed line that contains "CRITICAL".

    Args:
        host: Passed to ``connect_syslog`` to open the stream.
    """
    async with connect_syslog(host) as stream:
        async for entry in stream:
            if "CRITICAL" not in entry:
                continue
            await send_alert(entry)

Semaphore (Rate Limiting)

Semaphore — limit concurrent operations to avoid overwhelming targets
async def backup_all(hosts: list[str], max_concurrent: int = 10):
    """Fetch every host's config with a cap on simultaneous fetches.

    Args:
        hosts: Devices to contact.
        max_concurrent: Maximum number of fetches in flight at once.

    Returns:
        One config per host, in the same order as *hosts*.
    """
    gate = asyncio.Semaphore(max_concurrent)

    async def limited_backup(host: str):
        # At most `max_concurrent` coroutines are inside this block at once.
        async with gate:
            config = await fetch_config(host)
        return config

    jobs = (limited_backup(target) for target in hosts)
    return await asyncio.gather(*jobs)

Practical Async: Device Scanner

Concurrent device scanner — ping sweep with rate limiting and timeout
import asyncio

async def ping(host: str, timeout: float = 2.0) -> tuple[str, bool]:
    """Send one ping to *host* and report whether it succeeded.

    Args:
        host: Address passed to the system ``ping`` command.
        timeout: Passed to ``ping -W`` truncated to whole seconds; the
            subprocess itself is given ``timeout + 1`` seconds to exit.

    Returns:
        ``(host, True)`` if ``ping`` exited with status 0, otherwise
        ``(host, False)``, including when the subprocess timed out.
    """
    proc = await asyncio.create_subprocess_exec(
        "ping", "-c", "1", "-W", str(int(timeout)), host,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL,
    )
    try:
        await asyncio.wait_for(proc.wait(), timeout=timeout + 1)
    except asyncio.TimeoutError:
        # wait_for only stops waiting; the child keeps running. Kill and
        # reap it so timed-out pings do not pile up as stray processes.
        try:
            proc.kill()
        except ProcessLookupError:
            pass  # the process exited on its own in the meantime
        await proc.wait()
        return host, False
    return host, proc.returncode == 0

async def scan_subnet(subnet: str, max_concurrent: int = 50):
    """Ping addresses .1 through .254 under the subnet's first three octets.

    Args:
        subnet: Dotted address such as ``"10.50.1.0"``; everything before
            the last dot is used as the prefix.
        max_concurrent: Maximum number of pings in flight at once.

    Returns:
        The list of addresses that answered, in ascending host order.
    """
    gate = asyncio.Semaphore(max_concurrent)
    prefix = subnet.rsplit(".", 1)[0]

    async def limited_ping(host):
        async with gate:
            return await ping(host)

    probes = (limited_ping(f"{prefix}.{octet}") for octet in range(1, 255))
    outcomes = await asyncio.gather(*probes)

    alive = [address for address, is_up in outcomes if is_up]
    print(f"Found {len(alive)} hosts up in {subnet}")
    return alive

# Script entry point: scan hosts .1-.254 under the 10.50.1 prefix and keep
# the list of responding addresses.
if __name__ == "__main__":
    hosts = asyncio.run(scan_subnet("10.50.1.0"))