diff --git a/README.md b/README.md index a308495..f70469f 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,7 @@ standard library. | [`conditions`](./conditions) | Conditional- and range-request value types (ETag, Range, Conditions). | | [`config`](./config) | Layered override → environment → default settings resolver; non-failing typed getters. | | [`serde`](./serde) | Serialization seam (Marshaler/Unmarshaler) with a JSON default, plus Tristate for PATCH payloads. | -| [`sse`](./sse) | Server-Sent Events (text/event-stream) WHATWG parser + reconnecting Stream (Last-Event-ID replay). | +| [`sse`](./sse) | Server-Sent Events (text/event-stream) WHATWG parser + reconnecting Stream (Last-Event-ID replay); `Client.EventStream` runs it through the pipeline. | | [`jsonl`](./jsonl) | JSON Lines / NDJSON streaming decoder (`iter.Seq2`). | | [`webhook`](./webhook) | Inbound webhook signature verification (constant-time HMAC + timestamp tolerance). | | [`formdata`](./formdata) | Multipart/form-data request body builder (replayable; file uploads). | @@ -118,6 +118,26 @@ stripped and query values are redacted unless allowlisted with `DEXPACE_RETRY_BASE_DELAY`, `DEXPACE_HTTP_TIMEOUT` (default transport only) — for settings not set explicitly; explicit options always win. +### Streaming + +`Client.EventStream(ctx, req, opts...)` returns an `iter.Seq2[sse.Event, error]` +reconnecting Server-Sent Events stream that runs through the pipeline — every +connection clones `req`, sets `Accept: text/event-stream`, and replays the +`Last-Event-ID` after a mid-stream interruption, so auth, logging, and tracing +run per connection. A non-2xx response or a transport failure on connect ends +the stream with that error; cancel the request context to stop. + +```go +req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "https://api.example.com/v1/events", nil) +for ev, err := range client.EventStream(ctx, req) { + if err != nil { + log.Printf("stream ended: %v", err) + break + } + fmt.Println(ev.Data) +} +``` + ## Requirements Go **1.26+**. The module targets modern idioms: generics, range-over-func diff --git a/doc.go b/doc.go index 1cfbdbd..659bc22 100644 --- a/doc.go +++ b/doc.go @@ -65,7 +65,8 @@ // // The sse package parses Server-Sent Events (text/event-stream) into a // range-over-func iterator of events, with a reconnecting Stream that replays the -// Last-Event-ID. +// Last-Event-ID. Client.EventStream wires that reconnecting stream through the +// pipeline, so every connection runs the configured policies. // // The webhook package verifies inbound webhook signatures (constant-time HMAC // with a timestamp-tolerance window). diff --git a/docs/superpowers/specs/2026-06-16-client-eventstream-design.md b/docs/superpowers/specs/2026-06-16-client-eventstream-design.md new file mode 100644 index 0000000..9322dbf --- /dev/null +++ b/docs/superpowers/specs/2026-06-16-client-eventstream-design.md @@ -0,0 +1,93 @@ +# Client-integrated SSE stream — design + +**Date:** 2026-06-16 +**Status:** Approved (standing delegation); ready for implementation +**Subsystem:** deferred-feature #6 (the `dexpace.Client`-integrated SSE client noted as out-of-scope in the SSE reconnect spec) + +## Context + +`sse.Stream(ctx, ConnectFunc, …)` reconnects, replays `Last-Event-ID`, and honours +server `retry` backoff, but the caller must wire the HTTP themselves via a +`ConnectFunc`. The common case — "stream this endpoint through my configured +client" — should be one call. This adds `Client.EventStream`, which builds the +`ConnectFunc` from the client pipeline so events flow through auth, logging, +tracing, retry, and the rest exactly like any other request. + +## Decisions + +1. **A method on `Client`, not a new type.** `EventStream(ctx, req, opts...) + iter.Seq2[sse.Event, error]` mirrors `Client.Do` (caller supplies a standard + `*http.Request`) and returns the same iterator shape as `sse.Parse`/`Stream`. +2. **Each (re)connect clones the request** with `req.Clone(ctx)`, sets + `Accept: text/event-stream` (unless the caller set one), adds the + `Last-Event-ID` header once an id has been seen, and sends it through `c.Do` — + so the full policy stack runs per connection (token refresh, per-connect logs). +3. **Connect status check.** A non-2xx response ends the stream with an error + (`sse.Stream` treats connect errors as terminal); the body is drained and + closed first. A transport error from `c.Do` is returned as-is. +4. **Replayable bodies.** SSE is normally `GET` (no body). When the request has a + body, the clone resets it via `req.GetBody` so reconnects re-send it; a + non-replayable body surfaces an error on the second connect. +5. **`opts ...sse.StreamOption` pass through** to `sse.Stream` (e.g. + `WithReconnectDelay`). + +## Architecture + +### `header` addition +`header.LastEventID = "Last-Event-Id"` (canonical form of `Last-Event-ID`). + +### `eventstream.go` (package `dexpace`) + +```go +// EventStream opens a reconnecting Server-Sent Events stream for req and yields +// decoded events through the client pipeline. Each connection clones req, sets +// Accept: text/event-stream (unless already set) and, after the first event id, +// the Last-Event-ID header, then sends it via Do. A non-2xx response or a +// transport failure on connect ends the stream with that error; a mid-stream +// interruption reconnects transparently with the most recent event id. Cancel the +// request context to stop. The iterator is single-pass. +func (c *Client) EventStream(ctx context.Context, req *http.Request, opts ...sse.StreamOption) iter.Seq2[sse.Event, error] +``` + +The `ConnectFunc` clones `req` with the connect ctx, resets a replayable body via +`req.GetBody`, sets `Accept` and `Last-Event-ID`, calls `c.Do`, closes the body and +returns an error on a transport failure or non-2xx status, else returns +`resp.Body`. + +## Edge cases + +- Caller-set `Accept` is preserved (some servers want a custom accept). +- Non-2xx connect → drained, closed, terminal error with the status. +- Transport error → returned; with the default retry policy, transient connect + failures are already retried inside `c.Do` before `sse.Stream` sees them. +- Body present but non-replayable → error on reconnect (documented; SSE is + normally bodyless). +- Context cancel → `sse.Stream` stops without a further connect (existing + behaviour). + +## Package layout + +| Path | Change | +|---|---| +| `header/header.go` (modify) | add `LastEventID` | +| `eventstream.go` (new, package dexpace) | `Client.EventStream` | +| `eventstream_test.go` (new, `dexpace_test`) | flatten/reconnect, Last-Event-ID, Accept, non-2xx | +| `doc.go`, `README.md` | document | + +## Testing + +- **Flatten + reconnect**: a stub transporter returns two SSE bodies then a + terminal error, with `WithReconnectDelay(0)`; assert events `a,b,c` in order + then the error. +- **Last-Event-ID replay**: events carry ids; assert the header sent on the second + connect equals the last dispatched id. +- **Accept header**: every connect carries `Accept: text/event-stream`. +- **Non-2xx connect**: a 503 first response yields a terminal error and no events. +- Retry disabled in tests (`WithRetry(retry.Options{MaxRetries: -1})`) for + deterministic connect counts. Table-driven where natural, parallel; stdlib-only; + `gofmt`/`go vet`/`go test -race` clean. + +## Out of scope (deferred) + +- A typed event-decoding helper (callers decode `Event.Data` with `serde`/`jsonl`). +- Automatic `GET`-forcing or body stripping — the caller controls the request. diff --git a/eventstream.go b/eventstream.go new file mode 100644 index 0000000..126fdd5 --- /dev/null +++ b/eventstream.go @@ -0,0 +1,63 @@ +// Copyright (c) 2026 dexpace and Omar Aljarrah. +// Licensed under the MIT License. See LICENSE in the repository root for details. + +package dexpace + +import ( + "context" + "fmt" + "io" + "iter" + "net/http" + + "github.com/dexpace/go-sdk/header" + "github.com/dexpace/go-sdk/mediatype" + "github.com/dexpace/go-sdk/sse" +) + +// EventStream opens a reconnecting Server-Sent Events stream for req and yields +// decoded events through the client pipeline. Each connection clones req, sets +// Accept: text/event-stream (unless the caller already set Accept) and, after the +// first event id is seen, the Last-Event-ID header, then sends it through Do — so +// auth, logging, tracing, and the rest run per connection. +// +// A non-2xx response or a transport failure on connect ends the stream with that +// error (a connect error is terminal). A mid-stream interruption reconnects +// transparently with the most recent event id. When req carries a body it must be +// replayable (req.GetBody set, as net/http does for in-memory bodies) so reconnects +// can re-send it; SSE is normally a bodyless GET. Cancel the request context to +// stop. The iterator is single-pass. Pass sse.StreamOption values (for example +// sse.WithReconnectDelay) to configure reconnection. +func (c *Client) EventStream(ctx context.Context, req *http.Request, opts ...sse.StreamOption) iter.Seq2[sse.Event, error] { + connect := func(ctx context.Context, lastEventID string) (io.ReadCloser, error) { + r := req.Clone(ctx) + if req.Body != nil && req.GetBody != nil { + body, err := req.GetBody() + if err != nil { + return nil, fmt.Errorf("dexpace: rewind event-stream request body: %w", err) + } + r.Body = body + } + if r.Header.Get(header.Accept) == "" { + r.Header.Set(header.Accept, mediatype.TextEventStream.Essence()) + } + if lastEventID != "" { + r.Header.Set(header.LastEventID, lastEventID) + } + + resp, err := c.Do(r) + if err != nil { + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } + return nil, err + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + _, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 4096)) + _ = resp.Body.Close() + return nil, fmt.Errorf("dexpace: event stream connect: unexpected status %s", resp.Status) + } + return resp.Body, nil + } + return sse.Stream(ctx, connect, opts...) +} diff --git a/eventstream_test.go b/eventstream_test.go new file mode 100644 index 0000000..1d9bae8 --- /dev/null +++ b/eventstream_test.go @@ -0,0 +1,153 @@ +// Copyright (c) 2026 dexpace and Omar Aljarrah. +// Licensed under the MIT License. See LICENSE in the repository root for details. + +package dexpace_test + +import ( + "context" + "errors" + "io" + "net/http" + "strings" + "sync" + "testing" + + dexpace "github.com/dexpace/go-sdk" + "github.com/dexpace/go-sdk/retry" + "github.com/dexpace/go-sdk/sse" +) + +// sseStub is a stub Transporter that returns scripted responses and records the +// Last-Event-Id and Accept headers seen on each connect. +type sseStub struct { + mu sync.Mutex + calls int + lastEventIDs []string + accepts []string + responses []func() (*http.Response, error) +} + +func (s *sseStub) Do(req *http.Request) (*http.Response, error) { + s.mu.Lock() + defer s.mu.Unlock() + s.lastEventIDs = append(s.lastEventIDs, req.Header.Get("Last-Event-Id")) + s.accepts = append(s.accepts, req.Header.Get("Accept")) + i := s.calls + s.calls++ + if i < len(s.responses) { + return s.responses[i]() + } + return nil, errors.New("sseStub: no more scripted responses") +} + +func sseBody(status int, body string) func() (*http.Response, error) { + return func() (*http.Response, error) { + return &http.Response{ + StatusCode: status, + Status: http.StatusText(status), + Header: make(http.Header), + Body: io.NopCloser(strings.NewReader(body)), + }, nil + } +} + +func newEventStreamClient(stub *sseStub) *dexpace.Client { + return dexpace.New( + dexpace.WithTransport(stub), + dexpace.WithRetry(retry.Options{MaxRetries: -1}), // deterministic connect counts + ) +} + +func TestEventStreamReconnectAndFlatten(t *testing.T) { + t.Parallel() + stub := &sseStub{responses: []func() (*http.Response, error){ + sseBody(200, "id: 1\ndata: a\n\nid: 2\ndata: b\n\n"), + sseBody(200, "id: 3\ndata: c\n\n"), + func() (*http.Response, error) { return nil, errors.New("stop") }, + }} + client := newEventStreamClient(stub) + + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, "https://example.test/stream", nil) + if err != nil { + t.Fatal(err) + } + + var data []string + var gotErr error + for ev, err := range client.EventStream(context.Background(), req, sse.WithReconnectDelay(0)) { + if err != nil { + gotErr = err + break + } + data = append(data, ev.Data) + } + if strings.Join(data, ",") != "a,b,c" { + t.Fatalf("events = %v, want [a b c]", data) + } + if gotErr == nil || !strings.Contains(gotErr.Error(), "stop") { + t.Fatalf("final error = %v, want the terminal connect error", gotErr) + } +} + +func TestEventStreamLastEventIDAndAccept(t *testing.T) { + t.Parallel() + stub := &sseStub{responses: []func() (*http.Response, error){ + sseBody(200, "id: 1\ndata: a\n\nid: 2\ndata: b\n\n"), + sseBody(200, "id: 3\ndata: c\n\n"), + func() (*http.Response, error) { return nil, errors.New("stop") }, + }} + client := newEventStreamClient(stub) + req, _ := http.NewRequestWithContext(context.Background(), http.MethodGet, "https://example.test/stream", nil) + + for _, err := range client.EventStream(context.Background(), req, sse.WithReconnectDelay(0)) { + if err != nil { + break + } + } + + stub.mu.Lock() + defer stub.mu.Unlock() + if len(stub.lastEventIDs) < 3 { + t.Fatalf("got %d connects, want >= 3", len(stub.lastEventIDs)) + } + if stub.lastEventIDs[0] != "" { + t.Fatalf("first connect Last-Event-Id = %q, want empty", stub.lastEventIDs[0]) + } + if stub.lastEventIDs[1] != "2" { + t.Fatalf("second connect Last-Event-Id = %q, want 2", stub.lastEventIDs[1]) + } + if stub.lastEventIDs[2] != "3" { + t.Fatalf("third connect Last-Event-Id = %q, want 3", stub.lastEventIDs[2]) + } + for i, a := range stub.accepts { + if a != "text/event-stream" { + t.Fatalf("connect %d Accept = %q, want text/event-stream", i, a) + } + } +} + +func TestEventStreamNon2xxIsTerminal(t *testing.T) { + t.Parallel() + stub := &sseStub{responses: []func() (*http.Response, error){ + sseBody(503, "unavailable"), + }} + client := newEventStreamClient(stub) + req, _ := http.NewRequestWithContext(context.Background(), http.MethodGet, "https://example.test/stream", nil) + + var events int + var gotErr error + for ev, err := range client.EventStream(context.Background(), req, sse.WithReconnectDelay(0)) { + if err != nil { + gotErr = err + break + } + _ = ev + events++ + } + if events != 0 { + t.Fatalf("got %d events, want 0 on a non-2xx connect", events) + } + if gotErr == nil { + t.Fatal("want a terminal error on a non-2xx connect") + } +} diff --git a/header/header.go b/header/header.go index 901162d..6427a8d 100644 --- a/header/header.go +++ b/header/header.go @@ -25,6 +25,7 @@ const ( IfModifiedSince = "If-Modified-Since" IfNoneMatch = "If-None-Match" IfUnmodifiedSince = "If-Unmodified-Since" + LastEventID = "Last-Event-Id" // canonical form of "Last-Event-ID" Location = "Location" Range = "Range" RetryAfter = "Retry-After"