Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CHANGELOG.MD
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,20 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0).

## [2.5.62] - 2026-02-14

### Bug Fixes

- **Fix PostgreSQL CREATE INDEX race condition on container startup**: Replace `fcntl.flock` file lock with PostgreSQL advisory lock (`pg_advisory_lock`) for migration coordination
- File locks only work within a single container's `/tmp` filesystem -- the app and celery-worker containers each have their own `/tmp`, so they raced against each other
- Advisory locks work across ALL connections to the same PostgreSQL database, preventing the `duplicate key value violates unique constraint "pg_class_relname_nsp_index"` errors seen on every container restart (~8 errors per restart event)
- Winner process acquires the lock and runs migrations; other processes block until complete then skip
- Falls back to uncoordinated execution if advisory lock fails (each DDL statement already has its own idempotency handling via `IF NOT EXISTS` / `try/except`)
- Extract `_run_all_migrations()` helper for cleaner code structure
- Files affected: `app.py`

---

## [2.5.61] - 2026-02-02

### Bug Fixes
Expand Down
163 changes: 98 additions & 65 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,83 +565,116 @@ def create_tables():
# Don't stop the application for table creation errors
# The tables might already exist and be functional

def migrate_database():
"""Run database migrations - uses file lock to ensure only one worker runs migrations"""
import fcntl
MIGRATION_ADVISORY_LOCK_ID = 7283945162

migration_lock_file = '/tmp/pixelprobe_migration.lock'
def _run_all_migrations():
"""Execute all database migrations. Called by migrate_database() after acquiring lock."""
from tools.app_startup_migration import run_startup_migrations

# Run startup migrations
logger.info("Running startup migrations...")
try:
# Try to acquire exclusive lock (non-blocking)
lock_file = open(migration_lock_file, 'w')
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)

# We got the lock, so we're the migration process
logger.info(f"Acquired migration lock in process {os.getpid()}, running migrations")

try:
# Run startup migrations
logger.info("Running startup migrations...")
from tools.app_startup_migration import run_startup_migrations
try:
run_startup_migrations(db)
logger.info("Startup migrations completed successfully")
except Exception as e:
logger.error(f"Startup migration failed: {e}")
run_startup_migrations(db)
logger.info("Startup migrations completed successfully")
except Exception as e:
logger.error(f"Startup migration failed: {e}")

# Run authentication tables migration for v2.4.0
logger.info("Checking authentication tables...")
try:
run_auth_migration()
logger.info("Authentication tables verified")
except Exception as e:
logger.error(f"Authentication migration failed: {e}")
# Run authentication tables migration for v2.4.0
logger.info("Checking authentication tables...")
try:
run_auth_migration()
logger.info("Authentication tables verified")
except Exception as e:
logger.error(f"Authentication migration failed: {e}")

# Run v2.4.35 migration
logger.info("Running v2.4.35 migration...")
try:
run_v2_4_35_migrations()
logger.info("v2.4.35 migration completed successfully")
except Exception as e:
logger.error(f"v2.4.35 migration failed: {e}")
# Run v2.4.35 migration
logger.info("Running v2.4.35 migration...")
try:
run_v2_4_35_migrations()
logger.info("v2.4.35 migration completed successfully")
except Exception as e:
logger.error(f"v2.4.35 migration failed: {e}")

# Run v2.4.113 migration
logger.info("Running v2.4.113 migration...")
try:
run_v2_4_113_migrations()
logger.info("v2.4.113 migration completed successfully")
except Exception as e:
logger.error(f"v2.4.113 migration failed: {e}")
# Run v2.4.113 migration
logger.info("Running v2.4.113 migration...")
try:
run_v2_4_113_migrations()
logger.info("v2.4.113 migration completed successfully")
except Exception as e:
logger.error(f"v2.4.113 migration failed: {e}")

# Create performance indexes
logger.info("Creating performance indexes...")
try:
create_performance_indexes()
logger.info("Performance indexes created successfully")
except Exception as e:
logger.error(f"Failed to create performance indexes: {e}")
# Create performance indexes
logger.info("Creating performance indexes...")
try:
create_performance_indexes()
logger.info("Performance indexes created successfully")
except Exception as e:
logger.error(f"Failed to create performance indexes: {e}")

logger.info("Database initialization completed")
logger.info("Database initialization completed")

