diff --git a/packages/sdk-python/src/relay_sdk/agent.py b/packages/sdk-python/src/relay_sdk/agent.py index 707429d..e3fe638 100644 --- a/packages/sdk-python/src/relay_sdk/agent.py +++ b/packages/sdk-python/src/relay_sdk/agent.py @@ -6,6 +6,7 @@ from urllib.parse import quote from .client import AsyncHttpClient, HttpClient +from .errors import RelayError from .models import ( Channel, ChannelMemberInfo, @@ -120,6 +121,18 @@ def get(self, name: str) -> dict[str, Any]: def join(self, name: str) -> Any: return self._client.post(f"/v1/channels/{_enc(name)}/join") + def ensure_joined(self, name: str, *, topic: str | None = None, metadata: dict[str, Any] | None = None) -> dict[str, bool]: + created = False + try: + self._client.post("/v1/channels", CreateChannelRequest(name=name, topic=topic, metadata=metadata).model_dump(exclude_none=True)) + created = True + except RelayError as err: + if err.status != 409 or err.code != "channel_already_exists": + raise + join_result = self.join(name) + joined = not bool(join_result.get("already_member")) + return {"created": created, "joined": joined} + def leave(self, name: str) -> None: self._client.post(f"/v1/channels/{_enc(name)}/leave") @@ -437,6 +450,18 @@ async def get(self, name: str) -> dict[str, Any]: async def join(self, name: str) -> Any: return await self._client.post(f"/v1/channels/{_enc(name)}/join") + async def ensure_joined(self, name: str, *, topic: str | None = None, metadata: dict[str, Any] | None = None) -> dict[str, bool]: + created = False + try: + await self._client.post("/v1/channels", CreateChannelRequest(name=name, topic=topic, metadata=metadata).model_dump(exclude_none=True)) + created = True + except RelayError as err: + if err.status != 409 or err.code != "channel_already_exists": + raise + join_result = await self.join(name) + joined = not bool(join_result.get("already_member")) + return {"created": created, "joined": joined} + async def leave(self, name: str) -> None: await self._client.post(f"/v1/channels/{_enc(name)}/leave") diff --git a/packages/sdk-python/src/relay_sdk/models.py b/packages/sdk-python/src/relay_sdk/models.py index 6869d5f..60c5a14 100644 --- a/packages/sdk-python/src/relay_sdk/models.py +++ b/packages/sdk-python/src/relay_sdk/models.py @@ -87,6 +87,12 @@ class CreateWorkspaceResponse(BaseModel): created_at: str +class WorkspaceStreamConfig(BaseModel): + enabled: bool + default_enabled: bool + override: bool | None = Field(default=None, alias='override') + + # ── Channel ─────────────────────────────────────────────────────── class Channel(BaseModel): diff --git a/packages/sdk-python/src/relay_sdk/relay.py b/packages/sdk-python/src/relay_sdk/relay.py index 745f2a3..8b754e8 100644 --- a/packages/sdk-python/src/relay_sdk/relay.py +++ b/packages/sdk-python/src/relay_sdk/relay.py @@ -9,7 +9,7 @@ from .client import AsyncHttpClient, HttpClient from .errors import RelayError from .local_runtime import ensure_local_runtime -from .models import Agent, CreateAgentRequest, CreateAgentResponse, TokenRotateResponse, Workspace +from .models import Agent, CreateAgentRequest, CreateAgentResponse, TokenRotateResponse, Workspace, WorkspaceStreamConfig def _enc(value: str) -> str: @@ -20,11 +20,35 @@ def _is_duplicate_agent_error(err: RelayError) -> bool: return err.status == 409 and err.code in {"agent_already_exists", "name_conflict"} +class _WorkspaceStreamNamespace: + def __init__(self, client: HttpClient) -> None: + self._client = client + + def get(self) -> WorkspaceStreamConfig: + result = self._client.get("/v1/workspace/stream") + return WorkspaceStreamConfig.model_validate(result) + + def set(self, enabled: bool) -> WorkspaceStreamConfig: + result = self._client.put("/v1/workspace/stream", {"enabled": enabled}) + return WorkspaceStreamConfig.model_validate(result) + + def inherit(self) -> WorkspaceStreamConfig: + result = self._client.put("/v1/workspace/stream", {"mode": "inherit"}) + return WorkspaceStreamConfig.model_validate(result) + + def ensure_enabled(self) -> WorkspaceStreamConfig: + config = self.get() + if config.enabled: + return config + return self.set(True) + + class _WorkspaceNamespace: """Sync workspace operations.""" def __init__(self, client: HttpClient) -> None: self._client = client + self.stream = _WorkspaceStreamNamespace(client) def info(self) -> Workspace: result = self._client.get("/v1/workspace") @@ -172,11 +196,35 @@ def __exit__(self, *args: Any) -> None: # ── Async variants ──────────────────────────────────────────────── +class _AsyncWorkspaceStreamNamespace: + def __init__(self, client: AsyncHttpClient) -> None: + self._client = client + + async def get(self) -> WorkspaceStreamConfig: + result = await self._client.get("/v1/workspace/stream") + return WorkspaceStreamConfig.model_validate(result) + + async def set(self, enabled: bool) -> WorkspaceStreamConfig: + result = await self._client.put("/v1/workspace/stream", {"enabled": enabled}) + return WorkspaceStreamConfig.model_validate(result) + + async def inherit(self) -> WorkspaceStreamConfig: + result = await self._client.put("/v1/workspace/stream", {"mode": "inherit"}) + return WorkspaceStreamConfig.model_validate(result) + + async def ensure_enabled(self) -> WorkspaceStreamConfig: + config = await self.get() + if config.enabled: + return config + return await self.set(True) + + class _AsyncWorkspaceNamespace: """Async workspace operations.""" def __init__(self, client: AsyncHttpClient) -> None: self._client = client + self.stream = _AsyncWorkspaceStreamNamespace(client) async def info(self) -> Workspace: result = await self._client.get("/v1/workspace") diff --git a/packages/sdk-python/tests/test_agent.py b/packages/sdk-python/tests/test_agent.py index 07498bc..de9cf37 100644 --- a/packages/sdk-python/tests/test_agent.py +++ b/packages/sdk-python/tests/test_agent.py @@ -8,6 +8,7 @@ from relay_sdk.agent import AgentClient, AsyncAgentClient from relay_sdk.client import AsyncHttpClient, HttpClient +from relay_sdk.errors import RelayError from relay_sdk.models import ( Channel, ChannelMemberInfo, @@ -612,3 +613,30 @@ async def test_presence_disconnect_alias_posts_disconnect(self): await c.disconnect() assert route.called await c.client.close() + +class TestAgentChannelEnsureJoined: + @respx.mock + def test_channels_ensure_joined_creates_and_joins(self): + respx.post(f"{BASE}/v1/channels").mock(return_value=httpx.Response(201, json={"ok": True, "data": CHANNEL})) + respx.post(f"{BASE}/v1/channels/general/join").mock(return_value=ok({"channel": "general", "agent_id": "agent_1", "already_member": False})) + client = AgentClient(HttpClient(TOKEN, BASE)) + assert client.channels.ensure_joined("general", topic="General discussion") == {"created": True, "joined": True} + + @respx.mock + def test_channels_ensure_joined_treats_duplicate_create_and_already_member_as_success(self): + respx.post(f"{BASE}/v1/channels").mock(return_value=httpx.Response(409, json={"ok": False, "error": {"code": "channel_already_exists", "message": "exists"}})) + respx.post(f"{BASE}/v1/channels/general/join").mock(return_value=ok({"channel": "general", "agent_id": "agent_1", "already_member": True})) + client = AgentClient(HttpClient(TOKEN, BASE)) + assert client.channels.ensure_joined("general") == {"created": False, "joined": False} + + @respx.mock + def test_channels_ensure_joined_does_not_swallow_join_conflicts(self): + respx.post(f"{BASE}/v1/channels").mock(return_value=httpx.Response(409, json={"ok": False, "error": {"code": "channel_already_exists", "message": "exists"}})) + respx.post(f"{BASE}/v1/channels/general/join").mock(return_value=httpx.Response(409, json={"ok": False, "error": {"code": "already_joined", "message": "joined"}})) + client = AgentClient(HttpClient(TOKEN, BASE)) + + with pytest.raises(RelayError) as excinfo: + client.channels.ensure_joined("general") + + assert excinfo.value.status == 409 + assert excinfo.value.code == "already_joined" diff --git a/packages/sdk-python/tests/test_relay.py b/packages/sdk-python/tests/test_relay.py index 90d825f..746cef7 100644 --- a/packages/sdk-python/tests/test_relay.py +++ b/packages/sdk-python/tests/test_relay.py @@ -243,3 +243,22 @@ async def test_as_agent_returns_async_client(self): async with AsyncRelay(KEY, base_url=BASE) as r: ac = r.as_agent("at_xxx") assert isinstance(ac, AsyncAgentClient) + + +class TestRelayWorkspaceStream: + @respx.mock + def test_workspace_stream_ensure_enabled_is_noop_when_enabled(self): + route = respx.get(f"{BASE}/v1/workspace/stream").mock(return_value=ok({"enabled": True, "default_enabled": True, "override": True})) + r = Relay(KEY, base_url=BASE) + cfg = r.workspace.stream.ensure_enabled() + assert cfg.enabled is True + assert route.called + + @respx.mock + def test_workspace_stream_ensure_enabled_sets_when_disabled(self): + respx.get(f"{BASE}/v1/workspace/stream").mock(return_value=ok({"enabled": False, "default_enabled": False, "override": None})) + put = respx.put(f"{BASE}/v1/workspace/stream").mock(return_value=ok({"enabled": True, "default_enabled": False, "override": True})) + r = Relay(KEY, base_url=BASE) + cfg = r.workspace.stream.ensure_enabled() + assert cfg.enabled is True + assert put.called diff --git a/packages/sdk-rust/Cargo.toml b/packages/sdk-rust/Cargo.toml index ce94f2d..92ee217 100644 --- a/packages/sdk-rust/Cargo.toml +++ b/packages/sdk-rust/Cargo.toml @@ -27,6 +27,7 @@ urlencoding = "2.1" tokio-test = "0.4" mockito = "1.5" wiremock = "0.6" +httpmock = "0.7" tempfile = "3.19" [features] diff --git a/packages/sdk-rust/src/agent.rs b/packages/sdk-rust/src/agent.rs index 1b389a5..c4de2bb 100644 --- a/packages/sdk-rust/src/agent.rs +++ b/packages/sdk-rust/src/agent.rs @@ -1,7 +1,7 @@ //! Agent client for message and channel operations. use crate::client::{ClientOptions, HttpClient, RequestOptions}; -use crate::error::Result; +use crate::error::{RelayError, Result}; use crate::types::*; use crate::ws::{EventReceiver, LifecycleReceiver, WsClient, WsClientOptions}; @@ -10,6 +10,13 @@ fn strip_hash(channel: &str) -> &str { channel.strip_prefix('#').unwrap_or(channel) } +/// Outcome from idempotently ensuring a channel exists and is joined. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct EnsureChannelJoinedOutcome { + pub created: bool, + pub joined: bool, +} + /// Client for agent-level operations. pub struct AgentClient { client: HttpClient, @@ -576,6 +583,40 @@ impl AgentClient { .await } + /// Ensure a channel exists and that the current agent is joined to it. + /// + /// Duplicate channel creation is treated as a successful no-op, and join + /// idempotence is derived from the join response's `already_member` field. + pub async fn ensure_channel_joined( + &self, + request: CreateChannelRequest, + ) -> Result { + let channel_name = request.name.clone(); + + let created = match self.create_channel(request).await { + Ok(_) => true, + Err(error) + if error.is_conflict() && error.code() == Some("channel_already_exists") => + { + false + } + Err(error) => return Err(error), + }; + + let join_response = self.join_channel(&channel_name).await?; + let joined = join_response + .get("already_member") + .and_then(serde_json::Value::as_bool) + .map(|already_member| !already_member) + .ok_or_else(|| { + RelayError::InvalidResponse( + "channel join response missing already_member".into(), + ) + })?; + + Ok(EnsureChannelJoinedOutcome { created, joined }) + } + /// Leave a channel. pub async fn leave_channel(&self, name: &str) -> Result<()> { self.client @@ -845,3 +886,124 @@ impl AgentClient { self.client.get("/v1/files", query_ref, None).await } } + + +#[cfg(test)] +mod tests { + use super::AgentClient; + use crate::{CreateChannelRequest, EnsureChannelJoinedOutcome}; + use httpmock::{Method::POST, MockServer, Then, When}; + + fn client(base_url: &str) -> AgentClient { + AgentClient::new("at_live_test", Some(base_url.to_string())).expect("agent client") + } + + #[tokio::test] + async fn ensure_channel_joined_creates_and_joins_when_missing() { + let server = MockServer::start(); + let create = server.mock(|when: When, then: Then| { + when.method(POST).path("/v1/channels"); + then.status(201).json_body_obj(&serde_json::json!({ + "ok": true, + "data": { + "id": "ch_1", + "workspace_id": "w_1", + "name": "general", + "channel_type": 0, + "topic": "General discussion", + "metadata": {}, + "created_by": "agent_1", + "created_at": "2026-03-26T00:00:00Z", + "is_archived": false + } + })); + }); + let join = server.mock(|when: When, then: Then| { + when.method(POST).path("/v1/channels/general/join"); + then.status(200).json_body_obj(&serde_json::json!({ + "ok": true, + "data": { + "channel": "general", + "agent_id": "agent_1", + "already_member": false + } + })); + }); + + let agent = client(&server.base_url()); + let outcome = agent.ensure_channel_joined(CreateChannelRequest { + name: "general".into(), + topic: Some("General discussion".into()), + metadata: None, + }).await.expect("ensure joined succeeds"); + + assert_eq!(outcome, EnsureChannelJoinedOutcome { created: true, joined: true }); + create.assert(); + join.assert(); + } + + #[tokio::test] + async fn ensure_channel_joined_treats_duplicate_create_and_already_member_as_success() { + let server = MockServer::start(); + let create = server.mock(|when: When, then: Then| { + when.method(POST).path("/v1/channels"); + then.status(409).json_body_obj(&serde_json::json!({ + "ok": false, + "error": {"code": "channel_already_exists", "message": "exists"} + })); + }); + let join = server.mock(|when: When, then: Then| { + when.method(POST).path("/v1/channels/general/join"); + then.status(200).json_body_obj(&serde_json::json!({ + "ok": true, + "data": { + "channel": "general", + "agent_id": "agent_1", + "already_member": true + } + })); + }); + + let agent = client(&server.base_url()); + let outcome = agent.ensure_channel_joined(CreateChannelRequest { + name: "general".into(), + topic: None, + metadata: None, + }).await.expect("ensure joined succeeds"); + + assert_eq!(outcome, EnsureChannelJoinedOutcome { created: false, joined: false }); + create.assert(); + join.assert(); + } + + #[tokio::test] + async fn ensure_channel_joined_does_not_swallow_join_conflicts() { + let server = MockServer::start(); + let create = server.mock(|when: When, then: Then| { + when.method(POST).path("/v1/channels"); + then.status(409).json_body_obj(&serde_json::json!({ + "ok": false, + "error": {"code": "channel_already_exists", "message": "exists"} + })); + }); + let join = server.mock(|when: When, then: Then| { + when.method(POST).path("/v1/channels/general/join"); + then.status(409).json_body_obj(&serde_json::json!({ + "ok": false, + "error": {"code": "already_joined", "message": "joined"} + })); + }); + + let agent = client(&server.base_url()); + let error = agent.ensure_channel_joined(CreateChannelRequest { + name: "general".into(), + topic: None, + metadata: None, + }).await.expect_err("ensure joined should fail"); + + assert!(error.is_conflict()); + assert_eq!(error.code(), Some("already_joined")); + create.assert(); + join.assert(); + } +} diff --git a/packages/sdk-rust/src/lib.rs b/packages/sdk-rust/src/lib.rs index 88303bb..7a1c9ce 100644 --- a/packages/sdk-rust/src/lib.rs +++ b/packages/sdk-rust/src/lib.rs @@ -75,7 +75,7 @@ pub mod types; pub mod ws; // Re-export main types -pub use agent::AgentClient; +pub use agent::{AgentClient, EnsureChannelJoinedOutcome}; pub use client::{ClientOptions, HttpClient, RequestOptions}; pub use error::{RelayError, Result}; pub use registration::{ diff --git a/packages/sdk-rust/src/relay.rs b/packages/sdk-rust/src/relay.rs index 9119d7f..ab00079 100644 --- a/packages/sdk-rust/src/relay.rs +++ b/packages/sdk-rust/src/relay.rs @@ -307,6 +307,18 @@ impl RelayCast { .await } + /// Ensure workspace-wide WebSocket fanout is enabled. + /// + /// Returns the effective configuration after ensuring it is enabled. If the + /// stream is already enabled, this is a read-only no-op. + pub async fn ensure_workspace_stream_enabled(&self) -> Result { + let config = self.workspace_stream_get().await?; + if config.enabled { + return Ok(config); + } + self.workspace_stream_set(true).await + } + /// Clear workspace stream override and inherit default behavior. pub async fn workspace_stream_inherit(&self) -> Result { self.client @@ -758,3 +770,62 @@ impl RelayCast { .await } } + + +#[cfg(test)] +mod tests { + use super::RelayCast; + use crate::RelayCastOptions; + use httpmock::{Method::{GET, PUT}, MockServer, Then, When}; + + fn relay(base_url: &str) -> RelayCast { + RelayCast::new(RelayCastOptions::new("rk_live_test").with_base_url(base_url)).expect("relay") + } + + #[tokio::test] + async fn ensure_workspace_stream_enabled_is_noop_when_already_enabled() { + let server = MockServer::start(); + let get = server.mock(|when: When, then: Then| { + when.method(GET).path("/v1/workspace/stream"); + then.status(200).json_body_obj(&serde_json::json!({ + "ok": true, + "data": {"enabled": true, "default_enabled": true, "override": true} + })); + }); + let put = server.mock(|when: When, then: Then| { + when.method(PUT).path("/v1/workspace/stream"); + then.status(200); + }); + + let relay = relay(&server.base_url()); + let config = relay.ensure_workspace_stream_enabled().await.expect("stream enabled"); + assert!(config.enabled); + get.assert(); + assert_eq!(put.hits(), 0); + } + + #[tokio::test] + async fn ensure_workspace_stream_enabled_sets_override_when_disabled() { + let server = MockServer::start(); + let get = server.mock(|when: When, then: Then| { + when.method(GET).path("/v1/workspace/stream"); + then.status(200).json_body_obj(&serde_json::json!({ + "ok": true, + "data": {"enabled": false, "default_enabled": false, "override": null} + })); + }); + let put = server.mock(|when: When, then: Then| { + when.method(PUT).path("/v1/workspace/stream").json_body_obj(&serde_json::json!({"enabled": true})); + then.status(200).json_body_obj(&serde_json::json!({ + "ok": true, + "data": {"enabled": true, "default_enabled": false, "override": true} + })); + }); + + let relay = relay(&server.base_url()); + let config = relay.ensure_workspace_stream_enabled().await.expect("stream enabled"); + assert!(config.enabled); + get.assert(); + put.assert(); + } +} diff --git a/packages/sdk-typescript/src/__tests__/agent.test.ts b/packages/sdk-typescript/src/__tests__/agent.test.ts new file mode 100644 index 0000000..29bceb1 --- /dev/null +++ b/packages/sdk-typescript/src/__tests__/agent.test.ts @@ -0,0 +1,46 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { AgentClient } from '../agent.js'; +import { HttpClient, RelayError } from '../client.js'; + +describe('AgentClient.channels.ensureJoined', () => { + const originalFetch = global.fetch; + + beforeEach(() => { + global.fetch = vi.fn() as typeof fetch; + }); + + afterEach(() => { + global.fetch = originalFetch; + }); + + it('creates and joins when missing', async () => { + (global.fetch as unknown as ReturnType) + .mockResolvedValueOnce(new Response(JSON.stringify({ ok: true, data: { id: 'ch_1', workspace_id: 'w_1', name: 'general', channel_type: 0, topic: 'General discussion', metadata: {}, created_by: 'agent_1', created_at: '2026-03-26T00:00:00Z', is_archived: false } }), { status: 201 })) + .mockResolvedValueOnce(new Response(JSON.stringify({ ok: true, data: { channel: 'general', agent_id: 'agent_1', already_member: false } }), { status: 200 })); + + const client = new AgentClient(new HttpClient({ apiKey: 'at_live_test', baseUrl: 'https://api.relaycast.dev' })); + await expect(client.channels.ensureJoined('general', { topic: 'General discussion' })).resolves.toEqual({ created: true, joined: true }); + }); + + it('treats duplicate create plus already-member join as success', async () => { + (global.fetch as unknown as ReturnType) + .mockResolvedValueOnce(new Response(JSON.stringify({ ok: false, error: { code: 'channel_already_exists', message: 'exists' } }), { status: 409 })) + .mockResolvedValueOnce(new Response(JSON.stringify({ ok: true, data: { channel: 'general', agent_id: 'agent_1', already_member: true } }), { status: 200 })); + + const client = new AgentClient(new HttpClient({ apiKey: 'at_live_test', baseUrl: 'https://api.relaycast.dev' })); + await expect(client.channels.ensureJoined('general')).resolves.toEqual({ created: false, joined: false }); + }); + + it('does not swallow join conflicts', async () => { + (global.fetch as unknown as ReturnType) + .mockResolvedValueOnce(new Response(JSON.stringify({ ok: false, error: { code: 'channel_already_exists', message: 'exists' } }), { status: 409 })) + .mockResolvedValueOnce(new Response(JSON.stringify({ ok: false, error: { code: 'already_joined', message: 'joined' } }), { status: 409 })); + + const client = new AgentClient(new HttpClient({ apiKey: 'at_live_test', baseUrl: 'https://api.relaycast.dev' })); + + await expect(client.channels.ensureJoined('general')).rejects.toMatchObject({ + statusCode: 409, + rawCode: 'already_joined', + } satisfies Partial); + }); +}); diff --git a/packages/sdk-typescript/src/__tests__/relay.test.ts b/packages/sdk-typescript/src/__tests__/relay.test.ts index aac3059..2fe426b 100644 --- a/packages/sdk-typescript/src/__tests__/relay.test.ts +++ b/packages/sdk-typescript/src/__tests__/relay.test.ts @@ -1,9 +1,14 @@ -import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest'; // Mock global fetch once for this file. +const originalFetch = global.fetch; const mockFetch = vi.fn(); vi.stubGlobal('fetch', mockFetch); +afterAll(() => { + global.fetch = originalFetch; +}); + function mockResponse(data: unknown, apiOk = true, status = 200) { return Promise.resolve({ ok: status >= 200 && status < 300, @@ -15,6 +20,7 @@ function mockResponse(data: unknown, apiOk = true, status = 200) { describe('RelayCast', () => { beforeEach(() => { + global.fetch = mockFetch as typeof fetch; mockFetch.mockReset(); vi.useRealTimers(); }); @@ -1139,3 +1145,38 @@ describe('RelayCast', () => { }); }); }); + +describe('workspace.stream.ensureEnabled', () => { + it('returns current config without PUT when already enabled', async () => { + mockFetch.mockResolvedValueOnce(new Response(JSON.stringify({ + ok: true, + data: { enabled: true, default_enabled: true, override: true }, + }), { status: 200 })); + + const { RelayCast } = await import('../relay.js'); + const relay = new RelayCast({ apiKey: 'rk_live_test' }); + const result = await relay.workspace.stream.ensureEnabled(); + + expect(result.enabled).toBe(true); + expect(global.fetch).toHaveBeenCalledTimes(1); + }); + + it('enables stream when disabled', async () => { + mockFetch + .mockResolvedValueOnce(new Response(JSON.stringify({ + ok: true, + data: { enabled: false, default_enabled: false, override: null }, + }), { status: 200 })) + .mockResolvedValueOnce(new Response(JSON.stringify({ + ok: true, + data: { enabled: true, default_enabled: false, override: true }, + }), { status: 200 })); + + const { RelayCast } = await import('../relay.js'); + const relay = new RelayCast({ apiKey: 'rk_live_test' }); + const result = await relay.workspace.stream.ensureEnabled(); + + expect(result.enabled).toBe(true); + expect(global.fetch).toHaveBeenCalledTimes(2); + }); +}); diff --git a/packages/sdk-typescript/src/agent.ts b/packages/sdk-typescript/src/agent.ts index 2b0fa95..8f77369 100644 --- a/packages/sdk-typescript/src/agent.ts +++ b/packages/sdk-typescript/src/agent.ts @@ -55,7 +55,7 @@ import type { WsReconnectingEvent, WsPermanentlyDisconnectedEvent, } from './types.js'; -import { HttpClient, type RequestOptions } from './client.js'; +import { HttpClient, RelayError, type RequestOptions } from './client.js'; import { WsClient, type WsClientOptions, withInternalWsOrigin } from './ws.js'; function stripHash(channel: string): string { @@ -435,6 +435,35 @@ export class AgentClient { join: (name: string): Promise => this.client.post(`/v1/channels/${encodeURIComponent(name)}/join`), + ensureJoined: async ( + name: string, + options?: { topic?: string; metadata?: CreateChannelRequest['metadata'] }, + ): Promise<{ created: boolean; joined: boolean }> => { + let created = false; + + try { + await this.client.post('/v1/channels', { + name, + topic: options?.topic, + metadata: options?.metadata, + }); + created = true; + } catch (error) { + if ( + !(error instanceof RelayError) + || error.statusCode !== 409 + || error.rawCode !== 'channel_already_exists' + ) { + throw error; + } + } + + const joinResult = await this.channels.join(name); + const joined = !joinResult.alreadyMember; + + return { created, joined }; + }, + leave: async (name: string): Promise => { await this.client.post(`/v1/channels/${encodeURIComponent(name)}/leave`); }, diff --git a/packages/sdk-typescript/src/relay.ts b/packages/sdk-typescript/src/relay.ts index 94461f9..08f28af 100644 --- a/packages/sdk-typescript/src/relay.ts +++ b/packages/sdk-typescript/src/relay.ts @@ -377,6 +377,13 @@ export class RelayCast { this.client.put('/v1/workspace/stream', { enabled }), inherit: (): Promise => this.client.put('/v1/workspace/stream', { mode: 'inherit' }), + ensureEnabled: async (): Promise => { + const config = await this.client.get('/v1/workspace/stream'); + if (config.enabled) { + return config; + } + return this.client.put('/v1/workspace/stream', { enabled: true }); + }, }, };