This started a while ago as a simple sandbox for playing around with system-level components and using Python to glue them together. Then I found myself using those files as a reference for new projects, at which point I realised I needed a local repo for proper change management. And then I thought I might as well put it in my repo and clean things up (Claude added tests and gave me a code review beforehand).
Finally, I decided to use uv and uv_build, as opposed to pip and hatchling, as all the cool kids do nowadays. That was a nice change.
A demo project that wires together Kafka, RabbitMQ, Redis, and Cassandra into a simple event-processing pipeline. Built as a reference example for learning how these technologies connect.
                          ┌── Consumer 1 (a-g) ──┐                          ┌── Worker 1 (a-g) ──┐
                          ├── Consumer 2 (h-m) ──┤──> Redis (counters)      ├── Worker 2 (h-m) ──┤
Producer --> Kafka (4p) ──┤                      ├──> Cassandra (events)    ├── Worker 3 (n-t) ──┼──> Redis (job status)
                          ├── Consumer 3 (n-t) ──┤──> RabbitMQ (4q) ───────>├── Worker 4 (u-z) ──┤
                          └── Consumer 4 (u-z) ──┘                          └────────────────────┘
API (FastAPI) reads from Redis and Cassandra
Producer generates fake pageview events and routes them to one of 4 Kafka partitions based on the first letter of the username (a-g, h-m, n-t, u-z).
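The routing rule is simple enough to express as a pure function. This is a minimal sketch, assuming usernames start with an ASCII letter; the name partition_for and the fallback for names that start with a digit or symbol are hypothetical, not taken from producer.py.

```python
# Hypothetical sketch: route a username to one of 4 partitions by first letter.
# The ranges mirror this README (a-g, h-m, n-t, u-z); the real producer may differ.
PARTITION_RANGES = [("a", "g"), ("h", "m"), ("n", "t"), ("u", "z")]


def partition_for(username: str) -> int:
    """Return the partition index (0-3) for a username."""
    first = username[:1].lower()
    for index, (low, high) in enumerate(PARTITION_RANGES):
        # Single-character string comparison is alphabetical for a-z
        if low <= first <= high:
            return index
    # Assumption: empty names or names starting with a digit/symbol go to partition 0
    return 0
```

Choosing the partition explicitly like this, rather than sending the username as a message key, keeps the letter ranges predictable: Kafka's default partitioner hashes keys, which would scatter the letters across partitions.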
Consumers (x4) each read from one partition and fan out to three destinations:
- Redis — increments page view counters and tracks each user's last visited page
- Cassandra — stores a persistent log of all pageview events
- RabbitMQ — publishes events to one of 4 partitioned queues using the same username routing (a-g, h-m, n-t, u-z)
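The fan-out can be sketched as one function that receives already-connected clients, which also makes it easy to exercise with mocks. This sketch assumes redis-py (incr, set), the DataStax cassandra-driver (session.execute) and pika (channel.basic_publish); the event fields and Cassandra column names are inferred from the /events response shown later in this README, the analytics_jobs_N queue names from the worker commands, and fan_out/queue_for are hypothetical names.

```python
import json
from typing import Any

PARTITION_RANGES = [("a", "g"), ("h", "m"), ("n", "t"), ("u", "z")]


def queue_for(username: str, base: str = "analytics_jobs") -> str:
    """Pick analytics_jobs_0..3 using the same a-g/h-m/n-t/u-z split as Kafka."""
    first = username[:1].lower()
    for index, (low, high) in enumerate(PARTITION_RANGES):
        if low <= first <= high:
            return f"{base}_{index}"
    return f"{base}_0"  # assumption: non-letter names go to the first queue


def fan_out(event: dict[str, Any], redis_client, session, channel) -> None:
    """Send one pageview event to Redis, Cassandra and RabbitMQ."""
    user, page = event["user_id"], event["page"]

    # Redis: bump the per-page counter and remember the user's latest page
    redis_client.incr(f"pageviews:{page}")
    redis_client.set(f"user:last_page:{user}", page)

    # Cassandra: append the event to the persistent log (column names assumed)
    session.execute(
        "INSERT INTO pageviews (user_id, event_time, event_id, page) "
        "VALUES (%s, %s, %s, %s)",
        (user, event["event_time"], event["event_id"], page),
    )

    # RabbitMQ: the default exchange ("") routes by queue name
    channel.basic_publish(
        exchange="",
        routing_key=queue_for(user),
        body=json.dumps(event),
    )
```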
Workers (x4) each consume from one RabbitMQ queue and mark jobs as processed in Redis.
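A worker's job handling might look roughly like the following, assuming pika's callback signature (channel, method, properties, body) with the Redis client bound via functools.partial. The names on_message and start_worker, and the "processed" value stored under job:<event_id>, are hypothetical.

```python
import functools
import json


def on_message(channel, method, properties, body, redis_client) -> None:
    """pika-style callback: record the job in Redis, then acknowledge it."""
    event = json.loads(body)
    # Assumption: the status value stored under job:<event_id> is "processed"
    redis_client.set(f"job:{event['event_id']}", "processed")
    # Ack only after the Redis write; an unacked message is redelivered
    # if the worker dies before reaching this line
    channel.basic_ack(delivery_tag=method.delivery_tag)


def start_worker(channel, redis_client, queue: str) -> None:
    """Attach the callback to one queue (e.g. analytics_jobs_0) and block."""
    callback = functools.partial(on_message, redis_client=redis_client)
    channel.basic_consume(queue=queue, on_message_callback=callback)
    channel.start_consuming()
```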
API exposes the pipeline data via HTTP endpoints:
- GET /counts/page/{page}: page view count
- GET /users/{user_id}/last-page: last page a user visited
- GET /events/{user_id}: recent pageview events for a user
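The two Redis-backed endpoints come down to key lookups. This sketch is written as plain functions so it runs without FastAPI; page_count and last_page are hypothetical names, and it assumes redis-py style get() calls and that the URL segment pricing maps to the Redis key pageviews:/pricing, as the curl and redis-cli examples further down suggest. Reporting 0 for an unseen page is also an assumption; the real API might return a 404 instead.

```python
def page_count(redis_client, page: str) -> dict:
    """Lookup behind GET /counts/page/{page}."""
    # Assumption: stored page paths keep their leading slash, e.g. "pageviews:/pricing"
    raw = redis_client.get(f"pageviews:/{page}")
    # int() accepts both bytes and str; a missing key is reported as 0 (assumption)
    return {"page": page, "count": int(raw) if raw is not None else 0}


def last_page(redis_client, user_id: str) -> dict:
    """Lookup behind GET /users/{user_id}/last-page."""
    raw = redis_client.get(f"user:last_page:{user_id}")
    # redis-py returns bytes unless the client uses decode_responses=True
    value = raw.decode() if isinstance(raw, bytes) else raw
    return {"user": user_id, "last_page": value}
```

In the service, each function would presumably sit behind a FastAPI route decorator such as @app.get("/counts/page/{page}").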
git clone <repo-url>
cd basic-data-pipeline
uv sync --all-extras
docker compose up -d
This starts Zookeeper, Kafka, RabbitMQ, Redis, and Cassandra. All services have healthchecks, so you can monitor their status with:
docker compose ps
Wait until all services show as healthy before proceeding. Cassandra is the slowest and can take up to 2 minutes.
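If you would rather script the wait than keep re-running docker compose ps, a rough substitute is to poll a port until it accepts TCP connections. This is a sketch under two assumptions: Cassandra's default CQL port 9042 is published to localhost, and wait_for_port is a hypothetical helper. An open port is a weaker signal than the Compose healthcheck, so docker compose ps remains the authoritative check.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 120.0) -> bool:
    """Poll until a TCP port accepts connections or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError while nothing is listening yet
            with socket.create_connection((host, port), timeout=2.0):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(1.0)
```

For example, wait_for_port("localhost", 9042) blocks for up to two minutes, matching the startup time mentioned above, and returns False if nothing starts listening in that window.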
Once Cassandra is healthy, load the schema:
docker compose cp cassandra_schema.cql cassandra:/tmp/schema.cql
docker compose exec cassandra cqlsh -f /tmp/schema.cql
Activate the Python virtual environment:
source .venv/bin/activate
Use honcho to run all processes at once via the Procfile:
uv run honcho start
This starts the producer, 4 consumers, 4 workers, and the API server simultaneously. Each process is labelled in the log output.
To run individual components instead:
uv run producer
# 4 consumers — each joins the same consumer group, so Kafka assigns one partition to each
uv run consumer # run in 4 separate terminals
uv run consumer
uv run consumer
uv run consumer
# 4 workers — each reads from a specific RabbitMQ queue
RABBITMQ_QUEUE=analytics_jobs_0 uv run worker
RABBITMQ_QUEUE=analytics_jobs_1 uv run worker
RABBITMQ_QUEUE=analytics_jobs_2 uv run worker
RABBITMQ_QUEUE=analytics_jobs_3 uv run worker
uv run api
Each consumer instance joins the same Kafka consumer group (pipeline-consumer), so Kafka automatically assigns one partition to each. Each worker instance reads from a dedicated RabbitMQ queue, matching the same username partitioning scheme used by Kafka.
All settings default to localhost for local development. Override via environment variables:
| Variable | Default | Description |
|---|---|---|
| KAFKA_SERVER | localhost:9092 | Kafka bootstrap server |
| KAFKA_TOPIC | pageviews | Kafka topic name |
| KAFKA_PARTITIONS | 4 | Number of Kafka partitions |
| REDIS_HOST | localhost | Redis host |
| REDIS_PORT | 6379 | Redis port |
| RABBITMQ_HOST | localhost | RabbitMQ host |
| RABBITMQ_QUEUE | analytics_jobs | RabbitMQ queue base name |
| RABBITMQ_PARTITIONS | 4 | Number of RabbitMQ queues |
| CASSANDRA_HOST | localhost | Cassandra host |
| CASSANDRA_KEYSPACE | pipeline | Cassandra keyspace |
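The table maps naturally onto a small settings loader. This sketch assumes a frozen dataclass and a load_settings() function, both illustrative; config.py may be structured differently. Reading the environment inside a function rather than at import time makes overrides easy to test, for example with pytest's monkeypatch.setenv.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    kafka_server: str
    kafka_topic: str
    kafka_partitions: int
    redis_host: str
    redis_port: int
    rabbitmq_host: str
    rabbitmq_queue: str
    rabbitmq_partitions: int
    cassandra_host: str
    cassandra_keyspace: str


def load_settings() -> Settings:
    """Read each variable from the environment, falling back to the table's defaults."""
    env = os.environ.get
    return Settings(
        kafka_server=env("KAFKA_SERVER", "localhost:9092"),
        kafka_topic=env("KAFKA_TOPIC", "pageviews"),
        # Environment values are strings, so numeric settings are converted here
        kafka_partitions=int(env("KAFKA_PARTITIONS", "4")),
        redis_host=env("REDIS_HOST", "localhost"),
        redis_port=int(env("REDIS_PORT", "6379")),
        rabbitmq_host=env("RABBITMQ_HOST", "localhost"),
        rabbitmq_queue=env("RABBITMQ_QUEUE", "analytics_jobs"),
        rabbitmq_partitions=int(env("RABBITMQ_PARTITIONS", "4")),
        cassandra_host=env("CASSANDRA_HOST", "localhost"),
        cassandra_keyspace=env("CASSANDRA_KEYSPACE", "pipeline"),
    )
```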
The API runs on http://localhost:8000 by default. Example queries using curl:
# Get the view count for the /pricing page
curl http://localhost:8000/counts/page/pricing
# Get the last page visited by a user
curl http://localhost:8000/users/john_doe/last-page
# Get recent pageview events for a user
curl http://localhost:8000/events/john_doe
Example responses:
// GET /counts/page/pricing
{"page": "pricing", "count": 42}
// GET /users/john_doe/last-page
{"user": "john_doe", "last_page": "/docs"}
// GET /events/john_doe
[
{
"user_id": "john_doe",
"event_id": "a1b2c3d4-...",
"event_time": "2026-03-15T10:30:00",
"page": "/pricing"
}
]
FastAPI also generates interactive API docs at http://localhost:8000/docs.
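The same queries can be issued from Python with only the standard library. A small sketch: api_url and get_json are hypothetical helper names, and BASE_URL assumes the default port above. Each path segment is percent-encoded so that user ids containing spaces or slashes still form a valid URL.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "http://localhost:8000"


def api_url(*segments: str) -> str:
    """Join path segments, percent-encoding each one."""
    path = "/".join(urllib.parse.quote(s, safe="") for s in segments)
    return f"{BASE_URL}/{path}"


def get_json(*segments: str):
    """GET an endpoint and decode the JSON body (requires the API to be running)."""
    with urllib.request.urlopen(api_url(*segments), timeout=5) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the API running, get_json("counts", "page", "pricing") requests the same URL as the first curl example.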
To list topics:
docker compose exec kafka kafka-topics --bootstrap-server localhost:9092 --list
To see topic details, including the partition count:
docker compose exec kafka kafka-topics --bootstrap-server localhost:9092 --describe --topic pageviews
To read messages from all partitions (from the beginning):
docker compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic pageviews --from-beginning
To read messages from a specific partition (e.g. partition 0, usernames a-g):
docker compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic pageviews --partition 0 --from-beginning
To see consumer group offsets per partition (which consumer owns which partition, and how far behind each is):
docker compose exec kafka kafka-consumer-groups --bootstrap-server localhost:9092 --group pipeline-consumer --describe
To open a CQL shell:
docker compose exec cassandra cqlsh
Useful queries once inside the shell:
-- Switch to the pipeline keyspace
USE pipeline;
-- Describe the schema
DESCRIBE TABLES;
DESCRIBE TABLE pageviews;
-- View a sample of events (an unrestricted scan returns rows in partition token order, not newest first)
SELECT * FROM pageviews LIMIT 10;
-- View events for a specific user
SELECT * FROM pageviews WHERE user_id = 'some_username' LIMIT 10;
-- View events for a user within a time range
SELECT * FROM pageviews WHERE user_id = 'some_username'
AND event_time > '2026-03-15 00:00:00' LIMIT 10;
-- Count total rows (slow on large tables)
SELECT COUNT(*) FROM pageviews;
-- Count events per user (requires ALLOW FILTERING)
SELECT user_id, COUNT(*) FROM pageviews GROUP BY user_id;
To open a Redis CLI session:
docker compose exec redis redis-cli
Useful commands once inside the CLI:
# List all keys matching a pattern
KEYS pageviews:*
KEYS user:last_page:*
KEYS job:*
# Get a page view count
GET pageviews:/pricing
# Get a user's last visited page
GET user:last_page:some_username
# Check if a job was processed
GET job:some-event-uuid
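The CQL and Redis lookups above can also be scripted from Python. This sketch assumes the DataStax cassandra-driver and redis-py clients, and that the pageviews table has user_id, event_id, event_time and page columns (taken from the /events response shape, not from cassandra_schema.cql). recent_events and page_counts are hypothetical helpers that take already-connected clients, so they can be tested with mocks.

```python
# Hypothetical helpers; connect with, for example:
#   from cassandra.cluster import Cluster
#   session = Cluster(["localhost"]).connect("pipeline")
#   import redis
#   r = redis.Redis(host="localhost", port=6379)


def recent_events(session, user_id: str, limit: int = 10) -> list[dict]:
    """Python equivalent of the per-user CQL query."""
    # %s placeholders are cassandra-driver's syntax for non-prepared statements;
    # the driver escapes the bound values instead of splicing raw input into CQL
    rows = session.execute(
        "SELECT user_id, event_id, event_time, page FROM pageviews "
        "WHERE user_id = %s LIMIT %s",
        (user_id, limit),
    )
    # The driver's default row factory yields namedtuples
    return [row._asdict() for row in rows]


def page_counts(redis_client) -> dict[str, int]:
    """Collect every pageviews:<page> counter into a {page: count} dict."""
    prefix = "pageviews:"
    counts = {}
    # SCAN iterates in batches; KEYS blocks the server while it walks every key
    for key in redis_client.scan_iter(match=prefix + "*"):
        name = key.decode() if isinstance(key, bytes) else key
        value = redis_client.get(key)
        if value is not None:  # the key could be deleted between SCAN and GET
            counts[name[len(prefix):]] = int(value)
    return counts
```

KEYS is fine for poking around in a local redis-cli session, but from application code SCAN-based iteration avoids stalling Redis on a large keyspace.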
With Docker running, you can access:
- RabbitMQ Management — http://localhost:15672 (guest/guest)
Tests use pytest with mocked external services — no Docker required.
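The mocking pattern looks roughly like this: the code under test receives a client object, and the test hands it a MagicMock and asserts on the calls it received. This is an illustration, not one of the repository's tests; mark_processed and the "processed" value are hypothetical.

```python
from unittest.mock import MagicMock


def mark_processed(redis_client, event_id: str) -> None:
    """Hypothetical unit under test: record a finished job in Redis."""
    redis_client.set(f"job:{event_id}", "processed")


def test_mark_processed_writes_job_key():
    # MagicMock stands in for a redis.Redis client, so no server is needed
    fake_redis = MagicMock()
    mark_processed(fake_redis, "abc-123")
    fake_redis.set.assert_called_once_with("job:abc-123", "processed")
```

Because pytest collects any function whose name starts with test_, dropping this into a file under tests/ is enough for uv run pytest to pick it up.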
# Activate the virtual env
source .venv/bin/activate
# Run all tests
uv run pytest
# Run with verbose output
uv run pytest -v
# Run a specific test file
uv run pytest tests/test_api.py
# Run a specific test by name
uv run pytest -k "test_page_count"
src/pipeline/
config.py # Configuration (env vars with defaults)
producer.py # Generates fake events, publishes to Kafka
kafka_consumer.py # Reads Kafka, writes to Redis/Cassandra/RabbitMQ
rabbitmq_worker.py # Processes RabbitMQ jobs
api.py # FastAPI endpoints for querying pipeline data
tests/
test_config.py # Env var overrides and defaults
test_producer.py # Event creation and validation
test_kafka_consumer.py # Redis/Cassandra updates and message fan-out
test_rabbitmq_worker.py # Job processing and acknowledgement
test_api.py # API endpoint responses