UN-3211 [FEAT] HTTP session lifecycle management for workers API clients #1782
muhammad-ali-e wants to merge 4 commits into main from
Conversation
Summary by CodeRabbit
Walkthrough
Centralizes API client and StateStore cleanup in finally blocks; adds singleton/shared HTTP session support with task-count reset and observability; introduces Celery signal handlers for lifecycle events; increases API client pool defaults; adds test fixtures and extensive session lifecycle tests.
Sequence Diagram(s)
[mermaid sequence diagram omitted; recoverable fragment: Task->>API: setup_execution_context() => api_client]
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 3 passed
- Add _owns_session flag to prevent singleton shared session from being closed by individual clients
- Wire API_CLIENT_POOL_SIZE into HTTPAdapter connection pools
- Add idempotent close() and __del__ destructor to BaseAPIClient
- Add try/finally cleanup in api-deployment and callback tasks
- Add on_worker_process_shutdown hook and early-return guard in postrun
- Add 25 unit tests for session lifecycle behavior

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Force-pushed from 7f18370 to 0752a37.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@workers/shared/api/internal_client.py`:
- Around lines 256-268: The reset_singleton method currently swallows exceptions when closing cls._shared_session. Log the exception details instead of silently passing so FD leaks and close errors stay visible: catch the Exception around cls._shared_session.close() and call logger.exception or logger.error with the exception and context (mentioning InternalAPIClient.reset_singleton and cls._shared_session) before continuing to set cls._shared_session, cls._shared_base_client, cls._initialization_count, and cls._task_counter to None/0 and logging the reset completion.
- Around lines 271-294: The non-atomic update in increment_task_counter can lose counts under threaded/eventlet/gevent worker pools. Make the method thread-safe by adding a class-level lock (e.g., _task_counter_lock = threading.Lock(), importing threading where needed) and wrapping the read/increment/check/reset sequence in a with _task_counter_lock: block, so the operations on _task_counter and _last_reset_time and the call to reset_singleton() are atomic. Alternatively, if prefork-only deployments are a requirement, add a precondition/assertion at the start of increment_task_counter that the worker is running in prefork mode and skip the locking changes. (A sketch of both fixes follows.)
🧹 Nitpick comments (2)
workers/api-deployment/tasks.py (1)
263-269: Don't swallow api_client.close() errors silently.
A debug/warn log helps diagnose lingering sessions if close fails.
🔧 Suggested tweak

```diff
-try:
-    api_client.close()
-except Exception:
-    pass
+try:
+    api_client.close()
+except Exception as e:
+    logger.debug("api_client.close() failed during cleanup: %s", e)
```

workers/callback/tasks.py (1)
1508-1512: Avoid silent failures when closing API clients.
A debug/warn log helps diagnose leaks if close fails in either finally block.
🔧 Suggested tweak

```diff
-try:
-    context.api_client.close()
-except Exception:
-    pass
+try:
+    context.api_client.close()
+except Exception as e:
+    logger.debug("api_client.close() failed during callback cleanup: %s", e)
@@
-try:
-    api_client.close()
-except Exception:
-    pass
+try:
+    api_client.close()
+except Exception as e:
+    logger.debug("api_client.close() failed during API callback cleanup: %s", e)
```

Also applies to: 1781-1784
…afe counter

- Log warning instead of silently swallowing exceptions in reset_singleton()
- Add threading.Lock around task counter increment for thread safety with threads/gevent/eventlet pools

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@workers/shared/api/internal_client.py`:
- Around lines 258-296: reset_singleton() can close cls._shared_session while other threads are mid-request. Change to a "swap-then-close" approach: inside reset_singleton(), grab a session lock (reuse the existing _task_counter_lock or add a dedicated _session_lock), atomically replace cls._shared_session with a new requests.Session() (or None for lazy re-init) while storing the old session in a local variable, then spawn a short-lived background worker (thread or timer) that waits a configurable grace period and closes the old session, catching exceptions. Update increment_task_counter to take the same session lock when reading/swapping to avoid races, and ensure any code that reads _shared_session either holds that lock or works from a local reference, so in-flight requests keep using the old session until the grace period expires. (A sketch follows.)
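A sketch of the swap-then-close idea under those assumptions; the grace period and the _session_lock name are illustrative, not the repo's actual identifiers:

```python
import logging
import threading

import requests

logger = logging.getLogger(__name__)

_CLOSE_GRACE_SECONDS = 5.0  # illustrative; would likely come from config


class InternalAPIClient:
    _shared_session: requests.Session | None = None
    _session_lock = threading.Lock()

    @classmethod
    def reset_singleton(cls) -> None:
        # Swap first: in-flight requests keep their local reference to the
        # old session, while new callers immediately see the replacement.
        with cls._session_lock:
            old_session = cls._shared_session
            cls._shared_session = requests.Session()

        if old_session is None:
            return

        def _close_old() -> None:
            try:
                old_session.close()
            except Exception:
                logger.exception("Deferred close of retired session failed")

        # Close the retired session after a grace period, off the hot path.
        timer = threading.Timer(_CLOSE_GRACE_SECONDS, _close_old)
        timer.daemon = True
        timer.start()
```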
🧹 Nitpick comments (1)
workers/shared/api/internal_client.py (1)
280-296: WorkerConfig() is instantiated inside the lock on every task completion.
Line 285 creates a new WorkerConfig() (parsing env vars) while holding _task_counter_lock. This extends the critical section unnecessarily and allocates an object per task. Consider reading the threshold once (e.g., as a class-level cached value or outside the lock).
Also, cls._task_counter = 0 on line 295 is redundant since reset_singleton() (line 269) already resets it.
♻️ Proposed refactor: read config outside the lock, remove redundant reset

```diff
 @classmethod
 def increment_task_counter(cls) -> None:
-    with cls._task_counter_lock:
-        cls._task_counter += 1
-
-        from shared.infrastructure.config.worker_config import WorkerConfig
+    from shared.infrastructure.config.worker_config import WorkerConfig

-        threshold = WorkerConfig().singleton_reset_task_threshold
-        if threshold > 0 and cls._task_counter >= threshold:
-            import time
+    threshold = WorkerConfig().singleton_reset_task_threshold
+    with cls._task_counter_lock:
+        cls._task_counter += 1
+        if threshold > 0 and cls._task_counter >= threshold:
+            import time

-            logger.info(
-                "Task counter reached threshold (%d/%d), resetting singleton session",
-                cls._task_counter,
-                threshold,
-            )
-            cls.reset_singleton()
-            cls._task_counter = 0
-            cls._last_reset_time = time.time()
+            logger.info(
+                "Task counter reached threshold (%d/%d), resetting singleton session",
+                cls._task_counter,
+                threshold,
+            )
+            cls.reset_singleton()
+            cls._last_reset_time = time.time()
```
… document thread-safety

- Move WorkerConfig() instantiation outside lock in increment_task_counter()
- Remove redundant _task_counter=0 (already done inside reset_singleton)
- Document thread-safety caveat in reset_singleton() docstring
- Log close failures in task cleanup instead of silently swallowing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
workers/callback/tasks.py (1)
1380-1389: ⚠️ Potential issue | 🟡 Minor
api_client created inside _extract_callback_parameters is not covered by the try/finally if extraction fails mid-way.
If _extract_callback_parameters raises after create_api_client() (line 702 or 772 in the extraction function) but before assigning to context.api_client (line 846), the newly created client becomes an orphan local variable. The try/finally at lines 1389/1507 won't run because the exception occurs before entering that block.
The __del__ destructor (FR-1) serves as the safety net here, which is the design intent. Just flagging for awareness — if deterministic cleanup is desired, the try/finally could be widened to wrap _extract_callback_parameters as well.
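If deterministic cleanup were wanted, one option is for the extraction function itself to close the client when it fails before ownership transfers to the context. A rough sketch; CallbackContext, create_api_client's argument, and the request shape are all assumptions, not the repo's actual signatures:

```python
def _extract_callback_parameters(request: dict) -> "CallbackContext":
    api_client = create_api_client(request["organization_id"])  # assumed call
    try:
        context = CallbackContext(request)  # may raise mid-extraction
        context.api_client = api_client     # ownership transfers here
        return context
    except Exception:
        # Close the client ourselves if it never reached the context;
        # otherwise only __del__ would reclaim it.
        try:
            api_client.close()
        except Exception:
            pass
        raise
```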
🤖 Fix all issues with AI agents
In `@workers/shared/tests/test_session_lifecycle.py`:
- Around lines 371-396: Replace the inline simulated guard in the tests with calls to the real signal handler on_task_postrun (imported from workers.worker), and mock InternalAPIClient.increment_task_counter so the handler's guard, try/except, and logging paths are exercised. For the singleton-disabled test, call on_task_postrun(sender=None, task_id=None, **{}) and assert increment_task_counter was not called; for the singleton-enabled test, patch the same method, call on_task_postrun, and assert increment_task_counter was called once, ensuring the patch target matches the import path used inside on_task_postrun. (See the sketch after this item.)
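A sketch of what those two tests might look like; the workers.worker import and the patch target path are assumptions that must match the import actually used inside on_task_postrun:

```python
from unittest.mock import patch

from workers.worker import on_task_postrun  # assumed import path


def test_postrun_skips_counter_when_singleton_disabled(monkeypatch):
    monkeypatch.setenv("ENABLE_API_CLIENT_SINGLETON", "false")
    with patch(
        "shared.api.internal_client.InternalAPIClient.increment_task_counter"
    ) as mock_increment:
        # Exercises the real handler's early-return guard.
        on_task_postrun(sender=None, task_id=None)
        mock_increment.assert_not_called()


def test_postrun_increments_counter_when_singleton_enabled(monkeypatch):
    monkeypatch.setenv("ENABLE_API_CLIENT_SINGLETON", "true")
    with patch(
        "shared.api.internal_client.InternalAPIClient.increment_task_counter"
    ) as mock_increment:
        on_task_postrun(sender=None, task_id=None)
        mock_increment.assert_called_once()
```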
🧹 Nitpick comments (5)
workers/shared/api/internal_client.py (2)
125-179: Singleton initialization creates and immediately discards 7 sessions.
Each specialized client's __init__ (via BaseAPIClient.__init__) creates a fresh requests.Session with a mounted HTTPAdapter, which _share_session immediately closes and replaces. For 7 specialized clients, that's 7 throwaway sessions per InternalAPIClient instantiation.
This isn't a bug — the sessions are properly closed — but it's wasteful, especially if InternalAPIClient is instantiated frequently (e.g., per-task in non-singleton mode). Consider passing an existing session into the specialized client constructors to avoid the create-then-close pattern, as sketched below.
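One possible shape for that, assuming BaseAPIClient's constructor can grow an optional parameter (the signature here is illustrative):

```python
import requests


class BaseAPIClient:
    def __init__(self, config, session: requests.Session | None = None):
        if session is not None:
            # Reuse the caller's shared session; never close it on cleanup.
            self.session = session
            self._owns_session = False
        else:
            # Standalone client: build and own a fresh session.
            self.session = requests.Session()
            self._owns_session = True
```

InternalAPIClient could then construct each specialized client with session=cls._shared_session, making _share_session's close-and-replace step unnecessary.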
286-300: WorkerConfig() instantiated on every task completion.
increment_task_counter is called via the task_postrun signal after every task. Each call constructs a new WorkerConfig(), which reads environment variables. While this keeps the threshold dynamically reconfigurable, it adds overhead on every task completion.
If env-var reading becomes a concern at scale, consider caching the threshold at the class level and only refreshing it on reset, as in the sketch below.
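A sketch of class-level caching; _cached_threshold and _reset_threshold are hypothetical names:

```python
class InternalAPIClient:
    _cached_threshold: int | None = None  # hypothetical cache attribute

    @classmethod
    def _reset_threshold(cls) -> int:
        # Read WorkerConfig (and thus the env) once; serve the cached value
        # afterwards. reset_singleton() could set this back to None if the
        # threshold should stay reconfigurable across resets.
        if cls._cached_threshold is None:
            from shared.infrastructure.config.worker_config import WorkerConfig

            cls._cached_threshold = WorkerConfig().singleton_reset_task_threshold
        return cls._cached_threshold
```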
workers/shared/tests/test_session_lifecycle.py (1)
322-348: mock_config_singleton fixtures are required for env setup — Ruff ARG002 is a false positive.
The mock_config_singleton parameter in test_increment_counter and test_threshold_triggers_reset isn't directly referenced in the test body, but it's needed because the fixture patches os.environ with WORKER_SINGLETON_RESET_THRESHOLD=3 and ENABLE_API_CLIENT_SINGLETON=true. Without it, WorkerConfig() inside increment_task_counter would read unpatched env vars.
To silence the Ruff warning while keeping the intent clear, you could suppress it inline:

```diff
-    def test_increment_counter(self, mock_config_singleton):
+    def test_increment_counter(self, mock_config_singleton):  # noqa: ARG002
```

Or rename the parameter:

```diff
-    def test_increment_counter(self, mock_config_singleton):
+    def test_increment_counter(self, _mock_config_singleton):
```
1484-1503: Use logger.exception for better tracebacks in error handlers.
At lines 1485-1487 and 1503, logger.error(...) is used to log exceptions, but logger.exception(...) would automatically include the traceback, which is more useful for debugging callback failures.
Suggested fix

```diff
 except Exception as e:
-    logger.error(
+    logger.exception(
         f"Unified batch callback processing failed for execution {context.execution_id}: {e}"
     )
 except Exception as cleanup_error:
-    logger.error(f"Failed to mark execution as failed: {cleanup_error}")
+    logger.exception(f"Failed to mark execution as failed: {cleanup_error}")
```

1757-1777: Same logger.error → logger.exception opportunity in the API callback error path.
Lines 1758-1760 and 1777 use logger.error where logger.exception would capture the full traceback for easier debugging.
Suggested fix

```diff
 except Exception as e:
-    logger.error(
+    logger.exception(
         f"API callback processing failed for execution {execution_id}: {e}"
     )
 except Exception as update_error:
-    logger.error(f"Failed to update execution status: {update_error}")
+    logger.exception(f"Failed to update execution status: {update_error}")
```
…strun handler

Tests now call the real worker.on_task_postrun() signal handler instead of simulating the guard logic inline, catching divergence if the handler's guard, try/except, or import path changes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Test Results

Summary
Runner Tests - Full Report
SDK1 Tests - Full Report
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@workers/shared/tests/test_session_lifecycle.py`:
- Around lines 243-263: test_default_pool_size uses patch.dict(..., clear=False), so an externally set API_CLIENT_POOL_SIZE can leak into the test. Ensure the test removes any inherited value before instantiating WorkerConfig by deleting/unsetting API_CLIENT_POOL_SIZE in the patched context, so WorkerConfig().api_client_pool_size is forced to use the hardcoded default. Locate test_default_pool_size and adjust the patched block to explicitly remove API_CLIENT_POOL_SIZE from os.environ (e.g., pop it if present) before creating WorkerConfig and asserting api_client_pool_size == 10. (A sketch follows.)
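A sketch of the adjusted test; patch.dict snapshots and restores os.environ on exit, so popping the key inside the block cannot leak outside the test (the WorkerConfig import path is taken from the refactor diff above):

```python
import os
from unittest.mock import patch

from shared.infrastructure.config.worker_config import WorkerConfig


def test_default_pool_size():
    with patch.dict(os.environ, {}, clear=False):
        # Drop any inherited value so the hardcoded default is exercised.
        os.environ.pop("API_CLIENT_POOL_SIZE", None)
        assert WorkerConfig().api_client_pool_size == 10
```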
🧹 Nitpick comments (3)
workers/shared/tests/test_session_lifecycle.py (3)
322-348: Prefix unused fixture parameters with _ to suppress Ruff ARG002.
mock_config_singleton is correctly used for its env-patching side effect, but Ruff flags it as unused. Prefixing with _ is the idiomatic pytest convention for fixtures consumed only for side effects.

```diff
-    def test_increment_counter(self, mock_config_singleton):
+    def test_increment_counter(self, _mock_config_singleton):
-    def test_threshold_triggers_reset(self, mock_config_singleton):
+    def test_threshold_triggers_reset(self, _mock_config_singleton):
```

Alternatively, apply @pytest.mark.usefixtures("mock_config_singleton") at the class or method level to avoid the parameter entirely.
415-424: Extract the repeated sub-client attribute list into a constant.
The same 8-element list appears three times in this class. If a sub-client is added or renamed in InternalAPIClient, only some lists may get updated, causing silent test gaps.
Suggested refactor
Define once at module or class level:

```python
_SUB_CLIENT_ATTRS = [
    "base_client",
    "execution_client",
    "file_client",
    "webhook_client",
    "organization_client",
    "tool_client",
    "workflow_client",
    "usage_client",
]
```

Then reference _SUB_CLIENT_ATTRS in all three test methods.
Also applies to: 466-475, 488-497
523-526: Prefix unused unpacked variables with _ to suppress Ruff RUF059.
The unpacked values aren't needed in these cleanup-focused tests.

```diff
-    ) as (cfg, client):
+    ) as (_cfg, _client):
```

Also applies to: 541-545



What
- Wire API_CLIENT_POOL_SIZE into HTTPAdapter connection pools
- Add _owns_session flag to prevent singleton shared sessions from being closed by individual clients

Why

- API_CLIENT_POOL_SIZE config existed but was never wired into HTTPAdapter (dead config)
- The on_task_postrun signal handler ran uselessly on every task when singleton mode was disabled

How

- base_client.py: Added _owns_session flag, idempotent close(), __del__ destructor, wired pool size into HTTPAdapter (see the sketch below)
- internal_client.py: Set _owns_session=False on all clients sharing the singleton session
- api-deployment/tasks.py: Added try/finally with api_client.close() for missing cleanup
- callback/tasks.py: try/finally cleanup in callback task functions
- worker.py: Early-return guard in on_task_postrun when singleton disabled; on_worker_process_shutdown hook
- worker_config.py: Default pool size 10, singleton reset threshold config
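A condensed sketch of the base_client.py changes described above; attribute and parameter names follow the PR description, while the exact signatures are assumptions:

```python
import requests
from requests.adapters import HTTPAdapter


class BaseAPIClient:
    def __init__(self, pool_size: int = 10):
        self.session = requests.Session()
        # Wire API_CLIENT_POOL_SIZE into the adapter's connection pools.
        adapter = HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        self._owns_session = True  # set to False when sharing the singleton
        self._closed = False

    def close(self) -> None:
        # Idempotent: safe to call repeatedly from finally blocks and __del__,
        # and a no-op for clients borrowing the shared singleton session.
        if self._closed or not self._owns_session:
            return
        try:
            self.session.close()
        finally:
            self._closed = True

    def __del__(self):
        # Safety net for clients that escape deterministic cleanup.
        try:
            self.close()
        except Exception:
            pass
```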
Can this PR break any existing features?

ENABLE_API_CLIENT_SINGLETON=false remains the default. Pool size default stays at 10.

Database Migrations
Env Config
- API_CLIENT_POOL_SIZE — now actually wired in (default: 10, unchanged)
- WORKER_SINGLETON_RESET_THRESHOLD — already documented in sample.env (default: 1000)
- ENABLE_API_CLIENT_SINGLETON — existing, unchanged (default: false)

Relevant Docs
Related Issues or PRs
Dependencies Versions
Notes on Testing
cd workers && PYTHONPATH=.:../unstract .venv/bin/python -m pytest shared/tests/ -v

🤖 Generated with Claude Code