Python Files

File operations, JSON/YAML parsing, and path handling.

Path Operations (pathlib)

from pathlib import Path

# Current working directory
cwd = Path.cwd()
home = Path.home()

# Path construction (OS-agnostic)
config_dir = home / ".config" / "myapp"
log_file = Path("/var/log") / "app.log"

# Path properties
p = Path("/etc/ssl/certs/ca-certificates.crt")
p.name          # "ca-certificates.crt"
p.stem          # "ca-certificates"
p.suffix        # ".crt"
p.suffixes      # [".crt"]
p.parent        # Path("/etc/ssl/certs")
p.parents[0]    # Path("/etc/ssl/certs")
p.parents[1]    # Path("/etc/ssl")
p.parts         # ("/", "etc", "ssl", "certs", "ca-certificates.crt")

# Change extension
new_path = p.with_suffix(".pem")  # /etc/ssl/certs/ca-certificates.pem
new_path = p.with_name("new.crt") # /etc/ssl/certs/new.crt

# Path resolution
relative = Path("../configs/app.yaml")
absolute = relative.resolve()  # Full absolute path
absolute = relative.absolute() # Same but doesn't resolve symlinks

# String conversion
str(p)  # "/etc/ssl/certs/ca-certificates.crt"

TIP: Always use pathlib.Path over os.path for new code. It’s more readable and chainable.

Path Existence and Type Checks

from pathlib import Path

p = Path("/etc/passwd")

# Existence checks
p.exists()       # True/False
p.is_file()      # True if regular file
p.is_dir()       # True if directory
p.is_symlink()   # True if symbolic link
p.is_mount()     # True if mount point
p.is_socket()    # True if Unix socket
p.is_fifo()      # True if FIFO/named pipe
p.is_block_device()
p.is_char_device()

# Permissions
import os
os.access(p, os.R_OK)  # Readable?
os.access(p, os.W_OK)  # Writable?
os.access(p, os.X_OK)  # Executable?

# Stat info
stat = p.stat()
stat.st_size    # Size in bytes
stat.st_mtime   # Modification time (epoch)
stat.st_mode    # Permissions (octal)
stat.st_uid     # Owner UID
stat.st_gid     # Group GID

# Human-readable size
def human_size(size: int) -> str:
    """Format a byte count as a short human-readable string.

    The value is divided by 1024 while stepping through B, KB, MB, GB and
    TB; anything still not below 1024 after TB is reported in PB. The
    number is always rendered with one decimal place.
    """
    units = ("B", "KB", "MB", "GB", "TB")
    value = size
    step = 0
    # Scale down until the value fits the current unit or the units run out.
    while step < len(units) and not value < 1024:
        value = value / 1024
        step += 1
    suffix = units[step] if step < len(units) else "PB"
    return f"{value:.1f}{suffix}"

print(human_size(p.stat().st_size))

# Modification time as datetime
from datetime import datetime
mtime = datetime.fromtimestamp(p.stat().st_mtime)
print(f"Modified: {mtime.isoformat()}")

Directory Traversal and Globbing

from pathlib import Path

# List directory contents
for item in Path("/etc").iterdir():
    print(item.name)

# Glob patterns (non-recursive)
for f in Path("/etc/ssl").glob("*.conf"):
    print(f)

# Recursive glob
for f in Path("/var/log").glob("**/*.log"):
    print(f)

# rglob (shortcut for recursive glob)
for f in Path(".").rglob("*.py"):
    print(f)

# Filtered iteration
configs = [f for f in Path("/etc").iterdir() if f.suffix == ".conf"]
dirs = [f for f in Path(".").iterdir() if f.is_dir()]
files = [f for f in Path(".").iterdir() if f.is_file()]

# Find files by pattern with size filter
large_logs = [
    f for f in Path("/var/log").rglob("*.log")
    if f.stat().st_size > 1024 * 1024  # > 1MB
]

# Infrastructure pattern: Find all certificates
def find_certs(root: Path) -> list[Path]:
    """Collect certificate files (.pem, .crt, .cer, .der) below *root*.

    The tree is walked recursively once per extension, so the result is
    grouped by extension in the order listed above.
    """
    extensions = ("pem", "crt", "cer", "der")
    return [
        match
        for extension in extensions
        for match in root.rglob(f"*.{extension}")
    ]

certs = find_certs(Path("/etc/ssl"))
for cert in certs:
    print(f"{cert}: {cert.stat().st_size} bytes")

File Reading and Writing

from pathlib import Path

# Simple read/write (entire file)
content = Path("file.txt").read_text()
Path("file.txt").write_text("new content")

# Binary read/write
data = Path("file.bin").read_bytes()
Path("file.bin").write_bytes(b"\x00\x01\x02")

# With encoding
content = Path("file.txt").read_text(encoding="utf-8")
Path("file.txt").write_text(content, encoding="utf-8")

# Line by line (memory efficient for large files)
with open("large.log", "r") as f:
    for line in f:
        process(line.rstrip("\n"))

# Read all lines as list
lines = Path("file.txt").read_text().splitlines()

# Read with context manager (preferred)
with Path("file.txt").open("r") as f:
    content = f.read()

# Write with context manager
with Path("file.txt").open("w") as f:
    f.write("line 1\n")
    f.write("line 2\n")

# Append mode
with Path("file.txt").open("a") as f:
    f.write("appended line\n")

# Read specific number of bytes
with open("file.bin", "rb") as f:
    header = f.read(4)  # First 4 bytes
    f.seek(100)         # Jump to byte 100
    chunk = f.read(50)  # Read 50 bytes

# Write multiple lines
lines = ["line 1", "line 2", "line 3"]
Path("file.txt").write_text("\n".join(lines) + "\n")

# Or with writelines
with open("file.txt", "w") as f:
    f.writelines(line + "\n" for line in lines)

GOTCHA: write_text() and write_bytes() overwrite completely. No append mode.

JSON Handling

import json
from pathlib import Path
from typing import Any

