Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
7186c3d
fix(daemon): do not tear down live sessions on shutdown; adopt them o…
harshitsinghbhandari Jun 25, 2026
6fb48f6
fix(daemon): make shutdown-teardown regression test falsifiable
harshitsinghbhandari Jun 25, 2026
3a52589
refactor(daemon): guard shutdown-teardown at compile time via narrowe…
harshitsinghbhandari Jun 25, 2026
000fc69
fix(session): make ensure-orchestrator idempotent so POST cannot mint…
harshitsinghbhandari Jun 25, 2026
5602027
test(session): drop redundant NoCleanSkipsKills, covered by SpawnsWhe…
harshitsinghbhandari Jun 25, 2026
5a61b75
feat(daemon): supervisor watchdog
harshitsinghbhandari Jun 25, 2026
63ce6af
fix(daemon): supervisor watchdog review fixes (data race, ErrClosed, …
harshitsinghbhandari Jun 25, 2026
dbdd9bd
feat(daemon): OS-native supervisor listener triggers clean shutdown
harshitsinghbhandari Jun 25, 2026
02977e0
fix(daemon): log supervisor Serve error instead of discarding it
harshitsinghbhandari Jun 25, 2026
76a995e
feat(desktop): supervisor link; daemon self-stops (clean) on frontend…
harshitsinghbhandari Jun 25, 2026
e9eb791
fix(desktop): guard against daemon orphan when supervisor link is dow…
harshitsinghbhandari Jun 25, 2026
aabaad7
fix(core): restore promptless sessions in place (reboot recovery, no …
harshitsinghbhandari Jun 25, 2026
3c11afd
test(integration): dead-live session is restored, not abandoned, afte…
harshitsinghbhandari Jun 25, 2026
72e9d1d
style(session): replace em dashes in service.go messages and comment
harshitsinghbhandari Jun 25, 2026
6ed9bc5
docs(desktop): scope supervisor link to spawn path; dispose link on e…
harshitsinghbhandari Jun 25, 2026
cd9f21c
chore: format with prettier [skip ci]
github-actions[bot] Jun 25, 2026
fa2e4c1
docs: add daemon-lifecycle adopt-on-shutdown implementation plan
harshitsinghbhandari Jun 25, 2026
c5fdfdc
chore: format with prettier [skip ci]
github-actions[bot] Jun 25, 2026
9013402
fix(daemon): keep supervisor watchdog alive across transient Accept e…
harshitsinghbhandari Jun 25, 2026
2931a19
fix(core): leave promptless workers terminated on restore (orchestrat…
harshitsinghbhandari Jun 25, 2026
cc0309b
fix(desktop): re-link supervisor on attach for app-owned daemons (clo…
harshitsinghbhandari Jun 25, 2026
f09bea2
fix: quiet expected ErrNotResumable log in RestoreAll; note attach TO…
harshitsinghbhandari Jun 25, 2026
5cc9b74
style: gofmt the review-fix files (CI format/lint check)
harshitsinghbhandari Jun 25, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/aoagents/agent-orchestrator/backend
go 1.25.7

require (
github.com/Microsoft/go-winio v0.6.2
github.com/aymanbagabas/go-pty v0.2.3
github.com/coder/websocket v1.8.14
github.com/creack/pty v1.1.24
Expand Down
2 changes: 2 additions & 0 deletions backend/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/aymanbagabas/go-pty v0.2.3 h1:hsqcTIUV8I4iTSh3HQl61CR2wh0YPS6gHOYLhAfWu/E=
github.com/aymanbagabas/go-pty v0.2.3/go.mod h1:GLkgQovzqN5A1xMB79yHWiG1rhcquZCjkwKQGKFPdPg=
github.com/bool64/dev v0.2.43 h1:yQ7qiZVef6WtCl2vDYU0Y+qSq+0aBrQzY8KXkklk9cQ=
Expand Down
40 changes: 23 additions & 17 deletions backend/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/aoagents/agent-orchestrator/backend/internal/adapters/runtime/runtimeselect"
"github.com/aoagents/agent-orchestrator/backend/internal/config"
"github.com/aoagents/agent-orchestrator/backend/internal/daemon/supervisor"
"github.com/aoagents/agent-orchestrator/backend/internal/httpd"
"github.com/aoagents/agent-orchestrator/backend/internal/notify"
"github.com/aoagents/agent-orchestrator/backend/internal/ports"
Expand Down Expand Up @@ -149,21 +150,30 @@ func Run() error {
log.Error("reconcile sessions on boot failed", "err", reconcileErr)
}

// ponytail: 5s tolerates a brief frontend restart; tune if dev hot-reload trips it.
const supervisorGrace = 5 * time.Second

if ln, addr, err := supervisor.Listen(cfg.RunFilePath); err != nil {
// Non-fatal: without the link the daemon still works (e.g. headless "ao start"),
// it just will not auto-stop when a frontend dies. Do not block startup on it.
log.Warn("supervisor: listener unavailable; frontend-death auto-stop disabled", "err", err)
} else {
log.Info("supervisor: listening", "addr", addr)
sup := supervisor.New(supervisorGrace, srv.RequestShutdown, log)
go func() {
if err := sup.Serve(ctx, ln); err != nil {
log.Warn("supervisor: serve stopped with error", "err", err)
}
}()
}

runErr := srv.Run(ctx)

// Save and tear down all live sessions before the store closes. Both SIGTERM
// and POST /shutdown funnel through srv.Run returning (SIGTERM cancels ctx,
// which srv.Run selects on; POST /shutdown closes the shutdownRequested channel,
// which srv.Run also selects on), so this single call site covers both paths.
//
// Use a fresh context with a bounded deadline: the ctx that caused srv.Run
// to return is already cancelled, so passing it would abort the save
// immediately and leave every session unsaved.
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), shutdownSaveTimeout)
defer shutdownCancel()
if saveErr := sessMgr.SaveAndTeardownAll(shutdownCtx); saveErr != nil {
log.Error("save sessions on shutdown failed", "err", saveErr)
}
// Both graceful shutdown paths (SIGTERM and POST /shutdown) funnel through
// srv.Run returning. We deliberately do NOT tear down sessions here: they
// survive the daemon exit and the next boot's Reconcile adopts them,
// preserving session IDs. The narrowed sessionLifecycle interface makes
// teardown-on-shutdown a compile error.

// Shut the background goroutines down in order: cancel the context FIRST so
// their loops exit, then wait for them to drain. Doing this explicitly (not
Expand All @@ -178,10 +188,6 @@ func Run() error {
return runErr
}

// shutdownSaveTimeout bounds the SaveAndTeardownAll call on shutdown so a
// pathological session cannot stall the process exit indefinitely.
const shutdownSaveTimeout = 30 * time.Second

// newLogger returns the daemon's slog logger. It writes to stderr so supervisors
// can capture it separately from any structured stdout protocol added later.
func newLogger() *slog.Logger {
Expand Down
9 changes: 6 additions & 3 deletions backend/internal/daemon/lifecycle_wiring.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,21 @@ func (l *lifecycleStack) Stop() {
// sessionLifecycle is the narrow surface of sessionmanager.Manager used for
// boot/shutdown wiring. A minimal interface keeps the daemon testable without
// depending on the concrete manager type.
//
// SaveAndTeardownAll is deliberately ABSENT from this interface so the daemon
// cannot tear down live sessions on shutdown. Sessions survive the daemon exit
// and Reconcile on the next boot adopts them, preserving session IDs. Re-adding
// the method here is a visible, reviewable interface change.
type sessionLifecycle interface {
Reconcile(ctx context.Context) error
RestoreAll(ctx context.Context) error
SaveAndTeardownAll(ctx context.Context) error
}

// startSession builds the controller-facing session service: a session manager
// over the selected runtime, a per-session gitworktree workspace, the shared
// store + LCM, the per-session agent resolver, and the agent messenger. The
// returned service is mounted at httpd APIDeps.Sessions. It also returns the
// manager so the caller can wire Reconcile/SaveAndTeardownAll into the
// boot/shutdown sequence.
// manager so the caller can wire Reconcile into the boot sequence.
func startSession(cfg config.Config, runtime runtimeselect.Runtime, store *sqlite.Store, lcm *lifecycle.Manager, messenger ports.AgentMessenger, telemetry ports.EventSink, log *slog.Logger) (*sessionsvc.Service, reviewsvc.Manager, sessionLifecycle, error) {
defaultAgent := cfg.Agent
if defaultAgent == "" {
Expand Down
25 changes: 25 additions & 0 deletions backend/internal/daemon/supervisor/listen_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
//go:build !windows

package supervisor

import (
"net"
"os"
"path/filepath"
)

// Listen creates a Unix domain socket listener alongside the run-file.
// The socket path is a sibling of runFilePath: <dir(runFilePath)>/supervise.sock.
// Any stale socket file is removed before binding (ignored if absent).
// The returned net.Listener unlinks the socket on Close (Go stdlib default for UnixListener).
// Returns (listener, socketPath, error).
func Listen(runFilePath string) (net.Listener, string, error) {
sockPath := filepath.Join(filepath.Dir(runFilePath), "supervise.sock")
// Remove stale socket; ignore not-exist error.
_ = os.Remove(sockPath)
ln, err := net.Listen("unix", sockPath)
if err != nil {
return nil, "", err
}
return ln, sockPath, nil
}
81 changes: 81 additions & 0 deletions backend/internal/daemon/supervisor/listen_unix_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
//go:build !windows

package supervisor

import (
"net"
"os"
"path/filepath"
"testing"
)

// TestListen_basic verifies that Listen returns a listener whose address is
// <dir(runFilePath)>/supervise.sock, that the socket file exists on disk, and
// that a Dial to that address succeeds.
func TestListen_basic(t *testing.T) {
t.Parallel()
dir := t.TempDir()
runFile := filepath.Join(dir, "running.json")

ln, addr, err := Listen(runFile)
if err != nil {
t.Fatalf("Listen: unexpected error: %v", err)
}
defer ln.Close()

wantSock := filepath.Join(dir, "supervise.sock")
if addr != wantSock {
t.Errorf("addr = %q, want %q", addr, wantSock)
}

// Socket file must exist after Listen.
if _, err := os.Stat(wantSock); err != nil {
t.Errorf("socket file missing after Listen: %v", err)
}

// Dialing the returned address must succeed.
conn, err := net.Dial("unix", addr)
if err != nil {
t.Fatalf("Dial(%q): %v", addr, err)
}
conn.Close()
}

// TestListen_staleSocket verifies that a pre-existing file at the socket path
// does not prevent Listen from succeeding (the stale file is removed first).
func TestListen_staleSocket(t *testing.T) {
t.Parallel()
dir := t.TempDir()
runFile := filepath.Join(dir, "running.json")
sockPath := filepath.Join(dir, "supervise.sock")

// Pre-create a regular file to simulate a stale socket.
if err := os.WriteFile(sockPath, []byte("stale"), 0o600); err != nil {
t.Fatalf("pre-create stale file: %v", err)
}

ln, _, err := Listen(runFile)
if err != nil {
t.Fatalf("Listen with stale socket: unexpected error: %v", err)
}
ln.Close()
}

// TestListen_unlinkOnClose verifies that closing the listener removes the
// socket file from the filesystem (Go stdlib default for UnixListener).
func TestListen_unlinkOnClose(t *testing.T) {
t.Parallel()
dir := t.TempDir()
runFile := filepath.Join(dir, "running.json")
sockPath := filepath.Join(dir, "supervise.sock")

ln, _, err := Listen(runFile)
if err != nil {
t.Fatalf("Listen: %v", err)
}
ln.Close()

if _, err := os.Stat(sockPath); !os.IsNotExist(err) {
t.Errorf("socket file still present after Close (err=%v); expected not-exist", err)
}
}
23 changes: 23 additions & 0 deletions backend/internal/daemon/supervisor/listen_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//go:build windows

package supervisor

import (
"net"

"github.com/Microsoft/go-winio"
)

const pipeName = `\\.\pipe\ao-supervise`

// Listen creates a Windows named pipe listener for the supervisor watchdog.
// runFilePath is ignored on Windows: named pipes are global and identified
// by name only.
// ponytail: global pipe name; add a per-instance suffix if multiple daemons must coexist on one machine.
func Listen(_ string) (net.Listener, string, error) {
ln, err := winio.ListenPipe(pipeName, nil)
if err != nil {
return nil, "", err
}
return ln, pipeName, nil
}
162 changes: 162 additions & 0 deletions backend/internal/daemon/supervisor/supervisor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Package supervisor provides a transport-agnostic watchdog that fires a
// callback when the last connected client disconnects and stays gone for a
// configurable grace period. It arms only after the FIRST client ever
// connects so a daemon started with no frontend (e.g. CLI "ao start") never
// self-stops.
//
// This package is a leaf: it imports only stdlib.
package supervisor

import (
"context"
"errors"
"log/slog"
"net"
"sync"
"time"
)

// acceptRetryBackoff bounds the retry after a transient Accept error so a
// persistent failure cannot hot-spin the accept loop.
const acceptRetryBackoff = 200 * time.Millisecond

// Supervisor watches connections on a net.Listener and calls onLastClientGone
// exactly once (per process lifetime) when the live-count drops to zero and
// stays zero for the grace period.
//
// Concurrency model:
// - mu guards liveCount, armed, and pendingTimer.
// - armed flips to true on the first accepted connection and never resets;
// it is the "headless-safety" gate that prevents a pre-connect fire.
// - pendingTimer holds the *time.Timer from time.AfterFunc so it can be
// stopped on reconnect. A non-nil pendingTimer means a grace countdown is
// running.
// - fireOnce ensures onLastClientGone is called at most once for the entire
// process lifetime, even if the timer fires concurrently with a reconnect.
type Supervisor struct {
grace time.Duration
onLastClientGone func()
log *slog.Logger

mu sync.Mutex
liveCount int
armed bool // true once any connection has been accepted
pendingTimer *time.Timer // non-nil while grace countdown is running

fireOnce sync.Once
}

// New creates a Supervisor. grace is the delay before the callback fires after
// the last connection closes. onLastClientGone is called at most once for the
// process lifetime, so it is safe to use it to trigger os.Exit or context
// cancellation.
func New(grace time.Duration, onLastClientGone func(), log *slog.Logger) *Supervisor {
return &Supervisor{
grace: grace,
onLastClientGone: onLastClientGone,
log: log,
}
}

// Serve runs the accept loop on ln until ctx is cancelled or ln is closed.
// It returns nil on a clean shutdown (context cancelled or listener closed
// normally); it only returns a non-nil error for unexpected Accept failures.
func (s *Supervisor) Serve(ctx context.Context, ln net.Listener) error {
// Derive a cancellable context so the watcher goroutine always unblocks
// when Serve returns, even if ctx itself is not cancelled (e.g. listener
// closed directly).
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Close the listener when ctx is cancelled so Accept() unblocks.
go func() {
<-ctx.Done()
_ = ln.Close()
}()

for {
conn, err := ln.Accept()
if err != nil {
// A closed listener or context cancellation is a clean stop.
select {
case <-ctx.Done():
return nil
default:
}
// net.ErrClosed is what real listeners return when closed normally.
if errors.Is(err, net.ErrClosed) {
return nil
}
// A transient Accept error (e.g. EMFILE) must NOT silently kill the
// watchdog: that would leave the daemon unable to self-stop on
// frontend death. Back off briefly and keep accepting. A genuinely
// closed listener returns net.ErrClosed (handled above) or trips
// ctx.Done during the backoff.
s.log.Warn("supervisor: accept error, retrying", "err", err)
select {
case <-ctx.Done():
return nil
case <-time.After(acceptRetryBackoff):
}
continue
}

s.mu.Lock()
s.armed = true
s.liveCount++
// If a grace timer was pending (reconnect before grace elapsed), cancel it.
if s.pendingTimer != nil {
s.pendingTimer.Stop()
s.pendingTimer = nil
}
live := s.liveCount
s.mu.Unlock()

s.log.Debug("supervisor: client connected", "liveCount", live)
go s.watchConn(conn)
}
}

// watchConn drains conn (reads into a scratch buffer) purely to detect close.
// When read returns io.EOF or any error, the connection is gone.
func (s *Supervisor) watchConn(conn net.Conn) {
// ponytail: 32-byte scratch buffer; we never process the payload.
scratch := make([]byte, 32)
for {
_, err := conn.Read(scratch)
if err != nil {
break
}
}
_ = conn.Close()

s.mu.Lock()
s.liveCount--
live := s.liveCount
armed := s.armed
s.mu.Unlock()

s.log.Debug("supervisor: client disconnected", "liveCount", live)

if armed && live == 0 {
s.armGrace()
}
}

// armGrace starts the grace countdown. If another client connects before it
// elapses, Serve() will Stop() the timer via pendingTimer.
func (s *Supervisor) armGrace() {
s.mu.Lock()
s.pendingTimer = time.AfterFunc(s.grace, func() {
s.mu.Lock()
live := s.liveCount
s.pendingTimer = nil
s.mu.Unlock()

if live == 0 {
s.log.Info("supervisor: last client gone; grace elapsed, firing callback")
s.fireOnce.Do(s.onLastClientGone)
}
})
s.mu.Unlock()
}
Loading
Loading