Python Async
Asynchronous programming with asyncio coroutines, tasks, and event loops.
Core Concepts
async def and await — define a coroutine and wait for its result
import asyncio
async def fetch_config(host: str, port: int = 22, limit: int = 4096) -> str:
    """Coroutine -- does not run until awaited.

    Open a TCP connection to *host*:*port*, read up to *limit* bytes,
    and return them decoded as text.

    Args:
        host: Hostname or IP address to connect to.
        port: TCP port (default 22, as in the original example).
        limit: Maximum number of bytes to read (default 4096).

    Returns:
        The received bytes decoded with the default (UTF-8) codec.
    """
    reader, writer = await asyncio.open_connection(host, port)
    try:
        data = await reader.read(limit)
    finally:
        # BUG FIX: the original closed the writer only on the success path,
        # leaking the socket if read() raised. Always close the transport.
        writer.close()
        await writer.wait_closed()
    return data.decode()
# Run from synchronous code
# asyncio.run() creates a fresh event loop, runs the coroutine to
# completion, and closes the loop -- the standard entry point.
result = asyncio.run(fetch_config("10.50.1.10"))
Why async matters for network automation — one thread, many connections
# Synchronous: 50 devices x 2 seconds each = 100 seconds
# Asynchronous: 50 devices concurrently = ~2 seconds
# The event loop switches between coroutines during I/O waits.
# No threads, no locks, no race conditions on shared data.
Running Coroutines
asyncio.run — entry point from synchronous code, creates event loop and runs until complete
async def main():
    """Top-level coroutine: fetch one device config and print it."""
    device_config = await fetch_config("sw-core-01")
    print(device_config)

asyncio.run(main())  # blocks until main() finishes
await — pause this coroutine until the awaited one completes
async def backup_device(host: str):
    """Fetch a device's config, persist it, and return a summary line.

    Each `await` suspends this coroutine so the event loop can run
    other coroutines while the I/O is in flight.
    """
    config = await fetch_config(host)  # suspended during network I/O
    await save_to_file(host, config)   # suspended again during file I/O
    return f"{host}: {len(config)} bytes"
Concurrent Execution
asyncio.gather — run multiple coroutines concurrently, collect all results
async def backup_all(hosts: list[str]) -> list[str]:
    """Fetch every host's config concurrently.

    Returns the configs in the same order as *hosts* -- gather
    preserves input order regardless of completion order.
    """
    return await asyncio.gather(*(fetch_config(h) for h in hosts))
# All hosts contacted simultaneously, results in same order as input
configs = asyncio.run(backup_all(["sw1", "sw2", "sw3", "rtr1"]))
asyncio.gather with error handling — return_exceptions=True returns failures as exception objects in the results list instead of raising the first exception
async def backup_all_safe(hosts: list[str]):
    """Back up all hosts, reporting per-host failures instead of raising.

    With return_exceptions=True, gather places each exception into the
    results list rather than propagating the first one.
    """
    outcomes = await asyncio.gather(
        *(fetch_config(h) for h in hosts),
        return_exceptions=True,  # failures come back as exception objects
    )
    for host, outcome in zip(hosts, outcomes):
        if isinstance(outcome, Exception):
            print(f"{host}: FAILED - {outcome}")
        else:
            print(f"{host}: {len(outcome)} bytes")
asyncio.create_task — schedule a coroutine to run in the background
async def monitor(hosts: list[str]):
    """Start one background ping task per host, then report results.

    create_task() schedules each coroutine immediately; the pings make
    progress whenever this coroutine awaits something else.
    """
    pending = [asyncio.create_task(ping(host), name=host) for host in hosts]
    # Unrelated work runs while the pings proceed in the background.
    await process_something_else()
    # Awaiting a finished task just returns its result immediately.
    for task in pending:
        up = await task
        print(f"{task.get_name()}: {'UP' if up else 'DOWN'}")
