Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ REDIS_PORT=6379

# Kafka
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_EMAIL_CONSUMER_GROUP_ID=email-worker

# External Services
DEEPGRAM_API_KEY=
Expand All @@ -38,5 +39,8 @@ S3_BUCKET_NAME=
# Email (SES/Mailgun/Resend)
MAILGUN_API_KEY=
MAILGUN_DOMAIN=
MAILGUN_FROM_ADDRESS=no-reply@fluentmeet.com
MAILGUN_TIMEOUT_SECONDS=10
FRONTEND_BASE_URL=http://localhost:3000
Comment on lines 40 to +44
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix dotenv key ordering to satisfy linter.

FRONTEND_BASE_URL should be moved before MAILGUN_API_KEY to address the current dotenv-linter warning.

Proposed reorder
-# Email (SES/Mailgun/Resend)
-MAILGUN_API_KEY=
-MAILGUN_DOMAIN=
-MAILGUN_FROM_ADDRESS=no-reply@fluentmeet.com
-MAILGUN_TIMEOUT_SECONDS=10
-FRONTEND_BASE_URL=http://localhost:3000
+# Email (SES/Mailgun/Resend)
+FRONTEND_BASE_URL=http://localhost:3000
+MAILGUN_API_KEY=
+MAILGUN_DOMAIN=
+MAILGUN_FROM_ADDRESS=no-reply@fluentmeet.com
+MAILGUN_TIMEOUT_SECONDS=10
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
MAILGUN_API_KEY=
MAILGUN_DOMAIN=
MAILGUN_FROM_ADDRESS=no-reply@fluentmeet.com
MAILGUN_TIMEOUT_SECONDS=10
FRONTEND_BASE_URL=http://localhost:3000
FRONTEND_BASE_URL=http://localhost:3000
MAILGUN_API_KEY=
MAILGUN_DOMAIN=
MAILGUN_FROM_ADDRESS=no-reply@fluentmeet.com
MAILGUN_TIMEOUT_SECONDS=10
🧰 Tools
🪛 dotenv-linter (4.0.0)

[warning] 44-44: [UnorderedKey] The FRONTEND_BASE_URL key should go before the MAILGUN_API_KEY key

(UnorderedKey)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In @.env.example around lines 40 - 44, Move the FRONTEND_BASE_URL key so it
appears before MAILGUN_API_KEY in .env.example to satisfy dotenv-linter
ordering; update the block containing FRONTEND_BASE_URL, MAILGUN_API_KEY,
MAILGUN_DOMAIN, MAILGUN_FROM_ADDRESS and MAILGUN_TIMEOUT_SECONDS so keys are in
the expected order (with FRONTEND_BASE_URL preceding MAILGUN_API_KEY) and keep
their values/format unchanged.

RESEND_API_KEY=
SES_SENDER_EMAIL=
79 changes: 76 additions & 3 deletions app/api/v1/endpoints/auth.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,93 @@
import logging
from uuid import uuid4

from fastapi import APIRouter, Depends, status
from sqlalchemy.orm import Session

from app.crud.user import create_user
from app.core.config import settings
from app.crud.user import create_user, get_user_by_email
from app.db.session import get_db
from app.schemas.auth import SignupResponse
from app.schemas.auth import (
ActionAcknowledgement,
ForgotPasswordRequest,
SignupResponse,
)
from app.schemas.user import UserCreate
from app.services.email_producer import EmailProducerService, get_email_producer_service

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/auth", tags=["auth"])
DB_SESSION_DEPENDENCY = Depends(get_db)
EMAIL_PRODUCER_DEPENDENCY = Depends(get_email_producer_service)


@router.post(
"/signup",
response_model=SignupResponse,
status_code=status.HTTP_201_CREATED,
)
def signup(user_in: UserCreate, db: Session = DB_SESSION_DEPENDENCY) -> SignupResponse:
async def signup(
user_in: UserCreate,
db: Session = DB_SESSION_DEPENDENCY,
email_producer: EmailProducerService = EMAIL_PRODUCER_DEPENDENCY,
) -> SignupResponse:
user = create_user(db=db, user_in=user_in)

verification_link = (
f"{settings.FRONTEND_BASE_URL}/verify-email?user={user.id}&token={uuid4()}"
)
Comment on lines +37 to +39
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Use a verifiable token in these links.

