Python Files
File operations, JSON/YAML parsing, and path handling.
Path Operations (pathlib)
from pathlib import Path
# Current working directory
cwd = Path.cwd()
home = Path.home()
# Path construction (OS-agnostic)
config_dir = home / ".config" / "myapp"
log_file = Path("/var/log") / "app.log"
# Path properties
p = Path("/etc/ssl/certs/ca-certificates.crt")
p.name # "ca-certificates.crt"
p.stem # "ca-certificates"
p.suffix # ".crt"
p.suffixes # [".crt"]
p.parent # Path("/etc/ssl/certs")
p.parents[0] # Path("/etc/ssl/certs")
p.parents[1] # Path("/etc/ssl")
p.parts # ("/", "etc", "ssl", "certs", "ca-certificates.crt")
# Change extension
new_path = p.with_suffix(".pem") # /etc/ssl/certs/ca-certificates.pem
new_path = p.with_name("new.crt") # /etc/ssl/certs/new.crt
# Path resolution
relative = Path("../configs/app.yaml")
absolute = relative.resolve() # Full absolute path
absolute = relative.absolute() # Prepends cwd but does not resolve symlinks or ".." components
# String conversion
str(p) # "/etc/ssl/certs/ca-certificates.crt"
TIP: Always use pathlib.Path over os.path for new code. It’s more readable and chainable.
Path Existence and Type Checks
from pathlib import Path
p = Path("/etc/passwd")
# Existence checks
p.exists() # True/False
p.is_file() # True if regular file
p.is_dir() # True if directory
p.is_symlink() # True if symbolic link
p.is_mount() # True if mount point
p.is_socket() # True if Unix socket
p.is_fifo() # True if FIFO/named pipe
p.is_block_device()
p.is_char_device()
# Permissions
import os
os.access(p, os.R_OK) # Readable?
os.access(p, os.W_OK) # Writable?
os.access(p, os.X_OK) # Executable?
# Stat info
stat = p.stat()
stat.st_size # Size in bytes
stat.st_mtime # Modification time (epoch)
stat.st_mode # File mode bits (type + permissions); format with oct() for the familiar octal form
stat.st_uid # Owner UID
stat.st_gid # Group GID
# Human-readable size
def human_size(size: int) -> str:
    """Format a byte count as a human-readable string, e.g. "1.5KB"."""
    remaining = size
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if remaining < 1024:
            return f"{remaining:.1f}{unit}"
        remaining /= 1024
    # Anything that survived five divisions is in petabyte territory.
    return f"{remaining:.1f}PB"
print(human_size(p.stat().st_size))
# Modification time as datetime
from datetime import datetime
mtime = datetime.fromtimestamp(p.stat().st_mtime)
print(f"Modified: {mtime.isoformat()}")
Directory Traversal and Globbing
from pathlib import Path
# List directory contents
for item in Path("/etc").iterdir():
print(item.name)
# Glob patterns (non-recursive)
for f in Path("/etc/ssl").glob("*.conf"):
print(f)
# Recursive glob
for f in Path("/var/log").glob("**/*.log"):
print(f)
# rglob (shortcut for recursive glob)
for f in Path(".").rglob("*.py"):
print(f)
# Filtered iteration
configs = [f for f in Path("/etc").iterdir() if f.suffix == ".conf"]
dirs = [f for f in Path(".").iterdir() if f.is_dir()]
files = [f for f in Path(".").iterdir() if f.is_file()]
# Find files by pattern with size filter
large_logs = [
f for f in Path("/var/log").rglob("*.log")
if f.stat().st_size > 1024 * 1024 # > 1MB
]
# Infrastructure pattern: Find all certificates
def find_certs(root: Path) -> list[Path]:
    """Find all certificate files in a directory tree."""
    cert_globs = ("*.pem", "*.crt", "*.cer", "*.der")
    # One pass per pattern, flattened in pattern order.
    return [match for pat in cert_globs for match in root.rglob(pat)]
