From a3845c30680ea55066486fba3265b9b2f9689d07 Mon Sep 17 00:00:00 2001 From: Roman Chkhaidze Date: Fri, 26 Jun 2026 14:33:05 -0700 Subject: [PATCH 01/10] update gitignore --- .gitignore | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 15201acc..c638dc45 100644 --- a/.gitignore +++ b/.gitignore @@ -129,7 +129,7 @@ celerybeat.pid # Environments .env -.venv +.venv* env/ venv/ ENV/ @@ -169,3 +169,5 @@ cython_debug/ # PyPI configuration file .pypirc + +.idea \ No newline at end of file From 14f1d12f970d1233b671b92389a2f3707c015862 Mon Sep 17 00:00:00 2001 From: Roman Chkhaidze Date: Fri, 26 Jun 2026 14:33:36 -0700 Subject: [PATCH 02/10] make LOCALNET_IMAGE_NAME controllable in conftest --- tests/conftest.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index 0e312e3e..1cf88368 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,8 +1,11 @@ +import os import subprocess from collections import namedtuple CONTAINER_NAME_PREFIX = "test_local_chain_" -LOCALNET_IMAGE_NAME = "ghcr.io/opentensor/subtensor-localnet:devnet-ready" +LOCALNET_IMAGE_NAME = os.getenv( + "LOCALNET_IMAGE_NAME", "ghcr.io/opentensor/subtensor-localnet:devnet-ready" +) Container = namedtuple("Container", ["process", "name", "uri"]) From 4ee61b187fb71c7dec1213588b12fe374ec6564c Mon Sep 17 00:00:00 2001 From: Roman Chkhaidze Date: Fri, 26 Jun 2026 14:43:30 -0700 Subject: [PATCH 03/10] fix websocket poison pill that gets stuck on Max retries exceeded --- async_substrate_interface/async_substrate.py | 218 ++++++++++++------- 1 file changed, 135 insertions(+), 83 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 152b0398..b98c3987 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -641,10 +641,35 @@ def state(self): return self.ws.state async def __aenter__(self): + await self._restart_handler_if_dead() if self.state not in (State.CONNECTING, State.OPEN): await self.connect() return self + async def _restart_handler_if_dead(self) -> None: + """ + Revive the background send/recv handler if it has terminated. + + When `_handler` finishes (for example by returning `TimeoutError("Max retries exceeded.")` after exhausting its + retries), the underlying socket may still be in the OPEN state. In that case neither `__aenter__` nor `connect` + recreate the handler, since both only act when the state is not OPEN/CONNECTING, so the connection is wedged + permanently and every `retrieve` re-raises the dead task's stored error. Detect that here and force a clean + reconnect (fresh socket and handler) under the lock. + """ + task = self._send_recv_task + if task is None or not task.done(): + return + async with self._lock: + task = self._send_recv_task + if task is None or not task.done(): + # Another caller already revived the handler. + return + if not task.cancelled(): + # Consume the dead task's outcome so it is not later reported as an unretrieved exception. + task.exception() + self._attempts = 0 + await self._connect_internal(force=True) + async def mark_waiting_for_response(self): """ Mark that a response is expected. This will cause the websocket to not automatically close. @@ -1253,6 +1278,23 @@ async def retrieve(self, item_id: str) -> Optional[dict]: raise e return None + async def discard_request(self, item_id: str) -> None: + """ + Drop a request that never completed and release the subscription permit that `send` acquired for it. + + This is idempotent and safe to call on ids that already completed: + `retrieve` removes those from `_received` after releasing their permit, so this becomes a no-op and never + double-releases. The id is deliberately not returned to `_in_use_ids`, so a late response from the node for it + is dropped by `_dispatch_response` (which checks `_received`) rather than misrouted to a reused id. + """ + async with self._lock: + fut = self._received.pop(item_id, None) + self._inflight.pop(item_id, None) + if fut is not None: + self.max_subscriptions.release() + if not fut.done(): + fut.cancel() + class AsyncSubstrateInterface(SubstrateMixin): ws: "Websocket" @@ -2728,84 +2770,88 @@ async def _make_rpc_request( async with self.ws as ws: await ws.mark_waiting_for_response() - for payload in payloads: - item_id = await ws.send(payload["payload"]) - request_manager.add_request(item_id, payload["id"]) - # truncate to 2000 chars for debug logging - if len(stringified_payload := str(payload)) < 2_000: - output_payload = stringified_payload - else: - output_payload = f"{stringified_payload[:2_000]} (truncated)" - logger.debug( - f"Submitted payload ID {payload['id']} with websocket ID {item_id}: {output_payload}" - ) + try: + for payload in payloads: + item_id = await ws.send(payload["payload"]) + request_manager.add_request(item_id, payload["id"]) + # truncate to 2000 chars for debug logging + if len(stringified_payload := str(payload)) < 2_000: + output_payload = stringified_payload + else: + output_payload = f"{stringified_payload[:2_000]} (truncated)" + logger.debug( + f"Submitted payload ID {payload['id']} with websocket ID {item_id}: {output_payload}" + ) - while True: - for item_id in request_manager.unresponded(): - if ( - item_id not in request_manager.responses - or inspect.iscoroutinefunction(result_handler) - ): - if response := await ws.retrieve(item_id): - if ( - inspect.iscoroutinefunction(result_handler) - and not subscription_added - ): - # handles subscriptions, overwrites the previous mapping of {item_id : payload_id} - # with {subscription_id : payload_id} - try: - item_id = request_manager.overwrite_request( - item_id, response["result"] - ) - subscription_added = True - except KeyError: - logger.error( - f"Error received from subtensor for {item_id}: {response}\n" - f"Currently received responses: {request_manager.get_results()}" + while True: + for item_id in request_manager.unresponded(): + if ( + item_id not in request_manager.responses + or inspect.iscoroutinefunction(result_handler) + ): + if response := await ws.retrieve(item_id): + if ( + inspect.iscoroutinefunction(result_handler) + and not subscription_added + ): + # handles subscriptions, overwrites the previous mapping of {item_id : payload_id} + # with {subscription_id : payload_id} + try: + item_id = request_manager.overwrite_request( + item_id, response["result"] + ) + subscription_added = True + except KeyError: + logger.error( + f"Error received from subtensor for {item_id}: {response}\n" + f"Currently received responses: {request_manager.get_results()}" + ) + raise SubstrateRequestException(str(response)) + ( + decoded_response, + complete, + ) = await self._process_response( + response, + item_id, + value_scale_type, + storage_item, + result_handler, + runtime=runtime, + ) + if ( + result_processor is not None + and not inspect.iscoroutinefunction(result_handler) + ): + decoded_response = result_processor( + decoded_response, item_id ) - raise SubstrateRequestException(str(response)) - ( - decoded_response, - complete, - ) = await self._process_response( - response, - item_id, - value_scale_type, - storage_item, - result_handler, - runtime=runtime, - ) - if ( - result_processor is not None - and not inspect.iscoroutinefunction(result_handler) - ): - decoded_response = result_processor( - decoded_response, item_id + request_manager.add_response( + item_id, decoded_response, complete ) - request_manager.add_response( - item_id, decoded_response, complete - ) - # truncate to 2000 chars for debug logging - if ( - len(stringified_response := str(decoded_response)) - < 2_000 - ): - output_response = stringified_response - # avoids clogging logs up needlessly (esp for Metadata stuff) - else: - output_response = ( - f"{stringified_response[:2_000]} (truncated)" + # truncate to 2000 chars for debug logging + if ( + len(stringified_response := str(decoded_response)) + < 2_000 + ): + output_response = stringified_response + # avoids clogging logs up needlessly (esp for Metadata stuff) + else: + output_response = ( + f"{stringified_response[:2_000]} (truncated)" + ) + logger.debug( + f"Received response for item ID {item_id}:\n{output_response}\n" + f"Complete: {complete}" ) - logger.debug( - f"Received response for item ID {item_id}:\n{output_response}\n" - f"Complete: {complete}" - ) - if request_manager.is_complete: - await ws.mark_response_received() - break - else: - await asyncio.sleep(0.01) + if request_manager.is_complete: + break + else: + await asyncio.sleep(0.01) + finally: + await ws.mark_response_received() + for item_id in request_manager.unresponded(): + await ws.discard_request(item_id) return request_manager.get_results() @@ -3670,17 +3716,23 @@ async def runtime_calls( # Send all calls as one JSON-RPC batch frame, then gather responses by id. async with self.ws as ws: await ws.mark_waiting_for_response() - item_ids = await ws.send_batch(payloads) + item_ids: list[str] = [] responses: dict[str, dict] = {} - pending = set(item_ids) - while pending: - for item_id in list(pending): - if (response := await ws.retrieve(item_id)) is not None: - responses[item_id] = response - pending.discard(item_id) - if pending: - await asyncio.sleep(0.01) - await ws.mark_response_received() + pending: set[str] = set() + try: + item_ids = await ws.send_batch(payloads) + pending = set(item_ids) + while pending: + for item_id in list(pending): + if (response := await ws.retrieve(item_id)) is not None: + responses[item_id] = response + pending.discard(item_id) + if pending: + await asyncio.sleep(0.01) + finally: + await ws.mark_response_received() + for item_id in pending: + await ws.discard_request(item_id) # Decode each result against its own output type, preserving input order. results: list[ScaleValue] = [] From 6055058368bf59772e3c034fd7ad8c66624b86f7 Mon Sep 17 00:00:00 2001 From: Roman Chkhaidze Date: Fri, 26 Jun 2026 14:43:52 -0700 Subject: [PATCH 04/10] add `AsyncSilenceProxy` --- tests/helpers/async_proxy.py | 95 ++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 tests/helpers/async_proxy.py diff --git a/tests/helpers/async_proxy.py b/tests/helpers/async_proxy.py new file mode 100644 index 00000000..9ef80a91 --- /dev/null +++ b/tests/helpers/async_proxy.py @@ -0,0 +1,95 @@ +import asyncio +import logging + +from websockets.asyncio.client import connect +from websockets.asyncio.server import serve +from websockets.exceptions import ConnectionClosed + +logger = logging.getLogger("websockets.proxy") + + +class AsyncSilenceProxy: + """ + Async websocket proxy that can be commanded to go silent and then resume. + + It relays application frames between a downstream client and an upstream node. While paused it stops relaying + application frames in both directions, but the underlying connections stay OPEN because the websockets library keeps + answering ping/pong automatically. That reproduces the production "poison pill": the client's activity timeout fires + and its retries exhaust against a socket that never closes, leaving a dead handler on an OPEN connection. + + Usage: + proxy = await AsyncSilenceProxy(upstream_url).start() + ... # traffic flows + proxy.pause() # node goes silent, socket stays open + ... # client retries exhaust -> poison + proxy.resume() # traffic flows again + await proxy.close() + + This is the async analogue of `tests/helpers/proxy_server.py`, using a command-driven pause (an `asyncio.Event`) + instead of a time-based one so tests are deterministic. + """ + + def __init__(self, upstream: str, host: str = "127.0.0.1"): + self.upstream = upstream + self.host = host + self._server = None + self._forwarding = asyncio.Event() + self._forwarding.set() + self._tasks: set[asyncio.Task] = set() + + @property + def port(self) -> int: + """The OS-assigned port the client connects to. Valid after `start()`.""" + return self._server.sockets[0].getsockname()[1] + + @property + def url(self) -> str: + return f"ws://{self.host}:{self.port}" + + async def start(self) -> "AsyncSilenceProxy": + self._server = await serve(self._handle_client, self.host, 0) + return self + + def pause(self) -> None: + """Stop relaying application frames; the sockets stay OPEN (go silent).""" + self._forwarding.clear() + + def resume(self) -> None: + """Resume relaying application frames.""" + self._forwarding.set() + + async def close(self) -> None: + # Unblock and cancel the pumps so handlers return promptly, then shut the listener down. Cancelling first avoids + # waiting on a pump that is blocked reading the still-open (idle) upstream. + self.resume() + for task in list(self._tasks): + task.cancel() + if self._server is not None: + self._server.close() + await self._server.wait_closed() + + async def _handle_client(self, client) -> None: + async with connect(self.upstream) as upstream: + pumps = [ + asyncio.create_task(self._pump(client, upstream)), + asyncio.create_task(self._pump(upstream, client)), + ] + self._tasks.update(pumps) + try: + # When either direction ends (e.g. the client reconnects and drops this connection), stop the sibling so + # the handler returns instead of blocking forever on the still-open upstream. + await asyncio.wait(pumps, return_when=asyncio.FIRST_COMPLETED) + finally: + for pump in pumps: + pump.cancel() + self._tasks.discard(pump) + + async def _pump(self, src, dst) -> None: + try: + async for message in src: + # Block here while paused: the frame is held (not relayed) until `resume()`, so the peer sees silence on + # an otherwise-open socket. + await self._forwarding.wait() + await dst.send(message) + except ConnectionClosed: + pass From b56c7fe44df61c4206044aee3df946ddadd2cbc1 Mon Sep 17 00:00:00 2001 From: Roman Chkhaidze Date: Fri, 26 Jun 2026 14:44:06 -0700 Subject: [PATCH 05/10] add e2e --- .../test_websocket_poison_recovery_e2e.py | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 tests/e2e_tests/test_websocket_poison_recovery_e2e.py diff --git a/tests/e2e_tests/test_websocket_poison_recovery_e2e.py b/tests/e2e_tests/test_websocket_poison_recovery_e2e.py new file mode 100644 index 00000000..465078a4 --- /dev/null +++ b/tests/e2e_tests/test_websocket_poison_recovery_e2e.py @@ -0,0 +1,71 @@ +import subprocess + +import pytest +from websockets.protocol import State + +from async_substrate_interface.async_substrate import AsyncSubstrateInterface +from tests.conftest import start_docker_container +from tests.e2e_tests.test_substrate_addons import wait_for_output +from tests.helpers.async_proxy import AsyncSilenceProxy + + +@pytest.fixture(scope="function") +def local_chain(): + container = start_docker_container(9955, "poison") + try: + if not wait_for_output(container.process, "Imported #1", timeout=60): + raise TimeoutError( + "Docker container did not start properly - 'Imported #1' not found" + ) + yield container + finally: + subprocess.run(["docker", "kill", container.name]) + container.process.kill() + + +@pytest.mark.asyncio +async def test_poison_pill_recovers_after_silence(local_chain): + """ + A dead handler left on an OPEN socket (the relayed "Max retries exceeded." poison pill) must recover on the next + call instead of failing forever. + + The AsyncSilenceProxy sits between the client and the localnet. Pausing it makes the node go silent without closing + the socket, so the client's retries exhaust and the background handler dies while `ws.state` stays OPEN. Resuming + and issuing one more call must transparently rebuild the connection and succeed. + """ + proxy = await AsyncSilenceProxy(local_chain.uri).start() + try: + substrate = AsyncSubstrateInterface( + proxy.url, + retry_timeout=2.0, + max_retries=2, + ws_shutdown_timer=None, + ) + try: + # Baseline: traffic flows through the proxy. + head = await substrate.get_chain_head() + assert head.startswith("0x") + + # Go silent: the socket stays open but no responses come back, so the client's retries exhaust and the + # handler dies. + proxy.pause() + poison = None + try: + await substrate.get_chain_head() + except Exception as exc: # noqa: BLE001 + poison = exc + assert poison is not None + assert "Max retries exceeded" in str(poison) + + # The exact poison condition: a finished handler on an OPEN socket. + assert substrate.ws._send_recv_task.done() + assert substrate.ws.state is State.OPEN + + # Resume and prove the next call rebuilds the connection and succeeds. + proxy.resume() + recovered = await substrate.get_chain_head() + assert recovered.startswith("0x") + finally: + await substrate.close() + finally: + await proxy.close() From a4928cce1056102c3eb317ca28795fb5c201b05c Mon Sep 17 00:00:00 2001 From: Roman Chkhaidze Date: Fri, 26 Jun 2026 14:44:34 -0700 Subject: [PATCH 06/10] add unit tests --- .../asyncio_/test_websocket_resilience.py | 190 ++++++++++++++++++ 1 file changed, 190 insertions(+) create mode 100644 tests/unit_tests/asyncio_/test_websocket_resilience.py diff --git a/tests/unit_tests/asyncio_/test_websocket_resilience.py b/tests/unit_tests/asyncio_/test_websocket_resilience.py new file mode 100644 index 00000000..21b3e9a1 --- /dev/null +++ b/tests/unit_tests/asyncio_/test_websocket_resilience.py @@ -0,0 +1,190 @@ +""" +Regression tests for the Websocket resilience fixes. + +These cover the failure modes behind the relayed "Max retries exceeded." errors: +- A dead background handler on a still-OPEN socket must be revived on re-entry instead of permanently poisoning the +shared connection ("poison pill"). +- A request that fails mid-flight must still balance `_waiting_for_response` and give back the subscription permit that +`send` acquired for it. +- `discard_request` must release that permit, drop the pending future, and burn the id so a late node response can never +be misrouted to a reused id. +""" + +import asyncio +from contextlib import suppress +from unittest.mock import AsyncMock, MagicMock + +import pytest +from websockets.protocol import State + +from async_substrate_interface.async_substrate import ( + AsyncSubstrateInterface, + Websocket, +) + + +def _make_payload(id_: str) -> dict: + return { + "id": id_, + "payload": { + "jsonrpc": "2.0", + "method": "state_getRuntimeVersion", + "params": [], + }, + } + + +class _FakePoisonWs: + """ + Minimal Websocket stand-in whose `retrieve` always poisons (raises). + + It mirrors the real permit lifecycle so the cleanup path of `_make_rpc_request` can be exercised without real + sockets: `send` acquires a permit, and on the failure path the only thing that gives it back is `discard_request` + (exactly what the `_make_rpc_request` finally-block is expected to call). + """ + + def __init__(self, max_subscriptions: int = 1024): + self._waiting_for_response = 0 + self.max_subscriptions = asyncio.Semaphore(max_subscriptions) + self.permits_held = 0 + self._sent = 0 + + async def __aenter__(self): + return self + + async def __aexit__(self, *exc): + return False + + async def mark_waiting_for_response(self): + self._waiting_for_response += 1 + + async def mark_response_received(self): + self._waiting_for_response -= 1 + + async def send(self, payload): + await self.max_subscriptions.acquire() + self.permits_held += 1 + item_id = f"id{self._sent}" + self._sent += 1 + return item_id + + async def retrieve(self, item_id): + raise TimeoutError("Max retries exceeded.") + + async def discard_request(self, item_id): + self.max_subscriptions.release() + self.permits_held -= 1 + + +def _interface_with_ws(fake_ws) -> AsyncSubstrateInterface: + substrate = AsyncSubstrateInterface("ws://localhost", _mock=True) + substrate.ws = MagicMock() + substrate.ws.__aenter__ = AsyncMock(return_value=fake_ws) + substrate.ws.__aexit__ = AsyncMock(return_value=False) + return substrate + + +@pytest.mark.asyncio +async def test_dead_handler_is_revived_on_enter(): + """ + A finished `_send_recv_task` on an OPEN socket must be revived by `__aenter__`. + + Without the revive, the socket stays OPEN so neither `__aenter__` nor `connect` rebuild the handler, the dead task + lingers, and every `retrieve` re-raises its stored error forever. + """ + ws = Websocket( + "ws://fake:9944", + max_retries=1, + retry_timeout=0.1, + shutdown_timer=None, + ) + + async def _dead(): + # The real handler returns (does not raise) the error after its retries. + return TimeoutError("Max retries exceeded.") + + ws._send_recv_task = asyncio.ensure_future(_dead()) + await ws._send_recv_task + ws.ws = MagicMock(state=State.OPEN) + ws._attempts = 5 + + reconnect_calls = [] + + async def fake_connect_internal(force): + reconnect_calls.append(force) + ws.ws = MagicMock(state=State.OPEN) + ws._send_recv_task = asyncio.ensure_future(asyncio.sleep(3600)) + + ws._connect_internal = fake_connect_internal + + try: + await ws.__aenter__() + + assert reconnect_calls == [True] + assert not ws._send_recv_task.done() + assert ws._attempts == 0 + finally: + ws._send_recv_task.cancel() + with suppress(asyncio.CancelledError): + await ws._send_recv_task + + +@pytest.mark.asyncio +async def test_failed_request_balances_waiting_counter(): + """A request that fails mid-flight must still decrement `_waiting_for_response`.""" + fake = _FakePoisonWs() + substrate = _interface_with_ws(fake) + + with pytest.raises(TimeoutError, match="Max retries exceeded."): + await substrate._make_rpc_request([_make_payload("a")]) + + assert fake._waiting_for_response == 0 + + +@pytest.mark.asyncio +async def test_failed_requests_do_not_leak_subscription_permits(): + """Each failed request must give its `send` permit back via `discard_request`.""" + fake = _FakePoisonWs(max_subscriptions=8) + substrate = _interface_with_ws(fake) + + for i in range(5): + with pytest.raises(TimeoutError, match="Max retries exceeded."): + await substrate._make_rpc_request([_make_payload(f"req{i}")]) + + assert fake.permits_held == 0 + assert fake.max_subscriptions._value == 8 + + +@pytest.mark.asyncio +async def test_discard_request_releases_permit_and_burns_id(): + """ + `discard_request` releases the permit and drops the future, the id stays burned, and a late node response for it is + dropped rather than misrouted to a reused id. + """ + ws = Websocket("ws://fake:9944", shutdown_timer=None) + + item_id = "Xy1" + fut = asyncio.get_running_loop().create_future() + ws._received[item_id] = fut + ws._inflight[item_id] = '{"id": "Xy1"}' + ws._in_use_ids.add(item_id) + await ws.max_subscriptions.acquire() + permits_after_send = ws.max_subscriptions._value + + await ws.discard_request(item_id) + + assert item_id not in ws._received + assert item_id not in ws._inflight + # Burned on purpose: kept in _in_use_ids so it cannot be reissued while a late response for it may still arrive. + assert item_id in ws._in_use_ids + assert ws.max_subscriptions._value == permits_after_send + 1 + assert fut.cancelled() + + # A late response for the discarded id must be dropped: no re-created future, no exception. + await ws._dispatch_response({"id": item_id, "result": "0xLATE"}) + assert item_id not in ws._received + + # Idempotent: a second discard for the same id must not double-release a permit. + permits_before = ws.max_subscriptions._value + await ws.discard_request(item_id) + assert ws.max_subscriptions._value == permits_before From 5ae1d02678e6f8fad495c3f487de8b4a1715706e Mon Sep 17 00:00:00 2001 From: ibraheem-latent Date: Fri, 26 Jun 2026 19:04:27 -0700 Subject: [PATCH 07/10] avoid releasing subscription twice --- async_substrate_interface/async_substrate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index b98c3987..88eff40f 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -1251,8 +1251,8 @@ async def retrieve(self, item_id: str) -> Optional[dict]: item: Optional[asyncio.Future] = self._received.get(item_id) if item is not None: if item.done(): - self.max_subscriptions.release() res = item.result() + self.max_subscriptions.release() del self._received[item_id] return res else: From 66b5460179e5e68a2c2b31a3b2b69c6fac6598f0 Mon Sep 17 00:00:00 2001 From: Roman Chkhaidze Date: Fri, 26 Jun 2026 21:19:46 -0700 Subject: [PATCH 08/10] update gitignore --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index c638dc45..e058417e 100644 --- a/.gitignore +++ b/.gitignore @@ -170,4 +170,5 @@ cython_debug/ # PyPI configuration file .pypirc -.idea \ No newline at end of file +.idea +.DS_Store From 0959076cec7493fcc391c011847c52cbf60c438e Mon Sep 17 00:00:00 2001 From: Roman Chkhaidze Date: Fri, 26 Jun 2026 21:20:05 -0700 Subject: [PATCH 09/10] add tests for Abe's fix --- .../asyncio_/test_websocket_resilience.py | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/tests/unit_tests/asyncio_/test_websocket_resilience.py b/tests/unit_tests/asyncio_/test_websocket_resilience.py index 21b3e9a1..1a173334 100644 --- a/tests/unit_tests/asyncio_/test_websocket_resilience.py +++ b/tests/unit_tests/asyncio_/test_websocket_resilience.py @@ -8,6 +8,8 @@ `send` acquired for it. - `discard_request` must release that permit, drop the pending future, and burn the id so a late node response can never be misrouted to a reused id. +- A done future whose `result()` raises must not release its permit inside `retrieve`; the paired `discard_request` +owns that single release, so a mid-flight transport error can never double-release the subscription semaphore. """ import asyncio @@ -188,3 +190,40 @@ async def test_discard_request_releases_permit_and_burns_id(): permits_before = ws.max_subscriptions._value await ws.discard_request(item_id) assert ws.max_subscriptions._value == permits_before + + +@pytest.mark.asyncio +async def test_failed_retrieve_then_discard_releases_permit_once(): + """ + A done future whose `result()` raises must not release its permit in `retrieve`. + + `retrieve` releases the permit only on the success path; on the exception path it leaves the still-pending id in + place so the caller's finally-block can hand it to `discard_request`, the single owner of that release. Releasing + inside `retrieve` here (the pre-fix order) would double-release the subscription semaphore once `discard_request` + runs. + """ + ws = Websocket("ws://fake:9944", shutdown_timer=None) + + item_id = "Ab1" + fut = asyncio.get_running_loop().create_future() + fut.set_exception(ConnectionError("connection broke mid-flight")) + ws._received[item_id] = fut + ws._inflight[item_id] = '{"id": "Ab1"}' + ws._in_use_ids.add(item_id) + await ws.max_subscriptions.acquire() + permits_after_send = ws.max_subscriptions._value + + # `retrieve` hits a done future whose `result()` raises. It must propagate that error WITHOUT releasing the permit, + # otherwise the paired `discard_request` would release a second time (the double-release this fix prevents). + with pytest.raises(ConnectionError, match="connection broke mid-flight"): + await ws.retrieve(item_id) + + assert ws.max_subscriptions._value == permits_after_send + assert item_id in ws._received + + # The callers' finally-block then discards the still-pending id, which is the single owner of that release. + await ws.discard_request(item_id) + + assert ws.max_subscriptions._value == permits_after_send + 1 + assert item_id not in ws._received + assert item_id not in ws._inflight From 18f9a2c8f2b0f3198f7bdce7ce9e59fca334d9ed Mon Sep 17 00:00:00 2001 From: Roman Chkhaidze Date: Mon, 29 Jun 2026 10:38:22 -0700 Subject: [PATCH 10/10] Update CHANGELOG.md + bumping version --- CHANGELOG.md | 7 +++++++ pyproject.toml | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 01a1fec8..894e8023 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## 2.2.1 /2026-06-29 + +## What's Changed +* Fix websocket poison connection and leaks on failed requests by @basfroman in https://github.com/latent-to/async-substrate-interface/pull/367 + +**Full Changelog**: https://github.com/latent-to/async-substrate-interface/compare/v2.2.0...v2.2.1 + ## 2.2.0 /2026-06-11 ## What's Changed diff --git a/pyproject.toml b/pyproject.toml index 84c17477..b4087390 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "async-substrate-interface" -version = "2.2.0" +version = "2.2.1" description = "Asyncio library for interacting with substrate. Mostly API-compatible with py-substrate-interface" readme = "README.md" license = { file = "LICENSE" }