diff --git a/README.md b/README.md index 85669a8..57a43ec 100644 --- a/README.md +++ b/README.md @@ -106,6 +106,52 @@ Enum.each(stream, &IO.inspect/1) All functions also accept a URL string directly: `A2A.Client.send_message("https://agent.example.com", "Hello!")`. +## Multi-Transport Support + +A2A supports multiple transport protocols for different use cases: + +### JSON-RPC 2.0 (Default) +The default transport using single endpoint with method dispatch: + +```elixir +# Built into A2A.Plug and A2A.Client +{:ok, card} = A2A.Client.discover("https://agent.example.com") +{:ok, task} = A2A.Client.send_message(card, "Hello!") +``` + +### REST Transport +Direct HTTP endpoints without JSON-RPC wrapping: + +```elixir +# Client +client = A2A.Transport.REST.new_client("http://localhost:8080") +{:ok, message_id} = A2A.Transport.REST.Client.send_message( + "http://localhost:8080", message, agent_card +) + +# Server +plug A2A.Transport.REST.Server, agent_handler: MyAgent +``` + +### gRPC Transport +Protocol Buffers over HTTP/2: + +```elixir +# Client +client = A2A.Transport.GRPC.new_client("127.0.0.1:50051") +{:ok, task} = A2A.Transport.GRPC.Client.send_message(client, message) + +# Server +{:ok, _pid} = A2A.Transport.GRPC.Server.start_link(agent: MyAgent, port: 50051) +``` + +Check available transports: + +```elixir +A2A.Transport.available_transports() +#=> [:jsonrpc, :rest, :grpc] # Depends on optional deps +``` + ## Multi-Turn & Streaming Continue an existing task by passing `task_id`: @@ -160,12 +206,18 @@ def deps do [ {:a2a, "~> 0.2.0"}, - # For serving A2A endpoints + # For serving A2A endpoints (JSON-RPC) {:plug, "~> 1.16"}, {:bandit, "~> 1.5"}, - # For calling remote A2A agents - {:req, "~> 0.5"} + # For calling remote A2A agents (JSON-RPC) + {:req, "~> 0.5"}, + + # For REST transport + # (requires both plug and req) + + # For gRPC transport + {:grpcbox, "~> 0.16"} ] end ``` @@ -238,7 +290,7 @@ Key A2A spec features not yet covered: - **Push notifications** — webhook delivery on task state changes - **Authenticated extended cards** — per-client capability disclosure -- **REST / gRPC transports** — only JSON-RPC is supported +- **Multi-transport ready** — JSON-RPC, REST, and gRPC transports available - **Version negotiation** — hardcoded to A2A v0.3 - **Task resubscribe** — reconnecting to active SSE streams - **Security middleware** — agent card signatures and OAuth flows (auth plug, diff --git a/lib/a2a/json.ex b/lib/a2a/json.ex index 9262850..6074e13 100644 --- a/lib/a2a/json.ex +++ b/lib/a2a/json.ex @@ -259,7 +259,7 @@ defmodule A2A.JSON do - `:signatures` — list of JWS signature maps (each `%{"protected" => ..., "signature" => ..., "header" => ...}`) """ - @spec encode_agent_card(A2A.Agent.card(), keyword()) :: map() + @spec encode_agent_card(A2A.AgentCard.t(), keyword()) :: map() def encode_agent_card(card, opts \\ []) do url = Keyword.fetch!(opts, :url) capabilities = Keyword.get(opts, :capabilities, %{}) diff --git a/lib/a2a/transport.ex b/lib/a2a/transport.ex new file mode 100644 index 0000000..17218c4 --- /dev/null +++ b/lib/a2a/transport.ex @@ -0,0 +1,32 @@ +defmodule A2A.Transport do + @moduledoc """ + Multi-transport support for A2A protocol. + + Currently supports: + + - **JSON-RPC 2.0** (via HTTP) — Default transport in `A2A.Plug` and `A2A.Client` + - **REST/HTTP-JSON** — Direct HTTP endpoints via `A2A.Transport.REST` + """ + + @doc """ + Returns a list of available transport protocols. + """ + @spec available_transports() :: [atom()] + def available_transports do + transports = [:jsonrpc] + + if A2A.Transport.REST.available?() do + transports ++ [:rest] + else + transports + end + end + + @doc """ + Checks if a specific transport is available. + """ + @spec available?(atom()) :: boolean() + def available?(:jsonrpc), do: true + def available?(:rest), do: A2A.Transport.REST.available?() + def available?(_), do: false +end diff --git a/lib/a2a/transport/rest.ex b/lib/a2a/transport/rest.ex new file mode 100644 index 0000000..40ba9de --- /dev/null +++ b/lib/a2a/transport/rest.ex @@ -0,0 +1,17 @@ +defmodule A2A.Transport.REST do + @moduledoc """ + REST transport implementation for A2A protocol. + + Provides REST/HTTP-JSON transport for A2A agents. The server routes + all requests through `A2A.JSONRPC` dispatch so the extension pipeline + and authorization callbacks are applied consistently. + """ + + @doc """ + Returns true if REST transport dependencies are available. + """ + @spec available?() :: boolean() + def available? do + Code.ensure_loaded?(Req) and Code.ensure_loaded?(Plug) + end +end diff --git a/lib/a2a/transport/rest/client.ex b/lib/a2a/transport/rest/client.ex new file mode 100644 index 0000000..82aa3f1 --- /dev/null +++ b/lib/a2a/transport/rest/client.ex @@ -0,0 +1,184 @@ +if Code.ensure_loaded?(Req) do + defmodule A2A.Transport.REST.Client do + @moduledoc """ + REST/HTTP-JSON transport client for A2A protocol. + + Provides direct HTTP calls to REST endpoints (no JSON-RPC wrapper). + Compatible with Python/Go REST transport implementations. + """ + + alias A2A.{AgentCard, Message, Task} + + @doc """ + Send a message via REST transport. + """ + @spec send_message(String.t(), Message.t(), AgentCard.t(), keyword()) :: + {:ok, String.t()} | {:error, term()} + def send_message(endpoint, message, agent_card, opts \\ []) do + url = build_url(endpoint, "/v1/message:send") + + with {:ok, message_json} <- A2A.JSON.encode(message) do + body = %{ + message: message_json, + agent_card: A2A.JSON.encode_agent_card(agent_card, url: agent_card.url) + } + + case post_json(url, body, opts) do + {:ok, %{"message_id" => message_id}} -> {:ok, message_id} + {:error, reason} -> {:error, reason} + end + end + end + + @doc """ + Poll for messages via REST transport. + """ + @spec poll_messages(String.t(), AgentCard.t(), keyword()) :: + {:ok, [Message.t()]} | {:error, term()} + def poll_messages(endpoint, agent_card, opts \\ []) do + url = build_url(endpoint, "/v1/messages") + # AgentCard.name is the agent ID + query = %{agent_id: agent_card.name} + + case get_json(url, query, opts) do + {:ok, %{"messages" => messages}} -> + parsed_messages = + Enum.map(messages, fn msg_data -> + case A2A.JSON.decode(msg_data, :message) do + {:ok, message} -> message + {:error, _reason} -> nil + end + end) + |> Enum.reject(&is_nil/1) + + {:ok, parsed_messages} + + {:error, reason} -> + {:error, reason} + end + end + + @doc """ + Register an agent via REST transport. + """ + @spec register_agent(String.t(), AgentCard.t(), keyword()) :: + {:ok, :registered} | {:error, term()} + def register_agent(endpoint, agent_card, opts \\ []) do + url = build_url(endpoint, "/v1/agents") + body = %{agent_card: A2A.JSON.encode_agent_card(agent_card, url: agent_card.url)} + + case post_json(url, body, opts) do + {:ok, _response} -> {:ok, :registered} + {:error, reason} -> {:error, reason} + end + end + + @doc """ + Get agent information via REST transport. + """ + @spec get_agent(String.t(), String.t(), keyword()) :: + {:ok, AgentCard.t()} | {:error, term()} + def get_agent(endpoint, agent_id, opts \\ []) do + url = build_url(endpoint, "/v1/agents/#{agent_id}") + + case get_json(url, %{}, opts) do + {:ok, %{"agent_card" => card_data}} -> + A2A.JSON.decode_agent_card(card_data) + + {:error, reason} -> + {:error, reason} + end + end + + @doc """ + Get agent card (extended) via REST transport. + """ + @spec get_card(String.t(), keyword()) :: {:ok, map()} | {:error, term()} + def get_card(endpoint, opts \\ []) do + url = build_url(endpoint, "/v1/card") + get_json(url, %{}, opts) + end + + @doc """ + Get task information via REST transport. + """ + @spec get_task(String.t(), String.t(), keyword()) :: + {:ok, Task.t()} | {:error, term()} + def get_task(endpoint, task_id, opts \\ []) do + url = build_url(endpoint, "/v1/tasks/#{task_id}") + + case get_json(url, %{}, opts) do + {:ok, task_data} -> + A2A.JSON.decode(task_data, :task) + + {:error, reason} -> + {:error, reason} + end + end + + @doc """ + Cancel a task via REST transport. + """ + @spec cancel_task(String.t(), String.t(), keyword()) :: + {:ok, :cancelled} | {:error, term()} + def cancel_task(endpoint, task_id, opts \\ []) do + url = build_url(endpoint, "/v1/tasks/#{task_id}:cancel") + + case post_json(url, %{}, opts) do + {:ok, _response} -> {:ok, :cancelled} + {:error, reason} -> {:error, reason} + end + end + + # Private helper functions + + defp build_url(endpoint, path) do + endpoint = String.trim_trailing(endpoint, "/") + "#{endpoint}#{path}" + end + + defp post_json(url, body, opts) do + timeout = Keyword.get(opts, :timeout, 30_000) + + req = + Req.new( + headers: [{"content-type", "application/json"}, {"accept", "application/json"}], + receive_timeout: timeout + ) + + json_body = Jason.encode!(body) + + case Req.post(req, url: url, body: json_body) do + {:ok, %Req.Response{status: status, body: response_body}} when status in 200..299 -> + {:ok, response_body} + + {:ok, %Req.Response{status: status, body: error_body}} -> + {:error, %{status: status, body: error_body}} + + {:error, reason} -> + {:error, reason} + end + end + + defp get_json(url, query, opts) do + timeout = Keyword.get(opts, :timeout, 30_000) + + req = + Req.new( + headers: [{"accept", "application/json"}], + receive_timeout: timeout + ) + + case Req.get(req, url: url, params: query) do + {:ok, %Req.Response{status: status, body: response_body}} when status in 200..299 -> + {:ok, response_body} + + {:ok, %Req.Response{status: status, body: error_body}} -> + {:error, %{status: status, body: error_body}} + + {:error, reason} -> + {:error, reason} + end + end + end +end diff --git a/lib/a2a/transport/rest/server.ex b/lib/a2a/transport/rest/server.ex new file mode 100644 index 0000000..eccb986 --- /dev/null +++ b/lib/a2a/transport/rest/server.ex @@ -0,0 +1,377 @@ +if Code.ensure_loaded?(Plug) do + defmodule A2A.Transport.REST.Server do + @moduledoc """ + REST/HTTP-JSON transport server for A2A protocol. + + Translates REST endpoints into JSON-RPC requests and dispatches them + through `A2A.JSONRPC`, reusing the same handler pipeline as `A2A.Plug`. + This ensures the extension pipeline and authorization callbacks are + applied consistently regardless of transport. + + ## Usage + + plug A2A.Transport.REST.Server, agent: MyAgent, base_url: "http://localhost:8080" + + ## Options + + Accepts the same options as `A2A.Plug`: + + - `:agent` — GenServer name or pid of the agent (required) + - `:base_url` — the public URL of the agent endpoint (required for card) + - `:metadata` — static metadata merged into every call (default: `%{}`) + - `:authorize_task` — optional authorization callback (see `A2A.Plug`) + """ + + import Plug.Conn + + @behaviour Plug + @behaviour A2A.JSONRPC + + alias A2A.JSONRPC.Error + + @impl Plug + @spec init(keyword()) :: map() + def init(opts) do + %{ + agent: Keyword.fetch!(opts, :agent), + base_url: Keyword.get(opts, :base_url), + metadata: Keyword.get(opts, :metadata, %{}), + authorize_task: Keyword.get(opts, :authorize_task), + agent_card_opts: Keyword.get(opts, :agent_card_opts, []) + } + end + + @impl Plug + @spec call(Plug.Conn.t(), map()) :: Plug.Conn.t() + def call(conn, opts) do + conn = Plug.Conn.fetch_query_params(conn) + + case {conn.method, conn.path_info} do + {"POST", ["v1", "message", "send"]} -> + handle_send_message(conn, opts) + + {"GET", ["v1", "tasks", task_id]} -> + handle_get_task(conn, opts, task_id) + + {"POST", ["v1", "tasks", task_id, "cancel"]} -> + handle_cancel_task(conn, opts, task_id) + + {"GET", ["v1", "tasks"]} -> + handle_list_tasks(conn, opts) + + {"GET", ["v1", "card"]} -> + handle_get_card(conn, opts) + + _ -> + send_error(conn, 404, "Not found") + end + end + + # -- Endpoint handlers ----------------------------------------------------- + + defp handle_send_message(conn, opts) do + with {:ok, body} <- read_json_body(conn), + {:ok, message_data} <- require_field(body, "message") do + params = + %{"message" => message_data} + |> put_unless_nil("metadata", body["metadata"]) + + jsonrpc_request = %{ + "jsonrpc" => "2.0", + "id" => 1, + "method" => "message/send", + "params" => params + } + + context = build_context(opts) + + case A2A.JSONRPC.handle(jsonrpc_request, __MODULE__, context) do + {:reply, response} -> + translate_jsonrpc_response(conn, response) + + {:stream, _method, _params, _id} -> + send_error(conn, 501, "Streaming not supported on REST transport") + end + else + {:error, :body_too_large} -> + send_error(conn, 413, "Request body too large") + + {:error, _reason} -> + send_error(conn, 400, "Invalid request body") + end + end + + defp handle_get_task(conn, opts, task_id) do + history_length = conn.query_params["historyLength"] + + params = + %{"id" => task_id} + |> put_unless_nil("historyLength", parse_integer(history_length)) + + dispatch_jsonrpc(conn, opts, "tasks/get", params) + end + + defp handle_cancel_task(conn, opts, task_id) do + dispatch_jsonrpc(conn, opts, "tasks/cancel", %{"id" => task_id}) + end + + defp handle_list_tasks(conn, opts) do + params = + %{} + |> put_unless_nil("pageSize", parse_integer(conn.query_params["pageSize"])) + |> put_unless_nil("pageToken", conn.query_params["pageToken"]) + + dispatch_jsonrpc(conn, opts, "tasks/list", params) + end + + defp handle_get_card(conn, opts) do + case opts.base_url do + nil -> + send_error(conn, 500, "Server misconfigured: missing base_url") + + base_url -> + card = GenServer.call(opts.agent, :get_agent_card) + + json = + A2A.JSON.encode_agent_card( + card, + [url: base_url] ++ opts.agent_card_opts + ) + + send_json_response(conn, 200, json) + end + end + + defp dispatch_jsonrpc(conn, opts, method, params) do + jsonrpc_request = %{ + "jsonrpc" => "2.0", + "id" => 1, + "method" => method, + "params" => params + } + + context = build_context(opts) + + case A2A.JSONRPC.handle(jsonrpc_request, __MODULE__, context) do + {:reply, response} -> + translate_jsonrpc_response(conn, response) + end + end + + # -- JSONRPC behaviour callbacks ------------------------------------------- + + @impl A2A.JSONRPC + def handle_send(message, params, %{agent: agent, opts: plug_opts}) do + call_opts = + params + |> build_call_opts(plug_opts) + |> maybe_put_fallback(:task_id, message.task_id) + |> maybe_put_fallback(:context_id, message.context_id) + + case A2A.call(agent, message, call_opts) do + {:ok, task} -> {:ok, task} + {:error, _reason} -> {:error, Error.internal_error("Message processing failed")} + end + end + + @impl A2A.JSONRPC + def handle_get(task_id, params, %{agent: agent, opts: plug_opts}) do + case GenServer.call(agent, {:get_task, task_id}) do + {:ok, task} -> authorize_task(:get, task, params, plug_opts) + {:error, :not_found} -> {:error, Error.task_not_found()} + end + end + + @impl A2A.JSONRPC + def handle_cancel(task_id, params, %{agent: agent, opts: plug_opts}) do + with {:ok, task} <- fetch_task(agent, task_id), + {:ok, _task} <- authorize_task(:cancel, task, params, plug_opts) do + case GenServer.call(agent, {:cancel, task_id}) do + :ok -> + fetch_task_or_error(agent, task_id) + + {:error, :not_found} -> + {:error, Error.task_not_found()} + + {:error, _reason} -> + {:error, Error.task_not_cancelable("Task cannot be canceled")} + end + else + {:error, :not_found} -> {:error, Error.task_not_found()} + {:error, %Error{} = error} -> {:error, error} + end + end + + @impl A2A.JSONRPC + def handle_list(params, %{agent: agent, opts: plug_opts}) do + case GenServer.call(agent, {:list_tasks, params}) do + {:ok, result} -> + {:ok, authorize_task_list(result, params, plug_opts)} + + {:error, :invalid_page_token} -> + {:error, Error.invalid_params("\"pageToken\" is invalid")} + + {:error, _reason} -> + {:error, Error.internal_error("Failed to list tasks")} + end + end + + # -- Private helpers ------------------------------------------------------- + + defp build_context(opts) do + %{agent: opts.agent, opts: opts} + end + + defp fetch_task(agent, task_id) do + case GenServer.call(agent, {:get_task, task_id}) do + {:ok, task} -> {:ok, task} + {:error, :not_found} -> {:error, :not_found} + end + end + + defp fetch_task_or_error(agent, task_id) do + case GenServer.call(agent, {:get_task, task_id}) do + {:ok, task} -> {:ok, task} + {:error, :not_found} -> {:error, Error.task_not_found()} + end + end + + defp authorize_task(_operation, task, _params, %{authorize_task: nil}), do: {:ok, task} + + defp authorize_task(operation, task, params, plug_opts) do + context = %{metadata: request_metadata(params, plug_opts), params: params} + + case call_authorizer(plug_opts.authorize_task, operation, task, context) do + :ok -> {:ok, task} + true -> {:ok, task} + {:ok, true} -> {:ok, task} + {:ok, _identity} -> {:ok, task} + {:error, %Error{} = error} -> {:error, error} + _deny -> {:error, Error.task_not_found()} + end + end + + defp authorize_task_list(result, _params, %{authorize_task: nil}), do: result + + defp authorize_task_list(%{tasks: tasks} = result, params, plug_opts) do + authorized = + Enum.filter(tasks, fn task -> + match?({:ok, ^task}, authorize_task(:list, task, params, plug_opts)) + end) + + %{ + result + | tasks: authorized, + total_size: length(authorized), + page_size: length(authorized) + } + end + + defp call_authorizer(fun, operation, task, context) when is_function(fun, 3) do + fun.(operation, task, context) + end + + defp call_authorizer(fun, operation, task, _context) when is_function(fun, 2) do + fun.(operation, task) + end + + defp call_authorizer({module, function}, operation, task, context) do + apply(module, function, [operation, task, context]) + end + + defp build_call_opts(params, plug_opts) do + metadata = request_metadata(params, plug_opts) + + [] + |> maybe_put(:task_id, params["id"]) + |> maybe_put(:context_id, params["contextId"]) + |> maybe_put(:metadata, if(metadata == %{}, do: nil, else: metadata)) + end + + defp request_metadata(params, plug_opts) do + merge_unless_nil(plug_opts.metadata, params["metadata"]) + end + + defp merge_unless_nil(base, nil), do: base + defp merge_unless_nil(base, override), do: Map.merge(base, override) + + defp maybe_put(opts, _key, nil), do: opts + defp maybe_put(opts, key, val), do: [{key, val} | opts] + + defp maybe_put_fallback(opts, key, val) do + if Keyword.has_key?(opts, key), do: opts, else: maybe_put(opts, key, val) + end + + # -- JSON-RPC to REST response translation --------------------------------- + + defp translate_jsonrpc_response(conn, %{"result" => result}) do + send_json_response(conn, 200, result) + end + + defp translate_jsonrpc_response(conn, %{"error" => error}) do + status = error_code_to_http_status(error["code"]) + send_error(conn, status, error["message"]) + end + + defp error_code_to_http_status(-32_001), do: 404 + defp error_code_to_http_status(-32_002), do: 409 + defp error_code_to_http_status(-32_602), do: 400 + defp error_code_to_http_status(-32_600), do: 400 + defp error_code_to_http_status(-32_601), do: 404 + defp error_code_to_http_status(-32_700), do: 400 + defp error_code_to_http_status(_), do: 500 + + # -- Body reading / response helpers --------------------------------------- + + defp read_json_body(%{body_params: %Plug.Conn.Unfetched{}} = conn) do + case Plug.Conn.read_body(conn) do + {:ok, body, _conn} -> + Jason.decode(body) + + {:more, _partial_body, _conn} -> + {:error, :body_too_large} + + {:error, reason} -> + {:error, reason} + end + end + + defp read_json_body(%{body_params: %{} = params}) do + {:ok, params} + end + + defp send_json_response(conn, status, data) do + conn + |> put_resp_content_type("application/json") + |> send_resp(status, Jason.encode!(data)) + |> halt() + end + + defp send_error(conn, status, message) when is_binary(message) do + send_json_response(conn, status, %{"error" => message}) + end + + defp require_field(map, field) when is_map(map) do + case Map.fetch(map, field) do + {:ok, value} -> {:ok, value} + :error -> {:error, {:missing_field, field}} + end + end + + defp require_field(_, _field), do: {:error, :invalid_body} + + defp parse_integer(nil), do: nil + + defp parse_integer(str) when is_binary(str) do + case Integer.parse(str) do + {int, ""} -> int + _ -> nil + end + end + + defp parse_integer(int) when is_integer(int), do: int + + defp put_unless_nil(map, _key, nil), do: map + defp put_unless_nil(map, key, val), do: Map.put(map, key, val) + end +end diff --git a/mix.lock b/mix.lock index 3c3d8ba..febe13b 100644 --- a/mix.lock +++ b/mix.lock @@ -1,13 +1,19 @@ %{ + "acceptor_pool": {:hex, :acceptor_pool, "1.0.1", "d88c2e8a0be9216cf513fbcd3e5a4beb36bee3ff4168e85d6152c6f899359cdb", [:rebar3], [], "hexpm", "f172f3d74513e8edd445c257d596fc84dbdd56d2c6fa287434269648ae5a421e"}, "bandit": {:hex, :bandit, "1.11.1", "1eb33123cc3c17ae0c3447874eb83399ee530f960c39711ed240342fbd4865fa", [:mix], [{:hpax, "~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.18", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "d4401016df9abbc6dcd325c0b78b2b193e7c7c96bb68f31e576112be025d84a5"}, "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, + "chatterbox": {:hex, :ts_chatterbox, "0.15.1", "5cac4d15dd7ad61fc3c4415ce4826fc563d4643dee897a558ec4ea0b1c835c9c", [:rebar3], [{:hpack, "~> 0.3.0", [hex: :hpack_erl, repo: "hexpm", optional: false]}], "hexpm", "4f75b91451338bc0da5f52f3480fa6ef6e3a2aeecfc33686d6b3d0a0948f31aa"}, "credo": {:hex, :credo, "1.7.18", "5c5596bf7aedf9c8c227f13272ac499fe8eae6237bd326f2f07dfc173786f042", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "a189d164685fd945809e862fe76a7420c4398fa288d76257662aecb909d6b3e5"}, + "ctx": {:hex, :ctx, "0.6.0", "8ff88b70e6400c4df90142e7f130625b82086077a45364a78d208ed3ed53c7fe", [:rebar3], [], "hexpm", "a14ed2d1b67723dbebbe423b28d7615eb0bdcba6ff28f2d1f1b0a7e1d4aa5fc2"}, "dialyxir": {:hex, :dialyxir, "1.4.7", "dda948fcee52962e4b6c5b4b16b2d8fa7d50d8645bbae8b8685c3f9ecb7f5f4d", [:mix], [{:erlex, ">= 0.2.8", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b34527202e6eb8cee198efec110996c25c5898f43a4094df157f8d28f27d9efe"}, "earmark_parser": {:hex, :earmark_parser, "1.4.44", "f20830dd6b5c77afe2b063777ddbbff09f9759396500cdbe7523efd58d7a339c", [:mix], [], "hexpm", "4778ac752b4701a5599215f7030989c989ffdc4f6df457c5f36938cc2d2a2750"}, "erlex": {:hex, :erlex, "0.2.8", "cd8116f20f3c0afe376d1e8d1f0ae2452337729f68be016ea544a72f767d9c12", [:mix], [], "hexpm", "9d66ff9fedf69e49dc3fd12831e12a8a37b76f8651dd21cd45fcf5561a8a7590"}, "ex_doc": {:hex, :ex_doc, "0.40.2", "f50edec428c4b0a457a167de42414c461122a3585a99515a69d09fff19e5597e", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "4fa426e2beb47854a162e2c488727fdec51cd4692e319b23810c2804cb1a40fe"}, "file_system": {:hex, :file_system, "1.1.1", "31864f4685b0148f25bd3fbef2b1228457c0c89024ad67f7a81a3ffbc0bbad3a", [:mix], [], "hexpm", "7a15ff97dfe526aeefb090a7a9d3d03aa907e100e262a0f8f7746b78f8f87a5d"}, "finch": {:hex, :finch, "0.21.0", "b1c3b2d48af02d0c66d2a9ebfb5622be5c5ecd62937cf79a88a7f98d48a8290c", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "87dc6e169794cb2570f75841a19da99cfde834249568f2a5b121b809588a4377"}, + "gproc": {:hex, :gproc, "0.9.1", "f1df0364423539cf0b80e8201c8b1839e229e5f9b3ccb944c5834626998f5b8c", [:rebar3], [], "hexpm", "905088e32e72127ed9466f0bac0d8e65704ca5e73ee5a62cb073c3117916d507"}, + "grpcbox": {:hex, :grpcbox, "0.17.1", "6e040ab3ef16fe699ffb513b0ef8e2e896da7b18931a1ef817143037c454bcce", [:rebar3], [{:acceptor_pool, "~> 1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~> 0.15.1", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~> 0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~> 0.9.1", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "4a3b5d7111daabc569dc9cbd9b202a3237d81c80bf97212fbc676832cb0ceb17"}, + "hpack": {:hex, :hpack_erl, "0.3.0", "2461899cc4ab6a0ef8e970c1661c5fc6a52d3c25580bc6dd204f84ce94669926", [:rebar3], [], "hexpm", "d6137d7079169d8c485c6962dfe261af5b9ef60fbc557344511c1e65e3d95fb0"}, "hpax": {:hex, :hpax, "1.0.3", "ed67ef51ad4df91e75cc6a1494f851850c0bd98ebc0be6e81b026e765ee535aa", [:mix], [], "hexpm", "8eab6e1cfa8d5918c2ce4ba43588e894af35dbd8e91e6e55c817bca5847df34a"}, "jason": {:hex, :jason, "1.4.5", "2e3a008590b0b8d7388c20293e9dcc9cf3e5d642fd2a114e4cbbb52e595d940a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0 or ~> 3.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "b0c823996102bcd0239b3c2444eb00409b72f6a140c1950bc8b457d836b30684"}, "makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"}, diff --git a/test/a2a/transport/rest/client_test.exs b/test/a2a/transport/rest/client_test.exs new file mode 100644 index 0000000..b35a310 --- /dev/null +++ b/test/a2a/transport/rest/client_test.exs @@ -0,0 +1,49 @@ +defmodule A2A.Transport.REST.ClientTest do + use ExUnit.Case, async: true + + if Code.ensure_loaded?(Req) do + describe "public interface" do + # Ensure the module is loaded before introspecting exported functions — + # function_exported?/3 does not trigger code loading. + setup do + Code.ensure_loaded(A2A.Transport.REST.Client) + :ok + end + + test "send_message is exported" do + assert function_exported?(A2A.Transport.REST.Client, :send_message, 3) + assert function_exported?(A2A.Transport.REST.Client, :send_message, 4) + end + + test "poll_messages is exported" do + assert function_exported?(A2A.Transport.REST.Client, :poll_messages, 2) + assert function_exported?(A2A.Transport.REST.Client, :poll_messages, 3) + end + + test "register_agent is exported" do + assert function_exported?(A2A.Transport.REST.Client, :register_agent, 2) + assert function_exported?(A2A.Transport.REST.Client, :register_agent, 3) + end + + test "get_agent is exported" do + assert function_exported?(A2A.Transport.REST.Client, :get_agent, 2) + assert function_exported?(A2A.Transport.REST.Client, :get_agent, 3) + end + + test "get_card is exported" do + assert function_exported?(A2A.Transport.REST.Client, :get_card, 1) + assert function_exported?(A2A.Transport.REST.Client, :get_card, 2) + end + + test "get_task is exported" do + assert function_exported?(A2A.Transport.REST.Client, :get_task, 2) + assert function_exported?(A2A.Transport.REST.Client, :get_task, 3) + end + + test "cancel_task is exported" do + assert function_exported?(A2A.Transport.REST.Client, :cancel_task, 2) + assert function_exported?(A2A.Transport.REST.Client, :cancel_task, 3) + end + end + end +end diff --git a/test/a2a/transport/rest/server_test.exs b/test/a2a/transport/rest/server_test.exs new file mode 100644 index 0000000..fddc5aa --- /dev/null +++ b/test/a2a/transport/rest/server_test.exs @@ -0,0 +1,282 @@ +defmodule A2A.Transport.REST.ServerTest do + use ExUnit.Case, async: true + + import Plug.Test + import Plug.Conn + + alias A2A.Transport.REST.Server + + defp server_opts(agent, extra \\ []) do + Server.init([agent: agent, base_url: "http://localhost:8080"] ++ extra) + end + + defp message_json(text \\ "hello") do + %{ + "messageId" => "msg-test", + "role" => "user", + "parts" => [%{"kind" => "text", "text" => text}] + } + end + + defp post_json(path, body) do + :post + |> conn(path, Jason.encode!(body)) + |> put_req_header("content-type", "application/json") + end + + defp json_body(c), do: Jason.decode!(c.resp_body) + + defp owner_authorizer do + fn _operation, task, %{metadata: metadata} -> + metadata["user_id"] == task.metadata["owner_id"] + end + end + + setup do + agent = start_supervised!({A2A.Test.EchoAgent, [name: nil]}) + {:ok, agent: agent} + end + + # -- POST /v1/message/send ------------------------------------------------- + + describe "POST /v1/message/send" do + test "returns completed task", %{agent: agent} do + opts = server_opts(agent) + + conn = + post_json("/v1/message/send", %{"message" => message_json("hi")}) + |> Server.call(opts) + + assert conn.status == 200 + body = json_body(conn) + assert is_binary(body["task"]["id"]) + assert body["task"]["status"]["state"] == "TASK_STATE_COMPLETED" + end + + test "missing message returns 400", %{agent: agent} do + opts = server_opts(agent) + + conn = + post_json("/v1/message/send", %{"oops" => "x"}) + |> Server.call(opts) + + assert conn.status == 400 + end + + test "invalid JSON returns 400", %{agent: agent} do + opts = server_opts(agent) + + conn = + :post + |> conn("/v1/message/send", "bad{") + |> put_req_header("content-type", "application/json") + |> Server.call(opts) + + assert conn.status == 400 + end + + test "metadata flows to task", %{agent: agent} do + opts = server_opts(agent, metadata: %{"env" => "test"}) + + conn = + post_json("/v1/message/send", %{"message" => message_json()}) + |> Server.call(opts) + + assert json_body(conn)["task"]["metadata"]["env"] == "test" + end + end + + # -- GET /v1/tasks/:id ----------------------------------------------------- + + describe "GET /v1/tasks/:id" do + test "returns existing task", %{agent: agent} do + opts = server_opts(agent) + + send_conn = + post_json("/v1/message/send", %{"message" => message_json()}) + |> Server.call(opts) + + task_id = json_body(send_conn)["task"]["id"] + + conn = + :get + |> conn("/v1/tasks/#{task_id}") + |> Server.call(opts) + + assert conn.status == 200 + body = json_body(conn) + assert body["id"] == task_id + assert body["status"]["state"] == "TASK_STATE_COMPLETED" + end + + test "returns 404 for nonexistent task", %{agent: agent} do + opts = server_opts(agent) + + conn = + :get + |> conn("/v1/tasks/nonexistent") + |> Server.call(opts) + + assert conn.status == 404 + end + + test "authorize_task denies access", %{agent: agent} do + opts = server_opts(agent, authorize_task: owner_authorizer()) + + send_conn = + post_json("/v1/message/send", %{ + "message" => message_json(), + "metadata" => %{"owner_id" => "u-1"} + }) + |> Server.call(opts) + + task_id = json_body(send_conn)["task"]["id"] + + # No user_id in metadata -> authorizer denies -> 404 + conn = + :get + |> conn("/v1/tasks/#{task_id}") + |> Server.call(opts) + + assert conn.status == 404 + end + end + + # -- POST /v1/tasks/:id/cancel --------------------------------------------- + + describe "POST /v1/tasks/:id/cancel" do + test "cancels an input_required task" do + agent = start_supervised!({A2A.Test.MultiTurnAgent, [name: nil]}) + opts = server_opts(agent) + + send_conn = + post_json("/v1/message/send", %{"message" => message_json("order pizza")}) + |> Server.call(opts) + + body = json_body(send_conn) + task_id = body["task"]["id"] + assert body["task"]["status"]["state"] == "TASK_STATE_INPUT_REQUIRED" + + conn = + post_json("/v1/tasks/#{task_id}/cancel", %{}) + |> Server.call(opts) + + assert conn.status == 200 + cancel_body = json_body(conn) + # tasks/cancel returns the task directly (not wrapped in "task") + assert cancel_body["status"]["state"] == "TASK_STATE_CANCELED" + end + + test "returns 404 for nonexistent task", %{agent: agent} do + opts = server_opts(agent) + + conn = + post_json("/v1/tasks/nonexistent/cancel", %{}) + |> Server.call(opts) + + assert conn.status == 404 + end + end + + # -- GET /v1/tasks --------------------------------------------------------- + + describe "GET /v1/tasks" do + test "lists all tasks", %{agent: agent} do + opts = server_opts(agent) + + for text <- ["one", "two"] do + post_json("/v1/message/send", %{"message" => message_json(text)}) + |> Server.call(opts) + end + + conn = + :get + |> conn("/v1/tasks") + |> Server.call(opts) + + assert conn.status == 200 + body = json_body(conn) + assert is_list(body["tasks"]) + assert length(body["tasks"]) == 2 + end + + test "authorize_task filters list", %{agent: agent} do + opts = server_opts(agent, authorize_task: owner_authorizer()) + + for owner <- ["u-1", "u-2"] do + post_json("/v1/message/send", %{ + "message" => message_json("hi #{owner}"), + "metadata" => %{"owner_id" => owner} + }) + |> Server.call(opts) + end + + # No user_id -> all denied + conn = + :get + |> conn("/v1/tasks") + |> Server.call(opts) + + assert conn.status == 200 + assert json_body(conn)["tasks"] == [] + end + end + + # -- GET /v1/card ---------------------------------------------------------- + + describe "GET /v1/card" do + test "returns agent card", %{agent: agent} do + opts = server_opts(agent) + + conn = + :get + |> conn("/v1/card") + |> Server.call(opts) + + assert conn.status == 200 + assert json_body(conn)["name"] == "echo" + end + + test "returns error without base_url", %{agent: agent} do + opts = Server.init(agent: agent) + + conn = + :get + |> conn("/v1/card") + |> Server.call(opts) + + assert conn.status == 500 + end + end + + # -- Unknown routes -------------------------------------------------------- + + describe "unknown routes" do + test "returns 404", %{agent: agent} do + opts = server_opts(agent) + + conn = + :get + |> conn("/v1/unknown") + |> Server.call(opts) + + assert conn.status == 404 + end + end + + # -- Error sanitization ---------------------------------------------------- + + describe "error sanitization" do + test "no inspect() leaks in errors", %{agent: agent} do + opts = server_opts(agent) + + conn = + :get + |> conn("/v1/tasks/nonexistent") + |> Server.call(opts) + + refute conn.resp_body =~ "%{" + refute conn.resp_body =~ ":not_found" + end + end +end diff --git a/test/a2a/transport_test.exs b/test/a2a/transport_test.exs new file mode 100644 index 0000000..c38a23f --- /dev/null +++ b/test/a2a/transport_test.exs @@ -0,0 +1,39 @@ +defmodule A2A.TransportTest do + use ExUnit.Case, async: true + + describe "available_transports/0" do + test "always includes jsonrpc" do + assert :jsonrpc in A2A.Transport.available_transports() + end + + test "includes rest when Req and Plug are available" do + transports = A2A.Transport.available_transports() + + if Code.ensure_loaded?(Req) and Code.ensure_loaded?(Plug) do + assert :rest in transports + else + refute :rest in transports + end + end + + test "does not include grpc" do + refute :grpc in A2A.Transport.available_transports() + end + end + + describe "available?/1" do + test "jsonrpc is always available" do + assert A2A.Transport.available?(:jsonrpc) + end + + test "rest availability depends on dependencies" do + expected = Code.ensure_loaded?(Req) and Code.ensure_loaded?(Plug) + assert A2A.Transport.available?(:rest) == expected + end + + test "unknown transports are not available" do + refute A2A.Transport.available?(:unknown) + refute A2A.Transport.available?(:grpc) + end + end +end