Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/omnigraph/src/db/commit_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ pub(crate) fn graph_commits_uri(root_uri: &str) -> String {
format!("{}/{}", root_uri.trim_end_matches('/'), GRAPH_COMMITS_DIR)
}

fn graph_commit_actors_uri(root_uri: &str) -> String {
pub(crate) fn graph_commit_actors_uri(root_uri: &str) -> String {
format!(
"{}/{}",
root_uri.trim_end_matches('/'),
Expand Down
3 changes: 2 additions & 1 deletion crates/omnigraph/src/db/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ mod recovery;
mod state;

use graph::{init_manifest_graph, open_manifest_graph, snapshot_state_at};
use layout::{manifest_uri, open_manifest_dataset, table_uri_for_path, type_name_hash};
use layout::{open_manifest_dataset, table_uri_for_path, type_name_hash};
pub(crate) use layout::manifest_uri;
pub(crate) use metadata::TableVersionMetadata;
#[cfg(test)]
use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state};
Expand Down
2 changes: 1 addition & 1 deletion crates/omnigraph/src/db/manifest/layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub(super) fn type_name_hash(name: &str) -> String {
format!("{:016x}", h)
}

pub(super) fn manifest_uri(root: &str) -> String {
/// Build the URI of the `MANIFEST_DIR` entry under `root`.
///
/// Every trailing `/` on `root` is stripped first, so the result always has
/// exactly one separator between the root and `MANIFEST_DIR`.
pub(crate) fn manifest_uri(root: &str) -> String {
    let base = root.trim_end_matches('/');
    format!("{}/{}", base, MANIFEST_DIR)
}

Expand Down
248 changes: 243 additions & 5 deletions crates/omnigraph/src/db/omnigraph/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,8 @@ pub async fn optimize_all_tables(db: &Omnigraph) -> Result<Vec<TableOptimizeStat
tasks
};

if table_tasks.is_empty() {
return Ok(Vec::new());
}

// NB: do NOT early-return when `table_tasks` is empty (a schema with no
// node/edge types) — the internal system tables below must still be compacted.
let concurrency = maint_concurrency().min(table_tasks.len()).max(1);

let stats: Vec<Result<TableOptimizeStats>> = futures::stream::iter(table_tasks.into_iter())
Expand Down Expand Up @@ -279,7 +277,42 @@ pub async fn optimize_all_tables(db: &Omnigraph) -> Result<Vec<TableOptimizeStat
}
}

stats.into_iter().collect()
// Compact the internal system tables too (RFC-013 step 2). They are not
Comment thread
cursor[bot] marked this conversation as resolved.
// catalog-tracked, so they take a separate, simpler path (`compact_internal_table`):
// compact in place, no manifest publish, no sidecar. Appended after the
// data-table stats so the data-table cache invalidation above is computed from
// data-table stats only; each internal compaction does its own coordinator
// refresh for cache coherence.
let mut all = stats;
// One source of truth for the internal system tables optimize compacts. The
// commit graph is THREE tables, not one: the DAG (`_graph_commits`), the actor
// map (`_graph_commit_actors`, appended by every *authenticated* write — the
// production server/CLI path always carries an actor), and the manifest. Missing
// any leaves an O(history) scan on a live write path. `__manifest` is always
// present (created at init); the two commit-graph tables may be absent (the
// coordinator opens them as `Option`, gated on existence — graphs predating the
// commit graph, and the actor table is itself optional), so guard each with the
// same existence check rather than letting `Dataset::open` error and fail the
// whole optimize.
let root = db.root_uri();
let internal_tables: [(&str, String); 3] = [
("__manifest", crate::db::manifest::manifest_uri(root)),
(
"_graph_commits",
crate::db::commit_graph::graph_commits_uri(root),
),
(
"_graph_commit_actors",
crate::db::commit_graph::graph_commit_actors_uri(root),
),
];
for (table_key, uri) in internal_tables {
if table_key == "__manifest" || db.storage_adapter().exists(&uri).await? {
all.push(compact_internal_table(db, table_key, uri).await);
}
Comment thread
cursor[bot] marked this conversation as resolved.
}

