From 852e1fa42f2d091741182626813245bf21352c45 Mon Sep 17 00:00:00 2001 From: aniebietafia Date: Tue, 17 Mar 2026 17:57:27 +0100 Subject: [PATCH 1/3] feat(email): add Kafka-backed Mailgun pipeline for signup and forgot-password Signed-off-by: aniebietafia --- .env.example | 4 + app/api/v1/endpoints/auth.py | 75 +++++++++++- app/core/config.py | 7 +- app/kafka/manager.py | 2 + app/kafka/schemas.py | 9 +- app/schemas/auth.py | 10 ++ app/services/email_consumer.py | 115 ++++++++++++++++++ app/services/email_producer.py | 43 +++++++ app/templates/email/password_reset.html | 13 ++ app/templates/email/verification.html | 12 ++ docs/email_service.md | 68 ----------- tests/test_auth/test_auth_signup.py | 62 +++++++++- tests/test_kafka/test_email_consumer.py | 64 ++++++++++ .../test_kafka/test_email_producer_service.py | 37 ++++++ tests/test_kafka/test_schemas.py | 14 +-- 15 files changed, 450 insertions(+), 85 deletions(-) create mode 100644 app/services/email_consumer.py create mode 100644 app/services/email_producer.py create mode 100644 app/templates/email/password_reset.html create mode 100644 app/templates/email/verification.html delete mode 100644 docs/email_service.md create mode 100644 tests/test_kafka/test_email_consumer.py create mode 100644 tests/test_kafka/test_email_producer_service.py diff --git a/.env.example b/.env.example index b030a90..b3d2f37 100644 --- a/.env.example +++ b/.env.example @@ -21,6 +21,7 @@ REDIS_PORT=6379 # Kafka KAFKA_BOOTSTRAP_SERVERS=localhost:9092 +KAFKA_EMAIL_CONSUMER_GROUP_ID=email-worker # External Services DEEPGRAM_API_KEY= @@ -38,5 +39,8 @@ S3_BUCKET_NAME= # Email (SES/Mailgun/Resend) MAILGUN_API_KEY= MAILGUN_DOMAIN= +MAILGUN_FROM_ADDRESS=no-reply@fluentmeet.com +MAILGUN_TIMEOUT_SECONDS=10 +FRONTEND_BASE_URL=http://localhost:3000 RESEND_API_KEY= SES_SENDER_EMAIL= diff --git a/app/api/v1/endpoints/auth.py b/app/api/v1/endpoints/auth.py index cba71fd..bf73ae9 100644 --- a/app/api/v1/endpoints/auth.py +++ b/app/api/v1/endpoints/auth.py @@ -1,13 +1,25 @@ +import logging +from uuid import uuid4 + from fastapi import APIRouter, Depends, status from sqlalchemy.orm import Session -from app.crud.user import create_user +from app.core.config import settings +from app.crud.user import create_user, get_user_by_email from app.db.session import get_db -from app.schemas.auth import SignupResponse +from app.schemas.auth import ( + ActionAcknowledgement, + ForgotPasswordRequest, + SignupResponse, +) from app.schemas.user import UserCreate +from app.services.email_producer import EmailProducerService, get_email_producer_service + +logger = logging.getLogger(__name__) router = APIRouter(prefix="/auth", tags=["auth"]) DB_SESSION_DEPENDENCY = Depends(get_db) +EMAIL_PRODUCER_DEPENDENCY = Depends(get_email_producer_service) @router.post( @@ -15,6 +27,63 @@ response_model=SignupResponse, status_code=status.HTTP_201_CREATED, ) -def signup(user_in: UserCreate, db: Session = DB_SESSION_DEPENDENCY) -> SignupResponse: +async def signup( + user_in: UserCreate, + db: Session = DB_SESSION_DEPENDENCY, + email_producer: EmailProducerService = EMAIL_PRODUCER_DEPENDENCY, +) -> SignupResponse: user = create_user(db=db, user_in=user_in) + + verification_link = ( + f"{settings.FRONTEND_BASE_URL}/verify-email?user={user.id}&token={uuid4()}" + ) + try: + await email_producer.send_email( + to=user.email, + subject="Verify your FluentMeet account", + html_body=None, + template_data={"verification_link": verification_link}, + template="verification", + ) + except Exception as exc: + # Signup should succeed even if email queueing fails. + logger.warning( + "Failed to enqueue verification email for user %s: %s", user.id, exc + ) + return SignupResponse.model_validate(user) + + +@router.post( + "/forgot-password", + response_model=ActionAcknowledgement, + status_code=status.HTTP_202_ACCEPTED, +) +async def forgot_password( + request: ForgotPasswordRequest, + db: Session = DB_SESSION_DEPENDENCY, + email_producer: EmailProducerService = EMAIL_PRODUCER_DEPENDENCY, +) -> ActionAcknowledgement: + user = get_user_by_email(db, request.email) + + if user: + reset_link = f"{settings.FRONTEND_BASE_URL}/reset-password?user={user.id}&token={uuid4()}" + try: + await email_producer.send_email( + to=user.email, + subject="Reset your FluentMeet password", + html_body=None, + template_data={"reset_link": reset_link}, + template="password_reset", + ) + except Exception as exc: + logger.warning( + "Failed to enqueue password reset email for %s: %s", user.email, exc + ) + + return ActionAcknowledgement( + message=( + "If an account with that email exists, we have sent " + "password reset instructions." + ) + ) diff --git a/app/core/config.py b/app/core/config.py index 830d426..afc7832 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -40,6 +40,7 @@ class Settings(BaseSettings): KAFKA_CONSUMER_AUTO_OFFSET_RESET: str = "earliest" KAFKA_MAX_RETRIES: int = 3 KAFKA_RETRY_BACKOFF_MS: int = 1000 + KAFKA_EMAIL_CONSUMER_GROUP_ID: str = "email-worker" # External Services Keys DEEPGRAM_API_KEY: str | None = None @@ -50,7 +51,11 @@ class Settings(BaseSettings): # Mailgun Email Service MAILGUN_API_KEY: str | None = None MAILGUN_DOMAIN: str | None = None - MAILGUN_FROM_EMAIL: str | None = None + MAILGUN_FROM_ADDRESS: str = "no-reply@fluentmeet.com" + MAILGUN_TIMEOUT_SECONDS: float = 10.0 + + # URL used in transactional email links + FRONTEND_BASE_URL: str = "http://localhost:3000" model_config = SettingsConfigDict( env_file=".env", case_sensitive=True, extra="ignore" diff --git a/app/kafka/manager.py b/app/kafka/manager.py index 5d8795f..34e2764 100644 --- a/app/kafka/manager.py +++ b/app/kafka/manager.py @@ -4,6 +4,7 @@ from app.core.config import settings from app.kafka.consumer import BaseConsumer from app.kafka.producer import KafkaProducer +from app.services.email_consumer import EmailConsumerWorker logger = logging.getLogger(__name__) @@ -36,6 +37,7 @@ def __init__(self) -> None: bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS ) self.consumers: list[BaseConsumer] = [] + self.register_consumer(EmailConsumerWorker(producer=self.producer)) self._initialized = True def register_consumer(self, consumer: BaseConsumer) -> None: diff --git a/app/kafka/schemas.py b/app/kafka/schemas.py index fa7ff9a..0ee09a7 100644 --- a/app/kafka/schemas.py +++ b/app/kafka/schemas.py @@ -33,10 +33,11 @@ class DLQEvent(BaseModel): class EmailPayload(BaseModel): - to_email: str + to: str subject: str - template_name: str - template_data: dict[str, Any] = {} + template: str + data: dict[str, Any] = Field(default_factory=dict) + html_body: str | None = None class EmailEvent(BaseEvent[EmailPayload]): @@ -47,7 +48,7 @@ class MediaUploadPayload(BaseModel): user_id: int file_path: str file_type: str # e.g., 'avatar', 'recording' - metadata: dict[str, Any] = {} + metadata: dict[str, Any] = Field(default_factory=dict) class MediaUploadEvent(BaseEvent[MediaUploadPayload]): diff --git a/app/schemas/auth.py b/app/schemas/auth.py index 1f20b45..9ae4637 100644 --- a/app/schemas/auth.py +++ b/app/schemas/auth.py @@ -1,5 +1,15 @@ +from pydantic import BaseModel, EmailStr + from app.schemas.user import UserResponse class SignupResponse(UserResponse): """Public payload returned by the signup endpoint.""" + + +class ForgotPasswordRequest(BaseModel): + email: EmailStr + + +class ActionAcknowledgement(BaseModel): + message: str diff --git a/app/services/email_consumer.py b/app/services/email_consumer.py new file mode 100644 index 0000000..fc53aa2 --- /dev/null +++ b/app/services/email_consumer.py @@ -0,0 +1,115 @@ +import logging +from pathlib import Path +from typing import Any + +import httpx +from jinja2 import Environment, FileSystemLoader, TemplateNotFound + +from app.core.config import settings +from app.kafka.consumer import BaseConsumer +from app.kafka.schemas import BaseEvent, EmailEvent +from app.kafka.topics import NOTIFICATIONS_EMAIL + +logger = logging.getLogger(__name__) + + +class TransientEmailDeliveryError(Exception): + """Signals failures that should trigger Kafka retries.""" + + +class EmailTemplateRenderer: + def __init__(self) -> None: + templates_root = Path(__file__).resolve().parent.parent / "templates" / "email" + self._environment = Environment( + loader=FileSystemLoader(str(templates_root)), + autoescape=True, + ) + + def render(self, template_name: str, data: dict[str, object]) -> str: + try: + template = self._environment.get_template(f"{template_name}.html") + except TemplateNotFound: + logger.warning( + "Template '%s' is missing, falling back to raw html", + template_name, + ) + return "" + return template.render(**data) + + +class MailgunEmailSender: + """Sends emails via Mailgun's /messages endpoint.""" + + def __init__( + self, timeout_seconds: float = settings.MAILGUN_TIMEOUT_SECONDS + ) -> None: + self._timeout_seconds = timeout_seconds + + async def send(self, to: str, subject: str, html_body: str) -> None: + if not settings.MAILGUN_API_KEY or not settings.MAILGUN_DOMAIN: + logger.warning("Mailgun credentials not configured; skipping dispatch") + return + + endpoint = f"https://api.mailgun.net/v3/{settings.MAILGUN_DOMAIN}/messages" + payload = { + "from": settings.MAILGUN_FROM_ADDRESS, + "to": to, + "subject": subject, + "html": html_body, + } + + async with httpx.AsyncClient(timeout=self._timeout_seconds) as client: + response = await client.post( + endpoint, + data=payload, + auth=("api", settings.MAILGUN_API_KEY), + ) + + if response.status_code in {408, 425, 429} or response.status_code >= 500: + raise TransientEmailDeliveryError( + f"Mailgun transient error ({response.status_code}): {response.text}" + ) + if response.status_code >= 400: + logger.error( + "Mailgun rejected email with status %s: %s", + response.status_code, + response.text, + ) + return + + +class EmailConsumerWorker(BaseConsumer): + topic = NOTIFICATIONS_EMAIL + group_id = settings.KAFKA_EMAIL_CONSUMER_GROUP_ID + event_schema = EmailEvent + + def __init__(self, producer: object) -> None: + super().__init__(producer=producer) + self._sender = MailgunEmailSender() + self._renderer = EmailTemplateRenderer() + + async def handle(self, event: BaseEvent[Any]) -> None: + email_event = EmailEvent.model_validate(event.model_dump()) + html_body = email_event.payload.html_body + if not html_body: + html_body = self._renderer.render( + email_event.payload.template, + email_event.payload.data, + ) + + if not html_body: + logger.error( + "No html body could be rendered for event %s", email_event.event_id + ) + return + + await self._sender.send( + to=email_event.payload.to, + subject=email_event.payload.subject, + html_body=html_body, + ) + logger.info( + "Dispatched email event %s to %s", + email_event.event_id, + email_event.payload.to, + ) diff --git a/app/services/email_producer.py b/app/services/email_producer.py new file mode 100644 index 0000000..4d0de8a --- /dev/null +++ b/app/services/email_producer.py @@ -0,0 +1,43 @@ +import logging +from typing import Any + +from app.kafka.manager import get_kafka_manager +from app.kafka.schemas import EmailEvent, EmailPayload +from app.kafka.topics import NOTIFICATIONS_EMAIL + +logger = logging.getLogger(__name__) + + +class EmailProducerService: + """Publishes email dispatch events to Kafka.""" + + def __init__(self, topic: str = NOTIFICATIONS_EMAIL) -> None: + self._topic = topic + + async def send_email( + self, + to: str, + subject: str, + html_body: str | None, + template_data: dict[str, Any], + template: str, + ) -> None: + payload = EmailPayload( + to=to, + subject=subject, + template=template, + data=template_data, + html_body=html_body, + ) + event = EmailEvent(payload=payload) + + kafka_manager = get_kafka_manager() + await kafka_manager.producer.send(self._topic, event, key=to) + logger.info("Queued email '%s' for %s", template, to) + + +_email_producer_service = EmailProducerService() + + +def get_email_producer_service() -> EmailProducerService: + return _email_producer_service diff --git a/app/templates/email/password_reset.html b/app/templates/email/password_reset.html new file mode 100644 index 0000000..6d18155 --- /dev/null +++ b/app/templates/email/password_reset.html @@ -0,0 +1,13 @@ + + + +

