diff --git a/internal/mcp/mcp.go b/internal/mcp/mcp.go index ace26ff..bc54430 100644 --- a/internal/mcp/mcp.go +++ b/internal/mcp/mcp.go @@ -96,6 +96,7 @@ var ProfileAgent = map[string]bool{ "mem_update": true, // update observation by ID — skills say "use mem_update when you have an exact ID to correct" "mem_current_project": true, // detect current project — recommended first call for agents (REQ-313) "mem_judge": true, // record verdict on a pending memory conflict (REQ-003, Phase D) + "mem_consolidate": true, // atomic merge N observations into 1 — garbage collection for agents } // ProfileAdmin contains tools for TUI, dashboards, and manual curation @@ -363,6 +364,45 @@ Examples: ) } + // ─── mem_consolidate (profile: agent) ──────────────────────────────── + if shouldRegister("mem_consolidate", allowlist) { + srv.AddTool( + mcp.NewTool("mem_consolidate", + mcp.WithDescription(`Atomically merge multiple observations into one. Soft-deletes the source observations and creates a new consolidated observation in a single transaction. If the insert fails, all deletes are rolled back. + +Use this when an agent has accumulated multiple related observations over time (e.g., iterations on the same architecture decision) and wants to clean up into a single, authoritative observation.`), + mcp.WithDeferLoading(true), + mcp.WithTitleAnnotation("Consolidate Memories"), + mcp.WithReadOnlyHintAnnotation(false), + mcp.WithDestructiveHintAnnotation(true), + mcp.WithIdempotentHintAnnotation(false), + mcp.WithOpenWorldHintAnnotation(false), + mcp.WithArray("ids_to_delete", + mcp.Required(), + mcp.Description("Array of observation IDs to soft-delete and consolidate"), + ), + mcp.WithString("new_title", + mcp.Required(), + mcp.Description("Title for the new consolidated observation"), + ), + mcp.WithString("new_content", + mcp.Required(), + mcp.Description("Content for the new consolidated observation"), + ), + mcp.WithString("type", + mcp.Description("Category: decision, architecture, bugfix, pattern, config, discovery, learning (default: manual)"), + ), + mcp.WithString("scope", + mcp.Description("Scope: project (default) or personal"), + ), + mcp.WithString("topic_key", + mcp.Description("Optional topic key for the consolidated observation"), + ), + ), + handleConsolidate(s, cfg, activity), + ) + } + // ─── mem_suggest_topic_key (profile: agent, deferred) ─────────────── if shouldRegister("mem_suggest_topic_key", allowlist) { srv.AddTool( @@ -1031,6 +1071,78 @@ func handleSave(s *store.Store, cfg MCPConfig, activity *SessionActivity) server } } +func handleConsolidate(s *store.Store, cfg MCPConfig, activity *SessionActivity) server.ToolHandlerFunc { + return func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { + newTitle, _ := req.GetArguments()["new_title"].(string) + newContent, _ := req.GetArguments()["new_content"].(string) + typ, _ := req.GetArguments()["type"].(string) + scope, _ := req.GetArguments()["scope"].(string) + topicKey, _ := req.GetArguments()["topic_key"].(string) + + // Parse ids_to_delete from JSON array + rawIDs, _ := req.GetArguments()["ids_to_delete"].([]interface{}) + var idsToDelete []int64 + for _, raw := range rawIDs { + switch v := raw.(type) { + case float64: + idsToDelete = append(idsToDelete, int64(v)) + case json.Number: + n, _ := v.Int64() + idsToDelete = append(idsToDelete, n) + } + } + + if len(idsToDelete) == 0 { + return mcp.NewToolResultError("ids_to_delete is required and must contain at least one ID"), nil + } + if newTitle == "" { + return mcp.NewToolResultError("new_title is required"), nil + } + if newContent == "" { + return mcp.NewToolResultError("new_content is required"), nil + } + + // Auto-detect project (same pattern as handleSave) + detRes, err := resolveWriteProject() + if err != nil { + return errorWithMeta("ambiguous_project", + fmt.Sprintf("Cannot determine project: %s", err), + detRes.AvailableProjects, + ), nil + } + project := detRes.Project + normalized, _ := store.NormalizeProject(project) + project = normalized + + if typ == "" { + typ = "manual" + } + sessionID := defaultSessionID(project) + + // Ensure the session exists + s.CreateSession(sessionID, project, "") + + newID, err := s.Consolidate(idsToDelete, store.AddObservationParams{ + SessionID: sessionID, + Type: typ, + Title: newTitle, + Content: newContent, + Project: project, + Scope: scope, + TopicKey: topicKey, + }) + if err != nil { + return mcp.NewToolResultError("Failed to consolidate: " + err.Error()), nil + } + + activity.RecordSave(defaultSessionID(project)) + + msg := fmt.Sprintf("Consolidated %d observations into new observation #%d: %q", len(idsToDelete), newID, newTitle) + detRes.Project = project + return respondWithProject(detRes, msg, nil), nil + } +} + func handleSuggestTopicKey() server.ToolHandlerFunc { return func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { typ, _ := req.GetArguments()["type"].(string) diff --git a/internal/mcp/mcp_test.go b/internal/mcp/mcp_test.go index 7d7bc7c..5a3df8b 100644 --- a/internal/mcp/mcp_test.go +++ b/internal/mcp/mcp_test.go @@ -962,6 +962,7 @@ func TestResolveToolsAgentProfile(t *testing.T) { "mem_update", // skills explicitly say "use mem_update when you have an exact ID to correct" "mem_current_project", // added REQ-313: discovery tool recommended first call "mem_judge", // REQ-003: conflict verdict tool (Phase D) + "mem_consolidate", // atomic merge N observations into 1 — garbage collection for agents } for _, tool := range expectedTools { if !result[tool] { @@ -1006,13 +1007,13 @@ func TestResolveToolsCombinedProfiles(t *testing.T) { t.Fatal("expected non-nil allowlist for combined profiles") } - // Should have all 17 tools (16 prior + mem_judge added in Phase D) + // Should have all 18 tools (17 prior + mem_consolidate) allTools := []string{ "mem_save", "mem_search", "mem_context", "mem_session_summary", "mem_session_start", "mem_session_end", "mem_get_observation", "mem_suggest_topic_key", "mem_capture_passive", "mem_save_prompt", "mem_update", "mem_delete", "mem_stats", "mem_timeline", "mem_merge_projects", - "mem_current_project", "mem_judge", + "mem_current_project", "mem_judge", "mem_consolidate", } for _, tool := range allTools { if !result[tool] { @@ -1598,7 +1599,7 @@ func TestNewServerWithToolsNilRegistersAll(t *testing.T) { "mem_session_start", "mem_session_end", "mem_get_observation", "mem_suggest_topic_key", "mem_capture_passive", "mem_save_prompt", "mem_update", "mem_delete", "mem_stats", "mem_timeline", "mem_merge_projects", - "mem_current_project", "mem_judge", + "mem_current_project", "mem_judge", "mem_consolidate", } for _, name := range allTools { @@ -1637,9 +1638,9 @@ func TestNewServerBackwardsCompatible(t *testing.T) { srv := NewServer(s) tools := srv.ListTools() - // 13 agent + 4 admin = 17 total (mem_judge added in Phase D) - if len(tools) != 17 { - t.Errorf("NewServer should register all 17 tools, got %d", len(tools)) + // 14 agent + 4 admin = 18 total (mem_consolidate added) + if len(tools) != 18 { + t.Errorf("NewServer should register all 18 tools, got %d", len(tools)) } } @@ -1653,9 +1654,9 @@ func TestProfileConsistency(t *testing.T) { combined[tool] = true } - // 13 agent + 4 admin = 17 total (mem_judge added in Phase D) - if len(combined) != 17 { - t.Errorf("agent + admin should cover all 17 tools, got %d", len(combined)) + // 14 agent + 4 admin = 18 total (mem_consolidate added) + if len(combined) != 18 { + t.Errorf("agent + admin should cover all 18 tools, got %d", len(combined)) } // Verify no overlap between profiles @@ -1980,9 +1981,9 @@ func TestNewServerWithConfig(t *testing.T) { t.Fatal("expected MCP server instance") } tools := srv.ListTools() - // Should have all 17 tools (13 agent + 4 admin; mem_judge added in Phase D) - if len(tools) != 17 { - t.Errorf("NewServerWithConfig should register all 17 tools, got %d", len(tools)) + // Should have all 18 tools (14 agent + 4 admin; mem_consolidate added) + if len(tools) != 18 { + t.Errorf("NewServerWithConfig should register all 18 tools, got %d", len(tools)) } } diff --git a/internal/store/store.go b/internal/store/store.go index d1d9f8e..1bb3186 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -2449,6 +2449,84 @@ func (s *Store) DeleteObservation(id int64, hardDelete bool) error { }) } +// Consolidate atomically soft-deletes source observations and creates a new +// consolidated observation in a single transaction. If the INSERT fails, all +// soft-deletes are rolled back. Non-existent IDs are silently ignored. +func (s *Store) Consolidate(idsToDelete []int64, p AddObservationParams) (int64, error) { + p.Project, _ = NormalizeProject(p.Project) + title := stripPrivateTags(p.Title) + content := stripPrivateTags(p.Content) + if len(content) > s.cfg.MaxObservationLength { + content = content[:s.cfg.MaxObservationLength] + "... [truncated]" + } + scope := normalizeScope(p.Scope) + normHash := hashNormalized(content) + topicKey := normalizeTopicKey(p.TopicKey) + + var observationID int64 + err := s.withTx(func(tx *sql.Tx) error { + // Phase 1: Soft-delete sources (ignore non-existent) + for _, id := range idsToDelete { + obs, err := s.getObservationTx(tx, id) + if err == sql.ErrNoRows { + continue // silently ignore non-existent or already-deleted + } + if err != nil { + return err + } + + deletedAt := Now() + if _, err := s.execHook(tx, + `UPDATE observations + SET deleted_at = datetime('now'), + updated_at = datetime('now') + WHERE id = ? AND deleted_at IS NULL`, + id, + ); err != nil { + return err + } + if err := tx.QueryRow(`SELECT deleted_at FROM observations WHERE id = ?`, id).Scan(&deletedAt); err != nil { + return err + } + if err := s.enqueueSyncMutationTx(tx, SyncEntityObservation, obs.SyncID, SyncOpDelete, syncObservationPayload{ + SyncID: obs.SyncID, + SessionID: obs.SessionID, + Project: obs.Project, + Deleted: true, + DeletedAt: &deletedAt, + }); err != nil { + return err + } + } + + // Phase 2: Insert new consolidated observation + syncID := newSyncID("obs") + res, err := s.execHook(tx, + `INSERT INTO observations (sync_id, session_id, type, title, content, tool_name, project, scope, topic_key, normalized_hash, revision_count, duplicate_count, last_seen_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 1, datetime('now'), datetime('now'))`, + syncID, p.SessionID, p.Type, title, content, + nullableString(p.ToolName), nullableString(p.Project), scope, nullableString(topicKey), normHash, + ) + if err != nil { + return err + } + observationID, err = res.LastInsertId() + if err != nil { + return err + } + obs, err := s.getObservationTx(tx, observationID) + if err != nil { + return err + } + return s.enqueueSyncMutationTx(tx, SyncEntityObservation, obs.SyncID, SyncOpUpsert, observationPayloadFromObservation(obs)) + }) + if err != nil { + return 0, err + } + return observationID, nil +} + + // ─── Timeline ──────────────────────────────────────────────────────────────── // // Timeline provides chronological context around a specific observation. diff --git a/internal/store/store_test.go b/internal/store/store_test.go index 81e85c2..90ae6b7 100644 --- a/internal/store/store_test.go +++ b/internal/store/store_test.go @@ -7004,3 +7004,200 @@ func TestAddObservation_DecayNotAppliedToExistingRows(t *testing.T) { t.Errorf("revision must not overwrite review_after: was %q, now %q", ra1, ra2) } } + +func TestConsolidateDeletesSourcesAndCreatesNewObservation(t *testing.T) { + s := newTestStore(t) + + if err := s.CreateSession("s1", "engram", "/tmp/engram"); err != nil { + t.Fatalf("create session: %v", err) + } + + var ids []int64 + for i := 0; i < 3; i++ { + id, err := s.AddObservation(AddObservationParams{ + SessionID: "s1", + Type: "decision", + Title: fmt.Sprintf("old observation %d", i), + Content: fmt.Sprintf("old content %d", i), + Project: "engram", + Scope: "project", + }) + if err != nil { + t.Fatalf("add obs %d: %v", i, err) + } + ids = append(ids, id) + } + + newID, err := s.Consolidate(ids, AddObservationParams{ + SessionID: "s1", + Type: "decision", + Title: "consolidated observation", + Content: "merged content from 3 sources", + Project: "engram", + Scope: "project", + }) + if err != nil { + t.Fatalf("consolidate: %v", err) + } + + if newID == 0 { + t.Fatalf("expected non-zero new observation ID") + } + + // Verify source observations are soft-deleted + for _, id := range ids { + var deletedAt *string + err := s.db.QueryRow("SELECT deleted_at FROM observations WHERE id = ?", id).Scan(&deletedAt) + if err != nil { + t.Fatalf("query deleted_at for id %d: %v", id, err) + } + if deletedAt == nil { + t.Fatalf("expected obs %d to be soft-deleted, but deleted_at is NULL", id) + } + } + + // Verify new observation exists and is not deleted + newObs, err := s.GetObservation(newID) + if err != nil { + t.Fatalf("get new obs: %v", err) + } + if newObs.Title != "consolidated observation" { + t.Fatalf("expected title 'consolidated observation', got %q", newObs.Title) + } + if newObs.SyncID == "" { + t.Fatalf("expected new obs to have a sync_id") + } +} + +func TestConsolidateIsAtomic(t *testing.T) { + s := newTestStore(t) + + if err := s.CreateSession("s1", "engram", "/tmp/engram"); err != nil { + t.Fatalf("create session: %v", err) + } + + id1, err := s.AddObservation(AddObservationParams{ + SessionID: "s1", + Type: "decision", + Title: "should survive rollback", + Content: "this should not be deleted if insert fails", + Project: "engram", + Scope: "project", + }) + if err != nil { + t.Fatalf("add obs: %v", err) + } + + // Empty session_id should cause the INSERT to fail (NOT NULL constraint) + _, err = s.Consolidate([]int64{id1}, AddObservationParams{ + SessionID: "", // this should cause failure + Type: "decision", + Title: "should fail", + Content: "should fail", + Project: "engram", + Scope: "project", + }) + if err == nil { + t.Fatalf("expected error when consolidating with empty session_id") + } + + // Original observation should still be active (rollback) + var deletedAt *string + qErr := s.db.QueryRow("SELECT deleted_at FROM observations WHERE id = ?", id1).Scan(&deletedAt) + if qErr != nil { + t.Fatalf("query deleted_at: %v", qErr) + } + if deletedAt != nil { + t.Fatalf("expected obs %d to NOT be soft-deleted after rollback, but deleted_at = %v", id1, *deletedAt) + } +} + +func TestConsolidateEnqueuesSyncMutations(t *testing.T) { + s := newTestStore(t) + + if err := s.CreateSession("s1", "engram", "/tmp/engram"); err != nil { + t.Fatalf("create session: %v", err) + } + + var ids []int64 + for i := 0; i < 3; i++ { + id, err := s.AddObservation(AddObservationParams{ + SessionID: "s1", + Type: "decision", + Title: fmt.Sprintf("obs for sync test %d", i), + Content: fmt.Sprintf("content %d", i), + Project: "engram", + Scope: "project", + }) + if err != nil { + t.Fatalf("add obs %d: %v", i, err) + } + ids = append(ids, id) + } + + // Clear existing sync_mutations from AddObservation calls + if _, err := s.db.Exec("DELETE FROM sync_mutations"); err != nil { + t.Fatalf("clear sync_mutations: %v", err) + } + + _, err := s.Consolidate(ids, AddObservationParams{ + SessionID: "s1", + Type: "decision", + Title: "consolidated", + Content: "merged", + Project: "engram", + Scope: "project", + }) + if err != nil { + t.Fatalf("consolidate: %v", err) + } + + // Should have 3 delete mutations + 1 upsert mutation = 4 total + var deleteCount, upsertCount int + if err := s.db.QueryRow("SELECT COUNT(*) FROM sync_mutations WHERE op = 'delete'").Scan(&deleteCount); err != nil { + t.Fatalf("count deletes: %v", err) + } + if err := s.db.QueryRow("SELECT COUNT(*) FROM sync_mutations WHERE op = 'upsert'").Scan(&upsertCount); err != nil { + t.Fatalf("count upserts: %v", err) + } + + if deleteCount != 3 { + t.Fatalf("expected 3 delete sync mutations, got %d", deleteCount) + } + if upsertCount != 1 { + t.Fatalf("expected 1 upsert sync mutation, got %d", upsertCount) + } +} + +func TestConsolidateIgnoresNonExistentIDs(t *testing.T) { + s := newTestStore(t) + + if err := s.CreateSession("s1", "engram", "/tmp/engram"); err != nil { + t.Fatalf("create session: %v", err) + } + + // Consolidate with non-existent IDs — should not error, new obs should still be created + newID, err := s.Consolidate([]int64{999999}, AddObservationParams{ + SessionID: "s1", + Type: "decision", + Title: "consolidated from nothing", + Content: "this should still be created", + Project: "engram", + Scope: "project", + }) + if err != nil { + t.Fatalf("consolidate with non-existent IDs should not error: %v", err) + } + + if newID == 0 { + t.Fatalf("expected non-zero new observation ID even with non-existent source IDs") + } + + newObs, err := s.GetObservation(newID) + if err != nil { + t.Fatalf("get new obs: %v", err) + } + if newObs.Title != "consolidated from nothing" { + t.Fatalf("expected title 'consolidated from nothing', got %q", newObs.Title) + } +}