finally:
# Release the lock
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
lock_file.close()
def migrate_database():
"""Run database migrations - uses PostgreSQL advisory lock to coordinate across containers.

except (IOError, OSError) as e:
# Another process has the lock - wait for it to complete
logger.info(f"Migrations already running in another process {os.getpid()}, waiting for completion...")
Advisory locks work across all connections to the same database, unlike file locks
which are scoped to a single container's filesystem. This prevents the
'duplicate key value violates unique constraint pg_class_relname_nsp_index' errors
that occurred when app and celery-worker containers raced during CREATE INDEX.
"""
from sqlalchemy import text

# Wait for the lock to be available (blocking)
try:
lock_file = open(migration_lock_file, 'w')
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX) # This blocks until lock is available
# Lock acquired means migrations are done
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
lock_file.close()
lock_conn = None
try:
# Get a dedicated connection for the advisory lock
lock_conn = db.engine.connect()

# Try non-blocking lock acquisition
result = lock_conn.execute(
text("SELECT pg_try_advisory_lock(:lock_id)"),
{"lock_id": MIGRATION_ADVISORY_LOCK_ID}
)
acquired = result.scalar()

if acquired:
# We are the migration leader
logger.info(f"Acquired PostgreSQL advisory lock in process {os.getpid()}, running migrations")
try:
_run_all_migrations()
except Exception as mig_err:
logger.error(f"Migration error (lock held): {mig_err}")
finally:
lock_conn.execute(
text("SELECT pg_advisory_unlock(:lock_id)"),
{"lock_id": MIGRATION_ADVISORY_LOCK_ID}
)
logger.info("Released PostgreSQL advisory lock")
else:
# Another process holds the lock - wait for it to finish
logger.info(f"Migrations already running in another process, waiting for completion (process {os.getpid()})...")
lock_conn.execute(
text("SELECT pg_advisory_lock(:lock_id)"),
{"lock_id": MIGRATION_ADVISORY_LOCK_ID}
)
# Lock acquired means the leader finished; release immediately
lock_conn.execute(
text("SELECT pg_advisory_unlock(:lock_id)"),
{"lock_id": MIGRATION_ADVISORY_LOCK_ID}
)
logger.info(f"Migrations completed by another process, continuing startup in process {os.getpid()}")
except Exception as wait_error:
logger.warning(f"Could not wait for migration lock: {wait_error}")

except Exception as e:
# Advisory lock failed (e.g., connection error, non-PostgreSQL database)
# Fall back to running migrations uncoordinated - each DDL statement
# already has its own idempotency handling (IF NOT EXISTS, try/except)
logger.warning(f"Could not use advisory lock ({e}), running migrations without coordination")
_run_all_migrations()

finally:
if lock_conn is not None:
try:
lock_conn.close()
except Exception:
pass

def run_auth_migration():
"""Run authentication tables migration for v2.4.0"""
Expand Down
14 changes: 7 additions & 7 deletions pixelprobe/api/admin_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,9 +325,9 @@ def create_schedule():
try:
from pixelprobe.tasks import reload_schedules_task
reload_schedules_task.delay()
except ImportError as e:
# May fail in test environment where Celery isn't fully initialized
logger.warning(f"Could not trigger schedule reload (Celery not available): {e}")
except Exception as e:
# May fail if Celery/Redis unavailable (ImportError, ConnectionError, etc.)
logger.warning(f"Could not trigger schedule reload: {e}")

return schedule.to_dict(), 201
except Exception as e:
Expand Down Expand Up @@ -376,8 +376,8 @@ def update_schedule(schedule_id):
try:
from pixelprobe.tasks import reload_schedules_task
reload_schedules_task.delay()
except ImportError as e:
logger.warning(f"Could not trigger schedule reload (Celery not available): {e}")
except Exception as e:
logger.warning(f"Could not trigger schedule reload: {e}")

return schedule.to_dict()
except Exception as e:
Expand All @@ -400,8 +400,8 @@ def delete_schedule(schedule_id):
try:
from pixelprobe.tasks import reload_schedules_task
reload_schedules_task.delay()
except ImportError as e:
logger.warning(f"Could not trigger schedule reload (Celery not available): {e}")
except Exception as e:
logger.warning(f"Could not trigger schedule reload: {e}")

return '', 204
except Exception as e:
Expand Down
123 changes: 123 additions & 0 deletions tests/test_migration_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""
Tests for PostgreSQL advisory lock migration coordination in app.py

