-
Notifications
You must be signed in to change notification settings - Fork 0
feat(email): add Kafka-backed Mailgun pipeline for signup and forgot-… #38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
852e1fa
c7aed38
66226de
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,20 +1,93 @@ | ||
| import logging | ||
| from uuid import uuid4 | ||
|
|
||
| from fastapi import APIRouter, Depends, status | ||
| from sqlalchemy.orm import Session | ||
|
|
||
| from app.crud.user import create_user | ||
| from app.core.config import settings | ||
| from app.crud.user import create_user, get_user_by_email | ||
| from app.db.session import get_db | ||
| from app.schemas.auth import SignupResponse | ||
| from app.schemas.auth import ( | ||
| ActionAcknowledgement, | ||
| ForgotPasswordRequest, | ||
| SignupResponse, | ||
| ) | ||
| from app.schemas.user import UserCreate | ||
| from app.services.email_producer import EmailProducerService, get_email_producer_service | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| router = APIRouter(prefix="/auth", tags=["auth"]) | ||
| DB_SESSION_DEPENDENCY = Depends(get_db) | ||
| EMAIL_PRODUCER_DEPENDENCY = Depends(get_email_producer_service) | ||
|
|
||
|
|
||
| @router.post( | ||
| "/signup", | ||
| response_model=SignupResponse, | ||
| status_code=status.HTTP_201_CREATED, | ||
| ) | ||
| def signup(user_in: UserCreate, db: Session = DB_SESSION_DEPENDENCY) -> SignupResponse: | ||
| async def signup( | ||
| user_in: UserCreate, | ||
| db: Session = DB_SESSION_DEPENDENCY, | ||
| email_producer: EmailProducerService = EMAIL_PRODUCER_DEPENDENCY, | ||
| ) -> SignupResponse: | ||
| user = create_user(db=db, user_in=user_in) | ||
|
|
||
| verification_link = ( | ||
| f"{settings.FRONTEND_BASE_URL}/verify-email?user={user.id}&token={uuid4()}" | ||
| ) | ||
|
Comment on lines
+37
to
+39
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use a verifiable token in these links. These UUIDs are generated inline but never persisted or signed in this flow. The verify/reset callback has no reliable way to validate or expire them as-is; use a stored token with TTL or a backend-signed token instead. Also applies to: 69-70 🤖 Prompt for AI Agents |
||
| try: | ||
| await email_producer.send_email( | ||
| to=user.email, | ||
| subject="Verify your FluentMeet account", | ||
| html_body=None, | ||
| template_data={"verification_link": verification_link}, | ||
| template="verification", | ||
| ) | ||
| except Exception as exc: | ||
| # Signup should succeed even if email queueing fails. | ||
| logger.warning( | ||
| "Failed to enqueue verification email for user %s: %s", user.id, exc | ||
| ) | ||
|
|
||
| return SignupResponse.model_validate(user) | ||
|
|
||
|
|
||
| @router.post( | ||
| "/forgot-password", | ||
| response_model=ActionAcknowledgement, | ||
| status_code=status.HTTP_202_ACCEPTED, | ||
| ) | ||
| async def forgot_password( | ||
| request: ForgotPasswordRequest, | ||
| db: Session = DB_SESSION_DEPENDENCY, | ||
| email_producer: EmailProducerService = EMAIL_PRODUCER_DEPENDENCY, | ||
| ) -> ActionAcknowledgement: | ||
| user = get_user_by_email(db, request.email) | ||
|
|
||
| if user: | ||
| reset_link = ( | ||
| f"" | ||
| f"{settings.FRONTEND_BASE_URL}/reset-password?user={user.id}" | ||
| f"&token={uuid4()}" | ||
| ) | ||
| try: | ||
| await email_producer.send_email( | ||
| to=user.email, | ||
| subject="Reset your FluentMeet password", | ||
| html_body=None, | ||
| template_data={"reset_link": reset_link}, | ||
| template="password_reset", | ||
| ) | ||
| except Exception as exc: | ||
| logger.warning( | ||
| "Failed to enqueue password reset email for %s: %s", user.email, exc | ||
| ) | ||
|
|
||
| return ActionAcknowledgement( | ||
| message=( | ||
| "If an account with that email exists, we have sent " | ||
| "password reset instructions." | ||
| ) | ||
| ) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,15 @@ | ||
| from pydantic import BaseModel, EmailStr | ||
|
|
||
| from app.schemas.user import UserResponse | ||
|
|
||
|
|
||
| class SignupResponse(UserResponse): | ||
| """Public payload returned by the signup endpoint.""" | ||
|
|
||
|
|
||
| class ForgotPasswordRequest(BaseModel): | ||
| email: EmailStr | ||
|
|
||
|
|
||
| class ActionAcknowledgement(BaseModel): | ||
| message: str |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,115 @@ | ||
| import logging | ||
| from pathlib import Path | ||
| from typing import Any | ||
|
|
||
| import httpx | ||
| from jinja2 import Environment, FileSystemLoader, TemplateNotFound | ||
|
|
||
| from app.core.config import settings | ||
| from app.kafka.consumer import BaseConsumer | ||
| from app.kafka.schemas import BaseEvent, EmailEvent | ||
| from app.kafka.topics import NOTIFICATIONS_EMAIL | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class TransientEmailDeliveryError(Exception): | ||
| """Signals failures that should trigger Kafka retries.""" | ||
|
|
||
|
|
||
| class EmailTemplateRenderer: | ||
| def __init__(self) -> None: | ||
| templates_root = Path(__file__).resolve().parent.parent / "templates" / "email" | ||
| self._environment = Environment( | ||
| loader=FileSystemLoader(str(templates_root)), | ||
| autoescape=True, | ||
| ) | ||
|
|
||
| def render(self, template_name: str, data: dict[str, object]) -> str: | ||
| try: | ||
| template = self._environment.get_template(f"{template_name}.html") | ||
| except TemplateNotFound: | ||
| logger.warning( | ||
| "Template '%s' is missing, falling back to raw html", | ||
| template_name, | ||
| ) | ||
| return "" | ||
| return template.render(**data) | ||
|
Comment on lines
+28
to
+37
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fail fast when the email template is missing.
Also applies to: 100-104 🤖 Prompt for AI Agents |
||
|
|
||
|
|
||
| class MailgunEmailSender: | ||
| """Sends emails via Mailgun's /messages endpoint.""" | ||
|
|
||
| def __init__( | ||
| self, timeout_seconds: float = settings.MAILGUN_TIMEOUT_SECONDS | ||
| ) -> None: | ||
| self._timeout_seconds = timeout_seconds | ||
|
|
||
| async def send(self, to: str, subject: str, html_body: str) -> None: | ||
| if not settings.MAILGUN_API_KEY or not settings.MAILGUN_DOMAIN: | ||
| logger.warning("Mailgun credentials not configured; skipping dispatch") | ||
| return | ||
|
Comment on lines
+48
to
+51
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't treat skipped or rejected Mailgun calls as successful sends.
Also applies to: 72-78, 106-115 🤖 Prompt for AI Agents |
||
|
|
||
| endpoint = f"https://api.mailgun.net/v3/{settings.MAILGUN_DOMAIN}/messages" | ||
| payload = { | ||
| "from": settings.MAILGUN_FROM_ADDRESS, | ||
| "to": to, | ||
| "subject": subject, | ||
| "html": html_body, | ||
| } | ||
|
|
||
| async with httpx.AsyncClient(timeout=self._timeout_seconds) as client: | ||
| response = await client.post( | ||
| endpoint, | ||
| data=payload, | ||
| auth=("api", settings.MAILGUN_API_KEY), | ||
| ) | ||
|
|
||
| if response.status_code in {408, 425, 429} or response.status_code >= 500: | ||
| raise TransientEmailDeliveryError( | ||
| f"Mailgun transient error ({response.status_code}): {response.text}" | ||
| ) | ||
| if response.status_code >= 400: | ||
| logger.error( | ||
| "Mailgun rejected email with status %s: %s", | ||
| response.status_code, | ||
| response.text, | ||
| ) | ||
| return | ||
|
|
||
|
|
||
| class EmailConsumerWorker(BaseConsumer): | ||
| topic = NOTIFICATIONS_EMAIL | ||
| group_id = settings.KAFKA_EMAIL_CONSUMER_GROUP_ID | ||
| event_schema = EmailEvent | ||
|
|
||
| def __init__(self, producer: object) -> None: | ||
| super().__init__(producer=producer) | ||
| self._sender = MailgunEmailSender() | ||
| self._renderer = EmailTemplateRenderer() | ||
|
|
||
| async def handle(self, event: BaseEvent[Any]) -> None: | ||
| email_event = EmailEvent.model_validate(event.model_dump()) | ||
| html_body = email_event.payload.html_body | ||
| if not html_body: | ||
| html_body = self._renderer.render( | ||
| email_event.payload.template, | ||
| email_event.payload.data, | ||
| ) | ||
|
|
||
| if not html_body: | ||
| logger.error( | ||
| "No html body could be rendered for event %s", email_event.event_id | ||
| ) | ||
| return | ||
|
|
||
| await self._sender.send( | ||
| to=email_event.payload.to, | ||
| subject=email_event.payload.subject, | ||
| html_body=html_body, | ||
| ) | ||
| logger.info( | ||
| "Dispatched email event %s to %s", | ||
| email_event.event_id, | ||
| email_event.payload.to, | ||
| ) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| import logging | ||
| from typing import Any | ||
|
|
||
| from app.kafka.manager import get_kafka_manager | ||
| from app.kafka.schemas import EmailEvent, EmailPayload | ||
| from app.kafka.topics import NOTIFICATIONS_EMAIL | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class EmailProducerService: | ||
| """Publishes email dispatch events to Kafka.""" | ||
|
|
||
| def __init__(self, topic: str = NOTIFICATIONS_EMAIL) -> None: | ||
| self._topic = topic | ||
|
|
||
| async def send_email( | ||
| self, | ||
| to: str, | ||
| subject: str, | ||
| html_body: str | None, | ||
| template_data: dict[str, Any], | ||
| template: str, | ||
| ) -> None: | ||
| payload = EmailPayload( | ||
| to=to, | ||
| subject=subject, | ||
| template=template, | ||
| data=template_data, | ||
| html_body=html_body, | ||
| ) | ||
| event = EmailEvent(payload=payload) | ||
|
|
||
| kafka_manager = get_kafka_manager() | ||
| await kafka_manager.producer.send(self._topic, event, key=to) | ||
| logger.info("Queued email '%s' for %s", template, to) | ||
|
Comment on lines
+34
to
+36
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid using the raw email address as the Kafka key.
🤖 Prompt for AI Agents |
||
|
|
||
|
|
||
| _email_producer_service = EmailProducerService() | ||
|
|
||
|
|
||
| def get_email_producer_service() -> EmailProducerService: | ||
| return _email_producer_service | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| <!DOCTYPE html> | ||
| <html lang="en"> | ||
| <body> | ||
| <h2>Reset your FluentMeet password</h2> | ||
| <p>We received a password reset request for your account.</p> | ||
| <p> | ||
| Continue by clicking | ||
| <a href="{{ reset_link }}">this password reset link</a>. | ||
| </p> | ||
| <p>If you did not request this change, you can ignore this email.</p> | ||
| </body> | ||
| </html> | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| <!DOCTYPE html> | ||
| <html lang="en"> | ||
| <body> | ||
| <h2>Verify your FluentMeet account</h2> | ||
| <p>Welcome to FluentMeet.</p> | ||
| <p> | ||
| Please verify your email by clicking | ||
| <a href="{{ verification_link }}">this verification link</a>. | ||
| </p> | ||
| </body> | ||
| </html> | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix dotenv key ordering to satisfy linter.
FRONTEND_BASE_URLshould be moved beforeMAILGUN_API_KEYto address the current dotenv-linter warning.Proposed reorder
📝 Committable suggestion
🧰 Tools
🪛 dotenv-linter (4.0.0)
[warning] 44-44: [UnorderedKey] The FRONTEND_BASE_URL key should go before the MAILGUN_API_KEY key
(UnorderedKey)
🤖 Prompt for AI Agents