Merged
9 changes: 9 additions & 0 deletions app/core/config.py
@@ -36,13 +36,22 @@ class Settings(BaseSettings):

# Kafka
KAFKA_BOOTSTRAP_SERVERS: str = "localhost:9092"
KAFKA_PRODUCER_ACK: str = "all"
KAFKA_CONSUMER_AUTO_OFFSET_RESET: str = "earliest"
KAFKA_MAX_RETRIES: int = 3
KAFKA_RETRY_BACKOFF_MS: int = 1000

# External Services Keys
DEEPGRAM_API_KEY: str | None = None
DEEPL_API_KEY: str | None = None
VOICE_AI_API_KEY: str | None = None
OPENAI_API_KEY: str | None = None

# Mailgun Email Service
MAILGUN_API_KEY: str | None = None
MAILGUN_DOMAIN: str | None = None
MAILGUN_FROM_EMAIL: str | None = None

model_config = SettingsConfigDict(
env_file=".env", case_sensitive=True, extra="ignore"
)
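The new settings above rely on pydantic-settings' environment-variable resolution. As a minimal stand-in sketch (plain stdlib, not the actual `BaseSettings` machinery), the fallback behavior these fields get looks like this:

```python
import os
from dataclasses import dataclass, field


def env(name: str, default: str) -> str:
    return os.environ.get(name, default)


@dataclass
class KafkaSettings:
    # Each field falls back to its default unless an env var overrides it,
    # mirroring how BaseSettings resolves the fields shown in the diff.
    bootstrap_servers: str = field(
        default_factory=lambda: env("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
    )
    max_retries: int = field(
        default_factory=lambda: int(env("KAFKA_MAX_RETRIES", "3"))
    )


os.environ["KAFKA_MAX_RETRIES"] = "5"
print(KafkaSettings().max_retries)  # 5
```

In the real class, `SettingsConfigDict(env_file=".env", ...)` additionally reads a dotenv file before applying defaults.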
5 changes: 5 additions & 0 deletions app/kafka/__init__.py
@@ -0,0 +1,5 @@
from app.kafka.manager import KafkaManager, get_kafka_manager
from app.kafka.producer import KafkaProducer
from app.kafka.schemas import BaseEvent

__all__ = ["BaseEvent", "KafkaManager", "KafkaProducer", "get_kafka_manager"]
180 changes: 180 additions & 0 deletions app/kafka/consumer.py
@@ -0,0 +1,180 @@
import abc
import asyncio
import contextlib
import json
import logging
from typing import Any

from aiokafka import AIOKafkaConsumer # type: ignore[import-untyped]

from app.core.config import settings
from app.kafka.schemas import BaseEvent, DLQEvent
from app.kafka.topics import DLQ_PREFIX

logger = logging.getLogger(__name__)


class BaseConsumer(abc.ABC):
"""
Abstract base class for all Kafka consumers.

Subclasses must declare class-level attributes:
topic: str — the Kafka topic to subscribe to
group_id: str — the consumer group identifier
event_schema: Type — the Pydantic BaseEvent subclass for deserialization

Features:
- Manual offset commits (offsets only committed after successful handle())
- Configurable linear backoff retry with KAFKA_MAX_RETRIES
- Dead-letter queue (DLQ) forwarding via a proper DLQEvent wrapper
- Graceful shutdown via asyncio.Task cancellation
"""

topic: str
group_id: str
event_schema: type[BaseEvent[Any]]

# Declared here so Mypy can track it on the class body
_initialized: bool = False

def __init__(self, producer: Any) -> None:
"""
Args:
producer: A KafkaProducer instance injected by KafkaManager.
Used to forward failed events to the DLQ.
"""
# Import here to avoid a circular module-level import
from app.kafka.producer import KafkaProducer

self._producer: KafkaProducer = producer
self._consumer: AIOKafkaConsumer | None = None
self._running: bool = False
self._task: asyncio.Task[None] | None = None

async def start(self, bootstrap_servers: str) -> None:
"""
Start the consumer background task.
Called by KafkaManager, which supplies the bootstrap_servers string.
"""
if self._running:
return

self._running = True
self._consumer = AIOKafkaConsumer(
self.topic,
bootstrap_servers=bootstrap_servers,
group_id=self.group_id,
auto_offset_reset=settings.KAFKA_CONSUMER_AUTO_OFFSET_RESET,
# Manual commit: offsets are committed only after handle() succeeds,
# preventing silent message loss on pod restart mid-retry.
enable_auto_commit=False,
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
await self._consumer.start()
logger.info(f"Consumer for '{self.topic}' (group: '{self.group_id}') started")

self._task = asyncio.create_task(self._consume_loop())
Comment on lines +59 to +76
⚠️ Potential issue | 🟠 Major

Only set _running after the consumer actually starts.

If await self._consumer.start() fails, _running stays True and the guard at Line 59 turns every later retry into a no-op. That leaves the consumer wedged until the process is restarted.

Suggested fix
-        self._running = True
-        self._consumer = AIOKafkaConsumer(
+        consumer = AIOKafkaConsumer(
             self.topic,
             bootstrap_servers=bootstrap_servers,
             group_id=self.group_id,
             auto_offset_reset=settings.KAFKA_CONSUMER_AUTO_OFFSET_RESET,
             # Manual commit: offsets are committed only after handle() succeeds,
             # preventing silent message loss on pod restart mid-retry.
             enable_auto_commit=False,
             value_deserializer=lambda v: json.loads(v.decode("utf-8")),
         )
-        await self._consumer.start()
+        try:
+            await consumer.start()
+        except Exception:
+            with contextlib.suppress(Exception):
+                await consumer.stop()
+            raise
+        self._consumer = consumer
+        self._running = True
         logger.info(f"Consumer for '{self.topic}' (group: '{self.group_id}') started")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/kafka/consumer.py` around lines 59 - 76, The guard sets self._running =
True before awaiting self._consumer.start(), which leaves the consumer stuck if
start() raises; change the start sequence in the start method so you first
construct AIOKafkaConsumer, await self._consumer.start(), then set self._running
= True and only then create the asyncio task for self._consume_loop();
additionally, if start() fails ensure you do not leave self._consumer or
self._running in an inconsistent state (set self._running = False and
cleanup/close self._consumer on exception) so subsequent retries can proceed.


async def stop(self) -> None:
"""Stop the consumer background task gracefully."""
self._running = False
if self._task:
self._task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._task

if self._consumer:
await self._consumer.stop()
self._consumer = None

logger.info(f"Consumer for '{self.topic}' stopped")

async def _consume_loop(self) -> None:
"""Main consumption loop."""
if not self._consumer:
return

try:
async for msg in self._consumer:
if not self._running:
break

try:
event = self.event_schema.model_validate(msg.value)
await self._process_with_retry(event)
# Only commit after successful processing
await self._consumer.commit()
except Exception:
logger.exception(
f"Unrecoverable error on message from '{self.topic}'. "
f"Skipping commit — offset will be re-delivered on restart."
)
except asyncio.CancelledError:

Check notice — Code scanning / CodeQL: Empty except (Note)

'except' clause does nothing but pass and there is no explanatory comment.

Copilot Autofix (AI, 5 days ago)

General approach: avoid a bare pass in the except asyncio.CancelledError block by either logging the cancellation or documenting it with a comment and possibly re-raising. We want to preserve existing shutdown behavior, where cancellation during stop() is treated as normal and does not log as an error.

Best specific fix: on line 112–113, replace the empty handler:

        except asyncio.CancelledError:
            pass

with a handler that records that the consumer loop was cancelled in a non-noisy way, e.g.:

        except asyncio.CancelledError:
            logger.info(f"Consumer loop for '{self.topic}' was cancelled")
            # Cancellation is expected during shutdown; no further action required.

or use logger.debug if you want less frequent logs. This preserves the semantics (we still swallow the cancellation and don’t propagate it as an error) but removes the empty except, explains the intent, and satisfies CodeQL. No new imports or helper methods are required; logger and asyncio are already available in this file.

Location details:

  • File: app/kafka/consumer.py
  • Region: the _consume_loop method, the outer try/except around the async for msg in self._consumer loop, currently lines 112–113.
Suggested changeset 1 — app/kafka/consumer.py

Autofix patch — run the following command in your local git repository to apply it:
cat << 'EOF' | git apply
diff --git a/app/kafka/consumer.py b/app/kafka/consumer.py
--- a/app/kafka/consumer.py
+++ b/app/kafka/consumer.py
@@ -110,7 +110,8 @@
                         f"Skipping commit — offset will be re-delivered on restart."
                     )
         except asyncio.CancelledError:
-            pass
+            logger.info(f"Consumer loop for '{self.topic}' was cancelled")
+            # Cancellation is expected during graceful shutdown; no further action required.
         except Exception:
             logger.exception(f"Unexpected error in consumer loop for '{self.topic}'")
 
EOF
pass
except Exception:
logger.exception(f"Unexpected error in consumer loop for '{self.topic}'")

async def _process_with_retry(self, event: BaseEvent[Any]) -> None:
"""
Process an event with linear backoff retries.
After exhausting all retries, routes the event to the DLQ.
"""
last_error: Exception | None = None

for attempt in range(settings.KAFKA_MAX_RETRIES + 1):
try:
await self.handle(event)
return # Success
except Exception as e:
last_error = e
if attempt < settings.KAFKA_MAX_RETRIES:
wait_secs = (settings.KAFKA_RETRY_BACKOFF_MS / 1000) * (attempt + 1)
logger.warning(
f"Retry {attempt + 1}/{settings.KAFKA_MAX_RETRIES} "
f"for event {event.event_id} in {wait_secs:.1f}s. "
f"Reason: {e}"
)
await asyncio.sleep(wait_secs)

logger.error(
f"Event {event.event_id} failed after "
f"{settings.KAFKA_MAX_RETRIES} retries. Routing to DLQ."
)
await self._send_to_dlq(
event, str(last_error), retries=settings.KAFKA_MAX_RETRIES
)
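The linear backoff above grows the delay by one `KAFKA_RETRY_BACKOFF_MS` step per attempt. A quick sketch of the resulting schedule, assuming the defaults from `config.py` (3 retries, 1000 ms):

```python
# Linear backoff schedule used by _process_with_retry, with the defaults
# from config.py (KAFKA_MAX_RETRIES=3, KAFKA_RETRY_BACKOFF_MS=1000).
def backoff_schedule(max_retries: int, backoff_ms: int) -> list[float]:
    # Failed attempt 0 waits 1.0 s, attempt 1 waits 2.0 s, attempt 2 waits 3.0 s;
    # the final failed attempt gets no sleep before the event is routed to the DLQ.
    return [(backoff_ms / 1000) * (attempt + 1) for attempt in range(max_retries)]


print(backoff_schedule(3, 1000))  # [1.0, 2.0, 3.0]
```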

async def _send_to_dlq(
self, event: BaseEvent[Any], error_message: str, retries: int
) -> None:
"""
Forward a failed event to its Dead Letter Queue topic.
Wraps it in a DLQEvent — a proper structured schema — instead of
mutating the original event payload.
"""
dlq_topic = f"{DLQ_PREFIX}{self.topic}"
dlq_event = DLQEvent(
original_event_id=event.event_id,
original_topic=self.topic,
original_event=event.model_dump(),
error_message=error_message,
retry_count=retries,
)

try:
# Use the injected producer directly — no circular import needed
dlq_payload = dlq_event.model_dump()
await self._producer._producer.send_and_wait( # type: ignore[union-attr]
dlq_topic,
value=json.dumps(dlq_payload, default=str).encode("utf-8"),
)
logger.info(f"Event {event.event_id} forwarded to DLQ topic '{dlq_topic}'")
except Exception:
logger.exception(
f"CRITICAL: Failed to forward event {event.event_id} "
f"to '{dlq_topic}'. Event is permanently lost."
)
Comment on lines +147 to +176
⚠️ Potential issue | 🔴 Critical

Propagate DLQ publish failures so the original offset is not committed.

_consume_loop() commits at Lines 103-106 whenever _process_with_retry() returns. Because _send_to_dlq() swallows send errors, a broken DLQ path still lets the original message get committed and lost without either successful handling or dead-lettering.

Suggested fix
         try:
             # Use the injected producer directly — no circular import needed
             dlq_payload = dlq_event.model_dump()
             await self._producer._producer.send_and_wait(  # type: ignore[union-attr]
                 dlq_topic,
                 value=json.dumps(dlq_payload, default=str).encode("utf-8"),
             )
             logger.info(f"Event {event.event_id} forwarded to DLQ topic '{dlq_topic}'")
         except Exception:
             logger.exception(
                 f"CRITICAL: Failed to forward event {event.event_id} "
                 f"to '{dlq_topic}'. Event is permanently lost."
             )
+            raise
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/kafka/consumer.py` around lines 147 - 176, The DLQ routine _send_to_dlq
currently catches and swallows all exceptions which allows _consume_loop to
commit offsets after _process_with_retry returns; change _send_to_dlq to
propagate failures instead of absorbing them — after logging the failure in the
except block re-raise the exception (or raise a specific DLQPublishError) so
callers (_process_with_retry and ultimately _consume_loop) can detect the DLQ
publish failure and avoid committing the original offset; ensure callers either
catch and handle that exception or let it bubble up to prevent message commit.


@abc.abstractmethod
async def handle(self, event: BaseEvent[Any]) -> None:
"""Implement message processing logic in subclasses."""
23 changes: 23 additions & 0 deletions app/kafka/exceptions.py
@@ -0,0 +1,23 @@
from app.core.exceptions import FluentMeetException


class KafkaError(FluentMeetException):
"""Base exception for Kafka-related errors."""

def __init__(self, message: str, code: str = "KAFKA_ERROR") -> None:
super().__init__(status_code=500, code=code, message=message)


class KafkaConnectionError(KafkaError):
def __init__(self, message: str = "Failed to connect to Kafka broker") -> None:
super().__init__(message, code="KAFKA_CONNECTION_ERROR")


class KafkaPublishError(KafkaError):
def __init__(self, message: str = "Failed to publish message to Kafka") -> None:
super().__init__(message, code="KAFKA_PUBLISH_ERROR")


class KafkaConsumeError(KafkaError):
def __init__(self, message: str = "Failed to consume message from Kafka") -> None:
super().__init__(message, code="KAFKA_CONSUME_ERROR")
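Because these classes form a hierarchy, catching the `KafkaError` base also catches every specific subclass. A self-contained sketch — the `FluentMeetException` stand-in below assumes the constructor signature implied by the `super().__init__` call; the real class lives in `app.core.exceptions`:

```python
# Stand-in for app.core.exceptions.FluentMeetException (assumed signature).
class FluentMeetException(Exception):
    def __init__(self, status_code: int, code: str, message: str) -> None:
        super().__init__(message)
        self.status_code, self.code, self.message = status_code, code, message


class KafkaError(FluentMeetException):
    def __init__(self, message: str, code: str = "KAFKA_ERROR") -> None:
        super().__init__(status_code=500, code=code, message=message)


class KafkaPublishError(KafkaError):
    def __init__(self, message: str = "Failed to publish message to Kafka") -> None:
        super().__init__(message, code="KAFKA_PUBLISH_ERROR")


try:
    raise KafkaPublishError()
except KafkaError as err:  # base class catches the subclass
    print(err.code, err.status_code)  # KAFKA_PUBLISH_ERROR 500
```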
92 changes: 92 additions & 0 deletions app/kafka/manager.py
@@ -0,0 +1,92 @@
import logging
from typing import Optional

from app.core.config import settings
from app.kafka.consumer import BaseConsumer
from app.kafka.producer import KafkaProducer

logger = logging.getLogger(__name__)


class KafkaManager:
"""
Singleton manager responsible for Kafka producer and consumer lifecycles.

Usage:
manager = get_kafka_manager()
manager.register_consumer(MyEmailConsumer())
await manager.start() # called from FastAPI lifespan
await manager.stop() # called from FastAPI lifespan
"""

_instance: Optional["KafkaManager"] = None
_initialized: bool = False

def __new__(cls) -> "KafkaManager":
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance

def __init__(self) -> None:
if self._initialized:
return

self.producer = KafkaProducer(
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS
)
self.consumers: list[BaseConsumer] = []
self._initialized = True

def register_consumer(self, consumer: BaseConsumer) -> None:
"""
Register a consumer to be started when the manager starts.
The producer is injected into the consumer at this point so it
can access it for DLQ forwarding without a circular import.
"""
consumer._producer = self.producer
self.consumers.append(consumer)
logger.info(f"Registered consumer for topic: '{consumer.topic}'")

async def start(self) -> None:
"""Start the producer, then all registered consumers."""
logger.info("Starting Kafka Manager...")
await self.producer.start()

for consumer in self.consumers:
# Pass bootstrap_servers at start-time — consumers don't store it
await consumer.start(bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS)

logger.info(
f"Kafka Manager started — {len(self.consumers)} consumer(s) running"
)

async def stop(self) -> None:
"""Stop all consumers first, then the producer."""
logger.info("Stopping Kafka Manager...")

for consumer in self.consumers:
await consumer.stop()

await self.producer.stop()
logger.info("Kafka Manager stopped")
Comment on lines +51 to +72
⚠️ Potential issue | 🟠 Major

Rollback partial startup and keep shutdown cleanup going.

If one consumer.start() raises after the producer has already started, app/main.py Lines 11-18 never reach shutdown and the earlier resources stay alive. The shutdown path has the mirror problem: one consumer.stop() exception prevents the remaining consumers and the producer from being stopped.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/kafka/manager.py` around lines 51 - 72, When starting in
KafkaManager.start(), roll back partial startup if a consumer.start(...) raises:
wrap the consumer start loop so that on exception you iterate over
already-started consumers and call their stop(), then stop self.producer, and
re-raise the original error; reference start(), consumers, producer, and
settings.KAFKA_BOOTSTRAP_SERVERS. Likewise harden KafkaManager.stop() so
exceptions from one consumer.stop() do not abort the rest — catch and log
per-consumer exceptions, continue stopping remaining consumers, and ensure
producer.stop() is always attempted in a finally block (logging any errors) so
full cleanup always runs.
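The rollback the reviewer asks for can be sketched like this; `Fake` and `start_all` are illustrative stand-ins for the real producer/consumer objects, not the project's classes:

```python
import asyncio


class Fake:
    """Stand-in recording lifecycle state (illustration only)."""

    def __init__(self, fail_start: bool = False) -> None:
        self.fail_start = fail_start
        self.running = False

    async def start(self, **_: str) -> None:
        if self.fail_start:
            raise RuntimeError("start failed")
        self.running = True

    async def stop(self) -> None:
        self.running = False


async def start_all(producer, consumers, bootstrap_servers: str) -> None:
    """Start the producer then each consumer; on failure, undo what started."""
    await producer.start()
    started = []
    try:
        for consumer in consumers:
            await consumer.start(bootstrap_servers=bootstrap_servers)
            started.append(consumer)
    except Exception:
        for consumer in reversed(started):
            try:
                await consumer.stop()
            except Exception:
                pass  # log and keep cleaning up in real code
        await producer.stop()
        raise  # surface the original startup error


async def demo() -> tuple[bool, bool]:
    producer, good, bad = Fake(), Fake(), Fake(fail_start=True)
    try:
        await start_all(producer, [good, bad], "localhost:9092")
    except RuntimeError:
        pass
    return producer.running, good.running


print(asyncio.run(demo()))  # (False, False)
```

The mirrored `stop_all` would wrap each `consumer.stop()` in its own try/except and stop the producer in a `finally` block, so one failing consumer cannot block the rest of the cleanup.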


async def health_check(self) -> dict:
"""
Verify Kafka broker connectivity via a metadata probe.
Uses the public producer.ping() API — no private attribute access.
"""
if not self.producer.is_started:
return {"status": "uninitialized", "details": "Producer not started"}

try:
await self.producer.ping()
return {"status": "healthy"}
except Exception as e:
logger.error(f"Kafka health check failed: {e}")
return {"status": "unhealthy", "error": str(e)}
Comment on lines +82 to +87
⚠️ Potential issue | 🟠 Major

Don't leak broker errors through /health.

health_check() returns str(e), and app/main.py Lines 45-50 expose that payload directly under services.kafka. That can leak broker addresses, auth failures, or TLS details to any caller of the health endpoint.

Suggested fix
-        except Exception as e:
-            logger.error(f"Kafka health check failed: {e}")
-            return {"status": "unhealthy", "error": str(e)}
+        except Exception:
+            logger.exception("Kafka health check failed")
+            return {"status": "unhealthy"}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/kafka/manager.py` around lines 82 - 87, The health_check() in manager.py
currently returns str(e) which can leak broker details; change it so
producer.ping() exceptions are logged with full details via logger.exception or
logger.error(..., exc_info=True) but the returned payload only contains a
generic error (e.g., {"status":"unhealthy","error":"unavailable"} or omit
"error"), so callers (app/main.py -> services.kafka) never see sensitive
broker/auth/TLS info; keep the exception capture around await
self.producer.ping(), reference health_check, self.producer.ping, and logger
when making these changes.



def get_kafka_manager() -> KafkaManager:
"""Return the KafkaManager singleton."""
return KafkaManager()