diff --git a/flows/lib/assembly_versions_utils.py b/flows/lib/assembly_versions_utils.py new file mode 100644 index 0000000..070e7c9 --- /dev/null +++ b/flows/lib/assembly_versions_utils.py @@ -0,0 +1,228 @@ +"""Shared utilities for assembly version discovery and fetching. + +Used by both the backfill parser and the incremental updater to discover +assembly versions via NCBI FTP and fetch per-version metadata. +""" + +import json +import os +import re +import time + +from flows.lib import utils + +ACCESSION_PATTERN = re.compile(r"^GC[AF]_\d{9}\.\d+$") + + +def parse_version(accession: str) -> int: + """Extract the version number from a dotted accession string. + + Args: + accession (str): e.g. GCA_000002035.3 + + Returns: + int: Version number (defaults to 1 if no dot-suffix). + """ + parts = accession.split(".") + return int(parts[1]) if len(parts) > 1 else 1 + + +def parse_accession(accession: str) -> tuple[str, int]: + """Split an accession into its base and version components. + + Args: + accession (str): e.g. GCA_000002035.3 + + Returns: + tuple: (base_accession, version_number). + """ + parts = accession.split(".") + return parts[0], int(parts[1]) if len(parts) > 1 else 1 + + +def setup_cache_directories(work_dir: str) -> None: + """Create cache directory structure under work_dir. + + Args: + work_dir (str): Path to the working directory. + """ + for subdir in ("version_discovery", "metadata"): + os.makedirs( + os.path.join(work_dir, "backfill_cache", subdir), exist_ok=True + ) + + +def get_cache_path(work_dir: str, cache_type: str, identifier: str) -> str: + """Generate a human-readable cache file path. + + Args: + work_dir (str): Path to the working directory. + cache_type (str): Cache category (version_discovery or metadata). + identifier (str): Accession string used as the filename stem. + + Returns: + str: Path to the JSON cache file. + """ + safe_id = re.sub(r"[^A-Za-z0-9_.-]", "_", identifier) + return os.path.join(work_dir, "backfill_cache", cache_type, f"{safe_id}.json") + + +def load_from_cache(cache_path: str, max_age_days: int = 30) -> dict: + """Load data from cache if it exists and is recent enough. + + Args: + cache_path (str): Path to the cache JSON file. + max_age_days (int): Maximum acceptable age in days. + + Returns: + dict: Cached data, or empty dict on miss/expiry. + """ + try: + if os.path.exists(cache_path): + cache_age = time.time() - os.path.getmtime(cache_path) + if cache_age < (max_age_days * 24 * 3600): + with open(cache_path, "r", encoding="utf-8") as f: + return json.load(f) + except Exception as e: + print(f" Warning: Could not load cache from {cache_path}: {e}") + return {} + + +def save_to_cache(cache_path: str, data: dict) -> None: + """Save data to a cache file, creating parent dirs as needed. + + Args: + cache_path (str): Path to the cache JSON file. + data (dict): Data to persist. + """ + try: + os.makedirs(os.path.dirname(cache_path), exist_ok=True) + with open(cache_path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2, ensure_ascii=False) + except Exception as e: + print(f" Warning: Could not save cache to {cache_path}: {e}") + + +def discover_version_accessions(base_accession: str, work_dir: str) -> list[str]: + """Discover all versioned accessions for a base assembly via NCBI FTP. + + Args: + base_accession (str): Full accession (e.g. GCA_000002035.3). + work_dir (str): Working directory for cache storage. + + Returns: + list: Sorted list of versioned accession strings. + """ + import requests + + base_match = re.match(r"(GC[AF]_\d+)", base_accession) + if not base_match: + return [] + + base = base_match.group(1) + setup_cache_directories(work_dir) + cache_path = get_cache_path(work_dir, "version_discovery", base) + cached = load_from_cache(cache_path, max_age_days=7) + + if cached and "accessions" in cached: + print(f" Using cached version list for {base}") + return cached["accessions"] + + print(f" Discovering versions for {base} via FTP") + ftp_url = ( + f"https://ftp.ncbi.nlm.nih.gov/genomes/all/" + f"{base[:3]}/{base[4:7]}/{base[7:10]}/{base[10:13]}/" + ) + + try: + response = requests.get(ftp_url, timeout=30) + if response.status_code != 200: + print(f" Warning: FTP query failed for {base}") + return [] + except Exception as e: + print(f" Error querying FTP for {base}: {e}") + return [] + + version_pattern = rf"{re.escape(base)}\.\d+" + accessions = sorted(set(re.findall(version_pattern, response.text))) + + save_to_cache(cache_path, { + "accessions": accessions, + "base_accession": base, + "ftp_url": ftp_url, + }) + return accessions + + +def fetch_version_metadata(version_acc: str, work_dir: str) -> dict: + """Fetch NCBI datasets metadata for a single assembly version. + + Uses utils.run_quoted to safely invoke the datasets CLI. Results are + cached for 30 days. + + Args: + version_acc (str): Versioned accession (e.g. GCA_000002035.1). + work_dir (str): Working directory for cache storage. + + Returns: + dict: Metadata dict, or empty dict on failure. + """ + cache_path = get_cache_path(work_dir, "metadata", version_acc) + cached = load_from_cache(cache_path, max_age_days=30) + + if cached and "metadata" in cached: + return cached["metadata"] + + if not ACCESSION_PATTERN.match(version_acc): + print(f" Skipping unexpected accession format: {version_acc}") + return {} + + cmd = [ + "datasets", "summary", "genome", "accession", + version_acc, "--as-json-lines", + ] + try: + result = utils.run_quoted( + cmd, + capture_output=True, + text=True, + encoding="utf-8", + errors="ignore", + timeout=60, + ) + if result.returncode == 0 and result.stdout and result.stdout.strip(): + version_data = json.loads(result.stdout.strip()) + save_to_cache(cache_path, { + "metadata": version_data, + "cached_at": time.time(), + }) + return version_data + + print(f" Warning: No metadata for {version_acc}") + except Exception as e: + print(f" Warning: Error fetching {version_acc}: {e}") + + return {} + + +def find_all_assembly_versions(base_accession: str, work_dir: str) -> list[dict]: + """Discover all versions and fetch metadata for each. + + Delegates to discover_version_accessions for FTP discovery and + fetch_version_metadata for per-version metadata retrieval. Both layers + use independent caches. + + Args: + base_accession (str): Full accession (e.g. GCA_000002035.3). + work_dir (str): Working directory for cache storage. + + Returns: + list: List of metadata dicts, one per version found. + """ + accessions = discover_version_accessions(base_accession, work_dir) + versions = [] + for version_acc in accessions: + metadata = fetch_version_metadata(version_acc, work_dir) + if metadata: + versions.append(metadata) + return versions diff --git a/flows/lib/shared_args.py b/flows/lib/shared_args.py index 4d5d801..b8087fc 100644 --- a/flows/lib/shared_args.py +++ b/flows/lib/shared_args.py @@ -182,6 +182,14 @@ }, } +MISSING_JSON = { + "flags": ["-m", "--missing_json"], + "keys": { + "help": "Path to missing_versions.json produced by parse_assembly_versions.", + "type": str, + }, +} + WORK_DIR = { "flags": ["-w", "--work_dir"], "keys": { diff --git a/flows/parsers/parse_assembly_versions.py b/flows/parsers/parse_assembly_versions.py new file mode 100644 index 0000000..5b2f4bb --- /dev/null +++ b/flows/parsers/parse_assembly_versions.py @@ -0,0 +1,404 @@ +"""Daily incremental updates to historical assembly records. + +Identifies assembly versions newly superseded since the last run and appends +them to assembly_historical.tsv. No NCBI fetches are required — data is +copied directly from the previous assembly_current.tsv parse output. + +Usage: + python -m flows.parsers.parse_assembly_versions \\ + --input_path assembly_data_report.jsonl +""" + +import csv +import json +import os +from glob import glob +from pathlib import Path +from typing import Optional + +from flows.lib.assembly_versions_utils import parse_accession +from flows.lib.conditional_import import flow +from flows.lib.shared_args import INPUT_PATH +from flows.lib.shared_args import parse_args as _parse_args +from flows.lib.shared_args import required +from flows.lib.utils import Parser + +def derive_assembly_version_paths(input_path: str) -> tuple[str, str]: + """Derive previous_tsv and historical_tsv paths from the input JSONL path. + + Both files are assumed to live alongside the JSONL in the same directory, + following the same convention used by parse_ncbi_assemblies. + + Args: + input_path (str): Path to the current assembly_data_report.jsonl. + + Returns: + tuple: (previous_tsv, historical_tsv) absolute paths. + """ + work_dir = os.path.dirname(os.path.abspath(input_path)) + previous_tsv = os.path.join(work_dir, "assembly_current.tsv.previous") + historical_tsv = os.path.join(work_dir, "assembly_historical.tsv") + return previous_tsv, historical_tsv + + +def load_previous_parsed_by_base(previous_tsv: str) -> dict[str, dict[int, dict]]: + """Load previous parsed results indexed by base accession and version. + + Args: + previous_tsv (str): Path to assembly_current.tsv from the previous run. + + Returns: + dict: Nested mapping of base_accession -> version -> row data. + Returns an empty dict if the file is not found, which is expected + on the first run after the Phase 0 backfill. + """ + previous_by_base: dict[str, dict[int, dict]] = {} + + try: + with open(previous_tsv, encoding="utf-8") as f: + for row in csv.DictReader(f, delimiter="\t"): + accession = row["accession"] + base_acc, version = parse_accession(accession) + if base_acc not in previous_by_base: + previous_by_base[base_acc] = {} + previous_by_base[base_acc][version] = dict(row) + except FileNotFoundError: + print(f"Warning: Previous TSV not found: {previous_tsv}") + print(" This is expected for the first run after the Phase 0 backfill.") + return {} + + total = sum(len(v) for v in previous_by_base.values()) + print(f"Loaded {total} assemblies from previous parsed results.") + print(f" Unique base accessions: {len(previous_by_base)}") + + return previous_by_base + + +def build_superseded_row( + previous_row: dict, + previous_version: int, + new_accession: str, + new_version: int, + release_date: str, +) -> dict: + """Build a superseded row from a previous row with updated metadata. + + Args: + previous_row (dict): Row data copied from the previous parsed TSV. + previous_version (int): Version number of the assembly being superseded. + new_accession (str): Accession of the assembly that supersedes this one. + new_version (int): Version number of the superseding assembly. + release_date (str): Release date of the superseding assembly. + + Returns: + dict: Updated row with version_status, assembly_id, and superseded_by fields. + """ + base_acc, _ = parse_accession(previous_row["accession"]) + row = previous_row.copy() + row["version_status"] = "superseded" + row["assembly_id"] = f"{base_acc}_{previous_version}" + row["superseded_by"] = new_accession + row["superseded_by_version"] = new_version + row["superseded_date"] = release_date + return row + + +def build_missing_version_record( + base_acc: str, + missing_version: int, + new_version: int, + new_accession: str, + is_new_series: bool = False, +) -> dict: + """Build a record describing a version missing from the previous parsed TSV. + + Args: + base_acc (str): Base accession without version suffix. + missing_version (int): The version number that could not be found. + new_version (int): The new version number that triggered this check. + new_accession (str): Full accession of the new assembly. + is_new_series (bool): True if this is a new assembly series with no + prior history at all. + + Returns: + dict: Record suitable for writing to a missing-versions JSON file. + """ + record = { + "base_accession": base_acc, + "missing_version": missing_version, + "new_version": new_version, + "new_accession": new_accession, + } + if is_new_series: + record["note"] = "New assembly series — prior versions may need backfill" + return record + + +def identify_newly_superseded( + new_jsonl: str, + previous_by_base: dict[str, dict[int, dict]], +) -> tuple[list[dict], list[dict]]: + """Identify versions that became superseded in the current JSONL update. + + For each assembly with version > 1 in the new JSONL, checks whether the + immediately preceding version exists in the previous parsed TSV. Assemblies + whose predecessor is found are added to the superseded list; those missing a + predecessor are recorded for optional backfill. + + Args: + new_jsonl (str): Path to the current assembly_data_report.jsonl. + previous_by_base (dict): Indexed previous parsed results from + load_previous_parsed_by_base. + + Returns: + tuple: (newly_superseded, missing_versions) where each is a list of dicts. + """ + newly_superseded: list[dict] = [] + missing_versions: list[dict] = [] + + with open(new_jsonl) as f: + for line in f: + assembly = json.loads(line) + accession = assembly["accession"] + base_acc, new_version = parse_accession(accession) + + if new_version <= 1: + continue + + previous_version = new_version - 1 + + if base_acc not in previous_by_base: + missing_versions.append(build_missing_version_record( + base_acc, previous_version, new_version, accession, + is_new_series=True, + )) + continue + + if previous_version not in previous_by_base[base_acc]: + missing_versions.append(build_missing_version_record( + base_acc, previous_version, new_version, accession, + )) + continue + + previous_row = previous_by_base[base_acc][previous_version] + release_date = assembly.get("releaseDate") or "" + newly_superseded.append(build_superseded_row( + previous_row, previous_version, accession, new_version, release_date, + )) + + return newly_superseded, missing_versions + + +def append_superseded_to_tsv( + newly_superseded: list[dict], historical_tsv: str +) -> None: + """Append newly superseded rows to the historical TSV, deduplicating by assembly_id. + + Reads the existing file if present, merges new rows (new rows take + precedence on duplicate assembly_id), and writes the combined result back. + + Args: + newly_superseded (list): Row dicts from identify_newly_superseded. + historical_tsv (str): Path to assembly_historical.tsv. + """ + if not newly_superseded: + print(" No newly superseded versions to add.") + return + + existing: dict[str, dict] = {} + historical_path = Path(historical_tsv) + + if historical_path.exists(): + with open(historical_tsv, encoding="utf-8") as f: + for row in csv.DictReader(f, delimiter="\t"): + existing[row["assembly_id"]] = dict(row) + + for row in newly_superseded: + existing[row["assembly_id"]] = row + + fieldnames = list(next(iter(existing.values())).keys()) + with open(historical_tsv, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter( + f, fieldnames=fieldnames, delimiter="\t", extrasaction="ignore" + ) + writer.writeheader() + writer.writerows(existing.values()) + + print(f" Added {len(newly_superseded)} newly superseded versions.") + print(f" Total records in {historical_tsv}: {len(existing)}") + + +def print_superseded_summary(newly_superseded: list[dict]) -> None: + """Print a short summary of the newly superseded versions. + + Args: + newly_superseded (list): Row dicts from identify_newly_superseded. + """ + if not newly_superseded: + print(" Found: 0 newly superseded versions.") + return + + print(f" Found: {len(newly_superseded)} newly superseded versions.") + print(" Examples:") + for row in newly_superseded[:5]: + print(f" {row['accession']} -> superseded by v{row['superseded_by_version']}") + if len(newly_superseded) > 5: + print(f" ... and {len(newly_superseded) - 5} more") + + +def print_missing_versions_warning(missing: list[dict]) -> None: + """Print a warning listing versions absent from the previous parsed TSV. + + Args: + missing (list): Missing-version records from identify_newly_superseded. + """ + if not missing: + return + + print(f"\n Warning: {len(missing)} assemblies have missing previous versions.") + print(" These may need manual backfill:") + for m in missing[:5]: + print( + f" {m['base_accession']}: " + f"need v{m['missing_version']}, have v{m['new_version']}" + ) + if len(missing) > 5: + print(f" ... and {len(missing) - 5} more") + print("\n To backfill missing versions, run:") + print(" python -m flows.updaters.update_assembly_versions") + + +@flow(log_prints=True) +def parse_assembly_versions( + new_jsonl: str, + previous_tsv: str, + historical_tsv: str, +) -> dict: + """Daily incremental update of the historical assembly TSV. + + Called after parse_ncbi_assemblies completes. Uses the previous parsed + TSV — no NCBI fetches are made. + + Args: + new_jsonl (str): Path to the current assembly_data_report.jsonl. + previous_tsv (str): Path to assembly_current.tsv from the previous run. + historical_tsv (str): Path to assembly_historical.tsv to update. + + Returns: + dict: Summary with keys newly_superseded_count, missing_versions_count, + and missing_versions (list of records). + """ + separator = "=" * 80 + print(f"\n{separator}") + print("ASSEMBLY VERSION PARSE") + print(f"{separator}\n") + + print("[1/3] Loading previous parsed results...") + previous_by_base = load_previous_parsed_by_base(previous_tsv) + + if not previous_by_base: + print(" No previous parsed data available — skipping incremental update.") + print(" This is expected for the first run after the Phase 0 backfill.\n") + return { + "newly_superseded_count": 0, + "missing_versions_count": 0, + "missing_versions": [], + } + + print("\n[2/3] Identifying newly superseded versions...") + newly_superseded, missing = identify_newly_superseded(new_jsonl, previous_by_base) + print_superseded_summary(newly_superseded) + print_missing_versions_warning(missing) + + print("\n[3/3] Updating historical TSV...") + append_superseded_to_tsv(newly_superseded, historical_tsv) + + print(f"\n{separator}") + print( + f"ASSEMBLY VERSION PARSE COMPLETE " + f"Superseded: {len(newly_superseded)} " + f"Missing: {len(missing)}" + ) + print(f"{separator}\n") + + return { + "newly_superseded_count": len(newly_superseded), + "missing_versions_count": len(missing), + "missing_versions": missing, + } + + +def parse_assembly_versions_wrapper( + working_yaml: str, + work_dir: str, + append: bool, + data_freeze_path: Optional[str] = None, + **kwargs, +) -> None: + """Wrapper matching the fetch_parse_validate parser signature. + + Derives the previous TSV and historical TSV paths from work_dir and + delegates to parse_assembly_versions. + + Args: + working_yaml (str): Path to the working YAML file (unused; accepted + for pipeline compatibility). + work_dir (str): Path to the working directory containing the JSONL, + the previous TSV, and the historical TSV. + append (bool): Unused; accepted for pipeline compatibility. + data_freeze_path (str, optional): Unused; accepted for pipeline + compatibility. + **kwargs: Additional keyword arguments passed by the pipeline. + """ + glob_path = os.path.join(work_dir, "*.jsonl") + paths = glob(glob_path) + if not paths: + raise FileNotFoundError(f"No jsonl file found in {work_dir}") + if len(paths) > 1: + raise ValueError(f"More than one jsonl file found in {work_dir}") + + previous_tsv, historical_tsv = derive_assembly_version_paths(paths[0]) + results = parse_assembly_versions( + new_jsonl=paths[0], + previous_tsv=previous_tsv, + historical_tsv=historical_tsv, + ) + + if results["missing_versions_count"] > 0: + missing_json_path = os.path.join(work_dir, "missing_versions.json") + with open(missing_json_path, "w", encoding="utf-8") as f: + json.dump(results["missing_versions"], f, indent=2) + print(f" Missing versions written to: {missing_json_path}") + + +def plugin() -> Parser: + """Register the flow.""" + return Parser( + name="PARSE_ASSEMBLY_VERSIONS", + func=parse_assembly_versions_wrapper, + description="Daily incremental update of historical assembly records.", + ) + + +if __name__ == "__main__": + args = _parse_args( + [required(INPUT_PATH)], + description="Daily incremental update of historical assembly records", + ) + previous_tsv, historical_tsv = derive_assembly_version_paths(args.input_path) + results = parse_assembly_versions( + new_jsonl=args.input_path, + previous_tsv=previous_tsv, + historical_tsv=historical_tsv, + ) + print(f"Summary: superseded={results['newly_superseded_count']}, " + f"missing={results['missing_versions_count']}") + if results["missing_versions_count"] > 0: + missing_json_path = Path(historical_tsv).parent / "missing_versions.json" + with open(missing_json_path, "w", encoding="utf-8") as f: + json.dump(results["missing_versions"], f, indent=2) + print( + f" Action needed: {results['missing_versions_count']} missing versions." + ) + print(f" Written to: {missing_json_path}") + print(" Run: python -m flows.updaters.update_assembly_versions") diff --git a/flows/parsers/parse_backfill_historical_versions.py b/flows/parsers/parse_backfill_historical_versions.py index cc18b17..4b7acb0 100644 --- a/flows/parsers/parse_backfill_historical_versions.py +++ b/flows/parsers/parse_backfill_historical_versions.py @@ -12,17 +12,20 @@ import json import os -import re -import time from datetime import datetime from glob import glob from pathlib import Path from typing import Optional -import requests from genomehubs import utils as gh_utils from flows.lib import utils +from flows.lib.assembly_versions_utils import ( + find_all_assembly_versions, + parse_accession, + parse_version, + setup_cache_directories, +) from flows.lib.conditional_import import flow from flows.lib.shared_args import INPUT_PATH, WORK_DIR, YAML_PATH from flows.lib.shared_args import parse_args as _parse_args @@ -34,198 +37,6 @@ write_to_tsv, ) -ACCESSION_PATTERN = re.compile(r"^GC[AF]_\d{9}\.\d+$") - - -def setup_cache_directories(work_dir: str): - """Create cache directory structure under work_dir. - - Args: - work_dir (str): Path to the working directory. - """ - for subdir in ("version_discovery", "metadata"): - os.makedirs( - os.path.join(work_dir, "backfill_cache", subdir), exist_ok=True - ) - - -def get_cache_path(work_dir: str, cache_type: str, identifier: str) -> str: - """Generate a human-readable cache file path. - - Args: - work_dir (str): Path to the working directory. - cache_type (str): Cache category (version_discovery or metadata). - identifier (str): Accession string used as the filename stem. - - Returns: - str: Path to the JSON cache file. - """ - safe_id = re.sub(r"[^A-Za-z0-9_.-]", "_", identifier) - return os.path.join(work_dir, "backfill_cache", cache_type, f"{safe_id}.json") - - -def load_from_cache(cache_path: str, max_age_days: int = 30) -> dict: - """Load data from cache if it exists and is recent enough. - - Args: - cache_path (str): Path to the cache JSON file. - max_age_days (int): Maximum acceptable age in days. - - Returns: - dict: Cached data, or empty dict on miss/expiry. - """ - try: - if os.path.exists(cache_path): - cache_age = time.time() - os.path.getmtime(cache_path) - if cache_age < (max_age_days * 24 * 3600): - with open(cache_path, "r", encoding="utf-8") as f: - return json.load(f) - except Exception as e: - print(f" Warning: Could not load cache from {cache_path}: {e}") - return {} - - -def save_to_cache(cache_path: str, data: dict): - """Save data to a cache file, creating parent dirs as needed. - - Args: - cache_path (str): Path to the cache JSON file. - data (dict): Data to persist. - """ - try: - os.makedirs(os.path.dirname(cache_path), exist_ok=True) - with open(cache_path, "w", encoding="utf-8") as f: - json.dump(data, f, indent=2, ensure_ascii=False) - except Exception as e: - print(f" Warning: Could not save cache to {cache_path}: {e}") - - -def discover_version_accessions( - base_accession: str, work_dir: str -) -> list[str]: - """Discover all versioned accessions for a base assembly via NCBI FTP. - - Args: - base_accession (str): Full accession (e.g. GCA_000002035.3). - work_dir (str): Working directory for cache storage. - - Returns: - list: Sorted list of versioned accession strings. - """ - base_match = re.match(r"(GC[AF]_\d+)", base_accession) - if not base_match: - return [] - - base = base_match.group(1) - setup_cache_directories(work_dir) - cache_path = get_cache_path(work_dir, "version_discovery", base) - cached = load_from_cache(cache_path, max_age_days=7) - - if cached and "accessions" in cached: - print(f" Using cached version list for {base}") - return cached["accessions"] - - print(f" Discovering versions for {base} via FTP") - ftp_url = ( - f"https://ftp.ncbi.nlm.nih.gov/genomes/all/" - f"{base[:3]}/{base[4:7]}/{base[7:10]}/{base[10:13]}/" - ) - - try: - response = requests.get(ftp_url, timeout=30) - if response.status_code != 200: - print(f" Warning: FTP query failed for {base}") - return [] - except Exception as e: - print(f" Error querying FTP for {base}: {e}") - return [] - - version_pattern = rf"{re.escape(base)}\.\d+" - accessions = sorted(set(re.findall(version_pattern, response.text))) - - save_to_cache(cache_path, { - "accessions": accessions, - "base_accession": base, - "ftp_url": ftp_url, - }) - return accessions - - -def fetch_version_metadata(version_acc: str, work_dir: str) -> dict: - """Fetch NCBI datasets metadata for a single assembly version. - - Uses utils.run_quoted to safely invoke the datasets CLI. Results are - cached for 30 days. - - Args: - version_acc (str): Versioned accession (e.g. GCA_000002035.1). - work_dir (str): Working directory for cache storage. - - Returns: - dict: Metadata dict, or empty dict on failure. - """ - cache_path = get_cache_path(work_dir, "metadata", version_acc) - cached = load_from_cache(cache_path, max_age_days=30) - - if cached and "metadata" in cached: - return cached["metadata"] - - if not ACCESSION_PATTERN.match(version_acc): - print(f" Skipping unexpected accession format: {version_acc}") - return {} - - cmd = [ - "datasets", "summary", "genome", "accession", - version_acc, "--as-json-lines", - ] - try: - result = utils.run_quoted( - cmd, - capture_output=True, - text=True, - encoding="utf-8", - errors="ignore", - timeout=60, - ) - if result.returncode == 0 and result.stdout and result.stdout.strip(): - version_data = json.loads(result.stdout.strip()) - save_to_cache(cache_path, { - "metadata": version_data, - "cached_at": time.time(), - }) - return version_data - - print(f" Warning: No metadata for {version_acc}") - except Exception as e: - print(f" Warning: Error fetching {version_acc}: {e}") - - return {} - - -def find_all_assembly_versions( - base_accession: str, work_dir: str -) -> list[dict]: - """Discover all versions and fetch metadata for each. - - Delegates to discover_version_accessions for FTP discovery and - fetch_version_metadata for per-version metadata retrieval. Both layers - use independent caches. - - Args: - base_accession (str): Full accession (e.g. GCA_000002035.3). - work_dir (str): Working directory for cache storage. - - Returns: - list: List of metadata dicts, one per version found. - """ - accessions = discover_version_accessions(base_accession, work_dir) - versions = [] - for version_acc in accessions: - metadata = fetch_version_metadata(version_acc, work_dir) - if metadata: - versions.append(metadata) - return versions - def parse_historical_version( version_data: dict, @@ -269,32 +80,6 @@ def parse_historical_version( return gh_utils.parse_report_values(config.parse_fns, processed_report) -def parse_version(accession: str) -> int: - """Extract version number from a dotted accession string. - - Args: - accession (str): e.g. GCA_000002035.3 - - Returns: - int: Version number (defaults to 1 if no dot-suffix). - """ - parts = accession.split(".") - return int(parts[1]) if len(parts) > 1 else 1 - - -def parse_accession(accession: str) -> tuple[str, int]: - """Split an accession into its base and version components. - - Args: - accession (str): e.g. GCA_000002035.3 - - Returns: - tuple: (base_accession, version_number). - """ - parts = accession.split(".") - return parts[0], int(parts[1]) if len(parts) > 1 else 1 - - def derive_checkpoint_path( input_path: str, yaml_path: str, work_dir: str ) -> str: diff --git a/flows/updaters/update_assembly_versions.py b/flows/updaters/update_assembly_versions.py new file mode 100644 index 0000000..b7b36e2 --- /dev/null +++ b/flows/updaters/update_assembly_versions.py @@ -0,0 +1,130 @@ +"""Fetch metadata for assembly versions missing from historical records. + +Run this when parse_assembly_versions reports assemblies whose previous +version was absent from the previous parsed TSV. Fetches raw NCBI metadata +for each missing accession and writes it to a JSONL file for downstream +parsing by parse_backfill_historical_versions. + +Usage: + python -m flows.updaters.update_assembly_versions \\ + --missing_json tmp/missing_versions.json \\ + --work_dir tmp +""" + +import json +import os + +from flows.lib.assembly_versions_utils import ( + fetch_version_metadata, + setup_cache_directories, +) +from flows.lib.conditional_import import emit_event, flow +from flows.lib.shared_args import MISSING_JSON, WORK_DIR +from flows.lib.shared_args import parse_args as _parse_args +from flows.lib.shared_args import required + +OUTPUT_JSONL = "missing_assembly_versions.jsonl" + + +def load_missing_versions(missing_json: str) -> list[dict]: + """Load the list of missing versions from a JSON file. + + The file is written by parse_assembly_versions when it encounters + assemblies with no matching previous version in the parsed TSV. + + Args: + missing_json (str): Path to missing_versions.json. + + Returns: + list: Missing-version records, each with base_accession, missing_version, + new_version, and new_accession keys. + """ + with open(missing_json, encoding="utf-8") as f: + return json.load(f) + + +@flow(log_prints=True) +def update_assembly_versions( + missing_json: str, + work_dir: str = ".", +) -> None: + """Fetch metadata for missing assembly versions and write to JSONL. + + For each entry in missing_json, fetches the new accession's raw metadata + from NCBI datasets and writes the results to a JSONL file. The JSONL is + consumed by parse_backfill_historical_versions. Emits a completion event + on finish. + + Args: + missing_json (str): Path to missing_versions.json from parse_assembly_versions. + work_dir (str): Working directory for cache and JSONL output. + """ + setup_cache_directories(work_dir) + missing = load_missing_versions(missing_json) + + if not missing: + print("No missing versions to fetch.") + emit_event( + event="update.assembly_versions.completed", + resource={ + "prefect.resource.id": f"update.assembly_versions.{work_dir}", + "prefect.resource.type": "assembly.versions", + }, + payload={"fetched": 0, "failed": 0, "status": "no_op"}, + ) + return + + output_jsonl = os.path.join(work_dir, OUTPUT_JSONL) + total = len(missing) + fetched = 0 + failed = 0 + + separator = "=" * 80 + print(f"\n{separator}") + print("ASSEMBLY VERSION UPDATE") + print(f"{separator}") + print(f" Entries to fetch: {total}") + print(f" Output JSONL: {output_jsonl}") + print(f"{separator}\n") + + with open(output_jsonl, "w", encoding="utf-8") as f: + for i, entry in enumerate(missing): + new_accession = entry["new_accession"] + print(f"[{i + 1}/{total}] Fetching {new_accession}...", end=" ", flush=True) + metadata = fetch_version_metadata(new_accession, work_dir) + if metadata: + f.write(json.dumps(metadata) + "\n") + fetched += 1 + print("done") + else: + failed += 1 + print("failed (no metadata returned)") + + print(f"\n{separator}") + print("ASSEMBLY VERSION UPDATE COMPLETE") + print(f"{separator}") + print(f" Fetched: {fetched}/{total}") + if failed > 0: + print(f" Failed: {failed}/{total}") + print(f" Output: {output_jsonl}") + print(f"{separator}\n") + + emit_event( + event="update.assembly_versions.completed", + resource={ + "prefect.resource.id": f"update.assembly_versions.{work_dir}", + "prefect.resource.type": "assembly.versions", + }, + payload={"fetched": fetched, "failed": failed, "status": "success"}, + ) + + +if __name__ == "__main__": + args = _parse_args( + [required(MISSING_JSON), WORK_DIR], + description="Fetch assembly versions missing from historical records", + ) + update_assembly_versions( + missing_json=args.missing_json, + work_dir=args.work_dir, + ) diff --git a/tests/test_assembly_versions.py b/tests/test_assembly_versions.py new file mode 100644 index 0000000..f9317c7 --- /dev/null +++ b/tests/test_assembly_versions.py @@ -0,0 +1,511 @@ +"""Tests for parse_assembly_versions.py and update_assembly_versions.py + +Covers: +- Loading and indexing previous parsed TSV results +- Building superseded and missing-version records +- Core supersession detection logic (superseded, missing-with-gap, new-series, v1-skip) +- Appending to historical TSV with deduplication +- Parser orchestrator flow behaviour +- Updater flow: fetch metadata and write JSONL +""" + +import csv +import json +import os +import sys +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +os.environ["SKIP_PREFECT"] = "true" + +from flows.lib.utils import Parser # noqa: E402 +from flows.parsers import parse_assembly_versions as incremental_module # noqa: E402 +from flows.parsers.parse_assembly_versions import ( # noqa: E402 + append_superseded_to_tsv, + build_missing_version_record, + build_superseded_row, + derive_assembly_version_paths, + identify_newly_superseded, + load_previous_parsed_by_base, + parse_assembly_versions, +) +from flows.updaters import update_assembly_versions as updater_module # noqa: E402 +from flows.updaters.update_assembly_versions import ( # noqa: E402 + load_missing_versions, + update_assembly_versions, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def write_tsv(path: Path, rows: list[dict]) -> None: + """Write a list of dicts to a tab-separated file.""" + if not rows: + return + with open(path, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()), delimiter="\t") + writer.writeheader() + writer.writerows(rows) + + +def read_tsv(path: Path) -> list[dict]: + """Read a tab-separated file into a list of dicts.""" + with open(path, encoding="utf-8") as f: + return list(csv.DictReader(f, delimiter="\t")) + + +def write_jsonl(path: Path, records: list[dict]) -> None: + """Write a list of dicts as newline-delimited JSON.""" + with open(path, "w", encoding="utf-8") as f: + for record in records: + f.write(json.dumps(record) + "\n") + + +# --------------------------------------------------------------------------- +# TestLoadPreviousParsed +# --------------------------------------------------------------------------- + +class TestLoadPreviousParsed: + """load_previous_parsed_by_base indexes rows by base accession and version.""" + + def test_missing_file_returns_empty(self, tmp_path): + result = load_previous_parsed_by_base(str(tmp_path / "nope.tsv")) + assert result == {} + + def test_single_version_indexed(self, tmp_path): + tsv = tmp_path / "current.tsv" + write_tsv(tsv, [{"accession": "GCA_000222935.1", "taxon_id": "12345"}]) + result = load_previous_parsed_by_base(str(tsv)) + assert "GCA_000222935" in result + assert 1 in result["GCA_000222935"] + assert result["GCA_000222935"][1]["taxon_id"] == "12345" + + def test_multi_version_same_base(self, tmp_path): + tsv = tmp_path / "current.tsv" + write_tsv(tsv, [ + {"accession": "GCA_000222935.1", "taxon_id": "1"}, + {"accession": "GCA_000222935.2", "taxon_id": "1"}, + ]) + result = load_previous_parsed_by_base(str(tsv)) + assert len(result["GCA_000222935"]) == 2 + assert 1 in result["GCA_000222935"] + assert 2 in result["GCA_000222935"] + + def test_multiple_base_accessions(self, tmp_path): + tsv = tmp_path / "current.tsv" + write_tsv(tsv, [ + {"accession": "GCA_000222935.1", "taxon_id": "1"}, + {"accession": "GCA_000412225.1", "taxon_id": "2"}, + ]) + result = load_previous_parsed_by_base(str(tsv)) + assert len(result) == 2 + assert "GCA_000222935" in result + assert "GCA_000412225" in result + + +# --------------------------------------------------------------------------- +# TestBuildSupersededRow +# --------------------------------------------------------------------------- + +class TestBuildSupersededRow: + """build_superseded_row stamps the correct metadata onto a copied row.""" + + def _base_row(self): + return { + "accession": "GCA_000222935.1", + "taxon_id": "12345", + "assembly_level": "Chromosome", + } + + def test_version_status_set(self): + row = build_superseded_row(self._base_row(), 1, "GCA_000222935.2", 2, "2024-01-15") + assert row["version_status"] == "superseded" + + def test_assembly_id_format(self): + row = build_superseded_row(self._base_row(), 1, "GCA_000222935.2", 2, "2024-01-15") + assert row["assembly_id"] == "GCA_000222935_1" + + def test_superseded_by_fields(self): + row = build_superseded_row(self._base_row(), 1, "GCA_000222935.2", 2, "2024-01-15") + assert row["superseded_by"] == "GCA_000222935.2" + assert row["superseded_by_version"] == 2 + assert row["superseded_date"] == "2024-01-15" + + def test_original_row_not_mutated(self): + original = self._base_row() + build_superseded_row(original, 1, "GCA_000222935.2", 2, "2024-01-15") + assert "version_status" not in original + + def test_existing_fields_preserved(self): + row = build_superseded_row(self._base_row(), 1, "GCA_000222935.2", 2, "2024-01-15") + assert row["taxon_id"] == "12345" + assert row["assembly_level"] == "Chromosome" + + +# --------------------------------------------------------------------------- +# TestBuildMissingVersionRecord +# --------------------------------------------------------------------------- + +class TestBuildMissingVersionRecord: + """build_missing_version_record captures the gap details.""" + + def test_required_fields(self): + rec = build_missing_version_record("GCA_000222935", 2, 3, "GCA_000222935.3") + assert rec["base_accession"] == "GCA_000222935" + assert rec["missing_version"] == 2 + assert rec["new_version"] == 3 + assert rec["new_accession"] == "GCA_000222935.3" + + def test_no_note_by_default(self): + rec = build_missing_version_record("GCA_000222935", 1, 2, "GCA_000222935.2") + assert "note" not in rec + + def test_note_present_for_new_series(self): + rec = build_missing_version_record( + "GCA_000222935", 1, 2, "GCA_000222935.2", is_new_series=True + ) + assert "note" in rec + + +# --------------------------------------------------------------------------- +# TestIdentifyNewlySuperseded +# --------------------------------------------------------------------------- + +class TestIdentifyNewlySuperseded: + """identify_newly_superseded covers all branching cases.""" + + def _write_jsonl(self, tmp_path, records): + path = tmp_path / "new.jsonl" + write_jsonl(path, records) + return str(path) + + def test_v1_assembly_skipped(self, tmp_path): + jsonl = self._write_jsonl(tmp_path, [{"accession": "GCA_000222935.1"}]) + superseded, missing = identify_newly_superseded(jsonl, {}) + assert superseded == [] + assert missing == [] + + def test_superseded_found_when_previous_version_present(self, tmp_path): + jsonl = self._write_jsonl( + tmp_path, [{"accession": "GCA_000222935.2", "releaseDate": "2024-01-15"}] + ) + previous = { + "GCA_000222935": {1: {"accession": "GCA_000222935.1", "taxon_id": "1"}} + } + superseded, missing = identify_newly_superseded(jsonl, previous) + assert len(superseded) == 1 + assert superseded[0]["superseded_by"] == "GCA_000222935.2" + assert missing == [] + + def test_missing_with_version_gap(self, tmp_path): + jsonl = self._write_jsonl(tmp_path, [{"accession": "GCA_000222935.3"}]) + previous = { + "GCA_000222935": {1: {"accession": "GCA_000222935.1", "taxon_id": "1"}} + } + superseded, missing = identify_newly_superseded(jsonl, previous) + assert superseded == [] + assert len(missing) == 1 + assert missing[0]["missing_version"] == 2 + + def test_new_series_no_prior_base(self, tmp_path): + jsonl = self._write_jsonl(tmp_path, [{"accession": "GCA_999999999.2"}]) + superseded, missing = identify_newly_superseded(jsonl, {}) + assert superseded == [] + assert len(missing) == 1 + assert missing[0]["note"] + + def test_mixed_batch(self, tmp_path): + jsonl = self._write_jsonl(tmp_path, [ + {"accession": "GCA_000222935.2", "releaseDate": "2024-01-01"}, + {"accession": "GCA_000412225.1"}, + {"accession": "GCA_999999999.2"}, + ]) + previous = { + "GCA_000222935": {1: {"accession": "GCA_000222935.1", "taxon_id": "1"}} + } + superseded, missing = identify_newly_superseded(jsonl, previous) + assert len(superseded) == 1 + assert len(missing) == 1 + + +# --------------------------------------------------------------------------- +# TestAppendSupersededToTsv +# --------------------------------------------------------------------------- + +class TestAppendSupersededToTsv: + """append_superseded_to_tsv correctly creates, appends, and deduplicates.""" + + def _make_row(self, acc, assembly_id, status="superseded"): + return { + "accession": acc, + "assembly_id": assembly_id, + "version_status": status, + } + + def test_creates_new_file(self, tmp_path): + tsv = tmp_path / "historical.tsv" + rows = [self._make_row("GCA_000222935.1", "GCA_000222935_1")] + append_superseded_to_tsv(rows, str(tsv)) + assert tsv.exists() + result = read_tsv(tsv) + assert len(result) == 1 + assert result[0]["accession"] == "GCA_000222935.1" + + def test_appends_to_existing(self, tmp_path): + tsv = tmp_path / "historical.tsv" + write_tsv(tsv, [self._make_row("GCA_000412225.1", "GCA_000412225_1")]) + append_superseded_to_tsv( + [self._make_row("GCA_000222935.1", "GCA_000222935_1")], str(tsv) + ) + result = read_tsv(tsv) + assert len(result) == 2 + + def test_dedup_on_assembly_id_keeps_new(self, tmp_path): + tsv = tmp_path / "historical.tsv" + old_row = { + "accession": "GCA_000222935.1", + "assembly_id": "GCA_000222935_1", + "version_status": "superseded", + "superseded_by": "GCA_000222935.2", + } + write_tsv(tsv, [old_row]) + new_row = dict(old_row) + new_row["superseded_by"] = "GCA_000222935.3" + append_superseded_to_tsv([new_row], str(tsv)) + result = read_tsv(tsv) + assert len(result) == 1 + assert result[0]["superseded_by"] == "GCA_000222935.3" + + def test_no_op_when_empty_list(self, tmp_path): + tsv = tmp_path / "historical.tsv" + append_superseded_to_tsv([], str(tsv)) + assert not tsv.exists() + + +# --------------------------------------------------------------------------- +# TestIncrementalOrchestrator +# --------------------------------------------------------------------------- + +class TestIncrementalOrchestrator: + """parse_assembly_versions orchestrator behaviour.""" + + def test_no_previous_tsv_returns_empty_result(self, tmp_path): + jsonl = tmp_path / "new.jsonl" + write_jsonl(jsonl, [{"accession": "GCA_000222935.2"}]) + result = parse_assembly_versions( + new_jsonl=str(jsonl), + previous_tsv=str(tmp_path / "nope.tsv"), + historical_tsv=str(tmp_path / "historical.tsv"), + ) + assert result["newly_superseded_count"] == 0 + assert result["missing_versions_count"] == 0 + assert result["missing_versions"] == [] + + def test_one_superseded_produces_correct_counts(self, tmp_path): + previous_tsv = tmp_path / "previous.tsv" + write_tsv(previous_tsv, [ + {"accession": "GCA_000222935.1", "taxon_id": "1"} + ]) + jsonl = tmp_path / "new.jsonl" + write_jsonl(jsonl, [ + {"accession": "GCA_000222935.2", "releaseDate": "2024-01-15"} + ]) + result = parse_assembly_versions( + new_jsonl=str(jsonl), + previous_tsv=str(previous_tsv), + historical_tsv=str(tmp_path / "historical.tsv"), + ) + assert result["newly_superseded_count"] == 1 + assert result["missing_versions_count"] == 0 + + def test_missing_version_detected_in_orchestrator_result(self, tmp_path): + """v3 present, v2 missing → missing_versions_count == 1.""" + previous_tsv = tmp_path / "previous.tsv" + write_tsv(previous_tsv, [ + {"accession": "GCA_000222935.1", "taxon_id": "1"} + ]) + jsonl = tmp_path / "new.jsonl" + write_jsonl(jsonl, [ + {"accession": "GCA_000222935.3", "releaseDate": "2024-06-01"} + ]) + result = parse_assembly_versions( + new_jsonl=str(jsonl), + previous_tsv=str(previous_tsv), + historical_tsv=str(tmp_path / "historical.tsv"), + ) + assert result["missing_versions_count"] == 1 + assert result["missing_versions"][0]["base_accession"] == "GCA_000222935" + assert result["missing_versions"][0]["missing_version"] == 2 + + def test_historical_tsv_written(self, tmp_path): + previous_tsv = tmp_path / "previous.tsv" + write_tsv(previous_tsv, [ + {"accession": "GCA_000222935.1", "taxon_id": "1"} + ]) + jsonl = tmp_path / "new.jsonl" + write_jsonl(jsonl, [ + {"accession": "GCA_000222935.2", "releaseDate": "2024-01-15"} + ]) + historical_tsv = tmp_path / "historical.tsv" + parse_assembly_versions( + new_jsonl=str(jsonl), + previous_tsv=str(previous_tsv), + historical_tsv=str(historical_tsv), + ) + assert historical_tsv.exists() + rows = read_tsv(historical_tsv) + assert len(rows) == 1 + assert rows[0]["version_status"] == "superseded" + + +# --------------------------------------------------------------------------- +# TestDeriveAssemblyVersionPaths +# --------------------------------------------------------------------------- + +class TestDeriveAssemblyVersionPaths: + """derive_assembly_version_paths produces correct sibling file paths.""" + + def test_previous_tsv_in_same_directory(self, tmp_path): + jsonl = tmp_path / "assembly_data_report.jsonl" + jsonl.touch() + previous_tsv, _ = derive_assembly_version_paths(str(jsonl)) + assert os.path.dirname(previous_tsv) == str(tmp_path) + assert previous_tsv.endswith("assembly_current.tsv.previous") + + def test_historical_tsv_in_same_directory(self, tmp_path): + jsonl = tmp_path / "assembly_data_report.jsonl" + jsonl.touch() + _, historical_tsv = derive_assembly_version_paths(str(jsonl)) + assert os.path.dirname(historical_tsv) == str(tmp_path) + assert historical_tsv.endswith("assembly_historical.tsv") + + +# --------------------------------------------------------------------------- +# TestUpdateAssemblyVersionsFlow +# --------------------------------------------------------------------------- + +class TestUpdateAssemblyVersionsFlow: + """update_assembly_versions fetches metadata and writes JSONL.""" + + def _write_missing_json(self, tmp_path, entries): + path = tmp_path / "missing.json" + with open(path, "w") as f: + json.dump(entries, f) + return str(path) + + @patch.object(updater_module, "setup_cache_directories") + @patch.object(updater_module, "fetch_version_metadata") + def test_correct_accession_fetched(self, mock_fetch, mock_setup, tmp_path): + """new_accession from missing_json should be passed to fetch_version_metadata.""" + mock_fetch.return_value = {"accession": "GCA_000222935.2"} + missing_json = self._write_missing_json(tmp_path, [ + { + "base_accession": "GCA_000222935", + "missing_version": 1, + "new_version": 2, + "new_accession": "GCA_000222935.2", + } + ]) + update_assembly_versions(missing_json=missing_json, work_dir=str(tmp_path)) + mock_fetch.assert_called_once_with("GCA_000222935.2", str(tmp_path)) + + @patch.object(updater_module, "setup_cache_directories") + @patch.object(updater_module, "fetch_version_metadata") + def test_jsonl_written_with_fetched_records(self, mock_fetch, mock_setup, tmp_path): + """Fetched metadata should be written as JSONL lines.""" + mock_fetch.return_value = {"accession": "GCA_000222935.2", "someField": "value"} + missing_json = self._write_missing_json(tmp_path, [ + { + "base_accession": "GCA_000222935", + "missing_version": 1, + "new_version": 2, + "new_accession": "GCA_000222935.2", + } + ]) + update_assembly_versions(missing_json=missing_json, work_dir=str(tmp_path)) + jsonl_path = tmp_path / "missing_assembly_versions.jsonl" + assert jsonl_path.exists() + records = [json.loads(line) for line in jsonl_path.read_text().strip().splitlines()] + assert len(records) == 1 + assert records[0]["accession"] == "GCA_000222935.2" + + @patch.object(updater_module, "setup_cache_directories") + @patch.object(updater_module, "fetch_version_metadata") + def test_fetch_failure_skipped(self, mock_fetch, mock_setup, tmp_path): + """If fetch_version_metadata returns empty dict, entry is omitted from JSONL.""" + mock_fetch.return_value = {} + missing_json = self._write_missing_json(tmp_path, [ + { + "base_accession": "GCA_000222935", + "missing_version": 1, + "new_version": 2, + "new_accession": "GCA_000222935.2", + } + ]) + update_assembly_versions(missing_json=missing_json, work_dir=str(tmp_path)) + jsonl_path = tmp_path / "missing_assembly_versions.jsonl" + assert jsonl_path.read_text().strip() == "" + + @patch.object(updater_module, "setup_cache_directories") + @patch.object(updater_module, "fetch_version_metadata") + def test_partial_fetch_failures_writes_successful(self, mock_fetch, mock_setup, tmp_path): + """Successful fetches are written even when some entries return no metadata.""" + mock_fetch.side_effect = [ + {"accession": "GCA_000222935.2"}, + {}, + ] + missing_json = self._write_missing_json(tmp_path, [ + { + "base_accession": "GCA_000222935", + "missing_version": 1, + "new_version": 2, + "new_accession": "GCA_000222935.2", + }, + { + "base_accession": "GCA_000412225", + "missing_version": 1, + "new_version": 2, + "new_accession": "GCA_000412225.2", + }, + ]) + update_assembly_versions(missing_json=missing_json, work_dir=str(tmp_path)) + jsonl_path = tmp_path / "missing_assembly_versions.jsonl" + records = [json.loads(line) for line in jsonl_path.read_text().strip().splitlines()] + assert len(records) == 1 + assert records[0]["accession"] == "GCA_000222935.2" + + @patch.object(updater_module, "setup_cache_directories") + @patch.object(updater_module, "fetch_version_metadata") + def test_empty_missing_json_no_op(self, mock_fetch, mock_setup, tmp_path): + """An empty missing_versions.json should not call fetch_version_metadata.""" + missing_json = self._write_missing_json(tmp_path, []) + update_assembly_versions(missing_json=missing_json, work_dir=str(tmp_path)) + mock_fetch.assert_not_called() + + +# --------------------------------------------------------------------------- +# TestParseAssemblyVersionsPlugin +# --------------------------------------------------------------------------- + +class TestParseAssemblyVersionsPlugin: + """plugin() returns a correctly configured Parser.""" + + def test_plugin_returns_parser(self): + result = incremental_module.plugin() + assert isinstance(result, Parser) + assert result.name == "PARSE_ASSEMBLY_VERSIONS" + assert result.func is incremental_module.parse_assembly_versions_wrapper + + def test_load_missing_versions(self, tmp_path): + """load_missing_versions reads a JSON file into a list of dicts.""" + path = tmp_path / "missing.json" + entries = [{"base_accession": "GCA_000222935", "missing_version": 1}] + path.write_text(json.dumps(entries)) + result = load_missing_versions(str(path)) + assert result == entries