Pipes & Process Substitution

Pipelines, process substitution, and command composition.

Pipeline Fundamentals

# Classic pipeline: filter → transform → aggregate
grep "ERROR" /var/log/syslog | awk '{print $5}' | sort | uniq -c | sort -rn

# Each stage runs in parallel, connected by pipes
# stdout of left → stdin of right

# Pipeline exit status is LAST command (unless pipefail set)
false | true
echo $?                                      # 0 (from 'true')

# Enable pipefail for first-failure exit
# (strictly: the status of the rightmost command that failed)
set -o pipefail
false | true
echo $?                                      # 1 (from 'false')

# Check individual exit codes with PIPESTATUS
# NOTE: PIPESTATUS (bash-only array) is overwritten by EVERY command,
# so read it on the very next line after the pipeline
cat /etc/passwd | grep root | wc -l
echo "${PIPESTATUS[@]}"                      # "0 0 0" or "0 1 0" if grep fails

Tee: Split Output Streams

# Write to file AND pass through
command | tee output.log | next_command

# Append instead of overwrite
command | tee -a output.log | next_command

# Multiple outputs
command | tee file1.log file2.log | next_command

# Log and display
# 2>&1 merges stderr into the pipe so the log captures both streams
./long_running_job.sh 2>&1 | tee job.log

# Tee to stderr (for visibility while piping)
command | tee /dev/stderr | next_command

# Tee with process substitution (fan-out)
# NOTE: the >() consumers run asynchronously and may still be working
# after the pipeline itself returns; completion order is unspecified
generate_data | tee >(process_a) >(process_b) >(process_c) > combined.out

# Infrastructure: Log all ISE sessions while processing
netapi ise mnt sessions --format json | \
    tee /var/log/ise-sessions-$(date +%Y%m%d).json | \
    jq -r '.[] | select(.authentication_status == "FAILED")'

# Sudo tee for writing to protected files
# (sudo applies to tee, which opens the file with root privileges; a
# plain '> /etc/myapp.conf' would be opened by the unprivileged shell)
echo "new config" | sudo tee /etc/myapp.conf > /dev/null

Process Substitution

# <() - treat command output as a file
# (bash/ksh/zsh feature, not POSIX sh; appears to the command as
# /dev/fd/N or a temporary FIFO)
diff <(ssh vault-01 cat /etc/hosts) <(ssh vault-02 cat /etc/hosts)

# Compare sorted versions
diff <(sort file1.txt) <(sort file2.txt)

# Compare before/after config
diff <(kubectl get configmap myconfig -o yaml) <(cat new-config.yaml)

# >() - treat file as command input (fan-out)
command | tee >(gzip > output.gz) >(sha256sum > output.sha256)

# Multiple consumers of same data
generate_report | tee >(mail -s "Report" admin@example.com) >(slack-post "#reports")

# Feed to while loop without subshell
# (the loop body runs in the current shell, so 'count' survives)
while read -r line; do
    ((count++))
done < <(cat /etc/passwd)
echo "Count: $count"                         # Works! No subshell.

# Compare k8s resources across namespaces
diff <(kubectl get pods -n prod -o name | sort) \
     <(kubectl get pods -n staging -o name | sort)

# Verify file transfer integrity
diff <(ssh remote "sha256sum /path/to/file" | cut -d' ' -f1) \
     <(sha256sum local/file | cut -d' ' -f1)

Named Pipes (FIFOs)

# Create named pipe
mkfifo /tmp/mypipe

# Producer (in background or separate terminal)
# (opening a FIFO blocks until the other end is opened too, hence '&')
cat /var/log/syslog > /tmp/mypipe &

# Consumer
grep "ERROR" < /tmp/mypipe

# Cleanup
rm /tmp/mypipe

# Use case: Long-running producer, multiple consumers
# NOTE: several readers on ONE fifo do NOT each see the whole stream --
# every line is delivered to exactly one reader, so two greps on the
# same fifo would split the data arbitrarily between them.  Give each
# consumer its own fifo and duplicate the stream with tee.
mkfifo /tmp/alert-pipe /tmp/metric-pipe

# Register cleanup BEFORE wiring anything up, so an early failure
# still removes the fifos
trap "rm -f /tmp/alert-pipe /tmp/metric-pipe" EXIT

# Producer: tee fans the stream out to both fifos
tail -f /var/log/messages | tee /tmp/alert-pipe > /tmp/metric-pipe &

