Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/murfey/client/multigrid_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@
tag: str = "",
limited: bool = False,
):
log.info(f"starting multigrid rsyncer: {source}")
log.info(f"Starting multigrid rsyncer: {source}")
log.debug(f"Analysis of {source} is {('enabled' if analyse else 'disabled')}")

Check warning on line 164 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L163-L164

Added lines #L163 - L164 were not covered by tests
destination_overrides = destination_overrides or {}
machine_data = requests.get(
f"{self._environment.url.geturl()}/instruments/{self.instrument_name}/machine"
Expand Down
38 changes: 25 additions & 13 deletions src/murfey/client/watchdir_multigrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@
):
super().__init__()
self._basepath = Path(path)
self._skip_existing_processing = skip_existing_processing
self._seen_dirs: List[Path] = []
self._stopping = False
self._machine_config = machine_config
self._seen_dirs: List[Path] = []

Check warning on line 25 in src/murfey/client/watchdir_multigrid.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/watchdir_multigrid.py#L25

Added line #L25 was not covered by tests
self.thread = threading.Thread(
name=f"MultigridDirWatcher {self._basepath}",
target=self._process,
daemon=True,
)
# Toggleable settings
self._analyse = True
self._skip_existing_processing = skip_existing_processing
self._stopping = False

Check warning on line 34 in src/murfey/client/watchdir_multigrid.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/watchdir_multigrid.py#L32-L34

Added lines #L32 - L34 were not covered by tests

