From 131345b7fc2a91cdc0d51109139bbbbd256044a8 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 20 Mar 2025 09:07:36 +0000 Subject: [PATCH 1/6] Add handlers for logging from instrument server --- src/murfey/instrument_server/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/murfey/instrument_server/__init__.py b/src/murfey/instrument_server/__init__.py index 1bf239827..19d9233ec 100644 --- a/src/murfey/instrument_server/__init__.py +++ b/src/murfey/instrument_server/__init__.py @@ -41,7 +41,9 @@ def run(): ) handler = CustomHandler(ws.send) - logging.getLogger().addHandler(handler) + logging.getLogger("murfey").addHandler(handler) + logging.getLogger("fastapi").addHandler(handler) + logging.getLogger("uvicorn").addHandler(handler) logger.info( f"Starting Murfey server version {murfey.__version__}, listening on {args.host}:{args.port}" From 3252c80b8dabbd2990ba332e5af7a633e6ab3044 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 20 Mar 2025 09:11:43 +0000 Subject: [PATCH 2/6] Misnamed processing job id --- src/murfey/server/api/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/murfey/server/api/__init__.py b/src/murfey/server/api/__init__.py index 50439fd2d..333ecf055 100644 --- a/src/murfey/server/api/__init__.py +++ b/src/murfey/server/api/__init__.py @@ -1262,8 +1262,7 @@ async def request_tomography_preprocessing( processing_job_parameters = db.exec( select(TomographyProcessingParameters).where( - TomographyProcessingParameters.processing_job_id - == data_collection[2].id + TomographyProcessingParameters.pj_id == data_collection[2].id ) ).all() if processing_job_parameters: From 3ac92da4a2118f5a42ae63775fb36e224911975d Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 20 Mar 2025 11:02:51 +0000 Subject: [PATCH 3/6] Need to add tag to gain directory --- src/murfey/server/gain.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/murfey/server/gain.py b/src/murfey/server/gain.py index 5f78b70a0..1153b70fe 100644 --- a/src/murfey/server/gain.py +++ b/src/murfey/server/gain.py @@ -18,8 +18,11 @@ class Camera(Enum): FALCON = 3 -def _sanitise(gain_path: Path) -> Path: - dest = gain_path.parent / "gain" / gain_path.name.replace(" ", "_") +def _sanitise(gain_path: Path, tag: str) -> Path: + if tag: + dest = gain_path.parent / f"gain_{tag}" / gain_path.name.replace(" ", "_") + else: + dest = gain_path.parent / "gain" / gain_path.name.replace(" ", "_") dest.write_bytes(gain_path.read_bytes()) return dest @@ -57,7 +60,7 @@ async def prepare_gain( secure_path(gain_path.parent / f"gain_{tag}").mkdir(exist_ok=True) else: secure_path(gain_path.parent / "gain").mkdir(exist_ok=True) - gain_path = _sanitise(gain_path) + gain_path = _sanitise(gain_path, tag) flip = "flipx" if camera == Camera.K3_FLIPX else "flipy" gain_path_mrc = gain_path.with_suffix(".mrc") gain_path_superres = gain_path.parent / (gain_path.name + "_superres.mrc") From 1456b51b18565f4e4313ba776b8c9bb475b23810 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 20 Mar 2025 11:08:41 +0000 Subject: [PATCH 4/6] Need to insert the session id --- src/murfey/server/api/instrument.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 2a9d36b47..de0eaa3e6 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -154,6 +154,7 @@ async def pass_proc_params_to_instrument_server( session = db.exec(select(Session).where(Session.id == session_id)).one() session_processing_parameters = SessionProcessingParameters( + session_id=session_id, dose_per_frame=proc_params.dose_per_frame, gain_ref=session.current_gain_ref, symmetry=proc_params.symmetry, From 3fd3eb391d5949e08507de036227de6f2616ee05 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 20 Mar 2025 11:16:11 +0000 Subject: [PATCH 5/6] Dont redefine the session variable --- src/murfey/server/api/instrument.py | 48 ++++++++++++++--------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index de0eaa3e6..197d62dad 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -60,8 +60,8 @@ async def activate_instrument_server_for_session( machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] - async with aiohttp.ClientSession() as session: - async with session.post( + async with aiohttp.ClientSession() as clientsession: + async with clientsession.post( f"{machine_config.instrument_server_url}/sessions/{int(sanitise(str(session_id)))}/token", json={"access_token": token, "token_type": "bearer"}, ) as response: @@ -80,11 +80,11 @@ async def check_if_session_is_active(instrument_name: str, session_id: int): if instrument_server_tokens.get(session_id) is None: return {"active": False} async with lock: - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession() as clientsession: machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] - async with session.get( + async with clientsession.get( f"{machine_config.instrument_server_url}/sessions/{int(sanitise(str(session_id)))}/check_token", headers={ "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" @@ -116,8 +116,8 @@ async def start_multigrid_watcher( "visit": visit, "default_model": str(machine_config.default_model), } - async with aiohttp.ClientSession() as session: - async with session.post( + async with aiohttp.ClientSession() as clientsession: + async with clientsession.post( f"{machine_config.instrument_server_url}/sessions/{session_id}/multigrid_watcher", json={ "source": str(secure_path(watcher_spec.source / visit)), @@ -171,8 +171,8 @@ async def pass_proc_params_to_instrument_server( ] if machine_config.instrument_server_url: label = db.exec(select(Session).where(Session.id == session_id)).one().name - async with aiohttp.ClientSession() as session: - async with session.post( + async with aiohttp.ClientSession() as clientsession: + async with clientsession.post( f"{machine_config.instrument_server_url}/sessions/{session_id}/processing_parameters", json={ "label": label, @@ -200,8 +200,8 @@ async def check_instrument_server(instrument_name: str): instrument_name ] if machine_config.instrument_server_url: - async with aiohttp.ClientSession() as session: - async with session.get( + async with aiohttp.ClientSession() as clientsession: + async with clientsession.get( f"{machine_config.instrument_server_url}/health", ) as resp: data = await resp.json() @@ -221,8 +221,8 @@ async def get_possible_gain_references( if machine_config.instrument_server_url: async with lock: token = instrument_server_tokens[session_id]["access_token"] - async with aiohttp.ClientSession() as session: - async with session.get( + async with aiohttp.ClientSession() as clientsession: + async with clientsession.get( f"{machine_config.instrument_server_url}/instruments/{sanitise(instrument_name)}/sessions/{sanitise(str(session_id))}/possible_gain_references", headers={"Authorization": f"Bearer {token}"}, ) as resp: @@ -250,8 +250,8 @@ async def request_gain_reference_upload( visit_path = f"{datetime.datetime.now().year}/{visit}" data = {} if machine_config.instrument_server_url: - async with aiohttp.ClientSession() as session: - async with session.post( + async with aiohttp.ClientSession() as clientsession: + async with clientsession.post( f"{machine_config.instrument_server_url}/instruments/{instrument_name}/sessions/{session_id}/upload_gain_reference", json={ "gain_path": str(gain_reference_request.gain_path), @@ -283,8 +283,8 @@ async def request_upstream_tiff_data_download( / secure_filename(visit_name) ) if machine_config.instrument_server_url: - async with aiohttp.ClientSession() as session: - async with session.post( + async with aiohttp.ClientSession() as clientsession: + async with clientsession.post( f"{machine_config.instrument_server_url}/visits/{secure_filename(visit_name)}/sessions/{sanitise(str(session_id))}/upstream_tiff_data_request", json={"download_dir": download_dir}, headers={ @@ -311,8 +311,8 @@ async def stop_rsyncer( instrument_name ] if machine_config.instrument_server_url: - async with aiohttp.ClientSession() as session: - async with session.post( + async with aiohttp.ClientSession() as clientsession: + async with clientsession.post( f"{machine_config.instrument_server_url}/sessions/{session_id}/stop_rsyncer", json={ "label": session_id, @@ -338,8 +338,8 @@ async def finalise_rsyncer( instrument_name ] if machine_config.instrument_server_url: - async with aiohttp.ClientSession() as session: - async with session.post( + async with aiohttp.ClientSession() as clientsession: + async with clientsession.post( f"{machine_config.instrument_server_url}/sessions/{session_id}/finalise_rsyncer", json={ "label": session_id, @@ -366,8 +366,8 @@ async def remove_rsyncer( ] if isinstance(session_id, int): if machine_config.instrument_server_url: - async with aiohttp.ClientSession() as session: - async with session.post( + async with aiohttp.ClientSession() as clientsession: + async with clientsession.post( f"{machine_config.instrument_server_url}/sessions/{session_id}/remove_rsyncer", json={ "label": session_id, @@ -394,8 +394,8 @@ async def restart_rsyncer( ] if isinstance(session_id, int): if machine_config.instrument_server_url: - async with aiohttp.ClientSession() as session: - async with session.post( + async with aiohttp.ClientSession() as clientsession: + async with clientsession.post( f"{machine_config.instrument_server_url}/sessions/{session_id}/restart_rsyncer", json={ "label": session_id, From f97c3ca0b4c273e1b1a091015a0315276becee4a Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 20 Mar 2025 15:01:30 +0000 Subject: [PATCH 6/6] Wrong name for parameters --- src/murfey/client/analyser.py | 4 +++- src/murfey/server/api/__init__.py | 6 ++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/murfey/client/analyser.py b/src/murfey/client/analyser.py index 9e7a37d1d..3e4ea4830 100644 --- a/src/murfey/client/analyser.py +++ b/src/murfey/client/analyser.py @@ -218,7 +218,9 @@ def post_transfer(self, transferred_file: Path): transferred_file, environment=self._environment ) except Exception as e: - logger.error(f"An exception was encountered post transfer: {e}") + logger.error( + f"An exception was encountered post transfer: {e}", exc_info=True + ) def _analyse(self): logger.info("Analyser thread started") diff --git a/src/murfey/server/api/__init__.py b/src/murfey/server/api/__init__.py index 333ecf055..fa85608e2 100644 --- a/src/murfey/server/api/__init__.py +++ b/src/murfey/server/api/__init__.py @@ -1546,7 +1546,7 @@ def register_proc( proc_params: ProcessingJobParameters, db=murfey_db, ): - proc_parameters = { + proc_parameters: dict = { "session_id": session_id, "experiment_type": proc_params.experiment_type, "recipe": proc_params.recipe, @@ -1564,7 +1564,8 @@ def register_proc( ).all() if session_processing_parameters: - proc_params["job_parameters"].update( + job_parameters: dict = proc_parameters["job_parameters"] + job_parameters.update( { "gain_ref": session_processing_parameters[0].gain_ref, "dose_per_frame": session_processing_parameters[0].dose_per_frame, @@ -1574,6 +1575,7 @@ def register_proc( "symmetry": session_processing_parameters[0].symmetry, } ) + proc_parameters["job_parameters"] = job_parameters if _transport_object: _transport_object.send(