Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
370f678
chore: add ws dependency for WebSocket push channel
TerrifiedBug Mar 10, 2026
110f81d
feat: define push message types for WebSocket protocol
TerrifiedBug Mar 10, 2026
9197cbd
feat: add in-memory WebSocket connection registry
TerrifiedBug Mar 10, 2026
ed40360
feat: add WebSocket upgrade authentication helper
TerrifiedBug Mar 10, 2026
b559f35
feat: custom server.ts with WebSocket upgrade handler and keepalive
TerrifiedBug Mar 10, 2026
9cbf302
chore: add gorilla/websocket dependency for agent WebSocket client
TerrifiedBug Mar 10, 2026
474646e
feat: define Go types for WebSocket push messages
TerrifiedBug Mar 10, 2026
1a98166
feat: dedicated /api/agent/samples endpoint for sample results
TerrifiedBug Mar 10, 2026
78cd438
feat: include websocketUrl in agent config response
TerrifiedBug Mar 10, 2026
6cb38b3
feat: compile custom server.ts in Docker build for WebSocket support
TerrifiedBug Mar 10, 2026
1485f51
test: WebSocket client message parsing and reconnect tests
TerrifiedBug Mar 10, 2026
5cb4ef4
feat: WebSocket client with exponential backoff reconnect
TerrifiedBug Mar 10, 2026
245c2da
feat: push WebSocket notifications from deploy, sample, fleet actions
TerrifiedBug Mar 10, 2026
c99d033
feat: add SendSampleResults method, NodeToken accessor, WebSocketURL …
TerrifiedBug Mar 10, 2026
630b9ed
feat: add poller mutex and wire WebSocket into agent main loop
TerrifiedBug Mar 10, 2026
06202f7
feat: expose WebSocket status in fleet API and add WS badge to UI
TerrifiedBug Mar 10, 2026
4bc4c34
fix: resolve data race in immediate heartbeat and poller mutex consis…
TerrifiedBug Mar 10, 2026
583baf7
fix: resolve CI errors and duplicate sample results
TerrifiedBug Mar 10, 2026
f5548fe
fix: reconnection race, auth performance, and sample dedup
TerrifiedBug Mar 10, 2026
ccbe6ad
fix: address final Greptile review findings
TerrifiedBug Mar 11, 2026
da937d6
fix: cross-environment auth and status race in samples endpoint
TerrifiedBug Mar 11, 2026
ce90f8a
fix: unify sample status logic to prevent COMPLETED/ERROR race
TerrifiedBug Mar 11, 2026
dbf1379
fix: handle restart action and add token cache TTL/size bounds
TerrifiedBug Mar 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions agent/go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
module github.com/TerrifiedBug/vectorflow/agent

go 1.22

require github.com/gorilla/websocket v1.5.3
2 changes: 2 additions & 0 deletions agent/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
168 changes: 168 additions & 0 deletions agent/internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/TerrifiedBug/vectorflow/agent/internal/config"
"github.com/TerrifiedBug/vectorflow/agent/internal/sampler"
"github.com/TerrifiedBug/vectorflow/agent/internal/supervisor"
"github.com/TerrifiedBug/vectorflow/agent/internal/ws"
)

var Version = "dev"
Expand All @@ -32,6 +33,11 @@ type Agent struct {
sampleResults []client.SampleResultMsg
failedUpdateVersion string // skip retries for this version
updateError string // report failure to server

wsClient *ws.Client
wsCh chan ws.PushMessage
immediateHeartbeat *time.Timer
immediateHeartbeatCh chan struct{}
}

func New(cfg *config.Config) (*Agent, error) {
Expand Down Expand Up @@ -83,18 +89,45 @@ func (a *Agent) Run() error {

// Do first poll immediately
a.pollAndApply()

// Start WebSocket if the server provided a URL
a.wsCh = make(chan ws.PushMessage, 16)
a.immediateHeartbeatCh = make(chan struct{}, 1)
if wsURL := a.poller.WebSocketURL(); wsURL != "" {
a.wsClient = ws.New(wsURL, a.client.NodeToken(), func(msg ws.PushMessage) {
// Forward messages to the main goroutine via channel
select {
case a.wsCh <- msg:
default:
slog.Warn("ws: message channel full, dropping message", "type", msg.Type)
}
})
go a.wsClient.Connect()
slog.Info("websocket client started", "url", wsURL)
}

a.sendHeartbeat()

for {
select {
case <-ctx.Done():
slog.Info("shutting down all pipelines")
if a.wsClient != nil {
a.wsClient.Close()
}
if a.immediateHeartbeat != nil {
a.immediateHeartbeat.Stop()
}
a.supervisor.ShutdownAll()
slog.Info("agent stopped")
return nil
case <-ticker.C:
a.pollAndApply()
a.sendHeartbeat()
case msg := <-a.wsCh:
a.handleWsMessage(msg, ticker)
case <-a.immediateHeartbeatCh:
a.sendHeartbeat()
}
}
}
Expand Down Expand Up @@ -259,3 +292,138 @@ func (a *Agent) processSampleRequests(requests []client.SampleRequestMsg) {
}
}
}