# Read JSON file
data = json.loads(Path("config.json").read_text())

# Write JSON file (pretty printed)
Path("config.json").write_text(
    json.dumps(data, indent=2, sort_keys=True)
)

# Compact JSON (no whitespace)
compact = json.dumps(data, separators=(",", ":"))

# Handle non-serializable types
from datetime import datetime

def json_serializer(obj):
    """Fallback for json.dumps(default=...) covering a few common types.

    datetime -> ISO-8601 string, Path -> string, bytes -> UTF-8 decoded
    string. Any other type raises TypeError, as json.dumps expects.
    """
    converters = (
        (datetime, lambda value: value.isoformat()),
        (Path, str),
        (bytes, lambda value: value.decode("utf-8")),
    )
    for kind, convert in converters:
        if isinstance(obj, kind):
            return convert(obj)
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")

data = {"timestamp": datetime.now(), "path": Path("/etc/passwd")}
json_str = json.dumps(data, default=json_serializer, indent=2)

# jq-like operations in Python
def jq_get(data: dict, path: str) -> Any:
    """Extract a nested value by dotted path.

    jq_get(data, "a.b.c") -> data["a"]["b"]["c"]. When the value reached so
    far is a list, the next segment is converted to an integer index, so
    jq_get(data, "items.0.name") -> data["items"][0]["name"].

    Raises:
        KeyError: a mapping key is missing.
        IndexError: a list index is out of range.
        ValueError: a list is addressed with a non-integer segment.
        TypeError: the value reached so far cannot be subscripted.
    """
    result = data
    for key in path.split("."):
        if isinstance(result, list):
            result = result[int(key)]
        else:
            result = result[key]
    return result

# Usage
config = {"server": {"host": "vault-01", "port": 8200}}
host = jq_get(config, "server.host")  # "vault-01"

# Safe nested access with default
def safe_get(data: dict, path: str, default=None) -> Any:
    """Extract a nested value like jq_get, returning *default* on any miss.

    A miss is a missing key, an out-of-range index, a non-subscriptable
    intermediate value, or a non-integer segment applied to a list.
    """
    try:
        return jq_get(data, path)
    # ValueError comes from int(key) when a list is indexed with e.g. "name".
    except (KeyError, IndexError, TypeError, ValueError):
        return default

# Merge JSON objects
def merge_json(base: dict, override: dict) -> dict:
    """Deep merge two dictionaries into a new, independent dictionary.

    Keys present in both inputs are merged recursively when both values are
    dicts; otherwise the value from *override* wins. Key order is the order
    of *base* followed by keys that only exist in *override*.

    Neither input is modified, and the result shares no nested containers
    with them, so mutating the merged config cannot leak back into *base*
    or *override*.
    """
    import copy

    merged = {}
    for key, base_value in base.items():
        if key not in override:
            merged[key] = copy.deepcopy(base_value)
        elif isinstance(base_value, dict) and isinstance(override[key], dict):
            merged[key] = merge_json(base_value, override[key])
        else:
            merged[key] = copy.deepcopy(override[key])

    for key, value in override.items():
        if key not in base:
            merged[key] = copy.deepcopy(value)

    return merged

base = {"a": 1, "b": {"c": 2}}
override = {"b": {"d": 3}, "e": 4}
merged = merge_json(base, override)  # {"a": 1, "b": {"c": 2, "d": 3}, "e": 4}

# Stream large JSON (line-delimited JSON / NDJSON)
def read_ndjson(path: Path):
    """Lazily yield one decoded JSON record per non-blank line of *path*."""
    with path.open() as handle:
        # Whitespace-only lines are skipped rather than treated as errors.
        yield from (json.loads(raw) for raw in handle if raw.strip())

for record in read_ndjson(Path("logs.ndjson")):
    print(record["timestamp"])

# Write NDJSON
def write_ndjson(path: Path, records):
    """Serialize each record as one JSON line, overwriting *path*."""
    with path.open("w") as handle:
        handle.writelines(json.dumps(entry) + "\n" for entry in records)

YAML Handling

# Requires: pip install pyyaml
import yaml
from pathlib import Path

# Read YAML
data = yaml.safe_load(Path("config.yaml").read_text())

# Write YAML
Path("config.yaml").write_text(
    yaml.dump(data, default_flow_style=False, sort_keys=False)
)

# Multi-document YAML (like k8s manifests)
docs = list(yaml.safe_load_all(Path("manifests.yaml").read_text()))
for doc in docs:
    print(doc["kind"])

# Write multi-document YAML
Path("manifests.yaml").write_text(
    yaml.dump_all(docs, default_flow_style=False)
)

# Custom YAML dumper for cleaner output
class CleanDumper(yaml.SafeDumper):
    """SafeDumper subclass that carries this file's custom representers.

    It adds no behavior of its own; it exists so representers can be
    registered on it and selected per call via yaml.dump(..., Dumper=CleanDumper).
    """

def str_representer(dumper, data):
    """Represent a string, using literal block style ("|") when multiline."""
    str_tag = "tag:yaml.org,2002:str"
    if "\n" not in data:
        return dumper.represent_scalar(str_tag, data)
    return dumper.represent_scalar(str_tag, data, style="|")

CleanDumper.add_representer(str, str_representer)

# Usage
data = {"script": "#!/bin/bash\necho hello\nexit 0"}
print(yaml.dump(data, Dumper=CleanDumper, default_flow_style=False))
# Output:
# script: |
#   #!/bin/bash
#   echo hello
#   exit 0

# ruamel.yaml for round-trip editing (preserves comments!)
# Requires: pip install ruamel.yaml
from ruamel.yaml import YAML

yaml_rt = YAML()
yaml_rt.preserve_quotes = True

# Read, modify, write - preserving comments
with open("config.yaml") as f:
    data = yaml_rt.load(f)

data["new_key"] = "new_value"

with open("config.yaml", "w") as f:
    yaml_rt.dump(data, f)

TIP: Use ruamel.yaml when editing config files to preserve comments and formatting.

