From 8effd584032752ec16f27303516e5c6ff5934568 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 22 Sep 2025 18:20:37 +0100 Subject: [PATCH 01/11] Moved 'TiffInfo' Pydantic model to above function where it's called --- src/murfey/server/api/clem.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index 8e749d032..f4f2060e3 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -204,13 +204,6 @@ class LifInfo(BaseModel): child_stacks: list[Path] = [] -class TiffInfo(BaseModel): - tiff_file: Path - associated_metadata: Optional[Path] = None - associated_series: Optional[str] = None - associated_stack: Optional[Path] = None - - @router.post("/sessions/{session_id}/clem/lif_files") def register_lif_file( lif_file: LifInfo, @@ -314,6 +307,13 @@ def register_lif_file( return True +class TiffInfo(BaseModel): + tiff_file: Path + associated_metadata: Optional[Path] = None + associated_series: Optional[str] = None + associated_stack: Optional[Path] = None + + @router.post("/sessions/{session_id}/clem/tiff_files") def register_tiff_file( tiff_file: TiffInfo, From 6c4e9d21de5b2235c5db4995e6296aa4f3322cfa Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 22 Sep 2025 18:21:35 +0100 Subject: [PATCH 02/11] Removed use of 'quote' around CLEM files, since the file is no longer used in the URL path --- src/murfey/client/contexts/clem.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/murfey/client/contexts/clem.py b/src/murfey/client/contexts/clem.py index c422337e9..fdda5cda0 100644 --- a/src/murfey/client/contexts/clem.py +++ b/src/murfey/client/contexts/clem.py @@ -6,7 +6,6 @@ import logging from pathlib import Path from typing import Dict, Generator, List, Optional -from urllib.parse import quote from xml.etree import ElementTree as ET from defusedxml.ElementTree import parse @@ -361,7 +360,7 @@ def register_lif_file( 
function_name="register_lif_file", token=self._token, session_id=environment.murfey_session, - data={"lif_file": quote(str(lif_file), safe="")}, + data={"lif_file": str(lif_file)}, ) return True except Exception as e: @@ -387,7 +386,7 @@ def process_lif_file( function_name="process_raw_lifs", token=self._token, session_id=environment.murfey_session, - data={"lif_file": quote(str(lif_file), safe="")}, + data={"lif_file": str(lif_file)}, ) return True except Exception as e: @@ -411,7 +410,7 @@ def register_tiff_file( function_name="register_tiff_file", token=self._token, session_id=environment.murfey_session, - data={"tiff_file": quote(str(tiff_file), safe="")}, + data={"tiff_file": str(tiff_file)}, ) return True except Exception as e: From cbf056e13ed0d227c63e7241452cb6ace30823e9 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 23 Sep 2025 18:38:24 +0100 Subject: [PATCH 03/11] Updated CLEM preprocessing result registration workflow to parse new messsage formats --- .../clem/register_preprocessing_results.py | 183 ++++++++---------- 1 file changed, 78 insertions(+), 105 deletions(-) diff --git a/src/murfey/workflows/clem/register_preprocessing_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py index 738d36bce..0d6348c74 100644 --- a/src/murfey/workflows/clem/register_preprocessing_results.py +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -10,10 +10,10 @@ import json import logging import traceback -from ast import literal_eval from pathlib import Path +from typing import Literal -from pydantic import BaseModel, field_validator +from pydantic import BaseModel from sqlmodel import Session, select from murfey.server import _transport_object @@ -35,12 +35,21 @@ class LIFPreprocessingResult(BaseModel): - image_stack: Path - metadata: Path series_name: str - channel: str number_of_members: int + is_stack: bool + is_montage: bool + output_files: dict[ + Literal["gray", "red", "green", "blue", "cyan", "magenta", "yellow"], 
Path + ] + metadata: Path parent_lif: Path + pixels_x: int + pixels_y: int + units: str + pixel_size: float + resolution: float + extent: list[float] def register_lif_preprocessing_result( @@ -84,13 +93,6 @@ def register_lif_preprocessing_result( try: # Register items in database if not already present try: - clem_img_stk: CLEMImageStack = get_db_entry( - db=murfey_db, - table=CLEMImageStack, - session_id=session_id, - file_path=result.image_stack, - ) - clem_img_series: CLEMImageSeries = get_db_entry( db=murfey_db, table=CLEMImageSeries, @@ -112,14 +114,24 @@ def register_lif_preprocessing_result( file_path=result.parent_lif, ) - # Link tables to one another and populate fields - clem_img_stk.associated_metadata = clem_metadata - clem_img_stk.parent_lif = clem_lif_file - clem_img_stk.parent_series = clem_img_series - clem_img_stk.channel_name = result.channel - murfey_db.add(clem_img_stk) - murfey_db.commit() + # Iterate through image stacks and start populating them first + for channel, output_file in result.output_files.items(): + clem_img_stk: CLEMImageStack = get_db_entry( + db=murfey_db, + table=CLEMImageStack, + session_id=session_id, + file_path=output_file, + ) + + # Link tables to one another and populate fields + clem_img_stk.associated_metadata = clem_metadata + clem_img_stk.parent_lif = clem_lif_file + clem_img_stk.parent_series = clem_img_series + clem_img_stk.channel_name = channel + murfey_db.add(clem_img_stk) + murfey_db.commit() + # Link other tables together clem_img_series.associated_metadata = clem_metadata clem_img_series.parent_lif = clem_lif_file clem_img_series.number_of_members = result.number_of_members @@ -132,30 +144,18 @@ def register_lif_preprocessing_result( logger.info( f"LIF preprocessing results registered for {result.series_name!r} " - f"{result.channel!r} image stack" ) except Exception: logger.error( "Exception encountered when registering LIF preprocessing result for " - f"{result.series_name!r} {result.channel!r} image stack: 
\n" + f"{result.series_name!r}: \n" f"{traceback.format_exc()}" ) return False - # Load all image stacks associated with current series from database + # Load instrument name try: - image_stacks = [ - Path(row) - for row in murfey_db.exec( - select(CLEMImageStack.file_path).where( - CLEMImageStack.series_id == clem_img_series.id - ) - ).all() - ] - logger.debug( - f"Found the following images: {[str(file) for file in image_stacks]}" - ) instrument_name = ( murfey_db.exec( select(MurfeySession).where(MurfeySession.id == session_id) @@ -170,20 +170,12 @@ def register_lif_preprocessing_result( ) return False - # Check if all image stacks for this series are accounted for - if not len(image_stacks) == clem_img_series.number_of_members: - logger.info( - f"Members of the series {result.series_name!r} are still missing; " - "the next stage of processing will not be triggered yet" - ) - return True - # Request for next stage of processing if all members are present cluster_response = submit_cluster_request( session_id=session_id, instrument_name=instrument_name, series_name=result.series_name, - images=image_stacks, + images=list(result.output_files.values()), metadata=result.metadata, crop_to_n_frames=processing_params.crop_to_n_frames, align_self=processing_params.align_self, @@ -208,26 +200,23 @@ def register_lif_preprocessing_result( class TIFFPreprocessingResult(BaseModel): - image_stack: Path - metadata: Path series_name: str - channel: str number_of_members: int - parent_tiffs: list[Path] - - @field_validator("parent_tiffs", mode="before") - @classmethod - def parse_stringified_list(cls, value): - if isinstance(value, str): - try: - eval_result = literal_eval(value) - if isinstance(eval_result, list): - parent_tiffs = [Path(p) for p in eval_result] - return parent_tiffs - except (SyntaxError, ValueError): - raise ValueError("Unable to parse input") - # Return value as-is; if it fails, it fails - return value + is_stack: bool + is_montage: bool + output_files: dict[ 
+ Literal["gray", "red", "green", "blue", "cyan", "magenta", "yellow"], Path + ] + metadata: Path + parent_tiffs: dict[ + Literal["gray", "red", "green", "blue", "cyan", "magenta", "yellow"], list[Path] + ] + pixels_x: int + pixels_y: int + units: str + pixel_size: float + resolution: float + extent: list[float] def register_tiff_preprocessing_result( @@ -261,12 +250,6 @@ def register_tiff_preprocessing_result( try: # Register items in database if not already present try: - clem_img_stk: CLEMImageStack = get_db_entry( - db=murfey_db, - table=CLEMImageStack, - session_id=session_id, - file_path=result.image_stack, - ) clem_img_series: CLEMImageSeries = get_db_entry( db=murfey_db, table=CLEMImageSeries, @@ -279,27 +262,37 @@ def register_tiff_preprocessing_result( session_id=session_id, file_path=result.metadata, ) - - # Link tables to one another and populate fields - # Register TIFF files and populate them iteratively first - for file in result.parent_tiffs: - clem_tiff_file: CLEMTIFFFile = get_db_entry( + # Iteratively register the output image stacks + for channel, output_file in result.output_files.items(): + clem_img_stk: CLEMImageStack = get_db_entry( db=murfey_db, - table=CLEMTIFFFile, + table=CLEMImageStack, session_id=session_id, - file_path=file, + file_path=output_file, ) - clem_tiff_file.associated_metadata = clem_metadata - clem_tiff_file.child_series = clem_img_series - clem_tiff_file.child_stack = clem_img_stk - murfey_db.add(clem_tiff_file) + + # Link associated metadata + clem_img_stk.associated_metadata = clem_metadata + clem_img_stk.parent_series = clem_img_series + clem_img_stk.channel_name = channel + murfey_db.add(clem_img_stk) murfey_db.commit() - clem_img_stk.associated_metadata = clem_metadata - clem_img_stk.parent_series = clem_img_series - clem_img_stk.channel_name = result.channel - murfey_db.add(clem_img_stk) - murfey_db.commit() + # Register parent TIFF files iteratively for each channel + for file in result.parent_tiffs[channel]: + 
clem_tiff_file: CLEMTIFFFile = get_db_entry( + db=murfey_db, + table=CLEMTIFFFile, + session_id=session_id, + file_path=file, + ) + + # Link associated metadata + clem_tiff_file.associated_metadata = clem_metadata + clem_tiff_file.child_series = clem_img_series + clem_tiff_file.child_stack = clem_img_stk + murfey_db.add(clem_tiff_file) + murfey_db.commit() clem_img_series.associated_metadata = clem_metadata clem_img_series.number_of_members = result.number_of_members @@ -308,30 +301,18 @@ def register_tiff_preprocessing_result( logger.info( f"TIFF preprocessing results registered for {result.series_name!r} " - f"{result.channel!r} image stack" ) except Exception: logger.error( "Exception encountered when registering TIFF preprocessing result for " - f"{result.series_name!r} {result.channel!r} image stack: \n" + f"{result.series_name!r}: \n" f"{traceback.format_exc()}" ) return False - # Load all image stacks associated with current series from database + # Load instrument name try: - image_stacks = [ - Path(row) - for row in murfey_db.exec( - select(CLEMImageStack.file_path).where( - CLEMImageStack.series_id == clem_img_series.id - ) - ).all() - ] - logger.debug( - f"Found the following images: {[str(file) for file in image_stacks]}" - ) instrument_name = ( murfey_db.exec( select(MurfeySession).where(MurfeySession.id == session_id) @@ -346,20 +327,12 @@ def register_tiff_preprocessing_result( ) return False - # Check if all image stacks for this series are accounted for - if not len(image_stacks) == clem_img_series.number_of_members: - logger.info( - f"Members of the series {result.series_name!r} are still missing; " - "the next stage of processing will not be triggered yet" - ) - return True - # Request for next stage of processing if all members are present cluster_response = submit_cluster_request( session_id=session_id, instrument_name=instrument_name, series_name=result.series_name, - images=image_stacks, + images=list(result.output_files.values()), 
metadata=result.metadata, crop_to_n_frames=processing_params.crop_to_n_frames, align_self=processing_params.align_self, From 9c1ace966e3aac1ca009ba12becaaf4430cea4e8 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 24 Sep 2025 17:49:55 +0100 Subject: [PATCH 04/11] Added logic to request for both bright field-fluorescent and fluorescent-only images to be created --- .../clem/register_preprocessing_results.py | 55 ++++++++++++------- 1 file changed, 36 insertions(+), 19 deletions(-) diff --git a/src/murfey/workflows/clem/register_preprocessing_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py index 0d6348c74..fc497f053 100644 --- a/src/murfey/workflows/clem/register_preprocessing_results.py +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -327,27 +327,44 @@ def register_tiff_preprocessing_result( ) return False - # Request for next stage of processing if all members are present - cluster_response = submit_cluster_request( - session_id=session_id, - instrument_name=instrument_name, - series_name=result.series_name, - images=list(result.output_files.values()), - metadata=result.metadata, - crop_to_n_frames=processing_params.crop_to_n_frames, - align_self=processing_params.align_self, - flatten=processing_params.flatten, - align_across=processing_params.align_across, - messenger=_transport_object, - ) - if cluster_response is False: - logger.error( - "Error requesting align-and-merge processing job for " - f"{result.series_name!r} series" + # Construct list of files to use for image alignment and merging steps + image_combos_to_process = [ + list(result.output_files.values()) # Composite image of all channels + ] + # Create additional job for fluorescent-only composite image if fluorescent channels are present + if ("gray" in result.output_files.keys()) and len(result.output_files) > 1: + image_combos_to_process.append( + [ + file + for channel, file in result.output_files.items() + if channel != "gray" + ] ) - return 
False + + # Request for image alignment and processing for the requested combinations + for image_combo in image_combos_to_process: + try: + submit_cluster_request( + session_id=session_id, + instrument_name=instrument_name, + series_name=result.series_name, + images=image_combo, + metadata=result.metadata, + crop_to_n_frames=processing_params.crop_to_n_frames, + align_self=processing_params.align_self, + flatten=processing_params.flatten, + align_across=processing_params.align_across, + messenger=_transport_object, + ) + except Exception: + logger.error( + "Error requesting image alignment and merging job for " + f"{result.series_name!r} series", + exc_info=True, + ) + return False logger.info( - "Successfully requested align-and-merge processing job for " + "Successfully requested image alignment and merging job for " f"{result.series_name!r} series" ) return True From 6d23984ed9a652cb1dafd8d49e9042ec607a423e Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 25 Sep 2025 17:09:17 +0100 Subject: [PATCH 05/11] Collapsed the LIF and TIFF preprocessing result registration functions into one single function that accepts messages from both workflows --- pyproject.toml | 3 +- .../clem/register_preprocessing_results.py | 231 ++++-------------- 2 files changed, 42 insertions(+), 192 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5dc664769..f656eca6d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -104,8 +104,7 @@ GitHub = "https://github.com/DiamondLightSource/python-murfey" "clem.process_raw_lifs" = "murfey.workflows.clem.process_raw_lifs:zocalo_cluster_request" "clem.process_raw_tiffs" = "murfey.workflows.clem.process_raw_tiffs:zocalo_cluster_request" "clem.register_align_and_merge_result" = "murfey.workflows.clem.register_align_and_merge_results:register_align_and_merge_result" -"clem.register_lif_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_lif_preprocessing_result" 
-"clem.register_tiff_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_tiff_preprocessing_result" +"clem.register_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:run" "pato" = "murfey.workflows.notifications:notification_setup" "picked_particles" = "murfey.workflows.spa.picking:particles_picked" "spa.flush_spa_preprocess" = "murfey.workflows.spa.flush_spa_preprocess:flush_spa_preprocess" diff --git a/src/murfey/workflows/clem/register_preprocessing_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py index fc497f053..5e4dd3b82 100644 --- a/src/murfey/workflows/clem/register_preprocessing_results.py +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -11,7 +11,7 @@ import logging import traceback from pathlib import Path -from typing import Literal +from typing import Literal, Optional from pydantic import BaseModel from sqlmodel import Session, select @@ -34,7 +34,7 @@ logger = logging.getLogger("murfey.workflows.clem.register_preprocessing_results") -class LIFPreprocessingResult(BaseModel): +class CLEMPreprocessingResult(BaseModel): series_name: str number_of_members: int is_stack: bool @@ -43,7 +43,10 @@ class LIFPreprocessingResult(BaseModel): Literal["gray", "red", "green", "blue", "cyan", "magenta", "yellow"], Path ] metadata: Path - parent_lif: Path + parent_lif: Optional[Path] = None + parent_tiffs: dict[ + Literal["gray", "red", "green", "blue", "cyan", "magenta", "yellow"], list[Path] + ] = {} pixels_x: int pixels_y: int units: str @@ -52,39 +55,27 @@ class LIFPreprocessingResult(BaseModel): extent: list[float] -def register_lif_preprocessing_result( - message: dict, murfey_db: Session, demo: bool = False -) -> bool: - """ - session_id (recipe) - register (wrapper) - result (wrapper) - key1 - key2 - ... 
- """ +def run(message: dict, murfey_db: Session, demo: bool = False) -> bool: session_id: int = ( int(message["session_id"]) if not isinstance(message["session_id"], int) else message["session_id"] ) - - # Validate message and try and load results try: if isinstance(message["result"], str): json_obj: dict = json.loads(message["result"]) - result = LIFPreprocessingResult(**json_obj) + result = CLEMPreprocessingResult(**json_obj) elif isinstance(message["result"], dict): - result = LIFPreprocessingResult(**message["result"]) + result = CLEMPreprocessingResult(**message["result"]) else: logger.error( - f"Invalid type for LIF preprocessing result: {type(message['result'])}" + f"Invalid type for TIFF preprocessing result: {type(message['result'])}" ) return False except Exception: logger.error( - "Exception encountered when parsing LIF preprocessing result: \n" + "Exception encountered when parsing TIFF preprocessing result: \n" f"{traceback.format_exc()}" ) return False @@ -99,169 +90,29 @@ def register_lif_preprocessing_result( session_id=session_id, series_name=result.series_name, ) - clem_metadata: CLEMImageMetadata = get_db_entry( db=murfey_db, table=CLEMImageMetadata, session_id=session_id, file_path=result.metadata, ) - - clem_lif_file: CLEMLIFFile = get_db_entry( - db=murfey_db, - table=CLEMLIFFile, - session_id=session_id, - file_path=result.parent_lif, - ) - - # Iterate through image stacks and start populating them first - for channel, output_file in result.output_files.items(): - clem_img_stk: CLEMImageStack = get_db_entry( + # Register and link parent LIF file if present + if result.parent_lif is not None: + clem_lif_file: CLEMLIFFile = get_db_entry( db=murfey_db, - table=CLEMImageStack, + table=CLEMLIFFile, session_id=session_id, - file_path=output_file, + file_path=result.parent_lif, ) + clem_img_series.parent_lif = clem_lif_file + clem_metadata.parent_lif = clem_lif_file - # Link tables to one another and populate fields - 
clem_img_stk.associated_metadata = clem_metadata - clem_img_stk.parent_lif = clem_lif_file - clem_img_stk.parent_series = clem_img_series - clem_img_stk.channel_name = channel - murfey_db.add(clem_img_stk) - murfey_db.commit() - - # Link other tables together + # Link and commit series and metadata tables first clem_img_series.associated_metadata = clem_metadata - clem_img_series.parent_lif = clem_lif_file clem_img_series.number_of_members = result.number_of_members - murfey_db.add(clem_img_series) - murfey_db.commit() - - clem_metadata.parent_lif = clem_lif_file - murfey_db.add(clem_metadata) + murfey_db.add_all([clem_img_series, clem_metadata]) murfey_db.commit() - logger.info( - f"LIF preprocessing results registered for {result.series_name!r} " - ) - - except Exception: - logger.error( - "Exception encountered when registering LIF preprocessing result for " - f"{result.series_name!r}: \n" - f"{traceback.format_exc()}" - ) - return False - - # Load instrument name - try: - instrument_name = ( - murfey_db.exec( - select(MurfeySession).where(MurfeySession.id == session_id) - ) - .one() - .instrument_name - ) - except Exception: - logger.error( - f"Error requesting data from database for {result.series_name!r} series: \n" - f"{traceback.format_exc()}" - ) - return False - - # Request for next stage of processing if all members are present - cluster_response = submit_cluster_request( - session_id=session_id, - instrument_name=instrument_name, - series_name=result.series_name, - images=list(result.output_files.values()), - metadata=result.metadata, - crop_to_n_frames=processing_params.crop_to_n_frames, - align_self=processing_params.align_self, - flatten=processing_params.flatten, - align_across=processing_params.align_across, - messenger=_transport_object, - ) - if cluster_response is False: - logger.error( - "Error requesting align-and-merge processing job for " - f"{result.series_name!r} series" - ) - return False - logger.info( - "Successfully requested 
align-and-merge processing job for " - f"{result.series_name!r} series" - ) - return True - - finally: - murfey_db.close() - - -class TIFFPreprocessingResult(BaseModel): - series_name: str - number_of_members: int - is_stack: bool - is_montage: bool - output_files: dict[ - Literal["gray", "red", "green", "blue", "cyan", "magenta", "yellow"], Path - ] - metadata: Path - parent_tiffs: dict[ - Literal["gray", "red", "green", "blue", "cyan", "magenta", "yellow"], list[Path] - ] - pixels_x: int - pixels_y: int - units: str - pixel_size: float - resolution: float - extent: list[float] - - -def register_tiff_preprocessing_result( - message: dict, murfey_db: Session, demo: bool = False -) -> bool: - - session_id: int = ( - int(message["session_id"]) - if not isinstance(message["session_id"], int) - else message["session_id"] - ) - try: - if isinstance(message["result"], str): - json_obj: dict = json.loads(message["result"]) - result = TIFFPreprocessingResult(**json_obj) - elif isinstance(message["result"], dict): - result = TIFFPreprocessingResult(**message["result"]) - else: - logger.error( - f"Invalid type for TIFF preprocessing result: {type(message['result'])}" - ) - return False - except Exception: - logger.error( - "Exception encountered when parsing TIFF preprocessing result: \n" - f"{traceback.format_exc()}" - ) - return False - - # Outer try-finally block for tidying up database-related section of function - try: - # Register items in database if not already present - try: - clem_img_series: CLEMImageSeries = get_db_entry( - db=murfey_db, - table=CLEMImageSeries, - session_id=session_id, - series_name=result.series_name, - ) - clem_metadata: CLEMImageMetadata = get_db_entry( - db=murfey_db, - table=CLEMImageMetadata, - session_id=session_id, - file_path=result.metadata, - ) # Iteratively register the output image stacks for channel, output_file in result.output_files.items(): clem_img_stk: CLEMImageStack = get_db_entry( @@ -275,37 +126,37 @@ def 
register_tiff_preprocessing_result( clem_img_stk.associated_metadata = clem_metadata clem_img_stk.parent_series = clem_img_series clem_img_stk.channel_name = channel + if result.parent_lif is not None: + clem_img_stk.parent_lif = clem_lif_file murfey_db.add(clem_img_stk) murfey_db.commit() - # Register parent TIFF files iteratively for each channel - for file in result.parent_tiffs[channel]: - clem_tiff_file: CLEMTIFFFile = get_db_entry( - db=murfey_db, - table=CLEMTIFFFile, - session_id=session_id, - file_path=file, - ) - - # Link associated metadata - clem_tiff_file.associated_metadata = clem_metadata - clem_tiff_file.child_series = clem_img_series - clem_tiff_file.child_stack = clem_img_stk - murfey_db.add(clem_tiff_file) + # Register and link parent TIFF files if present + if result.parent_tiffs: + tiff_files_to_register = [] + for file in result.parent_tiffs[channel]: + clem_tiff_file: CLEMTIFFFile = get_db_entry( + db=murfey_db, + table=CLEMTIFFFile, + session_id=session_id, + file_path=file, + ) + + # Link associated metadata + clem_tiff_file.associated_metadata = clem_metadata + clem_tiff_file.child_series = clem_img_series + clem_tiff_file.child_stack = clem_img_stk + tiff_files_to_register.append(clem_tiff_file) + murfey_db.add_all(tiff_files_to_register) murfey_db.commit() - clem_img_series.associated_metadata = clem_metadata - clem_img_series.number_of_members = result.number_of_members - murfey_db.add(clem_img_series) - murfey_db.commit() - logger.info( - f"TIFF preprocessing results registered for {result.series_name!r} " + f"CLEM preprocessing results registered for {result.series_name!r} " ) except Exception: logger.error( - "Exception encountered when registering TIFF preprocessing result for " + "Exception encountered when registering CLEM preprocessing result for " f"{result.series_name!r}: \n" f"{traceback.format_exc()}" ) From ef7a976176765013bb89cd4629179eaeaf4f1e89 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 25 Sep 2025 17:14:33 
+0100 Subject: [PATCH 06/11] Updated Murfey's analyser and context handler logic to support CLEM TIFF datasets of any combination of tiles, channels, or image stacks * Adjusted file parsing logic to look for keywords indicating if the dataset has multiple tiles ('--Stage'), frames ('--Z'), and/or colour channels ('--C') * Take number of tiles, colour channels, and frames into account when calculating the expected number of files to collect --- src/murfey/client/analyser.py | 7 +-- src/murfey/client/contexts/clem.py | 91 ++++++++++++------------------ 2 files changed, 38 insertions(+), 60 deletions(-) diff --git a/src/murfey/client/analyser.py b/src/murfey/client/analyser.py index 86e5d384b..4ef16f859 100644 --- a/src/murfey/client/analyser.py +++ b/src/murfey/client/analyser.py @@ -129,10 +129,9 @@ def _find_context(self, file_path: Path) -> bool: self._context = CLEMContext("leica", self._basepath, self._token) return True # Look for TIFF files associated with CLEM workflow - # Leica's autosave mode seems to name the TIFFs in the format - # PostionXX--ZXX--CXX.tif - if all( - pattern in file_path.name for pattern in ("--Z", "--C") + # CLEM TIFF files will have "--Stage", "--Z", and/or "--C" in their file stem + if any( + pattern in file_path.stem for pattern in ("--Stage", "--Z", "--C") ) and file_path.suffix in (".tiff", ".tif"): self._context = CLEMContext("leica", self._basepath, self._token) return True diff --git a/src/murfey/client/contexts/clem.py b/src/murfey/client/contexts/clem.py index fdda5cda0..72f61277f 100644 --- a/src/murfey/client/contexts/clem.py +++ b/src/murfey/client/contexts/clem.py @@ -145,58 +145,40 @@ def post_transfer( # Process TIF/TIFF files if transferred_file.suffix in (".tif", ".tiff"): - - # Files should be named "PositionX--ZXX--CXX.tif" by default - # If Position is repeated, it will add an additional --00X to the end - if len(transferred_file.stem.split("--")) not in [3, 4]: + # CLEM TIFF files will have "--Stage", "--C", and/or 
"--Z" in their file stem + if not any( + pattern in transferred_file.stem + for pattern in ("--Stage", "--Z", "--C") + ): logger.warning( f"File {transferred_file.name!r} is likely not part of the CLEM workflow" ) return False - logger.debug( f"File {transferred_file.name!r} is part of a TIFF image series" ) # Create a unique name for the series - # For standard file name - if len(transferred_file.stem.split("--")) == 3: - series_name = "--".join( - [ - *destination_file.parent.parts[ - -2: - ], # Upper 2 parent directories - destination_file.stem.split("--")[0], - ] - ) - # When this a repeated position - elif len(transferred_file.stem.split("--")) == 4: - series_name = "--".join( - [ - *destination_file.parent.parts[ - -2: - ], # Upper 2 parent directories - "--".join( - destination_file.stem.split("--")[i] for i in [0, -1] - ), - ] - ) - else: - logger.error( - f"Series name could not be generated from file {transferred_file.name!r}" - ) - return False - logger.debug( - f"File {transferred_file.name!r} given the series name {series_name!r}" + series_name = "--".join( + [ + *destination_file.parent.parts[ + -2: + ], # Upper 2 parent directories + destination_file.stem.split("--")[0].replace(" ", "_"), + ] ) # Create key-value pairs containing empty list if not already present if series_name not in self._tiff_series.keys(): self._tiff_series[series_name] = [] + logger.debug( + f"Created new dictionary entry for TIFF series {series_name!r}" + ) + # Append information to list self._tiff_series[series_name].append(str(destination_file)) logger.debug( - f"Created TIFF file dictionary entries for {series_name!r}" + f"File {transferred_file.name!r} added to series {series_name!r}" ) # Register the TIFF file in the database @@ -230,32 +212,30 @@ def post_transfer( # XLIF files don't have the "--ZXX--CXX" additions in the file name # But they have "/Metadata/" as the immediate parent series_name = "--".join( - [*destination_file.parent.parent.parts[-2:], 
destination_file.stem] + [ + *destination_file.parent.parent.parts[-2:], + destination_file.stem.replace(" ", "_"), + ] ) # The previous 2 parent directories should be unique enough - logger.debug( - f"File {transferred_file.name!r} given the series name {series_name!r}" - ) # Extract metadata to get the expected size of the series metadata = parse(transferred_file).getroot() metadata = _get_image_elements(metadata)[0] # Get channel and dimension information - channels = metadata.findall( - "Data/Image/ImageDescription/Channels/ChannelDescription" - ) - dimensions = metadata.findall( - "Data/Image/ImageDescription/Dimensions/DimensionDescription" - ) + channels = metadata.findall(".//ChannelDescription") + dimensions = metadata.findall(".//DimensionDescription") # Calculate expected number of files for this series num_channels = len(channels) - num_frames = ( - int(dimensions[2].attrib["NumberOfElements"]) - if len(dimensions) > 2 - else 1 - ) - num_files = num_channels * num_frames + num_frames = 1 + num_tiles = 1 + for dim in dimensions: + if dim.attrib["DimID"] == "3": + num_frames = int(dim.attrib["NumberOfElements"]) + if dim.attrib["DimID"] == "10": + num_tiles = int(dim.attrib["NumberOfElements"]) + num_files = num_channels * num_frames * num_tiles logger.debug( f"Expected number of files in {series_name!r}: {num_files}" ) @@ -263,10 +243,9 @@ def post_transfer( # Update dictionary entries self._files_in_series[series_name] = num_files self._series_metadata[series_name] = str(destination_file) - logger.debug(f"Created dictionary entries for {series_name!r} metadata") - - # A new copy of the metadata file is created in 'processed', so no need - # to register this instance of it + logger.debug( + f"File {transferred_file.name!r} added to series {series_name!r}" + ) # Post message if all files for the associated series have been collected # .get(series_name, 0) returns 0 if no associated key is found @@ -280,7 +259,7 @@ def post_transfer( return True elif len( 
self._tiff_series.get(series_name, []) - ) == self._files_in_series.get(series_name, 0): + ) >= self._files_in_series.get(series_name, 0): logger.debug( f"Collected expected number of TIFF files for series {series_name!r}; posting job to server" ) From 073bc72026457237d250932c21e28aae00198e4c Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 25 Sep 2025 19:04:40 +0100 Subject: [PATCH 07/11] Renamed target recipes for CLEM workflows --- src/murfey/workflows/clem/process_raw_lifs.py | 2 +- src/murfey/workflows/clem/process_raw_tiffs.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/murfey/workflows/clem/process_raw_lifs.py b/src/murfey/workflows/clem/process_raw_lifs.py index 22f50c5c0..9d2dd4937 100644 --- a/src/murfey/workflows/clem/process_raw_lifs.py +++ b/src/murfey/workflows/clem/process_raw_lifs.py @@ -48,7 +48,7 @@ def zocalo_cluster_request( # Construct recipe and submit it for processing recipe = { - "recipes": ["clem-lif-to-stack"], + "recipes": ["clem-process-raw-lifs"], "parameters": { # Job parameters "lif_file": f"{str(file)}", diff --git a/src/murfey/workflows/clem/process_raw_tiffs.py b/src/murfey/workflows/clem/process_raw_tiffs.py index 2c0c1a5b3..e8f84a966 100644 --- a/src/murfey/workflows/clem/process_raw_tiffs.py +++ b/src/murfey/workflows/clem/process_raw_tiffs.py @@ -55,7 +55,7 @@ def zocalo_cluster_request( # Construct recipe and submit it for processing recipe = { - "recipes": ["clem-tiff-to-stack"], + "recipes": ["clem-process-raw-tiffs"], "parameters": { # Job parameters "tiff_list": "null", From 52b5a52ee4dc873d99386445c49ee5251ce82176 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 25 Sep 2025 22:00:22 +0100 Subject: [PATCH 08/11] Provide a seed TIFF file for the preprocessing workflow instead of the entire file list, as some datasets are too large to be submitted --- src/murfey/client/contexts/clem.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/murfey/client/contexts/clem.py 
b/src/murfey/client/contexts/clem.py index 72f61277f..439e51b66 100644 --- a/src/murfey/client/contexts/clem.py +++ b/src/murfey/client/contexts/clem.py @@ -267,7 +267,7 @@ def post_transfer( # Post the message and log any errors that arise tiff_dataset = { "series_name": series_name, - "tiff_files": self._tiff_series[series_name], + "tiff_files": self._tiff_series[series_name][0:1], "series_metadata": self._series_metadata[series_name], } post_result = self.process_tiff_series(tiff_dataset, environment) From 6bbeec20021d0d0bda364a5e8e2d1218f93d42d5 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Fri, 26 Sep 2025 10:07:03 +0100 Subject: [PATCH 09/11] Fixed broken tests due to change in recipe name --- tests/workflows/clem/test_process_raw_lifs.py | 2 +- tests/workflows/clem/test_process_raw_tiffs.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/workflows/clem/test_process_raw_lifs.py b/tests/workflows/clem/test_process_raw_lifs.py index d6d7bdea8..3399a80df 100644 --- a/tests/workflows/clem/test_process_raw_lifs.py +++ b/tests/workflows/clem/test_process_raw_lifs.py @@ -55,7 +55,7 @@ def test_zocalo_cluster_request( ] ) sent_recipe = { - "recipes": ["clem-lif-to-stack"], + "recipes": ["clem-process-raw-lifs"], "parameters": { # Job parameters "lif_file": f"{str(lif_file)}", diff --git a/tests/workflows/clem/test_process_raw_tiffs.py b/tests/workflows/clem/test_process_raw_tiffs.py index c2972f478..f784a154f 100644 --- a/tests/workflows/clem/test_process_raw_tiffs.py +++ b/tests/workflows/clem/test_process_raw_tiffs.py @@ -80,7 +80,7 @@ def test_zocalo_cluster_request( ] ) sent_recipe = { - "recipes": ["clem-tiff-to-stack"], + "recipes": ["clem-process-raw-tiffs"], "parameters": { # Job parameters "tiff_list": "null", From 0a8b28ca8e21640903980347637b65a4c761b05d Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Fri, 26 Sep 2025 10:42:55 +0100 Subject: [PATCH 10/11] Use reference file from dataset to generate list of files to register 
--- .../clem/register_preprocessing_results.py | 32 ++++++++++++++++--- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/src/murfey/workflows/clem/register_preprocessing_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py index 5e4dd3b82..191512c1c 100644 --- a/src/murfey/workflows/clem/register_preprocessing_results.py +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -9,6 +9,7 @@ import json import logging +import re import traceback from pathlib import Path from typing import Literal, Optional @@ -114,7 +115,7 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> bool: murfey_db.commit() # Iteratively register the output image stacks - for channel, output_file in result.output_files.items(): + for c, (channel, output_file) in enumerate(result.output_files.items()): clem_img_stk: CLEMImageStack = get_db_entry( db=murfey_db, table=CLEMImageStack, @@ -133,8 +134,27 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> bool: # Register and link parent TIFF files if present if result.parent_tiffs: - tiff_files_to_register = [] - for file in result.parent_tiffs[channel]: + seed_file = result.parent_tiffs[channel][0] + if c == 0: + # Load list of files to register from seed file + series_identifier = seed_file.stem.split("--")[0] + "--" + tiff_list = list( + seed_file.parent.glob(f"{series_identifier}*") + ) + + # Load TIFF files by colour channel if "--C" in file stem + match = re.search(r"--C[\d]{2,3}", seed_file.stem) + tiff_file_subset = [ + file + for file in tiff_list + if file.stem.startswith(series_identifier) + and (match.group(0) in file.stem if match else True) + ] + tiff_file_subset.sort() + + # Register TIFF file subset + clem_tiff_files = [] + for file in tiff_file_subset: clem_tiff_file: CLEMTIFFFile = get_db_entry( db=murfey_db, table=CLEMTIFFFile, @@ -146,8 +166,10 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> bool:
clem_tiff_file.associated_metadata = clem_metadata clem_tiff_file.child_series = clem_img_series clem_tiff_file.child_stack = clem_img_stk - tiff_files_to_register.append(clem_tiff_file) - murfey_db.add_all(tiff_files_to_register) + + clem_tiff_files.append(clem_tiff_file) + + murfey_db.add_all(clem_tiff_files) murfey_db.commit() logger.info( From 5b1563370b93883574967d431f23204001f2194e Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Fri, 26 Sep 2025 10:43:54 +0100 Subject: [PATCH 11/11] Delete dictionary entries for series from analyser memory after successfully sending it off --- src/murfey/client/contexts/clem.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/murfey/client/contexts/clem.py b/src/murfey/client/contexts/clem.py index 439e51b66..0c3aa36b2 100644 --- a/src/murfey/client/contexts/clem.py +++ b/src/murfey/client/contexts/clem.py @@ -274,6 +274,11 @@ def post_transfer( if post_result is False: return False logger.info(f"Started preprocessing of TIFF series {series_name!r}") + + # Clean up memory after posting + del self._tiff_series[series_name] + del self._series_metadata[series_name] + del self._files_in_series[series_name] else: logger.debug(f"TIFF series {series_name!r} is still being processed")