A Realtime Search Engine pipeline demonstrating how to sync a transactional Product Catalog (PostgreSQL) with a Search Engine (Elasticsearch) using Kafka and Debezium.
graph TD
User([User / Python App])
subgraph "Real-time Analytics Loop"
K_Search[Kafka: search.queries]
KSQL[KSQLDB Aggregation]
K_Trend[Kafka: trending_keywords]
Connect[Kafka Connect]
ES_Trend[(Elastic: trending)]
end
subgraph "Core Data Pipeline"
Postgres[(Postgres: products)]
ES_Prod[(Elastic: products)]
end
%% 1. Search Action
User -- "1. Log Search" --> K_Search
User -- "2. Get Results" --> ES_Prod
%% 2. Processing
K_Search -- "Stream" --> KSQL
KSQL -- "3. Count (Windowed)" --> K_Trend
K_Trend --> Connect --> ES_Trend
%% 3. Feedback
ES_Trend -.-> |"4. Fetch Top 5"| User
%% Context: Core Pipeline
Postgres -.-> Connect -.-> ES_Prod
This section illustrates the difference between the traditional "Dual Write" anti-pattern and the robust "Change Data Capture (CDC)" pattern implemented in this system.
In a traditional setup without Kafka Connect, the application is responsible for writing to both the Database and the Search Engine simultaneously.
The Problem: If the App crashes or the Network fails after step 1 but before step 2, your Database has the product, but your Search Engine does not. Data is permanently inconsistent.
flowchart TD
Client((User)) -->|"Update Product"| App[Python Application]
subgraph "Dual Write Zone"
App -->|"1. Write Transaction"| DB[(PostgreSQL)]
App -.->|"2. Index Request (Might Fail!)"| ES[(Elasticsearch)]
end
style App fill:#f9f,stroke:#333
style DB fill:#bbf,stroke:#333
style ES fill:#fdd,stroke:#333
This project utilizes an Event-Driven approach to decouple writes from reads.
The Solution: The Application only writes to the Database (Single Source of Truth). The Pipeline handles synchronization.
- Resiliency: If the Sync fails, Kafka retries indefinitely.
- Guaranteed Consistency: Data in PostgreSQL is guaranteed to eventually propagate to Elasticsearch.
flowchart TD
Client((User)) -->|"Update Product"| App[Python Application]
%% Write Path
App -->|"1. Commit Transaction"| DB[(PostgreSQL)]
%% CDC Path
subgraph "CDC Pipeline (Connect)"
DB -.->|"2. WAL Log"| Debezium[Debezium Source]
Debezium -->|"3. Change Event"| Kafka{Kafka Topic}
Kafka -->|"4. Sink Task"| Sink[Elastic Sink Connector]
Sink -->|"5. Upsert Document"| ES[(Elasticsearch)]
end
style App fill:#f9f,stroke:#333
style DB fill:#bbf,stroke:#333
style Kafka fill:#ffa,stroke:#333
style Debezium fill:#dfd,stroke:#333
style Sink fill:#dfd,stroke:#333
- Docker & Docker Compose
jq(optional, for pretty printing JSON)
Start all services (Kafka, Postgres, Elastic, Connect, etc.):
docker-compose up -dWait for about 60 seconds for all services to be healthy.
Run the setup script to link Postgres to Kafka to Elastic:
./scripts/setup-connectors.shInsert sample random products into Postgres:
chmod +x data/populate-products.sh
./data/populate-products.shYou can now search for the new products in Elasticsearch.
Check via Curl:
curl "localhost:9200/dbserver1.public.products/_search?q=category:Electronics&pretty"Check via Kibana:
- Open http://localhost:5601.
- Go to Dev Tools.
- Run query:
GET /dbserver1.public.products/_search { "query": { "match": { "name": "Pro" } } }
This project now includes a Real-time Search Analytics feedback loop.
-
Start KSQL Analytics:
./scripts/setup-analytics.sh
(Configures the stream processing to calculate trending keywords)
-
Run the Search App: (Requires Python 3.8+)
cd client pip install -r requirements.txt python3 app.py -
Test It:
- In the app, search for "laptop" multiple times.
- Watch the "🔥 Trending" section update in real-time!
- Source:
productstable in Postgres. - Topic:
dbserver1.public.products(created automatically by Debezium). - Index:
dbserver1.public.productsin Elasticsearch.
- Check Connectors:
curl localhost:8083/connectors/source-postgres-products/status - View Kafka Topic:
docker exec kcat kcat -b broker:29092 -C -t dbserver1.public.products -o end