CSV Handling

import csv
from pathlib import Path

# Read CSV as dictionaries
with open("data.csv", newline="") as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row["name"], row["ip"])

# Write CSV from dictionaries
data = [
    {"name": "vault-01", "ip": "10.50.1.60"},
    {"name": "ise-01", "ip": "10.50.1.20"},
]

with open("servers.csv", "w", newline="") as f:
    fieldnames = ["name", "ip"]
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(data)

# Read CSV as lists (no header)
with open("data.csv", newline="") as f:
    reader = csv.reader(f)
    header = next(reader)  # Skip header
    for row in reader:
        print(row[0], row[1])

# Custom delimiter
with open("data.tsv", newline="") as f:
    reader = csv.reader(f, delimiter="\t")
    for row in reader:
        print(row)

# Handle quoted fields with special characters
with open("complex.csv", newline="") as f:
    reader = csv.reader(f, quotechar='"', quoting=csv.QUOTE_ALL)
    for row in reader:
        print(row)

# Read entire CSV into memory
def read_csv_to_dicts(path: Path) -> list[dict]:
    """Load a whole CSV file into memory, one dict per data row.

    The first row is taken as the header and supplies the dict keys.
    """
    rows = []
    with path.open(newline="") as handle:
        rows.extend(csv.DictReader(handle))
    return rows

# Convert CSV to JSON
def csv_to_json(csv_path: Path, json_path: Path):
    """Write the rows of *csv_path* to *json_path* as a pretty-printed JSON array."""
    import json
    rows = read_csv_to_dicts(csv_path)
    serialized = json.dumps(rows, indent=2)
    json_path.write_text(serialized)

# Infrastructure pattern: Parse ISE export
def parse_ise_endpoint_export(path: Path) -> list[dict]:
    """Parse an ISE endpoint export CSV into normalized endpoint records.

    Args:
        path: CSV file with a header row; the MACAddress, IdentityGroup and
            Description columns are read, all other columns are ignored.

    Returns:
        One dict per data row with keys "mac" (upper-cased), "group" and
        "description". Missing columns and missing cells become "".
    """
    def cell(row: dict, column: str) -> str:
        # DictReader fills short rows with None, which bypasses a .get()
        # default, so normalize both "absent" and "None" to "".
        return row.get(column) or ""

    with path.open(newline="") as f:
        return [
            {
                "mac": cell(row, "MACAddress").upper(),
                "group": cell(row, "IdentityGroup"),
                "description": cell(row, "Description"),
            }
            for row in csv.DictReader(f)
        ]

GOTCHA: Always use newline="" when opening CSV files to avoid blank row issues on Windows.

TOML Handling (Python 3.11+)

# Python 3.11+ has built-in tomllib (read-only)
import tomllib
from pathlib import Path

# Read TOML
with open("pyproject.toml", "rb") as f:
    data = tomllib.load(f)

# Or from string
toml_string = Path("pyproject.toml").read_text()
data = tomllib.loads(toml_string)

# Access nested values
project_name = data["project"]["name"]
dependencies = data["project"].get("dependencies", [])

# For WRITING TOML, use tomli-w
# Requires: pip install tomli-w
import tomli_w

config = {
    "tool": {
        "myapp": {
            "debug": True,
            "servers": ["vault-01", "vault-02"],
        }
    }
}

with open("config.toml", "wb") as f:
    tomli_w.dump(config, f)

# Parse pyproject.toml to get project metadata
def get_project_info(project_dir: Path) -> dict:
    """Extract project info from pyproject.toml."""
    pyproject = project_dir / "pyproject.toml"
    if not pyproject.exists():
        return {}

    with pyproject.open("rb") as f:
        data = tomllib.load(f)

    project = data.get("project", {})
    return {
        "name": project.get("name"),
        "version": project.get("version"),
        "description": project.get("description"),
        "dependencies": project.get("dependencies", []),
    }

INI/Config File Handling

import configparser
from pathlib import Path

# Read INI file
config = configparser.ConfigParser()
config.read("app.ini")

# Access values
host = config["database"]["host"]
port = config.getint("database", "port")
debug = config.getboolean("app", "debug")

# Default values
timeout = config.getint("database", "timeout", fallback=30)

# Iterate sections
for section in config.sections():
    print(f"[{section}]")
    for key, value in config[section].items():
        print(f"  {key} = {value}")

# Write INI file
config = configparser.ConfigParser()
config["database"] = {
    "host": "localhost",
    "port": "5432",
    "name": "mydb",
}
config["logging"] = {
    "level": "INFO",
    "file": "/var/log/app.log",
}

with open("app.ini", "w") as f:
    config.write(f)

# Parse SSSD config (infrastructure pattern)
def parse_sssd_config(
    path: Path = Path("/etc/sssd/sssd.conf"),
    domain: str = "INSIDE.DOMUSDIGITALIS.DEV",
) -> dict:
    """Extract the AD settings of one domain from an SSSD configuration file.

    Args:
        path: Location of sssd.conf.
        domain: Domain whose "[domain/<domain>]" section is read. Section
            names are matched case-sensitively.

    Returns:
        A dict with "ad_server", "krb5_realm" (None when unset) and
        "ldap_id_mapping" (bool, True when unset). An empty dict when the
        file is missing or has no section for *domain*.
    """
    # SSSD values such as "fallback_homedir = /home/%u" use "%" literally,
    # so configparser's %-interpolation must be switched off.
    config = configparser.ConfigParser(interpolation=None)
    config.read(path)

    section_name = f"domain/{domain}"
    if section_name not in config:
        return {}

    section = config[section_name]
    return {
        "ad_server": section.get("ad_server"),
        "krb5_realm": section.get("krb5_realm"),
        "ldap_id_mapping": section.getboolean("ldap_id_mapping", fallback=True),
    }

Temporary Files and Directories

import tempfile
from pathlib import Path
import os

# Temporary file (auto-deleted when closed)
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=True) as f:
    f.write('{"key": "value"}')
    f.flush()
    print(f"Temp file: {f.name}")
    # File exists here