certs = find_certs(Path("/etc/ssl"))
for cert in certs:
print(f"{cert}: {cert.stat().st_size} bytes")
File Reading and Writing
from pathlib import Path
# Simple read/write (entire file)
content = Path("file.txt").read_text()
Path("file.txt").write_text("new content")
# Binary read/write
data = Path("file.bin").read_bytes()
Path("file.bin").write_bytes(b"\x00\x01\x02")
# With encoding
content = Path("file.txt").read_text(encoding="utf-8")
Path("file.txt").write_text(content, encoding="utf-8")
# Line by line (memory efficient for large files)
with open("large.log", "r") as f:
for line in f:
process(line.rstrip("\n"))
# Read all lines as list
lines = Path("file.txt").read_text().splitlines()
# Read with context manager (preferred)
with Path("file.txt").open("r") as f:
content = f.read()
# Write with context manager
with Path("file.txt").open("w") as f:
f.write("line 1\n")
f.write("line 2\n")
# Append mode
with Path("file.txt").open("a") as f:
f.write("appended line\n")
# Read specific number of bytes
with open("file.bin", "rb") as f:
header = f.read(4) # First 4 bytes
f.seek(100) # Jump to byte 100
chunk = f.read(50) # Read 50 bytes
# Write multiple lines
lines = ["line 1", "line 2", "line 3"]
Path("file.txt").write_text("\n".join(lines) + "\n")
# Or with writelines
with open("file.txt", "w") as f:
f.writelines(line + "\n" for line in lines)
GOTCHA: write_text() and write_bytes() overwrite completely. No append mode.
JSON Handling
import json
from pathlib import Path
from typing import Any
# Read JSON file
data = json.loads(Path("config.json").read_text())
# Write JSON file (pretty printed)
Path("config.json").write_text(
json.dumps(data, indent=2, sort_keys=True)
)
# Compact JSON (no whitespace)
compact = json.dumps(data, separators=(",", ":"))
# Handle non-serializable types
from datetime import datetime
def json_serializer(obj):
    """Fallback serializer (json.dumps default=) for non-JSON-native types."""
    converters = (
        (datetime, lambda value: value.isoformat()),
        (Path, str),
        (bytes, lambda value: value.decode("utf-8")),
    )
    for target_type, convert in converters:
        if isinstance(obj, target_type):
            return convert(obj)
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
data = {"timestamp": datetime.now(), "path": Path("/etc/passwd")}
json_str = json.dumps(data, default=json_serializer, indent=2)
# jq-like operations in Python
def jq_get(data: dict, path: str) -> Any:
    """Walk a dotted path through nested dicts/lists, jq-style.

    "a.b.0" means data["a"]["b"][0]; numeric segments index lists.
    """
    node = data
    for segment in path.split("."):
        node = node[int(segment)] if isinstance(node, list) else node[segment]
    return node
# Usage
config = {"server": {"host": "vault-01", "port": 8200}}
host = jq_get(config, "server.host") # "vault-01"
# Safe nested access with default
def safe_get(data: dict, path: str, default=None) -> Any:
    """Like jq_get, but return `default` instead of raising on a missing path."""
    try:
        value = jq_get(data, path)
    except (KeyError, IndexError, TypeError):
        return default
    return value
# Merge JSON objects
def merge_json(base: dict, override: dict) -> dict:
    """Deep merge two dictionaries; `override` wins on non-dict conflicts."""
    merged = dict(base)
    for key, new_value in override.items():
        current = merged.get(key)
        # Recurse only when BOTH sides are dicts; otherwise replace outright.
        if isinstance(current, dict) and isinstance(new_value, dict):
            merged[key] = merge_json(current, new_value)
        else:
            merged[key] = new_value
    return merged
base = {"a": 1, "b": {"c": 2}}
override = {"b": {"d": 3}, "e": 4}
merged = merge_json(base, override) # {"a": 1, "b": {"c": 2, "d": 3}, "e": 4}
# Stream large JSON (line-delimited JSON / NDJSON)
def read_ndjson(path: Path):
    """Yield one parsed object per non-blank line of an NDJSON file."""
    with path.open() as fh:
        yield from (json.loads(raw) for raw in fh if raw.strip())
for record in read_ndjson(Path("logs.ndjson")):
print(record["timestamp"])
# Write NDJSON
def write_ndjson(path: Path, records):
    """Write each record as one JSON object per line (NDJSON)."""
    serialized = (json.dumps(record) for record in records)
    with path.open("w") as fh:
        fh.writelines(line + "\n" for line in serialized)
YAML Handling
# Requires: pip install pyyaml
import yaml
from pathlib import Path
# Read YAML
data = yaml.safe_load(Path("config.yaml").read_text())
# Write YAML
Path("config.yaml").write_text(
yaml.dump(data, default_flow_style=False, sort_keys=False)
)
# Multi-document YAML (like k8s manifests)
docs = list(yaml.safe_load_all(Path("manifests.yaml").read_text()))
for doc in docs:
print(doc["kind"])
# Write multi-document YAML
Path("manifests.yaml").write_text(
yaml.dump_all(docs, default_flow_style=False)
)
# Custom YAML dumper for cleaner output
class CleanDumper(yaml.SafeDumper):
    """SafeDumper subclass so custom representers don't leak into the
    global yaml.SafeDumper."""
    pass

def str_representer(dumper, data):
    """Represent multiline strings as literal blocks ("|"), plain otherwise."""
    if "\n" in data:
        # style="|" emits a literal block scalar, keeping embedded
        # newlines readable instead of escaped "\n" sequences.
        return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|")
    return dumper.represent_scalar("tag:yaml.org,2002:str", data)