// handleWsMessage processes a push message from the WebSocket channel.
// MUST be called from the main goroutine (same goroutine as Run()'s select loop).
func (a *Agent) handleWsMessage(msg ws.PushMessage, ticker *time.Ticker) {
switch msg.Type {
case "config_changed":
slog.Info("ws: config changed notification", "pipeline", msg.PipelineID, "reason", msg.Reason)
// Re-poll immediately to get the full assembled config
a.pollAndApply()
a.triggerImmediateHeartbeat()

case "sample_request":
slog.Info("ws: sample request received", "requestId", msg.RequestID, "pipeline", msg.PipelineID)
a.processSampleRequestsAndSend([]client.SampleRequestMsg{
{
RequestID: msg.RequestID,
PipelineID: msg.PipelineID,
ComponentKeys: msg.ComponentKeys,
Limit: msg.Limit,
},
})

case "action":
slog.Info("ws: action received", "action", msg.Action)
switch msg.Action {
case "self_update":
a.handlePendingAction(&client.PendingAction{
Type: "self_update",
TargetVersion: msg.TargetVersion,
DownloadURL: msg.DownloadURL,
Checksum: msg.Checksum,
})
a.triggerImmediateHeartbeat()
case "restart":
slog.Warn("ws: restart action not yet implemented, triggering re-poll instead")
a.pollAndApply()
a.triggerImmediateHeartbeat()
default:
slog.Warn("ws: unknown action", "action", msg.Action)
}

case "poll_interval":
if msg.IntervalMs > 0 {
newInterval := time.Duration(msg.IntervalMs) * time.Millisecond
ticker.Reset(newInterval)
slog.Info("ws: poll interval changed", "intervalMs", msg.IntervalMs)
}

default:
slog.Warn("ws: unknown message type", "type", msg.Type)
}
}

// triggerImmediateHeartbeat sends a heartbeat soon, debounced to 1 second.
// Multiple calls within 1s collapse into a single heartbeat with the latest state.
// The timer fires a signal back to the main goroutine's select loop, ensuring
// sendHeartbeat() always runs on the main goroutine (no data race on updateError).
// MUST be called from the main goroutine.
func (a *Agent) triggerImmediateHeartbeat() {
if a.immediateHeartbeat != nil {
a.immediateHeartbeat.Stop()
}
a.immediateHeartbeat = time.AfterFunc(time.Second, func() {
select {
case a.immediateHeartbeatCh <- struct{}{}:
default:
}
})
}

// processSampleRequestsAndSend processes sample requests and sends results
// directly to the /api/agent/samples endpoint (used for WebSocket-triggered requests).
// Falls back to heartbeat delivery on HTTP failure.
func (a *Agent) processSampleRequestsAndSend(requests []client.SampleRequestMsg) {
statuses := a.supervisor.Statuses()
statusMap := make(map[string]supervisor.ProcessInfo, len(statuses))
for _, s := range statuses {
statusMap[s.PipelineID] = s
}

for _, req := range requests {
s, found := statusMap[req.PipelineID]
if !found || s.Status != "RUNNING" || s.APIPort == 0 {
errMsg := "pipeline not running"
if !found {
errMsg = "pipeline not found"
} else if s.APIPort == 0 {
errMsg = "pipeline API port not available"
}
results := make([]client.SampleResultMsg, 0, len(req.ComponentKeys))
for _, key := range req.ComponentKeys {
results = append(results, client.SampleResultMsg{
RequestID: req.RequestID,
ComponentKey: key,
Error: errMsg,
})
}
if err := a.client.SendSampleResults(results); err != nil {
slog.Warn("failed to send sample error results via dedicated endpoint", "error", err)
a.mu.Lock()
a.sampleResults = append(a.sampleResults, results...)
a.mu.Unlock()
}
continue
}

for _, key := range req.ComponentKeys {
go func(reqID string, apiPort int, componentKey string, limit int) {
result := sampler.Sample(a.cfg.VectorBin, apiPort, componentKey, limit)
result.RequestID = reqID

msg := client.SampleResultMsg{
RequestID: result.RequestID,
ComponentKey: result.ComponentKey,
Events: result.Events,
Error: result.Error,
}
for _, fi := range result.Schema {
msg.Schema = append(msg.Schema, client.FieldInfoMsg{
Path: fi.Path, Type: fi.Type, Sample: fi.Sample,
})
}

if err := a.client.SendSampleResults([]client.SampleResultMsg{msg}); err != nil {
slog.Warn("failed to send sample results via dedicated endpoint, will retry in heartbeat", "error", err)
a.mu.Lock()
a.sampleResults = append(a.sampleResults, msg)
a.mu.Unlock()
} else {
slog.Debug("sample result sent via dedicated endpoint", "requestId", reqID, "component", componentKey)
}
}(req.RequestID, s.APIPort, key, req.Limit)
}
}
}
22 changes: 22 additions & 0 deletions agent/internal/agent/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"

