Async Programming
When you need to query 50 ISE nodes at once, synchronous code is too slow because each request waits for the previous one to finish. Async code lets you overlap that I/O efficiently.
Why Async?
# Sync: Sequential requests (slow)
# Time: 50 nodes × 2 seconds = 100 seconds
for node in nodes:
result = check_node(node) # Blocks for each
# Async: Concurrent requests (fast)
# Time: ~2 seconds (all at once)
results = await asyncio.gather(*[check_node(node) for node in nodes])
asyncio Basics
Coroutines
import asyncio
# Define coroutine with async def
async def fetch_data(host: str) -> dict:
    """Simulate a one-second fetch from ``host`` and report success.

    Calling this function only creates a coroutine object; the body runs
    once that object is awaited.
    """
    await asyncio.sleep(1)  # Yields to the event loop instead of blocking it
    payload = {"host": host, "status": "ok"}
    return payload
# Running coroutines
async def main():
    """Await one coroutine, then three concurrently, printing each result."""
    # Await single coroutine
    single = await fetch_data("ise-01")
    print(single)

    # Await multiple concurrently
    first = fetch_data("ise-01")
    second = fetch_data("ise-02")
    third = fetch_data("ise-03")
    combined = await asyncio.gather(first, second, third)
    print(combined)


# Entry point
asyncio.run(main())
Running Async Code
import asyncio
async def main():
    """Print "Hello", sleep one second without blocking the loop, print "World"."""
    print("Hello")
    await asyncio.sleep(1)
    print("World")
# Option 1: asyncio.run (recommended for scripts)
asyncio.run(main())

# Option 2: Manage an event loop explicitly (lower-level; prefer asyncio.run).
# asyncio.get_event_loop() is deprecated when no loop is running, so create
# the loop yourself and always close it. Where a loop is already running
# (notebooks, async frameworks), use `await main()` instead:
# run_until_complete() raises RuntimeError on a running loop.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()
# Option 3: From sync function
def sync_wrapper():
    """Run ``main()`` to completion from synchronous code and return its result."""
    outcome = asyncio.run(main())
    return outcome
await
`await` pauses the current coroutine until the awaited result is ready, allowing other coroutines to run in the meantime.
async def process_nodes(nodes: list[str]) -> list[dict]:
    """Fetch every node one after another (anti-pattern shown for contrast)."""
    # BAD: Sequential, doesn't benefit from async --
    # each await finishes before the next fetch is even started
    return [await fetch_data(node) for node in nodes]
async def process_nodes_concurrent(nodes: list[str]) -> list[dict]:
    """Fetch all nodes concurrently and return their results."""
    # GOOD: Concurrent execution -- every fetch is created before any is awaited
    fetches = [fetch_data(node) for node in nodes]
    results = await asyncio.gather(*fetches)
    return results
Concurrent Execution
asyncio.gather
import asyncio
async def check_node(host: str) -> dict:
    """Simulate a one-second health check of ``host`` that always succeeds."""
    await asyncio.sleep(1)
    status = {"host": host, "status": "ok"}
    return status
async def main():
    """Demonstrate ``asyncio.gather`` with and without ``return_exceptions``."""
    # Run all concurrently, wait for all
    results = await asyncio.gather(
        check_node("ise-01"),
        check_node("ise-02"),
        check_node("ise-03"),
    )
    # results = [{"host": "ise-01", ...}, {"host": "ise-02", ...}, ...]

    # Handle errors without stopping others
    results = await asyncio.gather(
        check_node("ise-01"),
        check_node("bad-host"),  # Might fail
        check_node("ise-03"),
        return_exceptions=True,  # Returns exception objects instead of raising
    )
    for result in results:
        # Test against BaseException rather than Exception:
        # asyncio.CancelledError derives from BaseException on Python 3.8+
        # and can show up in the result list for a cancelled awaitable.
        if isinstance(result, BaseException):
            print(f"Error: {result}")
        else:
            print(f"Success: {result}")
asyncio.create_task
import asyncio
async def background_sync():
    """Loop forever: print "Syncing...", then sleep for 60 seconds."""
    interval = 60
    while True:
        print("Syncing...")
        await asyncio.sleep(interval)
async def main():
    """Run ``background_sync`` alongside other work, then cancel it."""
    # Start background task
    syncer = asyncio.create_task(background_sync())

    # Do other work
    await asyncio.sleep(5)
    print("Main work done")

    # Cancel background task, then await it so the cancellation is observed
    syncer.cancel()
    try:
        await syncer
    except asyncio.CancelledError:
        print("Sync task cancelled")


