diff --git a/src/murfey/client/tui/screens.py b/src/murfey/client/tui/screens.py index 21af41dd0..a30261cad 100644 --- a/src/murfey/client/tui/screens.py +++ b/src/murfey/client/tui/screens.py @@ -685,15 +685,23 @@ def on_button_pressed(self, event: Button.Pressed): self.app.push_screen("launcher") if machine_data.get("upstream_data_directories"): - upstream_downloads = capture_get( + upstream_downloads: dict[str, dict[str, Path]] = capture_get( base_url=str(self.app._environment.url.geturl()), router_name="session_control.correlative_router", function_name="find_upstream_visits", token=token, session_id=self.app._environment.murfey_session, ).json() + # Pass flattened dict for backwards compatibility self.app.install_screen( - UpstreamDownloads(upstream_downloads), "upstream-downloads" + UpstreamDownloads( + { + visit_name: visit_dir + for _, upstream_visits in upstream_downloads.items() + for visit_name, visit_dir in upstream_visits.items() + } + ), + "upstream-downloads", ) self.app.push_screen("upstream-downloads") @@ -759,15 +767,23 @@ def on_button_pressed(self, event: Button.Pressed): self.app.push_screen("directory-select") if machine_data.get("upstream_data_directories"): - upstream_downloads = capture_get( + upstream_downloads: dict[str, dict[str, Path]] = capture_get( base_url=str(self.app._environment.url.geturl()), router_name="session_control.correlative_router", function_name="find_upstream_visits", token=token, session_id=self.app._environment.murfey_session, ).json() + # Pass a flattened dict for backwards compatibility self.app.install_screen( - UpstreamDownloads(upstream_downloads), "upstream-downloads" + UpstreamDownloads( + { + visit_name: visit_dir + for _, upstream_visits in upstream_downloads.items() + for visit_name, visit_dir in upstream_visits.items() + } + ), + "upstream-downloads", ) self.app.push_screen("upstream-downloads") @@ -817,7 +833,7 @@ def on_button_pressed(self, event: Button.Pressed): stream_response = capture_get( base_url=str(self.app._environment.url.geturl()), router_name="session_control.correlative_router", - function_name="get_tiff", + function_name="get_tiff_file", token=token, visit_name=event.button.label, session_id=self.app._environment.murfey_session, diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index 8209bf582..3e4adafb5 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -469,13 +469,91 @@ def upload_gain_reference( return {"success": True} -class UpstreamTiffInfo(BaseModel): +class UpstreamFileDownloadInfo(BaseModel): + download_dir: Path + upstream_instrument: str + upstream_visit_path: Path + + +@router.post("/visits/{visit_name}/sessions/{session_id}/upstream_file_data_request") +def gather_upstream_files( + visit_name: str, + session_id: MurfeySessionID, + upstream_file_download: UpstreamFileDownloadInfo, +): + """ + Instrument server endpoint that will query the backend for files in the chosen + visit directory + """ + # Check for forbidden characters + if any(c in visit_name for c in ("/", "\\", ":", ";")): + logger.error( + f"Forbidden characters are present in visit name {sanitise(visit_name)}" + ) + return { + "succss": False, + "detail": "Forbidden characters present in visit name", + } + + # Sanitise inputs + download_dir = secure_path(upstream_file_download.download_dir) + upstream_instrument = sanitise(upstream_file_download.upstream_instrument) + upstream_visit_path = secure_path(upstream_file_download.upstream_visit_path) + + # Get the list of files to download + murfey_url = urlparse(_get_murfey_url(), allow_fragments=False) + sanitised_visit_name = sanitise_nonpath(visit_name) + url_path = url_path_for( + "session_control.correlative_router", + "gather_upstream_files", + session_id=session_id, + visit_name=sanitised_visit_name, + ) + upstream_files: list[str] = requests.get( + f"{murfey_url.geturl()}{url_path}", + headers={"Authorization": f"Bearer {tokens[session_id]}"}, + json={ + "upstream_instrument": upstream_instrument, + "upstream_visit_path": str(upstream_visit_path), + }, + ).json() + + # Make the download directory and download gathered files + download_dir.mkdir(exist_ok=True) + for upstream_file in upstream_files: + url_path = url_path_for( + "session_control.correlative_router", + "get_upstream_file", + session_id=session_id, + visit_name=sanitised_visit_name, + upstream_file_path=upstream_file, + ) + file_data = requests.get( + f"{murfey_url.geturl()}{url_path}", + headers={"Authorization": f"Bearer {tokens[session_id]}"}, + stream=True, + ) + upstream_file_relative_path = secure_path( + Path(upstream_file).relative_to(upstream_visit_path) + ) + save_file = download_dir / upstream_file_relative_path + save_file.parent.mkdir(parents=True, exist_ok=True) + with open(save_file, "wb") as f: + for chunk in file_data.iter_content(chunk_size=32 * 1024**2): + f.write(chunk) + logger.info(f"Saved file to {str(save_file)!r}") + return {"success": True} + + +class UpstreamTiffDownloadInfo(BaseModel): download_dir: Path @router.post("/visits/{visit_name}/sessions/{session_id}/upstream_tiff_data_request") def gather_upstream_tiffs( - visit_name: str, session_id: MurfeySessionID, upstream_tiff_info: UpstreamTiffInfo + visit_name: str, + session_id: MurfeySessionID, + upstream_tiff_info: UpstreamTiffDownloadInfo, ): sanitised_visit_name = sanitise_nonpath(visit_name) assert not any(c in visit_name for c in ("/", "\\", ":", ";")) @@ -490,7 +568,7 @@ def gather_upstream_tiffs( ) for tiff_path in upstream_tiff_paths: tiff_data = requests.get( - f"{murfey_url.geturl()}{url_path_for('session_control.correlative_router', 'get_tiff', session_id=session_id, visit_name=sanitised_visit_name, tiff_path=tiff_path)}", + f"{murfey_url.geturl()}{url_path_for('session_control.correlative_router', 'get_tiff_file', session_id=session_id, visit_name=sanitised_visit_name, tiff_path=tiff_path)}", stream=True, headers={"Authorization": f"Bearer {tokens[session_id]}"}, ) diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 73ad8efdf..9069535ce 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -27,7 +27,7 @@ from murfey.util.api import url_path_for from murfey.util.config import get_machine_config from murfey.util.db import RsyncInstance, Session, SessionProcessingParameters -from murfey.util.models import File, MultigridWatcherSetup +from murfey.util.models import File, MultigridWatcherSetup, UpstreamFileRequestInfo # Create APIRouter class object router = APIRouter( @@ -396,6 +396,64 @@ async def request_upstream_tiff_data_download( return data +@router.post("/visits/{visit_name}/sessions/{session_id}/upstream_file_data_request") +async def request_upstream_file_data_download( + visit_name: str, + session_id: MurfeySessionID, + upstream_file_request: UpstreamFileRequestInfo, + db=murfey_db, +): + """ + Forwards a request to the instrument server to trigger a file download request. + """ + # Load the current instrument's machine config + instrument_name = ( + db.exec(select(Session).where(Session.id == session_id)).one().instrument_name + ) + machine_config = get_machine_config(instrument_name=instrument_name)[ + instrument_name + ] + + # Log and return errors if download directory or URL weren't specified + if not machine_config.upstream_data_download_directory: + log.error("No download directory was configured for this instrument") + return { + "success": False, + "detail": "No download directory configured", + } + if not machine_config.instrument_server_url: + log.error("Couldn't find instrument server URL to post request to") + return { + "success": False, + "detail": "No instrument server URL", + } + + # Forward the download request + download_dir = str( + machine_config.upstream_data_download_directory / secure_filename(visit_name) + ) + async with aiohttp.ClientSession() as clientsession: + url_path = url_path_for( + "api.router", + "gather_upstream_files", + visit_name=secure_filename(visit_name), + session_id=session_id, + ) + async with clientsession.post( + f"{machine_config.instrument_server_url}{url_path}", + headers={ + "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" + }, + json={ + "download_dir": download_dir, + "upstream_instrument": upstream_file_request.upstream_instrument, + "upstream_visit_path": str(upstream_file_request.upstream_visit_path), + }, + ) as resp: + data = await resp.json() + return data + + class RsyncerSource(BaseModel): source: str diff --git a/src/murfey/server/api/session_control.py b/src/murfey/server/api/session_control.py index 6316cbcb3..9d9770029 100644 --- a/src/murfey/server/api/session_control.py +++ b/src/murfey/server/api/session_control.py @@ -9,7 +9,6 @@ from pydantic import BaseModel from sqlalchemy import func from sqlmodel import select -from werkzeug.utils import secure_filename import murfey.server.prometheus as prom from murfey.server import _transport_object @@ -17,19 +16,23 @@ MurfeySessionIDInstrument as MurfeySessionID, validate_instrument_token, ) -from murfey.server.api.shared import ( +from murfey.server.api.session_shared import ( + find_upstream_visits as _find_upstream_visits, + gather_upstream_files as _gather_upstream_files, + gather_upstream_tiffs as _gather_upstream_tiffs, get_foil_hole as _get_foil_hole, get_foil_holes_from_grid_square as _get_foil_holes_from_grid_square, get_grid_squares as _get_grid_squares, get_grid_squares_from_dcg as _get_grid_squares_from_dcg, get_machine_config_for_instrument, - get_upstream_tiff_dirs, + get_tiff_file as _get_tiff_file, + get_upstream_file as _get_upstream_file, remove_session_by_id, ) from murfey.server.ispyb import DB as ispyb_db, get_all_ongoing_visits from murfey.server.murfey_db import murfey_db from murfey.util import sanitise -from murfey.util.config import MachineConfig, get_machine_config +from murfey.util.config import MachineConfig from murfey.util.db import ( AutoProcProgram, ClientEnvironment, @@ -49,6 +52,7 @@ GridSquareParameters, RsyncerInfo, SearchMapParameters, + UpstreamFileRequestInfo, Visit, ) from murfey.workflows.spa.atlas import atlas_jpg_from_mrc @@ -418,62 +422,53 @@ def register_batch_position( @correlative_router.get("/sessions/{session_id}/upstream_visits") async def find_upstream_visits(session_id: MurfeySessionID, db=murfey_db): - murfey_session = db.exec(select(Session).where(Session.id == session_id)).one() - visit_name = murfey_session.visit - instrument_name = murfey_session.instrument_name - machine_config = get_machine_config(instrument_name=instrument_name)[ - instrument_name - ] - upstream_visits = {} - # Iterates through provided upstream directories - for p in machine_config.upstream_data_directories: - # Looks for visit name in file path - for v in Path(p).glob(f"{visit_name.split('-')[0]}-*"): - upstream_visits[v.name] = v / machine_config.processed_directory_name - return upstream_visits + return _find_upstream_visits(session_id=session_id, db=db) + + +@correlative_router.get( + "/visits/{visit_name}/sessions/{session_id}/upstream_file_paths" +) +async def gather_upstream_files( + visit_name: str, + session_id: MurfeySessionID, + upstream_file_request: UpstreamFileRequestInfo, + db=murfey_db, +): + return _gather_upstream_files( + session_id=session_id, + upstream_instrument=upstream_file_request.upstream_instrument, + upstream_visit_path=upstream_file_request.upstream_visit_path, + db=db, + ) + + +@correlative_router.get( + "/visits/{visit_name}/sessions/{session_id}/upstream_file/{upstream_file_path:path}" +) +async def get_upstream_file( + visit_name: str, + session_id: MurfeySessionID, + upstream_file_path: str, + db=murfey_db, +): + upstream_file = _get_upstream_file(upstream_file_path) + return ( + FileResponse(path=upstream_file) if upstream_file is not None else upstream_file + ) @correlative_router.get( "/visits/{visit_name}/sessions/{session_id}/upstream_tiff_paths" ) async def gather_upstream_tiffs(visit_name: str, session_id: int, db=murfey_db): - """ - Looks for TIFF files associated with the current session in the permitted storage - servers, and returns their relative file paths as a list. - """ - instrument_name = ( - db.exec(select(Session).where(Session.id == session_id)).one().instrument_name - ) - upstream_tiff_paths = [] - tiff_dirs = get_upstream_tiff_dirs(visit_name, instrument_name) - if not tiff_dirs: - return None - for tiff_dir in tiff_dirs: - for f in tiff_dir.glob("**/*.tiff"): - upstream_tiff_paths.append(str(f.relative_to(tiff_dir))) - for f in tiff_dir.glob("**/*.tif"): - upstream_tiff_paths.append(str(f.relative_to(tiff_dir))) - return upstream_tiff_paths + return _gather_upstream_tiffs(visit_name=visit_name, session_id=session_id, db=db) @correlative_router.get( "/visits/{visit_name}/sessions/{session_id}/upstream_tiff/{tiff_path:path}" ) -async def get_tiff(visit_name: str, session_id: int, tiff_path: str, db=murfey_db): - instrument_name = ( - db.exec(select(Session).where(Session.id == session_id)).one().instrument_name +async def get_tiff_file(visit_name: str, session_id: int, tiff_path: str, db=murfey_db): + tiff_file = _get_tiff_file( + visit_name=visit_name, session_id=session_id, tiff_path=tiff_path, db=db ) - tiff_dirs = get_upstream_tiff_dirs(visit_name, instrument_name) - if not tiff_dirs: - return None - - tiff_path = "/".join(secure_filename(p) for p in tiff_path.split("/")) - for tiff_dir in tiff_dirs: - test_path = tiff_dir / tiff_path - if test_path.is_file(): - break - else: - logger.warning(f"TIFF {tiff_path} not found") - return None - - return FileResponse(path=test_path) + return FileResponse(path=tiff_file) if tiff_file is not None else tiff_file diff --git a/src/murfey/server/api/session_info.py b/src/murfey/server/api/session_info.py index 62f3be1f3..57db345ce 100644 --- a/src/murfey/server/api/session_info.py +++ b/src/murfey/server/api/session_info.py @@ -7,7 +7,6 @@ from fastapi.responses import FileResponse from pydantic import BaseModel from sqlmodel import select -from werkzeug.utils import secure_filename import murfey.server.api.websocket as ws from murfey.server import _transport_object @@ -17,19 +16,23 @@ MurfeySessionIDFrontend as MurfeySessionID, validate_token, ) -from murfey.server.api.shared import ( +from murfey.server.api.session_shared import ( + find_upstream_visits as _find_upstream_visits, + gather_upstream_files as _gather_upstream_files, + gather_upstream_tiffs as _gather_upstream_tiffs, get_foil_hole as _get_foil_hole, get_foil_holes_from_grid_square as _get_foil_holes_from_grid_square, get_grid_squares as _get_grid_squares, get_grid_squares_from_dcg as _get_grid_squares_from_dcg, get_machine_config_for_instrument, - get_upstream_tiff_dirs, + get_tiff_file as _get_tiff_file, + get_upstream_file as _get_upstream_file, remove_session_by_id, ) from murfey.server.ispyb import DB as ispyb_db, get_all_ongoing_visits from murfey.server.murfey_db import murfey_db from murfey.util import sanitise -from murfey.util.config import MachineConfig, get_machine_config +from murfey.util.config import MachineConfig from murfey.util.db import ( ClientEnvironment, DataCollection, @@ -46,7 +49,7 @@ Tilt, TiltSeries, ) -from murfey.util.models import Visit +from murfey.util.models import UpstreamFileRequestInfo, Visit logger = getLogger("murfey.server.api.session_info") @@ -412,62 +415,53 @@ def get_tilts( @correlative_router.get("/sessions/{session_id}/upstream_visits") async def find_upstream_visits(session_id: MurfeySessionID, db=murfey_db): - murfey_session = db.exec(select(Session).where(Session.id == session_id)).one() - visit_name = murfey_session.visit - instrument_name = murfey_session.instrument_name - machine_config = get_machine_config(instrument_name=instrument_name)[ - instrument_name - ] - upstream_visits = {} - # Iterates through provided upstream directories - for p in machine_config.upstream_data_directories: - # Looks for visit name in file path - for v in Path(p).glob(f"{visit_name.split('-')[0]}-*"): - upstream_visits[v.name] = v / machine_config.processed_directory_name - return upstream_visits + return _find_upstream_visits(session_id=session_id, db=db) @correlative_router.get( - "/visits/{visit_name}/sessions/{session_id}/upstream_tiff_paths" + "/visits/{visit_name}/sessions/{session_id}/upstream_file_paths" ) -async def gather_upstream_tiffs(visit_name: str, session_id: int, db=murfey_db): - """ - Looks for TIFF files associated with the current session in the permitted storage - servers, and returns their relative file paths as a list. - """ - instrument_name = ( - db.exec(select(Session).where(Session.id == session_id)).one().instrument_name +async def gather_upstream_files( + visit_name: str, + session_id: MurfeySessionID, + upstream_file_request: UpstreamFileRequestInfo, + db=murfey_db, +): + return _gather_upstream_files( + session_id=session_id, + upstream_instrument=upstream_file_request.upstream_instrument, + upstream_visit_path=upstream_file_request.upstream_visit_path, + db=db, ) - upstream_tiff_paths = [] - tiff_dirs = get_upstream_tiff_dirs(visit_name, instrument_name) - if not tiff_dirs: - return None - for tiff_dir in tiff_dirs: - for f in tiff_dir.glob("**/*.tiff"): - upstream_tiff_paths.append(str(f.relative_to(tiff_dir))) - for f in tiff_dir.glob("**/*.tif"): - upstream_tiff_paths.append(str(f.relative_to(tiff_dir))) - return upstream_tiff_paths @correlative_router.get( - "/visits/{visit_name}/sessions/{session_id}/upstream_tiff/{tiff_path:path}" + "/visits/{visit_name}/sessions/{session_id}/upstream_file/{upstream_file_path:path}" ) -async def get_tiff(visit_name: str, session_id: int, tiff_path: str, db=murfey_db): - instrument_name = ( - db.exec(select(Session).where(Session.id == session_id)).one().instrument_name +async def get_upstream_file( + visit_name: str, + session_id: MurfeySessionID, + upstream_file_path: Path, + db=murfey_db, +): + upstream_file = _get_upstream_file(upstream_file_path) + return ( + FileResponse(path=upstream_file) if upstream_file is not None else upstream_file ) - tiff_dirs = get_upstream_tiff_dirs(visit_name, instrument_name) - if not tiff_dirs: - return None - tiff_path = "/".join(secure_filename(p) for p in tiff_path.split("/")) - for tiff_dir in tiff_dirs: - test_path = tiff_dir / tiff_path - if test_path.is_file(): - break - else: - logger.warning(f"TIFF {tiff_path} not found") - return None - return FileResponse(path=test_path) +@correlative_router.get( + "/visits/{visit_name}/sessions/{session_id}/upstream_tiff_paths" +) +async def gather_upstream_tiffs(visit_name: str, session_id: int, db=murfey_db): + return _gather_upstream_tiffs(visit_name=visit_name, session_id=session_id, db=db) + + +@correlative_router.get( + "/visits/{visit_name}/sessions/{session_id}/upstream_tiff/{tiff_path:path}" +) +async def get_tiff_file(visit_name: str, session_id: int, tiff_path: str, db=murfey_db): + tiff_file = _get_tiff_file( + visit_name=visit_name, session_id=session_id, tiff_path=tiff_path, db=db + ) + return FileResponse(path=tiff_file) if isinstance(tiff_file, Path) else tiff_file diff --git a/src/murfey/server/api/shared.py b/src/murfey/server/api/session_shared.py similarity index 52% rename from src/murfey/server/api/shared.py rename to src/murfey/server/api/session_shared.py index fd0cc353e..49362d9bb 100644 --- a/src/murfey/server/api/shared.py +++ b/src/murfey/server/api/session_shared.py @@ -4,10 +4,11 @@ from typing import Dict, List, Optional from sqlmodel import select +from sqlmodel.orm.session import Session as SQLModelSession from werkzeug.utils import secure_filename import murfey.server.prometheus as prom -from murfey.util import safe_run, sanitise +from murfey.util import safe_run, sanitise, secure_path from murfey.util.config import MachineConfig, from_file, get_machine_config, settings from murfey.util.db import ( DataCollection, @@ -16,7 +17,7 @@ GridSquare, ProcessingJob, RsyncInstance, - Session, + Session as MurfeySession, ) logger = logging.getLogger("murfey.server.api.shared") @@ -32,9 +33,9 @@ def get_machine_config_for_instrument(instrument_name: str) -> Optional[MachineC def remove_session_by_id(session_id: int, db): - session = db.exec(select(Session).where(Session.id == session_id)).one() + session = db.exec(select(MurfeySession).where(MurfeySession.id == session_id)).one() sessions_for_visit = db.exec( - select(Session).where(Session.visit == session.visit) + select(MurfeySession).where(MurfeySession.visit == session.visit) ).all() # Don't remove prometheus metrics if there are other sessions using them if len(sessions_for_visit) == 1: @@ -145,13 +146,94 @@ def get_foil_hole(session_id: int, fh_name: int, db) -> Dict[str, int]: return {f[1].tag: f[0].id for f in foil_holes} +def find_upstream_visits(session_id: int, db: SQLModelSession): + """ + Returns a nested dictionary, in which visits and the full paths to their directories + are further grouped by instrument name. + """ + murfey_session = db.exec( + select(MurfeySession).where(MurfeySession.id == session_id) + ).one() + visit_name = murfey_session.visit + instrument_name = murfey_session.instrument_name + machine_config = get_machine_config(instrument_name=instrument_name)[ + instrument_name + ] + upstream_visits: dict[str, dict[str, Path]] = {} + # Iterates through provided upstream directories + for ( + upstream_instrument, + upstream_data_dir, + ) in machine_config.upstream_data_directories.items(): + # Looks for visit name in file path + current_upstream_visits = {} + for visit_path in Path(upstream_data_dir).glob(f"{visit_name.split('-')[0]}-*"): + current_upstream_visits[visit_path.name] = visit_path + upstream_visits[upstream_instrument] = current_upstream_visits + return upstream_visits + + +def gather_upstream_files( + session_id: int, + upstream_instrument: str, + upstream_visit_path: Path, + db: SQLModelSession, +): + """ + Searches the specified upstream instrument for files based on the search strings + set in the MachineConfig and returns them as a list of file paths. + """ + # Load the current instrument's machine config + murfey_session = db.exec( + select(MurfeySession).where(MurfeySession.id == session_id) + ).one() + instrument_name = murfey_session.instrument_name + machine_config = get_machine_config(instrument_name=instrument_name)[ + instrument_name + ] + + # Search for files using the configured strings for that upstream instrument + file_list: list[Path] = [] + logger.info(f"Searching for files in {sanitise(str(upstream_visit_path))!r}") + if ( + machine_config.upstream_data_search_strings.get(upstream_instrument, None) + is not None + ): + for search_string in machine_config.upstream_data_search_strings[ + upstream_instrument + ]: + logger.info(f"Using search string {search_string}") + for file in upstream_visit_path.glob(search_string): + if file.is_file(): + file_list.append(file) + logger.info( + f"Found {len(file_list)} files for download " + f"from {sanitise(upstream_instrument)}" + ) + else: + logger.warning( + "Upstream file searching has not been configured for " + f"{sanitise(upstream_instrument)} on {sanitise(instrument_name)}" + ) + return file_list + + +def get_upstream_file(file_path: str | Path): + file_path = Path(file_path) if isinstance(file_path, str) else file_path + file_path = secure_path(file_path) + if file_path.exists() and file_path.is_file(): + return file_path + logger.warning(f"Requested file {sanitise(str(file_path))!r} was not found") + return None + + def get_upstream_tiff_dirs(visit_name: str, instrument_name: str) -> List[Path]: tiff_dirs = [] machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] for directory_name in machine_config.upstream_data_tiff_locations: - for p in machine_config.upstream_data_directories: + for _, p in machine_config.upstream_data_directories.items(): if (Path(p) / secure_filename(visit_name)).is_dir(): processed_dir = Path(p) / secure_filename(visit_name) / directory_name tiff_dirs.append(processed_dir) @@ -161,3 +243,49 @@ def get_upstream_tiff_dirs(visit_name: str, instrument_name: str) -> List[Path]: f"No candidate directory found for upstream download from visit {sanitise(visit_name)}" ) return tiff_dirs + + +def gather_upstream_tiffs(visit_name: str, session_id: int, db: SQLModelSession): + """ + Looks for TIFF files associated with the current session in the permitted storage + servers, and returns their relative file paths as a list. + """ + instrument_name = ( + db.exec(select(MurfeySession).where(MurfeySession.id == session_id)) + .one() + .instrument_name + ) + upstream_tiff_paths = [] + tiff_dirs = get_upstream_tiff_dirs(visit_name, instrument_name) + if not tiff_dirs: + return None + for tiff_dir in tiff_dirs: + for f in tiff_dir.glob("**/*.tiff"): + upstream_tiff_paths.append(str(f.relative_to(tiff_dir))) + for f in tiff_dir.glob("**/*.tif"): + upstream_tiff_paths.append(str(f.relative_to(tiff_dir))) + return upstream_tiff_paths + + +def get_tiff_file( + visit_name: str, session_id: int, tiff_path: str, db: SQLModelSession +): + instrument_name = ( + db.exec(select(MurfeySession).where(MurfeySession.id == session_id)) + .one() + .instrument_name + ) + tiff_dirs = get_upstream_tiff_dirs(visit_name, instrument_name) + if not tiff_dirs: + return None + + tiff_path = "/".join(secure_filename(p) for p in tiff_path.split("/")) + for tiff_dir in tiff_dirs: + tiff_file = tiff_dir / tiff_path + if tiff_file.is_file(): + break + else: + logger.warning(f"TIFF {tiff_path} not found") + return None + + return tiff_file diff --git a/src/murfey/util/config.py b/src/murfey/util/config.py index 8ff64dc4a..458507b84 100644 --- a/src/murfey/util/config.py +++ b/src/murfey/util/config.py @@ -58,8 +58,9 @@ class MachineConfig(BaseModel): # type: ignore allow_removal: bool = False # Upstream data download setup - upstream_data_directories: list[Path] = [] # Previous sessions + upstream_data_directories: dict[str, Path] = {} # Previous sessions upstream_data_download_directory: Optional[Path] = None # Set by microscope config + upstream_data_search_strings: dict[str, list[str]] = {} # For glob search upstream_data_tiff_locations: list[str] = ["processed"] # Location of CLEM TIFFs # Data processing setup ----------------------------------------------------------- diff --git a/src/murfey/util/models.py b/src/murfey/util/models.py index 72b0b542c..64f9cd4f4 100644 --- a/src/murfey/util/models.py +++ b/src/murfey/util/models.py @@ -84,6 +84,12 @@ class RsyncerSkippedFiles(BaseModel): increment_count: int = 1 +class UpstreamFileRequestInfo(BaseModel): + # Used in backend server for cross-instrument file download requests + upstream_instrument: str + upstream_visit_path: Path + + """ Single Particle Analysis ======================== diff --git a/src/murfey/util/route_manifest.yaml b/src/murfey/util/route_manifest.yaml index 1c998a336..2f8cd36a3 100644 --- a/src/murfey/util/route_manifest.yaml +++ b/src/murfey/util/route_manifest.yaml @@ -148,6 +148,15 @@ murfey.instrument_server.api.router: type: int methods: - POST + - path: /visits/{visit_name}/sessions/{session_id}/upstream_file_data_request + function: gather_upstream_files + path_params: + - name: visit_name + type: str + - name: session_id + type: int + methods: + - POST - path: /visits/{visit_name}/sessions/{session_id}/upstream_tiff_data_request function: gather_upstream_tiffs path_params: @@ -616,6 +625,15 @@ murfey.server.api.instrument.router: type: int methods: - POST + - path: /instrument_server/visits/{visit_name}/sessions/{session_id}/upstream_file_data_request + function: request_upstream_file_data_download + path_params: + - name: visit_name + type: str + - name: session_id + type: int + methods: + - POST - path: /instrument_server/sessions/{session_id}/stop_rsyncer function: stop_rsyncer path_params: @@ -760,6 +778,26 @@ murfey.server.api.session_control.correlative_router: type: int methods: - GET + - path: /session_control/correlative/visits/{visit_name}/sessions/{session_id}/upstream_file_paths + function: gather_upstream_files + path_params: + - name: visit_name + type: str + - name: session_id + type: int + methods: + - GET + - path: /session_control/correlative/visits/{visit_name}/sessions/{session_id}/upstream_file/{upstream_file_path:path} + function: get_upstream_file + path_params: + - name: visit_name + type: str + - name: upstream_file_path + type: str + - name: session_id + type: int + methods: + - GET - path: /session_control/correlative/visits/{visit_name}/sessions/{session_id}/upstream_tiff_paths function: gather_upstream_tiffs path_params: @@ -770,7 +808,7 @@ murfey.server.api.session_control.correlative_router: methods: - GET - path: /session_control/correlative/visits/{visit_name}/sessions/{session_id}/upstream_tiff/{tiff_path:path} - function: get_tiff + function: get_tiff_file path_params: - name: visit_name type: str @@ -983,6 +1021,26 @@ murfey.server.api.session_info.correlative_router: type: int methods: - GET + - path: /session_info/correlative/visits/{visit_name}/sessions/{session_id}/upstream_file_paths + function: gather_upstream_files + path_params: + - name: visit_name + type: str + - name: session_id + type: int + methods: + - GET + - path: /session_info/correlative/visits/{visit_name}/sessions/{session_id}/upstream_file/{upstream_file_path:path} + function: get_upstream_file + path_params: + - name: visit_name + type: str + - name: upstream_file_path + type: Path + - name: session_id + type: int + methods: + - GET - path: /session_info/correlative/visits/{visit_name}/sessions/{session_id}/upstream_tiff_paths function: gather_upstream_tiffs path_params: @@ -993,7 +1051,7 @@ murfey.server.api.session_info.correlative_router: methods: - GET - path: /session_info/correlative/visits/{visit_name}/sessions/{session_id}/upstream_tiff/{tiff_path:path} - function: get_tiff + function: get_tiff_file path_params: - name: visit_name type: str diff --git a/tests/client/tui/test_main.py b/tests/client/tui/test_main.py index a42801547..82b006990 100644 --- a/tests/client/tui/test_main.py +++ b/tests/client/tui/test_main.py @@ -60,7 +60,7 @@ def test_get_visit_list( base_url=server_url, router_name="session_control.router", function_name="get_current_visits", - token="", + token=mock.ANY, instrument_name=instrument_name, ) diff --git a/tests/server/api/test_session_shared.py b/tests/server/api/test_session_shared.py new file mode 100644 index 000000000..7f971b948 --- /dev/null +++ b/tests/server/api/test_session_shared.py @@ -0,0 +1,184 @@ +from pathlib import Path +from unittest.mock import MagicMock + +import pytest +from pytest_mock import MockerFixture + +from murfey.server.api.session_shared import find_upstream_visits, gather_upstream_files +from murfey.util.config import MachineConfig +from tests.conftest import ExampleVisit + + +def test_find_upstream_visits( + mocker: MockerFixture, + tmp_path: Path, + # murfey_db_session, +): + # Get the visit, instrument name, and session ID + visit_name_root = f"{ExampleVisit.proposal_code}{ExampleVisit.proposal_number}" + visit_name = f"{visit_name_root}-{ExampleVisit.visit_number}" + instrument_name = ExampleVisit.instrument_name + session_id = ExampleVisit.murfey_session_id + + # Mock the database call + mock_murfey_session_row = MagicMock() + mock_murfey_session_row.visit = visit_name + mock_murfey_session_row.instrument_name = instrument_name + mock_murfey_db = MagicMock() + mock_murfey_db.exec.return_value.one.return_value = mock_murfey_session_row + + # Create mock upstream visit directories and necessary data structures + upstream_visits = {} + upstream_data_dirs = {} + for n in range(5): + upstream_instrument = f"{instrument_name}{str(n).zfill(2)}" + upstream_visit = ( + tmp_path / f"{upstream_instrument}/data/2020/{visit_name_root}-{n}" + ) + upstream_visit.mkdir(parents=True, exist_ok=True) + upstream_visits[upstream_instrument] = {upstream_visit.stem: upstream_visit} + upstream_data_dirs[upstream_instrument] = upstream_visit.parent + + # Mock the MachineConfig for this instrument + mock_machine_config = MagicMock(spec=MachineConfig) + mock_machine_config.upstream_data_directories = upstream_data_dirs + mock_get_machine_config = mocker.patch( + "murfey.server.api.session_shared.get_machine_config", + ) + mock_get_machine_config.return_value = {instrument_name: mock_machine_config} + + # Run the function: + result = find_upstream_visits(session_id=session_id, db=mock_murfey_db) + + # Mock the database call + assert result == upstream_visits + + +gather_upstream_files_test_matrix: tuple[ + tuple[tuple[str, list[str], list[str]], ...], ... +] = ( + # CLEM + ( + # Search strings, files to match, and files to avoid + ( + "processed/**/composite*.tiff", + [ + file + for sublist in [ + [ + f"processed/grid1/TileScan1/Position_{n}/composite_BF_FL.tiff" + for n in range(5) + ], + ] + for file in sublist + ], + [ + file + for sublist in [ + [ + f"processed/grid1/TileScan1/Position_{n}/{color}.tiff" + for n in range(5) + for color in ("gray", "green", "red") + ], + ] + for file in sublist + ], + ), + ( + "screenshots/**/*", + [ + file + for sublist in [ + [f"screenshots/overview_{n}.png" for n in range(10)], + [f"screenshots/annotated_{n}.png" for n in range(10)], + ] + for file in sublist + ], + [], + ), + ), + # FIB + ( + # Search strings, files to match, and files to avoid + ( + "maps/**/*", + [ + file + for sublist in [ + [f"maps/data_{n}.txt" for n in range(5)], + [f"maps/map/image_{n}.tiff" for n in range(5)], + ] + for file in sublist + ], + [], + ), + ), +) + + +@pytest.mark.parametrize("test_params", gather_upstream_files_test_matrix) +def test_gather_upstream_files( + mocker: MockerFixture, + tmp_path: Path, + test_params: tuple[tuple[str, list[str], list[str]], ...], +): + # Get the visit, instrument name, and session ID + visit_name_root = f"{ExampleVisit.proposal_code}{ExampleVisit.proposal_number}" + visit_name = f"{visit_name_root}-{ExampleVisit.visit_number}" + instrument_name = ExampleVisit.instrument_name + session_id = ExampleVisit.murfey_session_id + + # Unpack the test params + search_strings = [item[0] for item in test_params] + upstream_relative_paths = [file for item in test_params for file in item[1]] + other_relative_paths = [file for item in test_params for file in item[2]] + + # Set the upstream instrument and upstream visit to access + upstream_instrument = f"{instrument_name}01" + upstream_visit = f"{visit_name_root}-5" + upstream_visit_path = tmp_path / f"{upstream_instrument}/data/2020/{upstream_visit}" + + # Construct the files and directories + upstream_files = [ + upstream_visit_path / relative_path for relative_path in upstream_relative_paths + ] + other_files = [ + upstream_visit_path / relative_path for relative_path in other_relative_paths + ] + + for file in upstream_files: + if not file.parent.exists(): + file.parent.mkdir(parents=True) + file.touch(exist_ok=True) + assert file.is_file() + for file in other_files: + if not file.parent.exists(): + file.parent.mkdir(parents=True) + file.touch(exist_ok=True) + assert file.is_file() + + # Mock the database call + mock_murfey_session_row = MagicMock() + mock_murfey_session_row.visit = visit_name + mock_murfey_session_row.instrument_name = instrument_name + mock_murfey_db = MagicMock() + mock_murfey_db.exec.return_value.one.return_value = mock_murfey_session_row + + # Mock the MachineConfig for this instrument + mock_machine_config = MagicMock(spec=MachineConfig) + mock_machine_config.upstream_data_search_strings = { + upstream_instrument: search_strings + } + mock_get_machine_config = mocker.patch( + "murfey.server.api.session_shared.get_machine_config", + ) + mock_get_machine_config.return_value = {instrument_name: mock_machine_config} + + assert sorted( + gather_upstream_files( + session_id=session_id, + upstream_instrument=upstream_instrument, + upstream_visit_path=upstream_visit_path, + db=mock_murfey_db, + ) + ) == sorted(upstream_files)