Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ jobs:
.github/workflows/ci.yml|Cargo.toml|Cargo.lock|crates/*/Cargo.toml) run_rustfs_ci=true ;;
crates/omnigraph/src/storage.rs) run_rustfs_ci=true ;;
crates/omnigraph/src/db/manifest.rs|crates/omnigraph/src/db/manifest/*) run_rustfs_ci=true ;;
crates/omnigraph/tests/s3_storage.rs|crates/omnigraph/tests/helpers/*) run_rustfs_ci=true ;;
crates/omnigraph/tests/s3_storage.rs|crates/omnigraph/tests/write_cost_s3.rs|crates/omnigraph/tests/helpers/*) run_rustfs_ci=true ;;
crates/omnigraph/src/table_store.rs|crates/omnigraph/src/instrumentation.rs) run_rustfs_ci=true ;;
crates/omnigraph-cluster/src/store.rs|crates/omnigraph-cluster/src/serve.rs) run_rustfs_ci=true ;;
crates/omnigraph-cluster/tests/s3_cluster.rs) run_rustfs_ci=true ;;
crates/omnigraph-server/tests/s3.rs|crates/omnigraph-server/tests/support/*) run_rustfs_ci=true ;;
Expand Down Expand Up @@ -372,6 +373,9 @@ jobs:
- name: Run RustFS storage tests
run: cargo test --locked -p omnigraph-engine --test s3_storage -- --nocapture

- name: Run RustFS write-path cost gate (RFC-013 step 3a opener)
run: cargo test --locked -p omnigraph-engine --test write_cost_s3 -- --nocapture

- name: Run RustFS server smoke
# No name filter: every test in the s3 target is bucket-gated, and a
# filter matching nothing passes vacuously (which silently ran zero
Expand Down
5 changes: 4 additions & 1 deletion crates/omnigraph/src/db/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ mod layout;
mod metadata;
#[path = "manifest/migrations.rs"]
mod migrations;
// Entirely test-only since RFC-013 step 3a: with both reads (Fix 2) and writes
// bypassing the Lance namespace, nothing in production routes through it; the
// `LanceNamespace` impls are retained only to validate the contract in unit tests.
#[cfg(test)]
#[path = "manifest/namespace.rs"]
mod namespace;
#[path = "manifest/publisher.rs"]
Expand All @@ -28,7 +32,6 @@ use layout::{manifest_uri, open_manifest_dataset, table_uri_for_path, type_name_
pub(crate) use metadata::TableVersionMetadata;
#[cfg(test)]
use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state};
pub(crate) use namespace::open_table_head_for_write;
#[cfg(test)]
use namespace::{branch_manifest_namespace, staged_table_namespace};
use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
Expand Down
1 change: 1 addition & 0 deletions crates/omnigraph/src/db/manifest/layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub(super) fn table_uri_for_path(root_uri: &str, table_path: &str, branch: Optio
}
}

#[cfg(test)]
pub(super) fn namespace_internal_error(message: impl Into<String>) -> LanceNamespaceError {
LanceNamespaceError::namespace_source(Box::new(std::io::Error::other(message.into())))
}
5 changes: 4 additions & 1 deletion crates/omnigraph/src/db/manifest/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use std::collections::HashMap;

use lance::Dataset;
use lance_namespace::Error as LanceNamespaceError;
use lance_namespace::models::{CreateTableVersionRequest, TableVersion};
use lance_namespace::models::CreateTableVersionRequest;
#[cfg(test)]
use lance_namespace::models::TableVersion;
use serde::{Deserialize, Serialize};

use crate::error::{OmniError, Result};
Expand Down Expand Up @@ -142,6 +144,7 @@ impl TableVersionMetadata {
self.to_namespace_version_with_details(version, None, None)
}

#[cfg(test)]
pub(super) fn to_namespace_version_with_details(
&self,
version: u64,
Expand Down
65 changes: 13 additions & 52 deletions crates/omnigraph/src/db/manifest/namespace.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
// Both the read namespace (BranchManifestNamespace) and the write namespace
// (StagedTableNamespace) are now test-only contract validation. Reads open
// sub-tables directly by location+version (SubTableEntry::open, Fix 2), and
// writes open the table head directly by URI (TableStore::open_dataset_head,
// RFC-013 step 3a), so nothing in production routes through the Lance namespace
// anymore. These impls are retained only to validate the LanceNamespace
// contract in unit tests.
use std::sync::Arc;

use async_trait::async_trait;
Expand All @@ -16,30 +23,21 @@ use object_store::{

use crate::error::{OmniError, Result};

use super::layout::{namespace_internal_error, table_uri_for_path};
#[cfg(test)]
use super::layout::{open_manifest_dataset, table_id_to_key};
use super::metadata::TableVersionMetadata;
#[cfg(test)]
use super::metadata::{namespace_version_metadata, parse_namespace_version_request};
#[cfg(test)]
use super::layout::{
namespace_internal_error, open_manifest_dataset, table_id_to_key, table_uri_for_path,
};
use super::metadata::{
TableVersionMetadata, namespace_version_metadata, parse_namespace_version_request,
};
use super::publisher::GraphNamespacePublisher;
// The read namespace (BranchManifestNamespace) is test-only since Fix 2: reads
// open sub-tables directly by location+version (SubTableEntry::open), so nothing
// in production routes a read through the Lance namespace. The writes path uses
// StagedTableNamespace. These items are retained to validate the namespace
// contract in unit tests.
#[cfg(test)]
use super::state::{ManifestState, SubTableEntry, read_manifest_entries, read_manifest_state};

#[cfg(test)]
#[derive(Debug, Clone)]
struct BranchManifestNamespace {
root_uri: String,
branch: Option<String>,
}

#[cfg(test)]
impl BranchManifestNamespace {
fn new(root_uri: &str, branch: Option<&str>) -> Self {
Self {
Expand Down Expand Up @@ -146,7 +144,6 @@ impl StagedTableNamespace {
}
}

#[cfg(test)]
pub(crate) fn branch_manifest_namespace(
root_uri: &str,
branch: Option<&str>,
Expand All @@ -165,27 +162,6 @@ pub(crate) fn staged_table_namespace(
))
}

async fn load_table_from_namespace(
namespace: Arc<dyn LanceNamespace>,
table_key: &str,
branch: Option<&str>,
version: Option<u64>,
) -> Result<Dataset> {
let builder = DatasetBuilder::from_namespace(namespace, vec![table_key.to_string()])
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let builder = match (branch, version) {
(Some(branch), version) => builder.with_branch(branch, version),
(None, Some(version)) => builder.with_version(version),
(None, None) => builder,
};
builder
.load()
.await
.map_err(|e| OmniError::Lance(e.to_string()))
}

#[cfg(test)]
#[async_trait]
impl LanceNamespace for BranchManifestNamespace {
fn namespace_id(&self) -> String {
Expand Down Expand Up @@ -540,18 +516,3 @@ impl LanceNamespace for StagedTableNamespace {
Ok(response)
}
}

pub(crate) async fn open_table_head_for_write(
root_uri: &str,
table_key: &str,
table_path: &str,
branch: Option<&str>,
) -> Result<Dataset> {
load_table_from_namespace(
staged_table_namespace(root_uri, table_key, table_path, branch),
table_key,
branch,
None,
)
.await
}
24 changes: 18 additions & 6 deletions crates/omnigraph/src/table_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use lance_table::format::{Fragment, IndexMetadata, RowIdMeta};
use lance_table::rowids::{RowIdSequence, write_row_ids};
use std::sync::Arc;

use crate::db::manifest::{TableVersionMetadata, open_table_head_for_write};
use crate::db::manifest::TableVersionMetadata;
use crate::db::{Snapshot, SubTableEntry};
use crate::error::{OmniError, Result};
use crate::storage_layer::ForkOutcome;
Expand Down Expand Up @@ -160,9 +160,15 @@ impl TableStore {
dataset_uri: &str,
branch: Option<&str>,
) -> Result<Dataset> {
let ds = Dataset::open(dataset_uri)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
// Direct open by URI (O(1) latest-resolution). Routed through the tracked
// opener so a cost test counts it via the per-query `table_wrapper`
// (no-op in production — the task-local is unset, so this is exactly
// `Dataset::open(uri)`).
let ds = crate::instrumentation::open_dataset_tracked(
dataset_uri,
crate::instrumentation::table_wrapper(),
)
.await?;
match branch {
Some(branch) if branch != "main" => ds
.checkout_branch(branch)
Expand All @@ -178,8 +184,14 @@ impl TableStore {
dataset_uri: &str,
branch: Option<&str>,
) -> Result<Dataset> {
let table_path = self.table_path_from_dataset_uri(dataset_uri)?;
open_table_head_for_write(&self.root_uri, table_key, &table_path, branch).await
// RFC-013 step 3a: open writes via the direct opener (O(1)) instead of the
// lance-namespace builder, which re-resolved the table's version chain
// O(depth) per write. The namespace is a catalog/discovery layer, not a
// per-open hot-path component (RFC §2.4); the manifest already holds the
// location, and `ensure_expected_version` still validates head == pinned
// for strict ops. `table_key` retained for signature stability.
let _ = table_key;
self.open_dataset_head(dataset_uri, branch).await
}

pub async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
Expand Down
Loading