From 0cb8ff8d2b96ba347ef0826d9422764d2dff0c71 Mon Sep 17 00:00:00 2001 From: Noodle Date: Thu, 26 Mar 2026 21:52:22 -0400 Subject: [PATCH 1/3] feat(sdk): add idempotent workspace bootstrap helpers --- packages/sdk-python/src/relay_sdk/agent.py | 34 ++++++ packages/sdk-python/src/relay_sdk/models.py | 6 + packages/sdk-python/src/relay_sdk/relay.py | 50 +++++++- packages/sdk-python/tests/test_agent.py | 18 +++ packages/sdk-python/tests/test_relay.py | 21 ++++ packages/sdk-rust/src/agent.rs | 112 ++++++++++++++++++ packages/sdk-rust/src/lib.rs | 2 +- packages/sdk-rust/src/relay.rs | 71 +++++++++++ .../src/__tests__/agent.test.ts | 33 ++++++ .../src/__tests__/relay.test.ts | 36 ++++++ packages/sdk-typescript/src/agent.ts | 32 +++++ packages/sdk-typescript/src/relay.ts | 7 ++ 12 files changed, 420 insertions(+), 2 deletions(-) create mode 100644 packages/sdk-typescript/src/__tests__/agent.test.ts diff --git a/packages/sdk-python/src/relay_sdk/agent.py b/packages/sdk-python/src/relay_sdk/agent.py index 707429d..dda52b0 100644 --- a/packages/sdk-python/src/relay_sdk/agent.py +++ b/packages/sdk-python/src/relay_sdk/agent.py @@ -120,6 +120,23 @@ def get(self, name: str) -> dict[str, Any]: def join(self, name: str) -> Any: return self._client.post(f"/v1/channels/{_enc(name)}/join") + def ensure_joined(self, name: str, *, topic: str | None = None, metadata: dict[str, Any] | None = None) -> dict[str, bool]: + created = False + joined = False + try: + self._client.post("/v1/channels", CreateChannelRequest(name=name, topic=topic, metadata=metadata).model_dump(exclude_none=True)) + created = True + except Exception as err: + if not isinstance(err, Exception) or not getattr(err, 'status', None) == 409: + raise + try: + self._client.post(f"/v1/channels/{_enc(name)}/join") + joined = True + except Exception as err: + if not isinstance(err, Exception) or not getattr(err, 'status', None) == 409: + raise + return {"created": created, "joined": joined} + def leave(self, name: str) -> None: self._client.post(f"/v1/channels/{_enc(name)}/leave") @@ -437,6 +454,23 @@ async def get(self, name: str) -> dict[str, Any]: async def join(self, name: str) -> Any: return await self._client.post(f"/v1/channels/{_enc(name)}/join") + async def ensure_joined(self, name: str, *, topic: str | None = None, metadata: dict[str, Any] | None = None) -> dict[str, bool]: + created = False + joined = False + try: + await self._client.post("/v1/channels", CreateChannelRequest(name=name, topic=topic, metadata=metadata).model_dump(exclude_none=True)) + created = True + except Exception as err: + if not isinstance(err, Exception) or not getattr(err, 'status', None) == 409: + raise + try: + await self._client.post(f"/v1/channels/{_enc(name)}/join") + joined = True + except Exception as err: + if not isinstance(err, Exception) or not getattr(err, 'status', None) == 409: + raise + return {"created": created, "joined": joined} + async def leave(self, name: str) -> None: await self._client.post(f"/v1/channels/{_enc(name)}/leave") diff --git a/packages/sdk-python/src/relay_sdk/models.py b/packages/sdk-python/src/relay_sdk/models.py index 6869d5f..60c5a14 100644 --- a/packages/sdk-python/src/relay_sdk/models.py +++ b/packages/sdk-python/src/relay_sdk/models.py @@ -87,6 +87,12 @@ class CreateWorkspaceResponse(BaseModel): created_at: str +class WorkspaceStreamConfig(BaseModel): + enabled: bool + default_enabled: bool + override: bool | None = Field(default=None, alias='override') + + # ── Channel ─────────────────────────────────────────────────────── class Channel(BaseModel): diff --git a/packages/sdk-python/src/relay_sdk/relay.py b/packages/sdk-python/src/relay_sdk/relay.py index 745f2a3..8b754e8 100644 --- a/packages/sdk-python/src/relay_sdk/relay.py +++ b/packages/sdk-python/src/relay_sdk/relay.py @@ -9,7 +9,7 @@ from .client import AsyncHttpClient, HttpClient from .errors import RelayError from .local_runtime import ensure_local_runtime -from .models import Agent, CreateAgentRequest, CreateAgentResponse, TokenRotateResponse, Workspace +from .models import Agent, CreateAgentRequest, CreateAgentResponse, TokenRotateResponse, Workspace, WorkspaceStreamConfig def _enc(value: str) -> str: @@ -20,11 +20,35 @@ def _is_duplicate_agent_error(err: RelayError) -> bool: return err.status == 409 and err.code in {"agent_already_exists", "name_conflict"} +class _WorkspaceStreamNamespace: + def __init__(self, client: HttpClient) -> None: + self._client = client + + def get(self) -> WorkspaceStreamConfig: + result = self._client.get("/v1/workspace/stream") + return WorkspaceStreamConfig.model_validate(result) + + def set(self, enabled: bool) -> WorkspaceStreamConfig: + result = self._client.put("/v1/workspace/stream", {"enabled": enabled}) + return WorkspaceStreamConfig.model_validate(result) + + def inherit(self) -> WorkspaceStreamConfig: + result = self._client.put("/v1/workspace/stream", {"mode": "inherit"}) + return WorkspaceStreamConfig.model_validate(result) + + def ensure_enabled(self) -> WorkspaceStreamConfig: + config = self.get() + if config.enabled: + return config + return self.set(True) + + class _WorkspaceNamespace: """Sync workspace operations.""" def __init__(self, client: HttpClient) -> None: self._client = client + self.stream = _WorkspaceStreamNamespace(client) def info(self) -> Workspace: result = self._client.get("/v1/workspace") @@ -172,11 +196,35 @@ def __exit__(self, *args: Any) -> None: # ── Async variants ──────────────────────────────────────────────── +class _AsyncWorkspaceStreamNamespace: + def __init__(self, client: AsyncHttpClient) -> None: + self._client = client + + async def get(self) -> WorkspaceStreamConfig: + result = await self._client.get("/v1/workspace/stream") + return WorkspaceStreamConfig.model_validate(result) + + async def set(self, enabled: bool) -> WorkspaceStreamConfig: + result = await self._client.put("/v1/workspace/stream", {"enabled": enabled}) + return WorkspaceStreamConfig.model_validate(result) + + async def inherit(self) -> WorkspaceStreamConfig: + result = await self._client.put("/v1/workspace/stream", {"mode": "inherit"}) + return WorkspaceStreamConfig.model_validate(result) + + async def ensure_enabled(self) -> WorkspaceStreamConfig: + config = await self.get() + if config.enabled: + return config + return await self.set(True) + + class _AsyncWorkspaceNamespace: """Async workspace operations.""" def __init__(self, client: AsyncHttpClient) -> None: self._client = client + self.stream = _AsyncWorkspaceStreamNamespace(client) async def info(self) -> Workspace: result = await self._client.get("/v1/workspace") diff --git a/packages/sdk-python/tests/test_agent.py b/packages/sdk-python/tests/test_agent.py index 07498bc..85ab693 100644 --- a/packages/sdk-python/tests/test_agent.py +++ b/packages/sdk-python/tests/test_agent.py @@ -612,3 +612,21 @@ async def test_presence_disconnect_alias_posts_disconnect(self): await c.disconnect() assert route.called await c.client.close() + + + + +class TestAgentChannelEnsureJoined: + @respx.mock + def test_channels_ensure_joined_creates_and_joins(self): + respx.post(f"{BASE}/v1/channels").mock(return_value=ok(CHANNEL)) + respx.post(f"{BASE}/v1/channels/general/join").mock(return_value=ok({"joined": True})) + client = AgentClient(HttpClient(TOKEN, BASE)) + assert client.channels.ensure_joined("general", topic="General discussion") == {"created": True, "joined": True} + + @respx.mock + def test_channels_ensure_joined_treats_conflicts_as_success(self): + respx.post(f"{BASE}/v1/channels").mock(return_value=httpx.Response(409, json={"ok": False, "error": {"code": "channel_exists", "message": "exists"}})) + respx.post(f"{BASE}/v1/channels/general/join").mock(return_value=httpx.Response(409, json={"ok": False, "error": {"code": "already_joined", "message": "joined"}})) + client = AgentClient(HttpClient(TOKEN, BASE)) + assert client.channels.ensure_joined("general") == {"created": False, "joined": False} diff --git a/packages/sdk-python/tests/test_relay.py b/packages/sdk-python/tests/test_relay.py index 90d825f..f24328e 100644 --- a/packages/sdk-python/tests/test_relay.py +++ b/packages/sdk-python/tests/test_relay.py @@ -243,3 +243,24 @@ async def test_as_agent_returns_async_client(self): async with AsyncRelay(KEY, base_url=BASE) as r: ac = r.as_agent("at_xxx") assert isinstance(ac, AsyncAgentClient) + + + + +class TestRelayWorkspaceStream: + @respx.mock + def test_workspace_stream_ensure_enabled_is_noop_when_enabled(self): + route = respx.get(f"{BASE}/v1/workspace/stream").mock(return_value=ok({"enabled": True, "default_enabled": True, "override": True})) + r = Relay(KEY, base_url=BASE) + cfg = r.workspace.stream.ensure_enabled() + assert cfg.enabled is True + assert route.called + + @respx.mock + def test_workspace_stream_ensure_enabled_sets_when_disabled(self): + respx.get(f"{BASE}/v1/workspace/stream").mock(return_value=ok({"enabled": False, "default_enabled": False, "override": None})) + put = respx.put(f"{BASE}/v1/workspace/stream").mock(return_value=ok({"enabled": True, "default_enabled": False, "override": True})) + r = Relay(KEY, base_url=BASE) + cfg = r.workspace.stream.ensure_enabled() + assert cfg.enabled is True + assert put.called diff --git a/packages/sdk-rust/src/agent.rs b/packages/sdk-rust/src/agent.rs index 1b389a5..bec950e 100644 --- a/packages/sdk-rust/src/agent.rs +++ b/packages/sdk-rust/src/agent.rs @@ -10,6 +10,13 @@ fn strip_hash(channel: &str) -> &str { channel.strip_prefix('#').unwrap_or(channel) } +/// Outcome from idempotently ensuring a channel exists and is joined. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct EnsureChannelJoinedOutcome { + pub created: bool, + pub joined: bool, +} + /// Client for agent-level operations. pub struct AgentClient { client: HttpClient, @@ -576,6 +583,32 @@ impl AgentClient { .await } + + /// Ensure a channel exists and that the current agent is joined to it. + /// + /// Conflict responses for both create and join are treated as successful + /// no-ops so callers can use this for idempotent startup/bootstrap flows. + pub async fn ensure_channel_joined( + &self, + request: CreateChannelRequest, + ) -> Result { + let channel_name = request.name.clone(); + + let created = match self.create_channel(request).await { + Ok(_) => true, + Err(error) if error.is_conflict() => false, + Err(error) => return Err(error), + }; + + let joined = match self.join_channel(&channel_name).await { + Ok(_) => true, + Err(error) if error.is_conflict() => false, + Err(error) => return Err(error), + }; + + Ok(EnsureChannelJoinedOutcome { created, joined }) + } + /// Leave a channel. pub async fn leave_channel(&self, name: &str) -> Result<()> { self.client @@ -845,3 +878,82 @@ impl AgentClient { self.client.get("/v1/files", query_ref, None).await } } + + +#[cfg(test)] +mod tests { + use super::AgentClient; + use crate::{ClientOptions, CreateChannelRequest, EnsureChannelJoinedOutcome, RelayError}; + use httpmock::{Method::POST, MockServer}; + + fn client(base_url: &str) -> AgentClient { + AgentClient::new("at_live_test", Some(base_url.to_string())).expect("agent client") + } + + #[tokio::test] + async fn ensure_channel_joined_creates_and_joins_when_missing() { + let server = MockServer::start(); + let create = server.mock(|when, then| { + when.method(POST).path("/v1/channels"); + then.status(200).json_body_obj(&serde_json::json!({ + "ok": true, + "data": { + "id": "ch_1", + "workspace_id": "w_1", + "name": "general", + "channel_type": 0, + "topic": "General discussion", + "metadata": {}, + "created_by": "agent_1", + "created_at": "2026-03-26T00:00:00Z", + "is_archived": false + } + })); + }); + let join = server.mock(|when, then| { + when.method(POST).path("/v1/channels/general/join"); + then.status(200).json_body_obj(&serde_json::json!({ "ok": true, "data": {"joined": true} })); + }); + + let agent = client(&server.base_url()); + let outcome = agent.ensure_channel_joined(CreateChannelRequest { + name: "general".into(), + topic: Some("General discussion".into()), + metadata: None, + }).await.expect("ensure joined succeeds"); + + assert_eq!(outcome, EnsureChannelJoinedOutcome { created: true, joined: true }); + create.assert(); + join.assert(); + } + + #[tokio::test] + async fn ensure_channel_joined_treats_conflicts_as_success() { + let server = MockServer::start(); + let create = server.mock(|when, then| { + when.method(POST).path("/v1/channels"); + then.status(409).json_body_obj(&serde_json::json!({ + "ok": false, + "error": {"code": "channel_exists", "message": "exists"} + })); + }); + let join = server.mock(|when, then| { + when.method(POST).path("/v1/channels/general/join"); + then.status(409).json_body_obj(&serde_json::json!({ + "ok": false, + "error": {"code": "already_joined", "message": "joined"} + })); + }); + + let agent = client(&server.base_url()); + let outcome = agent.ensure_channel_joined(CreateChannelRequest { + name: "general".into(), + topic: None, + metadata: None, + }).await.expect("ensure joined succeeds"); + + assert_eq!(outcome, EnsureChannelJoinedOutcome { created: false, joined: false }); + create.assert(); + join.assert(); + } +} diff --git a/packages/sdk-rust/src/lib.rs b/packages/sdk-rust/src/lib.rs index 88303bb..7a1c9ce 100644 --- a/packages/sdk-rust/src/lib.rs +++ b/packages/sdk-rust/src/lib.rs @@ -75,7 +75,7 @@ pub mod types; pub mod ws; // Re-export main types -pub use agent::AgentClient; +pub use agent::{AgentClient, EnsureChannelJoinedOutcome}; pub use client::{ClientOptions, HttpClient, RequestOptions}; pub use error::{RelayError, Result}; pub use registration::{ diff --git a/packages/sdk-rust/src/relay.rs b/packages/sdk-rust/src/relay.rs index 9119d7f..61ed5ac 100644 --- a/packages/sdk-rust/src/relay.rs +++ b/packages/sdk-rust/src/relay.rs @@ -307,6 +307,18 @@ impl RelayCast { .await } + /// Ensure workspace-wide WebSocket fanout is enabled. + /// + /// Returns the effective configuration after ensuring it is enabled. If the + /// stream is already enabled, this is a read-only no-op. + pub async fn ensure_workspace_stream_enabled(&self) -> Result { + let config = self.workspace_stream_get().await?; + if config.enabled { + return Ok(config); + } + self.workspace_stream_set(true).await + } + /// Clear workspace stream override and inherit default behavior. pub async fn workspace_stream_inherit(&self) -> Result { self.client @@ -758,3 +770,62 @@ impl RelayCast { .await } } + + +#[cfg(test)] +mod tests { + use super::RelayCast; + use crate::RelayCastOptions; + use httpmock::{Method::{GET, PUT}, MockServer}; + + fn relay(base_url: &str) -> RelayCast { + RelayCast::new(RelayCastOptions::new("rk_live_test").with_base_url(base_url)).expect("relay") + } + + #[tokio::test] + async fn ensure_workspace_stream_enabled_is_noop_when_already_enabled() { + let server = MockServer::start(); + let get = server.mock(|when, then| { + when.method(GET).path("/v1/workspace/stream"); + then.status(200).json_body_obj(&serde_json::json!({ + "ok": true, + "data": {"enabled": true, "default_enabled": true, "override": true} + })); + }); + let put = server.mock(|when, then| { + when.method(PUT).path("/v1/workspace/stream"); + then.status(200); + }); + + let relay = relay(&server.base_url()); + let config = relay.ensure_workspace_stream_enabled().await.expect("stream enabled"); + assert!(config.enabled); + get.assert(); + assert_eq!(put.hits(), 0); + } + + #[tokio::test] + async fn ensure_workspace_stream_enabled_sets_override_when_disabled() { + let server = MockServer::start(); + let get = server.mock(|when, then| { + when.method(GET).path("/v1/workspace/stream"); + then.status(200).json_body_obj(&serde_json::json!({ + "ok": true, + "data": {"enabled": false, "default_enabled": false, "override": null} + })); + }); + let put = server.mock(|when, then| { + when.method(PUT).path("/v1/workspace/stream").json_body_obj(&serde_json::json!({"enabled": true})); + then.status(200).json_body_obj(&serde_json::json!({ + "ok": true, + "data": {"enabled": true, "default_enabled": false, "override": true} + })); + }); + + let relay = relay(&server.base_url()); + let config = relay.ensure_workspace_stream_enabled().await.expect("stream enabled"); + assert!(config.enabled); + get.assert(); + put.assert(); + } +} diff --git a/packages/sdk-typescript/src/__tests__/agent.test.ts b/packages/sdk-typescript/src/__tests__/agent.test.ts new file mode 100644 index 0000000..e1c0b3a --- /dev/null +++ b/packages/sdk-typescript/src/__tests__/agent.test.ts @@ -0,0 +1,33 @@ +import { describe, expect, it, vi, beforeEach, afterEach } from 'vitest'; +import { AgentClient } from '../agent.js'; +import { HttpClient, RelayError } from '../client.js'; + +describe('AgentClient.channels.ensureJoined', () => { + const originalFetch = global.fetch; + + beforeEach(() => { + global.fetch = vi.fn() as typeof fetch; + }); + + afterEach(() => { + global.fetch = originalFetch; + }); + + it('creates and joins when missing', async () => { + (global.fetch as unknown as ReturnType) + .mockResolvedValueOnce(new Response(JSON.stringify({ ok: true, data: { id: 'ch_1', workspace_id: 'w_1', name: 'general', channel_type: 0, topic: 'General discussion', metadata: {}, created_by: 'agent_1', created_at: '2026-03-26T00:00:00Z', is_archived: false } }), { status: 200 })) + .mockResolvedValueOnce(new Response(JSON.stringify({ ok: true, data: { joined: true } }), { status: 200 })); + + const client = new AgentClient(new HttpClient({ apiKey: 'at_live_test', baseUrl: 'https://api.relaycast.dev' })); + await expect(client.channels.ensureJoined('general', { topic: 'General discussion' })).resolves.toEqual({ created: true, joined: true }); + }); + + it('treats conflicts as success', async () => { + (global.fetch as unknown as ReturnType) + .mockResolvedValueOnce(new Response(JSON.stringify({ ok: false, error: { code: 'channel_exists', message: 'exists' } }), { status: 409 })) + .mockResolvedValueOnce(new Response(JSON.stringify({ ok: false, error: { code: 'already_joined', message: 'joined' } }), { status: 409 })); + + const client = new AgentClient(new HttpClient({ apiKey: 'at_live_test', baseUrl: 'https://api.relaycast.dev' })); + await expect(client.channels.ensureJoined('general')).resolves.toEqual({ created: false, joined: false }); + }); +}); diff --git a/packages/sdk-typescript/src/__tests__/relay.test.ts b/packages/sdk-typescript/src/__tests__/relay.test.ts index aac3059..47a7091 100644 --- a/packages/sdk-typescript/src/__tests__/relay.test.ts +++ b/packages/sdk-typescript/src/__tests__/relay.test.ts @@ -1139,3 +1139,39 @@ describe('RelayCast', () => { }); }); }); + + +describe('workspace.stream.ensureEnabled', () => { + it('returns current config without PUT when already enabled', async () => { + global.fetch = vi.fn().mockResolvedValueOnce(new Response(JSON.stringify({ + ok: true, + data: { enabled: true, default_enabled: true, override: true }, + }), { status: 200 })); + + const { RelayCast } = await import('../relay.js'); + const relay = new RelayCast({ apiKey: 'rk_live_test' }); + const result = await relay.workspace.stream.ensureEnabled(); + + expect(result.enabled).toBe(true); + expect(global.fetch).toHaveBeenCalledTimes(1); + }); + + it('enables stream when disabled', async () => { + global.fetch = vi.fn() + .mockResolvedValueOnce(new Response(JSON.stringify({ + ok: true, + data: { enabled: false, default_enabled: false, override: null }, + }), { status: 200 })) + .mockResolvedValueOnce(new Response(JSON.stringify({ + ok: true, + data: { enabled: true, default_enabled: false, override: true }, + }), { status: 200 })); + + const { RelayCast } = await import('../relay.js'); + const relay = new RelayCast({ apiKey: 'rk_live_test' }); + const result = await relay.workspace.stream.ensureEnabled(); + + expect(result.enabled).toBe(true); + expect(global.fetch).toHaveBeenCalledTimes(2); + }); +}); diff --git a/packages/sdk-typescript/src/agent.ts b/packages/sdk-typescript/src/agent.ts index 2b0fa95..d2b2f9c 100644 --- a/packages/sdk-typescript/src/agent.ts +++ b/packages/sdk-typescript/src/agent.ts @@ -435,6 +435,38 @@ export class AgentClient { join: (name: string): Promise => this.client.post(`/v1/channels/${encodeURIComponent(name)}/join`), + ensureJoined: async ( + name: string, + options?: { topic?: string; metadata?: CreateChannelRequest['metadata'] }, + ): Promise<{ created: boolean; joined: boolean }> => { + let created = false; + let joined = false; + + try { + await this.client.post('/v1/channels', { + name, + topic: options?.topic, + metadata: options?.metadata, + }); + created = true; + } catch (error) { + if (!(error instanceof Error) || !("statusCode" in error) || (error as { statusCode?: number }).statusCode !== 409) { + throw error; + } + } + + try { + await this.client.post(`/v1/channels/${encodeURIComponent(name)}/join`); + joined = true; + } catch (error) { + if (!(error instanceof Error) || !("statusCode" in error) || (error as { statusCode?: number }).statusCode !== 409) { + throw error; + } + } + + return { created, joined }; + }, + leave: async (name: string): Promise => { await this.client.post(`/v1/channels/${encodeURIComponent(name)}/leave`); }, diff --git a/packages/sdk-typescript/src/relay.ts b/packages/sdk-typescript/src/relay.ts index 94461f9..08f28af 100644 --- a/packages/sdk-typescript/src/relay.ts +++ b/packages/sdk-typescript/src/relay.ts @@ -377,6 +377,13 @@ export class RelayCast { this.client.put('/v1/workspace/stream', { enabled }), inherit: (): Promise => this.client.put('/v1/workspace/stream', { mode: 'inherit' }), + ensureEnabled: async (): Promise => { + const config = await this.client.get('/v1/workspace/stream'); + if (config.enabled) { + return config; + } + return this.client.put('/v1/workspace/stream', { enabled: true }); + }, }, }; From 39788fd6c4696388e25876fda35b095338f1901b Mon Sep 17 00:00:00 2001 From: Noodle Date: Thu, 26 Mar 2026 22:05:39 -0400 Subject: [PATCH 2/3] fix(rust-sdk): restore httpmock test support --- packages/sdk-rust/Cargo.toml | 1 + packages/sdk-rust/src/agent.rs | 12 ++++++------ packages/sdk-rust/src/relay.rs | 10 +++++----- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/packages/sdk-rust/Cargo.toml b/packages/sdk-rust/Cargo.toml index ce94f2d..92ee217 100644 --- a/packages/sdk-rust/Cargo.toml +++ b/packages/sdk-rust/Cargo.toml @@ -27,6 +27,7 @@ urlencoding = "2.1" tokio-test = "0.4" mockito = "1.5" wiremock = "0.6" +httpmock = "0.7" tempfile = "3.19" [features] diff --git a/packages/sdk-rust/src/agent.rs b/packages/sdk-rust/src/agent.rs index bec950e..ef52914 100644 --- a/packages/sdk-rust/src/agent.rs +++ b/packages/sdk-rust/src/agent.rs @@ -883,8 +883,8 @@ impl AgentClient { #[cfg(test)] mod tests { use super::AgentClient; - use crate::{ClientOptions, CreateChannelRequest, EnsureChannelJoinedOutcome, RelayError}; - use httpmock::{Method::POST, MockServer}; + use crate::{CreateChannelRequest, EnsureChannelJoinedOutcome}; + use httpmock::{Method::POST, MockServer, Then, When}; fn client(base_url: &str) -> AgentClient { AgentClient::new("at_live_test", Some(base_url.to_string())).expect("agent client") @@ -893,7 +893,7 @@ mod tests { #[tokio::test] async fn ensure_channel_joined_creates_and_joins_when_missing() { let server = MockServer::start(); - let create = server.mock(|when, then| { + let create = server.mock(|when: When, then: Then| { when.method(POST).path("/v1/channels"); then.status(200).json_body_obj(&serde_json::json!({ "ok": true, @@ -910,7 +910,7 @@ mod tests { } })); }); - let join = server.mock(|when, then| { + let join = server.mock(|when: When, then: Then| { when.method(POST).path("/v1/channels/general/join"); then.status(200).json_body_obj(&serde_json::json!({ "ok": true, "data": {"joined": true} })); }); @@ -930,14 +930,14 @@ mod tests { #[tokio::test] async fn ensure_channel_joined_treats_conflicts_as_success() { let server = MockServer::start(); - let create = server.mock(|when, then| { + let create = server.mock(|when: When, then: Then| { when.method(POST).path("/v1/channels"); then.status(409).json_body_obj(&serde_json::json!({ "ok": false, "error": {"code": "channel_exists", "message": "exists"} })); }); - let join = server.mock(|when, then| { + let join = server.mock(|when: When, then: Then| { when.method(POST).path("/v1/channels/general/join"); then.status(409).json_body_obj(&serde_json::json!({ "ok": false, diff --git a/packages/sdk-rust/src/relay.rs b/packages/sdk-rust/src/relay.rs index 61ed5ac..ab00079 100644 --- a/packages/sdk-rust/src/relay.rs +++ b/packages/sdk-rust/src/relay.rs @@ -776,7 +776,7 @@ impl RelayCast { mod tests { use super::RelayCast; use crate::RelayCastOptions; - use httpmock::{Method::{GET, PUT}, MockServer}; + use httpmock::{Method::{GET, PUT}, MockServer, Then, When}; fn relay(base_url: &str) -> RelayCast { RelayCast::new(RelayCastOptions::new("rk_live_test").with_base_url(base_url)).expect("relay") @@ -785,14 +785,14 @@ mod tests { #[tokio::test] async fn ensure_workspace_stream_enabled_is_noop_when_already_enabled() { let server = MockServer::start(); - let get = server.mock(|when, then| { + let get = server.mock(|when: When, then: Then| { when.method(GET).path("/v1/workspace/stream"); then.status(200).json_body_obj(&serde_json::json!({ "ok": true, "data": {"enabled": true, "default_enabled": true, "override": true} })); }); - let put = server.mock(|when, then| { + let put = server.mock(|when: When, then: Then| { when.method(PUT).path("/v1/workspace/stream"); then.status(200); }); @@ -807,14 +807,14 @@ mod tests { #[tokio::test] async fn ensure_workspace_stream_enabled_sets_override_when_disabled() { let server = MockServer::start(); - let get = server.mock(|when, then| { + let get = server.mock(|when: When, then: Then| { when.method(GET).path("/v1/workspace/stream"); then.status(200).json_body_obj(&serde_json::json!({ "ok": true, "data": {"enabled": false, "default_enabled": false, "override": null} })); }); - let put = server.mock(|when, then| { + let put = server.mock(|when: When, then: Then| { when.method(PUT).path("/v1/workspace/stream").json_body_obj(&serde_json::json!({"enabled": true})); then.status(200).json_body_obj(&serde_json::json!({ "ok": true, From ace6ab4f2a17325edd584d08ac1a237b27c4d424 Mon Sep 17 00:00:00 2001 From: Noodle Date: Fri, 24 Apr 2026 20:57:12 -0400 Subject: [PATCH 3/3] Fix ensure-joined channel idempotence --- packages/sdk-python/src/relay_sdk/agent.py | 27 ++---- packages/sdk-python/tests/test_agent.py | 26 ++++-- packages/sdk-python/tests/test_relay.py | 2 - packages/sdk-rust/src/agent.rs | 84 +++++++++++++++---- .../src/__tests__/agent.test.ts | 25 ++++-- .../src/__tests__/relay.test.ts | 13 ++- packages/sdk-typescript/src/agent.ts | 19 ++--- 7 files changed, 130 insertions(+), 66 deletions(-) diff --git a/packages/sdk-python/src/relay_sdk/agent.py b/packages/sdk-python/src/relay_sdk/agent.py index dda52b0..e3fe638 100644 --- a/packages/sdk-python/src/relay_sdk/agent.py +++ b/packages/sdk-python/src/relay_sdk/agent.py @@ -6,6 +6,7 @@ from urllib.parse import quote from .client import AsyncHttpClient, HttpClient +from .errors import RelayError from .models import ( Channel, ChannelMemberInfo, @@ -122,19 +123,14 @@ def join(self, name: str) -> Any: def ensure_joined(self, name: str, *, topic: str | None = None, metadata: dict[str, Any] | None = None) -> dict[str, bool]: created = False - joined = False try: self._client.post("/v1/channels", CreateChannelRequest(name=name, topic=topic, metadata=metadata).model_dump(exclude_none=True)) created = True - except Exception as err: - if not isinstance(err, Exception) or not getattr(err, 'status', None) == 409: - raise - try: - self._client.post(f"/v1/channels/{_enc(name)}/join") - joined = True - except Exception as err: - if not isinstance(err, Exception) or not getattr(err, 'status', None) == 409: + except RelayError as err: + if err.status != 409 or err.code != "channel_already_exists": raise + join_result = self.join(name) + joined = not bool(join_result.get("already_member")) return {"created": created, "joined": joined} def leave(self, name: str) -> None: @@ -456,19 +452,14 @@ async def join(self, name: str) -> Any: async def ensure_joined(self, name: str, *, topic: str | None = None, metadata: dict[str, Any] | None = None) -> dict[str, bool]: created = False - joined = False try: await self._client.post("/v1/channels", CreateChannelRequest(name=name, topic=topic, metadata=metadata).model_dump(exclude_none=True)) created = True - except Exception as err: - if not isinstance(err, Exception) or not getattr(err, 'status', None) == 409: - raise - try: - await self._client.post(f"/v1/channels/{_enc(name)}/join") - joined = True - except Exception as err: - if not isinstance(err, Exception) or not getattr(err, 'status', None) == 409: + except RelayError as err: + if err.status != 409 or err.code != "channel_already_exists": raise + join_result = await self.join(name) + joined = not bool(join_result.get("already_member")) return {"created": created, "joined": joined} async def leave(self, name: str) -> None: diff --git a/packages/sdk-python/tests/test_agent.py b/packages/sdk-python/tests/test_agent.py index 85ab693..de9cf37 100644 --- a/packages/sdk-python/tests/test_agent.py +++ b/packages/sdk-python/tests/test_agent.py @@ -8,6 +8,7 @@ from relay_sdk.agent import AgentClient, AsyncAgentClient from relay_sdk.client import AsyncHttpClient, HttpClient +from relay_sdk.errors import RelayError from relay_sdk.models import ( Channel, ChannelMemberInfo, @@ -613,20 +614,29 @@ async def test_presence_disconnect_alias_posts_disconnect(self): assert route.called await c.client.close() - - - class TestAgentChannelEnsureJoined: @respx.mock def test_channels_ensure_joined_creates_and_joins(self): - respx.post(f"{BASE}/v1/channels").mock(return_value=ok(CHANNEL)) - respx.post(f"{BASE}/v1/channels/general/join").mock(return_value=ok({"joined": True})) + respx.post(f"{BASE}/v1/channels").mock(return_value=httpx.Response(201, json={"ok": True, "data": CHANNEL})) + respx.post(f"{BASE}/v1/channels/general/join").mock(return_value=ok({"channel": "general", "agent_id": "agent_1", "already_member": False})) client = AgentClient(HttpClient(TOKEN, BASE)) assert client.channels.ensure_joined("general", topic="General discussion") == {"created": True, "joined": True} @respx.mock - def test_channels_ensure_joined_treats_conflicts_as_success(self): - respx.post(f"{BASE}/v1/channels").mock(return_value=httpx.Response(409, json={"ok": False, "error": {"code": "channel_exists", "message": "exists"}})) - respx.post(f"{BASE}/v1/channels/general/join").mock(return_value=httpx.Response(409, json={"ok": False, "error": {"code": "already_joined", "message": "joined"}})) + def test_channels_ensure_joined_treats_duplicate_create_and_already_member_as_success(self): + respx.post(f"{BASE}/v1/channels").mock(return_value=httpx.Response(409, json={"ok": False, "error": {"code": "channel_already_exists", "message": "exists"}})) + respx.post(f"{BASE}/v1/channels/general/join").mock(return_value=ok({"channel": "general", "agent_id": "agent_1", "already_member": True})) client = AgentClient(HttpClient(TOKEN, BASE)) assert client.channels.ensure_joined("general") == {"created": False, "joined": False} + + @respx.mock + def test_channels_ensure_joined_does_not_swallow_join_conflicts(self): + respx.post(f"{BASE}/v1/channels").mock(return_value=httpx.Response(409, json={"ok": False, "error": {"code": "channel_already_exists", "message": "exists"}})) + respx.post(f"{BASE}/v1/channels/general/join").mock(return_value=httpx.Response(409, json={"ok": False, "error": {"code": "already_joined", "message": "joined"}})) + client = AgentClient(HttpClient(TOKEN, BASE)) + + with pytest.raises(RelayError) as excinfo: + client.channels.ensure_joined("general") + + assert excinfo.value.status == 409 + assert excinfo.value.code == "already_joined" diff --git a/packages/sdk-python/tests/test_relay.py b/packages/sdk-python/tests/test_relay.py index f24328e..746cef7 100644 --- a/packages/sdk-python/tests/test_relay.py +++ b/packages/sdk-python/tests/test_relay.py @@ -245,8 +245,6 @@ async def test_as_agent_returns_async_client(self): assert isinstance(ac, AsyncAgentClient) - - class TestRelayWorkspaceStream: @respx.mock def test_workspace_stream_ensure_enabled_is_noop_when_enabled(self): diff --git a/packages/sdk-rust/src/agent.rs b/packages/sdk-rust/src/agent.rs index ef52914..c4de2bb 100644 --- a/packages/sdk-rust/src/agent.rs +++ b/packages/sdk-rust/src/agent.rs @@ -1,7 +1,7 @@ //! Agent client for message and channel operations. use crate::client::{ClientOptions, HttpClient, RequestOptions}; -use crate::error::Result; +use crate::error::{RelayError, Result}; use crate::types::*; use crate::ws::{EventReceiver, LifecycleReceiver, WsClient, WsClientOptions}; @@ -583,11 +583,10 @@ impl AgentClient { .await } - /// Ensure a channel exists and that the current agent is joined to it. /// - /// Conflict responses for both create and join are treated as successful - /// no-ops so callers can use this for idempotent startup/bootstrap flows. + /// Duplicate channel creation is treated as a successful no-op, and join + /// idempotence is derived from the join response's `already_member` field. pub async fn ensure_channel_joined( &self, request: CreateChannelRequest, @@ -596,15 +595,24 @@ impl AgentClient { let created = match self.create_channel(request).await { Ok(_) => true, - Err(error) if error.is_conflict() => false, + Err(error) + if error.is_conflict() && error.code() == Some("channel_already_exists") => + { + false + } Err(error) => return Err(error), }; - let joined = match self.join_channel(&channel_name).await { - Ok(_) => true, - Err(error) if error.is_conflict() => false, - Err(error) => return Err(error), - }; + let join_response = self.join_channel(&channel_name).await?; + let joined = join_response + .get("already_member") + .and_then(serde_json::Value::as_bool) + .map(|already_member| !already_member) + .ok_or_else(|| { + RelayError::InvalidResponse( + "channel join response missing already_member".into(), + ) + })?; Ok(EnsureChannelJoinedOutcome { created, joined }) } @@ -895,7 +903,7 @@ mod tests { let server = MockServer::start(); let create = server.mock(|when: When, then: Then| { when.method(POST).path("/v1/channels"); - then.status(200).json_body_obj(&serde_json::json!({ + then.status(201).json_body_obj(&serde_json::json!({ "ok": true, "data": { "id": "ch_1", @@ -912,7 +920,14 @@ mod tests { }); let join = server.mock(|when: When, then: Then| { when.method(POST).path("/v1/channels/general/join"); - then.status(200).json_body_obj(&serde_json::json!({ "ok": true, "data": {"joined": true} })); + then.status(200).json_body_obj(&serde_json::json!({ + "ok": true, + "data": { + "channel": "general", + "agent_id": "agent_1", + "already_member": false + } + })); }); let agent = client(&server.base_url()); @@ -928,20 +943,24 @@ mod tests { } #[tokio::test] - async fn ensure_channel_joined_treats_conflicts_as_success() { + async fn ensure_channel_joined_treats_duplicate_create_and_already_member_as_success() { let server = MockServer::start(); let create = server.mock(|when: When, then: Then| { when.method(POST).path("/v1/channels"); then.status(409).json_body_obj(&serde_json::json!({ "ok": false, - "error": {"code": "channel_exists", "message": "exists"} + "error": {"code": "channel_already_exists", "message": "exists"} })); }); let join = server.mock(|when: When, then: Then| { when.method(POST).path("/v1/channels/general/join"); - then.status(409).json_body_obj(&serde_json::json!({ - "ok": false, - "error": {"code": "already_joined", "message": "joined"} + then.status(200).json_body_obj(&serde_json::json!({ + "ok": true, + "data": { + "channel": "general", + "agent_id": "agent_1", + "already_member": true + } })); }); @@ -956,4 +975,35 @@ mod tests { create.assert(); join.assert(); } + + #[tokio::test] + async fn ensure_channel_joined_does_not_swallow_join_conflicts() { + let server = MockServer::start(); + let create = server.mock(|when: When, then: Then| { + when.method(POST).path("/v1/channels"); + then.status(409).json_body_obj(&serde_json::json!({ + "ok": false, + "error": {"code": "channel_already_exists", "message": "exists"} + })); + }); + let join = server.mock(|when: When, then: Then| { + when.method(POST).path("/v1/channels/general/join"); + then.status(409).json_body_obj(&serde_json::json!({ + "ok": false, + "error": {"code": "already_joined", "message": "joined"} + })); + }); + + let agent = client(&server.base_url()); + let error = agent.ensure_channel_joined(CreateChannelRequest { + name: "general".into(), + topic: None, + metadata: None, + }).await.expect_err("ensure joined should fail"); + + assert!(error.is_conflict()); + assert_eq!(error.code(), Some("already_joined")); + create.assert(); + join.assert(); + } } diff --git a/packages/sdk-typescript/src/__tests__/agent.test.ts b/packages/sdk-typescript/src/__tests__/agent.test.ts index e1c0b3a..29bceb1 100644 --- a/packages/sdk-typescript/src/__tests__/agent.test.ts +++ b/packages/sdk-typescript/src/__tests__/agent.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it, vi, beforeEach, afterEach } from 'vitest'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { AgentClient } from '../agent.js'; import { HttpClient, RelayError } from '../client.js'; @@ -15,19 +15,32 @@ describe('AgentClient.channels.ensureJoined', () => { it('creates and joins when missing', async () => { (global.fetch as unknown as ReturnType) - .mockResolvedValueOnce(new Response(JSON.stringify({ ok: true, data: { id: 'ch_1', workspace_id: 'w_1', name: 'general', channel_type: 0, topic: 'General discussion', metadata: {}, created_by: 'agent_1', created_at: '2026-03-26T00:00:00Z', is_archived: false } }), { status: 200 })) - .mockResolvedValueOnce(new Response(JSON.stringify({ ok: true, data: { joined: true } }), { status: 200 })); + .mockResolvedValueOnce(new Response(JSON.stringify({ ok: true, data: { id: 'ch_1', workspace_id: 'w_1', name: 'general', channel_type: 0, topic: 'General discussion', metadata: {}, created_by: 'agent_1', created_at: '2026-03-26T00:00:00Z', is_archived: false } }), { status: 201 })) + .mockResolvedValueOnce(new Response(JSON.stringify({ ok: true, data: { channel: 'general', agent_id: 'agent_1', already_member: false } }), { status: 200 })); const client = new AgentClient(new HttpClient({ apiKey: 'at_live_test', baseUrl: 'https://api.relaycast.dev' })); await expect(client.channels.ensureJoined('general', { topic: 'General discussion' })).resolves.toEqual({ created: true, joined: true }); }); - it('treats conflicts as success', async () => { + it('treats duplicate create plus already-member join as success', async () => { (global.fetch as unknown as ReturnType) - .mockResolvedValueOnce(new Response(JSON.stringify({ ok: false, error: { code: 'channel_exists', message: 'exists' } }), { status: 409 })) - .mockResolvedValueOnce(new Response(JSON.stringify({ ok: false, error: { code: 'already_joined', message: 'joined' } }), { status: 409 })); + .mockResolvedValueOnce(new Response(JSON.stringify({ ok: false, error: { code: 'channel_already_exists', message: 'exists' } }), { status: 409 })) + .mockResolvedValueOnce(new Response(JSON.stringify({ ok: true, data: { channel: 'general', agent_id: 'agent_1', already_member: true } }), { status: 200 })); const client = new AgentClient(new HttpClient({ apiKey: 'at_live_test', baseUrl: 'https://api.relaycast.dev' })); await expect(client.channels.ensureJoined('general')).resolves.toEqual({ created: false, joined: false }); }); + + it('does not swallow join conflicts', async () => { + (global.fetch as unknown as ReturnType) + .mockResolvedValueOnce(new Response(JSON.stringify({ ok: false, error: { code: 'channel_already_exists', message: 'exists' } }), { status: 409 })) + .mockResolvedValueOnce(new Response(JSON.stringify({ ok: false, error: { code: 'already_joined', message: 'joined' } }), { status: 409 })); + + const client = new AgentClient(new HttpClient({ apiKey: 'at_live_test', baseUrl: 'https://api.relaycast.dev' })); + + await expect(client.channels.ensureJoined('general')).rejects.toMatchObject({ + statusCode: 409, + rawCode: 'already_joined', + } satisfies Partial); + }); }); diff --git a/packages/sdk-typescript/src/__tests__/relay.test.ts b/packages/sdk-typescript/src/__tests__/relay.test.ts index 47a7091..2fe426b 100644 --- a/packages/sdk-typescript/src/__tests__/relay.test.ts +++ b/packages/sdk-typescript/src/__tests__/relay.test.ts @@ -1,9 +1,14 @@ -import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest'; // Mock global fetch once for this file. +const originalFetch = global.fetch; const mockFetch = vi.fn(); vi.stubGlobal('fetch', mockFetch); +afterAll(() => { + global.fetch = originalFetch; +}); + function mockResponse(data: unknown, apiOk = true, status = 200) { return Promise.resolve({ ok: status >= 200 && status < 300, @@ -15,6 +20,7 @@ function mockResponse(data: unknown, apiOk = true, status = 200) { describe('RelayCast', () => { beforeEach(() => { + global.fetch = mockFetch as typeof fetch; mockFetch.mockReset(); vi.useRealTimers(); }); @@ -1140,10 +1146,9 @@ describe('RelayCast', () => { }); }); - describe('workspace.stream.ensureEnabled', () => { it('returns current config without PUT when already enabled', async () => { - global.fetch = vi.fn().mockResolvedValueOnce(new Response(JSON.stringify({ + mockFetch.mockResolvedValueOnce(new Response(JSON.stringify({ ok: true, data: { enabled: true, default_enabled: true, override: true }, }), { status: 200 })); @@ -1157,7 +1162,7 @@ describe('workspace.stream.ensureEnabled', () => { }); it('enables stream when disabled', async () => { - global.fetch = vi.fn() + mockFetch .mockResolvedValueOnce(new Response(JSON.stringify({ ok: true, data: { enabled: false, default_enabled: false, override: null }, diff --git a/packages/sdk-typescript/src/agent.ts b/packages/sdk-typescript/src/agent.ts index d2b2f9c..8f77369 100644 --- a/packages/sdk-typescript/src/agent.ts +++ b/packages/sdk-typescript/src/agent.ts @@ -55,7 +55,7 @@ import type { WsReconnectingEvent, WsPermanentlyDisconnectedEvent, } from './types.js'; -import { HttpClient, type RequestOptions } from './client.js'; +import { HttpClient, RelayError, type RequestOptions } from './client.js'; import { WsClient, type WsClientOptions, withInternalWsOrigin } from './ws.js'; function stripHash(channel: string): string { @@ -440,7 +440,6 @@ export class AgentClient { options?: { topic?: string; metadata?: CreateChannelRequest['metadata'] }, ): Promise<{ created: boolean; joined: boolean }> => { let created = false; - let joined = false; try { await this.client.post('/v1/channels', { @@ -450,19 +449,17 @@ export class AgentClient { }); created = true; } catch (error) { - if (!(error instanceof Error) || !("statusCode" in error) || (error as { statusCode?: number }).statusCode !== 409) { + if ( + !(error instanceof RelayError) + || error.statusCode !== 409 + || error.rawCode !== 'channel_already_exists' + ) { throw error; } } - try { - await this.client.post(`/v1/channels/${encodeURIComponent(name)}/join`); - joined = true; - } catch (error) { - if (!(error instanceof Error) || !("statusCode" in error) || (error as { statusCode?: number }).statusCode !== 409) { - throw error; - } - } + const joinResult = await this.channels.join(name); + const joined = !joinResult.alreadyMember; return { created, joined }; },