# Register on the subclass only - pass Dumper=CleanDumper to yaml.dump to use it.
CleanDumper.add_representer(str, str_representer)
# Usage
data = {"script": "#!/bin/bash\necho hello\nexit 0"}
print(yaml.dump(data, Dumper=CleanDumper, default_flow_style=False))
# Output:
# script: |
# #!/bin/bash
# echo hello
# exit 0
# ruamel.yaml for round-trip editing (preserves comments!)
# Requires: pip install ruamel.yaml
from ruamel.yaml import YAML
yaml_rt = YAML()
yaml_rt.preserve_quotes = True
# Read, modify, write - preserving comments
with open("config.yaml") as f:
data = yaml_rt.load(f)
data["new_key"] = "new_value"
with open("config.yaml", "w") as f:
yaml_rt.dump(data, f)
TIP: Use ruamel.yaml when editing config files to preserve comments and formatting.
CSV Handling
import csv
from pathlib import Path
# Read CSV as dictionaries
with open("data.csv", newline="") as f:
reader = csv.DictReader(f)
for row in reader:
print(row["name"], row["ip"])
# Write CSV from dictionaries
data = [
{"name": "vault-01", "ip": "10.50.1.60"},
{"name": "ise-01", "ip": "10.50.1.20"},
]
with open("servers.csv", "w", newline="") as f:
fieldnames = ["name", "ip"]
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
# Read CSV as lists (no header)
with open("data.csv", newline="") as f:
reader = csv.reader(f)
header = next(reader) # Skip header
for row in reader:
print(row[0], row[1])
# Custom delimiter
with open("data.tsv", newline="") as f:
reader = csv.reader(f, delimiter="\t")
for row in reader:
print(row)
# Handle quoted fields with special characters
with open("complex.csv", newline="") as f:
reader = csv.reader(f, quotechar='"', quoting=csv.QUOTE_ALL)
for row in reader:
print(row)
# Read entire CSV into memory
def read_csv_to_dicts(path: Path) -> list[dict]:
    """Read a CSV file (header row required) as a list of dictionaries."""
    # newline="" is required by the csv module for correct quoting/newlines.
    with path.open(newline="") as fh:
        return [dict(row) for row in csv.DictReader(fh)]
# Convert CSV to JSON
def csv_to_json(csv_path: Path, json_path: Path):
    """Convert a CSV file to a pretty-printed JSON array of objects."""
    import json
    rows = read_csv_to_dicts(csv_path)
    json_path.write_text(json.dumps(rows, indent=2))
# Infrastructure pattern: Parse ISE export
def parse_ise_endpoint_export(path: Path) -> list[dict]:
    """Parse an ISE endpoint export CSV into normalized dicts.

    MAC addresses are upper-cased; missing columns become empty strings.
    """
    with path.open(newline="") as fh:
        return [
            {
                "mac": row.get("MACAddress", "").upper(),
                "group": row.get("IdentityGroup", ""),
                "description": row.get("Description", ""),
            }
            for row in csv.DictReader(fh)
        ]
GOTCHA: Always use newline="" when opening CSV files to avoid blank row issues on Windows.
TOML Handling (Python 3.11+)
# Python 3.11+ has built-in tomllib (read-only)
import tomllib
from pathlib import Path
# Read TOML
with open("pyproject.toml", "rb") as f:
data = tomllib.load(f)
# Or from string
toml_string = Path("pyproject.toml").read_text()
data = tomllib.loads(toml_string)
# Access nested values
project_name = data["project"]["name"]
dependencies = data["project"].get("dependencies", [])
# For WRITING TOML, use tomli-w
# Requires: pip install tomli-w
import tomli_w
config = {
"tool": {
"myapp": {
"debug": True,
"servers": ["vault-01", "vault-02"],
}
}
}
with open("config.toml", "wb") as f:
tomli_w.dump(config, f)
# Parse pyproject.toml to get project metadata
def get_project_info(project_dir: Path) -> dict:
"""Extract project info from pyproject.toml."""
pyproject = project_dir / "pyproject.toml"
if not pyproject.exists():
return {}
with pyproject.open("rb") as f:
data = tomllib.load(f)
project = data.get("project", {})
return {
"name": project.get("name"),
"version": project.get("version"),
"description": project.get("description"),
"dependencies": project.get("dependencies", []),
}
INI/Config File Handling
import configparser
from pathlib import Path
# Read INI file
config = configparser.ConfigParser()
config.read("app.ini")
# Access values
host = config["database"]["host"]
port = config.getint("database", "port")
debug = config.getboolean("app", "debug")
# Default values
timeout = config.getint("database", "timeout", fallback=30)
# Iterate sections
for section in config.sections():
print(f"[{section}]")
for key, value in config[section].items():
print(f" {key} = {value}")
# Write INI file
config = configparser.ConfigParser()
config["database"] = {
"host": "localhost",
"port": "5432",
"name": "mydb",
}
config["logging"] = {
"level": "INFO",
"file": "/var/log/app.log",
}
with open("app.ini", "w") as f:
config.write(f)
# Parse SSSD config (infrastructure pattern)
def parse_sssd_config(path: Path = Path("/etc/sssd/sssd.conf")) -> dict:
    """Parse the SSSD configuration's AD domain section.

    Returns {} when the expected domain section is missing.
    """
    parser = configparser.ConfigParser()
    parser.read(path)
    section_name = "domain/INSIDE.DOMUSDIGITALIS.DEV"
    if section_name not in parser:
        return {}
    domain = parser[section_name]
    return {
        "ad_server": domain.get("ad_server"),
        "krb5_realm": domain.get("krb5_realm"),
        "ldap_id_mapping": domain.getboolean("ldap_id_mapping", fallback=True),
    }
