Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 77 additions & 3 deletions Backend/app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""Flask API entrypoint for Spotify auth and playlist processing routes."""

import os
import threading
import time
import uuid
from datetime import timedelta
from pathlib import Path
from urllib.parse import urlencode
Expand Down Expand Up @@ -74,12 +77,57 @@ def parse_bool(value: str | None, default: bool = False) -> bool:
supports_credentials=True,
)

# How long (in seconds) a finished job's status remains queryable before
# _prune_old_jobs may evict it; default 21600s = 6 hours.
JOB_STATUS_TTL_SECONDS = int(os.getenv("JOB_STATUS_TTL_SECONDS", "21600"))
# In-memory job-id -> status-fields map for async playlist processing.
# All reads and writes must hold _PROCESSING_JOBS_LOCK.
_PROCESSING_JOBS: dict[str, dict[str, object]] = {}
_PROCESSING_JOBS_LOCK = threading.Lock()


def get_auth_token_from_request():
    """Resolve the caller's auth token: server session first, cookie second."""
    session_token = session.get("auth_token")
    if session_token:
        return session_token
    return request.cookies.get("auth_token")


def _prune_old_jobs():
    """Evict completed jobs older than the TTL so the status map stays bounded."""
    expiry_threshold = time.time() - JOB_STATUS_TTL_SECONDS
    with _PROCESSING_JOBS_LOCK:
        # Collect first, then delete: avoids mutating the dict mid-iteration.
        expired_ids = [
            jid
            for jid, record in _PROCESSING_JOBS.items()
            if record.get("finished_at")
            and float(record["finished_at"]) < expiry_threshold
        ]
        for jid in expired_ids:
            _PROCESSING_JOBS.pop(jid, None)


def _set_job_state(job_id: str, **fields):
    """Merge the given fields into a job's status record under the shared lock."""
    with _PROCESSING_JOBS_LOCK:
        record = _PROCESSING_JOBS.setdefault(job_id, {})
        record.update(fields)


def _run_process_playlist_job(job_id: str, auth_token: str, playlist_ids: list[str]):
    """Execute the playlist pipeline in a worker thread, recording job status.

    Marks the job "running" up front, then "succeeded" or "failed" (with the
    stringified error) when the pipeline finishes.
    """
    _set_job_state(job_id, status="running", started_at=time.time())
    try:
        process_all(auth_token, playlist_ids)
        _set_job_state(
            job_id, status="succeeded", finished_at=time.time(), error=None
        )
    # Boundary catch: a daemon thread must never die silently, so any failure
    # is captured onto the job record instead of propagating.
    except Exception as error:  # pylint: disable=broad-exception-caught
        _set_job_state(
            job_id, status="failed", finished_at=time.time(), error=str(error)
        )


@app.after_request
def add_security_headers(response):
"""Apply baseline security headers for browser-facing responses."""
Expand Down Expand Up @@ -197,7 +245,7 @@ def get_playlist_handler():
@app.route("/process-playlist", methods=["POST"])
@app.route("/api/process-playlist", methods=["POST"])
def process_playlist_handler():
"""Process selected playlists into clustered output playlists."""
"""Start async processing job for selected playlists."""
auth_token = get_auth_token_from_request()

if not auth_token or not is_access_token_valid(auth_token):
Expand All @@ -209,9 +257,35 @@ def process_playlist_handler():
if not playlist_ids:
return "No playlist IDs provided", 400

process_all(auth_token, playlist_ids)
_prune_old_jobs()
job_id = str(uuid.uuid4())
_set_job_state(
job_id,
status="queued",
created_at=time.time(),
finished_at=None,
error=None,
playlist_count=len(playlist_ids),
)
job_thread = threading.Thread(
target=_run_process_playlist_job,
args=(job_id, auth_token, playlist_ids),
daemon=True,
)
job_thread.start()

return jsonify({"jobId": job_id, "status": "queued"}), 202


return jsonify({"message": "Playlists processed successfully!"}), 200
@app.route("/process-playlist-status/<job_id>")
@app.route("/api/process-playlist-status/<job_id>")
def process_playlist_status_handler(job_id):
    """Return current status for an async playlist processing job.

    Responds 404 with an error body for unknown job ids, otherwise 200 with
    the job's status fields.
    """
    with _PROCESSING_JOBS_LOCK:
        payload = _PROCESSING_JOBS.get(job_id)
        # Copy while holding the lock: the worker thread mutates this dict
        # in place via _set_job_state, so serializing the live object could
        # race with a concurrent update.
        snapshot = dict(payload) if payload else None
    if snapshot is None:
        return jsonify({"Code": 404, "Error": "Job not found"}), 404
    return jsonify(snapshot), 200


