diff --git a/agent/go.mod b/agent/go.mod index a75086f6..92a8edfd 100644 --- a/agent/go.mod +++ b/agent/go.mod @@ -1,3 +1,5 @@ module github.com/TerrifiedBug/vectorflow/agent go 1.22 + +require github.com/gorilla/websocket v1.5.3 diff --git a/agent/go.sum b/agent/go.sum new file mode 100644 index 00000000..25a9fc4b --- /dev/null +++ b/agent/go.sum @@ -0,0 +1,2 @@ +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/agent/internal/agent/agent.go b/agent/internal/agent/agent.go index ac98d726..cde2066d 100644 --- a/agent/internal/agent/agent.go +++ b/agent/internal/agent/agent.go @@ -16,6 +16,7 @@ import ( "github.com/TerrifiedBug/vectorflow/agent/internal/config" "github.com/TerrifiedBug/vectorflow/agent/internal/sampler" "github.com/TerrifiedBug/vectorflow/agent/internal/supervisor" + "github.com/TerrifiedBug/vectorflow/agent/internal/ws" ) var Version = "dev" @@ -32,6 +33,11 @@ type Agent struct { sampleResults []client.SampleResultMsg failedUpdateVersion string // skip retries for this version updateError string // report failure to server + + wsClient *ws.Client + wsCh chan ws.PushMessage + immediateHeartbeat *time.Timer + immediateHeartbeatCh chan struct{} } func New(cfg *config.Config) (*Agent, error) { @@ -83,18 +89,45 @@ func (a *Agent) Run() error { // Do first poll immediately a.pollAndApply() + + // Start WebSocket if the server provided a URL + a.wsCh = make(chan ws.PushMessage, 16) + a.immediateHeartbeatCh = make(chan struct{}, 1) + if wsURL := a.poller.WebSocketURL(); wsURL != "" { + a.wsClient = ws.New(wsURL, a.client.NodeToken(), func(msg ws.PushMessage) { + // Forward messages to the main goroutine via channel + select { + case a.wsCh <- msg: + default: + slog.Warn("ws: message channel full, dropping message", "type", msg.Type) + } + }) + go a.wsClient.Connect() + slog.Info("websocket client started", "url", wsURL) + } + a.sendHeartbeat() for { select { case <-ctx.Done(): slog.Info("shutting down all pipelines") + if a.wsClient != nil { + a.wsClient.Close() + } + if a.immediateHeartbeat != nil { + a.immediateHeartbeat.Stop() + } a.supervisor.ShutdownAll() slog.Info("agent stopped") return nil case <-ticker.C: a.pollAndApply() a.sendHeartbeat() + case msg := <-a.wsCh: + a.handleWsMessage(msg, ticker) + case <-a.immediateHeartbeatCh: + a.sendHeartbeat() } } } @@ -259,3 +292,138 @@ func (a *Agent) processSampleRequests(requests []client.SampleRequestMsg) { } } } + +// handleWsMessage processes a push message from the WebSocket channel. +// MUST be called from the main goroutine (same goroutine as Run()'s select loop). +func (a *Agent) handleWsMessage(msg ws.PushMessage, ticker *time.Ticker) { + switch msg.Type { + case "config_changed": + slog.Info("ws: config changed notification", "pipeline", msg.PipelineID, "reason", msg.Reason) + // Re-poll immediately to get the full assembled config + a.pollAndApply() + a.triggerImmediateHeartbeat() + + case "sample_request": + slog.Info("ws: sample request received", "requestId", msg.RequestID, "pipeline", msg.PipelineID) + a.processSampleRequestsAndSend([]client.SampleRequestMsg{ + { + RequestID: msg.RequestID, + PipelineID: msg.PipelineID, + ComponentKeys: msg.ComponentKeys, + Limit: msg.Limit, + }, + }) + + case "action": + slog.Info("ws: action received", "action", msg.Action) + switch msg.Action { + case "self_update": + a.handlePendingAction(&client.PendingAction{ + Type: "self_update", + TargetVersion: msg.TargetVersion, + DownloadURL: msg.DownloadURL, + Checksum: msg.Checksum, + }) + a.triggerImmediateHeartbeat() + case "restart": + slog.Warn("ws: restart action not yet implemented, triggering re-poll instead") + a.pollAndApply() + a.triggerImmediateHeartbeat() + default: + slog.Warn("ws: unknown action", "action", msg.Action) + } + + case "poll_interval": + if msg.IntervalMs > 0 { + newInterval := time.Duration(msg.IntervalMs) * time.Millisecond + ticker.Reset(newInterval) + slog.Info("ws: poll interval changed", "intervalMs", msg.IntervalMs) + } + + default: + slog.Warn("ws: unknown message type", "type", msg.Type) + } +} + +// triggerImmediateHeartbeat sends a heartbeat soon, debounced to 1 second. +// Multiple calls within 1s collapse into a single heartbeat with the latest state. +// The timer fires a signal back to the main goroutine's select loop, ensuring +// sendHeartbeat() always runs on the main goroutine (no data race on updateError). +// MUST be called from the main goroutine. +func (a *Agent) triggerImmediateHeartbeat() { + if a.immediateHeartbeat != nil { + a.immediateHeartbeat.Stop() + } + a.immediateHeartbeat = time.AfterFunc(time.Second, func() { + select { + case a.immediateHeartbeatCh <- struct{}{}: + default: + } + }) +} + +// processSampleRequestsAndSend processes sample requests and sends results +// directly to the /api/agent/samples endpoint (used for WebSocket-triggered requests). +// Falls back to heartbeat delivery on HTTP failure. +func (a *Agent) processSampleRequestsAndSend(requests []client.SampleRequestMsg) { + statuses := a.supervisor.Statuses() + statusMap := make(map[string]supervisor.ProcessInfo, len(statuses)) + for _, s := range statuses { + statusMap[s.PipelineID] = s + } + + for _, req := range requests { + s, found := statusMap[req.PipelineID] + if !found || s.Status != "RUNNING" || s.APIPort == 0 { + errMsg := "pipeline not running" + if !found { + errMsg = "pipeline not found" + } else if s.APIPort == 0 { + errMsg = "pipeline API port not available" + } + results := make([]client.SampleResultMsg, 0, len(req.ComponentKeys)) + for _, key := range req.ComponentKeys { + results = append(results, client.SampleResultMsg{ + RequestID: req.RequestID, + ComponentKey: key, + Error: errMsg, + }) + } + if err := a.client.SendSampleResults(results); err != nil { + slog.Warn("failed to send sample error results via dedicated endpoint", "error", err) + a.mu.Lock() + a.sampleResults = append(a.sampleResults, results...) + a.mu.Unlock() + } + continue + } + + for _, key := range req.ComponentKeys { + go func(reqID string, apiPort int, componentKey string, limit int) { + result := sampler.Sample(a.cfg.VectorBin, apiPort, componentKey, limit) + result.RequestID = reqID + + msg := client.SampleResultMsg{ + RequestID: result.RequestID, + ComponentKey: result.ComponentKey, + Events: result.Events, + Error: result.Error, + } + for _, fi := range result.Schema { + msg.Schema = append(msg.Schema, client.FieldInfoMsg{ + Path: fi.Path, Type: fi.Type, Sample: fi.Sample, + }) + } + + if err := a.client.SendSampleResults([]client.SampleResultMsg{msg}); err != nil { + slog.Warn("failed to send sample results via dedicated endpoint, will retry in heartbeat", "error", err) + a.mu.Lock() + a.sampleResults = append(a.sampleResults, msg) + a.mu.Unlock() + } else { + slog.Debug("sample result sent via dedicated endpoint", "requestId", reqID, "component", componentKey) + } + }(req.RequestID, s.APIPort, key, req.Limit) + } + } +} diff --git a/agent/internal/agent/poller.go b/agent/internal/agent/poller.go index c4d5410b..5f9426ab 100644 --- a/agent/internal/agent/poller.go +++ b/agent/internal/agent/poller.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "strings" + "sync" "github.com/TerrifiedBug/vectorflow/agent/internal/client" "github.com/TerrifiedBug/vectorflow/agent/internal/config" @@ -24,9 +25,11 @@ type pipelineState struct { type poller struct { cfg *config.Config client configFetcher + mu sync.Mutex known map[string]pipelineState // pipelineId -> last known state sampleRequests []client.SampleRequestMsg pendingAction *client.PendingAction + websocketUrl string } func newPoller(cfg *config.Config, c configFetcher) *poller { @@ -59,6 +62,9 @@ type PipelineAction struct { // Poll fetches config from VectorFlow and returns actions to take. func (p *poller) Poll() ([]PipelineAction, error) { + p.mu.Lock() + defer p.mu.Unlock() + resp, err := p.client.GetConfig() if err != nil { return nil, err @@ -184,15 +190,31 @@ func (p *poller) Poll() ([]PipelineAction, error) { // Store pending action (e.g. self-update) for the agent to handle p.pendingAction = resp.PendingAction + // Store websocket URL for the agent to use + if resp.WebSocketURL != "" { + p.websocketUrl = resp.WebSocketURL + } + return actions, nil } // SampleRequests returns the sample requests from the last poll response. func (p *poller) SampleRequests() []client.SampleRequestMsg { + p.mu.Lock() + defer p.mu.Unlock() return p.sampleRequests } // PendingAction returns the pending action from the last poll response, if any. func (p *poller) PendingAction() *client.PendingAction { + p.mu.Lock() + defer p.mu.Unlock() return p.pendingAction } + +// WebSocketURL returns the WebSocket URL from the last config response. +func (p *poller) WebSocketURL() string { + p.mu.Lock() + defer p.mu.Unlock() + return p.websocketUrl +} diff --git a/agent/internal/client/client.go b/agent/internal/client/client.go index faabfb41..9fd39297 100644 --- a/agent/internal/client/client.go +++ b/agent/internal/client/client.go @@ -29,6 +29,11 @@ func (c *Client) SetNodeToken(token string) { c.nodeToken = token } +// NodeToken returns the current node token for use by other packages (e.g., WebSocket auth). +func (c *Client) NodeToken() string { + return c.nodeToken +} + // EnrollRequest is sent to POST /api/agent/enroll type EnrollRequest struct { Token string `json:"token"` @@ -114,6 +119,7 @@ type ConfigResponse struct { SecretBackendConfig map[string]interface{} `json:"secretBackendConfig,omitempty"` SampleRequests []SampleRequestMsg `json:"sampleRequests,omitempty"` PendingAction *PendingAction `json:"pendingAction,omitempty"` + WebSocketURL string `json:"websocketUrl,omitempty"` } func (c *Client) GetConfig() (*ConfigResponse, error) { @@ -257,3 +263,38 @@ func (c *Client) SendHeartbeat(req HeartbeatRequest) error { } return nil } + +// SampleResultsRequest is sent to POST /api/agent/samples +type SampleResultsRequest struct { + Results []SampleResultMsg `json:"results"` +} + +// SendSampleResults sends sample results directly to the dedicated samples endpoint. +func (c *Client) SendSampleResults(results []SampleResultMsg) error { + req := SampleResultsRequest{Results: results} + body, err := json.Marshal(req) + if err != nil { + return fmt.Errorf("marshal sample results: %w", err) + } + + httpReq, err := http.NewRequest("POST", c.baseURL+"/api/agent/samples", bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("create sample results request: %w", err) + } + httpReq.Header.Set("Content-Type", "application/json") + httpReq.Header.Set("Authorization", "Bearer "+c.nodeToken) + + slog.Debug("http request", "method", "POST", "url", c.baseURL+"/api/agent/samples") + resp, err := c.httpClient.Do(httpReq) + if err != nil { + return fmt.Errorf("sample results request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + respBody, _ := io.ReadAll(resp.Body) + return fmt.Errorf("sample results failed (status %d): %s", resp.StatusCode, string(respBody)) + } + slog.Debug("http response", "method", "POST", "url", "/api/agent/samples", "status", 200) + return nil +} diff --git a/agent/internal/ws/client.go b/agent/internal/ws/client.go new file mode 100644 index 00000000..817e4909 --- /dev/null +++ b/agent/internal/ws/client.go @@ -0,0 +1,143 @@ +// agent/internal/ws/client.go +package ws + +import ( + "encoding/json" + "log/slog" + "net/http" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +// Client manages a persistent WebSocket connection to the VectorFlow server. +type Client struct { + url string + token string + onMessage func(PushMessage) + + mu sync.Mutex + conn *websocket.Conn + done chan struct{} +} + +// New creates a WebSocket client. Call Connect() to start. +func New(url, token string, onMessage func(PushMessage)) *Client { + return &Client{ + url: url, + token: token, + onMessage: onMessage, + done: make(chan struct{}), + } +} + +// Connect establishes the WebSocket connection with exponential backoff retry. +// Blocks until Close() is called. After connecting, starts a read loop that +// calls onMessage for each received message. +func (c *Client) Connect() { + backoff := time.Second + maxBackoff := 30 * time.Second + + for { + select { + case <-c.done: + return + default: + } + + dialer := websocket.Dialer{ + HandshakeTimeout: 10 * time.Second, + } + header := http.Header{} + if c.token != "" { + header.Set("Authorization", "Bearer "+c.token) + } + + conn, _, err := dialer.Dial(c.url, header) + if err != nil { + slog.Warn("websocket connect failed, retrying", "url", c.url, "backoff", backoff, "error", err) + select { + case <-time.After(backoff): + case <-c.done: + return + } + backoff = backoff * 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + continue + } + + // Connected — reset backoff + backoff = time.Second + slog.Info("websocket connected", "url", c.url) + + c.mu.Lock() + c.conn = conn + c.mu.Unlock() + + // Set read deadline so silent network drops are detected. + // Reset on every pong. Ping interval is 30s + 10s timeout = 40s. + conn.SetReadDeadline(time.Now().Add(45 * time.Second)) + conn.SetPongHandler(func(string) error { + conn.SetReadDeadline(time.Now().Add(45 * time.Second)) + return nil + }) + + // Read loop — blocks until connection drops + c.readLoop(conn) + + c.mu.Lock() + c.conn = nil + c.mu.Unlock() + + slog.Warn("websocket disconnected, reconnecting", "backoff", backoff) + } +} + +func (c *Client) readLoop(conn *websocket.Conn) { + for { + _, data, err := conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { + slog.Warn("websocket read error", "error", err) + } + return + } + + // Reset read deadline on any received message + conn.SetReadDeadline(time.Now().Add(45 * time.Second)) + + var msg PushMessage + if err := json.Unmarshal(data, &msg); err != nil { + slog.Warn("websocket message parse error", "error", err, "data", string(data)) + continue + } + + slog.Debug("websocket message received", "type", msg.Type) + c.onMessage(msg) + } +} + +// Close gracefully shuts down the WebSocket connection. +func (c *Client) Close() { + select { + case <-c.done: + return // already closed + default: + close(c.done) + } + + c.mu.Lock() + conn := c.conn + c.mu.Unlock() + + if conn != nil { + conn.WriteMessage( + websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), + ) + conn.Close() + } +} diff --git a/agent/internal/ws/client_test.go b/agent/internal/ws/client_test.go new file mode 100644 index 00000000..e9afe017 --- /dev/null +++ b/agent/internal/ws/client_test.go @@ -0,0 +1,211 @@ +// agent/internal/ws/client_test.go +package ws + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "github.com/gorilla/websocket" +) + +func TestPushMessageParsing(t *testing.T) { + tests := []struct { + name string + json string + wantType string + }{ + { + name: "config_changed", + json: `{"type":"config_changed","pipelineId":"p1","reason":"deploy"}`, + wantType: "config_changed", + }, + { + name: "config_changed_no_pipeline", + json: `{"type":"config_changed","reason":"maintenance_on"}`, + wantType: "config_changed", + }, + { + name: "sample_request", + json: `{"type":"sample_request","requestId":"r1","pipelineId":"p1","componentKeys":["source_in"],"limit":5}`, + wantType: "sample_request", + }, + { + name: "action_self_update", + json: `{"type":"action","action":"self_update","targetVersion":"v1.2.3","downloadUrl":"https://example.com/agent","checksum":"sha256:abc"}`, + wantType: "action", + }, + { + name: "poll_interval", + json: `{"type":"poll_interval","intervalMs":5000}`, + wantType: "poll_interval", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var msg PushMessage + if err := json.Unmarshal([]byte(tt.json), &msg); err != nil { + t.Fatalf("unmarshal error: %v", err) + } + if msg.Type != tt.wantType { + t.Errorf("got type %q, want %q", msg.Type, tt.wantType) + } + }) + } +} + +func TestClientConnectsAndReceivesMessages(t *testing.T) { + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, + } + + var serverConn *websocket.Conn + var mu sync.Mutex + connected := make(chan struct{}, 1) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Verify auth header + auth := r.Header.Get("Authorization") + if auth != "Bearer test-token" { + http.Error(w, "unauthorized", 401) + return + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Logf("upgrade error: %v", err) + return + } + mu.Lock() + serverConn = conn + mu.Unlock() + select { + case connected <- struct{}{}: + default: + } + })) + defer server.Close() + + wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + + var received []PushMessage + var recvMu sync.Mutex + msgCh := make(chan struct{}, 1) + + client := New(wsURL, "test-token", func(msg PushMessage) { + recvMu.Lock() + received = append(received, msg) + recvMu.Unlock() + select { + case msgCh <- struct{}{}: + default: + } + }) + + go client.Connect() + defer client.Close() + + // Wait for connection (channel-based, not sleep) + select { + case <-connected: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for connection") + } + + // Send a message from server + mu.Lock() + conn := serverConn + mu.Unlock() + + if conn == nil { + t.Fatal("server did not receive connection") + } + + msg := PushMessage{Type: "config_changed", PipelineID: "p1", Reason: "deploy"} + data, _ := json.Marshal(msg) + if err := conn.WriteMessage(websocket.TextMessage, data); err != nil { + t.Fatalf("write error: %v", err) + } + + // Wait for message to be received (channel-based) + select { + case <-msgCh: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for message") + } + + recvMu.Lock() + defer recvMu.Unlock() + if len(received) != 1 { + t.Fatalf("expected 1 message, got %d", len(received)) + } + if received[0].Type != "config_changed" { + t.Errorf("got type %q, want config_changed", received[0].Type) + } + if received[0].PipelineID != "p1" { + t.Errorf("got pipelineId %q, want p1", received[0].PipelineID) + } +} + +func TestClientReconnectsAfterDisconnect(t *testing.T) { + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, + } + + var connections int + var connMu sync.Mutex + reconnected := make(chan struct{}, 1) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + connMu.Lock() + connections++ + count := connections + connMu.Unlock() + + if count == 1 { + // Close first connection to trigger reconnect + time.Sleep(100 * time.Millisecond) + conn.Close() + } else { + // Signal reconnect happened + select { + case reconnected <- struct{}{}: + default: + } + // Keep second connection open + for { + if _, _, err := conn.ReadMessage(); err != nil { + return + } + } + } + })) + defer server.Close() + + wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + client := New(wsURL, "", func(msg PushMessage) {}) + go client.Connect() + defer client.Close() + + // Wait for reconnect (channel-based, not sleep) + select { + case <-reconnected: + case <-time.After(10 * time.Second): + t.Fatal("timed out waiting for reconnect") + } + + connMu.Lock() + defer connMu.Unlock() + if connections < 2 { + t.Errorf("expected at least 2 connections (reconnect), got %d", connections) + } +} diff --git a/agent/internal/ws/types.go b/agent/internal/ws/types.go new file mode 100644 index 00000000..a7a0feb2 --- /dev/null +++ b/agent/internal/ws/types.go @@ -0,0 +1,29 @@ +package ws + +// PushMessage is the envelope for all server→agent push messages. +// The Type field determines which concrete fields are populated. +// +// Fields are shared across message types: +// - PipelineID: used by config_changed, sample_request +// - Checksum: used by action (self_update) +type PushMessage struct { + Type string `json:"type"` + + // config_changed fields + PipelineID string `json:"pipelineId,omitempty"` + Reason string `json:"reason,omitempty"` + + // sample_request fields + RequestID string `json:"requestId,omitempty"` + ComponentKeys []string `json:"componentKeys,omitempty"` + Limit int `json:"limit,omitempty"` + + // action fields + Action string `json:"action,omitempty"` + TargetVersion string `json:"targetVersion,omitempty"` + DownloadURL string `json:"downloadUrl,omitempty"` + Checksum string `json:"checksum,omitempty"` + + // poll_interval fields + IntervalMs int `json:"intervalMs,omitempty"` +} diff --git a/docker/server/Dockerfile b/docker/server/Dockerfile index 84960ce5..0479327e 100644 --- a/docker/server/Dockerfile +++ b/docker/server/Dockerfile @@ -31,8 +31,15 @@ COPY prisma.config.ts ./prisma.config.ts RUN npx prisma generate COPY src ./src COPY public ./public +COPY server.ts ./server.ts RUN --mount=type=cache,target=/app/.next/cache \ pnpm build +# Bundle custom server.ts → server.js (replaces Next.js default standalone server.js) +# Uses --alias to resolve @/ imports, --packages=external keeps npm deps external +RUN npx esbuild server.ts --bundle --platform=node --target=node22 \ + --outfile=.next/standalone/server.js \ + --alias:@/=./src/ \ + --packages=external # Save Prisma client version for the runner stage (standalone strips node_modules) RUN node -e "console.log(require('./node_modules/@prisma/client/package.json').version)" > /tmp/prisma-version diff --git a/package.json b/package.json index 0fe1381b..38ba4493 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "private": true, "packageManager": "pnpm@10.13.1", "scripts": { - "dev": "next dev", + "dev": "tsx server.ts", "build": "next build", "start": "next start", "lint": "eslint", @@ -50,6 +50,7 @@ "sonner": "^2.0.7", "superjson": "^2.2.6", "tailwind-merge": "^3.5.0", + "ws": "^8.19.0", "zod": "^4.3.6", "zustand": "^5.0.11" }, @@ -74,6 +75,7 @@ "@types/nodemailer": "^7.0.11", "@types/react": "^19", "@types/react-dom": "^19", + "@types/ws": "^8.18.1", "eslint": "^9", "eslint-config-next": "16.1.6", "prisma": "^7.4.2", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 11ee7960..ee8353fc 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -129,6 +129,9 @@ importers: tailwind-merge: specifier: ^3.5.0 version: 3.5.0 + ws: + specifier: ^8.19.0 + version: 8.19.0 zod: specifier: ^4.3.6 version: 4.3.6 @@ -163,6 +166,9 @@ importers: '@types/react-dom': specifier: ^19 version: 19.2.3(@types/react@19.2.14) + '@types/ws': + specifier: ^8.18.1 + version: 8.18.1 eslint: specifier: ^9 version: 9.39.3(jiti@2.6.1) @@ -1950,6 +1956,9 @@ packages: '@types/validate-npm-package-name@4.0.2': resolution: {integrity: sha512-lrpDziQipxCEeK5kWxvljWYhUvOiB2A9izZd9B2AFarYAkqZshb4lPbRs7zKEic6eGtH8V/2qJW+dPp9OtF6bw==} + '@types/ws@8.18.1': + resolution: {integrity: sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==} + '@typescript-eslint/eslint-plugin@8.56.1': resolution: {integrity: sha512-Jz9ZztpB37dNC+HU2HI28Bs9QXpzCz+y/twHOwhyrIRdbuVDxSytJNDl6z/aAKlaRIwC7y8wJdkBv7FxYGgi0A==} engines: {node: ^18.18.0 || ^20.9.0 || >=21.1.0} @@ -4752,6 +4761,18 @@ packages: wrappy@1.0.2: resolution: {integrity: sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==} + ws@8.19.0: + resolution: {integrity: sha512-blAT2mjOEIi0ZzruJfIhb3nps74PRWTCz1IjglWEEpQl5XS/UNama6u2/rjFkDDouqr4L67ry+1aGIALViWjDg==} + engines: {node: '>=10.0.0'} + peerDependencies: + bufferutil: ^4.0.1 + utf-8-validate: '>=5.0.2' + peerDependenciesMeta: + bufferutil: + optional: true + utf-8-validate: + optional: true + wsl-utils@0.3.1: resolution: {integrity: sha512-g/eziiSUNBSsdDJtCLB8bdYEUMj4jR7AGeUo96p/3dTafgjHhpF4RiCFPiRILwjQoDXx5MqkBr4fwWtR3Ky4Wg==} engines: {node: '>=20'} @@ -6608,6 +6629,10 @@ snapshots: '@types/validate-npm-package-name@4.0.2': {} + '@types/ws@8.18.1': + dependencies: + '@types/node': 20.19.35 + '@typescript-eslint/eslint-plugin@8.56.1(@typescript-eslint/parser@8.56.1(eslint@9.39.3(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.3(jiti@2.6.1))(typescript@5.9.3)': dependencies: '@eslint-community/regexpp': 4.12.2 @@ -9748,6 +9773,8 @@ snapshots: wrappy@1.0.2: {} + ws@8.19.0: {} + wsl-utils@0.3.1: dependencies: is-wsl: 3.1.1 diff --git a/prisma/migrations/20260311010000_add_event_sample_unique_constraint/migration.sql b/prisma/migrations/20260311010000_add_event_sample_unique_constraint/migration.sql new file mode 100644 index 00000000..dabc5d9e --- /dev/null +++ b/prisma/migrations/20260311010000_add_event_sample_unique_constraint/migration.sql @@ -0,0 +1,2 @@ +-- CreateIndex +CREATE UNIQUE INDEX "EventSample_requestId_componentKey_key" ON "EventSample"("requestId", "componentKey"); diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 50e3076b..b529d5f9 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -364,6 +364,7 @@ model EventSample { error String? sampledAt DateTime @default(now()) + @@unique([requestId, componentKey]) @@index([pipelineId, componentKey]) } diff --git a/server.ts b/server.ts new file mode 100644 index 00000000..5f401d32 --- /dev/null +++ b/server.ts @@ -0,0 +1,99 @@ +import { createServer, type IncomingMessage } from "http"; +import type { Socket } from "net"; +import next from "next"; +import { parse } from "url"; +import { type WebSocket, WebSocketServer } from "ws"; + +const dev = process.env.NODE_ENV !== "production"; +const hostname = process.env.HOSTNAME || "0.0.0.0"; +const port = parseInt(process.env.PORT || "3000", 10); + +const PING_INTERVAL_MS = 30_000; +const PONG_TIMEOUT_MS = 10_000; +const WS_PATH = "/api/agent/ws"; + +const app = next({ dev, hostname, port }); +const handle = app.getRequestHandler(); + +app.prepare().then(async () => { + const { authenticateWsUpgrade } = await import("./src/server/services/ws-auth"); + const { wsRegistry } = await import("./src/server/services/ws-registry"); + + const server = createServer((req, res) => { + const parsedUrl = parse(req.url ?? "/", true); + handle(req, res, parsedUrl); + }); + + const wss = new WebSocketServer({ noServer: true }); + + server.on("upgrade", async (req: IncomingMessage, socket: Socket, head: Buffer) => { + const { pathname } = parse(req.url ?? "/", true); + + if (pathname !== WS_PATH) { + socket.destroy(); + return; + } + + const agent = await authenticateWsUpgrade(req); + if (!agent) { + socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n"); + socket.destroy(); + return; + } + + wss.handleUpgrade(req, socket, head, (ws) => { + wss.emit("connection", ws, req, agent); + }); + }); + + wss.on("connection", (ws: WebSocket, _req: IncomingMessage, agent: { nodeId: string; environmentId: string }) => { + const { nodeId } = agent; + console.log(`[ws] agent connected: ${nodeId}`); + wsRegistry.register(nodeId, ws); + + let alive = true; + let pongTimer: ReturnType | null = null; + + ws.on("pong", () => { + alive = true; + if (pongTimer) { + clearTimeout(pongTimer); + pongTimer = null; + } + }); + + const pingInterval = setInterval(() => { + if (!alive) { + console.log(`[ws] agent ${nodeId} did not respond to ping, terminating`); + ws.terminate(); + return; + } + alive = false; + ws.ping(); + pongTimer = setTimeout(() => { + if (!alive) { + console.log(`[ws] agent ${nodeId} pong timeout (${PONG_TIMEOUT_MS}ms), terminating`); + ws.terminate(); + } + }, PONG_TIMEOUT_MS); + }, PING_INTERVAL_MS); + + ws.on("close", () => { + console.log(`[ws] agent disconnected: ${nodeId}`); + clearInterval(pingInterval); + if (pongTimer) clearTimeout(pongTimer); + wsRegistry.unregister(nodeId, ws); + }); + + ws.on("error", (err: Error) => { + console.error(`[ws] error for agent ${nodeId}:`, err.message); + clearInterval(pingInterval); + if (pongTimer) clearTimeout(pongTimer); + wsRegistry.unregister(nodeId, ws); + }); + }); + + server.listen(port, hostname, () => { + console.log(`> Ready on http://${hostname}:${port}`); + }); +}); diff --git a/src/app/(dashboard)/fleet/page.tsx b/src/app/(dashboard)/fleet/page.tsx index bcc404f0..811ea6a3 100644 --- a/src/app/(dashboard)/fleet/page.tsx +++ b/src/app/(dashboard)/fleet/page.tsx @@ -231,6 +231,18 @@ export default function FleetPage() { {nodeStatusLabel(node.status)} )} + {node.wsConnected && ( + + + + WS + + + + WebSocket connected — real-time push enabled + + + )} {formatLastSeen(node.lastSeen)} diff --git a/src/app/api/agent/config/route.ts b/src/app/api/agent/config/route.ts index 822f906c..f6b6fa80 100644 --- a/src/app/api/agent/config/route.ts +++ b/src/app/api/agent/config/route.ts @@ -196,9 +196,18 @@ export async function GET(request: Request) { select: { fleetPollIntervalMs: true }, }); + // Build WebSocket URL from the incoming request's host + const proto = request.headers.get("x-forwarded-proto") ?? "http"; + const wsProto = proto === "https" ? "wss" : "ws"; + const host = request.headers.get("x-forwarded-host") + ?? request.headers.get("host") + ?? `localhost:${process.env.PORT ?? 3000}`; + const websocketUrl = `${wsProto}://${host}/api/agent/ws`; + return NextResponse.json({ pipelines: pipelineConfigs, pollIntervalMs: settings?.fleetPollIntervalMs ?? 15000, + websocketUrl, secretBackend: environment.secretBackend, ...(environment.secretBackend !== "BUILTIN" ? { secretBackendConfig: environment.secretBackendConfig } diff --git a/src/app/api/agent/samples/route.ts b/src/app/api/agent/samples/route.ts new file mode 100644 index 00000000..3b173ae8 --- /dev/null +++ b/src/app/api/agent/samples/route.ts @@ -0,0 +1,113 @@ +import { NextResponse } from "next/server"; +import { Prisma } from "@/generated/prisma"; +import { prisma } from "@/lib/prisma"; +import { authenticateAgent } from "@/server/services/agent-auth"; +import { z } from "zod"; + +const sampleResultSchema = z.object({ + results: z.array( + z.object({ + requestId: z.string(), + componentKey: z.string(), + events: z.array(z.unknown()).optional().default([]), + schema: z + .array( + z.object({ + path: z.string(), + type: z.string(), + sample: z.string(), + }), + ) + .optional() + .default([]), + error: z.string().optional(), + }), + ), +}); + +/** Returns true if this is a Prisma unique constraint violation (P2002). */ +function isUniqueViolation(err: unknown): boolean { + return err instanceof Prisma.PrismaClientKnownRequestError && err.code === "P2002"; +} + +export async function POST(request: Request) { + const agent = await authenticateAgent(request); + if (!agent) { + return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); + } + + try { + const body = await request.json(); + const parsed = sampleResultSchema.safeParse(body); + if (!parsed.success) { + return NextResponse.json( + { error: "Invalid payload", details: parsed.error.issues }, + { status: 400 }, + ); + } + + const { results } = parsed.data; + + for (const result of results) { + const sampleRequest = await prisma.eventSampleRequest.findUnique({ + where: { id: result.requestId }, + include: { pipeline: { select: { environmentId: true } } }, + }); + if (!sampleRequest || sampleRequest.status !== "PENDING") { + continue; + } + + // Verify the request's pipeline belongs to this agent's environment + if (sampleRequest.pipeline.environmentId !== agent.environmentId) { + continue; + } + + // Write the EventSample (success or error) + try { + await prisma.eventSample.create({ + data: { + requestId: result.requestId, + pipelineId: sampleRequest.pipelineId, + componentKey: result.componentKey, + events: result.error ? [] : (result.events as object[]), + schema: result.error ? [] : result.schema, + error: result.error ?? null, + }, + }); + } catch (err) { + if (isUniqueViolation(err)) continue; // another agent already submitted + throw err; + } + + // Check if all components now have samples (success or error) + const componentKeys = sampleRequest.componentKeys as string[]; + const samples = await prisma.eventSample.findMany({ + where: { requestId: result.requestId }, + select: { componentKey: true, error: true }, + }); + const sampledKeySet = new Set(samples.map((s) => s.componentKey)); + const allDone = componentKeys.every((k) => sampledKeySet.has(k)); + + if (allDone) { + const hasErrors = samples.some((s) => s.error != null); + // Atomically transition PENDING → final status; no-op if already moved. + await prisma.eventSampleRequest.updateMany({ + where: { id: result.requestId, status: "PENDING" }, + data: { + status: hasErrors ? "ERROR" : "COMPLETED", + completedAt: new Date(), + nodeId: agent.nodeId, + }, + }); + } + } + + return NextResponse.json({ ok: true }); + } catch (error) { + console.error("Sample results error:", error); + return NextResponse.json( + { error: "Failed to process sample results" }, + { status: 500 }, + ); + } +} diff --git a/src/server/routers/deploy.ts b/src/server/routers/deploy.ts index 342d6f9c..d29ac2b3 100644 --- a/src/server/routers/deploy.ts +++ b/src/server/routers/deploy.ts @@ -10,6 +10,7 @@ import { decryptNodeConfig } from "@/server/services/config-crypto"; import { withAudit } from "@/server/middleware/audit"; import { writeAuditLog } from "@/server/services/audit"; import { fireEventAlert } from "@/server/services/event-alerts"; +import { wsRegistry } from "@/server/services/ws-registry"; export const deployRouter = router({ preview: protectedProcedure @@ -267,7 +268,24 @@ export const deployRouter = router({ throw new TRPCError({ code: "NOT_FOUND", message: "Pipeline not found" }); } - return undeployAgent(input.pipelineId); + const result = await undeployAgent(input.pipelineId); + + // Notify connected agents that config has changed + if (result.success) { + const nodes = await prisma.vectorNode.findMany({ + where: { environmentId: pipeline.environmentId }, + select: { id: true }, + }); + for (const node of nodes) { + wsRegistry.send(node.id, { + type: "config_changed", + pipelineId: input.pipelineId, + reason: "undeploy", + }); + } + } + + return result; }), environmentInfo: protectedProcedure diff --git a/src/server/routers/fleet.ts b/src/server/routers/fleet.ts index 0c4225c3..2a82f09b 100644 --- a/src/server/routers/fleet.ts +++ b/src/server/routers/fleet.ts @@ -5,19 +5,24 @@ import { prisma } from "@/lib/prisma"; import { LogLevel } from "@/generated/prisma"; import { withAudit } from "@/server/middleware/audit"; import { checkDevAgentVersion } from "@/server/services/version-check"; +import { wsRegistry } from "@/server/services/ws-registry"; export const fleetRouter = router({ list: protectedProcedure .input(z.object({ environmentId: z.string() })) .use(withTeamAccess("VIEWER")) .query(async ({ input }) => { - return prisma.vectorNode.findMany({ + const nodes = await prisma.vectorNode.findMany({ where: { environmentId: input.environmentId }, include: { environment: { select: { id: true, name: true } }, }, orderBy: { createdAt: "desc" }, }); + return nodes.map((node) => ({ + ...node, + wsConnected: wsRegistry.isConnected(node.id), + })); }), get: protectedProcedure @@ -279,7 +284,7 @@ export const fleetRouter = router({ checksum = `sha256:${freshChecksum}`; } - return prisma.vectorNode.update({ + const updated = await prisma.vectorNode.update({ where: { id: input.nodeId }, data: { pendingAction: { @@ -290,6 +295,17 @@ export const fleetRouter = router({ }, }, }); + + // Push action to agent via WebSocket (fallback: agent reads pendingAction on next poll) + wsRegistry.send(input.nodeId, { + type: "action", + action: "self_update", + targetVersion, + downloadUrl, + checksum, + }); + + return updated; }), updateLabels: protectedProcedure @@ -345,13 +361,21 @@ export const fleetRouter = router({ if (!node) { throw new TRPCError({ code: "NOT_FOUND", message: "Node not found" }); } - return prisma.vectorNode.update({ + const updated = await prisma.vectorNode.update({ where: { id: input.nodeId }, data: { maintenanceMode: input.enabled, maintenanceModeAt: input.enabled ? new Date() : null, }, }); + + // Maintenance mode changes what the config endpoint returns — notify agent to re-poll + wsRegistry.send(input.nodeId, { + type: "config_changed", + reason: input.enabled ? "maintenance_on" : "maintenance_off", + }); + + return updated; }), listWithPipelineStatus: protectedProcedure @@ -388,7 +412,10 @@ export const fleetRouter = router({ }); return { - nodes, + nodes: nodes.map((node) => ({ + ...node, + wsConnected: wsRegistry.isConnected(node.id), + })), deployedPipelines: deployedPipelines.map((p) => ({ id: p.id, name: p.name, diff --git a/src/server/routers/pipeline.ts b/src/server/routers/pipeline.ts index 00078d90..d82f3535 100644 --- a/src/server/routers/pipeline.ts +++ b/src/server/routers/pipeline.ts @@ -18,6 +18,7 @@ import { copyPipelineGraph } from "@/server/services/copy-pipeline-graph"; import { stripEnvRefs, type StrippedRef } from "@/server/services/strip-env-refs"; import { gitSyncDeletePipeline } from "@/server/services/git-sync"; import { evaluatePipelineHealth } from "@/server/services/sli-evaluator"; +import { wsRegistry } from "@/server/services/ws-registry"; /** Pipeline names must be safe identifiers */ const pipelineNameSchema = z @@ -1161,6 +1162,21 @@ export const pipelineRouter = router({ }, }); + // Push sample request to connected agents running this pipeline + const statuses = await prisma.nodePipelineStatus.findMany({ + where: { pipelineId: input.pipelineId, status: "RUNNING" }, + select: { nodeId: true }, + }); + for (const { nodeId } of statuses) { + wsRegistry.send(nodeId, { + type: "sample_request", + requestId: request.id, + pipelineId: input.pipelineId, + componentKeys: input.componentKeys, + limit: input.limit, + }); + } + return { requestId: request.id, status: "PENDING" }; }), diff --git a/src/server/services/deploy-agent.ts b/src/server/services/deploy-agent.ts index 12b2ebff..389ac525 100644 --- a/src/server/services/deploy-agent.ts +++ b/src/server/services/deploy-agent.ts @@ -6,6 +6,7 @@ import { createVersion } from "@/server/services/pipeline-version"; import { decryptNodeConfig } from "@/server/services/config-crypto"; import { startSystemVector, stopSystemVector } from "@/server/services/system-vector"; import { gitSyncCommitPipeline } from "@/server/services/git-sync"; +import { wsRegistry } from "@/server/services/ws-registry"; export interface AgentDeployResult { success: boolean; @@ -165,6 +166,28 @@ export async function deployAgent( await startSystemVector(version.configYaml); } + // Notify connected agents that config has changed — they will re-poll + // to get the full assembled config with secrets and certs resolved. + if (!pipeline.isSystem) { + const nodeSelector = pipeline.nodeSelector as Record | null; + const targetNodes = await prisma.vectorNode.findMany({ + where: { environmentId: pipeline.environmentId }, + select: { id: true, labels: true }, + }); + for (const node of targetNodes) { + const labels = (node.labels as Record) ?? {}; + const selectorEntries = Object.entries(nodeSelector ?? {}); + const matches = selectorEntries.every(([k, v]) => labels[k] === v); + if (matches) { + wsRegistry.send(node.id, { + type: "config_changed", + pipelineId, + reason: "deploy", + }); + } + } + } + return { success: true, versionId: version.id, diff --git a/src/server/services/ws-auth.ts b/src/server/services/ws-auth.ts new file mode 100644 index 00000000..ac6c6782 --- /dev/null +++ b/src/server/services/ws-auth.ts @@ -0,0 +1,96 @@ +import type { IncomingMessage } from "http"; +import { createHash } from "crypto"; +import { prisma } from "@/lib/prisma"; +import { extractBearerToken, verifyNodeToken } from "./agent-token"; + +/** Fast, non-reversible hash used as cache key instead of the raw plaintext + * token — avoids keeping credentials in process memory. */ +function tokenCacheKey(token: string): string { + return createHash("sha256").update(token).digest("hex"); +} + +const TOKEN_CACHE_TTL_MS = 30 * 60 * 1000; // 30 minutes +const TOKEN_CACHE_MAX_SIZE = 1000; + +interface CacheEntry { + nodeId: string; + environmentId: string; + cachedAt: number; +} + +/** Cache verified tokens to avoid O(n) bcrypt scan on every WS upgrade. + * Key: SHA-256 hash of token, Value: { nodeId, environmentId, cachedAt }. + * Entries expire after 30 minutes and the cache is capped at 1000 entries. */ +const tokenCache = new Map(); + +/** Remove expired entries. Called on each lookup to bound memory. */ +function evictStale(): void { + if (tokenCache.size <= TOKEN_CACHE_MAX_SIZE) return; + const now = Date.now(); + for (const [key, entry] of tokenCache) { + if (now - entry.cachedAt > TOKEN_CACHE_TTL_MS) { + tokenCache.delete(key); + } + } +} + +/** + * Authenticate a WebSocket upgrade request by verifying its Bearer token. + * + * Uses an in-memory cache so reconnects (same token) are O(1) instead of + * scanning all node hashes with bcrypt. + */ +export async function authenticateWsUpgrade( + req: IncomingMessage, +): Promise<{ nodeId: string; environmentId: string } | null> { + const authHeader = req.headers["authorization"]; + const token = extractBearerToken( + Array.isArray(authHeader) ? authHeader[0] : authHeader ?? null, + ); + if (!token) { + return null; + } + + // Fast path: check cache first (O(1) string lookup) + const cacheKey = tokenCacheKey(token); + const cached = tokenCache.get(cacheKey); + if (cached) { + // Evict if TTL expired + if (Date.now() - cached.cachedAt > TOKEN_CACHE_TTL_MS) { + tokenCache.delete(cacheKey); + } else { + // Verify the node still exists and the hash still matches (re-enrollment invalidates) + const node = await prisma.vectorNode.findUnique({ + where: { id: cached.nodeId }, + select: { nodeTokenHash: true }, + }); + if (node?.nodeTokenHash && await verifyNodeToken(token, node.nodeTokenHash)) { + return { nodeId: cached.nodeId, environmentId: cached.environmentId }; + } + // Cache stale — node deleted or re-enrolled + tokenCache.delete(cacheKey); + } + } + + // Slow path: scan all nodes with bcrypt + const nodes = await prisma.vectorNode.findMany({ + where: { nodeTokenHash: { not: null } }, + select: { id: true, environmentId: true, nodeTokenHash: true }, + }); + + for (const node of nodes) { + if (!node.nodeTokenHash) continue; + const valid = await verifyNodeToken(token, node.nodeTokenHash); + if (valid) { + evictStale(); + tokenCache.set(cacheKey, { + nodeId: node.id, + environmentId: node.environmentId, + cachedAt: Date.now(), + }); + return { nodeId: node.id, environmentId: node.environmentId }; + } + } + + return null; +} diff --git a/src/server/services/ws-registry.ts b/src/server/services/ws-registry.ts new file mode 100644 index 00000000..2c60e5bc --- /dev/null +++ b/src/server/services/ws-registry.ts @@ -0,0 +1,63 @@ +import type WebSocket from "ws"; +import type { PushMessage } from "./ws-types"; + +class WsRegistry { + private connections = new Map(); + + register(nodeId: string, ws: WebSocket): void { + const existing = this.connections.get(nodeId); + if (existing && existing.readyState === existing.OPEN) { + existing.close(1000, "replaced"); + } + this.connections.set(nodeId, ws); + } + + /** Remove a connection. If `ws` is provided, only remove if it matches the + * current registered socket — prevents a stale close handler from removing + * a newer reconnection. */ + unregister(nodeId: string, ws?: WebSocket): void { + if (ws) { + const current = this.connections.get(nodeId); + if (current !== ws) return; // stale socket — newer connection already registered + } + this.connections.delete(nodeId); + } + + send(nodeId: string, message: PushMessage): boolean { + const ws = this.connections.get(nodeId); + if (!ws || ws.readyState !== ws.OPEN) { + return false; + } + try { + ws.send(JSON.stringify(message)); + return true; + } catch { + return false; + } + } + + broadcast(nodeIds: string[], message: PushMessage): string[] { + const sent: string[] = []; + for (const nodeId of nodeIds) { + if (this.send(nodeId, message)) { + sent.push(nodeId); + } + } + return sent; + } + + isConnected(nodeId: string): boolean { + const ws = this.connections.get(nodeId); + return ws !== undefined && ws.readyState === ws.OPEN; + } + + get size(): number { + let count = 0; + for (const [, ws] of this.connections) { + if (ws.readyState === ws.OPEN) count++; + } + return count; + } +} + +export const wsRegistry = new WsRegistry(); diff --git a/src/server/services/ws-types.ts b/src/server/services/ws-types.ts new file mode 100644 index 00000000..3f5dac1c --- /dev/null +++ b/src/server/services/ws-types.ts @@ -0,0 +1,45 @@ +// src/server/services/ws-types.ts + +/** + * Server→Agent push message types sent over WebSocket. + * Each message has a `type` discriminator for client-side dispatch. + * + * Config changes use lightweight notifications (config_changed) that trigger + * an immediate re-poll, rather than carrying the full assembled config. + * This avoids duplicating secret/cert resolution logic from the config endpoint. + */ +export type PushMessage = + | ConfigChangedMessage + | SampleRequestMessage + | ActionMessage + | PollIntervalMessage; + +/** Notification that pipeline config has changed. Agent should re-poll immediately. */ +export interface ConfigChangedMessage { + type: "config_changed"; + /** Optional: which pipeline changed. If absent, agent re-polls all. */ + pipelineId?: string; + /** Reason for the change (deploy, undeploy, maintenance). For logging only. */ + reason?: string; +} + +export interface SampleRequestMessage { + type: "sample_request"; + requestId: string; + pipelineId: string; + componentKeys: string[]; + limit: number; +} + +export interface ActionMessage { + type: "action"; + action: "self_update" | "restart"; + targetVersion?: string; + downloadUrl?: string; + checksum?: string; +} + +export interface PollIntervalMessage { + type: "poll_interval"; + intervalMs: number; +}