Async Programming

When you need to hit 50 ISE nodes simultaneously, sync code is too slow. Async lets you do concurrent I/O efficiently.

Why Async?

# NOTE: illustrative fragment -- `nodes`, `check_node`, and the top-level
# `await` below assume an enclosing `async def` and definitions elsewhere.

# Sync: Sequential requests (slow)
# Time: 50 nodes × 2 seconds = 100 seconds
for node in nodes:
    result = check_node(node)  # Blocks for each

# Async: Concurrent requests (fast)
# Time: ~2 seconds (all at once)
results = await asyncio.gather(*[check_node(node) for node in nodes])

asyncio Basics

Coroutines

import asyncio

# Define coroutine with async def
async def fetch_data(host: str) -> dict:
    """Simulate fetching data from *host*.

    Calling this creates a coroutine object; nothing executes until it
    is awaited (or wrapped in a task).
    """
    await asyncio.sleep(1)  # non-blocking pause; other coroutines may run
    payload = {"host": host, "status": "ok"}
    return payload

# Running coroutines
# Demo entry point: one awaited coroutine, then three run concurrently.
async def main():
    # A single await runs one coroutine to completion
    single = await fetch_data("ise-01")
    print(single)

    # gather() schedules all three before awaiting any of them
    hosts = ("ise-01", "ise-02", "ise-03")
    combined = await asyncio.gather(*(fetch_data(h) for h in hosts))
    print(combined)

# Entry point
asyncio.run(main())

Running Async Code

import asyncio

async def main():
    """Print two words with a non-blocking one-second pause between them."""
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# Option 1: asyncio.run (recommended for scripts) -- creates a fresh
# event loop, runs the coroutine to completion, then closes the loop.
asyncio.run(main())

# Option 2: Manage a loop explicitly (for frameworks that need one).
# NOTE: asyncio.get_event_loop() is deprecated when no loop is running
# (deprecation started in 3.10, warning emitted in 3.12+), and the loop
# asyncio.run() used above is already closed -- so create a dedicated
# loop and close it when done.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

# Option 3: From sync function
def sync_wrapper():
    """Bridge for sync callers: run main() to completion and return its result."""
    return asyncio.run(main())

await

await suspends the current coroutine until the awaited result is ready, letting other coroutines run on the event loop in the meantime.

async def process_nodes(nodes: list[str]) -> list[dict]:
    """Fetch each node one after another (anti-pattern shown on purpose)."""
    # BAD: every await finishes before the next fetch even starts, so the
    # total time is the SUM of all request times -- async buys nothing here.
    collected: list[dict] = []
    for host in nodes:
        collected.append(await fetch_data(host))
    return collected

async def process_nodes_concurrent(nodes: list[str]) -> list[dict]:
    """Fetch all nodes at once; total time is roughly the slowest fetch."""
    # GOOD: gather() schedules every coroutine before awaiting any of them
    coros = (fetch_data(host) for host in nodes)
    return await asyncio.gather(*coros)

Concurrent Execution

asyncio.gather

import asyncio

async def check_node(host: str) -> dict:
    """Pretend to health-check *host*; always reports ok after ~1 second."""
    await asyncio.sleep(1)  # stands in for a real network round-trip
    report = {"host": host, "status": "ok"}
    return report

async def main():
    # Plain gather: all three checks run concurrently; we resume once
    # every one of them has finished, results in call order.
    results = await asyncio.gather(
        check_node("ise-01"),
        check_node("ise-02"),
        check_node("ise-03"),
    )
    # results = [{"host": "ise-01", ...}, {"host": "ise-02", ...}, ...]

    # Resilient gather: return_exceptions=True turns a raised exception
    # into a returned value, so one bad host doesn't cancel the rest.
    results = await asyncio.gather(
        check_node("ise-01"),
        check_node("bad-host"),  # Might fail
        check_node("ise-03"),
        return_exceptions=True,
    )
    for outcome in results:
        if isinstance(outcome, Exception):
            print(f"Error: {outcome}")
        else:
            print(f"Success: {outcome}")

asyncio.create_task

import asyncio

async def background_sync():
    """Loop forever, printing a sync message once a minute.

    Intended to be started with asyncio.create_task() and stopped via
    task.cancel(); cancellation surfaces at the sleep's await point.
    """
    while True:
        print("Syncing...")
        await asyncio.sleep(60)