NOTE: These tests avoid 'from app import ...' because importing the app module
triggers Celery/Redis initialization at module level, which poisons the Redis
connection state for subsequent schedule integration tests. Instead, we read
the constant from source and test migrate_database via its module reference.
"""

import re
import sys
import pytest
from unittest.mock import patch, MagicMock
from pathlib import Path


def _get_app_module():
"""Get the app module if already imported (by conftest/fixtures), else skip."""
mod = sys.modules.get('app')
if mod is None:
pytest.skip("app module not imported (test isolation)")
return mod


def test_advisory_lock_id_is_stable():
    """MIGRATION_ADVISORY_LOCK_ID must never change -- other running containers
    depend on the same value to coordinate."""
    # Read the constant straight out of the source file so we don't have to
    # import app (which would start Celery/Redis at module level).
    source_text = (Path(__file__).parent.parent / 'app.py').read_text()
    found = re.search(r'MIGRATION_ADVISORY_LOCK_ID\s*=\s*(\d+)', source_text)
    assert found is not None, "MIGRATION_ADVISORY_LOCK_ID not found in app.py"
    assert int(found.group(1)) == 7283945162


def test_migrate_database_falls_back_on_advisory_lock_failure(app):
    """When advisory lock acquisition fails (e.g., SQLite test DB), migrations
    still run via the fallback path."""
    app_mod = _get_app_module()
    # The test app uses SQLite, so pg_try_advisory_lock raises and
    # migrate_database must fall through to the uncoordinated path.
    with app.app_context(), \
            patch.object(app_mod, '_run_all_migrations') as migrations_spy:
        app_mod.migrate_database()
        migrations_spy.assert_called_once()


def test_migrate_database_releases_lock_on_success(app):
    """Advisory lock is released after migrations complete successfully."""
    app_mod = _get_app_module()
    with app.app_context():
        # Fake a PostgreSQL connection whose pg_try_advisory_lock returns True,
        # making this process the migration leader.
        fake_conn = MagicMock()
        fake_conn.execute.return_value.scalar.return_value = True

        fake_engine = MagicMock()
        fake_engine.connect.return_value = fake_conn

        with patch.object(app_mod, '_run_all_migrations') as migrations_spy, \
                patch.object(app_mod, 'db') as fake_db:
            fake_db.engine = fake_engine

            app_mod.migrate_database()

            migrations_spy.assert_called_once()
            # Exactly two statements: try-lock, then unlock.
            assert fake_conn.execute.call_count == 2
            unlock_stmt = fake_conn.execute.call_args_list[1][0][0]
            assert 'pg_advisory_unlock' in unlock_stmt.text
            fake_conn.close.assert_called_once()


def test_migrate_database_releases_lock_on_migration_failure(app):
    """Advisory lock is released even if migrations raise an exception."""
    app_mod = _get_app_module()
    with app.app_context():
        # Leader path again: try-lock succeeds, but the migrations blow up.
        fake_conn = MagicMock()
        fake_conn.execute.return_value.scalar.return_value = True

        fake_engine = MagicMock()
        fake_engine.connect.return_value = fake_conn

        with patch.object(app_mod, '_run_all_migrations',
                          side_effect=RuntimeError("migration boom")) as migrations_spy, \
                patch.object(app_mod, 'db') as fake_db:
            fake_db.engine = fake_engine

            app_mod.migrate_database()

            migrations_spy.assert_called_once()
            # The finally block must still issue the unlock and close the
            # dedicated lock connection.
            assert fake_conn.execute.call_count == 2
            unlock_stmt = fake_conn.execute.call_args_list[1][0][0]
            assert 'pg_advisory_unlock' in unlock_stmt.text
            fake_conn.close.assert_called_once()


def test_migrate_database_waiter_path(app):
    """When another process holds the lock, we wait then skip migrations."""
    app_mod = _get_app_module()
    with app.app_context():
        executed = []

        def fake_execute(stmt, params=None):
            # First statement is pg_try_advisory_lock -> False (lock is held
            # elsewhere); subsequent blocking lock/unlock calls just succeed.
            executed.append(stmt)
            outcome = MagicMock()
            if len(executed) == 1:
                outcome.scalar.return_value = False
            return outcome

        fake_conn = MagicMock()
        fake_conn.execute = fake_execute

        fake_engine = MagicMock()
        fake_engine.connect.return_value = fake_conn

        with patch.object(app_mod, '_run_all_migrations') as migrations_spy, \
                patch.object(app_mod, 'db') as fake_db:
            fake_db.engine = fake_engine

            app_mod.migrate_database()

            migrations_spy.assert_not_called()
            # try-lock, blocking lock, unlock == three statements total.
            assert len(executed) == 3
2 changes: 1 addition & 1 deletion version.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# Default version - this is the single source of truth


_DEFAULT_VERSION = '2.5.61'
_DEFAULT_VERSION = '2.5.62'


# Allow override via environment variable for CI/CD, but default to the hardcoded version
Expand Down