Skip to content
Merged
129 changes: 70 additions & 59 deletions docs/ncbi_ftp_e2e_walkthrough.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# NCBI FTP Pipeline — Local End-to-End Walkthrough

Step-by-step instructions for running a small (≤ 10 assembly) end-to-end sync
Step-by-step instructions for running a full production transfer in the Lakehouse,
or running a small (≤ 10 assembly) local end-to-end sync
of NCBI RefSeq records against a local CEPH container. The walkthrough uses
the two existing Jupyter notebooks for Phases 1 and 3, and the project's Docker
image for the Phase 2 download step.
a Jupyter notebook for manifest generation, a local or containerized download
and staging CLI tool, and a CLI tool for promotion of the staged records to an S3 store.

> **Prerequisites:**
> - Docker or Podman
Expand All @@ -15,9 +16,10 @@ image for the Phase 2 download step.
## Architecture overview

```
Phase 1 (notebook) Phase 2 (container) Phase 3 (notebook)
Manifest (notebook)
(to be replaced with CLI) Download (container CLI) Promote (local CLI)
┌────────────────────┐ ┌───────────────────────┐ ┌──────────────────────┐
│ Manifest notebook │ │ ncbi_ftp_sync CLI │ │ Promote notebook
│ Manifest notebook │ │ ncbi_ftp_sync CLI │ │ Promote CLI tool
│ ─ download FTP │────▶│ ─ read manifest │────▶│ ─ promote staged │
│ assembly summary │ │ ─ parallel FTP DL │ │ files to Lakehouse │
│ ─ diff against │ │ ─ MD5 verify │ │ ─ archive old ver. │
Expand All @@ -31,11 +33,6 @@ image for the Phase 2 download step.

---

## Path anatomy

All S3 paths in this pipeline compose from a small set of variables.
Understanding this decomposition is the key to configuring the notebooks.

### Path formats used

| Format | Example | Description |
Expand All @@ -49,8 +46,8 @@ Understanding this decomposition is the key to configuring the notebooks.
### Lakehouse object (final location)

```
s3://{LAKEHOUSE_BUCKET}/{LAKEHOUSE_KEY_PREFIX}raw_data/{GCF|GCA}/{nnn}/{nnn}/{nnn}/{assembly_dir}/{filename}
└── bucket ─────┘ └── key prefix ──────┘└── build_accession_path() ────────────────────────┘
s3://{LAKEHOUSE_BUCKET}/{LAKEHOUSE_KEY_PREFIX}/raw_data/{GCF|GCA}/{nnn}/{nnn}/{nnn}/{assembly_dir}/{filename}
└── bucket ─────┘ └── key prefix ──────┘└── build_accession_path() ────────────────────────┘
```

Example:
Expand All @@ -61,7 +58,7 @@ s3://cdm-lake/tenant-general-warehouse/kbase/datasets/ncbi/raw_data/GCF/900/000/
### Staging object (Phase 2 output)

```
s3://{STAGING_BUCKET}/{STAGING_KEY_PREFIX}raw_data/{GCF|GCA}/{nnn}/{nnn}/{nnn}/{assembly_dir}/{filename}
s3://{STAGING_BUCKET}/{STAGING_KEY_PREFIX}/raw_data/{GCF|GCA}/{nnn}/{nnn}/{nnn}/{assembly_dir}/{filename}
└── bucket ─────┘ └── key prefix ────┘└── build_accession_path() ────────────────────────┘
```

Expand Down Expand Up @@ -92,6 +89,13 @@ docker run -d \
-e RGW_SECRET_KEY=test_access_secret \
ghcr.io/kbasetest/ceph-rgw-test-image:0.1.5
```
Set CEPH credentials as environment variables:

```sh
export AWS_ENDPOINT_URL=http://localhost:9000
export AWS_ACCESS_KEY_ID=test_access_key
export AWS_SECRET_ACCESS_KEY=test_access_secret
```

(Note that a similar service is included in the `docker-compose` configuration file at the root of
this repository that is used in CI test workflows.)
Expand Down Expand Up @@ -230,7 +234,7 @@ Step 3b below.
If you are testing against CEPH and want to exercise the S3 upload path:

