-
Notifications
You must be signed in to change notification settings - Fork 332
feat: add atomic memory consolidation (Store.Consolidate + MCP mem_consolidate) #246
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2449,6 +2449,84 @@ func (s *Store) DeleteObservation(id int64, hardDelete bool) error { | |
| }) | ||
| } | ||
|
|
||
| // Consolidate atomically soft-deletes source observations and creates a new | ||
| // consolidated observation in a single transaction. If the INSERT fails, all | ||
| // soft-deletes are rolled back. Non-existent IDs are silently ignored. | ||
| func (s *Store) Consolidate(idsToDelete []int64, p AddObservationParams) (int64, error) { | ||
| p.Project, _ = NormalizeProject(p.Project) | ||
| title := stripPrivateTags(p.Title) | ||
| content := stripPrivateTags(p.Content) | ||
| if len(content) > s.cfg.MaxObservationLength { | ||
| content = content[:s.cfg.MaxObservationLength] + "... [truncated]" | ||
| } | ||
| scope := normalizeScope(p.Scope) | ||
| normHash := hashNormalized(content) | ||
| topicKey := normalizeTopicKey(p.TopicKey) | ||
|
|
||
| var observationID int64 | ||
| err := s.withTx(func(tx *sql.Tx) error { | ||
| // Phase 1: Soft-delete sources (ignore non-existent) | ||
| for _, id := range idsToDelete { | ||
| obs, err := s.getObservationTx(tx, id) | ||
| if err == sql.ErrNoRows { | ||
| continue // silently ignore non-existent or already-deleted | ||
| } | ||
| if err != nil { | ||
|
Comment on lines
+2468
to
+2474
|
||
| return err | ||
| } | ||
|
|
||
| deletedAt := Now() | ||
| if _, err := s.execHook(tx, | ||
| `UPDATE observations | ||
| SET deleted_at = datetime('now'), | ||
| updated_at = datetime('now') | ||
| WHERE id = ? AND deleted_at IS NULL`, | ||
| id, | ||
| ); err != nil { | ||
| return err | ||
| } | ||
| if err := tx.QueryRow(`SELECT deleted_at FROM observations WHERE id = ?`, id).Scan(&deletedAt); err != nil { | ||
| return err | ||
| } | ||
| if err := s.enqueueSyncMutationTx(tx, SyncEntityObservation, obs.SyncID, SyncOpDelete, syncObservationPayload{ | ||
| SyncID: obs.SyncID, | ||
| SessionID: obs.SessionID, | ||
| Project: obs.Project, | ||
| Deleted: true, | ||
| DeletedAt: &deletedAt, | ||
| }); err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| // Phase 2: Insert new consolidated observation | ||
| syncID := newSyncID("obs") | ||
| res, err := s.execHook(tx, | ||
| `INSERT INTO observations (sync_id, session_id, type, title, content, tool_name, project, scope, topic_key, normalized_hash, revision_count, duplicate_count, last_seen_at, updated_at) | ||
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 1, datetime('now'), datetime('now'))`, | ||
| syncID, p.SessionID, p.Type, title, content, | ||
| nullableString(p.ToolName), nullableString(p.Project), scope, nullableString(topicKey), normHash, | ||
| ) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| observationID, err = res.LastInsertId() | ||
| if err != nil { | ||
| return err | ||
| } | ||
| obs, err := s.getObservationTx(tx, observationID) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| return s.enqueueSyncMutationTx(tx, SyncEntityObservation, obs.SyncID, SyncOpUpsert, observationPayloadFromObservation(obs)) | ||
| }) | ||
| if err != nil { | ||
| return 0, err | ||
| } | ||
| return observationID, nil | ||
| } | ||
|
|
||
|
|
||
| // ─── Timeline ──────────────────────────────────────────────────────────────── | ||
| // | ||
| // Timeline provides chronological context around a specific observation. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The `ids_to_delete` parsing silently truncates `float64` values and ignores `json.Number` conversion errors, which can lead to surprising behavior for non-integer inputs (e.g., `1.9` becomes `1`) and makes it hard to debug bad requests. Since this tool is destructive, consider validating that every element is a positive integer and returning a clear error if any element is invalid/out of range, instead of best-effort parsing.