From 66dadd2cdacd96e32bab7c3d42a24eb4e95c6994 Mon Sep 17 00:00:00 2001 From: Frans Irgolitsch Date: Wed, 29 Apr 2026 15:38:13 -0400 Subject: [PATCH] feat: add slice-config module and generator CLI (#106) --- linumpy/io/slice_config.py | 318 +++++ linumpy/tests/test_io_slice_config.py | 214 ++++ scripts/linum_generate_slice_config.py | 299 +++++ scripts/tests/test_generate_slice_config.py | 78 ++ workflows/preproc/nextflow.config | 119 +- workflows/preproc/preproc_rawtiles.nf | 91 +- workflows/reconst_3d/diagnostics.nf | 127 -- workflows/reconst_3d/nextflow.config | 511 +------- workflows/reconst_3d/soct_3d_reconst.nf | 1244 ++----------------- 9 files changed, 1091 insertions(+), 1910 deletions(-) create mode 100644 linumpy/io/slice_config.py create mode 100644 linumpy/tests/test_io_slice_config.py create mode 100644 scripts/linum_generate_slice_config.py create mode 100644 scripts/tests/test_generate_slice_config.py delete mode 100644 workflows/reconst_3d/diagnostics.nf diff --git a/linumpy/io/slice_config.py b/linumpy/io/slice_config.py new file mode 100644 index 00000000..b117eede --- /dev/null +++ b/linumpy/io/slice_config.py @@ -0,0 +1,318 @@ +""" +Shared helpers for reading, writing and stamping ``slice_config.csv``. + +``slice_config.csv`` is the single per-slice trace file threaded through +the reconstruction pipeline. Each stage that makes a per-slice decision +(quality assessment, rehoming correction, auto-exclusion, missing-slice +interpolation, ...) stamps its flag columns via this module and hands +the enriched file to the next stage. + +Only pipeline-*decision* columns live here; raw metrics belong in the +pipeline report and per-stage diagnostics JSON. + +Concurrency model +----------------- + +This module does **not** implement any file locking. Safe concurrent use +depends on the upstream Nextflow pipeline's channel discipline: + +* Every process receives ``slice_config.csv`` as an immutable input + staged into its own work directory. 
Nothing reads and writes the same + file at the same time. +* Per-slice stages (interpolation, pairwise registration, ...) emit + per-slice fragment files (``slice_z{NN}_manifest.csv``). Those fragments + are collected and merged sequentially in a single downstream process + (``finalise_interpolation``), so the CSV writer always runs on a single + worker. +* Stamping helpers (:func:`stamp` / :func:`merge_fragments`) always produce + a *new* CSV at ``slice_config_out`` rather than updating in place, so a + reader on the old version is never in a torn state. + +If you ever need to call these helpers outside of Nextflow (e.g. ad-hoc +scripts running in parallel), make sure each writer targets a distinct +output path; otherwise the last writer wins. +""" + +from __future__ import annotations + +import csv +from collections import OrderedDict +from pathlib import Path +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from collections.abc import Iterable, Mapping + +CANONICAL_COLUMNS: list[str] = [ + "slice_id", + "use", + "exclude_reason", + "quality_score", + "galvo_confidence", + "galvo_fix", + "notes", + "rehomed", + "rehoming_reliable", + "auto_excluded", + "auto_exclude_reason", + "interpolated", + "interpolation_failed", + "interpolation_method_used", + "interpolation_fallback_reason", +] + +TRUE_STRINGS = frozenset({"true", "1", "yes", "y", "t"}) +FALSE_STRINGS = frozenset({"false", "0", "no", "n", "f", ""}) + + +def normalize_slice_id(slice_id: object) -> str: + """Return ``slice_id`` as a two-digit zero-padded string (``"01"``, ``"17"``). + + Accepts int / str / float ("1.0") inputs. Falls back to ``str(slice_id).strip()`` + for non-numeric ids. 
+ """ + if slice_id is None: + return "" + if isinstance(slice_id, (int,)): + return f"{int(slice_id):02d}" + text = str(slice_id).strip() + if not text: + return "" + try: + return f"{int(float(text)):02d}" + except ValueError: + return text + + +def _coerce_bool(value: object) -> bool: + """Coerce a CSV cell to bool; empty / unknown => False.""" + if isinstance(value, bool): + return value + if value is None: + return False + text = str(value).strip().lower() + if text in TRUE_STRINGS: + return True + if text in FALSE_STRINGS: + return False + return False + + +def read(path: Path) -> OrderedDict[str, dict[str, str]]: + """Read ``slice_config.csv``; return ``slice_id -> row`` with normalized ids. + + Raises :class:`FileNotFoundError` if the file does not exist. + Row values are kept as strings (CSV native); use :func:`get_flag` for bool + coercion. + """ + path = Path(path) + if not path.exists(): + raise FileNotFoundError(f"slice_config not found: {path}") + rows: OrderedDict[str, dict[str, str]] = OrderedDict() + with path.open() as f: + reader = csv.DictReader(f) + for raw in reader: + sid = normalize_slice_id(raw.get("slice_id", "")) + if not sid: + continue + cleaned = {k: ("" if v is None else str(v)) for k, v in raw.items()} + cleaned["slice_id"] = sid + rows[sid] = cleaned + return rows + + +def read_header(path: Path) -> list[str]: + """Return the header row of ``path`` (empty list if file has no header).""" + path = Path(path) + if not path.exists(): + raise FileNotFoundError(f"slice_config not found: {path}") + with path.open() as f: + reader = csv.reader(f) + try: + return next(reader) + except StopIteration: + return [] + + +def _as_cell(value: object) -> str: + """Stringify a value for CSV storage (bool -> 'true'/'false').""" + if isinstance(value, bool): + return "true" if value else "false" + if value is None: + return "" + return str(value) + + +def _build_header(rows: Iterable[Mapping[str, object]], extra_columns: Iterable[str]) -> list[str]: + 
"""Build header: canonical columns (in order) + any other columns seen in rows or in ``extra_columns``. + + Preserves insertion order. + """ + seen: list[str] = [] + seen_set: set[str] = set() + for col in CANONICAL_COLUMNS: + if col not in seen_set: + seen.append(col) + seen_set.add(col) + for col in extra_columns: + if col not in seen_set: + seen.append(col) + seen_set.add(col) + for row in rows: + for col in row: + if col not in seen_set: + seen.append(col) + seen_set.add(col) + return seen + + +def write( + path: Path, + rows: Iterable[Mapping[str, object]], + extra_columns: Iterable[str] = (), +) -> None: + """Atomically write ``rows`` to ``path``. + + - The header always starts with :data:`CANONICAL_COLUMNS` (in that order); + any extra columns come after. Missing canonical columns are emitted + empty. + - Rows are sorted by ``slice_id``. + - ``slice_id`` is normalised to a 2-digit string. + """ + rows_list = [dict(r) for r in rows] + for r in rows_list: + r["slice_id"] = normalize_slice_id(r.get("slice_id", "")) + rows_list.sort(key=lambda r: r.get("slice_id", "")) + + header = _build_header(rows_list, extra_columns) + path = Path(path) + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") + with tmp.open("w", newline="") as f: + writer = csv.DictWriter(f, fieldnames=header) + writer.writeheader() + for row in rows_list: + writer.writerow({col: _as_cell(row.get(col, "")) for col in header}) + tmp.replace(path) + + +def stamp( + path_in: Path, + path_out: Path, + slice_id: object, + **flags: object, +) -> None: + """Stamp a single slice: read ``path_in``, update ``slice_id`` with ``flags``, write to ``path_out``. + + New slice rows are appended with ``use=false`` when the row is absent. + """ + stamp_many(path_in, path_out, {normalize_slice_id(slice_id): dict(flags)}) + + +def stamp_many( + path_in: Path, + path_out: Path, + updates: Mapping[str, Mapping[str, object]], +) -> None: + """Stamp multiple slices at once. 
+ + ``updates`` maps ``slice_id -> {column: value}``. Unknown slices are + appended with ``use=false`` unless the caller supplies a ``use`` key. + """ + rows = read(path_in) + for raw_sid, flags in updates.items(): + sid = normalize_slice_id(raw_sid) + if not sid: + continue + existing = rows.get(sid) + if existing is None: + new_row: dict[str, str] = {"slice_id": sid, "use": "false"} + for k, v in flags.items(): + new_row[k] = _as_cell(v) + rows[sid] = new_row + else: + for k, v in flags.items(): + existing[k] = _as_cell(v) + write(path_out, rows.values()) + + +def merge_fragments( + path_in: Path, + fragment_paths: Iterable[Path], + path_out: Path, + column_map: Mapping[str, str] | None = None, +) -> None: + """Merge per-slice CSV fragments into ``path_in`` and write to ``path_out``. + + Each fragment is a small CSV with at least a ``slice_id`` column. Columns + from the fragment are stamped onto the matching slice row, renamed via + ``column_map`` if provided (``{fragment_col: target_col}``). + + Fragments that reference slices absent from the base config add new rows + (``use=false``). + """ + updates: dict[str, dict[str, object]] = {} + for frag in fragment_paths: + frag_path = Path(frag) + if not frag_path.exists(): + continue + with frag_path.open() as f: + reader = csv.DictReader(f) + for raw in reader: + sid = normalize_slice_id(raw.get("slice_id", "")) + if not sid: + continue + entry = updates.setdefault(sid, {}) + for col, val in raw.items(): + if col == "slice_id" or val is None: + continue + target = column_map.get(col, col) if column_map else col + if target: + entry[target] = val + stamp_many(path_in, path_out, updates) + + +def filter_slices_to_use(path: Path) -> set[str]: + """Return the set of slice IDs whose ``use`` column is truthy. + + When ``slice_config.csv`` is missing this raises :class:`FileNotFoundError` + — callers should guard on ``path.exists()`` or pass an optional path + themselves. 
+ """ + rows = read(path) + return {sid for sid, row in rows.items() if _coerce_bool(row.get("use", ""))} + + +def get_flag(row: Mapping[str, object], column: str, default: bool = False) -> bool: + """Return a boolean flag from a config row (default when absent/empty).""" + if column not in row: + return default + value = row.get(column, "") + if value is None or value == "": + return default + return _coerce_bool(value) + + +def is_interpolated(path: Path, slice_id: object) -> bool: + """Return True if ``slice_id`` is flagged as interpolated in ``path``.""" + sid = normalize_slice_id(slice_id) + rows = read(path) + row = rows.get(sid) + if row is None: + return False + return get_flag(row, "interpolated") + + +def force_skip_slices(path: Path) -> set[str]: + """Return slice IDs that stacking should treat as motor-only (force-skip their pairwise transforms). + + A slice is force-skipped when it is explicitly excluded (``use=false``) + or was flagged by auto-exclude (``auto_excluded=true``). 
+ """ + rows = read(path) + skip: set[str] = set() + for sid, row in rows.items(): + used = _coerce_bool(row.get("use", "true")) if row.get("use", "") != "" else True + if not used or get_flag(row, "auto_excluded"): + skip.add(sid) + return skip diff --git a/linumpy/tests/test_io_slice_config.py b/linumpy/tests/test_io_slice_config.py new file mode 100644 index 00000000..b054d1d7 --- /dev/null +++ b/linumpy/tests/test_io_slice_config.py @@ -0,0 +1,214 @@ +"""Tests for linumpy/io/slice_config.py.""" + +from __future__ import annotations + +import csv +from pathlib import Path + +import pytest + +from linumpy.io import slice_config + + +def _write(path: Path, header: list[str], rows: list[dict[str, object]]) -> None: + with path.open("w", newline="") as f: + writer = csv.DictWriter(f, fieldnames=header) + writer.writeheader() + for row in rows: + writer.writerow({k: row.get(k, "") for k in header}) + + +def _read_rows(path: Path) -> tuple[list[str], list[dict[str, str]]]: + with path.open() as f: + reader = csv.DictReader(f) + return list(reader.fieldnames or []), list(reader) + + +def test_normalize_slice_id_variants(): + assert slice_config.normalize_slice_id(1) == "01" + assert slice_config.normalize_slice_id("1") == "01" + assert slice_config.normalize_slice_id("01") == "01" + assert slice_config.normalize_slice_id("1.0") == "01" + assert slice_config.normalize_slice_id(" 7 ") == "07" + assert slice_config.normalize_slice_id("") == "" + assert slice_config.normalize_slice_id("a_custom_id") == "a_custom_id" + + +def test_read_round_trip(tmp_path: Path): + path = tmp_path / "slice_config.csv" + _write( + path, + ["slice_id", "use", "notes"], + [ + {"slice_id": "00", "use": "true", "notes": ""}, + {"slice_id": "01", "use": "false", "notes": "bad"}, + ], + ) + rows = slice_config.read(path) + assert list(rows.keys()) == ["00", "01"] + assert rows["01"]["use"] == "false" + assert rows["01"]["notes"] == "bad" + + +def test_read_normalises_ids(tmp_path: Path): + path = 
tmp_path / "slice_config.csv" + _write( + path, + ["slice_id", "use"], + [ + {"slice_id": "1", "use": "true"}, + {"slice_id": "2.0", "use": "false"}, + ], + ) + rows = slice_config.read(path) + assert set(rows) == {"01", "02"} + + +def test_write_orders_canonical_first(tmp_path: Path): + path = tmp_path / "slice_config.csv" + slice_config.write( + path, + [ + {"slice_id": "02", "use": True, "custom": "extra", "interpolated": "true"}, + {"slice_id": "01", "use": False, "custom": "foo"}, + ], + ) + header, rows = _read_rows(path) + assert header[0] == "slice_id" + assert "use" in header + assert "interpolated" in header + assert "custom" in header + assert header.index("use") < header.index("custom") + assert header.index("interpolated") < header.index("custom") + assert [r["slice_id"] for r in rows] == ["01", "02"] + assert rows[0]["use"] == "false" + assert rows[1]["use"] == "true" + + +def test_stamp_updates_existing_row(tmp_path: Path): + path_in = tmp_path / "in.csv" + path_out = tmp_path / "out.csv" + _write( + path_in, + ["slice_id", "use"], + [{"slice_id": "00", "use": "true"}, {"slice_id": "01", "use": "true"}], + ) + slice_config.stamp(path_in, path_out, "01", rehomed=True, rehoming_reliable=0) + rows = slice_config.read(path_out) + assert rows["01"]["rehomed"] == "true" + assert rows["01"]["rehoming_reliable"] == "0" + assert rows["00"].get("rehomed", "") == "" + + +def test_stamp_adds_unknown_slice(tmp_path: Path): + path_in = tmp_path / "in.csv" + path_out = tmp_path / "out.csv" + _write(path_in, ["slice_id", "use"], [{"slice_id": "00", "use": "true"}]) + slice_config.stamp(path_in, path_out, "03", interpolated=True) + rows = slice_config.read(path_out) + assert "03" in rows + assert rows["03"]["use"] == "false" + assert rows["03"]["interpolated"] == "true" + + +def test_merge_fragments(tmp_path: Path): + base = tmp_path / "base.csv" + out = tmp_path / "out.csv" + _write( + base, + ["slice_id", "use", "notes"], + [ + {"slice_id": "00", "use": "true", 
"notes": ""}, + {"slice_id": "01", "use": "false", "notes": "bad"}, + {"slice_id": "02", "use": "true", "notes": ""}, + ], + ) + frag1 = tmp_path / "frag1.csv" + _write( + frag1, + ["slice_id", "method_used", "fallback_reason"], + [{"slice_id": "01", "method_used": "zmorph", "fallback_reason": ""}], + ) + frag2 = tmp_path / "frag2.csv" + _write( + frag2, + ["slice_id", "method_used"], + [{"slice_id": "05", "method_used": "weighted"}], + ) + slice_config.merge_fragments( + base, + [frag1, frag2], + out, + column_map={ + "method_used": "interpolation_method_used", + "fallback_reason": "interpolation_fallback_reason", + }, + ) + rows = slice_config.read(out) + assert rows["01"]["interpolation_method_used"] == "zmorph" + assert rows["01"]["notes"] == "bad" + assert rows["05"]["use"] == "false" + assert rows["05"]["interpolation_method_used"] == "weighted" + + +def test_filter_slices_to_use(tmp_path: Path): + path = tmp_path / "sc.csv" + _write( + path, + ["slice_id", "use"], + [ + {"slice_id": "00", "use": "true"}, + {"slice_id": "01", "use": "false"}, + {"slice_id": "02", "use": "YES"}, + {"slice_id": "03", "use": ""}, + ], + ) + assert slice_config.filter_slices_to_use(path) == {"00", "02"} + + +def test_force_skip_slices(tmp_path: Path): + path = tmp_path / "sc.csv" + _write( + path, + ["slice_id", "use", "auto_excluded"], + [ + {"slice_id": "00", "use": "true", "auto_excluded": "false"}, + {"slice_id": "01", "use": "false", "auto_excluded": "false"}, + {"slice_id": "02", "use": "true", "auto_excluded": "true"}, + ], + ) + assert slice_config.force_skip_slices(path) == {"01", "02"} + + +def test_is_interpolated(tmp_path: Path): + path = tmp_path / "sc.csv" + _write( + path, + ["slice_id", "use", "interpolated"], + [ + {"slice_id": "00", "use": "true", "interpolated": "false"}, + {"slice_id": "01", "use": "false", "interpolated": "true"}, + ], + ) + assert slice_config.is_interpolated(path, "01") is True + assert slice_config.is_interpolated(path, 0) is False + 
assert slice_config.is_interpolated(path, 99) is False + + +def test_read_missing_file_raises(tmp_path: Path): + with pytest.raises(FileNotFoundError): + slice_config.read(tmp_path / "does_not_exist.csv") + + +def test_stamp_preserves_unknown_extra_columns(tmp_path: Path): + path_in = tmp_path / "in.csv" + path_out = tmp_path / "out.csv" + _write( + path_in, + ["slice_id", "use", "legacy_metric"], + [{"slice_id": "00", "use": "true", "legacy_metric": "42.0"}], + ) + slice_config.stamp(path_in, path_out, "00", interpolated=True) + rows = slice_config.read(path_out) + assert rows["00"]["legacy_metric"] == "42.0" + assert rows["00"]["interpolated"] == "true" diff --git a/scripts/linum_generate_slice_config.py b/scripts/linum_generate_slice_config.py new file mode 100644 index 00000000..ed42fe82 --- /dev/null +++ b/scripts/linum_generate_slice_config.py @@ -0,0 +1,299 @@ +#!/usr/bin/env python3 +"""Generate a slice configuration file for controlling which slices are used in the 3D reconstruction pipeline. + +This script can detect slices from: +1. A directory containing mosaic grids (*.ome.zarr files with z## in the name) +2. A directory containing raw tiles (tile_x*_y*_z* folders) +3. 
An existing shifts_xy.csv file + +The output is a CSV file with columns: +- slice_id: The slice identifier (e.g., 00, 01, 02) +- use: Boolean whether to use this slice (true/false) +- galvo_confidence: (optional) Galvo shift detection confidence (0-1) +- galvo_fix: (optional) Whether galvo fix would be applied (true/false) +- notes: Optional notes for documentation + +Example usage: + # From mosaic grids directory + linum_generate_slice_config.py /path/to/mosaics slice_config.csv + + # From raw tiles directory + linum_generate_slice_config.py /path/to/raw_tiles slice_config.csv --from_tiles + + # From existing shifts file + linum_generate_slice_config.py /path/to/shifts_xy.csv slice_config.csv --from_shifts + + # With galvo detection (requires raw tiles) + linum_generate_slice_config.py /path/to/raw_tiles slice_config.csv --from_tiles --detect_galvo +""" + +# Configure thread limits before numpy/scipy imports +import linumpy.config.threads # noqa: F401 + +import argparse +import csv +import re +from pathlib import Path + +import numpy as np +from tqdm.auto import tqdm + +from linumpy.cli.args import add_overwrite_arg, assert_output_exists +from linumpy.geometry.galvo import detect_galvo_for_slice +from linumpy.io import slice_config as slice_config_io +from linumpy.microscope.oct import OCT +from linumpy.mosaic.discovery import get_tiles_ids + + +def _build_arg_parser() -> argparse.ArgumentParser: + p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawTextHelpFormatter) + p.add_argument("input", help="Input directory (mosaic grids or raw tiles) or shifts CSV file") + p.add_argument("output_file", help="Output slice configuration CSV file") + + source_group = p.add_mutually_exclusive_group() + source_group.add_argument("--from_tiles", action="store_true", help="Input is a raw tiles directory") + source_group.add_argument("--from_shifts", action="store_true", help="Input is an existing shifts_xy.csv file") + + p.add_argument("--exclude", 
nargs="+", type=int, default=[], help="List of slice IDs to exclude (set use=false)") + p.add_argument("--exclude_first", type=int, default=1, help="Exclude first N slices as calibration slices [%(default)s]") + + # Galvo detection options + galvo_group = p.add_argument_group("Galvo Detection", "Detect galvo shift artifacts in raw tiles") + galvo_group.add_argument( + "--detect_galvo", action="store_true", help="Run galvo shift detection (requires --from_tiles or raw tiles dir)" + ) + galvo_group.add_argument( + "--tiles_dir", type=str, default=None, help="Raw tiles directory for galvo detection (if input is shifts file)" + ) + galvo_group.add_argument( + "--galvo_threshold", type=float, default=0.6, help="Confidence threshold for galvo fix [%(default)s]" + ) + + add_overwrite_arg(p) + return p + + +def get_slice_ids_from_mosaics(directory: Path) -> list: + """Extract slice IDs from mosaic grid filenames.""" + pattern = r".*z(\d+).*\.ome\.zarr$" + slice_ids = [] + + for f in directory.iterdir(): + if f.is_dir() and f.suffix == ".zarr": + match = re.match(pattern, f.name) + if match: + slice_id = int(match.group(1)) + slice_ids.append(slice_id) + + return sorted(set(slice_ids)) + + +def get_slice_ids_from_tiles(directory: Path) -> list: + """Extract slice IDs from raw tile directories.""" + _, tile_ids = get_tiles_ids(directory) + z_values = np.unique([ids[2] for ids in tile_ids]) + return sorted(z_values.tolist()) + + +def get_slice_ids_from_shifts(shifts_file: Path) -> list: + """Extract slice IDs from an existing shifts_xy.csv file.""" + slice_ids = set() + + with Path(shifts_file).open() as f: + reader = csv.DictReader(f) + for row in reader: + # Handle both int and float string formats (e.g., '0' or '0.0') + slice_ids.add(int(float(row["fixed_id"]))) + slice_ids.add(int(float(row["moving_id"]))) + + return sorted(slice_ids) + + +def detect_galvo_for_slices(tiles_dir: Path, slice_ids: list, threshold: float = 0.3) -> dict: + """ + Detect galvo shift artifacts 
for each slice. + + Parameters + ---------- + tiles_dir : Path + Directory containing raw tiles + slice_ids : list + List of slice IDs to analyze + threshold : float + Confidence threshold for applying fix + + Returns + ------- + dict + Mapping from slice_id to {'confidence': float, 'would_fix': bool} + """ + results = {} + + for z in tqdm(slice_ids, desc="Detecting galvo shift"): + try: + # Get tiles for this slice + tiles, _ = get_tiles_ids(tiles_dir, z=z) + + if not tiles: + results[z] = {"confidence": 0.0, "would_fix": False, "error": "no_tiles"} + continue + + oct = OCT(tiles[0]) + n_extra = oct.info.get("n_extra", 0) + + if n_extra == 0: + results[z] = {"confidence": 0.0, "would_fix": False, "error": "no_extra_alines"} + continue + + # Use centralized detection with multi-tile sampling + shift, confidence = detect_galvo_for_slice(tiles, n_extra, threshold=threshold) + + results[z] = { + "confidence": confidence, + "would_fix": confidence >= threshold, + "shift": shift if confidence >= threshold else 0, + } + except Exception as e: + results[z] = {"confidence": 0.0, "would_fix": False, "error": str(e)} + + return results + + +def write_slice_config( + output_file: Path, + slice_ids: list, + exclude_ids: list | None = None, + galvo_results: dict | None = None, + first_slice_excludes: list | None = None, +) -> None: + """Write the slice configuration file. 
+ + Parameters + ---------- + output_file : Path + Output CSV file path + slice_ids : list + List of slice IDs to include + exclude_ids : list + List of slice IDs to exclude (mark use=false) + galvo_results : dict + Optional galvo detection results + first_slice_excludes : list + List of slice IDs excluded as calibration/first slices + """ + if exclude_ids is None: + exclude_ids = [] + if first_slice_excludes is None: + first_slice_excludes = [] + + rows: list[dict[str, object]] = [] + for slice_id in slice_ids: + use = "false" if slice_id in exclude_ids else "true" + note = "calibration_slice" if slice_id in first_slice_excludes else "" + + row: dict[str, object] = {"slice_id": f"{slice_id:02d}", "use": use} + if galvo_results is not None: + galvo = galvo_results.get(slice_id) + if galvo is not None: + row["galvo_confidence"] = f"{galvo['confidence']:.3f}" + row["galvo_fix"] = "true" if galvo.get("would_fix", False) else "false" + galvo_note = galvo.get("error", "") + if galvo_note and note: + note = f"{note}; {galvo_note}" + elif galvo_note: + note = galvo_note + else: + row["galvo_confidence"] = "0.000" + row["galvo_fix"] = "false" + if not note: + note = "not_analyzed" + if note: + row["notes"] = note + rows.append(row) + + slice_config_io.write(output_file, rows) + + +def main() -> None: + """Run function operation.""" + p = _build_arg_parser() + args = p.parse_args() + + input_path = Path(args.input) + output_file = Path(args.output_file) + + assert_output_exists(output_file, p, args) + + # Determine tiles directory for galvo detection + tiles_dir = None + if args.tiles_dir: + tiles_dir = Path(args.tiles_dir) + elif args.from_tiles: + tiles_dir = input_path + + # Validate galvo detection requirements + if args.detect_galvo and tiles_dir is None: + p.error("--detect_galvo requires --from_tiles or --tiles_dir to specify raw tiles location") + + if args.detect_galvo and tiles_dir and not tiles_dir.is_dir(): + p.error(f"Tiles directory not found: {tiles_dir}") + 
+ # Detect slice IDs based on input type + if args.from_shifts: + if not input_path.exists(): + p.error(f"Shifts file not found: {input_path}") + slice_ids = get_slice_ids_from_shifts(input_path) + print(f"Found {len(slice_ids)} slices in shifts file: {input_path}") + elif args.from_tiles: + if not input_path.is_dir(): + p.error(f"Tiles directory not found: {input_path}") + slice_ids = get_slice_ids_from_tiles(input_path) + print(f"Found {len(slice_ids)} slices in tiles directory: {input_path}") + else: + # Default: assume mosaic grids directory + if not input_path.is_dir(): + p.error(f"Mosaics directory not found: {input_path}") + slice_ids = get_slice_ids_from_mosaics(input_path) + print(f"Found {len(slice_ids)} slices in mosaics directory: {input_path}") + + if not slice_ids: + p.error("No slices found in input. Check the input path and type.") + + # Build exclude list + exclude_ids = list(args.exclude) + first_slice_excludes = [] + + # Exclude first N slices (calibration slices) + if args.exclude_first > 0: + first_n = slice_ids[: args.exclude_first] + first_slice_excludes = first_n + for sid in first_n: + if sid not in exclude_ids: + exclude_ids.append(sid) + print(f"Excluding first {args.exclude_first} slice(s) as calibration: {first_n}") + + # Run galvo detection if requested + galvo_results = None + if args.detect_galvo: + print(f"\nRunning galvo shift detection (threshold={args.galvo_threshold})...") + assert tiles_dir is not None + galvo_results = detect_galvo_for_slices(tiles_dir, slice_ids, args.galvo_threshold) + + # Print summary + fix_count = sum(1 for r in galvo_results.values() if r.get("would_fix", False)) + skip_count = len(galvo_results) - fix_count + print("\nGalvo Detection Summary:") + print(f" Fix would be applied: {fix_count} slices") + print(f" Fix would be skipped: {skip_count} slices") + + # Write the configuration file + write_slice_config(output_file, slice_ids, exclude_ids, galvo_results, first_slice_excludes) + + print(f"\nSlice 
configuration written to: {output_file}") + if args.exclude: + print(f"Excluded slices: {args.exclude}") + print(f"Slice IDs: {[f'{s:02d}' for s in slice_ids]}") + + +if __name__ == "__main__": + main() diff --git a/scripts/tests/test_generate_slice_config.py b/scripts/tests/test_generate_slice_config.py new file mode 100644 index 00000000..47677f90 --- /dev/null +++ b/scripts/tests/test_generate_slice_config.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python3 +import csv +from pathlib import Path + + +def test_help(script_runner): + ret = script_runner.run(["linum_generate_slice_config.py", "--help"]) + assert ret.success + + +def test_from_shifts_file(script_runner, tmp_path): + """Test generating slice config from an existing shifts file.""" + # Create a sample shifts file + shifts_file = tmp_path / "shifts_xy.csv" + with Path(shifts_file).open("w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["fixed_id", "moving_id", "x_shift", "y_shift", "x_shift_mm", "y_shift_mm"]) + writer.writerow([0, 1, 10, 5, 0.01, 0.005]) + writer.writerow([1, 2, 8, 3, 0.008, 0.003]) + writer.writerow([2, 3, 12, 7, 0.012, 0.007]) + + output = tmp_path / "slice_config.csv" + ret = script_runner.run( + ["linum_generate_slice_config.py", str(shifts_file), str(output), "--from_shifts", "--exclude_first", "0"] + ) + assert ret.success + assert output.exists() + + # Verify the content + with Path(output).open() as f: + reader = csv.DictReader(f) + rows = list(reader) + + assert len(rows) == 4 # slices 0, 1, 2, 3 + for row in rows: + assert row["use"] == "true" + assert row["slice_id"] in ["00", "01", "02", "03"] + + +def test_from_shifts_file_with_exclude(script_runner, tmp_path): + """Test generating slice config with exclusions.""" + # Create a sample shifts file + shifts_file = tmp_path / "shifts_xy.csv" + with Path(shifts_file).open("w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["fixed_id", "moving_id", "x_shift", "y_shift", "x_shift_mm", "y_shift_mm"]) + 
writer.writerow([0, 1, 10, 5, 0.01, 0.005]) + writer.writerow([1, 2, 8, 3, 0.008, 0.003]) + writer.writerow([2, 3, 12, 7, 0.012, 0.007]) + + output = tmp_path / "slice_config.csv" + ret = script_runner.run( + [ + "linum_generate_slice_config.py", + str(shifts_file), + str(output), + "--from_shifts", + "--exclude_first", + "0", + "--exclude", + "1", + "2", + ] + ) + assert ret.success + assert output.exists() + + # Verify the content + with Path(output).open() as f: + reader = csv.DictReader(f) + rows = list(reader) + + assert len(rows) == 4 + for row in rows: + if row["slice_id"] in ["01", "02"]: + assert row["use"] == "false" + else: + assert row["use"] == "true" diff --git a/workflows/preproc/nextflow.config b/workflows/preproc/nextflow.config index bbd4c743..8deec284 100644 --- a/workflows/preproc/nextflow.config +++ b/workflows/preproc/nextflow.config @@ -2,131 +2,26 @@ manifest { nextflowVersion = '>= 23.10' } -params { - // ========================================================================= - // INPUT/OUTPUT - // ========================================================================= - input = "" - output = "output" - use_old_folder_structure = false // Use old folder structure where tiles are not in Z subfolders - - // ========================================================================= - // COMPUTE RESOURCES - // ========================================================================= - use_gpu = true // Enable GPU acceleration (auto-fallback to CPU if unavailable) - processes = 1 // Number of parallel Python processes per Nextflow task (CPU mode only) - - // CPU resource management - enable_cpu_limits = true // Enable CPU limiting via thread-count environment variables - max_cpus = null // null = auto-detect from machine, or set explicit number - reserved_cpus = 2 // CPUs reserved for system overhead when max_cpus is null - - // GPU concurrency - // Each GPU job uses ~2 CPU threads and a fraction of one GPU. 
- // With multiple GPUs set max_mosaic_forks = GPUs × concurrent-jobs-per-GPU. - // Example: 2 × 48 GB GPUs → max_mosaic_forks = 4 (2 jobs per GPU) - max_mosaic_forks = 4 // Max concurrent create_mosaic_grid jobs - max_aip_forks = 4 // Max concurrent generate_aip jobs - - // ========================================================================= - // MOSAIC GRID PARAMETERS - // ========================================================================= - axial_resolution = 1.36 // Axial resolution of imaging system in microns - resolution = -1 // Output resolution (µm/pixel). -1 = full native resolution - sharding_factor = 4 // There will be N × N chunks per shard - - // ========================================================================= - // CORRECTION OPTIONS - // ========================================================================= - fix_galvo_shift = true // Fix galvo mirror timing artifact (true for new data) - fix_camera_shift = false // Fix camera offset artifact (false for new data) - preprocess = false // Apply rotation/flip preprocessing (true for legacy data) - galvo_confidence_threshold = 0.6 // Minimum confidence (0–1) to apply galvo fix - - // ========================================================================= - // SLICE CONFIGURATION - // ========================================================================= - generate_slice_config = true // Generate slice_config.csv for controlling which slices to use - exclude_first_slices = 1 // Exclude first N slices as calibration - detect_galvo = false // Run galvo detection and include results in slice_config.csv - - // ========================================================================= - // OPTIONAL OUTPUTS - // ========================================================================= - generate_previews = false // Generate orthogonal view previews of mosaic grids - generate_aips = false // Generate AIP images from mosaic grids for QC visualization -} - -// 
========================================================================= -// PROCESS CONFIGURATION -// ========================================================================= process { publishDir = {"$params.output"} scratch = true - stageInMode = 'symlink' - stageOutMode = 'rsync' + stageInMode='symlink' + stageOutMode='rsync' errorStrategy = { task.attempt <= 3 ? 'retry' : 'ignore' } maxRetries = 3 - afterScript = 'sleep 1' - - // Thread limiting for Python scripts. CPU budget logic is inlined here - // (Nextflow v26 strict config DSL no longer allows top-level `def`). - beforeScript = { - if (params.enable_cpu_limits == false) return "" - - int totalCpus = Runtime.runtime.availableProcessors() - int maxCpus = (params.max_cpus != null && params.max_cpus > 0) - ? Math.min(params.max_cpus as int, totalCpus) - : Math.max(1, totalCpus - (params.reserved_cpus ?: 2) as int) - int numProcesses = Math.max(1, (params.processes ?: 1) as int) - int threadsPerProcess = Math.max(1, (maxCpus / numProcesses) as int) - - def envVars = [] - if (params.max_cpus != null && params.max_cpus > 0) { - envVars << "export LINUMPY_MAX_CPUS=${params.max_cpus as int}" - } else { - envVars << "export LINUMPY_RESERVED_CPUS=${(params.reserved_cpus ?: 2) as int}" - } - - envVars << "export OMP_NUM_THREADS=${threadsPerProcess}" - envVars << "export MKL_NUM_THREADS=${threadsPerProcess}" - envVars << "export OPENBLAS_NUM_THREADS=${threadsPerProcess}" - envVars << "export VECLIB_MAXIMUM_THREADS=${threadsPerProcess}" - envVars << "export NUMEXPR_NUM_THREADS=${threadsPerProcess}" - envVars << "export NUMBA_NUM_THREADS=${threadsPerProcess}" - envVars << "export ITK_GLOBAL_DEFAULT_NUMBER_OF_THREADS=${threadsPerProcess}" - envVars << "export XLA_FLAGS='--xla_cpu_multi_thread_eigen=false intra_op_parallelism_threads=${threadsPerProcess}'" - - return envVars.join('\n') - } - - withName: "create_mosaic_grid" { - // In GPU mode each job uses ~2 CPU threads (main + I/O prefetch); GPU - // contention is 
capped by max_mosaic_forks. Set it to GPUs × jobs-per-GPU. - maxForks = params.use_gpu ? params.max_mosaic_forks : null - } - - withName: "generate_aip" { - maxForks = params.use_gpu ? params.max_aip_forks : null - } + afterScript='sleep 1' } -// ========================================================================= -// CONTAINER CONFIGURATION -// ========================================================================= apptainer { autoMounts = true enabled = true } -// ========================================================================= -// CLUSTER PROFILES -// ========================================================================= profiles { calliste { apptainer { - cacheDir = '/scratchCalliste/apptainer/cache' - libraryDir = '/scratchCalliste/apptainer/library' + cacheDir='/scratchCalliste/apptainer/cache' + libraryDir='/scratchCalliste/apptainer/library' autoMounts = true enabled = true runOptions = '-B /mnt/apptainer_tmp:/tmp' @@ -135,9 +30,9 @@ profiles { temp = '/mnt/apptainer_tmp' } process { - withName: "create_mosaic_grid" { + withName: "create_mosaic_grid" { scratch = false } } } -} +} \ No newline at end of file diff --git a/workflows/preproc/preproc_rawtiles.nf b/workflows/preproc/preproc_rawtiles.nf index 67edc110..d80d5719 100644 --- a/workflows/preproc/preproc_rawtiles.nf +++ b/workflows/preproc/preproc_rawtiles.nf @@ -5,12 +5,20 @@ nextflow.enable.dsl = 2 // Convert raw S-OCT tiles into mosaic grids and xy shifts // Input: Directory containing raw data set tiles // Output: Mosaic grids and xy shifts -// -// Parameters are defined in nextflow.config + +// Parameters +params.input = "" +params.output = "output" +params.use_old_folder_structure = false // Use the old folder structure where tiles are not stored in subfolders based on their Z +params.processes = 1 // Maximum number of python processes per nextflow process +params.axial_resolution = 1.5 // Axial resolution of imaging system in microns +params.resolution = -1 // resolution of 
mosaic grid. Defaults to full resolution. +params.sharding_factor = 4 // There will be N x N chunks per shard +params.fix_galvo_shift = true // should be true for new data, else false +params.fix_camera_shift = false // should be set to false for new data, else true process create_mosaic_grid { - publishDir "$params.output", mode: 'link' // Hard link: no duplication, file stays accessible - + cpus params.processes input: tuple val(slice_id), path(tiles) output: @@ -20,46 +28,14 @@ process create_mosaic_grid { options += params.fix_galvo_shift? "--fix_galvo_shift":"--no-fix_galvo_shift" options += " " options += params.fix_camera_shift? "--fix_camera_shift":"--no-fix_camera_shift" - options += " " - options += params.preprocess? "--preprocess":"--no-preprocess" - // Select GPU or CPU script based on use_gpu parameter - String gpu_opts = params.use_gpu ? "--use_gpu --galvo_threshold ${params.galvo_confidence_threshold}" : "--no-use_gpu" """ - linum_create_mosaic_grid_3d.py mosaic_grid_3d_z${slice_id}.ome.zarr --from_tiles_list $tiles --resolution ${params.resolution} --n_processes ${params.processes} --axial_resolution ${params.axial_resolution} --sharding_factor ${params.sharding_factor} ${options} ${gpu_opts} - """ -} - -process generate_aip { - publishDir "$params.output/aips", mode: 'copy' - - input: - tuple val(slice_id), path(mosaic_grid) - output: - tuple val(slice_id), path("aip_z${slice_id}.png") - script: - String gpu_opts = params.use_gpu ? 
"--use_gpu" : "--no-use_gpu" - """ - linum_aip_png.py ${mosaic_grid} aip_z${slice_id}.png ${gpu_opts} - """ -} - -process generate_mosaic_preview { - maxForks 1 - publishDir "$params.output/previews", mode: 'copy' - - input: - tuple val(slice_id), path(mosaic_grid) - output: - path("mosaic_grid_z${slice_id}_preview.png") - script: - """ - linum_screenshot_omezarr.py ${mosaic_grid} mosaic_grid_z${slice_id}_preview.png + linum_create_mosaic_grid_3d.py mosaic_grid_3d_z${slice_id}.ome.zarr --from_tiles_list $tiles --resolution ${params.resolution} --n_processes ${params.processes} --axial_resolution ${params.axial_resolution} --n_levels 0 --sharding_factor ${params.sharding_factor} ${options} """ } process estimate_xy_shifts_from_metadata { cpus params.processes - publishDir "$params.output", mode: 'copy' + publishDir "$params.output" input: path(input_dir) output: @@ -70,24 +46,6 @@ process estimate_xy_shifts_from_metadata { """ } -process generate_slice_config { - publishDir "$params.output", mode: 'copy' - - input: - tuple path(shifts_file), path(input_dir) - - output: - path("slice_config.csv") - - script: - String galvo_opts = params.detect_galvo ? "--detect_galvo --tiles_dir ${input_dir} --galvo_threshold ${params.galvo_confidence_threshold}" : "" - String exclude_first_opt = params.exclude_first_slices > 0 ? "--exclude_first ${params.exclude_first_slices}" : "--exclude_first 0" - """ - linum_generate_slice_config.py ${shifts_file} slice_config.csv --from_shifts ${exclude_first_opt} ${galvo_opts} - """ -} - - workflow { if (params.use_old_folder_structure) { @@ -106,27 +64,6 @@ workflow { // Generate a 3D mosaic grid at full resolution create_mosaic_grid(inputSlices) - // [Optional] Generate AIP images from mosaic grids for QC visualization - if (params.generate_aips) { - generate_aip(create_mosaic_grid.out) - } - - // [Optional] Generate orthogonal view previews of mosaic grids. 
- // maxForks 1 on the process keeps screenshots sequential to avoid spawning - // 52 concurrent I/O-heavy jobs. Each task depends only on its own zarr - // being complete, which Nextflow already guarantees via channel ordering. - if (params.generate_previews) { - generate_mosaic_preview(create_mosaic_grid.out) - } - // Estimate XY shifts from metadata estimate_xy_shifts_from_metadata(input_dir_channel) - - // Generate slice configuration file (for controlling which slices to use in reconstruction) - if (params.generate_slice_config) { - // Combine shifts file with input directory for optional galvo detection - slice_config_input = estimate_xy_shifts_from_metadata.out - .combine(input_dir_channel) - generate_slice_config(slice_config_input) - } } diff --git a/workflows/reconst_3d/diagnostics.nf b/workflows/reconst_3d/diagnostics.nf deleted file mode 100644 index 8390562a..00000000 --- a/workflows/reconst_3d/diagnostics.nf +++ /dev/null @@ -1,127 +0,0 @@ -#!/usr/bin/env nextflow -nextflow.enable.dsl = 2 - -/* - * Diagnostic processes for the 3D reconstruction pipeline. - * - * These are side-channel artefacts (rotation analyses, motor-only stitches / - * stacks, motor-vs-refined comparisons). They are gated in the main workflow - * by `params.diagnostic_mode` or per-stage flags - * (analyze_rotation_drift, motor_only_stitch, motor_only_stack, - * analyze_acquisition_rotation, compare_stitching). - * - * Sub-workflow conventions: docs/NEXTFLOW_WORKFLOWS.md. 
- */ - -process analyze_rotation_drift { - publishDir "${params.output}/diagnostics/rotation_analysis", mode: 'copy' - - input: - path("register_pairwise/*") - - output: - path "rotation_analysis/*" - - script: - """ - linum_analyze_registration_transforms.py register_pairwise rotation_analysis \ - --resolution ${params.resolution} \ - --rotation_threshold ${params.diagnostic_rotation_threshold} - """ -} - -process stitch_motor_only { - publishDir "${params.output}/diagnostics/motor_only_stitch", mode: 'copy' - - input: - tuple val(slice_id), path(mosaic_grid) - - output: - path "slice_z${slice_id}_motor_only.ome.zarr" - - script: - def blending = params.motor_only_stitch_blending ?: 'diffusion' - """ - linum_stitch_motor_only.py ${mosaic_grid} "slice_z${slice_id}_motor_only.ome.zarr" \ - --overlap_fraction ${params.motor_only_overlap} \ - --blending_method ${blending} - """ -} - -process stitch_refined { - publishDir "${params.output}/diagnostics/refined_stitch", mode: 'copy' - - input: - tuple val(slice_id), path(mosaic_grid) - - output: - path "slice_z${slice_id}_refined.ome.zarr" - path "slice_z${slice_id}_refinements.json", optional: true - - script: - def refinement_out = params.save_refinement_data ? 
"--output_refinements slice_z${slice_id}_refinements.json" : "" - """ - linum_stitch_3d_refined.py ${mosaic_grid} "slice_z${slice_id}_refined.ome.zarr" \ - --overlap_fraction ${params.stitch_overlap_fraction} \ - --blending_method diffusion \ - --refinement_mode blend_shift \ - --max_refinement_px ${params.max_blend_refinement_px} \ - ${refinement_out} -f - """ -} - -process compare_stitching { - publishDir "${params.output}/diagnostics/stitch_comparison", mode: 'copy' - - input: - tuple val(slice_id), path(motor_stitch), path(refined_stitch) - - output: - path "slice_z${slice_id}_comparison/*" - - script: - """ - linum_compare_stitching.py ${motor_stitch} ${refined_stitch} \ - "slice_z${slice_id}_comparison" \ - --label1 "Motor-only" --label2 "Refined" \ - --tile_step ${params.comparison_tile_step} - """ -} - -process stack_motor_only { - publishDir "${params.output}/diagnostics/motor_only_stack", mode: 'copy' - - input: - path("slices/*") - path(shifts_file) - - output: - path "motor_only_stack.ome.zarr" - path "motor_only_stack_preview.png", optional: true - - script: - def blending_arg = params.motor_only_stack_blending ?: 'none' - """ - linum_stack_motor_only.py slices ${shifts_file} motor_only_stack.ome.zarr \ - --blending ${blending_arg} \ - --preview motor_only_stack_preview.png - """ -} - -process analyze_acquisition_rotation { - publishDir "${params.output}/diagnostics/acquisition_rotation", mode: 'copy' - - input: - path(shifts_file) - path("register_pairwise/*") - - output: - path "acquisition_rotation_analysis/*" - - script: - """ - linum_analyze_acquisition_rotation.py ${shifts_file} acquisition_rotation_analysis \ - --registration_dir register_pairwise \ - --resolution ${params.resolution} - """ -} diff --git a/workflows/reconst_3d/nextflow.config b/workflows/reconst_3d/nextflow.config index 9ba73750..90ad608d 100644 --- a/workflows/reconst_3d/nextflow.config +++ b/workflows/reconst_3d/nextflow.config @@ -3,514 +3,59 @@ manifest { } params { - // 
========================================================================= - // INPUT/OUTPUT - // ========================================================================= - input = "." // Directory containing mosaic_grid*.ome.zarr files - output = "." // Output directory for all pipeline results - shifts_xy = "" // Path to shifts CSV (default: {input}/shifts_xy.csv) - slice_config = "" // Path to slice config CSV (default: {input}/slice_config.csv) - subject_name = "" // Subject identifier (default: auto-extracted from path) + input = "." + shifts_xy = "$params.input/shifts_xy.csv" + output = "." + processes = 1 // Maximum number of python processes per nextflow process - // ========================================================================= - // COMPUTE RESOURCES - // ========================================================================= - use_gpu = true // Enable GPU acceleration (auto-fallback to CPU if unavailable) - processes = 8 // Number of parallel Python processes per Nextflow task + // Resolution of the reconstruction in micron/pixel + resolution = 10 // can be set to -1 to skip - // CPU resource management - enable_cpu_limits = true // Enable CPU limiting - max_cpus = 16 // Maximum CPUs to use (0 = no limit) - reserved_cpus = 4 // CPUs reserved for system overhead + // Clipping of outliers values + clip_percentile_upper = 99.9 - // ========================================================================= - // RESOLUTION & BASIC SETTINGS - // ========================================================================= - resolution = 10 // Target resolution in µm/pixel (set to -1 to skip resampling) - clip_percentile_upper = 99.9 // Upper percentile for intensity clipping (0–100) - // Used in illumination fix, beam profile correction, - // interface crop, and per-slice normalization + // Detect and compensate focal curvature + fix_curvature_enabled = true - // ========================================================================= - // 
PREPROCESSING - // ========================================================================= - fix_curvature_enabled = false // Detect and compensate focal curvature artifacts - fix_illum_enabled = true // Fix illumination inhomogeneity (BaSiCPy algorithm) - crop_interface_out_depth = 600 // Maximum tissue depth to retain after interface crop (µm) - normalize_min_contrast = 0.1 // Min contrast fraction to prevent over-amplification - // of weak/empty slices (0.0–1.0, 0.1 = 10% of global max) + // Fix illumination inhomogeneities using BaSiC + fix_illum_enabled = true - // ========================================================================= - // TILE STITCHING - // ========================================================================= - // Controls how tiles within each slice are assembled in XY. - use_motor_positions_for_stitching = true // Use motor encoder positions for tile stitching - // (recommended). Only used by diagnostic processes. - stitch_overlap_fraction = 0.2 // Expected tile overlap fraction (0.0–1.0). - // Should match the acquisition overlap setting. - // Also used as motor_only_overlap in diagnostics. - stitch_blending_method = 'diffusion' // Tile blending: 'none', 'average', 'diffusion' - max_blend_refinement_px = 10 // Maximum sub-pixel refinement shift for blending (pixels) + // Maximum depth of the cropped image in microns + crop_interface_out_depth = 600 - // Global tile-placement transform. - // When true, one 2x2 affine is fitted across a pool of mid-brain mosaic grids - // (instrument geometry is slice-invariant) and re-used for every slice. This - // removes the per-slice scale/rotation jitter that the default refined stitcher - // introduces when the LS fit is underdetermined on small or sparse grids. - // The fitted transform is passed to `linum_stitch_3d_refined.py --input_transform`, - // so blend-shift sub-pixel seam refinement still runs per slice. 
- stitch_global_transform = false // Enable pooled global affine estimation - stitch_global_transform_slices = '' // Optional comma-separated slice IDs to pool - // from (e.g. "10,11,12,...,40"). Empty = - // all slices passing slice_config. - stitch_global_transform_histogram_match = true // Match overlap histograms before phase correlation - stitch_global_transform_max_empty_fraction = 0.9 // Otsu-based empty-overlap filter fraction - // (matches old estimate_mosaic_transform behaviour). - // Set to null to use the simpler mean(>0) < 0.1 check. - stitch_global_transform_n_samples = 2048 // Max pooled pairs for the LS fit (0 = use all). - // Random-sampled for reproducibility when the pool - // exceeds this budget. - stitch_global_transform_seed = 0 // Random seed for pair sub-sampling + // Slices registration parameters + moving_slice_first_index = 4 // Skip this many voxels from the top of the moving 3d mosaic when registering slices + pairwise_transform = 'affine' // One of 'affine', 'euler', 'translation' + pairwise_registration_metric = 'MSE' // One of 'MSE', 'CC', 'AntsCC' or 'MI' - // ========================================================================= - // COMMON SPACE ALIGNMENT - // ========================================================================= - // Aligns each slice into a shared XY canvas using shifts_xy.csv motor positions. - // When detect_rehoming is true, encoder glitch spikes (large step that - // self-cancels with the adjacent step) are zeroed before alignment. - // Genuine re-homing events (large step that stays) are always preserved. - - detect_rehoming = true // Correct encoder glitch spikes before alignment - rehoming_return_fraction = 0.4 // Sensitivity: lower = more conservative (fewer corrections) - rehoming_max_shift_mm = 0.5 // Steps below this magnitude are not checked for spikes. - // Lower to catch smaller self-cancelling glitches. 
- tile_fov_mm = null // Post-hoc artifact step correction for shifts_xy.csv files - // generated with older versions of linum_estimate_xy_shift_from_metadata.py. - // The updated script now uses both mosaic boundaries to estimate - // shifts, so this correction is not needed for freshly-generated - // shifts files. Set only when re-running from an existing - // shifts_xy.csv that still contains mosaic-expansion artifacts - // (look for repeating near-equal large steps in x_shift_mm). - tile_fov_tolerance = 0.05 // Fractional tolerance for tile-FOV multiple detection. - // 0.05 → 5% margin around each integer multiple. - - common_space_excluded_slice_mode = 'local_median' // Interpolation for excluded slices - common_space_excluded_slice_window = 2 - common_space_refine_unreliable = false // Use image registration to refine shifts flagged as - // unreliable (reliable=0) by linum_estimate_xy_shift_from_metadata.py. - // Requires scikit-image. Set to true when mosaic grid expansions - // are expected (tissue growing significantly between slices). - common_space_refine_max_discrepancy_px = 0 // When common_space_refine_unreliable=true, reject the - // image-based shift estimate if it differs from the motor - // estimate by more than this many pixels (0 = accept all). - // Recommended: 50. Guards against phase-correlation failures - // on large-offset or low-overlap transitions. - common_space_refine_min_correlation = 0.0 // Minimum phase cross-correlation quality (0-1) to accept - // an image-based refinement. 0 = accept all (default). - // Recommended: 0.15-0.3. Rejects refinements where the - // correlation quality is too low. 
- - // ========================================================================= - // MISSING SLICE INTERPOLATION - // ========================================================================= - interpolate_missing_slices = true // Interpolate single-slice gaps automatically - interpolation_method = 'zmorph' // Method: 'zmorph', 'average', 'weighted' - // zmorph - z-aware morphing; output top matches vol_before, bottom - // matches vol_after, interior morphs smoothly via fractional - // affine transforms. Falls back to 'weighted' when quality - // gates fail. See docs/SLICE_INTERPOLATION_FEATURE.md. - // weighted - z-smoothed linear blend of vol_before and vol_after. - // average - plain 50/50 mean of the two neighbours. - interpolation_blend_method = 'gaussian' // Blending: 'gaussian' (feathered edges), 'linear' - interpolation_registration_metric = 'MSE' // Similarity metric for the boundary-plane registration used by zmorph - interpolation_max_iterations = 1000 // Maximum registration iterations - interpolation_overlap_search_window = 5 // Z-planes to search at each boundary for best overlap pair - interpolation_min_overlap_correlation = 0.3 // Pre-registration NCC threshold on boundary planes. Below this - // the method falls back to a weighted average. - interpolation_reference_slab_size = 3 // Number of planes averaged around the boundary reference plane - // before running the 2D registration. - interpolation_min_foreground_fraction = 0.1 // Minimum foreground fraction for a boundary plane to be considered - interpolation_min_ncc_improvement = 0.05 // Minimum post-reg NCC improvement to accept the transform; - // below this the method falls back to weighted average. - - // ========================================================================= - // AUTOMATIC SLICE QUALITY ASSESSMENT - // Runs linum_assess_slice_quality on normalized slices and writes a - // slice_config.csv that marks degraded slices for exclusion from the - // common-space step. 
Enabled by setting auto_assess_quality = true. - // ========================================================================= - auto_assess_quality = false // Run quality assessment on normalized slices - auto_assess_min_quality = 0.3 // Exclude slices with quality score below this - auto_assess_exclude_first = 1 // Exclude first N calibration slices automatically - auto_assess_roi_size = 1024 // Center-crop size in XY (pixels) for quality metrics. - // Mosaic grids are single-resolution, so this is the - // primary speed control: 1024×1024 loads ~2 MB per - // plane vs ~5 GB at full res. 0 = full plane. - - // ========================================================================= - // PAIRWISE REGISTRATION - // ========================================================================= - // Computes small corrections (rotation, sub-pixel translation) between consecutive - // slices. The main XY alignment comes from motor positions (shifts_xy.csv); - // these transforms are refinements applied on top. - - registration_transform = 'euler' // 'translation' (XY only) or 'euler' (XY + rotation) - registration_max_translation = 200.0 // Optimizer bound on translation (pixels). - // Keep large so the optimizer is not clamped — actual - // applied translations are controlled by max_rotation_deg - // and apply_rotation_only in stacking. 
- registration_max_rotation = 5.0 // Optimizer bound on rotation (degrees) - registration_initial_alignment = 'both' // Initial alignment before refinement: 'none', 'com', 'gradient', or 'both' - moving_slice_first_index = 4 // Starting Z-index in the moving volume - registration_slicing_interval_mm = 0.200 // Physical slice thickness (mm) - registration_allowed_drifting_mm = 0.100 // Z-search range (mm) - - // ========================================================================= - // STACKING & OUTPUT - // ========================================================================= - - // --- Common settings --- - stack_blend_enabled = true // Blend overlapping regions between slices - blend_refinement_px = 0 // Z-blend refinement: phase-correlation XY correction in - // the overlap zone before blending (like stitch_3d_with_refinement - // but for slice boundaries). Set to max allowed shift in pixels - // (e.g. 10). 0 = disabled. - stack_blend_z_refine_vox = 5 // Z-blend position refinement: search up to N voxels below the - // expected overlap boundary (use_expected_z_overlap) for the - // best-correlated tissue plane to blend at. Z-spacing stays fixed - // at slicing_interval. 0 = disabled. - - // --- Motor stacking --- - use_expected_z_overlap = true // Use expected Z-overlap instead of correlation. - // Recommended when correlation-based matching is unreliable. - apply_pairwise_transforms = true // Apply pairwise registration transforms during stacking. - // Set to false to stack using only motor positions + expected - // Z-overlap (ignores all registration corrections). - apply_rotation_only = false // Apply only the rotation component from registration, - // not translation — keeps XY from motor positions. - // When accumulate_translations is enabled, translations - // are accumulated as canvas offsets regardless. 
- max_rotation_deg = 5.0 // Rotation values larger than this are clamped before - // application, preventing registration errors from drifting - - // Per-slice adaptive transform degradation - // Confidence score (0–1) is computed from Z-correlation, translation magnitude and rotation. - // Slices with confidence >= transform_confidence_high: full transform applied (per apply_rotation_only). - // Slices with confidence < transform_confidence_high but >= transform_confidence_low: rotation-only. - // Slices with confidence < transform_confidence_low: transform skipped (identity). - transform_confidence_high = 0.6 // Threshold above which transforms are trusted fully - transform_confidence_low = 0.3 // Threshold below which transforms are skipped entirely - z_overlap_min_corr = 0.5 // Fall back to expected Z-overlap below this NCC score - blend_z_refine_min_confidence = 0.5 // Minimum confidence for blend_z_refine to run. - // Slices below this skip Z-blend position search - // and use expected overlap directly. - - // Auto-exclude extended clusters of consecutive low-quality registrations. - // The auto_exclude_slices process reads pairwise metrics after registration and - // produces a CSV listing slice IDs to force-skip (motor-only) during stacking. - auto_exclude_enabled = true // Enable automatic cluster detection - auto_exclude_consecutive = 3 // Min consecutive low-quality pairs to trigger exclusion - auto_exclude_z_corr = 0.6 // Z-correlation threshold below which a pair is low-quality - - load_transform_min_zcorr = 0.0 // Metric-based transform gating: minimum z_correlation - // to load a transform. When > 0 (with max_rotation), - // replaces status-based gating. 0 = disabled. - load_transform_max_rotation = 0.0 // Maximum rotation (degrees) for metric-based gating. - // Paired with load_transform_min_zcorr. 0 = disabled. - skip_error_transforms = true // Skip transforms flagged as overall_status="error" - // (e.g. 
registered against interpolated slices produce - // spurious large rotations causing visible jumps) - skip_warning_transforms = true // Also skip transforms with overall_status="warning". - // Warning transforms hit the optimizer boundary; their - // Z-offsets are unreliable and can create Z gaps. - // Recommended: keep true to prevent Z-positioning errors. - stack_accumulate_translations = true // Accumulate pairwise translations as cumulative canvas - // offsets (viewing-plane steering). - stack_confidence_weight_translations = true // Weight each pairwise translation by its confidence - // score before accumulating. Attenuates low-confidence - // translations proportionally. - stack_max_cumulative_drift_px = 50 // Maximum cumulative translation drift from motor - // baseline (pixels). Clamps total drift when exceeded. - // 0 = disabled (unlimited drift). - stack_max_pairwise_translation = 0 // Max pairwise translation (pixels) included in - // accumulation. Values near this limit are assumed to be - // optimizer-boundary hits and are zeroed out. - // 0 = disabled (accumulate all translations). - stack_smooth_window = 5 // Moving-average window (slices) for smoothing per-slice - // rotations. Reduces visible jumps from isolated outliers. - // 0 = disabled. - stack_translation_smooth_sigma = 3.0 // Gaussian sigma (slices) for smoothing accumulated - // translations. Applied BEFORE drift cap to remove - // slice-to-slice jitter while preserving trends. - // 0 = disabled. - stack_translation_min_zcorr = 0.2 // Minimum z_correlation to use a slice's translation - // for accumulation. Lower than load_min_zcorr to recover - // translations from slices with bad rotation but valid - // translation. 0 = use all translations. 
- - // --- Output pyramid --- - pyramid_resolutions = [10, 25, 50, 100] // Multi-resolution levels (µm); must be >= base resolution - pyramid_n_levels = null // Fixed level count (overrides pyramid_resolutions) - pyramid_make_isotropic = true // Resample to isotropic voxel spacing - - // ========================================================================= - // MANUAL ALIGNMENT - // ========================================================================= - // Export a lightweight data package for interactive manual alignment of - // pairwise slice transforms. When enabled, the pipeline produces a - // directory with AIP images and transforms that can be downloaded and - // opened by the manual alignment tool (tools/manual-align/). - export_manual_align = false // Export manual alignment data after register_pairwise - manual_align_level = 1 // Pyramid level for AIP export (0=full, 1=2x, ...) - manual_transforms_dir = '' // Path to manually corrected transforms directory. - // When set and refine_manual_transforms = false, - // manual transforms override automated ones for - // matching slice IDs during stacking. - refine_manual_transforms = false // Re-run pairwise registration for manually corrected - // pairs, initialised from the manual transform. - // Produces refined transforms that combine the manual - // correction with a tight image-based residual fix. - // Requires manual_transforms_dir to be set. - refine_max_translation_px = 10 // Max residual translation searched during refinement (px) - refine_max_rotation_deg = 2.0 // Max residual rotation searched during refinement (°) - - // ========================================================================= - // Z-INTENSITY NORMALIZATION - // ========================================================================= - // Corrects slow intensity drift across serial sections after stacking. - // Only active when normalize_z_slices = true. 
- normalize_z_slices = false // Enable post-stacking Z-intensity normalization - znorm_mode = 'histogram' // Normalization mode: - // 'histogram' — per-section histogram matching to a global - // reference; preserves relative contrast (white matter - // stays brighter than grey matter) - // 'percentile' — linear scaling to a smoothed percentile - // curve; may darken intrinsically bright sections - znorm_strength = 0.5 // Correction mixing strength (0.0 = passthrough, 1.0 = full). - // Start at 0.5 to avoid over-correction. - - // Histogram mode settings (znorm_mode = 'histogram') - znorm_tissue_threshold = 0.02 // Minimum intensity to classify as tissue. - // Voxels at or below this value are left unchanged, - // preventing background noise from being mapped toward tissue. - - // Percentile mode settings (znorm_mode = 'percentile') - znorm_smooth_sigma = 10.0 // Gaussian smoothing sigma (serial sections). - // ~3 sections: aggressive (removes large-scale anatomy) - // ~10 sections: corrects ~2 mm drift, preserves anatomy - // ~15 sections: conservative (slow trends only) - znorm_percentile = 80.0 // Percentile of non-zero tissue voxels used as intensity reference - znorm_max_scale = 2.0 // Maximum correction scale factor - znorm_min_scale = 0.5 // Minimum correction scale factor - - // ========================================================================= - // ATLAS REGISTRATION - // ========================================================================= - // Registers the final reconstructed volume to the Allen Mouse Brain Atlas - // (Common Coordinate Framework, RAS orientation). - // The atlas is downloaded automatically at the specified resolution. 
- align_to_ras_enabled = false // Enable Allen atlas registration - allen_resolution = 25 // Atlas resolution for registration (µm): 10, 25, 50, 100 - allen_metric = 'MI' // Registration metric: 'MI', 'MSE', 'CC', 'AntsCC' - allen_max_iterations = 1000 // Maximum registration iterations - allen_registration_level = 2 // Pyramid level of input zarr to register at - // (0 = full resolution; level 2 ≈ 50 µm → fast). - // Output is always written at all pyramid resolutions. - ras_input_orientation = '' // Orientation of the input volume (3-letter code: R/L, A/P, S/I). - // e.g. 'PIR' for dim0→Posterior, dim1→Inferior, dim2→Right. - // Leave empty if already roughly RAS. - ras_initial_rotation = '' // Initial rotation hint (degrees): "Rx Ry Rz". - // e.g. "0.0 0.0 90.0" for a 90° Z-axis pre-rotation. - // Leave empty for automatic MOMENTS-based initialization. - allen_preview = true // Save a 3×3 comparison preview (input / aligned / atlas template) - ras_orientation_preview = false // Save a 3-panel preview after --input-orientation and - // --initial-rotation are applied (before registration). - // Useful for verifying orientation parameters. 
- - // ========================================================================= - // PREVIEWS & REPORTS - // ========================================================================= - stitch_preview = true // Generate stitched slice preview images - common_space_preview = false // Generate common space alignment previews - rehoming_diagnostics = false // Save rehoming_report.json + rehoming_plot.png - interpolation_preview = false // Generate interpolated slice previews - generate_report = true // Generate HTML quality report after stacking - report_verbose = false // Include detailed per-slice metrics in report - report_format = 'zip' // Report format: 'html' (no images, lightweight) or 'zip' (HTML + bundled previews) - - // Annotated preview settings - annotated_label_every = 1 // Label every Nth slice (1 = all slices) - annotated_show_lines = false // Draw slice boundary lines on annotated preview - - // ========================================================================= - // DEBUGGING - // ========================================================================= - debug_slices = "" // Comma-separated slice IDs or ranges to process (e.g. "25,26" or "25-29"). - // Leave empty to process all slices. - analyze_shifts = true // Generate a shifts analysis report - outlier_iqr_multiplier = 1.5 // IQR multiplier for outlier detection in shifts analysis - - // ========================================================================= - // DIAGNOSTIC MODE - // ========================================================================= - // Enable for troubleshooting reconstruction artifacts (edge mismatches, - // overhangs, alignment issues) in obliquely-mounted samples. 
- diagnostic_mode = false // Master switch: enables all diagnostic analyses below - - // Individual diagnostic analyses (active when diagnostic_mode=false and set to true) - analyze_rotation_drift = false // Analyze cumulative rotation between slices - analyze_acquisition_rotation = false // Analyze acquisition-time rotation from shifts + registration - motor_only_stitch = false // Stitch slices using motor positions only (no image reg.) - motor_only_stack = false // Stack slices using motor positions only (no pairwise reg.) - compare_stitching = false // Compare motor-only vs refined stitching side-by-side - - // Diagnostic parameters - motor_only_overlap = 0.2 // Expected tile overlap for motor-only diagnostics (0.0–1.0). - // Should match stitch_overlap_fraction. - motor_only_stitch_blending = 'diffusion' // Blending for motor_only_stitch: 'none', 'average', 'diffusion' - motor_only_stack_blending = 'none' // Blending for motor_only_stack: 'none', 'average', 'max', 'feather' - diagnostic_rotation_threshold = 2.0 // Rotation warning threshold (degrees) - save_refinement_data = false // Save refined stitching transform data as JSON - comparison_tile_step = 60 // Tile step for seam detection in stitching comparison + // stack algorithm parameters + stack_blend_enabled = false + stack_max_overlap = -1 // maximum number of overlapping voxels (-1 to use all overlapping voxels) } -// ========================================================================= -// PROCESS CONFIGURATION -// ========================================================================= process { publishDir = {"$params.output/$slice_id/$task.process"} scratch = true errorStrategy = { task.attempt <= 2 ? 'retry' : 'ignore' } maxRetries = 2 - stageInMode = 'symlink' - stageOutMode = 'rsync' - afterScript = 'sleep 1' - - // Thread limiting for Python scripts. CPU budget logic is inlined here - // (Nextflow v26 strict config DSL no longer allows top-level `def`). 
- beforeScript = { - if (params.enable_cpu_limits == false) return "" - - int totalCpus = Runtime.runtime.availableProcessors() - int maxCpus = (params.max_cpus != null && params.max_cpus > 0) - ? Math.min(params.max_cpus as int, totalCpus) - : Math.max(1, totalCpus - (params.reserved_cpus ?: 2) as int) - int numProcesses = Math.max(1, (params.processes ?: 1) as int) - int threadsPerProcess = Math.max(1, (maxCpus / numProcesses) as int) - - def envVars = [] - if (params.max_cpus != null && params.max_cpus > 0) { - envVars << "export LINUMPY_MAX_CPUS=${params.max_cpus as int}" - } else { - envVars << "export LINUMPY_RESERVED_CPUS=${(params.reserved_cpus ?: 2) as int}" - } - - // Thread limiting environment variables - envVars << "export OMP_NUM_THREADS=${threadsPerProcess}" - envVars << "export MKL_NUM_THREADS=${threadsPerProcess}" - envVars << "export OPENBLAS_NUM_THREADS=${threadsPerProcess}" - envVars << "export VECLIB_MAXIMUM_THREADS=${threadsPerProcess}" - envVars << "export NUMEXPR_NUM_THREADS=${threadsPerProcess}" - envVars << "export NUMBA_NUM_THREADS=${threadsPerProcess}" - envVars << "export ITK_GLOBAL_DEFAULT_NUMBER_OF_THREADS=${threadsPerProcess}" - envVars << "export XLA_FLAGS='--xla_cpu_multi_thread_eigen=false intra_op_parallelism_threads=${threadsPerProcess}'" - - return envVars.join('\n') - } - + stageInMode='symlink' + stageOutMode='rsync' + afterScript='sleep 1' withName: "resample_mosaic_grid" { scratch = false - // Allow parallel mask creation on GPU - maxForks = params.use_gpu ? 4 : null - } - - withName: "fix_illumination" { - // Limit to 1 parallel instance - BaSiCPy/JAX is memory-intensive - maxForks = params.use_gpu ? 1 : null - // Don't set CUDA_VISIBLE_DEVICES - let linumpy.gpu auto-select GPU with most free memory - } - - withName: "normalize" { - // Allow parallel normalization on GPU - maxForks = params.use_gpu ? 
4 : null } } -// ========================================================================= -// CONTAINER CONFIGURATION -// ========================================================================= apptainer { autoMounts = true enabled = true } -// ========================================================================= -// CLUSTER PROFILES -// ========================================================================= profiles { - // ----------------------------------------------------------------------- - // RECONSTRUCTION ROBUSTNESS PRESETS - // Use -profile conservative (default behaviour), aggressive, or minimal - // to set groups of related parameters without touching the params block. - // ----------------------------------------------------------------------- - - // conservative: safest defaults — trusts motor positions for XY, applies - // only rotation from registration, skips unreliable transforms, and - // interpolates single-slice gaps. Recommended starting point. - conservative { - params { - apply_rotation_only = true - skip_error_transforms = true - skip_warning_transforms = true - apply_pairwise_transforms = true - interpolate_missing_slices = true - use_expected_z_overlap = true - stack_blend_z_refine_vox = 5 - stack_smooth_window = 5 - stack_accumulate_translations = false - transform_confidence_high = 0.6 - transform_confidence_low = 0.3 - } - } - - // aggressive: uses full pairwise registration transforms including XY - // translations, and accumulates them cumulatively. Can produce better - // alignment when registration is reliable, but fails badly when it is not. 
- aggressive { - params { - apply_rotation_only = false - skip_error_transforms = false - skip_warning_transforms = false - apply_pairwise_transforms = true - interpolate_missing_slices = true - use_expected_z_overlap = false - stack_accumulate_translations = true - stack_max_pairwise_translation = 50 - stack_smooth_window = 3 - transform_confidence_high = 0.4 - transform_confidence_low = 0.2 - } - } - - // minimal: motor-only stacking — ignores all pairwise registration - // refinements. Most stable, fastest, and requires no image-based - // registration quality. Use when motor positions are reliable and - // registration consistently fails. - minimal { - params { - apply_pairwise_transforms = false - use_expected_z_overlap = true - stack_blend_z_refine_vox = 5 - stack_smooth_window = 0 - interpolate_missing_slices = true - normalize_z_slices = false - } - } - calliste { apptainer { - cacheDir = '/scratchCalliste/apptainer/cache' - libraryDir = '/scratchCalliste/apptainer/library' + cacheDir='/scratchCalliste/apptainer/cache' + libraryDir='/scratchCalliste/apptainer/library' autoMounts = true enabled = true runOptions = '-B /mnt/apptainer_tmp:/tmp' @@ -519,10 +64,10 @@ profiles { temp = '/mnt/apptainer_tmp' } process { - withName: "resample_mosaic_grid" { + withName: "resample_mosaic_grid" { scratch = false maxForks = 4 } } } -} +} \ No newline at end of file diff --git a/workflows/reconst_3d/soct_3d_reconst.nf b/workflows/reconst_3d/soct_3d_reconst.nf index cd3be493..621936ae 100644 --- a/workflows/reconst_3d/soct_3d_reconst.nf +++ b/workflows/reconst_3d/soct_3d_reconst.nf @@ -1,372 +1,47 @@ #!/usr/bin/env nextflow nextflow.enable.dsl = 2 -/* - * 3D RECONSTRUCTION PIPELINE FOR SERIAL OCT DATA - * - * Input: Directory containing mosaic_grid*.ome.zarr files + shifts_xy.csv - * Output: 3D OME-Zarr volume with multi-resolution pyramid - * - * Channel patterns and authoring conventions: docs/NEXTFLOW_WORKFLOWS.md - */ - -// 
============================================================================= -// HELPER FUNCTIONS -// ============================================================================= - -// Annotated-screenshot CLI flags shared by `stack` and `normalize_z_intensity`. -def annotatedScreenshotArgs(String sliceIdsStr) { - def show_lines = params.annotated_show_lines ? '--show_lines' : '' - def orient = params.ras_input_orientation?.trim()?.replace("'", '') ?: '' - def orientation = orient ? "--orientation ${orient}" : '' - return "--slice_ids \"${sliceIdsStr}\" --label_every ${params.annotated_label_every} ${show_lines} ${orientation} --crop_to_tissue" -} - -// True when the named per-stage diagnostic flag (or `diagnostic_mode`) is set. -def diagEnabled(String flag) { params.diagnostic_mode || params[flag] } - -// Resolve subject_name from inputDir when not explicitly set: -// 1. `params.subject_name` if provided -// 2. `sub-XX` token anywhere in the path -// 3. parent of common input dirnames (`mosaic-grids`, `mosaics`, ...) -// 4. leaf directory name -def resolveSubjectName(String inputDir) { - if (params.subject_name) return params.subject_name - def subMatch = inputDir.split('/').find { part -> part ==~ /sub-\w+/ } - if (subMatch) return subMatch - def inputFile = file(inputDir) - def dirName = inputFile.getName() - if (dirName in ['mosaic-grids', 'mosaics', 'mosaic_grids', 'input', 'data']) { - return inputFile.getParent()?.getName() ?: dirName - } - return dirName -} - -// --------------------------------------------------------------------------- -// `stack` option builders. Split by concern so each `if` group lives next to -// the related parameters rather than as one 65-line imperative blob. 
-// --------------------------------------------------------------------------- - -def stackBlendingArgs() { - def opts = "" - if (params.stack_blend_enabled) opts += " --blend" - if (params.blend_refinement_px > 0) opts += " --blend_refinement_px ${params.blend_refinement_px}" - if (params.stack_blend_z_refine_vox > 0) opts += " --blend_z_refine_vox ${params.stack_blend_z_refine_vox}" - if (params.blend_z_refine_min_confidence > 0) opts += " --blend_z_refine_min_confidence ${params.blend_z_refine_min_confidence}" - return opts -} - -def stackZMatchingArgs() { - def opts = "" - opts += " --slicing_interval_mm ${params.registration_slicing_interval_mm}" - opts += " --search_range_mm ${params.registration_allowed_drifting_mm}" - opts += " --moving_z_first_index ${params.moving_slice_first_index}" - if (params.use_expected_z_overlap) opts += " --use_expected_overlap" - if (params.z_overlap_min_corr > 0) opts += " --z_overlap_min_corr ${params.z_overlap_min_corr}" - if (params.analyze_shifts) opts += " --output_z_matches z_matches.csv" - opts += " --output_stacking_decisions stacking_decisions.csv" - return opts -} - -def stackPairwiseTransformArgs() { - if (!params.apply_pairwise_transforms) return "" - def opts = " --transforms_dir transforms" - if (params.apply_rotation_only) opts += " --rotation_only" - opts += " --max_rotation_deg ${params.max_rotation_deg}" - if (params.load_transform_min_zcorr > 0) opts += " --load_min_zcorr ${params.load_transform_min_zcorr}" - if (params.load_transform_max_rotation > 0) opts += " --load_max_rotation ${params.load_transform_max_rotation}" - if (params.skip_error_transforms) opts += " --skip_error_transforms" - if (params.skip_warning_transforms) opts += " --skip_warning_transforms" - opts += " --confidence_high ${params.transform_confidence_high}" - opts += " --confidence_low ${params.transform_confidence_low}" - return opts -} - -// Drives per-slice use/auto_excluded → motor-only fallback in stack. 
-def stackSliceConfigArg(slice_config) { - return slice_config.name != 'NO_SLICE_CONFIG' ? " --slice_config ${slice_config}" : "" -} - -// Skipped when refine_manual_transforms baked manual corrections into the -// transforms directory; passing them again would double-apply. -def stackManualOverrideArg() { - return (params.manual_transforms_dir && !params.refine_manual_transforms) - ? " --manual_transforms_dir ${params.manual_transforms_dir}" - : "" -} - -def stackCumulativeArgs() { - if (!params.stack_accumulate_translations) return "" - def opts = " --accumulate_translations" - if (params.stack_confidence_weight_translations) opts += " --confidence_weight_translations" - if (params.stack_max_cumulative_drift_px > 0) opts += " --max_cumulative_drift_px ${params.stack_max_cumulative_drift_px}" - // > 0 filters clamped translations; 0 = keep all (preserves re-homing boundary corrections). - if (params.stack_max_pairwise_translation > 0) opts += " --max_pairwise_translation ${params.stack_max_pairwise_translation}" - return opts -} - -def stackSmoothingArgs() { - def opts = "" - if (params.stack_smooth_window > 0) opts += " --smooth_window ${params.stack_smooth_window}" - if (params.stack_translation_smooth_sigma > 0) opts += " --translation_smooth_sigma ${params.stack_translation_smooth_sigma}" - if (params.stack_translation_min_zcorr > 0) opts += " --translation_min_zcorr ${params.stack_translation_min_zcorr}" - return opts -} - -// Build pyramid-related CLI arguments from `params.pyramid_*` settings. -// `nLevelsFlag` names the downstream flag (`--n_levels` for most scripts, -// `--n-levels` for `linum_align_to_ras.py`). -def pyramidArgs(nLevelsFlag = '--n_levels') { - def opts = "" - if (params.pyramid_n_levels != null) { - opts += " ${nLevelsFlag} ${params.pyramid_n_levels}" - } else { - def base_res = params.resolution > 0 ? 
params.resolution : 10
-        def valid = params.pyramid_resolutions.findAll { r -> r >= base_res }.sort()
-        if (!valid.contains(base_res)) valid = [base_res] + valid
-        opts += " --pyramid_resolutions " + valid.collect { r -> r.toString() }.join(' ')
-        opts += params.pyramid_make_isotropic ? " --make_isotropic" : " --no_isotropic"
-    }
-    return opts
-}
-
-// Extract z## slice ID string from a filename; returns "unknown" if not found.
-def extractSliceId(filename) {
-    def name = filename instanceof Path ? filename.getName() : filename.toString()
-    def matcher = name =~ /z(\d+)/
-    return matcher ? matcher[0][1] : "unknown"
-}
-
-// Extract slice ID as integer; returns -1 if not found.
-def extractSliceIdInt(filename) {
-    def id = extractSliceId(filename)
-    return id == "unknown" ? -1 : id.toInteger()
-}
-
-// Return tuple(slice_id, file) for a given file path.
-def toSliceTuple(file_path) {
-    tuple(extractSliceId(file_path), file_path)
-}
-
-// Return sorted, comma-separated slice IDs from a list of files (e.g. "01,02,03,05").
-def extractSliceIdsString(fileList) {
-    fileList
-        .collect { f -> extractSliceId(f) }
-        .findAll { s -> s != "unknown" }
-        .sort { s -> s.toInteger() }
-        .join(',')
-}
-
-// Remove duplicate and trailing slashes from a path string.
-def normalizePath(path) {
-    return path.replaceAll('/+', '/').replaceAll('/$', '')
-}
-
-// Join path components safely.
-def joinPath(base, filename) {
-    return "${normalizePath(base)}/${filename}"
-}
-
-// Parse a slice_config.csv and return a map with the sets of slice IDs
-// marked for use vs. excluded: `[use: Set, excluded: Set]`.
-// Boolean parsing is kept in lockstep with `linumpy.io.slice_config._parse_bool`
-// (true / 1 / yes / y / t, case-insensitive). Edit there when the canonical
-// schema changes — Nextflow can't depend on Python at workflow-init time. 
-def parseSliceConfig(configPath) { - def slicesToUse = [] as Set - def slicesExcluded = [] as Set - def file = new File(configPath) - - if (!file.exists()) error("Slice config file not found: ${configPath}") - - def truthy = ['true', '1', 'yes', 'y', 't'] as Set - file.withReader { reader -> - reader.readLine() // Skip header - reader.eachLine { line -> - def parts = line.split(',') - if (parts.size() >= 2) { - def sliceId = parts[0].trim() - def use = parts[1].trim().toLowerCase() - if (truthy.contains(use)) slicesToUse.add(sliceId) - else slicesExcluded.add(sliceId) - } - } - } - - return [use: slicesToUse, excluded: slicesExcluded] -} - -// Detect single-slice gaps in a sorted slice list. -// Returns a list of [missingId, beforeId, afterId] tuples. -def detectSingleGaps(sliceList) { - def gaps = [] - def sliceIds = sliceList - .collect { f -> extractSliceIdInt(f) } - .findAll { n -> n >= 0 } - .sort() - - sliceIds.eachWithIndex { current, i -> - if (i >= sliceIds.size() - 1) { - return - } - def next = sliceIds[i + 1] - def gap = next - current - - if (gap == 2) { - def missingId = String.format("%02d", current + 1) - def beforeId = String.format("%02d", current) - def afterId = String.format("%02d", next) - gaps.add([missingId, beforeId, afterId]) - log.info "Gap detected: slice ${missingId} (between ${beforeId} and ${afterId})" - } else if (gap > 2) { - log.warn "Multiple missing slices between ${current} and ${next} - cannot interpolate" - } - } - return gaps -} - -// Partition a flat list of staged files into (slices, transforms): .ome.zarr -// items go to slices, everything else (excluding *.json metrics) to -// transforms. Used by export_manual_align / refine_manual_transforms inputs. 
-def partitionSlicesAndTransforms(items) { - def slices = items.findAll { f -> f.getName().endsWith('.ome.zarr') } - def transforms = items.findAll { f -> def n = f.getName(); !n.endsWith('.ome.zarr') && !n.endsWith('.json') } - return tuple(slices, transforms) -} - -// Parse debug_slices parameter; supports "25,26", "25-29", or "25,27-29". -// Returns a set of zero-padded slice IDs, or null if not specified. -def parseDebugSlices(debugSlicesStr) { - if (!debugSlicesStr || debugSlicesStr.trim().isEmpty()) return null - - def sliceIds = [] as Set - debugSlicesStr.split(',').each { part -> - part = part.trim() - if (part.contains('-')) { - def rangeParts = part.split('-') - if (rangeParts.size() == 2) { - def start = rangeParts[0].trim().toInteger() - def end = rangeParts[1].trim().toInteger() - (start..end).each { n -> sliceIds.add(String.format("%02d", n)) } - } - } else { - sliceIds.add(String.format("%02d", part.toInteger())) - } - } - return sliceIds -} - -// ============================================================================= -// SUB-WORKFLOW INCLUDES -// ============================================================================= - -// Diagnostic processes (analyze_rotation_drift, stitch_motor_only, stitch_refined, -// compare_stitching, stack_motor_only, analyze_acquisition_rotation) live in -// ./diagnostics.nf and are gated below by `params.diagnostic_mode` and -// per-stage flags. 
-include { - analyze_rotation_drift; - stitch_motor_only; - stitch_refined; - compare_stitching; - stack_motor_only; - analyze_acquisition_rotation; -} from './diagnostics.nf' - -// ============================================================================= -// PROCESSES -// ============================================================================= - -// ----------------------------------------------------------------------------- -// Utility Processes -// ----------------------------------------------------------------------------- +// Workflow Description +// Creates a 3D volume from raw S-OCT tiles +// Input: Directory containing input mosaic grids +// Output: 3D reconstruction process README { - publishDir { "${params.output}/${task.process}" }, mode: 'move' - + publishDir "$params.output/$task.process", mode: 'copy' output: - path "readme.txt" - + path "readme.txt" script: """ - echo "3D reconstruction pipeline" >> readme.txt - echo "" >> readme.txt + echo "3D reconstruction pipeline\n" >> readme.txt echo "[Params]" >> readme.txt - for p in ${params}; do echo " \$p" >> readme.txt; done + for p in $params; do + echo " \$p" >> readme.txt + done echo "" >> readme.txt - echo "[Command-line]" >> readme.txt - echo "${workflow.commandLine}" >> readme.txt - echo "" >> readme.txt - echo "[Configuration files]" >> readme.txt - for c in ${workflow.configFiles}; do echo " \$c" >> readme.txt; done - """ -} - -process analyze_shifts { - publishDir { "${params.output}/${task.process}" }, mode: 'copy' - - input: - path(shifts_file) - - output: - path "shifts_analysis/*" - - script: - """ - linum_analyze_shifts.py ${shifts_file} shifts_analysis \ - --resolution ${params.resolution} \ - --iqr_multiplier ${params.outlier_iqr_multiplier} + echo "[Command-line]\n $workflow.commandLine\n" >> readme.txt + echo "[Configuration files]">> readme.txt + for c in $workflow.configFiles; do + echo " \$c" >> readme.txt + done """ } -process generate_report { - publishDir 
"$params.output", mode: 'copy' - - input: - tuple path(zarr), path(zip), path(png), path(annotated_png) - val subject_name - - output: - path "${subject_name}_quality_report.${params.report_format ?: 'html'}" - - script: - def fmt = params.report_format ?: 'html' - def verbose_flag = params.report_verbose ? "--verbose" : "" - def overview_arg = png ? "--overview_png ${png}" : "" - def annotated_arg = annotated_png ? "--annotated_png ${annotated_png}" : "" - """ - linum_generate_pipeline_report.py ${params.output} ${subject_name}_quality_report.${fmt} \ - --title "Quality Report: ${subject_name}" \ - --format ${fmt} ${verbose_flag} ${overview_arg} ${annotated_arg} - """ -} - -// ----------------------------------------------------------------------------- -// Preprocessing Processes -// ----------------------------------------------------------------------------- - process resample_mosaic_grid { input: - tuple val(slice_id), path(mosaic_grid) - + tuple val(slice_id), path(mosaic_grid) output: - tuple val(slice_id), path("mosaic_grid_z${slice_id}_resampled.ome.zarr") - + tuple val(slice_id), path("mosaic_grid_z${slice_id}_resampled.ome.zarr") script: - def gpu_flag = params.use_gpu ? 
"--use_gpu" : "--no-use_gpu" """ - linum_resample_mosaic_grid.py ${mosaic_grid} "mosaic_grid_z${slice_id}_resampled.ome.zarr" \ - -r ${params.resolution} ${gpu_flag} -v + linum_resample_mosaic_grid.py ${mosaic_grid} "mosaic_grid_z${slice_id}_resampled.ome.zarr" -r ${params.resolution} """ } process fix_focal_curvature { input: - tuple val(slice_id), path(mosaic_grid) - + tuple val(slice_id), path(mosaic_grid) output: - tuple val(slice_id), path("mosaic_grid_z${slice_id}_focal_fix.ome.zarr") - + tuple val(slice_id), path("mosaic_grid_z${slice_id}_focal_fix.ome.zarr") script: """ linum_detect_focal_curvature.py ${mosaic_grid} "mosaic_grid_z${slice_id}_focal_fix.ome.zarr" @@ -375,874 +50,221 @@ process fix_focal_curvature { process fix_illumination { cpus params.processes - input: - tuple val(slice_id), path(mosaic_grid) - + tuple val(slice_id), path(mosaic_grid) output: - tuple val(slice_id), path("mosaic_grid_z${slice_id}_illum_fix.ome.zarr") - + tuple val(slice_id), path("mosaic_grid_z${slice_id}_illum_fix.ome.zarr") script: - def gpu_flag = params.use_gpu ? 
"--use_gpu" : "--no-use_gpu" """ - linum_fix_illumination_3d.py ${mosaic_grid} "mosaic_grid_z${slice_id}_illum_fix.ome.zarr" \ - --n_processes ${params.processes} \ - --percentile_max ${params.clip_percentile_upper} ${gpu_flag} + linum_fix_illumination_3d.py ${mosaic_grid} "mosaic_grid_z${slice_id}_illum_fix.ome.zarr" --n_processes ${params.processes} --percentile_max ${params.clip_percentile_upper} """ } -// ----------------------------------------------------------------------------- -// Stitching Processes -// ----------------------------------------------------------------------------- - -process estimate_global_transform { - publishDir { "${params.output}/${task.process}" }, mode: 'copy' - +process generate_aip { input: - path("pool_input/*") - path(slice_config) - + tuple val(slice_id), path(mosaic_grid) output: - path("global_affine.npy"), emit: transform - path("global_affine.json"), optional: true, emit: diagnostics - + tuple val(slice_id), path("mosaic_grid_z${slice_id}_aip.ome.zarr") script: - def slice_config_arg = slice_config.name != 'NO_SLICE_CONFIG' ? "--slice_config ${slice_config}" : "" - def histogram_arg = params.stitch_global_transform_histogram_match ? "--histogram_match" : "" - def empty_arg = params.stitch_global_transform_max_empty_fraction != null - ? "--max_empty_fraction ${params.stitch_global_transform_max_empty_fraction}" - : "" - def n_samples_arg = (params.stitch_global_transform_n_samples as int) > 0 - ? "--n_samples ${params.stitch_global_transform_n_samples as int}" - : "" - def include_arg = params.stitch_global_transform_slices?.trim() - ? "--include_slice " + params.stitch_global_transform_slices.toString().split('[,\\s]+').join(' ') - : "" - def gpu_flag = params.use_gpu ? 
"--use_gpu" : "--no-use_gpu" """ - linum_estimate_global_transform.py pool_input global_affine.npy \ - --overlap_fraction ${params.stitch_overlap_fraction} \ - ${slice_config_arg} \ - ${include_arg} \ - ${histogram_arg} \ - ${empty_arg} \ - ${n_samples_arg} \ - --seed ${params.stitch_global_transform_seed} \ - --diagnostics_json global_affine.json \ - -f ${gpu_flag} + linum_aip.py ${mosaic_grid} "mosaic_grid_z${slice_id}_aip.ome.zarr" """ } -process stitch_3d_with_refinement { - publishDir { "${params.output}/${task.process}" }, mode: 'copy', pattern: "*_metrics.json" - +process estimate_xy_transformation { input: - tuple val(slice_id), path(mosaic_grid), path(input_transform) - + tuple val(slice_id), path(aip) output: - tuple val(slice_id), path("slice_z${slice_id}_stitch_3d.ome.zarr"), emit: stitched - path("*_metrics.json"), optional: true, emit: metrics - + tuple val(slice_id), path("z${slice_id}_transform_xy.npy") script: - def transform_arg = input_transform.name != 'NO_TRANSFORM' ? 
"--input_transform ${input_transform}" : "" """ - linum_stitch_3d_refined.py ${mosaic_grid} "slice_z${slice_id}_stitch_3d.ome.zarr" \ - --overlap_fraction ${params.stitch_overlap_fraction} \ - --blending_method ${params.stitch_blending_method} \ - --refinement_mode blend_shift \ - --max_refinement_px ${params.max_blend_refinement_px} \ - ${transform_arg} \ - -f + linum_estimate_transform.py ${aip} "z${slice_id}_transform_xy.npy" """ } -process generate_stitch_preview { - publishDir "${params.output}/previews/stitched_slices", mode: 'copy' - +process stitch_3d { input: - tuple val(slice_id), path(stitched_slice) - + tuple val(slice_id), path(mosaic_grid), path(transform_xy) output: - path "slice_z${slice_id}_stitched.png" - + tuple val(slice_id), path("slice_z${slice_id}_stitch_3d.ome.zarr") script: """ - linum_screenshot_omezarr.py ${stitched_slice} "slice_z${slice_id}_stitched.png" \ - --z_slice 0 + linum_stitch_3d.py ${mosaic_grid} ${transform_xy} "slice_z${slice_id}_stitch_3d.ome.zarr" """ } -// ----------------------------------------------------------------------------- -// Correction Processes -// ----------------------------------------------------------------------------- - process beam_profile_correction { - publishDir { "${params.output}/${task.process}" }, mode: 'copy', pattern: "*_metrics.json" - input: - tuple val(slice_id), path(slice_3d) - + tuple val(slice_id), path(slice_3d) output: - tuple val(slice_id), path("slice_z${slice_id}_axial_corr.ome.zarr"), emit: corrected - path("*_metrics.json"), optional: true, emit: metrics - + tuple val(slice_id), path("slice_z${slice_id}_axial_corr.ome.zarr") script: """ - linum_compensate_psf_model_free.py ${slice_3d} "slice_z${slice_id}_axial_corr.ome.zarr" \ - --percentile_max ${params.clip_percentile_upper} + linum_compensate_psf_model_free.py ${slice_3d} "slice_z${slice_id}_axial_corr.ome.zarr" --percentile_max $params.clip_percentile_upper """ } process crop_interface { - publishDir { 
"${params.output}/${task.process}" }, mode: 'copy', pattern: "*_metrics.json" - input: - tuple val(slice_id), path(image) - + tuple val(slice_id), path(image) output: - tuple val(slice_id), path("slice_z${slice_id}_crop_interface.ome.zarr"), emit: cropped - path("*_metrics.json"), optional: true, emit: metrics - + tuple val(slice_id), path("slice_z${slice_id}_crop_interface.ome.zarr") script: """ - linum_crop_3d_mosaic_below_interface.py ${image} "slice_z${slice_id}_crop_interface.ome.zarr" \ - --depth ${params.crop_interface_out_depth} \ - --crop_before_interface \ - --percentile_max ${params.clip_percentile_upper} + linum_crop_3d_mosaic_below_interface.py $image "slice_z${slice_id}_crop_interface.ome.zarr" --depth $params.crop_interface_out_depth --crop_before_interface --percentile_max $params.clip_percentile_upper """ } process normalize { - publishDir { "${params.output}/${task.process}" }, mode: 'copy', pattern: "*_metrics.json" - input: - tuple val(slice_id), path(image) - + tuple val(slice_id), path(image) output: - tuple val(slice_id), path("slice_z${slice_id}_normalize.ome.zarr"), emit: normalized - path("*_metrics.json"), optional: true, emit: metrics - + tuple val(slice_id), path("slice_z${slice_id}_normalize.ome.zarr") script: - def gpu_flag = params.use_gpu ? 
"--use_gpu" : "--no-use_gpu" """ - linum_normalize_intensities_per_slice.py ${image} "slice_z${slice_id}_normalize.ome.zarr" \ - --percentile_max ${params.clip_percentile_upper} \ - --min_contrast_fraction ${params.normalize_min_contrast} ${gpu_flag} - """ -} - -// ----------------------------------------------------------------------------- -// Alignment Processes -// ----------------------------------------------------------------------------- - -process detect_rehoming_events { - publishDir { "${params.output}/${task.process}" }, mode: 'copy' - - input: - tuple path(shifts_csv), path(slice_config_in) - - output: - path "shifts_xy_clean.csv", emit: corrected_shifts - path "slice_config.csv", optional: true, emit: slice_config - path "diagnostics/*", optional: true, emit: diagnostics - - script: - def diag_arg = params.rehoming_diagnostics ? "--diagnostics diagnostics" : "" - def frac_arg = params.rehoming_return_fraction ? "--return_fraction ${params.rehoming_return_fraction}" : "" - def tile_fov_arg = params.tile_fov_mm ? "--tile_fov_mm ${params.tile_fov_mm}" : "" - def tile_tol_arg = (params.tile_fov_mm && params.tile_fov_tolerance != null) ? "--tile_fov_tolerance ${params.tile_fov_tolerance}" : "" - def max_shift_arg = params.rehoming_max_shift_mm ? "--max_shift_mm ${params.rehoming_max_shift_mm}" : "" - def sc_args = slice_config_in.name != 'NO_SLICE_CONFIG' - ? "--slice_config_in ${slice_config_in} --slice_config_out slice_config.csv" - : "" - """ - linum_detect_rehoming.py ${shifts_csv} shifts_xy_clean.csv \ - ${frac_arg} ${max_shift_arg} ${tile_fov_arg} ${tile_tol_arg} ${diag_arg} \ - ${sc_args} - """ -} - -// Auto-assess slice quality after normalization. An existing slice_config.csv -// (when supplied) is merged so manually-excluded slices stay excluded. -// See docs/NEXTFLOW_WORKFLOWS.md "Authoring Notes" for the two-input pattern. 
-process auto_assess_quality { - publishDir { "${params.output}/${task.process}" }, mode: 'copy' - - input: - path "inputs/*" - path existing_slice_config - - output: - path "slice_config.csv", emit: slice_config - - script: - def update_args = existing_slice_config.name != 'NO_SLICE_CONFIG' - ? "--update_existing --existing_config ${existing_slice_config}" - : "" - """ - linum_assess_slice_quality.py inputs slice_config.csv \\ - --min_quality ${params.auto_assess_min_quality} \\ - --exclude_first ${params.auto_assess_exclude_first} \\ - --roi_size ${params.auto_assess_roi_size} \\ - --processes ${params.processes} \\ - ${update_args} \\ - -f + linum_normalize_intensities_per_slice.py ${image} "slice_z${slice_id}_normalize.ome.zarr" --percentile_max ${params.clip_percentile_upper} """ } process bring_to_common_space { - publishDir { "${params.output}/${task.process}" }, mode: 'copy' - + publishDir "$params.output/$task.process", mode: 'copy' input: - tuple path("inputs/*"), path("shifts_xy.csv"), path(slice_config) - + tuple path("inputs/*"), path("shifts_xy.csv") output: - path "*.ome.zarr" - + path("*.ome.zarr") script: - def slice_config_arg = slice_config.name != 'NO_SLICE_CONFIG' ? "--slice_config ${slice_config}" : "" - - def excluded_args = params.common_space_excluded_slice_mode ? - "--excluded_slice_mode ${params.common_space_excluded_slice_mode} --excluded_slice_window ${params.common_space_excluded_slice_window}" : "" - - def refine_arg = params.common_space_refine_unreliable ? "--refine_unreliable" : "" - def discrepancy_arg = (params.common_space_refine_unreliable && params.common_space_refine_max_discrepancy_px > 0) ? - "--refine_max_discrepancy_px ${params.common_space_refine_max_discrepancy_px}" : "" - def min_corr_arg = (params.common_space_refine_unreliable && params.common_space_refine_min_correlation > 0) ? 
- "--refine_min_correlation ${params.common_space_refine_min_correlation}" : "" - """ - linum_align_mosaics_3d_from_shifts.py inputs shifts_xy.csv common_space \ - ${slice_config_arg} ${excluded_args} ${refine_arg} ${discrepancy_arg} ${min_corr_arg} + linum_align_mosaics_3d_from_shifts.py inputs shifts_xy.csv common_space mv common_space/* . """ } -process generate_common_space_preview { - publishDir "${params.output}/common_space_previews", mode: 'copy' - - input: - tuple val(slice_id), path(slice_zarr) - - output: - path "slice_z${slice_id}_preview.png" - - script: - """ - linum_screenshot_omezarr.py ${slice_zarr} "slice_z${slice_id}_preview.png" - """ -} - -// Interpolate a single missing slice via z-aware morphing (zmorph). -// On gate failure the zarr is omitted (hard skip); see -// docs/SLICE_INTERPOLATION_FEATURE.md for the full failure policy. -process interpolate_missing_slice { - publishDir { "${params.output}/${task.process}" }, mode: 'copy' - - input: - tuple val(missing_slice_id), path(slice_before), path(slice_after) - - output: - path "slice_z${missing_slice_id}_interpolated.ome.zarr", optional: true, emit: zarr - path "slice_z${missing_slice_id}_interpolated_preview.png", optional: true, emit: preview - path "slice_z${missing_slice_id}_interpolated_diagnostics.json", emit: diagnostics - path "slice_z${missing_slice_id}_manifest.csv", emit: manifest - - script: - def preview_opt = params.interpolation_preview ? "--preview slice_z${missing_slice_id}_interpolated_preview.png" : "" - def slab_opt = params.interpolation_reference_slab_size ? "--reference_slab_size ${params.interpolation_reference_slab_size}" : "" - def fg_opt = params.interpolation_min_foreground_fraction != null ? "--min_foreground_fraction ${params.interpolation_min_foreground_fraction}" : "" - def ncc_opt = params.interpolation_min_ncc_improvement != null ? 
"--min_ncc_improvement ${params.interpolation_min_ncc_improvement}" : "" - """ - linum_interpolate_missing_slice.py ${slice_before} ${slice_after} \ - "slice_z${missing_slice_id}_interpolated.ome.zarr" \ - --method ${params.interpolation_method} \ - --blend_method ${params.interpolation_blend_method} \ - --registration_metric ${params.interpolation_registration_metric} \ - --max_iterations ${params.interpolation_max_iterations} \ - --overlap_search_window ${params.interpolation_overlap_search_window} \ - --min_overlap_correlation ${params.interpolation_min_overlap_correlation} \ - ${slab_opt} \ - ${fg_opt} \ - ${ncc_opt} \ - --slice_id ${missing_slice_id} \ - --diagnostics slice_z${missing_slice_id}_interpolated_diagnostics.json \ - --manifest_entry slice_z${missing_slice_id}_manifest.csv \ - ${preview_opt} - """ -} - -// Merge per-slice interpolation manifest fragments into slice_config.csv. -// See docs/NEXTFLOW_WORKFLOWS.md "Authoring Notes" for the two-input pattern. -process finalise_interpolation { - publishDir "${params.output}", mode: 'copy' - - input: - path slice_config - path "fragments/*" - - output: - path "slice_config_final.csv" - - script: - """ - linum_interpolate_missing_slice.py --finalise \\ - --slice_config_in ${slice_config} \\ - --slice_config_out slice_config_final.csv \\ - --fragments fragments - """ -} - -// ----------------------------------------------------------------------------- -// Registration Processes -// ----------------------------------------------------------------------------- - process register_pairwise { - publishDir { "${params.output}/${task.process}" }, mode: 'copy' - + publishDir "$params.output/$task.process", mode: 'copy' input: - tuple path(fixed_vol), path(moving_vol) - + tuple path(fixed_vol), path(moving_vol) output: - path "*" - + path("*") script: - def rotation_flag = params.registration_transform == 'translation' ? 
"--no_rotation" : "--enable_rotation" """ - dirname=\$(basename ${moving_vol} .ome.zarr) - linum_register_pairwise.py ${fixed_vol} ${moving_vol} \$dirname \ - --slicing_interval_mm ${params.registration_slicing_interval_mm} \ - --search_range_mm ${params.registration_allowed_drifting_mm} \ - --moving_z_index ${params.moving_slice_first_index} \ - --max_rotation_deg ${params.registration_max_rotation} \ - --max_translation_px ${params.registration_max_translation} \ - --initial_alignment ${params.registration_initial_alignment} \ - ${rotation_flag} + dirname=`basename $moving_vol .ome.zarr` + linum_estimate_transform_pairwise.py ${fixed_vol} ${moving_vol} \$dirname --moving_slice_index $params.moving_slice_first_index --transform $params.pairwise_transform --metric $params.pairwise_registration_metric """ } -// Optional: re-register slice pairs that have a manual transform, using the -// manual alignment as initialisation. Produces a refined transform that -// combines the manual correction with a tight image-based residual correction. -// Only runs when params.refine_manual_transforms = true. -process refine_manual_transforms { - publishDir { "${params.output}/${task.process}" }, mode: 'copy' - - input: - tuple path("slices/*"), path("transforms/*") - - output: - path "*" - - script: - """ - linum_refine_manual_transforms.py slices transforms . \ - --manual_transforms_dir ${params.manual_transforms_dir} \ - --max_translation_px ${params.refine_max_translation_px} \ - --max_rotation_deg ${params.refine_max_rotation_deg} -f - """ -} - -// Auto-exclude clusters of consecutive low-quality registrations by stamping -// auto_excluded/auto_exclude_reason into slice_config.csv; stack reads them -// via --slice_config and treats those slices as motor-only. -// See docs/NEXTFLOW_WORKFLOWS.md "Authoring Notes" for the two-input pattern. 
-process auto_exclude_slices { - publishDir { "$params.output/$task.process" }, mode: 'copy' - - input: - path "transforms/*" - path slice_config_in - - output: - path "slice_config.csv", emit: slice_config - - script: - """ - linum_auto_exclude_slices.py transforms ${slice_config_in} slice_config.csv \ - --consecutive_threshold ${params.auto_exclude_consecutive} \ - --z_corr_threshold ${params.auto_exclude_z_corr} - """ -} - -// ----------------------------------------------------------------------------- -// Stacking Processes -// ----------------------------------------------------------------------------- - -// Export lightweight data package for the manual alignment tool. -// Produces AIP images and copies pairwise transforms into a self-contained -// directory that can be downloaded and opened by the manual alignment widget. -process make_manual_align_package { - publishDir { "$params.output/$task.process" }, mode: 'copy' - - input: - tuple path("slices/*"), path("transforms/*") - - output: - path("manual_align_package"), emit: pkg - - script: - """ - linum_export_manual_align.py slices transforms manual_align_package \ - --level ${params.manual_align_level} \ - --slices_remote_dir ${params.output}/bring_to_common_space - """ -} - -// Stacking: assembles common-space slices into a 3D volume using motor positions -// for XY placement, pairwise registration for rotation/translation refinement, -// and correlation or physics-based Z-matching. -// publishDir mode is conditional: 'symlink' when a downstream step will produce -// the final output (preserves work-dir files for -resume); 'move' when this is last. process stack { - publishDir { "$params.output/$task.process" }, - mode: (params.normalize_z_slices || params.align_to_ras_enabled) ? 'symlink' : 'move', - saveAs: { fn -> fn.endsWith('.ome.zarr') ? 
null : fn } - - input: - tuple path("slices/*"), path(shifts_file), path("transforms/*"), path(slice_config), val(subject_name), val(slice_ids_str) - - output: - tuple path("${subject_name}.ome.zarr"), path("${subject_name}.ome.zarr.zip"), path("${subject_name}.png"), path("${subject_name}_annotated.png"), emit: volume - path("*_metrics.json"), optional: true, emit: metrics - path("z_matches.csv"), optional: true, emit: z_matches - path("stacking_decisions.csv"), optional: true, emit: stacking_decisions - - script: - def options = stackBlendingArgs() + - stackZMatchingArgs() + - stackPairwiseTransformArgs() + - stackSliceConfigArg(slice_config) + - stackManualOverrideArg() + - stackCumulativeArgs() + - stackSmoothingArgs() + - " --no_xy_shift" + // slices are already in common space - pyramidArgs() - - def annotated_args = annotatedScreenshotArgs(slice_ids_str) - """ - linum_stack_slices_motor.py slices ${shifts_file} ${subject_name}.ome.zarr ${options} - zip -r ${subject_name}.ome.zarr.zip ${subject_name}.ome.zarr - linum_screenshot_omezarr.py ${subject_name}.ome.zarr ${subject_name}.png - linum_screenshot_omezarr_annotated.py ${subject_name}.ome.zarr ${subject_name}_annotated.png ${annotated_args} - """ -} - -// Post-stacking Z-direction intensity normalization. -// 'symlink' when align_to_ras follows; 'move' when this is the final output step. -process normalize_z_intensity { - publishDir { "$params.output/$task.process" }, - mode: params.align_to_ras_enabled ? 'symlink' : 'move', - saveAs: { fn -> fn.endsWith('.ome.zarr') ? 
null : fn } - + publishDir "$params.output/$task.process", mode: 'copy' input: - tuple path(stacked_zarr), val(subject_name), val(n_slices), val(slice_ids_str) - + tuple path("mosaics/*"), path("transforms/*") output: - tuple path("${subject_name}.ome.zarr"), path("${subject_name}.ome.zarr.zip"), path("${subject_name}.png"), path("${subject_name}_annotated.png") - + tuple path("3d_volume.ome.zarr"), path("3d_volume.ome.zarr.zip"), path("3d_volume.png") script: - def n_slices_opt = n_slices > 0 ? "--n_serial_slices ${n_slices}" : "" - def znorm_opts = (params.znorm_mode == 'histogram') - ? "--mode histogram --strength ${params.znorm_strength} --tissue_threshold ${params.znorm_tissue_threshold}" - : "--mode percentile --smooth_sigma ${params.znorm_smooth_sigma} --percentile ${params.znorm_percentile} --max_scale ${params.znorm_max_scale} --min_scale ${params.znorm_min_scale} --strength ${params.znorm_strength}" - def annotated_args = annotatedScreenshotArgs(slice_ids_str) - def gpu_flag = params.use_gpu ? "--use_gpu" : "--no-use_gpu" - """ - linum_normalize_z_intensity.py ${stacked_zarr} ${subject_name}.ome.zarr \ - ${n_slices_opt} \ - ${znorm_opts} \ - ${pyramidArgs()} ${gpu_flag} - - zip -r ${subject_name}.ome.zarr.zip ${subject_name}.ome.zarr - - linum_screenshot_omezarr.py ${subject_name}.ome.zarr ${subject_name}.png - - linum_screenshot_omezarr_annotated.py ${subject_name}.ome.zarr ${subject_name}_annotated.png ${annotated_args} - """ -} - -// Atlas registration to Allen Mouse Brain Atlas. Always the final step when enabled. -process align_to_ras { - publishDir { "$params.output/$task.process" }, mode: 'move', saveAs: { fn -> - fn.endsWith('.ome.zarr') ? 
null : fn + String options = "" + if(params.stack_blend_enabled) + { + options += "--blend" + if(params.stack_max_overlap > 0) + { + options += " --overlap ${params.stack_max_overlap}" + } } - - input: - tuple path(stacked_zarr), path(zarr_zip), path(png), path(annotated_png) - val subject_name - - output: - path "${subject_name}_ras.ome.zarr" - path "${subject_name}_ras.ome.zarr.zip" - path "${subject_name}_ras_transform.tfm", optional: true - path "${subject_name}_ras_preview.png", optional: true - path "${subject_name}_ras_orientation_preview.png", optional: true - - script: - def orientation_arg = params.ras_input_orientation ? "--input-orientation ${params.ras_input_orientation}" : "" - def rotation_arg = params.ras_initial_rotation ? "--initial-rotation ${params.ras_initial_rotation}" : "" - def preview_arg = params.allen_preview ? "--preview ${subject_name}_ras_preview.png" : "" - def orientation_preview_arg = params.ras_orientation_preview ? "--orientation-preview ${subject_name}_ras_orientation_preview.png" : "" - def ras_pyramid_opts = pyramidArgs('--n-levels') """ - linum_align_to_ras.py ${stacked_zarr} ${subject_name}_ras.ome.zarr \ - --allen-resolution ${params.allen_resolution} \ - --metric ${params.allen_metric} \ - --max-iterations ${params.allen_max_iterations} \ - --level ${params.allen_registration_level} \ - ${orientation_arg} ${rotation_arg} ${preview_arg} ${orientation_preview_arg} \ - ${ras_pyramid_opts} - zip -r ${subject_name}_ras.ome.zarr.zip ${subject_name}_ras.ome.zarr + linum_stack_slices_3d.py mosaics transforms 3d_volume.ome.zarr ${options} + zip -r 3d_volume.ome.zarr.zip 3d_volume.ome.zarr + linum_screenshot_omezarr.py 3d_volume.ome.zarr 3d_volume.png """ } -// ============================================================================= -// MAIN WORKFLOW -// ============================================================================= - workflow { + // Write readme containing the parameters for the current execution README() - def 
inputDir = normalizePath(params.input) - def subject_name = resolveSubjectName(inputDir) - log.info "Subject: ${subject_name}" - log.info "GPU: ${params.use_gpu ? 'ENABLED' : 'DISABLED'}" - - def debugSlices = parseDebugSlices(params.debug_slices) - if (debugSlices) { - log.info "DEBUG MODE: Processing only slices ${debugSlices.sort().join(', ')}" - } - - // Shifts file - def shifts_xy_path = params.shifts_xy ?: "${inputDir}/shifts_xy.csv" - log.info "Shifts file: ${shifts_xy_path}" - - if (!file(shifts_xy_path).exists()) { - error """ - Shifts file not found: ${shifts_xy_path} - - Please ensure shifts_xy.csv exists in your input directory, - or specify the path with --shifts_xy /path/to/shifts_xy.csv - """ - } - // Value channel — fans out to many consumers; see "Authoring Notes" in - // docs/NEXTFLOW_WORKFLOWS.md. - shifts_xy = channel.value(file(shifts_xy_path)) - - // Slice config (optional) - def slice_config_path = params.slice_config ?: joinPath(inputDir, "slice_config.csv") - def slicesToUse = null - if (file(slice_config_path).exists()) { - log.info "Slice config: ${slice_config_path}" - def parsed = parseSliceConfig(slice_config_path) - slicesToUse = parsed.use - def total = slicesToUse.size() + parsed.excluded.size() - log.info "Slice config: ${total} entries (${slicesToUse.size()} included, ${parsed.excluded.size()} excluded)" - } else if (params.slice_config) { - error("Slice config file not found: ${slice_config_path}") - } - - // Discover input mosaic grids - log.info "Looking for mosaic grids in: ${inputDir}" - - def inputDirFile = file(inputDir) - def mosaicFiles = inputDirFile.listFiles() - .findAll { f -> f.isDirectory() && f.name.startsWith('mosaic_grid') && f.name.endsWith('.ome.zarr') && f.name =~ /z\d+/ } - .sort { f -> f.name } - - if (mosaicFiles.isEmpty()) { - error("No mosaic grids found in ${inputDir}. 
Expected: mosaic_grid*_z00.ome.zarr") - } - - def selectedIds = mosaicFiles.collect { f -> extractSliceId(f) }.findAll { sid -> - if (debugSlices != null) return debugSlices.contains(sid) - if (slicesToUse != null) return slicesToUse.contains(sid) - return true - } - def skippedCount = mosaicFiles.size() - selectedIds.size() - if (skippedCount > 0) { - def reason = debugSlices != null ? "debug_slices filter" : "slice_config" - log.info "Found ${mosaicFiles.size()} mosaic grids; ${selectedIds.size()} selected, ${skippedCount} skipped (${reason})" - } else { - log.info "Found ${mosaicFiles.size()} mosaic grids; all selected" - } - - inputSlices = channel - .fromList(mosaicFiles) - .map { f -> toSliceTuple(f) } - .filter { slice_id, _files -> - if (debugSlices != null) { - def included = debugSlices.contains(slice_id) - if (!included) log.debug "Skipping slice ${slice_id} (not in debug_slices)" - return included - } - if (slicesToUse != null) return slicesToUse.contains(slice_id) - return true + // Parse inputs + inputSlices = channel.fromFilePairs("$params.input/mosaic_grid*_z*.ome.zarr", size: -1, type:'dir') + .ifEmpty { + error("No valid files found under '${params.input}'. Please supply a valid input directory.") + } + .map { id, files -> + // Extract the two digits after 'z' using regex + def matcher = id =~ /z(\d{2})/ + def key = matcher ? matcher[0][1] : "unknown" + [key, files] + } + shifts_xy = channel.fromPath("$params.shifts_xy", checkIfExists: true) + .ifEmpty { + error("XY shifts file not found at path '$params.shifts_xy'.") } - def has_slice_config = file(slice_config_path).exists() || params.auto_assess_quality - // Value channel — consumed by auto_assess, common_space, finalise, stack. - slice_config_channel = channel.value( - file(slice_config_path).exists() ? file(slice_config_path) : file('NO_SLICE_CONFIG') - ) + // [Optional] Resample the input mosaic grid + resampled_channel = params.resolution > 0 ? 
resample_mosaic_grid(inputSlices) : inputSlices - if (params.analyze_shifts) { - analyze_shifts(shifts_xy) - } + // [Optional] Focal plane curvature correction + fixed_focal_channel = params.fix_curvature_enabled ? fix_focal_curvature(resampled_channel) : resampled_channel - // Stage 1: Preprocessing - resampled = params.resolution > 0 ? resample_mosaic_grid(inputSlices) : inputSlices - focal_fixed = params.fix_curvature_enabled ? fix_focal_curvature(resampled) : resampled - illum_fixed = params.fix_illum_enabled ? fix_illumination(focal_fixed) : focal_fixed + // [Optional] Compensate for XY illumination inhomogeneity + fixed_illum_channel = params.fix_illum_enabled ? fix_illumination(fixed_focal_channel) : fixed_focal_channel - // Stage 2: XY Stitching (image-registration-based blend refinement) - if (params.stitch_global_transform) { - pooled_mosaics = illum_fixed.map { _id, p -> p }.collect() - estimate_global_transform(pooled_mosaics, slice_config_channel) - stitch_inputs = illum_fixed.combine(estimate_global_transform.out.transform) - } else { - // Value channel so the placeholder can fan out to every per-slice tuple. - no_transform = channel.value(file('NO_TRANSFORM')) - stitch_inputs = illum_fixed.combine(no_transform) - } - stitch_3d_with_refinement(stitch_inputs) - stitched_slices = stitch_3d_with_refinement.out.stitched + // Generate AIP mosaic grid + generate_aip(fixed_illum_channel) - if (params.stitch_preview) { - generate_stitch_preview(stitched_slices) - } + // Extract tile position (XY) from AIP mosaic grid + estimate_xy_transformation(generate_aip.out) - // Stage 3: Corrections - beam_profile_correction(stitched_slices) - crop_interface(beam_profile_correction.out.corrected) - normalize(crop_interface.out.cropped) + // Stitch the tiles in 3D mosaics + stitch_3d(fixed_illum_channel.combine(estimate_xy_transformation.out, by:0)) - // Stage 3.5: Auto slice quality assessment (optional). 
Generates a - // slice_config.csv that marks degraded slices; an existing static - // slice_config.csv is merged so manually-excluded slices stay excluded. - // current_slice_config = the latest slice_config as it flows through the - // pipeline; rebound by auto_assess / detect_rehoming when each runs. - current_slice_config = slice_config_channel - if (params.auto_assess_quality) { - auto_assess_inputs = normalize.out.normalized - .map { _id, norm_path -> norm_path } - .collect() - auto_assess_quality(auto_assess_inputs, slice_config_channel) - current_slice_config = auto_assess_quality.out.slice_config - } + // "PSF" correction + beam_profile_correction(stitch_3d.out) - // Stage 4: Common Space Alignment. - // detect_rehoming optionally corrects encoder-glitch spikes in the - // shifts file and (when a real slice_config exists) stamps - // rehomed/rehoming_reliable flags back into it. - if (params.detect_rehoming) { - detect_rehoming_input = shifts_xy.combine(current_slice_config) - detect_rehoming_events(detect_rehoming_input) - aligned_shifts = detect_rehoming_events.out.corrected_shifts - if (has_slice_config) { - current_slice_config = detect_rehoming_events.out.slice_config - } - } else { - aligned_shifts = shifts_xy - } + // Crop at interface + crop_interface(beam_profile_correction.out) - common_space_input = normalize.out.normalized - .toSortedList { a, b -> a[0] <=> b[0] } + // Normalize slice (compensate signal attenuation with depth) + normalize(crop_interface.out) + + // Slices stitching + common_space_channel = normalize.out + .toSortedList{a, b -> a[0] <=> b[0]} .flatten() .collate(2) - .map { _meta, filename -> filename } + .map{_meta, filename -> filename} .collect() - .merge(aligned_shifts) { a, b -> tuple(a, b) } - .merge(current_slice_config) { a, b -> tuple(a[0], a[1], b) } + .merge(shifts_xy){a, b -> tuple(a, b)} - bring_to_common_space(common_space_input) + // Bring all stitched slices to common space + 
bring_to_common_space(common_space_channel) - slices_common_space = bring_to_common_space.out + all_slices_common_space = bring_to_common_space.out .flatten() - .toSortedList { a, b -> a.getName() <=> b.getName() } - - if (params.common_space_preview) { - preview_input = bring_to_common_space.out - .flatten() - .map { f -> toSliceTuple(f) } - generate_common_space_preview(preview_input) - } + .toSortedList{a, b -> a[0] <=> b[0]} - // Stage 5: Missing Slice Interpolation (optional). - // Single-slice gaps (use=false slices already filtered upstream) are - // interpolated with zmorph; per-slice diagnostics are merged into - // slice_config_final.csv. See docs/SLICE_INTERPOLATION_FEATURE.md. - if (params.interpolate_missing_slices) { - gaps_channel = slices_common_space - .map { sliceList -> [detectSingleGaps(sliceList), sliceList] } - .flatMap { gapsAndSlices -> - def gaps = gapsAndSlices[0] - def sliceList = gapsAndSlices[1] - if (gaps.isEmpty()) return [] - - gaps.collect { gap -> - def (missingId, beforeId, afterId) = gap - def sliceBefore = sliceList.find { f -> f.getName().contains("slice_z${beforeId}") } - def sliceAfter = sliceList.find { f -> f.getName().contains("slice_z${afterId}") } - (sliceBefore && sliceAfter) ? tuple(missingId, sliceBefore, sliceAfter) : null - }.findAll { item -> item != null } + // Prepare for pairwise stack registration + fixed_channel = all_slices_common_space + .map {list -> + if(list.size() > 1){ + return list.subList(0, list.size() - 1) + } + else { + return channel.empty() } - - interpolate_missing_slice(gaps_channel) - - // Publish slice_config_final.csv as an artifact for the report. - // Intentionally NOT piped back into current_slice_config: when no - // gaps exist, interpolate_missing_slice does not run and finalise's - // output channel is empty, which would in turn empty out - // current_slice_config and silently skip stack. 
Stack only reads - // use/auto_excluded — neither column is modified here — so reading - // the upstream config is equivalent. - if (has_slice_config) { - finalise_interpolation( - current_slice_config, - interpolate_missing_slice.out.manifest.collect(), - ) } - - all_slices = slices_common_space - .mix(interpolate_missing_slice.out.zarr.collect()) - .flatten() - .toSortedList { a, b -> a.getName() <=> b.getName() } - } else { - all_slices = slices_common_space - } - - // Stage 6: Pairwise Registration - log.info "Registering slices pairwise" - - fixed_slices = all_slices - .map { list -> list.size() > 1 ? list.subList(0, list.size() - 1) : [] } - .flatten() - moving_slices = all_slices - .map { list -> list.size() > 1 ? list.subList(1, list.size()) : [] } .flatten() - pairs = fixed_slices.merge(moving_slices) - - register_pairwise(pairs) - - slices_collected = all_slices.flatten().collect() - transforms_collected = register_pairwise.out.collect() - - // Stage 6.5: Export manual-alignment package (optional). - if (params.export_manual_align) { - export_input = slices_collected - .combine(transforms_collected) - .map { items -> partitionSlicesAndTransforms(items) } - make_manual_align_package(export_input) - } - - // Stage 6.75: Refine manual transforms (optional). Re-runs pairwise - // registration initialised from each manual transform; non-manual pairs - // are copied unchanged. Refined outputs replace automated transforms. 
- if (params.refine_manual_transforms && params.manual_transforms_dir) { - log.info "Refining manual transforms from: ${params.manual_transforms_dir}" - refine_input = slices_collected - .combine(transforms_collected) - .map { items -> partitionSlicesAndTransforms(items) } - refine_manual_transforms(refine_input) - transforms_for_stack = refine_manual_transforms.out.collect() - } else { - transforms_for_stack = transforms_collected - } - - // Stage 7: Stacking - log.info "Stacking slices with registration refinements" - - // Auto-exclude: detect clusters of consecutive low-quality registrations. - // Stamps auto_excluded/auto_exclude_reason into slice_config so stack - // sees them via --slice_config. Requires a real slice_config. - stack_slice_config = current_slice_config - if (params.auto_exclude_enabled && has_slice_config) { - auto_exclude_slices(transforms_for_stack, current_slice_config) - stack_slice_config = auto_exclude_slices.out.slice_config - } - - // Build stack_input with `merge` (preserves list-vs-file structure of each - // input). Earlier versions used `combine`, which flattens lists into a - // single tuple and forced fragile filename-based dispatch in `.map`. 
- stack_input = slices_collected - .merge(shifts_xy) { s, x -> tuple(s, x) } - .merge(transforms_for_stack) { acc, t -> tuple(acc[0], acc[1], t) } - .merge(stack_slice_config) { acc, sc -> tuple(acc[0], acc[1], acc[2], sc) } - .map { slices, shifts, transforms, sc -> - tuple(slices, shifts, transforms, sc, subject_name, extractSliceIdsString(slices)) + moving_channel = all_slices_common_space + .map {list -> + if(list.size() > 1){ + return list.subList(1, list.size()) + } + else { + return channel.empty() + } } + .flatten() - stack(stack_input) - stack_output = stack.out.volume - stack_metadata = stack_input.map { _slices, _shifts, _transforms, _sc, name, ids_str -> - tuple(name, ids_str.split(',').size(), ids_str) - } - - // Stage 8: Z-Intensity Normalization (optional) - if (params.normalize_z_slices) { - log.info "Normalizing Z-direction intensity drift (sigma=${params.znorm_smooth_sigma} slices)" - znorm_input = stack_output - .combine(stack_metadata) - .map { zarr, _zip, _png, _annotated, name, n, ids_str -> tuple(zarr, name, n, ids_str) } - normalize_z_intensity(znorm_input) - final_stack_output = normalize_z_intensity.out - } else { - final_stack_output = stack_output - } - - // Stage 9: Report Generation (optional) - if (params.generate_report) { - generate_report(final_stack_output, subject_name) - } - - // Stage 10: Atlas Registration (optional) - if (params.align_to_ras_enabled) { - log.info "Registering to Allen Mouse Brain Atlas (RAS alignment)" - align_to_ras(final_stack_output, subject_name) - } - - // Stage 11: Diagnostics (optional). Toggle individually or via diagnostic_mode. 
- if (params.diagnostic_mode) { - log.info "DIAGNOSTIC MODE enabled (acq rotation, rotation drift, motor-only stitch/stack)" - } - - if (diagEnabled('analyze_acquisition_rotation')) { - analyze_acquisition_rotation(shifts_xy, register_pairwise.out.collect()) - } - - if (diagEnabled('analyze_rotation_drift')) { - analyze_rotation_drift(register_pairwise.out.collect()) - } - - if (diagEnabled('motor_only_stack')) { - motor_only_stack_input = normalize.out.normalized - .map { _id, slice_file -> slice_file } - .collect() - stack_motor_only(motor_only_stack_input, shifts_xy) - } - - // motor_only_stitch is also a prerequisite for compare_stitching, so run it - // whenever either is requested. A second `stitch_motor_only(illum_fixed)` - // call would emit the same channel twice, which Nextflow forbids. - def runMotorStitch = diagEnabled('motor_only_stitch') - def runComparison = params.compare_stitching || params.diagnostic_mode - if (runMotorStitch || runComparison) { - stitch_motor_only(illum_fixed) - } - - if (runComparison) { - log.info "Running stitching comparison (motor-only vs refined)..." - - stitch_refined(illum_fixed) - - motor_stitch_with_id = stitch_motor_only.out.map { f -> toSliceTuple(f) } - refined_stitch_with_id = stitch_refined.out[0].map { f -> toSliceTuple(f) } - - comparison_input = motor_stitch_with_id - .combine(refined_stitch_with_id, by: 0) + // Register slices pairwise + pairs_channel = fixed_channel.merge(moving_channel) + register_pairwise(pairs_channel) - compare_stitching(comparison_input) - } + // Stack all the slices in a single volume + stack_channel = all_slices_common_space.merge(register_pairwise.out.collect()){a, b -> tuple(a, b)} + stack(stack_channel) }