diff --git a/docs/ncbi_ftp_e2e_walkthrough.md b/docs/ncbi_ftp_e2e_walkthrough.md index 32d39be..cbebcee 100644 --- a/docs/ncbi_ftp_e2e_walkthrough.md +++ b/docs/ncbi_ftp_e2e_walkthrough.md @@ -1,9 +1,10 @@ # NCBI FTP Pipeline — Local End-to-End Walkthrough -Step-by-step instructions for running a small (≤ 10 assembly) end-to-end sync +Step-by-step instructions for running a full production transfer in the Lakehouse, +or running a small (≤ 10 assembly) local end-to-end sync of NCBI RefSeq records against a local CEPH container. The walkthrough uses -the two existing Jupyter notebooks for Phases 1 and 3, and the project's Docker -image for the Phase 2 download step. +a Jupyter notebook for manifest generation, a local or containerized download +and staging CLI tool, and a CLI tool for promotion of the staged records to an S3 store. > **Prerequisites:** > - Docker or Podman @@ -15,9 +16,10 @@ image for the Phase 2 download step. ## Architecture overview ``` - Phase 1 (notebook) Phase 2 (container) Phase 3 (notebook) + Manifest (notebook) + (to be replaced with CLI) Download (container CLI) Promote (local CLI) ┌────────────────────┐ ┌───────────────────────┐ ┌──────────────────────┐ -│ Manifest notebook │ │ ncbi_ftp_sync CLI │ │ Promote notebook │ +│ Manifest notebook │ │ ncbi_ftp_sync CLI │ │ Promote CLI tool │ │ ─ download FTP │────▶│ ─ read manifest │────▶│ ─ promote staged │ │ assembly summary │ │ ─ parallel FTP DL │ │ files to Lakehouse │ │ ─ diff against │ │ ─ MD5 verify │ │ ─ archive old ver. │ @@ -31,11 +33,6 @@ image for the Phase 2 download step. --- -## Path anatomy - -All S3 paths in this pipeline compose from a small set of variables. -Understanding this decomposition is the key to configuring the notebooks. - ### Path formats used | Format | Example | Description | @@ -49,8 +46,8 @@ Understanding this decomposition is the key to configuring the notebooks. 
### Lakehouse object (final location) ``` -s3://{LAKEHOUSE_BUCKET}/{LAKEHOUSE_KEY_PREFIX}raw_data/{GCF|GCA}/{nnn}/{nnn}/{nnn}/{assembly_dir}/{filename} - └── bucket ─────┘ └── key prefix ──────┘└── build_accession_path() ────────────────────────┘ +s3://{LAKEHOUSE_BUCKET}/{LAKEHOUSE_KEY_PREFIX}/raw_data/{GCF|GCA}/{nnn}/{nnn}/{nnn}/{assembly_dir}/{filename} + └── bucket ─────┘ └── key prefix ───────┘└── build_accession_path() ────────────────────────┘ ``` Example: @@ -61,7 +58,7 @@ s3://cdm-lake/tenant-general-warehouse/kbase/datasets/ncbi/raw_data/GCF/900/000/ ### Staging object (Phase 2 output) ``` -s3://{STAGING_BUCKET}/{STAGING_KEY_PREFIX}raw_data/{GCF|GCA}/{nnn}/{nnn}/{nnn}/{assembly_dir}/{filename} +s3://{STAGING_BUCKET}/{STAGING_KEY_PREFIX}/raw_data/{GCF|GCA}/{nnn}/{nnn}/{nnn}/{assembly_dir}/{filename} └── bucket ─────┘ └── key prefix ────┘└── build_accession_path() ────────────────────────┘ ``` @@ -92,6 +89,13 @@ docker run -d \ -e RGW_SECRET_KEY=test_access_secret \ ghcr.io/kbasetest/ceph-rgw-test-image:0.1.5 ``` +Set CEPH credentials as environment variables: + +```sh +export AWS_ENDPOINT_URL=http://localhost:9000 +export AWS_ACCESS_KEY_ID=test_access_key +export AWS_SECRET_ACCESS_KEY=test_access_secret +``` (Note that a similar service is included in the `docker-compose` configuration file at the root of this repository that is used in CI test workflows.) @@ -230,7 +234,7 @@ Step 3b below. If you are testing against CEPH and want to exercise the S3 upload path: ```python -STAGING_URI = "s3://cdm-lake/staging/run1/" +STAGING_URI = "s3://cts/staging/run1/" ``` > **Tip:** If you re-run later with `PREVIOUS_SUMMARY_URI` pointing at a @@ -264,8 +268,8 @@ docker build -t cdm-data-loaders . ### 3b. Prepare local directories ```sh -mkdir -p notebooks/staging -cp notebooks/output/transfer_manifest.txt notebooks/staging/ +mkdir -p notebooks/staging/input +cp notebooks/output/* notebooks/staging/input ``` ### 3c. 
Run the download @@ -273,7 +277,7 @@ cp notebooks/output/transfer_manifest.txt notebooks/staging/ ```sh docker run --rm \ --userns=keep-id \ - -v "$(pwd)/notebooks/staging:/input:ro" \ + -v "$(pwd)/notebooks/staging/input:/input:ro" \ -v "$(pwd)/notebooks/staging:/output" \ cdm-data-loaders ncbi_ftp_sync \ --manifest /input/transfer_manifest.txt \ @@ -293,7 +297,7 @@ docker run --rm \ | `--threads` | Parallel FTP connections (2 is polite for testing) | | `--limit` | Redundant safety cap (already limited in Phase 1) | -After the container exits, `notebooks/staging/` will contain: +After the container exits, `notebooks/staging/` will contain something like: ``` staging/ @@ -319,7 +323,7 @@ the FTP server's `md5checksums.txt`. > --threads 2 --limit 10 > ``` -### 3d. Upload staged files to CEPH +### 3d. Local Testing: Upload staged files to CEPH The download step writes to the local filesystem. To feed Phase 3 we need to upload the staged files into CEPH under a staging prefix: @@ -336,49 +340,52 @@ uv run python scripts/s3_local.py ls s3://cts/staging/run1/ --- -## 4. Phase 3 — Promote & archive (notebook) +## 4. Phase 3 — Promote & archive (CLI tool) -Open `notebooks/ncbi_ftp_promote.ipynb`. +Phase 3 uses the `ncbi_ftp_promote` CLI tool to promote staged assemblies from +the S3 staging prefix to their final Lakehouse paths, archive replaced or +suppressed assemblies, and trim the transfer manifest for resumability. 
-### Constants to change (Cell 3) +### Arguments reference -| Constant | Walkthrough value | Format | Why | -|-------------------------|------------------------------------------------------|--------|---------------------------------------------| -| `STAGING_BUCKET` | `"cts"` | bucket name | CTS staging bucket (Phase 2 writes here) | -| `LAKEHOUSE_BUCKET` | `"cdm-lake"` | bucket name | final Lakehouse destination | -| `STAGING_KEY_PREFIX` | `"staging/run1/"` | S3 key prefix | matches the upload prefix from Step 3d | -| `REMOVED_MANIFEST_PATH` | `None` | local path | nothing to remove on first run | -| `UPDATED_MANIFEST_PATH` | `None` | local path | nothing to archive on first run | -| `NCBI_RELEASE` | `None` | string | no release tag needed for local testing | -| `MANIFEST_S3_KEY` | `None` | S3 object key | skip manifest trimming | -| `LAKEHOUSE_KEY_PREFIX` | `"tenant-general-warehouse/kbase/datasets/ncbi/"` | S3 key prefix | keep default | -| `DRY_RUN` | `True` | bool | **start with dry-run!** | +| Flag | Short | Default | Description | +|------|-------|---------|-------------| +| `--staging-path` | `-s` | *(required)* | S3 key prefix where Phase 2 wrote its output (must contain a `raw_data/` folder) | +| `--destination-path` | | `tenant-general-warehouse/kbase/datasets/ncbi` | S3 key prefix in the destination bucket to promote files into | +| `--staging-bucket` | | `cts` | S3 bucket containing the staged files | +| `--destination-bucket` | | `cdm-lake` | S3 bucket to promote files into (Lakehouse) | +| `--removed-manifest` | `-r` | *(none)* | Local path to the removed manifest from Phase 1; omit to skip archiving removed assemblies | +| `--updated-manifest` | `-u` | *(none)* | Local path to the updated manifest from Phase 1; omit to skip archiving updated assemblies | +| `--transfer-manifest` | `-t` | `{staging-path}/transfer_manifest.txt` | S3 key of the transfer manifest to trim after a successful promote | +| `--dry-run` | | `False` | Log what would happen 
without making any changes | -### Initialise the S3 client for CEPH -The notebook calls `get_s3_client()` which, by default, tries to import -credentials from `berdl_notebook_utils`. For local CEPH you need to -initialise the client manually **before** running Cell 4. Insert a new cell -after Cell 2 (Imports) with: -```python -from cdm_data_loaders.utils.s3 import get_s3_client, reset_s3_client +First, do a dry run to make sure everything looks as expected: -reset_s3_client() # clear any cached client -get_s3_client({ - "endpoint_url": "http://localhost:9000", - "aws_access_key_id": "test_access_key", - "aws_secret_access_key": "test_access_secret", -}) +```sh +uv run ncbi_ftp_promote \ + --staging-path staging/run1 \ + --removed-manifest notebooks/output/removed.txt \ + --updated-manifest notebooks/output/updated.txt \ + --dry-run ``` -### Run the notebook +Once everything looks good, run the actual promotion: +```sh +uv run ncbi_ftp_promote \ + --staging-path staging/run1 \ + --removed-manifest notebooks/output/removed.txt \ + --updated-manifest notebooks/output/updated.txt +``` + +If you exclude the removed and updated manifests, no archiving will occur, just promotion of staged records. -1. Execute all cells. With `DRY_RUN = True` the promote step will log what it - *would* do without moving any objects. -2. Review the report in Cell 6. -3. If the dry-run looks correct, set `DRY_RUN = False` in Cell 3 and re-run - from Cell 3. +The CLI prints a promote summary on completion: + +``` +PROMOTE SUMMARY: 10 promoted, 0 archived, 0 failed +``` After promotion the final Lakehouse layout in CEPH will look like: @@ -437,8 +444,12 @@ copied to: s3://{LAKEHOUSE_BUCKET}/{LAKEHOUSE_KEY_PREFIX}archive/{release_tag}/metadata/{assembly_dir}_datapackage.json ``` -Use the last cell of `notebooks/ncbi_ftp_promote.ipynb` to list and preview -all descriptors written in a promote run. 
+Use `scripts/s3_local.py ls` to list all descriptors written in a promote run: + +```sh +uv run python scripts/s3_local.py ls \ + s3://cdm-lake/tenant-general-warehouse/kbase/datasets/ncbi/metadata/ +``` To inspect a descriptor directly: @@ -460,9 +471,9 @@ previous snapshot: 2. **Phase 1:** The diff will now show `updated`, `replaced`, and `suppressed` entries (if any changed between runs). 3. **Phase 2:** Download the new manifest. -4. **Phase 3:** Set `REMOVED_MANIFEST_PATH` and `UPDATED_MANIFEST_PATH` to the paths - from Phase 1. Updated assemblies will be archived before overwrite; - removed assemblies will be archived and deleted. +4. **Phase 3:** Pass `--removed-manifest` and `--updated-manifest` pointing at + the files produced in Phase 1. Updated assemblies are archived before + overwrite; removed assemblies are archived and deleted. --- @@ -482,9 +493,9 @@ rm -rf staging/ output/ | Symptom | Cause | Fix | |---------|-------|-----| -| `berdl_notebook_utils` import error in notebook | Missing local CEPH client init | Add the `get_s3_client({...})` cell described in Step 4 | | `connect_ftp() timeout` | NCBI FTP may be slow or rate-limited | Retry; reduce `--threads` to 1 | -| Phase 3 shows 0 promoted | Staging prefix doesn't match or bucket is wrong | Verify `STAGING_KEY_PREFIX` matches the S3 upload path from Step 3d | +| Phase 3 shows 0 promoted | Staging prefix doesn't match or bucket is wrong | Verify `--staging-path` matches the S3 upload path from Step 3d | +| Phase 3 S3 auth error | Missing credentials for CEPH | Export `AWS_ENDPOINT_URL`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` before running | | Container can't reach FTP | Docker network isolation | Use `--network host` or ensure DNS resolution works inside the container | --- diff --git a/notebooks/ncbi_ftp_manifest.ipynb b/notebooks/ncbi_ftp_manifest.ipynb index d6caa2a..de4fe00 100644 --- a/notebooks/ncbi_ftp_manifest.ipynb +++ b/notebooks/ncbi_ftp_manifest.ipynb @@ -49,7 +49,7 @@ 
"\"\"\"Imports and S3 client initialisation.\"\"\"\n", "\n", "import json\n", - "from pathlib import Path\n", + "from pathlib import Path, PurePosixPath\n", "\n", "from cdm_data_loaders.ncbi_ftp.assembly import FTP_HOST\n", "from cdm_data_loaders.ncbi_ftp.manifest import (\n", @@ -100,8 +100,8 @@ "DATABASE = \"refseq\"\n", "\n", "# Accession prefix filtering (3-digit, inclusive). Set to None to skip.\n", - "PREFIX_FROM: str | None = \"900\" # e.g. \"000\"\n", - "PREFIX_TO: str | None = \"900\" # e.g. \"003\"\n", + "PREFIX_FROM: str | None = \"960\" # e.g. \"000\"\n", + "PREFIX_TO: str | None = \"962\" # e.g. \"003\"\n", "\n", "# Maximum number of new/updated assemblies to include (None = unlimited)\n", "LIMIT: int | None = 10\n", @@ -120,9 +120,9 @@ "# Set LAKEHOUSE_BUCKET to your bucket name to enable, or None to skip.\n", "# STORE_KEY_PREFIX should point to the directory containing `raw_data/`.\n", "# format: bucket name (no s3:// scheme)\n", - "LAKEHOUSE_BUCKET: str | None = \"cdm-lake\"\n", + "LAKEHOUSE_BUCKET: PurePosixPath | None = PurePosixPath(\"cdm-lake\")\n", "# format: S3 key prefix within LAKEHOUSE_BUCKET (no scheme, no bucket)\n", - "STORE_KEY_PREFIX = \"tenant-general-warehouse/kbase/datasets/ncbi/\"\n", + "STORE_KEY_PREFIX: PurePosixPath = PurePosixPath(\"tenant-general-warehouse/kbase/datasets/ncbi/\")\n", "\n", "# Local output directory for manifest files\n", "# format: local directory path\n", @@ -254,7 +254,7 @@ " # Store scan didn't run, or was skipped. 
Try to load from S3.\n", " if PREVIOUS_SUMMARY_URI:\n", " s3 = get_s3_client()\n", - " bucket, key = split_s3_path(PREVIOUS_SUMMARY_URI)\n", + " bucket, key = split_s3_path(str(PREVIOUS_SUMMARY_URI))\n", " resp = s3.get_object(Bucket=bucket, Key=key)\n", " prev_text = resp[\"Body\"].read().decode(\"utf-8\")\n", " previous = parse_assembly_summary(prev_text)\n", @@ -396,7 +396,7 @@ "# Upload new snapshot to S3 for future diffing\n", "if SNAPSHOT_UPLOAD_URI:\n", " s3 = get_s3_client()\n", - " bucket, key = split_s3_path(SNAPSHOT_UPLOAD_URI)\n", + " bucket, key = split_s3_path(str(SNAPSHOT_UPLOAD_URI))\n", " s3.put_object(Bucket=bucket, Key=key, Body=raw_summary.encode(\"utf-8\"))\n", " print(f\"Uploaded new snapshot to {SNAPSHOT_UPLOAD_URI}\")\n", "else:\n", @@ -443,7 +443,7 @@ ], "metadata": { "kernelspec": { - "display_name": "cdm-data-loaders (3.13.11)", + "display_name": "cdm-data-loaders (3.13.11.final.0)", "language": "python", "name": "python3" }, diff --git a/notebooks/ncbi_ftp_promote.ipynb b/notebooks/ncbi_ftp_promote.ipynb deleted file mode 100644 index 1bd6e2b..0000000 --- a/notebooks/ncbi_ftp_promote.ipynb +++ /dev/null @@ -1,315 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "id": "2eda19a9", - "metadata": {}, - "source": [ - "# NCBI Assembly Promote & Archive (Phase 3)\n", - "\n", - "Promotes staged assembly files from S3 staging (written by CTS Phase 2)\n", - "to their final Lakehouse paths, archives replaced/suppressed assemblies,\n", - "and trims the transfer manifest for resumability.\n", - "\n", - "Steps:\n", - "1. Configure staging prefix, removed manifest, updated manifest, and release tag\n", - "2. Scan staged files and display summary\n", - "3. Archive existing versions of updated assemblies (pre-overwrite)\n", - "4. Promote files to final paths with MD5 metadata\n", - "5. Archive replaced/suppressed assemblies\n", - "6. 
Trim manifest (remove promoted entries)" - ] - }, - { - "cell_type": "markdown", - "id": "2f98c43e", - "metadata": {}, - "source": [ - "## Path formats quick reference\n", - "\n", - "| Suffix in variable name | Format | Example |\n", - "|-------------------------|--------|---------|\n", - "| `_BUCKET` | bucket name only | `cdm-lake` |\n", - "| `_KEY_PREFIX` | S3 key prefix (no scheme/bucket) | `staging/run1/` |\n", - "| `_S3_KEY` | S3 object key (no scheme/bucket) | `staging/transfer_manifest.txt` |\n", - "| `_PATH` | local filesystem path | `output/removed_manifest.txt` |\n", - "\n", - "Lakehouse object: `s3://{LAKEHOUSE_BUCKET}/{LAKEHOUSE_KEY_PREFIX}raw_data/…/{filename}`\n", - "Staging object: `s3://{STAGING_BUCKET}/{STAGING_KEY_PREFIX}raw_data/…/{filename}`" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "9b736665", - "metadata": {}, - "outputs": [], - "source": [ - "\"\"\"Imports and S3 client initialisation.\"\"\"\n", - "\n", - "import json\n", - "\n", - "from cdm_data_loaders.ncbi_ftp.promote import (\n", - " DEFAULT_LAKEHOUSE_KEY_PREFIX,\n", - " promote_from_s3,\n", - ")\n", - "from cdm_data_loaders.utils.s3 import get_s3_client" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "b36a556c", - "metadata": {}, - "outputs": [], - "source": [ - "\"\"\"Configure parameters.\n", - "\n", - "Path layout (how variables compose into a full S3 object path):\n", - " s3://{LAKEHOUSE_BUCKET}/{LAKEHOUSE_KEY_PREFIX}raw_data/{GCF|GCA}/{nnn}/{nnn}/{nnn}/{assembly_dir}/{file}\n", - " s3://{STAGING_BUCKET}/{STAGING_KEY_PREFIX}raw_data/{assembly_dir}/{file}\n", - "\"\"\"\n", - "\n", - "# S3 bucket where CTS Phase 2 writes staged files\n", - "# format: bucket name (no s3:// scheme)\n", - "STAGING_BUCKET = \"cts\"\n", - "\n", - "# S3 bucket for the final Lakehouse destination\n", - "# format: bucket name (no s3:// scheme)\n", - "LAKEHOUSE_BUCKET = \"cdm-lake\"\n", - "\n", - "# Staging prefix written by CTS Phase 2\n", - "# format: S3 
key prefix within STAGING_BUCKET (no scheme, no bucket)\n", - "STAGING_KEY_PREFIX = \"io/matt-cohere/staging/run1/output/\"\n", - "\n", - "# Local path to removed_manifest.txt from Phase 1 (or None to skip archiving)\n", - "# format: local file path\n", - "REMOVED_MANIFEST_PATH: str | None = None # e.g. \"output/removed_manifest.txt\"\n", - "\n", - "# Local path to updated_manifest.txt from Phase 1 (or None to skip pre-overwrite archiving)\n", - "# format: local file path\n", - "UPDATED_MANIFEST_PATH: str | None = None # e.g. \"output/updated_manifest.txt\"\n", - "\n", - "# NCBI release tag for archive metadata (e.g. \"2024-01\")\n", - "NCBI_RELEASE: str | None = None\n", - "\n", - "# S3 key of transfer_manifest.txt for trimming after promotion (or None to skip).\n", - "# Only needed if the manifest was uploaded to S3 (e.g. via the staging cell in Phase 1).\n", - "# format: S3 object key within STAGING_BUCKET (no scheme, no bucket)\n", - "MANIFEST_S3_KEY: str | None = (\n", - " \"io/matt-cohere/staging/run1/input/transfer_manifest.txt\" # e.g. \"staging/transfer_manifest.txt\"\n", - ")\n", - "\n", - "# Local path to transfer_manifest.txt (used when the manifest has not been uploaded to S3).\n", - "# Used only for the object-count estimate in the scan step; set to None to skip.\n", - "# format: local file path\n", - "MANIFEST_LOCAL_PATH: str | None = None # e.g. 
\"output/transfer_manifest.txt\"\n", - "\n", - "# Final Lakehouse path prefix\n", - "# format: S3 key prefix within LAKEHOUSE_BUCKET (no scheme, no bucket)\n", - "LAKEHOUSE_KEY_PREFIX = DEFAULT_LAKEHOUSE_KEY_PREFIX\n", - "\n", - "# Dry-run mode — log actions without making changes\n", - "DRY_RUN = False\n", - "\n", - "print(f\"Staging bucket: {STAGING_BUCKET}\")\n", - "print(f\"Lakehouse bucket: {LAKEHOUSE_BUCKET}\")\n", - "print(f\"Staging key prefix: {STAGING_KEY_PREFIX}\")\n", - "print(f\"Removed manifest: {REMOVED_MANIFEST_PATH}\")\n", - "print(f\"Updated manifest: {UPDATED_MANIFEST_PATH}\")\n", - "print(f\"NCBI release: {NCBI_RELEASE}\")\n", - "print(f\"Manifest S3 key: {MANIFEST_S3_KEY}\")\n", - "print(f\"Manifest local path: {MANIFEST_LOCAL_PATH}\")\n", - "print(f\"Lakehouse prefix: {LAKEHOUSE_KEY_PREFIX}\")\n", - "print(f\"Dry-run: {DRY_RUN}\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "ccfd88d9", - "metadata": {}, - "outputs": [], - "source": [ - "from cdm_data_loaders.utils.s3 import reset_s3_client\n", - "\n", - "# Provide S3 credentials (use for local testing against CEPH test container)\n", - "PROVIDE_CREDENTIALS = False # Set to False to rely on environment credentials (e.g. 
IAM role)\n", - "if PROVIDE_CREDENTIALS:\n", - " reset_s3_client() # Clear any existing client to ensure new credentials are used\n", - " get_s3_client(\n", - " {\n", - " \"endpoint_url\": \"http://localhost:9000\",\n", - " \"aws_access_key_id\": \"test_access_key\",\n", - " \"aws_secret_access_key\": \"test_access_secret\",\n", - " }\n", - " )" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "e521fd45", - "metadata": {}, - "outputs": [], - "source": [ - "\"\"\"Scan staged files and display summary.\"\"\"\n", - "\n", - "import tqdm\n", - "\n", - "s3 = get_s3_client()\n", - "paginator = s3.get_paginator(\"list_objects_v2\")\n", - "\n", - "# Estimate total objects from manifest so tqdm can show a percentage.\n", - "# Each assembly typically produces ~11 data files + ~10 .md5 sidecars = ~21 objects.\n", - "_FILES_PER_ASSEMBLY_EST = 21\n", - "estimated_total = None\n", - "if MANIFEST_S3_KEY:\n", - " try:\n", - " _resp = s3.get_object(Bucket=STAGING_BUCKET, Key=MANIFEST_S3_KEY)\n", - " _lines = [\n", - " ln.strip() for ln in _resp[\"Body\"].read().decode().splitlines() if ln.strip() and not ln.startswith(\"#\")\n", - " ]\n", - " estimated_total = len(_lines) * _FILES_PER_ASSEMBLY_EST\n", - " print(f\"Manifest (S3) has {len(_lines)} assemblies → estimated ~{estimated_total} staged objects\")\n", - " except Exception as e:\n", - " print(f\"Could not read S3 manifest for estimate: {e}\")\n", - "elif MANIFEST_LOCAL_PATH:\n", - " try:\n", - " from pathlib import Path\n", - "\n", - " _lines = [\n", - " ln.strip()\n", - " for ln in Path(MANIFEST_LOCAL_PATH).read_text().splitlines()\n", - " if ln.strip() and not ln.startswith(\"#\")\n", - " ]\n", - " estimated_total = len(_lines) * _FILES_PER_ASSEMBLY_EST\n", - " print(f\"Manifest (local) has {len(_lines)} assemblies → estimated ~{estimated_total} staged objects\")\n", - " except Exception as e:\n", - " print(f\"Could not read local manifest for estimate: {e}\")\n", - "\n", - "staged: list[str] = []\n", - 
"with tqdm.tqdm(total=estimated_total, unit=\"obj\", desc=\"Scanning staging prefix\", dynamic_ncols=True) as pbar:\n", - " for page in paginator.paginate(Bucket=STAGING_BUCKET, Prefix=STAGING_KEY_PREFIX):\n", - " keys = [obj[\"Key\"] for obj in page.get(\"Contents\", [])]\n", - " staged.extend(keys)\n", - " pbar.update(len(keys))\n", - "\n", - "sidecars = []\n", - "data_files = []\n", - "for k in staged:\n", - " if k.endswith((\".md5\", \".crc64nvme\")):\n", - " sidecars.append(k)\n", - " else:\n", - " data_files.append(k)\n", - "\n", - "print(f\"Staged objects: {len(staged)}\")\n", - "print(f\" Data files: {len(data_files)}\")\n", - "print(f\" Sidecars: {len(sidecars)}\")\n", - "\n", - "# Show first few data files\n", - "PREVIEW_COUNT = 10\n", - "for key in data_files[:PREVIEW_COUNT]:\n", - " print(f\" {key}\")\n", - "if len(data_files) > PREVIEW_COUNT:\n", - " print(f\" ... and {len(data_files) - PREVIEW_COUNT} more\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "10a46367", - "metadata": {}, - "outputs": [], - "source": [ - "\"\"\"Promote staged files to final Lakehouse paths.\"\"\"\n", - "\n", - "report = promote_from_s3(\n", - " staging_key_prefix=STAGING_KEY_PREFIX,\n", - " staging_bucket=STAGING_BUCKET,\n", - " lakehouse_bucket=LAKEHOUSE_BUCKET,\n", - " removed_manifest_path=REMOVED_MANIFEST_PATH,\n", - " updated_manifest_path=UPDATED_MANIFEST_PATH,\n", - " ncbi_release=NCBI_RELEASE,\n", - " manifest_s3_key=MANIFEST_S3_KEY,\n", - " lakehouse_key_prefix=LAKEHOUSE_KEY_PREFIX,\n", - " dry_run=DRY_RUN,\n", - ")\n", - "\n", - "print(json.dumps(report, indent=2))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "9d18a1e0", - "metadata": {}, - "outputs": [], - "source": [ - "\"\"\"Display promotion report.\"\"\"\n", - "\n", - "print(\"=\" * 50)\n", - "print(\"PROMOTION REPORT\")\n", - "print(\"=\" * 50)\n", - "print(f\"Promoted: {report['promoted']}\")\n", - "print(f\"Archived: {report['archived']}\")\n", - 
"print(f\"Failed: {report['failed']}\")\n", - "print(f\"Dry-run: {report['dry_run']}\")\n", - "print(f\"Timestamp: {report['timestamp']}\")\n", - "\n", - "if report[\"failed\"] > 0:\n", - " print(\"\\n⚠️ Some operations failed — check logs above for details.\")\n", - "\n", - "if report[\"dry_run\"]:\n", - " print(\"\\n📋 This was a dry-run. Set DRY_RUN = False and re-run to apply changes.\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "7fb27b941602401d91542211134fc71a", - "metadata": {}, - "outputs": [], - "source": [ - "\"\"\"Inspect frictionless descriptors written to metadata/.\"\"\"\n", - "\n", - "\n", - "s3 = get_s3_client()\n", - "paginator = s3.get_paginator(\"list_objects_v2\")\n", - "\n", - "descriptor_keys: list[str] = []\n", - "for page in paginator.paginate(Bucket=LAKEHOUSE_BUCKET, Prefix=LAKEHOUSE_KEY_PREFIX + \"metadata/\"):\n", - " descriptor_keys.extend(obj[\"Key\"] for obj in page.get(\"Contents\", []))\n", - "\n", - "print(f\"Found {len(descriptor_keys)} descriptor(s) in metadata/\")\n", - "\n", - "for key in descriptor_keys[:5]: # preview first 5\n", - " obj = s3.get_object(Bucket=LAKEHOUSE_BUCKET, Key=key)\n", - " descriptor = json.loads(obj[\"Body\"].read())\n", - " print()\n", - " print(f\" Key: {key}\")" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "cdm-data-loaders (3.13.11)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.13.11" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/pyproject.toml b/pyproject.toml index 3686ae5..f8d85b0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ dependencies = [ [project.scripts] all_the_bacteria = "cdm_data_loaders.pipelines.all_the_bacteria:cli" +ncbi_ftp_promote = 
"cdm_data_loaders.pipelines.ncbi_ftp_promote:cli" ncbi_ftp_sync = "cdm_data_loaders.pipelines.ncbi_ftp_download:cli" ncbi_rest_api = "cdm_data_loaders.pipelines.ncbi_rest_api:cli" uniprot = "cdm_data_loaders.pipelines.uniprot_kb:cli" diff --git a/src/cdm_data_loaders/ncbi_ftp/manifest.py b/src/cdm_data_loaders/ncbi_ftp/manifest.py index 152b243..7aa4dfa 100644 --- a/src/cdm_data_loaders/ncbi_ftp/manifest.py +++ b/src/cdm_data_loaders/ncbi_ftp/manifest.py @@ -136,7 +136,14 @@ def _parse_lines(lines: Iterable[str]) -> None: continue accession = row[_ACCESSION_COL] ftp_url = row[_FTP_URL_COL] - if ftp_url == "na": + if "https://ftp" not in ftp_url: + for col in row: + if "https://ftp" in col: + ftp_url = col + break + if "https://ftp" not in ftp_url: + msg = f"Missing ftp path for record {accession}." + logger.warning(msg) continue assembly_dir = PurePosixPath(_ftp_dir_from_url(ftp_url).name) assemblies[accession] = AssemblyRecord( diff --git a/src/cdm_data_loaders/ncbi_ftp/promote.py b/src/cdm_data_loaders/ncbi_ftp/promote.py index a107a01..6e3596a 100644 --- a/src/cdm_data_loaders/ncbi_ftp/promote.py +++ b/src/cdm_data_loaders/ncbi_ftp/promote.py @@ -36,8 +36,6 @@ logger: Logger = getLogger(__name__) -DEFAULT_LAKEHOUSE_KEY_PREFIX: PurePosixPath = PurePosixPath("tenant-general-warehouse/kbase/datasets/ncbi") - _MAX_DRY_RUN_LOGS = 10 @@ -49,7 +47,7 @@ def promote_from_s3( # noqa: PLR0913 staging_bucket: PurePosixPath, staging_key_prefix: PurePosixPath, lakehouse_bucket: PurePosixPath, - lakehouse_key_prefix: PurePosixPath = DEFAULT_LAKEHOUSE_KEY_PREFIX, + lakehouse_key_prefix: PurePosixPath, removed_manifest_path: Path | None = None, updated_manifest_path: Path | None = None, manifest_s3_key: PurePosixPath | None = None, @@ -91,8 +89,8 @@ def promote_from_s3( # noqa: PLR0913 archived += _archive_assemblies( manifest_file, lakehouse_bucket=lakehouse_bucket, - ncbi_release=ncbi_release, lakehouse_key_prefix=lakehouse_key_prefix, + ncbi_release=ncbi_release, 
archive_reason=reason, delete_source=delete, dry_run=dry_run, @@ -473,8 +471,8 @@ def _archive_objects( def _archive_assemblies( # noqa: PLR0913 manifest_local_path: Path, lakehouse_bucket: PurePosixPath, + lakehouse_key_prefix: PurePosixPath, ncbi_release: str | None = None, - lakehouse_key_prefix: PurePosixPath = DEFAULT_LAKEHOUSE_KEY_PREFIX, archive_reason: str = "unknown", *, delete_source: bool = False, diff --git a/src/cdm_data_loaders/pipelines/ncbi_ftp_download.py b/src/cdm_data_loaders/pipelines/ncbi_ftp_download.py index 636eed4..97897d7 100644 --- a/src/cdm_data_loaders/pipelines/ncbi_ftp_download.py +++ b/src/cdm_data_loaders/pipelines/ncbi_ftp_download.py @@ -19,7 +19,6 @@ import tqdm from pydantic import AliasChoices, Field -from pydantic_settings import BaseSettings, SettingsConfigDict from tenacity import before_sleep_log, retry, retry_if_exception_type, stop_after_attempt, wait_exponential from cdm_data_loaders.ncbi_ftp.assembly import ( @@ -29,7 +28,7 @@ parse_assembly_path, ) from cdm_data_loaders.pipelines.core import run_cli -from cdm_data_loaders.pipelines.cts_defaults import DEFAULT_SETTINGS_CONFIG_DICT, INPUT_MOUNT, OUTPUT_MOUNT +from cdm_data_loaders.pipelines.cts_defaults import INPUT_MOUNT, OUTPUT_MOUNT, CtsSettings from cdm_data_loaders.utils.ftp_client import ThreadLocalFTP from cdm_data_loaders.utils.s3 import get_s3_client, upload_file @@ -39,11 +38,9 @@ DEFAULT_STAGING_KEY_PREFIX: PurePosixPath = PurePosixPath("staging") -class DownloadSettings(BaseSettings): +class DownloadSettings(CtsSettings): """Configuration for the NCBI FTP assembly download pipeline.""" - model_config = SettingsConfigDict(**DEFAULT_SETTINGS_CONFIG_DICT) - manifest: Path = Field( default=Path(INPUT_MOUNT) / "transfer_manifest.txt", description="Path to the transfer manifest file listing FTP paths to download", diff --git a/src/cdm_data_loaders/pipelines/ncbi_ftp_promote.py b/src/cdm_data_loaders/pipelines/ncbi_ftp_promote.py new file mode 100644 index 
0000000..d2f3736 --- /dev/null +++ b/src/cdm_data_loaders/pipelines/ncbi_ftp_promote.py @@ -0,0 +1,92 @@ +"""NCBI FTP staged file promotion pipeline (Phase 3). + +Promotes staged files from the containerized NCBI FTP download pipeline to their +final location in the Lakehouse. +""" + +import logging +from pathlib import Path, PurePosixPath + +from pydantic import AliasChoices, Field +from pydantic_settings import CliImplicitFlag + +from cdm_data_loaders.ncbi_ftp.promote import promote_from_s3 +from cdm_data_loaders.pipelines.core import run_cli +from cdm_data_loaders.pipelines.cts_defaults import CtsSettings + +DEFAULT_STAGING_BUCKET: PurePosixPath = PurePosixPath("cts") +DEFAULT_DESTINATION_BUCKET: PurePosixPath = PurePosixPath("cdm-lake") +DEFAULT_DESTINATION_PREFIX: PurePosixPath = PurePosixPath("tenant-general-warehouse/kbase/datasets/ncbi") +DEFAULT_TRANSFER_MANIFEST_FILE: PurePosixPath = PurePosixPath("transfer_manifest.txt") +ESTIMATED_FILES_PER_ASSEMBLY: int = 21 + +logger = logging.getLogger("dlt") + + +class PromoteSettings(CtsSettings): + """Configuration for the NCBI FTP file promotion pipeline.""" + + staging_bucket: PurePosixPath = Field( + default=DEFAULT_STAGING_BUCKET, + description="Bucket where staged files are located after download", + validation_alias=AliasChoices("staging-bucket", "staging_bucket"), + ) + destination_bucket: PurePosixPath = Field( + default=DEFAULT_DESTINATION_BUCKET, + description="Bucket to which staged files will be promoted.", + validation_alias=AliasChoices("destination-bucket", "destination_bucket"), + ) + staging_path: PurePosixPath = Field( + description="Path to folder in the staging bucket where staged files are located; Should contain `raw_data/` folder", + validation_alias=AliasChoices("s", "staging-path", "staging_path"), + ) + destination_path: PurePosixPath = Field( + default=DEFAULT_DESTINATION_PREFIX, + description="Path to folder in the destination bucket where files will be promoted to; Will contain 
`raw_data/` folder", + validation_alias=AliasChoices("destination-path", "destination_path"), + ) + removed_manifest_path: Path | None = Field( + default=None, + description="Local filesystem path to the removed files manifest from Phase 1, or None to skip archiving removed records", + validation_alias=AliasChoices("r", "removed-manifest", "removed_manifest"), + ) + updated_manifest_path: Path | None = Field( + default=None, + description="Local filesystem path to the updated files manifest from Phase 1, or None to skip archiving updated records", + validation_alias=AliasChoices("u", "updated-manifest", "updated_manifest"), + ) + transfer_manifest_path: PurePosixPath | None = Field( + default_factory=lambda settings: settings["staging_path"] / DEFAULT_TRANSFER_MANIFEST_FILE, + description="S3 object key of the transfer manifest to trim after promotion, or None to skip pruning staged files", + validation_alias=AliasChoices("t", "transfer-manifest", "transfer_manifest"), + ) + dry_run: CliImplicitFlag[bool] = Field( + default=False, + description="Log actions without making changes", + validation_alias=AliasChoices("dry-run", "dry_run"), + ) + + +def run_promote(config: PromoteSettings) -> None: + """Main CTS entry point for Phase 3 promotion. 
+ + :param config: validated promote settings + """ + report = promote_from_s3( + staging_bucket=config.staging_bucket, + staging_key_prefix=config.staging_path, + lakehouse_bucket=config.destination_bucket, + lakehouse_key_prefix=config.destination_path, + removed_manifest_path=config.removed_manifest_path, + updated_manifest_path=config.updated_manifest_path, + manifest_s3_key=config.transfer_manifest_path, + dry_run=config.dry_run, + ) + if report["failed"] > 0: + msg = f"Promote completed with {report['failed']} failures" + raise RuntimeError(msg) + + +def cli() -> None: + """CLI entry point for ``ncbi_ftp_promote``.""" + run_cli(PromoteSettings, run_promote) diff --git a/tests/integration/test_full_pipeline.py b/tests/integration/test_full_pipeline.py index 19e8852..f7e4089 100644 --- a/tests/integration/test_full_pipeline.py +++ b/tests/integration/test_full_pipeline.py @@ -22,11 +22,12 @@ write_transfer_manifest, write_updated_manifest, ) -from cdm_data_loaders.ncbi_ftp.promote import DEFAULT_LAKEHOUSE_KEY_PREFIX, promote_from_s3 +from cdm_data_loaders.ncbi_ftp.promote import promote_from_s3 from cdm_data_loaders.pipelines.ncbi_ftp_download import download_batch from .conftest import get_object_metadata, list_all_keys, stage_files_to_ceph +DEFAULT_LAKEHOUSE_KEY_PREFIX: PurePosixPath = PurePosixPath("tenant-general-warehouse/kbase/datasets/ncbi") STABLE_PREFIX = "900" STAGING_PREFIX = PurePosixPath("staging") / "run1" PATH_PREFIX = DEFAULT_LAKEHOUSE_KEY_PREFIX diff --git a/tests/integration/test_manifest_e2e.py b/tests/integration/test_manifest_e2e.py index bacefd0..5656ba3 100644 --- a/tests/integration/test_manifest_e2e.py +++ b/tests/integration/test_manifest_e2e.py @@ -25,9 +25,10 @@ write_transfer_manifest, write_updated_manifest, ) -from cdm_data_loaders.ncbi_ftp.promote import DEFAULT_LAKEHOUSE_KEY_PREFIX from cdm_data_loaders.utils.ftp_client import connect_ftp, ftp_retrieve_text +DEFAULT_LAKEHOUSE_KEY_PREFIX: PurePosixPath = 
PurePosixPath("tenant-general-warehouse/kbase/datasets/ncbi") + # Use a high-numbered prefix range that typically has only a handful of # assemblies, keeping FTP traffic minimal. STABLE_PREFIX = "900" diff --git a/tests/integration/test_promote_e2e.py b/tests/integration/test_promote_e2e.py index 26b2106..ab6cf4d 100644 --- a/tests/integration/test_promote_e2e.py +++ b/tests/integration/test_promote_e2e.py @@ -22,10 +22,12 @@ build_descriptor_key, create_descriptor, ) -from cdm_data_loaders.ncbi_ftp.promote import DEFAULT_LAKEHOUSE_KEY_PREFIX, _archive_assemblies, promote_from_s3 +from cdm_data_loaders.ncbi_ftp.promote import _archive_assemblies, promote_from_s3 from .conftest import get_object_metadata, list_all_keys, seed_lakehouse +DEFAULT_LAKEHOUSE_KEY_PREFIX: PurePosixPath = PurePosixPath("tenant-general-warehouse/kbase/datasets/ncbi") + # Fake assembly details used across tests ACCESSION_A = "GCF_900000001.1" ASSEMBLY_DIR_A: PurePosixPath = PurePosixPath("GCF_900000001.1_FakeAssemblyA") diff --git a/tests/ncbi_ftp/test_notebooks.py b/tests/ncbi_ftp/test_notebooks.py index 652feea..b61afd1 100644 --- a/tests/ncbi_ftp/test_notebooks.py +++ b/tests/ncbi_ftp/test_notebooks.py @@ -13,21 +13,15 @@ download_assembly_summary, write_updated_manifest, ) -from cdm_data_loaders.ncbi_ftp.promote import ( - DEFAULT_LAKEHOUSE_KEY_PREFIX, - promote_from_s3, -) from cdm_data_loaders.pipelines.ncbi_ftp_download import ( DEFAULT_STAGING_KEY_PREFIX, download_and_stage, ) -from cdm_data_loaders.utils.s3 import split_s3_path NOTEBOOKS_DIR = Path(__file__).resolve().parents[2] / "notebooks" NCBI_NOTEBOOKS = [ "ncbi_ftp_manifest.ipynb", - "ncbi_ftp_promote.ipynb", "ncbi_ftp_download.ipynb", ] @@ -68,13 +62,6 @@ def test_manifest_notebook_imports() -> None: assert callable(write_updated_manifest) -def test_promote_notebook_imports() -> None: - """All promote notebook imports are verified at module load time above.""" - assert callable(promote_from_s3) - assert 
isinstance(DEFAULT_LAKEHOUSE_KEY_PREFIX, PurePosixPath) - assert callable(split_s3_path) - - def test_download_notebook_imports() -> None: """All download notebook imports resolve without error.""" assert callable(download_and_stage) diff --git a/tests/ncbi_ftp/test_promote.py b/tests/ncbi_ftp/test_promote.py index 09c5ddd..b773e0f 100644 --- a/tests/ncbi_ftp/test_promote.py +++ b/tests/ncbi_ftp/test_promote.py @@ -11,7 +11,6 @@ import pytest from cdm_data_loaders.ncbi_ftp.promote import ( - DEFAULT_LAKEHOUSE_KEY_PREFIX, _archive_assemblies, _archive_objects, _dry_run_output, @@ -22,6 +21,8 @@ ) from tests.ncbi_ftp.conftest import ACC_PATH_215, ACC_PATH_845, TEST_BUCKET +DEFAULT_LAKEHOUSE_KEY_PREFIX: PurePosixPath = PurePosixPath("tenant-general-warehouse/kbase/datasets/ncbi") + # Promotion test constants _STAGE_PREFIX: PurePosixPath = PurePosixPath("staging") / "run1" @@ -90,7 +91,11 @@ def test_promote_dry_run_no_writes(mock_s3_client_no_checksum: botocore.client.B _stage_files(mock_s3_client_no_checksum, prefix) report = promote_from_s3( - staging_key_prefix=prefix, staging_bucket=TEST_BUCKET, lakehouse_bucket=TEST_BUCKET, dry_run=True + staging_key_prefix=prefix, + staging_bucket=TEST_BUCKET, + lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, + dry_run=True, ) assert report["promoted"] == 1 assert report["dry_run"] is True @@ -108,7 +113,12 @@ def test_promote_with_metadata(mock_s3_client_no_checksum: botocore.client.BaseC prefix = PurePosixPath("staging/run1") _stage_files(mock_s3_client_no_checksum, prefix) - report = promote_from_s3(staging_key_prefix=prefix, staging_bucket=TEST_BUCKET, lakehouse_bucket=TEST_BUCKET) + report = promote_from_s3( + staging_key_prefix=prefix, + staging_bucket=TEST_BUCKET, + lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, + ) assert report["promoted"] == 1 # only .fna.gz, not download_report.json assert report["failed"] == 0 @@ -288,7 +298,7 @@ def 
test_get_accession_path_prefix( ), ], ) -def test_get_source_dest_pairs_for_accession( # noqa: PLR0913 +def test_get_source_dest_pairs_for_accession( accession: str, bucket: PurePosixPath, prefix: PurePosixPath, @@ -383,7 +393,7 @@ def test_dry_run_output( ], ) @pytest.mark.s3 -def test_archive_objects( # noqa: PLR0913 +def test_archive_objects( mock_s3_client_no_checksum: botocore.client.BaseClient, key_pairs: list[tuple[PurePosixPath, PurePosixPath]], bucket: PurePosixPath, @@ -414,6 +424,7 @@ def test_archive_assemblies_removed(mock_s3_client_no_checksum: botocore.client. _archive_assemblies( manifest, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ncbi_release="2024-01", archive_reason="replaced_or_suppressed", delete_source=True, @@ -453,6 +464,7 @@ def test_archive_assemblies_updated_no_delete( _archive_assemblies( manifest, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ncbi_release="2024-06", archive_reason="updated", delete_source=False, @@ -486,9 +498,21 @@ def test_archive_assemblies_multiple_releases_no_collision( manifest = tmp_path / "updated.txt" manifest.write_text(f"{accession}\n") - _archive_assemblies(manifest, lakehouse_bucket=TEST_BUCKET, ncbi_release="2024-01", archive_reason="updated") + _archive_assemblies( + manifest, + lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, + ncbi_release="2024-01", + archive_reason="updated", + ) mock_s3_client_no_checksum.put_object(Bucket=str(TEST_BUCKET), Key=str(key), Body=b"v2-data") - _archive_assemblies(manifest, lakehouse_bucket=TEST_BUCKET, ncbi_release="2024-06", archive_reason="updated") + _archive_assemblies( + manifest, + lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, + ncbi_release="2024-06", + archive_reason="updated", + ) archive_key_1 = ( DEFAULT_LAKEHOUSE_KEY_PREFIX @@ -532,6 +556,7 @@ def test_archive_assemblies_dry_run(mock_s3_client_no_checksum: 
botocore.client. _archive_assemblies( manifest, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ncbi_release="2024-01", archive_reason="replaced_or_suppressed", delete_source=True, @@ -558,7 +583,15 @@ def test_archive_assemblies_no_objects_skips( """Accessions with no existing S3 objects are silently skipped.""" manifest = tmp_path / "updated.txt" manifest.write_text("GCF_000001215.4\n") - assert _archive_assemblies(manifest, lakehouse_bucket=TEST_BUCKET, ncbi_release="2024-01") == 0 + assert ( + _archive_assemblies( + manifest, + lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, + ncbi_release="2024-01", + ) + == 0 + ) @pytest.mark.s3 @@ -573,7 +606,12 @@ def test_archive_assemblies_unknown_release_fallback( manifest = tmp_path / "updated.txt" manifest.write_text(f"{accession}\n") - assert _archive_assemblies(manifest, lakehouse_bucket=TEST_BUCKET, ncbi_release=None) == 1 + assert ( + _archive_assemblies( + manifest, lakehouse_bucket=TEST_BUCKET, lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ncbi_release=None + ) + == 1 + ) archive_key = ( DEFAULT_LAKEHOUSE_KEY_PREFIX @@ -616,6 +654,7 @@ def test_archive_assemblies_multi_file_all_copied( archived = _archive_assemblies( manifest, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ncbi_release="2024-01", archive_reason="updated", delete_source=False, @@ -648,6 +687,7 @@ def test_archive_assemblies_multi_file_content_preserved( _archive_assemblies( manifest, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ncbi_release="2024-01", archive_reason="updated", delete_source=False, @@ -680,6 +720,7 @@ def test_archive_assemblies_multi_file_delete_all( archived = _archive_assemblies( manifest, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ncbi_release="2024-03", archive_reason="replaced_or_suppressed", delete_source=True, @@ -731,6 +772,7 @@ def 
test_archive_assemblies_partial_already_archived_overwritten( archived = _archive_assemblies( manifest, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ncbi_release="2024-01", archive_reason="updated", delete_source=False, @@ -780,6 +822,7 @@ def test_archive_assemblies_partial_delete_resumes( archived = _archive_assemblies( manifest, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ncbi_release="2024-03", archive_reason="replaced_or_suppressed", delete_source=True, @@ -813,10 +856,18 @@ def test_archive_assemblies_idempotent_updated_reruns_cleanly( manifest.write_text(f"{accession}\n") archived_1 = _archive_assemblies( - manifest, lakehouse_bucket=TEST_BUCKET, ncbi_release="2024-01", archive_reason="updated" + manifest, + lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, + ncbi_release="2024-01", + archive_reason="updated", ) archived_2 = _archive_assemblies( - manifest, lakehouse_bucket=TEST_BUCKET, ncbi_release="2024-01", archive_reason="updated" + manifest, + lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, + ncbi_release="2024-01", + archive_reason="updated", ) assert archived_1 == len(file_names) @@ -844,7 +895,11 @@ def test_archive_assemblies_multi_accession_manifest( manifest.write_text("\n".join(acc for acc, _, _ in accessions) + "\n") archived = _archive_assemblies( - manifest, lakehouse_bucket=TEST_BUCKET, ncbi_release="2024-01", archive_reason="updated" + manifest, + lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, + ncbi_release="2024-01", + archive_reason="updated", ) assert archived == len(accessions) @@ -880,6 +935,7 @@ def test_archive_assemblies_dry_run_multi_file( archived = _archive_assemblies( manifest, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ncbi_release="2024-01", archive_reason="replaced_or_suppressed", delete_source=True, @@ 
-911,7 +967,11 @@ def test_archive_assemblies_invalid_accession_skipped( manifest.write_text("NOT_AN_ACCESSION\n\n \n" + f"{accession}\n") archived = _archive_assemblies( - manifest, lakehouse_bucket=TEST_BUCKET, ncbi_release="2024-01", archive_reason="updated" + manifest, + lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, + ncbi_release="2024-01", + archive_reason="updated", ) assert archived == 1 @@ -937,6 +997,7 @@ def test_promote_multi_file_all_land_at_final_path( staging_key_prefix=_STAGE_PREFIX, staging_bucket=TEST_BUCKET, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ) assert report["promoted"] == len(file_names) @@ -962,6 +1023,7 @@ def test_promote_multi_file_content_preserved( staging_key_prefix=_STAGE_PREFIX, staging_bucket=TEST_BUCKET, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ) for fname, expected in files.items(): @@ -983,6 +1045,7 @@ def test_promote_md5_metadata_set_from_sidecar( staging_key_prefix=_STAGE_PREFIX, staging_bucket=TEST_BUCKET, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ) resp = mock_s3_client_no_checksum.head_object(Bucket=str(TEST_BUCKET), Key=str(_LKH1 / fname)) @@ -1001,6 +1064,7 @@ def test_promote_no_sidecar_no_md5_metadata( staging_key_prefix=_STAGE_PREFIX, staging_bucket=TEST_BUCKET, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ) resp = mock_s3_client_no_checksum.head_object(Bucket=str(TEST_BUCKET), Key=str(_LKH1 / fname)) @@ -1022,6 +1086,7 @@ def test_promote_staging_data_files_deleted_after_promote( staging_key_prefix=_STAGE_PREFIX, staging_bucket=TEST_BUCKET, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ) for key in staged_keys: @@ -1044,6 +1109,7 @@ def test_promote_md5_sidecars_deleted_after_promote( staging_key_prefix=_STAGE_PREFIX, staging_bucket=TEST_BUCKET, lakehouse_bucket=TEST_BUCKET, + 
lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ) for key in staged_keys: @@ -1064,6 +1130,7 @@ def test_promote_crc64nvme_sidecars_deleted_after_promote( staging_key_prefix=_STAGE_PREFIX, staging_bucket=TEST_BUCKET, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ) staged_key = f"{_STG1}{fname}" @@ -1103,6 +1170,7 @@ def _download_one_fail(Bucket: str, Key: str, Filename: str, **kw: object) -> No staging_key_prefix=_STAGE_PREFIX, staging_bucket=TEST_BUCKET, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ) assert report["failed"] == 1 @@ -1142,6 +1210,7 @@ def _download_middle_fail(Bucket: str, Key: str, Filename: str, **kw: object) -> staging_key_prefix=_STAGE_PREFIX, staging_bucket=TEST_BUCKET, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ) assert report["failed"] == 1 @@ -1185,6 +1254,7 @@ def _patched(Bucket: str, Key: str, Filename: str, **kw: object) -> None: # noq staging_key_prefix=_STAGE_PREFIX, staging_bucket=TEST_BUCKET, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ) assert report["failed"] == 1 @@ -1214,6 +1284,7 @@ def test_promote_multi_assembly_all_succeed_all_cleaned( staging_key_prefix=_STAGE_PREFIX, staging_bucket=TEST_BUCKET, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ) assert report["promoted"] == 2 # noqa: PLR2004 @@ -1241,6 +1312,7 @@ def test_promote_dry_run_multi_file_no_writes_no_cleanup( staging_key_prefix=_STAGE_PREFIX, staging_bucket=TEST_BUCKET, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, dry_run=True, ) @@ -1275,6 +1347,7 @@ def test_promote_skips_non_raw_data_paths( staging_key_prefix=_STAGE_PREFIX, staging_bucket=TEST_BUCKET, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ) assert report["promoted"] == 1 # only the .fna.gz @@ -1292,11 +1365,13 @@ def 
test_promote_idempotent_second_run_on_empty_staging( staging_key_prefix=_STAGE_PREFIX, staging_bucket=TEST_BUCKET, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ) report2 = promote_from_s3( staging_key_prefix=_STAGE_PREFIX, staging_bucket=TEST_BUCKET, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ) assert report1["promoted"] == 1 @@ -1324,6 +1399,7 @@ def test_promote_multi_file_md5_per_file( staging_key_prefix=_STAGE_PREFIX, staging_bucket=TEST_BUCKET, lakehouse_bucket=TEST_BUCKET, + lakehouse_key_prefix=DEFAULT_LAKEHOUSE_KEY_PREFIX, ) for fname, content in files.items(): diff --git a/tests/pipelines/test_ncbi_ftp_download.py b/tests/pipelines/test_ncbi_ftp_download.py index 675ac53..83ae76f 100644 --- a/tests/pipelines/test_ncbi_ftp_download.py +++ b/tests/pipelines/test_ncbi_ftp_download.py @@ -20,6 +20,7 @@ download_batch, ) from cdm_data_loaders.utils.s3 import reset_s3_client +from tests.pipelines.conftest import _generate_dlt_config _MOCK_STATS = { "accession": "GCF_000001215.4", @@ -43,7 +44,7 @@ def make_settings(**kwargs: str | int | bool | Path | PurePosixPath) -> DownloadSettings: """Generate a validated DownloadSettings object.""" settings_ctor = cast("Any", DownloadSettings) - return settings_ctor(_cli_parse_args=[], **kwargs) + return settings_ctor(_cli_parse_args=[], dlt_config=_generate_dlt_config(), **kwargs) # Settings defaults diff --git a/tests/pipelines/test_ncbi_ftp_promote.py b/tests/pipelines/test_ncbi_ftp_promote.py new file mode 100644 index 0000000..8d2eb20 --- /dev/null +++ b/tests/pipelines/test_ncbi_ftp_promote.py @@ -0,0 +1,275 @@ +"""Tests for pipelines.ncbi_ftp_promote — settings, pipeline orchestration, CLI.""" + +from pathlib import Path, PurePosixPath +from typing import Any, cast +from unittest.mock import patch + +import pytest +from pydantic import ValidationError + +from cdm_data_loaders.pipelines.ncbi_ftp_promote import ( + DEFAULT_DESTINATION_BUCKET, 
+ DEFAULT_DESTINATION_PREFIX, + DEFAULT_STAGING_BUCKET, + DEFAULT_TRANSFER_MANIFEST_FILE, + PromoteSettings, + run_promote, +) +from tests.pipelines.conftest import _generate_dlt_config + +_DEFAULT_STAGING_PATH: PurePosixPath = PurePosixPath("staging") / "run1" + + +def make_settings(**kwargs: str | int | bool | Path | PurePosixPath | None) -> PromoteSettings: + """Generate a validated PromoteSettings object with a required staging_path default.""" + settings_ctor = cast("Any", PromoteSettings) + kwargs.setdefault("staging_path", _DEFAULT_STAGING_PATH) + return settings_ctor(_cli_parse_args=[], dlt_config=_generate_dlt_config(), **kwargs) + + +# Settings defaults + + +class TestPromoteSettingsDefaults: + """Test default settings.""" + + def test_staging_bucket_default(self) -> None: + """Verify default staging_bucket matches DEFAULT_STAGING_BUCKET constant.""" + s = make_settings() + assert s.staging_bucket == DEFAULT_STAGING_BUCKET + + def test_destination_bucket_default(self) -> None: + """Verify default destination_bucket matches DEFAULT_DESTINATION_BUCKET constant.""" + s = make_settings() + assert s.destination_bucket == DEFAULT_DESTINATION_BUCKET + + def test_destination_path_default(self) -> None: + """Verify default destination_path matches DEFAULT_DESTINATION_PREFIX constant.""" + s = make_settings() + assert s.destination_path == DEFAULT_DESTINATION_PREFIX + + def test_removed_manifest_default_none(self) -> None: + """Verify default removed_manifest_path is None.""" + s = make_settings() + assert s.removed_manifest_path is None + + def test_updated_manifest_default_none(self) -> None: + """Verify default updated_manifest_path is None.""" + s = make_settings() + assert s.updated_manifest_path is None + + def test_transfer_manifest_default(self) -> None: + """Verify default transfer_manifest_path is derived from staging_path.""" + s = make_settings() + assert s.transfer_manifest_path == _DEFAULT_STAGING_PATH / DEFAULT_TRANSFER_MANIFEST_FILE + + def 
test_dry_run_default_false(self) -> None: + """Verify default dry_run is False.""" + s = make_settings() + assert s.dry_run is False + + +# Settings all params + + +class TestPromoteSettingsAllParams: + """Test with all params explicitly set.""" + + def test_all_params(self, tmp_path: Path) -> None: + """Verify all parameters are correctly set when provided.""" + staging = PurePosixPath("my-staging-bucket") + dest = PurePosixPath("my-dest-bucket") + staging_path = PurePosixPath("staging") / "run42" + destination_path = PurePosixPath("warehouse") / "ncbi" + removed = tmp_path / "removed.txt" + updated = tmp_path / "updated.txt" + transfer = PurePosixPath("staging") / "run42" / "transfer_manifest.txt" + + s = make_settings( + staging_bucket=staging, + destination_bucket=dest, + staging_path=staging_path, + destination_path=destination_path, + removed_manifest=removed, + updated_manifest=updated, + transfer_manifest=transfer, + dry_run=True, + ) + + assert s.staging_bucket == staging + assert s.destination_bucket == dest + assert s.staging_path == staging_path + assert s.destination_path == destination_path + assert s.removed_manifest_path == removed + assert s.updated_manifest_path == updated + assert s.transfer_manifest_path == transfer + assert s.dry_run is True + + +# Settings aliases + + +class TestPromoteSettingsAliases: + """Test CLI alias resolution.""" + + def test_staging_path_alias_s(self) -> None: + """Verify 's' alias resolves to staging_path.""" + path = PurePosixPath("staging") / "runX" + s = make_settings(s=path) + assert s.staging_path == path + + def test_destination_path_alias_destination_path(self) -> None: + """Verify 'destination_path' alias resolves to destination_path.""" + path = PurePosixPath("warehouse") / "custom" + s = make_settings(destination_path=path) + assert s.destination_path == path + + def test_removed_manifest_alias_r(self, tmp_path: Path) -> None: + """Verify 'r' alias resolves to removed_manifest_path.""" + p = tmp_path / 
"removed.txt" + s = make_settings(r=p) + assert s.removed_manifest_path == p + + def test_updated_manifest_alias_u(self, tmp_path: Path) -> None: + """Verify 'u' alias resolves to updated_manifest_path.""" + p = tmp_path / "updated.txt" + s = make_settings(u=p) + assert s.updated_manifest_path == p + + def test_transfer_manifest_alias_t(self) -> None: + """Verify 't' alias resolves to transfer_manifest_path.""" + p = PurePosixPath("staging") / "run1" / "manifest.txt" + s = make_settings(t=p) + assert s.transfer_manifest_path == p + + def test_staging_bucket_alias(self) -> None: + """Verify 'staging_bucket' alias resolves to staging_bucket.""" + bucket = PurePosixPath("alt-staging") + s = make_settings(staging_bucket=bucket) + assert s.staging_bucket == bucket + + def test_destination_bucket_alias(self) -> None: + """Verify 'destination_bucket' alias resolves to destination_bucket.""" + bucket = PurePosixPath("alt-dest") + s = make_settings(destination_bucket=bucket) + assert s.destination_bucket == bucket + + +# Settings validation + + +class TestPromoteSettingsValidation: + """Test validation constraints.""" + + def test_staging_path_required(self) -> None: + """Verify omitting staging_path raises ValidationError.""" + settings_ctor = cast("Any", PromoteSettings) + with pytest.raises((ValidationError, Exception)): + settings_ctor(_cli_parse_args=[], dlt_config=_generate_dlt_config()) + + def test_transfer_manifest_path_can_be_none(self) -> None: + """Verify transfer_manifest_path can be explicitly set to None.""" + s = make_settings(transfer_manifest=None) + assert s.transfer_manifest_path is None + + +# run_promote + + +_MOCK_REPORT_SUCCESS: dict[str, Any] = { + "timestamp": "2026-01-01T00:00:00+00:00", + "promoted": 5, + "archived": 0, + "failed": 0, + "dry_run": False, +} + +_MOCK_REPORT_WITH_FAILURES: dict[str, Any] = { + **_MOCK_REPORT_SUCCESS, + "promoted": 3, + "failed": 2, +} + + +class TestRunPromote: + """Test run_promote orchestration.""" + + def 
test_calls_promote_from_s3_with_correct_args(self, tmp_path: Path) -> None: + """Verify run_promote passes all PromoteSettings fields to promote_from_s3.""" + staging_path = PurePosixPath("staging") / "run1" + dest_path = PurePosixPath("warehouse") / "ncbi" + removed = tmp_path / "removed.txt" + updated = tmp_path / "updated.txt" + transfer = PurePosixPath("staging") / "run1" / "transfer_manifest.txt" + config = make_settings( + staging_bucket=PurePosixPath("my-staging"), + destination_bucket=PurePosixPath("my-dest"), + staging_path=staging_path, + destination_path=dest_path, + removed_manifest=removed, + updated_manifest=updated, + transfer_manifest=transfer, + dry_run=True, + ) + + with patch( + "cdm_data_loaders.pipelines.ncbi_ftp_promote.promote_from_s3", + return_value=_MOCK_REPORT_SUCCESS, + ) as mock_promote: + run_promote(config) + + mock_promote.assert_called_once_with( + staging_bucket=PurePosixPath("my-staging"), + staging_key_prefix=staging_path, + lakehouse_bucket=PurePosixPath("my-dest"), + lakehouse_key_prefix=dest_path, + removed_manifest_path=removed, + updated_manifest_path=updated, + manifest_s3_key=transfer, + dry_run=True, + ) + + def test_no_error_on_zero_failures(self) -> None: + """Verify run_promote does not raise when promote_from_s3 reports zero failures.""" + config = make_settings() + with patch( + "cdm_data_loaders.pipelines.ncbi_ftp_promote.promote_from_s3", + return_value=_MOCK_REPORT_SUCCESS, + ): + run_promote(config) # should not raise + + def test_raises_runtime_error_on_failures(self) -> None: + """Verify run_promote raises RuntimeError when promote_from_s3 reports failures.""" + config = make_settings() + with ( + patch( + "cdm_data_loaders.pipelines.ncbi_ftp_promote.promote_from_s3", + return_value=_MOCK_REPORT_WITH_FAILURES, + ), + pytest.raises(RuntimeError, match="2 failures"), + ): + run_promote(config) + + def test_dry_run_forwarded(self) -> None: + """Verify dry_run=True is forwarded to promote_from_s3.""" + config = 
make_settings(dry_run=True) + with patch( + "cdm_data_loaders.pipelines.ncbi_ftp_promote.promote_from_s3", + return_value=_MOCK_REPORT_SUCCESS, + ) as mock_promote: + run_promote(config) + + _, kwargs = mock_promote.call_args + assert kwargs["dry_run"] is True + + def test_transfer_manifest_none_forwarded(self) -> None: + """Verify transfer_manifest_path=None is forwarded to promote_from_s3 as manifest_s3_key=None.""" + config = make_settings(transfer_manifest=None) + with patch( + "cdm_data_loaders.pipelines.ncbi_ftp_promote.promote_from_s3", + return_value=_MOCK_REPORT_SUCCESS, + ) as mock_promote: + run_promote(config) + + _, kwargs = mock_promote.call_args + assert kwargs["manifest_s3_key"] is None
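
Reviewer sketch (not part of the diff): the two behaviours the new tests pin down are that the default transfer manifest key is derived from `staging_path`, and that `run_promote` raises when the report counts failures. The snippet below is a minimal stdlib-only approximation of that contract, assuming a report dict with a `failed` key as in `_MOCK_REPORT_SUCCESS`. The names `MANIFEST_NAME`, `default_transfer_manifest`, `run_promote_sketch`, and `fake_promote` are hypothetical and do not exist in the repository; the real code uses pydantic's `default_factory` and `promote_from_s3`.

```python
from pathlib import PurePosixPath
from typing import Any, Callable

# Hypothetical stand-in for DEFAULT_TRANSFER_MANIFEST_FILE from the diff.
MANIFEST_NAME = PurePosixPath("transfer_manifest.txt")


def default_transfer_manifest(staging_path: PurePosixPath) -> PurePosixPath:
    """Derive the manifest key from staging_path, as the field's default_factory does."""
    return staging_path / MANIFEST_NAME


def run_promote_sketch(promote: Callable[..., dict[str, Any]], **kwargs: Any) -> None:
    """Call the injected promote function and raise if its report counts failures."""
    report = promote(**kwargs)
    if report["failed"] > 0:
        msg = f"Promote completed with {report['failed']} failures"
        raise RuntimeError(msg)


def fake_promote(**_kwargs: Any) -> dict[str, Any]:
    """Stub report shaped like _MOCK_REPORT_WITH_FAILURES in the tests (assumed shape)."""
    return {"promoted": 3, "archived": 0, "failed": 2, "dry_run": False}


if __name__ == "__main__":
    # Show the derived manifest key for the staging path used in the tests.
    print(default_transfer_manifest(PurePosixPath("staging") / "run1"))
    try:
        run_promote_sketch(fake_promote, dry_run=True)
    except RuntimeError as err:
        print(err)
```

Passing the promote callable in as an argument is only a convenience for this sketch; the tests in the diff reach the same seam by patching `cdm_data_loaders.pipelines.ncbi_ftp_promote.promote_from_s3`.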