From 6922300f0f9f297acfa5895aa328d794c0cee6d9 Mon Sep 17 00:00:00 2001 From: aniebietafia Date: Sat, 14 Mar 2026 21:12:28 +0100 Subject: [PATCH 1/3] feat: Implement & harden Kafka event bus architecture MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add app/kafka/ package — topics.py, schemas.py, producer.py, consumer.py, manager.py, exceptions.py - Integrate KafkaManager singleton lifecycle into FastAPI lifespan (app/main.py) - Extend /health endpoint with live Kafka broker connectivity check - Add new Kafka settings to app/core/config.py: acks, auto-offset-reset, retries, backoff - Inject KafkaProducer into BaseConsumer via KafkaManager.register_consumer() — eliminates circular import - Replace payload-mutating DLQ with a proper DLQEvent wrapper schema - Clean up retry logic from fragile while loop to explicit for attempt in range(...) - Add public ping() and is_started to KafkaProducer; health check no longer accesses private aiokafka internals - Declare _initialized: bool at class level in KafkaManager for Mypy compliance - Add tests/test_kafka/ — schema serialization, producer send/ping, consumer retry & DLQ routing Signed-off-by: aniebietafia --- app/core/config.py | 9 ++ app/kafka/__init__.py | 5 + app/kafka/consumer.py | 180 ++++++++++++++++++++++++++++++ app/kafka/exceptions.py | 23 ++++ app/kafka/manager.py | 92 +++++++++++++++ app/kafka/producer.py | 76 +++++++++++++ app/kafka/schemas.py | 54 +++++++++ app/kafka/topics.py | 19 ++++ app/main.py | 28 ++++- docker-compose.yml | 73 ++++++++++++ infra/docker-compose.yml | 43 ------- tests/test_error_handling.py | 5 +- tests/test_kafka/test_consumer.py | 109 ++++++++++++++++++ tests/test_kafka/test_producer.py | 61 ++++++++++ tests/test_kafka/test_schemas.py | 39 +++++++ tests/test_main.py | 5 +- 16 files changed, 774 insertions(+), 47 deletions(-) create mode 100644 app/kafka/__init__.py create mode 100644 app/kafka/consumer.py create mode 100644 app/kafka/exceptions.py create mode 100644 app/kafka/manager.py create mode 100644 app/kafka/producer.py create mode 100644 app/kafka/schemas.py create mode 100644 app/kafka/topics.py create mode 100644 docker-compose.yml delete mode 100644 infra/docker-compose.yml create mode 100644 tests/test_kafka/test_consumer.py create mode 100644 tests/test_kafka/test_producer.py create mode 100644 tests/test_kafka/test_schemas.py diff --git a/app/core/config.py b/app/core/config.py index 1b4f5a4..830d426 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -36,6 +36,10 @@ class Settings(BaseSettings): # Kafka KAFKA_BOOTSTRAP_SERVERS: str = "localhost:9092" + KAFKA_PRODUCER_ACK: str = "all" + KAFKA_CONSUMER_AUTO_OFFSET_RESET: str = "earliest" + KAFKA_MAX_RETRIES: int = 3 + KAFKA_RETRY_BACKOFF_MS: int = 1000 # External Services Keys DEEPGRAM_API_KEY: str | None = None @@ -43,6 +47,11 @@ class Settings(BaseSettings): VOICE_AI_API_KEY: str | None = None OPENAI_API_KEY: str | None = None + # Mailgun Email Service + MAILGUN_API_KEY: str | None = None + MAILGUN_DOMAIN: str | None = None + MAILGUN_FROM_EMAIL: str | None = None + model_config = SettingsConfigDict( env_file=".env", case_sensitive=True, extra="ignore" ) diff --git a/app/kafka/__init__.py b/app/kafka/__init__.py new file mode 100644 index 0000000..71ee426 --- /dev/null +++ b/app/kafka/__init__.py @@ -0,0 +1,5 @@ +from app.kafka.manager import KafkaManager, get_kafka_manager +from app.kafka.producer import KafkaProducer +from app.kafka.schemas import BaseEvent + +__all__ = ["BaseEvent", "KafkaManager", "KafkaProducer", "get_kafka_manager"] diff --git a/app/kafka/consumer.py b/app/kafka/consumer.py new file mode 100644 index 0000000..39dd75a --- /dev/null +++ b/app/kafka/consumer.py @@ -0,0 +1,180 @@ +import abc +import asyncio +import contextlib +import json +import logging +from typing import Any + +from aiokafka import AIOKafkaConsumer + +from app.core.config import settings +from app.kafka.schemas import BaseEvent, DLQEvent +from app.kafka.topics import DLQ_PREFIX + +logger = logging.getLogger(__name__) + + +class BaseConsumer(abc.ABC): + """ + Abstract base class for all Kafka consumers. + + Subclasses must declare class-level attributes: + topic: str — the Kafka topic to subscribe to + group_id: str — the consumer group identifier + event_schema: Type — the Pydantic BaseEvent subclass for deserialization + + Features: + - Manual offset commits (offsets only committed after successful handle()) + - Configurable linear backoff retry with KAFKA_MAX_RETRIES + - Dead-letter queue (DLQ) forwarding via a proper DLQEvent wrapper + - Graceful shutdown via asyncio.Task cancellation + """ + + topic: str + group_id: str + event_schema: type[BaseEvent[Any]] + + # Declared here so Mypy can track it on the class body + _initialized: bool = False + + def __init__(self, producer: Any) -> None: + """ + Args: + producer: A KafkaProducer instance injected by KafkaManager. + Used to forward failed events to the DLQ. + """ + # Import here to avoid a circular module-level import + from app.kafka.producer import KafkaProducer + + self._producer: KafkaProducer = producer + self._consumer: AIOKafkaConsumer | None = None + self._running: bool = False + self._task: asyncio.Task[None] | None = None + + async def start(self, bootstrap_servers: str) -> None: + """ + Start the consumer background task. + Called by KafkaManager, which supplies the bootstrap_servers string. + """ + if self._running: + return + + self._running = True + self._consumer = AIOKafkaConsumer( + self.topic, + bootstrap_servers=bootstrap_servers, + group_id=self.group_id, + auto_offset_reset=settings.KAFKA_CONSUMER_AUTO_OFFSET_RESET, + # Manual commit: offsets are committed only after handle() succeeds, + # preventing silent message loss on pod restart mid-retry. + enable_auto_commit=False, + value_deserializer=lambda v: json.loads(v.decode("utf-8")), + ) + await self._consumer.start() + logger.info(f"Consumer for '{self.topic}' (group: '{self.group_id}') started") + + self._task = asyncio.create_task(self._consume_loop()) + + async def stop(self) -> None: + """Stop the consumer background task gracefully.""" + self._running = False + if self._task: + self._task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._task + + if self._consumer: + await self._consumer.stop() + self._consumer = None + + logger.info(f"Consumer for '{self.topic}' stopped") + + async def _consume_loop(self) -> None: + """Main consumption loop.""" + if not self._consumer: + return + + try: + async for msg in self._consumer: + if not self._running: + break + + try: + event = self.event_schema.model_validate(msg.value) + await self._process_with_retry(event) + # Only commit after successful processing + await self._consumer.commit() + except Exception: + logger.exception( + f"Unrecoverable error on message from '{self.topic}'. " + f"Skipping commit — offset will be re-delivered on restart." + ) + except asyncio.CancelledError: + pass + except Exception: + logger.exception(f"Unexpected error in consumer loop for '{self.topic}'") + + async def _process_with_retry(self, event: BaseEvent[Any]) -> None: + """ + Process an event with linear backoff retries. + After exhausting all retries, routes the event to the DLQ. + """ + last_error: Exception | None = None + + for attempt in range(settings.KAFKA_MAX_RETRIES + 1): + try: + await self.handle(event) + return # Success + except Exception as e: + last_error = e + if attempt < settings.KAFKA_MAX_RETRIES: + wait_secs = (settings.KAFKA_RETRY_BACKOFF_MS / 1000) * (attempt + 1) + logger.warning( + f"Retry {attempt + 1}/{settings.KAFKA_MAX_RETRIES} " + f"for event {event.event_id} in {wait_secs:.1f}s. " + f"Reason: {e}" + ) + await asyncio.sleep(wait_secs) + + logger.error( + f"Event {event.event_id} failed after " + f"{settings.KAFKA_MAX_RETRIES} retries. Routing to DLQ." + ) + await self._send_to_dlq( + event, str(last_error), retries=settings.KAFKA_MAX_RETRIES + ) + + async def _send_to_dlq( + self, event: BaseEvent[Any], error_message: str, retries: int + ) -> None: + """ + Forward a failed event to its Dead Letter Queue topic. + Wraps it in a DLQEvent — a proper structured schema — instead of + mutating the original event payload. + """ + dlq_topic = f"{DLQ_PREFIX}{self.topic}" + dlq_event = DLQEvent( + original_event_id=event.event_id, + original_topic=self.topic, + original_event=event.model_dump(), + error_message=error_message, + retry_count=retries, + ) + + try: + # Use the injected producer directly — no circular import needed + dlq_payload = dlq_event.model_dump() + await self._producer._producer.send_and_wait( # type: ignore[union-attr] + dlq_topic, + value=json.dumps(dlq_payload, default=str).encode("utf-8"), + ) + logger.info(f"Event {event.event_id} forwarded to DLQ topic '{dlq_topic}'") + except Exception: + logger.exception( + f"CRITICAL: Failed to forward event {event.event_id} " + f"to '{dlq_topic}'. Event is permanently lost." + ) + + @abc.abstractmethod + async def handle(self, event: BaseEvent[Any]) -> None: + """Implement message processing logic in subclasses.""" diff --git a/app/kafka/exceptions.py b/app/kafka/exceptions.py new file mode 100644 index 0000000..23a9ee5 --- /dev/null +++ b/app/kafka/exceptions.py @@ -0,0 +1,23 @@ +from app.core.exceptions import FluentMeetException + + +class KafkaError(FluentMeetException): + """Base exception for Kafka-related errors.""" + + def __init__(self, message: str, code: str = "KAFKA_ERROR") -> None: + super().__init__(status_code=500, code=code, message=message) + + +class KafkaConnectionError(KafkaError): + def __init__(self, message: str = "Failed to connect to Kafka broker") -> None: + super().__init__(message, code="KAFKA_CONNECTION_ERROR") + + +class KafkaPublishError(KafkaError): + def __init__(self, message: str = "Failed to publish message to Kafka") -> None: + super().__init__(message, code="KAFKA_PUBLISH_ERROR") + + +class KafkaConsumeError(KafkaError): + def __init__(self, message: str = "Failed to consume message from Kafka") -> None: + super().__init__(message, code="KAFKA_CONSUME_ERROR") diff --git a/app/kafka/manager.py b/app/kafka/manager.py new file mode 100644 index 0000000..5d8795f --- /dev/null +++ b/app/kafka/manager.py @@ -0,0 +1,92 @@ +import logging +from typing import Optional + +from app.core.config import settings +from app.kafka.consumer import BaseConsumer +from app.kafka.producer import KafkaProducer + +logger = logging.getLogger(__name__) + + +class KafkaManager: + """ + Singleton manager responsible for Kafka producer and consumer lifecycles. + + Usage: + manager = get_kafka_manager() + manager.register_consumer(MyEmailConsumer()) + await manager.start() # called from FastAPI lifespan + await manager.stop() # called from FastAPI lifespan + """ + + _instance: Optional["KafkaManager"] = None + _initialized: bool = False + + def __new__(cls) -> "KafkaManager": + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self) -> None: + if self._initialized: + return + + self.producer = KafkaProducer( + bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS + ) + self.consumers: list[BaseConsumer] = [] + self._initialized = True + + def register_consumer(self, consumer: BaseConsumer) -> None: + """ + Register a consumer to be started when the manager starts. + The producer is injected into the consumer at this point so it + can access it for DLQ forwarding without a circular import. + """ + consumer._producer = self.producer + self.consumers.append(consumer) + logger.info(f"Registered consumer for topic: '{consumer.topic}'") + + async def start(self) -> None: + """Start the producer, then all registered consumers.""" + logger.info("Starting Kafka Manager...") + await self.producer.start() + + for consumer in self.consumers: + # Pass bootstrap_servers at start-time — consumers don't store it + await consumer.start(bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS) + + logger.info( + f"Kafka Manager started — {len(self.consumers)} consumer(s) running" + ) + + async def stop(self) -> None: + """Stop all consumers first, then the producer.""" + logger.info("Stopping Kafka Manager...") + + for consumer in self.consumers: + await consumer.stop() + + await self.producer.stop() + logger.info("Kafka Manager stopped") + + async def health_check(self) -> dict: + """ + Verify Kafka broker connectivity via a metadata probe. + Uses the public producer.ping() API — no private attribute access. + """ + if not self.producer.is_started: + return {"status": "uninitialized", "details": "Producer not started"} + + try: + await self.producer.ping() + return {"status": "healthy"} + except Exception as e: + logger.error(f"Kafka health check failed: {e}") + return {"status": "unhealthy", "error": str(e)} + + +def get_kafka_manager() -> KafkaManager: + """Return the KafkaManager singleton.""" + return KafkaManager() diff --git a/app/kafka/producer.py b/app/kafka/producer.py new file mode 100644 index 0000000..17caf2b --- /dev/null +++ b/app/kafka/producer.py @@ -0,0 +1,76 @@ +import json +import logging +from typing import Any + +from aiokafka import AIOKafkaProducer + +from app.core.config import settings +from app.kafka.exceptions import KafkaPublishError +from app.kafka.schemas import BaseEvent + +logger = logging.getLogger(__name__) + + +class KafkaProducer: + """ + Wrapper around AIOKafkaProducer with Pydantic serialization. + """ + + def __init__(self, bootstrap_servers: str): + self._producer: AIOKafkaProducer | None = None + self._bootstrap_servers = bootstrap_servers + + @property + def is_started(self) -> bool: + """Returns True if the underlying producer is running.""" + return self._producer is not None + + async def start(self) -> None: + """Start the Kafka producer.""" + if self._producer is not None: + return + + self._producer = AIOKafkaProducer( + bootstrap_servers=self._bootstrap_servers, + acks=settings.KAFKA_PRODUCER_ACK, + value_serializer=lambda v: json.dumps(v, default=str).encode("utf-8"), + ) + await self._producer.start() + logger.info("Kafka producer started") + + async def stop(self) -> None: + """Stop the Kafka producer.""" + if self._producer: + await self._producer.stop() + self._producer = None + logger.info("Kafka producer stopped") + + async def ping(self) -> None: + """ + Verify broker connectivity by fetching cluster metadata. + Raises an exception if the broker is unreachable. + """ + if not self._producer: + raise KafkaPublishError("Kafka producer is not started") + await self._producer.client.force_metadata_update() + + async def send( + self, topic: str, event: BaseEvent[Any], key: str | None = None + ) -> None: + """ + Serialize and send an event to a Kafka topic. + """ + if not self._producer: + raise KafkaPublishError("Kafka producer is not started") + + try: + message_dict = event.model_dump() + await self._producer.send_and_wait( + topic, value=message_dict, key=key.encode("utf-8") if key else None + ) + logger.debug(f"Event {event.event_id} sent to topic {topic}") + except KafkaPublishError: + raise + except Exception as e: + logger.exception(f"Failed to publish event to {topic}") + raise KafkaPublishError(f"Error publishing to {topic}: {e!s}") from e diff --git a/app/kafka/schemas.py b/app/kafka/schemas.py new file mode 100644 index 0000000..179f053 --- /dev/null +++ b/app/kafka/schemas.py @@ -0,0 +1,54 @@ +import uuid +from datetime import datetime +from typing import Any, Generic, TypeVar + +from pydantic import BaseModel, Field + +T = TypeVar("T") + + +class BaseEvent(BaseModel, Generic[T]): + """ + Base class for all Kafka events. + """ + + event_id: uuid.UUID = Field(default_factory=uuid.uuid4) + event_type: str + timestamp: datetime = Field(default_factory=datetime.utcnow) + payload: T + + +class DLQEvent(BaseModel): + """ + Wrapper for events that failed processing and were routed to a Dead Letter Queue. + Captures the original event alongside structured failure metadata. + """ + + original_event_id: uuid.UUID + original_topic: str + original_event: dict[str, Any] + error_message: str + failed_at: datetime = Field(default_factory=datetime.utcnow) + retry_count: int + + +class EmailPayload(BaseModel): + to_email: str + subject: str + template_name: str + template_data: dict[str, Any] = {} + + +class EmailEvent(BaseEvent[EmailPayload]): + event_type: str = "email.dispatch" + + +class MediaUploadPayload(BaseModel): + user_id: int + file_path: str + file_type: str # e.g., 'avatar', 'recording' + metadata: dict[str, Any] = {} + + +class MediaUploadEvent(BaseEvent[MediaUploadPayload]): + event_type: str = "media.upload" diff --git a/app/kafka/topics.py b/app/kafka/topics.py new file mode 100644 index 0000000..4b16d17 --- /dev/null +++ b/app/kafka/topics.py @@ -0,0 +1,19 @@ +from typing import Final + +# Email notification topics +NOTIFICATIONS_EMAIL: Final = "notifications.email" + +# Media processing topics +MEDIA_UPLOAD: Final = "media.upload" +MEDIA_PROCESS_RECORDING: Final = "media.process_recording" + +# Real-time audio pipeline topics +AUDIO_RAW: Final = "audio.raw" +AUDIO_SYNTHESIZED: Final = "audio.synthesized" + +# Real-time text pipeline topics +TEXT_ORIGINAL: Final = "text.original" +TEXT_TRANSLATED: Final = "text.translated" + +# Dead-letter topics +DLQ_PREFIX: Final = "dlq." diff --git a/app/main.py b/app/main.py index d7e5ca4..ee88476 100644 --- a/app/main.py +++ b/app/main.py @@ -1,13 +1,28 @@ +from contextlib import asynccontextmanager +from typing import AsyncGenerator from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from app.core.config import settings from app.core.exception_handlers import register_exception_handlers +from app.kafka.manager import get_kafka_manager + + +@asynccontextmanager +async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: + # Startup + kafka_manager = get_kafka_manager() + await kafka_manager.start() + yield + # Shutdown + await kafka_manager.stop() + app = FastAPI( title=settings.PROJECT_NAME, description="Real-time voice translation video conferencing platform API", version=settings.VERSION, + lifespan=lifespan, ) # Set all CORS enabled origins @@ -23,8 +38,17 @@ @app.get("/health", tags=["health"]) -async def health_check() -> dict[str, str]: - return {"status": "ok", "version": settings.VERSION} +async def health_check() -> dict: + kafka_manager = get_kafka_manager() + kafka_health = await kafka_manager.health_check() + + return { + "status": "ok" if kafka_health["status"] == "healthy" else "degraded", + "version": settings.VERSION, + "services": { + "kafka": kafka_health, + }, + } if __name__ == "__main__": diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..49cb322 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,73 @@ +version: '3.8' + +services: + redis: + image: redis:7-alpine + container_name: fluentmeet-redis + restart: always + ports: + - "6379:6379" + volumes: + - redis_data:/data + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 5 + networks: + - fluentmeet-net + + kafka: + image: apache/kafka:3.7.0 + container_name: fluentmeet-kafka + restart: always + ports: + - "9092:9092" + environment: + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,EXTERNAL://localhost:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk + volumes: + - kafka_data:/var/lib/kafka/data + healthcheck: + test: ["CMD-SHELL", "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092"] + interval: 15s + timeout: 10s + retries: 5 + start_period: 30s + networks: + - fluentmeet-net + + kafka-ui: + image: provectuslabs/kafka-ui:v0.7.2 + container_name: fluentmeet-kafka-ui + restart: always + ports: + - "8090:8080" + depends_on: + kafka: + condition: service_healthy + environment: + - KAFKA_CLUSTERS_0_NAME=local-kraft + - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:29092 + - DYNAMIC_CONFIG_ENABLED=true + networks: + - fluentmeet-net + +volumes: + postgres_data: + redis_data: + kafka_data: + +networks: + fluentmeet-net: + driver: bridge diff --git a/infra/docker-compose.yml b/infra/docker-compose.yml deleted file mode 100644 index 88f5889..0000000 --- a/infra/docker-compose.yml +++ /dev/null @@ -1,43 +0,0 @@ -version: '3.8' - -services: - db: - image: postgres:15-alpine - container_name: fluentmeet-db - restart: always - environment: - - POSTGRES_USER=postgres - - POSTGRES_PASSWORD=postgres - - POSTGRES_DB=fluentmeet - ports: - - "5432:5432" - volumes: - - postgres_data:/var/lib/postgresql/data - - redis: - image: redis:7-alpine - container_name: fluentmeet-redis - restart: always - ports: - - "6379:6379" - - zookeeper: - image: confluentinc/cp-zookeeper:7.4.0 - environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: 2000 - - kafka: - image: confluentinc/cp-kafka:7.4.0 - depends_on: - - zookeeper - ports: - - "9092:9092" - environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - -volumes: - postgres_data: diff --git a/tests/test_error_handling.py b/tests/test_error_handling.py index 63e3323..8634e6e 100644 --- a/tests/test_error_handling.py +++ b/tests/test_error_handling.py @@ -132,4 +132,7 @@ def test_unhandled_exception_handler(): def test_health_check_remains_ok(): response = client.get("/health") assert response.status_code == 200 - assert response.json() == {"status": "ok", "version": settings.VERSION} + data = response.json() + assert data["status"] in {"ok", "degraded"} # depends on whether Kafka is running + assert data["version"] == settings.VERSION + assert "services" in data diff --git a/tests/test_kafka/test_consumer.py b/tests/test_kafka/test_consumer.py new file mode 100644 index 0000000..d7cf40f --- /dev/null +++ b/tests/test_kafka/test_consumer.py @@ -0,0 +1,109 @@ +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from app.kafka.consumer import BaseConsumer +from app.kafka.schemas import BaseEvent + + +class MockEvent(BaseEvent[dict]): + event_type: str = "test.mock" + + +class TestConsumer(BaseConsumer): + topic = "test-topic" + group_id = "test-group" + event_schema = MockEvent + + async def handle(self, event: BaseEvent) -> None: # type: ignore[override] + pass # overridden per test via mock + + +@pytest.fixture +def mock_producer(): + producer = MagicMock() + producer.is_started = True + producer._producer = AsyncMock() + producer._producer.send_and_wait = AsyncMock() + return producer + + +@pytest.fixture +def consumer(mock_producer): + return TestConsumer(producer=mock_producer) + + +@pytest.mark.asyncio +async def test_consumer_success_on_first_try(consumer): + """handle() succeeds on first attempt — no retries, no DLQ.""" + consumer.handle = AsyncMock() + event = MockEvent(payload={"data": "test"}) + + with patch("app.kafka.consumer.settings") as mock_settings: + mock_settings.KAFKA_MAX_RETRIES = 2 + mock_settings.KAFKA_RETRY_BACKOFF_MS = 1 + + await consumer._process_with_retry(event) + + consumer.handle.assert_awaited_once_with(event) + + +@pytest.mark.asyncio +async def test_consumer_retries_then_succeeds(consumer): + """handle() fails twice then succeeds on third attempt.""" + call_count = 0 + + async def flaky_handle(event): + nonlocal call_count + call_count += 1 + if call_count < 3: + raise ValueError("transient error") + + consumer.handle = flaky_handle + + with patch("app.kafka.consumer.settings") as mock_settings: + mock_settings.KAFKA_MAX_RETRIES = 3 + mock_settings.KAFKA_RETRY_BACKOFF_MS = 1 + + await consumer._process_with_retry(MockEvent(payload={})) + + assert call_count == 3 + + +@pytest.mark.asyncio +async def test_consumer_routes_to_dlq_after_exhausted_retries(consumer): + """After all retries fail, the event is sent to DLQ as a DLQEvent wrapper.""" + consumer.handle = AsyncMock(side_effect=Exception("permanent failure")) + consumer._send_to_dlq = AsyncMock() + event = MockEvent(payload={"data": "test"}) + + with patch("app.kafka.consumer.settings") as mock_settings: + mock_settings.KAFKA_MAX_RETRIES = 2 + mock_settings.KAFKA_RETRY_BACKOFF_MS = 1 + + await consumer._process_with_retry(event) + + # handle should be called MAX_RETRIES + 1 times (initial + retries) + assert consumer.handle.await_count == 3 + consumer._send_to_dlq.assert_awaited_once() + # Verify DLQ was called with the correct event and retry count + _, kwargs = consumer._send_to_dlq.call_args + assert kwargs.get("retries") == 2 or consumer._send_to_dlq.call_args.args[2] == 2 + + +@pytest.mark.asyncio +async def test_dlq_wraps_event_correctly(consumer, mock_producer): + """_send_to_dlq creates a proper DLQEvent — no payload mutation.""" + event = MockEvent(payload={"key": "value"}) + + await consumer._send_to_dlq(event, "test error", retries=3) + + # DLQ is sent via the injected producer's internal send_and_wait + mock_producer._producer.send_and_wait.assert_awaited_once() + call_args = mock_producer._producer.send_and_wait.call_args + # First positional arg should be the DLQ topic + assert call_args.args[0] == f"dlq.{consumer.topic}" + + # Verify original event payload was not mutated + assert not hasattr(event.payload, "error") diff --git a/tests/test_kafka/test_producer.py b/tests/test_kafka/test_producer.py new file mode 100644 index 0000000..fb52b20 --- /dev/null +++ b/tests/test_kafka/test_producer.py @@ -0,0 +1,61 @@ +from unittest.mock import AsyncMock, patch + +import pytest + +from app.kafka.producer import KafkaProducer +from app.kafka.schemas import BaseEvent + + +@pytest.mark.asyncio +async def test_producer_send_serializes_event(): + """send() correctly serializes a Pydantic event and routes it to the topic.""" + with patch("app.kafka.producer.AIOKafkaProducer") as mock_cls: + mock_internal = AsyncMock() + mock_cls.return_value = mock_internal + + producer = KafkaProducer(bootstrap_servers="localhost:9092") + await producer.start() + + assert producer.is_started + + event = BaseEvent(event_type="test.event", payload={"foo": "bar"}) + await producer.send("test-topic", event) + + mock_internal.send_and_wait.assert_awaited_once() + args, kwargs = mock_internal.send_and_wait.call_args + assert args[0] == "test-topic" + assert kwargs["value"]["event_type"] == "test.event" + assert kwargs["value"]["payload"] == {"foo": "bar"} + + await producer.stop() + assert not producer.is_started + mock_internal.stop.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_producer_ping_calls_metadata_update(): + """ping() delegates to force_metadata_update on the aiokafka client.""" + with patch("app.kafka.producer.AIOKafkaProducer") as mock_cls: + mock_internal = AsyncMock() + mock_internal.client = AsyncMock() + mock_cls.return_value = mock_internal + + producer = KafkaProducer(bootstrap_servers="localhost:9092") + await producer.start() + await producer.ping() + + mock_internal.client.force_metadata_update.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_producer_not_started_raises(): + """send() and ping() raise KafkaPublishError if the producer hasn't started.""" + from app.kafka.exceptions import KafkaPublishError + + producer = KafkaProducer(bootstrap_servers="localhost:9092") + + with pytest.raises(KafkaPublishError): + await producer.send("any-topic", BaseEvent(event_type="e", payload={})) + + with pytest.raises(KafkaPublishError): + await producer.ping() diff --git a/tests/test_kafka/test_schemas.py b/tests/test_kafka/test_schemas.py new file mode 100644 index 0000000..03512c3 --- /dev/null +++ b/tests/test_kafka/test_schemas.py @@ -0,0 +1,39 @@ +import uuid +from datetime import datetime + +import pytest +from app.kafka.schemas import BaseEvent, EmailEvent, EmailPayload + + +def test_base_event_serialization(): + payload = {"key": "value"} + event = BaseEvent(event_type="test.event", payload=payload) + + assert isinstance(event.event_id, uuid.UUID) + assert isinstance(event.timestamp, datetime) + assert event.event_type == "test.event" + assert event.payload == payload + + # Test JSON serialization via Pydantic + dump = event.model_dump() + assert dump["event_type"] == "test.event" + assert dump["payload"] == payload + + +def test_email_event_validation(): + payload = EmailPayload( + to_email="test@example.com", + subject="Hello", + template_name="welcome", + template_data={"name": "User"}, + ) + event = EmailEvent(payload=payload) + + assert event.event_type == "email.dispatch" + assert event.payload.to_email == "test@example.com" + + # Test model_validate + event_dict = event.model_dump() + validated_event = EmailEvent.model_validate(event_dict) + assert validated_event.event_id == event.event_id + assert validated_event.payload.to_email == "test@example.com" diff --git a/tests/test_main.py b/tests/test_main.py index 21e5916..abfade6 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -9,4 +9,7 @@ def test_health_check(): response = client.get("/health") assert response.status_code == 200 - assert response.json() == {"status": "ok", "version": settings.VERSION} + data = response.json() + assert data["status"] in {"ok", "degraded"} # depends on whether Kafka is running + assert data["version"] == settings.VERSION + assert "services" in data From e90ac9bc5b4d6c967f694a218341db4657db8d23 Mon Sep 17 00:00:00 2001 From: aniebietafia Date: Sat, 14 Mar 2026 21:24:25 +0100 Subject: [PATCH 2/3] fix: Import and linting errors Signed-off-by: aniebietafia --- app/main.py | 5 +++-- tests/test_kafka/test_consumer.py | 3 +-- tests/test_kafka/test_schemas.py | 1 - 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/app/main.py b/app/main.py index ee88476..77ca98b 100644 --- a/app/main.py +++ b/app/main.py @@ -1,5 +1,6 @@ +from collections.abc import AsyncGenerator from contextlib import asynccontextmanager -from typing import AsyncGenerator + from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware @@ -9,7 +10,7 @@ @asynccontextmanager -async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: +async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: # Startup kafka_manager = get_kafka_manager() await kafka_manager.start() diff --git a/tests/test_kafka/test_consumer.py b/tests/test_kafka/test_consumer.py index d7cf40f..9d96be6 100644 --- a/tests/test_kafka/test_consumer.py +++ b/tests/test_kafka/test_consumer.py @@ -1,4 +1,3 @@ -import asyncio from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -54,7 +53,7 @@ async def test_consumer_retries_then_succeeds(consumer): """handle() fails twice then succeeds on third attempt.""" call_count = 0 - async def flaky_handle(event): + async def flaky_handle(_event): nonlocal call_count call_count += 1 if call_count < 3: diff --git a/tests/test_kafka/test_schemas.py b/tests/test_kafka/test_schemas.py index 03512c3..1c3cb90 100644 --- a/tests/test_kafka/test_schemas.py +++ b/tests/test_kafka/test_schemas.py @@ -1,7 +1,6 @@ import uuid from datetime import datetime -import pytest from app.kafka.schemas import BaseEvent, EmailEvent, EmailPayload From f573d42b1e04ae9d1e97a61d9287c35a85d7fea0 Mon Sep 17 00:00:00 2001 From: aniebietafia Date: Sat, 14 Mar 2026 21:32:19 +0100 Subject: [PATCH 3/3] fix: Import and linting errors Signed-off-by: aniebietafia --- app/kafka/consumer.py | 2 +- app/kafka/producer.py | 2 +- mypy_output.txt | 4 ++++ 3 files changed, 6 insertions(+), 2 deletions(-) create mode 100644 mypy_output.txt diff --git a/app/kafka/consumer.py b/app/kafka/consumer.py index 39dd75a..f68b73b 100644 --- a/app/kafka/consumer.py +++ b/app/kafka/consumer.py @@ -5,7 +5,7 @@ import logging from typing import Any -from aiokafka import AIOKafkaConsumer +from aiokafka import AIOKafkaConsumer # type: ignore[import-untyped] from app.core.config import settings from app.kafka.schemas import BaseEvent, DLQEvent diff --git a/app/kafka/producer.py b/app/kafka/producer.py index 17caf2b..b2d4a43 100644 --- a/app/kafka/producer.py +++ b/app/kafka/producer.py @@ -2,7 +2,7 @@ import logging from typing import Any -from aiokafka import AIOKafkaProducer +from aiokafka import AIOKafkaProducer # type: ignore[import-untyped] from app.core.config import settings from app.kafka.exceptions import KafkaPublishError diff --git a/mypy_output.txt b/mypy_output.txt new file mode 100644 index 0000000..d747b33 --- /dev/null +++ b/mypy_output.txt @@ -0,0 +1,4 @@ +app\kafka\producer.py:5: error: Skipping analyzing "aiokafka": module is installed, but missing library stubs or py.typed marker [import-untyped] +app\kafka\consumer.py:8: error: Skipping analyzing "aiokafka": module is installed, but missing library stubs or py.typed marker [import-untyped] +app\kafka\consumer.py:8: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports +Found 2 errors in 2 files (checked 23 source files)