-
Notifications
You must be signed in to change notification settings - Fork 0
feat(sdk): add idempotent workspace bootstrap helpers #111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,7 @@ | |
| from urllib.parse import quote | ||
|
|
||
| from .client import AsyncHttpClient, HttpClient | ||
| from .errors import RelayError | ||
| from .models import ( | ||
| Channel, | ||
| ChannelMemberInfo, | ||
|
|
@@ -120,6 +121,18 @@ def get(self, name: str) -> dict[str, Any]: | |
| def join(self, name: str) -> Any: | ||
| return self._client.post(f"/v1/channels/{_enc(name)}/join") | ||
|
|
||
| def ensure_joined(self, name: str, *, topic: str | None = None, metadata: dict[str, Any] | None = None) -> dict[str, bool]: | ||
| created = False | ||
| try: | ||
| self._client.post("/v1/channels", CreateChannelRequest(name=name, topic=topic, metadata=metadata).model_dump(exclude_none=True)) | ||
| created = True | ||
| except RelayError as err: | ||
| if err.status != 409 or err.code != "channel_already_exists": | ||
| raise | ||
| join_result = self.join(name) | ||
| joined = not bool(join_result.get("already_member")) | ||
| return {"created": created, "joined": joined} | ||
|
|
||
| def leave(self, name: str) -> None: | ||
| self._client.post(f"/v1/channels/{_enc(name)}/leave") | ||
|
|
||
|
|
@@ -437,6 +450,18 @@ async def get(self, name: str) -> dict[str, Any]: | |
| async def join(self, name: str) -> Any: | ||
| return await self._client.post(f"/v1/channels/{_enc(name)}/join") | ||
|
|
||
| async def ensure_joined(self, name: str, *, topic: str | None = None, metadata: dict[str, Any] | None = None) -> dict[str, bool]: | ||
| created = False | ||
| try: | ||
| await self._client.post("/v1/channels", CreateChannelRequest(name=name, topic=topic, metadata=metadata).model_dump(exclude_none=True)) | ||
| created = True | ||
| except RelayError as err: | ||
| if err.status != 409 or err.code != "channel_already_exists": | ||
| raise | ||
| join_result = await self.join(name) | ||
| joined = not bool(join_result.get("already_member")) | ||
| return {"created": created, "joined": joined} | ||
|
Comment on lines
+453
to
+463
|
||
|
|
||
| async def leave(self, name: str) -> None: | ||
| await self._client.post(f"/v1/channels/{_enc(name)}/leave") | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,7 +9,7 @@ | |
| from .client import AsyncHttpClient, HttpClient | ||
| from .errors import RelayError | ||
| from .local_runtime import ensure_local_runtime | ||
| from .models import Agent, CreateAgentRequest, CreateAgentResponse, TokenRotateResponse, Workspace | ||
| from .models import Agent, CreateAgentRequest, CreateAgentResponse, TokenRotateResponse, Workspace, WorkspaceStreamConfig | ||
|
|
||
|
|
||
| def _enc(value: str) -> str: | ||
|
|
@@ -20,11 +20,35 @@ def _is_duplicate_agent_error(err: RelayError) -> bool: | |
| return err.status == 409 and err.code in {"agent_already_exists", "name_conflict"} | ||
|
|
||
|
|
||
| class _WorkspaceStreamNamespace: | ||
| def __init__(self, client: HttpClient) -> None: | ||
| self._client = client | ||
|
|
||
| def get(self) -> WorkspaceStreamConfig: | ||
| result = self._client.get("/v1/workspace/stream") | ||
| return WorkspaceStreamConfig.model_validate(result) | ||
|
|
||
| def set(self, enabled: bool) -> WorkspaceStreamConfig: | ||
| result = self._client.put("/v1/workspace/stream", {"enabled": enabled}) | ||
| return WorkspaceStreamConfig.model_validate(result) | ||
|
|
||
| def inherit(self) -> WorkspaceStreamConfig: | ||
| result = self._client.put("/v1/workspace/stream", {"mode": "inherit"}) | ||
| return WorkspaceStreamConfig.model_validate(result) | ||
|
Comment on lines
+31
to
+37
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔴 Python
Prompt for agentsWas this helpful? React with 👍 or 👎 to provide feedback. |
||
|
|
||
| def ensure_enabled(self) -> WorkspaceStreamConfig: | ||
| config = self.get() | ||
| if config.enabled: | ||
| return config | ||
| return self.set(True) | ||
|
|
||
|
|
||
| class _WorkspaceNamespace: | ||
| """Sync workspace operations.""" | ||
|
|
||
| def __init__(self, client: HttpClient) -> None: | ||
| self._client = client | ||
| self.stream = _WorkspaceStreamNamespace(client) | ||
|
|
||
| def info(self) -> Workspace: | ||
| result = self._client.get("/v1/workspace") | ||
|
|
@@ -172,11 +196,35 @@ def __exit__(self, *args: Any) -> None: | |
| # ── Async variants ──────────────────────────────────────────────── | ||
|
|
||
|
|
||
| class _AsyncWorkspaceStreamNamespace: | ||
| def __init__(self, client: AsyncHttpClient) -> None: | ||
| self._client = client | ||
|
|
||
| async def get(self) -> WorkspaceStreamConfig: | ||
| result = await self._client.get("/v1/workspace/stream") | ||
| return WorkspaceStreamConfig.model_validate(result) | ||
|
|
||
| async def set(self, enabled: bool) -> WorkspaceStreamConfig: | ||
| result = await self._client.put("/v1/workspace/stream", {"enabled": enabled}) | ||
| return WorkspaceStreamConfig.model_validate(result) | ||
|
|
||
| async def inherit(self) -> WorkspaceStreamConfig: | ||
| result = await self._client.put("/v1/workspace/stream", {"mode": "inherit"}) | ||
| return WorkspaceStreamConfig.model_validate(result) | ||
|
|
||
| async def ensure_enabled(self) -> WorkspaceStreamConfig: | ||
| config = await self.get() | ||
| if config.enabled: | ||
| return config | ||
| return await self.set(True) | ||
|
|
||
|
|
||
| class _AsyncWorkspaceNamespace: | ||
| """Async workspace operations.""" | ||
|
|
||
| def __init__(self, client: AsyncHttpClient) -> None: | ||
| self._client = client | ||
| self.stream = _AsyncWorkspaceStreamNamespace(client) | ||
|
|
||
| async def info(self) -> Workspace: | ||
| result = await self._client.get("/v1/workspace") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,7 @@ | ||
| //! Agent client for message and channel operations. | ||
|
|
||
| use crate::client::{ClientOptions, HttpClient, RequestOptions}; | ||
| use crate::error::Result; | ||
| use crate::error::{RelayError, Result}; | ||
| use crate::types::*; | ||
| use crate::ws::{EventReceiver, LifecycleReceiver, WsClient, WsClientOptions}; | ||
|
|
||
|
|
@@ -10,6 +10,13 @@ fn strip_hash(channel: &str) -> &str { | |
| channel.strip_prefix('#').unwrap_or(channel) | ||
| } | ||
|
|
||
| /// Outcome from idempotently ensuring a channel exists and is joined. | ||
| #[derive(Debug, Clone, Copy, PartialEq, Eq)] | ||
| pub struct EnsureChannelJoinedOutcome { | ||
| pub created: bool, | ||
| pub joined: bool, | ||
| } | ||
|
|
||
| /// Client for agent-level operations. | ||
| pub struct AgentClient { | ||
| client: HttpClient, | ||
|
|
@@ -576,6 +583,40 @@ impl AgentClient { | |
| .await | ||
| } | ||
|
|
||
| /// Ensure a channel exists and that the current agent is joined to it. | ||
| /// | ||
| /// Duplicate channel creation is treated as a successful no-op, and join | ||
| /// idempotence is derived from the join response's `already_member` field. | ||
| pub async fn ensure_channel_joined( | ||
| &self, | ||
| request: CreateChannelRequest, | ||
| ) -> Result<EnsureChannelJoinedOutcome> { | ||
| let channel_name = request.name.clone(); | ||
|
|
||
| let created = match self.create_channel(request).await { | ||
| Ok(_) => true, | ||
| Err(error) | ||
| if error.is_conflict() && error.code() == Some("channel_already_exists") => | ||
| { | ||
| false | ||
| } | ||
| Err(error) => return Err(error), | ||
| }; | ||
|
|
||
| let join_response = self.join_channel(&channel_name).await?; | ||
| let joined = join_response | ||
| .get("already_member") | ||
| .and_then(serde_json::Value::as_bool) | ||
| .map(|already_member| !already_member) | ||
| .ok_or_else(|| { | ||
| RelayError::InvalidResponse( | ||
| "channel join response missing already_member".into(), | ||
| ) | ||
| })?; | ||
|
|
||
| Ok(EnsureChannelJoinedOutcome { created, joined }) | ||
| } | ||
|
Comment on lines
+586
to
+618
|
||
|
|
||
| /// Leave a channel. | ||
| pub async fn leave_channel(&self, name: &str) -> Result<()> { | ||
| self.client | ||
|
|
@@ -845,3 +886,124 @@ impl AgentClient { | |
| self.client.get("/v1/files", query_ref, None).await | ||
| } | ||
| } | ||
|
|
||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::AgentClient; | ||
| use crate::{CreateChannelRequest, EnsureChannelJoinedOutcome}; | ||
| use httpmock::{Method::POST, MockServer, Then, When}; | ||
|
|
||
| fn client(base_url: &str) -> AgentClient { | ||
| AgentClient::new("at_live_test", Some(base_url.to_string())).expect("agent client") | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn ensure_channel_joined_creates_and_joins_when_missing() { | ||
| let server = MockServer::start(); | ||
| let create = server.mock(|when: When, then: Then| { | ||
| when.method(POST).path("/v1/channels"); | ||
| then.status(201).json_body_obj(&serde_json::json!({ | ||
| "ok": true, | ||
| "data": { | ||
| "id": "ch_1", | ||
| "workspace_id": "w_1", | ||
| "name": "general", | ||
| "channel_type": 0, | ||
| "topic": "General discussion", | ||
| "metadata": {}, | ||
| "created_by": "agent_1", | ||
| "created_at": "2026-03-26T00:00:00Z", | ||
| "is_archived": false | ||
| } | ||
| })); | ||
| }); | ||
| let join = server.mock(|when: When, then: Then| { | ||
| when.method(POST).path("/v1/channels/general/join"); | ||
| then.status(200).json_body_obj(&serde_json::json!({ | ||
| "ok": true, | ||
| "data": { | ||
| "channel": "general", | ||
| "agent_id": "agent_1", | ||
| "already_member": false | ||
| } | ||
| })); | ||
| }); | ||
|
|
||
| let agent = client(&server.base_url()); | ||
| let outcome = agent.ensure_channel_joined(CreateChannelRequest { | ||
| name: "general".into(), | ||
| topic: Some("General discussion".into()), | ||
| metadata: None, | ||
| }).await.expect("ensure joined succeeds"); | ||
|
|
||
| assert_eq!(outcome, EnsureChannelJoinedOutcome { created: true, joined: true }); | ||
| create.assert(); | ||
| join.assert(); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn ensure_channel_joined_treats_duplicate_create_and_already_member_as_success() { | ||
| let server = MockServer::start(); | ||
| let create = server.mock(|when: When, then: Then| { | ||
| when.method(POST).path("/v1/channels"); | ||
| then.status(409).json_body_obj(&serde_json::json!({ | ||
| "ok": false, | ||
| "error": {"code": "channel_already_exists", "message": "exists"} | ||
| })); | ||
| }); | ||
| let join = server.mock(|when: When, then: Then| { | ||
| when.method(POST).path("/v1/channels/general/join"); | ||
| then.status(200).json_body_obj(&serde_json::json!({ | ||
| "ok": true, | ||
| "data": { | ||
| "channel": "general", | ||
| "agent_id": "agent_1", | ||
| "already_member": true | ||
| } | ||
| })); | ||
| }); | ||
|
|
||
| let agent = client(&server.base_url()); | ||
| let outcome = agent.ensure_channel_joined(CreateChannelRequest { | ||
| name: "general".into(), | ||
| topic: None, | ||
| metadata: None, | ||
| }).await.expect("ensure joined succeeds"); | ||
|
|
||
| assert_eq!(outcome, EnsureChannelJoinedOutcome { created: false, joined: false }); | ||
| create.assert(); | ||
| join.assert(); | ||
| } | ||
|
Comment on lines
+945
to
+977
|
||
|
|
||
| #[tokio::test] | ||
| async fn ensure_channel_joined_does_not_swallow_join_conflicts() { | ||
| let server = MockServer::start(); | ||
| let create = server.mock(|when: When, then: Then| { | ||
| when.method(POST).path("/v1/channels"); | ||
| then.status(409).json_body_obj(&serde_json::json!({ | ||
| "ok": false, | ||
| "error": {"code": "channel_already_exists", "message": "exists"} | ||
| })); | ||
| }); | ||
| let join = server.mock(|when: When, then: Then| { | ||
| when.method(POST).path("/v1/channels/general/join"); | ||
| then.status(409).json_body_obj(&serde_json::json!({ | ||
| "ok": false, | ||
| "error": {"code": "already_joined", "message": "joined"} | ||
| })); | ||
| }); | ||
|
|
||
| let agent = client(&server.base_url()); | ||
| let error = agent.ensure_channel_joined(CreateChannelRequest { | ||
| name: "general".into(), | ||
| topic: None, | ||
| metadata: None, | ||
| }).await.expect_err("ensure joined should fail"); | ||
|
|
||
| assert!(error.is_conflict()); | ||
| assert_eq!(error.code(), Some("already_joined")); | ||
| create.assert(); | ||
| join.assert(); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ensure_joinedcatches broadExceptionand uses 409 as the join idempotency signal. The SDK’sHttpClientraisesRelayError, and the join endpoint is already idempotent (200 withalready_member). Consider catchingRelayErrorspecifically and using the join response’salready_memberto compute whether a join actually happened (and remove the join-side 409 swallow).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in ace6ab4. The sync Python helper now catches
RelayErrorspecifically on create, only tolerateschannel_already_exists, and computesjoinedfrom the join response payload instead of swallowing join-side 409s.