From f00c48212b4211472708b7183ff977a825b5a93d Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 15 May 2025 15:08:54 +0100 Subject: [PATCH 1/6] Rearranged lines in 'run()' in logical blocks; added comments to describe purpose of code blocks --- src/murfey/client/__init__.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/src/murfey/client/__init__.py b/src/murfey/client/__init__.py index 4f5a64029..eb454c29e 100644 --- a/src/murfey/client/__init__.py +++ b/src/murfey/client/__init__.py @@ -11,6 +11,7 @@ import webbrowser from datetime import datetime from pathlib import Path +from pprint import pprint from queue import Queue from typing import List, Literal from urllib.parse import ParseResult, urlparse @@ -91,6 +92,7 @@ def _check_for_updates( def run(): + # Load client config and server information config = read_config() instrument_name = config["Murfey"]["instrument_name"] try: @@ -109,6 +111,7 @@ def run(): else: known_server = config["Murfey"].get("server") + # Set up argument parser with dynamic defaults based on client config parser = argparse.ArgumentParser(description="Start the Murfey client") parser.add_argument( "--server", @@ -194,16 +197,15 @@ def run(): default=False, help="Do not trigger processing for any data directories currently on disk (you may have started processing for them in a previous murfey run)", ) - args = parser.parse_args() + # Logic to exit early based on parsed args if not args.server: exit("Murfey server not set. Please run with --server host:port") if not args.server.startswith(("http://", "https://")): if "://" in args.server: exit("Unknown server protocol. Only http:// and https:// are allowed") args.server = f"http://{args.server}" - if args.remove_files: remove_prompt = Confirm.ask( f"Are you sure you want to remove files from {args.source or Path('.').absolute()}?" @@ -211,6 +213,7 @@ def run(): if not remove_prompt: exit("Exiting") + # If a new server URL is provided, save info to config file murfey_url = urlparse(args.server, allow_fragments=False) if args.server != known_server: # New server specified. Verify that it is real @@ -232,8 +235,7 @@ def run(): if args.no_transfer: log.info("No files will be transferred as --no-transfer flag was specified") - from pprint import pprint - + # Check ISPyB (if set up) for ongoing visits ongoing_visits = [] if args.visit: ongoing_visits = [args.visit] @@ -250,35 +252,38 @@ def run(): _enable_webbrowser_in_cygwin() + # Set up additional log handlers log.setLevel(logging.DEBUG) log_queue = Queue() input_queue = Queue() - # rich_handler = DirectableRichHandler(log_queue, enable_link_path=False) + # Rich-based console handler rich_handler = DirectableRichHandler(enable_link_path=False) rich_handler.setLevel(logging.DEBUG if args.debug else logging.INFO) + # Set up websocket app and handler client_id = requests.get(f"{murfey_url.geturl()}/new_client_id/").json() ws = murfey.client.websocket.WSApp( server=args.server, id=client_id["new_id"], ) + ws_handler = CustomHandler(ws.send) + # Add additional handlers and set logging levels logging.getLogger().addHandler(rich_handler) - handler = CustomHandler(ws.send) - logging.getLogger().addHandler(handler) + logging.getLogger().addHandler(ws_handler) logging.getLogger("murfey").setLevel(logging.INFO) logging.getLogger("websocket").setLevel(logging.WARNING) log.info("Starting Websocket connection") - status_bar = StatusBar() - + # Load machine data for subsequent sections machine_data = requests.get( f"{murfey_url.geturl()}/instruments/{instrument_name}/machine" ).json() gain_ref: Path | None = None + # Set up Murfey environment instance and map it to websocket app instance_environment = MurfeyInstanceEnvironment( url=murfey_url, client_id=ws.id, @@ -295,9 +300,10 @@ def run(): else "" ), ) - ws.environment = instance_environment + # Set up and run Murfey TUI app + status_bar = StatusBar() rich_handler.redirect = True app = MurfeyTUI( environment=instance_environment, From 65f6580a897fddf152f8af2f1063e6b588727b8b Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 15 May 2025 15:41:22 +0100 Subject: [PATCH 2/6] Added log to record client ID registered; added more type hinting --- src/murfey/server/websocket.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/murfey/server/websocket.py b/src/murfey/server/websocket.py index ba04ef309..c69de18ac 100644 --- a/src/murfey/server/websocket.py +++ b/src/murfey/server/websocket.py @@ -7,7 +7,7 @@ from typing import Any, Dict, TypeVar, Union from fastapi import APIRouter, WebSocket, WebSocketDisconnect -from sqlmodel import select +from sqlmodel import Session, select import murfey.server.prometheus as prom from murfey.server.murfey_db import get_murfey_db_session @@ -38,8 +38,9 @@ async def connect( @staticmethod def _register_new_client(client_id: int): + log.debug(f"Registering new client with ID {client_id}") new_client = ClientEnvironment(client_id=client_id, connected=True) - murfey_db = next(get_murfey_db_session()) + murfey_db: Session = next(get_murfey_db_session()) murfey_db.add(new_client) murfey_db.commit() murfey_db.close() @@ -47,7 +48,7 @@ def _register_new_client(client_id: int): def disconnect(self, client_id: int | str, unregister_client: bool = True): self.active_connections.pop(client_id) if unregister_client: - murfey_db = next(get_murfey_db_session()) + murfey_db: Session = next(get_murfey_db_session()) client_env = murfey_db.exec( select(ClientEnvironment).where( ClientEnvironment.client_id == client_id @@ -73,7 +74,7 @@ async def websocket_endpoint(websocket: WebSocket, client_id: int): while True: data = await websocket.receive_text() try: - json_data = json.loads(data) + json_data: dict = json.loads(data) if json_data["type"] == "log": # and isinstance(json_data, dict) json_data.pop("type") await forward_log(json_data, websocket) @@ -100,7 +101,7 @@ async def websocket_connection_endpoint( while True: data = await websocket.receive_text() try: - json_data = json.loads(data) + json_data: dict = json.loads(data) if json_data.get("type") == "log": # and isinstance(json_data, dict) json_data.pop("type") await forward_log(json_data, websocket) @@ -115,12 +116,12 @@ async def websocket_connection_endpoint( await manager.broadcast(f"Client #{client_id} disconnected") -async def check_connections(active_connections): +async def check_connections(active_connections: list[WebSocket]): log.info("Checking connections") for connection in active_connections: log.info("Checking response") try: - await asyncio.wait_for(connection.receive(), timeout=6) + await asyncio.wait_for(connection.receive(), timeout=10) except asyncio.TimeoutError: log.info(f"Disconnecting Client {connection[0]}") manager.disconnect(connection[0], connection[1]) @@ -139,7 +140,7 @@ async def forward_log(logrecord: dict[str, Any], websocket: WebSocket): @ws.delete("/test/{client_id}") async def close_ws_connection(client_id: int): - murfey_db = next(get_murfey_db_session()) + murfey_db: Session = next(get_murfey_db_session()) client_env = murfey_db.exec( select(ClientEnvironment).where(ClientEnvironment.client_id == client_id) ).one() From 1d8a11a9d7a3ac9d3891a5d19dca78a08156d280 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 15 May 2025 15:45:30 +0100 Subject: [PATCH 3/6] Encased 'WebSocket.send()' and 'WebSocket.close()' in try-except blocks; added regular pinging when executing 'WebSocketApp.run_forever()' --- src/murfey/client/websocket.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/murfey/client/websocket.py b/src/murfey/client/websocket.py index bc03d1f3e..d5be8bcb0 100644 --- a/src/murfey/client/websocket.py +++ b/src/murfey/client/websocket.py @@ -87,7 +87,7 @@ def _run_websocket_event_loop(self): backoff = 0 while True: attempt_start = time.perf_counter() - connection_failure = self._ws.run_forever() + connection_failure = self._ws.run_forever(ping_interval=30, ping_timeout=10) if not connection_failure: break if (time.perf_counter() - attempt_start) < 5: @@ -108,7 +108,10 @@ def _send_queue_feeder(self): continue while not self._ready: time.sleep(0.3) - self._ws.send(element) + try: + self._ws.send(element) + except Exception: + log.error("Error sending message through websocket", exc_info=True) self._send_queue.task_done() log.debug("Websocket send-queue-feeder thread stopped") @@ -135,7 +138,10 @@ def close(self): self._send_queue.put(None) self._feeder_thread.join() self._receiver_thread.join() - self._ws.close() + try: + self._ws.close() + except Exception: + log.error("Error closing websocket connection", exc_info=True) def on_message(self, ws: websocket.WebSocketApp, message: str): self._receive_queue.put(message) From 0907d7b76ea451dc909f3794bc45d4dac82bda43 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 15 May 2025 16:09:36 +0100 Subject: [PATCH 4/6] Updated type hints --- src/murfey/client/__init__.py | 4 ++-- src/murfey/server/websocket.py | 9 +++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/murfey/client/__init__.py b/src/murfey/client/__init__.py index eb454c29e..aafc38697 100644 --- a/src/murfey/client/__init__.py +++ b/src/murfey/client/__init__.py @@ -13,7 +13,7 @@ from pathlib import Path from pprint import pprint from queue import Queue -from typing import List, Literal +from typing import Literal from urllib.parse import ParseResult, urlparse import requests @@ -41,7 +41,7 @@ def write_config(config: configparser.ConfigParser): def main_loop( - source_watchers: List[murfey.client.watchdir.DirWatcher], + source_watchers: list[murfey.client.watchdir.DirWatcher], appearance_time: float, transfer_all: bool, ): diff --git a/src/murfey/server/websocket.py b/src/murfey/server/websocket.py index c69de18ac..ab1fad9a6 100644 --- a/src/murfey/server/websocket.py +++ b/src/murfey/server/websocket.py @@ -4,7 +4,7 @@ import json import logging from datetime import datetime -from typing import Any, Dict, TypeVar, Union +from typing import Any, TypeVar from fastapi import APIRouter, WebSocket, WebSocketDisconnect from sqlmodel import Session, select @@ -22,7 +22,7 @@ class ConnectionManager: def __init__(self): - self.active_connections: Dict[int | str, WebSocket] = {} + self.active_connections: dict[int | str, WebSocket] = {} async def connect( self, websocket: WebSocket, client_id: int | str, register_client: bool = True @@ -93,7 +93,8 @@ async def websocket_endpoint(websocket: WebSocket, client_id: int): @ws.websocket("/connect/{client_id}") async def websocket_connection_endpoint( - websocket: WebSocket, client_id: Union[int, str] + websocket: WebSocket, + client_id: int | str, ): await manager.connect(websocket, client_id, register_client=False) await manager.broadcast(f"Client {client_id} joined") @@ -157,7 +158,7 @@ async def close_ws_connection(client_id: int): @ws.delete("/connect/{client_id}") -async def close_unrecorded_ws_connection(client_id: Union[int, str]): +async def close_unrecorded_ws_connection(client_id: int | str): client_id_str = str(client_id).replace("\r\n", "").replace("\n", "") log.info(f"Disconnecting {client_id_str}") manager.disconnect(client_id) From 1ba068318d85975a3fc1347dd52a653126751940 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 15 May 2025 17:40:40 +0100 Subject: [PATCH 5/6] Older 'Union' type hint is still needed in Python 3.9, oops --- src/murfey/server/websocket.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/murfey/server/websocket.py b/src/murfey/server/websocket.py index ab1fad9a6..b4523fd73 100644 --- a/src/murfey/server/websocket.py +++ b/src/murfey/server/websocket.py @@ -4,7 +4,7 @@ import json import logging from datetime import datetime -from typing import Any, TypeVar +from typing import Any, TypeVar, Union from fastapi import APIRouter, WebSocket, WebSocketDisconnect from sqlmodel import Session, select @@ -22,10 +22,13 @@ class ConnectionManager: def __init__(self): - self.active_connections: dict[int | str, WebSocket] = {} + self.active_connections: dict[Union[int, str], WebSocket] = {} async def connect( - self, websocket: WebSocket, client_id: int | str, register_client: bool = True + self, + websocket: WebSocket, + client_id: Union[int, str], + register_client: bool = True, ): await websocket.accept() self.active_connections[client_id] = websocket @@ -45,7 +48,7 @@ def _register_new_client(client_id: int): murfey_db.commit() murfey_db.close() - def disconnect(self, client_id: int | str, unregister_client: bool = True): + def disconnect(self, client_id: Union[int, str], unregister_client: bool = True): self.active_connections.pop(client_id) if unregister_client: murfey_db: Session = next(get_murfey_db_session()) @@ -94,7 +97,7 @@ async def websocket_endpoint(websocket: WebSocket, client_id: int): @ws.websocket("/connect/{client_id}") async def websocket_connection_endpoint( websocket: WebSocket, - client_id: int | str, + client_id: Union[int, str], ): await manager.connect(websocket, client_id, register_client=False) await manager.broadcast(f"Client {client_id} joined") @@ -158,7 +161,7 @@ async def close_ws_connection(client_id: int): @ws.delete("/connect/{client_id}") -async def close_unrecorded_ws_connection(client_id: int | str): +async def close_unrecorded_ws_connection(client_id: Union[int, str]): client_id_str = str(client_id).replace("\r\n", "").replace("\n", "") log.info(f"Disconnecting {client_id_str}") manager.disconnect(client_id) From d1640c079951b8eb52505cd57baba8358d95d673 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 20 May 2025 11:56:13 +0100 Subject: [PATCH 6/6] See if 'int | str' is accepted as a type hint for ConnectionManger --- src/murfey/server/websocket.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/murfey/server/websocket.py b/src/murfey/server/websocket.py index b4523fd73..13cbcc73e 100644 --- a/src/murfey/server/websocket.py +++ b/src/murfey/server/websocket.py @@ -22,7 +22,7 @@ class ConnectionManager: def __init__(self): - self.active_connections: dict[Union[int, str], WebSocket] = {} + self.active_connections: dict[int | str, WebSocket] = {} async def connect( self,