diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index 9503422d7..be9cdc80d 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -219,6 +219,30 @@ def restart_rsyncer(session_id: MurfeySessionID, rsyncer_source: RsyncerSource): return {"success": True} +class RSyncerInfo(BaseModel): + source: str + num_files_transferred: int + num_files_in_queue: int + alive: bool + stopping: bool + + +@router.get("/sessions/{session_id}/rsyncer_info") +def get_rsyncer_info(session_id: MurfeySessionID) -> list[RSyncerInfo]: + info = [] + for k, v in controllers[session_id].rsync_processes.items(): + info.append( + RSyncerInfo( + source=str(k), + num_files_transferred=v._files_transferred, + num_files_in_queue=v.queue.qsize(), + alive=v.thread.is_alive(), + stopping=v._stopping, + ) + ) + return info + + class ProcessingParameters(BaseModel): gain_ref: str dose_per_frame: Optional[float] = None diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 197d62dad..2757413a4 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -23,7 +23,7 @@ from murfey.server.murfey_db import murfey_db from murfey.util import secure_path from murfey.util.config import get_machine_config -from murfey.util.db import Session, SessionProcessingParameters +from murfey.util.db import RsyncInstance, Session, SessionProcessingParameters from murfey.util.models import File, MultigridWatcherSetup # Create APIRouter class object @@ -407,3 +407,67 @@ async def restart_rsyncer( ) as resp: data = await resp.json() return data + + +class RSyncerInfo(BaseModel): + source: str + num_files_transferred: int + num_files_in_queue: int + alive: bool + stopping: bool + destination: str + tag: str + files_transferred: int + files_counted: int + transferring: bool + session_id: int + + +@router.get("/instruments/{instrument_name}/sessions/{session_id}/rsyncer_info") +async def get_rsyncer_info( + instrument_name: str, session_id: MurfeySessionID, db=murfey_db +) -> List[RSyncerInfo]: + data = [] + machine_config = get_machine_config(instrument_name=instrument_name)[ + instrument_name + ] + rsync_instances = db.exec( + select(RsyncInstance).where(RsyncInstance.session_id == session_id) + ).all() + if machine_config.instrument_server_url: + try: + async with lock: + token = instrument_server_tokens[session_id]["access_token"] + async with aiohttp.ClientSession() as clientsession: + async with clientsession.get( + f"{machine_config.instrument_server_url}/sessions/{session_id}/rsyncer_info", + headers={"Authorization": f"Bearer {token}"}, + ) as resp: + data = await resp.json() + except KeyError: + data = [] + except Exception: + log.warning( + "Exception encountered gathering rsyncer info from the instrument server", + exc_info=True, + ) + combined_data = [] + data_source_lookup = {d["source"]: d for d in data} + for ri in rsync_instances: + d = data_source_lookup.get(ri.source, {}) + combined_data.append( + RSyncerInfo( + source=ri.source, + num_files_transferred=d.get("num_files_transferred", 0), + num_files_in_queue=d.get("num_files_in_queue", 0), + alive=d.get("alive", False), + stopping=d.get("stopping", True), + destination=ri.destination, + tag=ri.tag, + files_transferred=ri.files_transferred, + files_counted=ri.files_counted, + transferring=ri.transferring, + session_id=session_id, + ) + ) + return combined_data