Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions src/mcp_server_docker/input_schemas.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import os
from datetime import datetime
from typing import Any, Literal, get_args, get_origin

Expand All @@ -11,6 +12,63 @@
model_validator,
)

# Host paths that must never be bind-mounted into containers.
# Mounting container runtime sockets allows full host takeover from within
# a container. /proc, /sys, and /dev expose kernel interfaces that enable
# privilege escalation. These restrictions prevent container escape attacks
# triggered via prompt injection or untrusted LLM tool calls.
BLOCKED_MOUNT_SOURCES = frozenset(
{
"/var/run/docker.sock",
"/run/docker.sock",
"/var/run/containerd",
"/run/containerd",
"/var/run/crio",
"/run/crio",
"/proc",
"/sys",
"/dev",
}
)


def _extract_host_paths(
volumes: dict[str, dict[str, str]] | list[str],
) -> list[str]:
"""Return host source paths from a Docker volume mount specification.

Handles both formats accepted by the Docker SDK:
dict: {"/host/path": {"bind": "/container/path", "mode": "rw"}}
list: ["/host/path:/container/path:rw"]
"""
if isinstance(volumes, dict):
return list(volumes.keys())

paths: list[str] = []
for entry in volumes:
# list entries are "source:dest[:mode]"
parts = entry.split(":")
if parts:
paths.append(parts[0])
return paths


def _is_blocked_mount(host_path: str) -> str | None:
"""Return the matched blocked path if host_path resolves to one, else None.

Resolves symlinks so that aliased paths (e.g. /tmp/sock pointing at
/var/run/docker.sock) are still caught.
"""
try:
resolved = os.path.realpath(host_path)
except (OSError, ValueError):
resolved = host_path

for blocked in BLOCKED_MOUNT_SOURCES:
if resolved == blocked or resolved.startswith(blocked + os.sep):
return blocked
return None


class JSONParsingModel(BaseModel):
"""
Expand Down Expand Up @@ -103,6 +161,25 @@ class CreateContainerInput(JSONParsingModel):
volumes: dict[str, dict[str, str]] | list[str] | None = Field(
None, description="Volume mappings"
)

@field_validator("volumes", mode="after")
@classmethod
def _reject_blocked_mounts(
cls, volumes: dict[str, dict[str, str]] | list[str] | None
) -> dict[str, dict[str, str]] | list[str] | None:
if volumes is None:
return volumes

for host_path in _extract_host_paths(volumes):
blocked = _is_blocked_mount(host_path)
if blocked is not None:
raise ValueError(
f"Volume mount source '{host_path}' is blocked. "
f"Mounting '{blocked}' exposes host resources that "
f"allow container escape or privilege escalation."
)
return volumes

labels: dict[str, str] | list[str] | None = Field(
None,
description="Container labels, either as a dictionary or a list of key=value strings",
Expand Down Expand Up @@ -164,6 +241,23 @@ class BuildImageInput(JSONParsingModel):
tag: str = Field(..., description="Image tag")
dockerfile: str | None = Field(None, description="Path to Dockerfile")

@field_validator("path", mode="after")
@classmethod
def _reject_sensitive_build_context(cls, path: str) -> str:
try:
resolved = os.path.realpath(path)
except (OSError, ValueError):
resolved = path

for blocked in BLOCKED_MOUNT_SOURCES:
if resolved == blocked or resolved.startswith(blocked + os.sep):
raise ValueError(
f"Build context path '{path}' is blocked. "
f"'{blocked}' contains sensitive host resources that "
f"would be sent to the Docker daemon as build context."
)
return path


class RemoveImageInput(JSONParsingModel):
image: str = Field(..., description="Image ID or name")
Expand Down
Empty file added tests/__init__.py
Empty file.
97 changes: 97 additions & 0 deletions tests/test_mount_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""Tests for blocked volume mount and build context validation."""

import pytest
from pydantic import ValidationError

from mcp_server_docker.input_schemas import (
BLOCKED_MOUNT_SOURCES,
BuildImageInput,
CreateContainerInput,
)


class TestBlockedVolumeMounts:
"""Verify that dangerous host paths are rejected as volume sources."""

@pytest.mark.parametrize("blocked_path", sorted(BLOCKED_MOUNT_SOURCES))
def test_dict_format_blocks_dangerous_paths(self, blocked_path: str) -> None:
with pytest.raises(ValidationError, match="blocked"):
CreateContainerInput(
image="alpine:latest",
volumes={blocked_path: {"bind": "/mnt", "mode": "ro"}},
)

@pytest.mark.parametrize("blocked_path", sorted(BLOCKED_MOUNT_SOURCES))
def test_list_format_blocks_dangerous_paths(self, blocked_path: str) -> None:
with pytest.raises(ValidationError, match="blocked"):
CreateContainerInput(
image="alpine:latest",
volumes=[f"{blocked_path}:/mnt:ro"],
)

def test_docker_socket_blocked(self) -> None:
with pytest.raises(ValidationError, match="container escape"):
CreateContainerInput(
image="alpine:latest",
volumes={
"/var/run/docker.sock": {
"bind": "/var/run/docker.sock",
"mode": "rw",
}
},
)

def test_subpath_of_blocked_path_also_blocked(self) -> None:
with pytest.raises(ValidationError, match="blocked"):
CreateContainerInput(
image="alpine:latest",
volumes=["/proc/1/root:/mnt:ro"],
)

def test_safe_volume_mount_allowed(self) -> None:
container = CreateContainerInput(
image="nginx:latest",
volumes={"/home/user/app": {"bind": "/app", "mode": "ro"}},
)
assert container.volumes is not None

def test_safe_list_volume_mount_allowed(self) -> None:
container = CreateContainerInput(
image="nginx:latest",
volumes=["/tmp/data:/data:rw"],
)
assert container.volumes is not None

def test_none_volumes_allowed(self) -> None:
container = CreateContainerInput(image="alpine:latest")
assert container.volumes is None

def test_multiple_volumes_one_blocked(self) -> None:
with pytest.raises(ValidationError, match="blocked"):
CreateContainerInput(
image="alpine:latest",
volumes={
"/tmp/safe": {"bind": "/safe", "mode": "ro"},
"/var/run/docker.sock": {
"bind": "/var/run/docker.sock",
"mode": "rw",
},
},
)


class TestBlockedBuildContextPaths:
"""Verify that sensitive paths are rejected as Docker build contexts."""

@pytest.mark.parametrize("blocked_path", sorted(BLOCKED_MOUNT_SOURCES))
def test_blocked_paths_rejected_as_build_context(self, blocked_path: str) -> None:
with pytest.raises(ValidationError, match="blocked"):
BuildImageInput(path=blocked_path, tag="test:latest")

def test_safe_build_context_allowed(self) -> None:
build = BuildImageInput(path="/home/user/project", tag="myapp:latest")
assert build.path == "/home/user/project"

def test_subpath_of_blocked_path_rejected(self) -> None:
with pytest.raises(ValidationError, match="blocked"):
BuildImageInput(path="/proc/self", tag="test:latest")