From 26adcf45248f016195eaa9c481ebf8b83ca4ac5d Mon Sep 17 00:00:00 2001 From: saiiiii <49656052+saiprasanth-git@users.noreply.github.com> Date: Thu, 12 Mar 2026 12:55:12 -0500 Subject: [PATCH 1/2] fix(langchain-openai): use per-loop cache for async httpx client to prevent cross-loop errors Implement caching for async httpx clients per event loop using WeakValueDictionary to prevent memory leaks and 'Event loop is closed' errors in multi-threaded environments. --- .../chat_models/_client_utils.py | 51 ++++++++++++++++--- 1 file changed, 44 insertions(+), 7 deletions(-) diff --git a/libs/partners/openai/langchain_openai/chat_models/_client_utils.py b/libs/partners/openai/langchain_openai/chat_models/_client_utils.py index 4a0efce9e1bfe..fab29e04e6921 100644 --- a/libs/partners/openai/langchain_openai/chat_models/_client_utils.py +++ b/libs/partners/openai/langchain_openai/chat_models/_client_utils.py @@ -11,6 +11,7 @@ import asyncio import inspect import os +import weakref from collections.abc import Awaitable, Callable from functools import lru_cache from typing import Any, cast @@ -75,11 +76,43 @@ def _cached_sync_httpx_client( return _build_sync_httpx_client(base_url, timeout) -@lru_cache +# Cache async httpx clients per event loop to avoid cross-loop connection reuse. +# httpx.AsyncClient connections are bound to the event loop they were created on. +# Using a process-global @lru_cache would cause 'Event loop is closed' errors when +# asyncio.run() is called multiple times (e.g. in multi-threaded environments, Celery +# workers, or sequential asyncio.run() calls), because each call creates and later +# closes a new event loop while the cached client still holds connections to the +# previous (now-closed) loop. +# +# Using WeakValueDictionary ensures that clients are automatically cleaned up when +# their associated event loop is garbage collected, preventing memory leaks. 
+_async_httpx_client_cache: weakref.WeakValueDictionary = ( + weakref.WeakValueDictionary() +) + + def _cached_async_httpx_client( base_url: str | None, timeout: Any ) -> _AsyncHttpxClientWrapper: - return _build_async_httpx_client(base_url, timeout) + """Get a cached async httpx client scoped to the current event loop. + + Unlike sync clients, async httpx clients cannot be safely shared across + different event loops. This function uses the current loop's identity as + part of the cache key, so each event loop gets its own client instance. + """ + try: + loop = asyncio.get_running_loop() + loop_id = id(loop) + except RuntimeError: + # No running event loop; fall back to creating a fresh client. + return _build_async_httpx_client(base_url, timeout) + + cache_key = (loop_id, base_url, timeout) + client = _async_httpx_client_cache.get(cache_key) + if client is None: + client = _build_async_httpx_client(base_url, timeout) + _async_httpx_client_cache[cache_key] = client + return client def _get_default_httpx_client( @@ -100,9 +133,14 @@ def _get_default_httpx_client( def _get_default_async_httpx_client( base_url: str | None, timeout: Any ) -> _AsyncHttpxClientWrapper: - """Get default httpx client. + """Get default async httpx client, scoped to the current event loop. - Uses cached client unless timeout is `httpx.Timeout`, which is not hashable. + Async httpx clients are bound to the event loop they were created on, so + they cannot be safely shared across different event loops. This function + returns a client that is cached per-loop to avoid 'Event loop is closed' + errors in multi-threaded or multi-loop environments. + + Uses a fresh (uncached) client when timeout is not hashable. """ try: hash(timeout) @@ -117,8 +155,8 @@ def _resolve_sync_and_async_api_keys( ) -> tuple[str | None | Callable[[], str], str | Callable[[], Awaitable[str]]]: """Resolve sync and async API key values. 
- Because OpenAI and AsyncOpenAI clients support either sync or async callables for - the API key, we need to resolve separate values here. + Because OpenAI and AsyncOpenAI clients support either sync or async callables + for the API key, we need to resolve separate values here. """ if isinstance(api_key, SecretStr): sync_api_key_value: str | None | Callable[[], str] = api_key.get_secret_value() @@ -138,5 +176,4 @@ async def async_api_key_wrapper() -> str: ) async_api_key_value = async_api_key_wrapper - return sync_api_key_value, async_api_key_value From ffd9ca7db3758a1652134979abe1ce9ebff87c47 Mon Sep 17 00:00:00 2001 From: saiiiii <49656052+saiprasanth-git@users.noreply.github.com> Date: Mon, 16 Mar 2026 11:02:20 -0500 Subject: [PATCH 2/2] fix(langchain-openai): use loop-aware proxy to prevent cross-loop httpx errors Introduced a proxy class for async httpx clients to manage per-event-loop instances, preventing 'Event loop is closed' errors in multi-threaded environments. Updated the caching mechanism for async clients to use this new proxy. --- .../chat_models/_client_utils.py | 86 +++++++++++-------- 1 file changed, 50 insertions(+), 36 deletions(-) diff --git a/libs/partners/openai/langchain_openai/chat_models/_client_utils.py b/libs/partners/openai/langchain_openai/chat_models/_client_utils.py index fab29e04e6921..40c672a3c4285 100644 --- a/libs/partners/openai/langchain_openai/chat_models/_client_utils.py +++ b/libs/partners/openai/langchain_openai/chat_models/_client_utils.py @@ -16,6 +16,7 @@ from functools import lru_cache from typing import Any, cast +import httpx import openai from pydantic import SecretStr @@ -47,6 +48,44 @@ def __del__(self) -> None: pass +class _LoopAwareAsyncHttpxClientProxy: + """A proxy for async httpx clients that maintains per-event-loop instances. + + httpx.AsyncClient connections are bound to the event loop they were created + on. 
This proxy is cached at the (base_url, timeout) level via @lru_cache, + preserving fast O(1) init-time lookup. Internally it stores one + _AsyncHttpxClientWrapper per event loop in a WeakValueDictionary keyed by + id(loop); an entry is dropped once nothing else references its client. + + This avoids the 'Event loop is closed' RuntimeError that occurs when a + process-global @lru_cache'd client is reused across different event loops + (e.g. in multi-threaded environments, Celery workers, or sequential + asyncio.run() calls). + """ + + def __init__(self, base_url: str | None, timeout: Any) -> None: + self._base_url = base_url + self._timeout = timeout + self._loop_clients: weakref.WeakValueDictionary[ + int, _AsyncHttpxClientWrapper + ] = weakref.WeakValueDictionary() + + def get_client(self) -> _AsyncHttpxClientWrapper: + """Return the client for the current event loop, creating it if needed.""" + try: + loop = asyncio.get_running_loop() + loop_id = id(loop) + except RuntimeError: + # No running event loop; return a fresh client (won't be cached). + return _build_async_httpx_client(self._base_url, self._timeout) + + client = self._loop_clients.get(loop_id) + if client is None: + client = _build_async_httpx_client(self._base_url, self._timeout) + self._loop_clients[loop_id] = client + return client + + def _build_sync_httpx_client( base_url: str | None, timeout: Any ) -> _SyncHttpxClientWrapper: @@ -76,43 +115,17 @@ def _cached_sync_httpx_client( return _build_sync_httpx_client(base_url, timeout) -# Cache async httpx clients per event loop to avoid cross-loop connection reuse. -# httpx.AsyncClient connections are bound to the event loop they were created on. -# Using a process-global @lru_cache would cause 'Event loop is closed' errors when -# asyncio.run() is called multiple times (e.g. 
in multi-threaded environments, Celery -# workers, or sequential asyncio.run() calls), because each call creates and later -# closes a new event loop while the cached client still holds connections to the -# previous (now-closed) loop. -# -# Using WeakValueDictionary ensures that clients are automatically cleaned up when -# their associated event loop is garbage collected, preventing memory leaks. -_async_httpx_client_cache: weakref.WeakValueDictionary = ( - weakref.WeakValueDictionary() -) - - -def _cached_async_httpx_client( +@lru_cache +def _cached_async_httpx_client_proxy( base_url: str | None, timeout: Any -) -> _AsyncHttpxClientWrapper: - """Get a cached async httpx client scoped to the current event loop. +) -> _LoopAwareAsyncHttpxClientProxy: + """Return a per-(base_url, timeout) proxy cached by @lru_cache. - Unlike sync clients, async httpx clients cannot be safely shared across - different event loops. This function uses the current loop's identity as - part of the cache key, so each event loop gets its own client instance. + The proxy itself is lightweight and @lru_cache'd, so init-time overhead is + identical to the original implementation. The proxy internally dispatches + to a per-event-loop client, preventing cross-loop connection reuse. """ - try: - loop = asyncio.get_running_loop() - loop_id = id(loop) - except RuntimeError: - # No running event loop; fall back to creating a fresh client. 
- return _build_async_httpx_client(base_url, timeout) - - cache_key = (loop_id, base_url, timeout) - client = _async_httpx_client_cache.get(cache_key) - if client is None: - client = _build_async_httpx_client(base_url, timeout) - _async_httpx_client_cache[cache_key] = client - return client + return _LoopAwareAsyncHttpxClientProxy(base_url, timeout) def _get_default_httpx_client( @@ -138,7 +151,8 @@ def _get_default_async_httpx_client( Async httpx clients are bound to the event loop they were created on, so they cannot be safely shared across different event loops. This function returns a client that is cached per-loop to avoid 'Event loop is closed' - errors in multi-threaded or multi-loop environments. + errors in multi-threaded or multi-loop environments (e.g. Celery, sequential + asyncio.run() calls, or multi-threaded FastAPI handlers). Uses a fresh (uncached) client when timeout is not hashable. """ @@ -147,7 +161,7 @@ def _get_default_async_httpx_client( except TypeError: return _build_async_httpx_client(base_url, timeout) else: - return _cached_async_httpx_client(base_url, timeout) + return _cached_async_httpx_client_proxy(base_url, timeout).get_client() def _resolve_sync_and_async_api_keys(