# File deleted after context manager exits

# Temporary file that persists
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
    temp_path = Path(f.name)
    f.write('{"key": "value"}')

# Use temp_path, then manually delete
temp_path.unlink()

# Temporary directory (auto-deleted)
with tempfile.TemporaryDirectory() as tmpdir:
    tmp_path = Path(tmpdir)
    (tmp_path / "file.txt").write_text("content")
    # Do work...
# Directory and all contents deleted

# Get system temp directory
tmp = Path(tempfile.gettempdir())

# Create a unique temp file (mkstemp DOES create and open the file; it returns an OS-level fd and the path)
fd, path = tempfile.mkstemp(suffix=".tmp", prefix="myapp_")
os.close(fd)  # Close file descriptor
temp_path = Path(path)
# Now use temp_path
temp_path.unlink()  # Cleanup

# Infrastructure pattern: Safe config update
def safe_config_update(config_path: Path, new_content: str):
    """Replace a config file's content via temp file + atomic replace.

    The new content is written to "<name>.tmp" next to the target (same
    directory, hence same filesystem) and then moved over the target, so
    readers see either the old or the new file, never a partial one.

    Args:
        config_path: File to create or overwrite.
        new_content: Complete new text content.

    Raises:
        OSError: writing or replacing failed; the temp file is removed.
    """
    # Append ".tmp" to the full name instead of swapping the suffix, so
    # updating "app.conf" cannot clobber an unrelated "app.tmp".
    temp_path = config_path.with_name(config_path.name + ".tmp")
    try:
        temp_path.write_text(new_content)
        # replace() overwrites an existing target on every platform;
        # rename() raises FileExistsError on Windows.
        temp_path.replace(config_path)
    except BaseException:
        temp_path.unlink(missing_ok=True)
        raise

TIP: For atomic file updates, write to a temp file in the same directory, then rename.

Atomic Writes and File Locking

import os
import fcntl
from pathlib import Path
from contextlib import contextmanager

# Atomic write using rename
def atomic_write(path: Path, content: str, encoding: str = "utf-8"):
    """Write *content* to *path* atomically using temp file + replace.

    The text is first written to "<path>.tmp" in the same directory and
    then moved over the target in a single step.

    Args:
        path: File to create or overwrite.
        content: Complete new text content.
        encoding: Text encoding used for the write.

    Raises:
        OSError: writing or replacing failed; the temp file is removed
            before the error propagates.
    """
    temp_path = path.with_suffix(path.suffix + ".tmp")
    try:
        temp_path.write_text(content, encoding=encoding)
        # replace() overwrites an existing target on every platform;
        # rename() raises FileExistsError on Windows.
        temp_path.replace(path)
    except BaseException:
        # Cleanup-and-reraise must also run for KeyboardInterrupt, hence
        # BaseException spelled out instead of a bare "except:".
        temp_path.unlink(missing_ok=True)
        raise

# File locking (advisory locks)
@contextmanager
def file_lock(path: Path, exclusive: bool = True):
    """Hold an advisory flock on "<path>.lock" for the duration of the block.

    The lock is taken on a sidecar lock file, not on *path* itself.
    exclusive=True requests LOCK_EX, otherwise LOCK_SH. The call blocks
    until the lock is granted.
    """
    mode = fcntl.LOCK_SH
    if exclusive:
        mode = fcntl.LOCK_EX

    with open(path.with_suffix(path.suffix + ".lock"), "w") as handle:
        try:
            fcntl.flock(handle.fileno(), mode)
            yield
        finally:
            fcntl.flock(handle.fileno(), fcntl.LOCK_UN)

# Usage
with file_lock(Path("/etc/myapp/config.json")):
    # Only one process can be here at a time
    config = json.loads(Path("/etc/myapp/config.json").read_text())
    config["counter"] += 1
    atomic_write(Path("/etc/myapp/config.json"), json.dumps(config))

# Non-blocking lock attempt
@contextmanager
def try_lock(path: Path, timeout: float = 5.0):
    """Acquire an exclusive advisory lock on "<path>.lock", giving up after *timeout*.

    The lock is attempted without blocking and retried every 0.1 s.

    Args:
        path: Resource being protected; the lock is taken on "<path>.lock".
        timeout: Maximum number of seconds to keep retrying.

    Raises:
        TimeoutError: the lock was still held elsewhere after *timeout*.
    """
    import time
    lock_path = path.with_suffix(path.suffix + ".lock")
    # monotonic() is immune to wall-clock adjustments (NTP, DST, manual
    # changes) that would stretch or cut short a time.time()-based timeout.
    deadline = time.monotonic() + timeout

    # The with-block guarantees the lock file is closed on every exit path,
    # including unexpected errors from flock().
    with open(lock_path, "w") as lock_file:
        while True:
            try:
                fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
                break
            except BlockingIOError:
                if time.monotonic() > deadline:
                    raise TimeoutError(f"Could not acquire lock on {path}")
                time.sleep(0.1)

        try:
            yield
        finally:
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)

Log File Handling

import logging
from pathlib import Path
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
from datetime import datetime

