From f68bbf77568d30c96ef36c1d12ac241ae1017ec0 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 17 Mar 2026 11:45:33 +0400 Subject: [PATCH 01/25] feat: preserve SFU error code in SignalingError When the SFU returns an error (e.g. SFU_FULL), the error code was discarded and only the message string was kept. This preserves the full error object so downstream retry logic can inspect the code. --- getstream/video/rtc/signaling.py | 8 +++++--- tests/test_signaling.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/getstream/video/rtc/signaling.py b/getstream/video/rtc/signaling.py index 675dc7aa..3050a426 100644 --- a/getstream/video/rtc/signaling.py +++ b/getstream/video/rtc/signaling.py @@ -26,7 +26,9 @@ class SignalingError(Exception): """Exception raised for errors in the signaling process.""" - pass + def __init__(self, message: str, error=None): + super().__init__(message) + self.error = error class WebSocketClient(StreamAsyncIOEventEmitter): @@ -111,8 +113,8 @@ async def connect(self): # Check if the first message is an error if self.first_message and self.first_message.HasField("error"): - error_msg = self.first_message.error.error.message - raise SignalingError(f"Connection failed: {error_msg}") + sfu_error = self.first_message.error.error + raise SignalingError(f"Connection failed: {sfu_error.message}", error=sfu_error) # Check if we got join_response if self.first_message and self.first_message.HasField("join_response"): diff --git a/tests/test_signaling.py b/tests/test_signaling.py index 724e0f50..e9930c61 100644 --- a/tests/test_signaling.py +++ b/tests/test_signaling.py @@ -5,6 +5,7 @@ from getstream.video.rtc.signaling import WebSocketClient, SignalingError from getstream.video.rtc.pb.stream.video.sfu.event import events_pb2 +from getstream.video.rtc.pb.stream.video.sfu.models import models_pb2 class TestWebSocketClient: @@ -129,6 +130,36 @@ async def test_connect_error(self, join_request, mock_websocket): # Clean up client.close() + @pytest.mark.asyncio + async def test_connect_error_preserves_error_code(self, join_request, mock_websocket): + """Test that SignalingError preserves the SFU error code.""" + client = WebSocketClient( + "wss://test.url", join_request, asyncio.get_running_loop() + ) + + # Prepare an SFU FULL error response + error_response = events_pb2.SfuEvent() + error_response.error.error.code = models_pb2.ERROR_CODE_SFU_FULL + error_response.error.error.message = "server is full" + error_response_bytes = error_response.SerializeToString() + + connect_task = asyncio.create_task(client.connect()) + await asyncio.sleep(0.1) + + on_open_callback = mock_websocket.call_args[1]["on_open"] + on_open_callback(mock_websocket.return_value) + + on_message_callback = mock_websocket.call_args[1]["on_message"] + on_message_callback(mock_websocket.return_value, error_response_bytes) + + with pytest.raises(SignalingError) as exc_info: + await connect_task + + assert exc_info.value.error is not None + assert exc_info.value.error.code == models_pb2.ERROR_CODE_SFU_FULL + + client.close() + @pytest.mark.asyncio async def test_websocket_error_during_connect(self, join_request, mock_websocket): """Test WebSocket error during connection.""" From bd33f67c7a897790141e734fee398c1812beebb7 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 17 Mar 2026 11:51:20 +0400 Subject: [PATCH 02/25] feat: add SfuJoinError and retryable error detection in connect_websocket connect_websocket now distinguishes retryable SFU errors (SFU_FULL, SFU_SHUTTING_DOWN, CALL_PARTICIPANT_LIMIT_REACHED) from fatal ones, raising SfuJoinError so the retry loop in connect() can request a different SFU from the coordinator. --- getstream/video/rtc/connection_utils.py | 24 +++++++++ tests/test_connection_utils.py | 71 +++++++++++++++++++++++++ 2 files changed, 95 insertions(+) create mode 100644 tests/test_connection_utils.py diff --git a/getstream/video/rtc/connection_utils.py b/getstream/video/rtc/connection_utils.py index bdc8d02f..99bb52dd 100644 --- a/getstream/video/rtc/connection_utils.py +++ b/getstream/video/rtc/connection_utils.py @@ -94,6 +94,22 @@ class SfuConnectionError(Exception): pass +class SfuJoinError(SfuConnectionError): + """Raised when SFU join fails with a retryable error code.""" + + def __init__(self, message: str, error_code: int = 0, should_retry: bool = False): + super().__init__(message) + self.error_code = error_code + self.should_retry = should_retry + + +_RETRYABLE_SFU_ERROR_CODES = { + 700, # ERROR_CODE_SFU_FULL + 600, # ERROR_CODE_SFU_SHUTTING_DOWN + 301, # ERROR_CODE_CALL_PARTICIPANT_LIMIT_REACHED +} + + @dataclass class ConnectionOptions: """Options for the connection process.""" @@ -450,6 +466,14 @@ async def connect_websocket( logger.debug("WebSocket connection established") return ws_client, sfu_event + except SignalingError as e: + if e.error and hasattr(e.error, "code") and e.error.code in _RETRYABLE_SFU_ERROR_CODES: + raise SfuJoinError( + str(e), + error_code=e.error.code, + should_retry=True, + ) from e + raise except Exception as e: logger.error(f"Failed to connect WebSocket to {ws_url}: {e}") raise SignalingError(f"WebSocket connection failed: {e}") diff --git a/tests/test_connection_utils.py b/tests/test_connection_utils.py new file mode 100644 index 00000000..c6160d11 --- /dev/null +++ b/tests/test_connection_utils.py @@ -0,0 +1,71 @@ +import pytest +from unittest.mock import AsyncMock, patch + +from getstream.video.rtc.connection_utils import ( + connect_websocket, + ConnectionOptions, + SfuConnectionError, +) +from getstream.video.rtc.signaling import SignalingError +from getstream.video.rtc.pb.stream.video.sfu.models import models_pb2 + + +class TestConnectWebsocket: + @pytest.mark.asyncio + async def test_raises_sfu_join_error_on_sfu_full(self): + """connect_websocket should raise SfuJoinError when SFU is full.""" + from getstream.video.rtc.connection_utils import SfuJoinError + + # Create a models_pb2.Error with SFU_FULL code + sfu_error = models_pb2.Error( + code=models_pb2.ERROR_CODE_SFU_FULL, + message="server is full", + should_retry=True, + ) + signaling_error = SignalingError("Connection failed: server is full", error=sfu_error) + + with patch("getstream.video.rtc.connection_utils.WebSocketClient") as mock_ws_cls: + mock_ws = AsyncMock() + mock_ws.connect = AsyncMock(side_effect=signaling_error) + mock_ws_cls.return_value = mock_ws + + with pytest.raises(SfuJoinError) as exc_info: + await connect_websocket( + token="test_token", + ws_url="wss://test.url", + session_id="test_session", + options=ConnectionOptions(), + ) + + assert exc_info.value.error_code == models_pb2.ERROR_CODE_SFU_FULL + assert exc_info.value.should_retry is True + # SfuJoinError should be a subclass of SfuConnectionError + assert isinstance(exc_info.value, SfuConnectionError) + + @pytest.mark.asyncio + async def test_non_retryable_error_propagates_as_signaling_error(self): + """Non-retryable SignalingError should not become SfuJoinError.""" + from getstream.video.rtc.connection_utils import SfuJoinError + + # Error with non-retryable code (e.g. permission denied) + sfu_error = models_pb2.Error( + code=models_pb2.ERROR_CODE_PERMISSION_DENIED, + message="permission denied", + should_retry=False, + ) + signaling_error = SignalingError("Connection failed: permission denied", error=sfu_error) + + with patch("getstream.video.rtc.connection_utils.WebSocketClient") as mock_ws_cls: + mock_ws = AsyncMock() + mock_ws.connect = AsyncMock(side_effect=signaling_error) + mock_ws_cls.return_value = mock_ws + + with pytest.raises(SignalingError) as exc_info: + await connect_websocket( + token="test_token", + ws_url="wss://test.url", + session_id="test_session", + options=ConnectionOptions(), + ) + + assert not isinstance(exc_info.value, SfuJoinError) From 369a085b21b2259e89a82b675450148bc2e32160 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 17 Mar 2026 11:58:22 +0400 Subject: [PATCH 03/25] feat: pass migrating_from to coordinator join request Allows the coordinator to exclude full/failed SFUs when assigning a new SFU for the participant, by passing migrating_from and migrating_from_list in the join call body. --- getstream/video/rtc/connection_utils.py | 6 +++ tests/test_connection_utils.py | 67 +++++++++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/getstream/video/rtc/connection_utils.py b/getstream/video/rtc/connection_utils.py index 99bb52dd..4c10d096 100644 --- a/getstream/video/rtc/connection_utils.py +++ b/getstream/video/rtc/connection_utils.py @@ -191,6 +191,8 @@ async def join_call_coordinator_request( notify: Optional[bool] = None, video: Optional[bool] = None, location: Optional[str] = None, + migrating_from: Optional[str] = None, + migrating_from_list: Optional[list] = None, ) -> StreamResponse[JoinCallResponse]: """Make a request to join a call via the coordinator. @@ -224,6 +226,10 @@ async def join_call_coordinator_request( video=video, data=data, ) + if migrating_from: + json_body["migrating_from"] = migrating_from + if migrating_from_list: + json_body["migrating_from_list"] = migrating_from_list # Make the POST request to join the call return await client.post( diff --git a/tests/test_connection_utils.py b/tests/test_connection_utils.py index c6160d11..5f5cc2f5 100644 --- a/tests/test_connection_utils.py +++ b/tests/test_connection_utils.py @@ -5,6 +5,7 @@ connect_websocket, ConnectionOptions, SfuConnectionError, + join_call_coordinator_request, ) from getstream.video.rtc.signaling import SignalingError from getstream.video.rtc.pb.stream.video.sfu.models import models_pb2 @@ -69,3 +70,69 @@ async def test_non_retryable_error_propagates_as_signaling_error(self): ) assert not isinstance(exc_info.value, SfuJoinError) + + +class TestJoinCallCoordinatorRequest: + @pytest.mark.asyncio + async def test_includes_migrating_from_in_body(self): + """migrating_from and migrating_from_list should be included in the request body.""" + mock_call = AsyncMock() + mock_call.call_type = "default" + mock_call.id = "test_call" + mock_call.client.stream.api_key = "key" + mock_call.client.stream.api_secret = "secret" + mock_call.client.stream.base_url = "https://test.url" + + captured_body = {} + + with patch("getstream.video.rtc.connection_utils.user_client") as mock_user_client: + mock_client = AsyncMock() + + async def capture_post(*args, **kwargs): + captured_body.update(kwargs.get("json", {})) + return AsyncMock() + + mock_client.post = capture_post + mock_user_client.return_value = mock_client + + await join_call_coordinator_request( + call=mock_call, + user_id="user1", + location="auto", + migrating_from="sfu-london-1", + migrating_from_list=["sfu-london-1", "sfu-paris-2"], + ) + + assert captured_body["migrating_from"] == "sfu-london-1" + assert captured_body["migrating_from_list"] == ["sfu-london-1", "sfu-paris-2"] + + @pytest.mark.asyncio + async def test_omits_migrating_from_when_not_provided(self): + """migrating_from should not appear in body when not provided.""" + mock_call = AsyncMock() + mock_call.call_type = "default" + mock_call.id = "test_call" + mock_call.client.stream.api_key = "key" + mock_call.client.stream.api_secret = "secret" + mock_call.client.stream.base_url = "https://test.url" + + captured_body = {} + + with patch("getstream.video.rtc.connection_utils.user_client") as mock_user_client: + mock_client = AsyncMock() + + async def capture_post(*args, **kwargs): + captured_body.update(kwargs.get("json", {})) + return AsyncMock() + + mock_client.post = capture_post + mock_user_client.return_value = mock_client + + await join_call_coordinator_request( + call=mock_call, + user_id="user1", + location="auto", + ) + + assert "migrating_from" not in captured_body + assert "migrating_from_list" not in captured_body From 8678b80c425cb28e2f12b6ed23312749548d22ba Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 17 Mar 2026 12:04:32 +0400 Subject: [PATCH 04/25] feat: retry connect() on SFU full by requesting a different SFU When an SFU rejects a join with a retryable error (SFU_FULL, SFU_SHUTTING_DOWN, CALL_PARTICIPANT_LIMIT_REACHED), connect() now retries by asking the coordinator for a different SFU via migrating_from_list, instead of immediately failing. --- getstream/video/rtc/connection_manager.py | 45 ++++++- tests/test_connection_manager.py | 138 ++++++++++++++++++++++ 2 files changed, 181 insertions(+), 2 deletions(-) create mode 100644 tests/test_connection_manager.py diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index e781d917..394c24d7 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -19,6 +19,7 @@ from getstream.video.rtc.connection_utils import ( ConnectionState, SfuConnectionError, + SfuJoinError, ConnectionOptions, connect_websocket, join_call, @@ -55,6 +56,7 @@ def __init__( user_id: Optional[str] = None, create: bool = True, subscription_config: Optional[SubscriptionConfig] = None, + max_join_retries: int = 3, **kwargs: Any, ): super().__init__() @@ -68,6 +70,7 @@ def __init__( self.session_id: str = str(uuid.uuid4()) self.join_response: Optional[JoinCallResponse] = None self.local_sfu: bool = False # Local SFU flag for development + self._max_join_retries: int = max_join_retries # Private attributes self._connection_state: ConnectionState = ConnectionState.IDLE @@ -282,6 +285,7 @@ async def _connect_internal( ws_url: Optional[str] = None, token: Optional[str] = None, session_id: Optional[str] = None, + migrating_from_list: Optional[list] = None, ) -> None: """ Internal connection method that handles the core connection logic. @@ -324,6 +328,8 @@ async def _connect_internal( "auto", self.create, self.local_sfu, + migrating_from=migrating_from_list[-1] if migrating_from_list else None, + migrating_from_list=migrating_from_list, **self.kwargs, ) ws_url = join_response.data.credentials.server.ws_endpoint @@ -395,6 +401,8 @@ async def _connect_internal( logger.exception(f"No join response from WebSocket: {sfu_event}") logger.debug(f"WebSocket connected successfully to {ws_url}") + except SfuJoinError: + raise except Exception as e: logger.exception(f"Failed to connect WebSocket to {ws_url}: {e}") raise SfuConnectionError(f"WebSocket connection failed: {e}") from e @@ -427,7 +435,8 @@ async def connect(self): Connect to SFU. This method automatically handles retry logic for transient errors - like "server is full" and network issues. + like "server is full" by requesting a different SFU from the + coordinator. """ logger.info("Connecting to SFU") # Fire-and-forget the coordinator WS connection so we don't block here @@ -445,7 +454,39 @@ def _on_coordinator_task_done(task: asyncio.Task): logger.exception("Coordinator WS task failed") self._coordinator_task.add_done_callback(_on_coordinator_task_done) - await self._connect_internal() + + failed_sfus: list[str] = [] + last_error: Optional[SfuJoinError] = None + + for attempt in range(1 + self._max_join_retries): + try: + await self._connect_internal( + migrating_from_list=failed_sfus if failed_sfus else None, + ) + return + except SfuJoinError as e: + last_error = e + # Track the failed SFU + if self.join_response and self.join_response.credentials: + edge = self.join_response.credentials.server.edge_name + if edge and edge not in failed_sfus: + failed_sfus.append(edge) + logger.warning( + f"SFU join failed (attempt {attempt + 1}/{1 + self._max_join_retries}, " + f"code={e.error_code}). Failed SFUs: {failed_sfus}" + ) + # Clean up partial state before retry + if self._ws_client: + self._ws_client.close() + self._ws_client = None + self.connection_state = ConnectionState.IDLE + + if attempt < self._max_join_retries: + delay = 0.5 * (2.0 ** attempt) + logger.info(f"Retrying in {delay}s with different SFU...") + await asyncio.sleep(delay) + + raise last_error # type: ignore[misc] async def wait(self): """ diff --git a/tests/test_connection_manager.py b/tests/test_connection_manager.py new file mode 100644 index 00000000..ba2ff2b4 --- /dev/null +++ b/tests/test_connection_manager.py @@ -0,0 +1,138 @@ +import pytest +from unittest.mock import AsyncMock, patch, MagicMock + +from getstream.video.rtc.connection_utils import SfuJoinError, SfuConnectionError +from getstream.video.rtc.pb.stream.video.sfu.models import models_pb2 + + +class TestConnectRetry: + """Tests for connect() retry logic when SFU is full.""" + + def _make_connection_manager(self, max_join_retries=3): + """Create a ConnectionManager with mocked dependencies.""" + with patch("getstream.video.rtc.connection_manager.PeerConnectionManager"), \ + patch("getstream.video.rtc.connection_manager.NetworkMonitor"), \ + patch("getstream.video.rtc.connection_manager.ReconnectionManager"), \ + patch("getstream.video.rtc.connection_manager.RecordingManager"), \ + patch("getstream.video.rtc.connection_manager.SubscriptionManager"), \ + patch("getstream.video.rtc.connection_manager.ParticipantsState"), \ + patch("getstream.video.rtc.connection_manager.Tracer"): + from getstream.video.rtc.connection_manager import ConnectionManager + + mock_call = MagicMock() + mock_call.call_type = "default" + mock_call.id = "test_call" + cm = ConnectionManager(call=mock_call, user_id="user1", max_join_retries=max_join_retries) + return cm + + @pytest.mark.asyncio + async def test_retries_on_sfu_join_error_and_passes_failed_sfus(self): + """When SFU is full, connect() should retry with migrating_from_list.""" + cm = self._make_connection_manager(max_join_retries=2) + + call_count = 0 + received_migrating_from_list = [] + + async def mock_connect_internal(migrating_from_list=None, **kwargs): + nonlocal call_count + call_count += 1 + received_migrating_from_list.append(migrating_from_list) + + if call_count <= 2: + # Simulate SFU assigning an edge_name before failing + mock_join_response = MagicMock() + mock_join_response.credentials.server.edge_name = f"sfu-node-{call_count}" + cm.join_response = mock_join_response + raise SfuJoinError( + "server is full", + error_code=models_pb2.ERROR_CODE_SFU_FULL, + should_retry=True, + ) + # Third attempt succeeds + cm.running = True + + cm._connect_internal = mock_connect_internal + cm._connect_coordinator_ws = AsyncMock() + + await cm.connect() + + assert call_count == 3 + # First attempt: no failed SFUs + assert received_migrating_from_list[0] is None + # Second attempt: first SFU in the exclude list + assert "sfu-node-1" in received_migrating_from_list[1] + # Third attempt: both SFUs in the exclude list + assert received_migrating_from_list[2] == ["sfu-node-1", "sfu-node-2"] + + @pytest.mark.asyncio + async def test_raises_after_all_retries_exhausted(self): + """When all retries are exhausted, connect() should raise SfuJoinError.""" + cm = self._make_connection_manager(max_join_retries=1) + + async def always_fail(migrating_from_list=None, **kwargs): + mock_join_response = MagicMock() + mock_join_response.credentials.server.edge_name = "sfu-node-1" + cm.join_response = mock_join_response + raise SfuJoinError( + "server is full", + error_code=models_pb2.ERROR_CODE_SFU_FULL, + should_retry=True, + ) + + cm._connect_internal = always_fail + cm._connect_coordinator_ws = AsyncMock() + + with pytest.raises(SfuJoinError): + await cm.connect() + + @pytest.mark.asyncio + async def test_non_retryable_error_propagates_immediately(self): + """Non-retryable errors should not trigger retry.""" + cm = self._make_connection_manager(max_join_retries=3) + + call_count = 0 + + async def fail_with_generic_error(migrating_from_list=None, **kwargs): + nonlocal call_count + call_count += 1 + raise SfuConnectionError("something went wrong") + + cm._connect_internal = fail_with_generic_error + cm._connect_coordinator_ws = AsyncMock() + + with pytest.raises(SfuConnectionError): + await cm.connect() + + # Should not retry — only one call + assert call_count == 1 + + @pytest.mark.asyncio + async def test_cleans_up_ws_client_between_retries(self): + """Partial WS state should be cleaned up before retry.""" + cm = self._make_connection_manager(max_join_retries=1) + + call_count = 0 + + async def mock_connect_internal(migrating_from_list=None, **kwargs): + nonlocal call_count + call_count += 1 + if call_count == 1: + # Simulate partial WS connection + cm._ws_client = MagicMock() + mock_join_response = MagicMock() + mock_join_response.credentials.server.edge_name = "sfu-node-1" + cm.join_response = mock_join_response + raise SfuJoinError( + "server is full", + error_code=models_pb2.ERROR_CODE_SFU_FULL, + should_retry=True, + ) + # Second attempt: ws_client should have been cleaned up + cm.running = True + + cm._connect_internal = mock_connect_internal + cm._connect_coordinator_ws = AsyncMock() + + await cm.connect() + + assert call_count == 2 From 8f73de874f7fcb2b928ec66fa6e9c87bbd207e3f Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 17 Mar 2026 12:05:59 +0400 Subject: [PATCH 05/25] chore: remove dead retry code replaced by error-code-based detection _RETRYABLE_ERROR_PATTERNS and _is_retryable() used string matching and were never called. Retry logic now uses SFU error codes via SfuJoinError and _RETRYABLE_SFU_ERROR_CODES. --- getstream/video/rtc/connection_utils.py | 39 ------------------------- 1 file changed, 39 deletions(-) diff --git a/getstream/video/rtc/connection_utils.py b/getstream/video/rtc/connection_utils.py index 4c10d096..24d31687 100644 --- a/getstream/video/rtc/connection_utils.py +++ b/getstream/video/rtc/connection_utils.py @@ -58,19 +58,6 @@ "connect_websocket", ] -# Private constants - internal use only -_RETRYABLE_ERROR_PATTERNS = [ - "server is full", - "server overloaded", - "capacity exceeded", - "try again later", - "service unavailable", - "connection timeout", - "network error", - "temporary failure", - "connection refused", - "connection reset", -] # Public classes and exceptions @@ -483,29 +470,3 @@ async def connect_websocket( except Exception as e: logger.error(f"Failed to connect WebSocket to {ws_url}: {e}") raise SignalingError(f"WebSocket connection failed: {e}") - - -# Private functions -def _is_retryable(retry_state: Any) -> bool: - """Check if an error should be retried. - - Args: - retry_state: The retry state object from tenacity - - Returns: - True if the error should be retried, False otherwise - """ - # Extract the actual exception from the retry state - if hasattr(retry_state, "outcome") and retry_state.outcome.failed: - error = retry_state.outcome.exception() - else: - return False - - # Import here to avoid circular imports - from getstream.video.rtc.signaling import SignalingError - - if not isinstance(error, (SignalingError, SfuConnectionError)): - return False - - error_message = str(error).lower() - return any(pattern in error_message for pattern in _RETRYABLE_ERROR_PATTERNS) From 6a352b9f38602d5ddf5ed42f36b7621380247421 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 17 Mar 2026 12:10:40 +0400 Subject: [PATCH 06/25] style: remove extra blank line left after dead code removal --- getstream/video/rtc/connection_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/getstream/video/rtc/connection_utils.py b/getstream/video/rtc/connection_utils.py index 24d31687..237fef5e 100644 --- a/getstream/video/rtc/connection_utils.py +++ b/getstream/video/rtc/connection_utils.py @@ -59,7 +59,6 @@ ] - # Public classes and exceptions class ConnectionState(Enum): """Enumeration of possible connection states.""" From 536f95d391627414c9e81b2146afa279d47fcdfe Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 17 Mar 2026 12:13:12 +0400 Subject: [PATCH 07/25] style: apply ruff formatting --- getstream/video/rtc/connection_manager.py | 6 ++++-- getstream/video/rtc/connection_utils.py | 6 +++++- getstream/video/rtc/signaling.py | 4 +++- tests/test_connection_manager.py | 24 ++++++++++++++--------- tests/test_connection_utils.py | 24 +++++++++++++++++------ 5 files changed, 45 insertions(+), 19 deletions(-) diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index 394c24d7..1d628832 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -328,7 +328,9 @@ async def _connect_internal( "auto", self.create, self.local_sfu, - migrating_from=migrating_from_list[-1] if migrating_from_list else None, + migrating_from=migrating_from_list[-1] + if migrating_from_list + else None, migrating_from_list=migrating_from_list, **self.kwargs, ) @@ -482,7 +484,7 @@ def _on_coordinator_task_done(task: asyncio.Task): self.connection_state = ConnectionState.IDLE if attempt < self._max_join_retries: - delay = 0.5 * (2.0 ** attempt) + delay = 0.5 * (2.0**attempt) logger.info(f"Retrying in {delay}s with different SFU...") await asyncio.sleep(delay) diff --git a/getstream/video/rtc/connection_utils.py b/getstream/video/rtc/connection_utils.py index 237fef5e..94ab5f8b 100644 --- a/getstream/video/rtc/connection_utils.py +++ b/getstream/video/rtc/connection_utils.py @@ -459,7 +459,11 @@ async def connect_websocket( return ws_client, sfu_event except SignalingError as e: - if e.error and hasattr(e.error, "code") and e.error.code in _RETRYABLE_SFU_ERROR_CODES: + if ( + e.error + and hasattr(e.error, "code") + and e.error.code in _RETRYABLE_SFU_ERROR_CODES + ): raise SfuJoinError( str(e), error_code=e.error.code, diff --git a/getstream/video/rtc/signaling.py b/getstream/video/rtc/signaling.py index 3050a426..38ecabcf 100644 --- a/getstream/video/rtc/signaling.py +++ b/getstream/video/rtc/signaling.py @@ -114,7 +114,9 @@ async def connect(self): # Check if the first message is an error if self.first_message and self.first_message.HasField("error"): sfu_error = self.first_message.error.error - raise SignalingError(f"Connection failed: {sfu_error.message}", error=sfu_error) + raise SignalingError( + f"Connection failed: {sfu_error.message}", error=sfu_error + ) # Check if we got join_response if self.first_message and self.first_message.HasField("join_response"): diff --git a/tests/test_connection_manager.py b/tests/test_connection_manager.py index ba2ff2b4..3371bb81 100644 --- a/tests/test_connection_manager.py +++ b/tests/test_connection_manager.py @@ -10,19 +10,23 @@ class TestConnectRetry: def _make_connection_manager(self, max_join_retries=3): """Create a ConnectionManager with mocked dependencies.""" - with patch("getstream.video.rtc.connection_manager.PeerConnectionManager"), \ - patch("getstream.video.rtc.connection_manager.NetworkMonitor"), \ - patch("getstream.video.rtc.connection_manager.ReconnectionManager"), \ - patch("getstream.video.rtc.connection_manager.RecordingManager"), \ - patch("getstream.video.rtc.connection_manager.SubscriptionManager"), \ - patch("getstream.video.rtc.connection_manager.ParticipantsState"), \ - patch("getstream.video.rtc.connection_manager.Tracer"): + with ( + patch("getstream.video.rtc.connection_manager.PeerConnectionManager"), + patch("getstream.video.rtc.connection_manager.NetworkMonitor"), + patch("getstream.video.rtc.connection_manager.ReconnectionManager"), + patch("getstream.video.rtc.connection_manager.RecordingManager"), + patch("getstream.video.rtc.connection_manager.SubscriptionManager"), + patch("getstream.video.rtc.connection_manager.ParticipantsState"), + patch("getstream.video.rtc.connection_manager.Tracer"), + ): from getstream.video.rtc.connection_manager import ConnectionManager mock_call = MagicMock() mock_call.call_type = "default" mock_call.id = "test_call" - cm = ConnectionManager(call=mock_call, user_id="user1", max_join_retries=max_join_retries) + cm = ConnectionManager( + call=mock_call, user_id="user1", max_join_retries=max_join_retries + ) return cm @pytest.mark.asyncio @@ -41,7 +45,9 @@ async def mock_connect_internal(migrating_from_list=None, **kwargs): if call_count <= 2: # Simulate SFU assigning an edge_name before failing mock_join_response = MagicMock() - mock_join_response.credentials.server.edge_name = f"sfu-node-{call_count}" + mock_join_response.credentials.server.edge_name = ( + f"sfu-node-{call_count}" + ) cm.join_response = mock_join_response raise SfuJoinError( "server is full", diff --git a/tests/test_connection_utils.py b/tests/test_connection_utils.py index 5f5cc2f5..6043c4fb 100644 --- a/tests/test_connection_utils.py +++ b/tests/test_connection_utils.py @@ -23,9 +23,13 @@ async def test_raises_sfu_join_error_on_sfu_full(self): message="server is full", should_retry=True, ) - signaling_error = SignalingError("Connection failed: server is full", error=sfu_error) + signaling_error = SignalingError( + "Connection failed: server is full", error=sfu_error + ) - with patch("getstream.video.rtc.connection_utils.WebSocketClient") as mock_ws_cls: + with patch( + "getstream.video.rtc.connection_utils.WebSocketClient" + ) as mock_ws_cls: mock_ws = AsyncMock() mock_ws.connect = AsyncMock(side_effect=signaling_error) mock_ws_cls.return_value = mock_ws @@ -54,9 +58,13 @@ async def test_non_retryable_error_propagates_as_signaling_error(self): message="permission denied", should_retry=False, ) - signaling_error = SignalingError("Connection failed: permission denied", error=sfu_error) + signaling_error = SignalingError( + "Connection failed: permission denied", error=sfu_error + ) - with patch("getstream.video.rtc.connection_utils.WebSocketClient") as mock_ws_cls: + with patch( + "getstream.video.rtc.connection_utils.WebSocketClient" + ) as mock_ws_cls: mock_ws = AsyncMock() mock_ws.connect = AsyncMock(side_effect=signaling_error) mock_ws_cls.return_value = mock_ws @@ -85,7 +93,9 @@ async def test_includes_migrating_from_in_body(self): captured_body = {} - with patch("getstream.video.rtc.connection_utils.user_client") as mock_user_client: + with patch( + "getstream.video.rtc.connection_utils.user_client" + ) as mock_user_client: mock_client = AsyncMock() async def capture_post(*args, **kwargs): @@ -118,7 +128,9 @@ async def test_omits_migrating_from_when_not_provided(self): captured_body = {} - with patch("getstream.video.rtc.connection_utils.user_client") as mock_user_client: + with patch( + "getstream.video.rtc.connection_utils.user_client" + ) as mock_user_client: mock_client = AsyncMock() async def capture_post(*args, **kwargs): From 311447d88af091d4a087948a655b34bc820059cb Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 17 Mar 2026 13:39:01 +0400 Subject: [PATCH 08/25] style: apply ruff formatting to test_signaling.py --- tests/test_signaling.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_signaling.py b/tests/test_signaling.py index e9930c61..2241f9f0 100644 --- a/tests/test_signaling.py +++ b/tests/test_signaling.py @@ -131,7 +131,9 @@ async def test_connect_error(self, join_request, mock_websocket): client.close() @pytest.mark.asyncio - async def test_connect_error_preserves_error_code(self, join_request, mock_websocket): + async def test_connect_error_preserves_error_code( + self, join_request, mock_websocket + ): """Test that SignalingError preserves the SFU error code.""" client = WebSocketClient( "wss://test.url", join_request, asyncio.get_running_loop() From 61da6118eb9e9d450a2ce4133b408c8a9451c65f Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 17 Mar 2026 13:43:47 +0400 Subject: [PATCH 09/25] refactor: extract _handle_join_failure from connect() retry loop --- getstream/video/rtc/connection_manager.py | 32 +++++++++++++---------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index 1d628832..273c4170 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -468,20 +468,7 @@ def _on_coordinator_task_done(task: asyncio.Task): return except SfuJoinError as e: last_error = e - # Track the failed SFU - if self.join_response and self.join_response.credentials: - edge = self.join_response.credentials.server.edge_name - if edge and edge not in failed_sfus: - failed_sfus.append(edge) - logger.warning( - f"SFU join failed (attempt {attempt + 1}/{1 + self._max_join_retries}, " - f"code={e.error_code}). Failed SFUs: {failed_sfus}" - ) - # Clean up partial state before retry - if self._ws_client: - self._ws_client.close() - self._ws_client = None - self.connection_state = ConnectionState.IDLE + self._handle_join_failure(e, attempt, failed_sfus) if attempt < self._max_join_retries: delay = 0.5 * (2.0**attempt) @@ -490,6 +477,23 @@ def _on_coordinator_task_done(task: asyncio.Task): raise last_error # type: ignore[misc] + def _handle_join_failure( + self, error: SfuJoinError, attempt: int, failed_sfus: list[str] + ) -> None: + """Track a failed SFU and clean up partial connection state.""" + if self.join_response and self.join_response.credentials: + edge = self.join_response.credentials.server.edge_name + if edge and edge not in failed_sfus: + failed_sfus.append(edge) + logger.warning( + f"SFU join failed (attempt {attempt + 1}/{1 + self._max_join_retries}, " + f"code={error.error_code}). Failed SFUs: {failed_sfus}" + ) + if self._ws_client: + self._ws_client.close() + self._ws_client = None + self.connection_state = ConnectionState.IDLE + async def wait(self): """ Wait until the connection is over. From 2cf9b7397e71f546c5645169e5889ed76b79b0e4 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 17 Mar 2026 14:40:59 +0400 Subject: [PATCH 10/25] refactor: use exp_backoff with sleep parameter in connect() retry loop Add sleep parameter to exp_backoff so callers don't need to manually call asyncio.sleep. Replaces inline backoff calculation in connect(). --- getstream/video/rtc/connection_manager.py | 22 +++++++++++---- getstream/video/rtc/coordinator/backoff.py | 9 +++++- tests/rtc/coordinator/test_backoff.py | 33 ++++++++++++++++++++++ 3 files changed, 57 insertions(+), 7 deletions(-) diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index 273c4170..01a149ca 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -25,6 +25,7 @@ join_call, watch_call, ) +from getstream.video.rtc.coordinator.backoff import exp_backoff from getstream.video.rtc.track_util import ( fix_sdp_msid_semantic, fix_sdp_rtcp_fb, @@ -460,7 +461,21 @@ def _on_coordinator_task_done(task: asyncio.Task): failed_sfus: list[str] = [] last_error: Optional[SfuJoinError] = None - for attempt in range(1 + self._max_join_retries): + # First attempt without delay + attempt = 0 + try: + await self._connect_internal() + return + except SfuJoinError as e: + last_error = e + self._handle_join_failure(e, attempt, failed_sfus) + + # Retries with exponential backoff, requesting a different SFU + async for delay in exp_backoff( + max_retries=self._max_join_retries, base=0.5, sleep=True + ): + attempt += 1 + logger.info(f"Retrying with different SFU (waited {delay}s)...") try: await self._connect_internal( migrating_from_list=failed_sfus if failed_sfus else None, @@ -470,11 +485,6 @@ def _on_coordinator_task_done(task: asyncio.Task): last_error = e self._handle_join_failure(e, attempt, failed_sfus) - if attempt < self._max_join_retries: - delay = 0.5 * (2.0**attempt) - logger.info(f"Retrying in {delay}s with different SFU...") - await asyncio.sleep(delay) - raise last_error # type: ignore[misc] def _handle_join_failure( diff --git a/getstream/video/rtc/coordinator/backoff.py b/getstream/video/rtc/coordinator/backoff.py index 9d990c6b..c1d59f77 100644 --- a/getstream/video/rtc/coordinator/backoff.py +++ b/getstream/video/rtc/coordinator/backoff.py @@ -5,6 +5,7 @@ when reconnecting to failed WebSocket connections. """ +import asyncio import logging from typing import AsyncIterator @@ -12,7 +13,10 @@ async def exp_backoff( - max_retries: int, base: float = 1.0, factor: float = 2.0 + max_retries: int, + base: float = 1.0, + factor: float = 2.0, + sleep: bool = False, ) -> AsyncIterator[float]: """ Generate exponential backoff delays for retry attempts. @@ -21,6 +25,7 @@ async def exp_backoff( max_retries: Maximum number of retry attempts base: Base delay in seconds for the first retry factor: Multiplicative factor for each subsequent retry + sleep: If True, sleep for the delay before yielding Yields: float: Delay in seconds for each retry attempt @@ -39,4 +44,6 @@ async def exp_backoff( for attempt in range(max_retries): delay = base * (factor**attempt) logger.debug(f"Backoff attempt {attempt + 1}/{max_retries}: {delay}s delay") + if sleep: + await asyncio.sleep(delay) yield delay diff --git a/tests/rtc/coordinator/test_backoff.py b/tests/rtc/coordinator/test_backoff.py index 985f03b5..563ffffc 100644 --- a/tests/rtc/coordinator/test_backoff.py +++ b/tests/rtc/coordinator/test_backoff.py @@ -3,6 +3,7 @@ """ import pytest +from unittest.mock import patch, AsyncMock from getstream.video.rtc.coordinator.backoff import exp_backoff @@ -84,3 +85,35 @@ async def test_exp_backoff_fractional_factor(): assert actual_delays == expected_delays, ( f"Expected delays {expected_delays}, but got {actual_delays}" ) + + +@pytest.mark.asyncio +async def test_exp_backoff_sleep(): + """Test that sleep=True calls asyncio.sleep with each delay.""" + expected_delays = [0.5, 1.0, 2.0] + + with patch( + "getstream.video.rtc.coordinator.backoff.asyncio.sleep", + new_callable=AsyncMock, + ) as mock_sleep: + actual_delays = [] + async for delay in exp_backoff(max_retries=3, base=0.5, sleep=True): + actual_delays.append(delay) + + assert actual_delays == expected_delays + assert mock_sleep.await_count == 3 + for expected, call in zip(expected_delays, mock_sleep.await_args_list): + assert call.args[0] == expected + + +@pytest.mark.asyncio +async def test_exp_backoff_no_sleep_by_default(): + """Test that sleep=False (default) does not call asyncio.sleep.""" + with patch( + "getstream.video.rtc.coordinator.backoff.asyncio.sleep", + new_callable=AsyncMock, + ) as mock_sleep: + async for _ in exp_backoff(max_retries=3): + pass + + mock_sleep.assert_not_awaited() From bacbce18ce9508a48380970ddffd5db5db39117e Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 17 Mar 2026 14:51:57 +0400 Subject: [PATCH 11/25] refactor: move ConnectionManager import to module level in tests --- tests/test_connection_manager.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_connection_manager.py b/tests/test_connection_manager.py index 3371bb81..89db0c16 100644 --- a/tests/test_connection_manager.py +++ b/tests/test_connection_manager.py @@ -1,6 +1,7 @@ import pytest from unittest.mock import AsyncMock, patch, MagicMock +from getstream.video.rtc.connection_manager import ConnectionManager from getstream.video.rtc.connection_utils import SfuJoinError, SfuConnectionError from getstream.video.rtc.pb.stream.video.sfu.models import models_pb2 @@ -19,8 +20,6 @@ def _make_connection_manager(self, max_join_retries=3): patch("getstream.video.rtc.connection_manager.ParticipantsState"), patch("getstream.video.rtc.connection_manager.Tracer"), ): - from getstream.video.rtc.connection_manager import ConnectionManager - mock_call = MagicMock() mock_call.call_type = "default" mock_call.id = "test_call" From 9db988df00d9412ed66051deed68af3fe8641eff Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 17 Mar 2026 15:55:29 +0400 Subject: [PATCH 12/25] fix: close ws_client on connect_websocket failure to prevent thread leak WebSocketClient starts a background thread on creation. If connect() fails (e.g. SFU full), the client was not being closed, leaking the thread. --- getstream/video/rtc/connection_utils.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/getstream/video/rtc/connection_utils.py b/getstream/video/rtc/connection_utils.py index 94ab5f8b..75036a14 100644 --- a/getstream/video/rtc/connection_utils.py +++ b/getstream/video/rtc/connection_utils.py @@ -431,6 +431,8 @@ async def connect_websocket( """ logger.info(f"Connecting to WebSocket at {ws_url}") + ws_client = None + success = False try: # Create JoinRequest for WebSocket connection join_request = await create_join_request(token, session_id) @@ -456,6 +458,7 @@ async def connect_websocket( sfu_event = await ws_client.connect() logger.debug("WebSocket connection established") + success = True return ws_client, sfu_event except SignalingError as e: @@ -473,3 +476,6 @@ async def connect_websocket( except Exception as e: logger.error(f"Failed to connect WebSocket to {ws_url}: {e}") raise SignalingError(f"WebSocket connection failed: {e}") + finally: + if ws_client and not success: + ws_client.close() From 6a34c055c1dd8627f921a13318765736cec53558 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 17 Mar 2026 15:59:47 +0400 Subject: [PATCH 13/25] test: mock exp_backoff in connect() tests to avoid real sleep delays --- tests/test_connection_manager.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/test_connection_manager.py b/tests/test_connection_manager.py index 89db0c16..c4dee87e 100644 --- a/tests/test_connection_manager.py +++ b/tests/test_connection_manager.py @@ -6,6 +6,12 @@ from getstream.video.rtc.pb.stream.video.sfu.models import models_pb2 +async def _instant_backoff(max_retries, base=1.0, factor=2.0, sleep=False): + """exp_backoff replacement that never sleeps.""" + for attempt in range(max_retries): + yield base * (factor**attempt) + + class TestConnectRetry: """Tests for connect() retry logic when SFU is full.""" @@ -29,6 +35,7 @@ def _make_connection_manager(self, max_join_retries=3): return cm @pytest.mark.asyncio + @patch("getstream.video.rtc.connection_manager.exp_backoff", _instant_backoff) async def test_retries_on_sfu_join_error_and_passes_failed_sfus(self): """When SFU is full, connect() should retry with migrating_from_list.""" cm = self._make_connection_manager(max_join_retries=2) @@ -70,6 +77,7 @@ async def mock_connect_internal(migrating_from_list=None, **kwargs): assert received_migrating_from_list[2] == ["sfu-node-1", "sfu-node-2"] @pytest.mark.asyncio + @patch("getstream.video.rtc.connection_manager.exp_backoff", _instant_backoff) async def test_raises_after_all_retries_exhausted(self): """When all retries are exhausted, connect() should raise SfuJoinError.""" cm = self._make_connection_manager(max_join_retries=1) @@ -112,6 +120,7 @@ async def fail_with_generic_error(migrating_from_list=None, **kwargs): assert call_count == 1 @pytest.mark.asyncio + @patch("getstream.video.rtc.connection_manager.exp_backoff", _instant_backoff) async def test_cleans_up_ws_client_between_retries(self): """Partial WS state should be cleaned up before retry.""" cm = self._make_connection_manager(max_join_retries=1) From 857d3c79e353a6880a94dfa2d9f5753a80b4014d Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 17 Mar 2026 16:00:33 +0400 Subject: [PATCH 14/25] chore: update uv.lock --- uv.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uv.lock b/uv.lock index 08c32e71..3bc04b64 100644 --- a/uv.lock +++ b/uv.lock @@ -955,7 +955,7 @@ dev = [ [package.metadata] requires-dist = [ { name = "aiohttp", marker = "extra == 'webrtc'", specifier = ">=3.13.2,<4" }, - { name = "aiortc", marker = "extra == 'webrtc'", specifier = ">=1.14.0,<2" }, + { name = "aiortc", marker = "extra == 'webrtc'", specifier = ">=1.14.0,<1.15.0" }, { name = "av", marker = "extra == 'webrtc'", specifier = ">=14.2.0,<17" }, { name = "dataclasses-json", specifier = ">=0.6.0,<0.7" }, { name = "httpx", specifier = ">=0.28.1" }, From efd9b092db9a195fb8f870035bc6122272ea50be Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 17 Mar 2026 16:14:28 +0400 Subject: [PATCH 15/25] refactor: use pytest fixture for ConnectionManager setup in tests Replace helper method with a shared fixture that handles dependency patching, backoff mocking, and coordinator setup in one place. --- tests/test_connection_manager.py | 85 +++++++++++++++----------------- 1 file changed, 40 insertions(+), 45 deletions(-) diff --git a/tests/test_connection_manager.py b/tests/test_connection_manager.py index c4dee87e..8cf7f45d 100644 --- a/tests/test_connection_manager.py +++ b/tests/test_connection_manager.py @@ -12,34 +12,43 @@ async def _instant_backoff(max_retries, base=1.0, factor=2.0, sleep=False): yield base * (factor**attempt) +@pytest.fixture +def connection_manager(request): + """Create a ConnectionManager with mocked heavy dependencies. + + Accepts max_join_retries via indirect parametrize, defaults to 3. + """ + max_join_retries = getattr(request, "param", 3) + with ( + patch("getstream.video.rtc.connection_manager.PeerConnectionManager"), + patch("getstream.video.rtc.connection_manager.NetworkMonitor"), + patch("getstream.video.rtc.connection_manager.ReconnectionManager"), + patch("getstream.video.rtc.connection_manager.RecordingManager"), + patch("getstream.video.rtc.connection_manager.SubscriptionManager"), + patch("getstream.video.rtc.connection_manager.ParticipantsState"), + patch("getstream.video.rtc.connection_manager.Tracer"), + patch("getstream.video.rtc.connection_manager.exp_backoff", _instant_backoff), + ): + mock_call = MagicMock() + mock_call.call_type = "default" + mock_call.id = "test_call" + cm = ConnectionManager( + call=mock_call, user_id="user1", max_join_retries=max_join_retries + ) + cm._connect_coordinator_ws = AsyncMock() + yield cm + + class TestConnectRetry: """Tests for connect() retry logic when SFU is full.""" - def _make_connection_manager(self, max_join_retries=3): - """Create a ConnectionManager with mocked dependencies.""" - with ( - patch("getstream.video.rtc.connection_manager.PeerConnectionManager"), - patch("getstream.video.rtc.connection_manager.NetworkMonitor"), - patch("getstream.video.rtc.connection_manager.ReconnectionManager"), - patch("getstream.video.rtc.connection_manager.RecordingManager"), - patch("getstream.video.rtc.connection_manager.SubscriptionManager"), - patch("getstream.video.rtc.connection_manager.ParticipantsState"), - patch("getstream.video.rtc.connection_manager.Tracer"), - ): - mock_call = MagicMock() - mock_call.call_type = "default" - mock_call.id = "test_call" - cm = ConnectionManager( - call=mock_call, user_id="user1", max_join_retries=max_join_retries - ) - return cm - @pytest.mark.asyncio - @patch("getstream.video.rtc.connection_manager.exp_backoff", _instant_backoff) - async def test_retries_on_sfu_join_error_and_passes_failed_sfus(self): + @pytest.mark.parametrize("connection_manager", [2], indirect=True) + async def test_retries_on_sfu_join_error_and_passes_failed_sfus( + self, connection_manager + ): """When SFU is full, connect() should retry with migrating_from_list.""" - cm = self._make_connection_manager(max_join_retries=2) - + cm = connection_manager call_count = 0 received_migrating_from_list = [] @@ -49,7 +58,6 @@ async def mock_connect_internal(migrating_from_list=None, **kwargs): received_migrating_from_list.append(migrating_from_list) if call_count <= 2: - # Simulate SFU assigning an edge_name before failing mock_join_response = MagicMock() mock_join_response.credentials.server.edge_name = ( f"sfu-node-{call_count}" @@ -60,27 +68,22 @@ async def mock_connect_internal(migrating_from_list=None, **kwargs): error_code=models_pb2.ERROR_CODE_SFU_FULL, should_retry=True, ) - # Third attempt succeeds cm.running = True cm._connect_internal = mock_connect_internal - cm._connect_coordinator_ws = AsyncMock() await cm.connect() assert call_count == 3 - # First attempt: no failed SFUs assert received_migrating_from_list[0] is None - # Second attempt: first SFU in the exclude list assert "sfu-node-1" in received_migrating_from_list[1] - # Third attempt: both SFUs in the exclude list assert received_migrating_from_list[2] == ["sfu-node-1", "sfu-node-2"] @pytest.mark.asyncio - @patch("getstream.video.rtc.connection_manager.exp_backoff", _instant_backoff) - async def test_raises_after_all_retries_exhausted(self): + @pytest.mark.parametrize("connection_manager", [1], indirect=True) + async def test_raises_after_all_retries_exhausted(self, connection_manager): """When all retries are exhausted, connect() should raise SfuJoinError.""" - cm = self._make_connection_manager(max_join_retries=1) + cm = connection_manager async def always_fail(migrating_from_list=None, **kwargs): mock_join_response = MagicMock() @@ -93,16 +96,14 @@ async def always_fail(migrating_from_list=None, **kwargs): ) cm._connect_internal = always_fail - cm._connect_coordinator_ws = AsyncMock() with pytest.raises(SfuJoinError): await cm.connect() @pytest.mark.asyncio - async def test_non_retryable_error_propagates_immediately(self): + async def test_non_retryable_error_propagates_immediately(self, connection_manager): """Non-retryable errors should not trigger retry.""" - cm = self._make_connection_manager(max_join_retries=3) - + cm = connection_manager call_count = 0 async def fail_with_generic_error(migrating_from_list=None, **kwargs): @@ -111,27 +112,23 @@ async def fail_with_generic_error(migrating_from_list=None, **kwargs): raise SfuConnectionError("something went wrong") cm._connect_internal = fail_with_generic_error - cm._connect_coordinator_ws = AsyncMock() with pytest.raises(SfuConnectionError): await cm.connect() - # Should not retry — only one call assert call_count == 1 @pytest.mark.asyncio - @patch("getstream.video.rtc.connection_manager.exp_backoff", _instant_backoff) - async def test_cleans_up_ws_client_between_retries(self): + @pytest.mark.parametrize("connection_manager", [1], indirect=True) + async def test_cleans_up_ws_client_between_retries(self, connection_manager): """Partial WS state should be cleaned up before retry.""" - cm = self._make_connection_manager(max_join_retries=1) - + cm = connection_manager call_count = 0 async def mock_connect_internal(migrating_from_list=None, **kwargs): nonlocal call_count call_count += 1 if call_count == 1: - # Simulate partial WS connection cm._ws_client = MagicMock() mock_join_response = MagicMock() mock_join_response.credentials.server.edge_name = "sfu-node-1" @@ -141,11 +138,9 @@ async def mock_connect_internal(migrating_from_list=None, **kwargs): error_code=models_pb2.ERROR_CODE_SFU_FULL, should_retry=True, ) - # Second attempt: ws_client should have been cleaned up cm.running = True cm._connect_internal = mock_connect_internal - cm._connect_coordinator_ws = AsyncMock() await cm.connect() From ab8a27cb1cb1c0a51c04df00edead568dd9b1a06 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 17 Mar 2026 16:38:29 +0400 Subject: [PATCH 16/25] refactor: use fixtures in test_connection_utils, snapshot mutable list in test_connection_manager Extract mock_ws_client and coordinator_request fixtures to reduce duplication. Snapshot migrating_from_list per attempt to avoid mutable reference aliasing in assertions. --- tests/test_connection_manager.py | 6 +- tests/test_connection_utils.py | 185 ++++++++++++++----------------- 2 files changed, 87 insertions(+), 104 deletions(-) diff --git a/tests/test_connection_manager.py b/tests/test_connection_manager.py index 8cf7f45d..5bbd0e39 100644 --- a/tests/test_connection_manager.py +++ b/tests/test_connection_manager.py @@ -55,7 +55,9 @@ async def test_retries_on_sfu_join_error_and_passes_failed_sfus( async def mock_connect_internal(migrating_from_list=None, **kwargs): nonlocal call_count call_count += 1 - received_migrating_from_list.append(migrating_from_list) + received_migrating_from_list.append( + list(migrating_from_list) if migrating_from_list else None + ) if call_count <= 2: mock_join_response = MagicMock() @@ -76,7 +78,7 @@ async def mock_connect_internal(migrating_from_list=None, **kwargs): assert call_count == 3 assert received_migrating_from_list[0] is None - assert "sfu-node-1" in received_migrating_from_list[1] + assert received_migrating_from_list[1] == ["sfu-node-1"] assert received_migrating_from_list[2] == ["sfu-node-1", "sfu-node-2"] @pytest.mark.asyncio diff --git a/tests/test_connection_utils.py b/tests/test_connection_utils.py index 6043c4fb..ff04a0ca 100644 --- a/tests/test_connection_utils.py +++ b/tests/test_connection_utils.py @@ -5,146 +5,127 @@ connect_websocket, ConnectionOptions, SfuConnectionError, + SfuJoinError, join_call_coordinator_request, ) from getstream.video.rtc.signaling import SignalingError from getstream.video.rtc.pb.stream.video.sfu.models import models_pb2 +@pytest.fixture +def mock_ws_client(): + """Patch WebSocketClient and yield the mock instance.""" + with patch("getstream.video.rtc.connection_utils.WebSocketClient") as mock_ws_cls: + mock_ws = AsyncMock() + mock_ws_cls.return_value = mock_ws + yield mock_ws + + +@pytest.fixture +def coordinator_request(): + """Set up a mock coordinator client that captures the request body.""" + mock_call = AsyncMock() + mock_call.call_type = "default" + mock_call.id = "test_call" + mock_call.client.stream.api_key = "key" + mock_call.client.stream.api_secret = "secret" + mock_call.client.stream.base_url = "https://test.url" + + captured_body = {} + + with patch("getstream.video.rtc.connection_utils.user_client") as mock_user_client: + mock_client = AsyncMock() + + async def capture_post(*args, **kwargs): + captured_body.update(kwargs.get("json", {})) + return AsyncMock() + + mock_client.post = capture_post + mock_user_client.return_value = mock_client + yield mock_call, captured_body + + class TestConnectWebsocket: @pytest.mark.asyncio - async def test_raises_sfu_join_error_on_sfu_full(self): + async def test_raises_sfu_join_error_on_sfu_full(self, mock_ws_client): """connect_websocket should raise SfuJoinError when SFU is full.""" - from getstream.video.rtc.connection_utils import SfuJoinError - - # Create a models_pb2.Error with SFU_FULL code sfu_error = models_pb2.Error( code=models_pb2.ERROR_CODE_SFU_FULL, message="server is full", should_retry=True, ) - signaling_error = SignalingError( - "Connection failed: server is full", error=sfu_error + mock_ws_client.connect = AsyncMock( + side_effect=SignalingError( + "Connection failed: server is full", error=sfu_error + ) ) - with patch( - "getstream.video.rtc.connection_utils.WebSocketClient" - ) as mock_ws_cls: - mock_ws = AsyncMock() - mock_ws.connect = AsyncMock(side_effect=signaling_error) - mock_ws_cls.return_value = mock_ws - - with pytest.raises(SfuJoinError) as exc_info: - await connect_websocket( - token="test_token", - ws_url="wss://test.url", - session_id="test_session", - options=ConnectionOptions(), - ) - - assert exc_info.value.error_code == models_pb2.ERROR_CODE_SFU_FULL - assert exc_info.value.should_retry is True - # SfuJoinError should be a subclass of SfuConnectionError - assert isinstance(exc_info.value, SfuConnectionError) + with pytest.raises(SfuJoinError) as exc_info: + await connect_websocket( + token="test_token", + ws_url="wss://test.url", + session_id="test_session", + options=ConnectionOptions(), + ) + + assert exc_info.value.error_code == models_pb2.ERROR_CODE_SFU_FULL + assert exc_info.value.should_retry is True + assert isinstance(exc_info.value, SfuConnectionError) @pytest.mark.asyncio - async def test_non_retryable_error_propagates_as_signaling_error(self): + async def test_non_retryable_error_propagates_as_signaling_error( + self, mock_ws_client + ): """Non-retryable SignalingError should not become SfuJoinError.""" - from getstream.video.rtc.connection_utils import SfuJoinError - - # Error with non-retryable code (e.g. permission denied) sfu_error = models_pb2.Error( code=models_pb2.ERROR_CODE_PERMISSION_DENIED, message="permission denied", should_retry=False, ) - signaling_error = SignalingError( - "Connection failed: permission denied", error=sfu_error + mock_ws_client.connect = AsyncMock( + side_effect=SignalingError( + "Connection failed: permission denied", error=sfu_error + ) ) - with patch( - "getstream.video.rtc.connection_utils.WebSocketClient" - ) as mock_ws_cls: - mock_ws = AsyncMock() - mock_ws.connect = AsyncMock(side_effect=signaling_error) - mock_ws_cls.return_value = mock_ws - - with pytest.raises(SignalingError) as exc_info: - await connect_websocket( - token="test_token", - ws_url="wss://test.url", - session_id="test_session", - options=ConnectionOptions(), - ) + with pytest.raises(SignalingError) as exc_info: + await connect_websocket( + token="test_token", + ws_url="wss://test.url", + session_id="test_session", + options=ConnectionOptions(), + ) - assert not isinstance(exc_info.value, SfuJoinError) + assert not isinstance(exc_info.value, SfuJoinError) class TestJoinCallCoordinatorRequest: @pytest.mark.asyncio - async def test_includes_migrating_from_in_body(self): + async def test_includes_migrating_from_in_body(self, coordinator_request): """migrating_from and migrating_from_list should be included in the request body.""" - mock_call = AsyncMock() - mock_call.call_type = "default" - mock_call.id = "test_call" - mock_call.client.stream.api_key = "key" - mock_call.client.stream.api_secret = "secret" - mock_call.client.stream.base_url = "https://test.url" - - captured_body = {} - - with patch( - "getstream.video.rtc.connection_utils.user_client" - ) as mock_user_client: - mock_client = AsyncMock() - - async def capture_post(*args, **kwargs): - captured_body.update(kwargs.get("json", {})) - return AsyncMock() - - mock_client.post = capture_post - mock_user_client.return_value = mock_client - - await join_call_coordinator_request( - call=mock_call, - user_id="user1", - location="auto", - migrating_from="sfu-london-1", - migrating_from_list=["sfu-london-1", "sfu-paris-2"], - ) + mock_call, captured_body = coordinator_request + + await join_call_coordinator_request( + call=mock_call, + user_id="user1", + location="auto", + migrating_from="sfu-london-1", + migrating_from_list=["sfu-london-1", "sfu-paris-2"], + ) assert captured_body["migrating_from"] == "sfu-london-1" assert captured_body["migrating_from_list"] == ["sfu-london-1", "sfu-paris-2"] @pytest.mark.asyncio - async def test_omits_migrating_from_when_not_provided(self): + async def test_omits_migrating_from_when_not_provided(self, coordinator_request): """migrating_from should not appear in body when not provided.""" - mock_call = AsyncMock() - mock_call.call_type = "default" - mock_call.id = "test_call" - mock_call.client.stream.api_key = "key" - mock_call.client.stream.api_secret = "secret" - mock_call.client.stream.base_url = "https://test.url" - - captured_body = {} - - with patch( - "getstream.video.rtc.connection_utils.user_client" - ) as mock_user_client: - mock_client = AsyncMock() - - async def capture_post(*args, **kwargs): - captured_body.update(kwargs.get("json", {})) - return AsyncMock() - - mock_client.post = capture_post - mock_user_client.return_value = mock_client - - await join_call_coordinator_request( - call=mock_call, - user_id="user1", - location="auto", - ) + mock_call, captured_body = coordinator_request + + await join_call_coordinator_request( + call=mock_call, + user_id="user1", + location="auto", + ) assert "migrating_from" not in captured_body assert "migrating_from_list" not in captured_body From df9a07cdf346a90495cd21b9caf988274c6bd596 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 17 Mar 2026 16:39:34 +0400 Subject: [PATCH 17/25] test: assert ws_client cleanup behavior, not just retry count --- tests/test_connection_manager.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/test_connection_manager.py b/tests/test_connection_manager.py index 5bbd0e39..562eaeb6 100644 --- a/tests/test_connection_manager.py +++ b/tests/test_connection_manager.py @@ -127,11 +127,13 @@ async def test_cleans_up_ws_client_between_retries(self, connection_manager): cm = connection_manager call_count = 0 + first_ws_client = MagicMock() + async def mock_connect_internal(migrating_from_list=None, **kwargs): nonlocal call_count call_count += 1 if call_count == 1: - cm._ws_client = MagicMock() + cm._ws_client = first_ws_client mock_join_response = MagicMock() mock_join_response.credentials.server.edge_name = "sfu-node-1" cm.join_response = mock_join_response @@ -147,3 +149,5 @@ async def mock_connect_internal(migrating_from_list=None, **kwargs): await cm.connect() assert call_count == 2 + first_ws_client.close.assert_called_once() + assert cm._ws_client is None From de6d889bf917bb2da4a22783f5b8c1a8f8e83a1c Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Wed, 18 Mar 2026 10:22:31 +0400 Subject: [PATCH 18/25] refactor: remove sleep param from exp_backoff, keep sleep in caller Keeps exp_backoff as a pure delay generator, consistent with how coordinator/ws.py uses it. The caller (connect()) handles asyncio.sleep explicitly. --- getstream/video/rtc/connection_manager.py | 7 ++--- getstream/video/rtc/coordinator/backoff.py | 9 +----- tests/rtc/coordinator/test_backoff.py | 33 ---------------------- tests/test_connection_manager.py | 8 ++++-- 4 files changed, 10 insertions(+), 47 deletions(-) diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index 01a149ca..d0336dcd 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -471,11 +471,10 @@ def _on_coordinator_task_done(task: asyncio.Task): self._handle_join_failure(e, attempt, failed_sfus) # Retries with exponential backoff, requesting a different SFU - async for delay in exp_backoff( - max_retries=self._max_join_retries, base=0.5, sleep=True - ): + async for delay in exp_backoff(max_retries=self._max_join_retries, base=0.5): attempt += 1 - logger.info(f"Retrying with different SFU (waited {delay}s)...") + logger.info(f"Retrying in {delay}s with different SFU...") + await asyncio.sleep(delay) try: await self._connect_internal( migrating_from_list=failed_sfus if failed_sfus else None, diff --git a/getstream/video/rtc/coordinator/backoff.py b/getstream/video/rtc/coordinator/backoff.py index c1d59f77..9d990c6b 100644 --- a/getstream/video/rtc/coordinator/backoff.py +++ b/getstream/video/rtc/coordinator/backoff.py @@ -5,7 +5,6 @@ when reconnecting to failed WebSocket connections. """ -import asyncio import logging from typing import AsyncIterator @@ -13,10 +12,7 @@ async def exp_backoff( - max_retries: int, - base: float = 1.0, - factor: float = 2.0, - sleep: bool = False, + max_retries: int, base: float = 1.0, factor: float = 2.0 ) -> AsyncIterator[float]: """ Generate exponential backoff delays for retry attempts. @@ -25,7 +21,6 @@ async def exp_backoff( max_retries: Maximum number of retry attempts base: Base delay in seconds for the first retry factor: Multiplicative factor for each subsequent retry - sleep: If True, sleep for the delay before yielding Yields: float: Delay in seconds for each retry attempt @@ -44,6 +39,4 @@ async def exp_backoff( for attempt in range(max_retries): delay = base * (factor**attempt) logger.debug(f"Backoff attempt {attempt + 1}/{max_retries}: {delay}s delay") - if sleep: - await asyncio.sleep(delay) yield delay diff --git a/tests/rtc/coordinator/test_backoff.py b/tests/rtc/coordinator/test_backoff.py index 563ffffc..985f03b5 100644 --- a/tests/rtc/coordinator/test_backoff.py +++ b/tests/rtc/coordinator/test_backoff.py @@ -3,7 +3,6 @@ """ import pytest -from unittest.mock import patch, AsyncMock from getstream.video.rtc.coordinator.backoff import exp_backoff @@ -85,35 +84,3 @@ async def test_exp_backoff_fractional_factor(): assert actual_delays == expected_delays, ( f"Expected delays {expected_delays}, but got {actual_delays}" ) - - -@pytest.mark.asyncio -async def test_exp_backoff_sleep(): - """Test that sleep=True calls asyncio.sleep with each delay.""" - expected_delays = [0.5, 1.0, 2.0] - - with patch( - "getstream.video.rtc.coordinator.backoff.asyncio.sleep", - new_callable=AsyncMock, - ) as mock_sleep: - actual_delays = [] - async for delay in exp_backoff(max_retries=3, base=0.5, sleep=True): - actual_delays.append(delay) - - assert actual_delays == expected_delays - assert mock_sleep.await_count == 3 - for expected, call in zip(expected_delays, mock_sleep.await_args_list): - assert call.args[0] == expected - - -@pytest.mark.asyncio -async def test_exp_backoff_no_sleep_by_default(): - """Test that sleep=False (default) does not call asyncio.sleep.""" - with patch( - "getstream.video.rtc.coordinator.backoff.asyncio.sleep", - new_callable=AsyncMock, - ) as mock_sleep: - async for _ in exp_backoff(max_retries=3): - pass - - mock_sleep.assert_not_awaited() diff --git a/tests/test_connection_manager.py b/tests/test_connection_manager.py index 562eaeb6..5178c4a0 100644 --- a/tests/test_connection_manager.py +++ b/tests/test_connection_manager.py @@ -6,8 +6,8 @@ from getstream.video.rtc.pb.stream.video.sfu.models import models_pb2 -async def _instant_backoff(max_retries, base=1.0, factor=2.0, sleep=False): - """exp_backoff replacement that never sleeps.""" +async def _instant_backoff(max_retries, base=1.0, factor=2.0): + """exp_backoff replacement that yields without delay.""" for attempt in range(max_retries): yield base * (factor**attempt) @@ -28,6 +28,10 @@ def connection_manager(request): patch("getstream.video.rtc.connection_manager.ParticipantsState"), patch("getstream.video.rtc.connection_manager.Tracer"), patch("getstream.video.rtc.connection_manager.exp_backoff", _instant_backoff), + patch( + "getstream.video.rtc.connection_manager.asyncio.sleep", + new_callable=AsyncMock, + ), ): mock_call = MagicMock() mock_call.call_type = "default" From 23dc2f2d19db44a414dbd2b1cc5f423f13bb085f Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Wed, 18 Mar 2026 17:01:27 +0400 Subject: [PATCH 19/25] refactor: remove redundant _instant_backoff test helper exp_backoff is already a pure generator with no sleep, so patching it with an identical helper added no value. asyncio.sleep is mocked separately. --- tests/test_connection_manager.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/tests/test_connection_manager.py b/tests/test_connection_manager.py index 5178c4a0..dc68c716 100644 --- a/tests/test_connection_manager.py +++ b/tests/test_connection_manager.py @@ -6,12 +6,6 @@ from getstream.video.rtc.pb.stream.video.sfu.models import models_pb2 -async def _instant_backoff(max_retries, base=1.0, factor=2.0): - """exp_backoff replacement that yields without delay.""" - for attempt in range(max_retries): - yield base * (factor**attempt) - - @pytest.fixture def connection_manager(request): """Create a ConnectionManager with mocked heavy dependencies. @@ -27,7 +21,6 @@ def connection_manager(request): patch("getstream.video.rtc.connection_manager.SubscriptionManager"), patch("getstream.video.rtc.connection_manager.ParticipantsState"), patch("getstream.video.rtc.connection_manager.Tracer"), - patch("getstream.video.rtc.connection_manager.exp_backoff", _instant_backoff), patch( "getstream.video.rtc.connection_manager.asyncio.sleep", new_callable=AsyncMock, From dd998132b9e390c43959fb34720dc1cf0a57a988 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Wed, 18 Mar 2026 17:20:25 +0400 Subject: [PATCH 20/25] test: assert retry count in exhausted-retries test --- tests/test_connection_manager.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/test_connection_manager.py b/tests/test_connection_manager.py index dc68c716..ed7f8710 100644 --- a/tests/test_connection_manager.py +++ b/tests/test_connection_manager.py @@ -83,8 +83,11 @@ async def mock_connect_internal(migrating_from_list=None, **kwargs): async def test_raises_after_all_retries_exhausted(self, connection_manager): """When all retries are exhausted, connect() should raise SfuJoinError.""" cm = connection_manager + call_count = 0 async def always_fail(migrating_from_list=None, **kwargs): + nonlocal call_count + call_count += 1 mock_join_response = MagicMock() mock_join_response.credentials.server.edge_name = "sfu-node-1" cm.join_response = mock_join_response @@ -99,6 +102,8 @@ async def always_fail(migrating_from_list=None, **kwargs): with pytest.raises(SfuJoinError): await cm.connect() + assert call_count == 2 # 1 initial + 1 retry + @pytest.mark.asyncio async def test_non_retryable_error_propagates_immediately(self, connection_manager): """Non-retryable errors should not trigger retry.""" From 66708cdffa31ec03fd028c28332ea4543b72924d Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Wed, 18 Mar 2026 17:27:54 +0400 Subject: [PATCH 21/25] refactor: extract _connect_with_sfu_reassignment from connect() Separates coordinator setup from SFU retry logic, making connect() easier to read. --- getstream/video/rtc/connection_manager.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index d0336dcd..3302f1ab 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -458,6 +458,10 @@ def _on_coordinator_task_done(task: asyncio.Task): self._coordinator_task.add_done_callback(_on_coordinator_task_done) + await self._connect_with_sfu_reassignment() + + async def _connect_with_sfu_reassignment(self) -> None: + """Try connecting to SFU, reassigning to a different one on failure.""" failed_sfus: list[str] = [] last_error: Optional[SfuJoinError] = None From 41ec538e511017e4635fb70235543509179caa25 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Thu, 19 Mar 2026 02:34:08 +0400 Subject: [PATCH 22/25] chore: add utility script for testing SFU connection and retry behavior --- scripts/test_sfu_connect.py | 96 +++++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 scripts/test_sfu_connect.py diff --git a/scripts/test_sfu_connect.py b/scripts/test_sfu_connect.py new file mode 100644 index 00000000..cff1f690 --- /dev/null +++ b/scripts/test_sfu_connect.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +""" +Utility script for testing SFU connection and retry behavior. + +Connects to a call as a given user and logs each step of the connection +process — useful for verifying SFU assignment, retry on transient errors +(e.g. SFU_FULL), and reassignment via the coordinator. + +Environment variables +--------------------- +STREAM_API_KEY — Stream API key (required) +STREAM_API_SECRET — Stream API secret (required) +STREAM_BASE_URL — Coordinator URL (default: Stream cloud). + Set to http://127.0.0.1:3030 for a local coordinator. +USER_ID — User ID to join as (default: "test-user"). +CALL_TYPE — Call type (default: "default"). +CALL_ID — Call ID. If not set, a random UUID is generated. + +Usage +----- + # Connect via cloud coordinator + STREAM_API_KEY=... STREAM_API_SECRET=... \\ + uv run --extra webrtc python scripts/test_sfu_connect.py + + # Connect via local coordinator + STREAM_BASE_URL=http://127.0.0.1:3030 \\ + uv run --extra webrtc python scripts/test_sfu_connect.py +""" + +import asyncio +import logging +import os +import uuid + +from dotenv import load_dotenv + +from getstream import AsyncStream +from getstream.models import CallRequest +from getstream.video.rtc import ConnectionManager + +load_dotenv() + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + + +async def run(): + base_url = os.getenv("STREAM_BASE_URL") + user_id = os.getenv("USER_ID", "test-user") + call_type = os.getenv("CALL_TYPE", "default") + call_id = os.getenv("CALL_ID", str(uuid.uuid4())) + + logger.info("Configuration:") + logger.info(f" Coordinator: {base_url or 'cloud (default)'}") + logger.info(f" User: {user_id}") + logger.info(f" Call: {call_type}:{call_id}") + + client_kwargs = {} + if base_url: + client_kwargs["base_url"] = base_url + + client = AsyncStream(timeout=10.0, **client_kwargs) + + call = client.video.call(call_type, call_id) + logger.info("Creating call...") + await call.get_or_create(data=CallRequest(created_by_id=user_id)) + logger.info("Call created") + + cm = ConnectionManager( + call=call, + user_id=user_id, + create=False, + ) + + logger.info("Connecting to SFU...") + + async with cm: + join = cm.join_response + if join and join.credentials: + logger.info(f"Connected to SFU: {join.credentials.server.edge_name}") + logger.info(f" WS endpoint: {join.credentials.server.ws_endpoint}") + logger.info(f" Session ID: {cm.session_id}") + + logger.info("Holding connection for 3s...") + await asyncio.sleep(3) + + logger.info("Leaving call") + + logger.info("Done") + + +if __name__ == "__main__": + asyncio.run(run()) From 33d18b9eef915308cb93cf247743f64bcef5c262 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Thu, 19 Mar 2026 13:50:15 +0400 Subject: [PATCH 23/25] refactor: extract last_failed variable for clarity in _connect_internal --- getstream/video/rtc/connection_manager.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index 3302f1ab..2d7366e0 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -323,15 +323,14 @@ async def _connect_internal( if not (ws_url or token): if self.user_id is None: raise ValueError("user_id is required for joining a call") + last_failed = migrating_from_list[-1] if migrating_from_list else None join_response = await join_call( self.call, self.user_id, "auto", self.create, self.local_sfu, - migrating_from=migrating_from_list[-1] - if migrating_from_list - else None, + migrating_from=last_failed, migrating_from_list=migrating_from_list, **self.kwargs, ) From 2a75703ca1355125a3750f634a8d5618532c5383 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Thu, 19 Mar 2026 14:42:21 +0400 Subject: [PATCH 24/25] refactor: raise directly from except instead of tracking last_error Eliminates the Optional[SfuJoinError] variable and TYPE_CHECKING assert by raising from the except block when retries are exhausted. --- getstream/video/rtc/connection_manager.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index 2d7366e0..b0823007 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -462,7 +462,6 @@ def _on_coordinator_task_done(task: asyncio.Task): async def _connect_with_sfu_reassignment(self) -> None: """Try connecting to SFU, reassigning to a different one on failure.""" failed_sfus: list[str] = [] - last_error: Optional[SfuJoinError] = None # First attempt without delay attempt = 0 @@ -470,8 +469,9 @@ async def _connect_with_sfu_reassignment(self) -> None: await self._connect_internal() return except SfuJoinError as e: - last_error = e self._handle_join_failure(e, attempt, failed_sfus) + if self._max_join_retries == 0: + raise # Retries with exponential backoff, requesting a different SFU async for delay in exp_backoff(max_retries=self._max_join_retries, base=0.5): @@ -484,10 +484,9 @@ async def _connect_with_sfu_reassignment(self) -> None: ) return except SfuJoinError as e: - last_error = e self._handle_join_failure(e, attempt, failed_sfus) - - raise last_error # type: ignore[misc] + if attempt >= self._max_join_retries: + raise def _handle_join_failure( self, error: SfuJoinError, attempt: int, failed_sfus: list[str] From ab392f2310a33069adeb3dc33d7618130cbf17cc Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Thu, 19 Mar 2026 14:49:18 +0400 Subject: [PATCH 25/25] feat: validate max_join_retries and extract patched_dependencies helper Raise ValueError if max_join_retries < 0 instead of silently clamping. Extract shared patch context manager to reduce duplication in tests. --- getstream/video/rtc/connection_manager.py | 2 ++ tests/test_connection_manager.py | 31 ++++++++++++++++++----- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index b0823007..25f59a8f 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -71,6 +71,8 @@ def __init__( self.session_id: str = str(uuid.uuid4()) self.join_response: Optional[JoinCallResponse] = None self.local_sfu: bool = False # Local SFU flag for development + if max_join_retries < 0: + raise ValueError("max_join_retries must be >= 0") self._max_join_retries: int = max_join_retries # Private attributes diff --git a/tests/test_connection_manager.py b/tests/test_connection_manager.py index ed7f8710..91e8bbfe 100644 --- a/tests/test_connection_manager.py +++ b/tests/test_connection_manager.py @@ -1,3 +1,5 @@ +import contextlib + import pytest from unittest.mock import AsyncMock, patch, MagicMock @@ -6,13 +8,9 @@ from getstream.video.rtc.pb.stream.video.sfu.models import models_pb2 -@pytest.fixture -def connection_manager(request): - """Create a ConnectionManager with mocked heavy dependencies. - - Accepts max_join_retries via indirect parametrize, defaults to 3. - """ - max_join_retries = getattr(request, "param", 3) +@contextlib.contextmanager +def patched_dependencies(): + """Patch heavy ConnectionManager dependencies for unit testing.""" with ( patch("getstream.video.rtc.connection_manager.PeerConnectionManager"), patch("getstream.video.rtc.connection_manager.NetworkMonitor"), @@ -26,6 +24,17 @@ def connection_manager(request): new_callable=AsyncMock, ), ): + yield + + +@pytest.fixture +def connection_manager(request): + """Create a ConnectionManager with mocked heavy dependencies. + + Accepts max_join_retries via indirect parametrize, defaults to 3. + """ + max_join_retries = getattr(request, "param", 3) + with patched_dependencies(): mock_call = MagicMock() mock_call.call_type = "default" mock_call.id = "test_call" @@ -153,3 +162,11 @@ async def mock_connect_internal(migrating_from_list=None, **kwargs): assert call_count == 2 first_ws_client.close.assert_called_once() assert cm._ws_client is None + + def test_rejects_negative_max_join_retries(self): + """max_join_retries must be >= 0.""" + with ( + patched_dependencies(), + pytest.raises(ValueError, match="max_join_retries must be >= 0"), + ): + ConnectionManager(call=MagicMock(), user_id="user1", max_join_retries=-1)