Shell Pipelines: Composing Tools
The power isn’t in any single tool. It’s in how they flow together.
Core Concepts
The Pipeline Model
┌─────────────────────────────────────────────────────────────────┐
│ UNIX PIPELINE PHILOSOPHY │
├─────────────────────────────────────────────────────────────────┤
│ │
│ "Write programs that do one thing and do it well. │
│ Write programs to work together. │
│ Write programs to handle text streams." │
│ │
│ — Doug McIlroy │
│ │
├─────────────────────────────────────────────────────────────────┤
│ │
│ PRODUCER ──► FILTER ──► FILTER ──► CONSUMER │
│ │
│ curl jq sort xargs │
│ cat awk uniq tee │
│ find sed head > file │
│ kubectl grep tail variable │
│ │
│ Each tool: │
│ • Reads from stdin (or files) │
│ • Writes to stdout │
│ • Errors go to stderr │
│ │
└─────────────────────────────────────────────────────────────────┘
Pipeline Operators
| Operator | Purpose | Example |
|---|---|---|
| `\|` | Pipe stdout to next command | `cmd1 \| cmd2` |
| `\|&` | Pipe stdout AND stderr | `cmd1 \|& cmd2` |
| `>` | Redirect stdout to file | `cmd > out.txt` |
| `>>` | Append stdout to file | `cmd >> out.txt` |
| `2>` | Redirect stderr to file | `cmd 2> err.txt` |
| `2>&1` | Redirect stderr to stdout | `cmd > out.txt 2>&1` |
| `&>` | Redirect both to file | `cmd &> all.txt` |
| `<` | Read from file | `cmd < in.txt` |
| `<<` | Here document | `cmd << EOF` |
| `<<<` | Here string | `cmd <<< "$var"` |
| `<(cmd)` | Process substitution | `diff <(cmd1) <(cmd2)` |
| `>(cmd)` | Process substitution (write) | `tee >(cmd)` |
|
Basic Compositions
The Classic Trio: grep | sort | uniq
# Count unique values
grep "pattern" file.log | sort | uniq -c | sort -rn
# Top 10 most common
grep ERROR app.log | awk '{print $5}' | sort | uniq -c | sort -rn | head -10
# Unique IPs from log
awk '{print $1}' access.log | sort -u
curl | jq Pipeline
# API to filtered JSON
curl -s https://api.example.com/users | jq '.[] | select(.active) | .email'
# API to CSV
curl -s https://api.example.com/users | jq -r '.[] | [.id, .name, .email] | @csv'
# Multiple API calls combined
{
curl -s https://api.example.com/users
curl -s https://api.example.com/admins
} | jq -s 'add | unique_by(.id)'
Process Substitution
Compare Two Outputs
# Compare two commands
diff <(ssh host1 'cat /etc/passwd') <(ssh host2 'cat /etc/passwd')
# Compare sorted versions
diff <(sort file1) <(sort file2)
# Compare API responses
diff <(curl -s https://api1/data | jq -S .) <(curl -s https://api2/data | jq -S .)
Multiple Inputs
# Join data from two sources
join <(sort users.txt) <(sort permissions.txt)
# Combine with paste
paste <(cut -f1 file1) <(cut -f2 file2)
# Process multiple streams
cat <(grep ERROR log1) <(grep ERROR log2) <(grep ERROR log3) | sort
Write to Multiple Destinations
# tee to file and continue pipeline
curl -s https://api.example.com/data | tee response.json | jq '.items[]'
# tee to process substitution
curl -s https://api.example.com/data | tee >(jq '.count' > count.txt) | jq '.items[]'
# Multiple outputs
cat data.txt | tee >(grep ERROR > errors.txt) >(grep WARN > warnings.txt) > /dev/null
Real Infrastructure Pipelines
Certificate Audit Pipeline
#!/bin/bash
# Full certificate audit with JSON output
cat << 'EOF' | while read hostport; do
ise-01.inside.domusdigitalis.dev:443
ise-02.inside.domusdigitalis.dev:443
vault.inside.domusdigitalis.dev:8200
pfsense.inside.domusdigitalis.dev:443
EOF
host="${hostport%%:*}"
# Fetch cert data
data=$(echo | timeout 5 openssl s_client -connect "$hostport" \
-servername "$host" 2>/dev/null | \
openssl x509 -noout -subject -issuer -enddate 2>/dev/null)
if [[ -n "$data" ]]; then
subject=$(echo "$data" | awk -F'CN=' '/subject/{print $2}' | cut -d',' -f1)
issuer=$(echo "$data" | awk -F'CN=' '/issuer/{print $2}' | cut -d',' -f1)
enddate=$(echo "$data" | grep notAfter | cut -d= -f2)
days=$(( ($(date -d "$enddate" +%s) - $(date +%s)) / 86400 ))
jq -n --arg h "$hostport" --arg s "$subject" --arg i "$issuer" \
--argjson d "$days" \
'{host:$h, subject:$s, issuer:$i, days_left:$d}'
else
jq -n --arg h "$hostport" '{host:$h, error:"unreachable"}'
fi
done | jq -s '{
timestamp: (now | strftime("%Y-%m-%dT%H:%M:%SZ")),
certificates: .,
alerts: [.[] | select(.days_left < 30 and .days_left != null)]
}'
Log Analysis Pipeline
# Real-time error monitoring
tail -f /var/log/app.log | \
grep --line-buffered ERROR | \
awk '{print strftime("%H:%M:%S"), $0}' | \
tee -a errors.log | \
while read line; do
echo "$line"
# Could send to Slack/PagerDuty here
done
# Aggregated analysis
cat /var/log/app.log | \
grep ERROR | \
awk '{print $5}' | \ # Extract error code
sort | \
uniq -c | \
sort -rn | \
head -20 | \
awk 'BEGIN {print "Count\tError"} {print $1"\t"$2}'
Multi-Host Command Pipeline
# Parallel disk check across hosts
cat hosts.txt | \
xargs -P10 -I {} ssh {} 'hostname; df -h / | tail -1' 2>/dev/null | \
paste - - | \
awk '{print $1, $5}' | \
sort -k2 -t'%' -rn | \
while read host usage; do
pct=${usage%\%}
if [[ $pct -gt 80 ]]; then
echo "ALERT: $host at $usage"
else
echo "OK: $host at $usage"
fi
done
API Data Pipeline
# Fetch, transform, and load
curl -s https://api.example.com/users | \
jq -r '.[] | [.id, .email, .role] | @csv' | \
while IFS=, read -r id email role; do
# Remove quotes from CSV
id="${id//\"/}"
email="${email//\"/}"
role="${role//\"/}"
# Insert into database
echo "INSERT INTO users VALUES ($id, '$email', '$role');"
done | \
psql -d mydb
# Or bulk insert
curl -s https://api.example.com/users | \
jq -r '.[] | "(\(.id), \(.email | @sh), \(.role | @sh))"' | \
tr '\n' ',' | \
sed 's/,$//' | \
xargs -I {} echo "INSERT INTO users VALUES {};" | \
psql -d mydb
Inventory Generation
# Generate Ansible inventory from multiple sources
{
# ISE endpoints
netapi ise --format json get-endpoints | \
jq -r '.[] | select(.profile | test("Linux")) | "\(.ip) ansible_host=\(.ip) source=ise"'
# DNS records
dig axfr inside.domusdigitalis.dev @ns1 | \
awk '/IN\s+A\s+10\.50\./ {print $1, "ansible_host="$5, "source=dns"}'
# Static additions
cat static_hosts.txt
} | \
sort -u | \
awk '{
split($1, parts, ".")
group = parts[1]
hosts[group] = hosts[group] "\n" $0
}
END {
for (g in hosts) {
print "[" g "]"
print hosts[g]
}
}'
Error Handling in Pipelines
PIPEFAIL
# Default: pipeline succeeds if last command succeeds
false | true
echo $? # 0 (success - true succeeded)
# With pipefail: fails if ANY command fails
set -o pipefail
false | true
echo $? # 1 (failure - false failed)
# In scripts
#!/bin/bash
set -euo pipefail # Strict mode
curl -s https://api.example.com/data | jq '.items[]'
# Script exits if curl fails OR jq fails
Pattern Library
Data Transformation Patterns
# JSON to table
curl -s api/users | jq -r '["ID","NAME","EMAIL"], (.[] | [.id,.name,.email]) | @tsv' | column -t
# CSV to JSON
cat data.csv | awk -F, 'NR==1{split($0,h);next} {for(i=1;i<=NF;i++)printf "{\"%s\":\"%s\"}%s",h[i],$i,(i<NF?",":"\n")}' | jq -s '.'
# Transpose rows/columns
awk '{for(i=1;i<=NF;i++)a[i,NR]=$i}END{for(i=1;i<=NF;i++){for(j=1;j<=NR;j++)printf"%s ",a[i,j];print""}}' file
# Pivot data
awk '{data[$1][$2]=$3} END {for(r in data){printf r; for(c in data[r])printf " %s",data[r][c]; print""}}' file
Monitoring Patterns
# Watch for changes
watch -d 'kubectl get pods -o wide'
# Alert on condition
while true; do
usage=$(df / | awk 'NR==2{print int($5)}')
[[ $usage -gt 80 ]] && echo "DISK ALERT: $usage%"
sleep 60
done
# Tail multiple logs
tail -f /var/log/*.log | awk '/ERROR/{print strftime("%T"), $0; fflush()}'
Debugging Pipelines
Inspect Intermediate Results
# Use tee to see what's flowing
curl -s api/data | tee /dev/stderr | jq '.items[]'
# Or to a file
curl -s api/data | tee debug.json | jq '.items[]'
# At each stage
cmd1 | tee stage1.txt | cmd2 | tee stage2.txt | cmd3
Step-by-Step Building
# Build incrementally
curl -s api/data | head # Check raw response
curl -s api/data | jq 'type' # Check JSON type
curl -s api/data | jq 'keys' # Check structure
curl -s api/data | jq '.items[0]' # Sample one item
curl -s api/data | jq '.items[] | .name' # Extract field
curl -s api/data | jq -r '.items[] | .name' | sort # Full pipeline
Performance Tips
Minimize Process Spawning
# Bad: spawning grep for each line
while read line; do
echo "$line" | grep pattern
done < file
# Good: single grep
grep pattern file
# Bad: repeated curl calls
for id in $(cat ids.txt); do
curl -s "api/$id"
done
# Good: parallel with xargs
cat ids.txt | xargs -P10 -I {} curl -s "api/{}"
Quick Reference
Common Patterns
# Top N by count
sort | uniq -c | sort -rn | head -N
# Unique values
sort -u
# Count matching
grep -c pattern
# Extract column
awk '{print $N}'
# Filter JSON
jq '.[] | select(.field == "value")'
# Parallel execution
xargs -P4 -I {} cmd {}
# Compare outputs
diff <(cmd1) <(cmd2)
# Multiple outputs
cmd | tee file | next_cmd
# Safe file handling
find -print0 | xargs -0
# Error handling
set -o pipefail
Related
- jq Mastery — JSON processing
- xargs — Parallel execution
- awk — Text processing