diff --git a/backend/go.mod b/backend/go.mod index cee5b42f80..55657904d7 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -3,6 +3,7 @@ module github.com/aoagents/agent-orchestrator/backend go 1.25.7 require ( + github.com/Microsoft/go-winio v0.6.2 github.com/aymanbagabas/go-pty v0.2.3 github.com/coder/websocket v1.8.14 github.com/creack/pty v1.1.24 diff --git a/backend/go.sum b/backend/go.sum index 56fcf87f3f..a7e9d8331b 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -1,3 +1,5 @@ +github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= +github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/aymanbagabas/go-pty v0.2.3 h1:hsqcTIUV8I4iTSh3HQl61CR2wh0YPS6gHOYLhAfWu/E= github.com/aymanbagabas/go-pty v0.2.3/go.mod h1:GLkgQovzqN5A1xMB79yHWiG1rhcquZCjkwKQGKFPdPg= github.com/bool64/dev v0.2.43 h1:yQ7qiZVef6WtCl2vDYU0Y+qSq+0aBrQzY8KXkklk9cQ= diff --git a/backend/internal/daemon/daemon.go b/backend/internal/daemon/daemon.go index 3603fbe9ba..d565a16c5a 100644 --- a/backend/internal/daemon/daemon.go +++ b/backend/internal/daemon/daemon.go @@ -15,6 +15,7 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/adapters/runtime/runtimeselect" "github.com/aoagents/agent-orchestrator/backend/internal/config" + "github.com/aoagents/agent-orchestrator/backend/internal/daemon/supervisor" "github.com/aoagents/agent-orchestrator/backend/internal/httpd" "github.com/aoagents/agent-orchestrator/backend/internal/notify" "github.com/aoagents/agent-orchestrator/backend/internal/ports" @@ -149,21 +150,30 @@ func Run() error { log.Error("reconcile sessions on boot failed", "err", reconcileErr) } + // ponytail: 5s tolerates a brief frontend restart; tune if dev hot-reload trips it. + const supervisorGrace = 5 * time.Second + + if ln, addr, err := supervisor.Listen(cfg.RunFilePath); err != nil { + // Non-fatal: without the link the daemon still works (e.g. 
headless "ao start"), + // it just will not auto-stop when a frontend dies. Do not block startup on it. + log.Warn("supervisor: listener unavailable; frontend-death auto-stop disabled", "err", err) + } else { + log.Info("supervisor: listening", "addr", addr) + sup := supervisor.New(supervisorGrace, srv.RequestShutdown, log) + go func() { + if err := sup.Serve(ctx, ln); err != nil { + log.Warn("supervisor: serve stopped with error", "err", err) + } + }() + } + runErr := srv.Run(ctx) - // Save and tear down all live sessions before the store closes. Both SIGTERM - // and POST /shutdown funnel through srv.Run returning (SIGTERM cancels ctx, - // which srv.Run selects on; POST /shutdown closes the shutdownRequested channel, - // which srv.Run also selects on), so this single call site covers both paths. - // - // Use a fresh context with a bounded deadline: the ctx that caused srv.Run - // to return is already cancelled, so passing it would abort the save - // immediately and leave every session unsaved. - shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), shutdownSaveTimeout) - defer shutdownCancel() - if saveErr := sessMgr.SaveAndTeardownAll(shutdownCtx); saveErr != nil { - log.Error("save sessions on shutdown failed", "err", saveErr) - } + // Both graceful shutdown paths (SIGTERM and POST /shutdown) funnel through + // srv.Run returning. We deliberately do NOT tear down sessions here: they + // survive the daemon exit and the next boot's Reconcile adopts them, + // preserving session IDs. The narrowed sessionLifecycle interface makes + // teardown-on-shutdown a compile error. // Shut the background goroutines down in order: cancel the context FIRST so // their loops exit, then wait for them to drain. Doing this explicitly (not @@ -178,10 +188,6 @@ func Run() error { return runErr } -// shutdownSaveTimeout bounds the SaveAndTeardownAll call on shutdown so a -// pathological session cannot stall the process exit indefinitely. 
-const shutdownSaveTimeout = 30 * time.Second - // newLogger returns the daemon's slog logger. It writes to stderr so supervisors // can capture it separately from any structured stdout protocol added later. func newLogger() *slog.Logger { diff --git a/backend/internal/daemon/lifecycle_wiring.go b/backend/internal/daemon/lifecycle_wiring.go index 676dcb8e70..c5391fe197 100644 --- a/backend/internal/daemon/lifecycle_wiring.go +++ b/backend/internal/daemon/lifecycle_wiring.go @@ -61,18 +61,21 @@ func (l *lifecycleStack) Stop() { // sessionLifecycle is the narrow surface of sessionmanager.Manager used for // boot/shutdown wiring. A minimal interface keeps the daemon testable without // depending on the concrete manager type. +// +// SaveAndTeardownAll is deliberately ABSENT from this interface so the daemon +// cannot tear down live sessions on shutdown. Sessions survive the daemon exit +// and Reconcile on the next boot adopts them, preserving session IDs. Re-adding +// the method here is a visible, reviewable interface change. type sessionLifecycle interface { Reconcile(ctx context.Context) error RestoreAll(ctx context.Context) error - SaveAndTeardownAll(ctx context.Context) error } // startSession builds the controller-facing session service: a session manager // over the selected runtime, a per-session gitworktree workspace, the shared // store + LCM, the per-session agent resolver, and the agent messenger. The // returned service is mounted at httpd APIDeps.Sessions. It also returns the -// manager so the caller can wire Reconcile/SaveAndTeardownAll into the -// boot/shutdown sequence. +// manager so the caller can wire Reconcile into the boot sequence. 
func startSession(cfg config.Config, runtime runtimeselect.Runtime, store *sqlite.Store, lcm *lifecycle.Manager, messenger ports.AgentMessenger, telemetry ports.EventSink, log *slog.Logger) (*sessionsvc.Service, reviewsvc.Manager, sessionLifecycle, error) { defaultAgent := cfg.Agent if defaultAgent == "" { diff --git a/backend/internal/daemon/supervisor/listen_unix.go b/backend/internal/daemon/supervisor/listen_unix.go new file mode 100644 index 0000000000..159c7e9737 --- /dev/null +++ b/backend/internal/daemon/supervisor/listen_unix.go @@ -0,0 +1,25 @@ +//go:build !windows + +package supervisor + +import ( + "net" + "os" + "path/filepath" +) + +// Listen creates a Unix domain socket listener alongside the run-file. +// The socket path is a sibling of runFilePath: filepath.Dir(runFilePath)/supervise.sock. +// Any stale socket file is removed before binding (ignored if absent). +// The returned net.Listener unlinks the socket on Close (Go stdlib default for UnixListener). +// Returns (listener, socketPath, error). +func Listen(runFilePath string) (net.Listener, string, error) { + sockPath := filepath.Join(filepath.Dir(runFilePath), "supervise.sock") + // Remove stale socket; ignore not-exist error. + _ = os.Remove(sockPath) + ln, err := net.Listen("unix", sockPath) + if err != nil { + return nil, "", err + } + return ln, sockPath, nil +} diff --git a/backend/internal/daemon/supervisor/listen_unix_test.go b/backend/internal/daemon/supervisor/listen_unix_test.go new file mode 100644 index 0000000000..c4888325d9 --- /dev/null +++ b/backend/internal/daemon/supervisor/listen_unix_test.go @@ -0,0 +1,81 @@ +//go:build !windows + +package supervisor + +import ( + "net" + "os" + "path/filepath" + "testing" +) + +// TestListen_basic verifies that Listen returns a listener whose address is +// filepath.Dir(runFile)/supervise.sock, that the socket file exists on disk, and +// that a Dial to that address succeeds.
+func TestListen_basic(t *testing.T) { + t.Parallel() + dir := t.TempDir() + runFile := filepath.Join(dir, "running.json") + + ln, addr, err := Listen(runFile) + if err != nil { + t.Fatalf("Listen: unexpected error: %v", err) + } + defer ln.Close() + + wantSock := filepath.Join(dir, "supervise.sock") + if addr != wantSock { + t.Errorf("addr = %q, want %q", addr, wantSock) + } + + // Socket file must exist after Listen. + if _, err := os.Stat(wantSock); err != nil { + t.Errorf("socket file missing after Listen: %v", err) + } + + // Dialing the returned address must succeed. + conn, err := net.Dial("unix", addr) + if err != nil { + t.Fatalf("Dial(%q): %v", addr, err) + } + conn.Close() +} + +// TestListen_staleSocket verifies that a pre-existing file at the socket path +// does not prevent Listen from succeeding (the stale file is removed first). +func TestListen_staleSocket(t *testing.T) { + t.Parallel() + dir := t.TempDir() + runFile := filepath.Join(dir, "running.json") + sockPath := filepath.Join(dir, "supervise.sock") + + // Pre-create a regular file to simulate a stale socket. + if err := os.WriteFile(sockPath, []byte("stale"), 0o600); err != nil { + t.Fatalf("pre-create stale file: %v", err) + } + + ln, _, err := Listen(runFile) + if err != nil { + t.Fatalf("Listen with stale socket: unexpected error: %v", err) + } + ln.Close() +} + +// TestListen_unlinkOnClose verifies that closing the listener removes the +// socket file from the filesystem (Go stdlib default for UnixListener). 
+func TestListen_unlinkOnClose(t *testing.T) { + t.Parallel() + dir := t.TempDir() + runFile := filepath.Join(dir, "running.json") + sockPath := filepath.Join(dir, "supervise.sock") + + ln, _, err := Listen(runFile) + if err != nil { + t.Fatalf("Listen: %v", err) + } + ln.Close() + + if _, err := os.Stat(sockPath); !os.IsNotExist(err) { + t.Errorf("socket file still present after Close (err=%v); expected not-exist", err) + } +} diff --git a/backend/internal/daemon/supervisor/listen_windows.go b/backend/internal/daemon/supervisor/listen_windows.go new file mode 100644 index 0000000000..52fa226242 --- /dev/null +++ b/backend/internal/daemon/supervisor/listen_windows.go @@ -0,0 +1,23 @@ +//go:build windows + +package supervisor + +import ( + "net" + + "github.com/Microsoft/go-winio" +) + +const pipeName = `\\.\pipe\ao-supervise` + +// Listen creates a Windows named pipe listener for the supervisor watchdog. +// runFilePath is ignored on Windows: named pipes are global and identified +// by name only. +// ponytail: global pipe name; add a per-instance suffix if multiple daemons must coexist on one machine. +func Listen(_ string) (net.Listener, string, error) { + ln, err := winio.ListenPipe(pipeName, nil) + if err != nil { + return nil, "", err + } + return ln, pipeName, nil +} diff --git a/backend/internal/daemon/supervisor/supervisor.go b/backend/internal/daemon/supervisor/supervisor.go new file mode 100644 index 0000000000..72c3a7acd7 --- /dev/null +++ b/backend/internal/daemon/supervisor/supervisor.go @@ -0,0 +1,162 @@ +// Package supervisor provides a transport-agnostic watchdog that fires a +// callback when the last connected client disconnects and stays gone for a +// configurable grace period. It arms only after the FIRST client ever +// connects so a daemon started with no frontend (e.g. CLI "ao start") never +// self-stops. +// +// This package is a leaf: the watchdog imports only stdlib; only the Windows listener adds go-winio.
+package supervisor + +import ( + "context" + "errors" + "log/slog" + "net" + "sync" + "time" +) + +// acceptRetryBackoff bounds the retry after a transient Accept error so a +// persistent failure cannot hot-spin the accept loop. +const acceptRetryBackoff = 200 * time.Millisecond + +// Supervisor watches connections on a net.Listener and calls onLastClientGone +// exactly once (per process lifetime) when the live-count drops to zero and +// stays zero for the grace period. +// +// Concurrency model: +// - mu guards liveCount, armed, and pendingTimer. +// - armed flips to true on the first accepted connection and never resets; +// it is the "headless-safety" gate that prevents a pre-connect fire. +// - pendingTimer holds the *time.Timer from time.AfterFunc so it can be +// stopped on reconnect. A non-nil pendingTimer means a grace countdown is +// running. +// - fireOnce ensures onLastClientGone is called at most once for the entire +// process lifetime, even if the timer fires concurrently with a reconnect. +type Supervisor struct { + grace time.Duration + onLastClientGone func() + log *slog.Logger + + mu sync.Mutex + liveCount int + armed bool // true once any connection has been accepted + pendingTimer *time.Timer // non-nil while grace countdown is running + + fireOnce sync.Once +} + +// New creates a Supervisor. grace is the delay before the callback fires after +// the last connection closes. onLastClientGone is called at most once for the +// process lifetime, so it is safe to use it to trigger os.Exit or context +// cancellation. +func New(grace time.Duration, onLastClientGone func(), log *slog.Logger) *Supervisor { + return &Supervisor{ + grace: grace, + onLastClientGone: onLastClientGone, + log: log, + } +} + +// Serve runs the accept loop on ln until ctx is cancelled or ln is closed. +// It returns nil on a clean shutdown (context cancelled or listener closed +// normally); it only returns a non-nil error for unexpected Accept failures. 
+func (s *Supervisor) Serve(ctx context.Context, ln net.Listener) error { + // Derive a cancellable context so the watcher goroutine always unblocks + // when Serve returns, even if ctx itself is not cancelled (e.g. listener + // closed directly). + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Close the listener when ctx is cancelled so Accept() unblocks. + go func() { + <-ctx.Done() + _ = ln.Close() + }() + + for { + conn, err := ln.Accept() + if err != nil { + // A closed listener or context cancellation is a clean stop. + select { + case <-ctx.Done(): + return nil + default: + } + // net.ErrClosed is what real listeners return when closed normally. + if errors.Is(err, net.ErrClosed) { + return nil + } + // A transient Accept error (e.g. EMFILE) must NOT silently kill the + // watchdog: that would leave the daemon unable to self-stop on + // frontend death. Back off briefly and keep accepting. A genuinely + // closed listener returns net.ErrClosed (handled above) or trips + // ctx.Done during the backoff. + s.log.Warn("supervisor: accept error, retrying", "err", err) + select { + case <-ctx.Done(): + return nil + case <-time.After(acceptRetryBackoff): + } + continue + } + + s.mu.Lock() + s.armed = true + s.liveCount++ + // If a grace timer was pending (reconnect before grace elapsed), cancel it. + if s.pendingTimer != nil { + s.pendingTimer.Stop() + s.pendingTimer = nil + } + live := s.liveCount + s.mu.Unlock() + + s.log.Debug("supervisor: client connected", "liveCount", live) + go s.watchConn(conn) + } +} + +// watchConn drains conn (reads into a scratch buffer) purely to detect close. +// When read returns io.EOF or any error, the connection is gone. +func (s *Supervisor) watchConn(conn net.Conn) { + // ponytail: 32-byte scratch buffer; we never process the payload. 
+ scratch := make([]byte, 32) + for { + _, err := conn.Read(scratch) + if err != nil { + break + } + } + _ = conn.Close() + + s.mu.Lock() + s.liveCount-- + live := s.liveCount + armed := s.armed + s.mu.Unlock() + + s.log.Debug("supervisor: client disconnected", "liveCount", live) + + if armed && live == 0 { + s.armGrace() + } +} + +// armGrace starts the grace countdown. If another client connects before it +// elapses, Serve() will Stop() the timer via pendingTimer. +func (s *Supervisor) armGrace() { + s.mu.Lock() + s.pendingTimer = time.AfterFunc(s.grace, func() { + s.mu.Lock() + live := s.liveCount + s.pendingTimer = nil + s.mu.Unlock() + + if live == 0 { + s.log.Info("supervisor: last client gone; grace elapsed, firing callback") + s.fireOnce.Do(s.onLastClientGone) + } + }) + s.mu.Unlock() +} diff --git a/backend/internal/daemon/supervisor/supervisor_test.go b/backend/internal/daemon/supervisor/supervisor_test.go new file mode 100644 index 0000000000..c3195b316a --- /dev/null +++ b/backend/internal/daemon/supervisor/supervisor_test.go @@ -0,0 +1,250 @@ +// Package supervisor_test exercises the Supervisor watchdog via in-process +// net.Pipe connections so no real OS sockets are needed. +package supervisor_test + +import ( + "context" + "io" + "log/slog" + "net" + "sync" + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/daemon/supervisor" +) + +// fakeListener queues pre-made conns and blocks (or returns a closed error) +// once the queue is drained. Close() unblocks any pending Accept(). +type fakeListener struct { + mu sync.Mutex + conns []net.Conn + closed bool + ready chan struct{} // signaled (non-blocking send, never closed) when a conn is enqueued or the listener is closed +} + +func newFakeListener() *fakeListener { + return &fakeListener{ready: make(chan struct{}, 1)} +} + +// enqueue adds a conn for the next Accept() call.
+func (fl *fakeListener) enqueue(c net.Conn) { + fl.mu.Lock() + fl.conns = append(fl.conns, c) + fl.mu.Unlock() + select { + case fl.ready <- struct{}{}: + default: + } +} + +func (fl *fakeListener) Accept() (net.Conn, error) { + for { + fl.mu.Lock() + if fl.closed { + fl.mu.Unlock() + return nil, net.ErrClosed // signals Serve to stop + } + if len(fl.conns) > 0 { + c := fl.conns[0] + fl.conns = fl.conns[1:] + fl.mu.Unlock() + return c, nil + } + fl.mu.Unlock() + // drain the ready channel so we can block below + select { + case <-fl.ready: + default: + } + // wait for a new conn or a close signal + <-fl.ready + } +} + +func (fl *fakeListener) Close() error { + fl.mu.Lock() + fl.closed = true + fl.mu.Unlock() + select { + case fl.ready <- struct{}{}: + default: + } + return nil +} + +func (fl *fakeListener) Addr() net.Addr { return &net.UnixAddr{Name: "fake", Net: "unix"} } + +// noopLogger returns a slog.Logger that discards all output. +func noopLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, nil)) +} + +const testGrace = 30 * time.Millisecond + +// comfortWait is how long we wait when asserting the callback did NOT fire. +// It must be strictly greater than testGrace so a real timer would have fired. +const comfortWait = testGrace * 5 + +// TestNeverFiresPreConnect: start Serve with no connections, wait well past +// grace, assert callback was NOT called. 
+func TestNeverFiresPreConnect(t *testing.T) { + t.Parallel() + + fired := make(chan struct{}) + cb := func() { close(fired) } + + s := supervisor.New(testGrace, cb, noopLogger()) + ln := newFakeListener() + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { done <- s.Serve(ctx, ln) }() + + // wait comfortably past grace with no connections ever accepted + time.Sleep(comfortWait) + + select { + case <-fired: + t.Fatal("callback fired before any client ever connected") + default: + } + + cancel() + _ = ln.Close() + <-done +} + +// TestFiresOnceAfterGrace: connect one client, close it, assert the callback +// fires exactly once within a reasonable window. +func TestFiresOnceAfterGrace(t *testing.T) { + t.Parallel() + + fireCount := 0 + var mu sync.Mutex + fired := make(chan struct{}) + cb := func() { + mu.Lock() + fireCount++ + mu.Unlock() + // signal the test with a non-blocking send so the callback never blocks; + // fired is unbuffered, so the signal is dropped unless the test is already receiving + select { + case fired <- struct{}{}: + default: + } + } + + s := supervisor.New(testGrace, cb, noopLogger()) + ln := newFakeListener() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + done := make(chan error, 1) + go func() { done <- s.Serve(ctx, ln) }() + + // create a pipe, enqueue the server-side end + serverConn, clientConn := makePipe() + ln.enqueue(serverConn) + + // close the client side to signal disconnect + _ = clientConn.Close() + + // wait for the callback within a bounded window + select { + case <-fired: + // good + case <-time.After(comfortWait * 2): + t.Fatal("callback did not fire after client disconnected and grace elapsed") + } + + // close and wait a bit more to make sure it only fires once + time.Sleep(comfortWait) + mu.Lock() + count := fireCount + mu.Unlock() + if count != 1 { + t.Fatalf("expected callback to fire exactly once, got %d", count) + } + + cancel() + _ = ln.Close() + <-done
+} + +// TestReconnectWithinGraceCancels: connect, disconnect (arms grace), reconnect +// before grace elapses, wait past grace, assert callback NOT called. Then +// disconnect again and assert it DOES fire. +func TestReconnectWithinGraceCancels(t *testing.T) { + t.Parallel() + + fireCount := 0 + var mu sync.Mutex + fired := make(chan struct{}, 1) + cb := func() { + mu.Lock() + fireCount++ + mu.Unlock() + select { + case fired <- struct{}{}: + default: + } + } + + s := supervisor.New(testGrace, cb, noopLogger()) + ln := newFakeListener() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + done := make(chan error, 1) + go func() { done <- s.Serve(ctx, ln) }() + + // --- first connection --- + serverConn1, clientConn1 := makePipe() + ln.enqueue(serverConn1) + // small sleep so the server-side accept loop picks up the first conn + time.Sleep(5 * time.Millisecond) + + // disconnect first client: this arms grace + _ = clientConn1.Close() + + // reconnect immediately (well within grace period) before grace elapses + serverConn2, clientConn2 := makePipe() + ln.enqueue(serverConn2) + + // wait well past grace: grace should have been cancelled by the reconnect + time.Sleep(comfortWait) + + select { + case <-fired: + t.Fatal("callback fired even though a client reconnected before grace elapsed") + default: + } + + // now disconnect the second client: grace re-arms, callback should fire + _ = clientConn2.Close() + + select { + case <-fired: + // good + case <-time.After(comfortWait * 2): + t.Fatal("callback did not fire after second client disconnected and grace elapsed") + } + + mu.Lock() + count := fireCount + mu.Unlock() + if count != 1 { + t.Fatalf("expected exactly one callback fire (process-lifetime once), got %d", count) + } + + cancel() + _ = ln.Close() + <-done +} + +// makePipe returns a server-side and client-side net.Conn pair via net.Pipe. 
+func makePipe() (net.Conn, net.Conn) { + s, c := net.Pipe() + return s, c +} diff --git a/backend/internal/daemon/wiring_test.go b/backend/internal/daemon/wiring_test.go index 21dbcbd59f..c5a24402d7 100644 --- a/backend/internal/daemon/wiring_test.go +++ b/backend/internal/daemon/wiring_test.go @@ -381,16 +381,14 @@ func TestProjectRepoResolver_ResolvesRegisteredProject(t *testing.T) { } } -// fakeSessionLifecycle records calls to Reconcile, RestoreAll, and -// SaveAndTeardownAll so tests can assert the daemon wiring invokes the correct -// methods without needing a real runtime or worktree. +// fakeSessionLifecycle records calls to Reconcile and RestoreAll so tests can +// assert the daemon wiring invokes the correct methods without needing a real +// runtime or worktree. type fakeSessionLifecycle struct { - reconcileCalled bool - restoreAllCalled bool - saveAndTeardownCalled bool - reconcileErr error - restoreErr error - saveErr error + reconcileCalled bool + restoreAllCalled bool + reconcileErr error + restoreErr error } func (f *fakeSessionLifecycle) Reconcile(_ context.Context) error { @@ -403,16 +401,10 @@ func (f *fakeSessionLifecycle) RestoreAll(_ context.Context) error { return f.restoreErr } -func (f *fakeSessionLifecycle) SaveAndTeardownAll(_ context.Context) error { - f.saveAndTeardownCalled = true - return f.saveErr -} - // TestWiring_SessionLifecycleInterfaceInvokedByDaemon asserts the // sessionLifecycle interface is satisfied by *sessionmanager.Manager (compile -// check) and that Reconcile, RestoreAll, and SaveAndTeardownAll dispatch -// correctly through the interface, matching what daemon.go wires at -// boot/shutdown. +// check) and that Reconcile and RestoreAll dispatch correctly through the +// interface, matching what daemon.go wires at boot. func TestWiring_SessionLifecycleInterfaceInvokedByDaemon(t *testing.T) { // Verify *sessionmanager.Manager satisfies the interface at compile time. 
var _ sessionLifecycle = (*sessionmanager.Manager)(nil) @@ -437,11 +429,4 @@ func TestWiring_SessionLifecycleInterfaceInvokedByDaemon(t *testing.T) { if !fake.restoreAllCalled { t.Fatal("RestoreAll was not called through the interface") } - - if err := sl.SaveAndTeardownAll(ctx); err != nil { - t.Fatalf("SaveAndTeardownAll: %v", err) - } - if !fake.saveAndTeardownCalled { - t.Fatal("SaveAndTeardownAll was not called through the interface") - } } diff --git a/backend/internal/httpd/server.go b/backend/internal/httpd/server.go index 58f6d5bd9e..e770db9b87 100644 --- a/backend/internal/httpd/server.go +++ b/backend/internal/httpd/server.go @@ -89,6 +89,7 @@ func (s *Server) Run(ctx context.Context) error { PID: os.Getpid(), Port: s.boundPort(), StartedAt: time.Now().UTC(), + Owner: os.Getenv("AO_OWNER"), } if err := runfile.Write(s.cfg.RunFilePath, info); err != nil { _ = s.listen.Close() @@ -148,3 +149,7 @@ func (s *Server) requestShutdown() { close(s.shutdownRequested) }) } + +// RequestShutdown triggers the same clean shutdown as POST /shutdown: it makes +// Run return so the daemon exits without tearing down sessions. Idempotent. +func (s *Server) RequestShutdown() { s.requestShutdown() } diff --git a/backend/internal/integration/lifecycle_sqlite_test.go b/backend/internal/integration/lifecycle_sqlite_test.go index 4b70fb2528..fcfe64d11c 100644 --- a/backend/internal/integration/lifecycle_sqlite_test.go +++ b/backend/internal/integration/lifecycle_sqlite_test.go @@ -208,8 +208,10 @@ func TestRestoreRoundTripPreservesMetadata(t *testing.T) { // TestReconcile_TerminatesDeadLiveSessionAndReapsLeakedTmux exercises // Manager.Reconcile against a real sqlite.Store: // -// - Session A: is_terminated=0 but its runtime is GONE => Reconcile must -// mark it terminated in the DB. +// - Session A: is_terminated=0 but its runtime is GONE and it is a promptless +// KindWorker. reconcileLive marks it terminated. 
RestoreAll does NOT relaunch it +// (ErrNotResumable: no prompt, no session id, not an orchestrator). End state: +// is_terminated=true, runtime.Create count stays 0. // - Session B: is_terminated=1 but its runtime is still ALIVE (leaked teardown) // => Reconcile must call Destroy on its handle. func TestReconcile_TerminatesDeadLiveSessionAndReapsLeakedTmux(t *testing.T) { @@ -274,7 +276,10 @@ func TestReconcile_TerminatesDeadLiveSessionAndReapsLeakedTmux(t *testing.T) { t.Fatalf("Reconcile: %v", err) } - // Session A must now be terminated in the store. + // Session A is a promptless KindWorker: reconcileLive captured its work and + // marked it terminated. RestoreAll skips it (ErrNotResumable: no prompt, no + // AgentSessionID, not an orchestrator). End state: is_terminated=true, no fresh + // runtime.Create (a blank relaunch would silently lose its task). gotA, ok, err := st.store.GetSession(ctx, recA.ID) if err != nil { t.Fatalf("get session A: %v", err) @@ -283,7 +288,11 @@ func TestReconcile_TerminatesDeadLiveSessionAndReapsLeakedTmux(t *testing.T) { t.Fatalf("session A: not found after Reconcile") } if !gotA.IsTerminated { - t.Fatalf("session A: want is_terminated=true after Reconcile, got false") + t.Fatalf("session A: want terminated (is_terminated=true) after crash recovery of promptless worker, got live") + } + // No runtime.Create: a promptless worker must not be blank-relaunched. + if st.rt.created != 0 { + t.Fatalf("want 0 runtime Creates (promptless worker must not relaunch), got %d", st.rt.created) } // Session B's leaked runtime must have been destroyed. diff --git a/backend/internal/runfile/runfile.go b/backend/internal/runfile/runfile.go index 92718d34c1..f2383044df 100644 --- a/backend/internal/runfile/runfile.go +++ b/backend/internal/runfile/runfile.go @@ -24,6 +24,11 @@ type Info struct { Port int `json:"port"` // StartedAt is when the daemon came up (RFC 3339). 
StartedAt time.Time `json:"startedAt"` + // Owner is "app" when the desktop Electron app spawned this daemon; empty + // for a headless `ao start` daemon. Used by the app to decide whether to + // hold a supervisor link on attach (app-owned: re-link; headless: skip so + // the daemon stays persistent across app quit). + Owner string `json:"owner,omitempty"` } // Write atomically writes running.json at path, creating parent directories diff --git a/backend/internal/runfile/runfile_test.go b/backend/internal/runfile/runfile_test.go index 6a926874b1..87b62b320f 100644 --- a/backend/internal/runfile/runfile_test.go +++ b/backend/internal/runfile/runfile_test.go @@ -30,6 +30,42 @@ func TestWriteReadRoundTrip(t *testing.T) { // running.json from a crashed predecessor must be replaced cleanly. POSIX // rename(2) handles this natively; Windows needs MoveFileEx with // MOVEFILE_REPLACE_EXISTING — atomicReplace gives us both. +func TestWriteReadRoundTripOwner(t *testing.T) { + path := filepath.Join(t.TempDir(), "running.json") + + // app-owned daemon: Owner round-trips as "app". + want := Info{PID: 1, Port: 3001, Owner: "app"} + if err := Write(path, want); err != nil { + t.Fatalf("Write: %v", err) + } + got, err := Read(path) + if err != nil { + t.Fatalf("Read: %v", err) + } + if got == nil { + t.Fatal("Read returned nil for an existing file") + } + if got.Owner != "app" { + t.Errorf("Owner round trip: got %q, want %q", got.Owner, "app") + } + + // headless daemon: Owner is empty (omitempty), round-trips as "". 
+ headless := Info{PID: 2, Port: 3002} + if err := Write(path, headless); err != nil { + t.Fatalf("Write headless: %v", err) + } + got, err = Read(path) + if err != nil { + t.Fatalf("Read headless: %v", err) + } + if got == nil { + t.Fatal("Read returned nil for headless file") + } + if got.Owner != "" { + t.Errorf("headless Owner round trip: got %q, want %q", got.Owner, "") + } +} + func TestWriteOverwritesExisting(t *testing.T) { path := filepath.Join(t.TempDir(), "running.json") diff --git a/backend/internal/service/session/service.go b/backend/internal/service/session/service.go index facfc7bdce..9f092d2e32 100644 --- a/backend/internal/service/session/service.go +++ b/backend/internal/service/session/service.go @@ -88,7 +88,7 @@ type Service struct { telemetry ports.EventSink // signalCapable reports whether a harness has a hook pipeline that can // deliver activity signals at all. Only capable harnesses are eligible for - // the no_signal downgrade — a hook-less harness staying silent forever is + // the no_signal downgrade: a hook-less harness staying silent forever is // normal, not a broken pipeline. nil means "unknown": never downgrade. signalCapable func(domain.AgentHarness) bool } @@ -166,7 +166,7 @@ func (s *Service) requireProject(ctx context.Context, id domain.ProjectID) (doma return domain.ProjectRecord{}, fmt.Errorf("get project %s: %w", id, err) } if !ok { - return domain.ProjectRecord{}, apierr.NotFound("PROJECT_NOT_FOUND", "Unknown project — register it with `ao project add`") + return domain.ProjectRecord{}, apierr.NotFound("PROJECT_NOT_FOUND", "Unknown project. Register it with `ao project add`") } return rec, nil } @@ -258,11 +258,12 @@ func (s *Service) emitSpawnFailed(cfg ports.SpawnConfig, err error, durationMs i // SpawnOrchestrator spawns an orchestrator session for a project. 
When clean is // true it first tears down any active orchestrator(s) for that project so the new -// one is the only live coordinator — a business rule that belongs here, not in the -// HTTP controller. +// one is the only live coordinator. When clean is false it is idempotent: if an +// active orchestrator already exists it is returned as-is. A business rule that +// belongs here, not in the HTTP controller. func (s *Service) SpawnOrchestrator(ctx context.Context, projectID domain.ProjectID, clean bool) (domain.Session, error) { + active := true if clean { - active := true existing, err := s.List(ctx, ListFilter{ProjectID: projectID, Active: &active, OrchestratorOnly: true}) if err != nil { return domain.Session{}, err @@ -272,6 +273,15 @@ func (s *Service) SpawnOrchestrator(ctx context.Context, projectID domain.Projec return domain.Session{}, err } } + } else { + // ponytail: check-then-spawn is not atomic; fine for the single-frontend ensure-on-load case. Upgrade path: a partial unique index on (project_id) where kind=orchestrator and not terminated. + existing, err := s.List(ctx, ListFilter{ProjectID: projectID, Active: &active, OrchestratorOnly: true}) + if err != nil { + return domain.Session{}, err + } + if len(existing) > 0 { + return existing[0], nil + } } return s.Spawn(ctx, ports.SpawnConfig{ProjectID: projectID, Kind: domain.KindOrchestrator}) } @@ -457,7 +467,7 @@ func toAPIError(err error) error { return apierr.Conflict("SESSION_NOT_RESUMABLE", "This session has no saved agent session or prompt to resume from", nil) case errors.Is(err, sessionmanager.ErrProjectNotResolvable): - return apierr.Invalid("PROJECT_NOT_RESOLVABLE", "Project is not registered or has no repo — register it with `ao project add`", nil) + return apierr.Invalid("PROJECT_NOT_RESOLVABLE", "Project is not registered or has no repo. 
Register it with `ao project add`", nil) case errors.Is(err, sessionmanager.ErrUnknownHarness): return apierr.Invalid("UNKNOWN_HARNESS", err.Error(), nil) case errors.Is(err, sessionmanager.ErrMissingHarness): diff --git a/backend/internal/service/session/service_test.go b/backend/internal/service/session/service_test.go index 3b0476f288..84a4849909 100644 --- a/backend/internal/service/session/service_test.go +++ b/backend/internal/service/session/service_test.go @@ -542,19 +542,74 @@ func TestToAPIErrorMapsWorkspaceBranchSentinels(t *testing.T) { } } -func TestSpawnOrchestratorNoCleanSkipsKills(t *testing.T) { +// TestToAPIError_NotResumable asserts that ErrNotResumable (promptless worker +// with no adapter resume handle) maps to a Conflict with code SESSION_NOT_RESUMABLE. +func TestToAPIError_NotResumable(t *testing.T) { + err := fmt.Errorf("restore mer-1: %w", sessionmanager.ErrNotResumable) + mapped := toAPIError(err) + var e *apierr.Error + if !errors.As(mapped, &e) || e.Kind != apierr.KindConflict || e.Code != "SESSION_NOT_RESUMABLE" { + t.Fatalf("mapped = %v, want Conflict SESSION_NOT_RESUMABLE", mapped) + } +} + +// TestSpawnOrchestratorNoCleanReturnsExistingWhenActiveExists is the RED test +// for the idempotency fix: when an active orchestrator already exists and +// clean=false, SpawnOrchestrator must return that orchestrator without minting +// a second one. Before the fix this test fails because a duplicate is spawned. +func TestSpawnOrchestratorNoCleanReturnsExistingWhenActiveExists(t *testing.T) { st := newFakeStore() st.projects["mer"] = domain.ProjectRecord{ID: "mer"} + // Pre-load an active orchestrator. 
st.sessions["mer-1"] = domain.SessionRecord{ID: "mer-1", ProjectID: "mer", Kind: domain.KindOrchestrator} fc := &fakeCommander{} svc := &Service{manager: fc, store: st} - if _, err := svc.SpawnOrchestrator(context.Background(), "mer", false); err != nil { + got, err := svc.SpawnOrchestrator(context.Background(), "mer", false) + if err != nil { + t.Fatalf("SpawnOrchestrator: %v", err) + } + // Must return the existing orchestrator, not a newly minted one. + if got.ID != "mer-1" { + t.Fatalf("returned id = %q, want existing orchestrator mer-1", got.ID) + } + // Must NOT have called manager.Spawn (no duplicate created). + if fc.spawned { + t.Fatal("manager.Spawn must NOT be called when an active orchestrator already exists") + } + // Must NOT have killed anything. + if len(fc.killed) != 0 { + t.Fatalf("no kills expected with clean=false, got %v", fc.killed) + } + // Exactly one session in the store (no duplicate). + if len(st.sessions) != 1 { + t.Fatalf("session count = %d, want 1 (no duplicate)", len(st.sessions)) + } +} + +// TestSpawnOrchestratorNoCleanSpawnsWhenNoneExists: clean=false spawns a new +// orchestrator when no active one exists for the project. +func TestSpawnOrchestratorNoCleanSpawnsWhenNoneExists(t *testing.T) { + st := newFakeStore() + st.projects["mer"] = domain.ProjectRecord{ID: "mer"} + // No active orchestrator present. 
+ + fc := &fakeCommander{} + svc := &Service{manager: fc, store: st} + + got, err := svc.SpawnOrchestrator(context.Background(), "mer", false) + if err != nil { t.Fatalf("SpawnOrchestrator: %v", err) } - if len(fc.killed) != 0 || !fc.spawned { - t.Fatalf("clean=false must spawn without kills: killed=%v spawned=%v", fc.killed, fc.spawned) + if !fc.spawned { + t.Fatal("manager.Spawn must be called when no active orchestrator exists") + } + if len(fc.killed) != 0 { + t.Fatalf("no kills expected with clean=false, got %v", fc.killed) + } + if got.ID == "" { + t.Fatal("returned session must have an id") } } @@ -858,17 +913,3 @@ func containsString(values []string, want string) bool { } return false } - -func TestToAPIError_NotResumable(t *testing.T) { - err := toAPIError(fmt.Errorf("restore foo: %w", sessionmanager.ErrNotResumable)) - var ae *apierr.Error - if !errors.As(err, &ae) { - t.Fatalf("want *apierr.Error, got %T: %v", err, err) - } - if ae.Kind != apierr.KindConflict { - t.Errorf("kind = %v, want %v", ae.Kind, apierr.KindConflict) - } - if ae.Code != "SESSION_NOT_RESUMABLE" { - t.Errorf("code = %q, want SESSION_NOT_RESUMABLE", ae.Code) - } -} diff --git a/backend/internal/session_manager/manager.go b/backend/internal/session_manager/manager.go index 89cbf05e02..48d93fe26f 100644 --- a/backend/internal/session_manager/manager.go +++ b/backend/internal/session_manager/manager.go @@ -26,13 +26,6 @@ var ( ErrNotRestorable = errors.New("session: not restorable (not terminal)") ErrTerminated = errors.New("session: terminated") ErrIncompleteHandle = errors.New("session: incomplete teardown handle") - // ErrNotResumable means there is nothing for Restore to relaunch from: the - // harness adapter cannot resume the session (no native or derivable session - // id) AND no prompt was saved to fresh-launch from. Resumability is decided - // by the adapter (e.g. 
Claude Code pins a deterministic --session-id, so it - // resumes with no captured token), not by inspecting metadata fields here. - // Distinct from ErrNotRestorable (which is "not terminal yet"). - ErrNotResumable = errors.New("session: nothing to resume from") // ErrProjectNotResolvable means the spawn's project has no usable repo // (unregistered, archived, or missing a path). The API maps it to a 400. ErrProjectNotResolvable = errors.New("session: project repo not resolvable") @@ -43,6 +36,12 @@ var ( // ErrMissingHarness means neither the spawn request nor the project's role // config selected an agent. Worker/orchestrator spawns must be explicit. ErrMissingHarness = errors.New("session: agent harness required") + // ErrNotResumable means a terminated session cannot be relaunched: its adapter + // cannot natively resume it AND it has no prompt to fresh-launch from, and it is + // not an orchestrator (orchestrators are promptless by design and relaunch fresh + // with the system prompt only). Workers without a task and without a native + // session id have nothing meaningful to restore. + ErrNotResumable = errors.New("session: nothing to resume from") ) // Env vars a spawned process reads to learn who it is. @@ -487,10 +486,11 @@ func (m *Manager) Restore(ctx context.Context, id domain.SessionID) (domain.Sess if meta.WorkspacePath == "" || meta.Branch == "" { return domain.SessionRecord{}, fmt.Errorf("restore %s: %w", id, ErrIncompleteHandle) } - // Resumability is NOT decided here: a promptless session can still be fully - // resumable when the harness pins a deterministic session id (Claude Code). - // restoreArgv asks the adapter and returns ErrNotResumable only when the - // adapter cannot resume AND there is no prompt to fresh-launch from. + // Resumability is decided inside restoreArgv, not here. A promptless session + // can still be fully resumable when the harness pins a deterministic session id + // (Claude Code). 
restoreArgv returns ErrNotResumable only for a promptless, + // unresumable non-orchestrator (a worker with no task and no native id to resume). + // Orchestrators always relaunch fresh with the system prompt only. project, err := m.loadProject(ctx, rec.ProjectID) if err != nil { @@ -521,7 +521,7 @@ func (m *Manager) Restore(ctx context.Context, id domain.SessionID) (domain.Sess } // Restore re-applies the project's resolved agent config so a configured // model/permissions carry across a restore, matching fresh spawn. - argv, err := restoreArgv(ctx, agent, id, ws.Path, meta, systemPrompt, effectiveAgentConfig(rec.Kind, project.Config)) + argv, err := restoreArgv(ctx, agent, id, ws.Path, meta, systemPrompt, effectiveAgentConfig(rec.Kind, project.Config), rec.Kind) if err != nil { return domain.SessionRecord{}, fmt.Errorf("restore %s: %w", id, err) } @@ -828,7 +828,14 @@ func (m *Manager) RestoreAll(ctx context.Context) error { // Step 3: relaunch via the existing single-session Restore method. if _, err := m.Restore(ctx, rec.ID); err != nil { - m.logger.Error("restore-all: relaunch failed", "sessionID", rec.ID, "error", err) + // A promptless, unresumable worker is intentionally left terminated + // (ErrNotResumable): expected, not an operational failure, so log it + // quietly rather than as an error. + if errors.Is(err, ErrNotResumable) { + m.logger.Warn("restore-all: session left terminated (nothing to resume)", "sessionID", rec.ID) + } else { + m.logger.Error("restore-all: relaunch failed", "sessionID", rec.ID, "error", err) + } } } return nil @@ -1220,7 +1227,11 @@ func (m *Manager) prepareWorkspace(ctx context.Context, agent ports.Agent, id do // restoreArgv builds the argv to relaunch a torn-down session: the agent's // native resume command when it can continue the session, else a fresh launch. // The agent signals via ok=false (e.g. no native session id captured yet). 
-func restoreArgv(ctx context.Context, agent ports.Agent, id domain.SessionID, workspacePath string, meta domain.SessionMetadata, systemPrompt string, agentConfig ports.AgentConfig) ([]string, error) { +// Returns ErrNotResumable only for a promptless, unresumable non-orchestrator: +// a worker with no prompt and no native session id has nothing to restore from. +// Orchestrators are promptless by design and always relaunch fresh with the +// system prompt only. +func restoreArgv(ctx context.Context, agent ports.Agent, id domain.SessionID, workspacePath string, meta domain.SessionMetadata, systemPrompt string, agentConfig ports.AgentConfig, kind domain.SessionKind) ([]string, error) { ref := ports.SessionRef{ ID: string(id), WorkspacePath: workspacePath, @@ -1233,12 +1244,13 @@ func restoreArgv(ctx context.Context, agent ports.Agent, id domain.SessionID, wo if ok { return cmd, nil } - // The adapter reports no session to resume (no native or derivable session - // id). A saved prompt lets us relaunch fresh; with neither, there is - // genuinely nothing to restore from. - if meta.Prompt == "" { + // Adapter cannot resume. A saved prompt is replayed fresh. An orchestrator is + // promptless by design and relaunches with the system prompt only. A promptless + // WORKER has no task and no session id to restore from: do not blank-relaunch it. + if meta.Prompt == "" && kind != domain.KindOrchestrator { return nil, ErrNotResumable } + // Fall through to GetLaunchCommand (replays meta.Prompt; empty for an orchestrator). 
argv, err := agent.GetLaunchCommand(ctx, ports.LaunchConfig{ SessionID: string(id), WorkspacePath: workspacePath, diff --git a/backend/internal/session_manager/manager_test.go b/backend/internal/session_manager/manager_test.go index 2d7d646ac4..82ff2c491a 100644 --- a/backend/internal/session_manager/manager_test.go +++ b/backend/internal/session_manager/manager_test.go @@ -937,23 +937,67 @@ func TestRestore_PromptlessOrchestratorResumesViaAdapter(t *testing.T) { } } -// TestRestore_RefusesPromptlessWhenAdapterCannotResume preserves the typed -// error: a promptless session whose adapter cannot resume (no native session id) -// has genuinely nothing to relaunch from and must still return ErrNotResumable. -func TestRestore_RefusesPromptlessWhenAdapterCannotResume(t *testing.T) { +// TestRestore_PromptlessUnresumableRelaunchesFresh covers the genuine-reboot +// case: a promptless orchestrator whose adapter cannot resume (no native session +// id, no captured AgentSessionID) must be relaunched fresh via GetLaunchCommand +// in the SAME id. Only orchestrators take this path (promptless workers return +// ErrNotResumable): after a reboot where tmux is truly gone, RestoreAll must +// recover the orchestrator in place rather than mint a new one (the id-increment bug). +func TestRestore_PromptlessUnresumableRelaunchesFresh(t *testing.T) { + st := newFakeStore() + st.sessions["mer-1"] = domain.SessionRecord{ + ID: "mer-1", ProjectID: "mer", Kind: domain.KindOrchestrator, IsTerminated: true, + // No AgentSessionID, no Prompt: exactly how an orchestrator is persisted. + Metadata: domain.SessionMetadata{WorkspacePath: "/ws/mer-1", Branch: "ao/mer-orchestrator"}, + Activity: domain.Activity{State: domain.ActivityExited}, + } + rt := &fakeRuntime{} + lookPath := func(string) (string, error) { return "/bin/true", nil } + // fakeAgents resolves to fakeAgent, whose GetRestoreCommand returns ok=false + // without an agentSessionId, and GetLaunchCommand returns a valid argv.
+ m := New(Deps{Runtime: rt, Agents: fakeAgents{}, Workspace: &fakeWorkspace{}, Store: st, Messenger: &fakeMessenger{}, Lifecycle: &fakeLCM{store: st}, LookPath: lookPath}) + + if _, err := m.Restore(ctx, "mer-1"); err != nil { + t.Fatalf("promptless unresumable session must relaunch fresh, got err = %v", err) + } + if rt.created != 1 { + t.Fatalf("runtime.Create = %d, want 1 (fresh launch)", rt.created) + } + if st.sessions["mer-1"].IsTerminated { + t.Error("session must be live after fresh relaunch") + } +} + +// TestRestore_PromptlessWorkerNotResumable is the RED test for the promptless-worker +// fix: a KindWorker session with no prompt and no captured AgentSessionID (so the +// adapter returns ok=false) must NOT be blank-relaunched. The session had no task +// to replay and no native id to resume from, so relaunching fresh would silently +// drop its work. Restore must return ErrNotResumable and leave the session terminated +// (runtime.Create must NOT be called). +func TestRestore_PromptlessWorkerNotResumable(t *testing.T) { st := newFakeStore() st.sessions["mer-1"] = domain.SessionRecord{ ID: "mer-1", ProjectID: "mer", Kind: domain.KindWorker, IsTerminated: true, + // No AgentSessionID, no Prompt: promptless worker with no resume handle. Metadata: domain.SessionMetadata{WorkspacePath: "/ws/mer-1", Branch: "ao/mer-1/root"}, Activity: domain.Activity{State: domain.ActivityExited}, } + rt := &fakeRuntime{} lookPath := func(string) (string, error) { return "/bin/true", nil } // fakeAgents resolves to fakeAgent, whose GetRestoreCommand returns ok=false - // without an agentSessionId. - m := New(Deps{Runtime: &fakeRuntime{}, Agents: fakeAgents{}, Workspace: &fakeWorkspace{}, Store: st, Messenger: &fakeMessenger{}, Lifecycle: &fakeLCM{store: st}, LookPath: lookPath}) + // when there is no AgentSessionID. With a KindWorker and empty Prompt, this + // must produce ErrNotResumable instead of a blank relaunch. 
+ m := New(Deps{Runtime: rt, Agents: fakeAgents{}, Workspace: &fakeWorkspace{}, Store: st, Messenger: &fakeMessenger{}, Lifecycle: &fakeLCM{store: st}, LookPath: lookPath}) - if _, err := m.Restore(ctx, "mer-1"); !errors.Is(err, ErrNotResumable) { - t.Fatalf("Restore err = %v, want ErrNotResumable", err) + _, err := m.Restore(ctx, "mer-1") + if !errors.Is(err, ErrNotResumable) { + t.Fatalf("promptless unresumable worker must return ErrNotResumable, got %v", err) + } + if rt.created != 0 { + t.Fatalf("runtime.Create = %d, want 0 (must not relaunch a promptless worker)", rt.created) + } + if !st.sessions["mer-1"].IsTerminated { + t.Error("session must remain terminated after ErrNotResumable") } } diff --git a/docs/superpowers/plans/2026-06-25-ao-cli-install-and-daemon-lifecycle.md b/docs/superpowers/plans/2026-06-25-ao-cli-install-and-daemon-lifecycle.md new file mode 100644 index 0000000000..8ddfa040fc --- /dev/null +++ b/docs/superpowers/plans/2026-06-25-ao-cli-install-and-daemon-lifecycle.md @@ -0,0 +1,132 @@ +# Daemon Shutdown = Adopt-Alive, Not Teardown — Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development or superpowers:executing-plans. Steps use checkbox (`- [ ]`) syntax. + +**Goal:** Stopping the daemon (or losing the frontend) must NOT destroy live agent sessions. Sessions persist in the runtime and are re-adopted on the next boot, with their ids and context intact, on macOS/tmux exactly as on Windows/ConPTY. + +**Architecture (validated by direct experiment):** tmux sessions survive the daemon process dying (they reparent to `launchd`), and the boot reconcile already adopts surviving sessions correctly. The bug is that the **graceful** shutdown path runs `SaveAndTeardownAll`, which calls `runtime.Destroy` (`tmux kill-session`) and DESTROYS the live session; the orchestrator then can't be restored (promptless → `ErrNotResumable`) and the frontend mints a new one (id increments). 
Fix: shutdown becomes a clean exit that leaves sessions alive; boot adopts them. The liveness link makes the daemon reliably stop when the frontend dies, via that same clean exit. + +**Tech Stack:** Go daemon (`session_manager`, `daemon`, `httpd`, tmux/ConPTY runtimes), Electron main (TypeScript, `node:net`), `go test` + Vitest. + +## Reproduction (recorded, sandboxed `AO_DATA_DIR`) + +- Spawn worker + orchestrator → both get a live tmux session, DB `is_terminated=0`. +- `kill -9` daemon → tmux sessions survive → restart → both **adopted**: same id, `is_terminated=0`, same tmux session. No increment. ✅ +- `ao stop` (graceful) → `SaveAndTeardownAll` → tmux sessions **killed**, marked `exited`, marker written → restart → orchestrator stays `is_terminated=1` (Restore returned `ErrNotResumable`) → `POST /orchestrators` creates `…-3` (num 2→3). ❌ This is the bug. + +## Why only the orchestrator visibly breaks (workers are NOT immune) + +The graceful path kills BOTH workers' and orchestrators' live sessions; the orchestrator is just the only one where the damage is visible, for two independent reasons that both happen to land on it: + +1. **Restore failure is orchestrator-only.** Workers carry a saved `prompt`, so when the agent can't natively resume, `restoreArgv` falls back to relaunching fresh from the prompt and "succeeds". The orchestrator is promptless (`prompt=""` for all of them in the DB), so the same path hits `ErrNotResumable` (`manager.go:1238`) and it is left terminated. +2. **Auto-recreation is orchestrator-only.** The frontend calls `POST /api/v1/orchestrators` on load to _ensure_ an orchestrator; finding none active, it mints a new one (`num+1` → the visible `14→15→16`). Nothing auto-respawns workers, so a worker that fails to restore just silently vanishes. + So both workers and the orchestrator lose their LIVE session + context on a graceful stop today; the orchestrator alone advertises it via the incrementing id. 
Task 1 (adopt-alive instead of teardown) restores live context for ALL sessions and needs no orchestrator special-casing — this is the unification you asked for. + +## Global Constraints + +- All app state under `~/.ao` (`AO_DATA_DIR`/`AO_RUN_FILE` overrides). Liveness socket under `~/.ao` (unix) / named pipe (Windows). +- No em dashes anywhere. +- Headless safety: a daemon with no frontend (CLI `ao start`) must never self-stop and must not have its sessions destroyed. +- `ao` on agent PATH already works in packaged builds via `HookPATH` (`session_manager/manager.go:1082`). No global install. Dev `go run` is the only gap (optional Phase D). +- Do not break genuine reboot recovery: when the runtime is truly gone, `reconcileLive` stashes the worktree and relaunches (work preserved). Keep that path. +- Mark shortcuts with `ponytail:` comments. + +--- + +## File Structure + +- **Modify** `backend/internal/daemon/daemon.go` — remove the `SaveAndTeardownAll` call from the normal shutdown path (`daemon.go:164`); the daemon exits leaving sessions alive. (Keep `Reconcile` on boot, which already adopts.) +- **Modify** `backend/internal/session_manager/manager.go` — `restoreArgv` (~1238): a promptless session relaunches fresh in the same id instead of `ErrNotResumable` (covers the reboot-only case). Audit/remove now-dead orchestrator divergence. +- **Create** `backend/internal/daemon/supervisor/{supervisor.go,listen_unix.go,listen_windows.go,supervisor_test.go}` — OS-native liveness listener + watchdog → triggers a clean shutdown (the same `RequestShutdown` the HTTP `/shutdown` uses) when the frontend link drops. +- **Modify** `backend/internal/daemon/daemon.go` — start the supervisor, publish its address (extend `runfile`/`/healthz`), wire `onLastClientGone` to `RequestShutdown`. 
+- **Create** `frontend/src/main/supervisor-link.ts` (+ test); **Modify** `frontend/src/main.ts` — hold the supervisor link open for the app lifetime; remove all daemon-stop logic from `before-quit`/`process.on("exit")`. + +Phases (independently shippable): + +- **Phase A** (Task 1): shutdown no longer tears down sessions → adopt-alive on the graceful path. THE fix; stops the increment and preserves context. +- **Phase B** (Tasks 2-4): OS-native liveness link → daemon cleanly stops when the frontend dies (no orphan), without tearing down sessions. +- **Phase C** (Task 5): promptless restore + de-segregate (covers the genuine reboot case). +- **Phase D** (Task 6, optional): dev `ao`-on-PATH. + +--- + +## Task 1: Shutdown stops destroying live sessions (Phase A — the fix) + +**Files:** Modify `backend/internal/daemon/daemon.go`. + +**Change:** On the shutdown path (after `srv.Run` returns, `daemon.go:~162-166`), do NOT call `SaveAndTeardownAll`. The daemon exits and the tmux/ConPTY sessions stay alive; the next boot's `Reconcile`→`reconcileLive` adopts them (already implemented and verified). `SaveAndTeardownAll` is reserved for explicit teardown needs, not routine shutdown. + +> Rationale proven above: with the teardown removed, the graceful path behaves like the hard-kill path, which already adopts cleanly. Uncommitted work is not lost: it stays in the on-disk worktree and, if the runtime is ever genuinely gone (reboot), `reconcileLive` stashes it on boot. + +- [ ] **Step 1:** Write a failing `go test` at the daemon/session_manager seam: after a simulated graceful shutdown, live sessions remain non-terminated and their runtime handles are NOT destroyed. (Use the existing manager test doubles / a fake runtime that records `Destroy` calls; assert `Destroy` is not called on shutdown.) +- [ ] **Step 2:** Run it → FAIL (current code calls `SaveAndTeardownAll` → `Destroy`). 
+- [ ] **Step 3:** Remove the `SaveAndTeardownAll` invocation from `daemon.Run`'s shutdown sequence (keep ordered teardown of CDC/preview/lifecycle goroutines). Leave the function in place for explicit callers. +- [ ] **Step 4:** Run `go test ./internal/daemon/... ./internal/session_manager/... -race` → PASS (fix any test asserting the old teardown-on-shutdown). +- [ ] **Step 5:** Manual repro (sandbox `AO_DATA_DIR`): spawn orchestrator → `ao stop` → tmux session SURVIVES → `ao start` → orchestrator adopted, SAME id, no increment. +- [ ] **Step 6:** Commit `fix(daemon): do not tear down live sessions on shutdown; adopt them on boot`. + +## Task 2: Supervisor watchdog core + +**Files:** Create `backend/internal/daemon/supervisor/supervisor.go`, `supervisor_test.go`. +**Produces:** `New(grace, onLastClientGone, log)`, `(*Supervisor) Serve(ctx, ln net.Listener) error`. Arms on first accepted conn; when live count hits 0, starts `grace`; if it elapses still 0, calls `onLastClientGone()` once; a reconnect cancels it. Each conn read into a scratch buffer purely to detect close. + +- [ ] **Step 1:** Failing tests: never fires pre-connect; fires once after grace on last disconnect; reconnect within grace cancels. Use `net.Pipe()` + a fake listener + short grace. +- [ ] **Step 2:** Run → FAIL. +- [ ] **Step 3:** Implement (mutex `liveCount`, `time.AfterFunc` grace, `sync.Once` fire). +- [ ] **Step 4:** Run → PASS. +- [ ] **Step 5:** Commit `feat(daemon): supervisor watchdog`. + +## Task 3: Platform listeners + daemon wiring + +**Files:** Create `supervisor/listen_unix.go`, `listen_windows.go`; Modify `daemon.go`. +**Produces:** `Listen(dataDir) (net.Listener, string, error)` — unix UDS at `~/.ao/supervise.sock` (unlink-stale first); windows named pipe `\\.\pipe\ao-supervise` (`//go:build windows`, via `go-winio` — confirm/declare the dep). 
Wire into `daemon.Run` after the HTTP server is up; publish `addr` (extend `runfile` write or `/healthz`); `go sup.Serve(ctx, ln)`; `onLastClientGone = deps.RequestShutdown`. Because Task 1 made shutdown non-destructive, a watchdog-triggered shutdown simply exits leaving sessions alive. + +- [ ] **Step 1:** Implement listeners (unix first; windows behind build tag). +- [ ] **Step 2:** Wire + publish address. +- [ ] **Step 3:** `go build ./... && go vet ./...` clean (darwin at least). +- [ ] **Step 4:** Manual: start daemon, `nc -U ~/.ao/supervise.sock`, kill `nc` → daemon exits after grace, **tmux sessions still alive**; reconnect within grace → no shutdown. +- [ ] **Step 5:** Commit `feat(daemon): OS-native supervisor listener triggers clean shutdown`. + +## Task 4: Electron holds the link; drop quit-time daemon teardown + +**Files:** Create `frontend/src/main/supervisor-link.ts` (+ test); Modify `frontend/src/main.ts`. +**Produces:** `connectSupervisor(addr, opts?) -> { dispose() }` (`node:net` connect to UDS/pipe; retry with backoff if the daemon is not up yet; heartbeat byte every N s). In `main.ts`: connect after the daemon is ready (read addr from the handshake); **remove** all daemon-stop logic from `before-quit`/`process.on("exit")` (delete `killDaemon`/`ao stop`). Closing the app drops the socket → daemon self-stops cleanly, sessions persist. + +- [ ] **Step 1:** Failing test: retry-until-connected against a throwaway `net.Server` on a temp UDS. +- [ ] **Step 2:** Run `pnpm vitest run src/main/supervisor-link.test.ts` → FAIL. +- [ ] **Step 3:** Implement `supervisor-link.ts`. +- [ ] **Step 4:** Edit `main.ts`; `pnpm tsc --noEmit && pnpm vite build --config vite.main.config.ts` clean. +- [ ] **Step 5:** Dev smoke: Cmd+Q AND `kill -9` Electron → daemon exits both ways, `running.json` gone, **tmux sessions still alive** → reopen → sessions adopted with context. 
+- [ ] **Step 6:** Commit `feat(desktop): supervisor link; daemon self-stops (clean) on frontend exit`. + +## Task 5: Promptless restore + de-segregate (covers the reboot case) + +**Files:** Modify `backend/internal/session_manager/manager.go`. +**Change:** In `restoreArgv` (~1238), when `ok=false` and `meta.Prompt==""`, relaunch fresh via `GetLaunchCommand` (empty prompt, system prompt only) instead of returning `ErrNotResumable`. This only matters when the runtime is genuinely gone (reboot) and `RestoreAll` runs; with Task 1, normal restarts adopt and never reach here. Remove orchestrator-only divergence the audit surfaces. + +- [ ] **Step 1:** Failing `go test`: `restoreArgv` with `ok=false`, empty `AgentSessionID` + empty `Prompt` returns the fresh `GetLaunchCommand` argv, not `ErrNotResumable`. +- [ ] **Step 2:** Run → FAIL. +- [ ] **Step 3:** Implement (drop the empty-prompt early return; fall through to `GetLaunchCommand`). +- [ ] **Step 4:** `go test ./internal/session_manager/... -race` → PASS. +- [ ] **Step 5:** Manual reboot-sim: spawn orchestrator → `tmux kill-server` (simulate reboot losing tmux) → restart daemon → orchestrator restored in the SAME id (not recreated). +- [ ] **Step 6:** Commit `fix(core): restore promptless sessions in place (reboot recovery, no increment)`. + +## Task 6 (optional, Phase D): `ao` on agent PATH in dev + +`HookPATH` needs the daemon binary named `ao`. Packaged satisfies this; dev `go run` produces a hash-named temp binary. If wanted, build a stable `~/.ao/dev/ao` once at dev startup and launch from it. Detail on request. + +--- + +## Verification (whole feature) + +- [ ] `go build ./... && go vet ./... && go test ./... -race` green; `cd frontend && pnpm vitest run && pnpm tsc --noEmit` green; full `pnpm build`. +- [ ] **Graceful stop preserves sessions:** spawn orchestrator → `ao stop` → tmux session ALIVE → `ao start` → orchestrator adopted, SAME id (reproduces the fix for the recorded bug). 
+- [ ] **Frontend death:** Cmd+Q AND `kill -9` Electron → daemon exits, sessions alive, reopen → adopted with context. +- [ ] **Reboot recovery:** `tmux kill-server` then restart → orchestrator restored in the same id. +- [ ] **Headless safety:** `ao start` from a terminal, no app → daemon runs forever, sessions intact. + +## Self-Review + +- Spec coverage: don't depend on clean close (Tasks 1+4), no orphan daemon (Task 4), orchestrator survives restart treated like a worker (Task 1 adopt; Task 5 reboot), OS-native pipe/socket transport (Tasks 2-3), `ao` to workers (HookPATH; dev = Task 6). Covered. +- Key insight baked in: the fix is primarily DELETION (stop tearing down on shutdown), validated by the hard-kill adopt experiment. +- Open implementation check (Task 1): confirm nothing else relies on `SaveAndTeardownAll` running at shutdown (e.g., a test or a resource-flush); the function stays available for explicit teardown. diff --git a/frontend/src/main.ts b/frontend/src/main.ts index ac3bb4af87..0c00e2a0c0 100644 --- a/frontend/src/main.ts +++ b/frontend/src/main.ts @@ -33,6 +33,8 @@ import { buildDaemonEnv, resolveShellEnv, type ShellRunner } from "./shared/shel import { DEFAULT_POSTHOG_HOST, DEFAULT_POSTHOG_PROJECT_KEY } from "./shared/posthog-config"; import { buildTelemetryBootstrap } from "./shared/telemetry"; import { createBrowserViewHost, type BrowserViewHost } from "./main/browser-view-host"; +import { connectSupervisor, type SupervisorLinkHandle } from "./main/supervisor-link"; +import { shouldLinkOnAttach } from "./main/daemon-owner"; // Globals injected at compile time by @electron-forge/plugin-vite. declare const MAIN_WINDOW_VITE_DEV_SERVER_URL: string | undefined; @@ -68,6 +70,8 @@ let daemonStartPromise: Promise | null = null; let daemonStartEpoch = 0; let daemonStatus: DaemonStatus = { state: "stopped" }; let browserViewHost: BrowserViewHost | null = null; +// Held for the app lifetime. Dropping it (on any exit) triggers daemon self-stop. 
+let supervisorLink: SupervisorLinkHandle | null = null; const isDev = !app.isPackaged; @@ -300,11 +304,15 @@ function ensureShellEnv(): Promise { } function daemonEnv(): NodeJS.ProcessEnv { + // AO_OWNER=app marks this daemon as app-spawned so the app can re-link the + // supervisor on attach (headless `ao start` daemons get no AO_OWNER and stay + // unlinked, preserving their persistence across app quit). + const ownerTag = { AO_OWNER: "app" }; // Windows keeps the old behavior exactly: no shell probe, no unix PATH floor. if (process.platform === "win32") { - return { ...process.env, ...telemetryOverrides() }; + return { ...process.env, ...telemetryOverrides(), ...ownerTag }; } - return buildDaemonEnv(process.env, cachedShellEnv, telemetryOverrides()); + return buildDaemonEnv(process.env, cachedShellEnv, { ...telemetryOverrides(), ...ownerTag }); } function pathKey(value: string): string { @@ -371,7 +379,38 @@ function daemonIdentityError(launch: DaemonLaunchSpec, probe: DaemonProbe): stri return null; } -async function inspectExistingDaemon(launch: DaemonLaunchSpec): Promise { +/** + * Establish (or re-establish) the OS-native liveness link to the daemon's + * supervisor socket. Holding this connection keeps the daemon alive: when + * Electron exits for any reason (Cmd+Q, crash, SIGKILL), the OS closes the fd + * and the daemon detects EOF, then self-stops after its ~5s grace period. + * + * Called unconditionally on the spawn path (we always own that daemon). + * Called on the attach path only when the daemon is app-owned (owner === "app"); + * headless `ao start` daemons stay unlinked so they remain persistent after + * app quit. + */ +function establishSupervisorLink(): void { + const rfp = runFilePath(); + const addr = + process.platform === "win32" + ? "\\\\.\\pipe\\ao-supervise" + : rfp + ? 
path.join(path.dirname(rfp), "supervise.sock") + : null; + if (addr) { + supervisorLink?.dispose(); + supervisorLink = connectSupervisor(addr, { + log: (msg) => console.log(`AO: ${msg}`), + }); + } else { + console.warn("AO: supervisor link skipped; run-file path unavailable"); + } +} + +async function inspectExistingDaemon( + launch: DaemonLaunchSpec, +): Promise<{ status: DaemonStatus; owner: string | undefined } | null> { const handshakePath = runFilePath(); let runFileContents: string | null = null; if (handshakePath) { @@ -381,12 +420,15 @@ async function inspectExistingDaemon(launch: DaemonLaunchSpec): Promise daemonIdentityError(launch, probe), }); + if (!status) return null; + const owner = runFileContents ? (parseRunFile(runFileContents)?.owner ?? undefined) : undefined; + return { status, owner }; } async function refreshDaemonStatus(): Promise { @@ -403,7 +445,7 @@ async function refreshDaemonStatus(): Promise { if (!launch) return daemonStatus; const existing = await inspectExistingDaemon(launch); if (existing) { - setDaemonStatus(existing); + setDaemonStatus(existing.status); } else if ( daemonStatus.state === "ready" || (daemonStatus.state === "error" && (daemonStatus.pid || daemonStatus.port)) @@ -460,7 +502,13 @@ async function startDaemonInner(startEpoch: number): Promise { return daemonStatus; } if (existing) { - setDaemonStatus(existing); + setDaemonStatus(existing.status); + // Re-link the supervisor only when attaching to an app-owned daemon (one we + // previously spawned). Headless `ao start` daemons (owner unset) stay unlinked + // so they remain persistent after app quit. + if (shouldLinkOnAttach(existing.owner)) { + establishSupervisorLink(); + } return daemonStatus; } @@ -482,6 +530,25 @@ async function startDaemonInner(startEpoch: number): Promise { } if (directDaemon) { setDaemonStatus(directDaemon); + // Re-link iff the daemon is app-owned. 
Read the run-file for the owner tag; + // if unavailable (run-file absent or unreadable), treat as headless and skip. + // ponytail: narrow TOCTOU here (the port was probed live, then the run-file + // is read separately), so in theory a headless daemon could have replaced an + // app-owned one in the gap. Acceptable: the window is tiny, the worst case is + // linking a headless daemon, and establishSupervisorLink disposes any prior + // link so nothing leaks. + const rfp = runFilePath(); + let portAttachOwner: string | undefined; + if (rfp) { + try { + portAttachOwner = parseRunFile(await readFile(rfp, "utf8"))?.owner ?? undefined; + } catch { + // run-file absent or unreadable: treat as headless, skip link. + } + } + if (shouldLinkOnAttach(portAttachOwner)) { + establishSupervisorLink(); + } return daemonStatus; } @@ -593,6 +660,15 @@ async function startDaemonInner(startEpoch: number): Promise { portConfirmed = true; stopDiscovery(); setDaemonStatus({ state: "ready", port }); + + // Establish the OS-native liveness link unconditionally: this callback fires + // only on the spawn path (we own this daemon). Holding the connection keeps + // the daemon alive; when Electron exits for any reason, the OS closes the fd + // and the daemon detects EOF, then self-stops after its ~5s grace period. + // The attach paths link only when the daemon is app-owned (see + // establishSupervisorLink + shouldLinkOnAttach); headless `ao start` daemons + // stay unlinked so they remain persistent across app quit. + establishSupervisorLink(); }; // One scanner per stream: each keeps its own partial-line buffer. @@ -683,6 +759,11 @@ function stopDaemon(): DaemonStatus { } daemonStoppingProcess = daemonProcess; + // Drop the liveness link: an explicit stop is not a frontend death, so stop + // holding the socket open (and stop the reconnect loop retrying a dead daemon). + // A later daemon:start re-establishes the link via reportBoundPort. 
+ supervisorLink?.dispose(); + supervisorLink = null; killDaemon(daemonProcess); setDaemonStatus({ state: "stopped" }); return daemonStatus; @@ -751,77 +832,24 @@ app.whenReady().then(() => { }); }); -// Re-entrancy guard: the first before-quit fires, prevents default, does async -// work, then calls app.exit(). If app.quit() is called concurrently (e.g. from -// window-all-closed on non-darwin), the second before-quit fires while the first -// is still in flight. Without a guard it would preventDefault again and loop. -// With the guard set to true, the second invocation falls through and lets the -// quit proceed. app.exit() itself does NOT re-fire before-quit, so the guard -// mainly protects against a concurrent app.quit() race. -let quitting = false; - -app.on("before-quit", (event) => { +// Daemon teardown is now handled via the OS-native supervisor socket: the daemon +// self-stops ~5s after the last client (this process) drops its connection. +// The supervisorLink fd is NOT explicitly closed on quit; the OS closes it when +// the process exits for any reason (Cmd+Q, crash, SIGKILL). Sessions survive. +app.on("before-quit", () => { browserViewHost?.dispose(); browserViewHost = null; - - // Re-entrancy: if we already started the async quit sequence, let this - // invocation fall through so the app actually exits. - if (quitting) return; - quitting = true; - - // Capture the current daemon handle and port before any async gap so that - // a race with stopDaemon() cannot null them out underneath us. - const child = daemonProcess; - const port = daemonStatus.state === "ready" ? daemonStatus.port : undefined; - - if (!child) { - // No daemon we own: nothing to shut down. - return; - } - - // Prevent the synchronous quit so we can ask the daemon to save gracefully - // before killing it. - event.preventDefault(); - - const doQuit = async () => { - // Best-effort graceful shutdown: POST /shutdown so the daemon flushes - // its session state before exiting. 
An ~8s timeout prevents a hung or - // absent daemon from blocking quit indefinitely. - // Note: the daemon's internal save bound is 30s (shutdownSaveTimeout), so - // if this fetch times out and we proceed to killDaemon (SIGTERM), the first - // SIGTERM only cancels the daemon's listen context; the daemon's in-flight - // save (on a fresh context) still runs to completion or its own 30s bound. - if (port !== undefined) { - try { - await fetch(`http://127.0.0.1:${port}/shutdown`, { - method: "POST", - signal: AbortSignal.timeout(8_000), - }); - } catch { - // Timeout, network error, or daemon already gone: proceed to kill. - console.log(`AO: /shutdown fetch failed (port ${port}); proceeding with SIGTERM.`); - } - } - - // Kill the daemon process group (reaches the daemon behind any shell - // wrapper and its PTY children). - killDaemon(child); - - // Exit without re-firing before-quit (app.exit bypasses the event). - app.exit(0); - }; - - void doQuit(); }); -// Last-resort teardown. before-quit covers the normal quit path, but app.exit() -// and some shutdown routes skip it, which would orphan the detached daemon and -// leave it holding the port for the next launch. The Node 'exit' event fires -// synchronously on those paths too, so the daemon's process group is always -// signalled when the supervisor goes away. (A hard SIGKILL/crash still can't run -// JS; the daemon's port-conflict fallback covers the orphan that leaves behind.) +// Last resort: if the OS-native supervisor link is not actually connected +// (daemon socket never bound, e.g. UDS path-length limit, or addr was null), +// the dropped fd will NOT stop the daemon on quit, so kill it here to avoid an +// orphan. Safe because Phase A made the daemon's SIGTERM non-destructive: it +// exits without tearing down sessions, which survive for the next boot to adopt. +// When the link IS connected we do nothing here and rely on the OS closing the +// fd on exit, which covers crash and SIGKILL uniformly. 
process.on("exit", () => { - if (daemonProcess) { + if (daemonProcess && !supervisorLink?.connected) { killDaemon(daemonProcess); } }); diff --git a/frontend/src/main/daemon-owner.test.ts b/frontend/src/main/daemon-owner.test.ts new file mode 100644 index 0000000000..806dc4ba05 --- /dev/null +++ b/frontend/src/main/daemon-owner.test.ts @@ -0,0 +1,21 @@ +// @vitest-environment node +import { describe, it, expect } from "vitest"; +import { shouldLinkOnAttach } from "./daemon-owner"; + +describe("shouldLinkOnAttach", () => { + it('returns true when owner is "app"', () => { + expect(shouldLinkOnAttach("app")).toBe(true); + }); + + it("returns false when owner is undefined (headless ao start)", () => { + expect(shouldLinkOnAttach(undefined)).toBe(false); + }); + + it('returns false when owner is "" (empty string)', () => { + expect(shouldLinkOnAttach("")).toBe(false); + }); + + it('returns false when owner is "cli"', () => { + expect(shouldLinkOnAttach("cli")).toBe(false); + }); +}); diff --git a/frontend/src/main/daemon-owner.ts b/frontend/src/main/daemon-owner.ts new file mode 100644 index 0000000000..771060165e --- /dev/null +++ b/frontend/src/main/daemon-owner.ts @@ -0,0 +1,9 @@ +/** + * Whether the app should hold a supervisor link to a daemon it ATTACHED to + * (did not spawn). Only re-link app-owned daemons (owner === "app"); leave + * headless `ao start` daemons (owner unset or empty) unlinked so they stay + * persistent across app quit. 
+ */ +export function shouldLinkOnAttach(owner: string | undefined): boolean { + return owner === "app"; +} diff --git a/frontend/src/main/supervisor-link.test.ts b/frontend/src/main/supervisor-link.test.ts new file mode 100644 index 0000000000..901044a6f4 --- /dev/null +++ b/frontend/src/main/supervisor-link.test.ts @@ -0,0 +1,201 @@ +// @vitest-environment node +import net from "node:net"; +import os from "node:os"; +import path from "node:path"; +import { describe, it, expect, afterEach } from "vitest"; +import { connectSupervisor, type SupervisorLinkHandle } from "./supervisor-link"; + +// Bounded wait: resolves when the promise resolves, rejects after timeoutMs. +function withTimeout(promise: Promise, timeoutMs: number, label: string): Promise { + return new Promise((resolve, reject) => { + const timer = setTimeout(() => reject(new Error(`Timeout: ${label}`)), timeoutMs); + promise.then( + (v) => { + clearTimeout(timer); + resolve(v); + }, + (e) => { + clearTimeout(timer); + reject(e); + }, + ); + }); +} + +function tmpSocketPath(): string { + return path.join(os.tmpdir(), `ao-svlink-test-${process.pid}-${Date.now()}.sock`); +} + +// Promisify: resolves the next time server.on("connection") fires. +function nextConnection(server: net.Server): Promise { + return new Promise((resolve) => { + server.once("connection", resolve); + }); +} + +describe("supervisor-link", () => { + const handles: SupervisorLinkHandle[] = []; + const servers: net.Server[] = []; + + afterEach(async () => { + for (const h of handles.splice(0)) h.dispose(); + await Promise.all( + servers.splice(0).map( + (s) => + new Promise((resolve) => { + s.close(() => resolve()); + }), + ), + ); + }); + + it("retries until connected: connects after server is started later", async () => { + const addr = tmpSocketPath(); + + // Start the link BEFORE the server exists. 
+ const link = connectSupervisor(addr, { log: () => undefined }); + handles.push(link); + + // Wait a bit so a few retry attempts have fired against a missing socket. + await new Promise((r) => setTimeout(r, 400)); + + // Now start the server. + const server = net.createServer(); + servers.push(server); + const connectionPromise = nextConnection(server); + await new Promise((resolve, reject) => { + server.listen(addr, () => resolve()); + server.once("error", reject); + }); + + // The link should reconnect and the server should receive a connection. + const conn = await withTimeout( + connectionPromise, + 5_000, + "retry-until-connected: server did not receive connection", + ); + expect(conn).toBeTruthy(); + conn.destroy(); + }); + + it("reconnects on drop: re-establishes after the accepted socket is closed", async () => { + const addr = tmpSocketPath(); + + // Start server first. + const server = net.createServer(); + servers.push(server); + + let connectionCount = 0; + const secondConnection = new Promise((resolve) => { + let first = true; + server.on("connection", (sock) => { + connectionCount++; + if (first) { + first = false; + // Close the first accepted socket to simulate a drop. + setTimeout(() => sock.destroy(), 50); + } else { + resolve(sock); + } + }); + }); + + await new Promise((resolve, reject) => { + server.listen(addr, () => resolve()); + server.once("error", reject); + }); + + // Connect after server is up. + const link = connectSupervisor(addr, { log: () => undefined }); + handles.push(link); + + // Wait for both the initial connection and the reconnect. 
+ const reconn = await withTimeout(secondConnection, 6_000, "reconnect-on-drop: second connection never arrived"); + expect(connectionCount).toBeGreaterThanOrEqual(2); + reconn.destroy(); + }); + + it("connected flag: true after connect, false after server closes connection", async () => { + const addr = tmpSocketPath(); + + const server = net.createServer(); + servers.push(server); + const connectionPromise = nextConnection(server); + await new Promise((resolve, reject) => { + server.listen(addr, () => resolve()); + server.once("error", reject); + }); + + const link = connectSupervisor(addr, { log: () => undefined }); + handles.push(link); + + // Wait for the server to receive the connection. + const conn = await withTimeout(connectionPromise, 3_000, "connected-flag: server did not receive connection"); + + // Poll until connected is true (the "connect" event fires asynchronously). + await withTimeout( + new Promise((resolve) => { + const check = () => { + if (link.connected) { + resolve(); + return; + } + setTimeout(check, 20); + }; + check(); + }), + 1_000, + "connected-flag: handle.connected never became true", + ); + expect(link.connected).toBe(true); + + // Server-side close of the accepted socket triggers the client "close" event. + conn.destroy(); + + // Poll until connected drops back to false. + await withTimeout( + new Promise((resolve) => { + const check = () => { + if (!link.connected) { + resolve(); + return; + } + setTimeout(check, 20); + }; + check(); + }), + 3_000, + "connected-flag: handle.connected never became false after server closed", + ); + expect(link.connected).toBe(false); + + link.dispose(); + }); + + it("dispose stops reconnect: no connection arrives after dispose", async () => { + const addr = tmpSocketPath(); + + // Start link against a missing socket (no server), then dispose quickly. + const link = connectSupervisor(addr, { log: () => undefined }); + + // Dispose before the server exists. 
+ link.dispose(); + + // Start a server and assert no connection arrives within a bounded window. + const server = net.createServer(); + servers.push(server); + let receivedConnection = false; + server.on("connection", () => { + receivedConnection = true; + }); + await new Promise((resolve, reject) => { + server.listen(addr, () => resolve()); + server.once("error", reject); + }); + + // Wait long enough for at least one retry cycle to have run if dispose failed. + await new Promise((r) => setTimeout(r, 600)); + + expect(receivedConnection).toBe(false); + }); +}); diff --git a/frontend/src/main/supervisor-link.ts b/frontend/src/main/supervisor-link.ts new file mode 100644 index 0000000000..b8f556b254 --- /dev/null +++ b/frontend/src/main/supervisor-link.ts @@ -0,0 +1,113 @@ +import net from "node:net"; + +// ponytail: no heartbeat. The open socket IS the liveness signal. When the +// Electron process dies the kernel closes the fd and the daemon detects EOF +// immediately (proven against the real daemon with a write-free held +// connection). A heartbeat adds nothing for a Unix domain socket or named +// pipe and is omitted deliberately. + +const BACKOFF_INIT_MS = 200; +const BACKOFF_MAX_MS = 2_000; + +export interface SupervisorLinkHandle { + readonly connected: boolean; + dispose(): void; +} + +/** + * Hold one client connection to the daemon's supervisor socket for the + * lifetime of the Electron process. When this process exits for any reason + * (Cmd+Q, crash, SIGKILL), the OS closes the fd. The daemon detects EOF and + * self-stops after its ~5s grace period, leaving tmux/ConPTY sessions alive + * for the next boot to adopt. + * + * Retry semantics: if the daemon has not created the socket yet (or restarts), + * we reconnect with bounded exponential backoff so the link re-establishes + * automatically. dispose() cancels any pending retry and destroys the socket. 
+ */ +export function connectSupervisor(addr: string, opts?: { log?: (msg: string) => void }): SupervisorLinkHandle { + const log = opts?.log ?? (() => undefined); + + let disposed = false; + let connected = false; + let socket: net.Socket | null = null; + let retryTimer: ReturnType | null = null; + let backoff = BACKOFF_INIT_MS; + + function clearRetry() { + if (retryTimer !== null) { + clearTimeout(retryTimer); + retryTimer = null; + } + } + + function destroySocket() { + if (socket !== null) { + socket.removeAllListeners(); + socket.destroy(); + socket = null; + } + } + + function scheduleReconnect() { + if (disposed) return; + clearRetry(); + const delay = backoff; + backoff = Math.min(backoff * 2, BACKOFF_MAX_MS); + log(`supervisor-link: reconnecting in ${delay}ms`); + retryTimer = setTimeout(() => { + retryTimer = null; + if (!disposed) connect(); + }, delay); + } + + function connect() { + if (disposed) return; + + destroySocket(); + + const s = net.connect(addr); + socket = s; + + s.on("connect", () => { + if (disposed) { + s.destroy(); + return; + } + connected = true; + log("supervisor-link: connected"); + // Reset backoff on successful connection. + backoff = BACKOFF_INIT_MS; + }); + + // Drain inbound data: the daemon never sends payload; discard so the + // socket buffer never stalls. ponytail: no payload to process. + s.on("data", () => undefined); + + s.on("error", (err) => { + log(`supervisor-link: error: ${err.message}`); + // close fires after error, which schedules the reconnect. 
+ }); + + s.on("close", () => { + connected = false; + if (disposed) return; + log("supervisor-link: connection closed, will retry"); + scheduleReconnect(); + }); + } + + connect(); + + return { + get connected() { + return connected; + }, + dispose() { + disposed = true; + connected = false; + clearRetry(); + destroySocket(); + }, + }; +} diff --git a/frontend/src/renderer/test/setup.ts b/frontend/src/renderer/test/setup.ts index b228fa15ef..795db7b4cd 100644 --- a/frontend/src/renderer/test/setup.ts +++ b/frontend/src/renderer/test/setup.ts @@ -1,136 +1,141 @@ import "@testing-library/jest-dom/vitest"; -class ResizeObserverStub { - observe() {} - unobserve() {} - disconnect() {} -} +// Guard: src/main/** tests run in the Node.js environment (no DOM). vitest still +// routes setupFiles here, so only install the DOM stubs when a DOM exists. +// ponytail: single guard; node env has no DOM to stub. +if (typeof window !== "undefined") { + class ResizeObserverStub { + observe() {} + unobserve() {} + disconnect() {} + } -Object.defineProperty(window, "ResizeObserver", { - configurable: true, - writable: true, - value: ResizeObserverStub, -}); + Object.defineProperty(window, "ResizeObserver", { + configurable: true, + writable: true, + value: ResizeObserverStub, + }); -Object.defineProperty(window, "matchMedia", { - configurable: true, - writable: true, - value: (query: string) => ({ - matches: false, - media: query, - onchange: null, - addEventListener: () => undefined, - removeEventListener: () => undefined, - addListener: () => undefined, - removeListener: () => undefined, - dispatchEvent: () => false, - }), -}); + Object.defineProperty(window, "matchMedia", { + configurable: true, + writable: true, + value: (query: string) => ({ + matches: false, + media: query, + onchange: null, + addEventListener: () => undefined, + removeEventListener: () => undefined, + addListener: () => undefined, + removeListener: () => undefined, + dispatchEvent: () => false, + }), + }); 
-const localStorageStub = (() => { - const values = new Map(); - return { - clear: () => values.clear(), - getItem: (key: string) => values.get(key) ?? null, - removeItem: (key: string) => values.delete(key), - setItem: (key: string, value: string) => values.set(key, value), - }; -})(); + const localStorageStub = (() => { + const values = new Map(); + return { + clear: () => values.clear(), + getItem: (key: string) => values.get(key) ?? null, + removeItem: (key: string) => values.delete(key), + setItem: (key: string, value: string) => values.set(key, value), + }; + })(); -Object.defineProperty(window, "localStorage", { - configurable: true, - writable: true, - value: localStorageStub, -}); + Object.defineProperty(window, "localStorage", { + configurable: true, + writable: true, + value: localStorageStub, + }); -HTMLCanvasElement.prototype.getContext = (() => ({})) as unknown as typeof HTMLCanvasElement.prototype.getContext; + HTMLCanvasElement.prototype.getContext = (() => ({})) as unknown as typeof HTMLCanvasElement.prototype.getContext; -Element.prototype.hasPointerCapture = (() => false) as typeof Element.prototype.hasPointerCapture; -Element.prototype.setPointerCapture = (() => undefined) as typeof Element.prototype.setPointerCapture; -Element.prototype.releasePointerCapture = (() => undefined) as typeof Element.prototype.releasePointerCapture; -Element.prototype.scrollIntoView = (() => undefined) as typeof Element.prototype.scrollIntoView; + Element.prototype.hasPointerCapture = (() => false) as typeof Element.prototype.hasPointerCapture; + Element.prototype.setPointerCapture = (() => undefined) as typeof Element.prototype.setPointerCapture; + Element.prototype.releasePointerCapture = (() => undefined) as typeof Element.prototype.releasePointerCapture; + Element.prototype.scrollIntoView = (() => undefined) as typeof Element.prototype.scrollIntoView; -window.ao = { - app: { - getVersion: async () => "0.0.0-test", - chooseDirectory: async () => null, - }, - 
clipboard: { - writeText: async () => undefined, - readText: async () => "", - }, - daemon: { - getStatus: async () => ({ state: "stopped" }), - start: async () => ({ state: "starting" }), - stop: async () => ({ state: "stopped" }), - onStatus: () => () => undefined, - }, - telemetry: { - getBootstrap: async () => null, - }, - browser: { - ensure: async (sessionId: string) => ({ - viewId: `test:${sessionId}`, - url: "", - title: "", - canGoBack: false, - canGoForward: false, - isLoading: false, - }), - setBounds: () => undefined, - navigate: async ({ viewId }: { viewId: string }) => ({ - viewId, - url: "", - title: "", - canGoBack: false, - canGoForward: false, - isLoading: false, - }), - clear: async (viewId: string) => ({ - viewId, - url: "", - title: "", - canGoBack: false, - canGoForward: false, - isLoading: false, - }), - goBack: async (viewId: string) => ({ - viewId, - url: "", - title: "", - canGoBack: false, - canGoForward: false, - isLoading: false, - }), - goForward: async (viewId: string) => ({ - viewId, - url: "", - title: "", - canGoBack: false, - canGoForward: false, - isLoading: false, - }), - reload: async (viewId: string) => ({ - viewId, - url: "", - title: "", - canGoBack: false, - canGoForward: false, - isLoading: false, - }), - stop: async (viewId: string) => ({ - viewId, - url: "", - title: "", - canGoBack: false, - canGoForward: false, - isLoading: false, - }), - destroy: () => undefined, - onNavState: () => () => undefined, - }, - notifications: { - show: async () => undefined, - onClick: () => () => undefined, - }, -}; + window.ao = { + app: { + getVersion: async () => "0.0.0-test", + chooseDirectory: async () => null, + }, + clipboard: { + writeText: async () => undefined, + readText: async () => "", + }, + daemon: { + getStatus: async () => ({ state: "stopped" }), + start: async () => ({ state: "starting" }), + stop: async () => ({ state: "stopped" }), + onStatus: () => () => undefined, + }, + telemetry: { + getBootstrap: async () => null, 
+ }, + browser: { + ensure: async (sessionId: string) => ({ + viewId: `test:${sessionId}`, + url: "", + title: "", + canGoBack: false, + canGoForward: false, + isLoading: false, + }), + setBounds: () => undefined, + navigate: async ({ viewId }: { viewId: string }) => ({ + viewId, + url: "", + title: "", + canGoBack: false, + canGoForward: false, + isLoading: false, + }), + clear: async (viewId: string) => ({ + viewId, + url: "", + title: "", + canGoBack: false, + canGoForward: false, + isLoading: false, + }), + goBack: async (viewId: string) => ({ + viewId, + url: "", + title: "", + canGoBack: false, + canGoForward: false, + isLoading: false, + }), + goForward: async (viewId: string) => ({ + viewId, + url: "", + title: "", + canGoBack: false, + canGoForward: false, + isLoading: false, + }), + reload: async (viewId: string) => ({ + viewId, + url: "", + title: "", + canGoBack: false, + canGoForward: false, + isLoading: false, + }), + stop: async (viewId: string) => ({ + viewId, + url: "", + title: "", + canGoBack: false, + canGoForward: false, + isLoading: false, + }), + destroy: () => undefined, + onNavState: () => () => undefined, + }, + notifications: { + show: async () => undefined, + onClick: () => () => undefined, + }, + }; +} // end if (typeof window !== "undefined") diff --git a/frontend/src/shared/daemon-discovery.ts b/frontend/src/shared/daemon-discovery.ts index 16f407cb68..9e9f608429 100644 --- a/frontend/src/shared/daemon-discovery.ts +++ b/frontend/src/shared/daemon-discovery.ts @@ -66,6 +66,11 @@ export type RunFileInfo = { port: number; /** startedAt in epoch ms; 0 when missing/unparseable. */ startedAtMs: number; + /** + * Daemon ownership tag. "app" when the desktop app spawned this daemon; + * undefined/empty for a headless `ao start` daemon. + */ + owner?: string; }; /** Parse running.json contents. Returns null for malformed JSON or an invalid port. 
*/ @@ -77,13 +82,19 @@ export function parseRunFile(contents: string): RunFileInfo | null { return null; } if (typeof raw !== "object" || raw === null) return null; - const { pid, port, startedAt } = raw as { pid?: unknown; port?: unknown; startedAt?: unknown }; + const { pid, port, startedAt, owner } = raw as { + pid?: unknown; + port?: unknown; + startedAt?: unknown; + owner?: unknown; + }; if (typeof port !== "number" || !Number.isInteger(port) || port < 1 || port > 65535) return null; const startedAtMs = typeof startedAt === "string" ? Date.parse(startedAt) : NaN; return { pid: typeof pid === "number" && Number.isInteger(pid) ? pid : 0, port, startedAtMs: Number.isNaN(startedAtMs) ? 0 : startedAtMs, + owner: typeof owner === "string" ? owner : undefined, }; }