asyncio.run(main())
asyncio.wait
import asyncio
async def main():
    """Show ``asyncio.wait`` with ``FIRST_COMPLETED`` and with a timeout."""
    tasks = [
        asyncio.create_task(check_node(host))
        for host in ("ise-01", "ise-02", "ise-03")
    ]

    # Wait for first to complete
    finished, unfinished = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )
    for task in finished:
        print(f"First result: {task.result()}")

    # Cancel remaining
    for task in unfinished:
        task.cancel()

    # Wait with timeout
    done, pending = await asyncio.wait(tasks, timeout=5.0)
Semaphores (Rate Limiting)
import asyncio
async def fetch_with_limit(semaphore: asyncio.Semaphore, host: str) -> dict:
    """Fetch ``host`` while holding one slot of ``semaphore``."""
    async with semaphore:  # Only N concurrent; waits here while all slots are taken
        result = await fetch_data(host)
    return result
async def main():
    """Fetch 100 hosts with at most 10 fetches in flight at any moment."""
    # Limit to 10 concurrent requests
    limiter = asyncio.Semaphore(10)
    pending = [fetch_with_limit(limiter, f"host-{i}") for i in range(100)]
    results = await asyncio.gather(*pending)
Async HTTP with httpx
Async Client
import httpx
import asyncio
async def fetch_endpoint(client: httpx.AsyncClient, mac: str) -> dict:
    """Look up an endpoint by MAC address and return the decoded JSON body.

    Args:
        client: Client to send the request with; the path ``/endpoint`` is
            relative, so the client is expected to carry a ``base_url``.
        mac: MAC address placed in the ``mac.EQ.<mac>`` filter parameter.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an error response.
    """
    # Plain string: the path has no placeholders, so no f-string is needed
    response = await client.get("/endpoint", params={"filter": f"mac.EQ.{mac}"})
    response.raise_for_status()
    return response.json()
async def main():
    """Query one endpoint, then three concurrently, over one shared client."""
    settings = {
        "base_url": "https://ise-01:9060/ers/config",
        "auth": ("admin", "password"),
        "verify": False,  # NOTE: turns off TLS certificate verification
        "timeout": 30.0,
    }
    async with httpx.AsyncClient(**settings) as client:
        # Single request
        data = await fetch_endpoint(client, "00:11:22:33:44:55")

        # Multiple concurrent
        macs = ["00:11:22:33:44:55", "00:11:22:33:44:56", "00:11:22:33:44:57"]
        lookups = [fetch_endpoint(client, mac) for mac in macs]
        results = await asyncio.gather(*lookups)


asyncio.run(main())
Async API Client
import httpx
import asyncio
from typing import AsyncIterator
class AsyncISEClient:
    """Async client for the ISE ERS API built on one shared ``httpx.AsyncClient``.

    Every request issued through this class holds a slot of ``_semaphore``,
    so at most 10 are in flight at once. Use the instance with ``async with``
    so the underlying HTTP client is closed on exit.
    """

    def __init__(self, base_url: str, username: str, password: str):
        """Create the HTTP client rooted at ``{base_url}/ers/config``."""
        self.client = httpx.AsyncClient(
            base_url=f"{base_url}/ers/config",
            auth=(username, password),
            verify=False,  # NOTE: disables TLS certificate verification
            timeout=30.0,
            headers={"Accept": "application/json"},
        )
        self._semaphore = asyncio.Semaphore(10)  # Caps concurrent requests

    async def _request(self, method: str, url: str, **kwargs) -> dict:
        """Send one request under the concurrency cap and return its JSON body.

        Returns an empty dict when the response has no content. Propagates
        the error raised by ``raise_for_status()`` for an error response.
        """
        async with self._semaphore:
            response = await self.client.request(method, url, **kwargs)
            response.raise_for_status()
            return response.json() if response.content else {}

    async def get_endpoints(self, page_size: int = 100) -> AsyncIterator[dict]:
        """Async generator for endpoints.

        Walks the ``/endpoint`` listing page by page; for each page, fetches
        the detail record of every listed resource concurrently and yields
        the ``ERSEndPoint`` member of each (or the whole record if absent).
        Stops once ``page * page_size`` reaches the reported total.
        """
        page = 1
        while True:
            data = await self._request(
                "GET", "/endpoint",
                params={"page": page, "size": page_size},
            )
            search_result = data.get("SearchResult", {})
            resources = search_result.get("resources", [])

            # Fetch details concurrently
            details = await asyncio.gather(*[
                self._request("GET", f"/endpoint/{r['id']}")
                for r in resources
            ])
            for detail in details:
                yield detail.get("ERSEndPoint", detail)

            total = search_result.get("total", 0)
            if page * page_size >= total:
                break
            page += 1

    async def check_nodes(self, hosts: list[str]) -> list[dict]:
        """Check multiple nodes concurrently.

        Returns one dict per host, in input order: ``status`` is ``"ok"``
        with the decoded body under ``data``, or ``"error"`` with the error
        text under ``error``. Failures never propagate to the caller.
        """
        async def check_one(host: str) -> dict:
            # Absolute URL: each node is queried directly, not via base_url
            url = f"https://{host}:9060/ers/config/deploymentinfo/getAllInfo"
            try:
                # Hold a semaphore slot like every other request of this client
                async with self._semaphore:
                    response = await self.client.get(url)
                # Without this an HTTP 4xx/5xx reply would be reported as "ok"
                response.raise_for_status()
                return {"host": host, "status": "ok", "data": response.json()}
            except Exception as e:  # Deliberate best-effort: report, don't raise
                return {"host": host, "status": "error", "error": str(e)}

        return await asyncio.gather(*[check_one(host) for host in hosts])

    async def close(self):
        """Close the underlying HTTP client."""
        await self.client.aclose()

    async def __aenter__(self):
        """Return the client itself for use inside ``async with``."""
        return self

    async def __aexit__(self, *args):
        """Close the client when the ``async with`` block exits."""
        await self.close()
