diff --git a/CHANGELOG.MD b/CHANGELOG.MD
index 84d93b2..1b3ae1b 100644
--- a/CHANGELOG.MD
+++ b/CHANGELOG.MD
@@ -5,6 +5,34 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0).
 
+## [2.5.63] - 2026-02-18
+
+### Code Simplification
+
+- **Deduplicate `convert_to_tz()` in models.py**: Extracted 8 identical inline copies into a single module-level function
+- **Consolidate `rate_limit()` / `exempt_from_rate_limit()`**: Updated canonical `pixelprobe/utils/rate_limiting.py` to match inline implementations and replaced all 4 inline copies in route files
+- **Deduplicate Bearer token extraction in auth.py**: Extracted `_extract_bearer_token()` helper, replaced 4 inline copies, removed `token = token` no-ops (note: non-Bearer auth schemes are now rejected instead of having their payload treated as a raw token)
+- **Unify file extension
lists**: Removed divergent inline sets from `helpers.py`, now imports from `pixelprobe/constants.py` +- **Break up `app.py`**: Extracted 600+ lines into focused modules: + - `pixelprobe/migrations/startup.py` -- all DB migration functions + - `pixelprobe/scheduler_lock.py` -- Redis distributed lock management + - `pixelprobe/startup.py` -- startup cleanup routines + - Reduced `app.py` from 1137 to 541 lines +- **Deduplicate `check_celery_available()`**: Moved to `pixelprobe/utils/celery_utils.py`, replaced copies in `scan_routes.py` and `scan_routes_parallel.py` +- **Deduplicate `ContextTask` in `celery_config.py`**: Extracted `_make_context_task()` factory, replaced 3 identical class definitions +- **Replace deprecated `datetime.utcnow()`**: All occurrences in `models.py` and `security.py` now use `datetime.now(timezone.utc)` +- **Extract `get_configured_scan_paths()`**: Created shared helper in `pixelprobe/utils/helpers.py`, replaced 6+ inline "read from DB, fallback to env var" patterns +- **Use `TERMINAL_SCAN_PHASES` constant**: Replaced 5 hardcoded `['idle', 'completed', 'error', 'crashed', 'cancelled']` lists with the constant from `pixelprobe/constants.py` +- **Consolidate scheduler scan methods**: Extracted `_filter_excluded_paths()` and `_execute_scan_request()` helpers, reducing duplication between `_run_periodic_scan` and `_run_scheduled_scan` +- **Rename ambiguous `validate_file_path`**: Renamed validators.py version to `validate_file_path_format()` to avoid confusion with the security.py version +- **Remove dead `truncate_scan_output()`**: Was a no-op (returned input unchanged), replaced 7 call sites with direct variable usage +- **Merge `load_exclusions_with_patterns()`**: Now reads `exclusions.json` once instead of twice +- **Remove unnecessary `getattr()` calls**: `ScanReport.to_dict()` now accesses `num_workers`, `files_added`, `files_updated` directly +- **Update stale `deep_scan` comment**: Replaced outdated version-specific comment with generic 
backward-compat note +- Files affected: `models.py`, `auth.py`, `app.py`, `scheduler.py`, `celery_config.py`, `media_checker.py`, `pixelprobe/utils/rate_limiting.py`, `pixelprobe/utils/helpers.py`, `pixelprobe/utils/validators.py`, `pixelprobe/utils/celery_utils.py` (new), `pixelprobe/utils/security.py`, `pixelprobe/utils/__init__.py`, `pixelprobe/api/admin_routes.py`, `pixelprobe/api/scan_routes.py`, `pixelprobe/api/scan_routes_parallel.py`, `pixelprobe/api/maintenance_routes.py`, `pixelprobe/api/stats_routes.py`, `pixelprobe/services/stats_service.py`, `pixelprobe/migrations/startup.py` (new), `pixelprobe/scheduler_lock.py` (new), `pixelprobe/startup.py` (new) + +--- + ## [2.5.62] - 2026-02-14 ### Bug Fixes diff --git a/app.py b/app.py index c7374de..dae5013 100644 --- a/app.py +++ b/app.py @@ -454,408 +454,63 @@ def logo(): return send_file(logo_path, mimetype='image/png') return '', 404 -def cleanup_stuck_operations(): - """Clean up any stuck operations from previous runs""" - try: - from models import FileChangesState, CleanupState - - # Mark any active file changes as failed - active_file_changes = FileChangesState.query.filter_by(is_active=True).all() - for file_change in active_file_changes: - file_change.is_active = False - file_change.phase = 'failed' - file_change.end_time = datetime.now(timezone.utc) - file_change.progress_message = 'Application restarted - operation marked as failed' - logger.warning(f"Marking stuck file changes operation {file_change.check_id} as failed") - - # Mark any active cleanup operations as failed - active_cleanups = CleanupState.query.filter_by(is_active=True).all() - for cleanup in active_cleanups: - cleanup.is_active = False - cleanup.phase = 'failed' - cleanup.end_time = datetime.now(timezone.utc) - cleanup.progress_message = 'Application restarted - operation marked as failed' - logger.warning(f"Marking stuck cleanup operation {cleanup.cleanup_id} as failed") - - if active_file_changes or active_cleanups: - 
db.session.commit() - logger.info(f"Cleaned up {len(active_file_changes)} stuck file changes and {len(active_cleanups)} stuck cleanup operations") - - except Exception as e: - logger.error(f"Error cleaning up stuck operations: {str(e)}") - def create_tables(): """Initialize database tables and run migrations""" + from pixelprobe.migrations.startup import migrate_database + from pixelprobe.startup import cleanup_stuck_operations + logger.info(f"Starting PixelProbe v{__version__}") with app.app_context(): try: - # Use inspector to check existing tables from sqlalchemy import inspect, exc, text - + try: inspector = inspect(db.engine) existing_tables = inspector.get_table_names() - - # Only create tables that don't exist + for table_name, table in db.metadata.tables.items(): if table_name not in existing_tables: try: table.create(db.engine) logger.info(f"Created table: {table_name}") except (exc.OperationalError, exc.IntegrityError, exc.ProgrammingError) as e: - # Table might have been created by another worker - suppress common race condition errors err_str = str(e).lower() if any(msg in err_str for msg in ["already exists", "duplicate key", "typname_nsp_index"]): logger.debug(f"Table {table_name} already created by another worker") else: logger.error(f"Error creating table {table_name}: {str(e)}") - + logger.info("Database tables verified successfully") - + # Run migrations for v2.2.68 - add tracking columns if they don't exist if 'scan_state' in existing_tables: try: - # Check if new columns exist columns = [col['name'] for col in inspector.get_columns('scan_state')] - - # Add missing columns with safe migration with db.engine.connect() as conn: - if 'num_workers' not in columns: - try: - conn.execute(text("ALTER TABLE scan_state ADD COLUMN num_workers INTEGER DEFAULT 1")) - conn.commit() - logger.info("Added num_workers column to scan_state table") - except exc.OperationalError as e: - if "already exists" not in str(e).lower(): - logger.warning(f"Could not add 
num_workers column: {e}") - - if 'files_added' not in columns: - try: - conn.execute(text("ALTER TABLE scan_state ADD COLUMN files_added INTEGER DEFAULT 0")) - conn.commit() - logger.info("Added files_added column to scan_state table") - except exc.OperationalError as e: - if "already exists" not in str(e).lower(): - logger.warning(f"Could not add files_added column: {e}") - - if 'files_updated' not in columns: - try: - conn.execute(text("ALTER TABLE scan_state ADD COLUMN files_updated INTEGER DEFAULT 0")) - conn.commit() - logger.info("Added files_updated column to scan_state table") - except exc.OperationalError as e: - if "already exists" not in str(e).lower(): - logger.warning(f"Could not add files_updated column: {e}") - + for col_name in ['num_workers', 'files_added', 'files_updated']: + if col_name not in columns: + default = '1' if col_name == 'num_workers' else '0' + try: + conn.execute(text(f"ALTER TABLE scan_state ADD COLUMN {col_name} INTEGER DEFAULT {default}")) + conn.commit() + logger.info(f"Added {col_name} column to scan_state table") + except exc.OperationalError as e: + if "already exists" not in str(e).lower(): + logger.warning(f"Could not add {col_name} column: {e}") except Exception as e: logger.warning(f"Migration check failed (non-critical): {e}") - + except exc.OperationalError as e: - # This might happen if the database is locked or another worker created tables if "already exists" not in str(e): logger.error(f"Database operation error: {str(e)}") else: logger.info("Tables already exist (created by another worker)") - - migrate_database() - cleanup_stuck_operations() - - except Exception as e: - logger.error(f"Error in database initialization: {str(e)}") - # Don't stop the application for table creation errors - # The tables might already exist and be functional - -MIGRATION_ADVISORY_LOCK_ID = 7283945162 - -def _run_all_migrations(): - """Execute all database migrations. 
Called by migrate_database() after acquiring lock.""" - from tools.app_startup_migration import run_startup_migrations - - # Run startup migrations - logger.info("Running startup migrations...") - try: - run_startup_migrations(db) - logger.info("Startup migrations completed successfully") - except Exception as e: - logger.error(f"Startup migration failed: {e}") - - # Run authentication tables migration for v2.4.0 - logger.info("Checking authentication tables...") - try: - run_auth_migration() - logger.info("Authentication tables verified") - except Exception as e: - logger.error(f"Authentication migration failed: {e}") - - # Run v2.4.35 migration - logger.info("Running v2.4.35 migration...") - try: - run_v2_4_35_migrations() - logger.info("v2.4.35 migration completed successfully") - except Exception as e: - logger.error(f"v2.4.35 migration failed: {e}") - - # Run v2.4.113 migration - logger.info("Running v2.4.113 migration...") - try: - run_v2_4_113_migrations() - logger.info("v2.4.113 migration completed successfully") - except Exception as e: - logger.error(f"v2.4.113 migration failed: {e}") - - # Create performance indexes - logger.info("Creating performance indexes...") - try: - create_performance_indexes() - logger.info("Performance indexes created successfully") - except Exception as e: - logger.error(f"Failed to create performance indexes: {e}") - - logger.info("Database initialization completed") - -def migrate_database(): - """Run database migrations - uses PostgreSQL advisory lock to coordinate across containers. - - Advisory locks work across all connections to the same database, unlike file locks - which are scoped to a single container's filesystem. This prevents the - 'duplicate key value violates unique constraint pg_class_relname_nsp_index' errors - that occurred when app and celery-worker containers raced during CREATE INDEX. 
- """ - from sqlalchemy import text - - lock_conn = None - try: - # Get a dedicated connection for the advisory lock - lock_conn = db.engine.connect() - - # Try non-blocking lock acquisition - result = lock_conn.execute( - text("SELECT pg_try_advisory_lock(:lock_id)"), - {"lock_id": MIGRATION_ADVISORY_LOCK_ID} - ) - acquired = result.scalar() - - if acquired: - # We are the migration leader - logger.info(f"Acquired PostgreSQL advisory lock in process {os.getpid()}, running migrations") - try: - _run_all_migrations() - except Exception as mig_err: - logger.error(f"Migration error (lock held): {mig_err}") - finally: - lock_conn.execute( - text("SELECT pg_advisory_unlock(:lock_id)"), - {"lock_id": MIGRATION_ADVISORY_LOCK_ID} - ) - logger.info("Released PostgreSQL advisory lock") - else: - # Another process holds the lock - wait for it to finish - logger.info(f"Migrations already running in another process, waiting for completion (process {os.getpid()})...") - lock_conn.execute( - text("SELECT pg_advisory_lock(:lock_id)"), - {"lock_id": MIGRATION_ADVISORY_LOCK_ID} - ) - # Lock acquired means the leader finished; release immediately - lock_conn.execute( - text("SELECT pg_advisory_unlock(:lock_id)"), - {"lock_id": MIGRATION_ADVISORY_LOCK_ID} - ) - logger.info(f"Migrations completed by another process, continuing startup in process {os.getpid()}") - - except Exception as e: - # Advisory lock failed (e.g., connection error, non-PostgreSQL database) - # Fall back to running migrations uncoordinated - each DDL statement - # already has its own idempotency handling (IF NOT EXISTS, try/except) - logger.warning(f"Could not use advisory lock ({e}), running migrations without coordination") - _run_all_migrations() - - finally: - if lock_conn is not None: - try: - lock_conn.close() - except Exception: - pass - -def run_auth_migration(): - """Run authentication tables migration for v2.4.0""" - from sqlalchemy import text, inspect - - try: - # Check if tables already exist - 
inspector = inspect(db.engine) - existing_tables = inspector.get_table_names() - - with db.engine.connect() as conn: - # Create users table if it doesn't exist - if 'users' not in existing_tables: - conn.execute(text(""" - CREATE TABLE IF NOT EXISTS users ( - id SERIAL PRIMARY KEY, - username VARCHAR(80) UNIQUE NOT NULL, - email VARCHAR(120) UNIQUE NOT NULL, - password_hash VARCHAR(128) NOT NULL, - is_admin BOOLEAN NOT NULL DEFAULT TRUE, - created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, - last_login TIMESTAMP WITH TIME ZONE, - is_active BOOLEAN NOT NULL DEFAULT TRUE, - first_setup_required BOOLEAN NOT NULL DEFAULT FALSE - ) - """)) - conn.execute(text("CREATE INDEX IF NOT EXISTS idx_users_username ON users(username)")) - conn.execute(text("CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)")) - logger.info("Created users table via migration") - - # Create API tokens table if it doesn't exist - if 'api_tokens' not in existing_tables: - conn.execute(text(""" - CREATE TABLE IF NOT EXISTS api_tokens ( - id SERIAL PRIMARY KEY, - user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, - token VARCHAR(64) UNIQUE NOT NULL, - description VARCHAR(200), - created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, - last_used TIMESTAMP WITH TIME ZONE, - expires_at TIMESTAMP WITH TIME ZONE, - is_active BOOLEAN NOT NULL DEFAULT TRUE - ) - """)) - conn.execute(text("CREATE INDEX IF NOT EXISTS idx_api_tokens_token ON api_tokens(token)")) - conn.execute(text("CREATE INDEX IF NOT EXISTS idx_api_tokens_user_id ON api_tokens(user_id)")) - logger.info("Created api_tokens table via migration") - - # No longer create default admin user automatically - # Users must use the /api/auth/setup endpoint on first run - logger.info("Authentication tables migration completed") - - conn.commit() - - except Exception as e: - # Don't fail startup if migration issues - logger.warning(f"Authentication migration encountered issues: {e}") -def 
run_v2_4_35_migrations(): - """Run migrations for v2.4.35 - add last_heartbeat column to file_changes_state""" - from sqlalchemy import text + migrate_database(db) + cleanup_stuck_operations(db) - try: - with db.engine.connect() as conn: - # Check if file_changes_state table exists first - table_check = conn.execute(text(""" - SELECT table_name - FROM information_schema.tables - WHERE table_name = 'file_changes_state' - """)) - - if not table_check.fetchone(): - logger.debug("file_changes_state table does not exist - skipping migration (new installation)") - return - - # Check if last_heartbeat column exists in file_changes_state - result = conn.execute(text(""" - SELECT column_name - FROM information_schema.columns - WHERE table_name = 'file_changes_state' - AND column_name = 'last_heartbeat' - """)) - - if not result.fetchone(): - logger.info("Applying migration: Adding last_heartbeat column to file_changes_state table") - conn.execute(text(""" - ALTER TABLE file_changes_state - ADD COLUMN last_heartbeat TIMESTAMP WITH TIME ZONE - """)) - - conn.commit() - logger.info("Migration completed: last_heartbeat column added successfully") - else: - logger.debug("Migration already applied: last_heartbeat column exists") - - except Exception as e: - logger.error(f"Migration v2.4.35 failed: {e}") - # Don't fail startup - app might still work without this column - -def run_v2_4_113_migrations(): - """Run migrations for v2.4.113 - add last_integrity_check_date column to scan_results""" - from sqlalchemy import text - - try: - with db.engine.connect() as conn: - # Check if scan_results table exists first - table_check = conn.execute(text(""" - SELECT table_name - FROM information_schema.tables - WHERE table_name = 'scan_results' - """)) - - if not table_check.fetchone(): - logger.debug("scan_results table does not exist - skipping migration (new installation)") - return - - # Check if last_integrity_check_date column exists in scan_results - result = conn.execute(text(""" - 
SELECT column_name - FROM information_schema.columns - WHERE table_name = 'scan_results' - AND column_name = 'last_integrity_check_date' - """)) - - if not result.fetchone(): - logger.info("Applying migration: Adding last_integrity_check_date column to scan_results table") - conn.execute(text(""" - ALTER TABLE scan_results - ADD COLUMN last_integrity_check_date TIMESTAMP - """)) - - # Create index for better query performance - conn.execute(text(""" - CREATE INDEX IF NOT EXISTS idx_scan_results_last_integrity_check - ON scan_results(last_integrity_check_date) - """)) - - conn.commit() - logger.info("Migration completed: last_integrity_check_date column and index added successfully") - else: - logger.debug("Migration already applied: last_integrity_check_date column exists") - - except Exception as e: - logger.error(f"Migration v2.4.113 failed: {e}") - # Don't fail startup - app might still work without this column - -def create_performance_indexes(): - """Create performance indexes""" - from sqlalchemy import text - - indexes = [ - "CREATE INDEX IF NOT EXISTS idx_scan_status ON scan_results(scan_status)", - "CREATE INDEX IF NOT EXISTS idx_scan_date ON scan_results(scan_date)", - "CREATE INDEX IF NOT EXISTS idx_is_corrupted ON scan_results(is_corrupted)", - "CREATE INDEX IF NOT EXISTS idx_marked_as_good ON scan_results(marked_as_good)", - "CREATE INDEX IF NOT EXISTS idx_discovered_date ON scan_results(discovered_date)", - "CREATE INDEX IF NOT EXISTS idx_file_hash ON scan_results(file_hash)", - "CREATE INDEX IF NOT EXISTS idx_last_modified ON scan_results(last_modified)", - "CREATE INDEX IF NOT EXISTS idx_file_path ON scan_results(file_path)", - "CREATE INDEX IF NOT EXISTS idx_status_date ON scan_results(scan_status, scan_date)", - "CREATE INDEX IF NOT EXISTS idx_corrupted_good ON scan_results(is_corrupted, marked_as_good)", - "CREATE INDEX IF NOT EXISTS idx_file_path_status ON scan_results(file_path, scan_status)" - ] - - logger.info("Creating performance 
indexes...") - created_count = 0 - for index_sql in indexes: - try: - # Use separate transaction for each index - with db.engine.begin() as conn: - conn.execute(text(index_sql)) - created_count += 1 except Exception as e: - # Index might already exist or column might not exist - if 'already exists' not in str(e).lower() and 'does not exist' not in str(e).lower(): - logger.debug(f"Could not create index: {e}") - - if created_count > 0: - logger.info(f"Created {created_count} performance indexes") - else: - logger.debug("All performance indexes already exist") + logger.error(f"Error in database initialization: {str(e)}") # Initialize on startup for better Docker compatibility with app.app_context(): @@ -869,265 +524,14 @@ def create_performance_indexes(): # This allows celery-worker to read paths from DB instead of needing env var sync_scan_paths_to_db() - # Use Redis-based distributed lock for cross-container scheduler coordination - # File locks don't work across containers (each has separate /tmp filesystem) - from pixelprobe.progress_utils import get_redis_client - from datetime import datetime, timezone - import socket - - redis_client = get_redis_client() - scheduler_initialized = [False] # Use list for mutable reference in nested functions - - # Get container hostname for lock ownership detection - # When container restarts with same hostname, we can detect stale self-locks - container_hostname = socket.gethostname() - - # Helper functions for scheduler lock management - def parse_scheduler_lock(lock_value: str) -> tuple: - """Parse lock value into (hostname, pid, timestamp_str). 
- - Lock formats: - - New: "hostname:pid:timestamp" (e.g., "pixelprobe-app:123:2026-01-01T00:00:00+00:00") - - Old: "pid:timestamp" (e.g., "123:2026-01-01T00:00:00+00:00") - """ - parts = lock_value.split(':') - # Detect format: new has 3+ parts where first is not a digit (hostname) - if len(parts) >= 3 and not parts[0].isdigit(): - # New format: hostname:pid:timestamp - return parts[0], parts[1], ':'.join(parts[2:]) - # Old format: pid:timestamp - no hostname - return None, parts[0], ':'.join(parts[1:]) - - def should_force_acquire_lock(lock_hostname, lock_pid, lock_age, - my_hostname, my_pid, staleness_threshold=65) -> tuple: - """Determine if we should force-acquire an existing lock. - - Returns (should_acquire: bool, reason: str). - - Decision matrix: - - Same hostname AND same PID: self-lock (refresh/re-acquire) - - Same hostname, different PID: sibling worker (only acquire if stale) - - Different hostname: remote container (only acquire if stale) - """ - if lock_hostname == my_hostname and lock_pid == my_pid: - return True, "self-lock" - if lock_hostname == my_hostname: - # Sibling worker in same container - don't steal unless stale - if lock_age > staleness_threshold: - return True, "stale-sibling" - return False, "active-sibling" - # Different hostname - remote container - if lock_age > staleness_threshold: - return True, "stale-remote" - return False, "active-remote" - - def start_scheduler_heartbeat(lock_key, redis_client, container_hostname): - """Start a daemon thread that refreshes the scheduler lock every 30 seconds.""" - def heartbeat_loop(): - import time - while True: - try: - time.sleep(30) - refresh_value = f"{container_hostname}:{os.getpid()}:{datetime.now(timezone.utc).isoformat()}" - redis_client.set(lock_key, refresh_value, ex=60) - logger.debug(f"Refreshed scheduler lock in process {os.getpid()}") - except Exception as e: - logger.warning(f"Failed to refresh scheduler lock: {e}") - break # Stop refreshing if we can't connect to Redis - - 
import threading - heartbeat_thread = threading.Thread(target=heartbeat_loop, daemon=True) - heartbeat_thread.start() - logger.info(f"Started scheduler lock heartbeat thread in process {os.getpid()}") - - # Use Redis-based distributed lock for scheduler coordination - # Any process can attempt to acquire the lock - first one wins - # This allows gunicorn workers to run the scheduler (APScheduler works with gunicorn) - if redis_client: - try: - # Use Redis SETNX for atomic lock acquisition (60-second expiry for auto-recovery) - # Short TTL ensures stale locks from crashed containers expire quickly - lock_key = 'pixelprobe:scheduler:lock' - # Lock format: hostname:pid:timestamp - allows detection of stale self-locks after container restart - lock_value = f"{container_hostname}:{os.getpid()}:{datetime.now(timezone.utc).isoformat()}" - - # NOTE: We do NOT delete existing locks on startup - the 60-second TTL handles stale locks - # from crashed containers. Deleting unconditionally causes race conditions when multiple - # containers start simultaneously (e.g., during deployment) as they delete each other's locks. 
- - # Try to set the lock (only succeeds if key doesn't exist) - acquired = redis_client.set(lock_key, lock_value, nx=True, ex=60) - - if acquired: - logger.info(f"Acquired Redis scheduler lock in process {os.getpid()}, initializing scheduler") - scheduler.init_app(app) - scheduler_initialized[0] = True - app.scheduler_redis_lock_key = lock_key - start_scheduler_heartbeat(lock_key, redis_client, container_hostname) - else: - # Check who has the lock and whether it's stale or from same process - existing = redis_client.get(lock_key) - if existing: - existing = existing.decode('utf-8') if isinstance(existing, bytes) else existing - try: - # Parse lock to extract hostname, pid, and timestamp - lock_hostname, lock_pid, lock_timestamp_str = parse_scheduler_lock(existing) - lock_timestamp = datetime.fromisoformat(lock_timestamp_str) - lock_age = (datetime.now(timezone.utc) - lock_timestamp).total_seconds() - current_pid = str(os.getpid()) - - # Determine if we should acquire the lock - should_acquire, reason = should_force_acquire_lock( - lock_hostname, lock_pid, lock_age, - container_hostname, current_pid - ) - - if should_acquire: - logger.warning(f"Acquiring scheduler lock (reason={reason}, age={lock_age:.0f}s, holder={existing})") - redis_client.set(lock_key, lock_value, ex=60) - logger.info(f"Acquired scheduler lock in process {os.getpid()}, initializing scheduler") - scheduler.init_app(app) - scheduler_initialized[0] = True - app.scheduler_redis_lock_key = lock_key - start_scheduler_heartbeat(lock_key, redis_client, container_hostname) - else: - logger.info(f"Scheduler lock held by sibling/remote (reason={reason}, holder={existing}, age={lock_age:.0f}s), skipping in process {os.getpid()}") - - # Start background retry thread for stale lock recovery - def retry_scheduler_lock(): - import time - retry_count = 0 - max_retries = 10 # Try for ~5 minutes - - while not scheduler_initialized[0] and retry_count < max_retries: - time.sleep(30) - retry_count += 1 - - try: - 
current_lock = redis_client.get(lock_key) - if not current_lock: - # Lock expired, try to acquire - new_value = f"{container_hostname}:{os.getpid()}:{datetime.now(timezone.utc).isoformat()}" - retry_acquired = redis_client.set(lock_key, new_value, nx=True, ex=60) - if retry_acquired: - logger.info(f"Retry #{retry_count}: Acquired scheduler lock, initializing scheduler") - with app.app_context(): - scheduler.init_app(app) - scheduler_initialized[0] = True - app.scheduler_redis_lock_key = lock_key - start_scheduler_heartbeat(lock_key, redis_client, container_hostname) - break - else: - # Check if lock is now stale - lock_str = current_lock.decode('utf-8') if isinstance(current_lock, bytes) else current_lock - retry_hostname, retry_pid, retry_ts_str = parse_scheduler_lock(lock_str) - lock_ts = datetime.fromisoformat(retry_ts_str) - current_age = (datetime.now(timezone.utc) - lock_ts).total_seconds() - retry_current_pid = str(os.getpid()) - - retry_should_acquire, retry_reason = should_force_acquire_lock( - retry_hostname, retry_pid, current_age, - container_hostname, retry_current_pid - ) - - if retry_should_acquire: - new_value = f"{container_hostname}:{os.getpid()}:{datetime.now(timezone.utc).isoformat()}" - redis_client.set(lock_key, new_value, ex=60) - logger.info(f"Retry #{retry_count}: Acquired lock (reason={retry_reason}, age={current_age:.0f}s), initializing scheduler") - with app.app_context(): - scheduler.init_app(app) - scheduler_initialized[0] = True - app.scheduler_redis_lock_key = lock_key - start_scheduler_heartbeat(lock_key, redis_client, container_hostname) - break - else: - logger.debug(f"Retry #{retry_count}: Lock still held (reason={retry_reason}, age={current_age:.0f}s)") - except Exception as retry_err: - logger.warning(f"Retry #{retry_count} failed: {retry_err}") - - if not scheduler_initialized[0]: - logger.warning("Scheduler lock retry exhausted - another process must have it") - - import threading - retry_thread = 
threading.Thread(target=retry_scheduler_lock, daemon=True) - retry_thread.start() - logger.info(f"Started scheduler lock retry thread in process {os.getpid()}") - - except Exception as parse_err: - # Couldn't parse timestamp, just log and skip - logger.info(f"Scheduler already running (lock held by: {existing}), skipping in process {os.getpid()}") - - except Exception as e: - logger.warning(f"Redis lock failed ({e}), falling back to file lock") - redis_client = None - - # Fallback to file lock if Redis unavailable (for local development without Redis) - if not redis_client and not scheduler_initialized[0]: - import fcntl - scheduler_lock_file = '/tmp/pixelprobe_scheduler.lock' - - try: - lock_file = open(scheduler_lock_file, 'w') - fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) - - logger.info(f"Acquired file scheduler lock in process {os.getpid()}, initializing scheduler (Redis unavailable)") - scheduler.init_app(app) - app.scheduler_lock_file = lock_file - - except (IOError, OSError) as e: - logger.info(f"Scheduler already running in another process, skipping initialization in process {os.getpid()}") - + # Initialize scheduler with distributed lock coordination + from pixelprobe.scheduler_lock import initialize_scheduler_with_lock + initialize_scheduler_with_lock(app, scheduler) - # Clean up ALL active scans from previous runs - they can't still be running after restart - # NOTE: This query happens AFTER migration, so celery_task_id column exists - try: - from datetime import datetime, timezone, timedelta - from models import ScanState - stuck_scans = ScanState.query.filter( - ScanState.is_active == True - ).all() - - for scan in stuck_scans: - logger.warning(f"Found active scan {scan.id} from {scan.start_time}, marking as crashed (app restarted)") - scan.is_active = False - scan.phase = 'crashed' - scan.error_message = "Application restarted - scan was interrupted" - - if stuck_scans: - db.session.commit() - logger.info(f"Cleaned up 
{len(stuck_scans)} abandoned scans from previous run") - except Exception as e: - # If the query fails (e.g., column doesn't exist yet), log but don't crash - logger.warning(f"Could not clean up stuck scans on startup: {e}") - # This is not critical for app startup, so we continue - - # Clean up bloated scan results from pre-v2.4.213 (when warning_details stored thousands of lines) - # Files with large scan_output or warning_details will be marked for rescan - try: - from models import ScanResult - # Find records with large text fields (>50KB indicates old bloated format) - # Using SQL length() function for efficiency - bloated_results = db.session.query(ScanResult).filter( - db.or_( - db.func.length(ScanResult.scan_output) > 50000, - db.func.length(ScanResult.warning_details) > 50000 - ) - ).all() - - if bloated_results: - logger.info(f"Found {len(bloated_results)} scan results with bloated output fields (pre-v2.4.213 format)") - logger.info("Deleting bloated records to trigger efficient rescan with v2.4.213+ format") - - for result in bloated_results: - db.session.delete(result) - - db.session.commit() - logger.info(f"Deleted {len(bloated_results)} bloated scan results - they will be rescanned with efficient storage") - else: - logger.debug("No bloated scan results found - database is clean") - except Exception as e: - logger.warning(f"Could not clean up bloated scan results on startup: {e}") - # Not critical for app startup + # Clean up stale state from previous runs + from pixelprobe.startup import cleanup_stuck_scans, cleanup_bloated_scan_results + cleanup_stuck_scans(db) + cleanup_bloated_scan_results(db) if __name__ == '__main__': # Start the application (initialization already done above) diff --git a/auth.py b/auth.py index f3f1be1..7aa6220 100644 --- a/auth.py +++ b/auth.py @@ -16,6 +16,30 @@ login_manager = LoginManager() + +def _extract_bearer_token(req): + """Extract a Bearer token from the Authorization header. 
+ + Supports both 'Bearer ' and raw token formats (for Swagger UI). + Returns None if no valid token is found. + """ + auth_header = req.headers.get('Authorization') + if not auth_header: + return None + + if ' ' in auth_header: + try: + scheme, token = auth_header.split(' ', 1) + if scheme.lower() == 'bearer': + return token + except ValueError: + pass + return None + else: + # No space means it's just the token (from Swagger UI) + return auth_header + + def init_auth(app): """Initialize authentication for the Flask app""" login_manager.init_app(app) @@ -69,31 +93,15 @@ def load_user(user_id): @login_manager.request_loader def load_user_from_request(request): # Check for API token in Authorization header - auth_header = request.headers.get('Authorization') - if auth_header: - try: - token = None - # Check if it has Bearer prefix - if ' ' in auth_header: - scheme, token = auth_header.split(' ', 1) - if scheme.lower() == 'bearer': - token = token - else: - # No space means it's just the token (from Swagger UI) - token = auth_header - - if token: - api_token = APIToken.query.filter_by(token=token, is_active=True).first() - if api_token and api_token.is_valid(): - api_token.update_last_used() - return api_token.user - except Exception as e: - logger.error(f"Error loading user from API token: {e}") - - # SECURITY: API tokens in query parameters removed per P0 security audit - # Tokens in URLs are logged and exposed in browser history - # API tokens MUST be sent via Authorization header only - # If you need to authenticate, use: Authorization: Bearer + try: + token = _extract_bearer_token(request) + if token: + api_token = APIToken.query.filter_by(token=token, is_active=True).first() + if api_token and api_token.is_valid(): + api_token.update_last_used() + return api_token.user + except Exception as e: + logger.error(f"Error loading user from API token: {e}") return None @@ -114,31 +122,11 @@ def decorated_function(*args, **kwargs): return f(*args, **kwargs) # Check for 
API token - auth_header = request.headers.get('Authorization') - token = None - - if auth_header: - # Check if it has Bearer prefix - if ' ' in auth_header: - try: - scheme, token = auth_header.split(' ', 1) - if scheme.lower() != 'bearer': - token = None - except ValueError: - pass - else: - # No space means it's just the token (from Swagger UI) - token = auth_header - - # SECURITY: API tokens in query parameters removed per P0 security audit - # Tokens in URLs are logged and exposed in browser history - # API tokens MUST be sent via Authorization header only - + token = _extract_bearer_token(request) if token: api_token = APIToken.query.filter_by(token=token, is_active=True).first() if api_token and api_token.is_valid(): api_token.update_last_used() - # Temporarily set current_user for this request request.current_user = api_token.user return f(*args, **kwargs) @@ -163,31 +151,11 @@ def check_auth(): return True # Check for API token - auth_header = request.headers.get('Authorization') - token = None - - if auth_header: - # Check if it has Bearer prefix - if ' ' in auth_header: - try: - scheme, token = auth_header.split(' ', 1) - if scheme.lower() != 'bearer': - token = None - except ValueError: - pass - else: - # No space means it's just the token (from Swagger UI) - token = auth_header - - # SECURITY: API tokens in query parameters removed per P0 security audit - # Tokens in URLs are logged and exposed in browser history - # API tokens MUST be sent via Authorization header only - + token = _extract_bearer_token(request) if token: api_token = APIToken.query.filter_by(token=token, is_active=True).first() if api_token and api_token.is_valid(): api_token.update_last_used() - # Store user in request context request.current_user = api_token.user return True @@ -258,30 +226,10 @@ def get_authenticated_user(request): return current_user # Check for API token in header - auth_header = request.headers.get('Authorization') - if auth_header: - token = None - # Check if it 
has Bearer prefix - if ' ' in auth_header: - try: - scheme, token = auth_header.split(' ', 1) - if scheme.lower() == 'bearer': - token = token - else: - token = None - except ValueError: - pass - else: - # No space means it's just the token (from Swagger UI) - token = auth_header - - if token: - api_token = APIToken.query.filter_by(token=token, is_active=True).first() - if api_token and api_token.is_valid(): - return api_token.user - - # SECURITY: API tokens in query parameters removed per P0 security audit - # Tokens in URLs are logged and exposed in browser history - # API tokens MUST be sent via Authorization header only + token = _extract_bearer_token(request) + if token: + api_token = APIToken.query.filter_by(token=token, is_active=True).first() + if api_token and api_token.is_valid(): + return api_token.user return None \ No newline at end of file diff --git a/celery_config.py b/celery_config.py index 1326cc5..c5b30e6 100644 --- a/celery_config.py +++ b/celery_config.py @@ -6,6 +6,15 @@ import os +def _make_context_task(celery_instance, flask_app): + """Create a ContextTask class that runs Celery tasks inside a Flask app context.""" + class ContextTask(celery_instance.Task): + def __call__(self, *args, **kwargs): + with flask_app.app_context(): + return self.run(*args, **kwargs) + celery_instance.Task = ContextTask + + def create_celery(app=None): """ Create and configure Celery instance for PixelProbe @@ -116,15 +125,7 @@ def create_celery(app=None): # Flask app context integration if app: - class ContextTask(celery.Task): - """Make celery tasks work with Flask app context""" - def __call__(self, *args, **kwargs): - with app.app_context(): - return self.run(*args, **kwargs) - - celery.Task = ContextTask - - # Update configuration from Flask app + _make_context_task(celery, app) celery.conf.update(app.config) return celery @@ -138,14 +139,7 @@ def _create_worker_celery(): # Import Flask app and set up context task for workers from app import app - - class 
ContextTask(celery.Task): - """Make celery tasks work with Flask app context""" - def __call__(self, *args, **kwargs): - with app.app_context(): - return self.run(*args, **kwargs) - - celery.Task = ContextTask + _make_context_task(celery, app) return celery celery_app = _create_worker_celery() @@ -154,12 +148,5 @@ def __call__(self, *args, **kwargs): def init_celery(app, celery): """Initialize Celery with Flask app""" celery.conf.update(app.config) - - class ContextTask(celery.Task): - """Make celery tasks work with Flask app context""" - def __call__(self, *args, **kwargs): - with app.app_context(): - return self.run(*args, **kwargs) - - celery.Task = ContextTask + _make_context_task(celery, app) return celery \ No newline at end of file diff --git a/media_checker.py b/media_checker.py index da1a912..5252f7a 100644 --- a/media_checker.py +++ b/media_checker.py @@ -68,40 +68,30 @@ def load_exclusions(): return default_excluded_paths, default_excluded_extensions def load_exclusions_with_patterns(): - """Load exclusion patterns including filename patterns - + """Load exclusion patterns including filename patterns. + + Reads exclusions.json once and returns all exclusion data. 
+ Returns: tuple: (excluded_paths, excluded_extensions, excluded_patterns) """ - paths, extensions = load_exclusions() - - # Get default patterns default_patterns = get_default_filename_patterns() - - # Load user patterns from file if exists - user_patterns = [] + try: exclusions_file = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'exclusions.json') if os.path.exists(exclusions_file): with open(exclusions_file, 'r') as f: data = json.load(f) + paths = list(set(data.get('paths', []))) + extensions = list(set(data.get('extensions', []))) user_patterns = data.get('filename_patterns', []) + excluded_patterns = list(set(default_patterns + user_patterns)) + return paths, extensions, excluded_patterns except Exception as e: - logger.error(f"Error loading filename patterns: {e}") - - # Combine patterns - excluded_patterns = list(set(default_patterns + user_patterns)) - - return paths, extensions, excluded_patterns + logger.error(f"Error loading exclusions.json: {e}") -def truncate_scan_output(output_lines, max_lines=None, max_chars=None): - """Return scan output without truncation - users need full output for debugging""" - if not output_lines: - return [] + return [], [], default_patterns - # No truncation - return full output - # Users need complete ffmpeg output to diagnose issues - return output_lines class PixelProbe: def __init__(self, max_workers=None, excluded_paths=None, excluded_extensions=None, database_path=None, excluded_patterns=None): @@ -1140,7 +1130,7 @@ def _check_image_corruption(self, file_path): # Clear corruption details since we're treating it as a warning corruption_details = [] - return is_corrupted, corruption_details, scan_tool, truncate_scan_output(scan_output), warning_details + return is_corrupted, corruption_details, scan_tool, scan_output, warning_details def _check_video_corruption(self, file_path): corruption_details = [] @@ -1171,7 +1161,7 @@ def _check_video_corruption(self, file_path): is_corrupted = True 
scan_output.append("FFmpeg probe: No streams found") logger.warning(f"No streams found in {file_path}") - return is_corrupted, corruption_details, scan_tool, truncate_scan_output(scan_output), warning_details + return is_corrupted, corruption_details, scan_tool, scan_output, warning_details video_stream = next((s for s in probe['streams'] if s['codec_type'] == 'video'), None) if not video_stream: @@ -1411,7 +1401,7 @@ def _check_video_corruption(self, file_path): scan_output.extend(hevc_output) # Return warning details as well - return is_corrupted, corruption_details, scan_tool, truncate_scan_output(scan_output), warning_details + return is_corrupted, corruption_details, scan_tool, scan_output, warning_details def _check_audio_corruption(self, file_path): """Check audio files for corruption using FFmpeg and format-specific tools""" @@ -1433,7 +1423,7 @@ def _check_audio_corruption(self, file_path): is_corrupted = True scan_output.append("FFmpeg probe: No streams found") logger.warning(f"No streams found in {file_path}") - return is_corrupted, corruption_details, scan_tool, truncate_scan_output(scan_output), warning_details + return is_corrupted, corruption_details, scan_tool, scan_output, warning_details audio_stream = next((s for s in probe['streams'] if s['codec_type'] == 'audio'), None) if not audio_stream: @@ -1441,7 +1431,7 @@ def _check_audio_corruption(self, file_path): is_corrupted = True scan_output.append("FFmpeg probe: No audio stream") logger.warning(f"No audio stream found in {file_path}") - return is_corrupted, corruption_details, scan_tool, truncate_scan_output(scan_output), warning_details + return is_corrupted, corruption_details, scan_tool, scan_output, warning_details # Check audio stream properties codec_name = audio_stream.get('codec_name', 'unknown') @@ -1472,7 +1462,7 @@ def _check_audio_corruption(self, file_path): scan_tool = "ffmpeg" scan_output.append(f"FFprobe: FAILED - {stderr[:200]}") logger.error(f"FFprobe error on audio {file_path}: 
{stderr[:200]}") - return is_corrupted, corruption_details, scan_tool, truncate_scan_output(scan_output), warning_details + return is_corrupted, corruption_details, scan_tool, scan_output, warning_details # Step 2: Attempt to decode audio to check for corruption logger.info(f"Performing comprehensive audio validation for: {file_path}") @@ -1589,7 +1579,7 @@ def _check_audio_corruption(self, file_path): except Exception as e: logger.debug(f"FLAC test error: {str(e)}") - return is_corrupted, corruption_details, scan_tool, truncate_scan_output(scan_output), warning_details + return is_corrupted, corruption_details, scan_tool, scan_output, warning_details def _check_hevc_main10_issues(self, file_path): """Check for HEVC Main 10 specific issues that cause green tint/freezing""" diff --git a/models.py b/models.py index 8f400c5..66cf4d5 100644 --- a/models.py +++ b/models.py @@ -13,10 +13,17 @@ db = SQLAlchemy() -# Timezone handling moved to pixelprobe.utils.timezone -# Import shared utilities after models are loaded -# This will be imported in app.py to avoid circular imports +def convert_to_tz(dt): + """Return datetime as ISO string for display. + If datetime is naive, assume it's UTC. + Timezone conversion is handled in API routes. 
+ """ + if dt is None: + return None + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.isoformat() class ScanResult(db.Model): __tablename__ = 'scan_results' @@ -52,8 +59,7 @@ class ScanResult(db.Model): media_info = db.Column(db.Text, nullable=True) # JSON string of media metadata file_exists = db.Column(db.Boolean, nullable=False, default=True, index=True) # Whether file exists on disk - # Temporary: Keep deep_scan column until migration is run (will be removed in v2.2.90) - # This prevents insert failures in production environments that haven't run the migration yet + # Legacy column kept for backward compatibility with older database schemas deep_scan = db.Column(db.Boolean, nullable=True, default=False, server_default='false') # Output rotation tracking @@ -68,16 +74,6 @@ class ScanResult(db.Model): ) def to_dict(self): - def convert_to_tz(dt): - """Return datetime as ISO string for display""" - if dt is None: - return None - # If datetime is naive, assume it's UTC - if dt.tzinfo is None: - dt = dt.replace(tzinfo=timezone.utc) - # Return as ISO string - timezone conversion handled in API routes - return dt.isoformat() - return { 'id': self.id, 'file_path': self.file_path, @@ -151,20 +147,10 @@ class IgnoredErrorPattern(db.Model): pattern = db.Column(db.String(200), nullable=False, unique=True) description = db.Column(db.String(500)) created_at = db.Column(db.DateTime(timezone=True), nullable=False, default=lambda: datetime.now(timezone.utc)) - created_date = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) # Keep for backward compatibility + created_date = db.Column(db.DateTime, nullable=False, default=lambda: datetime.now(timezone.utc)) # Keep for backward compatibility is_active = db.Column(db.Boolean, nullable=False, default=True) def to_dict(self): - def convert_to_tz(dt): - """Return datetime as ISO string for display""" - if dt is None: - return None - # If datetime is naive, assume it's UTC - if dt.tzinfo is None: 
- dt = dt.replace(tzinfo=timezone.utc) - # Return as ISO string - timezone conversion handled in API routes - return dt.isoformat() - return { 'id': self.id, 'pattern': self.pattern, @@ -188,16 +174,6 @@ class Exclusion(db.Model): ) def to_dict(self): - def convert_to_tz(dt): - """Return datetime as ISO string for display""" - if dt is None: - return None - # If datetime is naive, assume it's UTC - if dt.tzinfo is None: - dt = dt.replace(tzinfo=timezone.utc) - # Return as ISO string - timezone conversion handled in API routes - return dt.isoformat() - return { 'id': self.id, 'type': self.exclusion_type, @@ -220,19 +196,9 @@ class ScanSchedule(db.Model): last_run = db.Column(db.DateTime, nullable=True) next_run = db.Column(db.DateTime, nullable=True) created_at = db.Column(db.DateTime(timezone=True), nullable=False, default=lambda: datetime.now(timezone.utc)) - created_date = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) # Keep for backward compatibility + created_date = db.Column(db.DateTime, nullable=False, default=lambda: datetime.now(timezone.utc)) # Keep for backward compatibility def to_dict(self): - def convert_to_tz(dt): - """Return datetime as ISO string for display""" - if dt is None: - return None - # If datetime is naive, assume it's UTC - if dt.tzinfo is None: - dt = dt.replace(tzinfo=timezone.utc) - # Return as ISO string - timezone conversion handled in API routes - return dt.isoformat() - return { 'id': self.id, 'name': self.name, @@ -269,16 +235,6 @@ class HealthcheckConfig(db.Model): schedule = db.relationship('ScanSchedule', backref=db.backref('healthcheck_config', uselist=False, cascade='all, delete-orphan')) def to_dict(self): - def convert_to_tz(dt): - """Return datetime as ISO string for display""" - if dt is None: - return None - # If datetime is naive, assume it's UTC - if dt.tzinfo is None: - dt = dt.replace(tzinfo=timezone.utc) - # Return as ISO string - timezone conversion handled in API routes - return dt.isoformat() - 
return { 'id': self.id, 'schedule_id': self.schedule_id, @@ -302,7 +258,7 @@ class ScanConfiguration(db.Model): key = db.Column(db.String(50), nullable=True, unique=True) value = db.Column(db.Text, nullable=True) description = db.Column(db.String(200)) - updated_date = db.Column(db.DateTime, nullable=True, default=datetime.utcnow) + updated_date = db.Column(db.DateTime, nullable=True, default=lambda: datetime.now(timezone.utc)) # New structure expected by API and repositories path = db.Column(db.String(500), nullable=True, unique=True) @@ -310,16 +266,6 @@ class ScanConfiguration(db.Model): created_at = db.Column(db.DateTime(timezone=True), nullable=True, default=lambda: datetime.now(timezone.utc)) def to_dict(self): - def convert_to_tz(dt): - """Return datetime as ISO string for display""" - if dt is None: - return None - # If datetime is naive, assume it's UTC - if dt.tzinfo is None: - dt = dt.replace(tzinfo=timezone.utc) - # Return as ISO string - timezone conversion handled in API routes - return dt.isoformat() - # Support both old and new structures if self.path is not None: # New path-based structure @@ -352,7 +298,7 @@ class ScanState(db.Model): files_processed = db.Column(db.Integer, nullable=False, default=0) estimated_total = db.Column(db.Integer, nullable=False, default=0) discovery_count = db.Column(db.Integer, nullable=False, default=0) - start_time = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) + start_time = db.Column(db.DateTime, nullable=False, default=lambda: datetime.now(timezone.utc)) end_time = db.Column(db.DateTime, nullable=True) current_file = db.Column(db.String(500), nullable=True) progress_message = db.Column(db.String(1000), nullable=True) # Increased from 200 @@ -701,16 +647,6 @@ class ScanChunk(db.Model): __table_args__ = (db.UniqueConstraint('scan_id', 'chunk_id', name='uq_scan_chunks_scan_chunk'),) def to_dict(self): - def convert_to_tz(dt): - """Return datetime as ISO string for display""" - if dt is None: - 
return None - # If datetime is naive, assume it's UTC - if dt.tzinfo is None: - dt = dt.replace(tzinfo=timezone.utc) - # Return as ISO string - timezone conversion handled in API routes - return dt.isoformat() - return { 'id': self.id, 'scan_id': self.scan_id, @@ -767,16 +703,6 @@ class ScanReport(db.Model): created_at = db.Column(db.DateTime(timezone=True), nullable=False, default=lambda: datetime.now(timezone.utc)) def to_dict(self): - def convert_to_tz(dt): - """Return datetime as ISO string for display""" - if dt is None: - return None - # If datetime is naive, assume it's UTC - if dt.tzinfo is None: - dt = dt.replace(tzinfo=timezone.utc) - # Return as ISO string - timezone conversion handled in API routes - return dt.isoformat() - return { 'id': self.id, 'report_id': self.report_id, @@ -786,11 +712,11 @@ def convert_to_tz(dt): 'duration_seconds': self.duration_seconds, 'directories_scanned': json.loads(self.directories_scanned) if self.directories_scanned else [], 'force_rescan': self.force_rescan, - 'num_workers': getattr(self, 'num_workers', 1), + 'num_workers': self.num_workers, 'total_files_discovered': self.total_files_discovered, 'files_scanned': self.files_scanned, - 'files_added': getattr(self, 'files_added', 0), - 'files_updated': getattr(self, 'files_updated', 0), + 'files_added': self.files_added, + 'files_updated': self.files_updated, 'files_corrupted': self.files_corrupted, 'files_with_warnings': self.files_with_warnings, 'files_error': self.files_error, diff --git a/pixelprobe/api/admin_routes.py b/pixelprobe/api/admin_routes.py index ab854fe..9391f02 100644 --- a/pixelprobe/api/admin_routes.py +++ b/pixelprobe/api/admin_routes.py @@ -15,27 +15,8 @@ admin_bp = Blueprint('admin', __name__, url_prefix='/api') -# Import limiter from main app from flask import current_app -from functools import wraps - -# Create rate limit decorators that work with Flask-Limiter -def rate_limit(limit_string): - """Decorator to apply rate limits using the app's 
limiter""" - def decorator(f): - @wraps(f) - def wrapped(*args, **kwargs): - # Get the limiter from the current app - limiter = current_app.extensions.get('flask-limiter') - if limiter: - # Apply the rate limit dynamically - limited_func = limiter.limit(limit_string, exempt_when=lambda: False)(f) - return limited_func(*args, **kwargs) - else: - # If no limiter, just call the function - return f(*args, **kwargs) - return wrapped - return decorator +from pixelprobe.utils.rate_limiting import rate_limit # Get scheduler instance (will be initialized in app context) scheduler = None diff --git a/pixelprobe/api/maintenance_routes.py b/pixelprobe/api/maintenance_routes.py index edb4d1f..ae77a53 100644 --- a/pixelprobe/api/maintenance_routes.py +++ b/pixelprobe/api/maintenance_routes.py @@ -25,42 +25,8 @@ maintenance_bp = Blueprint('maintenance', __name__, url_prefix='/api') -# Import limiter from main app from flask import current_app -from functools import wraps - -# Create rate limit decorators that work with Flask-Limiter -def rate_limit(limit_string): - """Decorator to apply rate limits using the app's limiter""" - def decorator(f): - @wraps(f) - def wrapped(*args, **kwargs): - # Get the limiter from the current app - limiter = current_app.extensions.get('flask-limiter') - if limiter: - # Apply the rate limit dynamically - limited_func = limiter.limit(limit_string, exempt_when=lambda: False)(f) - return limited_func(*args, **kwargs) - else: - # If no limiter, just call the function - return f(*args, **kwargs) - return wrapped - return decorator - -def exempt_from_rate_limit(f): - """Decorator to exempt a function from rate limiting""" - @wraps(f) - def wrapped(*args, **kwargs): - # Get the limiter from the current app - limiter = current_app.extensions.get('flask-limiter') - if limiter: - # Apply exemption dynamically - exempt_func = limiter.exempt(f) - return exempt_func(*args, **kwargs) - else: - # If no limiter, just call the function - return f(*args, **kwargs) - 
return wrapped +from pixelprobe.utils.rate_limiting import rate_limit, exempt_from_rate_limit # Global state tracking - will be moved to service layer cleanup_state = { diff --git a/pixelprobe/api/scan_routes.py b/pixelprobe/api/scan_routes.py index ef25ee4..5429689 100644 --- a/pixelprobe/api/scan_routes.py +++ b/pixelprobe/api/scan_routes.py @@ -7,6 +7,7 @@ from media_checker import PixelProbe, load_exclusions from models import db, ScanResult, ScanState +from pixelprobe.constants import TERMINAL_SCAN_PHASES from version import __version__ from auth import auth_required @@ -19,19 +20,7 @@ logger = logging.getLogger(__name__) -def check_celery_available(): - """Check if Celery is available and broker is reachable""" - celery_enabled = current_app.config.get('CELERY_BROKER_URL') and hasattr(current_app, 'celery') - - if celery_enabled: - # Test Celery broker connection before using - try: - current_app.celery.control.ping(timeout=1.0) - except Exception as e: - logger.warning(f"Celery broker connection failed: {e}. 
Falling back to direct scan service.") - celery_enabled = False - - return celery_enabled +from pixelprobe.utils.celery_utils import check_celery_available def safe_check_task_state(celery_task_id, app, max_retries=3, base_delay=0.5): @@ -80,41 +69,7 @@ def safe_check_task_state(celery_task_id, app, max_retries=3, base_delay=0.5): scan_bp = Blueprint('scan', __name__, url_prefix='/api') -# Import limiter from main app -from functools import wraps - -# Create rate limit decorators that work with Flask-Limiter -def rate_limit(limit_string): - """Decorator to apply rate limits using the app's limiter""" - def decorator(f): - @wraps(f) - def wrapped(*args, **kwargs): - # Get the limiter from the current app - limiter = current_app.extensions.get('flask-limiter') - if limiter: - # Apply the rate limit dynamically - limited_func = limiter.limit(limit_string, exempt_when=lambda: False)(f) - return limited_func(*args, **kwargs) - else: - # If no limiter, just call the function - return f(*args, **kwargs) - return wrapped - return decorator - -def exempt_from_rate_limit(f): - """Decorator to exempt a function from rate limiting""" - @wraps(f) - def wrapped(*args, **kwargs): - # Get the limiter from the current app - limiter = current_app.extensions.get('flask-limiter') - if limiter: - # Apply exemption dynamically - exempt_func = limiter.exempt(f) - return exempt_func(*args, **kwargs) - else: - # If no limiter, just call the function - return f(*args, **kwargs) - return wrapped +from pixelprobe.utils.rate_limiting import rate_limit, exempt_from_rate_limit def is_scan_running(): """Check if a scan is currently running (thread or Celery)""" @@ -125,7 +80,7 @@ def is_scan_running(): # Check database for active Celery-based scans try: active_scan = ScanState.query.filter_by(is_active=True).first() - if active_scan and active_scan.phase not in ['idle', 'completed', 'error', 'crashed', 'cancelled']: + if active_scan and active_scan.phase not in TERMINAL_SCAN_PHASES: # Check if 
scan is stale (no progress update) from datetime import datetime, timezone, timedelta @@ -467,7 +422,7 @@ def scan(): scan_state = db.session.query(ScanState).with_for_update(nowait=True).first() # Check if a scan is already running while we hold the lock - if scan_state.is_active and scan_state.phase not in ['idle', 'completed', 'error', 'crashed', 'cancelled']: + if scan_state.is_active and scan_state.phase not in TERMINAL_SCAN_PHASES: phase_info = f" (Phase: {scan_state.phase}, Files processed: {scan_state.files_processed})" db.session.rollback() return { @@ -496,16 +451,9 @@ def scan(): # If no directories provided, use configured ones if not scan_dirs: - from models import ScanConfiguration - configs = ScanConfiguration.query.filter_by(is_active=True).all() - scan_dirs = [config.path for config in configs] - - # If no database config, fall back to environment variable - if not scan_dirs: - scan_paths_env = os.environ.get('SCAN_PATHS', '') - scan_dirs = [path.strip() for path in scan_paths_env.split(',') if path.strip()] - logger.info(f"Using SCAN_PATHS from environment: {scan_dirs}") - + from pixelprobe.utils.helpers import get_configured_scan_paths + scan_dirs = get_configured_scan_paths() + if not scan_dirs: return {'error': 'No directories configured for scanning. 
Set SCAN_PATHS environment variable or configure paths in the admin interface.'}, 400 @@ -1042,16 +990,9 @@ def scan_files_parallel(): # Otherwise scan directories # If no directories provided, use configured ones if not scan_dirs: - from models import ScanConfiguration - configs = ScanConfiguration.query.filter_by(is_active=True).all() - scan_dirs = [config.path for config in configs] - - # If no database config, fall back to environment variable - if not scan_dirs: - scan_paths_env = os.environ.get('SCAN_PATHS', '') - scan_dirs = [path.strip() for path in scan_paths_env.split(',') if path.strip()] - logger.info(f"Using SCAN_PATHS from environment: {scan_dirs}") - + from pixelprobe.utils.helpers import get_configured_scan_paths + scan_dirs = get_configured_scan_paths() + if not scan_dirs: return {'error': 'No directories configured for scanning. Set SCAN_PATHS environment variable or configure paths in the admin interface.'}, 400 diff --git a/pixelprobe/api/scan_routes_parallel.py b/pixelprobe/api/scan_routes_parallel.py index 0343e2b..81766ad 100644 --- a/pixelprobe/api/scan_routes_parallel.py +++ b/pixelprobe/api/scan_routes_parallel.py @@ -19,25 +19,7 @@ parallel_scan_bp = Blueprint('parallel_scan', __name__, url_prefix='/api') -def check_celery_available(): - """Check if Celery is available and configured""" - try: - if not hasattr(current_app, 'celery'): - return False - - # Check if broker URL is configured - broker_url = current_app.config.get('CELERY_BROKER_URL') - if not broker_url: - return False - - # Try to get worker stats - from celery import current_app as celery_app - stats = celery_app.control.inspect().stats() - return stats is not None and len(stats) > 0 - - except Exception as e: - logger.debug(f"Celery not available: {e}") - return False +from pixelprobe.utils.celery_utils import check_celery_available @parallel_scan_bp.route('/scan-parallel', methods=['POST']) diff --git a/pixelprobe/api/stats_routes.py b/pixelprobe/api/stats_routes.py index 
c941fc6..c2d0945 100644 --- a/pixelprobe/api/stats_routes.py +++ b/pixelprobe/api/stats_routes.py @@ -4,7 +4,6 @@ import time import logging from datetime import datetime, timezone -from functools import wraps from models import db, ScanResult from version import __version__ @@ -16,20 +15,7 @@ stats_bp = Blueprint('stats', __name__, url_prefix='/api') -def exempt_from_rate_limit(f): - """Decorator to exempt a function from rate limiting""" - @wraps(f) - def wrapped(*args, **kwargs): - # Get the limiter from the current app - limiter = current_app.extensions.get('flask-limiter') - if limiter: - # Apply exemption dynamically - exempt_func = limiter.exempt(f) - return exempt_func(*args, **kwargs) - else: - # If no limiter, just call the function - return f(*args, **kwargs) - return wrapped +from pixelprobe.utils.rate_limiting import exempt_from_rate_limit @stats_bp.route('/stats') @exempt_from_rate_limit @@ -344,10 +330,9 @@ def get_system_info(): monitored_paths = [] total_filesystem_files = db_total_files # Use DB total since all files are scanned - # Get configured scan paths from environment (no hardcoded defaults) - scan_paths_env = os.environ.get('SCAN_PATHS', '') - scan_paths = [p.strip() for p in scan_paths_env.split(',') if p.strip()] # Remove empty strings - + from pixelprobe.utils.helpers import get_configured_scan_paths + scan_paths = get_configured_scan_paths() + if not scan_paths: # No scan paths configured - use empty path counts path_counts = {} diff --git a/pixelprobe/migrations/__init__.py b/pixelprobe/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pixelprobe/migrations/startup.py b/pixelprobe/migrations/startup.py new file mode 100644 index 0000000..8c01466 --- /dev/null +++ b/pixelprobe/migrations/startup.py @@ -0,0 +1,266 @@ +""" +Database migration functions executed during PixelProbe startup. + +These are run once on application startup to ensure the database schema +is up-to-date. 
Each migration is idempotent (safe to re-run). +""" + +import os +import logging +from sqlalchemy import text, inspect, exc + +logger = logging.getLogger(__name__) + +MIGRATION_ADVISORY_LOCK_ID = 7283945162 + + +def run_auth_migration(db): + """Run authentication tables migration for v2.4.0""" + try: + inspector = inspect(db.engine) + existing_tables = inspector.get_table_names() + + with db.engine.connect() as conn: + if 'users' not in existing_tables: + conn.execute(text(""" + CREATE TABLE IF NOT EXISTS users ( + id SERIAL PRIMARY KEY, + username VARCHAR(80) UNIQUE NOT NULL, + email VARCHAR(120) UNIQUE NOT NULL, + password_hash VARCHAR(128) NOT NULL, + is_admin BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_login TIMESTAMP WITH TIME ZONE, + is_active BOOLEAN NOT NULL DEFAULT TRUE, + first_setup_required BOOLEAN NOT NULL DEFAULT FALSE + ) + """)) + conn.execute(text("CREATE INDEX IF NOT EXISTS idx_users_username ON users(username)")) + conn.execute(text("CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)")) + logger.info("Created users table via migration") + + if 'api_tokens' not in existing_tables: + conn.execute(text(""" + CREATE TABLE IF NOT EXISTS api_tokens ( + id SERIAL PRIMARY KEY, + user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, + token VARCHAR(64) UNIQUE NOT NULL, + description VARCHAR(200), + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_used TIMESTAMP WITH TIME ZONE, + expires_at TIMESTAMP WITH TIME ZONE, + is_active BOOLEAN NOT NULL DEFAULT TRUE + ) + """)) + conn.execute(text("CREATE INDEX IF NOT EXISTS idx_api_tokens_token ON api_tokens(token)")) + conn.execute(text("CREATE INDEX IF NOT EXISTS idx_api_tokens_user_id ON api_tokens(user_id)")) + logger.info("Created api_tokens table via migration") + + logger.info("Authentication tables migration completed") + conn.commit() + + except Exception as e: + 
logger.warning(f"Authentication migration encountered issues: {e}") + + +def run_v2_4_35_migrations(db): + """Run migrations for v2.4.35 - add last_heartbeat column to file_changes_state""" + try: + with db.engine.connect() as conn: + table_check = conn.execute(text(""" + SELECT table_name + FROM information_schema.tables + WHERE table_name = 'file_changes_state' + """)) + + if not table_check.fetchone(): + logger.debug("file_changes_state table does not exist - skipping migration (new installation)") + return + + result = conn.execute(text(""" + SELECT column_name + FROM information_schema.columns + WHERE table_name = 'file_changes_state' + AND column_name = 'last_heartbeat' + """)) + + if not result.fetchone(): + logger.info("Applying migration: Adding last_heartbeat column to file_changes_state table") + conn.execute(text(""" + ALTER TABLE file_changes_state + ADD COLUMN last_heartbeat TIMESTAMP WITH TIME ZONE + """)) + conn.commit() + logger.info("Migration completed: last_heartbeat column added successfully") + else: + logger.debug("Migration already applied: last_heartbeat column exists") + + except Exception as e: + logger.error(f"Migration v2.4.35 failed: {e}") + + +def run_v2_4_113_migrations(db): + """Run migrations for v2.4.113 - add last_integrity_check_date column to scan_results""" + try: + with db.engine.connect() as conn: + table_check = conn.execute(text(""" + SELECT table_name + FROM information_schema.tables + WHERE table_name = 'scan_results' + """)) + + if not table_check.fetchone(): + logger.debug("scan_results table does not exist - skipping migration (new installation)") + return + + result = conn.execute(text(""" + SELECT column_name + FROM information_schema.columns + WHERE table_name = 'scan_results' + AND column_name = 'last_integrity_check_date' + """)) + + if not result.fetchone(): + logger.info("Applying migration: Adding last_integrity_check_date column to scan_results table") + conn.execute(text(""" + ALTER TABLE scan_results + ADD 
COLUMN last_integrity_check_date TIMESTAMP + """)) + conn.execute(text(""" + CREATE INDEX IF NOT EXISTS idx_scan_results_last_integrity_check + ON scan_results(last_integrity_check_date) + """)) + conn.commit() + logger.info("Migration completed: last_integrity_check_date column and index added successfully") + else: + logger.debug("Migration already applied: last_integrity_check_date column exists") + + except Exception as e: + logger.error(f"Migration v2.4.113 failed: {e}") + + +def create_performance_indexes(db): + """Create performance indexes""" + indexes = [ + "CREATE INDEX IF NOT EXISTS idx_scan_status ON scan_results(scan_status)", + "CREATE INDEX IF NOT EXISTS idx_scan_date ON scan_results(scan_date)", + "CREATE INDEX IF NOT EXISTS idx_is_corrupted ON scan_results(is_corrupted)", + "CREATE INDEX IF NOT EXISTS idx_marked_as_good ON scan_results(marked_as_good)", + "CREATE INDEX IF NOT EXISTS idx_discovered_date ON scan_results(discovered_date)", + "CREATE INDEX IF NOT EXISTS idx_file_hash ON scan_results(file_hash)", + "CREATE INDEX IF NOT EXISTS idx_last_modified ON scan_results(last_modified)", + "CREATE INDEX IF NOT EXISTS idx_file_path ON scan_results(file_path)", + "CREATE INDEX IF NOT EXISTS idx_status_date ON scan_results(scan_status, scan_date)", + "CREATE INDEX IF NOT EXISTS idx_corrupted_good ON scan_results(is_corrupted, marked_as_good)", + "CREATE INDEX IF NOT EXISTS idx_file_path_status ON scan_results(file_path, scan_status)" + ] + + logger.info("Creating performance indexes...") + created_count = 0 + for index_sql in indexes: + try: + with db.engine.begin() as conn: + conn.execute(text(index_sql)) + created_count += 1 + except Exception as e: + if 'already exists' not in str(e).lower() and 'does not exist' not in str(e).lower(): + logger.debug(f"Could not create index: {e}") + + if created_count > 0: + logger.info(f"Created {created_count} performance indexes") + else: + logger.debug("All performance indexes already exist") + + +def 
_run_all_migrations(db): + """Execute all database migrations. Called by migrate_database() after acquiring lock.""" + from tools.app_startup_migration import run_startup_migrations + + logger.info("Running startup migrations...") + try: + run_startup_migrations(db) + logger.info("Startup migrations completed successfully") + except Exception as e: + logger.error(f"Startup migration failed: {e}") + + logger.info("Checking authentication tables...") + try: + run_auth_migration(db) + logger.info("Authentication tables verified") + except Exception as e: + logger.error(f"Authentication migration failed: {e}") + + logger.info("Running v2.4.35 migration...") + try: + run_v2_4_35_migrations(db) + logger.info("v2.4.35 migration completed successfully") + except Exception as e: + logger.error(f"v2.4.35 migration failed: {e}") + + logger.info("Running v2.4.113 migration...") + try: + run_v2_4_113_migrations(db) + logger.info("v2.4.113 migration completed successfully") + except Exception as e: + logger.error(f"v2.4.113 migration failed: {e}") + + logger.info("Creating performance indexes...") + try: + create_performance_indexes(db) + logger.info("Performance indexes created successfully") + except Exception as e: + logger.error(f"Failed to create performance indexes: {e}") + + logger.info("Database initialization completed") + + +def migrate_database(db): + """Run database migrations - uses PostgreSQL advisory lock to coordinate across containers. + + Advisory locks work across all connections to the same database, unlike file locks + which are scoped to a single container's filesystem. 
+ """ + lock_conn = None + try: + lock_conn = db.engine.connect() + + result = lock_conn.execute( + text("SELECT pg_try_advisory_lock(:lock_id)"), + {"lock_id": MIGRATION_ADVISORY_LOCK_ID} + ) + acquired = result.scalar() + + if acquired: + logger.info(f"Acquired PostgreSQL advisory lock in process {os.getpid()}, running migrations") + try: + _run_all_migrations(db) + except Exception as mig_err: + logger.error(f"Migration error (lock held): {mig_err}") + finally: + lock_conn.execute( + text("SELECT pg_advisory_unlock(:lock_id)"), + {"lock_id": MIGRATION_ADVISORY_LOCK_ID} + ) + logger.info("Released PostgreSQL advisory lock") + else: + logger.info(f"Migrations already running in another process, waiting for completion (process {os.getpid()})...") + lock_conn.execute( + text("SELECT pg_advisory_lock(:lock_id)"), + {"lock_id": MIGRATION_ADVISORY_LOCK_ID} + ) + lock_conn.execute( + text("SELECT pg_advisory_unlock(:lock_id)"), + {"lock_id": MIGRATION_ADVISORY_LOCK_ID} + ) + logger.info(f"Migrations completed by another process, continuing startup in process {os.getpid()}") + + except Exception as e: + logger.warning(f"Could not use advisory lock ({e}), running migrations without coordination") + _run_all_migrations(db) + + finally: + if lock_conn is not None: + try: + lock_conn.close() + except Exception: + pass diff --git a/pixelprobe/scheduler_lock.py b/pixelprobe/scheduler_lock.py new file mode 100644 index 0000000..26d49da --- /dev/null +++ b/pixelprobe/scheduler_lock.py @@ -0,0 +1,210 @@ +""" +Redis-based distributed lock for scheduler coordination across containers. + +Uses Redis SETNX for atomic lock acquisition with a 60-second TTL for auto-recovery. +A heartbeat thread refreshes the lock every 30 seconds to keep it alive. +Falls back to file-based locking when Redis is unavailable. 
+""" + +import os +import logging +import threading +import time +import socket +from datetime import datetime, timezone + +logger = logging.getLogger(__name__) + +LOCK_KEY = 'pixelprobe:scheduler:lock' + + +def parse_scheduler_lock(lock_value): + """Parse lock value into (hostname, pid, timestamp_str). + + Lock formats: + - New: "hostname:pid:timestamp" (e.g., "pixelprobe-app:123:2026-01-01T00:00:00+00:00") + - Old: "pid:timestamp" (e.g., "123:2026-01-01T00:00:00+00:00") + """ + parts = lock_value.split(':') + if len(parts) >= 3 and not parts[0].isdigit(): + return parts[0], parts[1], ':'.join(parts[2:]) + return None, parts[0], ':'.join(parts[1:]) + + +def should_force_acquire_lock(lock_hostname, lock_pid, lock_age, + my_hostname, my_pid, staleness_threshold=65): + """Determine if we should force-acquire an existing lock. + + Returns (should_acquire: bool, reason: str). + """ + if lock_hostname == my_hostname and lock_pid == my_pid: + return True, "self-lock" + if lock_hostname == my_hostname: + if lock_age > staleness_threshold: + return True, "stale-sibling" + return False, "active-sibling" + if lock_age > staleness_threshold: + return True, "stale-remote" + return False, "active-remote" + + +def _start_heartbeat(lock_key, redis_client, container_hostname): + """Start a daemon thread that refreshes the scheduler lock every 30 seconds.""" + def heartbeat_loop(): + while True: + try: + time.sleep(30) + refresh_value = f"{container_hostname}:{os.getpid()}:{datetime.now(timezone.utc).isoformat()}" + redis_client.set(lock_key, refresh_value, ex=60) + logger.debug(f"Refreshed scheduler lock in process {os.getpid()}") + except Exception as e: + logger.warning(f"Failed to refresh scheduler lock: {e}") + break + + heartbeat_thread = threading.Thread(target=heartbeat_loop, daemon=True) + heartbeat_thread.start() + logger.info(f"Started scheduler lock heartbeat thread in process {os.getpid()}") + + +def _try_acquire_and_init(redis_client, lock_key, lock_value, 
container_hostname, + scheduler, app, scheduler_initialized): + """Acquire the lock, initialize the scheduler, and start heartbeat.""" + scheduler.init_app(app) + scheduler_initialized[0] = True + app.scheduler_redis_lock_key = lock_key + _start_heartbeat(lock_key, redis_client, container_hostname) + + +def _start_retry_thread(redis_client, lock_key, container_hostname, + scheduler, app, scheduler_initialized): + """Start background retry thread for stale lock recovery.""" + def retry_scheduler_lock(): + retry_count = 0 + max_retries = 10 + + while not scheduler_initialized[0] and retry_count < max_retries: + time.sleep(30) + retry_count += 1 + + try: + current_lock = redis_client.get(lock_key) + new_value = f"{container_hostname}:{os.getpid()}:{datetime.now(timezone.utc).isoformat()}" + + if not current_lock: + retry_acquired = redis_client.set(lock_key, new_value, nx=True, ex=60) + if retry_acquired: + logger.info(f"Retry #{retry_count}: Acquired scheduler lock, initializing scheduler") + with app.app_context(): + _try_acquire_and_init( + redis_client, lock_key, new_value, container_hostname, + scheduler, app, scheduler_initialized + ) + break + else: + lock_str = current_lock.decode('utf-8') if isinstance(current_lock, bytes) else current_lock + retry_hostname, retry_pid, retry_ts_str = parse_scheduler_lock(lock_str) + lock_ts = datetime.fromisoformat(retry_ts_str) + current_age = (datetime.now(timezone.utc) - lock_ts).total_seconds() + + retry_should_acquire, retry_reason = should_force_acquire_lock( + retry_hostname, retry_pid, current_age, + container_hostname, str(os.getpid()) + ) + + if retry_should_acquire: + redis_client.set(lock_key, new_value, ex=60) + logger.info(f"Retry #{retry_count}: Acquired lock (reason={retry_reason}, age={current_age:.0f}s), initializing scheduler") + with app.app_context(): + _try_acquire_and_init( + redis_client, lock_key, new_value, container_hostname, + scheduler, app, scheduler_initialized + ) + break + else: + 
logger.debug(f"Retry #{retry_count}: Lock still held (reason={retry_reason}, age={current_age:.0f}s)") + except Exception as retry_err: + logger.warning(f"Retry #{retry_count} failed: {retry_err}") + + if not scheduler_initialized[0]: + logger.warning("Scheduler lock retry exhausted - another process must have it") + + retry_thread = threading.Thread(target=retry_scheduler_lock, daemon=True) + retry_thread.start() + logger.info(f"Started scheduler lock retry thread in process {os.getpid()}") + + +def initialize_scheduler_with_lock(app, scheduler): + """Initialize the scheduler using a Redis distributed lock (or file lock fallback). + + Returns True if the scheduler was initialized in this process. + """ + from pixelprobe.progress_utils import get_redis_client + + redis_client = get_redis_client() + scheduler_initialized = [False] + container_hostname = socket.gethostname() + + if redis_client: + try: + lock_value = f"{container_hostname}:{os.getpid()}:{datetime.now(timezone.utc).isoformat()}" + acquired = redis_client.set(LOCK_KEY, lock_value, nx=True, ex=60) + + if acquired: + logger.info(f"Acquired Redis scheduler lock in process {os.getpid()}, initializing scheduler") + _try_acquire_and_init( + redis_client, LOCK_KEY, lock_value, container_hostname, + scheduler, app, scheduler_initialized + ) + else: + existing = redis_client.get(LOCK_KEY) + if existing: + existing = existing.decode('utf-8') if isinstance(existing, bytes) else existing + try: + lock_hostname, lock_pid, lock_timestamp_str = parse_scheduler_lock(existing) + lock_timestamp = datetime.fromisoformat(lock_timestamp_str) + lock_age = (datetime.now(timezone.utc) - lock_timestamp).total_seconds() + + should_acquire, reason = should_force_acquire_lock( + lock_hostname, lock_pid, lock_age, + container_hostname, str(os.getpid()) + ) + + if should_acquire: + logger.warning(f"Acquiring scheduler lock (reason={reason}, age={lock_age:.0f}s, holder={existing})") + redis_client.set(LOCK_KEY, lock_value, ex=60) + 
logger.info(f"Acquired scheduler lock in process {os.getpid()}, initializing scheduler") + _try_acquire_and_init( + redis_client, LOCK_KEY, lock_value, container_hostname, + scheduler, app, scheduler_initialized + ) + else: + logger.info(f"Scheduler lock held by sibling/remote (reason={reason}, holder={existing}, age={lock_age:.0f}s), skipping in process {os.getpid()}") + _start_retry_thread( + redis_client, LOCK_KEY, container_hostname, + scheduler, app, scheduler_initialized + ) + except Exception as parse_err: + logger.info(f"Scheduler already running (lock held by: {existing}), skipping in process {os.getpid()}") + + except Exception as e: + logger.warning(f"Redis lock failed ({e}), falling back to file lock") + redis_client = None + + # Fallback to file lock if Redis unavailable + if not redis_client and not scheduler_initialized[0]: + import fcntl + scheduler_lock_file = '/tmp/pixelprobe_scheduler.lock' + + try: + lock_file = open(scheduler_lock_file, 'w') + fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + + logger.info(f"Acquired file scheduler lock in process {os.getpid()}, initializing scheduler (Redis unavailable)") + scheduler.init_app(app) + app.scheduler_lock_file = lock_file + scheduler_initialized[0] = True + + except (IOError, OSError): + logger.info(f"Scheduler already running in another process, skipping initialization in process {os.getpid()}") + + return scheduler_initialized[0] diff --git a/pixelprobe/services/stats_service.py b/pixelprobe/services/stats_service.py index cb6ab6b..6588866 100644 --- a/pixelprobe/services/stats_service.py +++ b/pixelprobe/services/stats_service.py @@ -174,9 +174,8 @@ def _get_stats_fallback(self) -> Dict: def _get_monitored_paths(self) -> List[Dict]: """Get information about monitored paths""" try: - # Get configured scan paths (no hardcoded defaults) - scan_paths_env = os.environ.get('SCAN_PATHS', '') - scan_paths = [p.strip() for p in scan_paths_env.split(',') if p.strip()] + from 
pixelprobe.utils.helpers import get_configured_scan_paths + scan_paths = get_configured_scan_paths() if not scan_paths: # No scan paths configured diff --git a/pixelprobe/startup.py b/pixelprobe/startup.py new file mode 100644 index 0000000..094e946 --- /dev/null +++ b/pixelprobe/startup.py @@ -0,0 +1,92 @@ +""" +Startup cleanup routines for PixelProbe. + +These functions run during application initialization to clean up +state from previous runs (e.g., crashed scans, bloated records). +""" + +import logging +from datetime import datetime, timezone + +logger = logging.getLogger(__name__) + + +def cleanup_stuck_operations(db): + """Clean up any stuck operations from previous runs""" + try: + from models import FileChangesState, CleanupState + + active_file_changes = FileChangesState.query.filter_by(is_active=True).all() + for file_change in active_file_changes: + file_change.is_active = False + file_change.phase = 'failed' + file_change.end_time = datetime.now(timezone.utc) + file_change.progress_message = 'Application restarted - operation marked as failed' + logger.warning(f"Marking stuck file changes operation {file_change.check_id} as failed") + + active_cleanups = CleanupState.query.filter_by(is_active=True).all() + for cleanup in active_cleanups: + cleanup.is_active = False + cleanup.phase = 'failed' + cleanup.end_time = datetime.now(timezone.utc) + cleanup.progress_message = 'Application restarted - operation marked as failed' + logger.warning(f"Marking stuck cleanup operation {cleanup.cleanup_id} as failed") + + if active_file_changes or active_cleanups: + db.session.commit() + logger.info(f"Cleaned up {len(active_file_changes)} stuck file changes and {len(active_cleanups)} stuck cleanup operations") + + except Exception as e: + logger.error(f"Error cleaning up stuck operations: {str(e)}") + + +def cleanup_stuck_scans(db): + """Clean up ALL active scans from previous runs - they can't still be running after restart.""" + try: + from models import ScanState + 
stuck_scans = ScanState.query.filter( + ScanState.is_active == True + ).all() + + for scan in stuck_scans: + logger.warning(f"Found active scan {scan.id} from {scan.start_time}, marking as crashed (app restarted)") + scan.is_active = False + scan.phase = 'crashed' + scan.error_message = "Application restarted - scan was interrupted" + + if stuck_scans: + db.session.commit() + logger.info(f"Cleaned up {len(stuck_scans)} abandoned scans from previous run") + except Exception as e: + logger.warning(f"Could not clean up stuck scans on startup: {e}") + + +def cleanup_bloated_scan_results(db): + """Clean up bloated scan results from pre-v2.4.213. + + Files with large scan_output or warning_details (>50KB) stored thousands + of lines in the old format. Delete them so they get rescanned with the + efficient storage format. + """ + try: + from models import ScanResult + bloated_results = db.session.query(ScanResult).filter( + db.or_( + db.func.length(ScanResult.scan_output) > 50000, + db.func.length(ScanResult.warning_details) > 50000 + ) + ).all() + + if bloated_results: + logger.info(f"Found {len(bloated_results)} scan results with bloated output fields (pre-v2.4.213 format)") + logger.info("Deleting bloated records to trigger efficient rescan with v2.4.213+ format") + + for result in bloated_results: + db.session.delete(result) + + db.session.commit() + logger.info(f"Deleted {len(bloated_results)} bloated scan results - they will be rescanned with efficient storage") + else: + logger.debug("No bloated scan results found - database is clean") + except Exception as e: + logger.warning(f"Could not clean up bloated scan results on startup: {e}") diff --git a/pixelprobe/utils/__init__.py b/pixelprobe/utils/__init__.py index 79e02c1..e6a8d90 100644 --- a/pixelprobe/utils/__init__.py +++ b/pixelprobe/utils/__init__.py @@ -3,13 +3,13 @@ """ from .decorators import require_json, handle_errors -from .validators import validate_file_path, validate_scan_config +from .validators 
import validate_file_path_format, validate_scan_config from .helpers import format_file_size, is_media_file __all__ = [ 'require_json', 'handle_errors', - 'validate_file_path', + 'validate_file_path_format', 'validate_scan_config', 'format_file_size', 'is_media_file' diff --git a/pixelprobe/utils/celery_utils.py b/pixelprobe/utils/celery_utils.py new file mode 100644 index 0000000..2ed67c2 --- /dev/null +++ b/pixelprobe/utils/celery_utils.py @@ -0,0 +1,25 @@ +"""Celery utility functions for PixelProbe""" + +import logging +from flask import current_app + +logger = logging.getLogger(__name__) + + +def check_celery_available(): + """Check if Celery is available and broker is reachable. + + Returns True if Celery is configured, has a broker URL, and the broker + responds to a ping. Returns False otherwise, allowing callers to + fall back to direct (non-Celery) execution. + """ + celery_enabled = current_app.config.get('CELERY_BROKER_URL') and hasattr(current_app, 'celery') + + if celery_enabled: + try: + current_app.celery.control.ping(timeout=1.0) + except Exception as e: + logger.warning(f"Celery broker connection failed: {e}. 
Falling back to direct execution.") + celery_enabled = False + + return celery_enabled diff --git a/pixelprobe/utils/helpers.py b/pixelprobe/utils/helpers.py index a8336b0..e3442c1 100644 --- a/pixelprobe/utils/helpers.py +++ b/pixelprobe/utils/helpers.py @@ -4,6 +4,7 @@ import os import logging +from pixelprobe.constants import VIDEO_EXTENSIONS, IMAGE_EXTENSIONS, AUDIO_EXTENSIONS logger = logging.getLogger(__name__) @@ -17,31 +18,23 @@ def format_file_size(size_bytes): def is_media_file(file_path): """Check if file is a supported media file""" - # Video extensions - video_extensions = { - '.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm', '.m4v', '.mpg', - '.mpeg', '.3gp', '.3g2', '.mxf', '.roq', '.nsv', '.f4v', '.f4p', '.f4a', - '.f4b', '.mts', '.m2ts', '.ts', '.vob', '.ogv', '.drc', '.gif', '.gifv', - '.mng', '.qt', '.yuv', '.rm', '.rmvb', '.asf', '.amv', '.m2v', '.svi', - '.mpc', '.mpv', '.mpe', '.m1v', '.m2p', '.m2t', '.m2ts', '.mts', '.mt2s', - '.rec', '.divx', '.xvid', '.evo', '.fli', '.flc', '.tod', '.avchd' - } - - # Image extensions - image_extensions = { - '.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.tif', '.webp', '.ico', - '.heic', '.heif', '.raw', '.arw', '.cr2', '.cr3', '.nef', '.nrw', '.orf', - '.rw2', '.pef', '.srw', '.x3f', '.erf', '.kdc', '.dcs', '.rw1', '.raw', - '.dng', '.raf', '.dcr', '.ptx', '.pxn', '.bay', '.crw', '.3fr', '.sr2', - '.srf', '.mef', '.mos', '.gpr', '.mrw', '.mdc', '.rwl', '.iiq', '.cap' - } - - # Audio extensions - audio_extensions = { - '.mp3', '.wav', '.flac', '.aac', '.ogg', '.wma', '.m4a', '.opus', '.ape', - '.ac3', '.dts', '.alac', '.aiff', '.au', '.m4b', '.m4p', '.m4r', '.mka', - '.mpa', '.mpc', '.mp2', '.ra', '.tta', '.voc', '.vox', '.wv', '.8svx' - } - ext = os.path.splitext(file_path)[1].lower() - return ext in video_extensions or ext in image_extensions or ext in audio_extensions \ No newline at end of file + return ext in VIDEO_EXTENSIONS or ext in IMAGE_EXTENSIONS or ext in AUDIO_EXTENSIONS + + +def 
get_configured_scan_paths(): + """Read scan paths from DB (ScanConfiguration), falling back to SCAN_PATHS env var. + + Returns a list of path strings. May be empty if nothing is configured. + """ + try: + from models import ScanConfiguration + configs = ScanConfiguration.query.filter_by(is_active=True).all() + paths = [config.path for config in configs if config.path] + if paths: + return paths + except Exception: + pass + + scan_paths_env = os.environ.get('SCAN_PATHS', '') + return [p.strip() for p in scan_paths_env.split(',') if p.strip()] diff --git a/pixelprobe/utils/rate_limiting.py b/pixelprobe/utils/rate_limiting.py index 4b150de..7d2655f 100644 --- a/pixelprobe/utils/rate_limiting.py +++ b/pixelprobe/utils/rate_limiting.py @@ -1,28 +1,43 @@ """Rate limiting configuration for the application""" -from flask import Flask +from flask import Flask, current_app from flask_limiter import Limiter from functools import wraps import logging logger = logging.getLogger(__name__) + def rate_limit(limit_string): - """ - Decorator to mark endpoints for rate limiting - + """Decorator to apply rate limits using the app's limiter. 
+ Args: limit_string: Rate limit string (e.g., "10 per minute", "100 per hour") """ def decorator(f): - # Mark the function with rate limit metadata - f._rate_limit = limit_string - return f + @wraps(f) + def wrapped(*args, **kwargs): + limiter = current_app.extensions.get('flask-limiter') + if limiter: + limited_func = limiter.limit(limit_string, exempt_when=lambda: False)(f) + return limited_func(*args, **kwargs) + else: + return f(*args, **kwargs) + return wrapped return decorator + def exempt_from_rate_limit(f): - """Decorator to exempt an endpoint from rate limiting""" - f._rate_limit_exempt = True - return f + """Decorator to exempt a function from rate limiting""" + @wraps(f) + def wrapped(*args, **kwargs): + limiter = current_app.extensions.get('flask-limiter') + if limiter: + exempt_func = limiter.exempt(f) + return exempt_func(*args, **kwargs) + else: + return f(*args, **kwargs) + return wrapped + def apply_rate_limits(app: Flask, limiter: Limiter): """Apply rate limits to blueprint endpoints after registration""" @@ -64,4 +79,4 @@ def apply_rate_limits(app: Flask, limiter: Limiter): logger.info("Rate limits applied successfully") except Exception as e: logger.error(f"Error applying rate limits: {e}") - # Don't fail the app startup if rate limiting setup fails \ No newline at end of file + # Don't fail the app startup if rate limiting setup fails diff --git a/pixelprobe/utils/security.py b/pixelprobe/utils/security.py index c4a0c1a..7ea7bea 100644 --- a/pixelprobe/utils/security.py +++ b/pixelprobe/utils/security.py @@ -5,7 +5,7 @@ import re import logging from functools import wraps -from datetime import datetime +from datetime import datetime, timezone from flask import request, jsonify, current_app from werkzeug.security import safe_join from models import db, ScanConfiguration @@ -237,7 +237,7 @@ def log_action(action, details=None, user=None, ip_address=None): ip_address = request.remote_addr log_entry = { - 'timestamp': 
datetime.utcnow().isoformat(), + 'timestamp': datetime.now(timezone.utc).isoformat(), 'action': action, 'user': user or 'anonymous', 'ip_address': ip_address, @@ -261,7 +261,7 @@ def log_security_event(event_type, message, severity='warning'): severity: Severity level (info, warning, error, critical) """ log_entry = { - 'timestamp': datetime.utcnow().isoformat(), + 'timestamp': datetime.now(timezone.utc).isoformat(), 'event_type': event_type, 'message': message, 'severity': severity, diff --git a/pixelprobe/utils/validators.py b/pixelprobe/utils/validators.py index 2cef260..a88bfe7 100644 --- a/pixelprobe/utils/validators.py +++ b/pixelprobe/utils/validators.py @@ -5,8 +5,8 @@ import os import re -def validate_file_path(file_path): - """Validate a file path""" +def validate_file_path_format(file_path): + """Validate a file path format (simple format check, no path traversal prevention)""" if not file_path: return False, "File path is required" diff --git a/scheduler.py b/scheduler.py index f2894fa..ff49145 100644 --- a/scheduler.py +++ b/scheduler.py @@ -7,6 +7,7 @@ from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.interval import IntervalTrigger from models import db, ScanSchedule, ScanResult, ScanState, HealthcheckConfig, ScanReport +from pixelprobe.constants import TERMINAL_SCAN_PHASES from sqlalchemy import text import threading import requests @@ -26,6 +27,48 @@ def __init__(self, app=None): # Load exclusions from environment self._load_exclusions() + def _filter_excluded_paths(self, scan_paths): + """Filter out excluded paths and return the remaining ones.""" + return [ + p.strip() for p in scan_paths + if not any(p.strip().startswith(exc) for exc in self.excluded_paths) + ] + + def _execute_scan_request(self, endpoint, payload, scan_label, timeout=30): + """Execute an HTTP scan request against the local API. + + Args: + endpoint: API path (e.g. 
'/api/scan') + payload: JSON payload dict + scan_label: Human-readable label for logging (e.g. 'Periodic scan') + timeout: Request timeout in seconds + + Returns: + The requests.Response object, or None on connection error. + """ + base_url = self._get_api_base_url() + headers = { + 'X-Internal-Request': 'scheduler', + 'Content-Type': 'application/json' + } + try: + response = requests.post( + f'{base_url}{endpoint}', + json=payload, + headers=headers, + timeout=timeout + ) + if response.status_code == 200: + logger.info(f"{scan_label} started successfully") + elif response.status_code == 409: + logger.warning(f"{scan_label} skipped - another scan is already running") + else: + logger.error(f"{scan_label} API call failed: {response.status_code} - {response.text}") + return response + except requests.exceptions.RequestException as e: + logger.error(f"Failed to call API for {scan_label}: {e}") + return None + def _get_api_base_url(self): """ Get the base URL for API calls. @@ -261,64 +304,31 @@ def _run_periodic_scan(self): with self.app.app_context(): # Check if ANY scan is already running before proceeding scan_state = ScanState.get_or_create() - if scan_state.is_active and scan_state.phase not in ['idle', 'completed', 'error', 'crashed', 'cancelled']: + if scan_state.is_active and scan_state.phase not in TERMINAL_SCAN_PHASES: logger.warning(f"Periodic scan skipped - another scan is already running (phase: {scan_state.phase})") return - # Read SCAN_PATHS from database (synced from env on main app startup) - from models import ScanConfiguration - scan_configs = ScanConfiguration.query.filter_by(is_active=True).all() - scan_paths = [config.path for config in scan_configs if config.path] - - if not scan_paths: - # Fallback to environment variable if DB is empty - scan_paths_env = os.environ.get('SCAN_PATHS', '') - scan_paths = [p.strip() for p in scan_paths_env.split(',') if p.strip()] + from pixelprobe.utils.helpers import get_configured_scan_paths + scan_paths = 
get_configured_scan_paths() logger.info(f"Starting periodic scan of paths: {scan_paths}") - - # Filter out excluded paths - filtered_paths = [] - for path in scan_paths: - path = path.strip() - if not any(path.startswith(exc) for exc in self.excluded_paths): - filtered_paths.append(path) - + + filtered_paths = self._filter_excluded_paths(scan_paths) if not filtered_paths: logger.warning("No paths to scan after exclusions") return - - # Use HTTP self-call to trigger scan - base_url = self._get_api_base_url() - - # Add internal request header - headers = { - 'X-Internal-Request': 'scheduler', - 'Content-Type': 'application/json' - } - - try: - # Run scan with deep check to detect changes - payload = { + + self._execute_scan_request( + '/api/scan', + { 'scan_type': 'full', 'directories': filtered_paths, 'force_rescan': False, 'source': 'scheduled_periodic' - } - response = requests.post(f'{base_url}/api/scan', - json=payload, - headers=headers, - timeout=30) - - if response.status_code == 200: - logger.info("Periodic scan started successfully") - elif response.status_code == 409: - logger.warning("Periodic scan skipped - another scan is already running") - else: - logger.error(f"Periodic scan API call failed: {response.status_code} - {response.text}") - - except requests.exceptions.RequestException as e: - logger.error(f"Failed to call API for periodic scan: {e}") + }, + 'Periodic scan', + timeout=30 + ) except Exception as e: logger.error(f"Failed to run periodic scan: {e}") @@ -335,7 +345,7 @@ def _run_scheduled_scan(self, schedule_id: int): with self.app.app_context(): # Check if ANY scan is already running before proceeding scan_state = ScanState.get_or_create() - if scan_state.is_active and scan_state.phase not in ['idle', 'completed', 'error', 'crashed', 'cancelled']: + if scan_state.is_active and scan_state.phase not in TERMINAL_SCAN_PHASES: logger.warning(f"Scheduled scan {schedule_id} skipped - another scan is already running (phase: {scan_state.phase})") 
return @@ -378,16 +388,8 @@ def _run_scheduled_scan(self, schedule_id: int): db.session.commit() - # Read SCAN_PATHS from database (synced from env on main app startup) - # This allows celery-worker to access paths without needing env var - from models import ScanConfiguration - scan_configs = ScanConfiguration.query.filter_by(is_active=True).all() - scan_paths = [config.path for config in scan_configs if config.path] - - if not scan_paths: - # Fallback to environment variable if DB is empty (for backwards compat) - scan_paths_env = os.environ.get('SCAN_PATHS', '') - scan_paths = [p.strip() for p in scan_paths_env.split(',') if p.strip()] + from pixelprobe.utils.helpers import get_configured_scan_paths + scan_paths = get_configured_scan_paths() if not scan_paths: logger.error(f"Scheduled scan {schedule_id}: No scan paths configured in database or SCAN_PATHS env var!") @@ -398,67 +400,37 @@ def _run_scheduled_scan(self, schedule_id: int): logger.info(f"Running scheduled scan '{cached_schedule_name}' (type: {scan_type}) on paths: {scan_paths}") - # Filter out excluded paths - filtered_paths = [] - for path in scan_paths: - path = path.strip() - if not any(path.startswith(exc) for exc in self.excluded_paths): - filtered_paths.append(path) - - # Validate we have paths to scan after filtering + filtered_paths = self._filter_excluded_paths(scan_paths) if not filtered_paths: logger.error(f"Scheduled scan {schedule_id} has no valid paths after filtering. 
" f"SCAN_PATHS={scan_paths}, excluded_paths={self.excluded_paths}") return - if filtered_paths: - # Use HTTP self-calls to trigger scans, avoiding Flask context issues - base_url = self._get_api_base_url() - - # Add internal request header for identification - headers = { - 'X-Internal-Request': 'scheduler', - 'Content-Type': 'application/json' - } - - try: - if scan_type == 'orphan': - # Run orphan cleanup with longer timeout since it can take time - response = requests.post(f'{base_url}/api/cleanup-orphaned', - json={'schedule_id': schedule_id}, - headers=headers, - timeout=60) - elif scan_type == 'file_changes': - # Run file changes scan with longer timeout - response = requests.post(f'{base_url}/api/file-changes', - json={'schedule_id': schedule_id}, - headers=headers, - timeout=60) - else: - # Default to normal scan with proper payload - # Use timestamp in scan_id to make each scheduled run unique - # This prevents false "already completed" detection between runs - timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') - payload = { - 'scan_type': 'full', - 'directories': filtered_paths, - 'force_rescan': schedule.force_rescan if hasattr(schedule, 'force_rescan') else False, - 'source': f'scheduled_{schedule_id}_{timestamp}' - } - response = requests.post(f'{base_url}/api/scan', - json=payload, - headers=headers, - timeout=60) - - if response.status_code == 200: - logger.info(f"Scheduled scan {schedule_id} started successfully") - elif response.status_code == 409: - logger.warning(f"Scheduled scan {schedule_id} skipped - another scan is already running") - else: - logger.error(f"Scheduled scan {schedule_id} API call failed: {response.status_code} - {response.text}") - - except requests.exceptions.RequestException as e: - logger.error(f"Failed to call API for scheduled scan {schedule_id}: {e}") + scan_label = f"Scheduled scan {schedule_id}" + if scan_type == 'orphan': + self._execute_scan_request( + '/api/cleanup-orphaned', + {'schedule_id': 
schedule_id}, + scan_label, timeout=60 + ) + elif scan_type == 'file_changes': + self._execute_scan_request( + '/api/file-changes', + {'schedule_id': schedule_id}, + scan_label, timeout=60 + ) + else: + timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') + self._execute_scan_request( + '/api/scan', + { + 'scan_type': 'full', + 'directories': filtered_paths, + 'force_rescan': schedule.force_rescan if hasattr(schedule, 'force_rescan') else False, + 'source': f'scheduled_{schedule_id}_{timestamp}' + }, + scan_label, timeout=60 + ) except Exception as e: logger.error(f"Failed to run scheduled scan {schedule_id}: {e}") @@ -635,7 +607,7 @@ def _check_stuck_scans(self): # Find active scans stuck_scans = ScanState.query.filter( ScanState.is_active == True, - ScanState.phase.notin_(['idle', 'completed', 'error', 'crashed', 'cancelled']) + ScanState.phase.notin_(TERMINAL_SCAN_PHASES) ).all() scans_to_mark = [] diff --git a/tests/test_migration_lock.py b/tests/test_migration_lock.py index e1df6d8..4e10d06 100644 --- a/tests/test_migration_lock.py +++ b/tests/test_migration_lock.py @@ -1,10 +1,7 @@ """ -Tests for PostgreSQL advisory lock migration coordination in app.py +Tests for PostgreSQL advisory lock migration coordination. -NOTE: These tests avoid 'from app import ...' because importing the app module -triggers Celery/Redis initialization at module level, which poisons the Redis -connection state for subsequent schedule integration tests. Instead, we read -the constant from source and test migrate_database via its module reference. +The migration logic now lives in pixelprobe.migrations.startup. 
""" import re @@ -14,37 +11,36 @@ from pathlib import Path -def _get_app_module(): - """Get the app module if already imported (by conftest/fixtures), else skip.""" - mod = sys.modules.get('app') - if mod is None: - pytest.skip("app module not imported (test isolation)") - return mod +def _get_migrations_module(): + """Import the migrations startup module directly (safe, no Celery side effects).""" + from pixelprobe.migrations import startup + return startup def test_advisory_lock_id_is_stable(): """MIGRATION_ADVISORY_LOCK_ID must never change -- other running containers depend on the same value to coordinate.""" - app_source = Path(__file__).parent.parent / 'app.py' - content = app_source.read_text() + source = Path(__file__).parent.parent / 'pixelprobe' / 'migrations' / 'startup.py' + content = source.read_text() match = re.search(r'MIGRATION_ADVISORY_LOCK_ID\s*=\s*(\d+)', content) - assert match is not None, "MIGRATION_ADVISORY_LOCK_ID not found in app.py" + assert match is not None, "MIGRATION_ADVISORY_LOCK_ID not found in pixelprobe/migrations/startup.py" assert int(match.group(1)) == 7283945162 def test_migrate_database_falls_back_on_advisory_lock_failure(app): """When advisory lock acquisition fails (e.g., SQLite test DB), migrations still run via the fallback path.""" - app_mod = _get_app_module() + mig_mod = _get_migrations_module() with app.app_context(): - with patch.object(app_mod, '_run_all_migrations') as mock_migrations: - app_mod.migrate_database() + with patch.object(mig_mod, '_run_all_migrations') as mock_migrations: + from models import db + mig_mod.migrate_database(db) mock_migrations.assert_called_once() def test_migrate_database_releases_lock_on_success(app): """Advisory lock is released after migrations complete successfully.""" - app_mod = _get_app_module() + mig_mod = _get_migrations_module() with app.app_context(): mock_conn = MagicMock() mock_scalar = MagicMock(return_value=True) @@ -55,11 +51,11 @@ def 
test_migrate_database_releases_lock_on_success(app): mock_engine = MagicMock() mock_engine.connect.return_value = mock_conn - with patch.object(app_mod, '_run_all_migrations') as mock_migrations, \ - patch.object(app_mod, 'db') as mock_db: - mock_db.engine = mock_engine + mock_db = MagicMock() + mock_db.engine = mock_engine - app_mod.migrate_database() + with patch.object(mig_mod, '_run_all_migrations') as mock_migrations: + mig_mod.migrate_database(mock_db) mock_migrations.assert_called_once() assert mock_conn.execute.call_count == 2 @@ -70,7 +66,7 @@ def test_migrate_database_releases_lock_on_success(app): def test_migrate_database_releases_lock_on_migration_failure(app): """Advisory lock is released even if migrations raise an exception.""" - app_mod = _get_app_module() + mig_mod = _get_migrations_module() with app.app_context(): mock_conn = MagicMock() mock_scalar = MagicMock(return_value=True) @@ -81,11 +77,11 @@ def test_migrate_database_releases_lock_on_migration_failure(app): mock_engine = MagicMock() mock_engine.connect.return_value = mock_conn - with patch.object(app_mod, '_run_all_migrations', side_effect=RuntimeError("migration boom")) as mock_migrations, \ - patch.object(app_mod, 'db') as mock_db: - mock_db.engine = mock_engine + mock_db = MagicMock() + mock_db.engine = mock_engine - app_mod.migrate_database() + with patch.object(mig_mod, '_run_all_migrations', side_effect=RuntimeError("migration boom")) as mock_migrations: + mig_mod.migrate_database(mock_db) mock_migrations.assert_called_once() assert mock_conn.execute.call_count == 2 @@ -96,7 +92,7 @@ def test_migrate_database_releases_lock_on_migration_failure(app): def test_migrate_database_waiter_path(app): """When another process holds the lock, we wait then skip migrations.""" - app_mod = _get_app_module() + mig_mod = _get_migrations_module() with app.app_context(): mock_conn = MagicMock() @@ -113,11 +109,11 @@ def mock_execute(stmt, params=None): mock_engine = MagicMock() 
mock_engine.connect.return_value = mock_conn - with patch.object(app_mod, '_run_all_migrations') as mock_migrations, \ - patch.object(app_mod, 'db') as mock_db: - mock_db.engine = mock_engine + mock_db = MagicMock() + mock_db.engine = mock_engine - app_mod.migrate_database() + with patch.object(mig_mod, '_run_all_migrations') as mock_migrations: + mig_mod.migrate_database(mock_db) mock_migrations.assert_not_called() assert call_count[0] == 3 diff --git a/tests/unit/test_stats_service.py b/tests/unit/test_stats_service.py index 91d71d9..6af9869 100644 --- a/tests/unit/test_stats_service.py +++ b/tests/unit/test_stats_service.py @@ -119,14 +119,13 @@ def test_get_corruption_statistics(self, mock_db, stats_service): assert stats['audio/mp3']['corrupted'] == 0 assert stats['audio/mp3']['corruption_rate'] == 0.0 + @patch('pixelprobe.utils.helpers.get_configured_scan_paths', return_value=['/movies', '/tv', '/originals']) @patch('pixelprobe.services.stats_service.os') @patch('pixelprobe.services.stats_service.db') - def test_get_monitored_paths(self, mock_db, mock_os, stats_service): + def test_get_monitored_paths(self, mock_db, mock_os, mock_get_paths, stats_service): """Test monitored paths retrieval""" - # Mock environment - mock_os.environ.get.return_value = '/movies,/tv,/originals' mock_os.path.exists.side_effect = [True, True, False] # originals doesn't exist - + # Mock database results mock_results = [ ('/movies', 50), @@ -134,14 +133,14 @@ def test_get_monitored_paths(self, mock_db, mock_os, stats_service): ('other', 10) ] mock_db.session.execute.return_value.fetchall.return_value = mock_results - + paths = stats_service._get_monitored_paths() - + assert len(paths) == 3 assert paths[0]['path'] == '/movies' assert paths[0]['exists'] == True assert paths[0]['file_count'] == 50 - + assert paths[2]['path'] == '/originals' assert paths[2]['exists'] == False assert paths[2]['file_count'] == 0 diff --git a/version.py b/version.py index 84e6947..602fb05 100644 --- 
a/version.py +++ b/version.py @@ -4,7 +4,7 @@ # Default version - this is the single source of truth -_DEFAULT_VERSION = '2.5.62' +_DEFAULT_VERSION = '2.5.63' # Allow override via environment variable for CI/CD, but default to the hardcoded version