` as the prefix. | Filter saved-schedules list by **both** prefixes when `skillId === 'dev-workflow'` so existing jobs surface. Don't migrate names — they keep working. |
+| **i18n drift** — there's already significant pre-existing drift in `en-N.ts` chunks for `settings.skillsRunner.*` keys (audit log shows ~50 keys in `en.ts` not in chunks). | Address chunk drift for the **new** keys this PR adds (toggle, history, smart-picker). Pre-existing drift is out of scope but worth a follow-up. |
+| **`DevWorkflowPanel` removal breaks the deep link or settings nav route.** | Check `settings/AppSettings.tsx` (or wherever the nav is wired) and either preserve the route as a redirect to `/skills` or update the nav entry. |
+| **Coverage gate (≥80% on changed lines)** on a 990-line component. | Add focused tests per Phase 3 chunk: toggle test (Phase 2), history-expand test (chunk 1), active-card render test (chunk 2), conditional-picker test (chunk 3). |
+| **Stale-component test deletion regressing dev-workflow coverage.** | If `DevWorkflowPanel.test.tsx` is deleted, ensure equivalent coverage exists in `SkillsRunnerBody.test.tsx` for the smart-picker path. |
+
+### Test plan
+
+Per phase commit:
+
+- **Phase 2**: `SkillsRunnerBody.test.tsx` — render one saved job, click toggle, assert `openhumanCronUpdate` called with `{ enabled: false }`, assert refresh-list invoked.
+- **Phase 3.1**: render one saved job with `runHistory`, click expand, assert per-run output `` visible. Assert `openhumanCronRuns(jobId, 5)` called on render.
+- **Phase 3.2**: render multiple jobs, assert most-recent-active sorted to top with `data-testid="active-schedule"` (or equivalent). Assert non-enabled jobs sorted below.
+- **Phase 3.3**: render with `skillId === 'dev-workflow'`, assert `SmartIssuePicker` present. Render with `skillId === 'github-issue-crusher'`, assert it's absent. Subcomponent's own tests cover Composio loading/error paths (move from `DevWorkflowPanel.test.tsx`).
+- **Phase 3.4**: if DevWorkflowPanel becomes a redirect, assert nav still routes correctly; if deleted, delete the test file.
+
+---
+
+## Open questions
+
+1. **Schema-driven pickers vs. hard-coded `if (skillId === 'dev-workflow')`.** The clean answer is to extend `skill.toml`'s `[[inputs]]` schema with an optional `picker = "github-issue"` field, and let the runner route to a `SmartIssuePicker` subcomponent based on the picker key. The shortcut answer is the hard-coded `if`. Phase 3 chunk 3 ships the shortcut with a `TODO(picker-schema)` comment. The schema upgrade is a follow-up issue worth filing — it would also benefit `RepoPicker` and `BranchPicker`, which today route by *name convention* (`REPO_INPUT_NAMES` / `BRANCH_INPUT_NAMES` sets in SkillsRunnerBody:42-55), which is brittle.
+2. **One enabled schedule per (skill, inputs) combo, or many?** Today DevWorkflowPanel allows only one (it looks up `name?.startsWith('dev-workflow')` and updates in place). `SkillsRunnerBody` allows many (the cron-name hash includes input values, so two different repos produce two jobs). The unified UX should keep "many" — but display only one as the prominent "ACTIVE" card. Pre-existing `dev-workflow-` jobs will surface in the list once we add the dual-prefix filter.
+3. **Run history retention.** DevWorkflowPanel pulls last 5; SkillsRunnerBody's bottom "Recent runs" scans last 10 log files. Unify on a single source? For now, keep both — the per-schedule history is structured cron data, the bottom list is a cross-cutting "what ran lately" surface useful even with no schedules. Worth re-evaluating once both surfaces are in production for a few weeks.
+4. **`last_output` field on `CoreCronJob` vs. per-run output on `CoreCronRun`.** Today both exist (`cron.ts:42-43` and `cron.ts:51`). DevWorkflowPanel renders `existingJob.last_output` in the active card AND per-run `run.output` in history rows. After unification, drop the duplicated `last_output` block; the per-run history already shows the most recent run's output. Lightweight change.
+5. **Deprecation timing for the Settings → Developer Options → Dev Workflow nav entry.** Strip immediately (Phase 3 chunk 4), or leave as a redirect for one release? Leaning toward strip — the user is the maintainer, the panel is dev-only, and `/skills` is more discoverable than a buried Settings sub-page.
diff --git a/src/core/jsonrpc.rs b/src/core/jsonrpc.rs
index 14aa5f9363..4754cf659e 100644
--- a/src/core/jsonrpc.rs
+++ b/src/core/jsonrpc.rs
@@ -1488,6 +1488,10 @@ async fn run_server_inner(
),
Err(e) => log::warn!("[boot] whatsapp_data::global init failed: {e}"),
}
+ // Seed bundled default skills into /skills/ so they
+ // ship with the system — discoverable (skills_list) and runnable
+ // — without a manual drop. Idempotent; never clobbers user edits.
+ crate::openhuman::skills::registry::seed_default_skills(&cfg.workspace_dir);
}
Err(e) => {
log::error!(
diff --git a/src/openhuman/agent/agents/code_executor/agent.toml b/src/openhuman/agent/agents/code_executor/agent.toml
index 1482d31c15..47e7d4c48c 100644
--- a/src/openhuman/agent/agents/code_executor/agent.toml
+++ b/src/openhuman/agent/agents/code_executor/agent.toml
@@ -1,7 +1,7 @@
id = "code_executor"
display_name = "Code Executor"
delegate_name = "run_code"
-when_to_use = "Sandboxed developer — writes, runs, and debugs code until tests pass. Use for any task that requires producing or modifying source files and exercising them with shell or test commands."
+when_to_use = "Code-repo worker — owns the FULL lifecycle of any task scoped to a code repository: clone, navigate via `codegraph_search` / `grep` / `lsp`, read files, edit (`edit` / `apply_patch` / `file_write`), build, run tests/lint, and drive **local `git`** (branch / commit / push / diff) for the working tree. **GitHub state I/O (issues, PRs, comments, reviews, checks, labels) goes through `composio_execute` with the matching `GITHUB_*` tool — never `gh` CLI** (see code_executor prompt.md 'GitHub I/O' section for the full split rule and rationale). Use for ANY repo-scoped work — locating where to edit, investigating a bug, exploring a codebase, and any modify / build / test / git / push / PR step — not only for the literal 'writing code' moment. Keep the entire end-to-end flow inside one `delegate_run_code` call so the worker accumulates context across steps."
temperature = 0.4
max_iterations = 10
max_result_chars = 16000
@@ -25,6 +25,11 @@ hint = "coding"
# OPENHUMAN_LSP_ENABLED — listing it here is harmless when the gate is
# off (the tool is simply not registered).
named = [
+ # codegraph navigation — preferred first step for locating code in a repo.
+ # `codegraph_search` auto-indexes on first use; see the prompt's
+ # "Finding code in a repo — codegraph first" section.
+ "codegraph_search",
+ "codegraph_index",
"shell",
"file_read",
"file_write",
diff --git a/src/openhuman/agent/agents/code_executor/prompt.md b/src/openhuman/agent/agents/code_executor/prompt.md
index 204dd8b61c..4c1cb704ed 100644
--- a/src/openhuman/agent/agents/code_executor/prompt.md
+++ b/src/openhuman/agent/agents/code_executor/prompt.md
@@ -9,6 +9,36 @@ You are the **Code Executor** agent. You write, run, and debug code in a sandbox
- Run tests and interpret results
- Git operations (commit, diff, status)
+## Finding code in a repo — codegraph_search FIRST (hard rule)
+
+**Your first navigation tool call in any repository MUST be `codegraph_search`.** Calling `grep` / `glob` / `lsp` / `find` / shell-`grep` / `rg` / `file_read` of the tree *before* `codegraph_search` is a **process error** — back up and call `codegraph_search` first.
+
+`codegraph_search` returns the files most relevant to a query (the symbols, identifiers, error strings, or feature you're changing) and **auto-indexes the repo on its first call** (~30–90s on a fresh clone — this is the index build, **not a hang**; do not retry, do not switch tools). Subsequent calls are millisecond-cheap.
+
+After `codegraph_search` returns, inspect the `coverage` flag:
+
+- `coverage: full` → read the top hits with `file_read` and confirm the exact edit site.
+- `coverage: partial` → refine with `grep` **scoped to the directories codegraph returned** (not the whole tree), then `file_read` the refined hits.
+- `coverage: none` (or zero hits) → only then may you fall back to a blind `grep` / `glob` over the tree.
+
+This applies even for "obvious" string searches like i18n keys, error messages, or literal config names — codegraph returns ranked structural+semantic hits in one call where a blind `grep` returns every occurrence and forces you to re-rank by hand. Use it every time.
+
+## GitHub I/O — Composio for state, local `git` for working tree (hard rule)
+
+When a task involves a GitHub repository, you act through **two distinct surfaces**, never both with the same intent. Mixing them — or shelling `gh` for state ops — is a process error.
+
+| Op | Surface | How |
+| --- | --- | --- |
+| **Read** issues / PRs / review comments / check runs / labels / commit metadata | **Composio** | `composio_execute({ tool: "GITHUB_GET_PULL_REQUEST" | "GITHUB_LIST_REVIEW_COMMENTS" | "GITHUB_GET_COMBINED_STATUS" | "GITHUB_GET_ISSUE" | "GITHUB_LIST_ISSUES" | … })` |
+| **Write** PRs / comments / reviews / labels / branch as remote ref | **Composio** | `composio_execute({ tool: "GITHUB_CREATE_PULL_REQUEST" | "GITHUB_CREATE_ISSUE_COMMENT" | "GITHUB_CREATE_REVIEW" | "GITHUB_ADD_LABELS" | … })` |
+| **Working tree**: clone, branch, status, diff, add, commit, push, log, stash, restore | **Local `git`** (shell) | `git clone …`, `git checkout -b …`, `git diff`, `git commit -m …`, `git push origin ` (when push credentials exist) |
+| **Tests / build / lint** | **Local shell** | `pnpm test`, `cargo check`, `pytest`, `make`, etc. — run inside the cloned working tree |
+| **Code navigation** | **`codegraph_search`** (then `file_read`) | See the section above |
+
+**Do not shell `gh` for GitHub state ops.** `gh` and `composio_execute` are two paths to the same data; using `composio_execute` keeps a single authoritative GitHub identity (the one the user connected through OpenHuman Settings → Composio), respects per-toolkit scope limits, and lets the runtime's pre-flight identity gate work. `gh` bypasses all of that. Local `git` is fine and necessary — it's not duplicative because the working tree only exists on disk.
+
+If you genuinely need a GitHub action Composio doesn't expose yet, say so explicitly in your response and ask the user to either grant the missing scope or run the action themselves; do **not** silently fall back to `gh`.
+
## Execution environment
Shell commands run through an approval gate under the user's access policy. Keep this in mind so you don't waste turns being blocked:
@@ -21,6 +51,9 @@ Shell commands run through an approval gate under the user's access policy. Keep
## Rules
+- **codegraph_search is the FIRST navigation call (hard rule)** — see the "Finding code in a repo" section above. `grep` / `glob` / `lsp` / `file_read` of the tree before `codegraph_search` is a process error; back up and call `codegraph_search` first.
+- **GitHub state ops go through `composio_execute`, NOT `gh` (hard rule)** — see the "GitHub I/O" section above. Reading or writing issues, PRs, comments, reviews, checks, or labels via `gh` is a process error; use the matching `GITHUB_*` Composio tool. Local `git` stays for the working tree (clone, branch, commit, push, diff, tests, build, codegraph) — that's not duplication, that's the split.
+- **Don't explore forever — commit to an edit** — after at most a few rounds of locate (`codegraph_search` → `file_read` top hits → confirm), TRANSITION to editing. Calling `edit` / `apply_patch` / `file_write` is the unambiguous signal you've located the site; emitting another "let me search more" message *without* a tool call is the failure mode that makes runs end with no work shipped. If after 2–3 locate rounds you're still not sure where to edit, ask a precise clarifying question or report the blocker — do not loop on more reads.
- **Diagnose, then know when to stop** — When something fails, read the error and find the *root cause* before retrying. Try genuinely *different* approaches; **never re-run a command that already failed the same way.** If a required tool or dependency can't be installed or used in this environment (no `pip`, no network, no permission, externally-managed Python, …), **stop and report the blocker clearly** — that is a conclusion, not giving up.
- **Run tests** — After writing code, run relevant tests to verify correctness.
- **Stay in scope** — Only do what was asked. Don't refactor unrelated code.
diff --git a/src/openhuman/agent/agents/orchestrator/agent.toml b/src/openhuman/agent/agents/orchestrator/agent.toml
index 001417c02e..330a78e2bb 100644
--- a/src/openhuman/agent/agents/orchestrator/agent.toml
+++ b/src/openhuman/agent/agents/orchestrator/agent.toml
@@ -138,6 +138,15 @@ named = [
# touch files itself.
"todowrite",
"plan_exit",
+ # Skill chaining: let an in-flight autonomous skill (e.g.
+ # `github-issue-crusher` after the draft PR is open) spawn another
+ # bundled skill_run (e.g. `pr-review-shepherd` against that PR) as a
+ # fresh background job, so each skill stays narrow + composable.
+ # Thin wrapper over `skills::schemas::spawn_skill_run_background` — the
+ # same helper the `openhuman.skills_run` JSON-RPC controller uses, so
+ # RPC callers and tool callers share one spawn path (iter cap,
+ # transcript isolation, degenerate-response detector all apply).
+ "run_skill",
# Self-update — lets the orchestrator answer "am I up to date" /
# "update OpenHuman" without sending the user to Settings →
# Developer Options. `update_check` is read-only; `update_apply`
diff --git a/src/openhuman/agent/agents/orchestrator/prompt.md b/src/openhuman/agent/agents/orchestrator/prompt.md
index f0d51466bd..5c55e1fedc 100644
--- a/src/openhuman/agent/agents/orchestrator/prompt.md
+++ b/src/openhuman/agent/agents/orchestrator/prompt.md
@@ -27,7 +27,7 @@ Follow this sequence for every user message:
- No: continue.
4. **Does this need other specialised execution?**
- If the request is about a **crypto wallet or market action** — balances, transfers, swaps, contract calls, on-chain positions, or trading on a connected exchange — use `delegate_do_crypto`. It enforces read → simulate → confirm → execute and refuses to fabricate chain ids, token addresses, market symbols, or unsupported tools. **Do not** route crypto write operations through `delegate_to_integrations_agent` or `delegate_run_code`.
- - If code writing/execution/debugging is required, use `delegate_run_code`.
+ - **Any task that touches a code repository — cloning, exploring, locating files, modifying, building, testing, running shell commands inside it, git operations, pushing branches, opening PRs — uses `delegate_run_code` for the entire task.** Treat "locate where to edit", "investigate the bug", "find the function", "read the file" as code-repo work the moment they're scoped to a repo: they belong inside the same `delegate_run_code` worker as the edit / build / git steps. **Never** route code-repo work through `tools_agent` / `spawn_worker_thread`; those workers lack `edit` / `apply_patch` / `file_write` / `git_operations` / `codegraph_search` and will silently stall in read-mode. `tools_agent` is for *non-repo* work only — ad-hoc shell against the host, web fetch, memory helpers, etc.
- If web/doc crawling is required, use `research`.
- If the user asks for live/current/time-sensitive facts that are not covered by a direct tool — weather, forecasts, current temperatures, recent news, fresh web facts, or "use Grok/web/live data" — call `research` with a prompt that asks for live sources. Do **not** stop at "on it", and do **not** wait for the exact named provider if it is not wired in. Use the available research tool and then answer with the result.
- If complex multi-step decomposition is required, use `delegate_plan`.
diff --git a/src/openhuman/agent/agents/tools_agent/agent.toml b/src/openhuman/agent/agents/tools_agent/agent.toml
index acfa4069f8..7b2dc03906 100644
--- a/src/openhuman/agent/agents/tools_agent/agent.toml
+++ b/src/openhuman/agent/agents/tools_agent/agent.toml
@@ -1,6 +1,6 @@
id = "tools_agent"
display_name = "Tools Agent"
-when_to_use = "Generalist specialist for heavyweight ad-hoc execution with built-in OpenHuman tools (shell, file I/O, HTTP, web search, memory). Use only when direct orchestrator handling is insufficient and the task needs substantial tool-driven execution, but does NOT require managed Composio OAuth integrations. For external SaaS, spawn `integrations_agent` with a `toolkit` argument instead."
+when_to_use = "Generalist for heavyweight ad-hoc execution that does NOT touch a code repository — host shell, HTTP/web fetch, web search, memory helpers, file READS (`file_read` / `grep` / `glob`). Lacks `edit` / `apply_patch` / `file_write` / `git_operations` / `codegraph_search` — do **not** use for any task scoped to a code repo (cloning, locating, modifying, building, testing, git, push, PR); those route to `delegate_run_code` end-to-end. Do not use for managed Composio OAuth integrations either — those route to `integrations_agent` with a `toolkit` argument."
temperature = 0.4
max_iterations = 10
sandbox_mode = "none"
diff --git a/src/openhuman/agent/harness/subagent_runner/autonomous.rs b/src/openhuman/agent/harness/subagent_runner/autonomous.rs
new file mode 100644
index 0000000000..3c4964fcb4
--- /dev/null
+++ b/src/openhuman/agent/harness/subagent_runner/autonomous.rs
@@ -0,0 +1,29 @@
+//! Autonomous skill-run overrides.
+//!
+//! `skills_run` runs the orchestrator (and any sub-agents it spawns) as an
+//! unattended background tree: it isn't approval-gated (background turns carry
+//! no `APPROVAL_CHAT_CONTEXT`), and the per-agent iteration cap is lifted so the
+//! run continues until it's done or the repeated-failure circuit breaker trips.
+//!
+//! The lifted cap rides a `tokio` task-local set around the orchestrator's
+//! `run_single`. Sub-agent inner loops are awaited *inline* within that scope
+//! (`run_subagent` does not detach), so the task-local reaches them too — one
+//! switch covers the whole tree.
+
+use std::future::Future;
+
+tokio::task_local! {
+ static AUTONOMOUS_ITER_CAP: usize;
+}
+
+/// The active autonomous iteration cap, if a skill run scoped one.
+pub fn autonomous_iter_cap() -> Option {
+ AUTONOMOUS_ITER_CAP.try_with(|c| *c).ok()
+}
+
+/// Run `fut` with an autonomous iteration cap in scope. The cap propagates to
+/// every agentic loop awaited within — the orchestrator turn and the inline
+/// sub-agent loops.
+pub async fn with_autonomous_iter_cap(cap: usize, fut: F) -> F::Output {
+ AUTONOMOUS_ITER_CAP.scope(cap, fut).await
+}
diff --git a/src/openhuman/agent/harness/subagent_runner/mod.rs b/src/openhuman/agent/harness/subagent_runner/mod.rs
index c00d17f10c..41feabe265 100644
--- a/src/openhuman/agent/harness/subagent_runner/mod.rs
+++ b/src/openhuman/agent/harness/subagent_runner/mod.rs
@@ -30,6 +30,7 @@
//! | `extract_tool.rs` | `extract_from_result` tool (direct provider extraction) |
//! | `tool_prep.rs` | Tool filtering + prompt loading + text-mode protocol block |
+mod autonomous;
mod extract_tool;
mod handoff;
mod ops;
@@ -37,6 +38,7 @@ mod tool_prep;
mod types;
// Public API — the entry point and the shapes it returns.
+pub use autonomous::{autonomous_iter_cap, with_autonomous_iter_cap};
pub use ops::run_subagent;
pub use types::{SubagentMode, SubagentRunError, SubagentRunOptions, SubagentRunOutcome};
diff --git a/src/openhuman/agent/harness/subagent_runner/ops.rs b/src/openhuman/agent/harness/subagent_runner/ops.rs
index 9f0aa6bd05..abe5a5f8a0 100644
--- a/src/openhuman/agent/harness/subagent_runner/ops.rs
+++ b/src/openhuman/agent/harness/subagent_runner/ops.rs
@@ -1230,7 +1230,13 @@ async fn run_inner_loop(
handoff_cache: Option<&ResultHandoffCache>,
parent: &ParentExecutionContext,
) -> Result<(String, usize, AggregatedUsage), SubagentRunError> {
- let max_iterations = max_iterations.max(1);
+ // An autonomous skill run (set via `with_autonomous_iter_cap`) lifts the
+ // per-agent cap so sub-agents run until done / the circuit breaker trips.
+ // Take the larger of the two so a sub-agent that already wants more keeps it.
+ let max_iterations = super::autonomous::autonomous_iter_cap()
+ .map(|cap| cap.max(max_iterations))
+ .unwrap_or(max_iterations)
+ .max(1);
// Compiled digest of this sub-agent run's tool calls + results, for a
// graceful checkpoint if it hits the iteration cap (mirrors the main
diff --git a/src/openhuman/codegraph/index.rs b/src/openhuman/codegraph/index.rs
new file mode 100644
index 0000000000..301c76768c
--- /dev/null
+++ b/src/openhuman/codegraph/index.rs
@@ -0,0 +1,829 @@
+//! Indexing: enumerate a git tree's blobs → for each unseen `(content, model)`
+//! extract a structural-aug doc + BM25 tokens, embed it, and cache by blob SHA;
+//! then write the `(repo, ref)` manifest. Content-addressed + incremental: a
+//! branch switch / new commit / pull only (re)embeds the blobs that changed.
+//!
+//! The structural extractor here is a dependency-free heuristic (signatures +
+//! imports + call identifiers + leading doc/comments) — the same *content* the
+//! validated prototype's `ast` pass produced. A tree-sitter upgrade (better
+//! extraction + the repo-map call graph) slots in behind [`structural_doc`].
+//!
+//! The embedder is injected (`&dyn EmbeddingProvider`) so the flow unit-tests
+//! with a fake; production passes the configured (cloud-default) provider, and
+//! its `signature()` becomes the blob cache's `model` key.
+
+use anyhow::{Context, Result};
+use std::path::Path;
+use std::process::Command;
+
+use crate::openhuman::embeddings::EmbeddingProvider;
+
+use super::store::CodegraphStore;
+
+const CODE_EXTS: &[&str] = &[
+ "rs", "py", "js", "jsx", "ts", "tsx", "go", "java", "rb", "c", "cc", "cpp", "h", "hpp", "cs",
+ "php", "kt", "swift", "scala", "sh",
+];
+const MAX_FILE_BYTES: u64 = 100_000;
+const MAX_CALLS: usize = 200;
+/// Structural docs embedded per provider call. One call per file would be one
+/// network round-trip per file against a cloud embedder; batching collapses a
+/// repo into a handful of calls.
+const EMBED_BATCH: usize = 128;
+
+/// Cache `model` key for a lexical-only (BM25, no embedding) index. Kept
+/// separate from any embedder signature so a later dense pass indexes fresh
+/// under its own key rather than colliding with these embedding-less rows.
+pub const LEXICAL_MODEL: &str = "codegraph:lexical:v1";
+
+/// What to build for a `(repo, ref)`.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum IndexMode {
+ /// BM25 tokens only — no embedding calls. Cheap; enough for small repos
+ /// where recall saturates anyway.
+ Lexical,
+ /// Structural-aug dense vectors + BM25 tokens — the full seed. Worth its
+ /// embedding cost on larger repos.
+ Dense,
+}
+
+impl IndexMode {
+ /// The blob-cache `model` key this mode writes/reads under.
+ pub fn model_key(self, embedder: &dyn EmbeddingProvider) -> String {
+ match self {
+ IndexMode::Lexical => LEXICAL_MODEL.to_string(),
+ IndexMode::Dense => embedder.signature(),
+ }
+ }
+}
+
+/// Count tracked code files at the checkout — the cheap signal (`git ls-files`,
+/// no reads/embeds) used to choose [`IndexMode`] before indexing.
+pub fn count_code_files(repo_dir: &Path) -> Result {
+ Ok(tree_blobs(repo_dir)?.len())
+}
+
+/// Per-index outcome. On a branch switch, `computed` is just the changed blobs.
+#[derive(Debug, Clone, serde::Serialize)]
+pub struct IndexReport {
+ pub repo_id: String,
+ pub git_ref: String,
+ pub files: usize,
+ pub computed: usize,
+ pub cached: usize,
+ pub skipped: usize,
+}
+
+fn git(repo_dir: &Path, args: &[&str]) -> Result {
+ let out = Command::new("git")
+ .arg("-C")
+ .arg(repo_dir)
+ .args(args)
+ .output()
+ .with_context(|| format!("git {args:?}"))?;
+ if !out.status.success() {
+ anyhow::bail!(
+ "git {args:?} failed: {}",
+ String::from_utf8_lossy(&out.stderr)
+ );
+ }
+ Ok(String::from_utf8_lossy(&out.stdout).into_owned())
+}
+
+/// Branch name if on a branch, else the short commit SHA (detached).
+pub fn current_ref(repo_dir: &Path) -> Result {
+ if let Ok(s) = git(repo_dir, &["symbolic-ref", "--quiet", "--short", "HEAD"]) {
+ let s = s.trim();
+ if !s.is_empty() {
+ return Ok(s.to_string());
+ }
+ }
+ Ok(git(repo_dir, &["rev-parse", "--short", "HEAD"])?
+ .trim()
+ .to_string())
+}
+
+/// `(path, blob_sha)` for tracked code files at the current checkout.
+fn tree_blobs(repo_dir: &Path) -> Result> {
+ let mut out = Vec::new();
+ for line in git(repo_dir, &["ls-files", "-s"])?.lines() {
+ // ` \t`
+ let (meta, path) = match line.split_once('\t') {
+ Some(p) => p,
+ None => continue,
+ };
+ let sha = match meta.split_whitespace().nth(1) {
+ Some(s) => s,
+ None => continue,
+ };
+ let ext = Path::new(path)
+ .extension()
+ .and_then(|e| e.to_str())
+ .unwrap_or("");
+ if CODE_EXTS.contains(&ext) {
+ out.push((path.to_string(), sha.to_string()));
+ }
+ }
+ Ok(out)
+}
+
+/// Lexical tokens with identifier splitting (camelCase / snake_case), so the
+/// BM25 arm matches `__floordiv__` and `floordiv`/`floor`/`div` alike.
+pub fn code_tokens(text: &str) -> Vec {
+ let mut toks = Vec::new();
+ for raw in text.split(|c: char| !c.is_ascii_alphanumeric()) {
+ if raw.is_empty() {
+ continue;
+ }
+ let low = raw.to_ascii_lowercase();
+ toks.push(low.clone());
+ // split camelCase / snake (already split on non-alnum) into sub-words
+ let mut cur = String::new();
+ let mut prev_lower = false;
+ for ch in raw.chars() {
+ if ch.is_ascii_uppercase() && prev_lower && !cur.is_empty() {
+ toks.push(cur.to_ascii_lowercase());
+ cur.clear();
+ }
+ cur.push(ch);
+ prev_lower = ch.is_ascii_lowercase();
+ }
+ let sub = cur.to_ascii_lowercase();
+ if !sub.is_empty() && sub != low {
+ toks.push(sub);
+ }
+ }
+ toks
+}
+
+/// Heuristic, content-only "intent" text: definition signatures + imports +
+/// called-symbol identifiers + leading doc/comment lines. Path is excluded so
+/// the result is purely content-derived (cacheable by blob SHA).
+pub fn structural_doc(text: &str) -> String {
+ let mut sigs = Vec::new();
+ let mut imports = Vec::new();
+ let mut docs = Vec::new();
+ let mut calls: Vec = Vec::new();
+ let mut seen_calls = std::collections::HashSet::new();
+
+ for line in text.lines() {
+ let t = line.trim();
+ if t.is_empty() {
+ continue;
+ }
+ let lead = t.split_whitespace().next().unwrap_or("");
+ match lead {
+ // definition keywords across the supported languages
+ "fn" | "def" | "class" | "struct" | "impl" | "trait" | "enum" | "interface"
+ | "type" | "func" | "function" | "module" | "public" | "private" | "protected"
+ | "pub" | "async" | "export" | "const" => {
+ sigs.push(t.trim_end_matches('{').trim().to_string());
+ }
+ "import" | "use" | "from" | "require" | "#include" | "package" => {
+ imports.push(t.to_string());
+ }
+ _ => {}
+ }
+ if t.starts_with("//")
+ || t.starts_with("///")
+ || t.starts_with('#')
+ || t.starts_with('*')
+ || t.starts_with("\"\"\"")
+ {
+ docs.push(t.trim_start_matches(['/', '#', '*', ' ', '"']).to_string());
+ }
+ // naive call extraction: `ident(`
+ for (i, _) in line.match_indices('(') {
+ let prefix = &line[..i];
+ let name: String = prefix
+ .chars()
+ .rev()
+ .take_while(|c| c.is_ascii_alphanumeric() || *c == '_')
+ .collect::()
+ .chars()
+ .rev()
+ .collect();
+ if name.len() >= 2 && seen_calls.insert(name.clone()) && calls.len() < MAX_CALLS {
+ calls.push(name);
+ }
+ }
+ }
+
+ let mut parts = Vec::new();
+ if !sigs.is_empty() {
+ parts.push(format!("symbols: {}", sigs.join(" ")));
+ }
+ if !imports.is_empty() {
+ parts.push(format!("imports: {}", imports.join(" ")));
+ }
+ if !calls.is_empty() {
+ parts.push(format!("calls: {}", calls.join(" ")));
+ }
+ if !docs.is_empty() {
+ parts.push(format!("docs: {}", docs.join(" ")));
+ }
+ parts.join("\n")
+}
+
+fn l2_normalize(v: &mut [f32]) {
+ let norm = v.iter().map(|x| x * x).sum::().sqrt();
+ if norm > 0.0 {
+ for x in v.iter_mut() {
+ *x /= norm;
+ }
+ }
+}
+
+/// (Re)index the checkout at `repo_dir` under `(repo_id, ref)`. Only blobs not
+/// already cached for this `mode`'s key are read + (in `Dense`) embedded; the
+/// rest are cache hits. Then the ref's manifest is rewritten to the current
+/// tree. In `Lexical` mode no embedder call is made — tokens only.
+pub async fn index_ref(
+ store: &mut CodegraphStore,
+ repo_id: &str,
+ repo_dir: &Path,
+ git_ref: Option<&str>,
+ embedder: &dyn EmbeddingProvider,
+ mode: IndexMode,
+) -> Result {
+ let git_ref = match git_ref {
+ Some(r) => r.to_string(),
+ None => current_ref(repo_dir)?,
+ };
+ let model = mode.model_key(embedder);
+ let blobs = tree_blobs(repo_dir)?;
+ let (mut cached, mut skipped) = (0usize, 0usize);
+
+ // Phase 1 — read + extract every *uncached, unique* blob. No DB writes and
+ // no embedding yet, so phases 2 and 3 can batch both. A content SHA seen
+ // twice in the tree (identical file) is extracted once.
+ let mut seen = std::collections::HashSet::new();
+ let mut pend_sha: Vec = Vec::new();
+ let mut pend_tokens: Vec> = Vec::new();
+ let mut pend_docs: Vec = Vec::new();
+ for (path, sha) in &blobs {
+ if !seen.insert(sha.clone()) || store.has_blob(sha, &model)? {
+ cached += 1;
+ continue;
+ }
+ let full = repo_dir.join(path);
+ match std::fs::metadata(&full) {
+ Ok(m) if m.len() > MAX_FILE_BYTES => {
+ skipped += 1;
+ continue;
+ }
+ Err(_) => {
+ skipped += 1;
+ continue;
+ }
+ _ => {}
+ }
+ let text = match std::fs::read_to_string(&full) {
+ Ok(t) => t,
+ Err(_) => {
+ skipped += 1;
+ continue;
+ }
+ };
+ let tokens = code_tokens(&text);
+ if mode == IndexMode::Dense {
+ // A file with no extractable structure (empty `__init__.py`, a data
+ // file, `x = 1`) yields an empty structural doc. Embedders reject
+ // empty input (the cloud backend 400s the whole batch), so fall
+ // back to the lexical tokens — still content-derived, cacheable by
+ // blob SHA. (Skipped entirely in Lexical mode — no embedding.)
+ let doc = structural_doc(&text);
+ let doc = if doc.trim().is_empty() {
+ if tokens.is_empty() {
+ "(no extractable content)".to_string()
+ } else {
+ tokens.join(" ")
+ }
+ } else {
+ doc
+ };
+ pend_docs.push(doc);
+ }
+ pend_tokens.push(tokens);
+ pend_sha.push(sha.clone());
+ }
+
+ // Phase 2 — produce a vector per pending blob. Lexical: empty vectors (no
+ // embedder call). Dense: embed the structural docs in batches (few
+ // round-trips, not one per file), L2-normalising each.
+ let mut embs: Vec> = Vec::with_capacity(pend_sha.len());
+ match mode {
+ IndexMode::Lexical => embs.resize(pend_sha.len(), Vec::new()),
+ IndexMode::Dense => {
+ for chunk in pend_docs.chunks(EMBED_BATCH) {
+ let refs: Vec<&str> = chunk.iter().map(String::as_str).collect();
+ let out = embedder
+ .embed(&refs)
+ .await
+ .context("codegraph: embed structural docs")?;
+ if out.len() != chunk.len() {
+ anyhow::bail!(
+ "codegraph: embedder returned {} vectors for {} inputs",
+ out.len(),
+ chunk.len()
+ );
+ }
+ for mut v in out {
+ l2_normalize(&mut v);
+ embs.push(v);
+ }
+ }
+ }
+ }
+
+ // Phase 3 — persist the whole batch in one transaction, then rewrite the
+ // ref's manifest.
+ let computed = pend_sha.len();
+ let entries: Vec<(String, Vec, Vec)> = pend_sha
+ .into_iter()
+ .zip(pend_tokens)
+ .zip(embs)
+ .map(|((sha, tokens), emb)| (sha, tokens, emb))
+ .collect();
+ store.put_blobs(&model, &entries)?;
+ store.set_manifest(repo_id, &git_ref, &blobs)?;
+
+ Ok(IndexReport {
+ repo_id: repo_id.to_string(),
+ git_ref,
+ files: blobs.len(),
+ computed,
+ cached,
+ skipped,
+ })
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use async_trait::async_trait;
+ use tempfile::TempDir;
+
+ #[test]
+ fn code_tokens_splits_identifiers() {
+ let t = code_tokens("def __floordiv__(self): TimedeltaIndex");
+ assert!(t.contains(&"floordiv".to_string()));
+ assert!(t.contains(&"timedelta".to_string()) || t.contains(&"timedeltaindex".to_string()));
+ }
+
+ #[test]
+ fn structural_doc_pulls_signatures_imports_calls() {
+ let src = "import os\nfn reconcile(charge):\n return backoff(charge)\n";
+ let d = structural_doc(src);
+ assert!(d.contains("imports:") && d.contains("import os"));
+ assert!(d.contains("symbols:") && d.contains("reconcile"));
+ assert!(d.contains("calls:") && d.contains("backoff"));
+ }
+
+ struct FakeEmbedder;
+ #[async_trait]
+ impl EmbeddingProvider for FakeEmbedder {
+ fn name(&self) -> &str {
+ "fake"
+ }
+ fn model_id(&self) -> &str {
+ "fake-1"
+ }
+ fn dimensions(&self) -> usize {
+ 3
+ }
+ async fn embed(&self, texts: &[&str]) -> anyhow::Result>> {
+ // deterministic non-zero vector per input (length-based, just needs to be stable)
+ Ok(texts
+ .iter()
+ .map(|t| vec![t.len() as f32 + 1.0, 1.0, 0.5])
+ .collect())
+ }
+ }
+
+ fn git(dir: &std::path::Path, args: &[&str]) {
+ let ok = std::process::Command::new("git")
+ .arg("-C")
+ .arg(dir)
+ .args(args)
+ .output()
+ .unwrap()
+ .status
+ .success();
+ assert!(ok, "git {args:?}");
+ }
+
+ #[tokio::test]
+ async fn index_ref_is_content_addressed_and_incremental() {
+ let tmp = TempDir::new().unwrap();
+ let repo = tmp.path().join("repo");
+ std::fs::create_dir_all(&repo).unwrap();
+ git(&repo, &["init", "-q"]);
+ git(&repo, &["config", "user.email", "t@t"]);
+ git(&repo, &["config", "user.name", "t"]);
+ std::fs::write(repo.join("a.rs"), "fn reconcile() { backoff(); }\n").unwrap();
+ std::fs::write(repo.join("readme.md"), "not code\n").unwrap(); // non-code ext → ignored
+ git(&repo, &["add", "-A"]);
+ git(&repo, &["commit", "-q", "-m", "init"]);
+
+ let mut store = CodegraphStore::open(&tmp.path().join("cg.db")).unwrap();
+ let emb = FakeEmbedder;
+
+ let r1 = index_ref(&mut store, "r", &repo, Some("main"), &emb, IndexMode::Dense)
+ .await
+ .unwrap();
+ assert_eq!(r1.files, 1, "only the .rs file is indexed");
+ assert_eq!(r1.computed, 1);
+ assert_eq!(r1.cached, 0);
+
+ // Re-index unchanged tree → all cache hits, nothing re-embedded.
+ let r2 = index_ref(&mut store, "r", &repo, Some("main"), &emb, IndexMode::Dense)
+ .await
+ .unwrap();
+ assert_eq!(r2.computed, 0);
+ assert_eq!(r2.cached, 1);
+
+ // The blob hydrates with tokens + a normalized embedding.
+ let hits = store.hydrate("r", "main", &emb.signature()).unwrap();
+ assert_eq!(hits.len(), 1);
+ assert!(hits[0].tokens.contains(&"reconcile".to_string()));
+ let norm: f32 = hits[0].emb.iter().map(|x| x * x).sum::().sqrt();
+ assert!((norm - 1.0).abs() < 1e-3, "embedding is L2-normalized");
+ }
+
+ /// An embedder that errors on empty input, like the real cloud backend
+ /// (which 400s "input must be a non-empty string"). Guards the fallback.
+ struct StrictEmbedder;
+ #[async_trait]
+ impl EmbeddingProvider for StrictEmbedder {
+ fn name(&self) -> &str {
+ "strict"
+ }
+ fn model_id(&self) -> &str {
+ "strict-1"
+ }
+ fn dimensions(&self) -> usize {
+ 2
+ }
+ async fn embed(&self, texts: &[&str]) -> anyhow::Result>> {
+ if texts.iter().any(|t| t.trim().is_empty()) {
+ anyhow::bail!("input must be a non-empty string");
+ }
+ Ok(texts
+ .iter()
+ .map(|t| vec![t.len() as f32 + 1.0, 1.0])
+ .collect())
+ }
+ }
+
+ #[tokio::test]
+ async fn index_ref_never_embeds_empty_doc() {
+ let tmp = TempDir::new().unwrap();
+ let repo = tmp.path().join("repo");
+ std::fs::create_dir_all(&repo).unwrap();
+ git(&repo, &["init", "-q"]);
+ git(&repo, &["config", "user.email", "t@t"]);
+ git(&repo, &["config", "user.name", "t"]);
+ // Structure-less files: empty, and a bare assignment (no def/import/call/doc).
+ std::fs::write(repo.join("__init__.py"), "").unwrap();
+ std::fs::write(repo.join("data.py"), "x = 1\n").unwrap();
+ std::fs::write(repo.join("ok.py"), "def go():\n run()\n").unwrap();
+ git(&repo, &["add", "-A"]);
+ git(&repo, &["commit", "-q", "-m", "init"]);
+
+ let mut store = CodegraphStore::open(&tmp.path().join("cg.db")).unwrap();
+ // Must NOT bail with the empty-input error: the fallback keeps every
+ // embed input non-empty even for files with no extractable structure.
+ let rep = index_ref(
+ &mut store,
+ "r",
+ &repo,
+ Some("main"),
+ &StrictEmbedder,
+ IndexMode::Dense,
+ )
+ .await
+ .expect("index_ref tolerates structure-less files");
+ assert_eq!(rep.computed, 3, "all three files embedded + stored");
+ }
+
+ /// Embedder that fails if called at all — proves the lexical path embeds nothing.
+ struct NoEmbed;
+ #[async_trait]
+ impl EmbeddingProvider for NoEmbed {
+ fn name(&self) -> &str {
+ "noembed"
+ }
+ fn model_id(&self) -> &str {
+ "noembed-1"
+ }
+ fn dimensions(&self) -> usize {
+ 2
+ }
+ async fn embed(&self, _t: &[&str]) -> anyhow::Result>> {
+ anyhow::bail!("embedder must not be called in lexical mode")
+ }
+ }
+
+ #[tokio::test]
+ async fn lexical_mode_indexes_and_searches_without_embedding() {
+ let tmp = TempDir::new().unwrap();
+ let repo = tmp.path().join("repo");
+ std::fs::create_dir_all(&repo).unwrap();
+ git(&repo, &["init", "-q"]);
+ git(&repo, &["config", "user.email", "t@t"]);
+ git(&repo, &["config", "user.name", "t"]);
+ std::fs::write(repo.join("auth.rs"), "fn login() { session(); token(); }\n").unwrap();
+ std::fs::write(repo.join("retry.rs"), "fn reconcile() { backoff(); }\n").unwrap();
+ git(&repo, &["add", "-A"]);
+ git(&repo, &["commit", "-q", "-m", "init"]);
+
+ let mut store = CodegraphStore::open(&tmp.path().join("cg.db")).unwrap();
+ // Lexical index makes no embedder call (NoEmbed would bail) …
+ let rep = index_ref(
+ &mut store,
+ "r",
+ &repo,
+ Some("main"),
+ &NoEmbed,
+ IndexMode::Lexical,
+ )
+ .await
+ .expect("lexical index never embeds");
+ assert_eq!(rep.computed, 2);
+
+ // … and lexical search is BM25-only — still no embedder call — yet ranks.
+ let out = crate::openhuman::codegraph::search_ref(
+ &mut store,
+ "r",
+ "main",
+ "reconcile backoff",
+ &NoEmbed,
+ 5,
+ )
+ .await
+ .expect("lexical search never embeds");
+ assert!(matches!(
+ out.coverage,
+ crate::openhuman::codegraph::Coverage::Full
+ ));
+ assert_eq!(
+ out.hits.first().map(String::as_str),
+ Some("retry.rs"),
+ "BM25 ranks retry.rs first for 'reconcile backoff'"
+ );
+ }
+
+ // ---- manual indexing benchmark -------------------------------------
+ // A zero-latency embedder returning realistically-sized (default 1024-d)
+ // vectors, with cumulative embed-time accounting so the harness can
+ // subtract it and report *pure engine* throughput (extract + tokenize +
+ // SQLite + manifest). Real cloud embedding latency adds on top of that.
+ use std::sync::atomic::{AtomicU64, Ordering};
+ use std::sync::Arc;
+
+ struct BenchEmbedder {
+ dim: usize,
+ embed_nanos: Arc,
+ invocations: Arc,
+ docs: Arc,
+ }
+ #[async_trait]
+ impl EmbeddingProvider for BenchEmbedder {
+ fn name(&self) -> &str {
+ "bench"
+ }
+ fn model_id(&self) -> &str {
+ "bench-vec"
+ }
+ fn dimensions(&self) -> usize {
+ self.dim
+ }
+ async fn embed(&self, texts: &[&str]) -> anyhow::Result>> {
+ let t = std::time::Instant::now();
+ let out: Vec> = texts
+ .iter()
+ .map(|s| {
+ // cheap, deterministic, non-degenerate vector of the real size
+ let mut v = vec![0.0f32; self.dim];
+ v[0] = s.len() as f32 + 1.0;
+ if self.dim > 1 {
+ v[1] = 1.0;
+ }
+ v
+ })
+ .collect();
+ self.embed_nanos
+ .fetch_add(t.elapsed().as_nanos() as u64, Ordering::Relaxed);
+ self.invocations.fetch_add(1, Ordering::Relaxed);
+ self.docs.fetch_add(texts.len() as u64, Ordering::Relaxed);
+ Ok(out)
+ }
+ }
+
+ #[tokio::test]
+ #[ignore = "manual benchmark: CODEGRAPH_BENCH_REPO=/path cargo test ... -- --ignored --nocapture"]
+ async fn bench_index_speed() {
+ let repo = match std::env::var("CODEGRAPH_BENCH_REPO") {
+ Ok(p) => std::path::PathBuf::from(p),
+ Err(_) => {
+ eprintln!("bench_index_speed: set CODEGRAPH_BENCH_REPO=/path/to/git/repo");
+ return;
+ }
+ };
+ let dim: usize = std::env::var("CODEGRAPH_BENCH_DIM")
+ .ok()
+ .and_then(|s| s.parse().ok())
+ .unwrap_or(1024);
+
+ let tmp = TempDir::new().unwrap();
+ let mut store = CodegraphStore::open(&tmp.path().join("cg.db")).unwrap();
+ let embed_nanos = Arc::new(AtomicU64::new(0));
+ let invocations = Arc::new(AtomicU64::new(0));
+ let docs = Arc::new(AtomicU64::new(0));
+ let emb = BenchEmbedder {
+ dim,
+ embed_nanos: embed_nanos.clone(),
+ invocations: invocations.clone(),
+ docs: docs.clone(),
+ };
+
+ // COLD — nothing cached, every blob is read + extracted + embedded + stored.
+ let t0 = std::time::Instant::now();
+ let cold = index_ref(&mut store, "bench", &repo, None, &emb, IndexMode::Dense)
+ .await
+ .unwrap();
+ let cold_ms = t0.elapsed().as_secs_f64() * 1e3;
+ let embed_ms = embed_nanos.load(Ordering::Relaxed) as f64 / 1e6;
+ let engine_ms = (cold_ms - embed_ms).max(0.0);
+ let n = cold.computed.max(1) as f64;
+
+ // WARM — re-index the same tree: content-addressed → all cache hits.
+ let t1 = std::time::Instant::now();
+ let warm = index_ref(&mut store, "bench", &repo, None, &emb, IndexMode::Dense)
+ .await
+ .unwrap();
+ let warm_ms = t1.elapsed().as_secs_f64() * 1e3;
+
+ eprintln!("\n==== codegraph index bench =====================================");
+ eprintln!("repo : {}", repo.display());
+ eprintln!("embed dim : {dim} (zero-latency fake embedder)");
+ eprintln!(
+ "files (tracked) : {} computed={} cached={} skipped={}",
+ cold.files, cold.computed, cold.cached, cold.skipped
+ );
+ eprintln!("-- COLD (full index) -------------------------------------------");
+ eprintln!(" wall total : {:>8.1} ms", cold_ms);
+ eprintln!(
+ " fake embed : {:>8.1} ms ({:.1}% — replaced by real cloud latency in prod)",
+ embed_ms,
+ 100.0 * embed_ms / cold_ms.max(1e-9)
+ );
+ eprintln!(
+ " ENGINE only : {:>8.1} ms → {:>7.0} files/s ({:.3} ms/file)",
+ engine_ms,
+ n / (engine_ms / 1e3).max(1e-9),
+ engine_ms / n
+ );
+ eprintln!(
+ " embed : {} call(s) for {} docs (batched ≤{}/call)",
+ invocations.load(Ordering::Relaxed),
+ docs.load(Ordering::Relaxed),
+ EMBED_BATCH
+ );
+ eprintln!("-- WARM (content-addressed re-index, all cache hits) -----------");
+ eprintln!(
+ " wall total : {:>8.1} ms → {:>7.0} files/s ({:.4} ms/file) cached={}",
+ warm_ms,
+ warm.files as f64 / (warm_ms / 1e3).max(1e-9),
+ warm_ms / warm.files.max(1) as f64,
+ warm.cached
+ );
+ eprintln!("================================================================\n");
+ }
+
+ /// Live probe — build the *real* provider from the workspace config and
+ /// embed one string. Confirms the cloud session JWT + backend are reachable
+ /// before attempting a full real-embedding index. A `401`/expired session
+ /// prints `EMBED FAILED` rather than panicking.
+ ///
+ /// OPENHUMAN_WORKSPACE=/path OPENHUMAN_KEYRING_BACKEND=file \
+ /// cargo test --lib codegraph::index::tests::cloud_embed_probe -- --ignored --nocapture
+ #[tokio::test]
+ #[ignore = "live: needs OPENHUMAN_WORKSPACE + a valid backend session"]
+ async fn cloud_embed_probe() {
+ let config = crate::openhuman::config::Config::load_or_init()
+ .await
+ .expect("load config");
+ let provider = crate::openhuman::embeddings::provider_from_config(&config)
+ .expect("build embedding provider");
+ eprintln!(
+ "\n==== cloud embed probe ====\nprovider={} model={} dims={} sig={}",
+ provider.name(),
+ provider.model_id(),
+ provider.dimensions(),
+ provider.signature(),
+ );
+ let t = std::time::Instant::now();
+ match provider.embed(&["hello world from codegraph"]).await {
+ Ok(vs) => {
+ let v = vs.first().map(Vec::as_slice).unwrap_or(&[]);
+ eprintln!(
+ "OK: {} vector(s), dim={}, first5={:?} ({:.0} ms)",
+ vs.len(),
+ v.len(),
+ &v[..v.len().min(5)],
+ t.elapsed().as_secs_f64() * 1e3
+ );
+ }
+ Err(e) => eprintln!("EMBED FAILED: {e:#}"),
+ }
+ eprintln!("===========================\n");
+ }
+
+ /// Full real-embedding e2e: load the workspace config → build the cloud
+ /// provider → `index_ref` a real repo → `search_ref`, asserting full
+ /// coverage + non-empty hits and printing real wall-time (embedding
+ /// included). Defaults to the small flask checkout (one embed batch);
+ /// override with `CODEGRAPH_E2E_REPO` / `CODEGRAPH_E2E_QUERY`.
+ ///
+ /// OPENHUMAN_WORKSPACE=/path OPENHUMAN_KEYRING_BACKEND=file \
+ /// cargo test --lib codegraph::index::tests::index_e2e_cloud -- --ignored --nocapture
+ #[tokio::test]
+ #[ignore = "live: real cloud embeddings; needs OPENHUMAN_WORKSPACE + a valid session"]
+ async fn index_e2e_cloud() {
+ let repo = std::path::PathBuf::from(std::env::var("CODEGRAPH_E2E_REPO").unwrap_or_else(
+ |_| {
+ "/home/sanil/vezures/openhuman-cbmem-ab/bench/codebase-memory-ab/repos/pallets__flask"
+ .to_string()
+ },
+ ));
+ if !repo.exists() {
+ eprintln!("index_e2e_cloud: repo not found: {}", repo.display());
+ return;
+ }
+ let query = std::env::var("CODEGRAPH_E2E_QUERY")
+ .unwrap_or_else(|_| "register blueprint route url rule".to_string());
+
+ let config = crate::openhuman::config::Config::load_or_init()
+ .await
+ .expect("load config");
+ let provider = crate::openhuman::embeddings::provider_from_config(&config)
+ .expect("build embedding provider");
+
+ let tmp = TempDir::new().unwrap();
+ let mut store = CodegraphStore::open(&tmp.path().join("cg.db")).unwrap();
+
+ let t0 = std::time::Instant::now();
+ let rep = index_ref(
+ &mut store,
+ "e2e",
+ &repo,
+ None,
+ provider.as_ref(),
+ IndexMode::Dense,
+ )
+ .await
+ .expect("index_ref");
+ let index_ms = t0.elapsed().as_secs_f64() * 1e3;
+
+ let t1 = std::time::Instant::now();
+ let out = crate::openhuman::codegraph::search_ref(
+ &mut store,
+ "e2e",
+ &rep.git_ref,
+ &query,
+ provider.as_ref(),
+ 10,
+ )
+ .await
+ .expect("search_ref");
+ let search_ms = t1.elapsed().as_secs_f64() * 1e3;
+
+ eprintln!("\n==== codegraph e2e (REAL cloud embeddings) =====================");
+ eprintln!("repo : {} ref={}", repo.display(), rep.git_ref);
+ eprintln!(
+ "index : files={} computed={} cached={} skipped={} in {:.0} ms (embedding incl.)",
+ rep.files, rep.computed, rep.cached, rep.skipped, index_ms
+ );
+ eprintln!("query : {query:?}");
+ eprintln!(
+ "search: coverage={:?} indexed={} total={} in {:.0} ms",
+ out.coverage, out.indexed, out.total, search_ms
+ );
+ eprintln!("top hits:");
+ for (i, h) in out.hits.iter().take(10).enumerate() {
+ eprintln!(" {}. {}", i + 1, h);
+ }
+ eprintln!("================================================================\n");
+
+ assert!(rep.computed > 0, "indexed at least one blob");
+ // Not None — we got real coverage. A clean small repo is Full; a large
+ // repo with oversized/binary files skipped is legitimately Partial.
+ assert!(
+ !matches!(out.coverage, crate::openhuman::codegraph::Coverage::None),
+ "search has at least partial coverage"
+ );
+ assert!(!out.hits.is_empty(), "search returned hits");
+ }
+}
diff --git a/src/openhuman/codegraph/mod.rs b/src/openhuman/codegraph/mod.rs
new file mode 100644
index 0000000000..c48d8aa678
--- /dev/null
+++ b/src/openhuman/codegraph/mod.rs
@@ -0,0 +1,29 @@
+//! codegraph — content-addressed code retrieval for coding subagents.
+//!
+//! The seed engine behind the issue-crusher / pr-reviewer skills. Retrieval is
+//! `BM25 (SQLite FTS5) ∪ structural-aug dense (embeddings domain)`, RRF-fused.
+//! Indexing is content-addressed: every file's `{tokens, struct-doc embedding}`
+//! is cached by its git **blob SHA** (+ embedding-model signature); a branch's
+//! index is just its per-`(repo, ref)` **manifest** rows joined to the shared
+//! blob cache at query time. Branch switch / new commit / pull only (re)embed
+//! the blobs that actually changed.
+//!
+//! Pure Rust: `tree-sitter` for structure, `rusqlite`+FTS5 for lexical, and the
+//! `embeddings` domain (cloud by default) for vectors. No Python, no extra
+//! services.
+//!
+//! Layers:
+//! - [`store`] — persistent SQLite blob cache + manifests (this commit).
+//! - `index` — tree-sitter extract + FTS5 + dense, incremental (next).
+//! - `search` — BM25 ∪ dense RRF + coverage flag (next).
+
+pub mod index;
+pub mod search;
+pub mod store;
+
+pub use index::{
+ code_tokens, count_code_files, current_ref, index_ref, structural_doc, IndexMode, IndexReport,
+ LEXICAL_MODEL,
+};
+pub use search::{search_ref, Coverage, SearchOutcome};
+pub use store::{BlobEntry, CodegraphStore};
diff --git a/src/openhuman/codegraph/search.rs b/src/openhuman/codegraph/search.rs
new file mode 100644
index 0000000000..534e1fe9ae
--- /dev/null
+++ b/src/openhuman/codegraph/search.rs
@@ -0,0 +1,289 @@
+//! Retrieval: the seed. Hydrate a `(repo, ref)` working set from the store,
+//! score it with **BM25 (lexical) ∪ dense (cosine)**, **RRF-fuse**, and report
+//! a **coverage** flag (`full`/`partial`/`none`) so callers know whether the
+//! index is complete or the agent should lean on grep.
+//!
+//! BM25 is in-memory over the hydrated tokens (the working set is one repo's
+//! files — small; this matches the validated prototype and keeps the
+//! hydrate-per-query model simple). Dense is cosine over the L2-normalised
+//! structural-aug vectors. The query is embedded once with the same provider
+//! the index was built with (its `signature()` is the cache `model` key).
+
+use std::collections::{HashMap, HashSet};
+
+use anyhow::{Context, Result};
+
+use crate::openhuman::embeddings::EmbeddingProvider;
+
+use super::index::code_tokens;
+use super::store::{BlobEntry, CodegraphStore};
+
+const RRF_K: f32 = 60.0;
+const PER_ARM: usize = 20; // top-N from each arm fed into RRF
+const BM25_K1: f32 = 1.5;
+const BM25_B: f32 = 0.75;
+
+/// How complete the index is for the queried `(repo, ref)`.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
+#[serde(rename_all = "snake_case")]
+pub enum Coverage {
+ /// Every manifest file is embedded — trust the candidates.
+ Full,
+ /// Some files still pending (background index in flight) — treat as hints.
+ Partial,
+ /// Nothing indexed yet — fall back to grep.
+ None,
+}
+
+/// The seed result: ranked candidate paths + how complete the index was.
+#[derive(Debug, Clone, serde::Serialize)]
+pub struct SearchOutcome {
+ pub hits: Vec,
+ pub coverage: Coverage,
+ /// Files embedded (hydrated) vs total in the manifest.
+ pub indexed: usize,
+ pub total: usize,
+}
+
+fn l2_normalize(v: &mut [f32]) {
+ let norm = v.iter().map(|x| x * x).sum::().sqrt();
+ if norm > 0.0 {
+ for x in v.iter_mut() {
+ *x /= norm;
+ }
+ }
+}
+
+/// BM25-Okapi over the hydrated docs; returns doc indices ranked best-first.
+fn bm25_rank(docs: &[BlobEntry], query: &[String]) -> Vec {
+ let n = docs.len() as f32;
+ let lens: Vec = docs.iter().map(|d| d.tokens.len() as f32).collect();
+ let avgdl = (lens.iter().sum::() / n).max(1.0);
+ // per-doc term frequency tables
+ let tfs: Vec> = docs
+ .iter()
+ .map(|d| {
+ let mut m: HashMap<&str, f32> = HashMap::new();
+ for w in &d.tokens {
+ *m.entry(w.as_str()).or_insert(0.0) += 1.0;
+ }
+ m
+ })
+ .collect();
+ let q_terms: HashSet<&str> = query.iter().map(|s| s.as_str()).collect();
+
+ let mut scores = vec![0.0f32; docs.len()];
+ for &t in &q_terms {
+ let df = tfs.iter().filter(|m| m.contains_key(t)).count() as f32;
+ if df == 0.0 {
+ continue;
+ }
+ let idf = (((n - df + 0.5) / (df + 0.5)) + 1.0).ln();
+ for (i, m) in tfs.iter().enumerate() {
+ if let Some(&f) = m.get(t) {
+ let denom = f + BM25_K1 * (1.0 - BM25_B + BM25_B * lens[i] / avgdl);
+ scores[i] += idf * (f * (BM25_K1 + 1.0)) / denom;
+ }
+ }
+ }
+ rank_by_score(&scores)
+}
+
+/// Cosine (dot over normalised vectors) of `qv` against each doc; best-first.
+fn dense_rank(docs: &[BlobEntry], qv: &[f32]) -> Vec {
+ let scores: Vec = docs
+ .iter()
+ .map(|d| d.emb.iter().zip(qv).map(|(a, b)| a * b).sum::())
+ .collect();
+ rank_by_score(&scores)
+}
+
+fn rank_by_score(scores: &[f32]) -> Vec {
+ let mut idx: Vec = (0..scores.len()).collect();
+ idx.sort_by(|&a, &b| {
+ scores[b]
+ .partial_cmp(&scores[a])
+ .unwrap_or(std::cmp::Ordering::Equal)
+ });
+ idx
+}
+
+/// Reciprocal-rank fusion of several rankings (top-`PER_ARM` of each), top-`k`.
+fn rrf(rankings: &[Vec], k: usize) -> Vec {
+ let mut score: HashMap = HashMap::new();
+ for ranking in rankings {
+ for (rank, &doc) in ranking.iter().take(PER_ARM).enumerate() {
+ *score.entry(doc).or_insert(0.0) += 1.0 / (RRF_K + rank as f32 + 1.0);
+ }
+ }
+ let mut items: Vec<(usize, f32)> = score.into_iter().collect();
+ items.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
+ items.into_iter().take(k).map(|(i, _)| i).collect()
+}
+
+/// Seed `query` against a `(repo, ref)` index: BM25 ∪ dense, RRF-fused, top-`k`,
+/// with a coverage flag. Embeds the query once with `embedder`.
+pub async fn search_ref(
+ store: &mut CodegraphStore,
+ repo_id: &str,
+ git_ref: &str,
+ query: &str,
+ embedder: &dyn EmbeddingProvider,
+ k: usize,
+) -> Result {
+ let total = store.manifest_size(repo_id, git_ref)?;
+ // Auto-detect the index mode: prefer the dense arm (rows under the
+ // embedder's signature); if none, fall back to the lexical-only key (a
+ // small repo indexed BM25-only). Lexical search makes no embedder call.
+ let dense_model = embedder.signature();
+ let mut docs = store.hydrate(repo_id, git_ref, &dense_model)?;
+ let dense_active = !docs.is_empty();
+ if !dense_active {
+ docs = store.hydrate(repo_id, git_ref, super::index::LEXICAL_MODEL)?;
+ }
+
+ let coverage = if total == 0 {
+ Coverage::None
+ } else if docs.len() >= total {
+ Coverage::Full
+ } else {
+ Coverage::Partial
+ };
+ if docs.is_empty() {
+ return Ok(SearchOutcome {
+ hits: vec![],
+ coverage,
+ indexed: 0,
+ total,
+ });
+ }
+
+ let q_tokens = code_tokens(query);
+ let bm = bm25_rank(&docs, &q_tokens);
+
+ // Dense arm only when the index has vectors — otherwise BM25 alone, and no
+ // query-embed round-trip. RRF over a single ranking preserves its order.
+ let arms = if dense_active {
+ let mut qv = embedder
+ .embed(&[query])
+ .await
+ .context("codegraph: embed query")?
+ .into_iter()
+ .next()
+ .unwrap_or_default();
+ l2_normalize(&mut qv);
+ vec![bm, dense_rank(&docs, &qv)]
+ } else {
+ vec![bm]
+ };
+
+ let fused = rrf(&arms, k);
+ let hits = fused.into_iter().map(|i| docs[i].path.clone()).collect();
+ Ok(SearchOutcome {
+ hits,
+ coverage,
+ indexed: docs.len(),
+ total,
+ })
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use async_trait::async_trait;
+ use tempfile::TempDir;
+
+ fn doc(path: &str, toks: &[&str]) -> BlobEntry {
+ BlobEntry {
+ path: path.into(),
+ tokens: toks.iter().map(|s| s.to_string()).collect(),
+ emb: vec![0.0, 0.0, 0.0],
+ }
+ }
+
+ #[test]
+ fn bm25_ranks_the_matching_doc_first() {
+ let docs = vec![
+ doc("auth.rs", &["login", "session", "token"]),
+ doc("retry.rs", &["reconcile", "backoff", "charge"]),
+ doc("util.rs", &["helper", "misc"]),
+ ];
+ let ranked = bm25_rank(&docs, &code_tokens("reconcile after backoff"));
+ assert_eq!(ranked[0], 1, "retry.rs ranks first for 'reconcile/backoff'");
+ }
+
+ #[test]
+ fn rrf_blends_two_rankings() {
+ // bm25 likes doc 2, dense likes doc 0; both should surface above doc 1.
+ let fused = rrf(&[vec![2, 1, 0], vec![0, 1, 2]], 3);
+ assert!(fused.contains(&0) && fused.contains(&2));
+ assert_eq!(fused.len(), 3);
+ }
+
+ struct FakeEmbedder;
+ #[async_trait]
+ impl EmbeddingProvider for FakeEmbedder {
+ fn name(&self) -> &str {
+ "fake"
+ }
+ fn model_id(&self) -> &str {
+ "fake-1"
+ }
+ fn dimensions(&self) -> usize {
+ 3
+ }
+ async fn embed(&self, texts: &[&str]) -> anyhow::Result>> {
+ Ok(texts.iter().map(|_| vec![1.0, 0.0, 0.0]).collect())
+ }
+ }
+
+ #[tokio::test]
+ async fn search_ref_returns_ranked_hits_and_partial_coverage() {
+ let tmp = TempDir::new().unwrap();
+ let mut store = CodegraphStore::open(&tmp.path().join("cg.db")).unwrap();
+ let sig = FakeEmbedder.signature();
+ store
+ .put_blob(
+ "a",
+ &sig,
+ &["reconcile".into(), "backoff".into()],
+ &[1.0, 0.0, 0.0],
+ )
+ .unwrap();
+ store
+ .put_blob(
+ "b",
+ &sig,
+ &["login".into(), "token".into()],
+ &[0.0, 1.0, 0.0],
+ )
+ .unwrap();
+ // manifest has a 3rd file with no cached blob → partial coverage.
+ store
+ .set_manifest(
+ "r",
+ "main",
+ &[
+ ("retry.rs".into(), "a".into()),
+ ("auth.rs".into(), "b".into()),
+ ("pending.rs".into(), "uncached".into()),
+ ],
+ )
+ .unwrap();
+
+ let out = search_ref(
+ &mut store,
+ "r",
+ "main",
+ "reconcile backoff",
+ &FakeEmbedder,
+ 10,
+ )
+ .await
+ .unwrap();
+ assert_eq!(out.coverage, Coverage::Partial);
+ assert_eq!(out.indexed, 2);
+ assert_eq!(out.total, 3);
+ assert_eq!(out.hits[0], "retry.rs", "lexical match surfaces first");
+ }
+}
diff --git a/src/openhuman/codegraph/store.rs b/src/openhuman/codegraph/store.rs
new file mode 100644
index 0000000000..ab736f35f7
--- /dev/null
+++ b/src/openhuman/codegraph/store.rs
@@ -0,0 +1,332 @@
+//! Persistent, content-addressed store for codegraph.
+//!
+//! Two tables (SQLite, WAL):
+//!
+//! - `blob(sha, model, tokens, emb, dim)` PK `(sha, model)` — the shared
+//! content cache: one row per unique file content per embedding model.
+//! `tokens` is the space-joined BM25 token stream; `emb` is the L2-normalised
+//! structural-aug vector stored as little-endian `f32` bytes. Shared across
+//! every repo and branch, so renames / unchanged files are free.
+//!
+//! - `manifest(repo_id, git_ref, path, sha)` PK `(repo_id, git_ref, path)` —
+//! one row per file per branch/commit. A branch's index is its rows here,
+//! joined to `blob` at query time. A file deleted on a branch drops from
+//! *that ref's* rows; its blob lingers until no manifest references it
+//! ([`CodegraphStore::gc`]).
+//!
+//! This is the storage layer only — tree-sitter extraction, FTS5 ranking, and
+//! the embeddings call live in `index`/`search`.
+
+use anyhow::{Context, Result};
+use rusqlite::{params, Connection};
+use std::path::Path;
+
+const SCHEMA: &str = "\
+CREATE TABLE IF NOT EXISTS blob (
+ sha TEXT NOT NULL,
+ model TEXT NOT NULL,
+ tokens TEXT NOT NULL,
+ emb BLOB NOT NULL,
+ dim INTEGER NOT NULL,
+ PRIMARY KEY (sha, model)
+);
+CREATE TABLE IF NOT EXISTS manifest (
+ repo_id TEXT NOT NULL,
+ git_ref TEXT NOT NULL,
+ path TEXT NOT NULL,
+ sha TEXT NOT NULL,
+ PRIMARY KEY (repo_id, git_ref, path)
+);
+CREATE INDEX IF NOT EXISTS manifest_repo_ref ON manifest(repo_id, git_ref);
+";
+
+/// One hydrated file in a `(repo, ref)` working set: its path plus the cached
+/// BM25 tokens and dense vector. Returned by [`CodegraphStore::hydrate`].
+#[derive(Debug, Clone)]
+pub struct BlobEntry {
+ pub path: String,
+ pub tokens: Vec,
+ pub emb: Vec,
+}
+
+/// Content-addressed blob cache + per-`(repo, ref)` manifests, backed by SQLite.
+pub struct CodegraphStore {
+ conn: Connection,
+}
+
+impl CodegraphStore {
+ /// Open (creating if needed) the codegraph DB at `db_path`.
+ pub fn open(db_path: &Path) -> Result {
+ if let Some(parent) = db_path.parent() {
+ std::fs::create_dir_all(parent).ok();
+ }
+ let conn = Connection::open(db_path)
+ .with_context(|| format!("open codegraph db at {}", db_path.display()))?;
+ conn.pragma_update(None, "journal_mode", "WAL")?;
+ // NORMAL is durable across an app crash under WAL (only a power/OS crash
+ // can lose the last commit) and drops the per-commit fsync that
+ // otherwise dominates a cold index — and this is a rebuildable cache.
+ conn.pragma_update(None, "synchronous", "NORMAL")?;
+ conn.execute_batch(SCHEMA)
+ .context("init codegraph schema")?;
+ Ok(Self { conn })
+ }
+
+ /// True if this content (`sha`) is already cached for `model` — the
+ /// incremental check: a cache hit means no re-embed on (re)index.
+ pub fn has_blob(&self, sha: &str, model: &str) -> Result {
+ let n: i64 = self.conn.query_row(
+ "SELECT COUNT(*) FROM blob WHERE sha=?1 AND model=?2",
+ params![sha, model],
+ |r| r.get(0),
+ )?;
+ Ok(n > 0)
+ }
+
+ /// Insert a computed blob (idempotent on `(sha, model)`).
+ pub fn put_blob(&self, sha: &str, model: &str, tokens: &[String], emb: &[f32]) -> Result<()> {
+ let token_str = tokens.join(" ");
+ let mut bytes = Vec::with_capacity(emb.len() * 4);
+ for f in emb {
+ bytes.extend_from_slice(&f.to_le_bytes());
+ }
+ self.conn.execute(
+ "INSERT OR IGNORE INTO blob(sha, model, tokens, emb, dim) VALUES (?1,?2,?3,?4,?5)",
+ params![sha, model, token_str, bytes, emb.len() as i64],
+ )?;
+ Ok(())
+ }
+
+ /// Insert many computed blobs in a single transaction (one fsync for the
+ /// batch, not one per blob). Idempotent on `(sha, model)` via `OR IGNORE`,
+ /// so duplicate content within the batch keeps its first row. The hot path
+ /// for a cold index — prefer this over a `put_blob` loop.
+ pub fn put_blobs(
+ &mut self,
+ model: &str,
+ blobs: &[(String, Vec, Vec)],
+ ) -> Result<()> {
+ if blobs.is_empty() {
+ return Ok(());
+ }
+ let tx = self.conn.transaction()?;
+ {
+ let mut stmt = tx.prepare(
+ "INSERT OR IGNORE INTO blob(sha, model, tokens, emb, dim) VALUES (?1,?2,?3,?4,?5)",
+ )?;
+ for (sha, tokens, emb) in blobs {
+ let token_str = tokens.join(" ");
+ let mut bytes = Vec::with_capacity(emb.len() * 4);
+ for f in emb {
+ bytes.extend_from_slice(&f.to_le_bytes());
+ }
+ stmt.execute(params![sha, model, token_str, bytes, emb.len() as i64])?;
+ }
+ }
+ tx.commit()?;
+ Ok(())
+ }
+
+ /// Replace a `(repo, ref)` manifest with `files` (`(path, sha)`), handling
+ /// deletes/renames: the ref's rows are rewritten to exactly `files`.
+ pub fn set_manifest(
+ &mut self,
+ repo_id: &str,
+ git_ref: &str,
+ files: &[(String, String)],
+ ) -> Result<()> {
+ let tx = self.conn.transaction()?;
+ tx.execute(
+ "DELETE FROM manifest WHERE repo_id=?1 AND git_ref=?2",
+ params![repo_id, git_ref],
+ )?;
+ {
+ let mut stmt = tx.prepare(
+ "INSERT INTO manifest(repo_id, git_ref, path, sha) VALUES (?1,?2,?3,?4)",
+ )?;
+ for (path, sha) in files {
+ stmt.execute(params![repo_id, git_ref, path, sha])?;
+ }
+ }
+ tx.commit()?;
+ Ok(())
+ }
+
+ /// Hydrate one `(repo, ref)` working set: manifest joined to the blob cache
+ /// for `model`. Files whose blob isn't cached (e.g. skipped/oversized) are
+ /// omitted — the caller derives coverage from `returned / manifest_size`.
+ pub fn hydrate(&self, repo_id: &str, git_ref: &str, model: &str) -> Result> {
+ let mut stmt = self.conn.prepare(
+ "SELECT m.path, b.tokens, b.emb FROM manifest m \
+ JOIN blob b ON b.sha = m.sha AND b.model = ?1 \
+ WHERE m.repo_id = ?2 AND m.git_ref = ?3",
+ )?;
+ let rows = stmt.query_map(params![model, repo_id, git_ref], |r| {
+ let path: String = r.get(0)?;
+ let tokens: String = r.get(1)?;
+ let bytes: Vec = r.get(2)?;
+ Ok((path, tokens, bytes))
+ })?;
+ let mut out = Vec::new();
+ for row in rows {
+ let (path, tokens, bytes) = row?;
+ let emb = bytes
+ .chunks_exact(4)
+ .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
+ .collect();
+ out.push(BlobEntry {
+ path,
+ tokens: tokens.split_whitespace().map(str::to_string).collect(),
+ emb,
+ });
+ }
+ Ok(out)
+ }
+
+ /// Number of files in a `(repo, ref)` manifest (the coverage denominator).
+ pub fn manifest_size(&self, repo_id: &str, git_ref: &str) -> Result {
+ let n: i64 = self.conn.query_row(
+ "SELECT COUNT(*) FROM manifest WHERE repo_id=?1 AND git_ref=?2",
+ params![repo_id, git_ref],
+ |r| r.get(0),
+ )?;
+ Ok(n as usize)
+ }
+
+ /// Distinct refs indexed for a repo.
+ pub fn refs(&self, repo_id: &str) -> Result> {
+ let mut stmt = self
+ .conn
+ .prepare("SELECT DISTINCT git_ref FROM manifest WHERE repo_id=?1")?;
+ let rows = stmt.query_map(params![repo_id], |r| r.get::<_, String>(0))?;
+ Ok(rows.collect::>>()?)
+ }
+
+ /// Drop cached blobs no live manifest references. Returns rows removed.
+ pub fn gc(&self) -> Result {
+ let removed = self.conn.execute(
+ "DELETE FROM blob WHERE sha NOT IN (SELECT DISTINCT sha FROM manifest)",
+ [],
+ )?;
+ Ok(removed)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use tempfile::TempDir;
+
+ fn store(dir: &TempDir) -> CodegraphStore {
+ CodegraphStore::open(&dir.path().join("codegraph").join("index.db")).unwrap()
+ }
+
+ #[test]
+ fn blob_roundtrip_and_dedup() {
+ let tmp = TempDir::new().unwrap();
+ let s = store(&tmp);
+ assert!(!s.has_blob("sha1", "m").unwrap());
+ s.put_blob("sha1", "m", &["foo".into(), "bar".into()], &[0.5, -0.5])
+ .unwrap();
+ assert!(s.has_blob("sha1", "m").unwrap());
+ // Different model = distinct cache entry.
+ assert!(!s.has_blob("sha1", "other").unwrap());
+ // Idempotent.
+ s.put_blob("sha1", "m", &["foo".into()], &[1.0]).unwrap();
+ }
+
+ #[test]
+ fn put_blobs_batches_and_dedups() {
+ let tmp = TempDir::new().unwrap();
+ let mut s = store(&tmp);
+ s.put_blobs(
+ "m",
+ &[
+ ("s1".into(), vec!["a".into(), "b".into()], vec![1.0, 0.0]),
+ ("s2".into(), vec!["c".into()], vec![0.0, 1.0]),
+ ("s1".into(), vec!["dup".into()], vec![9.0]), // OR IGNORE keeps the first
+ ],
+ )
+ .unwrap();
+ assert!(s.has_blob("s1", "m").unwrap());
+ assert!(s.has_blob("s2", "m").unwrap());
+ // Empty batch is a no-op (warm re-index path).
+ s.put_blobs("m", &[]).unwrap();
+ s.set_manifest(
+ "r",
+ "main",
+ &[("a.rs".into(), "s1".into()), ("b.rs".into(), "s2".into())],
+ )
+ .unwrap();
+ let hits = s.hydrate("r", "main", "m").unwrap();
+ assert_eq!(hits.len(), 2);
+ let a = hits.iter().find(|h| h.path == "a.rs").unwrap();
+ assert_eq!(
+ a.tokens,
+ vec!["a".to_string(), "b".to_string()],
+ "first insert kept, not the dup"
+ );
+ }
+
+ #[test]
+ fn manifest_hydrate_and_coverage() {
+ let tmp = TempDir::new().unwrap();
+ let mut s = store(&tmp);
+ s.put_blob("shaA", "m", &["alpha".into()], &[1.0, 0.0])
+ .unwrap();
+ // shaB intentionally not cached (simulates skipped/oversized) → omitted from hydrate.
+ s.set_manifest(
+ "repo",
+ "main",
+ &[
+ ("a.rs".into(), "shaA".into()),
+ ("b.rs".into(), "shaB".into()),
+ ],
+ )
+ .unwrap();
+ let hits = s.hydrate("repo", "main", "m").unwrap();
+ assert_eq!(hits.len(), 1, "only the cached blob hydrates");
+ assert_eq!(hits[0].path, "a.rs");
+ assert_eq!(hits[0].tokens, vec!["alpha".to_string()]);
+ assert_eq!(hits[0].emb, vec![1.0, 0.0]);
+ assert_eq!(s.manifest_size("repo", "main").unwrap(), 2);
+ }
+
+ #[test]
+ fn manifest_is_per_ref_and_rewrites_on_set() {
+ let tmp = TempDir::new().unwrap();
+ let mut s = store(&tmp);
+ s.put_blob("x", "m", &["x".into()], &[0.0]).unwrap();
+ s.set_manifest("r", "brA", &[("util.rs".into(), "x".into())])
+ .unwrap();
+ s.set_manifest("r", "brB", &[("util/mod.rs".into(), "x".into())])
+ .unwrap();
+ let mut refs = s.refs("r").unwrap();
+ refs.sort();
+ assert_eq!(refs, vec!["brA".to_string(), "brB".to_string()]);
+ // Re-setting a ref rewrites it (delete on brA: file gone from that ref).
+ s.set_manifest("r", "brA", &[]).unwrap();
+ assert_eq!(s.manifest_size("r", "brA").unwrap(), 0);
+ assert_eq!(s.manifest_size("r", "brB").unwrap(), 1);
+ }
+
+ #[test]
+ fn gc_drops_unreferenced_blobs_and_persists() {
+ let path = TempDir::new().unwrap();
+ let db = path.path().join("cg.db");
+ {
+ let mut s = CodegraphStore::open(&db).unwrap();
+ s.put_blob("live", "m", &["a".into()], &[1.0]).unwrap();
+ s.put_blob("orphan", "m", &["b".into()], &[1.0]).unwrap();
+ s.set_manifest("r", "main", &[("a.rs".into(), "live".into())])
+ .unwrap();
+ assert_eq!(s.gc().unwrap(), 1, "orphan blob removed");
+ assert!(s.has_blob("live", "m").unwrap());
+ assert!(!s.has_blob("orphan", "m").unwrap());
+ }
+ // Reopen: state persisted across "restart".
+ let s = CodegraphStore::open(&db).unwrap();
+ assert!(s.has_blob("live", "m").unwrap());
+ assert_eq!(s.hydrate("r", "main", "m").unwrap().len(), 1);
+ }
+}
diff --git a/src/openhuman/composio/identity.rs b/src/openhuman/composio/identity.rs
new file mode 100644
index 0000000000..aba1dc0010
--- /dev/null
+++ b/src/openhuman/composio/identity.rs
@@ -0,0 +1,232 @@
+//! Composio connection identity resolution.
+//!
+//! Single source of truth for "what is the username on this Composio
+//! connection?". Used by the skill preflight gate (`[github]
+//! identity_match = "strict"`) and by any future caller that needs to
+//! compare the connected account against another subsystem (e.g. local
+//! `git config user.name`).
+//!
+//! The lookup goes through the per-toolkit
+//! [`ComposioProvider::fetch_user_profile`](crate::openhuman::memory_sync::composio::providers::ComposioProvider::fetch_user_profile)
+//! call, which already knows the right Composio action slug for each
+//! toolkit (`GITHUB_GET_THE_AUTHENTICATED_USER`,
+//! `GMAIL_GET_PROFILE`, …) and the JSON field that holds the username.
+//!
+//! ## Failure surface
+//!
+//! Everything in this module is best-effort and returns `Option`:
+//!
+//! - toolkit not registered → `None`
+//! - user not signed in / no active connection for the toolkit → `None`
+//! - Composio call fails / returns no username → `None`
+//!
+//! Callers MUST treat `None` as "couldn't resolve" rather than
+//! "username is empty". The preflight gate uses this contract to map
+//! `None` into a clear "GitHub identity not resolved — reconnect via
+//! `composio_authorize github`" error.
+
+use std::sync::Arc;
+
+use crate::openhuman::config::Config;
+
+use super::ops::fetch_connected_integrations;
+use super::providers::{get_provider, ProviderContext};
+
+/// Resolve the connected account's username for the given Composio
+/// toolkit, going through the existing per-provider `fetch_user_profile`
+/// path.
+///
+/// Returns `Some(username)` when:
+/// 1. The toolkit has a registered provider; AND
+/// 2. The toolkit is currently connected (per
+/// [`fetch_connected_integrations`]); AND
+/// 3. The provider's `fetch_user_profile` call succeeds AND yields a
+/// non-empty `username`.
+///
+/// Returns `None` for any other case. See module docs for the failure
+/// contract.
+pub async fn connection_identity(config: &Config, toolkit: &str) -> Option {
+ let toolkit_norm = toolkit.trim().to_ascii_lowercase();
+ if toolkit_norm.is_empty() {
+ tracing::debug!("[composio:identity] connection_identity: empty toolkit slug");
+ return None;
+ }
+
+ // (1) Provider must exist for this toolkit.
+ let provider = match get_provider(&toolkit_norm) {
+ Some(p) => p,
+ None => {
+ tracing::debug!(
+ toolkit = %toolkit_norm,
+ "[composio:identity] no provider registered for toolkit"
+ );
+ return None;
+ }
+ };
+
+ // (2) Toolkit must be in the active integrations set. This is the
+ // same source of truth Settings → Connections uses.
+ let connections = fetch_connected_integrations(config).await;
+ let matching = connections
+ .iter()
+ .find(|c| c.toolkit.eq_ignore_ascii_case(&toolkit_norm));
+ if matching.is_none() {
+ tracing::debug!(
+ toolkit = %toolkit_norm,
+ "[composio:identity] toolkit not in active integrations"
+ );
+ return None;
+ }
+
+ // (3) Build a provider context and call fetch_user_profile.
+ // `ProviderContext::from_config` probes the Composio factory and
+ // returns `None` when the user isn't signed in at all — same
+ // short-circuit other consumers rely on.
+ let ctx = ProviderContext::from_config(Arc::new(config.clone()), &toolkit_norm, None)?;
+ match provider.fetch_user_profile(&ctx).await {
+ Ok(profile) => {
+ let username = profile.username.as_deref().map(str::trim).unwrap_or("");
+ if username.is_empty() {
+ tracing::debug!(
+ toolkit = %toolkit_norm,
+ "[composio:identity] provider returned empty username"
+ );
+ None
+ } else {
+ tracing::debug!(
+ toolkit = %toolkit_norm,
+ username = %username,
+ "[composio:identity] resolved username"
+ );
+ Some(username.to_string())
+ }
+ }
+ Err(e) => {
+ tracing::debug!(
+ toolkit = %toolkit_norm,
+ error = %e,
+ "[composio:identity] fetch_user_profile failed"
+ );
+ None
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::openhuman::memory_sync::composio::providers::{
+ register_provider, ComposioProvider, ProviderArc, ProviderUserProfile, SyncOutcome,
+ SyncReason,
+ };
+ use async_trait::async_trait;
+ use std::sync::atomic::{AtomicUsize, Ordering};
+
+ /// Test provider that returns a fixed username (or fails, when
+ /// `fail` is set). We don't go through Composio at all — the
+ /// preflight gate just needs the provider's `username` field.
+ struct StubProvider {
+ slug: &'static str,
+ username: Option<&'static str>,
+ fail: bool,
+ calls: AtomicUsize,
+ }
+
+ impl StubProvider {
+ fn new(slug: &'static str, username: Option<&'static str>) -> Self {
+ Self {
+ slug,
+ username,
+ fail: false,
+ calls: AtomicUsize::new(0),
+ }
+ }
+ fn failing(slug: &'static str) -> Self {
+ Self {
+ slug,
+ username: None,
+ fail: true,
+ calls: AtomicUsize::new(0),
+ }
+ }
+ }
+
+ #[async_trait]
+ impl ComposioProvider for StubProvider {
+ fn toolkit_slug(&self) -> &'static str {
+ self.slug
+ }
+
+ async fn fetch_user_profile(
+ &self,
+ _ctx: &ProviderContext,
+ ) -> Result {
+ self.calls.fetch_add(1, Ordering::SeqCst);
+ if self.fail {
+ return Err("stub provider: forced failure".to_string());
+ }
+ Ok(ProviderUserProfile {
+ toolkit: self.slug.to_string(),
+ username: self.username.map(|s| s.to_string()),
+ ..Default::default()
+ })
+ }
+
+ async fn sync(
+ &self,
+ _ctx: &ProviderContext,
+ reason: SyncReason,
+ ) -> Result {
+ Ok(SyncOutcome {
+ toolkit: self.slug.to_string(),
+ reason: reason.as_str().to_string(),
+ ..Default::default()
+ })
+ }
+ }
+
+ fn fresh_config_in_workspace(tmp: &std::path::Path) -> Config {
+ let mut config = Config::default();
+ config.config_path = tmp.join("config.toml");
+ config.workspace_dir = tmp.join("workspace");
+ config.secrets.encrypt = false;
+ config
+ }
+
+ #[tokio::test]
+ async fn empty_toolkit_short_circuits_to_none() {
+ let tmp = tempfile::tempdir().expect("tempdir");
+ let config = fresh_config_in_workspace(tmp.path());
+ assert!(connection_identity(&config, "").await.is_none());
+ assert!(connection_identity(&config, " ").await.is_none());
+ }
+
+ #[tokio::test]
+ async fn unknown_toolkit_returns_none_without_provider_call() {
+ let tmp = tempfile::tempdir().expect("tempdir");
+ let config = fresh_config_in_workspace(tmp.path());
+ // Toolkit slug that has no registered provider.
+ assert!(connection_identity(&config, "not-a-real-toolkit-xyz")
+ .await
+ .is_none());
+ }
+
+ #[tokio::test]
+ async fn no_active_connection_short_circuits_before_provider_call() {
+ // Register a provider but no connections exist for the toolkit
+ // → identity helper should return None without calling
+ // fetch_user_profile.
+ let stub: ProviderArc = Arc::new(StubProvider::new(
+ "stub-no-active",
+ Some("would-not-be-returned"),
+ ));
+ register_provider(stub.clone());
+
+ let tmp = tempfile::tempdir().expect("tempdir");
+ let config = fresh_config_in_workspace(tmp.path());
+ // Default config has no Composio auth → fetch_connected_integrations
+ // returns an empty vec, so the toolkit is not "in active".
+ let username = connection_identity(&config, "stub-no-active").await;
+ assert!(username.is_none(), "must short-circuit when not connected");
+ }
+}
diff --git a/src/openhuman/composio/mod.rs b/src/openhuman/composio/mod.rs
index c35bd9516c..4eca1e42e4 100644
--- a/src/openhuman/composio/mod.rs
+++ b/src/openhuman/composio/mod.rs
@@ -43,6 +43,7 @@ pub mod error_mapping;
pub mod execute_dispatch;
pub mod execute_prepare;
pub mod googlecalendar_args;
+pub mod identity;
pub mod oauth_handoff;
pub mod ops;
pub mod periodic;
@@ -66,6 +67,7 @@ pub use crate::openhuman::memory_sync::composio::providers::{
};
pub use action_tool::ComposioActionTool;
pub use client::ComposioClient;
+pub use identity::connection_identity;
pub use ops::{
cached_active_integrations, connected_set_hash, fetch_connected_integrations,
fetch_connected_integrations_status, fetch_toolkit_actions,
diff --git a/src/openhuman/cron/schemas.rs b/src/openhuman/cron/schemas.rs
index fe87402c05..43d691d7f6 100644
--- a/src/openhuman/cron/schemas.rs
+++ b/src/openhuman/cron/schemas.rs
@@ -18,6 +18,7 @@ fn job_id_input(comment: &'static str) -> FieldSchema {
pub fn all_controller_schemas() -> Vec {
vec![
+ schemas("add"),
schemas("list"),
schemas("update"),
schemas("remove"),
@@ -28,6 +29,10 @@ pub fn all_controller_schemas() -> Vec {
pub fn all_registered_controllers() -> Vec {
vec![
+ RegisteredController {
+ schema: schemas("add"),
+ handler: handle_add,
+ },
RegisteredController {
schema: schemas("list"),
handler: handle_list,
@@ -53,6 +58,83 @@ pub fn all_registered_controllers() -> Vec {
pub fn schemas(function: &str) -> ControllerSchema {
match function {
+ "add" => ControllerSchema {
+ namespace: "cron",
+ function: "add",
+ description: "Create a new cron job (shell or agent).",
+ inputs: vec![
+ FieldSchema {
+ name: "name",
+ ty: TypeSchema::Option(Box::new(TypeSchema::String)),
+ comment: "Human-readable job name.",
+ required: false,
+ },
+ FieldSchema {
+ name: "schedule",
+ ty: TypeSchema::Ref("CronSchedule"),
+ comment: "When to run — { kind: 'cron', expr } | { kind: 'at', at } | { kind: 'every', every_ms }.",
+ required: true,
+ },
+ FieldSchema {
+ name: "job_type",
+ ty: TypeSchema::Option(Box::new(TypeSchema::Enum {
+ variants: vec!["shell", "agent"],
+ })),
+ comment: "Defaults to 'agent' when prompt is set, 'shell' when command is set.",
+ required: false,
+ },
+ FieldSchema {
+ name: "command",
+ ty: TypeSchema::Option(Box::new(TypeSchema::String)),
+ comment: "Shell command (required for shell jobs).",
+ required: false,
+ },
+ FieldSchema {
+ name: "prompt",
+ ty: TypeSchema::Option(Box::new(TypeSchema::String)),
+ comment: "Agent task prompt (required for agent jobs).",
+ required: false,
+ },
+ FieldSchema {
+ name: "session_target",
+ ty: TypeSchema::Option(Box::new(TypeSchema::Enum {
+ variants: vec!["isolated", "main"],
+ })),
+ comment: "Defaults to 'isolated'.",
+ required: false,
+ },
+ FieldSchema {
+ name: "model",
+ ty: TypeSchema::Option(Box::new(TypeSchema::String)),
+ comment: "Model override for agent jobs.",
+ required: false,
+ },
+ FieldSchema {
+ name: "agent_id",
+ ty: TypeSchema::Option(Box::new(TypeSchema::String)),
+ comment: "Built-in agent or skill definition ID.",
+ required: false,
+ },
+ FieldSchema {
+ name: "delivery",
+ ty: TypeSchema::Option(Box::new(TypeSchema::Ref("DeliveryConfig"))),
+ comment: "Delivery mode (proactive, announce, etc.).",
+ required: false,
+ },
+ FieldSchema {
+ name: "delete_after_run",
+ ty: TypeSchema::Option(Box::new(TypeSchema::Bool)),
+ comment: "If true, remove the job after its first execution.",
+ required: false,
+ },
+ ],
+ outputs: vec![FieldSchema {
+ name: "job",
+ ty: TypeSchema::Ref("CronJob"),
+ comment: "Newly created cron job.",
+ required: true,
+ }],
+ },
"list" => ControllerSchema {
namespace: "cron",
function: "list",
@@ -195,6 +277,80 @@ pub fn schemas(function: &str) -> ControllerSchema {
}
}
+fn handle_add(params: Map) -> ControllerFuture {
+ Box::pin(async move {
+ let config = config_rpc::load_config_with_timeout().await?;
+
+ let schedule: crate::openhuman::cron::Schedule = read_required(¶ms, "schedule")?;
+ let name = params
+ .get("name")
+ .and_then(|v| v.as_str())
+ .map(|s| s.to_string());
+ let command = params
+ .get("command")
+ .and_then(|v| v.as_str())
+ .map(|s| s.to_string());
+ let prompt = params
+ .get("prompt")
+ .and_then(|v| v.as_str())
+ .map(|s| s.to_string());
+ let session_target_str = params
+ .get("session_target")
+ .and_then(|v| v.as_str())
+ .unwrap_or("isolated");
+ let session_target = match session_target_str {
+ "main" => crate::openhuman::cron::SessionTarget::Main,
+ _ => crate::openhuman::cron::SessionTarget::Isolated,
+ };
+ let model = params
+ .get("model")
+ .and_then(|v| v.as_str())
+ .map(|s| s.to_string());
+ let agent_id = params
+ .get("agent_id")
+ .and_then(|v| v.as_str())
+ .map(|s| s.to_string());
+ let delivery: Option = params
+ .get("delivery")
+ .and_then(|v| serde_json::from_value(v.clone()).ok());
+ let delete_after_run = params
+ .get("delete_after_run")
+ .and_then(|v| v.as_bool())
+ .unwrap_or(false);
+
+ // Determine job type
+ let job_type = params
+ .get("job_type")
+ .and_then(|v| v.as_str())
+ .unwrap_or_else(|| if prompt.is_some() { "agent" } else { "shell" });
+
+ let job = match job_type {
+ "shell" => {
+ let cmd = command.ok_or("'command' is required for shell jobs")?;
+ crate::openhuman::cron::store::add_shell_job(&config, name, schedule, &cmd)
+ .map_err(|e| e.to_string())?
+ }
+ _ => {
+ let p = prompt.unwrap_or_default();
+ crate::openhuman::cron::store::add_agent_job_with_definition(
+ &config,
+ name,
+ schedule,
+ &p,
+ session_target,
+ model,
+ delivery,
+ delete_after_run,
+ agent_id,
+ )
+ .map_err(|e| e.to_string())?
+ }
+ };
+
+ to_json(RpcOutcome::single_log(job, "cron job created"))
+ })
+}
+
fn handle_list(_params: Map) -> ControllerFuture {
Box::pin(async {
let config = config_rpc::load_config_with_timeout().await?;
@@ -343,21 +499,42 @@ mod tests {
// ── registry helpers ────────────────────────────────────────────
+ #[test]
+ fn schemas_add_requires_schedule_and_returns_job() {
+ let s = schemas("add");
+ assert_eq!(s.namespace, "cron");
+ assert_eq!(s.function, "add");
+ let required: Vec<_> = s
+ .inputs
+ .iter()
+ .filter(|f| f.required)
+ .map(|f| f.name)
+ .collect();
+ assert_eq!(required, vec!["schedule"]);
+ assert_eq!(s.outputs[0].name, "job");
+ }
+
#[test]
fn all_controller_schemas_covers_every_supported_function() {
let names: Vec<_> = all_controller_schemas()
.into_iter()
.map(|s| s.function)
.collect();
- assert_eq!(names, vec!["list", "update", "remove", "run", "runs"]);
+ assert_eq!(
+ names,
+ vec!["add", "list", "update", "remove", "run", "runs"]
+ );
}
#[test]
fn all_registered_controllers_has_handler_per_schema() {
let controllers = all_registered_controllers();
- assert_eq!(controllers.len(), 5);
+ assert_eq!(controllers.len(), 6);
let names: Vec<_> = controllers.iter().map(|c| c.schema.function).collect();
- assert_eq!(names, vec!["list", "update", "remove", "run", "runs"]);
+ assert_eq!(
+ names,
+ vec!["add", "list", "update", "remove", "run", "runs"]
+ );
}
// ── read_required ───────────────────────────────────────────────
diff --git a/src/openhuman/embeddings/mod.rs b/src/openhuman/embeddings/mod.rs
index 5e69e8d40d..1f3e5cd896 100644
--- a/src/openhuman/embeddings/mod.rs
+++ b/src/openhuman/embeddings/mod.rs
@@ -41,6 +41,7 @@ pub use noop::NoopEmbedding;
pub use ollama::{OllamaEmbedding, DEFAULT_OLLAMA_DIMENSIONS, DEFAULT_OLLAMA_MODEL};
pub use openai::OpenAiEmbedding;
pub use provider_trait::{format_embedding_signature, EmbeddingProvider};
+pub use rpc::provider_from_config;
pub use schemas::{
all_controller_schemas as all_embeddings_controller_schemas,
all_registered_controllers as all_embeddings_registered_controllers,
diff --git a/src/openhuman/embeddings/rpc.rs b/src/openhuman/embeddings/rpc.rs
index 74deafedc6..b7df3bb808 100644
--- a/src/openhuman/embeddings/rpc.rs
+++ b/src/openhuman/embeddings/rpc.rs
@@ -378,6 +378,29 @@ pub async fn test_connection(
}
}
+/// Build an embedding provider from the live config — the same construction
+/// [`embed`] uses, exposed so other domains (e.g. `codegraph`) can obtain a
+/// provider for `signature()` + direct embedding without a JSON-RPC round-trip.
+pub fn provider_from_config(config: &Config) -> anyhow::Result> {
+ let provider_name = &config.memory.embedding_provider;
+ let model = &config.memory.embedding_model;
+ let dims = config.memory.embedding_dimensions;
+ let api_key = resolve_api_key(config, provider_name);
+ let custom_endpoint = provider_name.strip_prefix("custom:").map(|s| s.to_string());
+ let provider_slug = if provider_name.starts_with("custom:") {
+ "custom"
+ } else {
+ provider_name.as_str()
+ };
+ create_embedding_provider_with_credentials(
+ provider_slug,
+ model,
+ dims,
+ &api_key,
+ custom_endpoint.as_deref(),
+ )
+}
+
fn resolve_api_key(config: &Config, provider_name: &str) -> String {
let slug = if provider_name.starts_with("custom:") {
"custom"
diff --git a/src/openhuman/mod.rs b/src/openhuman/mod.rs
index c911ce8f13..7b0a5e7814 100644
--- a/src/openhuman/mod.rs
+++ b/src/openhuman/mod.rs
@@ -26,6 +26,7 @@ pub mod audio_toolkit;
pub mod autocomplete;
pub mod billing;
pub mod channels;
+pub mod codegraph;
pub mod composio;
pub mod config;
pub mod connectivity;
diff --git a/src/openhuman/skills/defaults/dev-workflow/SKILL.md b/src/openhuman/skills/defaults/dev-workflow/SKILL.md
new file mode 100644
index 0000000000..48b438f272
--- /dev/null
+++ b/src/openhuman/skills/defaults/dev-workflow/SKILL.md
@@ -0,0 +1,149 @@
+# Dev Workflow — Autonomous Issue Crusher
+
+You are an autonomous developer agent. Your job is to find a GitHub issue on `{upstream}`, implement a fix, and deliver a PR.
+
+## Tool split — Composio for GitHub state, local git for the working tree
+
+GitHub state operations — issues, PRs, labels, assignees, branches as remote refs, repository metadata, AND in this skill the **commit** itself (this skill ships the commit through the GitHub API rather than `git push`, see below) — go through Composio via `composio_execute({tool: "GITHUB_", arguments: {...}})`. The working tree — clone, checkout, edit, `git status`/`diff`, run tests — stays on local `git`. Composio is the single authoritative GitHub identity (the user's connected account, gated by the skill's `[github]` preflight); the local working tree is where the actual code change happens. Do **not** shell out to `gh` for state operations — the preflight checked Composio but not gh's credential store, so a `gh` call may silently route through a different account.
+
+## The two repos
+
+- **Upstream** = `{upstream}` — where issues live and where PRs target (base = `{target_branch}`).
+- **Fork** = `{fork_owner}/` — where the fix branch is pushed. (`` is derived from `{upstream}`.)
+- You act as the **connected GitHub identity**. **Commit through the GitHub API via Composio** — assume you have *no* local `git push` credentials. Never block on `git push`.
+
+## Issue selection (smart fallback)
+
+1. **First**: Look for open issues assigned to `{fork_owner}` on `{upstream}` with no linked PR. Pick the oldest:
+ ```
+ composio_execute({
+ "tool": "GITHUB_LIST_REPOSITORY_ISSUES",
+ "arguments": {
+ "owner": "",
+ "repo": "",
+ "state": "open",
+ "assignee": "{fork_owner}",
+ "sort": "created",
+ "direction": "asc",
+ "per_page": 30
+ }
+ })
+ ```
+2. **If none assigned**: Find unassigned open issues. Prefer issues labeled `good first issue`, `bug`, `help wanted`, or `easy`. Prefer issues with detailed descriptions (>500 chars). Skip issues that already have an open PR linked. Use the same tool with `"assignee": "none"` and walk the labels by re-issuing with `"labels": "good first issue"` etc.
+3. **Self-assign**: Once you pick an unassigned issue, assign it to `{fork_owner}` so no one else picks it up concurrently:
+ ```
+ composio_execute({
+ "tool": "GITHUB_ADD_ASSIGNEES_TO_AN_ISSUE",
+ "arguments": {
+ "owner": "",
+ "repo": "",
+ "issue_number": ,
+ "assignees": ["{fork_owner}"]
+ }
+ })
+ ```
+4. **If no suitable issues at all**: Exit cleanly — report "no suitable issues found".
+
+## Per-run workflow
+
+1. **Pick issue** using the selection strategy above.
+2. **Read the issue.** Fetch the full issue body, comments, and labels via Composio. Note the connected login:
+ ```
+ composio_execute({
+ "tool": "GITHUB_GET_AN_ISSUE",
+ "arguments": { "owner": "", "repo": "", "issue_number": }
+ })
+ composio_execute({
+ "tool": "GITHUB_LIST_ISSUE_COMMENTS",
+ "arguments": { "owner": "", "repo": "", "issue_number": }
+ })
+ ```
+3. **Ensure the fork.** If `{fork_owner}/` exists, use it. Otherwise create a fork of `{upstream}` under `{fork_owner}` via Composio (idempotent — a no-op when the fork is already there):
+ ```
+ composio_execute({
+ "tool": "GITHUB_CREATE_A_FORK",
+ "arguments": { "owner": "", "repo": "" }
+ })
+ ```
+4. **Clone & branch.** Clone `{upstream}` locally — this is a working-tree op so it stays on local git. Create branch `dev-workflow/-` off `{target_branch}`:
+ ```
+ git clone https://github.com/{upstream} /tmp/--
+ git -C checkout -b dev-workflow/- origin/{target_branch}
+ ```
+5. **Index the codebase.** Run `codegraph_index` on the cloned repo to build a retrieval index.
+6. **Locate the cause.** Use `codegraph_search` with the issue's key symbols and error strings. Respect the `coverage` flag — if not `full`, also use `grep`/`glob`. Open top candidates to confirm the exact edit site.
+7. **Implement.** Make the **minimal** correct fix/feature. Follow existing code style. Re-read files and `git diff` instead of trusting memory.
+8. **Test.** Detect and run available test commands (npm test, cargo test, pytest, etc.). Iterate until green.
+9. **Push via the GitHub API (Composio).** Create the fix branch on the **fork** through Composio (blob → tree → commit → update-ref) — **do not `git push`**, this skill assumes no local push credentials. For each changed file:
+ ```
+ composio_execute({
+ "tool": "GITHUB_CREATE_A_BLOB",
+ "arguments": {
+ "owner": "{fork_owner}",
+ "repo": "",
+ "content": "",
+ "encoding": "base64"
+ }
+ })
+ ```
+ Compose the new tree from the existing fork tree + the new blobs:
+ ```
+ composio_execute({
+ "tool": "GITHUB_CREATE_A_TREE",
+ "arguments": {
+ "owner": "{fork_owner}",
+ "repo": "",
+ "base_tree": "",
+ "tree": [ { "path": "", "mode": "100644", "type": "blob", "sha": "" } ]
+ }
+ })
+ ```
+ Create the commit and update the ref:
+ ```
+ composio_execute({
+ "tool": "GITHUB_CREATE_A_COMMIT",
+ "arguments": {
+ "owner": "{fork_owner}",
+ "repo": "",
+ "message": "(scope): (#)",
+ "tree": "",
+ "parents": [""]
+ }
+ })
+ composio_execute({
+ "tool": "GITHUB_UPDATE_A_REFERENCE",
+ "arguments": {
+ "owner": "{fork_owner}",
+ "repo": "",
+ "ref": "heads/dev-workflow/-",
+ "sha": "",
+ "force": true
+ }
+ })
+ ```
+10. **Open cross-repo PR via Composio.** Open a PR against `{upstream}:{target_branch}` with head `{fork_owner}:`. Body must include `Closes #`, a root-cause + fix summary, and verification steps:
+ ```
+ composio_execute({
+ "tool": "GITHUB_CREATE_A_PULL_REQUEST",
+ "arguments": {
+ "owner": "",
+ "repo": "",
+ "title": "(scope): (#)",
+ "body": "Closes #\n\n## Root cause\n\n\n## Fix\n\n\n## Verified\n",
+ "head": "{fork_owner}:dev-workflow/-",
+ "base": "{target_branch}",
+ "draft": true
+ }
+ })
+ ```
+
+## Rules
+- **GitHub state via Composio, working tree via local git.** Never shell to `gh` — the preflight gates Composio, not `gh`'s credential store, so a `gh` call can silently use the wrong identity.
+- **One PR per run.** After opening the PR, stop.
+- **Scope.** Only changes that fix the picked issue.
+- **API commits only.** No `git push` — use the Composio GitHub API (blob → tree → commit → update-ref).
+- **codegraph is an accelerant, not a gate.** If cold or unavailable, fall back to `grep`/`glob` — never block on indexing.
+- **If too large/risky** (would touch >20 files or needs multi-system changes), comment on the issue via `GITHUB_CREATE_AN_ISSUE_COMMENT` explaining why and skip.
+- Never force-push to upstream. Never push to upstream directly.
+- You are the **orchestrator**: delegate narrow subtasks to subagents when helpful, but own the end goal.
+- **Stop** when the PR is open, or surface a blocker and stop — don't thrash.
diff --git a/src/openhuman/skills/defaults/dev-workflow/skill.toml b/src/openhuman/skills/defaults/dev-workflow/skill.toml
new file mode 100644
index 0000000000..9ddd6322a0
--- /dev/null
+++ b/src/openhuman/skills/defaults/dev-workflow/skill.toml
@@ -0,0 +1,49 @@
+# dev-workflow — a DEFAULT skill shipped with OpenHuman.
+# Bundled into the binary and seeded into /skills/ on first load
+# (idempotent — never clobbers user edits). Parsed as a SkillDefinition:
+# AgentDefinition fields are flattened in, plus the declared [[inputs]]. At
+# skills_run time it runs as the `orchestrator` agent, focused by SKILL.md,
+# with these inputs rendered into the task prompt.
+#
+# Autonomous developer: picks GitHub issues assigned to the user on an upstream
+# repo, implements fixes using codegraph-accelerated code navigation, and opens
+# cross-repo PRs from a fork.
+id = "dev-workflow"
+when_to_use = "Autonomous developer — picks GitHub issues assigned to the user and raises pull requests. Runs on a schedule via cron."
+
+# Preflight gate (see src/openhuman/skills/preflight.rs). dev-workflow
+# touches GitHub via Composio AND uses local git for the working tree,
+# so both subsystems must be ready before the orchestrator boots:
+#
+# * Composio GitHub connection is active
+# * local `git` is on PATH
+# * `git config --global user.{name,email}` are both set
+# * (strict) Composio username == local git user.name (case-insensitive)
+#
+# Strict identity match is the default — if Composio is signed in as
+# `alice` but local git claims to be `bob`, the PR you push will be
+# authored by `bob` while the OAuth-driven Composio call comes from
+# `alice`. That mismatch produces broken commits no fix-up can recover.
+[github]
+required = true
+identity_match = "strict"
+
+[[inputs]]
+name = "repo"
+description = "The UPSTREAM repo to pick issues from and target PRs against, as owner/name (e.g. acme/web)."
+required = true
+
+[[inputs]]
+name = "upstream"
+description = "Alias for the upstream repo full name. Same as repo if this IS the upstream."
+required = true
+
+[[inputs]]
+name = "target_branch"
+description = "Branch on the upstream to base PRs against (e.g. main)."
+required = true
+
+[[inputs]]
+name = "fork_owner"
+description = "GitHub username of the fork owner — the fix branch is pushed to fork_owner/repo."
+required = true
diff --git a/src/openhuman/skills/defaults/github-issue-crusher/SKILL.md b/src/openhuman/skills/defaults/github-issue-crusher/SKILL.md
new file mode 100644
index 0000000000..e08506cee9
--- /dev/null
+++ b/src/openhuman/skills/defaults/github-issue-crusher/SKILL.md
@@ -0,0 +1,104 @@
+# GitHub Issue Crusher
+
+Fix the **single** GitHub issue named in the inputs, end to end, then open a **DRAFT** pull request via the **fork workflow** — issue on upstream `{repo}`, fix pushed to a fork, cross-repo draft PR back to `{repo}`. Stay strictly in scope; this is autonomous, so work until the draft PR is open or you hit a real blocker, then stop.
+
+## Tool split — Composio for GitHub state, local git for the working tree
+
+GitHub state operations — issues, PRs, comments, reviews, checks, labels, branches as remote refs, repository metadata — go through Composio via `composio_execute({tool: "GITHUB_", arguments: {...}})`. The working tree — clone, checkout, edit, `git status`/`diff`, run tests, commit locally, push the branch to your fork — stays on local `git`. Composio is the single authoritative GitHub identity (the user's connected account, gated by the skill's `[github]` preflight); the local working tree is where the actual code change happens. Do **not** shell out to `gh` for state operations — the preflight checked Composio but not gh's credential store, so a `gh` call may silently route through a different account.
+
+## The two repos
+
+- **Upstream** = `{repo}` — where `#{issue}` lives and where the draft PR is opened (base = `{pr_base}`, or the upstream's default branch).
+- **Fork** = `{fork}` if provided, otherwise the existing fork of `{repo}` under the authed GitHub account. Resolve the authed account once at the top:
+ ```
+ composio_execute({
+ "tool": "GITHUB_GET_THE_AUTHENTICATED_USER",
+ "arguments": {}
+ })
+ ```
+ The response's `login` is ``. If no fork exists yet, create one:
+ ```
+ composio_execute({
+ "tool": "GITHUB_CREATE_A_FORK",
+ "arguments": { "owner": "", "repo": "" }
+ })
+ ```
+
+## Steps
+
+1. **Read the issue.** Fetch issue `#{issue}` in `{repo}` (title, body, comments) via Composio:
+ ```
+ composio_execute({
+ "tool": "GITHUB_GET_AN_ISSUE",
+ "arguments": { "owner": "", "repo": "", "issue_number": {issue} }
+ })
+ composio_execute({
+ "tool": "GITHUB_LIST_ISSUE_COMMENTS",
+ "arguments": { "owner": "", "repo": "", "issue_number": {issue} }
+ })
+ ```
+ Identify the exact files/changes it asks for.
+
+2. **Ensure the fork.** Resolve `` via `GITHUB_GET_THE_AUTHENTICATED_USER` (cache for the run). Create the fork via `GITHUB_CREATE_A_FORK` if it doesn't already exist (idempotent — a no-op when the fork is already there).
+
+3. **Clone fresh.** Clone `{repo}` locally to a unique directory (e.g. `/tmp/-{issue}-`). If the directory already exists from a previous run, remove it first so the clone starts clean. This is a local-git operation:
+ ```
+ git clone https://github.com/{repo} /tmp/-{issue}-
+ ```
+
+4. **Pin the local git identity** in the clone so commits are verified under the authed account. Use the `login` and `id` already on hand from step 2 — never `--global`, never clobber the host's global config:
+ ```
+ git -C config user.name ""
+ git -C config user.email "+@users.noreply.github.com"
+ ```
+
+5. **Locate the cause.** Start with `codegraph_search` on the issue's key symbols / error strings / literal phrases — it auto-indexes on first call (~30–90s on a fresh clone, this is normal not a hang). Inspect the result:
+ - `coverage: full` → read the top hits and confirm the exact edit site.
+ - `coverage: partial` → refine with `grep` scoped to the directories codegraph returned.
+ - `coverage: none` or zero hits → fall back to a blind `grep` / `glob`.
+
+6. **Apply the minimal fix.** Edit only the files identified in step 5. Re-read each file or `git diff` to confirm the change matches the intent — never trust memory.
+
+7. **Verify.** Run the test/lint commands that apply to the changed files (e.g. `pnpm i18n:check` for i18n, `cargo test -p ` for Rust, `pnpm test ` for TS). Skip if the change is pure docs / strings.
+
+8. **Branch, commit, push to the fork** via local git — pushing is a working-tree operation so it stays on git:
+ ```
+ git -C checkout -b fix/{issue}-
+ git -C add # never git add -A
+ git -C commit -m "(scope): (#{issue})"
+ git -C push -u "https://github.com//" fix/{issue}-
+ ```
+
+9. **Open the DRAFT cross-repo PR via Composio.** This is the canonical Composio call for cross-repo PRs — the `head` value `:` tells GitHub to take the branch from the fork:
+ ```
+ composio_execute({
+ "tool": "GITHUB_CREATE_A_PULL_REQUEST",
+ "arguments": {
+ "owner": "",
+ "repo": "",
+ "title": "(scope): (#{issue})",
+ "body": "Closes #{issue}\n\n## Root cause\n\n\n## Fix\n\n\n## Verified\n",
+ "head": ":fix/{issue}-",
+ "base": "{pr_base}",
+ "draft": true
+ }
+ })
+ ```
+ `draft: true` is non-negotiable for autonomous runs — CI runs and a human reviews before promotion to ready.
+
+10. **Hand off Phase 6 to the shepherd, then exit.** Once the draft PR URL is in hand, invoke the `pr-review-shepherd` skill as a fresh background run so the CI + review loop continues autonomously while *this* skill exits cleanly:
+ ```
+ run_skill {
+ "skill_id": "pr-review-shepherd",
+ "inputs": { "repo": "{repo}", "pr": }
+ }
+ ```
+ The call returns immediately with the shepherd's `run_id` + `log` path. Include both in your final response so the user can track the shepherd, then stop — do not stay around polling CI yourself, that's the shepherd's job.
+
+## Rules
+- **GitHub state via Composio, working tree via local git.** Never shell to `gh` — the preflight gates Composio, not `gh`'s credential store, so a `gh` call can silently use the wrong identity.
+- **Scope:** only changes that fix `#{issue}`. No unrelated cleanup, no other issues.
+- **Source of truth** is the filesystem + `git` + `codegraph` — re-read / re-search rather than relying on recall.
+- **codegraph_search first** for every locate step (it auto-indexes); `grep` / `glob` are refinement or fallback only.
+- **DRAFT always** — never open a PR as ready-to-merge from an autonomous run.
+- **Stop** when the draft PR is open or surface a real blocker and stop — don't thrash.
diff --git a/src/openhuman/skills/defaults/github-issue-crusher/skill.toml b/src/openhuman/skills/defaults/github-issue-crusher/skill.toml
new file mode 100644
index 0000000000..6d1d16f825
--- /dev/null
+++ b/src/openhuman/skills/defaults/github-issue-crusher/skill.toml
@@ -0,0 +1,50 @@
+# github-issue-crusher — a DEFAULT skill shipped with OpenHuman.
+# Bundled into the binary and seeded into /skills/ on first load
+# (idempotent — never clobbers user edits). Parsed as a SkillDefinition:
+# AgentDefinition fields are flattened in, plus the declared [[inputs]]. At
+# skills_run time it runs as the `orchestrator` agent, focused by SKILL.md,
+# with these inputs rendered into the task prompt.
+#
+# Fork-aware: the issue lives on the UPSTREAM repo, the fix is pushed to a FORK
+# (via the GitHub API — no local push creds needed), and the PR is cross-repo.
+id = "github-issue-crusher"
+when_to_use = "Fix one GitHub issue end to end and open a pull request — including the fork workflow (issue on an upstream repo, fix pushed to a fork, cross-repo PR back to upstream)."
+
+# Preflight gate (see src/openhuman/skills/preflight.rs). issue-crusher
+# touches GitHub via Composio AND pushes branches to a fork via local
+# git, so both subsystems must be ready before the orchestrator boots:
+#
+# * Composio GitHub connection is active
+# * local `git` is on PATH
+# * `git config --global user.{name,email}` are both set
+# * (strict) Composio username == local git user.name (case-insensitive)
+#
+# Strict match is important here: a cross-repo PR's fork-side push uses
+# whatever local git is configured as, while the PR itself is opened
+# through Composio under a (potentially different) GitHub account. The
+# gate refuses the run loudly rather than silently producing a PR
+# authored under one identity and pushed under another.
+[github]
+required = true
+identity_match = "strict"
+
+[[inputs]]
+name = "repo"
+description = "The UPSTREAM repo the issue lives on AND the PR targets, as owner/name (e.g. acme/web)."
+required = true
+
+[[inputs]]
+name = "issue"
+description = "Issue number on the upstream repo to pick and fix."
+required = true
+type = "integer"
+
+[[inputs]]
+name = "fork"
+description = "Fork to push the fix branch to, as owner/name. Omit to use (or create) a fork under the connected GitHub account."
+required = false
+
+[[inputs]]
+name = "pr_base"
+description = "Base branch on the upstream the PR targets (default: the upstream's default branch)."
+required = false
diff --git a/src/openhuman/skills/defaults/pr-review-shepherd/SKILL.md b/src/openhuman/skills/defaults/pr-review-shepherd/SKILL.md
new file mode 100644
index 0000000000..2900a18db3
--- /dev/null
+++ b/src/openhuman/skills/defaults/pr-review-shepherd/SKILL.md
@@ -0,0 +1,140 @@
+# PR Review Shepherd
+
+Drive a single open GitHub PR all the way to **ready-for-merge** — CI green, every actionable reviewer/bot comment addressed, approvals in. This is autonomous Phase-6 work: iterate the **check → fix → push → re-check** loop until both gates close, or surface a real blocker and stop.
+
+## Tool split — Composio for GitHub state, local git for the working tree
+
+GitHub state operations — PR details, comments (top-level and inline review), check runs, status rollups, comment replies, labels — go through Composio via `composio_execute({tool: "GITHUB_", arguments: {...}})`. The working tree — clone the fork branch, edit files, run tests, commit, force-with-lease push to the fork — stays on local `git`. Composio is the single authoritative GitHub identity (the user's connected account, gated by the skill's `[github]` preflight); the local working tree is where the actual fix lands. Do **not** shell out to `gh` for state operations — the preflight checked Composio but not gh's credential store, so a `gh` call may silently route through a different account.
+
+## When this skill is "done"
+
+Both must hold:
+1. **CI green** — every required check on PR `#{pr}` is `success` (or explicitly waived by a maintainer in the thread).
+2. **All actionable comments resolved** — every comment from a human reviewer or bot (CodeRabbit, Codecov, etc.) is either (a) addressed by a follow-up commit AND replied to on the thread, or (b) intentionally deferred with a one-line reason replied on the thread.
+
+Also stop if the PR is **merged** (success) or **closed without merge** (note the reason and report).
+
+## Steps
+
+1. **Snapshot the PR state** for `#{pr}` on `{repo}` via Composio. Issue these in parallel where the model can — each is a read-only state op:
+ ```
+ composio_execute({
+ "tool": "GITHUB_GET_A_PULL_REQUEST",
+ "arguments": { "owner": "", "repo": "", "pull_number": {pr} }
+ })
+ composio_execute({
+ "tool": "GITHUB_LIST_REVIEW_COMMENTS_ON_A_PULL_REQUEST",
+ "arguments": { "owner": "", "repo": "", "pull_number": {pr} }
+ })
+ composio_execute({
+ "tool": "GITHUB_LIST_ISSUE_COMMENTS",
+ "arguments": { "owner": "", "repo": "", "issue_number": {pr} }
+ })
+ composio_execute({
+ "tool": "GITHUB_LIST_REVIEWS_FOR_A_PULL_REQUEST",
+ "arguments": { "owner": "", "repo": "", "pull_number": {pr} }
+ })
+ composio_execute({
+ "tool": "GITHUB_GET_THE_COMBINED_STATUS_FOR_A_SPECIFIC_REFERENCE",
+ "arguments": { "owner": "", "repo": "", "ref": "" }
+ })
+ composio_execute({
+ "tool": "GITHUB_LIST_CHECK_RUNS_FOR_A_GIT_REFERENCE",
+ "arguments": { "owner": "", "repo": "", "ref": "" }
+ })
+ ```
+ PRs in the GitHub API are addressable both as `pull_number` (the PR-specific endpoints) and `issue_number` (top-level comments live on the issue surface). The check-rollup endpoints take the head SHA from the PR response.
+
+ Derive `` from the PR's `head.repo.owner.login` (or use `{fork}` if provided). Note the head branch name as ``. Record: failing-check ids, unresolved comment threads (with their body + author + path/line if inline), approval count, merge state, PR state (`OPEN` / `MERGED` / `CLOSED`).
+
+ _TODO(composio-catalog): if the exact slugs above drift in the Composio catalog, swap to whatever the current name is. The shapes (owner/repo/pull_number/issue_number/ref) are stable; the slug casing is what occasionally changes._
+
+2. **Check terminal conditions first.**
+ - PR `state` is `MERGED` → report `"merged: "` and stop.
+ - PR `state` is `CLOSED` (not merged) → report `"closed: "` and stop.
+ - All required checks `success` AND zero unresolved actionable threads AND at least one approval → report `"ready for merge: "` and stop.
+ - Otherwise → continue.
+
+3. **Clone the fork branch fresh** to a unique local directory (skip this if the directory from a prior round in this same run already exists and is on the right HEAD). Clone + identity-pin are local-git working-tree ops:
+ ```
+ git clone --branch https://github.com// /tmp/-pr{pr}-
+ ```
+ Pin the local git identity in the clone so any new commits are verified under the authed account. Use the `login` and `id` from a one-time `GITHUB_GET_THE_AUTHENTICATED_USER` call:
+ ```
+ composio_execute({ "tool": "GITHUB_GET_THE_AUTHENTICATED_USER", "arguments": {} })
+ # then with + from the response:
+ git -C