Skip to content

Commit b58249d

Browse files
authored
clear session if the session is closed to avoid using closed sessions (#81)
Why === when we get a `SESSION_STATE_MISMATCH`, we close the old session and delete it, but the recreate fn still has a handle to it _Describe what prompted you to make this change, link relevant resources: Linear issues, Slack discussions, etc._ What changed ============ if the retry loop detects the old session to be closed, stop using it _Describe what changed to a level of detail that someone with no context with your PR could be able to review it_ Test plan ========= _Describe what you did to test this change to a level of detail that allows your reviewer to test it_
1 parent aa1e501 commit b58249d

2 files changed

Lines changed: 11 additions & 3 deletions

File tree

replit_river/client_transport.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,13 +102,19 @@ async def _establish_new_connection(
102102
max_retry = self._transport_options.connection_retry_options.max_retry
103103
client_id = self._client_id
104104
logger.info("Attempting to establish new ws connection")
105+
105106
for i in range(max_retry):
106107
if i > 0:
107108
logger.info(f"Retrying build handshake number {i} times")
108109
if not rate_limit.has_budget(client_id):
109110
logger.debug("No retry budget for %s.", client_id)
110111
break
111112
rate_limit.consume_budget(client_id)
113+
114+
# if the session is closed, we shouldn't use it
115+
if old_session and not await old_session.is_session_open():
116+
old_session = None
117+
112118
try:
113119
ws = await websockets.connect(self._websocket_uri)
114120
session_id = (
@@ -333,6 +339,8 @@ async def _establish_handshake(
333339
await self._delete_session(old_session)
334340

335341
raise RiverException(
336-
ERROR_HANDSHAKE, f"Handshake failed: {handshake_response.status.reason}"
342+
ERROR_HANDSHAKE,
343+
f"Handshake failed with code ${handshake_response.status.code}: "
344+
+ f"{handshake_response.status.reason}",
337345
)
338346
return handshake_request, handshake_response

replit_river/session.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ async def _begin_close_session_countdown(self) -> None:
124124
if self._close_session_after_time_secs is not None:
125125
# already in grace period, no need to set again
126126
return
127-
logger.debug(
127+
logger.info(
128128
"websocket closed from %s to %s begin grace period",
129129
self._transport_id,
130130
self._to_id,
@@ -133,6 +133,7 @@ async def _begin_close_session_countdown(self) -> None:
133133

134134
async def serve(self) -> None:
135135
"""Serve messages from the websocket."""
136+
self._reset_session_close_countdown()
136137
try:
137138
async with asyncio.TaskGroup() as tg:
138139
try:
@@ -226,7 +227,6 @@ async def replace_with_new_websocket(
226227
old_wrapper = self._ws_wrapper
227228
old_ws_id = old_wrapper.ws.id
228229
if new_ws.id != old_ws_id:
229-
self._reset_session_close_countdown()
230230
await old_wrapper.close()
231231
self._ws_wrapper = WebsocketWrapper(new_ws)
232232
await self._send_buffered_messages(new_ws)

0 commit comments

Comments
 (0)