Python Requests
HTTP requests and API interactions with requests library.
Basic Requests
import requests
# GET request
response = requests.get("https://api.example.com/data")
print(response.status_code) # 200
print(response.json()) # Parse JSON response
# POST with JSON body -- json= sends a JSON-encoded body
# (see Common Gotchas: data= would send form-encoded instead)
response = requests.post(
    "https://api.example.com/create",
    json={"name": "value", "count": 42}
)
# POST with form data -- data= with a dict sends form-encoded fields
response = requests.post(
    "https://api.example.com/form",
    data={"field1": "value1", "field2": "value2"}
)
# PUT, PATCH, DELETE use the same calling conventions as get/post
requests.put("https://api.example.com/update/1", json={"name": "updated"})
requests.patch("https://api.example.com/partial/1", json={"status": "active"})
requests.delete("https://api.example.com/delete/1")
Headers and Authentication
import requests
# Custom headers sent with a single request
response = requests.get(
    "https://api.example.com/data",
    headers={
        "Authorization": "Bearer eyJhbGciOiJIUzI1NiIs...",
        "Accept": "application/json",
        "X-Custom-Header": "value"
    }
)
# Basic auth -- (user, password) tuple shorthand for HTTP Basic
response = requests.get(
    "https://api.example.com/data",
    auth=("username", "password")
)
# Bearer token pattern -- build the Authorization header yourself
token = "eyJhbGciOiJIUzI1NiIs..."
headers = {"Authorization": f"Bearer {token}"}
response = requests.get("https://api.example.com/data", headers=headers)
# API key patterns ('url' is a placeholder for the endpoint URL)
# Header
requests.get(url, headers={"X-API-Key": "key123"})
# Query parameter
requests.get(url, params={"api_key": "key123"})
Sessions (Persist State)
import requests
# A Session persists cookies and default headers across requests
session = requests.Session()
# Default headers applied to every request made through this session
session.headers.update({
    "Authorization": "Bearer TOKEN",
    "Accept": "application/json"
})
# All requests use session settings
r1 = session.get("https://api.example.com/endpoint1")
r2 = session.get("https://api.example.com/endpoint2") # Same auth
# Session maintains cookies (login flow)
session.post("https://example.com/login", data={"user": "admin", "pass": "secret"})
# Subsequent requests have session cookie
response = session.get("https://example.com/dashboard")
# Close session when done
session.close()
# Or use a context manager, which closes the session automatically
with requests.Session() as s:
    s.auth = ("user", "pass")
    r = s.get("https://api.example.com/data")
SSL/TLS and Certificates
import requests
# Skip SSL verification (DEVELOPMENT ONLY -- disables TLS validation)
response = requests.get("https://self-signed.example.com", verify=False)
# Suppress the InsecureRequestWarning that verify=False triggers
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Custom CA bundle -- verify against internal/private CA instead
response = requests.get(
    "https://internal.example.com",
    verify="/path/to/ca-bundle.pem"
)
# Client certificate (mTLS) -- separate cert and key files
response = requests.get(
    "https://secure.example.com",
    cert=("/path/to/client.crt", "/path/to/client.key")
)
# Combined cert + key in a single PEM file
response = requests.get(
    "https://secure.example.com",
    cert="/path/to/client.pem"
)
# Client cert (mTLS) combined with custom CA verification of the server
response = requests.get(
    "https://secure.example.com",
    cert=("/path/to/client.crt", "/path/to/client.key"),
    verify="/path/to/ca-bundle.pem"
)
SECURITY: Never use verify=False in production. Configure proper CA trust.
Error Handling
import requests
from requests.exceptions import (
RequestException, HTTPError, ConnectionError,
Timeout, TooManyRedirects
)
# Raise exception for 4xx/5xx responses
response = requests.get("https://api.example.com/data")
response.raise_for_status() # Raises HTTPError if status >= 400
# Comprehensive error handling -- specific exceptions first,
# RequestException (the catch-all base) last
try:
    response = requests.get("https://api.example.com/data", timeout=10)
    response.raise_for_status()
    data = response.json()