Temporary Files and Directories
import tempfile
from pathlib import Path
import os
# Temporary file (auto-deleted when closed)
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=True) as f:
f.write('{"key": "value"}')
f.flush()
print(f"Temp file: {f.name}")
# File exists here
# File deleted after context manager exits
# Temporary file that persists
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
temp_path = Path(f.name)
f.write('{"key": "value"}')
# Use temp_path, then manually delete
temp_path.unlink()
# Temporary directory (auto-deleted)
with tempfile.TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir)
(tmp_path / "file.txt").write_text("content")
# Do work...
# Directory and all contents deleted
# Get system temp directory
tmp = Path(tempfile.gettempdir())
# Create unique temp file path (doesn't create file)
fd, path = tempfile.mkstemp(suffix=".tmp", prefix="myapp_")
os.close(fd) # Close file descriptor
temp_path = Path(path)
# Now use temp_path
temp_path.unlink() # Cleanup
# Infrastructure pattern: Safe config update
def safe_config_update(config_path: Path, new_content: str):
    """Safely update a config file using temp file + atomic replace.

    Writes to a sibling temp file (same directory, therefore same
    filesystem) and then replaces the target, so readers never observe a
    half-written file. The temp file is removed if the write fails.
    """
    # Append ".tmp" instead of swapping the suffix: with_suffix(".tmp")
    # would map both "app.yaml" and "app.json" onto the same "app.tmp",
    # letting concurrent updates of different configs clobber each other.
    temp_path = config_path.with_suffix(config_path.suffix + ".tmp")
    try:
        temp_path.write_text(new_content)
        # replace() is atomic on POSIX and, unlike rename(), also
        # overwrites an existing target on Windows.
        temp_path.replace(config_path)
    except Exception:
        temp_path.unlink(missing_ok=True)
        raise
TIP: For atomic file updates, write to a temp file in the same directory, then rename.
Atomic Writes and File Locking
import os
import fcntl
from pathlib import Path
from contextlib import contextmanager
# Atomic write using rename
def atomic_write(path: Path, content: str, encoding: str = "utf-8"):
    """Write a file atomically using a sibling temp file + replace.

    Args:
        path: Destination file.
        content: Full text to write.
        encoding: Text encoding (default UTF-8).

    The temp file is cleaned up and the exception re-raised on failure.
    """
    temp_path = path.with_suffix(path.suffix + ".tmp")
    try:
        temp_path.write_text(content, encoding=encoding)
        # replace() is atomic on POSIX and overwrites an existing target
        # on Windows too, where rename() would raise.
        temp_path.replace(path)
    except BaseException:  # narrow from bare `except:`; still cleans up on Ctrl-C
        temp_path.unlink(missing_ok=True)
        raise
# File locking (advisory locks)
@contextmanager
def file_lock(path: Path, exclusive: bool = True):
    """Acquire an advisory file lock for safe concurrent access.

    Locks a sibling "<name><suffix>.lock" file, not `path` itself, so all
    cooperating processes must use this same helper. POSIX-only (fcntl).

    Args:
        path: File whose access is being coordinated.
        exclusive: True for an exclusive writer lock (LOCK_EX), False for
            a shared reader lock (LOCK_SH).

    Yields:
        None while the lock is held; the lock is released on exit.
    """
    lock_path = path.with_suffix(path.suffix + ".lock")
    # NOTE(review): the .lock file is created but never deleted - stale
    # lock files accumulate (harmless for advisory locks).
    lock_file = open(lock_path, "w")
    try:
        operation = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH
        fcntl.flock(lock_file.fileno(), operation)  # blocks until acquired
        yield
    finally:
        fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
        lock_file.close()
# Usage
with file_lock(Path("/etc/myapp/config.json")):
# Only one process can be here at a time
config = json.loads(Path("/etc/myapp/config.json").read_text())
config["counter"] += 1
atomic_write(Path("/etc/myapp/config.json"), json.dumps(config))
# Non-blocking lock attempt
@contextmanager
def try_lock(path: Path, timeout: float = 5.0):
    """Try to acquire an exclusive file lock, polling until a timeout.

    Retries a non-blocking flock every 100 ms until acquired or until
    `timeout` seconds have elapsed. POSIX-only (fcntl).

    Raises:
        TimeoutError: if the lock cannot be acquired within `timeout`.
    """
    import time
    lock_path = path.with_suffix(path.suffix + ".lock")
    lock_file = open(lock_path, "w")
    start = time.time()
    while True:
        try:
            # LOCK_NB makes flock raise BlockingIOError instead of blocking.
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
            break
        except BlockingIOError:
            if time.time() - start > timeout:
                lock_file.close()
                raise TimeoutError(f"Could not acquire lock on {path}")
            time.sleep(0.1)  # back off before the next attempt
    try:
        yield
    finally:
        fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
        lock_file.close()
