From 5bb486503a14d3a78d83cf09602784b4345161cc Mon Sep 17 00:00:00 2001 From: David Abramov Date: Sat, 21 Feb 2026 14:39:45 -0800 Subject: [PATCH 1/5] Adding a test and adding a task for concurrent transfer submission --- .../_tests/submit_transfer_task_test.py | 67 +++++++++++++++++++ orchestration/transfer_controller.py | 26 +++++++ 2 files changed, 93 insertions(+) create mode 100644 orchestration/_tests/submit_transfer_task_test.py diff --git a/orchestration/_tests/submit_transfer_task_test.py b/orchestration/_tests/submit_transfer_task_test.py new file mode 100644 index 00000000..4a5f189e --- /dev/null +++ b/orchestration/_tests/submit_transfer_task_test.py @@ -0,0 +1,67 @@ +# _tests/submit_transfer_task_test.py +import uuid +from pathlib import Path + +from prefect import flow, get_run_logger + +from orchestration.flows.bl832.config import Config832 +from orchestration.globus.transfer import start_transfer +from orchestration.transfer_controller import globus_transfer_task + + +@flow(name="submit_transfer_task_test") +def submit_transfer_task_test(file_path: str = "/raw/transfer_tests/test.txt"): + logger = get_run_logger() + config = Config832() + + # Copy file to a uniquely-named file in the same folder + file = Path(file_path) + new_file = str(file.with_name(f"test_{str(uuid.uuid4())}.txt")) + logger.info(f"New file: {new_file}") + + # Copy within spot832 (blocking — we need this file to exist before transferring it) + success, _ = start_transfer( + config.tc, config.spot832, file_path, config.spot832, new_file, logger=logger + ) + logger.info(f"spot832 internal copy success: {success}") + + # Fire off spot832 -> data832, don't wait + logger.info("Submitting spot832 -> data832 transfer, not waiting...") + spot_to_data_future = globus_transfer_task.submit( + file_path=new_file, + source=config.spot832, + destination=config.data832, + config=config, + ) + logger.info("spot832 -> data832 submitted. Moving on immediately.") + + logger.info("Doing other things while spot832 -> data832 runs...") + logger.info("... still doing other things ...") + + # Must wait for spot->data before submitting data->nersc (real data dependency) + logger.info("Waiting for spot832 -> data832 to complete before submitting to nersc...") + spot_to_data_success = spot_to_data_future.result() + logger.info(f"spot832 -> data832: {spot_to_data_success}") + + if not spot_to_data_success: + logger.error("spot832 -> data832 failed, skipping data832 -> nersc832 transfer.") + return + + # Now safe to submit data832 -> nersc832 + logger.info("Submitting data832 -> nersc832 transfer, not waiting...") + data_to_nersc_future = globus_transfer_task.submit( + file_path=new_file, + source=config.data832, + destination=config.nersc832, + config=config, + ) + logger.info("data832 -> nersc832 submitted. Moving on immediately.") + + logger.info("... doing more things while data832 -> nersc832 runs ...") + + data_to_nersc_success = data_to_nersc_future.result() + logger.info(f"data832 -> nersc832: {data_to_nersc_success}") + + +if __name__ == "__main__": + submit_transfer_task_test() diff --git a/orchestration/transfer_controller.py b/orchestration/transfer_controller.py index afd4bebb..8ac7acdb 100644 --- a/orchestration/transfer_controller.py +++ b/orchestration/transfer_controller.py @@ -8,6 +8,7 @@ from typing import Generic, TypeVar, Optional import globus_sdk +from prefect import task from orchestration.config import BeamlineConfig from orchestration.globus.transfer import GlobusEndpoint, start_transfer @@ -409,3 +410,28 @@ def get_transfer_controller( return SimpleTransferController(config) else: raise ValueError(f"Invalid transfer type: {transfer_type}") + + +@task(name="globus_transfer_task", task_run_name="transfer: {file_path} → {destination.name}") +def globus_transfer_task( + file_path: str, + source: GlobusEndpoint, + destination: GlobusEndpoint, + config: BeamlineConfig, + prometheus_metrics: Optional[PrometheusMetrics] = None +) -> bool: + """ + Perform a Globus transfer task. + + Args: + file_path (str): The path of the file to copy. + source (GlobusEndpoint): The source endpoint. + destination (GlobusEndpoint): The destination endpoint. + config (BeamlineConfig): The configuration object. + prometheus_metrics (Optional[PrometheusMetrics]): Prometheus metrics object for collecting transfer metrics. + + Returns: + bool: True if the transfer was successful, False otherwise. + """ + transfer_controller = get_transfer_controller(CopyMethod.GLOBUS, config, prometheus_metrics) + return transfer_controller.copy(file_path=file_path, source=source, destination=destination) From 75a023cc0f9068eef618a0fb65380b404d6cc9ab Mon Sep 17 00:00:00 2001 From: David Abramov Date: Sat, 21 Feb 2026 15:20:54 -0800 Subject: [PATCH 2/5] moving submit_transfer_task_test.py to scripts/ so it isn't tried to run as a pytest --- {orchestration/_tests => scripts}/submit_transfer_task_test.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename {orchestration/_tests => scripts}/submit_transfer_task_test.py (100%) diff --git a/orchestration/_tests/submit_transfer_task_test.py b/scripts/submit_transfer_task_test.py similarity index 100% rename from orchestration/_tests/submit_transfer_task_test.py rename to scripts/submit_transfer_task_test.py From 7de0e3062df33192ba0aadc2c322f6723bbf2a9c Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 9 Mar 2026 10:35:02 -0700 Subject: [PATCH 3/5] renaming and adding docstring --- ....py => submit_transfer_task_async_example.py} | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) rename scripts/{submit_transfer_task_test.py => submit_transfer_task_async_example.py} (73%) diff --git a/scripts/submit_transfer_task_test.py b/scripts/submit_transfer_task_async_example.py similarity index 73% rename from scripts/submit_transfer_task_test.py rename to scripts/submit_transfer_task_async_example.py index 4a5f189e..f4c4fae4 100644 --- a/scripts/submit_transfer_task_test.py +++ b/scripts/submit_transfer_task_async_example.py @@ -1,4 +1,4 @@ -# _tests/submit_transfer_task_test.py +# scripts/submit_transfer_task_async_example.py import uuid from pathlib import Path @@ -9,8 +9,16 @@ from orchestration.transfer_controller import globus_transfer_task -@flow(name="submit_transfer_task_test") -def submit_transfer_task_test(file_path: str = "/raw/transfer_tests/test.txt"): +@flow(name="submit_transfer_task_async_example") +def submit_transfer_task_async_example(file_path: str = "/raw/transfer_tests/test.txt"): + """ + Example flow showing how to submit a transfer task asynchronously, allowing the flow to do other work while waiting + for the transfer to complete. In this example, we copy a file within spot832 first (blocking), + then submit an asynchronous transfer from spot832 to data832. + While waiting for that transfer to complete, we do some other work (simulated with log messages). + Once the first transfer is done, we check if it was successful before submitting another asynchronous transfer + from data832 to nersc832, again doing other work while waiting for it to complete. + """ logger = get_run_logger() config = Config832() @@ -64,4 +72,4 @@ def submit_transfer_task_test(file_path: str = "/raw/transfer_tests/test.txt"): if __name__ == "__main__": - submit_transfer_task_test() + submit_transfer_task_async_example() From d3df22dc17663a0fe4582223a599ce6fb6abb1cb Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 9 Mar 2026 10:36:03 -0700 Subject: [PATCH 4/5] linting --- scripts/submit_transfer_task_async_example.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/submit_transfer_task_async_example.py b/scripts/submit_transfer_task_async_example.py index f4c4fae4..3e8ce89a 100644 --- a/scripts/submit_transfer_task_async_example.py +++ b/scripts/submit_transfer_task_async_example.py @@ -11,7 +11,7 @@ @flow(name="submit_transfer_task_async_example") def submit_transfer_task_async_example(file_path: str = "/raw/transfer_tests/test.txt"): - """ + """ Example flow showing how to submit a transfer task asynchronously, allowing the flow to do other work while waiting for the transfer to complete. In this example, we copy a file within spot832 first (blocking), then submit an asynchronous transfer from spot832 to data832. From 798030ade58c9fc160927ad482af0ccf4e6f4e8c Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 9 Mar 2026 10:54:42 -0700 Subject: [PATCH 5/5] moving the async example from scripts/ to examples/ --- {scripts => examples}/submit_transfer_task_async_example.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename {scripts => examples}/submit_transfer_task_async_example.py (100%) diff --git a/scripts/submit_transfer_task_async_example.py b/examples/submit_transfer_task_async_example.py similarity index 100% rename from scripts/submit_transfer_task_async_example.py rename to examples/submit_transfer_task_async_example.py