diff --git a/src/murfey/client/analyser.py b/src/murfey/client/analyser.py index 8d4765e9a..5fbc4bd26 100644 --- a/src/murfey/client/analyser.py +++ b/src/murfey/client/analyser.py @@ -15,6 +15,7 @@ from typing import Type from murfey.client.context import Context +from murfey.client.contexts.atlas import AtlasContext from murfey.client.contexts.clem import CLEMContext from murfey.client.contexts.spa import SPAModularContext from murfey.client.contexts.spa_metadata import SPAMetadataContext @@ -135,7 +136,7 @@ def _find_context(self, file_path: Path) -> bool: # Tomography and SPA workflow checks if "atlas" in file_path.parts: - self._context = SPAMetadataContext("epu", self._basepath) + self._context = AtlasContext("epu", self._basepath) return True if "Metadata" in file_path.parts or file_path.name == "EpuSession.dm": @@ -266,7 +267,7 @@ def _analyse(self): ) except Exception as e: logger.error(f"Exception encountered: {e}") - if "atlas" not in transferred_file.parts: + if not isinstance(self._context, AtlasContext): if not dc_metadata: try: dc_metadata = self._context.gather_metadata( @@ -308,6 +309,10 @@ def _analyse(self): ) self.post_transfer(transferred_file) + elif isinstance(self._context, AtlasContext): + logger.debug(f"File {transferred_file.name!r} is part of the atlas") + self.post_transfer(transferred_file) + # Handle files with tomography and SPA context differently elif not self._extension or self._unseen_xml: valid_extension = self._find_extension(transferred_file) @@ -325,36 +330,35 @@ def _analyse(self): ) except Exception as e: logger.error(f"Exception encountered: {e}") - if "atlas" not in transferred_file.parts: - if not dc_metadata: - try: - dc_metadata = self._context.gather_metadata( - mdoc_for_reading - or self._xml_file(transferred_file), - environment=self._environment, - ) - except KeyError as e: - logger.error( - f"Metadata gathering failed with a key error for key: {e.args[0]}" - ) - raise e - if not dc_metadata or not self._force_mdoc_metadata: - mdoc_for_reading = None - self._unseen_xml.append(transferred_file) - if dc_metadata: - self._unseen_xml = [] - if dc_metadata.get("file_extension"): - self._extension = dc_metadata["file_extension"] - else: - dc_metadata["file_extension"] = self._extension - dc_metadata["acquisition_software"] = ( - self._context._acquisition_software + if not dc_metadata: + try: + dc_metadata = self._context.gather_metadata( + mdoc_for_reading + or self._xml_file(transferred_file), + environment=self._environment, ) - self.notify( - { - "form": dc_metadata, - } + except KeyError as e: + logger.error( + f"Metadata gathering failed with a key error for key: {e.args[0]}" ) + raise e + if not dc_metadata or not self._force_mdoc_metadata: + mdoc_for_reading = None + self._unseen_xml.append(transferred_file) + if dc_metadata: + self._unseen_xml = [] + if dc_metadata.get("file_extension"): + self._extension = dc_metadata["file_extension"] + else: + dc_metadata["file_extension"] = self._extension + dc_metadata["acquisition_software"] = ( + self._context._acquisition_software + ) + self.notify( + { + "form": dc_metadata, + } + ) elif isinstance( self._context, ( diff --git a/src/murfey/client/contexts/atlas.py b/src/murfey/client/contexts/atlas.py new file mode 100644 index 000000000..0dcac02bb --- /dev/null +++ b/src/murfey/client/contexts/atlas.py @@ -0,0 +1,52 @@ +import logging +from pathlib import Path +from typing import Optional + +import requests + +from murfey.client.context import Context +from murfey.client.contexts.spa import _get_source +from murfey.client.contexts.spa_metadata import _atlas_destination +from murfey.client.instance_environment import MurfeyInstanceEnvironment +from murfey.util.api import url_path_for +from murfey.util.client import authorised_requests, capture_post + +logger = logging.getLogger("murfey.client.contexts.atlas") + +requests.get, requests.post, requests.put, requests.delete = authorised_requests() + + +class AtlasContext(Context): + def __init__(self, acquisition_software: str, basepath: Path): + super().__init__("Atlas", acquisition_software) + self._basepath = basepath + + def post_transfer( + self, + transferred_file: Path, + environment: Optional[MurfeyInstanceEnvironment] = None, + **kwargs, + ): + super().post_transfer( + transferred_file=transferred_file, + environment=environment, + **kwargs, + ) + + if ( + environment + and "Atlas_" in transferred_file.stem + and transferred_file.suffix == ".mrc" + ): + source = _get_source(transferred_file, environment) + if source: + transferred_atlas_name = _atlas_destination( + environment, source, transferred_file + ) / transferred_file.relative_to(source.parent) + capture_post( + f"{str(environment.url.geturl())}{url_path_for('session_control.spa_router', 'make_atlas_jpg', session_id=environment.murfey_session)}", + json={"path": str(transferred_atlas_name)}, + ) + logger.info( + f"Submitted request to create JPG image of atlas {str(transferred_atlas_name)!r}" + ) diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index 44b91261a..931126d42 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -292,7 +292,7 @@ def _rsyncer_stopped(self, source: Path, explicit_stop: bool = False): requests.delete(remove_url) else: stop_url = f"{self.murfey_url}{url_path_for('session_control.router', 'register_stopped_rsyncer', session_id=self.session_id)}" - capture_post(stop_url, json={"source": str(source)}) + capture_post(stop_url, json={"path": str(source)}) def _finalise_rsyncer(self, source: Path): """ @@ -312,7 +312,7 @@ def _finalise_rsyncer(self, source: Path): def _restart_rsyncer(self, source: Path): self.rsync_processes[source].restart() restarted_url = f"{self.murfey_url}{url_path_for('session_control.router', 'register_restarted_rsyncer', session_id=self.session_id)}" - capture_post(restarted_url, json={"source": str(source)}) + capture_post(restarted_url, json={"path": str(source)}) def _request_watcher_stop(self, source: Path): self._environment.watchers[source]._stopping = True diff --git a/src/murfey/server/api/session_control.py b/src/murfey/server/api/session_control.py index 45b282eaf..f8966a5f2 100644 --- a/src/murfey/server/api/session_control.py +++ b/src/murfey/server/api/session_control.py @@ -54,6 +54,7 @@ SearchMapParameters, Visit, ) +from murfey.workflows.spa.atlas import atlas_jpg_from_mrc from murfey.workflows.spa.flush_spa_preprocess import ( register_foil_hole as _register_foil_hole, ) @@ -262,18 +263,18 @@ def get_rsyncers_for_session(session_id: MurfeySessionID, db=murfey_db): return rsync_instances.all() -class RsyncerSource(BaseModel): - source: str +class StringOfPathModel(BaseModel): + path: str @router.post("/sessions/{session_id}/rsyncer_stopped") def register_stopped_rsyncer( - session_id: int, rsyncer_source: RsyncerSource, db=murfey_db + session_id: int, rsyncer_source: StringOfPathModel, db=murfey_db ): rsyncer = db.exec( select(RsyncInstance) .where(RsyncInstance.session_id == session_id) - .where(RsyncInstance.source == rsyncer_source.source) + .where(RsyncInstance.source == rsyncer_source.path) ).one() rsyncer.transferring = False db.add(rsyncer) @@ -282,12 +283,12 @@ def register_stopped_rsyncer( @router.post("/sessions/{session_id}/rsyncer_started") def register_restarted_rsyncer( - session_id: int, rsyncer_source: RsyncerSource, db=murfey_db + session_id: int, rsyncer_source: StringOfPathModel, db=murfey_db ): rsyncer = db.exec( select(RsyncInstance) .where(RsyncInstance.session_id == session_id) - .where(RsyncInstance.source == rsyncer_source.source) + .where(RsyncInstance.source == rsyncer_source.path) ).one() rsyncer.transferring = True db.add(rsyncer) @@ -347,6 +348,19 @@ def get_foil_hole( return _get_foil_hole(session_id, fh_name, db) +@spa_router.post("/sessions/{session_id}/make_atlas_jpg") +def make_atlas_jpg( + session_id: MurfeySessionID, atlas_mrc: StringOfPathModel, db=murfey_db +): + logger.debug( + f"Received request to create JPG image of atlas {sanitise(atlas_mrc.path)!r}" + ) + session = db.exec(select(Session).where(Session.id == session_id)).one() + return atlas_jpg_from_mrc( + session.instrument_name, session.visit, Path(atlas_mrc.path) + ) + + @spa_router.post("/sessions/{session_id}/grid_square/{gsid}") def register_grid_square( session_id: MurfeySessionID, diff --git a/src/murfey/util/route_manifest.yaml b/src/murfey/util/route_manifest.yaml index 615aaf5ce..237061d53 100644 --- a/src/murfey/util/route_manifest.yaml +++ b/src/murfey/util/route_manifest.yaml @@ -814,6 +814,11 @@ murfey.server.api.session_control.spa_router: type: int methods: - GET + - path: /session_control/spa/sessions/{session_id}/make_atlas_jpg + function: make_atlas_jpg + path_params: [] + methods: + - POST - path: /session_control/spa/sessions/{session_id}/grid_square/{gsid} function: register_grid_square path_params: diff --git a/src/murfey/workflows/spa/atlas.py b/src/murfey/workflows/spa/atlas.py new file mode 100644 index 000000000..12a761fe7 --- /dev/null +++ b/src/murfey/workflows/spa/atlas.py @@ -0,0 +1,43 @@ +import logging +from pathlib import Path + +import mrcfile +import PIL.Image +from werkzeug.utils import secure_filename + +from murfey.util import sanitise +from murfey.util.config import get_machine_config + +logger = logging.getLogger("murfey.workflows.spa.atlas") + + +def atlas_jpg_from_mrc(instrument_name: str, visit_name: str, atlas_mrc: Path): + logger.debug( + f"Starting workflow to create JPG image of atlas {sanitise(str(atlas_mrc))!r}" + ) + with mrcfile.open(atlas_mrc) as mrc: + data = mrc.data + + machine_config = get_machine_config(instrument_name=instrument_name)[ + instrument_name + ] + + parts = [secure_filename(p) for p in atlas_mrc.parts] + visit_idx = parts.index(visit_name) + core = Path("/".join(parts[: visit_idx + 1])) + sample_id = "Sample" + for p in parts: + if "Sample" in p: + sample_id = p + break + atlas_jpg_file = ( + core + / machine_config.processed_directory_name + / "atlas" + / secure_filename(f"{sample_id}_{atlas_mrc.stem}_fullres.jpg") + ) + atlas_jpg_file.parent.mkdir(parents=True, exist_ok=True) + + im = PIL.Image.fromarray(data) + im.convert(mode="L").save(atlas_jpg_file) + logger.debug(f"JPG image of atlas saved as {str(atlas_jpg_file)!r}") diff --git a/tests/client/test_analyser.py b/tests/client/test_analyser.py index 175916aa1..0e24f0b0d 100644 --- a/tests/client/test_analyser.py +++ b/tests/client/test_analyser.py @@ -3,6 +3,7 @@ import pytest from murfey.client.analyser import Analyser +from murfey.client.contexts.atlas import AtlasContext from murfey.client.contexts.clem import CLEMContext from murfey.client.contexts.spa import SPAModularContext from murfey.client.contexts.spa_metadata import SPAMetadataContext @@ -28,7 +29,7 @@ ["visit/FoilHole_01234_fractions.tiff", SPAModularContext], ["visit/FoilHole_01234_EER.eer", SPAModularContext], # SPA metadata - ["atlas/atlas.mrc", SPAMetadataContext], + ["atlas/atlas.mrc", AtlasContext], ["visit/EpuSession.dm", SPAMetadataContext], ["visit/Metadata/GridSquare.dm", SPAMetadataContext], # CLEM LIF file diff --git a/tests/server/api/test_session_control.py b/tests/server/api/test_session_control.py new file mode 100644 index 000000000..abf63285b --- /dev/null +++ b/tests/server/api/test_session_control.py @@ -0,0 +1,76 @@ +from pathlib import Path +from unittest import mock +from unittest.mock import MagicMock + +from fastapi import FastAPI +from fastapi.testclient import TestClient +from pytest_mock import MockerFixture + +from murfey.server.api.auth import ( + validate_instrument_server_session_access, + validate_instrument_token, +) +from murfey.server.api.session_control import spa_router +from murfey.server.murfey_db import murfey_db_session +from murfey.util.api import url_path_for + + +def test_make_atlas_jpg(mocker: MockerFixture, tmp_path: Path): + # Set up the objects to mock + instrument_name = "test" + visit_name = "test_visit" + session_id = 1 + + # Override the database session generator + mock_session = MagicMock() + mock_session.instrument_name = instrument_name + mock_session.visit = visit_name + mock_query_result = MagicMock() + mock_query_result.one.return_value = mock_session + mock_db_session = MagicMock() + mock_db_session.exec.return_value = mock_query_result + + def mock_get_db_session(): + yield mock_db_session + + # Mock the instrument server tokens dictionary + mock_tokens = mocker.patch( + "murfey.server.api.instrument.instrument_server_tokens", + {session_id: {"access_token": mock.sentinel}}, + ) + + # Mock the called workflow function + mock_atlas_jpg = mocker.patch( + "murfey.server.api.session_control.atlas_jpg_from_mrc", + return_value=None, + ) + + # Set up the test file + image_dir = tmp_path / instrument_name / "data" / visit_name / "Atlas" + image_dir.mkdir(parents=True, exist_ok=True) + test_file = image_dir / "Atlas1.mrc" + + # Set up the backend server + backend_app = FastAPI() + + # Override validation and database dependencies + backend_app.dependency_overrides[validate_instrument_token] = lambda: None + backend_app.dependency_overrides[validate_instrument_server_session_access] = ( + lambda: session_id + ) + backend_app.dependency_overrides[murfey_db_session] = mock_get_db_session + backend_app.include_router(spa_router) + backend_server = TestClient(backend_app) + + atlas_jpg_url = url_path_for( + "api.session_control.spa_router", "make_atlas_jpg", session_id=session_id + ) + response = backend_server.post( + atlas_jpg_url, + json={"path": str(test_file)}, + headers={"Authorization": f"Bearer {mock_tokens[session_id]['access_token']}"}, + ) + + # Check that the expected calls were made + mock_atlas_jpg.assert_called_once_with(instrument_name, visit_name, test_file) + assert response.status_code == 200 diff --git a/tests/workflows/spa/test_atlas_workflow.py b/tests/workflows/spa/test_atlas_workflow.py new file mode 100644 index 000000000..1b1ea5e0c --- /dev/null +++ b/tests/workflows/spa/test_atlas_workflow.py @@ -0,0 +1,69 @@ +from pathlib import Path +from unittest.mock import MagicMock + +import numpy as np +import pytest +from pytest_mock import MockerFixture +from werkzeug.utils import secure_filename + +from murfey.workflows.spa.atlas import atlas_jpg_from_mrc + +atlas_jpg_from_mrc_test_matrix = ( + ("Atlas1.mrc",), + ("Sample1/Atlas1.mrc",), +) + + +@pytest.mark.parametrize("test_params", atlas_jpg_from_mrc_test_matrix) +def test_atlas_jpg_from_mrc( + mocker: MockerFixture, tmp_path: Path, test_params: tuple[str] +): + # Unpack test params + (file_name_stub,) = test_params + + # Set up mock session params + visit_name = "test_visit" + instrument_name = "test" + processed_dir_name = "processed" + + # Create a 16-bit grayscale image + shape = (64, 64) + test_data = np.ones(shape).astype("uint16") + + # Mock out the data returned from openning the file + mock_mrcfile = mocker.patch("murfey.workflows.spa.atlas.mrcfile") + mock_mrc = MagicMock() + mock_mrc.data = test_data + mock_mrcfile.open.return_value.__enter__.return_value = mock_mrc + + # Mock the return result of 'get_machine_config()' + mock_machine_config = MagicMock() + mock_machine_config.processed_directory_name = processed_dir_name + mocker.patch( + "murfey.workflows.spa.atlas.get_machine_config", + return_value={"test": mock_machine_config}, + ) + + # Create a test file + test_file = ( + tmp_path / instrument_name / "data" / visit_name / "atlas" / file_name_stub + ) + test_file.parent.mkdir(parents=True, exist_ok=True) + test_file.touch(exist_ok=True) + + # Create the expected destination directory and file + processed_dir = ( + tmp_path / instrument_name / "data" / visit_name / processed_dir_name / "atlas" + ) + sample_id = "Sample" + for part in file_name_stub.split("/"): + if part.startswith("Sample"): + sample_id = part + break + processed_file_name = processed_dir / secure_filename( + f"{sample_id}_{test_file.stem}_fullres.jpg" + ) + + # Run the function and check that the expected calls are made + atlas_jpg_from_mrc(instrument_name, visit_name, test_file) + assert processed_file_name.exists()