Log File Handling
import logging
from pathlib import Path
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
from datetime import datetime
# Basic file logging
logging.basicConfig(
filename="/var/log/myapp/app.log",
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Rotating file handler (by size)
handler = RotatingFileHandler(
"/var/log/myapp/app.log",
maxBytes=10_000_000, # 10MB
backupCount=5 # Keep 5 backup files
)
# Timed rotation (daily)
handler = TimedRotatingFileHandler(
"/var/log/myapp/app.log",
when="midnight",
interval=1,
backupCount=30 # Keep 30 days
)
# Complete logging setup
def setup_logging(app_name: str, log_dir: Path, level: int = logging.INFO):
    """Configure application logging.

    Creates `log_dir` if needed, then attaches a size-rotated file handler
    and a console handler to the `app_name` logger.

    Idempotent: if the logger already has handlers, it is returned as-is.
    The original version re-added both handlers on every call, so each
    repeat call duplicated every subsequent log line.

    Returns:
        The configured logging.Logger.
    """
    log_dir.mkdir(parents=True, exist_ok=True)
    logger = logging.getLogger(app_name)
    logger.setLevel(level)
    if logger.handlers:  # already configured - don't stack duplicates
        return logger
    # File handler with rotation
    file_handler = RotatingFileHandler(
        log_dir / f"{app_name}.log",
        maxBytes=10_000_000,  # rotate at ~10MB
        backupCount=5         # keep 5 rotated files
    )
    file_handler.setFormatter(logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    ))
    # Console handler (terser format for interactive use)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(
        "%(levelname)s - %(message)s"
    ))
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger
# Parse log files
def parse_log_line(line: str) -> dict | None:
"""Parse standard log format line."""
import re
pattern = r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) - (\w+) - (\w+) - (.+)"
match = re.match(pattern, line)
if match:
return {
"timestamp": match.group(1),
"logger": match.group(2),
"level": match.group(3),
"message": match.group(4),
}
return None
# Infrastructure pattern: Parse auth.log for failed SSH
def parse_ssh_failures(log_path: Path = Path("/var/log/auth.log")) -> list[dict]:
    """Extract failed SSH login attempts from an auth.log-style file."""
    import re
    failed_re = re.compile(
        r"(\w+ \d+ \d+:\d+:\d+).*sshd.*Failed password for (?:invalid user )?(\w+) from ([\d.]+)"
    )
    results = []
    for entry in log_path.read_text().splitlines():
        hit = failed_re.search(entry)
        if hit is None:
            continue
        when, user, source = hit.groups()
        results.append({
            "timestamp": when,
            "username": user,
            "source_ip": source,
        })
    return results
Certificate File Handling
from pathlib import Path
from datetime import datetime
import subprocess
# Parse certificate using openssl
def get_cert_info(cert_path: Path) -> dict:
    """Extract certificate information by shelling out to `openssl x509`.

    Scrapes openssl's line-oriented output. Best effort: the exit code is
    not checked, so a failed run simply yields an empty dict.

    Returns:
        dict with any of: subject, issuer, not_before, not_after, serial.
    """
    result = subprocess.run(
        ["openssl", "x509", "-in", str(cert_path), "-noout",
         "-subject", "-issuer", "-dates", "-serial"],
        capture_output=True,
        text=True
    )
    info = {}
    # Each output line looks like "subject=CN=..."; split on the first
    # "=" only and keep the remainder verbatim.
    for line in result.stdout.splitlines():
        if line.startswith("subject="):
            info["subject"] = line.split("=", 1)[1].strip()
        elif line.startswith("issuer="):
            info["issuer"] = line.split("=", 1)[1].strip()
        elif line.startswith("notBefore="):
            info["not_before"] = line.split("=", 1)[1].strip()
        elif line.startswith("notAfter="):
            info["not_after"] = line.split("=", 1)[1].strip()
        elif line.startswith("serial="):
            info["serial"] = line.split("=", 1)[1].strip()
    return info
# Check certificate expiry
def check_cert_expiry(cert_path: Path) -> tuple[bool, int]:
    """Check whether a certificate is expired and how many days remain.

    Returns:
        (is_expired, days_left) - days_left is negative once expired.

    Raises:
        IndexError/ValueError if openssl fails or changes its output
        format (the exit code is not checked).
    """
    result = subprocess.run(
        ["openssl", "x509", "-in", str(cert_path), "-noout", "-enddate"],
        capture_output=True,
        text=True
    )
    # Parse: notAfter=Feb 27 12:00:00 2027 GMT
    date_str = result.stdout.split("=")[1].strip()
    # NOTE(review): the parsed datetime is naive and compared against
    # naive local now() - assumes the local clock is close to UTC, which
    # is fine at day granularity but worth confirming.
    expiry = datetime.strptime(date_str, "%b %d %H:%M:%S %Y %Z")
    days_left = (expiry - datetime.now()).days
    is_expired = days_left < 0
    return is_expired, days_left
