From 0ea82068a6232777bdc29099e310cd7fd792b7a9 Mon Sep 17 00:00:00 2001 From: Fang Chen Date: Mon, 6 Apr 2026 10:28:23 -0700 Subject: [PATCH 1/3] Add Phase 1: daily incremental assembly version tracking --- flows/parsers/backfill_missing_versions.py | 232 +++++++ .../parsers/update_historical_incremental.py | 401 +++++++++++ tests/test_incremental_update.py | 644 ++++++++++++++++++ 3 files changed, 1277 insertions(+) create mode 100644 flows/parsers/backfill_missing_versions.py create mode 100644 flows/parsers/update_historical_incremental.py create mode 100644 tests/test_incremental_update.py diff --git a/flows/parsers/backfill_missing_versions.py b/flows/parsers/backfill_missing_versions.py new file mode 100644 index 0000000..d153097 --- /dev/null +++ b/flows/parsers/backfill_missing_versions.py @@ -0,0 +1,232 @@ +"""Targeted backfill for assembly versions missing from historical records. + +Run this when the incremental updater reports assemblies whose previous +version was absent from the previous parsed TSV. Fetches only the specified +missing versions from NCBI, parses them, and merges the result into the +existing assembly_historical.tsv. + +Usage: + python -m flows.parsers.backfill_missing_versions \\ + --missing_json tmp/missing_versions.json \\ + --yaml_path configs/assembly_historical.types.yaml \\ + --work_dir tmp +""" + +import csv +import json +import os +from pathlib import Path +from typing import Optional + +from flows.lib import utils +from flows.lib.conditional_import import flow +from flows.lib.shared_args import WORK_DIR, YAML_PATH +from flows.lib.shared_args import parse_args as _parse_args +from flows.lib.shared_args import required +from flows.lib.utils import Parser +from flows.parsers.parse_backfill_historical_versions import ( + find_all_assembly_versions, + parse_historical_version, + parse_version, + setup_cache_directories, +) +from flows.parsers.parse_ncbi_assemblies import write_to_tsv + +MISSING_JSON = { + "flags": ["-m", "--missing_json"], + "keys": { + "help": "Path to the missing_versions.json produced by the incremental updater.", + "type": str, + }, +} + + +def load_missing_versions(missing_json: str) -> list[dict]: + """Load the list of missing versions from a JSON file. + + The file is written by run_incremental_historical_update when it + encounters assemblies with no matching previous version in the parsed TSV. + + Args: + missing_json (str): Path to missing_versions.json. + + Returns: + list: Missing-version records, each with base_accession, missing_version, + new_version, and new_accession keys. + """ + with open(missing_json, encoding="utf-8") as f: + return json.load(f) + + +def load_existing_historical(historical_tsv: str) -> dict[str, dict]: + """Load an existing assembly_historical.tsv keyed by genbankAccession. + + Args: + historical_tsv (str): Path to the existing historical TSV file. + + Returns: + dict: Rows keyed by genbankAccession, or empty dict if the file is absent. + """ + existing: dict[str, dict] = {} + if not Path(historical_tsv).exists(): + return existing + + with open(historical_tsv, encoding="utf-8") as f: + for row in csv.DictReader(f, delimiter="\t"): + acc = row.get("genbankAccession", "") + if acc: + existing[acc] = dict(row) + + return existing + + +@flow(log_prints=True) +def backfill_missing_versions( + missing_json: str, + yaml_path: str, + work_dir: str = ".", +) -> None: + """Fetch and parse assembly versions missing from the historical TSV. + + For each entry in missing_json, discovers all versions of that assembly + via NCBI FTP, fetches metadata for the specific missing version, parses it + through the standard GenomeHubs pipeline, and merges the result into the + existing assembly_historical.tsv. + + Args: + missing_json (str): Path to missing_versions.json from the incremental + updater. + yaml_path (str): Path to assembly_historical.types.yaml. + work_dir (str): Working directory for caches and output. + """ + setup_cache_directories(work_dir) + config = utils.load_config(config_file=yaml_path) + + missing = load_missing_versions(missing_json) + if not missing: + print("No missing versions to backfill.") + return + + historical_tsv = config.meta["file_name"] + existing = load_existing_historical(historical_tsv) + parsed = dict(existing) + + total = len(missing) + succeeded = 0 + failed = 0 + + separator = "=" * 80 + print(f"\n{separator}") + print("MISSING VERSION BACKFILL") + print(f"{separator}") + print(f" Missing entries to process: {total}") + print(f" Merging into: {historical_tsv}") + print(f" Existing records: {len(existing)}") + print(f"{separator}\n") + + for i, entry in enumerate(missing): + base_acc = entry["base_accession"] + missing_version = entry["missing_version"] + new_accession = entry["new_accession"] + target_acc = f"{base_acc}.{missing_version}" + + print(f"[{i + 1}/{total}] {target_acc}") + + all_versions = find_all_assembly_versions(new_accession, work_dir) + if not all_versions: + print(" Warning: No versions found via FTP — skipping.") + failed += 1 + continue + + version_data = next( + (v for v in all_versions if parse_version(v.get("accession", "")) == missing_version), + None, + ) + if version_data is None: + print(f" Warning: v{missing_version} not found in FTP listing — skipping.") + failed += 1 + continue + + try: + print(f" Parsing v{missing_version}...", end=" ", flush=True) + row = parse_historical_version( + version_data=version_data, + config=config, + base_accession=base_acc, + version_num=missing_version, + current_accession=new_accession, + ) + genbank_acc = row.get("genbankAccession", target_acc) + parsed[genbank_acc] = row + succeeded += 1 + print("done") + except Exception as e: + print(f"failed ({e})") + failed += 1 + continue + + if succeeded > 0: + print(f"\nWriting {len(parsed)} records to {historical_tsv}...") + write_to_tsv(parsed, config) + + print(f"\n{separator}") + print("MISSING VERSION BACKFILL COMPLETE") + print(f"{separator}") + print(f" Succeeded: {succeeded}/{total}") + if failed > 0: + print(f" Failed: {failed}/{total}") + print(f" Total records in {historical_tsv}: {len(parsed)}") + print(f"{separator}\n") + + +def backfill_missing_versions_wrapper( + working_yaml: str, + work_dir: str, + append: bool, + data_freeze_path: Optional[str] = None, + **kwargs, +) -> None: + """Wrapper matching the fetch_parse_validate parser signature. + + Locates missing_versions.json in work_dir and delegates to + backfill_missing_versions. + + Args: + working_yaml (str): Path to the working YAML file. + work_dir (str): Path to the working directory containing + missing_versions.json. + append (bool): Unused; accepted for pipeline compatibility. + data_freeze_path (str, optional): Unused; accepted for pipeline + compatibility. + **kwargs: Additional keyword arguments passed by the pipeline. + """ + missing_json_path = os.path.join(work_dir, "missing_versions.json") + if not Path(missing_json_path).exists(): + raise FileNotFoundError(f"No missing_versions.json found in {work_dir}") + + backfill_missing_versions( + missing_json=missing_json_path, + yaml_path=working_yaml, + work_dir=work_dir, + ) + + +def plugin() -> Parser: + """Register the flow.""" + return Parser( + name="BACKFILL_MISSING_VERSIONS", + func=backfill_missing_versions_wrapper, + description="Targeted backfill for assembly versions missing from historical records.", + ) + + +if __name__ == "__main__": + args = _parse_args( + [required(MISSING_JSON), required(YAML_PATH), WORK_DIR], + description="Targeted backfill for assembly versions missing from historical records", + ) + backfill_missing_versions( + missing_json=args.missing_json, + yaml_path=args.yaml_path, + work_dir=args.work_dir, + ) diff --git a/flows/parsers/update_historical_incremental.py b/flows/parsers/update_historical_incremental.py new file mode 100644 index 0000000..3de0253 --- /dev/null +++ b/flows/parsers/update_historical_incremental.py @@ -0,0 +1,401 @@ +"""Daily incremental updates to historical assembly records. + +Identifies assembly versions newly superseded since the last run and appends them to assembly_historical.tsv. No NCBI fetches are required — data is copied directly from the previous assembly_current.tsv parse output. + +Usage: + python -m flows.parsers.update_historical_incremental \\ + --input_path assembly_data_report.jsonl \\ + --previous_tsv assembly_current.tsv.previous \\ + --historical_tsv outputs/assembly_historical.tsv +""" + +import csv +import json +import os +from glob import glob +from pathlib import Path +from typing import Optional + +from flows.lib.conditional_import import flow +from flows.lib.shared_args import INPUT_PATH +from flows.lib.shared_args import parse_args as _parse_args +from flows.lib.shared_args import required +from flows.lib.utils import Parser +from flows.parsers.parse_backfill_historical_versions import parse_accession + +PREVIOUS_TSV = { + "flags": ["-p", "--previous_tsv"], + "keys": { + "help": "Path to assembly_current.tsv from the previous run.", + "type": str, + }, +} + +HISTORICAL_TSV = { + "flags": ["-H", "--historical_tsv"], + "keys": { + "help": "Path to assembly_historical.tsv to update.", + "type": str, + }, +} + + +def load_previous_parsed_by_base(previous_tsv: str) -> dict[str, dict[int, dict]]: + """Load previous parsed results indexed by base accession and version. + + Args: + previous_tsv (str): Path to assembly_current.tsv from the previous run. + + Returns: + dict: Nested mapping of base_accession -> version -> row data. + Returns an empty dict if the file is not found, which is expected + on the first run after the Phase 0 backfill. + """ + previous_by_base: dict[str, dict[int, dict]] = {} + + try: + with open(previous_tsv, encoding="utf-8") as f: + for row in csv.DictReader(f, delimiter="\t"): + accession = row["accession"] + base_acc, version = parse_accession(accession) + if base_acc not in previous_by_base: + previous_by_base[base_acc] = {} + previous_by_base[base_acc][version] = dict(row) + except FileNotFoundError: + print(f"Warning: Previous TSV not found: {previous_tsv}") + print(" This is expected for the first run after the Phase 0 backfill.") + return {} + + total = sum(len(v) for v in previous_by_base.values()) + print(f"Loaded {total} assemblies from previous parsed results.") + print(f" Unique base accessions: {len(previous_by_base)}") + + return previous_by_base + + +def build_superseded_row( + previous_row: dict, + previous_version: int, + new_accession: str, + new_version: int, + release_date: str, +) -> dict: + """Build a superseded row from a previous row with updated metadata. + + Args: + previous_row (dict): Row data copied from the previous parsed TSV. + previous_version (int): Version number of the assembly being superseded. + new_accession (str): Accession of the assembly that supersedes this one. + new_version (int): Version number of the superseding assembly. + release_date (str): Release date of the superseding assembly. + + Returns: + dict: Updated row with version_status, assembly_id, and superseded_by fields. + """ + base_acc, _ = parse_accession(previous_row["accession"]) + row = previous_row.copy() + row["version_status"] = "superseded" + row["assembly_id"] = f"{base_acc}_{previous_version}" + row["superseded_by"] = new_accession + row["superseded_by_version"] = new_version + row["superseded_date"] = release_date + return row + + +def build_missing_version_record( + base_acc: str, + missing_version: int, + new_version: int, + new_accession: str, + is_new_series: bool = False, +) -> dict: + """Build a record describing a version missing from the previous parsed TSV. + + Args: + base_acc (str): Base accession without version suffix. + missing_version (int): The version number that could not be found. + new_version (int): The new version number that triggered this check. + new_accession (str): Full accession of the new assembly. + is_new_series (bool): True if this is a new assembly series with no + prior history at all. + + Returns: + dict: Record suitable for writing to a missing-versions JSON file. + """ + record = { + "base_accession": base_acc, + "missing_version": missing_version, + "new_version": new_version, + "new_accession": new_accession, + } + if is_new_series: + record["note"] = "New assembly series — prior versions may need backfill" + return record + + +def identify_newly_superseded( + new_jsonl: str, + previous_by_base: dict[str, dict[int, dict]], +) -> tuple[list[dict], list[dict]]: + """Identify versions that became superseded in the current JSONL update. + + For each assembly with version > 1 in the new JSONL, checks whether the + immediately preceding version exists in the previous parsed TSV. Assemblies + whose predecessor is found are added to the superseded list; those missing a + predecessor are recorded for optional backfill. + + Args: + new_jsonl (str): Path to the current assembly_data_report.jsonl. + previous_by_base (dict): Indexed previous parsed results from + load_previous_parsed_by_base. + + Returns: + tuple: (newly_superseded, missing_versions) where each is a list of dicts. + """ + newly_superseded: list[dict] = [] + missing_versions: list[dict] = [] + + with open(new_jsonl) as f: + for line in f: + assembly = json.loads(line) + accession = assembly["accession"] + base_acc, new_version = parse_accession(accession) + + if new_version <= 1: + continue + + previous_version = new_version - 1 + + if base_acc not in previous_by_base: + missing_versions.append(build_missing_version_record( + base_acc, previous_version, new_version, accession, + is_new_series=True, + )) + continue + + if previous_version not in previous_by_base[base_acc]: + missing_versions.append(build_missing_version_record( + base_acc, previous_version, new_version, accession, + )) + continue + + previous_row = previous_by_base[base_acc][previous_version] + release_date = assembly.get("releaseDate") or "" + newly_superseded.append(build_superseded_row( + previous_row, previous_version, accession, new_version, release_date, + )) + + return newly_superseded, missing_versions + + +def append_superseded_to_tsv( + newly_superseded: list[dict], historical_tsv: str +) -> None: + """Append newly superseded rows to the historical TSV, deduplicating by assembly_id. + + Reads the existing file if present, merges new rows (new rows take + precedence on duplicate assembly_id), and writes the combined result back. + + Args: + newly_superseded (list): Row dicts from identify_newly_superseded. + historical_tsv (str): Path to assembly_historical.tsv. + """ + if not newly_superseded: + print(" No newly superseded versions to add.") + return + + existing: dict[str, dict] = {} + historical_path = Path(historical_tsv) + + if historical_path.exists(): + with open(historical_tsv, encoding="utf-8") as f: + for row in csv.DictReader(f, delimiter="\t"): + existing[row["assembly_id"]] = dict(row) + + for row in newly_superseded: + existing[row["assembly_id"]] = row + + fieldnames = list(next(iter(existing.values())).keys()) + with open(historical_tsv, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter( + f, fieldnames=fieldnames, delimiter="\t", extrasaction="ignore" + ) + writer.writeheader() + writer.writerows(existing.values()) + + print(f" Added {len(newly_superseded)} newly superseded versions.") + print(f" Total records in {historical_tsv}: {len(existing)}") + + +def print_superseded_summary(newly_superseded: list[dict]) -> None: + """Print a short summary of the newly superseded versions. + + Args: + newly_superseded (list): Row dicts from identify_newly_superseded. + """ + if not newly_superseded: + print(" Found: 0 newly superseded versions.") + return + + print(f" Found: {len(newly_superseded)} newly superseded versions.") + print(" Examples:") + for row in newly_superseded[:5]: + print(f" {row['accession']} -> superseded by v{row['superseded_by_version']}") + if len(newly_superseded) > 5: + print(f" ... and {len(newly_superseded) - 5} more") + + +def print_missing_versions_warning(missing: list[dict]) -> None: + """Print a warning listing versions absent from the previous parsed TSV. + + Args: + missing (list): Missing-version records from identify_newly_superseded. + """ + if not missing: + return + + print(f"\n Warning: {len(missing)} assemblies have missing previous versions.") + print(" These may need manual backfill:") + for m in missing[:5]: + print( + f" {m['base_accession']}: " + f"need v{m['missing_version']}, have v{m['new_version']}" + ) + if len(missing) > 5: + print(f" ... and {len(missing) - 5} more") + print("\n To backfill missing versions, run:") + print(" python -m flows.parsers.backfill_missing_versions") + + +@flow(log_prints=True) +def run_incremental_historical_update( + new_jsonl: str, + previous_tsv: str, + historical_tsv: str, +) -> dict: + """Daily incremental update of the historical assembly TSV. + + Called after parse_ncbi_assemblies completes. Uses the previous parsed + TSV — no NCBI fetches are made. + + Args: + new_jsonl (str): Path to the current assembly_data_report.jsonl. + previous_tsv (str): Path to assembly_current.tsv from the previous run. + historical_tsv (str): Path to assembly_historical.tsv to update. + + Returns: + dict: Summary with keys newly_superseded_count, missing_versions_count, + and missing_versions (list of records). + """ + separator = "=" * 80 + print(f"\n{separator}") + print("INCREMENTAL HISTORICAL UPDATE") + print(f"{separator}\n") + + print("[1/3] Loading previous parsed results...") + previous_by_base = load_previous_parsed_by_base(previous_tsv) + + if not previous_by_base: + print(" No previous parsed data available — skipping incremental update.") + print(" This is expected for the first run after the Phase 0 backfill.\n") + return { + "newly_superseded_count": 0, + "missing_versions_count": 0, + "missing_versions": [], + } + + print("\n[2/3] Identifying newly superseded versions...") + newly_superseded, missing = identify_newly_superseded(new_jsonl, previous_by_base) + print_superseded_summary(newly_superseded) + print_missing_versions_warning(missing) + + print("\n[3/3] Updating historical TSV...") + append_superseded_to_tsv(newly_superseded, historical_tsv) + + print(f"\n{separator}") + print( + f"INCREMENTAL UPDATE COMPLETE " + f"Superseded: {len(newly_superseded)} " + f"Missing: {len(missing)}" + ) + print(f"{separator}\n") + + return { + "newly_superseded_count": len(newly_superseded), + "missing_versions_count": len(missing), + "missing_versions": missing, + } + + +def incremental_update_wrapper( + working_yaml: str, + work_dir: str, + append: bool, + data_freeze_path: Optional[str] = None, + **kwargs, +) -> None: + """Wrapper matching the fetch_parse_validate parser signature. + + Derives the previous TSV and historical TSV paths from work_dir and + delegates to run_incremental_historical_update. + + Args: + working_yaml (str): Path to the working YAML file (unused; accepted + for pipeline compatibility). + work_dir (str): Path to the working directory containing the JSONL, + the previous TSV, and the historical TSV. + append (bool): Unused; accepted for pipeline compatibility. + data_freeze_path (str, optional): Unused; accepted for pipeline + compatibility. + **kwargs: Additional keyword arguments passed by the pipeline. + """ + glob_path = os.path.join(work_dir, "*.jsonl") + paths = glob(glob_path) + if not paths: + raise FileNotFoundError(f"No jsonl file found in {work_dir}") + if len(paths) > 1: + raise ValueError(f"More than one jsonl file found in {work_dir}") + + results = run_incremental_historical_update( + new_jsonl=paths[0], + previous_tsv=os.path.join(work_dir, "assembly_current.tsv.previous"), + historical_tsv=os.path.join(work_dir, "assembly_historical.tsv"), + ) + + if results["missing_versions_count"] > 0: + missing_json_path = os.path.join(work_dir, "missing_versions.json") + with open(missing_json_path, "w", encoding="utf-8") as f: + json.dump(results["missing_versions"], f, indent=2) + print(f" Missing versions written to: {missing_json_path}") + + +def plugin() -> Parser: + """Register the flow.""" + return Parser( + name="UPDATE_HISTORICAL_INCREMENTAL", + func=incremental_update_wrapper, + description="Daily incremental update of historical assembly records.", + ) + + +if __name__ == "__main__": + args = _parse_args( + [required(INPUT_PATH), required(PREVIOUS_TSV), required(HISTORICAL_TSV)], + description="Daily incremental update of historical assembly records", + ) + results = run_incremental_historical_update( + new_jsonl=args.input_path, + previous_tsv=args.previous_tsv, + historical_tsv=args.historical_tsv, + ) + print(f"Summary: superseded={results['newly_superseded_count']}, " + f"missing={results['missing_versions_count']}") + if results["missing_versions_count"] > 0: + missing_json_path = Path(args.historical_tsv).parent / "missing_versions.json" + with open(missing_json_path, "w", encoding="utf-8") as f: + json.dump(results["missing_versions"], f, indent=2) + print( + f" Action needed: {results['missing_versions_count']} missing versions." + ) + print(f" Written to: {missing_json_path}") + print(" Run: python -m flows.parsers.backfill_missing_versions") diff --git a/tests/test_incremental_update.py b/tests/test_incremental_update.py new file mode 100644 index 0000000..9d05642 --- /dev/null +++ b/tests/test_incremental_update.py @@ -0,0 +1,644 @@ +"""Tests for update_historical_incremental.py and backfill_missing_versions.py + +Covers: +- Loading and indexing previous parsed TSV results +- Building superseded and missing-version records +- Core supersession detection logic (superseded, missing-with-gap, new-series, v1-skip) +- Appending to historical TSV with deduplication +- Incremental orchestrator flow behaviour +- Loading existing historical TSV (backfill helper) +- Backfill flow: version selection and TSV merge +""" + +import csv +import json +import os +import sys +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +os.environ["SKIP_PREFECT"] = "true" + +from flows.parsers import ( # noqa: E402 + backfill_missing_versions as backfill_module, + update_historical_incremental as incremental_module, +) +from flows.lib.utils import Parser # noqa: E402 +from flows.parsers.backfill_missing_versions import ( # noqa: E402 + backfill_missing_versions, + backfill_missing_versions_wrapper, + load_existing_historical, +) +from flows.parsers.update_historical_incremental import ( # noqa: E402 + append_superseded_to_tsv, + build_missing_version_record, + build_superseded_row, + identify_newly_superseded, + load_previous_parsed_by_base, + run_incremental_historical_update, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def write_tsv(path: Path, rows: list[dict]) -> None: + """Write a list of dicts to a tab-separated file.""" + if not rows: + return + with open(path, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()), delimiter="\t") + writer.writeheader() + writer.writerows(rows) + + +def read_tsv(path: Path) -> list[dict]: + """Read a tab-separated file into a list of dicts.""" + with open(path, encoding="utf-8") as f: + return list(csv.DictReader(f, delimiter="\t")) + + +def write_jsonl(path: Path, records: list[dict]) -> None: + """Write a list of dicts as newline-delimited JSON.""" + with open(path, "w", encoding="utf-8") as f: + for record in records: + f.write(json.dumps(record) + "\n") + + +# --------------------------------------------------------------------------- +# TestLoadPreviousParsed +# --------------------------------------------------------------------------- + +class TestLoadPreviousParsed: + """load_previous_parsed_by_base indexes rows by base accession and version.""" + + def test_missing_file_returns_empty(self, tmp_path): + result = load_previous_parsed_by_base(str(tmp_path / "nope.tsv")) + assert result == {} + + def test_single_version_indexed(self, tmp_path): + tsv = tmp_path / "current.tsv" + write_tsv(tsv, [{"accession": "GCA_000222935.1", "taxon_id": "12345"}]) + result = load_previous_parsed_by_base(str(tsv)) + assert "GCA_000222935" in result + assert 1 in result["GCA_000222935"] + assert result["GCA_000222935"][1]["taxon_id"] == "12345" + + def test_multi_version_same_base(self, tmp_path): + tsv = tmp_path / "current.tsv" + write_tsv(tsv, [ + {"accession": "GCA_000222935.1", "taxon_id": "1"}, + {"accession": "GCA_000222935.2", "taxon_id": "1"}, + ]) + result = load_previous_parsed_by_base(str(tsv)) + assert len(result["GCA_000222935"]) == 2 + assert 1 in result["GCA_000222935"] + assert 2 in result["GCA_000222935"] + + def test_multiple_base_accessions(self, tmp_path): + tsv = tmp_path / "current.tsv" + write_tsv(tsv, [ + {"accession": "GCA_000222935.1", "taxon_id": "1"}, + {"accession": "GCA_000412225.1", "taxon_id": "2"}, + ]) + result = load_previous_parsed_by_base(str(tsv)) + assert len(result) == 2 + assert "GCA_000222935" in result + assert "GCA_000412225" in result + + +# --------------------------------------------------------------------------- +# TestBuildSupersededRow +# --------------------------------------------------------------------------- + +class TestBuildSupersededRow: + """build_superseded_row stamps the correct metadata onto a copied row.""" + + def _base_row(self): + return { + "accession": "GCA_000222935.1", + "taxon_id": "12345", + "assembly_level": "Chromosome", + } + + def test_version_status_set(self): + row = build_superseded_row(self._base_row(), 1, "GCA_000222935.2", 2, "2024-01-15") + assert row["version_status"] == "superseded" + + def test_assembly_id_format(self): + row = build_superseded_row(self._base_row(), 1, "GCA_000222935.2", 2, "2024-01-15") + assert row["assembly_id"] == "GCA_000222935_1" + + def test_superseded_by_fields(self): + row = build_superseded_row(self._base_row(), 1, "GCA_000222935.2", 2, "2024-01-15") + assert row["superseded_by"] == "GCA_000222935.2" + assert row["superseded_by_version"] == 2 + assert row["superseded_date"] == "2024-01-15" + + def test_original_row_not_mutated(self): + original = self._base_row() + build_superseded_row(original, 1, "GCA_000222935.2", 2, "2024-01-15") + assert "version_status" not in original + + def test_existing_fields_preserved(self): + row = build_superseded_row(self._base_row(), 1, "GCA_000222935.2", 2, "2024-01-15") + assert row["taxon_id"] == "12345" + assert row["assembly_level"] == "Chromosome" + + +# --------------------------------------------------------------------------- +# TestBuildMissingVersionRecord +# --------------------------------------------------------------------------- + +class TestBuildMissingVersionRecord: + """build_missing_version_record captures the gap details.""" + + def test_required_fields(self): + rec = build_missing_version_record("GCA_000222935", 2, 3, "GCA_000222935.3") + assert rec["base_accession"] == "GCA_000222935" + assert rec["missing_version"] == 2 + assert rec["new_version"] == 3 + assert rec["new_accession"] == "GCA_000222935.3" + + def test_no_note_by_default(self): + rec = build_missing_version_record("GCA_000222935", 1, 2, "GCA_000222935.2") + assert "note" not in rec + + def test_note_present_for_new_series(self): + rec = build_missing_version_record( + "GCA_000222935", 1, 2, "GCA_000222935.2", is_new_series=True + ) + assert "note" in rec + + +# --------------------------------------------------------------------------- +# TestIdentifyNewlySuperseded +# --------------------------------------------------------------------------- + +class TestIdentifyNewlySuperseded: + """identify_newly_superseded covers all branching cases.""" + + def _write_jsonl(self, tmp_path, records): + path = tmp_path / "new.jsonl" + write_jsonl(path, records) + return str(path) + + def test_v1_assembly_skipped(self, tmp_path): + jsonl = self._write_jsonl(tmp_path, [{"accession": "GCA_000222935.1"}]) + superseded, missing = identify_newly_superseded(jsonl, {}) + assert superseded == [] + assert missing == [] + + def test_superseded_found_when_previous_version_present(self, tmp_path): + jsonl = self._write_jsonl( + tmp_path, [{"accession": "GCA_000222935.2", "releaseDate": "2024-01-15"}] + ) + previous = { + "GCA_000222935": {1: {"accession": "GCA_000222935.1", "taxon_id": "1"}} + } + superseded, missing = identify_newly_superseded(jsonl, previous) + assert len(superseded) == 1 + assert superseded[0]["superseded_by"] == "GCA_000222935.2" + assert missing == [] + + def test_missing_with_version_gap(self, tmp_path): + jsonl = self._write_jsonl(tmp_path, [{"accession": "GCA_000222935.3"}]) + previous = { + "GCA_000222935": {1: {"accession": "GCA_000222935.1", "taxon_id": "1"}} + } + superseded, missing = identify_newly_superseded(jsonl, previous) + assert superseded == [] + assert len(missing) == 1 + assert missing[0]["missing_version"] == 2 + + def test_new_series_no_prior_base(self, tmp_path): + jsonl = self._write_jsonl(tmp_path, [{"accession": "GCA_999999999.2"}]) + superseded, missing = identify_newly_superseded(jsonl, {}) + assert superseded == [] + assert len(missing) == 1 + assert missing[0]["note"] + + def test_mixed_batch(self, tmp_path): + jsonl = self._write_jsonl(tmp_path, [ + {"accession": "GCA_000222935.2", "releaseDate": "2024-01-01"}, + {"accession": "GCA_000412225.1"}, + {"accession": "GCA_999999999.2"}, + ]) + previous = { + "GCA_000222935": {1: {"accession": "GCA_000222935.1", "taxon_id": "1"}} + } + superseded, missing = identify_newly_superseded(jsonl, previous) + assert len(superseded) == 1 + assert len(missing) == 1 + + +# --------------------------------------------------------------------------- +# TestAppendSupersededToTsv +# --------------------------------------------------------------------------- + +class TestAppendSupersededToTsv: + """append_superseded_to_tsv correctly creates, appends, and deduplicates.""" + + def _make_row(self, acc, assembly_id, status="superseded"): + return { + "accession": acc, + "assembly_id": assembly_id, + "version_status": status, + } + + def test_creates_new_file(self, tmp_path): + tsv = tmp_path / "historical.tsv" + rows = [self._make_row("GCA_000222935.1", "GCA_000222935_1")] + append_superseded_to_tsv(rows, str(tsv)) + assert tsv.exists() + result = read_tsv(tsv) + assert len(result) == 1 + assert result[0]["accession"] == "GCA_000222935.1" + + def test_appends_to_existing(self, tmp_path): + tsv = tmp_path / "historical.tsv" + write_tsv(tsv, [self._make_row("GCA_000412225.1", "GCA_000412225_1")]) + append_superseded_to_tsv( + [self._make_row("GCA_000222935.1", "GCA_000222935_1")], str(tsv) + ) + result = read_tsv(tsv) + assert len(result) == 2 + + def test_dedup_on_assembly_id_keeps_new(self, tmp_path): + tsv = tmp_path / "historical.tsv" + old_row = { + "accession": "GCA_000222935.1", + "assembly_id": "GCA_000222935_1", + "version_status": "superseded", + "superseded_by": "GCA_000222935.2", + } + write_tsv(tsv, [old_row]) + new_row = dict(old_row) + new_row["superseded_by"] = "GCA_000222935.3" + append_superseded_to_tsv([new_row], str(tsv)) + result = read_tsv(tsv) + assert len(result) == 1 + assert result[0]["superseded_by"] == "GCA_000222935.3" + + def test_no_op_when_empty_list(self, tmp_path): + tsv = tmp_path / "historical.tsv" + append_superseded_to_tsv([], str(tsv)) + assert not tsv.exists() + + +# --------------------------------------------------------------------------- +# TestIncrementalOrchestrator +# --------------------------------------------------------------------------- + +class TestIncrementalOrchestrator: + """run_incremental_historical_update orchestrator behaviour.""" + + def test_no_previous_tsv_returns_empty_result(self, tmp_path): + jsonl = tmp_path / "new.jsonl" + write_jsonl(jsonl, [{"accession": "GCA_000222935.2"}]) + result = run_incremental_historical_update( + new_jsonl=str(jsonl), + previous_tsv=str(tmp_path / "nope.tsv"), + historical_tsv=str(tmp_path / "historical.tsv"), + ) + assert result["newly_superseded_count"] == 0 + assert result["missing_versions_count"] == 0 + assert result["missing_versions"] == [] + + def test_one_superseded_produces_correct_counts(self, tmp_path): + previous_tsv = tmp_path / "previous.tsv" + write_tsv(previous_tsv, [ + {"accession": "GCA_000222935.1", "taxon_id": "1"} + ]) + jsonl = tmp_path / "new.jsonl" + write_jsonl(jsonl, [ + {"accession": "GCA_000222935.2", "releaseDate": "2024-01-15"} + ]) + result = run_incremental_historical_update( + new_jsonl=str(jsonl), + previous_tsv=str(previous_tsv), + historical_tsv=str(tmp_path / "historical.tsv"), + ) + assert result["newly_superseded_count"] == 1 + assert result["missing_versions_count"] == 0 + + def test_missing_version_detected_in_orchestrator_result(self, tmp_path): + """v3 present, v2 missing → missing_versions_count == 1.""" + previous_tsv = tmp_path / "previous.tsv" + write_tsv(previous_tsv, [ + {"accession": "GCA_000222935.1", "taxon_id": "1"} + ]) + jsonl = tmp_path / "new.jsonl" + write_jsonl(jsonl, [ + {"accession": "GCA_000222935.3", "releaseDate": "2024-06-01"} + ]) + result = run_incremental_historical_update( + new_jsonl=str(jsonl), + previous_tsv=str(previous_tsv), + historical_tsv=str(tmp_path / "historical.tsv"), + ) + assert result["missing_versions_count"] == 1 + assert result["missing_versions"][0]["base_accession"] == "GCA_000222935" + assert result["missing_versions"][0]["missing_version"] == 2 + + def test_historical_tsv_written(self, tmp_path): + previous_tsv = tmp_path / "previous.tsv" + write_tsv(previous_tsv, [ + {"accession": "GCA_000222935.1", "taxon_id": "1"} + ]) + jsonl = tmp_path / "new.jsonl" + write_jsonl(jsonl, [ + {"accession": "GCA_000222935.2", "releaseDate": "2024-01-15"} + ]) + historical_tsv = tmp_path / "historical.tsv" + run_incremental_historical_update( + new_jsonl=str(jsonl), + previous_tsv=str(previous_tsv), + historical_tsv=str(historical_tsv), + ) + assert historical_tsv.exists() + rows = read_tsv(historical_tsv) + assert len(rows) == 1 + assert rows[0]["version_status"] == "superseded" + + +# --------------------------------------------------------------------------- +# TestLoadExistingHistorical +# --------------------------------------------------------------------------- + +class TestLoadExistingHistorical: + """load_existing_historical indexes rows by genbankAccession.""" + + def test_missing_file_returns_empty(self, tmp_path): + result = load_existing_historical(str(tmp_path / "nope.tsv")) + assert result == {} + + def test_rows_keyed_by_genbank_accession(self, tmp_path): + tsv = tmp_path / "historical.tsv" + write_tsv(tsv, [ + {"genbankAccession": "GCA_000222935.1", "version_status": "superseded"}, + {"genbankAccession": "GCA_000412225.1", "version_status": "superseded"}, + ]) + result = load_existing_historical(str(tsv)) + assert "GCA_000222935.1" in result + assert "GCA_000412225.1" in result + assert len(result) == 2 + + def test_row_data_preserved(self, tmp_path): + tsv = tmp_path / "historical.tsv" + write_tsv(tsv, [ + {"genbankAccession": "GCA_000222935.1", "version_status": "superseded"} + ]) + result = load_existing_historical(str(tsv)) + assert result["GCA_000222935.1"]["version_status"] == "superseded" + + +# --------------------------------------------------------------------------- +# TestBackfillMissingVersionsFlow +# --------------------------------------------------------------------------- + +class TestBackfillMissingVersionsFlow: + """backfill_missing_versions selects the right version and merges into TSV.""" + + def _write_missing_json(self, tmp_path, entries): + path = tmp_path / "missing.json" + with open(path, "w") as f: + json.dump(entries, f) + return str(path) + + @patch.object(backfill_module, "write_to_tsv") + @patch.object(backfill_module, "find_all_assembly_versions") + @patch.object(backfill_module, "parse_historical_version") + @patch.object(backfill_module, "utils") + def test_correct_version_selected( + self, mock_utils, mock_parse, mock_find, mock_write, tmp_path + ): + """Only the requested missing version should be parsed, not all versions.""" + mock_utils.load_config.return_value = MagicMock( + meta={"file_name": str(tmp_path / "historical.tsv")} + ) + mock_find.return_value = [ + {"accession": "GCA_000222935.1"}, + {"accession": "GCA_000222935.2"}, + ] + mock_parse.return_value = {"genbankAccession": "GCA_000222935.1"} + + missing_json = self._write_missing_json(tmp_path, [ + { + "base_accession": "GCA_000222935", + "missing_version": 1, + "new_accession": "GCA_000222935.2", + } + ]) + backfill_missing_versions( + missing_json=missing_json, + yaml_path=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + ) + mock_parse.assert_called_once() + parsed_version_data = mock_parse.call_args[1]["version_data"] + assert parsed_version_data["accession"] == "GCA_000222935.1" + + @patch.object(backfill_module, "write_to_tsv") + @patch.object(backfill_module, "find_all_assembly_versions") + @patch.object(backfill_module, "parse_historical_version") + @patch.object(backfill_module, "utils") + def test_merges_with_existing_historical( + self, mock_utils, mock_parse, mock_find, mock_write, tmp_path + ): + """New rows must be merged with existing historical rows before writing.""" + historical_tsv = tmp_path / "historical.tsv" + write_tsv(historical_tsv, [ + {"genbankAccession": "GCA_000412225.1", "version_status": "superseded"} + ]) + mock_utils.load_config.return_value = MagicMock( + meta={"file_name": str(historical_tsv)} + ) + mock_find.return_value = [{"accession": "GCA_000222935.1"}] + mock_parse.return_value = {"genbankAccession": "GCA_000222935.1"} + + missing_json = self._write_missing_json(tmp_path, [ + { + "base_accession": "GCA_000222935", + "missing_version": 1, + "new_accession": "GCA_000222935.2", + } + ]) + backfill_missing_versions( + missing_json=missing_json, + yaml_path=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + ) + mock_write.assert_called_once() + written_parsed = mock_write.call_args[0][0] + assert "GCA_000412225.1" in written_parsed + assert "GCA_000222935.1" in written_parsed + + @patch.object(backfill_module, "write_to_tsv") + @patch.object(backfill_module, "find_all_assembly_versions") + @patch.object(backfill_module, "utils") + def test_no_write_when_version_not_found_in_ftp( + self, mock_utils, mock_find, mock_write, tmp_path + ): + """If the FTP listing does not include the missing version, skip silently.""" + mock_utils.load_config.return_value = MagicMock( + meta={"file_name": str(tmp_path / "historical.tsv")} + ) + mock_find.return_value = [{"accession": "GCA_000222935.2"}] + + missing_json = self._write_missing_json(tmp_path, [ + { + "base_accession": "GCA_000222935", + "missing_version": 1, + "new_accession": "GCA_000222935.2", + } + ]) + backfill_missing_versions( + missing_json=missing_json, + yaml_path=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + ) + mock_write.assert_not_called() + + @patch.object(backfill_module, "write_to_tsv") + @patch.object(backfill_module, "parse_historical_version") + @patch.object(backfill_module, "find_all_assembly_versions") + @patch.object(backfill_module, "utils") + def test_no_versions_returned_from_ftp( + self, mock_utils, mock_find, mock_parse, mock_write, tmp_path + ): + """If FTP returns an empty list, skip gracefully without parsing or writing.""" + mock_utils.load_config.return_value = MagicMock( + meta={"file_name": str(tmp_path / "historical.tsv")} + ) + mock_find.return_value = [] + + missing_json = self._write_missing_json(tmp_path, [ + { + "base_accession": "GCA_000222935", + "missing_version": 1, + "new_accession": "GCA_000222935.2", + } + ]) + backfill_missing_versions( + missing_json=missing_json, + yaml_path=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + ) + mock_parse.assert_not_called() + mock_write.assert_not_called() + + @patch.object(backfill_module, "write_to_tsv") + @patch.object(backfill_module, "find_all_assembly_versions") + @patch.object(backfill_module, "parse_historical_version") + @patch.object(backfill_module, "utils") + def test_partial_parse_failure_writes_successful_rows( + self, mock_utils, mock_parse, mock_find, mock_write, tmp_path + ): + """If one entry fails to parse, the successfully parsed ones are still written.""" + mock_utils.load_config.return_value = MagicMock( + meta={"file_name": str(tmp_path / "historical.tsv")} + ) + mock_find.side_effect = [ + [{"accession": "GCA_000222935.1"}, {"accession": "GCA_000222935.2"}], + [{"accession": "GCA_000412225.1"}, {"accession": "GCA_000412225.2"}], + ] + mock_parse.side_effect = [ + {"genbankAccession": "GCA_000222935.1"}, + ValueError("simulated parse failure"), + ] + + missing_json = self._write_missing_json(tmp_path, [ + { + "base_accession": "GCA_000222935", + "missing_version": 1, + "new_accession": "GCA_000222935.2", + }, + { + "base_accession": "GCA_000412225", + "missing_version": 1, + "new_accession": "GCA_000412225.2", + }, + ]) + backfill_missing_versions( + missing_json=missing_json, + yaml_path=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + ) + mock_write.assert_called_once() + written = mock_write.call_args[0][0] + assert "GCA_000222935.1" in written + assert "GCA_000412225.1" not in written + + @patch.object(backfill_module, "write_to_tsv") + @patch.object(backfill_module, "find_all_assembly_versions") + @patch.object(backfill_module, "utils") + def test_empty_missing_json_no_op( + self, mock_utils, mock_find, mock_write, tmp_path + ): + """An empty missing_versions.json should not call write_to_tsv.""" + mock_utils.load_config.return_value = MagicMock( + meta={"file_name": str(tmp_path / "historical.tsv")} + ) + missing_json = self._write_missing_json(tmp_path, []) + backfill_missing_versions( + missing_json=missing_json, + yaml_path=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + ) + mock_write.assert_not_called() + mock_find.assert_not_called() + + +# --------------------------------------------------------------------------- +# TestBackfillMissingVersionsWrapper +# --------------------------------------------------------------------------- + +class TestBackfillMissingVersionsWrapper: + """backfill_missing_versions_wrapper delegates to the flow correctly.""" + + @patch.object(backfill_module, "backfill_missing_versions") + def test_delegates_to_flow(self, mock_flow, tmp_path): + """Wrapper locates missing_versions.json and passes it to the flow.""" + missing_json = tmp_path / "missing_versions.json" + missing_json.write_text("[]", encoding="utf-8") + + backfill_missing_versions_wrapper( + working_yaml=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + append=False, + ) + + mock_flow.assert_called_once_with( + missing_json=str(missing_json), + yaml_path=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + ) + + def test_raises_when_missing_json_absent(self, tmp_path): + """Wrapper raises FileNotFoundError if missing_versions.json does not exist.""" + with pytest.raises(FileNotFoundError): + backfill_missing_versions_wrapper( + working_yaml=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + append=False, + ) + + +# --------------------------------------------------------------------------- +# TestBackfillMissingVersionsPlugin +# --------------------------------------------------------------------------- + +class TestBackfillMissingVersionsPlugin: + """plugin() returns a correctly configured Parser.""" + + def test_plugin_returns_parser(self): + result = backfill_module.plugin() + assert isinstance(result, Parser) + assert result.name == "BACKFILL_MISSING_VERSIONS" + assert result.func is backfill_missing_versions_wrapper From 5f79605f85bd24b42db881a4cf39c1e437171ae3 Mon Sep 17 00:00:00 2001 From: Fang Chen Date: Fri, 24 Apr 2026 09:41:38 -0700 Subject: [PATCH 2/3] Rename and reorganise Phase 1 files per Rich's review feedback --- ...remental.py => parse_assembly_versions.py} | 28 +++--- .../update_assembly_versions.py} | 86 ++++++----------- ...al_update.py => test_assembly_versions.py} | 96 ++++++------------- 3 files changed, 74 insertions(+), 136 deletions(-) rename flows/parsers/{update_historical_incremental.py => parse_assembly_versions.py} (94%) rename flows/{parsers/backfill_missing_versions.py => updaters/update_assembly_versions.py} (69%) rename tests/{test_incremental_update.py => test_assembly_versions.py} (88%) diff --git a/flows/parsers/update_historical_incremental.py b/flows/parsers/parse_assembly_versions.py similarity index 94% rename from flows/parsers/update_historical_incremental.py rename to flows/parsers/parse_assembly_versions.py index 3de0253..01243fd 100644 --- a/flows/parsers/update_historical_incremental.py +++ b/flows/parsers/parse_assembly_versions.py @@ -1,9 +1,11 @@ """Daily incremental updates to historical assembly records. -Identifies assembly versions newly superseded since the last run and appends them to assembly_historical.tsv. No NCBI fetches are required — data is copied directly from the previous assembly_current.tsv parse output. +Identifies assembly versions newly superseded since the last run and appends +them to assembly_historical.tsv. No NCBI fetches are required — data is +copied directly from the previous assembly_current.tsv parse output. Usage: - python -m flows.parsers.update_historical_incremental \\ + python -m flows.parsers.parse_assembly_versions \\ --input_path assembly_data_report.jsonl \\ --previous_tsv assembly_current.tsv.previous \\ --historical_tsv outputs/assembly_historical.tsv @@ -264,11 +266,11 @@ def print_missing_versions_warning(missing: list[dict]) -> None: if len(missing) > 5: print(f" ... and {len(missing) - 5} more") print("\n To backfill missing versions, run:") - print(" python -m flows.parsers.backfill_missing_versions") + print(" python -m flows.updaters.update_assembly_versions") @flow(log_prints=True) -def run_incremental_historical_update( +def parse_assembly_versions( new_jsonl: str, previous_tsv: str, historical_tsv: str, @@ -289,7 +291,7 @@ def run_incremental_historical_update( """ separator = "=" * 80 print(f"\n{separator}") - print("INCREMENTAL HISTORICAL UPDATE") + print("ASSEMBLY VERSION PARSE") print(f"{separator}\n") print("[1/3] Loading previous parsed results...") @@ -314,7 +316,7 @@ def run_incremental_historical_update( print(f"\n{separator}") print( - f"INCREMENTAL UPDATE COMPLETE " + f"ASSEMBLY VERSION PARSE COMPLETE " f"Superseded: {len(newly_superseded)} " f"Missing: {len(missing)}" ) @@ -327,7 +329,7 @@ def run_incremental_historical_update( } -def incremental_update_wrapper( +def parse_assembly_versions_wrapper( working_yaml: str, work_dir: str, append: bool, @@ -337,7 +339,7 @@ def incremental_update_wrapper( """Wrapper matching the fetch_parse_validate parser signature. Derives the previous TSV and historical TSV paths from work_dir and - delegates to run_incremental_historical_update. + delegates to parse_assembly_versions. Args: working_yaml (str): Path to the working YAML file (unused; accepted @@ -356,7 +358,7 @@ def incremental_update_wrapper( if len(paths) > 1: raise ValueError(f"More than one jsonl file found in {work_dir}") - results = run_incremental_historical_update( + results = parse_assembly_versions( new_jsonl=paths[0], previous_tsv=os.path.join(work_dir, "assembly_current.tsv.previous"), historical_tsv=os.path.join(work_dir, "assembly_historical.tsv"), @@ -372,8 +374,8 @@ def incremental_update_wrapper( def plugin() -> Parser: """Register the flow.""" return Parser( - name="UPDATE_HISTORICAL_INCREMENTAL", - func=incremental_update_wrapper, + name="PARSE_ASSEMBLY_VERSIONS", + func=parse_assembly_versions_wrapper, description="Daily incremental update of historical assembly records.", ) @@ -383,7 +385,7 @@ def plugin() -> Parser: [required(INPUT_PATH), required(PREVIOUS_TSV), required(HISTORICAL_TSV)], description="Daily incremental update of historical assembly records", ) - results = run_incremental_historical_update( + results = parse_assembly_versions( new_jsonl=args.input_path, previous_tsv=args.previous_tsv, historical_tsv=args.historical_tsv, @@ -398,4 +400,4 @@ def plugin() -> Parser: f" Action needed: {results['missing_versions_count']} missing versions." ) print(f" Written to: {missing_json_path}") - print(" Run: python -m flows.parsers.backfill_missing_versions") + print(" Run: python -m flows.updaters.update_assembly_versions") diff --git a/flows/parsers/backfill_missing_versions.py b/flows/updaters/update_assembly_versions.py similarity index 69% rename from flows/parsers/backfill_missing_versions.py rename to flows/updaters/update_assembly_versions.py index d153097..fb86f9d 100644 --- a/flows/parsers/backfill_missing_versions.py +++ b/flows/updaters/update_assembly_versions.py @@ -1,12 +1,12 @@ -"""Targeted backfill for assembly versions missing from historical records. +"""Fetch assembly versions missing from historical records. -Run this when the incremental updater reports assemblies whose previous +Run this when parse_assembly_versions reports assemblies whose previous version was absent from the previous parsed TSV. Fetches only the specified -missing versions from NCBI, parses them, and merges the result into the +missing versions from NCBI FTP, parses them, and merges the result into the existing assembly_historical.tsv. Usage: - python -m flows.parsers.backfill_missing_versions \\ + python -m flows.updaters.update_assembly_versions \\ --missing_json tmp/missing_versions.json \\ --yaml_path configs/assembly_historical.types.yaml \\ --work_dir tmp @@ -19,11 +19,10 @@ from typing import Optional from flows.lib import utils -from flows.lib.conditional_import import flow +from flows.lib.conditional_import import emit_event, flow from flows.lib.shared_args import WORK_DIR, YAML_PATH from flows.lib.shared_args import parse_args as _parse_args from flows.lib.shared_args import required -from flows.lib.utils import Parser from flows.parsers.parse_backfill_historical_versions import ( find_all_assembly_versions, parse_historical_version, @@ -35,7 +34,7 @@ MISSING_JSON = { "flags": ["-m", "--missing_json"], "keys": { - "help": "Path to the missing_versions.json produced by the incremental updater.", + "help": "Path to the missing_versions.json produced by parse_assembly_versions.", "type": str, }, } @@ -44,8 +43,8 @@ def load_missing_versions(missing_json: str) -> list[dict]: """Load the list of missing versions from a JSON file. - The file is written by run_incremental_historical_update when it - encounters assemblies with no matching previous version in the parsed TSV. + The file is written by parse_assembly_versions when it encounters + assemblies with no matching previous version in the parsed TSV. Args: missing_json (str): Path to missing_versions.json. @@ -81,7 +80,7 @@ def load_existing_historical(historical_tsv: str) -> dict[str, dict]: @flow(log_prints=True) -def backfill_missing_versions( +def update_assembly_versions( missing_json: str, yaml_path: str, work_dir: str = ".", @@ -91,11 +90,10 @@ def backfill_missing_versions( For each entry in missing_json, discovers all versions of that assembly via NCBI FTP, fetches metadata for the specific missing version, parses it through the standard GenomeHubs pipeline, and merges the result into the - existing assembly_historical.tsv. + existing assembly_historical.tsv. Emits a completion event on finish. Args: - missing_json (str): Path to missing_versions.json from the incremental - updater. + missing_json (str): Path to missing_versions.json from parse_assembly_versions. yaml_path (str): Path to assembly_historical.types.yaml. work_dir (str): Working directory for caches and output. """ @@ -105,6 +103,14 @@ def backfill_missing_versions( missing = load_missing_versions(missing_json) if not missing: print("No missing versions to backfill.") + emit_event( + event="update.assembly_versions.completed", + resource={ + "prefect.resource.id": f"update.assembly_versions.{work_dir}", + "prefect.resource.type": "assembly.versions", + }, + payload={"succeeded": 0, "failed": 0, "status": "no_op"}, + ) return historical_tsv = config.meta["file_name"] @@ -117,7 +123,7 @@ def backfill_missing_versions( separator = "=" * 80 print(f"\n{separator}") - print("MISSING VERSION BACKFILL") + print("ASSEMBLY VERSION UPDATE") print(f"{separator}") print(f" Missing entries to process: {total}") print(f" Merging into: {historical_tsv}") @@ -170,7 +176,7 @@ def backfill_missing_versions( write_to_tsv(parsed, config) print(f"\n{separator}") - print("MISSING VERSION BACKFILL COMPLETE") + print("ASSEMBLY VERSION UPDATE COMPLETE") print(f"{separator}") print(f" Succeeded: {succeeded}/{total}") if failed > 0: @@ -178,54 +184,22 @@ def backfill_missing_versions( print(f" Total records in {historical_tsv}: {len(parsed)}") print(f"{separator}\n") - -def backfill_missing_versions_wrapper( - working_yaml: str, - work_dir: str, - append: bool, - data_freeze_path: Optional[str] = None, - **kwargs, -) -> None: - """Wrapper matching the fetch_parse_validate parser signature. - - Locates missing_versions.json in work_dir and delegates to - backfill_missing_versions. - - Args: - working_yaml (str): Path to the working YAML file. - work_dir (str): Path to the working directory containing - missing_versions.json. - append (bool): Unused; accepted for pipeline compatibility. - data_freeze_path (str, optional): Unused; accepted for pipeline - compatibility. - **kwargs: Additional keyword arguments passed by the pipeline. - """ - missing_json_path = os.path.join(work_dir, "missing_versions.json") - if not Path(missing_json_path).exists(): - raise FileNotFoundError(f"No missing_versions.json found in {work_dir}") - - backfill_missing_versions( - missing_json=missing_json_path, - yaml_path=working_yaml, - work_dir=work_dir, - ) - - -def plugin() -> Parser: - """Register the flow.""" - return Parser( - name="BACKFILL_MISSING_VERSIONS", - func=backfill_missing_versions_wrapper, - description="Targeted backfill for assembly versions missing from historical records.", + emit_event( + event="update.assembly_versions.completed", + resource={ + "prefect.resource.id": f"update.assembly_versions.{work_dir}", + "prefect.resource.type": "assembly.versions", + }, + payload={"succeeded": succeeded, "failed": failed, "status": "success"}, ) if __name__ == "__main__": args = _parse_args( [required(MISSING_JSON), required(YAML_PATH), WORK_DIR], - description="Targeted backfill for assembly versions missing from historical records", + description="Fetch assembly versions missing from historical records", ) - backfill_missing_versions( + update_assembly_versions( missing_json=args.missing_json, yaml_path=args.yaml_path, work_dir=args.work_dir, diff --git a/tests/test_incremental_update.py b/tests/test_assembly_versions.py similarity index 88% rename from tests/test_incremental_update.py rename to tests/test_assembly_versions.py index 9d05642..8dbd37e 100644 --- a/tests/test_incremental_update.py +++ b/tests/test_assembly_versions.py @@ -1,13 +1,13 @@ -"""Tests for update_historical_incremental.py and backfill_missing_versions.py +"""Tests for parse_assembly_versions.py and update_assembly_versions.py Covers: - Loading and indexing previous parsed TSV results - Building superseded and missing-version records - Core supersession detection logic (superseded, missing-with-gap, new-series, v1-skip) - Appending to historical TSV with deduplication -- Incremental orchestrator flow behaviour -- Loading existing historical TSV (backfill helper) -- Backfill flow: version selection and TSV merge +- Parser orchestrator flow behaviour +- Loading existing historical TSV (updater helper) +- Updater flow: version selection and TSV merge """ import csv @@ -23,23 +23,20 @@ os.environ["SKIP_PREFECT"] = "true" -from flows.parsers import ( # noqa: E402 - backfill_missing_versions as backfill_module, - update_historical_incremental as incremental_module, -) from flows.lib.utils import Parser # noqa: E402 -from flows.parsers.backfill_missing_versions import ( # noqa: E402 - backfill_missing_versions, - backfill_missing_versions_wrapper, - load_existing_historical, -) -from flows.parsers.update_historical_incremental import ( # noqa: E402 +from flows.parsers import parse_assembly_versions as incremental_module # noqa: E402 +from flows.parsers.parse_assembly_versions import ( # noqa: E402 append_superseded_to_tsv, build_missing_version_record, build_superseded_row, identify_newly_superseded, load_previous_parsed_by_base, - run_incremental_historical_update, + parse_assembly_versions, +) +from flows.updaters import update_assembly_versions as backfill_module # noqa: E402 +from flows.updaters.update_assembly_versions import ( # noqa: E402 + load_existing_historical, + update_assembly_versions, ) @@ -296,12 +293,12 @@ def test_no_op_when_empty_list(self, tmp_path): # --------------------------------------------------------------------------- class TestIncrementalOrchestrator: - """run_incremental_historical_update orchestrator behaviour.""" + """parse_assembly_versions orchestrator behaviour.""" def test_no_previous_tsv_returns_empty_result(self, tmp_path): jsonl = tmp_path / "new.jsonl" write_jsonl(jsonl, [{"accession": "GCA_000222935.2"}]) - result = run_incremental_historical_update( + result = parse_assembly_versions( new_jsonl=str(jsonl), previous_tsv=str(tmp_path / "nope.tsv"), historical_tsv=str(tmp_path / "historical.tsv"), @@ -319,7 +316,7 @@ def test_one_superseded_produces_correct_counts(self, tmp_path): write_jsonl(jsonl, [ {"accession": "GCA_000222935.2", "releaseDate": "2024-01-15"} ]) - result = run_incremental_historical_update( + result = parse_assembly_versions( new_jsonl=str(jsonl), previous_tsv=str(previous_tsv), historical_tsv=str(tmp_path / "historical.tsv"), @@ -337,7 +334,7 @@ def test_missing_version_detected_in_orchestrator_result(self, tmp_path): write_jsonl(jsonl, [ {"accession": "GCA_000222935.3", "releaseDate": "2024-06-01"} ]) - result = run_incremental_historical_update( + result = parse_assembly_versions( new_jsonl=str(jsonl), previous_tsv=str(previous_tsv), historical_tsv=str(tmp_path / "historical.tsv"), @@ -356,7 +353,7 @@ def test_historical_tsv_written(self, tmp_path): {"accession": "GCA_000222935.2", "releaseDate": "2024-01-15"} ]) historical_tsv = tmp_path / "historical.tsv" - run_incremental_historical_update( + parse_assembly_versions( new_jsonl=str(jsonl), previous_tsv=str(previous_tsv), historical_tsv=str(historical_tsv), @@ -403,7 +400,7 @@ def test_row_data_preserved(self, tmp_path): # --------------------------------------------------------------------------- class TestBackfillMissingVersionsFlow: - """backfill_missing_versions selects the right version and merges into TSV.""" + """update_assembly_versions selects the right version and merges into TSV.""" def _write_missing_json(self, tmp_path, entries): path = tmp_path / "missing.json" @@ -435,7 +432,7 @@ def test_correct_version_selected( "new_accession": "GCA_000222935.2", } ]) - backfill_missing_versions( + update_assembly_versions( missing_json=missing_json, yaml_path=str(tmp_path / "config.yaml"), work_dir=str(tmp_path), @@ -469,7 +466,7 @@ def test_merges_with_existing_historical( "new_accession": "GCA_000222935.2", } ]) - backfill_missing_versions( + update_assembly_versions( missing_json=missing_json, yaml_path=str(tmp_path / "config.yaml"), work_dir=str(tmp_path), @@ -498,7 +495,7 @@ def test_no_write_when_version_not_found_in_ftp( "new_accession": "GCA_000222935.2", } ]) - backfill_missing_versions( + update_assembly_versions( missing_json=missing_json, yaml_path=str(tmp_path / "config.yaml"), work_dir=str(tmp_path), @@ -525,7 +522,7 @@ def test_no_versions_returned_from_ftp( "new_accession": "GCA_000222935.2", } ]) - backfill_missing_versions( + update_assembly_versions( missing_json=missing_json, yaml_path=str(tmp_path / "config.yaml"), work_dir=str(tmp_path), @@ -565,7 +562,7 @@ def test_partial_parse_failure_writes_successful_rows( "new_accession": "GCA_000412225.2", }, ]) - backfill_missing_versions( + update_assembly_versions( missing_json=missing_json, yaml_path=str(tmp_path / "config.yaml"), work_dir=str(tmp_path), @@ -586,7 +583,7 @@ def test_empty_missing_json_no_op( meta={"file_name": str(tmp_path / "historical.tsv")} ) missing_json = self._write_missing_json(tmp_path, []) - backfill_missing_versions( + update_assembly_versions( missing_json=missing_json, yaml_path=str(tmp_path / "config.yaml"), work_dir=str(tmp_path), @@ -596,49 +593,14 @@ def test_empty_missing_json_no_op( # --------------------------------------------------------------------------- -# TestBackfillMissingVersionsWrapper -# --------------------------------------------------------------------------- - -class TestBackfillMissingVersionsWrapper: - """backfill_missing_versions_wrapper delegates to the flow correctly.""" - - @patch.object(backfill_module, "backfill_missing_versions") - def test_delegates_to_flow(self, mock_flow, tmp_path): - """Wrapper locates missing_versions.json and passes it to the flow.""" - missing_json = tmp_path / "missing_versions.json" - missing_json.write_text("[]", encoding="utf-8") - - backfill_missing_versions_wrapper( - working_yaml=str(tmp_path / "config.yaml"), - work_dir=str(tmp_path), - append=False, - ) - - mock_flow.assert_called_once_with( - missing_json=str(missing_json), - yaml_path=str(tmp_path / "config.yaml"), - work_dir=str(tmp_path), - ) - - def test_raises_when_missing_json_absent(self, tmp_path): - """Wrapper raises FileNotFoundError if missing_versions.json does not exist.""" - with pytest.raises(FileNotFoundError): - backfill_missing_versions_wrapper( - working_yaml=str(tmp_path / "config.yaml"), - work_dir=str(tmp_path), - append=False, - ) - - -# --------------------------------------------------------------------------- -# TestBackfillMissingVersionsPlugin +# TestParseAssemblyVersionsPlugin # --------------------------------------------------------------------------- -class TestBackfillMissingVersionsPlugin: +class TestParseAssemblyVersionsPlugin: """plugin() returns a correctly configured Parser.""" def test_plugin_returns_parser(self): - result = backfill_module.plugin() + result = incremental_module.plugin() assert isinstance(result, Parser) - assert result.name == "BACKFILL_MISSING_VERSIONS" - assert result.func is backfill_missing_versions_wrapper + assert result.name == "PARSE_ASSEMBLY_VERSIONS" + assert result.func is incremental_module.parse_assembly_versions_wrapper From 2a5ef51950bba0cf65a51dd30ecd5e85fc72ced6 Mon Sep 17 00:00:00 2001 From: Fang Chen Date: Fri, 5 Jun 2026 09:38:51 -0700 Subject: [PATCH 3/3] updates on regulating parser and updater behavior. --- flows/lib/assembly_versions_utils.py | 228 +++++++++++++++ flows/lib/shared_args.py | 8 + flows/parsers/parse_assembly_versions.py | 49 ++-- .../parse_backfill_historical_versions.py | 227 +-------------- flows/updaters/update_assembly_versions.py | 156 +++-------- tests/test_assembly_versions.py | 261 ++++++------------ 6 files changed, 390 insertions(+), 539 deletions(-) create mode 100644 flows/lib/assembly_versions_utils.py diff --git a/flows/lib/assembly_versions_utils.py b/flows/lib/assembly_versions_utils.py new file mode 100644 index 0000000..070e7c9 --- /dev/null +++ b/flows/lib/assembly_versions_utils.py @@ -0,0 +1,228 @@ +"""Shared utilities for assembly version discovery and fetching. + +Used by both the backfill parser and the incremental updater to discover +assembly versions via NCBI FTP and fetch per-version metadata. +""" + +import json +import os +import re +import time + +from flows.lib import utils + +ACCESSION_PATTERN = re.compile(r"^GC[AF]_\d{9}\.\d+$") + + +def parse_version(accession: str) -> int: + """Extract the version number from a dotted accession string. + + Args: + accession (str): e.g. GCA_000002035.3 + + Returns: + int: Version number (defaults to 1 if no dot-suffix). + """ + parts = accession.split(".") + return int(parts[1]) if len(parts) > 1 else 1 + + +def parse_accession(accession: str) -> tuple[str, int]: + """Split an accession into its base and version components. + + Args: + accession (str): e.g. GCA_000002035.3 + + Returns: + tuple: (base_accession, version_number). + """ + parts = accession.split(".") + return parts[0], int(parts[1]) if len(parts) > 1 else 1 + + +def setup_cache_directories(work_dir: str) -> None: + """Create cache directory structure under work_dir. + + Args: + work_dir (str): Path to the working directory. + """ + for subdir in ("version_discovery", "metadata"): + os.makedirs( + os.path.join(work_dir, "backfill_cache", subdir), exist_ok=True + ) + + +def get_cache_path(work_dir: str, cache_type: str, identifier: str) -> str: + """Generate a human-readable cache file path. + + Args: + work_dir (str): Path to the working directory. + cache_type (str): Cache category (version_discovery or metadata). + identifier (str): Accession string used as the filename stem. + + Returns: + str: Path to the JSON cache file. + """ + safe_id = re.sub(r"[^A-Za-z0-9_.-]", "_", identifier) + return os.path.join(work_dir, "backfill_cache", cache_type, f"{safe_id}.json") + + +def load_from_cache(cache_path: str, max_age_days: int = 30) -> dict: + """Load data from cache if it exists and is recent enough. + + Args: + cache_path (str): Path to the cache JSON file. + max_age_days (int): Maximum acceptable age in days. + + Returns: + dict: Cached data, or empty dict on miss/expiry. + """ + try: + if os.path.exists(cache_path): + cache_age = time.time() - os.path.getmtime(cache_path) + if cache_age < (max_age_days * 24 * 3600): + with open(cache_path, "r", encoding="utf-8") as f: + return json.load(f) + except Exception as e: + print(f" Warning: Could not load cache from {cache_path}: {e}") + return {} + + +def save_to_cache(cache_path: str, data: dict) -> None: + """Save data to a cache file, creating parent dirs as needed. + + Args: + cache_path (str): Path to the cache JSON file. + data (dict): Data to persist. + """ + try: + os.makedirs(os.path.dirname(cache_path), exist_ok=True) + with open(cache_path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2, ensure_ascii=False) + except Exception as e: + print(f" Warning: Could not save cache to {cache_path}: {e}") + + +def discover_version_accessions(base_accession: str, work_dir: str) -> list[str]: + """Discover all versioned accessions for a base assembly via NCBI FTP. + + Args: + base_accession (str): Full accession (e.g. GCA_000002035.3). + work_dir (str): Working directory for cache storage. + + Returns: + list: Sorted list of versioned accession strings. + """ + import requests + + base_match = re.match(r"(GC[AF]_\d+)", base_accession) + if not base_match: + return [] + + base = base_match.group(1) + setup_cache_directories(work_dir) + cache_path = get_cache_path(work_dir, "version_discovery", base) + cached = load_from_cache(cache_path, max_age_days=7) + + if cached and "accessions" in cached: + print(f" Using cached version list for {base}") + return cached["accessions"] + + print(f" Discovering versions for {base} via FTP") + ftp_url = ( + f"https://ftp.ncbi.nlm.nih.gov/genomes/all/" + f"{base[:3]}/{base[4:7]}/{base[7:10]}/{base[10:13]}/" + ) + + try: + response = requests.get(ftp_url, timeout=30) + if response.status_code != 200: + print(f" Warning: FTP query failed for {base}") + return [] + except Exception as e: + print(f" Error querying FTP for {base}: {e}") + return [] + + version_pattern = rf"{re.escape(base)}\.\d+" + accessions = sorted(set(re.findall(version_pattern, response.text))) + + save_to_cache(cache_path, { + "accessions": accessions, + "base_accession": base, + "ftp_url": ftp_url, + }) + return accessions + + +def fetch_version_metadata(version_acc: str, work_dir: str) -> dict: + """Fetch NCBI datasets metadata for a single assembly version. + + Uses utils.run_quoted to safely invoke the datasets CLI. Results are + cached for 30 days. + + Args: + version_acc (str): Versioned accession (e.g. GCA_000002035.1). + work_dir (str): Working directory for cache storage. + + Returns: + dict: Metadata dict, or empty dict on failure. + """ + cache_path = get_cache_path(work_dir, "metadata", version_acc) + cached = load_from_cache(cache_path, max_age_days=30) + + if cached and "metadata" in cached: + return cached["metadata"] + + if not ACCESSION_PATTERN.match(version_acc): + print(f" Skipping unexpected accession format: {version_acc}") + return {} + + cmd = [ + "datasets", "summary", "genome", "accession", + version_acc, "--as-json-lines", + ] + try: + result = utils.run_quoted( + cmd, + capture_output=True, + text=True, + encoding="utf-8", + errors="ignore", + timeout=60, + ) + if result.returncode == 0 and result.stdout and result.stdout.strip(): + version_data = json.loads(result.stdout.strip()) + save_to_cache(cache_path, { + "metadata": version_data, + "cached_at": time.time(), + }) + return version_data + + print(f" Warning: No metadata for {version_acc}") + except Exception as e: + print(f" Warning: Error fetching {version_acc}: {e}") + + return {} + + +def find_all_assembly_versions(base_accession: str, work_dir: str) -> list[dict]: + """Discover all versions and fetch metadata for each. + + Delegates to discover_version_accessions for FTP discovery and + fetch_version_metadata for per-version metadata retrieval. Both layers + use independent caches. + + Args: + base_accession (str): Full accession (e.g. GCA_000002035.3). + work_dir (str): Working directory for cache storage. + + Returns: + list: List of metadata dicts, one per version found. + """ + accessions = discover_version_accessions(base_accession, work_dir) + versions = [] + for version_acc in accessions: + metadata = fetch_version_metadata(version_acc, work_dir) + if metadata: + versions.append(metadata) + return versions diff --git a/flows/lib/shared_args.py b/flows/lib/shared_args.py index 4d5d801..b8087fc 100644 --- a/flows/lib/shared_args.py +++ b/flows/lib/shared_args.py @@ -182,6 +182,14 @@ }, } +MISSING_JSON = { + "flags": ["-m", "--missing_json"], + "keys": { + "help": "Path to missing_versions.json produced by parse_assembly_versions.", + "type": str, + }, +} + WORK_DIR = { "flags": ["-w", "--work_dir"], "keys": { diff --git a/flows/parsers/parse_assembly_versions.py b/flows/parsers/parse_assembly_versions.py index 01243fd..5b2f4bb 100644 --- a/flows/parsers/parse_assembly_versions.py +++ b/flows/parsers/parse_assembly_versions.py @@ -6,9 +6,7 @@ Usage: python -m flows.parsers.parse_assembly_versions \\ - --input_path assembly_data_report.jsonl \\ - --previous_tsv assembly_current.tsv.previous \\ - --historical_tsv outputs/assembly_historical.tsv + --input_path assembly_data_report.jsonl """ import csv @@ -18,28 +16,29 @@ from pathlib import Path from typing import Optional +from flows.lib.assembly_versions_utils import parse_accession from flows.lib.conditional_import import flow from flows.lib.shared_args import INPUT_PATH from flows.lib.shared_args import parse_args as _parse_args from flows.lib.shared_args import required from flows.lib.utils import Parser -from flows.parsers.parse_backfill_historical_versions import parse_accession -PREVIOUS_TSV = { - "flags": ["-p", "--previous_tsv"], - "keys": { - "help": "Path to assembly_current.tsv from the previous run.", - "type": str, - }, -} +def derive_assembly_version_paths(input_path: str) -> tuple[str, str]: + """Derive previous_tsv and historical_tsv paths from the input JSONL path. -HISTORICAL_TSV = { - "flags": ["-H", "--historical_tsv"], - "keys": { - "help": "Path to assembly_historical.tsv to update.", - "type": str, - }, -} + Both files are assumed to live alongside the JSONL in the same directory, + following the same convention used by parse_ncbi_assemblies. + + Args: + input_path (str): Path to the current assembly_data_report.jsonl. + + Returns: + tuple: (previous_tsv, historical_tsv) absolute paths. + """ + work_dir = os.path.dirname(os.path.abspath(input_path)) + previous_tsv = os.path.join(work_dir, "assembly_current.tsv.previous") + historical_tsv = os.path.join(work_dir, "assembly_historical.tsv") + return previous_tsv, historical_tsv def load_previous_parsed_by_base(previous_tsv: str) -> dict[str, dict[int, dict]]: @@ -358,10 +357,11 @@ def parse_assembly_versions_wrapper( if len(paths) > 1: raise ValueError(f"More than one jsonl file found in {work_dir}") + previous_tsv, historical_tsv = derive_assembly_version_paths(paths[0]) results = parse_assembly_versions( new_jsonl=paths[0], - previous_tsv=os.path.join(work_dir, "assembly_current.tsv.previous"), - historical_tsv=os.path.join(work_dir, "assembly_historical.tsv"), + previous_tsv=previous_tsv, + historical_tsv=historical_tsv, ) if results["missing_versions_count"] > 0: @@ -382,18 +382,19 @@ def plugin() -> Parser: if __name__ == "__main__": args = _parse_args( - [required(INPUT_PATH), required(PREVIOUS_TSV), required(HISTORICAL_TSV)], + [required(INPUT_PATH)], description="Daily incremental update of historical assembly records", ) + previous_tsv, historical_tsv = derive_assembly_version_paths(args.input_path) results = parse_assembly_versions( new_jsonl=args.input_path, - previous_tsv=args.previous_tsv, - historical_tsv=args.historical_tsv, + previous_tsv=previous_tsv, + historical_tsv=historical_tsv, ) print(f"Summary: superseded={results['newly_superseded_count']}, " f"missing={results['missing_versions_count']}") if results["missing_versions_count"] > 0: - missing_json_path = Path(args.historical_tsv).parent / "missing_versions.json" + missing_json_path = Path(historical_tsv).parent / "missing_versions.json" with open(missing_json_path, "w", encoding="utf-8") as f: json.dump(results["missing_versions"], f, indent=2) print( diff --git a/flows/parsers/parse_backfill_historical_versions.py b/flows/parsers/parse_backfill_historical_versions.py index cc18b17..4b7acb0 100644 --- a/flows/parsers/parse_backfill_historical_versions.py +++ b/flows/parsers/parse_backfill_historical_versions.py @@ -12,17 +12,20 @@ import json import os -import re -import time from datetime import datetime from glob import glob from pathlib import Path from typing import Optional -import requests from genomehubs import utils as gh_utils from flows.lib import utils +from flows.lib.assembly_versions_utils import ( + find_all_assembly_versions, + parse_accession, + parse_version, + setup_cache_directories, +) from flows.lib.conditional_import import flow from flows.lib.shared_args import INPUT_PATH, WORK_DIR, YAML_PATH from flows.lib.shared_args import parse_args as _parse_args @@ -34,198 +37,6 @@ write_to_tsv, ) -ACCESSION_PATTERN = re.compile(r"^GC[AF]_\d{9}\.\d+$") - - -def setup_cache_directories(work_dir: str): - """Create cache directory structure under work_dir. - - Args: - work_dir (str): Path to the working directory. - """ - for subdir in ("version_discovery", "metadata"): - os.makedirs( - os.path.join(work_dir, "backfill_cache", subdir), exist_ok=True - ) - - -def get_cache_path(work_dir: str, cache_type: str, identifier: str) -> str: - """Generate a human-readable cache file path. - - Args: - work_dir (str): Path to the working directory. - cache_type (str): Cache category (version_discovery or metadata). - identifier (str): Accession string used as the filename stem. - - Returns: - str: Path to the JSON cache file. - """ - safe_id = re.sub(r"[^A-Za-z0-9_.-]", "_", identifier) - return os.path.join(work_dir, "backfill_cache", cache_type, f"{safe_id}.json") - - -def load_from_cache(cache_path: str, max_age_days: int = 30) -> dict: - """Load data from cache if it exists and is recent enough. - - Args: - cache_path (str): Path to the cache JSON file. - max_age_days (int): Maximum acceptable age in days. - - Returns: - dict: Cached data, or empty dict on miss/expiry. - """ - try: - if os.path.exists(cache_path): - cache_age = time.time() - os.path.getmtime(cache_path) - if cache_age < (max_age_days * 24 * 3600): - with open(cache_path, "r", encoding="utf-8") as f: - return json.load(f) - except Exception as e: - print(f" Warning: Could not load cache from {cache_path}: {e}") - return {} - - -def save_to_cache(cache_path: str, data: dict): - """Save data to a cache file, creating parent dirs as needed. - - Args: - cache_path (str): Path to the cache JSON file. - data (dict): Data to persist. - """ - try: - os.makedirs(os.path.dirname(cache_path), exist_ok=True) - with open(cache_path, "w", encoding="utf-8") as f: - json.dump(data, f, indent=2, ensure_ascii=False) - except Exception as e: - print(f" Warning: Could not save cache to {cache_path}: {e}") - - -def discover_version_accessions( - base_accession: str, work_dir: str -) -> list[str]: - """Discover all versioned accessions for a base assembly via NCBI FTP. - - Args: - base_accession (str): Full accession (e.g. GCA_000002035.3). - work_dir (str): Working directory for cache storage. - - Returns: - list: Sorted list of versioned accession strings. - """ - base_match = re.match(r"(GC[AF]_\d+)", base_accession) - if not base_match: - return [] - - base = base_match.group(1) - setup_cache_directories(work_dir) - cache_path = get_cache_path(work_dir, "version_discovery", base) - cached = load_from_cache(cache_path, max_age_days=7) - - if cached and "accessions" in cached: - print(f" Using cached version list for {base}") - return cached["accessions"] - - print(f" Discovering versions for {base} via FTP") - ftp_url = ( - f"https://ftp.ncbi.nlm.nih.gov/genomes/all/" - f"{base[:3]}/{base[4:7]}/{base[7:10]}/{base[10:13]}/" - ) - - try: - response = requests.get(ftp_url, timeout=30) - if response.status_code != 200: - print(f" Warning: FTP query failed for {base}") - return [] - except Exception as e: - print(f" Error querying FTP for {base}: {e}") - return [] - - version_pattern = rf"{re.escape(base)}\.\d+" - accessions = sorted(set(re.findall(version_pattern, response.text))) - - save_to_cache(cache_path, { - "accessions": accessions, - "base_accession": base, - "ftp_url": ftp_url, - }) - return accessions - - -def fetch_version_metadata(version_acc: str, work_dir: str) -> dict: - """Fetch NCBI datasets metadata for a single assembly version. - - Uses utils.run_quoted to safely invoke the datasets CLI. Results are - cached for 30 days. - - Args: - version_acc (str): Versioned accession (e.g. GCA_000002035.1). - work_dir (str): Working directory for cache storage. - - Returns: - dict: Metadata dict, or empty dict on failure. - """ - cache_path = get_cache_path(work_dir, "metadata", version_acc) - cached = load_from_cache(cache_path, max_age_days=30) - - if cached and "metadata" in cached: - return cached["metadata"] - - if not ACCESSION_PATTERN.match(version_acc): - print(f" Skipping unexpected accession format: {version_acc}") - return {} - - cmd = [ - "datasets", "summary", "genome", "accession", - version_acc, "--as-json-lines", - ] - try: - result = utils.run_quoted( - cmd, - capture_output=True, - text=True, - encoding="utf-8", - errors="ignore", - timeout=60, - ) - if result.returncode == 0 and result.stdout and result.stdout.strip(): - version_data = json.loads(result.stdout.strip()) - save_to_cache(cache_path, { - "metadata": version_data, - "cached_at": time.time(), - }) - return version_data - - print(f" Warning: No metadata for {version_acc}") - except Exception as e: - print(f" Warning: Error fetching {version_acc}: {e}") - - return {} - - -def find_all_assembly_versions( - base_accession: str, work_dir: str -) -> list[dict]: - """Discover all versions and fetch metadata for each. - - Delegates to discover_version_accessions for FTP discovery and - fetch_version_metadata for per-version metadata retrieval. Both layers - use independent caches. - - Args: - base_accession (str): Full accession (e.g. GCA_000002035.3). - work_dir (str): Working directory for cache storage. - - Returns: - list: List of metadata dicts, one per version found. - """ - accessions = discover_version_accessions(base_accession, work_dir) - versions = [] - for version_acc in accessions: - metadata = fetch_version_metadata(version_acc, work_dir) - if metadata: - versions.append(metadata) - return versions - def parse_historical_version( version_data: dict, @@ -269,32 +80,6 @@ def parse_historical_version( return gh_utils.parse_report_values(config.parse_fns, processed_report) -def parse_version(accession: str) -> int: - """Extract version number from a dotted accession string. - - Args: - accession (str): e.g. GCA_000002035.3 - - Returns: - int: Version number (defaults to 1 if no dot-suffix). - """ - parts = accession.split(".") - return int(parts[1]) if len(parts) > 1 else 1 - - -def parse_accession(accession: str) -> tuple[str, int]: - """Split an accession into its base and version components. - - Args: - accession (str): e.g. GCA_000002035.3 - - Returns: - tuple: (base_accession, version_number). - """ - parts = accession.split(".") - return parts[0], int(parts[1]) if len(parts) > 1 else 1 - - def derive_checkpoint_path( input_path: str, yaml_path: str, work_dir: str ) -> str: diff --git a/flows/updaters/update_assembly_versions.py b/flows/updaters/update_assembly_versions.py index fb86f9d..b7b36e2 100644 --- a/flows/updaters/update_assembly_versions.py +++ b/flows/updaters/update_assembly_versions.py @@ -1,43 +1,29 @@ -"""Fetch assembly versions missing from historical records. +"""Fetch metadata for assembly versions missing from historical records. Run this when parse_assembly_versions reports assemblies whose previous -version was absent from the previous parsed TSV. Fetches only the specified -missing versions from NCBI FTP, parses them, and merges the result into the -existing assembly_historical.tsv. +version was absent from the previous parsed TSV. Fetches raw NCBI metadata +for each missing accession and writes it to a JSONL file for downstream +parsing by parse_backfill_historical_versions. Usage: python -m flows.updaters.update_assembly_versions \\ --missing_json tmp/missing_versions.json \\ - --yaml_path configs/assembly_historical.types.yaml \\ --work_dir tmp """ -import csv import json import os -from pathlib import Path -from typing import Optional -from flows.lib import utils +from flows.lib.assembly_versions_utils import ( + fetch_version_metadata, + setup_cache_directories, +) from flows.lib.conditional_import import emit_event, flow -from flows.lib.shared_args import WORK_DIR, YAML_PATH +from flows.lib.shared_args import MISSING_JSON, WORK_DIR from flows.lib.shared_args import parse_args as _parse_args from flows.lib.shared_args import required -from flows.parsers.parse_backfill_historical_versions import ( - find_all_assembly_versions, - parse_historical_version, - parse_version, - setup_cache_directories, -) -from flows.parsers.parse_ncbi_assemblies import write_to_tsv -MISSING_JSON = { - "flags": ["-m", "--missing_json"], - "keys": { - "help": "Path to the missing_versions.json produced by parse_assembly_versions.", - "type": str, - }, -} +OUTPUT_JSONL = "missing_assembly_versions.jsonl" def load_missing_versions(missing_json: str) -> list[dict]: @@ -57,131 +43,70 @@ def load_missing_versions(missing_json: str) -> list[dict]: return json.load(f) -def load_existing_historical(historical_tsv: str) -> dict[str, dict]: - """Load an existing assembly_historical.tsv keyed by genbankAccession. - - Args: - historical_tsv (str): Path to the existing historical TSV file. - - Returns: - dict: Rows keyed by genbankAccession, or empty dict if the file is absent. - """ - existing: dict[str, dict] = {} - if not Path(historical_tsv).exists(): - return existing - - with open(historical_tsv, encoding="utf-8") as f: - for row in csv.DictReader(f, delimiter="\t"): - acc = row.get("genbankAccession", "") - if acc: - existing[acc] = dict(row) - - return existing - - @flow(log_prints=True) def update_assembly_versions( missing_json: str, - yaml_path: str, work_dir: str = ".", ) -> None: - """Fetch and parse assembly versions missing from the historical TSV. + """Fetch metadata for missing assembly versions and write to JSONL. - For each entry in missing_json, discovers all versions of that assembly - via NCBI FTP, fetches metadata for the specific missing version, parses it - through the standard GenomeHubs pipeline, and merges the result into the - existing assembly_historical.tsv. Emits a completion event on finish. + For each entry in missing_json, fetches the new accession's raw metadata + from NCBI datasets and writes the results to a JSONL file. The JSONL is + consumed by parse_backfill_historical_versions. Emits a completion event + on finish. Args: missing_json (str): Path to missing_versions.json from parse_assembly_versions. - yaml_path (str): Path to assembly_historical.types.yaml. - work_dir (str): Working directory for caches and output. + work_dir (str): Working directory for cache and JSONL output. """ setup_cache_directories(work_dir) - config = utils.load_config(config_file=yaml_path) - missing = load_missing_versions(missing_json) + if not missing: - print("No missing versions to backfill.") + print("No missing versions to fetch.") emit_event( event="update.assembly_versions.completed", resource={ "prefect.resource.id": f"update.assembly_versions.{work_dir}", "prefect.resource.type": "assembly.versions", }, - payload={"succeeded": 0, "failed": 0, "status": "no_op"}, + payload={"fetched": 0, "failed": 0, "status": "no_op"}, ) return - historical_tsv = config.meta["file_name"] - existing = load_existing_historical(historical_tsv) - parsed = dict(existing) - + output_jsonl = os.path.join(work_dir, OUTPUT_JSONL) total = len(missing) - succeeded = 0 + fetched = 0 failed = 0 separator = "=" * 80 print(f"\n{separator}") print("ASSEMBLY VERSION UPDATE") print(f"{separator}") - print(f" Missing entries to process: {total}") - print(f" Merging into: {historical_tsv}") - print(f" Existing records: {len(existing)}") + print(f" Entries to fetch: {total}") + print(f" Output JSONL: {output_jsonl}") print(f"{separator}\n") - for i, entry in enumerate(missing): - base_acc = entry["base_accession"] - missing_version = entry["missing_version"] - new_accession = entry["new_accession"] - target_acc = f"{base_acc}.{missing_version}" - - print(f"[{i + 1}/{total}] {target_acc}") - - all_versions = find_all_assembly_versions(new_accession, work_dir) - if not all_versions: - print(" Warning: No versions found via FTP — skipping.") - failed += 1 - continue - - version_data = next( - (v for v in all_versions if parse_version(v.get("accession", "")) == missing_version), - None, - ) - if version_data is None: - print(f" Warning: v{missing_version} not found in FTP listing — skipping.") - failed += 1 - continue - - try: - print(f" Parsing v{missing_version}...", end=" ", flush=True) - row = parse_historical_version( - version_data=version_data, - config=config, - base_accession=base_acc, - version_num=missing_version, - current_accession=new_accession, - ) - genbank_acc = row.get("genbankAccession", target_acc) - parsed[genbank_acc] = row - succeeded += 1 - print("done") - except Exception as e: - print(f"failed ({e})") - failed += 1 - continue - - if succeeded > 0: - print(f"\nWriting {len(parsed)} records to {historical_tsv}...") - write_to_tsv(parsed, config) + with open(output_jsonl, "w", encoding="utf-8") as f: + for i, entry in enumerate(missing): + new_accession = entry["new_accession"] + print(f"[{i + 1}/{total}] Fetching {new_accession}...", end=" ", flush=True) + metadata = fetch_version_metadata(new_accession, work_dir) + if metadata: + f.write(json.dumps(metadata) + "\n") + fetched += 1 + print("done") + else: + failed += 1 + print("failed (no metadata returned)") print(f"\n{separator}") print("ASSEMBLY VERSION UPDATE COMPLETE") print(f"{separator}") - print(f" Succeeded: {succeeded}/{total}") + print(f" Fetched: {fetched}/{total}") if failed > 0: - print(f" Failed: {failed}/{total}") - print(f" Total records in {historical_tsv}: {len(parsed)}") + print(f" Failed: {failed}/{total}") + print(f" Output: {output_jsonl}") print(f"{separator}\n") emit_event( @@ -190,17 +115,16 @@ def update_assembly_versions( "prefect.resource.id": f"update.assembly_versions.{work_dir}", "prefect.resource.type": "assembly.versions", }, - payload={"succeeded": succeeded, "failed": failed, "status": "success"}, + payload={"fetched": fetched, "failed": failed, "status": "success"}, ) if __name__ == "__main__": args = _parse_args( - [required(MISSING_JSON), required(YAML_PATH), WORK_DIR], + [required(MISSING_JSON), WORK_DIR], description="Fetch assembly versions missing from historical records", ) update_assembly_versions( missing_json=args.missing_json, - yaml_path=args.yaml_path, work_dir=args.work_dir, ) diff --git a/tests/test_assembly_versions.py b/tests/test_assembly_versions.py index 8dbd37e..f9317c7 100644 --- a/tests/test_assembly_versions.py +++ b/tests/test_assembly_versions.py @@ -6,8 +6,7 @@ - Core supersession detection logic (superseded, missing-with-gap, new-series, v1-skip) - Appending to historical TSV with deduplication - Parser orchestrator flow behaviour -- Loading existing historical TSV (updater helper) -- Updater flow: version selection and TSV merge +- Updater flow: fetch metadata and write JSONL """ import csv @@ -29,13 +28,14 @@ append_superseded_to_tsv, build_missing_version_record, build_superseded_row, + derive_assembly_version_paths, identify_newly_superseded, load_previous_parsed_by_base, parse_assembly_versions, ) -from flows.updaters import update_assembly_versions as backfill_module # noqa: E402 +from flows.updaters import update_assembly_versions as updater_module # noqa: E402 from flows.updaters.update_assembly_versions import ( # noqa: E402 - load_existing_historical, + load_missing_versions, update_assembly_versions, ) @@ -365,42 +365,33 @@ def test_historical_tsv_written(self, tmp_path): # --------------------------------------------------------------------------- -# TestLoadExistingHistorical +# TestDeriveAssemblyVersionPaths # --------------------------------------------------------------------------- -class TestLoadExistingHistorical: - """load_existing_historical indexes rows by genbankAccession.""" +class TestDeriveAssemblyVersionPaths: + """derive_assembly_version_paths produces correct sibling file paths.""" - def test_missing_file_returns_empty(self, tmp_path): - result = load_existing_historical(str(tmp_path / "nope.tsv")) - assert result == {} + def test_previous_tsv_in_same_directory(self, tmp_path): + jsonl = tmp_path / "assembly_data_report.jsonl" + jsonl.touch() + previous_tsv, _ = derive_assembly_version_paths(str(jsonl)) + assert os.path.dirname(previous_tsv) == str(tmp_path) + assert previous_tsv.endswith("assembly_current.tsv.previous") - def test_rows_keyed_by_genbank_accession(self, tmp_path): - tsv = tmp_path / "historical.tsv" - write_tsv(tsv, [ - {"genbankAccession": "GCA_000222935.1", "version_status": "superseded"}, - {"genbankAccession": "GCA_000412225.1", "version_status": "superseded"}, - ]) - result = load_existing_historical(str(tsv)) - assert "GCA_000222935.1" in result - assert "GCA_000412225.1" in result - assert len(result) == 2 - - def test_row_data_preserved(self, tmp_path): - tsv = tmp_path / "historical.tsv" - write_tsv(tsv, [ - {"genbankAccession": "GCA_000222935.1", "version_status": "superseded"} - ]) - result = load_existing_historical(str(tsv)) - assert result["GCA_000222935.1"]["version_status"] == "superseded" + def test_historical_tsv_in_same_directory(self, tmp_path): + jsonl = tmp_path / "assembly_data_report.jsonl" + jsonl.touch() + _, historical_tsv = derive_assembly_version_paths(str(jsonl)) + assert os.path.dirname(historical_tsv) == str(tmp_path) + assert historical_tsv.endswith("assembly_historical.tsv") # --------------------------------------------------------------------------- -# TestBackfillMissingVersionsFlow +# TestUpdateAssemblyVersionsFlow # --------------------------------------------------------------------------- -class TestBackfillMissingVersionsFlow: - """update_assembly_versions selects the right version and merges into TSV.""" +class TestUpdateAssemblyVersionsFlow: + """update_assembly_versions fetches metadata and writes JSONL.""" def _write_missing_json(self, tmp_path, entries): path = tmp_path / "missing.json" @@ -408,188 +399,94 @@ def _write_missing_json(self, tmp_path, entries): json.dump(entries, f) return str(path) - @patch.object(backfill_module, "write_to_tsv") - @patch.object(backfill_module, "find_all_assembly_versions") - @patch.object(backfill_module, "parse_historical_version") - @patch.object(backfill_module, "utils") - def test_correct_version_selected( - self, mock_utils, mock_parse, mock_find, mock_write, tmp_path - ): - """Only the requested missing version should be parsed, not all versions.""" - mock_utils.load_config.return_value = MagicMock( - meta={"file_name": str(tmp_path / "historical.tsv")} - ) - mock_find.return_value = [ - {"accession": "GCA_000222935.1"}, - {"accession": "GCA_000222935.2"}, - ] - mock_parse.return_value = {"genbankAccession": "GCA_000222935.1"} - - missing_json = self._write_missing_json(tmp_path, [ - { - "base_accession": "GCA_000222935", - "missing_version": 1, - "new_accession": "GCA_000222935.2", - } - ]) - update_assembly_versions( - missing_json=missing_json, - yaml_path=str(tmp_path / "config.yaml"), - work_dir=str(tmp_path), - ) - mock_parse.assert_called_once() - parsed_version_data = mock_parse.call_args[1]["version_data"] - assert parsed_version_data["accession"] == "GCA_000222935.1" - - @patch.object(backfill_module, "write_to_tsv") - @patch.object(backfill_module, "find_all_assembly_versions") - @patch.object(backfill_module, "parse_historical_version") - @patch.object(backfill_module, "utils") - def test_merges_with_existing_historical( - self, mock_utils, mock_parse, mock_find, mock_write, tmp_path - ): - """New rows must be merged with existing historical rows before writing.""" - historical_tsv = tmp_path / "historical.tsv" - write_tsv(historical_tsv, [ - {"genbankAccession": "GCA_000412225.1", "version_status": "superseded"} - ]) - mock_utils.load_config.return_value = MagicMock( - meta={"file_name": str(historical_tsv)} - ) - mock_find.return_value = [{"accession": "GCA_000222935.1"}] - mock_parse.return_value = {"genbankAccession": "GCA_000222935.1"} - + @patch.object(updater_module, "setup_cache_directories") + @patch.object(updater_module, "fetch_version_metadata") + def test_correct_accession_fetched(self, mock_fetch, mock_setup, tmp_path): + """new_accession from missing_json should be passed to fetch_version_metadata.""" + mock_fetch.return_value = {"accession": "GCA_000222935.2"} missing_json = self._write_missing_json(tmp_path, [ { "base_accession": "GCA_000222935", "missing_version": 1, + "new_version": 2, "new_accession": "GCA_000222935.2", } ]) - update_assembly_versions( - missing_json=missing_json, - yaml_path=str(tmp_path / "config.yaml"), - work_dir=str(tmp_path), - ) - mock_write.assert_called_once() - written_parsed = mock_write.call_args[0][0] - assert "GCA_000412225.1" in written_parsed - assert "GCA_000222935.1" in written_parsed - - @patch.object(backfill_module, "write_to_tsv") - @patch.object(backfill_module, "find_all_assembly_versions") - @patch.object(backfill_module, "utils") - def test_no_write_when_version_not_found_in_ftp( - self, mock_utils, mock_find, mock_write, tmp_path - ): - """If the FTP listing does not include the missing version, skip silently.""" - mock_utils.load_config.return_value = MagicMock( - meta={"file_name": str(tmp_path / "historical.tsv")} - ) - mock_find.return_value = [{"accession": "GCA_000222935.2"}] - + update_assembly_versions(missing_json=missing_json, work_dir=str(tmp_path)) + mock_fetch.assert_called_once_with("GCA_000222935.2", str(tmp_path)) + + @patch.object(updater_module, "setup_cache_directories") + @patch.object(updater_module, "fetch_version_metadata") + def test_jsonl_written_with_fetched_records(self, mock_fetch, mock_setup, tmp_path): + """Fetched metadata should be written as JSONL lines.""" + mock_fetch.return_value = {"accession": "GCA_000222935.2", "someField": "value"} missing_json = self._write_missing_json(tmp_path, [ { "base_accession": "GCA_000222935", "missing_version": 1, + "new_version": 2, "new_accession": "GCA_000222935.2", } ]) - update_assembly_versions( - missing_json=missing_json, - yaml_path=str(tmp_path / "config.yaml"), - work_dir=str(tmp_path), - ) - mock_write.assert_not_called() - - @patch.object(backfill_module, "write_to_tsv") - @patch.object(backfill_module, "parse_historical_version") - @patch.object(backfill_module, "find_all_assembly_versions") - @patch.object(backfill_module, "utils") - def test_no_versions_returned_from_ftp( - self, mock_utils, mock_find, mock_parse, mock_write, tmp_path - ): - """If FTP returns an empty list, skip gracefully without parsing or writing.""" - mock_utils.load_config.return_value = MagicMock( - meta={"file_name": str(tmp_path / "historical.tsv")} - ) - mock_find.return_value = [] - + update_assembly_versions(missing_json=missing_json, work_dir=str(tmp_path)) + jsonl_path = tmp_path / "missing_assembly_versions.jsonl" + assert jsonl_path.exists() + records = [json.loads(line) for line in jsonl_path.read_text().strip().splitlines()] + assert len(records) == 1 + assert records[0]["accession"] == "GCA_000222935.2" + + @patch.object(updater_module, "setup_cache_directories") + @patch.object(updater_module, "fetch_version_metadata") + def test_fetch_failure_skipped(self, mock_fetch, mock_setup, tmp_path): + """If fetch_version_metadata returns empty dict, entry is omitted from JSONL.""" + mock_fetch.return_value = {} missing_json = self._write_missing_json(tmp_path, [ { "base_accession": "GCA_000222935", "missing_version": 1, + "new_version": 2, "new_accession": "GCA_000222935.2", } ]) - update_assembly_versions( - missing_json=missing_json, - yaml_path=str(tmp_path / "config.yaml"), - work_dir=str(tmp_path), - ) - mock_parse.assert_not_called() - mock_write.assert_not_called() - - @patch.object(backfill_module, "write_to_tsv") - @patch.object(backfill_module, "find_all_assembly_versions") - @patch.object(backfill_module, "parse_historical_version") - @patch.object(backfill_module, "utils") - def test_partial_parse_failure_writes_successful_rows( - self, mock_utils, mock_parse, mock_find, mock_write, tmp_path - ): - """If one entry fails to parse, the successfully parsed ones are still written.""" - mock_utils.load_config.return_value = MagicMock( - meta={"file_name": str(tmp_path / "historical.tsv")} - ) - mock_find.side_effect = [ - [{"accession": "GCA_000222935.1"}, {"accession": "GCA_000222935.2"}], - [{"accession": "GCA_000412225.1"}, {"accession": "GCA_000412225.2"}], - ] - mock_parse.side_effect = [ - {"genbankAccession": "GCA_000222935.1"}, - ValueError("simulated parse failure"), + update_assembly_versions(missing_json=missing_json, work_dir=str(tmp_path)) + jsonl_path = tmp_path / "missing_assembly_versions.jsonl" + assert jsonl_path.read_text().strip() == "" + + @patch.object(updater_module, "setup_cache_directories") + @patch.object(updater_module, "fetch_version_metadata") + def test_partial_fetch_failures_writes_successful(self, mock_fetch, mock_setup, tmp_path): + """Successful fetches are written even when some entries return no metadata.""" + mock_fetch.side_effect = [ + {"accession": "GCA_000222935.2"}, + {}, ] - missing_json = self._write_missing_json(tmp_path, [ { "base_accession": "GCA_000222935", "missing_version": 1, + "new_version": 2, "new_accession": "GCA_000222935.2", }, { "base_accession": "GCA_000412225", "missing_version": 1, + "new_version": 2, "new_accession": "GCA_000412225.2", }, ]) - update_assembly_versions( - missing_json=missing_json, - yaml_path=str(tmp_path / "config.yaml"), - work_dir=str(tmp_path), - ) - mock_write.assert_called_once() - written = mock_write.call_args[0][0] - assert "GCA_000222935.1" in written - assert "GCA_000412225.1" not in written - - @patch.object(backfill_module, "write_to_tsv") - @patch.object(backfill_module, "find_all_assembly_versions") - @patch.object(backfill_module, "utils") - def test_empty_missing_json_no_op( - self, mock_utils, mock_find, mock_write, tmp_path - ): - """An empty missing_versions.json should not call write_to_tsv.""" - mock_utils.load_config.return_value = MagicMock( - meta={"file_name": str(tmp_path / "historical.tsv")} - ) + update_assembly_versions(missing_json=missing_json, work_dir=str(tmp_path)) + jsonl_path = tmp_path / "missing_assembly_versions.jsonl" + records = [json.loads(line) for line in jsonl_path.read_text().strip().splitlines()] + assert len(records) == 1 + assert records[0]["accession"] == "GCA_000222935.2" + + @patch.object(updater_module, "setup_cache_directories") + @patch.object(updater_module, "fetch_version_metadata") + def test_empty_missing_json_no_op(self, mock_fetch, mock_setup, tmp_path): + """An empty missing_versions.json should not call fetch_version_metadata.""" missing_json = self._write_missing_json(tmp_path, []) - update_assembly_versions( - missing_json=missing_json, - yaml_path=str(tmp_path / "config.yaml"), - work_dir=str(tmp_path), - ) - mock_write.assert_not_called() - mock_find.assert_not_called() + update_assembly_versions(missing_json=missing_json, work_dir=str(tmp_path)) + mock_fetch.assert_not_called() # --------------------------------------------------------------------------- @@ -604,3 +501,11 @@ def test_plugin_returns_parser(self): assert isinstance(result, Parser) assert result.name == "PARSE_ASSEMBLY_VERSIONS" assert result.func is incremental_module.parse_assembly_versions_wrapper + + def test_load_missing_versions(self, tmp_path): + """load_missing_versions reads a JSON file into a list of dicts.""" + path = tmp_path / "missing.json" + entries = [{"base_accession": "GCA_000222935", "missing_version": 1}] + path.write_text(json.dumps(entries)) + result = load_missing_versions(str(path)) + assert result == entries