diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 4acfd5f..c095654 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -18,10 +18,10 @@ jobs: python-version: "3.12" - name: Install dependencies - run: pip install -r requirements-test.txt + run: pip install -e ".[dev]" - name: Run tests with coverage - run: pytest tests/ --cov=. --cov-report=xml --cov-report=term -v + run: pytest tests/ --cov=src/slskd_transform --cov-report=xml --cov-report=term -v - name: Upload coverage to Codecov uses: codecov/codecov-action@v5 diff --git a/.gitignore b/.gitignore index 68bc17f..2c661be 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# User config (contains API keys) +config.yml + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] @@ -158,3 +161,4 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ +.omc/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..3d6b41d --- /dev/null +++ b/Dockerfile @@ -0,0 +1,20 @@ +FROM python:3.12-slim AS builder + +WORKDIR /app +COPY pyproject.toml . +COPY src/ src/ + +RUN pip install --no-cache-dir --prefix=/install . + +FROM python:3.12-slim + +WORKDIR /app +COPY --from=builder /install /usr/local + +RUN useradd --create-home appuser +USER appuser + +RUN mkdir -p /app/music /app/downloads /app/organized + +ENTRYPOINT ["slskd-transform"] +CMD ["search"] diff --git a/README.md b/README.md index 5c8c343..8e57757 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@