# Infrastructure pattern: Check all certs in directory
def audit_certificates(cert_dir: Path, warn_days: int = 30) -> list[dict]:
    """Audit every *.pem certificate in a directory for expiry.

    Each entry reports status OK / WARNING (< warn_days left) / EXPIRED,
    or ERROR with the exception text when a cert can't be inspected.
    """
    report = []
    for pem_file in cert_dir.glob("*.pem"):
        try:
            expired, days_left = check_cert_expiry(pem_file)
            details = get_cert_info(pem_file)
        except Exception as exc:
            report.append({
                "path": str(pem_file),
                "status": "ERROR",
                "error": str(exc),
            })
            continue
        if expired:
            status = "EXPIRED"
        elif days_left < warn_days:
            status = "WARNING"
        else:
            status = "OK"
        report.append({
            "path": str(pem_file),
            "subject": details.get("subject", "Unknown"),
            "issuer": details.get("issuer", "Unknown"),
            "days_left": days_left,
            "status": status,
        })
    return report
# Using cryptography library (more Pythonic)
# Requires: pip install cryptography
from cryptography import x509
from cryptography.hazmat.primitives import serialization
def parse_cert_python(cert_path: Path) -> dict:
    """Parse a PEM certificate with the `cryptography` library.

    Returns subject/issuer as RFC 4514 strings, the serial as an int,
    validity bounds as ISO-8601 UTC strings, and the SAN entries (empty
    list when the certificate has no SubjectAlternativeName extension).
    """
    cert = x509.load_pem_x509_certificate(cert_path.read_bytes())
    # get_extension_for_class() raises ExtensionNotFound when SAN is
    # absent; the original truthiness check on cert.extensions did not
    # guard against that, so certs without a SAN crashed the parser.
    try:
        san_ext = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
        san = [str(name.value) for name in san_ext.value]
    except x509.ExtensionNotFound:
        san = []
    return {
        "subject": cert.subject.rfc4514_string(),
        "issuer": cert.issuer.rfc4514_string(),
        "serial": cert.serial_number,
        "not_before": cert.not_valid_before_utc.isoformat(),
        "not_after": cert.not_valid_after_utc.isoformat(),
        "san": san,
    }
Backup and Archive Operations
import tarfile
import zipfile
import gzip
import shutil
from pathlib import Path
from datetime import datetime
# Create tar.gz archive
def create_backup(source_dir: Path, backup_dir: Path, name: str) -> Path:
    """Create a timestamped tar.gz backup of a directory; return its path."""
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    archive_path = backup_dir / f"{name}_{stamp}.tar.gz"
    with tarfile.open(archive_path, "w:gz") as tar:
        # Store everything under a top-level directory called `name`.
        tar.add(source_dir, arcname=name)
    return archive_path
# Extract tar.gz
def extract_backup(archive_path: Path, dest_dir: Path):
    """Extract a tar.gz archive into dest_dir.

    Uses the "data" extraction filter (default in Python 3.12, backported
    to 3.11.4+) when available: it rejects path-traversal members such as
    "../../etc/passwd" in untrusted archives.
    """
    with tarfile.open(archive_path, "r:gz") as tar:
        if hasattr(tarfile, "data_filter"):
            tar.extractall(dest_dir, filter="data")
        else:
            # Older Python: no filtering available; extraction fully
            # trusts the archive contents.
            tar.extractall(dest_dir)
