feat: Implement & harden Kafka event bus architecture #36
Conversation
- Add app/kafka/ package — topics.py, schemas.py, producer.py, consumer.py, manager.py, exceptions.py
- Integrate KafkaManager singleton lifecycle into FastAPI lifespan (app/main.py)
- Extend /health endpoint with live Kafka broker connectivity check
- Add new Kafka settings to app/core/config.py: acks, auto-offset-reset, retries, backoff
- Inject KafkaProducer into BaseConsumer via KafkaManager.register_consumer() — eliminates circular import
- Replace payload-mutating DLQ with a proper DLQEvent wrapper schema
- Clean up retry logic from fragile while loop to explicit for attempt in range(...)
- Add public ping() and is_started to KafkaProducer; health check no longer accesses private aiokafka internals
- Declare _initialized: bool at class level in KafkaManager for Mypy compliance
- Add tests/test_kafka/ — schema serialization, producer send/ping, consumer retry & DLQ routing

Signed-off-by: aniebietafia <aniebietafia87@gmail.com>
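The retry cleanup mentioned in the summary replaces a fragile `while` flag with an explicit `for attempt in range(...)` loop. A rough sketch of that shape (the function name, linear backoff, and boolean return convention are illustrative assumptions, not the PR's actual code):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def process_with_retry(handle, event, max_retries: int = 3, backoff_s: float = 0.5) -> bool:
    """Run handle(event), retrying a bounded number of times.

    Returns True on success; False once retries are exhausted, so the
    caller can route the event to a dead-letter queue.
    """
    for attempt in range(1, max_retries + 1):
        try:
            await handle(event)
            return True
        except Exception:
            logger.warning("handle() failed on attempt %d/%d", attempt, max_retries)
            if attempt < max_retries:
                # Linear backoff between attempts; the PR's actual policy may differ.
                await asyncio.sleep(backoff_s * attempt)
    return False
```

The bounded loop makes the exit condition obvious: the function returns exactly once per attempt budget, with no mutable loop flag to get out of sync.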
📝 Walkthrough

Adds a Kafka subsystem: producer, consumer base with retries and DLQ, schemas, exceptions, topic constants, a singleton manager, config settings for Kafka and Mailgun, FastAPI lifespan integration and health checks, docker-compose for Kafka stack, and new tests for Kafka components and health endpoints.
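The walkthrough describes a singleton `KafkaManager` that owns the producer and injects it into consumers via `register_consumer()`, which is what eliminates the circular import mentioned in the PR description. A minimal sketch of that pattern; only the names `KafkaManager`, `register_consumer`, and `get_kafka_manager` come from the PR, while the stand-in producer/consumer classes and the `lru_cache` singleton mechanism are assumptions:

```python
from functools import lru_cache


class _ProducerStandIn:
    """Stands in for the real KafkaProducer."""


class ConsumerStandIn:
    """Stands in for BaseConsumer; receives the producer by injection."""

    def __init__(self) -> None:
        self._producer = None


class KafkaManager:
    def __init__(self) -> None:
        self.producer = _ProducerStandIn()
        self.consumers: list[ConsumerStandIn] = []

    def register_consumer(self, consumer: ConsumerStandIn) -> None:
        # Inject the shared producer so consumer modules never import the
        # producer module, which removes the circular import.
        consumer._producer = self.producer
        self.consumers.append(consumer)


@lru_cache(maxsize=1)
def get_kafka_manager() -> KafkaManager:
    # One process-wide instance, matching the singleton lifecycle
    # wired into the FastAPI lifespan.
    return KafkaManager()
```

Because consumers receive the producer at registration time, the dependency flows in one direction: manager depends on both, neither depends on the other.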
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant App as FastAPI
    participant Manager as KafkaManager
    participant Producer as KafkaProducer
    participant Broker as Kafka Broker
    participant Consumer as BaseConsumer
    participant DLQ as DLQ Topic
    App->>Manager: startup (lifespan)
    Manager->>Producer: start()
    Producer->>Broker: connect & fetch metadata
    Manager->>Consumer: start(bootstrap_servers)
    Consumer->>Broker: subscribe & begin consuming
    App->>Producer: send(topic, event)
    Producer->>Broker: publish serialized event
    Broker-->>Consumer: deliver message
    Consumer->>Consumer: _process_with_retry(event)
    Consumer->>Consumer: handle(event) attempt 1
    alt success within retries
        Consumer->>Broker: commit offset
    else exhausted retries
        Consumer->>DLQ: publish DLQEvent (dlq.{topic})
        Consumer->>Broker: commit offset
    end
    App->>Manager: shutdown (lifespan)
    Manager->>Consumer: stop()
    Manager->>Producer: stop()
```

```mermaid
sequenceDiagram
    participant Client as /health endpoint
    participant Manager as KafkaManager
    participant Producer as KafkaProducer
    participant Broker as Kafka Broker
    Client->>Manager: health_check()
    Manager->>Producer: ping()
    Producer->>Broker: force_metadata_update()
    alt Broker reachable
        Broker-->>Producer: success
        Producer-->>Manager: healthy
    else Broker unreachable
        Broker-->>Producer: error/timeout
        Producer-->>Manager: unhealthy (error)
    end
    Manager-->>Client: {status: "ok"|"degraded", services: {kafka: {...}}, version: ...}
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
```python
    return {
        "status": "ok" if kafka_health["status"] == "healthy" else "degraded",
        "version": settings.VERSION,
        "services": {
            "kafka": kafka_health,
        },
    }
```
Check warning (Code scanning / CodeQL): Information exposure through an exception (Medium)
Copilot Autofix (AI, 4 days ago)
In general, the fix is to avoid returning raw exception messages (or anything derived directly from them) to the client. Instead, log the detailed error on the server and return a generic, non-sensitive description in the API response. For this case, that means changing KafkaManager.health_check so that the returned dictionary contains a generic "error" or "details" field, not str(e).
The best minimal fix without changing the external API shape of /health is:
- Keep logging `e` with `logger.error(...)` so operators retain diagnostic information.
- Replace the `"error": str(e)` value in `KafkaManager.health_check` with a constant, generic message such as `"Kafka health check failed"` that does not include exception details.
- Optionally, we can keep an additional non-sensitive hint like `"details": "Unable to reach Kafka broker"` if useful; the key point is to remove direct use of the exception object in the returned payload.
Only app/kafka/manager.py needs modification: in the except Exception as e: block of health_check, adjust the returned dict. No new imports or helper methods are required.
```diff
@@ -83,8 +83,12 @@
             await self.producer.ping()
             return {"status": "healthy"}
         except Exception as e:
-            logger.error(f"Kafka health check failed: {e}")
-            return {"status": "unhealthy", "error": str(e)}
+            # Log full details server-side, but return only a generic message to clients
+            logger.error("Kafka health check failed", exc_info=e)
+            return {
+                "status": "unhealthy",
+                "error": "Kafka health check failed. Please contact the system administrator.",
+            }
```
```python
                f"Unrecoverable error on message from '{self.topic}'. "
                f"Skipping commit — offset will be re-delivered on restart."
            )
        except asyncio.CancelledError:
```
Check notice (Code scanning / CodeQL): Empty except (Note)
Copilot Autofix (AI, 4 days ago)
General approach: avoid a bare pass in the except asyncio.CancelledError block by either logging the cancellation or documenting it with a comment and possibly re-raising. We want to preserve existing shutdown behavior, where cancellation during stop() is treated as normal and does not log as an error.
Best specific fix: on lines 112–113, replace the empty handler:

```python
except asyncio.CancelledError:
    pass
```

with a handler that records that the consumer loop was cancelled in a non-noisy way, e.g.:

```python
except asyncio.CancelledError:
    logger.info(f"Consumer loop for '{self.topic}' was cancelled")
    # Cancellation is expected during shutdown; no further action required.
```

or use logger.debug if you want less frequent logs. This preserves the semantics (we still swallow the cancellation and don't propagate it as an error) but removes the empty except, explains the intent, and satisfies CodeQL. No new imports or helper methods are required; logger and asyncio are already available in this file.
Location details:
- File: `app/kafka/consumer.py`
- Region: the `_consume_loop` method, the outer `try/except` around the `async for msg in self._consumer` loop, currently lines 112–113.
```diff
@@ -110,7 +110,8 @@
             f"Skipping commit — offset will be re-delivered on restart."
         )
     except asyncio.CancelledError:
-        pass
+        logger.info(f"Consumer loop for '{self.topic}' was cancelled")
+        # Cancellation is expected during graceful shutdown; no further action required.
     except Exception:
         logger.exception(f"Unexpected error in consumer loop for '{self.topic}'")
```
Signed-off-by: aniebietafia <aniebietafia87@gmail.com>
Actionable comments posted: 6
🧹 Nitpick comments (4)
docker-compose.yml (2)
66-69: Remove unused `postgres_data` volume declaration.

`postgres_data` is declared on Line 67 but no Postgres service mounts it. This is stale config and adds confusion during local setup.

♻️ Proposed cleanup

```diff
 volumes:
-  postgres_data:
   redis_data:
   kafka_data:
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docker-compose.yml` around lines 66 - 69, Remove the stale volume declaration postgres_data from the volumes section in docker-compose.yml since no Postgres service mounts it; delete the postgres_data entry and verify no services (e.g., any compose service configs) reference postgres_data elsewhere to avoid breaking mounts.
3-64: Document/align app container env values for Docker networking.

Given `app/core/config.py` and `.env.example` default to `localhost` for Redis/Kafka, a containerized app in this compose network must use `REDIS_HOST=redis` and `KAFKA_BOOTSTRAP_SERVERS=kafka:29092`; otherwise connections will fail. Please add this to compose docs or an app service/env override.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docker-compose.yml` around lines 3 - 64, The compose file currently exposes redis and kafka services on the network but the app defaults in app/core/config.py and .env.example point to localhost; update docker-compose to provide container-friendly environment overrides (e.g., set REDIS_HOST=redis and KAFKA_BOOTSTRAP_SERVERS=kafka:29092) for the application service or add clear docs in the compose comments explaining that in-container apps must use service names (redis, kafka:29092) instead of localhost; refer to the redis and kafka service names in the diff and the REDIS_HOST / KAFKA_BOOTSTRAP_SERVERS env keys to locate where to add the overrides.

tests/test_main.py (1)
6-6: Module-level TestClient triggers lifespan during import.

Instantiating `TestClient(app)` at module level means the FastAPI lifespan (which calls `kafka_manager.start()`) runs when the test module is imported. If Kafka is unavailable, this could cause import-time failures affecting other tests. Consider using a pytest fixture with appropriate scope, or wrapping in a context manager within each test.
♻️ Proposed fix using fixture

```diff
-client = TestClient(app)
+
+@pytest.fixture(scope="module")
+def client():
+    with TestClient(app) as c:
+        yield c

-def test_health_check():
+def test_health_check(client):
     response = client.get("/health")
```

Don't forget to add `import pytest` at the top of the file.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_main.py` at line 6, The TestClient instantiation at module level causes FastAPI lifespan (which calls kafka_manager.start()) to run on import; change this by creating a pytest fixture that yields TestClient(app) (import pytest at top) and use that fixture in tests or instantiate TestClient within a context manager inside each test so the lifespan runs per-test and doesn't trigger kafka_manager.start() during module import; update references to the module-level client to use the new fixture name (or local context) instead.

app/kafka/schemas.py (1)
17-17: Consider using `datetime.now(timezone.utc)` instead of `datetime.utcnow()`.

`datetime.utcnow()` is deprecated since Python 3.12 and emits warnings. The recommended replacement is `datetime.now(timezone.utc)`, which returns a timezone-aware datetime.

♻️ Proposed fix

```diff
+from datetime import datetime, timezone
-from datetime import datetime

-    timestamp: datetime = Field(default_factory=datetime.utcnow)
+    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
```

Apply the same change to `DLQEvent.failed_at` on line 31.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/kafka/schemas.py` at line 17, Replace naive UTC construction with timezone-aware datetimes: change the Field default for timestamp (timestamp: datetime = Field(default_factory=datetime.utcnow)) to use datetime.now(timezone.utc) and do the same for DLQEvent.failed_at; update imports if necessary to include timezone from datetime and ensure both Field default_factory call sites reference datetime.now(timezone.utc) so the produced datetimes are timezone-aware.
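Combining the `DLQEvent` fields used in the consumer's `_send_to_dlq()` (quoted later in this review) with the timezone-aware default suggested above gives roughly this shape. The real model is Pydantic; this stdlib `dataclass` is only a sketch of the schema, and the class name suffix marks it as such:

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any


def _utcnow() -> datetime:
    # Timezone-aware replacement for the deprecated datetime.utcnow()
    return datetime.now(timezone.utc)


@dataclass
class DLQEventSketch:
    """Wrapper for a failed event, instead of mutating the original payload."""

    original_event_id: uuid.UUID
    original_topic: str
    original_event: dict[str, Any]
    error_message: str
    retry_count: int
    failed_at: datetime = field(default_factory=_utcnow)
```

Keeping the original event untouched inside a wrapper means DLQ consumers can replay it verbatim once the failure cause is fixed.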
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@app/kafka/consumer.py`:
- Around line 147-176: The DLQ routine _send_to_dlq currently catches and
swallows all exceptions which allows _consume_loop to commit offsets after
_process_with_retry returns; change _send_to_dlq to propagate failures instead
of absorbing them — after logging the failure in the except block re-raise the
exception (or raise a specific DLQPublishError) so callers (_process_with_retry
and ultimately _consume_loop) can detect the DLQ publish failure and avoid
committing the original offset; ensure callers either catch and handle that
exception or let it bubble up to prevent message commit.
- Around line 59-76: The guard sets self._running = True before awaiting
self._consumer.start(), which leaves the consumer stuck if start() raises;
change the start sequence in the start method so you first construct
AIOKafkaConsumer, await self._consumer.start(), then set self._running = True
and only then create the asyncio task for self._consume_loop(); additionally, if
start() fails ensure you do not leave self._consumer or self._running in an
inconsistent state (set self._running = False and cleanup/close self._consumer
on exception) so subsequent retries can proceed.
In `@app/kafka/manager.py`:
- Around line 51-72: When starting in KafkaManager.start(), roll back partial
startup if a consumer.start(...) raises: wrap the consumer start loop so that on
exception you iterate over already-started consumers and call their stop(), then
stop self.producer, and re-raise the original error; reference start(),
consumers, producer, and settings.KAFKA_BOOTSTRAP_SERVERS. Likewise harden
KafkaManager.stop() so exceptions from one consumer.stop() do not abort the rest
— catch and log per-consumer exceptions, continue stopping remaining consumers,
and ensure producer.stop() is always attempted in a finally block (logging any
errors) so full cleanup always runs.
- Around line 82-87: The health_check() in manager.py currently returns str(e)
which can leak broker details; change it so producer.ping() exceptions are
logged with full details via logger.exception or logger.error(...,
exc_info=True) but the returned payload only contains a generic error (e.g.,
{"status":"unhealthy","error":"unavailable"} or omit "error"), so callers
(app/main.py -> services.kafka) never see sensitive broker/auth/TLS info; keep
the exception capture around await self.producer.ping(), reference health_check,
self.producer.ping, and logger when making these changes.
In `@tests/test_kafka/test_consumer.py`:
- Around line 1-2: Remove the unused asyncio import and fix the unused parameter
in the flaky_handle function: delete the top-level "asyncio" import from
tests/test_kafka/test_consumer.py (since it's unused) and update the
flaky_handle signature to either remove the unused callback parameter or rename
it to _callback (or use *_, _unused) so the linter no longer flags an unused
parameter; ensure references to flaky_handle remain consistent where it's used
in the tests.
In `@tests/test_kafka/test_schemas.py`:
- Around line 1-5: Remove the unused top-level import "pytest" from the test
module (it's currently imported alongside uuid and datetime) to satisfy linting
(ruff F401) and isort checks; locate the import line that reads "import pytest"
in the test_schemas module and delete it, and run the tests/linter to confirm no
other references to "pytest" remain.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: a4d1e138-1277-44a4-a782-297910d64cc8
📒 Files selected for processing (16)
- app/core/config.py
- app/kafka/__init__.py
- app/kafka/consumer.py
- app/kafka/exceptions.py
- app/kafka/manager.py
- app/kafka/producer.py
- app/kafka/schemas.py
- app/kafka/topics.py
- app/main.py
- docker-compose.yml
- infra/docker-compose.yml
- tests/test_error_handling.py
- tests/test_kafka/test_consumer.py
- tests/test_kafka/test_producer.py
- tests/test_kafka/test_schemas.py
- tests/test_main.py
💤 Files with no reviewable changes (1)
- infra/docker-compose.yml
```python
        if self._running:
            return

        self._running = True
        self._consumer = AIOKafkaConsumer(
            self.topic,
            bootstrap_servers=bootstrap_servers,
            group_id=self.group_id,
            auto_offset_reset=settings.KAFKA_CONSUMER_AUTO_OFFSET_RESET,
            # Manual commit: offsets are committed only after handle() succeeds,
            # preventing silent message loss on pod restart mid-retry.
            enable_auto_commit=False,
            value_deserializer=lambda v: json.loads(v.decode("utf-8")),
        )
        await self._consumer.start()
        logger.info(f"Consumer for '{self.topic}' (group: '{self.group_id}') started")

        self._task = asyncio.create_task(self._consume_loop())
```
Only set _running after the consumer actually starts.
If await self._consumer.start() fails, _running stays True and the guard at Line 59 turns every later retry into a no-op. That leaves the consumer wedged until the process is restarted.
Suggested fix

```diff
-        self._running = True
-        self._consumer = AIOKafkaConsumer(
+        consumer = AIOKafkaConsumer(
             self.topic,
             bootstrap_servers=bootstrap_servers,
             group_id=self.group_id,
             auto_offset_reset=settings.KAFKA_CONSUMER_AUTO_OFFSET_RESET,
             # Manual commit: offsets are committed only after handle() succeeds,
             # preventing silent message loss on pod restart mid-retry.
             enable_auto_commit=False,
             value_deserializer=lambda v: json.loads(v.decode("utf-8")),
         )
-        await self._consumer.start()
+        try:
+            await consumer.start()
+        except Exception:
+            with contextlib.suppress(Exception):
+                await consumer.stop()
+            raise
+        self._consumer = consumer
+        self._running = True
         logger.info(f"Consumer for '{self.topic}' (group: '{self.group_id}') started")
```
```python
    async def _send_to_dlq(
        self, event: BaseEvent[Any], error_message: str, retries: int
    ) -> None:
        """
        Forward a failed event to its Dead Letter Queue topic.
        Wraps it in a DLQEvent — a proper structured schema — instead of
        mutating the original event payload.
        """
        dlq_topic = f"{DLQ_PREFIX}{self.topic}"
        dlq_event = DLQEvent(
            original_event_id=event.event_id,
            original_topic=self.topic,
            original_event=event.model_dump(),
            error_message=error_message,
            retry_count=retries,
        )

        try:
            # Use the injected producer directly — no circular import needed
            dlq_payload = dlq_event.model_dump()
            await self._producer._producer.send_and_wait(  # type: ignore[union-attr]
                dlq_topic,
                value=json.dumps(dlq_payload, default=str).encode("utf-8"),
            )
            logger.info(f"Event {event.event_id} forwarded to DLQ topic '{dlq_topic}'")
        except Exception:
            logger.exception(
                f"CRITICAL: Failed to forward event {event.event_id} "
                f"to '{dlq_topic}'. Event is permanently lost."
            )
```
Propagate DLQ publish failures so the original offset is not committed.
_consume_loop() commits at Lines 103-106 whenever _process_with_retry() returns. Because _send_to_dlq() swallows send errors, a broken DLQ path still lets the original message get committed and lost without either successful handling or dead-lettering.
Suggested fix

```diff
         try:
             # Use the injected producer directly — no circular import needed
             dlq_payload = dlq_event.model_dump()
             await self._producer._producer.send_and_wait(  # type: ignore[union-attr]
                 dlq_topic,
                 value=json.dumps(dlq_payload, default=str).encode("utf-8"),
             )
             logger.info(f"Event {event.event_id} forwarded to DLQ topic '{dlq_topic}'")
         except Exception:
             logger.exception(
                 f"CRITICAL: Failed to forward event {event.event_id} "
                 f"to '{dlq_topic}'. Event is permanently lost."
             )
+            raise
```
```python
    async def start(self) -> None:
        """Start the producer, then all registered consumers."""
        logger.info("Starting Kafka Manager...")
        await self.producer.start()

        for consumer in self.consumers:
            # Pass bootstrap_servers at start-time — consumers don't store it
            await consumer.start(bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS)

        logger.info(
            f"Kafka Manager started — {len(self.consumers)} consumer(s) running"
        )

    async def stop(self) -> None:
        """Stop all consumers first, then the producer."""
        logger.info("Stopping Kafka Manager...")

        for consumer in self.consumers:
            await consumer.stop()

        await self.producer.stop()
        logger.info("Kafka Manager stopped")
```
Rollback partial startup and keep shutdown cleanup going.
If one consumer.start() raises after the producer has already started, app/main.py Lines 11-18 never reach shutdown and the earlier resources stay alive. The shutdown path has the mirror problem: one consumer.stop() exception prevents the remaining consumers and the producer from being stopped.
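The rollback the comment asks for can be sketched roughly as follows. The class here is a stand-in (the real `KafkaManager` takes no constructor arguments and reads `settings.KAFKA_BOOTSTRAP_SERVERS` itself), so treat this as illustrative of the shape only:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class ManagerSketch:
    """Stand-in for KafkaManager showing only the rollback shape."""

    def __init__(self, producer, consumers):
        self.producer = producer
        self.consumers = consumers

    async def start(self, bootstrap_servers: str) -> None:
        await self.producer.start()
        started = []
        try:
            for consumer in self.consumers:
                await consumer.start(bootstrap_servers=bootstrap_servers)
                started.append(consumer)
        except Exception:
            # Roll back partial startup, then re-raise the original error.
            for consumer in started:
                try:
                    await consumer.stop()
                except Exception:
                    logger.exception("Rollback: consumer stop failed")
            await self.producer.stop()
            raise

    async def stop(self) -> None:
        # One failing consumer must not prevent the rest from stopping.
        try:
            for consumer in self.consumers:
                try:
                    await consumer.stop()
                except Exception:
                    logger.exception("Consumer stop failed; continuing")
        finally:
            await self.producer.stop()
```

Tracking `started` separately means only successfully started consumers are rolled back, and the `finally` in `stop()` guarantees the producer is always stopped last.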
```python
        try:
            await self.producer.ping()
            return {"status": "healthy"}
        except Exception as e:
            logger.error(f"Kafka health check failed: {e}")
            return {"status": "unhealthy", "error": str(e)}
```
Don't leak broker errors through /health.
health_check() returns str(e), and app/main.py Lines 45-50 expose that payload directly under services.kafka. That can leak broker addresses, auth failures, or TLS details to any caller of the health endpoint.
Suggested fix

```diff
-        except Exception as e:
-            logger.error(f"Kafka health check failed: {e}")
-            return {"status": "unhealthy", "error": str(e)}
+        except Exception:
+            logger.exception("Kafka health check failed")
+            return {"status": "unhealthy"}
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
        try:
            await self.producer.ping()
            return {"status": "healthy"}
        except Exception:
            logger.exception("Kafka health check failed")
            return {"status": "unhealthy"}
```
Signed-off-by: aniebietafia <aniebietafia87@gmail.com>
Actionable comments posted: 1
♻️ Duplicate comments (1)
app/main.py (1)
42-52: ⚠️ Potential issue | 🟡 Minor

Sensitive error details may be exposed to external users.

The `kafka_health` dict may contain an `"error"` field with internal exception details (per `manager.py` line 86: `"error": str(e)`). Exposing raw exception messages can leak implementation details. Consider sanitizing or omitting the error field in the public response.

🛡️ Proposed fix to sanitize error details

```diff
 @app.get("/health", tags=["health"])
 async def health_check() -> dict:
     kafka_manager = get_kafka_manager()
     kafka_health = await kafka_manager.health_check()
+    # Sanitize error details before exposing to clients
+    sanitized_kafka_health = {"status": kafka_health["status"]}
+    if kafka_health["status"] == "unhealthy":
+        sanitized_kafka_health["error"] = "Kafka connection failed"
+
     return {
         "status": "ok" if kafka_health["status"] == "healthy" else "degraded",
         "version": settings.VERSION,
         "services": {
-            "kafka": kafka_health,
+            "kafka": sanitized_kafka_health,
         },
     }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/main.py` around lines 42 - 52, The health_check function currently returns kafka_health directly which may include raw exception text; update app.main.health_check to sanitize or omit sensitive fields from kafka_health before returning (call get_kafka_manager() and await kafka_manager.health_check() as before, then remove or replace kafka_health["error"] with a generic message like "unavailable" or omit the key entirely when the status isn't healthy). Ensure the returned dict still includes "status", "version" (settings.VERSION) and a sanitized "services": {"kafka": ...} object so no internal exception strings are exposed.
🧹 Nitpick comments (1)
app/main.py (1)
42-42: Consider a stricter return type annotation.

Using `dict` as the return type loses type safety. A `TypedDict` or Pydantic response model would provide better documentation and validation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/main.py` at line 42, The health_check function currently returns a bare dict which loses type safety; replace its return annotation with a concrete response model (either a typing.TypedDict like HealthCheckResponse or a Pydantic model like HealthCheckResponseModel) and update the function signature async def health_check() -> HealthCheckResponse/HealthCheckResponseModel, add the corresponding import and definition (ensure keys match the actual returned payload, e.g., status, version, uptime), and update any callers/tests to use the new type so the function both documents and validates its response shape.
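One possible stricter shape uses `TypedDict`. The type names and the helper function below are hypothetical, not from the PR; they only mirror the `/health` payload built in `app/main.py`:

```python
from typing import Literal, TypedDict


class KafkaHealth(TypedDict, total=False):
    status: str  # "healthy" | "unhealthy"
    error: str   # present only when unhealthy


class HealthResponse(TypedDict):
    status: Literal["ok", "degraded"]
    version: str
    services: dict[str, KafkaHealth]


def build_health_response(kafka_health: KafkaHealth, version: str) -> HealthResponse:
    # Mirrors the payload shape returned by the /health endpoint.
    return {
        "status": "ok" if kafka_health["status"] == "healthy" else "degraded",
        "version": version,
        "services": {"kafka": kafka_health},
    }
```

A Pydantic `response_model` would go further by validating the payload at runtime and documenting it in the OpenAPI schema, at the cost of a small serialization overhead.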
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@app/main.py`:
- Around line 12-19: The lifespan asynccontextmanager can leave Kafka resources
running if kafka_manager.start() raises after partially starting; modify the
lifespan function to wrap the startup in try/except/finally (or try/except with
explicit cleanup) so that on any exception during kafka_manager.start() you call
kafka_manager.stop() (or otherwise clean up started components) before
re-raising the error; locate the lifespan function and calls to
get_kafka_manager(), kafka_manager.start(), and kafka_manager.stop() and ensure
partial-start failures trigger kafka_manager.stop() to avoid dangling
producers/consumers.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 15762539-9526-481a-a9bf-cfb927ff3cc4
📒 Files selected for processing (3)
- app/main.py
- tests/test_kafka/test_consumer.py
- tests/test_kafka/test_schemas.py
🚧 Files skipped from review as they are similar to previous changes (2)
- tests/test_kafka/test_consumer.py
- tests/test_kafka/test_schemas.py
```python
@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
    # Startup
    kafka_manager = get_kafka_manager()
    await kafka_manager.start()
    yield
    # Shutdown
    await kafka_manager.stop()
```
Startup failure leaves resources dangling.
If kafka_manager.start() fails partway through (e.g., producer starts but a consumer fails), the code after yield never executes, leaving the producer running. Wrap the startup in try/except to ensure cleanup on partial failure.
🛠️ Proposed fix

```diff
 @asynccontextmanager
 async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
-    # Startup
     kafka_manager = get_kafka_manager()
-    await kafka_manager.start()
-    yield
-    # Shutdown
-    await kafka_manager.stop()
+    try:
+        await kafka_manager.start()
+        yield
+    finally:
+        await kafka_manager.stop()
```
Verify each finding against the current code and only fix it if needed.
In `@app/main.py` around lines 12 - 19, The lifespan asynccontextmanager can leave
Kafka resources running if kafka_manager.start() raises after partially
starting; modify the lifespan function to wrap the startup in try/except/finally
(or try/except with explicit cleanup) so that on any exception during
kafka_manager.start() you call kafka_manager.stop() (or otherwise clean up
started components) before re-raising the error; locate the lifespan function
and calls to get_kafka_manager(), kafka_manager.start(), and
kafka_manager.stop() and ensure partial-start failures trigger
kafka_manager.stop() to avoid dangling producers/consumers.
Summary by CodeRabbit
New Features
Chores