Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
4412faf
feat(push): add SSE push message type definitions
TerrifiedBug Mar 11, 2026
afb94c9
feat(push): add SSE push connection registry with keepalive
TerrifiedBug Mar 11, 2026
75a53d7
feat(push): add SSE streaming route handler at /api/agent/push
TerrifiedBug Mar 11, 2026
af575de
refactor(fleet): migrate from wsRegistry to pushRegistry
TerrifiedBug Mar 11, 2026
d2f1ba6
refactor(deploy): migrate from wsRegistry to pushRegistry
TerrifiedBug Mar 11, 2026
d76eef7
refactor(pipeline): migrate from wsRegistry to pushRegistry
TerrifiedBug Mar 11, 2026
e53cc25
refactor(deploy-agent): migrate from wsRegistry to pushRegistry
TerrifiedBug Mar 11, 2026
656ae00
refactor(fleet-ui): rename WS badge to Live, update field to pushConn…
TerrifiedBug Mar 11, 2026
35275f4
refactor(config): replace websocketUrl with pushUrl for SSE
TerrifiedBug Mar 11, 2026
dd25e89
feat(agent): add SSE push message types
TerrifiedBug Mar 11, 2026
6395204
feat(agent): add SSE push client with reconnection and backoff
TerrifiedBug Mar 11, 2026
126b832
test(agent): add SSE push client integration tests
TerrifiedBug Mar 11, 2026
83a559f
refactor(agent): swap ws.Client for push.Client in agent main loop
TerrifiedBug Mar 11, 2026
96a87f5
refactor(agent): rename websocketUrl to pushUrl in poller
TerrifiedBug Mar 11, 2026
ce7e87d
refactor(agent): rename WebSocketURL to PushURL in config response
TerrifiedBug Mar 11, 2026
de98445
cleanup: delete WebSocket server wrapper and WS service files
TerrifiedBug Mar 11, 2026
848bbf2
cleanup: delete WebSocket Go client package
TerrifiedBug Mar 11, 2026
3df4d7f
cleanup: remove gorilla/websocket dependency
TerrifiedBug Mar 11, 2026
aaa01b9
cleanup: remove ws, @types/ws, esbuild, tsx dependencies; revert dev …
TerrifiedBug Mar 11, 2026
121f46c
cleanup: revert Dockerfile to standard Next.js standalone build
TerrifiedBug Mar 11, 2026
821540c
docs: update WebSocket references to SSE in agent docs and Go comments
TerrifiedBug Mar 11, 2026
dd82ba4
fix(push): reset backoff after stable connection, remove unused const…
TerrifiedBug Mar 11, 2026
e8abd29
fix(push): add X-Accel-Buffering header and HTTP response-header timeout
TerrifiedBug Mar 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pnpm-approve-builds.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
["@prisma/engines","esbuild","prisma"]
["@prisma/engines","prisma"]
2 changes: 0 additions & 2 deletions agent/go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
module github.com/TerrifiedBug/vectorflow/agent

go 1.22

require github.com/gorilla/websocket v1.5.3
2 changes: 0 additions & 2 deletions agent/go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +0,0 @@
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
51 changes: 25 additions & 26 deletions agent/internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/TerrifiedBug/vectorflow/agent/internal/config"
"github.com/TerrifiedBug/vectorflow/agent/internal/sampler"
"github.com/TerrifiedBug/vectorflow/agent/internal/supervisor"
"github.com/TerrifiedBug/vectorflow/agent/internal/ws"
"github.com/TerrifiedBug/vectorflow/agent/internal/push"
)

