From 794ae7f42b4b772e803445d3533b1bbf26160b3c Mon Sep 17 00:00:00 2001 From: Dan Hatton Date: Tue, 13 May 2025 19:55:36 +0100 Subject: [PATCH 1/8] Add option to only transfer files created before a set end time plus a grace period --- src/murfey/client/rsync.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/src/murfey/client/rsync.py b/src/murfey/client/rsync.py index ab2c267f6..2dbe205b9 100644 --- a/src/murfey/client/rsync.py +++ b/src/murfey/client/rsync.py @@ -13,6 +13,7 @@ import subprocess import threading import time +from datetime import datetime, timedelta from enum import Enum from pathlib import Path from typing import Awaitable, Callable, List, NamedTuple @@ -63,6 +64,8 @@ def __init__( remove_files: bool = False, required_substrings_for_removal: List[str] = [], notify: bool = True, + end_time: datetime | None = None, + grace_period: int = 0, ): super().__init__() self._basepath = basepath_local.absolute() @@ -76,6 +79,10 @@ def __init__( self._server_url = server_url self._notify = notify self._finalised = False + self._end_time = end_time + self._grace_period = grace_period + + self._skipped_files: List[Path] = [] # Set rsync destination if local: @@ -304,14 +311,25 @@ def _fake_transfer(self, files: list[Path]) -> bool: return True - def _transfer(self, files: list[Path]) -> bool: + def _transfer(self, infiles: list[Path]) -> bool: """ Transfer files via an rsync sub-process, and parses the rsync stdout to verify the success of the transfer. """ # Set up initial variables - files = [f for f in files if f.is_file()] + if self._end_time: + files = [ + f + for f in infiles + if f.is_file() + and f.stat().st_ctime + < (self._end_time + timedelta(seconds=self._grace_period)).timestamp() + ] + self._skipped_files.extend(set(infiles).difference(set(files))) + else: + files = [f for f in infiles if f.is_file()] + previously_transferred = self._files_transferred transfer_success: set[Path] = set() successful_updates: list[RSyncerUpdate] = [] From 7942a7f849819edc4a6c8808c3fdc733a0d617d3 Mon Sep 17 00:00:00 2001 From: Dan Hatton Date: Tue, 13 May 2025 20:18:10 +0100 Subject: [PATCH 2/8] API setup to pass visit end times to multigrid controller for use with rsyncers --- src/murfey/client/multigrid_control.py | 4 ++++ src/murfey/instrument_server/api.py | 2 ++ src/murfey/server/api/instrument.py | 7 ++++--- src/murfey/util/config.py | 2 ++ src/murfey/util/db.py | 2 ++ src/murfey/util/instrument_models.py | 5 ++++- 6 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index a9fc3309c..38e5983e2 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -48,6 +48,8 @@ class MultigridController: data_collection_parameters: dict = field(default_factory=lambda: {}) token: str = "" _machine_config: dict = field(default_factory=lambda: {}) + visit_end_time: Optional[datetime] = None + end_time_grace_period: int = 0 def __post_init__(self): if self.token: @@ -277,6 +279,8 @@ def _start_rsyncer( stop_callback=self._rsyncer_stopped, do_transfer=self.do_transfer, remove_files=remove_files, + end_time=self.visit_end_time, + grace_period=self.end_time_grace_period, ) def rsync_result(update: RSyncerUpdate): diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index 7c5ba7c59..abc1ec95d 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -160,6 +160,8 @@ def setup_multigrid_watcher( token=tokens.get(session_id, "token"), data_collection_parameters=data_collection_parameters.get(label, {}), rsync_restarts=watcher_spec.rsync_restarts, + visit_end_time=watcher_spec.visit_end_time, + end_time_grace_period=watcher_spec.grace_period, ) watcher_spec.source.mkdir(exist_ok=True) machine_config = requests.get( diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 33744ccf2..e8a9d7203 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -98,9 +98,8 @@ async def setup_multigrid_watcher( session_id: MurfeySessionID, watcher_spec: MultigridWatcherSetup, db=murfey_db ): data = {} - instrument_name = ( - db.exec(select(Session).where(Session.id == session_id)).one().instrument_name - ) + session = db.exec(select(Session).where(Session.id == session_id)).one() + instrument_name = session.instrument_name machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] @@ -130,6 +129,8 @@ async def setup_multigrid_watcher( str(k): v for k, v in watcher_spec.destination_overrides.items() }, "rsync_restarts": watcher_spec.rsync_restarts, + "visit_end_time": session.visit_end_time, + "grace_period": machine_config.grace_period, }, headers={ "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" diff --git a/src/murfey/util/config.py b/src/murfey/util/config.py index 95f3544be..b47ee1537 100644 --- a/src/murfey/util/config.py +++ b/src/murfey/util/config.py @@ -71,6 +71,8 @@ class MachineConfig(BaseModel, extra=Extra.allow): # type: ignore notifications_queue: str = "pato_notification" + grace_period: int = 0 + def from_file(config_file_path: Path, instrument: str = "") -> Dict[str, MachineConfig]: with open(config_file_path, "r") as config_stream: diff --git a/src/murfey/util/db.py b/src/murfey/util/db.py index c5c65b456..5334b6ad2 100644 --- a/src/murfey/util/db.py +++ b/src/murfey/util/db.py @@ -3,6 +3,7 @@ of the sessions that Murfey is overseeing, along with the relationships between them. """ +from datetime import datetime from typing import List, Optional import sqlalchemy @@ -48,6 +49,7 @@ class Session(SQLModel, table=True): # type: ignore started: bool = Field(default=False) current_gain_ref: str = Field(default="") instrument_name: str = Field(default="") + visit_end_time: Optional[datetime] = Field(default=None) # CLEM Workflow diff --git a/src/murfey/util/instrument_models.py b/src/murfey/util/instrument_models.py index 3da23c57c..775024e64 100644 --- a/src/murfey/util/instrument_models.py +++ b/src/murfey/util/instrument_models.py @@ -1,5 +1,6 @@ +from datetime import datetime from pathlib import Path -from typing import Dict, List +from typing import Dict, List, Optional from pydantic import BaseModel @@ -15,3 +16,5 @@ class MultigridWatcherSpec(BaseModel): skip_existing_processing: bool = False destination_overrides: Dict[Path, str] = {} rsync_restarts: List[str] = [] + visit_end_time: Optional[datetime] = None + grace_period: int = 0 From 5e1dbbd081dd035609d22d7fbba1b65c4fcbbd59 Mon Sep 17 00:00:00 2001 From: Dan Hatton Date: Tue, 13 May 2025 21:03:57 +0100 Subject: [PATCH 3/8] Shouldn't need a grace period, just include when setting the end time --- src/murfey/client/multigrid_control.py | 2 -- src/murfey/client/rsync.py | 8 ++------ src/murfey/server/api/instrument.py | 1 - src/murfey/util/config.py | 2 -- src/murfey/util/instrument_models.py | 1 - 5 files changed, 2 insertions(+), 12 deletions(-) diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index 38e5983e2..8ee50d212 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -49,7 +49,6 @@ class MultigridController: token: str = "" _machine_config: dict = field(default_factory=lambda: {}) visit_end_time: Optional[datetime] = None - end_time_grace_period: int = 0 def __post_init__(self): if self.token: @@ -280,7 +279,6 @@ def _start_rsyncer( do_transfer=self.do_transfer, remove_files=remove_files, end_time=self.visit_end_time, - grace_period=self.end_time_grace_period, ) def rsync_result(update: RSyncerUpdate): diff --git a/src/murfey/client/rsync.py b/src/murfey/client/rsync.py index 2dbe205b9..4fb5b6345 100644 --- a/src/murfey/client/rsync.py +++ b/src/murfey/client/rsync.py @@ -13,7 +13,7 @@ import subprocess import threading import time -from datetime import datetime, timedelta +from datetime import datetime from enum import Enum from pathlib import Path from typing import Awaitable, Callable, List, NamedTuple @@ -65,7 +65,6 @@ def __init__( required_substrings_for_removal: List[str] = [], notify: bool = True, end_time: datetime | None = None, - grace_period: int = 0, ): super().__init__() self._basepath = basepath_local.absolute() @@ -80,7 +79,6 @@ def __init__( self._notify = notify self._finalised = False self._end_time = end_time - self._grace_period = grace_period self._skipped_files: List[Path] = [] @@ -322,9 +320,7 @@ def _transfer(self, infiles: list[Path]) -> bool: files = [ f for f in infiles - if f.is_file() - and f.stat().st_ctime - < (self._end_time + timedelta(seconds=self._grace_period)).timestamp() + if f.is_file() and f.stat().st_ctime < self._end_time.timestamp() ] self._skipped_files.extend(set(infiles).difference(set(files))) else: diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index e8a9d7203..527409a4b 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -130,7 +130,6 @@ async def setup_multigrid_watcher( }, "rsync_restarts": watcher_spec.rsync_restarts, "visit_end_time": session.visit_end_time, - "grace_period": machine_config.grace_period, }, headers={ "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" diff --git a/src/murfey/util/config.py b/src/murfey/util/config.py index b47ee1537..95f3544be 100644 --- a/src/murfey/util/config.py +++ b/src/murfey/util/config.py @@ -71,8 +71,6 @@ class MachineConfig(BaseModel, extra=Extra.allow): # type: ignore notifications_queue: str = "pato_notification" - grace_period: int = 0 - def from_file(config_file_path: Path, instrument: str = "") -> Dict[str, MachineConfig]: with open(config_file_path, "r") as config_stream: diff --git a/src/murfey/util/instrument_models.py b/src/murfey/util/instrument_models.py index 775024e64..ee45b5468 100644 --- a/src/murfey/util/instrument_models.py +++ b/src/murfey/util/instrument_models.py @@ -17,4 +17,3 @@ class MultigridWatcherSpec(BaseModel): destination_overrides: Dict[Path, str] = {} rsync_restarts: List[str] = [] visit_end_time: Optional[datetime] = None - grace_period: int = 0 From 2f55ae617a9d4ae2c81cfcfc14cd26cfb2d7d0b6 Mon Sep 17 00:00:00 2001 From: Dan Hatton Date: Tue, 13 May 2025 21:44:51 +0100 Subject: [PATCH 4/8] Able to ad visit end time when creating session --- src/murfey/server/api/__init__.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/murfey/server/api/__init__.py b/src/murfey/server/api/__init__.py index a833118da..8c00e2338 100644 --- a/src/murfey/server/api/__init__.py +++ b/src/murfey/server/api/__init__.py @@ -1967,9 +1967,24 @@ async def get_tiff(visit_name: str, session_id: int, tiff_path: str, db=murfey_d return FileResponse(path=test_path) +class VisitEndTime(BaseModel): + end_time: Optional[datetime.datetime] = None + + @router.post("/instruments/{instrument_name}/visits/{visit}/session/{name}") -def create_session(instrument_name: str, visit: str, name: str, db=murfey_db) -> int: - s = Session(name=name, visit=visit, instrument_name=instrument_name) +def create_session( + instrument_name: str, + visit: str, + name: str, + visit_end_time: VisitEndTime, + db=murfey_db, +) -> int: + s = Session( + name=name, + visit=visit, + instrument_name=instrument_name, + visit_end_time=visit_end_time.end_time, + ) db.add(s) db.commit() sid = s.id From 02ccd47bffff32d55f6c020c311786dd06ccfb28 Mon Sep 17 00:00:00 2001 From: Dan Hatton Date: Tue, 13 May 2025 21:51:28 +0100 Subject: [PATCH 5/8] Add method to flush skipped files from an rsyncer --- src/murfey/client/rsync.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/murfey/client/rsync.py b/src/murfey/client/rsync.py index 4fb5b6345..eac9ceec3 100644 --- a/src/murfey/client/rsync.py +++ b/src/murfey/client/rsync.py @@ -219,6 +219,10 @@ def enqueue(self, file_path: Path): absolute_path = self._basepath / file_path self.queue.put(absolute_path) + def flush_skipped(self): + for f in self._skipped_files: + self.queue.put(f) + def _process(self): logger.info("RSync thread starting") files_to_transfer: list[Path] From f6a030b2dd543af7af2478ae99eb38abe12e22f1 Mon Sep 17 00:00:00 2001 From: Dan Hatton Date: Tue, 13 May 2025 22:33:21 +0100 Subject: [PATCH 6/8] Get number of skipped files when collecting rsyncer information --- src/murfey/instrument_server/api.py | 3 ++- src/murfey/server/api/instrument.py | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index abc1ec95d..14f1de9af 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -161,7 +161,6 @@ def setup_multigrid_watcher( data_collection_parameters=data_collection_parameters.get(label, {}), rsync_restarts=watcher_spec.rsync_restarts, visit_end_time=watcher_spec.visit_end_time, - end_time_grace_period=watcher_spec.grace_period, ) watcher_spec.source.mkdir(exist_ok=True) machine_config = requests.get( @@ -253,6 +252,7 @@ class ObserverInfo(BaseModel): num_files_in_queue: int alive: bool stopping: bool + num_files_skipped: int = 0 @router.get("/sessions/{session_id}/rsyncer_info") @@ -266,6 +266,7 @@ def get_rsyncer_info(session_id: MurfeySessionID) -> list[ObserverInfo]: num_files_in_queue=v.queue.qsize(), alive=v.thread.is_alive(), stopping=v._stopping, + num_files_skipped=len(v._skipped_files), ) ) return info diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 527409a4b..74798b190 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -487,6 +487,7 @@ class RSyncerInfo(BaseModel): files_counted: int transferring: bool session_id: int + num_files_skipped: int = 0 @router.get("/instruments/{instrument_name}/sessions/{session_id}/rsyncer_info") @@ -564,6 +565,7 @@ async def get_rsyncer_info( files_counted=ri.files_counted, transferring=ri.transferring, session_id=session_id, + num_files_skipped=rsync_data.get("num_files_skipped", 0), ) ) return combined_data From 85b4f38503a5b96f0a2f63afc68b4f382fe14e24 Mon Sep 17 00:00:00 2001 From: Dan Hatton Date: Wed, 14 May 2025 09:49:22 +0100 Subject: [PATCH 7/8] Correct time on instrument server against time on main server --- src/murfey/client/multigrid_control.py | 9 +++++++++ src/murfey/server/api/__init__.py | 5 +++++ 2 files changed, 14 insertions(+) diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index 8ee50d212..a6f3e7961 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -100,6 +100,15 @@ def __post_init__(self): register_client=False, ) + if self.visit_end_time: + current_time = datetime.now() + server_timestamp = requests.get(f"{self.murfey_url}/time").json()[ + "timestamp" + ] + self.visit_end_time += ( + datetime.fromtimestamp(server_timestamp) - current_time + ) + def _multigrid_watcher_finalised(self): self.multigrid_watcher_active = False self.dormancy_check() diff --git a/src/murfey/server/api/__init__.py b/src/murfey/server/api/__init__.py index 8c00e2338..924f557ee 100644 --- a/src/murfey/server/api/__init__.py +++ b/src/murfey/server/api/__init__.py @@ -128,6 +128,11 @@ async def root(request: Request): ) +@router.get("/time") +async def get_current_timestamp(): + return {"timestamp": datetime.datetime.now().timestamp()} + + @router.get("/health/") def health_check(db=murfey.server.ispyb.DB): conn = db.connection() From 9f1f963d2488dc6f135499c0211edefe380e2540 Mon Sep 17 00:00:00 2001 From: Dan Hatton Date: Wed, 14 May 2025 10:27:29 +0100 Subject: [PATCH 8/8] Wrong way around --- src/murfey/client/multigrid_control.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index a6f3e7961..483eff6f7 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -105,8 +105,8 @@ def __post_init__(self): server_timestamp = requests.get(f"{self.murfey_url}/time").json()[ "timestamp" ] - self.visit_end_time += ( - datetime.fromtimestamp(server_timestamp) - current_time + self.visit_end_time += current_time - datetime.fromtimestamp( + server_timestamp ) def _multigrid_watcher_finalised(self):