License - Python 3.8+ + Python 3.10+ Requires slskd Stars Coverage @@ -16,86 +16,171 @@ --- -**slskd-transform** is a Python tool that scans your local music library, searches the [Soulseek](https://www.slsknet.org/) network through [slskd](https://github.com/slskd/slskd) for matching FLAC versions of each track, and automatically enqueues them for download. It matches songs by **audio duration** rather than filenames alone, ensuring you get the correct track every time. Songs that cannot be found are reported in a CSV file for manual follow-up. +**slskd-transform** scans your local music library, searches the [Soulseek](https://www.slsknet.org/) network through [slskd](https://github.com/slskd/slskd) for matching FLAC versions of each track, and automatically enqueues them for download. It matches songs by **audio duration** rather than filenames alone, ensuring you get the correct track every time. -A companion script handles post-download organization, renaming all downloaded FLACs into a clean `Artist - Title.flac` structure using embedded metadata. +A companion `rename` command handles post-download organization, renaming all downloaded FLACs into a clean `Artist - Title.flac` structure using embedded metadata. ## Features -- **Duration-based matching** -- Compares local track duration against search results with a configurable tolerance (default: 15 seconds), avoiding mismatches from inconsistent naming. -- **Multi-threaded search** -- Distributes searches across multiple threads (default: 5) for faster processing of large libraries. +- **Duration-based matching** -- Compares local track duration against search results with a configurable tolerance (default: 15 seconds). +- **Recursive scanning** -- Point it at your existing music library with `--recursive`, no need to flatten files first. +- **Multi-threaded search** -- Distributes searches across multiple threads (default: 5) for faster processing. +- **Flexible configuration** -- Config file, environment variables, or CLI flags. No code editing required. - **Automatic enqueue** -- Matched FLAC files are enqueued for download directly through the slskd API. -- **CSV reporting** -- Tracks that could not be found are written to `unfound_songs.csv` for later review. -- **Metadata-based renaming** -- The `rename-files.py` script reads FLAC tags and renames files to `Artist - Title.flac`, sanitizing any invalid characters. +- **CSV reporting** -- Tracks that could not be found are written to `unfound_songs.csv`. +- **Metadata-based renaming** -- Reads FLAC tags and renames files to `Artist - Title.flac`. +- **Docker support** -- Run alongside slskd in the same compose stack. ## Prerequisites -- **Python 3.8+** -- **[slskd](https://github.com/slskd/slskd)** running and accessible (by default at `http://127.0.0.1:5030`) +- **Python 3.10+** +- **[slskd](https://github.com/slskd/slskd)** running and accessible - A valid slskd **API key** (configured in slskd's settings) -- A local directory containing the lossy music files you want to upgrade ## Installation +```bash +pip install git+https://github.com/GeiserX/slskd-transform.git +``` + +Or for development: + ```bash git clone https://github.com/GeiserX/slskd-transform.git cd slskd-transform -pip install -r requirements.txt +pip install -e ".[dev]" ``` -## Usage +## Quick Start -### Step 1 -- Search and enqueue FLAC downloads +```bash +# Set your API key (or put it in config.yml) +export SLSKD_API_KEY="your-api-key-here" + +# Search for FLAC versions of all files in ./music +slskd-transform search -1. Place your lossy music files (MP3, AAC, OGG, etc.) in a `music/` directory inside the project root. -2. Open `main.py` and set your slskd connection details: +# Search recursively in your existing library +slskd-transform search --music-dir /path/to/library --recursive -```python -slskd = slskd_api.SlskdClient( - host="http://127.0.0.1:5030", - api_key="YOUR_API_KEY", - verify_ssl=False -) +# Rename downloaded FLACs using metadata +slskd-transform rename --source-dir /path/to/downloads --dest-dir /path/to/organized ``` -3. Run the search: +## Configuration + +slskd-transform loads configuration from multiple sources with this priority: -```bash -python main.py +``` +CLI flags > Environment variables > Config file > Defaults ``` -The script will search Soulseek for a FLAC version of each local track, match by duration, and enqueue any matches for download. Any songs that could not be found will be saved to `unfound_songs.csv`. +### Config File -### Step 2 -- Rename downloaded FLACs +Create `config.yml` in your working directory or `~/.config/slskd-transform/config.yml`: -Once downloads are complete, use the renaming script to organize them: +```yaml +# slskd connection +host: "http://127.0.0.1:5030" +api_key: "your-api-key" +verify_ssl: false -1. Open `rename-files.py` and set the source and destination directories: +# Search settings +music_dir: "./music" +duration_tolerance: 15 +num_threads: 5 +search_timeout: 60 +format: "flac" +recursive: false -```python -source_directory = '/path/to/slskd/downloads' -destination_directory = '/path/to/organized/music' +# Rename settings +source_dir: "./downloads" +destination_dir: "./organized" ``` -2. Run the script: +### Environment Variables + +All settings can be configured via `SLSKD_` prefixed environment variables: + +| Variable | Description | Default | +|----------|-------------|---------| +| `SLSKD_HOST` | slskd instance URL | `http://127.0.0.1:5030` | +| `SLSKD_API_KEY` | slskd API key | -- | +| `SLSKD_VERIFY_SSL` | Enable SSL verification | `false` | +| `SLSKD_MUSIC_DIR` | Source directory with lossy files | `./music` | +| `SLSKD_DURATION_TOLERANCE` | Max duration difference (seconds) | `15` | +| `SLSKD_NUM_THREADS` | Concurrent search threads | `5` | +| `SLSKD_SEARCH_TIMEOUT` | Wait time for search results (seconds) | `60` | +| `SLSKD_FORMAT` | Target format to search for | `flac` | +| `SLSKD_RECURSIVE` | Scan directories recursively | `false` | +| `SLSKD_SOURCE_DIR` | Download directory for rename | `./downloads` | +| `SLSKD_DESTINATION_DIR` | Output directory for rename | `./organized` | + +### CLI Reference -```bash -python rename-files.py +``` +slskd-transform [OPTIONS] COMMAND [ARGS]... + +Options: + -c, --config PATH Path to config.yml + --host TEXT slskd host URL + --api-key TEXT slskd API key + --no-verify-ssl Disable SSL verification + -t, --threads INTEGER Number of search threads + --help Show help + +Commands: + search Search Soulseek for lossless versions and enqueue downloads + rename Rename downloaded FLACs using metadata ``` -All `.flac` files in the source directory (including subdirectories) will be renamed to `Artist - Title.flac` and moved to the destination. +**search options:** +``` + -m, --music-dir PATH Directory with lossy source files + -r, --recursive Scan music directory recursively + -f, --format TEXT Target format (default: flac) + --tolerance INTEGER Duration match tolerance in seconds + --timeout INTEGER Seconds to wait for search results +``` -## Configuration +**rename options:** +``` + -s, --source-dir PATH Directory where slskd downloads land + -d, --dest-dir PATH Destination for renamed files +``` -| Parameter | Location | Default | Description | -|---|---|---|---| -| `host` | `main.py` | `http://127.0.0.1:5030` | slskd instance URL | -| `api_key` | `main.py` | -- | Your slskd API key | -| `MUSIC_DIR` | `main.py` | `./music` | Directory containing lossy source files | -| `duration_tolerance` | `main.py` | `15` (seconds) | Maximum duration difference for a match | -| `num_threads` | `main.py` | `5` | Number of concurrent search threads | -| `source_directory` | `rename-files.py` | -- | Where slskd downloads land | -| `destination_directory` | `rename-files.py` | -- | Where renamed FLACs are moved | +## Docker + +```bash +docker run --rm \ + -e SLSKD_HOST=http://slskd:5030 \ + -e SLSKD_API_KEY=your-key \ + -v /path/to/lossy:/app/music:ro \ + -v /path/to/downloads:/app/downloads \ + ghcr.io/geiserx/slskd-transform:2.0.0 search --recursive +``` + +Or in a compose stack alongside slskd: + +```yaml +services: + slskd: + image: slskd/slskd:0.21.4 + ports: + - "5030:5030" + volumes: + - ./slskd-data:/app + + slskd-transform: + image: ghcr.io/geiserx/slskd-transform:2.0.0 + environment: + SLSKD_HOST: http://slskd:5030 + SLSKD_API_KEY: your-key + volumes: + - /path/to/lossy:/app/music:ro + - ./slskd-data/downloads:/app/downloads + command: ["search", "--recursive"] +``` ## How It Works @@ -110,16 +195,10 @@ Local library Soulseek network Your disk │ │ │ └──── no match ──> unfound_songs.csv │ v - rename-files.py + slskd-transform rename Artist - Title.flac ``` -1. **Scan** -- `main.py` reads every file in the `music/` directory using [mutagen](https://mutagen.readthedocs.io/) to extract the audio duration. -2. **Search** -- For each track, a Soulseek search is issued via the slskd API with the query `" flac"`. Searches run in parallel across multiple threads with staggered starts to avoid flooding the network. -3. **Match** -- Each search result is compared by duration. The first result within the tolerance window is selected. -4. **Enqueue** -- Matched files are enqueued for download through slskd. Failed or unmatched tracks are logged. -5. **Rename** -- After downloading, `rename-files.py` walks the download directory, reads FLAC metadata, and moves files into a flat structure with clean filenames. - ## Related Music Tools | Project | Description | @@ -128,7 +207,6 @@ Local library Soulseek network Your disk | [audio-transcode-watcher](https://github.com/GeiserX/audio-transcode-watcher) | Automated multi-format audio transcoding with lyrics fetching | | [jellyfin-encoder](https://github.com/GeiserX/jellyfin-encoder) | Automatic 720p HEVC/AV1 transcoding for Jellyfin | - ## License -This project is licensed under the [MIT License](LICENSE). +This project is licensed under the [GPL-3.0 License](LICENSE). diff --git a/config.example.yml b/config.example.yml new file mode 100644 index 0000000..f1147c5 --- /dev/null +++ b/config.example.yml @@ -0,0 +1,19 @@ +# slskd-transform configuration +# All fields are optional — defaults shown below + +# slskd connection +host: "http://127.0.0.1:5030" +api_key: "" # REQUIRED (or set SLSKD_API_KEY env var) +verify_ssl: false + +# Search settings +music_dir: "./music" # Where your lossy source files live +duration_tolerance: 15 # Seconds of acceptable duration difference +num_threads: 5 # Concurrent search threads +search_timeout: 60 # Seconds to wait for search results +format: "flac" # Target format (appended to search query) +recursive: false # Scan music_dir recursively + +# Rename settings +source_dir: "./downloads" # Where slskd puts completed downloads +destination_dir: "./organized" # Where renamed FLACs are moved to diff --git a/main.py b/main.py index 063ef9d..4dc05f6 100644 --- a/main.py +++ b/main.py @@ -1,118 +1,5 @@ -import os -import slskd_api -import time -import mutagen -import requests -import csv -from threading import Thread - -def write_unfound_songs_to_csv(unfound_songs, filename): - with open(filename, mode='w', newline='', encoding='utf-8') as csv_file: - csv_writer = csv.writer(csv_file) - csv_writer.writerow(['Song Name']) - - for song in unfound_songs: - csv_writer.writerow([song]) - -def list_files_without_ext(music_dir): - filenames = [] - for file in os.listdir(music_dir): - if not file.startswith('.'): - file_without_ext = os.path.splitext(file)[0] - filenames.append(file_without_ext) - return filenames - -def list_files_with_duration(music_dir): - filenames = [] - for file in os.listdir(music_dir): - if not file.startswith('.'): - file_without_ext = os.path.splitext(file)[0] - file_path = os.path.join(music_dir, file) - audio_info = mutagen.File(file_path, easy=True) - duration = int(audio_info.info.length) - filenames.append((file_without_ext, duration)) - return filenames - -def find_close_duration_song(search_results, local_duration): - duration_tolerance = 15 # Set the duration tolerance in seconds - for result in search_results: - if 'files' in result and len(result['files']) > 0: - file_info = result['files'][0] - try: - result_duration = int(file_info['length']) - if abs(local_duration - result_duration) <= duration_tolerance: - return result - except KeyError: - pass - return None - -def remove_hyphens_and_trim(song_name): - return ' '.join(segment.strip() for segment in song_name.split('-')) - -def search_and_enqueue(songs_with_duration, unfound_songs): - for song_name, local_duration in songs_with_duration: - song_name_cleaned = remove_hyphens_and_trim(song_name) - print(f"Searching for: {song_name_cleaned}") - - search_response = slskd.searches.search_text(searchText=(song_name_cleaned + " flac")) - - # Wait a few seconds for search results to populate - time.sleep(60) - - search_id = search_response['id'] - search_results = slskd.searches.search_responses(id=search_id) - - result = find_close_duration_song(search_results, local_duration) - if result is not None: - file_info = result['files'][0] - print(f"Enqueueing: {file_info['filename']}") - try: - success = slskd.transfers.enqueue(username=result['username'], files=[{'filename': file_info['filename'], 'size': file_info['size']}]) - if success: - print(f"Enqueued: {file_info['filename']}") - else: - print(f"Failed to enqueue: {file_info['filename']}") - unfound_songs.append(song_name) - except requests.exceptions.HTTPError as http_error: - print(f"Error while sending request to slskd: {http_error}") - unfound_songs.append(song_name) - else: - print(f"Failed to find matching song with close duration for: {song_name}") - unfound_songs.append(song_name) - -def threaded_search_and_enqueue(songs_with_duration, unfound_songs, num_threads=5): - thread_list = [] - chunk_size = len(songs_with_duration) // num_threads - - for i in range(num_threads): - if i == num_threads - 1: # last thread takes the remaining songs - chunk = songs_with_duration[i * chunk_size:] - else: - chunk = songs_with_duration[i * chunk_size : (i + 1) * chunk_size] - thread = Thread(target=search_and_enqueue, args=(chunk, unfound_songs)) - thread.start() - thread_list.append(thread) - time.sleep(10) - - for thread in thread_list: - thread.join() - -def main(): - global slskd - slskd = slskd_api.SlskdClient(host="http://127.0.0.1:5030", api_key="...", verify_ssl=False) - - current_dir = os.getcwd() - MUSIC_DIR = os.path.join(current_dir, 'music') - songs_with_duration = list_files_with_duration(MUSIC_DIR) - - unfound_songs = [] - - threaded_search_and_enqueue(songs_with_duration, unfound_songs) - - if len(unfound_songs) > 0: - write_unfound_songs_to_csv(unfound_songs, 'unfound_songs.csv') - print("Unfound songs have been written to 'unfound_songs.csv'") - +"""Backwards-compatibility shim. Use `slskd-transform search` instead.""" +from slskd_transform.cli import cli if __name__ == '__main__': - main() + cli(["search"]) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..093e5c9 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,37 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "slskd-transform" +version = "2.0.0" +description = "Bulk-upgrade your music library from lossy to lossless via Soulseek" +readme = "README.md" +license = "GPL-3.0-only" +requires-python = ">=3.10" +authors = [ + { name = "GeiserX" }, +] +dependencies = [ + "slskd-api>=0.1.5", + "mutagen>=1.47", + "requests>=2.31", + "click>=8.1", + "pyyaml>=6.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.0", + "pytest-cov>=4.0", +] + +[project.scripts] +slskd-transform = "slskd_transform.cli:cli" + +[tool.hatch.build.targets.wheel] +packages = ["src/slskd_transform"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +addopts = "--cov=src/slskd_transform --cov-report=term" diff --git a/rename-files.py b/rename-files.py index 384ec0a..97b3e20 100644 --- a/rename-files.py +++ b/rename-files.py @@ -1,44 +1,5 @@ -import os -import shutil -from mutagen.flac import FLAC - -def collect_flac_files(directory): - flac_files = [] - for root, dirs, files in os.walk(directory): - for file in files: - if file.lower().endswith('.flac'): - flac_files.append(os.path.join(root, file)) - return flac_files - -def extract_metadata(file_path): - audio = FLAC(file_path) - title = audio.get('title', ['Unknown'])[0] - artist = audio.get('artist', ['Unknown'])[0] - return title, artist - -def sanitize_filename(filename): - invalid_chars = '\\/:*?"<>|' - for char in invalid_chars: - filename = filename.replace(char, '') - return filename - -def move_and_rename_flac_files(flac_files, destination): - for file in flac_files: - title, artist = extract_metadata(file) - new_filename = sanitize_filename(f"{artist} - {title}.flac") - new_filepath = os.path.join(destination, new_filename) - - # Move or rename the .flac file - shutil.move(file, new_filepath) - print(f"Moved and renamed: {new_filepath}") - -def main(): - source_directory = 'G:\\slskd\\downloads' - destination_directory = 'G:\\slskd\\' - flac_files = collect_flac_files(source_directory) - - move_and_rename_flac_files(flac_files, destination_directory) - +"""Backwards-compatibility shim. Use `slskd-transform rename` instead.""" +from slskd_transform.cli import cli if __name__ == '__main__': - main() + cli(["rename"]) diff --git a/requirements-test.txt b/requirements-test.txt deleted file mode 100644 index 3bc0049..0000000 --- a/requirements-test.txt +++ /dev/null @@ -1,3 +0,0 @@ --r requirements.txt -pytest>=9.0.3 -pytest-cov>=7.1.0 diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 821f087..0000000 --- a/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -slskd_api -mutagen -requests diff --git a/src/slskd_transform/__init__.py b/src/slskd_transform/__init__.py new file mode 100644 index 0000000..2f85346 --- /dev/null +++ b/src/slskd_transform/__init__.py @@ -0,0 +1,3 @@ +"""slskd-transform: Bulk-upgrade your music library from lossy to lossless via Soulseek.""" + +__version__ = "2.0.0" diff --git a/src/slskd_transform/cli.py b/src/slskd_transform/cli.py new file mode 100644 index 0000000..ec29a25 --- /dev/null +++ b/src/slskd_transform/cli.py @@ -0,0 +1,75 @@ +import click +from pathlib import Path + +from slskd_transform.config import load_config +from slskd_transform.search import run_search +from slskd_transform.rename import run_rename + + +@click.group() +@click.option("--config", "-c", type=click.Path(exists=True), default=None, help="Path to config.yml") +@click.option("--host", type=str, default=None, help="slskd host URL") +@click.option("--api-key", type=str, default=None, help="slskd API key") +@click.option("--no-verify-ssl", is_flag=True, default=False, help="Disable SSL verification") +@click.option("--threads", "-t", type=int, default=None, help="Number of search threads") +@click.pass_context +def cli(ctx, config, host, api_key, no_verify_ssl, threads): + """slskd-transform: Bulk-upgrade your music library from lossy to lossless.""" + ctx.ensure_object(dict) + ctx.obj["config_path"] = Path(config) if config else None + ctx.obj["cli_overrides"] = {} + if host is not None: + ctx.obj["cli_overrides"]["host"] = host + if api_key is not None: + ctx.obj["cli_overrides"]["api_key"] = api_key + if no_verify_ssl: + ctx.obj["cli_overrides"]["verify_ssl"] = False + if threads is not None: + ctx.obj["cli_overrides"]["num_threads"] = threads + + +@cli.command() +@click.option("--music-dir", "-m", type=click.Path(exists=True), default=None, help="Directory with lossy source files") +@click.option("--recursive", "-r", is_flag=True, default=None, help="Scan music directory recursively") +@click.option("--format", "-f", "fmt", type=str, default=None, help="Target format (default: flac)") +@click.option("--tolerance", type=int, default=None, help="Duration match tolerance in seconds") +@click.option("--timeout", type=int, default=None, help="Seconds to wait for search results") +@click.pass_context +def search(ctx, music_dir, recursive, fmt, tolerance, timeout): + """Search Soulseek for lossless versions and enqueue downloads.""" + overrides = dict(ctx.obj["cli_overrides"]) + if music_dir is not None: + overrides["music_dir"] = music_dir + if recursive is not None: + overrides["recursive"] = recursive + if fmt is not None: + overrides["format"] = fmt + if tolerance is not None: + overrides["duration_tolerance"] = tolerance + if timeout is not None: + overrides["search_timeout"] = timeout + + config = load_config(config_path=ctx.obj["config_path"], cli_overrides=overrides) + + if not config.api_key: + raise click.ClickException( + "No API key configured. Set SLSKD_API_KEY env var, add api_key to config.yml, or pass --api-key." + ) + + run_search(config) + + +@cli.command() +@click.option("--source-dir", "-s", type=click.Path(exists=True), default=None, help="Directory where slskd downloads land") +@click.option("--dest-dir", "-d", type=click.Path(), default=None, help="Destination for renamed files") +@click.pass_context +def rename(ctx, source_dir, dest_dir): + """Rename downloaded FLACs to 'Artist - Title.flac' using metadata.""" + overrides = dict(ctx.obj["cli_overrides"]) + if source_dir is not None: + overrides["source_dir"] = source_dir + if dest_dir is not None: + overrides["destination_dir"] = dest_dir + + config = load_config(config_path=ctx.obj["config_path"], cli_overrides=overrides) + run_rename(config) diff --git a/src/slskd_transform/config.py b/src/slskd_transform/config.py new file mode 100644 index 0000000..8aab466 --- /dev/null +++ b/src/slskd_transform/config.py @@ -0,0 +1,132 @@ +"""Configuration loading for slskd-transform.""" + +from __future__ import annotations + +import os +from dataclasses import dataclass +from pathlib import Path + +try: + import yaml +except ImportError: + yaml = None # type: ignore[assignment] + + +DEFAULTS: dict[str, object] = { + "host": "http://127.0.0.1:5030", + "api_key": "", + "verify_ssl": False, + "music_dir": "./music", + "source_dir": "./downloads", + "destination_dir": "./organized", + "duration_tolerance": 15, + "num_threads": 5, + "search_timeout": 60, + "format": "flac", + "recursive": False, +} + +_ENV_MAP: dict[str, str] = { + "SLSKD_HOST": "host", + "SLSKD_API_KEY": "api_key", + "SLSKD_VERIFY_SSL": "verify_ssl", + "SLSKD_MUSIC_DIR": "music_dir", + "SLSKD_SOURCE_DIR": "source_dir", + "SLSKD_DESTINATION_DIR": "destination_dir", + "SLSKD_DURATION_TOLERANCE": "duration_tolerance", + "SLSKD_NUM_THREADS": "num_threads", + "SLSKD_SEARCH_TIMEOUT": "search_timeout", + "SLSKD_FORMAT": "format", + "SLSKD_RECURSIVE": "recursive", +} + +_BOOL_FIELDS = {"verify_ssl", "recursive"} +_INT_FIELDS = {"duration_tolerance", "num_threads", "search_timeout"} +_PATH_FIELDS = {"music_dir", "source_dir", "destination_dir"} + + +def _parse_bool(value: object) -> bool: + if isinstance(value, bool): + return value + return str(value).lower() in ("true", "1", "yes") + + +@dataclass(frozen=True) +class SlskdConfig: + host: str + api_key: str + verify_ssl: bool + music_dir: Path + source_dir: Path + destination_dir: Path + duration_tolerance: int + num_threads: int + search_timeout: int + format: str + recursive: bool + + +def load_config( + *, + config_path: Path | None = None, + cli_overrides: dict[str, object] | None = None, +) -> SlskdConfig: + """Load configuration from YAML, env vars, and CLI overrides.""" + merged: dict[str, object] = dict(DEFAULTS) + + # Load YAML file + file_data = _load_yaml(config_path) + for key, value in file_data.items(): + if key in DEFAULTS: + merged[key] = value + + # Overlay environment variables + for env_var, field in _ENV_MAP.items(): + value = os.environ.get(env_var) + if value is not None: + merged[field] = value + + # Overlay CLI overrides + if cli_overrides: + for key, value in cli_overrides.items(): + if value is not None and key in DEFAULTS: + merged[key] = value + + # Convert types + for field in _BOOL_FIELDS: + merged[field] = _parse_bool(merged[field]) + for field in _INT_FIELDS: + merged[field] = int(merged[field]) # type: ignore[arg-type] + for field in _PATH_FIELDS: + merged[field] = Path(str(merged[field])) + + return SlskdConfig(**merged) # type: ignore[arg-type] + + +def _load_yaml(config_path: Path | None) -> dict[str, object]: + """Load YAML config from explicit path or auto-discovered locations.""" + if config_path is not None: + return _read_yaml(config_path) + + # Auto-discover + candidates = [ + Path("./config.yml"), + Path.home() / ".config" / "slskd-transform" / "config.yml", + ] + for candidate in candidates: + if candidate.exists(): + return _read_yaml(candidate) + + return {} + + +def _read_yaml(path: Path) -> dict[str, object]: + """Read and parse a YAML file. Returns empty dict on failure.""" + if yaml is None: + return {} + try: + with open(path) as f: + data = yaml.safe_load(f) + return data if isinstance(data, dict) else {} + except (OSError, yaml.YAMLError): + return {} diff --git a/src/slskd_transform/rename.py b/src/slskd_transform/rename.py new file mode 100644 index 0000000..fe13c5a --- /dev/null +++ b/src/slskd_transform/rename.py @@ -0,0 +1,44 @@ +"""Rename and organize downloaded FLAC files by metadata.""" + +import os +import shutil +from pathlib import Path + +from mutagen.flac import FLAC + +from slskd_transform.config import SlskdConfig +from slskd_transform.utils import sanitize_filename + + +def collect_flac_files(directory: Path) -> list[Path]: + """Recursively collect all .flac files (case-insensitive) under directory.""" + flac_files = [] + for root, _dirs, files in os.walk(directory): + for file in files: + if file.lower().endswith(".flac"): + flac_files.append(Path(root) / file) + return flac_files + + +def extract_metadata(file_path: Path) -> tuple[str, str]: + """Read FLAC tags, return (title, artist). Defaults to 'Unknown' if missing.""" + audio = FLAC(str(file_path)) + title = audio.get("title", ["Unknown"])[0] + artist = audio.get("artist", ["Unknown"])[0] + return title, artist + + +def move_and_rename_flac_files(flac_files: list[Path], destination: Path) -> None: + """Move each file to destination as 'Artist - Title.flac' (sanitized).""" + for file in flac_files: + title, artist = extract_metadata(file) + new_filename = sanitize_filename(f"{artist} - {title}.flac") + shutil.move(str(file), str(destination / new_filename)) + print(f"Moved and renamed: {destination / new_filename}") + + +def run_rename(config: SlskdConfig) -> None: + """Top-level orchestrator: collect files from source_dir, rename to destination_dir.""" + config.destination_dir.mkdir(parents=True, exist_ok=True) + flac_files = collect_flac_files(config.source_dir) + move_and_rename_flac_files(flac_files, config.destination_dir) diff --git a/src/slskd_transform/search.py b/src/slskd_transform/search.py new file mode 100644 index 0000000..282f9dd --- /dev/null +++ b/src/slskd_transform/search.py @@ -0,0 +1,185 @@ +"""Search and enqueue module for slskd-transform.""" + +from __future__ import annotations + +import os +import time +from pathlib import Path +from threading import Thread + +import mutagen +import requests +import slskd_api + +from slskd_transform.config import SlskdConfig +from slskd_transform.utils import write_unfound_songs_to_csv + + +def list_files_with_duration( + music_dir: Path | str, + *, + recursive: bool = False, +) -> list[tuple[str, int]]: + """Scan a directory for audio files and return (filename_without_ext, duration) tuples.""" + filenames: list[tuple[str, int]] = [] + + if recursive: + for root, _dirs, files in os.walk(music_dir): + for file in files: + if file.startswith('.'): + continue + file_path = os.path.join(root, file) + audio_info = mutagen.File(file_path, easy=True) + if audio_info is None or audio_info.info is None: + continue + duration = int(audio_info.info.length) + file_without_ext = os.path.splitext(file)[0] + filenames.append((file_without_ext, duration)) + else: + for file in os.listdir(music_dir): + if file.startswith('.'): + continue + file_path = os.path.join(str(music_dir), file) + if not os.path.isfile(file_path): + continue + audio_info = mutagen.File(file_path, easy=True) + if audio_info is None or audio_info.info is None: + continue + duration = int(audio_info.info.length) + file_without_ext = os.path.splitext(file)[0] + filenames.append((file_without_ext, duration)) + + return filenames + + +def find_close_duration_song( + search_results: list[dict], + local_duration: int, + *, + tolerance: int = 15, +) -> dict | None: + """Return the first search result whose duration is within tolerance of local_duration.""" + for result in search_results: + if 'files' in result and len(result['files']) > 0: + file_info = result['files'][0] + try: + result_duration = int(file_info['length']) + if abs(local_duration - result_duration) <= tolerance: + return result + except KeyError: + pass + return None + + +def remove_hyphens_and_trim(song_name: str) -> str: + """Split on hyphens, strip each segment, and rejoin with spaces.""" + return ' '.join(segment.strip() for segment in song_name.split('-')) + + +def search_and_enqueue( + songs_with_duration: list[tuple[str, int]], + unfound_songs: list[str], + *, + config: SlskdConfig, + client: slskd_api.SlskdClient, +) -> None: + """Search slskd for each song and enqueue matching results.""" + for song_name, local_duration in songs_with_duration: + song_name_cleaned = remove_hyphens_and_trim(song_name) + print(f"Searching for: {song_name_cleaned}") + + search_response = client.searches.search_text( + searchText=(song_name_cleaned + " " + config.format) + ) + + time.sleep(config.search_timeout) + + search_id = search_response['id'] + search_results = client.searches.search_responses(id=search_id) + + result = find_close_duration_song( + search_results, local_duration, tolerance=config.duration_tolerance + ) + if result is not None: + file_info = result['files'][0] + print(f"Enqueueing: {file_info['filename']}") + try: + success = client.transfers.enqueue( + username=result['username'], + files=[{'filename': file_info['filename'], 'size': file_info['size']}], + ) + if success: + print(f"Enqueued: {file_info['filename']}") + else: + print(f"Failed to enqueue: {file_info['filename']}") + unfound_songs.append(song_name) + except requests.exceptions.HTTPError as http_error: + print(f"Error while sending request to slskd: {http_error}") + unfound_songs.append(song_name) + else: + print(f"Failed to find matching song with close duration for: {song_name}") + unfound_songs.append(song_name) + + +def threaded_search_and_enqueue( + songs_with_duration: list[tuple[str, int]], + unfound_songs: list[str], + *, + config: SlskdConfig, + client: slskd_api.SlskdClient, +) -> None: + """Split songs into chunks and search/enqueue in parallel threads.""" + if not songs_with_duration: + return + num_threads = min(max(config.num_threads, 1), len(songs_with_duration)) + thread_list: list[Thread] = [] + chunk_size = len(songs_with_duration) // num_threads + + per_thread_unfound: list[list[str]] = [[] for _ in range(num_threads)] + + for i in range(num_threads): + if i == num_threads - 1: + chunk = songs_with_duration[i * chunk_size:] + else: + chunk = songs_with_duration[i * chunk_size : (i + 1) * chunk_size] + thread = Thread( + target=search_and_enqueue, + args=(chunk, per_thread_unfound[i]), + kwargs={"config": config, "client": client}, + ) + thread.start() + thread_list.append(thread) + time.sleep(10) + + for thread in thread_list: + thread.join() + + for thread_unfound in per_thread_unfound: + unfound_songs.extend(thread_unfound) + + +def run_search(config: SlskdConfig) -> None: + """Top-level orchestrator: create client, scan directory, search, and write CSV.""" + client = slskd_api.SlskdClient( + host=config.host, + api_key=config.api_key, + verify_ssl=config.verify_ssl, + ) + + songs_with_duration = list_files_with_duration( + config.music_dir, + recursive=config.recursive, + ) + + unfound_songs: list[str] = [] + + threaded_search_and_enqueue( + songs_with_duration, + unfound_songs, + config=config, + client=client, + ) + + if len(unfound_songs) > 0: + write_unfound_songs_to_csv(unfound_songs, config.music_dir / 'unfound_songs.csv') + print("Unfound songs have been written to 'unfound_songs.csv'") diff --git a/src/slskd_transform/utils.py b/src/slskd_transform/utils.py new file mode 100644 index 0000000..5dc9a6c --- /dev/null +++ b/src/slskd_transform/utils.py @@ -0,0 +1,19 @@ +import csv +from pathlib import Path + + +def write_unfound_songs_to_csv(unfound_songs: list[str], filepath: Path) -> None: + """Write unfound song names to a CSV file with a 'Song Name' header.""" + with open(filepath, mode='w', newline='', encoding='utf-8') as csv_file: + csv_writer = csv.writer(csv_file) + csv_writer.writerow(['Song Name']) + for song in unfound_songs: + csv_writer.writerow([song]) + + +def sanitize_filename(filename: str) -> str: + """Remove characters invalid on Windows/Linux filesystems: \\/:*?"<>|""" + invalid_chars = '\\/:*?"<>|' + for char in invalid_chars: + filename = filename.replace(char, '') + return filename diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..833e0b3 --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,35 @@ +from click.testing import CliRunner +from unittest.mock import patch, MagicMock +from pathlib import Path + +from slskd_transform.cli import cli + + +class TestCliSearch: + @patch("slskd_transform.cli.run_search") + @patch("slskd_transform.cli.load_config") + def test_search_with_api_key(self, mock_load, mock_run): + mock_load.return_value = MagicMock(api_key="test-key") + runner = CliRunner() + result = runner.invoke(cli, ["--api-key", "test-key", "search"]) + assert result.exit_code == 0 + mock_run.assert_called_once() + + @patch("slskd_transform.cli.load_config") + def test_search_without_api_key_fails(self, mock_load): + mock_load.return_value = MagicMock(api_key="") + runner = CliRunner() + result = runner.invoke(cli, ["search"]) + assert result.exit_code != 0 + assert "API key" in result.output + + +class TestCliRename: + @patch("slskd_transform.cli.run_rename") + @patch("slskd_transform.cli.load_config") + def test_rename_runs(self, mock_load, mock_run): + mock_load.return_value = MagicMock() + runner = CliRunner() + result = runner.invoke(cli, ["rename"]) + assert result.exit_code == 0 + mock_run.assert_called_once() diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 0000000..8a297c9 --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,93 @@ +import os +import tempfile +import pytest +from pathlib import Path +from unittest.mock import patch + +from slskd_transform.config import load_config, SlskdConfig, DEFAULTS + + +class TestDefaults: + def test_load_with_no_sources(self): + """With no config file, no env vars, no overrides — get defaults.""" + with patch.dict(os.environ, {}, clear=True): + # Remove any SLSKD_ env vars + env = {k: v for k, v in os.environ.items() if not k.startswith("SLSKD_")} + with patch.dict(os.environ, env, clear=True): + config = load_config(config_path=Path("/nonexistent/config.yml")) + assert config.host == "http://127.0.0.1:5030" + assert config.api_key == "" + assert config.verify_ssl is False + assert config.duration_tolerance == 15 + assert config.num_threads == 5 + assert config.search_timeout == 60 + assert config.format == "flac" + assert config.recursive is False + + +class TestEnvVars: + def test_env_overrides_defaults(self): + env = { + "SLSKD_HOST": "http://custom:9999", + "SLSKD_API_KEY": "test-key", + "SLSKD_VERIFY_SSL": "true", + "SLSKD_DURATION_TOLERANCE": "30", + "SLSKD_NUM_THREADS": "10", + "SLSKD_RECURSIVE": "yes", + } + with patch.dict(os.environ, env, clear=False): + config = load_config(config_path=Path("/nonexistent/config.yml")) + assert config.host == "http://custom:9999" + assert config.api_key == "test-key" + assert config.verify_ssl is True + assert config.duration_tolerance == 30 + assert config.num_threads == 10 + assert config.recursive is True + + def test_bool_env_variations(self): + for truthy in ("true", "1", "yes", "True", "YES"): + with patch.dict(os.environ, {"SLSKD_VERIFY_SSL": truthy}, clear=False): + config = load_config(config_path=Path("/nonexistent/config.yml")) + assert config.verify_ssl is True + + for falsy in ("false", "0", "no", ""): + with patch.dict(os.environ, {"SLSKD_VERIFY_SSL": falsy}, clear=False): + config = load_config(config_path=Path("/nonexistent/config.yml")) + assert config.verify_ssl is False + + +class TestYamlConfig: + def test_loads_from_yaml(self): + import yaml + config_data = { + "host": "http://yaml-host:5030", + "api_key": "yaml-key", + "num_threads": 8, + } + with tempfile.NamedTemporaryFile(mode='w', suffix='.yml', delete=False) as f: + yaml.dump(config_data, f) + f.flush() + config = load_config(config_path=Path(f.name)) + assert config.host == "http://yaml-host:5030" + assert config.api_key == "yaml-key" + assert config.num_threads == 8 + os.unlink(f.name) + + +class TestCliOverrides: + def test_cli_overrides_env_and_yaml(self): + with patch.dict(os.environ, {"SLSKD_HOST": "http://env:5030"}, clear=False): + config = load_config( + config_path=Path("/nonexistent/config.yml"), + cli_overrides={"host": "http://cli:5030", "num_threads": 3} + ) + assert config.host == "http://cli:5030" + assert config.num_threads == 3 + + def test_none_overrides_are_skipped(self): + config = load_config( + config_path=Path("/nonexistent/config.yml"), + cli_overrides={"host": None, "num_threads": 7} + ) + assert config.host == "http://127.0.0.1:5030" + assert config.num_threads == 7 diff --git a/tests/test_rename.py b/tests/test_rename.py new file mode 100644 index 0000000..fc3790f --- /dev/null +++ b/tests/test_rename.py @@ -0,0 +1,74 @@ +import tempfile +import os +from pathlib import Path +from unittest.mock import patch, MagicMock + +from slskd_transform.rename import collect_flac_files, extract_metadata, move_and_rename_flac_files + + +class TestCollectFlacFiles: + def test_finds_flac_files(self): + with tempfile.TemporaryDirectory() as tmpdir: + Path(tmpdir, "song.flac").touch() + Path(tmpdir, "other.mp3").touch() + subdir = Path(tmpdir, "subdir") + subdir.mkdir() + Path(subdir, "nested.FLAC").touch() + + result = collect_flac_files(Path(tmpdir)) + assert len(result) == 2 + + def test_empty_directory(self): + with tempfile.TemporaryDirectory() as tmpdir: + result = collect_flac_files(Path(tmpdir)) + assert result == [] + + def test_case_insensitive(self): + with tempfile.TemporaryDirectory() as tmpdir: + Path(tmpdir, "lower.flac").touch() + Path(tmpdir, "upper.FLAC").touch() + Path(tmpdir, "mixed.Flac").touch() + result = collect_flac_files(Path(tmpdir)) + assert len(result) == 3 + + +class TestExtractMetadata: + @patch("slskd_transform.rename.FLAC") + def test_extracts_title_and_artist(self, mock_flac_cls): + mock_audio = MagicMock() + mock_audio.get.side_effect = lambda key, default: { + "title": ["My Song"], "artist": ["My Artist"] + }.get(key, default) + mock_flac_cls.return_value = mock_audio + + title, artist = extract_metadata(Path("/fake/path.flac")) + assert title == "My Song" + assert artist == "My Artist" + + @patch("slskd_transform.rename.FLAC") + def test_defaults_to_unknown(self, mock_flac_cls): + mock_audio = MagicMock() + mock_audio.get.side_effect = lambda key, default: default + mock_flac_cls.return_value = mock_audio + + title, artist = extract_metadata(Path("/fake/path.flac")) + assert title == "Unknown" + assert artist == "Unknown" + + +class TestMoveAndRenameFlacFiles: + @patch("slskd_transform.rename.shutil") + @patch("slskd_transform.rename.extract_metadata") + def test_moves_with_metadata_name(self, mock_extract, mock_shutil): + mock_extract.return_value = ("My Song", "My Artist") + with tempfile.TemporaryDirectory() as dest: + move_and_rename_flac_files([Path("/source/file.flac")], Path(dest)) + expected = os.path.join(dest, "My Artist - My Song.flac") + mock_shutil.move.assert_called_once_with(str(Path("/source/file.flac")), expected) + + @patch("slskd_transform.rename.shutil") + @patch("slskd_transform.rename.extract_metadata") + def test_empty_list(self, mock_extract, mock_shutil): + with tempfile.TemporaryDirectory() as dest: + move_and_rename_flac_files([], Path(dest)) + mock_shutil.move.assert_not_called() diff --git a/tests/test_search.py b/tests/test_search.py new file mode 100644 index 0000000..dc38e79 --- /dev/null +++ b/tests/test_search.py @@ -0,0 +1,172 @@ +import tempfile +import os +import pytest +from pathlib import Path +from unittest.mock import patch, MagicMock + +from slskd_transform.search import ( + list_files_with_duration, + find_close_duration_song, + remove_hyphens_and_trim, + search_and_enqueue, + threaded_search_and_enqueue, +) +from slskd_transform.config import load_config + + +def _make_config(**overrides): + defaults = {"api_key": "test", "host": "http://test:5030"} + defaults.update(overrides) + return load_config(config_path=Path("/nonexistent"), cli_overrides=defaults) + + +class TestListFilesWithDuration: + def test_returns_name_duration_tuples(self): + with tempfile.TemporaryDirectory() as tmpdir: + Path(tmpdir, "Song A.flac").touch() + Path(tmpdir, "Song B.mp3").touch() + + mock_audio = MagicMock() + mock_audio.info.length = 245.5 + + with patch("slskd_transform.search.mutagen.File", return_value=mock_audio): + result = list_files_with_duration(Path(tmpdir)) + + assert len(result) == 2 + names = [r[0] for r in result] + assert "Song A" in names + assert "Song B" in names + for _, duration in result: + assert duration == 245 + + def test_skips_dotfiles(self): + with tempfile.TemporaryDirectory() as tmpdir: + Path(tmpdir, ".hidden.flac").touch() + Path(tmpdir, "visible.flac").touch() + + mock_audio = MagicMock() + mock_audio.info.length = 100.0 + + with patch("slskd_transform.search.mutagen.File", return_value=mock_audio): + result = list_files_with_duration(Path(tmpdir)) + + assert len(result) == 1 + assert result[0][0] == "visible" + + def test_empty_directory(self): + with tempfile.TemporaryDirectory() as tmpdir: + result = list_files_with_duration(Path(tmpdir)) + assert result == [] + + def test_recursive_scanning(self): + with tempfile.TemporaryDirectory() as tmpdir: + subdir = Path(tmpdir, "artist", "album") + subdir.mkdir(parents=True) + Path(tmpdir, "top.mp3").touch() + Path(subdir, "nested.flac").touch() + + mock_audio = MagicMock() + mock_audio.info.length = 180.0 + + with patch("slskd_transform.search.mutagen.File", return_value=mock_audio): + result = list_files_with_duration(Path(tmpdir), recursive=True) + + assert len(result) == 2 + + def test_non_recursive_ignores_subdirs(self): + with tempfile.TemporaryDirectory() as tmpdir: + subdir = Path(tmpdir, "sub") + subdir.mkdir() + Path(tmpdir, "top.mp3").touch() + Path(subdir, "nested.flac").touch() + + mock_audio = MagicMock() + mock_audio.info.length = 180.0 + + with patch("slskd_transform.search.mutagen.File", return_value=mock_audio): + result = list_files_with_duration(Path(tmpdir), recursive=False) + + assert len(result) == 1 + assert result[0][0] == "top" + + +class TestFindCloseDurationSong: + def test_finds_matching_duration(self): + results = [ + {"files": [{"filename": "song.flac", "length": 240}]}, + ] + match = find_close_duration_song(results, 242) + assert match is not None + + def test_no_match_outside_tolerance(self): + results = [{"files": [{"filename": "song.flac", "length": 240}]}] + match = find_close_duration_song(results, 300) + assert match is None + + def test_empty_results(self): + assert find_close_duration_song([], 200) is None + + def test_missing_length_key(self): + results = [{"files": [{"filename": "song.flac"}]}] + assert find_close_duration_song(results, 200) is None + + def test_custom_tolerance(self): + results = [{"files": [{"filename": "song.flac", "length": 230}]}] + assert find_close_duration_song(results, 200, tolerance=30) is not None + assert find_close_duration_song(results, 200, tolerance=10) is None + + +class TestRemoveHyphensAndTrim: + def test_basic(self): + assert remove_hyphens_and_trim("Artist - Song") == "Artist Song" + + def test_multiple_hyphens(self): + assert remove_hyphens_and_trim("A - B - C") == "A B C" + + def test_no_hyphens(self): + assert remove_hyphens_and_trim("No Hyphens") == "No Hyphens" + + def test_empty_string(self): + assert remove_hyphens_and_trim("") == "" + + +class TestSearchAndEnqueue: + @patch("slskd_transform.search.time.sleep") + def test_successful_enqueue(self, mock_sleep): + config = _make_config() + client = MagicMock() + client.searches.search_text.return_value = {"id": "s1"} + client.searches.search_responses.return_value = [ + {"username": "peer1", "files": [{"filename": "song.flac", "length": 200, "size": 5000}]} + ] + client.transfers.enqueue.return_value = True + + unfound = [] + search_and_enqueue([("Artist - Song", 200)], unfound, config=config, client=client) + assert unfound == [] + client.transfers.enqueue.assert_called_once() + + @patch("slskd_transform.search.time.sleep") + def test_no_match_adds_to_unfound(self, mock_sleep): + config = _make_config() + client = MagicMock() + client.searches.search_text.return_value = {"id": "s1"} + client.searches.search_responses.return_value = [ + {"username": "peer1", "files": [{"filename": "song.flac", "length": 999, "size": 5000}]} + ] + + unfound = [] + search_and_enqueue([("Song", 200)], unfound, config=config, client=client) + assert "Song" in unfound + + +class TestThreadedSearchAndEnqueue: + @patch("slskd_transform.search.time.sleep") + @patch("slskd_transform.search.search_and_enqueue") + def test_distributes_songs(self, mock_search, mock_sleep): + config = _make_config(num_threads=2) + client = MagicMock() + songs = [(f"Song {i}", i * 10) for i in range(10)] + + threaded_search_and_enqueue(songs, [], config=config, client=client) + assert mock_search.call_count == 2 diff --git a/tests/test_slskd_transform.py b/tests/test_slskd_transform.py deleted file mode 100644 index 3085e91..0000000 --- a/tests/test_slskd_transform.py +++ /dev/null @@ -1,667 +0,0 @@ -"""Tests for slskd-transform: main.py and rename-files.py. - -Imports the actual module functions (guarded by if __name__ == '__main__') -and tests them with mocked I/O, network calls, and external dependencies. -""" - -import pytest -import tempfile -import os -import sys -import csv -import importlib -import importlib.util -from pathlib import Path -from unittest.mock import patch, MagicMock, call -from threading import Thread - -import main -# rename-files.py has a hyphen so we need importlib -_spec = importlib.util.spec_from_file_location( - "rename_files", - os.path.join(os.path.dirname(os.path.dirname(__file__)), "rename-files.py"), -) -rename_files = importlib.util.module_from_spec(_spec) -_spec.loader.exec_module(rename_files) - - -# --------------------------------------------------------------------------- -# main.py — write_unfound_songs_to_csv -# --------------------------------------------------------------------------- - -class TestWriteUnfoundSongsToCsv: - def test_writes_header_and_songs(self): - with tempfile.TemporaryDirectory() as tmpdir: - filepath = os.path.join(tmpdir, "unfound.csv") - main.write_unfound_songs_to_csv(["Song A", "Song B"], filepath) - - with open(filepath, newline='', encoding='utf-8') as f: - reader = list(csv.reader(f)) - assert reader[0] == ["Song Name"] - assert reader[1] == ["Song A"] - assert reader[2] == ["Song B"] - assert len(reader) == 3 - - def test_writes_empty_list(self): - with tempfile.TemporaryDirectory() as tmpdir: - filepath = os.path.join(tmpdir, "unfound.csv") - main.write_unfound_songs_to_csv([], filepath) - - with open(filepath, newline='', encoding='utf-8') as f: - reader = list(csv.reader(f)) - assert reader == [["Song Name"]] - - def test_writes_special_characters(self): - with tempfile.TemporaryDirectory() as tmpdir: - filepath = os.path.join(tmpdir, "unfound.csv") - main.write_unfound_songs_to_csv(["Song, With Comma", 'Song "Quotes"'], filepath) - - with open(filepath, newline='', encoding='utf-8') as f: - reader = list(csv.reader(f)) - assert reader[1] == ["Song, With Comma"] - assert reader[2] == ['Song "Quotes"'] - - -# --------------------------------------------------------------------------- -# main.py — list_files_without_ext -# --------------------------------------------------------------------------- - -class TestListFilesWithoutExt: - def test_strips_extension(self): - with tempfile.TemporaryDirectory() as tmpdir: - Path(tmpdir, "Artist - Song.flac").touch() - Path(tmpdir, "Other - Track.mp3").touch() - result = main.list_files_without_ext(tmpdir) - assert "Artist - Song" in result - assert "Other - Track" in result - - def test_skips_dotfiles(self): - with tempfile.TemporaryDirectory() as tmpdir: - Path(tmpdir, ".hidden").touch() - Path(tmpdir, "visible.flac").touch() - result = main.list_files_without_ext(tmpdir) - assert "visible" in result - assert len(result) == 1 - - def test_empty_directory(self): - with tempfile.TemporaryDirectory() as tmpdir: - result = main.list_files_without_ext(tmpdir) - assert result == [] - - def test_file_without_extension(self): - with tempfile.TemporaryDirectory() as tmpdir: - Path(tmpdir, "noext").touch() - result = main.list_files_without_ext(tmpdir) - assert "noext" in result - - def test_multiple_dots_in_name(self): - with tempfile.TemporaryDirectory() as tmpdir: - Path(tmpdir, "artist.feat.other.flac").touch() - result = main.list_files_without_ext(tmpdir) - assert "artist.feat.other" in result - - -# --------------------------------------------------------------------------- -# main.py — list_files_with_duration -# --------------------------------------------------------------------------- - -class TestListFilesWithDuration: - def test_returns_name_duration_tuples(self): - with tempfile.TemporaryDirectory() as tmpdir: - Path(tmpdir, "Song A.flac").touch() - Path(tmpdir, "Song B.mp3").touch() - - mock_audio_info = MagicMock() - mock_audio_info.info.length = 245.5 - - with patch("main.mutagen.File", return_value=mock_audio_info): - result = main.list_files_with_duration(tmpdir) - - assert len(result) == 2 - names = [r[0] for r in result] - assert "Song A" in names - assert "Song B" in names - for _, duration in result: - assert duration == 245 - - def test_skips_dotfiles(self): - with tempfile.TemporaryDirectory() as tmpdir: - Path(tmpdir, ".hidden.flac").touch() - Path(tmpdir, "visible.flac").touch() - - mock_audio_info = MagicMock() - mock_audio_info.info.length = 100.0 - - with patch("main.mutagen.File", return_value=mock_audio_info): - result = main.list_files_with_duration(tmpdir) - - assert len(result) == 1 - assert result[0][0] == "visible" - - def test_empty_directory(self): - with tempfile.TemporaryDirectory() as tmpdir: - result = main.list_files_with_duration(tmpdir) - assert result == [] - - def test_duration_truncated_to_int(self): - with tempfile.TemporaryDirectory() as tmpdir: - Path(tmpdir, "track.flac").touch() - - mock_audio_info = MagicMock() - mock_audio_info.info.length = 199.99 - - with patch("main.mutagen.File", return_value=mock_audio_info): - result = main.list_files_with_duration(tmpdir) - - assert result[0][1] == 199 - - -# --------------------------------------------------------------------------- -# main.py — find_close_duration_song -# --------------------------------------------------------------------------- - -class TestFindCloseDurationSong: - def test_finds_matching_duration(self): - results = [ - {"files": [{"filename": "song.flac", "length": 240}]}, - {"files": [{"filename": "other.flac", "length": 300}]}, - ] - match = main.find_close_duration_song(results, 242) - assert match is not None - assert match["files"][0]["filename"] == "song.flac" - - def test_no_match_outside_tolerance(self): - results = [ - {"files": [{"filename": "song.flac", "length": 240}]}, - ] - match = main.find_close_duration_song(results, 300) - assert match is None - - def test_empty_results(self): - match = main.find_close_duration_song([], 200) - assert match is None - - def test_missing_length_key(self): - results = [ - {"files": [{"filename": "song.flac"}]}, - ] - match = main.find_close_duration_song(results, 200) - assert match is None - - def test_tolerance_boundary_exact(self): - results = [ - {"files": [{"filename": "song.flac", "length": 215}]}, - ] - match = main.find_close_duration_song(results, 200) - assert match is not None - - def test_tolerance_boundary_exceeded(self): - results = [ - {"files": [{"filename": "song.flac", "length": 216}]}, - ] - match = main.find_close_duration_song(results, 200) - assert match is None - - def test_result_with_no_files_key(self): - results = [{"username": "user1"}] - match = main.find_close_duration_song(results, 200) - assert match is None - - def test_result_with_empty_files(self): - results = [{"files": []}] - match = main.find_close_duration_song(results, 200) - assert match is None - - def test_returns_first_match(self): - results = [ - {"files": [{"filename": "first.flac", "length": 200}]}, - {"files": [{"filename": "second.flac", "length": 201}]}, - ] - match = main.find_close_duration_song(results, 200) - assert match["files"][0]["filename"] == "first.flac" - - def test_skips_bad_result_finds_good(self): - results = [ - {"files": [{"filename": "bad.flac"}]}, # no length key - {"files": [{"filename": "good.flac", "length": 200}]}, - ] - match = main.find_close_duration_song(results, 200) - assert match["files"][0]["filename"] == "good.flac" - - -# --------------------------------------------------------------------------- -# main.py — remove_hyphens_and_trim -# --------------------------------------------------------------------------- - -class TestRemoveHyphensAndTrim: - def test_basic_hyphen_removal(self): - assert main.remove_hyphens_and_trim("Artist - Song Name") == "Artist Song Name" - - def test_multiple_hyphens(self): - assert main.remove_hyphens_and_trim("A - B - C") == "A B C" - - def test_no_hyphens(self): - assert main.remove_hyphens_and_trim("No Hyphens Here") == "No Hyphens Here" - - def test_leading_trailing_spaces(self): - result = main.remove_hyphens_and_trim(" Artist - Song ") - assert result == "Artist Song" - - def test_empty_string(self): - assert main.remove_hyphens_and_trim("") == "" - - def test_only_hyphen(self): - # Splitting "-" gives ["", ""] which join as " " - assert main.remove_hyphens_and_trim("-") == " " - - def test_consecutive_hyphens(self): - # "A--B" splits into ["A", "", "B"] which join as "A B" - result = main.remove_hyphens_and_trim("A--B") - assert result == "A B" - - -# --------------------------------------------------------------------------- -# main.py — search_and_enqueue -# --------------------------------------------------------------------------- - -class TestSearchAndEnqueue: - def _make_mock_slskd(self, search_response, search_results, enqueue_return=True): - mock = MagicMock() - mock.searches.search_text.return_value = search_response - mock.searches.search_responses.return_value = search_results - mock.transfers.enqueue.return_value = enqueue_return - return mock - - @patch("main.time.sleep") - def test_successful_enqueue(self, mock_sleep): - search_resp = {"id": "search-1"} - search_results = [ - { - "username": "peer1", - "files": [{"filename": "song.flac", "length": 200, "size": 5000}], - } - ] - main.slskd = self._make_mock_slskd(search_resp, search_results, enqueue_return=True) - - unfound = [] - main.search_and_enqueue([("Artist - Song", 200)], unfound) - - assert unfound == [] - main.slskd.searches.search_text.assert_called_once() - main.slskd.transfers.enqueue.assert_called_once_with( - username="peer1", - files=[{"filename": "song.flac", "size": 5000}], - ) - mock_sleep.assert_called_with(60) - - @patch("main.time.sleep") - def test_no_matching_duration(self, mock_sleep): - search_resp = {"id": "search-1"} - search_results = [ - { - "username": "peer1", - "files": [{"filename": "song.flac", "length": 999, "size": 5000}], - } - ] - main.slskd = self._make_mock_slskd(search_resp, search_results) - - unfound = [] - main.search_and_enqueue([("Artist - Song", 200)], unfound) - - assert "Artist - Song" in unfound - - @patch("main.time.sleep") - def test_enqueue_returns_false(self, mock_sleep): - search_resp = {"id": "search-1"} - search_results = [ - { - "username": "peer1", - "files": [{"filename": "song.flac", "length": 200, "size": 5000}], - } - ] - main.slskd = self._make_mock_slskd(search_resp, search_results, enqueue_return=False) - - unfound = [] - main.search_and_enqueue([("Artist - Song", 200)], unfound) - - assert "Artist - Song" in unfound - - @patch("main.time.sleep") - def test_enqueue_http_error(self, mock_sleep): - import requests - - search_resp = {"id": "search-1"} - search_results = [ - { - "username": "peer1", - "files": [{"filename": "song.flac", "length": 200, "size": 5000}], - } - ] - mock_slskd = self._make_mock_slskd(search_resp, search_results) - mock_slskd.transfers.enqueue.side_effect = requests.exceptions.HTTPError("500") - main.slskd = mock_slskd - - unfound = [] - main.search_and_enqueue([("Artist - Song", 200)], unfound) - - assert "Artist - Song" in unfound - - @patch("main.time.sleep") - def test_multiple_songs(self, mock_sleep): - search_resp = {"id": "search-1"} - # First song found, second not - main.slskd = MagicMock() - main.slskd.searches.search_text.return_value = search_resp - - good_results = [ - { - "username": "peer1", - "files": [{"filename": "song1.flac", "length": 200, "size": 5000}], - } - ] - bad_results = [ - { - "username": "peer2", - "files": [{"filename": "song2.flac", "length": 999, "size": 5000}], - } - ] - main.slskd.searches.search_responses.side_effect = [good_results, bad_results] - main.slskd.transfers.enqueue.return_value = True - - unfound = [] - main.search_and_enqueue( - [("Song One", 200), ("Song Two", 100)], unfound - ) - - assert len(unfound) == 1 - assert "Song Two" in unfound - - @patch("main.time.sleep") - def test_search_text_includes_flac(self, mock_sleep): - search_resp = {"id": "search-1"} - main.slskd = self._make_mock_slskd(search_resp, [], enqueue_return=True) - - unfound = [] - main.search_and_enqueue([("My Song", 200)], unfound) - - call_args = main.slskd.searches.search_text.call_args - assert "flac" in call_args[1]["searchText"] - - @patch("main.time.sleep") - def test_empty_song_list(self, mock_sleep): - main.slskd = MagicMock() - unfound = [] - main.search_and_enqueue([], unfound) - assert unfound == [] - main.slskd.searches.search_text.assert_not_called() - - -# --------------------------------------------------------------------------- -# main.py — threaded_search_and_enqueue -# --------------------------------------------------------------------------- - -class TestThreadedSearchAndEnqueue: - @patch("main.time.sleep") - @patch("main.search_and_enqueue") - def test_distributes_songs_across_threads(self, mock_search, mock_sleep): - songs = [(f"Song {i}", i * 10) for i in range(10)] - unfound = [] - - main.threaded_search_and_enqueue(songs, unfound, num_threads=2) - - assert mock_search.call_count == 2 - # First chunk: 5 songs, second chunk: remaining 5 - first_chunk = mock_search.call_args_list[0][0][0] - second_chunk = mock_search.call_args_list[1][0][0] - assert len(first_chunk) + len(second_chunk) == 10 - - @patch("main.time.sleep") - @patch("main.search_and_enqueue") - def test_single_thread(self, mock_search, mock_sleep): - songs = [("Song A", 100), ("Song B", 200)] - unfound = [] - - main.threaded_search_and_enqueue(songs, unfound, num_threads=1) - - assert mock_search.call_count == 1 - chunk = mock_search.call_args_list[0][0][0] - assert len(chunk) == 2 - - @patch("main.time.sleep") - @patch("main.search_and_enqueue") - def test_last_thread_gets_remainder(self, mock_search, mock_sleep): - songs = [(f"Song {i}", i * 10) for i in range(7)] - unfound = [] - - main.threaded_search_and_enqueue(songs, unfound, num_threads=3) - - assert mock_search.call_count == 3 - chunks = [c[0][0] for c in mock_search.call_args_list] - total = sum(len(c) for c in chunks) - assert total == 7 - # Last chunk should have the remainder - assert len(chunks[2]) >= len(chunks[0]) - - @patch("main.time.sleep") - @patch("main.search_and_enqueue") - def test_sleeps_between_thread_starts(self, mock_search, mock_sleep): - songs = [(f"Song {i}", i * 10) for i in range(6)] - unfound = [] - - main.threaded_search_and_enqueue(songs, unfound, num_threads=3) - - sleep_calls = [c[0][0] for c in mock_sleep.call_args_list] - assert sleep_calls.count(10) == 3 - - -# --------------------------------------------------------------------------- -# main.py — main() -# --------------------------------------------------------------------------- - -class TestMainFunction: - @patch("main.write_unfound_songs_to_csv") - @patch("main.threaded_search_and_enqueue") - @patch("main.list_files_with_duration", return_value=[("Song", 200)]) - @patch("main.slskd_api.SlskdClient") - def test_main_no_unfound(self, mock_client, mock_list, mock_threaded, mock_csv): - mock_threaded.side_effect = lambda songs, unfound, **kw: None - main.main() - - mock_client.assert_called_once() - mock_list.assert_called_once() - mock_threaded.assert_called_once() - mock_csv.assert_not_called() - - @patch("main.write_unfound_songs_to_csv") - @patch("main.threaded_search_and_enqueue") - @patch("main.list_files_with_duration", return_value=[("Song", 200)]) - @patch("main.slskd_api.SlskdClient") - def test_main_with_unfound(self, mock_client, mock_list, mock_threaded, mock_csv): - def add_unfound(songs, unfound, **kw): - unfound.append("Lost Song") - - mock_threaded.side_effect = add_unfound - main.main() - - mock_csv.assert_called_once_with(["Lost Song"], "unfound_songs.csv") - - -# --------------------------------------------------------------------------- -# rename-files.py — collect_flac_files -# --------------------------------------------------------------------------- - -class TestCollectFlacFiles: - def test_finds_flac_files(self): - with tempfile.TemporaryDirectory() as tmpdir: - Path(tmpdir, "song.flac").touch() - Path(tmpdir, "other.mp3").touch() - subdir = Path(tmpdir, "subdir") - subdir.mkdir() - Path(subdir, "nested.FLAC").touch() - - result = rename_files.collect_flac_files(tmpdir) - assert len(result) == 2 - names = [os.path.basename(f) for f in result] - assert "song.flac" in names - assert "nested.FLAC" in names - assert "other.mp3" not in names - - def test_empty_directory(self): - with tempfile.TemporaryDirectory() as tmpdir: - result = rename_files.collect_flac_files(tmpdir) - assert result == [] - - def test_deeply_nested(self): - with tempfile.TemporaryDirectory() as tmpdir: - deep = Path(tmpdir, "a", "b", "c") - deep.mkdir(parents=True) - Path(deep, "deep.flac").touch() - result = rename_files.collect_flac_files(tmpdir) - assert len(result) == 1 - assert "deep.flac" in result[0] - - def test_case_insensitive_extension(self): - with tempfile.TemporaryDirectory() as tmpdir: - Path(tmpdir, "lower.flac").touch() - Path(tmpdir, "upper.FLAC").touch() - Path(tmpdir, "mixed.Flac").touch() - result = rename_files.collect_flac_files(tmpdir) - assert len(result) == 3 - - def test_ignores_non_flac(self): - with tempfile.TemporaryDirectory() as tmpdir: - Path(tmpdir, "song.mp3").touch() - Path(tmpdir, "song.wav").touch() - Path(tmpdir, "song.ogg").touch() - Path(tmpdir, "song.flac.txt").touch() - result = rename_files.collect_flac_files(tmpdir) - assert result == [] - - -# --------------------------------------------------------------------------- -# rename-files.py — extract_metadata -# --------------------------------------------------------------------------- - -class TestExtractMetadata: - @patch.object(rename_files, "FLAC") - def test_extracts_title_and_artist(self, mock_flac_cls): - mock_audio = MagicMock() - mock_audio.get.side_effect = lambda key, default: { - "title": ["My Song"], - "artist": ["My Artist"], - }.get(key, default) - mock_flac_cls.return_value = mock_audio - - title, artist = rename_files.extract_metadata("/fake/path.flac") - assert title == "My Song" - assert artist == "My Artist" - - @patch.object(rename_files, "FLAC") - def test_defaults_to_unknown(self, mock_flac_cls): - mock_audio = MagicMock() - mock_audio.get.side_effect = lambda key, default: default - mock_flac_cls.return_value = mock_audio - - title, artist = rename_files.extract_metadata("/fake/path.flac") - assert title == "Unknown" - assert artist == "Unknown" - - -# --------------------------------------------------------------------------- -# rename-files.py — sanitize_filename -# --------------------------------------------------------------------------- - -class TestSanitizeFilename: - def test_removes_invalid_chars(self): - assert rename_files.sanitize_filename('Song: "Title"') == "Song Title" - assert rename_files.sanitize_filename("A/B\\C") == "ABC" - assert rename_files.sanitize_filename("normal.flac") == "normal.flac" - - def test_removes_all_special_chars(self): - result = rename_files.sanitize_filename("A*B?CE|F") - assert "*" not in result - assert "?" not in result - assert "<" not in result - assert ">" not in result - assert "|" not in result - - def test_preserves_valid_chars(self): - assert ( - rename_files.sanitize_filename("Hello World - Track 01.flac") - == "Hello World - Track 01.flac" - ) - - def test_empty_string(self): - assert rename_files.sanitize_filename("") == "" - - def test_all_invalid(self): - assert rename_files.sanitize_filename('\\/:*?"<>|') == "" - - -# --------------------------------------------------------------------------- -# rename-files.py — move_and_rename_flac_files -# --------------------------------------------------------------------------- - -class TestMoveAndRenameFlacFiles: - @patch.object(rename_files, "shutil") - @patch.object(rename_files, "extract_metadata") - def test_moves_files_with_metadata_name(self, mock_extract, mock_shutil): - mock_extract.return_value = ("My Song", "My Artist") - - with tempfile.TemporaryDirectory() as dest: - rename_files.move_and_rename_flac_files( - ["/source/file.flac"], dest - ) - - expected_path = os.path.join(dest, "My Artist - My Song.flac") - mock_shutil.move.assert_called_once_with("/source/file.flac", expected_path) - - @patch.object(rename_files, "shutil") - @patch.object(rename_files, "extract_metadata") - def test_sanitizes_filename(self, mock_extract, mock_shutil): - mock_extract.return_value = ('Song: "Special"', "Art/ist") - - with tempfile.TemporaryDirectory() as dest: - rename_files.move_and_rename_flac_files( - ["/source/file.flac"], dest - ) - - expected_path = os.path.join(dest, "Artist - Song Special.flac") - mock_shutil.move.assert_called_once_with("/source/file.flac", expected_path) - - @patch.object(rename_files, "shutil") - @patch.object(rename_files, "extract_metadata") - def test_multiple_files(self, mock_extract, mock_shutil): - mock_extract.side_effect = [ - ("Song A", "Artist A"), - ("Song B", "Artist B"), - ] - - with tempfile.TemporaryDirectory() as dest: - rename_files.move_and_rename_flac_files( - ["/source/a.flac", "/source/b.flac"], dest - ) - - assert mock_shutil.move.call_count == 2 - - @patch.object(rename_files, "shutil") - @patch.object(rename_files, "extract_metadata") - def test_empty_file_list(self, mock_extract, mock_shutil): - with tempfile.TemporaryDirectory() as dest: - rename_files.move_and_rename_flac_files([], dest) - mock_shutil.move.assert_not_called() - mock_extract.assert_not_called() - - -# --------------------------------------------------------------------------- -# rename-files.py — main() -# --------------------------------------------------------------------------- - -class TestRenameFilesMain: - @patch.object(rename_files, "move_and_rename_flac_files") - @patch.object(rename_files, "collect_flac_files", return_value=["/dl/song.flac"]) - def test_main_calls_pipeline(self, mock_collect, mock_move): - rename_files.main() - - mock_collect.assert_called_once_with("G:\\slskd\\downloads") - mock_move.assert_called_once_with(["/dl/song.flac"], "G:\\slskd\\") diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..9903882 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,56 @@ +import csv +import tempfile +import os +from pathlib import Path + +from slskd_transform.utils import write_unfound_songs_to_csv, sanitize_filename + + +class TestWriteUnfoundSongsToCsv: + def test_writes_header_and_songs(self): + with tempfile.TemporaryDirectory() as tmpdir: + filepath = Path(tmpdir) / "unfound.csv" + write_unfound_songs_to_csv(["Song A", "Song B"], filepath) + with open(filepath, newline='', encoding='utf-8') as f: + reader = list(csv.reader(f)) + assert reader[0] == ["Song Name"] + assert reader[1] == ["Song A"] + assert reader[2] == ["Song B"] + + def test_writes_empty_list(self): + with tempfile.TemporaryDirectory() as tmpdir: + filepath = Path(tmpdir) / "unfound.csv" + write_unfound_songs_to_csv([], filepath) + with open(filepath, newline='', encoding='utf-8') as f: + reader = list(csv.reader(f)) + assert reader == [["Song Name"]] + + def test_writes_special_characters(self): + with tempfile.TemporaryDirectory() as tmpdir: + filepath = Path(tmpdir) / "unfound.csv" + write_unfound_songs_to_csv(["Song, With Comma", 'Song "Quotes"'], filepath) + with open(filepath, newline='', encoding='utf-8') as f: + reader = list(csv.reader(f)) + assert reader[1] == ["Song, With Comma"] + assert reader[2] == ['Song "Quotes"'] + + +class TestSanitizeFilename: + def test_removes_invalid_chars(self): + assert sanitize_filename('Song: "Title"') == "Song Title" + assert sanitize_filename("A/B\\C") == "ABC" + assert sanitize_filename("normal.flac") == "normal.flac" + + def test_removes_all_special_chars(self): + result = sanitize_filename("A*B?CE|F") + assert "*" not in result + assert "?" not in result + + def test_preserves_valid_chars(self): + assert sanitize_filename("Hello World - Track 01.flac") == "Hello World - Track 01.flac" + + def test_empty_string(self): + assert sanitize_filename("") == "" + + def test_all_invalid(self): + assert sanitize_filename('\\/:*?"<>|') == ""