Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions src/google/adk/agents/config_agent_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,91 @@ def _load_config_from_path(config_path: str) -> AgentConfig:
return AgentConfig.model_validate(config_data)


_ENFORCE_DENYLIST = True

# Modules that must never be imported via YAML agent configuration.
# These provide direct access to the operating system, process execution,
# or dynamic code evaluation and could be abused to achieve arbitrary
# code execution when referenced in callback, tool, schema, or model
# code-reference fields.
_BLOCKED_MODULES = frozenset({
# Process / OS execution
"os",
"subprocess",
"sys",
"builtins",
"importlib",
"shutil",
"signal",
"multiprocessing",
"threading",
# Dynamic code evaluation
"code",
"codeop",
"compileall",
"runpy",
# Native / unsafe extensions
"ctypes",
# Network access
"socket",
"http",
"urllib",
"ftplib",
"smtplib",
"poplib",
"imaplib",
"nntplib",
"telnetlib",
"xmlrpc",
"asyncio",
# Filesystem / serialisation
"tempfile",
"pathlib",
"shelve",
"pickle",
"marshal",
# Interactive / side-effect modules
"webbrowser",
"antigravity",
"pty",
"commands",
"pdb",
"profile",
})


def _validate_module_reference(fully_qualified_name: str) -> None:
"""Validate that a module reference does not target a blocked module.

Args:
fully_qualified_name: The fully-qualified Python name to validate
(e.g. ``"my_package.my_module.my_func"``).

Raises:
ValueError: If the top-level module is in ``_BLOCKED_MODULES``.
"""
if not _ENFORCE_DENYLIST:
return
# Extract the top-level package from the fully-qualified name.
top_module = fully_qualified_name.split(".")[0]
if top_module in _BLOCKED_MODULES:
raise ValueError(
f"Blocked module reference: {fully_qualified_name!r}. "
f"Importing from the '{top_module}' module is not allowed in "
"agent configurations because it can execute arbitrary code."
)


def _set_enforce_denylist(value: bool) -> None:
global _ENFORCE_DENYLIST
_ENFORCE_DENYLIST = value


@experimental(FeatureName.AGENT_CONFIG)
def resolve_fully_qualified_name(name: str) -> Any:
try:
module_path, obj_name = name.rsplit(".", 1)
_validate_module_reference(name)
module = importlib.import_module(module_path)
return getattr(module, obj_name)
except Exception as e:
Expand Down Expand Up @@ -160,6 +241,7 @@ def _resolve_agent_code_reference(code: str) -> Any:
if "." not in code:
raise ValueError(f"Invalid code reference: {code}")

_validate_module_reference(code)
module_path, obj_name = code.rsplit(".", 1)
module = importlib.import_module(module_path)
obj = getattr(module, obj_name)
Expand Down Expand Up @@ -189,6 +271,7 @@ def resolve_code_reference(code_config: CodeConfig) -> Any:
if not code_config or not code_config.name:
raise ValueError("Invalid CodeConfig.")

_validate_module_reference(code_config.name)
module_path, obj_name = code_config.name.rsplit(".", 1)
module = importlib.import_module(module_path)
return getattr(module, obj_name)
Expand Down
3 changes: 3 additions & 0 deletions src/google/adk/agents/llm_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,9 @@ def _resolve_tools(
obj = getattr(module, tool_config.name)
else:
# User-defined tools
from .config_agent_utils import _validate_module_reference

_validate_module_reference(tool_config.name)
module_path, obj_name = tool_config.name.rsplit('.', 1)
module = importlib.import_module(module_path)
obj = getattr(module, obj_name)
Expand Down
115 changes: 115 additions & 0 deletions tests/unittests/agents/test_agent_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,3 +465,118 @@ def fake_from_config(path: str):
)
assert result == "sentinel"
assert recorded["path"] == expected_path

# --- Security tests: module blocklist for YAML agent config code references ---


