From bc448403f77e1911d5f15700dd998f30f1b6719f Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Tue, 25 Mar 2025 23:45:43 +0000 Subject: [PATCH 1/6] Endpoint for getting rsyncer info direct from the rsyncer --- src/murfey/instrument_server/api.py | 20 ++++++++++++++++++++ src/murfey/server/api/instrument.py | 26 ++++++++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index 9503422d7..9c1cf0955 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -219,6 +219,26 @@ def restart_rsyncer(session_id: MurfeySessionID, rsyncer_source: RsyncerSource): return {"success": True} +class RSyncerInfo(BaseModel): + source: str + num_files_transferred: int + num_files_in_queue: int + + +@router.get("/sessions/{session_id}/rsyncer_info") +def get_rsyncer_info(session_id: MurfeySessionID) -> list[RSyncerInfo]: + info = [] + for k, v in controllers[session_id].rsync_processes.items(): + info.append( + RSyncerInfo( + source=str(k), + num_files_transferred=v._files_transferred, + num_files_in_queue=v.queue.qsize(), + ) + ) + return info + + class ProcessingParameters(BaseModel): gain_ref: str dose_per_frame: Optional[float] = None diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 197d62dad..767cb8ef5 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -407,3 +407,29 @@ async def restart_rsyncer( ) as resp: data = await resp.json() return data + + +class RSyncerInfo(BaseModel): + source: str + num_files_transferred: int + num_files_in_queue: int + + +@router.get("/instruments/{instrument_name}/sessions/{session_id}/rsyncer_info") +async def get_rsyncer_info( + instrument_name: str, session_id: MurfeySessionID +) -> List[RSyncerInfo]: + data = [] + machine_config = get_machine_config(instrument_name=instrument_name)[ + instrument_name + ] + if machine_config.instrument_server_url: + async with lock: + token = instrument_server_tokens[session_id]["access_token"] + async with aiohttp.ClientSession() as clientsession: + async with clientsession.get( + f"{machine_config.instrument_server_url}/sessions/{sanitise(str(session_id))}/rsyncer_info", + headers={"Authorization": f"Bearer {token}"}, + ) as resp: + data = await resp.json() + return data From 95eab5b10417cfe4b9f01e9f45ee44114d3fe31f Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Thu, 27 Mar 2025 15:02:43 +0000 Subject: [PATCH 2/6] Add a couple of extra things to rsyncer info --- src/murfey/instrument_server/api.py | 4 ++++ src/murfey/server/api/instrument.py | 2 ++ 2 files changed, 6 insertions(+) diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index 9c1cf0955..be9cdc80d 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -223,6 +223,8 @@ class RSyncerInfo(BaseModel): source: str num_files_transferred: int num_files_in_queue: int + alive: bool + stopping: bool @router.get("/sessions/{session_id}/rsyncer_info") @@ -234,6 +236,8 @@ def get_rsyncer_info(session_id: MurfeySessionID) -> list[RSyncerInfo]: source=str(k), num_files_transferred=v._files_transferred, num_files_in_queue=v.queue.qsize(), + alive=v.thread.is_alive(), + stopping=v._stopping, ) ) return info diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 767cb8ef5..d991c4212 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -413,6 +413,8 @@ class RSyncerInfo(BaseModel): source: str num_files_transferred: int num_files_in_queue: int + alive: bool + stopping: bool @router.get("/instruments/{instrument_name}/sessions/{session_id}/rsyncer_info") From 8142442a76c71e44db107e883b1fc77d7c14944d Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Thu, 27 Mar 2025 15:29:45 +0000 Subject: [PATCH 3/6] Combine database and instrument server information about rsync instances --- src/murfey/server/api/instrument.py | 32 ++++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index d991c4212..9794df167 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -23,7 +23,7 @@ from murfey.server.murfey_db import murfey_db from murfey.util import secure_path from murfey.util.config import get_machine_config -from murfey.util.db import Session, SessionProcessingParameters +from murfey.util.db import RsyncInstance, Session, SessionProcessingParameters from murfey.util.models import File, MultigridWatcherSetup # Create APIRouter class object @@ -415,16 +415,24 @@ class RSyncerInfo(BaseModel): num_files_in_queue: int alive: bool stopping: bool + destination: str + tag: str + files_transferred: int + files_counted: int + transferring: bool @router.get("/instruments/{instrument_name}/sessions/{session_id}/rsyncer_info") async def get_rsyncer_info( - instrument_name: str, session_id: MurfeySessionID + instrument_name: str, session_id: MurfeySessionID, db=murfey_db ) -> List[RSyncerInfo]: data = [] machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] + rsync_instances = db.exec( + select(RsyncInstance).where(RsyncInstance.session_id == session_id) + ).all() if machine_config.instrument_server_url: async with lock: token = instrument_server_tokens[session_id]["access_token"] @@ -434,4 +442,22 @@ async def get_rsyncer_info( headers={"Authorization": f"Bearer {token}"}, ) as resp: data = await resp.json() - return data + combined_data = [] + data_source_lookup = {d.source: d for d in data} + for ri in rsync_instances: + d = data_source_lookup.get(ri.source, {}) + combined_data.append( + RSyncerInfo( + source=ri.source, + num_files_transferred=d.get("num_files_transferred", 0), + num_files_in_queue=d.get("num_files_in_queue", 0), + alive=d.get("alive", False), + stopping=d.get("stopping", True), + destination=ri.destination, + tag=ri.tag, + files_transferred=ri.files_transferred, + files_counted=ri.files_counted, + transferring=ri.transferring, + ) + ) + return combined_data From c3b5371b799cf91f16e7a7ff1f8653158a1f1430 Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Thu, 27 Mar 2025 16:49:02 +0000 Subject: [PATCH 4/6] Need to add session ID --- src/murfey/server/api/instrument.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 9794df167..f765c8335 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -420,6 +420,7 @@ class RSyncerInfo(BaseModel): files_transferred: int files_counted: int transferring: bool + session_id: int @router.get("/instruments/{instrument_name}/sessions/{session_id}/rsyncer_info") @@ -443,7 +444,7 @@ async def get_rsyncer_info( ) as resp: data = await resp.json() combined_data = [] - data_source_lookup = {d.source: d for d in data} + data_source_lookup = {d["source"]: d for d in data} for ri in rsync_instances: d = data_source_lookup.get(ri.source, {}) combined_data.append( @@ -458,6 +459,7 @@ async def get_rsyncer_info( files_transferred=ri.files_transferred, files_counted=ri.files_counted, transferring=ri.transferring, + session_id=session_id, ) ) return combined_data From 58129f9f3c3293abf6d37418ec4f4e7a2650d54e Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Thu, 27 Mar 2025 17:36:56 +0000 Subject: [PATCH 5/6] Catch case when not reconnected after instrument server goes down --- src/murfey/server/api/instrument.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index f765c8335..6bdcb3a79 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -435,14 +435,17 @@ async def get_rsyncer_info( select(RsyncInstance).where(RsyncInstance.session_id == session_id) ).all() if machine_config.instrument_server_url: - async with lock: - token = instrument_server_tokens[session_id]["access_token"] - async with aiohttp.ClientSession() as clientsession: - async with clientsession.get( - f"{machine_config.instrument_server_url}/sessions/{sanitise(str(session_id))}/rsyncer_info", - headers={"Authorization": f"Bearer {token}"}, - ) as resp: - data = await resp.json() + try: + async with lock: + token = instrument_server_tokens[session_id]["access_token"] + async with aiohttp.ClientSession() as clientsession: + async with clientsession.get( + f"{machine_config.instrument_server_url}/sessions/{sanitise(str(session_id))}/rsyncer_info", + headers={"Authorization": f"Bearer {token}"}, + ) as resp: + data = await resp.json() + except KeyError: + data = [] combined_data = [] data_source_lookup = {d["source"]: d for d in data} for ri in rsync_instances: From 202a244303f6f23450a55de0109a988ea4d19222 Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Fri, 28 Mar 2025 09:59:43 +0000 Subject: [PATCH 6/6] Address review --- src/murfey/server/api/instrument.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 6bdcb3a79..2757413a4 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -440,12 +440,17 @@ async def get_rsyncer_info( token = instrument_server_tokens[session_id]["access_token"] async with aiohttp.ClientSession() as clientsession: async with clientsession.get( - f"{machine_config.instrument_server_url}/sessions/{sanitise(str(session_id))}/rsyncer_info", + f"{machine_config.instrument_server_url}/sessions/{session_id}/rsyncer_info", headers={"Authorization": f"Bearer {token}"}, ) as resp: data = await resp.json() except KeyError: data = [] + except Exception: + log.warning( + "Exception encountered gathering rsyncer info from the instrument server", + exc_info=True, + ) combined_data = [] data_source_lookup = {d["source"]: d for d in data} for ri in rsync_instances: