diff --git a/.github/workflows/branch-e2e.yml b/.github/workflows/branch-e2e.yml
index 8a9e7fe29..46d9528b0 100644
--- a/.github/workflows/branch-e2e.yml
+++ b/.github/workflows/branch-e2e.yml
@@ -111,6 +111,16 @@ jobs:
     with:
       image-tag: ${{ github.sha }}

+  mcp-conformance:
+    needs: [pr_metadata, build-gateway, build-supervisor]
+    if: needs.pr_metadata.outputs.should_run == 'true' && needs.pr_metadata.outputs.run_core_e2e == 'true'
+    permissions:
+      contents: read
+      packages: read
+    uses: ./.github/workflows/mcp-conformance.yml
+    with:
+      image-tag: ${{ github.sha }}
+
   kubernetes-ha-e2e:
     needs: [pr_metadata, build-gateway, build-supervisor]
     if: needs.pr_metadata.outputs.should_run == 'true' && needs.pr_metadata.outputs.run_kubernetes_ha_e2e == 'true'
@@ -126,7 +136,7 @@ jobs:

   core-e2e-result:
     name: Core E2E result
-    needs: [pr_metadata, build-gateway, build-supervisor, e2e, kubernetes-e2e]
+    needs: [pr_metadata, build-gateway, build-supervisor, e2e, kubernetes-e2e, mcp-conformance]
     if: always() && needs.pr_metadata.outputs.should_run == 'true' && needs.pr_metadata.outputs.run_core_e2e == 'true'
     runs-on: ubuntu-latest
     steps:
@@ -136,6 +146,7 @@ jobs:
           BUILD_SUPERVISOR_RESULT: ${{ needs.build-supervisor.result }}
           E2E_RESULT: ${{ needs.e2e.result }}
           KUBERNETES_E2E_RESULT: ${{ needs.kubernetes-e2e.result }}
+          MCP_CONFORMANCE_RESULT: ${{ needs.mcp-conformance.result }}
         run: |
           set -euo pipefail
           failed=0
@@ -143,7 +154,8 @@ jobs:
             "build-gateway:$BUILD_GATEWAY_RESULT" \
             "build-supervisor:$BUILD_SUPERVISOR_RESULT" \
             "e2e:$E2E_RESULT" \
-            "kubernetes-e2e:$KUBERNETES_E2E_RESULT"; do
+            "kubernetes-e2e:$KUBERNETES_E2E_RESULT" \
+            "mcp-conformance:$MCP_CONFORMANCE_RESULT"; do
             name="${item%%:*}"
             result="${item#*:}"
             if [ "$result" != "success" ]; then
diff --git a/.github/workflows/mcp-conformance.yml b/.github/workflows/mcp-conformance.yml
new file mode 100644
index 000000000..853c3dd8e
--- /dev/null
+++ b/.github/workflows/mcp-conformance.yml
@@ -0,0 +1,101 @@
+name: MCP Conformance Test
+
+on:
+  workflow_call:
+    inputs:
+      image-tag:
+        description: "Image tag to test (typically the commit SHA)"
+        required: true
+        type: string
+      runner:
+        description: "GitHub Actions runner label"
+        required: false
+        type: string
+        default: "linux-amd64-cpu8"
+      checkout-ref:
+        description: "Git ref to check out for test inputs (defaults to the workflow SHA)"
+        required: false
+        type: string
+        default: ""
+
+permissions:
+  contents: read
+  packages: read
+
+jobs:
+  mcp-conformance:
+    name: MCP Conformance
+    runs-on: ${{ inputs.runner }}
+    timeout-minutes: 40
+    defaults:
+      run:
+        shell: bash
+    container:
+      image: ghcr.io/nvidia/openshell/ci:latest
+      credentials:
+        username: ${{ github.actor }}
+        password: ${{ secrets.GITHUB_TOKEN }}
+      options: --privileged
+      volumes:
+        - /var/run/docker.sock:/var/run/docker.sock
+        - /home/runner/_work:/home/runner/_work
+    env:
+      MISE_GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+      IMAGE_TAG: ${{ inputs.image-tag }}
+      OPENSHELL_REGISTRY: ghcr.io/nvidia/openshell
+      OPENSHELL_REGISTRY_HOST: ghcr.io
+      OPENSHELL_REGISTRY_NAMESPACE: nvidia/openshell
+      OPENSHELL_REGISTRY_USERNAME: ${{ github.actor }}
+      OPENSHELL_REGISTRY_PASSWORD: ${{ secrets.GITHUB_TOKEN }}
+      OPENSHELL_SUPERVISOR_IMAGE: ${{ format('ghcr.io/nvidia/openshell/supervisor:{0}', inputs.image-tag) }}
+      OPENSHELL_MCP_CONFORMANCE_CLIENT_IMAGE: openshell-mcp-conformance-client:${{ github.sha }}
+    steps:
+      - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6
+        with:
+          ref: ${{ inputs['checkout-ref'] || github.sha }}
+
+      - name: Check out MCP conformance tests
+        uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6
+        with:
+          repository: modelcontextprotocol/conformance
+          ref: v0.1.16
+          path: .cache/mcp-conformance
+
+      - name: Set up Node.js
+        uses: actions/setup-node@48b55a011bda9f5d6aeb4c2d9c7362e8dae4041e # v6
+        with:
+          node-version: "22"
+          cache: npm
+          cache-dependency-path: .cache/mcp-conformance/package-lock.json
+
+      - name: Build MCP conformance runner
+        working-directory: .cache/mcp-conformance
+        run: |
+          npm ci
+          npm run build
+
+      # Registry host and username come from the job-level env so that no
+      # `${{ }}` expression is expanded inside the shell script itself.
+      - name: Log in to GHCR with Docker
+        run: echo "${OPENSHELL_REGISTRY_PASSWORD}" | docker login "${OPENSHELL_REGISTRY_HOST}" -u "${OPENSHELL_REGISTRY_USERNAME}" --password-stdin
+
+      - name: Build OpenShell e2e binaries
+        run: |
+          cargo build -p openshell-server --bin openshell-gateway --features openshell-core/dev-settings
+          cargo build -p openshell-cli --bin openshell --features openshell-core/dev-settings
+
+      - name: Build MCP conformance client image
+        run: docker build --pull -f e2e/mcp-conformance/Dockerfile.client -t "${OPENSHELL_MCP_CONFORMANCE_CLIENT_IMAGE}" .cache/mcp-conformance
+
+      - name: Run MCP conformance through OpenShell
+        run: |
+          set -euo pipefail
+          for scenario in initialize tools_call; do
+            echo "::group::MCP conformance: ${scenario}"
+            node .cache/mcp-conformance/dist/index.js client \
+              --command "bash e2e/mcp-conformance/client-through-openshell.sh" \
+              --scenario "${scenario}" \
+              --expected-failures e2e/mcp-conformance/expected-failures.yml \
+              --timeout 900000
+            echo "::endgroup::"
+          done
diff --git a/architecture/sandbox.md b/architecture/sandbox.md
index e60b727a5..eb78eb6ad 100644
--- a/architecture/sandbox.md
+++ b/architecture/sandbox.md
@@ -49,6 +49,14 @@ paths, such as proxy support files or GPU device paths when a GPU is present.
 All ordinary agent egress is routed through the sandbox proxy. The proxy
 identifies the calling binary, checks trust-on-first-use binary identity,
 rejects unsafe internal destinations, and evaluates the active policy.
+For inspected HTTP traffic, the proxy can enforce REST method/path rules,
+WebSocket upgrade and text-message rules, GraphQL operation rules, and
+JSON-RPC method and params rules on sandbox-to-server request bodies. JSON-RPC
+request inspection buffers up to the endpoint `json_rpc.max_body_bytes` limit.
+Literal dotted keys in JSON-RPC params are rejected before policy evaluation so +they cannot be confused with flattened nested selector paths. +JSON-RPC responses and server-to-client MCP messages on response or SSE streams +are relayed but are not currently parsed for policy enforcement. `https://inference.local` is special. It bypasses OPA network policy and is handled by the inference interception path: diff --git a/crates/openshell-cli/src/policy_update.rs b/crates/openshell-cli/src/policy_update.rs index 57656b878..22363c920 100644 --- a/crates/openshell-cli/src/policy_update.rs +++ b/crates/openshell-cli/src/policy_update.rs @@ -205,6 +205,8 @@ fn group_allow_rules(specs: &[String]) -> Result Result, #[serde(default, skip_serializing_if = "is_zero_u32")] graphql_max_body_bytes: u32, + #[serde(default, skip_serializing_if = "Option::is_none")] + json_rpc: Option, } // Signature dictated by serde's `skip_serializing_if`, which requires `&T`. @@ -149,6 +151,17 @@ fn is_zero_u32(v: &u32) -> bool { *v == 0 } +#[derive(Debug, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +struct JsonRpcConfigDef { + #[serde(default, skip_serializing_if = "is_zero_u32")] + max_body_bytes: u32, +} + +fn json_rpc_config_from_proto(max_body_bytes: u32) -> Option { + (max_body_bytes > 0).then_some(JsonRpcConfigDef { max_body_bytes }) +} + #[derive(Debug, Serialize, Deserialize)] #[serde(deny_unknown_fields)] struct GraphqlOperationDef { @@ -183,6 +196,10 @@ struct L7AllowDef { operation_name: String, #[serde(default, skip_serializing_if = "Vec::is_empty")] fields: Vec, + #[serde(default, skip_serializing_if = "String::is_empty")] + rpc_method: String, + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + params: BTreeMap, } #[derive(Debug, Serialize, Deserialize)] @@ -216,6 +233,10 @@ struct L7DenyRuleDef { operation_name: String, #[serde(default, skip_serializing_if = "Vec::is_empty")] fields: Vec, + #[serde(default, skip_serializing_if = "String::is_empty")] + 
rpc_method: String, + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + params: BTreeMap, } #[derive(Debug, Serialize, Deserialize)] @@ -232,6 +253,24 @@ struct NetworkBinaryDef { // YAML → proto conversion // --------------------------------------------------------------------------- +fn matcher_def_to_proto(matcher: QueryMatcherDef) -> L7QueryMatcher { + match matcher { + QueryMatcherDef::Glob(glob) => L7QueryMatcher { glob, any: vec![] }, + QueryMatcherDef::Any(any) => L7QueryMatcher { + glob: String::new(), + any: any.any, + }, + } +} + +fn matcher_proto_to_def(matcher: L7QueryMatcher) -> QueryMatcherDef { + if matcher.any.is_empty() { + QueryMatcherDef::Glob(matcher.glob) + } else { + QueryMatcherDef::Any(QueryAnyDef { any: matcher.any }) + } +} + fn to_proto(raw: PolicyFile) -> SandboxPolicy { let network_policies = raw .network_policies @@ -276,21 +315,21 @@ fn to_proto(raw: PolicyFile) -> SandboxPolicy { operation_type: r.allow.operation_type, operation_name: r.allow.operation_name, fields: r.allow.fields, + rpc_method: r.allow.rpc_method, query: r .allow .query .into_iter() .map(|(key, matcher)| { - let proto = match matcher { - QueryMatcherDef::Glob(glob) => { - L7QueryMatcher { glob, any: vec![] } - } - QueryMatcherDef::Any(any) => L7QueryMatcher { - glob: String::new(), - any: any.any, - }, - }; - (key, proto) + (key, matcher_def_to_proto(matcher)) + }) + .collect(), + params: r + .allow + .params + .into_iter() + .map(|(key, matcher)| { + (key, matcher_def_to_proto(matcher)) }) .collect(), }), @@ -307,21 +346,16 @@ fn to_proto(raw: PolicyFile) -> SandboxPolicy { operation_type: d.operation_type, operation_name: d.operation_name, fields: d.fields, + rpc_method: d.rpc_method, query: d .query .into_iter() - .map(|(key, matcher)| { - let proto = match matcher { - QueryMatcherDef::Glob(glob) => { - L7QueryMatcher { glob, any: vec![] } - } - QueryMatcherDef::Any(any) => L7QueryMatcher { - glob: String::new(), - any: any.any, - }, - }; - (key, 
proto) - }) + .map(|(key, matcher)| (key, matcher_def_to_proto(matcher))) + .collect(), + params: d + .params + .into_iter() + .map(|(key, matcher)| (key, matcher_def_to_proto(matcher))) .collect(), }) .collect(), @@ -347,6 +381,10 @@ fn to_proto(raw: PolicyFile) -> SandboxPolicy { }) .collect(), graphql_max_body_bytes: e.graphql_max_body_bytes, + json_rpc_max_body_bytes: e + .json_rpc + .as_ref() + .map_or(0, |config| config.max_body_bytes), } }) .collect(), @@ -448,18 +486,19 @@ fn from_proto(policy: &SandboxPolicy) -> PolicyFile { operation_type: a.operation_type, operation_name: a.operation_name, fields: a.fields, + rpc_method: a.rpc_method, query: a .query .into_iter() .map(|(key, matcher)| { - let yaml_matcher = if matcher.any.is_empty() { - QueryMatcherDef::Glob(matcher.glob) - } else { - QueryMatcherDef::Any(QueryAnyDef { - any: matcher.any, - }) - }; - (key, yaml_matcher) + (key, matcher_proto_to_def(matcher)) + }) + .collect(), + params: a + .params + .into_iter() + .map(|(key, matcher)| { + (key, matcher_proto_to_def(matcher)) }) .collect(), }, @@ -477,18 +516,19 @@ fn from_proto(policy: &SandboxPolicy) -> PolicyFile { operation_type: d.operation_type.clone(), operation_name: d.operation_name.clone(), fields: d.fields.clone(), + rpc_method: d.rpc_method.clone(), query: d .query .iter() .map(|(key, matcher)| { - let yaml_matcher = if matcher.any.is_empty() { - QueryMatcherDef::Glob(matcher.glob.clone()) - } else { - QueryMatcherDef::Any(QueryAnyDef { - any: matcher.any.clone(), - }) - }; - (key.clone(), yaml_matcher) + (key.clone(), matcher_proto_to_def(matcher.clone())) + }) + .collect(), + params: d + .params + .iter() + .map(|(key, matcher)| { + (key.clone(), matcher_proto_to_def(matcher.clone())) }) .collect(), }) @@ -512,6 +552,7 @@ fn from_proto(policy: &SandboxPolicy) -> PolicyFile { }) .collect(), graphql_max_body_bytes: e.graphql_max_body_bytes, + json_rpc: json_rpc_config_from_proto(e.json_rpc_max_body_bytes), } }) .collect(), @@ -1699,6 
+1740,60 @@ network_policies: assert_eq!(ep.deny_rules[0].fields, vec!["deleteRepository"]); } + #[test] + fn round_trip_preserves_json_rpc_max_body_bytes() { + let yaml = r" +version: 1 +network_policies: + mcp: + name: mcp + endpoints: + - host: mcp.example.com + port: 443 + protocol: json-rpc + enforcement: enforce + json_rpc: + max_body_bytes: 131072 + rules: + - allow: + rpc_method: initialize + binaries: + - path: /usr/bin/curl +"; + let proto1 = parse_sandbox_policy(yaml).expect("parse failed"); + let yaml_out = serialize_sandbox_policy(&proto1).expect("serialize failed"); + let proto2 = parse_sandbox_policy(&yaml_out).expect("re-parse failed"); + + let ep = &proto2.network_policies["mcp"].endpoints[0]; + assert_eq!(ep.protocol, "json-rpc"); + assert_eq!(ep.json_rpc_max_body_bytes, 131_072); + } + + #[test] + fn parse_rejects_unsupported_json_rpc_config_fields() { + let yaml = r" +version: 1 +network_policies: + mcp: + endpoints: + - host: mcp.example.com + port: 443 + protocol: json-rpc + json_rpc: + max_body_bytes: 131072 + on_parse_error: deny + batch_policy: all + access: full + binaries: + - path: /usr/bin/curl +"; + + assert!( + parse_sandbox_policy(yaml).is_err(), + "unsupported json_rpc fields must not be silently accepted" + ); + } + #[test] fn round_trip_preserves_websocket_credential_rewrite() { let yaml = r" diff --git a/crates/openshell-policy/src/merge.rs b/crates/openshell-policy/src/merge.rs index f191cd272..6be185ca0 100644 --- a/crates/openshell-policy/src/merge.rs +++ b/crates/openshell-policy/src/merge.rs @@ -747,6 +747,8 @@ fn expand_access_preset(protocol: &str, access: &str) -> Option> { operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + rpc_method: String::new(), + params: HashMap::default(), }), }) .collect(), @@ -961,6 +963,8 @@ mod tests { operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + rpc_method: String::new(), + params: HashMap::default(), }), } } diff --git 
a/crates/openshell-providers/src/profiles.rs b/crates/openshell-providers/src/profiles.rs index d2a35ca80..f94c09ef9 100644 --- a/crates/openshell-providers/src/profiles.rs +++ b/crates/openshell-providers/src/profiles.rs @@ -200,6 +200,8 @@ pub struct EndpointProfile { pub graphql_persisted_queries: HashMap, #[serde(default, skip_serializing_if = "is_zero")] pub graphql_max_body_bytes: u32, + #[serde(default, skip_serializing_if = "is_zero")] + pub json_rpc_max_body_bytes: u32, #[serde(default, skip_serializing_if = "String::is_empty")] pub path: String, } @@ -743,6 +745,7 @@ fn endpoint_to_proto(endpoint: &EndpointProfile) -> NetworkEndpoint { .map(|(name, operation)| (name.clone(), graphql_operation_to_proto(operation))) .collect(), graphql_max_body_bytes: endpoint.graphql_max_body_bytes, + json_rpc_max_body_bytes: endpoint.json_rpc_max_body_bytes, path: endpoint.path.clone(), } } @@ -773,6 +776,7 @@ fn endpoint_from_proto(endpoint: &NetworkEndpoint) -> EndpointProfile { .map(|(name, operation)| (name.clone(), graphql_operation_from_proto(operation))) .collect(), graphql_max_body_bytes: endpoint.graphql_max_body_bytes, + json_rpc_max_body_bytes: endpoint.json_rpc_max_body_bytes, path: endpoint.path.clone(), } } @@ -816,6 +820,8 @@ fn allow_to_proto(allow: &L7AllowProfile) -> L7Allow { operation_type: allow.operation_type.clone(), operation_name: allow.operation_name.clone(), fields: allow.fields.clone(), + rpc_method: String::new(), + params: HashMap::new(), } } @@ -848,6 +854,8 @@ fn deny_rule_to_proto(rule: &L7DenyRuleProfile) -> L7DenyRule { operation_type: rule.operation_type.clone(), operation_name: rule.operation_name.clone(), fields: rule.fields.clone(), + rpc_method: String::new(), + params: HashMap::new(), } } diff --git a/crates/openshell-sandbox/src/mechanistic_mapper.rs b/crates/openshell-sandbox/src/mechanistic_mapper.rs index ba7c51de9..bbe7b93b8 100644 --- a/crates/openshell-sandbox/src/mechanistic_mapper.rs +++ 
b/crates/openshell-sandbox/src/mechanistic_mapper.rs
@@ -355,6 +355,8 @@ fn build_l7_rules(samples: &HashMap<(String, String), u32>) -> Vec {
                 operation_type: String::new(),
                 operation_name: String::new(),
                 fields: Vec::new(),
+                rpc_method: String::new(),
+                params: HashMap::new(),
             }),
         });
     }