These UUIDs are generated inline but never persisted or signed in this flow. The verify/reset callback has no reliable way to validate or expire them as-is; use a stored token with TTL or a backend-signed token instead.

Also applies to: 69-70

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/api/v1/endpoints/auth.py` around lines 37 - 39, The verification_link
currently embeds an ephemeral uuid4() that is neither persisted nor signed, so
the verify/reset endpoints cannot validate or expire it; change this to generate
a verifiable token (either a stored token with TTL or a backend-signed token)
and include that token in the link. Concretely: replace the inline uuid4() usage
in verification_link (and the similar reset link around lines 69-70) with a
token produced by either (a) creating and persisting an EmailVerificationToken
(or PasswordResetToken) record tied to user.id with an expires_at and a securely
generated token value, then use that token string in the URL and have the
verify/reset handlers look up, validate expiry, and revoke the DB token; or (b)
issue a signed JWT containing user id and exp, sign with your server key, put
the JWT in the URL and validate signature+exp in the callback. Ensure token
creation, storage/revocation, and validation logic are added to functions
handling email generation and the corresponding verify/reset endpoints (so the
callback can reliably validate and expire tokens).

try:
await email_producer.send_email(
to=user.email,
subject="Verify your FluentMeet account",
html_body=None,
template_data={"verification_link": verification_link},
template="verification",
)
except Exception as exc:
# Signup should succeed even if email queueing fails.
logger.warning(
"Failed to enqueue verification email for user %s: %s", user.id, exc
)

return SignupResponse.model_validate(user)


@router.post(
"/forgot-password",
response_model=ActionAcknowledgement,
status_code=status.HTTP_202_ACCEPTED,
)
async def forgot_password(
request: ForgotPasswordRequest,
db: Session = DB_SESSION_DEPENDENCY,
email_producer: EmailProducerService = EMAIL_PRODUCER_DEPENDENCY,
) -> ActionAcknowledgement:
user = get_user_by_email(db, request.email)

if user:
reset_link = (
f""
f"{settings.FRONTEND_BASE_URL}/reset-password?user={user.id}"
f"&token={uuid4()}"
)
try:
await email_producer.send_email(
to=user.email,
subject="Reset your FluentMeet password",
html_body=None,
template_data={"reset_link": reset_link},
template="password_reset",
)
except Exception as exc:
logger.warning(
"Failed to enqueue password reset email for %s: %s", user.email, exc
)

return ActionAcknowledgement(
message=(
"If an account with that email exists, we have sent "
"password reset instructions."
)
)
7 changes: 6 additions & 1 deletion app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class Settings(BaseSettings):
KAFKA_CONSUMER_AUTO_OFFSET_RESET: str = "earliest"
KAFKA_MAX_RETRIES: int = 3
KAFKA_RETRY_BACKOFF_MS: int = 1000
KAFKA_EMAIL_CONSUMER_GROUP_ID: str = "email-worker"

# External Services Keys
DEEPGRAM_API_KEY: str | None = None
Expand All @@ -50,7 +51,11 @@ class Settings(BaseSettings):
# Mailgun Email Service
MAILGUN_API_KEY: str | None = None
MAILGUN_DOMAIN: str | None = None
MAILGUN_FROM_EMAIL: str | None = None
MAILGUN_FROM_ADDRESS: str = "no-reply@fluentmeet.com"
MAILGUN_TIMEOUT_SECONDS: float = 10.0

# URL used in transactional email links
FRONTEND_BASE_URL: str = "http://localhost:3000"

model_config = SettingsConfigDict(
env_file=".env", case_sensitive=True, extra="ignore"
Expand Down
2 changes: 2 additions & 0 deletions app/kafka/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from app.core.config import settings
from app.kafka.consumer import BaseConsumer
from app.kafka.producer import KafkaProducer
from app.services.email_consumer import EmailConsumerWorker

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -36,6 +37,7 @@ def __init__(self) -> None:
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS
)
self.consumers: list[BaseConsumer] = []
self.register_consumer(EmailConsumerWorker(producer=self.producer))
self._initialized = True

def register_consumer(self, consumer: BaseConsumer) -> None:
Expand Down
9 changes: 5 additions & 4 deletions app/kafka/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ class DLQEvent(BaseModel):


class EmailPayload(BaseModel):
to_email: str
to: str
subject: str
template_name: str
template_data: dict[str, Any] = {}
template: str
data: dict[str, Any] = Field(default_factory=dict)
html_body: str | None = None


class EmailEvent(BaseEvent[EmailPayload]):
Expand All @@ -47,7 +48,7 @@ class MediaUploadPayload(BaseModel):
user_id: int
file_path: str
file_type: str # e.g., 'avatar', 'recording'
metadata: dict[str, Any] = {}
metadata: dict[str, Any] = Field(default_factory=dict)


class MediaUploadEvent(BaseEvent[MediaUploadPayload]):
Expand Down
10 changes: 10 additions & 0 deletions app/schemas/auth.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
from pydantic import BaseModel, EmailStr

from app.schemas.user import UserResponse


class SignupResponse(UserResponse):
"""Public payload returned by the signup endpoint."""


class ForgotPasswordRequest(BaseModel):
email: EmailStr


class ActionAcknowledgement(BaseModel):
message: str
115 changes: 115 additions & 0 deletions app/services/email_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import logging
from pathlib import Path
from typing import Any

import httpx
from jinja2 import Environment, FileSystemLoader, TemplateNotFound

from app.core.config import settings
from app.kafka.consumer import BaseConsumer
from app.kafka.schemas import BaseEvent, EmailEvent
from app.kafka.topics import NOTIFICATIONS_EMAIL

logger = logging.getLogger(__name__)


class TransientEmailDeliveryError(Exception):
"""Signals failures that should trigger Kafka retries."""


class EmailTemplateRenderer:
def __init__(self) -> None:
templates_root = Path(__file__).resolve().parent.parent / "templates" / "email"
self._environment = Environment(
loader=FileSystemLoader(str(templates_root)),
autoescape=True,
)

def render(self, template_name: str, data: dict[str, object]) -> str:
try:
template = self._environment.get_template(f"{template_name}.html")
except TemplateNotFound:
logger.warning(
"Template '%s' is missing, falling back to raw html",
template_name,
)
return ""
return template.render(**data)
Comment on lines +28 to +37
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Fail fast when the email template is missing.

render() converts TemplateNotFound into an empty body, and handle() just returns when that happens. The worker treats the record as handled even though no email was sent, so a bad deploy can silently lose signup/reset emails. Raise a hard failure or route the record to a DLQ instead.

Also applies to: 100-104

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/email_consumer.py` around lines 28 - 37, The render() method
currently swallows TemplateNotFound and returns an empty string which lets
handle() treat the message as processed; instead, when
self._environment.get_template(...) raises TemplateNotFound, re-raise that
exception (or raise a specific EmailTemplateMissingError) so the worker fails
fast / triggers retry/DLQ logic rather than returning ""; update handle() to not
swallow this exception and to route the record to the DLQ or let it bubble to
the worker framework; apply the same change to the other identical
template-loading block around the second occurrence (the block at the other
occurrence mentioned) so missing templates always cause a hard failure or DLQ
routing.



