diff --git a/packages/sdk-py/src/agent_relay/__init__.py b/packages/sdk-py/src/agent_relay/__init__.py index 068291f24..7f1ce0c4a 100644 --- a/packages/sdk-py/src/agent_relay/__init__.py +++ b/packages/sdk-py/src/agent_relay/__init__.py @@ -2,7 +2,7 @@ # ── Primary API: Direct spawn/message (matches TypeScript SDK) ──────────────── -from .relay import AgentRelay, Agent, AgentSpawner, HumanHandle, Message, SpawnOptions +from .relay import AgentRelay, Agent, AgentSpawner, HumanHandle, Message, SpawnOptions, SystemHandle _has_communicate = False try: from .communicate import Relay, RelayConfig, on_relay @@ -78,6 +78,7 @@ "Agent", "AgentSpawner", "HumanHandle", + "SystemHandle", "Message", "SpawnOptions", *(["Relay", "RelayConfig", "on_relay"] if _has_communicate else []), diff --git a/packages/sdk-py/src/agent_relay/client.py b/packages/sdk-py/src/agent_relay/client.py index 2fc70c315..0c15d006b 100644 --- a/packages/sdk-py/src/agent_relay/client.py +++ b/packages/sdk-py/src/agent_relay/client.py @@ -26,6 +26,7 @@ from .protocol import ( BrokerEvent, MessageInjectionMode, + ParticipantKind, ) # ── Errors ──────────────────────────────────────────────────────────────────── @@ -469,6 +470,7 @@ async def send_message( to: str, text: str, from_: Optional[str] = None, + from_kind: Optional[ParticipantKind] = None, thread_id: Optional[str] = None, priority: Optional[int] = None, data: Optional[dict[str, Any]] = None, @@ -476,6 +478,7 @@ async def send_message( ) -> dict[str, Any]: payload: dict[str, Any] = {"to": to, "text": text} if from_ is not None: payload["from"] = from_ + if from_kind is not None: payload["senderKind"] = from_kind if thread_id is not None: payload["threadId"] = thread_id if priority is not None: payload["priority"] = priority if data is not None: payload["data"] = data diff --git a/packages/sdk-py/src/agent_relay/protocol.py b/packages/sdk-py/src/agent_relay/protocol.py index e0f263e50..8e8a846f4 100644 --- a/packages/sdk-py/src/agent_relay/protocol.py +++ b/packages/sdk-py/src/agent_relay/protocol.py @@ -10,6 +10,8 @@ AgentRuntime = Literal["pty", "headless"] HeadlessProvider = Literal["claude", "opencode"] MessageInjectionMode = Literal["wait", "steer"] +ParticipantKind = Literal["agent", "human", "system"] +SenderKind = Literal["agent", "human", "system", "unknown"] # BrokerEvent is a dict with a 'kind' field discriminator. # Event kinds: agent_spawned, agent_released, agent_exit, agent_exited, diff --git a/packages/sdk-py/src/agent_relay/relay.py b/packages/sdk-py/src/agent_relay/relay.py index 84dd477da..120ee60c0 100644 --- a/packages/sdk-py/src/agent_relay/relay.py +++ b/packages/sdk-py/src/agent_relay/relay.py @@ -13,10 +13,10 @@ import os import secrets from dataclasses import dataclass, field -from typing import Any, Awaitable, Callable, Optional +from typing import Any, Awaitable, Callable, Literal, Optional from .client import AgentRelayClient -from .protocol import AgentRuntime, BrokerEvent, MessageInjectionMode +from .protocol import AgentRuntime, BrokerEvent, MessageInjectionMode, SenderKind # ── Public types ────────────────────────────────────────────────────────────── @@ -34,6 +34,7 @@ class Message: from_name: str to: str text: str + from_kind: Optional[SenderKind] = None thread_id: Optional[str] = None data: Optional[dict[str, Any]] = None mode: Optional[MessageInjectionMode] = None @@ -215,6 +216,7 @@ async def send_message( msg = Message( event_id=event_id, from_name=self._name, + from_kind="agent", to=to, text=text, thread_id=thread_id, @@ -241,20 +243,25 @@ def unsubscribe() -> None: return unsubscribe -# ── Human handle ────────────────────────────────────────────────────────────── +# ── Human/system handles ────────────────────────────────────────────────────── -class HumanHandle: - """A messaging handle for human/system messages.""" +class _ParticipantHandle: + """Shared messaging handle for non-agent participants.""" - def __init__(self, name: str, relay: AgentRelay): + def __init__(self, name: str, kind: Literal["human", "system"], relay: AgentRelay): self._name = name + self._kind = kind self._relay = relay @property def name(self) -> str: return self._name + @property + def kind(self) -> Literal["human", "system"]: + return self._kind + async def send_message( self, *, @@ -270,6 +277,7 @@ async def send_message( to=to, text=text, from_=self._name, + from_kind=self._kind, thread_id=thread_id, priority=priority, data=data, @@ -280,6 +288,7 @@ async def send_message( msg = Message( event_id=event_id, from_name=self._name, + from_kind=self._kind, to=to, text=text, thread_id=thread_id, @@ -292,6 +301,20 @@ async def send_message( return msg +class HumanHandle(_ParticipantHandle): + """A messaging handle for human participants.""" + + def __init__(self, name: str, relay: AgentRelay): + super().__init__(name, "human", relay) + + +class SystemHandle(_ParticipantHandle): + """A messaging handle for deterministic system-origin messages.""" + + def __init__(self, relay: AgentRelay): + super().__init__("system", "system", relay) + + # ── Agent spawner ───────────────────────────────────────────────────────────── @@ -573,8 +596,8 @@ async def spawn_and_wait( def human(self, name: str) -> HumanHandle: return HumanHandle(name, self) - def system(self) -> HumanHandle: - return HumanHandle("system", self) + def system(self) -> SystemHandle: + return SystemHandle(self) async def broadcast(self, text: str, *, from_name: str = "human:orchestrator") -> Message: return await self.human(from_name).send_message(to="*", text=text) @@ -773,6 +796,7 @@ def on_event(event: BrokerEvent) -> None: msg = Message( event_id=event.get("event_id", ""), from_name=event.get("from", ""), + from_kind=event.get("sender_kind"), to=event.get("target", ""), text=event.get("body", ""), thread_id=event.get("thread_id"), diff --git a/packages/sdk-py/tests/test_send_message_mode.py b/packages/sdk-py/tests/test_send_message_mode.py index 4dedcc83b..1cebc597d 100644 --- a/packages/sdk-py/tests/test_send_message_mode.py +++ b/packages/sdk-py/tests/test_send_message_mode.py @@ -26,6 +26,7 @@ async def fake_request_ok(type_: str, payload: dict): to="Worker", text="hello", from_="system", + from_kind="system", thread_id="thread-1", priority=5, data={"k": "v"}, @@ -38,6 +39,7 @@ async def fake_request_ok(type_: str, payload: dict): "to": "Worker", "text": "hello", "from": "system", + "senderKind": "system", "thread_id": "thread-1", "priority": 5, "data": {"k": "v"}, @@ -53,14 +55,17 @@ async def test_human_send_message_passes_mode_and_sets_message_mode(): client.send_message = AsyncMock(return_value={"event_id": "evt-2"}) relay._ensure_started = AsyncMock(return_value=client) - human = HumanHandle("system", relay) + human = HumanHandle("operator", relay) msg = await human.send_message(to="Worker", text="status?", mode="wait") assert msg.mode == "wait" + assert msg.from_kind == "human" + assert human.kind == "human" client.send_message.assert_awaited_once_with( to="Worker", text="status?", - from_="system", + from_="operator", + from_kind="human", thread_id=None, priority=None, data=None, @@ -80,12 +85,39 @@ async def test_agent_send_message_passes_mode_and_sets_message_mode(): msg = await agent.send_message(to="Reviewer", text="ready", mode="steer") assert msg.mode == "steer" + assert msg.from_kind == "agent" client.send_message.assert_awaited_with( to="Reviewer", text="ready", from_="Worker", + from_kind="agent", thread_id=None, priority=None, data=None, mode="steer", ) + + +@pytest.mark.asyncio +async def test_system_handle_is_distinct_from_human_handle(): + relay = AgentRelay() + client = AsyncMock() + client.send_message = AsyncMock(return_value={"event_id": "evt-4"}) + relay._ensure_started = AsyncMock(return_value=client) + + system = relay.system() + msg = await system.send_message(to="Worker", text="deterministic notice") + + assert system.kind == "system" + assert system.name == "system" + assert msg.from_kind == "system" + client.send_message.assert_awaited_once_with( + to="Worker", + text="deterministic notice", + from_="system", + from_kind="system", + thread_id=None, + priority=None, + data=None, + mode=None, + ) diff --git a/packages/sdk/src/__tests__/orchestration-upgrades.test.ts b/packages/sdk/src/__tests__/orchestration-upgrades.test.ts index 945cf66c6..2951df652 100644 --- a/packages/sdk/src/__tests__/orchestration-upgrades.test.ts +++ b/packages/sdk/src/__tests__/orchestration-upgrades.test.ts @@ -756,9 +756,31 @@ describe('AgentRelay orchestration handles', () => { to: 'worker-1', text: 'New task assigned', from: 'system', + fromKind: 'system', }) ); + expect(system.kind).toBe('system'); expect(message.from).toBe('system'); + expect(message.fromKind).toBe('system'); + } finally { + await relay.shutdown(); + } + }); + + it('human() and system() stay type-distinct while sharing the send API', async () => { + const { client } = createMockFacadeClient(); + vi.spyOn(AgentRelayClient, 'start').mockResolvedValue(client); + + const relay = new AgentRelay(); + + try { + const human = relay.human({ name: 'Owner' }); + const system = relay.system(); + + expect(human.kind).toBe('human'); + expect(system.kind).toBe('system'); + expect(human.name).toBe('Owner'); + expect(system.name).toBe('system'); } finally { await relay.shutdown(); } diff --git a/packages/sdk/src/client.ts b/packages/sdk/src/client.ts index 61662dc05..6c609186c 100644 --- a/packages/sdk/src/client.ts +++ b/packages/sdk/src/client.ts @@ -384,6 +384,7 @@ export class AgentRelayClient { to: input.to, text: input.text, from: input.from, + senderKind: input.fromKind, threadId: input.threadId, workspaceId: input.workspaceId, workspaceAlias: input.workspaceAlias, diff --git a/packages/sdk/src/protocol.ts b/packages/sdk/src/protocol.ts index 373930e39..b170346cc 100644 --- a/packages/sdk/src/protocol.ts +++ b/packages/sdk/src/protocol.ts @@ -2,6 +2,8 @@ export const PROTOCOL_VERSION = 1 as const; export type AgentRuntime = 'pty' | 'headless'; export type HeadlessProvider = 'claude' | 'opencode'; +export type ParticipantKind = 'agent' | 'human' | 'system'; +export type SenderKind = ParticipantKind | 'unknown'; export interface RestartPolicy { enabled?: boolean; @@ -62,6 +64,7 @@ export type SdkToBroker = to: string; text: string; from?: string; + from_kind?: ParticipantKind; thread_id?: string; workspace_id?: string; workspace_alias?: string; @@ -230,6 +233,7 @@ export type BrokerEvent = kind: 'relay_inbound'; event_id: string; from: string; + sender_kind?: SenderKind; target: string; body: string; thread_id?: string; diff --git a/packages/sdk/src/relay.ts b/packages/sdk/src/relay.ts index 8493572f1..f0ac18a44 100644 --- a/packages/sdk/src/relay.ts +++ b/packages/sdk/src/relay.ts @@ -38,7 +38,9 @@ import type { BrokerStatus, HeadlessProvider, MessageInjectionMode, + ParticipantKind, RestartPolicy, + SenderKind, } from './protocol.js'; import { followLogs as followLogsFromFile, @@ -55,6 +57,7 @@ function isUnsupportedOperation(error: unknown): error is AgentRelayProtocolErro function buildUnsupportedOperationMessage( from: string, + fromKind: ParticipantKind, input: { to: string; text: string; @@ -66,6 +69,7 @@ function buildUnsupportedOperationMessage( return { eventId: 'unsupported_operation', from, + fromKind, to: input.to, text: input.text, threadId: input.threadId, @@ -144,6 +148,7 @@ function toWorkspaceRegistryEntry(value: unknown): WorkspaceRegistryEntry { export interface Message { eventId: string; from: string; + fromKind?: SenderKind; to: string; text: string; threadId?: string; @@ -230,6 +235,7 @@ type AgentOutputPayload = { stream: string; chunk: string }; type AgentOutputCallback = ((chunk: string) => void) | ((data: AgentOutputPayload) => void); export interface Agent { + readonly kind: 'agent'; readonly name: string; readonly runtime: AgentRuntime; readonly channels: string[]; @@ -272,6 +278,7 @@ export interface Agent { } export interface HumanHandle { + readonly kind: 'human'; readonly name: string; sendMessage(input: { to: string; @@ -283,6 +290,19 @@ export interface HumanHandle { }): Promise; } +export interface SystemHandle { + readonly kind: 'system'; + readonly name: 'system'; + sendMessage(input: { + to: string; + text: string; + threadId?: string; + priority?: number; + data?: Record; + mode?: MessageInjectionMode; + }): Promise; +} + export interface AgentSpawner { spawn(options?: SpawnerSpawnOptions): Promise; } @@ -645,16 +665,36 @@ export class AgentRelay { // ── Human source ──────────────────────────────────────────────────────── human(opts: { name: string }): HumanHandle { - return { - name: opts.name, - sendMessage: async (input) => { + return this.createParticipantHandle('human', opts.name); + } + + system(): SystemHandle { + return this.createParticipantHandle('system', 'system'); + } + + private createParticipantHandle( + kind: TKind, + name: TKind extends 'system' ? 'system' : string + ): TKind extends 'system' ? SystemHandle : HumanHandle { + const handle = { + kind, + name, + sendMessage: async (input: { + to: string; + text: string; + threadId?: string; + priority?: number; + data?: Record; + mode?: MessageInjectionMode; + }): Promise => { const client = await this.ensureStarted(); let result: Awaited>; try { result = await client.sendMessage({ to: input.to, text: input.text, - from: opts.name, + from: name, + fromKind: kind, threadId: input.threadId, priority: input.priority, data: input.data, @@ -662,18 +702,19 @@ export class AgentRelay { }); } catch (error) { if (isUnsupportedOperation(error)) { - return buildUnsupportedOperationMessage(opts.name, input); + return buildUnsupportedOperationMessage(name, kind, input); } throw error; } if (result?.event_id === 'unsupported_operation') { - return buildUnsupportedOperationMessage(opts.name, input); + return buildUnsupportedOperationMessage(name, kind, input); } const eventId = result?.event_id ?? randomBytes(8).toString('hex'); const msg: Message = { eventId, - from: opts.name, + from: name, + fromKind: kind, to: input.to, text: input.text, threadId: input.threadId, @@ -684,10 +725,8 @@ export class AgentRelay { return msg; }, }; - } - system(): HumanHandle { - return this.human({ name: 'system' }); + return handle as TKind extends 'system' ? SystemHandle : HumanHandle; } // ── Messaging ───────────────────────────────────────────────────────── @@ -1212,10 +1251,11 @@ export class AgentRelay { this.messageReadyAgents.add(event.from); this.exitedAgents.delete(event.from); } - const msg: Message = { - eventId: event.event_id, - from: event.from, - to: event.target, + const msg: Message = { + eventId: event.event_id, + from: event.from, + fromKind: event.sender_kind, + to: event.target, text: event.body, threadId: event.thread_id, mode: event.injection_mode ?? event.mode, @@ -1354,6 +1394,7 @@ export class AgentRelay { const relay = this; let agentChannels = [...channels]; const agent: InternalAgent = { + kind: 'agent', name, runtime, get channels() { @@ -1499,6 +1540,7 @@ export class AgentRelay { to: input.to, text: input.text, from: name, + fromKind: 'agent', threadId: input.threadId, priority: input.priority, data: input.data, @@ -1506,17 +1548,18 @@ export class AgentRelay { }); } catch (error) { if (isUnsupportedOperation(error)) { - return buildUnsupportedOperationMessage(name, input); + return buildUnsupportedOperationMessage(name, 'agent', input); } throw error; } if (result?.event_id === 'unsupported_operation') { - return buildUnsupportedOperationMessage(name, input); + return buildUnsupportedOperationMessage(name, 'agent', input); } const eventId = result?.event_id ?? randomBytes(8).toString('hex'); const msg: Message = { eventId, from: name, + fromKind: 'agent', to: input.to, text: input.text, threadId: input.threadId, diff --git a/packages/sdk/src/types.ts b/packages/sdk/src/types.ts index f3120049c..b64d8130d 100644 --- a/packages/sdk/src/types.ts +++ b/packages/sdk/src/types.ts @@ -2,7 +2,13 @@ * Shared input/output types for the broker SDK. */ -import type { AgentRuntime, HeadlessProvider, MessageInjectionMode, RestartPolicy } from './protocol.js'; +import type { + AgentRuntime, + HeadlessProvider, + MessageInjectionMode, + ParticipantKind, + RestartPolicy, +} from './protocol.js'; export interface SpawnPtyInput { name: string; @@ -54,6 +60,7 @@ export interface SendMessageInput { to: string; text: string; from?: string; + fromKind?: ParticipantKind; threadId?: string; workspaceId?: string; workspaceAlias?: string; diff --git a/src/control.rs b/src/control.rs index 11c2e91f8..64d31a525 100644 --- a/src/control.rs +++ b/src/control.rs @@ -4,8 +4,8 @@ pub fn is_human_sender(sender: &str, sender_kind: SenderKind) -> bool { if matches!(sender_kind, SenderKind::Human) { return true; } - // If the protocol explicitly marks the sender as an agent, trust that. - if matches!(sender_kind, SenderKind::Agent) { + // If the protocol explicitly marks the sender as an agent or system actor, trust that. + if matches!(sender_kind, SenderKind::Agent | SenderKind::System) { return false; } // Fallback heuristic for Unknown sender_kind (e.g. command.invoked events). @@ -26,6 +26,7 @@ mod tests { fn human_sender_detection() { assert!(is_human_sender("alice", SenderKind::Human)); assert!(is_human_sender("human:alice", SenderKind::Unknown)); + assert!(!is_human_sender("system", SenderKind::System)); assert!(!is_human_sender("Worker1", SenderKind::Agent)); // Explicit Agent kind overrides string heuristic assert!(!is_human_sender("human:spoofed", SenderKind::Agent)); diff --git a/src/listen_api.rs b/src/listen_api.rs index 4aefa9798..942d52738 100644 --- a/src/listen_api.rs +++ b/src/listen_api.rs @@ -8,7 +8,7 @@ use std::time::{Duration, Instant}; use relay_broker::{ multi_workspace::WorkspaceMembershipSummary, protocol::MessageInjectionMode, - replay_buffer::ReplayBuffer, + replay_buffer::ReplayBuffer, types::SenderKind, }; use serde::Deserialize; use serde_json::{json, Value}; @@ -62,6 +62,7 @@ pub enum ListenApiRequest { to: String, text: String, from: Option, + sender_kind: Option, thread_id: Option, workspace_id: Option, workspace_alias: Option, @@ -678,6 +679,29 @@ async fn listen_api_send( .map(str::trim) .filter(|value| !value.is_empty()) .map(str::to_string); + let sender_kind = match body + .get("senderKind") + .or_else(|| body.get("sender_kind")) + .and_then(Value::as_str) + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(|value| value.to_ascii_lowercase()) + .as_deref() + { + Some("agent") => Some(SenderKind::Agent), + Some("human") => Some(SenderKind::Human), + Some("system") => Some(SenderKind::System), + Some("unknown") | None => None, + Some(other) => { + return ( + axum::http::StatusCode::BAD_REQUEST, + axum::Json(json!({ + "success": false, + "error": format!("invalid sender kind '{other}'. expected 'agent', 'human', or 'system'"), + })), + ); + } + }; let workspace_id = body .get("workspaceId") .or_else(|| body.get("workspace_id")) @@ -718,6 +742,7 @@ async fn listen_api_send( request_id = %request_id, to = %to, from = ?from, + sender_kind = ?sender_kind, thread_id = ?thread_id, workspace_id = ?workspace_id, workspace_alias = ?workspace_alias, @@ -746,6 +771,7 @@ async fn listen_api_send( to: to.clone(), text, from, + sender_kind, thread_id, workspace_id, workspace_alias, @@ -1707,11 +1733,17 @@ mod auth_tests { let (router, mut rx) = test_router(Some("secret")); let send_replier = tokio::spawn(async move { match rx.recv().await { - Some(ListenApiRequest::Send { mode, reply, .. }) => { + Some(ListenApiRequest::Send { + mode, + sender_kind, + reply, + .. + }) => { assert!(matches!( mode, relay_broker::protocol::MessageInjectionMode::Wait )); + assert_eq!(sender_kind, None); let _ = reply.send(Ok(json!({ "success": true, "event_id": "evt_1" }))); } other => panic!("unexpected request: {:?}", other.map(|_| "other")), @@ -1742,11 +1774,17 @@ mod auth_tests { let (router, mut rx) = test_router(Some("secret")); let send_replier = tokio::spawn(async move { match rx.recv().await { - Some(ListenApiRequest::Send { mode, reply, .. }) => { + Some(ListenApiRequest::Send { + mode, + sender_kind, + reply, + .. + }) => { assert!(matches!( mode, relay_broker::protocol::MessageInjectionMode::Steer )); + assert_eq!(sender_kind, None); let _ = reply.send(Ok(json!({ "success": true, "event_id": "evt_2" }))); } other => panic!("unexpected request: {:?}", other.map(|_| "other")), @@ -1773,6 +1811,50 @@ mod auth_tests { send_replier.await.expect("send replier should complete"); } + #[tokio::test] + async fn send_route_forwards_system_sender_kind() { + let (router, mut rx) = test_router(Some("secret")); + let send_replier = tokio::spawn(async move { + match rx.recv().await { + Some(ListenApiRequest::Send { + from, + sender_kind, + reply, + .. + }) => { + assert_eq!(from.as_deref(), Some("system")); + assert_eq!(sender_kind, Some(SenderKind::System)); + let _ = reply.send(Ok(json!({ "success": true, "event_id": "evt_3" }))); + } + other => panic!("unexpected request: {:?}", other.map(|_| "other")), + } + }); + + let response = router + .oneshot( + Request::builder() + .uri("/api/send") + .method("POST") + .header("x-api-key", "secret") + .header("content-type", "application/json") + .body(Body::from( + json!({ + "to": "worker-a", + "text": "system notice", + "from": "system", + "senderKind": "system" + }) + .to_string(), + )) + .expect("request should build"), + ) + .await + .expect("request should succeed"); + + assert_eq!(response.status(), StatusCode::OK); + send_replier.await.expect("send replier should complete"); + } + #[tokio::test] async fn send_route_rejects_invalid_mode() { let (router, mut rx) = test_router(Some("secret")); diff --git a/src/main.rs b/src/main.rs index ae275cecb..98efe0ec0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -561,13 +561,27 @@ struct ThreadAccumulator { sort_key: i64, } -fn normalize_sender(sender: Option) -> String { +fn normalize_sender(sender: Option, sender_kind: Option) -> String { + let default_sender = match sender_kind { + Some(SenderKind::System) => "system", + _ => "human:orchestrator", + }; let raw = sender - .unwrap_or_else(|| "human:orchestrator".to_string()) + .unwrap_or_else(|| default_sender.to_string()) .trim() .to_string(); if raw.is_empty() { - return "human:orchestrator".to_string(); + return default_sender.to_string(); + } + if matches!(sender_kind, Some(SenderKind::System)) { + if let Some(rest) = raw.strip_prefix("system:") { + let normalized_rest = rest.trim(); + if normalized_rest.is_empty() { + return "system".to_string(); + } + return format!("system:{normalized_rest}"); + } + return raw; } if let Some(rest) = raw.strip_prefix("human:") { let normalized_rest = rest.trim(); @@ -579,6 +593,15 @@ fn normalize_sender(sender: Option) -> String { raw } +fn sender_kind_label(sender_kind: SenderKind) -> &'static str { + match sender_kind { + SenderKind::Agent => "agent", + SenderKind::Human => "human", + SenderKind::System => "system", + SenderKind::Unknown => "unknown", + } +} + fn sender_is_dashboard_label(sender: &str, self_name: &str) -> bool { let trimmed = sender.trim(); trimmed.eq_ignore_ascii_case("Dashboard") @@ -1991,6 +2014,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { to, text, from, + sender_kind, thread_id, workspace_id, workspace_alias, @@ -2034,7 +2058,8 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { let selected_workspace_id = selected_workspace.workspace_id.clone(); let selected_workspace_alias = selected_workspace.workspace_alias.clone(); let workspace_self_name = selected_workspace.self_name.clone(); - let normalized_sender = normalize_sender(from.clone()); + let normalized_sender = + normalize_sender(from.clone(), sender_kind); let from_dashboard = sender_is_dashboard_label(&normalized_sender, &workspace_self_name); let delivery_from = if from_dashboard { @@ -2167,6 +2192,9 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { "kind": "relay_inbound", "event_id": event_id, "from": ui_from, + "sender_kind": sender_kind + .map(sender_kind_label) + .unwrap_or("unknown"), "target": normalized_to, "body": text, "thread_id": thread_id.clone(), @@ -2231,6 +2259,9 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { "kind": "relay_inbound", "event_id": event_id, "from": ui_from, + "sender_kind": sender_kind + .map(sender_kind_label) + .unwrap_or("unknown"), "target": normalized_to, "body": text, "thread_id": thread_id.clone(), @@ -6088,10 +6119,13 @@ mod tests { #[test] fn normalize_sender_defaults_to_human_orchestrator() { - assert_eq!(normalize_sender(None), "human:orchestrator"); - assert_eq!(normalize_sender(Some(String::new())), "human:orchestrator"); + assert_eq!(normalize_sender(None, None), "human:orchestrator"); assert_eq!( - normalize_sender(Some(" ".to_string())), + normalize_sender(Some(String::new()), None), + "human:orchestrator" + ); + assert_eq!( + normalize_sender(Some(" ".to_string()), None), "human:orchestrator" ); } @@ -6099,7 +6133,7 @@ mod tests { #[test] fn normalize_sender_normalizes_human_prefix() { assert_eq!( - normalize_sender(Some("human: Dashboard ".to_string())), + normalize_sender(Some("human: Dashboard ".to_string()), None), "human:Dashboard" ); } @@ -6107,11 +6141,23 @@ mod tests { #[test] fn normalize_sender_preserves_worker_names() { assert_eq!( - normalize_sender(Some("WorkerOne".to_string())), + normalize_sender(Some("WorkerOne".to_string()), None), "WorkerOne".to_string() ); } + #[test] + fn normalize_sender_defaults_and_normalizes_system_sender() { + assert_eq!(normalize_sender(None, Some(SenderKind::System)), "system"); + assert_eq!( + normalize_sender( + Some("system: planner ".to_string()), + Some(SenderKind::System) + ), + "system:planner" + ); + } + #[test] fn sender_is_dashboard_label_accepts_legacy_dashboard_senders() { assert!(sender_is_dashboard_label("Dashboard", "my-project")); diff --git a/src/message_bridge.rs b/src/message_bridge.rs index 277ab9d92..04e6887b4 100644 --- a/src/message_bridge.rs +++ b/src/message_bridge.rs @@ -727,6 +727,7 @@ fn parse_sender_kind(accessor: EventAccessor<'_>) -> SenderKind { fn parse_sender_kind_label(raw: &str) -> Option { match raw.trim().to_ascii_lowercase().as_str() { "human" | "user" => Some(SenderKind::Human), + "system" => Some(SenderKind::System), "agent" | "bot" | "assistant" => Some(SenderKind::Agent), _ => None, } @@ -1093,6 +1094,26 @@ mod tests { assert_eq!(event.sender_kind, crate::types::SenderKind::Agent); } + #[test] + fn sender_kind_parses_system_role() { + let event = map_event(&json!({ + "type": "message.created", + "event_id": "evt-system", + "channel": "ops", + "payload": { + "text": "maintenance window", + "from": { + "name": "system", + "role": "system" + } + } + })) + .expect("system payload should map"); + + assert_eq!(event.from, "system"); + assert_eq!(event.sender_kind, crate::types::SenderKind::System); + } + #[test] fn maps_reaction_added() { let event = map_event(&json!({ diff --git a/src/protocol.rs b/src/protocol.rs index 049941a1c..95dc9c8dc 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use crate::supervisor::RestartPolicy; +use crate::types::SenderKind; pub const PROTOCOL_VERSION: u32 = 1; @@ -98,6 +99,8 @@ pub enum SdkToBroker { #[serde(default)] from: Option, #[serde(default)] + from_kind: Option, + #[serde(default)] thread_id: Option, #[serde(default)] workspace_id: Option, @@ -175,6 +178,8 @@ pub enum BrokerEvent { RelayInbound { event_id: String, from: String, + #[serde(default)] + sender_kind: Option, target: String, body: String, thread_id: Option, diff --git a/src/types.rs b/src/types.rs index 34dda695d..e0407c32a 100644 --- a/src/types.rs +++ b/src/types.rs @@ -38,6 +38,7 @@ pub enum InboundKind { pub enum SenderKind { Agent, Human, + System, Unknown, }