```python
STAGING_URI = "s3://cdm-lake/staging/run1/"
STAGING_URI = "s3://cts/staging/run1/"
```

> **Tip:** If you re-run later with `PREVIOUS_SUMMARY_URI` pointing at a
Expand Down Expand Up @@ -264,16 +268,16 @@ docker build -t cdm-data-loaders .
### 3b. Prepare local directories

```sh
mkdir -p notebooks/staging
cp notebooks/output/transfer_manifest.txt notebooks/staging/
mkdir -p notebooks/staging/input
cp notebooks/output/* notebooks/staging/input
```

### 3c. Run the download

```sh
docker run --rm \
--userns=keep-id \
-v "$(pwd)/notebooks/staging:/input:ro" \
-v "$(pwd)/notebooks/staging/input:/input:ro" \
-v "$(pwd)/notebooks/staging:/output" \
cdm-data-loaders ncbi_ftp_sync \
--manifest /input/transfer_manifest.txt \
Expand All @@ -293,7 +297,7 @@ docker run --rm \
| `--threads` | Parallel FTP connections (2 is polite for testing) |
| `--limit` | Redundant safety cap (already limited in Phase 1) |

After the container exits, `notebooks/staging/` will contain:
After the container exits, `notebooks/staging/` will contain something like:

```
staging/
Expand All @@ -319,7 +323,7 @@ the FTP server's `md5checksums.txt`.
> --threads 2 --limit 10
> ```

### 3d. Upload staged files to CEPH
### 3d. Local Testing: Upload staged files to CEPH

The download step writes to the local filesystem. To feed Phase 3 we need
to upload the staged files into CEPH under a staging prefix:
Expand All @@ -336,49 +340,52 @@ uv run python scripts/s3_local.py ls s3://cts/staging/run1/

---

## 4. Phase 3 — Promote & archive (notebook)
## 4. Phase 3 — Promote & archive (CLI tool)

Open `notebooks/ncbi_ftp_promote.ipynb`.
Phase 3 uses the `ncbi_ftp_promote` CLI tool to promote staged assemblies from
the S3 staging prefix to their final Lakehouse paths, archive replaced or
suppressed assemblies, and trim the transfer manifest for resumability.

### Constants to change (Cell 3)
### Arguments reference

| Constant | Walkthrough value | Format | Why |
|-------------------------|------------------------------------------------------|--------|---------------------------------------------|
| `STAGING_BUCKET` | `"cts"` | bucket name | CTS staging bucket (Phase 2 writes here) |
| `LAKEHOUSE_BUCKET` | `"cdm-lake"` | bucket name | final Lakehouse destination |
| `STAGING_KEY_PREFIX` | `"staging/run1/"` | S3 key prefix | matches the upload prefix from Step 3d |
| `REMOVED_MANIFEST_PATH` | `None` | local path | nothing to remove on first run |
| `UPDATED_MANIFEST_PATH` | `None` | local path | nothing to archive on first run |
| `NCBI_RELEASE` | `None` | string | no release tag needed for local testing |
| `MANIFEST_S3_KEY` | `None` | S3 object key | skip manifest trimming |
| `LAKEHOUSE_KEY_PREFIX` | `"tenant-general-warehouse/kbase/datasets/ncbi/"` | S3 key prefix | keep default |
| `DRY_RUN` | `True` | bool | **start with dry-run!** |
| Flag | Short | Default | Description |
|------|-------|---------|-------------|
| `--staging-path` | `-s` | *(required)* | S3 key prefix where Phase 2 wrote its output (must contain a `raw_data/` folder) |
| `--destination-path` | | `tenant-general-warehouse/kbase/datasets/ncbi` | S3 key prefix in the destination bucket to promote files into |
| `--staging-bucket` | | `cts` | S3 bucket containing the staged files |
| `--destination-bucket` | | `cdm-lake` | S3 bucket to promote files into (Lakehouse) |
| `--removed-manifest` | `-r` | *(none)* | Local path to the removed manifest from Phase 1; omit to skip archiving removed assemblies |
| `--updated-manifest` | `-u` | *(none)* | Local path to the updated manifest from Phase 1; omit to skip archiving updated assemblies |
| `--transfer-manifest` | `-t` | `{staging-path}/transfer_manifest.txt` | S3 key of the transfer manifest to trim after a successful promote |
| `--dry-run` | | `False` | Log what would happen without making any changes |

### Initialise the S3 client for CEPH

The notebook calls `get_s3_client()` which, by default, tries to import
credentials from `berdl_notebook_utils`. For local CEPH you need to
initialise the client manually **before** running Cell 4. Insert a new cell
after Cell 2 (Imports) with:

```python
from cdm_data_loaders.utils.s3 import get_s3_client, reset_s3_client
First, do a dry run to make sure everything looks as expected:

reset_s3_client() # clear any cached client
get_s3_client({
"endpoint_url": "http://localhost:9000",
"aws_access_key_id": "test_access_key",
"aws_secret_access_key": "test_access_secret",
})
```sh
uv run ncbi_ftp_promote \
--staging-path staging/run1 \
--removed-manifest notebooks/output/removed.txt \
--updated-manifest notebooks/output/updated.txt \
--dry-run
```

### Run the notebook
Once everything looks good, run the actual promotion:
```sh
uv run ncbi_ftp_promote \
--staging-path staging/run1 \
--removed-manifest notebooks/output/removed.txt \
--updated-manifest notebooks/output/updated.txt
```

If you exclude the removed and updated manifests, no archiving will occur, just promotion of staged records.

1. Execute all cells. With `DRY_RUN = True` the promote step will log what it
*would* do without moving any objects.
2. Review the report in Cell 6.
3. If the dry-run looks correct, set `DRY_RUN = False` in Cell 3 and re-run
from Cell 3.
The CLI prints a promote summary on completion:

```
PROMOTE SUMMARY: 10 promoted, 0 archived, 0 failed
```

After promotion the final Lakehouse layout in CEPH will look like:

Expand Down Expand Up @@ -437,8 +444,12 @@ copied to:
s3://{LAKEHOUSE_BUCKET}/{LAKEHOUSE_KEY_PREFIX}archive/{release_tag}/metadata/{assembly_dir}_datapackage.json
```

Use the last cell of `notebooks/ncbi_ftp_promote.ipynb` to list and preview
all descriptors written in a promote run.
Use `scripts/s3_local.py ls` to list all descriptors written in a promote run:

```sh
uv run python scripts/s3_local.py ls \
s3://cdm-lake/tenant-general-warehouse/kbase/datasets/ncbi/metadata/
```

To inspect a descriptor directly:

Expand All @@ -460,9 +471,9 @@ previous snapshot:
2. **Phase 1:** The diff will now show `updated`, `replaced`, and
`suppressed` entries (if any changed between runs).
3. **Phase 2:** Download the new manifest.
4. **Phase 3:** Set `REMOVED_MANIFEST_PATH` and `UPDATED_MANIFEST_PATH` to the paths
from Phase 1. Updated assemblies will be archived before overwrite;
removed assemblies will be archived and deleted.
4. **Phase 3:** Pass `--removed-manifest` and `--updated-manifest` pointing at
the files produced in Phase 1. Updated assemblies are archived before
overwrite; removed assemblies are archived and deleted.

---

Expand All @@ -482,9 +493,9 @@ rm -rf staging/ output/

| Symptom | Cause | Fix |
|---------|-------|-----|
| `berdl_notebook_utils` import error in notebook | Missing local CEPH client init | Add the `get_s3_client({...})` cell described in Step 4 |
| `connect_ftp() timeout` | NCBI FTP may be slow or rate-limited | Retry; reduce `--threads` to 1 |
| Phase 3 shows 0 promoted | Staging prefix doesn't match or bucket is wrong | Verify `STAGING_KEY_PREFIX` matches the S3 upload path from Step 3d |
| Phase 3 shows 0 promoted | Staging prefix doesn't match or bucket is wrong | Verify `--staging-path` matches the S3 upload path from Step 3d |
| Phase 3 S3 auth error | Missing credentials for CEPH | Export `AWS_ENDPOINT_URL`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` before running |
| Container can't reach FTP | Docker network isolation | Use `--network host` or ensure DNS resolution works inside the container |

