diff --git a/src/murfey/client/contexts/spa_metadata.py b/src/murfey/client/contexts/spa_metadata.py index d049c2a7a..5e99a7b7a 100644 --- a/src/murfey/client/contexts/spa_metadata.py +++ b/src/murfey/client/contexts/spa_metadata.py @@ -181,7 +181,11 @@ def post_transfer( }, ) - elif transferred_file.suffix == ".dm" and environment: + elif ( + transferred_file.suffix == ".dm" + and transferred_file.name.startswith("GridSquare") + and environment + ): gs_name = transferred_file.stem.split("_")[1] fh_positions = _foil_hole_positions(transferred_file, int(gs_name)) source = _get_source(transferred_file, environment=environment) diff --git a/src/murfey/server/__init__.py b/src/murfey/server/__init__.py index 39786ff2f..11c4b3b35 100644 --- a/src/murfey/server/__init__.py +++ b/src/murfey/server/__init__.py @@ -2948,7 +2948,12 @@ def feedback_callback(header: dict, message: dict) -> None: else: # Send it directly to DLQ without trying to rerun it _transport_object.transport.nack(header, requeue=False) + if not result: + logger.error( + f"Workflow {sanitise(message['register'])} returned {result}" + ) return None + logger.error(f"No workflow found for {sanitise(message['register'])}") if _transport_object: _transport_object.transport.nack(header, requeue=False) return None diff --git a/src/murfey/server/api/__init__.py b/src/murfey/server/api/__init__.py index 4873f81ef..7fa7048d1 100644 --- a/src/murfey/server/api/__init__.py +++ b/src/murfey/server/api/__init__.py @@ -579,7 +579,7 @@ def flush_spa_processing( visit_name: str, session_id: MurfeySessionID, tag: Tag, db=murfey_db ): zocalo_message = { - "register": "flush_spa_preprocess", + "register": "spa.flush_spa_preprocess", "session_id": session_id, "tag": tag.tag, } diff --git a/src/murfey/util/spa_metadata.py b/src/murfey/util/spa_metadata.py index 6dea16ac1..bb4fd3d72 100644 --- a/src/murfey/util/spa_metadata.py +++ b/src/murfey/util/spa_metadata.py @@ -157,6 +157,8 @@ def foil_hole_data(xml_path: Path, foil_hole: int, grid_square: int) -> FoilHole serialization_array = data["TargetLocations"]["TargetLocationsEfficient"][ "a:m_serializationArray" ] + if len(serialization_array.keys()) == 0: + return FoilHoleInfo(id=foil_hole, grid_square_id=grid_square) for key in serialization_array.keys(): if key.startswith("b:KeyValuePairOfintTargetLocation"): required_key = key diff --git a/src/murfey/workflows/spa/flush_spa_preprocess.py b/src/murfey/workflows/spa/flush_spa_preprocess.py index f2d68b36c..d0c220b5a 100644 --- a/src/murfey/workflows/spa/flush_spa_preprocess.py +++ b/src/murfey/workflows/spa/flush_spa_preprocess.py @@ -9,9 +9,19 @@ from murfey.server import _murfey_id, _transport_object, sanitise from murfey.server.api.auth import MurfeySessionID -from murfey.server.murfey_db import murfey_db from murfey.util.config import get_machine_config, get_microscope -from murfey.util.db import DataCollectionGroup, FoilHole, GridSquare +from murfey.util.db import ( + AutoProcProgram, + DataCollection, + DataCollectionGroup, + FoilHole, + GridSquare, + Movie, + PreprocessStash, + ProcessingJob, +) +from murfey.util.db import Session as MurfeySession +from murfey.util.db import SPAFeedbackParameters, SPARelionParameters from murfey.util.models import FoilHoleParameters, GridSquareParameters from murfey.util.processing_params import default_spa_parameters from murfey.util.spa_metadata import ( @@ -30,10 +40,10 @@ def register_grid_square( session_id: MurfeySessionID, gsid: int, grid_square_params: GridSquareParameters, - db=murfey_db, + murfey_db: Session, ): try: - grid_square = db.exec( + grid_square = murfey_db.exec( select(GridSquare) .where(GridSquare.name == gsid) .where(GridSquare.tag == grid_square_params.tag) @@ -51,7 +61,7 @@ def register_grid_square( _transport_object.do_update_grid_square(grid_square.id, grid_square_params) except Exception: if _transport_object: - dcg = db.exec( + dcg = murfey_db.exec( select(DataCollectionGroup) .where(DataCollectionGroup.session_id == session_id) .where(DataCollectionGroup.tag == grid_square_params.tag) @@ -90,19 +100,19 @@ def register_grid_square( pixel_size=grid_square_params.pixel_size, image=secured_grid_square_image_path, ) - db.add(grid_square) - db.commit() - db.close() + murfey_db.add(grid_square) + murfey_db.commit() + murfey_db.close() def register_foil_hole( session_id: MurfeySessionID, gs_name: int, foil_hole_params: FoilHoleParameters, - db=murfey_db, + murfey_db: Session, ): try: - gs = db.exec( + gs = murfey_db.exec( select(GridSquare) .where(GridSquare.tag == foil_hole_params.tag) .where(GridSquare.session_id == session_id) @@ -120,7 +130,7 @@ def register_foil_hole( else: jpeg_size = (0, 0) try: - foil_hole = db.exec( + foil_hole = murfey_db.exec( select(FoilHole) .where(FoilHole.name == foil_hole_params.name) .where(FoilHole.grid_square_id == gsid) @@ -180,9 +190,9 @@ def register_foil_hole( pixel_size=foil_hole_params.pixel_size, image=secured_foil_hole_image_path, ) - db.add(foil_hole) - db.commit() - db.close() + murfey_db.add(foil_hole) + murfey_db.commit() + murfey_db.close() def _grid_square_metadata_file(f: Path, grid_square: int) -> Optional[Path]: @@ -198,11 +208,11 @@ def _grid_square_metadata_file(f: Path, grid_square: int) -> Optional[Path]: def _flush_position_analysis( - movie_path: Path, dcg_id: int, session_id: int, db: Session + movie_path: Path, dcg_id: int, session_id: int, murfey_db: Session ) -> Optional[int]: """Register a grid square and foil hole in the database""" data_collection_group = murfey_db.exec( - select(db.DataCollectionGroup).where(db.DataCollectionGroup.id == dcg_id) + select(DataCollectionGroup).where(DataCollectionGroup.id == dcg_id) ).one() # Work out the grid square and associated metadata file @@ -281,17 +291,19 @@ def _flush_position_analysis( return foil_hole -def flush_spa_preprocessing(message: dict, db: Session, demo: bool = False): +def flush_spa_preprocess(message: dict, murfey_db: Session, demo: bool = False) -> bool: session_id = message["session_id"] stashed_files = murfey_db.exec( - select(db.PreprocessStash) - .where(db.PreprocessStash.session_id == session_id) - .where(db.PreprocessStash.tag == message["tag"]) + select(PreprocessStash) + .where(PreprocessStash.session_id == session_id) + .where(PreprocessStash.tag == message["tag"]) ).all() if not stashed_files: - return None + return True instrument_name = ( - murfey_db.exec(select(db.Session).where(db.Session.id == message["session_id"])) + murfey_db.exec( + select(MurfeySession).where(MurfeySession.id == message["session_id"]) + ) .one() .instrument_name ) @@ -301,22 +313,22 @@ def flush_spa_preprocessing(message: dict, db: Session, demo: bool = False): recipe_name = machine_config.recipes.get("em-spa-preprocess", "em-spa-preprocess") collected_ids = murfey_db.exec( select( - db.DataCollectionGroup, - db.DataCollection, - db.ProcessingJob, - db.AutoProcProgram, + DataCollectionGroup, + DataCollection, + ProcessingJob, + AutoProcProgram, ) - .where(db.DataCollectionGroup.session_id == session_id) - .where(db.DataCollectionGroup.tag == message["tag"]) - .where(db.DataCollection.dcg_id == db.DataCollectionGroup.id) - .where(db.ProcessingJob.dc_id == db.DataCollection.id) - .where(db.AutoProcProgram.pj_id == db.ProcessingJob.id) - .where(db.ProcessingJob.recipe == recipe_name) + .where(DataCollectionGroup.session_id == session_id) + .where(DataCollectionGroup.tag == message["tag"]) + .where(DataCollection.dcg_id == DataCollectionGroup.id) + .where(ProcessingJob.dc_id == DataCollection.id) + .where(AutoProcProgram.pj_id == ProcessingJob.id) + .where(ProcessingJob.recipe == recipe_name) ).one() params = murfey_db.exec( - select(db.SPARelionParameters, db.SPAFeedbackParameters) - .where(db.SPARelionParameters.pj_id == collected_ids[2].id) - .where(db.SPAFeedbackParameters.pj_id == db.SPARelionParameters.pj_id) + select(SPARelionParameters, SPAFeedbackParameters) + .where(SPARelionParameters.pj_id == collected_ids[2].id) + .where(SPAFeedbackParameters.pj_id == SPARelionParameters.pj_id) ).one() proc_params = params[0] feedback_params = params[1] @@ -324,9 +336,7 @@ def flush_spa_preprocessing(message: dict, db: Session, demo: bool = False): logger.warning( f"No SPA processing parameters found for client processing job ID {collected_ids[2].id}" ) - raise ValueError( - "No processing parameters were found in the database when flushing SPA preprocessing" - ) + return False murfey_ids = _murfey_id( collected_ids[3].id, @@ -345,10 +355,10 @@ def flush_spa_preprocessing(message: dict, db: Session, demo: bool = False): # Register grid square and foil hole if not present try: foil_hole_id = _flush_position_analysis( - movie_path=f.file_path, + movie_path=Path(f.file_path), dcg_id=collected_ids[0].id, session_id=session_id, - db=db, + murfey_db=murfey_db, ) except Exception as e: logger.error( @@ -361,7 +371,7 @@ def flush_spa_preprocessing(message: dict, db: Session, demo: bool = False): ppath = Path(f.file_path) if not mrcp.parent.exists(): mrcp.parent.mkdir(parents=True) - movie = db.Movie( + movie = Movie( murfey_id=murfey_ids[2 * i], path=f.file_path, image_number=f.image_number, @@ -407,4 +417,4 @@ def flush_spa_preprocessing(message: dict, db: Session, demo: bool = False): ) murfey_db.commit() murfey_db.close() - return None + return True