-
Notifications
You must be signed in to change notification settings - Fork 0
feat: Implement & harden Kafka event bus architecture #36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| from app.kafka.manager import KafkaManager, get_kafka_manager | ||
| from app.kafka.producer import KafkaProducer | ||
| from app.kafka.schemas import BaseEvent | ||
|
|
||
| __all__ = ["BaseEvent", "KafkaManager", "KafkaProducer", "get_kafka_manager"] |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,180 @@ | ||||||||||||||||||||||||||||||||
| import abc | ||||||||||||||||||||||||||||||||
| import asyncio | ||||||||||||||||||||||||||||||||
| import contextlib | ||||||||||||||||||||||||||||||||
| import json | ||||||||||||||||||||||||||||||||
| import logging | ||||||||||||||||||||||||||||||||
| from typing import Any | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| from aiokafka import AIOKafkaConsumer # type: ignore[import-untyped] | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| from app.core.config import settings | ||||||||||||||||||||||||||||||||
| from app.kafka.schemas import BaseEvent, DLQEvent | ||||||||||||||||||||||||||||||||
| from app.kafka.topics import DLQ_PREFIX | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| logger = logging.getLogger(__name__) | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| class BaseConsumer(abc.ABC): | ||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||
| Abstract base class for all Kafka consumers. | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| Subclasses must declare class-level attributes: | ||||||||||||||||||||||||||||||||
| topic: str — the Kafka topic to subscribe to | ||||||||||||||||||||||||||||||||
| group_id: str — the consumer group identifier | ||||||||||||||||||||||||||||||||
| event_schema: Type — the Pydantic BaseEvent subclass for deserialization | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| Features: | ||||||||||||||||||||||||||||||||
| - Manual offset commits (offsets only committed after successful handle()) | ||||||||||||||||||||||||||||||||
| - Configurable linear backoff retry with KAFKA_MAX_RETRIES | ||||||||||||||||||||||||||||||||
| - Dead-letter queue (DLQ) forwarding via a proper DLQEvent wrapper | ||||||||||||||||||||||||||||||||
| - Graceful shutdown via asyncio.Task cancellation | ||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| topic: str | ||||||||||||||||||||||||||||||||
| group_id: str | ||||||||||||||||||||||||||||||||
| event_schema: type[BaseEvent[Any]] | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| # Declared here so Mypy can track it on the class body | ||||||||||||||||||||||||||||||||
| _initialized: bool = False | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| def __init__(self, producer: Any) -> None: | ||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||||||
| producer: A KafkaProducer instance injected by KafkaManager. | ||||||||||||||||||||||||||||||||
| Used to forward failed events to the DLQ. | ||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||
| # Import here to avoid a circular module-level import | ||||||||||||||||||||||||||||||||
| from app.kafka.producer import KafkaProducer | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| self._producer: KafkaProducer = producer | ||||||||||||||||||||||||||||||||
| self._consumer: AIOKafkaConsumer | None = None | ||||||||||||||||||||||||||||||||
| self._running: bool = False | ||||||||||||||||||||||||||||||||
| self._task: asyncio.Task[None] | None = None | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| async def start(self, bootstrap_servers: str) -> None: | ||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||
| Start the consumer background task. | ||||||||||||||||||||||||||||||||
| Called by KafkaManager, which supplies the bootstrap_servers string. | ||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||
| if self._running: | ||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| self._running = True | ||||||||||||||||||||||||||||||||
| self._consumer = AIOKafkaConsumer( | ||||||||||||||||||||||||||||||||
| self.topic, | ||||||||||||||||||||||||||||||||
| bootstrap_servers=bootstrap_servers, | ||||||||||||||||||||||||||||||||
| group_id=self.group_id, | ||||||||||||||||||||||||||||||||
| auto_offset_reset=settings.KAFKA_CONSUMER_AUTO_OFFSET_RESET, | ||||||||||||||||||||||||||||||||
| # Manual commit: offsets are committed only after handle() succeeds, | ||||||||||||||||||||||||||||||||
| # preventing silent message loss on pod restart mid-retry. | ||||||||||||||||||||||||||||||||
| enable_auto_commit=False, | ||||||||||||||||||||||||||||||||
| value_deserializer=lambda v: json.loads(v.decode("utf-8")), | ||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||
| await self._consumer.start() | ||||||||||||||||||||||||||||||||
| logger.info(f"Consumer for '{self.topic}' (group: '{self.group_id}') started") | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| self._task = asyncio.create_task(self._consume_loop()) | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| async def stop(self) -> None: | ||||||||||||||||||||||||||||||||
| """Stop the consumer background task gracefully.""" | ||||||||||||||||||||||||||||||||
| self._running = False | ||||||||||||||||||||||||||||||||
| if self._task: | ||||||||||||||||||||||||||||||||
| self._task.cancel() | ||||||||||||||||||||||||||||||||
| with contextlib.suppress(asyncio.CancelledError): | ||||||||||||||||||||||||||||||||
| await self._task | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| if self._consumer: | ||||||||||||||||||||||||||||||||
| await self._consumer.stop() | ||||||||||||||||||||||||||||||||
| self._consumer = None | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| logger.info(f"Consumer for '{self.topic}' stopped") | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| async def _consume_loop(self) -> None: | ||||||||||||||||||||||||||||||||
| """Main consumption loop.""" | ||||||||||||||||||||||||||||||||
| if not self._consumer: | ||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||
| async for msg in self._consumer: | ||||||||||||||||||||||||||||||||
| if not self._running: | ||||||||||||||||||||||||||||||||
| break | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||
| event = self.event_schema.model_validate(msg.value) | ||||||||||||||||||||||||||||||||
| await self._process_with_retry(event) | ||||||||||||||||||||||||||||||||
| # Only commit after successful processing | ||||||||||||||||||||||||||||||||
| await self._consumer.commit() | ||||||||||||||||||||||||||||||||
| except Exception: | ||||||||||||||||||||||||||||||||
| logger.exception( | ||||||||||||||||||||||||||||||||
| f"Unrecoverable error on message from '{self.topic}'. " | ||||||||||||||||||||||||||||||||
| f"Skipping commit — offset will be re-delivered on restart." | ||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||
| except asyncio.CancelledError: | ||||||||||||||||||||||||||||||||
Check noticeCode scanning / CodeQL Empty except Note
'except' clause does nothing but pass and there is no explanatory comment.
Copilot AutofixAI 5 days ago General approach: avoid a bare Best specific fix: on line 112–113, replace the empty handler: except asyncio.CancelledError:
passwith a handler that records that the consumer loop was cancelled in a non-noisy way, e.g.: except asyncio.CancelledError:
logger.info(f"Consumer loop for '{self.topic}' was cancelled")
# Cancellation is expected during shutdown; no further action required.or use Location details:
Suggested changeset
1
app/kafka/consumer.py
Copilot is powered by AI and may make mistakes. Always verify output.
Refresh and try again.
|
||||||||||||||||||||||||||||||||
| pass | ||||||||||||||||||||||||||||||||
| except Exception: | ||||||||||||||||||||||||||||||||
| logger.exception(f"Unexpected error in consumer loop for '{self.topic}'") | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| async def _process_with_retry(self, event: BaseEvent[Any]) -> None: | ||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||
| Process an event with linear backoff retries. | ||||||||||||||||||||||||||||||||
| After exhausting all retries, routes the event to the DLQ. | ||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||
| last_error: Exception | None = None | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| for attempt in range(settings.KAFKA_MAX_RETRIES + 1): | ||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||
| await self.handle(event) | ||||||||||||||||||||||||||||||||
| return # Success | ||||||||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||||||||
| last_error = e | ||||||||||||||||||||||||||||||||
| if attempt < settings.KAFKA_MAX_RETRIES: | ||||||||||||||||||||||||||||||||
| wait_secs = (settings.KAFKA_RETRY_BACKOFF_MS / 1000) * (attempt + 1) | ||||||||||||||||||||||||||||||||
| logger.warning( | ||||||||||||||||||||||||||||||||
| f"Retry {attempt + 1}/{settings.KAFKA_MAX_RETRIES} " | ||||||||||||||||||||||||||||||||
| f"for event {event.event_id} in {wait_secs:.1f}s. " | ||||||||||||||||||||||||||||||||
| f"Reason: {e}" | ||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||
| await asyncio.sleep(wait_secs) | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| logger.error( | ||||||||||||||||||||||||||||||||
| f"Event {event.event_id} failed after " | ||||||||||||||||||||||||||||||||
| f"{settings.KAFKA_MAX_RETRIES} retries. Routing to DLQ." | ||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||
| await self._send_to_dlq( | ||||||||||||||||||||||||||||||||
| event, str(last_error), retries=settings.KAFKA_MAX_RETRIES | ||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| async def _send_to_dlq( | ||||||||||||||||||||||||||||||||
| self, event: BaseEvent[Any], error_message: str, retries: int | ||||||||||||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||
| Forward a failed event to its Dead Letter Queue topic. | ||||||||||||||||||||||||||||||||
| Wraps it in a DLQEvent — a proper structured schema — instead of | ||||||||||||||||||||||||||||||||
| mutating the original event payload. | ||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||
| dlq_topic = f"{DLQ_PREFIX}{self.topic}" | ||||||||||||||||||||||||||||||||
| dlq_event = DLQEvent( | ||||||||||||||||||||||||||||||||
| original_event_id=event.event_id, | ||||||||||||||||||||||||||||||||
| original_topic=self.topic, | ||||||||||||||||||||||||||||||||
| original_event=event.model_dump(), | ||||||||||||||||||||||||||||||||
| error_message=error_message, | ||||||||||||||||||||||||||||||||
| retry_count=retries, | ||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||
| # Use the injected producer directly — no circular import needed | ||||||||||||||||||||||||||||||||
| dlq_payload = dlq_event.model_dump() | ||||||||||||||||||||||||||||||||
| await self._producer._producer.send_and_wait( # type: ignore[union-attr] | ||||||||||||||||||||||||||||||||
| dlq_topic, | ||||||||||||||||||||||||||||||||
| value=json.dumps(dlq_payload, default=str).encode("utf-8"), | ||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||
| logger.info(f"Event {event.event_id} forwarded to DLQ topic '{dlq_topic}'") | ||||||||||||||||||||||||||||||||
| except Exception: | ||||||||||||||||||||||||||||||||
| logger.exception( | ||||||||||||||||||||||||||||||||
| f"CRITICAL: Failed to forward event {event.event_id} " | ||||||||||||||||||||||||||||||||
| f"to '{dlq_topic}'. Event is permanently lost." | ||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||
|
Comment on lines
+147
to
+176
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Propagate DLQ publish failures so the original offset is not committed.
Suggested fix try:
# Use the injected producer directly — no circular import needed
dlq_payload = dlq_event.model_dump()
await self._producer._producer.send_and_wait( # type: ignore[union-attr]
dlq_topic,
value=json.dumps(dlq_payload, default=str).encode("utf-8"),
)
logger.info(f"Event {event.event_id} forwarded to DLQ topic '{dlq_topic}'")
except Exception:
logger.exception(
f"CRITICAL: Failed to forward event {event.event_id} "
f"to '{dlq_topic}'. Event is permanently lost."
)
+ raise🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| @abc.abstractmethod | ||||||||||||||||||||||||||||||||
| async def handle(self, event: BaseEvent[Any]) -> None: | ||||||||||||||||||||||||||||||||
| """Implement message processing logic in subclasses.""" | ||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| from app.core.exceptions import FluentMeetException | ||
|
|
||
|
|
||
| class KafkaError(FluentMeetException): | ||
| """Base exception for Kafka-related errors.""" | ||
|
|
||
| def __init__(self, message: str, code: str = "KAFKA_ERROR") -> None: | ||
| super().__init__(status_code=500, code=code, message=message) | ||
|
|
||
|
|
||
| class KafkaConnectionError(KafkaError): | ||
| def __init__(self, message: str = "Failed to connect to Kafka broker") -> None: | ||
| super().__init__(message, code="KAFKA_CONNECTION_ERROR") | ||
|
|
||
|
|
||
| class KafkaPublishError(KafkaError): | ||
| def __init__(self, message: str = "Failed to publish message to Kafka") -> None: | ||
| super().__init__(message, code="KAFKA_PUBLISH_ERROR") | ||
|
|
||
|
|
||
| class KafkaConsumeError(KafkaError): | ||
| def __init__(self, message: str = "Failed to consume message from Kafka") -> None: | ||
| super().__init__(message, code="KAFKA_CONSUME_ERROR") |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,92 @@ | ||||||||||||||||||||||||||
| import logging | ||||||||||||||||||||||||||
| from typing import Optional | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| from app.core.config import settings | ||||||||||||||||||||||||||
| from app.kafka.consumer import BaseConsumer | ||||||||||||||||||||||||||
| from app.kafka.producer import KafkaProducer | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| logger = logging.getLogger(__name__) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| class KafkaManager: | ||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||
| Singleton manager responsible for Kafka producer and consumer lifecycles. | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| Usage: | ||||||||||||||||||||||||||
| manager = get_kafka_manager() | ||||||||||||||||||||||||||
| manager.register_consumer(MyEmailConsumer()) | ||||||||||||||||||||||||||
| await manager.start() # called from FastAPI lifespan | ||||||||||||||||||||||||||
| await manager.stop() # called from FastAPI lifespan | ||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| _instance: Optional["KafkaManager"] = None | ||||||||||||||||||||||||||
| _initialized: bool = False | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| def __new__(cls) -> "KafkaManager": | ||||||||||||||||||||||||||
| if cls._instance is None: | ||||||||||||||||||||||||||
| cls._instance = super().__new__(cls) | ||||||||||||||||||||||||||
| cls._instance._initialized = False | ||||||||||||||||||||||||||
| return cls._instance | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| def __init__(self) -> None: | ||||||||||||||||||||||||||
| if self._initialized: | ||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| self.producer = KafkaProducer( | ||||||||||||||||||||||||||
| bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
| self.consumers: list[BaseConsumer] = [] | ||||||||||||||||||||||||||
| self._initialized = True | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| def register_consumer(self, consumer: BaseConsumer) -> None: | ||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||
| Register a consumer to be started when the manager starts. | ||||||||||||||||||||||||||
| The producer is injected into the consumer at this point so it | ||||||||||||||||||||||||||
| can access it for DLQ forwarding without a circular import. | ||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||
| consumer._producer = self.producer | ||||||||||||||||||||||||||
| self.consumers.append(consumer) | ||||||||||||||||||||||||||
| logger.info(f"Registered consumer for topic: '{consumer.topic}'") | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| async def start(self) -> None: | ||||||||||||||||||||||||||
| """Start the producer, then all registered consumers.""" | ||||||||||||||||||||||||||
| logger.info("Starting Kafka Manager...") | ||||||||||||||||||||||||||
| await self.producer.start() | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| for consumer in self.consumers: | ||||||||||||||||||||||||||
| # Pass bootstrap_servers at start-time — consumers don't store it | ||||||||||||||||||||||||||
| await consumer.start(bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| logger.info( | ||||||||||||||||||||||||||
| f"Kafka Manager started — {len(self.consumers)} consumer(s) running" | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| async def stop(self) -> None: | ||||||||||||||||||||||||||
| """Stop all consumers first, then the producer.""" | ||||||||||||||||||||||||||
| logger.info("Stopping Kafka Manager...") | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| for consumer in self.consumers: | ||||||||||||||||||||||||||
| await consumer.stop() | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| await self.producer.stop() | ||||||||||||||||||||||||||
| logger.info("Kafka Manager stopped") | ||||||||||||||||||||||||||
|
Comment on lines
+51
to
+72
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rollback partial startup and keep shutdown cleanup going. If one 🤖 Prompt for AI Agents |
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| async def health_check(self) -> dict: | ||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||
| Verify Kafka broker connectivity via a metadata probe. | ||||||||||||||||||||||||||
| Uses the public producer.ping() API — no private attribute access. | ||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||
| if not self.producer.is_started: | ||||||||||||||||||||||||||
| return {"status": "uninitialized", "details": "Producer not started"} | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||
| await self.producer.ping() | ||||||||||||||||||||||||||
| return {"status": "healthy"} | ||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||
| logger.error(f"Kafka health check failed: {e}") | ||||||||||||||||||||||||||
| return {"status": "unhealthy", "error": str(e)} | ||||||||||||||||||||||||||
|
Comment on lines
+82
to
+87
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't leak broker errors through
Suggested fix- except Exception as e:
- logger.error(f"Kafka health check failed: {e}")
- return {"status": "unhealthy", "error": str(e)}
+ except Exception:
+ logger.exception("Kafka health check failed")
+ return {"status": "unhealthy"}📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| def get_kafka_manager() -> KafkaManager: | ||||||||||||||||||||||||||
| """Return the KafkaManager singleton.""" | ||||||||||||||||||||||||||
| return KafkaManager() | ||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only set
_runningafter the consumer actually starts.If
await self._consumer.start()fails,_runningstaysTrueand the guard at Line 59 turns every later retry into a no-op. That leaves the consumer wedged until the process is restarted.Suggested fix
🤖 Prompt for AI Agents