Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions examples/submit_transfer_task_async_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# examples/submit_transfer_task_async_example.py
import uuid
from pathlib import Path

from prefect import flow, get_run_logger

from orchestration.flows.bl832.config import Config832
from orchestration.globus.transfer import start_transfer
from orchestration.transfer_controller import globus_transfer_task


@flow(name="submit_transfer_task_async_example")
def submit_transfer_task_async_example(file_path: str = "/raw/transfer_tests/test.txt"):
    """
    Example flow showing how to submit a transfer task asynchronously, allowing the flow to do
    other work while waiting for the transfer to complete.

    Steps:
        1. Copy ``file_path`` to a uniquely-named sibling file within spot832 (blocking).
        2. Submit an asynchronous transfer of that new file from spot832 to data832 and do
           other work (simulated with log messages) while it runs.
        3. Wait for the spot832 -> data832 result and, only if it succeeded, submit an
           asynchronous transfer from data832 to nersc832, again doing other work while waiting.

    Each step depends on the previous one having produced the file, so the flow stops early
    (after logging an error) as soon as any step reports failure.

    Args:
        file_path (str): Path of the file to copy, used as the source path on the spot832 endpoint.

    Returns:
        None
    """
    logger = get_run_logger()
    config = Config832()

    # Copy file to a uniquely-named file in the same folder
    original_path = Path(file_path)
    new_file = str(original_path.with_name(f"test_{uuid.uuid4()}.txt"))
    logger.info(f"New file: {new_file}")

    # Copy within spot832 (blocking — we need this file to exist before transferring it)
    success, _ = start_transfer(
        config.tc, config.spot832, file_path, config.spot832, new_file, logger=logger
    )
    logger.info(f"spot832 internal copy success: {success}")

    # The next transfer reads new_file from spot832; if the copy failed there is nothing to send.
    if not success:
        logger.error("spot832 internal copy failed, skipping spot832 -> data832 transfer.")
        return

    # Fire off spot832 -> data832, don't wait
    logger.info("Submitting spot832 -> data832 transfer, not waiting...")
    spot_to_data_future = globus_transfer_task.submit(
        file_path=new_file,
        source=config.spot832,
        destination=config.data832,
        config=config,
    )
    logger.info("spot832 -> data832 submitted. Moving on immediately.")

    logger.info("Doing other things while spot832 -> data832 runs...")
    logger.info("... still doing other things ...")

    # Must wait for spot->data before submitting data->nersc (real data dependency)
    logger.info("Waiting for spot832 -> data832 to complete before submitting to nersc...")
    spot_to_data_success = spot_to_data_future.result()
    logger.info(f"spot832 -> data832: {spot_to_data_success}")

    if not spot_to_data_success:
        logger.error("spot832 -> data832 failed, skipping data832 -> nersc832 transfer.")
        return

    # Now safe to submit data832 -> nersc832
    logger.info("Submitting data832 -> nersc832 transfer, not waiting...")
    data_to_nersc_future = globus_transfer_task.submit(
        file_path=new_file,
        source=config.data832,
        destination=config.nersc832,
        config=config,
    )
    logger.info("data832 -> nersc832 submitted. Moving on immediately.")

    logger.info("... doing more things while data832 -> nersc832 runs ...")

    # Block on the final transfer so its outcome is logged before the flow finishes.
    data_to_nersc_success = data_to_nersc_future.result()
    logger.info(f"data832 -> nersc832: {data_to_nersc_success}")


# Run the example flow with its default file path when executed as a script.
if __name__ == "__main__":
    submit_transfer_task_async_example()
26 changes: 26 additions & 0 deletions orchestration/transfer_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import Generic, TypeVar, Optional

import globus_sdk
from prefect import task

from orchestration.config import BeamlineConfig
from orchestration.globus.transfer import GlobusEndpoint, start_transfer
Expand Down Expand Up @@ -409,3 +410,28 @@ def get_transfer_controller(
return SimpleTransferController(config)
else:
raise ValueError(f"Invalid transfer type: {transfer_type}")


@task(name="globus_transfer_task", task_run_name="transfer: {file_path} → {destination.name}")
def globus_transfer_task(
    file_path: str,
    source: GlobusEndpoint,
    destination: GlobusEndpoint,
    config: BeamlineConfig,
    prometheus_metrics: Optional[PrometheusMetrics] = None
) -> bool:
    """
    Copy a file between two Globus endpoints, wrapped as a Prefect task.

    Obtains a controller from ``get_transfer_controller`` for ``CopyMethod.GLOBUS`` and
    delegates the copy to it.

    Args:
        file_path (str): The path of the file to copy.
        source (GlobusEndpoint): The endpoint to copy from.
        destination (GlobusEndpoint): The endpoint to copy to.
        config (BeamlineConfig): The configuration object handed to the controller factory.
        prometheus_metrics (Optional[PrometheusMetrics]): Prometheus metrics object for collecting
            transfer metrics; forwarded unchanged to the controller factory.

    Returns:
        bool: True if the transfer was successful, False otherwise.
    """
    # This task is Globus-specific, so the copy method is fixed rather than a parameter.
    controller = get_transfer_controller(CopyMethod.GLOBUS, config, prometheus_metrics)
    transfer_ok = controller.copy(file_path=file_path, source=source, destination=destination)
    return transfer_ok