except HTTPError as e:
    # HTTPError carries the response, so status and body are available
    print(f"HTTP error: {e.response.status_code} - {e.response.text}")
except ConnectionError:
    print("Connection failed - check network/hostname")
except Timeout:
    print("Request timed out")
except RequestException as e:
    print(f"Request failed: {e}")
# Check status without exception
response = requests.get("https://api.example.com/data")
if response.ok: # status_code < 400
    print(response.json())
else:
    print(f"Error: {response.status_code}")
Timeouts and Retries
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Simple timeout (seconds)
response = requests.get("https://api.example.com", timeout=10)
# Connect and read timeout specified separately
response = requests.get(
    "https://api.example.com",
    timeout=(3.05, 27) # (connect, read)
)
# Retry with exponential backoff
retry_strategy = Retry(
    total=3,
    backoff_factor=1, # 1s, 2s, 4s between retries
    status_forcelist=[429, 500, 502, 503, 504], # retry on these statuses
    allowed_methods=["HEAD", "GET", "OPTIONS"] # safe/idempotent methods only
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session = requests.Session()
# Mount the retrying adapter for both schemes
session.mount("https://", adapter)
session.mount("http://", adapter)
response = session.get("https://api.example.com/data", timeout=10)
Response Handling
import requests
response = requests.get("https://api.example.com/data")
# Status
print(response.status_code) # 200
print(response.ok) # True (if < 400)
print(response.reason) # "OK"
# Headers (dict-like access)
print(response.headers) # All headers
print(response.headers['Content-Type']) # application/json
# Content in three forms
print(response.text) # String (decoded)
print(response.content) # Bytes (raw)
print(response.json()) # Parsed JSON
# Encoding
print(response.encoding) # utf-8
response.encoding = 'utf-8' # Override encoding (affects later .text reads)
# URL and redirects
print(response.url) # Final URL (after redirects)
print(response.history) # List of redirect responses
# Cookies
print(response.cookies)
print(response.cookies['session_id']) # NOTE: raises if cookie absent -- TODO confirm
# Inspect the request that produced this response
print(response.request.headers)
print(response.request.body)
Pagination Patterns
import requests
def fetch_all_pages(base_url, params=None, max_pages=100):
    """Fetch all pages from a page-number-paginated API.

    Args:
        base_url: Endpoint URL to query.
        params: Optional base query parameters; the caller's dict is
            not mutated.
        max_pages: Safety cap on the number of pages fetched.

    Returns:
        list: Results concatenated from every page.

    Raises:
        requests.HTTPError: If any page returns a 4xx/5xx status.
    """
    # Copy so the caller's dict is not mutated with our 'page' key.
    query = dict(params or {})
    page = 1
    all_results = []
    while page <= max_pages:
        query['page'] = page
        # Timeout prevents hanging forever (see Common Gotchas).
        response = requests.get(base_url, params=query, timeout=30)
        response.raise_for_status()
        data = response.json()
        # APIs differ on the payload key; accept 'results' or 'items'.
        results = data.get('results', data.get('items', []))
        if not results:
            break
        all_results.extend(results)
        # Stop when the API signals there is no further page.
        if not data.get('next') and page >= data.get('total_pages', page):
            break
        page += 1
    return all_results
# Offset-based pagination
def fetch_with_offset(url, limit=100):
    """Fetch all items from an offset/limit-paginated API.

    Args:
        url: Endpoint URL.
        limit: Page size requested per call.

    Returns:
        list: All items across every page.

    Raises:
        requests.HTTPError: On a 4xx/5xx response.
    """
    offset = 0
    all_items = []
    while True:
        response = requests.get(
            url, params={'offset': offset, 'limit': limit}, timeout=30
        )
        # Fail fast on HTTP errors instead of KeyError-ing on an error body.
        response.raise_for_status()
        items = response.json()['items']
        if not items:
            break
        all_items.extend(items)
        offset += limit
    return all_items
# Cursor-based pagination
def fetch_with_cursor(url):
    """Fetch all items from a cursor-paginated API.

    Args:
        url: Endpoint URL; each response supplies the next cursor.

    Returns:
        list: All items across every page.

    Raises:
        requests.HTTPError: On a 4xx/5xx response.
    """
    cursor = None
    all_items = []
    while True:
        # First request has no cursor; later requests pass the one returned.
        params = {'cursor': cursor} if cursor else {}
        response = requests.get(url, params=params, timeout=30)
        # Fail fast on HTTP errors instead of KeyError-ing on an error body.
        response.raise_for_status()
        data = response.json()
        all_items.extend(data['items'])
        cursor = data.get('next_cursor')
        if not cursor:
            break
    return all_items
Cisco ISE ERS (External RESTful Services) API Pattern
import requests
import os
from urllib3.exceptions import InsecureRequestWarning
# Suppress the InsecureRequestWarning emitted for verify=False requests below.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
class ISEClient:
    """Minimal client for the Cisco ISE ERS (External RESTful Services) API."""

    def __init__(self, host: str, username: str, password: str):
        """Open a basic-auth session against the ERS API (port 9060)."""
        self.base_url = f"https://{host}:9060/ers/config"
        self.session = requests.Session()
        self.session.auth = (username, password)
        self.session.headers.update({
            "Accept": "application/json",
            "Content-Type": "application/json"
        })
        # NOTE(review): verify=False skips TLS validation -- acceptable only
        # for self-signed lab deployments; use a CA bundle in production.
        self.session.verify = False

    def get_endpoints(self, filter_value: str | None = None) -> list:
        """List endpoints, optionally filtered by a MAC-address substring."""
        url = f"{self.base_url}/endpoint"
        params = {}
        if filter_value:
            params['filter'] = f"mac.CONTAINS.{filter_value}"
        response = self.session.get(url, params=params, timeout=30)
        response.raise_for_status()
        # Empty SearchResult/resources degrade gracefully to [].
        return response.json().get('SearchResult', {}).get('resources', [])

    def get_endpoint_by_id(self, endpoint_id: str) -> dict:
        """Fetch the full record for a single endpoint by its ERS id."""
        response = self.session.get(
            f"{self.base_url}/endpoint/{endpoint_id}", timeout=30
        )
        response.raise_for_status()
        return response.json()['Endpoint']

    def create_endpoint(self, mac: str, group_id: str, **kwargs) -> str | None:
        """Create an endpoint.

        Returns:
            The URL of the new resource from the Location response header
            (None if the header is absent).  Extra keyword args are merged
            into the Endpoint payload.
        """
        payload = {
            "Endpoint": {
                "mac": mac,
                "groupId": group_id,
                **kwargs
            }
        }
        response = self.session.post(
            f"{self.base_url}/endpoint", json=payload, timeout=30
        )
        response.raise_for_status()
        # The new resource's URL is exposed via the Location header.
        return response.headers.get('Location')
# Usage: credentials come from the environment (KeyError if unset)
ise = ISEClient(
    host=os.environ['ISE_HOST'],
    username=os.environ['ISE_USER'],
    password=os.environ['ISE_PASS']
)
endpoints = ise.get_endpoints(filter_value="AA:BB")
for ep in endpoints:
    # The list result carries ids; fetch full details per endpoint
    details = ise.get_endpoint_by_id(ep['id'])
    print(f"{details['mac']} - {details.get('groupId', 'N/A')}")
Vault API Pattern
import requests
import os
class VaultClient:
    """Thin HashiCorp Vault HTTP API client using token authentication."""

    def __init__(self, addr: str, token: str):
        """Create a session authenticated via the X-Vault-Token header."""
        self.addr = addr.rstrip('/')
        self.session = requests.Session()
        self.session.headers.update({
            "X-Vault-Token": token,
            "Accept": "application/json"
        })
        # VAULT_CACERT may point at a custom CA bundle (self-signed certs
        # in dev); otherwise fall back to default verification (True).
        self.session.verify = os.environ.get('VAULT_CACERT', True)

    def read_secret(self, path: str, mount: str = "kv") -> dict:
        """Read a secret from a KV v2 engine mounted at *mount*."""
        # KV v2 requires /data/ between the mount point and the secret path.
        url = f"{self.addr}/v1/{mount}/data/{path}"
        response = self.session.get(url, timeout=30)
        response.raise_for_status()
        # Outer 'data' is the API envelope; inner 'data' holds the key/values.
        return response.json()['data']['data']

    def write_secret(self, path: str, data: dict, mount: str = "kv") -> dict:
        """Write *data* to a KV v2 engine; returns the API response body."""
        url = f"{self.addr}/v1/{mount}/data/{path}"
        # KV v2 expects the secret wrapped in a 'data' envelope.
        payload = {"data": data}
        response = self.session.post(url, json=payload, timeout=30)
        response.raise_for_status()
        return response.json()

    def issue_certificate(self, common_name: str, ttl: str = "8760h",
                          role: str = "domus-client") -> dict:
        """Issue a TLS certificate from the intermediate PKI engine.

        Args:
            common_name: CN for the issued certificate.
            ttl: Certificate lifetime (default one year).
            role: PKI role to issue against.
        """
        url = f"{self.addr}/v1/pki_int/issue/{role}"
        payload = {
            "common_name": common_name,
            "ttl": ttl
        }
        response = self.session.post(url, json=payload, timeout=30)
        response.raise_for_status()
        return response.json()['data']

    def sign_ssh_key(self, public_key: str, principals: str,
                     role: str = "domus-client") -> str:
        """Have Vault's SSH CA sign *public_key* for the given principals."""
        url = f"{self.addr}/v1/ssh/sign/{role}"
        payload = {
            "public_key": public_key,
            "valid_principals": principals
        }
        response = self.session.post(url, json=payload, timeout=30)
        response.raise_for_status()
        return response.json()['data']['signed_key']
# Usage: address and token come from the standard Vault env vars
vault = VaultClient(
    addr=os.environ['VAULT_ADDR'],
    token=os.environ['VAULT_TOKEN']
)
# Read secret (returns the inner key/value dict)
secret = vault.read_secret("myapp/config")
print(secret['database_url'])
# Issue certificate
cert = vault.issue_certificate("server.inside.domusdigitalis.dev")
print(cert['certificate'])
Wazuh API Pattern
import requests
import os
class WazuhClient:
    """Wazuh manager API client; authenticates once and reuses the JWT."""

    def __init__(self, api_url: str, username: str, password: str):
        """Authenticate with basic auth and store the JWT on the session."""
        self.api_url = api_url.rstrip('/')
        self.session = requests.Session()
        # Wazuh commonly ships with self-signed certificates.
        # NOTE(review): point session.verify at a CA bundle in production.
        self.session.verify = False
        # Exchange basic-auth credentials for a JWT bearer token.
        auth_response = self.session.post(
            f"{self.api_url}/security/user/authenticate",
            auth=(username, password),
            timeout=30
        )
        auth_response.raise_for_status()
        token = auth_response.json()['data']['token']
        # All subsequent calls carry the bearer token.
        self.session.headers.update({
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        })

    def get_agents(self, status: str | None = None) -> list:
        """List agents, optionally filtered by status (e.g. 'active')."""
        params = {}
        if status:
            params['status'] = status
        response = self.session.get(
            f"{self.api_url}/agents", params=params, timeout=30
        )
        response.raise_for_status()
        return response.json()['data']['affected_items']

    def get_agent_alerts(self, agent_id: str, limit: int = 10) -> list:
        """Fetch recent alerts for an agent (not implemented here).

        Raises:
            NotImplementedError: always -- alert queries go through the
            Wazuh Indexer (OpenSearch) API, not the manager API.
        """
        raise NotImplementedError(
            "Alert queries go through the Wazuh Indexer (OpenSearch) API"
        )

    def restart_agent(self, agent_id: str) -> dict:
        """Ask the manager to restart the given agent."""
        response = self.session.put(
            f"{self.api_url}/agents/{agent_id}/restart",
            timeout=30
        )
        response.raise_for_status()
        return response.json()
# Usage: API location and credentials come from the environment
wazuh = WazuhClient(
    api_url=os.environ['WAZUH_API_URL'],
    username=os.environ['WAZUH_API_USER'],
    password=os.environ['WAZUH_API_PASSWORD']
)
agents = wazuh.get_agents(status="active")
for agent in agents:
    print(f"{agent['id']}: {agent['name']} - {agent['status']}")
Streaming and Large Files
import requests
# Stream large file download -- stream=True avoids loading the whole
# body into memory; chunks are written to disk as they arrive
url = "https://example.com/large-file.zip"
with requests.get(url, stream=True) as r:
    r.raise_for_status()
    with open("large-file.zip", "wb") as f:
        for chunk in r.iter_content(chunk_size=8192):
            f.write(chunk)
# Stream with progress
import sys
with requests.get(url, stream=True) as r:
    r.raise_for_status()
    # Content-Length may be absent (e.g. chunked encoding) -> total == 0
    total = int(r.headers.get('content-length', 0))
    downloaded = 0
    with open("large-file.zip", "wb") as f:
        for chunk in r.iter_content(chunk_size=8192):
            f.write(chunk)
            downloaded += len(chunk)
            # Guard against division by zero when the size is unknown
            percent = (downloaded / total) * 100 if total else 0
            sys.stdout.write(f"\rDownloaded: {percent:.1f}%")
            sys.stdout.flush()
# Stream JSON Lines (one JSON document per line of the response)
import json  # required for json.loads below

with requests.get(url, stream=True) as r:
    r.raise_for_status()
    for line in r.iter_lines(decode_unicode=True):
        if line:  # iter_lines yields empty strings for keep-alive newlines
            data = json.loads(line)
            process(data)  # placeholder: replace with your own handler
# Upload large file -- passing a file object as data= streams the
# request body instead of reading the file fully into memory
with open("large-file.zip", "rb") as f:
    response = requests.post(
        "https://example.com/upload",
        data=f,
        headers={"Content-Type": "application/octet-stream"}
    )
Concurrent Requests
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
def fetch_url(url):
    """Fetch one URL and report (url, status_code, body_size).

    On any failure the status slot is None and the size slot carries the
    error message instead of a byte count.
    """
    try:
        resp = requests.get(url, timeout=10)
        result = (url, resp.status_code, len(resp.content))
    except Exception as err:
        result = (url, None, str(err))
    return result
urls = [
    "https://api.example.com/1",
    "https://api.example.com/2",
    "https://api.example.com/3",
]
# Parallel requests with ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=5) as executor:
    # Map each future back to its URL; handle results as they finish
    futures = {executor.submit(fetch_url, url): url for url in urls}
    for future in as_completed(futures):
        url, status, size = future.result()
        print(f"{url}: {status} ({size})")
# Or with the third-party requests-futures library
from requests_futures.sessions import FuturesSession
session = FuturesSession(max_workers=5)
futures = [session.get(url) for url in urls]
for future in futures:
    response = future.result() # Blocks until that request completes
    print(f"{response.url}: {response.status_code}")
Common Gotchas
# WRONG: Not checking status
response = requests.get(url)
data = response.json() # May fail if 500 error
# CORRECT: Always check
response = requests.get(url)
response.raise_for_status()
data = response.json()
# WRONG: Forgetting timeout
response = requests.get(url) # Can hang forever
# CORRECT: Always set timeout
response = requests.get(url, timeout=30)
# WRONG: verify=False in production
requests.get("https://internal.example.com", verify=False)
# CORRECT: Configure proper CA
requests.get("https://internal.example.com", verify="/path/to/ca.pem")
# WRONG: Opening a fresh connection for every request
for url in urls:
    response = requests.get(url) # New connection each time
# CORRECT: Reuse session
session = requests.Session()
for url in urls:
    response = session.get(url) # Connection pooling
# WRONG: JSON in data parameter
requests.post(url, data={"key": "value"}) # Sends form-encoded
# CORRECT: Use json parameter for JSON body
requests.post(url, json={"key": "value"}) # Sends JSON