all.into_iter().collect()
}

/// Compact one table and publish the compacted version to the `__manifest`.
Expand Down Expand Up @@ -438,7 +471,25 @@ async fn optimize_one_table(
// uses. `optimize_indices` is an inline-commit residual: lance-6.0.1
// exposes no uncommitted variant, so like `compact_files` it commits
// directly and relies on the sidecar for recovery.
// Capture the baseline BEFORE the auto-cleanup scrub below, so that if the
// scrub is the only thing that commits, `committed` is still true and Phase C
// publishes the advanced HEAD (no uncovered HEAD>manifest drift).
let version_before = ds.version().version;

// Keep optimize non-destructive on upgraded graphs (same guarantee the
// internal-table path makes — see `clear_stale_auto_cleanup_config`).
// `compact_files` / `optimize_indices` commit with a default `CommitConfig`
// (`skip_auto_cleanup = false`) and expose no skip override, so on a graph
// created by a pre-v7 binary (auto_cleanup ON) those commits would fire
// Lance's version-GC hook and prune `__manifest`-pinned data-table versions.
// Strip the stale config first. We hold the per-table queue, so no concurrent
// writer can race this (no retry loop needed, unlike the internal-table path);
// any commit it makes is content-preserving and covered by the Optimize
// sidecar's loose `post_commit_pin` like the other Phase-B commits.
clear_stale_auto_cleanup_config(&mut ds)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;

let metrics: CompactionMetrics = if will_compact {
compact_files(&mut ds, options, None)
.await
Expand Down Expand Up @@ -514,6 +565,173 @@ async fn optimize_one_table(
Ok(stat)
}

/// Bound on the app-level retry of an internal-table compaction against a
/// concurrent live writer (see [`is_retryable_lance_conflict`]).
const INTERNAL_COMPACTION_RETRY_BUDGET: u32 = 5;

/// A Lance commit error that means "a concurrent writer preempted us; reload the
/// dataset and rerun." `compact_files` commits via `commit_compaction` ->
/// `apply_commit` *directly* — unlike the merge-insert path it is NOT wrapped in
/// `execute_with_retry`, so a `Rewrite`-vs-`Merge`/`Update`/`Delete` `check_txn`
/// conflict propagates raw instead of being rebased or converted to
/// `TooMuchWriteContention`. Lance's transaction spec prescribes that the
/// *application* reruns these, which is what `compact_internal_table` does — so a
/// maintenance compaction (a physical op) never fails a live write (a logical op),
/// invariant 7. (`TooMuchWriteContention` is included for the exhausted-retry form
/// some commit paths surface.)
fn is_retryable_lance_conflict(err: &lance::Error) -> bool {
matches!(
err,
lance::Error::RetryableCommitConflict { .. }
| lance::Error::CommitConflict { .. }
| lance::Error::TooMuchWriteContention { .. }
)
}

/// Strip every stored `lance.auto_cleanup.*` config key from `ds` so that
/// compaction stays **non-destructive by construction**. Shared by the
/// internal-table path ([`compact_internal_table`]) and the data-table path
/// ([`optimize_one_table`]).
///
/// # Why
///
/// `compact_files` / `optimize_indices` commit with a default `CommitConfig`
/// (`skip_auto_cleanup = false`) and `CompactionOptions` exposes no override. On
/// a dataset whose stored config still has `lance.auto_cleanup.interval` set, the
/// compaction/reindex commit would fire Lance's auto-cleanup hook (version GC)
/// and delete old versions — including ones `__manifest` pins for snapshots /
/// time-travel (data tables) or that hold lineage/time-travel state (internal
/// tables). New graphs create their tables with `auto_cleanup: None`
/// (`manifest/graph.rs`, `commit_graph.rs`, and the data-table create path), so
/// only *upgraded* graphs created before that fix carry the config. OmniGraph
/// owns version cleanup explicitly (`cleanup`), so the hook is unwanted either
/// way; removing it also aligns the table with the new-graph posture.
///
/// The config-removal commit itself does not GC: the resulting manifest no longer
/// has the `interval` key, so the post-commit hook is a no-op.
///
/// # Returns
///
/// `Ok(true)` when at least one key was removed (which advances Lance HEAD),
/// `Ok(false)` when there was nothing to remove and no commit was made.
///
/// # Errors
///
/// Propagates the `lance::Error` from the config-update commit.
///
/// # Recovery
///
/// Coverage differs by caller: the data-table path runs this inside the Optimize
/// sidecar window; the internal-table path needs none (it commits at HEAD and is
/// read at HEAD — the strip is a content-preserving config commit, so a crash
/// leaves the table readable and content-identical, see
/// [`compact_internal_table`]).
async fn clear_stale_auto_cleanup_config(
    ds: &mut lance::Dataset,
) -> std::result::Result<bool, lance::Error> {
    // Copy the matching keys out first so the shared borrow of the config map
    // ends before the dataset is mutated below.
    let stale_keys: Vec<String> = ds
        .config()
        .keys()
        .filter_map(|key| key.starts_with("lance.auto_cleanup.").then(|| key.clone()))
        .collect();

    let cleared = !stale_keys.is_empty();
    if cleared {
        // A merge-update whose values are `None` deletes those keys — the
        // non-deprecated replacement for `delete_config_keys` (awaiting the
        // builder merges rather than replacing the whole config map).
        let deletions: Vec<(&str, Option<&str>)> = stale_keys
            .iter()
            .map(|key| (key.as_str(), None))
            .collect();
        ds.update_config(deletions).await?;
    }
    Ok(cleared)
}

/// Compact one INTERNAL system table (`__manifest` / `_graph_commits` /
/// `_graph_commit_actors`) in place.
///
/// Unlike catalog data tables, the internal tables are not tracked in the
/// `__manifest` (they ARE the manifest / the lineage DAG): readers open them at
/// their latest Lance HEAD, so compaction just advances that HEAD and the next
/// reader transparently observes the compacted version. That makes this path much
/// simpler than [`optimize_one_table`] — no manifest publish (nothing to publish
/// to), and no recovery sidecar. The sidecar-free claim does NOT rest on
/// single-commit atomicity: `compact_files` can emit a `ReserveFragments` commit
/// before the final `Rewrite` (and the config strip is a separate commit before
/// both), so this advances HEAD over one or more commits. It needs no sidecar
/// because every one of those commits is content-preserving and the table is read
/// at HEAD — a crash at any point leaves the table readable and content-identical,
/// and the next `optimize` re-plans. Internal tables carry no Lance index (only
/// `object_id`'s unenforced-PK schema metadata), so no `optimize_indices`.
///
/// Concurrency: no application lock, but `compact_files` does NOT auto-retry a
/// semantic conflict — its `Operation::Rewrite` commits through `apply_commit`
/// directly (not the merge-insert `execute_with_retry` path), so a `Rewrite`
/// vs concurrent `Update`/`Merge`/`Delete` `check_txn` conflict propagates raw.
/// We own the retry here (see [`is_retryable_lance_conflict`]): on a retryable
/// conflict, reopen at the new HEAD and rerun. A follow-up coordinator `refresh`
/// makes the warm internal-table handles observe the compacted HEAD
/// deterministically (the version probe would also self-heal on the next read).
async fn compact_internal_table(
db: &Omnigraph,
table_key: &str,
uri: String,
) -> Result<TableOptimizeStats> {
// App-level retry against concurrent live writers. compact_files does NOT
// auto-retry a Rewrite-vs-live-write conflict (see is_retryable_lance_conflict),
// so optimize would otherwise fail spuriously on a live graph. On a retryable
// conflict we re-open at the new HEAD and rerun — the canonical Lance-consumer
// pattern. Each attempt opens fresh because the conflict means the version moved.
//
// Only errors from the config strip and from `compact_files` are candidates for
// a retry; a failure to open the dataset, to plan, or to refresh the coordinator
// propagates immediately via `?`.
for attempt in 0..INTERNAL_COMPACTION_RETRY_BUDGET {
let handle = db
.storage()
.open_dataset_head_for_write(table_key, &uri, None)
.await?;
let mut ds = handle.into_dataset();

// Keep optimize non-destructive by construction (see clear_stale_auto_cleanup_config).
// Returns whether it committed a config-strip (which advances Lance HEAD).
// NOTE(review): `cleared_config` is per-attempt. If an earlier attempt stripped
// the config and then lost a race in `compact_files`, a later attempt sees
// `false` here and skips the refresh in the zero-task branch below — confirm
// the concurrent writer's own HEAD advance makes that harmless.
let cleared_config = match clear_stale_auto_cleanup_config(&mut ds).await {
Ok(cleared) => cleared,
Err(e) => {
// Retry only while budget remains; on the final attempt, or for a
// non-conflict error, the Lance error is returned as `OmniError::Lance`.
if attempt + 1 < INTERNAL_COMPACTION_RETRY_BUDGET && is_retryable_lance_conflict(&e)
{
continue;
}
return Err(OmniError::Lance(e.to_string()));
}
};

let options = CompactionOptions::default();
let plan = plan_compaction(&ds, &options)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
if plan.num_tasks() == 0 {
// No compaction work, but a config-strip still advanced HEAD — refresh
// the warm coordinator handles so they observe it deterministically
// (same cache-coherence step the successful-compaction path takes
// below; otherwise they stay pinned until the next version probe).
if cleared_config {
db.coordinator.write().await.refresh().await?;
}
// Reported with default (all-zero) metrics and `false` as the last argument,
// in contrast to the `true` passed after a real compaction below.
return Ok(TableOptimizeStats::compacted(
table_key.to_string(),
&CompactionMetrics::default(),
false,
));
Comment thread
cursor[bot] marked this conversation as resolved.
}

match compact_files(&mut ds, options, None).await {
Ok(metrics) => {
// Cache coherence: re-open the warm coordinator's internal-table
// handles at the compacted HEAD (they live in `db.coordinator`, not
// the data-table `runtime_cache`).
db.coordinator.write().await.refresh().await?;
return Ok(TableOptimizeStats::compacted(
table_key.to_string(),
&metrics,
true,
));
}
// Same retry rule as the config strip above: a retryable conflict loops
// again only if at least one more attempt is left in the budget.
Err(e)
if attempt + 1 < INTERNAL_COMPACTION_RETRY_BUDGET
&& is_retryable_lance_conflict(&e) =>
{
continue;
}
// Non-retryable error, or a retryable conflict on the last attempt.
Err(e) => return Err(OmniError::Lance(e.to_string())),
}
}
// NOTE(review): every path through the final loop iteration returns, so this is
// reached only when INTERNAL_COMPACTION_RETRY_BUDGET is 0. With a non-zero budget
// an exhausted retry surfaces as `OmniError::Lance` from the last attempt, never
// as this `manifest_conflict` — confirm that is the intended error for callers.
Err(OmniError::manifest_conflict(format!(
"internal-table compaction of {table_key} exhausted {INTERNAL_COMPACTION_RETRY_BUDGET} \
retries against concurrent writers"
)))
}

/// Run Lance `cleanup_old_versions` on every node + edge table on `main`,
/// using [`CleanupPolicyOptions`]. The latest manifest is always preserved
/// regardless (Lance invariant).
Expand Down Expand Up @@ -912,6 +1130,26 @@ mod tests {
use crate::failpoints::ScopedFailPoint;
use crate::loader::{LoadMode, load_jsonl};

/// Pins the retry classifier used by internal-table compaction.
///
/// A concurrent live writer preempting our `Rewrite` must be retryable (Lance
/// prescribes an application-level rerun, and `compact_files` does not retry it
/// itself). An error that is not a conflict must NOT be retryable, so a blind
/// retry can never mask it.
#[test]
fn retryable_lance_conflicts_are_classified() {
    let preempted = lance::Error::retryable_commit_conflict_source(
        1,
        Box::new(std::io::Error::other("preempted by concurrent write")),
    );
    let contended = lance::Error::too_much_write_contention("contended");
    let unrelated = lance::Error::invalid_input("not a conflict");

    assert!(is_retryable_lance_conflict(&preempted));
    assert!(is_retryable_lance_conflict(&contended));
    assert!(!is_retryable_lance_conflict(&unrelated));
}

fn node_table_uri(root: &str, type_name: &str) -> String {
let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
for &b in type_name.as_bytes() {
Expand Down
17 changes: 17 additions & 0 deletions crates/omnigraph/tests/helpers/cost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,23 @@ pub async fn measure_insert(db: &mut Omnigraph, tag: &str) -> IoCounts {
io
}

/// Actor-carrying variant of [`measure_insert`]: runs the `insert_person`
/// mutation on `main` with `Some(actor)` and returns the IO counts recorded
/// around it.
///
/// Because the write carries an actor it appends to and reads
/// `_graph_commit_actors.lance` — the authenticated (server/CLI) write path. The
/// commit-graph IO wrapper covers both `_graph_commits` and
/// `_graph_commit_actors`, so `IoCounts::commit_graph_reads` includes the
/// actor-table scan on this path.
///
/// # Panics
///
/// Panics if the mutation returns an error.
pub async fn measure_insert_as(db: &mut Omnigraph, tag: &str, actor: &str) -> IoCounts {
    let params = mixed_params(&[("$name", tag)], &[("$age", 30)]);
    let mutation = db.mutate_as(
        "main",
        MUTATION_QUERIES,
        "insert_person",
        &params,
        Some(actor),
    );
    let (outcome, counts) = measure(mutation).await;
    outcome.unwrap();
    counts
}

// ── Backend fixtures — one knob, store-agnostic body ──

/// Local tempdir graph (default; deterministic, every-PR).
Expand Down
16 changes: 16 additions & 0 deletions crates/omnigraph/tests/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,22 @@ pub async fn commit_many(db: &mut Omnigraph, n: usize) {
}
}

/// Actor-carrying variant of [`commit_many`]: issues `n` sequential
/// `insert_person` mutations on `main`, each with `Some(actor)`.
///
/// Every commit therefore grows `_graph_commit_actors.lance` as well — the
/// authenticated (server/CLI) write path. Inserted names are
/// `commit_many_as_0`, `commit_many_as_1`, … up to `n - 1`.
///
/// # Panics
///
/// Panics on the first mutation that returns an error.
pub async fn commit_many_as(db: &mut Omnigraph, n: usize, actor: &str) {
    for i in 0..n {
        let name = format!("commit_many_as_{i}");
        let params = mixed_params(&[("$name", &name)], &[("$age", 30)]);
        let committed = db
            .mutate_as(
                "main",
                MUTATION_QUERIES,
                "insert_person",
                &params,
                Some(actor),
            )
            .await;
        committed.unwrap();
    }
}

/// Snapshot the `main` branch: forwards to
/// `db.snapshot_of(ReadTarget::branch("main"))` and returns its result
/// (including any error) unchanged.
pub async fn snapshot_main(db: &Omnigraph) -> Result<Snapshot> {
db.snapshot_of(ReadTarget::branch("main")).await
}
Expand Down
Loading