From 44e6397c8f032494b7d059fa920f6e2bb34a17a9 Mon Sep 17 00:00:00 2001 From: tumberger Date: Fri, 12 Jun 2026 14:57:20 +0200 Subject: [PATCH 01/21] feat(cowork): observe Claude Cowork tool calls via per-session settings injection Adds Cowork observation to the managed-observe daemon, reusing the existing Claude Code pipeline (classify -> store -> managedstream -> ledger). No backend ingest changes needed; Cowork rows are tagged agent:cowork. - internal/agent/cowork: registers the "cowork" agent, reusing the Claude hook decoder (Cowork runs the same bundled Claude Code CLI). - internal/coworkobserve: injector writes settings.json with a PreToolUse command hook into each per-session .claude dir (the host-mounted guest $HOME/.claude, which survives the per-boot VM rootfs rebuild); the hook spools each event to a host file; the collector replays events into the daemon's localruntime socket as agent "cowork". File-drop transport, no in-VM network. - Gated by KONTEXT_COWORK_OBSERVE; wired into RunDaemon; agent blank-imported. Verified end to end locally: real Cowork bash + Chrome MCP tool calls land in the hosted ledger tagged agent:cowork. Follow-ups (dev-grade today): FSEvents instead of 250ms poll; persist collector offsets (in-memory now); enforce mode (hook<->daemon decision round-trip); unit tests for injector/collector. 
Co-Authored-By: Claude Opus 4.8 --- cmd/kontext/main.go | 1 + internal/agent/cowork/cowork.go | 31 +++ internal/coworkobserve/coworkobserve.go | 249 ++++++++++++++++++++++++ internal/managedobserve/daemon.go | 10 + 4 files changed, 291 insertions(+) create mode 100644 internal/agent/cowork/cowork.go create mode 100644 internal/coworkobserve/coworkobserve.go diff --git a/cmd/kontext/main.go b/cmd/kontext/main.go index 71f38a7..16d7427 100644 --- a/cmd/kontext/main.go +++ b/cmd/kontext/main.go @@ -27,6 +27,7 @@ import ( "github.com/kontext-security/kontext-cli/internal/update" _ "github.com/kontext-security/kontext-cli/internal/agent/claude" + _ "github.com/kontext-security/kontext-cli/internal/agent/cowork" ) var version = "dev" diff --git a/internal/agent/cowork/cowork.go b/internal/agent/cowork/cowork.go new file mode 100644 index 0000000..3b376df --- /dev/null +++ b/internal/agent/cowork/cowork.go @@ -0,0 +1,31 @@ +// Package cowork registers the Cowork agent adapter. Claude Cowork runs the same +// bundled Claude Code CLI inside a per-session VM, so its hook payloads use the +// identical Claude Code hook-input format. The adapter therefore reuses the +// Claude decoder/encoder and only differs in its name, which is recorded as the +// session's agent ("cowork") to distinguish Cowork activity from Claude Code in +// the ledger and dashboard. 
+package cowork + +import ( + "github.com/kontext-security/kontext-cli/internal/agent" + "github.com/kontext-security/kontext-cli/internal/hook" + "github.com/kontext-security/kontext-cli/internal/hookruntime" +) + +func init() { + agent.Register(&Cowork{}) +} + +type Cowork struct{} + +func (c *Cowork) Name() string { return "cowork" } + +func (c *Cowork) Aliases() []string { return []string{"claude-cowork"} } + +func (c *Cowork) DecodeHookInput(input []byte) (hook.Event, error) { + return hookruntime.DecodeClaudeEvent(input, c.Name()) +} + +func (c *Cowork) EncodeHookResult(event hook.Event, result hook.Result) ([]byte, error) { + return hookruntime.EncodeClaudeResult(event.HookName.String(), result) +} diff --git a/internal/coworkobserve/coworkobserve.go b/internal/coworkobserve/coworkobserve.go new file mode 100644 index 0000000..6742f51 --- /dev/null +++ b/internal/coworkobserve/coworkobserve.go @@ -0,0 +1,249 @@ +// Package coworkobserve adds Claude Cowork observation to the managed-observe +// daemon, reusing the existing Claude Code pipeline. +// +// Cowork runs the bundled Claude Code CLI inside a per-session VM whose root +// filesystem is rebuilt on every boot, so hooks cannot be baked into the image. +// What does survive is the per-session CLAUDE config dir, which Cowork mounts +// from the host at: +// +// ~/Library/Application Support/Claude/local-agent-mode-sessions/<id>/<id>/local_<id>/.claude/ +// +// This is the guest's $HOME/.claude — the "user" settings tier, which Cowork's +// CLI loads at startup. Two host-side loops run inside the daemon: +// +// - injector: writes settings.json with a PreToolUse command hook into each +// new per-session .claude dir before the in-VM CLI initializes. The hook +// appends every tool event to a host-mounted spool file in the session dir +// (no in-VM network needed — the dir is a host mount). 
+// - collector: tails those spool files and replays each event into the +// daemon's existing localruntime socket as agent "cowork", reusing the same +// classify -> store -> managedstream -> ledger path Claude Code uses. +package coworkobserve + +import ( + "bytes" + "context" + "encoding/json" + "net" + "os" + "path/filepath" + "strings" + "time" + + "github.com/kontext-security/kontext-cli/internal/diagnostic" + "github.com/kontext-security/kontext-cli/internal/hook" + "github.com/kontext-security/kontext-cli/internal/localruntime" +) + +const ( + // EnvEnabled enables Cowork observation when set to a truthy value. + EnvEnabled = "KONTEXT_COWORK_OBSERVE" + // EnvSessionsRoot overrides the Cowork sessions root for testing. + EnvSessionsRoot = "KONTEXT_COWORK_SESSIONS_ROOT" + + spoolName = "kontext-cowork-events.jsonl" + settingsMark = spoolName // settings.json containing this string is ours + agentName = "cowork" +) + +// Enabled reports whether Cowork observation is turned on via the environment. +func Enabled() bool { + switch strings.ToLower(strings.TrimSpace(os.Getenv(EnvEnabled))) { + case "1", "true", "yes", "on": + return true + default: + return false + } +} + +// Options configures the Cowork observer loops. +type Options struct { + // SocketPath is the daemon's localruntime socket (where events are replayed). + SocketPath string + // SessionsRoot is the Cowork sessions root; defaults to the standard path. + SessionsRoot string + // PollInterval controls how often the loops scan; defaults to 250ms. + PollInterval time.Duration + Diagnostic diagnostic.Logger +} + +// DefaultSessionsRoot returns the standard Cowork sessions root on macOS. 
+func DefaultSessionsRoot() string { + if override := strings.TrimSpace(os.Getenv(EnvSessionsRoot)); override != "" { + return override + } + home, err := os.UserHomeDir() + if err != nil { + return "" + } + return filepath.Join(home, "Library", "Application Support", "Claude", "local-agent-mode-sessions") +} + +// The command hook reads the full Claude Code hook event from stdin and appends +// it to a spool file one directory above cwd (the session dir, host-mounted), +// then exits 0 so the tool is never blocked. +const hookCommand = `p=$(cat); printf '%s\n' "$p" >> ../` + spoolName + ` 2>/dev/null; true` + +func settingsJSON() []byte { + settings := map[string]any{ + "hooks": map[string]any{ + "PreToolUse": []any{ + map[string]any{ + "matcher": "*", + "hooks": []any{ + map[string]any{"type": "command", "command": hookCommand, "timeout": 12}, + }, + }, + }, + }, + } + data, _ := json.Marshal(settings) + return data +} + +// Run starts the injector and collector loops and blocks until ctx is cancelled. +func Run(ctx context.Context, opts Options) { + if opts.SessionsRoot == "" { + opts.SessionsRoot = DefaultSessionsRoot() + } + if opts.PollInterval <= 0 { + opts.PollInterval = 250 * time.Millisecond + } + if opts.SessionsRoot == "" { + opts.Diagnostic.Printf("cowork observe: no sessions root; disabled\n") + return + } + opts.Diagnostic.Printf("cowork observe: watching %s\n", opts.SessionsRoot) + + c := &collector{offsets: map[string]int64{}} + settings := settingsJSON() + ticker := time.NewTicker(opts.PollInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + inject(opts, settings) + c.collect(ctx, opts) + } + } +} + +// inject writes settings.json into any recent per-session .claude dir that does +// not yet carry our hook. 
+func inject(opts Options, settings []byte) { + claudeDirs, _ := filepath.Glob(filepath.Join(opts.SessionsRoot, "*", "*", "local_*", ".claude")) + cutoff := time.Now().Add(-3 * time.Minute) + for _, dir := range claudeDirs { + info, err := os.Stat(dir) + if err != nil || info.ModTime().Before(cutoff) { + continue + } + settingsPath := filepath.Join(dir, "settings.json") + if existing, err := os.ReadFile(settingsPath); err == nil && bytes.Contains(existing, []byte(settingsMark)) { + continue // already ours + } + if err := os.WriteFile(settingsPath, settings, 0o644); err != nil { + opts.Diagnostic.Printf("cowork observe: inject %s: %v\n", settingsPath, err) + continue + } + opts.Diagnostic.Printf("cowork observe: injected hook into %s\n", settingsPath) + } +} + +type collector struct { + offsets map[string]int64 +} + +type coworkEvent struct { + SessionID string `json:"session_id"` + SessionIDAlt string `json:"sessionId"` + HookEventName string `json:"hook_event_name"` + HookEventAlt string `json:"hookEventName"` + ToolName string `json:"tool_name"` + ToolNameAlt string `json:"toolName"` + ToolInput json.RawMessage `json:"tool_input"` + ToolInputAlt json.RawMessage `json:"toolInput"` + ToolUseID string `json:"tool_use_id"` + CWD string `json:"cwd"` +} + +func (c *collector) collect(ctx context.Context, opts Options) { + spools, _ := filepath.Glob(filepath.Join(opts.SessionsRoot, "*", "*", "local_*", spoolName)) + for _, spool := range spools { + c.drain(ctx, opts, spool) + } +} + +func (c *collector) drain(ctx context.Context, opts Options, spool string) { + data, err := os.ReadFile(spool) + if err != nil { + return + } + off := c.offsets[spool] + if int64(len(data)) <= off { + return + } + fresh := data[off:] + c.offsets[spool] = int64(len(data)) + for _, line := range bytes.Split(fresh, []byte("\n")) { + line = bytes.TrimSpace(line) + if len(line) == 0 { + continue + } + if err := c.replay(opts, line); err != nil { + opts.Diagnostic.Printf("cowork observe: replay: 
%v\n", err) + } + } +} + +func (c *collector) replay(opts Options, line []byte) error { + var ev coworkEvent + if err := json.Unmarshal(line, &ev); err != nil { + return err + } + hookEvent := firstNonEmpty(ev.HookEventName, ev.HookEventAlt) + if hook.HookName(hookEvent) != hook.HookPreToolUse { + return nil // injector only wires PreToolUse + } + sessionID := firstNonEmpty(ev.SessionID, ev.SessionIDAlt) + toolInput := ev.ToolInput + if len(toolInput) == 0 { + toolInput = ev.ToolInputAlt + } + req := localruntime.EvaluateRequest{ + Type: "evaluate", + SessionID: "cowork-" + sessionID, + Agent: agentName, + HookEvent: hook.HookPreToolUse.String(), + ToolName: firstNonEmpty(ev.ToolName, ev.ToolNameAlt), + ToolInput: toolInput, + ToolUseID: ev.ToolUseID, + CWD: ev.CWD, + } + return send(opts.SocketPath, req) +} + +func send(socketPath string, req localruntime.EvaluateRequest) error { + conn, err := net.DialTimeout("unix", socketPath, 5*time.Second) + if err != nil { + return err + } + defer conn.Close() + _ = conn.SetDeadline(time.Now().Add(10 * time.Second)) + if err := localruntime.WriteMessage(conn, req); err != nil { + return err + } + var res localruntime.EvaluateResult + return localruntime.ReadMessage(conn, &res) +} + +func firstNonEmpty(values ...string) string { + for _, v := range values { + if v != "" { + return v + } + } + return "" +} diff --git a/internal/managedobserve/daemon.go b/internal/managedobserve/daemon.go index 9bd5e66..49e93db 100644 --- a/internal/managedobserve/daemon.go +++ b/internal/managedobserve/daemon.go @@ -7,6 +7,7 @@ import ( "net/http" "time" + "github.com/kontext-security/kontext-cli/internal/coworkobserve" "github.com/kontext-security/kontext-cli/internal/diagnostic" guardhookruntime "github.com/kontext-security/kontext-cli/internal/guard/hookruntime" "github.com/kontext-security/kontext-cli/internal/guard/store/sqlite" @@ -91,6 +92,15 @@ func RunDaemon(ctx context.Context, opts DaemonOptions) error { }) }() + // Cowork 
observation runs alongside Claude Code in the same daemon, replaying + // in-VM Cowork tool events into the same localruntime socket as agent "cowork". + if coworkobserve.Enabled() { + go coworkobserve.Run(ctx, coworkobserve.Options{ + SocketPath: socketPath, + Diagnostic: opts.Diagnostic, + }) + } + idleTimeout := opts.IdleTimeout if idleTimeout == 0 { idleTimeout = DefaultIdleTimeout() From 4e8d6790fef7f86d96dc4bf4ee1cacd135494042 Mon Sep 17 00:00:00 2001 From: tumberger Date: Fri, 12 Jun 2026 15:53:45 +0200 Subject: [PATCH 02/21] fix(coworkobserve): only advance spool offset past complete, replayed lines MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The collector previously read the whole spool, advanced its offset to EOF, then parsed lines — so a trailing partial line (hook mid-append) was consumed and dropped forever, and a failed socket replay also lost its event because the offset had already moved. Drain now seeks to the saved offset, reads incrementally (no full-file re-read every tick), only advances past complete newline-terminated lines, halts and retries on transient socket errors (at-least-once delivery), skips permanently-malformed lines, and resets when the spool shrinks (file recreated). Covered by new collector tests against a fake daemon socket. 
Co-Authored-By: Claude Fable 5 --- internal/coworkobserve/coworkobserve.go | 66 +++++-- internal/coworkobserve/coworkobserve_test.go | 181 +++++++++++++++++++ 2 files changed, 232 insertions(+), 15 deletions(-) create mode 100644 internal/coworkobserve/coworkobserve_test.go diff --git a/internal/coworkobserve/coworkobserve.go b/internal/coworkobserve/coworkobserve.go index 6742f51..651b39b 100644 --- a/internal/coworkobserve/coworkobserve.go +++ b/internal/coworkobserve/coworkobserve.go @@ -24,6 +24,9 @@ import ( "bytes" "context" "encoding/json" + "errors" + "fmt" + "io" "net" "os" "path/filepath" @@ -125,7 +128,7 @@ func Run(ctx context.Context, opts Options) { return case <-ticker.C: inject(opts, settings) - c.collect(ctx, opts) + c.collect(opts) } } } @@ -169,39 +172,72 @@ type coworkEvent struct { CWD string `json:"cwd"` } -func (c *collector) collect(ctx context.Context, opts Options) { +func (c *collector) collect(opts Options) { spools, _ := filepath.Glob(filepath.Join(opts.SessionsRoot, "*", "*", "local_*", spoolName)) for _, spool := range spools { - c.drain(ctx, opts, spool) + c.drain(opts, spool) } } -func (c *collector) drain(ctx context.Context, opts Options, spool string) { - data, err := os.ReadFile(spool) +// errMalformed marks spool lines that can never replay successfully; drain +// drops them. Any other replay error is treated as transient (the daemon +// socket hiccuped), so drain stops before the failed line and retries it on +// the next tick. Retrying after a partial send can deliver an event twice; +// for an audit trail, at-least-once beats silently dropping it. +var errMalformed = errors.New("malformed spool line") + +// drain replays complete lines appended since the last tick. The in-VM hook +// may be mid-append while we read, so a trailing chunk without a newline is +// left in place — the offset only ever advances past complete, handled lines. 
+func (c *collector) drain(opts Options, spool string) { + f, err := os.Open(spool) + if err != nil { + return + } + defer f.Close() + info, err := f.Stat() if err != nil { return } off := c.offsets[spool] - if int64(len(data)) <= off { + if info.Size() < off { + off = 0 // spool was recreated; start over on the new file + } + if info.Size() == off { return } - fresh := data[off:] - c.offsets[spool] = int64(len(data)) - for _, line := range bytes.Split(fresh, []byte("\n")) { - line = bytes.TrimSpace(line) - if len(line) == 0 { - continue + if _, err := f.Seek(off, io.SeekStart); err != nil { + return + } + data, err := io.ReadAll(f) + if err != nil { + return + } + consumed := 0 + for { + idx := bytes.IndexByte(data[consumed:], '\n') + if idx < 0 { + break // partial line still being appended; retry next tick } - if err := c.replay(opts, line); err != nil { - opts.Diagnostic.Printf("cowork observe: replay: %v\n", err) + line := bytes.TrimSpace(data[consumed : consumed+idx]) + if len(line) > 0 { + if err := c.replay(opts, line); err != nil { + if !errors.Is(err, errMalformed) { + opts.Diagnostic.Printf("cowork observe: replay: %v\n", err) + break + } + opts.Diagnostic.Printf("cowork observe: %v\n", err) + } } + consumed += idx + 1 } + c.offsets[spool] = off + int64(consumed) } func (c *collector) replay(opts Options, line []byte) error { var ev coworkEvent if err := json.Unmarshal(line, &ev); err != nil { - return err + return fmt.Errorf("%w: %v", errMalformed, err) } hookEvent := firstNonEmpty(ev.HookEventName, ev.HookEventAlt) if hook.HookName(hookEvent) != hook.HookPreToolUse { diff --git a/internal/coworkobserve/coworkobserve_test.go b/internal/coworkobserve/coworkobserve_test.go new file mode 100644 index 0000000..a5cfe22 --- /dev/null +++ b/internal/coworkobserve/coworkobserve_test.go @@ -0,0 +1,181 @@ +package coworkobserve + +import ( + "io" + "net" + "os" + "path/filepath" + "sync" + "testing" + + 
"github.com/kontext-security/kontext-cli/internal/diagnostic" + "github.com/kontext-security/kontext-cli/internal/localruntime" +) + +// fakeDaemon accepts localruntime evaluate requests on a unix socket and +// records them, mimicking the managed-observe daemon. +type fakeDaemon struct { + listener net.Listener + mu sync.Mutex + requests []localruntime.EvaluateRequest +} + +func startFakeDaemon(t *testing.T) (*fakeDaemon, string) { + t.Helper() + dir, err := os.MkdirTemp("", "kx") + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = os.RemoveAll(dir) }) + socketPath := filepath.Join(dir, "s.sock") + listener, err := net.Listen("unix", socketPath) + if err != nil { + t.Fatalf("listen: %v", err) + } + d := &fakeDaemon{listener: listener} + t.Cleanup(func() { _ = listener.Close() }) + go func() { + for { + conn, err := listener.Accept() + if err != nil { + return + } + go func(conn net.Conn) { + defer conn.Close() + var req localruntime.EvaluateRequest + if err := localruntime.ReadMessage(conn, &req); err != nil { + return + } + d.mu.Lock() + d.requests = append(d.requests, req) + d.mu.Unlock() + _ = localruntime.WriteMessage(conn, localruntime.EvaluateResult{Type: "result", Allowed: true}) + }(conn) + } + }() + return d, socketPath +} + +func (d *fakeDaemon) toolNames() []string { + d.mu.Lock() + defer d.mu.Unlock() + names := make([]string, 0, len(d.requests)) + for _, req := range d.requests { + names = append(names, req.ToolName) + } + return names +} + +func testOptions(t *testing.T, socketPath string) Options { + t.Helper() + return Options{ + SocketPath: socketPath, + SessionsRoot: t.TempDir(), + Diagnostic: diagnostic.New(io.Discard, false), + } +} + +func writeSpool(t *testing.T, path, content string) { + t.Helper() + f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + t.Fatal(err) + } + defer f.Close() + if _, err := f.WriteString(content); err != nil { + t.Fatal(err) + } +} + +func eventLine(tool string) 
string { + return `{"session_id":"s1","hook_event_name":"PreToolUse","tool_name":"` + tool + `","tool_input":{"command":"x"},"tool_use_id":"tu-` + tool + `","cwd":"/w"}` +} + +func TestDrainLeavesPartialTrailingLine(t *testing.T) { + daemon, socketPath := startFakeDaemon(t) + opts := testOptions(t, socketPath) + spool := filepath.Join(t.TempDir(), spoolName) + + writeSpool(t, spool, eventLine("Bash")+"\n"+`{"session_id":"s1","hook_event`) + c := &collector{offsets: map[string]int64{}} + c.drain(opts, spool) + + if got := daemon.toolNames(); len(got) != 1 || got[0] != "Bash" { + t.Fatalf("replayed tools = %v, want [Bash]", got) + } + wantOffset := int64(len(eventLine("Bash")) + 1) + if c.offsets[spool] != wantOffset { + t.Fatalf("offset = %d, want %d (end of last complete line)", c.offsets[spool], wantOffset) + } + + // Completing the partial line replays it on the next tick. + writeSpool(t, spool, `_name":"PreToolUse","tool_name":"Read","tool_use_id":"tu-2"}`+"\n") + c.drain(opts, spool) + if got := daemon.toolNames(); len(got) != 2 || got[1] != "Read" { + t.Fatalf("replayed tools = %v, want [Bash Read]", got) + } +} + +func TestDrainHaltsOnTransportErrorAndRetries(t *testing.T) { + dir, err := os.MkdirTemp("", "kx") + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = os.RemoveAll(dir) }) + deadSocket := filepath.Join(dir, "dead.sock") + opts := testOptions(t, deadSocket) + spool := filepath.Join(t.TempDir(), spoolName) + writeSpool(t, spool, eventLine("Bash")+"\n"+eventLine("Read")+"\n") + + c := &collector{offsets: map[string]int64{}} + c.drain(opts, spool) + if c.offsets[spool] != 0 { + t.Fatalf("offset advanced to %d past undelivered events", c.offsets[spool]) + } + + // Daemon comes back; both events are delivered in order. 
+ daemon, socketPath := startFakeDaemon(t) + opts.SocketPath = socketPath + c.drain(opts, spool) + if got := daemon.toolNames(); len(got) != 2 || got[0] != "Bash" || got[1] != "Read" { + t.Fatalf("replayed tools = %v, want [Bash Read]", got) + } +} + +func TestDrainSkipsMalformedLines(t *testing.T) { + daemon, socketPath := startFakeDaemon(t) + opts := testOptions(t, socketPath) + spool := filepath.Join(t.TempDir(), spoolName) + writeSpool(t, spool, "{not json\n"+eventLine("Bash")+"\n") + + c := &collector{offsets: map[string]int64{}} + c.drain(opts, spool) + if got := daemon.toolNames(); len(got) != 1 || got[0] != "Bash" { + t.Fatalf("replayed tools = %v, want [Bash]", got) + } + c.drain(opts, spool) + if got := daemon.toolNames(); len(got) != 1 { + t.Fatalf("malformed line was retried: %v", got) + } +} + +func TestDrainResetsOffsetWhenSpoolShrinks(t *testing.T) { + daemon, socketPath := startFakeDaemon(t) + opts := testOptions(t, socketPath) + spool := filepath.Join(t.TempDir(), spoolName) + writeSpool(t, spool, eventLine("Bash")+"\n") + + c := &collector{offsets: map[string]int64{}} + c.drain(opts, spool) + + // Spool recreated (e.g. cleaned up and the hook started a fresh one). The + // fresh file is smaller than the old offset, which signals the reset. 
+ if err := os.Remove(spool); err != nil { + t.Fatal(err) + } + writeSpool(t, spool, `{"session_id":"s1","hook_event_name":"PreToolUse","tool_name":"Read","tool_use_id":"t"}`+"\n") + c.drain(opts, spool) + if got := daemon.toolNames(); len(got) != 2 || got[1] != "Read" { + t.Fatalf("replayed tools = %v, want [Bash Read]", got) + } +} From 5959941c003b0d603eb4bbe089794621d06962b0 Mon Sep 17 00:00:00 2001 From: tumberger Date: Fri, 12 Jun 2026 15:55:17 +0200 Subject: [PATCH 03/21] fix(coworkobserve): persist spool offsets so restarts do not duplicate ledger rows MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Offsets were in-memory only, so every daemon restart (update, reboot, idle cycle) re-replayed every existing spool from byte zero. SaveDecision inserts fresh action IDs per event — two per PreToolUse — so each restart double-ingested the full history of every live session into the ledger. The collector now loads/saves its offset map at a state file next to the guard DB (cowork-spool-offsets.json, written via temp+rename after ticks that changed it), mirroring the stream-state.json convention. Co-Authored-By: Claude Fable 5 --- internal/coworkobserve/coworkobserve.go | 62 +++++++++++++++++++- internal/coworkobserve/coworkobserve_test.go | 26 ++++++++ internal/managedobserve/daemon.go | 2 + 3 files changed, 87 insertions(+), 3 deletions(-) diff --git a/internal/coworkobserve/coworkobserve.go b/internal/coworkobserve/coworkobserve.go index 651b39b..afcce19 100644 --- a/internal/coworkobserve/coworkobserve.go +++ b/internal/coworkobserve/coworkobserve.go @@ -65,6 +65,10 @@ type Options struct { SocketPath string // SessionsRoot is the Cowork sessions root; defaults to the standard path. SessionsRoot string + // StatePath persists collector spool offsets across daemon restarts so a + // restart does not re-replay already-ingested events as duplicate ledger + // entries. Empty means offsets are kept in memory only. 
+ StatePath string // PollInterval controls how often the loops scan; defaults to 250ms. PollInterval time.Duration Diagnostic diagnostic.Logger @@ -118,7 +122,7 @@ func Run(ctx context.Context, opts Options) { } opts.Diagnostic.Printf("cowork observe: watching %s\n", opts.SessionsRoot) - c := &collector{offsets: map[string]int64{}} + c := newCollector(opts.StatePath) settings := settingsJSON() ticker := time.NewTicker(opts.PollInterval) defer ticker.Stop() @@ -129,6 +133,7 @@ func Run(ctx context.Context, opts Options) { case <-ticker.C: inject(opts, settings) c.collect(opts) + c.saveOffsets(opts) } } } @@ -156,7 +161,58 @@ func inject(opts Options, settings []byte) { } type collector struct { - offsets map[string]int64 + offsets map[string]int64 + statePath string + dirty bool +} + +func newCollector(statePath string) *collector { + c := &collector{offsets: map[string]int64{}, statePath: statePath} + if statePath == "" { + return c + } + data, err := os.ReadFile(statePath) + if err != nil { + return c + } + if err := json.Unmarshal(data, &c.offsets); err != nil || c.offsets == nil { + c.offsets = map[string]int64{} + } + return c +} + +func (c *collector) setOffset(spool string, off int64) { + if c.offsets[spool] == off { + return + } + c.offsets[spool] = off + c.dirty = true +} + +// saveOffsets persists the offset map after ticks that changed it, via temp +// file + rename so a crash mid-write cannot corrupt the state. 
+func (c *collector) saveOffsets(opts Options) { + if !c.dirty || c.statePath == "" { + return + } + data, err := json.Marshal(c.offsets) + if err != nil { + return + } + if err := os.MkdirAll(filepath.Dir(c.statePath), 0o700); err != nil { + opts.Diagnostic.Printf("cowork observe: save offsets: %v\n", err) + return + } + tmp := c.statePath + ".tmp" + if err := os.WriteFile(tmp, data, 0o600); err != nil { + opts.Diagnostic.Printf("cowork observe: save offsets: %v\n", err) + return + } + if err := os.Rename(tmp, c.statePath); err != nil { + opts.Diagnostic.Printf("cowork observe: save offsets: %v\n", err) + return + } + c.dirty = false } type coworkEvent struct { @@ -231,7 +287,7 @@ func (c *collector) drain(opts Options, spool string) { } consumed += idx + 1 } - c.offsets[spool] = off + int64(consumed) + c.setOffset(spool, off+int64(consumed)) } func (c *collector) replay(opts Options, line []byte) error { diff --git a/internal/coworkobserve/coworkobserve_test.go b/internal/coworkobserve/coworkobserve_test.go index a5cfe22..fe73fb5 100644 --- a/internal/coworkobserve/coworkobserve_test.go +++ b/internal/coworkobserve/coworkobserve_test.go @@ -159,6 +159,32 @@ func TestDrainSkipsMalformedLines(t *testing.T) { } } +func TestOffsetsPersistAcrossRestarts(t *testing.T) { + daemon, socketPath := startFakeDaemon(t) + opts := testOptions(t, socketPath) + statePath := filepath.Join(t.TempDir(), "offsets.json") + spool := filepath.Join(t.TempDir(), spoolName) + writeSpool(t, spool, eventLine("Bash")+"\n") + + c := newCollector(statePath) + c.drain(opts, spool) + c.saveOffsets(opts) + + // A restarted collector must not re-replay the already-ingested event. + restarted := newCollector(statePath) + restarted.drain(opts, spool) + if got := daemon.toolNames(); len(got) != 1 { + t.Fatalf("replayed tools after restart = %v, want exactly one Bash", got) + } + + // New events appended after the restart still flow. 
+ writeSpool(t, spool, eventLine("Grep")+"\n") + restarted.drain(opts, spool) + if got := daemon.toolNames(); len(got) != 2 || got[1] != "Grep" { + t.Fatalf("replayed tools = %v, want [Bash Grep]", got) + } +} + func TestDrainResetsOffsetWhenSpoolShrinks(t *testing.T) { daemon, socketPath := startFakeDaemon(t) opts := testOptions(t, socketPath) diff --git a/internal/managedobserve/daemon.go b/internal/managedobserve/daemon.go index 49e93db..92d27eb 100644 --- a/internal/managedobserve/daemon.go +++ b/internal/managedobserve/daemon.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net/http" + "path/filepath" "time" "github.com/kontext-security/kontext-cli/internal/coworkobserve" @@ -97,6 +98,7 @@ func RunDaemon(ctx context.Context, opts DaemonOptions) error { if coworkobserve.Enabled() { go coworkobserve.Run(ctx, coworkobserve.Options{ SocketPath: socketPath, + StatePath: filepath.Join(filepath.Dir(dbPath), "cowork-spool-offsets.json"), Diagnostic: opts.Diagnostic, }) } From 39539e3361118f2d9207442ee22c2d405d14bf40 Mon Sep 17 00:00:00 2001 From: tumberger Date: Fri, 12 Jun 2026 15:56:35 +0200 Subject: [PATCH 04/21] fix(coworkobserve): merge hook into existing settings.json and write atomically MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The injector overwrote any settings.json that lacked its marker, which would destroy settings Cowork or the user place in the per-session dir (and re-clobber them within one poll tick whenever the in-VM CLI rewrote the file). It also wrote with a plain truncate-then-write, so the CLI could race a read of a half-written file at exactly session startup. inject now parses the existing file, appends our PreToolUse matcher group alongside whatever is already there, and writes via temp file + rename. Unparseable existing content is still replaced — the CLI could not have read it either. 
Co-Authored-By: Claude Fable 5 --- internal/coworkobserve/coworkobserve.go | 79 +++++++++++++++----- internal/coworkobserve/coworkobserve_test.go | 49 ++++++++++++ 2 files changed, 108 insertions(+), 20 deletions(-) diff --git a/internal/coworkobserve/coworkobserve.go b/internal/coworkobserve/coworkobserve.go index afcce19..66e8526 100644 --- a/internal/coworkobserve/coworkobserve.go +++ b/internal/coworkobserve/coworkobserve.go @@ -91,21 +91,56 @@ func DefaultSessionsRoot() string { // then exits 0 so the tool is never blocked. const hookCommand = `p=$(cat); printf '%s\n' "$p" >> ../` + spoolName + ` 2>/dev/null; true` -func settingsJSON() []byte { - settings := map[string]any{ - "hooks": map[string]any{ - "PreToolUse": []any{ - map[string]any{ - "matcher": "*", - "hooks": []any{ - map[string]any{"type": "command", "command": hookCommand, "timeout": 12}, - }, - }, - }, +func hookEntry() map[string]any { + return map[string]any{ + "matcher": "*", + "hooks": []any{ + map[string]any{"type": "command", "command": hookCommand, "timeout": 12}, }, } - data, _ := json.Marshal(settings) - return data +} + +// mergeSettings adds the spool hook to existing settings.json content, +// preserving every other setting and any hooks Cowork or the user put there. +// Existing content that is not valid JSON is replaced — the in-VM CLI could +// not have parsed it either. The second return reports whether a write is +// needed (false when the hook is already present). 
+func mergeSettings(existing []byte) ([]byte, bool) { + settings := map[string]any{} + if len(bytes.TrimSpace(existing)) > 0 { + if bytes.Contains(existing, []byte(settingsMark)) { + return nil, false + } + if err := json.Unmarshal(existing, &settings); err != nil { + settings = map[string]any{} + } + } + hooks, _ := settings["hooks"].(map[string]any) + if hooks == nil { + hooks = map[string]any{} + } + pre, _ := hooks["PreToolUse"].([]any) + hooks["PreToolUse"] = append(pre, hookEntry()) + settings["hooks"] = hooks + data, err := json.Marshal(settings) + if err != nil { + return nil, false + } + return data, true +} + +// writeFileAtomic writes via temp file + rename so the in-VM CLI can never +// observe a half-written settings.json at startup. +func writeFileAtomic(path string, data []byte, perm os.FileMode) error { + tmp := path + ".kontext-tmp" + if err := os.WriteFile(tmp, data, perm); err != nil { + return err + } + if err := os.Rename(tmp, path); err != nil { + _ = os.Remove(tmp) + return err + } + return nil } // Run starts the injector and collector loops and blocks until ctx is cancelled. @@ -123,7 +158,6 @@ func Run(ctx context.Context, opts Options) { opts.Diagnostic.Printf("cowork observe: watching %s\n", opts.SessionsRoot) c := newCollector(opts.StatePath) - settings := settingsJSON() ticker := time.NewTicker(opts.PollInterval) defer ticker.Stop() for { @@ -131,16 +165,16 @@ func Run(ctx context.Context, opts Options) { case <-ctx.Done(): return case <-ticker.C: - inject(opts, settings) + inject(opts) c.collect(opts) c.saveOffsets(opts) } } } -// inject writes settings.json into any recent per-session .claude dir that does -// not yet carry our hook. -func inject(opts Options, settings []byte) { +// inject merges the spool hook into settings.json in any recent per-session +// .claude dir that does not yet carry it. 
+func inject(opts Options) { claudeDirs, _ := filepath.Glob(filepath.Join(opts.SessionsRoot, "*", "*", "local_*", ".claude")) cutoff := time.Now().Add(-3 * time.Minute) for _, dir := range claudeDirs { @@ -149,10 +183,15 @@ func inject(opts Options, settings []byte) { continue } settingsPath := filepath.Join(dir, "settings.json") - if existing, err := os.ReadFile(settingsPath); err == nil && bytes.Contains(existing, []byte(settingsMark)) { + existing, err := os.ReadFile(settingsPath) + if err != nil && !os.IsNotExist(err) { + continue + } + merged, needed := mergeSettings(existing) + if !needed { continue // already ours } - if err := os.WriteFile(settingsPath, settings, 0o644); err != nil { + if err := writeFileAtomic(settingsPath, merged, 0o644); err != nil { opts.Diagnostic.Printf("cowork observe: inject %s: %v\n", settingsPath, err) continue } diff --git a/internal/coworkobserve/coworkobserve_test.go b/internal/coworkobserve/coworkobserve_test.go index fe73fb5..84b8093 100644 --- a/internal/coworkobserve/coworkobserve_test.go +++ b/internal/coworkobserve/coworkobserve_test.go @@ -1,6 +1,8 @@ package coworkobserve import ( + "bytes" + "encoding/json" "io" "net" "os" @@ -91,6 +93,53 @@ func eventLine(tool string) string { return `{"session_id":"s1","hook_event_name":"PreToolUse","tool_name":"` + tool + `","tool_input":{"command":"x"},"tool_use_id":"tu-` + tool + `","cwd":"/w"}` } +func TestMergeSettingsPreservesExistingContent(t *testing.T) { + existing := []byte(`{"model":"opus","hooks":{"PreToolUse":[{"matcher":"Bash","hooks":[{"type":"command","command":"echo hi"}]}],"Stop":[{"hooks":[{"type":"command","command":"echo bye"}]}]}}`) + merged, needed := mergeSettings(existing) + if !needed { + t.Fatal("mergeSettings reported no write needed for foreign settings") + } + var settings map[string]any + if err := json.Unmarshal(merged, &settings); err != nil { + t.Fatalf("merged settings are not valid JSON: %v", err) + } + if settings["model"] != "opus" { + 
t.Fatalf("model dropped: %v", settings["model"]) + } + hooks := settings["hooks"].(map[string]any) + if _, ok := hooks["Stop"]; !ok { + t.Fatal("Stop hooks dropped") + } + pre := hooks["PreToolUse"].([]any) + if len(pre) != 2 { + t.Fatalf("PreToolUse entries = %d, want existing + ours", len(pre)) + } + if !bytes.Contains(merged, []byte("echo hi")) || !bytes.Contains(merged, []byte(settingsMark)) { + t.Fatal("merged settings missing existing hook or our spool hook") + } + + // A second merge is a no-op. + if _, needed := mergeSettings(merged); needed { + t.Fatal("mergeSettings wants to rewrite settings that already carry the hook") + } +} + +func TestMergeSettingsFromEmptyAndInvalid(t *testing.T) { + for _, existing := range [][]byte{nil, []byte(" "), []byte("{broken")} { + merged, needed := mergeSettings(existing) + if !needed { + t.Fatalf("mergeSettings(%q) reported no write needed", existing) + } + var settings map[string]any + if err := json.Unmarshal(merged, &settings); err != nil { + t.Fatalf("merged settings are not valid JSON: %v", err) + } + if !bytes.Contains(merged, []byte(settingsMark)) { + t.Fatal("merged settings missing spool hook") + } + } +} + func TestDrainLeavesPartialTrailingLine(t *testing.T) { daemon, socketPath := startFakeDaemon(t) opts := testOptions(t, socketPath) From c5cb35eb4a32d0ee624276e4c8495e8674d1bf8f Mon Sep 17 00:00:00 2001 From: tumberger Date: Fri, 12 Jun 2026 15:57:06 +0200 Subject: [PATCH 05/21] fix(coworkobserve): spool via absolute $HOME path; tighten hook timeout The hook wrote the spool to ../ relative to whatever cwd it inherited, which silently misses the host mount if Cowork ever starts the CLI somewhere other than one level below the session dir. The session dir is the guest $HOME (its .claude subdir is where the CLI loads the injected settings from), so address the spool absolutely. 
Also drop the hook timeout from 12s to 5s: the append is local-disk fast, and the timeout bounds how much latency a hung host mount can add to every Cowork tool call. Co-Authored-By: Claude Fable 5 --- internal/coworkobserve/coworkobserve.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/internal/coworkobserve/coworkobserve.go b/internal/coworkobserve/coworkobserve.go index 66e8526..bd6b17a 100644 --- a/internal/coworkobserve/coworkobserve.go +++ b/internal/coworkobserve/coworkobserve.go @@ -86,16 +86,20 @@ func DefaultSessionsRoot() string { return filepath.Join(home, "Library", "Application Support", "Claude", "local-agent-mode-sessions") } -// The command hook reads the full Claude Code hook event from stdin and appends -// it to a spool file one directory above cwd (the session dir, host-mounted), -// then exits 0 so the tool is never blocked. -const hookCommand = `p=$(cat); printf '%s\n' "$p" >> ../` + spoolName + ` 2>/dev/null; true` +// The command hook reads the full Claude Code hook event from stdin and +// appends it to a spool file in the guest $HOME (the per-session dir, which is +// the host mount: its .claude subdir is where the CLI reads these settings +// from), then exits 0 so the tool is never blocked. $HOME is absolute, so the +// spool lands in the session dir no matter what cwd the hook inherits. +const hookCommand = `p=$(cat); printf '%s\n' "$p" >> "$HOME"/` + spoolName + ` 2>/dev/null; true` func hookEntry() map[string]any { return map[string]any{ "matcher": "*", "hooks": []any{ - map[string]any{"type": "command", "command": hookCommand, "timeout": 12}, + // A local file append is near-instant; the short timeout bounds the + // latency a hung host mount can add to every tool call. 
+ map[string]any{"type": "command", "command": hookCommand, "timeout": 5}, }, } } From a4c78b674ebfcaeccb6a239b86975a3d36cf4d2c Mon Sep 17 00:00:00 2001 From: tumberger Date: Fri, 12 Jun 2026 15:58:15 +0200 Subject: [PATCH 06/21] refactor(coworkobserve): reuse the Claude hook decoder for spool replay The collector hand-rolled a parallel decode struct that missed the toolUseId/toolUseID camelCase fallbacks and dropped permission_mode. Replay now goes through hookruntime.DecodeClaudeEvent (the same decoder the registered cowork agent adapter wraps) plus localruntime.EvaluateRequestFromEvent, so there is one decode path and no field drift between the hook path and the spool path. Co-Authored-By: Claude Fable 5 --- internal/coworkobserve/coworkobserve.go | 51 +++++--------------- internal/coworkobserve/coworkobserve_test.go | 29 +++++++++++ 2 files changed, 40 insertions(+), 40 deletions(-) diff --git a/internal/coworkobserve/coworkobserve.go b/internal/coworkobserve/coworkobserve.go index bd6b17a..060a567 100644 --- a/internal/coworkobserve/coworkobserve.go +++ b/internal/coworkobserve/coworkobserve.go @@ -35,6 +35,7 @@ import ( "github.com/kontext-security/kontext-cli/internal/diagnostic" "github.com/kontext-security/kontext-cli/internal/hook" + "github.com/kontext-security/kontext-cli/internal/hookruntime" "github.com/kontext-security/kontext-cli/internal/localruntime" ) @@ -258,19 +259,6 @@ func (c *collector) saveOffsets(opts Options) { c.dirty = false } -type coworkEvent struct { - SessionID string `json:"session_id"` - SessionIDAlt string `json:"sessionId"` - HookEventName string `json:"hook_event_name"` - HookEventAlt string `json:"hookEventName"` - ToolName string `json:"tool_name"` - ToolNameAlt string `json:"toolName"` - ToolInput json.RawMessage `json:"tool_input"` - ToolInputAlt json.RawMessage `json:"toolInput"` - ToolUseID string `json:"tool_use_id"` - CWD string `json:"cwd"` -} - func (c *collector) collect(opts Options) { spools, _ := 
filepath.Glob(filepath.Join(opts.SessionsRoot, "*", "*", "local_*", spoolName)) for _, spool := range spools { @@ -333,29 +321,21 @@ func (c *collector) drain(opts Options, spool string) { c.setOffset(spool, off+int64(consumed)) } +// replay decodes a spool line with the same decoder the Claude Code hook path +// uses (Cowork runs the bundled Claude Code CLI, so the formats are identical) +// and forwards it to the daemon socket as agent "cowork". func (c *collector) replay(opts Options, line []byte) error { - var ev coworkEvent - if err := json.Unmarshal(line, &ev); err != nil { + event, err := hookruntime.DecodeClaudeEvent(line, agentName) + if err != nil { return fmt.Errorf("%w: %v", errMalformed, err) } - hookEvent := firstNonEmpty(ev.HookEventName, ev.HookEventAlt) - if hook.HookName(hookEvent) != hook.HookPreToolUse { + if event.HookName != hook.HookPreToolUse { return nil // injector only wires PreToolUse } - sessionID := firstNonEmpty(ev.SessionID, ev.SessionIDAlt) - toolInput := ev.ToolInput - if len(toolInput) == 0 { - toolInput = ev.ToolInputAlt - } - req := localruntime.EvaluateRequest{ - Type: "evaluate", - SessionID: "cowork-" + sessionID, - Agent: agentName, - HookEvent: hook.HookPreToolUse.String(), - ToolName: firstNonEmpty(ev.ToolName, ev.ToolNameAlt), - ToolInput: toolInput, - ToolUseID: ev.ToolUseID, - CWD: ev.CWD, + event.SessionID = "cowork-" + event.SessionID + req, err := localruntime.EvaluateRequestFromEvent(event) + if err != nil { + return fmt.Errorf("%w: %v", errMalformed, err) } return send(opts.SocketPath, req) } @@ -373,12 +353,3 @@ func send(socketPath string, req localruntime.EvaluateRequest) error { var res localruntime.EvaluateResult return localruntime.ReadMessage(conn, &res) } - -func firstNonEmpty(values ...string) string { - for _, v := range values { - if v != "" { - return v - } - } - return "" -} diff --git a/internal/coworkobserve/coworkobserve_test.go b/internal/coworkobserve/coworkobserve_test.go index 84b8093..22fe01d 
100644 --- a/internal/coworkobserve/coworkobserve_test.go +++ b/internal/coworkobserve/coworkobserve_test.go @@ -234,6 +234,35 @@ func TestOffsetsPersistAcrossRestarts(t *testing.T) { } } +func TestReplayDecodesCamelCaseAndPermissionMode(t *testing.T) { + daemon, socketPath := startFakeDaemon(t) + opts := testOptions(t, socketPath) + spool := filepath.Join(t.TempDir(), spoolName) + writeSpool(t, spool, `{"sessionId":"s9","hookEventName":"PreToolUse","toolName":"Bash","toolInput":{"command":"ls"},"toolUseId":"tu-9","cwd":"/w","permission_mode":"acceptEdits"}`+"\n") + + c := &collector{offsets: map[string]int64{}} + c.drain(opts, spool) + + daemon.mu.Lock() + defer daemon.mu.Unlock() + if len(daemon.requests) != 1 { + t.Fatalf("requests = %d, want 1", len(daemon.requests)) + } + req := daemon.requests[0] + if req.SessionID != "cowork-s9" || req.Agent != "cowork" { + t.Fatalf("session/agent = %q/%q", req.SessionID, req.Agent) + } + if req.ToolName != "Bash" || req.ToolUseID != "tu-9" { + t.Fatalf("camelCase fields dropped: tool=%q toolUseID=%q", req.ToolName, req.ToolUseID) + } + if req.PermissionMode != "acceptEdits" { + t.Fatalf("permission mode dropped: %q", req.PermissionMode) + } + if !bytes.Contains(req.ToolInput, []byte(`"ls"`)) { + t.Fatalf("tool input dropped: %s", req.ToolInput) + } +} + func TestDrainResetsOffsetWhenSpoolShrinks(t *testing.T) { daemon, socketPath := startFakeDaemon(t) opts := testOptions(t, socketPath) From 5054a07c59a19e50133e7eed7cf87f6c6cdd2f78 Mon Sep 17 00:00:00 2001 From: tumberger Date: Fri, 12 Jun 2026 16:00:05 +0200 Subject: [PATCH 07/21] feat(coworkobserve): add health counters and heartbeat diagnostics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Observation depends on undocumented Cowork internals (session dir layout, host mount, settings tier), so a Cowork update could break it with no error anywhere — "no activity" and "observation broken" looked identical. 
Track sessions seen vs sessions carrying the hook vs sessions producing a spool, plus events replayed and malformed lines dropped, and log a 5-minute heartbeat. Warn explicitly when a session never received the hook (injection raced CLI startup or the daemon started late) and when hooks are injected but no spool ever appears (layout/mount changed). Co-Authored-By: Claude Fable 5 --- internal/coworkobserve/coworkobserve.go | 63 ++++++++++++++++++-- internal/coworkobserve/coworkobserve_test.go | 57 ++++++++++++++---- 2 files changed, 102 insertions(+), 18 deletions(-) diff --git a/internal/coworkobserve/coworkobserve.go b/internal/coworkobserve/coworkobserve.go index 060a567..67b5b31 100644 --- a/internal/coworkobserve/coworkobserve.go +++ b/internal/coworkobserve/coworkobserve.go @@ -163,23 +163,67 @@ func Run(ctx context.Context, opts Options) { opts.Diagnostic.Printf("cowork observe: watching %s\n", opts.SessionsRoot) c := newCollector(opts.StatePath) + h := newHealth() ticker := time.NewTicker(opts.PollInterval) defer ticker.Stop() + heartbeat := time.NewTicker(heartbeatInterval) + defer heartbeat.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: - inject(opts) - c.collect(opts) + inject(opts, h) + c.collect(opts, h) c.saveOffsets(opts) + case <-heartbeat.C: + h.logHeartbeat(opts) } } } +const heartbeatInterval = 5 * time.Minute + +// health tracks whether observation is actually working. The whole mechanism +// depends on undocumented Cowork internals (session dir layout, host mount, +// settings tier), so a Cowork update can break it without any error surfacing. +// The heartbeat makes "no Cowork activity" distinguishable from "observation +// broken" in the daemon diagnostics. 
+type health struct { + sessionsSeen map[string]bool // recent .claude dirs discovered by the injector + hooked map[string]bool // .claude dirs carrying our hook + spooled map[string]bool // session dirs that produced a spool file + eventsReplayed int64 + linesDropped int64 +} + +func newHealth() *health { + return &health{ + sessionsSeen: map[string]bool{}, + hooked: map[string]bool{}, + spooled: map[string]bool{}, + } +} + +func (h *health) logHeartbeat(opts Options) { + opts.Diagnostic.Printf( + "cowork observe: health: sessions seen=%d hooked=%d spooling=%d events replayed=%d malformed dropped=%d\n", + len(h.sessionsSeen), len(h.hooked), len(h.spooled), h.eventsReplayed, h.linesDropped, + ) + if len(h.sessionsSeen) > len(h.hooked) { + opts.Diagnostic.Printf( + "cowork observe: warning: %d session(s) never received the hook (injection raced CLI startup, or the daemon started after the session)\n", + len(h.sessionsSeen)-len(h.hooked), + ) + } + if len(h.hooked) > 0 && len(h.spooled) == 0 { + opts.Diagnostic.Printf("cowork observe: warning: hook injected but no spool has appeared; the Cowork session layout or mount may have changed\n") + } +} + // inject merges the spool hook into settings.json in any recent per-session // .claude dir that does not yet carry it. 
-func inject(opts Options) { +func inject(opts Options, h *health) { claudeDirs, _ := filepath.Glob(filepath.Join(opts.SessionsRoot, "*", "*", "local_*", ".claude")) cutoff := time.Now().Add(-3 * time.Minute) for _, dir := range claudeDirs { @@ -187,6 +231,7 @@ func inject(opts Options) { if err != nil || info.ModTime().Before(cutoff) { continue } + h.sessionsSeen[dir] = true settingsPath := filepath.Join(dir, "settings.json") existing, err := os.ReadFile(settingsPath) if err != nil && !os.IsNotExist(err) { @@ -194,12 +239,14 @@ func inject(opts Options) { } merged, needed := mergeSettings(existing) if !needed { + h.hooked[dir] = true continue // already ours } if err := writeFileAtomic(settingsPath, merged, 0o644); err != nil { opts.Diagnostic.Printf("cowork observe: inject %s: %v\n", settingsPath, err) continue } + h.hooked[dir] = true opts.Diagnostic.Printf("cowork observe: injected hook into %s\n", settingsPath) } } @@ -259,10 +306,11 @@ func (c *collector) saveOffsets(opts Options) { c.dirty = false } -func (c *collector) collect(opts Options) { +func (c *collector) collect(opts Options, h *health) { spools, _ := filepath.Glob(filepath.Join(opts.SessionsRoot, "*", "*", "local_*", spoolName)) for _, spool := range spools { - c.drain(opts, spool) + h.spooled[filepath.Dir(spool)] = true + c.drain(opts, h, spool) } } @@ -276,7 +324,7 @@ var errMalformed = errors.New("malformed spool line") // drain replays complete lines appended since the last tick. The in-VM hook // may be mid-append while we read, so a trailing chunk without a newline is // left in place — the offset only ever advances past complete, handled lines. 
-func (c *collector) drain(opts Options, spool string) { +func (c *collector) drain(opts Options, h *health, spool string) { f, err := os.Open(spool) if err != nil { return @@ -313,7 +361,10 @@ func (c *collector) drain(opts Options, spool string) { opts.Diagnostic.Printf("cowork observe: replay: %v\n", err) break } + h.linesDropped++ opts.Diagnostic.Printf("cowork observe: %v\n", err) + } else { + h.eventsReplayed++ } } consumed += idx + 1 diff --git a/internal/coworkobserve/coworkobserve_test.go b/internal/coworkobserve/coworkobserve_test.go index 22fe01d..5a066dc 100644 --- a/internal/coworkobserve/coworkobserve_test.go +++ b/internal/coworkobserve/coworkobserve_test.go @@ -140,6 +140,39 @@ func TestMergeSettingsFromEmptyAndInvalid(t *testing.T) { } } +func TestInjectAndHealthTracking(t *testing.T) { + daemon, socketPath := startFakeDaemon(t) + opts := testOptions(t, socketPath) + sessionDir := filepath.Join(opts.SessionsRoot, "acct", "ws", "local_abc") + claudeDir := filepath.Join(sessionDir, ".claude") + if err := os.MkdirAll(claudeDir, 0o755); err != nil { + t.Fatal(err) + } + + h := newHealth() + inject(opts, h) + if len(h.sessionsSeen) != 1 || len(h.hooked) != 1 { + t.Fatalf("seen=%d hooked=%d, want 1/1", len(h.sessionsSeen), len(h.hooked)) + } + settings, err := os.ReadFile(filepath.Join(claudeDir, "settings.json")) + if err != nil { + t.Fatalf("settings.json not written: %v", err) + } + if !bytes.Contains(settings, []byte(settingsMark)) { + t.Fatalf("settings.json missing spool hook: %s", settings) + } + + writeSpool(t, filepath.Join(sessionDir, spoolName), eventLine("Bash")+"\n") + c := &collector{offsets: map[string]int64{}} + c.collect(opts, h) + if len(h.spooled) != 1 || h.eventsReplayed != 1 { + t.Fatalf("spooled=%d replayed=%d, want 1/1", len(h.spooled), h.eventsReplayed) + } + if got := daemon.toolNames(); len(got) != 1 || got[0] != "Bash" { + t.Fatalf("replayed tools = %v, want [Bash]", got) + } +} + func TestDrainLeavesPartialTrailingLine(t 
*testing.T) { daemon, socketPath := startFakeDaemon(t) opts := testOptions(t, socketPath) @@ -147,7 +180,7 @@ func TestDrainLeavesPartialTrailingLine(t *testing.T) { writeSpool(t, spool, eventLine("Bash")+"\n"+`{"session_id":"s1","hook_event`) c := &collector{offsets: map[string]int64{}} - c.drain(opts, spool) + c.drain(opts, newHealth(), spool) if got := daemon.toolNames(); len(got) != 1 || got[0] != "Bash" { t.Fatalf("replayed tools = %v, want [Bash]", got) @@ -159,7 +192,7 @@ func TestDrainLeavesPartialTrailingLine(t *testing.T) { // Completing the partial line replays it on the next tick. writeSpool(t, spool, `_name":"PreToolUse","tool_name":"Read","tool_use_id":"tu-2"}`+"\n") - c.drain(opts, spool) + c.drain(opts, newHealth(), spool) if got := daemon.toolNames(); len(got) != 2 || got[1] != "Read" { t.Fatalf("replayed tools = %v, want [Bash Read]", got) } @@ -177,7 +210,7 @@ func TestDrainHaltsOnTransportErrorAndRetries(t *testing.T) { writeSpool(t, spool, eventLine("Bash")+"\n"+eventLine("Read")+"\n") c := &collector{offsets: map[string]int64{}} - c.drain(opts, spool) + c.drain(opts, newHealth(), spool) if c.offsets[spool] != 0 { t.Fatalf("offset advanced to %d past undelivered events", c.offsets[spool]) } @@ -185,7 +218,7 @@ func TestDrainHaltsOnTransportErrorAndRetries(t *testing.T) { // Daemon comes back; both events are delivered in order. 
daemon, socketPath := startFakeDaemon(t) opts.SocketPath = socketPath - c.drain(opts, spool) + c.drain(opts, newHealth(), spool) if got := daemon.toolNames(); len(got) != 2 || got[0] != "Bash" || got[1] != "Read" { t.Fatalf("replayed tools = %v, want [Bash Read]", got) } @@ -198,11 +231,11 @@ func TestDrainSkipsMalformedLines(t *testing.T) { writeSpool(t, spool, "{not json\n"+eventLine("Bash")+"\n") c := &collector{offsets: map[string]int64{}} - c.drain(opts, spool) + c.drain(opts, newHealth(), spool) if got := daemon.toolNames(); len(got) != 1 || got[0] != "Bash" { t.Fatalf("replayed tools = %v, want [Bash]", got) } - c.drain(opts, spool) + c.drain(opts, newHealth(), spool) if got := daemon.toolNames(); len(got) != 1 { t.Fatalf("malformed line was retried: %v", got) } @@ -216,19 +249,19 @@ func TestOffsetsPersistAcrossRestarts(t *testing.T) { writeSpool(t, spool, eventLine("Bash")+"\n") c := newCollector(statePath) - c.drain(opts, spool) + c.drain(opts, newHealth(), spool) c.saveOffsets(opts) // A restarted collector must not re-replay the already-ingested event. restarted := newCollector(statePath) - restarted.drain(opts, spool) + restarted.drain(opts, newHealth(), spool) if got := daemon.toolNames(); len(got) != 1 { t.Fatalf("replayed tools after restart = %v, want exactly one Bash", got) } // New events appended after the restart still flow. 
writeSpool(t, spool, eventLine("Grep")+"\n") - restarted.drain(opts, spool) + restarted.drain(opts, newHealth(), spool) if got := daemon.toolNames(); len(got) != 2 || got[1] != "Grep" { t.Fatalf("replayed tools = %v, want [Bash Grep]", got) } @@ -241,7 +274,7 @@ func TestReplayDecodesCamelCaseAndPermissionMode(t *testing.T) { writeSpool(t, spool, `{"sessionId":"s9","hookEventName":"PreToolUse","toolName":"Bash","toolInput":{"command":"ls"},"toolUseId":"tu-9","cwd":"/w","permission_mode":"acceptEdits"}`+"\n") c := &collector{offsets: map[string]int64{}} - c.drain(opts, spool) + c.drain(opts, newHealth(), spool) daemon.mu.Lock() defer daemon.mu.Unlock() @@ -270,7 +303,7 @@ func TestDrainResetsOffsetWhenSpoolShrinks(t *testing.T) { writeSpool(t, spool, eventLine("Bash")+"\n") c := &collector{offsets: map[string]int64{}} - c.drain(opts, spool) + c.drain(opts, newHealth(), spool) // Spool recreated (e.g. cleaned up and the hook started a fresh one). The // fresh file is smaller than the old offset, which signals the reset. 
@@ -278,7 +311,7 @@ func TestDrainResetsOffsetWhenSpoolShrinks(t *testing.T) { t.Fatal(err) } writeSpool(t, spool, `{"session_id":"s1","hook_event_name":"PreToolUse","tool_name":"Read","tool_use_id":"t"}`+"\n") - c.drain(opts, spool) + c.drain(opts, newHealth(), spool) if got := daemon.toolNames(); len(got) != 2 || got[1] != "Read" { t.Fatalf("replayed tools = %v, want [Bash Read]", got) } From 1b23af024325042d4dfd859076e08d1f16abc15e Mon Sep 17 00:00:00 2001 From: tumberger Date: Fri, 12 Jun 2026 16:01:16 +0200 Subject: [PATCH 08/21] feat(coworkobserve): delete drained spools after an idle retention window MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Spool files hold raw, unredacted tool inputs (the normal pipeline only persists redacted parameters) and were never rotated or deleted, so they accumulated plaintext on customer disks for as long as the session dir lived — and inflated what any offset-state loss could re-replay. The collector now removes a spool once it is fully drained and idle for an hour, drops its offset entry, and also prunes offset entries whose session dir Cowork has deleted. A session that wakes up again simply recreates the spool, and because its offset entry was dropped, drain starts over from offset zero. Co-Authored-By: Claude Fable 5 --- internal/coworkobserve/coworkobserve.go | 38 +++++++++++++ internal/coworkobserve/coworkobserve_test.go | 58 ++++++++++++++++++++ 2 files changed, 96 insertions(+) diff --git a/internal/coworkobserve/coworkobserve.go b/internal/coworkobserve/coworkobserve.go index 67b5b31..0bb5978 100644 --- a/internal/coworkobserve/coworkobserve.go +++ b/internal/coworkobserve/coworkobserve.go @@ -306,12 +306,50 @@ func (c *collector) saveOffsets(opts Options) { c.dirty = false } +// spoolRetention is how long a fully-drained spool may sit idle before the +// collector deletes it. Spools hold raw, unredacted tool inputs, so they +// should not accumulate on disk indefinitely.
+const spoolRetention = time.Hour + func (c *collector) collect(opts Options, h *health) { spools, _ := filepath.Glob(filepath.Join(opts.SessionsRoot, "*", "*", "local_*", spoolName)) + live := make(map[string]bool, len(spools)) for _, spool := range spools { + live[spool] = true h.spooled[filepath.Dir(spool)] = true c.drain(opts, h, spool) + c.cleanup(opts, spool) + } + // Cowork deleted the session dir; its offset entry is dead weight. + for spool := range c.offsets { + if !live[spool] { + delete(c.offsets, spool) + c.dirty = true + } + } +} + +// cleanup removes a spool once it is fully drained and idle past the +// retention window. If the session wakes up again, the hook recreates the +// file and drain starts over from offset zero (the shrink reset). +func (c *collector) cleanup(opts Options, spool string) { + info, err := os.Stat(spool) + if err != nil { + return + } + if time.Since(info.ModTime()) < spoolRetention { + return + } + if c.offsets[spool] != info.Size() { + return // not fully drained yet } + if err := os.Remove(spool); err != nil { + opts.Diagnostic.Printf("cowork observe: remove drained spool %s: %v\n", spool, err) + return + } + delete(c.offsets, spool) + c.dirty = true + opts.Diagnostic.Printf("cowork observe: removed drained spool %s\n", spool) } // errMalformed marks spool lines that can never replay successfully; drain diff --git a/internal/coworkobserve/coworkobserve_test.go b/internal/coworkobserve/coworkobserve_test.go index 5a066dc..0a071e8 100644 --- a/internal/coworkobserve/coworkobserve_test.go +++ b/internal/coworkobserve/coworkobserve_test.go @@ -9,6 +9,7 @@ import ( "path/filepath" "sync" "testing" + "time" "github.com/kontext-security/kontext-cli/internal/diagnostic" "github.com/kontext-security/kontext-cli/internal/localruntime" @@ -296,6 +297,63 @@ func TestReplayDecodesCamelCaseAndPermissionMode(t *testing.T) { } } +func TestCollectCleansUpDrainedIdleSpools(t *testing.T) { + _, socketPath := startFakeDaemon(t) + opts := 
testOptions(t, socketPath) + sessionDir := filepath.Join(opts.SessionsRoot, "acct", "ws", "local_abc") + if err := os.MkdirAll(sessionDir, 0o755); err != nil { + t.Fatal(err) + } + spool := filepath.Join(sessionDir, spoolName) + writeSpool(t, spool, eventLine("Bash")+"\n") + + c := &collector{offsets: map[string]int64{}} + h := newHealth() + c.collect(opts, h) + if _, err := os.Stat(spool); err != nil { + t.Fatalf("fresh drained spool was removed: %v", err) + } + + // Once idle past the retention window, the drained spool is deleted and + // its offset entry dropped. + old := time.Now().Add(-2 * spoolRetention) + if err := os.Chtimes(spool, old, old); err != nil { + t.Fatal(err) + } + c.collect(opts, h) + if _, err := os.Stat(spool); !os.IsNotExist(err) { + t.Fatalf("idle drained spool still present (err=%v)", err) + } + if _, ok := c.offsets[spool]; ok { + t.Fatal("offset entry for removed spool not pruned") + } +} + +func TestCollectKeepsUndrainedSpools(t *testing.T) { + dir, err := os.MkdirTemp("", "kx") + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = os.RemoveAll(dir) }) + opts := testOptions(t, filepath.Join(dir, "dead.sock")) // daemon down: nothing drains + sessionDir := filepath.Join(opts.SessionsRoot, "acct", "ws", "local_abc") + if err := os.MkdirAll(sessionDir, 0o755); err != nil { + t.Fatal(err) + } + spool := filepath.Join(sessionDir, spoolName) + writeSpool(t, spool, eventLine("Bash")+"\n") + old := time.Now().Add(-2 * spoolRetention) + if err := os.Chtimes(spool, old, old); err != nil { + t.Fatal(err) + } + + c := &collector{offsets: map[string]int64{}} + c.collect(opts, newHealth()) + if _, err := os.Stat(spool); err != nil { + t.Fatalf("undrained spool was removed: %v", err) + } +} + func TestDrainResetsOffsetWhenSpoolShrinks(t *testing.T) { daemon, socketPath := startFakeDaemon(t) opts := testOptions(t, socketPath) From 3a7064f00434516194efe129cac805eb80a24f06 Mon Sep 17 00:00:00 2001 From: tumberger Date: Fri, 12 Jun 2026 16:02:45 
+0200 Subject: [PATCH 09/21] feat(coworkobserve): gate enablement via managed.json cowork_observe field The only switch was the KONTEXT_COWORK_OBSERVE env var, which is awkward to plumb through launchd plists on MDM-deployed installs. managed.json now carries an optional cowork_observe boolean (default false) and the daemon honors either it or the env var, which stays as a dev override. Co-Authored-By: Claude Fable 5 --- internal/coworkobserve/coworkobserve.go | 2 ++ internal/managedconfig/config.go | 4 ++++ internal/managedconfig/config_test.go | 19 +++++++++++++++++++ internal/managedobserve/daemon.go | 3 ++- 4 files changed, 27 insertions(+), 1 deletion(-) diff --git a/internal/coworkobserve/coworkobserve.go b/internal/coworkobserve/coworkobserve.go index 0bb5978..a7504ce 100644 --- a/internal/coworkobserve/coworkobserve.go +++ b/internal/coworkobserve/coworkobserve.go @@ -51,6 +51,8 @@ const ( ) // Enabled reports whether Cowork observation is turned on via the environment. +// Managed installs should prefer the cowork_observe field in managed.json; +// the env var remains as a development/debugging override. func Enabled() bool { switch strings.ToLower(strings.TrimSpace(os.Getenv(EnvEnabled))) { case "1", "true", "yes", "on": diff --git a/internal/managedconfig/config.go b/internal/managedconfig/config.go index 83d5c1b..de913ac 100644 --- a/internal/managedconfig/config.go +++ b/internal/managedconfig/config.go @@ -44,6 +44,10 @@ type Config struct { Agent string `json:"agent"` Credentials Credentials `json:"credentials"` Device Device `json:"device,omitempty"` + // CoworkObserve turns on Claude Cowork observation in the managed-observe + // daemon. A managed.json field so MDM-deployed installs control it through + // config rather than launchd environment plumbing. 
+ CoworkObserve bool `json:"cowork_observe,omitempty"` } type Credentials struct { diff --git a/internal/managedconfig/config_test.go b/internal/managedconfig/config_test.go index b10efad..0bf713e 100644 --- a/internal/managedconfig/config_test.go +++ b/internal/managedconfig/config_test.go @@ -44,6 +44,25 @@ func TestParseValidConfigNormalizesStrings(t *testing.T) { } } +func TestParseCoworkObserve(t *testing.T) { + cfg, err := Parse([]byte(validConfigJSON())) + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + if cfg.CoworkObserve { + t.Fatal("cowork_observe should default to false") + } + withFlag := strings.Replace(validConfigJSON(), `"mode": "observe",`, `"mode": "observe", + "cowork_observe": true,`, 1) + cfg, err = Parse([]byte(withFlag)) + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + if !cfg.CoworkObserve { + t.Fatal("cowork_observe = false, want true") + } +} + func TestParseRejectsUnknownFields(t *testing.T) { _, err := Parse([]byte(`{ "version": "managed-install-v1", diff --git a/internal/managedobserve/daemon.go b/internal/managedobserve/daemon.go index 92d27eb..551757c 100644 --- a/internal/managedobserve/daemon.go +++ b/internal/managedobserve/daemon.go @@ -95,7 +95,8 @@ func RunDaemon(ctx context.Context, opts DaemonOptions) error { // Cowork observation runs alongside Claude Code in the same daemon, replaying // in-VM Cowork tool events into the same localruntime socket as agent "cowork". - if coworkobserve.Enabled() { + // Enabled via managed.json (cowork_observe) or the env var override. 
+ if loadedConfig.Config.CoworkObserve || coworkobserve.Enabled() { go coworkobserve.Run(ctx, coworkobserve.Options{ SocketPath: socketPath, StatePath: filepath.Join(filepath.Dir(dbPath), "cowork-spool-offsets.json"), From e696864a06f9f80dbc32d7beaf272883855136c7 Mon Sep 17 00:00:00 2001 From: tumberger Date: Fri, 12 Jun 2026 16:03:12 +0200 Subject: [PATCH 10/21] docs(coworkobserve): document integrity, delivery, coupling, and deployment caveats Spell out in the package doc that spool events are self-reported in-VM telemetry (forgeable, observe-only, never enforcement), that delivery is at-least-once, that the mechanism rides on undocumented Cowork internals watched by the health heartbeat, and that the daemon must run in the session user's context so injected files are not root-owned. Co-Authored-By: Claude Fable 5 --- internal/coworkobserve/coworkobserve.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/internal/coworkobserve/coworkobserve.go b/internal/coworkobserve/coworkobserve.go index a7504ce..820d46c 100644 --- a/internal/coworkobserve/coworkobserve.go +++ b/internal/coworkobserve/coworkobserve.go @@ -18,6 +18,28 @@ // - collector: tails those spool files and replays each event into the // daemon's existing localruntime socket as agent "cowork", reusing the same // classify -> store -> managedstream -> ledger path Claude Code uses. +// +// # Caveats +// +// Integrity: the spool is written by code running inside the VM, so anything +// in the VM (including a prompt-injected agent) can append forged events or +// withhold real ones. Cowork-tagged ledger entries are self-reported +// telemetry, not attested records, and this package is observe-only — +// it must not be treated as enforcement. +// +// Delivery: events are replayed at-least-once. A replay that fails after a +// partial send is retried, so the ledger may very occasionally see a +// duplicate; it never silently drops a complete spool line. 
+// +// Coupling: the session-dir layout, the host mount, and the user-tier +// settings load are undocumented Cowork internals; an update can break +// observation without an error. The health heartbeat (sessions seen vs +// hooked vs spooling) exists so that breakage is visible in diagnostics. +// +// Deployment: run the daemon in the session user's context (LaunchAgent, +// not a root LaunchDaemon) — the injector writes settings.json into the +// user's ~/Library and root-owned files there may confuse Cowork or the +// VM mount's UID mapping. package coworkobserve import ( From a234d177c0cdbf778c0dfeed93803617ab9d75db Mon Sep 17 00:00:00 2001 From: tumberger Date: Fri, 12 Jun 2026 16:37:22 +0200 Subject: [PATCH 11/21] feat(managedconfig): accept mode "enforce"; rename cowork_observe to cowork_enabled managed.json already carries the deployment-level mode knob but validation pinned it to "observe". Allow "enforce" so the same single knob the rest of the runtime is built around (guardhookruntime.Mode, per-edge result transforms) can switch managed installs to blocking. cowork_observe becomes cowork_enabled: it gates whether the Cowork loops run at all, while the posture now follows mode. Co-Authored-By: Claude Fable 5 --- internal/managedconfig/config.go | 20 ++++++++++++-------- internal/managedconfig/config_test.go | 25 +++++++++++++++++++------ internal/managedobserve/daemon.go | 2 +- 3 files changed, 32 insertions(+), 15 deletions(-) diff --git a/internal/managedconfig/config.go b/internal/managedconfig/config.go index de913ac..e6dc5cf 100644 --- a/internal/managedconfig/config.go +++ b/internal/managedconfig/config.go @@ -21,8 +21,11 @@ import ( const ( Version = "managed-install-v1" - Mode = "observe" - Agent = "claude" + // Mode is the default posture; ModeEnforce turns daemon decisions into + // real denies at every hook edge (Claude Code and Cowork alike). 
+ Mode = "observe" + ModeEnforce = "enforce" + Agent = "claude" DefaultPath = "/Library/Application Support/Kontext/managed.json" EnvPath = "KONTEXT_MANAGED_CONFIG" @@ -44,10 +47,11 @@ type Config struct { Agent string `json:"agent"` Credentials Credentials `json:"credentials"` Device Device `json:"device,omitempty"` - // CoworkObserve turns on Claude Cowork observation in the managed-observe - // daemon. A managed.json field so MDM-deployed installs control it through - // config rather than launchd environment plumbing. - CoworkObserve bool `json:"cowork_observe,omitempty"` + // CoworkEnabled turns on Claude Cowork observation/enforcement in the + // managed-observe daemon (the posture follows Mode). A managed.json field + // so MDM-deployed installs control it through config rather than launchd + // environment plumbing. + CoworkEnabled bool `json:"cowork_enabled,omitempty"` } type Credentials struct { @@ -205,8 +209,8 @@ func normalizeAndValidate(cfg Config) (Config, error) { if err := validateCloudURL(cfg.CloudURL); err != nil { return Config{}, err } - if cfg.Mode != Mode { - return Config{}, fmt.Errorf("mode must be %q", Mode) + if cfg.Mode != Mode && cfg.Mode != ModeEnforce { + return Config{}, fmt.Errorf("mode must be %q or %q", Mode, ModeEnforce) } if cfg.Agent != Agent { return Config{}, fmt.Errorf("agent must be %q", Agent) diff --git a/internal/managedconfig/config_test.go b/internal/managedconfig/config_test.go index 0bf713e..6ea482e 100644 --- a/internal/managedconfig/config_test.go +++ b/internal/managedconfig/config_test.go @@ -44,22 +44,35 @@ func TestParseValidConfigNormalizesStrings(t *testing.T) { } } -func TestParseCoworkObserve(t *testing.T) { +func TestParseCoworkEnabled(t *testing.T) { cfg, err := Parse([]byte(validConfigJSON())) if err != nil { t.Fatalf("Parse() error = %v", err) } - if cfg.CoworkObserve { - t.Fatal("cowork_observe should default to false") + if cfg.CoworkEnabled { + t.Fatal("cowork_enabled should default to false") } 
withFlag := strings.Replace(validConfigJSON(), `"mode": "observe",`, `"mode": "observe", - "cowork_observe": true,`, 1) + "cowork_enabled": true,`, 1) cfg, err = Parse([]byte(withFlag)) if err != nil { t.Fatalf("Parse() error = %v", err) } - if !cfg.CoworkObserve { - t.Fatal("cowork_observe = false, want true") + if !cfg.CoworkEnabled { + t.Fatal("cowork_enabled = false, want true") + } +} + +func TestParseModeObserveAndEnforce(t *testing.T) { + cfg, err := Parse([]byte(strings.Replace(validConfigJSON(), `"mode": "observe"`, `"mode": "enforce"`, 1))) + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + if cfg.Mode != ModeEnforce { + t.Fatalf("Mode = %q, want %q", cfg.Mode, ModeEnforce) + } + if _, err := Parse([]byte(strings.Replace(validConfigJSON(), `"mode": "observe"`, `"mode": "block"`, 1))); err == nil { + t.Fatal("Parse() accepted unknown mode") } } diff --git a/internal/managedobserve/daemon.go b/internal/managedobserve/daemon.go index 551757c..0eee21c 100644 --- a/internal/managedobserve/daemon.go +++ b/internal/managedobserve/daemon.go @@ -96,7 +96,7 @@ func RunDaemon(ctx context.Context, opts DaemonOptions) error { // Cowork observation runs alongside Claude Code in the same daemon, replaying // in-VM Cowork tool events into the same localruntime socket as agent "cowork". // Enabled via managed.json (cowork_observe) or the env var override. 
- if loadedConfig.Config.CoworkObserve || coworkobserve.Enabled() { + if loadedConfig.Config.CoworkEnabled || coworkobserve.Enabled() { go coworkobserve.Run(ctx, coworkobserve.Options{ SocketPath: socketPath, StatePath: filepath.Join(filepath.Dir(dbPath), "cowork-spool-offsets.json"), From 00153652cd98e33e9f1d1d26b0404e8000ec418d Mon Sep 17 00:00:00 2001 From: tumberger Date: Fri, 12 Jun 2026 16:38:05 +0200 Subject: [PATCH 12/21] feat(managedobserve): drive hook edges from the configured managed mode RunDaemon hardcoded ModeObserve; it now parses managed.json's mode and passes it to runtimehost (whose existing per-edge result transform already returns real denies when not observing) and to the Cowork observer, which will pick its injected hook variant by the same mode. Co-Authored-By: Claude Fable 5 --- internal/coworkobserve/coworkobserve.go | 5 +++++ internal/managedobserve/daemon.go | 10 +++++++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/internal/coworkobserve/coworkobserve.go b/internal/coworkobserve/coworkobserve.go index 820d46c..b2f50e5 100644 --- a/internal/coworkobserve/coworkobserve.go +++ b/internal/coworkobserve/coworkobserve.go @@ -56,6 +56,7 @@ import ( "time" "github.com/kontext-security/kontext-cli/internal/diagnostic" + guardhookruntime "github.com/kontext-security/kontext-cli/internal/guard/hookruntime" "github.com/kontext-security/kontext-cli/internal/hook" "github.com/kontext-security/kontext-cli/internal/hookruntime" "github.com/kontext-security/kontext-cli/internal/localruntime" @@ -94,6 +95,10 @@ type Options struct { // restart does not re-replay already-ingested events as duplicate ledger // entries. Empty means offsets are kept in memory only. StatePath string + // Mode selects the injected hook: observe installs the fire-and-forget + // spool append, enforce installs the decision round-trip that blocks the + // tool until the daemon's verdict lands. Defaults to observe. 
+ Mode guardhookruntime.Mode // PollInterval controls how often the loops scan; defaults to 250ms. PollInterval time.Duration Diagnostic diagnostic.Logger diff --git a/internal/managedobserve/daemon.go b/internal/managedobserve/daemon.go index 0eee21c..e830663 100644 --- a/internal/managedobserve/daemon.go +++ b/internal/managedobserve/daemon.go @@ -60,11 +60,18 @@ func RunDaemon(ctx context.Context, opts DaemonOptions) error { opts.Diagnostic.Printf("managed observe cleanup: %v\n", err) } + // The deployment-level mode from managed.json drives every hook edge: + // observe records would-decisions, enforce returns real denies. + mode, err := guardhookruntime.ParseMode(loadedConfig.Config.Mode) + if err != nil { + return fmt.Errorf("parse managed mode: %w", err) + } + host, err := runtimehost.Start(ctx, runtimehost.Options{ AgentName: managedconfig.Agent, DBPath: dbPath, SocketPath: socketPath, - Mode: guardhookruntime.ModeObserve, + Mode: mode, Diagnostic: opts.Diagnostic, SkipInitialSession: true, DisableAsyncIngest: true, @@ -100,6 +107,7 @@ func RunDaemon(ctx context.Context, opts DaemonOptions) error { go coworkobserve.Run(ctx, coworkobserve.Options{ SocketPath: socketPath, StatePath: filepath.Join(filepath.Dir(dbPath), "cowork-spool-offsets.json"), + Mode: mode, Diagnostic: opts.Diagnostic, }) } From 33db709cd74f42935831dff3fb8d7a21f7496ca9 Mon Sep 17 00:00:00 2001 From: tumberger Date: Fri, 12 Jun 2026 16:41:55 +0200 Subject: [PATCH 13/21] =?UTF-8?q?feat(coworkobserve):=20enforce=20mode=20?= =?UTF-8?q?=E2=80=94=20deny/allow=20via=20decision-file=20round-trip?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In enforce mode the injected hook becomes synchronous: it appends the event wrapped in an envelope carrying a hook-generated request id, then polls the host-mounted kontext-cowork-decisions/<rid>.json for up to 10s inside claudemanaged's conventional 20s hook timeout. 
The collector evaluates the event through the daemon socket (whose per-edge transform already returns real denies when the managed mode is enforce), renders the verdict with the standard Claude encoder, and parks it for the hook to emit verbatim — the CLI honors the permissionDecision. No decision in time means the hook emits deny "Kontext daemon unavailable" itself: fail-closed, mirroring the sidecar's enforce behavior. Request ids originate inside the VM, so they are validated against a strict charset before naming any file the daemon writes. mergeSettings now replaces stale variants of our entry on mode switches, orphaned decision files are TTL-cleaned, the scan tightens to 100ms under enforce, and the heartbeat reports denies. Co-Authored-By: Claude Fable 5 --- internal/coworkobserve/coworkobserve.go | 230 ++++++++++++++++--- internal/coworkobserve/coworkobserve_test.go | 126 +++++++++- 2 files changed, 317 insertions(+), 39 deletions(-) diff --git a/internal/coworkobserve/coworkobserve.go b/internal/coworkobserve/coworkobserve.go index b2f50e5..6067c4f 100644 --- a/internal/coworkobserve/coworkobserve.go +++ b/internal/coworkobserve/coworkobserve.go @@ -52,6 +52,7 @@ import ( "net" "os" "path/filepath" + "regexp" "strings" "time" @@ -68,9 +69,16 @@ const ( // EnvSessionsRoot overrides the Cowork sessions root for testing. EnvSessionsRoot = "KONTEXT_COWORK_SESSIONS_ROOT" - spoolName = "kontext-cowork-events.jsonl" - settingsMark = spoolName // settings.json containing this string is ours - agentName = "cowork" + spoolName = "kontext-cowork-events.jsonl" + decisionsDirName = "kontext-cowork-decisions" + settingsMark = spoolName // hook commands containing this string are ours + agentName = "cowork" + + // Enforce-hook budget, mirroring the sidecar conventions: the hook waits + // up to decisionWait for the daemon's verdict (hookConnDeadline is 10s on + // the socket side) inside claudemanaged's default 20s hook timeout. 
+ enforceHookTimeout = 20 + observeHookTimeout = 5 ) // Enabled reports whether Cowork observation is turned on via the environment. @@ -116,35 +124,61 @@ func DefaultSessionsRoot() string { return filepath.Join(home, "Library", "Application Support", "Claude", "local-agent-mode-sessions") } -// The command hook reads the full Claude Code hook event from stdin and +// The observe hook reads the full Claude Code hook event from stdin and // appends it to a spool file in the guest $HOME (the per-session dir, which is // the host mount: its .claude subdir is where the CLI reads these settings // from), then exits 0 so the tool is never blocked. $HOME is absolute, so the // spool lands in the session dir no matter what cwd the hook inherits. -const hookCommand = `p=$(cat); printf '%s\n' "$p" >> "$HOME"/` + spoolName + ` 2>/dev/null; true` +const observeHookCommand = `p=$(cat); printf '%s\n' "$p" >> "$HOME"/` + spoolName + ` 2>/dev/null; true` + +// denyJSON is the fail-closed verdict the enforce hook emits itself when no +// decision arrives in time. Reason mirrors the sidecar's enforce behavior on +// daemon unavailability (guard/hookruntime). +const denyJSON = `{"hookSpecificOutput":{"hookEventName":"PreToolUse","permissionDecision":"deny","permissionDecisionReason":"Kontext daemon unavailable"}}` -func hookEntry() map[string]any { +// The enforce hook wraps the event in an envelope carrying a hook-generated +// request ID (the shell cannot parse tool_use_id out of the JSON), appends it +// to the spool, then polls for the daemon-rendered decision file and emits it +// verbatim — so the CLI sees exactly the permissionDecision the policy engine +// produced. No decision within 10s (100 x 0.1s) means deny: fail-closed, same +// as the sidecar when the daemon is unreachable. The one gap we cannot close +// is the CLI killing the hook at its 20s timeout, which Claude Code treats as +// allow. 
+const enforceHookCommand = `p=$(cat) +deny='` + denyJSON + `' +if [ -z "$p" ]; then printf '%s\n' "$deny"; exit 0; fi +rid="$$-$(date +%s%N)" +if ! printf '{"rid":"%s","event":%s}\n' "$rid" "$p" >> "$HOME"/` + spoolName + ` 2>/dev/null; then printf '%s\n' "$deny"; exit 0; fi +d="$HOME"/` + decisionsDirName + `/"$rid".json +i=0 +while [ "$i" -lt 100 ]; do + if [ -f "$d" ]; then cat "$d" 2>/dev/null; rm -f "$d" 2>/dev/null; exit 0; fi + i=$((i+1)); sleep 0.1 +done +printf '%s\n' "$deny"` + +func hookEntry(mode guardhookruntime.Mode) map[string]any { + command, timeout := observeHookCommand, observeHookTimeout + if mode == guardhookruntime.ModeEnforce { + command, timeout = enforceHookCommand, enforceHookTimeout + } return map[string]any{ "matcher": "*", "hooks": []any{ - // A local file append is near-instant; the short timeout bounds the - // latency a hung host mount can add to every tool call. - map[string]any{"type": "command", "command": hookCommand, "timeout": 5}, + map[string]any{"type": "command", "command": command, "timeout": timeout}, }, } } -// mergeSettings adds the spool hook to existing settings.json content, -// preserving every other setting and any hooks Cowork or the user put there. -// Existing content that is not valid JSON is replaced — the in-VM CLI could -// not have parsed it either. The second return reports whether a write is -// needed (false when the hook is already present). -func mergeSettings(existing []byte) ([]byte, bool) { +// mergeSettings adds the given spool hook entry to existing settings.json +// content, preserving every other setting and any hooks Cowork or the user put +// there. Stale variants of our own entry (e.g. after a mode switch) are +// replaced. Existing content that is not valid JSON is replaced wholesale — +// the in-VM CLI could not have parsed it either. The second return reports +// whether a write is needed (false when the current entry is already present). 
+func mergeSettings(existing []byte, entry map[string]any) ([]byte, bool) { settings := map[string]any{} if len(bytes.TrimSpace(existing)) > 0 { - if bytes.Contains(existing, []byte(settingsMark)) { - return nil, false - } if err := json.Unmarshal(existing, &settings); err != nil { settings = map[string]any{} } @@ -154,7 +188,30 @@ func mergeSettings(existing []byte) ([]byte, bool) { hooks = map[string]any{} } pre, _ := hooks["PreToolUse"].([]any) - hooks["PreToolUse"] = append(pre, hookEntry()) + wantJSON, err := json.Marshal(entry) + if err != nil { + return nil, false + } + kept := make([]any, 0, len(pre)+1) + current := false + for _, candidate := range pre { + if !entryIsOurs(candidate) { + kept = append(kept, candidate) + continue + } + if got, err := json.Marshal(candidate); err == nil && bytes.Equal(got, wantJSON) && !current { + current = true + kept = append(kept, candidate) + } + // stale or duplicate variants of our entry are dropped + } + if current && len(kept) == len(pre) { + return nil, false + } + if !current { + kept = append(kept, entry) + } + hooks["PreToolUse"] = kept settings["hooks"] = hooks data, err := json.Marshal(settings) if err != nil { @@ -163,6 +220,26 @@ func mergeSettings(existing []byte) ([]byte, bool) { return data, true } +// entryIsOurs reports whether a PreToolUse matcher group was installed by the +// injector (any of its command hooks references the spool file). +func entryIsOurs(candidate any) bool { + m, ok := candidate.(map[string]any) + if !ok { + return false + } + hooks, _ := m["hooks"].([]any) + for _, h := range hooks { + hm, ok := h.(map[string]any) + if !ok { + continue + } + if command, _ := hm["command"].(string); strings.Contains(command, settingsMark) { + return true + } + } + return false +} + // writeFileAtomic writes via temp file + rename so the in-VM CLI can never // observe a half-written settings.json at startup. 
func writeFileAtomic(path string, data []byte, perm os.FileMode) error { @@ -183,7 +260,13 @@ func Run(ctx context.Context, opts Options) { opts.SessionsRoot = DefaultSessionsRoot() } if opts.PollInterval <= 0 { - opts.PollInterval = 250 * time.Millisecond + // Enforce holds every tool call for the spool round-trip, so scan + // tighter than the observe default. + if opts.Mode == guardhookruntime.ModeEnforce { + opts.PollInterval = 100 * time.Millisecond + } else { + opts.PollInterval = 250 * time.Millisecond + } } if opts.SessionsRoot == "" { opts.Diagnostic.Printf("cowork observe: no sessions root; disabled\n") @@ -224,6 +307,7 @@ type health struct { spooled map[string]bool // session dirs that produced a spool file eventsReplayed int64 linesDropped int64 + denied int64 } func newHealth() *health { @@ -236,8 +320,8 @@ func newHealth() *health { func (h *health) logHeartbeat(opts Options) { opts.Diagnostic.Printf( - "cowork observe: health: sessions seen=%d hooked=%d spooling=%d events replayed=%d malformed dropped=%d\n", - len(h.sessionsSeen), len(h.hooked), len(h.spooled), h.eventsReplayed, h.linesDropped, + "cowork observe: health: sessions seen=%d hooked=%d spooling=%d events replayed=%d denied=%d malformed dropped=%d\n", + len(h.sessionsSeen), len(h.hooked), len(h.spooled), h.eventsReplayed, h.denied, h.linesDropped, ) if len(h.sessionsSeen) > len(h.hooked) { opts.Diagnostic.Printf( @@ -250,11 +334,12 @@ func (h *health) logHeartbeat(opts Options) { } } -// inject merges the spool hook into settings.json in any recent per-session -// .claude dir that does not yet carry it. +// inject merges the mode-appropriate spool hook into settings.json in any +// recent per-session .claude dir that does not yet carry it. 
func inject(opts Options, h *health) { claudeDirs, _ := filepath.Glob(filepath.Join(opts.SessionsRoot, "*", "*", "local_*", ".claude")) cutoff := time.Now().Add(-3 * time.Minute) + entry := hookEntry(opts.Mode) for _, dir := range claudeDirs { info, err := os.Stat(dir) if err != nil || info.ModTime().Before(cutoff) { @@ -266,7 +351,7 @@ func inject(opts Options, h *health) { if err != nil && !os.IsNotExist(err) { continue } - merged, needed := mergeSettings(existing) + merged, needed := mergeSettings(existing, entry) if !needed { h.hooked[dir] = true continue // already ours @@ -348,6 +433,7 @@ func (c *collector) collect(opts Options, h *health) { h.spooled[filepath.Dir(spool)] = true c.drain(opts, h, spool) c.cleanup(opts, spool) + cleanupOrphanDecisions(opts, filepath.Dir(spool)) } // Cowork deleted the session dir; its offset entry is dead weight. for spool := range c.offsets { @@ -423,7 +509,7 @@ func (c *collector) drain(opts Options, h *health, spool string) { } line := bytes.TrimSpace(data[consumed : consumed+idx]) if len(line) > 0 { - if err := c.replay(opts, line); err != nil { + if err := c.replay(opts, h, spool, line); err != nil { if !errors.Is(err, errMalformed) { opts.Diagnostic.Printf("cowork observe: replay: %v\n", err) break @@ -439,11 +525,35 @@ func (c *collector) drain(opts Options, h *health, spool string) { c.setOffset(spool, off+int64(consumed)) } +// spoolEnvelope is what the enforce hook appends: the raw event plus the +// hook-generated request ID that names the decision file. Observe-mode lines +// are bare events and unwrap to an empty rid. +type spoolEnvelope struct { + RID string `json:"rid"` + Event json.RawMessage `json:"event"` +} + +func unwrapEnvelope(line []byte) (rid string, payload []byte) { + var env spoolEnvelope + if err := json.Unmarshal(line, &env); err == nil && len(env.Event) > 0 { + return env.RID, env.Event + } + return "", line +} + +// ridPattern bounds what may name a decision file. 
The rid comes from inside +// the VM, so anything outside this charset (notably path separators) must be +// rejected before it reaches a filepath.Join. +var ridPattern = regexp.MustCompile(`^[A-Za-z0-9._-]{1,128}$`) + // replay decodes a spool line with the same decoder the Claude Code hook path // uses (Cowork runs the bundled Claude Code CLI, so the formats are identical) -// and forwards it to the daemon socket as agent "cowork". -func (c *collector) replay(opts Options, line []byte) error { - event, err := hookruntime.DecodeClaudeEvent(line, agentName) +// and forwards it to the daemon socket as agent "cowork". Lines carrying a +// request ID get the daemon's verdict written back as a decision file for the +// waiting enforce hook. +func (c *collector) replay(opts Options, h *health, spool string, line []byte) error { + rid, payload := unwrapEnvelope(line) + event, err := hookruntime.DecodeClaudeEvent(payload, agentName) if err != nil { return fmt.Errorf("%w: %v", errMalformed, err) } @@ -455,19 +565,69 @@ func (c *collector) replay(opts Options, line []byte) error { if err != nil { return fmt.Errorf("%w: %v", errMalformed, err) } - return send(opts.SocketPath, req) + res, err := send(opts.SocketPath, req) + if err != nil { + return err + } + if !res.Allowed { + h.denied++ + } + if rid == "" { + return nil // observe hook: nothing is waiting + } + // A failed decision write is logged but not retried: the offset advances + // (the event is ingested) and the waiting hook fails closed on its own. 
+ if !ridPattern.MatchString(rid) { + opts.Diagnostic.Printf("cowork observe: rejected decision request id %q\n", rid) + return nil + } + if err := writeDecision(filepath.Join(filepath.Dir(spool), decisionsDirName), rid, res); err != nil { + opts.Diagnostic.Printf("cowork observe: write decision %s: %v\n", rid, err) + } + return nil } -func send(socketPath string, req localruntime.EvaluateRequest) error { - conn, err := net.DialTimeout("unix", socketPath, 5*time.Second) +// writeDecision renders the verdict in the exact PreToolUse output shape the +// bundled CLI honors and parks it where the enforce hook is polling. +func writeDecision(dir, rid string, res localruntime.EvaluateResult) error { + result := localruntime.ResultFromEvaluateResult(res) + data, err := hookruntime.EncodeClaudeResult(hook.HookPreToolUse.String(), result) if err != nil { return err } + if err := os.MkdirAll(dir, 0o755); err != nil { + return err + } + return writeFileAtomic(filepath.Join(dir, rid+".json"), data, 0o644) +} + +// cleanupOrphanDecisions removes decision files nobody consumed (hook killed, +// session gone). The hook deletes its own file on the happy path. 
+func cleanupOrphanDecisions(opts Options, sessionDir string) { + files, _ := filepath.Glob(filepath.Join(sessionDir, decisionsDirName, "*.json")) + cutoff := time.Now().Add(-10 * time.Minute) + for _, file := range files { + if info, err := os.Stat(file); err == nil && info.ModTime().Before(cutoff) { + if err := os.Remove(file); err != nil { + opts.Diagnostic.Printf("cowork observe: remove orphan decision %s: %v\n", file, err) + } + } + } +} + +func send(socketPath string, req localruntime.EvaluateRequest) (localruntime.EvaluateResult, error) { + var res localruntime.EvaluateResult + conn, err := net.DialTimeout("unix", socketPath, 5*time.Second) + if err != nil { + return res, err + } defer conn.Close() _ = conn.SetDeadline(time.Now().Add(10 * time.Second)) if err := localruntime.WriteMessage(conn, req); err != nil { - return err + return res, err } - var res localruntime.EvaluateResult - return localruntime.ReadMessage(conn, &res) + if err := localruntime.ReadMessage(conn, &res); err != nil { + return res, err + } + return res, nil } diff --git a/internal/coworkobserve/coworkobserve_test.go b/internal/coworkobserve/coworkobserve_test.go index 0a071e8..d4e5e9b 100644 --- a/internal/coworkobserve/coworkobserve_test.go +++ b/internal/coworkobserve/coworkobserve_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/kontext-security/kontext-cli/internal/diagnostic" + guardhookruntime "github.com/kontext-security/kontext-cli/internal/guard/hookruntime" "github.com/kontext-security/kontext-cli/internal/localruntime" ) @@ -21,6 +22,13 @@ type fakeDaemon struct { listener net.Listener mu sync.Mutex requests []localruntime.EvaluateRequest + result *localruntime.EvaluateResult +} + +func (d *fakeDaemon) setResult(res localruntime.EvaluateResult) { + d.mu.Lock() + defer d.mu.Unlock() + d.result = &res } func startFakeDaemon(t *testing.T) (*fakeDaemon, string) { @@ -51,8 +59,12 @@ func startFakeDaemon(t *testing.T) (*fakeDaemon, string) { } d.mu.Lock() d.requests = 
append(d.requests, req) + res := localruntime.EvaluateResult{Type: "result", Decision: "allow", Allowed: true} + if d.result != nil { + res = *d.result + } d.mu.Unlock() - _ = localruntime.WriteMessage(conn, localruntime.EvaluateResult{Type: "result", Allowed: true}) + _ = localruntime.WriteMessage(conn, res) }(conn) } }() @@ -96,7 +108,7 @@ func eventLine(tool string) string { func TestMergeSettingsPreservesExistingContent(t *testing.T) { existing := []byte(`{"model":"opus","hooks":{"PreToolUse":[{"matcher":"Bash","hooks":[{"type":"command","command":"echo hi"}]}],"Stop":[{"hooks":[{"type":"command","command":"echo bye"}]}]}}`) - merged, needed := mergeSettings(existing) + merged, needed := mergeSettings(existing, hookEntry(guardhookruntime.ModeObserve)) if !needed { t.Fatal("mergeSettings reported no write needed for foreign settings") } @@ -120,14 +132,14 @@ func TestMergeSettingsPreservesExistingContent(t *testing.T) { } // A second merge is a no-op. - if _, needed := mergeSettings(merged); needed { + if _, needed := mergeSettings(merged, hookEntry(guardhookruntime.ModeObserve)); needed { t.Fatal("mergeSettings wants to rewrite settings that already carry the hook") } } func TestMergeSettingsFromEmptyAndInvalid(t *testing.T) { for _, existing := range [][]byte{nil, []byte(" "), []byte("{broken")} { - merged, needed := mergeSettings(existing) + merged, needed := mergeSettings(existing, hookEntry(guardhookruntime.ModeObserve)) if !needed { t.Fatalf("mergeSettings(%q) reported no write needed", existing) } @@ -141,6 +153,32 @@ func TestMergeSettingsFromEmptyAndInvalid(t *testing.T) { } } +func TestMergeSettingsReplacesStaleVariantOnModeSwitch(t *testing.T) { + observed, needed := mergeSettings(nil, hookEntry(guardhookruntime.ModeObserve)) + if !needed { + t.Fatal("initial observe merge reported no write needed") + } + + enforced, needed := mergeSettings(observed, hookEntry(guardhookruntime.ModeEnforce)) + if !needed { + t.Fatal("mode switch reported no write 
needed") + } + var settings map[string]any + if err := json.Unmarshal(enforced, &settings); err != nil { + t.Fatalf("merged settings are not valid JSON: %v", err) + } + pre := settings["hooks"].(map[string]any)["PreToolUse"].([]any) + if len(pre) != 1 { + t.Fatalf("PreToolUse entries = %d, want stale observe variant replaced", len(pre)) + } + if !bytes.Contains(enforced, []byte(decisionsDirName)) { + t.Fatal("enforce variant missing decision poll") + } + if _, needed := mergeSettings(enforced, hookEntry(guardhookruntime.ModeEnforce)); needed { + t.Fatal("repeat enforce merge should be a no-op") + } +} + func TestInjectAndHealthTracking(t *testing.T) { daemon, socketPath := startFakeDaemon(t) opts := testOptions(t, socketPath) @@ -354,6 +392,86 @@ func TestCollectKeepsUndrainedSpools(t *testing.T) { } } +func TestEnforceReplayWritesDecisionFile(t *testing.T) { + daemon, socketPath := startFakeDaemon(t) + daemon.setResult(localruntime.EvaluateResult{ + Type: "result", + Decision: "deny", + Allowed: false, + Reason: "blocked by policy", + }) + opts := testOptions(t, socketPath) + sessionDir := filepath.Join(opts.SessionsRoot, "acct", "ws", "local_abc") + if err := os.MkdirAll(sessionDir, 0o755); err != nil { + t.Fatal(err) + } + spool := filepath.Join(sessionDir, spoolName) + writeSpool(t, spool, `{"rid":"123-456","event":`+eventLine("Bash")+`}`+"\n") + + c := &collector{offsets: map[string]int64{}} + h := newHealth() + c.collect(opts, h) + + decision, err := os.ReadFile(filepath.Join(sessionDir, decisionsDirName, "123-456.json")) + if err != nil { + t.Fatalf("decision file not written: %v", err) + } + var out map[string]any + if err := json.Unmarshal(decision, &out); err != nil { + t.Fatalf("decision is not valid JSON: %v", err) + } + specific := out["hookSpecificOutput"].(map[string]any) + if specific["permissionDecision"] != "deny" { + t.Fatalf("permissionDecision = %v, want deny", specific["permissionDecision"]) + } + if specific["permissionDecisionReason"] != 
"blocked by policy" { + t.Fatalf("reason = %v", specific["permissionDecisionReason"]) + } + if h.denied != 1 { + t.Fatalf("denied counter = %d, want 1", h.denied) + } +} + +func TestObserveLinesWriteNoDecision(t *testing.T) { + _, socketPath := startFakeDaemon(t) + opts := testOptions(t, socketPath) + sessionDir := filepath.Join(opts.SessionsRoot, "acct", "ws", "local_abc") + if err := os.MkdirAll(sessionDir, 0o755); err != nil { + t.Fatal(err) + } + writeSpool(t, filepath.Join(sessionDir, spoolName), eventLine("Bash")+"\n") + + c := &collector{offsets: map[string]int64{}} + c.collect(opts, newHealth()) + if _, err := os.Stat(filepath.Join(sessionDir, decisionsDirName)); !os.IsNotExist(err) { + t.Fatalf("decisions dir created for observe-mode line (err=%v)", err) + } +} + +func TestReplayRejectsUnsafeRequestIDs(t *testing.T) { + daemon, socketPath := startFakeDaemon(t) + opts := testOptions(t, socketPath) + sessionDir := filepath.Join(opts.SessionsRoot, "acct", "ws", "local_abc") + if err := os.MkdirAll(sessionDir, 0o755); err != nil { + t.Fatal(err) + } + spool := filepath.Join(sessionDir, spoolName) + writeSpool(t, spool, `{"rid":"../escape","event":`+eventLine("Bash")+`}`+"\n") + + c := &collector{offsets: map[string]int64{}} + c.collect(opts, newHealth()) + + // The event is still ingested, but no decision file may be written for a + // rid that could steer the path (".." alone has no separator, but the + // pattern rejects "/" outright). 
+ if got := daemon.toolNames(); len(got) != 1 { + t.Fatalf("replayed tools = %v, want the event ingested", got) + } + if _, err := os.Stat(filepath.Join(sessionDir, decisionsDirName)); !os.IsNotExist(err) { + t.Fatalf("decision written for unsafe rid (err=%v)", err) + } +} + func TestDrainResetsOffsetWhenSpoolShrinks(t *testing.T) { daemon, socketPath := startFakeDaemon(t) opts := testOptions(t, socketPath) From ab6fe7008efd691a0e345a9769a7b12ec2bcc1be Mon Sep 17 00:00:00 2001 From: tumberger Date: Fri, 12 Jun 2026 16:42:35 +0200 Subject: [PATCH 14/21] docs(coworkobserve): document mode semantics and enforce limits Replace the observe-only caveat with the mode-driven behavior: enforce gates agent-via-CLI actions through the decision round-trip, fail-closed on daemon unavailability, with the honest limits spelled out (injection race window, CLI-timeout kill reads as allow, in-VM bypass out of hook reach). Co-Authored-By: Claude Fable 5 --- internal/coworkobserve/coworkobserve.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/internal/coworkobserve/coworkobserve.go b/internal/coworkobserve/coworkobserve.go index 6067c4f..ccd6c3d 100644 --- a/internal/coworkobserve/coworkobserve.go +++ b/internal/coworkobserve/coworkobserve.go @@ -19,13 +19,27 @@ // daemon's existing localruntime socket as agent "cowork", reusing the same // classify -> store -> managedstream -> ledger path Claude Code uses. // +// # Modes +// +// The posture follows the deployment-level managed.json mode, like every +// other hook edge. In observe mode the hook is fire-and-forget and nothing +// blocks. In enforce mode the hook wraps each event in an envelope with a +// request id, waits up to 10s for the daemon's verdict at +// kontext-cowork-decisions/<rid>.json, and emits it verbatim — the bundled +// CLI honors the permissionDecision. No verdict in time means the hook emits +// deny ("Kontext daemon unavailable"): fail-closed, mirroring the sidecar. 
+// // # Caveats // // Integrity: the spool is written by code running inside the VM, so anything // in the VM (including a prompt-injected agent) can append forged events or // withhold real ones. Cowork-tagged ledger entries are self-reported -// telemetry, not attested records, and this package is observe-only — -// it must not be treated as enforcement. +// telemetry, not attested records. Enforcement gates agent-via-CLI actions +// only: in-VM code that bypasses the CLI was never reachable by a hook, and +// a tool call that lands before settings injection runs unguarded (the +// health heartbeat surfaces such sessions). A hook killed at the CLI's own +// timeout is treated by Claude Code as allow, so fail-closed is best-effort +// within the timeout budget. // // Delivery: events are replayed at-least-once. A replay that fails after a // partial send is retried, so the ledger may very occasionally see a From 2ade3185ceb796e77822a7e0e72fadc1e453002b Mon Sep 17 00:00:00 2001 From: tumberger Date: Sat, 13 Jun 2026 14:37:34 +0200 Subject: [PATCH 15/21] fix(cowork): write spool/decision files relative to session dir, not guest $HOME The guest $HOME is not the per-session dir: Cowork points the bundled CLI at the per-session .claude via a config-dir override, not via $HOME, so a $HOME-relative spool lands on the ephemeral VM filesystem and never reaches the host collector. The hook's cwd is the session's outputs/ mount, so write the spool and poll the decision file at ../ (the session dir, where .claude lives and where the collector globs) instead. 
Co-Authored-By: Claude Fable 5 --- internal/coworkobserve/coworkobserve.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/internal/coworkobserve/coworkobserve.go b/internal/coworkobserve/coworkobserve.go index ccd6c3d..8dbdb94 100644 --- a/internal/coworkobserve/coworkobserve.go +++ b/internal/coworkobserve/coworkobserve.go @@ -138,12 +138,15 @@ func DefaultSessionsRoot() string { return filepath.Join(home, "Library", "Application Support", "Claude", "local-agent-mode-sessions") } -// The observe hook reads the full Claude Code hook event from stdin and -// appends it to a spool file in the guest $HOME (the per-session dir, which is -// the host mount: its .claude subdir is where the CLI reads these settings -// from), then exits 0 so the tool is never blocked. $HOME is absolute, so the -// spool lands in the session dir no matter what cwd the hook inherits. -const observeHookCommand = `p=$(cat); printf '%s\n' "$p" >> "$HOME"/` + spoolName + ` 2>/dev/null; true` +// The observe hook reads the full Claude Code hook event from stdin and appends +// it to a spool file in the session dir, then exits 0 so the tool is never +// blocked. The spool path is relative to the hook's cwd, which Cowork sets to +// the session's outputs/ dir (a host mount); `..` is therefore the session dir +// itself, where .claude lives and where the collector globs. NB: the guest +// $HOME is NOT the session dir (Cowork points the CLI at the per-session +// .claude via a config-dir override, not via $HOME), so a $HOME-relative spool +// would land on the ephemeral VM filesystem and never reach the host collector. +const observeHookCommand = `p=$(cat); printf '%s\n' "$p" >> ../` + spoolName + ` 2>/dev/null; true` // denyJSON is the fail-closed verdict the enforce hook emits itself when no // decision arrives in time. 
Reason mirrors the sidecar's enforce behavior on @@ -162,8 +165,8 @@ const enforceHookCommand = `p=$(cat) deny='` + denyJSON + `' if [ -z "$p" ]; then printf '%s\n' "$deny"; exit 0; fi rid="$$-$(date +%s%N)" -if ! printf '{"rid":"%s","event":%s}\n' "$rid" "$p" >> "$HOME"/` + spoolName + ` 2>/dev/null; then printf '%s\n' "$deny"; exit 0; fi -d="$HOME"/` + decisionsDirName + `/"$rid".json +if ! printf '{"rid":"%s","event":%s}\n' "$rid" "$p" >> ../` + spoolName + ` 2>/dev/null; then printf '%s\n' "$deny"; exit 0; fi +d=../` + decisionsDirName + `/"$rid".json i=0 while [ "$i" -lt 100 ]; do if [ -f "$d" ]; then cat "$d" 2>/dev/null; rm -f "$d" 2>/dev/null; exit 0; fi From bd9a93c69d62707cfae75dc5fff74cae67bce9ff Mon Sep 17 00:00:00 2001 From: tumberger Date: Sat, 13 Jun 2026 14:37:50 +0200 Subject: [PATCH 16/21] docs(cowork): correct stale cowork_observe references to cowork_enabled The managed.json field was named cowork_enabled, but two comments still named it cowork_observe (which now only exists as the KONTEXT_COWORK_OBSERVE env-var override, deliberately a different name). No behavior change. Co-Authored-By: Claude Fable 5 --- internal/coworkobserve/coworkobserve.go | 2 +- internal/managedobserve/daemon.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/coworkobserve/coworkobserve.go b/internal/coworkobserve/coworkobserve.go index 8dbdb94..370a97d 100644 --- a/internal/coworkobserve/coworkobserve.go +++ b/internal/coworkobserve/coworkobserve.go @@ -96,7 +96,7 @@ const ( ) // Enabled reports whether Cowork observation is turned on via the environment. -// Managed installs should prefer the cowork_observe field in managed.json; +// Managed installs should prefer the cowork_enabled field in managed.json; // the env var remains as a development/debugging override. 
func Enabled() bool { switch strings.ToLower(strings.TrimSpace(os.Getenv(EnvEnabled))) { diff --git a/internal/managedobserve/daemon.go b/internal/managedobserve/daemon.go index e830663..c230e89 100644 --- a/internal/managedobserve/daemon.go +++ b/internal/managedobserve/daemon.go @@ -102,7 +102,7 @@ func RunDaemon(ctx context.Context, opts DaemonOptions) error { // Cowork observation runs alongside Claude Code in the same daemon, replaying // in-VM Cowork tool events into the same localruntime socket as agent "cowork". - // Enabled via managed.json (cowork_observe) or the env var override. + // Enabled via managed.json (cowork_enabled) or the env var override. if loadedConfig.Config.CoworkEnabled || coworkobserve.Enabled() { go coworkobserve.Run(ctx, coworkobserve.Options{ SocketPath: socketPath, From 44df17451625e8c460d92d710476f9b7ed59d883 Mon Sep 17 00:00:00 2001 From: tumberger Date: Sat, 13 Jun 2026 14:39:37 +0200 Subject: [PATCH 17/21] fix(cowork): keep long-running sessions reachable for re-merge and heartbeat MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The injector keyed liveness on the .claude dir modtime, but its own settings.json write freezes that modtime, so a session looked stale ~3 minutes after the first injection even while it kept running. A mode switch (observe -> enforce) never re-reached such a session — it kept the stale hook — and because the cutoff skipped the dir before recording it, the heartbeat's "never received the hook" warning could not surface it either. Add a spool-modtime fallback: a session is also live if its spool was written recently, the signal the in-VM CLI keeps fresh while driving tool calls. That re-reaches running sessions for a mode-switch re-merge and records them in sessionsSeen so the heartbeat can warn. 
Co-Authored-By: Claude Fable 5 --- internal/coworkobserve/coworkobserve.go | 29 +++++++++- internal/coworkobserve/coworkobserve_test.go | 57 ++++++++++++++++++++ 2 files changed, 84 insertions(+), 2 deletions(-) diff --git a/internal/coworkobserve/coworkobserve.go b/internal/coworkobserve/coworkobserve.go index 370a97d..d9104eb 100644 --- a/internal/coworkobserve/coworkobserve.go +++ b/internal/coworkobserve/coworkobserve.go @@ -352,14 +352,23 @@ func (h *health) logHeartbeat(opts Options) { } // inject merges the mode-appropriate spool hook into settings.json in any -// recent per-session .claude dir that does not yet carry it. +// live per-session .claude dir that does not yet carry the current entry +// (including dirs carrying a stale variant from a previous mode). +// +// Liveness can't key on the .claude dir modtime alone: our own settings.json +// write freezes that modtime, so a session would look stale ~3 minutes after +// the first injection even while it keeps running. A mode switch or a late +// daemon start would then never re-reach the session, and — because the dir +// was skipped before being recorded — the heartbeat would never surface it +// either. So a session is also live if its spool was written recently: that +// is the signal the in-VM CLI keeps fresh as it drives tool calls. 
func inject(opts Options, h *health) { claudeDirs, _ := filepath.Glob(filepath.Join(opts.SessionsRoot, "*", "*", "local_*", ".claude")) cutoff := time.Now().Add(-3 * time.Minute) entry := hookEntry(opts.Mode) for _, dir := range claudeDirs { info, err := os.Stat(dir) - if err != nil || info.ModTime().Before(cutoff) { + if err != nil || !sessionLive(filepath.Dir(dir), info, cutoff) { continue } h.sessionsSeen[dir] = true @@ -382,6 +391,22 @@ func inject(opts Options, h *health) { } } +// sessionLive reports whether a per-session dir is worth (re-)injecting into: +// either its .claude dir was touched after cutoff (a fresh session, before our +// settings.json write freezes the dir modtime) or its spool was written after +// cutoff (a long-running session the in-VM CLI is actively driving). The spool +// signal is what keeps a session reachable for a mode-switch re-merge and +// visible to the heartbeat once the dir modtime has frozen. +func sessionLive(sessionDir string, claudeInfo os.FileInfo, cutoff time.Time) bool { + if claudeInfo.ModTime().After(cutoff) { + return true + } + if si, err := os.Stat(filepath.Join(sessionDir, spoolName)); err == nil && si.ModTime().After(cutoff) { + return true + } + return false +} + type collector struct { offsets map[string]int64 statePath string diff --git a/internal/coworkobserve/coworkobserve_test.go b/internal/coworkobserve/coworkobserve_test.go index d4e5e9b..2799795 100644 --- a/internal/coworkobserve/coworkobserve_test.go +++ b/internal/coworkobserve/coworkobserve_test.go @@ -492,3 +492,60 @@ func TestDrainResetsOffsetWhenSpoolShrinks(t *testing.T) { t.Fatalf("replayed tools = %v, want [Bash Read]", got) } } + +// A long-running session's .claude dir modtime freezes once we write +// settings.json, so the injector must fall back to the spool modtime to keep +// re-reaching it. Without that, a mode switch never reaches a session older +// than the cutoff, and the session is invisible to the heartbeat. 
+func TestInjectRemergesLongRunningSessionViaSpoolSignal(t *testing.T) { + _, socketPath := startFakeDaemon(t) + opts := testOptions(t, socketPath) + sessionDir := filepath.Join(opts.SessionsRoot, "acct", "ws", "local_abc") + claudeDir := filepath.Join(sessionDir, ".claude") + if err := os.MkdirAll(claudeDir, 0o755); err != nil { + t.Fatal(err) + } + + inject(opts, newHealth()) // observe hook into the fresh session + settings, err := os.ReadFile(filepath.Join(claudeDir, "settings.json")) + if err != nil { + t.Fatalf("settings.json not written: %v", err) + } + if bytes.Contains(settings, []byte(decisionsDirName)) { + t.Fatal("observe injection unexpectedly wrote the enforce hook") + } + + // Freeze the dir modtime in the past, as it would be once the session has + // been running a while. With no spool yet, the injector must now treat the + // session as stale and leave it untouched. + stale := time.Now().Add(-10 * time.Minute) + if err := os.Chtimes(claudeDir, stale, stale); err != nil { + t.Fatal(err) + } + h := newHealth() + opts.Mode = guardhookruntime.ModeEnforce + inject(opts, h) + if len(h.sessionsSeen) != 0 { + t.Fatalf("stale dir with no spool was visited: seen=%d", len(h.sessionsSeen)) + } + + // The session produces a spool (the in-VM CLI is firing tool calls). Now + // the injector must re-reach it under the new mode, re-merge the stale + // observe hook into the enforce hook, and record it for the heartbeat. 
+ writeSpool(t, filepath.Join(sessionDir, spoolName), eventLine("Bash")+"\n") + if err := os.Chtimes(claudeDir, stale, stale); err != nil { + t.Fatal(err) + } + h = newHealth() + inject(opts, h) + if len(h.sessionsSeen) != 1 || len(h.hooked) != 1 { + t.Fatalf("live session not re-reached: seen=%d hooked=%d", len(h.sessionsSeen), len(h.hooked)) + } + settings, err = os.ReadFile(filepath.Join(claudeDir, "settings.json")) + if err != nil { + t.Fatal(err) + } + if !bytes.Contains(settings, []byte(decisionsDirName)) { + t.Fatalf("mode switch did not re-merge to the enforce hook: %s", settings) + } +} From 411792aba1afc02cbca7e648fe62281bdcbc31dd Mon Sep 17 00:00:00 2001 From: tumberger Date: Sat, 13 Jun 2026 14:40:58 +0200 Subject: [PATCH 18/21] fix(cowork): re-stat spool before unlinking to guard the cleanup race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit cleanup stat'd a spool, confirmed it drained and idle, then removed it. A hook appending in the window between the check and the remove would write an event into a file about to be unlinked, losing it (the append opens the spool fresh, so it does not keep the inode alive). The window is narrow — it needs an append after a full hour of spool idleness — but the guard is cheap: re- stat right before the remove and skip it if the size or modtime changed. A nil-in-production test seam drives the window deterministically. 
Co-Authored-By: Claude Fable 5 --- internal/coworkobserve/coworkobserve.go | 18 +++++++++ internal/coworkobserve/coworkobserve_test.go | 39 ++++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/internal/coworkobserve/coworkobserve.go b/internal/coworkobserve/coworkobserve.go index d9104eb..8b92154 100644 --- a/internal/coworkobserve/coworkobserve.go +++ b/internal/coworkobserve/coworkobserve.go @@ -486,6 +486,11 @@ func (c *collector) collect(opts Options, h *health) { } } +// beforeSpoolRemove, when non-nil, runs inside cleanup after the drained/idle +// checks but before the re-stat guard. It exists only for tests to drive the +// stat/remove race deterministically; production leaves it nil. +var beforeSpoolRemove func(spool string) + // cleanup removes a spool once it is fully drained and idle past the // retention window. If the session wakes up again, the hook recreates the // file and drain starts over from offset zero (the shrink reset). @@ -500,6 +505,19 @@ func (c *collector) cleanup(opts Options, spool string) { if c.offsets[spool] != info.Size() { return // not fully drained yet } + if beforeSpoolRemove != nil { + beforeSpoolRemove(spool) // test seam: simulate a hook appending in the window + } + // Re-stat immediately before unlinking. drain ran earlier this tick, but a + // hook may append between then and now; if the spool grew or its modtime + // advanced since the check above, a fresh (undrained) event just landed, so + // leave the file for the next tick to drain rather than unlink data we + // never replayed. This shrinks — does not fully close — the stat/remove + // window, but the loss it guards against requires an append landing in that + // window after a full hour of spool idleness, so a narrow guard suffices. 
+ if again, err := os.Stat(spool); err != nil || again.Size() != info.Size() || !again.ModTime().Equal(info.ModTime()) { + return + } if err := os.Remove(spool); err != nil { opts.Diagnostic.Printf("cowork observe: remove drained spool %s: %v\n", spool, err) return diff --git a/internal/coworkobserve/coworkobserve_test.go b/internal/coworkobserve/coworkobserve_test.go index 2799795..9098ecc 100644 --- a/internal/coworkobserve/coworkobserve_test.go +++ b/internal/coworkobserve/coworkobserve_test.go @@ -549,3 +549,42 @@ func TestInjectRemergesLongRunningSessionViaSpoolSignal(t *testing.T) { t.Fatalf("mode switch did not re-merge to the enforce hook: %s", settings) } } + +// cleanup must not unlink a drained, idle spool if a hook appends a fresh event +// in the window between the drained check and the remove — the re-stat guard +// has to catch the change and leave the file for the next tick to drain. +func TestCleanupSkipsSpoolAppendedInRemoveWindow(t *testing.T) { + _, socketPath := startFakeDaemon(t) + opts := testOptions(t, socketPath) + spool := filepath.Join(t.TempDir(), spoolName) + writeSpool(t, spool, eventLine("Bash")+"\n") + + c := &collector{offsets: map[string]int64{}} + c.drain(opts, newHealth(), spool) // offset now == size: fully drained + old := time.Now().Add(-2 * spoolRetention) + if err := os.Chtimes(spool, old, old); err != nil { + t.Fatal(err) + } + + // Simulate a hook landing a new event in the stat/remove window. + beforeSpoolRemove = func(s string) { + writeSpool(t, s, eventLine("Read")+"\n") + } + t.Cleanup(func() { beforeSpoolRemove = nil }) + + c.cleanup(opts, spool) + if _, err := os.Stat(spool); err != nil { + t.Fatalf("spool unlinked despite an append in the remove window: %v", err) + } + + // Without further appends, the next cleanup tick removes it normally. 
+ beforeSpoolRemove = nil + c.drain(opts, newHealth(), spool) // drain the event that landed + if err := os.Chtimes(spool, old, old); err != nil { + t.Fatal(err) + } + c.cleanup(opts, spool) + if _, err := os.Stat(spool); !os.IsNotExist(err) { + t.Fatalf("drained idle spool not removed on a quiet tick (err=%v)", err) + } +} From 39d47f5961d2228536c13699f6480e41a354c950 Mon Sep 17 00:00:00 2001 From: tumberger Date: Sat, 13 Jun 2026 14:42:35 +0200 Subject: [PATCH 19/21] test(cowork): integration tests for the generated hook shell commands Run the actual observe/enforce command strings under /bin/sh with cwd set to the session's outputs/ mount, covering the behavior the Go-level tests can't: ../-relative spool append and exit 0 (observe), empty-stdin immediate deny, the enforce decision round-trip (discovering the shell-generated rid from the spooled envelope, then parking the decision the hook polls for), and the fail-closed deny when no decision arrives (loop count substituted 100->3 to keep the test fast, with a guard asserting the real constant is unchanged). Co-Authored-By: Claude Fable 5 --- .../coworkobserve/coworkobserve_shell_test.go | 186 ++++++++++++++++++ 1 file changed, 186 insertions(+) create mode 100644 internal/coworkobserve/coworkobserve_shell_test.go diff --git a/internal/coworkobserve/coworkobserve_shell_test.go b/internal/coworkobserve/coworkobserve_shell_test.go new file mode 100644 index 0000000..482853c --- /dev/null +++ b/internal/coworkobserve/coworkobserve_shell_test.go @@ -0,0 +1,186 @@ +package coworkobserve + +import ( + "bytes" + "encoding/json" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" +) + +// These tests run the actual generated hook command strings under /bin/sh, the +// way Cowork's bundled CLI does. They guard the shell behavior the rest of the +// suite cannot reach: $-relative path resolution, rid generation, the spool +// append, the decision poll, the stdout shape, and fail-closed timeout. 
The +// commands write to ../ (the session dir) because the hook's cwd is the +// session's outputs/ mount, so each test runs sh with cwd = <sessionDir>/outputs. + +func shellTestDir(t *testing.T) (sessionDir, outputsDir string) { + t.Helper() + sessionDir = t.TempDir() + outputsDir = filepath.Join(sessionDir, "outputs") + if err := os.MkdirAll(outputsDir, 0o755); err != nil { + t.Fatal(err) + } + return sessionDir, outputsDir +} + +func runHook(t *testing.T, command, cwd, stdin string) (string, error) { + t.Helper() + cmd := exec.Command("sh", "-c", command) + cmd.Dir = cwd + cmd.Stdin = strings.NewReader(stdin) + out, err := cmd.Output() + return string(out), err +} + +func TestShellObserveAppendsToSessionSpool(t *testing.T) { + sessionDir, outputsDir := shellTestDir(t) + spool := filepath.Join(sessionDir, spoolName) + + out, err := runHook(t, observeHookCommand, outputsDir, eventLine("Bash")) + if err != nil { + t.Fatalf("observe hook exited non-zero: %v", err) + } + if out != "" { + t.Fatalf("observe hook wrote to stdout: %q", out) + } + data, err := os.ReadFile(spool) + if err != nil { + t.Fatalf("spool not written at the session dir: %v", err) + } + if string(data) != eventLine("Bash")+"\n" { + t.Fatalf("spool = %q, want the event line", data) + } + + // A second invocation appends rather than truncating. 
+ if _, err := runHook(t, observeHookCommand, outputsDir, eventLine("Read")); err != nil { + t.Fatalf("second observe hook exited non-zero: %v", err) + } + data, _ = os.ReadFile(spool) + if want := eventLine("Bash") + "\n" + eventLine("Read") + "\n"; string(data) != want { + t.Fatalf("spool after two events = %q, want %q", data, want) + } +} + +func TestShellEnforceEmptyStdinDenies(t *testing.T) { + sessionDir, outputsDir := shellTestDir(t) + + out, err := runHook(t, enforceHookCommand, outputsDir, "") + if err != nil { + t.Fatalf("enforce hook exited non-zero: %v", err) + } + if out != denyJSON+"\n" { + t.Fatalf("stdout = %q, want the deny verdict", out) + } + if _, err := os.Stat(filepath.Join(sessionDir, spoolName)); !os.IsNotExist(err) { + t.Fatalf("spool created for empty stdin (err=%v)", err) + } +} + +func TestShellEnforceEmitsParkedDecision(t *testing.T) { + sessionDir, outputsDir := shellTestDir(t) + + cmd := exec.Command("sh", "-c", enforceHookCommand) + cmd.Dir = outputsDir + cmd.Stdin = strings.NewReader(eventLine("Bash")) + var stdout bytes.Buffer + cmd.Stdout = &stdout + if err := cmd.Start(); err != nil { + t.Fatal(err) + } + + // The rid is generated inside the shell, so we cannot predict it. The hook + // writes the envelope (carrying the rid) before it starts polling, so read + // the spool to learn the rid, then park the decision the hook is waiting on. 
+ spool := filepath.Join(sessionDir, spoolName) + var rid string + for i := 0; i < 100 && rid == ""; i++ { + if line := bytes.TrimSpace(readFileMaybe(spool)); len(line) > 0 { + var env spoolEnvelope + if json.Unmarshal(line, &env) == nil { + rid = env.RID + } + } + if rid == "" { + time.Sleep(20 * time.Millisecond) + } + } + if rid == "" { + _ = cmd.Process.Kill() + t.Fatal("hook never spooled an envelope with a rid") + } + if !ridPattern.MatchString(rid) { + _ = cmd.Process.Kill() + t.Fatalf("shell-generated rid %q does not match ridPattern", rid) + } + + decision := `{"hookSpecificOutput":{"hookEventName":"PreToolUse","permissionDecision":"allow","permissionDecisionReason":"ok"}}` + decDir := filepath.Join(sessionDir, decisionsDirName) + if err := os.MkdirAll(decDir, 0o755); err != nil { + t.Fatal(err) + } + decFile := filepath.Join(decDir, rid+".json") + if err := writeFileAtomic(decFile, []byte(decision), 0o644); err != nil { + t.Fatal(err) + } + + done := make(chan error, 1) + go func() { done <- cmd.Wait() }() + select { + case err := <-done: + if err != nil { + t.Fatalf("hook exited non-zero: %v", err) + } + case <-time.After(5 * time.Second): + _ = cmd.Process.Kill() + t.Fatal("hook did not exit after the decision was parked") + } + + if !strings.Contains(stdout.String(), decision) { + t.Fatalf("stdout = %q, want the parked decision", stdout.String()) + } + if _, err := os.Stat(decFile); !os.IsNotExist(err) { + t.Fatalf("hook did not delete its consumed decision file (err=%v)", err) + } +} + +func TestShellEnforceFailsClosedWithoutDecision(t *testing.T) { + // The real poll loop is 100 x 0.1s = 10s. Substitute a 3-iteration loop so + // the test stays fast while exercising the same fail-closed path, and + // assert the real constant still has the 100-count loop so the substitution + // cannot silently drift out of sync. 
+ if !strings.Contains(enforceHookCommand, `"$i" -lt 100`) { + t.Fatal("enforce poll-loop bound changed; update this test's substitution") + } + fast := strings.Replace(enforceHookCommand, `"$i" -lt 100`, `"$i" -lt 3`, 1) + + sessionDir, outputsDir := shellTestDir(t) + out, err := runHook(t, fast, outputsDir, eventLine("Bash")) + if err != nil { + t.Fatalf("hook exited non-zero: %v", err) + } + if out != denyJSON+"\n" { + t.Fatalf("stdout = %q, want the deny verdict", out) + } + // The event is still spooled before the poll loop, so the daemon ingests it + // even though the in-VM call was denied. + data, err := os.ReadFile(filepath.Join(sessionDir, spoolName)) + if err != nil { + t.Fatalf("spool not written before the poll loop: %v", err) + } + if !bytes.Contains(data, []byte(`"rid"`)) { + t.Fatalf("envelope not spooled: %s", data) + } +} + +func readFileMaybe(path string) []byte { + data, err := os.ReadFile(path) + if err != nil { + return nil + } + return data +} From 14267a1da25fc7448841a2a3b8ca1447214dc32c Mon Sep 17 00:00:00 2001 From: tumberger Date: Sat, 13 Jun 2026 15:26:50 +0200 Subject: [PATCH 20/21] refactor(cowork): re-merge running sessions via a startup pass, not spool modtime MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the per-tick spool-modtime liveness heuristic with the reviewer's suggested approach: re-check/re-merge existing settings when the mode changes. The mode is read once from managed.json at daemon start, so it cannot change without a restart — which means "mode switched observe->enforce while a session runs" and "daemon started after a session" are the same event: the daemon coming up while sessions already exist. So a single forced pass at startup (reinjectExisting) re-merges the configured-mode hook into every recent session, regardless of the frozen .claude dir modtime, before they next act. 
This closes the gap the spool heuristic left — an idle-but-alive session whose first call after a switch still ran under the old hook — and is simpler: steady-state inject only has to catch newly-created sessions. Bounded to sessions touched within the last day so the pass does not write into abandoned session dirs. inject/reinjectExisting share mergeInto, which records the session as seen before writing so a failed hook still surfaces in the heartbeat. Co-Authored-By: Claude Fable 5 --- internal/coworkobserve/coworkobserve.go | 103 ++++++++++++------- internal/coworkobserve/coworkobserve_test.go | 80 ++++++++------ 2 files changed, 110 insertions(+), 73 deletions(-) diff --git a/internal/coworkobserve/coworkobserve.go b/internal/coworkobserve/coworkobserve.go index 8b92154..8c5c188 100644 --- a/internal/coworkobserve/coworkobserve.go +++ b/internal/coworkobserve/coworkobserve.go @@ -293,6 +293,14 @@ func Run(ctx context.Context, opts Options) { c := newCollector(opts.StatePath) h := newHealth() + // Re-merge the configured-mode hook into sessions already running when the + // daemon comes up. The mode is read once from managed.json at startup, so a + // mode change (observe -> enforce) and a late daemon start are the same + // event: the daemon starting while sessions already exist. Those sessions' + // .claude dir modtimes froze long ago (our own settings.json write is the + // last thing to touch the dir), so the steady-state inject would never + // re-reach them; this one forced pass does, before they next act. + reinjectExisting(opts, h) ticker := time.NewTicker(opts.PollInterval) defer ticker.Stop() heartbeat := time.NewTicker(heartbeatInterval) @@ -351,60 +359,75 @@ func (h *health) logHeartbeat(opts Options) { } } -// inject merges the mode-appropriate spool hook into settings.json in any -// live per-session .claude dir that does not yet carry the current entry -// (including dirs carrying a stale variant from a previous mode). 
-// -// Liveness can't key on the .claude dir modtime alone: our own settings.json -// write freezes that modtime, so a session would look stale ~3 minutes after -// the first injection even while it keeps running. A mode switch or a late -// daemon start would then never re-reach the session, and — because the dir -// was skipped before being recorded — the heartbeat would never surface it -// either. So a session is also live if its spool was written recently: that -// is the signal the in-VM CLI keeps fresh as it drives tool calls. +// inject merges the mode-appropriate spool hook into settings.json in each +// per-session .claude dir freshly created within the cutoff. Steady state only +// needs to catch new sessions: a new session's dir modtime is fresh when it +// appears, and once the daemon is running, the mode cannot change without a +// restart (which runs reinjectExisting again). Sessions older than the cutoff +// are handled at startup, not here. func inject(opts Options, h *health) { claudeDirs, _ := filepath.Glob(filepath.Join(opts.SessionsRoot, "*", "*", "local_*", ".claude")) cutoff := time.Now().Add(-3 * time.Minute) entry := hookEntry(opts.Mode) for _, dir := range claudeDirs { info, err := os.Stat(dir) - if err != nil || !sessionLive(filepath.Dir(dir), info, cutoff) { - continue - } - h.sessionsSeen[dir] = true - settingsPath := filepath.Join(dir, "settings.json") - existing, err := os.ReadFile(settingsPath) - if err != nil && !os.IsNotExist(err) { + if err != nil || info.ModTime().Before(cutoff) { continue } - merged, needed := mergeSettings(existing, entry) - if !needed { - h.hooked[dir] = true - continue // already ours - } - if err := writeFileAtomic(settingsPath, merged, 0o644); err != nil { - opts.Diagnostic.Printf("cowork observe: inject %s: %v\n", settingsPath, err) - continue + mergeInto(opts, h, dir, entry) + } +} + +// reinjectWindow bounds how far back the startup pass looks. 
A session whose +// .claude dir has not been touched within this window is treated as abandoned +// and left alone, so the pass does not write into the pile of dead session +// dirs Cowork leaves behind. It is generous because a session's dir modtime +// freezes once we inject, so this is "created/active within the last day", +// not "used in the last day". +const reinjectWindow = 24 * time.Hour + +// reinjectExisting force-merges the configured-mode hook into every recent +// session at daemon startup, ignoring the steady-state freshness cutoff that +// inject uses. This is what reaches a session that was already running when +// the daemon came up — whether because the daemon started late or because the +// mode just changed (which requires a restart). Without it, such a session +// keeps whatever hook it had (a stale observe hook, or none) until it happens +// to start a fresh session. +func reinjectExisting(opts Options, h *health) { + claudeDirs, _ := filepath.Glob(filepath.Join(opts.SessionsRoot, "*", "*", "local_*", ".claude")) + cutoff := time.Now().Add(-reinjectWindow) + entry := hookEntry(opts.Mode) + for _, dir := range claudeDirs { + info, err := os.Stat(dir) + if err != nil || info.ModTime().Before(cutoff) { + continue // abandoned session dir } - h.hooked[dir] = true - opts.Diagnostic.Printf("cowork observe: injected hook into %s\n", settingsPath) + mergeInto(opts, h, dir, entry) } } -// sessionLive reports whether a per-session dir is worth (re-)injecting into: -// either its .claude dir was touched after cutoff (a fresh session, before our -// settings.json write freezes the dir modtime) or its spool was written after -// cutoff (a long-running session the in-VM CLI is actively driving). The spool -// signal is what keeps a session reachable for a mode-switch re-merge and -// visible to the heartbeat once the dir modtime has frozen. 
-func sessionLive(sessionDir string, claudeInfo os.FileInfo, cutoff time.Time) bool { - if claudeInfo.ModTime().After(cutoff) { - return true +// mergeInto records the session for the heartbeat, then writes entry into its +// settings.json when it is missing or a stale-mode variant is present. The +// session is recorded as seen before the write so a hook that fails to land +// shows up as seen-but-not-hooked in the heartbeat. +func mergeInto(opts Options, h *health, dir string, entry map[string]any) { + h.sessionsSeen[dir] = true + settingsPath := filepath.Join(dir, "settings.json") + existing, err := os.ReadFile(settingsPath) + if err != nil && !os.IsNotExist(err) { + return } - if si, err := os.Stat(filepath.Join(sessionDir, spoolName)); err == nil && si.ModTime().After(cutoff) { - return true + merged, needed := mergeSettings(existing, entry) + if !needed { + h.hooked[dir] = true + return // already ours } - return false + if err := writeFileAtomic(settingsPath, merged, 0o644); err != nil { + opts.Diagnostic.Printf("cowork observe: inject %s: %v\n", settingsPath, err) + return + } + h.hooked[dir] = true + opts.Diagnostic.Printf("cowork observe: injected hook into %s\n", settingsPath) } type collector struct { diff --git a/internal/coworkobserve/coworkobserve_test.go b/internal/coworkobserve/coworkobserve_test.go index 9098ecc..ee35593 100644 --- a/internal/coworkobserve/coworkobserve_test.go +++ b/internal/coworkobserve/coworkobserve_test.go @@ -493,11 +493,11 @@ func TestDrainResetsOffsetWhenSpoolShrinks(t *testing.T) { } } -// A long-running session's .claude dir modtime freezes once we write -// settings.json, so the injector must fall back to the spool modtime to keep -// re-reaching it. Without that, a mode switch never reaches a session older -// than the cutoff, and the session is invisible to the heartbeat. 
-func TestInjectRemergesLongRunningSessionViaSpoolSignal(t *testing.T) { +// A session already running when the daemon comes up has a long-frozen .claude +// dir modtime, so the steady-state inject skips it. The startup pass must still +// re-merge it to the configured mode — this is both the late-daemon-start and +// the observe->enforce mode-switch case, since a mode change needs a restart. +func TestReinjectExistingRemergesRunningSessionOnRestart(t *testing.T) { _, socketPath := startFakeDaemon(t) opts := testOptions(t, socketPath) sessionDir := filepath.Join(opts.SessionsRoot, "acct", "ws", "local_abc") @@ -506,47 +506,61 @@ func TestInjectRemergesLongRunningSessionViaSpoolSignal(t *testing.T) { t.Fatal(err) } - inject(opts, newHealth()) // observe hook into the fresh session - settings, err := os.ReadFile(filepath.Join(claudeDir, "settings.json")) - if err != nil { - t.Fatalf("settings.json not written: %v", err) - } - if bytes.Contains(settings, []byte(decisionsDirName)) { - t.Fatal("observe injection unexpectedly wrote the enforce hook") + // A session carrying the observe hook, its dir modtime long frozen. + observed, _ := mergeSettings(nil, hookEntry(guardhookruntime.ModeObserve)) + if err := os.WriteFile(filepath.Join(claudeDir, "settings.json"), observed, 0o644); err != nil { + t.Fatal(err) } - - // Freeze the dir modtime in the past, as it would be once the session has - // been running a while. With no spool yet, the injector must now treat the - // session as stale and leave it untouched. - stale := time.Now().Add(-10 * time.Minute) + stale := time.Now().Add(-30 * time.Minute) if err := os.Chtimes(claudeDir, stale, stale); err != nil { t.Fatal(err) } - h := newHealth() + + // inject (steady state) skips it — too old. 
+ hTick := newHealth() opts.Mode = guardhookruntime.ModeEnforce - inject(opts, h) - if len(h.sessionsSeen) != 0 { - t.Fatalf("stale dir with no spool was visited: seen=%d", len(h.sessionsSeen)) + inject(opts, hTick) + if len(hTick.sessionsSeen) != 0 { + t.Fatalf("steady-state inject visited a frozen session: seen=%d", len(hTick.sessionsSeen)) } - // The session produces a spool (the in-VM CLI is firing tool calls). Now - // the injector must re-reach it under the new mode, re-merge the stale - // observe hook into the enforce hook, and record it for the heartbeat. - writeSpool(t, filepath.Join(sessionDir, spoolName), eventLine("Bash")+"\n") - if err := os.Chtimes(claudeDir, stale, stale); err != nil { - t.Fatal(err) - } - h = newHealth() - inject(opts, h) + // The startup pass re-reaches it, re-merges observe -> enforce, and records + // it so the heartbeat can warn if hooking ever fails. + h := newHealth() + reinjectExisting(opts, h) if len(h.sessionsSeen) != 1 || len(h.hooked) != 1 { - t.Fatalf("live session not re-reached: seen=%d hooked=%d", len(h.sessionsSeen), len(h.hooked)) + t.Fatalf("startup pass did not record the running session: seen=%d hooked=%d", len(h.sessionsSeen), len(h.hooked)) } - settings, err = os.ReadFile(filepath.Join(claudeDir, "settings.json")) + settings, err := os.ReadFile(filepath.Join(claudeDir, "settings.json")) if err != nil { t.Fatal(err) } if !bytes.Contains(settings, []byte(decisionsDirName)) { - t.Fatalf("mode switch did not re-merge to the enforce hook: %s", settings) + t.Fatalf("startup pass did not re-merge to the enforce hook: %s", settings) + } +} + +// The startup pass must not write into the pile of long-dead session dirs +// Cowork leaves behind. 
+func TestReinjectExistingSkipsAbandonedSessions(t *testing.T) { + _, socketPath := startFakeDaemon(t) + opts := testOptions(t, socketPath) + claudeDir := filepath.Join(opts.SessionsRoot, "acct", "ws", "local_old", ".claude") + if err := os.MkdirAll(claudeDir, 0o755); err != nil { + t.Fatal(err) + } + ancient := time.Now().Add(-2 * reinjectWindow) + if err := os.Chtimes(claudeDir, ancient, ancient); err != nil { + t.Fatal(err) + } + + h := newHealth() + reinjectExisting(opts, h) + if len(h.sessionsSeen) != 0 { + t.Fatalf("startup pass touched an abandoned session: seen=%d", len(h.sessionsSeen)) + } + if _, err := os.Stat(filepath.Join(claudeDir, "settings.json")); !os.IsNotExist(err) { + t.Fatal("startup pass wrote settings into an abandoned session dir") } } From 3817633987dcde956e5d01a4097ed9d513a9ad79 Mon Sep 17 00:00:00 2001 From: tumberger Date: Sat, 13 Jun 2026 16:09:09 +0200 Subject: [PATCH 21/21] fix(cowork): report ground-truth health; don't claim unverifiable hooks work MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Claude Code's settings watcher only watches dirs that already had a settings file when the session started. Cowork does not pre-create one, so writing the first settings.json into a session whose CLI is already running never takes effect — yet health marked it hooked. That is the silent miss relocated into the health log. Split the signal: - written — wrote (or found current) our hook AND have reason to believe it loads: a new session we seeded before its CLI started, or a re-merge of a hook already present (that dir was watched at session start, so the change hot-reloads). - unverified — wrote a first-time hook onto a possibly-already-running session (the startup pass). The watcher may never load it, so it is best-effort and must not read as working. The heartbeat warns about these explicitly. - confirmed — a spool arrived: ground truth the hook fires. 
A spool promotes a session from unverified to written. reinjectExisting now trusts only sessions that already carried our hook (the safe mode-switch re-merge); a first-time hook on a pre-existing session is written best-effort but recorded as unverified. Steady-state inject still trusts its writes (it wins the race before the CLI starts). Co-Authored-By: Claude Fable 5 --- internal/coworkobserve/coworkobserve.go | 110 +++++++++++++++---- internal/coworkobserve/coworkobserve_test.go | 50 ++++++++- 2 files changed, 135 insertions(+), 25 deletions(-) diff --git a/internal/coworkobserve/coworkobserve.go b/internal/coworkobserve/coworkobserve.go index 8c5c188..037ea55 100644 --- a/internal/coworkobserve/coworkobserve.go +++ b/internal/coworkobserve/coworkobserve.go @@ -327,9 +327,21 @@ const heartbeatInterval = 5 * time.Minute // The heartbeat makes "no Cowork activity" distinguishable from "observation // broken" in the daemon diagnostics. type health struct { - sessionsSeen map[string]bool // recent .claude dirs discovered by the injector - hooked map[string]bool // .claude dirs carrying our hook - spooled map[string]bool // session dirs that produced a spool file + sessionsSeen map[string]bool // .claude dirs discovered by the injector + // written is .claude dirs where we wrote (or found current) our hook AND + // have reason to believe it takes effect — the dir was watched at session + // start (a new session we seeded before its CLI started, or a re-merge of a + // hook that was already there). It is not, on its own, proof the hook fires. + written map[string]bool + // unverified is .claude dirs where we wrote a first-time hook onto a session + // whose CLI may already be running. Claude Code's settings watcher only + // watches dirs that had a settings file when the session started, so such a + // write may never load. These are best-effort and must not be reported as + // working until a spool confirms otherwise. 
+ unverified map[string]bool + // spooled is session dirs that produced a spool — ground truth that the hook + // actually fired. A spool promotes a session from unverified to written. + spooled map[string]bool eventsReplayed int64 linesDropped int64 denied int64 @@ -338,24 +350,31 @@ type health struct { func newHealth() *health { return &health{ sessionsSeen: map[string]bool{}, - hooked: map[string]bool{}, + written: map[string]bool{}, + unverified: map[string]bool{}, spooled: map[string]bool{}, } } func (h *health) logHeartbeat(opts Options) { opts.Diagnostic.Printf( - "cowork observe: health: sessions seen=%d hooked=%d spooling=%d events replayed=%d denied=%d malformed dropped=%d\n", - len(h.sessionsSeen), len(h.hooked), len(h.spooled), h.eventsReplayed, h.denied, h.linesDropped, + "cowork observe: health: sessions seen=%d written=%d confirmed=%d unverified=%d events replayed=%d denied=%d malformed dropped=%d\n", + len(h.sessionsSeen), len(h.written), len(h.spooled), len(h.unverified), h.eventsReplayed, h.denied, h.linesDropped, ) - if len(h.sessionsSeen) > len(h.hooked) { + if len(h.unverified) > 0 { + opts.Diagnostic.Printf( + "cowork observe: warning: %d pre-existing session(s) had a hook written but unconfirmed — their CLI likely started before the hook existed, so it will not fire until the session restarts\n", + len(h.unverified), + ) + } + if seen := len(h.sessionsSeen) - len(h.written) - len(h.unverified); seen > 0 { opts.Diagnostic.Printf( "cowork observe: warning: %d session(s) never received the hook (injection raced CLI startup, or the daemon started after the session)\n", - len(h.sessionsSeen)-len(h.hooked), + seen, ) } - if len(h.hooked) > 0 && len(h.spooled) == 0 { - opts.Diagnostic.Printf("cowork observe: warning: hook injected but no spool has appeared; the Cowork session layout or mount may have changed\n") + if len(h.written) > 0 && len(h.spooled) == 0 { + opts.Diagnostic.Printf("cowork observe: warning: hook written but no spool has 
appeared; the Cowork session layout or mount may have changed\n") } } @@ -374,7 +393,9 @@ func inject(opts Options, h *health) { if err != nil || info.ModTime().Before(cutoff) { continue } - mergeInto(opts, h, dir, entry) + // trustFresh: a brand-new session's CLI has not started yet (we win the + // race against it), so even a first-time hook will be loaded. + mergeInto(opts, h, dir, entry, true) } } @@ -402,32 +423,72 @@ func reinjectExisting(opts Options, h *health) { if err != nil || info.ModTime().Before(cutoff) { continue // abandoned session dir } - mergeInto(opts, h, dir, entry) + // trustFresh is false: this session's CLI may already be running. If we + // are writing the first settings file into its dir, Claude Code's + // watcher never watched that dir (it only watches dirs that had a + // settings file at session start), so the write will not take effect — + // record it as unverified rather than working. + mergeInto(opts, h, dir, entry, false) } } // mergeInto records the session for the heartbeat, then writes entry into its -// settings.json when it is missing or a stale-mode variant is present. The -// session is recorded as seen before the write so a hook that fails to land -// shows up as seen-but-not-hooked in the heartbeat. -func mergeInto(opts Options, h *health, dir string, entry map[string]any) { +// settings.json when it is missing or a stale-mode variant is present. +// +// trustFresh says whether a first-time hook write (no hook of ours was already +// present) can be trusted to take effect. It is true for steady-state inject — +// a new session's CLI has not started, so it will load our settings.json — and +// false for the startup pass, where the session may already be running over a +// dir the settings watcher never watched. A write that updates a hook of ours +// already present is always trusted: that dir was watched at session start, so +// the change hot-reloads. 
Anything written but not trusted is recorded as +// unverified, and only a spool (see collect) confirms it actually fires. +func mergeInto(opts Options, h *health, dir string, entry map[string]any, trustFresh bool) { h.sessionsSeen[dir] = true settingsPath := filepath.Join(dir, "settings.json") existing, err := os.ReadFile(settingsPath) if err != nil && !os.IsNotExist(err) { return } + hadOurHook := hasOurHook(existing) merged, needed := mergeSettings(existing, entry) if !needed { - h.hooked[dir] = true - return // already ours + h.written[dir] = true + return // already current } if err := writeFileAtomic(settingsPath, merged, 0o644); err != nil { opts.Diagnostic.Printf("cowork observe: inject %s: %v\n", settingsPath, err) return } - h.hooked[dir] = true - opts.Diagnostic.Printf("cowork observe: injected hook into %s\n", settingsPath) + if hadOurHook || trustFresh { + h.written[dir] = true + opts.Diagnostic.Printf("cowork observe: injected hook into %s\n", settingsPath) + return + } + h.unverified[dir] = true + opts.Diagnostic.Printf("cowork observe: wrote hook into pre-existing session %s (unverified; will not fire until the session restarts unless its dir was already watched)\n", settingsPath) +} + +// hasOurHook reports whether settings.json already carries a PreToolUse hook the +// injector installed (i.e. one whose command references the spool). Used to tell +// a mode-switch re-merge (the dir was watched at session start, so the change +// hot-reloads) apart from a first-time hook on an already-running session. 
+func hasOurHook(existing []byte) bool { + if len(bytes.TrimSpace(existing)) == 0 { + return false + } + var settings map[string]any + if json.Unmarshal(existing, &settings) != nil { + return false + } + hooks, _ := settings["hooks"].(map[string]any) + pre, _ := hooks["PreToolUse"].([]any) + for _, candidate := range pre { + if entryIsOurs(candidate) { + return true + } + } + return false } type collector struct { @@ -495,7 +556,14 @@ func (c *collector) collect(opts Options, h *health) { live := make(map[string]bool, len(spools)) for _, spool := range spools { live[spool] = true - h.spooled[filepath.Dir(spool)] = true + sessionDir := filepath.Dir(spool) + h.spooled[sessionDir] = true + // A spool is ground truth the hook fired: promote the session out of + // unverified into written, since the watcher clearly did load it. + if claudeDir := filepath.Join(sessionDir, ".claude"); h.unverified[claudeDir] { + delete(h.unverified, claudeDir) + h.written[claudeDir] = true + } c.drain(opts, h, spool) c.cleanup(opts, spool) cleanupOrphanDecisions(opts, filepath.Dir(spool)) diff --git a/internal/coworkobserve/coworkobserve_test.go b/internal/coworkobserve/coworkobserve_test.go index ee35593..0044529 100644 --- a/internal/coworkobserve/coworkobserve_test.go +++ b/internal/coworkobserve/coworkobserve_test.go @@ -190,8 +190,8 @@ func TestInjectAndHealthTracking(t *testing.T) { h := newHealth() inject(opts, h) - if len(h.sessionsSeen) != 1 || len(h.hooked) != 1 { - t.Fatalf("seen=%d hooked=%d, want 1/1", len(h.sessionsSeen), len(h.hooked)) + if len(h.sessionsSeen) != 1 || len(h.written) != 1 { + t.Fatalf("seen=%d written=%d, want 1/1", len(h.sessionsSeen), len(h.written)) } settings, err := os.ReadFile(filepath.Join(claudeDir, "settings.json")) if err != nil { @@ -528,8 +528,8 @@ func TestReinjectExistingRemergesRunningSessionOnRestart(t *testing.T) { // it so the heartbeat can warn if hooking ever fails. 
h := newHealth() reinjectExisting(opts, h) - if len(h.sessionsSeen) != 1 || len(h.hooked) != 1 { - t.Fatalf("startup pass did not record the running session: seen=%d hooked=%d", len(h.sessionsSeen), len(h.hooked)) + if len(h.sessionsSeen) != 1 || len(h.written) != 1 { + t.Fatalf("startup pass did not record the running session: seen=%d written=%d", len(h.sessionsSeen), len(h.written)) } settings, err := os.ReadFile(filepath.Join(claudeDir, "settings.json")) if err != nil { @@ -564,6 +564,48 @@ func TestReinjectExistingSkipsAbandonedSessions(t *testing.T) { } } +// When the startup pass writes a first-time hook onto a session that may +// already be running (no hook of ours was there), it cannot know the CLI's +// settings watcher ever watched the dir, so it must record the session as +// unverified, not working. A later spool is ground truth and promotes it. +func TestReinjectExistingFlagsFreshHookUnverifiedUntilSpooled(t *testing.T) { + _, socketPath := startFakeDaemon(t) + opts := testOptions(t, socketPath) + sessionDir := filepath.Join(opts.SessionsRoot, "acct", "ws", "local_pre") + claudeDir := filepath.Join(sessionDir, ".claude") + if err := os.MkdirAll(claudeDir, 0o755); err != nil { + t.Fatal(err) + } + // Recent (within the window) but with no hook of ours: the daemon was down + // when this session's CLI started, so we are creating its first settings + // file only now. + stale := time.Now().Add(-30 * time.Minute) + if err := os.Chtimes(claudeDir, stale, stale); err != nil { + t.Fatal(err) + } + + h := newHealth() + reinjectExisting(opts, h) + if len(h.written) != 0 { + t.Fatalf("first-time hook on a pre-existing session was trusted: written=%d", len(h.written)) + } + if len(h.unverified) != 1 { + t.Fatalf("first-time hook not flagged unverified: unverified=%d", len(h.unverified)) + } + // Still written best-effort, in case the dir happens to be watched. 
+ if _, err := os.Stat(filepath.Join(claudeDir, "settings.json")); err != nil { + t.Fatalf("best-effort settings.json not written: %v", err) + } + + // A spool confirms the hook actually fires: promote to written, clear unverified. + writeSpool(t, filepath.Join(sessionDir, spoolName), eventLine("Bash")+"\n") + c := &collector{offsets: map[string]int64{}} + c.collect(opts, h) + if len(h.unverified) != 0 || !h.written[claudeDir] { + t.Fatalf("spool did not confirm the session: unverified=%d written=%v", len(h.unverified), h.written[claudeDir]) + } +} + // cleanup must not unlink a drained, idle spool if a hook appends a fresh event // in the window between the drained check and the remove — the re-stat guard // has to catch the change and leave the file for the next tick to drain.