Add WebSocket transport for OpenAI Responses API streaming #2186
dgageot wants to merge 4 commits into docker:main
Introduce an optional WebSocket transport as an alternative to SSE for
the OpenAI Responses API. Users can enable it via `provider_opts`:

```yaml
provider_opts:
  transport: websocket
```
Key changes:
- Add responseEventStream interface to abstract SSE and WebSocket transports
- Refactor ResponseStreamAdapter to accept any responseEventStream
- Implement wsStream (WebSocket transport) and wsPool (connection pool
with 55-min TTL, auto-reconnect, and lastResponseID tracking)
- Integrate WebSocket path in CreateResponseStream with automatic SSE
fallback on connection failure
- No new dependencies (reuses existing gorilla/websocket)
The existing ResponseStreamAdapter.Recv() logic is fully reused since
WebSocket events use the same JSON schema as SSE events.
Assisted-By: docker-agent
Assessment: 🔴 CRITICAL
Found 2 critical and 4 notable issues in the WebSocket transport implementation.
Critical Issues:
- Resource leak: wsPool never closed
- Broken connections reused from pool
Notable Issues:
- Server-side caching broken on reconnection
- Infinite retry loop on auth failures
- Architecture inconsistency with environment.Provider
- Missing CloseNoStatusReceived handling
The core WebSocket functionality works, but these issues need addressing before production use.
```go
// wsPool is lazily initialized when transport=websocket is configured.
// It maintains a persistent WebSocket connection across requests.
wsPool *wsPool
```
🔴 CRITICAL: Resource Leak - WebSocket connections never closed
The wsPool field is initialized in createWebSocketStream() but the Client struct has no Close() method. This means WebSocket connections are never properly closed and will leak.
Impact:
- Long-lived connections (55-min TTL) accumulate over time
- TCP connections remain open until timeout
- Multiple Client instances multiply the leak
Recommendation:
Add a Close() method to the Client struct that calls wsPool.Close() if the pool exists.
pkg/model/provider/openai/ws_pool.go
```go
}
p.conn = &wsConnection{
	conn:      stream.conn,
	createdAt: time.Now(),
```
🟡 Performance Issue: Server-side caching broken on reconnection
When a connection breaks and reconnects (line 120), the new wsConnection is created with lastResponseID set to empty string, losing the previous response context.
Impact:
- OpenAI's server-side caching cannot resume after reconnection
- Defeats the performance benefit of WebSocket over SSE
Recommendation:
Preserve lastResponseID from the old connection before discarding it:
```go
oldResponseID := ""
if p.conn != nil {
	oldResponseID = p.conn.lastResponseID
}
// ... reconnect ...
p.conn.lastResponseID = oldResponseID
```
pkg/model/provider/openai/ws_pool.go
```go
_ = p.conn.conn.Close()
p.conn = nil
// ...
headers, err2 := p.headerFn(ctx)
```
🟡 Error Handling: Infinite retry loop on headerFn failure
If headerFn(ctx) fails consistently (e.g., invalid token_key), the pool will retry indefinitely with no backoff or circuit breaker.
Impact:
- Wasted CPU and network resources
- Poor user experience (repeated failures instead of fast-fail)
Recommendation:
Track consecutive header failures and return early after N attempts, or cache the error for a short duration.
```go
// Close releases the stream but keeps the underlying connection alive in
// the pool for reuse.
func (s *pooledStream) Close() error {
```
🔴 CRITICAL: Broken connections reused, causing cascading failures
pooledStream.Close() deliberately keeps the connection in the pool (line 199 comment), but if the stream encountered an error before completion, the broken connection will be reused by the next request.
Impact:
- Next Stream() call will fail immediately
- Triggers unnecessary reconnection overhead
- Can cause request failures if reconnection also fails
Recommendation:
Check s.inner.Err() before closing. If non-nil, invalidate the connection:
```go
if s.inner.Err() != nil {
	s.pool.mu.Lock()
	if s.pool.conn != nil && s.pool.conn.conn == s.inner.conn {
		_ = s.pool.conn.conn.Close()
		s.pool.conn = nil
	}
	s.pool.mu.Unlock()
}
```
pkg/model/provider/openai/client.go
```go
}
if apiKey == "" {
	// Fall back to the standard OPENAI_API_KEY env var.
	apiKey = os.Getenv("OPENAI_API_KEY")
```
🟡 Architecture: Bypasses environment.Provider abstraction
The fallback to os.Getenv("OPENAI_API_KEY") bypasses the environment.Provider abstraction, which supports Docker Desktop keychain, env files, and sandboxed tokens.
Impact:
- Inconsistent credential resolution
- Misses security features of environment.Provider
Recommendation:
Remove the os.Getenv fallback and rely solely on c.Env.Get(). If OPENAI_API_KEY should be supported, register it in the environment.Provider configuration.
```go
_, data, err := s.conn.ReadMessage()
if err != nil {
	if websocket.IsCloseError(err,
		websocket.CloseNormalClosure,
```
🟡 Error Handling: Valid close condition treated as error
The code only checks for CloseNormalClosure and CloseGoingAway, but not CloseNoStatusReceived (code 1005), which is common when servers timeout idle connections.
Impact:
- Unnecessary error logging for normal server-side cleanup
- May confuse debugging
Recommendation:
Add CloseNoStatusReceived to the close error check:
```go
if websocket.IsCloseError(err,
	websocket.CloseNormalClosure,
	websocket.CloseGoingAway,
	websocket.CloseNoStatusReceived,
) {
```

- Add Client.Close() to release pooled WebSocket connections
- Invalidate broken connections in pooledStream.Close() instead of
returning dead sockets to the pool
- Preserve lastResponseID across reconnections (expired + broken) so
server-side context caching survives connection resets
- Add wsMaxReconnectAttempts constant with bounded retry loop to prevent
unbounded reconnection attempts
- Replace os.Getenv("OPENAI_API_KEY") with c.Env.Get() for consistent
secret resolution via the environment provider
- Treat websocket.CloseNoStatusReceived as a normal close condition
Assisted-By: docker-agent
- Promote lastResponseID from wsConnection to wsPool so it naturally survives all connection transitions without manual threading
- Extract closeLocked(), dialLocked(), invalidateConn() helpers to eliminate duplicated connection lifecycle logic in Stream()
- Replace loop-of-one reconnect with a single dialLocked() call
- Extract sendResponseCreate() to deduplicate marshal+send between dialWebSocket() and sendOnExisting()
- Remove wsMaxReconnectAttempts constant (was always 1)
- Simplify wsConnection struct to just conn + createdAt

Net result: -18 lines, fewer code paths, same behavior.

Assisted-By: docker-agent
- Initialize wsPool eagerly in NewClient instead of lazily in createWebSocketStream to eliminate a potential data race when concurrent goroutines both see wsPool==nil
- Downgrade WebSocket→SSE fallback log from Error to Warn since this is an intentional graceful degradation, not an unexpected error
- Close HTTP response body defensively in dialWebSocket on handshake failure to prevent a potential resource leak

Assisted-By: docker-agent