Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions packages/sdk-python/src/relay_sdk/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from urllib.parse import quote

from .client import AsyncHttpClient, HttpClient
from .errors import RelayError
from .models import (
Channel,
ChannelMemberInfo,
Expand Down Expand Up @@ -120,6 +121,18 @@ def get(self, name: str) -> dict[str, Any]:
def join(self, name: str) -> Any:
return self._client.post(f"/v1/channels/{_enc(name)}/join")

def ensure_joined(self, name: str, *, topic: str | None = None, metadata: dict[str, Any] | None = None) -> dict[str, bool]:
created = False
try:
self._client.post("/v1/channels", CreateChannelRequest(name=name, topic=topic, metadata=metadata).model_dump(exclude_none=True))
created = True
except RelayError as err:
if err.status != 409 or err.code != "channel_already_exists":
raise
join_result = self.join(name)
joined = not bool(join_result.get("already_member"))
return {"created": created, "joined": joined}
Comment on lines +124 to +134
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ensure_joined catches broad Exception and uses 409 as the join idempotency signal. The SDK’s HttpClient raises RelayError, and the join endpoint is already idempotent (200 with already_member). Consider catching RelayError specifically and using the join response’s already_member to compute whether a join actually happened (and remove the join-side 409 swallow).

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in ace6ab4. The sync Python helper now catches RelayError specifically on create, only tolerates channel_already_exists, and computes joined from the join response payload instead of swallowing join-side 409s.


def leave(self, name: str) -> None:
self._client.post(f"/v1/channels/{_enc(name)}/leave")

Expand Down Expand Up @@ -437,6 +450,18 @@ async def get(self, name: str) -> dict[str, Any]:
async def join(self, name: str) -> Any:
return await self._client.post(f"/v1/channels/{_enc(name)}/join")

async def ensure_joined(self, name: str, *, topic: str | None = None, metadata: dict[str, Any] | None = None) -> dict[str, bool]:
created = False
try:
await self._client.post("/v1/channels", CreateChannelRequest(name=name, topic=topic, metadata=metadata).model_dump(exclude_none=True))
created = True
except RelayError as err:
if err.status != 409 or err.code != "channel_already_exists":
raise
join_result = await self.join(name)
joined = not bool(join_result.get("already_member"))
return {"created": created, "joined": joined}
Comment on lines +453 to +463
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue for the async variant: it catches broad Exception and treats 409 as an expected join outcome, but join is 200 with already_member. Catch RelayError and use the response payload to determine whether the join was a no-op.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in ace6ab4. The async Python helper now mirrors the sync fix: specific RelayError handling for duplicate create, then joined comes from the 200 already_member response.


async def leave(self, name: str) -> None:
await self._client.post(f"/v1/channels/{_enc(name)}/leave")

Expand Down
6 changes: 6 additions & 0 deletions packages/sdk-python/src/relay_sdk/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ class CreateWorkspaceResponse(BaseModel):
created_at: str


class WorkspaceStreamConfig(BaseModel):
enabled: bool
default_enabled: bool
override: bool | None = Field(default=None, alias='override')


# ── Channel ───────────────────────────────────────────────────────

class Channel(BaseModel):
Expand Down
50 changes: 49 additions & 1 deletion packages/sdk-python/src/relay_sdk/relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from .client import AsyncHttpClient, HttpClient
from .errors import RelayError
from .local_runtime import ensure_local_runtime
from .models import Agent, CreateAgentRequest, CreateAgentResponse, TokenRotateResponse, Workspace
from .models import Agent, CreateAgentRequest, CreateAgentResponse, TokenRotateResponse, Workspace, WorkspaceStreamConfig


def _enc(value: str) -> str:
Expand All @@ -20,11 +20,35 @@ def _is_duplicate_agent_error(err: RelayError) -> bool:
return err.status == 409 and err.code in {"agent_already_exists", "name_conflict"}


class _WorkspaceStreamNamespace:
def __init__(self, client: HttpClient) -> None:
self._client = client

def get(self) -> WorkspaceStreamConfig:
result = self._client.get("/v1/workspace/stream")
return WorkspaceStreamConfig.model_validate(result)

def set(self, enabled: bool) -> WorkspaceStreamConfig:
result = self._client.put("/v1/workspace/stream", {"enabled": enabled})
return WorkspaceStreamConfig.model_validate(result)

def inherit(self) -> WorkspaceStreamConfig:
result = self._client.put("/v1/workspace/stream", {"mode": "inherit"})
return WorkspaceStreamConfig.model_validate(result)
Comment on lines +31 to +37
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Python HttpClient and AsyncHttpClient have no put method, causing AttributeError at runtime

_WorkspaceStreamNamespace.set() and .inherit() call self._client.put(...), but HttpClient (defined in packages/sdk-python/src/relay_sdk/client.py) only exposes get, post, patch, delete, and request — there is no put method. This causes an AttributeError at runtime whenever workspace.stream.set(), workspace.stream.inherit(), or workspace.stream.ensure_enabled() (when the stream is currently disabled) is called. The same issue exists for the async variant _AsyncWorkspaceStreamNamespace at packages/sdk-python/src/relay_sdk/relay.py:208 and :212. The project has no mypy/pyright configuration, so this is not caught by CI.

Prompt for agents
The Python HttpClient and AsyncHttpClient classes in packages/sdk-python/src/relay_sdk/client.py are missing a `put` method. The new _WorkspaceStreamNamespace (line 32, 36) and _AsyncWorkspaceStreamNamespace (line 208, 212) in packages/sdk-python/src/relay_sdk/relay.py call self._client.put(...) which does not exist.

Fix: Add a `put` method to both HttpClient and AsyncHttpClient in client.py, following the same pattern as the existing `patch` method:

For HttpClient:
    def put(self, path: str, body: Any = None) -> Any:
        return self.request("PUT", path, body=body)

For AsyncHttpClient:
    async def put(self, path: str, body: Any = None) -> Any:
        return await self.request("PUT", path, body=body)
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


def ensure_enabled(self) -> WorkspaceStreamConfig:
config = self.get()
if config.enabled:
return config
return self.set(True)


class _WorkspaceNamespace:
"""Sync workspace operations."""

def __init__(self, client: HttpClient) -> None:
self._client = client
self.stream = _WorkspaceStreamNamespace(client)

def info(self) -> Workspace:
result = self._client.get("/v1/workspace")
Expand Down Expand Up @@ -172,11 +196,35 @@ def __exit__(self, *args: Any) -> None:
# ── Async variants ────────────────────────────────────────────────


class _AsyncWorkspaceStreamNamespace:
def __init__(self, client: AsyncHttpClient) -> None:
self._client = client

async def get(self) -> WorkspaceStreamConfig:
result = await self._client.get("/v1/workspace/stream")
return WorkspaceStreamConfig.model_validate(result)

async def set(self, enabled: bool) -> WorkspaceStreamConfig:
result = await self._client.put("/v1/workspace/stream", {"enabled": enabled})
return WorkspaceStreamConfig.model_validate(result)

async def inherit(self) -> WorkspaceStreamConfig:
result = await self._client.put("/v1/workspace/stream", {"mode": "inherit"})
return WorkspaceStreamConfig.model_validate(result)

async def ensure_enabled(self) -> WorkspaceStreamConfig:
config = await self.get()
if config.enabled:
return config
return await self.set(True)


class _AsyncWorkspaceNamespace:
"""Async workspace operations."""

def __init__(self, client: AsyncHttpClient) -> None:
self._client = client
self.stream = _AsyncWorkspaceStreamNamespace(client)

async def info(self) -> Workspace:
result = await self._client.get("/v1/workspace")
Expand Down
28 changes: 28 additions & 0 deletions packages/sdk-python/tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from relay_sdk.agent import AgentClient, AsyncAgentClient
from relay_sdk.client import AsyncHttpClient, HttpClient
from relay_sdk.errors import RelayError
from relay_sdk.models import (
Channel,
ChannelMemberInfo,
Expand Down Expand Up @@ -612,3 +613,30 @@ async def test_presence_disconnect_alias_posts_disconnect(self):
await c.disconnect()
assert route.called
await c.client.close()

class TestAgentChannelEnsureJoined:
@respx.mock
def test_channels_ensure_joined_creates_and_joins(self):
respx.post(f"{BASE}/v1/channels").mock(return_value=httpx.Response(201, json={"ok": True, "data": CHANNEL}))
respx.post(f"{BASE}/v1/channels/general/join").mock(return_value=ok({"channel": "general", "agent_id": "agent_1", "already_member": False}))
client = AgentClient(HttpClient(TOKEN, BASE))
assert client.channels.ensure_joined("general", topic="General discussion") == {"created": True, "joined": True}

@respx.mock
def test_channels_ensure_joined_treats_duplicate_create_and_already_member_as_success(self):
respx.post(f"{BASE}/v1/channels").mock(return_value=httpx.Response(409, json={"ok": False, "error": {"code": "channel_already_exists", "message": "exists"}}))
respx.post(f"{BASE}/v1/channels/general/join").mock(return_value=ok({"channel": "general", "agent_id": "agent_1", "already_member": True}))
client = AgentClient(HttpClient(TOKEN, BASE))
assert client.channels.ensure_joined("general") == {"created": False, "joined": False}

@respx.mock
def test_channels_ensure_joined_does_not_swallow_join_conflicts(self):
respx.post(f"{BASE}/v1/channels").mock(return_value=httpx.Response(409, json={"ok": False, "error": {"code": "channel_already_exists", "message": "exists"}}))
respx.post(f"{BASE}/v1/channels/general/join").mock(return_value=httpx.Response(409, json={"ok": False, "error": {"code": "already_joined", "message": "joined"}}))
client = AgentClient(HttpClient(TOKEN, BASE))

with pytest.raises(RelayError) as excinfo:
client.channels.ensure_joined("general")

assert excinfo.value.status == 409
assert excinfo.value.code == "already_joined"
19 changes: 19 additions & 0 deletions packages/sdk-python/tests/test_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,22 @@ async def test_as_agent_returns_async_client(self):
async with AsyncRelay(KEY, base_url=BASE) as r:
ac = r.as_agent("at_xxx")
assert isinstance(ac, AsyncAgentClient)


class TestRelayWorkspaceStream:
@respx.mock
def test_workspace_stream_ensure_enabled_is_noop_when_enabled(self):
route = respx.get(f"{BASE}/v1/workspace/stream").mock(return_value=ok({"enabled": True, "default_enabled": True, "override": True}))
r = Relay(KEY, base_url=BASE)
cfg = r.workspace.stream.ensure_enabled()
assert cfg.enabled is True
assert route.called

@respx.mock
def test_workspace_stream_ensure_enabled_sets_when_disabled(self):
respx.get(f"{BASE}/v1/workspace/stream").mock(return_value=ok({"enabled": False, "default_enabled": False, "override": None}))
put = respx.put(f"{BASE}/v1/workspace/stream").mock(return_value=ok({"enabled": True, "default_enabled": False, "override": True}))
r = Relay(KEY, base_url=BASE)
cfg = r.workspace.stream.ensure_enabled()
assert cfg.enabled is True
assert put.called
1 change: 1 addition & 0 deletions packages/sdk-rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ urlencoding = "2.1"
tokio-test = "0.4"
mockito = "1.5"
wiremock = "0.6"
httpmock = "0.7"
tempfile = "3.19"

[features]
Expand Down
164 changes: 163 additions & 1 deletion packages/sdk-rust/src/agent.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Agent client for message and channel operations.

use crate::client::{ClientOptions, HttpClient, RequestOptions};
use crate::error::Result;
use crate::error::{RelayError, Result};
use crate::types::*;
use crate::ws::{EventReceiver, LifecycleReceiver, WsClient, WsClientOptions};

Expand All @@ -10,6 +10,13 @@ fn strip_hash(channel: &str) -> &str {
channel.strip_prefix('#').unwrap_or(channel)
}

/// Outcome from idempotently ensuring a channel exists and is joined.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EnsureChannelJoinedOutcome {
pub created: bool,
pub joined: bool,
}

/// Client for agent-level operations.
pub struct AgentClient {
client: HttpClient,
Expand Down Expand Up @@ -576,6 +583,40 @@ impl AgentClient {
.await
}

/// Ensure a channel exists and that the current agent is joined to it.
///
/// Duplicate channel creation is treated as a successful no-op, and join
/// idempotence is derived from the join response's `already_member` field.
pub async fn ensure_channel_joined(
&self,
request: CreateChannelRequest,
) -> Result<EnsureChannelJoinedOutcome> {
let channel_name = request.name.clone();

let created = match self.create_channel(request).await {
Ok(_) => true,
Err(error)
if error.is_conflict() && error.code() == Some("channel_already_exists") =>
{
false
}
Err(error) => return Err(error),
};

let join_response = self.join_channel(&channel_name).await?;
let joined = join_response
.get("already_member")
.and_then(serde_json::Value::as_bool)
.map(|already_member| !already_member)
.ok_or_else(|| {
RelayError::InvalidResponse(
"channel join response missing already_member".into(),
)
})?;

Ok(EnsureChannelJoinedOutcome { created, joined })
}
Comment on lines +586 to +618
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ensure_channel_joined marks joined = true for any successful join call, but the join endpoint is already idempotent and returns an already_member flag (200) rather than a 409 conflict. If the intent is to report whether a join actually occurred, parse the join response and set joined = !already_member (and consider updating the doc comment which currently describes 409 behavior for join).

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in ace6ab4. The Rust helper now documents the real behavior, only absorbs duplicate-create conflicts, and derives joined from the join response already_member field.


/// Leave a channel.
pub async fn leave_channel(&self, name: &str) -> Result<()> {
self.client
Expand Down Expand Up @@ -845,3 +886,124 @@ impl AgentClient {
self.client.get("/v1/files", query_ref, None).await
}
}


#[cfg(test)]
mod tests {
use super::AgentClient;
use crate::{CreateChannelRequest, EnsureChannelJoinedOutcome};
use httpmock::{Method::POST, MockServer, Then, When};

fn client(base_url: &str) -> AgentClient {
AgentClient::new("at_live_test", Some(base_url.to_string())).expect("agent client")
}

#[tokio::test]
async fn ensure_channel_joined_creates_and_joins_when_missing() {
let server = MockServer::start();
let create = server.mock(|when: When, then: Then| {
when.method(POST).path("/v1/channels");
then.status(201).json_body_obj(&serde_json::json!({
"ok": true,
"data": {
"id": "ch_1",
"workspace_id": "w_1",
"name": "general",
"channel_type": 0,
"topic": "General discussion",
"metadata": {},
"created_by": "agent_1",
"created_at": "2026-03-26T00:00:00Z",
"is_archived": false
}
}));
});
let join = server.mock(|when: When, then: Then| {
when.method(POST).path("/v1/channels/general/join");
then.status(200).json_body_obj(&serde_json::json!({
"ok": true,
"data": {
"channel": "general",
"agent_id": "agent_1",
"already_member": false
}
}));
});

let agent = client(&server.base_url());
let outcome = agent.ensure_channel_joined(CreateChannelRequest {
name: "general".into(),
topic: Some("General discussion".into()),
metadata: None,
}).await.expect("ensure joined succeeds");

assert_eq!(outcome, EnsureChannelJoinedOutcome { created: true, joined: true });
create.assert();
join.assert();
}

#[tokio::test]
async fn ensure_channel_joined_treats_duplicate_create_and_already_member_as_success() {
let server = MockServer::start();
let create = server.mock(|when: When, then: Then| {
when.method(POST).path("/v1/channels");
then.status(409).json_body_obj(&serde_json::json!({
"ok": false,
"error": {"code": "channel_already_exists", "message": "exists"}
}));
});
let join = server.mock(|when: When, then: Then| {
when.method(POST).path("/v1/channels/general/join");
then.status(200).json_body_obj(&serde_json::json!({
"ok": true,
"data": {
"channel": "general",
"agent_id": "agent_1",
"already_member": true
}
}));
});

let agent = client(&server.base_url());
let outcome = agent.ensure_channel_joined(CreateChannelRequest {
name: "general".into(),
topic: None,
metadata: None,
}).await.expect("ensure joined succeeds");

assert_eq!(outcome, EnsureChannelJoinedOutcome { created: false, joined: false });
create.assert();
join.assert();
}
Comment on lines +945 to +977
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests model channel create/join conflicts as 409 responses with codes channel_exists / already_joined. On the server, create conflicts are channel_already_exists, and join is 200 with already_member rather than 409. Aligning the mocks with the real API will ensure the test exercises the helper correctly.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in ace6ab4. The Rust tests now match the real API semantics: create conflicts use channel_already_exists, and join returns 200 with already_member.


#[tokio::test]
async fn ensure_channel_joined_does_not_swallow_join_conflicts() {
let server = MockServer::start();
let create = server.mock(|when: When, then: Then| {
when.method(POST).path("/v1/channels");
then.status(409).json_body_obj(&serde_json::json!({
"ok": false,
"error": {"code": "channel_already_exists", "message": "exists"}
}));
});
let join = server.mock(|when: When, then: Then| {
when.method(POST).path("/v1/channels/general/join");
then.status(409).json_body_obj(&serde_json::json!({
"ok": false,
"error": {"code": "already_joined", "message": "joined"}
}));
});

let agent = client(&server.base_url());
let error = agent.ensure_channel_joined(CreateChannelRequest {
name: "general".into(),
topic: None,
metadata: None,
}).await.expect_err("ensure joined should fail");

assert!(error.is_conflict());
assert_eq!(error.code(), Some("already_joined"));
create.assert();
join.assert();
}
}
Loading
Loading