diff --git a/.pnpm-approve-builds.json b/.pnpm-approve-builds.json index a0a4e58..4e8a04d 100644 --- a/.pnpm-approve-builds.json +++ b/.pnpm-approve-builds.json @@ -1 +1 @@ -["@prisma/engines","esbuild","prisma"] +["@prisma/engines","prisma"] diff --git a/agent/go.mod b/agent/go.mod index 92a8edf..a75086f 100644 --- a/agent/go.mod +++ b/agent/go.mod @@ -1,5 +1,3 @@ module github.com/TerrifiedBug/vectorflow/agent go 1.22 - -require github.com/gorilla/websocket v1.5.3 diff --git a/agent/go.sum b/agent/go.sum index 25a9fc4..e69de29 100644 --- a/agent/go.sum +++ b/agent/go.sum @@ -1,2 +0,0 @@ -github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= -github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/agent/internal/agent/agent.go b/agent/internal/agent/agent.go index 55e800b..18df103 100644 --- a/agent/internal/agent/agent.go +++ b/agent/internal/agent/agent.go @@ -16,7 +16,7 @@ import ( "github.com/TerrifiedBug/vectorflow/agent/internal/config" "github.com/TerrifiedBug/vectorflow/agent/internal/sampler" "github.com/TerrifiedBug/vectorflow/agent/internal/supervisor" - "github.com/TerrifiedBug/vectorflow/agent/internal/ws" + "github.com/TerrifiedBug/vectorflow/agent/internal/push" ) var Version = "dev" @@ -34,8 +34,8 @@ type Agent struct { failedUpdateVersion string // skip retries for this version updateError string // report failure to server - wsClient *ws.Client - wsCh chan ws.PushMessage + pushClient *push.Client + pushCh chan push.PushMessage immediateHeartbeat *time.Timer immediateHeartbeatCh chan struct{} } @@ -91,20 +91,19 @@ func (a *Agent) Run() error { // Do first poll immediately a.pollAndApply() - // Start WebSocket if the server provided a URL - a.wsCh = make(chan ws.PushMessage, 16) + // Start SSE push client if the server provided a URL + a.pushCh = make(chan push.PushMessage, 16) a.immediateHeartbeatCh = make(chan struct{}, 1) - if wsURL := a.poller.WebSocketURL(); wsURL != "" { - a.wsClient = ws.New(wsURL, a.client.NodeToken(), func(msg ws.PushMessage) { - // Forward messages to the main goroutine via channel + if pushURL := a.poller.PushURL(); pushURL != "" { + a.pushClient = push.New(pushURL, a.client.NodeToken(), func(msg push.PushMessage) { select { - case a.wsCh <- msg: + case a.pushCh <- msg: default: - slog.Warn("ws: message channel full, dropping message", "type", msg.Type) + slog.Warn("push: message channel full, dropping message", "type", msg.Type) } }) - go a.wsClient.Connect() - slog.Info("websocket client started", "url", wsURL) + go a.pushClient.Connect() + slog.Info("push client started", "url", pushURL) } a.sendHeartbeat() @@ -114,8 +113,8 @@ func (a *Agent) Run() error { select { case <-ctx.Done(): slog.Info("shutting down all pipelines") - if a.wsClient != nil { - a.wsClient.Close() + if a.pushClient != nil { + a.pushClient.Close() } if a.immediateHeartbeat != nil { a.immediateHeartbeat.Stop() @@ -127,8 +126,8 @@ func (a *Agent) Run() error { a.pollAndApply() a.sendHeartbeat() currentInterval = a.maybeResetTicker(ticker, currentInterval) - case msg := <-a.wsCh: - a.handleWsMessage(msg, ticker) + case msg := <-a.pushCh: + a.handlePushMessage(msg, ticker) case <-a.immediateHeartbeatCh: a.sendHeartbeat() } @@ -312,18 +311,18 @@ func (a *Agent) processSampleRequests(requests []client.SampleRequestMsg) { } } -// handleWsMessage processes a push message from the WebSocket channel. +// handlePushMessage processes a push message from the SSE push channel. // MUST be called from the main goroutine (same goroutine as Run()'s select loop). -func (a *Agent) handleWsMessage(msg ws.PushMessage, ticker *time.Ticker) { +func (a *Agent) handlePushMessage(msg push.PushMessage, ticker *time.Ticker) { switch msg.Type { case "config_changed": - slog.Info("ws: config changed notification", "pipeline", msg.PipelineID, "reason", msg.Reason) + slog.Info("push: config changed notification", "pipeline", msg.PipelineID, "reason", msg.Reason) // Re-poll immediately to get the full assembled config a.pollAndApply() a.triggerImmediateHeartbeat() case "sample_request": - slog.Info("ws: sample request received", "requestId", msg.RequestID, "pipeline", msg.PipelineID) + slog.Info("push: sample request received", "requestId", msg.RequestID, "pipeline", msg.PipelineID) a.processSampleRequestsAndSend([]client.SampleRequestMsg{ { RequestID: msg.RequestID, @@ -334,7 +333,7 @@ func (a *Agent) handleWsMessage(msg ws.PushMessage, ticker *time.Ticker) { }) case "action": - slog.Info("ws: action received", "action", msg.Action) + slog.Info("push: action received", "action", msg.Action) switch msg.Action { case "self_update": a.handlePendingAction(&client.PendingAction{ @@ -345,22 +344,22 @@ func (a *Agent) handleWsMessage(msg ws.PushMessage, ticker *time.Ticker) { }) a.triggerImmediateHeartbeat() case "restart": - slog.Warn("ws: restart action not yet implemented, triggering re-poll instead") + slog.Warn("push: restart action not yet implemented, triggering re-poll instead") a.pollAndApply() a.triggerImmediateHeartbeat() default: - slog.Warn("ws: unknown action", "action", msg.Action) + slog.Warn("push: unknown action", "action", msg.Action) } case "poll_interval": if msg.IntervalMs > 0 { newInterval := time.Duration(msg.IntervalMs) * time.Millisecond ticker.Reset(newInterval) - slog.Info("ws: poll interval changed", "intervalMs", msg.IntervalMs) + slog.Info("push: poll interval changed", "intervalMs", msg.IntervalMs) } default: - slog.Warn("ws: unknown message type", "type", msg.Type) + slog.Warn("push: unknown message type", "type", msg.Type) } } @@ -382,7 +381,7 @@ func (a *Agent) triggerImmediateHeartbeat() { } // processSampleRequestsAndSend processes sample requests and sends results -// directly to the /api/agent/samples endpoint (used for WebSocket-triggered requests). +// directly to the /api/agent/samples endpoint (used for push-triggered requests). // Falls back to heartbeat delivery on HTTP failure. func (a *Agent) processSampleRequestsAndSend(requests []client.SampleRequestMsg) { statuses := a.supervisor.Statuses() diff --git a/agent/internal/agent/poller.go b/agent/internal/agent/poller.go index 68cd936..fec8137 100644 --- a/agent/internal/agent/poller.go +++ b/agent/internal/agent/poller.go @@ -30,7 +30,7 @@ type poller struct { sampleRequests []client.SampleRequestMsg pendingAction *client.PendingAction pollIntervalMs int // server-provided poll interval from last response - websocketUrl string + pushUrl string } func newPoller(cfg *config.Config, c configFetcher) *poller { @@ -194,9 +194,9 @@ func (p *poller) Poll() ([]PipelineAction, error) { // Store server-provided poll interval p.pollIntervalMs = resp.PollIntervalMs - // Store websocket URL for the agent to use - if resp.WebSocketURL != "" { - p.websocketUrl = resp.WebSocketURL + // Store push URL for the agent to use + if resp.PushURL != "" { + p.pushUrl = resp.PushURL } return actions, nil @@ -221,9 +221,9 @@ func (p *poller) PollIntervalMs() int { return p.pollIntervalMs } -// WebSocketURL returns the WebSocket URL from the last config response. -func (p *poller) WebSocketURL() string { +// PushURL returns the SSE push URL from the last config response. +func (p *poller) PushURL() string { p.mu.Lock() defer p.mu.Unlock() - return p.websocketUrl + return p.pushUrl } diff --git a/agent/internal/client/client.go b/agent/internal/client/client.go index b06d5d8..5b3456a 100644 --- a/agent/internal/client/client.go +++ b/agent/internal/client/client.go @@ -29,7 +29,7 @@ func (c *Client) SetNodeToken(token string) { c.nodeToken = token } -// NodeToken returns the current node token for use by other packages (e.g., WebSocket auth). +// NodeToken returns the current node token for use by other packages (e.g., push auth). func (c *Client) NodeToken() string { return c.nodeToken } @@ -119,7 +119,7 @@ type ConfigResponse struct { SecretBackendConfig map[string]interface{} `json:"secretBackendConfig,omitempty"` SampleRequests []SampleRequestMsg `json:"sampleRequests,omitempty"` PendingAction *PendingAction `json:"pendingAction,omitempty"` - WebSocketURL string `json:"websocketUrl,omitempty"` + PushURL string `json:"pushUrl,omitempty"` } func (c *Client) GetConfig() (*ConfigResponse, error) { diff --git a/agent/internal/push/client.go b/agent/internal/push/client.go new file mode 100644 index 0000000..f23b724 --- /dev/null +++ b/agent/internal/push/client.go @@ -0,0 +1,173 @@ +package push + +import ( + "bufio" + "context" + "encoding/json" + "log/slog" + "net/http" + "strings" + "sync" + "time" +) + +const ( + maxBackoff = 30 * time.Second + maxBufferSize = 256 * 1024 // 256KB for large payloads +) + +// sseHTTPClient sets a response-header timeout so a stalled proxy doesn't block +// the connect goroutine indefinitely. Body reads remain context-driven. +var sseHTTPClient = &http.Client{ + Transport: &http.Transport{ + ResponseHeaderTimeout: 15 * time.Second, + }, +} + +// Client maintains a persistent SSE connection to the server push endpoint. +type Client struct { + url string + token string + onMessage func(PushMessage) + + mu sync.Mutex + cancel context.CancelFunc + done chan struct{} +} + +// New creates a new SSE push client. +func New(url, token string, onMessage func(PushMessage)) *Client { + return &Client{ + url: url, + token: token, + onMessage: onMessage, + done: make(chan struct{}), + } +} + +// Connect blocks and maintains a persistent SSE connection with exponential +// backoff reconnection. Call Close() to stop. +func (c *Client) Connect() { + ctx, cancel := context.WithCancel(context.Background()) + c.mu.Lock() + c.cancel = cancel + c.mu.Unlock() + + backoff := time.Second + + for { + start := time.Now() + err := c.stream(ctx) + if ctx.Err() != nil { + // Graceful shutdown + close(c.done) + return + } + // Reset backoff if connection was alive for a meaningful duration + if time.Since(start) > 5*time.Second { + backoff = time.Second + } + slog.Warn("push: connection lost, reconnecting", + "error", err, "backoff", backoff) + select { + case <-ctx.Done(): + close(c.done) + return + case <-time.After(backoff): + } + backoff = min(backoff*2, maxBackoff) + } +} + +// stream opens a single SSE connection and reads messages until error or cancel. +func (c *Client) stream(ctx context.Context) error { + req, err := http.NewRequestWithContext(ctx, "GET", c.url, nil) + if err != nil { + return err + } + req.Header.Set("Authorization", "Bearer "+c.token) + + resp, err := sseHTTPClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return &httpError{StatusCode: resp.StatusCode} + } + + slog.Info("push: connected", "url", c.url) + + scanner := bufio.NewScanner(resp.Body) + scanner.Buffer(make([]byte, 0, maxBufferSize), maxBufferSize) + + var eventType string + var dataBuilder strings.Builder + + for scanner.Scan() { + line := scanner.Text() + + if line == "" { + // Empty line = dispatch event + if dataBuilder.Len() > 0 { + c.dispatch(eventType, dataBuilder.String()) + eventType = "" + dataBuilder.Reset() + } + continue + } + + if strings.HasPrefix(line, ":") { + // SSE comment (keepalive) — ignore + continue + } + + if strings.HasPrefix(line, "event: ") { + eventType = strings.TrimPrefix(line, "event: ") + continue + } + + if strings.HasPrefix(line, "data: ") { + if dataBuilder.Len() > 0 { + dataBuilder.WriteByte('\n') + } + dataBuilder.WriteString(strings.TrimPrefix(line, "data: ")) + continue + } + } + + if err := scanner.Err(); err != nil { + return err + } + return nil +} + +func (c *Client) dispatch(eventType, data string) { + var msg PushMessage + if err := json.Unmarshal([]byte(data), &msg); err != nil { + slog.Error("push: failed to parse message", "error", err, "event", eventType, "data", data) + return + } + c.onMessage(msg) +} + +// Close gracefully stops the SSE connection. +func (c *Client) Close() { + c.mu.Lock() + cancel := c.cancel + c.mu.Unlock() + + if cancel != nil { + cancel() + <-c.done + } +} + +type httpError struct { + StatusCode int +} + +func (e *httpError) Error() string { + return "push: unexpected status " + http.StatusText(e.StatusCode) +} diff --git a/agent/internal/push/client_test.go b/agent/internal/push/client_test.go new file mode 100644 index 0000000..cbedf18 --- /dev/null +++ b/agent/internal/push/client_test.go @@ -0,0 +1,173 @@ +package push + +import ( + "fmt" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" +) + +func TestPushMessageParsing(t *testing.T) { + tests := []struct { + name string + json string + wantType string + }{ + {"config_changed", `{"type":"config_changed","pipelineId":"p1","reason":"deploy"}`, "config_changed"}, + {"sample_request", `{"type":"sample_request","requestId":"r1","pipelineId":"p1","componentKeys":["k1"],"limit":10}`, "sample_request"}, + {"action", `{"type":"action","action":"self_update","targetVersion":"1.0.0"}`, "action"}, + {"poll_interval", `{"type":"poll_interval","intervalMs":5000}`, "poll_interval"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var received PushMessage + var wg sync.WaitGroup + wg.Add(1) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.WriteHeader(200) + + flusher, ok := w.(http.Flusher) + if !ok { + t.Fatal("expected flusher") + } + + fmt.Fprintf(w, "event: %s\ndata: %s\n\n", tt.wantType, tt.json) + flusher.Flush() + + // Keep alive briefly for client to read + time.Sleep(100 * time.Millisecond) + })) + defer server.Close() + + client := New(server.URL, "test-token", func(msg PushMessage) { + received = msg + wg.Done() + }) + + go client.Connect() + wg.Wait() + client.Close() + + if received.Type != tt.wantType { + t.Errorf("got type %q, want %q", received.Type, tt.wantType) + } + }) + } +} + +func TestClientSendsAuthHeader(t *testing.T) { + var gotAuth string + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotAuth = r.Header.Get("Authorization") + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(200) + fmt.Fprintf(w, ": connected\n\n") + time.Sleep(100 * time.Millisecond) + })) + defer server.Close() + + client := New(server.URL, "my-secret-token", func(msg PushMessage) {}) + go client.Connect() + time.Sleep(200 * time.Millisecond) + client.Close() + + if gotAuth != "Bearer my-secret-token" { + t.Errorf("got auth %q, want %q", gotAuth, "Bearer my-secret-token") + } +} + +func TestClientReconnectsAfterDisconnect(t *testing.T) { + var mu sync.Mutex + connectCount := 0 + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mu.Lock() + connectCount++ + count := connectCount + mu.Unlock() + + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(200) + + flusher, ok := w.(http.Flusher) + if !ok { + t.Fatal("expected flusher") + } + + fmt.Fprintf(w, ": connected\n\n") + flusher.Flush() + + if count == 1 { + // First connection: close immediately to trigger reconnect + return + } + + // Second connection: stay open + time.Sleep(2 * time.Second) + })) + defer server.Close() + + client := New(server.URL, "test-token", func(msg PushMessage) {}) + go client.Connect() + + // Wait for reconnect + time.Sleep(3 * time.Second) + client.Close() + + mu.Lock() + defer mu.Unlock() + if connectCount < 2 { + t.Errorf("expected at least 2 connections (reconnect), got %d", connectCount) + } +} + +func TestClientHandlesKeepalive(t *testing.T) { + var received PushMessage + var wg sync.WaitGroup + wg.Add(1) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(200) + + flusher, ok := w.(http.Flusher) + if !ok { + t.Fatal("expected flusher") + } + + // Send keepalive comments before actual data + fmt.Fprintf(w, ": keepalive\n\n") + flusher.Flush() + fmt.Fprintf(w, ": keepalive\n\n") + flusher.Flush() + + // Then send real message + fmt.Fprintf(w, "event: config_changed\ndata: {\"type\":\"config_changed\",\"reason\":\"test\"}\n\n") + flusher.Flush() + + time.Sleep(100 * time.Millisecond) + })) + defer server.Close() + + client := New(server.URL, "test-token", func(msg PushMessage) { + received = msg + wg.Done() + }) + go client.Connect() + wg.Wait() + client.Close() + + if received.Type != "config_changed" { + t.Errorf("got type %q, want %q", received.Type, "config_changed") + } + if received.Reason != "test" { + t.Errorf("got reason %q, want %q", received.Reason, "test") + } +} diff --git a/agent/internal/ws/types.go b/agent/internal/push/types.go similarity index 69% rename from agent/internal/ws/types.go rename to agent/internal/push/types.go index a7a0feb..22070b8 100644 --- a/agent/internal/ws/types.go +++ b/agent/internal/push/types.go @@ -1,11 +1,8 @@ -package ws +package push -// PushMessage is the envelope for all server→agent push messages. -// The Type field determines which concrete fields are populated. -// -// Fields are shared across message types: -// - PipelineID: used by config_changed, sample_request -// - Checksum: used by action (self_update) +// PushMessage is a server→agent push message received over SSE. +// All fields are on a single struct with omitempty — the Type field +// discriminates which fields are populated. type PushMessage struct { Type string `json:"type"` diff --git a/agent/internal/ws/client.go b/agent/internal/ws/client.go deleted file mode 100644 index 817e490..0000000 --- a/agent/internal/ws/client.go +++ /dev/null @@ -1,143 +0,0 @@ -// agent/internal/ws/client.go -package ws - -import ( - "encoding/json" - "log/slog" - "net/http" - "sync" - "time" - - "github.com/gorilla/websocket" -) - -// Client manages a persistent WebSocket connection to the VectorFlow server. -type Client struct { - url string - token string - onMessage func(PushMessage) - - mu sync.Mutex - conn *websocket.Conn - done chan struct{} -} - -// New creates a WebSocket client. Call Connect() to start. -func New(url, token string, onMessage func(PushMessage)) *Client { - return &Client{ - url: url, - token: token, - onMessage: onMessage, - done: make(chan struct{}), - } -} - -// Connect establishes the WebSocket connection with exponential backoff retry. -// Blocks until Close() is called. After connecting, starts a read loop that -// calls onMessage for each received message. -func (c *Client) Connect() { - backoff := time.Second - maxBackoff := 30 * time.Second - - for { - select { - case <-c.done: - return - default: - } - - dialer := websocket.Dialer{ - HandshakeTimeout: 10 * time.Second, - } - header := http.Header{} - if c.token != "" { - header.Set("Authorization", "Bearer "+c.token) - } - - conn, _, err := dialer.Dial(c.url, header) - if err != nil { - slog.Warn("websocket connect failed, retrying", "url", c.url, "backoff", backoff, "error", err) - select { - case <-time.After(backoff): - case <-c.done: - return - } - backoff = backoff * 2 - if backoff > maxBackoff { - backoff = maxBackoff - } - continue - } - - // Connected — reset backoff - backoff = time.Second - slog.Info("websocket connected", "url", c.url) - - c.mu.Lock() - c.conn = conn - c.mu.Unlock() - - // Set read deadline so silent network drops are detected. - // Reset on every pong. Ping interval is 30s + 10s timeout = 40s. - conn.SetReadDeadline(time.Now().Add(45 * time.Second)) - conn.SetPongHandler(func(string) error { - conn.SetReadDeadline(time.Now().Add(45 * time.Second)) - return nil - }) - - // Read loop — blocks until connection drops - c.readLoop(conn) - - c.mu.Lock() - c.conn = nil - c.mu.Unlock() - - slog.Warn("websocket disconnected, reconnecting", "backoff", backoff) - } -} - -func (c *Client) readLoop(conn *websocket.Conn) { - for { - _, data, err := conn.ReadMessage() - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { - slog.Warn("websocket read error", "error", err) - } - return - } - - // Reset read deadline on any received message - conn.SetReadDeadline(time.Now().Add(45 * time.Second)) - - var msg PushMessage - if err := json.Unmarshal(data, &msg); err != nil { - slog.Warn("websocket message parse error", "error", err, "data", string(data)) - continue - } - - slog.Debug("websocket message received", "type", msg.Type) - c.onMessage(msg) - } -} - -// Close gracefully shuts down the WebSocket connection. -func (c *Client) Close() { - select { - case <-c.done: - return // already closed - default: - close(c.done) - } - - c.mu.Lock() - conn := c.conn - c.mu.Unlock() - - if conn != nil { - conn.WriteMessage( - websocket.CloseMessage, - websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), - ) - conn.Close() - } -} diff --git a/agent/internal/ws/client_test.go b/agent/internal/ws/client_test.go deleted file mode 100644 index e9afe01..0000000 --- a/agent/internal/ws/client_test.go +++ /dev/null @@ -1,211 +0,0 @@ -// agent/internal/ws/client_test.go -package ws - -import ( - "encoding/json" - "net/http" - "net/http/httptest" - "strings" - "sync" - "testing" - "time" - - "github.com/gorilla/websocket" -) - -func TestPushMessageParsing(t *testing.T) { - tests := []struct { - name string - json string - wantType string - }{ - { - name: "config_changed", - json: `{"type":"config_changed","pipelineId":"p1","reason":"deploy"}`, - wantType: "config_changed", - }, - { - name: "config_changed_no_pipeline", - json: `{"type":"config_changed","reason":"maintenance_on"}`, - wantType: "config_changed", - }, - { - name: "sample_request", - json: `{"type":"sample_request","requestId":"r1","pipelineId":"p1","componentKeys":["source_in"],"limit":5}`, - wantType: "sample_request", - }, - { - name: "action_self_update", - json: `{"type":"action","action":"self_update","targetVersion":"v1.2.3","downloadUrl":"https://example.com/agent","checksum":"sha256:abc"}`, - wantType: "action", - }, - { - name: "poll_interval", - json: `{"type":"poll_interval","intervalMs":5000}`, - wantType: "poll_interval", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - var msg PushMessage - if err := json.Unmarshal([]byte(tt.json), &msg); err != nil { - t.Fatalf("unmarshal error: %v", err) - } - if msg.Type != tt.wantType { - t.Errorf("got type %q, want %q", msg.Type, tt.wantType) - } - }) - } -} - -func TestClientConnectsAndReceivesMessages(t *testing.T) { - upgrader := websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { return true }, - } - - var serverConn *websocket.Conn - var mu sync.Mutex - connected := make(chan struct{}, 1) - - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Verify auth header - auth := r.Header.Get("Authorization") - if auth != "Bearer test-token" { - http.Error(w, "unauthorized", 401) - return - } - - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - t.Logf("upgrade error: %v", err) - return - } - mu.Lock() - serverConn = conn - mu.Unlock() - select { - case connected <- struct{}{}: - default: - } - })) - defer server.Close() - - wsURL := "ws" + strings.TrimPrefix(server.URL, "http") - - var received []PushMessage - var recvMu sync.Mutex - msgCh := make(chan struct{}, 1) - - client := New(wsURL, "test-token", func(msg PushMessage) { - recvMu.Lock() - received = append(received, msg) - recvMu.Unlock() - select { - case msgCh <- struct{}{}: - default: - } - }) - - go client.Connect() - defer client.Close() - - // Wait for connection (channel-based, not sleep) - select { - case <-connected: - case <-time.After(5 * time.Second): - t.Fatal("timed out waiting for connection") - } - - // Send a message from server - mu.Lock() - conn := serverConn - mu.Unlock() - - if conn == nil { - t.Fatal("server did not receive connection") - } - - msg := PushMessage{Type: "config_changed", PipelineID: "p1", Reason: "deploy"} - data, _ := json.Marshal(msg) - if err := conn.WriteMessage(websocket.TextMessage, data); err != nil { - t.Fatalf("write error: %v", err) - } - - // Wait for message to be received (channel-based) - select { - case <-msgCh: - case <-time.After(5 * time.Second): - t.Fatal("timed out waiting for message") - } - - recvMu.Lock() - defer recvMu.Unlock() - if len(received) != 1 { - t.Fatalf("expected 1 message, got %d", len(received)) - } - if received[0].Type != "config_changed" { - t.Errorf("got type %q, want config_changed", received[0].Type) - } - if received[0].PipelineID != "p1" { - t.Errorf("got pipelineId %q, want p1", received[0].PipelineID) - } -} - -func TestClientReconnectsAfterDisconnect(t *testing.T) { - upgrader := websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { return true }, - } - - var connections int - var connMu sync.Mutex - reconnected := make(chan struct{}, 1) - - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - return - } - connMu.Lock() - connections++ - count := connections - connMu.Unlock() - - if count == 1 { - // Close first connection to trigger reconnect - time.Sleep(100 * time.Millisecond) - conn.Close() - } else { - // Signal reconnect happened - select { - case reconnected <- struct{}{}: - default: - } - // Keep second connection open - for { - if _, _, err := conn.ReadMessage(); err != nil { - return - } - } - } - })) - defer server.Close() - - wsURL := "ws" + strings.TrimPrefix(server.URL, "http") - client := New(wsURL, "", func(msg PushMessage) {}) - go client.Connect() - defer client.Close() - - // Wait for reconnect (channel-based, not sleep) - select { - case <-reconnected: - case <-time.After(10 * time.Second): - t.Fatal("timed out waiting for reconnect") - } - - connMu.Lock() - defer connMu.Unlock() - if connections < 2 { - t.Errorf("expected at least 2 connections (reconnect), got %d", connections) - } -} diff --git a/docker/server/Dockerfile b/docker/server/Dockerfile index c579d9e..d4b72cd 100644 --- a/docker/server/Dockerfile +++ b/docker/server/Dockerfile @@ -31,19 +31,8 @@ COPY prisma.config.ts ./prisma.config.ts RUN pnpm exec prisma generate COPY src ./src COPY public ./public -COPY server.ts ./server.ts RUN --mount=type=cache,target=/app/.next/cache \ pnpm build -# Preserve the original standalone server.js (with inlined config and static file -# serving), then bundle our WebSocket wrapper as the new server.js entry point. -# The wrapper monkey-patches http.createServer to intercept the server instance, -# then loads next-server.js which runs the standard Next.js standalone startup. -# Only `next` stays external (in standalone node_modules); all other deps are bundled. -RUN mv .next/standalone/server.js .next/standalone/next-server.js && \ - pnpm exec esbuild server.ts --bundle --platform=node --target=node22 \ - --outfile=.next/standalone/server.js \ - --alias:@=./src \ - --external:next --external:'next/*' --external:./next-server.js # Save Prisma client version for the runner stage (standalone strips node_modules) RUN node -e "console.log(require('./node_modules/@prisma/client/package.json').version)" > /tmp/prisma-version diff --git a/docs/public/getting-started/deploy-server.md b/docs/public/getting-started/deploy-server.md index 716f1e5..14bbe43 100644 --- a/docs/public/getting-started/deploy-server.md +++ b/docs/public/getting-started/deploy-server.md @@ -248,10 +248,10 @@ server { proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; - # WebSocket support (live metrics) + # SSE streaming support (agent push channel) proxy_http_version 1.1; - proxy_set_header Upgrade $http_upgrade; - proxy_set_header Connection "upgrade"; + proxy_buffering off; + proxy_cache off; } } ``` diff --git a/package.json b/package.json index 514fa5f..19c4604 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "private": true, "packageManager": "pnpm@10.13.1", "scripts": { - "dev": "tsx server.ts", + "dev": "next dev", "build": "next build", "start": "next start", "lint": "eslint", @@ -50,14 +50,12 @@ "sonner": "^2.0.7", "superjson": "^2.2.6", "tailwind-merge": "^3.5.0", - "ws": "^8.19.0", "zod": "^4.3.6", "zustand": "^5.0.11" }, "pnpm": { "onlyBuiltDependencies": [ "@prisma/engines", - "esbuild", "prisma" ], "overrides": { @@ -75,14 +73,11 @@ "@types/nodemailer": "^7.0.11", "@types/react": "^19", "@types/react-dom": "^19", - "@types/ws": "^8.18.1", - "esbuild": "^0.27.3", "eslint": "^9", "eslint-config-next": "16.1.6", "prisma": "^7.4.2", "shadcn": "^3.8.5", "tailwindcss": "^4", - "tsx": "^4.21.0", "tw-animate-css": "^1.4.0", "typescript": "^5" } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index e60f1c6..1ff01a7 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -129,9 +129,6 @@ importers: tailwind-merge: specifier: ^3.5.0 version: 3.5.0 - ws: - specifier: ^8.19.0 - version: 8.19.0 zod: specifier: ^4.3.6 version: 4.3.6 @@ -166,12 +163,6 @@ importers: '@types/react-dom': specifier: ^19 version: 19.2.3(@types/react@19.2.14) - '@types/ws': - specifier: ^8.18.1 - version: 8.18.1 - esbuild: - specifier: ^0.27.3 - version: 0.27.3 eslint: specifier: ^9 version: 9.39.3(jiti@2.6.1) @@ -187,9 +178,6 @@ importers: tailwindcss: specifier: ^4 version: 4.2.1 - tsx: - specifier: ^4.21.0 - version: 4.21.0 tw-animate-css: specifier: ^1.4.0 version: 1.4.0 @@ -424,162 +412,6 @@ packages: '@emnapi/wasi-threads@1.1.0': resolution: {integrity: sha512-WI0DdZ8xFSbgMjR1sFsKABJ/C5OnRrjT06JXbZKexJGrDuPTzZdDYfFlsgcCXCyf+suG5QU2e/y1Wo2V/OapLQ==} - '@esbuild/aix-ppc64@0.27.3': - resolution: {integrity: sha512-9fJMTNFTWZMh5qwrBItuziu834eOCUcEqymSH7pY+zoMVEZg3gcPuBNxH1EvfVYe9h0x/Ptw8KBzv7qxb7l8dg==} - engines: {node: '>=18'} - cpu: [ppc64] - os: [aix] - - '@esbuild/android-arm64@0.27.3': - resolution: {integrity: sha512-YdghPYUmj/FX2SYKJ0OZxf+iaKgMsKHVPF1MAq/P8WirnSpCStzKJFjOjzsW0QQ7oIAiccHdcqjbHmJxRb/dmg==} - engines: {node: '>=18'} - cpu: [arm64] - os: [android] - - '@esbuild/android-arm@0.27.3': - resolution: {integrity: sha512-i5D1hPY7GIQmXlXhs2w8AWHhenb00+GxjxRncS2ZM7YNVGNfaMxgzSGuO8o8SJzRc/oZwU2bcScvVERk03QhzA==} - engines: {node: '>=18'} - cpu: [arm] - os: [android] - - '@esbuild/android-x64@0.27.3': - resolution: {integrity: sha512-IN/0BNTkHtk8lkOM8JWAYFg4ORxBkZQf9zXiEOfERX/CzxW3Vg1ewAhU7QSWQpVIzTW+b8Xy+lGzdYXV6UZObQ==} - engines: {node: '>=18'} - cpu: [x64] - os: [android] - - '@esbuild/darwin-arm64@0.27.3': - resolution: {integrity: sha512-Re491k7ByTVRy0t3EKWajdLIr0gz2kKKfzafkth4Q8A5n1xTHrkqZgLLjFEHVD+AXdUGgQMq+Godfq45mGpCKg==} - engines: {node: '>=18'} - cpu: [arm64] - os: [darwin] - - '@esbuild/darwin-x64@0.27.3': - resolution: {integrity: sha512-vHk/hA7/1AckjGzRqi6wbo+jaShzRowYip6rt6q7VYEDX4LEy1pZfDpdxCBnGtl+A5zq8iXDcyuxwtv3hNtHFg==} - engines: {node: '>=18'} - cpu: [x64] - os: [darwin] - - '@esbuild/freebsd-arm64@0.27.3': - resolution: {integrity: sha512-ipTYM2fjt3kQAYOvo6vcxJx3nBYAzPjgTCk7QEgZG8AUO3ydUhvelmhrbOheMnGOlaSFUoHXB6un+A7q4ygY9w==} - engines: {node: '>=18'} - cpu: [arm64] - os: [freebsd] - - '@esbuild/freebsd-x64@0.27.3': - resolution: {integrity: sha512-dDk0X87T7mI6U3K9VjWtHOXqwAMJBNN2r7bejDsc+j03SEjtD9HrOl8gVFByeM0aJksoUuUVU9TBaZa2rgj0oA==} - engines: {node: '>=18'} - cpu: [x64] - os: [freebsd] - - '@esbuild/linux-arm64@0.27.3': - resolution: {integrity: sha512-sZOuFz/xWnZ4KH3YfFrKCf1WyPZHakVzTiqji3WDc0BCl2kBwiJLCXpzLzUBLgmp4veFZdvN5ChW4Eq/8Fc2Fg==} - engines: {node: '>=18'} - cpu: [arm64] - os: [linux] - - '@esbuild/linux-arm@0.27.3': - resolution: {integrity: sha512-s6nPv2QkSupJwLYyfS+gwdirm0ukyTFNl3KTgZEAiJDd+iHZcbTPPcWCcRYH+WlNbwChgH2QkE9NSlNrMT8Gfw==} - engines: {node: '>=18'} - cpu: [arm] - os: [linux] - - '@esbuild/linux-ia32@0.27.3': - resolution: {integrity: sha512-yGlQYjdxtLdh0a3jHjuwOrxQjOZYD/C9PfdbgJJF3TIZWnm/tMd/RcNiLngiu4iwcBAOezdnSLAwQDPqTmtTYg==} - engines: {node: '>=18'} - cpu: [ia32] - os: [linux] - - '@esbuild/linux-loong64@0.27.3': - resolution: {integrity: sha512-WO60Sn8ly3gtzhyjATDgieJNet/KqsDlX5nRC5Y3oTFcS1l0KWba+SEa9Ja1GfDqSF1z6hif/SkpQJbL63cgOA==} - engines: {node: '>=18'} - cpu: [loong64] - os: [linux] - - '@esbuild/linux-mips64el@0.27.3': - resolution: {integrity: sha512-APsymYA6sGcZ4pD6k+UxbDjOFSvPWyZhjaiPyl/f79xKxwTnrn5QUnXR5prvetuaSMsb4jgeHewIDCIWljrSxw==} - engines: {node: '>=18'} - cpu: [mips64el] - os: [linux] - - '@esbuild/linux-ppc64@0.27.3': - resolution: {integrity: sha512-eizBnTeBefojtDb9nSh4vvVQ3V9Qf9Df01PfawPcRzJH4gFSgrObw+LveUyDoKU3kxi5+9RJTCWlj4FjYXVPEA==} - engines: {node: '>=18'} - cpu: [ppc64] - os: [linux] - - '@esbuild/linux-riscv64@0.27.3': - resolution: {integrity: sha512-3Emwh0r5wmfm3ssTWRQSyVhbOHvqegUDRd0WhmXKX2mkHJe1SFCMJhagUleMq+Uci34wLSipf8Lagt4LlpRFWQ==} - engines: {node: '>=18'} - cpu: [riscv64] - os: [linux] - - '@esbuild/linux-s390x@0.27.3': - resolution: {integrity: sha512-pBHUx9LzXWBc7MFIEEL0yD/ZVtNgLytvx60gES28GcWMqil8ElCYR4kvbV2BDqsHOvVDRrOxGySBM9Fcv744hw==} - engines: {node: '>=18'} - cpu: [s390x] - os: [linux] - - '@esbuild/linux-x64@0.27.3': - resolution: {integrity: sha512-Czi8yzXUWIQYAtL/2y6vogER8pvcsOsk5cpwL4Gk5nJqH5UZiVByIY8Eorm5R13gq+DQKYg0+JyQoytLQas4dA==} - engines: {node: '>=18'} - cpu: [x64] - os: [linux] - - '@esbuild/netbsd-arm64@0.27.3': - resolution: {integrity: sha512-sDpk0RgmTCR/5HguIZa9n9u+HVKf40fbEUt+iTzSnCaGvY9kFP0YKBWZtJaraonFnqef5SlJ8/TiPAxzyS+UoA==} - engines: {node: '>=18'} - cpu: [arm64] - os: [netbsd] - - '@esbuild/netbsd-x64@0.27.3': - resolution: {integrity: sha512-P14lFKJl/DdaE00LItAukUdZO5iqNH7+PjoBm+fLQjtxfcfFE20Xf5CrLsmZdq5LFFZzb5JMZ9grUwvtVYzjiA==} - engines: {node: '>=18'} - cpu: [x64] - os: [netbsd] - - '@esbuild/openbsd-arm64@0.27.3': - resolution: {integrity: sha512-AIcMP77AvirGbRl/UZFTq5hjXK+2wC7qFRGoHSDrZ5v5b8DK/GYpXW3CPRL53NkvDqb9D+alBiC/dV0Fb7eJcw==} - engines: {node: '>=18'} - cpu: [arm64] - os: [openbsd] - - '@esbuild/openbsd-x64@0.27.3': - resolution: {integrity: sha512-DnW2sRrBzA+YnE70LKqnM3P+z8vehfJWHXECbwBmH/CU51z6FiqTQTHFenPlHmo3a8UgpLyH3PT+87OViOh1AQ==} - engines: {node: '>=18'} - cpu: [x64] - os: [openbsd] - - '@esbuild/openharmony-arm64@0.27.3': - resolution: {integrity: sha512-NinAEgr/etERPTsZJ7aEZQvvg/A6IsZG/LgZy+81wON2huV7SrK3e63dU0XhyZP4RKGyTm7aOgmQk0bGp0fy2g==} - engines: {node: '>=18'} - cpu: [arm64] - os: [openharmony] - - '@esbuild/sunos-x64@0.27.3': - resolution: {integrity: sha512-PanZ+nEz+eWoBJ8/f8HKxTTD172SKwdXebZ0ndd953gt1HRBbhMsaNqjTyYLGLPdoWHy4zLU7bDVJztF5f3BHA==} - engines: {node: '>=18'} - cpu: [x64] - os: [sunos] - - '@esbuild/win32-arm64@0.27.3': - resolution: {integrity: sha512-B2t59lWWYrbRDw/tjiWOuzSsFh1Y/E95ofKz7rIVYSQkUYBjfSgf6oeYPNWHToFRr2zx52JKApIcAS/D5TUBnA==} - engines: {node: '>=18'} - cpu: [arm64] - os: [win32] - - '@esbuild/win32-ia32@0.27.3': - resolution: {integrity: sha512-QLKSFeXNS8+tHW7tZpMtjlNb7HKau0QDpwm49u0vUp9y1WOF+PEzkU84y9GqYaAVW8aH8f3GcBck26jh54cX4Q==} - engines: {node: '>=18'} - cpu: [ia32] - os: [win32] - - '@esbuild/win32-x64@0.27.3': - resolution: {integrity: sha512-4uJGhsxuptu3OcpVAzli+/gWusVGwZZHTlS63hh++ehExkVT8SgiEf7/uC/PclrPPkLhZqGgCTjd0VWLo6xMqA==} - engines: {node: '>=18'} - cpu: [x64] - os: [win32] - '@eslint-community/eslint-utils@4.9.1': resolution: {integrity: sha512-phrYmNiYppR7znFEdqgfWHXR6NCkZEK7hwWDHZUjit/2/U0r6XvkDl0SYnoM51Hq7FhCGdLDT6zxCCOY1hexsQ==} engines: {node: ^12.22.0 || ^14.17.0 || >=16.0.0} @@ -1959,9 +1791,6 @@ packages: '@types/validate-npm-package-name@4.0.2': resolution: {integrity: sha512-lrpDziQipxCEeK5kWxvljWYhUvOiB2A9izZd9B2AFarYAkqZshb4lPbRs7zKEic6eGtH8V/2qJW+dPp9OtF6bw==} - '@types/ws@8.18.1': - resolution: {integrity: sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==} - '@typescript-eslint/eslint-plugin@8.56.1': resolution: {integrity: sha512-Jz9ZztpB37dNC+HU2HI28Bs9QXpzCz+y/twHOwhyrIRdbuVDxSytJNDl6z/aAKlaRIwC7y8wJdkBv7FxYGgi0A==} engines: {node: ^18.18.0 || ^20.9.0 || >=21.1.0} @@ -2728,11 +2557,6 @@ packages: resolution: {integrity: sha512-w+5mJ3GuFL+NjVtJlvydShqE1eN3h3PbI7/5LAsYJP/2qtuMXjfL2LpHSRqo4b4eSF5K/DH1JXKUAHSB2UW50g==} engines: {node: '>= 0.4'} - esbuild@0.27.3: - resolution: {integrity: sha512-8VwMnyGCONIs6cWue2IdpHxHnAjzxnw2Zr7MkVxB2vjmQ2ivqGFb4LEG3SMnv0Gb2F/G/2yA8zUaiL1gywDCCg==} - engines: {node: '>=18'} - hasBin: true - escalade@3.2.0: resolution: {integrity: sha512-WUj2qlxaQtO4g6Pq5c29GTcWGDyd8itL8zTlipgECz3JesAiiOKotd8JU6otB3PACgG6xkJUyVhboMS+bje/jA==} engines: {node: '>=6'} @@ -3007,11 +2831,6 @@ packages: resolution: {integrity: sha512-VWSRii4t0AFm6ixFFmLLx1t7wS1gh+ckoa84aOeapGum0h+EZd1EhEumSB+ZdDLnEPuucsVB9oB7cxJHap6Afg==} engines: {node: '>=14.14'} - fsevents@2.3.3: - resolution: {integrity: sha512-5xoDfX+fL7faATnagmWPpbFtwh/R77WmMMqqHGS65C3vvB0YHrgF+B1YmZ3441tMj5n63k0212XNoJwzlhffQw==} - engines: {node: ^8.16.0 || ^10.6.0 || >=11.0.0} - os: [darwin] - function-bind@1.1.2: resolution: {integrity: sha512-7XHNxH7qX9xG5mIwxkhumTox/MIRNcOgDrxWsMt2pAr23WHp6MrRlN7FBSFpCpr+oVO0F744iUgR82nJMfG2SA==} @@ -4587,11 +4406,6 @@ packages: tslib@2.8.1: resolution: {integrity: sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==} - tsx@4.21.0: - resolution: {integrity: sha512-5C1sg4USs1lfG0GFb2RLXsdpXqBSEhAaA/0kPL01wxzpMqLILNxIxIOKiILz+cdg/pLnOUxFYOR5yhHU666wbw==} - engines: {node: '>=18.0.0'} - hasBin: true - tw-animate-css@1.4.0: resolution: {integrity: sha512-7bziOlRqH0hJx80h/3mbicLW7o8qLsH5+RaLR2t+OHM3D0JlWGODQKQ4cxbK7WlvmUxpcj6Kgu6EKqjrGFe3QQ==} @@ -4764,18 +4578,6 @@ packages: wrappy@1.0.2: resolution: {integrity: sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==} - ws@8.19.0: - resolution: {integrity: sha512-blAT2mjOEIi0ZzruJfIhb3nps74PRWTCz1IjglWEEpQl5XS/UNama6u2/rjFkDDouqr4L67ry+1aGIALViWjDg==} - engines: {node: '>=10.0.0'} - peerDependencies: - bufferutil: ^4.0.1 - utf-8-validate: '>=5.0.2' - peerDependenciesMeta: - bufferutil: - optional: true - utf-8-validate: - optional: true - wsl-utils@0.3.1: resolution: {integrity: sha512-g/eziiSUNBSsdDJtCLB8bdYEUMj4jR7AGeUo96p/3dTafgjHhpF4RiCFPiRILwjQoDXx5MqkBr4fwWtR3Ky4Wg==} engines: {node: '>=20'} @@ -5166,84 +4968,6 @@ snapshots: tslib: 2.8.1 optional: true - '@esbuild/aix-ppc64@0.27.3': - optional: true - - '@esbuild/android-arm64@0.27.3': - optional: true - - '@esbuild/android-arm@0.27.3': - optional: true - - '@esbuild/android-x64@0.27.3': - optional: true - - '@esbuild/darwin-arm64@0.27.3': - optional: true - - '@esbuild/darwin-x64@0.27.3': - optional: true - - '@esbuild/freebsd-arm64@0.27.3': - optional: true - - '@esbuild/freebsd-x64@0.27.3': - optional: true - - '@esbuild/linux-arm64@0.27.3': - optional: true - - '@esbuild/linux-arm@0.27.3': - optional: true - - '@esbuild/linux-ia32@0.27.3': - optional: true - - '@esbuild/linux-loong64@0.27.3': - optional: true - - '@esbuild/linux-mips64el@0.27.3': - optional: true - - '@esbuild/linux-ppc64@0.27.3': - optional: true - - '@esbuild/linux-riscv64@0.27.3': - optional: true - - '@esbuild/linux-s390x@0.27.3': - optional: true - - '@esbuild/linux-x64@0.27.3': - optional: true - - '@esbuild/netbsd-arm64@0.27.3': - optional: true - - '@esbuild/netbsd-x64@0.27.3': - optional: true - - '@esbuild/openbsd-arm64@0.27.3': - optional: true - - '@esbuild/openbsd-x64@0.27.3': - optional: true - - '@esbuild/openharmony-arm64@0.27.3': - optional: true - - '@esbuild/sunos-x64@0.27.3': - optional: true - - '@esbuild/win32-arm64@0.27.3': - optional: true - - '@esbuild/win32-ia32@0.27.3': - optional: true - - '@esbuild/win32-x64@0.27.3': - optional: true - '@eslint-community/eslint-utils@4.9.1(eslint@9.39.3(jiti@2.6.1))': dependencies: eslint: 9.39.3(jiti@2.6.1) @@ -6632,10 +6356,6 @@ snapshots: '@types/validate-npm-package-name@4.0.2': {} - '@types/ws@8.18.1': - dependencies: - '@types/node': 20.19.35 - '@typescript-eslint/eslint-plugin@8.56.1(@typescript-eslint/parser@8.56.1(eslint@9.39.3(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.3(jiti@2.6.1))(typescript@5.9.3)': dependencies: '@eslint-community/regexpp': 4.12.2 @@ -7460,35 +7180,6 @@ snapshots: is-date-object: 1.1.0 is-symbol: 1.1.1 - esbuild@0.27.3: - optionalDependencies: - '@esbuild/aix-ppc64': 0.27.3 - '@esbuild/android-arm': 0.27.3 - '@esbuild/android-arm64': 0.27.3 - '@esbuild/android-x64': 0.27.3 - '@esbuild/darwin-arm64': 0.27.3 - '@esbuild/darwin-x64': 0.27.3 - '@esbuild/freebsd-arm64': 0.27.3 - '@esbuild/freebsd-x64': 0.27.3 - '@esbuild/linux-arm': 0.27.3 - '@esbuild/linux-arm64': 0.27.3 - '@esbuild/linux-ia32': 0.27.3 - '@esbuild/linux-loong64': 0.27.3 - '@esbuild/linux-mips64el': 0.27.3 - '@esbuild/linux-ppc64': 0.27.3 - '@esbuild/linux-riscv64': 0.27.3 - '@esbuild/linux-s390x': 0.27.3 - '@esbuild/linux-x64': 0.27.3 - '@esbuild/netbsd-arm64': 0.27.3 - '@esbuild/netbsd-x64': 0.27.3 - '@esbuild/openbsd-arm64': 0.27.3 - '@esbuild/openbsd-x64': 0.27.3 - '@esbuild/openharmony-arm64': 0.27.3 - '@esbuild/sunos-x64': 0.27.3 - '@esbuild/win32-arm64': 0.27.3 - '@esbuild/win32-ia32': 0.27.3 - '@esbuild/win32-x64': 0.27.3 - escalade@3.2.0: {} escape-html@1.0.3: {} @@ -7887,9 +7578,6 @@ snapshots: jsonfile: 6.2.0 universalify: 2.0.1 - fsevents@2.3.3: - optional: true - function-bind@1.1.2: {} function.prototype.name@1.1.8: @@ -9541,13 +9229,6 @@ snapshots: tslib@2.8.1: {} - tsx@4.21.0: - dependencies: - esbuild: 0.27.3 - get-tsconfig: 4.13.6 - optionalDependencies: - fsevents: 2.3.3 - tw-animate-css@1.4.0: {} type-check@0.4.0: @@ -9776,8 +9457,6 @@ snapshots: wrappy@1.0.2: {} - ws@8.19.0: {} - wsl-utils@0.3.1: dependencies: is-wsl: 3.1.1 diff --git a/server.ts b/server.ts deleted file mode 100644 index fb10b8f..0000000 --- a/server.ts +++ /dev/null @@ -1,150 +0,0 @@ -/** - * WebSocket wrapper for Next.js standalone server. - * - * In production (Docker), the Dockerfile renames the default standalone server.js - * to next-server.js, then bundles this file as server.js. It intercepts the HTTP - * server that Next.js creates and layers WebSocket upgrade handling on top. - * - * In dev mode (`pnpm dev`), this uses the standard `next()` custom server API. - */ -import { createServer, type IncomingMessage } from "http"; -import type { Socket } from "net"; -import { parse } from "url"; -import { type WebSocket, WebSocketServer } from "ws"; - -const dev = process.env.NODE_ENV !== "production"; -const hostname = process.env.HOSTNAME || "0.0.0.0"; -const port = parseInt(process.env.PORT || "3000", 10); - -const PING_INTERVAL_MS = 30_000; -const PONG_TIMEOUT_MS = 10_000; -const WS_PATH = "/api/agent/ws"; - -// Lazy-loaded auth and registry (bundled by esbuild in production) -let _authenticateWsUpgrade: typeof import("./src/server/services/ws-auth").authenticateWsUpgrade; -let _wsRegistry: typeof import("./src/server/services/ws-registry").wsRegistry; - -async function getWsDeps() { - if (!_authenticateWsUpgrade) { - const auth = await import("./src/server/services/ws-auth"); - const reg = await import("./src/server/services/ws-registry"); - _authenticateWsUpgrade = auth.authenticateWsUpgrade; - _wsRegistry = reg.wsRegistry; - } - return { authenticateWsUpgrade: _authenticateWsUpgrade, wsRegistry: _wsRegistry }; -} - -function setupWebSocket(server: ReturnType) { - const wss = new WebSocketServer({ noServer: true }); - - // Use prependListener so our handler runs before Next.js's upgrade handler. - // We only handle /api/agent/ws; all other upgrade requests fall through to Next.js. - server.prependListener("upgrade", async (req: IncomingMessage, socket: Socket, head: Buffer) => { - const { pathname } = parse(req.url ?? "/", true); - if (pathname !== WS_PATH) return; - - // Remove other upgrade listeners for this request to prevent Next.js - // from also trying to handle the already-consumed socket. - socket.removeAllListeners("close"); - - const { authenticateWsUpgrade } = await getWsDeps(); - const agent = await authenticateWsUpgrade(req); - if (!agent) { - socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n"); - socket.destroy(); - return; - } - - wss.handleUpgrade(req, socket, head, (ws) => { - wss.emit("connection", ws, req, agent); - }); - }); - - wss.on("connection", (ws: WebSocket, _req: IncomingMessage, agent: { nodeId: string; environmentId: string }) => { - const { nodeId } = agent; - console.log(`[ws] agent connected: ${nodeId}`); - _wsRegistry.register(nodeId, ws); - - let alive = true; - let pongTimer: ReturnType | null = null; - - ws.on("pong", () => { - alive = true; - if (pongTimer) { - clearTimeout(pongTimer); - pongTimer = null; - } - }); - - const pingInterval = setInterval(() => { - if (!alive) { - console.log(`[ws] agent ${nodeId} did not respond to ping, terminating`); - ws.terminate(); - return; - } - alive = false; - ws.ping(); - pongTimer = setTimeout(() => { - if (!alive) { - console.log(`[ws] agent ${nodeId} pong timeout (${PONG_TIMEOUT_MS}ms), terminating`); - ws.terminate(); - } - }, PONG_TIMEOUT_MS); - }, PING_INTERVAL_MS); - - ws.on("close", () => { - console.log(`[ws] agent disconnected: ${nodeId}`); - clearInterval(pingInterval); - if (pongTimer) clearTimeout(pongTimer); - _wsRegistry.unregister(nodeId, ws); - }); - - ws.on("error", (err: Error) => { - console.error(`[ws] error for agent ${nodeId}:`, err.message); - clearInterval(pingInterval); - if (pongTimer) clearTimeout(pongTimer); - _wsRegistry.unregister(nodeId, ws); - }); - }); -} - -if (dev) { - // ── Dev mode: standard Next.js custom server ── - import("next").then(({ default: next }) => { - const app = next({ dev, hostname, port }); - const handle = app.getRequestHandler(); - - app.prepare().then(() => { - const server = createServer((req, res) => { - const parsedUrl = parse(req.url ?? "/", true); - handle(req, res, parsedUrl); - }); - - setupWebSocket(server); - - server.listen(port, hostname, () => { - console.log(`> Ready on http://${hostname}:${port}`); - }); - }); - }); -} else { - // ── Production: wrap the standalone Next.js server ── - // Monkey-patch http.createServer to intercept the server instance that - // Next.js's standalone server.js (renamed to next-server.js) creates. - // eslint-disable-next-line @typescript-eslint/no-require-imports - const http = require("http") as typeof import("http"); - const origCreateServer = http.createServer.bind(http); - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (http as any).createServer = function (...args: any[]) { - const server = origCreateServer(...args); - setupWebSocket(server); - return server; - }; - - // Load the original standalone server (with inlined config, static file - // serving, and all Next.js initialization). It calls http.createServer - // internally, which our patch intercepts. - // eslint-disable-next-line @typescript-eslint/no-require-imports - require("./next-server.js"); -} diff --git a/src/app/(dashboard)/fleet/page.tsx b/src/app/(dashboard)/fleet/page.tsx index 811ea6a..592195f 100644 --- a/src/app/(dashboard)/fleet/page.tsx +++ b/src/app/(dashboard)/fleet/page.tsx @@ -231,15 +231,15 @@ export default function FleetPage() { {nodeStatusLabel(node.status)} )} - {node.wsConnected && ( + {node.pushConnected && ( - WS + Live - WebSocket connected — real-time push enabled + SSE connected — real-time push enabled )} diff --git a/src/app/api/agent/config/route.ts b/src/app/api/agent/config/route.ts index f6b6fa8..baa9577 100644 --- a/src/app/api/agent/config/route.ts +++ b/src/app/api/agent/config/route.ts @@ -196,18 +196,17 @@ export async function GET(request: Request) { select: { fleetPollIntervalMs: true }, }); - // Build WebSocket URL from the incoming request's host + // Build push URL from the incoming request's host const proto = request.headers.get("x-forwarded-proto") ?? "http"; - const wsProto = proto === "https" ? "wss" : "ws"; const host = request.headers.get("x-forwarded-host") ?? request.headers.get("host") ?? `localhost:${process.env.PORT ?? 3000}`; - const websocketUrl = `${wsProto}://${host}/api/agent/ws`; + const pushUrl = `${proto}://${host}/api/agent/push`; return NextResponse.json({ pipelines: pipelineConfigs, pollIntervalMs: settings?.fleetPollIntervalMs ?? 15000, - websocketUrl, + pushUrl, secretBackend: environment.secretBackend, ...(environment.secretBackend !== "BUILTIN" ? { secretBackendConfig: environment.secretBackendConfig } diff --git a/src/app/api/agent/push/route.ts b/src/app/api/agent/push/route.ts new file mode 100644 index 0000000..718697a --- /dev/null +++ b/src/app/api/agent/push/route.ts @@ -0,0 +1,44 @@ +import { authenticateAgent } from "@/server/services/agent-auth"; +import { pushRegistry } from "@/server/services/push-registry"; + +export const dynamic = "force-dynamic"; + +export async function GET(request: Request): Promise { + const agent = await authenticateAgent(request); + if (!agent) { + return new Response("Unauthorized", { status: 401 }); + } + + const { nodeId, environmentId } = agent; + let controllerRef: ReadableStreamDefaultController | null = null; + + const stream = new ReadableStream({ + start(controller) { + controllerRef = controller; + pushRegistry.register(nodeId, controller, environmentId); + // Send initial comment to confirm connection + controller.enqueue(new TextEncoder().encode(": connected\n\n")); + }, + cancel() { + if (controllerRef) { + pushRegistry.unregister(nodeId, controllerRef); + } + }, + }); + + // Also unregister on client abort + request.signal.addEventListener("abort", () => { + if (controllerRef) { + pushRegistry.unregister(nodeId, controllerRef); + } + }); + + return new Response(stream, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache, no-transform", + Connection: "keep-alive", + "X-Accel-Buffering": "no", + }, + }); +} diff --git a/src/server/routers/deploy.ts b/src/server/routers/deploy.ts index d29ac2b..9d419c4 100644 --- a/src/server/routers/deploy.ts +++ b/src/server/routers/deploy.ts @@ -10,7 +10,7 @@ import { decryptNodeConfig } from "@/server/services/config-crypto"; import { withAudit } from "@/server/middleware/audit"; import { writeAuditLog } from "@/server/services/audit"; import { fireEventAlert } from "@/server/services/event-alerts"; -import { wsRegistry } from "@/server/services/ws-registry"; +import { pushRegistry } from "@/server/services/push-registry"; export const deployRouter = router({ preview: protectedProcedure @@ -277,7 +277,7 @@ export const deployRouter = router({ select: { id: true }, }); for (const node of nodes) { - wsRegistry.send(node.id, { + pushRegistry.send(node.id, { type: "config_changed", pipelineId: input.pipelineId, reason: "undeploy", diff --git a/src/server/routers/fleet.ts b/src/server/routers/fleet.ts index 57d711e..10ce782 100644 --- a/src/server/routers/fleet.ts +++ b/src/server/routers/fleet.ts @@ -5,7 +5,7 @@ import { prisma } from "@/lib/prisma"; import { LogLevel } from "@/generated/prisma"; import { withAudit } from "@/server/middleware/audit"; import { checkDevAgentVersion } from "@/server/services/version-check"; -import { wsRegistry } from "@/server/services/ws-registry"; +import { pushRegistry } from "@/server/services/push-registry"; export const fleetRouter = router({ list: protectedProcedure @@ -21,7 +21,7 @@ export const fleetRouter = router({ }); return nodes.map((node) => ({ ...node, - wsConnected: wsRegistry.isConnected(node.id), + pushConnected: pushRegistry.isConnected(node.id), })); }), @@ -385,8 +385,8 @@ export const fleetRouter = router({ }, }); - // Push action to agent via WebSocket (fallback: agent reads pendingAction on next poll) - wsRegistry.send(input.nodeId, { + // Push action to agent via SSE (fallback: agent reads pendingAction on next poll) + pushRegistry.send(input.nodeId, { type: "action", action: "self_update", targetVersion, @@ -459,7 +459,7 @@ export const fleetRouter = router({ }); // Maintenance mode changes what the config endpoint returns — notify agent to re-poll - wsRegistry.send(input.nodeId, { + pushRegistry.send(input.nodeId, { type: "config_changed", reason: input.enabled ? "maintenance_on" : "maintenance_off", }); @@ -503,7 +503,7 @@ export const fleetRouter = router({ return { nodes: nodes.map((node) => ({ ...node, - wsConnected: wsRegistry.isConnected(node.id), + pushConnected: pushRegistry.isConnected(node.id), })), deployedPipelines: deployedPipelines.map((p) => ({ id: p.id, diff --git a/src/server/routers/pipeline.ts b/src/server/routers/pipeline.ts index 79f3374..b645d6f 100644 --- a/src/server/routers/pipeline.ts +++ b/src/server/routers/pipeline.ts @@ -18,7 +18,7 @@ import { copyPipelineGraph } from "@/server/services/copy-pipeline-graph"; import { stripEnvRefs, type StrippedRef } from "@/server/services/strip-env-refs"; import { gitSyncDeletePipeline } from "@/server/services/git-sync"; import { evaluatePipelineHealth } from "@/server/services/sli-evaluator"; -import { wsRegistry } from "@/server/services/ws-registry"; +import { pushRegistry } from "@/server/services/push-registry"; /** Pipeline names must be safe identifiers */ const pipelineNameSchema = z @@ -1169,7 +1169,7 @@ export const pipelineRouter = router({ select: { nodeId: true }, }); for (const { nodeId } of statuses) { - wsRegistry.send(nodeId, { + pushRegistry.send(nodeId, { type: "sample_request", requestId: request.id, pipelineId: input.pipelineId, diff --git a/src/server/services/deploy-agent.ts b/src/server/services/deploy-agent.ts index 389ac52..e35700a 100644 --- a/src/server/services/deploy-agent.ts +++ b/src/server/services/deploy-agent.ts @@ -6,7 +6,7 @@ import { createVersion } from "@/server/services/pipeline-version"; import { decryptNodeConfig } from "@/server/services/config-crypto"; import { startSystemVector, stopSystemVector } from "@/server/services/system-vector"; import { gitSyncCommitPipeline } from "@/server/services/git-sync"; -import { wsRegistry } from "@/server/services/ws-registry"; +import { pushRegistry } from "@/server/services/push-registry"; export interface AgentDeployResult { success: boolean; @@ -179,7 +179,7 @@ export async function deployAgent( const selectorEntries = Object.entries(nodeSelector ?? {}); const matches = selectorEntries.every(([k, v]) => labels[k] === v); if (matches) { - wsRegistry.send(node.id, { + pushRegistry.send(node.id, { type: "config_changed", pipelineId, reason: "deploy", diff --git a/src/server/services/push-registry.ts b/src/server/services/push-registry.ts new file mode 100644 index 0000000..fa26999 --- /dev/null +++ b/src/server/services/push-registry.ts @@ -0,0 +1,91 @@ +// src/server/services/push-registry.ts +import type { PushMessage } from "./push-types"; + +interface Connection { + controller: ReadableStreamDefaultController; + environmentId: string; +} + +const KEEPALIVE_INTERVAL_MS = 30_000; + +class PushRegistry { + private connections = new Map(); + private keepaliveTimer: ReturnType | null = null; + + constructor() { + this.startKeepalive(); + } + + register( + nodeId: string, + controller: ReadableStreamDefaultController, + environmentId: string, + ): void { + const existing = this.connections.get(nodeId); + if (existing) { + try { + existing.controller.close(); + } catch { + // already closed + } + } + this.connections.set(nodeId, { controller, environmentId }); + } + + unregister(nodeId: string, controller?: ReadableStreamDefaultController): void { + if (controller) { + const current = this.connections.get(nodeId); + if (current?.controller !== controller) return; // stale — newer connection exists + } + this.connections.delete(nodeId); + } + + send(nodeId: string, message: PushMessage): boolean { + const conn = this.connections.get(nodeId); + if (!conn) return false; + try { + const encoded = `event: ${message.type}\ndata: ${JSON.stringify(message)}\n\n`; + conn.controller.enqueue(new TextEncoder().encode(encoded)); + return true; + } catch { + this.connections.delete(nodeId); + return false; + } + } + + broadcast(nodeIds: string[], message: PushMessage): string[] { + const sent: string[] = []; + for (const nodeId of nodeIds) { + if (this.send(nodeId, message)) { + sent.push(nodeId); + } + } + return sent; + } + + isConnected(nodeId: string): boolean { + return this.connections.has(nodeId); + } + + get size(): number { + return this.connections.size; + } + + private startKeepalive(): void { + // unref() allows Node.js to exit cleanly even if timer is active (tests, cold starts) + this.keepaliveTimer = setInterval(() => { + const encoder = new TextEncoder(); + const keepalive = encoder.encode(": keepalive\n\n"); + for (const [nodeId, conn] of this.connections) { + try { + conn.controller.enqueue(keepalive); + } catch { + this.connections.delete(nodeId); + } + } + }, KEEPALIVE_INTERVAL_MS); + this.keepaliveTimer.unref(); + } +} + +export const pushRegistry = new PushRegistry(); diff --git a/src/server/services/ws-types.ts b/src/server/services/push-types.ts similarity index 92% rename from src/server/services/ws-types.ts rename to src/server/services/push-types.ts index 3f5dac1..3051181 100644 --- a/src/server/services/ws-types.ts +++ b/src/server/services/push-types.ts @@ -1,7 +1,7 @@ -// src/server/services/ws-types.ts +// src/server/services/push-types.ts /** - * Server→Agent push message types sent over WebSocket. + * Server→Agent push message types sent over SSE. * Each message has a `type` discriminator for client-side dispatch. * * Config changes use lightweight notifications (config_changed) that trigger diff --git a/src/server/services/ws-auth.ts b/src/server/services/ws-auth.ts deleted file mode 100644 index ac6c678..0000000 --- a/src/server/services/ws-auth.ts +++ /dev/null @@ -1,96 +0,0 @@ -import type { IncomingMessage } from "http"; -import { createHash } from "crypto"; -import { prisma } from "@/lib/prisma"; -import { extractBearerToken, verifyNodeToken } from "./agent-token"; - -/** Fast, non-reversible hash used as cache key instead of the raw plaintext - * token — avoids keeping credentials in process memory. */ -function tokenCacheKey(token: string): string { - return createHash("sha256").update(token).digest("hex"); -} - -const TOKEN_CACHE_TTL_MS = 30 * 60 * 1000; // 30 minutes -const TOKEN_CACHE_MAX_SIZE = 1000; - -interface CacheEntry { - nodeId: string; - environmentId: string; - cachedAt: number; -} - -/** Cache verified tokens to avoid O(n) bcrypt scan on every WS upgrade. - * Key: SHA-256 hash of token, Value: { nodeId, environmentId, cachedAt }. - * Entries expire after 30 minutes and the cache is capped at 1000 entries. */ -const tokenCache = new Map(); - -/** Remove expired entries. Called on each lookup to bound memory. */ -function evictStale(): void { - if (tokenCache.size <= TOKEN_CACHE_MAX_SIZE) return; - const now = Date.now(); - for (const [key, entry] of tokenCache) { - if (now - entry.cachedAt > TOKEN_CACHE_TTL_MS) { - tokenCache.delete(key); - } - } -} - -/** - * Authenticate a WebSocket upgrade request by verifying its Bearer token. - * - * Uses an in-memory cache so reconnects (same token) are O(1) instead of - * scanning all node hashes with bcrypt. - */ -export async function authenticateWsUpgrade( - req: IncomingMessage, -): Promise<{ nodeId: string; environmentId: string } | null> { - const authHeader = req.headers["authorization"]; - const token = extractBearerToken( - Array.isArray(authHeader) ? authHeader[0] : authHeader ?? null, - ); - if (!token) { - return null; - } - - // Fast path: check cache first (O(1) string lookup) - const cacheKey = tokenCacheKey(token); - const cached = tokenCache.get(cacheKey); - if (cached) { - // Evict if TTL expired - if (Date.now() - cached.cachedAt > TOKEN_CACHE_TTL_MS) { - tokenCache.delete(cacheKey); - } else { - // Verify the node still exists and the hash still matches (re-enrollment invalidates) - const node = await prisma.vectorNode.findUnique({ - where: { id: cached.nodeId }, - select: { nodeTokenHash: true }, - }); - if (node?.nodeTokenHash && await verifyNodeToken(token, node.nodeTokenHash)) { - return { nodeId: cached.nodeId, environmentId: cached.environmentId }; - } - // Cache stale — node deleted or re-enrolled - tokenCache.delete(cacheKey); - } - } - - // Slow path: scan all nodes with bcrypt - const nodes = await prisma.vectorNode.findMany({ - where: { nodeTokenHash: { not: null } }, - select: { id: true, environmentId: true, nodeTokenHash: true }, - }); - - for (const node of nodes) { - if (!node.nodeTokenHash) continue; - const valid = await verifyNodeToken(token, node.nodeTokenHash); - if (valid) { - evictStale(); - tokenCache.set(cacheKey, { - nodeId: node.id, - environmentId: node.environmentId, - cachedAt: Date.now(), - }); - return { nodeId: node.id, environmentId: node.environmentId }; - } - } - - return null; -} diff --git a/src/server/services/ws-registry.ts b/src/server/services/ws-registry.ts deleted file mode 100644 index 2c60e5b..0000000 --- a/src/server/services/ws-registry.ts +++ /dev/null @@ -1,63 +0,0 @@ -import type WebSocket from "ws"; -import type { PushMessage } from "./ws-types"; - -class WsRegistry { - private connections = new Map(); - - register(nodeId: string, ws: WebSocket): void { - const existing = this.connections.get(nodeId); - if (existing && existing.readyState === existing.OPEN) { - existing.close(1000, "replaced"); - } - this.connections.set(nodeId, ws); - } - - /** Remove a connection. If `ws` is provided, only remove if it matches the - * current registered socket — prevents a stale close handler from removing - * a newer reconnection. */ - unregister(nodeId: string, ws?: WebSocket): void { - if (ws) { - const current = this.connections.get(nodeId); - if (current !== ws) return; // stale socket — newer connection already registered - } - this.connections.delete(nodeId); - } - - send(nodeId: string, message: PushMessage): boolean { - const ws = this.connections.get(nodeId); - if (!ws || ws.readyState !== ws.OPEN) { - return false; - } - try { - ws.send(JSON.stringify(message)); - return true; - } catch { - return false; - } - } - - broadcast(nodeIds: string[], message: PushMessage): string[] { - const sent: string[] = []; - for (const nodeId of nodeIds) { - if (this.send(nodeId, message)) { - sent.push(nodeId); - } - } - return sent; - } - - isConnected(nodeId: string): boolean { - const ws = this.connections.get(nodeId); - return ws !== undefined && ws.readyState === ws.OPEN; - } - - get size(): number { - let count = 0; - for (const [, ws] of this.connections) { - if (ws.readyState === ws.OPEN) count++; - } - return count; - } -} - -export const wsRegistry = new WsRegistry();