From c73edfdd3cbedab72540f116d682971e3c3fb777 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 7 Jul 2025 14:20:37 +0100 Subject: [PATCH 1/6] Removed 'label' field from the 'RsyncerSource' Pydantic models, and removed the key from requests to endpoints that use that model --- src/murfey/instrument_server/api.py | 1 - src/murfey/server/api/instrument.py | 5 ----- 2 files changed, 6 deletions(-) diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index 3ba3a88e9..4a8a4b2bc 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -223,7 +223,6 @@ def update_multigrid_controller_visit_end_time( class RsyncerSource(BaseModel): source: Path - label: str @router.post("/sessions/{session_id}/stop_rsyncer") diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 970f963c5..27a7f9df4 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -340,7 +340,6 @@ async def stop_rsyncer( async with clientsession.post( f"{machine_config.instrument_server_url}{url_path_for('api.router', 'stop_rsyncer', session_id=session_id)}", json={ - "label": session_id, "source": str(secure_path(Path(rsyncer_source.source))), }, headers={ @@ -367,7 +366,6 @@ async def finalise_rsyncer( async with clientsession.post( f"{machine_config.instrument_server_url}{url_path_for('api.router', 'finalise_rsyncer', session_id=session_id)}", json={ - "label": session_id, "source": str(secure_path(Path(rsyncer_source.source))), }, headers={ @@ -467,7 +465,6 @@ async def remove_rsyncer( async with clientsession.post( f"{machine_config.instrument_server_url}{url_path_for('api.router', 'remove_rsyncer', session_id=session_id)}", json={ - "label": session_id, "source": str(secure_path(Path(rsyncer_source.source))), }, headers={ @@ -495,7 +492,6 @@ async def restart_rsyncer( async with clientsession.post( f"{machine_config.instrument_server_url}{url_path_for('api.router', 'restart_rsyncer', session_id=session_id)}", json={ - "label": session_id, "source": str(secure_path(Path(rsyncer_source.source))), }, headers={ @@ -523,7 +519,6 @@ async def flush_skipped_rsyncer( async with clientsession.post( f"{machine_config.instrument_server_url}{url_path_for('api.router', 'flush_skipped_rsyncer', session_id=session_id)}", json={ - "label": session_id, "source": str(secure_path(Path(rsyncer_source.source))), }, headers={ From cf342512e00cd8fe9edbba3651bc0a95e0ab4185 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 7 Jul 2025 14:44:17 +0100 Subject: [PATCH 2/6] Only display error message about no rsync processes being run if files were scheduled for transfer; this should skip the case of an empty list --- src/murfey/client/rsync.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/murfey/client/rsync.py b/src/murfey/client/rsync.py index fa229f132..b77659a4e 100644 --- a/src/murfey/client/rsync.py +++ b/src/murfey/client/rsync.py @@ -561,7 +561,9 @@ def parse_stderr(line: str): success = False if result is None: - logger.error(f"No rsync process ran for files: {files}") + # Only log this as an error if files were scheduled for transfer + if files: + logger.error(f"No rsync process ran for files: {files}") else: logger.log( logging.WARNING if result.returncode else logging.DEBUG, From 2cfe735538bafa1335d788ef6c4965f7b1bb4f28 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 7 Jul 2025 17:43:24 +0100 Subject: [PATCH 3/6] Update visit end time information in rsyncer and database when triggering a flush, but only if the end time is greater than what is currently stored in the database --- src/murfey/server/api/instrument.py | 37 ++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 27a7f9df4..e4acae82c 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -3,9 +3,9 @@ import asyncio import datetime import logging -import urllib from pathlib import Path from typing import Annotated, List, Optional +from urllib.parse import quote import aiohttp from fastapi import APIRouter, Depends @@ -418,7 +418,7 @@ async def update_visit_end_time( if machine_config.instrument_server_url: async with aiohttp.ClientSession() as clientsession: async with clientsession.post( - f"{machine_config.instrument_server_url}{url_path_for('api.router', 'update_multigrid_controller_visit_end_time', session_id=session_id)}?end_time={urllib.parse.quote(end_time.isoformat())}", + f"{machine_config.instrument_server_url}{url_path_for('api.router', 'update_multigrid_controller_visit_end_time', session_id=session_id)}?end_time={quote(end_time.isoformat())}", headers={ "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" }, @@ -506,16 +506,41 @@ async def restart_rsyncer( async def flush_skipped_rsyncer( session_id: MurfeySessionID, rsyncer_source: RsyncerSource, db=murfey_db ): - data = {} - instrument_name = ( - db.exec(select(Session).where(Session.id == session_id)).one().instrument_name - ) + # Load data for session + session_entry = db.exec(select(Session).where(Session.id == session_id)).one() + instrument_name = session_entry.instrument_name + + # Define a new visit end time that's slightly ahead of current time + new_end_time = datetime.datetime.now().replace( + second=0, microsecond=0 + ) + datetime.timedelta(minutes=5) + # Update the stored visit end time if the new one exceeds it + if session_entry.visit_end_time: + if new_end_time > session_entry.visit_end_time: + session_entry.visit_end_time = new_end_time + db.add(session_entry) + db.commit() + + # Send request to flush rsyncer + data: dict = {} + update_result: dict = {} machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] if isinstance(session_id, int): if machine_config.instrument_server_url: async with aiohttp.ClientSession() as clientsession: + # Send request to instrument server to update multigrid controller + async with clientsession.post( + f"{machine_config.instrument_server_url}{url_path_for('api.router', 'update_multigrid_controller_visit_end_time', session_id=session_id)}?end_time={quote(session_entry.visit_end_time.isoformat())}", + headers={ + "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" + }, + ) as resp: + update_result = await resp.json() + if not update_result.get("success", False): + return {"success": False} + # Send request to flush the rsyncer async with clientsession.post( f"{machine_config.instrument_server_url}{url_path_for('api.router', 'flush_skipped_rsyncer', session_id=session_id)}", json={ From 7152a5e4c2263351afb627c5983cfc4b372b6b42 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 7 Jul 2025 17:53:16 +0100 Subject: [PATCH 4/6] The 'end_time' attribute of the RSyncer is modiifed by the instrument server endpoint, and not as part of the flush --- src/murfey/client/rsync.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/murfey/client/rsync.py b/src/murfey/client/rsync.py index b77659a4e..de97d96ee 100644 --- a/src/murfey/client/rsync.py +++ b/src/murfey/client/rsync.py @@ -220,7 +220,6 @@ def enqueue(self, file_path: Path): self.queue.put(absolute_path) def flush_skipped(self): - self._end_time = datetime.now() for f in self._skipped_files: self.queue.put(f) self._skipped_files = [] From 59a84c66970dbdeca3d5ae793126325092dc252b Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 7 Jul 2025 17:58:13 +0100 Subject: [PATCH 5/6] Stored the time offset between the server and the client as an attribute in the MultigridController; convert the received server timestamp into the client's time when updating visit end times --- src/murfey/client/multigrid_control.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index c3b24a050..8d1a660b5 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -104,14 +104,18 @@ def __post_init__(self): register_client=False, ) + # Calculate the time offset between the client and the server + current_time = datetime.now() + server_timestamp = requests.get( + f"{self.murfey_url}{url_path_for('session_control.router', 'get_current_timestamp')}" + ).json()["timestamp"] + self.server_time_offset = current_time - datetime.fromtimestamp( + server_timestamp + ) + + # Store the visit end time in the current device's equivalent time if self.visit_end_time: - current_time = datetime.now() - server_timestamp = requests.get( - f"{self.murfey_url}{url_path_for('session_control.router', 'get_current_timestamp')}" - ).json()["timestamp"] - self.visit_end_time += current_time - datetime.fromtimestamp( - server_timestamp - ) + self.visit_end_time += self.server_time_offset def _multigrid_watcher_finalised(self): self.multigrid_watcher_active = False @@ -153,9 +157,10 @@ def finalise(self): self._finalise_rsyncer(p) def update_visit_time(self, new_end_time: datetime): - self.visit_end_time = new_end_time + # Convert the received server timestamp into the local equivalent + self.visit_end_time = new_end_time + self.server_time_offset for rp in self.rsync_processes.values(): - rp._end_time = new_end_time + rp._end_time = self.visit_end_time def _start_rsyncer_multigrid( self, From e1a9cf7561fa2508027ce942a2b22a3ae72dfbf9 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 7 Jul 2025 18:15:22 +0100 Subject: [PATCH 6/6] Added a return message to the 'stop_multigrid_watcher' and 'update_multigrid_controller_visit_end_time' endpoints in the instrument server --- src/murfey/instrument_server/api.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index 4a8a4b2bc..afd890255 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -212,6 +212,7 @@ def start_multigrid_watcher(session_id: MurfeySessionID, process: bool = True): @router.delete("/sessions/{session_id}/multigrid_watcher/{label}") def stop_multigrid_watcher(session_id: MurfeySessionID, label: str): watchers[label].request_stop() + return {"success": True} @router.post("/sessions/{session_id}/multigrid_controller/visit_end_time") @@ -219,6 +220,7 @@ def update_multigrid_controller_visit_end_time( session_id: MurfeySessionID, end_time: datetime ): controllers[session_id].update_visit_time(end_time) + return {"success": True} class RsyncerSource(BaseModel):