|
12 | 12 | from mcp.client import Client |
13 | 13 | from mcp.server.context import ServerRequestContext |
14 | 14 | from mcp.server.experimental.request_context import Experimental |
15 | | -from mcp.server.mcpserver import Context, MCPServer |
| 15 | +from mcp.server.mcpserver import Context, MCPServer, ResourceSecurity |
16 | 16 | from mcp.server.mcpserver.exceptions import ToolError |
17 | 17 | from mcp.server.mcpserver.prompts.base import Message, UserMessage |
18 | 18 | from mcp.server.mcpserver.resources import FileResource, FunctionResource |
@@ -159,6 +159,47 @@ async def test_resource_decorator_rejects_malformed_template(self): |
159 | 159 | with pytest.raises(InvalidUriTemplate, match="Unclosed expression"): |
160 | 160 | mcp.resource("file://{name") |
161 | 161 |
|
| 162 | + async def test_resource_security_default_rejects_traversal(self): |
| 163 | + mcp = MCPServer() |
| 164 | + |
| 165 | + @mcp.resource("data://items/{name}") |
| 166 | + def get_item(name: str) -> str: |
| 167 | + return f"item:{name}" |
| 168 | + |
| 169 | + async with Client(mcp) as client: |
| 170 | + # ".." as a path component is rejected by default policy |
| 171 | + with pytest.raises(MCPError, match="Unknown resource"): |
| 172 | + await client.read_resource("data://items/..") |
| 173 | + |
| 174 | + async def test_resource_security_per_resource_override(self): |
| 175 | + mcp = MCPServer() |
| 176 | + |
| 177 | + @mcp.resource( |
| 178 | + "git://diff/{+range}", |
| 179 | + security=ResourceSecurity(exempt_params=frozenset({"range"})), |
| 180 | + ) |
| 181 | + def git_diff(range: str) -> str: |
| 182 | + return f"diff:{range}" |
| 183 | + |
| 184 | + async with Client(mcp) as client: |
| 185 | + # "../foo" would be rejected by default, but "range" is exempt |
| 186 | + result = await client.read_resource("git://diff/../foo") |
| 187 | + assert isinstance(result.contents[0], TextResourceContents) |
| 188 | + assert result.contents[0].text == "diff:../foo" |
| 189 | + |
| 190 | + async def test_resource_security_server_wide_override(self): |
| 191 | + mcp = MCPServer(resource_security=ResourceSecurity(reject_path_traversal=False)) |
| 192 | + |
| 193 | + @mcp.resource("data://items/{name}") |
| 194 | + def get_item(name: str) -> str: |
| 195 | + return f"item:{name}" |
| 196 | + |
| 197 | + async with Client(mcp) as client: |
| 198 | + # Server-wide policy disabled traversal check; ".." now allowed |
| 199 | + result = await client.read_resource("data://items/..") |
| 200 | + assert isinstance(result.contents[0], TextResourceContents) |
| 201 | + assert result.contents[0].text == "item:.." |
| 202 | + |
162 | 203 |
|
163 | 204 | class TestDnsRebindingProtection: |
164 | 205 | """Tests for automatic DNS rebinding protection on localhost. |
|
0 commit comments