Python Requests

HTTP requests and API interactions with the `requests` library.

Basic Requests

import requests

# GET request
# Always pass a timeout: requests has no default and will otherwise wait
# indefinitely on a stalled server.
response = requests.get("https://api.example.com/data", timeout=10)
response.raise_for_status()  # Fail fast on 4xx/5xx before reading the body
print(response.status_code)  # 200
print(response.json())       # Parse JSON response

# POST with JSON body (serialises the dict and sets Content-Type: application/json)
response = requests.post(
    "https://api.example.com/create",
    json={"name": "value", "count": 42},
    timeout=10,
)

# POST with form data (sent as application/x-www-form-urlencoded)
response = requests.post(
    "https://api.example.com/form",
    data={"field1": "value1", "field2": "value2"},
    timeout=10,
)

# PUT, PATCH, DELETE
requests.put("https://api.example.com/update/1", json={"name": "updated"}, timeout=10)
requests.patch("https://api.example.com/partial/1", json={"status": "active"}, timeout=10)
requests.delete("https://api.example.com/delete/1", timeout=10)

Headers and Authentication

import requests

# Custom headers
response = requests.get(
    "https://api.example.com/data",
    headers={
        "Authorization": "Bearer eyJhbGciOiJIUzI1NiIs...",
        "Accept": "application/json",
        "X-Custom-Header": "value"
    },
    timeout=10,
)

# Basic auth
response = requests.get(
    "https://api.example.com/data",
    auth=("username", "password"),
    timeout=10,
)

# Bearer token pattern
token = "eyJhbGciOiJIUzI1NiIs..."
headers = {"Authorization": f"Bearer {token}"}
response = requests.get("https://api.example.com/data", headers=headers, timeout=10)

# API key patterns
# `url` must be defined before use; load real keys from the environment or a
# secret store instead of hard-coding them.
url = "https://api.example.com/data"

# Header
requests.get(url, headers={"X-API-Key": "key123"}, timeout=10)

# Query parameter (the key becomes part of the URL, so it can end up in
# access logs and proxies - prefer the header form when the API allows it)
requests.get(url, params={"api_key": "key123"}, timeout=10)

Sessions (Persist State)

import requests

# Session persists cookies and headers, and reuses pooled connections
session = requests.Session()

# Set default headers for all requests
session.headers.update({
    "Authorization": "Bearer TOKEN",
    "Accept": "application/json"
})

# All requests use session settings.
# A Session has no default timeout setting, so pass `timeout` on each call.
r1 = session.get("https://api.example.com/endpoint1", timeout=10)
r2 = session.get("https://api.example.com/endpoint2", timeout=10)  # Same auth

# Session maintains cookies (login flow)
login = session.post(
    "https://example.com/login",
    data={"user": "admin", "pass": "secret"},
    timeout=10,
)
login.raise_for_status()  # Stop here if the login was rejected
# Subsequent requests have session cookie
response = session.get("https://example.com/dashboard", timeout=10)

# Close session when done
session.close()

# Or use context manager (closes the session even if a request raises)
with requests.Session() as s:
    s.auth = ("user", "pass")
    r = s.get("https://api.example.com/data", timeout=10)

SSL/TLS and Certificates

import requests

# Skip SSL verification (DEVELOPMENT ONLY)
# Suppress the warning BEFORE making the request: InsecureRequestWarning is
# emitted when the unverified request is sent, so disabling it afterwards
# is too late for that call.
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
response = requests.get("https://self-signed.example.com", verify=False)

# Custom CA bundle
response = requests.get(
    "https://internal.example.com",
    verify="/path/to/ca-bundle.pem"
)

# Client certificate (mTLS)
response = requests.get(
    "https://secure.example.com",
    cert=("/path/to/client.crt", "/path/to/client.key")
)

# Combined cert + key file
response = requests.get(
    "https://secure.example.com",
    cert="/path/to/client.pem"
)

# With CA verification
response = requests.get(
    "https://secure.example.com",
    cert=("/path/to/client.crt", "/path/to/client.key"),
    verify="/path/to/ca-bundle.pem"
)

SECURITY: Never use verify=False in production. Configure proper CA trust.

Error Handling

import requests
from requests.exceptions import (
    RequestException, HTTPError, ConnectionError,
    Timeout, TooManyRedirects
)

# Raise exception for 4xx/5xx responses
response = requests.get("https://api.example.com/data", timeout=10)
response.raise_for_status()  # Raises HTTPError if status >= 400