Timeouts and Cancellation
asyncio.wait_for — cancel a coroutine if it takes too long
async def fetch_with_timeout(host: str) -> str | None:
    """Fetch a config, giving up after 10 seconds.

    Returns:
        The config text, or None if the fetch timed out (the inner
        coroutine is canceled by wait_for on timeout).
    """
    try:
        config = await asyncio.wait_for(fetch_config(host), timeout=10.0)
    except asyncio.TimeoutError:
        print(f"{host}: timed out after 10s")
        return None
    return config
asyncio.TaskGroup (Python 3.11+) — structured concurrency, auto-cancels on failure
async def backup_all(hosts: list[str]):
    """Back up all hosts with structured concurrency (Python 3.11+).

    If any task raises, the TaskGroup cancels the remaining tasks and
    re-raises the failures as an ExceptionGroup on exit.
    """
    results: dict = {}
    async with asyncio.TaskGroup() as tg:
        for target in hosts:
            tg.create_task(fetch_and_store(target, results))
    # Reaching this line means every task completed successfully.
    return results
aiohttp
HTTP GET with aiohttp — async HTTP client for API calls
import aiohttp
async def get_ise_endpoints() -> list[dict]:
    """Return the endpoint list from the ISE ERS REST API.

    NOTE(review): ssl=False disables certificate verification --
    presumably intentional for a lab ISE node; confirm before production use.
    """
    async with aiohttp.ClientSession() as session:
        request = session.get(
            "https://ise-01:9060/ers/config/endpoint",
            headers={"Accept": "application/json"},
            ssl=False,
        )
        async with request as resp:
            payload = await resp.json()
            return payload["SearchResult"]["resources"]
Multiple API calls concurrently — fetch from several sources at once
async def fetch_all_apis():
    """Query three API resources concurrently over one shared session.

    Returns a dict mapping resource name -> parsed JSON response.
    """
    async with aiohttp.ClientSession() as session:
        # gather() accepts the coroutine objects directly; all three
        # requests are in flight at the same time.
        results = await asyncio.gather(
            fetch_endpoints(session),
            fetch_groups(session),
            fetch_policies(session),
        )
        return dict(zip(["endpoints", "groups", "policies"], results))
async def fetch_endpoints(session: aiohttp.ClientSession) -> dict:
    """Fetch one API resource as parsed JSON using a shared session.

    NOTE(review): `url` and `headers` are not defined in this snippet --
    presumably module-level placeholders; confirm before reuse.
    ssl=False disables certificate verification.
    """
    async with session.get(url, headers=headers, ssl=False) as resp:
        return await resp.json()
POST with aiohttp — send data to an API
async def create_endpoint(mac: str, group_id: str):
    """POST a new endpoint record to the ISE ERS API.

    Args:
        mac: MAC address of the endpoint.
        group_id: ISE endpoint-group identifier.

    Returns:
        The parsed JSON response on HTTP 201 (Created).

    Raises:
        RuntimeError: for any other response status.
    """
    body = {"ERSEndPoint": {"mac": mac, "groupId": group_id}}
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://ise-01:9060/ers/config/endpoint",
            json=body,
            headers={"Content-Type": "application/json"},
            ssl=False,  # NOTE(review): cert verification disabled -- lab only?
        ) as resp:
            if resp.status != 201:
                raise RuntimeError(f"Failed: {resp.status}")
            return await resp.json()
Async Context Managers
Async with — resource management for async operations
async def process():
    """Demonstrate nested async context managers for HTTP resources.

    NOTE(review): `url` is not defined in this snippet -- presumably a
    module-level placeholder; confirm before reuse.
    """
    async with aiohttp.ClientSession() as session:  # async context manager
        async with session.get(url) as response:  # another one, nested
            data = await response.json()
    # session is closed here, even if an exception occurred
Custom async context manager — implement __aenter__ and __aexit__ for async resources
class AsyncSSHSession:
    """Async context manager wrapping an asyncssh connection.

    Usage: `async with AsyncSSHSession(host) as conn: ...`
    The connection is opened in __aenter__ and closed in __aexit__.
    """

    def __init__(self, host: str):
        # Target hostname; connection is opened lazily on entry.
        self.host = host

    async def __aenter__(self):
        self.conn = await asyncssh.connect(self.host)
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.conn.close()
        # Returning False propagates any exception from the with-body.
        return False
