Skip to content

Commit cf17875

Browse files
George-iamclaude
andauthored
feat: SSE status filter + fix ResponseNotRead in streaming error path
* fix(sdk): increase SSE stream timeout to prevent ReadTimeout SSE streaming calls now use wait_seconds + 15s read timeout instead of the global 15s client timeout. Fixes agent.listen() crashing when the server holds the connection open for 30s keepalive cycles. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat: add status parameter to listen() for SSE stream filtering Server now excludes terminal statuses (COMPLETED, FAILED, CANCELED, TIMED_OUT) by default. SDK gains optional status parameter to override the server default when needed (e.g. for debugging or audit). Backward-compatible: existing agents that call listen() without status= benefit from server-side zombie filtering automatically. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: handle ResponseNotRead in SSE streaming error path When the SSE stream returns a 4xx/5xx, _raise_http_error accessed response.text inside a streaming context, causing httpx.ResponseNotRead. This crashed agents on transient 503s instead of allowing reconnect. Fix: call response.read() before accessing .text/.json(). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 8a13e12 commit cf17875

File tree

1 file changed

+36
-1
lines changed

1 file changed

+36
-1
lines changed

axme_sdk/client.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -832,6 +832,7 @@ def listen(
832832
since: int = 0,
833833
wait_seconds: int = 15,
834834
timeout_seconds: float | None = None,
835+
status: str | None = None,
835836
trace_id: str | None = None,
836837
) -> Iterator[dict[str, Any]]:
837838
"""Stream incoming intents for an agent address via SSE.
@@ -843,6 +844,10 @@ def listen(
843844
automatically reconnects until ``timeout_seconds`` elapses (or forever if
844845
``timeout_seconds`` is ``None``).
845846
847+
By default the server excludes terminal statuses (COMPLETED, FAILED,
848+
CANCELED, TIMED_OUT) so agents only receive actionable intents. Pass an
849+
explicit ``status`` filter to override this behaviour.
850+
846851
Args:
847852
address: Full ``agent://org/workspace/name`` or bare ``org/workspace/name``
848853
agent address to listen on.
@@ -854,6 +859,9 @@ def listen(
854859
new intents.
855860
timeout_seconds: Optional wall-clock timeout after which the method
856861
raises ``TimeoutError``. ``None`` means listen indefinitely.
862+
status: Comma-separated status filter passed to the server (e.g.
863+
``"CREATED,DELIVERED"``). When omitted the server applies its
864+
default filter (exclude terminal statuses).
857865
trace_id: Optional trace ID forwarded as ``X-Trace-Id``.
858866
859867
Yields:
@@ -895,6 +903,7 @@ def listen(
895903
path_part=path_part,
896904
since=next_since,
897905
wait_seconds=stream_wait_seconds,
906+
status=status,
898907
trace_id=trace_id,
899908
):
900909
seq = event.get("seq")
@@ -1670,11 +1679,18 @@ def _iter_intent_events_stream(
16701679
if normalized_trace_id is not None:
16711680
headers = {"X-Trace-Id": normalized_trace_id}
16721681

1682+
stream_timeout = httpx.Timeout(
1683+
connect=10.0,
1684+
read=float(wait_seconds) + 15.0,
1685+
write=10.0,
1686+
pool=10.0,
1687+
)
16731688
with self._http.stream(
16741689
"GET",
16751690
f"/v1/intents/{intent_id}/events/stream",
16761691
params={"since": str(since), "wait_seconds": str(wait_seconds)},
16771692
headers=headers,
1693+
timeout=stream_timeout,
16781694
) as response:
16791695
if response.status_code >= 400:
16801696
self._raise_http_error(response)
@@ -1710,18 +1726,31 @@ def _iter_agent_intents_stream(
17101726
path_part: str,
17111727
since: int,
17121728
wait_seconds: int,
1729+
status: str | None = None,
17131730
trace_id: str | None,
17141731
) -> Iterator[dict[str, Any]]:
17151732
headers: dict[str, str] | None = None
17161733
normalized_trace_id = self._normalize_trace_id(trace_id)
17171734
if normalized_trace_id is not None:
17181735
headers = {"X-Trace-Id": normalized_trace_id}
17191736

1737+
# SSE streams need a longer read timeout than regular API calls
1738+
# (server holds connection open for wait_seconds + processing time)
1739+
stream_timeout = httpx.Timeout(
1740+
connect=10.0,
1741+
read=float(wait_seconds) + 15.0,
1742+
write=10.0,
1743+
pool=10.0,
1744+
)
1745+
params: dict[str, str] = {"since": str(since), "wait_seconds": str(wait_seconds)}
1746+
if status:
1747+
params["status"] = status
17201748
with self._http.stream(
17211749
"GET",
17221750
f"/v1/agents/{path_part}/intents/stream",
1723-
params={"since": str(since), "wait_seconds": str(wait_seconds)},
1751+
params=params,
17241752
headers=headers,
1753+
timeout=stream_timeout,
17251754
) as response:
17261755
if response.status_code >= 400:
17271756
self._raise_http_error(response)
@@ -1876,6 +1905,12 @@ def _parse_json_response(self, response: httpx.Response) -> dict[str, Any]:
18761905
def _raise_http_error(self, response: httpx.Response) -> None:
18771906
body: Any | None
18781907
body = None
1908+
# response.read() is required when called inside a streaming context —
1909+
# without it, accessing .text or .json() raises httpx.ResponseNotRead.
1910+
try:
1911+
response.read()
1912+
except Exception:
1913+
pass
18791914
message = response.text
18801915
try:
18811916
body = response.json()

0 commit comments

Comments
 (0)