Skip to content
11 changes: 11 additions & 0 deletions src/murfey/client/multigrid_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class MultigridController:
data_collection_parameters: dict = field(default_factory=lambda: {})
token: str = ""
_machine_config: dict = field(default_factory=lambda: {})
visit_end_time: Optional[datetime] = None

def __post_init__(self):
if self.token:
Expand Down Expand Up @@ -99,6 +100,15 @@ def __post_init__(self):
register_client=False,
)

if self.visit_end_time:
current_time = datetime.now()
server_timestamp = requests.get(f"{self.murfey_url}/time").json()[
"timestamp"
]
self.visit_end_time += current_time - datetime.fromtimestamp(
server_timestamp
)

def _multigrid_watcher_finalised(self):
self.multigrid_watcher_active = False
self.dormancy_check()
Expand Down Expand Up @@ -277,6 +287,7 @@ def _start_rsyncer(
stop_callback=self._rsyncer_stopped,
do_transfer=self.do_transfer,
remove_files=remove_files,
end_time=self.visit_end_time,
)

def rsync_result(update: RSyncerUpdate):
Expand Down
22 changes: 20 additions & 2 deletions src/murfey/client/rsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import subprocess
import threading
import time
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Awaitable, Callable, List, NamedTuple
Expand Down Expand Up @@ -63,6 +64,7 @@ def __init__(
remove_files: bool = False,
required_substrings_for_removal: List[str] = [],
notify: bool = True,
end_time: datetime | None = None,
):
super().__init__()
self._basepath = basepath_local.absolute()
Expand All @@ -76,6 +78,9 @@ def __init__(
self._server_url = server_url
self._notify = notify
self._finalised = False
self._end_time = end_time

self._skipped_files: List[Path] = []

# Set rsync destination
if local:
Expand Down Expand Up @@ -214,6 +219,10 @@ def enqueue(self, file_path: Path):
absolute_path = self._basepath / file_path
self.queue.put(absolute_path)

def flush_skipped(self):
    """
    Re-enqueue every file that was previously skipped, then forget them.

    The skipped list is swapped out for a fresh one *before* enqueuing so
    that:
      * a second call does not enqueue the same files again, and
      * entries appended to ``self._skipped_files`` while this loop runs
        (``_transfer`` extends that list) are not picked up by the loop
        that is draining the old list.
    """
    pending, self._skipped_files = self._skipped_files, []
    for skipped_file in pending:
        self.queue.put(skipped_file)

def _process(self):
logger.info("RSync thread starting")
files_to_transfer: list[Path]
Expand Down Expand Up @@ -304,14 +313,23 @@ def _fake_transfer(self, files: list[Path]) -> bool:

return True

def _transfer(self, files: list[Path]) -> bool:
def _transfer(self, infiles: list[Path]) -> bool:
"""
Transfer files via an rsync sub-process and parse the rsync stdout to verify
the success of the transfer.
"""

# Set up initial variables
files = [f for f in files if f.is_file()]
if self._end_time:
files = [
f
for f in infiles
if f.is_file() and f.stat().st_ctime < self._end_time.timestamp()
]
self._skipped_files.extend(set(infiles).difference(set(files)))
else:
files = [f for f in infiles if f.is_file()]

previously_transferred = self._files_transferred
transfer_success: set[Path] = set()
successful_updates: list[RSyncerUpdate] = []
Expand Down
3 changes: 3 additions & 0 deletions src/murfey/instrument_server/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def setup_multigrid_watcher(
token=tokens.get(session_id, "token"),
data_collection_parameters=data_collection_parameters.get(label, {}),
rsync_restarts=watcher_spec.rsync_restarts,
visit_end_time=watcher_spec.visit_end_time,
)
watcher_spec.source.mkdir(exist_ok=True)
machine_config = requests.get(
Expand Down Expand Up @@ -251,6 +252,7 @@ class ObserverInfo(BaseModel):
num_files_in_queue: int
alive: bool
stopping: bool
num_files_skipped: int = 0


@router.get("/sessions/{session_id}/rsyncer_info")
Expand All @@ -264,6 +266,7 @@ def get_rsyncer_info(session_id: MurfeySessionID) -> list[ObserverInfo]:
num_files_in_queue=v.queue.qsize(),
alive=v.thread.is_alive(),
stopping=v._stopping,
num_files_skipped=len(v._skipped_files),
)
)
return info
Expand Down
24 changes: 22 additions & 2 deletions src/murfey/server/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ async def root(request: Request):
)


@router.get("/time")
async def get_current_timestamp():
    """
    Report the server's current time as a POSIX timestamp.

    Returns a dictionary with a single key, "timestamp", holding the value
    of ``datetime.datetime.now().timestamp()`` at the moment of the call.
    """
    server_now = datetime.datetime.now()
    payload = {"timestamp": server_now.timestamp()}
    return payload


@router.get("/health/")
def health_check(db=murfey.server.ispyb.DB):
conn = db.connection()
Expand Down Expand Up @@ -1967,9 +1972,24 @@ async def get_tiff(visit_name: str, session_id: int, tiff_path: str, db=murfey_d
return FileResponse(path=test_path)


# Model accepted by `create_session` as its `visit_end_time` argument; its
# `end_time` value is copied onto the new Session's `visit_end_time` field.
class VisitEndTime(BaseModel):
# End of the visit; defaults to None when no end time is supplied.
end_time: Optional[datetime.datetime] = None


@router.post("/instruments/{instrument_name}/visits/{visit}/session/{name}")
def create_session(instrument_name: str, visit: str, name: str, db=murfey_db) -> int:
s = Session(name=name, visit=visit, instrument_name=instrument_name)
def create_session(
instrument_name: str,
visit: str,
name: str,
visit_end_time: VisitEndTime,
db=murfey_db,
) -> int:
s = Session(
name=name,
visit=visit,
instrument_name=instrument_name,
visit_end_time=visit_end_time.end_time,
)
db.add(s)
db.commit()
sid = s.id
Expand Down
8 changes: 5 additions & 3 deletions src/murfey/server/api/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,8 @@ async def setup_multigrid_watcher(
session_id: MurfeySessionID, watcher_spec: MultigridWatcherSetup, db=murfey_db
):
data = {}
instrument_name = (
db.exec(select(Session).where(Session.id == session_id)).one().instrument_name
)
session = db.exec(select(Session).where(Session.id == session_id)).one()
instrument_name = session.instrument_name
machine_config = get_machine_config(instrument_name=instrument_name)[
instrument_name
]
Expand Down Expand Up @@ -130,6 +129,7 @@ async def setup_multigrid_watcher(
str(k): v for k, v in watcher_spec.destination_overrides.items()
},
"rsync_restarts": watcher_spec.rsync_restarts,
"visit_end_time": session.visit_end_time,
},
headers={
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
Expand Down Expand Up @@ -487,6 +487,7 @@ class RSyncerInfo(BaseModel):
files_counted: int
transferring: bool
session_id: int
num_files_skipped: int = 0


@router.get("/instruments/{instrument_name}/sessions/{session_id}/rsyncer_info")
Expand Down Expand Up @@ -564,6 +565,7 @@ async def get_rsyncer_info(
files_counted=ri.files_counted,
transferring=ri.transferring,
session_id=session_id,
num_files_skipped=rsync_data.get("num_files_skipped", 0),
)
)
return combined_data
2 changes: 2 additions & 0 deletions src/murfey/util/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
of the sessions that Murfey is overseeing, along with the relationships between them.
"""

from datetime import datetime
from typing import List, Optional

import sqlalchemy
Expand Down Expand Up @@ -48,6 +49,7 @@ class Session(SQLModel, table=True): # type: ignore
started: bool = Field(default=False)
current_gain_ref: str = Field(default="")
instrument_name: str = Field(default="")
visit_end_time: Optional[datetime] = Field(default=None)

# CLEM Workflow

Expand Down
4 changes: 3 additions & 1 deletion src/murfey/util/instrument_models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import datetime
from pathlib import Path
from typing import Dict, List
from typing import Dict, List, Optional

from pydantic import BaseModel

Expand All @@ -15,3 +16,4 @@ class MultigridWatcherSpec(BaseModel):
skip_existing_processing: bool = False
destination_overrides: Dict[Path, str] = {}
rsync_restarts: List[str] = []
visit_end_time: Optional[datetime] = None