diff --git a/crates/openshell-server/src/grpc/policy.rs b/crates/openshell-server/src/grpc/policy.rs
index 2e2210f44..ebae4809a 100644
--- a/crates/openshell-server/src/grpc/policy.rs
+++ b/crates/openshell-server/src/grpc/policy.rs
@@ -8049,6 +8049,8 @@ mod tests {
                     operation_type: String::new(),
                     operation_name: String::new(),
                     fields: Vec::new(),
+                    params: HashMap::default(),
+                    rpc_method: String::new(),
                 }),
             }],
         };
@@ -8444,6 +8446,8 @@ mod tests {
                     operation_type: String::new(),
                     operation_name: String::new(),
                     fields: Vec::new(),
+                    params: HashMap::default(),
+                    rpc_method: String::new(),
                 }),
             }],
         }];
@@ -8590,6 +8594,8 @@ mod tests {
                     operation_type: String::new(),
                     operation_name: String::new(),
                     fields: Vec::new(),
+                    params: HashMap::default(),
+                    rpc_method: String::new(),
                 }),
             }],
         };
diff --git a/crates/openshell-supervisor-network/data/sandbox-policy.rego b/crates/openshell-supervisor-network/data/sandbox-policy.rego
index afcd28863..38070911c 100644
--- a/crates/openshell-supervisor-network/data/sandbox-policy.rego
+++ b/crates/openshell-supervisor-network/data/sandbox-policy.rego
@@ -274,6 +274,16 @@ request_denied_for_endpoint(request, endpoint) if {
     command_matches(request.command, deny_rule.command)
 }

