Shell Pipelines: Composing Tools

The power isn’t in any single tool. It’s in how they flow together.


Core Concepts

The Pipeline Model

┌─────────────────────────────────────────────────────────────────┐
│                    UNIX PIPELINE PHILOSOPHY                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   "Write programs that do one thing and do it well.             │
│    Write programs to work together.                              │
│    Write programs to handle text streams."                       │
│                                                                  │
│                                      — Doug McIlroy              │
│                                                                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   PRODUCER  ──►  FILTER  ──►  FILTER  ──►  CONSUMER             │
│                                                                  │
│   curl         jq          sort         xargs                   │
│   cat          awk         uniq         tee                     │
│   find         sed         head         > file                  │
│   kubectl      grep        tail         variable                │
│                                                                  │
│   Each tool:                                                     │
│   • Reads from stdin (or files)                                  │
│   • Writes to stdout                                             │
│   • Errors go to stderr                                          │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘
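The flow above is easy to see end to end in a toy pipeline (the input data here is invented for illustration):

```shell
# producer -> filter -> filter -> filter -> consumer, each talking over stdin/stdout
printf 'web\ndb\nweb\ncache\nweb\n' |  # producer: emit raw records
  sort |        # filter: group identical lines together
  uniq -c |     # filter: collapse each group into a count
  sort -rn |    # filter: highest count first
  head -1       # consumer: keep only the winner ("3 web")
```

Swap the `printf` for `cat`, `curl`, or `kubectl` and the rest of the pipeline is unchanged; that interchangeability is the point.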

Pipeline Operators

Operator  Purpose                        Example
--------  -----------------------------  ---------------------
|         Pipe stdout to next command    cmd1 | cmd2
|&        Pipe stdout AND stderr         cmd1 |& cmd2
>         Redirect stdout to file        cmd > file
>>        Append stdout to file          cmd >> file
2>        Redirect stderr to file        cmd 2> errors.log
2>&1      Redirect stderr to stdout      cmd 2>&1 | grep
&>        Redirect both to file          cmd &> all.log
<         Read from file                 cmd < file
<<        Here document                  cmd << 'EOF'
<<<       Here string                    cmd <<< "string"
<()       Process substitution (read)    diff <(cmd1) <(cmd2)
>()       Process substitution (write)   cmd | tee >(cmd2)
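A few of these operators in self-contained form (the sample strings are arbitrary):

```shell
# Here string: feed a value straight into stdin, no echo needed
tr 'a-z' 'A-Z' <<< "hello"        # HELLO

# Process substitution: diff two command outputs without temp files
diff <(printf 'a\nb\n') <(printf 'a\nB\n') || true

# Fold stderr into stdout so the pipeline can filter both streams
{ echo "ok"; echo "boom" >&2; } 2>&1 | grep -c "o"   # counts both lines: 2
```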


Basic Compositions

The Classic Trio: grep | sort | uniq

# Count unique values
grep "pattern" file.log | sort | uniq -c | sort -rn

# Top 10 most common
grep ERROR app.log | awk '{print $5}' | sort | uniq -c | sort -rn | head -10

# Unique IPs from log
awk '{print $1}' access.log | sort -u

curl | jq Pipeline

# API to filtered JSON
curl -s https://api.example.com/users | jq '.[] | select(.active) | .email'

# API to CSV
curl -s https://api.example.com/users | jq -r '.[] | [.id, .name, .email] | @csv'

# Multiple API calls combined
{
  curl -s https://api.example.com/users
  curl -s https://api.example.com/admins
} | jq -s 'add | unique_by(.id)'

find | xargs Pipeline

# Process files in parallel
find /data -name "*.log" -print0 | xargs -0 -P4 gzip

# Search in files
find . -name "*.py" -print0 | xargs -0 grep -l "import requests"

# Complex processing (pass the file as $1 so odd filenames can't break the shell)
find . -name "*.json" -print0 | xargs -0 -I {} sh -c 'jq ".name" "$1" 2>/dev/null' _ {}

Process Substitution

Compare Two Outputs

# Compare two commands
diff <(ssh host1 'cat /etc/passwd') <(ssh host2 'cat /etc/passwd')

# Compare sorted versions
diff <(sort file1) <(sort file2)

# Compare API responses
diff <(curl -s https://api1/data | jq -S .) <(curl -s https://api2/data | jq -S .)

Multiple Inputs

# Join data from two sources
join <(sort users.txt) <(sort permissions.txt)

# Combine with paste
paste <(cut -f1 file1) <(cut -f2 file2)

# Process multiple streams
cat <(grep ERROR log1) <(grep ERROR log2) <(grep ERROR log3) | sort

Write to Multiple Destinations

# tee to file and continue pipeline
curl -s https://api.example.com/data | tee response.json | jq '.items[]'

# tee to process substitution
curl -s https://api.example.com/data | tee >(jq '.count' > count.txt) | jq '.items[]'

# Multiple outputs
cat data.txt | tee >(grep ERROR > errors.txt) >(grep WARN > warnings.txt) > /dev/null

Real Infrastructure Pipelines

Certificate Audit Pipeline

#!/bin/bash
# Full certificate audit with JSON output

# The heredoc body starts on the very next line; cat pipes it into the loop
cat << 'EOF' | while read hostport; do
ise-01.inside.domusdigitalis.dev:443
ise-02.inside.domusdigitalis.dev:443
vault.inside.domusdigitalis.dev:8200
pfsense.inside.domusdigitalis.dev:443
EOF
  host="${hostport%%:*}"

  # Fetch cert data
  data=$(echo | timeout 5 openssl s_client -connect "$hostport" \
    -servername "$host" 2>/dev/null | \
    openssl x509 -noout -subject -issuer -enddate 2>/dev/null)

  if [[ -n "$data" ]]; then
    subject=$(echo "$data" | awk -F'CN=' '/subject/{print $2}' | cut -d',' -f1)
    issuer=$(echo "$data" | awk -F'CN=' '/issuer/{print $2}' | cut -d',' -f1)
    enddate=$(echo "$data" | grep notAfter | cut -d= -f2)
    days=$(( ($(date -d "$enddate" +%s) - $(date +%s)) / 86400 ))

    jq -n --arg h "$hostport" --arg s "$subject" --arg i "$issuer" \
          --argjson d "$days" \
      '{host:$h, subject:$s, issuer:$i, days_left:$d}'
  else
    jq -n --arg h "$hostport" '{host:$h, error:"unreachable"}'
  fi
done | jq -s '{
  timestamp: (now | strftime("%Y-%m-%dT%H:%M:%SZ")),
  certificates: .,
  alerts: [.[] | select(.days_left != null and .days_left < 30)]
}'

Log Analysis Pipeline

# Real-time error monitoring
tail -f /var/log/app.log | \
  grep --line-buffered ERROR | \
  awk '{print strftime("%H:%M:%S"), $0; fflush()}' | \
  tee -a errors.log | \
  while read line; do
    echo "$line"
    # Could send to Slack/PagerDuty here
  done

# Aggregated analysis (field 5 is assumed to hold the error code;
# note: a comment cannot follow a trailing backslash inside a pipeline)
cat /var/log/app.log | \
  grep ERROR | \
  awk '{print $5}' | \
  sort | \
  uniq -c | \
  sort -rn | \
  head -20 | \
  awk 'BEGIN {print "Count\tError"} {print $1"\t"$2}'

Multi-Host Command Pipeline

# Parallel disk check across hosts (one line per host so -P output can't interleave)
cat hosts.txt | \
  xargs -P10 -I {} ssh {} 'echo "$(hostname) $(df -h / | tail -1)"' 2>/dev/null | \
  awk '{print $1, $6}' | \
  sort -k2 -rn | \
  while read host usage; do
    pct=${usage%\%}
    if [[ $pct -gt 80 ]]; then
      echo "ALERT: $host at $usage"
    else
      echo "OK: $host at $usage"
    fi
  done

API Data Pipeline

# Fetch, transform, and load
curl -s https://api.example.com/users | \
  jq -r '.[] | [.id, .email, .role] | @csv' | \
  while IFS=, read -r id email role; do
    # Remove quotes from CSV
    id="${id//\"/}"
    email="${email//\"/}"
    role="${role//\"/}"

    # Insert into database
    echo "INSERT INTO users VALUES ($id, '$email', '$role');"
  done | \
  psql -d mydb

# Or bulk insert (paste/sed instead of xargs, which mangles the quotes @sh emits)
curl -s https://api.example.com/users | \
  jq -r '.[] | "(\(.id), \(.email | @sh), \(.role | @sh))"' | \
  paste -sd, - | \
  sed 's/^/INSERT INTO users VALUES /; s/$/;/' | \
  psql -d mydb

Inventory Generation

# Generate Ansible inventory from multiple sources
{
  # ISE endpoints
  netapi ise --format json get-endpoints | \
    jq -r '.[] | select(.profile | test("Linux")) | "\(.ip) ansible_host=\(.ip) source=ise"'

  # DNS records
  dig axfr inside.domusdigitalis.dev @ns1 | \
    awk '$3=="IN" && $4=="A" && $5 ~ /^10\.50\./ {print $1, "ansible_host="$5, "source=dns"}'

  # Static additions
  cat static_hosts.txt
} | \
  sort -u | \
  awk '{
    split($1, parts, ".")
    group = parts[1]
    if (group in hosts)
      hosts[group] = hosts[group] "\n" $0
    else
      hosts[group] = $0
  }
  END {
    for (g in hosts) {
      print "[" g "]"
      print hosts[g]
    }
  }'

Error Handling in Pipelines

PIPEFAIL

# Default: pipeline succeeds if last command succeeds
false | true
echo $?  # 0 (success - true succeeded)

# With pipefail: fails if ANY command fails
set -o pipefail
false | true
echo $?  # 1 (failure - false failed)

# In scripts
#!/bin/bash
set -euo pipefail  # Strict mode

curl -s https://api.example.com/data | jq '.items[]'
# Script exits if curl fails OR jq fails
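
The difference is easy to observe in throwaway subshells, so pipefail never leaks into your interactive session:

```shell
# Each case runs in its own bash -c; the option dies with the subshell
bash -c 'false | true; echo "default:  $?"'                  # default:  0
bash -c 'set -o pipefail; false | true; echo "pipefail: $?"' # pipefail: 1
```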

Check Exit Status

# Capture exit status of pipe components
curl -s https://api.example.com/data | jq '.' > output.json
PIPE_STATUS=("${PIPESTATUS[@]}")

if [[ ${PIPE_STATUS[0]} -ne 0 ]]; then
  echo "curl failed"
elif [[ ${PIPE_STATUS[1]} -ne 0 ]]; then
  echo "jq failed"
else
  echo "Success"
fi
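
A deterministic stand-in for the pipeline above (`false` plays the failing first stage):

```shell
false | true | true
status=("${PIPESTATUS[@]}")   # copy immediately; the next command resets PIPESTATUS
echo "${status[@]}"           # 1 0 0 -> first stage failed, the rest succeeded
```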

Stderr Handling

# Separate stdout and stderr
cmd 2>errors.log | next_cmd

# Combine then filter
cmd 2>&1 | grep -v "expected warning"

# Log errors while continuing
cmd 2> >(tee errors.log >&2) | next_cmd

# Discard stderr
cmd 2>/dev/null | next_cmd
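
A self-contained sketch of these redirections; `noisy` is a hypothetical stand-in for any command that writes to both streams:

```shell
# Hypothetical command: one line to stdout, one to stderr
noisy() {
  echo "data"
  echo "warning: ignore me" >&2
}

noisy 2>/dev/null | tr 'a-z' 'A-Z'       # stderr discarded, stdout transformed: DATA
noisy 2>&1 >/dev/null | grep -c warning  # stdout discarded, only stderr reaches grep
```

Note the order in the second line: `2>&1` first (stderr now points at the pipe), then `>/dev/null` (stdout redirected away). Reversed, both streams would vanish.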

Pattern Library

Data Transformation Patterns

# JSON to table
curl -s api/users | jq -r '["ID","NAME","EMAIL"], (.[] | [.id,.name,.email]) | @tsv' | column -t

# CSV to JSON (emit one object per row, then slurp into an array;
# comma-joined objects on one line are not valid input for jq -s)
awk -F, 'NR==1{split($0,h,",");next} {printf "{"; for(i=1;i<=NF;i++) printf "\"%s\":\"%s\"%s", h[i], $i, (i<NF?",":""); print "}"}' data.csv | jq -s '.'

# Transpose rows/columns
awk '{for(i=1;i<=NF;i++)a[i,NR]=$i}END{for(i=1;i<=NF;i++){for(j=1;j<=NR;j++)printf"%s ",a[i,j];print""}}' file

# Pivot data (GNU awk only: arrays of arrays)
awk '{data[$1][$2]=$3} END {for(r in data){printf "%s", r; for(c in data[r])printf " %s",data[r][c]; print""}}' file

Monitoring Patterns

# Watch for changes
watch -d 'kubectl get pods -o wide'

# Alert on condition
while true; do
  usage=$(df / | awk 'NR==2{print int($5)}')
  [[ $usage -gt 80 ]] && echo "DISK ALERT: $usage%"
  sleep 60
done

# Tail multiple logs
tail -f /var/log/*.log | awk '/ERROR/{print strftime("%T"), $0; fflush()}'

Parallel Patterns

# Fan-out, fan-in
cat urls.txt | \
  xargs -P10 -I {} sh -c 'curl -s "$1" | jq .count' _ {} | \
  awk '{sum+=$1} END {print "Total:", sum}'

# Map-reduce style
find /data -name "*.log" -print0 | \
  xargs -0 -P"$(nproc)" grep -ch ERROR | \
  awk '{sum+=$1} END {print sum}'

Debugging Pipelines

Inspect Intermediate Results

# Use tee to see what's flowing
curl -s api/data | tee /dev/stderr | jq '.items[]'

# Or to a file
curl -s api/data | tee debug.json | jq '.items[]'

# At each stage
cmd1 | tee stage1.txt | cmd2 | tee stage2.txt | cmd3

Step-by-Step Building

# Build incrementally
curl -s api/data | head  # Check raw response
curl -s api/data | jq 'type'  # Check JSON type
curl -s api/data | jq 'keys'  # Check structure
curl -s api/data | jq '.items[0]'  # Sample one item
curl -s api/data | jq '.items[] | .name'  # Extract field
curl -s api/data | jq -r '.items[] | .name' | sort  # Full pipeline

Verbose Mode

# Add verbose output at each stage
curl -sv api/data 2>&1 | head -20  # See curl details

# Echo commands
set -x
cat file | grep pattern | awk '{print $1}'
set +x

Performance Tips

Minimize Process Spawning

# Bad: spawning grep for each line
while read line; do
  echo "$line" | grep pattern
done < file

# Good: single grep
grep pattern file

# Bad: repeated curl calls
for id in $(cat ids.txt); do
  curl -s "api/$id"
done

# Good: parallel with xargs
cat ids.txt | xargs -P10 -I {} curl -s "api/{}"

Use Built-in Features

# Instead of: cat file | grep
grep pattern file

# Instead of: echo "$var" | cmd
cmd <<< "$var"

# Instead of: cat file | head
head file

# Instead of external tools for simple tasks
# Bad: echo $x | awk '{print $1}'
# Good: ${x%% *}

Buffer Management

# Line buffering for real-time output
grep --line-buffered pattern | while read line; do
  process "$line"
done

# stdbuf for commands without buffering options
stdbuf -oL cmd1 | cmd2

# Unbuffer (from expect package)
unbuffer cmd1 | cmd2

Quick Reference

Common Patterns

# Top N by count
sort | uniq -c | sort -rn | head -N

# Unique values
sort -u

# Count matching
grep -c pattern

# Extract column
awk '{print $N}'

# Filter JSON
jq '.[] | select(.field == "value")'

# Parallel execution
xargs -P4 -I {} cmd {}

# Compare outputs
diff <(cmd1) <(cmd2)

# Multiple outputs
cmd | tee file | next_cmd

# Safe file handling
find -print0 | xargs -0

# Error handling
set -o pipefail

Pipeline Template

#!/bin/bash
set -euo pipefail

# Source → Transform → Filter → Aggregate → Output

source_data | \
  transform_step | \
  filter_step | \
  aggregate_step | \
  format_output
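
Filled in with invented data, the template might look like this (every function and value below is hypothetical):

```shell
#!/bin/bash
set -euo pipefail

# Made-up dataset: user:shell pairs; each stage is a small named function
source_data()    { printf 'alice:/bin/bash\nbob:/bin/zsh\ncarol:/bin/bash\n'; }
transform_step() { cut -d: -f2; }                  # keep only the shell column
filter_step()    { grep -v nologin || true; }      # drop service accounts (none here)
aggregate_step() { sort | uniq -c | sort -rn; }    # count shells, most common first
format_output()  { awk '{print $2 ": " $1}'; }

source_data | transform_step | filter_step | aggregate_step | format_output
# /bin/bash: 2
# /bin/zsh: 1
```

Because each stage is a function, any one of them can be replaced (or tested) in isolation without touching the rest of the pipeline.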