diff --git a/examples/submit_transfer_task_async_example.py b/examples/submit_transfer_task_async_example.py new file mode 100644 index 00000000..3e8ce89a --- /dev/null +++ b/examples/submit_transfer_task_async_example.py @@ -0,0 +1,75 @@ +# scripts/submit_transfer_task_async_example.py +import uuid +from pathlib import Path + +from prefect import flow, get_run_logger + +from orchestration.flows.bl832.config import Config832 +from orchestration.globus.transfer import start_transfer +from orchestration.transfer_controller import globus_transfer_task + + +@flow(name="submit_transfer_task_async_example") +def submit_transfer_task_async_example(file_path: str = "/raw/transfer_tests/test.txt"): + """ + Example flow showing how to submit a transfer task asynchronously, allowing the flow to do other work while waiting + for the transfer to complete. In this example, we copy a file within spot832 first (blocking), + then submit an asynchronous transfer from spot832 to data832. + While waiting for that transfer to complete, we do some other work (simulated with log messages). + Once the first transfer is done, we check if it was successful before submitting another asynchronous transfer + from data832 to nersc832, again doing other work while waiting for it to complete. + """ + logger = get_run_logger() + config = Config832() + + # Copy file to a uniquely-named file in the same folder + file = Path(file_path) + new_file = str(file.with_name(f"test_{str(uuid.uuid4())}.txt")) + logger.info(f"New file: {new_file}") + + # Copy within spot832 (blocking — we need this file to exist before transferring it) + success, _ = start_transfer( + config.tc, config.spot832, file_path, config.spot832, new_file, logger=logger + ) + logger.info(f"spot832 internal copy success: {success}") + + # Fire off spot832 -> data832, don't wait + logger.info("Submitting spot832 -> data832 transfer, not waiting...") + spot_to_data_future = globus_transfer_task.submit( + file_path=new_file, + source=config.spot832, + destination=config.data832, + config=config, + ) + logger.info("spot832 -> data832 submitted. Moving on immediately.") + + logger.info("Doing other things while spot832 -> data832 runs...") + logger.info("... still doing other things ...") + + # Must wait for spot->data before submitting data->nersc (real data dependency) + logger.info("Waiting for spot832 -> data832 to complete before submitting to nersc...") + spot_to_data_success = spot_to_data_future.result() + logger.info(f"spot832 -> data832: {spot_to_data_success}") + + if not spot_to_data_success: + logger.error("spot832 -> data832 failed, skipping data832 -> nersc832 transfer.") + return + + # Now safe to submit data832 -> nersc832 + logger.info("Submitting data832 -> nersc832 transfer, not waiting...") + data_to_nersc_future = globus_transfer_task.submit( + file_path=new_file, + source=config.data832, + destination=config.nersc832, + config=config, + ) + logger.info("data832 -> nersc832 submitted. Moving on immediately.") + + logger.info("... doing more things while data832 -> nersc832 runs ...") + + data_to_nersc_success = data_to_nersc_future.result() + logger.info(f"data832 -> nersc832: {data_to_nersc_success}") + + +if __name__ == "__main__": + submit_transfer_task_async_example() diff --git a/orchestration/transfer_controller.py b/orchestration/transfer_controller.py index afd4bebb..8ac7acdb 100644 --- a/orchestration/transfer_controller.py +++ b/orchestration/transfer_controller.py @@ -8,6 +8,7 @@ from typing import Generic, TypeVar, Optional import globus_sdk +from prefect import task from orchestration.config import BeamlineConfig from orchestration.globus.transfer import GlobusEndpoint, start_transfer @@ -409,3 +410,28 @@ def get_transfer_controller( return SimpleTransferController(config) else: raise ValueError(f"Invalid transfer type: {transfer_type}") + + +@task(name="globus_transfer_task", task_run_name="transfer: {file_path} → {destination.name}") +def globus_transfer_task( + file_path: str, + source: GlobusEndpoint, + destination: GlobusEndpoint, + config: BeamlineConfig, + prometheus_metrics: Optional[PrometheusMetrics] = None +) -> bool: + """ + Perform a Globus transfer task. + + Args: + file_path (str): The path of the file to copy. + source (GlobusEndpoint): The source endpoint. + destination (GlobusEndpoint): The destination endpoint. + config (BeamlineConfig): The configuration object. + prometheus_metrics (Optional[PrometheusMetrics]): Prometheus metrics object for collecting transfer metrics. + + Returns: + bool: True if the transfer was successful, False otherwise. + """ + transfer_controller = get_transfer_controller(CopyMethod.GLOBUS, config, prometheus_metrics) + return transfer_controller.copy(file_path=file_path, source=source, destination=destination)