def test_resolve_code_reference_blocks_os_when_enforced():
"""Verify resolve_code_reference blocks os module directly."""
from google.adk.agents.common_configs import CodeConfig

with pytest.raises(ValueError, match="Blocked module reference"):
config_agent_utils.resolve_code_reference(
CodeConfig(name="os.system")
)


def test_resolve_fully_qualified_name_blocks_subprocess_when_enforced():
"""Verify resolve_fully_qualified_name blocks subprocess module.

resolve_fully_qualified_name wraps all exceptions in
ValueError("Invalid fully qualified name: ..."), so we check the wrapper
and verify the __cause__ carries the blocklist message.
"""
with pytest.raises(ValueError, match="Invalid fully qualified name") as exc_info:
config_agent_utils.resolve_fully_qualified_name("subprocess.Popen")
assert "Blocked module reference" in str(exc_info.value.__cause__)


def test_allowed_module_passes_when_enforced(tmp_path: Path):
"""Verify that google.adk modules are NOT blocked by the module denylist."""
# This should NOT raise — google.adk modules must remain allowed
result = config_agent_utils.resolve_fully_qualified_name(
"google.adk.agents.llm_agent.LlmAgent"
)
assert result is LlmAgent


@pytest.mark.parametrize(
"blocked_module",
[
"os.system",
"subprocess.call",
"builtins.exec",
],
)
def test_resolve_agent_code_reference_blocks_when_enforced(
blocked_module: str,
):
"""Verify _resolve_agent_code_reference blocks dangerous modules."""
with pytest.raises(ValueError, match="Blocked module reference"):
config_agent_utils._resolve_agent_code_reference(blocked_module)


@pytest.mark.parametrize(
"blocked_ref",
[
"os.system",
"subprocess.call",
"builtins.exec",
"pickle.loads",
],
)
def test_resolve_tools_blocks_dangerous_modules(blocked_ref: str):
"""Verify _resolve_tools blocks dangerous modules for user-defined tools."""
from google.adk.agents.llm_agent import LlmAgent
from google.adk.tools.tool_configs import ToolConfig

tool_config = ToolConfig(name=blocked_ref)
with pytest.raises(ValueError, match="Blocked module reference"):
LlmAgent._resolve_tools([tool_config], "/fake/path.yaml")


def test_resolve_tools_allows_builtin_adk_tools():
"""Verify _resolve_tools allows ADK built-in tools (no dot in name)."""
from google.adk.agents.llm_agent import LlmAgent
from google.adk.tools.tool_configs import ToolConfig

# Built-in tools have no dot — they import from google.adk.tools
tool_config = ToolConfig(name="google_search")
# Should NOT raise — this is a safe, hardcoded import path
resolved = LlmAgent._resolve_tools([tool_config], "/fake/path.yaml")
assert len(resolved) == 1


@pytest.mark.parametrize(
"blocked_ref",
[
"ftplib.FTP",
"smtplib.SMTP",
"xmlrpc.client",
"telnetlib.Telnet",
"poplib.POP3",
"imaplib.IMAP4",
"asyncio.run",
"pathlib.Path",
],
)
def test_newly_blocked_network_modules_are_rejected(blocked_ref: str):
"""Verify newly added network-capable modules are blocked.

resolve_fully_qualified_name wraps errors, so we check the cause.
"""
with pytest.raises(ValueError, match="Invalid fully qualified name") as exc_info:
config_agent_utils.resolve_fully_qualified_name(blocked_ref)
assert "Blocked module reference" in str(exc_info.value.__cause__)


def test_denylist_can_be_disabled():
"""Verify _set_enforce_denylist(False) disables module blocking."""
config_agent_utils._set_enforce_denylist(False)
try:
# os.getcwd is a real, importable reference — should succeed
result = config_agent_utils.resolve_fully_qualified_name("os.getcwd")
assert callable(result)
finally:
config_agent_utils._set_enforce_denylist(True)