diff --git a/CHANGELOG.md b/CHANGELOG.md
index 52db0d1a..7ef40aa0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,16 @@ Instead, changes appear below grouped by the date they were added to the workflo
 
 ## 2025
 
+* 08 October 2025: phylogenetic - Major update to the definition of inputs. ([#339][])
+    * Configs are now required to include the `inputs` param to define inputs for the workflow
+
+      ```yaml
+      inputs:
+        - name: ncbi
+          metadata: "https://data.nextstrain.org/files/workflows/mpox/metadata.tsv.zst"
+          sequences: "https://data.nextstrain.org/files/workflows/mpox/sequences.fasta.zst"
+      ```
+
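+    * Configs may optionally also define an `additional_inputs` list (same shape as
+      `inputs`) to layer extra data on top of the defaults; see
+      `phylogenetic/rules/merge_inputs.smk` below. For example (paths illustrative):
+
+      ```yaml
+      additional_inputs:
+        - name: private
+          metadata: "data/private_metadata.tsv"
+          sequences: "data/private_sequences.fasta"
+      ```
+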
metadata="example_data/metadata.tsv", - output: - sequences="data/sequences.fasta", - metadata="data/metadata.tsv", - shell: - """ - cp -f {input.sequences} {output.sequences} - cp -f {input.metadata} {output.metadata} - """ - - -ruleorder: copy_example_data > decompress diff --git a/phylogenetic/defaults/clade-i/config.yaml b/phylogenetic/defaults/clade-i/config.yaml index 82b6170c..82fd9c43 100644 --- a/phylogenetic/defaults/clade-i/config.yaml +++ b/phylogenetic/defaults/clade-i/config.yaml @@ -1,3 +1,8 @@ +inputs: + - name: ncbi + metadata: "https://data.nextstrain.org/files/workflows/mpox/metadata.tsv.zst" + sequences: "https://data.nextstrain.org/files/workflows/mpox/sequences.fasta.zst" + reference: "defaults/clade-i/reference.fasta" genome_annotation: "defaults/clade-i/genome_annotation.gff3" genbank_reference: "defaults/clade-i/reference.gb" diff --git a/phylogenetic/defaults/hmpxv1/config.yaml b/phylogenetic/defaults/hmpxv1/config.yaml index b9a8a906..d036dadd 100644 --- a/phylogenetic/defaults/hmpxv1/config.yaml +++ b/phylogenetic/defaults/hmpxv1/config.yaml @@ -1,3 +1,8 @@ +inputs: + - name: ncbi + metadata: "https://data.nextstrain.org/files/workflows/mpox/metadata.tsv.zst" + sequences: "https://data.nextstrain.org/files/workflows/mpox/sequences.fasta.zst" + reference: "defaults/reference.fasta" genome_annotation: "defaults/genome_annotation.gff3" genbank_reference: "defaults/reference.gb" diff --git a/phylogenetic/defaults/hmpxv1_big/config.yaml b/phylogenetic/defaults/hmpxv1_big/config.yaml index 973d83fe..93b508e2 100644 --- a/phylogenetic/defaults/hmpxv1_big/config.yaml +++ b/phylogenetic/defaults/hmpxv1_big/config.yaml @@ -1,3 +1,8 @@ +inputs: + - name: ncbi + metadata: "https://data.nextstrain.org/files/workflows/mpox/metadata.tsv.zst" + sequences: "https://data.nextstrain.org/files/workflows/mpox/sequences.fasta.zst" + reference: "defaults/reference.fasta" genome_annotation: "defaults/genome_annotation.gff3" genbank_reference: "defaults/reference.gb" diff --git a/phylogenetic/defaults/mpxv/config.yaml b/phylogenetic/defaults/mpxv/config.yaml index a5c819f2..f9a66a74 100644 --- a/phylogenetic/defaults/mpxv/config.yaml +++ b/phylogenetic/defaults/mpxv/config.yaml @@ -1,3 +1,8 @@ +inputs: + - name: ncbi + metadata: "https://data.nextstrain.org/files/workflows/mpox/metadata.tsv.zst" + sequences: "https://data.nextstrain.org/files/workflows/mpox/sequences.fasta.zst" + auspice_config: "defaults/mpxv/auspice_config.json" include: "defaults/mpxv/include.txt" exclude: "defaults/exclude.txt" diff --git a/phylogenetic/rules/merge_inputs.smk b/phylogenetic/rules/merge_inputs.smk new file mode 100644 index 00000000..0c576101 --- /dev/null +++ b/phylogenetic/rules/merge_inputs.smk @@ -0,0 +1,186 @@ +""" +This part of the workflow merges inputs based on what is defined in the config. + +OUTPUTS: + + metadata = results/metadata.tsv + sequences = results/sequences.fasta + +The config dict is expected to have a top-level `inputs` list that defines the +separate inputs' name, metadata, and sequences. 
+
+
+if len(_input_metadata) == 1:
+
+    rule decompress_metadata:
+        """
+        This rule is invoked when there is a single metadata input to
+        ensure that we have a decompressed input for downstream rules to match
+        the output of the `merge_metadata` rule.
+        """
+        input:
+            metadata=_input_metadata[0],
+        output:
+            metadata="results/metadata.tsv",
+        log:
+            "logs/decompress_metadata.txt",
+        benchmark:
+            "benchmarks/decompress_metadata.txt"
+        shell:
+            r"""
+            exec &> >(tee {log:q})
+
+            augur read-file {input.metadata:q} > {output.metadata:q}
+            """
+
+else:
+
+    rule merge_metadata:
+        """
+        This rule is invoked when there are multiple defined metadata inputs
+        (config.inputs + config.additional_inputs)
+        """
+        input:
+            **{
+                name: info["metadata"]
+                for name, info in input_sources.items()
+                if info.get("metadata", None)
+            },
+        params:
+            metadata=lambda w, input: list(map("=".join, input.items())),
+            id_field=config["strain_id_field"],
+        output:
+            metadata="results/metadata.tsv",
+        log:
+            "logs/merge_metadata.txt",
+        benchmark:
+            "benchmarks/merge_metadata.txt"
+        shell:
+            r"""
+            exec &> >(tee {log:q})
+
+            augur merge \
+                --metadata {params.metadata:q} \
+                --metadata-id-columns {params.id_field:q} \
+                --output-metadata {output.metadata:q}
+            """
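+
+# Illustration (comment only): the `params.metadata` lambda above joins each named
+# input with its file as NAME=PATH, e.g. for hypothetical inputs "ncbi" and "private":
+#
+#     ["ncbi=data/ncbi_metadata.tsv.zst", "private=data/private_metadata.tsv"]
+#
+# so, with the configured `strain_id_field`, the command expands to:
+#
+#     augur merge \
+#         --metadata ncbi=data/ncbi_metadata.tsv.zst private=data/private_metadata.tsv \
+#         --metadata-id-columns <strain_id_field> \
+#         --output-metadata results/metadata.tsv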
+ """ + input: + metadata=_input_metadata[0], + output: + metadata="results/metadata.tsv", + log: + "logs/decompress_metadata.txt", + benchmark: + "benchmarks/decompress_metadata.txt" + shell: + r""" + exec &> >(tee {log:q}) + + augur read-file {input.metadata:q} > {output.metadata:q} + """ + +else: + + rule merge_metadata: + """ + This rule is invoked when there are multiple defined metadata inputs + (config.inputs + config.additional_inputs) + """ + input: + **{ + name: info["metadata"] + for name, info in input_sources.items() + if info.get("metadata", None) + }, + params: + metadata=lambda w, input: list(map("=".join, input.items())), + id_field=config["strain_id_field"], + output: + metadata="results/metadata.tsv", + log: + "logs/merge_metadata.txt", + benchmark: + "benchmarks/merge_metadata.txt" + shell: + r""" + exec &> >(tee {log:q}) + + augur merge \ + --metadata {params.metadata:q} \ + --metadata-id-columns {params.id_field:q} \ + --output-metadata {output.metadata:q} + """ + + +if len(_input_sequences) == 1: + + rule decompress_sequences: + """ + This rule is invoked when there is a single sequences input to + ensure that we have a decompressed input for downstream rules to match + the output of rule.merge_sequences. + """ + input: + sequences=_input_sequences[0], + output: + sequences="results/sequences.fasta", + log: + "logs/decompress_sequences.txt", + benchmark: + "benchmarks/decompress_sequences.txt" + shell: + r""" + exec &> >(tee {log:q}) + + augur read-file {input.sequences:q} > {output.sequences:q} + """ + +else: + + rule merge_sequences: + """ + This rule is invoked when there are multiple defined sequences inputs + (config.inputs + config.additional_inputs) + """ + input: + **{ + name: info["sequences"] + for name, info in input_sources.items() + if info.get("sequences", None) + }, + output: + sequences="results/sequences.fasta", + log: + "logs/merge_sequences.txt", + benchmark: + "benchmarks/merge_sequences.txt" + shell: + r""" + exec &> >(tee {log:q}) + + augur merge \ + --sequences {input:q} \ + --output-sequences {output.sequences:q} + """ diff --git a/phylogenetic/rules/prepare_sequences.smk b/phylogenetic/rules/prepare_sequences.smk index f847f28b..ef6de3ed 100644 --- a/phylogenetic/rules/prepare_sequences.smk +++ b/phylogenetic/rules/prepare_sequences.smk @@ -3,6 +3,8 @@ This part of the workflow prepares sequences for constructing the phylogenetic t REQUIRED INPUTS: + metadata = results/metadata.tsv + sequences = results/sequences.fasta include = path to file of sequences to force include exclude = path to file of sequences to exclude reference = path to reference sequence FASTA for Nextclade alignment @@ -16,59 +18,13 @@ OUTPUTS: """ -rule download: - """ - Downloading sequences and metadata from data.nextstrain.org - """ - output: - sequences="data/sequences.fasta.zst", - metadata="data/metadata.tsv.zst", - params: - sequences_url="https://data.nextstrain.org/files/workflows/mpox/sequences.fasta.zst", - metadata_url="https://data.nextstrain.org/files/workflows/mpox/metadata.tsv.zst", - log: - "logs/download.txt", - benchmark: - "benchmarks/download.txt" - shell: - r""" - exec &> >(tee {log:q}) - - curl -fsSL --compressed {params.sequences_url:q} --output {output.sequences:q} - curl -fsSL --compressed {params.metadata_url:q} --output {output.metadata:q} - """ - - -rule decompress: - """ - Decompressing sequences and metadata - """ - input: - sequences="data/sequences.fasta.zst", - metadata="data/metadata.tsv.zst", - output: - sequences="data/sequences.fasta", 
- metadata="data/metadata.tsv", - log: - "logs/decompress.txt", - benchmark: - "benchmarks/decompress.txt" - shell: - r""" - exec &> >(tee {log:q}) - - zstd --decompress --stdout {input.sequences:q} > {output.sequences:q} - zstd --decompress --stdout {input.metadata:q} > {output.metadata:q} - """ - - rule filter: """ Removing strains that do not satisfy certain requirements. """ input: - sequences="data/sequences.fasta", - metadata="data/metadata.tsv", + sequences="results/sequences.fasta", + metadata="results/metadata.tsv", exclude=config["exclude"], output: sequences=build_dir + "/{build_name}/good_sequences.fasta", diff --git a/shared/vendored/.github/workflows/ci.yaml b/shared/vendored/.github/workflows/ci.yaml index c716277e..94d3054e 100644 --- a/shared/vendored/.github/workflows/ci.yaml +++ b/shared/vendored/.github/workflows/ci.yaml @@ -11,5 +11,5 @@ jobs: shellcheck: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - uses: nextstrain/.github/actions/shellcheck@master diff --git a/shared/vendored/.github/workflows/pre-commit.yaml b/shared/vendored/.github/workflows/pre-commit.yaml index 70da533c..bea15f67 100644 --- a/shared/vendored/.github/workflows/pre-commit.yaml +++ b/shared/vendored/.github/workflows/pre-commit.yaml @@ -7,8 +7,8 @@ jobs: pre-commit: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 - - uses: actions/setup-python@v5 + - uses: actions/checkout@v5 + - uses: actions/setup-python@v6 with: python-version: "3.12" - uses: pre-commit/action@v3.0.1 diff --git a/shared/vendored/.gitrepo b/shared/vendored/.gitrepo index 841be68b..deee8ab8 100644 --- a/shared/vendored/.gitrepo +++ b/shared/vendored/.gitrepo @@ -6,7 +6,7 @@ [subrepo] remote = https://github.com/nextstrain/shared branch = main - commit = ee00711e19ad30267957e0466fd55fe307109773 - parent = 4cc39272c5db7129263cfdd482a8d848e92dc9a5 + commit = 43e5a6fe06c95f605cc573a7e6ad2d4a023db7b9 + parent = b6b1000536050c6ebcbde0c79902c16b0b76a11a method = merge cmdver = 0.4.6 diff --git a/shared/vendored/README.md b/shared/vendored/README.md index 6393c3d9..eb2e3ba6 100644 --- a/shared/vendored/README.md +++ b/shared/vendored/README.md @@ -124,6 +124,8 @@ Potential Nextstrain CLI scripts Snakemake workflow functions that are shared across many pathogen workflows that don’t really belong in any of our existing tools. - [config.smk](snakemake/config.smk) - Shared functions for parsing workflow configs. +- [remote_files.smk](snakemake/remote_files.smk) - Exposes the `path_or_url` function which will use Snakemake's storage plugins to download/upload files to remote providers as needed. + ## Software requirements diff --git a/shared/vendored/snakemake/remote_files.smk b/shared/vendored/snakemake/remote_files.smk new file mode 100644 index 00000000..844f80e6 --- /dev/null +++ b/shared/vendored/snakemake/remote_files.smk @@ -0,0 +1,159 @@ +""" +Helper functions to set-up storage plugins for remote inputs/outputs. See the +docstring of `path_or_url` for usage instructions. + +The errors raised by storage plugins are often confusing. For instance, a HTTP +404 error will result in a `MissingInputException` with little hint as to the +underlying issue. S3 credentials errors are similarly confusing and we attempt +to check these ourselves to improve UX here. 
+""" + +from urllib.parse import urlparse + +# Keep a list of known public buckets, which we'll allow uncredentialled (unsigned) access to +# We could make this config-definable in the future +PUBLIC_BUCKETS = set(['nextstrain-data']) + +# Keep track of registered storage plugins to enable reuse +_storage_registry = {} + +class RemoteFilesMissingCredentials(Exception): + pass + +def _storage_s3(*, bucket, keep_local, retries) -> snakemake.storage.StorageProviderProxy: + """ + Registers and returns an instance of snakemake-storage-plugin-s3. Typically AWS + credentials are required for _any_ request however we allow requests to known + public buckets (see `PUBLIC_BUCKETS`) to be unsigned which allows for a nice user + experience in the common case of downloading inputs from s3://nextstrain-data. + + The intended behaviour for various (S3) URIs supplied to `path_or_url` is: + + | | S3 buckets | credentials present | credentials missing | + |----------|----------------------------|---------------------|---------------------| + | download | private / private + public | signed | Credentials Error | + | | public | signed | unsigned | + | upload | private / private + public | signed | Credentials Error | + | | public | signed | AccessDenied Error | + """ + # If the bucket is public then we may use an unsigned request which has the nice UX + # of not needing credentials to be present. If we've made other signed requests _or_ + # credentials are present then we just sign everything. This has implications for upload: + # if you attempt to upload to a public bucket without credentials then we allow that here + # and you'll get a subsequent `AccessDenied` error when the upload is attempted. + if bucket in PUBLIC_BUCKETS and \ + "s3_signed" not in _storage_registry and \ + ("s3_unsigned" in _storage_registry or not _aws_credentials_present()): + + if provider:=_storage_registry.get('s3_unsigned', None): + return provider + + from botocore import UNSIGNED # dependency of snakemake-storage-plugin-s3 + storage s3_unsigned: + provider="s3", + signature_version=UNSIGNED, + retries=retries, + keep_local=keep_local, + + _storage_registry['s3_unsigned'] = storage.s3_unsigned + return _storage_registry['s3_unsigned'] + + # Resource fetched/uploaded via a signed request, which will require AWS credentials + if provider:=_storage_registry.get('s3_signed', None): + return provider + + # Enforce the presence of credentials to paper over + if not _aws_credentials_present(): + raise RemoteFilesMissingCredentials() + + # the tag appears in the local file path, so reference 'signed' to give a hint about credential errors + storage s3_signed: + provider="s3", + retries=retries, + keep_local=keep_local, + + _storage_registry['s3_signed'] = storage.s3_signed + return _storage_registry['s3_signed'] + +def _aws_credentials_present() -> bool: + import boto3 # dependency of snakemake-storage-plugin-s3 + session = boto3.Session() + creds = session.get_credentials() + return creds is not None + +def _storage_http(*, keep_local, retries) -> snakemake.storage.StorageProviderProxy: + """ + Registers and returns an instance of snakemake-storage-plugin-http + """ + if provider:=_storage_registry.get('http', None): + return provider + + storage: + provider="http", + allow_redirects=True, + supports_head=True, + keep_local=keep_local, + retries=retries, + + _storage_registry['http'] = storage.http + return _storage_registry['http'] + + +def path_or_url(uri, *, keep_local=True, retries=2) -> str: + """ + Intended for use in Snakemake 
+
+def path_or_url(uri, *, keep_local=True, retries=2) -> str:
+    """
+    Intended for use in Snakemake inputs / outputs to transparently use remote
+    resources. Returns the URI wrapped by an applicable storage plugin. Local
+    filepaths will be returned unchanged.
+
+    For example, the following rule will download inputs over HTTPS and upload
+    the output to S3:
+
+    rule filter:
+        input:
+            sequences = path_or_url("https://data.nextstrain.org/..."),
+            metadata = path_or_url("https://data.nextstrain.org/..."),
+        output:
+            sequences = path_or_url("s3://...")
+        shell:
+            r'''
+            augur filter \
+                --sequences {input.sequences:q} \
+                --metadata {input.metadata:q} \
+                --metadata-id-columns accession \
+                --output-sequences {output.sequences:q}
+            '''
+
+    If *keep_local* is True (the default) then downloaded/uploaded files will
+    remain in `.snakemake/storage/`. The presence of a previously downloaded
+    file (via `keep_local=True`) does not guarantee that the file will not be
+    re-downloaded if the storage plugin decides the local file is out of date.
+
+    Depending on the *uri*, authentication may be required. See the specific
+    helper functions (such as `_storage_s3`) for more details.
+
+    See the Snakemake documentation for more information on storage plugins.
+    Note: various Snakemake storage plugins will be required depending on the
+    URIs provided.
+    """
+    info = urlparse(uri)
+
+    if info.scheme=='': # local
+        return uri # no storage wrapper
+
+    if info.scheme=='s3':
+        try:
+            return _storage_s3(bucket=info.netloc, keep_local=keep_local, retries=retries)(uri)
+        except RemoteFilesMissingCredentials as e:
+            raise Exception(f"AWS credentials are required to access {uri!r}") from e
+
+    if info.scheme=='https':
+        return _storage_http(keep_local=keep_local, retries=retries)(uri)
+    elif info.scheme=='http':
+        raise Exception(f"HTTP remote file support is not implemented in nextstrain workflows (attempting to access {uri!r}).\n"
+            "Please use an HTTPS address instead.")
+
+    if info.scheme in ['gs', 'gcs']:
+        raise Exception(f"Google Storage is not yet implemented for nextstrain workflows (attempting to access {uri!r}).\n"
+            "Please get in touch if you require this functionality and we can add it to our workflows")
+
+    raise Exception(f"Input address {uri!r} (scheme={info.scheme!r}) is not a supported remote")
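+
+# Dispatch examples (comment only; follows the scheme checks above):
+#
+#     path_or_url("results/metadata.tsv")                             -> "results/metadata.tsv" (unchanged)
+#     path_or_url("https://data.nextstrain.org/.../metadata.tsv.zst") -> http storage-wrapped object
+#     path_or_url("s3://nextstrain-data/.../metadata.tsv.zst")        -> s3 storage-wrapped object
+#     path_or_url("http://example.com/metadata.tsv")                  -> raises (HTTPS required)
+#     path_or_url("gs://bucket/metadata.tsv")                         -> raises (not implemented)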