diff --git a/CHANGELOG.md b/CHANGELOG.md index 025d24a..b2e9656 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,15 @@ CHANGELOG ========= +## 0.9.2 + +* Renames the move CLI from `smove` to `smv` +* Makes `smv` behave more like sequence-aware `mv`, including destination-based renames +* Adds explicit range support to `smv` and `scopy` for both compressed and embedded sequence syntax +* Adds an initial `srm` CLI for removing resolved sequence members +* Removes the legacy `--rename` flag from `smv` and `scopy` in favor of destination-based naming +* Updates tests and README examples for the new CLI behavior + ## 0.9.1 * Removes the envstack runtime dependency and migrates packaging metadata to `pyproject.toml` diff --git a/README.md b/README.md index 9209b9a..f242903 100644 --- a/README.md +++ b/README.md @@ -210,7 +210,8 @@ PySeq comes with the following sequence-aware command-line tools: | `sdiff` | Compare two sequences | `sdiff A.%04d.exr B.%04d.exr` | | `sstat` | Print detailed stats about a sequence | `sstat render.%04d.exr` | | `scopy` | Copy a sequence to another directory | `scopy a.%04d.exr /tmp/output/` | -| `smove` | Move a sequence to another directory | `smove b.%04d.exr /tmp/archive/` | +| `srm` | Remove a sequence or frame range | `srm a.1001-1100.exr` | +| `smv` | Move or rename a sequence | `smv b.%04d.exr /tmp/archive/` | Example commands: @@ -231,11 +232,23 @@ $ sdiff comp_A.%04d.exr comp_B.%04d.exr $ sstat render.%04d.exr $ sstat --json render.%04d.exr -# Copy a sequence and rename it -$ scopy input.%04d.exr output/ --rename scene01 +# Copy a sequence into a directory +$ scopy input.%04d.exr output/ + +# Copy an embedded frame range into a new sequence +$ scopy input.1-100.exr scene.1001-1100.exr + +# Remove an embedded frame range +$ srm input.1-100.exr + +# Rename a sequence in place +$ smv old.%04d.exr new.%04d.exr + +# Move an embedded frame range into a new sequence +$ smv old.1-100.rgb new.1001-1100.rgb # Move and renumber a sequence starting at frame 1001 -$ smove old.%04d.exr archive/ --renumber 1001 +$ smv old.%04d.exr archive/ --renumber 1001 ``` ## Frame Patterns diff --git a/bin/smove b/bin/smv similarity index 100% rename from bin/smove rename to bin/smv diff --git a/bin/smove.bat b/bin/smv.bat similarity index 100% rename from bin/smove.bat rename to bin/smv.bat diff --git a/lib/pyseq/__init__.py b/lib/pyseq/__init__.py index 1d189f1..7800c27 100644 --- a/lib/pyseq/__init__.py +++ b/lib/pyseq/__init__.py @@ -48,6 +48,6 @@ """ __author__ = "Ryan Galloway" -__version__ = "0.9.1" +__version__ = "0.9.2" from .seq import * diff --git a/lib/pyseq/scopy.py b/lib/pyseq/scopy.py index 3dcbc84..4e3a8bd 100644 --- a/lib/pyseq/scopy.py +++ b/lib/pyseq/scopy.py @@ -37,14 +37,13 @@ import os import argparse import shutil -import fnmatch from typing import Optional import pyseq from pyseq.util import ( cli_catch_keyboard_interrupt, - is_compressed_format_string, - resolve_sequence, + parse_destination_reference, + resolve_sequence_reference, ) @@ -100,20 +99,12 @@ def main(): """Main function to parse cli args and copy sequences.""" parser = argparse.ArgumentParser( - description="Copy image sequences with renaming/renumbering support", + description="Copy image sequences with destination-based renaming and renumbering support", ) parser.add_argument( - "sources", + "paths", nargs="+", - help="Source sequences (wildcards or compressed format strings)", - ) - parser.add_argument( - "dest", - help="Destination directory", - ) - parser.add_argument( - "--rename", - help="Rename sequence basename", + help="Source sequence(s) followed by a destination directory or sequence pattern", ) parser.add_argument( "--renumber", @@ -145,35 +136,38 @@ def main(): ) args = parser.parse_args() - if not os.path.isdir(args.dest): - print(f"Error: destination {args.dest} is not a directory", file=sys.stderr) + if len(args.paths) < 2: + print("Error: expected at least one source and a destination", file=sys.stderr) return 1 - for source in args.sources: + sources = args.paths[:-1] + dest = args.paths[-1] + + for source in sources: try: - if is_compressed_format_string(source): - seq = resolve_sequence(source) - dirname = os.path.dirname(source) or "." - else: - # treat as glob - dirname = os.path.dirname(source) or "." - basename = os.path.basename(source) - matches = [ - f for f in os.listdir(dirname) if fnmatch.fnmatchcase(f, basename) - ] - sequences = pyseq.get_sequences(matches) - if not sequences: - print(f"No sequence found matching {source}", file=sys.stderr) - continue - seq = sequences[0] + seq, dirname = resolve_sequence_reference(source) + dest_spec = parse_destination_reference(dest, seq) + + if len(sources) > 1 and dest_spec["kind"] != "directory": + raise ValueError( + "destination must be a directory when copying multiple sources" + ) + + rename = dest_spec["rename"] + pad = args.pad if dest_spec["kind"] == "directory" else dest_spec["pad"] + renumber = ( + args.renumber + if dest_spec["kind"] == "directory" + else dest_spec["renumber"] + ) copy_sequence( seq, dirname, - args.dest, - rename=args.rename, - renumber=args.renumber, - pad=args.pad, + dest_spec["dest_dir"], + rename=rename, + renumber=renumber, + pad=pad, force=args.force, dryrun=args.dryrun, verbose=args.verbose, diff --git a/lib/pyseq/smove.py b/lib/pyseq/smove.py index 19f62a9..cdbe79a 100644 --- a/lib/pyseq/smove.py +++ b/lib/pyseq/smove.py @@ -37,14 +37,13 @@ import os import argparse import shutil -import fnmatch from typing import Optional import pyseq from pyseq.util import ( cli_catch_keyboard_interrupt, - is_compressed_format_string, - resolve_sequence, + parse_destination_reference, + resolve_sequence_reference, ) @@ -100,20 +99,12 @@ def main(): """Main function to handle command line arguments and call the move_sequence.""" parser = argparse.ArgumentParser( - description="Move image sequences with renaming/renumbering support", + description="Move image sequences with destination-based renaming and renumbering support", ) parser.add_argument( - "sources", + "paths", nargs="+", - help="Source sequences (wildcards or compressed format strings)", - ) - parser.add_argument( - "dest", - help="Destination directory", - ) - parser.add_argument( - "--rename", - help="Rename sequence basename", + help="Source sequence(s) followed by a destination directory or sequence pattern", ) parser.add_argument( "--renumber", @@ -145,35 +136,39 @@ def main(): ) args = parser.parse_args() - if not os.path.isdir(args.dest): - print(f"Error: destination {args.dest} is not a directory", file=sys.stderr) + if len(args.paths) < 2: + print("Error: expected at least one source and a destination", file=sys.stderr) return 1 - for source in args.sources: + sources = args.paths[:-1] + dest = args.paths[-1] + + for source in sources: try: - if is_compressed_format_string(source): - seq = resolve_sequence(source) - dirname = os.path.dirname(source) or "." - else: - # treat as glob - dirname = os.path.dirname(source) or "." - basename = os.path.basename(source) - matches = [ - f for f in os.listdir(dirname) if fnmatch.fnmatchcase(f, basename) - ] - sequences = pyseq.get_sequences(matches) - if not sequences: - print(f"No sequence found matching {source}", file=sys.stderr) - continue - seq = sequences[0] + seq, dirname = resolve_sequence_reference(source) + dest_spec = parse_destination_reference(dest, seq) + + if len(sources) > 1 and dest_spec["kind"] != "directory": + raise ValueError( + "destination must be a directory when moving multiple sources" + ) + + rename = dest_spec["rename"] + pad = args.pad if dest_spec["kind"] == "directory" else dest_spec["pad"] + renumber = ( + args.renumber + if dest_spec["kind"] == "directory" + else dest_spec["renumber"] + ) + dest_dir = dest_spec["dest_dir"] move_sequence( seq, dirname, - args.dest, - rename=args.rename, - renumber=args.renumber, - pad=args.pad, + dest_dir, + rename=rename, + renumber=renumber, + pad=pad, force=args.force, dryrun=args.dryrun, verbose=args.verbose, diff --git a/lib/pyseq/sremove.py b/lib/pyseq/sremove.py new file mode 100644 index 0000000..20fe685 --- /dev/null +++ b/lib/pyseq/sremove.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python +# +# Copyright (c) 2011-2025, Ryan Galloway (ryan@rsgalloway.com) +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# - Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# - Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# - Neither the name of the software nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# ----------------------------------------------------------------------------- + +__doc__ = """ +Contains the main sremove functions for the pyseq module. +""" + +import argparse +import os +import sys + +import pyseq +from pyseq.util import cli_catch_keyboard_interrupt, resolve_sequence_reference + + +def remove_sequence( + seq: pyseq.Sequence, + src_dir: str, + force: bool = False, + dryrun: bool = False, + verbose: bool = False, +): + """Remove a sequence of files from src_dir.""" + for frame in seq: + src_path = os.path.join(src_dir, frame.name) + + if verbose or dryrun: + print(src_path) + + if dryrun: + continue + + try: + os.remove(src_path) + except FileNotFoundError: + if not force: + raise + + +@cli_catch_keyboard_interrupt +def main(): + """Main function to parse cli args and remove sequences.""" + parser = argparse.ArgumentParser( + description="Remove image sequences resolved from patterns, ranges, or globs", + ) + parser.add_argument( + "sources", + nargs="+", + help="Source sequences (globs, compressed patterns, or explicit ranges)", + ) + parser.add_argument( + "-f", + "--force", + action="store_true", + help="Ignore missing files", + ) + parser.add_argument( + "-d", + "--dryrun", + action="store_true", + help="Preview removals without performing them", + ) + parser.add_argument( + "-v", + "--verbose", + action="store_true", + help="Verbose output", + ) + args = parser.parse_args() + + for source in args.sources: + try: + seq, dirname = resolve_sequence_reference(source) + remove_sequence( + seq, + dirname, + force=args.force, + dryrun=args.dryrun, + verbose=args.verbose, + ) + except Exception as e: + print(f"Error processing {source}: {e}", file=sys.stderr) + return 1 + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/lib/pyseq/util.py b/lib/pyseq/util.py index 94a1c88..442d317 100644 --- a/lib/pyseq/util.py +++ b/lib/pyseq/util.py @@ -30,13 +30,16 @@ # ----------------------------------------------------------------------------- import functools +import fnmatch import glob import os import re import sys import warnings +from typing import Optional import pyseq +from pyseq.config import range_join def deprecated(func): @@ -172,3 +175,193 @@ def resolve_sequence(sequence_string: str): raise ValueError("Multiple sequences found: %s" % sequences) return sequences[0] + + +def build_sequence_pattern(head: str, pad: Optional[int], tail: str) -> str: + """Build a compressed sequence pattern from sequence components.""" + if pad: + return f"{head}%0{pad}d{tail}" + return f"{head}%d{tail}" + + +def subset_sequence(seq, frames): + """Return a sequence containing only the requested frames.""" + frame_set = set(frames) + items = [item for item in seq if item.frame in frame_set] + found_frames = {item.frame for item in items} + missing_frames = sorted(frame_set - found_frames) + if missing_frames: + raise FileNotFoundError(f"Missing frames in sequence: {missing_frames}") + + sequences = pyseq.get_sequences(items) + if not sequences: + raise ValueError("No valid sequence found for requested frame subset") + return sequences[0] + + +def parse_explicit_sequence_string(reference: str): + """Parse a serialized sequence string, including embedded range syntax.""" + dirname = os.path.dirname(reference) or "." + basename = os.path.basename(reference) + + embedded = re.match( + r"^(?P.+?)(?P\[(?:[^\]]+)\]|\d+-\d+)(?P\.[^/\s]+)$", + basename, + ) + if embedded: + range_text = embedded.group("range") + frames = [] + if range_text.startswith("["): + for number_group in range_text[1:-1].split(range_join): + number_group = number_group.strip() + if not number_group: + continue + if "-" in number_group: + start, end = number_group.split("-", 1) + frames.extend(range(int(start), int(end) + 1)) + else: + frames.append(int(number_group)) + else: + start, end = range_text.split("-", 1) + frames = list(range(int(start), int(end) + 1)) + + items = [ + pyseq.Item( + os.path.join( + dirname, + f"{embedded.group('head')}{frame}{embedded.group('tail')}", + ) + ) + for frame in frames + ] + sequences = pyseq.get_sequences(items) + if sequences: + return { + "seq": sequences[0], + "has_pad": False, + } + + formats = ( + ("%h%p%t %R", True), + ("%h%p%t %r", True), + ("%h%R%t", False), + ("%h%r%t", False), + ) + for fmt, has_pad in formats: + seq = pyseq.uncompress(reference, fmt=fmt) + if seq: + return { + "seq": seq, + "has_pad": has_pad, + } + return None + + +def resolve_sequence_reference(reference: str): + """Resolve a source reference into a sequence and its containing directory.""" + explicit = parse_explicit_sequence_string(reference) + if explicit: + dirname = os.path.dirname(reference) or "." + requested_seq = explicit["seq"] + + if explicit["has_pad"]: + pattern = os.path.join( + dirname, + build_sequence_pattern( + requested_seq.head(), + requested_seq.pad, + requested_seq.tail(), + ), + ) + full_seq = resolve_sequence(pattern) + else: + sequences = pyseq.get_sequences(os.listdir(dirname)) + candidates = [ + seq + for seq in sequences + if seq.head() == requested_seq.head() + and seq.tail() == requested_seq.tail() + ] + candidates = [ + seq + for seq in candidates + if set(requested_seq.frames()).issubset(set(seq.frames())) + ] + if not candidates: + raise FileNotFoundError(f"No sequence found matching {reference}") + if len(candidates) > 1: + raise ValueError( + f"Multiple sequences found matching {reference}: {candidates}" + ) + full_seq = candidates[0] + + return subset_sequence(full_seq, requested_seq.frames()), dirname + + if is_compressed_format_string(reference): + seq = resolve_sequence(reference) + dirname = os.path.dirname(reference) or "." + return seq, dirname + + dirname = os.path.dirname(reference) or "." + basename = os.path.basename(reference) + matches = [f for f in os.listdir(dirname) if fnmatch.fnmatchcase(f, basename)] + sequences = pyseq.get_sequences(matches) + if not sequences: + raise FileNotFoundError(f"No sequence found matching {reference}") + if len(sequences) > 1: + raise ValueError(f"Multiple sequences found matching {reference}: {sequences}") + return sequences[0], dirname + + +def parse_destination_reference(destination: str, source_seq): + """Parse a destination string as either a directory or destination sequence.""" + explicit = parse_explicit_sequence_string(destination) + if explicit: + dest_seq = explicit["seq"] + dest_frames = list(dest_seq.frames()) + expected_frames = list(range(dest_frames[0], dest_frames[0] + len(source_seq))) + + if dest_seq.tail() != source_seq.tail(): + raise ValueError( + "Destination sequence pattern must preserve the source extension" + ) + if dest_frames != expected_frames: + raise ValueError("Destination explicit range must be contiguous") + if len(dest_seq) != len(source_seq): + raise ValueError("Destination explicit range must match source length") + + return { + "kind": "sequence", + "dest_dir": os.path.dirname(destination) or ".", + "rename": dest_seq.head(), + "pad": dest_seq.pad if explicit["has_pad"] else None, + "renumber": dest_frames[0], + } + + if not is_compressed_format_string(destination): + return { + "kind": "directory", + "dest_dir": destination, + "rename": None, + "pad": None, + "renumber": None, + } + + filename = os.path.basename(destination) + match = re.search(r"%(?:0(?P\d+))?d", filename) + if not match: + raise ValueError(f"Invalid destination sequence pattern: {destination}") + + tail = filename[match.end() :] + if tail != source_seq.tail(): + raise ValueError( + "Destination sequence pattern must preserve the source extension" + ) + + return { + "kind": "sequence", + "dest_dir": os.path.dirname(destination) or ".", + "rename": filename[: match.start()], + "pad": int(match.group("pad")) if match.group("pad") else None, + "renumber": None, + } diff --git a/pyproject.toml b/pyproject.toml index 558e0a6..c3f8825 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,8 @@ lss = "pyseq.lss:main" scopy = "pyseq.scopy:main" sdiff = "pyseq.sdiff:main" sfind = "pyseq.sfind:main" -smove = "pyseq.smove:main" +smv = "pyseq.smove:main" +srm = "pyseq.sremove:main" sstat = "pyseq.sstat:main" stree = "pyseq.stree:main" diff --git a/tests/test_cli_interrupts.py b/tests/test_cli_interrupts.py index a4ef2e4..c8207e3 100644 --- a/tests/test_cli_interrupts.py +++ b/tests/test_cli_interrupts.py @@ -2,7 +2,7 @@ import pytest -from pyseq import lss, scopy, sdiff, sfind, smove, sstat, stree +from pyseq import lss, scopy, sdiff, sfind, smove, sremove, sstat, stree def _raise_keyboard_interrupt(*args, **kwargs): @@ -31,16 +31,28 @@ def test_cli_main_handles_keyboard_interrupt( assert "stopping..." in captured.err -@pytest.mark.parametrize(("module", "command"), [(scopy, "scopy"), (smove, "smove")]) +@pytest.mark.parametrize(("module", "command"), [(scopy, "scopy"), (smove, "smv")]) def test_copy_move_cli_main_handles_keyboard_interrupt( monkeypatch, capsys, tmp_path, module, command ): dest_dir = tmp_path / "dest" dest_dir.mkdir() - monkeypatch.setattr(module, "resolve_sequence", _raise_keyboard_interrupt) + monkeypatch.setattr(module, "resolve_sequence_reference", _raise_keyboard_interrupt) monkeypatch.setattr("sys.argv", [command, "a.%04d.exr", str(dest_dir)]) assert module.main() == 1 captured = capsys.readouterr() assert "stopping..." in captured.err + + +def test_srm_cli_main_handles_keyboard_interrupt(monkeypatch, capsys, tmp_path): + monkeypatch.chdir(tmp_path) + monkeypatch.setattr( + sremove, "resolve_sequence_reference", _raise_keyboard_interrupt + ) + monkeypatch.setattr("sys.argv", ["srm", "a.%04d.exr"]) + + assert sremove.main() == 1 + captured = capsys.readouterr() + assert "stopping..." in captured.err diff --git a/tests/test_scopy.py b/tests/test_scopy.py index 740300c..250066a 100644 --- a/tests/test_scopy.py +++ b/tests/test_scopy.py @@ -96,3 +96,47 @@ def test_scopy_cli(sample_sequence): for i in range(1, 4): expected = os.path.join(destdir, f"test.{i:04d}.exr") assert os.path.exists(expected) + + +def test_scopy_cli_explicit_sequence_string_source_and_dest(tmp_path): + """Serialized sequence strings should resolve before copying.""" + for i in range(1, 6): + (tmp_path / f"shot.{i:04d}.exr").write_text("dummy frame") + + src = str(tmp_path / "shot.%04d.exr") + " 1-3" + dest = str(tmp_path / "take.%04d.exr") + " 1001-1003" + + result = subprocess.run( + [scopy_bin, src, dest], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + assert result.returncode == 0, result.stderr + for i in range(1, 6): + assert os.path.exists(tmp_path / f"shot.{i:04d}.exr") + for i in range(1001, 1004): + assert os.path.exists(tmp_path / f"take.{i:04d}.exr") + + +def test_scopy_cli_embedded_range_source_and_dest(tmp_path): + """Embedded range syntax should work for copy operations too.""" + for i in range(1, 6): + (tmp_path / f"plate.{i:04d}.rgb").write_text("dummy frame") + + src = str(tmp_path / "plate.2-4.rgb") + dest = str(tmp_path / "beauty.20-22.rgb") + + result = subprocess.run( + [scopy_bin, src, dest], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + assert result.returncode == 0, result.stderr + for i in range(1, 6): + assert os.path.exists(tmp_path / f"plate.{i:04d}.rgb") + for i in range(20, 23): + assert os.path.exists(tmp_path / f"beauty.{i:04d}.rgb") diff --git a/tests/test_smove.py b/tests/test_smove.py index 5936e58..bea5983 100644 --- a/tests/test_smove.py +++ b/tests/test_smove.py @@ -30,7 +30,7 @@ # __doc__ = """ -Contains tests for the smove module. +Contains tests for the smv console command and smove module. """ import os @@ -42,7 +42,7 @@ from conftest import get_installed_command from pyseq.smove import move_sequence -smove_bin = get_installed_command("smove") +smv_bin = get_installed_command("smv") @pytest.fixture @@ -84,14 +84,14 @@ def test_move_sequence_basic(sample_sequence): assert not os.path.exists(old_path) -def test_smove_cli(sample_sequence): - """Test the command-line interface of smove.""" +def test_smv_cli(sample_sequence): + """Test the command-line interface of smv.""" src_dir, _ = sample_sequence with tempfile.TemporaryDirectory() as dest_dir: pattern = os.path.join(src_dir, "test.%04d.exr") result = subprocess.run( - [smove_bin, pattern, dest_dir], + [smv_bin, pattern, dest_dir], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, @@ -103,3 +103,145 @@ def test_smove_cli(sample_sequence): assert os.path.exists(new_path) old_path = os.path.join(src_dir, f"test.{i:04d}.exr") assert not os.path.exists(old_path) + + +def test_smv_cli_rename_sequence_pattern(sample_sequence): + """smv should support mv-style sequence renames.""" + src_dir, _ = sample_sequence + src_pattern = os.path.join(src_dir, "test.%04d.exr") + dest_pattern = os.path.join(src_dir, "renamed.%04d.exr") + + result = subprocess.run( + [smv_bin, src_pattern, dest_pattern], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + assert result.returncode == 0, result.stderr + for i in range(1, 4): + assert os.path.exists(os.path.join(src_dir, f"renamed.{i:04d}.exr")) + assert not os.path.exists(os.path.join(src_dir, f"test.{i:04d}.exr")) + + +def test_smv_cli_creates_destination_directory(sample_sequence): + """smv should create a destination directory when moving a sequence.""" + src_dir, _ = sample_sequence + parent_dir = tempfile.mkdtemp() + try: + dest_dir = os.path.join(parent_dir, "archive") + pattern = os.path.join(src_dir, "test.%04d.exr") + + result = subprocess.run( + [smv_bin, pattern, dest_dir], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + assert result.returncode == 0, result.stderr + for i in range(1, 4): + assert os.path.exists(os.path.join(dest_dir, f"test.{i:04d}.exr")) + assert not os.path.exists(os.path.join(src_dir, f"test.{i:04d}.exr")) + finally: + for root, dirs, files in os.walk(parent_dir, topdown=False): + for name in files: + os.remove(os.path.join(root, name)) + for name in dirs: + os.rmdir(os.path.join(root, name)) + os.rmdir(parent_dir) + + +def test_smv_cli_wildcard_source(sample_sequence): + """Wildcard sources should resolve to a sequence before moving.""" + src_dir, _ = sample_sequence + dest_dir = tempfile.mkdtemp() + try: + wildcard = os.path.join(src_dir, "test.*.exr") + + result = subprocess.run( + [smv_bin, wildcard, dest_dir], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + assert result.returncode == 0, result.stderr + for i in range(1, 4): + assert os.path.exists(os.path.join(dest_dir, f"test.{i:04d}.exr")) + finally: + for root, dirs, files in os.walk(dest_dir, topdown=False): + for name in files: + os.remove(os.path.join(root, name)) + for name in dirs: + os.rmdir(os.path.join(root, name)) + os.rmdir(dest_dir) + + +def test_smv_cli_multiple_sources_require_directory(tmp_path): + """Multiple sources should require a destination directory.""" + for prefix in ("a", "b"): + for i in range(1, 3): + (tmp_path / f"{prefix}.{i:04d}.exr").write_text("dummy frame") + + src_a = str(tmp_path / "a.%04d.exr") + src_b = str(tmp_path / "b.%04d.exr") + dest_pattern = str(tmp_path / "renamed.%04d.exr") + + result = subprocess.run( + [smv_bin, src_a, src_b, dest_pattern], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + assert result.returncode == 1 + assert "destination must be a directory" in result.stderr + + +def test_smv_cli_explicit_sequence_string_source_and_dest(tmp_path): + """Serialized sequence strings should resolve before moving.""" + for i in range(1, 6): + (tmp_path / f"shot.{i:04d}.exr").write_text("dummy frame") + + src = str(tmp_path / "shot.%04d.exr") + " 1-3" + dest = str(tmp_path / "take.%04d.exr") + " 1001-1003" + + result = subprocess.run( + [smv_bin, src, dest], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + assert result.returncode == 0, result.stderr + for i in range(1001, 1004): + assert os.path.exists(tmp_path / f"take.{i:04d}.exr") + for i in range(1, 4): + assert not os.path.exists(tmp_path / f"shot.{i:04d}.exr") + for i in range(4, 6): + assert os.path.exists(tmp_path / f"shot.{i:04d}.exr") + + +def test_smv_cli_embedded_range_source_and_dest(tmp_path): + """Embedded range syntax should resolve against on-disk padded sequences.""" + for i in range(1, 6): + (tmp_path / f"plate.{i:04d}.rgb").write_text("dummy frame") + + src = str(tmp_path / "plate.2-4.rgb") + dest = str(tmp_path / "beauty.20-22.rgb") + + result = subprocess.run( + [smv_bin, src, dest], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + assert result.returncode == 0, result.stderr + for i in range(20, 23): + assert os.path.exists(tmp_path / f"beauty.{i:04d}.rgb") + assert os.path.exists(tmp_path / "plate.0001.rgb") + assert os.path.exists(tmp_path / "plate.0005.rgb") + for i in range(2, 5): + assert not os.path.exists(tmp_path / f"plate.{i:04d}.rgb") diff --git a/tests/test_srm.py b/tests/test_srm.py new file mode 100644 index 0000000..5ee56bb --- /dev/null +++ b/tests/test_srm.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python +# +# Copyright (c) 2011-2025, Ryan Galloway (ryan@rsgalloway.com) +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# - Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# - Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# - Neither the name of the software nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# + +__doc__ = """ +Contains tests for the srm CLI and sremove module. +""" + +import os +import subprocess +import tempfile +import pytest + +import pyseq +from conftest import get_installed_command +from pyseq.sremove import remove_sequence + +srm_bin = get_installed_command("srm") + + +@pytest.fixture +def sample_sequence(): + """Fixture to create a temporary directory with a sequence of files.""" + with tempfile.TemporaryDirectory() as tmpdir: + for i in range(1, 4): + path = os.path.join(tmpdir, f"test.{i:04d}.exr") + with open(path, "w") as f: + f.write("dummy frame") + seq = pyseq.get_sequences(os.listdir(tmpdir))[0] + yield tmpdir, seq + + +def test_remove_sequence_basic(sample_sequence): + """Test removing a sequence of files.""" + src_dir, seq = sample_sequence + remove_sequence( + seq=seq, + src_dir=src_dir, + force=False, + dryrun=False, + verbose=False, + ) + + for i in range(1, 4): + old_path = os.path.join(src_dir, f"test.{i:04d}.exr") + assert not os.path.exists(old_path) + + +def test_srm_cli(sample_sequence): + """Test the command-line interface of srm.""" + src_dir, _ = sample_sequence + pattern = os.path.join(src_dir, "test.%04d.exr") + + result = subprocess.run( + [srm_bin, pattern], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + assert result.returncode == 0, result.stderr + for i in range(1, 4): + old_path = os.path.join(src_dir, f"test.{i:04d}.exr") + assert not os.path.exists(old_path) + + +def test_srm_cli_dryrun(sample_sequence): + """Dry-run should print planned removals without deleting files.""" + src_dir, _ = sample_sequence + pattern = os.path.join(src_dir, "test.%04d.exr") + + result = subprocess.run( + [srm_bin, pattern, "--dryrun"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + assert result.returncode == 0, result.stderr + assert "test.0001.exr" in result.stdout + for i in range(1, 4): + assert os.path.exists(os.path.join(src_dir, f"test.{i:04d}.exr")) + + +def test_srm_cli_explicit_sequence_string(tmp_path): + """Serialized sequence strings should resolve before removal.""" + for i in range(1, 6): + (tmp_path / f"shot.{i:04d}.exr").write_text("dummy frame") + + src = str(tmp_path / "shot.%04d.exr") + " 2-4" + + result = subprocess.run( + [srm_bin, src], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + assert result.returncode == 0, result.stderr + assert os.path.exists(tmp_path / "shot.0001.exr") + assert os.path.exists(tmp_path / "shot.0005.exr") + for i in range(2, 5): + assert not os.path.exists(tmp_path / f"shot.{i:04d}.exr") + + +def test_srm_cli_embedded_range(tmp_path): + """Embedded range syntax should resolve against on-disk padded sequences.""" + for i in range(1, 6): + (tmp_path / f"plate.{i:04d}.rgb").write_text("dummy frame") + + result = subprocess.run( + [srm_bin, str(tmp_path / "plate.2-4.rgb")], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + assert result.returncode == 0, result.stderr + assert os.path.exists(tmp_path / "plate.0001.rgb") + assert os.path.exists(tmp_path / "plate.0005.rgb") + for i in range(2, 5): + assert not os.path.exists(tmp_path / f"plate.{i:04d}.rgb")