diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index a9fc3309c..483eff6f7 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -48,6 +48,7 @@ class MultigridController: data_collection_parameters: dict = field(default_factory=lambda: {}) token: str = "" _machine_config: dict = field(default_factory=lambda: {}) + visit_end_time: Optional[datetime] = None def __post_init__(self): if self.token: @@ -99,6 +100,15 @@ def __post_init__(self): register_client=False, ) + if self.visit_end_time: + current_time = datetime.now() + server_timestamp = requests.get(f"{self.murfey_url}/time").json()[ + "timestamp" + ] + self.visit_end_time += current_time - datetime.fromtimestamp( + server_timestamp + ) + def _multigrid_watcher_finalised(self): self.multigrid_watcher_active = False self.dormancy_check() @@ -277,6 +287,7 @@ def _start_rsyncer( stop_callback=self._rsyncer_stopped, do_transfer=self.do_transfer, remove_files=remove_files, + end_time=self.visit_end_time, ) def rsync_result(update: RSyncerUpdate): diff --git a/src/murfey/client/rsync.py b/src/murfey/client/rsync.py index ab2c267f6..eac9ceec3 100644 --- a/src/murfey/client/rsync.py +++ b/src/murfey/client/rsync.py @@ -13,6 +13,7 @@ import subprocess import threading import time +from datetime import datetime from enum import Enum from pathlib import Path from typing import Awaitable, Callable, List, NamedTuple @@ -63,6 +64,7 @@ def __init__( remove_files: bool = False, required_substrings_for_removal: List[str] = [], notify: bool = True, + end_time: datetime | None = None, ): super().__init__() self._basepath = basepath_local.absolute() @@ -76,6 +78,9 @@ def __init__( self._server_url = server_url self._notify = notify self._finalised = False + self._end_time = end_time + + self._skipped_files: List[Path] = [] # Set rsync destination if local: @@ -214,6 +219,10 @@ def enqueue(self, file_path: Path): absolute_path = self._basepath / file_path self.queue.put(absolute_path) + def flush_skipped(self): + for f in self._skipped_files: + self.queue.put(f) + def _process(self): logger.info("RSync thread starting") files_to_transfer: list[Path] @@ -304,14 +313,23 @@ def _fake_transfer(self, files: list[Path]) -> bool: return True - def _transfer(self, files: list[Path]) -> bool: + def _transfer(self, infiles: list[Path]) -> bool: """ Transfer files via an rsync sub-process, and parses the rsync stdout to verify the success of the transfer. """ # Set up initial variables - files = [f for f in files if f.is_file()] + if self._end_time: + files = [ + f + for f in infiles + if f.is_file() and f.stat().st_ctime < self._end_time.timestamp() + ] + self._skipped_files.extend(set(infiles).difference(set(files))) + else: + files = [f for f in infiles if f.is_file()] + previously_transferred = self._files_transferred transfer_success: set[Path] = set() successful_updates: list[RSyncerUpdate] = [] diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index 7c5ba7c59..14f1de9af 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -160,6 +160,7 @@ def setup_multigrid_watcher( token=tokens.get(session_id, "token"), data_collection_parameters=data_collection_parameters.get(label, {}), rsync_restarts=watcher_spec.rsync_restarts, + visit_end_time=watcher_spec.visit_end_time, ) watcher_spec.source.mkdir(exist_ok=True) machine_config = requests.get( @@ -251,6 +252,7 @@ class ObserverInfo(BaseModel): num_files_in_queue: int alive: bool stopping: bool + num_files_skipped: int = 0 @router.get("/sessions/{session_id}/rsyncer_info") @@ -264,6 +266,7 @@ def get_rsyncer_info(session_id: MurfeySessionID) -> list[ObserverInfo]: num_files_in_queue=v.queue.qsize(), alive=v.thread.is_alive(), stopping=v._stopping, + num_files_skipped=len(v._skipped_files), ) ) return info diff --git a/src/murfey/server/api/__init__.py b/src/murfey/server/api/__init__.py index a833118da..924f557ee 100644 --- a/src/murfey/server/api/__init__.py +++ b/src/murfey/server/api/__init__.py @@ -128,6 +128,11 @@ async def root(request: Request): ) +@router.get("/time") +async def get_current_timestamp(): + return {"timestamp": datetime.datetime.now().timestamp()} + + @router.get("/health/") def health_check(db=murfey.server.ispyb.DB): conn = db.connection() @@ -1967,9 +1972,24 @@ async def get_tiff(visit_name: str, session_id: int, tiff_path: str, db=murfey_d return FileResponse(path=test_path) +class VisitEndTime(BaseModel): + end_time: Optional[datetime.datetime] = None + + @router.post("/instruments/{instrument_name}/visits/{visit}/session/{name}") -def create_session(instrument_name: str, visit: str, name: str, db=murfey_db) -> int: - s = Session(name=name, visit=visit, instrument_name=instrument_name) +def create_session( + instrument_name: str, + visit: str, + name: str, + visit_end_time: VisitEndTime, + db=murfey_db, +) -> int: + s = Session( + name=name, + visit=visit, + instrument_name=instrument_name, + visit_end_time=visit_end_time.end_time, + ) db.add(s) db.commit() sid = s.id diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 33744ccf2..74798b190 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -98,9 +98,8 @@ async def setup_multigrid_watcher( session_id: MurfeySessionID, watcher_spec: MultigridWatcherSetup, db=murfey_db ): data = {} - instrument_name = ( - db.exec(select(Session).where(Session.id == session_id)).one().instrument_name - ) + session = db.exec(select(Session).where(Session.id == session_id)).one() + instrument_name = session.instrument_name machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] @@ -130,6 +129,7 @@ async def setup_multigrid_watcher( str(k): v for k, v in watcher_spec.destination_overrides.items() }, "rsync_restarts": watcher_spec.rsync_restarts, + "visit_end_time": session.visit_end_time, }, headers={ "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" @@ -487,6 +487,7 @@ class RSyncerInfo(BaseModel): files_counted: int transferring: bool session_id: int + num_files_skipped: int = 0 @router.get("/instruments/{instrument_name}/sessions/{session_id}/rsyncer_info") @@ -564,6 +565,7 @@ async def get_rsyncer_info( files_counted=ri.files_counted, transferring=ri.transferring, session_id=session_id, + num_files_skipped=rsync_data.get("num_files_skipped", 0), ) ) return combined_data diff --git a/src/murfey/util/db.py b/src/murfey/util/db.py index c5c65b456..5334b6ad2 100644 --- a/src/murfey/util/db.py +++ b/src/murfey/util/db.py @@ -3,6 +3,7 @@ of the sessions that Murfey is overseeing, along with the relationships between them. """ +from datetime import datetime from typing import List, Optional import sqlalchemy @@ -48,6 +49,7 @@ class Session(SQLModel, table=True): # type: ignore started: bool = Field(default=False) current_gain_ref: str = Field(default="") instrument_name: str = Field(default="") + visit_end_time: Optional[datetime] = Field(default=None) # CLEM Workflow diff --git a/src/murfey/util/instrument_models.py b/src/murfey/util/instrument_models.py index 3da23c57c..ee45b5468 100644 --- a/src/murfey/util/instrument_models.py +++ b/src/murfey/util/instrument_models.py @@ -1,5 +1,6 @@ +from datetime import datetime from pathlib import Path -from typing import Dict, List +from typing import Dict, List, Optional from pydantic import BaseModel @@ -15,3 +16,4 @@ class MultigridWatcherSpec(BaseModel): skip_existing_processing: bool = False destination_overrides: Dict[Path, str] = {} rsync_restarts: List[str] = [] + visit_end_time: Optional[datetime] = None