Skip to content

Commit 4dae288

Browse files
committed
fix(ws): stale socket cleanup no longer evicts a reconnected node
ConnectionManager.disconnect() popped the connection blindly by node_id, and the node_websocket receive loop calls it unconditionally in its finally. When a node reconnects to the SAME process, connect() replaces the old socket with the new one — but the old socket's loop then exits and calls disconnect(node_id), evicting the NEW (live) connection. Symptom: a node that reconnected (flaky USB/RTSP camera networking, a node-initiated reconnect) keeps heartbeating — DB/dashboard show it online — but manager.is_connected() returns False, so every WS command (view_camera, attach_snapshot, recording control) fails with "node not connected" until the node happens to reconnect again. Not data loss, but it silently breaks live features on the core node channel. Fix: make disconnect() identity-aware — it now takes the disconnecting ws and only tears down if the stored socket IS that ws. A stale old socket's cleanup early-returns, leaving the new connection and its pending commands intact. The receive loop passes its own ws. Back-compat: disconnect(node_id) with no ws still tears down (legacy call shape). Adds ConnectionManager regression tests (reconnect race, no-ws back-compat, pending-command preservation). Full suite 663 passed.
1 parent c29eaac commit 4dae288

2 files changed

Lines changed: 105 additions & 2 deletions

File tree

backend/app/api/ws.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,20 @@ async def connect(self, node_id: str, ws: WebSocket):
157157
# Use print() so it always appears in fly logs (logger.info is filtered by default)
158158
print(f"[WS] Node {node_id} connected via WebSocket")
159159

160-
def disconnect(self, node_id: str):
160+
def disconnect(self, node_id: str, ws: WebSocket | None = None):
161+
# Identity guard for the reconnect race: when a node reconnects to
162+
# the SAME process, `connect()` replaces the old socket with the new
163+
# one, but the OLD socket's receive loop then exits and calls
164+
# disconnect(node_id). A blind pop(node_id) would evict the NEW
165+
# (live) connection — leaving a node that keeps heartbeating yet
166+
# reports `is_connected() == False`, so every command (snapshot /
167+
# view / recording over WS) fails until it reconnects again. Only
168+
# tear down if the stored socket is the one actually disconnecting.
169+
current = self._connections.get(node_id)
170+
if ws is not None and current is not None and current is not ws:
171+
# A newer connection already replaced us — leave the registry and
172+
# the new connection's pending commands untouched.
173+
return
161174
self._connections.pop(node_id, None)
162175
# Cancel pending command futures so callers don't wait until
163176
# timeout for a node that's already gone.
@@ -379,7 +392,9 @@ async def node_websocket(
379392
except Exception as e:
380393
logger.error("WebSocket error for node %s: %s", node_id, e)
381394
finally:
382-
manager.disconnect(node_id)
395+
# Pass THIS socket so a stale old connection's cleanup can't evict a
396+
# newer one that already replaced it (see ConnectionManager.disconnect).
397+
manager.disconnect(node_id, ws)
383398
_ws_rate_limiter.forget(node_id)
384399

385400

backend/tests/test_ws_manager.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
"""ConnectionManager connection-lifecycle regression tests.
2+
3+
The reconnect race: when a node reconnects to the same process, `connect()`
4+
replaces the old socket with the new one. The OLD socket's receive loop then
5+
exits and calls `disconnect(node_id, old_ws)`. That must NOT evict the new,
6+
live connection — the bug it guards against left a node that kept heartbeating
7+
(DB shows it online) but reported `is_connected() == False`, so every WS
8+
command (snapshot / view / recording) failed until the node reconnected again.
9+
"""
10+
11+
import asyncio
12+
13+
from app.api.ws import ConnectionManager
14+
15+
16+
class _FakeWS:
17+
"""Minimal stand-in: connect() awaits .close() on the replaced socket."""
18+
19+
def __init__(self, name: str) -> None:
20+
self.name = name
21+
self.closed = False
22+
23+
async def close(self, code: int = 1000, reason: str = "") -> None:
24+
self.closed = True
25+
26+
def __repr__(self) -> str:
27+
return f"_FakeWS({self.name})"
28+
29+
30+
def test_stale_disconnect_does_not_evict_new_connection():
31+
async def scenario():
32+
mgr = ConnectionManager()
33+
ws1, ws2 = _FakeWS("ws1"), _FakeWS("ws2")
34+
35+
await mgr.connect("node_a", ws1)
36+
assert mgr.is_connected("node_a")
37+
38+
# Node reconnects: ws2 replaces ws1 (connect() closes the old one).
39+
await mgr.connect("node_a", ws2)
40+
assert ws1.closed is True
41+
assert mgr._connections["node_a"] is ws2
42+
43+
# The OLD socket's receive loop now exits and disconnects with ITS ws.
44+
mgr.disconnect("node_a", ws1)
45+
46+
# The new connection must survive — this is the regression.
47+
assert mgr.is_connected("node_a") is True
48+
assert mgr._connections["node_a"] is ws2
49+
50+
# When the CURRENT socket disconnects, it IS torn down.
51+
mgr.disconnect("node_a", ws2)
52+
assert mgr.is_connected("node_a") is False
53+
54+
asyncio.run(scenario())
55+
56+
57+
def test_disconnect_without_ws_still_tears_down():
58+
"""Back-compat: disconnect(node_id) with no ws still removes the
59+
connection (the identity guard only engages when a ws is passed)."""
60+
async def scenario():
61+
mgr = ConnectionManager()
62+
await mgr.connect("node_b", _FakeWS("ws1"))
63+
mgr.disconnect("node_b") # legacy call shape, no ws
64+
assert mgr.is_connected("node_b") is False
65+
66+
asyncio.run(scenario())
67+
68+
69+
def test_stale_disconnect_preserves_live_connections_pending_commands():
70+
"""A stale (old-socket) disconnect must not cancel the live connection's
71+
in-flight command futures."""
72+
async def scenario():
73+
mgr = ConnectionManager()
74+
ws1, ws2 = _FakeWS("ws1"), _FakeWS("ws2")
75+
await mgr.connect("node_c", ws1)
76+
await mgr.connect("node_c", ws2)
77+
78+
loop = asyncio.get_running_loop()
79+
fut = loop.create_future()
80+
mgr._pending_commands["corr-1"] = ("node_c", fut)
81+
82+
mgr.disconnect("node_c", ws1) # stale — early-returns
83+
84+
assert not fut.cancelled()
85+
assert "corr-1" in mgr._pending_commands
86+
fut.cancel() # tidy up the dangling future
87+
88+
asyncio.run(scenario())

0 commit comments

Comments
 (0)