From ad3c1466f008d4d9ebc1dc8d067bca5c12c7fbd7 Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Tue, 3 Mar 2026 13:29:17 +0300 Subject: [PATCH 1/5] add pinger routine --- lighter/ws_client.py | 50 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 4 deletions(-) diff --git a/lighter/ws_client.py b/lighter/ws_client.py index 1369417..8e05680 100644 --- a/lighter/ws_client.py +++ b/lighter/ws_client.py @@ -1,4 +1,7 @@ import json +import threading +import asyncio +import time from websockets.sync.client import connect from websockets.client import connect as connect_async from lighter.configuration import Configuration @@ -12,6 +15,7 @@ def __init__( account_ids=[], on_order_book_update=print, on_account_update=print, + ping_interval=30, ): if host is None: host = Configuration.get_default().host.replace("https://", "") @@ -33,6 +37,8 @@ def __init__( self.on_account_update = on_account_update self.ws = None + self.ping_interval = ping_interval + self.stop_event = threading.Event() def on_message(self, ws, message): if isinstance(message, str): @@ -148,21 +154,57 @@ def handle_unhandled_message(self, message): raise Exception(f"Unhandled message: {message}") def on_error(self, ws, error): + self.stop_event.set() raise Exception(f"Error: {error}") def on_close(self, ws, close_status_code, close_msg): + self.stop_event.set() raise Exception(f"Closed: {close_status_code} {close_msg}") + def _ping_loop(self): + while not self.stop_event.is_set(): + time.sleep(self.ping_interval) + if self.ws and not self.stop_event.is_set(): + try: + self.ws.send(json.dumps({"type": "ping"})) + except: + break + + async def _ping_loop_async(self): + while not self.stop_event.is_set(): + await asyncio.sleep(self.ping_interval) + if self.ws and not self.stop_event.is_set(): + try: + await self.ws.send(json.dumps({"type": "ping"})) + except: + break + def run(self): + self.stop_event.clear() ws = connect(self.base_url) self.ws = ws - for message in ws: - self.on_message(ws, message) + ping_thread = threading.Thread(target=self._ping_loop, daemon=True) + ping_thread.start() + + try: + for message in ws: + self.on_message(ws, message) + finally: + self.stop_event.set() + self.ws = None async def run_async(self): + self.stop_event.clear() ws = await connect_async(self.base_url) self.ws = ws - async for message in ws: - await self.on_message_async(ws, message) + ping_task = asyncio.create_task(self._ping_loop_async()) + + try: + async for message in ws: + await self.on_message_async(ws, message) + finally: + self.stop_event.set() + ping_task.cancel() + self.ws = None From b5cbeada59e5eb30480b14769f378671f05d6f9c Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Tue, 3 Mar 2026 13:38:38 +0300 Subject: [PATCH 2/5] handle pong --- lighter/ws_client.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lighter/ws_client.py b/lighter/ws_client.py index 8e05680..123d0c2 100644 --- a/lighter/ws_client.py +++ b/lighter/ws_client.py @@ -59,6 +59,9 @@ def on_message(self, ws, message): elif message_type == "ping": # Respond to ping with pong ws.send(json.dumps({"type": "pong"})) + elif message_type == "pong": + # Noop + pass else: self.handle_unhandled_message(message) @@ -71,6 +74,9 @@ async def on_message_async(self, ws, message): elif message_type == "ping": # Respond to ping with pong await ws.send(json.dumps({"type": "pong"})) + elif message_type == "pong": + # Noop + pass else: self.on_message(ws, message) From 43d5f01d5e6464b5099cf37700461c79cf66e020 Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Tue, 3 Mar 2026 14:18:21 +0300 Subject: [PATCH 3/5] exception --- lighter/ws_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lighter/ws_client.py b/lighter/ws_client.py index 123d0c2..d7f4e86 100644 --- a/lighter/ws_client.py +++ b/lighter/ws_client.py @@ -173,7 +173,7 @@ def _ping_loop(self): if self.ws and not self.stop_event.is_set(): try: self.ws.send(json.dumps({"type": "ping"})) - except: + except Exception: break async def _ping_loop_async(self): @@ -182,7 +182,7 @@ async def _ping_loop_async(self): if self.ws and not self.stop_event.is_set(): try: await self.ws.send(json.dumps({"type": "ping"})) - except: + except Exception: break def run(self): From 061b8f91e871e68879813d3c8c5c944c074d8b92 Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Tue, 3 Mar 2026 14:42:51 +0300 Subject: [PATCH 4/5] some refactor --- lighter/ws_client.py | 80 +++++++++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 34 deletions(-) diff --git a/lighter/ws_client.py b/lighter/ws_client.py index d7f4e86..3d1a1c1 100644 --- a/lighter/ws_client.py +++ b/lighter/ws_client.py @@ -38,7 +38,6 @@ def __init__( self.ws = None self.ping_interval = ping_interval - self.stop_event = threading.Event() def on_message(self, ws, message): if isinstance(message, str): @@ -60,7 +59,6 @@ def on_message(self, ws, message): # Respond to ping with pong ws.send(json.dumps({"type": "pong"})) elif message_type == "pong": - # Noop pass else: self.handle_unhandled_message(message) @@ -160,57 +158,71 @@ def handle_unhandled_message(self, message): raise Exception(f"Unhandled message: {message}") def on_error(self, ws, error): - self.stop_event.set() raise Exception(f"Error: {error}") def on_close(self, ws, close_status_code, close_msg): - self.stop_event.set() raise Exception(f"Closed: {close_status_code} {close_msg}") - def _ping_loop(self): - while not self.stop_event.is_set(): - time.sleep(self.ping_interval) - if self.ws and not self.stop_event.is_set(): + def _ping_loop(self, stop_event): + while not stop_event.is_set(): + stop_event.wait(self.ping_interval) + if self.ws and not stop_event.is_set(): try: self.ws.send(json.dumps({"type": "ping"})) - except Exception: + except Exception as e: + print(f"Ping failed: {e}") break - async def _ping_loop_async(self): - while not self.stop_event.is_set(): - await asyncio.sleep(self.ping_interval) - if self.ws and not self.stop_event.is_set(): - try: + async def _ping_loop_async(self, stop_event): + while not stop_event.is_set(): + try: + await asyncio.sleep(self.ping_interval) + if self.ws and not stop_event.is_set(): await self.ws.send(json.dumps({"type": "ping"})) - except Exception: - break + except asyncio.CancelledError: + break + except Exception as e: + print(f"Async ping failed: {e}") + break def run(self): - self.stop_event.clear() - ws = connect(self.base_url) - self.ws = ws + stop_event = threading.Event() + ping_thread = None + try: + with connect(self.base_url) as ws: + self.ws = ws + ping_thread = threading.Thread(target=self._ping_loop, args=(stop_event,), daemon=True) + ping_thread.start() - ping_thread = threading.Thread(target=self._ping_loop, daemon=True) - ping_thread.start() + for message in ws: + self.on_message(ws, message) - try: - for message in ws: - self.on_message(ws, message) + except Exception as e: + print(f"Connection terminated unexpectedly: {e}") finally: - self.stop_event.set() + stop_event.set() self.ws = None + if ping_thread: + ping_thread.join(timeout=1) async def run_async(self): - self.stop_event.clear() - ws = await connect_async(self.base_url) - self.ws = ws + stop_event = asyncio.Event() + ping_task = None + try: + async with connect_async(self.base_url) as ws: + self.ws = ws + ping_task = asyncio.create_task(self._ping_loop_async(stop_event)) - ping_task = asyncio.create_task(self._ping_loop_async()) + async for message in ws: + await self.on_message_async(ws, message) - try: - async for message in ws: - await self.on_message_async(ws, message) + except Exception as e: + print(f"Connection terminated unexpectedly: {e}") finally: - self.stop_event.set() - ping_task.cancel() + stop_event.set() + if ping_task: + ping_task.cancel() + # Wait for the task to acknowledge cancellation + await asyncio.gather(ping_task, return_exceptions=True) self.ws = None + From d01f0714471facdc22644878b356042e29b81c21 Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Tue, 3 Mar 2026 14:55:46 +0300 Subject: [PATCH 5/5] update --- lighter/ws_client.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/lighter/ws_client.py b/lighter/ws_client.py index 3d1a1c1..d226dec 100644 --- a/lighter/ws_client.py +++ b/lighter/ws_client.py @@ -1,7 +1,6 @@ import json import threading import asyncio -import time from websockets.sync.client import connect from websockets.client import connect as connect_async from lighter.configuration import Configuration @@ -196,14 +195,11 @@ def run(self): for message in ws: self.on_message(ws, message) - - except Exception as e: - print(f"Connection terminated unexpectedly: {e}") finally: stop_event.set() - self.ws = None if ping_thread: ping_thread.join(timeout=1) + self.ws = None # clear after thread has exited async def run_async(self): stop_event = asyncio.Event() @@ -215,9 +211,6 @@ async def run_async(self): async for message in ws: await self.on_message_async(ws, message) - - except Exception as e: - print(f"Connection terminated unexpectedly: {e}") finally: stop_event.set() if ping_task: