Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,11 +498,11 @@ def run_twotone(tool: str, tool_options: list[str] | None = None, global_options
def simulate_process_failure(target_exec: str):
original = process_utils.start_process

def wrapper(cmd, args):
def wrapper(cmd, args, *wrapper_args, **wrapper_kwargs):
_, exec_name, _ = files_utils.split_path(cmd)
if exec_name == target_exec:
return process_utils.ProcessResult(1, b"", b"")
return original(cmd, args)
return process_utils.ProcessResult(1, "", "")
return original(cmd, args, *wrapper_args, **wrapper_kwargs)

with patch("twotone.tools.utils.process_utils.start_process", side_effect=wrapper) as p:
yield p
2 changes: 1 addition & 1 deletion tests/melt/test_pair_matcher_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def _make_pair_matcher(self, lhs_fps: float = 25.0, rhs_fps: float = 25.0) -> Pa
fps_map = {"/fake/lhs.mp4": str(lhs_fps), "/fake/rhs.mp4": str(rhs_fps)}

with patch.object(video_utils, 'get_video_data',
side_effect=lambda p: {"video": [{"fps": fps_map[p]}]}):
side_effect=lambda p, **_kwargs: {"video": [{"fps": fps_map[p]}]}):
pm = PairMatcher(
interruption=generic_utils.InterruptibleProcess(),
wd=tempfile.mkdtemp(),
Expand Down
8 changes: 4 additions & 4 deletions tests/melt/test_performer_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def fake_raise_on_error(result):
right_points = [p[1] for p in segment_pairs]
source_segment_dur = max(right_points) - min(right_points)

def fake_get_duration(path):
def fake_get_duration(path, **_kwargs):
if "source_trimmed" in path or "source_scaled" in path or "out." in path:
return source_segment_dur
return base_duration_ms
Expand Down Expand Up @@ -703,7 +703,7 @@ def fake_start_process(tool, args, **kwargs):
calls.append((tool, list(args)))
return _FAKE_PROCESS_OK

def fake_get_duration(path):
def fake_get_duration(path, **_kwargs):
# Trimmed audio is shorter than requested → simulates AVI deficit
if "source_trimmed" in path:
return actual_trimmed_dur
Expand Down Expand Up @@ -773,7 +773,7 @@ def test_patch_audio_no_deficit_keeps_original_sync_offset(self):
pairs = [(seg1_start, seg2_start), (seg1_end, seg2_end)]
source_dur = seg2_end - seg2_start

def fake_get_duration(path):
def fake_get_duration(path, **_kwargs):
if "source_trimmed" in path or "source_scaled" in path or "out." in path:
return source_dur # no deficit — actual equals requested
return 7000000
Expand Down Expand Up @@ -808,7 +808,7 @@ def test_patch_audio_fill_gaps_raises_on_deficit(self):
source_dur = seg2_end - seg2_start
audio_deficit_ms = 488

def fake_get_duration(path):
def fake_get_duration(path, **_kwargs):
if "source_trimmed" in path:
return source_dur - audio_deficit_ms
return 7000000
Expand Down
79 changes: 79 additions & 0 deletions tests/utils/test_logging_usage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import ast
import unittest
from pathlib import Path


FORBIDDEN_LOGGING_CALLS = {
"critical",
"debug",
"error",
"exception",
"info",
"log",
"warn",
"warning",
}


class LoggingUsageTests(unittest.TestCase):
def test_production_code_does_not_use_root_logging_calls(self):
project_root = Path(__file__).resolve().parents[2]
production_root = project_root / "twotone"

offenders: list[str] = []
for path in sorted(production_root.rglob("*.py")):
tree = ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
for node in ast.walk(tree):
if not isinstance(node, ast.Call):
continue
func = node.func
if not isinstance(func, ast.Attribute):
continue
value = func.value
if not isinstance(value, ast.Name) or value.id != "logging":
continue
if func.attr not in FORBIDDEN_LOGGING_CALLS:
continue

rel_path = path.relative_to(project_root)
offenders.append(f"{rel_path}:{node.lineno} logging.{func.attr}()")

self.assertEqual([], offenders)

def test_production_code_uses_twotone_logger_hierarchy(self):
project_root = Path(__file__).resolve().parents[2]
production_root = project_root / "twotone"

offenders: list[str] = []
for path in sorted(production_root.rglob("*.py")):
tree = ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
for node in ast.walk(tree):
if not isinstance(node, ast.Call):
continue
func = node.func
if not isinstance(func, ast.Attribute):
continue
value = func.value
if not isinstance(value, ast.Name) or value.id != "logging":
continue
if func.attr != "getLogger":
continue

rel_path = path.relative_to(project_root)
if not node.args:
offenders.append(f"{rel_path}:{node.lineno} logging.getLogger()")
continue

logger_name = node.args[0]
if isinstance(logger_name, ast.Constant) and isinstance(logger_name.value, str):
if not logger_name.value.startswith("TwoTone"):
offenders.append(f"{rel_path}:{node.lineno} logging.getLogger({logger_name.value!r})")
continue

offenders.append(f"{rel_path}:{node.lineno} logging.getLogger(<dynamic>)")

self.assertEqual([], offenders)


if __name__ == "__main__":
unittest.main()
7 changes: 3 additions & 4 deletions twotone/tools/concatenate.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@

class Concatenate(generic_utils.InterruptibleProcess):
def __init__(self, logger: logging.Logger, working_dir: str):
super().__init__()

super().__init__(logger)
self.logger = logger
self.working_dir = working_dir

Expand Down Expand Up @@ -109,7 +108,7 @@ def perform(self, sorted_videos: dict[str, list[tuple[str, int]]]) -> None:

audio_codec = "copy"
for input_file in input_files:
file_details = video_utils.get_video_data(input_file)
file_details = video_utils.get_video_data(input_file, logger=self.logger)
audio_streams = file_details.get("audio", [])
for audio_stream in audio_streams:
codec = audio_stream.get("codec")
Expand All @@ -125,7 +124,7 @@ def escape_path(path: str) -> str:
ffmpeg_args = ["-f", "concat", "-safe", "0", "-i", input_file, "-c:v", "copy", "-c:a", audio_codec, output]

self.logger.info(f"Concatenating files into {output} file")
status = process_utils.start_process("ffmpeg", ffmpeg_args)
status = process_utils.start_process("ffmpeg", ffmpeg_args, logger=self.logger)
if status.returncode == 0:
for input_file in input_files:
os.remove(input_file)
Expand Down
10 changes: 5 additions & 5 deletions twotone/tools/language_fixer.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ def _scan_directory(self, path: str, include_audio: bool) -> list[dict]:
def _set_context(self, logger: logging.Logger, working_dir: str) -> None:
self.logger = logger
self.working_dir = working_dir
self._interruption = generic_utils.InterruptibleProcess()
self._interruption = generic_utils.InterruptibleProcess(logger)

def check_for_stop(self) -> None:
if self._interruption is not None:
Expand All @@ -348,7 +348,7 @@ def _is_rmvb(path: str) -> bool:

def _get_tracks_mkvmerge(self, video_path: str) -> list[dict]:
start = time.perf_counter()
info = video_utils.get_video_full_info_mkvmerge(video_path)
info = video_utils.get_video_full_info_mkvmerge(video_path, logger=self.logger)
self._log_if_slow("get_tracks_mkvmerge", video_path, start)
tracks: list[dict] = []

Expand Down Expand Up @@ -383,7 +383,7 @@ def _get_tracks_mkvmerge(self, video_path: str) -> list[dict]:

def _get_tracks_ffprobe(self, video_path: str) -> list[dict]:
start = time.perf_counter()
info = video_utils.get_video_full_info(video_path)
info = video_utils.get_video_full_info(video_path, logger=self.logger)
self._log_if_slow("get_tracks_ffprobe", video_path, start)
tracks: list[dict] = []
for stream in info.get("streams", []):
Expand Down Expand Up @@ -581,7 +581,7 @@ def _detect_subtitle_languages(self, video_path: str, missing_subtitles: list[in
encoding = subtitles_utils.file_encoding(path)
self._log_if_slow("subtitle_file_encoding", path, encoding_start)
lang_start = time.perf_counter()
detected_lang = subtitles_utils.guess_subtitle_language(path, encoding)
detected_lang = subtitles_utils.guess_subtitle_language(path, encoding, logger=self.logger)
self._log_if_slow("subtitle_language_guess", path, lang_start)
if detected_lang:
try:
Expand Down Expand Up @@ -664,7 +664,7 @@ def _apply_language_updates(self, video_path: str, updates: dict[int, str]) -> b
args.extend(["--language", f"{tid}:{lang}"])
args.append(video_path)

status = process_utils.start_process("mkvmerge", args)
status = process_utils.start_process("mkvmerge", args, logger=self.logger)
if status.returncode not in (0, 1):
output = (status.stdout or "") + (status.stderr or "")
self.logger.error(
Expand Down
6 changes: 4 additions & 2 deletions twotone/tools/melt/jellyfin.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@
from .duplicates_source import DuplicatesSource


DEFAULT_LOGGER = logging.getLogger("TwoTone.melt.jellyfin")


class JellyfinSource(DuplicatesSource):
def __init__(self, interruption: generic_utils.InterruptibleProcess, url: str, token: str, path_fix: tuple[str, str] | None, logger: logging.Logger | None = None) -> None:
super().__init__(interruption)

self.url = url
self.token = token
self.path_fix = path_fix
# allow injecting a logger for better control in callers/tests
self.logger = logger or logging.getLogger(__name__)
self.logger = logger or DEFAULT_LOGGER
self.tmdb_id_by_path: dict[str, str] = {}
self.tmdb_cache = TmdbCache(logger=self.logger)

Expand Down
5 changes: 2 additions & 3 deletions twotone/tools/melt/melt_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,8 @@ def _print_attachments_details(self, ids: dict[str, int], all_attachments: Itera
file_id = ids[path]
self.logger.debug(f"Attachment ID #{tid} from file #{file_id}")

@staticmethod
def _probe_inputs(files: Sequence[str]) -> tuple[dict[str, Any], dict[str, Any], dict[str, Any]]:
details_full = {file: video_utils.get_video_data_mkvmerge(file, enrich=True) for file in files}
def _probe_inputs(self, files: Sequence[str]) -> tuple[dict[str, Any], dict[str, Any], dict[str, Any]]:
details_full = {file: video_utils.get_video_data_mkvmerge(file, enrich=True, logger=self.logger) for file in files}
attachments = {file: info["attachments"] for file, info in details_full.items()}
tracks = {file: info["tracks"] for file, info in details_full.items()}
return details_full, attachments, tracks
Expand Down
Loading
Loading