From aa0c741f26420e613528c742011cb0285e5be60d Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Tue, 22 Jul 2025 13:27:02 -0400 Subject: [PATCH 01/16] Refactor to use variation-normalizer programmatically for CopyNumberCount and CopyNumberChange --- clinvar_gk_pilot/main.py | 120 ++++++++++++++++++++++++++++++++++++++- pyproject.toml | 3 +- 2 files changed, 119 insertions(+), 4 deletions(-) diff --git a/clinvar_gk_pilot/main.py b/clinvar_gk_pilot/main.py index b8e479c..9f008c9 100644 --- a/clinvar_gk_pilot/main.py +++ b/clinvar_gk_pilot/main.py @@ -1,3 +1,4 @@ +import asyncio import contextlib import gzip import json @@ -6,11 +7,14 @@ import pathlib import queue import sys +from dataclasses import dataclass from functools import partial from typing import List +import requests from ga4gh.vrs.dataproxy import create_dataproxy from ga4gh.vrs.extras.translator import AlleleTranslator, CnvTranslator +from ga4gh.vrs.models import CopyChange from clinvar_gk_pilot.cli import parse_args from clinvar_gk_pilot.gcs import ( @@ -178,7 +182,7 @@ def allele(clinvar_json: dict) -> dict: return {"errors": str(e)} -def copy_number_count(clinvar_json: dict) -> dict: +def copy_number_count_vrspython(clinvar_json: dict) -> dict: try: tlr = cnv_translators[clinvar_json.get("assembly_version", "38")] kwargs = {"copies": clinvar_json["absolute_copies"]} @@ -194,6 +198,82 @@ def copy_number_count(clinvar_json: dict) -> dict: def copy_number_change(clinvar_json: dict) -> dict: + """ + Create a VRS CopyNumberChange variation using the variation-normalization module. + + Returns: + Dictionary containing VRS representation or error information + """ + try: + # Extract required parameters from clinvar_json + hgvs_expr = clinvar_json["source"] + # absolute_copies = clinvar_json["absolute_copies"] + + # Get baseline_copies by offsetting by one from absolute_copies + if clinvar_json["variation_type"] in ["Deletion", "copy number loss"]: + # baseline_copies = absolute_copies + 1 + copy_change = CopyChange.LOSS + elif clinvar_json["variation_type"] in ["Duplication", "copy number gain"]: + # baseline_copies = absolute_copies - 1 + copy_change = CopyChange.GAIN + else: + return {"errors": f"Unknown variation_type: {clinvar_json}"} + + vrs_variant = asyncio.run( + query_handler.to_copy_number_handler.hgvs_to_copy_number_change( + hgvs_expr=hgvs_expr, copy_change=copy_change + ) + ).copy_number_change + return vrs_variant.model_dump(exclude_none=True) + + except Exception as e: + error_msg = f"Unexpected error: {e}" + logger.error(f"Exception in copy_number_count: {clinvar_json}: {error_msg}") + return {"errors": error_msg} + + +def copy_number_count(clinvar_json: dict) -> dict: + """ + Create a VRS Copy Number Count variation using the variation-normalization service. + + Args: + clinvar_json: Dictionary containing ClinVar data with keys: + - source: HGVS expression or other variation string + - absolute_copies: The absolute number of copies + - assembly_version: Optional assembly version (defaults to 38) + + Returns: + Dictionary containing VRS representation or error information + """ + try: + # Extract required parameters from clinvar_json + hgvs_expr = clinvar_json["source"] + absolute_copies = clinvar_json["absolute_copies"] + + # Get baseline_copies by offsetting by one from absolute_copies + if clinvar_json["variation_type"] in ["Deletion", "copy number loss"]: + baseline_copies = absolute_copies + 1 + # copy_change = CopyChange.LOSS + elif clinvar_json["variation_type"] in ["Duplication", "copy number gain"]: + baseline_copies = absolute_copies - 1 + # copy_change = CopyChange.GAIN + else: + return {"errors": f"Unknown variation_type: {clinvar_json}"} + + vrs_variant = asyncio.run( + query_handler.to_copy_number_handler.hgvs_to_copy_number_count( + hgvs_expr=hgvs_expr, baseline_copies=baseline_copies + ) + ).copy_number_change + return vrs_variant + + except Exception as e: + error_msg = f"Unexpected error: {e}" + logger.error(f"Exception in copy_number_count: {clinvar_json}: {error_msg}") + return {"errors": error_msg} + + +def copy_number_change_vrspython(clinvar_json: dict) -> dict: try: tlr = cnv_translators[clinvar_json.get("assembly_version", "38")] kwargs = {"copy_change": clinvar_json["copy_change_type"]} @@ -215,7 +295,7 @@ def partition_file_lines_gz(local_file_path_gz: str, partitions: int) -> List[st Return a list of `partitions` file names that are a roughly equal number of lines from `local_file_path_gz`. """ - filenames = [f"{local_file_path_gz}.part_{i+1}" for i in range(partitions)] + filenames = [f"{local_file_path_gz}.part_{i + 1}" for i in range(partitions)] # Read the file and write each line to a file, looping through the output files with gzip.open(local_file_path_gz, "rt") as f: @@ -251,6 +331,8 @@ def main(argv=sys.argv[1:]): # Make parents os.makedirs(os.path.dirname(outfile), exist_ok=True) + # run_opts = Options(vrs_type_filter=opts.vrs_type_filter) + if opts["parallelism"] == 0: process_as_json_single_thread(local_file_name, outfile) else: @@ -258,13 +340,45 @@ def main(argv=sys.argv[1:]): if __name__ == "__main__": + creds_contents = """[default] + aws_access_key_id = asdf + aws_secret_access_key = asdf""" + aws_fake_creds_filename = "aws_fake_creds" + with open(aws_fake_creds_filename, "w") as f: + f.write(creds_contents) + os.environ["AWS_SHARED_CREDENTIALS_FILE"] = str( + pathlib.Path.cwd() / aws_fake_creds_filename + ) + if "GENE_NORM_DB_URL" not in os.environ: + raise RuntimeError("Must set GENE_NORM_DB_URL (e.g. http://localhost:8001)") + if "SEQREPO_ROOT_DIR" not in os.environ: + raise RuntimeError( + "Must set SEQREPO_ROOT_DIR (e.g. /Users/kferrite/dev/data/seqrepo/2024-12-20)" + ) + if "UTA_DB_URL" not in os.environ: + raise RuntimeError( + "Must set UTA_DB_URL (e.g. postgresql://anonymous@localhost:5433/uta/uta_20241220)" + ) + + # Import and initialize variation-normalizer QueryHandler + # Requires env vars to be set and dynamodb jar to be run locally and pointed to with GENE_NORM_DB_URL + # https://github.com/clingen-data-model/architecture/tree/master/helm/charts/clingen-vicc/docker/dynamodb + # In the `dynamodb` directory above, build: + # podman build -t gene-normalizer-dynamodb:latest . + # Then run it (uses host gcloud config to authenticate to our bucket which has a snapshot of the gene database) + # podman run -it -p 8001:8000 -v $HOME/.config/gcloud:/config/gcloud -v dynamodb:/data -e DATA_DIR=/data -e GOOGLE_APPLICATION_CREDENTIALS=/config/gcloud/application_default_credentials.json -e CLOUDSDK_CONFIG=/config/gcloud gene-normalizer-dynamodb:latest + import variation + from variation.query import QueryHandler + + query_handler = QueryHandler() + if len(sys.argv) == 1: main( [ "--filename", "gs://clinvar-gk-pilot/2025-03-23/dev/vi.json.gz", "--parallelism", - "10", + "2", ] ) diff --git a/pyproject.toml b/pyproject.toml index 09a728d..4d5d9f7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,9 +9,10 @@ license = { text = "MIT" } classifiers = ["Programming Language :: Python :: 3"] dependencies = [ "google-cloud-storage~=2.13.0", - "ga4gh.vrs[extras] @ git+https://github.com/ga4gh/vrs-python@2.1.1", + "ga4gh.vrs[extras] @ git+https://github.com/ga4gh/vrs-python@2.1.3", "gunicorn==22.0.0", "flask~=3.0.3", + "variation-normalizer @ git+https://github.com/cancervariants/variation-normalization@0.15.0" ] dynamic = ["version"] From 10e962858af2045de7d765a894b96515aa63a4af Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Tue, 22 Jul 2025 13:47:16 -0400 Subject: [PATCH 02/16] Adding timeout logic to copy_number_count --- clinvar_gk_pilot/main.py | 53 ++++++++++++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 13 deletions(-) diff --git a/clinvar_gk_pilot/main.py b/clinvar_gk_pilot/main.py index 9f008c9..e9d8ba9 100644 --- a/clinvar_gk_pilot/main.py +++ b/clinvar_gk_pilot/main.py @@ -7,6 +7,7 @@ import pathlib import queue import sys +import threading from dataclasses import dataclass from functools import partial from typing import List @@ -236,19 +237,14 @@ def copy_number_count(clinvar_json: dict) -> dict: """ Create a VRS Copy Number Count variation using the variation-normalization service. - Args: - clinvar_json: Dictionary containing ClinVar data with keys: - - source: HGVS expression or other variation string - - absolute_copies: The absolute number of copies - - assembly_version: Optional assembly version (defaults to 38) - Returns: Dictionary containing VRS representation or error information """ + request_timeout = 5 # seconds try: # Extract required parameters from clinvar_json hgvs_expr = clinvar_json["source"] - absolute_copies = clinvar_json["absolute_copies"] + absolute_copies = int(clinvar_json["absolute_copies"]) # Get baseline_copies by offsetting by one from absolute_copies if clinvar_json["variation_type"] in ["Deletion", "copy number loss"]: @@ -260,12 +256,43 @@ def copy_number_count(clinvar_json: dict) -> dict: else: return {"errors": f"Unknown variation_type: {clinvar_json}"} - vrs_variant = asyncio.run( - query_handler.to_copy_number_handler.hgvs_to_copy_number_count( - hgvs_expr=hgvs_expr, baseline_copies=baseline_copies - ) - ).copy_number_change - return vrs_variant + result_container = [None] + exception_container = [None] + + def call_hgvs_to_copy_number_count(): + try: + # result = asyncio.run( + # query_handler.to_copy_number_handler.hgvs_to_copy_number_count( + # hgvs_expr=hgvs_expr, baseline_copies=baseline_copies + # ) + # ).copy_number_count.model_dump(exclude_none=True) + result = asyncio.run( + query_handler.to_copy_number_handler.hgvs_to_copy_number_count( + hgvs_expr=hgvs_expr, baseline_copies=baseline_copies + ) + ) + if result.copy_number_count: + result_container[0] = result.copy_number_count.model_dump( + exclude_none=True + ) + else: + result_container[0] = {"errors": result.warnings} + except Exception as e: + exception_container[0] = e + + thread = threading.Thread(target=call_hgvs_to_copy_number_count) + thread.start() + thread.join(timeout=request_timeout) + + if thread.is_alive(): + return { + "errors": f"hgvs_to_copy_number_count call timed out after {request_timeout} seconds" + } + + if exception_container[0]: + raise exception_container[0] + + return result_container[0] except Exception as e: error_msg = f"Unexpected error: {e}" From 639f7bb02163a4396413015338c715674b0a4189 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Tue, 22 Jul 2025 13:50:59 -0400 Subject: [PATCH 03/16] asdf --- clinvar_gk_pilot/main.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/clinvar_gk_pilot/main.py b/clinvar_gk_pilot/main.py index e9d8ba9..714187d 100644 --- a/clinvar_gk_pilot/main.py +++ b/clinvar_gk_pilot/main.py @@ -249,10 +249,8 @@ def copy_number_count(clinvar_json: dict) -> dict: # Get baseline_copies by offsetting by one from absolute_copies if clinvar_json["variation_type"] in ["Deletion", "copy number loss"]: baseline_copies = absolute_copies + 1 - # copy_change = CopyChange.LOSS elif clinvar_json["variation_type"] in ["Duplication", "copy number gain"]: baseline_copies = absolute_copies - 1 - # copy_change = CopyChange.GAIN else: return {"errors": f"Unknown variation_type: {clinvar_json}"} @@ -261,11 +259,6 @@ def copy_number_count(clinvar_json: dict) -> dict: def call_hgvs_to_copy_number_count(): try: - # result = asyncio.run( - # query_handler.to_copy_number_handler.hgvs_to_copy_number_count( - # hgvs_expr=hgvs_expr, baseline_copies=baseline_copies - # ) - # ).copy_number_count.model_dump(exclude_none=True) result = asyncio.run( query_handler.to_copy_number_handler.hgvs_to_copy_number_count( hgvs_expr=hgvs_expr, baseline_copies=baseline_copies From 9985c03c667f65b148cdde5c778310860acde224 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Tue, 22 Jul 2025 14:00:37 -0400 Subject: [PATCH 04/16] Try with background worker process. Too slow to initialize though. --- clinvar_gk_pilot/main.py | 80 ++++++++++++++++++++++++++-------------- 1 file changed, 53 insertions(+), 27 deletions(-) diff --git a/clinvar_gk_pilot/main.py b/clinvar_gk_pilot/main.py index 714187d..66a7696 100644 --- a/clinvar_gk_pilot/main.py +++ b/clinvar_gk_pilot/main.py @@ -7,7 +7,6 @@ import pathlib import queue import sys -import threading from dataclasses import dataclass from functools import partial from typing import List @@ -233,6 +232,30 @@ def copy_number_change(clinvar_json: dict) -> dict: return {"errors": error_msg} +def _call_hgvs_to_copy_number_count_worker(result_queue, hgvs_expr, baseline_copies): + """Worker function for multiprocessing call to hgvs_to_copy_number_count""" + try: + # Import here to avoid issues with multiprocessing and module-level imports + import asyncio + + from variation.query import QueryHandler + + # Create a new QueryHandler instance in the worker process + query_handler = QueryHandler() + + result = asyncio.run( + query_handler.to_copy_number_handler.hgvs_to_copy_number_count( + hgvs_expr=hgvs_expr, baseline_copies=baseline_copies + ) + ) + if result.copy_number_count: + result_queue.put(result.copy_number_count.model_dump(exclude_none=True)) + else: + result_queue.put({"errors": result.warnings}) + except Exception as e: + result_queue.put({"errors": str(e)}) + + def copy_number_count(clinvar_json: dict) -> dict: """ Create a VRS Copy Number Count variation using the variation-normalization service. @@ -254,38 +277,39 @@ def copy_number_count(clinvar_json: dict) -> dict: else: return {"errors": f"Unknown variation_type: {clinvar_json}"} - result_container = [None] - exception_container = [None] + # Create a queue for communication between processes + result_queue = multiprocessing.Queue() - def call_hgvs_to_copy_number_count(): - try: - result = asyncio.run( - query_handler.to_copy_number_handler.hgvs_to_copy_number_count( - hgvs_expr=hgvs_expr, baseline_copies=baseline_copies - ) - ) - if result.copy_number_count: - result_container[0] = result.copy_number_count.model_dump( - exclude_none=True - ) - else: - result_container[0] = {"errors": result.warnings} - except Exception as e: - exception_container[0] = e - - thread = threading.Thread(target=call_hgvs_to_copy_number_count) - thread.start() - thread.join(timeout=request_timeout) - - if thread.is_alive(): + # Create and start the worker process + process = multiprocessing.Process( + target=_call_hgvs_to_copy_number_count_worker, + args=(result_queue, hgvs_expr, baseline_copies), + ) + process.start() + + # Wait for the process to complete or timeout + process.join(timeout=request_timeout) + + if process.is_alive(): + # Process didn't complete in time, forcefully terminate it + process.terminate() + process.join() # Wait for the process to actually terminate return { "errors": f"hgvs_to_copy_number_count call timed out after {request_timeout} seconds" } - if exception_container[0]: - raise exception_container[0] + # Check if the process completed successfully + if process.exitcode != 0: + return { + "errors": f"Worker process failed with exit code {process.exitcode}" + } - return result_container[0] + # Get the result from the queue + try: + result = result_queue.get_nowait() + return result + except queue.Empty: + return {"errors": "No result received from worker process"} except Exception as e: error_msg = f"Unexpected error: {e}" @@ -412,3 +436,5 @@ def main(argv=sys.argv[1:]): # ) else: main(sys.argv[1:]) + main(sys.argv[1:]) + main(sys.argv[1:]) From fac57786039532f38ddd7c3bc9d25f33d2e40b27 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Tue, 22 Jul 2025 14:53:14 -0400 Subject: [PATCH 05/16] Fix worker init. Make CopyNumberChange code match. --- clinvar_gk_pilot/main.py | 125 +++++++++++++-------------------------- 1 file changed, 42 insertions(+), 83 deletions(-) diff --git a/clinvar_gk_pilot/main.py b/clinvar_gk_pilot/main.py index 66a7696..7947c35 100644 --- a/clinvar_gk_pilot/main.py +++ b/clinvar_gk_pilot/main.py @@ -65,11 +65,15 @@ def process_line(line: str) -> str: def _task_worker( - task_queue: multiprocessing.Queue, return_queue: multiprocessing.Queue + task_queue: multiprocessing.Queue, return_queue: multiprocessing.Queue, init_fn=None ): """ Worker function that processes tasks from a queue. """ + # Run any per-process initialization + if init_fn: + init_fn() + while True: task = task_queue.get() if task is None: @@ -77,11 +81,20 @@ def _task_worker( return_queue.put(task()) +# Define init function to set up QueryHandler in this process +def init_query_handler(): + from variation.query import QueryHandler + + global query_handler + query_handler = QueryHandler() + + def worker(file_name_gz: str, output_file_name: str) -> None: """ Takes an input file (a GZIP file of newline delimited), runs `process_line` on each line, and writes the output to a new GZIP file called `output_file_name`. """ + with ( gzip.open(file_name_gz, "rt", encoding="utf-8") as input_file, gzip.open(output_file_name, "wt", encoding="utf-8") as output_file, @@ -93,10 +106,11 @@ def worker(file_name_gz: str, output_file_name: str) -> None: def make_background_process(): p = multiprocessing.Process( target=_task_worker, - args=(task_queue, return_queue), + args=(task_queue, return_queue, init_query_handler), ) return p + print("Making background process _task_worker") background_process = make_background_process() background_process.start() @@ -145,6 +159,7 @@ def process_as_json( part_input_file_names = partition_file_lines_gz(input_file_name, parallelism) part_output_file_names = [f"{ofn}.out" for ofn in part_input_file_names] + print(f"Partitioned filenames: {part_output_file_names}") workers = [] # Start a worker per file name @@ -153,6 +168,8 @@ def process_as_json( w.start() workers.append(w) + print(f"Started {len(workers)} workers") + # Wait for all workers to finish for w in workers: w.join() @@ -207,63 +224,38 @@ def copy_number_change(clinvar_json: dict) -> dict: try: # Extract required parameters from clinvar_json hgvs_expr = clinvar_json["source"] - # absolute_copies = clinvar_json["absolute_copies"] # Get baseline_copies by offsetting by one from absolute_copies if clinvar_json["variation_type"] in ["Deletion", "copy number loss"]: - # baseline_copies = absolute_copies + 1 copy_change = CopyChange.LOSS elif clinvar_json["variation_type"] in ["Duplication", "copy number gain"]: - # baseline_copies = absolute_copies - 1 copy_change = CopyChange.GAIN else: return {"errors": f"Unknown variation_type: {clinvar_json}"} - vrs_variant = asyncio.run( + result = asyncio.run( query_handler.to_copy_number_handler.hgvs_to_copy_number_change( hgvs_expr=hgvs_expr, copy_change=copy_change ) - ).copy_number_change - return vrs_variant.model_dump(exclude_none=True) + ) + if result.copy_number_change: + return result.copy_number_change.model_dump(exclude_none=True) + else: + return {"errors": result.warnings} except Exception as e: error_msg = f"Unexpected error: {e}" - logger.error(f"Exception in copy_number_count: {clinvar_json}: {error_msg}") + logger.error(f"Exception in copy_number_change: {clinvar_json}: {error_msg}") return {"errors": error_msg} -def _call_hgvs_to_copy_number_count_worker(result_queue, hgvs_expr, baseline_copies): - """Worker function for multiprocessing call to hgvs_to_copy_number_count""" - try: - # Import here to avoid issues with multiprocessing and module-level imports - import asyncio - - from variation.query import QueryHandler - - # Create a new QueryHandler instance in the worker process - query_handler = QueryHandler() - - result = asyncio.run( - query_handler.to_copy_number_handler.hgvs_to_copy_number_count( - hgvs_expr=hgvs_expr, baseline_copies=baseline_copies - ) - ) - if result.copy_number_count: - result_queue.put(result.copy_number_count.model_dump(exclude_none=True)) - else: - result_queue.put({"errors": result.warnings}) - except Exception as e: - result_queue.put({"errors": str(e)}) - - def copy_number_count(clinvar_json: dict) -> dict: """ - Create a VRS Copy Number Count variation using the variation-normalization service. + Create a VRS CopyNumberCount variation using the variation-normalization service. Returns: Dictionary containing VRS representation or error information """ - request_timeout = 5 # seconds try: # Extract required parameters from clinvar_json hgvs_expr = clinvar_json["source"] @@ -277,39 +269,15 @@ def copy_number_count(clinvar_json: dict) -> dict: else: return {"errors": f"Unknown variation_type: {clinvar_json}"} - # Create a queue for communication between processes - result_queue = multiprocessing.Queue() - - # Create and start the worker process - process = multiprocessing.Process( - target=_call_hgvs_to_copy_number_count_worker, - args=(result_queue, hgvs_expr, baseline_copies), + result = asyncio.run( + query_handler.to_copy_number_handler.hgvs_to_copy_number_count( + hgvs_expr=hgvs_expr, baseline_copies=baseline_copies + ) ) - process.start() - - # Wait for the process to complete or timeout - process.join(timeout=request_timeout) - - if process.is_alive(): - # Process didn't complete in time, forcefully terminate it - process.terminate() - process.join() # Wait for the process to actually terminate - return { - "errors": f"hgvs_to_copy_number_count call timed out after {request_timeout} seconds" - } - - # Check if the process completed successfully - if process.exitcode != 0: - return { - "errors": f"Worker process failed with exit code {process.exitcode}" - } - - # Get the result from the queue - try: - result = result_queue.get_nowait() - return result - except queue.Empty: - return {"errors": "No result received from worker process"} + if result.copy_number_count: + return result.copy_number_count.model_dump(exclude_none=True) + else: + return {"errors": result.warnings} except Exception as e: error_msg = f"Unexpected error: {e}" @@ -375,8 +343,6 @@ def main(argv=sys.argv[1:]): # Make parents os.makedirs(os.path.dirname(outfile), exist_ok=True) - # run_opts = Options(vrs_type_filter=opts.vrs_type_filter) - if opts["parallelism"] == 0: process_as_json_single_thread(local_file_name, outfile) else: @@ -384,6 +350,13 @@ def main(argv=sys.argv[1:]): if __name__ == "__main__": + # Importing and initializing the variation-normalizer QueryHandler + # requires env vars to be set and dynamodb jar to be run locally and pointed to with GENE_NORM_DB_URL + # https://github.com/clingen-data-model/architecture/tree/master/helm/charts/clingen-vicc/docker/dynamodb + # In the `dynamodb` directory above, build: + # podman build -t gene-normalizer-dynamodb:latest . + # Then run it (uses host gcloud config to authenticate to our bucket which has a snapshot of the gene database) + # podman run -it -p 8001:8000 -v $HOME/.config/gcloud:/config/gcloud -v dynamodb:/data -e DATA_DIR=/data -e GOOGLE_APPLICATION_CREDENTIALS=/config/gcloud/application_default_credentials.json -e CLOUDSDK_CONFIG=/config/gcloud gene-normalizer-dynamodb:latest creds_contents = """[default] aws_access_key_id = asdf aws_secret_access_key = asdf""" @@ -404,18 +377,6 @@ def main(argv=sys.argv[1:]): "Must set UTA_DB_URL (e.g. postgresql://anonymous@localhost:5433/uta/uta_20241220)" ) - # Import and initialize variation-normalizer QueryHandler - # Requires env vars to be set and dynamodb jar to be run locally and pointed to with GENE_NORM_DB_URL - # https://github.com/clingen-data-model/architecture/tree/master/helm/charts/clingen-vicc/docker/dynamodb - # In the `dynamodb` directory above, build: - # podman build -t gene-normalizer-dynamodb:latest . - # Then run it (uses host gcloud config to authenticate to our bucket which has a snapshot of the gene database) - # podman run -it -p 8001:8000 -v $HOME/.config/gcloud:/config/gcloud -v dynamodb:/data -e DATA_DIR=/data -e GOOGLE_APPLICATION_CREDENTIALS=/config/gcloud/application_default_credentials.json -e CLOUDSDK_CONFIG=/config/gcloud gene-normalizer-dynamodb:latest - import variation - from variation.query import QueryHandler - - query_handler = QueryHandler() - if len(sys.argv) == 1: main( [ @@ -436,5 +397,3 @@ def main(argv=sys.argv[1:]): # ) else: main(sys.argv[1:]) - main(sys.argv[1:]) - main(sys.argv[1:]) From 1c9f51ed9f5dc3a0cc6a78e39eae255451a77755 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Tue, 22 Jul 2025 15:05:22 -0400 Subject: [PATCH 06/16] Add worker logging --- clinvar_gk_pilot/main.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/clinvar_gk_pilot/main.py b/clinvar_gk_pilot/main.py index 7947c35..7aac578 100644 --- a/clinvar_gk_pilot/main.py +++ b/clinvar_gk_pilot/main.py @@ -2,6 +2,7 @@ import contextlib import gzip import json +import logging import multiprocessing import os import pathlib @@ -95,6 +96,21 @@ def worker(file_name_gz: str, output_file_name: str) -> None: on each line, and writes the output to a new GZIP file called `output_file_name`. """ + # Set up file-specific logger + log_file_name = f"{file_name_gz}.log" + file_logger = logging.getLogger(f"worker_{os.path.basename(file_name_gz)}") + file_logger.setLevel(logging.INFO) + + # Create file handler with the same format as log_conf.json + file_handler = logging.FileHandler(log_file_name) + formatter = logging.Formatter( + fmt="%(asctime)s - %(module)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + file_handler.setFormatter(formatter) + file_logger.addHandler(file_handler) + file_logger.propagate = False # Prevent duplicate logs + with ( gzip.open(file_name_gz, "rt", encoding="utf-8") as input_file, gzip.open(output_file_name, "wt", encoding="utf-8") as output_file, @@ -114,7 +130,10 @@ def make_background_process(): background_process = make_background_process() background_process.start() + line_number = -1 for line in input_file: + line_number += 1 + file_logger.info(f"Processing line (index: {line_number}): {line}") task_queue.put(partial(process_line, line)) try: ret = return_queue.get(timeout=task_timeout) @@ -139,6 +158,10 @@ def make_background_process(): task_queue.put(None) background_process.join() + # Clean up logger handler + file_handler.close() + file_logger.removeHandler(file_handler) + def process_as_json_single_thread(input_file_name: str, output_file_name: str) -> None: with gzip.open(input_file_name, "rt", encoding="utf-8") as f_in: From e9ab18f333e0c1a7db40294fc55764fb2cd7f169 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Tue, 22 Jul 2025 15:26:54 -0400 Subject: [PATCH 07/16] Add more detailed logging of progress --- clinvar_gk_pilot/main.py | 32 ++++++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/clinvar_gk_pilot/main.py b/clinvar_gk_pilot/main.py index 7aac578..baf04c2 100644 --- a/clinvar_gk_pilot/main.py +++ b/clinvar_gk_pilot/main.py @@ -185,17 +185,31 @@ def process_as_json( print(f"Partitioned filenames: {part_output_file_names}") workers = [] + worker_info = [] # Start a worker per file name - for part_ifn, part_ofn in zip(part_input_file_names, part_output_file_names): + for i, (part_ifn, part_ofn) in enumerate( + zip(part_input_file_names, part_output_file_names) + ): w = multiprocessing.Process(target=worker, args=(part_ifn, part_ofn)) w.start() workers.append(w) + worker_info.append((i, w, part_ifn)) - print(f"Started {len(workers)} workers") + print(f"Started {len(workers)} workers", flush=True) # Wait for all workers to finish - for w in workers: - w.join() + remaining_workers = worker_info.copy() + while remaining_workers: + for worker_idx, w, part_ifn in remaining_workers.copy(): + w.join(timeout=5) + if not w.is_alive(): + remaining_workers.remove((worker_idx, w, part_ifn)) + + if remaining_workers: + still_running = [ + f"Worker {idx} ({part_ifn})" for idx, w, part_ifn in remaining_workers + ] + print(f"Still running: {', '.join(still_running)}", flush=True) with gzip.open(output_file_name, "wt", encoding="utf-8") as f_out: for part_ofn in part_output_file_names: @@ -262,7 +276,10 @@ def copy_number_change(clinvar_json: dict) -> dict: ) ) if result.copy_number_change: - return result.copy_number_change.model_dump(exclude_none=True) + vrs_variant = result.copy_number_change + if vrs_variant.location.sequence: + vrs_variant.location.sequence = None + return vrs_variant.model_dump(exclude_none=True) else: return {"errors": result.warnings} @@ -298,7 +315,10 @@ def copy_number_count(clinvar_json: dict) -> dict: ) ) if result.copy_number_count: - return result.copy_number_count.model_dump(exclude_none=True) + vrs_variant = result.copy_number_count + if vrs_variant.location.sequence: + vrs_variant.location.sequence = None + return vrs_variant.model_dump(exclude_none=True) else: return {"errors": result.warnings} From 6f9db60b8230e623ee2c353116c1c60cf2387315 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Fri, 25 Jul 2025 15:11:33 -0400 Subject: [PATCH 08/16] Add custom initialization to variation-normalizer --- clinvar_gk_pilot/main.py | 38 ++++++++++++++++++++++++++++++++++++++ pyproject.toml | 4 +++- 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/clinvar_gk_pilot/main.py b/clinvar_gk_pilot/main.py index baf04c2..73e0ce6 100644 --- a/clinvar_gk_pilot/main.py +++ b/clinvar_gk_pilot/main.py @@ -367,6 +367,39 @@ def partition_file_lines_gz(local_file_path_gz: str, partitions: int) -> List[st return filenames +def initialize_variation_normalizer_ref_data(): + """Download and import the variation normalizer reference data script at runtime""" + import importlib.util + import tempfile + + # URL to the script + script_url = "https://raw.githubusercontent.com/GenomicMedLab/variation-normalizer-manuscript/issue-116/analysis/download_cool_seq_tool_files.py" + + # Download the script + response = requests.get(script_url) + response.raise_for_status() + + # Create a temporary file and write the script content + with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as temp_file: + temp_file.write(response.text) + temp_file_path = temp_file.name + + try: + # Import the module from the temporary file + spec = importlib.util.spec_from_file_location( + "download_cool_seq_tool_files", temp_file_path + ) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + # Call the download function + module.download_cool_seq_tool_files(is_docker_env=False) + + finally: + # Clean up the temporary file + os.unlink(temp_file_path) + + def main(argv=sys.argv[1:]): """ Process the --filename argument (expected as 'gs://..../filename.json.gz') @@ -386,6 +419,11 @@ def main(argv=sys.argv[1:]): # Make parents os.makedirs(os.path.dirname(outfile), exist_ok=True) + # Initialize the variation-normalizer to use specific snapshotted reference data. + initialize_variation_normalizer_ref_data() + for k in os.environ: + print(f"{k}:{os.environ[k]}") + if opts["parallelism"] == 0: process_as_json_single_thread(local_file_name, outfile) else: diff --git a/pyproject.toml b/pyproject.toml index 4d5d9f7..e1da952 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,9 @@ dependencies = [ "ga4gh.vrs[extras] @ git+https://github.com/ga4gh/vrs-python@2.1.3", "gunicorn==22.0.0", "flask~=3.0.3", - "variation-normalizer @ git+https://github.com/cancervariants/variation-normalization@0.15.0" + "requests~=2.0", + "variation-normalizer @ git+https://github.com/cancervariants/variation-normalization@0.15.0", + # "variation-normalizer-manuscript @ git+https://github.com/GenomicMedLab/variation-normalizer-manuscript@issue-116", ] dynamic = ["version"] From 0539deb5defd74048219842bd7cb51d734827181 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Fri, 25 Jul 2025 15:26:40 -0400 Subject: [PATCH 09/16] Remove vrs-python copynumber functions --- clinvar_gk_pilot/main.py | 32 +------------------------------- 1 file changed, 1 insertion(+), 31 deletions(-) diff --git a/clinvar_gk_pilot/main.py b/clinvar_gk_pilot/main.py index 73e0ce6..df51a7f 100644 --- a/clinvar_gk_pilot/main.py +++ b/clinvar_gk_pilot/main.py @@ -236,21 +236,6 @@ def allele(clinvar_json: dict) -> dict: return {"errors": str(e)} -def copy_number_count_vrspython(clinvar_json: dict) -> dict: - try: - tlr = cnv_translators[clinvar_json.get("assembly_version", "38")] - kwargs = {"copies": clinvar_json["absolute_copies"]} - vrs = tlr.translate_from( - var=clinvar_json["source"], fmt=clinvar_json["fmt"], **kwargs - ) - return vrs.model_dump(exclude_none=True) - except Exception as e: - logger.error( - f"Exception raised in 'copy_number_count' processing: {clinvar_json}: {e}" - ) - return {"errors": str(e)} - - def copy_number_change(clinvar_json: dict) -> dict: """ Create a VRS CopyNumberChange variation using the variation-normalization module. @@ -328,21 +313,6 @@ def copy_number_count(clinvar_json: dict) -> dict: return {"errors": error_msg} -def copy_number_change_vrspython(clinvar_json: dict) -> dict: - try: - tlr = cnv_translators[clinvar_json.get("assembly_version", "38")] - kwargs = {"copy_change": clinvar_json["copy_change_type"]} - vrs = tlr.translate_from( - var=clinvar_json["source"], fmt=clinvar_json["fmt"], **kwargs - ) - return vrs.model_dump(exclude_none=True) - except Exception as e: - logger.error( - f"Exception raised in 'copy_number_change' processing: {clinvar_json}: {e}" - ) - return {"errors": str(e)} - - def partition_file_lines_gz(local_file_path_gz: str, partitions: int) -> List[str]: """ Split `local_file_path_gz` into `partitions` roughly equal parts by line count. @@ -376,7 +346,7 @@ def initialize_variation_normalizer_ref_data(): script_url = "https://raw.githubusercontent.com/GenomicMedLab/variation-normalizer-manuscript/issue-116/analysis/download_cool_seq_tool_files.py" # Download the script - response = requests.get(script_url) + response = requests.get(script_url, timeout=30) response.raise_for_status() # Create a temporary file and write the script content From 50addae70e3cebac8a884958820f014f972bd9b7 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Fri, 25 Jul 2025 18:06:50 -0400 Subject: [PATCH 10/16] Adding liftover support via variation-normalization --- clinvar_gk_pilot/cli.py | 5 +++ clinvar_gk_pilot/main.py | 88 ++++++++++++++++++++++++++++------------ 2 files changed, 68 insertions(+), 25 deletions(-) diff --git a/clinvar_gk_pilot/cli.py b/clinvar_gk_pilot/cli.py index ee9e6b1..7f818c1 100644 --- a/clinvar_gk_pilot/cli.py +++ b/clinvar_gk_pilot/cli.py @@ -18,4 +18,9 @@ def parse_args(args: List[str]) -> dict: "Set to 0 to run in main thread." ), ) + parser.add_argument( + "--liftover", + action="store_true", + help="Enable attempting to liftover non-GRCh38 genomic variants to GRCh38", + ) return vars(parser.parse_args(args)) diff --git a/clinvar_gk_pilot/main.py b/clinvar_gk_pilot/main.py index df51a7f..ff12148 100644 --- a/clinvar_gk_pilot/main.py +++ b/clinvar_gk_pilot/main.py @@ -8,7 +8,6 @@ import pathlib import queue import sys -from dataclasses import dataclass from functools import partial from typing import List @@ -47,7 +46,7 @@ } -def process_line(line: str) -> str: +def process_line(line: str, opts: dict = None) -> str: """ Takes a line of JSON, processes it, and returns the result as a JSON string. """ @@ -56,11 +55,11 @@ def process_line(line: str) -> str: if clinvar_json.get("issue") is None: cls = clinvar_json["vrs_class"] if cls == "Allele": - result = allele(clinvar_json) + result = allele(clinvar_json, opts or {}) elif cls == "CopyNumberChange": - result = copy_number_change(clinvar_json) + result = copy_number_change(clinvar_json, opts or {}) elif cls == "CopyNumberCount": - result = copy_number_count(clinvar_json) + result = copy_number_count(clinvar_json, opts or {}) content = {"in": clinvar_json, "out": result} return json.dumps(content) @@ -90,7 +89,7 @@ def init_query_handler(): query_handler = QueryHandler() -def worker(file_name_gz: str, output_file_name: str) -> None: +def worker(file_name_gz: str, output_file_name: str, opts: dict = None) -> None: """ Takes an input file (a GZIP file of newline delimited), runs `process_line` on each line, and writes the output to a new GZIP file called `output_file_name`. @@ -134,7 +133,7 @@ def make_background_process(): for line in input_file: line_number += 1 file_logger.info(f"Processing line (index: {line_number}): {line}") - task_queue.put(partial(process_line, line)) + task_queue.put(partial(process_line, line, opts)) try: ret = return_queue.get(timeout=task_timeout) except queue.Empty: @@ -163,17 +162,19 @@ def make_background_process(): file_logger.removeHandler(file_handler) -def process_as_json_single_thread(input_file_name: str, output_file_name: str) -> None: +def process_as_json_single_thread( + input_file_name: str, output_file_name: str, opts: dict = None +) -> None: with gzip.open(input_file_name, "rt", encoding="utf-8") as f_in: with gzip.open(output_file_name, "wt", encoding="utf-8") as f_out: for line in f_in: - f_out.write(process_line(line)) + f_out.write(process_line(line, opts)) f_out.write("\n") print(f"Output written to {output_file_name}") def process_as_json( - input_file_name: str, output_file_name: str, parallelism: int + input_file_name: str, output_file_name: str, parallelism: int, opts: dict = None ) -> None: """ Process `input_file_name` in parallel and write the results to `output_file_name`. @@ -190,7 +191,7 @@ def process_as_json( for i, (part_ifn, part_ofn) in enumerate( zip(part_input_file_names, part_output_file_names) ): - w = multiprocessing.Process(target=worker, args=(part_ifn, part_ofn)) + w = multiprocessing.Process(target=worker, args=(part_ifn, part_ofn, opts)) w.start() workers.append(w) worker_info.append((i, w, part_ifn)) @@ -226,17 +227,50 @@ def process_as_json( print(f"Output written to {output_file_name}") -def allele(clinvar_json: dict) -> dict: +# def allele(clinvar_json: dict, opts: dict) -> dict: +# try: +# tlr = allele_translators[clinvar_json.get("assembly_version", "38")] +# vrs = tlr.translate_from(var=clinvar_json["source"], fmt=clinvar_json["fmt"]) +# return vrs.model_dump(exclude_none=True) +# except Exception as e: +# logger.error(f"Exception raised in 'allele' processing: {clinvar_json}: {e}") +# return {"errors": str(e)} + + +def allele(clinvar_json: dict, opts: dict) -> dict: try: - tlr = allele_translators[clinvar_json.get("assembly_version", "38")] - vrs = tlr.translate_from(var=clinvar_json["source"], fmt=clinvar_json["fmt"]) - return vrs.model_dump(exclude_none=True) + assembly_version = clinvar_json.get("assembly_version", "38") + source = clinvar_json["source"] + fmt = clinvar_json["fmt"] + + if fmt == "spdi" or not opts.get("liftover", False): + if fmt == "spdi" and assembly_version != "38": + raise ValueError( + f"Unexpected assembly '{assembly_version}' for SPDI expression {source}" + ) + return query_handler.vrs_python_tlr.translate_from( + source, fmt=fmt + ).model_dump(exclude_none=True) + elif fmt == "hgvs": + if opts.get("liftover", False): + # do /normalize. This also automatically tries to liftover to GRCh38 + result = asyncio.run( + query_handler.normalize_handler.normalize( + q=source, + ) + ) + if result.variation: + return result.variation.model_dump(exclude_none=True) + else: + return {"errors": json.dumps(result.warnings)} + else: + raise ValueError(f"Unsupported format: {fmt}") except Exception as e: logger.error(f"Exception raised in 'allele' processing: {clinvar_json}: {e}") return {"errors": str(e)} -def copy_number_change(clinvar_json: dict) -> dict: +def copy_number_change(clinvar_json: dict, opts: dict) -> dict: """ Create a VRS CopyNumberChange variation using the variation-normalization module. @@ -257,7 +291,9 @@ def copy_number_change(clinvar_json: dict) -> dict: result = asyncio.run( query_handler.to_copy_number_handler.hgvs_to_copy_number_change( - hgvs_expr=hgvs_expr, copy_change=copy_change + hgvs_expr=hgvs_expr, + copy_change=copy_change, + do_liftover=opts.get("liftover", False), ) ) if result.copy_number_change: @@ -266,7 +302,7 @@ def copy_number_change(clinvar_json: dict) -> dict: vrs_variant.location.sequence = None return vrs_variant.model_dump(exclude_none=True) else: - return {"errors": result.warnings} + return {"errors": json.dumps(result.warnings)} except Exception as e: error_msg = f"Unexpected error: {e}" @@ -274,7 +310,7 @@ def copy_number_change(clinvar_json: dict) -> dict: return {"errors": error_msg} -def copy_number_count(clinvar_json: dict) -> dict: +def copy_number_count(clinvar_json: dict, opts: dict) -> dict: """ Create a VRS CopyNumberCount variation using the variation-normalization service. @@ -296,7 +332,9 @@ def copy_number_count(clinvar_json: dict) -> dict: result = asyncio.run( query_handler.to_copy_number_handler.hgvs_to_copy_number_count( - hgvs_expr=hgvs_expr, baseline_copies=baseline_copies + hgvs_expr=hgvs_expr, + baseline_copies=baseline_copies, + do_liftover=opts.get("liftover", False), ) ) if result.copy_number_count: @@ -305,7 +343,7 @@ def copy_number_count(clinvar_json: dict) -> dict: vrs_variant.location.sequence = None return vrs_variant.model_dump(exclude_none=True) else: - return {"errors": result.warnings} + return {"errors": json.dumps(result.warnings)} except Exception as e: error_msg = f"Unexpected error: {e}" @@ -391,13 +429,13 @@ def main(argv=sys.argv[1:]): # Initialize the variation-normalizer to use specific snapshotted reference data. initialize_variation_normalizer_ref_data() - for k in os.environ: - print(f"{k}:{os.environ[k]}") + # for k in os.environ: + # print(f"{k}:{os.environ[k]}") if opts["parallelism"] == 0: - process_as_json_single_thread(local_file_name, outfile) + process_as_json_single_thread(local_file_name, outfile, opts) else: - process_as_json(local_file_name, outfile, opts["parallelism"]) + process_as_json(local_file_name, outfile, opts["parallelism"], opts) if __name__ == "__main__": From e77d98535d1fa3d5616c54c6c6a871982e3ad6ac Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Fri, 25 Jul 2025 18:30:52 -0400 Subject: [PATCH 11/16] Create one event loop per worker and send async normalizer tasks to it --- clinvar_gk_pilot/main.py | 39 ++++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/clinvar_gk_pilot/main.py b/clinvar_gk_pilot/main.py index ff12148..720b402 100644 --- a/clinvar_gk_pilot/main.py +++ b/clinvar_gk_pilot/main.py @@ -1,6 +1,7 @@ import asyncio import contextlib import gzip +import importlib.util import json import logging import multiprocessing @@ -8,6 +9,7 @@ import pathlib import queue import sys +import tempfile from functools import partial from typing import List @@ -81,13 +83,26 @@ def _task_worker( return_queue.put(task()) -# Define init function to set up QueryHandler in this process +# Define init function to set up QueryHandler and event loop in this process def init_query_handler(): from variation.query import QueryHandler - global query_handler + global query_handler, event_loop query_handler = QueryHandler() + # Create a persistent event loop for this worker process + event_loop = asyncio.new_event_loop() + asyncio.set_event_loop(event_loop) + + +def run_async_with_persistent_loop(coro): + """ + Run an async coroutine using the persistent event loop for this worker process. + This replaces asyncio.run() calls to avoid creating/destroying event loops repeatedly. + """ + global event_loop + return event_loop.run_until_complete(coro) + def worker(file_name_gz: str, output_file_name: str, opts: dict = None) -> None: """ @@ -254,7 +269,7 @@ def allele(clinvar_json: dict, opts: dict) -> dict: elif fmt == "hgvs": if opts.get("liftover", False): # do /normalize. This also automatically tries to liftover to GRCh38 - result = asyncio.run( + result = run_async_with_persistent_loop( query_handler.normalize_handler.normalize( q=source, ) @@ -289,7 +304,7 @@ def copy_number_change(clinvar_json: dict, opts: dict) -> dict: else: return {"errors": f"Unknown variation_type: {clinvar_json}"} - result = asyncio.run( + result = run_async_with_persistent_loop( query_handler.to_copy_number_handler.hgvs_to_copy_number_change( hgvs_expr=hgvs_expr, copy_change=copy_change, @@ -330,7 +345,7 @@ def copy_number_count(clinvar_json: dict, opts: dict) -> dict: else: return {"errors": f"Unknown variation_type: {clinvar_json}"} - result = asyncio.run( + result = run_async_with_persistent_loop( query_handler.to_copy_number_handler.hgvs_to_copy_number_count( hgvs_expr=hgvs_expr, baseline_copies=baseline_copies, @@ -377,9 +392,6 @@ def partition_file_lines_gz(local_file_path_gz: str, partitions: int) -> List[st def initialize_variation_normalizer_ref_data(): """Download and import the variation normalizer reference data script at runtime""" - import importlib.util - import tempfile - # URL to the script script_url = "https://raw.githubusercontent.com/GenomicMedLab/variation-normalizer-manuscript/issue-116/analysis/download_cool_seq_tool_files.py" @@ -429,8 +441,6 @@ def main(argv=sys.argv[1:]): # Initialize the variation-normalizer to use specific snapshotted reference data. initialize_variation_normalizer_ref_data() - # for k in os.environ: - # print(f"{k}:{os.environ[k]}") if opts["parallelism"] == 0: process_as_json_single_thread(local_file_name, outfile, opts) @@ -475,14 +485,5 @@ def main(argv=sys.argv[1:]): "2", ] ) - - # main( - # [ - # "--filename", - # "vi-100000.json.gz", - # "--parallelism", - # "1", - # ] - # ) else: main(sys.argv[1:]) From 62c803c0aa324794b718e7f3ffb649fe266853d2 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Mon, 28 Jul 2025 10:22:09 -0400 Subject: [PATCH 12/16] Remove location.sequence from Alleles --- clinvar_gk_pilot/main.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/clinvar_gk_pilot/main.py b/clinvar_gk_pilot/main.py index 720b402..c1ac03b 100644 --- a/clinvar_gk_pilot/main.py +++ b/clinvar_gk_pilot/main.py @@ -263,9 +263,10 @@ def allele(clinvar_json: dict, opts: dict) -> dict: raise ValueError( f"Unexpected assembly '{assembly_version}' for SPDI expression {source}" ) - return query_handler.vrs_python_tlr.translate_from( - source, fmt=fmt - ).model_dump(exclude_none=True) + vrs_variant = query_handler.vrs_python_tlr.translate_from(source, fmt=fmt) + if vrs_variant.location.sequence: + vrs_variant.location.sequence = None + return vrs_variant.model_dump(exclude_none=True) elif fmt == "hgvs": if opts.get("liftover", False): # do /normalize. This also automatically tries to liftover to GRCh38 @@ -275,7 +276,10 @@ def allele(clinvar_json: dict, opts: dict) -> dict: ) ) if result.variation: - return result.variation.model_dump(exclude_none=True) + vrs_variant = result.variation + if vrs_variant.location.sequence: + vrs_variant.location.sequence = None + return vrs_variant.model_dump(exclude_none=True) else: return {"errors": json.dumps(result.warnings)} else: From 6d69634523795ab2b082aa15e59bec6577e3ae25 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Mon, 28 Jul 2025 15:02:11 -0400 Subject: [PATCH 13/16] Update CLI test --- test/test_cli.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/test_cli.py b/test/test_cli.py index 8505376..4d7a7ab 100644 --- a/test/test_cli.py +++ b/test/test_cli.py @@ -6,4 +6,5 @@ def test_parse_args(): opts = parse_args(argv) assert opts["filename"] == "test.txt" assert opts["parallelism"] == 1 - assert len(opts) == 2 + assert opts["liftover"] is False + assert len(opts) == 3 From 35dee0f61739343d810b0a791fbe29fb72705783 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Fri, 1 Aug 2025 01:05:24 -0400 Subject: [PATCH 14/16] Stop filtering out inputs with 'issue' value. For exceptions recorded as errors, wrap in repr() --- clinvar_gk_pilot/main.py | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/clinvar_gk_pilot/main.py b/clinvar_gk_pilot/main.py index c1ac03b..f68e1cb 100644 --- a/clinvar_gk_pilot/main.py +++ b/clinvar_gk_pilot/main.py @@ -54,7 +54,8 @@ def process_line(line: str, opts: dict = None) -> str: """ clinvar_json = json.loads(line) result = None - if clinvar_json.get("issue") is None: + # if clinvar_json.get("issue") is None: + if True: cls = clinvar_json["vrs_class"] if cls == "Allele": result = allele(clinvar_json, opts or {}) @@ -242,16 +243,6 @@ def process_as_json( print(f"Output written to {output_file_name}") -# def allele(clinvar_json: dict, opts: dict) -> dict: -# try: -# tlr = allele_translators[clinvar_json.get("assembly_version", "38")] -# vrs = tlr.translate_from(var=clinvar_json["source"], fmt=clinvar_json["fmt"]) -# return vrs.model_dump(exclude_none=True) -# except Exception as e: -# logger.error(f"Exception raised in 'allele' processing: {clinvar_json}: {e}") -# return {"errors": str(e)} - - def allele(clinvar_json: dict, opts: dict) -> dict: try: assembly_version = clinvar_json.get("assembly_version", "38") @@ -285,8 +276,9 @@ def allele(clinvar_json: dict, opts: dict) -> dict: else: raise ValueError(f"Unsupported format: {fmt}") except Exception as e: - logger.error(f"Exception raised in 'allele' processing: {clinvar_json}: {e}") - return {"errors": str(e)} + error_msg = f"Unexpected error: {repr(e)}" + logger.error(f"Exception in allele: {clinvar_json}: {error_msg}") + return {"errors": error_msg} def copy_number_change(clinvar_json: dict, opts: dict) -> dict: @@ -324,7 +316,7 @@ def copy_number_change(clinvar_json: dict, opts: dict) -> dict: return {"errors": json.dumps(result.warnings)} except Exception as e: - error_msg = f"Unexpected error: {e}" + error_msg = f"Unexpected error: {repr(e)}" logger.error(f"Exception in copy_number_change: {clinvar_json}: {error_msg}") return {"errors": error_msg} @@ -365,7 +357,7 @@ def copy_number_count(clinvar_json: dict, opts: dict) -> dict: return {"errors": json.dumps(result.warnings)} except Exception as e: - error_msg = f"Unexpected error: {e}" + error_msg = f"Unexpected error: {repr(e)}" logger.error(f"Exception in copy_number_count: {clinvar_json}: {error_msg}") return {"errors": error_msg} From c460184a4fb289d9d0dcaa56930dfdcfc1912f11 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Wed, 6 Aug 2025 12:56:17 -0400 Subject: [PATCH 15/16] Add to README --- README.md | 153 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 152 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 803b4b9..80b2a6b 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,153 @@ # clinvar-gk-python -Project for reading and normalizing ClinVar variants into GA4GH GKS forms + +Project for reading and normalizing ClinVar variants into GA4GH GKS forms. + +## Setup + +### Prerequisites + +1. **Docker** - Required to run the variation-normalization services +2. **Python 3.11+** - Required for the main application +3. **SeqRepo database** - Local sequence repository + +### Database Services Setup + +This project requires several database services that can be easily set up using the Docker compose configuration from the variation-normalization project. + +1. Download the compose.yaml file from variation-normalization v0.15.0 (matching the version in pyproject.toml): + +```bash +curl -o variation-normalizer-compose.yaml https://raw.githubusercontent.com/cancervariants/variation-normalization/0.15.0/compose.yaml +``` + +2. Start the required services: + +```bash +docker compose -f variation-normalizer-compose.yaml up -d +``` + +This will start: +- **UTA database** (port 5432): Universal Transcript Archive for transcript mapping +- **Gene Normalizer database** (port 8000): Gene normalization service +- **Variation Normalizer API** (port 8001): Variation normalization service + +**Note on Port Conflicts**: If you already have services running on these ports, you can modify the port mappings in `variation-normalizer-compose.yaml`: +- For UTA database: Change `5432:5432` to `5433:5432` (or another available port) +- For Gene Normalizer: Change `8000:8000` to `8002:8000` (or another available port) +- For Variation Normalizer API: Change `8001:80` to `8003:80` (or another available port) + +### Environment Configuration + +Set up the required environment variables. You can use the provided `env.sh` as a reference: + +```bash +# SeqRepo configuration - Update path to your local SeqRepo installation +export SEQREPO_ROOT_DIR=/Users/kferrite/dev/data/seqrepo/2024-12-20 +export SEQREPO_DATAPROXY_URL=seqrepo+file://${SEQREPO_ROOT_DIR} + +# Database URLs (using the Docker compose services) +export UTA_DB_URL=postgresql://anonymous:anonymous@localhost:5432/uta/uta_20241220 +export GENE_NORM_DB_URL=http://localhost:8000 +``` + +**Important**: If you modified the ports in the compose file, update the corresponding environment variables accordingly (e.g., change `5432` to `5433` in `UTA_DB_URL` if you changed the UTA port). + +Or source the provided environment file: +```bash +source env.sh +``` + +### Python Installation + +Install the project and its dependencies: + +```bash +pip install -e '.[dev]' +``` + +## Running + +### Basic Usage + +Process a ClinVar variants file: + +```bash +python clinvar_gk_pilot/main.py --filename gs://clinvar-gks/2025-07-06/dev/vi.json.gz --parallelism 4 +``` + +### Alternative Entry Point + +You can also use the installed command: + +```bash +clinvar-gk-pilot --filename gs://clinvar-gks/2025-07-06/dev/vi.json.gz --parallelism 4 +``` + +### Command Line Options + +- `--filename`: Input file path (supports local files and gs:// URLs) +- `--parallelism`: Number of worker processes for parallel processing (default: 1) +- `--liftover`: Enable liftover functionality for genomic coordinate conversion + +### Example Commands + +Process a local file: +```bash +clinvar-gk-pilot --filename sample-input.ndjson.gz --parallelism 2 +``` + +Process a file from Google Cloud Storage: +```bash +clinvar-gk-pilot --filename gs://clinvar-gks/2025-07-06/dev/vi.json.gz --parallelism 4 +``` + +Process with liftover enabled: +```bash +clinvar-gk-pilot --filename gs://clinvar-gks/2025-07-06/dev/vi.json.gz --parallelism 2 --liftover +``` + +### Important Notes on Liftover + +When using the `--liftover` option, the application will send queries to the UTA PostgreSQL database for genomic coordinate conversion. Due to Docker's default shared memory constraints, high parallelism combined with liftover can cause out-of-memory errors. + +**Recommendations:** +- Keep `--parallelism` on the lower side (2-4) when using `--liftover` +- Alternatively, increase the `shm_size` for the UTA container in `variation-normalizer-compose.yaml`: + +```yaml +services: + uta: + # ... other configuration + shm_size: 256m # Add this line to increase shared memory to 256MB +``` + +## Development + +### Testing + +Run the test suite: +```bash +pytest +``` + +Run specific tests: +```bash +pytest test/test_cli.py::test_parse_args +``` + +### Code Quality + +Check and fix code quality issues: +```bash +# Check code quality +./lint.sh + +# Apply automatic fixes +./lint.sh apply +``` + +The lint script runs: +- black (code formatting) +- isort (import sorting) +- ruff (fast linter) +- pylint (code analysis) From 7b5a0f15cfaf61a5663867f2733ba400223e9665 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Wed, 6 Aug 2025 12:57:48 -0400 Subject: [PATCH 16/16] Update README --- README.md | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 80b2a6b..cb3ada4 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ Set up the required environment variables. You can use the provided `env.sh` as ```bash # SeqRepo configuration - Update path to your local SeqRepo installation -export SEQREPO_ROOT_DIR=/Users/kferrite/dev/data/seqrepo/2024-12-20 +export SEQREPO_ROOT_DIR=/usr/local/share/seqrepo/2024-12-20 export SEQREPO_DATAPROXY_URL=seqrepo+file://${SEQREPO_ROOT_DIR} # Database URLs (using the Docker compose services) @@ -147,7 +147,4 @@ Check and fix code quality issues: ``` The lint script runs: -- black (code formatting) -- isort (import sorting) -- ruff (fast linter) -- pylint (code analysis) +- black, isort, ruff, pylint