Reset your FluentMeet password

+

We received a password reset request for your account.

+

+ Continue by clicking + this password reset link. +

+

If you did not request this change, you can ignore this email.

+ + + diff --git a/app/templates/email/verification.html b/app/templates/email/verification.html new file mode 100644 index 0000000..8cc3597 --- /dev/null +++ b/app/templates/email/verification.html @@ -0,0 +1,12 @@ + + + +

Verify your FluentMeet account

+

Welcome to FluentMeet.

+

+ Please verify your email by clicking + this verification link. +

+ + + diff --git a/docs/email_service.md b/docs/email_service.md deleted file mode 100644 index fdd07cf..0000000 --- a/docs/email_service.md +++ /dev/null @@ -1,68 +0,0 @@ -### Feature: Implement Mailgun Email Service via Kafka - -**Problem** -The FluentMeet application needs to send transactional emails for user account verification, password reset, and other notifications. Currently, no email service is integrated. Directly calling a third-party email API synchronously inside request handlers would increase latency and couple the request lifecycle to an external service, creating a poor user experience if the provider is slow or unavailable. - -**Proposed Solution** -Integrate Mailgun as the email provider and decouple email sending from the HTTP request lifecycle using Apache Kafka. When an email needs to be sent, the application will publish a structured message to a Kafka topic (`notifications.email`). A dedicated consumer worker will pick up the message and dispatch it via the Mailgun REST API. This approach makes email sending asynchronous, resilient, and independently scalable. - -**User Stories** -* **As a new user,** I want to receive a verification email immediately after signing up, so I can activate my account without experiencing delays in the registration response. -* **As a user,** I want to receive a password reset email when I request one, so I can regain access to my account. -* **As a developer,** I want a reusable, Kafka-backed email service so that any part of the system can trigger an email without being blocked by the Mailgun API call. -* **As a DevOps engineer,** I want email failures to be retried automatically and logged clearly, so transient Mailgun outages don't result in silently dropped emails. - -**Acceptance Criteria** -1. A `Mailgun` configuration block (API key, domain, sender address) is defined in `app/core/config.py` and sourced from environment variables — never hardcoded. -2. A Kafka topic `notifications.email` is created and documented in the infrastructure setup. -3. An `EmailProducerService` is implemented with a `send_email(to, subject, html_body, template_data)` method that publishes a structured JSON message to the `notifications.email` Kafka topic. -4. An `EmailConsumerWorker` is implemented to: - * Consume messages from the `notifications.email` topic. - * Call the Mailgun REST API (`/messages`) to deliver the email. - * Log success and failure outcomes. - * Handle retries on transient failures using Kafka consumer group offsets. -5. Email templates are implemented for: - * **Account Verification**: Contains the verification link. - * **Password Reset**: Contains the time-limited reset link. -6. The `EmailProducerService` is injected into and called from the user registration and password reset flows. -7. Unit tests verify that the producer publishes the correct message payload to the Kafka topic. -8. Integration tests verify the full flow: producer publishes → consumer dispatches → Mailgun API is called. - -**Proposed Technical Details** -* **Mailgun SDK**: Use the `mailgun2` library (already in `requirements.txt`) or direct `httpx` calls to the Mailgun `/messages` endpoint. -* **Kafka Topic**: `notifications.email` — messages follow a standard envelope: - ```json - { - "to": "user@example.com", - "subject": "Verify your FluentMeet account", - "template": "verification", - "data": { "verification_link": "https://..." } - } - ``` -* **Producer**: `app/services/email_producer.py` — uses `aiokafka.AIOKafkaProducer` to publish messages asynchronously. -* **Consumer Worker**: `app/services/email_consumer.py` — long-running `aiokafka.AIOKafkaConsumer` in a background task, started via FastAPI `lifespan` events. -* **Templates**: Jinja2 HTML templates stored in `app/templates/email/` (e.g., `verification.html`, `password_reset.html`). -* **Config**: New fields in `app/core/config.py`: - ```python - MAILGUN_API_KEY: str - MAILGUN_DOMAIN: str - MAILGUN_FROM_ADDRESS: str = "no-reply@fluentmeet.com" - ``` - -**Tasks** -- [ ] Add `MAILGUN_API_KEY`, `MAILGUN_DOMAIN`, and `MAILGUN_FROM_ADDRESS` to `.env.example` and `app/core/config.py`. -- [ ] Create the `notifications.email` Kafka topic and document it in `infra/`. -- [ ] Implement `EmailProducerService` in `app/services/email_producer.py`. -- [ ] Implement `EmailConsumerWorker` in `app/services/email_consumer.py`. -- [ ] Register the consumer as a background task in the FastAPI `lifespan` context manager in `app/main.py`. -- [ ] Create Jinja2 HTML templates for verification and password reset emails. -- [ ] Integrate the email producer into the user registration endpoint (`POST /api/v1/auth/signup`). -- [ ] Integrate the email producer into the password reset endpoint (`POST /api/v1/auth/forgot-password`). -- [ ] Write unit tests for the `EmailProducerService` (mock the Kafka producer). -- [ ] Write integration tests for the full consumer → Mailgun dispatch flow (mock the Mailgun API). - -**Open Questions/Considerations** -* What is the retry strategy for failed Mailgun deliveries — dead-letter queue or fixed retry count? -* Should email sending failures be surfaced to the user (e.g., "email failed to send, please try again") or handled silently with a background retry? -* What is the Kafka consumer group ID for the email worker, and how should it be managed across deployments? -* Should we implement a resend-verification endpoint for users whose verification tokens have expired? diff --git a/tests/test_auth/test_auth_signup.py b/tests/test_auth/test_auth_signup.py index 1f6f590..12c379f 100644 --- a/tests/test_auth/test_auth_signup.py +++ b/tests/test_auth/test_auth_signup.py @@ -1,4 +1,5 @@ from collections.abc import Generator +from unittest.mock import AsyncMock import pytest from fastapi.testclient import TestClient @@ -9,6 +10,7 @@ from app.db.session import get_db from app.main import app from app.models.user import Base, User +from app.services.email_producer import get_email_producer_service @pytest.fixture @@ -34,18 +36,33 @@ def db_session() -> Generator[Session, None, None]: @pytest.fixture -def client(db_session: Session) -> Generator[TestClient, None, None]: +def email_producer_mock() -> AsyncMock: + mock = AsyncMock() + mock.send_email = AsyncMock() + return mock + + +@pytest.fixture +def client( + db_session: Session, email_producer_mock: AsyncMock +) -> Generator[TestClient, None, None]: def _override_get_db() -> Generator[Session, None, None]: yield db_session + def _override_email_producer() -> AsyncMock: + return email_producer_mock + app.dependency_overrides[get_db] = _override_get_db + app.dependency_overrides[get_email_producer_service] = _override_email_producer with TestClient(app) as test_client: yield test_client app.dependency_overrides.clear() def test_signup_success_creates_user_and_returns_public_profile( - client: TestClient, db_session: Session + client: TestClient, + db_session: Session, + email_producer_mock: AsyncMock, ) -> None: payload = { "email": " USER@example.com ", @@ -75,6 +92,7 @@ def test_signup_success_creates_user_and_returns_public_profile( assert created_user.hashed_password.startswith("$2") assert created_user.is_active is True assert created_user.is_verified is False + email_producer_mock.send_email.assert_awaited_once() def test_signup_duplicate_email_returns_conflict(client: TestClient) -> None: @@ -114,3 +132,43 @@ def test_signup_invalid_language_uses_standard_validation_shape( assert body["code"] == "VALIDATION_ERROR" fields = [detail["field"] for detail in body["details"]] assert "body.speaking_language" in fields + + +def test_forgot_password_returns_generic_accepted_response( + client: TestClient, email_producer_mock: AsyncMock +) -> None: + payload = { + "email": "missing-user@example.com", + } + + response = client.post("/api/v1/auth/forgot-password", json=payload) + + assert response.status_code == 202 + assert response.json() == { + "message": ( + "If an account with that email exists, we have sent " + "password reset instructions." + ) + } + email_producer_mock.send_email.assert_not_awaited() + + +def test_forgot_password_enqueues_reset_email_for_existing_user( + client: TestClient, + email_producer_mock: AsyncMock, +) -> None: + signup_payload = { + "email": "pwreset@example.com", + "password": "MyStr0ngP@ss!", + "full_name": "Reset User", + } + assert client.post("/api/v1/auth/signup", json=signup_payload).status_code == 201 + email_producer_mock.send_email.reset_mock() + + response = client.post( + "/api/v1/auth/forgot-password", + json={"email": "pwreset@example.com"}, + ) + + assert response.status_code == 202 + email_producer_mock.send_email.assert_awaited_once() diff --git a/tests/test_kafka/test_email_consumer.py b/tests/test_kafka/test_email_consumer.py new file mode 100644 index 0000000..62157bc --- /dev/null +++ b/tests/test_kafka/test_email_consumer.py @@ -0,0 +1,64 @@ +from typing import Any, cast +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from app.kafka.schemas import BaseEvent, EmailEvent, EmailPayload +from app.services.email_consumer import EmailConsumerWorker + + +@pytest.mark.asyncio +async def test_email_consumer_renders_template_when_html_missing() -> None: + worker = EmailConsumerWorker(producer=MagicMock()) + render_mock = MagicMock(return_value="rendered") + send_mock = AsyncMock() + + event = EmailEvent( + payload=EmailPayload( + to="user@example.com", + subject="Verify", + template="verification", + data={"verification_link": "https://example.com"}, + ) + ) + + worker._renderer.render = render_mock # type: ignore[method-assign] + worker._sender.send = send_mock # type: ignore[method-assign] + await worker.handle(cast(BaseEvent[Any], event)) + + render_mock.assert_called_once_with( + "verification", {"verification_link": "https://example.com"} + ) + send_mock.assert_awaited_once_with( + to="user@example.com", + subject="Verify", + html_body="rendered", + ) + + +@pytest.mark.asyncio +async def test_email_consumer_prefers_html_body_over_template_rendering() -> None: + worker = EmailConsumerWorker(producer=MagicMock()) + render_mock = MagicMock() + send_mock = AsyncMock() + + event = EmailEvent( + payload=EmailPayload( + to="user@example.com", + subject="Reset Password", + template="password_reset", + data={"reset_link": "https://example.com"}, + html_body="provided", + ) + ) + + worker._renderer.render = render_mock # type: ignore[method-assign] + worker._sender.send = send_mock # type: ignore[method-assign] + await worker.handle(cast(BaseEvent[Any], event)) + + render_mock.assert_not_called() + send_mock.assert_awaited_once_with( + to="user@example.com", + subject="Reset Password", + html_body="provided", + ) diff --git a/tests/test_kafka/test_email_producer_service.py b/tests/test_kafka/test_email_producer_service.py new file mode 100644 index 0000000..d3e1208 --- /dev/null +++ b/tests/test_kafka/test_email_producer_service.py @@ -0,0 +1,37 @@ +from unittest.mock import AsyncMock, Mock, patch + +import pytest + +from app.services.email_producer import EmailProducerService + + +@pytest.mark.asyncio +async def test_send_email_publishes_email_event() -> None: + producer = Mock() + producer.send = AsyncMock() + + kafka_manager = Mock() + kafka_manager.producer = producer + + service = EmailProducerService() + with patch( + "app.services.email_producer.get_kafka_manager", + return_value=kafka_manager, + ): + await service.send_email( + to="user@example.com", + subject="Verify account", + html_body=None, + template_data={"verification_link": "https://example.com/verify"}, + template="verification", + ) + + producer.send.assert_awaited_once() + args, kwargs = producer.send.call_args + assert args[0] == "notifications.email" + assert kwargs["key"] == "user@example.com" + + event = args[1] + assert event.payload.to == "user@example.com" + assert event.payload.template == "verification" + assert event.payload.data["verification_link"].startswith("https://") diff --git a/tests/test_kafka/test_schemas.py b/tests/test_kafka/test_schemas.py index 1c3cb90..b02dd09 100644 --- a/tests/test_kafka/test_schemas.py +++ b/tests/test_kafka/test_schemas.py @@ -4,7 +4,7 @@ from app.kafka.schemas import BaseEvent, EmailEvent, EmailPayload -def test_base_event_serialization(): +def test_base_event_serialization() -> None: payload = {"key": "value"} event = BaseEvent(event_type="test.event", payload=payload) @@ -19,20 +19,20 @@ def test_base_event_serialization(): assert dump["payload"] == payload -def test_email_event_validation(): +def test_email_event_validation() -> None: payload = EmailPayload( - to_email="test@example.com", + to="test@example.com", subject="Hello", - template_name="welcome", - template_data={"name": "User"}, + template="welcome", + data={"name": "User"}, ) event = EmailEvent(payload=payload) assert event.event_type == "email.dispatch" - assert event.payload.to_email == "test@example.com" + assert event.payload.to == "test@example.com" # Test model_validate event_dict = event.model_dump() validated_event = EmailEvent.model_validate(event_dict) assert validated_event.event_id == event.event_id - assert validated_event.payload.to_email == "test@example.com" + assert validated_event.payload.to == "test@example.com" From c7aed38dabac2d7a260797cc60b8e95453dddcdc Mon Sep 17 00:00:00 2001 From: aniebietafia Date: Tue, 17 Mar 2026 18:06:22 +0100 Subject: [PATCH 2/3] feat(email): add Kafka-backed Mailgun pipeline for signup and forgot-password Signed-off-by: aniebietafia --- app/api/v1/endpoints/auth.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/app/api/v1/endpoints/auth.py b/app/api/v1/endpoints/auth.py index bf73ae9..b98a00a 100644 --- a/app/api/v1/endpoints/auth.py +++ b/app/api/v1/endpoints/auth.py @@ -67,7 +67,9 @@ async def forgot_password( user = get_user_by_email(db, request.email) if user: - reset_link = f"{settings.FRONTEND_BASE_URL}/reset-password?user={user.id}&token={uuid4()}" + reset_link = ( + f"{settings.FRONTEND_BASE_URL}/reset-password?user={user.id}&token={uuid4()}" + ) try: await email_producer.send_email( to=user.email, From 66226de9957c76529e9f200757abc2240250c0b1 Mon Sep 17 00:00:00 2001 From: aniebietafia Date: Tue, 17 Mar 2026 18:13:47 +0100 Subject: [PATCH 3/3] feat(email): add Kafka-backed Mailgun pipeline for signup and forgot-password Signed-off-by: aniebietafia --- app/api/v1/endpoints/auth.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/app/api/v1/endpoints/auth.py b/app/api/v1/endpoints/auth.py index b98a00a..6dd3b2f 100644 --- a/app/api/v1/endpoints/auth.py +++ b/app/api/v1/endpoints/auth.py @@ -68,7 +68,9 @@ async def forgot_password( if user: reset_link = ( - f"{settings.FRONTEND_BASE_URL}/reset-password?user={user.id}&token={uuid4()}" + f"" + f"{settings.FRONTEND_BASE_URL}/reset-password?user={user.id}" + f"&token={uuid4()}" ) try: await email_producer.send_email(