if __name__ == "__main__":
Expand Down
71 changes: 66 additions & 5 deletions Backend/audio_feature_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,23 @@
search_track_ids_by_name_artist,
)
from Backend.track_utils import dedupe_track_ids, is_valid_spotify_track_id
from Backend.track_cache import (
get_cached_track_features,
cache_track_features,
cache_known_misses,
)
except ModuleNotFoundError:
from spotify_api import ( # type: ignore
get_reccobeats_audio_features_batch,
get_track_metadata_map,
search_track_ids_by_name_artist,
)
from track_utils import dedupe_track_ids, is_valid_spotify_track_id # type: ignore
from track_cache import ( # type: ignore
get_cached_track_features,
cache_track_features,
cache_known_misses,
)

FALLBACK_SEARCH_LIMIT = 3
FALLBACK_SEARCH_CONCURRENCY = 6
Expand Down Expand Up @@ -51,22 +61,55 @@ async def _search_candidates_for_query(
async def get_track_audio_features(track_ids, auth_token):
"""Fetch audio features from ReccoBeats, then try Spotify-search fallback for misses."""
start_time = time.time()
features, diagnostics, summary = await asyncio.to_thread(
get_reccobeats_audio_features_batch, track_ids, 40, True
unique_track_ids, _ = dedupe_track_ids(track_ids)
cached_features_by_id, cached_misses_by_id = await asyncio.to_thread(
get_cached_track_features, unique_track_ids
)
track_ids_to_fetch = [
track_id
for track_id in unique_track_ids
if track_id not in cached_features_by_id and track_id not in cached_misses_by_id
]

features = list(cached_features_by_id.values())
diagnostics = {
track_id: "ok" for track_id in cached_features_by_id
} | cached_misses_by_id
summary = {
"requested": len(track_ids_to_fetch),
"matched": 0,
"not_returned": 0,
"invalid_response_item_count": 0,
"unexpected_id_count": 0,
}

if track_ids_to_fetch:
fetched_features, fetched_diagnostics, fetched_summary = await asyncio.to_thread(
get_reccobeats_audio_features_batch, track_ids_to_fetch, 40, True
)
features.extend(fetched_features)
diagnostics.update(fetched_diagnostics)
summary = fetched_summary
await asyncio.to_thread(cache_track_features, fetched_features, "reccobeats")

print(
"ReccoBeats diagnostics:",
f"requested={summary['requested']},",
f"matched={summary['matched']},",
f"not_returned={summary['not_returned']},",
f"invalid_items={summary['invalid_response_item_count']},",
f"unexpected_ids={summary['unexpected_id_count']}",
f"unexpected_ids={summary['unexpected_id_count']},",
f"cache_hits={len(cached_features_by_id)},",
f"known_miss_cache_hits={len(cached_misses_by_id)}",
)

missing_track_ids = [
track_id for track_id, reason in diagnostics.items() if reason != "ok"
track_id
for track_id in track_ids_to_fetch
if diagnostics.get(track_id) != "ok"
]
resolved_count = 0
fallback_resolved_features = []

if missing_track_ids:
metadata_map = await asyncio.to_thread(
Expand Down Expand Up @@ -145,11 +188,29 @@ async def get_track_audio_features(track_ids, auth_token):
normalized = dict(replacement)
normalized["id"] = missing_track_id
features.append(normalized)
diagnostics[missing_track_id] = "resolved_via_spotify_search"
fallback_resolved_features.append(normalized)
diagnostics[missing_track_id] = "ok"
resolved_count += 1
elif diagnostics.get(missing_track_id) != "ok":
diagnostics[missing_track_id] = "fallback_candidates_not_in_reccobeats"

unresolved_misses = {
track_id: diagnostics.get(track_id, "unknown")
for track_id in track_ids_to_fetch
if diagnostics.get(track_id) != "ok"
}
if unresolved_misses:
await asyncio.to_thread(cache_known_misses, unresolved_misses)
if fallback_resolved_features:
await asyncio.to_thread(
cache_track_features, fallback_resolved_features, "fallback_replacement"
)

features_by_id = {
row["id"]: row for row in features if isinstance(row, dict) and row.get("id")
}
features = list(features_by_id.values())

if resolved_count > 0:
print(f"Fallback resolved {resolved_count} missing tracks via Spotify search.")

Expand Down
65 changes: 62 additions & 3 deletions Backend/grouping.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Clustering utilities for grouping tracks by audio feature similarity."""

import os

import numpy as np
from sklearn.metrics import pairwise_distances
from sklearn.mixture import GaussianMixture
Expand All @@ -26,6 +28,7 @@
BALANCE_WEIGHT = 0.03
COHESION_SPLIT_DISTANCE = 2.0
COHESION_IMPROVEMENT_RATIO = 0.92
DEFAULT_GMM_N_INIT = 3
FEATURE_WEIGHTS = {
"acousticness": 1.10,
"danceability": 1.35,
Expand All @@ -39,6 +42,33 @@
}


def _env_positive_int(name: str, default_value: int) -> int:
"""Parse positive integer env settings with fallback."""
raw_value = os.getenv(name)
if raw_value is None:
return default_value
parsed_text = raw_value.strip()
if not parsed_text or not parsed_text.lstrip("-").isdigit():
return default_value

parsed_value = int(parsed_text)
if parsed_value <= 0:
return 1
return parsed_value


# Env-tunable clustering limits; all fall back to the given defaults via
# _env_positive_int when unset or invalid.
# Upper bound on how many k values the GMM search evaluates.
MAX_K_CANDIDATES = _env_positive_int("CLUSTER_MAX_K_CANDIDATES", 10)
# Cap on the number of output playlists (feeds the k upper bound).
MAX_OUTPUT_PLAYLISTS = _env_positive_int("MAX_OUTPUT_PLAYLISTS", 10)
# Track count at which a playlist is treated as "large" and gets a
# reduced search budget and fewer GMM restarts.
LARGE_PLAYLIST_TRACK_THRESHOLD = _env_positive_int(
    "CLUSTER_LARGE_PLAYLIST_THRESHOLD", 700
)
# Smaller k-candidate budget applied to large playlists.
LARGE_PLAYLIST_MAX_K_CANDIDATES = _env_positive_int(
    "CLUSTER_LARGE_MAX_K_CANDIDATES", 6
)
# GMM n_init used for large playlists (vs DEFAULT_GMM_N_INIT otherwise).
LARGE_PLAYLIST_GMM_N_INIT = _env_positive_int("CLUSTER_LARGE_GMM_N_INIT", 2)
# Cohesion refinement is skipped above this track count to bound runtime.
REFINE_MAX_TRACKS = _env_positive_int("CLUSTER_REFINE_MAX_TRACKS", 600)


def _merge_small_clusters(
scaled_features: np.ndarray, labels: np.ndarray, min_cluster_size: int
) -> np.ndarray:
Expand Down Expand Up @@ -145,6 +175,27 @@ def _evaluate_labels(scaled_features: np.ndarray, labels: np.ndarray) -> dict[st
}


def _candidate_k_values(k_upper_bound: int, track_count: int) -> list[int]:
    """Return a bounded, sorted set of k candidates for the GMM search."""
    candidate_range = list(range(2, k_upper_bound + 1))
    if not candidate_range:
        return []

    # Large playlists get a tighter budget to keep the model search cheap.
    budget = MAX_K_CANDIDATES
    if track_count >= LARGE_PLAYLIST_TRACK_THRESHOLD:
        budget = min(budget, LARGE_PLAYLIST_MAX_K_CANDIDATES)
    if len(candidate_range) <= budget:
        return candidate_range

    # Sample evenly across the range, always keeping the largest k.
    sampled_positions = np.linspace(
        0, len(candidate_range) - 1, num=budget, dtype=int
    )
    chosen = {candidate_range[position] for position in sampled_positions}
    chosen.add(candidate_range[-1])
    return sorted(chosen)


def _refine_cluster_cohesion(
scaled_features: np.ndarray, labels: np.ndarray, min_cluster_size: int
) -> np.ndarray:
Expand Down Expand Up @@ -232,6 +283,7 @@ def cluster_df(track_audio_features: list[dict]) -> pd.DataFrame:

track_count = len(feature_frame)
k_upper_bound = min(
MAX_OUTPUT_PLAYLISTS,
ABSOLUTE_MAX_CLUSTERS,
max(2, track_count // MIN_CLUSTER_SIZE),
track_count - 1,
Expand All @@ -240,19 +292,26 @@ def cluster_df(track_audio_features: list[dict]) -> pd.DataFrame:
return pd.DataFrame({"id": ids.values, "cluster": [0] * track_count})

candidates = []
gmm_n_init = (
LARGE_PLAYLIST_GMM_N_INIT
if track_count >= LARGE_PLAYLIST_TRACK_THRESHOLD
else DEFAULT_GMM_N_INIT
)
candidate_k_values = _candidate_k_values(k_upper_bound, track_count)

for k in range(2, k_upper_bound + 1):
for k in candidate_k_values:
model = GaussianMixture(
n_components=k,
covariance_type="diag",
n_init=3,
n_init=gmm_n_init,
random_state=0,
)
model.fit(weighted_scaled)
bic = model.bic(weighted_scaled)
labels = model.predict(weighted_scaled)
labels = _merge_small_clusters(weighted_scaled, np.array(labels), MIN_CLUSTER_SIZE)
labels = _refine_cluster_cohesion(weighted_scaled, labels, MIN_CLUSTER_SIZE)
if track_count <= REFINE_MAX_TRACKS:
labels = _refine_cluster_cohesion(weighted_scaled, labels, MIN_CLUSTER_SIZE)
labels = _reindex_labels(labels)
if len(set(labels.tolist())) < 2:
continue
Expand Down
Loading