diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index c3b24a050..8d1a660b5 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -104,14 +104,18 @@ def __post_init__(self): register_client=False, ) + # Calculate the time offset between the client and the server + current_time = datetime.now() + server_timestamp = requests.get( + f"{self.murfey_url}{url_path_for('session_control.router', 'get_current_timestamp')}" + ).json()["timestamp"] + self.server_time_offset = current_time - datetime.fromtimestamp( + server_timestamp + ) + + # Store the visit end time in the current device's equivalent time if self.visit_end_time: - current_time = datetime.now() - server_timestamp = requests.get( - f"{self.murfey_url}{url_path_for('session_control.router', 'get_current_timestamp')}" - ).json()["timestamp"] - self.visit_end_time += current_time - datetime.fromtimestamp( - server_timestamp - ) + self.visit_end_time += self.server_time_offset def _multigrid_watcher_finalised(self): self.multigrid_watcher_active = False @@ -153,9 +157,10 @@ def finalise(self): self._finalise_rsyncer(p) def update_visit_time(self, new_end_time: datetime): - self.visit_end_time = new_end_time + # Convert the received server timestamp into the local equivalent + self.visit_end_time = new_end_time + self.server_time_offset for rp in self.rsync_processes.values(): - rp._end_time = new_end_time + rp._end_time = self.visit_end_time def _start_rsyncer_multigrid( self, diff --git a/src/murfey/client/rsync.py b/src/murfey/client/rsync.py index fa229f132..de97d96ee 100644 --- a/src/murfey/client/rsync.py +++ b/src/murfey/client/rsync.py @@ -220,7 +220,6 @@ def enqueue(self, file_path: Path): self.queue.put(absolute_path) def flush_skipped(self): - self._end_time = datetime.now() for f in self._skipped_files: self.queue.put(f) self._skipped_files = [] @@ -561,7 +560,9 @@ def parse_stderr(line: str): success = False if result is None: - logger.error(f"No rsync process ran for files: {files}") + # Only log this as an error if files were scheduled for transfer + if files: + logger.error(f"No rsync process ran for files: {files}") else: logger.log( logging.WARNING if result.returncode else logging.DEBUG, diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index 3ba3a88e9..afd890255 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -212,6 +212,7 @@ def start_multigrid_watcher(session_id: MurfeySessionID, process: bool = True): @router.delete("/sessions/{session_id}/multigrid_watcher/{label}") def stop_multigrid_watcher(session_id: MurfeySessionID, label: str): watchers[label].request_stop() + return {"success": True} @router.post("/sessions/{session_id}/multigrid_controller/visit_end_time") @@ -219,11 +220,11 @@ def update_multigrid_controller_visit_end_time( session_id: MurfeySessionID, end_time: datetime ): controllers[session_id].update_visit_time(end_time) + return {"success": True} class RsyncerSource(BaseModel): source: Path - label: str @router.post("/sessions/{session_id}/stop_rsyncer") diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 970f963c5..e4acae82c 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -3,9 +3,9 @@ import asyncio import datetime import logging -import urllib from pathlib import Path from typing import Annotated, List, Optional +from urllib.parse import quote import aiohttp from fastapi import APIRouter, Depends @@ -340,7 +340,6 @@ async def stop_rsyncer( async with clientsession.post( f"{machine_config.instrument_server_url}{url_path_for('api.router', 'stop_rsyncer', session_id=session_id)}", json={ - "label": session_id, "source": str(secure_path(Path(rsyncer_source.source))), }, headers={ @@ -367,7 +366,6 @@ async def finalise_rsyncer( async with clientsession.post( f"{machine_config.instrument_server_url}{url_path_for('api.router', 'finalise_rsyncer', session_id=session_id)}", json={ - "label": session_id, "source": str(secure_path(Path(rsyncer_source.source))), }, headers={ @@ -420,7 +418,7 @@ async def update_visit_end_time( if machine_config.instrument_server_url: async with aiohttp.ClientSession() as clientsession: async with clientsession.post( - f"{machine_config.instrument_server_url}{url_path_for('api.router', 'update_multigrid_controller_visit_end_time', session_id=session_id)}?end_time={urllib.parse.quote(end_time.isoformat())}", + f"{machine_config.instrument_server_url}{url_path_for('api.router', 'update_multigrid_controller_visit_end_time', session_id=session_id)}?end_time={quote(end_time.isoformat())}", headers={ "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" }, @@ -467,7 +465,6 @@ async def remove_rsyncer( async with clientsession.post( f"{machine_config.instrument_server_url}{url_path_for('api.router', 'remove_rsyncer', session_id=session_id)}", json={ - "label": session_id, "source": str(secure_path(Path(rsyncer_source.source))), }, headers={ @@ -495,7 +492,6 @@ async def restart_rsyncer( async with clientsession.post( f"{machine_config.instrument_server_url}{url_path_for('api.router', 'restart_rsyncer', session_id=session_id)}", json={ - "label": session_id, "source": str(secure_path(Path(rsyncer_source.source))), }, headers={ @@ -510,20 +506,44 @@ async def restart_rsyncer( async def flush_skipped_rsyncer( session_id: MurfeySessionID, rsyncer_source: RsyncerSource, db=murfey_db ): - data = {} - instrument_name = ( - db.exec(select(Session).where(Session.id == session_id)).one().instrument_name - ) + # Load data for session + session_entry = db.exec(select(Session).where(Session.id == session_id)).one() + instrument_name = session_entry.instrument_name + + # Define a new visit end time that's slightly ahead of current time + new_end_time = datetime.datetime.now().replace( + second=0, microsecond=0 + ) + datetime.timedelta(minutes=5) + # Update the stored visit end time if the new one exceeds it + if session_entry.visit_end_time: + if new_end_time > session_entry.visit_end_time: + session_entry.visit_end_time = new_end_time + db.add(session_entry) + db.commit() + + # Send request to flush rsyncer + data: dict = {} + update_result: dict = {} machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] if isinstance(session_id, int): if machine_config.instrument_server_url: async with aiohttp.ClientSession() as clientsession: + # Send request to instrument server to update multigrid controller + async with clientsession.post( + f"{machine_config.instrument_server_url}{url_path_for('api.router', 'update_multigrid_controller_visit_end_time', session_id=session_id)}?end_time={quote(session_entry.visit_end_time.isoformat())}", + headers={ + "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" + }, + ) as resp: + update_result = await resp.json() + if not update_result.get("success", False): + return {"success": False} + # Send request to flush the rsyncer async with clientsession.post( f"{machine_config.instrument_server_url}{url_path_for('api.router', 'flush_skipped_rsyncer', session_id=session_id)}", json={ - "label": session_id, "source": str(secure_path(Path(rsyncer_source.source))), }, headers={