2 changes: 1 addition & 1 deletion agent-schema.json
@@ -546,7 +546,7 @@
},
"provider_opts": {
"type": "object",
"description": "Provider-specific options. dmr: runtime_flags. anthropic/amazon-bedrock (Claude): interleaved_thinking (boolean, default true). openai/anthropic/google: rerank_prompt (string) to fully override the system prompt used for RAG reranking (advanced - prefer using results.reranking.criteria for domain-specific guidance).",
"description": "Provider-specific options. dmr: runtime_flags. anthropic/amazon-bedrock (Claude): interleaved_thinking (boolean, default true). openai: transport ('sse' or 'websocket') to choose between SSE and WebSocket streaming for the Responses API. openai/anthropic/google: rerank_prompt (string) to fully override the system prompt used for RAG reranking (advanced - prefer using results.reranking.criteria for domain-specific guidance).",
"additionalProperties": true
},
"track_usage": {
30 changes: 30 additions & 0 deletions docs/providers/openai/index.md
@@ -77,3 +77,33 @@ models:
    model: gpt-4o
    base_url: https://your-proxy.example.com/v1
```

## WebSocket Transport

For OpenAI Responses API models (gpt-4.1+, o-series, gpt-5), you can use WebSocket streaming instead of the default SSE (Server-Sent Events):

```yaml
models:
  fast-gpt:
    provider: openai
    model: gpt-4.1
    provider_opts:
      transport: websocket # Use WebSocket instead of SSE
```

### Benefits

- **~40% faster** for workflows with 20+ tool calls
- **Persistent connection** reduces per-turn overhead
- **Server-side caching** of connection state
- **Automatic fallback** to SSE if WebSocket fails

### Requirements

- Only works with Responses API models: `gpt-4.1+`, `o1`, `o3`, `o4`, `gpt-5`
- NOT compatible with `--gateway` flag (automatically falls back to SSE)
- Requires `OPENAI_API_KEY` environment variable

### Example

See [`examples/websocket_transport.yaml`]({{ '/examples/websocket_transport/' | relative_url }}) for a complete example.
42 changes: 42 additions & 0 deletions examples/websocket_transport.yaml
@@ -0,0 +1,42 @@
#!/usr/bin/env docker agent run

# Example: WebSocket Transport for OpenAI Responses API
#
# This example demonstrates how to use WebSocket streaming instead of
# Server-Sent Events (SSE) for the OpenAI Responses API.
#
# WebSocket transport maintains a persistent connection across tool-call
# rounds, reducing per-turn overhead and improving end-to-end latency
# for agentic workflows with many tool calls.
#
# Benefits of WebSocket over SSE:
# - ~40% faster end-to-end execution for workflows with 20+ tool calls
# - Persistent connection reduces per-turn continuation overhead
# - Connection-local state caching on the server
# - Falls back to SSE automatically if WebSocket connection fails
#
# Requirements:
# - Works only with OpenAI Responses API models (gpt-4.1+, o-series, gpt-5)
# - Requires OPENAI_API_KEY environment variable (or use token_key)
# - NOT compatible with --gateway flag (automatically falls back to SSE)
#
# Run with:
# docker agent run websocket_transport.yaml

models:
  gpt-ws:
    provider: openai
    model: gpt-4.1
    provider_opts:
      transport: websocket # Use WebSocket instead of SSE

agents:
  root:
    model: gpt-ws
    description: Assistant using WebSocket streaming
    instruction: |
      You are a helpful assistant. Answer questions concisely.
    toolsets:
      - type: shell # Real toolset for demonstrating multi-turn tool calls

commands:
  demo: "List the files in the current directory, then count how many are YAML files"
114 changes: 103 additions & 11 deletions pkg/model/provider/openai/client.go
@@ -7,6 +7,7 @@ import (
	"errors"
	"fmt"
	"log/slog"
	"net/http"
	"net/url"
	"strings"

@@ -29,11 +30,15 @@ import (
	"github.com/docker/docker-agent/pkg/tools"
)

-// Client represents an OpenAI client wrapper
-// It implements the provider.Provider interface
+// Client represents an OpenAI client wrapper.
+// It implements the provider.Provider interface.
type Client struct {
	base.Config
	clientFn func(context.Context) (*openai.Client, error)

	// wsPool is initialized in NewClient when transport=websocket is configured.
	// It maintains a persistent WebSocket connection across requests.
	wsPool *wsPool
🔴 CRITICAL: Resource Leak - WebSocket connections never closed

The wsPool field is initialized in createWebSocketStream() but the Client struct has no Close() method. This means WebSocket connections are never properly closed and will leak.

Impact:

- Long-lived connections (55-min TTL) accumulate over time
- TCP connections remain open until timeout
- Multiple Client instances multiply the leak

Recommendation:
Add a Close() method to the Client struct that calls wsPool.Close() if the pool exists.

}

// NewClient creates a new OpenAI client from the provided configuration
@@ -139,14 +144,32 @@ func NewClient(ctx context.Context, cfg *latest.ModelConfig, env environment.Pro

slog.Debug("OpenAI client created successfully", "model", cfg.Model)

return &Client{
client := &Client{
Config: base.Config{
ModelConfig: *cfg,
ModelOptions: globalOptions,
Env: env,
},
clientFn: clientFn,
}, nil
}

// Pre-create the WebSocket pool when the transport is configured.
// The pool is cheap (no connections opened until the first Stream call)
// and eager init avoids a data race on the lazy path.
if getTransport(cfg) == "websocket" && globalOptions.Gateway() == "" {
baseURL := cmp.Or(cfg.BaseURL, "https://api.openai.com/v1")
client.wsPool = newWSPool(httpToWSURL(baseURL), client.buildWSHeaderFn())
}

return client, nil
}

// Close releases resources held by the client, including any pooled WebSocket
// connections. It is safe to call Close multiple times.
func (c *Client) Close() {
if c.wsPool != nil {
c.wsPool.Close()
}
}

// convertMessages converts chat.Message to openai.ChatCompletionMessageParamUnion
@@ -306,12 +329,6 @@ func (c *Client) CreateResponseStream(
		return nil, errors.New("at least one message is required")
	}

-	client, err := c.clientFn(ctx)
-	if err != nil {
-		slog.Error("Failed to create OpenAI client", "error", err)
-		return nil, err
-	}
-
	input := convertMessagesToResponseInput(messages)

	params := responses.ResponseNewParams{
@@ -397,10 +414,85 @@ func (c *Client) CreateResponseStream(
slog.Error("Failed to marshal OpenAI responses request to JSON", "error", err)
}

// Choose transport: WebSocket or SSE (default).
// WebSocket is disabled when using a Gateway since most gateways don't support it.
transport := getTransport(&c.ModelConfig)
trackUsage := c.ModelConfig.TrackUsage == nil || *c.ModelConfig.TrackUsage

if transport == "websocket" && c.ModelOptions.Gateway() == "" {
stream, err := c.createWebSocketStream(ctx, params)
if err != nil {
slog.Warn("WebSocket stream failed, falling back to SSE", "error", err)
// Fall through to SSE below.
} else {
slog.Debug("OpenAI responses WebSocket stream created successfully", "model", c.ModelConfig.Model)
return newResponseStreamAdapter(stream, trackUsage), nil
}
} else if transport == "websocket" {
slog.Debug("WebSocket transport requested but Gateway is configured, using SSE",
"model", c.ModelConfig.Model,
"gateway", c.ModelOptions.Gateway())
}

client, err := c.clientFn(ctx)
if err != nil {
slog.Error("Failed to create OpenAI client", "error", err)
return nil, err
}
stream := client.Responses.NewStreaming(ctx, params)

slog.Debug("OpenAI responses stream created successfully", "model", c.ModelConfig.Model)
return newResponseStreamAdapter(stream, c.ModelConfig.TrackUsage == nil || *c.ModelConfig.TrackUsage), nil
return newResponseStreamAdapter(stream, trackUsage), nil
}

// createWebSocketStream sends a request over the pre-initialized WebSocket
// pool, returning a responseEventStream.
func (c *Client) createWebSocketStream(
	ctx context.Context,
	params responses.ResponseNewParams,
) (responseEventStream, error) {
	if c.wsPool == nil {
		return nil, errors.New("websocket pool not initialized")
	}

	return c.wsPool.Stream(ctx, params)
}

// buildWSHeaderFn returns a function that produces the HTTP headers needed
// for the WebSocket handshake, including the Authorization header.
func (c *Client) buildWSHeaderFn() func(ctx context.Context) (http.Header, error) {
	return func(ctx context.Context) (http.Header, error) {
		h := http.Header{}

		// Resolve the API key using the same logic as the HTTP client.
		var apiKey string
		if c.ModelConfig.TokenKey != "" {
			apiKey, _ = c.Env.Get(ctx, c.ModelConfig.TokenKey)
		}
		if apiKey == "" {
			// Fall back to the standard OPENAI_API_KEY env var via the
			// environment provider so that secret resolution is
			// consistent with the HTTP client path.
			apiKey, _ = c.Env.Get(ctx, "OPENAI_API_KEY")
		}
		if apiKey != "" {
			h.Set("Authorization", "Bearer "+apiKey)
		}

		return h, nil
	}
}

// getTransport returns the streaming transport preference from ProviderOpts.
// Valid values are "sse" (default) and "websocket".
func getTransport(cfg *latest.ModelConfig) string {
	if cfg == nil || cfg.ProviderOpts == nil {
		return "sse"
	}
	if t, ok := cfg.ProviderOpts["transport"].(string); ok {
		return strings.ToLower(t)
	}
	return "sse"
}

func convertMessagesToResponseInput(messages []chat.Message) []responses.ResponseInputItemUnionParam {
23 changes: 23 additions & 0 deletions pkg/model/provider/openai/event_stream.go
@@ -0,0 +1,23 @@
package openai

import "github.com/openai/openai-go/v3/responses"

// responseEventStream abstracts over SSE and WebSocket transports for
// streaming Responses API events.
//
// The ssestream.Stream[responses.ResponseStreamEventUnion] type already
// satisfies this interface, so it can be used directly.
type responseEventStream interface {
	// Next advances the stream to the next event.
	// Returns false when the stream is exhausted or an error occurred.
	Next() bool

	// Current returns the most recently decoded event.
	Current() responses.ResponseStreamEventUnion

	// Err returns the first non-EOF error encountered by the stream.
	Err() error

	// Close releases resources held by the stream.
	Close() error
}
12 changes: 8 additions & 4 deletions pkg/model/provider/openai/response_stream.go
@@ -13,15 +13,19 @@ import (
"github.com/docker/docker-agent/pkg/tools"
)

-// ResponseStreamAdapter adapts the OpenAI responses stream to our interface
+// Compile-time check: ssestream.Stream satisfies responseEventStream.
+var _ responseEventStream = (*ssestream.Stream[responses.ResponseStreamEventUnion])(nil)
+
+// ResponseStreamAdapter adapts the OpenAI responses stream to our interface.
+// It works with any responseEventStream implementation (SSE or WebSocket).
type ResponseStreamAdapter struct {
-	stream         *ssestream.Stream[responses.ResponseStreamEventUnion]
+	stream         responseEventStream
	trackUsage     bool
	itemCallIDMap  map[string]string
	itemHasContent map[string]bool
}

-func newResponseStreamAdapter(stream *ssestream.Stream[responses.ResponseStreamEventUnion], trackUsage bool) *ResponseStreamAdapter {
+func newResponseStreamAdapter(stream responseEventStream, trackUsage bool) *ResponseStreamAdapter {
	return &ResponseStreamAdapter{
		stream:     stream,
		trackUsage: trackUsage,
@@ -254,5 +258,5 @@ func (a *ResponseStreamAdapter) Recv() (chat.MessageStreamResponse, error) {

// Close closes the stream
func (a *ResponseStreamAdapter) Close() {
-	a.stream.Close()
+	_ = a.stream.Close()
}