diff --git a/server/app/models/agents/component.rb b/server/app/models/agents/component.rb index d890b358f..7b43f0753 100644 --- a/server/app/models/agents/component.rb +++ b/server/app/models/agents/component.rb @@ -11,11 +11,47 @@ class Component < ApplicationRecord inverse_of: :target_component enum component_type: { chat_input: 0, chat_output: 1, data_storage: 2, llm_model: 3, prompt_template: 4, +<<<<<<< HEAD vector_store: 5, python_custom: 6 } +======= + vector_store: 5, python_custom: 6, conditional: 7, guardrails: 8, tool: 9, agent: 10, + knowledge_base: 11, llm_router: 12, human_in_loop: 13, a2a_agent: 14 } +>>>>>>> afa98e94b (feat(CE): add a2a_agent component type, masked config, and JSON-RPC client (#1719)) validates :name, presence: true validates :component_type, presence: true store :position, coder: JSON + + # Returns configuration with auth_config values masked for API responses. + # Currently applies to :a2a_agent components and masks all non-blank strings under auth_config. + def masked_configuration + return configuration if configuration.blank? + return configuration unless a2a_agent? + + mask_a2a_secrets(configuration.deep_dup) + end + + private + + SECRET_MASK = "*************" + + def mask_a2a_secrets(config) + config["auth_config"] = mask_nested_values(config["auth_config"]) if config["auth_config"].present? + config + end + + def mask_nested_values(obj) + case obj + when Hash + obj.transform_values { |v| mask_nested_values(v) } + when Array + obj.map { |v| mask_nested_values(v) } + when String + obj.present? ? SECRET_MASK : obj + else + obj + end + end end end diff --git a/server/lib/utils/json_rpc_client.rb b/server/lib/utils/json_rpc_client.rb new file mode 100644 index 000000000..d69451d4e --- /dev/null +++ b/server/lib/utils/json_rpc_client.rb @@ -0,0 +1,141 @@ +# frozen_string_literal: true + +module Utils + class JsonRpcClient < HttpClient + DEFAULT_TIMEOUT = 30 + ALLOWED_SCHEMES = %w[http https].freeze + + def self.execute_rpc(url:, method:, params: {}, headers: {}, config: {}) + validate_uri_scheme!(url) + envelope = build_jsonrpc_envelope(method, params) + result = post( + base_url: url, + headers: rpc_headers(headers), + body: envelope, + config: translate_config(config) + ) + wrap_result(result, validate_hash: true) + rescue URI::InvalidURIError => e + error_response(e) + end + + def self.execute_get(url:, headers: {}, config: {}) + validate_uri_scheme!(url) + result = get( + base_url: url, + headers: rpc_headers(headers), + config: translate_config(config) + ) + wrap_result(result) + rescue URI::InvalidURIError => e + error_response(e) + end + + def self.handle_response(response) + body = parse_response_body(response) + return body if response.code.to_i.between?(200, 299) + + body["error"] ||= { "message" => "HTTP request failed with status #{response.code}" } + body + end + + def self.post(base_url:, headers: {}, body: nil, config: {}) + super + rescue RuntimeError => e + raise e.cause || e + end + + def self.get(base_url:, headers: {}, config: {}) + super + rescue RuntimeError => e + raise e.cause || e + end + + def self.build_http_client(uri, timeout: DEFAULT_TIMEOUT, read_timeout: nil) + http = Net::HTTP.new(uri.host, uri.port) + http.use_ssl = (uri.scheme == "https") + http.verify_mode = OpenSSL::SSL::VERIFY_PEER + http.open_timeout = timeout + http.read_timeout = read_timeout || timeout + http.verify_mode = OpenSSL::SSL::VERIFY_NONE if Rails.env.development? || Rails.env.test? + http + end + + def self.build_post_request(uri, method, params, headers: {}, id: nil) + request = Net::HTTP::Post.new(uri.request_uri) + request["Content-Type"] = "application/json" + request["Accept"] = "application/json" + headers.each { |k, v| request[k.to_s] = v } + request.body = build_jsonrpc_envelope(method, params, id:).to_json + request + end + + def self.build_jsonrpc_envelope(method, params = {}, id: nil) + { + jsonrpc: "2.0", + id: id || rand(1..2_147_483_647), + method:, + params: + } + end + + def self.parse_jsonrpc_response(response, body: nil) + raw = body || response.body + parsed = JSON.parse(raw.to_s) + unless parsed.is_a?(Hash) + return { success: false, status: response.code, raw:, + body: { "error" => { "message" => "Invalid JSON-RPC response: expected object" } } } + end + + { success: response.is_a?(Net::HTTPSuccess) && !parsed.key?("error"), + status: response.code, raw:, body: parsed } + rescue JSON::ParserError + { success: false, status: response.code, raw:, + body: { "error" => { "message" => "Invalid JSON response" } } } + end + + def self.parse_response(response) + body = parse_response_body(response) + { success: response.is_a?(Net::HTTPSuccess), body: } + end + + def self.validate_uri_scheme!(url) + return if ALLOWED_SCHEMES.include?(URI.parse(url).scheme) + + raise URI::InvalidURIError, "Unsupported URI scheme (only http/https allowed)" + end + + def self.rpc_headers(custom = {}) + { "Accept" => "application/json" }.merge(custom) + end + + def self.translate_config(config) + t = config.fetch(:timeout, DEFAULT_TIMEOUT) + { timeout: config[:read_timeout] || t, open_timeout: t } + end + + def self.parse_response_body(response) + return {} if response.body.blank? + + parsed = JSON.parse(response.body) + parsed.is_a?(Hash) ? parsed : {} + rescue JSON::ParserError + {} + end + + def self.wrap_result(result, validate_hash: false) + if validate_hash && !result.is_a?(Hash) + return { success: false, body: { "error" => { "message" => "Invalid JSON-RPC response: expected object" } } } + end + + { success: !(result.is_a?(Hash) && result.key?("error")), body: result } + end + + def self.error_response(error) + { success: false, body: { "error" => { "message" => "Transport error: #{error.message}" } } } + end + + private_class_method :validate_uri_scheme!, :rpc_headers, :translate_config, + :parse_response_body, :wrap_result, :error_response + end +end diff --git a/server/spec/lib/utils/json_rpc_client_spec.rb b/server/spec/lib/utils/json_rpc_client_spec.rb new file mode 100644 index 000000000..b07621760 --- /dev/null +++ b/server/spec/lib/utils/json_rpc_client_spec.rb @@ -0,0 +1,492 @@ +# frozen_string_literal: true + +require "rails_helper" + +RSpec.describe Utils::JsonRpcClient do + let(:url) { "https://api.example.com/rpc" } + let(:uri) { URI.parse(url) } + + it "inherits from Utils::HttpClient" do + expect(described_class).to be < Utils::HttpClient + end + + describe ".build_jsonrpc_envelope" do + it "builds a valid JSON-RPC 2.0 envelope" do + envelope = described_class.build_jsonrpc_envelope("test_method", { key: "value" }, id: 42) + + expect(envelope[:jsonrpc]).to eq("2.0") + expect(envelope[:method]).to eq("test_method") + expect(envelope[:params]).to eq({ key: "value" }) + expect(envelope[:id]).to eq(42) + end + + it "generates a random id when not provided" do + envelope = described_class.build_jsonrpc_envelope("test_method") + + expect(envelope[:id]).to be_a(Integer) + expect(envelope[:id]).to be_between(1, 2_147_483_647) + end + + it "defaults params to empty hash" do + envelope = described_class.build_jsonrpc_envelope("test_method") + + expect(envelope[:params]).to eq({}) + end + end + + describe ".build_http_client" do + it "creates an HTTP client with SSL for https URLs" do + http = described_class.build_http_client(uri) + + expect(http).to be_a(Net::HTTP) + expect(http.use_ssl?).to be(true) + expect(http.open_timeout).to eq(30) + expect(http.read_timeout).to eq(30) + end + + it "uses custom timeout" do + http = described_class.build_http_client(uri, timeout: 60) + + expect(http.open_timeout).to eq(60) + expect(http.read_timeout).to eq(60) + end + + it "uses separate read_timeout when provided" do + http = described_class.build_http_client(uri, timeout: 30, read_timeout: 120) + + expect(http.open_timeout).to eq(30) + expect(http.read_timeout).to eq(120) + end + + it "does not use SSL for http URLs" do + http_uri = URI.parse("http://api.example.com/rpc") + http = described_class.build_http_client(http_uri) + + expect(http.use_ssl?).to be(false) + end + end + + describe ".build_post_request" do + it "creates a POST request with JSON-RPC body" do + request = described_class.build_post_request(uri, "test_method", { key: "value" }, id: 1) + + expect(request).to be_a(Net::HTTP::Post) + expect(request["Content-Type"]).to eq("application/json") + expect(request["Accept"]).to eq("application/json") + + body = JSON.parse(request.body) + expect(body["jsonrpc"]).to eq("2.0") + expect(body["method"]).to eq("test_method") + expect(body["params"]).to eq({ "key" => "value" }) + expect(body["id"]).to eq(1) + end + + it "includes custom headers" do + request = described_class.build_post_request( + uri, "test_method", {}, headers: { "Authorization" => "Bearer token123" } + ) + + expect(request["Authorization"]).to eq("Bearer token123") + end + end + + describe ".parse_jsonrpc_response" do + context "with successful JSON-RPC response" do + let(:response) do + instance_double(Net::HTTPSuccess, + code: "200", + body: { "jsonrpc" => "2.0", "id" => 1, "result" => "ok" }.to_json) + end + + before { allow(response).to receive(:is_a?).with(Net::HTTPSuccess).and_return(true) } + + it "returns success with parsed body, status, and raw" do + result = described_class.parse_jsonrpc_response(response) + + expect(result[:success]).to be(true) + expect(result[:status]).to eq("200") + expect(result[:raw]).to eq(response.body) + expect(result[:body]["result"]).to eq("ok") + end + end + + context "with JSON-RPC error response" do + let(:response) do + instance_double( + Net::HTTPSuccess, + code: "200", + body: { "jsonrpc" => "2.0", "id" => 1, "error" => { "code" => -32_600, "message" => "Invalid" } }.to_json + ) + end + + before { allow(response).to receive(:is_a?).with(Net::HTTPSuccess).and_return(true) } + + it "returns failure when body contains error key" do + result = described_class.parse_jsonrpc_response(response) + + expect(result[:success]).to be(false) + expect(result[:status]).to eq("200") + expect(result[:body]["error"]["message"]).to eq("Invalid") + end + end + + context "with HTTP failure response" do + let(:response) do + instance_double( + Net::HTTPInternalServerError, + code: "500", + body: { "jsonrpc" => "2.0", "id" => 1, "result" => "data" }.to_json + ) + end + + before { allow(response).to receive(:is_a?).with(Net::HTTPSuccess).and_return(false) } + + it "returns failure with status and raw" do + result = described_class.parse_jsonrpc_response(response) + + expect(result[:success]).to be(false) + expect(result[:status]).to eq("500") + expect(result[:raw]).to eq(response.body) + end + end + + context "with invalid JSON body" do + let(:response) { instance_double(Net::HTTPSuccess, code: "200", body: "not valid json") } + + before { allow(response).to receive(:is_a?).with(Net::HTTPSuccess).and_return(true) } + + it "returns failure with status, raw, and error message" do + result = described_class.parse_jsonrpc_response(response) + + expect(result[:success]).to be(false) + expect(result[:status]).to eq("200") + expect(result[:raw]).to eq("not valid json") + expect(result[:body]["error"]["message"]).to eq("Invalid JSON response") + end + end + + context "with non-object JSON body" do + ["[1,2,3]", "\"just_a_string\"", "42", "true", "null"].each do |payload| + it "rejects #{payload} with status and raw" do + resp = instance_double(Net::HTTPSuccess, code: "200", body: payload) + allow(resp).to receive(:is_a?).with(Net::HTTPSuccess).and_return(true) + + result = described_class.parse_jsonrpc_response(resp) + + expect(result[:success]).to be(false) + expect(result[:status]).to eq("200") + expect(result[:raw]).to eq(payload) + expect(result[:body]["error"]["message"]).to include("expected object") + end + end + end + + it "accepts an optional body parameter" do + response = instance_double(Net::HTTPSuccess, code: "200", body: "ignored") + allow(response).to receive(:is_a?).with(Net::HTTPSuccess).and_return(true) + override_body = { "jsonrpc" => "2.0", "id" => 1, "result" => "override" }.to_json + + result = described_class.parse_jsonrpc_response(response, body: override_body) + + expect(result[:success]).to be(true) + expect(result[:raw]).to eq(override_body) + expect(result[:body]["result"]).to eq("override") + end + end + + describe ".parse_response" do + context "with successful response" do + let(:response) do + instance_double(Net::HTTPSuccess, code: "200", body: { "data" => "value" }.to_json) + end + + before { allow(response).to receive(:is_a?).with(Net::HTTPSuccess).and_return(true) } + + it "returns success with parsed body" do + result = described_class.parse_response(response) + + expect(result[:success]).to be(true) + expect(result[:body]).to eq({ "data" => "value" }) + end + end + + context "with failure response" do + let(:response) do + instance_double(Net::HTTPInternalServerError, code: "500", body: { "error" => "fail" }.to_json) + end + + before { allow(response).to receive(:is_a?).with(Net::HTTPSuccess).and_return(false) } + + it "returns failure" do + result = described_class.parse_response(response) + + expect(result[:success]).to be(false) + end + end + + context "with unparseable body" do + let(:response) do + instance_double(Net::HTTPSuccess, code: "200", body: "not json") + end + + before { allow(response).to receive(:is_a?).with(Net::HTTPSuccess).and_return(true) } + + it "returns empty hash as body" do + result = described_class.parse_response(response) + + expect(result[:success]).to be(true) + expect(result[:body]).to eq({}) + end + end + end + + describe ".execute_rpc" do + before do + allow(described_class).to receive(:post).and_return( + { "jsonrpc" => "2.0", "id" => 1, "result" => "done" } + ) + end + + it "delegates to inherited post and returns wrapped result" do + result = described_class.execute_rpc(url:, method: "test_method", params: { key: "val" }) + + expect(described_class).to have_received(:post).with( + base_url: url, + headers: hash_including("Accept" => "application/json"), + body: hash_including(jsonrpc: "2.0", method: "test_method", params: { key: "val" }), + config: { timeout: 30, open_timeout: 30 } + ) + expect(result[:success]).to be(true) + expect(result[:body]["result"]).to eq("done") + end + + it "passes custom headers" do + described_class.execute_rpc(url:, method: "m", headers: { "X-Custom" => "val" }) + + expect(described_class).to have_received(:post).with( + hash_including(headers: hash_including("X-Custom" => "val")) + ) + end + + it "uses timeout from config" do + described_class.execute_rpc(url:, method: "m", config: { timeout: 90 }) + + expect(described_class).to have_received(:post).with( + hash_including(config: { timeout: 90, open_timeout: 90 }) + ) + end + + it "threads read_timeout from config to HttpClient" do + described_class.execute_rpc(url:, method: "m", config: { timeout: 30, read_timeout: 120 }) + + expect(described_class).to have_received(:post).with( + hash_including(config: { timeout: 120, open_timeout: 30 }) + ) + end + + it "returns error hash for malformed URLs" do + result = described_class.execute_rpc(url: "ht tp://bad url", method: "m") + + expect(result[:success]).to be(false) + expect(result[:body]["error"]["message"]).to include("Transport error") + end + + it "returns error hash for unsupported URI schemes" do + result = described_class.execute_rpc(url: "ftp://files.example.com/data", method: "m") + + expect(result[:success]).to be(false) + expect(result[:body]["error"]["message"]).to include("Unsupported URI scheme") + end + + it "returns failure when response contains error key" do + allow(described_class).to receive(:post).and_return( + { "jsonrpc" => "2.0", "id" => 1, "error" => { "code" => -32_600, "message" => "Invalid" } } + ) + + result = described_class.execute_rpc(url:, method: "m") + + expect(result[:success]).to be(false) + expect(result[:body]["error"]["message"]).to eq("Invalid") + end + + it "returns failure when response is not a Hash" do + allow(described_class).to receive(:post).and_return([1, 2, 3]) + + result = described_class.execute_rpc(url:, method: "m") + + expect(result[:success]).to be(false) + expect(result[:body]["error"]["message"]).to include("expected object") + end + + it "propagates transport errors for Connection#handle_request_errors" do + allow(described_class).to receive(:post).and_raise(Errno::ECONNREFUSED) + + expect { described_class.execute_rpc(url:, method: "m") }.to raise_error(Errno::ECONNREFUSED) + end + + it "propagates timeout errors for Connection#handle_request_errors" do + allow(described_class).to receive(:post).and_raise(Timeout::Error, "timed out") + + expect { described_class.execute_rpc(url:, method: "m") }.to raise_error(Timeout::Error) + end + end + + describe ".execute_get" do + before do + allow(described_class).to receive(:get).and_return({ "status" => "ok" }) + end + + it "delegates to inherited get and returns wrapped result" do + result = described_class.execute_get(url:) + + expect(described_class).to have_received(:get).with( + base_url: url, + headers: hash_including("Accept" => "application/json"), + config: { timeout: 30, open_timeout: 30 } + ) + expect(result[:success]).to be(true) + expect(result[:body]).to eq({ "status" => "ok" }) + end + + it "passes custom headers" do + described_class.execute_get(url:, headers: { "Authorization" => "Bearer token" }) + + expect(described_class).to have_received(:get).with( + hash_including(headers: hash_including("Authorization" => "Bearer token")) + ) + end + + it "uses timeout from config" do + described_class.execute_get(url:, config: { timeout: 120 }) + + expect(described_class).to have_received(:get).with( + hash_including(config: { timeout: 120, open_timeout: 120 }) + ) + end + + it "threads read_timeout from config to HttpClient" do + described_class.execute_get(url:, config: { timeout: 30, read_timeout: 90 }) + + expect(described_class).to have_received(:get).with( + hash_including(config: { timeout: 90, open_timeout: 30 }) + ) + end + + it "returns error hash for malformed URLs" do + result = described_class.execute_get(url: "ht tp://bad url") + + expect(result[:success]).to be(false) + expect(result[:body]["error"]["message"]).to include("Transport error") + end + + it "returns error hash for unsupported URI schemes" do + result = described_class.execute_get(url: "ftp://files.example.com/data") + + expect(result[:success]).to be(false) + expect(result[:body]["error"]["message"]).to include("Unsupported URI scheme") + end + + it "returns failure when response contains error key" do + allow(described_class).to receive(:get).and_return( + { "error" => { "message" => "Not found" } } + ) + + result = described_class.execute_get(url:) + + expect(result[:success]).to be(false) + expect(result[:body]["error"]["message"]).to eq("Not found") + end + + it "propagates transport errors for Connection#handle_request_errors" do + allow(described_class).to receive(:get).and_raise(Errno::ECONNREFUSED) + + expect { described_class.execute_get(url:) }.to raise_error(Errno::ECONNREFUSED) + end + + it "propagates timeout errors for Connection#handle_request_errors" do + allow(described_class).to receive(:get).and_raise(Timeout::Error, "timed out") + + expect { described_class.execute_get(url:) }.to raise_error(Timeout::Error) + end + end + + describe ".handle_response" do + it "returns parsed body for 2xx responses" do + response = instance_double(Net::HTTPSuccess, code: "200", + body: { "result" => "ok" }.to_json) + + result = described_class.handle_response(response) + + expect(result).to eq({ "result" => "ok" }) + end + + it "returns empty hash for 2xx with blank body" do + response = instance_double(Net::HTTPSuccess, code: "200", body: "") + + result = described_class.handle_response(response) + + expect(result).to eq({}) + end + + it "preserves error key from non-2xx JSON-RPC response" do + response = instance_double(Net::HTTPBadRequest, code: "400", + body: { "error" => { "message" => "Bad request" } }.to_json) + + result = described_class.handle_response(response) + + expect(result["error"]["message"]).to eq("Bad request") + end + + it "adds synthetic error for non-2xx without error key" do + response = instance_double(Net::HTTPInternalServerError, code: "500", + body: { "data" => "value" }.to_json) + + result = described_class.handle_response(response) + + expect(result["error"]["message"]).to eq("HTTP request failed with status 500") + expect(result["data"]).to eq("value") + end + + it "handles non-2xx with empty body" do + response = instance_double(Net::HTTPInternalServerError, code: "500", body: "") + + result = described_class.handle_response(response) + + expect(result["error"]["message"]).to eq("HTTP request failed with status 500") + end + + it "handles non-2xx with non-JSON body" do + response = instance_double(Net::HTTPInternalServerError, code: "500", + body: "Server Error") + + result = described_class.handle_response(response) + + expect(result["error"]["message"]).to eq("HTTP request failed with status 500") + end + end + + describe ".post (error unwrapping)" do + it "re-raises original exception from HttpClient RuntimeError wrapper" do + allow(Utils::HttpClient).to receive(:post).and_raise( + RuntimeError.new("HTTP request failed: Connection refused").tap do |e| + # Simulate Ruby's cause mechanism + allow(e).to receive(:cause).and_return(Errno::ECONNREFUSED.new) + end + ) + + expect { described_class.post(base_url: url) }.to raise_error(Errno::ECONNREFUSED) + end + end + + describe ".get (error unwrapping)" do + it "re-raises original exception from HttpClient RuntimeError wrapper" do + allow(Utils::HttpClient).to receive(:get).and_raise( + RuntimeError.new("HTTP request failed: Connection refused").tap do |e| + allow(e).to receive(:cause).and_return(Errno::ECONNREFUSED.new) + end + ) + + expect { described_class.get(base_url: url) }.to raise_error(Errno::ECONNREFUSED) + end + end +end diff --git a/server/spec/models/agents/component_spec.rb b/server/spec/models/agents/component_spec.rb index ffb21d310..321c633dc 100644 --- a/server/spec/models/agents/component_spec.rb +++ b/server/spec/models/agents/component_spec.rb @@ -19,7 +19,19 @@ llm_model: 3, prompt_template: 4, vector_store: 5, +<<<<<<< HEAD python_custom: 6 +======= + python_custom: 6, + conditional: 7, + guardrails: 8, + tool: 9, + agent: 10, + knowledge_base: 11, + llm_router: 12, + human_in_loop: 13, + a2a_agent: 14 +>>>>>>> afa98e94b (feat(CE): add a2a_agent component type, masked config, and JSON-RPC client (#1719)) ) } end @@ -40,4 +52,85 @@ expect(component.position).to eq(position_data) end end + + describe "#masked_configuration" do + context "when component is a2a_agent" do + let(:component) do + create(:component, component_type: :a2a_agent, configuration: { + "url" => "https://remote-agent.example.com", + "auth_type" => "bearer", + "auth_config" => { "secret" => "super-secret-token" }, + "headers" => { "X-Custom" => "value" }, + "timeout" => 60, + "agent_card" => { "name" => "Test Agent" }, + "skills" => [{ "id" => "s1", "name" => "Skill 1" }] + }) + end + + it "masks auth_config secret values" do + masked = component.masked_configuration + expect(masked["auth_config"]["secret"]).to eq("*************") + end + + it "preserves non-secret fields" do + masked = component.masked_configuration + expect(masked["url"]).to eq("https://remote-agent.example.com") + expect(masked["auth_type"]).to eq("bearer") + expect(masked["timeout"]).to eq(60) + expect(masked["agent_card"]["name"]).to eq("Test Agent") + expect(masked["skills"].first["name"]).to eq("Skill 1") + end + + it "does not mutate the original configuration" do + component.masked_configuration + expect(component.configuration["auth_config"]["secret"]).to eq("super-secret-token") + end + end + + context "when component is a2a_agent with basic auth" do + let(:component) do + create(:component, component_type: :a2a_agent, configuration: { + "url" => "https://remote-agent.example.com", + "auth_type" => "basic", + "auth_config" => { "username" => "admin", "secret" => "password123" } + }) + end + + it "masks all auth_config string values" do + masked = component.masked_configuration + expect(masked["auth_config"]["username"]).to eq("*************") + expect(masked["auth_config"]["secret"]).to eq("*************") + end + end + + context "when component is a2a_agent with no auth_config" do + let(:component) do + create(:component, component_type: :a2a_agent, configuration: { + "url" => "https://remote-agent.example.com", + "auth_type" => "none" + }) + end + + it "returns configuration unchanged" do + masked = component.masked_configuration + expect(masked).to eq(component.configuration) + end + end + + context "when component is not a2a_agent" do + let(:component) { create(:component, component_type: :agent) } + + it "returns configuration as-is" do + expect(component.masked_configuration).to eq(component.configuration) + end + end + + context "when configuration is blank" do + let(:component) { create(:component, component_type: :a2a_agent, configuration: {}) } + + it "returns the blank configuration" do + expect(component.masked_configuration).to eq({}) + end + end + end end