A comprehensive data engineering project demonstrating both batch ETL processing (Bronze-Silver-Gold architecture) and real-time Change Data Capture (CDC) using Apache Spark, Kafka, Debezium, and MinIO.
- Overview
- Architecture
- Technologies
- Prerequisites
- Project Structure
- Setup & Installation
- Batch ETL Pipeline
- Real-Time CDC Pipeline
- Monitoring & Verification
- Data Flow
- Troubleshooting
- Performance Metrics
This project implements a production-ready data pipeline with two main components:
- Batch ETL Processing: Medallion architecture (Bronze → Silver → Gold) for data warehousing
- Real-Time CDC: Event-driven processing with zero-duplicate guarantee using atomic locks
- ✅ Batch ETL: Incremental data ingestion from MySQL to MinIO with Spark
- ✅ Star Schema: Dimensional modeling with fact and dimension tables
- ✅ Real-Time CDC: MySQL change capture with Debezium and Kafka
- ✅ Zero Duplicates: Redis atomic locks for idempotent processing
- ✅ Scalable: Multi-worker architecture with parallel processing
- ✅ Monitoring: Comprehensive logging and metrics collection
- E-commerce Order Processing: Track orders, order details, customers, and payment methods
- Data Warehousing: Historical data analysis with star schema
- Real-Time Analytics: Immediate insights on order completion events
```
┌───────────────────────────────────────────────────────────────────┐
│                           DATA SOURCES                            │
│                          MySQL Database                           │
│        (orders, order_details, customers, payment_method)         │
└──────────────┬─────────────────────────────────────┬──────────────┘
               │                                     │
               │ Batch ETL                           │ Real-Time CDC
               │ (Spark JDBC)                        │ (Debezium)
               ▼                                     ▼
┌────────────────────────────┐        ┌──────────────────────────────────┐
│     BATCH ETL PIPELINE     │        │      REAL-TIME CDC PIPELINE      │
│     ──────────────────     │        │      ──────────────────────      │
│ 1. Bronze Layer (Raw)      │        │ 1. Debezium Connector            │
│    ├─ MySQL → MinIO        │        │    ├─ Binlog capture             │
│    ├─ Partitioned data     │        │    └─ Kafka topics               │
│    └─ Full/Incremental     │        │                                  │
│                            │        │ 2. Kafka Streams                 │
│ 2. Silver Layer (Cleaned)  │        │    ├─ mysql.project_db.orders    │
│    ├─ Deduplication        │        │    └─ mysql.project_db.          │
│    ├─ Data quality         │        │       order_details              │
│    └─ Window functions     │        │                                  │
│                            │        │ 3. Python Consumers              │
│ 3. Gold Layer (Curated)    │        │    ├─ order.py (2 workers)       │
│    ├─ Star schema          │        │    │  └─ Cache to Redis          │
│    ├─ Dim tables           │        │    └─ order_details.py           │
│    └─ Fact tables          │        │       (3 workers)                │
│                            │        │       └─ Atomic locks            │
└──────────────┬─────────────┘        └────────────────┬─────────────────┘
               │                                       │
               ▼                                       ▼
┌────────────────────────────┐        ┌──────────────────────────────────┐
│   MinIO Object Storage     │        │       Redis Cache + Locks        │
│   ├─ bronze/               │        │   ├─ db=0: Static lookups        │
│   ├─ silver/               │        │   ├─ db=1: Order cache (TTL)     │
│   └─ gold/                 │        │   └─ db=2: Atomic locks          │
│      ├─ dimensions/        │        └────────────────┬─────────────────┘
│      └─ facts/             │                         │
└────────────────────────────┘                         ▼
                                      ┌──────────────────────────────────┐
                                      │    Kafka Topic: order_final      │
                                      │       (Completed orders)         │
                                      └──────────────────────────────────┘
```
```
┌─────────────┐      ┌─────────────┐      ┌─────────────┐
│   BRONZE    │─────►│   SILVER    │─────►│    GOLD     │
│  Raw Data   │      │   Cleaned   │      │ Star Schema │
└─────────────┘      └─────────────┘      └─────────────┘
      │                    │                    │
    MySQL               Dedup +            Dimensions:
    JDBC                Quality            - dim_customer
    Full/               Checks             - dim_payment_method
    Incr                                   - dim_product
                                           - dim_date
                                           Facts:
                                           - fact_orders
                                           - fact_order_products
```
```
MySQL Binlog
     │
     ▼
Debezium Connector
     │
     ├─ mysql.project_db.orders ───────────────┐
     │                                         │
     └─ mysql.project_db.order_details         │
     │                                         ▼
     │                                  order.py (2 workers)
     │                                         │
     │                                         ▼
     │                                  Redis Cache (db=1)
     │                                  {order_id: {customer_id,
     │                                   payment_method_id,
     │                                   num_products}}
     ▼                                         │
order_details.py                               │
(3 workers)                                    │
     │                                         │
     ├─ Read cache ◄───────────────────────────┘
     │
     ├─ Verify completion
     │
     ├─ Atomic lock check
     │    (Redis SET NX)
     │
     └─ Produce to order_final
        (if complete & not locked)
```
| Component | Technology | Version | Purpose |
|---|---|---|---|
| Data Source | MySQL | 8.0 | Transactional database |
| Batch Processing | Apache Spark | 3.5.0 | Distributed data processing |
| Stream Processing | Apache Kafka | 7.5.0 | Event streaming platform |
| CDC | Debezium | 2.5 | Change Data Capture |
| Object Storage | MinIO | Latest | S3-compatible storage |
| Cache & Locks | Redis | Latest | In-memory data store |
| Orchestration | Docker Compose | - | Container orchestration |
| Language | Python | 3.x | Consumer applications |
pyspark==3.5.0
kafka-python==2.0.2
redis==5.0.0
mysql-connector-python==8.0.33
- Zookeeper: Kafka coordination
- Kafka Connect: Debezium connector hosting
- Spark Master & Workers: Distributed processing
- Airflow (Optional): Workflow orchestration
- OS: Windows 10/11, Linux, or macOS
- RAM: Minimum 8GB (16GB recommended)
- CPU: 4 cores minimum (8 cores recommended)
- Disk: 20GB free space
- Docker: Version 20.10+
- Docker Compose: Version 2.0+
- Docker Desktop

  # Download from https://www.docker.com/products/docker-desktop
  # Verify installation
  docker --version
  docker-compose --version

- Python 3.x

  # Verify installation
  python --version
  pip --version

- Git (optional, for cloning)

  git --version
```
Project13-12/
├── docker-compose.yml               # Container orchestration
├── requirements.txt                 # Python dependencies
├── README.md                        # This file
├── QUICKSTART.md                    # Quick start guide
│
├── init-scripts/                    # Database initialization
│   ├── 01_init.sql                  # MySQL schema and seed data
│   └── run.txt                      # Complete command reference
│
├── data/                            # CSV data files
│   ├── customers.csv                # Customer master data
│   └── payment_method.csv           # Payment method lookup
│
├── scripts/
│   ├── batch/                       # Batch ETL scripts
│   │   ├── bronze_layer.py          # Raw data ingestion (MySQL → MinIO)
│   │   ├── silver_layer.py          # Data cleaning & deduplication
│   │   └── gold_layer.py            # Star schema creation
│   │
│   ├── real-time/                   # Real-time CDC scripts
│   │   ├── kafka_handler.py         # Kafka producer/consumer wrapper
│   │   ├── order.py                 # Order info caching (2 workers)
│   │   ├── order_details.py         # Order completion processing (3 workers)
│   │   ├── mysql-src-connector.json # Debezium connector config
│   │   └── QUICKSTART.md            # Real-time quick start
│   │
│   └── database/                    # Database utilities
│       ├── generate_orders.py       # Generate sample orders
│       └── load_file.py             # Load CSV to MySQL
│
├── logs/                            # Application logs
│   ├── bronze_layer.log
│   ├── silver_layer.log
│   ├── gold_layer.log
│   └── real-time.log
│
└── jars/                            # Java dependencies
    └── debezium/                    # Debezium connector JARs
```
# If cloning from repository
git clone <repository-url>
cd Project13-12
# Or create manually
mkdir Project13-12
cd Project13-12

# Start all services
docker-compose up -d
# Verify all containers are running
docker ps
# Expected containers:
# - mysql-db
# - kafka-broker
# - zookeeper
# - kafka-connect
# - spark-master
# - spark-worker-1
# - spark-worker-2
# - minio-storage
# - redis-cache

# Load initial schema and data
docker exec -i mysql-db mysql -uadmin -padmin project_db < init-scripts/01_init.sql
# Verify tables created
docker exec mysql-db mysql -uadmin -padmin project_db -e "SHOW TABLES;"
# Expected output:
# +----------------------+
# | Tables_in_project_db |
# +----------------------+
# | customers |
# | order_details |
# | orders |
# | payment_method |
# +----------------------+

# Create virtual environment (recommended)
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt

# Required for Debezium CDC
docker exec mysql-db mysql -uroot -prootpassword -e "GRANT RELOAD, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'admin'@'%'; FLUSH PRIVILEGES;"
# Verify privileges
docker exec mysql-db mysql -uroot -prootpassword -e "SHOW GRANTS FOR 'admin'@'%';"

The batch ETL pipeline implements a Medallion Architecture with three layers (a Spark-to-MinIO session sketch follows this list):
- Bronze Layer: Raw data ingestion from MySQL to MinIO
- Silver Layer: Data cleaning, deduplication, and quality checks
- Gold Layer: Star schema with dimension and fact tables
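All three layer jobs read and write MinIO over the `s3a` connector. As a minimal sketch of how such a session might be configured — assuming the MinIO service is reachable at `minio-storage:9000` inside the Docker network, and using the credentials shown elsewhere in this README; the actual scripts may differ:

```python
# Hypothetical sketch: Spark session wired to MinIO via s3a.
# Endpoint and path-style settings are assumptions; credentials match this README.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("medallion-etl")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio-storage:9000")  # assumed service name
    .config("spark.hadoop.fs.s3a.access.key", "admin")
    .config("spark.hadoop.fs.s3a.secret.key", "admin123")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")  # required for MinIO
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)
```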
# Create directory
docker exec -u root spark-master mkdir -p /opt/spark-apps
# Copy ETL scripts
docker cp scripts/batch/bronze_layer.py spark-master:/opt/spark-apps/
docker cp scripts/batch/silver_layer.py spark-master:/opt/spark-apps/
docker cp scripts/batch/gold_layer.py spark-master:/opt/spark-apps/
# Create logs directory
docker exec -u root spark-master mkdir -p /opt/logs
docker exec -u root spark-master chown -R spark:spark /opt/logs

# Ingest raw data from MySQL to MinIO (s3a://bronze/)
docker exec spark-master /opt/spark/bin/spark-submit \
--master spark://spark-master:7077 \
--packages org.apache.hadoop:hadoop-aws:3.3.4,mysql:mysql-connector-java:8.0.33 \
/opt/spark-apps/bronze_layer.py

Output:
- `s3a://bronze/orders/` - 25,000 records
- `s3a://bronze/order_details/` - 47,089 records
- `s3a://bronze/customers/` - 1,000,000 records
- `s3a://bronze/payment_method/` - 12 records
Features:
- Partitioning by year/month/day
- Full load on first run
- Incremental load on subsequent runs (based on timestamp; see the sketch below)
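As a rough illustration of the incremental load, a Bronze job can push the timestamp filter down to MySQL and append partitioned Parquet to MinIO. This is a sketch, not the project's exact code: the `last_ts` watermark value and its checkpoint handling are assumptions, and `spark` is the session configured above.

```python
# Hypothetical sketch of the Bronze incremental read: pull only rows newer than
# the last high-water mark, then write partitioned Parquet to MinIO.
from pyspark.sql import functions as F

last_ts = "2026-01-01 00:00:00"  # assumed: a real run would load this from a checkpoint

orders = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://mysql-db:3306/project_db")
    .option("dbtable", f"(SELECT * FROM orders WHERE timestamp > '{last_ts}') AS t")
    .option("user", "admin")
    .option("password", "admin")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .load()
    # partitioning columns used by the Bronze layout
    .withColumn("year", F.year("timestamp"))
    .withColumn("month", F.month("timestamp"))
    .withColumn("day", F.dayofmonth("timestamp"))
)

orders.write.mode("append").partitionBy("year", "month", "day").parquet("s3a://bronze/orders/")
```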
# Clean and deduplicate data (Bronze → Silver)
docker exec spark-master /opt/spark/bin/spark-submit \
--master spark://spark-master:7077 \
--packages org.apache.hadoop:hadoop-aws:3.3.4 \
/opt/spark-apps/silver_layer.py

Output:
- `s3a://silver/orders/` - Deduplicated orders
- `s3a://silver/order_details/` - Deduplicated order details
- `s3a://silver/customers/` - Cleaned customer data
- `s3a://silver/payment_method/` - Validated payment methods
Operations:
- Window functions for deduplication (sketched below)
- Data quality validation
- Null handling
- Type conversions
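A minimal sketch of the dedup step, assuming `spark` is already configured for MinIO as above; the exact quality rules in `silver_layer.py` may differ:

```python
# Hypothetical sketch of Silver-layer deduplication: keep the newest row per
# order id using a window function, plus a basic null-quality gate.
from pyspark.sql import Window
from pyspark.sql import functions as F

w = Window.partitionBy("id").orderBy(F.col("timestamp").desc())

silver_orders = (
    spark.read.parquet("s3a://bronze/orders/")
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)               # newest version of each order wins
    .drop("rn")
    .dropna(subset=["id", "customer_id"])   # assumed quality rule
)

silver_orders.write.mode("overwrite").parquet("s3a://silver/orders/")
```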
# Create star schema (Silver → Gold)
docker exec spark-master /opt/spark/bin/spark-submit \
--master spark://spark-master:7077 \
--packages org.apache.hadoop:hadoop-aws:3.3.4 \
/opt/spark-apps/gold_layer.py

Output - Dimension Tables:
- `s3a://gold/dimensions/dim_customer/` - Customer dimension
- `s3a://gold/dimensions/dim_payment_method/` - Payment method dimension
- `s3a://gold/dimensions/dim_product/` - Product dimension
- `s3a://gold/dimensions/dim_date/` - Date dimension
Output - Fact Tables:
- `s3a://gold/facts/fact_orders/` - Order facts (partitioned by order_date_id)
- `s3a://gold/facts/fact_order_products/` - Order-product relationships
Star Schema Benefits:
- Optimized for analytical queries
- Denormalized for performance
- Partitioned for efficient scans
- SCD Type 1 dimensions (see the build sketch below)
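To make the dimensional modeling concrete, here is a hedged sketch of how `dim_date` and `fact_orders` could be derived from Silver data; the real `gold_layer.py` may differ in column names and surrogate-key handling:

```python
# Hypothetical sketch of the Gold build: a date dimension keyed by an integer
# YYYYMMDD surrogate key, and an order fact table carrying that foreign key.
from pyspark.sql import functions as F

orders = spark.read.parquet("s3a://silver/orders/")

dim_date = (
    orders.select(F.to_date("timestamp").alias("date"))
    .distinct()
    .withColumn("date_id", F.date_format("date", "yyyyMMdd").cast("int"))  # SK in YYYYMMDD
    .withColumn("year", F.year("date"))
    .withColumn("month", F.month("date"))
    .withColumn("day", F.dayofmonth("date"))
)

fact_orders = (
    orders.withColumn(
        "order_date_id",
        F.date_format(F.to_date("timestamp"), "yyyyMMdd").cast("int"),
    )
    .select("id", "customer_id", "payment_method_id", "order_date_id", "num_products")
)

dim_date.write.mode("overwrite").parquet("s3a://gold/dimensions/dim_date/")
fact_orders.write.mode("overwrite").partitionBy("order_date_id").parquet("s3a://gold/facts/fact_orders/")
```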
The real-time CDC pipeline captures MySQL changes using Debezium and processes them with Python consumers to achieve:
- Zero-duplicate guarantee using Redis atomic locks
- Exactly-once semantics for order completion events
- Multi-worker parallel processing for high throughput
- Debezium MySQL Connector: Captures binlog changes
- Kafka Topics: Streams CDC events
- order.py: Caches order metadata to Redis (2 workers)
- order_details.py: Processes order details and triggers completion (3 workers; see the worker-spawn sketch after this list)
- Redis: Provides cache and atomic locks
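The "N workers" pattern relies on Kafka's consumer-group rebalancing: each worker process joins the same group, so partitions are split across them. A minimal sketch of that startup pattern, assuming the standard library's `multiprocessing`; the real scripts add consumer setup and logging:

```python
# Hypothetical sketch of multi-worker startup: each process would create its own
# KafkaConsumer in the shared group, letting Kafka balance partitions across workers.
import multiprocessing

def run_worker(worker_id: int) -> None:
    # placeholder: the real worker builds a KafkaConsumer and processes messages
    print(f"worker {worker_id} started")

if __name__ == "__main__":
    NUM_WORKERS = 2  # order.py uses 2 workers; order_details.py uses 3
    procs = [multiprocessing.Process(target=run_worker, args=(i,)) for i in range(NUM_WORKERS)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```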
# Check existing connectors
curl http://localhost:8083/connectors
# Create MySQL CDC connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql-db",
"database.port": "3306",
"database.user": "admin",
"database.password": "admin",
"database.server.id": "184054",
"database.server.name": "mysql",
"database.include.list": "project_db",
"table.include.list": "project_db.orders,project_db.order_details,project_db.customers,project_db.payment_method",
"database.history.kafka.bootstrap.servers": "kafka-broker:9092",
"database.history.kafka.topic": "schema-changes.project_db",
"snapshot.mode": "initial",
"topic.prefix": "mysql"
}
}'
# Check connector status (wait for snapshotCompleted: true)
curl http://localhost:8083/connectors/mysql-connector/status

# List all Kafka topics
docker exec kafka-broker kafka-topics --bootstrap-server localhost:9092 --list
# Expected topics:
# - mysql.project_db.orders
# - mysql.project_db.order_details
# - mysql.project_db.customers
# - mysql.project_db.payment_method
# Check message counts
docker exec kafka-broker kafka-run-class kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic mysql.project_db.orders \
--time -1
# Expected: mysql.project_db.orders:0:25000

cd scripts/real-time
python order.py

What it does (a minimal consumer sketch follows the log sample):
- Consumes from the `mysql.project_db.orders` topic
- Caches order metadata to Redis db=1:
  - Key: `order:{order_id}`
  - Value: `{customer_id, payment_method_id, num_products}`
  - TTL: 120 seconds
- Runs 2 parallel workers
- Consumer group: `order-consumer-group`
Log output:
2026-01-10 22:24:49 - INFO - order - [ORDER_CACHED] fff89e2e-2ed8-41d1-a9ac-958b4e98e936 - customer: 999034, products: 4
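A minimal sketch of that caching loop, assuming `kafka-python` and `redis` (both in requirements.txt), a host-side broker at `localhost:9092`, and a simplified Debezium envelope; `order.py` itself also handles worker startup and logging:

```python
# Hypothetical sketch of order.py's core loop: consume CDC events and cache
# order metadata in Redis db=1 with the 120s TTL described above.
import json
import redis
from kafka import KafkaConsumer

r = redis.Redis(host="localhost", port=6379, db=1)  # assumed host/port
consumer = KafkaConsumer(
    "mysql.project_db.orders",
    bootstrap_servers="localhost:9092",             # assumed host mapping
    group_id="order-consumer-group",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")) if v else None,
)

for msg in consumer:
    if msg.value is None:
        continue  # tombstone
    payload = msg.value.get("payload") or msg.value  # Debezium envelope may wrap in "payload"
    after = payload.get("after")
    if not after:
        continue  # skip deletes
    r.set(
        f"order:{after['id']}",
        json.dumps({
            "customer_id": after["customer_id"],
            "payment_method_id": after["payment_method_id"],
            "num_products": after["num_products"],
        }),
        ex=120,  # TTL from this README
    )
```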
cd scripts/real-time
python order_details.py

What it does (sketched after the log sample):
- Consumes from the `mysql.project_db.order_details` topic
- Retrieves cached order info from Redis
- Verifies order completion (all products received)
- Uses Redis atomic locks to prevent duplicates:
  - Lock key: `order_trigger_lock:{order_id}`
  - TTL: 10 seconds
  - Mechanism: `SET NX` (set if not exists)
- Produces completed orders to the `order_final` topic
- Runs 3 parallel workers
- Consumer group: `order-details-consumer-group`
Log output:
2026-01-10 22:30:15 - INFO - order_details - [ORDER_TRIGGERED] abc123... - Products: 3, Customer: 12345
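The core of the completion-and-lock logic can be sketched as below. Key names, TTLs, and the `SET NX` mechanism follow this README; the `received_count` bookkeeping and `producer` wiring are simplified placeholders for what `order_details.py` actually does:

```python
# Hypothetical sketch of the completion check with a Redis SET NX lock:
# only the first worker to acquire the lock publishes the completed order.
import json
import redis

cache = redis.Redis(host="localhost", port=6379, db=1)  # order cache
locks = redis.Redis(host="localhost", port=6379, db=2)  # atomic locks

def handle_detail(order_id: str, received_count: int, producer) -> None:
    raw = cache.get(f"order:{order_id}")
    if raw is None:
        return  # order not cached (yet) or its TTL expired
    order = json.loads(raw)
    if received_count < order["num_products"]:
        return  # [PRODUCTS_INCOMPLETE]
    # Atomic lock: SET NX with a 10s TTL, as described above.
    if not locks.set(f"order_trigger_lock:{order_id}", "1", nx=True, ex=10):
        return  # [DUPLICATE_PREVENTED] - another worker already triggered
    producer.send("order_final", json.dumps({"order_id": order_id, **order}).encode())
    # [ORDER_TRIGGERED]
```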
# Check order_final topic created
docker exec kafka-broker kafka-topics --bootstrap-server localhost:9092 --list | findstr order_final
# Check message count
docker exec kafka-broker kafka-run-class kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic order_final \
--time -1
# Consume messages
docker exec kafka-broker kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic order_final \
--from-beginning \
--max-messages 10
# Count total messages (PowerShell)
docker exec kafka-broker kafka-console-consumer `
--bootstrap-server localhost:9092 `
--topic order_final `
--from-beginning `
--timeout-ms 5000 | Measure-Object -Line
# Expected: ~25,000 completed orders

Insert a new order and verify it's processed in real-time:
# 1. Insert new order
docker exec mysql-db mysql -uadmin -padmin project_db -e "
INSERT INTO orders (id, customer_id, store_id, payment_method_id, num_products, timestamp)
VALUES (UUID(), 99999, 1, 1, 2, NOW());
"
# 2. Get the order_id
docker exec mysql-db mysql -uadmin -padmin project_db -e "
SELECT id, customer_id, num_products FROM orders
WHERE customer_id = 99999 ORDER BY timestamp DESC LIMIT 1;
"
# 3. Insert order_details (replace YOUR_ORDER_ID)
docker exec mysql-db mysql -uadmin -padmin project_db -e "
INSERT INTO order_details (order_id, product_id, quantity)
VALUES ('YOUR_ORDER_ID', 1001, 1), ('YOUR_ORDER_ID', 1002, 1);
"
# 4. Verify in order_final topic (should appear within 1-2 seconds)
docker exec kafka-broker kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic order_final \
--from-beginning \
--max-messages 1000 | findstr YOUR_ORDER_ID

# Bronze layer logs
docker exec spark-master tail -n 50 /opt/logs/bronze_layer.log
# Silver layer logs
docker exec spark-master tail -n 50 /opt/logs/silver_layer.log
# Gold layer logs
docker exec spark-master tail -n 50 /opt/logs/gold_layer.log

| Service | URL | Credentials |
|---|---|---|
| Spark Master | http://localhost:8080 | - |
| Spark Worker 1 | http://localhost:8081 | - |
| Spark Worker 2 | http://localhost:8082 | - |
| MinIO Console | http://localhost:9000 | admin / admin123 |
# List buckets
docker exec minio-storage ls -lh /data/
# Check Bronze data
docker exec minio-storage ls -lh /data/bronze/orders/
# Check Silver data
docker exec minio-storage ls -lh /data/silver/orders/
# Check Gold dimensions
docker exec minio-storage ls -lh /data/gold/dimensions/
# Check Gold facts
docker exec minio-storage ls -lh /data/gold/facts/

# Connector status
curl http://localhost:8083/connectors/mysql-connector/status
# Expected output:
# {
# "name": "mysql-connector",
# "connector": {
# "state": "RUNNING",
# "worker_id": "kafka-connect:8083"
# },
# "tasks": [
# {
# "id": 0,
# "state": "RUNNING",
# "worker_id": "kafka-connect:8083"
# }
# ],
# "type": "source"
# }
# Connector configuration
curl http://localhost:8083/connectors/mysql-connector
# Delete connector (if needed)
curl -X DELETE http://localhost:8083/connectors/mysql-connector

# order.py consumer group
docker exec kafka-broker kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--describe \
--group order-consumer-group
# order_details.py consumer group
docker exec kafka-broker kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--describe \
--group order-details-consumer-group
# Expected: LAG = 0 when all messages are consumed

# Count cached orders in Redis db=1
docker exec redis-cache redis-cli -n 1 KEYS "order:*" | wc -l
# View specific cached order
docker exec redis-cache redis-cli -n 1 GET "order:YOUR_ORDER_ID"
# Check atomic locks in Redis db=2
docker exec redis-cache redis-cli -n 2 KEYS "order_trigger_lock:*"
# Expected: Empty, or locks with short TTL (~10 seconds)

# List all topics
docker exec kafka-broker kafka-topics --bootstrap-server localhost:9092 --list
# Describe topic details
docker exec kafka-broker kafka-topics \
--bootstrap-server localhost:9092 \
--describe \
--topic order_final
# Check topic message count
docker exec kafka-broker kafka-run-class kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic order_final \
--time -1

# Real-time processing logs
tail -f logs/real-time.log
# Look for:
# - [ORDER_CACHED] - order.py cached order
# - [ORDER_TRIGGERED] - order_details.py triggered completion
# - [DUPLICATE_PREVENTED] - atomic lock prevented duplicate

```
1. MySQL → Bronze Layer
   ├─ JDBC connection to MySQL
   ├─ Read orders, order_details, customers, payment_method
   ├─ Add partitioning columns (year, month, day)
   └─ Write to MinIO: s3a://bronze/{table}/year={y}/month={m}/day={d}/

2. Bronze → Silver Layer
   ├─ Read from s3a://bronze/
   ├─ Deduplicate using window functions
   ├─ Data quality checks (null handling, type validation)
   └─ Write to MinIO: s3a://silver/{table}/

3. Silver → Gold Layer
   ├─ Read from s3a://silver/
   ├─ Create dimension tables (SCD Type 1)
   │   ├─ dim_customer (customer_id as SK)
   │   ├─ dim_payment_method (payment_method_id as SK)
   │   ├─ dim_product (product_id as SK)
   │   └─ dim_date (date_id as SK in YYYYMMDD format)
   ├─ Create fact tables with foreign keys
   │   ├─ fact_orders (order_id, customer_id, payment_method_id, order_date_id)
   │   └─ fact_order_products (order_id, product_id, quantity)
   └─ Write to MinIO: s3a://gold/{dimensions,facts}/
```
```
1. MySQL Binlog Changes
   ├─ INSERT/UPDATE/DELETE operations
   └─ Captured by Debezium connector

2. Debezium → Kafka
   ├─ mysql.project_db.orders → Order insert/update events
   └─ mysql.project_db.order_details → Order detail insert events

3. Kafka → order.py (2 workers)
   ├─ Consume from mysql.project_db.orders
   ├─ Parse CDC event (after.id, after.customer_id, after.num_products)
   └─ Cache to Redis db=1:
        Key: order:{order_id}
        Value: {customer_id, payment_method_id, num_products}
        TTL: 120 seconds

4. Kafka → order_details.py (3 workers)
   ├─ Consume from mysql.project_db.order_details
   ├─ Parse CDC event (after.order_id, after.product_id)
   ├─ Lookup cached order from Redis db=1
   ├─ Check order completion:
   │   - Count received products
   │   - Compare with expected num_products
   ├─ If complete:
   │   ├─ Try atomic lock: SET NX order_trigger_lock:{order_id} with 10s TTL
   │   ├─ If lock acquired:
   │   │   ├─ Produce to order_final topic
   │   │   └─ Log [ORDER_TRIGGERED]
   │   └─ If lock exists:
   │       └─ Log [DUPLICATE_PREVENTED] (another worker already triggered)
   └─ If incomplete:
       └─ Log [PRODUCTS_INCOMPLETE]

5. order_final Topic
   ├─ Contains only completed orders
   ├─ Guaranteed zero duplicates
   └─ Ready for downstream processing (analytics, notifications, etc.)
```
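As an illustration of that last step, a downstream service could tail `order_final` like this — a sketch assuming `kafka-python`; the group id and message fields are assumptions:

```python
# Hypothetical downstream consumer of order_final, e.g. for notifications
# or a live dashboard.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "order_final",
    bootstrap_servers="localhost:9092",
    group_id="order-final-analytics",  # assumed group name
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")) if v else None,
)

for msg in consumer:
    order = msg.value
    if order:
        print(f"completed order {order.get('order_id')} for customer {order.get('customer_id')}")
```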
Symptoms: Services fail health checks or exit immediately
Solutions:
# Check container logs
docker logs mysql-db
docker logs kafka-broker
docker logs spark-master
# Verify ports are not in use
netstat -an | findstr "3306 9092 8080 9000"
# Restart specific service
docker-compose restart mysql-db
# Restart all services
docker-compose down
docker-compose up -d

Symptoms: java.lang.OutOfMemoryError
Solutions:
# Increase executor memory in the Python scripts
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.executor.memory", "4g")  # increase from 2g to 4g
    .config("spark.driver.memory", "4g")    # increase driver memory
    .getOrCreate()
)

Or in spark-submit:
docker exec spark-master /opt/spark/bin/spark-submit \
--executor-memory 4g \
--driver-memory 4g \
--master spark://spark-master:7077 \
/opt/spark-apps/bronze_layer.py

Symptoms: Connector status shows FAILED
Check logs:
docker logs kafka-connect

Common causes and solutions:
a) Missing MySQL privileges:
docker exec mysql-db mysql -uroot -prootpassword -e "
GRANT RELOAD, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'admin'@'%';
FLUSH PRIVILEGES;"b) Connector already exists:
# Delete and recreate
curl -X DELETE http://localhost:8083/connectors/mysql-connector
# Then create again with POST

c) Kafka Connect not ready:
# Wait for Kafka Connect to start
curl http://localhost:8083/
# Should return: {"version":"7.5.0","commit":"..."}

Symptoms: order_final topic is empty or has fewer messages than expected
Check order.py is running:
# order.py must run before order_details.py
# Verify logs show [ORDER_CACHED] messages

Check Redis cache:
# Verify orders are cached
docker exec redis-cache redis-cli -n 1 KEYS "order:*" | wc -l
# Should show cached orders (may be 0 if all TTLs have expired)

Check consumer lag:
docker exec kafka-broker kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--describe \
--group order-details-consumer-group

Check atomic locks:
# If all orders already triggered, locks should be expired
docker exec redis-cache redis-cli -n 2 KEYS "order_trigger_lock:*"

Symptoms: Same order_id appears multiple times in order_final topic
This should NOT happen due to atomic locks. If it does:
Verify atomic lock implementation:
# In order_details.py, check this code:
lock_key = f"order_trigger_lock:{order_id}"
if not redis_locks.set(lock_key, "1", nx=True, ex=10):
    logger.warning(f"[DUPLICATE_PREVENTED] {order_id} - Lock exists")
    return

Check Redis is accessible:
docker exec redis-cache redis-cli ping
# Should return: PONG

Symptoms: AccessDeniedException or 403 Forbidden when accessing MinIO
Verify credentials:
# In Spark scripts, check:
.config("spark.hadoop.fs.s3a.access.key", "admin")
.config("spark.hadoop.fs.s3a.secret.key", "admin123")Create buckets manually:
# Access MinIO console: http://localhost:9000
# Login: admin / admin123
# Create buckets: bronze, silver, gold

Symptoms: Consumer starts but no messages are processed
Check topic exists:
docker exec kafka-broker kafka-topics --bootstrap-server localhost:9092 --list

Check message count:
docker exec kafka-broker kafka-run-class kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic mysql.project_db.orders \
--time -1

Check consumer group:
docker exec kafka-broker kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--describe \
--group order-consumer-group

Reset consumer offset (if needed):
# WARNING: This will reprocess all messages
docker exec kafka-broker kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--group order-consumer-group \
--reset-offsets \
--to-earliest \
--topic mysql.project_db.orders \
--execute