def start(self):
if self.thread.is_alive():
Expand Down Expand Up @@ -60,8 +62,14 @@
include_mid_path=False,
use_suggested_path=False,
analyse=(
d.name
in self._machine_config["analyse_created_directories"]
(
d.name
in self._machine_config[
"analyse_created_directories"
]
)
if self._analyse
else False
),
tag="atlas",
)
Expand All @@ -72,23 +80,25 @@
d,
extra_directory=f"metadata_{d.name}",
include_mid_path=False,
analyse=True, # not (first_loop and self._skip_existing_processing),
analyse=self._analyse,
limited=True,
tag="metadata",
)
self._seen_dirs.append(d)
processing_started = False
for d02 in (d.parent.parent / d.name).glob("Images-Disc*"):
if d02 not in self._seen_dirs:
# if skip exisiting processing is set then do not process for any
# data directories found on the first loop
# this allows you to avoid triggering processing again if murfey is restarted
# If 'skip_existing_processing' is set, do not process for
# any data directories found on the first loop.
# This allows you to avoid triggering processing again if Murfey is restarted
self.notify(
d02,
include_mid_path=False,
remove_files=True,
analyse=not (
first_loop and self._skip_existing_processing
analyse=(
not (first_loop and self._skip_existing_processing)
if self._analyse
else False
),
tag="fractions",
)
Expand All @@ -104,8 +114,10 @@
self.notify(
d02,
include_mid_path=False,
analyse=not (
first_loop and self._skip_existing_processing
analyse=(
not (first_loop and self._skip_existing_processing)
if self._analyse
else False
),
tag="fractions",
)
Expand Down
14 changes: 8 additions & 6 deletions src/murfey/instrument_server/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from functools import partial
from logging import getLogger
from pathlib import Path
from typing import Annotated, Any, Dict, List, Optional, Union
from typing import Annotated, Any, Optional
from urllib.parse import urlparse

import requests
Expand All @@ -27,9 +27,9 @@

logger = getLogger("murfey.instrument_server.api")

watchers: Dict[Union[str, int], MultigridDirWatcher] = {}
rsyncers: Dict[str, RSyncer] = {}
controllers: Dict[int, MultigridController] = {}
watchers: dict[str | int, MultigridDirWatcher] = {}
rsyncers: dict[str, RSyncer] = {}
controllers: dict[int, MultigridController] = {}
data_collection_parameters: dict = {}
tokens = {}

Expand Down Expand Up @@ -187,9 +187,11 @@


@router.post("/sessions/{session_id}/start_multigrid_watcher")
def start_multigrid_watcher(session_id: MurfeySessionID):
def start_multigrid_watcher(session_id: MurfeySessionID, process: bool = True):
if watchers.get(session_id) is None:
return {"success": False}
if not process:
watchers[session_id]._analyse = False

Check warning on line 194 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L194

Added line #L194 was not covered by tests
watchers[session_id].start()
return {"success": True}

Expand Down Expand Up @@ -322,7 +324,7 @@
)
def get_possible_gain_references(
instrument_name: str, session_id: MurfeySessionID
) -> List[File]:
) -> list[File]:
machine_config = requests.get(
f"{_get_murfey_url()}/instruments/{sanitise_nonpath(instrument_name)}/machine",
headers={"Authorization": f"Bearer {tokens[session_id]}"},
Expand Down
32 changes: 26 additions & 6 deletions src/murfey/server/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,13 +372,22 @@
def increment_rsync_file_count(
visit_name: str, rsyncer_info: RsyncerInfo, db=murfey_db
):
rsync_instance = db.exec(
select(RsyncInstance).where(
RsyncInstance.source == rsyncer_info.source,
RsyncInstance.destination == rsyncer_info.destination,
RsyncInstance.session_id == rsyncer_info.session_id,
try:
rsync_instance = db.exec(

Check warning on line 376 in src/murfey/server/api/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/__init__.py#L375-L376

Added lines #L375 - L376 were not covered by tests
select(RsyncInstance).where(
RsyncInstance.source == rsyncer_info.source,
RsyncInstance.destination == rsyncer_info.destination,
RsyncInstance.session_id == rsyncer_info.session_id,
)
).one()
except Exception:
log.error(

Check warning on line 384 in src/murfey/server/api/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/__init__.py#L383-L384

Added lines #L383 - L384 were not covered by tests
f"Failed to find rsync instance for visit {sanitise(visit_name)} "
"with the following properties: \n"
f"{rsyncer_info.dict()}",
exc_info=True,
)
).one()
return None

Check warning on line 390 in src/murfey/server/api/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/__init__.py#L390

Added line #L390 was not covered by tests
rsync_instance.files_counted += rsyncer_info.increment_count
db.add(rsync_instance)
db.commit()
Expand Down Expand Up @@ -1996,6 +2005,17 @@
return sid


@router.post("/sessions/{session_id}")
def update_session(
session_id: MurfeySessionID, process: bool = True, db=murfey_db
) -> None:
session = db.exec(select(Session).where(Session.id == session_id)).one()
session.process = process
db.add(session)
db.commit()
return None

Check warning on line 2016 in src/murfey/server/api/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/__init__.py#L2012-L2016

Added lines #L2012 - L2016 were not covered by tests


@router.put("/sessions/{session_id}/current_gain_ref")
def update_current_gain_ref(
session_id: MurfeySessionID, new_gain_ref: CurrentGainRef, db=murfey_db
Expand Down
13 changes: 9 additions & 4 deletions src/murfey/server/api/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,21 +142,26 @@
@router.post("/sessions/{session_id}/start_multigrid_watcher")
async def start_multigrid_watcher(session_id: MurfeySessionID, db=murfey_db):
data = {}
instrument_name = (
db.exec(select(Session).where(Session.id == session_id)).one().instrument_name
)
session = db.exec(select(Session).where(Session.id == session_id)).one()
process = session.process
instrument_name = session.instrument_name

Check warning on line 147 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L145-L147

Added lines #L145 - L147 were not covered by tests
machine_config = get_machine_config(instrument_name=instrument_name)[
instrument_name
]
if machine_config.instrument_server_url:
log.debug(

Check warning on line 152 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L152

Added line #L152 was not covered by tests
f"Submitting request to start multigrid watcher for session {session_id} "
f"with processing {('enabled' if process else 'disabled')}"
)
async with aiohttp.ClientSession() as clientsession:
async with clientsession.post(
f"{machine_config.instrument_server_url}/sessions/{session_id}/start_multigrid_watcher",
f"{machine_config.instrument_server_url}/sessions/{session_id}/start_multigrid_watcher?process={'true' if process else 'false'}",
headers={
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
},
) as resp:
data = await resp.json()
log.debug(f"Received response: {data}")

Check warning on line 164 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L164

Added line #L164 was not covered by tests
return data


Expand Down
16 changes: 10 additions & 6 deletions src/murfey/server/api/processing_parameters.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from logging import getLogger
from typing import Optional

import sqlalchemy
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from sqlmodel import Session, select
Expand All @@ -24,12 +25,15 @@
@router.get("/sessions/{session_id}/session_processing_parameters")
def get_session_processing_parameters(
session_id: MurfeySessionID, db: Session = murfey_db
) -> EditableSessionProcessingParameters:
proc_params = db.exec(
select(SessionProcessingParameters).where(
SessionProcessingParameters.session_id == session_id
)
).one()
) -> Optional[EditableSessionProcessingParameters]:
try:
proc_params = db.exec(

Check warning on line 30 in src/murfey/server/api/processing_parameters.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/processing_parameters.py#L29-L30

Added lines #L29 - L30 were not covered by tests
select(SessionProcessingParameters).where(
SessionProcessingParameters.session_id == session_id
)
).one()
except sqlalchemy.exc.NoResultFound:
return None

Check warning on line 36 in src/murfey/server/api/processing_parameters.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/processing_parameters.py#L35-L36

Added lines #L35 - L36 were not covered by tests
return EditableSessionProcessingParameters(
gain_ref=proc_params.gain_ref,
dose_per_frame=proc_params.dose_per_frame,
Expand Down
1 change: 1 addition & 0 deletions src/murfey/util/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class Session(SQLModel, table=True): # type: ignore
started: bool = Field(default=False)
current_gain_ref: str = Field(default="")
instrument_name: str = Field(default="")
process: bool = Field(default=True)
visit_end_time: Optional[datetime] = Field(default=None)

# CLEM Workflow
Expand Down