From c1c7b45c0e017a34824e354fb9dd1d105fc8305a Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 10 Apr 2025 09:39:48 +0100 Subject: [PATCH 1/2] Add information about the analysers into the rsyncer api calls --- src/murfey/instrument_server/api.py | 16 +++++++++ src/murfey/server/api/instrument.py | 50 +++++++++++++++++++++++------ 2 files changed, 56 insertions(+), 10 deletions(-) diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index cb67ea25f..7fdd16deb 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -263,6 +263,22 @@ def get_rsyncer_info(session_id: MurfeySessionID) -> list[RSyncerInfo]: return info +@router.get("/sessions/{session_id}/analyser_info") +def get_analyser_info(session_id: MurfeySessionID) -> list[RSyncerInfo]: + info = [] + for k, v in controllers[session_id].analysers.items(): + info.append( + RSyncerInfo( + source=str(k), + num_files_transferred=0, + num_files_in_queue=v.queue.qsize(), + alive=v.thread.is_alive(), + stopping=v._stopping, + ) + ) + return info + + class ProcessingParameters(BaseModel): gain_ref: str dose_per_frame: Optional[float] = None diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index e431e1394..5c0bb1a3c 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -455,8 +455,11 @@ class RSyncerInfo(BaseModel): source: str num_files_transferred: int num_files_in_queue: int + num_files_to_analyse: int alive: bool stopping: bool + analyser_alive: bool + analyser_stopping: bool destination: str tag: str files_transferred: int @@ -469,7 +472,8 @@ class RSyncerInfo(BaseModel): async def get_rsyncer_info( instrument_name: str, session_id: MurfeySessionID, db=murfey_db ) -> List[RSyncerInfo]: - data = [] + rsyncer_data = [] + analyser_data = [] machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] @@ -486,27 +490,53 @@ async def get_rsyncer_info( headers={"Authorization": f"Bearer {token}"}, ) as resp: if resp.status == 200: - data = await resp.json() + rsyncer_data = await resp.json() else: - data = [] + rsyncer_data = [] except KeyError: - data = [] + rsyncer_data = [] except Exception: log.warning( "Exception encountered gathering rsyncer info from the instrument server", exc_info=True, ) + + try: + async with lock: + token = instrument_server_tokens[session_id]["access_token"] + async with aiohttp.ClientSession() as clientsession: + async with clientsession.get( + f"{machine_config.instrument_server_url}/sessions/{session_id}/analyser_info", + headers={"Authorization": f"Bearer {token}"}, + ) as resp: + if resp.status == 200: + analyser_data = await resp.json() + else: + analyser_data = [] + except KeyError: + analyser_data = [] + except Exception: + log.warning( + "Exception encountered gathering analyser info from the instrument server", + exc_info=True, + ) + combined_data = [] - data_source_lookup = {d["source"]: d for d in data} + rsyncer_source_lookup = {d["source"]: d for d in rsyncer_data} + analyser_source_lookup = {d["source"]: d for d in analyser_data} for ri in rsync_instances: - d = data_source_lookup.get(ri.source, {}) + rsync_inst = rsyncer_source_lookup.get(ri.source, {}) + analyser_inst = analyser_source_lookup.get(ri.source, {}) combined_data.append( RSyncerInfo( source=ri.source, - num_files_transferred=d.get("num_files_transferred", 0), - num_files_in_queue=d.get("num_files_in_queue", 0), - alive=d.get("alive", False), - stopping=d.get("stopping", True), + num_files_transferred=rsync_inst.get("num_files_transferred", 0), + num_files_in_queue=rsync_inst.get("num_files_in_queue", 0), + num_files_to_analyse=analyser_inst.get("num_files_in_queue", 0), + alive=rsync_inst.get("alive", False), + stopping=rsync_inst.get("stopping", True), + analyser_alive=analyser_inst.get("alive", False), + analyser_stopping=analyser_inst.get("stopping", True), destination=ri.destination, tag=ri.tag, files_transferred=ri.files_transferred, From 94a845f7e47a50118c913075463dd9d2a121466d Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 10 Apr 2025 10:30:15 +0100 Subject: [PATCH 2/2] Cleaner naming --- src/murfey/instrument_server/api.py | 10 ++++---- src/murfey/server/api/instrument.py | 38 ++++++++++++++--------------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index 7fdd16deb..d2f5c1834 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -239,7 +239,7 @@ def restart_rsyncer(session_id: MurfeySessionID, rsyncer_source: RsyncerSource): return {"success": True} -class RSyncerInfo(BaseModel): +class ObserverInfo(BaseModel): source: str num_files_transferred: int num_files_in_queue: int @@ -248,11 +248,11 @@ class RSyncerInfo(BaseModel): @router.get("/sessions/{session_id}/rsyncer_info") -def get_rsyncer_info(session_id: MurfeySessionID) -> list[RSyncerInfo]: +def get_rsyncer_info(session_id: MurfeySessionID) -> list[ObserverInfo]: info = [] for k, v in controllers[session_id].rsync_processes.items(): info.append( - RSyncerInfo( + ObserverInfo( source=str(k), num_files_transferred=v._files_transferred, num_files_in_queue=v.queue.qsize(), @@ -264,11 +264,11 @@ def get_rsyncer_info(session_id: MurfeySessionID) -> list[RSyncerInfo]: @router.get("/sessions/{session_id}/analyser_info") -def get_analyser_info(session_id: MurfeySessionID) -> list[RSyncerInfo]: +def get_analyser_info(session_id: MurfeySessionID) -> list[ObserverInfo]: info = [] for k, v in controllers[session_id].analysers.items(): info.append( - RSyncerInfo( + ObserverInfo( source=str(k), num_files_transferred=0, num_files_in_queue=v.queue.qsize(), diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 5c0bb1a3c..2001c04b6 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -472,8 +472,8 @@ class RSyncerInfo(BaseModel): async def get_rsyncer_info( instrument_name: str, session_id: MurfeySessionID, db=murfey_db ) -> List[RSyncerInfo]: - rsyncer_data = [] - analyser_data = [] + rsyncer_list = [] + analyser_list = [] machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] @@ -490,11 +490,11 @@ async def get_rsyncer_info( headers={"Authorization": f"Bearer {token}"}, ) as resp: if resp.status == 200: - rsyncer_data = await resp.json() + rsyncer_list = await resp.json() else: - rsyncer_data = [] + rsyncer_list = [] except KeyError: - rsyncer_data = [] + rsyncer_list = [] except Exception: log.warning( "Exception encountered gathering rsyncer info from the instrument server", @@ -510,11 +510,11 @@ async def get_rsyncer_info( headers={"Authorization": f"Bearer {token}"}, ) as resp: if resp.status == 200: - analyser_data = await resp.json() + analyser_list = await resp.json() else: - analyser_data = [] + analyser_list = [] except KeyError: - analyser_data = [] + analyser_list = [] except Exception: log.warning( "Exception encountered gathering analyser info from the instrument server", @@ -522,21 +522,21 @@ async def get_rsyncer_info( ) combined_data = [] - rsyncer_source_lookup = {d["source"]: d for d in rsyncer_data} - analyser_source_lookup = {d["source"]: d for d in analyser_data} + rsyncer_source_lookup = {d["source"]: d for d in rsyncer_list} + analyser_source_lookup = {d["source"]: d for d in analyser_list} for ri in rsync_instances: - rsync_inst = rsyncer_source_lookup.get(ri.source, {}) - analyser_inst = analyser_source_lookup.get(ri.source, {}) + rsync_data = rsyncer_source_lookup.get(ri.source, {}) + analyser_data = analyser_source_lookup.get(ri.source, {}) combined_data.append( RSyncerInfo( source=ri.source, - num_files_transferred=rsync_inst.get("num_files_transferred", 0), - num_files_in_queue=rsync_inst.get("num_files_in_queue", 0), - num_files_to_analyse=analyser_inst.get("num_files_in_queue", 0), - alive=rsync_inst.get("alive", False), - stopping=rsync_inst.get("stopping", True), - analyser_alive=analyser_inst.get("alive", False), - analyser_stopping=analyser_inst.get("stopping", True), + num_files_transferred=rsync_data.get("num_files_transferred", 0), + num_files_in_queue=rsync_data.get("num_files_in_queue", 0), + num_files_to_analyse=analyser_data.get("num_files_in_queue", 0), + alive=rsync_data.get("alive", False), + stopping=rsync_data.get("stopping", True), + analyser_alive=analyser_data.get("alive", False), + analyser_stopping=analyser_data.get("stopping", True), destination=ri.destination, tag=ri.tag, files_transferred=ri.files_transferred,