Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dispatch_cli/commands/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ def configure_provider(
"default_model": model,
"scope": "org",
"set_default": set_default,
"allow_overwrite": True,
},
headers=auth_headers,
timeout=30,
Expand Down Expand Up @@ -891,6 +892,7 @@ def setup_wizard(
"default_model": model,
"scope": remote_scope,
"set_default": set_default,
"allow_overwrite": True,
},
headers=auth_headers,
timeout=30,
Expand Down
14 changes: 4 additions & 10 deletions dispatch_cli/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
- Syntax-highlighted code blocks
"""

import sys
from contextlib import contextmanager
from io import StringIO
from typing import Literal

from rich.console import Console
Expand All @@ -28,8 +30,6 @@ def __init__(self, verbose: bool = False):
Args:
verbose: If True, show all messages including debug. If False, only show important messages.
"""
import sys

self.verbose = verbose
self.console = Console()
self._live_context: Live | None = None
Expand All @@ -52,18 +52,12 @@ def _print(
**kwargs: Extra fields for Rich console
"""
if self._is_piped:
from io import StringIO

from rich.console import Console as PlainConsole

# Render any Rich object to plain text
string_io = StringIO()
plain_console = PlainConsole(
file=string_io, force_terminal=False, no_color=True
)
plain_console = Console(file=string_io, force_terminal=False, no_color=True)
plain_console.print(message)
plain = string_io.getvalue().rstrip("\n")
print(f"{plain_prefix}{plain}", flush=True)
print(f"{plain_prefix}{plain}", file=sys.stderr, flush=True)
else:
self.console.print(message, **kwargs)

Expand Down
48 changes: 48 additions & 0 deletions dispatch_cli/mcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@
CreateScheduleResponse,
DeleteScheduleRequest,
DeleteScheduleResponse,
EventRecord,
EventTraceResponse,
GetScheduleRequest,
GetScheduleResponse,
ListSchedulesRequest,
ListSchedulesResponse,
RebootAgentResponse,
RecentTracesResponse,
StopAgentResponse,
TopicListItem,
UpdateScheduleRequest,
UpdateScheduleResponse,
)
Expand Down Expand Up @@ -147,6 +151,50 @@ def get_topic_schema(self, topic: str, namespace: str | None = None) -> dict:
resp.raise_for_status()
return resp.json()

def list_topics(self, namespace: str) -> list[TopicListItem]:
"""List all topics in namespace."""
url = self._namespaced_url("/events/topics", namespace)
resp = self.client.get(url)
resp.raise_for_status()
return [TopicListItem.model_validate(t) for t in resp.json()]

def get_recent_events(
self,
namespace: str,
topic: str | None = None,
limit: int = 20,
) -> list[EventRecord]:
"""Get recent events, optionally filtered by topic."""
url = self._namespaced_url("/events/recent", namespace)
params: dict[str, str | int] = {"limit": limit}
if topic:
params["topic"] = topic
resp = self.client.get(url, params=params)
resp.raise_for_status()
return [EventRecord.model_validate(e) for e in resp.json()]

def get_event_trace(self, trace_id: str, namespace: str) -> EventTraceResponse:
"""Get full event trace by trace ID."""
url = self._namespaced_url(f"/events/trace/{trace_id}", namespace)
resp = self.client.get(url)
resp.raise_for_status()
return EventTraceResponse.model_validate(resp.json())

def get_recent_traces(
self,
namespace: str,
topic: str | None = None,
limit: int = 50,
) -> RecentTracesResponse:
"""Get recent trace summaries."""
url = self._namespaced_url("/events/traces/recent", namespace)
params: dict[str, str | int] = {"limit": limit}
if topic:
params["topic"] = topic
resp = self.client.get(url, params=params)
resp.raise_for_status()
return RecentTracesResponse.model_validate(resp.json())

# Invoke Operations
def invoke_function(
self,
Expand Down
81 changes: 80 additions & 1 deletion dispatch_cli/mcp/models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Shared Pydantic models for MCP client and tools."""

from typing import Any
from typing import Any, Literal

from pydantic import BaseModel, Field

Expand Down Expand Up @@ -146,3 +146,82 @@ class RebootAgentResponse(BaseModel):
description="Deployment job ID for polling status with get_deploy_status"
)
version: str = Field(description="Agent version being deployed")


# Topic & Event Models


class SubscribedHandler(BaseModel):
"""A handler subscribed to a topic."""

agent_name: str
handler_name: str


class TopicListItem(BaseModel):
"""A topic item as returned by the list topics endpoint."""

topic: str
topic_id: str | None = None
created_at: str | None = None
namespace: str | None = None
webhook_enabled: bool | None = None
webhook_provider: str | None = None
subscribers: list[str] = []
subscribed_handlers: list[SubscribedHandler] = []
integration: str | None = None
schema_: dict[str, Any] | None = Field(default=None, alias="schema")
schema_locked: bool = False
description: str | None = None
sdk_docs_url: str | None = None

model_config = {"populate_by_name": True}


class EventRecord(BaseModel):
"""A single event record from the event history."""

uid: str | None = None
message_type: str | None = None
topic: str | None = None
function_name: str | None = None
schedule_name: str | None = None
source: str | None = None
timestamp: str | None = None
trace_id: str | None = None
parent_id: str | None = None
payload: dict[str, Any] | None = None


class TraceSummary(BaseModel):
"""Summary of a trace (session) with agent invocations."""

trace_id: str
first_event_timestamp: str
event_count: int
trigger: str
trigger_type: Literal["topic", "function", "schedule", "unknown"]
trigger_agent: str | None = None
trigger_function: str | None = None
schedule_name: str | None = None
last_activity: str
root_event_uid: str | None = None
root_topic: str | None = None
agents_involved: list[str]


class RecentTracesResponse(BaseModel):
"""Response from the recent traces endpoint."""

total_events: int
unique_traces: int
traces: list[TraceSummary]


class EventTraceResponse(BaseModel):
"""Response from the event trace endpoint."""

events: list[dict[str, Any]] = Field(
description="Tree-structured events with invocation enrichment"
)
llm_summary: dict[str, Any] | None = None
115 changes: 115 additions & 0 deletions dispatch_cli/mcp/operator/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@
CreateScheduleResponse,
DeleteScheduleRequest,
DeleteScheduleResponse,
EventRecord,
EventTraceResponse,
GetScheduleRequest,
GetScheduleResponse,
ListSchedulesRequest,
ListSchedulesResponse,
RebootAgentResponse,
RecentTracesResponse,
StopAgentResponse,
TopicListItem,
UpdateScheduleRequest,
UpdateScheduleResponse,
)
Expand Down Expand Up @@ -140,6 +144,47 @@ class PublishEventRequest(BaseModel):
)


class ListTopicsRequest(BaseModel):
"""Request payload for listing topics."""

namespace: str = Field(
description="Namespace (required). Use the namespace from the agent's dispatch.yaml, or call list_namespaces to discover valid namespaces."
)


class GetRecentEventsRequest(BaseModel):
"""Request payload for getting recent events."""

namespace: str = Field(
description="Namespace (required). Use the namespace from the agent's dispatch.yaml, or call list_namespaces to discover valid namespaces."
)
topic: str | None = Field(default=None, description="Optional topic filter")
limit: int = Field(
default=20, description="Max events to return (1-100)", ge=1, le=100
)


class GetEventTraceRequest(BaseModel):
"""Request payload for getting an event trace."""

trace_id: str = Field(description="Trace ID to look up")
namespace: str = Field(
description="Namespace (required). Use the namespace from the agent's dispatch.yaml, or call list_namespaces to discover valid namespaces."
)


class GetRecentTracesRequest(BaseModel):
"""Request payload for getting recent traces."""

namespace: str = Field(
description="Namespace (required). Use the namespace from the agent's dispatch.yaml, or call list_namespaces to discover valid namespaces."
)
topic: str | None = Field(default=None, description="Optional topic filter")
limit: int = Field(
default=50, description="Max traces to return (1-100)", ge=1, le=100
)


class GetAgentFunctionsRequest(BaseModel):
"""Request payload for getting agent functions."""

Expand Down Expand Up @@ -1152,6 +1197,76 @@ async def publish_event(request: PublishEventRequest) -> PublishEventResponse:
result = client.publish_event(request.topic, request.payload, namespace=ns)
return PublishEventResponse(**result)

@mcp.tool()
async def list_topics(request: ListTopicsRequest) -> list[TopicListItem]:
"""List all topics in a namespace.

Returns topics with their subscribed handlers, webhook configuration,
and schema information.

Args:
request: ListTopicsRequest with namespace

Returns:
List of TopicListItem with topic details and subscribers
"""
ns = _get_namespace(request.namespace)
return client.list_topics(namespace=ns)

@mcp.tool()
async def get_recent_events(
request: GetRecentEventsRequest,
) -> list[EventRecord]:
"""Get recent events, optionally filtered by topic.

Args:
request: GetRecentEventsRequest with namespace, optional topic filter, and limit

Returns:
List of EventRecord with event details
"""
ns = _get_namespace(request.namespace)
return client.get_recent_events(
namespace=ns, topic=request.topic, limit=request.limit
)

@mcp.tool()
async def get_event_trace(request: GetEventTraceRequest) -> EventTraceResponse:
"""Get the full event trace tree for a given trace ID.

Returns a tree-structured view of all events in the trace, enriched
with invocation status, LLM call summaries, and MCP tool calls.

Args:
request: GetEventTraceRequest with trace_id and namespace

Returns:
EventTraceResponse with trace_id, total_events, tree-structured events, and optional llm_summary
"""
ns = _get_namespace(request.namespace)
return client.get_event_trace(trace_id=request.trace_id, namespace=ns)

@mcp.tool()
async def get_recent_traces(
request: GetRecentTracesRequest,
) -> RecentTracesResponse:
"""Get recent trace summaries for agent invocations.

Returns summaries of recent traces, including trigger type, involved
agents, and event counts. Useful for discovering trace IDs to inspect
with get_event_trace.

Args:
request: GetRecentTracesRequest with namespace, optional topic filter, and limit

Returns:
RecentTracesResponse with total_events, unique_traces, and list of TraceSummary
"""
ns = _get_namespace(request.namespace)
return client.get_recent_traces(
namespace=ns, topic=request.topic, limit=request.limit
)

@mcp.tool()
async def get_agent_functions(
request: GetAgentFunctionsRequest,
Expand Down
Loading
Loading