diff --git a/src/murfey/client/analyser.py b/src/murfey/client/analyser.py index 9e7a37d1d..3e4ea4830 100644 --- a/src/murfey/client/analyser.py +++ b/src/murfey/client/analyser.py @@ -218,7 +218,9 @@ def post_transfer(self, transferred_file: Path): transferred_file, environment=self._environment ) except Exception as e: - logger.error(f"An exception was encountered post transfer: {e}") + logger.error( + f"An exception was encountered post transfer: {e}", exc_info=True + ) def _analyse(self): logger.info("Analyser thread started") diff --git a/src/murfey/instrument_server/__init__.py b/src/murfey/instrument_server/__init__.py index 1bf239827..19d9233ec 100644 --- a/src/murfey/instrument_server/__init__.py +++ b/src/murfey/instrument_server/__init__.py @@ -41,7 +41,9 @@ def run(): ) handler = CustomHandler(ws.send) - logging.getLogger().addHandler(handler) + logging.getLogger("murfey").addHandler(handler) + logging.getLogger("fastapi").addHandler(handler) + logging.getLogger("uvicorn").addHandler(handler) logger.info( f"Starting Murfey server version {murfey.__version__}, listening on {args.host}:{args.port}" diff --git a/src/murfey/server/api/__init__.py b/src/murfey/server/api/__init__.py index 50439fd2d..fa85608e2 100644 --- a/src/murfey/server/api/__init__.py +++ b/src/murfey/server/api/__init__.py @@ -1262,8 +1262,7 @@ async def request_tomography_preprocessing( processing_job_parameters = db.exec( select(TomographyProcessingParameters).where( - TomographyProcessingParameters.processing_job_id - == data_collection[2].id + TomographyProcessingParameters.pj_id == data_collection[2].id ) ).all() if processing_job_parameters: @@ -1547,7 +1546,7 @@ def register_proc( proc_params: ProcessingJobParameters, db=murfey_db, ): - proc_parameters = { + proc_parameters: dict = { "session_id": session_id, "experiment_type": proc_params.experiment_type, "recipe": proc_params.recipe, @@ -1565,7 +1564,8 @@ def register_proc( ).all() if session_processing_parameters: - proc_params["job_parameters"].update( + job_parameters: dict = proc_parameters["job_parameters"] + job_parameters.update( { "gain_ref": session_processing_parameters[0].gain_ref, "dose_per_frame": session_processing_parameters[0].dose_per_frame, @@ -1575,6 +1575,7 @@ def register_proc( "symmetry": session_processing_parameters[0].symmetry, } ) + proc_parameters["job_parameters"] = job_parameters if _transport_object: _transport_object.send( diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 2a9d36b47..197d62dad 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -60,8 +60,8 @@ async def activate_instrument_server_for_session( machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] - async with aiohttp.ClientSession() as session: - async with session.post( + async with aiohttp.ClientSession() as clientsession: + async with clientsession.post( f"{machine_config.instrument_server_url}/sessions/{int(sanitise(str(session_id)))}/token", json={"access_token": token, "token_type": "bearer"}, ) as response: @@ -80,11 +80,11 @@ async def check_if_session_is_active(instrument_name: str, session_id: int): if instrument_server_tokens.get(session_id) is None: return {"active": False} async with lock: - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession() as clientsession: machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] - async with session.get( + async with clientsession.get( f"{machine_config.instrument_server_url}/sessions/{int(sanitise(str(session_id)))}/check_token", headers={ "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" @@ -116,8 +116,8 @@ async def start_multigrid_watcher( "visit": visit, "default_model": str(machine_config.default_model), } - async with aiohttp.ClientSession() as session: - async with session.post( + async with aiohttp.ClientSession() as clientsession: + async with clientsession.post( f"{machine_config.instrument_server_url}/sessions/{session_id}/multigrid_watcher", json={ "source": str(secure_path(watcher_spec.source / visit)), @@ -154,6 +154,7 @@ async def pass_proc_params_to_instrument_server( session = db.exec(select(Session).where(Session.id == session_id)).one() session_processing_parameters = SessionProcessingParameters( + session_id=session_id, dose_per_frame=proc_params.dose_per_frame, gain_ref=session.current_gain_ref, symmetry=proc_params.symmetry, @@ -170,8 +171,8 @@ async def pass_proc_params_to_instrument_server( ] if machine_config.instrument_server_url: label = db.exec(select(Session).where(Session.id == session_id)).one().name - async with aiohttp.ClientSession() as session: - async with session.post( + async with aiohttp.ClientSession() as clientsession: + async with clientsession.post( f"{machine_config.instrument_server_url}/sessions/{session_id}/processing_parameters", json={ "label": label, @@ -199,8 +200,8 @@ async def check_instrument_server(instrument_name: str): instrument_name ] if machine_config.instrument_server_url: - async with aiohttp.ClientSession() as session: - async with session.get( + async with aiohttp.ClientSession() as clientsession: + async with clientsession.get( f"{machine_config.instrument_server_url}/health", ) as resp: data = await resp.json() @@ -220,8 +221,8 @@ async def get_possible_gain_references( if machine_config.instrument_server_url: async with lock: token = instrument_server_tokens[session_id]["access_token"] - async with aiohttp.ClientSession() as session: - async with session.get( + async with aiohttp.ClientSession() as clientsession: + async with clientsession.get( f"{machine_config.instrument_server_url}/instruments/{sanitise(instrument_name)}/sessions/{sanitise(str(session_id))}/possible_gain_references", headers={"Authorization": f"Bearer {token}"}, ) as resp: @@ -249,8 +250,8 @@ async def request_gain_reference_upload( visit_path = f"{datetime.datetime.now().year}/{visit}" data = {} if machine_config.instrument_server_url: - async with aiohttp.ClientSession() as session: - async with session.post( + async with aiohttp.ClientSession() as clientsession: + async with clientsession.post( f"{machine_config.instrument_server_url}/instruments/{instrument_name}/sessions/{session_id}/upload_gain_reference", json={ "gain_path": str(gain_reference_request.gain_path), @@ -282,8 +283,8 @@ async def request_upstream_tiff_data_download( / secure_filename(visit_name) ) if machine_config.instrument_server_url: - async with aiohttp.ClientSession() as session: - async with session.post( + async with aiohttp.ClientSession() as clientsession: + async with clientsession.post( f"{machine_config.instrument_server_url}/visits/{secure_filename(visit_name)}/sessions/{sanitise(str(session_id))}/upstream_tiff_data_request", json={"download_dir": download_dir}, headers={ @@ -310,8 +311,8 @@ async def stop_rsyncer( instrument_name ] if machine_config.instrument_server_url: - async with aiohttp.ClientSession() as session: - async with session.post( + async with aiohttp.ClientSession() as clientsession: + async with clientsession.post( f"{machine_config.instrument_server_url}/sessions/{session_id}/stop_rsyncer", json={ "label": session_id, @@ -337,8 +338,8 @@ async def finalise_rsyncer( instrument_name ] if machine_config.instrument_server_url: - async with aiohttp.ClientSession() as session: - async with session.post( + async with aiohttp.ClientSession() as clientsession: + async with clientsession.post( f"{machine_config.instrument_server_url}/sessions/{session_id}/finalise_rsyncer", json={ "label": session_id, @@ -365,8 +366,8 @@ async def remove_rsyncer( ] if isinstance(session_id, int): if machine_config.instrument_server_url: - async with aiohttp.ClientSession() as session: - async with session.post( + async with aiohttp.ClientSession() as clientsession: + async with clientsession.post( f"{machine_config.instrument_server_url}/sessions/{session_id}/remove_rsyncer", json={ "label": session_id, @@ -393,8 +394,8 @@ async def restart_rsyncer( ] if isinstance(session_id, int): if machine_config.instrument_server_url: - async with aiohttp.ClientSession() as session: - async with session.post( + async with aiohttp.ClientSession() as clientsession: + async with clientsession.post( f"{machine_config.instrument_server_url}/sessions/{session_id}/restart_rsyncer", json={ "label": session_id, diff --git a/src/murfey/server/gain.py b/src/murfey/server/gain.py index 5f78b70a0..1153b70fe 100644 --- a/src/murfey/server/gain.py +++ b/src/murfey/server/gain.py @@ -18,8 +18,11 @@ class Camera(Enum): FALCON = 3 -def _sanitise(gain_path: Path) -> Path: - dest = gain_path.parent / "gain" / gain_path.name.replace(" ", "_") +def _sanitise(gain_path: Path, tag: str) -> Path: + if tag: + dest = gain_path.parent / f"gain_{tag}" / gain_path.name.replace(" ", "_") + else: + dest = gain_path.parent / "gain" / gain_path.name.replace(" ", "_") dest.write_bytes(gain_path.read_bytes()) return dest @@ -57,7 +60,7 @@ async def prepare_gain( secure_path(gain_path.parent / f"gain_{tag}").mkdir(exist_ok=True) else: secure_path(gain_path.parent / "gain").mkdir(exist_ok=True) - gain_path = _sanitise(gain_path) + gain_path = _sanitise(gain_path, tag) flip = "flipx" if camera == Camera.K3_FLIPX else "flipy" gain_path_mrc = gain_path.with_suffix(".mrc") gain_path_superres = gain_path.parent / (gain_path.name + "_superres.mrc")