# Comprehensive error handling
# Handler order matters: ConnectTimeout inherits from BOTH ConnectionError and
# Timeout, so Timeout must be listed first or connect timeouts are reported as
# generic connection failures. RequestException is the base class and goes last.
try:
    response = requests.get("https://api.example.com/data", timeout=10)
    response.raise_for_status()
    data = response.json()

except HTTPError as e:
    print(f"HTTP error: {e.response.status_code} - {e.response.text}")

except Timeout:
    print("Request timed out")

except ConnectionError:
    print("Connection failed - check network/hostname")

except TooManyRedirects:
    print("Too many redirects - check the URL")

except ValueError:
    # response.json() raises a ValueError subclass when the body is not JSON
    print("Response body is not valid JSON")

except RequestException as e:
    print(f"Request failed: {e}")

# Check status without exception
response = requests.get("https://api.example.com/data", timeout=10)
if response.ok:  # status_code < 400
    print(response.json())
else:
    print(f"Error: {response.status_code}")

Timeouts and Retries

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Simple timeout (seconds)
response = requests.get("https://api.example.com", timeout=10)

# Connect and read timeout separately
response = requests.get(
    "https://api.example.com",
    timeout=(3.05, 27)  # (connect, read)
)

# Retry with backoff
# Only the listed status codes and methods are retried; POST is absent from
# allowed_methods below, so POST requests are not retried by this strategy.
retry_strategy = Retry(
    total=3,
    backoff_factor=1,  # per urllib3 docs: no delay before the 1st retry, then 2s, 4s
    status_forcelist=[429, 500, 502, 503, 504],
    allowed_methods=["HEAD", "GET", "OPTIONS"]
)

# Mount the adapter for both schemes so every request made through this
# session uses the retry strategy.
adapter = HTTPAdapter(max_retries=retry_strategy)
session = requests.Session()
session.mount("https://", adapter)
session.mount("http://", adapter)

response = session.get("https://api.example.com/data", timeout=10)

Response Handling

import requests

response = requests.get("https://api.example.com/data", timeout=10)

# Status
print(response.status_code)   # 200
print(response.ok)            # True (if < 400)
print(response.reason)        # "OK"

# Headers (case-insensitive mapping)
print(response.headers)                        # All headers
print(response.headers.get('Content-Type'))    # application/json (None if absent)

# Content
print(response.text)          # String (decoded)
print(response.content)       # Bytes (raw)
print(response.json())        # Parsed JSON (raises if the body is not JSON)

# Encoding
print(response.encoding)      # e.g. utf-8 - guessed from the response headers, may be None
response.encoding = 'utf-8'   # Override the encoding used by response.text

# URL and redirects
print(response.url)           # Final URL (after redirects)
print(response.history)       # List of redirect responses

# Cookies
print(response.cookies)
print(response.cookies.get('session_id'))  # None if the server did not set it

# Request info
print(response.request.headers)
print(response.request.body)

Pagination Patterns

import requests

def fetch_all_pages(base_url, params=None, max_pages=100, session=None, timeout=30):
    """Fetch and concatenate every page of a page-numbered API.

    Args:
        base_url: Endpoint URL; the page number is sent as the ``page``
            query parameter, starting at 1.
        params: Extra query parameters. The mapping is copied, so the
            caller's dict is never modified.
        max_pages: Hard upper bound on the number of requests.
        session: Optional ``requests.Session`` (or compatible object with a
            ``get`` method) to reuse connections; defaults to the module-level
            ``requests`` API.
        timeout: Per-request timeout in seconds.

    Returns:
        A list with the items of all fetched pages, taken from the
        ``results`` key (or ``items`` when ``results`` is absent).

    Raises:
        requests.HTTPError: If any page responds with a 4xx/5xx status.
    """
    http = session if session is not None else requests
    query = dict(params or {})  # copy: do not leak 'page' into the caller's dict
    all_results = []

    for page in range(1, max_pages + 1):
        query['page'] = page
        response = http.get(base_url, params=query, timeout=timeout)
        response.raise_for_status()
        data = response.json()

        results = data.get('results', data.get('items', []))
        if not results:
            break

        all_results.extend(results)

        # Trust pagination metadata only when the API actually sends it.
        # Without 'next'/'total_pages' keep going until an empty page arrives.
        total_pages = data.get('total_pages')
        if 'next' in data or total_pages is not None:
            has_more = bool(data.get('next')) or (
                total_pages is not None and page < total_pages
            )
            if not has_more:
                break

    return all_results