# Consumer 1: Errors to Slack
grep --line-buffered "ERROR" < /tmp/alert-pipe | while read -r line; do
    slack-post "#alerts" "$line"
done &

# Consumer 2: Metrics to monitoring
grep --line-buffered "metric" < /tmp/metric-pipe | while read -r line; do
    send_to_prometheus "$line"
done &

# Bidirectional with two pipes
mkfifo /tmp/request /tmp/response
# Server
# (the redirection in the 'while' condition reopens the fifo on every
# iteration; read blocks until a client writes)
while read -r req < /tmp/request; do
    echo "Processed: $req" > /tmp/response
done &
# Client
# NOTE: the writes block until the peer opens the other end -- if the
# server dies, the client hangs here
echo "my request" > /tmp/request
read -r resp < /tmp/response
echo "Got: $resp"

Subshell Awareness (CRITICAL)

# Pipes create subshells - variable changes are LOST

# WRONG: count stays 0
count=0
cat /etc/passwd | while read -r line; do
    ((count++))
done
echo "Count: $count"                         # 0 - count modified in subshell!

# CORRECT: Use process substitution
# NOTE: ((count++)) returns status 1 when count was 0 (post-increment
# evaluates to 0) - this matters under 'set -e'
count=0
while read -r line; do
    ((count++))
done < <(cat /etc/passwd)
echo "Count: $count"                         # Correct!

# CORRECT: Use here-string for simple cases
count=0
while read -r line; do
    ((count++))
done <<< "$(cat /etc/passwd)"
echo "Count: $count"                         # Correct!

# CORRECT: Redirect from file
count=0
while read -r line; do
    ((count++))
done < /etc/passwd
echo "Count: $count"                         # Correct!

# CORRECT: lastpipe (bash 4.2+, not in subshell)
# NOTE: lastpipe only takes effect when job control is off, i.e. in
# scripts / non-interactive shells
shopt -s lastpipe
count=0
cat /etc/passwd | while read -r line; do
    ((count++))
done
echo "Count: $count"                         # Now correct with pipe!

# Array building has same issue
hosts=()
cat hosts.txt | while read -r h; do
    hosts+=("$h")                            # Modifies subshell's copy!
done
echo "${#hosts[@]}"                          # 0!

# CORRECT
mapfile -t hosts < hosts.txt                 # a.k.a. readarray (bash 4+)
echo "${#hosts[@]}"                          # Correct!

Complex Pipeline Patterns

# Multi-stage data transformation
# NOTE: a comment is NOT allowed after a line-continuation backslash --
# 'cmd | \  # comment' makes the backslash escape a space, producing a
# bogus command and splitting the pipeline.  Ending the line with '|'
# instead lets bash continue to the next line AND permits a comment.
awk '{print $1}' access.log |                # Extract IPs
    sort |                                   # Sort for uniq
    uniq -c |                                # Count occurrences
    sort -rn |                               # Sort by count
    head -10 |                               # Top 10
    awk '{printf "%-8s %s\n", $1, $2}'       # Format output

# Conditional pipeline stages
# NOTE: ${FILTER:+grep "$FILTER" |} does NOT work -- a '|' produced by
# parameter expansion is a literal word, never a pipe operator.  Use an
# explicit pass-through stage instead:
grep -v "^#" data.txt |                      # Remove comments
    { if [[ -n "${FILTER:-}" ]]; then grep -- "$FILTER"; else cat; fi; } |
    sort | uniq

# Parallel processing in pipeline
parallel --pipe -N1000 "process_chunk.sh" < large_file.txt |
    aggregate_results.sh

# Error handling in pipelines
set -o pipefail
# Without pipefail the 'if' would only test wc (which always succeeds);
# with it, a failure anywhere in the pipeline is caught
if ! cat /etc/passwd | grep "root" | wc -l; then
    echo "Pipeline failed"
    exit 1
fi

# Capture intermediate results
# NOTE: the >() writers are asynchronous; the count files may not be
# fully written the instant the pipeline returns
cat data.txt | \
    tee >(wc -l > /tmp/line_count) | \
    grep "pattern" | \
    tee >(wc -l > /tmp/match_count) | \
    process_matches.sh

# Infrastructure: ISE failed auth analysis
netapi ise mnt sessions --format json | \
    jq -r '.[] | select(.authentication_status == "FAILED") | [.calling_station_id, .user_name, .failure_reason] | @tsv' | \
    sort | \
    uniq -c | \
    sort -rn | \
    head -20 | \
    awk 'BEGIN {printf "%-6s %-20s %-20s %s\n", "COUNT", "MAC", "USER", "REASON"}
         {printf "%-6s %-20s %-20s %s\n", $1, $2, $3, $4}'