# Basic file logging
logging.basicConfig(
    filename="/var/log/myapp/app.log",
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

# Rotating file handler (by size)
handler = RotatingFileHandler(
    "/var/log/myapp/app.log",
    maxBytes=10_000_000,  # 10MB
    backupCount=5         # Keep 5 backup files
)

# Timed rotation (daily)
handler = TimedRotatingFileHandler(
    "/var/log/myapp/app.log",
    when="midnight",
    interval=1,
    backupCount=30  # Keep 30 days
)

# Complete logging setup
def setup_logging(app_name: str, log_dir: Path, level: int = logging.INFO):
    """Configure and return the application logger.

    Creates *log_dir* if needed and attaches two handlers to the logger
    named *app_name*: a size-rotated file "<log_dir>/<app_name>.log"
    (10 MB, 5 backups) and a console handler.

    The function is safe to call repeatedly: when the logger already has
    handlers, only the level is updated. Without this guard every extra
    call would add another pair of handlers and each record would be
    emitted multiple times.

    Args:
        app_name: Logger name, also used as the log file stem.
        log_dir: Directory for the log file.
        level: Threshold set on the logger.

    Returns:
        The configured logging.Logger.
    """
    log_dir.mkdir(parents=True, exist_ok=True)

    logger = logging.getLogger(app_name)
    logger.setLevel(level)

    if logger.handlers:
        # Already configured (by an earlier call or by the application).
        return logger

    # File handler with rotation
    file_handler = RotatingFileHandler(
        log_dir / f"{app_name}.log",
        maxBytes=10_000_000,
        backupCount=5
    )
    file_handler.setFormatter(logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    ))

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(
        "%(levelname)s - %(message)s"
    ))

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

    return logger

# Parse log files
def parse_log_line(line: str) -> dict | None:
    """Parse standard log format line."""
    import re
    pattern = r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) - (\w+) - (\w+) - (.+)"
    match = re.match(pattern, line)
    if match:
        return {
            "timestamp": match.group(1),
            "logger": match.group(2),
            "level": match.group(3),
            "message": match.group(4),
        }
    return None

# Infrastructure pattern: Parse auth.log for failed SSH
def parse_ssh_failures(log_path: Path = Path("/var/log/auth.log")) -> list[dict]:
    """Extract failed SSH password attempts from a syslog-style auth log.

    Args:
        log_path: Log file with lines such as
            "Jan  2 03:04:05 host sshd[1]: Failed password for root from 10.0.0.5 port 22 ssh2".

    Returns:
        One dict per matching line with "timestamp" (as written in the
        log), "username" and "source_ip" (IPv4 or IPv6).
    """
    import re
    pattern = re.compile(
        # \s+ between month and day: syslog pads single-digit days ("Jan  2").
        r"(\w+\s+\d+\s+\d+:\d+:\d+).*sshd.*Failed password for "
        # \S+ for the user: account names may contain ".", "-" or "_".
        r"(?:invalid user )?(\S+) from "
        # Hex digits and ":" admit IPv6 sources next to dotted IPv4.
        r"([0-9A-Fa-f:.]+)"
    )

    failures = []
    # Stream line by line (auth logs grow large) and replace undecodable
    # bytes: usernames are attacker-controlled and need not be valid text.
    with log_path.open(errors="replace") as log:
        for line in log:
            match = pattern.search(line)
            if match:
                failures.append({
                    "timestamp": match.group(1),
                    "username": match.group(2),
                    "source_ip": match.group(3),
                })

    return failures

Certificate File Handling

from pathlib import Path
from datetime import datetime
import subprocess

# Parse certificate using openssl
def get_cert_info(cert_path: Path) -> dict:
    """Read subject, issuer, validity dates and serial via `openssl x509`.

    Only output lines of the form "<label>=<value>" with a known label are
    used; fields openssl did not print are simply absent from the result.
    The exit status of openssl is not checked.
    """
    command = [
        "openssl", "x509", "-in", str(cert_path), "-noout",
        "-subject", "-issuer", "-dates", "-serial",
    ]
    completed = subprocess.run(command, capture_output=True, text=True)

    # openssl label -> key in the returned dict
    field_names = {
        "subject": "subject",
        "issuer": "issuer",
        "notBefore": "not_before",
        "notAfter": "not_after",
        "serial": "serial",
    }

    info = {}
    for line in completed.stdout.splitlines():
        label, separator, value = line.partition("=")
        if separator and label in field_names:
            info[field_names[label]] = value.strip()

    return info

# Check certificate expiry
def check_cert_expiry(cert_path: Path) -> tuple[bool, int]:
    """Check whether a certificate has expired and how many days remain.

    Args:
        cert_path: Certificate file readable by `openssl x509`.

    Returns:
        (is_expired, days_left). days_left is the whole number of days
        until notAfter, negative once the certificate has expired.

    Raises:
        ValueError: openssl failed or printed no end date (for example the
            file is not a certificate).
    """
    from datetime import timezone

    result = subprocess.run(
        ["openssl", "x509", "-in", str(cert_path), "-noout", "-enddate"],
        capture_output=True,
        text=True
    )
    if result.returncode != 0 or "=" not in result.stdout:
        raise ValueError(
            f"Could not read expiry date from {cert_path}: {result.stderr.strip()}"
        )

    # Parse: notAfter=Feb 27 12:00:00 2027 GMT
    date_str = result.stdout.split("=", 1)[1].strip()
    # strptime() returns a naive datetime even with %Z; openssl reports GMT,
    # so attach UTC explicitly and compare against an aware "now". Comparing
    # with naive local time would be off by the local UTC offset.
    expiry = datetime.strptime(date_str, "%b %d %H:%M:%S %Y %Z").replace(
        tzinfo=timezone.utc
    )

    days_left = (expiry - datetime.now(timezone.utc)).days
    is_expired = days_left < 0

    return is_expired, days_left

# Infrastructure pattern: Check all certs in directory
def audit_certificates(cert_dir: Path, warn_days: int = 30) -> list[dict]:
    """Report expiry status for every *.pem file directly inside *cert_dir*.

    Each entry has "path" and "status". Status is "EXPIRED", "WARNING"
    (fewer than *warn_days* days left) or "OK", accompanied by subject,
    issuer and days_left; or "ERROR" with the exception text under "error"
    when the certificate could not be inspected.
    """
    report = []

    for cert in cert_dir.glob("*.pem"):
        try:
            is_expired, days_left = check_cert_expiry(cert)
            info = get_cert_info(cert)

            if is_expired:
                status = "EXPIRED"
            elif days_left < warn_days:
                status = "WARNING"
            else:
                status = "OK"

            entry = {
                "path": str(cert),
                "subject": info.get("subject", "Unknown"),
                "issuer": info.get("issuer", "Unknown"),
                "days_left": days_left,
                "status": status,
            }
        except Exception as e:
            # One unreadable certificate must not abort the whole audit.
            entry = {
                "path": str(cert),
                "status": "ERROR",
                "error": str(e),
            }
        report.append(entry)

    return report

