Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 11 additions & 27 deletions twotone/tools/melt/melt_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ def analyze_duplicates(self, duplicates: dict[str, list[str]]) -> list[dict[str,

return analysis_plan

_stream_short_details = staticmethod(stream_short_details)

@staticmethod
def _pick_track_by_tid(streams: Sequence[dict[str, Any]], tid: int) -> dict[str, Any]:
track = next((item for item in streams if item.get("tid") == tid), None)
Expand Down Expand Up @@ -112,7 +110,7 @@ def show(key: str) -> bool:
self.logger.debug(f" {stream_type}: {len(streams)} track(s)")
for stream in streams:
lang_name = language_utils.language_name(stream.get("language"))
short = self._stream_short_details(stream_type, stream)
short = stream_short_details(stream_type, stream)

info = lang_name
if short:
Expand Down Expand Up @@ -154,7 +152,7 @@ def _print_streams_details(
track_infos = tracks.get(path, {}).get(stype, [])
for info in track_infos:
if info.get("tid") == tid:
stream_details = self._stream_short_details(stype, info)
stream_details = stream_short_details(stype, info)
break

extra = f" ({stream_details})" if stream_details else ""
Expand All @@ -177,8 +175,7 @@ def _probe_inputs(files: Sequence[str]) -> tuple[dict[str, Any], dict[str, Any],
tracks = {file: info["tracks"] for file, info in details_full.items()}
return details_full, attachments, tracks

@staticmethod
def _prepare_duplicates_set(duplicates: dict[str, list[str]]) -> list[dict[str, Any]]:
def _prepare_duplicates_set(self, duplicates: dict[str, list[str]]) -> list[dict[str, Any]]:
"""Prepare groups of duplicate files and output names per title.

Returns a plan in the form:
Expand All @@ -194,33 +191,25 @@ def file_without_ext(path: str) -> str:
dir, name, _ = files_utils.split_path(path)
return os.path.join(dir, name)

def collect_media_files(dir_path: str) -> list[str]:
media_files = video_utils.collect_video_files(dir_path, self.duplicates_source.interruption)
media_files.sort()
return media_files

if all(os.path.isdir(p) for p in entries):
dirs = entries

if len(dirs) == 1:
# Special case: single dir → treat all files as one group of duplicates
dir_path = dirs[0]
media_files = [
os.path.join(root, file)
for root, _, filenames in os.walk(dir_path)
for file in filenames
if video_utils.is_video(file)
]
media_files.sort()
media_files = collect_media_files(dir_path)
output_name = file_without_ext(os.path.relpath(media_files[0], dir_path)) if media_files else "output"
return [(media_files, output_name)]

# Multiple dirs → group matching files by position
files_per_dir = []
for dir_path in dirs:
media_files = [
os.path.join(root, file)
for root, _, filenames in os.walk(dir_path)
for file in filenames
if video_utils.is_video(file)
]
media_files.sort()
files_per_dir.append(media_files)
files_per_dir.append(collect_media_files(dir_path))

lengths = [len(files) for files in files_per_dir]
if len(set(lengths)) != 1:
Expand Down Expand Up @@ -324,12 +313,7 @@ def _log_group_issue(self, title: str, issue: str, files: Sequence[str], ids: di
self.logger.warning(" #%d: %s", ids[path], self._format_group_path(path))

def _format_group_path(self, path: str) -> str:
if not self.base_path:
return path
try:
return os.path.relpath(path, self.base_path)
except ValueError:
return path
return files_utils.format_path(path, self.base_path)

def _validate_group_lengths(
self,
Expand Down
6 changes: 2 additions & 4 deletions twotone/tools/melt/melt_common.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import os
import re

from typing import Any, Literal

from ..utils import generic_utils
from ..utils import files_utils, generic_utils

FramesInfo = dict[int, dict[str, str]]
StreamType = Literal["video", "audio", "subtitle"]
Expand Down Expand Up @@ -71,8 +70,7 @@ def _split_path_fix(value: str) -> list[str]:


def _ensure_working_dir(working_dir: str) -> str:
os.makedirs(working_dir, exist_ok=True)
return working_dir
return files_utils.ensure_dir(working_dir)


def _is_length_mismatch(base_ms: int | None, other_ms: int | None, tolerance_ms: int) -> bool:
Expand Down
5 changes: 1 addition & 4 deletions twotone/tools/melt/melt_performer.py
Original file line number Diff line number Diff line change
Expand Up @@ -881,10 +881,7 @@ def _build_output_path(self, title: str, output_name: str) -> str:
return os.path.join(self.output_dir, title, output_name + ".mkv")

def _display_path(self, path: str) -> str:
try:
return os.path.relpath(path, self.output_dir)
except ValueError:
return path
return files_utils.format_path(path, self.output_dir)

def _copy_single_input(self, input_path: str, output_path: str) -> None:
self.logger.info(
Expand Down
5 changes: 1 addition & 4 deletions twotone/tools/melt/melt_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,13 @@ def render(self, logger: logging.Logger) -> None:

self._render_planned(logger, planned_items)

_stream_short_details = staticmethod(stream_short_details)

@staticmethod
def _format_track_line(stype: StreamType, stream: dict[str, Any], used: bool, override_lang: str | None = None) -> str:
tid = stream.get("tid", "?")
name = stream.get("name")
lang = override_lang or stream.get("language")
lang_name = language_utils.language_name(lang) if lang else "unknown"
details = MeltPlan._stream_short_details(stype, stream)
details = stream_short_details(stype, stream)
parts = []
if stype != "video" or lang_name != "unknown":
parts.append(lang_name)
Expand Down Expand Up @@ -154,4 +152,3 @@ def _render_group_streams(
flag = "used" if used else "skip"
logger.info("%s #%s (%s): %s", prefix, tid, flag, name)


20 changes: 10 additions & 10 deletions twotone/tools/melt/streams_picker.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,7 @@ def pick_streams(self, files_details: dict, ids: dict[str, int]):
video_stream_path = video_stream[0]

# pick audio streams
forced_audio_language_raw = {path: self.duplicates_source.get_metadata_for(path).get("audio_lang") for path in files_details}
forced_audio_language: dict[str, str] = {}
for path, lang in forced_audio_language_raw.items():
if lang and isinstance(lang, str):
forced_audio_language[path] = language_utils.unify_lang(lang)
forced_audio_language = self._metadata_language_overrides(files_details, "audio_lang", normalize=True)
audio_streams = self._pick_streams(
files_details,
video_stream_path,
Expand All @@ -56,11 +52,7 @@ def pick_streams(self, files_details: dict, ids: dict[str, int]):
)

# pick subtitle streams
forced_subtitle_language_raw = {path: self.duplicates_source.get_metadata_for(path).get("subtitle_lang") for path in files_details}
forced_subtitle_language: dict[str, str] = {}
for path, lang in forced_subtitle_language_raw.items():
if lang and isinstance(lang, str):
forced_subtitle_language[path] = lang
forced_subtitle_language = self._metadata_language_overrides(files_details, "subtitle_lang", normalize=False)
subtitle_streams = self._pick_streams(
files_details,
video_stream_path,
Expand All @@ -86,6 +78,14 @@ def _path_forces_all_streams(self, path: str) -> bool:
metadata = self.duplicates_source.get_metadata_for(path)
return self._metadata_flag_is_enabled(metadata.get("force_all_streams"))

def _metadata_language_overrides(self, files_details: dict, key: str, *, normalize: bool) -> dict[str, str]:
overrides: dict[str, str] = {}
for path in files_details:
lang = self.duplicates_source.get_metadata_for(path).get(key)
if lang and isinstance(lang, str):
overrides[path] = language_utils.unify_lang(lang) if normalize else lang
return overrides


@staticmethod
def _iter_starting_with(d: dict, start_key) -> Generator[Any, Any, Any]:
Expand Down
5 changes: 5 additions & 0 deletions twotone/tools/utils/files_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ def split_path(path: str) -> tuple[str, str, str]:
return str(info.parent), info.stem, info.suffix[1:]


def ensure_dir(path: str) -> str:
os.makedirs(path, exist_ok=True)
return path


class ScopedDirectory:
def __init__(self, path: str):
self.path = Path(path)
Expand Down
Loading