Skip to content

sameer08055/nyc-uber-spark

Repository files navigation

Overview

This project builds a real-time data pipeline for Uber pickup analytics in New York City. Using historical trip data from the NYC TLC FOIL dataset, the system simulates a live Kafka data stream, processes it with Apache Spark Structured Streaming, stores aggregated KPIs in PostgreSQL, and visualizes them in Grafana.

┌──────────┐ ┌────────────┐ ┌──────────────┐ ┌─────────────┐ ┌────────────┐ │ CSV Data │ ───▶ │ Kafka Prod │ ───▶ │ Spark Stream │ ───▶ │ PostgreSQL │ ───▶ │ Grafana UI │ └──────────┘ └────────────┘ └──────────────┘ └─────────────┘ └────────────┘ (Uber raw trip files) (Kafka topic → structured streaming) (aggregated KPIs)

Technologies: Apache Spark 3.5.1 Apache Kafka 3.7 PostgreSQL 16 Grafana 10 Docker + Docker Compose Python 3.11 / PySpark / Kafka-Python

Repository Layout NYC-UBER-SPARK ├─ batch/ │ └─ batch_etl.py # Optional/batch backfills ├─ data/ # CSVs + checkpoint + DB volume mounts ├─ notebooks/ # Optional exploration ├─ scripts/ # Optional helpers ├─ streaming/ │ ├─ producer.py # CSV → Kafka simulator │ └─ spark_streaming.py # Main streaming job ├─ docker-compose.yml ├─ Dockerfile.producer ├─ Dockerfile.spark ├─ grafana_dashboard.json # Importable Grafana dashboard └─ README.md

Data & Topics

Kafka topic: uber_trips

Producer CSVs (mounted to /app/data)

uber-raw-data-apr14.csv

uber-raw-data-janjune-15.csv

taxi-zone-lookup.csv (zone metadata for joins)

Event format (JSON per record; typical fields):

{ "pickup_datetime": "2014-04-01T00:11:00", "pu_location_id": 142, "do_location_id": 236, "passenger_count": 1, "trip_distance": 2.1, "fare_amount": 9.0, "tip_amount": 2.1, "total_amount": 12.48, "payment_type": 1 }

Quick Start

  1. Bring up the stack docker compose up -d --build

Starts: kafka, postgres, grafana, producer, spark. Kafka is advertised as kafka:9092 for in-cluster access; port 9092 is published for host tooling.

  1. Verify Kafka and topic

Auto-create is enabled, but you can check:

docker compose exec kafka kafka-topics.sh --list --bootstrap-server kafka:9092

expect: uber_trips (after producer starts)

  1. Producer (CSV → Kafka)

The producer container is built from Dockerfile.producer and uses env vars from docker-compose.yml:

BOOTSTRAP=kafka:9092

TOPIC=uber_trips

CSV_2014=/app/data/uber-raw-data-apr14.csv

CSV_2015=/app/data/uber-raw-data-janjune-15.csv

Follow its logs to see streaming:

docker compose logs -f producer

  1. Spark Streaming job

The spark container (from Dockerfile.spark) auto-runs:

spark-submit --master local[*] /app/streaming/spark_streaming.py
--bootstrap kafka:9092
--topic uber_trips
--zones /app/data/taxi-zone-lookup.csv
--starting_offsets earliest
--sink postgres
--checkpoint /app/data/chk_pg
--pg_url jdbc:postgresql://postgres:5432/nyc_uber
--pg_user postgres
--pg_password postgres
--pg_table kpis_rides_minute

Tail logs:

docker compose logs -f spark

  1. Grafana

Open: http://localhost:3000 (admin / admin on first login)

Add PostgreSQL data source:

Host: postgres:5432

Database: nyc_uber

User: postgres

Password: postgres

Import grafana_dashboard.json and switch its data source to the one you added.

What the Streaming Job Does

streaming/spark_streaming.py:

Consumes JSON events from Kafka topic uber_trips (readStream.format("kafka")).

Parses & casts fields, converts pickup timestamps, and joins with TLC zone metadata from --zones (broadcast/static).

Aggregates in tumbling windows (e.g., minute/15-min) to compute KPIs:

Per-zone pickup counts

Citywide volume & distance metrics

Optional Top-N zone leaderboard

Writes to Postgres using foreachBatch with idempotent upserts (primary keys on window + zone).

Uses --checkpoint directory to maintain offsets & exactly-once semantics.

PostgreSQL Tables (typical)

The script can create or upsert into these; adapt to your naming if needed.

-- Minute window pickups per zone CREATE TABLE IF NOT EXISTS kpis_rides_minute ( window_start TIMESTAMP NOT NULL, window_end TIMESTAMP NOT NULL, pu_location_id INT NOT NULL, trip_count BIGINT NOT NULL, PRIMARY KEY (window_start, window_end, pu_location_id) );

-- Optional citywide rollup CREATE TABLE IF NOT EXISTS kpi_citywide ( window_start TIMESTAMP NOT NULL, window_end TIMESTAMP NOT NULL, trips BIGINT NOT NULL, avg_trip_distance DOUBLE PRECISION, total_revenue DOUBLE PRECISION, PRIMARY KEY (window_start, window_end) );

-- Optional zone lookup (seeded from taxi-zone-lookup.csv) CREATE TABLE IF NOT EXISTS tlc_zones ( location_id INT PRIMARY KEY, borough TEXT, zone TEXT, service_zone TEXT );

Sample Queries (Grafana panels)

Minute volume by zone

SELECT window_start AS time, pu_location_id, trip_count FROM kpis_rides_minute ORDER BY time ASC;

Latest Top-N zones (if you materialize a top table/view)

WITH lastw AS ( SELECT MAX(window_start) AS ws FROM kpis_rides_minute ) SELECT z.zone, km.trip_count FROM kpis_rides_minute km JOIN lastw ON km.window_start = lastw.ws JOIN tlc_zones z ON z.location_id = km.pu_location_id ORDER BY km.trip_count DESC LIMIT 10;

Operational Commands

Build & start

docker compose up -d --build

See all logs

docker compose logs -f

Service-specific logs

docker compose logs -f producer docker compose logs -f spark docker compose logs -f kafka docker compose logs -f postgres

Exec into spark shell

docker compose exec spark bash

Stop / remove

docker compose down

wipe volumes (⚠️ deletes Postgres data and Kafka logs)

docker compose down -v

Common Pitfalls

Producer/Spark can’t reach Kafka Ensure KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 in compose (already set).

Spark Kafka package mismatch Use org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 if you customize spark-submit.

Timezone differences Prefer UTC in Spark (spark.sql.session.timeZone=UTC) and Grafana.

Extending

Add KPIs: tip rate, dropoff KPIs, hourly windows, percentile trip distances.

Enrich with weather/events; create correlation panels in Grafana.

Swap Postgres for ClickHouse/BigQuery for larger datasets.

Use batch/batch_etl.py to backfill historical windows into the same tables.

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages