From 501eea223b540e6ed1986c332bde7bda3d384934 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 9 Feb 2026 12:07:17 -0800 Subject: [PATCH 1/5] Updating dispatcher to include options for aclf-seg and nersc-seg forge flows --- orchestration/flows/bl832/dispatcher.py | 49 ++++++++++++++++++++++--- 1 file changed, 44 insertions(+), 5 deletions(-) diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py index cf1d0c64..8964b441 100644 --- a/orchestration/flows/bl832/dispatcher.py +++ b/orchestration/flows/bl832/dispatcher.py @@ -17,6 +17,9 @@ class FlowParameterMapper: "alcf_recon_flow/alcf_recon_flow": [ "file_path", "config"], + "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow": [ + "file_path", + "config"], # From move.py "new_832_file_flow/new_file_832": [ "file_path", @@ -25,7 +28,12 @@ class FlowParameterMapper: # From nersc.py "nersc_recon_flow/nersc_recon_flow": [ "file_path", - "config"] + "config"], + "nersc_recon_multinode_flow/nersc_recon_multinode_flow": [ + "file_path", + "num_nodes", + "config" + ] } @classmethod @@ -55,23 +63,36 @@ class DecisionFlowInputModel(BaseModel): @task(name="setup_decision_settings") -def setup_decision_settings(alcf_recon: bool, nersc_recon: bool, new_file_832: bool) -> dict: +def setup_decision_settings( + alcf_recon: bool, + alcf_forge_recon_segment: bool, + nersc_recon: bool, + nersc_recon_multinode: bool, + new_file_832: bool +) -> dict: """ This task is used to define the settings for the decision making process of the BL832 beamline. :param alcf_recon: Boolean indicating whether to run the ALCF reconstruction flow. + :param alcf_forge_recon_segment: Boolean indicating whether to run the ALCF Forge reconstruction segment flow. :param nersc_recon: Boolean indicating whether to run the NERSC reconstruction flow. - :param nersc_move: Boolean indicating whether to move files to NERSC. + :param nersc_recon_multinode: Boolean indicating whether to run the NERSC multinode reconstruction flow. + :param new_file_832: Boolean indicating whether to run the new 832 file processing flow. :return: A dictionary containing the settings for each flow. """ logger = get_run_logger() try: logger.info(f"Setting up decision settings: alcf_recon={alcf_recon}, " - f"nersc_recon={nersc_recon}, new_file_832={new_file_832}") + f"alcf_forge_recon_segment={alcf_forge_recon_segment}, " + f"nersc_recon={nersc_recon}, " + f"nersc_recon_multinode={nersc_recon_multinode}, " + f"new_file_832={new_file_832}") # Define which flows to run based on the input settings settings = { "alcf_recon_flow/alcf_recon_flow": alcf_recon, + "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow": alcf_forge_recon_segment, "nersc_recon_flow/nersc_recon_flow": nersc_recon, + "nersc_recon_multinode_flow/nersc_recon_multinode_flow": nersc_recon_multinode, "new_832_file_flow/new_file_832": new_file_832 } # Save the settings in a JSON block for later retrieval by other flows @@ -145,10 +166,22 @@ async def dispatcher( alcf_params = FlowParameterMapper.get_flow_parameters("alcf_recon_flow/alcf_recon_flow", available_params) tasks.append(run_recon_flow_async("alcf_recon_flow/alcf_recon_flow", alcf_params)) + if decision_settings.get("alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow"): + alcf_forge_params = FlowParameterMapper.get_flow_parameters( + "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow", + available_params + ) + tasks.append(run_recon_flow_async("alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow", alcf_forge_params)) + if decision_settings.get("nersc_recon_flow/nersc_recon_flow"): nersc_params = FlowParameterMapper.get_flow_parameters("nersc_recon_flow/nersc_recon_flow", available_params) tasks.append(run_recon_flow_async("nersc_recon_flow/nersc_recon_flow", nersc_params)) + if decision_settings.get("nersc_recon_multinode_flow/nersc_recon_multinode_flow"): + nersc_multinode_params = FlowParameterMapper.get_flow_parameters( + "nersc_recon_multinode_flow/nersc_recon_multinode_flow", available_params) + tasks.append(run_recon_flow_async("nersc_recon_multinode_flow/nersc_recon_multinode_flow", nersc_multinode_params)) + # Run ALCF and NERSC flows in parallel, if any if tasks: try: @@ -169,7 +202,13 @@ async def dispatcher( """ try: # Setup decision settings based on input parameters - setup_decision_settings(alcf_recon=True, nersc_recon=True, new_file_832=True) + setup_decision_settings( + alcf_recon=True, + alcf_forge_recon_segment=False, + nersc_recon=True, + nersc_recon_multinode=False, + new_file_832=True + ) # Run the main decision flow with the specified parameters # asyncio.run(dispatcher( # config={}, # PYTEST, ALCF, NERSC From c5391d4e18e7367447ade1a50311f15c10928fc3 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 9 Feb 2026 12:09:44 -0800 Subject: [PATCH 2/5] Setting defaults for setup_decision_settings --- orchestration/flows/bl832/dispatcher.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py index 8964b441..5945f23b 100644 --- a/orchestration/flows/bl832/dispatcher.py +++ b/orchestration/flows/bl832/dispatcher.py @@ -64,11 +64,11 @@ class DecisionFlowInputModel(BaseModel): @task(name="setup_decision_settings") def setup_decision_settings( - alcf_recon: bool, - alcf_forge_recon_segment: bool, - nersc_recon: bool, - nersc_recon_multinode: bool, - new_file_832: bool + alcf_recon: bool = False, + alcf_forge_recon_segment: bool = False, + nersc_recon: bool = False, + nersc_recon_multinode: bool = False, + new_file_832: bool = True ) -> dict: """ This task is used to define the settings for the decision making process of the BL832 beamline. From ccbfb45ce33a99f60e68bb9af7c98ba9a3950772 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 10 Feb 2026 10:28:36 -0800 Subject: [PATCH 3/5] Making multinode recon and multinode recon+seg on nersc separate options --- orchestration/flows/bl832/dispatcher.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py index 5945f23b..595a3a43 100644 --- a/orchestration/flows/bl832/dispatcher.py +++ b/orchestration/flows/bl832/dispatcher.py @@ -32,8 +32,11 @@ class FlowParameterMapper: "nersc_recon_multinode_flow/nersc_recon_multinode_flow": [ "file_path", "num_nodes", - "config" - ] + "config"], + "nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow": [ + "file_path", + "num_nodes", + "config"] } @classmethod @@ -68,6 +71,7 @@ def setup_decision_settings( alcf_forge_recon_segment: bool = False, nersc_recon: bool = False, nersc_recon_multinode: bool = False, + nersc_forge_recon_segment: bool = False, new_file_832: bool = True ) -> dict: """ @@ -86,6 +90,7 @@ def setup_decision_settings( f"alcf_forge_recon_segment={alcf_forge_recon_segment}, " f"nersc_recon={nersc_recon}, " f"nersc_recon_multinode={nersc_recon_multinode}, " + f"nersc_forge_recon_segment={nersc_forge_recon_segment}, " f"new_file_832={new_file_832}") # Define which flows to run based on the input settings settings = { @@ -93,6 +98,7 @@ def setup_decision_settings( "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow": alcf_forge_recon_segment, "nersc_recon_flow/nersc_recon_flow": nersc_recon, "nersc_recon_multinode_flow/nersc_recon_multinode_flow": nersc_recon_multinode, + "nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow": nersc_forge_recon_segment, "new_832_file_flow/new_file_832": new_file_832 } # Save the settings in a JSON block for later retrieval by other flows @@ -182,6 +188,11 @@ async def dispatcher( "nersc_recon_multinode_flow/nersc_recon_multinode_flow", available_params) tasks.append(run_recon_flow_async("nersc_recon_multinode_flow/nersc_recon_multinode_flow", nersc_multinode_params)) + if decision_settings.get("nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow"): + nersc_forge_recon_segment_params = FlowParameterMapper.get_flow_parameters( + "nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow", available_params) + tasks.append(run_recon_flow_async( + "nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow", nersc_forge_recon_segment_params)) # Run ALCF and NERSC flows in parallel, if any if tasks: try: @@ -207,6 +218,7 @@ async def dispatcher( alcf_forge_recon_segment=False, nersc_recon=True, nersc_recon_multinode=False, + nersc_forge_recon_segment=False, new_file_832=True ) # Run the main decision flow with the specified parameters From 63497377e670907bdfe0d1daea77c1ba772b5a66 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 20 Feb 2026 11:01:11 -0800 Subject: [PATCH 4/5] updating dispatcher for the multiseg flow at alcf --- orchestration/flows/bl832/dispatcher.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py index 595a3a43..08423bfe 100644 --- a/orchestration/flows/bl832/dispatcher.py +++ b/orchestration/flows/bl832/dispatcher.py @@ -20,6 +20,9 @@ class FlowParameterMapper: "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow": [ "file_path", "config"], + "alcf_forge_recon_multisegment_flow/alcf_forge_recon_multisegment_flow": [ + "file_path", + "config"], # From move.py "new_832_file_flow/new_file_832": [ "file_path", @@ -69,6 +72,7 @@ class DecisionFlowInputModel(BaseModel): def setup_decision_settings( alcf_recon: bool = False, alcf_forge_recon_segment: bool = False, + alcf_forge_recon_multisegment: bool = False, nersc_recon: bool = False, nersc_recon_multinode: bool = False, nersc_forge_recon_segment: bool = False, @@ -79,8 +83,10 @@ def setup_decision_settings( :param alcf_recon: Boolean indicating whether to run the ALCF reconstruction flow. :param alcf_forge_recon_segment: Boolean indicating whether to run the ALCF Forge reconstruction segment flow. + :param alcf_forge_recon_multisegment: Boolean indicating whether to run the ALCF Forge reconstruction multisegment flow. :param nersc_recon: Boolean indicating whether to run the NERSC reconstruction flow. :param nersc_recon_multinode: Boolean indicating whether to run the NERSC multinode reconstruction flow. + :param nersc_forge_recon_segment: Boolean indicating whether to run the NERSC Forge reconstruction segment flow. :param new_file_832: Boolean indicating whether to run the new 832 file processing flow. :return: A dictionary containing the settings for each flow. """ @@ -88,6 +94,7 @@ def setup_decision_settings( try: logger.info(f"Setting up decision settings: alcf_recon={alcf_recon}, " f"alcf_forge_recon_segment={alcf_forge_recon_segment}, " + f"alcf_forge_recon_multisegment={alcf_forge_recon_multisegment}, " f"nersc_recon={nersc_recon}, " f"nersc_recon_multinode={nersc_recon_multinode}, " f"nersc_forge_recon_segment={nersc_forge_recon_segment}, " @@ -96,6 +103,7 @@ def setup_decision_settings( settings = { "alcf_recon_flow/alcf_recon_flow": alcf_recon, "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow": alcf_forge_recon_segment, + "alcf_forge_recon_multisegment_flow/alcf_forge_recon_multisegment_flow": alcf_forge_recon_multisegment, "nersc_recon_flow/nersc_recon_flow": nersc_recon, "nersc_recon_multinode_flow/nersc_recon_multinode_flow": nersc_recon_multinode, "nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow": nersc_forge_recon_segment, @@ -179,6 +187,14 @@ async def dispatcher( ) tasks.append(run_recon_flow_async("alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow", alcf_forge_params)) + if decision_settings.get("alcf_forge_recon_multisegment_flow/alcf_forge_recon_multisegment_flow"): + alcf_forge_params = FlowParameterMapper.get_flow_parameters( + "alcf_forge_recon_multisegment_flow/alcf_forge_recon_multisegment_flow", + available_params + ) + tasks.append(run_recon_flow_async("alcf_forge_recon_multisegment_flow/alcf_forge_recon_multisegment_flow", + alcf_forge_params)) + if decision_settings.get("nersc_recon_flow/nersc_recon_flow"): nersc_params = FlowParameterMapper.get_flow_parameters("nersc_recon_flow/nersc_recon_flow", available_params) tasks.append(run_recon_flow_async("nersc_recon_flow/nersc_recon_flow", nersc_params)) @@ -216,6 +232,7 @@ async def dispatcher( setup_decision_settings( alcf_recon=True, alcf_forge_recon_segment=False, + alcf_forge_recon_multisegment=False, nersc_recon=True, nersc_recon_multinode=False, nersc_forge_recon_segment=False, From 146e48e8ca9e32a30aca3110eb2b8eef8c8155ef Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 20 Feb 2026 11:30:30 -0800 Subject: [PATCH 5/5] updating dispatcher for the multiseg flow at nersc --- orchestration/flows/bl832/dispatcher.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py index 08423bfe..07000cfe 100644 --- a/orchestration/flows/bl832/dispatcher.py +++ b/orchestration/flows/bl832/dispatcher.py @@ -37,6 +37,10 @@ class FlowParameterMapper: "num_nodes", "config"], "nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow": [ + "file_path", + "num_nodes", + "config"], + "nersc_forge_recon_multisegment_flow/nersc_forge_recon_multisegment_flow": [ "file_path", "num_nodes", "config"] @@ -76,6 +80,7 @@ def setup_decision_settings( nersc_recon: bool = False, nersc_recon_multinode: bool = False, nersc_forge_recon_segment: bool = False, + nersc_forge_recon_multisegment: bool = False, new_file_832: bool = True ) -> dict: """ @@ -98,6 +103,7 @@ def setup_decision_settings( f"nersc_recon={nersc_recon}, " f"nersc_recon_multinode={nersc_recon_multinode}, " f"nersc_forge_recon_segment={nersc_forge_recon_segment}, " + f"nersc_forge_recon_multisegment={nersc_forge_recon_multisegment}, " f"new_file_832={new_file_832}") # Define which flows to run based on the input settings settings = { @@ -107,6 +113,7 @@ def setup_decision_settings( "nersc_recon_flow/nersc_recon_flow": nersc_recon, "nersc_recon_multinode_flow/nersc_recon_multinode_flow": nersc_recon_multinode, "nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow": nersc_forge_recon_segment, + "nersc_forge_recon_multisegment_flow/nersc_forge_recon_multisegment_flow": nersc_forge_recon_multisegment, "new_832_file_flow/new_file_832": new_file_832 } # Save the settings in a JSON block for later retrieval by other flows @@ -209,6 +216,13 @@ async def dispatcher( "nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow", available_params) tasks.append(run_recon_flow_async( "nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow", nersc_forge_recon_segment_params)) + + if decision_settings.get("nersc_forge_recon_multisegment_flow/nersc_forge_recon_multisegment_flow"): + nersc_forge_recon_multisegment_params = FlowParameterMapper.get_flow_parameters( + "nersc_forge_recon_multisegment_flow/nersc_forge_recon_multisegment_flow", available_params) + tasks.append(run_recon_flow_async( + "nersc_forge_recon_multisegment_flow/nersc_forge_recon_multisegment_flow", nersc_forge_recon_multisegment_params)) + # Run ALCF and NERSC flows in parallel, if any if tasks: try: @@ -236,6 +250,7 @@ async def dispatcher( nersc_recon=True, nersc_recon_multinode=False, nersc_forge_recon_segment=False, + nersc_forge_recon_multisegment=False, new_file_832=True ) # Run the main decision flow with the specified parameters