class MailgunEmailSender:
"""Sends emails via Mailgun's /messages endpoint."""

def __init__(
self, timeout_seconds: float = settings.MAILGUN_TIMEOUT_SECONDS
) -> None:
self._timeout_seconds = timeout_seconds

async def send(self, to: str, subject: str, html_body: str) -> None:
if not settings.MAILGUN_API_KEY or not settings.MAILGUN_DOMAIN:
logger.warning("Mailgun credentials not configured; skipping dispatch")
return
Comment on lines +48 to +51
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't treat skipped or rejected Mailgun calls as successful sends.

send() returns normally when Mailgun credentials are missing and for every 4xx response. handle() then logs "Dispatched email event...", so monitoring will say the email was sent even when Mailgun skipped or rejected it. Have send() return an explicit delivery result or raise on these branches.

Also applies to: 72-78, 106-115

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/email_consumer.py` around lines 48 - 51, The send() method in
app.services.email_consumer (async def send) currently returns normally when
Mailgun creds are missing or when Mailgun returns 4xx, causing handle() to log
success incorrectly; change send() to either raise a specific exception (e.g.,
EmailDeliveryError) for skipped/misconfigured or rejected (4xx) cases OR return
an explicit result enum (e.g., EmailDeliveryResult {SENT, SKIPPED, REJECTED,
FAILED}) and include the HTTP status and body for non-2xx responses; update the
caller handle() to check that returned result == SENT (or catch
EmailDeliveryError) before logging "Dispatched email event..." and only log
success on true delivery, while preserving error logging for other cases.


endpoint = f"https://api.mailgun.net/v3/{settings.MAILGUN_DOMAIN}/messages"
payload = {
"from": settings.MAILGUN_FROM_ADDRESS,
"to": to,
"subject": subject,
"html": html_body,
}

async with httpx.AsyncClient(timeout=self._timeout_seconds) as client:
response = await client.post(
endpoint,
data=payload,
auth=("api", settings.MAILGUN_API_KEY),
)

if response.status_code in {408, 425, 429} or response.status_code >= 500:
raise TransientEmailDeliveryError(
f"Mailgun transient error ({response.status_code}): {response.text}"
)
if response.status_code >= 400:
logger.error(
"Mailgun rejected email with status %s: %s",
response.status_code,
response.text,
)
return


class EmailConsumerWorker(BaseConsumer):
topic = NOTIFICATIONS_EMAIL
group_id = settings.KAFKA_EMAIL_CONSUMER_GROUP_ID
event_schema = EmailEvent

def __init__(self, producer: object) -> None:
super().__init__(producer=producer)
self._sender = MailgunEmailSender()
self._renderer = EmailTemplateRenderer()

async def handle(self, event: BaseEvent[Any]) -> None:
email_event = EmailEvent.model_validate(event.model_dump())
html_body = email_event.payload.html_body
if not html_body:
html_body = self._renderer.render(
email_event.payload.template,
email_event.payload.data,
)

if not html_body:
logger.error(
"No html body could be rendered for event %s", email_event.event_id
)
return

await self._sender.send(
to=email_event.payload.to,
subject=email_event.payload.subject,
html_body=html_body,
)
logger.info(
"Dispatched email event %s to %s",
email_event.event_id,
email_event.payload.to,
)
43 changes: 43 additions & 0 deletions app/services/email_producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import logging
from typing import Any

from app.kafka.manager import get_kafka_manager
from app.kafka.schemas import EmailEvent, EmailPayload
from app.kafka.topics import NOTIFICATIONS_EMAIL

logger = logging.getLogger(__name__)


class EmailProducerService:
"""Publishes email dispatch events to Kafka."""

def __init__(self, topic: str = NOTIFICATIONS_EMAIL) -> None:
self._topic = topic

async def send_email(
self,
to: str,
subject: str,
html_body: str | None,
template_data: dict[str, Any],
template: str,
) -> None:
payload = EmailPayload(
to=to,
subject=subject,
template=template,
data=template_data,
html_body=html_body,
)
event = EmailEvent(payload=payload)

kafka_manager = get_kafka_manager()
await kafka_manager.producer.send(self._topic, event, key=to)
logger.info("Queued email '%s' for %s", template, to)
Comment on lines +34 to +36
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid using the raw email address as the Kafka key.

key=to persists the recipient address in Kafka record metadata, and the info log repeats it. That creates retained PII in broker/admin tooling and log pipelines; use a non-PII stable key instead and redact the log. If you still need per-recipient ordering, hash the normalized address rather than storing it directly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/email_producer.py` around lines 34 - 36, The code currently uses
the raw recipient address as Kafka key and logs it (see kafka_manager via
get_kafka_manager(), producer.send(self._topic, event, key=to) and logger.info),
which leaks PII; change to compute a stable non-PII key (e.g., hash of a
normalized email) and pass that to producer.send instead of `to`, and redact the
log entry (e.g., log a masked or hashed form or just the recipient domain) in
logger.info while keeping template and topic info; ensure normalization and
hashing happen in the same function/method that sends the message so ordering
remains stable per-recipient.



_email_producer_service = EmailProducerService()


def get_email_producer_service() -> EmailProducerService:
return _email_producer_service
13 changes: 13 additions & 0 deletions app/templates/email/password_reset.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<!DOCTYPE html>
<html lang="en">
<body>
<h2>Reset your FluentMeet password</h2>
<p>We received a password reset request for your account.</p>
<p>
Continue by clicking
<a href="{{ reset_link }}">this password reset link</a>.
</p>
<p>If you did not request this change, you can ignore this email.</p>
</body>
</html>

12 changes: 12 additions & 0 deletions app/templates/email/verification.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<!DOCTYPE html>
<html lang="en">
<body>
<h2>Verify your FluentMeet account</h2>
<p>Welcome to FluentMeet.</p>
<p>
Please verify your email by clicking
<a href="{{ verification_link }}">this verification link</a>.
</p>
</body>
</html>

Loading
Loading