diff --git a/cmd/kontext/main.go b/cmd/kontext/main.go index 71f38a7..16d7427 100644 --- a/cmd/kontext/main.go +++ b/cmd/kontext/main.go @@ -27,6 +27,7 @@ import ( "github.com/kontext-security/kontext-cli/internal/update" _ "github.com/kontext-security/kontext-cli/internal/agent/claude" + _ "github.com/kontext-security/kontext-cli/internal/agent/cowork" ) var version = "dev" diff --git a/internal/agent/cowork/cowork.go b/internal/agent/cowork/cowork.go new file mode 100644 index 0000000..3b376df --- /dev/null +++ b/internal/agent/cowork/cowork.go @@ -0,0 +1,31 @@ +// Package cowork registers the Cowork agent adapter. Claude Cowork runs the same +// bundled Claude Code CLI inside a per-session VM, so its hook payloads use the +// identical Claude Code hook-input format. The adapter therefore reuses the +// Claude decoder/encoder and only differs in its name, which is recorded as the +// session's agent ("cowork") to distinguish Cowork activity from Claude Code in +// the ledger and dashboard. 
+package cowork + +import ( + "github.com/kontext-security/kontext-cli/internal/agent" + "github.com/kontext-security/kontext-cli/internal/hook" + "github.com/kontext-security/kontext-cli/internal/hookruntime" +) + +// init registers the adapter with the agent registry at program start. +func init() { + agent.Register(&Cowork{}) +} + +// Cowork is the agent adapter for Claude Cowork. It carries no state and +// delegates hook decoding/encoding to the shared Claude helpers in hookruntime. +type Cowork struct{} + +// Name returns the agent name, "cowork". +func (c *Cowork) Name() string { return "cowork" } + +// Aliases returns the alternative names accepted for this agent. +func (c *Cowork) Aliases() []string { return []string{"claude-cowork"} } + +// DecodeHookInput decodes a hook payload with the Claude decoder, passing this +// adapter's name as the agent. +func (c *Cowork) DecodeHookInput(input []byte) (hook.Event, error) { + return hookruntime.DecodeClaudeEvent(input, c.Name()) +} + +// EncodeHookResult encodes result with the Claude encoder for the event's hook +// name. +func (c *Cowork) EncodeHookResult(event hook.Event, result hook.Result) ([]byte, error) { + return hookruntime.EncodeClaudeResult(event.HookName.String(), result) +} diff --git a/internal/coworkobserve/coworkobserve.go b/internal/coworkobserve/coworkobserve.go new file mode 100644 index 0000000..037ea55 --- /dev/null +++ b/internal/coworkobserve/coworkobserve.go @@ -0,0 +1,784 @@ +// Package coworkobserve adds Claude Cowork observation to the managed-observe +// daemon, reusing the existing Claude Code pipeline. +// +// Cowork runs the bundled Claude Code CLI inside a per-session VM whose root +// filesystem is rebuilt on every boot, so hooks cannot be baked into the image. +// What does survive is the per-session CLAUDE config dir, which Cowork mounts +// from the host at (in glob form, as the injector matches it): +// +// ~/Library/Application Support/Claude/local-agent-mode-sessions/*/*/local_*/.claude/ +// +// This is the guest's $HOME/.claude — the "user" settings tier, which Cowork's +// CLI loads at startup. +// +// NOTE(review): the observeHookCommand comment says the CLI reaches this dir +// via a config-dir override rather than via $HOME — confirm which is accurate. +// +// Two host-side loops run inside the daemon: +// +// - injector: writes settings.json with a PreToolUse command hook into each +// new per-session .claude dir before the in-VM CLI initializes. The hook +// appends every tool event to a host-mounted spool file in the session dir +// (no in-VM network needed — the dir is a host mount). 
+// - collector: tails those spool files and replays each event into the +// daemon's existing localruntime socket as agent "cowork", reusing the same +// classify -> store -> managedstream -> ledger path Claude Code uses. +// +// # Modes +// +// The posture follows the deployment-level managed.json mode, like every +// other hook edge. In observe mode the hook is fire-and-forget and nothing +// blocks. In enforce mode the hook wraps each event in an envelope with a +// request id, waits up to 10s for the daemon's verdict at +// kontext-cowork-decisions/$rid.json, and emits it verbatim — the bundled +// CLI honors the permissionDecision. No verdict in time means the hook emits +// deny ("Kontext daemon unavailable"): fail-closed, mirroring the sidecar. +// +// # Caveats +// +// Integrity: the spool is written by code running inside the VM, so anything +// in the VM (including a prompt-injected agent) can append forged events or +// withhold real ones. Cowork-tagged ledger entries are self-reported +// telemetry, not attested records. Enforcement gates agent-via-CLI actions +// only: in-VM code that bypasses the CLI was never reachable by a hook, and +// a tool call that lands before settings injection runs unguarded (the +// health heartbeat surfaces such sessions). A hook killed at the CLI's own +// timeout is treated by Claude Code as allow, so fail-closed is best-effort +// within the timeout budget. +// +// Delivery: events are replayed at-least-once. A replay that fails after a +// partial send is retried, so the ledger may very occasionally see a +// duplicate; it never silently drops a complete spool line. +// +// Coupling: the session-dir layout, the host mount, and the user-tier +// settings load are undocumented Cowork internals; an update can break +// observation without an error. The health heartbeat (sessions seen vs +// hooked vs spooling) exists so that breakage is visible in diagnostics. 
+// +// Deployment: run the daemon in the session user's context (LaunchAgent, +// not a root LaunchDaemon) — the injector writes settings.json into the +// user's ~/Library and root-owned files there may confuse Cowork or the +// VM mount's UID mapping. +package coworkobserve + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "os" + "path/filepath" + "regexp" + "strings" + "time" + + "github.com/kontext-security/kontext-cli/internal/diagnostic" + guardhookruntime "github.com/kontext-security/kontext-cli/internal/guard/hookruntime" + "github.com/kontext-security/kontext-cli/internal/hook" + "github.com/kontext-security/kontext-cli/internal/hookruntime" + "github.com/kontext-security/kontext-cli/internal/localruntime" +) + +const ( + // EnvEnabled enables Cowork observation when set to a truthy value. + EnvEnabled = "KONTEXT_COWORK_OBSERVE" + // EnvSessionsRoot overrides the Cowork sessions root for testing. + EnvSessionsRoot = "KONTEXT_COWORK_SESSIONS_ROOT" + + spoolName = "kontext-cowork-events.jsonl" + decisionsDirName = "kontext-cowork-decisions" + settingsMark = spoolName // hook commands containing this string are ours + agentName = "cowork" + + // Hook timeouts in seconds, written into the injected hook entry. Enforce + // budget, mirroring the sidecar conventions: the hook polls up to 10s + // (100 x 0.1s in enforceHookCommand) for the daemon's verdict + // (hookConnDeadline is 10s on the socket side) inside claudemanaged's + // default 20s hook timeout. + enforceHookTimeout = 20 + observeHookTimeout = 5 +) + +// Enabled reports whether Cowork observation is turned on via the environment. +// Truthy values are "1", "true", "yes" and "on", compared case-insensitively +// after trimming surrounding whitespace; anything else (including unset) is +// false. +// Managed installs should prefer the cowork_enabled field in managed.json; +// the env var remains as a development/debugging override. +func Enabled() bool { + switch strings.ToLower(strings.TrimSpace(os.Getenv(EnvEnabled))) { + case "1", "true", "yes", "on": + return true + default: + return false + } +} + +// Options configures the Cowork observer loops. 
+type Options struct { + // SocketPath is the daemon's localruntime socket (where events are replayed). + SocketPath string + // SessionsRoot is the Cowork sessions root; defaults to the standard path. + SessionsRoot string + // StatePath persists collector spool offsets across daemon restarts so a + // restart does not re-replay already-ingested events as duplicate ledger + // entries. Empty means offsets are kept in memory only. + StatePath string + // Mode selects the injected hook: observe installs the fire-and-forget + // spool append, enforce installs the decision round-trip that blocks the + // tool until the daemon's verdict lands. Defaults to observe. + Mode guardhookruntime.Mode + // PollInterval controls how often the loops scan; defaults to 250ms. + PollInterval time.Duration + Diagnostic diagnostic.Logger +} + +// DefaultSessionsRoot returns the standard Cowork sessions root on macOS. +func DefaultSessionsRoot() string { + if override := strings.TrimSpace(os.Getenv(EnvSessionsRoot)); override != "" { + return override + } + home, err := os.UserHomeDir() + if err != nil { + return "" + } + return filepath.Join(home, "Library", "Application Support", "Claude", "local-agent-mode-sessions") +} + +// The observe hook reads the full Claude Code hook event from stdin and appends +// it to a spool file in the session dir, then exits 0 so the tool is never +// blocked. The spool path is relative to the hook's cwd, which Cowork sets to +// the session's outputs/ dir (a host mount); `..` is therefore the session dir +// itself, where .claude lives and where the collector globs. NB: the guest +// $HOME is NOT the session dir (Cowork points the CLI at the per-session +// .claude via a config-dir override, not via $HOME), so a $HOME-relative spool +// would land on the ephemeral VM filesystem and never reach the host collector. 
+const observeHookCommand = `p=$(cat); printf '%s\n' "$p" >> ../` + spoolName + ` 2>/dev/null; true` + +// denyJSON is the fail-closed verdict the enforce hook emits itself when no +// decision arrives in time. Reason mirrors the sidecar's enforce behavior on +// daemon unavailability (guard/hookruntime). +const denyJSON = `{"hookSpecificOutput":{"hookEventName":"PreToolUse","permissionDecision":"deny","permissionDecisionReason":"Kontext daemon unavailable"}}` + +// The enforce hook wraps the event in an envelope carrying a hook-generated +// request ID (the shell cannot parse tool_use_id out of the JSON), appends it +// to the spool, then polls for the daemon-rendered decision file and emits it +// verbatim — so the CLI sees exactly the permissionDecision the policy engine +// produced. No decision within 10s (100 x 0.1s) means deny: fail-closed, same +// as the sidecar when the daemon is unreachable. Empty stdin and a failed +// spool append also emit deny immediately. The one gap we cannot close +// is the CLI killing the hook at its 20s timeout, which Claude Code treats as +// allow. +// +// NOTE(review): the rid is built from the shell pid and `date +%s%N`; %N is +// not POSIX — confirm the guest's date supports it and that the result stays +// within ridPattern's charset, otherwise no decision file is ever written. +const enforceHookCommand = `p=$(cat) +deny='` + denyJSON + `' +if [ -z "$p" ]; then printf '%s\n' "$deny"; exit 0; fi +rid="$$-$(date +%s%N)" +if ! 
printf '{"rid":"%s","event":%s}\n' "$rid" "$p" >> ../` + spoolName + ` 2>/dev/null; then printf '%s\n' "$deny"; exit 0; fi +d=../` + decisionsDirName + `/"$rid".json +i=0 +while [ "$i" -lt 100 ]; do + if [ -f "$d" ]; then cat "$d" 2>/dev/null; rm -f "$d" 2>/dev/null; exit 0; fi + i=$((i+1)); sleep 0.1 +done +printf '%s\n' "$deny"` + +// hookEntry builds the PreToolUse matcher group the injector installs: a "*" +// matcher with a single command hook. Enforce mode selects enforceHookCommand +// with enforceHookTimeout; every other mode gets the observe command and +// timeout. +func hookEntry(mode guardhookruntime.Mode) map[string]any { + command, timeout := observeHookCommand, observeHookTimeout + if mode == guardhookruntime.ModeEnforce { + command, timeout = enforceHookCommand, enforceHookTimeout + } + return map[string]any{ + "matcher": "*", + "hooks": []any{ + map[string]any{"type": "command", "command": command, "timeout": timeout}, + }, + } +} + +// mergeSettings adds the given spool hook entry to existing settings.json +// content, preserving every other setting and any hooks Cowork or the user put +// there. Stale variants of our own entry (e.g. after a mode switch) are +// replaced. Existing content that is not valid JSON is replaced wholesale — +// the in-VM CLI could not have parsed it either. The second return reports +// whether a write is needed (false when the current entry is already present). 
+func mergeSettings(existing []byte, entry map[string]any) ([]byte, bool) { + settings := map[string]any{} + if len(bytes.TrimSpace(existing)) > 0 { + if err := json.Unmarshal(existing, &settings); err != nil { + settings = map[string]any{} + } + } + hooks, _ := settings["hooks"].(map[string]any) + if hooks == nil { + hooks = map[string]any{} + } + pre, _ := hooks["PreToolUse"].([]any) + wantJSON, err := json.Marshal(entry) + if err != nil { + return nil, false + } + kept := make([]any, 0, len(pre)+1) + current := false + for _, candidate := range pre { + if !entryIsOurs(candidate) { + kept = append(kept, candidate) + continue + } + if got, err := json.Marshal(candidate); err == nil && bytes.Equal(got, wantJSON) && !current { + current = true + kept = append(kept, candidate) + } + // stale or duplicate variants of our entry are dropped + } + if current && len(kept) == len(pre) { + return nil, false + } + if !current { + kept = append(kept, entry) + } + hooks["PreToolUse"] = kept + settings["hooks"] = hooks + data, err := json.Marshal(settings) + if err != nil { + return nil, false + } + return data, true +} + +// entryIsOurs reports whether a PreToolUse matcher group was installed by the +// injector (any of its command hooks references the spool file). +func entryIsOurs(candidate any) bool { + m, ok := candidate.(map[string]any) + if !ok { + return false + } + hooks, _ := m["hooks"].([]any) + for _, h := range hooks { + hm, ok := h.(map[string]any) + if !ok { + continue + } + if command, _ := hm["command"].(string); strings.Contains(command, settingsMark) { + return true + } + } + return false +} + +// writeFileAtomic writes via temp file + rename so the in-VM CLI can never +// observe a half-written settings.json at startup. 
+func writeFileAtomic(path string, data []byte, perm os.FileMode) error { + tmp := path + ".kontext-tmp" + if err := os.WriteFile(tmp, data, perm); err != nil { + return err + } + if err := os.Rename(tmp, path); err != nil { + _ = os.Remove(tmp) + return err + } + return nil +} + +// Run starts the injector and collector loops and blocks until ctx is cancelled. +func Run(ctx context.Context, opts Options) { + if opts.SessionsRoot == "" { + opts.SessionsRoot = DefaultSessionsRoot() + } + if opts.PollInterval <= 0 { + // Enforce holds every tool call for the spool round-trip, so scan + // tighter than the observe default. + if opts.Mode == guardhookruntime.ModeEnforce { + opts.PollInterval = 100 * time.Millisecond + } else { + opts.PollInterval = 250 * time.Millisecond + } + } + if opts.SessionsRoot == "" { + opts.Diagnostic.Printf("cowork observe: no sessions root; disabled\n") + return + } + opts.Diagnostic.Printf("cowork observe: watching %s\n", opts.SessionsRoot) + + c := newCollector(opts.StatePath) + h := newHealth() + // Re-merge the configured-mode hook into sessions already running when the + // daemon comes up. The mode is read once from managed.json at startup, so a + // mode change (observe -> enforce) and a late daemon start are the same + // event: the daemon starting while sessions already exist. Those sessions' + // .claude dir modtimes froze long ago (our own settings.json write is the + // last thing to touch the dir), so the steady-state inject would never + // re-reach them; this one forced pass does, before they next act. 
+ reinjectExisting(opts, h) + ticker := time.NewTicker(opts.PollInterval) + defer ticker.Stop() + heartbeat := time.NewTicker(heartbeatInterval) + defer heartbeat.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + inject(opts, h) + c.collect(opts, h) + c.saveOffsets(opts) + case <-heartbeat.C: + h.logHeartbeat(opts) + } + } +} + +const heartbeatInterval = 5 * time.Minute + +// health tracks whether observation is actually working. The whole mechanism +// depends on undocumented Cowork internals (session dir layout, host mount, +// settings tier), so a Cowork update can break it without any error surfacing. +// The heartbeat makes "no Cowork activity" distinguishable from "observation +// broken" in the daemon diagnostics. +type health struct { + sessionsSeen map[string]bool // .claude dirs discovered by the injector + // written is .claude dirs where we wrote (or found current) our hook AND + // have reason to believe it takes effect — the dir was watched at session + // start (a new session we seeded before its CLI started, or a re-merge of a + // hook that was already there). It is not, on its own, proof the hook fires. + written map[string]bool + // unverified is .claude dirs where we wrote a first-time hook onto a session + // whose CLI may already be running. Claude Code's settings watcher only + // watches dirs that had a settings file when the session started, so such a + // write may never load. These are best-effort and must not be reported as + // working until a spool confirms otherwise. + unverified map[string]bool + // spooled is session dirs that produced a spool — ground truth that the hook + // actually fired. A spool promotes a session from unverified to written. 
+ spooled map[string]bool + eventsReplayed int64 + linesDropped int64 + denied int64 +} + +func newHealth() *health { + return &health{ + sessionsSeen: map[string]bool{}, + written: map[string]bool{}, + unverified: map[string]bool{}, + spooled: map[string]bool{}, + } +} + +func (h *health) logHeartbeat(opts Options) { + opts.Diagnostic.Printf( + "cowork observe: health: sessions seen=%d written=%d confirmed=%d unverified=%d events replayed=%d denied=%d malformed dropped=%d\n", + len(h.sessionsSeen), len(h.written), len(h.spooled), len(h.unverified), h.eventsReplayed, h.denied, h.linesDropped, + ) + if len(h.unverified) > 0 { + opts.Diagnostic.Printf( + "cowork observe: warning: %d pre-existing session(s) had a hook written but unconfirmed — their CLI likely started before the hook existed, so it will not fire until the session restarts\n", + len(h.unverified), + ) + } + if seen := len(h.sessionsSeen) - len(h.written) - len(h.unverified); seen > 0 { + opts.Diagnostic.Printf( + "cowork observe: warning: %d session(s) never received the hook (injection raced CLI startup, or the daemon started after the session)\n", + seen, + ) + } + if len(h.written) > 0 && len(h.spooled) == 0 { + opts.Diagnostic.Printf("cowork observe: warning: hook written but no spool has appeared; the Cowork session layout or mount may have changed\n") + } +} + +// inject merges the mode-appropriate spool hook into settings.json in each +// per-session .claude dir freshly created within the cutoff. Steady state only +// needs to catch new sessions: a new session's dir modtime is fresh when it +// appears, and once the daemon is running, the mode cannot change without a +// restart (which runs reinjectExisting again). Sessions older than the cutoff +// are handled at startup, not here. 
+func inject(opts Options, h *health) { + claudeDirs, _ := filepath.Glob(filepath.Join(opts.SessionsRoot, "*", "*", "local_*", ".claude")) + cutoff := time.Now().Add(-3 * time.Minute) + entry := hookEntry(opts.Mode) + for _, dir := range claudeDirs { + info, err := os.Stat(dir) + if err != nil || info.ModTime().Before(cutoff) { + continue + } + // trustFresh: a brand-new session's CLI has not started yet (we win the + // race against it), so even a first-time hook will be loaded. + mergeInto(opts, h, dir, entry, true) + } +} + +// reinjectWindow bounds how far back the startup pass looks. A session whose +// .claude dir has not been touched within this window is treated as abandoned +// and left alone, so the pass does not write into the pile of dead session +// dirs Cowork leaves behind. It is generous because a session's dir modtime +// freezes once we inject, so this is "created/active within the last day", +// not "used in the last day". +const reinjectWindow = 24 * time.Hour + +// reinjectExisting force-merges the configured-mode hook into every recent +// session at daemon startup, ignoring the steady-state freshness cutoff that +// inject uses. This is what reaches a session that was already running when +// the daemon came up — whether because the daemon started late or because the +// mode just changed (which requires a restart). Without it, such a session +// keeps whatever hook it had (a stale observe hook, or none) until it happens +// to start a fresh session. +func reinjectExisting(opts Options, h *health) { + claudeDirs, _ := filepath.Glob(filepath.Join(opts.SessionsRoot, "*", "*", "local_*", ".claude")) + cutoff := time.Now().Add(-reinjectWindow) + entry := hookEntry(opts.Mode) + for _, dir := range claudeDirs { + info, err := os.Stat(dir) + if err != nil || info.ModTime().Before(cutoff) { + continue // abandoned session dir + } + // trustFresh is false: this session's CLI may already be running. 
If we + // are writing the first settings file into its dir, Claude Code's + // watcher never watched that dir (it only watches dirs that had a + // settings file at session start), so the write will not take effect — + // record it as unverified rather than working. + mergeInto(opts, h, dir, entry, false) + } +} + +// mergeInto records the session for the heartbeat, then writes entry into its +// settings.json when it is missing or a stale-mode variant is present. +// +// trustFresh says whether a first-time hook write (no hook of ours was already +// present) can be trusted to take effect. It is true for steady-state inject — +// a new session's CLI has not started, so it will load our settings.json — and +// false for the startup pass, where the session may already be running over a +// dir the settings watcher never watched. A write that updates a hook of ours +// already present is always trusted: that dir was watched at session start, so +// the change hot-reloads. Anything written but not trusted is recorded as +// unverified, and only a spool (see collect) confirms it actually fires. 
+func mergeInto(opts Options, h *health, dir string, entry map[string]any, trustFresh bool) { + h.sessionsSeen[dir] = true + settingsPath := filepath.Join(dir, "settings.json") + existing, err := os.ReadFile(settingsPath) + if err != nil && !os.IsNotExist(err) { + return + } + hadOurHook := hasOurHook(existing) + merged, needed := mergeSettings(existing, entry) + if !needed { + h.written[dir] = true + return // already current + } + if err := writeFileAtomic(settingsPath, merged, 0o644); err != nil { + opts.Diagnostic.Printf("cowork observe: inject %s: %v\n", settingsPath, err) + return + } + if hadOurHook || trustFresh { + h.written[dir] = true + opts.Diagnostic.Printf("cowork observe: injected hook into %s\n", settingsPath) + return + } + h.unverified[dir] = true + opts.Diagnostic.Printf("cowork observe: wrote hook into pre-existing session %s (unverified; will not fire until the session restarts unless its dir was already watched)\n", settingsPath) +} + +// hasOurHook reports whether settings.json already carries a PreToolUse hook the +// injector installed (i.e. one whose command references the spool). Used to tell +// a mode-switch re-merge (the dir was watched at session start, so the change +// hot-reloads) apart from a first-time hook on an already-running session. 
+func hasOurHook(existing []byte) bool { + if len(bytes.TrimSpace(existing)) == 0 { + return false + } + var settings map[string]any + if json.Unmarshal(existing, &settings) != nil { + return false + } + hooks, _ := settings["hooks"].(map[string]any) + pre, _ := hooks["PreToolUse"].([]any) + for _, candidate := range pre { + if entryIsOurs(candidate) { + return true + } + } + return false +} + +type collector struct { + offsets map[string]int64 + statePath string + dirty bool +} + +func newCollector(statePath string) *collector { + c := &collector{offsets: map[string]int64{}, statePath: statePath} + if statePath == "" { + return c + } + data, err := os.ReadFile(statePath) + if err != nil { + return c + } + if err := json.Unmarshal(data, &c.offsets); err != nil || c.offsets == nil { + c.offsets = map[string]int64{} + } + return c +} + +func (c *collector) setOffset(spool string, off int64) { + if c.offsets[spool] == off { + return + } + c.offsets[spool] = off + c.dirty = true +} + +// saveOffsets persists the offset map after ticks that changed it, via temp +// file + rename so a crash mid-write cannot corrupt the state. +func (c *collector) saveOffsets(opts Options) { + if !c.dirty || c.statePath == "" { + return + } + data, err := json.Marshal(c.offsets) + if err != nil { + return + } + if err := os.MkdirAll(filepath.Dir(c.statePath), 0o700); err != nil { + opts.Diagnostic.Printf("cowork observe: save offsets: %v\n", err) + return + } + tmp := c.statePath + ".tmp" + if err := os.WriteFile(tmp, data, 0o600); err != nil { + opts.Diagnostic.Printf("cowork observe: save offsets: %v\n", err) + return + } + if err := os.Rename(tmp, c.statePath); err != nil { + opts.Diagnostic.Printf("cowork observe: save offsets: %v\n", err) + return + } + c.dirty = false +} + +// spoolRetention is how long a fully-drained spool may sit idle before the +// collector deletes it. Spools hold raw, unredacted tool inputs, so they +// should not accumulate on disk indefinitely. 
+const spoolRetention = time.Hour + +func (c *collector) collect(opts Options, h *health) { + spools, _ := filepath.Glob(filepath.Join(opts.SessionsRoot, "*", "*", "local_*", spoolName)) + live := make(map[string]bool, len(spools)) + for _, spool := range spools { + live[spool] = true + sessionDir := filepath.Dir(spool) + h.spooled[sessionDir] = true + // A spool is ground truth the hook fired: promote the session out of + // unverified into written, since the watcher clearly did load it. + if claudeDir := filepath.Join(sessionDir, ".claude"); h.unverified[claudeDir] { + delete(h.unverified, claudeDir) + h.written[claudeDir] = true + } + c.drain(opts, h, spool) + c.cleanup(opts, spool) + cleanupOrphanDecisions(opts, filepath.Dir(spool)) + } + // Cowork deleted the session dir; its offset entry is dead weight. + for spool := range c.offsets { + if !live[spool] { + delete(c.offsets, spool) + c.dirty = true + } + } +} + +// beforeSpoolRemove, when non-nil, runs inside cleanup after the drained/idle +// checks but before the re-stat guard. It exists only for tests to drive the +// stat/remove race deterministically; production leaves it nil. +var beforeSpoolRemove func(spool string) + +// cleanup removes a spool once it is fully drained and idle past the +// retention window. If the session wakes up again, the hook recreates the +// file and drain starts over from offset zero (the shrink reset). +func (c *collector) cleanup(opts Options, spool string) { + info, err := os.Stat(spool) + if err != nil { + return + } + if time.Since(info.ModTime()) < spoolRetention { + return + } + if c.offsets[spool] != info.Size() { + return // not fully drained yet + } + if beforeSpoolRemove != nil { + beforeSpoolRemove(spool) // test seam: simulate a hook appending in the window + } + // Re-stat immediately before unlinking. 
drain ran earlier this tick, but a + // hook may append between then and now; if the spool grew or its modtime + // advanced since the check above, a fresh (undrained) event just landed, so + // leave the file for the next tick to drain rather than unlink data we + // never replayed. This shrinks — does not fully close — the stat/remove + // window, but the loss it guards against requires an append landing in that + // window after a full hour of spool idleness, so a narrow guard suffices. + if again, err := os.Stat(spool); err != nil || again.Size() != info.Size() || !again.ModTime().Equal(info.ModTime()) { + return + } + if err := os.Remove(spool); err != nil { + opts.Diagnostic.Printf("cowork observe: remove drained spool %s: %v\n", spool, err) + return + } + delete(c.offsets, spool) + c.dirty = true + opts.Diagnostic.Printf("cowork observe: removed drained spool %s\n", spool) +} + +// errMalformed marks spool lines that can never replay successfully; drain +// drops them. Any other replay error is treated as transient (the daemon +// socket hiccuped), so drain stops before the failed line and retries it on +// the next tick. Retrying after a partial send can deliver an event twice; +// for an audit trail, at-least-once beats silently dropping it. +var errMalformed = errors.New("malformed spool line") + +// drain replays complete lines appended since the last tick. The in-VM hook +// may be mid-append while we read, so a trailing chunk without a newline is +// left in place — the offset only ever advances past complete, handled lines. 
+func (c *collector) drain(opts Options, h *health, spool string) { + f, err := os.Open(spool) + if err != nil { + return + } + defer f.Close() + info, err := f.Stat() + if err != nil { + return + } + off := c.offsets[spool] + if info.Size() < off { + off = 0 // spool was recreated; start over on the new file + } + if info.Size() == off { + return + } + if _, err := f.Seek(off, io.SeekStart); err != nil { + return + } + data, err := io.ReadAll(f) + if err != nil { + return + } + consumed := 0 + for { + idx := bytes.IndexByte(data[consumed:], '\n') + if idx < 0 { + break // partial line still being appended; retry next tick + } + line := bytes.TrimSpace(data[consumed : consumed+idx]) + if len(line) > 0 { + if err := c.replay(opts, h, spool, line); err != nil { + if !errors.Is(err, errMalformed) { + opts.Diagnostic.Printf("cowork observe: replay: %v\n", err) + break + } + h.linesDropped++ + opts.Diagnostic.Printf("cowork observe: %v\n", err) + } else { + h.eventsReplayed++ + } + } + consumed += idx + 1 + } + c.setOffset(spool, off+int64(consumed)) +} + +// spoolEnvelope is what the enforce hook appends: the raw event plus the +// hook-generated request ID that names the decision file. Observe-mode lines +// are bare events and unwrap to an empty rid. +type spoolEnvelope struct { + RID string `json:"rid"` + Event json.RawMessage `json:"event"` +} + +func unwrapEnvelope(line []byte) (rid string, payload []byte) { + var env spoolEnvelope + if err := json.Unmarshal(line, &env); err == nil && len(env.Event) > 0 { + return env.RID, env.Event + } + return "", line +} + +// ridPattern bounds what may name a decision file. The rid comes from inside +// the VM, so anything outside this charset (notably path separators) must be +// rejected before it reaches a filepath.Join. 
+var ridPattern = regexp.MustCompile(`^[A-Za-z0-9._-]{1,128}$`) + +// replay decodes a spool line with the same decoder the Claude Code hook path +// uses (Cowork runs the bundled Claude Code CLI, so the formats are identical) +// and forwards it to the daemon socket as agent "cowork". Lines carrying a +// request ID get the daemon's verdict written back as a decision file for the +// waiting enforce hook. +func (c *collector) replay(opts Options, h *health, spool string, line []byte) error { + rid, payload := unwrapEnvelope(line) + event, err := hookruntime.DecodeClaudeEvent(payload, agentName) + if err != nil { + return fmt.Errorf("%w: %v", errMalformed, err) + } + if event.HookName != hook.HookPreToolUse { + return nil // injector only wires PreToolUse + } + event.SessionID = "cowork-" + event.SessionID + req, err := localruntime.EvaluateRequestFromEvent(event) + if err != nil { + return fmt.Errorf("%w: %v", errMalformed, err) + } + res, err := send(opts.SocketPath, req) + if err != nil { + return err + } + if !res.Allowed { + h.denied++ + } + if rid == "" { + return nil // observe hook: nothing is waiting + } + // A failed decision write is logged but not retried: the offset advances + // (the event is ingested) and the waiting hook fails closed on its own. + if !ridPattern.MatchString(rid) { + opts.Diagnostic.Printf("cowork observe: rejected decision request id %q\n", rid) + return nil + } + if err := writeDecision(filepath.Join(filepath.Dir(spool), decisionsDirName), rid, res); err != nil { + opts.Diagnostic.Printf("cowork observe: write decision %s: %v\n", rid, err) + } + return nil +} + +// writeDecision renders the verdict in the exact PreToolUse output shape the +// bundled CLI honors and parks it where the enforce hook is polling. 
+func writeDecision(dir, rid string, res localruntime.EvaluateResult) error { + result := localruntime.ResultFromEvaluateResult(res) + data, err := hookruntime.EncodeClaudeResult(hook.HookPreToolUse.String(), result) + if err != nil { + return err + } + // The decisions dir is created on demand; the file itself lands via + // writeFileAtomic so the polling hook never reads a partial verdict. + if err := os.MkdirAll(dir, 0o755); err != nil { + return err + } + return writeFileAtomic(filepath.Join(dir, rid+".json"), data, 0o644) +} + +// cleanupOrphanDecisions removes decision files nobody consumed (hook killed, +// session gone). The hook deletes its own file on the happy path. A file counts +// as orphaned once its modtime is more than 10 minutes old; removal failures +// are logged and otherwise ignored. +func cleanupOrphanDecisions(opts Options, sessionDir string) { + files, _ := filepath.Glob(filepath.Join(sessionDir, decisionsDirName, "*.json")) + cutoff := time.Now().Add(-10 * time.Minute) + for _, file := range files { + if info, err := os.Stat(file); err == nil && info.ModTime().Before(cutoff) { + if err := os.Remove(file); err != nil { + opts.Diagnostic.Printf("cowork observe: remove orphan decision %s: %v\n", file, err) + } + } + } +} + +// send performs one request/response exchange with the daemon over its unix +// socket: it dials socketPath (5s dial timeout), writes req, and reads the +// reply into the returned result. On error the returned result is not +// meaningful. +func send(socketPath string, req localruntime.EvaluateRequest) (localruntime.EvaluateResult, error) { + var res localruntime.EvaluateResult + conn, err := net.DialTimeout("unix", socketPath, 5*time.Second) + if err != nil { + return res, err + } + defer conn.Close() + // One 10s deadline covers both the write and the read below; a failure to + // set it is ignored and the exchange proceeds without a deadline. + _ = conn.SetDeadline(time.Now().Add(10 * time.Second)) + if err := localruntime.WriteMessage(conn, req); err != nil { + return res, err + } + if err := localruntime.ReadMessage(conn, &res); err != nil { + return res, err + } + return res, nil +} diff --git a/internal/coworkobserve/coworkobserve_shell_test.go b/internal/coworkobserve/coworkobserve_shell_test.go new file mode 100644 index 0000000..482853c --- /dev/null +++ b/internal/coworkobserve/coworkobserve_shell_test.go @@ -0,0 +1,186 @@ +package coworkobserve + +import ( + "bytes" + "encoding/json" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" +) + +// These tests run the actual generated hook command strings under /bin/sh, the +// way Cowork's 
bundled CLI does. They guard the shell behavior the rest of the +// suite cannot reach: $-relative path resolution, rid generation, the spool +// append, the decision poll, the stdout shape, and fail-closed timeout. The +// commands write to ../ (the session dir) because the hook's cwd is the +// session's outputs/ mount, so each test runs sh with cwd = /outputs. + +func shellTestDir(t *testing.T) (sessionDir, outputsDir string) { + t.Helper() + sessionDir = t.TempDir() + outputsDir = filepath.Join(sessionDir, "outputs") + if err := os.MkdirAll(outputsDir, 0o755); err != nil { + t.Fatal(err) + } + return sessionDir, outputsDir +} + +func runHook(t *testing.T, command, cwd, stdin string) (string, error) { + t.Helper() + cmd := exec.Command("sh", "-c", command) + cmd.Dir = cwd + cmd.Stdin = strings.NewReader(stdin) + out, err := cmd.Output() + return string(out), err +} + +func TestShellObserveAppendsToSessionSpool(t *testing.T) { + sessionDir, outputsDir := shellTestDir(t) + spool := filepath.Join(sessionDir, spoolName) + + out, err := runHook(t, observeHookCommand, outputsDir, eventLine("Bash")) + if err != nil { + t.Fatalf("observe hook exited non-zero: %v", err) + } + if out != "" { + t.Fatalf("observe hook wrote to stdout: %q", out) + } + data, err := os.ReadFile(spool) + if err != nil { + t.Fatalf("spool not written at the session dir: %v", err) + } + if string(data) != eventLine("Bash")+"\n" { + t.Fatalf("spool = %q, want the event line", data) + } + + // A second invocation appends rather than truncating. 
+ if _, err := runHook(t, observeHookCommand, outputsDir, eventLine("Read")); err != nil { + t.Fatalf("second observe hook exited non-zero: %v", err) + } + data, _ = os.ReadFile(spool) + if want := eventLine("Bash") + "\n" + eventLine("Read") + "\n"; string(data) != want { + t.Fatalf("spool after two events = %q, want %q", data, want) + } +} + +func TestShellEnforceEmptyStdinDenies(t *testing.T) { + sessionDir, outputsDir := shellTestDir(t) + + out, err := runHook(t, enforceHookCommand, outputsDir, "") + if err != nil { + t.Fatalf("enforce hook exited non-zero: %v", err) + } + if out != denyJSON+"\n" { + t.Fatalf("stdout = %q, want the deny verdict", out) + } + if _, err := os.Stat(filepath.Join(sessionDir, spoolName)); !os.IsNotExist(err) { + t.Fatalf("spool created for empty stdin (err=%v)", err) + } +} + +func TestShellEnforceEmitsParkedDecision(t *testing.T) { + sessionDir, outputsDir := shellTestDir(t) + + cmd := exec.Command("sh", "-c", enforceHookCommand) + cmd.Dir = outputsDir + cmd.Stdin = strings.NewReader(eventLine("Bash")) + var stdout bytes.Buffer + cmd.Stdout = &stdout + if err := cmd.Start(); err != nil { + t.Fatal(err) + } + + // The rid is generated inside the shell, so we cannot predict it. The hook + // writes the envelope (carrying the rid) before it starts polling, so read + // the spool to learn the rid, then park the decision the hook is waiting on. 
+ spool := filepath.Join(sessionDir, spoolName) + var rid string + for i := 0; i < 100 && rid == ""; i++ { + if line := bytes.TrimSpace(readFileMaybe(spool)); len(line) > 0 { + var env spoolEnvelope + if json.Unmarshal(line, &env) == nil { + rid = env.RID + } + } + if rid == "" { + time.Sleep(20 * time.Millisecond) + } + } + if rid == "" { + _ = cmd.Process.Kill() + t.Fatal("hook never spooled an envelope with a rid") + } + if !ridPattern.MatchString(rid) { + _ = cmd.Process.Kill() + t.Fatalf("shell-generated rid %q does not match ridPattern", rid) + } + + decision := `{"hookSpecificOutput":{"hookEventName":"PreToolUse","permissionDecision":"allow","permissionDecisionReason":"ok"}}` + decDir := filepath.Join(sessionDir, decisionsDirName) + if err := os.MkdirAll(decDir, 0o755); err != nil { + t.Fatal(err) + } + decFile := filepath.Join(decDir, rid+".json") + if err := writeFileAtomic(decFile, []byte(decision), 0o644); err != nil { + t.Fatal(err) + } + + done := make(chan error, 1) + go func() { done <- cmd.Wait() }() + select { + case err := <-done: + if err != nil { + t.Fatalf("hook exited non-zero: %v", err) + } + case <-time.After(5 * time.Second): + _ = cmd.Process.Kill() + t.Fatal("hook did not exit after the decision was parked") + } + + if !strings.Contains(stdout.String(), decision) { + t.Fatalf("stdout = %q, want the parked decision", stdout.String()) + } + if _, err := os.Stat(decFile); !os.IsNotExist(err) { + t.Fatalf("hook did not delete its consumed decision file (err=%v)", err) + } +} + +func TestShellEnforceFailsClosedWithoutDecision(t *testing.T) { + // The real poll loop is 100 x 0.1s = 10s. Substitute a 3-iteration loop so + // the test stays fast while exercising the same fail-closed path, and + // assert the real constant still has the 100-count loop so the substitution + // cannot silently drift out of sync. 
+ if !strings.Contains(enforceHookCommand, `"$i" -lt 100`) { + t.Fatal("enforce poll-loop bound changed; update this test's substitution") + } + fast := strings.Replace(enforceHookCommand, `"$i" -lt 100`, `"$i" -lt 3`, 1) + + sessionDir, outputsDir := shellTestDir(t) + out, err := runHook(t, fast, outputsDir, eventLine("Bash")) + if err != nil { + t.Fatalf("hook exited non-zero: %v", err) + } + if out != denyJSON+"\n" { + t.Fatalf("stdout = %q, want the deny verdict", out) + } + // The event is still spooled before the poll loop, so the daemon ingests it + // even though the in-VM call was denied. + data, err := os.ReadFile(filepath.Join(sessionDir, spoolName)) + if err != nil { + t.Fatalf("spool not written before the poll loop: %v", err) + } + if !bytes.Contains(data, []byte(`"rid"`)) { + t.Fatalf("envelope not spooled: %s", data) + } +} + +func readFileMaybe(path string) []byte { + data, err := os.ReadFile(path) + if err != nil { + return nil + } + return data +} diff --git a/internal/coworkobserve/coworkobserve_test.go b/internal/coworkobserve/coworkobserve_test.go new file mode 100644 index 0000000..0044529 --- /dev/null +++ b/internal/coworkobserve/coworkobserve_test.go @@ -0,0 +1,646 @@ +package coworkobserve + +import ( + "bytes" + "encoding/json" + "io" + "net" + "os" + "path/filepath" + "sync" + "testing" + "time" + + "github.com/kontext-security/kontext-cli/internal/diagnostic" + guardhookruntime "github.com/kontext-security/kontext-cli/internal/guard/hookruntime" + "github.com/kontext-security/kontext-cli/internal/localruntime" +) + +// fakeDaemon accepts localruntime evaluate requests on a unix socket and +// records them, mimicking the managed-observe daemon. 
+type fakeDaemon struct {
+	listener net.Listener
+	mu       sync.Mutex // guards requests and result
+	requests []localruntime.EvaluateRequest
+	result   *localruntime.EvaluateResult // nil means "answer with the default allow"
+}
+
+// setResult overrides the verdict the fake daemon answers with from now on.
+func (d *fakeDaemon) setResult(res localruntime.EvaluateResult) {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	d.result = &res
+}
+
+// startFakeDaemon listens on a unix socket inside a fresh temp dir and returns
+// the daemon together with the socket path. Every accepted connection is served
+// one request: it is recorded, then answered with the result set via setResult,
+// or with an allow verdict when none was set. The temp dir and the listener are
+// released through t.Cleanup; closing the listener ends the accept loop.
+func startFakeDaemon(t *testing.T) (*fakeDaemon, string) {
+	t.Helper()
+	// NOTE(review): presumably os.MkdirTemp with a short prefix is used instead
+	// of t.TempDir() to keep the unix socket path short — confirm.
+	dir, err := os.MkdirTemp("", "kx")
+	if err != nil {
+		t.Fatal(err)
+	}
+	t.Cleanup(func() { _ = os.RemoveAll(dir) })
+	socketPath := filepath.Join(dir, "s.sock")
+	listener, err := net.Listen("unix", socketPath)
+	if err != nil {
+		t.Fatalf("listen: %v", err)
+	}
+	d := &fakeDaemon{listener: listener}
+	t.Cleanup(func() { _ = listener.Close() })
+	go func() {
+		for {
+			conn, err := listener.Accept()
+			if err != nil {
+				return
+			}
+			go func(conn net.Conn) {
+				defer conn.Close()
+				var req localruntime.EvaluateRequest
+				if err := localruntime.ReadMessage(conn, &req); err != nil {
+					return
+				}
+				d.mu.Lock()
+				d.requests = append(d.requests, req)
+				res := localruntime.EvaluateResult{Type: "result", Decision: "allow", Allowed: true}
+				if d.result != nil {
+					res = *d.result
+				}
+				d.mu.Unlock()
+				_ = localruntime.WriteMessage(conn, res)
+			}(conn)
+		}
+	}()
+	return d, socketPath
+}
+
+// toolNames returns the ToolName of every recorded request, in the order the
+// requests were recorded.
+func (d *fakeDaemon) toolNames() []string {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	names := make([]string, 0, len(d.requests))
+	for _, req := range d.requests {
+		names = append(names, req.ToolName)
+	}
+	return names
+}
+
+// testOptions builds Options that target socketPath, use a fresh temp dir as
+// SessionsRoot and send diagnostics to io.Discard.
+func testOptions(t *testing.T, socketPath string) Options {
+	t.Helper()
+	return Options{
+		SocketPath:   socketPath,
+		SessionsRoot: t.TempDir(),
+		Diagnostic:   diagnostic.New(io.Discard, false),
+	}
+}
+
+// writeSpool appends content to the file at path, creating it if necessary.
+func writeSpool(t *testing.T, path, content string) {
+	t.Helper()
+	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer f.Close()
+	if _, err := f.WriteString(content); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// eventLine returns a snake_case PreToolUse event (session "s1") for the given
+// tool as a single JSON line without a trailing newline; tool_use_id is
+// "tu-" + tool.
+func eventLine(tool string) string {
+	return `{"session_id":"s1","hook_event_name":"PreToolUse","tool_name":"` + tool + `","tool_input":{"command":"x"},"tool_use_id":"tu-` + tool + `","cwd":"/w"}`
+}
+
+// Merging into settings that already hold foreign content must keep that
+// content, add our hook alongside the existing PreToolUse entry, and be a no-op
+// when repeated.
+func TestMergeSettingsPreservesExistingContent(t *testing.T) {
+	existing := []byte(`{"model":"opus","hooks":{"PreToolUse":[{"matcher":"Bash","hooks":[{"type":"command","command":"echo hi"}]}],"Stop":[{"hooks":[{"type":"command","command":"echo bye"}]}]}}`)
+	merged, needed := mergeSettings(existing, hookEntry(guardhookruntime.ModeObserve))
+	if !needed {
+		t.Fatal("mergeSettings reported no write needed for foreign settings")
+	}
+	var settings map[string]any
+	if err := json.Unmarshal(merged, &settings); err != nil {
+		t.Fatalf("merged settings are not valid JSON: %v", err)
+	}
+	if settings["model"] != "opus" {
+		t.Fatalf("model dropped: %v", settings["model"])
+	}
+	hooks := settings["hooks"].(map[string]any)
+	if _, ok := hooks["Stop"]; !ok {
+		t.Fatal("Stop hooks dropped")
+	}
+	pre := hooks["PreToolUse"].([]any)
+	if len(pre) != 2 {
+		t.Fatalf("PreToolUse entries = %d, want existing + ours", len(pre))
+	}
+	if !bytes.Contains(merged, []byte("echo hi")) || !bytes.Contains(merged, []byte(settingsMark)) {
+		t.Fatal("merged settings missing existing hook or our spool hook")
+	}
+
+	// A second merge is a no-op.
+	if _, needed := mergeSettings(merged, hookEntry(guardhookruntime.ModeObserve)); needed {
+		t.Fatal("mergeSettings wants to rewrite settings that already carry the hook")
+	}
+}
+
+// Missing, blank and unparsable settings must all yield valid JSON that
+// carries our hook.
+func TestMergeSettingsFromEmptyAndInvalid(t *testing.T) {
+	for _, existing := range [][]byte{nil, []byte(" "), []byte("{broken")} {
+		merged, needed := mergeSettings(existing, hookEntry(guardhookruntime.ModeObserve))
+		if !needed {
+			t.Fatalf("mergeSettings(%q) reported no write needed", existing)
+		}
+		var settings map[string]any
+		if err := json.Unmarshal(merged, &settings); err != nil {
+			t.Fatalf("merged settings are not valid JSON: %v", err)
+		}
+		if !bytes.Contains(merged, []byte(settingsMark)) {
+			t.Fatal("merged settings missing spool hook")
+		}
+	}
+}
+
+// Switching observe -> enforce must replace our previous hook entry instead of
+// adding a second one, and a repeated enforce merge must be a no-op.
+func TestMergeSettingsReplacesStaleVariantOnModeSwitch(t *testing.T) {
+	observed, needed := mergeSettings(nil, hookEntry(guardhookruntime.ModeObserve))
+	if !needed {
+		t.Fatal("initial observe merge reported no write needed")
+	}
+
+	enforced, needed := mergeSettings(observed, hookEntry(guardhookruntime.ModeEnforce))
+	if !needed {
+		t.Fatal("mode switch reported no write needed")
+	}
+	var settings map[string]any
+	if err := json.Unmarshal(enforced, &settings); err != nil {
+		t.Fatalf("merged settings are not valid JSON: %v", err)
+	}
+	pre := settings["hooks"].(map[string]any)["PreToolUse"].([]any)
+	if len(pre) != 1 {
+		t.Fatalf("PreToolUse entries = %d, want stale observe variant replaced", len(pre))
+	}
+	if !bytes.Contains(enforced, []byte(decisionsDirName)) {
+		t.Fatal("enforce variant missing decision poll")
+	}
+	if _, needed := mergeSettings(enforced, hookEntry(guardhookruntime.ModeEnforce)); needed {
+		t.Fatal("repeat enforce merge should be a no-op")
+	}
+}
+
+// End to end: inject writes settings.json into a new session's .claude dir and
+// records it in health; collect then replays the spooled event to the daemon.
+func TestInjectAndHealthTracking(t *testing.T) {
+	daemon, socketPath := startFakeDaemon(t)
+	opts := testOptions(t, socketPath)
+	sessionDir := filepath.Join(opts.SessionsRoot, "acct", "ws", "local_abc")
+	claudeDir := filepath.Join(sessionDir, ".claude")
+	if err := os.MkdirAll(claudeDir, 0o755); err != nil {
+		t.Fatal(err)
+	}
+
+	h := newHealth()
+	inject(opts, h)
+	if len(h.sessionsSeen) != 1 || len(h.written) != 1 {
+		t.Fatalf("seen=%d written=%d, want 1/1", len(h.sessionsSeen), len(h.written))
+	}
+	settings, err := os.ReadFile(filepath.Join(claudeDir, "settings.json"))
+	if err != nil {
+		t.Fatalf("settings.json not written: %v", err)
+	}
+	if !bytes.Contains(settings, []byte(settingsMark)) {
+		t.Fatalf("settings.json missing spool hook: %s", settings)
+	}
+
+	writeSpool(t, filepath.Join(sessionDir, spoolName), eventLine("Bash")+"\n")
+	c := &collector{offsets: map[string]int64{}}
+	c.collect(opts, h)
+	if len(h.spooled) != 1 || h.eventsReplayed != 1 {
+		t.Fatalf("spooled=%d replayed=%d, want 1/1", len(h.spooled), h.eventsReplayed)
+	}
+	if got := daemon.toolNames(); len(got) != 1 || got[0] != "Bash" {
+		t.Fatalf("replayed tools = %v, want [Bash]", got)
+	}
+}
+
+// A trailing line without its newline must stay unconsumed: the offset stops
+// at the end of the last complete line and the rest replays once completed.
+func TestDrainLeavesPartialTrailingLine(t *testing.T) {
+	daemon, socketPath := startFakeDaemon(t)
+	opts := testOptions(t, socketPath)
+	spool := filepath.Join(t.TempDir(), spoolName)
+
+	writeSpool(t, spool, eventLine("Bash")+"\n"+`{"session_id":"s1","hook_event`)
+	c := &collector{offsets: map[string]int64{}}
+	c.drain(opts, newHealth(), spool)
+
+	if got := daemon.toolNames(); len(got) != 1 || got[0] != "Bash" {
+		t.Fatalf("replayed tools = %v, want [Bash]", got)
+	}
+	wantOffset := int64(len(eventLine("Bash")) + 1)
+	if c.offsets[spool] != wantOffset {
+		t.Fatalf("offset = %d, want %d (end of last complete line)", c.offsets[spool], wantOffset)
+	}
+
+	// Completing the partial line replays it on the next tick.
+	writeSpool(t, spool, `_name":"PreToolUse","tool_name":"Read","tool_use_id":"tu-2"}`+"\n")
+	c.drain(opts, newHealth(), spool)
+	if got := daemon.toolNames(); len(got) != 2 || got[1] != "Read" {
+		t.Fatalf("replayed tools = %v, want [Bash Read]", got)
+	}
+}
+
+// While the daemon socket is unreachable the offset must not move; once a
+// daemon is available the same events are delivered in order.
+func TestDrainHaltsOnTransportErrorAndRetries(t *testing.T) {
+	dir, err := os.MkdirTemp("", "kx")
+	if err != nil {
+		t.Fatal(err)
+	}
+	t.Cleanup(func() { _ = os.RemoveAll(dir) })
+	// Nothing listens on this path, so every dial fails.
+	deadSocket := filepath.Join(dir, "dead.sock")
+	opts := testOptions(t, deadSocket)
+	spool := filepath.Join(t.TempDir(), spoolName)
+	writeSpool(t, spool, eventLine("Bash")+"\n"+eventLine("Read")+"\n")
+
+	c := &collector{offsets: map[string]int64{}}
+	c.drain(opts, newHealth(), spool)
+	if c.offsets[spool] != 0 {
+		t.Fatalf("offset advanced to %d past undelivered events", c.offsets[spool])
+	}
+
+	// Daemon comes back; both events are delivered in order.
+	daemon, socketPath := startFakeDaemon(t)
+	opts.SocketPath = socketPath
+	c.drain(opts, newHealth(), spool)
+	if got := daemon.toolNames(); len(got) != 2 || got[0] != "Bash" || got[1] != "Read" {
+		t.Fatalf("replayed tools = %v, want [Bash Read]", got)
+	}
+}
+
+// A line that is not valid JSON must be skipped for good: the following event
+// is still delivered and a second drain sends nothing more.
+func TestDrainSkipsMalformedLines(t *testing.T) {
+	daemon, socketPath := startFakeDaemon(t)
+	opts := testOptions(t, socketPath)
+	spool := filepath.Join(t.TempDir(), spoolName)
+	writeSpool(t, spool, "{not json\n"+eventLine("Bash")+"\n")
+
+	c := &collector{offsets: map[string]int64{}}
+	c.drain(opts, newHealth(), spool)
+	if got := daemon.toolNames(); len(got) != 1 || got[0] != "Bash" {
+		t.Fatalf("replayed tools = %v, want [Bash]", got)
+	}
+	c.drain(opts, newHealth(), spool)
+	if got := daemon.toolNames(); len(got) != 1 {
+		t.Fatalf("malformed line was retried: %v", got)
+	}
+}
+
+// Offsets saved by one collector must be honored by a new collector created
+// from the same state file.
+func TestOffsetsPersistAcrossRestarts(t *testing.T) {
+	daemon, socketPath := startFakeDaemon(t)
+	opts := testOptions(t, socketPath)
+	statePath := filepath.Join(t.TempDir(), "offsets.json")
+	spool := filepath.Join(t.TempDir(), spoolName)
+	writeSpool(t, spool, eventLine("Bash")+"\n")
+
+	c := newCollector(statePath)
+	c.drain(opts, newHealth(), spool)
+	c.saveOffsets(opts)
+
+	// A restarted collector must not re-replay the already-ingested event.
+	restarted := newCollector(statePath)
+	restarted.drain(opts, newHealth(), spool)
+	if got := daemon.toolNames(); len(got) != 1 {
+		t.Fatalf("replayed tools after restart = %v, want exactly one Bash", got)
+	}
+
+	// New events appended after the restart still flow.
+	writeSpool(t, spool, eventLine("Grep")+"\n")
+	restarted.drain(opts, newHealth(), spool)
+	if got := daemon.toolNames(); len(got) != 2 || got[1] != "Grep" {
+		t.Fatalf("replayed tools = %v, want [Bash Grep]", got)
+	}
+}
+
+// A camelCase payload must decode into the same request fields, with the
+// session ID prefixed "cowork-", the agent set to "cowork" and the
+// permission mode carried through.
+func TestReplayDecodesCamelCaseAndPermissionMode(t *testing.T) {
+	daemon, socketPath := startFakeDaemon(t)
+	opts := testOptions(t, socketPath)
+	spool := filepath.Join(t.TempDir(), spoolName)
+	writeSpool(t, spool, `{"sessionId":"s9","hookEventName":"PreToolUse","toolName":"Bash","toolInput":{"command":"ls"},"toolUseId":"tu-9","cwd":"/w","permission_mode":"acceptEdits"}`+"\n")
+
+	c := &collector{offsets: map[string]int64{}}
+	c.drain(opts, newHealth(), spool)
+
+	// Hold the lock for the rest of the test while reading daemon.requests.
+	daemon.mu.Lock()
+	defer daemon.mu.Unlock()
+	if len(daemon.requests) != 1 {
+		t.Fatalf("requests = %d, want 1", len(daemon.requests))
+	}
+	req := daemon.requests[0]
+	if req.SessionID != "cowork-s9" || req.Agent != "cowork" {
+		t.Fatalf("session/agent = %q/%q", req.SessionID, req.Agent)
+	}
+	if req.ToolName != "Bash" || req.ToolUseID != "tu-9" {
+		t.Fatalf("camelCase fields dropped: tool=%q toolUseID=%q", req.ToolName, req.ToolUseID)
+	}
+	if req.PermissionMode != "acceptEdits" {
+		t.Fatalf("permission mode dropped: %q", req.PermissionMode)
+	}
+	if !bytes.Contains(req.ToolInput, []byte(`"ls"`)) {
+		t.Fatalf("tool input dropped: %s", req.ToolInput)
+	}
+}
+
+// A fully drained spool is kept while fresh and removed, together with its
+// offset entry, once its modtime is older than the retention window.
+func TestCollectCleansUpDrainedIdleSpools(t *testing.T) {
+	_, socketPath := startFakeDaemon(t)
+	opts := testOptions(t, socketPath)
+	sessionDir := filepath.Join(opts.SessionsRoot, "acct", "ws", "local_abc")
+	if err := os.MkdirAll(sessionDir, 0o755); err != nil {
+		t.Fatal(err)
+	}
+	spool := filepath.Join(sessionDir, spoolName)
+	writeSpool(t, spool, eventLine("Bash")+"\n")
+
+	c := &collector{offsets: map[string]int64{}}
+	h := newHealth()
+	c.collect(opts, h)
+	if _, err := os.Stat(spool); err != nil {
+		t.Fatalf("fresh drained spool was removed: %v", err)
+	}
+
+	// Once idle past the retention window, the drained spool is deleted and
+	// its offset entry dropped.
+	old := time.Now().Add(-2 * spoolRetention)
+	if err := os.Chtimes(spool, old, old); err != nil {
+		t.Fatal(err)
+	}
+	c.collect(opts, h)
+	if _, err := os.Stat(spool); !os.IsNotExist(err) {
+		t.Fatalf("idle drained spool still present (err=%v)", err)
+	}
+	if _, ok := c.offsets[spool]; ok {
+		t.Fatal("offset entry for removed spool not pruned")
+	}
+}
+
+// An old spool that could not be drained must survive collect.
+func TestCollectKeepsUndrainedSpools(t *testing.T) {
+	dir, err := os.MkdirTemp("", "kx")
+	if err != nil {
+		t.Fatal(err)
+	}
+	t.Cleanup(func() { _ = os.RemoveAll(dir) })
+	opts := testOptions(t, filepath.Join(dir, "dead.sock")) // daemon down: nothing drains
+	sessionDir := filepath.Join(opts.SessionsRoot, "acct", "ws", "local_abc")
+	if err := os.MkdirAll(sessionDir, 0o755); err != nil {
+		t.Fatal(err)
+	}
+	spool := filepath.Join(sessionDir, spoolName)
+	writeSpool(t, spool, eventLine("Bash")+"\n")
+	old := time.Now().Add(-2 * spoolRetention)
+	if err := os.Chtimes(spool, old, old); err != nil {
+		t.Fatal(err)
+	}
+
+	c := &collector{offsets: map[string]int64{}}
+	c.collect(opts, newHealth())
+	if _, err := os.Stat(spool); err != nil {
+		t.Fatalf("undrained spool was removed: %v", err)
+	}
+}
+
+// A spool line wrapped in an envelope with a rid must produce
+// <decisions dir>/<rid>.json carrying the daemon's verdict and reason, and a
+// deny must bump the denied counter.
+func TestEnforceReplayWritesDecisionFile(t *testing.T) {
+	daemon, socketPath := startFakeDaemon(t)
+	daemon.setResult(localruntime.EvaluateResult{
+		Type:     "result",
+		Decision: "deny",
+		Allowed:  false,
+		Reason:   "blocked by policy",
+	})
+	opts := testOptions(t, socketPath)
+	sessionDir := filepath.Join(opts.SessionsRoot, "acct", "ws", "local_abc")
+	if err := os.MkdirAll(sessionDir, 0o755); err != nil {
+		t.Fatal(err)
+	}
+	spool := filepath.Join(sessionDir, spoolName)
+	writeSpool(t, spool, `{"rid":"123-456","event":`+eventLine("Bash")+`}`+"\n")
+
+	c := &collector{offsets: map[string]int64{}}
+	h := newHealth()
+	c.collect(opts, h)
+
+	decision, err := os.ReadFile(filepath.Join(sessionDir, decisionsDirName, "123-456.json"))
+	if err != nil {
+		t.Fatalf("decision file not written: %v", err)
+	}
+	var out map[string]any
+	if err := json.Unmarshal(decision, &out); err != nil {
+		t.Fatalf("decision is not valid JSON: %v", err)
+	}
+	specific := out["hookSpecificOutput"].(map[string]any)
+	if specific["permissionDecision"] != "deny" {
+		t.Fatalf("permissionDecision = %v, want deny", specific["permissionDecision"])
+	}
+	if specific["permissionDecisionReason"] != "blocked by policy" {
+		t.Fatalf("reason = %v", specific["permissionDecisionReason"])
+	}
+	if h.denied != 1 {
+		t.Fatalf("denied counter = %d, want 1", h.denied)
+	}
+}
+
+// A bare event line (no envelope, hence no rid) must not create the decisions
+// dir at all.
+func TestObserveLinesWriteNoDecision(t *testing.T) {
+	_, socketPath := startFakeDaemon(t)
+	opts := testOptions(t, socketPath)
+	sessionDir := filepath.Join(opts.SessionsRoot, "acct", "ws", "local_abc")
+	if err := os.MkdirAll(sessionDir, 0o755); err != nil {
+		t.Fatal(err)
+	}
+	writeSpool(t, filepath.Join(sessionDir, spoolName), eventLine("Bash")+"\n")
+
+	c := &collector{offsets: map[string]int64{}}
+	c.collect(opts, newHealth())
+	if _, err := os.Stat(filepath.Join(sessionDir, decisionsDirName)); !os.IsNotExist(err) {
+		t.Fatalf("decisions dir created for observe-mode line (err=%v)", err)
+	}
+}
+
+// A rid containing a path separator must never reach the filesystem, while the
+// event it accompanies is still sent to the daemon.
+func TestReplayRejectsUnsafeRequestIDs(t *testing.T) {
+	daemon, socketPath := startFakeDaemon(t)
+	opts := testOptions(t, socketPath)
+	sessionDir := filepath.Join(opts.SessionsRoot, "acct", "ws", "local_abc")
+	if err := os.MkdirAll(sessionDir, 0o755); err != nil {
+		t.Fatal(err)
+	}
+	spool := filepath.Join(sessionDir, spoolName)
+	writeSpool(t, spool, `{"rid":"../escape","event":`+eventLine("Bash")+`}`+"\n")
+
+	c := &collector{offsets: map[string]int64{}}
+	c.collect(opts, newHealth())
+
+	// The event is still ingested, but no decision file may be written for a
+	// rid that could steer the path (".." alone has no separator, but the
+	// pattern rejects "/" outright).
+	if got := daemon.toolNames(); len(got) != 1 {
+		t.Fatalf("replayed tools = %v, want the event ingested", got)
+	}
+	if _, err := os.Stat(filepath.Join(sessionDir, decisionsDirName)); !os.IsNotExist(err) {
+		t.Fatalf("decision written for unsafe rid (err=%v)", err)
+	}
+}
+
+// When the spool file is replaced by one shorter than the stored offset, drain
+// must start over from the beginning of the new file.
+func TestDrainResetsOffsetWhenSpoolShrinks(t *testing.T) {
+	daemon, socketPath := startFakeDaemon(t)
+	opts := testOptions(t, socketPath)
+	spool := filepath.Join(t.TempDir(), spoolName)
+	writeSpool(t, spool, eventLine("Bash")+"\n")
+
+	c := &collector{offsets: map[string]int64{}}
+	c.drain(opts, newHealth(), spool)
+
+	// Spool recreated (e.g. cleaned up and the hook started a fresh one). The
+	// fresh file is smaller than the old offset, which signals the reset.
+	if err := os.Remove(spool); err != nil {
+		t.Fatal(err)
+	}
+	writeSpool(t, spool, `{"session_id":"s1","hook_event_name":"PreToolUse","tool_name":"Read","tool_use_id":"t"}`+"\n")
+	c.drain(opts, newHealth(), spool)
+	if got := daemon.toolNames(); len(got) != 2 || got[1] != "Read" {
+		t.Fatalf("replayed tools = %v, want [Bash Read]", got)
+	}
+}
+
+// A session already running when the daemon comes up has a long-frozen .claude
+// dir modtime, so the steady-state inject skips it. The startup pass must still
+// re-merge it to the configured mode — this is both the late-daemon-start and
+// the observe->enforce mode-switch case, since a mode change needs a restart.
+func TestReinjectExistingRemergesRunningSessionOnRestart(t *testing.T) {
+	_, socketPath := startFakeDaemon(t)
+	opts := testOptions(t, socketPath)
+	claudeDir := filepath.Join(opts.SessionsRoot, "acct", "ws", "local_abc", ".claude")
+	settingsPath := filepath.Join(claudeDir, "settings.json")
+	if err := os.MkdirAll(claudeDir, 0o755); err != nil {
+		t.Fatal(err)
+	}
+
+	// Seed the session with the observe hook first, then push the dir's
+	// modtime half an hour into the past (writing the file would otherwise
+	// refresh it).
+	observeSettings, _ := mergeSettings(nil, hookEntry(guardhookruntime.ModeObserve))
+	if err := os.WriteFile(settingsPath, observeSettings, 0o644); err != nil {
+		t.Fatal(err)
+	}
+	frozenAt := time.Now().Add(-30 * time.Minute)
+	if err := os.Chtimes(claudeDir, frozenAt, frozenAt); err != nil {
+		t.Fatal(err)
+	}
+
+	// From here on the daemon is configured for enforce.
+	opts.Mode = guardhookruntime.ModeEnforce
+
+	// The steady-state pass must leave a session this old alone.
+	steady := newHealth()
+	inject(opts, steady)
+	if seen := len(steady.sessionsSeen); seen != 0 {
+		t.Fatalf("steady-state inject visited a frozen session: seen=%d", seen)
+	}
+
+	// The startup pass has to reach it, record it in health, and leave the
+	// enforce variant of the hook in settings.json.
+	startup := newHealth()
+	reinjectExisting(opts, startup)
+	seen, written := len(startup.sessionsSeen), len(startup.written)
+	if seen != 1 || written != 1 {
+		t.Fatalf("startup pass did not record the running session: seen=%d written=%d", seen, written)
+	}
+	settings, err := os.ReadFile(settingsPath)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !bytes.Contains(settings, []byte(decisionsDirName)) {
+		t.Fatalf("startup pass did not re-merge to the enforce hook: %s", settings)
+	}
+}
+
+// The startup pass must not write into the pile of long-dead session dirs
+// Cowork leaves behind.
+func TestReinjectExistingSkipsAbandonedSessions(t *testing.T) { + _, socketPath := startFakeDaemon(t) + opts := testOptions(t, socketPath) + claudeDir := filepath.Join(opts.SessionsRoot, "acct", "ws", "local_old", ".claude") + if err := os.MkdirAll(claudeDir, 0o755); err != nil { + t.Fatal(err) + } + ancient := time.Now().Add(-2 * reinjectWindow) + if err := os.Chtimes(claudeDir, ancient, ancient); err != nil { + t.Fatal(err) + } + + h := newHealth() + reinjectExisting(opts, h) + if len(h.sessionsSeen) != 0 { + t.Fatalf("startup pass touched an abandoned session: seen=%d", len(h.sessionsSeen)) + } + if _, err := os.Stat(filepath.Join(claudeDir, "settings.json")); !os.IsNotExist(err) { + t.Fatal("startup pass wrote settings into an abandoned session dir") + } +} + +// When the startup pass writes a first-time hook onto a session that may +// already be running (no hook of ours was there), it cannot know the CLI's +// settings watcher ever watched the dir, so it must record the session as +// unverified, not working. A later spool is ground truth and promotes it. +func TestReinjectExistingFlagsFreshHookUnverifiedUntilSpooled(t *testing.T) { + _, socketPath := startFakeDaemon(t) + opts := testOptions(t, socketPath) + sessionDir := filepath.Join(opts.SessionsRoot, "acct", "ws", "local_pre") + claudeDir := filepath.Join(sessionDir, ".claude") + if err := os.MkdirAll(claudeDir, 0o755); err != nil { + t.Fatal(err) + } + // Recent (within the window) but with no hook of ours: the daemon was down + // when this session's CLI started, so we are creating its first settings + // file only now. 
+ stale := time.Now().Add(-30 * time.Minute) + if err := os.Chtimes(claudeDir, stale, stale); err != nil { + t.Fatal(err) + } + + h := newHealth() + reinjectExisting(opts, h) + if len(h.written) != 0 { + t.Fatalf("first-time hook on a pre-existing session was trusted: written=%d", len(h.written)) + } + if len(h.unverified) != 1 { + t.Fatalf("first-time hook not flagged unverified: unverified=%d", len(h.unverified)) + } + // Still written best-effort, in case the dir happens to be watched. + if _, err := os.Stat(filepath.Join(claudeDir, "settings.json")); err != nil { + t.Fatalf("best-effort settings.json not written: %v", err) + } + + // A spool confirms the hook actually fires: promote to written, clear unverified. + writeSpool(t, filepath.Join(sessionDir, spoolName), eventLine("Bash")+"\n") + c := &collector{offsets: map[string]int64{}} + c.collect(opts, h) + if len(h.unverified) != 0 || !h.written[claudeDir] { + t.Fatalf("spool did not confirm the session: unverified=%d written=%v", len(h.unverified), h.written[claudeDir]) + } +} + +// cleanup must not unlink a drained, idle spool if a hook appends a fresh event +// in the window between the drained check and the remove — the re-stat guard +// has to catch the change and leave the file for the next tick to drain. +func TestCleanupSkipsSpoolAppendedInRemoveWindow(t *testing.T) { + _, socketPath := startFakeDaemon(t) + opts := testOptions(t, socketPath) + spool := filepath.Join(t.TempDir(), spoolName) + writeSpool(t, spool, eventLine("Bash")+"\n") + + c := &collector{offsets: map[string]int64{}} + c.drain(opts, newHealth(), spool) // offset now == size: fully drained + old := time.Now().Add(-2 * spoolRetention) + if err := os.Chtimes(spool, old, old); err != nil { + t.Fatal(err) + } + + // Simulate a hook landing a new event in the stat/remove window. 
+ beforeSpoolRemove = func(s string) { + writeSpool(t, s, eventLine("Read")+"\n") + } + t.Cleanup(func() { beforeSpoolRemove = nil }) + + c.cleanup(opts, spool) + if _, err := os.Stat(spool); err != nil { + t.Fatalf("spool unlinked despite an append in the remove window: %v", err) + } + + // Without further appends, the next cleanup tick removes it normally. + beforeSpoolRemove = nil + c.drain(opts, newHealth(), spool) // drain the event that landed + if err := os.Chtimes(spool, old, old); err != nil { + t.Fatal(err) + } + c.cleanup(opts, spool) + if _, err := os.Stat(spool); !os.IsNotExist(err) { + t.Fatalf("drained idle spool not removed on a quiet tick (err=%v)", err) + } +} diff --git a/internal/managedconfig/config.go b/internal/managedconfig/config.go index 83d5c1b..e6dc5cf 100644 --- a/internal/managedconfig/config.go +++ b/internal/managedconfig/config.go @@ -21,8 +21,11 @@ import ( const ( Version = "managed-install-v1" - Mode = "observe" - Agent = "claude" + // Mode is the default posture; ModeEnforce turns daemon decisions into + // real denies at every hook edge (Claude Code and Cowork alike). + Mode = "observe" + ModeEnforce = "enforce" + Agent = "claude" DefaultPath = "/Library/Application Support/Kontext/managed.json" EnvPath = "KONTEXT_MANAGED_CONFIG" @@ -44,6 +47,11 @@ type Config struct { Agent string `json:"agent"` Credentials Credentials `json:"credentials"` Device Device `json:"device,omitempty"` + // CoworkEnabled turns on Claude Cowork observation/enforcement in the + // managed-observe daemon (the posture follows Mode). A managed.json field + // so MDM-deployed installs control it through config rather than launchd + // environment plumbing. 
+ CoworkEnabled bool `json:"cowork_enabled,omitempty"` } type Credentials struct { @@ -201,8 +209,8 @@ func normalizeAndValidate(cfg Config) (Config, error) { if err := validateCloudURL(cfg.CloudURL); err != nil { return Config{}, err } - if cfg.Mode != Mode { - return Config{}, fmt.Errorf("mode must be %q", Mode) + if cfg.Mode != Mode && cfg.Mode != ModeEnforce { + return Config{}, fmt.Errorf("mode must be %q or %q", Mode, ModeEnforce) } if cfg.Agent != Agent { return Config{}, fmt.Errorf("agent must be %q", Agent) diff --git a/internal/managedconfig/config_test.go b/internal/managedconfig/config_test.go index b10efad..6ea482e 100644 --- a/internal/managedconfig/config_test.go +++ b/internal/managedconfig/config_test.go @@ -44,6 +44,38 @@ func TestParseValidConfigNormalizesStrings(t *testing.T) { } } +func TestParseCoworkEnabled(t *testing.T) { + cfg, err := Parse([]byte(validConfigJSON())) + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + if cfg.CoworkEnabled { + t.Fatal("cowork_enabled should default to false") + } + withFlag := strings.Replace(validConfigJSON(), `"mode": "observe",`, `"mode": "observe", + "cowork_enabled": true,`, 1) + cfg, err = Parse([]byte(withFlag)) + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + if !cfg.CoworkEnabled { + t.Fatal("cowork_enabled = false, want true") + } +} + +func TestParseModeObserveAndEnforce(t *testing.T) { + cfg, err := Parse([]byte(strings.Replace(validConfigJSON(), `"mode": "observe"`, `"mode": "enforce"`, 1))) + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + if cfg.Mode != ModeEnforce { + t.Fatalf("Mode = %q, want %q", cfg.Mode, ModeEnforce) + } + if _, err := Parse([]byte(strings.Replace(validConfigJSON(), `"mode": "observe"`, `"mode": "block"`, 1))); err == nil { + t.Fatal("Parse() accepted unknown mode") + } +} + func TestParseRejectsUnknownFields(t *testing.T) { _, err := Parse([]byte(`{ "version": "managed-install-v1", diff --git a/internal/managedobserve/daemon.go 
b/internal/managedobserve/daemon.go index 9bd5e66..c230e89 100644 --- a/internal/managedobserve/daemon.go +++ b/internal/managedobserve/daemon.go @@ -5,8 +5,10 @@ import ( "errors" "fmt" "net/http" + "path/filepath" "time" + "github.com/kontext-security/kontext-cli/internal/coworkobserve" "github.com/kontext-security/kontext-cli/internal/diagnostic" guardhookruntime "github.com/kontext-security/kontext-cli/internal/guard/hookruntime" "github.com/kontext-security/kontext-cli/internal/guard/store/sqlite" @@ -58,11 +60,18 @@ func RunDaemon(ctx context.Context, opts DaemonOptions) error { opts.Diagnostic.Printf("managed observe cleanup: %v\n", err) } + // The deployment-level mode from managed.json drives every hook edge: + // observe records would-decisions, enforce returns real denies. + mode, err := guardhookruntime.ParseMode(loadedConfig.Config.Mode) + if err != nil { + return fmt.Errorf("parse managed mode: %w", err) + } + host, err := runtimehost.Start(ctx, runtimehost.Options{ AgentName: managedconfig.Agent, DBPath: dbPath, SocketPath: socketPath, - Mode: guardhookruntime.ModeObserve, + Mode: mode, Diagnostic: opts.Diagnostic, SkipInitialSession: true, DisableAsyncIngest: true, @@ -91,6 +100,18 @@ func RunDaemon(ctx context.Context, opts DaemonOptions) error { }) }() + // Cowork observation runs alongside Claude Code in the same daemon, replaying + // in-VM Cowork tool events into the same localruntime socket as agent "cowork". + // Enabled via managed.json (cowork_enabled) or the env var override. + if loadedConfig.Config.CoworkEnabled || coworkobserve.Enabled() { + go coworkobserve.Run(ctx, coworkobserve.Options{ + SocketPath: socketPath, + StatePath: filepath.Join(filepath.Dir(dbPath), "cowork-spool-offsets.json"), + Mode: mode, + Diagnostic: opts.Diagnostic, + }) + } + idleTimeout := opts.IdleTimeout if idleTimeout == 0 { idleTimeout = DefaultIdleTimeout()