# Usage
async def main():
    """Check four nodes, then print the ``mac`` field of every endpoint."""
    async with AsyncISEClient("https://ise-01:9060", "admin", "password") as client:
        # Check all nodes
        nodes = ["ise-01", "ise-02", "ise-03", "ise-04"]
        for report in await client.check_nodes(nodes):
            print(f"{report['host']}: {report['status']}")

        # Iterate endpoints
        endpoints = client.get_endpoints()
        async for endpoint in endpoints:
            print(endpoint["mac"])


asyncio.run(main())
Async Patterns
Producer-Consumer
import asyncio
from collections.abc import AsyncIterator
async def producer(queue: asyncio.Queue, items: list[str]):
    """Put every item from ``items`` onto ``queue``, in order.

    No end-of-stream sentinel is enqueued here. Each consumer needs its own
    ``None`` to stop, and the coordinating ``main`` already enqueues one per
    consumer after this coroutine finishes; an additional sentinel from the
    producer would be left behind in the queue, never read.
    """
    for item in items:
        # put() waits cooperatively while a bounded queue is full
        await queue.put(item)
async def consumer(queue: asyncio.Queue, worker_id: int):
    """Process queue items until a ``None`` sentinel is received."""
    while (item := await queue.get()) is not None:
        print(f"Worker {worker_id} processing {item}")
        await asyncio.sleep(0.5)  # Simulate work
        queue.task_done()
    # The sentinel was taken from the queue too, so mark it as done as well
    queue.task_done()
async def main():
    """Feed 20 items through a bounded queue to three workers, then stop them."""
    queue = asyncio.Queue(maxsize=10)
    items = [f"item-{i}" for i in range(20)]

    # Start producer
    feeder = asyncio.create_task(producer(queue, items))

    # Start multiple consumers
    workers = []
    for worker_id in range(3):
        workers.append(asyncio.create_task(consumer(queue, worker_id)))

    # Wait for producer
    await feeder

    # Signal consumers to stop: one sentinel per worker
    for _ in range(len(workers)):
        await queue.put(None)

    # Wait for consumers
    await asyncio.gather(*workers)


asyncio.run(main())
Async Context Manager
import asyncio
class AsyncDatabase:
    """Toy async database handle usable as an ``async with`` context manager."""

    def __init__(self, connection_string: str):
        self._connection = None  # Stays None until connect() has run
        self.connection_string = connection_string

    async def connect(self):
        """Simulate opening the connection (half-second delay)."""
        print(f"Connecting to {self.connection_string}")
        await asyncio.sleep(0.5)
        self._connection = "connected"

    async def disconnect(self):
        """Simulate closing the connection and clear the connected state."""
        print("Disconnecting")
        await asyncio.sleep(0.1)
        self._connection = None

    async def query(self, sql: str) -> list:
        """Return a canned one-row result; raise RuntimeError if not connected."""
        if self._connection:
            await asyncio.sleep(0.2)
            rows = [{"result": "data"}]
            return rows
        raise RuntimeError("Not connected")

    async def __aenter__(self):
        """Connect on entry to ``async with`` and hand back this object."""
        await self.connect()
        return self

    async def __aexit__(self, *args):
        """Disconnect on exit from ``async with``, error or not."""
        await self.disconnect()
# Usage
async def main():
    """Open the database, run one query, and print the rows."""
    database = AsyncDatabase("postgres://localhost/db")
    async with database as db:
        rows = await db.query("SELECT * FROM users")
        print(rows)
