diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 4acfd5f..c095654 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -18,10 +18,10 @@ jobs:
python-version: "3.12"
- name: Install dependencies
- run: pip install -r requirements-test.txt
+ run: pip install -e ".[dev]"
- name: Run tests with coverage
- run: pytest tests/ --cov=. --cov-report=xml --cov-report=term -v
+ run: pytest tests/ --cov=src/slskd_transform --cov-report=xml --cov-report=term -v
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v5
diff --git a/.gitignore b/.gitignore
index 68bc17f..2c661be 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,6 @@
+# User config (contains API keys)
+config.yml
+
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
@@ -158,3 +161,4 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
+.omc/
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..3d6b41d
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,20 @@
+FROM python:3.12-slim AS builder
+
+WORKDIR /app
+COPY pyproject.toml .
+COPY src/ src/
+
+RUN pip install --no-cache-dir --prefix=/install .
+
+FROM python:3.12-slim
+
+WORKDIR /app
+COPY --from=builder /install /usr/local
+
+RUN useradd --create-home appuser
+USER appuser
+
+RUN mkdir -p /app/music /app/downloads /app/organized
+
+ENTRYPOINT ["slskd-transform"]
+CMD ["search"]
diff --git a/README.md b/README.md
index 5c8c343..8e57757 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,7 @@
-
+
@@ -16,86 +16,171 @@
---
-**slskd-transform** is a Python tool that scans your local music library, searches the [Soulseek](https://www.slsknet.org/) network through [slskd](https://github.com/slskd/slskd) for matching FLAC versions of each track, and automatically enqueues them for download. It matches songs by **audio duration** rather than filenames alone, ensuring you get the correct track every time. Songs that cannot be found are reported in a CSV file for manual follow-up.
+**slskd-transform** scans your local music library, searches the [Soulseek](https://www.slsknet.org/) network through [slskd](https://github.com/slskd/slskd) for matching FLAC versions of each track, and automatically enqueues them for download. It matches songs by **audio duration** rather than filenames alone, ensuring you get the correct track every time.
-A companion script handles post-download organization, renaming all downloaded FLACs into a clean `Artist - Title.flac` structure using embedded metadata.
+A companion `rename` command handles post-download organization, renaming all downloaded FLACs into a clean `Artist - Title.flac` structure using embedded metadata.
## Features
-- **Duration-based matching** -- Compares local track duration against search results with a configurable tolerance (default: 15 seconds), avoiding mismatches from inconsistent naming.
-- **Multi-threaded search** -- Distributes searches across multiple threads (default: 5) for faster processing of large libraries.
+- **Duration-based matching** -- Compares local track duration against search results with a configurable tolerance (default: 15 seconds).
+- **Recursive scanning** -- Point it at your existing music library with `--recursive`, no need to flatten files first.
+- **Multi-threaded search** -- Distributes searches across multiple threads (default: 5) for faster processing.
+- **Flexible configuration** -- Config file, environment variables, or CLI flags. No code editing required.
- **Automatic enqueue** -- Matched FLAC files are enqueued for download directly through the slskd API.
-- **CSV reporting** -- Tracks that could not be found are written to `unfound_songs.csv` for later review.
-- **Metadata-based renaming** -- The `rename-files.py` script reads FLAC tags and renames files to `Artist - Title.flac`, sanitizing any invalid characters.
+- **CSV reporting** -- Tracks that could not be found are written to `unfound_songs.csv`.
+- **Metadata-based renaming** -- Reads FLAC tags and renames files to `Artist - Title.flac`.
+- **Docker support** -- Run alongside slskd in the same compose stack.
## Prerequisites
-- **Python 3.8+**
-- **[slskd](https://github.com/slskd/slskd)** running and accessible (by default at `http://127.0.0.1:5030`)
+- **Python 3.10+**
+- **[slskd](https://github.com/slskd/slskd)** running and accessible
- A valid slskd **API key** (configured in slskd's settings)
-- A local directory containing the lossy music files you want to upgrade
## Installation
+```bash
+pip install git+https://github.com/GeiserX/slskd-transform.git
+```
+
+Or for development:
+
```bash
git clone https://github.com/GeiserX/slskd-transform.git
cd slskd-transform
-pip install -r requirements.txt
+pip install -e ".[dev]"
```
-## Usage
+## Quick Start
-### Step 1 -- Search and enqueue FLAC downloads
+```bash
+# Set your API key (or put it in config.yml)
+export SLSKD_API_KEY="your-api-key-here"
+
+# Search for FLAC versions of all files in ./music
+slskd-transform search
-1. Place your lossy music files (MP3, AAC, OGG, etc.) in a `music/` directory inside the project root.
-2. Open `main.py` and set your slskd connection details:
+# Search recursively in your existing library
+slskd-transform search --music-dir /path/to/library --recursive
-```python
-slskd = slskd_api.SlskdClient(
- host="http://127.0.0.1:5030",
- api_key="YOUR_API_KEY",
- verify_ssl=False
-)
+# Rename downloaded FLACs using metadata
+slskd-transform rename --source-dir /path/to/downloads --dest-dir /path/to/organized
```
-3. Run the search:
+## Configuration
+
+slskd-transform loads configuration from multiple sources with this priority:
-```bash
-python main.py
+```
+CLI flags > Environment variables > Config file > Defaults
```
-The script will search Soulseek for a FLAC version of each local track, match by duration, and enqueue any matches for download. Any songs that could not be found will be saved to `unfound_songs.csv`.
+### Config File
-### Step 2 -- Rename downloaded FLACs
+Create `config.yml` in your working directory or `~/.config/slskd-transform/config.yml`:
-Once downloads are complete, use the renaming script to organize them:
+```yaml
+# slskd connection
+host: "http://127.0.0.1:5030"
+api_key: "your-api-key"
+verify_ssl: false
-1. Open `rename-files.py` and set the source and destination directories:
+# Search settings
+music_dir: "./music"
+duration_tolerance: 15
+num_threads: 5
+search_timeout: 60
+format: "flac"
+recursive: false
-```python
-source_directory = '/path/to/slskd/downloads'
-destination_directory = '/path/to/organized/music'
+# Rename settings
+source_dir: "./downloads"
+destination_dir: "./organized"
```
-2. Run the script:
+### Environment Variables
+
+All settings can be configured via `SLSKD_` prefixed environment variables:
+
+| Variable | Description | Default |
+|----------|-------------|---------|
+| `SLSKD_HOST` | slskd instance URL | `http://127.0.0.1:5030` |
+| `SLSKD_API_KEY` | slskd API key | -- |
+| `SLSKD_VERIFY_SSL` | Enable SSL verification | `false` |
+| `SLSKD_MUSIC_DIR` | Source directory with lossy files | `./music` |
+| `SLSKD_DURATION_TOLERANCE` | Max duration difference (seconds) | `15` |
+| `SLSKD_NUM_THREADS` | Concurrent search threads | `5` |
+| `SLSKD_SEARCH_TIMEOUT` | Wait time for search results (seconds) | `60` |
+| `SLSKD_FORMAT` | Target format to search for | `flac` |
+| `SLSKD_RECURSIVE` | Scan directories recursively | `false` |
+| `SLSKD_SOURCE_DIR` | Download directory for rename | `./downloads` |
+| `SLSKD_DESTINATION_DIR` | Output directory for rename | `./organized` |
+
+### CLI Reference
-```bash
-python rename-files.py
+```
+slskd-transform [OPTIONS] COMMAND [ARGS]...
+
+Options:
+ -c, --config PATH Path to config.yml
+ --host TEXT slskd host URL
+ --api-key TEXT slskd API key
+ --no-verify-ssl Disable SSL verification
+ -t, --threads INTEGER Number of search threads
+ --help Show help
+
+Commands:
+ search Search Soulseek for lossless versions and enqueue downloads
+ rename Rename downloaded FLACs using metadata
```
-All `.flac` files in the source directory (including subdirectories) will be renamed to `Artist - Title.flac` and moved to the destination.
+**search options:**
+```
+ -m, --music-dir PATH Directory with lossy source files
+ -r, --recursive Scan music directory recursively
+ -f, --format TEXT Target format (default: flac)
+ --tolerance INTEGER Duration match tolerance in seconds
+ --timeout INTEGER Seconds to wait for search results
+```
-## Configuration
+**rename options:**
+```
+ -s, --source-dir PATH Directory where slskd downloads land
+ -d, --dest-dir PATH Destination for renamed files
+```
-| Parameter | Location | Default | Description |
-|---|---|---|---|
-| `host` | `main.py` | `http://127.0.0.1:5030` | slskd instance URL |
-| `api_key` | `main.py` | -- | Your slskd API key |
-| `MUSIC_DIR` | `main.py` | `./music` | Directory containing lossy source files |
-| `duration_tolerance` | `main.py` | `15` (seconds) | Maximum duration difference for a match |
-| `num_threads` | `main.py` | `5` | Number of concurrent search threads |
-| `source_directory` | `rename-files.py` | -- | Where slskd downloads land |
-| `destination_directory` | `rename-files.py` | -- | Where renamed FLACs are moved |
+## Docker
+
+```bash
+docker run --rm \
+ -e SLSKD_HOST=http://slskd:5030 \
+ -e SLSKD_API_KEY=your-key \
+ -v /path/to/lossy:/app/music:ro \
+ -v /path/to/downloads:/app/downloads \
+ ghcr.io/geiserx/slskd-transform:2.0.0 search --recursive
+```
+
+Or in a compose stack alongside slskd:
+
+```yaml
+services:
+ slskd:
+ image: slskd/slskd:0.21.4
+ ports:
+ - "5030:5030"
+ volumes:
+ - ./slskd-data:/app
+
+ slskd-transform:
+ image: ghcr.io/geiserx/slskd-transform:2.0.0
+ environment:
+ SLSKD_HOST: http://slskd:5030
+ SLSKD_API_KEY: your-key
+ volumes:
+ - /path/to/lossy:/app/music:ro
+ - ./slskd-data/downloads:/app/downloads
+ command: ["search", "--recursive"]
+```
## How It Works
@@ -110,16 +195,10 @@ Local library Soulseek network Your disk
│ │ │
└──── no match ──> unfound_songs.csv │
v
- rename-files.py
+ slskd-transform rename
Artist - Title.flac
```
-1. **Scan** -- `main.py` reads every file in the `music/` directory using [mutagen](https://mutagen.readthedocs.io/) to extract the audio duration.
-2. **Search** -- For each track, a Soulseek search is issued via the slskd API with the query `" flac"`. Searches run in parallel across multiple threads with staggered starts to avoid flooding the network.
-3. **Match** -- Each search result is compared by duration. The first result within the tolerance window is selected.
-4. **Enqueue** -- Matched files are enqueued for download through slskd. Failed or unmatched tracks are logged.
-5. **Rename** -- After downloading, `rename-files.py` walks the download directory, reads FLAC metadata, and moves files into a flat structure with clean filenames.
-
## Related Music Tools
| Project | Description |
@@ -128,7 +207,6 @@ Local library Soulseek network Your disk
| [audio-transcode-watcher](https://github.com/GeiserX/audio-transcode-watcher) | Automated multi-format audio transcoding with lyrics fetching |
| [jellyfin-encoder](https://github.com/GeiserX/jellyfin-encoder) | Automatic 720p HEVC/AV1 transcoding for Jellyfin |
-
## License
-This project is licensed under the [MIT License](LICENSE).
+This project is licensed under the [GPL-3.0 License](LICENSE).
diff --git a/config.example.yml b/config.example.yml
new file mode 100644
index 0000000..f1147c5
--- /dev/null
+++ b/config.example.yml
@@ -0,0 +1,19 @@
+# slskd-transform configuration
+# All fields are optional — defaults shown below
+
+# slskd connection
+host: "http://127.0.0.1:5030"
+api_key: "" # REQUIRED (or set SLSKD_API_KEY env var)
+verify_ssl: false
+
+# Search settings
+music_dir: "./music" # Where your lossy source files live
+duration_tolerance: 15 # Seconds of acceptable duration difference
+num_threads: 5 # Concurrent search threads
+search_timeout: 60 # Seconds to wait for search results
+format: "flac" # Target format (appended to search query)
+recursive: false # Scan music_dir recursively
+
+# Rename settings
+source_dir: "./downloads" # Where slskd puts completed downloads
+destination_dir: "./organized" # Where renamed FLACs are moved to
diff --git a/main.py b/main.py
index 063ef9d..4dc05f6 100644
--- a/main.py
+++ b/main.py
@@ -1,118 +1,5 @@
-import os
-import slskd_api
-import time
-import mutagen
-import requests
-import csv
-from threading import Thread
-
-def write_unfound_songs_to_csv(unfound_songs, filename):
- with open(filename, mode='w', newline='', encoding='utf-8') as csv_file:
- csv_writer = csv.writer(csv_file)
- csv_writer.writerow(['Song Name'])
-
- for song in unfound_songs:
- csv_writer.writerow([song])
-
-def list_files_without_ext(music_dir):
- filenames = []
- for file in os.listdir(music_dir):
- if not file.startswith('.'):
- file_without_ext = os.path.splitext(file)[0]
- filenames.append(file_without_ext)
- return filenames
-
-def list_files_with_duration(music_dir):
- filenames = []
- for file in os.listdir(music_dir):
- if not file.startswith('.'):
- file_without_ext = os.path.splitext(file)[0]
- file_path = os.path.join(music_dir, file)
- audio_info = mutagen.File(file_path, easy=True)
- duration = int(audio_info.info.length)
- filenames.append((file_without_ext, duration))
- return filenames
-
-def find_close_duration_song(search_results, local_duration):
- duration_tolerance = 15 # Set the duration tolerance in seconds
- for result in search_results:
- if 'files' in result and len(result['files']) > 0:
- file_info = result['files'][0]
- try:
- result_duration = int(file_info['length'])
- if abs(local_duration - result_duration) <= duration_tolerance:
- return result
- except KeyError:
- pass
- return None
-
-def remove_hyphens_and_trim(song_name):
- return ' '.join(segment.strip() for segment in song_name.split('-'))
-
-def search_and_enqueue(songs_with_duration, unfound_songs):
- for song_name, local_duration in songs_with_duration:
- song_name_cleaned = remove_hyphens_and_trim(song_name)
- print(f"Searching for: {song_name_cleaned}")
-
- search_response = slskd.searches.search_text(searchText=(song_name_cleaned + " flac"))
-
- # Wait a few seconds for search results to populate
- time.sleep(60)
-
- search_id = search_response['id']
- search_results = slskd.searches.search_responses(id=search_id)
-
- result = find_close_duration_song(search_results, local_duration)
- if result is not None:
- file_info = result['files'][0]
- print(f"Enqueueing: {file_info['filename']}")
- try:
- success = slskd.transfers.enqueue(username=result['username'], files=[{'filename': file_info['filename'], 'size': file_info['size']}])
- if success:
- print(f"Enqueued: {file_info['filename']}")
- else:
- print(f"Failed to enqueue: {file_info['filename']}")
- unfound_songs.append(song_name)
- except requests.exceptions.HTTPError as http_error:
- print(f"Error while sending request to slskd: {http_error}")
- unfound_songs.append(song_name)
- else:
- print(f"Failed to find matching song with close duration for: {song_name}")
- unfound_songs.append(song_name)
-
-def threaded_search_and_enqueue(songs_with_duration, unfound_songs, num_threads=5):
- thread_list = []
- chunk_size = len(songs_with_duration) // num_threads
-
- for i in range(num_threads):
- if i == num_threads - 1: # last thread takes the remaining songs
- chunk = songs_with_duration[i * chunk_size:]
- else:
- chunk = songs_with_duration[i * chunk_size : (i + 1) * chunk_size]
- thread = Thread(target=search_and_enqueue, args=(chunk, unfound_songs))
- thread.start()
- thread_list.append(thread)
- time.sleep(10)
-
- for thread in thread_list:
- thread.join()
-
-def main():
- global slskd
- slskd = slskd_api.SlskdClient(host="http://127.0.0.1:5030", api_key="...", verify_ssl=False)
-
- current_dir = os.getcwd()
- MUSIC_DIR = os.path.join(current_dir, 'music')
- songs_with_duration = list_files_with_duration(MUSIC_DIR)
-
- unfound_songs = []
-
- threaded_search_and_enqueue(songs_with_duration, unfound_songs)
-
- if len(unfound_songs) > 0:
- write_unfound_songs_to_csv(unfound_songs, 'unfound_songs.csv')
- print("Unfound songs have been written to 'unfound_songs.csv'")
-
+"""Backwards-compatibility shim. Use `slskd-transform search` instead."""
+from slskd_transform.cli import cli
if __name__ == '__main__':
- main()
+ cli(["search"])
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..093e5c9
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,37 @@
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "slskd-transform"
+version = "2.0.0"
+description = "Bulk-upgrade your music library from lossy to lossless via Soulseek"
+readme = "README.md"
+license = "GPL-3.0-only"
+requires-python = ">=3.10"
+authors = [
+ { name = "GeiserX" },
+]
+dependencies = [
+ "slskd-api>=0.1.5",
+ "mutagen>=1.47",
+ "requests>=2.31",
+ "click>=8.1",
+ "pyyaml>=6.0",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest>=7.0",
+ "pytest-cov>=4.0",
+]
+
+[project.scripts]
+slskd-transform = "slskd_transform.cli:cli"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/slskd_transform"]
+
+[tool.pytest.ini_options]
+testpaths = ["tests"]
+addopts = "--cov=src/slskd_transform --cov-report=term"
diff --git a/rename-files.py b/rename-files.py
index 384ec0a..97b3e20 100644
--- a/rename-files.py
+++ b/rename-files.py
@@ -1,44 +1,5 @@
-import os
-import shutil
-from mutagen.flac import FLAC
-
-def collect_flac_files(directory):
- flac_files = []
- for root, dirs, files in os.walk(directory):
- for file in files:
- if file.lower().endswith('.flac'):
- flac_files.append(os.path.join(root, file))
- return flac_files
-
-def extract_metadata(file_path):
- audio = FLAC(file_path)
- title = audio.get('title', ['Unknown'])[0]
- artist = audio.get('artist', ['Unknown'])[0]
- return title, artist
-
-def sanitize_filename(filename):
- invalid_chars = '\\/:*?"<>|'
- for char in invalid_chars:
- filename = filename.replace(char, '')
- return filename
-
-def move_and_rename_flac_files(flac_files, destination):
- for file in flac_files:
- title, artist = extract_metadata(file)
- new_filename = sanitize_filename(f"{artist} - {title}.flac")
- new_filepath = os.path.join(destination, new_filename)
-
- # Move or rename the .flac file
- shutil.move(file, new_filepath)
- print(f"Moved and renamed: {new_filepath}")
-
-def main():
- source_directory = 'G:\\slskd\\downloads'
- destination_directory = 'G:\\slskd\\'
- flac_files = collect_flac_files(source_directory)
-
- move_and_rename_flac_files(flac_files, destination_directory)
-
+"""Backwards-compatibility shim. Use `slskd-transform rename` instead."""
+from slskd_transform.cli import cli
if __name__ == '__main__':
- main()
+ cli(["rename"])
diff --git a/requirements-test.txt b/requirements-test.txt
deleted file mode 100644
index 3bc0049..0000000
--- a/requirements-test.txt
+++ /dev/null
@@ -1,3 +0,0 @@
--r requirements.txt
-pytest>=9.0.3
-pytest-cov>=7.1.0
diff --git a/requirements.txt b/requirements.txt
deleted file mode 100644
index 821f087..0000000
--- a/requirements.txt
+++ /dev/null
@@ -1,3 +0,0 @@
-slskd_api
-mutagen
-requests
diff --git a/src/slskd_transform/__init__.py b/src/slskd_transform/__init__.py
new file mode 100644
index 0000000..2f85346
--- /dev/null
+++ b/src/slskd_transform/__init__.py
@@ -0,0 +1,3 @@
+"""slskd-transform: Bulk-upgrade your music library from lossy to lossless via Soulseek."""
+
+__version__ = "2.0.0"
diff --git a/src/slskd_transform/cli.py b/src/slskd_transform/cli.py
new file mode 100644
index 0000000..ec29a25
--- /dev/null
+++ b/src/slskd_transform/cli.py
@@ -0,0 +1,75 @@
+import click
+from pathlib import Path
+
+from slskd_transform.config import load_config
+from slskd_transform.search import run_search
+from slskd_transform.rename import run_rename
+
+
+@click.group()
+@click.option("--config", "-c", type=click.Path(exists=True), default=None, help="Path to config.yml")
+@click.option("--host", type=str, default=None, help="slskd host URL")
+@click.option("--api-key", type=str, default=None, help="slskd API key")
+@click.option("--no-verify-ssl", is_flag=True, default=False, help="Disable SSL verification")
+@click.option("--threads", "-t", type=int, default=None, help="Number of search threads")
+@click.pass_context
+def cli(ctx, config, host, api_key, no_verify_ssl, threads):
+ """slskd-transform: Bulk-upgrade your music library from lossy to lossless."""
+ ctx.ensure_object(dict)
+ ctx.obj["config_path"] = Path(config) if config else None
+ ctx.obj["cli_overrides"] = {}
+ if host is not None:
+ ctx.obj["cli_overrides"]["host"] = host
+ if api_key is not None:
+ ctx.obj["cli_overrides"]["api_key"] = api_key
+ if no_verify_ssl:
+ ctx.obj["cli_overrides"]["verify_ssl"] = False
+ if threads is not None:
+ ctx.obj["cli_overrides"]["num_threads"] = threads
+
+
+@cli.command()
+@click.option("--music-dir", "-m", type=click.Path(exists=True), default=None, help="Directory with lossy source files")
+@click.option("--recursive", "-r", is_flag=True, default=None, help="Scan music directory recursively")
+@click.option("--format", "-f", "fmt", type=str, default=None, help="Target format (default: flac)")
+@click.option("--tolerance", type=int, default=None, help="Duration match tolerance in seconds")
+@click.option("--timeout", type=int, default=None, help="Seconds to wait for search results")
+@click.pass_context
+def search(ctx, music_dir, recursive, fmt, tolerance, timeout):
+ """Search Soulseek for lossless versions and enqueue downloads."""
+ overrides = dict(ctx.obj["cli_overrides"])
+ if music_dir is not None:
+ overrides["music_dir"] = music_dir
+ if recursive is not None:
+ overrides["recursive"] = recursive
+ if fmt is not None:
+ overrides["format"] = fmt
+ if tolerance is not None:
+ overrides["duration_tolerance"] = tolerance
+ if timeout is not None:
+ overrides["search_timeout"] = timeout
+
+ config = load_config(config_path=ctx.obj["config_path"], cli_overrides=overrides)
+
+ if not config.api_key:
+ raise click.ClickException(
+ "No API key configured. Set SLSKD_API_KEY env var, add api_key to config.yml, or pass --api-key."
+ )
+
+ run_search(config)
+
+
+@cli.command()
+@click.option("--source-dir", "-s", type=click.Path(exists=True), default=None, help="Directory where slskd downloads land")
+@click.option("--dest-dir", "-d", type=click.Path(), default=None, help="Destination for renamed files")
+@click.pass_context
+def rename(ctx, source_dir, dest_dir):
+ """Rename downloaded FLACs to 'Artist - Title.flac' using metadata."""
+ overrides = dict(ctx.obj["cli_overrides"])
+ if source_dir is not None:
+ overrides["source_dir"] = source_dir
+ if dest_dir is not None:
+ overrides["destination_dir"] = dest_dir
+
+ config = load_config(config_path=ctx.obj["config_path"], cli_overrides=overrides)
+ run_rename(config)
diff --git a/src/slskd_transform/config.py b/src/slskd_transform/config.py
new file mode 100644
index 0000000..8aab466
--- /dev/null
+++ b/src/slskd_transform/config.py
@@ -0,0 +1,132 @@
+"""Configuration loading for slskd-transform."""
+
+from __future__ import annotations
+
+import os
+from dataclasses import dataclass
+from pathlib import Path
+
+try:
+ import yaml
+except ImportError:
+ yaml = None # type: ignore[assignment]
+
+
+DEFAULTS: dict[str, object] = {
+ "host": "http://127.0.0.1:5030",
+ "api_key": "",
+ "verify_ssl": False,
+ "music_dir": "./music",
+ "source_dir": "./downloads",
+ "destination_dir": "./organized",
+ "duration_tolerance": 15,
+ "num_threads": 5,
+ "search_timeout": 60,
+ "format": "flac",
+ "recursive": False,
+}
+
+_ENV_MAP: dict[str, str] = {
+ "SLSKD_HOST": "host",
+ "SLSKD_API_KEY": "api_key",
+ "SLSKD_VERIFY_SSL": "verify_ssl",
+ "SLSKD_MUSIC_DIR": "music_dir",
+ "SLSKD_SOURCE_DIR": "source_dir",
+ "SLSKD_DESTINATION_DIR": "destination_dir",
+ "SLSKD_DURATION_TOLERANCE": "duration_tolerance",
+ "SLSKD_NUM_THREADS": "num_threads",
+ "SLSKD_SEARCH_TIMEOUT": "search_timeout",
+ "SLSKD_FORMAT": "format",
+ "SLSKD_RECURSIVE": "recursive",
+}
+
+_BOOL_FIELDS = {"verify_ssl", "recursive"}
+_INT_FIELDS = {"duration_tolerance", "num_threads", "search_timeout"}
+_PATH_FIELDS = {"music_dir", "source_dir", "destination_dir"}
+
+
+def _parse_bool(value: object) -> bool:
+ if isinstance(value, bool):
+ return value
+ return str(value).lower() in ("true", "1", "yes")
+
+
+@dataclass(frozen=True)
+class SlskdConfig:
+ host: str
+ api_key: str
+ verify_ssl: bool
+ music_dir: Path
+ source_dir: Path
+ destination_dir: Path
+ duration_tolerance: int
+ num_threads: int
+ search_timeout: int
+ format: str
+ recursive: bool
+
+
+def load_config(
+ *,
+ config_path: Path | None = None,
+ cli_overrides: dict[str, object] | None = None,
+) -> SlskdConfig:
+ """Load configuration from YAML, env vars, and CLI overrides."""
+ merged: dict[str, object] = dict(DEFAULTS)
+
+ # Load YAML file
+ file_data = _load_yaml(config_path)
+ for key, value in file_data.items():
+ if key in DEFAULTS:
+ merged[key] = value
+
+ # Overlay environment variables
+ for env_var, field in _ENV_MAP.items():
+ value = os.environ.get(env_var)
+ if value is not None:
+ merged[field] = value
+
+ # Overlay CLI overrides
+ if cli_overrides:
+ for key, value in cli_overrides.items():
+ if value is not None and key in DEFAULTS:
+ merged[key] = value
+
+ # Convert types
+ for field in _BOOL_FIELDS:
+ merged[field] = _parse_bool(merged[field])
+ for field in _INT_FIELDS:
+ merged[field] = int(merged[field]) # type: ignore[arg-type]
+ for field in _PATH_FIELDS:
+ merged[field] = Path(str(merged[field]))
+
+ return SlskdConfig(**merged) # type: ignore[arg-type]
+
+
+def _load_yaml(config_path: Path | None) -> dict[str, object]:
+ """Load YAML config from explicit path or auto-discovered locations."""
+ if config_path is not None:
+ return _read_yaml(config_path)
+
+ # Auto-discover
+ candidates = [
+ Path("./config.yml"),
+ Path.home() / ".config" / "slskd-transform" / "config.yml",
+ ]
+ for candidate in candidates:
+ if candidate.exists():
+ return _read_yaml(candidate)
+
+ return {}
+
+
+def _read_yaml(path: Path) -> dict[str, object]:
+ """Read and parse a YAML file. Returns empty dict on failure."""
+ if yaml is None:
+ return {}
+ try:
+ with open(path) as f:
+ data = yaml.safe_load(f)
+ return data if isinstance(data, dict) else {}
+ except (OSError, yaml.YAMLError):
+ return {}
diff --git a/src/slskd_transform/rename.py b/src/slskd_transform/rename.py
new file mode 100644
index 0000000..fe13c5a
--- /dev/null
+++ b/src/slskd_transform/rename.py
@@ -0,0 +1,44 @@
+"""Rename and organize downloaded FLAC files by metadata."""
+
+import os
+import shutil
+from pathlib import Path
+
+from mutagen.flac import FLAC
+
+from slskd_transform.config import SlskdConfig
+from slskd_transform.utils import sanitize_filename
+
+
+def collect_flac_files(directory: Path) -> list[Path]:
+ """Recursively collect all .flac files (case-insensitive) under directory."""
+ flac_files = []
+ for root, _dirs, files in os.walk(directory):
+ for file in files:
+ if file.lower().endswith(".flac"):
+ flac_files.append(Path(root) / file)
+ return flac_files
+
+
+def extract_metadata(file_path: Path) -> tuple[str, str]:
+ """Read FLAC tags, return (title, artist). Defaults to 'Unknown' if missing."""
+ audio = FLAC(str(file_path))
+ title = audio.get("title", ["Unknown"])[0]
+ artist = audio.get("artist", ["Unknown"])[0]
+ return title, artist
+
+
+def move_and_rename_flac_files(flac_files: list[Path], destination: Path) -> None:
+ """Move each file to destination as 'Artist - Title.flac' (sanitized)."""
+ for file in flac_files:
+ title, artist = extract_metadata(file)
+ new_filename = sanitize_filename(f"{artist} - {title}.flac")
+ shutil.move(str(file), str(destination / new_filename))
+ print(f"Moved and renamed: {destination / new_filename}")
+
+
+def run_rename(config: SlskdConfig) -> None:
+ """Top-level orchestrator: collect files from source_dir, rename to destination_dir."""
+ config.destination_dir.mkdir(parents=True, exist_ok=True)
+ flac_files = collect_flac_files(config.source_dir)
+ move_and_rename_flac_files(flac_files, config.destination_dir)
diff --git a/src/slskd_transform/search.py b/src/slskd_transform/search.py
new file mode 100644
index 0000000..282f9dd
--- /dev/null
+++ b/src/slskd_transform/search.py
@@ -0,0 +1,185 @@
+"""Search and enqueue module for slskd-transform."""
+
+from __future__ import annotations
+
+import os
+import time
+from pathlib import Path
+from threading import Thread
+
+import mutagen
+import requests
+import slskd_api
+
+from slskd_transform.config import SlskdConfig
+from slskd_transform.utils import write_unfound_songs_to_csv
+
+
+def list_files_with_duration(
+ music_dir: Path | str,
+ *,
+ recursive: bool = False,
+) -> list[tuple[str, int]]:
+ """Scan a directory for audio files and return (filename_without_ext, duration) tuples."""
+ filenames: list[tuple[str, int]] = []
+
+ if recursive:
+ for root, _dirs, files in os.walk(music_dir):
+ for file in files:
+ if file.startswith('.'):
+ continue
+ file_path = os.path.join(root, file)
+ audio_info = mutagen.File(file_path, easy=True)
+ if audio_info is None or audio_info.info is None:
+ continue
+ duration = int(audio_info.info.length)
+ file_without_ext = os.path.splitext(file)[0]
+ filenames.append((file_without_ext, duration))
+ else:
+ for file in os.listdir(music_dir):
+ if file.startswith('.'):
+ continue
+ file_path = os.path.join(str(music_dir), file)
+ if not os.path.isfile(file_path):
+ continue
+ audio_info = mutagen.File(file_path, easy=True)
+ if audio_info is None or audio_info.info is None:
+ continue
+ duration = int(audio_info.info.length)
+ file_without_ext = os.path.splitext(file)[0]
+ filenames.append((file_without_ext, duration))
+
+ return filenames
+
+
+def find_close_duration_song(
+ search_results: list[dict],
+ local_duration: int,
+ *,
+ tolerance: int = 15,
+) -> dict | None:
+ """Return the first search result whose duration is within tolerance of local_duration."""
+ for result in search_results:
+ if 'files' in result and len(result['files']) > 0:
+ file_info = result['files'][0]
+ try:
+ result_duration = int(file_info['length'])
+ if abs(local_duration - result_duration) <= tolerance:
+ return result
+ except KeyError:
+ pass
+ return None
+
+
+def remove_hyphens_and_trim(song_name: str) -> str:
+ """Split on hyphens, strip each segment, and rejoin with spaces."""
+ return ' '.join(segment.strip() for segment in song_name.split('-'))
+
+
+def search_and_enqueue(
+ songs_with_duration: list[tuple[str, int]],
+ unfound_songs: list[str],
+ *,
+ config: SlskdConfig,
+ client: slskd_api.SlskdClient,
+) -> None:
+ """Search slskd for each song and enqueue matching results."""
+ for song_name, local_duration in songs_with_duration:
+ song_name_cleaned = remove_hyphens_and_trim(song_name)
+ print(f"Searching for: {song_name_cleaned}")
+
+ search_response = client.searches.search_text(
+ searchText=(song_name_cleaned + " " + config.format)
+ )
+
+ time.sleep(config.search_timeout)
+
+ search_id = search_response['id']
+ search_results = client.searches.search_responses(id=search_id)
+
+ result = find_close_duration_song(
+ search_results, local_duration, tolerance=config.duration_tolerance
+ )
+ if result is not None:
+ file_info = result['files'][0]
+ print(f"Enqueueing: {file_info['filename']}")
+ try:
+ success = client.transfers.enqueue(
+ username=result['username'],
+ files=[{'filename': file_info['filename'], 'size': file_info['size']}],
+ )
+ if success:
+ print(f"Enqueued: {file_info['filename']}")
+ else:
+ print(f"Failed to enqueue: {file_info['filename']}")
+ unfound_songs.append(song_name)
+ except requests.exceptions.HTTPError as http_error:
+ print(f"Error while sending request to slskd: {http_error}")
+ unfound_songs.append(song_name)
+ else:
+ print(f"Failed to find matching song with close duration for: {song_name}")
+ unfound_songs.append(song_name)
+
+
+def threaded_search_and_enqueue(
+ songs_with_duration: list[tuple[str, int]],
+ unfound_songs: list[str],
+ *,
+ config: SlskdConfig,
+ client: slskd_api.SlskdClient,
+) -> None:
+ """Split songs into chunks and search/enqueue in parallel threads."""
+ if not songs_with_duration:
+ return
+ num_threads = min(max(config.num_threads, 1), len(songs_with_duration))
+ thread_list: list[Thread] = []
+ chunk_size = len(songs_with_duration) // num_threads
+
+ per_thread_unfound: list[list[str]] = [[] for _ in range(num_threads)]
+
+ for i in range(num_threads):
+ if i == num_threads - 1:
+ chunk = songs_with_duration[i * chunk_size:]
+ else:
+ chunk = songs_with_duration[i * chunk_size : (i + 1) * chunk_size]
+ thread = Thread(
+ target=search_and_enqueue,
+ args=(chunk, per_thread_unfound[i]),
+ kwargs={"config": config, "client": client},
+ )
+ thread.start()
+ thread_list.append(thread)
+ time.sleep(10)
+
+ for thread in thread_list:
+ thread.join()
+
+ for thread_unfound in per_thread_unfound:
+ unfound_songs.extend(thread_unfound)
+
+
+def run_search(config: SlskdConfig) -> None:
+ """Top-level orchestrator: create client, scan directory, search, and write CSV."""
+ client = slskd_api.SlskdClient(
+ host=config.host,
+ api_key=config.api_key,
+ verify_ssl=config.verify_ssl,
+ )
+
+ songs_with_duration = list_files_with_duration(
+ config.music_dir,
+ recursive=config.recursive,
+ )
+
+ unfound_songs: list[str] = []
+
+ threaded_search_and_enqueue(
+ songs_with_duration,
+ unfound_songs,
+ config=config,
+ client=client,
+ )
+
+ if len(unfound_songs) > 0:
+ write_unfound_songs_to_csv(unfound_songs, config.music_dir / 'unfound_songs.csv')
+ print("Unfound songs have been written to 'unfound_songs.csv'")
diff --git a/src/slskd_transform/utils.py b/src/slskd_transform/utils.py
new file mode 100644
index 0000000..5dc9a6c
--- /dev/null
+++ b/src/slskd_transform/utils.py
@@ -0,0 +1,19 @@
+import csv
+from pathlib import Path
+
+
+def write_unfound_songs_to_csv(unfound_songs: list[str], filepath: Path) -> None:
+ """Write unfound song names to a CSV file with a 'Song Name' header."""
+ with open(filepath, mode='w', newline='', encoding='utf-8') as csv_file:
+ csv_writer = csv.writer(csv_file)
+ csv_writer.writerow(['Song Name'])
+ for song in unfound_songs:
+ csv_writer.writerow([song])
+
+
+def sanitize_filename(filename: str) -> str:
+ """Remove characters invalid on Windows/Linux filesystems: \\/:*?"<>|"""
+ invalid_chars = '\\/:*?"<>|'
+ for char in invalid_chars:
+ filename = filename.replace(char, '')
+ return filename
diff --git a/tests/test_cli.py b/tests/test_cli.py
new file mode 100644
index 0000000..833e0b3
--- /dev/null
+++ b/tests/test_cli.py
@@ -0,0 +1,35 @@
+from click.testing import CliRunner
+from unittest.mock import patch, MagicMock
+from pathlib import Path
+
+from slskd_transform.cli import cli
+
+
+class TestCliSearch:
+ @patch("slskd_transform.cli.run_search")
+ @patch("slskd_transform.cli.load_config")
+ def test_search_with_api_key(self, mock_load, mock_run):
+ mock_load.return_value = MagicMock(api_key="test-key")
+ runner = CliRunner()
+ result = runner.invoke(cli, ["--api-key", "test-key", "search"])
+ assert result.exit_code == 0
+ mock_run.assert_called_once()
+
+ @patch("slskd_transform.cli.load_config")
+ def test_search_without_api_key_fails(self, mock_load):
+ mock_load.return_value = MagicMock(api_key="")
+ runner = CliRunner()
+ result = runner.invoke(cli, ["search"])
+ assert result.exit_code != 0
+ assert "API key" in result.output
+
+
+class TestCliRename:
+ @patch("slskd_transform.cli.run_rename")
+ @patch("slskd_transform.cli.load_config")
+ def test_rename_runs(self, mock_load, mock_run):
+ mock_load.return_value = MagicMock()
+ runner = CliRunner()
+ result = runner.invoke(cli, ["rename"])
+ assert result.exit_code == 0
+ mock_run.assert_called_once()
diff --git a/tests/test_config.py b/tests/test_config.py
new file mode 100644
index 0000000..8a297c9
--- /dev/null
+++ b/tests/test_config.py
@@ -0,0 +1,93 @@
+import os
+import tempfile
+import pytest
+from pathlib import Path
+from unittest.mock import patch
+
+from slskd_transform.config import load_config, SlskdConfig, DEFAULTS
+
+
+class TestDefaults:
+ def test_load_with_no_sources(self):
+ """With no config file, no env vars, no overrides — get defaults."""
+ with patch.dict(os.environ, {}, clear=True):
+ # Remove any SLSKD_ env vars
+ env = {k: v for k, v in os.environ.items() if not k.startswith("SLSKD_")}
+ with patch.dict(os.environ, env, clear=True):
+ config = load_config(config_path=Path("/nonexistent/config.yml"))
+ assert config.host == "http://127.0.0.1:5030"
+ assert config.api_key == ""
+ assert config.verify_ssl is False
+ assert config.duration_tolerance == 15
+ assert config.num_threads == 5
+ assert config.search_timeout == 60
+ assert config.format == "flac"
+ assert config.recursive is False
+
+
+class TestEnvVars:
+ def test_env_overrides_defaults(self):
+ env = {
+ "SLSKD_HOST": "http://custom:9999",
+ "SLSKD_API_KEY": "test-key",
+ "SLSKD_VERIFY_SSL": "true",
+ "SLSKD_DURATION_TOLERANCE": "30",
+ "SLSKD_NUM_THREADS": "10",
+ "SLSKD_RECURSIVE": "yes",
+ }
+ with patch.dict(os.environ, env, clear=False):
+ config = load_config(config_path=Path("/nonexistent/config.yml"))
+ assert config.host == "http://custom:9999"
+ assert config.api_key == "test-key"
+ assert config.verify_ssl is True
+ assert config.duration_tolerance == 30
+ assert config.num_threads == 10
+ assert config.recursive is True
+
+ def test_bool_env_variations(self):
+ for truthy in ("true", "1", "yes", "True", "YES"):
+ with patch.dict(os.environ, {"SLSKD_VERIFY_SSL": truthy}, clear=False):
+ config = load_config(config_path=Path("/nonexistent/config.yml"))
+ assert config.verify_ssl is True
+
+ for falsy in ("false", "0", "no", ""):
+ with patch.dict(os.environ, {"SLSKD_VERIFY_SSL": falsy}, clear=False):
+ config = load_config(config_path=Path("/nonexistent/config.yml"))
+ assert config.verify_ssl is False
+
+
+class TestYamlConfig:
+ def test_loads_from_yaml(self):
+ import yaml
+ config_data = {
+ "host": "http://yaml-host:5030",
+ "api_key": "yaml-key",
+ "num_threads": 8,
+ }
+ with tempfile.NamedTemporaryFile(mode='w', suffix='.yml', delete=False) as f:
+ yaml.dump(config_data, f)
+ f.flush()
+ config = load_config(config_path=Path(f.name))
+ assert config.host == "http://yaml-host:5030"
+ assert config.api_key == "yaml-key"
+ assert config.num_threads == 8
+ os.unlink(f.name)
+
+
+class TestCliOverrides:
+ def test_cli_overrides_env_and_yaml(self):
+ with patch.dict(os.environ, {"SLSKD_HOST": "http://env:5030"}, clear=False):
+ config = load_config(
+ config_path=Path("/nonexistent/config.yml"),
+ cli_overrides={"host": "http://cli:5030", "num_threads": 3}
+ )
+ assert config.host == "http://cli:5030"
+ assert config.num_threads == 3
+
+ def test_none_overrides_are_skipped(self):
+ config = load_config(
+ config_path=Path("/nonexistent/config.yml"),
+ cli_overrides={"host": None, "num_threads": 7}
+ )
+ assert config.host == "http://127.0.0.1:5030"
+ assert config.num_threads == 7
diff --git a/tests/test_rename.py b/tests/test_rename.py
new file mode 100644
index 0000000..fc3790f
--- /dev/null
+++ b/tests/test_rename.py
@@ -0,0 +1,74 @@
+import tempfile
+import os
+from pathlib import Path
+from unittest.mock import patch, MagicMock
+
+from slskd_transform.rename import collect_flac_files, extract_metadata, move_and_rename_flac_files
+
+
+class TestCollectFlacFiles:
+ def test_finds_flac_files(self):
+ with tempfile.TemporaryDirectory() as tmpdir:
+ Path(tmpdir, "song.flac").touch()
+ Path(tmpdir, "other.mp3").touch()
+ subdir = Path(tmpdir, "subdir")
+ subdir.mkdir()
+ Path(subdir, "nested.FLAC").touch()
+
+ result = collect_flac_files(Path(tmpdir))
+ assert len(result) == 2
+
+ def test_empty_directory(self):
+ with tempfile.TemporaryDirectory() as tmpdir:
+ result = collect_flac_files(Path(tmpdir))
+ assert result == []
+
+ def test_case_insensitive(self):
+ with tempfile.TemporaryDirectory() as tmpdir:
+ Path(tmpdir, "lower.flac").touch()
+ Path(tmpdir, "upper.FLAC").touch()
+ Path(tmpdir, "mixed.Flac").touch()
+ result = collect_flac_files(Path(tmpdir))
+ assert len(result) == 3
+
+
+class TestExtractMetadata:
+ @patch("slskd_transform.rename.FLAC")
+ def test_extracts_title_and_artist(self, mock_flac_cls):
+ mock_audio = MagicMock()
+ mock_audio.get.side_effect = lambda key, default: {
+ "title": ["My Song"], "artist": ["My Artist"]
+ }.get(key, default)
+ mock_flac_cls.return_value = mock_audio
+
+ title, artist = extract_metadata(Path("/fake/path.flac"))
+ assert title == "My Song"
+ assert artist == "My Artist"
+
+ @patch("slskd_transform.rename.FLAC")
+ def test_defaults_to_unknown(self, mock_flac_cls):
+ mock_audio = MagicMock()
+ mock_audio.get.side_effect = lambda key, default: default
+ mock_flac_cls.return_value = mock_audio
+
+ title, artist = extract_metadata(Path("/fake/path.flac"))
+ assert title == "Unknown"
+ assert artist == "Unknown"
+
+
+class TestMoveAndRenameFlacFiles:
+ @patch("slskd_transform.rename.shutil")
+ @patch("slskd_transform.rename.extract_metadata")
+ def test_moves_with_metadata_name(self, mock_extract, mock_shutil):
+ mock_extract.return_value = ("My Song", "My Artist")
+ with tempfile.TemporaryDirectory() as dest:
+ move_and_rename_flac_files([Path("/source/file.flac")], Path(dest))
+ expected = os.path.join(dest, "My Artist - My Song.flac")
+ mock_shutil.move.assert_called_once_with(str(Path("/source/file.flac")), expected)
+
+ @patch("slskd_transform.rename.shutil")
+ @patch("slskd_transform.rename.extract_metadata")
+ def test_empty_list(self, mock_extract, mock_shutil):
+ with tempfile.TemporaryDirectory() as dest:
+ move_and_rename_flac_files([], Path(dest))
+ mock_shutil.move.assert_not_called()
diff --git a/tests/test_search.py b/tests/test_search.py
new file mode 100644
index 0000000..dc38e79
--- /dev/null
+++ b/tests/test_search.py
@@ -0,0 +1,172 @@
+import tempfile
+import os
+import pytest
+from pathlib import Path
+from unittest.mock import patch, MagicMock
+
+from slskd_transform.search import (
+ list_files_with_duration,
+ find_close_duration_song,
+ remove_hyphens_and_trim,
+ search_and_enqueue,
+ threaded_search_and_enqueue,
+)
+from slskd_transform.config import load_config
+
+
+def _make_config(**overrides):
+ defaults = {"api_key": "test", "host": "http://test:5030"}
+ defaults.update(overrides)
+ return load_config(config_path=Path("/nonexistent"), cli_overrides=defaults)
+
+
+class TestListFilesWithDuration:
+ def test_returns_name_duration_tuples(self):
+ with tempfile.TemporaryDirectory() as tmpdir:
+ Path(tmpdir, "Song A.flac").touch()
+ Path(tmpdir, "Song B.mp3").touch()
+
+ mock_audio = MagicMock()
+ mock_audio.info.length = 245.5
+
+ with patch("slskd_transform.search.mutagen.File", return_value=mock_audio):
+ result = list_files_with_duration(Path(tmpdir))
+
+ assert len(result) == 2
+ names = [r[0] for r in result]
+ assert "Song A" in names
+ assert "Song B" in names
+ for _, duration in result:
+ assert duration == 245
+
+ def test_skips_dotfiles(self):
+ with tempfile.TemporaryDirectory() as tmpdir:
+ Path(tmpdir, ".hidden.flac").touch()
+ Path(tmpdir, "visible.flac").touch()
+
+ mock_audio = MagicMock()
+ mock_audio.info.length = 100.0
+
+ with patch("slskd_transform.search.mutagen.File", return_value=mock_audio):
+ result = list_files_with_duration(Path(tmpdir))
+
+ assert len(result) == 1
+ assert result[0][0] == "visible"
+
+ def test_empty_directory(self):
+ with tempfile.TemporaryDirectory() as tmpdir:
+ result = list_files_with_duration(Path(tmpdir))
+ assert result == []
+
+ def test_recursive_scanning(self):
+ with tempfile.TemporaryDirectory() as tmpdir:
+ subdir = Path(tmpdir, "artist", "album")
+ subdir.mkdir(parents=True)
+ Path(tmpdir, "top.mp3").touch()
+ Path(subdir, "nested.flac").touch()
+
+ mock_audio = MagicMock()
+ mock_audio.info.length = 180.0
+
+ with patch("slskd_transform.search.mutagen.File", return_value=mock_audio):
+ result = list_files_with_duration(Path(tmpdir), recursive=True)
+
+ assert len(result) == 2
+
+ def test_non_recursive_ignores_subdirs(self):
+ with tempfile.TemporaryDirectory() as tmpdir:
+ subdir = Path(tmpdir, "sub")
+ subdir.mkdir()
+ Path(tmpdir, "top.mp3").touch()
+ Path(subdir, "nested.flac").touch()
+
+ mock_audio = MagicMock()
+ mock_audio.info.length = 180.0
+
+ with patch("slskd_transform.search.mutagen.File", return_value=mock_audio):
+ result = list_files_with_duration(Path(tmpdir), recursive=False)
+
+ assert len(result) == 1
+ assert result[0][0] == "top"
+
+
+class TestFindCloseDurationSong:
+ def test_finds_matching_duration(self):
+ results = [
+ {"files": [{"filename": "song.flac", "length": 240}]},
+ ]
+ match = find_close_duration_song(results, 242)
+ assert match is not None
+
+ def test_no_match_outside_tolerance(self):
+ results = [{"files": [{"filename": "song.flac", "length": 240}]}]
+ match = find_close_duration_song(results, 300)
+ assert match is None
+
+ def test_empty_results(self):
+ assert find_close_duration_song([], 200) is None
+
+ def test_missing_length_key(self):
+ results = [{"files": [{"filename": "song.flac"}]}]
+ assert find_close_duration_song(results, 200) is None
+
+ def test_custom_tolerance(self):
+ results = [{"files": [{"filename": "song.flac", "length": 230}]}]
+ assert find_close_duration_song(results, 200, tolerance=30) is not None
+ assert find_close_duration_song(results, 200, tolerance=10) is None
+
+
+class TestRemoveHyphensAndTrim:
+ def test_basic(self):
+ assert remove_hyphens_and_trim("Artist - Song") == "Artist Song"
+
+ def test_multiple_hyphens(self):
+ assert remove_hyphens_and_trim("A - B - C") == "A B C"
+
+ def test_no_hyphens(self):
+ assert remove_hyphens_and_trim("No Hyphens") == "No Hyphens"
+
+ def test_empty_string(self):
+ assert remove_hyphens_and_trim("") == ""
+
+
+class TestSearchAndEnqueue:
+ @patch("slskd_transform.search.time.sleep")
+ def test_successful_enqueue(self, mock_sleep):
+ config = _make_config()
+ client = MagicMock()
+ client.searches.search_text.return_value = {"id": "s1"}
+ client.searches.search_responses.return_value = [
+ {"username": "peer1", "files": [{"filename": "song.flac", "length": 200, "size": 5000}]}
+ ]
+ client.transfers.enqueue.return_value = True
+
+ unfound = []
+ search_and_enqueue([("Artist - Song", 200)], unfound, config=config, client=client)
+ assert unfound == []
+ client.transfers.enqueue.assert_called_once()
+
+ @patch("slskd_transform.search.time.sleep")
+ def test_no_match_adds_to_unfound(self, mock_sleep):
+ config = _make_config()
+ client = MagicMock()
+ client.searches.search_text.return_value = {"id": "s1"}
+ client.searches.search_responses.return_value = [
+ {"username": "peer1", "files": [{"filename": "song.flac", "length": 999, "size": 5000}]}
+ ]
+
+ unfound = []
+ search_and_enqueue([("Song", 200)], unfound, config=config, client=client)
+ assert "Song" in unfound
+
+
+class TestThreadedSearchAndEnqueue:
+ @patch("slskd_transform.search.time.sleep")
+ @patch("slskd_transform.search.search_and_enqueue")
+ def test_distributes_songs(self, mock_search, mock_sleep):
+ config = _make_config(num_threads=2)
+ client = MagicMock()
+ songs = [(f"Song {i}", i * 10) for i in range(10)]
+
+ threaded_search_and_enqueue(songs, [], config=config, client=client)
+ assert mock_search.call_count == 2
diff --git a/tests/test_slskd_transform.py b/tests/test_slskd_transform.py
deleted file mode 100644
index 3085e91..0000000
--- a/tests/test_slskd_transform.py
+++ /dev/null
@@ -1,667 +0,0 @@
-"""Tests for slskd-transform: main.py and rename-files.py.
-
-Imports the actual module functions (guarded by if __name__ == '__main__')
-and tests them with mocked I/O, network calls, and external dependencies.
-"""
-
-import pytest
-import tempfile
-import os
-import sys
-import csv
-import importlib
-import importlib.util
-from pathlib import Path
-from unittest.mock import patch, MagicMock, call
-from threading import Thread
-
-import main
-# rename-files.py has a hyphen so we need importlib
-_spec = importlib.util.spec_from_file_location(
- "rename_files",
- os.path.join(os.path.dirname(os.path.dirname(__file__)), "rename-files.py"),
-)
-rename_files = importlib.util.module_from_spec(_spec)
-_spec.loader.exec_module(rename_files)
-
-
-# ---------------------------------------------------------------------------
-# main.py — write_unfound_songs_to_csv
-# ---------------------------------------------------------------------------
-
-class TestWriteUnfoundSongsToCsv:
- def test_writes_header_and_songs(self):
- with tempfile.TemporaryDirectory() as tmpdir:
- filepath = os.path.join(tmpdir, "unfound.csv")
- main.write_unfound_songs_to_csv(["Song A", "Song B"], filepath)
-
- with open(filepath, newline='', encoding='utf-8') as f:
- reader = list(csv.reader(f))
- assert reader[0] == ["Song Name"]
- assert reader[1] == ["Song A"]
- assert reader[2] == ["Song B"]
- assert len(reader) == 3
-
- def test_writes_empty_list(self):
- with tempfile.TemporaryDirectory() as tmpdir:
- filepath = os.path.join(tmpdir, "unfound.csv")
- main.write_unfound_songs_to_csv([], filepath)
-
- with open(filepath, newline='', encoding='utf-8') as f:
- reader = list(csv.reader(f))
- assert reader == [["Song Name"]]
-
- def test_writes_special_characters(self):
- with tempfile.TemporaryDirectory() as tmpdir:
- filepath = os.path.join(tmpdir, "unfound.csv")
- main.write_unfound_songs_to_csv(["Song, With Comma", 'Song "Quotes"'], filepath)
-
- with open(filepath, newline='', encoding='utf-8') as f:
- reader = list(csv.reader(f))
- assert reader[1] == ["Song, With Comma"]
- assert reader[2] == ['Song "Quotes"']
-
-
-# ---------------------------------------------------------------------------
-# main.py — list_files_without_ext
-# ---------------------------------------------------------------------------
-
-class TestListFilesWithoutExt:
- def test_strips_extension(self):
- with tempfile.TemporaryDirectory() as tmpdir:
- Path(tmpdir, "Artist - Song.flac").touch()
- Path(tmpdir, "Other - Track.mp3").touch()
- result = main.list_files_without_ext(tmpdir)
- assert "Artist - Song" in result
- assert "Other - Track" in result
-
- def test_skips_dotfiles(self):
- with tempfile.TemporaryDirectory() as tmpdir:
- Path(tmpdir, ".hidden").touch()
- Path(tmpdir, "visible.flac").touch()
- result = main.list_files_without_ext(tmpdir)
- assert "visible" in result
- assert len(result) == 1
-
- def test_empty_directory(self):
- with tempfile.TemporaryDirectory() as tmpdir:
- result = main.list_files_without_ext(tmpdir)
- assert result == []
-
- def test_file_without_extension(self):
- with tempfile.TemporaryDirectory() as tmpdir:
- Path(tmpdir, "noext").touch()
- result = main.list_files_without_ext(tmpdir)
- assert "noext" in result
-
- def test_multiple_dots_in_name(self):
- with tempfile.TemporaryDirectory() as tmpdir:
- Path(tmpdir, "artist.feat.other.flac").touch()
- result = main.list_files_without_ext(tmpdir)
- assert "artist.feat.other" in result
-
-
-# ---------------------------------------------------------------------------
-# main.py — list_files_with_duration
-# ---------------------------------------------------------------------------
-
-class TestListFilesWithDuration:
- def test_returns_name_duration_tuples(self):
- with tempfile.TemporaryDirectory() as tmpdir:
- Path(tmpdir, "Song A.flac").touch()
- Path(tmpdir, "Song B.mp3").touch()
-
- mock_audio_info = MagicMock()
- mock_audio_info.info.length = 245.5
-
- with patch("main.mutagen.File", return_value=mock_audio_info):
- result = main.list_files_with_duration(tmpdir)
-
- assert len(result) == 2
- names = [r[0] for r in result]
- assert "Song A" in names
- assert "Song B" in names
- for _, duration in result:
- assert duration == 245
-
- def test_skips_dotfiles(self):
- with tempfile.TemporaryDirectory() as tmpdir:
- Path(tmpdir, ".hidden.flac").touch()
- Path(tmpdir, "visible.flac").touch()
-
- mock_audio_info = MagicMock()
- mock_audio_info.info.length = 100.0
-
- with patch("main.mutagen.File", return_value=mock_audio_info):
- result = main.list_files_with_duration(tmpdir)
-
- assert len(result) == 1
- assert result[0][0] == "visible"
-
- def test_empty_directory(self):
- with tempfile.TemporaryDirectory() as tmpdir:
- result = main.list_files_with_duration(tmpdir)
- assert result == []
-
- def test_duration_truncated_to_int(self):
- with tempfile.TemporaryDirectory() as tmpdir:
- Path(tmpdir, "track.flac").touch()
-
- mock_audio_info = MagicMock()
- mock_audio_info.info.length = 199.99
-
- with patch("main.mutagen.File", return_value=mock_audio_info):
- result = main.list_files_with_duration(tmpdir)
-
- assert result[0][1] == 199
-
-
-# ---------------------------------------------------------------------------
-# main.py — find_close_duration_song
-# ---------------------------------------------------------------------------
-
-class TestFindCloseDurationSong:
- def test_finds_matching_duration(self):
- results = [
- {"files": [{"filename": "song.flac", "length": 240}]},
- {"files": [{"filename": "other.flac", "length": 300}]},
- ]
- match = main.find_close_duration_song(results, 242)
- assert match is not None
- assert match["files"][0]["filename"] == "song.flac"
-
- def test_no_match_outside_tolerance(self):
- results = [
- {"files": [{"filename": "song.flac", "length": 240}]},
- ]
- match = main.find_close_duration_song(results, 300)
- assert match is None
-
- def test_empty_results(self):
- match = main.find_close_duration_song([], 200)
- assert match is None
-
- def test_missing_length_key(self):
- results = [
- {"files": [{"filename": "song.flac"}]},
- ]
- match = main.find_close_duration_song(results, 200)
- assert match is None
-
- def test_tolerance_boundary_exact(self):
- results = [
- {"files": [{"filename": "song.flac", "length": 215}]},
- ]
- match = main.find_close_duration_song(results, 200)
- assert match is not None
-
- def test_tolerance_boundary_exceeded(self):
- results = [
- {"files": [{"filename": "song.flac", "length": 216}]},
- ]
- match = main.find_close_duration_song(results, 200)
- assert match is None
-
- def test_result_with_no_files_key(self):
- results = [{"username": "user1"}]
- match = main.find_close_duration_song(results, 200)
- assert match is None
-
- def test_result_with_empty_files(self):
- results = [{"files": []}]
- match = main.find_close_duration_song(results, 200)
- assert match is None
-
- def test_returns_first_match(self):
- results = [
- {"files": [{"filename": "first.flac", "length": 200}]},
- {"files": [{"filename": "second.flac", "length": 201}]},
- ]
- match = main.find_close_duration_song(results, 200)
- assert match["files"][0]["filename"] == "first.flac"
-
- def test_skips_bad_result_finds_good(self):
- results = [
- {"files": [{"filename": "bad.flac"}]}, # no length key
- {"files": [{"filename": "good.flac", "length": 200}]},
- ]
- match = main.find_close_duration_song(results, 200)
- assert match["files"][0]["filename"] == "good.flac"
-
-
-# ---------------------------------------------------------------------------
-# main.py — remove_hyphens_and_trim
-# ---------------------------------------------------------------------------
-
-class TestRemoveHyphensAndTrim:
- def test_basic_hyphen_removal(self):
- assert main.remove_hyphens_and_trim("Artist - Song Name") == "Artist Song Name"
-
- def test_multiple_hyphens(self):
- assert main.remove_hyphens_and_trim("A - B - C") == "A B C"
-
- def test_no_hyphens(self):
- assert main.remove_hyphens_and_trim("No Hyphens Here") == "No Hyphens Here"
-
- def test_leading_trailing_spaces(self):
- result = main.remove_hyphens_and_trim(" Artist - Song ")
- assert result == "Artist Song"
-
- def test_empty_string(self):
- assert main.remove_hyphens_and_trim("") == ""
-
- def test_only_hyphen(self):
- # Splitting "-" gives ["", ""] which join as " "
- assert main.remove_hyphens_and_trim("-") == " "
-
- def test_consecutive_hyphens(self):
- # "A--B" splits into ["A", "", "B"] which join as "A B"
- result = main.remove_hyphens_and_trim("A--B")
- assert result == "A B"
-
-
-# ---------------------------------------------------------------------------
-# main.py — search_and_enqueue
-# ---------------------------------------------------------------------------
-
-class TestSearchAndEnqueue:
- def _make_mock_slskd(self, search_response, search_results, enqueue_return=True):
- mock = MagicMock()
- mock.searches.search_text.return_value = search_response
- mock.searches.search_responses.return_value = search_results
- mock.transfers.enqueue.return_value = enqueue_return
- return mock
-
- @patch("main.time.sleep")
- def test_successful_enqueue(self, mock_sleep):
- search_resp = {"id": "search-1"}
- search_results = [
- {
- "username": "peer1",
- "files": [{"filename": "song.flac", "length": 200, "size": 5000}],
- }
- ]
- main.slskd = self._make_mock_slskd(search_resp, search_results, enqueue_return=True)
-
- unfound = []
- main.search_and_enqueue([("Artist - Song", 200)], unfound)
-
- assert unfound == []
- main.slskd.searches.search_text.assert_called_once()
- main.slskd.transfers.enqueue.assert_called_once_with(
- username="peer1",
- files=[{"filename": "song.flac", "size": 5000}],
- )
- mock_sleep.assert_called_with(60)
-
- @patch("main.time.sleep")
- def test_no_matching_duration(self, mock_sleep):
- search_resp = {"id": "search-1"}
- search_results = [
- {
- "username": "peer1",
- "files": [{"filename": "song.flac", "length": 999, "size": 5000}],
- }
- ]
- main.slskd = self._make_mock_slskd(search_resp, search_results)
-
- unfound = []
- main.search_and_enqueue([("Artist - Song", 200)], unfound)
-
- assert "Artist - Song" in unfound
-
- @patch("main.time.sleep")
- def test_enqueue_returns_false(self, mock_sleep):
- search_resp = {"id": "search-1"}
- search_results = [
- {
- "username": "peer1",
- "files": [{"filename": "song.flac", "length": 200, "size": 5000}],
- }
- ]
- main.slskd = self._make_mock_slskd(search_resp, search_results, enqueue_return=False)
-
- unfound = []
- main.search_and_enqueue([("Artist - Song", 200)], unfound)
-
- assert "Artist - Song" in unfound
-
- @patch("main.time.sleep")
- def test_enqueue_http_error(self, mock_sleep):
- import requests
-
- search_resp = {"id": "search-1"}
- search_results = [
- {
- "username": "peer1",
- "files": [{"filename": "song.flac", "length": 200, "size": 5000}],
- }
- ]
- mock_slskd = self._make_mock_slskd(search_resp, search_results)
- mock_slskd.transfers.enqueue.side_effect = requests.exceptions.HTTPError("500")
- main.slskd = mock_slskd
-
- unfound = []
- main.search_and_enqueue([("Artist - Song", 200)], unfound)
-
- assert "Artist - Song" in unfound
-
- @patch("main.time.sleep")
- def test_multiple_songs(self, mock_sleep):
- search_resp = {"id": "search-1"}
- # First song found, second not
- main.slskd = MagicMock()
- main.slskd.searches.search_text.return_value = search_resp
-
- good_results = [
- {
- "username": "peer1",
- "files": [{"filename": "song1.flac", "length": 200, "size": 5000}],
- }
- ]
- bad_results = [
- {
- "username": "peer2",
- "files": [{"filename": "song2.flac", "length": 999, "size": 5000}],
- }
- ]
- main.slskd.searches.search_responses.side_effect = [good_results, bad_results]
- main.slskd.transfers.enqueue.return_value = True
-
- unfound = []
- main.search_and_enqueue(
- [("Song One", 200), ("Song Two", 100)], unfound
- )
-
- assert len(unfound) == 1
- assert "Song Two" in unfound
-
- @patch("main.time.sleep")
- def test_search_text_includes_flac(self, mock_sleep):
- search_resp = {"id": "search-1"}
- main.slskd = self._make_mock_slskd(search_resp, [], enqueue_return=True)
-
- unfound = []
- main.search_and_enqueue([("My Song", 200)], unfound)
-
- call_args = main.slskd.searches.search_text.call_args
- assert "flac" in call_args[1]["searchText"]
-
- @patch("main.time.sleep")
- def test_empty_song_list(self, mock_sleep):
- main.slskd = MagicMock()
- unfound = []
- main.search_and_enqueue([], unfound)
- assert unfound == []
- main.slskd.searches.search_text.assert_not_called()
-
-
-# ---------------------------------------------------------------------------
-# main.py — threaded_search_and_enqueue
-# ---------------------------------------------------------------------------
-
-class TestThreadedSearchAndEnqueue:
- @patch("main.time.sleep")
- @patch("main.search_and_enqueue")
- def test_distributes_songs_across_threads(self, mock_search, mock_sleep):
- songs = [(f"Song {i}", i * 10) for i in range(10)]
- unfound = []
-
- main.threaded_search_and_enqueue(songs, unfound, num_threads=2)
-
- assert mock_search.call_count == 2
- # First chunk: 5 songs, second chunk: remaining 5
- first_chunk = mock_search.call_args_list[0][0][0]
- second_chunk = mock_search.call_args_list[1][0][0]
- assert len(first_chunk) + len(second_chunk) == 10
-
- @patch("main.time.sleep")
- @patch("main.search_and_enqueue")
- def test_single_thread(self, mock_search, mock_sleep):
- songs = [("Song A", 100), ("Song B", 200)]
- unfound = []
-
- main.threaded_search_and_enqueue(songs, unfound, num_threads=1)
-
- assert mock_search.call_count == 1
- chunk = mock_search.call_args_list[0][0][0]
- assert len(chunk) == 2
-
- @patch("main.time.sleep")
- @patch("main.search_and_enqueue")
- def test_last_thread_gets_remainder(self, mock_search, mock_sleep):
- songs = [(f"Song {i}", i * 10) for i in range(7)]
- unfound = []
-
- main.threaded_search_and_enqueue(songs, unfound, num_threads=3)
-
- assert mock_search.call_count == 3
- chunks = [c[0][0] for c in mock_search.call_args_list]
- total = sum(len(c) for c in chunks)
- assert total == 7
- # Last chunk should have the remainder
- assert len(chunks[2]) >= len(chunks[0])
-
- @patch("main.time.sleep")
- @patch("main.search_and_enqueue")
- def test_sleeps_between_thread_starts(self, mock_search, mock_sleep):
- songs = [(f"Song {i}", i * 10) for i in range(6)]
- unfound = []
-
- main.threaded_search_and_enqueue(songs, unfound, num_threads=3)
-
- sleep_calls = [c[0][0] for c in mock_sleep.call_args_list]
- assert sleep_calls.count(10) == 3
-
-
-# ---------------------------------------------------------------------------
-# main.py — main()
-# ---------------------------------------------------------------------------
-
-class TestMainFunction:
- @patch("main.write_unfound_songs_to_csv")
- @patch("main.threaded_search_and_enqueue")
- @patch("main.list_files_with_duration", return_value=[("Song", 200)])
- @patch("main.slskd_api.SlskdClient")
- def test_main_no_unfound(self, mock_client, mock_list, mock_threaded, mock_csv):
- mock_threaded.side_effect = lambda songs, unfound, **kw: None
- main.main()
-
- mock_client.assert_called_once()
- mock_list.assert_called_once()
- mock_threaded.assert_called_once()
- mock_csv.assert_not_called()
-
- @patch("main.write_unfound_songs_to_csv")
- @patch("main.threaded_search_and_enqueue")
- @patch("main.list_files_with_duration", return_value=[("Song", 200)])
- @patch("main.slskd_api.SlskdClient")
- def test_main_with_unfound(self, mock_client, mock_list, mock_threaded, mock_csv):
- def add_unfound(songs, unfound, **kw):
- unfound.append("Lost Song")
-
- mock_threaded.side_effect = add_unfound
- main.main()
-
- mock_csv.assert_called_once_with(["Lost Song"], "unfound_songs.csv")
-
-
-# ---------------------------------------------------------------------------
-# rename-files.py — collect_flac_files
-# ---------------------------------------------------------------------------
-
-class TestCollectFlacFiles:
- def test_finds_flac_files(self):
- with tempfile.TemporaryDirectory() as tmpdir:
- Path(tmpdir, "song.flac").touch()
- Path(tmpdir, "other.mp3").touch()
- subdir = Path(tmpdir, "subdir")
- subdir.mkdir()
- Path(subdir, "nested.FLAC").touch()
-
- result = rename_files.collect_flac_files(tmpdir)
- assert len(result) == 2
- names = [os.path.basename(f) for f in result]
- assert "song.flac" in names
- assert "nested.FLAC" in names
- assert "other.mp3" not in names
-
- def test_empty_directory(self):
- with tempfile.TemporaryDirectory() as tmpdir:
- result = rename_files.collect_flac_files(tmpdir)
- assert result == []
-
- def test_deeply_nested(self):
- with tempfile.TemporaryDirectory() as tmpdir:
- deep = Path(tmpdir, "a", "b", "c")
- deep.mkdir(parents=True)
- Path(deep, "deep.flac").touch()
- result = rename_files.collect_flac_files(tmpdir)
- assert len(result) == 1
- assert "deep.flac" in result[0]
-
- def test_case_insensitive_extension(self):
- with tempfile.TemporaryDirectory() as tmpdir:
- Path(tmpdir, "lower.flac").touch()
- Path(tmpdir, "upper.FLAC").touch()
- Path(tmpdir, "mixed.Flac").touch()
- result = rename_files.collect_flac_files(tmpdir)
- assert len(result) == 3
-
- def test_ignores_non_flac(self):
- with tempfile.TemporaryDirectory() as tmpdir:
- Path(tmpdir, "song.mp3").touch()
- Path(tmpdir, "song.wav").touch()
- Path(tmpdir, "song.ogg").touch()
- Path(tmpdir, "song.flac.txt").touch()
- result = rename_files.collect_flac_files(tmpdir)
- assert result == []
-
-
-# ---------------------------------------------------------------------------
-# rename-files.py — extract_metadata
-# ---------------------------------------------------------------------------
-
-class TestExtractMetadata:
- @patch.object(rename_files, "FLAC")
- def test_extracts_title_and_artist(self, mock_flac_cls):
- mock_audio = MagicMock()
- mock_audio.get.side_effect = lambda key, default: {
- "title": ["My Song"],
- "artist": ["My Artist"],
- }.get(key, default)
- mock_flac_cls.return_value = mock_audio
-
- title, artist = rename_files.extract_metadata("/fake/path.flac")
- assert title == "My Song"
- assert artist == "My Artist"
-
- @patch.object(rename_files, "FLAC")
- def test_defaults_to_unknown(self, mock_flac_cls):
- mock_audio = MagicMock()
- mock_audio.get.side_effect = lambda key, default: default
- mock_flac_cls.return_value = mock_audio
-
- title, artist = rename_files.extract_metadata("/fake/path.flac")
- assert title == "Unknown"
- assert artist == "Unknown"
-
-
-# ---------------------------------------------------------------------------
-# rename-files.py — sanitize_filename
-# ---------------------------------------------------------------------------
-
-class TestSanitizeFilename:
- def test_removes_invalid_chars(self):
- assert rename_files.sanitize_filename('Song: "Title"') == "Song Title"
- assert rename_files.sanitize_filename("A/B\\C") == "ABC"
- assert rename_files.sanitize_filename("normal.flac") == "normal.flac"
-
- def test_removes_all_special_chars(self):
- result = rename_files.sanitize_filename("A*B?CE|F")
- assert "*" not in result
- assert "?" not in result
- assert "<" not in result
- assert ">" not in result
- assert "|" not in result
-
- def test_preserves_valid_chars(self):
- assert (
- rename_files.sanitize_filename("Hello World - Track 01.flac")
- == "Hello World - Track 01.flac"
- )
-
- def test_empty_string(self):
- assert rename_files.sanitize_filename("") == ""
-
- def test_all_invalid(self):
- assert rename_files.sanitize_filename('\\/:*?"<>|') == ""
-
-
-# ---------------------------------------------------------------------------
-# rename-files.py — move_and_rename_flac_files
-# ---------------------------------------------------------------------------
-
-class TestMoveAndRenameFlacFiles:
- @patch.object(rename_files, "shutil")
- @patch.object(rename_files, "extract_metadata")
- def test_moves_files_with_metadata_name(self, mock_extract, mock_shutil):
- mock_extract.return_value = ("My Song", "My Artist")
-
- with tempfile.TemporaryDirectory() as dest:
- rename_files.move_and_rename_flac_files(
- ["/source/file.flac"], dest
- )
-
- expected_path = os.path.join(dest, "My Artist - My Song.flac")
- mock_shutil.move.assert_called_once_with("/source/file.flac", expected_path)
-
- @patch.object(rename_files, "shutil")
- @patch.object(rename_files, "extract_metadata")
- def test_sanitizes_filename(self, mock_extract, mock_shutil):
- mock_extract.return_value = ('Song: "Special"', "Art/ist")
-
- with tempfile.TemporaryDirectory() as dest:
- rename_files.move_and_rename_flac_files(
- ["/source/file.flac"], dest
- )
-
- expected_path = os.path.join(dest, "Artist - Song Special.flac")
- mock_shutil.move.assert_called_once_with("/source/file.flac", expected_path)
-
- @patch.object(rename_files, "shutil")
- @patch.object(rename_files, "extract_metadata")
- def test_multiple_files(self, mock_extract, mock_shutil):
- mock_extract.side_effect = [
- ("Song A", "Artist A"),
- ("Song B", "Artist B"),
- ]
-
- with tempfile.TemporaryDirectory() as dest:
- rename_files.move_and_rename_flac_files(
- ["/source/a.flac", "/source/b.flac"], dest
- )
-
- assert mock_shutil.move.call_count == 2
-
- @patch.object(rename_files, "shutil")
- @patch.object(rename_files, "extract_metadata")
- def test_empty_file_list(self, mock_extract, mock_shutil):
- with tempfile.TemporaryDirectory() as dest:
- rename_files.move_and_rename_flac_files([], dest)
- mock_shutil.move.assert_not_called()
- mock_extract.assert_not_called()
-
-
-# ---------------------------------------------------------------------------
-# rename-files.py — main()
-# ---------------------------------------------------------------------------
-
-class TestRenameFilesMain:
- @patch.object(rename_files, "move_and_rename_flac_files")
- @patch.object(rename_files, "collect_flac_files", return_value=["/dl/song.flac"])
- def test_main_calls_pipeline(self, mock_collect, mock_move):
- rename_files.main()
-
- mock_collect.assert_called_once_with("G:\\slskd\\downloads")
- mock_move.assert_called_once_with(["/dl/song.flac"], "G:\\slskd\\")
diff --git a/tests/test_utils.py b/tests/test_utils.py
new file mode 100644
index 0000000..9903882
--- /dev/null
+++ b/tests/test_utils.py
@@ -0,0 +1,56 @@
+import csv
+import tempfile
+import os
+from pathlib import Path
+
+from slskd_transform.utils import write_unfound_songs_to_csv, sanitize_filename
+
+
+class TestWriteUnfoundSongsToCsv:
+ def test_writes_header_and_songs(self):
+ with tempfile.TemporaryDirectory() as tmpdir:
+ filepath = Path(tmpdir) / "unfound.csv"
+ write_unfound_songs_to_csv(["Song A", "Song B"], filepath)
+ with open(filepath, newline='', encoding='utf-8') as f:
+ reader = list(csv.reader(f))
+ assert reader[0] == ["Song Name"]
+ assert reader[1] == ["Song A"]
+ assert reader[2] == ["Song B"]
+
+ def test_writes_empty_list(self):
+ with tempfile.TemporaryDirectory() as tmpdir:
+ filepath = Path(tmpdir) / "unfound.csv"
+ write_unfound_songs_to_csv([], filepath)
+ with open(filepath, newline='', encoding='utf-8') as f:
+ reader = list(csv.reader(f))
+ assert reader == [["Song Name"]]
+
+ def test_writes_special_characters(self):
+ with tempfile.TemporaryDirectory() as tmpdir:
+ filepath = Path(tmpdir) / "unfound.csv"
+ write_unfound_songs_to_csv(["Song, With Comma", 'Song "Quotes"'], filepath)
+ with open(filepath, newline='', encoding='utf-8') as f:
+ reader = list(csv.reader(f))
+ assert reader[1] == ["Song, With Comma"]
+ assert reader[2] == ['Song "Quotes"']
+
+
+class TestSanitizeFilename:
+ def test_removes_invalid_chars(self):
+ assert sanitize_filename('Song: "Title"') == "Song Title"
+ assert sanitize_filename("A/B\\C") == "ABC"
+ assert sanitize_filename("normal.flac") == "normal.flac"
+
+ def test_removes_all_special_chars(self):
+ result = sanitize_filename("A*B?CE|F")
+ assert "*" not in result
+ assert "?" not in result
+
+ def test_preserves_valid_chars(self):
+ assert sanitize_filename("Hello World - Track 01.flac") == "Hello World - Track 01.flac"
+
+ def test_empty_string(self):
+ assert sanitize_filename("") == ""
+
+ def test_all_invalid(self):
+ assert sanitize_filename('\\/:*?"<>|') == ""