Stream Processing Mastery

Overview

Unix philosophy: small tools that do one thing well, connected by pipes. Master stream processing to chain commands efficiently.

The Pipe Operator |

Basic Piping

# Output of left becomes input of right
command1 | command2 | command3

# Example: Find large files, sort, show top 10
du -ah /home | sort -rh | head -10

# Chain multiple filters
cat access.log | grep "ERROR" | cut -d' ' -f1 | sort | uniq -c | sort -rn

Pipeline Exit Codes

# By default, pipeline exit code is the LAST command
false | true
echo $?  # 0 (from true)

# Enable pipefail to catch failures anywhere in pipeline
set -o pipefail
false | true
echo $?  # 1 (from false)

# Check specific command in pipeline
cat file 2>/dev/null | grep pattern | wc -l
echo "${PIPESTATUS[@]}"  # Shows exit code of each command
# Example output: 0 1 0 (grep found nothing)

tee - Split Output Streams

Basic tee

# Send output to BOTH file AND stdout
command | tee output.txt

# Append instead of overwrite
command | tee -a output.txt

# Multiple files
command | tee file1.txt file2.txt file3.txt

# Suppress stdout (only write to file)
command | tee output.txt > /dev/null

Logging While Viewing

# Watch and log simultaneously
tail -f /var/log/syslog | tee debug-session.log

# Build with visible output AND log file
make 2>&1 | tee build.log

# Run tests, see output, save log
pytest -v 2>&1 | tee test-results.log

tee with sudo

# WRONG: the redirection is performed by YOUR shell (not by root), so it
# fails with "Permission denied" before sudo even runs
sudo echo "config line" > /etc/config

# CORRECT: tee runs as root
echo "nameserver 8.8.8.8" | sudo tee /etc/resolv.conf

# Append to root-owned file
echo "new entry" | sudo tee -a /etc/hosts

# Multiple lines
cat <<EOF | sudo tee /etc/myapp.conf
setting1 = value1
setting2 = value2
EOF

# Write to file without terminal output
echo "secret" | sudo tee /etc/secret.conf > /dev/null

Intermediate tee (Debug Pipelines)

# Debug: see what's passing through each stage
cat data.csv | \
    tee /tmp/stage1.txt | \
    grep -v "^#" | \
    tee /tmp/stage2.txt | \
    cut -d',' -f2,4 | \
    tee /tmp/stage3.txt | \
    sort -u

# After running, inspect each stage:
# cat /tmp/stage1.txt  # Raw input
# cat /tmp/stage2.txt  # After removing comments
# cat /tmp/stage3.txt  # After extracting columns

Process Substitution

Basic Syntax

# <(command) - Command output as a file
# >(command) - File input to command

# Compare two command outputs
diff <(ls /dir1) <(ls /dir2)

# Compare sorted outputs
diff <(sort file1) <(sort file2)

# Compare remote and local
diff <(ssh server cat /etc/config) /etc/config

Practical Examples

# Join data from two commands
paste <(cut -d: -f1 /etc/passwd) <(cut -d: -f3 /etc/passwd)

# Compare two git branches
diff <(git show main:config.yaml) <(git show dev:config.yaml)

