From 418b2fd6382888f018f0fe9896d3ea4ca55daaeb Mon Sep 17 00:00:00 2001 From: ttlequals0 Date: Sat, 14 Feb 2026 22:42:44 -0500 Subject: [PATCH 1/2] Fix PostgreSQL CREATE INDEX race condition on container startup (v2.5.62) Replace fcntl.flock file lock with PostgreSQL advisory lock for migration coordination. File locks only work within a single container /tmp filesystem, so the app and celery-worker containers raced against each other on every restart, causing duplicate key value violates unique constraint pg_class_relname_nsp_index errors per restart event. Advisory locks work across all connections to the same PostgreSQL database. Winner process runs migrations while others block and skip. Falls back to uncoordinated execution if advisory lock fails (each DDL statement already has its own idempotency handling). --- CHANGELOG.MD | 14 +++ app.py | 163 +++++++++++++++++++++-------------- tests/test_migration_lock.py | 117 +++++++++++++++++++++++++ version.py | 2 +- 4 files changed, 230 insertions(+), 66 deletions(-) create mode 100644 tests/test_migration_lock.py diff --git a/CHANGELOG.MD b/CHANGELOG.MD index efc6369..84d93b2 100644 --- a/CHANGELOG.MD +++ b/CHANGELOG.MD @@ -5,6 +5,20 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0). +## [2.5.62] - 2026-02-14 + +### Bug Fixes + +- **Fix PostgreSQL CREATE INDEX race condition on container startup**: Replace `fcntl.flock` file lock with PostgreSQL advisory lock (`pg_advisory_lock`) for migration coordination + - File locks only work within a single container's `/tmp` filesystem -- the app and celery-worker containers each have their own `/tmp`, so they raced against each other + - Advisory locks work across ALL connections to the same PostgreSQL database, preventing the `duplicate key value violates unique constraint "pg_class_relname_nsp_index"` errors seen on every container restart (~8 errors per restart event) + - Winner process acquires the lock and runs migrations; other processes block until complete then skip + - Falls back to uncoordinated execution if advisory lock fails (each DDL statement already has its own idempotency handling via `IF NOT EXISTS` / `try/except`) + - Extract `_run_all_migrations()` helper for cleaner code structure + - Files affected: `app.py` + +--- + ## [2.5.61] - 2026-02-02 ### Bug Fixes diff --git a/app.py b/app.py index c2096ca..c7374de 100644 --- a/app.py +++ b/app.py @@ -565,83 +565,116 @@ def create_tables(): # Don't stop the application for table creation errors # The tables might already exist and be functional -def migrate_database(): - """Run database migrations - uses file lock to ensure only one worker runs migrations""" - import fcntl +MIGRATION_ADVISORY_LOCK_ID = 7283945162 - migration_lock_file = '/tmp/pixelprobe_migration.lock' +def _run_all_migrations(): + """Execute all database migrations. Called by migrate_database() after acquiring lock.""" + from tools.app_startup_migration import run_startup_migrations + # Run startup migrations + logger.info("Running startup migrations...") try: - # Try to acquire exclusive lock (non-blocking) - lock_file = open(migration_lock_file, 'w') - fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) - - # We got the lock, so we're the migration process - logger.info(f"Acquired migration lock in process {os.getpid()}, running migrations") - - try: - # Run startup migrations - logger.info("Running startup migrations...") - from tools.app_startup_migration import run_startup_migrations - try: - run_startup_migrations(db) - logger.info("Startup migrations completed successfully") - except Exception as e: - logger.error(f"Startup migration failed: {e}") + run_startup_migrations(db) + logger.info("Startup migrations completed successfully") + except Exception as e: + logger.error(f"Startup migration failed: {e}") - # Run authentication tables migration for v2.4.0 - logger.info("Checking authentication tables...") - try: - run_auth_migration() - logger.info("Authentication tables verified") - except Exception as e: - logger.error(f"Authentication migration failed: {e}") + # Run authentication tables migration for v2.4.0 + logger.info("Checking authentication tables...") + try: + run_auth_migration() + logger.info("Authentication tables verified") + except Exception as e: + logger.error(f"Authentication migration failed: {e}") - # Run v2.4.35 migration - logger.info("Running v2.4.35 migration...") - try: - run_v2_4_35_migrations() - logger.info("v2.4.35 migration completed successfully") - except Exception as e: - logger.error(f"v2.4.35 migration failed: {e}") + # Run v2.4.35 migration + logger.info("Running v2.4.35 migration...") + try: + run_v2_4_35_migrations() + logger.info("v2.4.35 migration completed successfully") + except Exception as e: + logger.error(f"v2.4.35 migration failed: {e}") - # Run v2.4.113 migration - logger.info("Running v2.4.113 migration...") - try: - run_v2_4_113_migrations() - logger.info("v2.4.113 migration completed successfully") - except Exception as e: - logger.error(f"v2.4.113 migration failed: {e}") + # Run v2.4.113 migration + logger.info("Running v2.4.113 migration...") + try: + run_v2_4_113_migrations() + logger.info("v2.4.113 migration completed successfully") + except Exception as e: + logger.error(f"v2.4.113 migration failed: {e}") - # Create performance indexes - logger.info("Creating performance indexes...") - try: - create_performance_indexes() - logger.info("Performance indexes created successfully") - except Exception as e: - logger.error(f"Failed to create performance indexes: {e}") + # Create performance indexes + logger.info("Creating performance indexes...") + try: + create_performance_indexes() + logger.info("Performance indexes created successfully") + except Exception as e: + logger.error(f"Failed to create performance indexes: {e}") - logger.info("Database initialization completed") + logger.info("Database initialization completed") - finally: - # Release the lock - fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) - lock_file.close() +def migrate_database(): + """Run database migrations - uses PostgreSQL advisory lock to coordinate across containers. - except (IOError, OSError) as e: - # Another process has the lock - wait for it to complete - logger.info(f"Migrations already running in another process {os.getpid()}, waiting for completion...") + Advisory locks work across all connections to the same database, unlike file locks + which are scoped to a single container's filesystem. This prevents the + 'duplicate key value violates unique constraint pg_class_relname_nsp_index' errors + that occurred when app and celery-worker containers raced during CREATE INDEX. + """ + from sqlalchemy import text - # Wait for the lock to be available (blocking) - try: - lock_file = open(migration_lock_file, 'w') - fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX) # This blocks until lock is available - # Lock acquired means migrations are done - fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) - lock_file.close() + lock_conn = None + try: + # Get a dedicated connection for the advisory lock + lock_conn = db.engine.connect() + + # Try non-blocking lock acquisition + result = lock_conn.execute( + text("SELECT pg_try_advisory_lock(:lock_id)"), + {"lock_id": MIGRATION_ADVISORY_LOCK_ID} + ) + acquired = result.scalar() + + if acquired: + # We are the migration leader + logger.info(f"Acquired PostgreSQL advisory lock in process {os.getpid()}, running migrations") + try: + _run_all_migrations() + except Exception as mig_err: + logger.error(f"Migration error (lock held): {mig_err}") + finally: + lock_conn.execute( + text("SELECT pg_advisory_unlock(:lock_id)"), + {"lock_id": MIGRATION_ADVISORY_LOCK_ID} + ) + logger.info("Released PostgreSQL advisory lock") + else: + # Another process holds the lock - wait for it to finish + logger.info(f"Migrations already running in another process, waiting for completion (process {os.getpid()})...") + lock_conn.execute( + text("SELECT pg_advisory_lock(:lock_id)"), + {"lock_id": MIGRATION_ADVISORY_LOCK_ID} + ) + # Lock acquired means the leader finished; release immediately + lock_conn.execute( + text("SELECT pg_advisory_unlock(:lock_id)"), + {"lock_id": MIGRATION_ADVISORY_LOCK_ID} + ) logger.info(f"Migrations completed by another process, continuing startup in process {os.getpid()}") - except Exception as wait_error: - logger.warning(f"Could not wait for migration lock: {wait_error}") + + except Exception as e: + # Advisory lock failed (e.g., connection error, non-PostgreSQL database) + # Fall back to running migrations uncoordinated - each DDL statement + # already has its own idempotency handling (IF NOT EXISTS, try/except) + logger.warning(f"Could not use advisory lock ({e}), running migrations without coordination") + _run_all_migrations() + + finally: + if lock_conn is not None: + try: + lock_conn.close() + except Exception: + pass def run_auth_migration(): """Run authentication tables migration for v2.4.0""" diff --git a/tests/test_migration_lock.py b/tests/test_migration_lock.py new file mode 100644 index 0000000..94dbc39 --- /dev/null +++ b/tests/test_migration_lock.py @@ -0,0 +1,117 @@ +""" +Tests for PostgreSQL advisory lock migration coordination in app.py +""" + +import os +import pytest +from unittest.mock import patch, MagicMock, call + + +def test_advisory_lock_id_is_stable(): + """MIGRATION_ADVISORY_LOCK_ID must never change -- other running containers + depend on the same value to coordinate.""" + # Set SECRET_KEY so config.py doesn't raise on import + os.environ.setdefault('SECRET_KEY', 'test-secret-key') + from app import MIGRATION_ADVISORY_LOCK_ID + assert MIGRATION_ADVISORY_LOCK_ID == 7283945162 + + +def test_migrate_database_falls_back_on_advisory_lock_failure(app): + """When advisory lock acquisition fails (e.g., SQLite test DB), migrations + still run via the fallback path.""" + with app.app_context(): + with patch('app._run_all_migrations') as mock_migrations: + from app import migrate_database + # SQLite does not support pg_try_advisory_lock, so this should + # hit the except branch and fall back to uncoordinated execution + migrate_database() + mock_migrations.assert_called_once() + + +def test_migrate_database_releases_lock_on_success(app): + """Advisory lock is released after migrations complete successfully.""" + with app.app_context(): + mock_conn = MagicMock() + # pg_try_advisory_lock returns True (we are the leader) + mock_scalar = MagicMock(return_value=True) + mock_result = MagicMock() + mock_result.scalar = mock_scalar + mock_conn.execute.return_value = mock_result + + mock_engine = MagicMock() + mock_engine.connect.return_value = mock_conn + + with patch('app._run_all_migrations') as mock_migrations, \ + patch('app.db') as mock_db: + mock_db.engine = mock_engine + + from app import migrate_database + migrate_database() + + mock_migrations.assert_called_once() + # 2 execute calls: pg_try_advisory_lock, pg_advisory_unlock + assert mock_conn.execute.call_count == 2 + # Verify the SQL text of the unlock call (second execute) + unlock_text_arg = mock_conn.execute.call_args_list[1][0][0] + assert 'pg_advisory_unlock' in unlock_text_arg.text + mock_conn.close.assert_called_once() + + +def test_migrate_database_releases_lock_on_migration_failure(app): + """Advisory lock is released even if migrations raise an exception.""" + with app.app_context(): + mock_conn = MagicMock() + mock_scalar = MagicMock(return_value=True) + mock_result = MagicMock() + mock_result.scalar = mock_scalar + mock_conn.execute.return_value = mock_result + + mock_engine = MagicMock() + mock_engine.connect.return_value = mock_conn + + with patch('app._run_all_migrations', side_effect=RuntimeError("migration boom")) as mock_migrations, \ + patch('app.db') as mock_db: + mock_db.engine = mock_engine + + from app import migrate_database + # Should not propagate -- exception is caught within the leader block + migrate_database() + + mock_migrations.assert_called_once() + # Verify unlock was still called despite the exception + assert mock_conn.execute.call_count == 2 + unlock_text_arg = mock_conn.execute.call_args_list[1][0][0] + assert 'pg_advisory_unlock' in unlock_text_arg.text + mock_conn.close.assert_called_once() + + +def test_migrate_database_waiter_path(app): + """When another process holds the lock, we wait then skip migrations.""" + with app.app_context(): + mock_conn = MagicMock() + + call_count = [0] + def mock_execute(stmt, params=None): + call_count[0] += 1 + result = MagicMock() + if call_count[0] == 1: + # pg_try_advisory_lock returns False (someone else has it) + result.scalar.return_value = False + return result + + mock_conn.execute = mock_execute + + mock_engine = MagicMock() + mock_engine.connect.return_value = mock_conn + + with patch('app._run_all_migrations') as mock_migrations, \ + patch('app.db') as mock_db: + mock_db.engine = mock_engine + + from app import migrate_database + migrate_database() + + # Migrations should NOT have been called (we are the waiter) + mock_migrations.assert_not_called() + # 3 calls: pg_try_advisory_lock (False), pg_advisory_lock (blocking), pg_advisory_unlock + assert call_count[0] == 3 diff --git a/version.py b/version.py index 899d526..84e6947 100644 --- a/version.py +++ b/version.py @@ -4,7 +4,7 @@ # Default version - this is the single source of truth -_DEFAULT_VERSION = '2.5.61' +_DEFAULT_VERSION = '2.5.62' # Allow override via environment variable for CI/CD, but default to the hardcoded version From 858444043a17e0ef723f64bbf095ab93ccecdddf Mon Sep 17 00:00:00 2001 From: ttlequals0 Date: Sat, 14 Feb 2026 23:12:56 -0500 Subject: [PATCH 2/2] Fix CI: widen exception handling for Celery schedule reload calls Schedule CRUD endpoints caught only ImportError from reload_schedules_task.delay(), but when the app module is imported first (resolving a circular import), the Celery task loads successfully and .delay() fails with a Redis ConnectionError instead. Widen to catch Exception so schedule operations succeed regardless of Celery/Redis availability. Also refactor migration lock tests to avoid importing app module directly (prevents Celery initialization side effects). --- pixelprobe/api/admin_routes.py | 14 +++---- tests/test_migration_lock.py | 68 ++++++++++++++++++---------------- 2 files changed, 44 insertions(+), 38 deletions(-) diff --git a/pixelprobe/api/admin_routes.py b/pixelprobe/api/admin_routes.py index 93adbb1..ab854fe 100644 --- a/pixelprobe/api/admin_routes.py +++ b/pixelprobe/api/admin_routes.py @@ -325,9 +325,9 @@ def create_schedule(): try: from pixelprobe.tasks import reload_schedules_task reload_schedules_task.delay() - except ImportError as e: - # May fail in test environment where Celery isn't fully initialized - logger.warning(f"Could not trigger schedule reload (Celery not available): {e}") + except Exception as e: + # May fail if Celery/Redis unavailable (ImportError, ConnectionError, etc.) + logger.warning(f"Could not trigger schedule reload: {e}") return schedule.to_dict(), 201 except Exception as e: @@ -376,8 +376,8 @@ def update_schedule(schedule_id): try: from pixelprobe.tasks import reload_schedules_task reload_schedules_task.delay() - except ImportError as e: - logger.warning(f"Could not trigger schedule reload (Celery not available): {e}") + except Exception as e: + logger.warning(f"Could not trigger schedule reload: {e}") return schedule.to_dict() except Exception as e: @@ -400,8 +400,8 @@ def delete_schedule(schedule_id): try: from pixelprobe.tasks import reload_schedules_task reload_schedules_task.delay() - except ImportError as e: - logger.warning(f"Could not trigger schedule reload (Celery not available): {e}") + except Exception as e: + logger.warning(f"Could not trigger schedule reload: {e}") return '', 204 except Exception as e: diff --git a/tests/test_migration_lock.py b/tests/test_migration_lock.py index 94dbc39..e1df6d8 100644 --- a/tests/test_migration_lock.py +++ b/tests/test_migration_lock.py @@ -1,38 +1,52 @@ """ Tests for PostgreSQL advisory lock migration coordination in app.py + +NOTE: These tests avoid 'from app import ...' because importing the app module +triggers Celery/Redis initialization at module level, which poisons the Redis +connection state for subsequent schedule integration tests. Instead, we read +the constant from source and test migrate_database via its module reference. """ -import os +import re +import sys import pytest -from unittest.mock import patch, MagicMock, call +from unittest.mock import patch, MagicMock +from pathlib import Path + + +def _get_app_module(): + """Get the app module if already imported (by conftest/fixtures), else skip.""" + mod = sys.modules.get('app') + if mod is None: + pytest.skip("app module not imported (test isolation)") + return mod def test_advisory_lock_id_is_stable(): """MIGRATION_ADVISORY_LOCK_ID must never change -- other running containers depend on the same value to coordinate.""" - # Set SECRET_KEY so config.py doesn't raise on import - os.environ.setdefault('SECRET_KEY', 'test-secret-key') - from app import MIGRATION_ADVISORY_LOCK_ID - assert MIGRATION_ADVISORY_LOCK_ID == 7283945162 + app_source = Path(__file__).parent.parent / 'app.py' + content = app_source.read_text() + match = re.search(r'MIGRATION_ADVISORY_LOCK_ID\s*=\s*(\d+)', content) + assert match is not None, "MIGRATION_ADVISORY_LOCK_ID not found in app.py" + assert int(match.group(1)) == 7283945162 def test_migrate_database_falls_back_on_advisory_lock_failure(app): """When advisory lock acquisition fails (e.g., SQLite test DB), migrations still run via the fallback path.""" + app_mod = _get_app_module() with app.app_context(): - with patch('app._run_all_migrations') as mock_migrations: - from app import migrate_database - # SQLite does not support pg_try_advisory_lock, so this should - # hit the except branch and fall back to uncoordinated execution - migrate_database() + with patch.object(app_mod, '_run_all_migrations') as mock_migrations: + app_mod.migrate_database() mock_migrations.assert_called_once() def test_migrate_database_releases_lock_on_success(app): """Advisory lock is released after migrations complete successfully.""" + app_mod = _get_app_module() with app.app_context(): mock_conn = MagicMock() - # pg_try_advisory_lock returns True (we are the leader) mock_scalar = MagicMock(return_value=True) mock_result = MagicMock() mock_result.scalar = mock_scalar @@ -41,17 +55,14 @@ def test_migrate_database_releases_lock_on_success(app): mock_engine = MagicMock() mock_engine.connect.return_value = mock_conn - with patch('app._run_all_migrations') as mock_migrations, \ - patch('app.db') as mock_db: + with patch.object(app_mod, '_run_all_migrations') as mock_migrations, \ + patch.object(app_mod, 'db') as mock_db: mock_db.engine = mock_engine - from app import migrate_database - migrate_database() + app_mod.migrate_database() mock_migrations.assert_called_once() - # 2 execute calls: pg_try_advisory_lock, pg_advisory_unlock assert mock_conn.execute.call_count == 2 - # Verify the SQL text of the unlock call (second execute) unlock_text_arg = mock_conn.execute.call_args_list[1][0][0] assert 'pg_advisory_unlock' in unlock_text_arg.text mock_conn.close.assert_called_once() @@ -59,6 +70,7 @@ def test_migrate_database_releases_lock_on_success(app): def test_migrate_database_releases_lock_on_migration_failure(app): """Advisory lock is released even if migrations raise an exception.""" + app_mod = _get_app_module() with app.app_context(): mock_conn = MagicMock() mock_scalar = MagicMock(return_value=True) @@ -69,16 +81,13 @@ def test_migrate_database_releases_lock_on_migration_failure(app): mock_engine = MagicMock() mock_engine.connect.return_value = mock_conn - with patch('app._run_all_migrations', side_effect=RuntimeError("migration boom")) as mock_migrations, \ - patch('app.db') as mock_db: + with patch.object(app_mod, '_run_all_migrations', side_effect=RuntimeError("migration boom")) as mock_migrations, \ + patch.object(app_mod, 'db') as mock_db: mock_db.engine = mock_engine - from app import migrate_database - # Should not propagate -- exception is caught within the leader block - migrate_database() + app_mod.migrate_database() mock_migrations.assert_called_once() - # Verify unlock was still called despite the exception assert mock_conn.execute.call_count == 2 unlock_text_arg = mock_conn.execute.call_args_list[1][0][0] assert 'pg_advisory_unlock' in unlock_text_arg.text @@ -87,6 +96,7 @@ def test_migrate_database_releases_lock_on_migration_failure(app): def test_migrate_database_waiter_path(app): """When another process holds the lock, we wait then skip migrations.""" + app_mod = _get_app_module() with app.app_context(): mock_conn = MagicMock() @@ -95,7 +105,6 @@ def mock_execute(stmt, params=None): call_count[0] += 1 result = MagicMock() if call_count[0] == 1: - # pg_try_advisory_lock returns False (someone else has it) result.scalar.return_value = False return result @@ -104,14 +113,11 @@ def mock_execute(stmt, params=None): mock_engine = MagicMock() mock_engine.connect.return_value = mock_conn - with patch('app._run_all_migrations') as mock_migrations, \ - patch('app.db') as mock_db: + with patch.object(app_mod, '_run_all_migrations') as mock_migrations, \ + patch.object(app_mod, 'db') as mock_db: mock_db.engine = mock_engine - from app import migrate_database - migrate_database() + app_mod.migrate_database() - # Migrations should NOT have been called (we are the waiter) mock_migrations.assert_not_called() - # 3 calls: pg_try_advisory_lock (False), pg_advisory_lock (blocking), pg_advisory_unlock assert call_count[0] == 3