# Using cryptography library (more Pythonic)
# Requires: pip install cryptography
from cryptography import x509
from cryptography.hazmat.primitives import serialization

def parse_cert_python(cert_path: Path) -> dict:
    """Parse a PEM certificate using the cryptography library.

    Args:
        cert_path: PEM-encoded X.509 certificate file.

    Returns:
        A dict with "subject" and "issuer" (RFC 4514 strings), "serial"
        (int), "not_before" / "not_after" (ISO-8601, UTC) and "san" (list
        of subject alternative names as strings, empty when the
        certificate has no SAN extension).
    """
    cert = x509.load_pem_x509_certificate(cert_path.read_bytes())

    # A certificate can carry other extensions but no SAN; in that case
    # get_extension_for_class() raises instead of returning None, so a
    # truthiness check on cert.extensions is not a sufficient guard.
    try:
        san_extension = cert.extensions.get_extension_for_class(
            x509.SubjectAlternativeName
        )
    except x509.ExtensionNotFound:
        san = []
    else:
        san = [str(name.value) for name in san_extension.value]

    return {
        "subject": cert.subject.rfc4514_string(),
        "issuer": cert.issuer.rfc4514_string(),
        "serial": cert.serial_number,
        "not_before": cert.not_valid_before_utc.isoformat(),
        "not_after": cert.not_valid_after_utc.isoformat(),
        "san": san,
    }

Backup and Archive Operations

import tarfile
import zipfile
import gzip
import shutil
from pathlib import Path
from datetime import datetime

# Create tar.gz archive
def create_backup(source_dir: Path, backup_dir: Path, name: str) -> Path:
    """Create a timestamped tar.gz backup of *source_dir*.

    Args:
        source_dir: Directory (or file) to archive.
        backup_dir: Directory that receives the archive; created, with
            parents, when it does not exist yet.
        name: Archive name prefix and the top-level folder name inside
            the archive.

    Returns:
        Path of the archive, "<backup_dir>/<name>_<YYYYmmdd_HHMMSS>.tar.gz".
    """
    # A first run typically has no backup directory yet; without this,
    # tarfile.open() fails with FileNotFoundError.
    backup_dir.mkdir(parents=True, exist_ok=True)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    archive_path = backup_dir / f"{name}_{timestamp}.tar.gz"

    with tarfile.open(archive_path, "w:gz") as tar:
        tar.add(source_dir, arcname=name)

    return archive_path

# Extract tar.gz
def extract_backup(archive_path: Path, dest_dir: Path):
    """Extract a tar.gz archive into *dest_dir*, refusing unsafe members.

    Archives are untrusted input: a crafted member such as "../../etc/x"
    would otherwise be written outside *dest_dir*.

    Args:
        archive_path: gzip-compressed tar archive.
        dest_dir: Directory to extract into.

    Raises:
        tarfile.FilterError: a member was rejected by the "data" filter
            (Python versions that provide extraction filters).
        ValueError: a member would land outside *dest_dir* (older Python
            versions without extraction filters).
    """
    with tarfile.open(archive_path, "r:gz") as tar:
        if hasattr(tarfile, "data_filter"):
            # Built-in protection: blocks path traversal, absolute paths,
            # device nodes and links pointing outside the destination.
            tar.extractall(dest_dir, filter="data")
            return

        # Fallback for interpreters without extraction filters: verify
        # every member path before extracting anything.
        dest_root = Path(dest_dir).resolve()
        for member in tar.getmembers():
            target = (dest_root / member.name).resolve()
            if target != dest_root and dest_root not in target.parents:
                raise ValueError(
                    f"Refusing to extract {member.name!r} outside {dest_dir}"
                )
        tar.extractall(dest_dir)

# Create zip archive
def create_zip(source_dir: Path, zip_path: Path):
    """Pack every regular file below *source_dir* into a deflated zip.

    Member names are stored relative to *source_dir*; directories
    themselves are not added as entries.
    """
    archive = zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED)
    with archive:
        regular_files = (
            entry for entry in source_dir.rglob("*") if entry.is_file()
        )
        for entry in regular_files:
            archive.write(entry, entry.relative_to(source_dir))

# Read gzip file
def read_gzip(path: Path) -> str:
    """Return the decompressed text content of a gzip file."""
    handle = gzip.open(path, "rt")
    try:
        return handle.read()
    finally:
        handle.close()

# Write gzip file
def write_gzip(path: Path, content: str):
    """Compress *content* as text into a gzip file, overwriting *path*."""
    stream = gzip.open(path, mode="wt")
    with stream:
        stream.write(content)

# Infrastructure pattern: Rotate backups
def rotate_backups(backup_dir: Path, pattern: str, keep: int = 7):
    """Delete all but the *keep* newest files in *backup_dir* matching *pattern*.

    "Newest" is decided by modification time. The name of every deleted
    file is printed.
    """
    candidates = list(backup_dir.glob(pattern))
    # Newest first, so everything past index `keep` is surplus.
    candidates.sort(key=lambda entry: entry.stat().st_mtime, reverse=True)

    for stale in candidates[keep:]:
        stale.unlink()
        print(f"Deleted old backup: {stale.name}")

# Usage
rotate_backups(Path("/backup"), "myapp_*.tar.gz", keep=7)

# Copy directory tree
shutil.copytree(
    src="/etc/myapp",
    dst="/backup/myapp_config",
    dirs_exist_ok=True  # Overwrite if exists (Python 3.8+)
)

# Copy single file
shutil.copy2("/etc/myapp/config.yaml", "/backup/config.yaml.bak")
# copy2 preserves metadata (timestamps, permissions)

File Watching (watchdog)

# Requires: pip install watchdog
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileModifiedEvent
from pathlib import Path
import time