# Verify download integrity (compare the hash fields only — the filename
# columns of the two outputs would otherwise always differ)
diff <(curl -s https://example.com/file.sha256 | awk '{print $1}') \
     <(sha256sum downloaded.file | awk '{print $1}')

# Compare formatted JSON
diff <(curl -s url1 | jq .) <(curl -s url2 | jq .)

# Feed multiple files to command expecting files
paste <(awk '{print $1}' file1) <(awk '{print $2}' file2) <(awk '{print $3}' file3)

Write to Multiple Destinations

# Write to file AND compress simultaneously
command | tee >(gzip > output.gz) > output.txt

# Send to both stdout and a processing pipeline
tail -f /var/log/syslog | tee >(grep ERROR > errors.log) >(grep WARN > warns.log)

# Log to file while also sending to remote
command | tee >(ssh remote "cat >> /var/log/remote.log") local.log

Named Pipes (FIFOs)

Creating and Using

# Create named pipe
mkfifo /tmp/mypipe

# Terminal 1: Writer
echo "Hello through pipe" > /tmp/mypipe

# Terminal 2: Reader
cat /tmp/mypipe

# Cleanup
rm /tmp/mypipe

Practical FIFO Examples

# Progress monitoring
mkfifo /tmp/progress

# Background: monitor progress
( while read line; do echo "Processing: $line"; done < /tmp/progress ) &

# Foreground: do work and report
for file in *.txt; do
    process "$file"
    echo "$file" > /tmp/progress
done

rm /tmp/progress

xargs - Build Commands from Input

Basic xargs

# Execute command for each input line
echo -e "file1\nfile2\nfile3" | xargs rm

# With argument placeholder
find . -name "*.log" | xargs -I {} mv {} /backup/

# Parallel execution
find . -name "*.png" | xargs -P 4 -I {} convert {} {}.jpg

Safe xargs with Null Delimiter

# Handle filenames with spaces/newlines
find . -name "*.txt" -print0 | xargs -0 rm

# With placeholder
find . -name "*.log" -print0 | xargs -0 -I {} gzip {}

xargs Arguments

Option Description Example

-I {}

Replace {} with input

xargs -I {} echo "File: {}"

-n N

N arguments per command

xargs -n 2 diff

-P N

Run N processes in parallel

xargs -P 4 process

-0

Null-delimited input

find -print0 | xargs -0

-t

Print commands before running

xargs -t rm

-p

Prompt before each command

xargs -p rm

-L N

Max N lines per command

xargs -L 1 cmd

xargs Patterns

# Run command for each line
cat urls.txt | xargs -n 1 curl -O

# Multiple arguments per command
echo "a b c d e f" | xargs -n 2 echo
# Output:
# a b
# c d
# e f

# Build complex commands
ls *.txt | xargs -I {} sh -c 'wc -l {} | awk "{print \$1, \"{}\"}"'

# Batch processing
cat servers.txt | xargs -P 10 -I {} ssh {} 'uptime'

Redirection in Pipelines

Stderr Handling

# Pipe stdout only (stderr to terminal)
command 2>/dev/null | next_command

# Pipe stderr only
command 2>&1 >/dev/null | grep ERROR

# Pipe both stdout and stderr
command 2>&1 | tee output.log

# Modern syntax (bash 4+)
command |& tee output.log

Separate Handling

# Different destinations for stdout and stderr
command > stdout.log 2> stderr.log

# Stderr to file, stdout continues in pipeline
command 2>errors.log | process_output

# Swap stdout and stderr
command 3>&1 1>&2 2>&3 | process_what_was_stderr

Real-World Patterns

Log Analysis Pipeline

# Full analysis pipeline
cat /var/log/auth.log | \
    grep "Failed password" | \
    awk '{print $(NF-3)}' | \
    sort | \
    uniq -c | \
    sort -rn | \
    head -20 | \
    tee failed-ips.txt | \
    awk '{print $2}' | \
    while read ip; do
        echo "$ip: $(geoiplookup $ip 2>/dev/null | cut -d: -f2)"
    done

Data Transformation Pipeline

# CSV to JSON pipeline
cat data.csv | \
    tail -n +2 |                # Skip header (a comment may follow "|", but never a trailing "\")
    while IFS=, read name age city; do
        printf '{"name":"%s","age":%s,"city":"%s"}\n' "$name" "$age" "$city"
    done | \
    jq -s '.'  # Combine into array

Multi-Host Command

# Run command on multiple servers, collect results
cat servers.txt | xargs -P 10 -I {} sh -c '
    echo "=== {} ==="
    ssh {} "df -h / | tail -1" 2>/dev/null || echo "FAILED: {}"
' | tee disk-report.txt

Parallel Download

# Download multiple files in parallel
cat urls.txt | xargs -P 5 -n 1 curl -sO

# With progress
cat urls.txt | xargs -P 5 -I {} sh -c 'curl -sO {} && echo "Done: {}"'

Build and Test Pipeline

# Build, test, and report
make 2>&1 | tee build.log | grep -E "error|warning" && \
    pytest -v 2>&1 | tee test.log | grep -E "PASSED|FAILED" && \
    echo "Build complete at $(date)" | tee -a build.log

Continuous Monitoring

# Monitor logs with multiple outputs
tail -f /var/log/syslog | tee \
    >(grep --line-buffered ERROR >> errors.log) \
    >(grep --line-buffered WARN >> warnings.log) \
    >(grep --line-buffered -c . | while read n; do echo "$(date): $n events"; done >> stats.log)

CTF Patterns

Extract and Decode

# Extract base64, decode, search
strings binary | grep -oE '[A-Za-z0-9+/]{20,}={0,2}' | while read b64; do
    decoded=$(echo "$b64" | base64 -d 2>/dev/null)
    if echo "$decoded" | grep -q "flag"; then
        echo "FOUND: $decoded"
    fi
done

# Hex dump and search
xxd binary | tee hexdump.txt | grep -i "flag"

# Extract embedded files
binwalk -e firmware.bin 2>&1 | tee extraction.log

Network Analysis

# Extract IPs from pcap, lookup, dedupe
tshark -r capture.pcap -T fields -e ip.src -e ip.dst 2>/dev/null | \
    tr '\t' '\n' | \
    sort -u | \
    while read ip; do
        echo "$ip: $(host $ip 2>/dev/null | tail -1)"
    done | tee ip-analysis.txt

# HTTP request URLs
tshark -r capture.pcap -Y "http.request" -T fields -e http.host -e http.request.uri | \
    awk '{print "http://"$1$2}' | \
    sort -u

Password Cracking Pipeline

# Generate candidates and test
cat wordlist.txt | \
    tee >(cat) \
        >(sed 's/a/4/g; s/e/3/g; s/i/1/g; s/o/0/g') \
        >(awk '{print $0"123"}') \
        >(awk '{print toupper(substr($0,1,1)) substr($0,2)}') | \
    sort -u | \
    while read pass; do
        if echo -n "$pass" | md5sum | grep -q "^$target_hash"; then
            echo "FOUND: $pass"
            break
        fi
    done

Performance Optimization

Buffer Control

# Line-buffered output (for real-time)
grep --line-buffered pattern file | process

# Unbuffered (immediate; -o0 = unbuffered, -oL would be line-buffered)
stdbuf -o0 command | next_command

# Larger buffer for throughput
stdbuf -o1M command | next_command

Parallel Processing

# GNU Parallel for heavy lifting
cat urls.txt | parallel -j 10 curl -sO {}

# Parallel with progress
cat files.txt | parallel --bar gzip {}

# Parallel pipeline stages: sort chunks into temp files, then merge them
# (a bare "| sort -m" cannot merge sorted chunks concatenated on one stream)
cat data | parallel --pipe --files sort | xargs sort -m

Quick Reference

# Basic pipe
cmd1 | cmd2                     # stdout of cmd1 to stdin of cmd2

# tee - split stream
cmd | tee file                  # stdout to file AND continue
cmd | tee -a file               # append to file
cmd | tee >(cmd2)               # to another command

# Process substitution
diff <(cmd1) <(cmd2)            # compare outputs
cmd > >(processor)              # output to command

# xargs - build commands
input | xargs cmd               # pass all as args
input | xargs -n 1 cmd          # one arg per invocation
input | xargs -I {} cmd {}      # placeholder
find -print0 | xargs -0 cmd     # null-delimited (safe)
input | xargs -P 4 cmd          # parallel

# Stderr handling
cmd 2>&1 | next                 # include stderr
cmd 2>/dev/null | next          # discard stderr
cmd |& next                     # bash 4+ syntax

# Pipeline status
echo "${PIPESTATUS[@]}"         # exit codes of all commands
set -o pipefail                 # fail on any failure