# Offset-based pagination
def fetch_with_offset(url, limit=100, session=None, timeout=30):
    """Fetch every item from an offset/limit paginated API.

    Args:
        url: Endpoint URL; ``offset`` and ``limit`` are sent as query parameters.
        limit: Requested page size; must be positive.
        session: Optional ``requests.Session`` (or compatible object with a
            ``get`` method) to reuse connections; defaults to the module-level
            ``requests`` API.
        timeout: Per-request timeout in seconds.

    Returns:
        A list of all items, read from the ``items`` key of each page.

    Raises:
        ValueError: If ``limit`` is not positive (the loop could never advance).
        requests.HTTPError: If any page responds with a 4xx/5xx status.
    """
    if limit <= 0:
        raise ValueError("limit must be a positive integer")

    http = session if session is not None else requests
    offset = 0
    all_items = []

    while True:
        response = http.get(
            url, params={'offset': offset, 'limit': limit}, timeout=timeout
        )
        response.raise_for_status()
        items = response.json()['items']

        if not items:
            break

        all_items.extend(items)
        # Advance by what was actually received: a server that clamps the
        # page size below `limit` would otherwise cause records to be skipped.
        offset += len(items)

    return all_items

# Cursor-based pagination
def fetch_with_cursor(url, session=None, timeout=30):
    """Fetch every item from a cursor-paginated API.

    The first request carries no cursor; each following request sends the
    ``next_cursor`` value of the previous response as the ``cursor`` query
    parameter, until the response has no (or an empty) ``next_cursor``.

    Args:
        url: Endpoint URL.
        session: Optional ``requests.Session`` (or compatible object with a
            ``get`` method) to reuse connections; defaults to the module-level
            ``requests`` API.
        timeout: Per-request timeout in seconds.

    Returns:
        A list of all items, read from the ``items`` key of each page.

    Raises:
        requests.HTTPError: If any page responds with a 4xx/5xx status.
    """
    http = session if session is not None else requests
    cursor = None
    all_items = []

    while True:
        params = {'cursor': cursor} if cursor else {}
        response = http.get(url, params=params, timeout=timeout)
        # Without this check an error payload would surface as a KeyError below
        response.raise_for_status()
        data = response.json()

        all_items.extend(data['items'])

        cursor = data.get('next_cursor')
        if not cursor:
            break

    return all_items

ISE ERS API Pattern

import requests
import os
from urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

class ISEClient:
    """Small client for the Cisco ISE ERS configuration API (endpoint resource).

    Args:
        host: ISE node hostname or IP; the ERS API is reached on port 9060.
        username: ERS admin user (HTTP basic auth).
        password: Password for ``username``.
        verify: Passed to ``Session.verify``. Defaults to ``False`` to keep the
            original lab behaviour; pass a CA bundle path (or ``True``) for
            anything beyond a lab.
        timeout: Per-request timeout in seconds.
    """

    # ERS caps the page size at 100; the server default is smaller.
    PAGE_SIZE = 100

    def __init__(self, host: str, username: str, password: str,
                 verify=False, timeout=30):
        self.base_url = f"https://{host}:9060/ers/config"
        self.timeout = timeout
        self.session = requests.Session()
        self.session.auth = (username, password)
        self.session.headers.update({
            "Accept": "application/json",
            "Content-Type": "application/json"
        })
        self.session.verify = verify

    def get_endpoints(self, filter_value: str = None) -> list:
        """Return the resource summaries of all endpoints, following ERS paging.

        Args:
            filter_value: Optional MAC fragment; sent as ``mac.CONTAINS.<value>``.

        Returns:
            The concatenated ``SearchResult.resources`` entries of every page.

        Raises:
            requests.HTTPError: On a 4xx/5xx response.
        """
        url = f"{self.base_url}/endpoint"
        params = {'size': self.PAGE_SIZE, 'page': 1}
        if filter_value:
            params['filter'] = f"mac.CONTAINS.{filter_value}"

        resources = []
        while True:
            response = self.session.get(url, params=params, timeout=self.timeout)
            response.raise_for_status()
            result = response.json().get('SearchResult', {})
            resources.extend(result.get('resources', []))

            # ERS includes a 'nextPage' link only while more pages remain.
            if not result.get('nextPage'):
                return resources
            params['page'] += 1

    def get_endpoint_by_id(self, endpoint_id: str) -> dict:
        """Return the full endpoint object for ``endpoint_id``.

        Raises:
            requests.HTTPError: On a 4xx/5xx response (e.g. unknown id).
        """
        response = self.session.get(
            f"{self.base_url}/endpoint/{endpoint_id}", timeout=self.timeout
        )
        response.raise_for_status()
        # The ERS endpoint resource is wrapped in an 'ERSEndPoint' object.
        return response.json()['ERSEndPoint']

    def create_endpoint(self, mac: str, group_id: str, **kwargs) -> "str | None":
        """Create an endpoint and return the URL of the new resource.

        Args:
            mac: Endpoint MAC address.
            group_id: Identity group id to assign.
            **kwargs: Additional endpoint attributes merged into the payload.

        Returns:
            The ``Location`` response header, or ``None`` if the server sent none.

        Raises:
            requests.HTTPError: On a 4xx/5xx response.
        """
        payload = {
            "ERSEndPoint": {
                "mac": mac,
                "groupId": group_id,
                **kwargs
            }
        }
        response = self.session.post(
            f"{self.base_url}/endpoint", json=payload, timeout=self.timeout
        )
        response.raise_for_status()
        return response.headers.get('Location')