class ConfigReloadHandler(FileSystemEventHandler):
    """Handler that invokes a callback when one specific config file changes.

    Events for other files in the watched directory are ignored, and
    repeated events that carry the same modification time trigger the
    callback only once.
    """

    def __init__(self, config_path: Path, callback):
        """Remember the file to watch and the zero-argument *callback*."""
        super().__init__()
        self.config_path = config_path
        self.callback = callback
        # mtime of the last change that was reported; 0 means "none yet".
        self._last_modified = 0

    def on_modified(self, event):
        """Run the callback for a modification event on the config file."""
        if not isinstance(event, FileModifiedEvent):
            return
        if Path(event.src_path).resolve() != self.config_path.resolve():
            return

        try:
            mtime = self.config_path.stat().st_mtime
        except FileNotFoundError:
            # The file vanished between the event and this check (e.g. an
            # editor replacing it). Skip instead of raising inside the
            # observer thread; a later event will report the new file.
            return

        # Debounce (avoid duplicate events)
        if mtime != self._last_modified:
            self._last_modified = mtime
            self.callback()

def watch_config(config_path: Path, on_change):
    """Watch a config file and call *on_change* whenever it is modified.

    Blocks the calling thread until interrupted with Ctrl+C. The parent
    directory is watched non-recursively and events are filtered down to
    *config_path* by ConfigReloadHandler.

    Args:
        config_path: File to watch.
        on_change: Zero-argument callable invoked after each change.
    """
    observer = Observer()
    handler = ConfigReloadHandler(config_path, on_change)
    observer.schedule(handler, str(config_path.parent), recursive=False)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Stop and join on every exit path, not only on Ctrl+C; otherwise
        # any other exception would leave the observer thread running.
        observer.stop()
        observer.join()

# Usage
def reload_config():
    """Example change callback: announce that the config is being reloaded."""
    message = "Config changed, reloading..."
    print(message)
    # Reload logic here

# watch_config(Path("/etc/myapp/config.yaml"), reload_config)

# Simple polling approach (no dependencies)
def poll_file_changes(path: Path, callback, interval: float = 1.0):
    """Detect file changes by polling the modification time; never returns.

    Every *interval* seconds the mtime of *path* is compared with the last
    one seen; on a difference *callback* is invoked. While the file does
    not exist the poll is skipped, so a file that appears later triggers
    the callback.

    Args:
        path: File to observe.
        callback: Zero-argument callable run after each detected change.
            Exceptions raised by it propagate to the caller.
        interval: Seconds to sleep between polls.
    """
    # EAFP instead of exists()+stat(): the file may disappear in between.
    try:
        last_mtime = path.stat().st_mtime
    except FileNotFoundError:
        last_mtime = 0

    while True:
        time.sleep(interval)
        # Only the stat() call is guarded. Keeping the callback outside the
        # try block prevents a FileNotFoundError raised by the callback
        # itself from being silently swallowed.
        try:
            current_mtime = path.stat().st_mtime
        except FileNotFoundError:
            continue

        if current_mtime != last_mtime:
            last_mtime = current_mtime
            callback()

Infrastructure File Patterns

from pathlib import Path
import json
import yaml

# Parse NetworkManager connection file
def parse_nm_connection(path: Path) -> dict:
    """Parse a NetworkManager keyfile (.nmconnection).

    Args:
        path: Keyfile to read. A missing file yields an empty result.

    Returns:
        A dict with "name" and "type" when a [connection] section exists,
        and "eap" (identity, ca_cert, client_cert, private_key; None when
        unset) when an [802-1x] section exists.
    """
    import configparser
    # Keyfile values are literal text; identities or paths containing "%"
    # would make the default %-interpolation raise on access.
    config = configparser.ConfigParser(interpolation=None)
    config.read(path)

    conn = {}
    if "connection" in config:
        connection = config["connection"]
        conn["name"] = connection.get("id")
        conn["type"] = connection.get("type")

    if "802-1x" in config:
        dot1x = config["802-1x"]
        conn["eap"] = {
            "identity": dot1x.get("identity"),
            "ca_cert": dot1x.get("ca-cert"),
            "client_cert": dot1x.get("client-cert"),
            "private_key": dot1x.get("private-key"),
        }

    return conn

# Parse wpa_supplicant config
def parse_wpa_supplicant(path: Path) -> list[dict]:
    """Parse the network blocks of a wpa_supplicant.conf file.

    Args:
        path: Configuration file to read.

    Returns:
        One dict per active "network={...}" block, mapping each setting
        name to its value with surrounding double quotes removed.
        Commented-out blocks and commented-out settings are ignored.
    """
    import re

    content = path.read_text()
    # Anchoring at the start of a line skips blocks disabled with "#network={".
    network_blocks = re.findall(
        r"^\s*network=\{([^}]+)\}", content, re.MULTILINE
    )

    networks = []
    for block in network_blocks:
        network = {}
        for line in block.splitlines():
            line = line.strip()
            # '# psk="old"' contains "=" but is a comment, not a setting.
            if not line or line.startswith("#"):
                continue
            key, separator, value = line.partition("=")
            if separator:
                network[key.strip()] = value.strip().strip('"')
        networks.append(network)

    return networks

# Parse k8s manifest
def parse_k8s_manifest(path: Path) -> list[dict]:
    """Parse a (possibly multi-document) Kubernetes YAML manifest.

    Empty documents - produced by a leading or trailing "---" or by two
    consecutive separators - load as None and are dropped, so callers can
    index every returned item (e.g. doc["kind"]) without a None check.
    """
    documents = yaml.safe_load_all(path.read_text())
    return [doc for doc in documents if doc is not None]