"github.com/TerrifiedBug/vectorflow/agent/internal/client"
"github.com/TerrifiedBug/vectorflow/agent/internal/config"
Expand All @@ -24,9 +25,11 @@ type pipelineState struct {
type poller struct {
cfg *config.Config
client configFetcher
mu sync.Mutex
known map[string]pipelineState // pipelineId -> last known state
sampleRequests []client.SampleRequestMsg
pendingAction *client.PendingAction
websocketUrl string
}

func newPoller(cfg *config.Config, c configFetcher) *poller {
Expand Down Expand Up @@ -59,6 +62,9 @@ type PipelineAction struct {

// Poll fetches config from VectorFlow and returns actions to take.
func (p *poller) Poll() ([]PipelineAction, error) {
p.mu.Lock()
defer p.mu.Unlock()

resp, err := p.client.GetConfig()
if err != nil {
return nil, err
Expand Down Expand Up @@ -184,15 +190,31 @@ func (p *poller) Poll() ([]PipelineAction, error) {
// Store pending action (e.g. self-update) for the agent to handle
p.pendingAction = resp.PendingAction

// Store websocket URL for the agent to use
if resp.WebSocketURL != "" {
p.websocketUrl = resp.WebSocketURL
}

return actions, nil
}

// SampleRequests returns the sample requests from the last poll response.
func (p *poller) SampleRequests() []client.SampleRequestMsg {
p.mu.Lock()
defer p.mu.Unlock()
return p.sampleRequests
}

// PendingAction returns the pending action from the last poll response, if any.
func (p *poller) PendingAction() *client.PendingAction {
p.mu.Lock()
defer p.mu.Unlock()
return p.pendingAction
}

// WebSocketURL returns the WebSocket URL from the last config response.
func (p *poller) WebSocketURL() string {
p.mu.Lock()
defer p.mu.Unlock()
return p.websocketUrl
}
41 changes: 41 additions & 0 deletions agent/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ func (c *Client) SetNodeToken(token string) {
c.nodeToken = token
}

// NodeToken returns the current node token for use by other packages (e.g., WebSocket auth).
func (c *Client) NodeToken() string {
return c.nodeToken
}

// EnrollRequest is sent to POST /api/agent/enroll
type EnrollRequest struct {
Token string `json:"token"`
Expand Down Expand Up @@ -114,6 +119,7 @@ type ConfigResponse struct {
SecretBackendConfig map[string]interface{} `json:"secretBackendConfig,omitempty"`
SampleRequests []SampleRequestMsg `json:"sampleRequests,omitempty"`
PendingAction *PendingAction `json:"pendingAction,omitempty"`
WebSocketURL string `json:"websocketUrl,omitempty"`
}

func (c *Client) GetConfig() (*ConfigResponse, error) {
Expand Down Expand Up @@ -257,3 +263,38 @@ func (c *Client) SendHeartbeat(req HeartbeatRequest) error {
}
return nil
}

// SampleResultsRequest is sent to POST /api/agent/samples
type SampleResultsRequest struct {
Results []SampleResultMsg `json:"results"`
}

// SendSampleResults sends sample results directly to the dedicated samples endpoint.
func (c *Client) SendSampleResults(results []SampleResultMsg) error {
req := SampleResultsRequest{Results: results}
body, err := json.Marshal(req)
if err != nil {
return fmt.Errorf("marshal sample results: %w", err)
}

httpReq, err := http.NewRequest("POST", c.baseURL+"/api/agent/samples", bytes.NewReader(body))
if err != nil {
return fmt.Errorf("create sample results request: %w", err)
}
httpReq.Header.Set("Content-Type", "application/json")
httpReq.Header.Set("Authorization", "Bearer "+c.nodeToken)

slog.Debug("http request", "method", "POST", "url", c.baseURL+"/api/agent/samples")
resp, err := c.httpClient.Do(httpReq)
if err != nil {
return fmt.Errorf("sample results request failed: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
respBody, _ := io.ReadAll(resp.Body)
return fmt.Errorf("sample results failed (status %d): %s", resp.StatusCode, string(respBody))
}
slog.Debug("http response", "method", "POST", "url", "/api/agent/samples", "status", 200)
return nil
}
Loading
Loading