# Create zip archive
def create_zip(source_dir: Path, zip_path: Path):
    """Create a deflate-compressed zip archive of a directory tree."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        regular_files = (p for p in source_dir.rglob("*") if p.is_file())
        for member in regular_files:
            # Store paths relative to source_dir so the archive is portable.
            archive.write(member, member.relative_to(source_dir))
# Read gzip file
def read_gzip(path: Path) -> str:
    """Read a gzipped text file and return its decompressed contents."""
    with gzip.open(path, mode="rt") as fh:
        text = fh.read()
    return text
# Write gzip file
def write_gzip(path: Path, content: str):
    """Write text to a gzip-compressed file."""
    with gzip.open(path, mode="wt") as fh:
        fh.write(content)
# Infrastructure pattern: Rotate backups
def rotate_backups(backup_dir: Path, pattern: str, keep: int = 7):
    """Delete all but the `keep` most recently modified backups."""
    newest_first = sorted(
        backup_dir.glob(pattern),
        key=lambda candidate: candidate.stat().st_mtime,
        reverse=True,
    )
    for victim in newest_first[keep:]:
        victim.unlink()
        print(f"Deleted old backup: {victim.name}")
# Usage
rotate_backups(Path("/backup"), "myapp_*.tar.gz", keep=7)
# Copy directory tree
shutil.copytree(
src="/etc/myapp",
dst="/backup/myapp_config",
dirs_exist_ok=True # Overwrite if exists (Python 3.8+)
)
# Copy single file
shutil.copy2("/etc/myapp/config.yaml", "/backup/config.yaml.bak")
# copy2 preserves metadata (timestamps, permissions)
File Watching (watchdog)
# Requires: pip install watchdog
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileModifiedEvent
from pathlib import Path
import time
class ConfigReloadHandler(FileSystemEventHandler):
    """Watchdog handler that invokes a callback when one file changes.

    Only FileModifiedEvent for the watched path triggers the callback,
    and only when the file's mtime actually changed (debounce).
    """
    def __init__(self, config_path: Path, callback):
        # File to watch; compared via resolve() so relative/symlinked
        # event paths still match the target.
        self.config_path = config_path
        # Zero-argument callable invoked on each real change.
        self.callback = callback
        # Last mtime seen - used to collapse duplicate events.
        self._last_modified = 0

    def on_modified(self, event):
        """Called by watchdog for every modification under the watched dir."""
        if isinstance(event, FileModifiedEvent):
            if Path(event.src_path).resolve() == self.config_path.resolve():
                # Debounce (avoid duplicate events)
                mtime = self.config_path.stat().st_mtime
                if mtime != self._last_modified:
                    self._last_modified = mtime
                    self.callback()
def watch_config(config_path: Path, on_change):
    """Watch a config file for changes, invoking on_change() per change.

    Blocks forever (until KeyboardInterrupt). Events are delivered on the
    observer's background thread; the main thread just sleeps.
    """
    observer = Observer()
    handler = ConfigReloadHandler(config_path, on_change)
    # watchdog watches directories, so schedule on the file's parent;
    # the handler filters down to the one file of interest.
    observer.schedule(handler, str(config_path.parent), recursive=False)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
# Usage
def reload_config():
    """Example callback for watch_config: replace with real reload logic."""
    print("Config changed, reloading...")
    # Reload logic here
# watch_config(Path("/etc/myapp/config.yaml"), reload_config)
# Simple polling approach (no dependencies)
def poll_file_changes(path: Path, callback, interval: float = 1.0):
    """Simple file change detection via polling (no dependencies).

    Blocks forever: every `interval` seconds it compares the file's mtime
    with the last observed value and calls `callback()` (no arguments)
    when it differs. A missing file counts as "unchanged".
    """
    # Missing file starts as mtime 0, so its creation counts as a change.
    last_mtime = path.stat().st_mtime if path.exists() else 0
    while True:
        time.sleep(interval)
        try:
            current_mtime = path.stat().st_mtime
            if current_mtime != last_mtime:
                last_mtime = current_mtime
                callback()
        except FileNotFoundError:
            # File may be mid-replace (deleted, then recreated) - skip
            # this tick rather than crash.
            pass
Infrastructure File Patterns
from pathlib import Path
import json
import yaml
# Parse NetworkManager connection file
def parse_nm_connection(path: Path) -> dict:
    """Parse a NetworkManager keyfile connection into a summary dict."""
    import configparser
    parser = configparser.ConfigParser()
    parser.read(path)
    summary: dict = {}
    if "connection" in parser:
        conn_section = parser["connection"]
        summary["name"] = conn_section.get("id")
        summary["type"] = conn_section.get("type")
    if "802-1x" in parser:
        eap_section = parser["802-1x"]
        summary["eap"] = {
            "identity": eap_section.get("identity"),
            "ca_cert": eap_section.get("ca-cert"),
            "client_cert": eap_section.get("client-cert"),
            "private_key": eap_section.get("private-key"),
        }
    return summary
# Parse wpa_supplicant config
def parse_wpa_supplicant(path: Path) -> list[dict]:
    """Parse network={...} blocks from a wpa_supplicant.conf file."""
    import re
    raw_blocks = re.findall(r"network=\{([^}]+)\}", path.read_text(), re.DOTALL)
    parsed = []
    for raw_block in raw_blocks:
        entries = {}
        for raw_line in raw_block.strip().splitlines():
            raw_line = raw_line.strip()
            if "=" not in raw_line:
                continue
            key, _, value = raw_line.partition("=")
            # Values may be quoted in the config; strip one layer of quotes.
            entries[key.strip()] = value.strip().strip('"')
        parsed.append(entries)
    return parsed
# Parse k8s manifest
def parse_k8s_manifest(path: Path) -> list[dict]:
    """Parse a multi-document Kubernetes YAML manifest.

    Returns one dict per `---`-separated document, in file order.
    """
    return list(yaml.safe_load_all(path.read_text()))
# Extract secrets from k8s Secret manifest
def extract_k8s_secrets(manifest: dict) -> dict:
    """Decode the base64 `data` entries of a k8s Secret manifest.

    Returns {} for any manifest whose kind is not "Secret".
    """
    import base64
    if manifest.get("kind") != "Secret":
        return {}
    decoded = {}
    for name, b64_value in manifest.get("data", {}).items():
        decoded[name] = base64.b64decode(b64_value).decode("utf-8")
    return decoded
# Parse Vault policy file
def parse_vault_policy(path: Path) -> dict:
    """Parse a Vault HCL policy file (basic parser).

    Maps each policy path to its list of quoted capability words.
    """
    import re
    text = path.read_text()
    block_re = re.compile(r'path\s+"([^"]+)"\s+\{([^}]+)\}')
    return {
        hit.group(1): re.findall(r'"(\w+)"', hit.group(2))
        for hit in block_re.finditer(text)
    }
# Parse gopass entry (YAML after first line)
def parse_gopass_entry(content: str) -> tuple[str, dict]:
    """Split a gopass entry into (password, metadata).

    The first line is the password; YAML metadata, if any, follows the
    first "---" separator line.
    """
    lines = content.splitlines()
    password = lines[0]
    metadata: dict = {}
    separator_idx = next(
        (i for i, line in enumerate(lines[1:], 1) if line.strip() == "---"),
        None,
    )
    if separator_idx is not None:
        yaml_start = separator_idx + 1
        if yaml_start < len(lines):
            metadata = yaml.safe_load("\n".join(lines[yaml_start:])) or {}
    return password, metadata
# Generate hosts file entries
def generate_hosts_entries(hosts: list[dict]) -> str:
    """Render host records as /etc/hosts-format lines (ip<TAB>names)."""
    entries = ["# Generated by Python script", ""]
    for record in hosts:
        hostnames = [record["fqdn"], *record.get("aliases", [])]
        entries.append(f"{record['ip']}\t{' '.join(hostnames)}")
    return "\n".join(entries)
# Update single line in config file
def update_config_line(path: Path, key: str, value: str, separator: str = "="):
    """Replace (or append) the "key=value" line for `key` in a config file."""
    lines = path.read_text().splitlines()
    new_line = f"{key}{separator}{value}"
    for idx, existing in enumerate(lines):
        stripped = existing.strip()
        # Match "key=..." as well as "key ..." style assignments.
        if stripped.startswith(key + separator) or stripped.startswith(key + " "):
            lines[idx] = new_line
            break
    else:
        # Key not present anywhere - append it at the end.
        lines.append(new_line)
    path.write_text("\n".join(lines) + "\n")
Common Gotchas
# WRONG: Using os.path instead of pathlib
import os
path = os.path.join("/etc", "ssl", "certs")
# CORRECT: Use pathlib
from pathlib import Path
path = Path("/etc") / "ssl" / "certs"
# WRONG: Forgetting to close files
f = open("file.txt")
content = f.read()
# f is never closed!
# CORRECT: Use context manager
with open("file.txt") as f:
content = f.read()
# CORRECT: Use Path methods (auto-close)
content = Path("file.txt").read_text()
# WRONG: Reading binary file as text
content = Path("image.png").read_text() # UnicodeDecodeError
# CORRECT: Use read_bytes for binary
data = Path("image.png").read_bytes()
# WRONG: Writing without newline
Path("file.txt").write_text("line1\nline2") # No trailing newline
# CORRECT: Include trailing newline
Path("file.txt").write_text("line1\nline2\n")
# WRONG: Assuming file exists
data = json.loads(Path("config.json").read_text()) # FileNotFoundError
# CORRECT: Check existence or handle error
config_path = Path("config.json")
if config_path.exists():
data = json.loads(config_path.read_text())
else:
data = {}
# Or with try/except
try:
data = json.loads(Path("config.json").read_text())
except FileNotFoundError:
data = {}
# WRONG: CSV without newline="" (Windows issues)
with open("data.csv") as f: # Missing newline=""
reader = csv.reader(f)
# CORRECT: Always specify newline=""
with open("data.csv", newline="") as f:
reader = csv.reader(f)
# NOTE: yaml.safe_load IS the correct choice for untrusted input — the mistake is yaml.load:
# WRONG: Using yaml.load() without Loader
data = yaml.load(content) # Security warning
# CORRECT: Always use safe_load or specify Loader
data = yaml.safe_load(content)
data = yaml.load(content, Loader=yaml.SafeLoader)
# WRONG: Assuming symlink target exists
target = Path("/etc/ssl/certs/ca-certificates.crt").resolve()
# Since Python 3.6, resolve() is non-strict by default and won't raise on a broken symlink; resolve(strict=True) raises FileNotFoundError
# CORRECT: Check if it's a valid symlink
path = Path("/etc/ssl/certs/ca-certificates.crt")
if path.is_symlink():
if path.exists(): # Target exists
target = path.resolve()
# WRONG: rename() across filesystems
Path("/tmp/file").rename("/home/user/file") # May fail!
# CORRECT: Use shutil.move() for cross-filesystem moves
import shutil
shutil.move("/tmp/file", "/home/user/file")