Async Generator
import asyncio
from typing import AsyncIterator
async def fetch_pages(client, url: str) -> AsyncIterator[dict]:
    """Async generator for paginated results.

    Args:
        client: An ``httpx.AsyncClient``-style object whose ``get`` coroutine
            returns a response exposing ``raise_for_status()`` and ``json()``.
        url: Endpoint to query; the page number is sent as the ``page`` param.

    Yields:
        Each element of the ``items`` list of every page, starting at page 1
        and stopping at the first page whose list is empty or missing.

    Raises:
        Whatever ``raise_for_status()`` raises for an error response.
    """
    page = 1
    while True:
        response = await client.get(url, params={"page": page})
        response.raise_for_status()
        # The response object is not the payload: decode the JSON body first
        data = response.json()
        items = data.get("items", [])
        if not items:
            break
        for item in items:
            yield item
        page += 1
# Usage
async def main():
    """Print every item produced by ``fetch_pages`` for ``/api/endpoints``."""
    # NOTE(review): the client has no base_url while the path is relative --
    # confirm how the URL is meant to resolve before running this.
    async with httpx.AsyncClient() as client:
        pages = fetch_pages(client, "/api/endpoints")
        async for item in pages:
            print(item)
Timeout Handling
import asyncio
async def slow_operation():
    """Sleep for ten seconds, then return ``"done"``."""
    delay = 10
    await asyncio.sleep(delay)
    return "done"
async def main():
    """Show per-call timeouts with ``asyncio.wait_for``, alone and with gather."""
    # Timeout single operation
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Operation timed out")

    # Timeout with gather
    async def check_with_timeout(host: str, timeout: float = 5.0):
        """Check one host; report a timeout as a result instead of raising."""
        try:
            return await asyncio.wait_for(check_node(host), timeout=timeout)
        except asyncio.TimeoutError:
            return {"host": host, "status": "timeout"}

    # Defined here so the example runs as written (it was previously undefined)
    hosts = ["ise-01", "ise-02", "ise-03"]
    results = await asyncio.gather(*[
        check_with_timeout(host) for host in hosts
    ])
Mixing Sync and Async
Run Async from Sync
import asyncio
async def async_operation():
    """Sleep for one second, then return ``"done"``."""
    delay = 1
    await asyncio.sleep(delay)
    return "done"
# In sync code
def sync_function():
    """Block until ``async_operation()`` finishes and return its result."""
    coro = async_operation()
    return asyncio.run(coro)
# Or get existing loop (careful with this)
def sync_function_existing_loop():
    """Run ``async_operation()`` on the thread's current loop and return its result.

    Reuses the current event loop when one is set. When none exists,
    ``asyncio.get_event_loop()`` is deprecated and raises ``RuntimeError`` on
    newer Python versions, so a fresh loop is created and installed instead.

    Raises:
        RuntimeError: If the loop is already running (for example when this
            is called from inside a coroutine); use ``await`` there instead.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current loop in this thread: make one and register it for reuse
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.run_until_complete(async_operation())
Run Sync from Async
import asyncio
from concurrent.futures import ThreadPoolExecutor
def blocking_io():
    """CPU-bound or blocking operation."""
    from time import sleep

    sleep(2)  # Blocks the calling thread for two seconds
    return "result"
async def main():
    """Run ``blocking_io`` in worker threads so the event loop is not blocked."""
    # Inside a coroutine, get_running_loop() is the supported way to get the
    # loop; it never creates one implicitly.
    loop = asyncio.get_running_loop()

    # Run in thread pool (for I/O bound)
    result = await loop.run_in_executor(None, blocking_io)

    # Or with explicit pool
    with ThreadPoolExecutor(max_workers=4) as pool:
        result = await loop.run_in_executor(pool, blocking_io)


asyncio.run(main())
Best Practices
# 1. Use async with for clients
async with httpx.AsyncClient() as client:
    pass  # Client properly closed when the block exits

# 2. Use semaphores for rate limiting
# (a semaphore caps how many holders run at once, here 10)
semaphore = asyncio.Semaphore(10)
async with semaphore:
    await make_request()
# 3. Handle cancellation gracefully
async def worker():
    """Call ``do_work()`` in a loop; on cancellation, clean up and propagate."""
    try:
        while True:
            await do_work()
    except asyncio.CancelledError:
        await cleanup()
        raise  # Re-raise to signal cancellation
# 4. Use gather with return_exceptions for resilience
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
handle_error(result)
# 5. Don't mix asyncio.run() calls
# BAD: Nested asyncio.run()
async def outer():
    asyncio.run(inner())  # Error! asyncio.run() cannot be called while a loop is running

# GOOD: await directly
async def outer():
    await inner()
# 6. Use asyncio.create_task for fire-and-forget
# Keep the returned Task in a variable: the event loop holds only a weak
# reference, so an unreferenced task may be garbage-collected before it finishes.
task = asyncio.create_task(background_job())
# Don't forget to await or cancel later
Next Module
Testing - pytest, fixtures, mocking, coverage.