From 9d5f3571192948a4062d3f84f34b09bbb5b33106 Mon Sep 17 00:00:00 2001 From: redpower5x5 Date: Wed, 11 Mar 2026 19:56:11 +0300 Subject: [PATCH 1/3] set connection limits, new retry for some errors --- flymyai/core/clients/AsyncClient.py | 36 +++++++++++++++++++++-------- flymyai/core/clients/SyncClient.py | 35 +++++++++++++++++++++------- flymyai/core/clients/base_client.py | 32 +++++++++++++++++++++++++ 3 files changed, 86 insertions(+), 17 deletions(-) diff --git a/flymyai/core/clients/AsyncClient.py b/flymyai/core/clients/AsyncClient.py index b8664a3..d9161d6 100644 --- a/flymyai/core/clients/AsyncClient.py +++ b/flymyai/core/clients/AsyncClient.py @@ -8,7 +8,14 @@ ) from flymyai.core._streaming import SSEDecoder from flymyai.core.authorizations import APIKeyClientInfo -from flymyai.core.clients.base_client import BaseClient, _predict_timeout +from flymyai.core.clients.base_client import ( + BaseClient, + _predict_timeout, + _http2, + _limits, + _is_reconnectable_error, + _RECONNECT_RETRIES, +) from flymyai.core.exceptions import ( BaseFlyMyAIException, FlyMyAIOpenAPIException, @@ -25,11 +32,11 @@ from flymyai.multipart import MultipartPayload from flymyai.utils.utils import aretryable_callback - class BaseAsyncClient(BaseClient[httpx.AsyncClient]): def _construct_client(self): return httpx.AsyncClient( - http2=True, + http2=_http2, + limits=_limits, headers=self.client_info.authorization_headers, base_url=os.getenv("FLYMYAI_DSN", "https://api.flymy.ai/"), timeout=_predict_timeout, @@ -44,11 +51,20 @@ async def _reconnect_client(self): self._client = self._construct_client() async def _awith_reconnect(self, fn): - try: - return await fn() - except httpx.RemoteProtocolError: - await self._reconnect_client() - return await fn() + last_exc = None + for attempt in range(1 + _RECONNECT_RETRIES): + try: + return await fn() + except BaseException as e: + last_exc = e + if not _is_reconnectable_error(e): + raise + if attempt < _RECONNECT_RETRIES: + await self._reconnect_client() + continue + raise + assert last_exc is not None + raise last_exc async def __aenter__(self): return self @@ -236,7 +252,9 @@ async def _stream(self, client_info: APIKeyClientInfo, payload: dict): except BaseFlyMyAIException as e: raise FlyMyAIPredictException.from_base_exception(e) yield response - except httpx.RemoteProtocolError: + except BaseException as e: + if not _is_reconnectable_error(e): + raise await self._reconnect_client() stream_iterator = self._stream_iterator( client_info, payload, is_long_stream=True diff --git a/flymyai/core/clients/SyncClient.py b/flymyai/core/clients/SyncClient.py index 596d60c..db0ec60 100644 --- a/flymyai/core/clients/SyncClient.py +++ b/flymyai/core/clients/SyncClient.py @@ -5,7 +5,14 @@ from flymyai.core._streaming import SSEDecoder from flymyai.core.authorizations import APIKeyClientInfo -from flymyai.core.clients.base_client import BaseClient, _predict_timeout +from flymyai.core.clients.base_client import ( + BaseClient, + _predict_timeout, + _http2, + _limits, + _is_reconnectable_error, + _RECONNECT_RETRIES, +) from flymyai.core.exceptions import ( BaseFlyMyAIException, FlyMyAIOpenAPIException, @@ -29,18 +36,28 @@ class BaseSyncClient(BaseClient[httpx.Client]): def _construct_client(self): return httpx.Client( - http2=True, + http2=_http2, + limits=_limits, headers=self.client_info.authorization_headers, base_url=os.getenv("FLYMYAI_DSN", "https://api.flymy.ai/"), timeout=_predict_timeout, ) def _with_reconnect(self, fn): - try: - return fn() - except httpx.RemoteProtocolError: - self._reconnect_client() - return fn() + last_exc = None + for attempt in range(1 + _RECONNECT_RETRIES): + try: + return fn() + except BaseException as e: + last_exc = e + if not _is_reconnectable_error(e): + raise + if attempt < _RECONNECT_RETRIES: + self._reconnect_client() + continue + raise + assert last_exc is not None + raise last_exc def __enter__(self): return self @@ -165,7 +182,9 @@ def _stream(self, client_info: APIKeyClientInfo, payload: dict): except BaseFlyMyAIException as e: raise FlyMyAIPredictException.from_base_exception(e) yield response - except httpx.RemoteProtocolError: + except BaseException as e: + if not _is_reconnectable_error(e): + raise self._reconnect_client() response_iterator = self._stream_iterator( client_info, payload, is_long_stream=True diff --git a/flymyai/core/clients/base_client.py b/flymyai/core/clients/base_client.py index 5cad66a..a12e278 100644 --- a/flymyai/core/clients/base_client.py +++ b/flymyai/core/clients/base_client.py @@ -35,6 +35,31 @@ "_PossibleClients", bound=Union[httpx.Client, httpx.AsyncClient] ) +# Connection-style errors that warrant one reconnect + retry (high RPS / HTTP2 drops) +_CONNECT_RECONNECT_EXC = ( + httpx.RemoteProtocolError, + httpx.WriteError, + httpx.ReadError, + httpx.ConnectError, +) + +# How many times to reconnect and retry on connection/stream errors (1 initial + this many retries) +_RECONNECT_RETRIES = int(os.getenv("FMA_RECONNECT_RETRIES", "2")) + + +def _is_reconnectable_error(exc: BaseException) -> bool: + if isinstance(exc, _CONNECT_RECONNECT_EXC): + return True + if isinstance(exc, RuntimeError) and "client has been closed" in str(exc).lower(): + return True + if type(exc).__name__ in ("ClosedResourceError", "BrokenResourceError"): + return True + # HTTP/2 connection closed then reused (e.g. "ConnectionState.CLOSED", "SEND_SETTINGS", "StreamReset") + msg = str(exc) + if "ConnectionState.CLOSED" in msg or "SEND_SETTINGS" in msg or "StreamReset" in msg: + return True + return False + _predict_timeout = httpx.Timeout( connect=int(os.getenv("FMA_CONNECT_TIMEOUT", 999999)), @@ -43,6 +68,13 @@ pool=int(os.getenv("FMA_POOL_TIMEOUT", 999999)), ) +_http2 = os.getenv("FLYMYAI_HTTP2", "true").lower() in ("1", "true", "yes") +_limits = httpx.Limits( + max_connections=int(os.getenv("FMA_MAX_CONNECTIONS", "100")), + max_keepalive_connections=int(os.getenv("FMA_MAX_KEEPALIVE_CONNECTIONS", "50")), + keepalive_expiry=float(os.getenv("FMA_KEEPALIVE_EXPIRY", "60")), +) + class BaseClient(Generic[_PossibleClients]): """ From 5d024d22a488bc9fa682a523b59c8657386c1c2c Mon Sep 17 00:00:00 2001 From: redpower5x5 Date: Wed, 11 Mar 2026 21:26:35 +0300 Subject: [PATCH 2/3] increase retries count --- flymyai/core/clients/base_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flymyai/core/clients/base_client.py b/flymyai/core/clients/base_client.py index a12e278..3191500 100644 --- a/flymyai/core/clients/base_client.py +++ b/flymyai/core/clients/base_client.py @@ -44,7 +44,7 @@ ) # How many times to reconnect and retry on connection/stream errors (1 initial + this many retries) -_RECONNECT_RETRIES = int(os.getenv("FMA_RECONNECT_RETRIES", "2")) +_RECONNECT_RETRIES = int(os.getenv("FMA_RECONNECT_RETRIES", "3")) def _is_reconnectable_error(exc: BaseException) -> bool: From 32757992de7b3605fb53508e9665a877249edf1f Mon Sep 17 00:00:00 2001 From: n-freman Date: Mon, 16 Mar 2026 14:11:51 +0500 Subject: [PATCH 3/3] lint --- flymyai/core/clients/AsyncClient.py | 1 + flymyai/core/clients/base_client.py | 6 +++++- flymyai/core/clients/m1AsyncClient.py | 4 +--- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/flymyai/core/clients/AsyncClient.py b/flymyai/core/clients/AsyncClient.py index d9161d6..313e4ee 100644 --- a/flymyai/core/clients/AsyncClient.py +++ b/flymyai/core/clients/AsyncClient.py @@ -32,6 +32,7 @@ from flymyai.multipart import MultipartPayload from flymyai.utils.utils import aretryable_callback + class BaseAsyncClient(BaseClient[httpx.AsyncClient]): def _construct_client(self): return httpx.AsyncClient( diff --git a/flymyai/core/clients/base_client.py b/flymyai/core/clients/base_client.py index 3191500..63e339a 100644 --- a/flymyai/core/clients/base_client.py +++ b/flymyai/core/clients/base_client.py @@ -56,7 +56,11 @@ def _is_reconnectable_error(exc: BaseException) -> bool: return True # HTTP/2 connection closed then reused (e.g. "ConnectionState.CLOSED", "SEND_SETTINGS", "StreamReset") msg = str(exc) - if "ConnectionState.CLOSED" in msg or "SEND_SETTINGS" in msg or "StreamReset" in msg: + if ( + "ConnectionState.CLOSED" in msg + or "SEND_SETTINGS" in msg + or "StreamReset" in msg + ): return True return False diff --git a/flymyai/core/clients/m1AsyncClient.py b/flymyai/core/clients/m1AsyncClient.py index 34491b7..46f2e1b 100644 --- a/flymyai/core/clients/m1AsyncClient.py +++ b/flymyai/core/clients/m1AsyncClient.py @@ -93,9 +93,7 @@ async def generation_task_result( ) -> FlyMyAIM1Response: while True: response = await self._awith_reconnect( - lambda: self._client.get( - self._populate_result_path(generation_task) - ) + lambda: self._client.get(self._populate_result_path(generation_task)) ) response.raise_for_status() response_data = response.json()