feat: agent WebSocket push channel for real-time notifications #87
TerrifiedBug merged 23 commits into `main`
Conversation
- Add `sync.Mutex` to poller for thread-safe access from WS handler
- Store `websocketUrl` from config response
- Add WebSocket client startup after first poll in `Run()`
- Channel-based dispatch (`wsCh`) ensures all mutations happen on the main goroutine
- `handleWsMessage`: `config_changed` triggers re-poll, `sample_request` sends directly to `/api/agent/samples`, `action` triggers self-update
- Debounced immediate heartbeat (1s window) after state changes
- Add `wsConnected` field to fleet list and `listWithPipelineStatus` responses
- Show green "WS" badge next to node status when WebSocket is connected
…tency

- `triggerImmediateHeartbeat` now sends a signal via channel instead of calling `sendHeartbeat()` directly from the timer goroutine, ensuring all state access (including `updateError`) happens on the main goroutine
- Add mutex locking to `SampleRequests()` and `PendingAction()` accessors for consistency with the thread-safety contract on poller
Greptile Summary

This PR adds a real-time WebSocket push channel (`/api/agent/ws`). Key findings:
Confidence Score: 2/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant Agent as Go Agent
    participant Server as Node.js Server (server.ts)
    participant WSReg as wsRegistry (server.ts scope)
    participant NextJS as Next.js Routes (tRPC)
    participant WSReg2 as wsRegistry (Next.js scope)
    participant DB as PostgreSQL
    Agent->>Server: GET /api/agent/config (Bearer token)
    Server->>DB: fetch pipelines + settings
    Server-->>Agent: { pipelines, websocketUrl, pollIntervalMs }
    Agent->>Server: WS Upgrade /api/agent/ws (Bearer token)
    Server->>DB: authenticateWsUpgrade (bcrypt verify)
    Server-->>Agent: 101 Switching Protocols
    Server->>WSReg: register(nodeId, ws)
    Note over WSReg,WSReg2: ⚠️ In production standalone, esbuild bundles<br/>wsRegistry inline → two separate instances
    Agent->>Server: ♥ ping every 30s
    Server-->>Agent: pong
    Note over NextJS,DB: User deploys pipeline via UI
    NextJS->>DB: create PipelineVersion, update nodes
    NextJS->>WSReg2: send(nodeId, {type:"config_changed"})
    Note over WSReg2: Empty registry — no connections registered here
    WSReg2-->>NextJS: false (not sent)
    Note over Agent: Falls back to 15s poll interval
    Agent->>Server: GET /api/agent/config (next poll)
    Server-->>Agent: updated config
```
- Add explicit type annotations in `server.ts` to fix TS7006 implicit `any`
- Deduplicate `EventSample` records: skip if `(requestId, componentKey)` already exists, preventing duplicates when multiple agents run the same pipeline and submit results concurrently
- Remove incorrect `// indirect` marker on `gorilla/websocket` in `go.mod`
@greptile fixed.
- ws-registry: `unregister()` now takes an optional `ws` param and only removes it if it matches the current socket, preventing stale close handlers from evicting a newer reconnection
- ws-auth: add in-memory token cache so reconnects do an O(1) lookup instead of an O(n) bcrypt scan across all nodes
- samples endpoint: replace the `findFirst` TOCTOU check with a unique constraint on `(requestId, componentKey)` plus catching P2002 on create for race-safe dedup
- Add migration for the `EventSample` unique constraint
@greptile fixed.
- Move EventSampleRequest ERROR status update after EventSample.create to prevent permanent ERROR state when another agent already submitted - Replace plaintext token cache keys with SHA-256 hashes to avoid storing credentials in process memory - Wire poll_interval WS message to actually reset the poll ticker
@greptile fixed. review.
- Verify the sample request's pipeline belongs to the agent's environment before allowing writes (prevents cross-environment data injection)
- Use `updateMany` with a `status: "PENDING"` filter for atomic status transitions — concurrent success/error submissions cannot clobber each other's final status
Merge success and error paths into a single flow: write EventSample, then check if all components have samples. Final status is determined by whether any samples have errors — COMPLETED if all succeeded, ERROR if any failed. The atomic updateMany with status: "PENDING" filter ensures only one writer transitions the request.
- Log a warning and trigger a re-poll for the unimplemented "restart" WS action instead of silently discarding it
- Add a 30-minute TTL and a 1000-entry cap to `tokenCache` to prevent unbounded memory growth from re-enrolled nodes
Summary
- `/api/agent/ws` for instant config delivery, sample requests, and fleet actions — replacing 15s poll latency with real-time push
- `server.ts` wraps Next.js standalone with a WebSocket upgrade handler, ping/pong keepalive, and auth
- `gorilla/websocket` with exponential backoff reconnect (1s→30s) and 45s dead-connection detection
- `config_changed` notifications trigger an immediate re-poll, avoiding duplication of secret/cert resolution logic

New files
- `server.ts` — Custom Node.js server with WebSocket upgrade handler
- `src/server/services/ws-types.ts` — Push message type definitions
- `src/server/services/ws-registry.ts` — In-memory connection registry singleton
- `src/server/services/ws-auth.ts` — WebSocket upgrade authentication
- `src/app/api/agent/samples/route.ts` — Dedicated samples submission endpoint
- `agent/internal/ws/client.go` — Go WebSocket client with reconnect
- `agent/internal/ws/client_test.go` — Message parsing, connection, and reconnect tests
- `agent/internal/ws/types.go` — Go push message types

Modified files
- `deploy-agent.ts`, `deploy.ts`, `pipeline.ts`, `fleet.ts` — Push notifications on deploy/undeploy/samples/actions
- `agent/agent.go` — WebSocket integration with channel-based dispatch and debounced heartbeat
- `agent/poller.go` — Mutex for thread-safe access from WS handler
- `agent/client.go` — `SendSampleResults()`, `NodeToken()`, `WebSocketURL` field
- `docker/server/Dockerfile` — esbuild compilation of the custom server
- `package.json` — `ws` dependency, dev script uses the custom server

Test plan
- Unit tests pass (`go test ./...`) — message parsing, connect/receive, reconnect
- Agent builds (`go build ./...`)
- Web build passes (`pnpm build`)
- esbuild compiles `server.ts` successfully
- Agent receives `config_changed` and re-polls immediately
- `server.js` replacement works