diff --git a/src/xfer/cli.py b/src/xfer/cli.py index 223c2ec..2ae9ffc 100644 --- a/src/xfer/cli.py +++ b/src/xfer/cli.py @@ -54,7 +54,8 @@ # Helpers # ----------------------------- def now_run_id() -> str: - ts = dt.datetime.utcnow().replace(microsecond=0).isoformat() + "Z" + # Use filename-safe format (no colons) + ts = dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") rnd = os.urandom(3).hex() return f"{ts}_{rnd}" @@ -245,6 +246,7 @@ def manifest_build( rclone_cmd += shlex.split(extra_lsjson_flags) rclone_cmd.append("--files-only") + rclone_cmd.append("--max-backlog=1000000") srun_cmd = ["srun", "-n", "1", "-c", "8", "--no-container-remap-root"] srun_cmd += pyxis_container_args( @@ -263,21 +265,62 @@ def manifest_build( # Write error details to xfer-err/ err_file = err_dir / f"manifest_build-{run_id}.log" with err_file.open("w", encoding="utf-8") as ef: - ef.write(f"Exception: {exc}\n") + ef.write(f"Exception: {exc}\n\n") + + # Log the command that was run + ef.write("--- COMMAND ---\n") + ef.write(" ".join(shlex.quote(c) for c in srun_cmd) + "\n\n") + + # Log relevant SLURM environment variables + ef.write("--- SLURM ENVIRONMENT ---\n") + slurm_vars = {k: v for k, v in os.environ.items() if k.startswith("SLURM")} + for k, v in sorted(slurm_vars.items()): + ef.write(f"{k}={v}\n") + ef.write("\n") + import traceback + ef.write("--- TRACEBACK ---\n") ef.write(traceback.format_exc()) - # If subprocess.CalledProcessError, try to write stderr + + # If subprocess.CalledProcessError, write stdout and stderr if hasattr(exc, "stderr") and exc.stderr: with err_file.open("a", encoding="utf-8") as ef: ef.write("\n--- STDERR ---\n") ef.write(str(exc.stderr)) + if hasattr(exc, "stdout") and exc.stdout: + with err_file.open("a", encoding="utf-8") as ef: + ef.write("\n--- STDOUT ---\n") + ef.write(str(exc.stdout)) + + # Also print key info to stderr for visibility in job logs eprint(f"ERROR: srun/rclone failed, see {err_file}") + eprint(f"Command: {' '.join(shlex.quote(c) for c in srun_cmd)}") + if hasattr(exc, "stderr") and exc.stderr: + eprint(f"stderr: {exc.stderr}") + slurm_mem_vars = { + k: v for k, v in os.environ.items() if "MEM" in k and k.startswith("SLURM") + } + if slurm_mem_vars: + eprint(f"SLURM memory env vars: {slurm_mem_vars}") raise - # Build JSONL + # Build JSONL with progress reporting n = 0 bytes_total = 0 + progress_file = out.parent / "manifest.jsonl.progress" + last_progress_n = 0 + PROGRESS_INTERVAL = 10_000 # update progress file every 10k files + + def _write_progress() -> None: + """Write current listing progress to a sidecar file.""" + try: + progress_file.write_text( + json.dumps({"files_listed": n, "bytes_listed": bytes_total}) + ) + except OSError: + pass + with out.open("w", encoding="utf-8") as f: for item in parse_lsjson_items(cp.stdout): # Skip directories @@ -319,6 +362,13 @@ def manifest_build( f.write(json.dumps(rec, separators=(",", ":")) + "\n") n += 1 + if n - last_progress_n >= PROGRESS_INTERVAL: + _write_progress() + eprint(f" manifest progress: {n:,} files, {bytes_total:,} bytes") + last_progress_n = n + + # Final progress update and cleanup + _write_progress() eprint(f"Wrote {n} items, {bytes_total} bytes -> {out}") @@ -398,6 +448,273 @@ def manifest_shard( eprint(f"Wrote {num_shards} shards to {outdir} (records={n}, bytes={total_bytes})") +@manifest_app.command("analyze") +def manifest_analyze( + infile: Path = typer.Option( + ..., + "--in", + exists=True, + dir_okay=False, + help="Input manifest JSONL", + resolve_path=True, + ), + out: Optional[Path] = typer.Option( + None, + help="Output analysis JSON file (if not specified, prints to stdout)", + resolve_path=True, + ), + base_flags: str = typer.Option( + "--retries 10 --low-level-retries 20 --stats 600s --progress", + help="Base rclone flags to always include", + ), +) -> None: + """ + Analyze manifest file sizes and suggest optimal rclone flags. + + Examines the file size distribution and recommends transfer settings: + - Many small files: higher parallelism (--transfers 64 --checkers 128) + - Large files: fewer streams, larger buffers (--transfers 16 --buffer-size 256M) + - Mixed: balanced defaults (--transfers 32 --checkers 64) + """ + from .est import ( + compute_file_size_stats, + format_histogram_data, + human_bytes, + suggest_rclone_flags_from_sizes, + ) + + # Read manifest and extract sizes + sizes: List[int] = [] + for ln in infile.read_text(encoding="utf-8").splitlines(): + if not ln.strip(): + continue + try: + r = json.loads(ln) + size = int(r.get("size") or 0) + sizes.append(size) + except (json.JSONDecodeError, ValueError): + continue + + if not sizes: + result = { + "status": "error", + "error": "No files found in manifest", + "suggested_flags": base_flags, + } + else: + stats = compute_file_size_stats(sizes) + suggestion = suggest_rclone_flags_from_sizes(sizes) + histogram = format_histogram_data(sizes) + + # Combine base flags with profile-specific flags + combined_flags = f"{suggestion.flags} {base_flags}" + + result = { + "status": "ok", + "total_files": stats.total_files, + "total_bytes": stats.total_bytes, + "total_bytes_human": human_bytes(stats.total_bytes), + "file_size_stats": { + "min_size": stats.min_size, + "min_size_human": human_bytes(stats.min_size), + "max_size": stats.max_size, + "max_size_human": human_bytes(stats.max_size), + "median_size": stats.median_size, + "median_size_human": human_bytes(stats.median_size), + "small_files_pct": round(stats.small_files_pct, 1), + "medium_files_pct": round(stats.medium_files_pct, 1), + "large_files_pct": round(stats.large_files_pct, 1), + }, + "profile": suggestion.profile, + "profile_explanation": suggestion.explanation, + "suggested_flags": combined_flags, + "histogram": histogram, + } + + json_output = json.dumps(result, indent=2) + if out: + out.write_text(json_output + "\n", encoding="utf-8") + eprint(f"Wrote analysis to {out}") + else: + print(json_output) + + +@manifest_app.command("rebase") +def manifest_rebase( + infile: Path = typer.Option( + ..., + "--in", + exists=True, + dir_okay=False, + help="Input manifest JSONL", + resolve_path=True, + ), + out: Path = typer.Option(..., help="Output manifest JSONL path", resolve_path=True), + source_root: Optional[str] = typer.Option( + None, + help="New source root (rclone remote:path). If omitted, kept from manifest.", + ), + dest_root: Optional[str] = typer.Option( + None, help="New dest root (rclone remote:path). If omitted, kept from manifest." + ), +) -> None: + """ + Rebase a manifest to a different source/dest root. + + Useful when a manifest was built on one cluster (e.g. against a local VAST path) + and needs to be used on another cluster where the same data is accessed via a + different endpoint (e.g. an S3 remote). The relative 'path' field is preserved; + only source_root, dest_root, source, and dest are rewritten. + """ + mkdirp(out.parent) + + n = 0 + with infile.open(encoding="utf-8") as fin, out.open("w", encoding="utf-8") as fout: + for ln in fin: + if not ln.strip(): + continue + rec = json.loads(ln) + new_src = ( + source_root if source_root is not None else rec.get("source_root", "") + ) + new_dst = dest_root if dest_root is not None else rec.get("dest_root", "") + rel = rec.get("path", "") + rec["source_root"] = new_src + rec["dest_root"] = new_dst + rec["source"] = new_src.rstrip("/") + "/" + rel + rec["dest"] = stable_dest_for_source(new_src, new_dst, rel) + fout.write(json.dumps(rec, separators=(",", ":")) + "\n") + n += 1 + + eprint(f"Rebased {n} records -> {out}") + if source_root is not None: + eprint(f" source_root: {source_root}") + if dest_root is not None: + eprint(f" dest_root: {dest_root}") + + +@manifest_app.command("combine") +def manifest_combine( + source: str = typer.Option( + ..., help="rclone source root, e.g. s3src:bucket/prefix" + ), + dest: str = typer.Option(..., help="rclone dest root, e.g. s3dst:bucket/prefix"), + parts_dir: Path = typer.Option( + ..., + exists=True, + help="Directory containing lsjson-*.json part files", + resolve_path=True, + ), + out: Path = typer.Option(..., help="Output manifest JSONL path", resolve_path=True), + run_id: Optional[str] = typer.Option( + None, help="Run identifier; default is generated" + ), +) -> None: + """ + Combine multiple lsjson part files into a unified manifest.jsonl. + + Reads lsjson-*.json files from --parts-dir, adjusts paths using .prefix + sidecar files, and writes a single manifest JSONL. + """ + run_id = run_id or now_run_id() + mkdirp(out.parent) + + # Glob part files + part_files = sorted(parts_dir.glob("lsjson-*.json")) + if not part_files: + eprint(f"No lsjson-*.json files found in {parts_dir}") + raise typer.Exit(code=2) + + n = 0 + bytes_total = 0 + last_progress_n = 0 + PROGRESS_INTERVAL = 10_000 + progress_file = out.parent / "manifest.jsonl.progress" + + def _write_progress() -> None: + try: + progress_file.write_text( + json.dumps({"files_listed": n, "bytes_listed": bytes_total}) + ) + except OSError: + pass + + with out.open("w", encoding="utf-8") as f: + for part_file in part_files: + # Determine prefix from sidecar file + prefix_file = part_file.with_suffix(".prefix") + prefix = "" + if prefix_file.exists(): + prefix = prefix_file.read_text(encoding="utf-8").strip() + + # Read the JSON array + try: + items = json.loads(part_file.read_text(encoding="utf-8")) + if not isinstance(items, list): + eprint(f"WARNING: {part_file} is not a JSON array, skipping") + continue + except (json.JSONDecodeError, OSError) as e: + eprint(f"WARNING: Failed to read {part_file}: {e}, skipping") + continue + + for item in items: + if not isinstance(item, dict): + continue + if item.get("IsDir") is True: + continue + + rel_path = item.get("Path") + if not rel_path or not isinstance(rel_path, str): + continue + + # Adjust path with prefix + if prefix: + rel_path = prefix + "/" + rel_path + + size = int(item.get("Size") or 0) + bytes_total += size + + mtime = item.get("ModTime") + hashes = ( + item.get("Hashes") if isinstance(item.get("Hashes"), dict) else {} + ) + etag = item.get("ETag") or item.get("etag") + storage_class = item.get("StorageClass") + meta = ( + item.get("Metadata") + if isinstance(item.get("Metadata"), dict) + else {} + ) + + rec = { + "schema": SCHEMA, + "run_id": run_id, + "source_root": source, + "dest_root": dest, + "source": source.rstrip("/") + "/" + rel_path, + "dest": stable_dest_for_source(source, dest, rel_path), + "path": rel_path, + "size": size, + "mtime": mtime, + "hashes": hashes, + "etag": etag, + "storage_class": storage_class, + "meta": meta, + } + f.write(json.dumps(rec, separators=(",", ":")) + "\n") + n += 1 + + if n - last_progress_n >= PROGRESS_INTERVAL: + _write_progress() + eprint(f" manifest progress: {n:,} files, {bytes_total:,} bytes") + last_progress_n = n + + _write_progress() + eprint( + f"Combined {len(part_files)} parts -> {n} items, {bytes_total} bytes -> {out}" + ) + + # ----------------------------- # Slurm render/submit # ----------------------------- @@ -517,7 +834,7 @@ def manifest_shard( : "${RUN_DIR:?}" cd "${RUN_DIR}" -sbatch "${RUN_DIR}/sbatch_array.sh" +sbatch --export=NONE "${RUN_DIR}/sbatch_array.sh" """ SBATCH_ARRAY_SH = r"""#!/usr/bin/env bash @@ -636,7 +953,7 @@ def shell_words(s: str) -> str: else s ) - extras = sbatch_extras.rstrip() + extras = sbatch_extras.replace("\\n", "\n").rstrip() extras = extras if extras else "" sbatch_text = SBATCH_ARRAY_SH.format( @@ -701,7 +1018,9 @@ def slurm_submit( sbatch_script = run_dir / "sbatch_array.sh" if not sbatch_script.exists(): raise typer.BadParameter(f"Missing {sbatch_script}. Run `slurm render` first.") - cp = run_cmd(["sbatch", str(sbatch_script)], capture=True, check=True) + cp = run_cmd( + ["sbatch", "--export=NONE", str(sbatch_script)], capture=True, check=True + ) print(cp.stdout.strip()) @@ -742,6 +1061,9 @@ def run_all( cpus_per_task: int = typer.Option(4, min=1, help="cpus-per-task"), mem: str = typer.Option("8G", help="Slurm mem"), max_attempts: int = typer.Option(5, min=1, help="Worker retry attempts"), + sbatch_extras: str = typer.Option( + "", help="Extra SBATCH lines, e.g. '#SBATCH --account=foo\\n#SBATCH --qos=bar'" + ), submit: bool = typer.Option(False, help="If set, submit job after rendering"), pyxis_extra: str = typer.Option("", help="Extra pyxis flags"), ) -> None: @@ -781,6 +1103,7 @@ def run_all( source_root=source, dest_root=dest, max_attempts=max_attempts, + sbatch_extras=sbatch_extras, pyxis_extra=pyxis_extra, ) if submit: diff --git a/src/xfer/est.py b/src/xfer/est.py index 0be3f9a..5d10e8c 100644 --- a/src/xfer/est.py +++ b/src/xfer/est.py @@ -449,6 +449,199 @@ def parse_bytes(s: str) -> int: print(f"| {label} | {c} | {pf:5.1f}% | {human_bytes(b)} | {pb:5.1f}% | {bar} |") +# ----------------------------- +# Analysis functions for programmatic use +# ----------------------------- +@dataclass +class FileSizeStats: + """Statistics about file sizes in a dataset.""" + + total_files: int + total_bytes: int + min_size: int + max_size: int + median_size: int + mean_size: float + p10_size: int # 10th percentile + p90_size: int # 90th percentile + small_files_pct: float # % of files < 1MB + medium_files_pct: float # % of files 1MB - 100MB + large_files_pct: float # % of files > 100MB + + +def compute_file_size_stats(sizes: List[int]) -> FileSizeStats: + """Compute statistics about file sizes.""" + if not sizes: + return FileSizeStats( + total_files=0, + total_bytes=0, + min_size=0, + max_size=0, + median_size=0, + mean_size=0.0, + p10_size=0, + p90_size=0, + small_files_pct=0.0, + medium_files_pct=0.0, + large_files_pct=0.0, + ) + + sorted_sizes = sorted(sizes) + n = len(sorted_sizes) + total = sum(sizes) + + # Percentile helper + def percentile(p: float) -> int: + idx = int(p * (n - 1)) + return sorted_sizes[idx] + + # Size thresholds + small_threshold = 1024 * 1024 # 1 MB + large_threshold = 100 * 1024 * 1024 # 100 MB + + small_count = sum(1 for s in sizes if s < small_threshold) + large_count = sum(1 for s in sizes if s > large_threshold) + medium_count = n - small_count - large_count + + return FileSizeStats( + total_files=n, + total_bytes=total, + min_size=sorted_sizes[0], + max_size=sorted_sizes[-1], + median_size=percentile(0.5), + mean_size=total / n, + p10_size=percentile(0.1), + p90_size=percentile(0.9), + small_files_pct=100.0 * small_count / n, + medium_files_pct=100.0 * medium_count / n, + large_files_pct=100.0 * large_count / n, + ) + + +@dataclass +class RcloneFlagsSuggestion: + """Suggested rclone flags based on file size distribution.""" + + profile: str # "small_files", "large_files", or "mixed" + flags: str + explanation: str + + +def suggest_rclone_flags_from_sizes(sizes: List[int]) -> RcloneFlagsSuggestion: + """ + Analyze file sizes and suggest optimal rclone flags. + + Profiles: + - small_files: Many small files (>70% < 1MB) - high parallelism + - large_files: Many large files (>50% > 100MB) - fewer streams, larger buffers + - mixed: Default balanced settings + """ + stats = compute_file_size_stats(sizes) + + if stats.total_files == 0: + return RcloneFlagsSuggestion( + profile="empty", + flags="--transfers 32 --checkers 64 --fast-list", + explanation="No files to analyze, using default settings", + ) + + # Small files profile: high parallelism for many small files + if stats.small_files_pct > 70 or stats.median_size < 1024 * 1024: + return RcloneFlagsSuggestion( + profile="small_files", + flags="--transfers 64 --checkers 128 --fast-list", + explanation=f"Optimized for small files ({stats.small_files_pct:.0f}% < 1MB, median {human_bytes(stats.median_size)})", + ) + + # Large files profile: fewer streams, larger buffers + if stats.large_files_pct > 50 or stats.median_size > 100 * 1024 * 1024: + return RcloneFlagsSuggestion( + profile="large_files", + flags="--transfers 16 --checkers 32 --buffer-size 256M", + explanation=f"Optimized for large files ({stats.large_files_pct:.0f}% > 100MB, median {human_bytes(stats.median_size)})", + ) + + # Mixed/default profile + return RcloneFlagsSuggestion( + profile="mixed", + flags="--transfers 32 --checkers 64 --fast-list", + explanation=f"Balanced settings for mixed file sizes (median {human_bytes(stats.median_size)})", + ) + + +def format_histogram_data( + sizes: List[int], +) -> List[Dict[str, Any]]: + """ + Return histogram as structured data (for JSON serialization). + + Returns list of dicts with: range_label, count, pct_files, bytes, pct_bytes + """ + if not sizes: + return [] + + obs_min = max(1, min(sizes)) + obs_max = max(sizes) + + # Use power-of-2 bins + min_b = 2 ** int(math.floor(math.log2(obs_min))) + max_b = 2 ** int(math.ceil(math.log2(obs_max))) + + edges = [float(x) for x in default_pow2_edges(min_b, max_b)] + counts, bytes_bin = histogram_counts(sizes, edges) + + total_files = len(sizes) + total_bytes = sum(sizes) + + result = [] + for i in range(len(edges) - 1): + lo = int(edges[i]) + hi = int(edges[i + 1]) + c = counts[i] + b = bytes_bin[i] + pf = (100.0 * c / total_files) if total_files else 0.0 + pb = (100.0 * b / total_bytes) if total_bytes else 0.0 + + result.append( + { + "range_min": lo, + "range_max": hi, + "range_label": f"{human_bytes(lo)} - {human_bytes(hi)}", + "file_count": c, + "pct_files": round(pf, 1), + "bytes": b, + "bytes_human": human_bytes(b), + "pct_bytes": round(pb, 1), + } + ) + + return result + + +def format_histogram_text(sizes: List[int], width: int = 30) -> str: + """Format histogram as text for display in Slack/terminal.""" + if not sizes: + return "No files to display" + + hist_data = format_histogram_data(sizes) + counts = [h["file_count"] for h in hist_data] + max_count = max(counts) if counts else 0 + + lines = ["File size distribution:"] + lines.append("") + + for h in hist_data: + if h["file_count"] == 0: + continue + bar_len = int((h["file_count"] / max_count) * width) if max_count > 0 else 0 + bar = "█" * bar_len + lines.append( + f" {h['range_label']:>20}: {h['file_count']:>8} files ({h['pct_files']:5.1f}%) {bar}" + ) + + return "\n".join(lines) + + # ----------------------------- # Main # -----------------------------