Add WebSocket transport for OpenAI Responses API streaming#2186

Open
dgageot wants to merge 4 commits into docker:main from dgageot:ws

Conversation

@dgageot dgageot commented Mar 19, 2026

Introduce an optional WebSocket transport as an alternative to SSE for the OpenAI Responses API. Users can enable it via provider_opts:

models:
  gpt-ws:
    provider: openai
    model: gpt-4.1
    provider_opts:
      transport: websocket  # Use WebSocket instead of SSE

Key changes:

  • Add responseEventStream interface to abstract SSE and WebSocket transports
  • Refactor ResponseStreamAdapter to accept any responseEventStream
  • Implement wsStream (WebSocket transport) and wsPool (connection pool with 55-min TTL, auto-reconnect, and lastResponseID tracking)
  • Integrate WebSocket path in CreateResponseStream with automatic SSE fallback on connection failure
  • No new dependencies (reuses existing gorilla/websocket)

The existing ResponseStreamAdapter.Recv() logic is fully reused since WebSocket events use the same JSON schema as SSE events.

Assisted-By: docker-agent

@dgageot dgageot requested a review from a team as a code owner March 19, 2026 20:09

@docker-agent docker-agent bot left a comment

Assessment: 🔴 CRITICAL

Found 2 critical and 4 notable issues in the WebSocket transport implementation.

Critical Issues:

  • Resource leak: wsPool never closed
  • Broken connections reused from pool

Notable Issues:

  • Server-side caching broken on reconnection
  • Infinite retry loop on auth failures
  • Architecture inconsistency with environment.Provider
  • Missing CloseNoStatusReceived handling

The core WebSocket functionality works, but these issues need addressing before production use.


// wsPool is lazily initialized when transport=websocket is configured.
// It maintains a persistent WebSocket connection across requests.
wsPool *wsPool

🔴 CRITICAL: Resource Leak - WebSocket connections never closed

The wsPool field is initialized in createWebSocketStream() but the Client struct has no Close() method. This means WebSocket connections are never properly closed and will leak.

Impact:

  • Long-lived connections (55-min TTL) accumulate over time
  • TCP connections remain open until timeout
  • Multiple Client instances multiply the leak

Recommendation:
Add a Close() method to the Client struct that calls wsPool.Close() if the pool exists.

}
p.conn = &wsConnection{
conn: stream.conn,
createdAt: time.Now(),

🟡 Performance Issue: Server-side caching broken on reconnection

When a connection breaks and reconnects (line 120), the new wsConnection is created with lastResponseID set to empty string, losing the previous response context.

Impact:

  • OpenAI's server-side caching cannot resume after reconnection
  • Defeats the performance benefit of WebSocket over SSE

Recommendation:
Preserve lastResponseID from the old connection before discarding it:

oldResponseID := ""
if p.conn != nil {
    oldResponseID = p.conn.lastResponseID
}
// ... reconnect ...
p.conn.lastResponseID = oldResponseID

_ = p.conn.conn.Close()
p.conn = nil

headers, err2 := p.headerFn(ctx)

🟡 Error Handling: Infinite retry loop on headerFn failure

If headerFn(ctx) fails consistently (e.g., invalid token_key), the pool will retry indefinitely with no backoff or circuit breaker.

Impact:

  • Wasted CPU and network resources
  • Poor user experience (repeated failures instead of fast-fail)

Recommendation:
Track consecutive header failures and return early after N attempts, or cache the error for a short duration.


// Close releases the stream but keeps the underlying connection alive in
// the pool for reuse.
func (s *pooledStream) Close() error {

🔴 CRITICAL: Broken connections reused, causing cascading failures

pooledStream.Close() deliberately keeps the connection in the pool (line 199 comment), but if the stream encountered an error before completion, the broken connection will be reused by the next request.

Impact:

  • Next Stream() call will fail immediately
  • Triggers unnecessary reconnection overhead
  • Can cause request failures if reconnection also fails

Recommendation:
Check s.inner.Err() before closing. If non-nil, invalidate the connection:

if s.inner.Err() != nil {
    s.pool.mu.Lock()
    if s.pool.conn != nil && s.pool.conn.conn == s.inner.conn {
        _ = s.pool.conn.conn.Close()
        s.pool.conn = nil
    }
    s.pool.mu.Unlock()
}

}
if apiKey == "" {
// Fall back to the standard OPENAI_API_KEY env var.
apiKey = os.Getenv("OPENAI_API_KEY")

🟡 Architecture: Bypasses environment.Provider abstraction

The fallback to os.Getenv("OPENAI_API_KEY") bypasses the environment.Provider abstraction, which supports Docker Desktop keychain, env files, and sandboxed tokens.

Impact:

  • Inconsistent credential resolution
  • Misses security features of environment.Provider

Recommendation:
Remove the os.Getenv fallback and rely solely on c.Env.Get(). If OPENAI_API_KEY should be supported, register it in the environment.Provider configuration.

_, data, err := s.conn.ReadMessage()
if err != nil {
if websocket.IsCloseError(err,
websocket.CloseNormalClosure,

🟡 Error Handling: Valid close condition treated as error

The code only checks for CloseNormalClosure and CloseGoingAway, but not CloseNoStatusReceived (code 1005), which is common when servers time out idle connections.

Impact:

  • Unnecessary error logging for normal server-side cleanup
  • May confuse debugging

Recommendation:
Add CloseNoStatusReceived to the close error check:

if websocket.IsCloseError(err,
    websocket.CloseNormalClosure,
    websocket.CloseGoingAway,
    websocket.CloseNoStatusReceived,
) {

dgageot added 3 commits March 19, 2026 22:50
- Add Client.Close() to release pooled WebSocket connections
- Invalidate broken connections in pooledStream.Close() instead of
  returning dead sockets to the pool
- Preserve lastResponseID across reconnections (expired + broken) so
  server-side context caching survives connection resets
- Add wsMaxReconnectAttempts constant with bounded retry loop to prevent
  unbounded reconnection attempts
- Replace os.Getenv("OPENAI_API_KEY") with c.Env.Get() for consistent
  secret resolution via the environment provider
- Treat websocket.CloseNoStatusReceived as a normal close condition

Assisted-By: docker-agent

- Promote lastResponseID from wsConnection to wsPool so it naturally
  survives all connection transitions without manual threading
- Extract closeLocked(), dialLocked(), invalidateConn() helpers to
  eliminate duplicated connection lifecycle logic in Stream()
- Replace loop-of-one reconnect with a single dialLocked() call
- Extract sendResponseCreate() to deduplicate marshal+send between
  dialWebSocket() and sendOnExisting()
- Remove wsMaxReconnectAttempts constant (was always 1)
- Simplify wsConnection struct to just conn + createdAt

Net result: -18 lines, fewer code paths, same behavior.

Assisted-By: docker-agent

- Initialize wsPool eagerly in NewClient instead of lazily in
  createWebSocketStream to eliminate a potential data race when
  concurrent goroutines both see wsPool==nil
- Downgrade WebSocket→SSE fallback log from Error to Warn since this
  is an intentional graceful degradation, not an unexpected error
- Close HTTP response body defensively in dialWebSocket on handshake
  failure to prevent a potential resource leak

Assisted-By: docker-agent