diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index 483eff6f7..4abcb5c0b 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -160,7 +160,8 @@ def _start_rsyncer_multigrid( tag: str = "", limited: bool = False, ): - log.info(f"starting multigrid rsyncer: {source}") + log.info(f"Starting multigrid rsyncer: {source}") + log.debug(f"Analysis of {source} is {('enabled' if analyse else 'disabled')}") destination_overrides = destination_overrides or {} machine_data = requests.get( f"{self._environment.url.geturl()}/instruments/{self.instrument_name}/machine" diff --git a/src/murfey/client/watchdir_multigrid.py b/src/murfey/client/watchdir_multigrid.py index 46a9a2a11..1d29b0806 100644 --- a/src/murfey/client/watchdir_multigrid.py +++ b/src/murfey/client/watchdir_multigrid.py @@ -21,15 +21,17 @@ def __init__( ): super().__init__() self._basepath = Path(path) - self._skip_existing_processing = skip_existing_processing - self._seen_dirs: List[Path] = [] - self._stopping = False self._machine_config = machine_config + self._seen_dirs: List[Path] = [] self.thread = threading.Thread( name=f"MultigridDirWatcher {self._basepath}", target=self._process, daemon=True, ) + # Toggleable settings + self._analyse = True + self._skip_existing_processing = skip_existing_processing + self._stopping = False def start(self): if self.thread.is_alive(): @@ -60,8 +62,14 @@ def _process(self): include_mid_path=False, use_suggested_path=False, analyse=( - d.name - in self._machine_config["analyse_created_directories"] + ( + d.name + in self._machine_config[ + "analyse_created_directories" + ] + ) + if self._analyse + else False ), tag="atlas", ) @@ -72,7 +80,7 @@ def _process(self): d, extra_directory=f"metadata_{d.name}", include_mid_path=False, - analyse=True, # not (first_loop and self._skip_existing_processing), + analyse=self._analyse, limited=True, tag="metadata", ) @@ -80,15 +88,17 @@ def _process(self): processing_started = False for d02 in (d.parent.parent / d.name).glob("Images-Disc*"): if d02 not in self._seen_dirs: - # if skip exisiting processing is set then do not process for any - # data directories found on the first loop - # this allows you to avoid triggering processing again if murfey is restarted + # If 'skip_existing_processing' is set, do not process for + # any data directories found on the first loop. + # This allows you to avoid triggering processing again if Murfey is restarted self.notify( d02, include_mid_path=False, remove_files=True, - analyse=not ( - first_loop and self._skip_existing_processing + analyse=( + not (first_loop and self._skip_existing_processing) + if self._analyse + else False ), tag="fractions", ) @@ -104,8 +114,10 @@ def _process(self): self.notify( d02, include_mid_path=False, - analyse=not ( - first_loop and self._skip_existing_processing + analyse=( + not (first_loop and self._skip_existing_processing) + if self._analyse + else False ), tag="fractions", ) diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index 14f1de9af..e45d46a7c 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -7,7 +7,7 @@ from functools import partial from logging import getLogger from pathlib import Path -from typing import Annotated, Any, Dict, List, Optional, Union +from typing import Annotated, Any, Optional from urllib.parse import urlparse import requests @@ -27,9 +27,9 @@ logger = getLogger("murfey.instrument_server.api") -watchers: Dict[Union[str, int], MultigridDirWatcher] = {} -rsyncers: Dict[str, RSyncer] = {} -controllers: Dict[int, MultigridController] = {} +watchers: dict[str | int, MultigridDirWatcher] = {} +rsyncers: dict[str, RSyncer] = {} +controllers: dict[int, MultigridController] = {} data_collection_parameters: dict = {} tokens = {} @@ -187,9 +187,11 @@ def setup_multigrid_watcher( @router.post("/sessions/{session_id}/start_multigrid_watcher") -def start_multigrid_watcher(session_id: MurfeySessionID): +def start_multigrid_watcher(session_id: MurfeySessionID, process: bool = True): if watchers.get(session_id) is None: return {"success": False} + if not process: + watchers[session_id]._analyse = False watchers[session_id].start() return {"success": True} @@ -322,7 +324,7 @@ def register_processing_parameters( ) def get_possible_gain_references( instrument_name: str, session_id: MurfeySessionID -) -> List[File]: +) -> list[File]: machine_config = requests.get( f"{_get_murfey_url()}/instruments/{sanitise_nonpath(instrument_name)}/machine", headers={"Authorization": f"Bearer {tokens[session_id]}"}, diff --git a/src/murfey/server/api/__init__.py b/src/murfey/server/api/__init__.py index 924f557ee..65e64bc61 100644 --- a/src/murfey/server/api/__init__.py +++ b/src/murfey/server/api/__init__.py @@ -372,13 +372,22 @@ async def get_session(session_id: MurfeySessionID, db=murfey_db) -> SessionClien def increment_rsync_file_count( visit_name: str, rsyncer_info: RsyncerInfo, db=murfey_db ): - rsync_instance = db.exec( - select(RsyncInstance).where( - RsyncInstance.source == rsyncer_info.source, - RsyncInstance.destination == rsyncer_info.destination, - RsyncInstance.session_id == rsyncer_info.session_id, + try: + rsync_instance = db.exec( + select(RsyncInstance).where( + RsyncInstance.source == rsyncer_info.source, + RsyncInstance.destination == rsyncer_info.destination, + RsyncInstance.session_id == rsyncer_info.session_id, + ) + ).one() + except Exception: + log.error( + f"Failed to find rsync instance for visit {sanitise(visit_name)} " + "with the following properties: \n" + f"{rsyncer_info.dict()}", + exc_info=True, ) - ).one() + return None rsync_instance.files_counted += rsyncer_info.increment_count db.add(rsync_instance) db.commit() @@ -1996,6 +2005,17 @@ def create_session( return sid +@router.post("/sessions/{session_id}") +def update_session( + session_id: MurfeySessionID, process: bool = True, db=murfey_db +) -> None: + session = db.exec(select(Session).where(Session.id == session_id)).one() + session.process = process + db.add(session) + db.commit() + return None + + @router.put("/sessions/{session_id}/current_gain_ref") def update_current_gain_ref( session_id: MurfeySessionID, new_gain_ref: CurrentGainRef, db=murfey_db diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 74798b190..3f60e35d9 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -142,21 +142,26 @@ async def setup_multigrid_watcher( @router.post("/sessions/{session_id}/start_multigrid_watcher") async def start_multigrid_watcher(session_id: MurfeySessionID, db=murfey_db): data = {} - instrument_name = ( - db.exec(select(Session).where(Session.id == session_id)).one().instrument_name - ) + session = db.exec(select(Session).where(Session.id == session_id)).one() + process = session.process + instrument_name = session.instrument_name machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] if machine_config.instrument_server_url: + log.debug( + f"Submitting request to start multigrid watcher for session {session_id} " + f"with processing {('enabled' if process else 'disabled')}" + ) async with aiohttp.ClientSession() as clientsession: async with clientsession.post( - f"{machine_config.instrument_server_url}/sessions/{session_id}/start_multigrid_watcher", + f"{machine_config.instrument_server_url}/sessions/{session_id}/start_multigrid_watcher?process={'true' if process else 'false'}", headers={ "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" }, ) as resp: data = await resp.json() + log.debug(f"Received response: {data}") return data diff --git a/src/murfey/server/api/processing_parameters.py b/src/murfey/server/api/processing_parameters.py index fa359c322..bf506d30f 100644 --- a/src/murfey/server/api/processing_parameters.py +++ b/src/murfey/server/api/processing_parameters.py @@ -1,6 +1,7 @@ from logging import getLogger from typing import Optional +import sqlalchemy from fastapi import APIRouter, Depends from pydantic import BaseModel from sqlmodel import Session, select @@ -24,12 +25,15 @@ class EditableSessionProcessingParameters(BaseModel): @router.get("/sessions/{session_id}/session_processing_parameters") def get_session_processing_parameters( session_id: MurfeySessionID, db: Session = murfey_db -) -> EditableSessionProcessingParameters: - proc_params = db.exec( - select(SessionProcessingParameters).where( - SessionProcessingParameters.session_id == session_id - ) - ).one() +) -> Optional[EditableSessionProcessingParameters]: + try: + proc_params = db.exec( + select(SessionProcessingParameters).where( + SessionProcessingParameters.session_id == session_id + ) + ).one() + except sqlalchemy.exc.NoResultFound: + return None return EditableSessionProcessingParameters( gain_ref=proc_params.gain_ref, dose_per_frame=proc_params.dose_per_frame, diff --git a/src/murfey/util/db.py b/src/murfey/util/db.py index 5334b6ad2..167ab91c3 100644 --- a/src/murfey/util/db.py +++ b/src/murfey/util/db.py @@ -49,6 +49,7 @@ class Session(SQLModel, table=True): # type: ignore started: bool = Field(default=False) current_gain_ref: str = Field(default="") instrument_name: str = Field(default="") + process: bool = Field(default=True) visit_end_time: Optional[datetime] = Field(default=None) # CLEM Workflow