diff --git a/src/google/adk/agents/config_agent_utils.py b/src/google/adk/agents/config_agent_utils.py index 82aaa6e452..d15fd038db 100644 --- a/src/google/adk/agents/config_agent_utils.py +++ b/src/google/adk/agents/config_agent_utils.py @@ -105,10 +105,91 @@ def _load_config_from_path(config_path: str) -> AgentConfig: return AgentConfig.model_validate(config_data) +_ENFORCE_DENYLIST = True + +# Modules that must never be imported via YAML agent configuration. +# These provide direct access to the operating system, process execution, +# or dynamic code evaluation and could be abused to achieve arbitrary +# code execution when referenced in callback, tool, schema, or model +# code-reference fields. +_BLOCKED_MODULES = frozenset({ + # Process / OS execution + "os", + "subprocess", + "sys", + "builtins", + "importlib", + "shutil", + "signal", + "multiprocessing", + "threading", + # Dynamic code evaluation + "code", + "codeop", + "compileall", + "runpy", + # Native / unsafe extensions + "ctypes", + # Network access + "socket", + "http", + "urllib", + "ftplib", + "smtplib", + "poplib", + "imaplib", + "nntplib", + "telnetlib", + "xmlrpc", + "asyncio", + # Filesystem / serialisation + "tempfile", + "pathlib", + "shelve", + "pickle", + "marshal", + # Interactive / side-effect modules + "webbrowser", + "antigravity", + "pty", + "commands", + "pdb", + "profile", +}) + + +def _validate_module_reference(fully_qualified_name: str) -> None: + """Validate that a module reference does not target a blocked module. + + Args: + fully_qualified_name: The fully-qualified Python name to validate + (e.g. ``"my_package.my_module.my_func"``). + + Raises: + ValueError: If the top-level module is in ``_BLOCKED_MODULES``. + """ + if not _ENFORCE_DENYLIST: + return + # Extract the top-level package from the fully-qualified name. + top_module = fully_qualified_name.split(".")[0] + if top_module in _BLOCKED_MODULES: + raise ValueError( + f"Blocked module reference: {fully_qualified_name!r}. " + f"Importing from the '{top_module}' module is not allowed in " + "agent configurations because it can execute arbitrary code." + ) + + +def _set_enforce_denylist(value: bool) -> None: + global _ENFORCE_DENYLIST + _ENFORCE_DENYLIST = value + + @experimental(FeatureName.AGENT_CONFIG) def resolve_fully_qualified_name(name: str) -> Any: try: module_path, obj_name = name.rsplit(".", 1) + _validate_module_reference(name) module = importlib.import_module(module_path) return getattr(module, obj_name) except Exception as e: @@ -160,6 +241,7 @@ def _resolve_agent_code_reference(code: str) -> Any: if "." not in code: raise ValueError(f"Invalid code reference: {code}") + _validate_module_reference(code) module_path, obj_name = code.rsplit(".", 1) module = importlib.import_module(module_path) obj = getattr(module, obj_name) @@ -189,6 +271,7 @@ def resolve_code_reference(code_config: CodeConfig) -> Any: if not code_config or not code_config.name: raise ValueError("Invalid CodeConfig.") + _validate_module_reference(code_config.name) module_path, obj_name = code_config.name.rsplit(".", 1) module = importlib.import_module(module_path) return getattr(module, obj_name) diff --git a/src/google/adk/agents/llm_agent.py b/src/google/adk/agents/llm_agent.py index ee1b05c535..a09453fc4a 100644 --- a/src/google/adk/agents/llm_agent.py +++ b/src/google/adk/agents/llm_agent.py @@ -1044,6 +1044,9 @@ def _resolve_tools( obj = getattr(module, tool_config.name) else: # User-defined tools + from .config_agent_utils import _validate_module_reference + + _validate_module_reference(tool_config.name) module_path, obj_name = tool_config.name.rsplit('.', 1) module = importlib.import_module(module_path) obj = getattr(module, obj_name) diff --git a/tests/unittests/agents/test_agent_config.py b/tests/unittests/agents/test_agent_config.py index 99ee1d8401..611835b9bf 100644 --- a/tests/unittests/agents/test_agent_config.py +++ b/tests/unittests/agents/test_agent_config.py @@ -465,3 +465,118 @@ def fake_from_config(path: str): ) assert result == "sentinel" assert recorded["path"] == expected_path + +# --- Security tests: module blocklist for YAML agent config code references --- + + +def test_resolve_code_reference_blocks_os_when_enforced(): + """Verify resolve_code_reference blocks os module directly.""" + from google.adk.agents.common_configs import CodeConfig + + with pytest.raises(ValueError, match="Blocked module reference"): + config_agent_utils.resolve_code_reference( + CodeConfig(name="os.system") + ) + + +def test_resolve_fully_qualified_name_blocks_subprocess_when_enforced(): + """Verify resolve_fully_qualified_name blocks subprocess module. + + resolve_fully_qualified_name wraps all exceptions in + ValueError("Invalid fully qualified name: ..."), so we check the wrapper + and verify the __cause__ carries the blocklist message. + """ + with pytest.raises(ValueError, match="Invalid fully qualified name") as exc_info: + config_agent_utils.resolve_fully_qualified_name("subprocess.Popen") + assert "Blocked module reference" in str(exc_info.value.__cause__) + + +def test_allowed_module_passes_when_enforced(tmp_path: Path): + """Verify that google.adk modules are NOT blocked by the module denylist.""" + # This should NOT raise — google.adk modules must remain allowed + result = config_agent_utils.resolve_fully_qualified_name( + "google.adk.agents.llm_agent.LlmAgent" + ) + assert result is LlmAgent + + +@pytest.mark.parametrize( + "blocked_module", + [ + "os.system", + "subprocess.call", + "builtins.exec", + ], +) +def test_resolve_agent_code_reference_blocks_when_enforced( + blocked_module: str, +): + """Verify _resolve_agent_code_reference blocks dangerous modules.""" + with pytest.raises(ValueError, match="Blocked module reference"): + config_agent_utils._resolve_agent_code_reference(blocked_module) + + +@pytest.mark.parametrize( + "blocked_ref", + [ + "os.system", + "subprocess.call", + "builtins.exec", + "pickle.loads", + ], +) +def test_resolve_tools_blocks_dangerous_modules(blocked_ref: str): + """Verify _resolve_tools blocks dangerous modules for user-defined tools.""" + from google.adk.agents.llm_agent import LlmAgent + from google.adk.tools.tool_configs import ToolConfig + + tool_config = ToolConfig(name=blocked_ref) + with pytest.raises(ValueError, match="Blocked module reference"): + LlmAgent._resolve_tools([tool_config], "/fake/path.yaml") + + +def test_resolve_tools_allows_builtin_adk_tools(): + """Verify _resolve_tools allows ADK built-in tools (no dot in name).""" + from google.adk.agents.llm_agent import LlmAgent + from google.adk.tools.tool_configs import ToolConfig + + # Built-in tools have no dot — they import from google.adk.tools + tool_config = ToolConfig(name="google_search") + # Should NOT raise — this is a safe, hardcoded import path + resolved = LlmAgent._resolve_tools([tool_config], "/fake/path.yaml") + assert len(resolved) == 1 + + +@pytest.mark.parametrize( + "blocked_ref", + [ + "ftplib.FTP", + "smtplib.SMTP", + "xmlrpc.client", + "telnetlib.Telnet", + "poplib.POP3", + "imaplib.IMAP4", + "asyncio.run", + "pathlib.Path", + ], +) +def test_newly_blocked_network_modules_are_rejected(blocked_ref: str): + """Verify newly added network-capable modules are blocked. + + resolve_fully_qualified_name wraps errors, so we check the cause. + """ + with pytest.raises(ValueError, match="Invalid fully qualified name") as exc_info: + config_agent_utils.resolve_fully_qualified_name(blocked_ref) + assert "Blocked module reference" in str(exc_info.value.__cause__) + + +def test_denylist_can_be_disabled(): + """Verify _set_enforce_denylist(False) disables module blocking.""" + config_agent_utils._set_enforce_denylist(False) + try: + # os.getcwd is a real, importable reference — should succeed + result = config_agent_utils.resolve_fully_qualified_name("os.getcwd") + assert callable(result) + finally: + config_agent_utils._set_enforce_denylist(True) +