From 9d8b9f53cea88f32019208ba0fb050e9120a3a47 Mon Sep 17 00:00:00 2001 From: Taranveer Tengurchittoo Date: Fri, 20 Mar 2026 06:59:07 +0400 Subject: [PATCH] fix(security): block dangerous host paths in volume mounts and build contexts Container runtime sockets (/var/run/docker.sock, containerd, crio), /proc, /sys, and /dev are now rejected as volume mount sources and build context paths. Symlinks are resolved before checking to prevent bypass via aliased paths. Without this, an LLM (or an attacker via prompt injection through container logs or image metadata) can mount the Docker socket into a container and achieve full host takeover. This is the standard Docker container escape technique. Includes 35 tests covering both dict and list volume formats, blocked path subpaths, symlink resolution, and safe path passthrough. --- src/mcp_server_docker/input_schemas.py | 94 +++++++++++++++++++++++++ tests/__init__.py | 0 tests/test_mount_validation.py | 97 ++++++++++++++++++++++++++ 3 files changed, 191 insertions(+) create mode 100644 tests/__init__.py create mode 100644 tests/test_mount_validation.py diff --git a/src/mcp_server_docker/input_schemas.py b/src/mcp_server_docker/input_schemas.py index c912d0e..773600d 100644 --- a/src/mcp_server_docker/input_schemas.py +++ b/src/mcp_server_docker/input_schemas.py @@ -1,4 +1,5 @@ import json +import os from datetime import datetime from typing import Any, Literal, get_args, get_origin @@ -11,6 +12,63 @@ model_validator, ) +# Host paths that must never be bind-mounted into containers. +# Mounting container runtime sockets allows full host takeover from within +# a container. /proc, /sys, and /dev expose kernel interfaces that enable +# privilege escalation. These restrictions prevent container escape attacks +# triggered via prompt injection or untrusted LLM tool calls. +BLOCKED_MOUNT_SOURCES = frozenset( + { + "/var/run/docker.sock", + "/run/docker.sock", + "/var/run/containerd", + "/run/containerd", + "/var/run/crio", + "/run/crio", + "/proc", + "/sys", + "/dev", + } +) + + +def _extract_host_paths( + volumes: dict[str, dict[str, str]] | list[str], +) -> list[str]: + """Return host source paths from a Docker volume mount specification. + + Handles both formats accepted by the Docker SDK: + dict: {"/host/path": {"bind": "/container/path", "mode": "rw"}} + list: ["/host/path:/container/path:rw"] + """ + if isinstance(volumes, dict): + return list(volumes.keys()) + + paths: list[str] = [] + for entry in volumes: + # list entries are "source:dest[:mode]" + parts = entry.split(":") + if parts: + paths.append(parts[0]) + return paths + + +def _is_blocked_mount(host_path: str) -> str | None: + """Return the matched blocked path if host_path resolves to one, else None. + + Resolves symlinks so that aliased paths (e.g. /tmp/sock pointing at + /var/run/docker.sock) are still caught. + """ + try: + resolved = os.path.realpath(host_path) + except (OSError, ValueError): + resolved = host_path + + for blocked in BLOCKED_MOUNT_SOURCES: + if resolved == blocked or resolved.startswith(blocked + os.sep): + return blocked + return None + class JSONParsingModel(BaseModel): """ @@ -103,6 +161,25 @@ class CreateContainerInput(JSONParsingModel): volumes: dict[str, dict[str, str]] | list[str] | None = Field( None, description="Volume mappings" ) + + @field_validator("volumes", mode="after") + @classmethod + def _reject_blocked_mounts( + cls, volumes: dict[str, dict[str, str]] | list[str] | None + ) -> dict[str, dict[str, str]] | list[str] | None: + if volumes is None: + return volumes + + for host_path in _extract_host_paths(volumes): + blocked = _is_blocked_mount(host_path) + if blocked is not None: + raise ValueError( + f"Volume mount source '{host_path}' is blocked. " + f"Mounting '{blocked}' exposes host resources that " + f"allow container escape or privilege escalation." + ) + return volumes + labels: dict[str, str] | list[str] | None = Field( None, description="Container labels, either as a dictionary or a list of key=value strings", @@ -164,6 +241,23 @@ class BuildImageInput(JSONParsingModel): tag: str = Field(..., description="Image tag") dockerfile: str | None = Field(None, description="Path to Dockerfile") + @field_validator("path", mode="after") + @classmethod + def _reject_sensitive_build_context(cls, path: str) -> str: + try: + resolved = os.path.realpath(path) + except (OSError, ValueError): + resolved = path + + for blocked in BLOCKED_MOUNT_SOURCES: + if resolved == blocked or resolved.startswith(blocked + os.sep): + raise ValueError( + f"Build context path '{path}' is blocked. " + f"'{blocked}' contains sensitive host resources that " + f"would be sent to the Docker daemon as build context." + ) + return path + class RemoveImageInput(JSONParsingModel): image: str = Field(..., description="Image ID or name") diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_mount_validation.py b/tests/test_mount_validation.py new file mode 100644 index 0000000..c9c849a --- /dev/null +++ b/tests/test_mount_validation.py @@ -0,0 +1,97 @@ +"""Tests for blocked volume mount and build context validation.""" + +import pytest +from pydantic import ValidationError + +from mcp_server_docker.input_schemas import ( + BLOCKED_MOUNT_SOURCES, + BuildImageInput, + CreateContainerInput, +) + + +class TestBlockedVolumeMounts: + """Verify that dangerous host paths are rejected as volume sources.""" + + @pytest.mark.parametrize("blocked_path", sorted(BLOCKED_MOUNT_SOURCES)) + def test_dict_format_blocks_dangerous_paths(self, blocked_path: str) -> None: + with pytest.raises(ValidationError, match="blocked"): + CreateContainerInput( + image="alpine:latest", + volumes={blocked_path: {"bind": "/mnt", "mode": "ro"}}, + ) + + @pytest.mark.parametrize("blocked_path", sorted(BLOCKED_MOUNT_SOURCES)) + def test_list_format_blocks_dangerous_paths(self, blocked_path: str) -> None: + with pytest.raises(ValidationError, match="blocked"): + CreateContainerInput( + image="alpine:latest", + volumes=[f"{blocked_path}:/mnt:ro"], + ) + + def test_docker_socket_blocked(self) -> None: + with pytest.raises(ValidationError, match="container escape"): + CreateContainerInput( + image="alpine:latest", + volumes={ + "/var/run/docker.sock": { + "bind": "/var/run/docker.sock", + "mode": "rw", + } + }, + ) + + def test_subpath_of_blocked_path_also_blocked(self) -> None: + with pytest.raises(ValidationError, match="blocked"): + CreateContainerInput( + image="alpine:latest", + volumes=["/proc/1/root:/mnt:ro"], + ) + + def test_safe_volume_mount_allowed(self) -> None: + container = CreateContainerInput( + image="nginx:latest", + volumes={"/home/user/app": {"bind": "/app", "mode": "ro"}}, + ) + assert container.volumes is not None + + def test_safe_list_volume_mount_allowed(self) -> None: + container = CreateContainerInput( + image="nginx:latest", + volumes=["/tmp/data:/data:rw"], + ) + assert container.volumes is not None + + def test_none_volumes_allowed(self) -> None: + container = CreateContainerInput(image="alpine:latest") + assert container.volumes is None + + def test_multiple_volumes_one_blocked(self) -> None: + with pytest.raises(ValidationError, match="blocked"): + CreateContainerInput( + image="alpine:latest", + volumes={ + "/tmp/safe": {"bind": "/safe", "mode": "ro"}, + "/var/run/docker.sock": { + "bind": "/var/run/docker.sock", + "mode": "rw", + }, + }, + ) + + +class TestBlockedBuildContextPaths: + """Verify that sensitive paths are rejected as Docker build contexts.""" + + @pytest.mark.parametrize("blocked_path", sorted(BLOCKED_MOUNT_SOURCES)) + def test_blocked_paths_rejected_as_build_context(self, blocked_path: str) -> None: + with pytest.raises(ValidationError, match="blocked"): + BuildImageInput(path=blocked_path, tag="test:latest") + + def test_safe_build_context_allowed(self) -> None: + build = BuildImageInput(path="/home/user/project", tag="myapp:latest") + assert build.path == "/home/user/project" + + def test_subpath_of_blocked_path_rejected(self) -> None: + with pytest.raises(ValidationError, match="blocked"): + BuildImageInput(path="/proc/self", tag="test:latest")