
feat: Implement & harden Kafka event bus architecture#36

Merged
aniebietafia merged 3 commits into main from message-broker
Mar 14, 2026

Conversation

@aniebietafia
Contributor

@aniebietafia aniebietafia commented Mar 14, 2026

  • Add app/kafka/ package — topics.py, schemas.py, producer.py, consumer.py, manager.py, exceptions.py
  • Integrate KafkaManager singleton lifecycle into FastAPI lifespan (app/main.py)
  • Extend /health endpoint with live Kafka broker connectivity check
  • Add new Kafka settings to app/core/config.py: acks, auto-offset-reset, retries, backoff
  • Inject KafkaProducer into BaseConsumer via KafkaManager.register_consumer() — eliminates circular import
  • Replace payload-mutating DLQ with a proper DLQEvent wrapper schema
  • Clean up retry logic from fragile while loop to explicit for attempt in range(...)
  • Add public ping() and is_started to KafkaProducer; health check no longer accesses private aiokafka internals
  • Declare _initialized: bool at class level in KafkaManager for Mypy compliance
  • Add tests/test_kafka/ — schema serialization, producer send/ping, consumer retry & DLQ routing
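
The retry cleanup called out above — an explicit `for attempt in range(...)` loop with a DLQ hand-off after the final failure — can be sketched roughly as follows. This is a stdlib-only illustration: `process_with_retry`, `handle`, and `send_to_dlq` are hypothetical names, not the PR's exact signatures.

```python
import asyncio

MAX_RETRIES = 3    # stand-in for settings.KAFKA_MAX_RETRIES
BACKOFF_MS = 100   # stand-in for settings.KAFKA_RETRY_BACKOFF_MS

async def process_with_retry(event, handle, send_to_dlq) -> bool:
    """Try handle(event) a bounded number of times, then dead-letter it.

    Returns True if the event was handled, False if it went to the DLQ;
    in either case the caller may safely commit the offset.
    """
    last_error = None
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            await handle(event)
            return True
        except Exception as exc:
            last_error = exc
            # Linear backoff between attempts; the PR's exact policy may differ.
            await asyncio.sleep(BACKOFF_MS * attempt / 1000)
    await send_to_dlq(event, str(last_error), MAX_RETRIES)
    return False
```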

Summary by CodeRabbit

  • New Features

    • Kafka event streaming with producer/consumer, retries and DLQ for failed messages
    • Event-driven email notifications and media upload workflows via Kafka topics
    • Mailgun configuration options
    • Kafka health integrated into the app health endpoint
  • Chores

    • Added a new local docker-compose stack with Kafka and Kafka UI (replaces previous infra compose)

Signed-off-by: aniebietafia <aniebietafia87@gmail.com>
@coderabbitai

coderabbitai bot commented Mar 14, 2026

Warning

Rate limit exceeded

@aniebietafia has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 12 minutes and 9 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 5d1445f8-fa84-4a15-8204-3eb19351a875

📥 Commits

Reviewing files that changed from the base of the PR and between e90ac9b and f573d42.

📒 Files selected for processing (3)
  • app/kafka/consumer.py
  • app/kafka/producer.py
  • mypy_output.txt
📝 Walkthrough

Walkthrough

Adds a Kafka subsystem: producer, consumer base with retries and DLQ, schemas, exceptions, topic constants, a singleton manager, config settings for Kafka and Mailgun, FastAPI lifespan integration and health checks, docker-compose for Kafka stack, and new tests for Kafka components and health endpoints.

Changes

Cohort / File(s) — Summary

  • Configuration — app/core/config.py: Added Kafka settings (KAFKA_PRODUCER_ACK, KAFKA_CONSUMER_AUTO_OFFSET_RESET, KAFKA_MAX_RETRIES, KAFKA_RETRY_BACKOFF_MS) and Mailgun fields (MAILGUN_API_KEY, MAILGUN_DOMAIN, MAILGUN_FROM_EMAIL).
  • Kafka package & constants — app/kafka/__init__.py, app/kafka/topics.py: Introduced kafka package exports and centralized topic name constants (notifications, media, audio/text topics, DLQ prefix).
  • Schemas & errors — app/kafka/schemas.py, app/kafka/exceptions.py: Added a generic BaseEvent and concrete events (EmailEvent, MediaUploadEvent), a DLQEvent model, and Kafka-specific exceptions (KafkaError, KafkaConnectionError, KafkaPublishError, KafkaConsumeError).
  • Producer — app/kafka/producer.py: New KafkaProducer wrapper around AIOKafkaProducer: lifecycle (start/stop), ping/metadata, and send(event) with serialization and error wrapping.
  • Consumer base — app/kafka/consumer.py: New abstract BaseConsumer with a consume loop, deserialization, config-driven retry/backoff, commit-on-success, DLQ forwarding on final failure, and an abstract handle method.
  • Manager — app/kafka/manager.py: Added a singleton KafkaManager to create/manage the producer, register/start/stop consumers, and expose health_check(); get_kafka_manager() accessor.
  • FastAPI integration — app/main.py: Added a lifespan context to start/stop KafkaManager on app startup/shutdown and extended /health to include services.kafka status (overall status may be degraded).
  • Docker compose — docker-compose.yml, (removed) infra/docker-compose.yml: Added a root docker-compose.yml for redis/kafka/kafka-ui; removed the legacy infra/docker-compose.yml (Postgres/Zookeeper/Kafka).
  • Tests — tests/test_kafka/*, tests/test_main.py, tests/test_error_handling.py: Added tests for producer lifecycle/serialization, consumer retry & DLQ behavior, and schema validation; loosened health-check assertions to accept degraded/ok and include a services key.
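
The DLQEvent wrapper and `dlq.{topic}` routing summarized above might look roughly like this. The sketch uses a stdlib dataclass where the PR's real model is Pydantic; field names mirror the `_send_to_dlq` call quoted later in this review, and the `dlq.` prefix value is an assumption (the real constant lives in topics.py).

```python
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict

DLQ_PREFIX = "dlq."  # assumed value; the PR centralizes this in app/kafka/topics.py

@dataclass
class DLQEvent:
    # Wraps the failed event instead of mutating its payload in place
    original_event_id: str
    original_topic: str
    original_event: Dict[str, Any]
    error_message: str
    retry_count: int
    failed_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

def dlq_topic_for(topic: str) -> str:
    # Mirrors the f"{DLQ_PREFIX}{topic}" routing used by the consumer
    return f"{DLQ_PREFIX}{topic}"
```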

Sequence Diagram(s)

sequenceDiagram
    participant App as FastAPI
    participant Manager as KafkaManager
    participant Producer as KafkaProducer
    participant Broker as Kafka Broker
    participant Consumer as BaseConsumer
    participant DLQ as DLQ Topic

    App->>Manager: startup (lifespan)
    Manager->>Producer: start()
    Producer->>Broker: connect & fetch metadata
    Manager->>Consumer: start(bootstrap_servers)
    Consumer->>Broker: subscribe & begin consuming

    App->>Producer: send(topic, event)
    Producer->>Broker: publish serialized event
    Broker-->>Consumer: deliver message

    Consumer->>Consumer: _process_with_retry(event)
    Consumer->>Consumer: handle(event) attempt 1

    alt success within retries
        Consumer->>Broker: commit offset
    else exhausted retries
        Consumer->>DLQ: publish DLQEvent (dlq.{topic})
        Consumer->>Broker: commit offset
    end

    App->>Manager: shutdown (lifespan)
    Manager->>Consumer: stop()
    Manager->>Producer: stop()
sequenceDiagram
    participant Client as /health endpoint
    participant Manager as KafkaManager
    participant Producer as KafkaProducer
    participant Broker as Kafka Broker

    Client->>Manager: health_check()
    Manager->>Producer: ping()
    Producer->>Broker: force_metadata_update()

    alt Broker reachable
        Broker-->>Producer: success
        Producer-->>Manager: healthy
    else Broker unreachable
        Broker-->>Producer: error/timeout
        Producer-->>Manager: unhealthy (error)
    end

    Manager-->>Client: {status: "ok"|"degraded", services: {kafka: {...}}, version: ...}
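
The final reply in the diagram above reduces to a small payload builder. A sketch of that shape, assuming only the `{status, version, services}` structure shown in the diagram (`build_health_payload` is an illustrative name, not the PR's API):

```python
def build_health_payload(kafka_health: dict, version: str) -> dict:
    """Overall status degrades (rather than failing hard) when Kafka is down."""
    healthy = kafka_health.get("status") == "healthy"
    return {
        "status": "ok" if healthy else "degraded",
        "version": version,
        "services": {"kafka": kafka_health},
    }
```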

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

🐰 I hopped through queues both near and far,

Marshaled events beneath the evening star,
Retries like thumps, a DLQ for the lost,
Producer, consumer — I tended the cost.
Hooray, the streams now hum — what a delightful job!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage — ⚠️ Warning: docstring coverage is 58.54%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.
✅ Passed checks (2 passed)
  • Description Check — ✅ Passed: check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check — ✅ Passed: the title accurately summarizes the main change: implementing a Kafka event bus architecture with hardening measures (retry logic, DLQ, health checks).


Comment on lines +45 to +51
    return {
        "status": "ok" if kafka_health["status"] == "healthy" else "degraded",
        "version": settings.VERSION,
        "services": {
            "kafka": kafka_health,
        },
    }

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.

Copilot Autofix

AI 4 days ago

In general, the fix is to avoid returning raw exception messages (or anything derived directly from them) to the client. Instead, log the detailed error on the server and return a generic, non-sensitive description in the API response. For this case, that means changing KafkaManager.health_check so that the returned dictionary contains a generic "error" or "details" field, not str(e).

The best minimal fix without changing the external API shape of /health is:

  • Keep logging e with logger.error(...) so operators retain diagnostic information.
  • Replace the "error": str(e) value in KafkaManager.health_check with a constant, generic message such as "Kafka health check failed" that does not include exception details.
  • Optionally, we can keep an additional non-sensitive hint like "details": "Unable to reach Kafka broker" if useful; the key point is to remove direct use of the exception object in the returned payload.

Only app/kafka/manager.py needs modification: in the except Exception as e: block of health_check, adjust the returned dict. No new imports or helper methods are required.

Suggested changeset 1
app/kafka/manager.py
Outside changed files

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/app/kafka/manager.py b/app/kafka/manager.py
--- a/app/kafka/manager.py
+++ b/app/kafka/manager.py
@@ -83,8 +83,12 @@
             await self.producer.ping()
             return {"status": "healthy"}
         except Exception as e:
-            logger.error(f"Kafka health check failed: {e}")
-            return {"status": "unhealthy", "error": str(e)}
+            # Log full details server-side, but return only a generic message to clients
+            logger.error("Kafka health check failed", exc_info=e)
+            return {
+                "status": "unhealthy",
+                "error": "Kafka health check failed. Please contact the system administrator.",
+            }
 
 
 def get_kafka_manager() -> KafkaManager:
EOF
Copilot is powered by AI and may make mistakes. Always verify output.
                        f"Unrecoverable error on message from '{self.topic}'. "
                        f"Skipping commit — offset will be re-delivered on restart."
                    )
        except asyncio.CancelledError:

Check notice

Code scanning / CodeQL

Empty except Note

'except' clause does nothing but pass and there is no explanatory comment.

Copilot Autofix

AI 4 days ago

General approach: avoid a bare pass in the except asyncio.CancelledError block by either logging the cancellation or documenting it with a comment and possibly re-raising. We want to preserve existing shutdown behavior, where cancellation during stop() is treated as normal and does not log as an error.

Best specific fix: on line 112–113, replace the empty handler:

        except asyncio.CancelledError:
            pass

with a handler that records that the consumer loop was cancelled in a non-noisy way, e.g.:

        except asyncio.CancelledError:
            logger.info(f"Consumer loop for '{self.topic}' was cancelled")
            # Cancellation is expected during shutdown; no further action required.

or use logger.debug if you want less frequent logs. This preserves the semantics (we still swallow the cancellation and don’t propagate it as an error) but removes the empty except, explains the intent, and satisfies CodeQL. No new imports or helper methods are required; logger and asyncio are already available in this file.

Location details:

  • File: app/kafka/consumer.py
  • Region: the _consume_loop method, the outer try/except around the async for msg in self._consumer loop, currently lines 112–113.
Suggested changeset 1
app/kafka/consumer.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/app/kafka/consumer.py b/app/kafka/consumer.py
--- a/app/kafka/consumer.py
+++ b/app/kafka/consumer.py
@@ -110,7 +110,8 @@
                         f"Skipping commit — offset will be re-delivered on restart."
                     )
         except asyncio.CancelledError:
-            pass
+            logger.info(f"Consumer loop for '{self.topic}' was cancelled")
+            # Cancellation is expected during graceful shutdown; no further action required.
         except Exception:
             logger.exception(f"Unexpected error in consumer loop for '{self.topic}'")
 
EOF

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

🧹 Nitpick comments (4)
docker-compose.yml (2)

66-69: Remove unused postgres_data volume declaration.

postgres_data is declared on Line 67 but no Postgres service mounts it. This is stale config and adds confusion during local setup.

♻️ Proposed cleanup
 volumes:
-  postgres_data:
   redis_data:
   kafka_data:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docker-compose.yml` around lines 66 - 69, Remove the stale volume declaration
postgres_data from the volumes section in docker-compose.yml since no Postgres
service mounts it; delete the postgres_data entry and verify no services (e.g.,
any compose service configs) reference postgres_data elsewhere to avoid breaking
mounts.

3-64: Document/align app container env values for Docker networking.

Given app/core/config.py and .env.example default to localhost for Redis/Kafka, a containerized app in this compose network must use REDIS_HOST=redis and KAFKA_BOOTSTRAP_SERVERS=kafka:29092; otherwise connections will fail. Please add this to compose docs or an app service/env override.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docker-compose.yml` around lines 3 - 64, The compose file currently exposes
redis and kafka services on the network but the app defaults in
app/core/config.py and .env.example point to localhost; update docker-compose to
provide container-friendly environment overrides (e.g., set REDIS_HOST=redis and
KAFKA_BOOTSTRAP_SERVERS=kafka:29092) for the application service or add clear
docs in the compose comments explaining that in-container apps must use service
names (redis, kafka:29092) instead of localhost; refer to the redis and kafka
service names in the diff and the REDIS_HOST / KAFKA_BOOTSTRAP_SERVERS env keys
to locate where to add the overrides.
tests/test_main.py (1)

6-6: Module-level TestClient triggers lifespan during import.

Instantiating TestClient(app) at module level means the FastAPI lifespan (which calls kafka_manager.start()) runs when the test module is imported. If Kafka is unavailable, this could cause import-time failures affecting other tests.

Consider using a pytest fixture with appropriate scope, or wrapping in a context manager within each test.

♻️ Proposed fix using fixture
-client = TestClient(app)
+
+@pytest.fixture(scope="module")
+def client():
+    with TestClient(app) as c:
+        yield c


-def test_health_check():
+def test_health_check(client):
     response = client.get("/health")

Don't forget to add import pytest at the top of the file.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_main.py` at line 6, The TestClient instantiation at module level
causes FastAPI lifespan (which calls kafka_manager.start()) to run on import;
change this by creating a pytest fixture that yields TestClient(app) (import
pytest at top) and use that fixture in tests or instantiate TestClient within a
context manager inside each test so the lifespan runs per-test and doesn't
trigger kafka_manager.start() during module import; update references to the
module-level client to use the new fixture name (or local context) instead.
app/kafka/schemas.py (1)

17-17: Consider using datetime.now(timezone.utc) instead of datetime.utcnow().

datetime.utcnow() is deprecated since Python 3.12 and emits warnings. The recommended replacement is datetime.now(timezone.utc) which returns a timezone-aware datetime.

♻️ Proposed fix
-from datetime import datetime
+from datetime import datetime, timezone
-    timestamp: datetime = Field(default_factory=datetime.utcnow)
+    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

Apply the same change to DLQEvent.failed_at on line 31.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/kafka/schemas.py` at line 17, Replace naive UTC construction with
timezone-aware datetimes: change the Field default for timestamp (timestamp:
datetime = Field(default_factory=datetime.utcnow)) to use
datetime.now(timezone.utc) and do the same for DLQEvent.failed_at; update
imports if necessary to include timezone from datetime and ensure both Field
default_factory call sites reference datetime.now(timezone.utc) so the produced
datetimes are timezone-aware.
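
A self-contained illustration of the recommended replacement, using a stdlib dataclass where the PR uses a Pydantic model (`Event` and `utc_now` are illustrative names):

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone

def utc_now() -> datetime:
    # Timezone-aware replacement for the deprecated datetime.utcnow()
    return datetime.now(timezone.utc)

@dataclass
class Event:
    timestamp: datetime = field(default_factory=utc_now)
```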
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@app/kafka/consumer.py`:
- Around line 147-176: The DLQ routine _send_to_dlq currently catches and
swallows all exceptions which allows _consume_loop to commit offsets after
_process_with_retry returns; change _send_to_dlq to propagate failures instead
of absorbing them — after logging the failure in the except block re-raise the
exception (or raise a specific DLQPublishError) so callers (_process_with_retry
and ultimately _consume_loop) can detect the DLQ publish failure and avoid
committing the original offset; ensure callers either catch and handle that
exception or let it bubble up to prevent message commit.
- Around line 59-76: The guard sets self._running = True before awaiting
self._consumer.start(), which leaves the consumer stuck if start() raises;
change the start sequence in the start method so you first construct
AIOKafkaConsumer, await self._consumer.start(), then set self._running = True
and only then create the asyncio task for self._consume_loop(); additionally, if
start() fails ensure you do not leave self._consumer or self._running in an
inconsistent state (set self._running = False and cleanup/close self._consumer
on exception) so subsequent retries can proceed.

In `@app/kafka/manager.py`:
- Around line 51-72: When starting in KafkaManager.start(), roll back partial
startup if a consumer.start(...) raises: wrap the consumer start loop so that on
exception you iterate over already-started consumers and call their stop(), then
stop self.producer, and re-raise the original error; reference start(),
consumers, producer, and settings.KAFKA_BOOTSTRAP_SERVERS. Likewise harden
KafkaManager.stop() so exceptions from one consumer.stop() do not abort the rest
— catch and log per-consumer exceptions, continue stopping remaining consumers,
and ensure producer.stop() is always attempted in a finally block (logging any
errors) so full cleanup always runs.
- Around line 82-87: The health_check() in manager.py currently returns str(e)
which can leak broker details; change it so producer.ping() exceptions are
logged with full details via logger.exception or logger.error(...,
exc_info=True) but the returned payload only contains a generic error (e.g.,
{"status":"unhealthy","error":"unavailable"} or omit "error"), so callers
(app/main.py -> services.kafka) never see sensitive broker/auth/TLS info; keep
the exception capture around await self.producer.ping(), reference health_check,
self.producer.ping, and logger when making these changes.

In `@tests/test_kafka/test_consumer.py`:
- Around line 1-2: Remove the unused asyncio import and fix the unused parameter
in the flaky_handle function: delete the top-level "asyncio" import from
tests/test_kafka/test_consumer.py (since it's unused) and update the
flaky_handle signature to either remove the unused callback parameter or rename
it to _callback (or use *_, _unused) so the linter no longer flags an unused
parameter; ensure references to flaky_handle remain consistent where it's used
in the tests.

In `@tests/test_kafka/test_schemas.py`:
- Around line 1-5: Remove the unused top-level import "pytest" from the test
module (it's currently imported alongside uuid and datetime) to satisfy linting
(ruff F401) and isort checks; locate the import line that reads "import pytest"
in the test_schemas module and delete it, and run the tests/linter to confirm no
other references to "pytest" remain.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: a4d1e138-1277-44a4-a782-297910d64cc8

📥 Commits

Reviewing files that changed from the base of the PR and between cafa506 and 6922300.

📒 Files selected for processing (16)
  • app/core/config.py
  • app/kafka/__init__.py
  • app/kafka/consumer.py
  • app/kafka/exceptions.py
  • app/kafka/manager.py
  • app/kafka/producer.py
  • app/kafka/schemas.py
  • app/kafka/topics.py
  • app/main.py
  • docker-compose.yml
  • infra/docker-compose.yml
  • tests/test_error_handling.py
  • tests/test_kafka/test_consumer.py
  • tests/test_kafka/test_producer.py
  • tests/test_kafka/test_schemas.py
  • tests/test_main.py
💤 Files with no reviewable changes (1)
  • infra/docker-compose.yml

Comment on lines +59 to +76
        if self._running:
            return

        self._running = True
        self._consumer = AIOKafkaConsumer(
            self.topic,
            bootstrap_servers=bootstrap_servers,
            group_id=self.group_id,
            auto_offset_reset=settings.KAFKA_CONSUMER_AUTO_OFFSET_RESET,
            # Manual commit: offsets are committed only after handle() succeeds,
            # preventing silent message loss on pod restart mid-retry.
            enable_auto_commit=False,
            value_deserializer=lambda v: json.loads(v.decode("utf-8")),
        )
        await self._consumer.start()
        logger.info(f"Consumer for '{self.topic}' (group: '{self.group_id}') started")

        self._task = asyncio.create_task(self._consume_loop())

⚠️ Potential issue | 🟠 Major

Only set _running after the consumer actually starts.

If await self._consumer.start() fails, _running stays True and the guard at Line 59 turns every later retry into a no-op. That leaves the consumer wedged until the process is restarted.

Suggested fix
-        self._running = True
-        self._consumer = AIOKafkaConsumer(
+        consumer = AIOKafkaConsumer(
             self.topic,
             bootstrap_servers=bootstrap_servers,
             group_id=self.group_id,
             auto_offset_reset=settings.KAFKA_CONSUMER_AUTO_OFFSET_RESET,
             # Manual commit: offsets are committed only after handle() succeeds,
             # preventing silent message loss on pod restart mid-retry.
             enable_auto_commit=False,
             value_deserializer=lambda v: json.loads(v.decode("utf-8")),
         )
-        await self._consumer.start()
+        try:
+            await consumer.start()
+        except Exception:
+            with contextlib.suppress(Exception):
+                await consumer.stop()
+            raise
+        self._consumer = consumer
+        self._running = True
         logger.info(f"Consumer for '{self.topic}' (group: '{self.group_id}') started")

Comment on lines +147 to +176
    async def _send_to_dlq(
        self, event: BaseEvent[Any], error_message: str, retries: int
    ) -> None:
        """
        Forward a failed event to its Dead Letter Queue topic.

        Wraps it in a DLQEvent — a proper structured schema — instead of
        mutating the original event payload.
        """
        dlq_topic = f"{DLQ_PREFIX}{self.topic}"
        dlq_event = DLQEvent(
            original_event_id=event.event_id,
            original_topic=self.topic,
            original_event=event.model_dump(),
            error_message=error_message,
            retry_count=retries,
        )

        try:
            # Use the injected producer directly — no circular import needed
            dlq_payload = dlq_event.model_dump()
            await self._producer._producer.send_and_wait(  # type: ignore[union-attr]
                dlq_topic,
                value=json.dumps(dlq_payload, default=str).encode("utf-8"),
            )
            logger.info(f"Event {event.event_id} forwarded to DLQ topic '{dlq_topic}'")
        except Exception:
            logger.exception(
                f"CRITICAL: Failed to forward event {event.event_id} "
                f"to '{dlq_topic}'. Event is permanently lost."
            )

⚠️ Potential issue | 🔴 Critical

Propagate DLQ publish failures so the original offset is not committed.

_consume_loop() commits at Lines 103-106 whenever _process_with_retry() returns. Because _send_to_dlq() swallows send errors, a broken DLQ path still lets the original message get committed and lost without either successful handling or dead-lettering.

Suggested fix
         try:
             # Use the injected producer directly — no circular import needed
             dlq_payload = dlq_event.model_dump()
             await self._producer._producer.send_and_wait(  # type: ignore[union-attr]
                 dlq_topic,
                 value=json.dumps(dlq_payload, default=str).encode("utf-8"),
             )
             logger.info(f"Event {event.event_id} forwarded to DLQ topic '{dlq_topic}'")
         except Exception:
             logger.exception(
                 f"CRITICAL: Failed to forward event {event.event_id} "
                 f"to '{dlq_topic}'. Event is permanently lost."
             )
+            raise

Comment on lines +51 to +72
    async def start(self) -> None:
        """Start the producer, then all registered consumers."""
        logger.info("Starting Kafka Manager...")
        await self.producer.start()

        for consumer in self.consumers:
            # Pass bootstrap_servers at start-time — consumers don't store it
            await consumer.start(bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS)

        logger.info(
            f"Kafka Manager started — {len(self.consumers)} consumer(s) running"
        )

    async def stop(self) -> None:
        """Stop all consumers first, then the producer."""
        logger.info("Stopping Kafka Manager...")

        for consumer in self.consumers:
            await consumer.stop()

        await self.producer.stop()
        logger.info("Kafka Manager stopped")

⚠️ Potential issue | 🟠 Major

Rollback partial startup and keep shutdown cleanup going.

If one consumer.start() raises after the producer has already started, app/main.py Lines 11-18 never reach shutdown and the earlier resources stay alive. The shutdown path has the mirror problem: one consumer.stop() exception prevents the remaining consumers and the producer from being stopped.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/kafka/manager.py` around lines 51 - 72, When starting in
KafkaManager.start(), roll back partial startup if a consumer.start(...) raises:
wrap the consumer start loop so that on exception you iterate over
already-started consumers and call their stop(), then stop self.producer, and
re-raise the original error; reference start(), consumers, producer, and
settings.KAFKA_BOOTSTRAP_SERVERS. Likewise harden KafkaManager.stop() so
exceptions from one consumer.stop() do not abort the rest — catch and log
per-consumer exceptions, continue stopping remaining consumers, and ensure
producer.stop() is always attempted in a finally block (logging any errors) so
full cleanup always runs.
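A minimal sketch of that rollback-on-partial-start and best-effort-stop pattern — `Component`, `Manager`, and `demo` are stand-ins for the PR's producer/consumer classes, not its actual code:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class Component:
    """Stand-in for a producer or consumer with a start/stop lifecycle."""

    def __init__(self, name: str, log: list, fail_start: bool = False):
        self.name, self.log, self.fail_start = name, log, fail_start

    async def start(self) -> None:
        if self.fail_start:
            raise RuntimeError(f"{self.name} failed to start")
        self.log.append(f"start {self.name}")

    async def stop(self) -> None:
        self.log.append(f"stop {self.name}")


class Manager:
    def __init__(self, producer: Component, consumers: list):
        self.producer = producer
        self.consumers = consumers

    async def start(self) -> None:
        await self.producer.start()
        started = []
        try:
            for consumer in self.consumers:
                await consumer.start()
                started.append(consumer)
        except Exception:
            # Roll back whatever already came up, then re-raise the original error.
            for consumer in started:
                try:
                    await consumer.stop()
                except Exception:
                    logger.exception("Rollback stop failed")
            await self.producer.stop()
            raise

    async def stop(self) -> None:
        # A failing consumer must not block the rest; producer.stop()
        # always runs thanks to the finally block.
        try:
            for consumer in self.consumers:
                try:
                    await consumer.stop()
                except Exception:
                    logger.exception("Consumer stop failed")
        finally:
            await self.producer.stop()


async def demo() -> list:
    log: list = []
    producer = Component("producer", log)
    c1 = Component("c1", log)
    c2 = Component("c2", log, fail_start=True)  # second consumer blows up
    try:
        await Manager(producer, [c1, c2]).start()
    except RuntimeError:
        pass  # the startup error still propagates to the caller
    return log


print(asyncio.run(demo()))  # ['start producer', 'start c1', 'stop c1', 'stop producer']
```

The trace shows the producer and the already-started consumer being torn down when the second consumer fails, so nothing is left dangling even though start() re-raises.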

Comment on lines +82 to +87
        try:
            await self.producer.ping()
            return {"status": "healthy"}
        except Exception as e:
            logger.error(f"Kafka health check failed: {e}")
            return {"status": "unhealthy", "error": str(e)}

⚠️ Potential issue | 🟠 Major

Don't leak broker errors through /health.

health_check() returns str(e), and app/main.py Lines 45-50 expose that payload directly under services.kafka. That can leak broker addresses, auth failures, or TLS details to any caller of the health endpoint.

Suggested fix
-        except Exception as e:
-            logger.error(f"Kafka health check failed: {e}")
-            return {"status": "unhealthy", "error": str(e)}
+        except Exception:
+            logger.exception("Kafka health check failed")
+            return {"status": "unhealthy"}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/kafka/manager.py` around lines 82 - 87, The health_check() in manager.py
currently returns str(e) which can leak broker details; change it so
producer.ping() exceptions are logged with full details via logger.exception or
logger.error(..., exc_info=True) but the returned payload only contains a
generic error (e.g., {"status":"unhealthy","error":"unavailable"} or omit
"error"), so callers (app/main.py -> services.kafka) never see sensitive
broker/auth/TLS info; keep the exception capture around await
self.producer.ping(), reference health_check, self.producer.ping, and logger
when making these changes.

Signed-off-by: aniebietafia <aniebietafia87@gmail.com>
@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
app/main.py (1)

42-52: ⚠️ Potential issue | 🟡 Minor

Sensitive error details may be exposed to external users.

The kafka_health dict may contain an "error" field with internal exception details (per manager.py line 86: "error": str(e)). Exposing raw exception messages can leak implementation details. Consider sanitizing or omitting the error field in the public response.

🛡️ Proposed fix to sanitize error details
 @app.get("/health", tags=["health"])
 async def health_check() -> dict:
     kafka_manager = get_kafka_manager()
     kafka_health = await kafka_manager.health_check()
 
+    # Sanitize error details before exposing to clients
+    sanitized_kafka_health = {"status": kafka_health["status"]}
+    if kafka_health["status"] == "unhealthy":
+        sanitized_kafka_health["error"] = "Kafka connection failed"
+
     return {
         "status": "ok" if kafka_health["status"] == "healthy" else "degraded",
         "version": settings.VERSION,
         "services": {
-            "kafka": kafka_health,
+            "kafka": sanitized_kafka_health,
         },
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/main.py` around lines 42 - 52, The health_check function currently
returns kafka_health directly which may include raw exception text; update
app.main.health_check to sanitize or omit sensitive fields from kafka_health
before returning (call get_kafka_manager() and await
kafka_manager.health_check() as before, then remove or replace
kafka_health["error"] with a generic message like "unavailable" or omit the key
entirely when the status isn't healthy). Ensure the returned dict still includes
"status", "version" (settings.VERSION) and a sanitized "services": {"kafka":
...} object so no internal exception strings are exposed.
🧹 Nitpick comments (1)
app/main.py (1)

42-42: Consider a stricter return type annotation.

Using dict as the return type loses type safety. A TypedDict or Pydantic response model would provide better documentation and validation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/main.py` at line 42, The health_check function currently returns a bare
dict which loses type safety; replace its return annotation with a concrete
response model (either a typing.TypedDict like HealthCheckResponse or a Pydantic
model like HealthCheckResponseModel) and update the function signature async def
health_check() -> HealthCheckResponse/HealthCheckResponseModel, add the
corresponding import and definition (ensure keys match the actual returned
payload, e.g., status, version, uptime), and update any callers/tests to use the
new type so the function both documents and validates its response shape.
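One way the typed response could look — `HealthCheckResponse`, `KafkaHealth`, and `build_response` are illustrative names, not code from the PR; with FastAPI, a Pydantic `response_model` would additionally validate the payload at runtime:

```python
from typing import Literal, TypedDict


class KafkaHealth(TypedDict, total=False):
    # total=False keeps "error" optional; it appears only when unhealthy
    status: Literal["healthy", "unhealthy"]
    error: str


class HealthCheckResponse(TypedDict):
    status: Literal["ok", "degraded"]
    version: str
    services: dict[str, KafkaHealth]


def build_response(version: str, kafka_health: KafkaHealth) -> HealthCheckResponse:
    return {
        "status": "ok" if kafka_health["status"] == "healthy" else "degraded",
        "version": version,
        "services": {"kafka": kafka_health},
    }


print(build_response("1.0.0", {"status": "healthy"}))
# {'status': 'ok', 'version': '1.0.0', 'services': {'kafka': {'status': 'healthy'}}}
```

A TypedDict documents the shape for Mypy with zero runtime cost; swapping it for a Pydantic model is the next step if the endpoint should also advertise its schema in OpenAPI.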

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 15762539-9526-481a-a9bf-cfb927ff3cc4

📥 Commits

Reviewing files that changed from the base of the PR and between 6922300 and e90ac9b.

📒 Files selected for processing (3)
  • app/main.py
  • tests/test_kafka/test_consumer.py
  • tests/test_kafka/test_schemas.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • tests/test_kafka/test_consumer.py
  • tests/test_kafka/test_schemas.py

Comment on lines +12 to +19
@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
    # Startup
    kafka_manager = get_kafka_manager()
    await kafka_manager.start()
    yield
    # Shutdown
    await kafka_manager.stop()

⚠️ Potential issue | 🟠 Major

Startup failure leaves resources dangling.

If kafka_manager.start() fails partway through (e.g., producer starts but a consumer fails), the code after yield never executes, leaving the producer running. Wrap the startup in try/except to ensure cleanup on partial failure.

🛠️ Proposed fix
 @asynccontextmanager
 async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
-    # Startup
     kafka_manager = get_kafka_manager()
-    await kafka_manager.start()
-    yield
-    # Shutdown
-    await kafka_manager.stop()
+    try:
+        await kafka_manager.start()
+        yield
+    finally:
+        await kafka_manager.stop()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/main.py` around lines 12 - 19, The lifespan asynccontextmanager can leave
Kafka resources running if kafka_manager.start() raises after partially
starting; modify the lifespan function to wrap the startup in try/except/finally
(or try/except with explicit cleanup) so that on any exception during
kafka_manager.start() you call kafka_manager.stop() (or otherwise clean up
started components) before re-raising the error; locate the lifespan function
and calls to get_kafka_manager(), kafka_manager.start(), and
kafka_manager.stop() and ensure partial-start failures trigger
kafka_manager.stop() to avoid dangling producers/consumers.

@aniebietafia aniebietafia merged commit 8c60e6d into main Mar 14, 2026
9 checks passed