From bb7c2c45fc3913ecdcc1b6e7c2c13ac70192806e Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 20 May 2025 13:48:48 +0100 Subject: [PATCH 01/10] Rearranged MachineConfiguration keys according to purpose and grouped them under comment blocks --- src/murfey/util/config.py | 81 ++++++++++++++++++++++++--------------- 1 file changed, 51 insertions(+), 30 deletions(-) diff --git a/src/murfey/util/config.py b/src/murfey/util/config.py index 95f3544be..67d11ce7f 100644 --- a/src/murfey/util/config.py +++ b/src/murfey/util/config.py @@ -12,37 +12,53 @@ class MachineConfig(BaseModel, extra=Extra.allow): # type: ignore - acquisition_software: List[str] - calibrations: Dict[str, Dict[str, Union[dict, float]]] - data_directories: List[Path] - rsync_basepath: Path - default_model: Path + """ + Keys that describe the type of workflow conducted on the client side, and how + Murfey will handle its data transfer and processing + """ + + # General info -------------------------------------------------------------------- display_name: str = "" instrument_name: str = "" image_path: Optional[Path] = None + machine_override: str = "" + + # Hardware and software ----------------------------------------------------------- + camera: str = "FALCON" + superres: bool = False + calibrations: Dict[str, Dict[str, Union[dict, float]]] + acquisition_software: List[str] software_versions: Dict[str, str] = {} - external_executables: Dict[str, str] = {} - external_executables_eer: Dict[str, str] = {} - external_environment: Dict[str, str] = {} - rsync_module: str = "" + software_settings_output_directories: Dict[str, List[str]] = {} + data_required_substrings: Dict[str, Dict[str, List[str]]] = {} + + # Client side directory setup ----------------------------------------------------- + data_directories: List[Path] create_directories: list[str] = ["atlas"] analyse_created_directories: List[str] = [] gain_reference_directory: Optional[Path] = None eer_fractionation_file_template: str = "" - processed_directory_name: str = "processed" - gain_directory_name: str = "processing" - node_creator_queue: str = "node_creator" - superres: bool = False - camera: str = "FALCON" - data_required_substrings: Dict[str, Dict[str, List[str]]] = {} - allow_removal: bool = False + + # Data transfer setup ------------------------------------------------------------- + # Rsync setup data_transfer_enabled: bool = True + rsync_url: str = "" + rsync_module: str = "" + rsync_basepath: Path + allow_removal: bool = False + + # Upstream data download setup + upstream_data_directories: List[Path] = [] # Previous sessions + upstream_data_download_directory: Optional[Path] = None # Set by microscope config + upstream_data_tiff_locations: List[str] = ["processed"] # Location of CLEM TIFFs + + # Data processing setup ----------------------------------------------------------- + # General processing setup processing_enabled: bool = True - machine_override: str = "" - processed_extra_directory: str = "" - plugin_packages: Dict[str, Path] = {} - software_settings_output_directories: Dict[str, List[str]] = {} process_by_default: bool = True + gain_directory_name: str = "processing" + processed_directory_name: str = "processed" + processed_extra_directory: str = "" recipes: Dict[str, str] = { "em-spa-bfactor": "em-spa-bfactor", "em-spa-class2d": "em-spa-class2d", @@ -53,22 +69,27 @@ class MachineConfig(BaseModel, extra=Extra.allow): # type: ignore "em-tomo-align": "em-tomo-align", } - # Find and download upstream directories - upstream_data_directories: List[Path] = [] # Previous sessions - upstream_data_download_directory: Optional[Path] = None # Set by microscope config - upstream_data_tiff_locations: List[str] = ["processed"] # Location of CLEM TIFFs - + # Particle picking setup + default_model: Path model_search_directory: str = "processing" initial_model_search_directory: str = "processing/initial_model" - failure_queue: str = "" - instrument_server_url: str = "http://localhost:8001" - frontend_url: str = "http://localhost:3000" - murfey_url: str = "http://localhost:8000" - rsync_url: str = "" + # Data analysis plugins + external_executables: Dict[str, str] = {} + external_executables_eer: Dict[str, str] = {} + external_environment: Dict[str, str] = {} + plugin_packages: Dict[str, Path] = {} + # Server and network setup -------------------------------------------------------- + # Configurations and URLs security_configuration_path: Optional[Path] = None + murfey_url: str = "http://localhost:8000" + frontend_url: str = "http://localhost:3000" + instrument_server_url: str = "http://localhost:8001" + # Messaging queues + failure_queue: str = "" + node_creator_queue: str = "node_creator" notifications_queue: str = "pato_notification" From 93cdec05eb3170d55c2fd6fa87847a9d10c426cf Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 20 May 2025 13:49:56 +0100 Subject: [PATCH 02/10] Switched to using built-in types instead of 'typing' module ones --- src/murfey/util/config.py | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/murfey/util/config.py b/src/murfey/util/config.py index 67d11ce7f..fb4c9a554 100644 --- a/src/murfey/util/config.py +++ b/src/murfey/util/config.py @@ -4,7 +4,7 @@ import socket from functools import lru_cache from pathlib import Path -from typing import Dict, List, Literal, Optional, Union +from typing import Literal, Optional, Union import yaml from backports.entry_points_selectable import entry_points @@ -26,16 +26,16 @@ class MachineConfig(BaseModel, extra=Extra.allow): # type: ignore # Hardware and software ----------------------------------------------------------- camera: str = "FALCON" superres: bool = False - calibrations: Dict[str, Dict[str, Union[dict, float]]] - acquisition_software: List[str] - software_versions: Dict[str, str] = {} - software_settings_output_directories: Dict[str, List[str]] = {} - data_required_substrings: Dict[str, Dict[str, List[str]]] = {} + calibrations: dict[str, dict[str, Union[dict, float]]] + acquisition_software: list[str] + software_versions: dict[str, str] = {} + software_settings_output_directories: dict[str, list[str]] = {} + data_required_substrings: dict[str, dict[str, list[str]]] = {} # Client side directory setup ----------------------------------------------------- - data_directories: List[Path] + data_directories: list[Path] create_directories: list[str] = ["atlas"] - analyse_created_directories: List[str] = [] + analyse_created_directories: list[str] = [] gain_reference_directory: Optional[Path] = None eer_fractionation_file_template: str = "" @@ -48,9 +48,9 @@ class MachineConfig(BaseModel, extra=Extra.allow): # type: ignore allow_removal: bool = False # Upstream data download setup - upstream_data_directories: List[Path] = [] # Previous sessions + upstream_data_directories: list[Path] = [] # Previous sessions upstream_data_download_directory: Optional[Path] = None # Set by microscope config - upstream_data_tiff_locations: List[str] = ["processed"] # Location of CLEM TIFFs + upstream_data_tiff_locations: list[str] = ["processed"] # Location of CLEM TIFFs # Data processing setup ----------------------------------------------------------- # General processing setup @@ -59,7 +59,7 @@ class MachineConfig(BaseModel, extra=Extra.allow): # type: ignore gain_directory_name: str = "processing" processed_directory_name: str = "processed" processed_extra_directory: str = "" - recipes: Dict[str, str] = { + recipes: dict[str, str] = { "em-spa-bfactor": "em-spa-bfactor", "em-spa-class2d": "em-spa-class2d", "em-spa-class3d": "em-spa-class3d", @@ -75,10 +75,10 @@ class MachineConfig(BaseModel, extra=Extra.allow): # type: ignore initial_model_search_directory: str = "processing/initial_model" # Data analysis plugins - external_executables: Dict[str, str] = {} - external_executables_eer: Dict[str, str] = {} - external_environment: Dict[str, str] = {} - plugin_packages: Dict[str, Path] = {} + external_executables: dict[str, str] = {} + external_executables_eer: dict[str, str] = {} + external_environment: dict[str, str] = {} + plugin_packages: dict[str, Path] = {} # Server and network setup -------------------------------------------------------- # Configurations and URLs @@ -93,7 +93,7 @@ class MachineConfig(BaseModel, extra=Extra.allow): # type: ignore notifications_queue: str = "pato_notification" -def from_file(config_file_path: Path, instrument: str = "") -> Dict[str, MachineConfig]: +def from_file(config_file_path: Path, instrument: str = "") -> dict[str, MachineConfig]: with open(config_file_path, "r") as config_stream: config = yaml.safe_load(config_stream) return { @@ -110,7 +110,7 @@ class Security(BaseModel): auth_algorithm: str = "" auth_url: str = "" sqlalchemy_pooling: bool = True - allow_origins: List[str] = ["*"] + allow_origins: list[str] = ["*"] session_validation: str = "" session_token_timeout: Optional[int] = None auth_type: Literal["password", "cookie"] = "password" @@ -179,7 +179,7 @@ def get_security_config() -> Security: @lru_cache(maxsize=1) -def get_machine_config(instrument_name: str = "") -> Dict[str, MachineConfig]: +def get_machine_config(instrument_name: str = "") -> dict[str, MachineConfig]: machine_config = { "": MachineConfig( acquisition_software=[], From b0b437882d2434ad42b0c56183795b9239fb939b Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 20 May 2025 13:57:32 +0100 Subject: [PATCH 03/10] Added a 'Config' inner class to 'MachineConfig' to control JSON serialisation behaviour; moved 'Extra.allow' into this inner class --- src/murfey/util/config.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/murfey/util/config.py b/src/murfey/util/config.py index fb4c9a554..47e75f77e 100644 --- a/src/murfey/util/config.py +++ b/src/murfey/util/config.py @@ -11,7 +11,7 @@ from pydantic import BaseModel, BaseSettings, Extra, validator -class MachineConfig(BaseModel, extra=Extra.allow): # type: ignore +class MachineConfig(BaseModel): # type: ignore """ Keys that describe the type of workflow conducted on the client side, and how Murfey will handle its data transfer and processing @@ -92,6 +92,16 @@ class MachineConfig(BaseModel, extra=Extra.allow): # type: ignore node_creator_queue: str = "node_creator" notifications_queue: str = "pato_notification" + class Config: + """ + Inner class that defines this model's parsing and serialising behaviour + """ + + extra = Extra.allow + json_encoders = { + Path: str, + } + def from_file(config_file_path: Path, instrument: str = "") -> dict[str, MachineConfig]: with open(config_file_path, "r") as config_stream: From 16acf3a15d1744da9ec8ade32566fe6ab99c5f7e Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 20 May 2025 14:01:57 +0100 Subject: [PATCH 04/10] Tidied up keys in 'Security' BaseModel --- src/murfey/util/config.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/murfey/util/config.py b/src/murfey/util/config.py index 47e75f77e..ff1a9936b 100644 --- a/src/murfey/util/config.py +++ b/src/murfey/util/config.py @@ -114,22 +114,31 @@ def from_file(config_file_path: Path, instrument: str = "") -> dict[str, Machine class Security(BaseModel): + # Murfey database settings murfey_db_credentials: Path crypto_key: str - auth_key: str = "" + sqlalchemy_pooling: bool = True + + # ISPyB settings + ispyb_credentials: Optional[Path] = None + + # Murfey server connection settings auth_algorithm: str = "" + auth_key: str = "" + auth_type: Literal["password", "cookie"] = "password" auth_url: str = "" - sqlalchemy_pooling: bool = True - allow_origins: list[str] = ["*"] + cookie_key: str = "" session_validation: str = "" session_token_timeout: Optional[int] = None - auth_type: Literal["password", "cookie"] = "password" - cookie_key: str = "" + allow_origins: list[str] = ["*"] + + # RabbitMQ settings rabbitmq_credentials: Path feedback_queue: str = "murfey_feedback" + + # Graylog settings graylog_host: str = "" graylog_port: Optional[int] = None - ispyb_credentials: Optional[Path] = None @validator("graylog_port") def check_port_present_if_host_is( From 5d7d2febaa8cda6a626545e36ebfd34de56db1c2 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 20 May 2025 14:09:06 +0100 Subject: [PATCH 05/10] Added 'Config' inner class with JSON serialisation settings to 'Security' model --- src/murfey/util/config.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/murfey/util/config.py b/src/murfey/util/config.py index ff1a9936b..31bb380fb 100644 --- a/src/murfey/util/config.py +++ b/src/murfey/util/config.py @@ -140,6 +140,11 @@ class Security(BaseModel): graylog_host: str = "" graylog_port: Optional[int] = None + class Config: + json_encoders = { + Path: str, + } + @validator("graylog_port") def check_port_present_if_host_is( cls, v: Optional[int], values: dict, **kwargs From 7b7e12b715489cd6c550ef5c85c8c2d9d197ceba Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 20 May 2025 18:18:57 +0100 Subject: [PATCH 06/10] Don't pass machine config through the 'MultigridWatcherSpec' model between backend and instrument servers; get machine config via request instead --- src/murfey/instrument_server/api.py | 17 +++++++++++------ src/murfey/server/api/instrument.py | 11 +---------- src/murfey/util/instrument_models.py | 3 --- 3 files changed, 12 insertions(+), 19 deletions(-) diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index e45d46a7c..2e7bbc228 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -141,8 +141,16 @@ def check_token(session_id: MurfeySessionID): def setup_multigrid_watcher( session_id: MurfeySessionID, watcher_spec: MultigridWatcherSpec ): + # Return True if controllers are already set up if controllers.get(session_id) is not None: return {"success": True} + + # Load machine config as dictionary + machine_config: dict[str, Any] = requests.get( + f"{_get_murfey_url()}/instruments/{sanitise_nonpath(watcher_spec.instrument_name)}/machine", + headers={"Authorization": f"Bearer {tokens[session_id]}"}, + ).json() + label = watcher_spec.label for sid, controller in controllers.items(): if controller.dormant: @@ -156,22 +164,19 @@ def setup_multigrid_watcher( demo=True, do_transfer=True, processing_enabled=not watcher_spec.skip_existing_processing, - _machine_config=watcher_spec.configuration.dict(), + _machine_config=machine_config, token=tokens.get(session_id, "token"), data_collection_parameters=data_collection_parameters.get(label, {}), rsync_restarts=watcher_spec.rsync_restarts, visit_end_time=watcher_spec.visit_end_time, ) watcher_spec.source.mkdir(exist_ok=True) - machine_config = requests.get( - f"{_get_murfey_url()}/instruments/{sanitise_nonpath(watcher_spec.instrument_name)}/machine", - headers={"Authorization": f"Bearer {tokens[session_id]}"}, - ).json() + for d in machine_config.get("create_directories", []): (watcher_spec.source / d).mkdir(exist_ok=True) watchers[session_id] = MultigridDirWatcher( watcher_spec.source, - watcher_spec.configuration.dict(), + machine_config, skip_existing_processing=watcher_spec.skip_existing_processing, ) watchers[session_id].subscribe( diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 3f60e35d9..9a0d6c54d 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -106,22 +106,13 @@ async def setup_multigrid_watcher( if machine_config.instrument_server_url: session = db.exec(select(Session).where(Session.id == session_id)).one() visit = session.visit - _config = { - "acquisition_software": machine_config.acquisition_software, - "calibrations": machine_config.calibrations, - "data_directories": [str(k) for k in machine_config.data_directories], - "create_directories": [str(k) for k in machine_config.create_directories], - "rsync_basepath": str(machine_config.rsync_basepath), - "visit": visit, - "default_model": str(machine_config.default_model), - } + async with aiohttp.ClientSession() as clientsession: async with clientsession.post( f"{machine_config.instrument_server_url}/sessions/{session_id}/multigrid_watcher", json={ "source": str(secure_path(watcher_spec.source / visit)), "visit": visit, - "configuration": _config, "label": visit, "instrument_name": instrument_name, "skip_existing_processing": watcher_spec.skip_existing_processing, diff --git a/src/murfey/util/instrument_models.py b/src/murfey/util/instrument_models.py index ee45b5468..4b2b1ff90 100644 --- a/src/murfey/util/instrument_models.py +++ b/src/murfey/util/instrument_models.py @@ -4,12 +4,9 @@ from pydantic import BaseModel -from murfey.util.config import MachineConfig - class MultigridWatcherSpec(BaseModel): source: Path - configuration: MachineConfig label: str visit: str instrument_name: str From f8b09b19bdf93a1dff26a65fadaf6ef5201d5818 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 21 May 2025 13:20:15 +0100 Subject: [PATCH 07/10] Updated logic for creating destination file path names for CLEM context --- src/murfey/client/contexts/clem.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/murfey/client/contexts/clem.py b/src/murfey/client/contexts/clem.py index 37d03f789..3238e0482 100644 --- a/src/murfey/client/contexts/clem.py +++ b/src/murfey/client/contexts/clem.py @@ -4,7 +4,6 @@ """ import logging -from datetime import datetime from pathlib import Path from typing import Dict, Generator, List, Optional from urllib.parse import quote @@ -31,13 +30,16 @@ def _file_transferred_to( instrument_name=environment.instrument_name, demo=environment.demo, ) - # rsync basepath and modules are set in the microscope's configuration YAML file - return ( - Path(machine_config.get("rsync_basepath", "")) - / str(datetime.now().year) - / source.name - / file_path.relative_to(source) + + # Construct destination path + base_destination = Path(machine_config.get("rsync_basepath", "")) / Path( + environment.default_destinations[source] ) + # Add the visit number to the path if it's not present in 'source' + if environment.visit not in environment.default_destinations[source]: + base_destination = base_destination / environment.visit + destination = base_destination / file_path.relative_to(source) + return destination def _get_source( From 3de74cdd39bd23d1ebb35f8621bba77aa6a8cebb Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 21 May 2025 13:23:24 +0100 Subject: [PATCH 08/10] Added INFO log to note down triggering of TIFF preprocessing --- src/murfey/client/contexts/clem.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/murfey/client/contexts/clem.py b/src/murfey/client/contexts/clem.py index 3238e0482..1b8ba77e2 100644 --- a/src/murfey/client/contexts/clem.py +++ b/src/murfey/client/contexts/clem.py @@ -293,7 +293,7 @@ def post_transfer( post_result = self.process_tiff_series(tiff_dataset, environment) if post_result is False: return False - + logger.info(f"Started preprocessing of TIFF series {series_name}") else: logger.debug(f"TIFF series {series_name!r} is still being processed") From f57a762e577f0a16575a21a573b0909fd08e332a Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 21 May 2025 13:25:15 +0100 Subject: [PATCH 09/10] Added debug log to keep track of message contents being sent to 'cryoem-services' for subsequent processing --- src/murfey/workflows/clem/process_raw_lifs.py | 41 ++++++++++-------- .../workflows/clem/process_raw_tiffs.py | 42 ++++++++++++------- 2 files changed, 50 insertions(+), 33 deletions(-) diff --git a/src/murfey/workflows/clem/process_raw_lifs.py b/src/murfey/workflows/clem/process_raw_lifs.py index 1d56bff68..22f50c5c0 100644 --- a/src/murfey/workflows/clem/process_raw_lifs.py +++ b/src/murfey/workflows/clem/process_raw_lifs.py @@ -3,6 +3,7 @@ The recipe referred to here is stored on GitLab. """ +from logging import getLogger from pathlib import Path from typing import Optional @@ -11,6 +12,8 @@ except AttributeError: pass # Ignore if ISPyB credentials environment variable not set +logger = getLogger("murfey.workflows.clem.process_raw_lifs") + def zocalo_cluster_request( file: Path, @@ -43,24 +46,28 @@ def zocalo_cluster_request( # Load machine config to get the feedback queue feedback_queue: str = messenger.feedback_queue - # Send the message - # The keys under "parameters" will populate all the matching fields in {} - # in the processing recipe - messenger.send( - "processing_recipe", - { - "recipes": ["clem-lif-to-stack"], - "parameters": { - # Job parameters - "lif_file": f"{str(file)}", - "root_folder": root_folder, - # Other recipe parameters - "session_dir": f"{str(session_dir)}", - "session_id": session_id, - "job_name": job_name, - "feedback_queue": feedback_queue, - }, + # Construct recipe and submit it for processing + recipe = { + "recipes": ["clem-lif-to-stack"], + "parameters": { + # Job parameters + "lif_file": f"{str(file)}", + "root_folder": root_folder, + # Other recipe parameters + "session_dir": f"{str(session_dir)}", + "session_id": session_id, + "job_name": job_name, + "feedback_queue": feedback_queue, }, + } + logger.debug( + f"Submitting LIF processing request to {messenger.feedback_queue!r} " + "with the following recipe: \n" + f"{recipe}" + ) + messenger.send( + queue="processing_recipe", + message=recipe, new_connection=True, ) else: diff --git a/src/murfey/workflows/clem/process_raw_tiffs.py b/src/murfey/workflows/clem/process_raw_tiffs.py index 52c371092..2c0c1a5b3 100644 --- a/src/murfey/workflows/clem/process_raw_tiffs.py +++ b/src/murfey/workflows/clem/process_raw_tiffs.py @@ -3,9 +3,12 @@ The recipe referred to here is stored on GitLab. """ +from logging import getLogger from pathlib import Path from typing import Optional +logger = getLogger("murfey.workflows.clem.process_raw_tiffs") + try: from murfey.server.ispyb import TransportManager # Session except AttributeError: @@ -50,23 +53,30 @@ def zocalo_cluster_request( # Load machine config to get the feedback queue feedback_queue: str = messenger.feedback_queue - messenger.send( - "processing_recipe", - { - "recipes": ["clem-tiff-to-stack"], - "parameters": { - # Job parameters - "tiff_list": "null", - "tiff_file": f"{str(tiff_list[0])}", - "root_folder": root_folder, - "metadata": f"{str(metadata)}", - # Other recipe parameters - "session_dir": f"{str(session_dir)}", - "session_id": session_id, - "job_name": job_name, - "feedback_queue": feedback_queue, - }, + # Construct recipe and submit it for processing + recipe = { + "recipes": ["clem-tiff-to-stack"], + "parameters": { + # Job parameters + "tiff_list": "null", + "tiff_file": f"{str(tiff_list[0])}", + "root_folder": root_folder, + "metadata": f"{str(metadata)}", + # Other recipe parameters + "session_dir": f"{str(session_dir)}", + "session_id": session_id, + "job_name": job_name, + "feedback_queue": feedback_queue, }, + } + logger.debug( + f"Submitting TIFF processing request to {messenger.feedback_queue!r} " + "with the following recipe: \n" + f"{recipe}" + ) + messenger.send( + queue="processing_recipe", + message=recipe, new_connection=True, ) else: From 004e6f3b7ac60a90680615e0d4e40028919b6a0c Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 21 May 2025 13:29:55 +0100 Subject: [PATCH 10/10] Fixed broken tests due to changing from passing args to kwargs --- tests/workflows/clem/test_process_raw_lifs.py | 4 ++-- tests/workflows/clem/test_process_raw_tiffs.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/workflows/clem/test_process_raw_lifs.py b/tests/workflows/clem/test_process_raw_lifs.py index 857b0979c..d6d7bdea8 100644 --- a/tests/workflows/clem/test_process_raw_lifs.py +++ b/tests/workflows/clem/test_process_raw_lifs.py @@ -70,7 +70,7 @@ def test_zocalo_cluster_request( # Check that it sends the expected recipe mock_transport.send.assert_called_once_with( - "processing_recipe", - sent_recipe, + queue="processing_recipe", + message=sent_recipe, new_connection=True, ) diff --git a/tests/workflows/clem/test_process_raw_tiffs.py b/tests/workflows/clem/test_process_raw_tiffs.py index 885aa69a6..c2972f478 100644 --- a/tests/workflows/clem/test_process_raw_tiffs.py +++ b/tests/workflows/clem/test_process_raw_tiffs.py @@ -97,7 +97,7 @@ def test_zocalo_cluster_request( # Check that it sends the expected recipe mock_transport.send.assert_called_once_with( - "processing_recipe", - sent_recipe, + queue="processing_recipe", + message=sent_recipe, new_connection=True, )