From 37facfdb29fa22f7d1c7adafaa489264446e2ed1 Mon Sep 17 00:00:00 2001 From: justin Date: Fri, 29 May 2026 10:28:14 +0800 Subject: [PATCH 1/4] feat(composio): migrate GitHub provider to memory_tree pipeline (#2885) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Second of the four-PR follow-up sequence in #2885 (after Notion in #2887). GitHub issue/PR sync now writes through `ingest_pipeline::ingest_document` → `mem_tree_chunks` instead of `MemoryClient::store_skill_sync` → UnifiedMemory `memory_docs`, so `memory.search` / `tree.read_chunk` / `tree.browse` / the agent recall path / summary trees all see GitHub items again. Mirrors the Notion shape: - `providers/github/ingest.rs` (NEW): `github_source_id`, `parse_updated_at`, `render_issue_body`, `extract_html_url`, `ingest_issue_into_memory_tree`. - Re-ingest cleanup via `delete_chunks_by_source` before each ingest, so the pipeline's content-blind `already_ingested` gate doesn't drop edited issues (same pattern as vault sync #2720 and Notion #2887). - `source_id = github:{connection_id}:{issue_id}` — stable across re-syncs and across connections. Works for both numeric internal IDs and the `owner/repo#number` slug fallback from `extract_issue_id`. - `source_ref` carries the canonical GitHub `html_url` so audit trails are one-click navigable from any downstream UI. - `provider.rs`: dropped `persist_single_item` + `doc_id`, threaded `updated_at` through to the new ingest fn, kept the existing composite-key dedup logic (`{issue_id}@{updated_at}`) intact. Tests (lib): 6 in `ingest::tests`, all green. - `github_source_id_is_stable_and_namespaced` — pins the contract the `delete_chunks_by_source` cleanup path relies on. - `parse_updated_at_handles_valid_and_invalid_inputs` — falls through to `Utc::now()` on bad input instead of failing the ingest. - `render_issue_body_includes_title_header_and_pretty_json` — pins the chunked-content shape; without it the retrieval body loses labels / assignees / state / comment count. - `extract_html_url_handles_both_envelope_shapes` — handles direct and `data.html_url` Composio envelope variants. - `ingest_issue_writes_to_memory_tree` — the #2885 regression: asserts `count_chunks` rises and the source_id is registered in `mem_tree_ingested_sources`. - `re_ingesting_edited_issue_replaces_prior_chunks` — confirms the delete-first guard works; chunk count after re-ingest stays steady (±1 chunker rounding) instead of doubling. `persist_single_item` / `store_skill_sync` are intentionally left in place — they're still used by the remaining unmigrated providers (Linear, ClickUp) and the wider UnifiedMemory removal lives under the senamakel #2585 follow-up. They become removable once Linear and ClickUp follow (also tracked in #2885). Refs #2885. --- .../composio/providers/github/ingest.rs | 457 ++++++++++++++++++ .../composio/providers/github/mod.rs | 1 + .../composio/providers/github/provider.rs | 28 +- 3 files changed, 474 insertions(+), 12 deletions(-) create mode 100644 src/openhuman/memory_sync/composio/providers/github/ingest.rs diff --git a/src/openhuman/memory_sync/composio/providers/github/ingest.rs b/src/openhuman/memory_sync/composio/providers/github/ingest.rs new file mode 100644 index 0000000000..90854b1b84 --- /dev/null +++ b/src/openhuman/memory_sync/composio/providers/github/ingest.rs @@ -0,0 +1,457 @@ +//! GitHub → memory tree ingest plumbing. +//! +//! Owns the conversion from a single GitHub issue/PR payload (post-extracted +//! by [`super::sync`]) into a [`DocumentInput`] and drives +//! [`memory::ingest_pipeline::ingest_document`] for that item. +//! +//! Mirrors the canonical Slack/Gmail/Notion per-source ingest layout +//! ([`super::super::slack::ingest`] / [`super::super::gmail::ingest`] / +//! [`super::super::notion::ingest`]) so retrieval surfaces +//! (`memory.search`, `tree.read_chunk`, `tree.browse`, the agent's +//! recall path, summary trees) actually see GitHub content — pre-#2885 +//! the provider wrote via `MemoryClient::store_skill_sync` into the +//! legacy `memory_docs` table, invisible to the memory-tree retrieval +//! stack. +//! +//! ## Source-id scope +//! +//! Source id is `github:{connection_id}:{issue_id}` — one source per +//! GitHub issue or pull request per connection. Issue and PR are the +//! natural GitHub grouping ("one issue/PR = one document") so per-item +//! ingest keeps the canonical `SourceKind::Document` semantics and +//! matches how the Gmail per-message / Slack per-channel paths scope +//! their sources. +//! +//! ## Re-ingest of edited issues +//! +//! GitHub issues and PRs mutate (the cursor advances by `updated_at` — +//! title, body, labels, assignees, and comments all bump it). Re-ingesting +//! the same `(connection_id, issue_id)` after an update would +//! short-circuit on the pipeline's `already_ingested` gate — so the call +//! site drops prior chunks for the same source_id via +//! `delete_chunks_by_source` *before* re-ingest, mirroring the vault +//! sync pattern in #2720 and the Notion provider in #2887. The +//! provider's own `SyncState::synced_ids` keyed by +//! `{issue_id}@{updated_at}` is the authoritative "have we seen this +//! revision?" check; this module only runs when that says yes. +//! +//! ## Idempotency +//! +//! Chunk IDs are content-hashed inside the memory tree, so re-ingesting +//! a previously-seen issue is an UPSERT on the same chunk row — no +//! duplicate chunks across syncs. + +use anyhow::Result; +use chrono::{DateTime, Utc}; +use serde_json::Value; + +use crate::openhuman::config::Config; +use crate::openhuman::memory::ingest_pipeline::{self, IngestResult}; +use crate::openhuman::memory_store::chunks::store::delete_chunks_by_source; +use crate::openhuman::memory_store::chunks::types::SourceKind; +use crate::openhuman::memory_sync::canonicalize::document::DocumentInput; + +/// Platform identifier embedded in the canonical document body header. +/// Matches the value `memory_tree::retrieval::source::PLATFORM_KINDS` +/// expects for GitHub-sourced documents. +pub const GITHUB_PLATFORM: &str = "github"; + +/// Tags attached to every GitHub-ingested chunk. Stable list — retrieval +/// callers filter on these. +pub const DEFAULT_TAGS: &[&str] = &["github", "ingested"]; + +/// Build the memory-tree source_id for one GitHub issue/PR in one connection. +/// +/// Stable across re-syncs of the same `(connection_id, issue_id)` so the +/// pipeline's idempotency gate works correctly and the dedup-on-edit +/// path can map back to the prior chunks for cleanup before re-ingest. +/// +/// `issue_id` is whatever stable identifier `super::sync::extract_issue_id` +/// produces — typically GitHub's numeric internal ID, falling back to +/// `owner/repo#number` parsed from `html_url`. Either form is stable +/// per-issue, so callers don't need to normalise before constructing +/// the source id. +pub(crate) fn github_source_id(connection_id: &str, issue_id: &str) -> String { + format!("github:{connection_id}:{issue_id}") +} + +/// Pretty-printed JSON body for one GitHub issue/PR. We persist the *full* +/// Composio response payload (not just the title + body markdown) so the +/// chunked content retains enough context for retrieval — labels, +/// assignees, milestone, comment counts, and state transitions are all +/// meaningful retrieval signal that a "just the body" projection would +/// drop. +fn render_issue_body(title: &str, issue: &Value) -> String { + let pretty = serde_json::to_string_pretty(issue).unwrap_or_else(|_| "{}".to_string()); + format!("# {title}\n\n```json\n{pretty}\n```\n") +} + +/// Parse a GitHub `updated_at` timestamp (ISO 8601 / RFC 3339, e.g. +/// `"2024-05-21T15:30:00Z"`) into a `DateTime`, falling back to +/// `Utc::now()` on failure so the pipeline still gets a valid timestamp. +fn parse_updated_at(raw: Option<&str>) -> DateTime { + raw.and_then(|s| DateTime::parse_from_rfc3339(s).ok()) + .map(|dt| dt.with_timezone(&Utc)) + .unwrap_or_else(Utc::now) +} + +/// Best-effort extraction of the issue's `html_url` for `source_ref`. +/// +/// Using the canonical GitHub URL (rather than a custom `github://` scheme +/// like Notion does) keeps the audit trail one-click navigable — clicking +/// `source_ref` in any downstream UI lands on the real issue page. +fn extract_html_url(issue: &Value) -> Option { + issue + .get("html_url") + .and_then(|v| v.as_str()) + .or_else(|| issue.pointer("/data/html_url").and_then(|v| v.as_str())) + .map(|s| s.to_string()) +} + +/// Ingest one GitHub issue (or PR — same shape) into the memory tree. +/// +/// Caller (the provider's `sync` loop) is responsible for the +/// edit-detection / dedup state-machine (`SyncState::synced_ids` keyed +/// by `{issue_id}@{updated_at}`) — this function trusts that the call +/// only happens for items the caller wants to admit. +/// +/// On content updates of an already-ingested source_id, drops the prior +/// chunks first via `delete_chunks_by_source` so the pipeline's +/// `already_ingested` gate doesn't short-circuit the new content. This +/// mirrors the vault sync pattern in #2720 and the Notion provider in +/// #2887. +/// +/// Returns the number of chunks the pipeline wrote. +pub async fn ingest_issue_into_memory_tree( + config: &Config, + connection_id: &str, + issue_id: &str, + title: &str, + updated_at: Option<&str>, + issue: &Value, +) -> Result { + let source_id = github_source_id(connection_id, issue_id); + + // Re-sync of an edited issue: drop prior chunks for the same source_id + // before re-ingest. `delete_chunks_by_source` is sync I/O so it has to + // hop through `spawn_blocking`. + let cfg_for_blocking = config.clone(); + let source_for_blocking = source_id.clone(); + let removed = tokio::task::spawn_blocking(move || { + delete_chunks_by_source( + &cfg_for_blocking, + SourceKind::Document, + &source_for_blocking, + ) + }) + .await + .map_err(|e| anyhow::anyhow!("delete-prior task join error: {e}"))??; + if removed > 0 { + log::debug!( + "[composio:github] ingest: re-ingest cleanup connection_id={} issue_id={} removed_chunks={}", + connection_id, + issue_id, + removed + ); + } + + let modified_at = parse_updated_at(updated_at); + let body = render_issue_body(title, issue); + let source_ref = extract_html_url(issue); + + let doc = DocumentInput { + provider: GITHUB_PLATFORM.to_string(), + title: title.to_string(), + body, + modified_at, + source_ref, + }; + let tags: Vec = DEFAULT_TAGS.iter().map(|s| s.to_string()).collect(); + let owner = format!("github:{connection_id}"); + + match ingest_pipeline::ingest_document(config, &source_id, &owner, tags, doc).await { + Ok(IngestResult { + chunks_written, + already_ingested, + .. + }) => { + // The delete-first guard above prevents `already_ingested` on + // the normal update path. Seeing it here means the prior + // chunks were already absent (fresh ingest into a primed + // memory_tree) — fine, just log at debug. + log::debug!( + "[composio:github] ingest: issue connection_id={} issue_id={} chunks_written={} already_ingested={}", + connection_id, + issue_id, + chunks_written, + already_ingested, + ); + Ok(chunks_written) + } + Err(err) => Err(anyhow::anyhow!( + "ingest_document failed for {source_id}: {err}" + )), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + use tempfile::TempDir; + + fn test_config() -> (TempDir, Config) { + let tmp = TempDir::new().expect("tempdir"); + let mut cfg = Config::default(); + cfg.workspace_dir = tmp.path().to_path_buf(); + // Disable strict embedding so the pipeline accepts chunks without + // a live embedder (matches the + // `memory::sync_pipeline_e2e_test::test_config` shape). + cfg.memory_tree.embedding_endpoint = None; + cfg.memory_tree.embedding_model = None; + cfg.memory_tree.embedding_strict = false; + (tmp, cfg) + } + + fn sample_issue(issue_id: u64, updated_at: &str, title: &str, body: &str) -> Value { + json!({ + "id": issue_id, + "number": 99, + "title": title, + "body": body, + "state": "open", + "html_url": format!("https://github.com/acme/core/issues/{}", issue_id), + "user": { "login": "octocat", "id": 1u64 }, + "labels": [ + { "name": "bug", "color": "d73a4a" }, + { "name": "memory", "color": "0e8a16" } + ], + "updated_at": updated_at, + "created_at": "2026-05-01T00:00:00Z", + "comments": 3, + "assignees": [{ "login": "alice" }], + }) + } + + /// `github_source_id` is stable across calls and namespaces + /// `(connection_id, issue_id)` distinctly. Pins the contract the + /// re-ingest cleanup path relies on (`delete_chunks_by_source` + /// against the same `source_id`). + #[test] + fn github_source_id_is_stable_and_namespaced() { + let a = github_source_id("conn-1", "12345"); + let b = github_source_id("conn-1", "12345"); + assert_eq!(a, b); + assert_eq!(a, "github:conn-1:12345"); + + // owner/repo#number slug form also stable. + assert_eq!( + github_source_id("conn-1", "acme/core#42"), + "github:conn-1:acme/core#42" + ); + + assert_ne!( + github_source_id("conn-1", "12345"), + github_source_id("conn-2", "12345"), + "distinct connections must produce distinct source ids" + ); + assert_ne!( + github_source_id("conn-1", "12345"), + github_source_id("conn-1", "67890"), + "distinct issue ids must produce distinct source ids" + ); + } + + /// `parse_updated_at` accepts valid ISO 8601 / RFC 3339 and falls + /// back to `Utc::now()` on bad input rather than failing the ingest. + /// We don't assert the now-fallback timestamp value (it's + /// time-dependent) — just that we got a `DateTime` back. + #[test] + fn parse_updated_at_handles_valid_and_invalid_inputs() { + let good = parse_updated_at(Some("2026-05-28T12:34:56Z")); + assert_eq!(good.format("%Y-%m-%d").to_string(), "2026-05-28"); + + // Invalid / missing both fall through to `Utc::now()` — sanity + // check that the result is "recent" (within last 5s). + let bad = parse_updated_at(Some("not-a-timestamp")); + assert!((Utc::now() - bad).num_seconds().abs() < 5); + + let missing = parse_updated_at(None); + assert!((Utc::now() - missing).num_seconds().abs() < 5); + } + + /// `render_issue_body` produces a markdown document with the title + /// header + the full issue JSON pretty-printed in a fenced code + /// block. Pins the chunked-content shape — without this the + /// retrieval body becomes "just the title" and loses GitHub-specific + /// signal (labels, assignees, comment count, state) at search time. + #[test] + fn render_issue_body_includes_title_header_and_pretty_json() { + let issue = sample_issue(123, "2026-05-28T10:00:00Z", "Fix the bug", "Repro steps:"); + let body = render_issue_body("GitHub: acme/core#99: Fix the bug", &issue); + assert!(body.starts_with("# GitHub: acme/core#99: Fix the bug\n")); + assert!(body.contains("```json\n")); + assert!(body.contains("\"title\": \"Fix the bug\"")); + assert!(body.contains("\"bug\"")); + assert!(body.contains("\"assignees\"")); + } + + /// `extract_html_url` prefers the top-level field and falls back to + /// the `data.html_url` wrapper Composio sometimes returns. + #[test] + fn extract_html_url_handles_both_envelope_shapes() { + let direct = json!({ "html_url": "https://github.com/x/y/issues/1" }); + assert_eq!( + extract_html_url(&direct), + Some("https://github.com/x/y/issues/1".to_string()) + ); + + let wrapped = json!({ "data": { "html_url": "https://github.com/x/y/issues/2" } }); + assert_eq!( + extract_html_url(&wrapped), + Some("https://github.com/x/y/issues/2".to_string()) + ); + + let missing = json!({ "id": 1u64 }); + assert!(extract_html_url(&missing).is_none()); + } + + /// The #2885 regression test. + /// + /// Before this migration, GitHub sync routed through + /// `MemoryClient::store_skill_sync` → `UnifiedMemory::upsert_document` + /// → `memory_docs` (legacy backend). The memory-tree retrieval + /// surfaces (which every modern caller reads from) saw zero rows. + /// + /// This test pins the new contract: a successful + /// `ingest_issue_into_memory_tree` call writes to `mem_tree_chunks` + /// + `mem_tree_ingested_sources`, so the silent-failure mode can't + /// reappear. Mirrors the `sync_writes_to_memory_tree` regression + /// in `vault::sync` (#2720) and the equivalent in + /// `composio::providers::notion::ingest` (#2887). + #[tokio::test] + async fn ingest_issue_writes_to_memory_tree() { + use crate::openhuman::memory_store::chunks::store::{count_chunks, is_source_ingested}; + + let (_tmp, cfg) = test_config(); + let connection_id = "conn-test"; + let issue_id = "987654321"; + let issue = sample_issue(987654321, "2026-05-28T10:00:00Z", "Fix flaky test", "Repro"); + + let chunks_before = count_chunks(&cfg).expect("count_chunks before"); + + let written = ingest_issue_into_memory_tree( + &cfg, + connection_id, + issue_id, + "GitHub: acme/core#99: Fix flaky test", + Some("2026-05-28T10:00:00Z"), + &issue, + ) + .await + .expect("ingest_issue_into_memory_tree"); + + assert!( + written > 0, + "GitHub ingest must write at least one chunk; got {written}" + ); + + // Core regression assertion: chunks landed in memory_tree. + let chunks_after = count_chunks(&cfg).expect("count_chunks after"); + assert!( + chunks_after > chunks_before, + "ingest must populate mem_tree_chunks (#2885): {chunks_before} → {chunks_after}" + ); + + // Source registration. + let cfg_for_blocking = cfg.clone(); + let expected = github_source_id(connection_id, issue_id); + let registered = tokio::task::spawn_blocking(move || { + is_source_ingested(&cfg_for_blocking, SourceKind::Document, &expected).unwrap_or(false) + }) + .await + .expect("source-check task join"); + assert!( + registered, + "source_id {} must be registered in mem_tree_ingested_sources", + github_source_id(connection_id, issue_id) + ); + } + + /// Re-ingesting an edited issue (same `(connection_id, issue_id)`, + /// different content) cleans up prior chunks and writes fresh ones — + /// the `delete_chunks_by_source` guard sidesteps the pipeline's + /// `already_ingested` short-circuit that would otherwise drop the + /// new revision. + #[tokio::test] + async fn re_ingesting_edited_issue_replaces_prior_chunks() { + use crate::openhuman::memory_store::chunks::store::count_chunks; + + let (_tmp, cfg) = test_config(); + let connection_id = "conn-edit"; + let issue_id = "555"; + + // First ingest. + let v1 = sample_issue( + 555, + "2026-05-28T10:00:00Z", + "Flaky test on Linux", + "Original repro: bisect points at #1234.", + ); + let first = ingest_issue_into_memory_tree( + &cfg, + connection_id, + issue_id, + "GitHub: acme/core#99: Flaky test on Linux", + Some("2026-05-28T10:00:00Z"), + &v1, + ) + .await + .expect("first ingest"); + assert!(first > 0); + let after_first = count_chunks(&cfg).expect("count after first"); + + // Re-ingest with different body (issue was edited with new + // description and labels) — should NOT short-circuit, and chunk + // count should not double (prior chunks dropped, new ones + // written, net same per-issue count for this body size). + let v2 = json!({ + "id": 555u64, + "number": 99, + "title": "Flaky test on Linux — root cause found", + "body": "Root cause: race in scheduler. PR #1300 fixes.", + "state": "open", + "html_url": "https://github.com/acme/core/issues/99", + "labels": [ + { "name": "bug" }, + { "name": "needs-review" } + ], + "updated_at": "2026-05-29T11:22:33Z", + }); + let second = ingest_issue_into_memory_tree( + &cfg, + connection_id, + issue_id, + "GitHub: acme/core#99: Flaky test on Linux — root cause found", + Some("2026-05-29T11:22:33Z"), + &v2, + ) + .await + .expect("second ingest"); + assert!( + second > 0, + "edited issue must actually re-ingest, not silently no-op" + ); + let after_second = count_chunks(&cfg).expect("count after second"); + + // The chunk count after the second ingest should equal the + // count after the first (replaced one revision with another), + // not double. Allow ±1 for any rounding in how the chunker + // splits subtly-different markdown. + assert!( + after_second.abs_diff(after_first) <= 1, + "edited issue must replace prior chunks, not append: \ + after_first={after_first} after_second={after_second}" + ); + } +} diff --git a/src/openhuman/memory_sync/composio/providers/github/mod.rs b/src/openhuman/memory_sync/composio/providers/github/mod.rs index 94c62d12fb..a6aac2a898 100644 --- a/src/openhuman/memory_sync/composio/providers/github/mod.rs +++ b/src/openhuman/memory_sync/composio/providers/github/mod.rs @@ -12,6 +12,7 @@ //! //! Issue: #2408. +mod ingest; mod provider; mod sync; #[cfg(test)] diff --git a/src/openhuman/memory_sync/composio/providers/github/provider.rs b/src/openhuman/memory_sync/composio/providers/github/provider.rs index 1d2acf53f6..87dd17f659 100644 --- a/src/openhuman/memory_sync/composio/providers/github/provider.rs +++ b/src/openhuman/memory_sync/composio/providers/github/provider.rs @@ -22,10 +22,9 @@ use async_trait::async_trait; use serde_json::json; +use super::ingest::ingest_issue_into_memory_tree; use super::sync; -use crate::openhuman::memory_sync::composio::providers::sync_state::{ - persist_single_item, SyncState, -}; +use crate::openhuman::memory_sync::composio::providers::sync_state::SyncState; use crate::openhuman::memory_sync::composio::providers::{ pick_str, ComposioProvider, CuratedTool, ProviderContext, ProviderUserProfile, SyncOutcome, SyncReason, @@ -302,20 +301,25 @@ impl ComposioProvider for GitHubProvider { let title_text = sync::extract_issue_title(issue) .unwrap_or_else(|| format!("GitHub issue {issue_id}")); - let doc_id = format!("composio-github-issue-{issue_id}"); - match persist_single_item( - &memory, - "github", - &doc_id, + // Route into the memory-tree pipeline (#2885). The prior + // implementation called `persist_single_item` → + // `MemoryClient::store_skill_sync` → UnifiedMemory + // `memory_docs`, which the modern retrieval surfaces + // (`memory.search`, `tree.read_chunk`, `tree.browse`, + // summary trees, MCP tools) don't read from — the data + // was invisible to every agent recall path. + match ingest_issue_into_memory_tree( + &ctx.config, + &connection_id, + &issue_id, &title_text, + updated.as_deref(), issue, - "github", - ctx.connection_id.as_deref(), ) .await { - Ok(_) => { + Ok(_chunks_written) => { state.mark_synced(&sync_key); total_persisted += 1; } @@ -323,7 +327,7 @@ impl ComposioProvider for GitHubProvider { tracing::warn!( issue_id = %issue_id, error = %e, - "[composio:github] failed to persist issue (continuing)" + "[composio:github] failed to ingest issue into memory_tree (continuing)" ); } } From 1f35cf9358dc548c67aa5d38d463c3fde40608ec Mon Sep 17 00:00:00 2001 From: justin Date: Fri, 29 May 2026 10:37:01 +0800 Subject: [PATCH 2/4] perf(composio/github): skip chunk scan on fresh-issue ingest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Gate `delete_chunks_by_source` behind `is_source_ingested`. The delete path uses a `source_kind = ?1` scan + Rust-side source-id filter (`store::delete_chunks_by_source_filter`), so a first-time ingest of a never-seen issue would scan every Document-kind chunk just to find zero matches. `is_source_ingested` is an indexed PK lookup against `mem_tree_ingested_sources`, so the common fresh-issue case becomes one cheap `COUNT(*)` and we only pay the scan cost on actual re-ingests of edited issues. Same `spawn_blocking` hop, single combined closure — the gate runs inside the existing blocking task. The 6 ingest tests still pass: the regression test exercises the false → no-delete path (fresh issue); the re-ingest test exercises the true → delete path (edited issue). Same fix carried forward from the parallel Notion PR (#2887), where CodeRabbit caught the issue first. Linear and ClickUp PRs will land with the gate baked in from the start. Refs #2885. --- .../composio/providers/github/ingest.rs | 29 +++++++++++++++---- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/src/openhuman/memory_sync/composio/providers/github/ingest.rs b/src/openhuman/memory_sync/composio/providers/github/ingest.rs index 90854b1b84..24dfd200c0 100644 --- a/src/openhuman/memory_sync/composio/providers/github/ingest.rs +++ b/src/openhuman/memory_sync/composio/providers/github/ingest.rs @@ -47,7 +47,7 @@ use serde_json::Value; use crate::openhuman::config::Config; use crate::openhuman::memory::ingest_pipeline::{self, IngestResult}; -use crate::openhuman::memory_store::chunks::store::delete_chunks_by_source; +use crate::openhuman::memory_store::chunks::store::{delete_chunks_by_source, is_source_ingested}; use crate::openhuman::memory_store::chunks::types::SourceKind; use crate::openhuman::memory_sync::canonicalize::document::DocumentInput; @@ -133,16 +133,33 @@ pub async fn ingest_issue_into_memory_tree( let source_id = github_source_id(connection_id, issue_id); // Re-sync of an edited issue: drop prior chunks for the same source_id - // before re-ingest. `delete_chunks_by_source` is sync I/O so it has to - // hop through `spawn_blocking`. + // before re-ingest. Both calls are sync rusqlite I/O so they share one + // `spawn_blocking` hop. + // + // We gate `delete_chunks_by_source` behind `is_source_ingested` — the + // delete path uses a `source_kind = ?1` scan with Rust-side + // source-id filtering (see `store::delete_chunks_by_source_filter`), + // so on a first-time ingest of a never-seen issue it would scan every + // Document-kind chunk just to find zero matches. `is_source_ingested` + // is an indexed PK lookup against `mem_tree_ingested_sources`, so it + // converts the common fresh-issue case to one cheap `COUNT(*)` and + // only pays the scan cost on actual re-ingests of edited items. let cfg_for_blocking = config.clone(); let source_for_blocking = source_id.clone(); - let removed = tokio::task::spawn_blocking(move || { - delete_chunks_by_source( + let removed = tokio::task::spawn_blocking(move || -> Result { + if is_source_ingested( &cfg_for_blocking, SourceKind::Document, &source_for_blocking, - ) + )? { + delete_chunks_by_source( + &cfg_for_blocking, + SourceKind::Document, + &source_for_blocking, + ) + } else { + Ok(0) + } }) .await .map_err(|e| anyhow::anyhow!("delete-prior task join error: {e}"))??; From 9764b31f1784b73a19ef455dd72243e5ac1e8054 Mon Sep 17 00:00:00 2001 From: justin Date: Fri, 29 May 2026 11:37:29 +0800 Subject: [PATCH 3/4] fix(composio/github): hold cursor on ingest failure + preserve error chain MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CodeRabbit caught two correctness issues on this PR. Same shape exists on the parallel Notion PR (#2887) — fix applied there in parallel. ## 1. Cursor advances on ingest failure → data-loss window `provider.rs` updated `newest_updated` for every result before attempting ingest, then advanced the persistent cursor unconditionally at the end of the sync loop. Items whose ingest call returned `Err` were logged + skipped (no `mark_synced`) — but the next sync queried `updated:>{cursor}`, so the failed item was never re-fetched. Pre-#2885 this was just "stale revision": the legacy `persist_single_item` path wasn't delete-first, so a failed write left old chunks alone. Post-#2885 it's worse — `ingest_issue_into_memory_tree` calls `delete_chunks_by_source` *before* the (failing) write, so an *edited* issue that fails to re-ingest is left with neither old nor new chunks in `mem_tree_chunks` until its next edit. Fix: track `had_ingest_failures: bool`, set on every `Err`, and gate the `advance_cursor` at the bottom of the sync loop. Already-synced items are cheaply skipped via `state.is_synced` on the re-fetch next pass, so the cost of holding is just one extra search-API page on the failure range. `set_last_sync_at_ms` still fires — it's a heartbeat, not a fetch-window boundary. A retry-only-failed-items fix (separate `newest_successful_updated` cursor) would be tighter but adds state and edge cases; the simple gate matches CodeRabbit's exact suggestion and is provably safe (cursor only ever advances when every item in the pass succeeded). ## 2. `{err}` drops anyhow context chain `ingest.rs` wrapped the `ingest_pipeline::ingest_document` failure as `anyhow!("ingest_document failed for {source_id}: {err}")`, where `{err}` is Display-only and strips the chain. `provider.rs` then logs the wrapped error with `tracing::warn!(error = %e)` (also Display), so the underlying DB / embedding / persist cause was lost. Fix: swap to `{err:#}` (alternate formatter) so the chain is baked into the wrapping anyhow's Display impl. Matches the convention `provider.rs` already uses on its own `ctx.execute(...).map_err(...)` sites (lines 90, 234, 406). ## Tests 48/48 still pass in `memory_sync::composio::providers::github::*`. The provider's `sync()` is hard to unit-test without mocking `ComposioContext` heavily — the invariant is pinned by the rustdoc on `had_ingest_failures` and the inline gate comment. The ingest-side tests (regression + re-ingest) cover the ingest function's contract; the cursor logic lives entirely in `provider.rs`. Refs CodeRabbit review on #2889, #2885. --- .../composio/providers/github/ingest.rs | 6 ++++- .../composio/providers/github/provider.rs | 27 +++++++++++++++++-- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/src/openhuman/memory_sync/composio/providers/github/ingest.rs b/src/openhuman/memory_sync/composio/providers/github/ingest.rs index 24dfd200c0..37f0abd99e 100644 --- a/src/openhuman/memory_sync/composio/providers/github/ingest.rs +++ b/src/openhuman/memory_sync/composio/providers/github/ingest.rs @@ -206,7 +206,11 @@ pub async fn ingest_issue_into_memory_tree( Ok(chunks_written) } Err(err) => Err(anyhow::anyhow!( - "ingest_document failed for {source_id}: {err}" + // `{err:#}` (alternate formatter) bakes in the anyhow context + // chain so provider.rs's `tracing::warn!(error = %e)` doesn't + // strip the underlying cause (DB / embedding / persist failure) + // when it Displays the wrapped error. + "ingest_document failed for {source_id}: {err:#}" )), } } diff --git a/src/openhuman/memory_sync/composio/providers/github/provider.rs b/src/openhuman/memory_sync/composio/providers/github/provider.rs index 87dd17f659..ddf959a60a 100644 --- a/src/openhuman/memory_sync/composio/providers/github/provider.rs +++ b/src/openhuman/memory_sync/composio/providers/github/provider.rs @@ -201,6 +201,14 @@ impl ComposioProvider for GitHubProvider { let mut total_fetched: usize = 0; let mut total_persisted: usize = 0; let mut newest_updated: Option = None; + // Track whether any per-item ingest failed this pass. If so, we hold + // the persistent cursor — `updated:>{cursor}` on the next search + // would otherwise exclude the failed item, and because the new + // memory-tree pipeline (#2885) is delete-first, an *edited* issue + // that failed to re-ingest is left with neither old nor new chunks + // until its next edit. Already-synced items are skipped cheaply via + // `is_synced` on the re-fetch, so the cost of holding is minimal. + let mut had_ingest_failures = false; 'pages: for page_num in 1..=MAX_PAGES { if state.budget_exhausted() { @@ -324,6 +332,7 @@ impl ComposioProvider for GitHubProvider { total_persisted += 1; } Err(e) => { + had_ingest_failures = true; tracing::warn!( issue_id = %issue_id, error = %e, @@ -346,8 +355,22 @@ impl ComposioProvider for GitHubProvider { } // ── Step 5: advance cursor and save state ──────────────────── - if let Some(new_cursor) = newest_updated { - state.advance_cursor(&new_cursor); + // + // Hold the cursor when any item failed to ingest this pass. See the + // `had_ingest_failures` declaration above for why this matters under + // the delete-first memory-tree pipeline (#2885). `set_last_sync_at_ms` + // still advances — that's just a heartbeat, not a fetch-window + // boundary, so it's safe to record that we did attempt a sync. + if !had_ingest_failures { + if let Some(new_cursor) = newest_updated { + state.advance_cursor(&new_cursor); + } + } else { + tracing::warn!( + connection_id = %connection_id, + "[composio:github] holding cursor — ingest failures this pass; next sync will \ + re-fetch the failed range" + ); } state.set_last_sync_at_ms(sync::now_ms()); state.save(&memory).await?; From 7d27b5d486599b4d47c34f31f5d0d2ef3a155cf8 Mon Sep 17 00:00:00 2001 From: justin Date: Fri, 29 May 2026 16:51:51 +0800 Subject: [PATCH 4/4] style(composio/github): use tracing::debug! with structured fields in ingest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per @graycyrus review on #2889: the two `log::debug!` calls in `ingest_issue_into_memory_tree` used positional format strings, while `provider.rs` in the same directory logs via `tracing::*` with structured key=value fields. Positional log args are flattened to an opaque message by the tracing-log bridge, so log-aggregation pipelines that filter on span fields (connection_id, issue_id, …) lose them. Swap both calls to `tracing::debug!` with named fields, matching the provider.rs shape exactly: - re-ingest cleanup: connection_id / issue_id / removed_chunks - issue persisted: connection_id / issue_id / chunks_written / already_ingested Scope note: the older ingest modules (slack, gmail, vault::sync) still use `log::*` — those are pre-existing and out of scope here. The new memory-tree ingest modules (this + notion in #2887) now consistently use tracing alongside their provider.rs. A follow-up sweep to unify the legacy modules can be filed separately if desired. Refs #2889. --- .../composio/providers/github/ingest.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/openhuman/memory_sync/composio/providers/github/ingest.rs b/src/openhuman/memory_sync/composio/providers/github/ingest.rs index 37f0abd99e..b3009d9ecf 100644 --- a/src/openhuman/memory_sync/composio/providers/github/ingest.rs +++ b/src/openhuman/memory_sync/composio/providers/github/ingest.rs @@ -164,11 +164,11 @@ pub async fn ingest_issue_into_memory_tree( .await .map_err(|e| anyhow::anyhow!("delete-prior task join error: {e}"))??; if removed > 0 { - log::debug!( - "[composio:github] ingest: re-ingest cleanup connection_id={} issue_id={} removed_chunks={}", - connection_id, - issue_id, - removed + tracing::debug!( + connection_id = %connection_id, + issue_id = %issue_id, + removed_chunks = removed, + "[composio:github] ingest: re-ingest cleanup" ); } @@ -196,12 +196,12 @@ pub async fn ingest_issue_into_memory_tree( // the normal update path. Seeing it here means the prior // chunks were already absent (fresh ingest into a primed // memory_tree) — fine, just log at debug. - log::debug!( - "[composio:github] ingest: issue connection_id={} issue_id={} chunks_written={} already_ingested={}", - connection_id, - issue_id, + tracing::debug!( + connection_id = %connection_id, + issue_id = %issue_id, chunks_written, already_ingested, + "[composio:github] ingest: issue persisted" ); Ok(chunks_written) }