# Usage - connection settings are read from the environment
ise = ISEClient(
    os.environ['ISE_HOST'],
    os.environ['ISE_USER'],
    os.environ['ISE_PASS'],
)

# The search only returns summaries, so fetch each endpoint's details by id
endpoints = ise.get_endpoints(filter_value="AA:BB")
for summary in endpoints:
    details = ise.get_endpoint_by_id(summary['id'])
    mac = details['mac']
    group = details.get('groupId', 'N/A')
    print(f"{mac} - {group}")

Vault API Pattern

import requests
import os

class VaultClient:
    """Small HashiCorp Vault HTTP client (KV v2, PKI issue, SSH sign).

    Args:
        addr: Vault base address, e.g. ``https://vault.example.com:8200``.
        token: Vault token, sent as the ``X-Vault-Token`` header.
        timeout: Per-request timeout in seconds.

    TLS verification uses the CA bundle named by the ``VAULT_CACERT``
    environment variable when it is set, and the default trust store otherwise.
    """

    def __init__(self, addr: str, token: str, timeout=30):
        self.addr = addr.rstrip('/')
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            "X-Vault-Token": token,
            "Accept": "application/json"
        })
        self.session.verify = self._verify_from_env()

    @staticmethod
    def _verify_from_env():
        """Return the CA bundle path from ``VAULT_CACERT``, or ``True``.

        An empty ``VAULT_CACERT`` must fall back to ``True``: an empty string
        is falsy and would make requests skip certificate verification.
        """
        return os.environ.get('VAULT_CACERT') or True

    def read_secret(self, path: str, mount: str = "kv") -> dict:
        """Read a secret from a KV v2 secrets engine.

        Args:
            path: Secret path below the mount; surrounding slashes are ignored.
            mount: Mount point of the KV v2 engine.

        Returns:
            The secret's key/value data.

        Raises:
            requests.HTTPError: On a 4xx/5xx response.
        """
        # KV v2 requires /data/ in path
        url = f"{self.addr}/v1/{mount}/data/{path.strip('/')}"
        response = self.session.get(url, timeout=self.timeout)
        response.raise_for_status()
        return response.json()['data']['data']

    def write_secret(self, path: str, data: dict, mount: str = "kv") -> dict:
        """Write ``data`` as a new version of a KV v2 secret.

        Args:
            path: Secret path below the mount; surrounding slashes are ignored.
            data: Key/value pairs to store.
            mount: Mount point of the KV v2 engine.

        Returns:
            The parsed JSON response from Vault.

        Raises:
            requests.HTTPError: On a 4xx/5xx response.
        """
        url = f"{self.addr}/v1/{mount}/data/{path.strip('/')}"
        payload = {"data": data}
        response = self.session.post(url, json=payload, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def issue_certificate(self, common_name: str, ttl: str = "8760h",
                          role: str = "domus-client",
                          mount: str = "pki_int") -> dict:
        """Issue a certificate from a PKI secrets engine.

        Args:
            common_name: Requested certificate common name.
            ttl: Requested time-to-live.
            role: PKI role to issue against.
            mount: Mount point of the PKI engine.

        Returns:
            The ``data`` object of the response.

        Raises:
            requests.HTTPError: On a 4xx/5xx response.
        """
        url = f"{self.addr}/v1/{mount}/issue/{role}"
        payload = {
            "common_name": common_name,
            "ttl": ttl
        }
        response = self.session.post(url, json=payload, timeout=self.timeout)
        response.raise_for_status()
        return response.json()['data']

    def sign_ssh_key(self, public_key: str, principals: str,
                     role: str = "domus-client", mount: str = "ssh") -> str:
        """Sign an SSH public key with an SSH secrets engine.

        Args:
            public_key: The public key to sign.
            principals: Value sent as ``valid_principals``.
            role: SSH role to sign against.
            mount: Mount point of the SSH engine.

        Returns:
            The signed key from the response.

        Raises:
            requests.HTTPError: On a 4xx/5xx response.
        """
        url = f"{self.addr}/v1/{mount}/sign/{role}"
        payload = {
            "public_key": public_key,
            "valid_principals": principals
        }
        response = self.session.post(url, json=payload, timeout=self.timeout)
        response.raise_for_status()
        return response.json()['data']['signed_key']

# Usage - address and token are read from the environment
vault = VaultClient(os.environ['VAULT_ADDR'], os.environ['VAULT_TOKEN'])

# Read secret
secret = vault.read_secret("myapp/config")
database_url = secret['database_url']
print(database_url)

# Issue certificate
cert = vault.issue_certificate("server.inside.domusdigitalis.dev")
certificate_pem = cert['certificate']
print(certificate_pem)

Wazuh API Pattern

import requests
import os

class WazuhClient:
    """Small client for the Wazuh server API.

    The constructor authenticates with HTTP basic auth and stores the returned
    JWT as a bearer token on the session. The token is not refreshed, so a
    long-lived client will start getting 401s once it expires - create a new
    client in that case.

    Args:
        api_url: Base URL of the Wazuh API.
        username: API user.
        password: Password for ``username``.
        verify: Passed to ``Session.verify``. Defaults to ``False`` to keep the
            original behaviour for self-signed certs; pass a CA bundle path
            (or ``True``) outside of a lab.
        timeout: Per-request timeout in seconds.

    Raises:
        requests.HTTPError: If authentication is rejected.
    """

    def __init__(self, api_url: str, username: str, password: str,
                 verify=False, timeout=30):
        self.api_url = api_url.rstrip('/')
        self.timeout = timeout
        self.session = requests.Session()
        self.session.verify = verify  # False by default: self-signed certs

        # Authenticate and get JWT
        auth_response = self.session.post(
            f"{self.api_url}/security/user/authenticate",
            auth=(username, password),
            timeout=self.timeout
        )
        auth_response.raise_for_status()
        token = auth_response.json()['data']['token']

        self.session.headers.update({
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        })

    def get_agents(self, status: str = None) -> list:
        """Return the agents reported by ``GET /agents``.

        Args:
            status: Optional status filter, sent as the ``status`` parameter.

        Returns:
            The ``data.affected_items`` list of the response.

        Raises:
            requests.HTTPError: On a 4xx/5xx response.
        """
        params = {}
        if status:
            params['status'] = status

        response = self.session.get(
            f"{self.api_url}/agents", params=params, timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()['data']['affected_items']

    def get_agent_alerts(self, agent_id: str, limit: int = 10) -> list:
        """Return recent alerts for an agent (not implemented).

        Alerts are served by the Wazuh Indexer API (OpenSearch), not by the
        server API this client talks to.

        Raises:
            NotImplementedError: Always. Failing loudly beats returning ``None``
                to a caller that expects a list.
        """
        # Uses Wazuh Indexer API (OpenSearch)
        raise NotImplementedError(
            "get_agent_alerts requires the Wazuh Indexer API (OpenSearch)"
        )

    def restart_agent(self, agent_id: str) -> dict:
        """Restart one agent and return the parsed JSON response.

        Raises:
            requests.HTTPError: On a 4xx/5xx response.
        """
        response = self.session.put(
            f"{self.api_url}/agents/{agent_id}/restart",
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

# Usage - API location and credentials are read from the environment
wazuh = WazuhClient(
    os.environ['WAZUH_API_URL'],
    os.environ['WAZUH_API_USER'],
    os.environ['WAZUH_API_PASSWORD'],
)

# One line per active agent: "<id>: <name> - <status>"
agents = wazuh.get_agents(status="active")
for agent in agents:
    agent_id, name, state = agent['id'], agent['name'], agent['status']
    print(f"{agent_id}: {name} - {state}")

Streaming and Large Files

import requests

# Stream large file download
import json
import sys

# (connect, read) timeout: the read timeout applies between received chunks,
# not to the whole download, so it is safe for large files.
STREAM_TIMEOUT = (3.05, 30)

url = "https://example.com/large-file.zip"
with requests.get(url, stream=True, timeout=STREAM_TIMEOUT) as r:
    r.raise_for_status()
    with open("large-file.zip", "wb") as f:
        for chunk in r.iter_content(chunk_size=8192):
            f.write(chunk)

# Stream with progress
with requests.get(url, stream=True, timeout=STREAM_TIMEOUT) as r:
    r.raise_for_status()
    # content-length is absent for chunked responses; percent stays 0 then
    total = int(r.headers.get('content-length', 0))
    downloaded = 0

    with open("large-file.zip", "wb") as f:
        for chunk in r.iter_content(chunk_size=8192):
            f.write(chunk)
            downloaded += len(chunk)
            percent = (downloaded / total) * 100 if total else 0
            sys.stdout.write(f"\rDownloaded: {percent:.1f}%")
            sys.stdout.flush()
    sys.stdout.write("\n")  # finish the progress line


# Stream JSON lines
def process(record):
    """Placeholder handler - replace with the real per-record logic."""
    print(record)


jsonl_url = "https://example.com/events.jsonl"
with requests.get(jsonl_url, stream=True, timeout=STREAM_TIMEOUT) as r:
    r.raise_for_status()  # do not try to parse an error page as JSON lines
    for line in r.iter_lines(decode_unicode=True):
        if line:  # skip keep-alive blank lines
            data = json.loads(line)
            process(data)

# Upload large file (passing the file object streams it instead of loading
# the whole file into memory)
with open("large-file.zip", "rb") as f:
    response = requests.post(
        "https://example.com/upload",
        data=f,
        headers={"Content-Type": "application/octet-stream"},
        timeout=STREAM_TIMEOUT
    )
response.raise_for_status()

Concurrent Requests

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_url(url):
    """Fetch a single URL and summarise the outcome.

    Args:
        url: The URL to GET (10 second timeout).

    Returns:
        A ``(url, status_code, size_in_bytes)`` tuple on success, or
        ``(url, None, error_message)`` when the request itself failed.
        HTTP error statuses (4xx/5xx) are reported as a normal result.
    """
    try:
        response = requests.get(url, timeout=10)
    except requests.RequestException as e:
        # Only network/protocol failures are turned into a result tuple;
        # programming errors still propagate so they are not silently hidden.
        return url, None, str(e)
    return url, response.status_code, len(response.content)

urls = [
    "https://api.example.com/1",
    "https://api.example.com/2",
    "https://api.example.com/3",
]

# Parallel requests with ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(fetch_url, url) for url in urls]

    # as_completed yields results in completion order, not submission order
    for future in as_completed(futures):
        url, status, size = future.result()
        print(f"{url}: {status} ({size})")

# Or with requests-futures library
from requests_futures.sessions import FuturesSession

# FuturesSession is a requests.Session subclass: use it as a context manager
# so its connection pool and worker threads are released when done.
with FuturesSession(max_workers=5) as session:
    futures = [session.get(url, timeout=10) for url in urls]

    for future in futures:
        response = future.result()  # re-raises any request exception
        print(f"{response.url}: {response.status_code}")

Common Gotchas

# NOTE: `url` and `urls` are placeholders in this section - define them
# before running any of these snippets.

# WRONG: Not checking status
response = requests.get(url)
data = response.json()  # Error responses are parsed like successes, or fail to parse at all

# CORRECT: Always check
response = requests.get(url)
response.raise_for_status()  # Raises HTTPError on 4xx/5xx before the body is used
data = response.json()

# WRONG: Forgetting timeout
response = requests.get(url)  # Can hang forever

# CORRECT: Always set timeout
response = requests.get(url, timeout=30)  # Timeout in seconds

# WRONG: verify=False in production
requests.get("https://internal.example.com", verify=False)  # Certificate is not checked

# CORRECT: Configure proper CA
requests.get("https://internal.example.com", verify="/path/to/ca.pem")

# WRONG: Creating new session for each request
for url in urls:
    response = requests.get(url)  # New connection each time

# CORRECT: Reuse session
session = requests.Session()
for url in urls:
    response = session.get(url)  # Connection pooling

# WRONG: JSON in data parameter
requests.post(url, data={"key": "value"})  # Sends form-encoded

# CORRECT: Use json parameter for JSON body
requests.post(url, json={"key": "value"})  # Sends JSON