From 6c4bac7b07896c63462d919632072952c5f3a145 Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Tue, 25 Mar 2025 17:52:01 +0000 Subject: [PATCH 01/10] Add mechanism to record when a multigrid controller has exited --- src/murfey/client/multigrid_control.py | 5 +++++ src/murfey/client/watchdir_multigrid.py | 2 ++ src/murfey/instrument_server/api.py | 1 + src/murfey/util/__init__.py | 30 ++++++++++++++++++++----- 4 files changed, 32 insertions(+), 6 deletions(-) diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index 01ef1d635..22fe0a20d 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -34,6 +34,8 @@ class MultigridController: rsync_url: str = "" rsync_module: str = "data" demo: bool = False + dormant: bool = False + multigrid_watcher_active: bool = True processing_enabled: bool = True do_transfer: bool = True dummy_dc: bool = False @@ -95,6 +97,9 @@ def __post_init__(self): register_client=False, ) + def _multigrid_watcher_finalised(self): + self.multigrid_watcher_active = False + def _start_rsyncer_multigrid( self, source: Path, diff --git a/src/murfey/client/watchdir_multigrid.py b/src/murfey/client/watchdir_multigrid.py index 153473fc3..cf3a95c90 100644 --- a/src/murfey/client/watchdir_multigrid.py +++ b/src/murfey/client/watchdir_multigrid.py @@ -114,3 +114,5 @@ def _process(self): if first_loop: first_loop = False time.sleep(15) + + self.notify(final=True) diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index 9503422d7..54df2ad44 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -176,6 +176,7 @@ def start_multigrid_watcher( destination_overrides=watcher_spec.destination_overrides, ) ) + watchers[session_id].subscribe(controllers[session_id]._multigrid_watcher_finalised) watchers[session_id].start() return {"success": True} diff --git a/src/murfey/util/__init__.py b/src/murfey/util/__init__.py index 450fa9858..ff500389b 100644 --- a/src/murfey/util/__init__.py +++ b/src/murfey/util/__init__.py @@ -228,19 +228,31 @@ class Observer: def __init__(self): self._listeners: list[Callable[..., Awaitable[None] | None]] = [] self._secondary_listeners: list[Callable[..., Awaitable[None] | None]] = [] + self._final_listeners: list[Callable[..., Awaitable[None] | None]] = [] super().__init__() def subscribe( - self, fn: Callable[..., Awaitable[None] | None], secondary: bool = False + self, + fn: Callable[..., Awaitable[None] | None], + secondary: bool = False, + final: bool = False, ): - if secondary: + if final: + self._final_listeners.append(fn) + elif secondary: self._secondary_listeners.append(fn) else: self._listeners.append(fn) - async def anotify(self, *args, secondary: bool = False, **kwargs) -> None: + async def anotify( + self, *args, secondary: bool = False, final: bool = False, **kwargs + ) -> None: awaitables: list[Awaitable] = [] - listeners = self._secondary_listeners if secondary else self._listeners + listeners = ( + self._secondary_listeners + if secondary + else self._final_listeners if final else self._listeners + ) for notify_function in listeners: result = notify_function(*args, **kwargs) if result is not None and inspect.isawaitable(result): @@ -253,9 +265,15 @@ async def _await_all(awaitables: list[Awaitable]): for awaitable in asyncio.as_completed(awaitables): await awaitable - def notify(self, *args, secondary: bool = False, **kwargs) -> None: + def notify( + self, *args, secondary: bool = False, final: bool = False, **kwargs + ) -> None: awaitables: list[Awaitable] = [] - listeners = self._secondary_listeners if secondary else self._listeners + listeners = ( + self._secondary_listeners + if secondary + else self._final_listeners if final else self._listeners + ) for notify_function in listeners: result = notify_function(*args, **kwargs) if result is not None and inspect.isawaitable(result): From f2ec83d2067cf5eb48b65e2f5e7854b56ac71a10 Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Tue, 25 Mar 2025 20:05:55 +0000 Subject: [PATCH 02/10] Multigrid controller can now passively request the shutdown of all watchers and analysers, and finalisation of all rsyncers. Each of these is given a callback that checks the state of all relevant threads before labelling the multigrid controller as dormant once all threads have shutdown --- src/murfey/client/analyser.py | 5 +++++ src/murfey/client/multigrid_control.py | 27 +++++++++++++++++++++++++- src/murfey/client/rsync.py | 12 ++++++++++-- src/murfey/client/watchdir.py | 5 +++++ 4 files changed, 46 insertions(+), 3 deletions(-) diff --git a/src/murfey/client/analyser.py b/src/murfey/client/analyser.py index ac188546f..877ed5026 100644 --- a/src/murfey/client/analyser.py +++ b/src/murfey/client/analyser.py @@ -404,6 +404,7 @@ def _analyse(self): ) self.post_transfer(transferred_file) self.queue.task_done() + self.notify(final=True) def _xml_file(self, data_file: Path) -> Path: if not self._environment: @@ -432,6 +433,10 @@ def start(self): logger.info(f"Analyser thread starting for {self}") self.thread.start() + def request_stop(self): + self._stopping = True + self._halt_thread = True + def stop(self): logger.debug("Analyser thread stop requested") self._stopping = True diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index 22fe0a20d..00868aa7c 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -100,6 +100,25 @@ def __post_init__(self): def _multigrid_watcher_finalised(self): self.multigrid_watcher_active = False + def dormancy_check(self): + if not self.multigrid_watcher_active: + if ( + all(r._finalised for r in self.rsync_processes.values()) + and not any(a.thread.is_alive() for a in self.analysers.values()) + and not any( + w.thread.is_alive() for w in self._environment.watchers.values() + ) + ): + self.dormant = True + + def finalise(self): + for a in self.analysers.values(): + a.request_stop() + for w in self._environment.watchers.values(): + w.request_stop() + for p in self.rsync_processes.keys(): + self._finalise_rsyncer(p) + def _start_rsyncer_multigrid( self, source: Path, @@ -170,7 +189,9 @@ def _rsyncer_stopped(self, source: Path, explicit_stop: bool = False): def _finalise_rsyncer(self, source: Path): finalise_thread = threading.Thread( name=f"Controller finaliser thread ({source})", - target=self.rsync_processes[source].finalise, + target=partial( + self.rsync_processes[source].finalise, callback=self.dormancy_check + ), kwargs={"thread": False}, daemon=True, ) @@ -302,6 +323,7 @@ def rsync_result(update: RSyncerUpdate): ) else: self.analysers[source].subscribe(self._data_collection_form) + self.analysers[source].subscribe(self.dormancy_check, final=True) self.analysers[source].start() if transfer: self.rsync_processes[source].subscribe(self.analysers[source].enqueue) @@ -341,6 +363,9 @@ def _rsync_update_converter(p: Path) -> None: ), secondary=True, ) + self._environment.watchers[source].subscribe( + self.dormancy_check, final=True + ) self._environment.watchers[source].start() def _data_collection_form(self, response: dict): diff --git a/src/murfey/client/rsync.py b/src/murfey/client/rsync.py index e3caee849..f53d40440 100644 --- a/src/murfey/client/rsync.py +++ b/src/murfey/client/rsync.py @@ -15,7 +15,7 @@ import time from enum import Enum from pathlib import Path -from typing import Callable, List, NamedTuple +from typing import Awaitable, Callable, List, NamedTuple from urllib.parse import ParseResult from murfey.client.tui.status_bar import StatusBar @@ -75,6 +75,7 @@ def __init__( self._local = local self._server_url = server_url self._notify = notify + self._finalised = False # Set rsync destination if local: @@ -181,7 +182,11 @@ def stop(self): self.thread.join() logger.debug("RSync thread stop completed") - def finalise(self, thread: bool = True): + def finalise( + self, + thread: bool = True, + callback: Callable[..., Awaitable[None] | None] | None = None, + ): self.stop() self._remove_files = True self._notify = False @@ -196,6 +201,9 @@ def finalise(self, thread: bool = True): self.stop() else: self._transfer(list(self._basepath.glob("**/*"))) + self._finalised = True + if callback: + callback() def enqueue(self, file_path: Path): if not self._stopping: diff --git a/src/murfey/client/watchdir.py b/src/murfey/client/watchdir.py index 5fcf30c29..57b5aac6d 100644 --- a/src/murfey/client/watchdir.py +++ b/src/murfey/client/watchdir.py @@ -63,6 +63,10 @@ def start(self): log.info(f"DirWatcher thread starting for {self}") self.thread.start() + def request_stop(self): + self._stopping = True + self._halt_thread = True + def stop(self): log.debug("DirWatcher thread stop requested") self._stopping = True @@ -90,6 +94,7 @@ def _process(self): modification_time=modification_time, transfer_all=self._transfer_all ) time.sleep(15) + self.notify(final=True) def scan(self, modification_time: float | None = None, transfer_all: bool = False): """ From cd07dab763a94fb79b5ce574706e7bce378a30c0 Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Tue, 25 Mar 2025 20:09:22 +0000 Subject: [PATCH 03/10] Add session finalising endpoints --- src/murfey/instrument_server/api.py | 6 ++++++ src/murfey/server/api/instrument.py | 21 +++++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index 54df2ad44..30a3dcd32 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -214,6 +214,12 @@ def finalise_rsyncer(session_id: MurfeySessionID, rsyncer_source: RsyncerSource) return {"success": True} +@router.post("/sessions/{session_id}/finalise_session") +def finalise_session(session_id: MurfeySessionID): + controllers[session_id].finalise() + return {"success": True} + + @router.post("/sessions/{session_id}/restart_rsyncer") def restart_rsyncer(session_id: MurfeySessionID, rsyncer_source: RsyncerSource): controllers[session_id]._restart_rsyncer(rsyncer_source.source) diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 197d62dad..6a0dce4d2 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -353,6 +353,27 @@ async def finalise_rsyncer( return data +@router.post("/sessions/{session_id}/finalise_session") +async def finalise_session(session_id: MurfeySessionID, db=murfey_db): + data = {} + instrument_name = ( + db.exec(select(Session).where(Session.id == session_id)).one().instrument_name + ) + machine_config = get_machine_config(instrument_name=instrument_name)[ + instrument_name + ] + if machine_config.instrument_server_url: + async with aiohttp.ClientSession() as clientsession: + async with clientsession.post( + f"{machine_config.instrument_server_url}/sessions/{session_id}/finalise_session", + headers={ + "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" + }, + ) as resp: + data = await resp.json() + return data + + @router.post("/sessions/{session_id}/remove_rsyncer") async def remove_rsyncer( session_id: MurfeySessionID, rsyncer_source: RsyncerSource, db=murfey_db From 703458e47a3966ad439cb6fa5357f45471d87198 Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Tue, 25 Mar 2025 20:10:54 +0000 Subject: [PATCH 04/10] When adding new multigrid controllers check for dormant ones and remove them --- src/murfey/instrument_server/api.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index 30a3dcd32..4721380bb 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -144,6 +144,9 @@ def start_multigrid_watcher( if controllers.get(session_id) is not None: return {"success": True} label = watcher_spec.label + for sid, controller in controllers.items(): + if controller.dormant: + del controllers[sid] controllers[session_id] = MultigridController( [], watcher_spec.visit, From a9c44ae2849bd422d9efde97609f8b82ce57af78 Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Tue, 25 Mar 2025 20:39:54 +0000 Subject: [PATCH 05/10] A few missing bits of logic --- src/murfey/client/multigrid_control.py | 1 + src/murfey/instrument_server/api.py | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index 00868aa7c..2322eead7 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -99,6 +99,7 @@ def __post_init__(self): def _multigrid_watcher_finalised(self): self.multigrid_watcher_active = False + self.dormancy_check() def dormancy_check(self): if not self.multigrid_watcher_active: diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index 4721380bb..c0a88da20 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -179,7 +179,9 @@ def start_multigrid_watcher( destination_overrides=watcher_spec.destination_overrides, ) ) - watchers[session_id].subscribe(controllers[session_id]._multigrid_watcher_finalised) + watchers[session_id].subscribe( + controllers[session_id]._multigrid_watcher_finalised, final=True + ) watchers[session_id].start() return {"success": True} @@ -219,6 +221,7 @@ def finalise_rsyncer(session_id: MurfeySessionID, rsyncer_source: RsyncerSource) @router.post("/sessions/{session_id}/finalise_session") def finalise_session(session_id: MurfeySessionID): + watchers[session_id].request_stop() controllers[session_id].finalise() return {"success": True} From e6c00f3b6b72bdc5341be7f67a4a24cc80982a1d Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Thu, 27 Mar 2025 10:22:39 +0000 Subject: [PATCH 06/10] Delete session after removing dormant multigrid controller --- src/murfey/instrument_server/api.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index c0a88da20..801141ffe 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -147,6 +147,10 @@ def start_multigrid_watcher( for sid, controller in controllers.items(): if controller.dormant: del controllers[sid] + requests.delete( + f"{_get_murfey_url()}/sessions/{sid}", + headers={"Authorization": f"Bearer {tokens[sid]}"}, + ) controllers[session_id] = MultigridController( [], watcher_spec.visit, From 149c0f0f609dda9f8b810d4c4ff8290580e1d8a3 Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Thu, 27 Mar 2025 10:40:27 +0000 Subject: [PATCH 07/10] sanitisation check --- src/murfey/instrument_server/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index 801141ffe..5c79cc51d 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -148,7 +148,7 @@ def start_multigrid_watcher( if controller.dormant: del controllers[sid] requests.delete( - f"{_get_murfey_url()}/sessions/{sid}", + f"{_get_murfey_url()}/sessions/{sanitise_nonpath(str(sid))}", headers={"Authorization": f"Bearer {tokens[sid]}"}, ) controllers[session_id] = MultigridController( From c4692110488a1457a3f9de9b56bde463f4362746 Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Thu, 27 Mar 2025 11:23:39 +0000 Subject: [PATCH 08/10] More sanitising --- src/murfey/server/api/instrument.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 6a0dce4d2..726db94a3 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -21,7 +21,7 @@ validate_token, ) from murfey.server.murfey_db import murfey_db -from murfey.util import secure_path +from murfey.util import sanitise_nonpath, secure_path from murfey.util.config import get_machine_config from murfey.util.db import Session, SessionProcessingParameters from murfey.util.models import File, MultigridWatcherSetup @@ -365,7 +365,7 @@ async def finalise_session(session_id: MurfeySessionID, db=murfey_db): if machine_config.instrument_server_url: async with aiohttp.ClientSession() as clientsession: async with clientsession.post( - f"{machine_config.instrument_server_url}/sessions/{session_id}/finalise_session", + f"{machine_config.instrument_server_url}/sessions/{sanitise_nonpath(str(session_id))}/finalise_session", headers={ "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" }, From 6032dde7d519bf2030b4d72950f6618a583a71ee Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Thu, 27 Mar 2025 17:10:18 +0000 Subject: [PATCH 09/10] Move session deletion to the point where the multigrid controller becomes dormant and make it asynchronous --- pyproject.toml | 1 + src/murfey/client/multigrid_control.py | 11 ++++++++++- src/murfey/instrument_server/api.py | 4 ---- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 505912a2e..58a6807c2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,6 +53,7 @@ developer = [ "pytest", # Test code functionality ] instrument-server = [ + "aiohttp", "fastapi[standard]", "python-jose[cryptography]", "uvicorn[standard]", diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index 2322eead7..a52bb0430 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -9,6 +9,7 @@ from typing import Dict, List, Optional from urllib.parse import urlparse +import aiohttp import requests import murfey.client.websocket @@ -101,7 +102,7 @@ def _multigrid_watcher_finalised(self): self.multigrid_watcher_active = False self.dormancy_check() - def dormancy_check(self): + async def dormancy_check(self): if not self.multigrid_watcher_active: if ( all(r._finalised for r in self.rsync_processes.values()) @@ -110,6 +111,14 @@ def dormancy_check(self): w.thread.is_alive() for w in self._environment.watchers.values() ) ): + async with aiohttp.ClientSession() as clientsession: + async with clientsession.delete( + f"{self._environment.url.geturl()}/sessions/{self.session_id}", + json={"access_token": self.token, "token_type": "bearer"}, + ) as response: + success = response.status == 200 + if not success: + log.warning(f"Could not delete database data for {self.session_id}") self.dormant = True def finalise(self): diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index 5c79cc51d..c0a88da20 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -147,10 +147,6 @@ def start_multigrid_watcher( for sid, controller in controllers.items(): if controller.dormant: del controllers[sid] - requests.delete( - f"{_get_murfey_url()}/sessions/{sanitise_nonpath(str(sid))}", - headers={"Authorization": f"Bearer {tokens[sid]}"}, - ) controllers[session_id] = MultigridController( [], watcher_spec.visit, From 3c44eb97c4b967e5b8b8565f4c1819e94715ebf8 Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Mon, 31 Mar 2025 15:34:39 +0100 Subject: [PATCH 10/10] Drop unnecessary sanitisation --- src/murfey/server/api/instrument.py | 4 ++-- src/murfey/util/state.py | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 726db94a3..6a0dce4d2 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -21,7 +21,7 @@ validate_token, ) from murfey.server.murfey_db import murfey_db -from murfey.util import sanitise_nonpath, secure_path +from murfey.util import secure_path from murfey.util.config import get_machine_config from murfey.util.db import Session, SessionProcessingParameters from murfey.util.models import File, MultigridWatcherSetup @@ -365,7 +365,7 @@ async def finalise_session(session_id: MurfeySessionID, db=murfey_db): if machine_config.instrument_server_url: async with aiohttp.ClientSession() as clientsession: async with clientsession.post( - f"{machine_config.instrument_server_url}/sessions/{sanitise_nonpath(str(session_id))}/finalise_session", + f"{machine_config.instrument_server_url}/sessions/{session_id}/finalise_session", headers={ "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" }, diff --git a/src/murfey/util/state.py b/src/murfey/util/state.py index 007e02674..10f02299c 100644 --- a/src/murfey/util/state.py +++ b/src/murfey/util/state.py @@ -69,9 +69,12 @@ def subscribe( self, fn: Callable[[str, T | None], Awaitable[None] | None], secondary: bool = False, + final: bool = False, ): if secondary: self._secondary_listeners.append(fn) + elif final: + self._final_listeners.append(fn) else: self._listeners.append(fn)