FleetOps Analytics is a batch analytics project built with Scala, Apache Spark, and SBT. It processes NYC yellow taxi trip data through a medallion-style pipeline and produces curated operational datasets for monitoring trip volumes, revenue hotspots, anomalies, and data quality.
The repository already contains:
- a raw yellow taxi sample dataset for January 2025
- a taxi zone lookup used for enrichment
- runnable Spark jobs for bronze, silver, and gold layers
- tests covering schema validation, normalization, enrichment, aggregations, and reporting
The pipeline is designed around a small, explicit set of analytical capabilities:
- ingest raw yellow taxi trip parquet files into a partitioned bronze layer
- validate source records and keep row-level validation results
- normalize the raw taxi schema into a cleaner analytics schema
- enrich trips with pickup and dropoff borough/zone metadata
- aggregate daily zone KPIs
- aggregate hourly pickup hotspots and revenue concentration
- flag suspicious or operationally interesting trips with rule-based anomaly detection
- emit a compact data quality report across bronze, silver, and gold outputs
This codebase follows a medallion architecture:
- raw: externally sourced files and lookup tables
- bronze: source-aligned records with validation metadata and ingestion lineage
- silver: cleaned and business-friendly trip records
- gold: analytics-ready marts and monitoring outputs
Processing is partitioned by year and month and stored as parquet in overwrite mode. Jobs are implemented as independent Spark entrypoints and share common runtime wiring through:
- `JobRunner`: loads configuration, creates the Spark session, runs the job, and stops Spark
- `SparkSessionFactory`: applies Spark runtime settings from `application.conf`
- `DataFrameOps` and transformation modules: keep transformation logic separate from job orchestration
- `PartitionedParquetWriter`: standardizes partitioned parquet output
- Scala 2.13.16
- Apache Spark 4.0.1 (`spark-core`, `spark-sql`)
- SBT
- Typesafe Config
- ScalaTest
```
src/main/scala/com/fleetops/analytics/
  common/          Shared Spark runtime, processing window, writers, helpers
  config/          Typed configuration loaders
  domain/          Schemas, models, job arguments
  job/
    bronze/        Raw ingestion jobs
    silver/        Cleansing and enrichment jobs
    gold/          Aggregations, anomaly detection, quality reporting
  transformation/  Pure transformation logic used by jobs
src/main/resources/
  application.conf Default local configuration
data/
  raw/             Source parquet and lookup CSV files
  bronze/          Bronze layer outputs
  silver/          Silver layer outputs
  gold/            Gold layer outputs
```
Current local inputs in the repository:
- raw trip file: `data/raw/2025/yellow_tripdata_2025-01.parquet`
- taxi zone lookup: `data/raw/lookup/taxi_zone_lookup.csv`
- reference schema note: `data/raw/schema/yellow_tripdata_2025-01.md`
The source schema expected by bronze ingestion is defined in `YellowTripSourceSchema` and matches the yellow taxi parquet columns such as:
- vendor and trip timestamps
- pickup and dropoff location IDs
- passenger, distance, payment, and fare fields
- surcharge and total amount fields
Bronze ingestion performs a strict schema match before processing.
Output path: data/bronze/trips
Purpose:
- ingest raw parquet
- validate source rows
- add ingestion metadata
- partition records by `year` and `month`
Validation rules currently implemented:
- pickup time must be before dropoff time
- trip distance must not be negative or `NaN`
- fare amount must not be negative or `NaN`
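The three rules can be sketched as a pure function that accumulates error codes per row; the error-code strings below are illustrative, not the exact values the job writes into `validation_errors`:

```python
import math
from datetime import datetime

def validate_trip(pickup, dropoff, trip_distance, fare_amount):
    """Apply the bronze validation rules to one raw trip row.

    Returns the list of violated rules; an empty list means the row is valid.
    """
    errors = []
    if not pickup < dropoff:
        errors.append("pickup_not_before_dropoff")
    if trip_distance is None or math.isnan(trip_distance) or trip_distance < 0:
        errors.append("negative_or_nan_trip_distance")
    if fare_amount is None or math.isnan(fare_amount) or fare_amount < 0:
        errors.append("negative_or_nan_fare_amount")
    return errors

# A valid trip yields no errors; a bad one accumulates every violated rule.
ok = validate_trip(datetime(2025, 1, 5, 8, 0), datetime(2025, 1, 5, 8, 20), 3.2, 18.5)
bad = validate_trip(datetime(2025, 1, 5, 9, 0), datetime(2025, 1, 5, 8, 0), -1.0, float("nan"))
```

Invalid rows are kept in bronze with their errors recorded, rather than dropped, so downstream layers can decide how to handle them.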
Bronze output columns:
- all source yellow taxi columns
- `validation_errors: array<string>`
- `ingestion_timestamp: timestamp_ntz`
- `source_file: string`
- `year: int`
- `month: int`
Output path: data/silver/trips
Purpose:
- drop invalid bronze records
- rename and cast source columns into analytics-friendly names
- derive trip duration in seconds
Output columns:
`vendorId`, `pickupDatetime`, `dropoffDatetime`, `pickupLocationId`, `dropoffLocationId`, `passengerCount`, `tripDistance`, `fareAmount`, `tipAmount`, `totalAmount`, `paymentType`, `tripDurationSeconds`, `year`, `month`
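A minimal sketch of the rename-and-derive step in plain Python. The raw column names follow the standard NYC TLC yellow taxi schema; the authoritative mapping lives in the silver transformation code:

```python
from datetime import datetime

# Illustrative subset of the raw-to-silver column mapping.
COLUMN_MAP = {
    "tpep_pickup_datetime": "pickupDatetime",
    "tpep_dropoff_datetime": "dropoffDatetime",
    "PULocationID": "pickupLocationId",
    "DOLocationID": "dropoffLocationId",
    "trip_distance": "tripDistance",
    "fare_amount": "fareAmount",
}

def normalize(row):
    """Rename raw columns and derive tripDurationSeconds from the timestamps."""
    out = {COLUMN_MAP.get(k, k): v for k, v in row.items()}
    out["tripDurationSeconds"] = int(
        (out["dropoffDatetime"] - out["pickupDatetime"]).total_seconds()
    )
    return out

row = {
    "tpep_pickup_datetime": datetime(2025, 1, 5, 8, 0),
    "tpep_dropoff_datetime": datetime(2025, 1, 5, 8, 20),
    "trip_distance": 3.2,
}
normalized = normalize(row)  # a 20-minute trip -> 1200 seconds
```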
Output path: data/silver/trips_enriched
Purpose:
- join the silver trips dataset with taxi zone lookup metadata
- add human-readable pickup and dropoff zone attributes
Additional output columns on top of normalized trips:
`pickupBorough`, `pickupZone`, `dropoffBorough`, `dropoffZone`
The lookup join is broadcast in Spark, which is appropriate for the small taxi zone reference dataset.
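The join semantics can be sketched with an in-memory lookup; in Spark the same reference table is broadcast to every executor so the join runs locally. The sample `LocationID` entries below are from the public TLC lookup, trimmed for illustration:

```python
# Tiny stand-in for taxi_zone_lookup.csv: LocationID -> (Borough, Zone).
ZONE_LOOKUP = {
    132: ("Queens", "JFK Airport"),
    161: ("Manhattan", "Midtown Center"),
}

def enrich(trip):
    """Left-join-style enrichment: unknown location IDs yield None attributes."""
    out = dict(trip)
    for side, key in (("pickup", "pickupLocationId"), ("dropoff", "dropoffLocationId")):
        borough, zone = ZONE_LOOKUP.get(trip[key], (None, None))
        out[f"{side}Borough"] = borough
        out[f"{side}Zone"] = zone
    return out

enriched = enrich({"pickupLocationId": 132, "dropoffLocationId": 999})
```

Missing zone attributes are surfaced later as enrichment quality metrics rather than causing the join to drop rows.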
Output path: data/gold/daily_zone_metrics
Purpose:
- provide per-day, per-pickup-zone operational KPIs
Output schema:
`tripDate`, `pickupLocationId`, `pickupBorough`, `pickupZone`, `tripsCount`, `totalTripDistance`, `avgTripDistance`, `totalFareAmount`, `avgFareAmount`, `totalTipAmount`, `totalRevenue`, `avgTripDurationSeconds`, `year`, `month`
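A plain-Python sketch of the grouping logic behind a few of these KPIs. Here `totalRevenue` is assumed to be fares plus tips; the job's own definition is authoritative:

```python
from collections import defaultdict

def daily_zone_metrics(trips):
    """Group trips by (tripDate, pickupLocationId) and compute a subset of the KPIs."""
    groups = defaultdict(list)
    for t in trips:
        groups[(t["tripDate"], t["pickupLocationId"])].append(t)
    out = []
    for (trip_date, loc), rows in groups.items():
        n = len(rows)
        total_fare = sum(r["fareAmount"] for r in rows)
        total_tip = sum(r["tipAmount"] for r in rows)
        out.append({
            "tripDate": trip_date,
            "pickupLocationId": loc,
            "tripsCount": n,
            "totalFareAmount": total_fare,
            "avgFareAmount": total_fare / n,
            "totalRevenue": total_fare + total_tip,  # assumed definition
        })
    return out

metrics = daily_zone_metrics([
    {"tripDate": "2025-01-05", "pickupLocationId": 161, "fareAmount": 10.0, "tipAmount": 2.0},
    {"tripDate": "2025-01-05", "pickupLocationId": 161, "fareAmount": 20.0, "tipAmount": 0.0},
])
```

In the actual job this is a Spark `groupBy` with aggregate expressions rather than a Python loop.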
Output path: data/gold/hourly_hotspots
Purpose:
- identify high-traffic pickup zones by hour
- measure revenue concentration across zones and hours
Output schema:
`tripDate`, `pickupHour`, `pickupLocationId`, `pickupBorough`, `pickupZone`, `tripsCount`, `totalRevenue`, `avgRevenuePerTrip`, `year`, `month`
This dataset is sorted within partitions by date, hour, descending trip volume, and pickup location ID.
Output path: data/gold/trip_anomalies
Purpose:
- flag trips that match configurable anomaly rules
Selected output columns:
`pickupDatetime`, `pickupLocationId`, `pickupZone`, `dropoffLocationId`, `dropoffZone`, `tripDistance`, `fareAmount`, `totalAmount`, `tipAmount`, `tripDurationSeconds`, `anomalyFlags`, `isAnomalous`, `year`, `month`
Current anomaly rules are driven by configuration:
- very long duration
- very high fare
- very high tip
- low distance with high fare
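A sketch of rule-driven flagging, with made-up threshold values standing in for the ones read from `application.conf`:

```python
# Illustrative thresholds; the real values come from configuration.
RULES = {
    "very_long_duration": lambda t: t["tripDurationSeconds"] > 4 * 3600,
    "very_high_fare": lambda t: t["fareAmount"] > 300.0,
    "very_high_tip": lambda t: t["tipAmount"] > 100.0,
    "low_distance_high_fare": lambda t: t["tripDistance"] < 0.5 and t["fareAmount"] > 50.0,
}

def flag_anomalies(trip):
    """Evaluate every rule; a trip is anomalous if any rule fires."""
    flags = [name for name, rule in RULES.items() if rule(trip)]
    return {"anomalyFlags": flags, "isAnomalous": bool(flags)}

result = flag_anomalies({
    "tripDurationSeconds": 600,
    "fareAmount": 80.0,
    "tipAmount": 5.0,
    "tripDistance": 0.2,
})
```

Keeping the rules as named predicates means `anomalyFlags` records *which* rules fired, not just that something did.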
Output path: data/gold/data_quality_report
Purpose:
- summarize data quality and record flow across the pipeline for one processing window
Output schema:
`processingDate`, `year`, `month`, `datasetName`, `metricName`, `metricValue`
Current metrics include:
- bronze total records, invalid records, invalid ratio
- silver normalized records, dropped invalid records, normalized ratio
- silver enrichment missing pickup/dropoff zone counts and ratios
- gold anomaly counts and ratios
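The report uses a long format, one metric per row. A sketch of how the bronze metrics might be assembled (dataset and metric names here are illustrative):

```python
def ratio(numerator, denominator):
    """Guard against empty datasets when computing quality ratios."""
    return numerator / denominator if denominator else 0.0

def bronze_metrics(total, invalid):
    """Emit long-format rows matching the datasetName/metricName/metricValue schema."""
    return [
        {"datasetName": "bronze_trips", "metricName": "total_records", "metricValue": float(total)},
        {"datasetName": "bronze_trips", "metricName": "invalid_records", "metricValue": float(invalid)},
        {"datasetName": "bronze_trips", "metricName": "invalid_ratio", "metricValue": ratio(invalid, total)},
    ]

rows = bronze_metrics(1000, 25)
```

The long format lets one parquet output hold heterogeneous metrics across bronze, silver, and gold without schema changes when a metric is added.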
All jobs are standalone Spark entrypoints under `src/main/scala/com/fleetops/analytics/job`.
Use the local Python orchestrator to run the full pipeline in the required dependency order:
```
./scripts/run_pipeline_local.py --year 2025 --month 01
```
or in debug mode:
```
./scripts/run_pipeline_local.py --year 2025 --month 01 --debug
```
Behavior:
- runs bronze, silver, and gold jobs sequentially
- stops immediately when any job exits with a non-zero status
- logs each job start, command, and completion timestamp clearly
- overrides `fleetops.dataset.year` and `fleetops.dataset.month` at runtime for all non-bronze jobs
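A hypothetical minimal version of that loop, showing the fail-fast behavior (the real script also handles `--debug` and the runtime config overrides):

```python
import subprocess
import sys
from datetime import datetime

def run_pipeline(jobs, runner=subprocess.run):
    """Run jobs in dependency order; stop at the first non-zero exit status."""
    for job in jobs:
        cmd = ["sbt", f"runMain {job}"]
        print(f"{datetime.now().isoformat()} starting {job}: {cmd}")
        result = runner(cmd)
        if result.returncode != 0:
            print(f"{job} failed with status {result.returncode}", file=sys.stderr)
            return result.returncode
        print(f"{datetime.now().isoformat()} finished {job}")
    return 0
```

The `runner` parameter is only there to make the sketch testable; the orchestrator shells out directly.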
By default the bronze input path is resolved as:
```
data/raw/<year>/yellow_tripdata_<year>-<month>.parquet
```
Example: `data/raw/2025/yellow_tripdata_2025-01.parquet`
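The resolution rule is simple enough to sketch directly (note the zero-padded month):

```python
def default_input_path(year, month):
    """Resolve the default bronze input path from a processing window."""
    return f"data/raw/{year}/yellow_tripdata_{year}-{int(month):02d}.parquet"
```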
If the raw parquet file lives elsewhere, pass it explicitly:
```
./scripts/run_pipeline_local.py \
  --year 2025 \
  --month 01 \
  --input-path /absolute/path/to/yellow_tripdata_2025-01.parquet
```
Each job can also be run individually with SBT, in dependency order:
```
sbt "runMain com.fleetops.analytics.job.bronze.BronzeTripIngestionJob --input-path data/raw/2025/yellow_tripdata_2025-01.parquet --year 2025 --month 1"
sbt "runMain com.fleetops.analytics.job.silver.SilverTripNormalizationJob"
sbt "runMain com.fleetops.analytics.job.silver.SilverTripEnrichmentJob"
sbt "runMain com.fleetops.analytics.job.gold.DailyZoneMetricsJob"
sbt "runMain com.fleetops.analytics.job.gold.HourlyHotspotMetricsJob"
sbt "runMain com.fleetops.analytics.job.gold.TripAnomalyDetectionJob"
sbt "runMain com.fleetops.analytics.job.gold.DataQualityReportJob"
```
Prerequisites:
- JDK 17+
- SBT 1.10+
Commands:
```
sbt compile
sbt test
```