Inline Processing Patterns

# xargs - convert stdin to arguments
find /var/log -name "*.log" -mtime +30 | xargs rm -f

# xargs with -I for placeholder
cat hosts.txt | xargs -I{} ssh {} "uptime"

# xargs parallel execution
cat hosts.txt | xargs -P5 -I{} ssh {} "hostname && df -h /"

# xargs with null delimiter (handles spaces)
find . -name "*.txt" -print0 | xargs -0 grep "pattern"

# while read vs xargs
# while read: Better for complex per-line logic
# NOTE: ssh reads stdin by default and would swallow the remaining
# hosts from the loop's input, so the loop would run only once.
# Use ssh -n (or 'ssh ... < /dev/null') inside read loops.
cat hosts.txt | while read -r host; do
    echo "=== $host ==="
    ssh -n "$host" "uptime"
    ssh -n "$host" "df -h /"
done

# xargs: Better for simple commands, parallel
# (GNU xargs runs children with stdin from /dev/null, so ssh does not
# eat the host list here -- verify on non-GNU implementations)
cat hosts.txt | xargs -P10 -I{} ssh {} "uptime"

# Command substitution in pipeline
for host in $(cat hosts.txt); do             # WRONG: Word splitting!
    ssh "$host" "uptime"
done

# CORRECT
while read -r host; do
    ssh -n "$host" "uptime"                  # -n: don't let ssh eat hosts.txt
done < hosts.txt

Infrastructure Pipeline Examples

# Certificate expiry report
# paste - - joins each cert's two output lines (enddate, subject) into
# one tab-separated record
# NOTE(review): 'sort -M' orders by month NAME only -- certificates
# expiring in different years will interleave; confirm this is intended
find /etc/ssl/certs -name "*.pem" -type f | \
    xargs -I{} openssl x509 -in {} -noout -enddate -subject 2>/dev/null | \
    paste - - | \
    awk -F'[=,]' '{
        gsub(/^ +| +$/, "", $2)
        gsub(/^ +| +$/, "", $4)
        print $2, $4
    }' | \
    sort -t' ' -k1 -M | \
    column -t

# k8s resource usage
# sort fix: with -t' ' every single space is a delimiter, so the padded
# columns produced by printf become runs of EMPTY fields and -k3 no
# longer points at the CPU column.  Default sort behavior (runs of
# blanks separate fields) handles padded output correctly; -k3,3 limits
# the key to exactly that column.
kubectl top pods -A --no-headers | \
    awk '{
        ns=$1; pod=$2; cpu=$3; mem=$4
        gsub(/m$/, "", cpu)
        gsub(/Mi$/, "", mem)
        printf "%-20s %-40s %8s %8s\n", ns, pod, cpu"m", mem"Mi"
    }' | \
    sort -k3,3 -rn | \
    head -20

# Log analysis with timestamps
# NOTE(review): $11 assumes a fixed token layout for sshd "Failed
# password" lines -- confirm against the actual journal output format
journalctl -u sshd --since "1 hour ago" --no-pager | \
    grep "Failed" | \
    awk '{print $1, $2, $3, $11}' | \
    sort | \
    uniq -c | \
    sort -rn

# Vault audit log analysis
# sudo applies only to cat (the audit log is typically root-readable);
# the rest of the pipeline runs unprivileged
sudo cat /var/log/vault/audit.log | \
    jq -r 'select(.type == "response") | [.time, .request.operation, .request.path, .response.data.error // "OK"] | @tsv' | \
    column -t -s $'\t' | \
    tail -50

# ISE session duration analysis
# sort fix: with -t' ' each single space is a delimiter, so printf's
# space padding creates empty fields and -k3 misses the minutes column.
# Default field splitting (runs of blanks) plus -k3,3 sorts on the
# actual duration value.
# assumes .session_time is in seconds -- confirm against the MnT API
netapi ise mnt sessions --format json | \
    jq -r '.[] | select(.session_state == "AUTHENTICATED") | [.user_name, .calling_station_id, .session_time] | @tsv' | \
    awk -F'\t' '{
        mins = $3 / 60
        printf "%-20s %-20s %8.1f mins\n", $1, $2, mins
    }' | \
    sort -k3,3 -rn | \
    head -10

# Multi-host command with parallel + error handling
# Pass the host as a positional argument ($1) instead of splicing {}
# into the sh -c script: a hostname containing shell metacharacters
# would otherwise be executed as code (command injection).
cat hosts.txt | \
    xargs -P10 -I{} sh -c 'ssh -o ConnectTimeout=5 -- "$1" "uptime" 2>/dev/null || echo "$1: UNREACHABLE"' _ {} | \
    sort

Pipeline Debugging

# Inspect intermediate stages with tee
command | tee /dev/stderr | next_command

# Number each stage for debugging
# Stage labels go to stderr (>&2): visible on the terminal, but they do
# NOT flow down the pipe and corrupt the data the next stage reads.
# (A bare 'echo' inside the group would be filtered/transformed as if
# it were data.)
echo "Stage 0: Raw data" >&2
cat data.txt | \
    { echo "Stage 1: Filter" >&2; grep "pattern"; } | \
    { echo "Stage 2: Transform" >&2; awk '{print $2}'; } | \
    { echo "Stage 3: Sort" >&2; sort | uniq -c; }

# Verbose mode with set -x
set -x
cat file | grep pattern | wc -l
set +x

# Pipeline with error checking
# NOTE: piping a '{ grep -q; }' group into wc -l is broken: grep -q
# prints nothing (wc would always report 0) and an 'exit 1' inside a
# pipeline stage only exits that stage's subshell.  Test first, then
# count:
if ! grep -q "pattern" file.txt 2>/dev/null; then
    echo "No matches found" >&2
    exit 1
fi
grep -c "pattern" file.txt

# Trace with BASH_COMMAND
# (the DEBUG trap fires before every simple command)
trap 'echo "Executing: $BASH_COMMAND"' DEBUG
cat file | grep pattern | wc -l
trap - DEBUG

# Time each stage
time (cat large_file | sort | uniq | wc -l)

# More granular timing
# NOTE: %N (nanoseconds) is GNU date; bc must be installed.  On bash 5+
# $EPOCHREALTIME gives similar precision without forking date.
start=$(date +%s.%N)
cat large_file > /tmp/stage1
echo "Stage 1: $(echo "$(date +%s.%N) - $start" | bc)s"

start=$(date +%s.%N)
sort /tmp/stage1 > /tmp/stage2
echo "Stage 2: $(echo "$(date +%s.%N) - $start" | bc)s"

Pipeline Gotchas

# WRONG: Variable assignment in pipeline loses value
total=0
cat numbers.txt | while read -r n; do
    ((total += n))
done
echo "Total: $total"                         # 0!

# CORRECT: Process substitution
total=0
while read -r n; do
    ((total += n))
done < <(cat numbers.txt)
echo "Total: $total"                         # Correct!

# WRONG: Exit in subshell doesn't exit script
cat hosts.txt | while read -r host; do
    ssh "$host" "test -f /critical/file" || exit 1  # Only exits subshell!
done
echo "This still runs!"                      # Oops!

# CORRECT: Track failure
# NOTE(review): ssh will also read hosts.txt via inherited stdin here,
# consuming the remaining hosts -- add 'ssh -n' in real use
failed=false
while read -r host; do
    if ! ssh "$host" "test -f /critical/file"; then
        failed=true
        break
    fi
done < hosts.txt
# $failed expands to the command 'true' or 'false', so this exits
# only when a failure was recorded
$failed && exit 1

# WRONG: Pipeline hides command errors
bad_command | good_command                   # Exit 0 if good_command succeeds!

# CORRECT: Use pipefail
set -o pipefail
bad_command | good_command                   # Now fails!

# WRONG: Assuming pipeline is sequential
echo "start" | sleep 5 | echo "end"          # "end" prints immediately!
# Pipeline stages run in parallel!

# WRONG: Large intermediate data
# (sort must consume its ENTIRE input before it can emit one line;
# GNU sort spills to temp files rather than RAM, but it still does
# O(n log n) work over the whole file)
cat huge_file | sort | head -10              # sort processes EVERYTHING!

# CORRECT: Limit early
# (only equivalent if the lines you care about are in the first 10000)
head -10000 huge_file | sort | head -10      # Much better

# WRONG: Buffering delays output
tail -f logfile | grep "pattern"             # grep buffers, delays output

# CORRECT: Line-buffered
tail -f logfile | grep --line-buffered "pattern"
# or
tail -f logfile | stdbuf -oL grep "pattern"