# Extract secrets from k8s Secret manifest
def extract_k8s_secrets(manifest: dict) -> dict:
    """Decode the base64 values of a Kubernetes Secret manifest.

    Args:
        manifest: One parsed manifest document.

    Returns:
        Mapping of key to decoded UTF-8 text taken from the "data" field.
        Empty when the manifest is not a Secret or has no data.

    Raises:
        UnicodeDecodeError: a value decodes to bytes that are not UTF-8.
    """
    import base64

    if manifest.get("kind") != "Secret":
        return {}

    # "data:" with no entries parses as None, which .get()'s default does
    # not cover - hence "or {}".
    data = manifest.get("data") or {}
    return {
        key: base64.b64decode(value).decode("utf-8")
        for key, value in data.items()
    }

# Parse Vault policy file
def parse_vault_policy(path: Path) -> dict:
    """Map each path stanza of a Vault HCL policy to its quoted words.

    This is a regex-based reader, not an HCL parser: for every
    'path "<p>" { ... }' stanza it collects all double-quoted word tokens
    found in the body. A path that appears twice keeps its last stanza.
    """
    import re
    stanza_re = r'path\s+"([^"]+)"\s+\{([^}]+)\}'
    text = path.read_text()

    return {
        stanza.group(1): re.findall(r'"(\w+)"', stanza.group(2))
        for stanza in re.finditer(stanza_re, text)
    }

# Parse gopass entry (YAML after first line)
def parse_gopass_entry(content: str) -> tuple[str, dict]:
    """Split a gopass entry into its password and YAML metadata.

    The first line is the password. Metadata is the YAML document that
    follows the first "---" separator line, if there is one.

    Args:
        content: Raw entry text.

    Returns:
        (password, metadata). An empty entry yields ("", {}); an entry
        without a separator or with nothing after it yields {} metadata.
    """
    lines = content.splitlines()
    if not lines:
        # Empty entry: there is no first line to take the password from.
        return "", {}

    password = lines[0]
    metadata = {}

    for index, line in enumerate(lines[1:], 1):
        if line.strip() != "---":
            continue
        yaml_start = index + 1
        if yaml_start < len(lines):
            yaml_content = "\n".join(lines[yaml_start:])
            # safe_load() returns None for a blank document.
            metadata = yaml.safe_load(yaml_content) or {}
        break

    return password, metadata

# Generate hosts file entries
def generate_hosts_entries(hosts: list[dict]) -> str:
    """Render host records as /etc/hosts lines below a generated-by header.

    Each record needs "ip" and "fqdn" and may list extra names under
    "aliases". The result has no trailing newline.
    """
    def render(host: dict) -> str:
        address = host["ip"]
        names = [host["fqdn"]]
        if "aliases" in host:
            names += host["aliases"]
        return f"{address}\t{' '.join(names)}"

    header = ["# Generated by Python script", ""]
    return "\n".join(header + [render(host) for host in hosts])

# Update single line in config file
def update_config_line(path: Path, key: str, value: str, separator: str = "="):
    """Set *key* to *value* in a line-oriented config file.

    The first line that, ignoring surrounding whitespace, starts with
    "<key><separator>" or "<key> " is replaced by "<key><separator><value>".
    If no line matches, the setting is appended. The file is rewritten
    with a trailing newline.
    """
    replacement = f"{key}{separator}{value}"
    prefixes = (key + separator, key + " ")
    lines = path.read_text().splitlines()

    for index, line in enumerate(lines):
        if line.strip().startswith(prefixes):
            lines[index] = replacement
            break
    else:
        # Loop finished without a match: add the setting at the end.
        lines.append(replacement)

    path.write_text("\n".join(lines) + "\n")

Common Gotchas

# WRONG: Using os.path instead of pathlib
import os
path = os.path.join("/etc", "ssl", "certs")

# CORRECT: Use pathlib
from pathlib import Path
path = Path("/etc") / "ssl" / "certs"

# WRONG: Forgetting to close files
f = open("file.txt")
content = f.read()
# f is never closed!

# CORRECT: Use context manager
with open("file.txt") as f:
    content = f.read()

# CORRECT: Use Path methods (auto-close)
content = Path("file.txt").read_text()

# WRONG: Reading binary file as text
content = Path("image.png").read_text()  # UnicodeDecodeError

# CORRECT: Use read_bytes for binary
data = Path("image.png").read_bytes()

# WRONG: Writing without newline
Path("file.txt").write_text("line1\nline2")  # No trailing newline

# CORRECT: Include trailing newline
Path("file.txt").write_text("line1\nline2\n")

# WRONG: Assuming file exists
data = json.loads(Path("config.json").read_text())  # FileNotFoundError

# CORRECT: Check existence or handle error
config_path = Path("config.json")
if config_path.exists():
    data = json.loads(config_path.read_text())
else:
    data = {}

# Or with try/except
try:
    data = json.loads(Path("config.json").read_text())
except FileNotFoundError:
    data = {}

# WRONG: CSV without newline="" (Windows issues)
with open("data.csv") as f:  # Missing newline=""
    reader = csv.reader(f)

# CORRECT: Always specify newline=""
with open("data.csv", newline="") as f:
    reader = csv.reader(f)

# WRONG: Using yaml.load() without an explicit Loader
# (yaml.safe_load() is the safe choice for untrusted input; plain load() is the problem)
data = yaml.load(content)  # Unsafe with a full Loader; TypeError on PyYAML >= 6.0 (Loader is required)

# CORRECT: Always use safe_load or specify Loader
data = yaml.safe_load(content)
data = yaml.load(content, Loader=yaml.SafeLoader)

# WRONG: Assuming symlink target exists
target = Path("/etc/ssl/certs/ca-certificates.crt").resolve()
# If the symlink is broken, resolve() still returns a path (it only raises
# FileNotFoundError with strict=True), so the result may point at nothing

# CORRECT: Check if it's a valid symlink
path = Path("/etc/ssl/certs/ca-certificates.crt")
if path.is_symlink():
    if path.exists():  # Target exists
        target = path.resolve()

# WRONG: rename() across filesystems
Path("/tmp/file").rename("/home/user/file")  # May fail!

# CORRECT: Use shutil.move() for cross-filesystem moves
import shutil
shutil.move("/tmp/file", "/home/user/file")