diff --git a/src/mcp_server_docker/input_schemas.py b/src/mcp_server_docker/input_schemas.py index c912d0e..773600d 100644 --- a/src/mcp_server_docker/input_schemas.py +++ b/src/mcp_server_docker/input_schemas.py @@ -1,4 +1,5 @@ import json +import os from datetime import datetime from typing import Any, Literal, get_args, get_origin @@ -11,6 +12,63 @@ model_validator, ) +# Host paths that must never be bind-mounted into containers. +# Mounting container runtime sockets allows full host takeover from within +# a container. /proc, /sys, and /dev expose kernel interfaces that enable +# privilege escalation. These restrictions prevent container escape attacks +# triggered via prompt injection or untrusted LLM tool calls. +BLOCKED_MOUNT_SOURCES = frozenset( + { + "/var/run/docker.sock", + "/run/docker.sock", + "/var/run/containerd", + "/run/containerd", + "/var/run/crio", + "/run/crio", + "/proc", + "/sys", + "/dev", + } +) + + +def _extract_host_paths( + volumes: dict[str, dict[str, str]] | list[str], +) -> list[str]: + """Return host source paths from a Docker volume mount specification. + + Handles both formats accepted by the Docker SDK: + dict: {"/host/path": {"bind": "/container/path", "mode": "rw"}} + list: ["/host/path:/container/path:rw"] + """ + if isinstance(volumes, dict): + return list(volumes.keys()) + + paths: list[str] = [] + for entry in volumes: + # list entries are "source:dest[:mode]" + parts = entry.split(":") + if parts: + paths.append(parts[0]) + return paths + + +def _is_blocked_mount(host_path: str) -> str | None: + """Return the matched blocked path if host_path resolves to one, else None. + + Resolves symlinks so that aliased paths (e.g. /tmp/sock pointing at + /var/run/docker.sock) are still caught. + """ + try: + resolved = os.path.realpath(host_path) + except (OSError, ValueError): + resolved = host_path + + for blocked in BLOCKED_MOUNT_SOURCES: + if resolved == blocked or resolved.startswith(blocked + os.sep): + return blocked + return None + class JSONParsingModel(BaseModel): """ @@ -103,6 +161,25 @@ class CreateContainerInput(JSONParsingModel): volumes: dict[str, dict[str, str]] | list[str] | None = Field( None, description="Volume mappings" ) + + @field_validator("volumes", mode="after") + @classmethod + def _reject_blocked_mounts( + cls, volumes: dict[str, dict[str, str]] | list[str] | None + ) -> dict[str, dict[str, str]] | list[str] | None: + if volumes is None: + return volumes + + for host_path in _extract_host_paths(volumes): + blocked = _is_blocked_mount(host_path) + if blocked is not None: + raise ValueError( + f"Volume mount source '{host_path}' is blocked. " + f"Mounting '{blocked}' exposes host resources that " + f"allow container escape or privilege escalation." + ) + return volumes + labels: dict[str, str] | list[str] | None = Field( None, description="Container labels, either as a dictionary or a list of key=value strings", @@ -164,6 +241,23 @@ class BuildImageInput(JSONParsingModel): tag: str = Field(..., description="Image tag") dockerfile: str | None = Field(None, description="Path to Dockerfile") + @field_validator("path", mode="after") + @classmethod + def _reject_sensitive_build_context(cls, path: str) -> str: + try: + resolved = os.path.realpath(path) + except (OSError, ValueError): + resolved = path + + for blocked in BLOCKED_MOUNT_SOURCES: + if resolved == blocked or resolved.startswith(blocked + os.sep): + raise ValueError( + f"Build context path '{path}' is blocked. " + f"'{blocked}' contains sensitive host resources that " + f"would be sent to the Docker daemon as build context." + ) + return path + class RemoveImageInput(JSONParsingModel): image: str = Field(..., description="Image ID or name") diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_mount_validation.py b/tests/test_mount_validation.py new file mode 100644 index 0000000..c9c849a --- /dev/null +++ b/tests/test_mount_validation.py @@ -0,0 +1,97 @@ +"""Tests for blocked volume mount and build context validation.""" + +import pytest +from pydantic import ValidationError + +from mcp_server_docker.input_schemas import ( + BLOCKED_MOUNT_SOURCES, + BuildImageInput, + CreateContainerInput, +) + + +class TestBlockedVolumeMounts: + """Verify that dangerous host paths are rejected as volume sources.""" + + @pytest.mark.parametrize("blocked_path", sorted(BLOCKED_MOUNT_SOURCES)) + def test_dict_format_blocks_dangerous_paths(self, blocked_path: str) -> None: + with pytest.raises(ValidationError, match="blocked"): + CreateContainerInput( + image="alpine:latest", + volumes={blocked_path: {"bind": "/mnt", "mode": "ro"}}, + ) + + @pytest.mark.parametrize("blocked_path", sorted(BLOCKED_MOUNT_SOURCES)) + def test_list_format_blocks_dangerous_paths(self, blocked_path: str) -> None: + with pytest.raises(ValidationError, match="blocked"): + CreateContainerInput( + image="alpine:latest", + volumes=[f"{blocked_path}:/mnt:ro"], + ) + + def test_docker_socket_blocked(self) -> None: + with pytest.raises(ValidationError, match="container escape"): + CreateContainerInput( + image="alpine:latest", + volumes={ + "/var/run/docker.sock": { + "bind": "/var/run/docker.sock", + "mode": "rw", + } + }, + ) + + def test_subpath_of_blocked_path_also_blocked(self) -> None: + with pytest.raises(ValidationError, match="blocked"): + CreateContainerInput( + image="alpine:latest", + volumes=["/proc/1/root:/mnt:ro"], + ) + + def test_safe_volume_mount_allowed(self) -> None: + container = CreateContainerInput( + image="nginx:latest", + volumes={"/home/user/app": {"bind": "/app", "mode": "ro"}}, + ) + assert container.volumes is not None + + def test_safe_list_volume_mount_allowed(self) -> None: + container = CreateContainerInput( + image="nginx:latest", + volumes=["/tmp/data:/data:rw"], + ) + assert container.volumes is not None + + def test_none_volumes_allowed(self) -> None: + container = CreateContainerInput(image="alpine:latest") + assert container.volumes is None + + def test_multiple_volumes_one_blocked(self) -> None: + with pytest.raises(ValidationError, match="blocked"): + CreateContainerInput( + image="alpine:latest", + volumes={ + "/tmp/safe": {"bind": "/safe", "mode": "ro"}, + "/var/run/docker.sock": { + "bind": "/var/run/docker.sock", + "mode": "rw", + }, + }, + ) + + +class TestBlockedBuildContextPaths: + """Verify that sensitive paths are rejected as Docker build contexts.""" + + @pytest.mark.parametrize("blocked_path", sorted(BLOCKED_MOUNT_SOURCES)) + def test_blocked_paths_rejected_as_build_context(self, blocked_path: str) -> None: + with pytest.raises(ValidationError, match="blocked"): + BuildImageInput(path=blocked_path, tag="test:latest") + + def test_safe_build_context_allowed(self) -> None: + build = BuildImageInput(path="/home/user/project", tag="myapp:latest") + assert build.path == "/home/user/project" + + def test_subpath_of_blocked_path_rejected(self) -> None: + with pytest.raises(ValidationError, match="blocked"): + BuildImageInput(path="/proc/self", tag="test:latest")