diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index cb67ea25f..d2f5c1834 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -239,7 +239,7 @@ def restart_rsyncer(session_id: MurfeySessionID, rsyncer_source: RsyncerSource): return {"success": True} -class RSyncerInfo(BaseModel): +class ObserverInfo(BaseModel): source: str num_files_transferred: int num_files_in_queue: int @@ -248,11 +248,11 @@ class RSyncerInfo(BaseModel): @router.get("/sessions/{session_id}/rsyncer_info") -def get_rsyncer_info(session_id: MurfeySessionID) -> list[RSyncerInfo]: +def get_rsyncer_info(session_id: MurfeySessionID) -> list[ObserverInfo]: info = [] for k, v in controllers[session_id].rsync_processes.items(): info.append( - RSyncerInfo( + ObserverInfo( source=str(k), num_files_transferred=v._files_transferred, num_files_in_queue=v.queue.qsize(), @@ -263,6 +263,22 @@ def get_rsyncer_info(session_id: MurfeySessionID) -> list[RSyncerInfo]: return info +@router.get("/sessions/{session_id}/analyser_info") +def get_analyser_info(session_id: MurfeySessionID) -> list[ObserverInfo]: + info = [] + for k, v in controllers[session_id].analysers.items(): + info.append( + ObserverInfo( + source=str(k), + num_files_transferred=0, + num_files_in_queue=v.queue.qsize(), + alive=v.thread.is_alive(), + stopping=v._stopping, + ) + ) + return info + + class ProcessingParameters(BaseModel): gain_ref: str dose_per_frame: Optional[float] = None diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index e431e1394..2001c04b6 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -455,8 +455,11 @@ class RSyncerInfo(BaseModel): source: str num_files_transferred: int num_files_in_queue: int + num_files_to_analyse: int alive: bool stopping: bool + analyser_alive: bool + analyser_stopping: bool destination: str tag: str files_transferred: int @@ -469,7 +472,8 @@ class RSyncerInfo(BaseModel): async def get_rsyncer_info( instrument_name: str, session_id: MurfeySessionID, db=murfey_db ) -> List[RSyncerInfo]: - data = [] + rsyncer_list = [] + analyser_list = [] machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] @@ -486,27 +490,53 @@ async def get_rsyncer_info( headers={"Authorization": f"Bearer {token}"}, ) as resp: if resp.status == 200: - data = await resp.json() + rsyncer_list = await resp.json() else: - data = [] + rsyncer_list = [] except KeyError: - data = [] + rsyncer_list = [] except Exception: log.warning( "Exception encountered gathering rsyncer info from the instrument server", exc_info=True, ) + + try: + async with lock: + token = instrument_server_tokens[session_id]["access_token"] + async with aiohttp.ClientSession() as clientsession: + async with clientsession.get( + f"{machine_config.instrument_server_url}/sessions/{session_id}/analyser_info", + headers={"Authorization": f"Bearer {token}"}, + ) as resp: + if resp.status == 200: + analyser_list = await resp.json() + else: + analyser_list = [] + except KeyError: + analyser_list = [] + except Exception: + log.warning( + "Exception encountered gathering analyser info from the instrument server", + exc_info=True, + ) + combined_data = [] - data_source_lookup = {d["source"]: d for d in data} + rsyncer_source_lookup = {d["source"]: d for d in rsyncer_list} + analyser_source_lookup = {d["source"]: d for d in analyser_list} for ri in rsync_instances: - d = data_source_lookup.get(ri.source, {}) + rsync_data = rsyncer_source_lookup.get(ri.source, {}) + analyser_data = analyser_source_lookup.get(ri.source, {}) combined_data.append( RSyncerInfo( source=ri.source, - num_files_transferred=d.get("num_files_transferred", 0), - num_files_in_queue=d.get("num_files_in_queue", 0), - alive=d.get("alive", False), - stopping=d.get("stopping", True), + num_files_transferred=rsync_data.get("num_files_transferred", 0), + num_files_in_queue=rsync_data.get("num_files_in_queue", 0), + num_files_to_analyse=analyser_data.get("num_files_in_queue", 0), + alive=rsync_data.get("alive", False), + stopping=rsync_data.get("stopping", True), + analyser_alive=analyser_data.get("alive", False), + analyser_stopping=analyser_data.get("stopping", True), destination=ri.destination, tag=ri.tag, files_transferred=ri.files_transferred,