async def main():
    # Schedule the background job; create_task starts it running without
    # blocking this coroutine.
    job = asyncio.create_task(background_sync())

    # Simulate foreground work while the task runs alongside
    await asyncio.sleep(5)
    print("Main work done")

    # Orderly shutdown: request cancellation, then await the task and
    # absorb the CancelledError it finishes with.
    job.cancel()
    try:
        await job
    except asyncio.CancelledError:
        print("Sync task cancelled")

asyncio.run(main())

asyncio.wait

import asyncio

async def main():
    """Demonstrate asyncio.wait: first-completed mode, then timeout mode."""
    tasks = [
        asyncio.create_task(check_node("ise-01")),
        asyncio.create_task(check_node("ise-02")),
        asyncio.create_task(check_node("ise-03")),
    ]

    # Wait for first to complete
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in done:
        print(f"First result: {task.result()}")

    # Cancel remaining
    for task in pending:
        task.cancel()

    # Wait with timeout. FIX: use fresh tasks -- the batch above is already
    # finished or cancelled, so waiting on it again would return instantly
    # (and .result() on a cancelled task raises CancelledError).
    tasks = [asyncio.create_task(check_node(h)) for h in ("ise-01", "ise-02")]
    done, pending = await asyncio.wait(tasks, timeout=5.0)

Semaphores (Rate Limiting)

import asyncio

async def fetch_with_limit(semaphore: asyncio.Semaphore, host: str) -> dict:
    """Fetch *host* while holding one semaphore slot.

    The async-with acquires a slot on entry and always releases it on
    exit, capping how many fetches are in flight at once.
    """
    async with semaphore:
        result = await fetch_data(host)
    return result

async def main():
    # Cap concurrency at 10 in-flight requests; the other 90 coroutines
    # queue up waiting for a free semaphore slot.
    semaphore = asyncio.Semaphore(10)
    hosts = [f"host-{i}" for i in range(100)]

    limited = [fetch_with_limit(semaphore, host) for host in hosts]
    results = await asyncio.gather(*limited)

Async HTTP with httpx

Async Client

import httpx
import asyncio

async def fetch_endpoint(client: httpx.AsyncClient, mac: str) -> dict:
    """Look up one endpoint by MAC address via the ERS API.

    Raises httpx.HTTPStatusError on a non-2xx response.
    """
    # FIX: plain string for the path -- it has no placeholders, so the
    # original f-prefix was extraneous (lint F541).
    response = await client.get("/endpoint", params={"filter": f"mac.EQ.{mac}"})
    response.raise_for_status()
    return response.json()

async def main():
    # One shared client = one connection pool; the async context manager
    # guarantees it is closed even if a request raises.
    async with httpx.AsyncClient(
        base_url="https://ise-01:9060/ers/config",
        auth=("admin", "password"),
        verify=False,  # NOTE: disables TLS verification -- lab use only
        timeout=30.0,
    ) as client:
        # One request at a time
        data = await fetch_endpoint(client, "00:11:22:33:44:55")

        # Fan out: all lookups share the client and run concurrently
        macs = ["00:11:22:33:44:55", "00:11:22:33:44:56", "00:11:22:33:44:57"]
        lookups = (fetch_endpoint(client, mac) for mac in macs)
        results = await asyncio.gather(*lookups)

asyncio.run(main())

Async API Client

import httpx
import asyncio
from typing import AsyncIterator

class AsyncISEClient:
    """Async client for the Cisco ISE ERS API.

    Wraps a shared httpx.AsyncClient and caps request concurrency with a
    10-slot semaphore. Use as an async context manager so the underlying
    connection pool is always closed.
    """

    def __init__(self, base_url: str, username: str, password: str):
        self.client = httpx.AsyncClient(
            base_url=f"{base_url}/ers/config",
            auth=(username, password),
            verify=False,  # NOTE(review): TLS verification disabled -- lab use only
            timeout=30.0,
            headers={"Accept": "application/json"}
        )
        self._semaphore = asyncio.Semaphore(10)  # Rate limit

    async def _request(self, method: str, url: str, **kwargs) -> dict:
        """Issue one rate-limited request; return parsed JSON, {} if no body.

        Raises httpx.HTTPStatusError on a non-2xx response.
        """
        async with self._semaphore:
            response = await self.client.request(method, url, **kwargs)
            response.raise_for_status()
            return response.json() if response.content else {}

    async def get_endpoints(self, page_size: int = 100) -> AsyncIterator[dict]:
        """Async generator for endpoints."""
        page = 1
        while True:
            # One page of endpoint summaries
            data = await self._request(
                "GET", "/endpoint",
                params={"page": page, "size": page_size}
            )
            resources = data.get("SearchResult", {}).get("resources", [])

            # Fetch details concurrently
            details = await asyncio.gather(*[
                self._request("GET", f"/endpoint/{r['id']}")
                for r in resources
            ])

            for detail in details:
                # assumes the detail payload is keyed "ERSEndPoint" per the
                # ERS schema; falls back to the raw dict otherwise
                yield detail.get("ERSEndPoint", detail)

            # Stop once we've paged past the server-reported total
            total = data.get("SearchResult", {}).get("total", 0)
            if page * page_size >= total:
                break
            page += 1

    async def check_nodes(self, hosts: list[str]) -> list[dict]:
        """Check multiple nodes concurrently."""
        async def check_one(host: str) -> dict:
            # Absolute URL here overrides the client's base_url
            try:
                response = await self.client.get(
                    f"https://{host}:9060/ers/config/deploymentinfo/getAllInfo"
                )
                return {"host": host, "status": "ok", "data": response.json()}
            except Exception as e:
                # Broad catch is deliberate: one bad node must not sink the batch
                return {"host": host, "status": "error", "error": str(e)}

        return await asyncio.gather(*[check_one(host) for host in hosts])

    async def close(self):
        """Release the underlying HTTP connection pool."""
        await self.client.aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        await self.close()

# Usage
async def main():
    # The async-with guarantees client.aclose() runs even on error
    async with AsyncISEClient("https://ise-01:9060", "admin", "password") as client:
        # Fan a health check out to every node at once
        nodes = ["ise-01", "ise-02", "ise-03", "ise-04"]
        results = await client.check_nodes(nodes)
        for r in results:
            print(f"{r['host']}: {r['status']}")

        # Stream endpoints page by page from the async generator
        async for endpoint in client.get_endpoints():
            print(endpoint["mac"])

asyncio.run(main())

Async Patterns

Producer-Consumer

import asyncio
from collections.abc import AsyncIterator

async def producer(queue: asyncio.Queue, items: list[str]):
    """Feed *items* into the queue, blocking whenever the queue is full.

    FIX: no end-of-stream sentinel is enqueued here. The orchestrator
    (main) already puts one None per consumer after the producer finishes;
    the original extra None meant one more sentinel than consumers, leaving
    a stray item sitting in the queue after every worker had exited.
    """
    for item in items:
        await queue.put(item)

async def consumer(queue: asyncio.Queue, worker_id: int):
    """Drain the queue until a None sentinel tells this worker to stop."""
    while True:
        item = await queue.get()
        if item is None:
            # Sentinel received: acknowledge it and shut the worker down
            queue.task_done()
            return
        print(f"Worker {worker_id} processing {item}")
        await asyncio.sleep(0.5)  # Simulate work
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)
    work = [f"item-{i}" for i in range(20)]

    # One producer feeding three competing consumers
    feeder = asyncio.create_task(producer(queue, work))
    workers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]

    # Let the producer finish enqueueing everything
    await feeder

    # One sentinel per worker tells each consumer to shut down
    for _ in workers:
        await queue.put(None)

    # Block until every worker has exited
    await asyncio.gather(*workers)

asyncio.run(main())

Async Context Manager

import asyncio

