From 8492dfbf4e14e648d55a0487d990499e743d7328 Mon Sep 17 00:00:00 2001 From: justin Date: Fri, 29 May 2026 10:09:21 +0800 Subject: [PATCH 1/4] feat(composio): migrate Notion provider to memory_tree pipeline (#2885) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the first of four migrations listed in #2885. Notion's sync path previously routed each page through `persist_single_item` → `MemoryClient::store_skill_sync` → `UnifiedMemory::upsert_document` → the legacy `memory_docs` table. Every modern retrieval surface — `memory.search`, `tree.read_chunk`, `tree.browse`, the agent's recall path, summary trees, the MCP `tree.top_entities` tool — reads from `mem_tree_chunks` + `mem_tree_ingested_sources`, so Notion content was invisible to all of them. Same architectural pattern as the vault sync silent failure fixed in #2720 (#2705). ## Migration shape Mirrors the canonical Slack / Gmail layout: a new dedicated `providers/notion/ingest.rs` owns the page → memory-tree translation and drives `memory::ingest_pipeline::ingest_document` with a stable `source_id = "notion:{connection_id}:{page_id}"`. The provider's `sync()` calls `ingest_page_into_memory_tree` in place of `persist_single_item`; the existing dedup / cursor / budget state-machine (`SyncState::synced_ids` keyed by `{page_id}@{edited_time}`) is unchanged. Three design choices worth flagging: 1. **Per-page source-id scope** — one source per `(connection_id, page_id)`. Page is the natural Notion grouping ("one page = one document"), keeps `SourceKind::Document` semantics, and parallels the Gmail per-message / Slack per-channel sourcing in the same subsystem. Distinct connections produce distinct source_ids so the same Notion page seen by two different connected accounts doesn't cross-contaminate. 2. **Re-ingest-on-edit cleanup** — Notion pages mutate, and the pipeline's `already_ingested(SourceKind::Document, source_id)` gate is content-blind. The new module calls `delete_chunks_by_source` before each re-ingest so the new revision actually lands. Same pattern as the vault sync content-update path in #2720. 3. **Full-page JSON in the body** — Notion pages don't have a natural single-string canonical body the way Slack messages do, so the body is `# {title}\n\n```json\n{pretty}\n```\n`. Chunk content retains enough Notion-specific signal (properties, URL, excerpt) that retrieval can match on more than just the title. ## Tests 5 new tests in `notion::ingest::tests` — 27/27 total in the Notion module pass. The key one is `ingest_page_writes_to_memory_tree`, the #2885 regression: creates a Notion page payload, ingests it, asserts `count_chunks` went up AND `is_source_ingested(SourceKind::Document, "notion:conn-test:page-phoenix")` returns true. Mirrors the `sync_writes_to_memory_tree` regression in `vault::sync`. Plus: - `notion_source_id_is_stable_and_namespaced` — pins the format the `delete_chunks_by_source` cleanup path relies on. - `parse_edited_time_handles_valid_and_invalid_inputs` — ISO 8601 parsing + fallback to `Utc::now()`. - `render_page_body_includes_title_header_and_pretty_json` — body shape so future "just keep the title" simplifications can't silently strip Notion-specific signal. - `re_ingesting_edited_page_replaces_prior_chunks` — exercises the delete-first guard; an edited page must replace prior chunks, not silently no-op via `already_ingested` and not append-duplicate. `cargo check --lib` + `cargo fmt --check` + `cargo test --tests --no-run` all clean. ## Out of scope - The other three providers in #2885 (ClickUp / Linear / GitHub) ship as follow-up PRs that reuse this pattern. Splitting per-provider keeps the review surface tight and lets each PR carry its own regression test. - Removing `persist_single_item` / `store_skill_sync` entirely is blocked on #2585's broader UnifiedMemory removal — they stay in place until then for any non-Composio caller. Refs #2885, #2720, #2705, #2585. --- .../composio/providers/notion/ingest.rs | 384 ++++++++++++++++++ .../composio/providers/notion/mod.rs | 1 + .../composio/providers/notion/provider.rs | 28 +- 3 files changed, 401 insertions(+), 12 deletions(-) create mode 100644 src/openhuman/memory_sync/composio/providers/notion/ingest.rs diff --git a/src/openhuman/memory_sync/composio/providers/notion/ingest.rs b/src/openhuman/memory_sync/composio/providers/notion/ingest.rs new file mode 100644 index 0000000000..4ad302d2c4 --- /dev/null +++ b/src/openhuman/memory_sync/composio/providers/notion/ingest.rs @@ -0,0 +1,384 @@ +//! Notion → memory tree ingest plumbing. +//! +//! Owns the conversion from a single Notion page payload (post-extracted +//! by [`super::sync`]) into a [`DocumentInput`] and drives +//! [`memory::ingest_pipeline::ingest_document`] for that page. +//! +//! Mirrors the canonical Slack/Gmail per-source ingest layout +//! ([`super::super::slack::ingest`] / [`super::super::gmail::ingest`]) +//! so retrieval surfaces (`memory.search`, `tree.read_chunk`, +//! `tree.browse`, the agent's recall path, summary trees) actually see +//! Notion content — pre-#2885 the provider wrote via +//! `MemoryClient::store_skill_sync` into the legacy `memory_docs` table, +//! invisible to the memory-tree retrieval stack. +//! +//! ## Source-id scope +//! +//! Source id is `notion:{connection_id}:{page_id}` — one source per +//! Notion page per connection. Page is the natural Notion grouping +//! ("one page = one document") so per-page ingest keeps the canonical +//! `SourceKind::Document` semantics and matches how the Gmail per-message +//! / Slack per-channel paths scope their sources. +//! +//! ## Re-ingest of edited pages +//! +//! Notion pages mutate (the cursor advances by `last_edited_time`). +//! Re-ingesting the same `(connection_id, page_id)` after the user edits +//! the page would short-circuit on the pipeline's `already_ingested` +//! gate — so the call site drops prior chunks for the same source_id +//! via `delete_chunks_by_source` *before* re-ingest, mirroring the +//! vault sync pattern in #2720. The provider's own +//! `SyncState::synced_ids` keyed by `{page_id}@{edited_time}` is the +//! authoritative "have we seen this revision?" check; this module only +//! runs when that says yes. +//! +//! ## Idempotency +//! +//! Chunk IDs are content-hashed inside the memory tree, so re-ingesting +//! a previously-seen page is an UPSERT on the same chunk row — no +//! duplicate chunks across syncs. + +use anyhow::Result; +use chrono::{DateTime, TimeZone, Utc}; +use serde_json::Value; + +use crate::openhuman::config::Config; +use crate::openhuman::memory::ingest_pipeline::{self, IngestResult}; +use crate::openhuman::memory_store::chunks::store::delete_chunks_by_source; +use crate::openhuman::memory_store::chunks::types::SourceKind; +use crate::openhuman::memory_sync::canonicalize::document::DocumentInput; + +/// Platform identifier embedded in the canonical document body header. +/// Matches the value `memory_tree::retrieval::source::PLATFORM_KINDS` +/// expects for Notion-sourced documents. +pub const NOTION_PLATFORM: &str = "notion"; + +/// Tags attached to every Notion-ingested chunk. Stable list — retrieval +/// callers filter on these. +pub const DEFAULT_TAGS: &[&str] = &["notion", "ingested"]; + +/// Build the memory-tree source_id for one Notion page in one connection. +/// +/// Stable across re-syncs of the same `(connection_id, page_id)` so the +/// pipeline's idempotency gate works correctly and the dedup-on-edit +/// path can map back to the prior chunks for cleanup before re-ingest. +pub(crate) fn notion_source_id(connection_id: &str, page_id: &str) -> String { + format!("notion:{connection_id}:{page_id}") +} + +/// Pretty-printed JSON body for one Notion page. We persist the *full* +/// Composio response payload (not just the title) so the chunked content +/// retains enough context for retrieval — Notion pages don't have a +/// natural single-string canonical body the way Slack messages do. +fn render_page_body(title: &str, page: &Value) -> String { + let pretty = serde_json::to_string_pretty(page).unwrap_or_else(|_| "{}".to_string()); + format!("# {title}\n\n```json\n{pretty}\n```\n") +} + +/// Parse a Notion `last_edited_time` (ISO 8601 / RFC 3339) into a +/// `DateTime`, falling back to `Utc::now()` on failure so the +/// pipeline still gets a valid timestamp. +fn parse_edited_time(raw: Option<&str>) -> DateTime { + raw.and_then(|s| DateTime::parse_from_rfc3339(s).ok()) + .map(|dt| dt.with_timezone(&Utc)) + .unwrap_or_else(Utc::now) +} + +/// Ingest one Notion page into the memory tree. +/// +/// Caller (the provider's `sync` loop) is responsible for the +/// edit-detection / dedup state-machine (`SyncState::synced_ids` keyed +/// by `{page_id}@{edited_time}`) — this function trusts that the call +/// only happens for items the caller wants to admit. +/// +/// On content updates of an already-ingested source_id, drops the prior +/// chunks first via `delete_chunks_by_source` so the pipeline's +/// `already_ingested` gate doesn't short-circuit the new content. This +/// mirrors the vault sync pattern in #2720. +/// +/// Returns the number of chunks the pipeline wrote. +pub async fn ingest_page_into_memory_tree( + config: &Config, + connection_id: &str, + page_id: &str, + title: &str, + edited_time: Option<&str>, + page: &Value, +) -> Result { + let source_id = notion_source_id(connection_id, page_id); + + // Re-sync of an edited page: drop prior chunks for the same source_id + // before re-ingest. `delete_chunks_by_source` is sync I/O so it has to + // hop through `spawn_blocking`. + let cfg_for_blocking = config.clone(); + let source_for_blocking = source_id.clone(); + let removed = tokio::task::spawn_blocking(move || { + delete_chunks_by_source( + &cfg_for_blocking, + SourceKind::Document, + &source_for_blocking, + ) + }) + .await + .map_err(|e| anyhow::anyhow!("delete-prior task join error: {e}"))??; + if removed > 0 { + log::debug!( + "[composio:notion] ingest: re-ingest cleanup connection_id={} page_id={} removed_chunks={}", + connection_id, + page_id, + removed + ); + } + + let modified_at = parse_edited_time(edited_time); + let body = render_page_body(title, page); + let source_ref = Some(format!("notion://page/{page_id}")); + + let doc = DocumentInput { + provider: NOTION_PLATFORM.to_string(), + title: title.to_string(), + body, + modified_at, + source_ref, + }; + let tags: Vec = DEFAULT_TAGS.iter().map(|s| s.to_string()).collect(); + let owner = format!("notion:{connection_id}"); + + match ingest_pipeline::ingest_document(config, &source_id, &owner, tags, doc).await { + Ok(IngestResult { + chunks_written, + already_ingested, + .. + }) => { + // The delete-first guard above prevents `already_ingested` on + // the normal update path. Seeing it here means the prior + // chunks were already absent (fresh ingest into a primed + // memory_tree) — fine, just log at debug. + log::debug!( + "[composio:notion] ingest: page connection_id={} page_id={} chunks_written={} already_ingested={}", + connection_id, + page_id, + chunks_written, + already_ingested, + ); + Ok(chunks_written) + } + Err(err) => Err(anyhow::anyhow!( + "ingest_document failed for {source_id}: {err}" + )), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + use tempfile::TempDir; + + fn test_config() -> (TempDir, Config) { + let tmp = TempDir::new().expect("tempdir"); + let mut cfg = Config::default(); + cfg.workspace_dir = tmp.path().to_path_buf(); + // Disable strict embedding so the pipeline accepts chunks without + // a live embedder (matches the + // `memory::sync_pipeline_e2e_test::test_config` shape). + cfg.memory_tree.embedding_endpoint = None; + cfg.memory_tree.embedding_model = None; + cfg.memory_tree.embedding_strict = false; + (tmp, cfg) + } + + fn sample_page(page_id: &str, edited_time: &str) -> Value { + json!({ + "id": page_id, + "object": "page", + "last_edited_time": edited_time, + "properties": { + "Name": { "title": [{ "plain_text": "Phoenix migration plan" }] } + }, + "url": format!("https://www.notion.so/{}", page_id.replace('-', "")), + "body_excerpt": "Phoenix ships Friday after staging review. Alice owns rollback, Bob on-call.", + }) + } + + /// `notion_source_id` is stable across calls and namespaces + /// `(connection_id, page_id)` distinctly. Pins the contract the + /// re-ingest cleanup path relies on (`delete_chunks_by_source` + /// against the same `source_id`). + #[test] + fn notion_source_id_is_stable_and_namespaced() { + let a = notion_source_id("conn-1", "page-abc"); + let b = notion_source_id("conn-1", "page-abc"); + assert_eq!(a, b); + assert_eq!(a, "notion:conn-1:page-abc"); + + assert_ne!( + notion_source_id("conn-1", "page-abc"), + notion_source_id("conn-2", "page-abc"), + "distinct connections must produce distinct source ids" + ); + assert_ne!( + notion_source_id("conn-1", "page-abc"), + notion_source_id("conn-1", "page-xyz"), + "distinct page ids must produce distinct source ids" + ); + } + + /// `parse_edited_time` accepts valid ISO 8601 / RFC 3339 and falls + /// back to `Utc::now()` on bad input rather than failing the ingest. + /// We don't assert the now-fallback timestamp value (it's + /// time-dependent) — just that we got a `DateTime` back. + #[test] + fn parse_edited_time_handles_valid_and_invalid_inputs() { + let good = parse_edited_time(Some("2026-05-28T12:34:56.000Z")); + assert_eq!(good.format("%Y-%m-%d").to_string(), "2026-05-28"); + + // Invalid / missing both fall through to `Utc::now()` — sanity + // check that the result is "recent" (within last 5s). + let bad = parse_edited_time(Some("not-a-timestamp")); + assert!((Utc::now() - bad).num_seconds().abs() < 5); + + let missing = parse_edited_time(None); + assert!((Utc::now() - missing).num_seconds().abs() < 5); + } + + /// `render_page_body` produces a markdown document with the title + /// header + the full page JSON pretty-printed in a fenced code + /// block. Pins the chunked-content shape — without this the + /// retrieval body becomes "just the title" and loses Notion-specific + /// signal (properties, URL, excerpt) at search time. + #[test] + fn render_page_body_includes_title_header_and_pretty_json() { + let page = json!({ "id": "p-1", "url": "https://notion.so/p1" }); + let body = render_page_body("Phoenix plan", &page); + assert!(body.starts_with("# Phoenix plan\n")); + assert!(body.contains("```json\n")); + assert!(body.contains("\"id\": \"p-1\"")); + assert!(body.contains("\"url\": \"https://notion.so/p1\"")); + } + + /// The #2885 regression test. + /// + /// Before this migration, Notion sync routed through + /// `MemoryClient::store_skill_sync` → `UnifiedMemory::upsert_document` + /// → `memory_docs` (legacy backend). The memory-tree retrieval + /// surfaces (which every modern caller reads from) saw zero rows. + /// + /// This test pins the new contract: a successful `ingest_page_into_memory_tree` + /// call writes to `mem_tree_chunks` + `mem_tree_ingested_sources`, + /// so the silent-failure mode can't reappear. Mirrors the + /// `sync_writes_to_memory_tree` regression in `vault::sync` (#2720). + #[tokio::test] + async fn ingest_page_writes_to_memory_tree() { + use crate::openhuman::memory_store::chunks::store::{count_chunks, is_source_ingested}; + + let (_tmp, cfg) = test_config(); + let connection_id = "conn-test"; + let page_id = "page-phoenix"; + let page = sample_page(page_id, "2026-05-28T10:00:00.000Z"); + + let chunks_before = count_chunks(&cfg).expect("count_chunks before"); + + let written = ingest_page_into_memory_tree( + &cfg, + connection_id, + page_id, + "Phoenix migration plan", + Some("2026-05-28T10:00:00.000Z"), + &page, + ) + .await + .expect("ingest_page_into_memory_tree"); + + assert!( + written > 0, + "Notion ingest must write at least one chunk; got {written}" + ); + + // Core regression assertion: chunks landed in memory_tree. + let chunks_after = count_chunks(&cfg).expect("count_chunks after"); + assert!( + chunks_after > chunks_before, + "ingest must populate mem_tree_chunks (#2885): {chunks_before} → {chunks_after}" + ); + + // Source registration. + let cfg_for_blocking = cfg.clone(); + let expected = notion_source_id(connection_id, page_id); + let registered = tokio::task::spawn_blocking(move || { + is_source_ingested(&cfg_for_blocking, SourceKind::Document, &expected).unwrap_or(false) + }) + .await + .expect("source-check task join"); + assert!( + registered, + "source_id {} must be registered in mem_tree_ingested_sources", + notion_source_id(connection_id, page_id) + ); + } + + /// Re-ingesting an edited page (same `(connection_id, page_id)`, + /// different content) cleans up prior chunks and writes fresh ones — + /// the `delete_chunks_by_source` guard sidesteps the pipeline's + /// `already_ingested` short-circuit that would otherwise drop the + /// new revision. + #[tokio::test] + async fn re_ingesting_edited_page_replaces_prior_chunks() { + use crate::openhuman::memory_store::chunks::store::count_chunks; + + let (_tmp, cfg) = test_config(); + let connection_id = "conn-edit"; + let page_id = "page-edit"; + + // First ingest. + let v1 = sample_page(page_id, "2026-05-28T10:00:00.000Z"); + let first = ingest_page_into_memory_tree( + &cfg, + connection_id, + page_id, + "Phoenix plan v1", + Some("2026-05-28T10:00:00.000Z"), + &v1, + ) + .await + .expect("first ingest"); + assert!(first > 0); + let after_first = count_chunks(&cfg).expect("count after first"); + + // Re-ingest with different body — should NOT short-circuit, and + // chunk count should not double (prior chunks dropped, new ones + // written, net same per-page count for this body size). + let v2 = json!({ + "id": page_id, + "object": "page", + "last_edited_time": "2026-05-29T10:00:00.000Z", + "properties": { "Name": { "title": [{ "plain_text": "Phoenix plan revised" }] } }, + "body_excerpt": "Plan revised: ship Monday, Carol takes on-call instead.", + }); + let second = ingest_page_into_memory_tree( + &cfg, + connection_id, + page_id, + "Phoenix plan v2", + Some("2026-05-29T10:00:00.000Z"), + &v2, + ) + .await + .expect("second ingest"); + assert!( + second > 0, + "edited page must actually re-ingest, not silently no-op" + ); + let after_second = count_chunks(&cfg).expect("count after second"); + + // The chunk count after the second ingest should equal the + // count after the first (replaced one revision with another), + // not double. Allow ±1 for any rounding in how the chunker + // splits subtly-different markdown. + assert!( + after_second.abs_diff(after_first) <= 1, + "edited page must replace prior chunks, not append: \ + after_first={after_first} after_second={after_second}" + ); + } +} diff --git a/src/openhuman/memory_sync/composio/providers/notion/mod.rs b/src/openhuman/memory_sync/composio/providers/notion/mod.rs index 427b6fb3f2..44bc4f4b91 100644 --- a/src/openhuman/memory_sync/composio/providers/notion/mod.rs +++ b/src/openhuman/memory_sync/composio/providers/notion/mod.rs @@ -1,3 +1,4 @@ +mod ingest; mod provider; mod sync; #[cfg(test)] diff --git a/src/openhuman/memory_sync/composio/providers/notion/provider.rs b/src/openhuman/memory_sync/composio/providers/notion/provider.rs index 8ad83a6d6e..3c52dab0b3 100644 --- a/src/openhuman/memory_sync/composio/providers/notion/provider.rs +++ b/src/openhuman/memory_sync/composio/providers/notion/provider.rs @@ -18,10 +18,9 @@ use async_trait::async_trait; use serde_json::{json, Value}; +use super::ingest::ingest_page_into_memory_tree; use super::sync; -use crate::openhuman::memory_sync::composio::providers::sync_state::{ - extract_item_id, persist_single_item, SyncState, -}; +use crate::openhuman::memory_sync::composio::providers::sync_state::{extract_item_id, SyncState}; use crate::openhuman::memory_sync::composio::providers::{ pick_str, ComposioProvider, CuratedTool, ProviderContext, ProviderUserProfile, SyncOutcome, SyncReason, @@ -277,21 +276,26 @@ impl ComposioProvider for NotionProvider { // Build a title from the page's properties. let title_text = sync::extract_page_title(page) .unwrap_or_else(|| format!("Notion page {page_id}")); - let doc_id = format!("composio-notion-page-{page_id}"); let title = format!("Notion: {title_text}"); - match persist_single_item( - &memory, - "notion", - &doc_id, + // Route into the memory-tree pipeline (#2885). The prior + // implementation called `persist_single_item` → + // `MemoryClient::store_skill_sync` → UnifiedMemory + // `memory_docs`, which the modern retrieval surfaces + // (`memory.search`, `tree.read_chunk`, `tree.browse`, + // summary trees, MCP tools) don't read from — the data + // was invisible to every agent recall path. + match ingest_page_into_memory_tree( + &ctx.config, + &connection_id, + &page_id, &title, + edited_time.as_deref(), page, - "notion", - ctx.connection_id.as_deref(), ) .await { - Ok(_) => { + Ok(_chunks_written) => { state.mark_synced(&sync_key); total_persisted += 1; } @@ -299,7 +303,7 @@ impl ComposioProvider for NotionProvider { tracing::warn!( page_id = %page_id, error = %e, - "[composio:notion] failed to persist page (continuing)" + "[composio:notion] failed to ingest page into memory_tree (continuing)" ); } } From dd3943d649d144d72438077a629406128e983fc4 Mon Sep 17 00:00:00 2001 From: justin Date: Fri, 29 May 2026 10:35:15 +0800 Subject: [PATCH 2/4] perf(composio/notion): skip chunk scan on fresh-page ingest (CodeRabbit nit) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Gate `delete_chunks_by_source` behind `is_source_ingested`. The delete path uses a `source_kind = ?1` scan + Rust-side source-id filter (`store::delete_chunks_by_source_filter`), so a first-time ingest of a never-seen page would scan every Document-kind chunk just to find zero matches. `is_source_ingested` is an indexed PK lookup against `mem_tree_ingested_sources`, so the common fresh-page case becomes one cheap `COUNT(*)` and we only pay the scan cost on actual re-ingests of edited pages. Same `spawn_blocking` hop, single combined closure — the gate runs inside the existing blocking task. The 5 ingest tests still pass: the regression test exercises the false → no-delete path (fresh page); the re-ingest test exercises the true → delete path (edited page). No behaviour change on hot paths — `is_source_ingested` is the same table the pipeline's `claim_source_ingest_tx` writes to, so it sees exactly the rows the delete needs to clean up. Refs CodeRabbit review on #2887. --- .../composio/providers/notion/ingest.rs | 29 +++++++++++++++---- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/src/openhuman/memory_sync/composio/providers/notion/ingest.rs b/src/openhuman/memory_sync/composio/providers/notion/ingest.rs index 4ad302d2c4..b962edd116 100644 --- a/src/openhuman/memory_sync/composio/providers/notion/ingest.rs +++ b/src/openhuman/memory_sync/composio/providers/notion/ingest.rs @@ -44,7 +44,7 @@ use serde_json::Value; use crate::openhuman::config::Config; use crate::openhuman::memory::ingest_pipeline::{self, IngestResult}; -use crate::openhuman::memory_store::chunks::store::delete_chunks_by_source; +use crate::openhuman::memory_store::chunks::store::{delete_chunks_by_source, is_source_ingested}; use crate::openhuman::memory_store::chunks::types::SourceKind; use crate::openhuman::memory_sync::canonicalize::document::DocumentInput; @@ -108,16 +108,33 @@ pub async fn ingest_page_into_memory_tree( let source_id = notion_source_id(connection_id, page_id); // Re-sync of an edited page: drop prior chunks for the same source_id - // before re-ingest. `delete_chunks_by_source` is sync I/O so it has to - // hop through `spawn_blocking`. + // before re-ingest. Both calls are sync rusqlite I/O so they share one + // `spawn_blocking` hop. + // + // We gate `delete_chunks_by_source` behind `is_source_ingested` — the + // delete path uses a `source_kind = ?1` scan with Rust-side + // source-id filtering (see `store::delete_chunks_by_source_filter`), + // so on a first-time ingest of a never-seen page it would scan every + // Document-kind chunk just to find zero matches. `is_source_ingested` + // is an indexed PK lookup against `mem_tree_ingested_sources`, so it + // converts the common fresh-page case to one cheap `COUNT(*)` and only + // pays the scan cost on actual re-ingests of edited pages. let cfg_for_blocking = config.clone(); let source_for_blocking = source_id.clone(); - let removed = tokio::task::spawn_blocking(move || { - delete_chunks_by_source( + let removed = tokio::task::spawn_blocking(move || -> Result { + if is_source_ingested( &cfg_for_blocking, SourceKind::Document, &source_for_blocking, - ) + )? { + delete_chunks_by_source( + &cfg_for_blocking, + SourceKind::Document, + &source_for_blocking, + ) + } else { + Ok(0) + } }) .await .map_err(|e| anyhow::anyhow!("delete-prior task join error: {e}"))??; From 0c15fd2720bde61e3b1701d856d21ff81b04120c Mon Sep 17 00:00:00 2001 From: justin Date: Fri, 29 May 2026 11:33:49 +0800 Subject: [PATCH 3/4] fix(composio/notion): hold cursor on ingest failure + preserve error chain MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two related correctness fixes flagged by CodeRabbit on the parallel GitHub PR (#2889) — same shape exists here in Notion, so applying the fix here proactively rather than waiting for a second round. ## 1. Cursor advances on ingest failure → data-loss window `provider.rs` updated `newest_edited_time` for every result before attempting ingest, then advanced the persistent cursor unconditionally at the end of the loop. Items whose ingest call returned `Err` were logged + skipped (no `mark_synced`) — but the next sync queried `last_edited_time > {cursor}`, so the failed item was never re-fetched. Pre-#2885 this was just "stale revision": the legacy `persist_single_item` path wasn't delete-first, so a failed write left old chunks alone. Post-#2885 it's worse — `ingest_page_into_memory_tree` calls `delete_chunks_by_source` *before* the (failing) write, so an *edited* page that fails to re-ingest is left with neither old nor new chunks in `mem_tree_chunks` until its next edit. Fix: track `had_ingest_failures: bool`, set on every `Err`, and gate the `advance_cursor` at the bottom of the sync loop. Already-synced items are cheaply skipped via `state.is_synced` on the re-fetch next pass, so the cost of holding is just one extra search-API page on the failure range. A retry-only-failed-items fix (separate `newest_successful_updated` cursor) would be tighter but adds state and edge cases; the simple gate matches CodeRabbit's exact suggestion and is provably safe (cursor only ever advances when every item in the pass succeeded). ## 2. `{err}` drops anyhow context chain `ingest.rs` wrapped the `ingest_pipeline::ingest_document` failure as `anyhow!("ingest_document failed for {source_id}: {err}")`, where `{err}` is Display-only and strips the chain. `provider.rs` then logs the wrapped error with `tracing::warn!(error = %e)` (also Display), so the underlying DB / embedding / persist cause was lost. Fix: swap to `{err:#}` (alternate formatter) so the chain is baked into the wrapping anyhow's Display impl. Matches the convention `provider.rs` already uses on its own `ctx.execute(...).map_err(|e| ... "{e:#}")` call sites (lines 92, 207). ## Tests 27/27 still pass in `memory_sync::composio::providers::notion::*`. The provider's `sync()` is hard to unit-test without mocking `ComposioContext` heavily — the invariant is pinned by the rustdoc on `had_ingest_failures` and the inline gate comment. The ingest-side tests (regression + re-ingest) cover the ingest function's contract; the cursor logic lives entirely in `provider.rs`. Refs CodeRabbit review on #2889. --- .../composio/providers/notion/ingest.rs | 6 ++++- .../composio/providers/notion/provider.rs | 25 +++++++++++++++++-- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/src/openhuman/memory_sync/composio/providers/notion/ingest.rs b/src/openhuman/memory_sync/composio/providers/notion/ingest.rs index b962edd116..14541c0ea9 100644 --- a/src/openhuman/memory_sync/composio/providers/notion/ingest.rs +++ b/src/openhuman/memory_sync/composio/providers/notion/ingest.rs @@ -181,7 +181,11 @@ pub async fn ingest_page_into_memory_tree( Ok(chunks_written) } Err(err) => Err(anyhow::anyhow!( - "ingest_document failed for {source_id}: {err}" + // `{err:#}` (alternate formatter) bakes in the anyhow context + // chain so provider.rs's `tracing::warn!(error = %e)` doesn't + // strip the underlying cause (DB / embedding / persist failure) + // when it Displays the wrapped error. + "ingest_document failed for {source_id}: {err:#}" )), } } diff --git a/src/openhuman/memory_sync/composio/providers/notion/provider.rs b/src/openhuman/memory_sync/composio/providers/notion/provider.rs index 3c52dab0b3..a0f79e2703 100644 --- a/src/openhuman/memory_sync/composio/providers/notion/provider.rs +++ b/src/openhuman/memory_sync/composio/providers/notion/provider.rs @@ -181,6 +181,14 @@ impl ComposioProvider for NotionProvider { let mut total_persisted: usize = 0; let mut newest_edited_time: Option = None; let mut notion_cursor: Option = None; + // Track whether any per-item ingest failed this pass. If so, we hold + // the persistent cursor — `last_edited_time > {cursor}` on the next + // sync would otherwise exclude the failed item, and because the new + // memory-tree pipeline (#2885) is delete-first, an *edited* page that + // failed to re-ingest is left with neither old nor new chunks until + // its next edit. Already-synced items are skipped cheaply via + // `is_synced` on the re-fetch, so the cost of holding is minimal. + let mut had_ingest_failures = false; for page_num in 0..MAX_PAGES_PER_SYNC { if state.budget_exhausted() { @@ -300,6 +308,7 @@ impl ComposioProvider for NotionProvider { total_persisted += 1; } Err(e) => { + had_ingest_failures = true; tracing::warn!( page_id = %page_id, error = %e, @@ -326,8 +335,20 @@ impl ComposioProvider for NotionProvider { } // ── Step 5: advance cursor and save state ─────────────────── - if let Some(new_cursor) = newest_edited_time { - state.advance_cursor(&new_cursor); + // + // Hold the cursor when any item failed to ingest this pass. See the + // `had_ingest_failures` declaration above for why this matters under + // the delete-first memory-tree pipeline (#2885). + if !had_ingest_failures { + if let Some(new_cursor) = newest_edited_time { + state.advance_cursor(&new_cursor); + } + } else { + tracing::warn!( + connection_id = %connection_id, + "[composio:notion] holding cursor — ingest failures this pass; next sync will \ + re-fetch the failed range" + ); } state.save(&memory).await?; From 23118615796a0dcc0dce03c62690f3b3832e9144 Mon Sep 17 00:00:00 2001 From: justin Date: Fri, 29 May 2026 16:53:34 +0800 Subject: [PATCH 4/4] style(composio/notion): use tracing::debug! with structured fields in ingest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Carries the @graycyrus review note from the parallel GitHub PR (#2889) back to Notion so the two new memory-tree ingest modules stay consistent. The two `log::debug!` calls in `ingest_page_into_memory_tree` used positional format strings; provider.rs in the same directory logs via `tracing::*` with structured key=value fields, which log-aggregation pipelines can filter on (the tracing-log bridge flattens positional args to an opaque message and loses them). Swap both to `tracing::debug!` with named fields matching provider.rs: - re-ingest cleanup: connection_id / page_id / removed_chunks - page persisted: connection_id / page_id / chunks_written / already_ingested Scope note: the older ingest modules (slack, gmail, vault::sync) still use `log::*` — pre-existing, out of scope. A follow-up sweep can unify them separately if desired. Refs #2887, #2889. --- .../composio/providers/notion/ingest.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/openhuman/memory_sync/composio/providers/notion/ingest.rs b/src/openhuman/memory_sync/composio/providers/notion/ingest.rs index 14541c0ea9..82ed1c0b93 100644 --- a/src/openhuman/memory_sync/composio/providers/notion/ingest.rs +++ b/src/openhuman/memory_sync/composio/providers/notion/ingest.rs @@ -139,11 +139,11 @@ pub async fn ingest_page_into_memory_tree( .await .map_err(|e| anyhow::anyhow!("delete-prior task join error: {e}"))??; if removed > 0 { - log::debug!( - "[composio:notion] ingest: re-ingest cleanup connection_id={} page_id={} removed_chunks={}", - connection_id, - page_id, - removed + tracing::debug!( + connection_id = %connection_id, + page_id = %page_id, + removed_chunks = removed, + "[composio:notion] ingest: re-ingest cleanup" ); } @@ -171,12 +171,12 @@ pub async fn ingest_page_into_memory_tree( // the normal update path. Seeing it here means the prior // chunks were already absent (fresh ingest into a primed // memory_tree) — fine, just log at debug. - log::debug!( - "[composio:notion] ingest: page connection_id={} page_id={} chunks_written={} already_ingested={}", - connection_id, - page_id, + tracing::debug!( + connection_id = %connection_id, + page_id = %page_id, chunks_written, already_ingested, + "[composio:notion] ingest: page persisted" ); Ok(chunks_written) }