Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ jobs:
DATABASE_URL: postgresql://postgres:postgres@localhost:5432/fluentmeet_test
REDIS_URL: redis://localhost:6379/1
run: |
pytest --cov=app --cov-fail-under=60 tests/
pytest --cov=app --cov-fail-under=80 tests/
6 changes: 3 additions & 3 deletions alembic/versions/11781e907181_initial_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ def upgrade() -> None:
sa.Column("full_name", sa.String(length=255), nullable=True),
sa.Column("is_active", sa.Boolean(), nullable=False),
sa.Column("is_verified", sa.Boolean(), nullable=False),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.Column("deleted_at", sa.DateTime(), nullable=True),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("deleted_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("speaking_language", sa.String(length=10), nullable=False),
sa.Column("listening_language", sa.String(length=10), nullable=False),
sa.PrimaryKeyConstraint("id"),
Expand Down
3 changes: 3 additions & 0 deletions app/api/v1/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from app.api.v1.api import api_router

__all__ = ["api_router"]
6 changes: 6 additions & 0 deletions app/api/v1/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from fastapi import APIRouter

from app.api.v1.endpoints.auth import router as auth_router

api_router = APIRouter()
api_router.include_router(auth_router)
20 changes: 20 additions & 0 deletions app/api/v1/endpoints/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from fastapi import APIRouter, Depends, status
from sqlalchemy.orm import Session

from app.crud.user import create_user
from app.db.session import get_db
from app.schemas.auth import SignupResponse
from app.schemas.user import UserCreate

router = APIRouter(prefix="/auth", tags=["auth"])
DB_SESSION_DEPENDENCY = Depends(get_db)


@router.post(
    "/signup",
    response_model=SignupResponse,
    status_code=status.HTTP_201_CREATED,
)
def signup(user_in: UserCreate, db: Session = DB_SESSION_DEPENDENCY) -> SignupResponse:
    """Register a new user account and return its public representation."""
    new_user = create_user(db=db, user_in=user_in)
    return SignupResponse.model_validate(new_user)
3 changes: 3 additions & 0 deletions app/crud/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from app.crud.user import create_user, get_user_by_email

__all__ = ["create_user", "get_user_by_email"]
49 changes: 49 additions & 0 deletions app/crud/user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from typing import cast

import bcrypt
from passlib.context import CryptContext
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from app.core.exceptions import ConflictException
from app.models.user import User
from app.schemas.user import UserCreate

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    """Return a bcrypt hash of *password*.

    Prefers passlib's CryptContext; falls back to calling the bcrypt
    library directly when passlib's backend probing fails (seen with
    newer bcrypt builds).
    """
    try:
        return cast(str, pwd_context.hash(password))
    except ValueError:
        return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")


def get_user_by_email(db: Session, email: str) -> User | None:
    """Fetch the user with the given email, or None if absent.

    The email is lower-cased before comparison to match how it is stored.
    """
    query = select(User).where(User.email == email.lower())
    return db.execute(query).scalar_one_or_none()


def create_user(db: Session, user_in: UserCreate) -> User:
    """Create and persist a new user.

    Raises ConflictException when the email is already registered — either
    detected by the pre-insert lookup, or by the database's unique email
    constraint at commit time (the lookup alone is racy under concurrent
    signups).
    """
    if get_user_by_email(db, user_in.email) is not None:
        raise ConflictException(
            code="EMAIL_ALREADY_REGISTERED",
            message="An account with this email already exists.",
        )

    db_user = User(
        email=user_in.email.lower(),
        hashed_password=hash_password(user_in.password),
        full_name=user_in.full_name,
        speaking_language=user_in.speaking_language.value,
        listening_language=user_in.listening_language.value,
        is_active=True,
        is_verified=False,
    )
    db.add(db_user)
    try:
        db.commit()
    except IntegrityError as exc:
        # Two concurrent signups can both pass the lookup above; the unique
        # constraint on email then rejects the second insert here.
        db.rollback()
        raise ConflictException(
            code="EMAIL_ALREADY_REGISTERED",
            message="An account with this email already exists.",
        ) from exc
    except Exception:
        db.rollback()
        raise
    db.refresh(db_user)
    return db_user
Comment on lines +30 to +49
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== create_user transaction/error handling ==="
rg -nP 'def create_user|db\.commit\(|db\.rollback\(|IntegrityError|ConflictException' app/crud/user.py -C 3

echo "=== email uniqueness constraints ==="
rg -nP 'email.*unique=True|ix_users_email|UniqueConstraint' app/models/user.py alembic/versions/11781e907181_initial_migration.py -C 2

Repository: Brints/FluentMeet

Length of output: 1885


Handle unique-email race condition at commit time with proper error handling.

The application-level read-before-write check is vulnerable to a race condition under concurrency. Two concurrent requests can both pass the uniqueness check on line 31 and attempt to insert the same email, causing the second request to fail at the database constraint (unique index on email exists in migrations). Without error handling around db.commit(), this surfaces as an unhandled IntegrityError instead of your intended ConflictException.

Wrap db.commit() in a try-except block to catch IntegrityError and map it to ConflictException, with rollback on any exception:

Suggested fix
 from typing import cast
 
 import bcrypt
 from passlib.context import CryptContext
+from sqlalchemy.exc import IntegrityError
 from sqlalchemy import select
 from sqlalchemy.orm import Session
@@
 def create_user(db: Session, user_in: UserCreate) -> User:
@@
     db.add(db_user)
-    db.commit()
+    try:
+        db.commit()
+    except IntegrityError as exc:
+        db.rollback()
+        raise ConflictException(
+            code="EMAIL_ALREADY_REGISTERED",
+            message="An account with this email already exists.",
+        ) from exc
+    except Exception:
+        db.rollback()
+        raise
     db.refresh(db_user)
     return db_user
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/crud/user.py` around lines 30 - 49, The current read-before-write
uniqueness check using get_user_by_email is racy: wrap the db.commit() that
persists the new User (created in this block with email=user_in.email.lower()
and hashed_password=hash_password(...)) in a try/except that catches
sqlalchemy.exc.IntegrityError (or the DB driver's IntegrityError), calls
db.rollback(), and raises ConflictException(code="EMAIL_ALREADY_REGISTERED",
message="An account with this email already exists.") when the IntegrityError
indicates a duplicate-email constraint; re-raise other exceptions after
rollback. Ensure db.add(db_user) remains before the try and db.refresh/db_user
only run after a successful commit.

3 changes: 3 additions & 0 deletions app/db/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from app.db.session import SessionLocal, get_db, get_engine

__all__ = ["SessionLocal", "get_db", "get_engine"]
42 changes: 42 additions & 0 deletions app/db/session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from collections.abc import Generator
from typing import Final

from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session, sessionmaker

from app.core.config import settings

DEFAULT_SQLITE_URL: Final[str] = "sqlite:///./fluentmeet.db"
DATABASE_URL = settings.DATABASE_URL or DEFAULT_SQLITE_URL

_ENGINE_STATE: dict[str, Engine] = {}
SessionLocal = sessionmaker(autoflush=False, autocommit=False)


def get_engine() -> Engine:
cached_engine = _ENGINE_STATE.get("engine")
if cached_engine is None:
try:
cached_engine = create_engine(DATABASE_URL, pool_pre_ping=True)
except ModuleNotFoundError as exc:
# CI/test environments may not install PostgreSQL DBAPI drivers.
if DATABASE_URL.startswith("postgresql") and exc.name in {
"psycopg2",
"psycopg",
}:
cached_engine = create_engine(DEFAULT_SQLITE_URL, pool_pre_ping=True)
Comment on lines +22 to +28
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fail fast outside tests instead of silently switching to SQLite.

This fallback is not actually scoped to CI/tests. Any runtime with a PostgreSQL URL and a missing driver will start writing to sqlite:///./fluentmeet.db instead of failing, which can hide a broken deployment and point the service at the wrong database. Please gate this behind an explicit test-only setting or remove the fallback in non-test environments.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/db/session.py` around lines 22 - 28, The current except block silently
falls back to SQLite when a PostgreSQL DBAPI is missing (checking DATABASE_URL
and creating cached_engine with DEFAULT_SQLITE_URL); change this so missing
drivers fail fast in non-test environments: detect an explicit test-only flag
(e.g., a TESTING or IS_TEST_ENV environment/config variable) and only perform
the SQLite fallback when that flag is true, otherwise re-raise the
ModuleNotFoundError or raise a clear RuntimeError with context including
DATABASE_URL and exc.name; update the code paths around the
create_engine/cached_engine handling in session.py to reference DATABASE_URL,
DEFAULT_SQLITE_URL, and the new TESTING flag so deployments never silently
switch to SQLite.

else:
raise
SessionLocal.configure(bind=cached_engine)
_ENGINE_STATE["engine"] = cached_engine
Comment on lines +18 to +32
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Make engine initialization thread-safe.

get_engine() does an unsynchronized check-then-set on _ENGINE_STATE. Two concurrent first requests can both create an engine and race SessionLocal.configure(...), leaving multiple pools alive and startup binding nondeterministic.

🧵 One way to serialize first-use initialization
 def get_engine() -> Engine:
     cached_engine = _ENGINE_STATE.get("engine")
     if cached_engine is None:
-        try:
-            cached_engine = create_engine(DATABASE_URL, pool_pre_ping=True)
-        except ModuleNotFoundError as exc:
-            # CI/test environments may not install PostgreSQL DBAPI drivers.
-            if DATABASE_URL.startswith("postgresql") and exc.name in {
-                "psycopg2",
-                "psycopg",
-            }:
-                cached_engine = create_engine(DEFAULT_SQLITE_URL, pool_pre_ping=True)
-            else:
-                raise
-        SessionLocal.configure(bind=cached_engine)
-        _ENGINE_STATE["engine"] = cached_engine
+        with _ENGINE_LOCK:
+            cached_engine = _ENGINE_STATE.get("engine")
+            if cached_engine is None:
+                try:
+                    cached_engine = create_engine(DATABASE_URL, pool_pre_ping=True)
+                except ModuleNotFoundError as exc:
+                    # CI/test environments may not install PostgreSQL DBAPI drivers.
+                    if DATABASE_URL.startswith("postgresql") and exc.name in {
+                        "psycopg2",
+                        "psycopg",
+                    }:
+                        cached_engine = create_engine(DEFAULT_SQLITE_URL, pool_pre_ping=True)
+                    else:
+                        raise
+                SessionLocal.configure(bind=cached_engine)
+                _ENGINE_STATE["engine"] = cached_engine
     return cached_engine

Add a module lock next to _ENGINE_STATE:

from threading import Lock

_ENGINE_LOCK = Lock()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/db/session.py` around lines 18 - 32, The engine initialization performs
an unsynchronized check-then-set on _ENGINE_STATE which can race when two
threads call get_engine() concurrently; add a module-level threading.Lock (e.g.,
_ENGINE_LOCK = Lock()) and use it to serialize the first-use initialization:
perform a fast check for _ENGINE_STATE["engine"], if missing acquire
_ENGINE_LOCK, re-check _ENGINE_STATE["engine"] (double-checked locking), then
create the engine with create_engine(...), call
SessionLocal.configure(bind=...), and set _ENGINE_STATE["engine"] before
releasing the lock so only one engine/pool is created.

return cached_engine


def get_db() -> Generator[Session, None, None]:
    """Yield a database session for one request, closing it afterwards."""
    get_engine()  # ensure SessionLocal is bound to the lazily created engine
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
6 changes: 3 additions & 3 deletions app/kafka/schemas.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import uuid
from datetime import datetime
from datetime import UTC, datetime
from typing import Any, Generic, TypeVar

from pydantic import BaseModel, Field
Expand All @@ -14,7 +14,7 @@ class BaseEvent(BaseModel, Generic[T]):

event_id: uuid.UUID = Field(default_factory=uuid.uuid4)
event_type: str
timestamp: datetime = Field(default_factory=datetime.utcnow)
timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC))
payload: T


Expand All @@ -28,7 +28,7 @@ class DLQEvent(BaseModel):
original_topic: str
original_event: dict[str, Any]
error_message: str
failed_at: datetime = Field(default_factory=datetime.utcnow)
failed_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
retry_count: int


Expand Down
16 changes: 14 additions & 2 deletions app/main.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,33 @@
import logging
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from app.api.v1 import api_router
from app.core.config import settings
from app.core.exception_handlers import register_exception_handlers
from app.kafka.manager import get_kafka_manager

logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
    """Start Kafka on app startup (best effort) and stop it on shutdown."""
    manager = get_kafka_manager()
    started = False
    try:
        await manager.start()
        started = True
    except Exception as exc:
        # Keep API startup alive in environments where Kafka isn't available (e.g. CI).
        logger.warning("Kafka startup skipped: %s", exc)
    yield
    # Only stop what was actually started.
    if started:
        await manager.stop()


app = FastAPI(
Expand All @@ -36,6 +47,7 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
)

register_exception_handlers(app)
app.include_router(api_router, prefix=settings.API_V1_STR)


@app.get("/health", tags=["health"])
Expand Down
16 changes: 12 additions & 4 deletions app/models/user.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime
from datetime import UTC, datetime

from sqlalchemy import Boolean, DateTime, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
Expand All @@ -8,6 +8,10 @@ class Base(DeclarativeBase):
pass


def utc_now() -> datetime:
return datetime.now(UTC)


class User(Base):
__tablename__ = "users"

Expand All @@ -19,11 +23,15 @@ class User(Base):
full_name: Mapped[str | None] = mapped_column(String(255), nullable=True)
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
is_verified: Mapped[bool] = mapped_column(Boolean, default=False)
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=utc_now
)
updated_at: Mapped[datetime] = mapped_column(
DateTime, default=datetime.utcnow, onupdate=datetime.utcnow
DateTime(timezone=True), default=utc_now, onupdate=utc_now
)
deleted_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True
)
deleted_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)

# Language preferences
speaking_language: Mapped[str] = mapped_column(String(10), default="en")
Expand Down
5 changes: 5 additions & 0 deletions app/schemas/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from app.schemas.user import UserResponse


class SignupResponse(UserResponse):
    """Public payload returned by the signup endpoint.

    Currently adds no fields beyond ``UserResponse``; it is a distinct type
    so the signup contract can evolve independently of other endpoints.
    """
48 changes: 39 additions & 9 deletions app/schemas/user.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,64 @@
from datetime import datetime
from enum import StrEnum

from pydantic import BaseModel, EmailStr, Field
from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator


class SupportedLanguage(StrEnum):
    """Two-letter (ISO 639-1 style) codes for the supported languages."""

    ENGLISH = "en"
    FRENCH = "fr"
    GERMAN = "de"
    SPANISH = "es"
    ITALIAN = "it"
    PORTUGUESE = "pt"


class UserBase(BaseModel):
    """Shared user fields and input-normalization rules."""

    email: EmailStr
    full_name: str | None = Field(default=None, max_length=255)
    speaking_language: SupportedLanguage = SupportedLanguage.ENGLISH
    listening_language: SupportedLanguage = SupportedLanguage.ENGLISH

    @field_validator("email", mode="before")
    @classmethod
    def normalize_email(cls, value: object) -> object:
        # Case-fold and trim before EmailStr validation so lookups are
        # consistent. Non-strings pass through so pydantic reports a proper
        # type error instead of this validator raising AttributeError.
        if isinstance(value, str):
            return value.strip().lower()
        return value

    @field_validator("full_name", mode="before")
    @classmethod
    def strip_full_name(cls, value: object) -> object:
        # Trim whitespace and collapse blank strings to None; non-strings
        # (including None) pass through to pydantic's own validation.
        if isinstance(value, str):
            return value.strip() or None
        return value


class UserCreate(UserBase):
    """Signup payload: base user fields plus a plaintext password (min 8 chars)."""

    password: str = Field(..., min_length=8)


class UserUpdate(BaseModel):
    """Partial-update payload; every field is optional."""

    full_name: str | None = Field(default=None, max_length=255)
    speaking_language: SupportedLanguage | None = None
    listening_language: SupportedLanguage | None = None
    password: str | None = Field(None, min_length=8)

    @field_validator("full_name", mode="before")
    @classmethod
    def strip_full_name(cls, value: object) -> object:
        # Trim whitespace and collapse blank strings to None; non-strings
        # (including None) pass through so pydantic reports a proper type
        # error instead of this validator raising AttributeError.
        if isinstance(value, str):
            return value.strip() or None
        return value


class UserResponse(UserBase):
    """Public user representation returned by API endpoints."""

    id: int
    is_active: bool
    is_verified: bool
    created_at: datetime

    # Pydantic v2 configuration: allow validation directly from ORM objects
    # (replaces the legacy `class Config: from_attributes = True` form).
    model_config = ConfigDict(from_attributes=True)


class Token(BaseModel):
Expand Down
Loading
Loading