From 17aca936b5ec3034841663ed6c44a21826d8979a Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Fri, 9 May 2025 13:29:07 +0100 Subject: [PATCH 01/10] Add process column to Session table to record if processing is requested or not --- src/murfey/server/api/__init__.py | 11 +++++++++++ src/murfey/server/api/processing_parameters.py | 16 ++++++++++------ src/murfey/util/db.py | 1 + 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/src/murfey/server/api/__init__.py b/src/murfey/server/api/__init__.py index f63ca27aa..0c41d108e 100644 --- a/src/murfey/server/api/__init__.py +++ b/src/murfey/server/api/__init__.py @@ -1973,6 +1973,17 @@ def create_session(instrument_name: str, visit: str, name: str, db=murfey_db) -> return sid +@router.post("/sessions/{session_id}") +def update_session( + session_id: MurfeySessionID, process: bool = True, db=murfey_db +) -> None: + session = db.exec(select(Session).where(session_id == session_id)).one() + session.process = process + db.add(session) + db.commit() + return None + + @router.put("/sessions/{session_id}/current_gain_ref") def update_current_gain_ref( session_id: MurfeySessionID, new_gain_ref: CurrentGainRef, db=murfey_db diff --git a/src/murfey/server/api/processing_parameters.py b/src/murfey/server/api/processing_parameters.py index fa359c322..bf506d30f 100644 --- a/src/murfey/server/api/processing_parameters.py +++ b/src/murfey/server/api/processing_parameters.py @@ -1,6 +1,7 @@ from logging import getLogger from typing import Optional +import sqlalchemy from fastapi import APIRouter, Depends from pydantic import BaseModel from sqlmodel import Session, select @@ -24,12 +25,15 @@ class EditableSessionProcessingParameters(BaseModel): @router.get("/sessions/{session_id}/session_processing_parameters") def get_session_processing_parameters( session_id: MurfeySessionID, db: Session = murfey_db -) -> EditableSessionProcessingParameters: - proc_params = db.exec( - select(SessionProcessingParameters).where( - SessionProcessingParameters.session_id == session_id - ) - ).one() +) -> Optional[EditableSessionProcessingParameters]: + try: + proc_params = db.exec( + select(SessionProcessingParameters).where( + SessionProcessingParameters.session_id == session_id + ) + ).one() + except sqlalchemy.exc.NoResultFound: + return None return EditableSessionProcessingParameters( gain_ref=proc_params.gain_ref, dose_per_frame=proc_params.dose_per_frame, diff --git a/src/murfey/util/db.py b/src/murfey/util/db.py index c5c65b456..a83067ab5 100644 --- a/src/murfey/util/db.py +++ b/src/murfey/util/db.py @@ -48,6 +48,7 @@ class Session(SQLModel, table=True): # type: ignore started: bool = Field(default=False) current_gain_ref: str = Field(default="") instrument_name: str = Field(default="") + process: bool = Field(default=True) # CLEM Workflow From dc652ba05bd6f993f595febbc523572f245a479f Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Fri, 9 May 2025 16:36:07 +0100 Subject: [PATCH 02/10] Allow analysis to be turned off via a query parameter --- src/murfey/instrument_server/api.py | 6 +++++- src/murfey/server/api/instrument.py | 6 ++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index 7c5ba7c59..96db2c80c 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -186,9 +186,13 @@ def setup_multigrid_watcher( @router.post("/sessions/{session_id}/start_multigrid_watcher") -def start_multigrid_watcher(session_id: MurfeySessionID): +def start_multigrid_watcher(session_id: MurfeySessionID, process: bool = True): if watchers.get(session_id) is None: return {"success": False} + if not process: + watchers[session_id]._listeners = [ + partial(watchers[session_id]._listeners[0], analyse=False) + ] watchers[session_id].start() return {"success": True} diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 33744ccf2..f82825ce4 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -140,7 +140,9 @@ async def setup_multigrid_watcher( @router.post("/sessions/{session_id}/start_multigrid_watcher") -async def start_multigrid_watcher(session_id: MurfeySessionID, db=murfey_db): +async def start_multigrid_watcher( + session_id: MurfeySessionID, process: bool = True, db=murfey_db +): data = {} instrument_name = ( db.exec(select(Session).where(Session.id == session_id)).one().instrument_name @@ -151,7 +153,7 @@ async def start_multigrid_watcher(session_id: MurfeySessionID, db=murfey_db): if machine_config.instrument_server_url: async with aiohttp.ClientSession() as clientsession: async with clientsession.post( - f"{machine_config.instrument_server_url}/sessions/{session_id}/start_multigrid_watcher", + f"{machine_config.instrument_server_url}/sessions/{session_id}/start_multigrid_watcher?process={'true' if process else 'false'}", headers={ "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" }, From 8f3860731dc4c0b1b97368f94c01aafd20d216e6 Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Mon, 12 May 2025 10:03:06 +0100 Subject: [PATCH 03/10] The server can work out from the session whether there should be processing or not --- src/murfey/server/api/instrument.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index f82825ce4..33111989d 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -140,13 +140,11 @@ async def setup_multigrid_watcher( @router.post("/sessions/{session_id}/start_multigrid_watcher") -async def start_multigrid_watcher( - session_id: MurfeySessionID, process: bool = True, db=murfey_db -): +async def start_multigrid_watcher(session_id: MurfeySessionID, db=murfey_db): data = {} - instrument_name = ( - db.exec(select(Session).where(Session.id == session_id)).one().instrument_name - ) + session = db.exec(select(Session).where(Session.id == session_id)).one() + process = session.process + instrument_name = session.instrument_name machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] From dfd9a2f393ad00aef73ec0448f1611f612d5d4a0 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 13 May 2025 13:34:42 +0100 Subject: [PATCH 04/10] Corrected comparison of 'session_id' to 'id' column in Session table --- src/murfey/server/api/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/murfey/server/api/__init__.py b/src/murfey/server/api/__init__.py index 0c41d108e..462f4dcf1 100644 --- a/src/murfey/server/api/__init__.py +++ b/src/murfey/server/api/__init__.py @@ -1977,7 +1977,7 @@ def create_session(instrument_name: str, visit: str, name: str, db=murfey_db) -> def update_session( session_id: MurfeySessionID, process: bool = True, db=murfey_db ) -> None: - session = db.exec(select(Session).where(session_id == session_id)).one() + session = db.exec(select(Session).where(Session.id == session_id)).one() session.process = process db.add(session) db.commit() From b6499bd08c76e9a597a1f8c66c4f3ab941fd817f Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 14 May 2025 09:50:24 +0100 Subject: [PATCH 05/10] Added '_analyse' attribute to MultigridDirWatcher to control when to process data; adjusts logic when notifying listeners to take 'self_analyse' into account --- src/murfey/client/watchdir_multigrid.py | 38 ++++++++++++++++--------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/src/murfey/client/watchdir_multigrid.py b/src/murfey/client/watchdir_multigrid.py index 46a9a2a11..1d29b0806 100644 --- a/src/murfey/client/watchdir_multigrid.py +++ b/src/murfey/client/watchdir_multigrid.py @@ -21,15 +21,17 @@ def __init__( ): super().__init__() self._basepath = Path(path) - self._skip_existing_processing = skip_existing_processing - self._seen_dirs: List[Path] = [] - self._stopping = False self._machine_config = machine_config + self._seen_dirs: List[Path] = [] self.thread = threading.Thread( name=f"MultigridDirWatcher {self._basepath}", target=self._process, daemon=True, ) + # Toggleable settings + self._analyse = True + self._skip_existing_processing = skip_existing_processing + self._stopping = False def start(self): if self.thread.is_alive(): @@ -60,8 +62,14 @@ def _process(self): include_mid_path=False, use_suggested_path=False, analyse=( - d.name - in self._machine_config["analyse_created_directories"] + ( + d.name + in self._machine_config[ + "analyse_created_directories" + ] + ) + if self._analyse + else False ), tag="atlas", ) @@ -72,7 +80,7 @@ def _process(self): d, extra_directory=f"metadata_{d.name}", include_mid_path=False, - analyse=True, # not (first_loop and self._skip_existing_processing), + analyse=self._analyse, limited=True, tag="metadata", ) @@ -80,15 +88,17 @@ def _process(self): processing_started = False for d02 in (d.parent.parent / d.name).glob("Images-Disc*"): if d02 not in self._seen_dirs: - # if skip exisiting processing is set then do not process for any - # data directories found on the first loop - # this allows you to avoid triggering processing again if murfey is restarted + # If 'skip_existing_processing' is set, do not process for + # any data directories found on the first loop. + # This allows you to avoid triggering processing again if Murfey is restarted self.notify( d02, include_mid_path=False, remove_files=True, - analyse=not ( - first_loop and self._skip_existing_processing + analyse=( + not (first_loop and self._skip_existing_processing) + if self._analyse + else False ), tag="fractions", ) @@ -104,8 +114,10 @@ def _process(self): self.notify( d02, include_mid_path=False, - analyse=not ( - first_loop and self._skip_existing_processing + analyse=( + not (first_loop and self._skip_existing_processing) + if self._analyse + else False ), tag="fractions", ) From 28f0e2294fee14ac1ffb9bda589218597e866ca8 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 14 May 2025 10:15:03 +0100 Subject: [PATCH 06/10] Added log to keep track of whether analysis is enabled/disabled --- src/murfey/client/multigrid_control.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index a9fc3309c..c1bb42a3b 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -150,7 +150,8 @@ def _start_rsyncer_multigrid( tag: str = "", limited: bool = False, ): - log.info(f"starting multigrid rsyncer: {source}") + log.info(f"Starting multigrid rsyncer: {source}") + log.debug(f"Analysis of {source} is {('enabled' if analyse else 'disabled')}") destination_overrides = destination_overrides or {} machine_data = requests.get( f"{self._environment.url.geturl()}/instruments/{self.instrument_name}/machine" From 67cc462def5b7ae2ebfdc9688f66efb3c9ed8612 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 14 May 2025 10:19:49 +0100 Subject: [PATCH 07/10] Use new '_analyse' attribute in MultigridDirWatcher class to enable/disable data processing --- src/murfey/instrument_server/api.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index 96db2c80c..9086b32e6 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -7,7 +7,7 @@ from functools import partial from logging import getLogger from pathlib import Path -from typing import Annotated, Any, Dict, List, Optional, Union +from typing import Annotated, Any, Dict, List, Optional from urllib.parse import urlparse import requests @@ -27,7 +27,7 @@ logger = getLogger("murfey.instrument_server.api") -watchers: Dict[Union[str, int], MultigridDirWatcher] = {} +watchers: Dict[str | int, MultigridDirWatcher] = {} rsyncers: Dict[str, RSyncer] = {} controllers: Dict[int, MultigridController] = {} data_collection_parameters: dict = {} @@ -190,9 +190,7 @@ def start_multigrid_watcher(session_id: MurfeySessionID, process: bool = True): if watchers.get(session_id) is None: return {"success": False} if not process: - watchers[session_id]._listeners = [ - partial(watchers[session_id]._listeners[0], analyse=False) - ] + watchers[session_id]._analyse = False watchers[session_id].start() return {"success": True} From ad19a311c06fd39e8ce0595cfa835dd7cd66a33a Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 14 May 2025 10:21:27 +0100 Subject: [PATCH 08/10] Used built-in 'dict' and 'list' for type hinting instead of 'typing.Dict' and 'typing.List' --- src/murfey/instrument_server/api.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index 9086b32e6..9cac1ff3a 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -7,7 +7,7 @@ from functools import partial from logging import getLogger from pathlib import Path -from typing import Annotated, Any, Dict, List, Optional +from typing import Annotated, Any, Optional from urllib.parse import urlparse import requests @@ -27,9 +27,9 @@ logger = getLogger("murfey.instrument_server.api") -watchers: Dict[str | int, MultigridDirWatcher] = {} -rsyncers: Dict[str, RSyncer] = {} -controllers: Dict[int, MultigridController] = {} +watchers: dict[str | int, MultigridDirWatcher] = {} +rsyncers: dict[str, RSyncer] = {} +controllers: dict[int, MultigridController] = {} data_collection_parameters: dict = {} tokens = {} @@ -321,7 +321,7 @@ def register_processing_parameters( ) def get_possible_gain_references( instrument_name: str, session_id: MurfeySessionID -) -> List[File]: +) -> list[File]: machine_config = requests.get( f"{_get_murfey_url()}/instruments/{sanitise_nonpath(instrument_name)}/machine", headers={"Authorization": f"Bearer {tokens[session_id]}"}, From 641381aaae90769cbe7d1354dc00fceef778bbb4 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 14 May 2025 10:36:50 +0100 Subject: [PATCH 09/10] Added debug logs to keep track of request to start multigrid watcher --- src/murfey/server/api/instrument.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 33111989d..7a9cdefeb 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -149,6 +149,10 @@ async def start_multigrid_watcher(session_id: MurfeySessionID, db=murfey_db): instrument_name ] if machine_config.instrument_server_url: + log.debug( + f"Submitting request to start multigrid watcher for session {session_id} " + f"with processing {('enabled' if process else 'disabled')}" + ) async with aiohttp.ClientSession() as clientsession: async with clientsession.post( f"{machine_config.instrument_server_url}/sessions/{session_id}/start_multigrid_watcher?process={'true' if process else 'false'}", @@ -157,6 +161,7 @@ async def start_multigrid_watcher(session_id: MurfeySessionID, db=murfey_db): }, ) as resp: data = await resp.json() + log.debug(f"Received response: {data}") return data From 07a25e5fbcb75bf92f1098994037376ae4cb7f36 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 14 May 2025 10:38:58 +0100 Subject: [PATCH 10/10] Log verbose error when 'increment_rsync_file_count()' fails --- src/murfey/server/api/__init__.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/murfey/server/api/__init__.py b/src/murfey/server/api/__init__.py index 713d594ce..000135934 100644 --- a/src/murfey/server/api/__init__.py +++ b/src/murfey/server/api/__init__.py @@ -367,13 +367,22 @@ async def get_session(session_id: MurfeySessionID, db=murfey_db) -> SessionClien def increment_rsync_file_count( visit_name: str, rsyncer_info: RsyncerInfo, db=murfey_db ): - rsync_instance = db.exec( - select(RsyncInstance).where( - RsyncInstance.source == rsyncer_info.source, - RsyncInstance.destination == rsyncer_info.destination, - RsyncInstance.session_id == rsyncer_info.session_id, + try: + rsync_instance = db.exec( + select(RsyncInstance).where( + RsyncInstance.source == rsyncer_info.source, + RsyncInstance.destination == rsyncer_info.destination, + RsyncInstance.session_id == rsyncer_info.session_id, + ) + ).one() + except Exception: + log.error( + f"Failed to find rsync instance for visit {sanitise(visit_name)} " + "with the following properties: \n" + f"{rsyncer_info.dict()}", + exc_info=True, ) - ).one() + return None rsync_instance.files_counted += rsyncer_info.increment_count db.add(rsync_instance) db.commit()