diff --git a/server/cmd/api/api/api.go b/server/cmd/api/api/api.go index 5d49f2bc..887d6613 100644 --- a/server/cmd/api/api/api.go +++ b/server/cmd/api/api/api.go @@ -12,7 +12,6 @@ import ( "github.com/kernel/kernel-images/server/lib/cdpmonitor" "github.com/kernel/kernel-images/server/lib/devtoolsproxy" - "github.com/kernel/kernel-images/server/lib/telemetry" "github.com/kernel/kernel-images/server/lib/events" "github.com/kernel/kernel-images/server/lib/logger" "github.com/kernel/kernel-images/server/lib/nekoclient" @@ -20,6 +19,7 @@ import ( "github.com/kernel/kernel-images/server/lib/policy" "github.com/kernel/kernel-images/server/lib/recorder" "github.com/kernel/kernel-images/server/lib/scaletozero" + "github.com/kernel/kernel-images/server/lib/telemetry" ) type cdpMonitorController interface { @@ -99,7 +99,7 @@ func New( stz scaletozero.PinnedController, nekoAuthClient *nekoclient.AuthClient, telemetrySession *telemetry.TelemetrySession, - eventStream *events.EventStream, + eventStream *events.EventStream, displayNum int, ) (*ApiService, error) { switch { diff --git a/server/cmd/api/api/api_test.go b/server/cmd/api/api/api_test.go index 72afd164..9637fb45 100644 --- a/server/cmd/api/api/api_test.go +++ b/server/cmd/api/api/api_test.go @@ -12,12 +12,12 @@ import ( "log/slog" "github.com/kernel/kernel-images/server/lib/devtoolsproxy" - "github.com/kernel/kernel-images/server/lib/telemetry" "github.com/kernel/kernel-images/server/lib/events" "github.com/kernel/kernel-images/server/lib/nekoclient" oapi "github.com/kernel/kernel-images/server/lib/oapi" "github.com/kernel/kernel-images/server/lib/recorder" "github.com/kernel/kernel-images/server/lib/scaletozero" + "github.com/kernel/kernel-images/server/lib/telemetry" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) diff --git a/server/cmd/api/api/computer_test.go b/server/cmd/api/api/computer_test.go index af2cfd51..597a57ec 100644 --- a/server/cmd/api/api/computer_test.go +++ b/server/cmd/api/api/computer_test.go @@ -262,7 +262,7 @@ func TestGaussianDelay_WelfordVelocityVariance(t *testing.T) { for i := range distances { t_norm := float64(i) / float64(steps) base := 5.0 + 15.0*math.Sin(t_norm*math.Pi) // 5-20px, peaked in middle - distances[i] = base + rng.Float64()*3.0 // small random variation + distances[i] = base + rng.Float64()*3.0 // small random variation } // Gaussian delays → velocity variance @@ -302,27 +302,27 @@ func TestClampPoints(t *testing.T) { expected [][2]int }{ { - name: "no clamping needed", - points: [][2]int{{10, 20}, {50, 50}, {100, 80}}, - w: 200, h: 200, + name: "no clamping needed", + points: [][2]int{{10, 20}, {50, 50}, {100, 80}}, + w: 200, h: 200, expected: [][2]int{{10, 20}, {50, 50}, {100, 80}}, }, { - name: "clamp negative x and y", - points: [][2]int{{-10, -20}, {50, 50}}, - w: 200, h: 200, + name: "clamp negative x and y", + points: [][2]int{{-10, -20}, {50, 50}}, + w: 200, h: 200, expected: [][2]int{{0, 0}, {50, 50}}, }, { - name: "clamp exceeding screen bounds", - points: [][2]int{{50, 50}, {250, 300}}, - w: 200, h: 200, + name: "clamp exceeding screen bounds", + points: [][2]int{{50, 50}, {250, 300}}, + w: 200, h: 200, expected: [][2]int{{50, 50}, {199, 199}}, }, { - name: "clamp both directions", - points: [][2]int{{-5, 250}, {300, -10}, {100, 100}}, - w: 200, h: 200, + name: "clamp both directions", + points: [][2]int{{-5, 250}, {300, -10}, {100, 100}}, + w: 200, h: 200, expected: [][2]int{{0, 199}, {199, 0}, {100, 100}}, }, } diff --git a/server/cmd/api/api/display.go b/server/cmd/api/api/display.go index 5dd29ff5..993b2397 100644 --- a/server/cmd/api/api/display.go +++ b/server/cmd/api/api/display.go @@ -12,11 +12,11 @@ import ( "strings" "time" - nekooapi "github.com/m1k1o/neko/server/lib/oapi" "github.com/kernel/kernel-images/server/lib/cdpclient" "github.com/kernel/kernel-images/server/lib/logger" oapi "github.com/kernel/kernel-images/server/lib/oapi" "github.com/kernel/kernel-images/server/lib/recorder" + nekooapi "github.com/m1k1o/neko/server/lib/oapi" ) // PatchDisplay updates the display configuration. When require_idle @@ -655,4 +655,3 @@ func (s *ApiService) setResolutionViaNeko(ctx context.Context, width, height, re log.Info("successfully changed resolution via Neko API", "width", width, "height", height, "refresh_rate", refreshRate) return nil } - diff --git a/server/cmd/api/api/fs.go b/server/cmd/api/api/fs.go index 3dc3443f..fffb3fa8 100644 --- a/server/cmd/api/api/fs.go +++ b/server/cmd/api/api/fs.go @@ -16,11 +16,11 @@ import ( "os/user" "github.com/fsnotify/fsnotify" - "github.com/nrednav/cuid2" "github.com/kernel/kernel-images/server/lib/logger" oapi "github.com/kernel/kernel-images/server/lib/oapi" "github.com/kernel/kernel-images/server/lib/ziputil" "github.com/kernel/kernel-images/server/lib/zstdutil" + "github.com/nrednav/cuid2" ) // fsWatch represents an in-memory directory watch. diff --git a/server/cmd/api/api/process.go b/server/cmd/api/api/process.go index 66b304d7..12eb448a 100644 --- a/server/cmd/api/api/process.go +++ b/server/cmd/api/api/process.go @@ -23,10 +23,10 @@ import ( "github.com/coder/websocket" "github.com/creack/pty" "github.com/google/uuid" - openapi_types "github.com/oapi-codegen/runtime/types" "github.com/kernel/kernel-images/server/lib/logger" oapi "github.com/kernel/kernel-images/server/lib/oapi" "github.com/kernel/kernel-images/server/lib/ptyio" + openapi_types "github.com/oapi-codegen/runtime/types" ) type processHandle struct { diff --git a/server/cmd/api/api/process_test.go b/server/cmd/api/api/process_test.go index d9d955f3..741734d7 100644 --- a/server/cmd/api/api/process_test.go +++ b/server/cmd/api/api/process_test.go @@ -13,9 +13,9 @@ import ( "time" "github.com/google/uuid" - openapi_types "github.com/oapi-codegen/runtime/types" oapi "github.com/kernel/kernel-images/server/lib/oapi" "github.com/kernel/kernel-images/server/lib/scaletozero" + openapi_types "github.com/oapi-codegen/runtime/types" "github.com/stretchr/testify/require" ) diff --git a/server/cmd/chromium-launcher/main.go b/server/cmd/chromium-launcher/main.go index fd0979f8..558b0499 100644 --- a/server/cmd/chromium-launcher/main.go +++ b/server/cmd/chromium-launcher/main.go @@ -135,7 +135,6 @@ func main() { } } - // execLookPath helps satisfy syscall.Exec's requirement to pass an absolute path. func execLookPath(file string) (string, error) { if strings.ContainsRune(file, os.PathSeparator) { diff --git a/server/cmd/chromium-launcher/main_test.go b/server/cmd/chromium-launcher/main_test.go index e8f19d35..7b4ba40a 100644 --- a/server/cmd/chromium-launcher/main_test.go +++ b/server/cmd/chromium-launcher/main_test.go @@ -34,4 +34,3 @@ func TestExecLookPath(t *testing.T) { t.Fatalf("execLookPath PATH search failed: p=%q err=%v", p, err) } } - diff --git a/server/cmd/shell/main.go b/server/cmd/shell/main.go index 0dde480b..6f5a3abb 100644 --- a/server/cmd/shell/main.go +++ b/server/cmd/shell/main.go @@ -17,8 +17,8 @@ import ( "time" "github.com/google/uuid" - openapi_types "github.com/oapi-codegen/runtime/types" oapi "github.com/kernel/kernel-images/server/lib/oapi" + openapi_types "github.com/oapi-codegen/runtime/types" "golang.org/x/term" ) diff --git a/server/cmd/wrapper/chromium.go b/server/cmd/wrapper/chromium.go index ccc87e77..3b239a14 100644 --- a/server/cmd/wrapper/chromium.go +++ b/server/cmd/wrapper/chromium.go @@ -61,4 +61,3 @@ func applyHeadlessDefaultFlags() { }, " ") _ = os.Setenv("CHROMIUM_FLAGS", flags) } - diff --git a/server/cmd/wrapper/main.go b/server/cmd/wrapper/main.go index 5015c496..aae8f0c0 100644 --- a/server/cmd/wrapper/main.go +++ b/server/cmd/wrapper/main.go @@ -243,9 +243,9 @@ func main() { // waitAllReady gates on all caller-visible ready signals concurrently: // - cdp : HTTP /json/version on the public CDP port (proves api proxy is -// wired through to chromium's DevTools server) +// wired through to chromium's DevTools server) // - chromedriver : TCP on chromedriver's internal port 9225 (api on 9224 is bound -// when api itself is up, which CDP readiness already implies) +// when api itself is up, which CDP readiness already implies) // - neko : TCP on neko's HTTP port (8080), only when ENABLE_WEBRTC=true // - envoy : TCP on envoy's listener (3128), only when envoy is enabled func waitAllReady(t0 time.Time, webrtc bool) map[string]time.Duration { diff --git a/server/e2e/container.go b/server/e2e/container.go index 6b8280bf..ada4b135 100644 --- a/server/e2e/container.go +++ b/server/e2e/container.go @@ -178,7 +178,7 @@ func (c *TestContainer) ExitCh() <-chan error { // WaitDevTools waits for the CDP WebSocket endpoint to be ready. func (c *TestContainer) WaitDevTools(ctx context.Context) error { return wait.ForListeningPort(nat.Port("9222/tcp")). - WithStartupTimeout(2 * time.Minute). + WithStartupTimeout(2*time.Minute). WaitUntilReady(ctx, c.ctr) } diff --git a/server/lib/cdpclient/cdpclient_test.go b/server/lib/cdpclient/cdpclient_test.go index f44d675f..33875afe 100644 --- a/server/lib/cdpclient/cdpclient_test.go +++ b/server/lib/cdpclient/cdpclient_test.go @@ -16,20 +16,20 @@ import ( // fakeCDP is a minimal CDP server that responds to the commands used by // SetDeviceMetricsOverride and GetBrowserVersion. type fakeCDP struct { - getTargetsCalled bool - attachCalled bool - setMetricsCalled bool - setMetricsWidth int - setMetricsHeight int - detachCalled bool - pageTargetID string - sessionID string - failGetTargets bool - failSetMetrics bool - returnNoPageTargets bool - getVersionCalled bool - failGetVersion bool - productResponse string + getTargetsCalled bool + attachCalled bool + setMetricsCalled bool + setMetricsWidth int + setMetricsHeight int + detachCalled bool + pageTargetID string + sessionID string + failGetTargets bool + failSetMetrics bool + returnNoPageTargets bool + getVersionCalled bool + failGetVersion bool + productResponse string } func (f *fakeCDP) handler(w http.ResponseWriter, r *http.Request) { diff --git a/server/lib/cdpmonitor/computed.go b/server/lib/cdpmonitor/computed.go index 3921089b..07ebfc47 100644 --- a/server/lib/cdpmonitor/computed.go +++ b/server/lib/cdpmonitor/computed.go @@ -100,7 +100,6 @@ func (s *computedState) navDataWith(extra map[string]any) json.RawMessage { return out } - // currentNavCtxFields returns the current nav context fields for constructing typed event payloads. // Returns zero values if s is nil (before first navigation). func (s *computedState) currentNavCtxFields() (sessionID, targetID, targetType, frameID, loaderID, url string, navSeq int64) { diff --git a/server/lib/cdpmonitor/types.go b/server/lib/cdpmonitor/types.go index 2d5b0422..1cdce741 100644 --- a/server/lib/cdpmonitor/types.go +++ b/server/lib/cdpmonitor/types.go @@ -16,12 +16,12 @@ const mainSessionUnset = "\x00unset" // Each maps 1-to-1 with a specific CDP domain event (Runtime.*, Network.*, // Page.*, PerformanceTimeline.*) received from Chrome. const ( - EventConsoleLog = "console_log" // Runtime.consoleAPICalled (non-error types) - EventConsoleError = "console_error" // Runtime.consoleAPICalled (type=error) or Runtime.exceptionThrown - EventNetworkRequest = "network_request" // Network.requestWillBeSent - EventNetworkResponse = "network_response" // Network.loadingFinished (with prior responseReceived) - EventNetworkLoadingFailed = "network_loading_failed" // Network.loadingFailed - EventNavigation = "page_navigation" // Page.frameNavigated + EventConsoleLog = "console_log" // Runtime.consoleAPICalled (non-error types) + EventConsoleError = "console_error" // Runtime.consoleAPICalled (type=error) or Runtime.exceptionThrown + EventNetworkRequest = "network_request" // Network.requestWillBeSent + EventNetworkResponse = "network_response" // Network.loadingFinished (with prior responseReceived) + EventNetworkLoadingFailed = "network_loading_failed" // Network.loadingFailed + EventNavigation = "page_navigation" // Page.frameNavigated EventDOMContentLoaded = "page_dom_content_loaded" // Page.domContentEventFired EventPageLoad = "page_load" // Page.loadEventFired EventLayoutShift = "page_layout_shift" // PerformanceTimeline event of type "layout-shift" @@ -33,8 +33,8 @@ const ( // None of these correspond to a single CDP notification; they are inferred from // sequences of CDP events and debounce timers. const ( - EventNetworkIdle = "network_idle" // 500 ms after all in-flight requests finish - EventLayoutSettled = "page_layout_settled" // 1 s after page_load with no intervening layout shifts + EventNetworkIdle = "network_idle" // 500 ms after all in-flight requests finish + EventLayoutSettled = "page_layout_settled" // 1 s after page_load with no intervening layout shifts EventNavigationSettled = "page_navigation_settled" // fires once page_dom_content_loaded and page_layout_settled both hold ) @@ -42,18 +42,18 @@ const ( // Runtime.bindingCalled mechanism. They originate in the browser's renderer // process, not from Chrome's network or page domains. const ( - EventInteractionClick = "interaction_click" // document click (target selector, coords, text) - EventInteractionKey = "interaction_key" // keydown (key name, target selector) + EventInteractionClick = "interaction_click" // document click (target selector, coords, text) + EventInteractionKey = "interaction_key" // keydown (key name, target selector) EventScrollSettled = "interaction_scroll_settled" // 300 ms after the last scroll event on a target ) // Monitor lifecycle and internal events — emitted by the monitor itself, not by Chrome. const ( - EventScreenshot = "monitor_screenshot" // ffmpeg frame capture on page load or JS exception - EventMonitorDisconnected = "monitor_disconnected" // WebSocket to Chrome closed unexpectedly - EventMonitorReconnected = "monitor_reconnected" // successfully reconnected after a disconnect + EventScreenshot = "monitor_screenshot" // ffmpeg frame capture on page load or JS exception + EventMonitorDisconnected = "monitor_disconnected" // WebSocket to Chrome closed unexpectedly + EventMonitorReconnected = "monitor_reconnected" // successfully reconnected after a disconnect EventMonitorReconnectFailed = "monitor_reconnect_failed" // reconnect attempts exhausted - EventMonitorInitFailed = "monitor_init_failed" // could not initialise the CDP session + EventMonitorInitFailed = "monitor_init_failed" // could not initialise the CDP session ) // Metadata keys written into events.Source.Metadata for CDP-sourced events. diff --git a/server/lib/devtoolsproxy/proxy.go b/server/lib/devtoolsproxy/proxy.go index 3dc76198..cef8a045 100644 --- a/server/lib/devtoolsproxy/proxy.go +++ b/server/lib/devtoolsproxy/proxy.go @@ -359,11 +359,11 @@ func WebSocketProxyHandler(mgr *UpstreamManager, logger *slog.Logger, logCDPMess switch { case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded), errors.Is(r.Context().Err(), context.Canceled), errors.Is(r.Context().Err(), context.DeadlineExceeded): clientConn.Close(websocket.StatusGoingAway, "request cancelled") - publishCdpDisconnect(publish, oapi.ContextCancelled, connectedAt, msgCount.Load()) + publishCdpDisconnect(publish, oapi.ContextCancelled, connectedAt, time.Now(), msgCount.Load()) default: logger.Error("failed to connect to upstream", slog.String("err", err.Error())) clientConn.Close(websocket.StatusInternalError, "upstream unavailable") - publishCdpDisconnect(publish, oapi.UpstreamError, connectedAt, msgCount.Load()) + publishCdpDisconnect(publish, oapi.UpstreamError, connectedAt, time.Now(), msgCount.Load()) } return } @@ -371,14 +371,12 @@ func WebSocketProxyHandler(mgr *UpstreamManager, logger *slog.Logger, logCDPMess logger.Debug("proxying websocket", slog.String("url", upstreamURL)) - // Cancel the pump when the upstream URL changes (Chromium restarted), - // forcing the client to reconnect with the new upstream. pumpCtx, pumpCancel := context.WithCancel(r.Context()) - // Set by the URL-watcher when it tears down the pump; cleanup falls - // back to client_close otherwise. - var reasonOverride atomic.Pointer[oapi.BrowserCdpDisconnectEventDataReason] - + // Force clients off a stale upstream as soon as UpstreamManager + // publishes a different DevTools URL. Closing upstreamConn (rather + // than cancelling pumpCtx) makes the pump exit PumpExitUpstream so + // resolveDisconnectReason classifies the disconnect via mgr.Current(). go func(currentUpstreamURL string) { for { select { @@ -393,9 +391,7 @@ func WebSocketProxyHandler(mgr *UpstreamManager, logger *slog.Logger, logCDPMess logger.Info("upstream URL changed, closing stale proxy session", slog.String("old_url", currentUpstreamURL), slog.String("new_url", newURL)) - reason := oapi.UpstreamChanged - reasonOverride.CompareAndSwap(nil, &reason) - pumpCancel() + upstreamConn.Close(websocket.StatusGoingAway, "upstream changed") return case <-pumpCtx.Done(): return @@ -404,18 +400,19 @@ func WebSocketProxyHandler(mgr *UpstreamManager, logger *slog.Logger, logCDPMess }(upstreamURL) var once sync.Once - cleanup := func() { + cleanup := func(cause wsproxy.PumpExitCause) { once.Do(func() { - reason := oapi.ClientClose - if rp := reasonOverride.Load(); rp != nil { - reason = *rp - } else if r.Context().Err() != nil { - reason = oapi.ContextCancelled - } + // Pin disconnectedAt before resolveDisconnectReason so duration_ms + // reflects actual session length, not the up-to-restartConfirmWait + // poll. Close conns explicitly before resolving as defense in + // depth — coder/websocket already closes the client conn as a + // side effect of pumpCancel, but we shouldn't rely on that. + disconnectedAt := time.Now() pumpCancel() upstreamConn.Close(websocket.StatusNormalClosure, "") clientConn.Close(websocket.StatusNormalClosure, "") - publishCdpDisconnect(publish, reason, connectedAt, msgCount.Load()) + reason := resolveDisconnectReason(cause, r.Context(), mgr, upstreamURL, restartConfirmWait, logger) + publishCdpDisconnect(publish, reason, connectedAt, disconnectedAt, msgCount.Load()) }) } @@ -423,6 +420,49 @@ func WebSocketProxyHandler(mgr *UpstreamManager, logger *slog.Logger, logCDPMess }) } +// restartConfirmWait is how long cleanup waits for a new upstream URL after +// the upstream side of the pump dies before classifying the disconnect as +// upstream_error vs upstream_changed. Sized for Chromium's typical cold +// restart (~5-8s on Unikraft Cloud) with headroom. var (not const) so tests +// can temporarily shrink it. +var restartConfirmWait = 10 * time.Second + +// resolveDisconnectReason picks the cdp_disconnect reason from which side +// caused the pump to exit. On upstream cause it polls mgr.Current() for up +// to restartWait: a different URL within the window means Chromium restarted +// (upstream_changed); timeout means the upstream broke without a restart +// (upstream_error). Polling rather than reading urlCh avoids competing with +// the URL watcher and works because setCurrent updates Current() before +// broadcasting. +func resolveDisconnectReason(cause wsproxy.PumpExitCause, reqCtx context.Context, mgr *UpstreamManager, dialedURL string, restartWait time.Duration, logger *slog.Logger) oapi.BrowserCdpDisconnectEventDataReason { + if reqCtx.Err() != nil { + return oapi.ContextCancelled + } + switch cause { + case wsproxy.PumpExitClient: + return oapi.ClientClose + case wsproxy.PumpExitContext: + return oapi.ContextCancelled + } + + deadline := time.Now().Add(restartWait) + for { + if newest := normalizeUpstreamURL(mgr.Current()); newest != "" && newest != dialedURL { + logger.Info("upstream restart detected after disconnect", + slog.String("old_url", dialedURL), slog.String("new_url", newest)) + return oapi.UpstreamChanged + } + if !time.Now().Before(deadline) { + return oapi.UpstreamError + } + select { + case <-time.After(100 * time.Millisecond): + case <-reqCtx.Done(): + return oapi.ContextCancelled + } + } +} + func publishCdpConnect(publish EventPublisher) { if publish == nil { return @@ -435,17 +475,17 @@ func publishCdpConnect(publish EventPublisher) { }) } -func publishCdpDisconnect(publish EventPublisher, reason oapi.BrowserCdpDisconnectEventDataReason, connectedAt time.Time, msgCount int64) { +func publishCdpDisconnect(publish EventPublisher, reason oapi.BrowserCdpDisconnectEventDataReason, connectedAt, disconnectedAt time.Time, msgCount int64) { if publish == nil { return } data, _ := json.Marshal(oapi.BrowserCdpDisconnectEventData{ - DurationMs: float32(time.Since(connectedAt).Microseconds()) / 1000.0, + DurationMs: float32(disconnectedAt.Sub(connectedAt).Microseconds()) / 1000.0, MessageCount: int(msgCount), Reason: reason, }) publish(events.Event{ - Ts: time.Now().UnixMicro(), + Ts: disconnectedAt.UnixMicro(), Type: "cdp_disconnect", Category: events.System, Source: oapi.BrowserEventSource{Kind: oapi.KernelApi}, diff --git a/server/lib/devtoolsproxy/proxy_test.go b/server/lib/devtoolsproxy/proxy_test.go index 80915eaf..dbb022a9 100644 --- a/server/lib/devtoolsproxy/proxy_test.go +++ b/server/lib/devtoolsproxy/proxy_test.go @@ -23,6 +23,7 @@ import ( "github.com/kernel/kernel-images/server/lib/events" oapi "github.com/kernel/kernel-images/server/lib/oapi" "github.com/kernel/kernel-images/server/lib/scaletozero" + "github.com/kernel/kernel-images/server/lib/wsproxy" ) func silentLogger() *slog.Logger { @@ -508,6 +509,246 @@ func TestWebSocketProxyHandler_EmitsConnectAndDisconnect(t *testing.T) { } } +func TestResolveDisconnectReason(t *testing.T) { + logger := silentLogger() + const dialed = "ws://127.0.0.1:1234/devtools/browser/dialed" + + cases := []struct { + name string + cause wsproxy.PumpExitCause + reqCtxErr bool + setCurr string + pushURL string + wait time.Duration + want oapi.BrowserCdpDisconnectEventDataReason + }{ + { + name: "client cause -> client_close", + cause: wsproxy.PumpExitClient, + wait: 10 * time.Millisecond, + want: oapi.ClientClose, + }, + { + name: "context cause -> context_cancelled", + cause: wsproxy.PumpExitContext, + wait: 10 * time.Millisecond, + want: oapi.ContextCancelled, + }, + { + name: "request context cancelled wins over upstream cause", + cause: wsproxy.PumpExitUpstream, + reqCtxErr: true, + wait: 10 * time.Millisecond, + want: oapi.ContextCancelled, + }, + { + name: "upstream cause + Current already changed -> upstream_changed", + cause: wsproxy.PumpExitUpstream, + setCurr: "ws://127.0.0.1:1234/devtools/browser/fresh", + wait: 10 * time.Millisecond, + want: oapi.UpstreamChanged, + }, + { + name: "upstream cause + new URL arrives during wait -> upstream_changed", + cause: wsproxy.PumpExitUpstream, + pushURL: "ws://127.0.0.1:1234/devtools/browser/fresh", + wait: 500 * time.Millisecond, + want: oapi.UpstreamChanged, + }, + { + name: "upstream cause + no new URL -> upstream_error", + cause: wsproxy.PumpExitUpstream, + wait: 50 * time.Millisecond, + want: oapi.UpstreamError, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + mgr := NewUpstreamManager("/dev/null", logger) + mgr.setCurrent(dialed) + if tc.setCurr != "" { + mgr.setCurrent(tc.setCurr) + } + + reqCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + if tc.reqCtxErr { + cancel() + } + + if tc.pushURL != "" { + go func() { + time.Sleep(20 * time.Millisecond) + mgr.setCurrent(tc.pushURL) + }() + } + + got := resolveDisconnectReason(tc.cause, reqCtx, mgr, dialed, tc.wait, logger) + if got != tc.want { + t.Fatalf("reason = %q, want %q", got, tc.want) + } + }) + } +} + +func TestWebSocketProxyHandler_EmitsUpstreamChangedOnMidStreamRestart(t *testing.T) { + // Shorten the resolve wait so the test doesn't pay the production 10s. + prev := restartConfirmWait + restartConfirmWait = 1 * time.Second + defer func() { restartConfirmWait = prev }() + + // Upstream A: echoes once, then closes (simulates Chromium dying mid-session). + upstreamA := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + c, err := websocket.Accept(w, r, &websocket.AcceptOptions{OriginPatterns: []string{"*"}}) + if err != nil { + t.Errorf("accept failed: %v", err) + return + } + mt, msg, err := c.Read(r.Context()) + if err != nil { + c.Close(websocket.StatusInternalError, "") + return + } + _ = c.Write(r.Context(), mt, msg) + c.Close(websocket.StatusGoingAway, "chromium-died") + })) + defer upstreamA.Close() + + urlA, _ := url.Parse(upstreamA.URL) + urlA.Scheme = "ws" + urlA.Path = "/devtools/browser/a" + urlB := "ws://127.0.0.1:1/devtools/browser/b-replacement" + + logger := silentLogger() + mgr := NewUpstreamManager("/dev/null", logger) + mgr.setCurrent(urlA.String()) + + rp := &recordingPublisher{} + proxySrv := httptest.NewServer(WebSocketProxyHandler(mgr, logger, false, scaletozero.NewNoopController(), rp.publish)) + defer proxySrv.Close() + + pu, _ := url.Parse(proxySrv.URL) + pu.Scheme = "ws" + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + conn, _, err := websocket.Dial(ctx, pu.String(), nil) + if err != nil { + t.Fatalf("dial proxy failed: %v", err) + } + defer conn.Close(websocket.StatusNormalClosure, "") + + if err := conn.Write(ctx, websocket.MessageText, []byte("hello")); err != nil { + t.Fatalf("write failed: %v", err) + } + if _, _, err := conn.Read(ctx); err != nil { + t.Fatalf("read failed: %v", err) + } + + // Publish the new URL deliberately late so duration_ms would clearly be + // inflated if it were computed at publish time instead of disconnect time. + urlChangeAt := 700 * time.Millisecond + go func() { + time.Sleep(urlChangeAt) + mgr.setCurrent(urlB) + }() + + if !waitForCondition(2*time.Second, func() bool { return len(rp.snapshot()) >= 2 }) { + t.Fatalf("expected 2 events, got %d: %+v", len(rp.snapshot()), rp.snapshot()) + } + + captured := rp.snapshot() + if captured[1].Type != "cdp_disconnect" { + t.Fatalf("second event type = %q, want cdp_disconnect", captured[1].Type) + } + var disconnect struct { + Reason oapi.BrowserCdpDisconnectEventDataReason `json:"reason"` + DurationMs float64 `json:"duration_ms"` + } + if err := json.Unmarshal(captured[1].Data, &disconnect); err != nil { + t.Fatalf("unmarshal disconnect data: %v", err) + } + if disconnect.Reason != oapi.UpstreamChanged { + t.Fatalf("disconnect reason = %q, want %q", disconnect.Reason, oapi.UpstreamChanged) + } + // duration_ms must reflect actual session length, not the resolver poll wait. + if maxMs := float64(urlChangeAt / time.Millisecond); disconnect.DurationMs >= maxMs { + t.Fatalf("disconnect duration_ms = %f, want < %f", disconnect.DurationMs, maxMs) + } +} + +func TestWebSocketProxyHandler_KicksClientOffStaleUpstreamOnURLChange(t *testing.T) { + prev := restartConfirmWait + restartConfirmWait = 500 * time.Millisecond + defer func() { restartConfirmWait = prev }() + + // Upstream stays alive until the proxy closes it from the watcher path. + upstreamSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + c, err := websocket.Accept(w, r, &websocket.AcceptOptions{OriginPatterns: []string{"*"}}) + if err != nil { + t.Errorf("accept failed: %v", err) + return + } + defer c.Close(websocket.StatusNormalClosure, "") + for { + if _, _, err := c.Read(r.Context()); err != nil { + return + } + } + })) + defer upstreamSrv.Close() + + urlA, _ := url.Parse(upstreamSrv.URL) + urlA.Scheme = "ws" + urlA.Path = "/devtools/browser/a" + urlB := "ws://127.0.0.1:1/devtools/browser/b-replacement" + + logger := silentLogger() + mgr := NewUpstreamManager("/dev/null", logger) + mgr.setCurrent(urlA.String()) + + rp := &recordingPublisher{} + proxySrv := httptest.NewServer(WebSocketProxyHandler(mgr, logger, false, scaletozero.NewNoopController(), rp.publish)) + defer proxySrv.Close() + + pu, _ := url.Parse(proxySrv.URL) + pu.Scheme = "ws" + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + conn, _, err := websocket.Dial(ctx, pu.String(), nil) + if err != nil { + t.Fatalf("dial proxy failed: %v", err) + } + defer conn.Close(websocket.StatusNormalClosure, "") + + if !waitForCondition(2*time.Second, func() bool { return len(rp.snapshot()) >= 1 }) { + t.Fatalf("expected cdp_connect, got %d events", len(rp.snapshot())) + } + + mgr.setCurrent(urlB) + + if !waitForCondition(2*time.Second, func() bool { return len(rp.snapshot()) >= 2 }) { + t.Fatalf("expected cdp_disconnect after URL change, got %d events: %+v", + len(rp.snapshot()), rp.snapshot()) + } + + captured := rp.snapshot() + if captured[1].Type != "cdp_disconnect" { + t.Fatalf("second event type = %q, want cdp_disconnect", captured[1].Type) + } + var disconnect struct { + Reason oapi.BrowserCdpDisconnectEventDataReason `json:"reason"` + } + if err := json.Unmarshal(captured[1].Data, &disconnect); err != nil { + t.Fatalf("unmarshal disconnect data: %v", err) + } + if disconnect.Reason != oapi.UpstreamChanged { + t.Fatalf("disconnect reason = %q, want %q", disconnect.Reason, oapi.UpstreamChanged) + } +} + func TestWebSocketProxyHandler_EmitsUpstreamErrorOnDialFailure(t *testing.T) { port, err := getFreePort() if err != nil { diff --git a/server/lib/events/events_test.go b/server/lib/events/events_test.go index 24a5ad5c..86a5744f 100644 --- a/server/lib/events/events_test.go +++ b/server/lib/events/events_test.go @@ -135,7 +135,7 @@ func newTestRingBuffer(t *testing.T, capacity int) *ringBuffer { // TestRingBuffer: publish 3 envelopes; reader reads all 3 in order func TestRingBuffer(t *testing.T) { - rb := newTestRingBuffer(t,10) + rb := newTestRingBuffer(t, 10) reader := rb.newReader(0) envelopes := []Envelope{ @@ -160,7 +160,7 @@ func TestRingBuffer(t *testing.T) { // TestRingBufferOverflowNoBlock: writer never blocks even with no readers func TestRingBufferOverflowNoBlock(t *testing.T) { - rb := newTestRingBuffer(t,2) + rb := newTestRingBuffer(t, 2) done := make(chan struct{}) go func() { @@ -187,7 +187,7 @@ func TestRingBufferOverflowNoBlock(t *testing.T) { } func TestRingBufferOverflowExistingReader(t *testing.T) { - rb := newTestRingBuffer(t,2) + rb := newTestRingBuffer(t, 2) reader := rb.newReader(0) rb.publish(mkEnv(1, cdpEvent("console.log", Console))) @@ -212,7 +212,7 @@ func TestRingBufferOverflowExistingReader(t *testing.T) { } func TestNewReaderResume(t *testing.T) { - rb := newTestRingBuffer(t,10) + rb := newTestRingBuffer(t, 10) for i := uint64(1); i <= 5; i++ { rb.publish(mkEnv(i, cdpEvent("console.log", Console))) } @@ -254,7 +254,7 @@ func TestNewReaderResume(t *testing.T) { func TestConcurrentPublishRead(t *testing.T) { const numEvents = 20 - rb := newTestRingBuffer(t,32) + rb := newTestRingBuffer(t, 32) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -286,7 +286,7 @@ func TestConcurrentPublishRead(t *testing.T) { } func TestConcurrentReaders(t *testing.T) { - rb := newTestRingBuffer(t,20) + rb := newTestRingBuffer(t, 20) numReaders := 3 numEvents := 5 @@ -329,9 +329,8 @@ func TestConcurrentReaders(t *testing.T) { } } - func TestRingBufferResetWithActiveReader(t *testing.T) { - rb := newTestRingBuffer(t,10) + rb := newTestRingBuffer(t, 10) reader := rb.newReader(0) // Publish some events so the reader advances. @@ -368,4 +367,3 @@ func TestNewRingBufferRejectsNonPositiveCapacity(t *testing.T) { assert.Contains(t, err.Error(), "capacity must be > 0") } } - diff --git a/server/lib/events/eventsstorage_writer_test.go b/server/lib/events/eventsstorage_writer_test.go index 2513cb12..141fc6c9 100644 --- a/server/lib/events/eventsstorage_writer_test.go +++ b/server/lib/events/eventsstorage_writer_test.go @@ -13,10 +13,10 @@ import ( ) type mockBackend struct { - mu sync.Mutex - appended []Envelope - err error - errCount int + mu sync.Mutex + appended []Envelope + err error + errCount int } func (m *mockBackend) Append(_ context.Context, env Envelope) error { diff --git a/server/lib/events/ringbuffer.go b/server/lib/events/ringbuffer.go index ee874c3f..e5dfec84 100644 --- a/server/lib/events/ringbuffer.go +++ b/server/lib/events/ringbuffer.go @@ -12,8 +12,8 @@ type ringBuffer struct { mu sync.RWMutex buf []Envelope cap uint64 - latestSeq uint64 // highest envelope.Seq published - readerWake chan struct{} // closed-and-replaced on each Publish to wake blocked readers + latestSeq uint64 // highest envelope.Seq published + readerWake chan struct{} // closed-and-replaced on each Publish to wake blocked readers } func newRingBuffer(capacity int) (*ringBuffer, error) { diff --git a/server/lib/events/s2storage.go b/server/lib/events/s2storage.go index dd112798..d10b51f5 100644 --- a/server/lib/events/s2storage.go +++ b/server/lib/events/s2storage.go @@ -49,13 +49,13 @@ func (sp *s2Producer) close(ctx context.Context) error { // s2Storage appends all events to a single fixed stream set at construction time. type s2Storage struct { - producer s2Producer - sessionCancel context.CancelFunc - shutdownCtx context.Context - shutdownCancel context.CancelFunc - closeOnce sync.Once - ackErrors atomic.Uint64 - log *slog.Logger + producer s2Producer + sessionCancel context.CancelFunc + shutdownCtx context.Context + shutdownCancel context.CancelFunc + closeOnce sync.Once + ackErrors atomic.Uint64 + log *slog.Logger } // newS2Storage opens an AppendSession that runs under an independent context so diff --git a/server/lib/policy/overrides.go b/server/lib/policy/overrides.go index 71df7307..9978af81 100644 --- a/server/lib/policy/overrides.go +++ b/server/lib/policy/overrides.go @@ -48,30 +48,30 @@ var blockedPolicies = map[string]string{ "ExtensionInstallForcelist": "managed by kernel extension system", // Could disable kernel's own extensions - "ExtensionInstallBlocklist": "could interfere with kernel extensions", - "ExtensionInstallBlacklist": "could interfere with kernel extensions", - "ExtensionInstallAllowlist": "could interfere with kernel extensions", - "ExtensionInstallWhitelist": "could interfere with kernel extensions", - "BlockExternalExtensions": "could interfere with kernel extensions", - "ExtensionAllowedTypes": "could interfere with kernel extensions", - "ExtensionInstallSources": "could interfere with kernel extensions", + "ExtensionInstallBlocklist": "could interfere with kernel extensions", + "ExtensionInstallBlacklist": "could interfere with kernel extensions", + "ExtensionInstallAllowlist": "could interfere with kernel extensions", + "ExtensionInstallWhitelist": "could interfere with kernel extensions", + "BlockExternalExtensions": "could interfere with kernel extensions", + "ExtensionAllowedTypes": "could interfere with kernel extensions", + "ExtensionInstallSources": "could interfere with kernel extensions", "ExtensionManifestV2Availability": "could interfere with kernel extensions", // Required for CDP / automation - "RemoteDebuggingAllowed": "required for CDP connectivity", - "DeveloperToolsAvailability": "required for CDP connectivity", - "DeveloperToolsDisabled": "required for CDP connectivity", - "DeveloperToolsAvailabilityAllowlist": "required for CDP connectivity", - "DeveloperToolsAvailabilityBlocklist": "required for CDP connectivity", - "ChromeForTestingAllowed": "required for automation", + "RemoteDebuggingAllowed": "required for CDP connectivity", + "DeveloperToolsAvailability": "required for CDP connectivity", + "DeveloperToolsDisabled": "required for CDP connectivity", + "DeveloperToolsAvailabilityAllowlist": "required for CDP connectivity", + "DeveloperToolsAvailabilityBlocklist": "required for CDP connectivity", + "ChromeForTestingAllowed": "required for automation", "WebDriverOverridesIncompatiblePolicies": "required for automation", // Proxy is managed via kernel's proxy feature - "ProxySettings": "use kernel's proxy API instead", - "ProxyMode": "use kernel's proxy API instead", - "ProxyServer": "use kernel's proxy API instead", + "ProxySettings": "use kernel's proxy API instead", + "ProxyMode": "use kernel's proxy API instead", + "ProxyServer": "use kernel's proxy API instead", "ProxyBypassList": "use kernel's proxy API instead", - "ProxyPacUrl": "use kernel's proxy API instead", + "ProxyPacUrl": "use kernel's proxy API instead", } // policyRegistry maps all Chromium enterprise policies supported on Linux diff --git a/server/lib/policy/overrides_test.go b/server/lib/policy/overrides_test.go index a23466f0..66eb3f98 100644 --- a/server/lib/policy/overrides_test.go +++ b/server/lib/policy/overrides_test.go @@ -10,12 +10,12 @@ import ( func TestChromiumPolicyOverrides_Validate_ValidPolicies(t *testing.T) { overrides := ChromiumPolicyOverrides{ - "DefaultCookiesSetting": json.RawMessage(`1`), + "DefaultCookiesSetting": json.RawMessage(`1`), "BasicAuthOverHttpEnabled": json.RawMessage(`true`), - "HttpsUpgradesEnabled": json.RawMessage(`false`), - "NewTabPageLocation": json.RawMessage(`"https://example.com"`), - "PopupsAllowedForUrls": json.RawMessage(`["https://example.com", "https://test.com"]`), - "MaxConnectionsPerProxy": json.RawMessage(`32`), + "HttpsUpgradesEnabled": json.RawMessage(`false`), + "NewTabPageLocation": json.RawMessage(`"https://example.com"`), + "PopupsAllowedForUrls": json.RawMessage(`["https://example.com", "https://test.com"]`), + "MaxConnectionsPerProxy": json.RawMessage(`32`), } err := overrides.Validate() @@ -49,10 +49,10 @@ func TestChromiumPolicyOverrides_Validate_BlockedPolicies(t *testing.T) { func TestChromiumPolicyOverrides_Validate_WrongTypes(t *testing.T) { tests := []struct { - name string - policy string - value string - wantErr string + name string + policy string + value string + wantErr string }{ { name: "bool expected, got string", @@ -181,8 +181,8 @@ func TestChromiumPolicyOverrides_Validate_DictPolicy(t *testing.T) { func TestChromiumPolicyOverrides_Validate_MultipleErrors(t *testing.T) { overrides := ChromiumPolicyOverrides{ - "ExtensionSettings": json.RawMessage(`{}`), - "DefaultCookiesSetting": json.RawMessage(`"wrong"`), + "ExtensionSettings": json.RawMessage(`{}`), + "DefaultCookiesSetting": json.RawMessage(`"wrong"`), "PasswordManagerEnabled": json.RawMessage(`1`), } diff --git a/server/lib/wsproxy/wsproxy.go b/server/lib/wsproxy/wsproxy.go index f69f792a..8a14e490 100644 --- a/server/lib/wsproxy/wsproxy.go +++ b/server/lib/wsproxy/wsproxy.go @@ -30,19 +30,36 @@ type ProxyOptions struct { Transform MessageTransform } +// PumpExitCause names which side caused Pump to return. Callers use this to +// distinguish a clean client close from an upstream failure or context +// cancellation when deciding telemetry attribution and reconnect policy. +type PumpExitCause string + +const ( + // PumpExitClient indicates the client-side read or upstream-side write + // returned an error first (typically: client closed the WS). + PumpExitClient PumpExitCause = "client" + // PumpExitUpstream indicates the upstream-side read or client-side write + // returned an error first (typically: upstream died or restarted). + PumpExitUpstream PumpExitCause = "upstream" + // PumpExitContext indicates the pump's context was cancelled before + // either side errored (typically: server shutdown). + PumpExitContext PumpExitCause = "context" +) + // Pump bidirectionally copies messages between client and upstream until -// either side errors or ctx is cancelled, then calls onClose. +// either side errors or ctx is cancelled, then calls onClose with the cause. // If transform is non-nil it is called for every message; the returned bytes // are forwarded to the other side. -func Pump(ctx context.Context, client, upstream Conn, onClose func(), logger *slog.Logger, transform MessageTransform) { - errChan := make(chan error, 2) +func Pump(ctx context.Context, client, upstream Conn, onClose func(cause PumpExitCause), logger *slog.Logger, transform MessageTransform) { + causeChan := make(chan PumpExitCause, 2) go func() { for { mt, msg, err := client.Read(ctx) if err != nil { logger.Error("client read error", slog.String("err", err.Error())) - errChan <- err + causeChan <- PumpExitClient return } if transform != nil { @@ -50,7 +67,7 @@ func Pump(ctx context.Context, client, upstream Conn, onClose func(), logger *sl } if err := upstream.Write(ctx, mt, msg); err != nil { logger.Error("upstream write error", slog.String("err", err.Error())) - errChan <- err + causeChan <- PumpExitUpstream return } } @@ -61,7 +78,7 @@ func Pump(ctx context.Context, client, upstream Conn, onClose func(), logger *sl mt, msg, err := upstream.Read(ctx) if err != nil { logger.Error("upstream read error", slog.String("err", err.Error())) - errChan <- err + causeChan <- PumpExitUpstream return } if transform != nil { @@ -69,17 +86,19 @@ func Pump(ctx context.Context, client, upstream Conn, onClose func(), logger *sl } if err := client.Write(ctx, mt, msg); err != nil { logger.Error("client write error", slog.String("err", err.Error())) - errChan <- err + causeChan <- PumpExitClient return } } }() + var cause PumpExitCause select { case <-ctx.Done(): - case <-errChan: + cause = PumpExitContext + case cause = <-causeChan: } - onClose() + onClose(cause) } // Proxy accepts a client WebSocket upgrade, dials the upstream URL, and pumps @@ -113,7 +132,7 @@ func Proxy(w http.ResponseWriter, r *http.Request, upstreamURL string, opts Prox logger.Debug("proxying websocket", slog.String("url", upstreamURL)) var once sync.Once - cleanup := func() { + cleanup := func(_ PumpExitCause) { once.Do(func() { upstreamConn.Close(websocket.StatusNormalClosure, "") clientConn.Close(websocket.StatusNormalClosure, "") diff --git a/server/scripts/concurrent_stop_test/main.go b/server/scripts/concurrent_stop_test/main.go index d02194cf..dbdf74d4 100644 --- a/server/scripts/concurrent_stop_test/main.go +++ b/server/scripts/concurrent_stop_test/main.go @@ -15,8 +15,8 @@ import ( "time" retry "github.com/avast/retry-go/v5" - "github.com/nrednav/cuid2" oapi "github.com/kernel/kernel-images/server/lib/oapi" + "github.com/nrednav/cuid2" ) func main() {