High-performance streaming pipeline using Python, Redis Streams, and Postgres. No Kafka needed.
Everyone thinks you need Kafka for streaming. Not really.
Python Can Handle Streaming — Here's How:
- Python's asyncio suits I/O-bound work: while one coroutine waits on Redis or Postgres over the network, the event loop runs the others, so the GIL isn't the bottleneck
- Redis Streams + asyncpg give you persistence, consumer groups, and bulk inserts — all the primitives you need
- Most "real-time" workloads are under 10K events/sec — Python handles that on a single box without breaking a sweat
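A quick way to check the GIL claim: the sketch below uses asyncio.sleep() as a stand-in for a network round-trip to Redis or Postgres (an assumption; real drivers add their own CPU overhead). A hundred simulated 100 ms waits overlap on one thread instead of adding up:

```python
import asyncio
import time


async def fake_network_call(delay: float) -> float:
    """Stand-in for an awaited Redis/Postgres round-trip, simulated with sleep."""
    await asyncio.sleep(delay)
    return delay


async def run_concurrently(n: int, delay: float) -> float:
    """Run n simulated calls at once and return the wall-clock seconds taken."""
    start = time.perf_counter()
    # gather() runs every coroutine on the same event-loop thread; while one
    # is suspended in await, the loop resumes another that is ready.
    await asyncio.gather(*(fake_network_call(delay) for _ in range(n)))
    return time.perf_counter() - start


if __name__ == "__main__":
    elapsed = asyncio.run(run_concurrently(100, 0.1))
    # Run one after another, the same calls would take about 100 * 0.1 = 10 s.
    print(f"100 simulated 100 ms calls took {elapsed:.2f} s")
```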
┌─────────────┐      ┌──────────────┐      ┌─────────────┐
│  Producers  │─────▶│ Redis Stream │─────▶│  Consumers  │
│   (async)   │      │   (buffer)   │      │ (5 workers) │
└─────────────┘      └──────────────┘      └──────┬──────┘
                                                  │
                                                  │ Batch Insert
                                                  │ (500 events or 2s)
                                                  ▼
                                           ┌─────────────┐
                                           │  Postgres   │
                                           │ (JSONB cols)│
                                           └─────────────┘
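The diagram's flow can be modelled in a single process to see how the pieces interact. This is a toy under stated assumptions, not the project's code: a bounded asyncio.Queue stands in for the Redis Stream, a plain list stands in for Postgres, and QUEUE_MAXSIZE is a made-up bound. Unlike a Redis Stream, the bounded queue makes the producer wait when the workers fall behind.

```python
import asyncio

NUM_WORKERS = 5        # matches the diagram's "5 workers"
QUEUE_MAXSIZE = 1_000  # hypothetical bound; in the real pipeline Redis is the buffer


async def producer(queue: asyncio.Queue, n_events: int) -> None:
    for i in range(n_events):
        # put() suspends while the queue is full: in-process backpressure.
        await queue.put({"id": i, "type": "order_placed"})


async def worker(queue: asyncio.Queue, sink: list) -> None:
    while True:
        event = await queue.get()
        try:
            sink.append(event)  # stand-in for the batched Postgres insert
        finally:
            queue.task_done()


async def run_pipeline(n_events: int) -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=QUEUE_MAXSIZE)
    sink: list = []
    # create_task() starts the consumer pool.
    workers = [asyncio.create_task(worker(queue, sink)) for _ in range(NUM_WORKERS)]
    await producer(queue, n_events)
    await queue.join()       # returns once every event has been task_done()
    for task in workers:
        task.cancel()        # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return sink


if __name__ == "__main__":
    stored = asyncio.run(run_pipeline(10_000))
    print(len(stored))  # 10000
```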
Key Components:
- Docker Compose with three services — Redis, Postgres, Python app running with uv
- uv syncs dependencies in 200ms — faster Docker builds, smaller images, no pip confusion
- redis-py with connection pooling — one connection per asyncio worker, reused across requests
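- Redis Streams with XREADGROUP: a consumer group hands each new message to exactly one consumer, so workers don't process the same event in parallel (delivery is still at-least-once: a message left unacknowledged by a crash can be processed again)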
- asyncio.create_task() spawns your consumer pool: 5-10 workers per core is the sweet spot
- Each worker reads 100 events from the Redis Stream: XREADGROUP BLOCK 5000 COUNT 100
- Each batch accumulates until 500 records or 2 seconds, whichever comes first
- asyncpg.executemany() bulk inserts to Postgres: one call per batch beats 500 separate INSERT round-trips
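- Backpressure inside the process comes from a bounded asyncio.Queue: await queue.put() suspends while the queue is full, slowing whatever feeds it. XADD to Redis never blocks, so producers writing to the stream are not slowed down; watch the stream's backlog instead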
- Dead letter queue is another Redis Stream — just XADD to 'stream:dead' with error details
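- XACK after the Postgres insert: unacknowledged messages stay in the group's pending entries list, so after a crash they can be re-read (XREADGROUP with ID 0) or claimed by another worker (XCLAIM/XAUTOCLAIM)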
- Postgres JSONB columns store raw events — schema evolution doesn't break the pipeline
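- XLEN monitors stream depth: if it grows past 10K, fix consumers, not producers. XLEN counts every entry still stored, acknowledged or not, so an untrimmed stream grows with processed events too; XINFO GROUPS (Redis 7+) reports per-group lag directly
- pg_stat_statements tracks insert latency: tune the slowest queries first
- The entire engine is ~250 lines of Python: producer loop, consumer pool, batch inserter, that's it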
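The delivery behaviour described in the list above (XREADGROUP, XACK after insert, crash recovery) is easier to reason about with a toy model. The class below is a hypothetical in-memory imitation written for illustration; it is not redis-py and skips most of Redis's semantics. It shows three properties: each new entry goes to only one consumer in the group, a delivered entry stays pending until acknowledged, and a crashed consumer's pending entries can be moved to another consumer. That last step is why delivery is at-least-once and why the insert should come before the ack.

```python
from dataclasses import dataclass, field


@dataclass
class ToyConsumerGroup:
    """In-memory imitation of one Redis Stream consumer group (illustration only)."""
    entries: list = field(default_factory=list)  # the stream: (entry id, payload)
    last_delivered: int = -1                      # id of the newest delivered entry
    pending: dict = field(default_factory=dict)   # entry id -> owning consumer

    def add(self, payload: dict) -> int:
        """Like XADD: append an entry and return its id."""
        entry_id = len(self.entries)
        self.entries.append((entry_id, payload))
        return entry_id

    def read_new(self, consumer: str, count: int) -> list:
        """Like XREADGROUP ... STREAMS key >: deliver entries nobody has seen yet."""
        start = self.last_delivered + 1
        batch = self.entries[start:start + count]
        for entry_id, _ in batch:
            self.pending[entry_id] = consumer  # stays pending until acknowledged
        self.last_delivered += len(batch)
        return batch

    def ack(self, entry_ids: list) -> None:
        """Like XACK: drop entries from the pending list."""
        for entry_id in entry_ids:
            self.pending.pop(entry_id, None)

    def claim(self, from_consumer: str, to_consumer: str) -> list:
        """Rough analogue of XCLAIM: move a dead consumer's pending entries."""
        claimed = [eid for eid, owner in self.pending.items() if owner == from_consumer]
        for eid in claimed:
            self.pending[eid] = to_consumer
        return [self.entries[eid] for eid in claimed]


if __name__ == "__main__":
    group = ToyConsumerGroup()
    for n in range(6):
        group.add({"n": n})
    first = group.read_new("worker-1", 3)   # ids 0, 1, 2
    second = group.read_new("worker-2", 3)  # ids 3, 4, 5: no overlap
    group.ack([eid for eid, _ in second])   # worker-2 inserted, then acked
    # worker-1 "crashed" before XACK, so its entries are still pending.
    print(sorted(group.pending))            # [0, 1, 2]
    print(group.claim("worker-1", "worker-2"))
```

Real Redis selects entries to reclaim by idle time (XAUTOCLAIM's min-idle-time) rather than by owner name; the toy simplifies that.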
It runs 10K events/sec on one box.
If it breaks, you've earned enough to hire a team for Kafka.
Prerequisites:
- Docker and Docker Compose
- Python 3.11+ (for local development)
- uv: a fast Python package installer
Install uv:
curl -LsSf https://astral.sh/uv/install.sh | sh
Set up the project:
cd python-redis-streaming
cp .env.example .env
The run.sh script handles everything:
# Start Redis and Postgres
./run.sh
# Start the streaming engine
./run.sh start
In another terminal:
# Produce 1000 sample events
./run.sh produce 1000
# Or produce 5000 events
./run.sh produce 5000
In another terminal:
# Monitor with 5 second refresh
./run.sh monitor
# Or with 10 second refresh
./run.sh monitor 10
You'll see:
============================================================
STREAMING PIPELINE DASHBOARD
============================================================
Redis Streams:
Stream Length: 1523
Pending Messages: 0
DLQ Length: 0
Postgres:
Total Events: 8477
Events (last minute): 1000
DLQ Count: 0
Table Size: 1256 kB
Events by Type:
order_placed: 1243
user_created: 1189
payment_processed: 1156
product_viewed: 1098
order_shipped: 1067
============================================================
# Make sure services are running first
./run.sh
# Run tests
./run.sh test
Test the throughput:
# Run at 10K events/sec for 60 seconds
./run.sh benchmark 10000 60
# Run at 5K events/sec for 30 seconds
./run.sh benchmark 5000 30
Expected output:
============================================================
STREAMING BENCHMARK
============================================================
Target Rate: 10000 events/sec
Duration: 60 seconds
Expected Total: 600000 events
============================================================
Produced 1000 events...
Produced 2000 events...
...
Produced 600000 events...
Completed: 600000 total events produced
============================================================
BENCHMARK RESULTS
============================================================
Total Events: 600000
Elapsed Time: 60.02 seconds
Actual Rate: 9996.67 events/sec
Target Rate: 10000 events/sec
Accuracy: 99.97%
============================================================
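The results block is plain arithmetic: actual rate is total events divided by elapsed time (600000 / 60.02 ≈ 9996.67), and accuracy is actual rate divided by target rate. The sketch below reproduces those numbers and adds a paced send loop that schedules each send against an absolute deadline. The pacing approach is an assumption about how a script like scripts/benchmark.py might work, not its code, and `send` is a hypothetical callback standing in for XADD. These figures count produced events; the monitor shows whether the consumers kept pace.

```python
import asyncio
import time
from typing import Awaitable, Callable


def benchmark_stats(total: int, elapsed: float, target_rate: float) -> tuple[float, float]:
    """Return (actual events/sec, accuracy as a percentage of the target)."""
    actual = total / elapsed
    return actual, 100.0 * actual / target_rate


async def paced_produce(send: Callable[[int], Awaitable[None]],
                        rate: float, duration: float) -> int:
    """Call send(i) about `rate` times per second for `duration` seconds."""
    interval = 1.0 / rate
    total = int(rate * duration)
    start = time.perf_counter()
    for i in range(total):
        await send(i)
        # Sleep until the next slot on an absolute schedule rather than for a
        # fixed interval, so sleep overshoot doesn't accumulate.
        delay = start + (i + 1) * interval - time.perf_counter()
        if delay > 0:
            await asyncio.sleep(delay)
    return total


if __name__ == "__main__":
    actual, accuracy = benchmark_stats(600_000, 60.02, 10_000)
    print(f"Actual Rate: {actual:.2f} events/sec")  # Actual Rate: 9996.67 events/sec
    print(f"Accuracy: {accuracy:.2f}%")             # Accuracy: 99.97%
```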
python-redis-streaming/
├── src/
│ ├── config.py # Configuration management
│ ├── producer.py # Event producer
│ ├── consumer.py # Event consumer with batching
│ ├── monitor.py # Observability utilities
│ └── main.py # Main application
├── tests/
│ ├── test_producer.py
│ └── test_consumer.py
├── scripts/
│ ├── produce_sample.py # Produce sample events
│ ├── monitor.py # Standalone monitoring
│ └── benchmark.py # Benchmark script
├── docker-compose.yml
├── Dockerfile
├── init.sql # Postgres schema
├── pyproject.toml
├── run.sh # Main entry script
└── README.md
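src/config.py is listed as handling configuration management. Its contents aren't shown here; a minimal sketch of such a module, assuming it reads the .env keys listed below with the same defaults, might look like this. `Settings` and `load_settings()` are hypothetical names, not the project's API.

```python
import os
from collections.abc import Mapping
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    """Hypothetical settings object; field names mirror the .env keys."""
    redis_host: str = "localhost"
    redis_port: int = 6379
    postgres_host: str = "localhost"
    postgres_port: int = 5432
    postgres_db: str = "streaming"
    postgres_user: str = "streaming_user"
    postgres_password: str = "streaming_pass"
    num_workers: int = 5
    batch_size: int = 500
    batch_timeout_seconds: float = 2.0
    xread_count: int = 100
    xread_block_ms: int = 5000

    @property
    def postgres_dsn(self) -> str:
        """postgresql:// connection URI, a form asyncpg.connect()/create_pool() accept."""
        return (f"postgresql://{self.postgres_user}:{self.postgres_password}"
                f"@{self.postgres_host}:{self.postgres_port}/{self.postgres_db}")


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read each field from its upper-cased env var, else keep the default."""
    defaults = Settings()
    overrides = {}
    for name, default in vars(defaults).items():
        raw = env.get(name.upper())
        if raw is not None:
            overrides[name] = type(default)(raw)  # cast to str, int or float
    return Settings(**overrides)
```

Passing `env` explicitly keeps the loader testable without touching the real environment.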
Edit .env to customize:
# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
# Postgres
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_DB=streaming
POSTGRES_USER=streaming_user
POSTGRES_PASSWORD=streaming_pass
# Streaming config
NUM_WORKERS=5 # Consumer workers
BATCH_SIZE=500 # Events per batch
BATCH_TIMEOUT_SECONDS=2 # Max wait time for batch
XREAD_COUNT=100 # Events to read per XREADGROUP
XREAD_BLOCK_MS=5000 # Block time for XREADGROUP
Producing events from Python:
# Produce single event
await producer.produce('user_created', {
'user_id': 123,
'email': 'user@example.com'
})
# Produce batch
events = [
('order_placed', {'order_id': 'ORD-123', 'amount': 99.99}),
('payment_processed', {'txn_id': 'TXN-456'})
]
await producer.produce_batch(events)
Consumers automatically:
- Read from Redis Stream using consumer groups
- Accumulate events into batches
- Bulk insert to Postgres when batch is full or timeout expires
- Acknowledge messages after successful insert
- Send failed messages to dead letter queue
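The consumer steps listed above can be sketched with plain asyncio. This is a hypothetical stand-in, not src/consumer.py: an asyncio.Queue of (message id, event) pairs replaces XREADGROUP, `STOP` is a made-up sentinel for ending the loop, and `insert_batch`, `ack` and `dead_letter` are injected callbacks you would back with asyncpg executemany(), XACK and the dead letter queue. A batch is flushed on whichever comes first: BATCH_SIZE messages, or BATCH_TIMEOUT_SECONDS after its first message arrived.

```python
import asyncio
import time
from typing import Awaitable, Callable

STOP = object()  # hypothetical sentinel used to end the loop in demos/tests

Message = tuple[str, dict]  # (stream message id, event payload)


async def _flush(
    batch: list[Message],
    insert_batch: Callable[[list[dict]], Awaitable[None]],
    ack: Callable[[list[str]], Awaitable[None]],
    dead_letter: Callable[[str, dict, str], Awaitable[None]],
) -> None:
    ids = [msg_id for msg_id, _ in batch]
    try:
        await insert_batch([event for _, event in batch])  # e.g. executemany()
    except Exception as exc:
        # Insert failed: record every message of the batch with the error.
        for msg_id, event in batch:
            await dead_letter(msg_id, event, repr(exc))
    # Ack only after the insert (or dead-lettering) finished. Acking
    # dead-lettered messages too is an assumption, so they aren't redelivered.
    await ack(ids)


async def batch_consumer(
    source: asyncio.Queue,
    insert_batch: Callable[[list[dict]], Awaitable[None]],
    ack: Callable[[list[str]], Awaitable[None]],
    dead_letter: Callable[[str, dict, str], Awaitable[None]],
    batch_size: int = 500,
    batch_timeout: float = 2.0,
) -> None:
    """Flush when batch_size messages are buffered or the oldest is batch_timeout old."""
    batch: list[Message] = []
    deadline = 0.0
    while True:
        # Time-based flush: the batch's first message has waited long enough.
        if batch and time.monotonic() >= deadline:
            await _flush(batch, insert_batch, ack, dead_letter)
            batch = []
        # Wait indefinitely when idle; otherwise only until the batch deadline.
        timeout = deadline - time.monotonic() if batch else None
        try:
            item = await asyncio.wait_for(source.get(), timeout)
        except asyncio.TimeoutError:
            continue  # the check at the top of the loop flushes
        if item is STOP:
            if batch:
                await _flush(batch, insert_batch, ack, dead_letter)
            return
        if not batch:
            deadline = time.monotonic() + batch_timeout  # timer starts with first message
        batch.append(item)
        # Size-based flush.
        if len(batch) >= batch_size:
            await _flush(batch, insert_batch, ack, dead_letter)
            batch = []
```

Injecting the three callbacks keeps the batching logic testable without Redis or Postgres running.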
# Get stream length
stream_len = await monitor.get_stream_length()
# Get pending messages
pending = await monitor.get_pending_messages()
# Get Postgres stats
stats = await monitor.get_postgres_stats()
# Start everything with Docker Compose
docker-compose up -d
# View logs
docker-compose logs -f streaming-app
# Stop everything
docker-compose down
# Install dependencies
uv sync
# Start Redis and Postgres only
docker-compose up -d redis postgres
# Run locally
uv run python -m src.main
# Run tests
uv run pytest tests/ -v
# Check Redis stream length
docker-compose exec redis redis-cli XLEN events
# Check Postgres data
docker-compose exec postgres psql -U streaming_user -d streaming -c "SELECT COUNT(*) FROM events;"
# View consumer groups
docker-compose exec redis redis-cli XINFO GROUPS events
# Clear all data
./run.sh clean
Performance tips:
- Tune batch size: larger batches give better throughput but higher latency
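- Add more workers: increase NUM_WORKERS for more concurrency on one box, or run more app instances in the same consumer group to scale horizontally
- Connection pooling: already configured (one pooled connection per worker)
- Postgres tuning: add JSONB indexes (e.g. GIN) on the fields you query, keeping in mind that every index slows inserts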
- Monitor XLEN: If stream grows, add more consumers
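The batch-size trade-off in the tips above can be put in numbers. Each worker flushes at BATCH_SIZE events or after BATCH_TIMEOUT_SECONDS, so the time to fill a batch is roughly BATCH_SIZE divided by that worker's share of the event rate, capped by the timeout. The helper below is illustrative arithmetic, assuming load is spread evenly across workers and ignoring insert time; `flush_stats` is a hypothetical name.

```python
def flush_stats(rate_per_sec: float, num_workers: int = 5,
                batch_size: int = 500, batch_timeout: float = 2.0) -> dict:
    """Steady-state batching figures, assuming load is spread evenly over workers."""
    if rate_per_sec <= 0 or num_workers <= 0:
        raise ValueError("rate_per_sec and num_workers must be positive")
    per_worker = rate_per_sec / num_workers           # events/sec seen by one worker
    fill_time = batch_size / per_worker               # seconds to reach batch_size
    wait = min(fill_time, batch_timeout)              # whichever comes first
    events_per_batch = min(batch_size, per_worker * batch_timeout)
    return {
        "max_buffer_delay_s": wait,                   # extra latency from batching
        "events_per_batch": events_per_batch,
        "flushes_per_sec": num_workers / wait,        # total insert calls per second
    }


if __name__ == "__main__":
    print(flush_stats(10_000))  # {'max_buffer_delay_s': 0.25, 'events_per_batch': 500, 'flushes_per_sec': 20.0}
    print(flush_stats(100))     # {'max_buffer_delay_s': 2.0, 'events_per_batch': 40.0, 'flushes_per_sec': 2.5}
```

At 10K events/sec with the default settings, batches fill in a quarter second, well before the 2-second timeout; at low traffic the timeout dominates and bounds the added latency.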
Troubleshooting: if the stream keeps growing, consumers can't keep up with producers. Solutions:
- Increase NUM_WORKERS
- Increase BATCH_SIZE
- Check Postgres insert latency
Check Postgres performance:
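SELECT * FROM pg_stat_statements
WHERE query LIKE '%INSERT INTO events%';
(pg_stat_statements must be listed in shared_preload_libraries and enabled with CREATE EXTENSION pg_stat_statements.)
Check dead letter queue: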
docker-compose exec postgres psql -U streaming_user -d streaming -c "SELECT * FROM dead_letter_queue ORDER BY failed_at DESC LIMIT 10;"