diff --git a/CHANGELOG.MD b/CHANGELOG.MD index efc6369..84d93b2 100644 --- a/CHANGELOG.MD +++ b/CHANGELOG.MD @@ -5,6 +5,20 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0). +## [2.5.62] - 2026-02-14 + +### Bug Fixes + +- **Fix PostgreSQL CREATE INDEX race condition on container startup**: Replace `fcntl.flock` file lock with PostgreSQL advisory lock (`pg_advisory_lock`) for migration coordination + - File locks only work within a single container's `/tmp` filesystem -- the app and celery-worker containers each have their own `/tmp`, so they raced against each other + - Advisory locks work across ALL connections to the same PostgreSQL database, preventing the `duplicate key value violates unique constraint "pg_class_relname_nsp_index"` errors seen on every container restart (~8 errors per restart event) + - Winner process acquires the lock and runs migrations; other processes block until complete then skip + - Falls back to uncoordinated execution if advisory lock fails (each DDL statement already has its own idempotency handling via `IF NOT EXISTS` / `try/except`) + - Extract `_run_all_migrations()` helper for cleaner code structure + - Files affected: `app.py` + +--- + ## [2.5.61] - 2026-02-02 ### Bug Fixes diff --git a/app.py b/app.py index c2096ca..c7374de 100644 --- a/app.py +++ b/app.py @@ -565,83 +565,116 @@ def create_tables(): # Don't stop the application for table creation errors # The tables might already exist and be functional -def migrate_database(): - """Run database migrations - uses file lock to ensure only one worker runs migrations""" - import fcntl +MIGRATION_ADVISORY_LOCK_ID = 7283945162 - migration_lock_file = '/tmp/pixelprobe_migration.lock' +def _run_all_migrations(): + """Execute all database migrations. Called by migrate_database() after acquiring lock.""" + from tools.app_startup_migration import run_startup_migrations + # Run startup migrations + logger.info("Running startup migrations...") try: - # Try to acquire exclusive lock (non-blocking) - lock_file = open(migration_lock_file, 'w') - fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) - - # We got the lock, so we're the migration process - logger.info(f"Acquired migration lock in process {os.getpid()}, running migrations") - - try: - # Run startup migrations - logger.info("Running startup migrations...") - from tools.app_startup_migration import run_startup_migrations - try: - run_startup_migrations(db) - logger.info("Startup migrations completed successfully") - except Exception as e: - logger.error(f"Startup migration failed: {e}") + run_startup_migrations(db) + logger.info("Startup migrations completed successfully") + except Exception as e: + logger.error(f"Startup migration failed: {e}") - # Run authentication tables migration for v2.4.0 - logger.info("Checking authentication tables...") - try: - run_auth_migration() - logger.info("Authentication tables verified") - except Exception as e: - logger.error(f"Authentication migration failed: {e}") + # Run authentication tables migration for v2.4.0 + logger.info("Checking authentication tables...") + try: + run_auth_migration() + logger.info("Authentication tables verified") + except Exception as e: + logger.error(f"Authentication migration failed: {e}") - # Run v2.4.35 migration - logger.info("Running v2.4.35 migration...") - try: - run_v2_4_35_migrations() - logger.info("v2.4.35 migration completed successfully") - except Exception as e: - logger.error(f"v2.4.35 migration failed: {e}") + # Run v2.4.35 migration + logger.info("Running v2.4.35 migration...") + try: + run_v2_4_35_migrations() + logger.info("v2.4.35 migration completed successfully") + except Exception as e: + logger.error(f"v2.4.35 migration failed: {e}") - # Run v2.4.113 migration - logger.info("Running v2.4.113 migration...") - try: - run_v2_4_113_migrations() - logger.info("v2.4.113 migration completed successfully") - except Exception as e: - logger.error(f"v2.4.113 migration failed: {e}") + # Run v2.4.113 migration + logger.info("Running v2.4.113 migration...") + try: + run_v2_4_113_migrations() + logger.info("v2.4.113 migration completed successfully") + except Exception as e: + logger.error(f"v2.4.113 migration failed: {e}") - # Create performance indexes - logger.info("Creating performance indexes...") - try: - create_performance_indexes() - logger.info("Performance indexes created successfully") - except Exception as e: - logger.error(f"Failed to create performance indexes: {e}") + # Create performance indexes + logger.info("Creating performance indexes...") + try: + create_performance_indexes() + logger.info("Performance indexes created successfully") + except Exception as e: + logger.error(f"Failed to create performance indexes: {e}") - logger.info("Database initialization completed") + logger.info("Database initialization completed") - finally: - # Release the lock - fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) - lock_file.close() +def migrate_database(): + """Run database migrations - uses PostgreSQL advisory lock to coordinate across containers. - except (IOError, OSError) as e: - # Another process has the lock - wait for it to complete - logger.info(f"Migrations already running in another process {os.getpid()}, waiting for completion...") + Advisory locks work across all connections to the same database, unlike file locks + which are scoped to a single container's filesystem. This prevents the + 'duplicate key value violates unique constraint pg_class_relname_nsp_index' errors + that occurred when app and celery-worker containers raced during CREATE INDEX. + """ + from sqlalchemy import text - # Wait for the lock to be available (blocking) - try: - lock_file = open(migration_lock_file, 'w') - fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX) # This blocks until lock is available - # Lock acquired means migrations are done - fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) - lock_file.close() + lock_conn = None + try: + # Get a dedicated connection for the advisory lock + lock_conn = db.engine.connect() + + # Try non-blocking lock acquisition + result = lock_conn.execute( + text("SELECT pg_try_advisory_lock(:lock_id)"), + {"lock_id": MIGRATION_ADVISORY_LOCK_ID} + ) + acquired = result.scalar() + + if acquired: + # We are the migration leader + logger.info(f"Acquired PostgreSQL advisory lock in process {os.getpid()}, running migrations") + try: + _run_all_migrations() + except Exception as mig_err: + logger.error(f"Migration error (lock held): {mig_err}") + finally: + lock_conn.execute( + text("SELECT pg_advisory_unlock(:lock_id)"), + {"lock_id": MIGRATION_ADVISORY_LOCK_ID} + ) + logger.info("Released PostgreSQL advisory lock") + else: + # Another process holds the lock - wait for it to finish + logger.info(f"Migrations already running in another process, waiting for completion (process {os.getpid()})...") + lock_conn.execute( + text("SELECT pg_advisory_lock(:lock_id)"), + {"lock_id": MIGRATION_ADVISORY_LOCK_ID} + ) + # Lock acquired means the leader finished; release immediately + lock_conn.execute( + text("SELECT pg_advisory_unlock(:lock_id)"), + {"lock_id": MIGRATION_ADVISORY_LOCK_ID} + ) logger.info(f"Migrations completed by another process, continuing startup in process {os.getpid()}") - except Exception as wait_error: - logger.warning(f"Could not wait for migration lock: {wait_error}") + + except Exception as e: + # Advisory lock failed (e.g., connection error, non-PostgreSQL database) + # Fall back to running migrations uncoordinated - each DDL statement + # already has its own idempotency handling (IF NOT EXISTS, try/except) + logger.warning(f"Could not use advisory lock ({e}), running migrations without coordination") + _run_all_migrations() + + finally: + if lock_conn is not None: + try: + lock_conn.close() + except Exception: + pass def run_auth_migration(): """Run authentication tables migration for v2.4.0""" diff --git a/pixelprobe/api/admin_routes.py b/pixelprobe/api/admin_routes.py index 93adbb1..ab854fe 100644 --- a/pixelprobe/api/admin_routes.py +++ b/pixelprobe/api/admin_routes.py @@ -325,9 +325,9 @@ def create_schedule(): try: from pixelprobe.tasks import reload_schedules_task reload_schedules_task.delay() - except ImportError as e: - # May fail in test environment where Celery isn't fully initialized - logger.warning(f"Could not trigger schedule reload (Celery not available): {e}") + except Exception as e: + # May fail if Celery/Redis unavailable (ImportError, ConnectionError, etc.) + logger.warning(f"Could not trigger schedule reload: {e}") return schedule.to_dict(), 201 except Exception as e: @@ -376,8 +376,8 @@ def update_schedule(schedule_id): try: from pixelprobe.tasks import reload_schedules_task reload_schedules_task.delay() - except ImportError as e: - logger.warning(f"Could not trigger schedule reload (Celery not available): {e}") + except Exception as e: + logger.warning(f"Could not trigger schedule reload: {e}") return schedule.to_dict() except Exception as e: @@ -400,8 +400,8 @@ def delete_schedule(schedule_id): try: from pixelprobe.tasks import reload_schedules_task reload_schedules_task.delay() - except ImportError as e: - logger.warning(f"Could not trigger schedule reload (Celery not available): {e}") + except Exception as e: + logger.warning(f"Could not trigger schedule reload: {e}") return '', 204 except Exception as e: diff --git a/tests/test_migration_lock.py b/tests/test_migration_lock.py new file mode 100644 index 0000000..e1df6d8 --- /dev/null +++ b/tests/test_migration_lock.py @@ -0,0 +1,123 @@ +""" +Tests for PostgreSQL advisory lock migration coordination in app.py + +NOTE: These tests avoid 'from app import ...' because importing the app module +triggers Celery/Redis initialization at module level, which poisons the Redis +connection state for subsequent schedule integration tests. Instead, we read +the constant from source and test migrate_database via its module reference. +""" + +import re +import sys +import pytest +from unittest.mock import patch, MagicMock +from pathlib import Path + + +def _get_app_module(): + """Get the app module if already imported (by conftest/fixtures), else skip.""" + mod = sys.modules.get('app') + if mod is None: + pytest.skip("app module not imported (test isolation)") + return mod + + +def test_advisory_lock_id_is_stable(): + """MIGRATION_ADVISORY_LOCK_ID must never change -- other running containers + depend on the same value to coordinate.""" + app_source = Path(__file__).parent.parent / 'app.py' + content = app_source.read_text() + match = re.search(r'MIGRATION_ADVISORY_LOCK_ID\s*=\s*(\d+)', content) + assert match is not None, "MIGRATION_ADVISORY_LOCK_ID not found in app.py" + assert int(match.group(1)) == 7283945162 + + +def test_migrate_database_falls_back_on_advisory_lock_failure(app): + """When advisory lock acquisition fails (e.g., SQLite test DB), migrations + still run via the fallback path.""" + app_mod = _get_app_module() + with app.app_context(): + with patch.object(app_mod, '_run_all_migrations') as mock_migrations: + app_mod.migrate_database() + mock_migrations.assert_called_once() + + +def test_migrate_database_releases_lock_on_success(app): + """Advisory lock is released after migrations complete successfully.""" + app_mod = _get_app_module() + with app.app_context(): + mock_conn = MagicMock() + mock_scalar = MagicMock(return_value=True) + mock_result = MagicMock() + mock_result.scalar = mock_scalar + mock_conn.execute.return_value = mock_result + + mock_engine = MagicMock() + mock_engine.connect.return_value = mock_conn + + with patch.object(app_mod, '_run_all_migrations') as mock_migrations, \ + patch.object(app_mod, 'db') as mock_db: + mock_db.engine = mock_engine + + app_mod.migrate_database() + + mock_migrations.assert_called_once() + assert mock_conn.execute.call_count == 2 + unlock_text_arg = mock_conn.execute.call_args_list[1][0][0] + assert 'pg_advisory_unlock' in unlock_text_arg.text + mock_conn.close.assert_called_once() + + +def test_migrate_database_releases_lock_on_migration_failure(app): + """Advisory lock is released even if migrations raise an exception.""" + app_mod = _get_app_module() + with app.app_context(): + mock_conn = MagicMock() + mock_scalar = MagicMock(return_value=True) + mock_result = MagicMock() + mock_result.scalar = mock_scalar + mock_conn.execute.return_value = mock_result + + mock_engine = MagicMock() + mock_engine.connect.return_value = mock_conn + + with patch.object(app_mod, '_run_all_migrations', side_effect=RuntimeError("migration boom")) as mock_migrations, \ + patch.object(app_mod, 'db') as mock_db: + mock_db.engine = mock_engine + + app_mod.migrate_database() + + mock_migrations.assert_called_once() + assert mock_conn.execute.call_count == 2 + unlock_text_arg = mock_conn.execute.call_args_list[1][0][0] + assert 'pg_advisory_unlock' in unlock_text_arg.text + mock_conn.close.assert_called_once() + + +def test_migrate_database_waiter_path(app): + """When another process holds the lock, we wait then skip migrations.""" + app_mod = _get_app_module() + with app.app_context(): + mock_conn = MagicMock() + + call_count = [0] + def mock_execute(stmt, params=None): + call_count[0] += 1 + result = MagicMock() + if call_count[0] == 1: + result.scalar.return_value = False + return result + + mock_conn.execute = mock_execute + + mock_engine = MagicMock() + mock_engine.connect.return_value = mock_conn + + with patch.object(app_mod, '_run_all_migrations') as mock_migrations, \ + patch.object(app_mod, 'db') as mock_db: + mock_db.engine = mock_engine + + app_mod.migrate_database() + + mock_migrations.assert_not_called() + assert call_count[0] == 3 diff --git a/version.py b/version.py index 899d526..84e6947 100644 --- a/version.py +++ b/version.py @@ -4,7 +4,7 @@ # Default version - this is the single source of truth -_DEFAULT_VERSION = '2.5.61' +_DEFAULT_VERSION = '2.5.62' # Allow override via environment variable for CI/CD, but default to the hardcoded version