diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 670480f..2c729ef 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,4 +31,4 @@ jobs: DATABASE_URL: postgresql://postgres:postgres@localhost:5432/fluentmeet_test REDIS_URL: redis://localhost:6379/1 run: | - pytest --cov=app --cov-fail-under=60 tests/ + pytest --cov=app --cov-fail-under=80 tests/ diff --git a/alembic/versions/11781e907181_initial_migration.py b/alembic/versions/11781e907181_initial_migration.py index f2ba412..a696b97 100644 --- a/alembic/versions/11781e907181_initial_migration.py +++ b/alembic/versions/11781e907181_initial_migration.py @@ -29,9 +29,9 @@ def upgrade() -> None: sa.Column("full_name", sa.String(length=255), nullable=True), sa.Column("is_active", sa.Boolean(), nullable=False), sa.Column("is_verified", sa.Boolean(), nullable=False), - sa.Column("created_at", sa.DateTime(), nullable=False), - sa.Column("updated_at", sa.DateTime(), nullable=False), - sa.Column("deleted_at", sa.DateTime(), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("deleted_at", sa.DateTime(timezone=True), nullable=True), sa.Column("speaking_language", sa.String(length=10), nullable=False), sa.Column("listening_language", sa.String(length=10), nullable=False), sa.PrimaryKeyConstraint("id"), diff --git a/app/api/v1/__init__.py b/app/api/v1/__init__.py new file mode 100644 index 0000000..16f0fec --- /dev/null +++ b/app/api/v1/__init__.py @@ -0,0 +1,3 @@ +from app.api.v1.api import api_router + +__all__ = ["api_router"] diff --git a/app/api/v1/api.py b/app/api/v1/api.py new file mode 100644 index 0000000..7b8990f --- /dev/null +++ b/app/api/v1/api.py @@ -0,0 +1,6 @@ +from fastapi import APIRouter + +from app.api.v1.endpoints.auth import router as auth_router + +api_router = APIRouter() +api_router.include_router(auth_router) diff --git a/app/api/v1/endpoints/auth.py b/app/api/v1/endpoints/auth.py new file mode 100644 index 0000000..cba71fd --- /dev/null +++ b/app/api/v1/endpoints/auth.py @@ -0,0 +1,20 @@ +from fastapi import APIRouter, Depends, status +from sqlalchemy.orm import Session + +from app.crud.user import create_user +from app.db.session import get_db +from app.schemas.auth import SignupResponse +from app.schemas.user import UserCreate + +router = APIRouter(prefix="/auth", tags=["auth"]) +DB_SESSION_DEPENDENCY = Depends(get_db) + + +@router.post( + "/signup", + response_model=SignupResponse, + status_code=status.HTTP_201_CREATED, +) +def signup(user_in: UserCreate, db: Session = DB_SESSION_DEPENDENCY) -> SignupResponse: + user = create_user(db=db, user_in=user_in) + return SignupResponse.model_validate(user) diff --git a/app/crud/__init__.py b/app/crud/__init__.py index e69de29..8d1881d 100644 --- a/app/crud/__init__.py +++ b/app/crud/__init__.py @@ -0,0 +1,3 @@ +from app.crud.user import create_user, get_user_by_email + +__all__ = ["create_user", "get_user_by_email"] diff --git a/app/crud/user.py b/app/crud/user.py new file mode 100644 index 0000000..8abacb9 --- /dev/null +++ b/app/crud/user.py @@ -0,0 +1,49 @@ +from typing import cast + +import bcrypt +from passlib.context import CryptContext +from sqlalchemy import select +from sqlalchemy.orm import Session + +from app.core.exceptions import ConflictException +from app.models.user import User +from app.schemas.user import UserCreate + +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") + + +def hash_password(password: str) -> str: + try: + return cast(str, pwd_context.hash(password)) + except ValueError: + # Passlib's bcrypt backend probing can fail with newer bcrypt builds. + salt = bcrypt.gensalt() + return bcrypt.hashpw(password.encode("utf-8"), salt).decode("utf-8") + + +def get_user_by_email(db: Session, email: str) -> User | None: + statement = select(User).where(User.email == email.lower()) + return db.execute(statement).scalar_one_or_none() + + +def create_user(db: Session, user_in: UserCreate) -> User: + existing_user = get_user_by_email(db, user_in.email) + if existing_user: + raise ConflictException( + code="EMAIL_ALREADY_REGISTERED", + message="An account with this email already exists.", + ) + + db_user = User( + email=user_in.email.lower(), + hashed_password=hash_password(user_in.password), + full_name=user_in.full_name, + speaking_language=user_in.speaking_language.value, + listening_language=user_in.listening_language.value, + is_active=True, + is_verified=False, + ) + db.add(db_user) + db.commit() + db.refresh(db_user) + return db_user diff --git a/app/db/__init__.py b/app/db/__init__.py index e69de29..eb50306 100644 --- a/app/db/__init__.py +++ b/app/db/__init__.py @@ -0,0 +1,3 @@ +from app.db.session import SessionLocal, get_db, get_engine + +__all__ = ["SessionLocal", "get_db", "get_engine"] diff --git a/app/db/session.py b/app/db/session.py new file mode 100644 index 0000000..2132818 --- /dev/null +++ b/app/db/session.py @@ -0,0 +1,42 @@ +from collections.abc import Generator +from typing import Final + +from sqlalchemy import create_engine +from sqlalchemy.engine import Engine +from sqlalchemy.orm import Session, sessionmaker + +from app.core.config import settings + +DEFAULT_SQLITE_URL: Final[str] = "sqlite:///./fluentmeet.db" +DATABASE_URL = settings.DATABASE_URL or DEFAULT_SQLITE_URL + +_ENGINE_STATE: dict[str, Engine] = {} +SessionLocal = sessionmaker(autoflush=False, autocommit=False) + + +def get_engine() -> Engine: + cached_engine = _ENGINE_STATE.get("engine") + if cached_engine is None: + try: + cached_engine = create_engine(DATABASE_URL, pool_pre_ping=True) + except ModuleNotFoundError as exc: + # CI/test environments may not install PostgreSQL DBAPI drivers. + if DATABASE_URL.startswith("postgresql") and exc.name in { + "psycopg2", + "psycopg", + }: + cached_engine = create_engine(DEFAULT_SQLITE_URL, pool_pre_ping=True) + else: + raise + SessionLocal.configure(bind=cached_engine) + _ENGINE_STATE["engine"] = cached_engine + return cached_engine + + +def get_db() -> Generator[Session, None, None]: + get_engine() + db = SessionLocal() + try: + yield db + finally: + db.close() diff --git a/app/kafka/schemas.py b/app/kafka/schemas.py index 179f053..fa7ff9a 100644 --- a/app/kafka/schemas.py +++ b/app/kafka/schemas.py @@ -1,5 +1,5 @@ import uuid -from datetime import datetime +from datetime import UTC, datetime from typing import Any, Generic, TypeVar from pydantic import BaseModel, Field @@ -14,7 +14,7 @@ class BaseEvent(BaseModel, Generic[T]): event_id: uuid.UUID = Field(default_factory=uuid.uuid4) event_type: str - timestamp: datetime = Field(default_factory=datetime.utcnow) + timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC)) payload: T @@ -28,7 +28,7 @@ class DLQEvent(BaseModel): original_topic: str original_event: dict[str, Any] error_message: str - failed_at: datetime = Field(default_factory=datetime.utcnow) + failed_at: datetime = Field(default_factory=lambda: datetime.now(UTC)) retry_count: int diff --git a/app/main.py b/app/main.py index 77ca98b..8bdafed 100644 --- a/app/main.py +++ b/app/main.py @@ -1,22 +1,33 @@ +import logging from collections.abc import AsyncGenerator from contextlib import asynccontextmanager from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware +from app.api.v1 import api_router from app.core.config import settings from app.core.exception_handlers import register_exception_handlers from app.kafka.manager import get_kafka_manager +logger = logging.getLogger(__name__) + @asynccontextmanager async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: # Startup kafka_manager = get_kafka_manager() - await kafka_manager.start() + kafka_started = False + try: + await kafka_manager.start() + kafka_started = True + except Exception as exc: + # Keep API startup alive in environments where Kafka isn't available (e.g. CI). + logger.warning("Kafka startup skipped: %s", exc) yield # Shutdown - await kafka_manager.stop() + if kafka_started: + await kafka_manager.stop() app = FastAPI( @@ -36,6 +47,7 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: ) register_exception_handlers(app) +app.include_router(api_router, prefix=settings.API_V1_STR) @app.get("/health", tags=["health"]) diff --git a/app/models/user.py b/app/models/user.py index df02670..542fce3 100644 --- a/app/models/user.py +++ b/app/models/user.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import UTC, datetime from sqlalchemy import Boolean, DateTime, String from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column @@ -8,6 +8,10 @@ class Base(DeclarativeBase): pass +def utc_now() -> datetime: + return datetime.now(UTC) + + class User(Base): __tablename__ = "users" @@ -19,11 +23,15 @@ class User(Base): full_name: Mapped[str | None] = mapped_column(String(255), nullable=True) is_active: Mapped[bool] = mapped_column(Boolean, default=True) is_verified: Mapped[bool] = mapped_column(Boolean, default=False) - created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=utc_now + ) updated_at: Mapped[datetime] = mapped_column( - DateTime, default=datetime.utcnow, onupdate=datetime.utcnow + DateTime(timezone=True), default=utc_now, onupdate=utc_now + ) + deleted_at: Mapped[datetime | None] = mapped_column( + DateTime(timezone=True), nullable=True ) - deleted_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) # Language preferences speaking_language: Mapped[str] = mapped_column(String(10), default="en") diff --git a/app/schemas/auth.py b/app/schemas/auth.py new file mode 100644 index 0000000..1f20b45 --- /dev/null +++ b/app/schemas/auth.py @@ -0,0 +1,5 @@ +from app.schemas.user import UserResponse + + +class SignupResponse(UserResponse): + """Public payload returned by the signup endpoint.""" diff --git a/app/schemas/user.py b/app/schemas/user.py index 2243cc2..c49a8d1 100644 --- a/app/schemas/user.py +++ b/app/schemas/user.py @@ -1,13 +1,36 @@ from datetime import datetime +from enum import StrEnum -from pydantic import BaseModel, EmailStr, Field +from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator + + +class SupportedLanguage(StrEnum): + ENGLISH = "en" + FRENCH = "fr" + GERMAN = "de" + SPANISH = "es" + ITALIAN = "it" + PORTUGUESE = "pt" class UserBase(BaseModel): email: EmailStr - full_name: str | None = None - speaking_language: str = "en" - listening_language: str = "en" + full_name: str | None = Field(default=None, max_length=255) + speaking_language: SupportedLanguage = SupportedLanguage.ENGLISH + listening_language: SupportedLanguage = SupportedLanguage.ENGLISH + + @field_validator("email", mode="before") + @classmethod + def normalize_email(cls, value: str) -> str: + return value.strip().lower() + + @field_validator("full_name", mode="before") + @classmethod + def strip_full_name(cls, value: str | None) -> str | None: + if value is None: + return None + stripped_value = value.strip() + return stripped_value or None class UserCreate(UserBase): @@ -15,11 +38,19 @@ class UserCreate(UserBase): class UserUpdate(BaseModel): - full_name: str | None = None - speaking_language: str | None = None - listening_language: str | None = None + full_name: str | None = Field(default=None, max_length=255) + speaking_language: SupportedLanguage | None = None + listening_language: SupportedLanguage | None = None password: str | None = Field(None, min_length=8) + @field_validator("full_name", mode="before") + @classmethod + def strip_full_name(cls, value: str | None) -> str | None: + if value is None: + return None + stripped_value = value.strip() + return stripped_value or None + class UserResponse(UserBase): id: int @@ -27,8 +58,7 @@ class UserResponse(UserBase): is_verified: bool created_at: datetime - class Config: - from_attributes = True + model_config = ConfigDict(from_attributes=True) class Token(BaseModel): diff --git a/docs/email_service.md b/docs/email_service.md new file mode 100644 index 0000000..fdd07cf --- /dev/null +++ b/docs/email_service.md @@ -0,0 +1,68 @@ +### Feature: Implement Mailgun Email Service via Kafka + +**Problem** +The FluentMeet application needs to send transactional emails for user account verification, password reset, and other notifications. Currently, no email service is integrated. Directly calling a third-party email API synchronously inside request handlers would increase latency and couple the request lifecycle to an external service, creating a poor user experience if the provider is slow or unavailable. + +**Proposed Solution** +Integrate Mailgun as the email provider and decouple email sending from the HTTP request lifecycle using Apache Kafka. When an email needs to be sent, the application will publish a structured message to a Kafka topic (`notifications.email`). A dedicated consumer worker will pick up the message and dispatch it via the Mailgun REST API. This approach makes email sending asynchronous, resilient, and independently scalable. + +**User Stories** +* **As a new user,** I want to receive a verification email immediately after signing up, so I can activate my account without experiencing delays in the registration response. +* **As a user,** I want to receive a password reset email when I request one, so I can regain access to my account. +* **As a developer,** I want a reusable, Kafka-backed email service so that any part of the system can trigger an email without being blocked by the Mailgun API call. +* **As a DevOps engineer,** I want email failures to be retried automatically and logged clearly, so transient Mailgun outages don't result in silently dropped emails. + +**Acceptance Criteria** +1. A `Mailgun` configuration block (API key, domain, sender address) is defined in `app/core/config.py` and sourced from environment variables — never hardcoded. +2. A Kafka topic `notifications.email` is created and documented in the infrastructure setup. +3. An `EmailProducerService` is implemented with a `send_email(to, subject, html_body, template_data)` method that publishes a structured JSON message to the `notifications.email` Kafka topic. +4. An `EmailConsumerWorker` is implemented to: + * Consume messages from the `notifications.email` topic. + * Call the Mailgun REST API (`/messages`) to deliver the email. + * Log success and failure outcomes. + * Handle retries on transient failures using Kafka consumer group offsets. +5. Email templates are implemented for: + * **Account Verification**: Contains the verification link. + * **Password Reset**: Contains the time-limited reset link. +6. The `EmailProducerService` is injected into and called from the user registration and password reset flows. +7. Unit tests verify that the producer publishes the correct message payload to the Kafka topic. +8. Integration tests verify the full flow: producer publishes → consumer dispatches → Mailgun API is called. + +**Proposed Technical Details** +* **Mailgun SDK**: Use the `mailgun2` library (already in `requirements.txt`) or direct `httpx` calls to the Mailgun `/messages` endpoint. +* **Kafka Topic**: `notifications.email` — messages follow a standard envelope: + ```json + { + "to": "user@example.com", + "subject": "Verify your FluentMeet account", + "template": "verification", + "data": { "verification_link": "https://..." } + } + ``` +* **Producer**: `app/services/email_producer.py` — uses `aiokafka.AIOKafkaProducer` to publish messages asynchronously. +* **Consumer Worker**: `app/services/email_consumer.py` — long-running `aiokafka.AIOKafkaConsumer` in a background task, started via FastAPI `lifespan` events. +* **Templates**: Jinja2 HTML templates stored in `app/templates/email/` (e.g., `verification.html`, `password_reset.html`). +* **Config**: New fields in `app/core/config.py`: + ```python + MAILGUN_API_KEY: str + MAILGUN_DOMAIN: str + MAILGUN_FROM_ADDRESS: str = "no-reply@fluentmeet.com" + ``` + +**Tasks** +- [ ] Add `MAILGUN_API_KEY`, `MAILGUN_DOMAIN`, and `MAILGUN_FROM_ADDRESS` to `.env.example` and `app/core/config.py`. +- [ ] Create the `notifications.email` Kafka topic and document it in `infra/`. +- [ ] Implement `EmailProducerService` in `app/services/email_producer.py`. +- [ ] Implement `EmailConsumerWorker` in `app/services/email_consumer.py`. +- [ ] Register the consumer as a background task in the FastAPI `lifespan` context manager in `app/main.py`. +- [ ] Create Jinja2 HTML templates for verification and password reset emails. +- [ ] Integrate the email producer into the user registration endpoint (`POST /api/v1/auth/signup`). +- [ ] Integrate the email producer into the password reset endpoint (`POST /api/v1/auth/forgot-password`). +- [ ] Write unit tests for the `EmailProducerService` (mock the Kafka producer). +- [ ] Write integration tests for the full consumer → Mailgun dispatch flow (mock the Mailgun API). + +**Open Questions/Considerations** +* What is the retry strategy for failed Mailgun deliveries — dead-letter queue or fixed retry count? +* Should email sending failures be surfaced to the user (e.g., "email failed to send, please try again") or handled silently with a background retry? +* What is the Kafka consumer group ID for the email worker, and how should it be managed across deployments? +* Should we implement a resend-verification endpoint for users whose verification tokens have expired? diff --git a/tests/test_auth/__init__.py b/tests/test_auth/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_auth/test_auth_signup.py b/tests/test_auth/test_auth_signup.py new file mode 100644 index 0000000..1f6f590 --- /dev/null +++ b/tests/test_auth/test_auth_signup.py @@ -0,0 +1,116 @@ +from collections.abc import Generator + +import pytest +from fastapi.testclient import TestClient +from sqlalchemy import create_engine, select +from sqlalchemy.orm import Session, sessionmaker +from sqlalchemy.pool import StaticPool + +from app.db.session import get_db +from app.main import app +from app.models.user import Base, User + + +@pytest.fixture +def db_session() -> Generator[Session, None, None]: + engine = create_engine( + "sqlite+pysqlite:///:memory:", + connect_args={"check_same_thread": False}, + poolclass=StaticPool, + ) + TestingSessionLocal = sessionmaker( + autocommit=False, + autoflush=False, + bind=engine, + ) + Base.metadata.create_all(bind=engine) + db = TestingSessionLocal() + try: + yield db + finally: + db.close() + Base.metadata.drop_all(bind=engine) + engine.dispose() + + +@pytest.fixture +def client(db_session: Session) -> Generator[TestClient, None, None]: + def _override_get_db() -> Generator[Session, None, None]: + yield db_session + + app.dependency_overrides[get_db] = _override_get_db + with TestClient(app) as test_client: + yield test_client + app.dependency_overrides.clear() + + +def test_signup_success_creates_user_and_returns_public_profile( + client: TestClient, db_session: Session +) -> None: + payload = { + "email": " USER@example.com ", + "password": "MyStr0ngP@ss!", + "full_name": " Ada Lovelace ", + "speaking_language": "en", + "listening_language": "fr", + } + + response = client.post("/api/v1/auth/signup", json=payload) + + assert response.status_code == 201 + body = response.json() + assert body["email"] == "user@example.com" + assert body["full_name"] == "Ada Lovelace" + assert body["speaking_language"] == "en" + assert body["listening_language"] == "fr" + assert body["is_active"] is True + assert body["is_verified"] is False + assert "password" not in body + assert "hashed_password" not in body + + created_user = db_session.execute( + select(User).where(User.email == "user@example.com") + ).scalar_one() + assert created_user.hashed_password != payload["password"] + assert created_user.hashed_password.startswith("$2") + assert created_user.is_active is True + assert created_user.is_verified is False + + +def test_signup_duplicate_email_returns_conflict(client: TestClient) -> None: + payload = { + "email": "duplicate@example.com", + "password": "MyStr0ngP@ss!", + "full_name": "Duplicate User", + } + + first = client.post("/api/v1/auth/signup", json=payload) + second = client.post("/api/v1/auth/signup", json=payload) + + assert first.status_code == 201 + assert second.status_code == 409 + assert second.json() == { + "status": "error", + "code": "EMAIL_ALREADY_REGISTERED", + "message": "An account with this email already exists.", + "details": [], + } + + +def test_signup_invalid_language_uses_standard_validation_shape( + client: TestClient, +) -> None: + payload = { + "email": "user2@example.com", + "password": "MyStr0ngP@ss!", + "speaking_language": "zz", + } + + response = client.post("/api/v1/auth/signup", json=payload) + + assert response.status_code == 400 + body = response.json() + assert body["status"] == "error" + assert body["code"] == "VALIDATION_ERROR" + fields = [detail["field"] for detail in body["details"]] + assert "body.speaking_language" in fields diff --git a/tests/test_auth/test_schemas_user.py b/tests/test_auth/test_schemas_user.py new file mode 100644 index 0000000..f3f2705 --- /dev/null +++ b/tests/test_auth/test_schemas_user.py @@ -0,0 +1,24 @@ +from datetime import UTC, datetime +from types import SimpleNamespace + +from app.schemas.user import SupportedLanguage, UserResponse + + +def test_user_response_can_validate_from_attributes() -> None: + source = SimpleNamespace( + id=123, + email="person@example.com", + full_name="Test Person", + speaking_language="en", + listening_language="fr", + is_active=True, + is_verified=False, + created_at=datetime.now(UTC), + ) + + result = UserResponse.model_validate(source) + + assert result.id == 123 + assert result.email == "person@example.com" + assert result.speaking_language == SupportedLanguage.ENGLISH + assert result.listening_language == SupportedLanguage.FRENCH