Overview
This project builds a real-time data pipeline for Uber pickup analytics in New York City. Using historical trip data from the NYC TLC FOIL dataset, the system simulates a live Kafka data stream, processes it with Apache Spark Structured Streaming, stores aggregated KPIs in PostgreSQL, and visualizes them in Grafana.
┌──────────┐      ┌────────────┐      ┌──────────────┐      ┌────────────┐      ┌────────────┐
│ CSV Data │ ───▶ │ Kafka Prod │ ───▶ │ Spark Stream │ ───▶ │ PostgreSQL │ ───▶ │ Grafana UI │
└──────────┘      └────────────┘      └──────────────┘      └────────────┘      └────────────┘
CSV Data = Uber raw trip files; Kafka Prod → Spark Stream = Kafka topic consumed by Structured Streaming; PostgreSQL = aggregated KPIs.
Technologies:
- Apache Spark 3.5.1
- Apache Kafka 3.7
- PostgreSQL 16
- Grafana 10
- Docker + Docker Compose
- Python 3.11 / PySpark / kafka-python
Repository Layout
NYC-UBER-SPARK
├─ batch/
│  └─ batch_etl.py            # Optional batch backfills
├─ data/                      # CSVs + checkpoint + DB volume mounts
├─ notebooks/                 # Optional exploration
├─ scripts/                   # Optional helpers
├─ streaming/
│  ├─ producer.py             # CSV → Kafka simulator
│  └─ spark_streaming.py      # Main streaming job
├─ docker-compose.yml
├─ Dockerfile.producer
├─ Dockerfile.spark
├─ grafana_dashboard.json     # Importable Grafana dashboard
└─ README.md
Data & Topics
Kafka topic: uber_trips
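CSV files (mounted to /app/data in the containers):
- uber-raw-data-apr14.csv (2014 trips, replayed by the producer)
- uber-raw-data-janjune-15.csv (2015 trips, replayed by the producer)
- taxi-zone-lookup.csv (zone metadata, read by the Spark job via --zones for joins)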
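Event format (one JSON object per record; the fields below are illustrative). Note that the raw Uber FOIL files contain only pickup time, dispatching base, and location (Lat/Lon in the 2014 file, a TLC locationID in the 2015 file), so fields such as trip_distance, fare_amount, or payment_type are only present if the producer supplies them: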
{
  "pickup_datetime": "2014-04-01T00:11:00",
  "pu_location_id": 142,
  "do_location_id": 236,
  "passenger_count": 1,
  "trip_distance": 2.1,
  "fare_amount": 9.0,
  "tip_amount": 2.1,
  "total_amount": 12.48,
  "payment_type": 1
}
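Because the 2014 and 2015 files use different column layouts, a producer needs to normalize each row before serializing it. A minimal sketch, assuming the public file headers (Date/Time, Lat, Lon, Base in 2014; Dispatching_base_num, Pickup_date, Affiliated_base_num, locationID in 2015). The function row_to_event and the output keys lat, lon, and base are hypothetical and may differ from what producer.py actually emits:

```python
import csv
import io
import json
from datetime import datetime


def row_to_event(row: dict) -> dict:
    """Normalize one raw Uber CSV row (2014 or 2015 layout) into a JSON-ready dict.

    Hypothetical mapping: keys mirror the event format where the raw data has
    an equivalent; fare/distance fields are simply absent.
    """
    if "Date/Time" in row:  # 2014 layout, e.g. "4/1/2014 0:11:00"
        ts = datetime.strptime(row["Date/Time"], "%m/%d/%Y %H:%M:%S")
        return {
            "pickup_datetime": ts.isoformat(),
            "lat": float(row["Lat"]),
            "lon": float(row["Lon"]),
            "base": row["Base"],
        }
    # 2015 layout, e.g. "2015-05-17 09:47:00"
    ts = datetime.strptime(row["Pickup_date"], "%Y-%m-%d %H:%M:%S")
    return {
        "pickup_datetime": ts.isoformat(),
        "pu_location_id": int(row["locationID"]),
        "base": row["Dispatching_base_num"],
    }


if __name__ == "__main__":
    # Sample row in the 2014 layout (illustrative values).
    sample = io.StringIO(
        '"Date/Time","Lat","Lon","Base"\n'
        '"4/1/2014 0:11:00",40.769,-73.9549,"B02512"\n'
    )
    for row in csv.DictReader(sample):
        print(json.dumps(row_to_event(row)))
```

Emitting ISO-8601 strings keeps the timestamp easy to cast on the Spark side (for example with to_timestamp).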
Quick Start
- Bring up the stack:
docker compose up -d --build
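This starts kafka, postgres, grafana, producer, and spark. Kafka advertises itself as kafka:9092 for access from other containers. Port 9092 is also published on the host, but because the broker hands its advertised address back to clients, host-side tools can only connect if the name kafka resolves on the host (for example through an /etc/hosts entry).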
- Verify Kafka and the topic:
Topic auto-creation is enabled, but you can list topics to confirm that uber_trips exists:
docker compose exec kafka kafka-topics.sh --list --bootstrap-server kafka:9092
- Producer (CSV → Kafka)
The producer container is built from Dockerfile.producer and uses env vars from docker-compose.yml:
BOOTSTRAP=kafka:9092
TOPIC=uber_trips
CSV_2014=/app/data/uber-raw-data-apr14.csv
CSV_2015=/app/data/uber-raw-data-janjune-15.csv
Follow its logs to watch events being published:
docker compose logs -f producer
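The producer simulates a live stream by replaying historical rows. One way to pace such a replay is to sleep for the gap between consecutive pickup times divided by a speed-up factor. A minimal sketch; replay, speedup, and the injectable send/sleep callables are hypothetical and not taken from producer.py:

```python
import time
from datetime import datetime
from typing import Callable, Iterable


def replay(events: Iterable[dict],
           send: Callable[[dict], None],
           speedup: float = 60.0,
           sleep: Callable[[float], None] = time.sleep) -> int:
    """Send events in order, compressing event-time gaps by `speedup`.

    speedup=60.0 means one hour of historical pickups replays in one minute.
    Returns the number of events sent.
    """
    prev = None
    sent = 0
    for event in events:
        ts = datetime.fromisoformat(event["pickup_datetime"])
        # Only sleep when event time moves forward; ties and out-of-order rows go out immediately.
        if prev is not None and ts > prev:
            sleep((ts - prev).total_seconds() / speedup)
        send(event)
        prev = ts if prev is None else max(prev, ts)
        sent += 1
    return sent
```

With kafka-python, send could wrap KafkaProducer(bootstrap_servers="kafka:9092", value_serializer=lambda v: json.dumps(v).encode("utf-8")).send("uber_trips", value=event). Injecting sleep keeps the pacing logic testable without a broker.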
- Spark Streaming job
The spark container (from Dockerfile.spark) auto-runs:
spark-submit --master 'local[*]' /app/streaming/spark_streaming.py \
  --bootstrap kafka:9092 \
  --topic uber_trips \
  --zones /app/data/taxi-zone-lookup.csv \
  --starting_offsets earliest \
  --sink postgres \
  --checkpoint /app/data/chk_pg \
  --pg_url jdbc:postgresql://postgres:5432/nyc_uber \
  --pg_user postgres \
  --pg_password postgres \
  --pg_table kpis_rides_minute
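These flags suggest a small argparse-based CLI in spark_streaming.py. A hypothetical sketch of such a parser; the defaults for --starting_offsets, --sink, and --pg_table are assumptions, and the real script may define them differently:

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical CLI mirroring the flags the spark container passes."""
    p = argparse.ArgumentParser(description="Kafka -> Spark -> PostgreSQL KPI job")
    p.add_argument("--bootstrap", required=True, help="Kafka bootstrap servers, e.g. kafka:9092")
    p.add_argument("--topic", required=True, help="Kafka topic to subscribe to")
    p.add_argument("--zones", required=True, help="Path to taxi-zone-lookup.csv")
    # Assumed default; Spark's Kafka source also uses "latest" for streaming queries.
    p.add_argument("--starting_offsets", default="latest")
    p.add_argument("--sink", default="postgres", help="Output sink (assumed default)")
    p.add_argument("--checkpoint", required=True, help="Checkpoint directory")
    p.add_argument("--pg_url", help="JDBC URL, e.g. jdbc:postgresql://postgres:5432/nyc_uber")
    p.add_argument("--pg_user")
    p.add_argument("--pg_password")
    p.add_argument("--pg_table", default="kpis_rides_minute")
    return p
```

Inside the job, the parsed values would typically feed the Kafka source options .option("kafka.bootstrap.servers", args.bootstrap), .option("subscribe", args.topic), and .option("startingOffsets", args.starting_offsets). Those option names are Spark's own; the wiring shown here is assumed.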
Tail logs:
docker compose logs -f spark
- Grafana
Open: http://localhost:3000 (admin / admin on first login)
Add a PostgreSQL data source:
Host: postgres:5432
Database: nyc_uber
User: postgres
Password: postgres
TLS/SSL Mode: disable (the stock postgres container does not enable SSL)
Import grafana_dashboard.json and switch its data source to the one you added.
What the Streaming Job Does
streaming/spark_streaming.py:
Consumes JSON events from Kafka topic uber_trips (readStream.format("kafka")).
Parses the JSON and casts fields to typed columns, converts pickup timestamps, and joins with TLC zone metadata loaded from --zones as a static (broadcast) DataFrame.
Aggregates in tumbling windows (e.g., minute/15-min) to compute KPIs:
Per-zone pickup counts
Citywide volume & distance metrics
Optional Top-N zone leaderboard
Writes to Postgres using foreachBatch with idempotent upserts (primary keys on window + zone).
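Uses the --checkpoint directory to track Kafka offsets and aggregation state across restarts. foreachBatch on its own gives at-least-once delivery, so a micro-batch may be replayed after a failure; the idempotent upserts make such replays harmless, giving effectively exactly-once results in Postgres.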
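Each event falls into exactly one tumbling window [window_start, window_end), and the per-zone KPI is a count per (window, zone). In PySpark this is roughly df.groupBy(F.window("pickup_ts", "1 minute"), "pu_location_id").count(), where pickup_ts is a hypothetical name for the parsed timestamp column. The pure-Python sketch below only illustrates the bucketing arithmetic that window() performs; it is not the streaming job's code, and its function names are hypothetical:

```python
from collections import Counter
from datetime import datetime, timedelta


def window_start(ts: datetime, minutes: int = 1) -> datetime:
    """Floor ts to the start of its tumbling window.

    Assumes UTC timestamps and a window size that divides 60 minutes
    (1, 5, 15, 30, 60), so flooring within the hour matches epoch-aligned windows.
    """
    floored = ts.replace(second=0, microsecond=0)
    return floored - timedelta(minutes=floored.minute % minutes)


def count_per_zone(events, minutes: int = 1) -> Counter:
    """Count pickups per (window_start, window_end, pu_location_id)."""
    counts = Counter()
    for event in events:
        start = window_start(datetime.fromisoformat(event["pickup_datetime"]), minutes)
        counts[(start, start + timedelta(minutes=minutes), event["pu_location_id"])] += 1
    return counts
```

The (window_start, window_end, pu_location_id) key is the same triple used as the primary key of kpis_rides_minute, which is what makes per-window upserts possible.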
PostgreSQL Tables (typical)
The script can create or upsert into these; adapt to your naming if needed.
-- Minute window pickups per zone
CREATE TABLE IF NOT EXISTS kpis_rides_minute (
  window_start   TIMESTAMP NOT NULL,
  window_end     TIMESTAMP NOT NULL,
  pu_location_id INT       NOT NULL,
  trip_count     BIGINT    NOT NULL,
  PRIMARY KEY (window_start, window_end, pu_location_id)
);
-- Optional citywide rollup
CREATE TABLE IF NOT EXISTS kpi_citywide (
  window_start      TIMESTAMP NOT NULL,
  window_end        TIMESTAMP NOT NULL,
  trips             BIGINT    NOT NULL,
  avg_trip_distance DOUBLE PRECISION,
  total_revenue     DOUBLE PRECISION,
  PRIMARY KEY (window_start, window_end)
);
-- Optional zone lookup (seeded from taxi-zone-lookup.csv)
CREATE TABLE IF NOT EXISTS tlc_zones (
  location_id  INT PRIMARY KEY,
  borough      TEXT,
  zone         TEXT,
  service_zone TEXT
);
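An idempotent upsert is what lets a replayed micro-batch be written twice without double counting. A minimal sketch against the kpis_rides_minute schema, using the standard-library sqlite3 module as a stand-in for PostgreSQL (both accept this INSERT ... ON CONFLICT ... DO UPDATE form; with psycopg2 the placeholders would be %s instead of ?). The names DDL, UPSERT, and upsert_batch are hypothetical, and overwriting trip_count assumes each batch carries the window's current total, as Spark's update output mode does:

```python
import sqlite3

DDL = """
CREATE TABLE IF NOT EXISTS kpis_rides_minute (
    window_start   TIMESTAMP NOT NULL,
    window_end     TIMESTAMP NOT NULL,
    pu_location_id INT NOT NULL,
    trip_count     BIGINT NOT NULL,
    PRIMARY KEY (window_start, window_end, pu_location_id)
)
"""

# Overwrite (not add) on conflict, so writing the same batch twice is a no-op.
UPSERT = """
INSERT INTO kpis_rides_minute (window_start, window_end, pu_location_id, trip_count)
VALUES (?, ?, ?, ?)
ON CONFLICT (window_start, window_end, pu_location_id)
DO UPDATE SET trip_count = excluded.trip_count
"""


def upsert_batch(conn: sqlite3.Connection, rows) -> None:
    """Write one micro-batch in a single transaction."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(UPSERT, rows)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(DDL)
    batch = [("2014-04-01 00:11:00", "2014-04-01 00:12:00", 142, 2)]
    upsert_batch(conn, batch)
    upsert_batch(conn, batch)  # simulated replay after a failure
    print(conn.execute("SELECT * FROM kpis_rides_minute").fetchall())
```

In a foreachBatch function, the same statement would be executed once per micro-batch, typically after collecting or iterating the batch's rows.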
Sample Queries (Grafana panels)
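Minute volume by zone (one series per pickup zone, limited to the dashboard's time range):
SELECT
  window_start AS time,
  CAST(pu_location_id AS TEXT) AS metric,
  trip_count
FROM kpis_rides_minute
WHERE $__timeFilter(window_start)
ORDER BY time ASC;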
Latest Top-N zones (most recent minute window, joined to zone names; requires the optional tlc_zones table):
WITH lastw AS (
  SELECT MAX(window_start) AS ws FROM kpis_rides_minute
)
SELECT z.zone, km.trip_count
FROM kpis_rides_minute km
JOIN lastw ON km.window_start = lastw.ws
JOIN tlc_zones z ON z.location_id = km.pu_location_id
ORDER BY km.trip_count DESC
LIMIT 10;
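To sanity-check the Top-N logic without a running stack, the same query can be exercised against a throwaway database. A sketch using the standard-library sqlite3 module (which also supports WITH and this JOIN form) with made-up rows, simplified tables, and a parameterized LIMIT; demo is a hypothetical helper:

```python
import sqlite3

TOP_N = """
WITH lastw AS (SELECT MAX(window_start) AS ws FROM kpis_rides_minute)
SELECT z.zone, km.trip_count
FROM kpis_rides_minute km
JOIN lastw ON km.window_start = lastw.ws
JOIN tlc_zones z ON z.location_id = km.pu_location_id
ORDER BY km.trip_count DESC
LIMIT ?
"""


def demo(n: int = 10):
    """Run the Top-N query on a few made-up rows in an in-memory database."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE kpis_rides_minute "
                 "(window_start TEXT, window_end TEXT, pu_location_id INT, trip_count INT)")
    conn.execute("CREATE TABLE tlc_zones (location_id INT PRIMARY KEY, zone TEXT)")
    conn.executemany("INSERT INTO tlc_zones VALUES (?, ?)",
                     [(142, "Zone 142"), (161, "Zone 161"), (236, "Zone 236")])
    conn.executemany(
        "INSERT INTO kpis_rides_minute VALUES (?, ?, ?, ?)",
        [
            # 00:11 window: older, so excluded by the lastw CTE
            ("2014-04-01 00:11:00", "2014-04-01 00:12:00", 142, 5),
            ("2014-04-01 00:11:00", "2014-04-01 00:12:00", 236, 9),
            # 00:12 window: the latest, so these rows are ranked
            ("2014-04-01 00:12:00", "2014-04-01 00:13:00", 142, 3),
            ("2014-04-01 00:12:00", "2014-04-01 00:13:00", 161, 4),
            ("2014-04-01 00:12:00", "2014-04-01 00:13:00", 236, 7),
        ],
    )
    return conn.execute(TOP_N, (n,)).fetchall()
```

Only the most recent window contributes, so demo(2) returns the two busiest zones in the 00:12 minute rather than the overall leaders.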
Operational Commands
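docker compose up -d --build        # build images and start the stack
docker compose logs -f              # follow logs from all services
docker compose logs -f producer     # follow a single service
docker compose logs -f spark
docker compose logs -f kafka
docker compose logs -f postgres
docker compose exec spark bash      # open a shell in the Spark container
docker compose down                 # stop and remove containers
docker compose down -v              # also remove named and anonymous volumes (bind-mounted host directories are not touched)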
Common Pitfalls
- Producer/Spark can’t reach Kafka: ensure KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 is set in docker-compose.yml (it already is).
- Spark Kafka package mismatch: if you customize spark-submit, use org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1, which matches Spark 3.5.1 built for Scala 2.12.
- Timezone differences: prefer UTC in both Spark (spark.sql.session.timeZone=UTC) and Grafana.
Extending
Add KPIs: tip rate, dropoff KPIs, hourly windows, percentile trip distances.
Enrich with weather/events; create correlation panels in Grafana.
Swap Postgres for ClickHouse/BigQuery for larger datasets.
Use batch/batch_etl.py to backfill historical windows into the same tables.