From 7b995056dc85d798dd00c4913b1387d85041e150 Mon Sep 17 00:00:00 2001
From: Stanley Good
Date: Sun, 8 Mar 2026 23:20:27 -0700
Subject: [PATCH 1/8] unblock Spotify request concurrency

---
 Backend/spotify_api.py | 206 +++++++++++++++++++++++++----------------
 1 file changed, 126 insertions(+), 80 deletions(-)

diff --git a/Backend/spotify_api.py b/Backend/spotify_api.py
index 028f27a..a4e4990 100644
--- a/Backend/spotify_api.py
+++ b/Backend/spotify_api.py
@@ -8,29 +8,59 @@ from urllib.parse import urlparse

 import requests
+from requests.adapters import HTTPAdapter

 SPOTIFY_API_URL = "https://api.spotify.com/v1"
 RECCOBEATS_API_URL = "https://api.reccobeats.com/v1"
 SPOTIFY_TRACK_ID_PATTERN = re.compile(r"^[A-Za-z0-9]{22}$")
 SPOTIFY_TRACK_PATH_PATTERN = re.compile(r"/track/([A-Za-z0-9]{22})")
 REQUEST_TIMEOUT_SECONDS = 15
-SPOTIFY_REQUEST_INTERVAL_SECONDS = float(os.getenv("SPOTIFY_REQUEST_INTERVAL_SECONDS", "0.25"))
-SPOTIFY_MAX_RETRY_ATTEMPTS = int(os.getenv("SPOTIFY_MAX_RETRY_ATTEMPTS", "3"))
-SPOTIFY_RETRY_BASE_SECONDS = float(os.getenv("SPOTIFY_RETRY_BASE_SECONDS", "0.75"))
 SPOTIFY_RETRYABLE_STATUS_CODES = {500, 502, 503, 504}
+
+
+def _env_non_negative_float(name: str, default_value: float) -> float:
+    """Parse a non-negative float env setting with fallback."""
+    raw_value = os.getenv(name, str(default_value))
+    try:
+        parsed = float(raw_value)
+    except ValueError:
+        return default_value
+    return max(0.0, parsed)
+
+
+def _env_positive_int(name: str, default_value: int) -> int:
+    """Parse a positive integer env setting with fallback."""
+    raw_value = os.getenv(name, str(default_value))
+    try:
+        parsed = int(raw_value)
+    except ValueError:
+        return default_value
+    return max(1, parsed)
+
+
+SPOTIFY_REQUEST_INTERVAL_SECONDS = _env_non_negative_float(
+    "SPOTIFY_REQUEST_INTERVAL_SECONDS", 0.05
+)
+SPOTIFY_MAX_RETRY_ATTEMPTS = _env_positive_int("SPOTIFY_MAX_RETRY_ATTEMPTS", 3)
+SPOTIFY_RETRY_BASE_SECONDS = _env_non_negative_float("SPOTIFY_RETRY_BASE_SECONDS", 0.75)
+SPOTIFY_MAX_INFLIGHT_REQUESTS = _env_positive_int("SPOTIFY_MAX_INFLIGHT_REQUESTS", 12)
+SPOTIFY_POOL_CONNECTIONS = _env_positive_int("SPOTIFY_POOL_CONNECTIONS", 32)
+SPOTIFY_POOL_MAXSIZE = _env_positive_int("SPOTIFY_POOL_MAXSIZE", 64)

 _SPOTIFY_RATE_LIMIT_LOCK = threading.Lock()
 _SPOTIFY_NEXT_ALLOWED_TS = 0.0
+_SPOTIFY_INFLIGHT_SEMAPHORE = threading.BoundedSemaphore(SPOTIFY_MAX_INFLIGHT_REQUESTS)
+_SPOTIFY_SESSION = None
+_SPOTIFY_SESSION_LOCK = threading.Lock()


-def _wait_for_spotify_slot():
-    """Global per-process pacing for Spotify API requests."""
+def _reserve_spotify_slot_delay() -> float:
+    """Reserve the next Spotify request slot and return required sleep time."""
     global _SPOTIFY_NEXT_ALLOWED_TS  # pylint: disable=global-statement
     with _SPOTIFY_RATE_LIMIT_LOCK:
         now = time.time()
-        if now < _SPOTIFY_NEXT_ALLOWED_TS:
-            time.sleep(_SPOTIFY_NEXT_ALLOWED_TS - now)
-            now = time.time()
-        _SPOTIFY_NEXT_ALLOWED_TS = now + SPOTIFY_REQUEST_INTERVAL_SECONDS
+        reserved_ts = max(now, _SPOTIFY_NEXT_ALLOWED_TS)
+        _SPOTIFY_NEXT_ALLOWED_TS = reserved_ts + SPOTIFY_REQUEST_INTERVAL_SECONDS
+        return max(0.0, reserved_ts - now)


 def _apply_spotify_retry_after(retry_after_seconds: int):
@@ -42,6 +72,37 @@ def _apply_spotify_retry_after(retry_after_seconds: int):
     )


+def _parse_retry_after_seconds(value) -> int:
+    """Parse Retry-After seconds safely with a minimum of one second."""
+    try:
+        parsed = int(float(value))
+    except (TypeError, ValueError):
+        return 1
+    return max(1, parsed)
+
+
+def _get_spotify_session() -> requests.Session:
+    """Return process-wide pooled HTTP session for Spotify requests."""
+    global _SPOTIFY_SESSION  # pylint: disable=global-statement
+    if _SPOTIFY_SESSION is not None:
+        return _SPOTIFY_SESSION
+
+    with _SPOTIFY_SESSION_LOCK:
+        if _SPOTIFY_SESSION is None:
+            session = requests.Session()
+            adapter = HTTPAdapter(
+                pool_connections=SPOTIFY_POOL_CONNECTIONS,
+                pool_maxsize=SPOTIFY_POOL_MAXSIZE,
+                pool_block=True,
+            )
+            session.mount("https://", adapter)
+            session.mount("http://", adapter)
+            _SPOTIFY_SESSION = session
+
+    assert _SPOTIFY_SESSION is not None
+    return _SPOTIFY_SESSION
+
+
 def get_spotify_redirect_uri() -> str:
     """Return OAuth callback URI based on env defaults and overrides."""
     app_base_url = os.getenv("APP_BASE_URL", "http://127.0.0.1:8080").rstrip("/")
@@ -63,82 +124,67 @@ def spotify_request(
         "Authorization": f"Bearer {auth_token}",
         "Content-Type": "application/json",
     }
+    session = _get_spotify_session()
+
+    current_retry_attempt = retry_attempt
+
+    while True:
+        slot_delay_seconds = _reserve_spotify_slot_delay()
+        if slot_delay_seconds > 0:
+            time.sleep(slot_delay_seconds)
+
+        try:
+            with _SPOTIFY_INFLIGHT_SEMAPHORE:
+                response = session.request(
+                    method,
+                    url,
+                    headers=headers,
+                    params=params,
+                    data=data,
+                    json=json_data,
+                    timeout=REQUEST_TIMEOUT_SECONDS,
+                )
+        except requests.RequestException as error:
+            if current_retry_attempt < SPOTIFY_MAX_RETRY_ATTEMPTS:
+                sleep_seconds = SPOTIFY_RETRY_BASE_SECONDS * (2 ** current_retry_attempt)
+                print(
+                    "Spotify request network error. Retrying:",
+                    f"attempt={current_retry_attempt + 1},",
+                    f"sleep={sleep_seconds:.2f}s,",
+                    f"error={error}",
+                )
+                current_retry_attempt += 1
+                time.sleep(sleep_seconds)
+                continue
+            print(f"Spotify request network error (final): {error}")
+            return {}
+
+        if response.status_code == 429:  # Rate limited
+            retry_after = _parse_retry_after_seconds(response.headers.get("Retry-After"))
+            print(f"Rate limited. Retrying after {retry_after} seconds.")
+            _apply_spotify_retry_after(retry_after)
+            time.sleep(retry_after)
+            continue

-    _wait_for_spotify_slot()
-
-    try:
-        response = requests.request(
-            method,
-            url,
-            headers=headers,
-            params=params,
-            data=data,
-            json=json_data,
-            timeout=REQUEST_TIMEOUT_SECONDS,
-        )
-    except requests.RequestException as error:
-        if retry_attempt < SPOTIFY_MAX_RETRY_ATTEMPTS:
-            sleep_seconds = SPOTIFY_RETRY_BASE_SECONDS * (2 ** retry_attempt)
+        if (
+            response.status_code in SPOTIFY_RETRYABLE_STATUS_CODES
+            and current_retry_attempt < SPOTIFY_MAX_RETRY_ATTEMPTS
+        ):
+            sleep_seconds = SPOTIFY_RETRY_BASE_SECONDS * (2 ** current_retry_attempt)
             print(
-                "Spotify request network error. Retrying:",
-                f"attempt={retry_attempt + 1},",
-                f"sleep={sleep_seconds:.2f}s,",
-                f"error={error}",
+                "Spotify retryable error. Retrying:",
+                f"status={response.status_code},",
+                f"attempt={current_retry_attempt + 1},",
+                f"sleep={sleep_seconds:.2f}s",
             )
+            current_retry_attempt += 1
             time.sleep(sleep_seconds)
-            return spotify_request(
-                method,
-                endpoint,
-                auth_token,
-                params,
-                data,
-                json_data,
-                retry_attempt + 1,
-            )
-        print(f"Spotify request network error (final): {error}")
-        return {}
-
-    if response.status_code == 429:  # Rate limited
-        retry_after = int(response.headers.get("Retry-After", 1))
-        print(f"Rate limited. Retrying after {retry_after} seconds.")
-        _apply_spotify_retry_after(retry_after)
-        time.sleep(retry_after)
-        return spotify_request(
-            method,
-            endpoint,
-            auth_token,
-            params,
-            data,
-            json_data,
-            retry_attempt,
-        )
-
-    if (
-        response.status_code in SPOTIFY_RETRYABLE_STATUS_CODES
-        and retry_attempt < SPOTIFY_MAX_RETRY_ATTEMPTS
-    ):
-        sleep_seconds = SPOTIFY_RETRY_BASE_SECONDS * (2 ** retry_attempt)
-        print(
-            "Spotify retryable error. Retrying:",
-            f"status={response.status_code},",
-            f"attempt={retry_attempt + 1},",
-            f"sleep={sleep_seconds:.2f}s",
-        )
-        time.sleep(sleep_seconds)
-        return spotify_request(
-            method,
-            endpoint,
-            auth_token,
-            params,
-            data,
-            json_data,
-            retry_attempt + 1,
-        )
+            continue

-    if response.status_code >= 400:
-        print(f"Spotify API request error: {response.status_code}, {response.text}")
-        return {}
-    return response.json()
+        if response.status_code >= 400:
+            print(f"Spotify API request error: {response.status_code}, {response.text}")
+            return {}
+        return response.json()


 def reccobeats_request(method, endpoint, params=None):
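A note on the pacing design in PATCH 1/8: the old helper slept while holding the rate-limit lock, so concurrent callers serialized on the full sleep; the new helper only reserves a timestamp under the lock and lets each caller sleep outside it. The following is a minimal, stdlib-only sketch of that reservation pattern (the names and the 0.05s interval mirror the patch, but the snippet is illustrative rather than the project's code):

    import threading
    import time

    INTERVAL = 0.05  # seconds between request slots, as in the new default

    _lock = threading.Lock()
    _next_allowed_ts = 0.0

    def reserve_slot_delay() -> float:
        """Reserve the next slot; return how long the caller must sleep."""
        global _next_allowed_ts
        with _lock:
            now = time.time()
            reserved = max(now, _next_allowed_ts)
            _next_allowed_ts = reserved + INTERVAL
            return max(0.0, reserved - now)

    def worker(worker_id: int) -> None:
        delay = reserve_slot_delay()
        if delay > 0:
            time.sleep(delay)  # sleeping here no longer blocks other reservations
        print(f"worker {worker_id} fired after {delay:.3f}s wait")

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

Five workers started at the same instant fire roughly one interval apart, while the lock itself is held only for microseconds per reservation.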
From 6242687fc284919d3c285bdd48e22a514f819f08 Mon Sep 17 00:00:00 2001
From: Stanley Good
Date: Sun, 8 Mar 2026 23:30:21 -0700
Subject: [PATCH 2/8] pool Spotify API requests and avoid extra playlist-length call

---
 Backend/playlist_processing.py | 21 ++++++++++++++-------
 Backend/spotify_api.py         |  7 +++++--
 2 files changed, 19 insertions(+), 9 deletions(-)

diff --git a/Backend/playlist_processing.py b/Backend/playlist_processing.py
index cec73c6..9c35c57 100644
--- a/Backend/playlist_processing.py
+++ b/Backend/playlist_processing.py
@@ -6,7 +6,6 @@ from collections import defaultdict

 try:
     from Backend.spotify_api import (
-        get_playlist_length,
         get_playlist_children,
         create_playlist,
         add_songs,
@@ -25,7 +24,6 @@ from Backend.track_utils import dedupe_track_ids, is_valid_spotify_track_id
 except ModuleNotFoundError:
     from spotify_api import (  # type: ignore
-        get_playlist_length,
         get_playlist_children,
         create_playlist,
         add_songs,
@@ -68,17 +66,26 @@ def log_step_time(step_name, start_time):
 async def get_playlist_track_ids(auth_token, playlist_id):
     """Return all track IDs from the playlist."""
     start_time = time.time()
-    slices = calc_slices(get_playlist_length(playlist_id, auth_token))
     track_ids = []
-
-    offsets = list(range(0, slices * 100, 100))
+    first_page_response = await get_playlist_children(
+        0, playlist_id, auth_token, include_total=True
+    )
+    total_tracks = (
+        first_page_response.get("total", 0)
+        if isinstance(first_page_response, dict)
+        else 0
+    )
+    slices = calc_slices(total_tracks)
+    responses = [first_page_response] if first_page_response else []
+    offsets = list(range(100, slices * 100, 100))
     semaphore = asyncio.Semaphore(FETCH_PAGE_CONCURRENCY)

     async def fetch_page(offset):
         async with semaphore:
             return await get_playlist_children(offset, playlist_id, auth_token)

-    responses = await asyncio.gather(*(fetch_page(offset) for offset in offsets))
+    if offsets:
+        responses.extend(await asyncio.gather(*(fetch_page(offset) for offset in offsets)))
     for response in responses:
         if not response or "items" not in response:
             continue
@@ -97,7 +104,7 @@ async def fetch_page(offset):
     print(
         "Playlist track fetch stats:",
         f"playlist_id={playlist_id},",
-        f"pages={len(offsets)},",
+        f"pages={len(responses)},",
         f"tracks={len(unique_track_ids)},",
         f"duration={time.time() - start_time:.2f}s",
     )
diff --git a/Backend/spotify_api.py b/Backend/spotify_api.py
index a4e4990..9c363c1 100644
--- a/Backend/spotify_api.py
+++ b/Backend/spotify_api.py
@@ -309,13 +309,16 @@ def get_playlist_name(playlist_id, auth_token):
     return ""


-async def get_playlist_children(start_index, playlist_id, auth_token):
+async def get_playlist_children(start_index, playlist_id, auth_token, include_total=False):
     """Return one page of playlist tracks using offset pagination."""
+    fields = "items(track(id,uri))"
+    if include_total:
+        fields = "total,items(track(id,uri))"
     endpoint = f"/playlists/{playlist_id}/tracks"
     params = {
         "offset": start_index,
         "limit": 100,
-        "fields": "items(track(id,uri))",
+        "fields": fields,
     }
     response = await asyncio.to_thread(
         spotify_request, "GET", endpoint, auth_token, params, None, None
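PATCH 2/8 folds the playlist-length probe into the first page fetch: the first request asks for total alongside the items, and the remaining offsets are derived from it. A sketch of the offset arithmetic, under the assumption that calc_slices is a ceiling division by the 100-track page size (the helper below is a stand-in, not the project's implementation):

    def calc_slices(total_tracks: int) -> int:
        # ceil(total / 100): number of 100-track pages
        return (total_tracks + 99) // 100

    def remaining_offsets(total_tracks: int) -> list[int]:
        # Offset 0 was already fetched (it carried the total), so start at 100.
        return list(range(100, calc_slices(total_tracks) * 100, 100))

    assert remaining_offsets(0) == []
    assert remaining_offsets(100) == []
    assert remaining_offsets(101) == [100]
    assert remaining_offsets(350) == [100, 200, 300]

Net effect: one fewer request per playlist than the removed get_playlist_length round-trip.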
From c4dd0224fe8d83db6ed30019842775ff2f3a099c Mon Sep 17 00:00:00 2001
From: Stanley Good
Date: Sun, 8 Mar 2026 23:35:38 -0700
Subject: [PATCH 3/8] offload clustering to thread and trim large-playlist GMM search

---
 Backend/grouping.py            | 4 ++--
 Backend/playlist_processing.py | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/Backend/grouping.py b/Backend/grouping.py
index 07de0c8..07d5016 100644
--- a/Backend/grouping.py
+++ b/Backend/grouping.py
@@ -63,9 +63,9 @@ def _env_positive_int(name: str, default_value: int) -> int:
     "CLUSTER_LARGE_PLAYLIST_THRESHOLD", 700
 )
 LARGE_PLAYLIST_MAX_K_CANDIDATES = _env_positive_int(
-    "CLUSTER_LARGE_MAX_K_CANDIDATES", 6
+    "CLUSTER_LARGE_MAX_K_CANDIDATES", 4
 )
-LARGE_PLAYLIST_GMM_N_INIT = _env_positive_int("CLUSTER_LARGE_GMM_N_INIT", 2)
+LARGE_PLAYLIST_GMM_N_INIT = _env_positive_int("CLUSTER_LARGE_GMM_N_INIT", 1)
 REFINE_MAX_TRACKS = _env_positive_int("CLUSTER_REFINE_MAX_TRACKS", 600)


diff --git a/Backend/playlist_processing.py b/Backend/playlist_processing.py
index 9c35c57..300c573 100644
--- a/Backend/playlist_processing.py
+++ b/Backend/playlist_processing.py
@@ -277,7 +277,7 @@ async def process_single_playlist(auth_token, playlist_id, user_id):
         removed = len(track_ids) - len(feature_by_track_id)
         print(f"Dropped {removed} tracks without usable unique audio features.")
     cluster_start = time.time()
-    clustered_tracks = cluster_df(audio_features)
+    clustered_tracks = await asyncio.to_thread(cluster_df, audio_features)
     log_step_time(f"Cluster tracks ({playlist_id})", cluster_start)
     if clustered_tracks.empty:
         print(f"Failed to cluster tracks for playlist {playlist_id}")
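The one-line change in PATCH 3/8 that wraps cluster_df in asyncio.to_thread matters because clustering is CPU-bound: run inline it would stall the event loop that the concurrent page fetches depend on. A minimal sketch of the pattern (slow_cluster is a hypothetical stand-in for the real clustering call):

    import asyncio
    import time

    def slow_cluster(values: list[float]) -> float:
        time.sleep(1.0)  # stands in for a CPU-heavy GMM fit
        return sum(values) / len(values)

    async def heartbeat() -> None:
        for _ in range(4):
            await asyncio.sleep(0.25)
            print("event loop still responsive")

    async def main() -> None:
        result, _ = await asyncio.gather(
            asyncio.to_thread(slow_cluster, [1.0, 2.0, 3.0]),
            heartbeat(),
        )
        print(f"cluster result: {result:.2f}")

    asyncio.run(main())

The usual caveat applies: a worker thread only helps while the offloaded code blocks or releases the GIL, which NumPy/scikit-learn numeric kernels generally do.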
for.") return - create_start = time.time() + pipeline_start = time.time() create_semaphore = asyncio.Semaphore(CREATE_PLAYLIST_CONCURRENCY) + add_semaphore = asyncio.Semaphore(ADD_SONGS_CONCURRENCY) + + async def process_cluster(candidate): + create_duration = 0.0 + add_duration = 0.0 + add_calls = 0 + add_calls_successful = 0 + tracks_added = 0 - async def create_cluster_playlist(candidate): async with create_semaphore: + create_call_start = time.time() created_playlist_id = await create_playlist( user_id, auth_token, candidate["playlist_title"], candidate["playlist_description"], ) - if not created_playlist_id: - return None - return (candidate, created_playlist_id) + create_duration += time.time() - create_call_start - created_entries = await asyncio.gather( - *(create_cluster_playlist(candidate) for candidate in cluster_candidates) - ) - created_entries = [entry for entry in created_entries if entry is not None] - create_duration = time.time() - create_start + if not created_playlist_id: + return { + "cluster_created": 0, + "create_calls": 1, + "create_time": create_duration, + "add_calls": add_calls, + "add_calls_successful": add_calls_successful, + "tracks_added": tracks_added, + "add_time": add_duration, + } - add_jobs = [] - total_tracks_to_add = 0 - for candidate, created_playlist_id in created_entries: track_ids = candidate["track_ids"] slices = calc_slices(len(track_ids)) for index in range(0, slices * 100, 100): track_slice = track_ids[index : index + 100] track_uris = [f"spotify:track:{track_id}" for track_id in track_slice] - total_tracks_to_add += len(track_uris) - add_jobs.append((created_playlist_id, track_uris)) + tracks_added += len(track_uris) + add_calls += 1 - add_duration = 0.0 - successful_add_calls = 0 - if add_jobs: - add_start = time.time() - add_semaphore = asyncio.Semaphore(ADD_SONGS_CONCURRENCY) - - async def add_song_batch(job): - playlist_id, track_uris = job async with add_semaphore: # Avoid "Index out of bounds" races by appending without explicit position. 
-                return await add_songs(playlist_id, track_uris, auth_token)
-
-        add_results = await asyncio.gather(*(add_song_batch(job) for job in add_jobs))
-        successful_add_calls = sum(1 for result in add_results if result)
-        add_duration = time.time() - add_start
+                add_call_start = time.time()
+                add_result = await add_songs(created_playlist_id, track_uris, auth_token)
+                add_duration += time.time() - add_call_start
+                if add_result:
+                    add_calls_successful += 1
+
+        return {
+            "cluster_created": 1,
+            "create_calls": 1,
+            "create_time": create_duration,
+            "add_calls": add_calls,
+            "add_calls_successful": add_calls_successful,
+            "tracks_added": tracks_added,
+            "add_time": add_duration,
+        }
+
+    cluster_results = await asyncio.gather(
+        *(process_cluster(candidate) for candidate in cluster_candidates)
+    )
+    clusters_created = sum(result["cluster_created"] for result in cluster_results)
+    create_calls = sum(result["create_calls"] for result in cluster_results)
+    create_duration = sum(result["create_time"] for result in cluster_results)
+    add_calls = sum(result["add_calls"] for result in cluster_results)
+    successful_add_calls = sum(
+        result["add_calls_successful"] for result in cluster_results
+    )
+    total_tracks_to_add = sum(result["tracks_added"] for result in cluster_results)
+    add_duration = sum(result["add_time"] for result in cluster_results)
+    pipeline_duration = time.time() - pipeline_start

     print(
         "Playlist write stats:",
         f"clusters_considered={len(sorted_clusters)},",
-        f"clusters_created={len(created_entries)},",
-        f"create_calls={len(cluster_candidates)},",
+        f"clusters_created={clusters_created},",
+        f"create_calls={create_calls},",
         f"create_time={create_duration:.2f}s,",
-        f"add_calls={len(add_jobs)},",
+        f"add_calls={add_calls},",
         f"add_calls_successful={successful_add_calls},",
         f"tracks_added={total_tracks_to_add},",
-        f"add_time={add_duration:.2f}s",
+        f"add_time={add_duration:.2f}s,",
+        f"pipeline_time={pipeline_duration:.2f}s",
     )
     log_step_time("Creating and populating cluster playlists", start_time)
@@ -248,15 +278,18 @@ async def process_single_playlist(auth_token, playlist_id, user_id):
     start_time = time.time()
     print(f"Processing {playlist_id}...")
     playlist_name_start = time.time()
-    playlist_name = get_playlist_name(playlist_id, auth_token)
-    log_step_time(
-        f"Fetch source playlist name ({playlist_id})",
-        playlist_name_start,
+    playlist_name_task = asyncio.create_task(
+        asyncio.to_thread(get_playlist_name, playlist_id, auth_token)
     )

     track_fetch_start = time.time()
     track_ids = await get_playlist_track_ids(auth_token, playlist_id)
     log_step_time(f"Collect playlist tracks ({playlist_id})", track_fetch_start)
+    playlist_name = await playlist_name_task
+    log_step_time(
+        f"Fetch source playlist name ({playlist_id})",
+        playlist_name_start,
+    )
     if not track_ids:
         print(f"No tracks found for playlist {playlist_id}")
         return
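PATCH 4/8 restructures the write phase from "create everything, then add everything" into one task per cluster that returns its own stats dict, with the dicts summed after the gather. A distilled sketch of that aggregation shape (names simplified; the sleep stands in for the create/add API calls):

    import asyncio

    async def process_one(job_id: int, semaphore: asyncio.Semaphore) -> dict:
        async with semaphore:
            await asyncio.sleep(0.1)  # stands in for create_playlist + add_songs
        return {"created": 1, "tracks_added": 10 * job_id}

    async def main() -> None:
        semaphore = asyncio.Semaphore(3)  # bounded like the create/add concurrency
        results = await asyncio.gather(
            *(process_one(job_id, semaphore) for job_id in range(5))
        )
        # Sum per-task dicts, mirroring how cluster_results is reduced.
        print("created:", sum(result["created"] for result in results))
        print("tracks_added:", sum(result["tracks_added"] for result in results))

    asyncio.run(main())

Returning per-task counters instead of mutating shared totals keeps the tasks independent and makes the final reduction trivially race-free.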
_env_positive_int("PLAYLIST_FETCH_PAGE_CONCURRENCY", 8) -CREATE_PLAYLIST_CONCURRENCY = _env_positive_int("PLAYLIST_CREATE_CONCURRENCY", 4) -ADD_SONGS_CONCURRENCY = _env_positive_int("PLAYLIST_ADD_CONCURRENCY", 12) +FETCH_PAGE_CONCURRENCY = _env_positive_int("PLAYLIST_FETCH_PAGE_CONCURRENCY", 12) +CREATE_PLAYLIST_CONCURRENCY = _env_positive_int("PLAYLIST_CREATE_CONCURRENCY", 6) +ADD_SONGS_CONCURRENCY = _env_positive_int("PLAYLIST_ADD_CONCURRENCY", 16) def log_step_time(step_name, start_time): @@ -83,7 +83,8 @@ async def get_playlist_track_ids(auth_token, playlist_id): slices = calc_slices(total_tracks) responses = [first_page_response] if first_page_response else [] offsets = list(range(100, slices * 100, 100)) - semaphore = asyncio.Semaphore(FETCH_PAGE_CONCURRENCY) + fetch_concurrency = max(1, min(FETCH_PAGE_CONCURRENCY, len(offsets) or 1)) + semaphore = asyncio.Semaphore(fetch_concurrency) async def fetch_page(offset): async with semaphore: @@ -186,8 +187,13 @@ async def create_and_populate_cluster_playlists( return pipeline_start = time.time() - create_semaphore = asyncio.Semaphore(CREATE_PLAYLIST_CONCURRENCY) - add_semaphore = asyncio.Semaphore(ADD_SONGS_CONCURRENCY) + create_concurrency = max(1, min(CREATE_PLAYLIST_CONCURRENCY, len(cluster_candidates))) + create_semaphore = asyncio.Semaphore(create_concurrency) + total_add_jobs = sum( + calc_slices(len(candidate["track_ids"])) for candidate in cluster_candidates + ) + add_concurrency = max(1, min(ADD_SONGS_CONCURRENCY, total_add_jobs or 1)) + add_semaphore = asyncio.Semaphore(add_concurrency) async def process_cluster(candidate): create_duration = 0.0 @@ -261,6 +267,8 @@ async def process_cluster(candidate): "Playlist write stats:", f"clusters_considered={len(sorted_clusters)},", f"clusters_created={clusters_created},", + f"create_concurrency={create_concurrency},", + f"add_concurrency={add_concurrency},", f"create_calls={create_calls},", f"create_time={create_duration:.2f}s,", f"add_calls={add_calls},", diff --git a/Backend/spotify_api.py b/Backend/spotify_api.py index 9c363c1..4f89878 100644 --- a/Backend/spotify_api.py +++ b/Backend/spotify_api.py @@ -311,9 +311,9 @@ def get_playlist_name(playlist_id, auth_token): async def get_playlist_children(start_index, playlist_id, auth_token, include_total=False): """Return one page of playlist tracks using offset pagination.""" - fields = "items(track(id,uri))" + fields = "items(track(id))" if include_total: - fields = "total,items(track(id,uri))" + fields = "total,items(track(id))" endpoint = f"/playlists/{playlist_id}/tracks" params = { "offset": start_index, From fe71db3786baee8b916f9504eee64801b309c01d Mon Sep 17 00:00:00 2001 From: Stanley Good Date: Mon, 9 Mar 2026 20:02:38 -0700 Subject: [PATCH 6/8] L1 cache added --- Backend/track_cache.py | 181 +++++++++++++++++++++++++++++++++++------ 1 file changed, 157 insertions(+), 24 deletions(-) diff --git a/Backend/track_cache.py b/Backend/track_cache.py index e8121e9..368cdb0 100644 --- a/Backend/track_cache.py +++ b/Backend/track_cache.py @@ -1,5 +1,6 @@ """Postgres-backed shared cache for track audio features and known misses.""" +from collections import OrderedDict import json import os import re @@ -43,6 +44,15 @@ def _parse_pool_size(env_name: str, default_value: int) -> int: return max(1, parsed) +def _parse_non_negative_int(env_name: str, default_value: int) -> int: + """Parse non-negative integer setting with fallback.""" + raw_value = os.getenv(env_name, str(default_value)).strip() + if not raw_value.lstrip("-").isdigit(): + return 
From fe71db3786baee8b916f9504eee64801b309c01d Mon Sep 17 00:00:00 2001
From: Stanley Good
Date: Mon, 9 Mar 2026 20:02:38 -0700
Subject: [PATCH 6/8] add in-process L1 cache for track features

---
 Backend/track_cache.py | 181 +++++++++++++++++++++++++++++++++++------
 1 file changed, 157 insertions(+), 24 deletions(-)

diff --git a/Backend/track_cache.py b/Backend/track_cache.py
index e8121e9..368cdb0 100644
--- a/Backend/track_cache.py
+++ b/Backend/track_cache.py
@@ -1,5 +1,6 @@
 """Postgres-backed shared cache for track audio features and known misses."""

+from collections import OrderedDict
 import json
 import os
 import re
@@ -43,6 +44,15 @@ def _parse_pool_size(env_name: str, default_value: int) -> int:
     return max(1, parsed)


+def _parse_non_negative_int(env_name: str, default_value: int) -> int:
+    """Parse non-negative integer setting with fallback."""
+    raw_value = os.getenv(env_name, str(default_value)).strip()
+    if not raw_value.removeprefix("-").isdigit():
+        return default_value
+    parsed = int(raw_value)
+    return max(0, parsed)
+
+
 CACHE_TTL_SECONDS = _parse_ttl_seconds(
     "TRACK_CACHE_TTL_SECONDS", 30 * 24 * 60 * 60
 )
@@ -51,11 +61,18 @@ def _parse_pool_size(env_name: str, default_value: int) -> int:
 )
 POOL_MIN_CONN = _parse_pool_size("TRACK_CACHE_DB_POOL_MIN", DEFAULT_POOL_MIN_CONN)
 POOL_MAX_CONN = _parse_pool_size("TRACK_CACHE_DB_POOL_MAX", DEFAULT_POOL_MAX_CONN)
+L1_CACHE_MAX_ENTRIES = _parse_non_negative_int("TRACK_CACHE_L1_MAX_ENTRIES", 25000)
+L1_FEATURE_TTL_SECONDS = _parse_non_negative_int("TRACK_CACHE_L1_FEATURE_TTL_SECONDS", 0)
+L1_MISS_TTL_SECONDS = _parse_non_negative_int(
+    "TRACK_CACHE_L1_MISS_TTL_SECONDS", 24 * 60 * 60
+)

 _POOL = None
 _POOL_LOCK = threading.Lock()
 _SCHEMA_READY = False
 _SCHEMA_LOCK = threading.Lock()
+_L1_CACHE: OrderedDict[str, tuple[bool, Any, float | None]] = OrderedDict()
+_L1_CACHE_LOCK = threading.Lock()


 def _cache_enabled() -> bool:
@@ -68,6 +85,95 @@ def _is_valid_track_id(track_id: str | None) -> bool:
     return bool(track_id) and bool(SPOTIFY_TRACK_ID_PATTERN.fullmatch(track_id))


+def _l1_cache_enabled() -> bool:
+    """Return whether process-local L1 cache is enabled."""
+    return L1_CACHE_MAX_ENTRIES > 0
+
+
+def _l1_expiry_from_ttl(ttl_seconds: int) -> float | None:
+    """Return absolute expiry for a TTL, or None for non-expiring entries."""
+    if ttl_seconds <= 0:
+        return None
+    return time.time() + ttl_seconds
+
+
+def _l1_insert_entry_locked(
+    track_id: str, known_missing: bool, payload: Any, expires_at: float | None
+) -> None:
+    """Upsert one L1 entry and enforce global max-entry cap."""
+    _L1_CACHE[track_id] = (known_missing, payload, expires_at)
+    _L1_CACHE.move_to_end(track_id)
+    while len(_L1_CACHE) > L1_CACHE_MAX_ENTRIES:
+        _L1_CACHE.popitem(last=False)
+
+
+def _l1_store_features(features_by_track_id: dict[str, dict[str, Any]]) -> None:
+    """Store feature payloads in L1 cache."""
+    if not _l1_cache_enabled() or not features_by_track_id:
+        return
+    expires_at = _l1_expiry_from_ttl(L1_FEATURE_TTL_SECONDS)
+    with _L1_CACHE_LOCK:
+        for track_id, payload in features_by_track_id.items():
+            normalized = dict(payload)
+            normalized["id"] = track_id
+            _l1_insert_entry_locked(track_id, False, normalized, expires_at)
+
+
+def _l1_store_misses(miss_reasons_by_track_id: dict[str, str]) -> None:
+    """Store known-miss reasons in L1 cache."""
+    if not _l1_cache_enabled() or not miss_reasons_by_track_id:
+        return
+    expires_at = _l1_expiry_from_ttl(L1_MISS_TTL_SECONDS)
+    with _L1_CACHE_LOCK:
+        for track_id, reason in miss_reasons_by_track_id.items():
+            reason_text = reason if reason else "known_missing_cached"
+            _l1_insert_entry_locked(track_id, True, reason_text, expires_at)
+
+
+def _l1_lookup_track_ids(
+    track_ids: list[str],
+) -> tuple[dict[str, dict[str, Any]], dict[str, str], list[str]]:
+    """Return L1 hits and unresolved IDs for DB fallback lookup."""
+    if not _l1_cache_enabled() or not track_ids:
+        return {}, {}, track_ids
+
+    now = time.time()
+    features_by_track_id: dict[str, dict[str, Any]] = {}
+    misses_by_track_id: dict[str, str] = {}
+    unresolved_track_ids: list[str] = []
+
+    with _L1_CACHE_LOCK:
+        for track_id in track_ids:
+            cached_entry = _L1_CACHE.get(track_id)
+            if cached_entry is None:
+                unresolved_track_ids.append(track_id)
+                continue
+
+            known_missing, payload, expires_at = cached_entry
+            if expires_at is not None and expires_at < now:
+                _L1_CACHE.pop(track_id, None)
+                unresolved_track_ids.append(track_id)
+                continue
+
+            _L1_CACHE.move_to_end(track_id)
+            if known_missing:
+                misses_by_track_id[track_id] = (
+                    str(payload) if payload else "known_missing_cached"
+                )
+                continue
+
+            if not isinstance(payload, dict):
+                _L1_CACHE.pop(track_id, None)
+                unresolved_track_ids.append(track_id)
+                continue
+
+            normalized = dict(payload)
+            normalized["id"] = track_id
+            features_by_track_id[track_id] = normalized
+
+    return features_by_track_id, misses_by_track_id, unresolved_track_ids
+
+
 def _pool_bounds() -> tuple[int, int]:
     """Return valid min/max pool bounds."""
     min_conn = max(1, POOL_MIN_CONN)
@@ -167,7 +273,7 @@ def get_cached_track_features(
     track_ids: list[str],
 ) -> tuple[dict[str, dict[str, Any]], dict[str, str]]:
     """Return cached features and known-miss reasons for provided track IDs."""
-    if not _cache_enabled() or not track_ids:
+    if not track_ids:
         return {}, {}
     valid_track_ids = [
         track_id for track_id in track_ids if _is_valid_track_id(track_id)
@@ -175,17 +281,23 @@ def get_cached_track_features(
     if not valid_track_ids:
         return {}, {}

+    features_by_track_id, misses_by_track_id, unresolved_track_ids = _l1_lookup_track_ids(
+        valid_track_ids
+    )
+    if not unresolved_track_ids or not _cache_enabled():
+        return features_by_track_id, misses_by_track_id
+
     _ensure_schema()
     now = int(time.time())
     min_feature_updated_at = now - CACHE_TTL_SECONDS
     min_miss_updated_at = now - MISS_TTL_SECONDS

-    features_by_track_id: dict[str, dict[str, Any]] = {}
-    misses_by_track_id: dict[str, str] = {}
+    db_features_by_track_id: dict[str, dict[str, Any]] = {}
+    db_misses_by_track_id: dict[str, str] = {}

     pool, connection = _acquire_connection()
     if connection is None:
-        return {}, {}
+        return features_by_track_id, misses_by_track_id
     try:
         with connection.cursor() as cursor:
             cursor.execute(
@@ -199,14 +311,17 @@ def get_cached_track_features(
                     (known_missing = TRUE AND updated_at >= %s)
                 )
                 """,
-                (valid_track_ids, min_feature_updated_at, min_miss_updated_at),
+                (unresolved_track_ids, min_feature_updated_at, min_miss_updated_at),
             )
             rows = cursor.fetchall()
             for track_id, payload_json, known_missing, miss_reason in rows:
+                track_id_text = str(track_id)
                 if known_missing:
-                    misses_by_track_id[str(track_id)] = (
+                    miss_reason_text = (
                         str(miss_reason) if miss_reason else "known_missing_cached"
                     )
+                    misses_by_track_id[track_id_text] = miss_reason_text
+                    db_misses_by_track_id[track_id_text] = miss_reason_text
                     continue

                 if not payload_json:
@@ -216,11 +331,14 @@ def get_cached_track_features(
             except (TypeError, json.JSONDecodeError):
                 continue
             if isinstance(payload, dict):
-                payload["id"] = str(track_id)
-                features_by_track_id[str(track_id)] = payload
+                payload["id"] = track_id_text
+                features_by_track_id[track_id_text] = payload
+                db_features_by_track_id[track_id_text] = payload
     finally:
         _release_connection(pool, connection)

+    _l1_store_features(db_features_by_track_id)
+    _l1_store_misses(db_misses_by_track_id)
     return features_by_track_id, misses_by_track_id


@@ -228,11 +346,12 @@
 def cache_track_features(
     features: list[dict[str, Any]], source: str = "reccobeats"
 ) -> None:
     """Upsert resolved track features into cache."""
-    if not _cache_enabled() or not features:
+    if not features:
         return
-    _ensure_schema()
-    now = int(time.time())
+    db_cache_enabled = _cache_enabled()
+    now = int(time.time()) if db_cache_enabled else 0
+    l1_features_by_track_id: dict[str, dict[str, Any]] = {}
     rows: list[tuple[str, str, bool, str, str, int]] = []
     for feature in features:
         track_id = feature.get("id")
@@ -240,20 +359,25 @@ def cache_track_features(
             continue
         payload = dict(feature)
         payload["id"] = str(track_id)
-        rows.append(
-            (
-                str(track_id),
-                json.dumps(payload, separators=(",", ":")),
-                False,
-                "",
-                source,
-                now,
+        l1_features_by_track_id[str(track_id)] = payload
+        if db_cache_enabled:
+            rows.append(
+                (
+                    str(track_id),
+                    json.dumps(payload, separators=(",", ":")),
+                    False,
+                    "",
+                    source,
+                    now,
+                )
             )
-        )

-    if not rows:
+    _l1_store_features(l1_features_by_track_id)
+    if not db_cache_enabled or not rows:
         return

+    _ensure_schema()
+
     pool, connection = _acquire_connection()
     if connection is None:
         return
@@ -283,15 +407,24 @@ def cache_track_features(

 def cache_known_misses(miss_reasons_by_track_id: dict[str, str]) -> None:
     """Upsert known misses so future runs can skip repeated failed lookups."""
-    if not _cache_enabled() or not miss_reasons_by_track_id:
+    if not miss_reasons_by_track_id:
+        return
+
+    db_cache_enabled = _cache_enabled()
+    valid_miss_reasons = {
+        track_id: str(reason)
+        for track_id, reason in miss_reasons_by_track_id.items()
+        if _is_valid_track_id(track_id)
+    }
+    _l1_store_misses(valid_miss_reasons)
+    if not db_cache_enabled:
         return

     _ensure_schema()
     now = int(time.time())
     rows = [
         (track_id, None, True, reason, "known_miss", now)
-        for track_id, reason in miss_reasons_by_track_id.items()
-        if _is_valid_track_id(track_id)
+        for track_id, reason in valid_miss_reasons.items()
     ]
     if not rows:
         return
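The L1 cache introduced in PATCH 6/8 is an OrderedDict doing double duty as an LRU (move_to_end on touch, popitem(last=False) on overflow) with an optional per-entry expiry checked lazily on read. A compact sketch of that exact mechanism (MAX_ENTRIES and the helpers are illustrative, not the patch's API):

    import time
    from collections import OrderedDict

    MAX_ENTRIES = 3
    cache: OrderedDict[str, tuple[str, float | None]] = OrderedDict()

    def put(key: str, value: str, ttl: float | None = None) -> None:
        cache[key] = (value, time.time() + ttl if ttl else None)
        cache.move_to_end(key)            # newest entries live at the right end
        while len(cache) > MAX_ENTRIES:
            cache.popitem(last=False)     # evict the least recently used entry

    def get(key: str) -> str | None:
        entry = cache.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if expires_at is not None and expires_at < time.time():
            cache.pop(key, None)          # expired entries are dropped on read
            return None
        cache.move_to_end(key)            # reads refresh recency
        return value

    put("a", "1"); put("b", "2"); put("c", "3")
    get("a")            # touching "a" makes it most recently used
    put("d", "4")       # evicts "b", the least recently used
    print(list(cache))  # ['c', 'a', 'd']

A TTL of zero mapping to "never expires" matches the patch's _l1_expiry_from_ttl convention for feature entries.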
-CMD ["gunicorn", "--bind", "0.0.0.0:8080", "app:app"] +CMD ["gunicorn", "--bind", "0.0.0.0:8080", "--workers", "2", "--worker-class", "gthread", "--threads", "4", "app:app"] diff --git a/Backend/app.py b/Backend/app.py index 47a8ccd..01319a2 100644 --- a/Backend/app.py +++ b/Backend/app.py @@ -24,6 +24,11 @@ ) from Backend.playlist_processing import process_all from Backend.helpers import generate_random_string + from Backend.job_status_store import ( + set_job_state, + get_job_state, + prune_finished_jobs_older_than, + ) except ModuleNotFoundError: from spotify_api import ( # type: ignore is_access_token_valid, @@ -35,6 +40,11 @@ ) from playlist_processing import process_all # type: ignore from helpers import generate_random_string # type: ignore + from job_status_store import ( # type: ignore + set_job_state, + get_job_state, + prune_finished_jobs_older_than, + ) load_dotenv(Path(__file__).resolve().parent / ".env") @@ -46,6 +56,36 @@ def parse_bool(value: str | None, default: bool = False) -> bool: return value.strip().lower() in {"1", "true", "yes", "on"} +REQUIRED_SPOTIFY_SCOPES = { + "user-read-private", + "playlist-read-private", + "playlist-modify-public", + "playlist-modify-private", +} +SPOTIFY_LOGIN_SCOPE = ( + "user-read-private playlist-read-private " + "playlist-modify-public playlist-modify-private" +) + + +def _parse_scope_set(scope_value) -> set[str]: + """Normalize stored Spotify scope value into a scope-name set.""" + if isinstance(scope_value, str): + tokens = scope_value.replace(",", " ").split() + return {token.strip() for token in tokens if token.strip()} + + if isinstance(scope_value, list): + return {str(token).strip() for token in scope_value if str(token).strip()} + + return set() + + +def _missing_required_scopes() -> list[str]: + """Return required Spotify scopes missing from current session.""" + granted_scopes = _parse_scope_set(session.get("auth_scopes")) + return sorted(REQUIRED_SPOTIFY_SCOPES - granted_scopes) + + frontend_url = os.getenv("FRONTEND_URL", "http://127.0.0.1:3000").rstrip("/") cors_origins = [ origin.strip() @@ -78,8 +118,6 @@ def parse_bool(value: str | None, default: bool = False) -> bool: ) JOB_STATUS_TTL_SECONDS = int(os.getenv("JOB_STATUS_TTL_SECONDS", "21600")) -_PROCESSING_JOBS: dict[str, dict[str, object]] = {} -_PROCESSING_JOBS_LOCK = threading.Lock() def get_auth_token_from_request(): @@ -88,24 +126,14 @@ def get_auth_token_from_request(): def _prune_old_jobs(): - """Drop old completed jobs to keep in-memory status map bounded.""" + """Drop old completed jobs to keep status store bounded.""" cutoff = time.time() - JOB_STATUS_TTL_SECONDS - with _PROCESSING_JOBS_LOCK: - old_job_ids = [ - job_id - for job_id, payload in _PROCESSING_JOBS.items() - if payload.get("finished_at") and float(payload["finished_at"]) < cutoff - ] - for job_id in old_job_ids: - _PROCESSING_JOBS.pop(job_id, None) + prune_finished_jobs_older_than(cutoff) def _set_job_state(job_id: str, **fields): - """Update one job status entry in a thread-safe way.""" - with _PROCESSING_JOBS_LOCK: - payload = _PROCESSING_JOBS.get(job_id, {}) - payload.update(fields) - _PROCESSING_JOBS[job_id] = payload + """Update one job status entry in shared status store.""" + set_job_state(job_id, **fields) def _run_process_playlist_job(job_id: str, auth_token: str, playlist_ids: list[str]): @@ -151,6 +179,14 @@ def login_handler(): refresh_token = session.get("refresh_token") if uid: + missing_scopes = _missing_required_scopes() + if missing_scopes: + print( + "Session missing required 
Spotify scopes.", + f"missing={','.join(missing_scopes)}", + ) + return redirect_to_spotify_login() + if not auth_token: return redirect_to_spotify_login() @@ -178,12 +214,11 @@ def redirect_to_spotify_login(): state = generate_random_string(16) session["oauth_state"] = state - scope = "user-read-private playlist-modify-public playlist-read-private" params = { "response_type": "code", "client_id": client_id, - "scope": scope, + "scope": SPOTIFY_LOGIN_SCOPE, "show_dialog": "true", "redirect_uri": redirect_uri, "state": state, @@ -220,6 +255,7 @@ def callback_handler(): session["uid"] = user_id session["auth_token"] = auth_token session["refresh_token"] = token_data.get("refresh_token") + session["auth_scopes"] = sorted(_parse_scope_set(token_data.get("scope", ""))) return redirect(f"{frontend_url}/input-playlist") @@ -248,9 +284,22 @@ def process_playlist_handler(): """Start async processing job for selected playlists.""" auth_token = get_auth_token_from_request() - if not auth_token or not is_access_token_valid(auth_token): + if not auth_token: return "Authorization required", 401 + missing_scopes = _missing_required_scopes() + if missing_scopes: + return ( + jsonify( + { + "Code": 403, + "Error": "Insufficient Spotify scopes. Please re-login.", + "missingScopes": missing_scopes, + } + ), + 403, + ) + assert request.json playlist_ids = request.json.get("playlistIds", []) @@ -281,8 +330,7 @@ def process_playlist_handler(): @app.route("/api/process-playlist-status/") def process_playlist_status_handler(job_id): """Return current status for an async playlist processing job.""" - with _PROCESSING_JOBS_LOCK: - payload = _PROCESSING_JOBS.get(job_id) + payload = get_job_state(job_id) if not payload: return jsonify({"Code": 404, "Error": "Job not found"}), 404 return jsonify(payload), 200 diff --git a/Backend/job_status_store.py b/Backend/job_status_store.py new file mode 100644 index 0000000..fc64a95 --- /dev/null +++ b/Backend/job_status_store.py @@ -0,0 +1,305 @@ +"""Shared job-status persistence for async playlist processing jobs.""" + +import json +import os +import threading +import time +from typing import Any + +DEFAULT_POOL_MIN_CONN = 1 +DEFAULT_POOL_MAX_CONN = 4 +JOB_STATUS_TABLE_NAME = "playlist_processing_job_status" + +_POOL = None +_POOL_LOCK = threading.Lock() +_SCHEMA_READY = False +_SCHEMA_LOCK = threading.Lock() + +_MEMORY_JOBS: dict[str, dict[str, Any]] = {} +_MEMORY_JOBS_LOCK = threading.Lock() + + +def _parse_positive_int(name: str, default_value: int) -> int: + """Parse positive integer env values with fallback.""" + raw_value = os.getenv(name, str(default_value)).strip() + if not raw_value.lstrip("-").isdigit(): + return default_value + parsed = int(raw_value) + return parsed if parsed > 0 else 1 + + +JOB_STATUS_DB_POOL_MIN = _parse_positive_int( + "JOB_STATUS_DB_POOL_MIN", DEFAULT_POOL_MIN_CONN +) +JOB_STATUS_DB_POOL_MAX = _parse_positive_int( + "JOB_STATUS_DB_POOL_MAX", DEFAULT_POOL_MAX_CONN +) + + +def _database_url() -> str: + """Return validated DATABASE_URL, if set.""" + db_url = os.getenv("DATABASE_URL", "").strip() + if not db_url: + return "" + if "\x00" in db_url: + raise ValueError("Invalid DATABASE_URL") + return db_url + + +def _db_enabled() -> bool: + """Return whether DB-backed shared status storage is enabled.""" + enabled_text = os.getenv("JOB_STATUS_DB_ENABLED", "true").strip().lower() + return enabled_text in {"1", "true", "yes", "on"} and bool(_database_url()) + + +def _pool_bounds() -> tuple[int, int]: + """Return valid min/max connection pool bounds.""" 
+    min_conn = max(1, JOB_STATUS_DB_POOL_MIN)
+    max_conn = max(min_conn, JOB_STATUS_DB_POOL_MAX)
+    return min_conn, max_conn
+
+
+def _load_pool_class():
+    """Load psycopg2 threaded connection pool lazily."""
+    pool_module = __import__("psycopg2.pool", fromlist=["ThreadedConnectionPool"])
+    return pool_module.ThreadedConnectionPool
+
+
+def _get_pool():
+    """Create (or return) pooled DB connector."""
+    if not _db_enabled():
+        return None
+
+    global _POOL  # pylint: disable=global-statement
+    if _POOL is not None:
+        return _POOL
+
+    with _POOL_LOCK:
+        if _POOL is not None:
+            return _POOL
+        min_conn, max_conn = _pool_bounds()
+        pool_class = _load_pool_class()
+        _POOL = pool_class(
+            min_conn,
+            max_conn,
+            _database_url(),
+            connect_timeout=10,
+        )
+    return _POOL
+
+
+def _acquire_connection():
+    """Acquire one connection from DB pool."""
+    pool = _get_pool()
+    if pool is None:
+        return None, None
+    return pool, pool.getconn()
+
+
+def _release_connection(pool, connection) -> None:
+    """Return connection to DB pool."""
+    if pool is None or connection is None:
+        return
+    pool.putconn(connection)
+
+
+def _ensure_schema() -> None:
+    """Create DB table/indexes for shared job-status storage."""
+    global _SCHEMA_READY  # pylint: disable=global-statement
+    if not _db_enabled() or _SCHEMA_READY:
+        return
+
+    with _SCHEMA_LOCK:
+        if _SCHEMA_READY:
+            return
+        pool, connection = _acquire_connection()
+        if connection is None:
+            return
+        try:
+            with connection:
+                with connection.cursor() as cursor:
+                    cursor.execute(
+                        f"""
+                        CREATE TABLE IF NOT EXISTS {JOB_STATUS_TABLE_NAME} (
+                            job_id TEXT PRIMARY KEY,
+                            payload_json TEXT NOT NULL,
+                            finished_at DOUBLE PRECISION,
+                            updated_at DOUBLE PRECISION NOT NULL
+                        )
+                        """
+                    )
+                    cursor.execute(
+                        f"""
+                        CREATE INDEX IF NOT EXISTS idx_{JOB_STATUS_TABLE_NAME}_finished_at
+                        ON {JOB_STATUS_TABLE_NAME} (finished_at)
+                        """
+                    )
+            _SCHEMA_READY = True
+        finally:
+            _release_connection(pool, connection)
+
+
+def _set_memory_job_state(job_id: str, **fields) -> None:
+    """Set job state in process-local fallback store."""
+    with _MEMORY_JOBS_LOCK:
+        payload = _MEMORY_JOBS.get(job_id, {})
+        payload.update(fields)
+        _MEMORY_JOBS[job_id] = payload
+
+
+def _get_memory_job_state(job_id: str) -> dict[str, Any] | None:
+    """Read job state from process-local fallback store."""
+    with _MEMORY_JOBS_LOCK:
+        payload = _MEMORY_JOBS.get(job_id)
+        if payload is None:
+            return None
+        return dict(payload)
+
+
+def _prune_memory_jobs(cutoff: float) -> int:
+    """Prune old completed jobs from process-local fallback store."""
+    removed = 0
+    with _MEMORY_JOBS_LOCK:
+        old_job_ids = []
+        for job_id, payload in _MEMORY_JOBS.items():
+            finished_at = payload.get("finished_at")
+            if isinstance(finished_at, (int, float)) and float(finished_at) < cutoff:
+                old_job_ids.append(job_id)
+        for job_id in old_job_ids:
+            _MEMORY_JOBS.pop(job_id, None)
+            removed += 1
+    return removed
+
+
+def _finished_at_from_fields(fields: dict[str, Any]) -> float | None:
+    """Normalize finished_at field value when present."""
+    if "finished_at" not in fields:
+        return None
+    finished_at = fields.get("finished_at")
+    if finished_at is None:
+        return None
+    if isinstance(finished_at, (int, float)):
+        return float(finished_at)
+    return None
+
+
+def set_job_state(job_id: str, **fields) -> None:
+    """Upsert shared job state for a single job ID."""
+    if not job_id:
+        return
+
+    if not _db_enabled():
+        _set_memory_job_state(job_id, **fields)
+        return
+
+    _ensure_schema()
+    pool, connection = _acquire_connection()
+    if connection is None:
+        _set_memory_job_state(job_id, **fields)
+        return
+    try:
+        now = float(time.time())
+        payload_json = json.dumps(fields, separators=(",", ":"))
+        finished_at = _finished_at_from_fields(fields)
+        with connection:
+            with connection.cursor() as cursor:
+                cursor.execute(
+                    f"""
+                    INSERT INTO {JOB_STATUS_TABLE_NAME}
+                        (job_id, payload_json, finished_at, updated_at)
+                    VALUES (%s, %s, %s, %s)
+                    ON CONFLICT(job_id) DO UPDATE SET
+                        payload_json=(
+                            (
+                                COALESCE({JOB_STATUS_TABLE_NAME}.payload_json, '{{}}')::jsonb
+                                || EXCLUDED.payload_json::jsonb
+                            )::text
+                        ),
+                        finished_at=(
+                            CASE
+                                WHEN EXCLUDED.payload_json::jsonb ? 'finished_at'
+                                THEN EXCLUDED.finished_at
+                                ELSE {JOB_STATUS_TABLE_NAME}.finished_at
+                            END
+                        ),
+                        updated_at=EXCLUDED.updated_at
+                    """,
+                    (job_id, payload_json, finished_at, now),
+                )
+    except Exception as error:  # pylint: disable=broad-exception-caught
+        print(f"Job status DB write failed, using in-memory fallback: {error}")
+        _set_memory_job_state(job_id, **fields)
+    finally:
+        _release_connection(pool, connection)
+
+
+def get_job_state(job_id: str) -> dict[str, Any] | None:
+    """Return shared job status payload for one job ID."""
+    if not job_id:
+        return None
+
+    if not _db_enabled():
+        return _get_memory_job_state(job_id)
+
+    _ensure_schema()
+    pool, connection = _acquire_connection()
+    if connection is None:
+        return _get_memory_job_state(job_id)
+    try:
+        with connection.cursor() as cursor:
+            cursor.execute(
+                f"""
+                SELECT payload_json
+                FROM {JOB_STATUS_TABLE_NAME}
+                WHERE job_id = %s
+                """,
+                (job_id,),
+            )
+            row = cursor.fetchone()
+            if not row:
+                return _get_memory_job_state(job_id)
+
+            payload_json = row[0]
+            if not payload_json:
+                return None
+            payload = json.loads(payload_json)
+            if isinstance(payload, dict):
+                return payload
+            return None
+    except (TypeError, json.JSONDecodeError) as error:
+        print(f"Job status DB payload decode failed: {error}")
+        return None
+    except Exception as error:  # pylint: disable=broad-exception-caught
+        print(f"Job status DB read failed, using in-memory fallback: {error}")
+        return _get_memory_job_state(job_id)
+    finally:
+        _release_connection(pool, connection)
+
+
+def prune_finished_jobs_older_than(cutoff: float) -> int:
+    """Delete completed jobs older than cutoff timestamp."""
+    if not _db_enabled():
+        return _prune_memory_jobs(cutoff)
+
+    _ensure_schema()
+    pool, connection = _acquire_connection()
+    if connection is None:
+        return _prune_memory_jobs(cutoff)
+    try:
+        with connection:
+            with connection.cursor() as cursor:
+                cursor.execute(
+                    f"""
+                    DELETE FROM {JOB_STATUS_TABLE_NAME}
+                    WHERE finished_at IS NOT NULL
+                      AND finished_at < %s
+                    """,
+                    (cutoff,),
+                )
+                deleted = cursor.rowcount
+        return int(deleted)
+    except Exception as error:  # pylint: disable=broad-exception-caught
+        print(f"Job status DB prune failed, using in-memory fallback: {error}")
+        return _prune_memory_jobs(cutoff)
+    finally:
+        _release_connection(pool, connection)
diff --git a/Backend/spotify_api.py b/Backend/spotify_api.py
index 4f89878..9dfac36 100644
--- a/Backend/spotify_api.py
+++ b/Backend/spotify_api.py
@@ -53,6 +53,14 @@ def _env_positive_int(name: str, default_value: int) -> int:
 _SPOTIFY_SESSION_LOCK = threading.Lock()


+def _trim_response_text(text: str, max_length: int = 300) -> str:
+    """Return one-line response preview to keep logs readable."""
+    normalized = " ".join(str(text).split())
+    if len(normalized) <= max_length:
+        return normalized
+    return normalized[: max_length - 3] + "..."
+
+
 def _reserve_spotify_slot_delay() -> float:
     """Reserve the next Spotify request slot and return required sleep time."""
     global _SPOTIFY_NEXT_ALLOWED_TS  # pylint: disable=global-statement
@@ -182,7 +190,25 @@ def spotify_request(
             continue

         if response.status_code >= 400:
-            print(f"Spotify API request error: {response.status_code}, {response.text}")
+            response_preview = _trim_response_text(response.text)
+            print(
+                "Spotify API request error:",
+                f"status={response.status_code},",
+                f"method={method},",
+                f"endpoint={endpoint},",
+                f"response={response_preview}",
+            )
+            if (
+                response.status_code == 403
+                and "insufficient client scope" in response_preview.lower()
+            ):
+                print(
+                    "Spotify scope failure:",
+                    f"method={method},",
+                    f"endpoint={endpoint},",
+                    "hint=Re-authenticate with playlist-modify-public and "
+                    "playlist-modify-private scopes.",
+                )
             return {}
         return response.json()

From b03d5abf33068d9ff92af5253862fdfe3f71bd1f Mon Sep 17 00:00:00 2001
From: Stanley Good
Date: Mon, 9 Mar 2026 23:20:19 -0700
Subject: [PATCH 8/8] lint

---
 Backend/job_status_store.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/Backend/job_status_store.py b/Backend/job_status_store.py
index fc64a95..36aafdb 100644
--- a/Backend/job_status_store.py
+++ b/Backend/job_status_store.py
@@ -1,5 +1,8 @@
 """Shared job-status persistence for async playlist processing jobs."""

+# Reuses connection-pool/schema patterns from track_cache intentionally.
+# pylint: disable=duplicate-code
+
 import json
 import os
 import threading
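A closing note on the scope enforcement added in PATCH 7/8: the callback stores the granted scopes in the session, and both login and job submission recompute the missing set, so the check reduces to set arithmetic. A standalone sketch (the names mirror app.py; the dict stands in for Flask's session):

    REQUIRED_SPOTIFY_SCOPES = {
        "user-read-private",
        "playlist-read-private",
        "playlist-modify-public",
        "playlist-modify-private",
    }

    def parse_scope_set(scope_value) -> set[str]:
        # Spotify returns scopes space-separated; tolerate commas and lists too.
        if isinstance(scope_value, str):
            tokens = scope_value.replace(",", " ").split()
            return {token for token in tokens if token}
        if isinstance(scope_value, list):
            return {str(token).strip() for token in scope_value if str(token).strip()}
        return set()

    session = {"auth_scopes": "user-read-private playlist-read-private"}
    granted = parse_scope_set(session.get("auth_scopes"))
    missing = sorted(REQUIRED_SPOTIFY_SCOPES - granted)
    print(missing)  # ['playlist-modify-private', 'playlist-modify-public']

Sessions created before this series only ever asked for playlist-modify-public, so they will report playlist-modify-private as missing and be routed back through login, which lines up with the show_dialog consent prompt already present in the login flow.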