async def backup(host: str):
    """Return the running config of *host* via the SSH session wrapper."""
    async with AsyncSSHSession(host) as conn:
        output = await conn.run("show running-config")
        return output.stdout
@asynccontextmanager — generator-based, cleaner than class-based
from contextlib import asynccontextmanager
@asynccontextmanager
async def ssh_session(host: str):
    """Yield an open asyncssh connection to *host*; always close it.

    The finally block guarantees cleanup whether the with-body
    completes normally or raises.
    """
    connection = await asyncssh.connect(host)
    try:
        yield connection
    finally:
        connection.close()
async def backup(host: str):
    """Return the running config of *host* using the ssh_session manager.

    BUG FIX: the original assigned `result` but never returned it, so the
    function always returned None -- inconsistent with the class-based
    version of backup() above, which returns result.stdout.
    """
    async with ssh_session(host) as conn:
        result = await conn.run("show running-config")
        return result.stdout
Async Iteration
async for — iterate over async data sources
async def read_log_stream(host: str):
    """Tail *host*'s syslog stream, alerting on CRITICAL lines.

    `async for` suspends between lines, so other coroutines run while
    this one waits for the next log entry.
    """
    async with connect_syslog(host) as stream:
        async for entry in stream:
            if "CRITICAL" not in entry:
                continue
            await send_alert(entry)
Semaphore (Rate Limiting)
Semaphore — limit concurrent operations to avoid overwhelming targets
async def backup_all(hosts: list[str], max_concurrent: int = 10):
    """Back up all hosts with at most *max_concurrent* fetches in flight.

    The semaphore acts as a rate limiter: coroutines beyond the limit
    wait at `async with gate` until a slot frees up.
    """
    gate = asyncio.Semaphore(max_concurrent)

    async def throttled(host: str):
        # Only max_concurrent coroutines may be inside this block at once.
        async with gate:
            return await fetch_config(host)

    return await asyncio.gather(*(throttled(h) for h in hosts))
Practical Async: Device Scanner
Concurrent device scanner — ping sweep with rate limiting and timeout
import asyncio
async def ping(host: str, timeout: float = 2.0) -> tuple[str, bool]:
    """Ping *host* once; return (host, reachable).

    Args:
        host: Hostname or IP address to ping.
        timeout: Seconds passed to ping's own -W deadline; the outer
            wait allows one extra second before giving up.

    Returns:
        (host, True) if ping exited 0, otherwise (host, False).
    """
    proc = await asyncio.create_subprocess_exec(
        "ping", "-c", "1", "-W", str(int(timeout)), host,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL,
    )
    try:
        await asyncio.wait_for(proc.wait(), timeout=timeout + 1)
    except asyncio.TimeoutError:
        # BUG FIX: the original left the subprocess running on timeout,
        # leaking processes during large sweeps. Kill it and reap it.
        proc.kill()
        await proc.wait()
        return host, False
    return host, proc.returncode == 0
async def scan_subnet(subnet: str, max_concurrent: int = 50):
    """Ping-sweep the /24 derived from *subnet*; return responsive hosts.

    Args:
        subnet: Dotted-quad network address (e.g. "10.50.1.0"); the last
            octet is replaced with 1..254.
        max_concurrent: Cap on simultaneous ping subprocesses.
    """
    gate = asyncio.Semaphore(max_concurrent)
    prefix = subnet.rsplit(".", 1)[0]

    async def throttled_ping(addr):
        async with gate:
            return await ping(addr)

    outcomes = await asyncio.gather(
        *(throttled_ping(f"{prefix}.{octet}") for octet in range(1, 255))
    )
    alive = [addr for addr, reachable in outcomes if reachable]
    print(f"Found {len(alive)} hosts up in {subnet}")
    return alive
# Script entry point: run the sweep only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    hosts = asyncio.run(scan_subnet("10.50.1.0"))