diff --git a/pyproject.toml b/pyproject.toml index 505912a2e..58a6807c2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,6 +53,7 @@ developer = [ "pytest", # Test code functionality ] instrument-server = [ + "aiohttp", "fastapi[standard]", "python-jose[cryptography]", "uvicorn[standard]", diff --git a/src/murfey/client/analyser.py b/src/murfey/client/analyser.py index ac188546f..877ed5026 100644 --- a/src/murfey/client/analyser.py +++ b/src/murfey/client/analyser.py @@ -404,6 +404,7 @@ def _analyse(self): ) self.post_transfer(transferred_file) self.queue.task_done() + self.notify(final=True) def _xml_file(self, data_file: Path) -> Path: if not self._environment: @@ -432,6 +433,10 @@ def start(self): logger.info(f"Analyser thread starting for {self}") self.thread.start() + def request_stop(self): + self._stopping = True + self._halt_thread = True + def stop(self): logger.debug("Analyser thread stop requested") self._stopping = True diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index 01ef1d635..a52bb0430 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -9,6 +9,7 @@ from typing import Dict, List, Optional from urllib.parse import urlparse +import aiohttp import requests import murfey.client.websocket @@ -34,6 +35,8 @@ class MultigridController: rsync_url: str = "" rsync_module: str = "data" demo: bool = False + dormant: bool = False + multigrid_watcher_active: bool = True processing_enabled: bool = True do_transfer: bool = True dummy_dc: bool = False @@ -95,6 +98,37 @@ def __post_init__(self): register_client=False, ) + def _multigrid_watcher_finalised(self): + self.multigrid_watcher_active = False + self.dormancy_check() + + async def dormancy_check(self): + if not self.multigrid_watcher_active: + if ( + all(r._finalised for r in self.rsync_processes.values()) + and not any(a.thread.is_alive() for a in self.analysers.values()) + and not any( + w.thread.is_alive() for w in self._environment.watchers.values() + ) + ): + async with aiohttp.ClientSession() as clientsession: + async with clientsession.delete( + f"{self._environment.url.geturl()}/sessions/{self.session_id}", + json={"access_token": self.token, "token_type": "bearer"}, + ) as response: + success = response.status == 200 + if not success: + log.warning(f"Could not delete database data for {self.session_id}") + self.dormant = True + + def finalise(self): + for a in self.analysers.values(): + a.request_stop() + for w in self._environment.watchers.values(): + w.request_stop() + for p in self.rsync_processes.keys(): + self._finalise_rsyncer(p) + def _start_rsyncer_multigrid( self, source: Path, @@ -165,7 +199,9 @@ def _rsyncer_stopped(self, source: Path, explicit_stop: bool = False): def _finalise_rsyncer(self, source: Path): finalise_thread = threading.Thread( name=f"Controller finaliser thread ({source})", - target=self.rsync_processes[source].finalise, + target=partial( + self.rsync_processes[source].finalise, callback=self.dormancy_check + ), kwargs={"thread": False}, daemon=True, ) @@ -297,6 +333,7 @@ def rsync_result(update: RSyncerUpdate): ) else: self.analysers[source].subscribe(self._data_collection_form) + self.analysers[source].subscribe(self.dormancy_check, final=True) self.analysers[source].start() if transfer: self.rsync_processes[source].subscribe(self.analysers[source].enqueue) @@ -336,6 +373,9 @@ def _rsync_update_converter(p: Path) -> None: ), secondary=True, ) + self._environment.watchers[source].subscribe( + self.dormancy_check, final=True + ) self._environment.watchers[source].start() def _data_collection_form(self, response: dict): diff --git a/src/murfey/client/rsync.py b/src/murfey/client/rsync.py index e3caee849..f53d40440 100644 --- a/src/murfey/client/rsync.py +++ b/src/murfey/client/rsync.py @@ -15,7 +15,7 @@ import time from enum import Enum from pathlib import Path -from typing import Callable, List, NamedTuple +from typing import Awaitable, Callable, List, NamedTuple from urllib.parse import ParseResult from murfey.client.tui.status_bar import StatusBar @@ -75,6 +75,7 @@ def __init__( self._local = local self._server_url = server_url self._notify = notify + self._finalised = False # Set rsync destination if local: @@ -181,7 +182,11 @@ def stop(self): self.thread.join() logger.debug("RSync thread stop completed") - def finalise(self, thread: bool = True): + def finalise( + self, + thread: bool = True, + callback: Callable[..., Awaitable[None] | None] | None = None, + ): self.stop() self._remove_files = True self._notify = False @@ -196,6 +201,9 @@ def finalise(self, thread: bool = True): self.stop() else: self._transfer(list(self._basepath.glob("**/*"))) + self._finalised = True + if callback: + callback() def enqueue(self, file_path: Path): if not self._stopping: diff --git a/src/murfey/client/watchdir.py b/src/murfey/client/watchdir.py index 5fcf30c29..57b5aac6d 100644 --- a/src/murfey/client/watchdir.py +++ b/src/murfey/client/watchdir.py @@ -63,6 +63,10 @@ def start(self): log.info(f"DirWatcher thread starting for {self}") self.thread.start() + def request_stop(self): + self._stopping = True + self._halt_thread = True + def stop(self): log.debug("DirWatcher thread stop requested") self._stopping = True @@ -90,6 +94,7 @@ def _process(self): modification_time=modification_time, transfer_all=self._transfer_all ) time.sleep(15) + self.notify(final=True) def scan(self, modification_time: float | None = None, transfer_all: bool = False): """ diff --git a/src/murfey/client/watchdir_multigrid.py b/src/murfey/client/watchdir_multigrid.py index 153473fc3..cf3a95c90 100644 --- a/src/murfey/client/watchdir_multigrid.py +++ b/src/murfey/client/watchdir_multigrid.py @@ -114,3 +114,5 @@ def _process(self): if first_loop: first_loop = False time.sleep(15) + + self.notify(final=True) diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index 9503422d7..c0a88da20 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -144,6 +144,9 @@ def start_multigrid_watcher( if controllers.get(session_id) is not None: return {"success": True} label = watcher_spec.label + for sid, controller in controllers.items(): + if controller.dormant: + del controllers[sid] controllers[session_id] = MultigridController( [], watcher_spec.visit, @@ -176,6 +179,9 @@ def start_multigrid_watcher( destination_overrides=watcher_spec.destination_overrides, ) ) + watchers[session_id].subscribe( + controllers[session_id]._multigrid_watcher_finalised, final=True + ) watchers[session_id].start() return {"success": True} @@ -213,6 +219,13 @@ def finalise_rsyncer(session_id: MurfeySessionID, rsyncer_source: RsyncerSource) return {"success": True} +@router.post("/sessions/{session_id}/finalise_session") +def finalise_session(session_id: MurfeySessionID): + watchers[session_id].request_stop() + controllers[session_id].finalise() + return {"success": True} + + @router.post("/sessions/{session_id}/restart_rsyncer") def restart_rsyncer(session_id: MurfeySessionID, rsyncer_source: RsyncerSource): controllers[session_id]._restart_rsyncer(rsyncer_source.source) diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 197d62dad..6a0dce4d2 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -353,6 +353,27 @@ async def finalise_rsyncer( return data +@router.post("/sessions/{session_id}/finalise_session") +async def finalise_session(session_id: MurfeySessionID, db=murfey_db): + data = {} + instrument_name = ( + db.exec(select(Session).where(Session.id == session_id)).one().instrument_name + ) + machine_config = get_machine_config(instrument_name=instrument_name)[ + instrument_name + ] + if machine_config.instrument_server_url: + async with aiohttp.ClientSession() as clientsession: + async with clientsession.post( + f"{machine_config.instrument_server_url}/sessions/{session_id}/finalise_session", + headers={ + "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" + }, + ) as resp: + data = await resp.json() + return data + + @router.post("/sessions/{session_id}/remove_rsyncer") async def remove_rsyncer( session_id: MurfeySessionID, rsyncer_source: RsyncerSource, db=murfey_db diff --git a/src/murfey/util/__init__.py b/src/murfey/util/__init__.py index 450fa9858..ff500389b 100644 --- a/src/murfey/util/__init__.py +++ b/src/murfey/util/__init__.py @@ -228,19 +228,31 @@ class Observer: def __init__(self): self._listeners: list[Callable[..., Awaitable[None] | None]] = [] self._secondary_listeners: list[Callable[..., Awaitable[None] | None]] = [] + self._final_listeners: list[Callable[..., Awaitable[None] | None]] = [] super().__init__() def subscribe( - self, fn: Callable[..., Awaitable[None] | None], secondary: bool = False + self, + fn: Callable[..., Awaitable[None] | None], + secondary: bool = False, + final: bool = False, ): - if secondary: + if final: + self._final_listeners.append(fn) + elif secondary: self._secondary_listeners.append(fn) else: self._listeners.append(fn) - async def anotify(self, *args, secondary: bool = False, **kwargs) -> None: + async def anotify( + self, *args, secondary: bool = False, final: bool = False, **kwargs + ) -> None: awaitables: list[Awaitable] = [] - listeners = self._secondary_listeners if secondary else self._listeners + listeners = ( + self._secondary_listeners + if secondary + else self._final_listeners if final else self._listeners + ) for notify_function in listeners: result = notify_function(*args, **kwargs) if result is not None and inspect.isawaitable(result): @@ -253,9 +265,15 @@ async def _await_all(awaitables: list[Awaitable]): for awaitable in asyncio.as_completed(awaitables): await awaitable - def notify(self, *args, secondary: bool = False, **kwargs) -> None: + def notify( + self, *args, secondary: bool = False, final: bool = False, **kwargs + ) -> None: awaitables: list[Awaitable] = [] - listeners = self._secondary_listeners if secondary else self._listeners + listeners = ( + self._secondary_listeners + if secondary + else self._final_listeners if final else self._listeners + ) for notify_function in listeners: result = notify_function(*args, **kwargs) if result is not None and inspect.isawaitable(result): diff --git a/src/murfey/util/state.py b/src/murfey/util/state.py index 007e02674..10f02299c 100644 --- a/src/murfey/util/state.py +++ b/src/murfey/util/state.py @@ -69,9 +69,12 @@ def subscribe( self, fn: Callable[[str, T | None], Awaitable[None] | None], secondary: bool = False, + final: bool = False, ): if secondary: self._secondary_listeners.append(fn) + elif final: + self._final_listeners.append(fn) else: self._listeners.append(fn)