diff --git a/src/murfey/client/analyser.py b/src/murfey/client/analyser.py index 18b28b2b4..8d4765e9a 100644 --- a/src/murfey/client/analyser.py +++ b/src/murfey/client/analyser.py @@ -370,6 +370,7 @@ def _analyse(self): ) self.post_transfer(transferred_file) self.queue.task_done() + logger.debug("Analyer thread has stopped analysing incoming files") self.notify(final=True) def _xml_file(self, data_file: Path) -> Path: @@ -403,6 +404,12 @@ def request_stop(self): self._stopping = True self._halt_thread = True + def is_safe_to_stop(self): + """ + Checks that the analyser thread is safe to stop + """ + return self._stopping and self._halt_thread and not self.queue.qsize() + def stop(self): logger.debug("Analyser thread stop requested") self._stopping = True @@ -412,5 +419,8 @@ def stop(self): self.queue.put(None) self.thread.join() except Exception as e: - logger.error(f"Exception encountered while stopping analyser: {e}") + logger.error( + f"Exception encountered while stopping Analyser: {e}", + exc_info=True, + ) logger.debug("Analyser thread stop completed") diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index a016b49f4..44b91261a 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -2,6 +2,7 @@ import logging import subprocess import threading +import time from dataclasses import dataclass, field from datetime import datetime from functools import partial @@ -36,6 +37,7 @@ class MultigridController: rsync_url: str = "" rsync_module: str = "data" demo: bool = False + finalising: bool = False dormant: bool = False multigrid_watcher_active: bool = True processing_enabled: bool = True @@ -117,34 +119,70 @@ def __post_init__(self): def _multigrid_watcher_finalised(self): self.multigrid_watcher_active = False - self.dormancy_check() - def dormancy_check(self): + def is_ready_for_dormancy(self): + """ + When the multigrid watcher is no longer active, sends a request to safely stop + the analyser and file watcher threads, then checks to see that those threads + and the RSyncer processes associated with the current session have all been + safely stopped + """ + log.debug( + f"Starting dormancy check for MultigridController for session {self.session_id}" + ) if not self.multigrid_watcher_active: - if ( + for a in self.analysers.values(): + if a.is_safe_to_stop(): + a.stop() + for w in self._environment.watchers.values(): + if w.is_safe_to_stop(): + w.stop() + return ( all(r._finalised for r in self.rsync_processes.values()) and not any(a.thread.is_alive() for a in self.analysers.values()) and not any( w.thread.is_alive() for w in self._environment.watchers.values() ) - ): + ) + log.debug(f"Multigrid watcher for session {self.session_id} is still active") + return False + + def clean_up_once_dormant(self, running_threads: list[threading.Thread]): + """ + A function run in a separate thread that runs the post-session cleanup logic + once all threads associated with this current session are halted, and marks + the controller as being fully dormant after doing so. + """ + for thread in running_threads: + thread.join() + log.debug(f"RSyncer cleanup thread {thread.ident} has stopped safely") + while not self.is_ready_for_dormancy(): + time.sleep(10) + + # Once all threads are stopped, remove session from the database + log.debug( + f"Submitting request to remove session {self.session_id} from database" + ) + response = capture_delete( + f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=self.session_id)}", + ) + success = response.status_code == 200 if response else False + if not success: + log.warning(f"Could not delete database data for {self.session_id}") + + # Send message to frontend to trigger a refresh + self.ws.send( + json.dumps( + { + "message": "refresh", + "target": "sessions", + "instrument_name": self.instrument_name, + } + ) + ) - def call_remove_session(): - response = capture_delete( - f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=self.session_id)}", - ) - success = response.status_code == 200 if response else False - if not success: - log.warning( - f"Could not delete database data for {self.session_id}" - ) - - dormancy_thread = threading.Thread( - name=f"Session deletion thread {self.session_id}", - target=call_remove_session, - ) - dormancy_thread.start() - self.dormant = True + # Mark as dormant + self.dormant = True def abandon(self): for a in self.analysers.values(): @@ -155,12 +193,26 @@ def abandon(self): p.request_stop() def finalise(self): + self.finalising = True for a in self.analysers.values(): a.request_stop() + log.debug(f"Stop request sent to analyser {a}") for w in self._environment.watchers.values(): w.request_stop() + log.debug(f"Stop request sent to watcher {w}") + rsync_finaliser_threads = [] for p in self.rsync_processes.keys(): - self._finalise_rsyncer(p) + # Collect the running rsyncer finaliser threads to pass to the dormancy checker + rsync_finaliser_threads.append(self._finalise_rsyncer(p)) + log.debug(f"Finalised rsyncer {p}") + + # Run the session cleanup function in a separate thread + cleanup_upon_dormancy_thread = threading.Thread( + target=self.clean_up_once_dormant, + args=[rsync_finaliser_threads], + daemon=True, + ) + cleanup_upon_dormancy_thread.start() def update_visit_time(self, new_end_time: datetime): # Convert the received server timestamp into the local equivalent @@ -224,7 +276,15 @@ def _start_rsyncer_multigrid( transfer=machine_data.get("data_transfer_enabled", True), restarted=str(source) in self.rsync_restarts, ) - self.ws.send(json.dumps({"message": "refresh"})) + self.ws.send( + json.dumps( + { + "message": "refresh", + "target": "rsyncer", + "session_id": self.session_id, + } + ) + ) def _rsyncer_stopped(self, source: Path, explicit_stop: bool = False): if explicit_stop: @@ -235,15 +295,19 @@ def _rsyncer_stopped(self, source: Path, explicit_stop: bool = False): capture_post(stop_url, json={"source": str(source)}) def _finalise_rsyncer(self, source: Path): + """ + Starts a new Rsyncer thread that cleans up the directories, and returns that + thread to be managed by a central thread. + """ finalise_thread = threading.Thread( name=f"Controller finaliser thread ({source})", - target=partial( - self.rsync_processes[source].finalise, callback=self.dormancy_check - ), + target=self.rsync_processes[source].finalise, kwargs={"thread": False}, daemon=True, ) finalise_thread.start() + log.debug(f"Started RSync cleanup for {str(source)}") + return finalise_thread def _restart_rsyncer(self, source: Path): self.rsync_processes[source].restart() @@ -368,7 +432,6 @@ def rsync_result(update: RSyncerUpdate): ) else: self.analysers[source].subscribe(self._data_collection_form) - self.analysers[source].subscribe(self.dormancy_check, final=True) self.analysers[source].start() if transfer: self.rsync_processes[source].subscribe(self.analysers[source].enqueue) @@ -408,9 +471,6 @@ def _rsync_update_converter(p: Path) -> None: ), secondary=True, ) - self._environment.watchers[source].subscribe( - self.dormancy_check, final=True - ) self._environment.watchers[source].start() def _data_collection_form(self, response: dict): diff --git a/src/murfey/client/watchdir.py b/src/murfey/client/watchdir.py index c1473304d..4fead7977 100644 --- a/src/murfey/client/watchdir.py +++ b/src/murfey/client/watchdir.py @@ -67,16 +67,27 @@ def request_stop(self): self._stopping = True self._halt_thread = True + def is_safe_to_stop(self): + """ + Checks that the directory watcher thread is safe to stop + """ + return self._stopping and self._halt_thread and not self.queue.qsize() + def stop(self): - log.debug("DirWatcher thread stop requested") self._stopping = True if self.thread.is_alive(): self.queue.join() self._halt_thread = True - if self.thread.is_alive(): - self.queue.put(None) - self.thread.join() + try: + if self.thread.is_alive(): + self.queue.put(None) + self.thread.join() + except Exception as e: + log.error( + f"Exception encountered while stopping DirWatcher: {e}", + exc_info=True, + ) log.debug("DirWatcher thread stop completed") def _process(self): @@ -94,6 +105,7 @@ def _process(self): modification_time=modification_time, transfer_all=self._transfer_all ) time.sleep(15) + log.debug(f"DirWatcher {self} has stopped scanning") self.notify(final=True) def scan(self, modification_time: float | None = None, transfer_all: bool = False): diff --git a/src/murfey/instrument_server/__init__.py b/src/murfey/instrument_server/__init__.py index 180d177f4..41aca780f 100644 --- a/src/murfey/instrument_server/__init__.py +++ b/src/murfey/instrument_server/__init__.py @@ -45,21 +45,32 @@ def start_instrument_server(): LogFilter.install() + # Log everything from Murfey by default + logging.getLogger("murfey").setLevel(logging.DEBUG) + + # Show only logs at INFO level and above in the console rich_handler = RichHandler(enable_link_path=False) - logging.getLogger("murfey").setLevel(logging.INFO) + rich_handler.setLevel(logging.INFO) logging.getLogger("murfey").addHandler(rich_handler) logging.getLogger("fastapi").addHandler(rich_handler) logging.getLogger("uvicorn").addHandler(rich_handler) + # Create a websocket app to connect to the backend ws = murfey.client.websocket.WSApp( server=read_config().get("Murfey", "server", fallback=""), register_client=False, ) - handler = CustomHandler(ws.send) - logging.getLogger("murfey").addHandler(handler) - logging.getLogger("fastapi").addHandler(handler) - logging.getLogger("uvicorn").addHandler(handler) + # Forward DEBUG levels logs and above from Murfey to the backend + murfey_ws_handler = CustomHandler(ws.send) + murfey_ws_handler.setLevel(logging.DEBUG) + logging.getLogger("murfey").addHandler(murfey_ws_handler) + + # Forward only INFO level logs and above for other packages + other_ws_handler = CustomHandler(ws.send) + other_ws_handler.setLevel(logging.INFO) + logging.getLogger("fastapi").addHandler(other_ws_handler) + logging.getLogger("uvicorn").addHandler(other_ws_handler) logger.info( f"Starting Murfey server version {murfey.__version__}, listening on {args.host}:{args.port}" diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index ca31f9c21..32a17c3c1 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -7,6 +7,7 @@ from functools import partial from logging import getLogger from pathlib import Path +from threading import Lock from typing import Annotated, Any, Optional from urllib.parse import urlparse @@ -31,6 +32,7 @@ watchers: dict[str | int, MultigridDirWatcher] = {} rsyncers: dict[str, RSyncer] = {} controllers: dict[int, MultigridController] = {} +controller_lock = Lock() data_collection_parameters: dict = {} tokens = {} @@ -145,22 +147,26 @@ def check_token(session_id: MurfeySessionID): def setup_multigrid_watcher( session_id: MurfeySessionID, watcher_spec: MultigridWatcherSpec ): + # Remove dormant controllers from memory + with controller_lock: + controllers_to_remove = [ + sid for sid, controller in controllers.items() if controller.dormant + ] + for sid in controllers_to_remove: + del controllers[sid] + # Return 'True' if controllers are already set up if controllers.get(session_id) is not None: return {"success": True} - label = watcher_spec.label - for sid, controller in controllers.items(): - if controller.dormant: - del controllers[sid] - # Load machine config as dictionary machine_config: dict[str, Any] = requests.get( f"{_get_murfey_url()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=sanitise_nonpath(watcher_spec.instrument_name))}", headers={"Authorization": f"Bearer {tokens[session_id]}"}, ).json() - # Set up the multigrid controll controller + # Set up the multigrid controller + label = watcher_spec.label controllers[session_id] = MultigridController( [], watcher_spec.visit, @@ -205,7 +211,11 @@ def start_multigrid_watcher(session_id: MurfeySessionID, process: bool = True): return {"success": False} if not process: watchers[session_id]._analyse = False - watchers[session_id].start() + try: + watchers[session_id].start() + # Ignore RuntimeError; this happens when reconnecting after a backend server restart + except RuntimeError: + logger.debug(f"MultigridWatcher for session {session_id} is already active") return {"success": True} @@ -216,11 +226,15 @@ def stop_multigrid_watcher(session_id: MurfeySessionID, label: str): @router.get("/sessions/{session_id}/multigrid_controller/status") -def check_multigrid_controller_exists( +def check_multigrid_controller_status( session_id: MurfeySessionID, ): if controllers.get(session_id, None) is not None: - return {"exists": True} + return { + "dormant": controllers[session_id].dormant, + "exists": True, + "finalising": controllers[session_id].finalising, + } return {"exists": False} @@ -268,7 +282,9 @@ def finalise_rsyncer(session_id: MurfeySessionID, rsyncer_source: RsyncerSource) @router.post("/sessions/{session_id}/finalise_session") def finalise_session(session_id: MurfeySessionID): watchers[session_id].request_stop() + logger.debug(f"Stop request sent to multigrid watcher for session {session_id}") controllers[session_id].finalise() + logger.debug(f"Stop orders sent to multigrid controller for session {session_id} ") return {"success": True} @@ -295,7 +311,10 @@ class ObserverInfo(BaseModel): @router.get("/sessions/{session_id}/rsyncer_info") def get_rsyncer_info(session_id: MurfeySessionID) -> list[ObserverInfo]: - info = [] + info: list[ObserverInfo] = [] + if controllers.get(session_id, None) is None: + logger.debug(f"Multigrid controller for session {session_id} doesn't exist") + return info for k, v in controllers[session_id].rsync_processes.items(): info.append( ObserverInfo( @@ -312,7 +331,10 @@ def get_rsyncer_info(session_id: MurfeySessionID) -> list[ObserverInfo]: @router.get("/sessions/{session_id}/analyser_info") def get_analyser_info(session_id: MurfeySessionID) -> list[ObserverInfo]: - info = [] + info: list[ObserverInfo] = [] + if controllers.get(session_id, None) is None: + logger.debug(f"Multigrid controller for session {session_id} doesn't exist") + return info for k, v in controllers[session_id].analysers.items(): info.append( ObserverInfo( diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 9f2860f44..d3dd95275 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -102,7 +102,7 @@ async def check_if_session_is_active( @router.get("/sessions/{session_id}/multigrid_controller/status") -async def check_multigrid_controller_exists(session_id: MurfeySessionID, db=murfey_db): +async def check_multigrid_controller_status(session_id: MurfeySessionID, db=murfey_db): session = db.exec(select(Session).where(Session.id == session_id)).one() instrument_name = session.instrument_name machine_config = get_machine_config(instrument_name=instrument_name)[ @@ -112,9 +112,15 @@ async def check_multigrid_controller_exists(session_id: MurfeySessionID, db=murf log.debug( f"Submitting request to inspect multigrid controller for session {session_id}" ) + # Treat it as absent if the server-side has no stored token for the session + if ( + instrument_server_tokens.get(session_id, {}).get("access_token", None) + is None + ): + return {"exists": False} async with aiohttp.ClientSession() as clientsession: async with clientsession.get( - f"{machine_config.instrument_server_url}{url_path_for('api.router', 'check_multigrid_controller_exists', session_id=session_id)}", + f"{machine_config.instrument_server_url}{url_path_for('api.router', 'check_multigrid_controller_status', session_id=session_id)}", headers={ "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" }, @@ -433,6 +439,7 @@ async def finalise_rsyncer( @router.post("/sessions/{session_id}/finalise_session") async def finalise_session(session_id: MurfeySessionID, db=murfey_db): + log.debug(f"Finalising session {session_id}") data = {} instrument_name = ( db.exec(select(Session).where(Session.id == session_id)).one().instrument_name @@ -449,6 +456,7 @@ async def finalise_session(session_id: MurfeySessionID, db=murfey_db): }, ) as resp: data = await resp.json() + log.debug(f"Received response {data}") return data diff --git a/src/murfey/server/api/shared.py b/src/murfey/server/api/shared.py index 116b3b9d6..1b8bf84ae 100644 --- a/src/murfey/server/api/shared.py +++ b/src/murfey/server/api/shared.py @@ -91,6 +91,7 @@ def remove_session_by_id(session_id: int, db): ) db.delete(session) db.commit() + logger.debug(f"Successfully removed session {session_id} from database") return diff --git a/src/murfey/util/route_manifest.yaml b/src/murfey/util/route_manifest.yaml index a92821c19..615aaf5ce 100644 --- a/src/murfey/util/route_manifest.yaml +++ b/src/murfey/util/route_manifest.yaml @@ -38,16 +38,16 @@ murfey.instrument_server.api.router: type: str methods: - DELETE + - path: /sessions/{session_id}/multigrid_controller/status + function: check_multigrid_controller_status + path_params: [] + methods: + - GET - path: /sessions/{session_id}/multigrid_controller/visit_end_time function: update_multigrid_controller_visit_end_time path_params: [] methods: - POST - - path: /sessions/{session_id}/multigrid_controller/status - function: check_multigrid_controller_exists - path_params: [] - methods: - - GET - path: /sessions/{session_id}/stop_rsyncer function: stop_rsyncer path_params: [] @@ -498,6 +498,11 @@ murfey.server.api.instrument.router: type: int methods: - GET + - path: /instrument_server/sessions/{session_id}/multigrid_controller/status + function: check_multigrid_controller_status + path_params: [] + methods: + - GET - path: /instrument_server/sessions/{session_id}/multigrid_watcher function: setup_multigrid_watcher path_params: [] @@ -508,11 +513,11 @@ murfey.server.api.instrument.router: path_params: [] methods: - POST - - path: /instrument_server/sessions/{session_id}/multigrid_controller/status - function: check_multigrid_controller_exists + - path: /instrument_server/sessions/{session_id}/multigrid_controller/visit_end_time + function: update_visit_end_time path_params: [] methods: - - GET + - POST - path: /instrument_server/sessions/{session_id}/provided_processing_parameters function: pass_proc_params_to_instrument_server path_params: [] @@ -559,11 +564,6 @@ murfey.server.api.instrument.router: path_params: [] methods: - POST - - path: /instrument_server/sessions/{session_id}/multigrid_controller/visit_end_time - function: update_visit_end_time - path_params: [] - methods: - - POST - path: /instrument_server/sessions/{session_id}/abandon_session function: abandon_session path_params: [] diff --git a/tests/instrument_server/test_api.py b/tests/instrument_server/test_api.py index 310479fd6..54ed53563 100644 --- a/tests/instrument_server/test_api.py +++ b/tests/instrument_server/test_api.py @@ -72,22 +72,31 @@ def test_get_murfey_url( assert parsed_server.path == parsed_original.path -def test_check_multigrid_controller_exists(mocker: MockerFixture): +def test_check_multigrid_controller_status(mocker: MockerFixture): session_id = 1 # Patch out the multigrid controllers that have been stored in memory - mocker.patch("murfey.instrument_server.api.controllers", {session_id: MagicMock()}) + mock_controller = MagicMock() + mock_controller.dormant = False + mock_controller.finalising = False + mocker.patch( + "murfey.instrument_server.api.controllers", {session_id: mock_controller} + ) # Set up the test client client_server = set_up_test_client(session_id=session_id) url_path = url_path_for( - "api.router", "check_multigrid_controller_exists", session_id=session_id + "api.router", "check_multigrid_controller_status", session_id=session_id ) response = client_server.get(url_path) # Check that the result is as expected assert response.status_code == 200 - assert response.json() == {"exists": True} + assert response.json() == { + "dormant": False, + "exists": True, + "finalising": False, + } test_upload_gain_reference_params_matrix = ( diff --git a/tests/server/api/test_instrument.py b/tests/server/api/test_instrument.py index b8c83ae8f..a2b022311 100644 --- a/tests/server/api/test_instrument.py +++ b/tests/server/api/test_instrument.py @@ -51,7 +51,7 @@ def mock_aiohttp_clientsession( return mock_clientsession, mock_response -def test_check_multigrid_controller_exists(mocker: MockerFixture): +def test_check_multigrid_controller_status(mocker: MockerFixture): # Set up the objects to mock instrument_name = "test" session_id = 1 @@ -107,12 +107,12 @@ def mock_get_db_session(): # Construct the URL paths for poking and sending to backend_url_path = url_path_for( "api.instrument.router", - "check_multigrid_controller_exists", + "check_multigrid_controller_status", session_id=session_id, ) client_url_path = url_path_for( "api.router", - "check_multigrid_controller_exists", + "check_multigrid_controller_status", session_id=session_id, )