Skip to content

feat(composio): migrate Notion provider to memory_tree pipeline (#2885)#2887

Merged
senamakel merged 4 commits into
tinyhumansai:mainfrom
justinhsu1477:feat/notion-provider-memory-tree-migration
May 29, 2026
Merged

feat(composio): migrate Notion provider to memory_tree pipeline (#2885)#2887
senamakel merged 4 commits into
tinyhumansai:mainfrom
justinhsu1477:feat/notion-provider-memory-tree-migration

Conversation

@justinhsu1477
Copy link
Copy Markdown
Contributor

@justinhsu1477 justinhsu1477 commented May 29, 2026

First of four migrations from #2885. Notion sync previously wrote to the legacy UnifiedMemory backend (memory_docs); every modern retrieval surface reads from memory_tree, so Notion content was invisible to memory.search, tree.read_chunk, tree.browse, the agent's recall path, summary trees, and the MCP tree.* tools.

Same architectural pattern as the vault sync silent failure fixed in #2720 (#2705) — just at the Composio provider layer instead of the file walker layer.

Migration shape

Before After
Call site provider.rs::syncpersist_single_item provider.rs::syncingest::ingest_page_into_memory_tree
Memory path MemoryClient::store_skill_syncUnifiedMemory::upsert_documentmemory_docs ingest_pipeline::ingest_documentmem_tree_chunks + mem_tree_ingested_sources
Source id composio-notion-page-{page_id} (UnifiedMemory document_id) notion:{connection_id}:{page_id} (stable, dedup-able)
Visibility to retrieval Nothing Every memory.search / tree.* / MCP surface

Mirrors the canonical Slack / Gmail layout: a new providers/notion/ingest.rs owns the page → memory-tree translation. The existing dedup / cursor / budget state-machine (SyncState::synced_ids keyed by {page_id}@{edited_time}) is unchanged.

Three design choices worth flagging

  1. Per-page source-id scope — one source per (connection_id, page_id). Page is the natural Notion grouping, keeps SourceKind::Document semantics, and parallels Gmail per-message / Slack per-channel sourcing. Distinct connections produce distinct source_ids so the same Notion page seen by two connected accounts doesn't cross-contaminate.

  2. Re-ingest-on-edit cleanup — Notion pages mutate, but the pipeline's already_ingested gate is content-blind. The new module calls delete_chunks_by_source before each re-ingest so the new revision actually lands. Same pattern as the vault sync content-update path in fix(vault): sync writes to memory_tree, not legacy UnifiedMemory (#2705) #2720.

  3. Full-page JSON in the body# {title}\n\n followed by the pretty-printed payload in a fenced code block. Notion pages don't have a natural single-string canonical body the way Slack messages do, so the chunk content retains enough Notion-specific signal (properties, URL, excerpt) that retrieval can match on more than just the title.

Tests

27/27 pass in memory_sync::composio::providers::notion::* (was 22, +5 new in ingest::tests):

  • ingest_page_writes_to_memory_treethe feat(memory-sync): migrate Notion/ClickUp/Linear/GitHub Composio providers off UnifiedMemory #2885 regression. Asserts count_chunks goes up + is_source_ingested returns true for the expected notion:{conn}:{page} source_id. Mirrors the sync_writes_to_memory_tree regression in vault::sync from fix(vault): sync writes to memory_tree, not legacy UnifiedMemory (#2705) #2720.

  • notion_source_id_is_stable_and_namespaced — pins the format the delete_chunks_by_source cleanup path relies on; distinct connections + distinct page ids must produce distinct source_ids.

  • parse_edited_time_handles_valid_and_invalid_inputs — ISO 8601 / RFC 3339 parsing with safe fallback to Utc::now() instead of failing ingest.

  • render_page_body_includes_title_header_and_pretty_json — pins the chunked-body shape so future "just keep the title" simplifications can't silently strip Notion-specific signal.

  • re_ingesting_edited_page_replaces_prior_chunks — exercises the delete-first guard; edited page must replace prior chunks (no silent already_ingested no-op, no append-duplicate).

  • cargo test --lib memory_sync::composio::providers::notion — 27/27 pass.

  • cargo check --lib — clean.

  • cargo fmt --check — clean.

  • cargo test --tests --no-run — clean (integration-test target compiles, lesson from feat(mcp-registry): InstalledServer HTTP-remote transport #2603).

PR sequence (from #2885)

Tracked separately in #2885; not action items for this PR.

Out of scope

Refs #2885, #2720, #2705, #2585.

Summary by CodeRabbit

  • New Features

    • Notion pages are ingested into the memory system with consistent tracking and default tags.
    • Edited Notion pages are updated in memory without creating duplicate entries.
  • Bug Fixes / Reliability

    • Sync now preserves the cursor when ingest failures occur so failed ranges are retried; failures are logged.
    • More robust handling of edited timestamps with a safe fallback for malformed or missing values.
  • Tests

    • Unit and regression tests added to validate ingest stability and re-ingest behavior.

Review Change Stack

…humansai#2885)

Closes the first of four migrations listed in tinyhumansai#2885. Notion's sync path
previously routed each page through `persist_single_item` →
`MemoryClient::store_skill_sync` → `UnifiedMemory::upsert_document` →
the legacy `memory_docs` table. Every modern retrieval surface —
`memory.search`, `tree.read_chunk`, `tree.browse`, the agent's recall
path, summary trees, the MCP `tree.top_entities` tool — reads from
`mem_tree_chunks` + `mem_tree_ingested_sources`, so Notion content was
invisible to all of them. Same architectural pattern as the vault sync
silent failure fixed in tinyhumansai#2720 (tinyhumansai#2705).

## Migration shape

Mirrors the canonical Slack / Gmail layout: a new dedicated
`providers/notion/ingest.rs` owns the page → memory-tree translation
and drives `memory::ingest_pipeline::ingest_document` with a stable
`source_id = "notion:{connection_id}:{page_id}"`. The provider's
`sync()` calls `ingest_page_into_memory_tree` in place of
`persist_single_item`; the existing dedup / cursor / budget
state-machine (`SyncState::synced_ids` keyed by
`{page_id}@{edited_time}`) is unchanged.

Three design choices worth flagging:

1. **Per-page source-id scope** — one source per `(connection_id,
   page_id)`. Page is the natural Notion grouping ("one page = one
   document"), keeps `SourceKind::Document` semantics, and parallels
   the Gmail per-message / Slack per-channel sourcing in the same
   subsystem. Distinct connections produce distinct source_ids so the
   same Notion page seen by two different connected accounts doesn't
   cross-contaminate.

2. **Re-ingest-on-edit cleanup** — Notion pages mutate, and the
   pipeline's `already_ingested(SourceKind::Document, source_id)`
   gate is content-blind. The new module calls
   `delete_chunks_by_source` before each re-ingest so the new revision
   actually lands. Same pattern as the vault sync content-update path
   in tinyhumansai#2720.

3. **Full-page JSON in the body** — Notion pages don't have a natural
   single-string canonical body the way Slack messages do, so the
   body is `# {title}\n\n```json\n{pretty}\n```\n`. Chunk content
   retains enough Notion-specific signal (properties, URL, excerpt)
   that retrieval can match on more than just the title.

## Tests

5 new tests in `notion::ingest::tests` — 27/27 total in the Notion
module pass. The key one is `ingest_page_writes_to_memory_tree`, the
tinyhumansai#2885 regression: creates a Notion page payload, ingests it, asserts
`count_chunks` went up AND `is_source_ingested(SourceKind::Document,
"notion:conn-test:page-phoenix")` returns true. Mirrors the
`sync_writes_to_memory_tree` regression in `vault::sync`.

Plus:
- `notion_source_id_is_stable_and_namespaced` — pins the format the
  `delete_chunks_by_source` cleanup path relies on.
- `parse_edited_time_handles_valid_and_invalid_inputs` — ISO 8601
  parsing + fallback to `Utc::now()`.
- `render_page_body_includes_title_header_and_pretty_json` — body
  shape so future "just keep the title" simplifications can't
  silently strip Notion-specific signal.
- `re_ingesting_edited_page_replaces_prior_chunks` — exercises the
  delete-first guard; an edited page must replace prior chunks, not
  silently no-op via `already_ingested` and not append-duplicate.

`cargo check --lib` + `cargo fmt --check` + `cargo test --tests
--no-run` all clean.

## Out of scope

- The other three providers in tinyhumansai#2885 (ClickUp / Linear / GitHub) ship
  as follow-up PRs that reuse this pattern. Splitting per-provider
  keeps the review surface tight and lets each PR carry its own
  regression test.
- Removing `persist_single_item` / `store_skill_sync` entirely is
  blocked on tinyhumansai#2585's broader UnifiedMemory removal — they stay in
  place until then for any non-Composio caller.

Refs tinyhumansai#2885, tinyhumansai#2720, tinyhumansai#2705, tinyhumansai#2585.
@justinhsu1477 justinhsu1477 requested a review from a team May 29, 2026 02:09
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 29, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 24cb222e-f627-4d9a-995e-73463221941c

📥 Commits

Reviewing files that changed from the base of the PR and between 0c15fd2 and 2311861.

📒 Files selected for processing (1)
  • src/openhuman/memory_sync/composio/providers/notion/ingest.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/openhuman/memory_sync/composio/providers/notion/ingest.rs

📝 Walkthrough

Walkthrough

Adds a Notion ingest module that computes stable source IDs, renders page payloads, parses edited timestamps with fallback, deletes prior chunks on re-ingest, and routes Notion provider page sync through this ingest pipeline with tests covering persistence and re-ingest behavior.

Changes

Notion ingest integration

Layer / File(s) Summary
Module declaration and provider imports
src/openhuman/memory_sync/composio/providers/notion/mod.rs, src/openhuman/memory_sync/composio/providers/notion/provider.rs
Adds mod ingest; and updates provider imports to use ingest_page_into_memory_tree instead of the legacy persist_single_item path.
Ingest helpers and constants
src/openhuman/memory_sync/composio/providers/notion/ingest.rs
Adds NOTION_PLATFORM, DEFAULT_TAGS, notion_source_id, deterministic render_page_body, and parse_edited_time with Utc::now() fallback.
Notion ingest pipeline implementation
src/openhuman/memory_sync/composio/providers/notion/ingest.rs
Implements ingest_page_into_memory_tree: computes source_id, conditionally deletes prior chunks on re-ingest (spawn_blocking), builds DocumentInput, calls ingest_pipeline::ingest_document, logs already_ingested, and returns chunk counts or errors annotated with source_id.
Ingest module tests and regression coverage
src/openhuman/memory_sync/composio/providers/notion/ingest.rs
Unit tests for notion_source_id, parse_edited_time, and render_page_body. Tokio integration tests verify memory-tree chunk/source table writes and that re-ingesting an edited page replaces prior chunks (stable counts).
Provider sync behavior changes
src/openhuman/memory_sync/composio/providers/notion/provider.rs
Tracks had_ingest_failures during a sync pass; per-page loop calls the new ingest path, marks sync keys on success, logs failures and holds the cursor for re-fetch when any ingest failed.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related issues

  • #2885: Implements migration of Notion persistence to the memory_tree ingest pipeline and adds re-ingest cleanup (matches issue objective).

Suggested reviewers

  • graycyrus
  • sanil-23
  • M3gA-Mind

🐰 I hop through pages and tag them neat,
Stable IDs keep every memory complete,
Old chunks are cleared when edits arrive,
The memory tree hums and stays alive,
🍃 A little rabbit helps each datum thrive.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main objective of the PR: migrating the Notion provider to use the memory_tree pipeline instead of the legacy UnifiedMemory path. It is concise, specific, and clearly conveys the primary change.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot added rust-core Core Rust runtime in src/: CLI, core_server, shared infrastructure. memory Memory store, memory tree, recall, summarization, and embeddings in src/openhuman/memory/. labels May 29, 2026
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
src/openhuman/memory_sync/composio/providers/notion/ingest.rs (1)

110-131: ⚡ Quick win

Gate Notion re-ingest cleanup (delete_chunks_by_source) to avoid scanning all document chunks on fresh pages

delete_chunks_by_source queries mem_tree_chunks with only source_kind = ?1 and then Rust-side filters to source_id, so it scans every document chunk each time it runs. ingest_page_into_memory_tree calls it unconditionally for every admitted Notion page, including first-time ingests.

Wrap the delete behind is_source_ingested (indexed via mem_tree_ingested_sources(source_kind, source_id) PK) so first-time ingests return 0 immediately, while edited/re-ingest paths still perform cleanup.

♻️ Proposed gate (illustrative)
+use crate::openhuman::memory_store::chunks::store::is_source_ingested;
+
     let cfg_for_blocking = config.clone();
     let source_for_blocking = source_id.clone();
     let removed = tokio::task::spawn_blocking(move || {
-        delete_chunks_by_source(
-            &cfg_for_blocking,
-            SourceKind::Document,
-            &source_for_blocking,
-        )
+        // Only pay the scan cost when the source is actually present.
+        if is_source_ingested(&cfg_for_blocking, SourceKind::Document, &source_for_blocking)? {
+            delete_chunks_by_source(&cfg_for_blocking, SourceKind::Document, &source_for_blocking)
+        } else {
+            Ok(0)
+        }
     })
     .await
     .map_err(|e| anyhow::anyhow!("delete-prior task join error: {e}"))??;
     if removed > 0 {
         log::debug!(
             "[composio:notion] ingest: re-ingest cleanup connection_id={} page_id={} removed_chunks={}",
             connection_id,
             page_id,
             removed
         );
     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/openhuman/memory_sync/composio/providers/notion/ingest.rs` around lines
110 - 131, Before calling delete_chunks_by_source in
ingest_page_into_memory_tree, first check whether the source is already recorded
as ingested by using is_source_ingested (which queries mem_tree_ingested_sources
by (source_kind, source_id)); only spawn_blocking and call
delete_chunks_by_source when is_source_ingested returns true (otherwise skip and
treat removed as 0). Preserve the existing cloning of config/source for the
blocking task and keep the debug log path when removed > 0; ensure any error
mapping/.await behavior remains the same when you conditionally bypass the
spawn_blocking call.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@src/openhuman/memory_sync/composio/providers/notion/ingest.rs`:
- Around line 110-131: Before calling delete_chunks_by_source in
ingest_page_into_memory_tree, first check whether the source is already recorded
as ingested by using is_source_ingested (which queries mem_tree_ingested_sources
by (source_kind, source_id)); only spawn_blocking and call
delete_chunks_by_source when is_source_ingested returns true (otherwise skip and
treat removed as 0). Preserve the existing cloning of config/source for the
blocking task and keep the debug log path when removed > 0; ensure any error
mapping/.await behavior remains the same when you conditionally bypass the
spawn_blocking call.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 26f8300b-3171-4bf5-91bd-71ea081ae9c5

📥 Commits

Reviewing files that changed from the base of the PR and between b1b919a and 8492dfb.

📒 Files selected for processing (3)
  • src/openhuman/memory_sync/composio/providers/notion/ingest.rs
  • src/openhuman/memory_sync/composio/providers/notion/mod.rs
  • src/openhuman/memory_sync/composio/providers/notion/provider.rs

coderabbitai[bot]
coderabbitai Bot previously approved these changes May 29, 2026
Copy link
Copy Markdown
Contributor

@graycyrus graycyrus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@justinhsu1477 hey! the code looks good to me, but CI has a failing check (PR Submission Checklist) and several jobs still pending — once everything is green, i'll come back and approve this. let me know if you need any help.

Walkthrough: adds providers/notion/ingest.rs with ingest_page_into_memory_tree, swaps the call site in provider.rs from persist_single_itemMemoryClient::store_skill_syncmemory_docs to the canonical memory-tree pipeline, and ships 5 targeted tests. Mirrors the Slack/Gmail ingest module shape exactly.

The implementation is clean throughout. delete_chunks_by_source is correctly hopped through spawn_blocking. The double-? on the task join surfaces both the join panic and the inner Result cleanly. No .unwrap() in production paths. The already_ingested log comment after the delete-first guard is accurate — you'd only hit that on a fresh page where prior chunks were already absent, not a real short-circuit. The 5 new tests cover the regression, source_id stability, timestamp parsing, body shape, and the delete-replace round-trip — all the right things to pin.

Change summary:

File Change
notion/ingest.rs New module: notion_source_id, render_page_body, parse_edited_time, ingest_page_into_memory_tree, 5 tests
notion/mod.rs Exposes the new ingest submodule
notion/provider.rs Replaces persist_single_item call with ingest_page_into_memory_tree; removes unused memory binding

AI risk assessment: Breaking risk low (self-contained to Notion provider, persist_single_item stays for other callers, source_id format change is internal dedup-key only). Security risk zero (no auth surface, no user-facing input, no secrets). Safe to merge once CI is green.

…it nit)

Gate `delete_chunks_by_source` behind `is_source_ingested`. The delete
path uses a `source_kind = ?1` scan + Rust-side source-id filter
(`store::delete_chunks_by_source_filter`), so a first-time ingest of a
never-seen page would scan every Document-kind chunk just to find zero
matches. `is_source_ingested` is an indexed PK lookup against
`mem_tree_ingested_sources`, so the common fresh-page case becomes one
cheap `COUNT(*)` and we only pay the scan cost on actual re-ingests of
edited pages.

Same `spawn_blocking` hop, single combined closure — the gate runs
inside the existing blocking task. The 5 ingest tests still pass:
the regression test exercises the false → no-delete path (fresh page);
the re-ingest test exercises the true → delete path (edited page).

No behaviour change on hot paths — `is_source_ingested` is the same
table the pipeline's `claim_source_ingest_tx` writes to, so it sees
exactly the rows the delete needs to clean up.

Refs CodeRabbit review on tinyhumansai#2887.
@coderabbitai coderabbitai Bot added the working A PR that is being worked on by the team. label May 29, 2026
coderabbitai[bot]
coderabbitai Bot previously approved these changes May 29, 2026
justinhsu1477 added a commit to justinhsu1477/openhuman that referenced this pull request May 29, 2026
Gate `delete_chunks_by_source` behind `is_source_ingested`. The delete
path uses a `source_kind = ?1` scan + Rust-side source-id filter
(`store::delete_chunks_by_source_filter`), so a first-time ingest of a
never-seen issue would scan every Document-kind chunk just to find zero
matches. `is_source_ingested` is an indexed PK lookup against
`mem_tree_ingested_sources`, so the common fresh-issue case becomes one
cheap `COUNT(*)` and we only pay the scan cost on actual re-ingests of
edited issues.

Same `spawn_blocking` hop, single combined closure — the gate runs
inside the existing blocking task. The 6 ingest tests still pass:
the regression test exercises the false → no-delete path (fresh issue);
the re-ingest test exercises the true → delete path (edited issue).

Same fix carried forward from the parallel Notion PR (tinyhumansai#2887), where
CodeRabbit caught the issue first. Linear and ClickUp PRs will land
with the gate baked in from the start.

Refs tinyhumansai#2885.
@justinhsu1477
Copy link
Copy Markdown
Contributor Author

Two fixes pushed in dd3943d6:

Item Fix Where
CodeRabbit nit (gate delete_chunks_by_source behind is_source_ingested) Applied — the fresh-page case now converts to one PK-indexed COUNT(*) against mem_tree_ingested_sources instead of a full source_kind = Document scan with Rust-side filter. Both calls share the same spawn_blocking hop. notion/ingest.rs lines 113-141
PR Submission Checklist failing — 3/8 "unchecked" Edited the PR body. The "PR sequence (from #2885)" section used - [ ] lines as a sequence tracker for the follow-up PRs (GitHub / Linear / ClickUp); the checklist linter read them as action items for this PR. Switched to plain bullets under a "not action items" note. PR description

Same is_source_ingested gate carried over to the parallel GitHub PR (#2889) so the pattern stays consistent — 6/6 tests still green there too.

The 5 ingest tests on this PR still pass (cargo test --lib memory_sync::composio::providers::notion::ingest). The regression test exercises the new false → no-delete path (fresh page); the re-ingest test exercises the true → delete path (edited page).

@graycyrus — checklist should clear on the next push; waiting for CI to re-run.

…chain

Two related correctness fixes flagged by CodeRabbit on the parallel
GitHub PR (tinyhumansai#2889) — same shape exists here in Notion, so applying the
fix here proactively rather than waiting for a second round.

## 1. Cursor advances on ingest failure → data-loss window

`provider.rs` updated `newest_edited_time` for every result before
attempting ingest, then advanced the persistent cursor unconditionally
at the end of the loop. Items whose ingest call returned `Err` were
logged + skipped (no `mark_synced`) — but the next sync queried
`last_edited_time > {cursor}`, so the failed item was never re-fetched.

Pre-tinyhumansai#2885 this was just "stale revision": the legacy
`persist_single_item` path wasn't delete-first, so a failed write left
old chunks alone. Post-tinyhumansai#2885 it's worse — `ingest_page_into_memory_tree`
calls `delete_chunks_by_source` *before* the (failing) write, so an
*edited* page that fails to re-ingest is left with neither old nor new
chunks in `mem_tree_chunks` until its next edit.

Fix: track `had_ingest_failures: bool`, set on every `Err`, and gate the
`advance_cursor` at the bottom of the sync loop. Already-synced items
are cheaply skipped via `state.is_synced` on the re-fetch next pass, so
the cost of holding is just one extra search-API page on the failure
range.

A retry-only-failed-items fix (separate `newest_successful_updated`
cursor) would be tighter but adds state and edge cases; the simple gate
matches CodeRabbit's exact suggestion and is provably safe (cursor only
ever advances when every item in the pass succeeded).

## 2. `{err}` drops anyhow context chain

`ingest.rs` wrapped the `ingest_pipeline::ingest_document` failure as
`anyhow!("ingest_document failed for {source_id}: {err}")`, where
`{err}` is Display-only and strips the chain. `provider.rs` then logs
the wrapped error with `tracing::warn!(error = %e)` (also Display), so
the underlying DB / embedding / persist cause was lost.

Fix: swap to `{err:#}` (alternate formatter) so the chain is baked into
the wrapping anyhow's Display impl. Matches the convention `provider.rs`
already uses on its own `ctx.execute(...).map_err(|e| ... "{e:#}")` call
sites (lines 92, 207).

## Tests

27/27 still pass in `memory_sync::composio::providers::notion::*`.
The provider's `sync()` is hard to unit-test without mocking
`ComposioContext` heavily — the invariant is pinned by the rustdoc on
`had_ingest_failures` and the inline gate comment. The ingest-side
tests (regression + re-ingest) cover the ingest function's contract;
the cursor logic lives entirely in `provider.rs`.

Refs CodeRabbit review on tinyhumansai#2889.
@coderabbitai coderabbitai Bot added the feature Net-new user-facing capability or product behavior. label May 29, 2026
justinhsu1477 added a commit to justinhsu1477/openhuman that referenced this pull request May 29, 2026
…chain

CodeRabbit caught two correctness issues on this PR. Same shape exists
on the parallel Notion PR (tinyhumansai#2887) — fix applied there in parallel.

## 1. Cursor advances on ingest failure → data-loss window

`provider.rs` updated `newest_updated` for every result before attempting
ingest, then advanced the persistent cursor unconditionally at the end
of the sync loop. Items whose ingest call returned `Err` were logged
+ skipped (no `mark_synced`) — but the next sync queried
`updated:>{cursor}`, so the failed item was never re-fetched.

Pre-tinyhumansai#2885 this was just "stale revision": the legacy `persist_single_item`
path wasn't delete-first, so a failed write left old chunks alone.
Post-tinyhumansai#2885 it's worse — `ingest_issue_into_memory_tree` calls
`delete_chunks_by_source` *before* the (failing) write, so an *edited*
issue that fails to re-ingest is left with neither old nor new chunks
in `mem_tree_chunks` until its next edit.

Fix: track `had_ingest_failures: bool`, set on every `Err`, and gate
the `advance_cursor` at the bottom of the sync loop. Already-synced
items are cheaply skipped via `state.is_synced` on the re-fetch next
pass, so the cost of holding is just one extra search-API page on the
failure range. `set_last_sync_at_ms` still fires — it's a heartbeat,
not a fetch-window boundary.

A retry-only-failed-items fix (separate `newest_successful_updated`
cursor) would be tighter but adds state and edge cases; the simple
gate matches CodeRabbit's exact suggestion and is provably safe
(cursor only ever advances when every item in the pass succeeded).

## 2. `{err}` drops anyhow context chain

`ingest.rs` wrapped the `ingest_pipeline::ingest_document` failure as
`anyhow!("ingest_document failed for {source_id}: {err}")`, where
`{err}` is Display-only and strips the chain. `provider.rs` then logs
the wrapped error with `tracing::warn!(error = %e)` (also Display),
so the underlying DB / embedding / persist cause was lost.

Fix: swap to `{err:#}` (alternate formatter) so the chain is baked
into the wrapping anyhow's Display impl. Matches the convention
`provider.rs` already uses on its own `ctx.execute(...).map_err(...)`
sites (lines 90, 234, 406).

## Tests

48/48 still pass in `memory_sync::composio::providers::github::*`.
The provider's `sync()` is hard to unit-test without mocking
`ComposioContext` heavily — the invariant is pinned by the rustdoc on
`had_ingest_failures` and the inline gate comment. The ingest-side
tests (regression + re-ingest) cover the ingest function's contract;
the cursor logic lives entirely in `provider.rs`.

Refs CodeRabbit review on tinyhumansai#2889, tinyhumansai#2885.
@justinhsu1477
Copy link
Copy Markdown
Contributor Author

Pushed 0c15fd27 — applying both correctness fixes CodeRabbit caught on the parallel #2889 (same shape exists here, fixing proactively):

Fix Where Mechanism
Hold cursor on ingest failure notion/provider.rs New `had_ingest_failures` flag; `advance_cursor` now gated. Without this, an edited page that fails to re-ingest is left with neither old nor new chunks in `mem_tree_chunks` until its next edit — the delete-first re-ingest path makes the prior "stale revision" failure mode into actual data loss.
Preserve anyhow chain on ingest_document Err `notion/ingest.rs` `{err}` → `{err:#}` so the wrapping anyhow's Display impl carries the chain. Without it, `tracing::warn!(error = %e)` on the provider side dropped underlying DB / embedding causes.

Tests: 27/27 still pass in memory_sync::composio::providers::notion::*. The provider's sync() is hard to unit-test (would need to mock ComposioContext heavily) so the invariant is pinned by rustdoc + inline comments rather than a new test.

@graycyrus — should be the last blocker before re-approval.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
src/openhuman/memory_sync/composio/providers/notion/provider.rs (1)

338-352: Cursor hold is correct for transient failures, but a permanently-failing page never advances and drains the daily budget.

The "cost of holding is minimal" reasoning holds for transient errors. For a persistently failing page (e.g. a payload the ingest pipeline can never process), the cursor is pinned indefinitely: every subsequent sync re-fetches the whole range from the held cursor — and each fetched page still calls state.record_requests(1) — so the daily budget gets burned each pass without ever making forward progress. Once budget_exhausted() trips, genuinely-new pages later in the pass are also blocked until the budget resets.

Consider bounding this so a single poison page can't stall the connection forever — e.g. tracking a per-sync_key failure count in SyncState and, after N attempts, marking it synced (dead-letter) and/or emitting a metric/alert so it surfaces operationally rather than silently looping.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/openhuman/memory_sync/composio/providers/notion/provider.rs` around lines
338 - 352, The current cursor-hold logic (if !had_ingest_failures { if let
Some(newest_edited_time) { state.advance_cursor(&new_cursor) } }) correctly
retries transient errors but lets a permanently-failing page pin the cursor and
drain budget via repeated state.record_requests(1); update SyncState to track
per-sync_key failure counts (e.g., increment on ingest failure for the offending
page), and in the logic around had_ingest_failures/newest_edited_time implement
a threshold N: when a page for a given sync_key has failed >= N attempts mark it
as dead-lettered/synced (advance the cursor via state.advance_cursor or a new
state.dead_letter(&sync_key) helper), emit a metric/alert (including
connection_id and sync_key), and avoid counting further budget-consuming
record_requests for that page so the pipeline can make forward progress while
surfacing the poisoned page for ops to investigate.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@src/openhuman/memory_sync/composio/providers/notion/provider.rs`:
- Around line 338-352: The current cursor-hold logic (if !had_ingest_failures {
if let Some(newest_edited_time) { state.advance_cursor(&new_cursor) } })
correctly retries transient errors but lets a permanently-failing page pin the
cursor and drain budget via repeated state.record_requests(1); update SyncState
to track per-sync_key failure counts (e.g., increment on ingest failure for the
offending page), and in the logic around had_ingest_failures/newest_edited_time
implement a threshold N: when a page for a given sync_key has failed >= N
attempts mark it as dead-lettered/synced (advance the cursor via
state.advance_cursor or a new state.dead_letter(&sync_key) helper), emit a
metric/alert (including connection_id and sync_key), and avoid counting further
budget-consuming record_requests for that page so the pipeline can make forward
progress while surfacing the poisoned page for ops to investigate.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 22084c55-1576-4c39-9ed5-724252f9a822

📥 Commits

Reviewing files that changed from the base of the PR and between dd3943d and 0c15fd2.

📒 Files selected for processing (2)
  • src/openhuman/memory_sync/composio/providers/notion/ingest.rs
  • src/openhuman/memory_sync/composio/providers/notion/provider.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/openhuman/memory_sync/composio/providers/notion/ingest.rs

coderabbitai[bot]
coderabbitai Bot previously approved these changes May 29, 2026
justinhsu1477 added a commit to justinhsu1477/openhuman that referenced this pull request May 29, 2026
… ingest

Per @graycyrus review on tinyhumansai#2889: the two `log::debug!` calls in
`ingest_issue_into_memory_tree` used positional format strings, while
`provider.rs` in the same directory logs via `tracing::*` with
structured key=value fields. Positional log args are flattened to an
opaque message by the tracing-log bridge, so log-aggregation pipelines
that filter on span fields (connection_id, issue_id, …) lose them.

Swap both calls to `tracing::debug!` with named fields, matching the
provider.rs shape exactly:
- re-ingest cleanup: connection_id / issue_id / removed_chunks
- issue persisted: connection_id / issue_id / chunks_written / already_ingested

Scope note: the older ingest modules (slack, gmail, vault::sync) still
use `log::*` — those are pre-existing and out of scope here. The new
memory-tree ingest modules (this + notion in tinyhumansai#2887) now consistently
use tracing alongside their provider.rs. A follow-up sweep to unify the
legacy modules can be filed separately if desired.

Refs tinyhumansai#2889.
… ingest

Carries the @graycyrus review note from the parallel GitHub PR (tinyhumansai#2889)
back to Notion so the two new memory-tree ingest modules stay
consistent. The two `log::debug!` calls in `ingest_page_into_memory_tree`
used positional format strings; provider.rs in the same directory logs
via `tracing::*` with structured key=value fields, which log-aggregation
pipelines can filter on (the tracing-log bridge flattens positional
args to an opaque message and loses them).

Swap both to `tracing::debug!` with named fields matching provider.rs:
- re-ingest cleanup: connection_id / page_id / removed_chunks
- page persisted: connection_id / page_id / chunks_written / already_ingested

Scope note: the older ingest modules (slack, gmail, vault::sync) still
use `log::*` — pre-existing, out of scope. A follow-up sweep can unify
them separately if desired.

Refs tinyhumansai#2887, tinyhumansai#2889.
@senamakel senamakel merged commit 1d52548 into tinyhumansai:main May 29, 2026
33 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

feature Net-new user-facing capability or product behavior. memory Memory store, memory tree, recall, summarization, and embeddings in src/openhuman/memory/. rust-core Core Rust runtime in src/: CLI, core_server, shared infrastructure. working A PR that is being worked on by the team.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants