diff --git a/tests/actor/code_act/test_execute_code_output.py b/tests/actor/code_act/test_execute_code_output.py index 5090555a0..81397655b 100644 --- a/tests/actor/code_act/test_execute_code_output.py +++ b/tests/actor/code_act/test_execute_code_output.py @@ -134,6 +134,63 @@ def get_result(out: Any) -> Any: return get_output_field(out, "result", None) +# --------------------------------------------------------------------------- +# Test: Runtime OAuth token helper exposed to real actor execute_code +# --------------------------------------------------------------------------- + + +class _FakeOAuthSecretManager: + def __init__(self) -> None: + self.calls: list[Any] = [] + self.secrets = { + "MICROSOFT_ACCESS_TOKEN": "microsoft:fresh-token", + "MICROSOFT_TOKEN_EXPIRES_AT": "2999-01-01T00:00:00+00:00", + "GOOGLE_ACCESS_TOKEN": "google:fresh-token", + "GOOGLE_TOKEN_EXPIRES_AT": "2999-01-01T00:00:00+00:00", + } + + def sync_assistant_secrets_if_stale(self, **kwargs: Any) -> bool: + self.calls.append(("sync", kwargs)) + return True + + def _get_secret_value(self, name: str) -> str | None: + self.calls.append(("secret", name)) + return self.secrets.get(name) + + +@pytest.mark.asyncio +async def test_execute_code_oauth_helper_uses_parent_secret_manager( + execute_code_tool: tuple[Any, Primitives], + monkeypatch: pytest.MonkeyPatch, +) -> None: + execute_code, _ = execute_code_tool + fake_secret_manager = _FakeOAuthSecretManager() + monkeypatch.setattr( + ManagerRegistry, + "get_secret_manager", + lambda: fake_secret_manager, + ) + + out = await execute_code( + "mock scenario: call rotating OAuth token helper for multiple providers", + """ +microsoft_token = get_oauth_access_token("microsoft", min_ttl_seconds=123) +google_token = get_oauth_access_token("google", min_ttl_seconds=456) +assert microsoft_token == "microsoft:fresh-token" +assert google_token == "google:fresh-token" +print("TOKEN_OK") +""", + language="python", + state_mode="stateless", + ) + + assert get_error(out) is None + assert "TOKEN_OK" in get_stdout_text(out) + assert ("secret", "MICROSOFT_ACCESS_TOKEN") in fake_secret_manager.calls + assert ("secret", "GOOGLE_ACCESS_TOKEN") in fake_secret_manager.calls + assert any(call[0] == "sync" for call in fake_secret_manager.calls) + + # --------------------------------------------------------------------------- # Test: Basic stdout capture from primitives.*.ask().result() # --------------------------------------------------------------------------- diff --git a/tests/actor/code_act/test_prompt_builders.py b/tests/actor/code_act/test_prompt_builders.py index da67a54a5..13401f9bb 100644 --- a/tests/actor/code_act/test_prompt_builders.py +++ b/tests/actor/code_act/test_prompt_builders.py @@ -111,6 +111,26 @@ def test_code_act_prompt_includes_diverse_examples_sessions_computer_primitives_ assert "execute_function vs execute_code decision" in prompt +@pytest.mark.timeout(30) +def test_code_act_prompt_teaches_refresh_token_oauth_helper(): + actor = CodeActActor() + prompt = build_code_act_prompt( + environments={}, + tools=dict(actor.get_tools("act")), + ) + + assert "def reason(" in prompt + assert ( + "def get_oauth_access_token(provider: str, *, " + "min_ttl_seconds: int = 300) -> str" + ) in prompt + assert 'get_oauth_access_token("microsoft")' in prompt + assert 'get_oauth_access_token("google")' in prompt + assert "refresh-token backed OAuth" in prompt + assert "Do not print, log, return, or store the token value." in prompt + assert "provider sdk/default credential behavior" in prompt.lower() + + @pytest.mark.timeout(30) def test_code_act_prompt_includes_comms_namespace_and_docstrings(): from unity.actor.environments.state_managers import StateManagerEnvironment diff --git a/tests/common/test_runtime_oauth.py b/tests/common/test_runtime_oauth.py new file mode 100644 index 000000000..998452275 --- /dev/null +++ b/tests/common/test_runtime_oauth.py @@ -0,0 +1,138 @@ +from __future__ import annotations + +from datetime import datetime, timedelta, timezone +from typing import Any + +import pytest + +from unity.common import runtime_oauth + + +def _future_expiry() -> str: + return (datetime.now(timezone.utc) + timedelta(hours=1)).isoformat() + + +def _past_expiry() -> str: + return (datetime.now(timezone.utc) - timedelta(minutes=5)).isoformat() + + +class _FakeSecretManager: + def __init__(self, secrets: dict[str, str]) -> None: + self.secrets = secrets + self.sync_calls: list[dict[str, Any]] = [] + self.on_sync = None + + def _get_secret_value(self, name: str) -> str | None: + return self.secrets.get(name) + + def sync_assistant_secrets_if_stale(self, **kwargs: Any) -> bool: + self.sync_calls.append(kwargs) + if self.on_sync is not None: + self.on_sync() + return True + + +def _install_secret_manager(monkeypatch, sm: _FakeSecretManager) -> None: + monkeypatch.setattr(runtime_oauth, "_get_secret_manager", lambda: sm) + + +def test_get_oauth_access_token_supports_provider_alias(monkeypatch): + sm = _FakeSecretManager( + { + "MICROSOFT_ACCESS_TOKEN": "fresh-ms-token", + "MICROSOFT_TOKEN_EXPIRES_AT": _future_expiry(), + }, + ) + _install_secret_manager(monkeypatch, sm) + + assert runtime_oauth.get_oauth_access_token("graph") == "fresh-ms-token" + assert sm.sync_calls[-1]["force"] is False + + +def test_get_oauth_access_token_unknown_provider_is_non_secret_error(monkeypatch): + sm = _FakeSecretManager({}) + _install_secret_manager(monkeypatch, sm) + + with pytest.raises(ValueError, match="Unknown refresh-token OAuth provider"): + runtime_oauth.get_oauth_access_token("not-a-real-provider") + + +def test_get_oauth_access_token_missing_token_raises_after_sync(monkeypatch): + sm = _FakeSecretManager({"MICROSOFT_TOKEN_EXPIRES_AT": _future_expiry()}) + _install_secret_manager(monkeypatch, sm) + + with pytest.raises(ValueError, match="No access token is available"): + runtime_oauth.get_oauth_access_token("microsoft") + + assert sm.sync_calls[-1]["force"] is True + + +def test_get_oauth_access_token_forces_sync_when_token_is_expired(monkeypatch): + sm = _FakeSecretManager( + { + "MICROSOFT_ACCESS_TOKEN": "old-ms-token", + "MICROSOFT_TOKEN_EXPIRES_AT": _past_expiry(), + }, + ) + _install_secret_manager(monkeypatch, sm) + + def refresh_token() -> None: + sm.secrets["MICROSOFT_ACCESS_TOKEN"] = "fresh-ms-token" + sm.secrets["MICROSOFT_TOKEN_EXPIRES_AT"] = _future_expiry() + + sm.on_sync = refresh_token + + assert runtime_oauth.get_oauth_access_token("microsoft") == "fresh-ms-token" + assert sm.sync_calls[-1]["force"] is True + + +def test_get_oauth_access_token_forces_sync_when_expiry_is_invalid(monkeypatch): + sm = _FakeSecretManager( + { + "GOOGLE_ACCESS_TOKEN": "old-google-token", + "GOOGLE_TOKEN_EXPIRES_AT": "not-a-date", + }, + ) + _install_secret_manager(monkeypatch, sm) + + def refresh_token() -> None: + sm.secrets["GOOGLE_ACCESS_TOKEN"] = "fresh-google-token" + sm.secrets["GOOGLE_TOKEN_EXPIRES_AT"] = _future_expiry() + + sm.on_sync = refresh_token + + assert runtime_oauth.get_oauth_access_token("google") == "fresh-google-token" + assert sm.sync_calls[-1]["force"] is True + + +def test_get_oauth_access_token_supports_multiple_providers(monkeypatch): + sm = _FakeSecretManager( + { + "MICROSOFT_ACCESS_TOKEN": "fresh-ms-token", + "MICROSOFT_TOKEN_EXPIRES_AT": _future_expiry(), + "GOOGLE_ACCESS_TOKEN": "fresh-google-token", + "GOOGLE_TOKEN_EXPIRES_AT": _future_expiry(), + }, + ) + _install_secret_manager(monkeypatch, sm) + + assert runtime_oauth.get_oauth_access_token("microsoft") == "fresh-ms-token" + assert runtime_oauth.get_oauth_access_token("google") == "fresh-google-token" + + +def test_get_refresh_token_oauth_env_overlay_returns_all_current_values(monkeypatch): + sm = _FakeSecretManager( + { + "MICROSOFT_ACCESS_TOKEN": "fresh-ms-token", + "MICROSOFT_TOKEN_EXPIRES_AT": _future_expiry(), + "GOOGLE_ACCESS_TOKEN": "fresh-google-token", + "GOOGLE_TOKEN_EXPIRES_AT": _future_expiry(), + }, + ) + _install_secret_manager(monkeypatch, sm) + + overlay = runtime_oauth.get_refresh_token_oauth_env_overlay() + + assert overlay["MICROSOFT_ACCESS_TOKEN"] == "fresh-ms-token" + assert overlay["GOOGLE_ACCESS_TOKEN"] == "fresh-google-token" + assert sm.sync_calls[-1]["reason"] == "oauth_env_overlay" diff --git a/tests/function_manager/test_runtime_oauth_bridge.py b/tests/function_manager/test_runtime_oauth_bridge.py new file mode 100644 index 000000000..7570f31cd --- /dev/null +++ b/tests/function_manager/test_runtime_oauth_bridge.py @@ -0,0 +1,89 @@ +from __future__ import annotations + +from dataclasses import dataclass, field + +import pytest + +from unity.function_manager.function_manager import FunctionManager +from unity.function_manager import function_manager as function_manager_module + + +@dataclass +class _FakeSecretManager: + sync_reasons: list[str] = field(default_factory=list) + + def _get_secret_value(self, name: str) -> str | None: + values = { + "GOOGLE_ACCESS_TOKEN": "google-token", + "GOOGLE_TOKEN_EXPIRES_AT": "2999-01-01T00:00:00+00:00", + } + return values.get(name) + + def sync_assistant_secrets_if_stale(self, **kwargs) -> bool: + self.sync_reasons.append(kwargs["reason"]) + return True + + +@pytest.mark.asyncio +async def test_shell_runtime_oauth_token_helper_uses_parent_rpc(monkeypatch): + fake_secret_manager = _FakeSecretManager() + monkeypatch.setattr( + function_manager_module.ManagerRegistry, + "get_secret_manager", + lambda: fake_secret_manager, + ) + + fm = object.__new__(FunctionManager) + result = await fm.execute_shell_script( + implementation=( + "#!/bin/sh\n" + "token=$(unity-primitive runtime get_oauth_access_token " + "--provider google --min_ttl_seconds 42)\n" + 'if [ "$token" = \'"google-token"\' ]; then echo "TOKEN_OK"; fi\n' + ), + language="sh", + ) + + assert result["error"] is None + assert result["result"] == 0 + assert "TOKEN_OK" in result["stdout"] + assert fake_secret_manager.sync_reasons == ["oauth_access_token:google"] + + +def test_runtime_oauth_env_overlay_routes_through_runtime_helper(monkeypatch): + from unity.common import runtime_oauth + + monkeypatch.setattr( + runtime_oauth, + "get_refresh_token_oauth_env_overlay", + lambda: {"GOOGLE_ACCESS_TOKEN": "fresh-google-token"}, + ) + + fm = object.__new__(FunctionManager) + + assert fm._get_runtime_oauth_env_overlay() == { + "GOOGLE_ACCESS_TOKEN": "fresh-google-token", + } + + +def test_venv_runtime_oauth_helper_uses_parent_rpc(monkeypatch): + from unity.function_manager import venv_runner + + calls = [] + + def fake_rpc_call_sync(path, kwargs): + calls.append((path, kwargs)) + return "fresh-ms-token" + + monkeypatch.setattr(venv_runner, "rpc_call_sync", fake_rpc_call_sync) + + assert ( + venv_runner.get_oauth_access_token("microsoft", min_ttl_seconds=12) + == "fresh-ms-token" + ) + assert calls == [ + ( + "runtime.get_oauth_access_token", + {"provider": "microsoft", "min_ttl_seconds": 12}, + ), + ] diff --git a/tests/secret_manager/test_oauth_tokens.py b/tests/secret_manager/test_oauth_tokens.py new file mode 100644 index 000000000..0370f9cf6 --- /dev/null +++ b/tests/secret_manager/test_oauth_tokens.py @@ -0,0 +1,92 @@ +from __future__ import annotations + +import os +from threading import Lock + +from unity.secret_manager.secret_manager import SecretManager + + +def _unit_secret_manager() -> SecretManager: + sm = object.__new__(SecretManager) + sm._assistant_secret_sync_lock = Lock() + sm._last_assistant_secret_sync_success_at = None + sm._last_assistant_secret_sync_failure_at = None + return sm + + +def test_sync_assistant_secrets_if_stale_debounces(monkeypatch): + sm = _unit_secret_manager() + calls: list[str] = [] + + monkeypatch.setattr( + sm, + "_sync_assistant_secrets", + lambda: calls.append("assistant"), + ) + monkeypatch.setattr(sm, "_sync_dotenv", lambda: calls.append("dotenv")) + + assert sm.sync_assistant_secrets_if_stale(reason="test") is True + assert sm.sync_assistant_secrets_if_stale(reason="test") is False + + assert calls == ["assistant", "dotenv"] + + +def test_sync_assistant_secrets_if_stale_force_bypasses_debounce(monkeypatch): + sm = _unit_secret_manager() + calls: list[str] = [] + + monkeypatch.setattr( + sm, + "_sync_assistant_secrets", + lambda: calls.append("assistant"), + ) + monkeypatch.setattr(sm, "_sync_dotenv", lambda: calls.append("dotenv")) + + assert sm.sync_assistant_secrets_if_stale(reason="test") is True + assert sm.sync_assistant_secrets_if_stale(reason="test", force=True) is True + + assert calls == ["assistant", "dotenv", "assistant", "dotenv"] + + +def test_sync_assistant_secrets_if_stale_observes_failure_cooldown(monkeypatch): + sm = _unit_secret_manager() + calls = {"assistant": 0, "dotenv": 0} + + def fail_sync(): + calls["assistant"] += 1 + raise RuntimeError("sync failed") + + def sync_dotenv(): + calls["dotenv"] += 1 + + monkeypatch.setattr(sm, "_sync_assistant_secrets", fail_sync) + monkeypatch.setattr(sm, "_sync_dotenv", sync_dotenv) + + assert sm.sync_assistant_secrets_if_stale(reason="test") is False + assert sm.sync_assistant_secrets_if_stale(reason="test") is False + assert calls == {"assistant": 1, "dotenv": 0} + + +def test_resolve_secret_allowlist_includes_runtime_oauth_secret_names(): + allowlist = SecretManager._resolve_secret_allowlist() + + assert "MICROSOFT_ACCESS_TOKEN" in allowlist + assert "MICROSOFT_TOKEN_EXPIRES_AT" in allowlist + assert "GOOGLE_ACCESS_TOKEN" in allowlist + assert "GOOGLE_TOKEN_EXPIRES_AT" in allowlist + + +def test_env_merge_and_write_updates_dotenv_and_process_env(monkeypatch, tmp_path): + sm = _unit_secret_manager() + dotenv_path = tmp_path / ".env" + + monkeypatch.setattr(sm, "_dotenv_path", lambda: str(dotenv_path)) + monkeypatch.delenv("MICROSOFT_ACCESS_TOKEN", raising=False) + + sm._env_merge_and_write( + add_or_update={"MICROSOFT_ACCESS_TOKEN": "fresh-token"}, + remove_keys=None, + ) + + assert dotenv_path.read_text() == "MICROSOFT_ACCESS_TOKEN=fresh-token\n" + assert os.environ["MICROSOFT_ACCESS_TOKEN"] == "fresh-token" diff --git a/unity/actor/code_act_actor.py b/unity/actor/code_act_actor.py index 5938dbf79..b4a32b778 100644 --- a/unity/actor/code_act_actor.py +++ b/unity/actor/code_act_actor.py @@ -2025,6 +2025,15 @@ async def execute_code( - **session_created**: True if a new session was created by this call. - **duration_ms**: Execution duration in milliseconds. + Runtime credential helpers + -------------------------- + Python execution globals include + ``get_oauth_access_token(provider)`` for refresh-token backed OAuth + providers when a provider SDK, client, or direct HTTP request needs + an explicit access token. Static API keys and provider SDKs that + read credentials from the environment may still use ``os.environ`` + after checking available secret names. + For in-process Python execution with rich output, the result is wrapped in an ExecutionResult object (a Pydantic model implementing FormattedToolResult). """ @@ -2088,6 +2097,23 @@ async def _pub_safe(**payload: Any) -> None: notification_q = _notification_up_q sandbox_id = None try: + try: + from unity.manager_registry import ManagerRegistry + + # Keep generated code's normal environment-based credential + # path fresh at the execution boundary. The SecretManager + # gate is debounced, so repeated execute_code calls only pay + # a cheap timestamp check within the TTL window. + ManagerRegistry.get_secret_manager().sync_assistant_secrets_if_stale( + ttl_seconds=60.0, + reason="execute_code", + ) + except Exception: + logger.warning( + "execute_code assistant secret sync failed", + exc_info=True, + ) + _rs = self._resolve_session( state_mode=state_mode, language=str(language), diff --git a/unity/actor/execution/session.py b/unity/actor/execution/session.py index 1210ea137..ca9cecb19 100644 --- a/unity/actor/execution/session.py +++ b/unity/actor/execution/session.py @@ -11,6 +11,8 @@ import ast import contextvars import logging +import json +import shlex import sys import traceback import types @@ -40,6 +42,25 @@ logger = logging.getLogger(__name__) +def _with_shell_env_overlay( + command: str, + env_overlay: dict[str, str], + *, + language: str, +) -> str: + if not env_overlay: + return command + if language == "powershell": + assignments = "\n".join( + f"$env:{key} = {json.dumps(value)}" for key, value in env_overlay.items() + ) + else: + assignments = "\n".join( + f"export {key}={shlex.quote(value)}" for key, value in env_overlay.items() + ) + return f"{assignments}\n{command}" + + # --------------------------------------------------------------------------- # Type aliases # --------------------------------------------------------------------------- @@ -780,6 +801,22 @@ def _se_ms(): if computer_primitives is None: computer_primitives = self._computer_primitives + def _runtime_oauth_env_overlay() -> dict[str, str]: + # The parent execute_code boundary already performs the generic + # debounced secret sync. This overlay is the subprocess-specific + # bridge: venv and shell sessions may be long-lived, so they need + # current rotating OAuth env vars injected for each execution. + if self._function_manager is None: + return {} + getter = getattr( + self._function_manager, + "_get_runtime_oauth_env_overlay", + None, + ) + if getter is None: + return {} + return getter() + async def _execute_in_python_session( sb: PythonExecutionSession, ) -> Dict[str, Any]: @@ -881,6 +918,10 @@ async def _execute_in_python_session( # Wrap arbitrary code in a function definition so venv_runner can execute it. implementation = _wrap_code_as_async_function(code) if state_mode == "stateful": + # Persistent venv workers keep their process environment + # across calls. Pass the OAuth overlay so SDK/default-env + # credential paths see fresh access tokens without the actor + # manually exporting anything. out = await self._venv_pool.execute_in_venv( venv_id=int(venv_id), implementation=implementation, @@ -891,6 +932,7 @@ async def _execute_in_python_session( computer_primitives=computer_primitives, function_manager=self._function_manager, timeout=self._timeout, + env_overlay=_runtime_oauth_env_overlay(), ) return { **out, @@ -916,6 +958,9 @@ async def _execute_in_python_session( session_id=int(session_id), timeout=10.0, ) + # Read-only venv execution runs in a one-shot subprocess + # seeded from persistent state, but still receives the same + # runtime OAuth overlay before code executes. out = await self._function_manager.execute_in_venv( venv_id=int(venv_id), implementation=implementation, @@ -924,6 +969,7 @@ async def _execute_in_python_session( initial_state=initial_state, primitives=primitives, computer_primitives=computer_primitives, + env_overlay=_runtime_oauth_env_overlay(), ) return { **out, @@ -1047,11 +1093,21 @@ async def _execute_in_python_session( language=language, # type: ignore[arg-type] session_id=int(session_id), ) + # Shells are especially prone to stale env because exports persist + # inside the session. We both pass an env overlay to the pool and + # prepend explicit assignments to the command so the current command + # and future commands in the same shell agree on the refreshed token + # values. res = await self._shell_pool.execute( language=language, # type: ignore[arg-type] - command=code, + command=_with_shell_env_overlay( + code, + _runtime_oauth_env_overlay(), + language=str(language), + ), session_id=int(session_id), timeout=self._timeout, + env=_runtime_oauth_env_overlay(), ) return { "stdout": res.stdout, diff --git a/unity/actor/prompt_builders.py b/unity/actor/prompt_builders.py index 19e5ab627..a6d59f121 100644 --- a/unity/actor/prompt_builders.py +++ b/unity/actor/prompt_builders.py @@ -482,15 +482,29 @@ Cloud, `slack-sdk` for Slack, `boto3` for AWS, `stripe` for Stripe). 3. **Integrate**: Write Python code that uses the SDK with the stored - credentials to interact with the service. Credentials are synced to - environment variables via the `.env` file managed by SecretManager — - use `os.environ` to access them after confirming their names via - `primitives.secrets.ask(...)`. + credentials to interact with the service. Static credentials and + non-rotating API keys are synced to environment variables via the `.env` + file managed by SecretManager; use `os.environ` for those after + confirming their names via `primitives.secrets.ask(...)`. For provider + SDKs that can read OAuth credentials from environment variables, prefer + the SDK's normal/default credential behavior. When a provider SDK, + client, or direct HTTP request requires an explicit refresh-token backed + OAuth access token, use the sandbox helper + `get_oauth_access_token(provider)` instead of reading access-token env + vars directly. + + ```python + microsoft_token = get_oauth_access_token("microsoft") + google_token = get_oauth_access_token("google") + ``` 4. **Store for reuse**: After a successful integration, store reusable functions via `store_skills` and document the setup via `GuidanceManager_add_guidance` so future interactions can reuse the - integration without rediscovery. + integration without rediscovery. Reusable OAuth integrations should + call `get_oauth_access_token(provider)` at runtime only when an explicit + token is required; never store or capture a concrete access-token value + inside a function implementation. **Prefer Python SDKs over CLI tools.** Python packages benefit from full environment management (isolated venvs, dependency resolution via @@ -500,11 +514,11 @@ #### Checking OAuth Scope Before API Calls - Before making Google or Microsoft API calls that rely on - platform-managed OAuth tokens, check whether the scope you need - has been granted. `GOOGLE_GRANTED_SCOPES` and - `MICROSOFT_GRANTED_SCOPES` hold space-separated raw OAuth scope - strings — not feature names. Examples of what you will see: + Before making API calls that rely on platform-managed OAuth tokens, + check whether the scope you need has been granted when the provider has + a granted-scopes secret. For the built-in providers, `GOOGLE_GRANTED_SCOPES` + and `MICROSOFT_GRANTED_SCOPES` hold space-separated raw OAuth scope + strings — not feature names. Examples of what you will see: - Google: full URLs such as `https://www.googleapis.com/auth/drive` and @@ -833,8 +847,10 @@ def build_code_act_prompt( parts.append(_EXECUTION_RULES) parts.append(_SEMANTIC_REASONING_SELECTION) from unity.common.reasoning import get_reasoning_prompt_context + from unity.common.runtime_oauth import get_oauth_prompt_context parts.append(get_reasoning_prompt_context()) + parts.append(get_oauth_prompt_context()) parts.append(_INCREMENTAL_EXECUTION) parts.append(_EXTERNAL_APP_INTEGRATION) diff --git a/unity/common/runtime_oauth.py b/unity/common/runtime_oauth.py new file mode 100644 index 000000000..24b70ae57 --- /dev/null +++ b/unity/common/runtime_oauth.py @@ -0,0 +1,250 @@ +"""Runtime helpers for refresh-token backed OAuth credentials. + +SecretManager owns storage and synchronization: it mirrors allowlisted assistant +secrets from Orchestra into the local ``Secrets`` context, ``.env``, and +``os.environ``. This module owns the runtime interpretation of those mirrored +values: provider aliases, access-token/expiry secret names, freshness checks, +and the sandbox helper exposed to actor-written Python. + +The split is deliberate. ``get_oauth_access_token(...)`` is not a +``primitives.secrets`` tool and does not expose arbitrary secrets; it behaves +like ``reason(...)``/``notify(...)`` as a Python runtime helper for code paths +that must pass an explicit OAuth access token to an SDK/client/request. Code +that can rely on provider SDK/default environment credential behavior should +continue to do so; env overlays below keep rotating OAuth env vars fresh for +venv and shell backends. +""" + +import inspect +import os +from dataclasses import dataclass +from datetime import datetime, timezone + + +@dataclass(frozen=True) +class OAuthProviderMetadata: + """Runtime metadata for a refresh-token backed OAuth provider.""" + + canonical_name: str + aliases: tuple[str, ...] + access_token_secret: str + refresh_token_secret: str | None = None + expiry_secret: str | None = None + granted_scopes_secret: str | None = None + docs_label: str = "" + + @property + def secret_names(self) -> frozenset[str]: + return frozenset( + name + for name in ( + self.access_token_secret, + self.refresh_token_secret, + self.expiry_secret, + self.granted_scopes_secret, + ) + if name + ) + + +_OAUTH_PROVIDER_METADATA: dict[str, OAuthProviderMetadata] = { + "google": OAuthProviderMetadata( + canonical_name="google", + aliases=("google", "gmail", "google_workspace", "drive"), + access_token_secret="GOOGLE_ACCESS_TOKEN", + refresh_token_secret="GOOGLE_REFRESH_TOKEN", + expiry_secret="GOOGLE_TOKEN_EXPIRES_AT", + granted_scopes_secret="GOOGLE_GRANTED_SCOPES", + docs_label="Google APIs", + ), + "microsoft": OAuthProviderMetadata( + canonical_name="microsoft", + aliases=("microsoft", "msft", "ms365", "microsoft_365", "graph"), + access_token_secret="MICROSOFT_ACCESS_TOKEN", + refresh_token_secret="MICROSOFT_REFRESH_TOKEN", + expiry_secret="MICROSOFT_TOKEN_EXPIRES_AT", + granted_scopes_secret="MICROSOFT_GRANTED_SCOPES", + docs_label="Microsoft Graph", + ), +} +_OAUTH_PROVIDER_ALIASES: dict[str, str] = { + alias.strip().lower().replace("-", "_"): metadata.canonical_name + for metadata in _OAUTH_PROVIDER_METADATA.values() + for alias in metadata.aliases +} + + +def _resolve_oauth_provider(provider: str) -> OAuthProviderMetadata: + if not isinstance(provider, str) or not provider.strip(): + supported = ", ".join(sorted(_OAUTH_PROVIDER_METADATA)) + raise ValueError( + "A refresh-token OAuth provider name is required. " + f"Supported providers: {supported}", + ) + normalized = provider.strip().lower().replace("-", "_") + canonical = _OAUTH_PROVIDER_ALIASES.get(normalized, normalized) + metadata = _OAUTH_PROVIDER_METADATA.get(canonical) + if metadata is None: + supported = ", ".join(sorted(_OAUTH_PROVIDER_METADATA)) + raise ValueError( + f"Unknown refresh-token OAuth provider {provider!r}. " + f"Supported providers: {supported}", + ) + return metadata + + +def refresh_token_oauth_secret_names() -> frozenset[str]: + names: set[str] = set() + for metadata in _OAUTH_PROVIDER_METADATA.values(): + names.update(metadata.secret_names) + return frozenset(names) + + +def _get_secret_manager(): + from unity.manager_registry import ManagerRegistry + + return ManagerRegistry.get_secret_manager() + + +def _get_secret_value(secret_manager, name: str) -> str | None: + getter = getattr(secret_manager, "_get_secret_value", None) + if callable(getter): + value = getter(name) + if isinstance(value, str) and value: + return value + value = os.environ.get(name) + return value if value else None + + +def _parse_expiry(value: str) -> datetime: + if value.isdigit(): + return datetime.fromtimestamp(int(value), tz=timezone.utc) + normalized = value.replace("Z", "+00:00") + parsed = datetime.fromisoformat(normalized) + if parsed.tzinfo is None: + return parsed.replace(tzinfo=timezone.utc) + return parsed.astimezone(timezone.utc) + + +def _token_expires_within( + secret_manager, + metadata: OAuthProviderMetadata, + min_ttl_seconds: int, +) -> bool: + if metadata.expiry_secret is None: + return False + expiry_value = _get_secret_value(secret_manager, metadata.expiry_secret) + if not expiry_value: + return True + try: + expiry = _parse_expiry(expiry_value) + except ValueError: + return True + remaining = (expiry - datetime.now(timezone.utc)).total_seconds() + return remaining <= min_ttl_seconds + + +def get_oauth_access_token(provider: str, *, min_ttl_seconds: int = 300) -> str: + """ + Return a current OAuth access token for a refresh-token backed provider. + + Use this runtime helper inside generated Python code when a provider SDK, + client, or direct HTTP request requires an explicit access token. Prefer + provider SDK/default credential behavior when it can read credentials from + the runtime environment directly; Unity keeps rotating OAuth env vars + synced separately for that path. + + Parameters + ---------- + provider: + Provider name or alias. Built-in aliases include ``"microsoft"``, + ``"graph"``, ``"google"``, ``"gmail"``, and ``"drive"``. + min_ttl_seconds: + Minimum acceptable token lifetime. If the current token is missing or + expires within this many seconds, the parent runtime forces an + assistant-secret sync from Orchestra before returning a token. + + Examples + -------- + Multiple providers can be used in one sandbox; request each explicitly:: + + microsoft_token = get_oauth_access_token("microsoft") + google_token = get_oauth_access_token("google") + + For direct OAuth2 HTTP APIs such as Microsoft Graph, provider docs commonly + show the access token in an ``Authorization: Bearer ...`` header. Other SDKs + may require a credential object or may read environment variables directly, + so follow the provider's SDK/API docs for how to apply the token. + + Anti-patterns + ------------- + - Do not print, log, return, or store the token value. + - Do not save concrete token values in FunctionManager functions or + GuidanceManager guidance. + - Do not read rotating OAuth access-token env vars directly when this + helper is available and an explicit access token is required. + """ + metadata = _resolve_oauth_provider(provider) + secret_manager = _get_secret_manager() + token = _get_secret_value(secret_manager, metadata.access_token_secret) + needs_force_sync = token is None or _token_expires_within( + secret_manager, + metadata, + min_ttl_seconds, + ) + secret_manager.sync_assistant_secrets_if_stale( + ttl_seconds=60.0, + force=needs_force_sync, + reason=f"oauth_access_token:{metadata.canonical_name}", + ) + token = _get_secret_value(secret_manager, metadata.access_token_secret) + if not token: + raise ValueError( + f"No access token is available for refresh-token OAuth provider " + f"{metadata.canonical_name!r}.", + ) + if _token_expires_within(secret_manager, metadata, min_ttl_seconds): + raise ValueError( + f"The access token for refresh-token OAuth provider " + f"{metadata.canonical_name!r} is expired or near expiry after sync.", + ) + return token + + +def get_refresh_token_oauth_env_overlay() -> dict[str, str]: + """Return fresh rotating OAuth env vars for subprocess execution backends. + + Venv and persistent shell sessions can outlive the parent process's last + environment update, so they cannot rely solely on the environment copied at + process start. This helper performs the debounced assistant-secret sync, + then returns only the built-in refresh-token OAuth variables that should be + overlaid into those subprocesses before execution. + """ + secret_manager = _get_secret_manager() + secret_manager.sync_assistant_secrets_if_stale( + ttl_seconds=60.0, + reason="oauth_env_overlay", + ) + overlay: dict[str, str] = {} + for name in refresh_token_oauth_secret_names(): + value = _get_secret_value(secret_manager, name) + if value: + overlay[name] = value + return overlay + + +def get_oauth_prompt_context() -> str: + """Return actor-facing documentation for OAuth runtime helpers.""" + doc = inspect.getdoc(get_oauth_access_token) or "" + signature = ( + f"def {get_oauth_access_token.__name__}" + f"{inspect.signature(get_oauth_access_token)}" + ) + return ( + "### OAuth Access Token Helper: `get_oauth_access_token(...)`\n\n" + "`get_oauth_access_token(...)` is available inside `execute_code` " + "Python sessions and stored Python functions. It is a normal sandbox " + "helper, not a JSON tool call.\n\n" + f"```python\n{signature}\n```\n\n" + f"{doc}" + ) diff --git a/unity/conversation_manager/domains/managers_utils.py b/unity/conversation_manager/domains/managers_utils.py index 944f27161..3d6e7b971 100644 --- a/unity/conversation_manager/domains/managers_utils.py +++ b/unity/conversation_manager/domains/managers_utils.py @@ -1211,7 +1211,7 @@ async def sync_assistant_secrets() -> None: from unity.manager_registry import ManagerRegistry sm = ManagerRegistry.get_secret_manager() - sm._sync_assistant_secrets() + sm.sync_assistant_secrets_if_stale(force=True, reason="assistant_update") # Contact updates diff --git a/unity/function_manager/execution_env.py b/unity/function_manager/execution_env.py index 799639282..adde67f17 100644 --- a/unity/function_manager/execution_env.py +++ b/unity/function_manager/execution_env.py @@ -259,11 +259,13 @@ async def my_workflow(goal: str) -> SteerableToolHandle: ) from unity.common.llm_client import new_llm_client from unity.common.reasoning import reason + from unity.common.runtime_oauth import get_oauth_access_token globals_dict["SteerableToolHandle"] = SteerableToolHandle globals_dict["start_async_tool_loop"] = start_async_tool_loop globals_dict["new_llm_client"] = new_llm_client globals_dict["reason"] = reason + globals_dict["get_oauth_access_token"] = get_oauth_access_token globals_dict["unillm"] = unillm return globals_dict diff --git a/unity/function_manager/function_manager.py b/unity/function_manager/function_manager.py index 64868adbf..9c25ce08d 100644 --- a/unity/function_manager/function_manager.py +++ b/unity/function_manager/function_manager.py @@ -414,6 +414,7 @@ async def execute( primitives: Optional[Any] = None, computer_primitives: Optional[Any] = None, timeout: Optional[float] = None, + env_overlay: Optional[Dict[str, str]] = None, ) -> dict: """ Execute a function in the persistent venv subprocess. @@ -446,6 +447,7 @@ async def execute( "implementation": implementation, "call_kwargs": call_kwargs, "is_async": is_async, + "env_overlay": env_overlay or {}, }, ) @@ -507,6 +509,16 @@ async def _handle_rpc_call( result, ), } + if namespace == "runtime" and method == "get_oauth_access_token": + from unity.common.runtime_oauth import get_oauth_access_token + + provider = kwargs.get("provider") + min_ttl_seconds = int(kwargs.get("min_ttl_seconds", 300)) + result = get_oauth_access_token( + provider, + min_ttl_seconds=min_ttl_seconds, + ) + return {"type": "rpc_result", "id": request_id, "result": result} if namespace == "computer" and computer_primitives is not None: fn = getattr(computer_primitives, method, None) elif primitives is not None: @@ -709,6 +721,7 @@ async def execute_in_venv( computer_primitives: Optional[Any] = None, function_manager: "FunctionManager", timeout: Optional[float] = None, + env_overlay: Optional[Dict[str, str]] = None, ) -> dict: """ Execute a function in a persistent venv subprocess. @@ -751,6 +764,7 @@ async def execute_in_venv( primitives=primitives, computer_primitives=computer_primitives, timeout=timeout, + env_overlay=env_overlay, ) # Update last_used best-effort md = self._metadata.get(key) @@ -779,6 +793,7 @@ async def execute_in_venv( primitives=primitives, computer_primitives=computer_primitives, timeout=timeout, + env_overlay=env_overlay, ) raise @@ -1646,6 +1661,23 @@ def __init__( # Dict[session_id, Dict[str, Any]] - persistent globals per session self._in_process_sessions: Dict[int, Dict[str, Any]] = {} + def _get_runtime_oauth_env_overlay(self) -> Dict[str, str]: + """Build the rotating OAuth env overlay for venv/shell execution. + + This is intentionally routed through ``unity.common.runtime_oauth`` + rather than SecretManager so provider metadata, expiry semantics, and + runtime helper behavior stay in one place. Failures should not block + unrelated function execution; explicit token calls can still surface a + provider-specific error when the actor really needs a token. + """ + try: + from unity.common.runtime_oauth import get_refresh_token_oauth_env_overlay + + return get_refresh_token_oauth_env_overlay() + except Exception: + logger.warning("Failed to build OAuth env overlay", exc_info=True) + return {} + @property def primitive_scope(self) -> PrimitiveScope: """The scope controlling which managers' primitives are accessible.""" @@ -4607,6 +4639,16 @@ async def _handle_rpc_call( return self._make_json_serializable(await reason(**kwargs)) + if manager_name == "runtime" and method_name == "get_oauth_access_token": + from unity.common.runtime_oauth import get_oauth_access_token + + provider = kwargs.get("provider") + min_ttl_seconds = int(kwargs.get("min_ttl_seconds", 300)) + return get_oauth_access_token( + provider, + min_ttl_seconds=min_ttl_seconds, + ) + # Handle computer primitives if manager_name == "computer": if computer_primitives is None: @@ -4649,6 +4691,7 @@ async def execute_in_venv( initial_state: Optional[Dict[str, Any]] = None, primitives: Optional[Any] = None, computer_primitives: Optional[Any] = None, + env_overlay: Optional[Dict[str, str]] = None, ) -> Dict[str, Any]: """ Execute a function implementation in a custom virtual environment. @@ -4688,6 +4731,7 @@ async def execute_in_venv( "implementation": implementation, "call_kwargs": call_kwargs, "is_async": is_async, + "env_overlay": env_overlay or self._get_runtime_oauth_env_overlay(), } if initial_state is not None: execute_payload["initial_state"] = initial_state diff --git a/unity/function_manager/venv_runner.py b/unity/function_manager/venv_runner.py index 4c4b62481..160e2d5b0 100644 --- a/unity/function_manager/venv_runner.py +++ b/unity/function_manager/venv_runner.py @@ -30,6 +30,7 @@ import asyncio import io import json +import os import signal import sys import threading @@ -295,6 +296,26 @@ async def reason( return result +def get_oauth_access_token(provider: str, *, min_ttl_seconds: int = 300) -> str: + """ + Return a current OAuth access token for a refresh-token backed provider. + + Custom virtual environments run in a child process whose environment can + be older than the parent Unity worker. This helper calls the parent process + over JSON-RPC so rotating OAuth access tokens are read from the current + assistant secret state instead of the child process's inherited env. + + Examples + -------- + ``token = get_oauth_access_token("microsoft")`` + ``token = get_oauth_access_token("google")`` + """ + return rpc_call_sync( + "runtime.get_oauth_access_token", + {"provider": provider, "min_ttl_seconds": min_ttl_seconds}, + ) + + # ──────────────────────────────────────────────────────────────────────────── # Execution Environment # ──────────────────────────────────────────────────────────────────────────── @@ -413,6 +434,7 @@ def create_safe_globals(is_async: bool = True): # Primitives proxy (computer and actor accessible via primitives.computer.* etc.) "primitives": PrimitivesProxy(is_async=is_async), "reason": reason, + "get_oauth_access_token": get_oauth_access_token, } # Try to add pydantic if available in this venv @@ -736,6 +758,21 @@ def inject_state_into_globals(state: dict, globals_dict: dict) -> None: pass +def apply_env_overlay(env_overlay: dict | None) -> None: + """Apply parent-supplied runtime env updates inside the child process. + + The venv runner can be a long-lived subprocess, so inherited environment + variables may be older than Unity's parent runtime. The parent sends only + the runtime overlay needed for execution, currently rotating OAuth token + variables, before each function call. + """ + if not env_overlay: + return + for key, value in env_overlay.items(): + if isinstance(key, str) and isinstance(value, str): + os.environ[key] = value + + # ──────────────────────────────────────────────────────────────────────────── # Main Entry Point # ──────────────────────────────────────────────────────────────────────────── @@ -864,6 +901,7 @@ def main(): call_kwargs = input_data.get("call_kwargs", {}) is_async = input_data.get("is_async", False) initial_state = input_data.get("initial_state") + apply_env_overlay(input_data.get("env_overlay")) # Execute with RPC support, optionally with initial state result = run_with_rpc_loop( @@ -1030,6 +1068,7 @@ def main_server(): implementation = input_data.get("implementation", "") call_kwargs = input_data.get("call_kwargs", {}) is_async = input_data.get("is_async", True) + apply_env_overlay(input_data.get("env_overlay")) # Execute with RPC support using persistent globals result = run_server_with_rpc_loop( diff --git a/unity/secret_manager/secret_manager.py b/unity/secret_manager/secret_manager.py index 3269fd875..487c4f037 100644 --- a/unity/secret_manager/secret_manager.py +++ b/unity/secret_manager/secret_manager.py @@ -4,6 +4,8 @@ import functools import logging import os +from threading import Lock +from time import monotonic from typing import Any, Callable, Dict, List, Optional, Type from pydantic import BaseModel @@ -57,6 +59,9 @@ def __init__(self) -> None: super().__init__() self.include_in_multi_assistant_table = True self._ctx = ContextRegistry.get_context(self, "Secrets") + self._assistant_secret_sync_lock = Lock() + self._last_assistant_secret_sync_success_at: float | None = None + self._last_assistant_secret_sync_failure_at: float | None = None # Ensure storage/schema exists deterministically (idempotent) self._provision_storage() @@ -212,22 +217,37 @@ def _default_update_tool_policy( # the built-in set above. OAUTH_SECRET_ALLOWLIST = _BUILTIN_OAUTH_SECRET_ALLOWLIST - def _sync_assistant_secrets(self) -> None: - """Pull Google / Microsoft OAuth tokens from Orchestra's - ``AssistantSecret`` table into the assistant's local ``Secrets`` - context. + @classmethod + def _resolve_secret_allowlist(cls) -> frozenset[str]: + """Return assistant-secret names owned by runtime OAuth sync. - Communication writes those tokens via REST (``/assistant/{id}/secret``) - from the OAuth callback. This sync mirrors them locally so the - Actor can use them in code-first plans, and writes them to - ``os.environ`` via ``_env_set`` so subprocesses see them too. + The set is intentionally limited to refresh-token OAuth metadata. + Console-pasted integration credentials already live in the local + ``Secrets`` context and reach ``os.environ`` through ``_sync_dotenv``. + """ + try: + from unity.common.runtime_oauth import refresh_token_oauth_secret_names - **Scope is intentionally narrow.** Console-pasted integration - secrets live in the ``/Secrets`` context directly; they reach env - via :meth:`_sync_dotenv`. This method does not know or care about - them — see ``_BUILTIN_OAUTH_SECRET_ALLOWLIST``. + return ( + cls._BUILTIN_OAUTH_SECRET_ALLOWLIST | refresh_token_oauth_secret_names() + ) + except Exception: + return cls._BUILTIN_OAUTH_SECRET_ALLOWLIST - Best-effort: failures are logged and silently swallowed. + def _sync_assistant_secrets(self) -> None: + """Mirror runtime OAuth assistant secrets from Orchestra into local state. + + Orchestra is the platform source of truth for assistant-level OAuth + secrets written outside this Unity process. Communication refresh jobs + persist updated access tokens there; this method pulls those values into + Unity's local ``Secrets`` context, then updates ``.env``/``os.environ`` + so generated code and provider SDKs can use normal environment-based + credential discovery. + + The sync is intentionally allowlisted. We mirror refresh-token OAuth + keys, but we do not copy arbitrary assistant secrets into the runtime. + Failures are best-effort: callers use ``sync_assistant_secrets_if_stale`` + as the observable gate. """ from ..session_details import SESSION_DETAILS @@ -259,10 +279,10 @@ def _sync_assistant_secrets(self) -> None: except Exception: return - # Allowlist is intentionally OAuth-only — see the - # ``_BUILTIN_OAUTH_SECRET_ALLOWLIST`` docstring above for why - # integration secrets do NOT flow through this sync. - active_allowlist = self._BUILTIN_OAUTH_SECRET_ALLOWLIST + # Allowlist is intentionally OAuth-only; integration secrets do not flow + # through this sync because they already live in the local Secrets + # context and are exported by _sync_dotenv. + active_allowlist = self._resolve_secret_allowlist() written = 0 for name, value in secrets_dict.items(): @@ -307,17 +327,10 @@ def _sync_assistant_secrets(self) -> None: written, ) - # Stale-cleanup: only the built-in Google / Microsoft OAuth keys - # are owned by THIS sync, so only those may be deleted when - # missing from Orchestra's response. Console-pasted secrets - # (HubSpot, Matterport, etc.) and OAuth-managed integration - # tokens (EMPLOYMENTHERO_REFRESH_TOKEN, etc.) live in the same - # local Secrets context but are NOT this sync's responsibility, - # so they must not be cleaned up here just because Orchestra's - # secrets payload omits them — that would silently wipe valid - # user state every time the admin endpoint returned a partial - # or stripped response. - for stale_name in self._BUILTIN_OAUTH_SECRET_ALLOWLIST - secrets_dict.keys(): + # Stale-cleanup is limited to the OAuth secrets owned by this sync. + # Console-pasted integration credentials live in the same local Secrets + # context but are not removed based on the admin assistant payload. + for stale_name in active_allowlist - secrets_dict.keys(): try: ids = unify.get_logs( context=self._ctx, @@ -331,6 +344,89 @@ def _sync_assistant_secrets(self) -> None: except Exception: continue + def sync_assistant_secrets_if_stale( + self, + ttl_seconds: float = 60.0, + *, + force: bool = False, + reason: str = "runtime", + failure_cooldown_seconds: float = 10.0, + ) -> bool: + """Pull assistant secrets through one debounced runtime sync gate. + + This is the single runtime entry point for keeping Unity's local secret + state close to Orchestra without adding a network round trip to every + actor operation. Normal callers, including ``execute_code``, call with + ``force=False`` and therefore only perform the expensive Orchestra pull + once per ``ttl_seconds`` window. Forced callers use this when freshness + matters more than debounce, such as SecretManager construction, + ``primitives.secrets.ask(...)``, assistant-update events, or an OAuth + helper detecting a missing/near-expiry access token. + + Returns ``True`` only when this invocation actually ran the sync work. + Returns ``False`` when the success debounce or failure cooldown skipped + work, or when the wrapped sync raised an exception. + """ + now = monotonic() + if not force: + last_success = self._last_assistant_secret_sync_success_at + if last_success is not None and now - last_success < ttl_seconds: + return False + last_failure = self._last_assistant_secret_sync_failure_at + if ( + last_failure is not None + and now - last_failure < failure_cooldown_seconds + ): + return False + + with self._assistant_secret_sync_lock: + now = monotonic() + if not force: + last_success = self._last_assistant_secret_sync_success_at + if last_success is not None and now - last_success < ttl_seconds: + return False + last_failure = self._last_assistant_secret_sync_failure_at + if ( + last_failure is not None + and now - last_failure < failure_cooldown_seconds + ): + return False + try: + self._sync_assistant_secrets() + self._sync_dotenv() + except Exception: + self._last_assistant_secret_sync_failure_at = monotonic() + logger.warning( + "[integrations] assistant secret sync failed reason=%s", + reason, + exc_info=True, + ) + return False + self._last_assistant_secret_sync_success_at = monotonic() + self._last_assistant_secret_sync_failure_at = None + logger.info( + "[integrations] assistant secret sync complete reason=%s", + reason, + ) + return True + + def _get_secret_value(self, name: str) -> str | None: + try: + rows = unify.get_logs( + context=self._ctx, + filter=f"name == {name!r}", + limit=1, + from_fields=["name", "value"], + ) + if rows: + value = (rows[0].entries or {}).get("value") + if isinstance(value, str) and value: + return value + except Exception: + pass + value = os.environ.get(name) + return value if value else None + # --------------------- Internal helpers (.env sync) --------------------- # def _dotenv_path(self) -> str: """Return the path to the .env file used for local sync. @@ -382,8 +478,7 @@ def _ensure_dotenv_synced_on_init(self) -> None: with open(path, "w", encoding="utf-8") as fh: fh.write("") - self._sync_assistant_secrets() - self._sync_dotenv() + self.sync_assistant_secrets_if_stale(force=True, reason="secret_manager_init") @staticmethod def _parse_env_lines(lines: List[str]) -> Dict[str, int]: @@ -536,17 +631,7 @@ async def ask( _clarification_down_q: Optional[asyncio.Queue[str]] = None, _call_id: Optional[str] = None, ) -> SteerableToolHandle: - # Pull OAuth tokens from Orchestra → Secrets context, then sync - # all secrets (including freshly-synced OAuth tokens) into .env - # so they're available through os.environ before the Actor reads them. - try: - self._sync_assistant_secrets() - except Exception: - pass - try: - self._sync_dotenv() - except Exception: - pass + self.sync_assistant_secrets_if_stale(force=True, reason="secret_ask") # First, replace any known raw secret values with placeholders try: diff --git a/unity/spending_limits.py b/unity/spending_limits.py index 2edbc35bf..277d59b8a 100644 --- a/unity/spending_limits.py +++ b/unity/spending_limits.py @@ -376,11 +376,7 @@ def _to_limit_type(type_str: Optional[str]) -> Optional[LimitType]: # legacy gate would block every call. Skip it for METERED, keep it # for CREDITS (and for the no-billing-mode-yet legacy case so we # don't loosen the gate during a partial Orchestra rollout). - if ( - billing_mode != "METERED" - and credit_balance is not None - and credit_balance <= 0 - ): + if billing_mode != "METERED" and credit_balance is not None and credit_balance <= 0: return LimitCheckResponse( allowed=False, reason=(