---
Expand Down
16 changes: 8 additions & 8 deletions notebooks/ncbi_ftp_manifest.ipynb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{

Check failure on line 1 in notebooks/ncbi_ftp_manifest.ipynb

View workflow job for this annotation

GitHub Actions / Check code formatting

ruff (PLW0603)

notebooks/ncbi_ftp_manifest.ipynb:1:1: PLW0603 Using the global statement to update `_last_refresh` is discouraged
"cells": [
{
"cell_type": "markdown",
Expand Down Expand Up @@ -49,7 +49,7 @@
"\"\"\"Imports and S3 client initialisation.\"\"\"\n",
"\n",
"import json\n",
"from pathlib import Path\n",
"from pathlib import Path, PurePosixPath\n",
"\n",
"from cdm_data_loaders.ncbi_ftp.assembly import FTP_HOST\n",
"from cdm_data_loaders.ncbi_ftp.manifest import (\n",
Expand Down Expand Up @@ -100,8 +100,8 @@
"DATABASE = \"refseq\"\n",
"\n",
"# Accession prefix filtering (3-digit, inclusive). Set to None to skip.\n",
"PREFIX_FROM: str | None = \"900\" # e.g. \"000\"\n",
"PREFIX_TO: str | None = \"900\" # e.g. \"003\"\n",
"PREFIX_FROM: str | None = \"960\" # e.g. \"000\"\n",
"PREFIX_TO: str | None = \"962\" # e.g. \"003\"\n",
"\n",
"# Maximum number of new/updated assemblies to include (None = unlimited)\n",
"LIMIT: int | None = 10\n",
Expand All @@ -120,9 +120,9 @@
"# Set LAKEHOUSE_BUCKET to your bucket name to enable, or None to skip.\n",
"# STORE_KEY_PREFIX should point to the directory containing `raw_data/`.\n",
"# format: bucket name (no s3:// scheme)\n",
"LAKEHOUSE_BUCKET: str | None = \"cdm-lake\"\n",
"LAKEHOUSE_BUCKET: PurePosixPath | None = PurePosixPath(\"cdm-lake\")\n",
"# format: S3 key prefix within LAKEHOUSE_BUCKET (no scheme, no bucket)\n",
"STORE_KEY_PREFIX = \"tenant-general-warehouse/kbase/datasets/ncbi/\"\n",
"STORE_KEY_PREFIX: PurePosixPath = PurePosixPath(\"tenant-general-warehouse/kbase/datasets/ncbi/\")\n",
"\n",
"# Local output directory for manifest files\n",
"# format: local directory path\n",
Expand Down Expand Up @@ -254,7 +254,7 @@
" # Store scan didn't run, or was skipped. Try to load from S3.\n",
" if PREVIOUS_SUMMARY_URI:\n",
" s3 = get_s3_client()\n",
" bucket, key = split_s3_path(PREVIOUS_SUMMARY_URI)\n",
" bucket, key = split_s3_path(str(PREVIOUS_SUMMARY_URI))\n",
" resp = s3.get_object(Bucket=bucket, Key=key)\n",
" prev_text = resp[\"Body\"].read().decode(\"utf-8\")\n",
" previous = parse_assembly_summary(prev_text)\n",
Expand Down Expand Up @@ -396,7 +396,7 @@
"# Upload new snapshot to S3 for future diffing\n",
"if SNAPSHOT_UPLOAD_URI:\n",
" s3 = get_s3_client()\n",
" bucket, key = split_s3_path(SNAPSHOT_UPLOAD_URI)\n",
" bucket, key = split_s3_path(str(SNAPSHOT_UPLOAD_URI))\n",
" s3.put_object(Bucket=bucket, Key=key, Body=raw_summary.encode(\"utf-8\"))\n",
" print(f\"Uploaded new snapshot to {SNAPSHOT_UPLOAD_URI}\")\n",
"else:\n",
Expand Down Expand Up @@ -443,7 +443,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "cdm-data-loaders (3.13.11)",
"display_name": "cdm-data-loaders (3.13.11.final.0)",
"language": "python",
"name": "python3"
},
Expand Down
Loading
Loading