diff --git a/src/google/adk/artifacts/file_artifact_service.py b/src/google/adk/artifacts/file_artifact_service.py index b0078e27ce..ff6fff0361 100644 --- a/src/google/adk/artifacts/file_artifact_service.py +++ b/src/google/adk/artifacts/file_artifact_service.py @@ -133,6 +133,31 @@ def _resolve_scoped_artifact_path( return candidate, relative +def _validate_path_segment(parent: Path, segment: str, label: str) -> Path: + """Validates that a path segment does not escape its parent directory. + + Args: + parent: The directory the segment must stay within. + segment: Caller-supplied value used as a single path component. + label: Human-readable name for error messages (e.g. ``"User ID"``). + + Returns: + The resolved path ``parent / segment``. + + Raises: + InputValidationError: If the resulting path escapes ``parent``. + """ + parent_resolved = parent.resolve(strict=False) + candidate = (parent_resolved / segment).resolve(strict=False) + try: + candidate.relative_to(parent_resolved) + except ValueError as exc: + raise InputValidationError( + f"{label} {segment!r} resolves outside storage directory." + ) from exc + return candidate + + def _is_user_scoped(session_id: Optional[str], filename: str) -> bool: """Determines whether artifacts should be stored in the user namespace.""" return session_id is None or _file_has_user_namespace(filename) @@ -145,7 +170,11 @@ def _user_artifacts_dir(base_root: Path) -> Path: def _session_artifacts_dir(base_root: Path, session_id: str) -> Path: """Returns the path that stores session-scoped artifacts.""" - return base_root / "sessions" / session_id / "artifacts" + sessions_dir = base_root / "sessions" + return ( + _validate_path_segment(sessions_dir, session_id, "Session ID") + / "artifacts" + ) def _versions_dir(artifact_dir: Path) -> Path: @@ -220,7 +249,8 @@ def __init__(self, root_dir: Path | str): def _base_root(self, user_id: str, /) -> Path: """Returns the artifacts root directory for a user.""" - return self.root_dir / "users" / user_id + users_dir = self.root_dir / "users" + return _validate_path_segment(users_dir, user_id, "User ID") def _scope_root( self, diff --git a/tests/unittests/artifacts/test_artifact_service.py b/tests/unittests/artifacts/test_artifact_service.py index 25294d4909..79dc6239a3 100644 --- a/tests/unittests/artifacts/test_artifact_service.py +++ b/tests/unittests/artifacts/test_artifact_service.py @@ -896,3 +896,62 @@ async def test_save_artifact_with_snake_case_dict( assert loaded is not None assert loaded.inline_data is not None assert loaded.inline_data.mime_type == "text/plain" + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "user_id", + [ + "../../escape", + "../sibling", + "valid/../../../etc", + ], +) +async def test_file_save_rejects_traversal_user_id(tmp_path, user_id): + """FileArtifactService rejects user_id values that escape the storage root.""" + artifact_service = FileArtifactService(root_dir=tmp_path / "artifacts") + part = types.Part(text="content") + with pytest.raises(InputValidationError): + await artifact_service.save_artifact( + app_name="myapp", + user_id=user_id, + session_id="sess1", + filename="file.txt", + artifact=part, + ) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "session_id", + [ + "../../escape", + "../sibling", + "valid/../../../tmp", + ], +) +async def test_file_save_rejects_traversal_session_id(tmp_path, session_id): + """FileArtifactService rejects session_id values that escape the storage root.""" + artifact_service = FileArtifactService(root_dir=tmp_path / "artifacts") + part = types.Part(text="content") + with pytest.raises(InputValidationError): + await artifact_service.save_artifact( + app_name="myapp", + user_id="user1", + session_id=session_id, + filename="file.txt", + artifact=part, + ) + + +@pytest.mark.asyncio +async def test_file_delete_rejects_traversal_session_id(tmp_path): + """Traversal session_id is caught before shutil.rmtree can be reached.""" + artifact_service = FileArtifactService(root_dir=tmp_path / "artifacts") + with pytest.raises(InputValidationError): + await artifact_service.delete_artifact( + app_name="myapp", + user_id="user1", + session_id="../../escape", + filename="file.txt", + )