+# --- L7 deny rule matching: JSON-RPC method + params ---
+
+request_denied_for_endpoint(request, endpoint) if {
+    some deny_rule
+    deny_rule := endpoint.deny_rules[_]
+    # A bare reference would also accept an empty rpc_method; require a real pattern.
+    deny_rule.rpc_method != ""
+    jsonrpc_rule_matches(request, deny_rule)
+}
+
 # --- L7 deny rule matching: GraphQL operation ---

 request_denied_for_endpoint(request, endpoint) if {
@@ -417,6 +427,16 @@ request_allowed_for_endpoint(request, endpoint) if {
     command_matches(request.command, rule.allow.command)
 }

+# --- L7 rule matching: JSON-RPC method ---
+
+request_allowed_for_endpoint(request, endpoint) if {
+    some rule
+    rule := endpoint.rules[_]
+    # A bare reference would also accept an empty rpc_method; require a real pattern.
+    rule.allow.rpc_method != ""
+    jsonrpc_rule_matches(request, rule.allow)
+}
+
 # --- L7 rule matching: GraphQL operation ---

 request_allowed_for_endpoint(request, endpoint) if {
@@ -638,6 +658,35 @@ query_value_matches(value, matcher) if {
     glob.match(any_patterns[i], [], value)
 }

+# JSON-RPC method and params matching. The sandbox flattens object params into
+# dot-separated keys before policy evaluation, e.g. arguments.scope.
+jsonrpc_rule_matches(request, rule) if {
+    jsonrpc := object.get(request, "jsonrpc", {})
+    method := object.get(jsonrpc, "method", null)
+    method != null
+    glob.match(rule.rpc_method, [], method)
+    jsonrpc_params_match(jsonrpc, rule)
+}
+
+jsonrpc_params_match(jsonrpc, rule) if {
+    param_rules := object.get(rule, "params", {})
+    not jsonrpc_param_mismatch(jsonrpc, param_rules)
+}
+
+jsonrpc_param_mismatch(jsonrpc, param_rules) if {
+    some key
+    matcher := param_rules[key]
+    not jsonrpc_param_key_matches(jsonrpc, key, matcher)
+}
+
+jsonrpc_param_key_matches(jsonrpc, key, matcher) if {
+    params := object.get(jsonrpc, "params", {})
+    value := object.get(params, key, null)
+    value != null
+    is_string(value)
+    query_value_matches(value, matcher)
+}
+
 # SQL command matching: "*" matches any; otherwise case-insensitive.
command_matches(_, "*") if true diff --git a/crates/openshell-supervisor-network/src/l7/graphql.rs b/crates/openshell-supervisor-network/src/l7/graphql.rs index 82c35720e..12979f0b1 100644 --- a/crates/openshell-supervisor-network/src/l7/graphql.rs +++ b/crates/openshell-supervisor-network/src/l7/graphql.rs @@ -810,6 +810,7 @@ network_policies: target: req.target, query_params: req.query_params, graphql: Some(info), + jsonrpc: None, }; let tunnel_engine = engine diff --git a/crates/openshell-supervisor-network/src/l7/http.rs b/crates/openshell-supervisor-network/src/l7/http.rs new file mode 100644 index 000000000..66269f6ba --- /dev/null +++ b/crates/openshell-supervisor-network/src/l7/http.rs @@ -0,0 +1,199 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Shared HTTP/1.1 request helpers for L7 protocols carried over HTTP. + +use crate::l7::provider::{BodyLength, L7Request}; +use miette::{IntoDiagnostic, Result, miette}; +use tokio::io::{AsyncRead, AsyncReadExt}; + +const READ_BUF_SIZE: usize = 8192; + +pub async fn read_body_for_inspection( + client: &mut C, + request: &mut L7Request, + max_body_bytes: usize, +) -> Result> { + let header_end = request + .raw_header + .windows(4) + .position(|w| w == b"\r\n\r\n") + .map_or(request.raw_header.len(), |p| p + 4); + let overflow = request.raw_header[header_end..].to_vec(); + + match request.body_length { + BodyLength::None => Ok(Vec::new()), + BodyLength::ContentLength(len) => { + let len = usize::try_from(len) + .map_err(|_| miette!("HTTP request body length exceeds platform limit"))?; + if len > max_body_bytes { + return Err(miette!( + "HTTP request body exceeds {max_body_bytes} byte inspection limit" + )); + } + if overflow.len() > len { + return Err(miette!( + "HTTP request contains more body bytes than Content-Length" + )); + } + let remaining = len - overflow.len(); + let mut body = overflow; + if remaining 
> 0 { + let start = body.len(); + body.resize(len, 0); + client + .read_exact(&mut body[start..]) + .await + .into_diagnostic()?; + } + request.raw_header.truncate(header_end); + request.raw_header.extend_from_slice(&body); + Ok(body) + } + BodyLength::Chunked => { + let body = read_chunked_body_for_inspection( + client, + request, + header_end, + overflow, + max_body_bytes, + ) + .await?; + normalize_chunked_request_to_content_length(request, header_end, &body)?; + Ok(body) + } + } +} + +fn normalize_chunked_request_to_content_length( + request: &mut L7Request, + header_end: usize, + body: &[u8], +) -> Result<()> { + let header_str = std::str::from_utf8(&request.raw_header[..header_end]) + .map_err(|_| miette!("HTTP headers contain invalid UTF-8"))?; + let header_str = header_str + .strip_suffix("\r\n\r\n") + .ok_or_else(|| miette!("HTTP headers missing terminator"))?; + + let mut normalized = Vec::with_capacity(header_str.len() + body.len() + 32); + for (idx, line) in header_str.split("\r\n").enumerate() { + if idx > 0 { + let name = line + .split_once(':') + .map(|(name, _)| name.trim().to_ascii_lowercase()); + if matches!( + name.as_deref(), + Some("transfer-encoding" | "content-length" | "trailer") + ) { + continue; + } + } + normalized.extend_from_slice(line.as_bytes()); + normalized.extend_from_slice(b"\r\n"); + } + normalized.extend_from_slice(format!("Content-Length: {}\r\n\r\n", body.len()).as_bytes()); + normalized.extend_from_slice(body); + + request.raw_header = normalized; + request.body_length = BodyLength::ContentLength(body.len() as u64); + Ok(()) +} + +async fn read_chunked_body_for_inspection( + client: &mut C, + request: &mut L7Request, + header_end: usize, + overflow: Vec, + max_body_bytes: usize, +) -> Result> { + let mut raw = overflow; + let mut decoded = Vec::new(); + let mut pos = 0usize; + + loop { + let size_line_end = loop { + if let Some(end) = find_crlf(&raw, pos) { + break end; + } + read_more(client, &mut raw, 
max_body_bytes).await?; + }; + let size_line = std::str::from_utf8(&raw[pos..size_line_end]) + .into_diagnostic() + .map_err(|_| miette!("Invalid UTF-8 in HTTP chunk-size line"))?; + let size_token = size_line + .split(';') + .next() + .map(str::trim) + .unwrap_or_default(); + let chunk_size = usize::from_str_radix(size_token, 16) + .into_diagnostic() + .map_err(|_| miette!("Invalid HTTP chunk size token: {size_token:?}"))?; + pos = size_line_end + 2; + + if decoded.len().saturating_add(chunk_size) > max_body_bytes { + return Err(miette!( + "HTTP request body exceeds {max_body_bytes} byte inspection limit" + )); + } + + if chunk_size == 0 { + loop { + let trailer_end = loop { + if let Some(end) = find_crlf(&raw, pos) { + break end; + } + read_more(client, &mut raw, max_body_bytes).await?; + }; + let trailer_line = &raw[pos..trailer_end]; + pos = trailer_end + 2; + if trailer_line.is_empty() { + request.raw_header.truncate(header_end); + request.raw_header.extend_from_slice(&raw[..pos]); + return Ok(decoded); + } + } + } + + let chunk_end = pos + .checked_add(chunk_size) + .ok_or_else(|| miette!("HTTP chunk size overflow"))?; + let chunk_with_crlf_end = chunk_end + .checked_add(2) + .ok_or_else(|| miette!("HTTP chunk size overflow"))?; + while raw.len() < chunk_with_crlf_end { + read_more(client, &mut raw, max_body_bytes).await?; + } + decoded.extend_from_slice(&raw[pos..chunk_end]); + if raw.get(chunk_end..chunk_with_crlf_end) != Some(&b"\r\n"[..]) { + return Err(miette!("HTTP chunk payload missing terminating CRLF")); + } + pos = chunk_with_crlf_end; + } +} + +async fn read_more( + client: &mut C, + raw: &mut Vec, + max_body_bytes: usize, +) -> Result<()> { + if raw.len() > max_body_bytes.saturating_mul(2).max(max_body_bytes) { + return Err(miette!( + "HTTP chunked request body exceeds inspection framing limit" + )); + } + let mut buf = [0u8; READ_BUF_SIZE]; + let n = client.read(&mut buf).await.into_diagnostic()?; + if n == 0 { + return Err(miette!("HTTP chunked 
body ended before terminator")); + } + raw.extend_from_slice(&buf[..n]); + Ok(()) +} + +fn find_crlf(buf: &[u8], start: usize) -> Option { + buf.get(start..)? + .windows(2) + .position(|w| w == b"\r\n") + .map(|p| start + p) +} diff --git a/crates/openshell-supervisor-network/src/l7/jsonrpc.rs b/crates/openshell-supervisor-network/src/l7/jsonrpc.rs new file mode 100644 index 000000000..a27297811 --- /dev/null +++ b/crates/openshell-supervisor-network/src/l7/jsonrpc.rs @@ -0,0 +1,372 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! JSON-RPC 2.0 over HTTP L7 inspection. + +use miette::Result; +use sha2::{Digest, Sha256}; +use std::collections::BTreeMap; +use std::collections::HashMap; +use tokio::io::{AsyncRead, AsyncWrite}; + +use crate::l7::provider::{L7Provider, L7Request}; + +pub const DEFAULT_MAX_BODY_BYTES: usize = 64 * 1024; + +pub struct JsonRpcHttpRequest { + pub request: L7Request, + pub info: JsonRpcRequestInfo, +} + +pub(crate) async fn parse_jsonrpc_http_request( + client: &mut C, + max_body_bytes: usize, + canonicalize_options: crate::l7::path::CanonicalizeOptions, +) -> Result> { + let provider = crate::l7::rest::RestProvider::with_options(canonicalize_options); + let Some(mut request) = provider.parse_request(client).await? 
else { + return Ok(None); + }; + let body = + crate::l7::http::read_body_for_inspection(client, &mut request, max_body_bytes).await?; + let info = parse_jsonrpc_body(&body); + Ok(Some(JsonRpcHttpRequest { request, info })) +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct JsonRpcRequestInfo { + pub calls: Vec, + pub is_batch: bool, + pub error: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct JsonRpcCallInfo { + pub method: String, + pub params: HashMap, +} + +impl JsonRpcRequestInfo { + pub(crate) fn params_sha256(&self) -> Option { + if self.is_batch { + if self.calls.is_empty() || self.calls.iter().all(|call| call.params.is_empty()) { + return None; + } + let canonical_params = self + .calls + .iter() + .map(|call| canonical_params_map(&call.params)) + .collect::>(); + return Some(sha256_json(&canonical_params)); + } + + let call = self.calls.first()?; + if call.params.is_empty() { + return None; + } + Some(sha256_json(&canonical_params_map(&call.params))) + } +} +/// Parse a JSON-RPC 2.0 request body and extract the `method` field. +/// +/// Returns an info struct with `method` set on success, or `error` set if the +/// body is not valid JSON-RPC 2.0. 
+pub fn parse_jsonrpc_body(body: &[u8]) -> JsonRpcRequestInfo { + let Ok(value) = serde_json::from_slice::(body) else { + return JsonRpcRequestInfo { + calls: Vec::new(), + is_batch: false, + error: Some("invalid JSON".to_string()), + }; + }; + + if let serde_json::Value::Array(items) = value { + if items.is_empty() { + return JsonRpcRequestInfo { + calls: Vec::new(), + is_batch: true, + error: Some("empty batch".to_string()), + }; + } + let mut calls = Vec::new(); + for item in &items { + let call = match parse_jsonrpc_call(item) { + Ok(call) => call, + Err(error) => { + return JsonRpcRequestInfo { + calls: Vec::new(), + is_batch: true, + error: Some(format!("batch item invalid: {error}")), + }; + } + }; + calls.push(call); + } + return JsonRpcRequestInfo { + calls, + is_batch: true, + error: None, + }; + } + + let call = match parse_jsonrpc_call(&value) { + Ok(call) => call, + Err(error) => { + return JsonRpcRequestInfo { + calls: Vec::new(), + is_batch: false, + error: Some(error), + }; + } + }; + JsonRpcRequestInfo { + calls: vec![call], + is_batch: false, + error: None, + } +} + +fn parse_jsonrpc_call(value: &serde_json::Value) -> std::result::Result { + let version = value + .get("jsonrpc") + .and_then(|v| v.as_str()) + .ok_or_else(|| "missing or non-string 'jsonrpc' field".to_string())?; + if version != "2.0" { + return Err(format!("unsupported JSON-RPC version '{version}'")); + } + let method = value + .get("method") + .and_then(|m| m.as_str()) + .ok_or_else(|| "missing or non-string 'method' field".to_string())?; + let params = value + .get("params") + .map_or_else(|| Ok(HashMap::new()), flatten_jsonrpc_params)?; + Ok(JsonRpcCallInfo { + method: method.to_string(), + params, + }) +} + +fn flatten_jsonrpc_params( + value: &serde_json::Value, +) -> std::result::Result, String> { + let mut params = HashMap::new(); + flatten_json_value("", value, &mut params)?; + Ok(params) +} + +fn canonical_params_map(params: &HashMap) -> BTreeMap { + params + .iter() + 
.map(|(key, value)| (key.clone(), value.clone())) + .collect() +} + +fn sha256_json(value: &impl serde::Serialize) -> String { + let encoded = serde_json::to_vec(value).expect("canonical JSON-RPC params should serialize"); + hex::encode(Sha256::digest(&encoded)) +} + +fn flatten_json_value( + prefix: &str, + value: &serde_json::Value, + out: &mut HashMap, +) -> std::result::Result<(), String> { + match value { + serde_json::Value::Object(map) => { + for (key, child) in map { + if key.contains('.') { + return Err(format!( + "ambiguous dotted params key '{key}' is not allowed" + )); + } + let next = if prefix.is_empty() { + key.clone() + } else { + format!("{prefix}.{key}") + }; + flatten_json_value(&next, child, out)?; + } + } + serde_json::Value::String(s) if !prefix.is_empty() => { + insert_flattened_param(out, prefix, s.clone())?; + } + serde_json::Value::Number(n) if !prefix.is_empty() => { + insert_flattened_param(out, prefix, n.to_string())?; + } + serde_json::Value::Bool(b) if !prefix.is_empty() => { + insert_flattened_param(out, prefix, b.to_string())?; + } + _ => {} + } + Ok(()) +} + +fn insert_flattened_param( + out: &mut HashMap, + key: &str, + value: String, +) -> std::result::Result<(), String> { + if out.insert(key.to_string(), value).is_some() { + return Err(format!("ambiguous params key collision at '{key}'")); + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_method_from_request_body() { + let body = br#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}"#; + let info = parse_jsonrpc_body(body); + assert_eq!( + info.calls.first().map(|call| call.method.as_str()), + Some("initialize") + ); + assert_eq!(info.calls.len(), 1); + assert!(!info.is_batch); + assert!(info.error.is_none()); + } + + #[test] + fn flattens_object_params_for_policy_matching() { + let body = br#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"submit_report","arguments":{"scope":"workspace/main"}}}"#; + let info = 
parse_jsonrpc_body(body); + let params = &info.calls.first().expect("single request call").params; + assert_eq!( + params.get("name").map(String::as_str), + Some("submit_report") + ); + assert_eq!( + params.get("arguments.scope").map(String::as_str), + Some("workspace/main") + ); + } + + #[test] + fn rejects_literal_dotted_param_keys() { + let body = br#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"arguments.scope":"workspace/other","arguments":{"scope":"workspace/main"}}}"#; + let info = parse_jsonrpc_body(body); + + assert!(info.calls.is_empty()); + assert!( + info.error + .as_deref() + .is_some_and(|error| error.contains("ambiguous dotted params key")), + "expected dotted params key error, got {info:?}" + ); + } + + #[test] + fn rejects_requests_missing_jsonrpc_version() { + let body = br#"{"id":1,"method":"tools/list"}"#; + let info = parse_jsonrpc_body(body); + + assert!(info.calls.is_empty()); + assert_eq!( + info.error.as_deref(), + Some("missing or non-string 'jsonrpc' field") + ); + } + + #[test] + fn rejects_batch_items_missing_jsonrpc_version() { + let body = br#"[ + {"jsonrpc":"2.0","id":1,"method":"tools/list"}, + {"id":2,"method":"tools/call","params":{"name":"read_status"}} + ]"#; + let info = parse_jsonrpc_body(body); + + assert!(info.calls.is_empty()); + assert!(info.is_batch); + assert_eq!( + info.error.as_deref(), + Some("batch item invalid: missing or non-string 'jsonrpc' field") + ); + } + + #[test] + fn rejects_unsupported_jsonrpc_version() { + let body = br#"{"jsonrpc":"1.0","id":1,"method":"tools/list"}"#; + let info = parse_jsonrpc_body(body); + + assert!(info.calls.is_empty()); + assert_eq!( + info.error.as_deref(), + Some("unsupported JSON-RPC version '1.0'") + ); + } + + #[test] + fn detects_flattened_param_collisions() { + let mut params = HashMap::from([("arguments.scope".to_string(), "first".to_string())]); + + let error = insert_flattened_param(&mut params, "arguments.scope", "second".to_string()) + .expect_err("duplicate 
/// The params digest must be independent of key order, must change when a
/// parameter value changes, and must be a bare 64-character hex string that
/// contains none of the raw parameter values.
#[test]
fn params_digest_is_canonical_and_redacted() {
    // Baseline request: `name` first, then `arguments`.
    let digest = parse_jsonrpc_body(
        br#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"submit_report","arguments":{"scope":"workspace/main"}}}"#,
    )
    .params_sha256()
    .expect("params digest");

    // Same params with the two keys swapped.
    let reordered_digest = parse_jsonrpc_body(
        br#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"arguments":{"scope":"workspace/main"},"name":"submit_report"}}"#,
    )
    .params_sha256();

    // Same keys, different `arguments.scope` value.
    let changed_digest = parse_jsonrpc_body(
        br#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"submit_report","arguments":{"scope":"workspace/other"}}}"#,
    )
    .params_sha256();

    assert_eq!(Some(digest.as_str()), reordered_digest.as_deref());
    assert_ne!(Some(digest.as_str()), changed_digest.as_deref());

    assert_eq!(digest.len(), 64);
    assert!(digest.chars().all(|c| c.is_ascii_hexdigit()));

    // The digest must not leak the values it was computed from.
    for raw_value in ["workspace/main", "submit_report"] {
        assert!(!digest.contains(raw_value));
    }
}
{"jsonrpc":"2.0","id":2,"method":"initialize"} + ]"#, + ); + + let digest = batch.params_sha256().expect("batch params digest"); + assert_eq!(digest.len(), 64); + assert!(digest.chars().all(|c| c.is_ascii_hexdigit())); + assert!(!digest.contains("blocked_action")); + assert!(empty_batch.params_sha256().is_none()); + } +} diff --git a/crates/openshell-supervisor-network/src/l7/mod.rs b/crates/openshell-supervisor-network/src/l7/mod.rs index 802058ec2..563094dd0 100644 --- a/crates/openshell-supervisor-network/src/l7/mod.rs +++ b/crates/openshell-supervisor-network/src/l7/mod.rs @@ -9,7 +9,9 @@ //! evaluated against OPA policy, and either forwarded or denied. pub mod graphql; +pub(crate) mod http; pub mod inference; +pub mod jsonrpc; pub mod path; pub mod provider; pub mod relay; @@ -25,6 +27,7 @@ pub enum L7Protocol { Websocket, Graphql, Sql, + JsonRpc, } impl L7Protocol { @@ -34,6 +37,7 @@ impl L7Protocol { "websocket" => Some(Self::Websocket), "graphql" => Some(Self::Graphql), "sql" => Some(Self::Sql), + "json-rpc" => Some(Self::JsonRpc), _ => None, } } @@ -76,6 +80,8 @@ pub struct L7EndpointConfig { pub enforcement: EnforcementMode, /// Maximum GraphQL request body bytes to buffer for inspection. pub graphql_max_body_bytes: usize, + /// Maximum JSON-RPC request body bytes to buffer for inspection. + pub json_rpc_max_body_bytes: usize, /// When true, percent-encoded `/` (`%2F`) is preserved in path segments /// rather than rejected at the parser. Needed by upstreams like GitLab /// that embed `%2F` in namespaced project paths. Defaults to false. @@ -110,6 +116,8 @@ pub struct L7RequestInfo { pub query_params: std::collections::HashMap>, /// Parsed GraphQL operation metadata for GraphQL endpoints. pub graphql: Option, + /// Parsed JSON-RPC request metadata for JSON-RPC endpoints. + pub jsonrpc: Option, } /// Parse an L7 endpoint config from a regorus Value (returned by Rego query). 
@@ -165,6 +173,10 @@ pub fn parse_l7_config(val: ®orus::Value) -> Option { .and_then(|v| usize::try_from(v).ok()) .filter(|v| *v > 0) .unwrap_or(graphql::DEFAULT_MAX_BODY_BYTES); + let json_rpc_max_body_bytes = get_object_u64(val, "json_rpc_max_body_bytes") + .and_then(|v| usize::try_from(v).ok()) + .filter(|v| *v > 0) + .unwrap_or(jsonrpc::DEFAULT_MAX_BODY_BYTES); Some(L7EndpointConfig { protocol, @@ -172,6 +184,7 @@ pub fn parse_l7_config(val: ®orus::Value) -> Option { tls, enforcement, graphql_max_body_bytes, + json_rpc_max_body_bytes, allow_encoded_slash, websocket_credential_rewrite, request_body_credential_rewrite, @@ -598,7 +611,7 @@ pub fn validate_l7_policies(data_json: &serde_json::Value) -> (Vec, Vec< if !protocol.is_empty() && L7Protocol::parse(protocol).is_none() { errors.push(format!( - "{loc}: unknown protocol '{protocol}' (expected rest, websocket, graphql, or sql)" + "{loc}: unknown protocol '{protocol}' (expected rest, websocket, graphql, sql, or json-rpc)" )); } @@ -624,6 +637,18 @@ pub fn validate_l7_policies(data_json: &serde_json::Value) -> (Vec, Vec< } } + if ep.get("json_rpc_max_body_bytes").is_some() { + let valid_max = ep + .get("json_rpc_max_body_bytes") + .and_then(serde_json::Value::as_u64) + .is_some_and(|v| v > 0); + if !valid_max { + errors.push(format!( + "{loc}: json_rpc_max_body_bytes must be a positive integer" + )); + } + } + if protocol != "graphql" && protocol != "websocket" && (ep.get("persisted_queries").is_some() @@ -635,6 +660,12 @@ pub fn validate_l7_policies(data_json: &serde_json::Value) -> (Vec, Vec< )); } + if protocol != "json-rpc" && ep.get("json_rpc_max_body_bytes").is_some() { + warnings.push(format!( + "{loc}: JSON-RPC-specific endpoint fields are ignored unless protocol is json-rpc" + )); + } + if ep .get("websocket_credential_rewrite") .and_then(serde_json::Value::as_bool) diff --git a/crates/openshell-supervisor-network/src/l7/relay.rs b/crates/openshell-supervisor-network/src/l7/relay.rs index 
3054a4530..1251b8e7b 100644 --- a/crates/openshell-supervisor-network/src/l7/relay.rs +++ b/crates/openshell-supervisor-network/src/l7/relay.rs @@ -178,6 +178,7 @@ where .into_diagnostic()?; Ok(()) } + L7Protocol::JsonRpc => relay_jsonrpc(config, &engine, client, upstream, ctx).await, } } @@ -297,6 +298,7 @@ where target: redacted_target.clone(), query_params: req.query_params.clone(), graphql: graphql_info.clone(), + jsonrpc: None, }; let websocket_request = crate::l7::rest::request_is_websocket_upgrade(&req.raw_header); if config.protocol == L7Protocol::Websocket && !websocket_request { @@ -341,7 +343,7 @@ where let engine_type = match config.protocol { L7Protocol::Graphql => "l7-graphql", L7Protocol::Websocket => "l7-websocket", - L7Protocol::Rest | L7Protocol::Sql => "l7", + L7Protocol::Rest | L7Protocol::Sql | L7Protocol::JsonRpc => "l7", }; emit_l7_request_log( ctx, @@ -694,6 +696,7 @@ where target: redacted_target.clone(), query_params: req.query_params.clone(), graphql: None, + jsonrpc: None, }; let websocket_request = crate::l7::rest::request_is_websocket_upgrade(&req.raw_header); if config.protocol == L7Protocol::Websocket && !websocket_request { @@ -885,6 +888,173 @@ fn close_if_stale(guard: &PolicyGenerationGuard, ctx: &L7EvalContext) -> bool { true } +async fn relay_jsonrpc( + config: &L7EndpointConfig, + engine: &TunnelPolicyEngine, + client: &mut C, + upstream: &mut U, + ctx: &L7EvalContext, +) -> Result<()> +where + C: AsyncRead + AsyncWrite + Unpin + Send, + U: AsyncRead + AsyncWrite + Unpin + Send, +{ + loop { + if close_if_stale(engine.generation_guard(), ctx) { + return Ok(()); + } + + let parsed = match crate::l7::jsonrpc::parse_jsonrpc_http_request( + client, + config.json_rpc_max_body_bytes, + crate::l7::path::CanonicalizeOptions { + allow_encoded_slash: config.allow_encoded_slash, + ..Default::default() + }, + ) + .await + { + Ok(Some(parsed)) => parsed, + Ok(None) => return Ok(()), + Err(e) => { + if is_benign_connection_error(&e) { + 
debug!( + host = %ctx.host, + port = ctx.port, + error = %e, + "JSON-RPC L7 connection closed" + ); + } else { + let detail = + parse_rejection_detail(&e.to_string(), ParseRejectionMode::L7Endpoint); + emit_parse_rejection(ctx, &detail, "l7-jsonrpc"); + } + return Ok(()); + } + }; + + let req = parsed.request; + let jsonrpc_info = parsed.info; + + if close_if_stale(engine.generation_guard(), ctx) { + return Ok(()); + } + + let redacted_target = req.target.clone(); + + let request_info = L7RequestInfo { + action: req.action.clone(), + target: redacted_target.clone(), + query_params: req.query_params.clone(), + graphql: None, + jsonrpc: Some(jsonrpc_info.clone()), + }; + + let parse_error_reason = jsonrpc_info + .error + .as_deref() + .map(|e| format!("JSON-RPC request rejected: {e}")); + let force_deny = parse_error_reason.is_some(); + let (allowed, reason, jsonrpc_log_info) = if let Some(reason) = parse_error_reason { + (false, reason, jsonrpc_info.clone()) + } else { + let evaluation = + evaluate_jsonrpc_l7_request_for_log(engine, ctx, &request_info, &jsonrpc_info)?; + (evaluation.allowed, evaluation.reason, evaluation.log_info) + }; + + if close_if_stale(engine.generation_guard(), ctx) { + return Ok(()); + } + + let decision_str = match (allowed, config.enforcement) { + (_, _) if force_deny => "deny", + (true, _) => "allow", + (false, EnforcementMode::Audit) => "audit", + (false, EnforcementMode::Enforce) => "deny", + }; + + { + let (action_id, disposition_id, severity) = match decision_str { + "deny" => (ActionId::Denied, DispositionId::Blocked, SeverityId::Medium), + _ => ( + ActionId::Allowed, + DispositionId::Allowed, + SeverityId::Informational, + ), + }; + let endpoint = format!("{}:{}{}", ctx.host, ctx.port, redacted_target); + let params_sha256 = jsonrpc_log_info + .params_sha256() + .unwrap_or_else(|| "".to_string()); + let policy_version = engine.captured_generation(); + let event = HttpActivityBuilder::new(openshell_ocsf::ctx::ctx()) + 
.activity(ActivityId::Other) + .action(action_id) + .disposition(disposition_id) + .severity(severity) + .http_request(HttpRequest::new( + &request_info.action, + OcsfUrl::new("http", &ctx.host, &redacted_target, ctx.port), + )) + .dst_endpoint(Endpoint::from_domain(&ctx.host, ctx.port)) + .firewall_rule(&ctx.policy_name, "l7-jsonrpc") + .message(jsonrpc_log_message( + decision_str, + &request_info.action, + &endpoint, + &jsonrpc_log_info, + ¶ms_sha256, + policy_version, + &reason, + )) + .build(); + ocsf_emit!(event); + } + + if allowed || (config.enforcement == EnforcementMode::Audit && !force_deny) { + let outcome = crate::l7::rest::relay_http_request_with_resolver_guarded( + &req, + client, + upstream, + ctx.secret_resolver.as_deref(), + Some(engine.generation_guard()), + ) + .await?; + match outcome { + RelayOutcome::Reusable => {} + RelayOutcome::Consumed => { + debug!( + host = %ctx.host, + port = ctx.port, + "Upstream connection not reusable, closing JSON-RPC L7 relay" + ); + return Ok(()); + } + RelayOutcome::Upgraded { .. 
} => { + return Ok(()); + } + } + } else { + crate::l7::rest::RestProvider::default() + .deny_with_redacted_target( + &req, + &ctx.policy_name, + &reason, + client, + Some(&redacted_target), + Some(crate::l7::rest::DenyResponseContext { + host: Some(&ctx.host), + port: Some(ctx.port), + binary: Some(&ctx.binary_path), + }), + ) + .await?; + return Ok(()); + } + } +} + async fn relay_graphql( config: &L7EndpointConfig, engine: &TunnelPolicyEngine, @@ -962,6 +1132,7 @@ where target: redacted_target.clone(), query_params: req.query_params.clone(), graphql: Some(graphql_info.clone()), + jsonrpc: None, }; // Malformed or ambiguous GraphQL requests, such as duplicated GET @@ -1110,6 +1281,38 @@ fn graphql_log_summary(info: &crate::l7::graphql::GraphqlRequestInfo) -> String format!("graphql_ops={}", ops.join(";")) } +pub(crate) fn jsonrpc_log_message( + decision: &str, + http_method: &str, + endpoint: &str, + info: &crate::l7::jsonrpc::JsonRpcRequestInfo, + params_sha256: &str, + policy_version: u64, + reason: &str, +) -> String { + let rpc_methods = jsonrpc_methods_for_log(info); + format!( + "JSONRPC_L7_REQUEST decision={decision} http_method={http_method} endpoint={endpoint} rpc_methods={rpc_methods} params_sha256={params_sha256} policy_version={policy_version} reason={reason}" + ) +} + +pub(crate) fn jsonrpc_methods_for_log(info: &crate::l7::jsonrpc::JsonRpcRequestInfo) -> String { + if info.calls.is_empty() { + return "-".to_string(); + } + info.calls + .iter() + .map(|call| call.method.as_str()) + .collect::>() + .join(",") +} + +struct JsonRpcEvaluation { + allowed: bool, + reason: String, + log_info: crate::l7::jsonrpc::JsonRpcRequestInfo, +} + /// Check if a miette error represents a benign connection close. 
/// /// TLS handshake EOF, missing `close_notify`, connection resets, and broken @@ -1135,6 +1338,88 @@ pub fn evaluate_l7_request( engine: &TunnelPolicyEngine, ctx: &L7EvalContext, request: &L7RequestInfo, +) -> Result<(bool, String)> { + if let Some(jsonrpc) = &request.jsonrpc + && jsonrpc.is_batch + && !jsonrpc.calls.is_empty() + { + for call in &jsonrpc.calls { + let item_request = jsonrpc_request_for_call(request, call); + let (allowed, reason) = evaluate_l7_request_once(engine, ctx, &item_request)?; + if !allowed { + return Ok((false, reason)); + } + } + return Ok((true, String::new())); + } + + evaluate_l7_request_once(engine, ctx, request) +} + +fn evaluate_jsonrpc_l7_request_for_log( + engine: &TunnelPolicyEngine, + ctx: &L7EvalContext, + request: &L7RequestInfo, + jsonrpc: &crate::l7::jsonrpc::JsonRpcRequestInfo, +) -> Result { + if jsonrpc.is_batch && !jsonrpc.calls.is_empty() { + let mut denied_calls = Vec::new(); + let mut first_denied_reason = None; + for call in &jsonrpc.calls { + let item_request = jsonrpc_request_for_call(request, call); + let (allowed, reason) = evaluate_l7_request_once(engine, ctx, &item_request)?; + if !allowed { + if first_denied_reason.is_none() { + first_denied_reason = Some(reason); + } + denied_calls.push(call.clone()); + } + } + + if denied_calls.is_empty() { + return Ok(JsonRpcEvaluation { + allowed: true, + reason: String::new(), + log_info: jsonrpc.clone(), + }); + } + + return Ok(JsonRpcEvaluation { + allowed: false, + reason: first_denied_reason.unwrap_or_else(|| "request denied by policy".to_string()), + log_info: crate::l7::jsonrpc::JsonRpcRequestInfo { + calls: denied_calls, + is_batch: true, + error: None, + }, + }); + } + + let (allowed, reason) = evaluate_l7_request_once(engine, ctx, request)?; + Ok(JsonRpcEvaluation { + allowed, + reason, + log_info: jsonrpc.clone(), + }) +} + +fn jsonrpc_request_for_call( + request: &L7RequestInfo, + call: &crate::l7::jsonrpc::JsonRpcCallInfo, +) -> L7RequestInfo { + let mut 
item_request = request.clone(); + item_request.jsonrpc = Some(crate::l7::jsonrpc::JsonRpcRequestInfo { + calls: vec![call.clone()], + is_batch: false, + error: None, + }); + item_request +} + +fn evaluate_l7_request_once( + engine: &TunnelPolicyEngine, + ctx: &L7EvalContext, + request: &L7RequestInfo, ) -> Result<(bool, String)> { if engine.is_stale() { return Err(miette!( @@ -1159,6 +1444,14 @@ pub fn evaluate_l7_request( "path": request.target, "query_params": request.query_params.clone(), "graphql": request.graphql.clone(), + "jsonrpc": request.jsonrpc.as_ref().map(|j| { + let call = if j.is_batch { None } else { j.calls.first() }; + serde_json::json!({ + "method": call.map(|call| call.method.as_str()), + "params": call.map(|call| call.params.clone()).unwrap_or_default(), + "error": j.error, + }) + }), } }); @@ -1792,6 +2085,7 @@ network_policies: target: "/ws".into(), query_params: std::collections::HashMap::new(), graphql: None, + jsonrpc: None, }; let (allowed, reason) = evaluate_l7_request(&tunnel_engine, &ctx, &request).unwrap(); @@ -1800,6 +2094,180 @@ network_policies: assert!(reason.contains("WEBSOCKET_TEXT /ws not permitted")); } + #[test] + fn jsonrpc_batch_evaluates_each_call() { + let data = r#" +network_policies: + jsonrpc_api: + name: jsonrpc_api + endpoints: + - host: api.example.test + port: 443 + protocol: json-rpc + enforcement: enforce + rules: + - allow: + method: POST + path: "/mcp" + rpc_method: "tools/list" + - allow: + method: POST + path: "/mcp" + rpc_method: "tools/call" + params: + name: read_status + deny_rules: + - rpc_method: "tools/call" + params: + name: blocked_action + - rpc_method: "tools/delete" + binaries: + - { path: /usr/bin/node } +"#; + let engine = OpaEngine::from_strings(TEST_POLICY, data).unwrap(); + let tunnel_engine = engine + .clone_engine_for_tunnel(engine.current_generation()) + .unwrap(); + let ctx = L7EvalContext { + host: "api.example.test".into(), + port: 443, + policy_name: "jsonrpc_api".into(), + 
binary_path: "/usr/bin/node".into(), + ancestors: vec![], + cmdline_paths: vec![], + secret_resolver: None, + activity_tx: None, + dynamic_credentials: None, + token_grant_resolver: None, + }; + let mut request = L7RequestInfo { + action: "POST".into(), + target: "/mcp".into(), + query_params: std::collections::HashMap::new(), + graphql: None, + jsonrpc: Some(crate::l7::jsonrpc::parse_jsonrpc_body( + br#"[ + {"jsonrpc":"2.0","id":1,"method":"tools/list"}, + {"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"read_status"}} + ]"#, + )), + }; + + let (allowed, reason) = evaluate_l7_request(&tunnel_engine, &ctx, &request).unwrap(); + assert!(allowed, "{reason}"); + + request.jsonrpc = Some(crate::l7::jsonrpc::parse_jsonrpc_body( + br#"[ + {"jsonrpc":"2.0","id":1,"method":"tools/list"}, + {"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"blocked_action"}}, + {"jsonrpc":"2.0","id":3,"method":"tools/delete","params":{"name":"purge_cache"}} + ]"#, + )); + let (allowed, _) = evaluate_l7_request(&tunnel_engine, &ctx, &request).unwrap(); + assert!(!allowed); + + let jsonrpc = request.jsonrpc.as_ref().expect("jsonrpc request"); + let evaluation = + evaluate_jsonrpc_l7_request_for_log(&tunnel_engine, &ctx, &request, jsonrpc).unwrap(); + assert!(!evaluation.allowed); + assert!(evaluation.log_info.is_batch); + assert_eq!( + jsonrpc_methods_for_log(&evaluation.log_info), + "tools/call,tools/delete" + ); + + let full_params_sha256 = jsonrpc.params_sha256().expect("full batch params digest"); + let log_params_sha256 = evaluation + .log_info + .params_sha256() + .expect("logged batch params digest"); + assert_ne!(full_params_sha256, log_params_sha256); + let message = jsonrpc_log_message( + "deny", + "POST", + "api.example.test:443/mcp", + &evaluation.log_info, + &log_params_sha256, + 42, + &evaluation.reason, + ); + assert!(message.contains("rpc_methods=tools/call,tools/delete")); + assert!(message.contains("params_sha256=")); + 
assert!(!message.contains("params_sha256=sha256:")); + assert!(message.contains("policy_version=42")); + assert!(!message.contains("tools/list")); + assert!(!message.contains("blocked_action")); + assert!(!message.contains("purge_cache")); + } + + #[test] + fn jsonrpc_log_records_digest_not_args() { + let info = crate::l7::jsonrpc::parse_jsonrpc_body( + br#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"delete_resource","arguments":{"scope":"secret-scope"}}}"#, + ); + let params_sha256 = info.params_sha256().expect("params digest"); + let message = jsonrpc_log_message( + "deny", + "POST", + "mcp.example.com:443/mcp", + &info, + ¶ms_sha256, + 42, + "request denied by policy", + ); + + assert!(message.contains("endpoint=mcp.example.com:443/mcp")); + assert!(message.contains("rpc_methods=tools/call")); + assert!(message.contains("params_sha256=")); + assert!(!message.contains("params_sha256=sha256:")); + assert!(message.contains("policy_version=42")); + assert!(!message.contains("delete_resource")); + assert!(!message.contains("secret-scope")); + + let batch = crate::l7::jsonrpc::parse_jsonrpc_body( + br#"[ + {"jsonrpc":"2.0","id":1,"method":"tools/list"}, + {"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"delete_resource"}} + ]"#, + ); + let batch_params_sha256 = batch.params_sha256().expect("batch params digest"); + let batch_message = jsonrpc_log_message( + "allow", + "POST", + "mcp.example.com:443/mcp", + &batch, + &batch_params_sha256, + 43, + "", + ); + + assert!(batch_message.starts_with("JSONRPC_L7_REQUEST ")); + assert!(batch_message.contains("rpc_methods=tools/list,tools/call")); + assert!(batch_message.contains("params_sha256=")); + assert!(!batch_message.contains("params_sha256=sha256:")); + assert!(batch_message.contains("policy_version=43")); + assert!(!batch_message.contains("rpc_method=")); + assert!(!batch_message.contains("delete_resource")); + + let no_params = crate::l7::jsonrpc::parse_jsonrpc_body( + 
br#"{"jsonrpc":"2.0","id":1,"method":"initialize"}"#, + ); + let no_params_sha256 = no_params + .params_sha256() + .unwrap_or_else(|| "".to_string()); + let no_params_message = jsonrpc_log_message( + "allow", + "POST", + "mcp.example.com:443/mcp", + &no_params, + &no_params_sha256, + 44, + "", + ); + assert!(no_params_message.contains("rpc_methods=initialize")); + assert!(no_params_message.contains("params_sha256=")); + } + #[tokio::test] async fn route_selected_websocket_upgrade_rejects_invalid_accept_without_forwarding_101() { let data = r#" @@ -1828,6 +2296,7 @@ network_policies: tls: crate::l7::TlsMode::Auto, enforcement: EnforcementMode::Enforce, graphql_max_body_bytes: 0, + json_rpc_max_body_bytes: crate::l7::jsonrpc::DEFAULT_MAX_BODY_BYTES, allow_encoded_slash: false, websocket_credential_rewrite: true, request_body_credential_rewrite: false, @@ -1931,6 +2400,7 @@ network_policies: tls: crate::l7::TlsMode::Auto, enforcement: EnforcementMode::Enforce, graphql_max_body_bytes: 0, + json_rpc_max_body_bytes: crate::l7::jsonrpc::DEFAULT_MAX_BODY_BYTES, allow_encoded_slash: false, websocket_credential_rewrite: true, request_body_credential_rewrite: false, @@ -2051,6 +2521,7 @@ network_policies: tls: crate::l7::TlsMode::Auto, enforcement: EnforcementMode::Enforce, graphql_max_body_bytes: 0, + json_rpc_max_body_bytes: crate::l7::jsonrpc::DEFAULT_MAX_BODY_BYTES, allow_encoded_slash: false, websocket_credential_rewrite: true, request_body_credential_rewrite: false, @@ -2407,4 +2878,100 @@ network_policies: "stale passthrough request must not be forwarded upstream" ); } + + #[tokio::test] + async fn jsonrpc_relay_denies_method_not_in_allow_list() { + let data = r" +network_policies: + mcp_api: + name: mcp_api + endpoints: + - host: mcp.example.test + port: 8000 + path: /mcp + protocol: json-rpc + enforcement: enforce + rules: + - allow: + rpc_method: initialize + binaries: + - { path: /usr/bin/python3 } +"; + let engine = OpaEngine::from_strings(TEST_POLICY, 
data).unwrap(); + let input = NetworkInput { + host: "mcp.example.test".into(), + port: 8000, + binary_path: PathBuf::from("/usr/bin/python3"), + binary_sha256: "unused".into(), + ancestors: vec![], + cmdline_paths: vec![], + }; + let (endpoint_config, generation) = engine + .query_endpoint_config_with_generation(&input) + .unwrap(); + let config = crate::l7::parse_l7_config(&endpoint_config.unwrap()).unwrap(); + let tunnel_engine = engine.clone_engine_for_tunnel(generation).unwrap(); + let ctx = L7EvalContext { + host: "mcp.example.test".into(), + port: 8000, + policy_name: "mcp_api".into(), + binary_path: "/usr/bin/python3".into(), + ancestors: vec![], + cmdline_paths: vec![], + secret_resolver: None, + activity_tx: None, + dynamic_credentials: None, + token_grant_resolver: None, + }; + + let (mut app, mut relay_client) = tokio::io::duplex(8192); + let (mut relay_upstream, mut upstream) = tokio::io::duplex(8192); + let relay = tokio::spawn(async move { + relay_with_inspection( + &config, + tunnel_engine, + &mut relay_client, + &mut relay_upstream, + &ctx, + ) + .await + }); + + let body = + br#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"list_repos"}}"#; + let request = format!( + "POST /mcp HTTP/1.1\r\nHost: mcp.example.test:8000\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", + body.len() + ); + app.write_all(request.as_bytes()).await.unwrap(); + app.write_all(body).await.unwrap(); + + let mut response = [0u8; 512]; + let n = tokio::time::timeout(std::time::Duration::from_secs(2), app.read(&mut response)) + .await + .expect("relay should respond without reaching upstream") + .unwrap(); + let response = String::from_utf8_lossy(&response[..n]); + assert!( + response.contains("403"), + "tools/call not in allow list must be denied with 403, got: {response:?}" + ); + + let mut upstream_buf = [0u8; 128]; + let n = tokio::time::timeout( + std::time::Duration::from_millis(100), + upstream.read(&mut 
upstream_buf), + ) + .await + .unwrap_or(Ok(0)) + .unwrap_or(0); + assert_eq!(n, 0, "denied request must not be forwarded to upstream"); + + drop(app); + tokio::time::timeout(std::time::Duration::from_secs(1), relay) + .await + .expect("relay should complete") + .unwrap() + .unwrap(); + } } diff --git a/crates/openshell-supervisor-network/src/l7/websocket.rs b/crates/openshell-supervisor-network/src/l7/websocket.rs index 31aa35509..e1f92e6ec 100644 --- a/crates/openshell-supervisor-network/src/l7/websocket.rs +++ b/crates/openshell-supervisor-network/src/l7/websocket.rs @@ -545,6 +545,7 @@ fn inspect_websocket_text_message( target: inspector.target.clone(), query_params: inspector.query_params.clone(), graphql: None, + jsonrpc: None, }; let (allowed, reason) = evaluate_l7_request(inspector.engine, inspector.ctx, &request_info)?; let decision = match (allowed, inspector.enforcement) { @@ -581,6 +582,7 @@ fn inspect_graphql_websocket_message( target: inspector.target.clone(), query_params: inspector.query_params.clone(), graphql: None, + jsonrpc: None, }; emit_websocket_l7_event( host, @@ -602,6 +604,7 @@ fn inspect_graphql_websocket_message( target: inspector.target.clone(), query_params: inspector.query_params.clone(), graphql: Some(graphql.clone()), + jsonrpc: None, }; let parse_error_reason = graphql .error diff --git a/crates/openshell-supervisor-network/src/opa.rs b/crates/openshell-supervisor-network/src/opa.rs index 4dd0350ff..14d88a87f 100644 --- a/crates/openshell-supervisor-network/src/opa.rs +++ b/crates/openshell-supervisor-network/src/opa.rs @@ -925,6 +925,24 @@ fn resolve_binary_in_container(_policy_path: &str, _entrypoint_pid: u32) -> Opti None } +fn l7_matchers_to_json( + matchers: &std::collections::HashMap, +) -> serde_json::Map { + matchers + .iter() + .map(|(key, matcher)| { + let mut matcher_json = serde_json::json!({}); + if !matcher.glob.is_empty() { + matcher_json["glob"] = matcher.glob.clone().into(); + } + if !matcher.any.is_empty() { + 
matcher_json["any"] = matcher.any.clone().into(); + } + (key.clone(), matcher_json) + }) + .collect() +} + /// Convert typed proto policy fields to JSON suitable for `engine.add_data_json()`. /// /// The rego rules reference `data.*` directly, so the JSON structure has @@ -1023,35 +1041,25 @@ fn proto_to_opa_data_json(proto: &ProtoSandboxPolicy, entrypoint_pid: u32) -> St "command": a.map_or("", |a| &a.command), "operation_type": a.map_or("", |a| &a.operation_type), "operation_name": a.map_or("", |a| &a.operation_name), + "rpc_method": a.map_or("", |a| &a.rpc_method), }); if let Some(a) = a && !a.fields.is_empty() { allow["fields"] = a.fields.clone().into(); } - let query: serde_json::Map = a - .map(|allow| { - allow - .query - .iter() - .map(|(key, matcher)| { - let mut matcher_json = serde_json::json!({}); - if !matcher.glob.is_empty() { - matcher_json["glob"] = - matcher.glob.clone().into(); - } - if !matcher.any.is_empty() { - matcher_json["any"] = - matcher.any.clone().into(); - } - (key.clone(), matcher_json) - }) - .collect() - }) - .unwrap_or_default(); + let query = a.map_or_else(serde_json::Map::new, |allow| { + l7_matchers_to_json(&allow.query) + }); if !query.is_empty() { allow["query"] = query.into(); } + let params = a.map_or_else(serde_json::Map::new, |allow| { + l7_matchers_to_json(&allow.params) + }); + if !params.is_empty() { + allow["params"] = params.into(); + } serde_json::json!({ "allow": allow }) }) .collect(); @@ -1087,23 +1095,17 @@ fn proto_to_opa_data_json(proto: &ProtoSandboxPolicy, entrypoint_pid: u32) -> St if !d.fields.is_empty() { deny["fields"] = d.fields.clone().into(); } - let query: serde_json::Map = d - .query - .iter() - .map(|(key, matcher)| { - let mut matcher_json = serde_json::json!({}); - if !matcher.glob.is_empty() { - matcher_json["glob"] = matcher.glob.clone().into(); - } - if !matcher.any.is_empty() { - matcher_json["any"] = matcher.any.clone().into(); - } - (key.clone(), matcher_json) - }) - .collect(); + if 
!d.rpc_method.is_empty() { + deny["rpc_method"] = d.rpc_method.clone().into(); + } + let query = l7_matchers_to_json(&d.query); if !query.is_empty() { deny["query"] = query.into(); } + let params = l7_matchers_to_json(&d.params); + if !params.is_empty() { + deny["params"] = params.into(); + } deny }) .collect(); @@ -1141,6 +1143,9 @@ fn proto_to_opa_data_json(proto: &ProtoSandboxPolicy, entrypoint_pid: u32) -> St if e.graphql_max_body_bytes > 0 { ep["graphql_max_body_bytes"] = e.graphql_max_body_bytes.into(); } + if e.json_rpc_max_body_bytes > 0 { + ep["json_rpc_max_body_bytes"] = e.json_rpc_max_body_bytes.into(); + } ep }) .collect(); @@ -1948,6 +1953,36 @@ process: }) } + fn l7_jsonrpc_input(host: &str, port: u16, path: &str, rpc_method: &str) -> serde_json::Value { + l7_jsonrpc_input_with_params(host, port, path, rpc_method, serde_json::json!({})) + } + + fn l7_jsonrpc_input_with_params( + host: &str, + port: u16, + path: &str, + rpc_method: &str, + params: serde_json::Value, + ) -> serde_json::Value { + serde_json::json!({ + "network": { "host": host, "port": port }, + "exec": { + "path": "/usr/bin/curl", + "ancestors": [], + "cmdline_paths": [] + }, + "request": { + "method": "POST", + "path": path, + "query_params": {}, + "jsonrpc": { + "method": rpc_method, + "params": params + } + } + }) + } + fn l7_graphql_input(host: &str, operations: serde_json::Value) -> serde_json::Value { serde_json::json!({ "network": { "host": host, "port": 443 }, @@ -2494,6 +2529,8 @@ network_policies: operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + rpc_method: String::new(), + params: std::collections::HashMap::new(), }), }], ..Default::default() @@ -2542,6 +2579,140 @@ network_policies: assert!(!eval_l7(&engine, &deny_input)); } + #[test] + fn l7_jsonrpc_rpc_method_from_proto_is_enforced() { + let mut network_policies = std::collections::HashMap::new(); + network_policies.insert( + "jsonrpc_proto".to_string(), + NetworkPolicyRule { + name: 
"jsonrpc_proto".to_string(), + endpoints: vec![NetworkEndpoint { + host: "mcp.proto.com".to_string(), + port: 8000, + path: "/mcp".to_string(), + protocol: "json-rpc".to_string(), + enforcement: "enforce".to_string(), + rules: vec![L7Rule { + allow: Some(L7Allow { + method: String::new(), + path: String::new(), + command: String::new(), + query: std::collections::HashMap::new(), + operation_type: String::new(), + operation_name: String::new(), + fields: Vec::new(), + rpc_method: "initialize".to_string(), + params: std::collections::HashMap::new(), + }), + }], + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }, + ); + + let proto = ProtoSandboxPolicy { + version: 1, + filesystem: Some(ProtoFs { + include_workdir: true, + read_only: vec![], + read_write: vec![], + }), + landlock: Some(openshell_core::proto::LandlockPolicy { + compatibility: "best_effort".to_string(), + }), + process: Some(ProtoProc { + run_as_user: "sandbox".to_string(), + run_as_group: "sandbox".to_string(), + }), + network_policies, + }; + + let engine = OpaEngine::from_proto(&proto).expect("engine from proto"); + let allow_input = l7_jsonrpc_input("mcp.proto.com", 8000, "/mcp", "initialize"); + assert!(eval_l7(&engine, &allow_input)); + + let deny_input = l7_jsonrpc_input("mcp.proto.com", 8000, "/mcp", "tools/list"); + assert!(!eval_l7(&engine, &deny_input)); + } + + #[test] + fn l7_jsonrpc_params_rules_filter_tools_call() { + let data = r#" +network_policies: + jsonrpc_params: + name: jsonrpc_params + endpoints: + - host: mcp.params.test + port: 8000 + path: /mcp + protocol: json-rpc + enforcement: enforce + rules: + - allow: + rpc_method: tools/call + params: + name: read_status + - allow: + rpc_method: tools/call + params: + name: submit_report + arguments.scope: workspace/main + deny_rules: + - rpc_method: tools/call + params: + name: blocked_action + binaries: + - { path: /usr/bin/curl } +"#; + let engine = 
OpaEngine::from_strings(TEST_POLICY, data).expect("engine from yaml"); + + let read_status = l7_jsonrpc_input_with_params( + "mcp.params.test", + 8000, + "/mcp", + "tools/call", + serde_json::json!({"name": "read_status"}), + ); + assert!(eval_l7(&engine, &read_status)); + + let submit_report = l7_jsonrpc_input_with_params( + "mcp.params.test", + 8000, + "/mcp", + "tools/call", + serde_json::json!({ + "name": "submit_report", + "arguments.scope": "workspace/main" + }), + ); + assert!(eval_l7(&engine, &submit_report)); + + let blocked_without_args = l7_jsonrpc_input_with_params( + "mcp.params.test", + 8000, + "/mcp", + "tools/call", + serde_json::json!({"name": "blocked_action"}), + ); + assert!(!eval_l7(&engine, &blocked_without_args)); + + let blocked_with_args = l7_jsonrpc_input_with_params( + "mcp.params.test", + 8000, + "/mcp", + "tools/call", + serde_json::json!({ + "name": "blocked_action", + "arguments.reason": "test" + }), + ); + assert!(!eval_l7(&engine, &blocked_with_args)); + } + #[test] fn l7_no_request_on_l4_only_endpoint() { // L4-only endpoint should not match L7 allow_request diff --git a/crates/openshell-supervisor-network/src/policy_local.rs b/crates/openshell-supervisor-network/src/policy_local.rs index 2fce25389..cf783dc5e 100644 --- a/crates/openshell-supervisor-network/src/policy_local.rs +++ b/crates/openshell-supervisor-network/src/policy_local.rs @@ -1088,6 +1088,8 @@ fn network_endpoint_from_json( operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + rpc_method: String::new(), + params: HashMap::new(), }), }) .collect(); @@ -1102,6 +1104,8 @@ fn network_endpoint_from_json( operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + rpc_method: String::new(), + params: HashMap::new(), }) .collect(); @@ -1125,6 +1129,7 @@ fn network_endpoint_from_json( persisted_queries: String::new(), graphql_persisted_queries: HashMap::new(), graphql_max_body_bytes: 0, + json_rpc_max_body_bytes: 0, 
path: String::new(), }) } diff --git a/crates/openshell-supervisor-network/src/proxy.rs b/crates/openshell-supervisor-network/src/proxy.rs index d467b022e..0ee2d6719 100644 --- a/crates/openshell-supervisor-network/src/proxy.rs +++ b/crates/openshell-supervisor-network/src/proxy.rs @@ -13,7 +13,7 @@ use openshell_core::denial::DenialEvent; use openshell_core::net::{is_always_blocked_ip, is_internal_ip, is_link_local_ip}; use openshell_core::policy::ProxyPolicy; use openshell_core::provider_credentials::ProviderCredentialState; -use openshell_core::secrets::{SecretResolver, rewrite_header_line_checked}; +use openshell_core::secrets::{self, SecretResolver, rewrite_header_line_checked}; use openshell_ocsf::{ ActionId, ActivityId, DispositionId, Endpoint, HttpActivityBuilder, HttpRequest, NetworkActivityBuilder, Process, SeverityId, StatusId, Url as OcsfUrl, ocsf_emit, @@ -176,7 +176,7 @@ impl ProxyHandle { /// The proxy uses OPA for network decisions with process-identity binding /// via `/proc/net/tcp`. All connections are evaluated through OPA policy. #[allow(clippy::too_many_arguments)] - pub async fn start_with_bind_addr( + pub(crate) async fn start_with_bind_addr( policy: &ProxyPolicy, bind_addr: Option, opa_engine: Arc, @@ -345,6 +345,21 @@ fn emit_forward_success_activity(tx: Option<&ActivitySender>, l7_activity_pendin ); } +fn l7_parse_error_reason(request_info: &crate::l7::L7RequestInfo) -> Option<String> { + request_info + .graphql + .as_ref() + .and_then(|info| info.error.as_deref()) + .map(|error| format!("GraphQL request rejected: {error}")) + .or_else(|| { + request_info + .jsonrpc + .as_ref() + .and_then(|info| info.error.as_deref()) + .map(|error| format!("JSON-RPC request rejected: {error}")) + }) +} + /// Emit a denial event to the aggregator channel (if configured). /// Used by `handle_tcp_connection` which owns `Option`. 
fn emit_denial( @@ -492,6 +507,7 @@ async fn handle_tcp_connection( ) .await?; if let InferenceOutcome::Denied { reason } = outcome { + emit_activity(&activity_tx, true, "forward_policy"); let event = NetworkActivityBuilder::new(openshell_ocsf::ctx::ctx()) .activity(ActivityId::Open) .action(ActionId::Denied) @@ -2767,16 +2783,14 @@ fn rewrite_forward_request( path: &str, secret_resolver: Option<&SecretResolver>, request_body_credential_rewrite: bool, -) -> Result, openshell_core::secrets::UnresolvedPlaceholderError> { +) -> Result, secrets::UnresolvedPlaceholderError> { let header_end = raw[..used] .windows(4) .position(|w| w == b"\r\n\r\n") .map_or(used, |p| p + 4); let websocket_upgrade = crate::l7::rest::request_is_websocket_upgrade(&raw[..header_end]); let upstream_path = match secret_resolver { - Some(resolver) => { - openshell_core::secrets::rewrite_target_for_eval(path, resolver)?.resolved - } + Some(resolver) => secrets::rewrite_target_for_eval(path, resolver)?.resolved, None => path.to_string(), }; @@ -2869,10 +2883,10 @@ fn rewrite_forward_request( output.len() }; let output_str = String::from_utf8_lossy(&output[..scan_end]); - if output_str.contains(openshell_core::secrets::PLACEHOLDER_PREFIX_PUBLIC) - || output_str.contains(openshell_core::secrets::PROVIDER_ALIAS_MARKER_PUBLIC) + if output_str.contains(secrets::PLACEHOLDER_PREFIX_PUBLIC) + || output_str.contains(secrets::PROVIDER_ALIAS_MARKER_PUBLIC) { - return Err(openshell_core::secrets::UnresolvedPlaceholderError { location: "header" }); + return Err(secrets::UnresolvedPlaceholderError { location: "header" }); } } @@ -3395,18 +3409,66 @@ async fn handle_forward_proxy( } else { None }; + let jsonrpc = if l7_config.config.protocol == crate::l7::L7Protocol::JsonRpc { + let header_end = forward_request_bytes + .windows(4) + .position(|w| w == b"\r\n\r\n") + .map_or(forward_request_bytes.len(), |p| p + 4); + let header_str = std::str::from_utf8(&forward_request_bytes[..header_end]) + .map_err(|_| 
miette::miette!("Forward JSON-RPC headers contain invalid UTF-8"))?; + let body_length = crate::l7::rest::parse_body_length(header_str)?; + let mut jsonrpc_request = crate::l7::provider::L7Request { + action: method.to_string(), + target: path.clone(), + query_params: query_params.clone(), + raw_header: forward_request_bytes, + body_length, + }; + let body = match crate::l7::http::read_body_for_inspection( + client, + &mut jsonrpc_request, + l7_config.config.json_rpc_max_body_bytes, + ) + .await + { + Ok(body) => body, + Err(e) => { + let event = NetworkActivityBuilder::new(openshell_ocsf::ctx::ctx()) + .activity(ActivityId::Fail) + .severity(SeverityId::Medium) + .status(StatusId::Failure) + .dst_endpoint(Endpoint::from_domain(&host_lc, port)) + .message(format!("FORWARD_JSONRPC_L7 request rejected: {e}")) + .build(); + ocsf_emit!(event); + emit_activity_simple(activity_tx, true, "l7_parse_rejection"); + respond( + client, + &build_json_error_response( + 400, + "Bad Request", + "invalid_jsonrpc_request", + &format!("JSON-RPC request rejected before policy evaluation: {e}"), + ), + ) + .await?; + return Ok(()); + } + }; + forward_request_bytes = jsonrpc_request.raw_header; + Some(crate::l7::jsonrpc::parse_jsonrpc_body(&body)) + } else { + None + }; let request_info = crate::l7::L7RequestInfo { action: method.to_string(), target: path.clone(), query_params, graphql, + jsonrpc, }; - let parse_error_reason = request_info - .graphql - .as_ref() - .and_then(|info| info.error.as_deref()) - .map(|error| format!("GraphQL request rejected: {error}")); + let parse_error_reason = l7_parse_error_reason(&request_info); let force_deny = parse_error_reason.is_some(); let (allowed, reason) = parse_error_reason.map_or_else( || { @@ -3447,16 +3509,39 @@ async fn handle_forward_proxy( SeverityId::Informational, ), }; - let engine_type = if l7_config.config.protocol == crate::l7::L7Protocol::Graphql { - "l7-graphql" - } else { - "l7" - }; - let message_prefix = if 
l7_config.config.protocol == crate::l7::L7Protocol::Graphql { - "FORWARD_GRAPHQL_L7" - } else { - "FORWARD_L7" + let engine_type = match l7_config.config.protocol { + crate::l7::L7Protocol::Graphql => "l7-graphql", + crate::l7::L7Protocol::JsonRpc => "l7-jsonrpc", + _ => "l7", }; + let log_message = request_info.jsonrpc.as_ref().map_or_else( + || { + let message_prefix = + if l7_config.config.protocol == crate::l7::L7Protocol::Graphql { + "FORWARD_GRAPHQL_L7" + } else { + "FORWARD_L7" + }; + format!( + "{message_prefix} {decision_str} {method} {host_lc}:{port}{path} reason={reason}" + ) + }, + |jsonrpc_info| { + let endpoint = format!("{host_lc}:{port}{path}"); + let params_sha256 = jsonrpc_info + .params_sha256() + .unwrap_or_else(|| "".to_string()); + crate::l7::relay::jsonrpc_log_message( + decision_str, + method, + &endpoint, + jsonrpc_info, + &params_sha256, + tunnel_engine.captured_generation(), + &reason, + ) + }, + ); let event = HttpActivityBuilder::new(openshell_ocsf::ctx::ctx()) .activity(ActivityId::Other) .action(action_id) @@ -3473,9 +3558,7 @@ async fn handle_forward_proxy( .with_cmd_line(&cmdline_str), ) .firewall_rule(policy_str, engine_type) - .message(format!( - "{message_prefix} {decision_str} {method} {host_lc}:{port}{path} reason={reason}" - )) + .message(log_message) .build(); ocsf_emit!(event); } @@ -4091,6 +4174,7 @@ mod tests { tls: crate::l7::TlsMode::Auto, enforcement: crate::l7::EnforcementMode::Enforce, graphql_max_body_bytes: crate::l7::graphql::DEFAULT_MAX_BODY_BYTES, + json_rpc_max_body_bytes: crate::l7::jsonrpc::DEFAULT_MAX_BODY_BYTES, allow_encoded_slash: false, websocket_credential_rewrite, request_body_credential_rewrite: false, @@ -4132,6 +4216,28 @@ mod tests { assert_eq!(event.deny_group, "unknown"); } + #[test] + fn l7_parse_error_reason_includes_jsonrpc_errors() { + let request_info = crate::l7::L7RequestInfo { + action: "POST".to_string(), + target: "/mcp".to_string(), + query_params: std::collections::HashMap::new(), + 
graphql: None, + jsonrpc: Some(crate::l7::jsonrpc::JsonRpcRequestInfo { + calls: Vec::new(), + is_batch: false, + error: Some("ambiguous dotted params key 'arguments.scope'".to_string()), + }), + }; + + let reason = l7_parse_error_reason(&request_info).expect("JSON-RPC parse error"); + + assert_eq!( + reason, + "JSON-RPC request rejected: ambiguous dotted params key 'arguments.scope'" + ); + } + #[test] fn forward_l7_allowed_activity_is_deferred_until_after_ssrf() { let (tx, mut rx) = mpsc::channel(4); @@ -4690,6 +4796,7 @@ network_policies: tls: crate::l7::TlsMode::Auto, enforcement: crate::l7::EnforcementMode::Enforce, graphql_max_body_bytes: crate::l7::graphql::DEFAULT_MAX_BODY_BYTES, + json_rpc_max_body_bytes: crate::l7::jsonrpc::DEFAULT_MAX_BODY_BYTES, allow_encoded_slash: false, websocket_credential_rewrite: false, request_body_credential_rewrite: false, @@ -4703,6 +4810,7 @@ network_policies: tls: crate::l7::TlsMode::Auto, enforcement: crate::l7::EnforcementMode::Enforce, graphql_max_body_bytes: crate::l7::graphql::DEFAULT_MAX_BODY_BYTES, + json_rpc_max_body_bytes: crate::l7::jsonrpc::DEFAULT_MAX_BODY_BYTES, allow_encoded_slash: false, websocket_credential_rewrite: false, request_body_credential_rewrite: false, diff --git a/docs/reference/policy-schema.mdx b/docs/reference/policy-schema.mdx index 59f72c9f7..1a0705cda 100644 --- a/docs/reference/policy-schema.mdx +++ b/docs/reference/policy-schema.mdx @@ -155,7 +155,7 @@ Each endpoint defines a reachable destination and optional inspection rules. | `host` | string | Yes | Hostname or IP address. Supports a `*` wildcard inside the first DNS label only: `*.example.com`, `**.example.com`, and intra-label patterns like `*-aiplatform.googleapis.com` are accepted; bare `*`/`**`, TLD wildcards (`*.com`), and wildcards outside the first label are rejected at load time. | | `port` | integer | Yes | TCP port number. 
| | `path` | string | No | Optional HTTP path glob used to select between L7 endpoints that share the same host and port. Empty means all paths. Use this when REST and GraphQL live under the same host, such as `/repos/**` and `/graphql`. | -| `protocol` | string | No | Set to `rest` for HTTP method/path inspection, `websocket` for RFC 6455 upgrade and client text-message inspection, or `graphql` for GraphQL-over-HTTP operation inspection. WebSocket endpoints can also use GraphQL operation rules for GraphQL-over-WebSocket traffic. Omit for TCP passthrough. | +| `protocol` | string | No | Set to `rest` for HTTP method/path inspection, `websocket` for RFC 6455 upgrade and client text-message inspection, `graphql` for GraphQL-over-HTTP operation inspection, or `json-rpc` for sandbox-to-server JSON-RPC-over-HTTP method and params inspection. WebSocket endpoints can also use GraphQL operation rules for GraphQL-over-WebSocket traffic. Omit for TCP passthrough. | | `tls` | string | No | TLS handling mode. The proxy auto-detects TLS by peeking the first bytes of each connection and terminates it for inspected HTTPS traffic, so this field is optional in most cases. Set to `skip` to disable auto-detection for edge cases such as client-certificate mTLS or non-standard protocols. The values `terminate` and `passthrough` are deprecated and log a warning; they are still accepted for backward compatibility but have no effect on behavior. | | `enforcement` | string | No | `enforce` actively blocks disallowed requests. `audit` logs violations but allows traffic through. | | `access` | string | No | Access preset. One of `read-only`, `read-write`, or `full`. Mutually exclusive with `rules`. | @@ -168,6 +168,7 @@ Each endpoint defines a reachable destination and optional inspection rules. | `persisted_queries` | string | No | GraphQL hash-only behavior for `protocol: graphql` and GraphQL-over-WebSocket operation policy. 
Default is `deny`; use `allow_registered` only with `graphql_persisted_queries`. | | `graphql_persisted_queries` | map | No | Trusted GraphQL persisted-query registry keyed by hash or saved-query ID. Values contain `operation_type`, optional `operation_name`, and optional root `fields`. | | `graphql_max_body_bytes` | integer | No | Maximum GraphQL-over-HTTP request body bytes buffered for inspection. Defaults to `65536`. | +| `json_rpc` | object | No | JSON-RPC endpoint options. For `protocol: json-rpc`, `json_rpc.max_body_bytes` sets the maximum JSON-RPC-over-HTTP request body bytes buffered for inspection. Defaults to `65536`. | Credential rewrite recognizes the canonical `openshell:resolve:env:KEY` placeholder form and whole-token provider-shaped aliases such as `provider-OPENSHELL-RESOLVE-ENV-API_TOKEN` when the referenced environment key exists in the configured provider credentials. @@ -175,11 +176,13 @@ Credential rewrite recognizes the canonical `openshell:resolve:env:KEY` placehol The `access` field accepts one of the following values: -| Value | REST expansion | WebSocket expansion | GraphQL expansion | -|---|---|---|---| -| `full` | All methods and paths. | WebSocket upgrade and all inspected client text-message paths. | All operation types. | -| `read-only` | `GET`, `HEAD`, `OPTIONS`. | WebSocket upgrade handshake only. | `query` operations. | -| `read-write` | `GET`, `HEAD`, `OPTIONS`, `POST`, `PUT`, `PATCH`. | WebSocket upgrade handshake and client text messages. | `query` and `mutation` operations. | +| Value | REST expansion | WebSocket expansion | GraphQL expansion | JSON-RPC behavior | +|---|---|---|---|---| +| `full` | All methods and paths. | WebSocket upgrade and all inspected client text-message paths. | All operation types. | Allows matching HTTP requests without constraining JSON-RPC methods. | +| `read-only` | `GET`, `HEAD`, `OPTIONS`. | WebSocket upgrade handshake only. | `query` operations. 
| Expands to HTTP read methods and does not allow typical JSON-RPC `POST` calls. | +| `read-write` | `GET`, `HEAD`, `OPTIONS`, `POST`, `PUT`, `PATCH`. | WebSocket upgrade handshake and client text messages. | `query` and `mutation` operations. | Allows matching HTTP write requests without constraining JSON-RPC methods. | + +For JSON-RPC endpoints, prefer explicit `rules` with `rpc_method` and optional `params` when you need method-level control. #### Allow Rule Objects @@ -274,6 +277,42 @@ rules: Do not combine `method`, `path`, or `query` with `operation_type`, `operation_name`, or `fields` inside the same WebSocket rule. When a WebSocket endpoint has GraphQL operation policy, use GraphQL rules for client messages instead of a raw `WEBSOCKET_TEXT` allow rule. +##### JSON-RPC Allow Rule (`protocol: json-rpc`) + +JSON-RPC allow rules match sandbox-to-server JSON-RPC-over-HTTP request objects by RPC method and optional params. They apply to single JSON-RPC requests and batch requests. For a batch, OpenShell evaluates each call independently. JSON-RPC responses and server-to-client messages on response bodies or MCP SSE streams are relayed but are not currently parsed for policy enforcement. + +| Field | Type | Required | Description | +|---|---|---|---| +| `rpc_method` | string | Yes | JSON-RPC method name or glob, such as `initialize`, `tools/list`, or `tools/*`. | +| `params` | map | No | Params matchers keyed by flattened object-param path. Use dot-separated keys for nested object params, such as `arguments.scope`. Matcher value can be a glob string or an object with `any`. Strings, numbers, and booleans are converted to strings; arrays, `null`, and non-object top-level params do not produce matcher keys. Requests with literal `.` characters in params object keys are rejected before policy evaluation because they are ambiguous with flattened nested paths. 
| + +Example JSON-RPC allow rules: + +```yaml showLineNumbers={false} +endpoints: + - host: mcp.example.com + port: 443 + path: /mcp + protocol: json-rpc + enforcement: enforce + json_rpc: + max_body_bytes: 131072 + rules: + - allow: + rpc_method: initialize + - allow: + rpc_method: tools/list + - allow: + rpc_method: tools/call + params: + name: read_status + - allow: + rpc_method: tools/call + params: + name: submit_report + arguments.scope: workspace/main +``` + #### Deny Rule Objects Blocks specific operations on endpoints that otherwise have broad access. Deny rules are evaluated after allow rules and take precedence: if a request matches any deny rule, it is blocked regardless of what the allow rules or access preset permit. @@ -356,6 +395,33 @@ endpoints: operation_name: Admin* ``` +##### JSON-RPC Deny Rule (`protocol: json-rpc`) + +JSON-RPC deny rules use the same field names as JSON-RPC allow rules, but they appear directly under each `deny_rules` entry instead of under an `allow` wrapper. Deny rules take precedence over allow rules. In a batch request, one denied call denies the full batch. + +| Field | Type | Required | Description | +|---|---|---|---| +| `rpc_method` | string | Yes | JSON-RPC method name or glob to deny. | +| `params` | map | No | Params matchers keyed by flattened object-param path. Omit to deny every call matching `rpc_method`. Strings, numbers, and booleans are converted to strings; arrays, `null`, and non-object top-level params do not produce matcher keys. | + +Example JSON-RPC deny rules: + +```yaml showLineNumbers={false} +endpoints: + - host: mcp.example.com + port: 443 + path: /mcp + protocol: json-rpc + enforcement: enforce + rules: + - allow: + rpc_method: tools/* + deny_rules: + - rpc_method: tools/call + params: + name: delete_resource +``` + ### Binary Object Identifies an executable that is permitted to use the associated endpoints. 
diff --git a/docs/sandboxes/policies.mdx b/docs/sandboxes/policies.mdx index 406ed12b8..f0a3464b3 100644 --- a/docs/sandboxes/policies.mdx +++ b/docs/sandboxes/policies.mdx @@ -148,7 +148,7 @@ The following steps outline the hot-reload policy update workflow. To inspect a stored sandbox-authored revision instead of the current effective policy, pass `--rev `. -5. Edit the YAML: add or adjust `network_policies` entries, binaries, `access`, or `rules`. +5. Edit the YAML: add or adjust `network_policies` entries, binaries, `access`, `rules`, or protocol-specific matchers such as GraphQL operation fields and JSON-RPC `rpc_method` / `params` rules. 6. Push the updated policy when you need a full replacement. Exit codes: 0 = loaded, 1 = validation failed, 124 = timeout. @@ -173,7 +173,7 @@ Use `openshell policy update` when you want to merge network policy changes into - remove one endpoint or one named rule without rewriting the rest of the file. - preview a merged result locally with `--dry-run` before you send it to the gateway. -Use `openshell policy set` instead when you want to replace the full policy, update static sections, or make broader edits that are easier to express in YAML. +Use `openshell policy set` instead when you want to replace the full policy, update static sections, or make broader edits that are easier to express in YAML. Use full YAML for GraphQL and JSON-RPC rule shapes. ### Update Commands @@ -210,6 +210,7 @@ This is the practical difference: Current constraints: - `--add-allow` and `--add-deny` work on `protocol: rest` and `protocol: websocket` endpoints. +- GraphQL and JSON-RPC fine-grained rules require full policy YAML applied with `openshell policy set`. - `--add-deny` requires the endpoint to already have an allow base, either an `access` preset or explicit allow `rules`. - `protocol: sql` is not a practical incremental workflow today. OpenShell does not do full SQL parsing, and SQL enforcement is not meaningfully supported yet. 
@@ -228,7 +229,7 @@ Each segment has a fixed meaning: | `host` | Yes | Destination hostname. | | `port` | Yes | Destination port, `1` through `65535`. | | `access` | No | Access preset for L7 endpoints: `read-only`, `read-write`, or `full`. Incremental updates expand presets into protocol-specific method/path rules for REST and WebSocket endpoints. | -| `protocol` | No | L7 inspection mode: `rest`, `websocket`, or `sql`. `sql` is audit-only and not a recommended workflow today. | +| `protocol` | No | L7 inspection mode accepted by `openshell policy update`: `rest`, `websocket`, or `sql`. `sql` is audit-only and not a recommended workflow today. Full policy YAML also supports `graphql` and `json-rpc`. | | `enforcement` | No | Enforcement mode for inspected traffic: `enforce` or `audit`. | | `options` | No | Comma-separated endpoint options. Use `websocket-credential-rewrite` with `protocol: websocket` or REST compatibility endpoints that perform a WebSocket upgrade. Use `request-body-credential-rewrite` only with `protocol: rest`. | @@ -548,7 +549,7 @@ For an end-to-end walkthrough that combines this policy with a GitHub credential - { path: /usr/bin/gh } ``` -Endpoints with `protocol: rest` enable HTTP request inspection and can opt in to supported text request body credential rewrite. Endpoints with `protocol: websocket` validate WebSocket upgrades and inspect client text messages on the upgraded request path. WebSocket endpoints can also classify GraphQL-over-WebSocket operation messages with the same operation rules used by GraphQL-over-HTTP. Endpoints with `protocol: graphql` parse GraphQL-over-HTTP payloads before evaluating rules. The endpoint-level `path` field lets these protocols share `api.github.com:443` without treating GraphQL payloads as plain REST `POST /graphql` requests. +Endpoints with `protocol: rest` enable HTTP request inspection and can opt in to supported text request body credential rewrite. 
Endpoints with `protocol: websocket` validate WebSocket upgrades and inspect client text messages on the upgraded request path. WebSocket endpoints can also classify GraphQL-over-WebSocket operation messages with the same operation rules used by GraphQL-over-HTTP. Endpoints with `protocol: graphql` parse GraphQL-over-HTTP payloads before evaluating rules. Endpoints with `protocol: json-rpc` parse JSON-RPC-over-HTTP request bodies and evaluate `rpc_method` and optional params rules. The endpoint-level `path` field lets these protocols share `api.github.com:443` without treating GraphQL payloads as plain REST `POST /graphql` requests. @@ -579,6 +580,51 @@ REST rules can also constrain query parameter values: `query` matchers are case-sensitive and run on decoded values. If a request has duplicate keys (for example, `tag=a&tag=b`), every value for that key must match the configured glob(s). +### JSON-RPC matching + +JSON-RPC endpoints use `protocol: json-rpc`. The proxy parses sandbox-to-server JSON-RPC-over-HTTP request bodies, evaluates the `method` field against `rpc_method`, and can match object params through dot-separated `params` keys. + +JSON-RPC policy enforcement is directional. It applies to HTTP request bodies sent by the sandboxed process to the configured endpoint. JSON-RPC responses and server-to-client messages carried on response bodies or MCP SSE streams are relayed but are not currently parsed for policy enforcement. + +JSON-RPC endpoint policies currently require full policy YAML applied with `openshell policy set`; the incremental `openshell policy update --add-endpoint` parser does not accept `json-rpc` as a protocol. 
+ +```yaml showLineNumbers={false} + mcp_server: + name: mcp_server + endpoints: + - host: mcp.example.com + port: 443 + path: /mcp + protocol: json-rpc + enforcement: enforce + json_rpc: + max_body_bytes: 131072 + rules: + - allow: + rpc_method: initialize + - allow: + rpc_method: tools/list + - allow: + rpc_method: tools/call + params: + name: read_status + - allow: + rpc_method: tools/call + params: + name: submit_report + arguments.scope: workspace/main + deny_rules: + - rpc_method: tools/call + params: + name: delete_resource + binaries: + - { path: /usr/bin/python3 } +``` + +`json_rpc.max_body_bytes` controls how many JSON-RPC-over-HTTP request body bytes OpenShell buffers for inspection. It defaults to `65536`. + +`params` matchers are case-sensitive and use the same string glob or `{ any: [...] }` matcher syntax as REST query parameters. They match scalar leaf values from object params: strings, numbers, and booleans are converted to strings, and nested JSON object params are flattened with dot-separated keys before matching. Arrays, `null`, and non-object top-level params do not produce matcher keys. Requests with literal `.` characters in params object keys are rejected before policy evaluation because they are ambiguous with flattened nested paths. This is useful for controls such as matching MCP `tools/call` by `params.name`, but it is not a complete MCP payload policy for rich nested content. For batch requests, OpenShell evaluates each JSON-RPC call independently and denies the whole batch if any call is denied. + ### GraphQL matching GraphQL endpoints use `protocol: graphql`. The proxy parses GraphQL-over-HTTP `GET` and `POST` requests, classifies each operation, and evaluates rules against the operation type, optional operation name, and selected root fields. 
diff --git a/e2e/mcp-conformance/Dockerfile.client b/e2e/mcp-conformance/Dockerfile.client new file mode 100644 index 000000000..79810bbe9 --- /dev/null +++ b/e2e/mcp-conformance/Dockerfile.client @@ -0,0 +1,25 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +FROM public.ecr.aws/docker/library/node:22-bookworm-slim + +RUN apt-get update \ + && apt-get install -y --no-install-recommends ca-certificates iproute2 \ + && rm -rf /var/lib/apt/lists/* + +ARG SANDBOX_UID=1000660000 +ARG SANDBOX_GID=1000660000 + +# Match the sandbox user expected by OpenShell policies and supervisor setup. +# The UID/GID are intentionally outside Debian's default login.defs range. +RUN groupadd -K "GID_MAX=${SANDBOX_GID}" -g "${SANDBOX_GID}" sandbox \ + && useradd -K "UID_MAX=${SANDBOX_UID}" --no-log-init -m -u "${SANDBOX_UID}" -g sandbox sandbox + +WORKDIR /opt/mcp-conformance + +COPY . . +RUN if [ -f package-lock.json ]; then npm ci; else npm install; fi +RUN chown -R sandbox:sandbox /opt/mcp-conformance /home/sandbox + +USER sandbox +CMD ["sleep", "infinity"] diff --git a/e2e/mcp-conformance/README.md b/e2e/mcp-conformance/README.md new file mode 100644 index 000000000..1dd47b87e --- /dev/null +++ b/e2e/mcp-conformance/README.md @@ -0,0 +1,27 @@ +# MCP Conformance E2E + +This directory contains the OpenShell wrapper for the upstream +`modelcontextprotocol/conformance` runner. + +The workflow checks out and builds the upstream conformance repository, then +runs its CLI in client mode. The upstream runner starts a real MCP test server, +then invokes `client-through-openshell.sh` with that server URL. The wrapper +starts the Docker-backed OpenShell e2e gateway and runs the upstream TypeScript +`everything-client` inside an OpenShell sandbox, so the MCP traffic crosses the +sandbox proxy. + +The conformance server URL uses `localhost` from the GitHub Actions job +container's perspective. 
Sandboxes run in separate Docker containers, so the
+wrapper rewrites local URLs to `host.openshell.internal`, the alias that
+`e2e/with-docker-gateway.sh` attaches to the job container on the e2e Docker
+network.
+
+The generated policy allows valid JSON-RPC requests to the conformance server
+with `rpc_method: "*"`. That keeps OpenShell deny-by-default at the network
+boundary while allowing the upstream scenarios to exercise MCP behavior. The
+policy body lives in `policy-template.yaml`; the wrapper renders its host, port,
+and path placeholders from the upstream server URL.
+
+When enabling broader upstream suites, add scenarios that OpenShell does not yet
+support through the JSON-RPC proxy to `expected-failures.yml`. The upstream
+runner treats listed failures as allowed and treats stale entries as failures.
diff --git a/e2e/mcp-conformance/client-through-openshell.sh b/e2e/mcp-conformance/client-through-openshell.sh
new file mode 100755
index 000000000..5b3e0c6fd
--- /dev/null
+++ b/e2e/mcp-conformance/client-through-openshell.sh
@@ -0,0 +1,134 @@
+#!/usr/bin/env bash
+# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+# Runs the upstream MCP conformance client through an OpenShell sandbox.
+#
+# The modelcontextprotocol/conformance runner starts a real MCP test server in
+# the GitHub Actions job container and invokes this script with that server URL.
+# This script starts the normal Docker-backed OpenShell e2e gateway, creates a
+# sandbox from the prebuilt conformance client image, and runs the upstream
+# TypeScript everything-client inside that sandbox. That keeps the MCP
+# client/server traffic in the OpenShell proxy data path.
+#
+# Conformance server URLs usually point at localhost in the job container.
+# Sandboxes are separate Docker containers, so localhost would point back at the
+# sandbox itself. The wrapper rewrites local URLs to host.openshell.internal,
+# which e2e/with-docker-gateway.sh attaches to the job container on the e2e
+# Docker network.
+#
+# Usage: client-through-openshell.sh <server-url>
+#
+# Environment:
+#   OPENSHELL_MCP_CONFORMANCE_CLIENT_IMAGE  required; prebuilt client image.
+#   OPENSHELL_BIN  optional; defaults to debug/openshell under the directory
+#     reported by e2e_cargo_target_dir.
+#   OPENSHELL_E2E_DOCKER_SANDBOX_IMAGE  optional; defaults to the client image.
+#   MCP_CONFORMANCE_SCENARIO, MCP_CONFORMANCE_CONTEXT,
+#   MCP_CONFORMANCE_PROTOCOL_VERSION  forwarded into the sandbox when set.
+
+set -euo pipefail
+
+usage() {
+  echo "usage: $0 <server-url>" >&2
+}
+
+if [ "$#" -ne 1 ]; then
+  usage
+  exit 2
+fi
+
+# Render the OpenShell policy for the conformance runner's server URL.
+#
+# Arguments: <server-url> <policy-output-file> <policy-template-file>
+# Writes the rendered policy to the output file and prints on stdout the URL
+# the sandboxed client must use: loopback hosts become host.openshell.internal
+# and the port is always spelled out.
+prepare_conformance_target() {
+  local server_url=$1
+  local policy_file=$2
+  local policy_template=$3
+
+  python3 - "${server_url}" "${policy_file}" "${policy_template}" <<'PY'
+import json
+import string
+import sys
+from pathlib import Path
+from urllib.parse import urlparse, urlunparse
+
+raw_url, policy_file, policy_template = sys.argv[1:4]
+parsed = urlparse(raw_url)
+
+if parsed.scheme not in ("http", "https"):
+    raise SystemExit(f"unsupported conformance server URL scheme: {parsed.scheme!r}")
+
+host = parsed.hostname
+if not host:
+    raise SystemExit(f"conformance server URL is missing a host: {raw_url}")
+
+# Loopback inside the sandbox is the sandbox itself, so target the alias that
+# resolves to the job container instead.
+target_host = "host.openshell.internal" if host in {"localhost", "127.0.0.1", "::1"} else host
+port = parsed.port or (443 if parsed.scheme == "https" else 80)
+path = parsed.path or "/"
+netloc_host = f"[{target_host}]" if ":" in target_host and not target_host.startswith("[") else target_host
+netloc = f"{netloc_host}:{port}"
+rewritten = urlunparse((parsed.scheme, netloc, path, parsed.params, parsed.query, parsed.fragment))
+
+# json.dumps emits double-quoted strings that are also valid YAML scalars, so
+# host and path are quoted safely when dropped into the template.
+template = string.Template(Path(policy_template).read_text(encoding="utf-8"))
+policy = template.substitute(
+    host=json.dumps(target_host),
+    port=str(port),
+    path=json.dumps(path),
+)
+Path(policy_file).write_text(policy, encoding="utf-8")
+
+print(rewritten)
+PY
+}
+
+SERVER_URL="$1"
+CLIENT_IMAGE="${OPENSHELL_MCP_CONFORMANCE_CLIENT_IMAGE:?set OPENSHELL_MCP_CONFORMANCE_CLIENT_IMAGE to the prebuilt conformance client image}"
+ROOT="$(git rev-parse --show-toplevel 2>/dev/null || pwd)"
+POLICY_TEMPLATE="${ROOT}/e2e/mcp-conformance/policy-template.yaml"
+
+# Keep the X's at the very end of the template: BSD/macOS mktemp does not
+# randomize X's that are followed by a suffix. NOTE(review): assumes --policy
+# does not depend on a .yaml extension - confirm.
+POLICY_FILE="$(mktemp "${TMPDIR:-/tmp}/openshell-mcp-conformance-policy.XXXXXX")"
+trap 'rm -f "${POLICY_FILE}"' EXIT
+
+CLIENT_SERVER_URL="$(prepare_conformance_target "${SERVER_URL}" "${POLICY_FILE}" "${POLICY_TEMPLATE}")"
+
+ENV_ARGS=()
+# These environment variables are set by the upstream conformance test runner
+# before it invokes the configured client command. Forward them into the
+# sandbox because the sandboxed TypeScript client depends on them to select the
+# scenario and read scenario-specific context.
+for NAME in MCP_CONFORMANCE_SCENARIO MCP_CONFORMANCE_CONTEXT MCP_CONFORMANCE_PROTOCOL_VERSION; do
+  if [ -n "${!NAME+x}" ]; then
+    ENV_ARGS+=(--env "${NAME}=${!NAME}")
+  fi
+done
+
+# shellcheck source=e2e/support/gateway-common.sh disable=SC1091
+source "${ROOT}/e2e/support/gateway-common.sh"
+TARGET_DIR="$(e2e_cargo_target_dir "${ROOT}")"
+OPENSHELL_BIN="${OPENSHELL_BIN:-${TARGET_DIR}/debug/openshell}"
+export OPENSHELL_E2E_DOCKER_SANDBOX_IMAGE="${OPENSHELL_E2E_DOCKER_SANDBOX_IMAGE:-${CLIENT_IMAGE}}"
+
+# The sh -c body is single-quoted on purpose (hence SC2016): "$1" must be
+# expanded by the shell inside the sandbox. ${ENV_ARGS[@]+...} expands to
+# nothing for an empty array, which a bare "${ENV_ARGS[@]}" turns into an
+# "unbound variable" error under `set -u` on bash older than 4.4.
+# shellcheck disable=SC2016
+"${ROOT}/e2e/with-docker-gateway.sh" \
+  "${OPENSHELL_BIN}" sandbox create \
+  --from "${CLIENT_IMAGE}" \
+  --policy "${POLICY_FILE}" \
+  ${ENV_ARGS[@]+"${ENV_ARGS[@]}"} \
+  -- \
+  sh -c 'cd /opt/mcp-conformance && exec ./node_modules/.bin/tsx examples/clients/typescript/everything-client.ts "$1"' \
+  sh "${CLIENT_SERVER_URL}"
diff --git a/e2e/mcp-conformance/expected-failures.yml b/e2e/mcp-conformance/expected-failures.yml
new file mode 100644
index 000000000..5b226631f
--- /dev/null
+++ b/e2e/mcp-conformance/expected-failures.yml
@@ -0,0 +1,7 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+# Add client scenarios here when enabling broader MCP conformance suites that
+# exercise features OpenShell does not yet support through the JSON-RPC proxy.
+client: []
+server: []
diff --git a/e2e/mcp-conformance/policy-template.yaml b/e2e/mcp-conformance/policy-template.yaml
new file mode 100644
index 000000000..2a02f6374
--- /dev/null
+++ b/e2e/mcp-conformance/policy-template.yaml
@@ -0,0 +1,68 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+# Sandbox policy template for the MCP conformance client.
+#
+# client-through-openshell.sh renders this file with Python's string.Template,
+# filling the host, port, and path placeholders from the conformance server
+# URL. Host and path are substituted as JSON-quoted strings, port as a bare
+# integer. A literal dollar sign anywhere in this file, comments included,
+# must be doubled or rendering fails.
+
+version: 1
+
+filesystem_policy:
+  include_workdir: true
+  read_only:
+    - /bin
+    - /usr
+    - /lib
+    - /lib64
+    - /proc
+    - /sys
+    - /dev/urandom
+    - /etc
+    - /opt
+    - /var/log
+  read_write:
+    - /sandbox
+    - /tmp
+    - /dev/null
+    - /home/sandbox
+
+landlock:
+  compatibility: best_effort
+
+process:
+  run_as_user: sandbox
+  run_as_group: sandbox
+
+network_policies:
+  mcp_conformance:
+    name: mcp_conformance
+    endpoints:
+      - host: ${host}
+        port: ${port}
+        path: ${path}
+        protocol: json-rpc
+        enforcement: enforce
+        # RFC 1918 and IPv6 unique-local ranges. 172.16.0.0/12 is the whole
+        # private 172.x block; a /8 there would also admit public addresses.
+        allowed_ips:
+          - "10.0.0.0/8"
+          - "172.16.0.0/12"
+          - "192.168.0.0/16"
+          - "fc00::/7"
+        json_rpc:
+          max_body_bytes: 131072
+        # Allow every JSON-RPC method so the upstream scenarios can exercise
+        # MCP behavior; only this endpoint is opened, the rest stays denied.
+        rules:
+          - allow:
+              rpc_method: "*"
+    binaries:
+      - path: /bin/sh
+      - path: /usr/bin/env
+      - path: /usr/local/bin/node
+      - path: /usr/bin/node
+      - path: /opt/mcp-conformance/node_modules/.bin/*
diff --git a/e2e/rust/Cargo.toml b/e2e/rust/Cargo.toml
index 083c622df..2f61f2d86 100644
--- a/e2e/rust/Cargo.toml
+++ b/e2e/rust/Cargo.toml
@@ -97,6 +97,11 @@ name = "forward_proxy_graphql_l7"
 path = "tests/forward_proxy_graphql_l7.rs"
 required-features = ["e2e-host-gateway"]
 
+[[test]]
+name = "forward_proxy_jsonrpc_l7"
+path = "tests/forward_proxy_jsonrpc_l7.rs"
+required-features = ["e2e-host-gateway"]
+
 [[test]]
 name = "gpu_device_selection"
 path = "tests/gpu_device_selection.rs"
diff --git a/e2e/rust/tests/forward_proxy_jsonrpc_l7.rs b/e2e/rust/tests/forward_proxy_jsonrpc_l7.rs
new file mode 100644
index 000000000..ada51ba2e
--- /dev/null
+++ b/e2e/rust/tests/forward_proxy_jsonrpc_l7.rs
@@ -0,0 +1,371 @@
+// SPDX-FileCopyrightText: Copyright (c) 2025-2026
NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+//! E2E tests for JSON-RPC L7 inspection across both proxy entry points.
+//!
+//! The upstream server deliberately does not implement JSON-RPC. `OpenShell`
+//! parses and enforces JSON-RPC before forwarding, so any HTTP server that
+//! accepts POST /mcp is enough to prove allowed requests reach upstream
+//! and denied requests are stopped by the sandbox proxy.
+
+#![cfg(feature = "e2e")]
+
+use std::io::Write;
+
+use openshell_e2e::harness::container::ContainerHttpServer;
+use openshell_e2e::harness::sandbox::SandboxGuard;
+use tempfile::NamedTempFile;
+
+const TEST_SERVER_ALIAS: &str = "jsonrpc-l7.openshell.test";
+
+/// Starts the stub upstream via `ContainerHttpServer::start_python`: a Python
+/// `http.server` that reads chunked or `Content-Length` bodies and answers
+/// every POST with 200 and one fixed JSON-RPC result.
+async fn start_test_server() -> Result<ContainerHttpServer, String> {
+    let script = r#"from http.server import BaseHTTPRequestHandler, HTTPServer
+
+class Handler(BaseHTTPRequestHandler):
+    def read_body(self):
+        if self.headers.get("Transfer-Encoding", "").lower() == "chunked":
+            data = b""
+            while True:
+                size_line = self.rfile.readline()
+                if not size_line:
+                    break
+                size = int(size_line.split(b";", 1)[0].strip(), 16)
+                if size == 0:
+                    while self.rfile.readline().strip():
+                        pass
+                    break
+                data += self.rfile.read(size)
+                self.rfile.read(2)
+            return data
+        return self.rfile.read(int(self.headers.get("Content-Length", "0")))
+
+    def do_GET(self):
+        self.send_response(200)
+        self.end_headers()
+
+    def do_POST(self):
+        self.read_body()
+        self.send_response(200)
+        self.send_header("Content-Type", "application/json")
+        self.end_headers()
+        self.wfile.write(b'{"jsonrpc":"2.0","id":1,"result":{}}')
+
+    def log_message(self, format, *args):
+        pass
+
+HTTPServer(("0.0.0.0", 8000), Handler).serve_forever()
+"#;
+
+    ContainerHttpServer::start_python(TEST_SERVER_ALIAS, script).await
+}
+
+/// Writes a sandbox policy exposing `host:port/mcp` as a `json-rpc` endpoint.
+/// `initialize`, `tools/list`, and two parameter-matched `tools/call` shapes
+/// are allowed; `tools/call` with `name: blocked_action` is denied. The
+/// temporary file is deleted when the returned handle is dropped.
+fn write_jsonrpc_policy(host: &str, port: u16) -> Result<NamedTempFile, String> {
+    let mut file = NamedTempFile::new().map_err(|e| format!("create temp policy file: {e}"))?;
+    let policy = format!(
+        r#"version: 1
+
+filesystem_policy:
+  include_workdir: true
+  read_only:
+    - /usr
+    - /lib
+    - /proc
+    - /dev/urandom
+    - /app
+    - /etc
+    - /var/log
+  read_write:
+    - /sandbox
+    - /tmp
+    - /dev/null
+
+landlock:
+  compatibility: best_effort
+
+process:
+  run_as_user: sandbox
+  run_as_group: sandbox
+
+network_policies:
+  test_jsonrpc_l7:
+    name: test_jsonrpc_l7
+    endpoints:
+      - host: {host}
+        port: {port}
+        path: /mcp
+        protocol: json-rpc
+        enforcement: enforce
+        # 172.16.0.0/12 is the private 172.x block; a /8 would admit public space.
+        allowed_ips:
+          - "10.0.0.0/8"
+          - "172.16.0.0/12"
+          - "192.168.0.0/16"
+          - "fc00::/7"
+        json_rpc:
+          max_body_bytes: 65536
+        rules:
+          - allow:
+              rpc_method: initialize
+          - allow:
+              rpc_method: tools/list
+          - allow:
+              rpc_method: tools/call
+              params:
+                name: read_status
+          - allow:
+              rpc_method: tools/call
+              params:
+                name: submit_report
+                arguments.scope: workspace/main
+        deny_rules:
+          - rpc_method: tools/call
+            params:
+              name: blocked_action
+    binaries:
+      - path: /usr/bin/python*
+      - path: /usr/local/bin/python*
+      - path: /sandbox/.uv/python/*/bin/python*
+"#
+    );
+    file.write_all(policy.as_bytes())
+        .map_err(|e| format!("write temp policy file: {e}"))?;
+    file.flush()
+        .map_err(|e| format!("flush temp policy file: {e}"))?;
+    Ok(file)
+}
+
+/// Runs one Python client inside the sandbox that sends each request through
+/// the forward proxy (`urllib`) and through an explicit `CONNECT` tunnel, then
+/// prints a JSON map of label to HTTP status. Allowed requests must report 200;
+/// denied or unparseable ones must report 403.
+#[tokio::test]
+#[allow(clippy::too_many_lines)]
+async fn jsonrpc_l7_enforces_method_and_params_rules_on_forward_and_connect_paths() {
+    let server = start_test_server().await.expect("start test server");
+    let policy = write_jsonrpc_policy(&server.host, server.port).expect("write custom policy");
+    let policy_path = policy
+        .path()
+        .to_str()
+        .expect("temp policy path should be utf-8")
+        .to_string();
+
+    let script = format!(
+        r#"
+import json
+import os
+import socket
+import time
+import urllib.error
+import urllib.parse
+import urllib.request
+
+HOST = {host:?}
+PORT = {port}
+DETAILS = {{}}
+
+def post_jsonrpc(method, params=None, req_id=1):
+    body = {{"jsonrpc": "2.0", "id": req_id, "method": method}}
+    if params is not None:
+        body["params"] = params
+    encoded = json.dumps(body).encode()
+    request = urllib.request.Request(
+        f"http://{{HOST}}:{{PORT}}/mcp",
+        data=encoded,
+        headers={{"Content-Type": "application/json"}},
+        method="POST",
+    )
+    try:
+        with urllib.request.urlopen(request, timeout=15) as response:
+            response.read()
+            return response.status
+    except urllib.error.HTTPError as error:
+        error.read()
+        return error.code
+
+def post_jsonrpc_batch(requests):
+    encoded = json.dumps(requests).encode()
+    request = urllib.request.Request(
+        f"http://{{HOST}}:{{PORT}}/mcp",
+        data=encoded,
+        headers={{"Content-Type": "application/json"}},
+        method="POST",
+    )
+    try:
+        with urllib.request.urlopen(request, timeout=15) as response:
+            response.read()
+            return response.status
+    except urllib.error.HTTPError as error:
+        error.read()
+        return error.code
+
+def post_invalid_json():
+    encoded = b"not valid json {{"
+    request = urllib.request.Request(
+        f"http://{{HOST}}:{{PORT}}/mcp",
+        data=encoded,
+        headers={{"Content-Type": "application/json", "Content-Length": str(len(encoded))}},
+        method="POST",
+    )
+    try:
+        with urllib.request.urlopen(request, timeout=15) as response:
+            response.read()
+            return response.status
+    except urllib.error.HTTPError as error:
+        error.read()
+        return error.code
+
+def proxy_parts(*names):
+    proxy_url = next((os.environ.get(name) for name in names if os.environ.get(name)), None)
+    parsed = urllib.parse.urlparse(proxy_url)
+    return parsed.hostname, parsed.port or 80
+
+def read_until(sock, marker):
+    data = b""
+    while marker not in data:
+        chunk = sock.recv(4096)
+        if not chunk:
+            break
+        data += chunk
+    return data
+
+def status_code(response, label):
+    parts = response.split()
+    if len(parts) < 2:
+        DETAILS[f"{{label}}_raw"] = response.decode(errors="replace")
+        raise RuntimeError(f"{{label}}: malformed HTTP response: {{response!r}}")
+    try:
+        return int(parts[1])
+    except ValueError as error:
+        DETAILS[f"{{label}}_raw"] = response.decode(errors="replace")
+        raise RuntimeError(f"{{label}}: non-numeric HTTP status: {{response!r}}") from error
+
+# Opens a CONNECT tunnel through the sandbox proxy and sends one raw HTTP request.
+# Returns the CONNECT status when it is not 200, otherwise the status of the
+# tunneled response. Retries up to 5 times on socket or parse errors.
+def connect_http_status(label, request):
+    proxy_host, proxy_port = proxy_parts("HTTP_PROXY", "http_proxy", "HTTPS_PROXY", "https_proxy")
+    target = f"{{HOST}}:{{PORT}}"
+
+    last_error = None
+    for attempt in range(5):
+        try:
+            with socket.create_connection((proxy_host, proxy_port), timeout=15) as sock:
+                sock.sendall(
+                    f"CONNECT {{target}} HTTP/1.1\r\nHost: {{target}}\r\n\r\n".encode()
+                )
+                connect_response = read_until(sock, b"\r\n\r\n")
+                connect_code = status_code(connect_response, f"{{label}}_connect")
+                if connect_code != 200:
+                    return connect_code
+                sock.sendall(request)
+                sock.shutdown(socket.SHUT_WR)
+                response = read_until(sock, b"\r\n\r\n")
+                return status_code(response, f"{{label}}_response")
+        except (OSError, RuntimeError) as error:
+            last_error = error
+            DETAILS[f"{{label}}_attempt_{{attempt + 1}}_error"] = str(error)
+            time.sleep(0.2)
+
+    raise RuntimeError(f"{{label}}: failed after 5 attempts: {{last_error}}")
+
+def connect_jsonrpc_status(method, params, label):
+    target = f"{{HOST}}:{{PORT}}"
+    body = {{"jsonrpc": "2.0", "id": 1, "method": method}}
+    if params is not None:
+        body["params"] = params
+    encoded = json.dumps(body).encode()
+    request = (
+        f"POST /mcp HTTP/1.1\r\n"
+        f"Host: {{target}}\r\n"
+        f"Content-Type: application/json\r\n"
+        f"Content-Length: {{len(encoded)}}\r\n"
+        f"Connection: close\r\n"
+        f"\r\n"
+    ).encode() + encoded
+    return connect_http_status(label, request)
+
+results = {{
+    # forward proxy — method-only allow rules
+    "forward_method_initialize_allowed": post_jsonrpc("initialize", {{"protocolVersion": "2025-11-25", "capabilities": {{}}}}),
+    "forward_method_tools_list_allowed": post_jsonrpc("tools/list"),
+
+    # forward proxy — params allow rules
+    "forward_tools_call_params_name_no_args_allowed": post_jsonrpc("tools/call", {{"name": "read_status"}}),
+    "forward_tools_call_params_nested_args_allowed": post_jsonrpc("tools/call", {{"name": "submit_report", "arguments": {{"scope": "workspace/main", "title": "test"}}}}),
+
+    # forward proxy — params denied
+    "forward_tools_call_params_name_no_args_denied": post_jsonrpc("tools/call", {{"name": "blocked_action"}}),
+    "forward_tools_call_params_name_with_args_denied": post_jsonrpc("tools/call", {{"name": "blocked_action", "arguments": {{"reason": "test"}}}}),
+
+    # forward proxy — batch: all requests allowed
+    "forward_batch_all_allowed": post_jsonrpc_batch([
+        {{"jsonrpc": "2.0", "id": 1, "method": "tools/list"}},
+        {{"jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {{"name": "read_status"}}}},
+    ]),
+
+    # forward proxy — batch: one denied request causes full batch denial
+    "forward_batch_one_denied": post_jsonrpc_batch([
+        {{"jsonrpc": "2.0", "id": 1, "method": "tools/list"}},
+        {{"jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {{"name": "blocked_action"}}}},
+    ]),
+
+    # forward proxy — invalid JSON body fails closed before generic rules apply
+    "forward_invalid_json_denied": post_invalid_json(),
+
+    # CONNECT path — representative allowed and denied cases
+    "connect_method_initialize_allowed": connect_jsonrpc_status("initialize", {{"protocolVersion": "2025-11-25", "capabilities": {{}}}}, "connect_method_initialize_allowed"),
+    "connect_method_tools_list_allowed": connect_jsonrpc_status("tools/list", None, "connect_method_tools_list_allowed"),
+    "connect_tools_call_params_name_no_args_allowed": connect_jsonrpc_status("tools/call", {{"name": "read_status"}}, "connect_tools_call_params_name_no_args_allowed"),
+    "connect_tools_call_params_nested_args_allowed": connect_jsonrpc_status("tools/call", {{"name": "submit_report", "arguments": {{"scope": "workspace/main"}}}}, "connect_tools_call_params_nested_args_allowed"),
+    "connect_tools_call_params_name_no_args_denied": connect_jsonrpc_status("tools/call", {{"name": "blocked_action"}}, "connect_tools_call_params_name_no_args_denied"),
+    "connect_tools_call_params_name_with_args_denied": connect_jsonrpc_status("tools/call", {{"name": "blocked_action", "arguments": {{"reason": "test"}}}}, "connect_tools_call_params_name_with_args_denied"),
+}}
+results.update(DETAILS)
+print(json.dumps(results, sort_keys=True))
+"#,
+        host = server.host,
+        port = server.port,
+    );
+
+    let guard = SandboxGuard::create(&["--policy", &policy_path, "--", "python3", "-c", &script])
+        .await
+        .expect("sandbox create");
+
+    for (key, expected) in [
+        // forward proxy — allowed
+        ("forward_method_initialize_allowed", 200),
+        ("forward_method_tools_list_allowed", 200),
+        ("forward_tools_call_params_name_no_args_allowed", 200),
+        ("forward_tools_call_params_nested_args_allowed", 200),
+        // forward proxy — params denied
+        ("forward_tools_call_params_name_no_args_denied", 403),
+        ("forward_tools_call_params_name_with_args_denied", 403),
+        // forward proxy — batch
+        ("forward_batch_all_allowed", 200),
+        ("forward_batch_one_denied", 403),
+        // forward proxy — parse error
+        ("forward_invalid_json_denied", 403),
+        // CONNECT path — allowed
+        ("connect_method_initialize_allowed", 200),
+        ("connect_method_tools_list_allowed", 200),
+        ("connect_tools_call_params_name_no_args_allowed", 200),
+        ("connect_tools_call_params_nested_args_allowed", 200),
+        // CONNECT path — params denied
+        ("connect_tools_call_params_name_no_args_denied", 403),
+        ("connect_tools_call_params_name_with_args_denied", 403),
+    ] {
+        let expected_fragment = format!(r#""{key}": {expected}"#);
+        assert!(
+            guard.create_output.contains(&expected_fragment),
+            "expected {key}={expected}, got:\n{}",
+            guard.create_output
+        );
+    }
+}
diff --git a/proto/sandbox.proto b/proto/sandbox.proto
index ef0b0540f..afe1d3301 100644
--- a/proto/sandbox.proto
+++ b/proto/sandbox.proto
@@ -128,6 +128,9 @@ message NetworkEndpoint {
   // Advisor-proposed endpoints must not satisfy exact-host SSRF trust unless
   // they are converted through an explicit user-authored policy path.
   bool advisor_proposed = 18;
+  // Maximum JSON-RPC-over-HTTP request body bytes to buffer for inspection.
+  // Defaults to 65536 when unset.
+  uint32 json_rpc_max_body_bytes = 19;
 }
 
 // Trusted GraphQL operation classification.
@@ -160,6 +163,11 @@ message L7DenyRule {
   // GraphQL root field globs. Deny rules match when any selected root field
   // matches any configured glob.
   repeated string fields = 7;
+  // JSON-RPC method name (JSON-RPC): exact name or glob, e.g. "tools/call".
+  string rpc_method = 8;
+  // JSON-RPC params matcher map. Dot-separated keys select nested params
+  // fields, e.g. "arguments.scope".
+  map params = 9;
 }
 
 // An L7 policy rule (allow-only).
@@ -186,6 +194,11 @@ message L7Allow {
   // GraphQL root field globs. Allow rules match only when every selected root
   // field matches one of the configured globs. Omit to match all fields.
   repeated string fields = 7;
+  // JSON-RPC method name (JSON-RPC): exact name or glob, e.g. "tools/call".
+  string rpc_method = 8;
+  // JSON-RPC params matcher map. Dot-separated keys select nested params
+  // fields, e.g. "arguments.scope".
+  map params = 9;
 }
 
 // Query value matcher for one query parameter key.