var Version = "dev"
Expand All @@ -34,8 +34,8 @@ type Agent struct {
failedUpdateVersion string // skip retries for this version
updateError string // report failure to server

wsClient *ws.Client
wsCh chan ws.PushMessage
pushClient *push.Client
pushCh chan push.PushMessage
immediateHeartbeat *time.Timer
immediateHeartbeatCh chan struct{}
}
Expand Down Expand Up @@ -91,20 +91,19 @@ func (a *Agent) Run() error {
// Do first poll immediately
a.pollAndApply()

// Start WebSocket if the server provided a URL
a.wsCh = make(chan ws.PushMessage, 16)
// Start SSE push client if the server provided a URL
a.pushCh = make(chan push.PushMessage, 16)
a.immediateHeartbeatCh = make(chan struct{}, 1)
if wsURL := a.poller.WebSocketURL(); wsURL != "" {
a.wsClient = ws.New(wsURL, a.client.NodeToken(), func(msg ws.PushMessage) {
// Forward messages to the main goroutine via channel
if pushURL := a.poller.PushURL(); pushURL != "" {
a.pushClient = push.New(pushURL, a.client.NodeToken(), func(msg push.PushMessage) {
select {
case a.wsCh <- msg:
case a.pushCh <- msg:
default:
slog.Warn("ws: message channel full, dropping message", "type", msg.Type)
slog.Warn("push: message channel full, dropping message", "type", msg.Type)
}
})
go a.wsClient.Connect()
slog.Info("websocket client started", "url", wsURL)
go a.pushClient.Connect()
slog.Info("push client started", "url", pushURL)
}

a.sendHeartbeat()
Expand All @@ -114,8 +113,8 @@ func (a *Agent) Run() error {
select {
case <-ctx.Done():
slog.Info("shutting down all pipelines")
if a.wsClient != nil {
a.wsClient.Close()
if a.pushClient != nil {
a.pushClient.Close()
}
if a.immediateHeartbeat != nil {
a.immediateHeartbeat.Stop()
Expand All @@ -127,8 +126,8 @@ func (a *Agent) Run() error {
a.pollAndApply()
a.sendHeartbeat()
currentInterval = a.maybeResetTicker(ticker, currentInterval)
case msg := <-a.wsCh:
a.handleWsMessage(msg, ticker)
case msg := <-a.pushCh:
a.handlePushMessage(msg, ticker)
case <-a.immediateHeartbeatCh:
a.sendHeartbeat()
}
Expand Down Expand Up @@ -312,18 +311,18 @@ func (a *Agent) processSampleRequests(requests []client.SampleRequestMsg) {
}
}

// handleWsMessage processes a push message from the WebSocket channel.
// handlePushMessage processes a push message from the SSE push channel.
// MUST be called from the main goroutine (same goroutine as Run()'s select loop).
func (a *Agent) handleWsMessage(msg ws.PushMessage, ticker *time.Ticker) {
func (a *Agent) handlePushMessage(msg push.PushMessage, ticker *time.Ticker) {
switch msg.Type {
case "config_changed":
slog.Info("ws: config changed notification", "pipeline", msg.PipelineID, "reason", msg.Reason)
slog.Info("push: config changed notification", "pipeline", msg.PipelineID, "reason", msg.Reason)
// Re-poll immediately to get the full assembled config
a.pollAndApply()
a.triggerImmediateHeartbeat()

case "sample_request":
slog.Info("ws: sample request received", "requestId", msg.RequestID, "pipeline", msg.PipelineID)
slog.Info("push: sample request received", "requestId", msg.RequestID, "pipeline", msg.PipelineID)
a.processSampleRequestsAndSend([]client.SampleRequestMsg{
{
RequestID: msg.RequestID,
Expand All @@ -334,7 +333,7 @@ func (a *Agent) handleWsMessage(msg ws.PushMessage, ticker *time.Ticker) {
})

case "action":
slog.Info("ws: action received", "action", msg.Action)
slog.Info("push: action received", "action", msg.Action)
switch msg.Action {
case "self_update":
a.handlePendingAction(&client.PendingAction{
Expand All @@ -345,22 +344,22 @@ func (a *Agent) handleWsMessage(msg ws.PushMessage, ticker *time.Ticker) {
})
a.triggerImmediateHeartbeat()
case "restart":
slog.Warn("ws: restart action not yet implemented, triggering re-poll instead")
slog.Warn("push: restart action not yet implemented, triggering re-poll instead")
a.pollAndApply()
a.triggerImmediateHeartbeat()
default:
slog.Warn("ws: unknown action", "action", msg.Action)
slog.Warn("push: unknown action", "action", msg.Action)
}

case "poll_interval":
if msg.IntervalMs > 0 {
newInterval := time.Duration(msg.IntervalMs) * time.Millisecond
ticker.Reset(newInterval)
slog.Info("ws: poll interval changed", "intervalMs", msg.IntervalMs)
slog.Info("push: poll interval changed", "intervalMs", msg.IntervalMs)
}

default:
slog.Warn("ws: unknown message type", "type", msg.Type)
slog.Warn("push: unknown message type", "type", msg.Type)
}
}

Expand All @@ -382,7 +381,7 @@ func (a *Agent) triggerImmediateHeartbeat() {
}

// processSampleRequestsAndSend processes sample requests and sends results
// directly to the /api/agent/samples endpoint (used for WebSocket-triggered requests).
// directly to the /api/agent/samples endpoint (used for push-triggered requests).
// Falls back to heartbeat delivery on HTTP failure.
func (a *Agent) processSampleRequestsAndSend(requests []client.SampleRequestMsg) {
statuses := a.supervisor.Statuses()
Expand Down
14 changes: 7 additions & 7 deletions agent/internal/agent/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type poller struct {
sampleRequests []client.SampleRequestMsg
pendingAction *client.PendingAction
pollIntervalMs int // server-provided poll interval from last response
websocketUrl string
pushUrl string
}

func newPoller(cfg *config.Config, c configFetcher) *poller {
Expand Down Expand Up @@ -194,9 +194,9 @@ func (p *poller) Poll() ([]PipelineAction, error) {
// Store server-provided poll interval
p.pollIntervalMs = resp.PollIntervalMs

// Store websocket URL for the agent to use
if resp.WebSocketURL != "" {
p.websocketUrl = resp.WebSocketURL
// Store push URL for the agent to use
if resp.PushURL != "" {
p.pushUrl = resp.PushURL
}

return actions, nil
Expand All @@ -221,9 +221,9 @@ func (p *poller) PollIntervalMs() int {
return p.pollIntervalMs
}

// WebSocketURL returns the WebSocket URL from the last config response.
func (p *poller) WebSocketURL() string {
// PushURL returns the SSE push URL from the last config response.
func (p *poller) PushURL() string {
p.mu.Lock()
defer p.mu.Unlock()
return p.websocketUrl
return p.pushUrl
}
4 changes: 2 additions & 2 deletions agent/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (c *Client) SetNodeToken(token string) {
c.nodeToken = token
}

// NodeToken returns the current node token for use by other packages (e.g., WebSocket auth).
// NodeToken returns the current node token for use by other packages (e.g., push auth).
func (c *Client) NodeToken() string {
return c.nodeToken
}
Expand Down Expand Up @@ -119,7 +119,7 @@ type ConfigResponse struct {
SecretBackendConfig map[string]interface{} `json:"secretBackendConfig,omitempty"`
SampleRequests []SampleRequestMsg `json:"sampleRequests,omitempty"`
PendingAction *PendingAction `json:"pendingAction,omitempty"`
WebSocketURL string `json:"websocketUrl,omitempty"`
PushURL string `json:"pushUrl,omitempty"`
}

func (c *Client) GetConfig() (*ConfigResponse, error) {
Expand Down
173 changes: 173 additions & 0 deletions agent/internal/push/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package push

import (
"bufio"
"context"
"encoding/json"
"log/slog"
"net/http"
"strings"
"sync"
"time"
)

const (
maxBackoff = 30 * time.Second
maxBufferSize = 256 * 1024 // 256KB for large payloads
)

// sseHTTPClient sets a response-header timeout so a stalled proxy doesn't block
// the connect goroutine indefinitely. Body reads remain context-driven.
var sseHTTPClient = &http.Client{
Transport: &http.Transport{
ResponseHeaderTimeout: 15 * time.Second,
},
}

// Client maintains a persistent SSE connection to the server push endpoint.
type Client struct {
url string
token string
onMessage func(PushMessage)

mu sync.Mutex
cancel context.CancelFunc
done chan struct{}
}

// New creates a new SSE push client.
func New(url, token string, onMessage func(PushMessage)) *Client {
return &Client{
url: url,
token: token,
onMessage: onMessage,
done: make(chan struct{}),
}
}

// Connect blocks and maintains a persistent SSE connection with exponential
// backoff reconnection. Call Close() to stop.
func (c *Client) Connect() {
ctx, cancel := context.WithCancel(context.Background())
c.mu.Lock()
c.cancel = cancel
c.mu.Unlock()

backoff := time.Second

for {
start := time.Now()
err := c.stream(ctx)
if ctx.Err() != nil {
// Graceful shutdown
close(c.done)
return
}
// Reset backoff if connection was alive for a meaningful duration
if time.Since(start) > 5*time.Second {
backoff = time.Second
}
slog.Warn("push: connection lost, reconnecting",
"error", err, "backoff", backoff)
select {
case <-ctx.Done():
close(c.done)
return
case <-time.After(backoff):
}
backoff = min(backoff*2, maxBackoff)
}
}

// stream opens a single SSE connection and reads messages until error or cancel.
func (c *Client) stream(ctx context.Context) error {
req, err := http.NewRequestWithContext(ctx, "GET", c.url, nil)
if err != nil {
return err
}
req.Header.Set("Authorization", "Bearer "+c.token)

resp, err := sseHTTPClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return &httpError{StatusCode: resp.StatusCode}
}

slog.Info("push: connected", "url", c.url)

scanner := bufio.NewScanner(resp.Body)
scanner.Buffer(make([]byte, 0, maxBufferSize), maxBufferSize)

var eventType string
var dataBuilder strings.Builder

for scanner.Scan() {
line := scanner.Text()

if line == "" {
// Empty line = dispatch event
if dataBuilder.Len() > 0 {
c.dispatch(eventType, dataBuilder.String())
eventType = ""
dataBuilder.Reset()
}
continue
}

if strings.HasPrefix(line, ":") {
// SSE comment (keepalive) — ignore
continue
}

if strings.HasPrefix(line, "event: ") {
eventType = strings.TrimPrefix(line, "event: ")
continue
}

if strings.HasPrefix(line, "data: ") {
if dataBuilder.Len() > 0 {
dataBuilder.WriteByte('\n')
}
dataBuilder.WriteString(strings.TrimPrefix(line, "data: "))
continue
}
}

if err := scanner.Err(); err != nil {
return err
}
return nil
}

func (c *Client) dispatch(eventType, data string) {
var msg PushMessage
if err := json.Unmarshal([]byte(data), &msg); err != nil {
slog.Error("push: failed to parse message", "error", err, "event", eventType, "data", data)
return
}
c.onMessage(msg)
}

// Close gracefully stops the SSE connection.
func (c *Client) Close() {
c.mu.Lock()
cancel := c.cancel
c.mu.Unlock()

if cancel != nil {
cancel()
<-c.done
}
}

type httpError struct {
StatusCode int
}

func (e *httpError) Error() string {
return "push: unexpected status " + http.StatusText(e.StatusCode)
}
Loading
Loading