10 changes: 6 additions & 4 deletions .github/workflows/code_test.yml
@@ -2,12 +2,12 @@ name: Tests

 on:
   push:
-    branches: [ main ]
+    branches: [main]
   pull_request:
-    branches: [ main ]
+    branches: [main]

   schedule:
-    - cron: "0 0 * * 0" # Runs every Sunday at midnight UTC
+    - cron: "0 0 * * 0"  # Runs every Sunday at midnight UTC

   workflow_dispatch:

@@ -34,6 +34,8 @@ jobs:
   test_backend:
     needs: fetch-python-versions
     runs-on: ubuntu-latest
+    env:
+      UV_NO_DEV: true

     permissions:
       id-token: write
@@ -52,7 +54,7 @@ jobs:
           python-version: ${{ matrix.python-version }}

       - name: Install dependencies
-        run: uv sync --group dev
+        run: uv sync --group test

       - name: Unit tests
         working-directory: backend
10 changes: 6 additions & 4 deletions .github/workflows/sonarcloud.yml
@@ -4,23 +4,25 @@ permissions:
   contents: read
 on:
   push:
-    branches: [ main ]
+    branches: [main]
   pull_request:
-    branches: [ main ]
+    branches: [main]

 jobs:
   sonarqube:
     name: SonarQube
     runs-on: ubuntu-latest
+    env:
+      UV_NO_DEV: true
     steps:
       - uses: actions/checkout@v6
         with:
-          fetch-depth: 0 # Shallow clones should be disabled for a better relevancy of analysis
+          fetch-depth: 0  # Shallow clones should be disabled for a better relevancy of analysis

       - uses: astral-sh/setup-uv@v7

       - name: Install dependencies
-        run: uv sync --group dev
+        run: uv sync --group test

       - name: Run tests and generate coverage
         working-directory: backend
3 changes: 3 additions & 0 deletions .gitignore
@@ -16,3 +16,6 @@ dist/
 *.log
 coverage.xml
 .coverage.*
+.fastembed_cache
+.database
+.amdb
Comment on lines +19 to +21

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Verify usage of fastembed and identify what .amdb files are

# Check for fastembed imports or references
echo "=== Checking for fastembed usage ==="
rg -i 'fastembed' --type=py -C 2

# Check for .amdb file references or existing files
echo -e "\n=== Checking for .amdb references ==="
rg -i '\.amdb' -C 2

# Look for any existing files matching these patterns
echo -e "\n=== Checking for existing files matching patterns ==="
fd -H -t f '^\.fastembed_cache$'
fd -H -t f '^\.database$'
fd -H '\.amdb$'

# Check requirements/dependencies for fastembed
echo -e "\n=== Checking dependencies ==="
fd -t f 'requirements.*\.txt|pyproject\.toml|setup\.py|Pipfile' --exec cat {}

Repository: eggzec/permit

Length of output: 5047


Remove .fastembed_cache, .database, and .amdb from .gitignore — none is a project artifact.

Verification shows FastEmbed is neither listed in pyproject.toml dependencies nor referenced anywhere in the codebase, so the .fastembed_cache pattern is out of scope for this license key generation PR. The .amdb extension is unidentified and has no references in the codebase, and the .database pattern is unclear given that the project uses PostgreSQL (server-based) rather than a file-based database.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In @.gitignore around lines 19 - 21, Remove the unrelated patterns from
.gitignore: delete the lines for ".fastembed_cache", ".amdb", and ".database"
since FastEmbed isn't a dependency and those artifacts aren't used by the
project; keep only repository-specific, relevant ignore entries (e.g., OS,
editor, build artifacts, or any actual local DB files if required), then commit
the updated .gitignore so the unnecessary cache/database patterns are no longer
ignored.

10 changes: 3 additions & 7 deletions backend/app/api/routes/auth.py
@@ -17,11 +17,7 @@
 router = APIRouter()


-@router.post(
-    "/signup",
-    status_code=status.HTTP_201_CREATED,
-    response_model=SuccessResponse[SignupResponse],
-)
+@router.post("/signup", status_code=status.HTTP_201_CREATED)
 def signup(
     body: SignupRequest, cursor: CursorDep, settings: SettingsDep
 ) -> SuccessResponse[SignupResponse]:
@@ -40,7 +36,7 @@ def signup(
     return SuccessResponse(data=result)


-@router.post("/login", response_model=SuccessResponse[TokenPair])
+@router.post("/login")
 def login(
     body: LoginRequest, cursor: CursorDep, settings: SettingsDep
 ) -> SuccessResponse[TokenPair]:
@@ -59,7 +55,7 @@ def login(
     return SuccessResponse(data=result)


-@router.post("/refresh", response_model=SuccessResponse[TokenPair])
+@router.post("/refresh")
 def refresh(
     body: RefreshRequest, cursor: CursorDep, settings: SettingsDep
 ) -> SuccessResponse[TokenPair]:
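The removals above work because FastAPI derives the response model from the handler's return annotation, so a parametrized generic in the annotation already carries the full response schema. A stdlib-only sketch of why the annotation alone is enough — `SuccessResponse` and `TokenPair` here are simplified stand-ins, not the project's actual models:

```python
from dataclasses import dataclass
from typing import Generic, TypeVar, get_type_hints

T = TypeVar("T")


@dataclass
class SuccessResponse(Generic[T]):
    # Hypothetical stand-in for the project's response envelope.
    data: T


@dataclass
class TokenPair:
    access_token: str
    refresh_token: str


def login() -> SuccessResponse[TokenPair]:
    return SuccessResponse(data=TokenPair("a", "r"))


# The framework can recover the full parametrized type from the
# annotation alone, which is why repeating it in response_model= adds nothing.
hints = get_type_hints(login)
```

Since both the annotation and `response_model=` named the same type, dropping the keyword removes duplication without changing the OpenAPI schema.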
4 changes: 0 additions & 4 deletions backend/app/api/routes/login.py

This file was deleted.

30 changes: 30 additions & 0 deletions backend/app/core/ed25519.py
@@ -0,0 +1,30 @@
from __future__ import annotations

from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
)
from cryptography.hazmat.primitives.serialization import (
load_pem_private_key,
load_pem_public_key,
)
from cryptography.utils import Buffer


__all__ = ["load_private_ed25519_key", "load_public_ed25519_key"]


def load_private_ed25519_key(
pem: Buffer, password: bytes | None = None
) -> Ed25519PrivateKey:
key = load_pem_private_key(pem, password=password)
if not isinstance(key, Ed25519PrivateKey):
raise TypeError("Provided key is not an Ed25519 private key")
return key


def load_public_ed25519_key(pem: bytes) -> Ed25519PublicKey:
key = load_pem_public_key(pem)
if not isinstance(key, Ed25519PublicKey):
raise TypeError("Provided key is not an Ed25519 public key")
return key
Comment on lines +17 to +30

🧹 Nitpick | 🔵 Trivial

Type signature inconsistency between private and public key loaders.

load_private_ed25519_key accepts Buffer (which includes bytes, bytearray, memoryview) for the pem parameter, but load_public_ed25519_key only accepts bytes. Since load_pem_public_key from cryptography also accepts Buffer, consider using consistent types for both functions.

♻️ Proposed fix for type consistency
-def load_public_ed25519_key(pem: bytes) -> Ed25519PublicKey:
+def load_public_ed25519_key(pem: Buffer) -> Ed25519PublicKey:
     key = load_pem_public_key(pem)
     if not isinstance(key, Ed25519PublicKey):
         raise TypeError("Provided key is not an Ed25519 public key")
     return key
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/core/ed25519.py` around lines 17 - 30, The function
load_public_ed25519_key has an inconsistent type hint for its pem parameter
(bytes) compared to load_private_ed25519_key (Buffer); update
load_public_ed25519_key's signature to accept the same Buffer type as
load_private_ed25519_key (or the equivalent union like bytes | bytearray |
memoryview) so both loaders use consistent input types, and keep the existing
runtime behavior (call load_pem_public_key and the isinstance check against
Ed25519PublicKey).
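The loader pattern above (delegate to `cryptography`, then narrow the result with `isinstance`) can be exercised end-to-end with a throwaway key. This sketch re-defines the private-key loader locally, since the app package isn't importable here:

```python
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives.serialization import load_pem_private_key


def load_private_ed25519_key(pem, password=None):
    # Mirrors the module: delegate to cryptography, then narrow the type,
    # since load_pem_private_key returns a union of key types.
    key = load_pem_private_key(pem, password=password)
    if not isinstance(key, Ed25519PrivateKey):
        raise TypeError("Provided key is not an Ed25519 private key")
    return key


# Generate a throwaway key and round-trip it through PEM.
private = Ed25519PrivateKey.generate()
private_pem = private.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
)
loaded = load_private_ed25519_key(private_pem)

# The loaded key signs, and its public half verifies (raises on mismatch).
signature = loaded.sign(b"license-payload")
loaded.public_key().verify(signature, b"license-payload")
```

The `isinstance` narrowing is what lets callers rely on Ed25519-specific methods like `sign` without casts.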

10 changes: 5 additions & 5 deletions backend/app/core/exception_handlers.py
@@ -38,7 +38,7 @@ def api_exception_handler(request: Request, exc: APIException) -> JSONResponse:
             code=exc.error_code,
             message=exc.message,
             http_status=exc.http_status,
-            details=_build_error_details(exc.details),
+            details=build_error_details(exc.details),
             request_id=request_id,
         )
     )
@@ -64,7 +64,7 @@ def validation_exception_handler(
     details = []
     for error in exc.errors():
         field_path = ".".join(str(loc) for loc in error["loc"][1:])
-        field = field_path if field_path else None
+        field = field_path or None
         details.append({"field": field, "message": error["msg"]})

     logger.warning(
@@ -84,7 +84,7 @@ def validation_exception_handler(
             code=ErrorCode.VALIDATION_FAILED,
             message="Validation error",
             http_status=http_422_unprocessable_content,
-            details=_build_error_details(details),
+            details=build_error_details(details),
             request_id=request_id,
         )
     )
@@ -105,7 +105,7 @@ def general_exception_handler(request: Request, exc: Exception) -> JSONResponse:
     request_id = getattr(request.state, "request_id", str(uuid.uuid4()))

     # Log the full traceback server-side
-    logger.exception(
+    logger.exception(  # noqa: LOG004 - this will be used by the exceptions
         "Unexpected error: %s", exc, extra={"request_id": request_id}
     )

@@ -127,7 +127,7 @@
     )


-def _build_error_details(
+def build_error_details(
     details: list[dict | ErrorDetail] | dict | ErrorDetail | None,
 ) -> list[ErrorDetail]:
     """
13 changes: 13 additions & 0 deletions backend/app/core/exceptions.py
@@ -157,3 +157,16 @@ def __init__(self, message: str = "License has expired") -> None:
             message=message,
             http_status=status.HTTP_409_CONFLICT,
         )
+
+
+class LicenseKeyGenerationError(APIException):
+    """License key generation failed after exhausting retries."""
+
+    def __init__(
+        self, message: str = "Failed to generate a unique license key"
+    ) -> None:
+        super().__init__(
+            error_code=ErrorCode.LICENSE_KEY_GENERATION_ERROR,
+            message=message,
+            http_status=status.HTTP_500_INTERNAL_SERVER_ERROR,
+        )
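The docstring ("after exhausting retries") implies a bounded retry loop around key generation. A minimal sketch of that pattern, with an in-memory set standing in for the database uniqueness check and a plain exception standing in for the `APIException` subclass:

```python
import secrets


class LicenseKeyGenerationError(RuntimeError):
    """Stand-in for the APIException subclass in the diff."""


def generate_unique_key(existing: set[str], max_attempts: int = 5) -> str:
    # Assumed pattern: retry a few times on collision, then fail loudly
    # with a 500-class error instead of looping forever.
    for _ in range(max_attempts):
        candidate = secrets.token_hex(16)
        if candidate not in existing:
            existing.add(candidate)
            return candidate
    raise LicenseKeyGenerationError("Failed to generate a unique license key")


issued: set[str] = set()
key = generate_unique_key(issued)
```

Bounding the retries is what makes the dedicated exception useful: collisions are astronomically unlikely, so repeated failures signal a systemic problem worth surfacing as a 500.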
22 changes: 11 additions & 11 deletions backend/app/core/security.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
 from datetime import datetime, timedelta, timezone
 from typing import Any
+from uuid import UUID

 import jwt
 from pwdlib import PasswordHash
-from pwdlib.hashers.argon2 import Argon2Hasher
-from pwdlib.hashers.bcrypt import BcryptHasher
+from pwdlib.hashers import argon2, bcrypt

 from app.core.config import Settings


 # BcryptHasher is listed first so new passwords are hashed with bcrypt.
 # Argon2Hasher is kept for verification of legacy hashes.
-password_hash = PasswordHash((BcryptHasher(), Argon2Hasher()))
+PASSWORD_HASH = PasswordHash((bcrypt.BcryptHasher(), argon2.Argon2Hasher()))


-ALGORITHM = "HS256"
+JWT_ALGORITHM: str = "HS256"


 def create_access_token(
-    vendor_id: str,
+    vendor_id: str | UUID,
     settings: Settings,
     *,
     expires_delta: timedelta | None = None,
@@ -36,11 +36,11 @@ def create_access_token(
         "exp": expire,
         "token_type": "access",
     }
-    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
+    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=JWT_ALGORITHM)


 def create_refresh_token(
-    vendor_id: str,
+    vendor_id: str | UUID,
     settings: Settings,
     *,
     expires_delta: timedelta | None = None,
@@ -58,7 +58,7 @@ def create_refresh_token(
         "exp": expire,
         "token_type": "refresh",
     }
-    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
+    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=JWT_ALGORITHM)


 def decode_token(token: str, settings: Settings) -> dict[str, Any]:
@@ -67,14 +67,14 @@ def decode_token(token: str, settings: Settings) -> dict[str, Any]:
     Returns:
         dict[str, Any]: The decoded token payload.
     """
-    return jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
+    return jwt.decode(token, settings.SECRET_KEY, algorithms=[JWT_ALGORITHM])


 def verify_password(
     plain_password: str, hashed_password: str
 ) -> tuple[bool, str | None]:
-    return password_hash.verify_and_update(plain_password, hashed_password)
+    return PASSWORD_HASH.verify_and_update(plain_password, hashed_password)


 def get_password_hash(password: str) -> str:
-    return password_hash.hash(password)
+    return PASSWORD_HASH.hash(password)
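For readers unfamiliar with what `jwt.encode(..., algorithm=JWT_ALGORITHM)` actually emits, here is a stdlib-only sketch of the HS256 token format — three base64url segments with an HMAC-SHA256 signature. This hand-rolled version is for illustration only; the module is right to keep using PyJWT:

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> bytes:
    # JWT uses unpadded base64url.
    return base64.urlsafe_b64encode(data).rstrip(b"=")


def hs256_encode(payload: dict, secret: str) -> str:
    # header "." payload "." HMAC-SHA256(header "." payload)
    header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = b64url(json.dumps(payload).encode())
    signing_input = header + b"." + body
    sig = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    return (signing_input + b"." + b64url(sig)).decode()


def hs256_verify(token: str, secret: str) -> dict:
    header, body, sig = token.split(".")
    signing_input = (header + "." + body).encode()
    expected = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    if not hmac.compare_digest(b64url(expected).decode(), sig):
        raise ValueError("signature mismatch")
    padded = body + "=" * (-len(body) % 4)  # restore base64 padding
    return json.loads(base64.urlsafe_b64decode(padded))


token = hs256_encode({"sub": "vendor-1", "token_type": "access"}, "secret")
claims = hs256_verify(token, "secret")
```

This also shows why the `token_type` claim matters: the signature only proves the payload is untampered, so the decoder must still check that a refresh token isn't presented where an access token is expected.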
34 changes: 20 additions & 14 deletions backend/app/crud/vendor.py
@@ -13,11 +13,13 @@ def get_vendor_by_email(cursor: Cursor, email: str) -> dict[str, Any] | None:
     Returns:
         dict[str, Any] | None: The vendor row or None.
     """
-    cursor.execute(
-        'SELECT "id", "email", "password_hash" '
-        'FROM app."vendors" '
-        'WHERE LOWER("email") = LOWER(%s) '
-        'AND "deleted_at" IS NULL',
+    cursor.execute(  # SQL
+        """
+        SELECT "id", "email", "password_hash"
+        FROM app."vendors"
+        WHERE LOWER("email") = LOWER(%s)
+        AND "deleted_at" IS NULL
+        """,
         (email,),
     )
     row = cursor.fetchone()
@@ -32,10 +34,12 @@ def get_vendor_by_id(cursor: Cursor, vendor_id: str) -> dict[str, Any] | None:
     Returns:
         dict[str, Any] | None: The vendor row or None.
     """
-    cursor.execute(
-        'SELECT "id", "email" '
-        'FROM app."vendors" '
-        'WHERE "id" = %s AND "deleted_at" IS NULL',
+    cursor.execute(  # SQL
+        """
+        SELECT "id", "email"
+        FROM app."vendors"
+        WHERE "id" = %s AND "deleted_at" IS NULL
+        """,
         (vendor_id,),
     )
     row = cursor.fetchone()
@@ -56,11 +60,13 @@ def create_vendor(
     Returns:
         dict[str, Any] | None: The created vendor row, or None on conflict.
     """
-    cursor.execute(
-        'INSERT INTO app."vendors" ("email", "password_hash") '
-        "VALUES (%s, %s) "
-        'ON CONFLICT ((LOWER("email"))) DO NOTHING '
-        'RETURNING "id", "email"',
+    cursor.execute(  # SQL
+        """
+        INSERT INTO app."vendors" ("email", "password_hash")
+        VALUES (%s, %s)
+        ON CONFLICT (LOWER("email")) DO NOTHING
+        RETURNING "id", "email"
+        """,
         (email, password_hash),
     )
     row = cursor.fetchone()
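The `ON CONFLICT (LOWER("email")) DO NOTHING` clause depends on a unique expression index over `LOWER("email")`. The same case-insensitive dedup can be sketched in SQLite, with `INSERT OR IGNORE` standing in for the PostgreSQL conflict clause:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE vendors (id INTEGER PRIMARY KEY, email TEXT)")
# The expression index is what makes the deduplication case-insensitive.
conn.execute("CREATE UNIQUE INDEX vendors_email_ci ON vendors (lower(email))")

conn.execute(
    "INSERT OR IGNORE INTO vendors (email) VALUES (?)", ("Ana@Example.com",)
)
conn.execute(
    "INSERT OR IGNORE INTO vendors (email) VALUES (?)", ("ana@example.com",)
)

count = conn.execute("SELECT COUNT(*) FROM vendors").fetchone()[0]
print(count)  # 1 — the second insert was silently skipped
```

In the PostgreSQL version, `RETURNING` then yields no row on the skipped insert, which is how `create_vendor` reports a conflict as `None`.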
60 changes: 60 additions & 0 deletions backend/app/domain/activation.py
@@ -0,0 +1,60 @@
from __future__ import annotations

from typing import ClassVar
from uuid import UUID

import uuid6
from pydantic import BaseModel, ConfigDict, computed_field, field_validator

from app.internal import base32_crockford


__all__ = ["ActivationCode"]


class ActivationCode(BaseModel):
model_config = ConfigDict(validate_assignment=True)

LENGTH: ClassVar[int] = 30
GROUP: ClassVar[int] = 5

code: str

@field_validator("code")
@classmethod
def validate_code(cls, v: str) -> str:
# TODO: need the proper error handling here.
normalized = base32_crockford.normalize(v)

if len(normalized) != cls.LENGTH:
raise ValueError("Activation code must contain 30 symbols")

base32_crockford.decode(normalized, checksum=True)

return "-".join(
normalized[i : i + cls.GROUP]
for i in range(0, cls.LENGTH, cls.GROUP)
)

@computed_field
@property
def uuid(self) -> UUID:
flat = base32_crockford.normalize(self.code)
n = base32_crockford.decode(flat, checksum=True)
return UUID(int=n)
Comment on lines +39 to +44

🧹 Nitpick | 🔵 Trivial

Redundant normalization and decode in uuid property.

The validate_code validator already normalizes and decodes the code. The uuid property re-normalizes and re-decodes the same value. Consider caching or restructuring to avoid duplicate work.

♻️ Option: Store decoded value during validation

One approach is to store the decoded integer as a private attribute during validation, then use it in the uuid property. Alternatively, accept the small overhead since the operations are fast and the model is likely short-lived.

+    _decoded_int: int | None = None  # Private field for cached decode result
+
     @field_validator("code")
     @classmethod
     def validate_code(cls, v: str) -> str:
         normalized = base32_crockford.normalize(v)
         if len(normalized) != cls.LENGTH:
             raise ValueError("Activation code must contain 30 symbols")
-        base32_crockford.decode(normalized, checksum=True)
+        # Decode is performed for checksum validation; result used by uuid property
+        base32_crockford.decode(normalized, checksum=True)
         return "-".join(
             normalized[i : i + cls.GROUP]
             for i in range(0, cls.LENGTH, cls.GROUP)
         )

Note: Pydantic's field_validator doesn't easily allow setting other fields. If performance matters, consider using model_validator instead.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/domain/activation.py` around lines 39 - 44, The uuid property
re-normalizes and re-decodes the activation code redundantly; fix by caching the
decoded integer during validation (e.g., in the validate_code validator store
the decoded int on a private attribute like _decoded_uuid_int) and then have the
uuid property return UUID(int=self._decoded_uuid_int), or alternatively make the
uuid property a cached result (use functools.cached_property or pydantic model
caching) so base32_crockford.normalize/decode only run once; update the
validate_code (or switch to a model_validator) and the uuid property accordingly
to use the cached value.


@classmethod
def generate(cls, uuid: UUID | None = None) -> ActivationCode:
if uuid is None:
uuid = uuid6.uuid7()
if not isinstance(uuid, UUID):
raise TypeError(f"uuid cannot be of type {uuid.__class__.__name__}")
if uuid.version != 7: # noqa: PLR2004
raise TypeError(
f"uuid must be a UUID version 7, got {uuid.version}"
)
Comment on lines +46 to +55

🧹 Nitpick | 🔵 Trivial

Redundant type check after None handling.

The isinstance(uuid, UUID) check at line 50 is unreachable when uuid was initially None, since uuid6.uuid7() returns a UUID object. The check only matters when a non-None value was passed, but the type hint already indicates UUID | None.

♻️ Suggested refactor to clarify intent
     @classmethod
     def generate(cls, uuid: UUID | None = None) -> ActivationCode:
+        if uuid is not None and not isinstance(uuid, UUID):
+            raise TypeError(f"uuid cannot be of type {uuid.__class__.__name__}")
         if uuid is None:
             uuid = uuid6.uuid7()
-        if not isinstance(uuid, UUID):
-            raise TypeError(f"uuid cannot be of type {uuid.__class__.__name__}")
         if uuid.version != 7:  # noqa: PLR2004
             raise TypeError(
                 f"uuid must be a UUID version 7, got {uuid.version}"
             )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/domain/activation.py` around lines 46 - 55, The isinstance(uuid,
UUID) check in Activation.generate is redundant after assigning uuid6.uuid7()
when uuid is None; update the validation to only enforce type when an external
value was passed: after the initial if uuid is None: uuid = uuid6.uuid7() block,
validate with "if not isinstance(uuid, UUID): raise TypeError(...)" but only
when the original argument was not None (or simply remove the unreachable branch
by performing the type check inside an else branch of the None handling), then
keep the existing UUID version check (uuid.version != 7) to raise the same
TypeError if needed; target the classmethod generate in activation.py.

Comment on lines +52 to +55

🧹 Nitpick | 🔵 Trivial

Consider ValueError instead of TypeError for version mismatch.

The UUID type is correct; only the version value is wrong. ValueError is more semantically appropriate for "correct type, invalid value" scenarios.

♻️ Proposed change
         if uuid.version != 7:  # noqa: PLR2004
-            raise TypeError(
+            raise ValueError(
                 f"uuid must be a UUID version 7, got {uuid.version}"
             )

Note: This would require updating the corresponding test to expect ValueError instead of TypeError.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/domain/activation.py` around lines 52 - 55, The check that
currently raises TypeError when uuid.version != 7 should instead raise
ValueError because the UUID is the correct type but has an invalid value; update
the raise in the function/method performing the version check (the block
referencing uuid.version != 7) to raise ValueError with the same descriptive
message, and update any tests that assert a TypeError to expect ValueError
instead.


encoded = base32_crockford.encode(uuid.int, checksum=True)
encoded = encoded.rjust(cls.LENGTH, "0")

return cls(code=encoded)
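The `app.internal.base32_crockford` module isn't part of this diff. A minimal Crockford base32 codec with the standard mod-37 check symbol, sketched here to illustrate the normalize/encode/decode round trip `ActivationCode` relies on (the real module's API may differ):

```python
import uuid

ALPHABET = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"  # Crockford: no I, L, O, U
CHECK_ALPHABET = ALPHABET + "*~$=U"            # 37 symbols for the mod-37 check


def normalize(s: str) -> str:
    # Uppercase, drop hyphens, map the easily confused letters.
    s = s.upper().replace("-", "")
    return s.translate(str.maketrans("ILO", "110"))


def encode(n: int, checksum: bool = False) -> str:
    value = n
    digits = ""
    while True:
        n, r = divmod(n, 32)
        digits = ALPHABET[r] + digits
        if n == 0:
            break
    if checksum:
        digits += CHECK_ALPHABET[value % 37]  # check symbol of the raw value
    return digits


def decode(s: str, checksum: bool = False) -> int:
    s = normalize(s)
    if checksum:
        s, check = s[:-1], s[-1]
    n = 0
    for ch in s:
        n = n * 32 + ALPHABET.index(ch)
    if checksum and CHECK_ALPHABET[n % 37] != check:
        raise ValueError("checksum mismatch")
    return n


# Round-trip a 128-bit integer, padded and grouped the way ActivationCode does.
n = uuid.uuid4().int
code = encode(n, checksum=True).rjust(30, "0")
grouped = "-".join(code[i : i + 5] for i in range(0, 30, 5))
assert decode(grouped, checksum=True) == n
```

Padding with leading zeros is safe because it doesn't change the decoded value, so the check symbol computed before padding still verifies.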