
feat: agent WebSocket push channel for real-time notifications #87

Merged
TerrifiedBug merged 23 commits into main from agent-websockets-c67 on Mar 11, 2026

Conversation

@TerrifiedBug
Owner

Summary

  • Add WebSocket server→agent push channel on /api/agent/ws for instant config delivery, sample requests, and fleet actions — replacing 15s poll latency with real-time push
  • Custom server.ts wraps Next.js standalone with WebSocket upgrade handler, ping/pong keepalive, and auth
  • Go agent connects via gorilla/websocket with exponential backoff reconnect (1s→30s) and 45s dead-connection detection
  • Config changes push lightweight config_changed notifications that trigger immediate re-poll, avoiding duplication of secret/cert resolution logic
  • Channel-based dispatch ensures all agent state mutations happen on the main goroutine
  • Fleet UI shows green "WS" badge on WebSocket-connected nodes
  • All existing polling behavior preserved — old agents work unchanged
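The reconnect schedule mentioned above (1s doubling up to a 30s cap) can be sketched as follows. The real client is written in Go (agent/internal/ws/client.go); this TypeScript sketch only illustrates the doubling-with-cap math, and backoffDelayMs is an illustrative name, not the agent's actual function.

```typescript
// Sketch of the reconnect backoff described in the summary: start at 1s,
// double on each failed attempt, never exceed 30s. Illustrative only —
// the shipped implementation lives in the Go agent.
const BASE_MS = 1_000;
const MAX_MS = 30_000;

function backoffDelayMs(attempt: number): number {
  // attempt 0 -> 1s, 1 -> 2s, 2 -> 4s, ... capped at 30s
  return Math.min(BASE_MS * 2 ** attempt, MAX_MS);
}
```

Adding jitter on top of this schedule is a common refinement to avoid reconnect stampedes after a server restart, though the PR text does not say whether the agent does so.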

New files

  • server.ts — Custom Node.js server with WebSocket upgrade handler
  • src/server/services/ws-types.ts — Push message type definitions
  • src/server/services/ws-registry.ts — In-memory connection registry singleton
  • src/server/services/ws-auth.ts — WebSocket upgrade authentication
  • src/app/api/agent/samples/route.ts — Dedicated samples submission endpoint
  • agent/internal/ws/client.go — Go WebSocket client with reconnect
  • agent/internal/ws/client_test.go — Message parsing, connection, and reconnect tests
  • agent/internal/ws/types.go — Go push message types

Modified files

  • deploy-agent.ts, deploy.ts, pipeline.ts, fleet.ts — Push notifications on deploy/undeploy/samples/actions
  • agent/agent.go — WebSocket integration with channel-based dispatch and debounced heartbeat
  • agent/poller.go — Mutex for thread-safe access from WS handler
  • agent/client.go — SendSampleResults(), NodeToken(), WebSocketURL field
  • docker/server/Dockerfile — esbuild compilation of custom server
  • package.json — ws dependency, dev script uses custom server
  • Fleet page — WS connection badge

Test plan

  • Go tests pass (go test ./...) — message parsing, connect/receive, reconnect
  • Go agent builds (go build ./...)
  • Next.js production build passes (pnpm build)
  • esbuild compiles server.ts successfully
  • Manual: start dev server, connect agent, verify WS upgrade and push notifications
  • Manual: deploy pipeline, verify agent receives config_changed and re-polls immediately
  • Manual: Docker build and verify server.js replacement works

- Add sync.Mutex to poller for thread-safe access from WS handler
- Store websocketUrl from config response
- Add WebSocket client startup after first poll in Run()
- Channel-based dispatch (wsCh) ensures all mutations on main goroutine
- handleWsMessage: config_changed triggers re-poll, sample_request sends
  directly to /api/agent/samples, action triggers self-update
- Debounced immediate heartbeat (1s window) after state changes
- Add wsConnected field to fleet list and listWithPipelineStatus responses
- Show green "WS" badge next to node status when WebSocket is connected

- triggerImmediateHeartbeat now sends signal via channel instead of
  calling sendHeartbeat() directly from timer goroutine, ensuring all
  state access (including updateError) happens on the main goroutine
- Add mutex locking to SampleRequests() and PendingAction() accessors
  for consistency with the thread-safety contract on poller
@github-actions github-actions bot added the dependencies, docker, agent, and feature labels on Mar 10, 2026
@greptile-apps
Contributor

greptile-apps bot commented Mar 10, 2026

Greptile Summary

This PR adds a real-time WebSocket push channel (/api/agent/ws) to replace 15-second poll latency with instant server→agent delivery of config changes, sample requests, and fleet actions. The Go agent gains a reconnecting WS client with exponential backoff and channel-based dispatch ensuring all state mutations stay on the main goroutine. A new dedicated /api/agent/samples endpoint supports direct result submission. The overall design is well-structured and the Go implementation is clean, but there is one critical production correctness issue and one security concern in the auth layer.

Key findings:

  • Production push notifications will silently no-op — the esbuild --bundle flag inlines ws-registry.ts directly into server.js, giving the WebSocket server its own private module instance. Next.js route handlers (tRPC routers in deploy.ts, fleet.ts, pipeline.ts, deploy-agent.ts) load ws-registry through Next.js's own module evaluation and get a separate, empty instance. Every wsRegistry.send() call in those routers will find zero connections and return false. Agents will never receive push notifications in production and will silently fall back to polling. See the Dockerfile comment for a suggested fix.

  • ws-auth.ts cache returns stale environmentId — on cache hits, the code re-fetches and re-verifies nodeTokenHash from the DB (correct) but returns environmentId from the in-memory cache without re-reading it from the database. If a node's environment association changes without re-enrollment, the WebSocket session would be scoped to the wrong environment.

  • The Go WS client (agent/internal/ws/client.go), channel-based dispatch in agent.go, poller mutex additions, and the EventSample unique constraint + idempotent completion logic in the new samples route are all well-implemented.

Confidence Score: 2/5

  • Merging is safe for backward compatibility (polling is preserved), but the core feature — real-time push — will not function in production due to the module isolation issue.
  • The PR's primary value proposition (eliminating poll latency via push) is broken in production standalone mode by the esbuild bundling approach, which creates two separate wsRegistry instances. Agents will always fall back to the 15s poll. The auth layer has a minor stale-scope risk on cache hits. All other components (Go client, samples endpoint, migration) are correct and safe.
  • docker/server/Dockerfile (esbuild bundling approach) and src/server/services/ws-auth.ts (stale environmentId on cache hits) require attention before the feature works correctly in production.

Important Files Changed

Filename: Overview

docker/server/Dockerfile: esbuild --bundle inlines ws-registry.ts into server.js, creating a separate module instance from the one Next.js routes use; push notifications will silently fail in production
src/server/services/ws-registry.ts: clean singleton registry with correct stale-socket replacement and identity-checked unregister; the singleton itself is sound, but its sharing across process/bundle boundaries is the core production issue
src/server/services/ws-auth.ts: token cache correctly avoids an O(n) bcrypt scan; however, cache hits return environmentId from memory without re-fetching from the DB, creating a potential stale-scope window if a node's environment changes without re-enrollment
server.ts: WebSocket server with correct auth, ping/pong keepalive, and a clean close handler; the pongTimer reference is cleared by the close event handler, which fires after terminate(), so there is no double-terminate issue in practice
agent/internal/ws/client.go: well-structured Go WS client with exponential backoff, read-deadline-based dead-connection detection, and clean channel-based close signaling; no issues found
src/app/api/agent/samples/route.ts: new dedicated samples endpoint with correct bearer auth, Zod validation, environment-scoped authorization, idempotent unique-constraint handling, and an atomic PENDING→final status transition

Sequence Diagram

sequenceDiagram
    participant Agent as Go Agent
    participant Server as Node.js Server (server.ts)
    participant WSReg as wsRegistry (server.ts scope)
    participant NextJS as Next.js Routes (tRPC)
    participant WSReg2 as wsRegistry (Next.js scope)
    participant DB as PostgreSQL

    Agent->>Server: GET /api/agent/config (Bearer token)
    Server->>DB: fetch pipelines + settings
    Server-->>Agent: { pipelines, websocketUrl, pollIntervalMs }

    Agent->>Server: WS Upgrade /api/agent/ws (Bearer token)
    Server->>DB: authenticateWsUpgrade (bcrypt verify)
    Server-->>Agent: 101 Switching Protocols
    Server->>WSReg: register(nodeId, ws)

    Note over WSReg,WSReg2: ⚠️ In production standalone, esbuild bundles<br/>wsRegistry inline → two separate instances

    Agent->>Server: ♥ ping every 30s
    Server-->>Agent: pong

    Note over NextJS,DB: User deploys pipeline via UI
    NextJS->>DB: create PipelineVersion, update nodes
    NextJS->>WSReg2: send(nodeId, {type:"config_changed"})
    Note over WSReg2: Empty registry — no connections registered here
    WSReg2-->>NextJS: false (not sent)

    Note over Agent: Falls back to 15s poll interval
    Agent->>Server: GET /api/agent/config (next poll)
    Server-->>Agent: updated config

Comments Outside Diff (2)

  1. docker/server/Dockerfile, lines 771-774

    wsRegistry singleton will be split between server.js and Next.js route handlers

    The --bundle flag tells esbuild to inline all local source files (including ws-registry.ts) directly into server.js. esbuild-inlined modules are not registered in Node.js's require cache — they live in the bundle's own module scope.

    When Next.js handles API requests (tRPC routers, route handlers), it loads route files through its own webpack-compiled chunks via Node.js require. Those chunks import ws-registry through a completely separate module evaluation, producing a second, empty wsRegistry instance.

    As a result, every wsRegistry.send() call in deploy.ts, fleet.ts, pipeline.ts, and deploy-agent.ts operates on a registry that has zero connections registered — push notifications will silently no-op for every agent in production.

    One way to fix this is to make ws-registry.ts an external module so both the bundle and the Next.js routes share the same Node.js require cache entry:

    RUN npx esbuild server.ts --bundle --platform=node --target=node22 \
        --outfile=.next/standalone/server.js \
        --alias:@/=./src/ \
        --packages=external \
        --external:./src/server/services/ws-registry.js
    

    And copy the transpiled file alongside server.js, or load the registry via a side-channel (e.g., a local Unix socket or global variable on globalThis) that both execution contexts can share.
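The globalThis variant suggested above can be sketched like this: both the bundled server.js and the Next.js route chunks read the same process-global slot, so a second module evaluation returns the first instance instead of creating an empty registry. WsRegistry, __wsRegistry, and the method names here are illustrative, not the PR's actual API.

```typescript
// Sketch of the globalThis workaround: stash the registry on a process-global
// slot so double module evaluation (esbuild bundle + Next.js chunk) cannot
// produce two separate registries. Names are illustrative.
type WsLike = { send(data: string): void };

class WsRegistry {
  private sockets = new Map<string, WsLike>();
  register(nodeId: string, ws: WsLike): void {
    this.sockets.set(nodeId, ws);
  }
  send(nodeId: string, msg: object): boolean {
    const ws = this.sockets.get(nodeId);
    if (!ws) return false; // no live connection for this node
    ws.send(JSON.stringify(msg));
    return true;
  }
}

const g = globalThis as typeof globalThis & { __wsRegistry?: WsRegistry };
// Every module evaluation — bundled or not — resolves to the same instance.
const wsRegistry = g.__wsRegistry ?? (g.__wsRegistry = new WsRegistry());
```

The trade-off of the globalThis approach versus the --external flag is that it works regardless of how each execution context loaded the module, at the cost of an untyped global slot.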

  2. src/server/services/ws-auth.ts, lines 1-20

    Cache fast-path still performs bcrypt on every WS upgrade

    The cache is described as making reconnects "O(1) instead of scanning all node hashes with bcrypt," but the cached path (lines 43–52) still calls verifyNodeToken(token, node.nodeTokenHash) — which is bcrypt — on every hit. The actual saving is O(n)→O(1) DB row scan, not removing bcrypt entirely.

    More importantly, note that environmentId is returned directly from the in-memory cache without being re-fetched from the database. The DB query on cache hit only re-validates nodeTokenHash. If a node's environmentId can change in the database (e.g., via an admin migration operation) without triggering re-enrollment (and therefore a new token hash), the authorization context returned here would be stale and could scope the WebSocket session to the wrong environment.

    Since ws-auth.ts is the only auth path for the new WebSocket endpoint and the environmentId propagates to how wsRegistry distinguishes which nodes get which pushes, a stale environment association could allow a node from environment A to receive config-change notifications intended for environment B.

    Consider re-fetching environmentId alongside nodeTokenHash on cache hits:

    select: { nodeTokenHash: true, environmentId: true },

    and returning node.environmentId instead of cached.environmentId.
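A minimal sketch of that cache-hit path, with in-memory Maps standing in for the Prisma query (db, tokenCache, and authenticateCached are hypothetical names): only the node's identity comes from the cache, while both the token hash and the environment scope are re-read from the database on every hit.

```typescript
// Sketch of the suggested fix: on a cache hit, re-read nodeTokenHash AND
// environmentId from the database, and trust the DB value for scoping.
// All names and shapes are illustrative stand-ins for the real Prisma call.
type NodeRow = { nodeTokenHash: string; environmentId: string };

const db = new Map<string, NodeRow>();       // stand-in for the nodes table
const tokenCache = new Map<string, { nodeId: string; environmentId: string }>();

function authenticateCached(
  cacheKey: string,
  verify: (hash: string) => boolean, // stand-in for the bcrypt verification
) {
  const cached = tokenCache.get(cacheKey);
  if (!cached) return null; // cache miss: caller falls back to the full scan
  const row = db.get(cached.nodeId);
  if (!row || !verify(row.nodeTokenHash)) return null;
  // Return the DB's environmentId, not the possibly stale cached one.
  return { nodeId: cached.nodeId, environmentId: row.environmentId };
}
```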

Last reviewed commit: dbf1379

- Add explicit type annotations in server.ts to fix TS7006 implicit any
- Deduplicate EventSample records: skip if (requestId, componentKey)
  already exists, preventing duplicates when multiple agents run the
  same pipeline and submit results concurrently
- Remove incorrect // indirect marker on gorilla/websocket in go.mod
@TerrifiedBug
Owner Author

@greptile fixed.

  1. CI server.ts TS7006 errors — Added explicit type annotations: ws: WebSocket, _req: IncomingMessage, err:
    Error on the wss.on("connection") and error handler callbacks.
  2. Duplicate EventSample records — Added a findFirst check before creating each EventSample — if a
    (requestId, componentKey) pair already exists, the result is skipped. First agent to submit wins;
    subsequent agents' duplicate submissions are silently dropped.
  3. go.mod // indirect — Removed the incorrect marker since gorilla/websocket is directly imported in
    agent/internal/ws/client.go.

- ws-registry: unregister() now takes optional ws param and only removes
  if it matches the current socket, preventing stale close handlers from
  evicting a newer reconnection
- ws-auth: add in-memory token cache so reconnects do O(1) lookup
  instead of O(n) bcrypt scan across all nodes
- samples endpoint: replace findFirst TOCTOU check with unique constraint
  on (requestId, componentKey) + catch P2002 on create for race-safe dedup
- Add migration for EventSample unique constraint
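The race-safe dedup described above can be sketched with an in-memory stand-in for the unique constraint: instead of a findFirst check (which is TOCTOU-prone), attempt the insert and treat a duplicate-key error as "another agent already submitted". In the real route the duplicate signal is Prisma's P2002 error from EventSample.create; this sketch simulates it with a custom error class.

```typescript
// Sketch of insert-then-catch dedup on a (requestId, componentKey) unique
// constraint. The store and the error class simulate the database behavior.
class DuplicateKeyError extends Error {
  code = "P2002"; // mirrors Prisma's unique-constraint error code
}

const samples = new Set<string>(); // unique (requestId, componentKey) pairs

function createSample(requestId: string, componentKey: string): void {
  const key = `${requestId}:${componentKey}`;
  if (samples.has(key)) throw new DuplicateKeyError("unique constraint violated");
  samples.add(key);
}

function submitSample(requestId: string, componentKey: string): boolean {
  try {
    createSample(requestId, componentKey);
    return true; // this agent's result won
  } catch (e) {
    if (e instanceof DuplicateKeyError) return false; // already submitted: drop
    throw e; // anything else is a real failure
  }
}
```

Unlike the earlier findFirst approach, the check and the write here are a single operation, so two concurrent writers cannot both pass the check.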
@TerrifiedBug
Owner Author

@greptile fixed.

- Move EventSampleRequest ERROR status update after EventSample.create
  to prevent permanent ERROR state when another agent already submitted
- Replace plaintext token cache keys with SHA-256 hashes to avoid
  storing credentials in process memory
- Wire poll_interval WS message to actually reset the poll ticker
@TerrifiedBug
Owner Author

@greptile fixed. review.

- Verify sample request's pipeline belongs to the agent's environment
  before allowing writes (prevents cross-environment data injection)
- Use updateMany with status: "PENDING" filter for atomic status
  transitions — concurrent success/error submissions cannot clobber
  each other's final status
Merge success and error paths into a single flow: write EventSample,
then check if all components have samples. Final status is determined
by whether any samples have errors — COMPLETED if all succeeded,
ERROR if any failed. The atomic updateMany with status: "PENDING"
filter ensures only one writer transitions the request.
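The atomic transition above reduces to a compare-and-set on the status column. This in-memory sketch mirrors the semantics of updateMany with a status: "PENDING" filter (the names and return convention are illustrative; Prisma's updateMany returns a count of affected rows):

```typescript
// Compare-and-set sketch of the PENDING -> final transition: the filter on
// the current status means only the first finisher's update takes effect.
type Status = "PENDING" | "COMPLETED" | "ERROR";
const requests = new Map<string, Status>(); // stand-in for EventSampleRequest rows

function finalize(requestId: string, final: "COMPLETED" | "ERROR"): number {
  // Returns the number of rows updated, like Prisma's updateMany.
  if (requests.get(requestId) !== "PENDING") return 0; // someone else won
  requests.set(requestId, final);
  return 1;
}
```

A caller can check the returned count to know whether it was the writer that completed the request or a late arrival whose result was already superseded.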
- Log warning and trigger re-poll for unimplemented "restart" WS action
  instead of silently discarding it
- Add 30-minute TTL and 1000-entry cap to tokenCache to prevent
  unbounded memory growth from re-enrolled nodes
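A sketch of such a cache, assuming the shapes described in the commit messages (SHA-256 hashes as keys so no plaintext token sits in process memory, a 30-minute TTL, and a 1000-entry cap with oldest-first eviction); all names are illustrative, not the PR's actual code:

```typescript
// Bounded token cache sketch: keys are SHA-256 digests of the token, entries
// expire after 30 minutes, and the map is capped at 1000 entries by evicting
// the oldest insertion. Map iteration order doubles as age order.
import { createHash } from "node:crypto";

const TTL_MS = 30 * 60 * 1000;
const MAX_ENTRIES = 1000;

type Entry = { nodeId: string; expiresAt: number };
const cache = new Map<string, Entry>();

const keyFor = (token: string) =>
  createHash("sha256").update(token).digest("hex");

function cachePut(token: string, nodeId: string, now = Date.now()): void {
  if (cache.size >= MAX_ENTRIES) {
    const oldest = cache.keys().next().value; // first-inserted key
    if (oldest !== undefined) cache.delete(oldest);
  }
  cache.set(keyFor(token), { nodeId, expiresAt: now + TTL_MS });
}

function cacheGet(token: string, now = Date.now()): string | null {
  const key = keyFor(token);
  const entry = cache.get(key);
  if (!entry) return null;
  if (entry.expiresAt <= now) {
    cache.delete(key); // lazily expire on read
    return null;
  }
  return entry.nodeId;
}
```

Oldest-insertion eviction is simpler than true LRU and is usually adequate here, since re-enrolled nodes get fresh tokens and their stale entries age out anyway.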
@TerrifiedBug TerrifiedBug deleted the agent-websockets-c67 branch March 11, 2026 00:59
@TerrifiedBug TerrifiedBug restored the agent-websockets-c67 branch March 11, 2026 00:59
@TerrifiedBug TerrifiedBug reopened this Mar 11, 2026
@TerrifiedBug TerrifiedBug merged commit 299388a into main Mar 11, 2026
13 checks passed
@TerrifiedBug TerrifiedBug deleted the agent-websockets-c67 branch March 11, 2026 01:00