diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index 91e969bff..c3b24a050 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -152,6 +152,11 @@ def finalise(self): for p in self.rsync_processes.keys(): self._finalise_rsyncer(p) + def update_visit_time(self, new_end_time: datetime): + self.visit_end_time = new_end_time + for rp in self.rsync_processes.values(): + rp._end_time = new_end_time + def _start_rsyncer_multigrid( self, source: Path, diff --git a/src/murfey/client/rsync.py b/src/murfey/client/rsync.py index 2ff0133d8..fa229f132 100644 --- a/src/murfey/client/rsync.py +++ b/src/murfey/client/rsync.py @@ -220,8 +220,10 @@ def enqueue(self, file_path: Path): self.queue.put(absolute_path) def flush_skipped(self): + self._end_time = datetime.now() for f in self._skipped_files: self.queue.put(f) + self._skipped_files = [] def _process(self): logger.info("RSync thread starting") diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index 8fcd46756..05b4d228b 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -214,6 +214,13 @@ def stop_multigrid_watcher(session_id: MurfeySessionID, label: str): watchers[label].request_stop() +@router.post("/sessions/{session_id}/multigrid_controller/visit_end_time") +def update_multigrid_controller_visit_end_time( + session_id: MurfeySessionID, end_time: datetime +): + controllers[session_id].update_visit_time(end_time) + + class RsyncerSource(BaseModel): source: Path label: str @@ -261,6 +268,12 @@ def restart_rsyncer(session_id: MurfeySessionID, rsyncer_source: RsyncerSource): return {"success": True} +@router.post("/sessions/{session_id}/flush_skipped_rsyncer") +def flush_skipped_rsyncer(session_id: MurfeySessionID, rsyncer_source: RsyncerSource): + controllers[session_id].rsync_processes[rsyncer_source.source].flush_skipped() + return {"success": True} + + class ObserverInfo(BaseModel): source: str num_files_transferred: int diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 1f8bd32d2..10a6244d3 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -3,6 +3,7 @@ import asyncio import datetime import logging +import urllib from pathlib import Path from typing import Annotated, List, Optional @@ -126,7 +127,7 @@ async def setup_multigrid_watcher( str(k): v for k, v in watcher_spec.destination_overrides.items() }, "rsync_restarts": watcher_spec.rsync_restarts, - "visit_end_time": session.visit_end_time, + "visit_end_time": str(session.visit_end_time), }, headers={ "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" @@ -396,6 +397,36 @@ async def finalise_session(session_id: MurfeySessionID, db=murfey_db): return data +@router.post("/sessions/{session_id}/multigrid_controller/visit_end_time") +async def update_visit_end_time( + session_id: MurfeySessionID, end_time: datetime.datetime, db=murfey_db +): + # Load data for session + session_entry = db.exec(select(Session).where(Session.id == session_id)).one() + instrument_name = session_entry.instrument_name + + # Update visit end time in database + session_entry.visit_end_time = end_time + db.add(session_entry) + db.commit() + + # Update the multigrid controller + data = {} + machine_config = get_machine_config(instrument_name=instrument_name)[ + instrument_name + ] + if machine_config.instrument_server_url: + async with aiohttp.ClientSession() as clientsession: + async with clientsession.post( + f"{machine_config.instrument_server_url}{url_path_for('api.router', 'update_multigrid_controller_visit_end_time', session_id=session_id)}?end_time={urllib.parse.quote(end_time.isoformat())}", + headers={ + "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" + }, + ) as resp: + data = await resp.json() + return data + + @router.post("/sessions/{session_id}/abandon_session") async def abandon_session(session_id: MurfeySessionID, db=murfey_db): data = {} @@ -473,6 +504,34 @@ async def restart_rsyncer( return data +@router.post("/sessions/{session_id}/flush_skipped_rsyncer") +async def flush_skipped_rsyncer( + session_id: MurfeySessionID, rsyncer_source: RsyncerSource, db=murfey_db +): + data = {} + instrument_name = ( + db.exec(select(Session).where(Session.id == session_id)).one().instrument_name + ) + machine_config = get_machine_config(instrument_name=instrument_name)[ + instrument_name + ] + if isinstance(session_id, int): + if machine_config.instrument_server_url: + async with aiohttp.ClientSession() as clientsession: + async with clientsession.post( + f"{machine_config.instrument_server_url}{url_path_for('api.router', 'flush_skipped_rsyncer', session_id=session_id)}", + json={ + "label": session_id, + "source": str(secure_path(Path(rsyncer_source.source))), + }, + headers={ + "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" + }, + ) as resp: + data = await resp.json() + return data + + class RSyncerInfo(BaseModel): source: str num_files_transferred: int diff --git a/src/murfey/util/route_manifest.yaml b/src/murfey/util/route_manifest.yaml index ae187f251..d4bedb42e 100644 --- a/src/murfey/util/route_manifest.yaml +++ b/src/murfey/util/route_manifest.yaml @@ -38,6 +38,11 @@ murfey.instrument_server.api.router: type: str methods: - DELETE + - path: /sessions/{session_id}/multigrid_controller/visit_end_time + function: update_multigrid_controller_visit_end_time + path_params: [] + methods: + - POST - path: /sessions/{session_id}/stop_rsyncer function: stop_rsyncer path_params: [] @@ -68,6 +73,11 @@ murfey.instrument_server.api.router: path_params: [] methods: - POST + - path: /sessions/{session_id}/flush_skipped_rsyncer + function: flush_skipped_rsyncer + path_params: [] + methods: + - POST - path: /sessions/{session_id}/rsyncer_info function: get_rsyncer_info path_params: [] @@ -535,6 +545,11 @@ murfey.server.api.instrument.router: path_params: [] methods: - POST + - path: /instrument_server/sessions/{session_id}/multigrid_controller/visit_end_time + function: update_visit_end_time + path_params: [] + methods: + - POST - path: /instrument_server/sessions/{session_id}/abandon_session function: abandon_session path_params: [] @@ -550,6 +565,11 @@ murfey.server.api.instrument.router: path_params: [] methods: - POST + - path: /instrument_server/sessions/{session_id}/flush_skipped_rsyncer + function: flush_skipped_rsyncer + path_params: [] + methods: + - POST - path: /instrument_server/instruments/{instrument_name}/sessions/{session_id}/rsyncer_info function: get_rsyncer_info path_params: @@ -821,11 +841,6 @@ murfey.server.api.session_info.correlative_router: methods: - GET murfey.server.api.session_info.router: - - path: /session_info/ - function: root - path_params: [] - methods: - - GET - path: /session_info/health/ function: health_check path_params: []