class AsyncDatabase:
    """Toy async database illustrating the async context-manager protocol."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self._connection = None  # truthy while connected, else None

    async def connect(self):
        """Open the (simulated) connection."""
        print(f"Connecting to {self.connection_string}")
        await asyncio.sleep(0.5)  # pretend handshake latency
        self._connection = "connected"

    async def disconnect(self):
        """Close the (simulated) connection."""
        print("Disconnecting")
        await asyncio.sleep(0.1)
        self._connection = None

    async def query(self, sql: str) -> list:
        """Run *sql*; requires an open connection."""
        if not self._connection:
            raise RuntimeError("Not connected")
        await asyncio.sleep(0.2)  # pretend query latency
        return [{"result": "data"}]

    async def __aenter__(self):
        # `async with AsyncDatabase(...)` connects on entry...
        await self.connect()
        return self

    async def __aexit__(self, *args):
        # ...and always disconnects on exit, even after an exception.
        await self.disconnect()

# Usage
async def main():
    # connect() and disconnect() are driven by the context manager
    async with AsyncDatabase("postgres://localhost/db") as db:
        rows = await db.query("SELECT * FROM users")
        print(rows)

Async Generator

import asyncio
from typing import AsyncIterator

async def fetch_pages(client, url: str) -> AsyncIterator[dict]:
    """Yield items from *url* one page at a time until a page comes back empty."""
    page = 1
    while True:
        payload = await client.get(url, params={"page": page})
        batch = payload.get("items", [])
        if not batch:
            return  # an empty page means everything has been consumed

        for entry in batch:
            yield entry

        page += 1

# Usage
async def main():
    # FIX: this snippet's imports don't include httpx, so bring it into
    # scope here before constructing the client.
    import httpx

    async with httpx.AsyncClient() as client:
        async for item in fetch_pages(client, "/api/endpoints"):
            print(item)

Timeout Handling

import asyncio

async def slow_operation():
    """Finish after ~10 seconds -- deliberately slower than the timeouts below."""
    await asyncio.sleep(10)
    return "done"

async def main():
    """Timeout handling: wait_for on one coroutine, then per-host timeouts."""
    # Timeout single operation
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Operation timed out")

    # Timeout with gather: wrap each check so a slow host produces a
    # "timeout" record instead of raising and aborting the whole batch.
    async def check_with_timeout(host: str, timeout: float = 5.0):
        try:
            return await asyncio.wait_for(check_node(host), timeout=timeout)
        except asyncio.TimeoutError:
            return {"host": host, "status": "timeout"}

    # FIX: `hosts` was referenced but never defined in the original snippet
    hosts = ["ise-01", "ise-02", "ise-03"]
    results = await asyncio.gather(*[
        check_with_timeout(host) for host in hosts
    ])

Mixing Sync and Async

Run Async from Sync

import asyncio

async def async_operation():
    """Toy coroutine: return "done" after a one-second non-blocking pause."""
    await asyncio.sleep(1)
    return "done"

# In sync code: asyncio.run() spins up a loop, runs the coroutine to
# completion, and tears the loop down again.
def sync_function():
    outcome = asyncio.run(async_operation())
    return outcome

# Or manage a loop by hand. FIX: asyncio.get_event_loop() is deprecated
# when no loop is running (since Python 3.10; warns in 3.12+), so create
# a dedicated loop -- and make sure it gets closed.
def sync_function_existing_loop():
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(async_operation())
    finally:
        loop.close()

Run Sync from Async

import asyncio
from concurrent.futures import ThreadPoolExecutor

def blocking_io():
    """CPU-bound or blocking operation (stands in for real blocking work)."""
    import time
    time.sleep(2)  # blocks the calling thread -- cannot be awaited
    return "result"

async def main():
    """Offload blocking work to a thread pool from inside a coroutine."""
    # FIX: inside a running coroutine the documented call is
    # get_running_loop(); get_event_loop() is deprecated for this use.
    loop = asyncio.get_running_loop()

    # Run in thread pool (for I/O bound) -- None uses the default executor
    result = await loop.run_in_executor(None, blocking_io)

    # Or with explicit pool
    with ThreadPoolExecutor(max_workers=4) as pool:
        result = await loop.run_in_executor(pool, blocking_io)

asyncio.run(main())

Best Practices

# NOTE: the fragments below are illustrative -- top-level `async with` /
# `await` require an enclosing coroutine, and names like make_request(),
# do_work(), cleanup(), inner(), and background_job() are defined elsewhere.

# 1. Use async with for clients
async with httpx.AsyncClient() as client:
    pass  # Client properly closed

# 2. Use semaphores for rate limiting
semaphore = asyncio.Semaphore(10)
async with semaphore:
    await make_request()

# 3. Handle cancellation gracefully
async def worker():
    try:
        while True:
            await do_work()
    except asyncio.CancelledError:
        await cleanup()
        raise  # Re-raise to signal cancellation

# 4. Use gather with return_exceptions for resilience
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
    if isinstance(result, Exception):
        handle_error(result)

# 5. Don't mix asyncio.run() calls
# BAD: Nested asyncio.run()
async def outer():
    asyncio.run(inner())  # Error!

# GOOD: await directly
async def outer():
    await inner()

# 6. Use asyncio.create_task for fire-and-forget
task = asyncio.create_task(background_job())
# Don't forget to await or cancel later

Next Module

Testing - pytest, fixtures, mocking, coverage.