diff --git a/Backend/app.py b/Backend/app.py index 347aefa..47a8ccd 100644 --- a/Backend/app.py +++ b/Backend/app.py @@ -1,6 +1,9 @@ """Flask API entrypoint for Spotify auth and playlist processing routes.""" import os +import threading +import time +import uuid from datetime import timedelta from pathlib import Path from urllib.parse import urlencode @@ -74,12 +77,57 @@ def parse_bool(value: str | None, default: bool = False) -> bool: supports_credentials=True, ) +JOB_STATUS_TTL_SECONDS = int(os.getenv("JOB_STATUS_TTL_SECONDS", "21600")) +_PROCESSING_JOBS: dict[str, dict[str, object]] = {} +_PROCESSING_JOBS_LOCK = threading.Lock() + def get_auth_token_from_request(): """Return auth token from request cookies, falling back to server session.""" return session.get("auth_token") or request.cookies.get("auth_token") +def _prune_old_jobs(): + """Drop old completed jobs to keep in-memory status map bounded.""" + cutoff = time.time() - JOB_STATUS_TTL_SECONDS + with _PROCESSING_JOBS_LOCK: + old_job_ids = [ + job_id + for job_id, payload in _PROCESSING_JOBS.items() + if payload.get("finished_at") and float(payload["finished_at"]) < cutoff + ] + for job_id in old_job_ids: + _PROCESSING_JOBS.pop(job_id, None) + + +def _set_job_state(job_id: str, **fields): + """Update one job status entry in a thread-safe way.""" + with _PROCESSING_JOBS_LOCK: + payload = _PROCESSING_JOBS.get(job_id, {}) + payload.update(fields) + _PROCESSING_JOBS[job_id] = payload + + +def _run_process_playlist_job(job_id: str, auth_token: str, playlist_ids: list[str]): + """Run playlist processing in background and persist status fields.""" + _set_job_state(job_id, status="running", started_at=time.time()) + try: + process_all(auth_token, playlist_ids) + _set_job_state( + job_id, + status="succeeded", + finished_at=time.time(), + error=None, + ) + except Exception as error: # pylint: disable=broad-exception-caught + _set_job_state( + job_id, + status="failed", + finished_at=time.time(), + error=str(error), 
+ ) + + @app.after_request def add_security_headers(response): """Apply baseline security headers for browser-facing responses.""" @@ -197,7 +245,7 @@ def get_playlist_handler(): @app.route("/process-playlist", methods=["POST"]) @app.route("/api/process-playlist", methods=["POST"]) def process_playlist_handler(): - """Process selected playlists into clustered output playlists.""" + """Start async processing job for selected playlists.""" auth_token = get_auth_token_from_request() if not auth_token or not is_access_token_valid(auth_token): @@ -209,9 +257,35 @@ if not playlist_ids: return "No playlist IDs provided", 400 - process_all(auth_token, playlist_ids) + _prune_old_jobs() + job_id = str(uuid.uuid4()) + _set_job_state( + job_id, + status="queued", + created_at=time.time(), + finished_at=None, + error=None, + playlist_count=len(playlist_ids), + ) + job_thread = threading.Thread( + target=_run_process_playlist_job, + args=(job_id, auth_token, playlist_ids), + daemon=True, + ) + job_thread.start() + + return jsonify({"jobId": job_id, "status": "queued"}), 202 + - return jsonify({"message": "Playlists processed successfully!"}), 200 +@app.route("/process-playlist-status/<job_id>") +@app.route("/api/process-playlist-status/<job_id>") +def process_playlist_status_handler(job_id): + """Return current status for an async playlist processing job.""" + with _PROCESSING_JOBS_LOCK: + payload = _PROCESSING_JOBS.get(job_id) + if not payload: + return jsonify({"Code": 404, "Error": "Job not found"}), 404 + return jsonify(payload), 200 if __name__ == "__main__": diff --git a/Backend/audio_feature_pipeline.py b/Backend/audio_feature_pipeline.py index 3ddf150..321ee79 100644 --- a/Backend/audio_feature_pipeline.py +++ b/Backend/audio_feature_pipeline.py @@ -10,6 +10,11 @@ search_track_ids_by_name_artist, ) from Backend.track_utils import dedupe_track_ids, is_valid_spotify_track_id + from Backend.track_cache import ( + get_cached_track_features, + 
cache_track_features, + cache_known_misses, + ) except ModuleNotFoundError: from spotify_api import ( # type: ignore get_reccobeats_audio_features_batch, @@ -17,6 +22,11 @@ search_track_ids_by_name_artist, ) from track_utils import dedupe_track_ids, is_valid_spotify_track_id # type: ignore + from track_cache import ( # type: ignore + get_cached_track_features, + cache_track_features, + cache_known_misses, + ) FALLBACK_SEARCH_LIMIT = 3 FALLBACK_SEARCH_CONCURRENCY = 6 @@ -51,22 +61,55 @@ async def _search_candidates_for_query( async def get_track_audio_features(track_ids, auth_token): """Fetch audio features from ReccoBeats, then try Spotify-search fallback for misses.""" start_time = time.time() - features, diagnostics, summary = await asyncio.to_thread( - get_reccobeats_audio_features_batch, track_ids, 40, True + unique_track_ids, _ = dedupe_track_ids(track_ids) + cached_features_by_id, cached_misses_by_id = await asyncio.to_thread( + get_cached_track_features, unique_track_ids ) + track_ids_to_fetch = [ + track_id + for track_id in unique_track_ids + if track_id not in cached_features_by_id and track_id not in cached_misses_by_id + ] + + features = list(cached_features_by_id.values()) + diagnostics = { + track_id: "ok" for track_id in cached_features_by_id + } | cached_misses_by_id + summary = { + "requested": len(track_ids_to_fetch), + "matched": 0, + "not_returned": 0, + "invalid_response_item_count": 0, + "unexpected_id_count": 0, + } + + if track_ids_to_fetch: + fetched_features, fetched_diagnostics, fetched_summary = await asyncio.to_thread( + get_reccobeats_audio_features_batch, track_ids_to_fetch, 40, True + ) + features.extend(fetched_features) + diagnostics.update(fetched_diagnostics) + summary = fetched_summary + await asyncio.to_thread(cache_track_features, fetched_features, "reccobeats") + print( "ReccoBeats diagnostics:", f"requested={summary['requested']},", f"matched={summary['matched']},", f"not_returned={summary['not_returned']},", 
f"invalid_items={summary['invalid_response_item_count']},", - f"unexpected_ids={summary['unexpected_id_count']}", + f"unexpected_ids={summary['unexpected_id_count']},", + f"cache_hits={len(cached_features_by_id)},", + f"known_miss_cache_hits={len(cached_misses_by_id)}", ) missing_track_ids = [ - track_id for track_id, reason in diagnostics.items() if reason != "ok" + track_id + for track_id in track_ids_to_fetch + if diagnostics.get(track_id) != "ok" ] resolved_count = 0 + fallback_resolved_features = [] if missing_track_ids: metadata_map = await asyncio.to_thread( @@ -145,11 +188,29 @@ async def get_track_audio_features(track_ids, auth_token): normalized = dict(replacement) normalized["id"] = missing_track_id features.append(normalized) - diagnostics[missing_track_id] = "resolved_via_spotify_search" + fallback_resolved_features.append(normalized) + diagnostics[missing_track_id] = "ok" resolved_count += 1 elif diagnostics.get(missing_track_id) != "ok": diagnostics[missing_track_id] = "fallback_candidates_not_in_reccobeats" + unresolved_misses = { + track_id: diagnostics.get(track_id, "unknown") + for track_id in track_ids_to_fetch + if diagnostics.get(track_id) != "ok" + } + if unresolved_misses: + await asyncio.to_thread(cache_known_misses, unresolved_misses) + if fallback_resolved_features: + await asyncio.to_thread( + cache_track_features, fallback_resolved_features, "fallback_replacement" + ) + + features_by_id = { + row["id"]: row for row in features if isinstance(row, dict) and row.get("id") + } + features = list(features_by_id.values()) + if resolved_count > 0: print(f"Fallback resolved {resolved_count} missing tracks via Spotify search.") diff --git a/Backend/grouping.py b/Backend/grouping.py index 5a43749..07de0c8 100644 --- a/Backend/grouping.py +++ b/Backend/grouping.py @@ -1,5 +1,7 @@ """Clustering utilities for grouping tracks by audio feature similarity.""" +import os + import numpy as np from sklearn.metrics import pairwise_distances from 
sklearn.mixture import GaussianMixture @@ -26,6 +28,7 @@ BALANCE_WEIGHT = 0.03 COHESION_SPLIT_DISTANCE = 2.0 COHESION_IMPROVEMENT_RATIO = 0.92 +DEFAULT_GMM_N_INIT = 3 FEATURE_WEIGHTS = { "acousticness": 1.10, "danceability": 1.35, @@ -39,6 +42,33 @@ } +def _env_positive_int(name: str, default_value: int) -> int: + """Parse positive integer env settings with fallback.""" + raw_value = os.getenv(name) + if raw_value is None: + return default_value + parsed_text = raw_value.strip() + if not parsed_text or not parsed_text.lstrip("-").isdigit(): + return default_value + + parsed_value = int(parsed_text) + if parsed_value <= 0: + return 1 + return parsed_value + + +MAX_K_CANDIDATES = _env_positive_int("CLUSTER_MAX_K_CANDIDATES", 10) +MAX_OUTPUT_PLAYLISTS = _env_positive_int("MAX_OUTPUT_PLAYLISTS", 10) +LARGE_PLAYLIST_TRACK_THRESHOLD = _env_positive_int( + "CLUSTER_LARGE_PLAYLIST_THRESHOLD", 700 +) +LARGE_PLAYLIST_MAX_K_CANDIDATES = _env_positive_int( + "CLUSTER_LARGE_MAX_K_CANDIDATES", 6 +) +LARGE_PLAYLIST_GMM_N_INIT = _env_positive_int("CLUSTER_LARGE_GMM_N_INIT", 2) +REFINE_MAX_TRACKS = _env_positive_int("CLUSTER_REFINE_MAX_TRACKS", 600) + + def _merge_small_clusters( scaled_features: np.ndarray, labels: np.ndarray, min_cluster_size: int ) -> np.ndarray: @@ -145,6 +175,27 @@ def _evaluate_labels(scaled_features: np.ndarray, labels: np.ndarray) -> dict[st } +def _candidate_k_values(k_upper_bound: int, track_count: int) -> list[int]: + """Return a bounded set of k candidates for GMM search.""" + all_values = list(range(2, k_upper_bound + 1)) + if not all_values: + return [] + + max_candidates = MAX_K_CANDIDATES + if track_count >= LARGE_PLAYLIST_TRACK_THRESHOLD: + max_candidates = min(max_candidates, LARGE_PLAYLIST_MAX_K_CANDIDATES) + if len(all_values) <= max_candidates: + return all_values + + index_values = np.linspace( + 0, len(all_values) - 1, num=max_candidates, dtype=int + ).tolist() + selected_values = sorted({all_values[index] for index in index_values}) + if 
all_values[-1] not in selected_values: + selected_values.append(all_values[-1]) + return sorted(set(selected_values)) + + def _refine_cluster_cohesion( scaled_features: np.ndarray, labels: np.ndarray, min_cluster_size: int ) -> np.ndarray: @@ -232,6 +283,7 @@ def cluster_df(track_audio_features: list[dict]) -> pd.DataFrame: track_count = len(feature_frame) k_upper_bound = min( + MAX_OUTPUT_PLAYLISTS, ABSOLUTE_MAX_CLUSTERS, max(2, track_count // MIN_CLUSTER_SIZE), track_count - 1, @@ -240,19 +292,26 @@ def cluster_df(track_audio_features: list[dict]) -> pd.DataFrame: return pd.DataFrame({"id": ids.values, "cluster": [0] * track_count}) candidates = [] + gmm_n_init = ( + LARGE_PLAYLIST_GMM_N_INIT + if track_count >= LARGE_PLAYLIST_TRACK_THRESHOLD + else DEFAULT_GMM_N_INIT + ) + candidate_k_values = _candidate_k_values(k_upper_bound, track_count) - for k in range(2, k_upper_bound + 1): + for k in candidate_k_values: model = GaussianMixture( n_components=k, covariance_type="diag", - n_init=3, + n_init=gmm_n_init, random_state=0, ) model.fit(weighted_scaled) bic = model.bic(weighted_scaled) labels = model.predict(weighted_scaled) labels = _merge_small_clusters(weighted_scaled, np.array(labels), MIN_CLUSTER_SIZE) - labels = _refine_cluster_cohesion(weighted_scaled, labels, MIN_CLUSTER_SIZE) + if track_count <= REFINE_MAX_TRACKS: + labels = _refine_cluster_cohesion(weighted_scaled, labels, MIN_CLUSTER_SIZE) labels = _reindex_labels(labels) if len(set(labels.tolist())) < 2: continue diff --git a/Backend/playlist_processing.py b/Backend/playlist_processing.py index 5dcf55f..cec73c6 100644 --- a/Backend/playlist_processing.py +++ b/Backend/playlist_processing.py @@ -1,6 +1,7 @@ """Playlist processing orchestration for feature fetch, clustering, and output creation.""" import asyncio +import os import time from collections import defaultdict try: @@ -43,6 +44,21 @@ from track_utils import dedupe_track_ids, is_valid_spotify_track_id # type: ignore +def _env_positive_int(name: 
str, default_value: int) -> int: + """Parse positive integer env values with safe fallback.""" + raw_value = os.getenv(name, str(default_value)) + try: + parsed = int(raw_value) + except ValueError: + return default_value + return max(1, parsed) + + +FETCH_PAGE_CONCURRENCY = _env_positive_int("PLAYLIST_FETCH_PAGE_CONCURRENCY", 8) +CREATE_PLAYLIST_CONCURRENCY = _env_positive_int("PLAYLIST_CREATE_CONCURRENCY", 4) +ADD_SONGS_CONCURRENCY = _env_positive_int("PLAYLIST_ADD_CONCURRENCY", 12) + + def log_step_time(step_name, start_time): """Print elapsed seconds for a named processing step.""" elapsed_time = time.time() - start_time @@ -51,11 +67,19 @@ def log_step_time(step_name, start_time): async def get_playlist_track_ids(auth_token, playlist_id): """Return all track IDs from the playlist.""" + start_time = time.time() slices = calc_slices(get_playlist_length(playlist_id, auth_token)) track_ids = [] - for i in range(0, slices * 100, 100): - response = await get_playlist_children(i, playlist_id, auth_token) + offsets = list(range(0, slices * 100, 100)) + semaphore = asyncio.Semaphore(FETCH_PAGE_CONCURRENCY) + + async def fetch_page(offset): + async with semaphore: + return await get_playlist_children(offset, playlist_id, auth_token) + + responses = await asyncio.gather(*(fetch_page(offset) for offset in offsets)) + for response in responses: if not response or "items" not in response: continue @@ -70,13 +94,20 @@ async def get_playlist_track_ids(auth_token, playlist_id): print( f"Removed {duplicate_count} duplicate tracks from source playlist {playlist_id}." 
) + print( + "Playlist track fetch stats:", + f"playlist_id={playlist_id},", + f"pages={len(offsets)},", + f"tracks={len(unique_track_ids)},", + f"duration={time.time() - start_time:.2f}s", + ) return unique_track_ids async def create_and_populate_cluster_playlists( clustered_tracks, feature_by_track_id, user_id, auth_token, playlist_name ): - """Create one playlist per K-means cluster and add grouped tracks.""" + """Create playlists for clusters and populate them with grouped tracks.""" start_time = time.time() tracks_by_cluster = defaultdict(list) @@ -90,6 +121,7 @@ async def create_and_populate_cluster_playlists( list(clustered_tracks["id"]), feature_by_track_id ) + cluster_candidates = [] for cluster_id, cluster_track_ids in sorted_clusters: valid_cluster_track_ids = [ track_id @@ -124,46 +156,109 @@ async def create_and_populate_cluster_playlists( if len(playlist_title) > 100: playlist_title = playlist_title[:97] + "..." - playlist_id = await create_playlist( - user_id, - auth_token, - playlist_title, - f"Grouped by audio similarity: {cluster_reason}. " - f"Trait drivers: {cluster_trait_summary}. " - "Made using Splitify: https://splitifytool.com/", + cluster_candidates.append( + { + "cluster_id": cluster_id, + "track_ids": valid_cluster_track_ids, + "playlist_title": playlist_title, + "playlist_description": ( + f"Grouped by audio similarity: {cluster_reason}. " + f"Trait drivers: {cluster_trait_summary}. 
" + "Made using Splitify: https://splitifytool.com/" + ), + } ) - if not playlist_id: - continue + if not cluster_candidates: + print("No eligible clusters to create playlists for.") + return - slices = calc_slices(len(valid_cluster_track_ids)) - add_tasks = [] - for position in range(0, slices * 100, 100): - track_slice = valid_cluster_track_ids[position : position + 100] - track_uris = [f"spotify:track:{track_id}" for track_id in track_slice] - add_tasks.append( - add_songs(playlist_id, track_uris, auth_token, position) + create_start = time.time() + create_semaphore = asyncio.Semaphore(CREATE_PLAYLIST_CONCURRENCY) + + async def create_cluster_playlist(candidate): + async with create_semaphore: + created_playlist_id = await create_playlist( + user_id, + auth_token, + candidate["playlist_title"], + candidate["playlist_description"], ) + if not created_playlist_id: + return None + return (candidate, created_playlist_id) - await asyncio.gather(*add_tasks) + created_entries = await asyncio.gather( + *(create_cluster_playlist(candidate) for candidate in cluster_candidates) + ) + created_entries = [entry for entry in created_entries if entry is not None] + create_duration = time.time() - create_start + + add_jobs = [] + total_tracks_to_add = 0 + for candidate, created_playlist_id in created_entries: + track_ids = candidate["track_ids"] + slices = calc_slices(len(track_ids)) + for index in range(0, slices * 100, 100): + track_slice = track_ids[index : index + 100] + track_uris = [f"spotify:track:{track_id}" for track_id in track_slice] + total_tracks_to_add += len(track_uris) + add_jobs.append((created_playlist_id, track_uris)) + + add_duration = 0.0 + successful_add_calls = 0 + if add_jobs: + add_start = time.time() + add_semaphore = asyncio.Semaphore(ADD_SONGS_CONCURRENCY) + + async def add_song_batch(job): + playlist_id, track_uris = job + async with add_semaphore: + # Avoid "Index out of bounds" races by appending without explicit position. 
+ return await add_songs(playlist_id, track_uris, auth_token) + + add_results = await asyncio.gather(*(add_song_batch(job) for job in add_jobs)) + successful_add_calls = sum(1 for result in add_results if result) + add_duration = time.time() - add_start + + print( + "Playlist write stats:", + f"clusters_considered={len(sorted_clusters)},", + f"clusters_created={len(created_entries)},", + f"create_calls={len(cluster_candidates)},", + f"create_time={create_duration:.2f}s,", + f"add_calls={len(add_jobs)},", + f"add_calls_successful={successful_add_calls},", + f"tracks_added={total_tracks_to_add},", + f"add_time={add_duration:.2f}s", + ) log_step_time("Creating and populating cluster playlists", start_time) async def process_single_playlist(auth_token, playlist_id, user_id): - """Split one playlist into K-means cluster playlists.""" + """Split one playlist into cluster playlists.""" start_time = time.time() print(f"Processing {playlist_id}...") + playlist_name_start = time.time() playlist_name = get_playlist_name(playlist_id, auth_token) + log_step_time( + f"Fetch source playlist name ({playlist_id})", + playlist_name_start, + ) + track_fetch_start = time.time() track_ids = await get_playlist_track_ids(auth_token, playlist_id) + log_step_time(f"Collect playlist tracks ({playlist_id})", track_fetch_start) if not track_ids: print(f"No tracks found for playlist {playlist_id}") return + feature_fetch_start = time.time() audio_features, _reccobeats_diagnostics = await get_track_audio_features( track_ids, auth_token ) + log_step_time(f"Resolve audio features ({playlist_id})", feature_fetch_start) if not audio_features: print(f"No audio features available for playlist {playlist_id}") return @@ -174,14 +269,28 @@ async def process_single_playlist(auth_token, playlist_id, user_id): if len(feature_by_track_id) < len(track_ids): removed = len(track_ids) - len(feature_by_track_id) print(f"Dropped {removed} tracks without usable unique audio features.") + cluster_start = 
time.time() clustered_tracks = cluster_df(audio_features) + log_step_time(f"Cluster tracks ({playlist_id})", cluster_start) if clustered_tracks.empty: print(f"Failed to cluster tracks for playlist {playlist_id}") return + cluster_sizes = clustered_tracks["cluster"].value_counts().tolist() + print( + "Cluster distribution stats:", + f"clusters={len(cluster_sizes)},", + f"largest={max(cluster_sizes) if cluster_sizes else 0},", + f"smallest={min(cluster_sizes) if cluster_sizes else 0}", + ) + playlist_write_start = time.time() await create_and_populate_cluster_playlists( clustered_tracks, feature_by_track_id, user_id, auth_token, playlist_name ) + log_step_time( + f"Write clustered playlists ({playlist_id})", + playlist_write_start, + ) log_step_time(f"Processing playlist {playlist_id}", start_time) diff --git a/Backend/requirements.txt b/Backend/requirements.txt index 14cfffa..93f6f31 100644 --- a/Backend/requirements.txt +++ b/Backend/requirements.txt @@ -6,3 +6,4 @@ pandas==2.2.3 python-dotenv==1.1.0 requests==2.32.3 scikit-learn==1.5.2 +psycopg2-binary==2.9.9 diff --git a/Backend/spotify_api.py b/Backend/spotify_api.py index 524613a..028f27a 100644 --- a/Backend/spotify_api.py +++ b/Backend/spotify_api.py @@ -15,6 +15,9 @@ SPOTIFY_TRACK_PATH_PATTERN = re.compile(r"/track/([A-Za-z0-9]{22})") REQUEST_TIMEOUT_SECONDS = 15 SPOTIFY_REQUEST_INTERVAL_SECONDS = float(os.getenv("SPOTIFY_REQUEST_INTERVAL_SECONDS", "0.25")) +SPOTIFY_MAX_RETRY_ATTEMPTS = int(os.getenv("SPOTIFY_MAX_RETRY_ATTEMPTS", "3")) +SPOTIFY_RETRY_BASE_SECONDS = float(os.getenv("SPOTIFY_RETRY_BASE_SECONDS", "0.75")) +SPOTIFY_RETRYABLE_STATUS_CODES = {500, 502, 503, 504} _SPOTIFY_RATE_LIMIT_LOCK = threading.Lock() _SPOTIFY_NEXT_ALLOWED_TS = 0.0 @@ -46,7 +49,13 @@ def get_spotify_redirect_uri() -> str: def spotify_request( - method, endpoint, auth_token, params=None, data=None, json_data=None + method, + endpoint, + auth_token, + params=None, + data=None, + json_data=None, + retry_attempt: int = 0, ): 
"""Send a Spotify Web API request with shared throttling and retry handling.""" url = f"{SPOTIFY_API_URL}{endpoint}" @@ -57,22 +66,74 @@ def spotify_request( _wait_for_spotify_slot() - response = requests.request( - method, - url, - headers=headers, - params=params, - data=data, - json=json_data, - timeout=REQUEST_TIMEOUT_SECONDS, - ) + try: + response = requests.request( + method, + url, + headers=headers, + params=params, + data=data, + json=json_data, + timeout=REQUEST_TIMEOUT_SECONDS, + ) + except requests.RequestException as error: + if retry_attempt < SPOTIFY_MAX_RETRY_ATTEMPTS: + sleep_seconds = SPOTIFY_RETRY_BASE_SECONDS * (2 ** retry_attempt) + print( + "Spotify request network error. Retrying:", + f"attempt={retry_attempt + 1},", + f"sleep={sleep_seconds:.2f}s,", + f"error={error}", + ) + time.sleep(sleep_seconds) + return spotify_request( + method, + endpoint, + auth_token, + params, + data, + json_data, + retry_attempt + 1, + ) + print(f"Spotify request network error (final): {error}") + return {} if response.status_code == 429: # Rate limited retry_after = int(response.headers.get("Retry-After", 1)) print(f"Rate limited. Retrying after {retry_after} seconds.") _apply_spotify_retry_after(retry_after) time.sleep(retry_after) - return spotify_request(method, endpoint, auth_token, params, data, json_data) + return spotify_request( + method, + endpoint, + auth_token, + params, + data, + json_data, + retry_attempt, + ) + + if ( + response.status_code in SPOTIFY_RETRYABLE_STATUS_CODES + and retry_attempt < SPOTIFY_MAX_RETRY_ATTEMPTS + ): + sleep_seconds = SPOTIFY_RETRY_BASE_SECONDS * (2 ** retry_attempt) + print( + "Spotify retryable error. 
Retrying:", + f"status={response.status_code},", + f"attempt={retry_attempt + 1},", + f"sleep={sleep_seconds:.2f}s", + ) + time.sleep(sleep_seconds) + return spotify_request( + method, + endpoint, + auth_token, + params, + data, + json_data, + retry_attempt + 1, + ) if response.status_code >= 400: print(f"Spotify API request error: {response.status_code}, {response.text}") @@ -210,7 +271,9 @@ async def get_playlist_children(start_index, playlist_id, auth_token): "limit": 100, "fields": "items(track(id,uri))", } - response = spotify_request("GET", endpoint, auth_token, params=params) + response = await asyncio.to_thread( + spotify_request, "GET", endpoint, auth_token, params, None, None + ) return response @@ -429,18 +492,23 @@ async def create_playlist(user_id, auth_token, name, description): """Create a Spotify playlist and return its ID.""" endpoint = f"/users/{user_id}/playlists" json_data = {"name": name, "description": description, "public": True} - response = spotify_request("POST", endpoint, auth_token, json_data=json_data) + response = await asyncio.to_thread( + spotify_request, "POST", endpoint, auth_token, None, None, json_data + ) if response: return response.get("id") return None -async def add_songs(playlist_id, track_uris, auth_token, position): +async def add_songs(playlist_id, track_uris, auth_token, position=None): """Add tracks to a Spotify playlist at a given insertion position.""" endpoint = f"/playlists/{playlist_id}/tracks" - json_data = {"uris": track_uris, "position": position} - response = spotify_request("POST", endpoint, auth_token, json_data=json_data) - await asyncio.sleep(0.5) + json_data = {"uris": track_uris} + if position is not None: + json_data["position"] = position + response = await asyncio.to_thread( + spotify_request, "POST", endpoint, auth_token, None, None, json_data + ) if response: return response return None diff --git a/Backend/track_cache.py b/Backend/track_cache.py new file mode 100644 index 0000000..e8121e9 --- 
/dev/null +++ b/Backend/track_cache.py @@ -0,0 +1,324 @@ +"""Postgres-backed shared cache for track audio features and known misses.""" + +import json +import os +import re +import threading +import time +from typing import Any + +SPOTIFY_TRACK_ID_PATTERN = re.compile(r"^[A-Za-z0-9]{22}$") +MIN_TTL_SECONDS = 60 +MAX_TTL_SECONDS = 365 * 24 * 60 * 60 +DEFAULT_POOL_MIN_CONN = 1 +DEFAULT_POOL_MAX_CONN = 8 + + +def _parse_ttl_seconds(env_name: str, default_value: int) -> int: + """Parse TTL env var safely and clamp to sane bounds.""" + raw_value = os.getenv(env_name, str(default_value)).strip() + try: + parsed = int(raw_value) + except ValueError: + parsed = default_value + return max(MIN_TTL_SECONDS, min(parsed, MAX_TTL_SECONDS)) + + +def _database_url() -> str: + """Return validated DATABASE_URL.""" + db_url = os.getenv("DATABASE_URL", "").strip() + if not db_url: + return "" + if "\x00" in db_url: + raise ValueError("Invalid DATABASE_URL") + return db_url + + +def _parse_pool_size(env_name: str, default_value: int) -> int: + """Parse positive pool size setting with fallback.""" + raw_value = os.getenv(env_name, str(default_value)).strip() + if not raw_value.lstrip("-").isdigit(): + return default_value + parsed = int(raw_value) + return max(1, parsed) + + +CACHE_TTL_SECONDS = _parse_ttl_seconds( + "TRACK_CACHE_TTL_SECONDS", 30 * 24 * 60 * 60 +) +MISS_TTL_SECONDS = _parse_ttl_seconds( + "TRACK_CACHE_MISS_TTL_SECONDS", 7 * 24 * 60 * 60 +) +POOL_MIN_CONN = _parse_pool_size("TRACK_CACHE_DB_POOL_MIN", DEFAULT_POOL_MIN_CONN) +POOL_MAX_CONN = _parse_pool_size("TRACK_CACHE_DB_POOL_MAX", DEFAULT_POOL_MAX_CONN) + +_POOL = None +_POOL_LOCK = threading.Lock() +_SCHEMA_READY = False +_SCHEMA_LOCK = threading.Lock() + + +def _cache_enabled() -> bool: + value = os.getenv("TRACK_CACHE_ENABLED", "true").strip().lower() + return value in {"1", "true", "yes", "on"} and bool(_database_url()) + + +def _is_valid_track_id(track_id: str | None) -> bool: + """Return whether a track ID 
matches Spotify base62 format.""" + return bool(track_id) and bool(SPOTIFY_TRACK_ID_PATTERN.fullmatch(track_id)) + + +def _pool_bounds() -> tuple[int, int]: + """Return valid min/max pool bounds.""" + min_conn = max(1, POOL_MIN_CONN) + max_conn = max(min_conn, POOL_MAX_CONN) + return min_conn, max_conn + + +def _load_pool_class(): + """Load psycopg2 pooled connection class lazily.""" + pool_module = __import__("psycopg2.pool", fromlist=["ThreadedConnectionPool"]) + return pool_module.ThreadedConnectionPool + + +def _load_execute_values(): + """Load psycopg2 execute_values helper lazily.""" + extras_module = __import__("psycopg2.extras", fromlist=["execute_values"]) + return extras_module.execute_values + + +def _get_pool(): + """Create (or return) pooled DB connector.""" + if not _cache_enabled(): + return None + global _POOL # pylint: disable=global-statement + if _POOL is not None: + return _POOL + + with _POOL_LOCK: + if _POOL is not None: + return _POOL + pool_class = _load_pool_class() + min_conn, max_conn = _pool_bounds() + _POOL = pool_class( + min_conn, + max_conn, + _database_url(), + connect_timeout=10, + ) + return _POOL + + +def _acquire_connection(): + """Acquire connection from pool.""" + pool = _get_pool() + if pool is None: + return None, None + return pool, pool.getconn() + + +def _release_connection(pool, connection): + """Return connection to pool.""" + if pool is None or connection is None: + return + pool.putconn(connection) + + +def _ensure_schema() -> None: + """Create cache table if needed.""" + global _SCHEMA_READY # pylint: disable=global-statement + if not _cache_enabled(): + return + if _SCHEMA_READY: + return + with _SCHEMA_LOCK: + if _SCHEMA_READY: + return + pool, connection = _acquire_connection() + if connection is None: + return + try: + with connection: + with connection.cursor() as cursor: + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS track_audio_cache ( + track_id TEXT PRIMARY KEY, + payload_json TEXT, + known_missing 
BOOLEAN NOT NULL DEFAULT FALSE, + miss_reason TEXT, + source TEXT, + updated_at BIGINT NOT NULL + ) + """ + ) + cursor.execute( + """ + CREATE INDEX IF NOT EXISTS idx_track_audio_cache_updated_at + ON track_audio_cache (updated_at) + """ + ) + _SCHEMA_READY = True + finally: + _release_connection(pool, connection) + + +def get_cached_track_features( + track_ids: list[str], +) -> tuple[dict[str, dict[str, Any]], dict[str, str]]: + """Return cached features and known-miss reasons for provided track IDs.""" + if not _cache_enabled() or not track_ids: + return {}, {} + valid_track_ids = [ + track_id for track_id in track_ids if _is_valid_track_id(track_id) + ] + if not valid_track_ids: + return {}, {} + + _ensure_schema() + now = int(time.time()) + min_feature_updated_at = now - CACHE_TTL_SECONDS + min_miss_updated_at = now - MISS_TTL_SECONDS + + features_by_track_id: dict[str, dict[str, Any]] = {} + misses_by_track_id: dict[str, str] = {} + + pool, connection = _acquire_connection() + if connection is None: + return {}, {} + try: + with connection.cursor() as cursor: + cursor.execute( + """ + SELECT track_id, payload_json, known_missing, miss_reason + FROM track_audio_cache + WHERE track_id = ANY(%s) + AND ( + (known_missing = FALSE AND payload_json IS NOT NULL AND updated_at >= %s) + OR + (known_missing = TRUE AND updated_at >= %s) + ) + """, + (valid_track_ids, min_feature_updated_at, min_miss_updated_at), + ) + rows = cursor.fetchall() + for track_id, payload_json, known_missing, miss_reason in rows: + if known_missing: + misses_by_track_id[str(track_id)] = ( + str(miss_reason) if miss_reason else "known_missing_cached" + ) + continue + + if not payload_json: + continue + try: + payload = json.loads(payload_json) + except (TypeError, json.JSONDecodeError): + continue + if isinstance(payload, dict): + payload["id"] = str(track_id) + features_by_track_id[str(track_id)] = payload + finally: + _release_connection(pool, connection) + + return features_by_track_id, 
misses_by_track_id + + +def cache_track_features( + features: list[dict[str, Any]], source: str = "reccobeats" +) -> None: + """Upsert resolved track features into cache.""" + if not _cache_enabled() or not features: + return + + _ensure_schema() + now = int(time.time()) + rows: list[tuple[str, str, bool, str, str, int]] = [] + for feature in features: + track_id = feature.get("id") + if not _is_valid_track_id(track_id): + continue + payload = dict(feature) + payload["id"] = str(track_id) + rows.append( + ( + str(track_id), + json.dumps(payload, separators=(",", ":")), + False, + "", + source, + now, + ) + ) + if not rows: + return + + pool, connection = _acquire_connection() + if connection is None: + return + try: + execute_values = _load_execute_values() + with connection: + with connection.cursor() as cursor: + execute_values( + cursor, + """ + INSERT INTO track_audio_cache + (track_id, payload_json, known_missing, miss_reason, source, updated_at) + VALUES %s + ON CONFLICT(track_id) DO UPDATE SET + payload_json=excluded.payload_json, + known_missing=FALSE, + miss_reason='', + source=excluded.source, + updated_at=excluded.updated_at + """, + rows, + template="(%s, %s, %s, %s, %s, %s)", + page_size=500, + ) + finally: + _release_connection(pool, connection) + + +def cache_known_misses(miss_reasons_by_track_id: dict[str, str]) -> None: + """Upsert known misses so future runs can skip repeated failed lookups.""" + if not _cache_enabled() or not miss_reasons_by_track_id: + return + + _ensure_schema() + now = int(time.time()) + rows = [ + (track_id, None, True, reason, "known_miss", now) + for track_id, reason in miss_reasons_by_track_id.items() + if _is_valid_track_id(track_id) + ] + if not rows: + return + + pool, connection = _acquire_connection() + if connection is None: + return + try: + execute_values = _load_execute_values() + with connection: + with connection.cursor() as cursor: + execute_values( + cursor, + """ + INSERT INTO track_audio_cache + (track_id, 
payload_json, known_missing, miss_reason, source, updated_at) + VALUES %s + ON CONFLICT(track_id) DO UPDATE SET + payload_json=NULL, + known_missing=TRUE, + miss_reason=excluded.miss_reason, + source=excluded.source, + updated_at=excluded.updated_at + """, + rows, + template="(%s, %s, %s, %s, %s, %s)", + page_size=500, + ) + finally: + _release_connection(pool, connection) diff --git a/Frontend/spotify-oauth/firebase.json b/Frontend/spotify-oauth/firebase.json index ca44014..ecb242e 100644 --- a/Frontend/spotify-oauth/firebase.json +++ b/Frontend/spotify-oauth/firebase.json @@ -10,7 +10,7 @@ { "source": "/api/**", "run": { - "serviceId": "splitify-backend", + "serviceId": "splitify-backend-staging", "region": "us-central1" } }, diff --git a/Frontend/spotify-oauth/src/PlaylistInputPage.jsx b/Frontend/spotify-oauth/src/PlaylistInputPage.jsx index 4b7935c..0c232d9 100644 --- a/Frontend/spotify-oauth/src/PlaylistInputPage.jsx +++ b/Frontend/spotify-oauth/src/PlaylistInputPage.jsx @@ -9,6 +9,7 @@ const API_BASE_URL = function PlaylistInputPage() { const [playlists, setPlaylists] = useState([]); const [selectedPlaylists, setSelectedPlaylists] = useState([]); + const [isProcessing, setIsProcessing] = useState(false); useEffect(() => { @@ -43,7 +44,12 @@ function PlaylistInputPage() { const handleProcessPlaylists = () => { console.log("Selected Playlists:", selectedPlaylists); - + if (!selectedPlaylists.length) { + alert("Select at least one playlist."); + return; + } + setIsProcessing(true); + fetch(`${API_BASE_URL}/process-playlist`, { method: "POST", headers: { @@ -52,18 +58,48 @@ function PlaylistInputPage() { credentials: 'include', body: JSON.stringify({ playlistIds: selectedPlaylists }) }) - .then(response => { + .then(async response => { if (!response.ok) { - throw new Error("Network response was not ok"); + const errorText = await response.text(); + throw new Error( + `Request failed (${response.status} ${response.statusText}): ${errorText}` + ); } return 
response.json(); }) .then(data => { - console.log("Response from server:", data); - alert("Playlists processed successfully!"); + if (!data || !data.jobId) { + throw new Error("Missing jobId from backend response"); + } + console.log("Started processing job:", data.jobId); + return data.jobId; + }) + .then(jobId => { + const pollDelayMs = 4000; + const pollJob = () => fetch( + `${API_BASE_URL}/process-playlist-status/${jobId}`, + { credentials: 'include' } + ) + .then(response => response.json()) + .then(statusPayload => { + const status = statusPayload.status; + if (status === "succeeded") { + alert("Playlists processed successfully!"); + setIsProcessing(false); + return; + } + if (status === "failed") { + throw new Error(statusPayload.error || "Processing failed"); + } + setTimeout(pollJob, pollDelayMs); + }); + + return pollJob(); }) .catch(error => { console.error("There was a problem with the fetch operation:", error); + alert(`Processing failed: ${error.message}`); + setIsProcessing(false); }); }; @@ -90,7 +126,9 @@ function PlaylistInputPage() { ))} - + ); }