diff --git a/java/src/main/java/org/lance/index/scalar/InvertedIndexParams.java b/java/src/main/java/org/lance/index/scalar/InvertedIndexParams.java index ca0a7a46c70..d58cd71ece2 100755 --- a/java/src/main/java/org/lance/index/scalar/InvertedIndexParams.java +++ b/java/src/main/java/org/lance/index/scalar/InvertedIndexParams.java @@ -54,6 +54,7 @@ public static final class Builder { private Integer maxNgramLength; private Boolean prefixOnly; private Boolean skipMerge; + private Integer formatVersion; /** * Configure the base tokenizer. @@ -236,6 +237,23 @@ public Builder skipMerge(boolean skipMerge) { return this; } + /** + * Configure the on-disk FTS format version to write when creating a new index. + * + *

If unset, Lance chooses the current default format. + * + * @param formatVersion FTS format version, must be 1 or 2 + * @return this builder + * @throws IllegalArgumentException + */ + public Builder formatVersion(int formatVersion) { + if (formatVersion != 1 && formatVersion != 2) { + throw new IllegalArgumentException("formatVersion must be 1 or 2"); + } + this.formatVersion = formatVersion; + return this; + } + /** Build a {@link ScalarIndexParams} instance for an inverted index. */ public ScalarIndexParams build() { Map params = new HashMap<>(); @@ -283,6 +301,9 @@ public ScalarIndexParams build() { if (skipMerge != null) { params.put("skip_merge", skipMerge); } + if (formatVersion != null) { + params.put("format_version", formatVersion); + } String json = JsonUtils.toJson(params); return ScalarIndexParams.create(INDEX_TYPE, json); diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 45dc1b253d3..369b3648655 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3134,6 +3134,7 @@ def create_scalar_index( fragment_ids: Optional[List[int]] = None, index_uuid: Optional[str] = None, progress_callback: Optional[Callable[[IndexProgress], None]] = None, + format_version: Optional[Union[int, str]] = None, **kwargs, ): """Create a scalar index on a column. @@ -3239,6 +3240,11 @@ def create_scalar_index( progress_callback : callable, optional A callback that receives :class:`lance.progress.IndexProgress` events while the index is being built. + format_version: int or str, optional + This is for the ``INVERTED`` / ``FTS`` index. Explicit on-disk FTS + format version to write when creating a new index. Accepts ``1``, + ``2``, ``"v1"``, or ``"v2"``. If unset, Lance chooses the current + default format. with_position: bool, default False This is for the ``INVERTED`` index. If True, the index will store the @@ -3344,6 +3350,8 @@ def create_scalar_index( kwargs["index_uuid"] = index_uuid if progress_callback is not None: kwargs["progress_callback"] = progress_callback + if format_version is not None: + kwargs["format_version"] = format_version self._ds.create_index([column], index_type, name, replace, train, None, kwargs) diff --git a/python/python/tests/compat/compat_sequence.py b/python/python/tests/compat/compat_sequence.py index bec216d6a6f..48df46500ca 100644 --- a/python/python/tests/compat/compat_sequence.py +++ b/python/python/tests/compat/compat_sequence.py @@ -18,8 +18,9 @@ "ignore the index" mode to diff against, so its oracle reconstructs ground truth from a full scan: tokenize every live row, then require an FTS search for a spread of sampled terms to return exactly the rows that contain them. The FTS scenarios run under both -on-disk format versions (LANCE_FTS_FORMAT_VERSION 1 and 2), which take different merge -paths. +on-disk format versions (1 and 2), which take different merge paths. New Lance versions +pin this with the create-index `format_version` parameter; old Lance versions still use +`LANCE_FTS_FORMAT_VERSION`. The op vocabulary and bounds are deliberately small so the search is runnable; this is exhaustive over the maintenance-lifecycle grammar up to the configured lengths, not over @@ -63,11 +64,12 @@ def describe(kind, from_ref, to_ref, setup_ops, exercise_ops, fts_version=None): class IndexScenario: """A picklable, kind-parameterized scenario run across a version split.""" - def __init__(self, kind, path, setup_ops, exercise_ops): + def __init__(self, kind, path, setup_ops, exercise_ops, fts_version=None): self.kind = kind self.path = str(path) self.setup_ops = list(setup_ops) self.exercise_ops = list(exercise_ops) + self.fts_version = fts_version self.next_idx = 0 # --- in-venv helpers (only lance + pyarrow available) --- @@ -123,6 +125,8 @@ def _op_W(self): def _op_I(self): kwargs = {"with_position": True} if self.kind == "INVERTED" else {} + if self.kind == "INVERTED" and self.fts_version is not None: + kwargs["format_version"] = int(self.fts_version) self._open().create_scalar_index("key", self._index_type(), **kwargs) def _op_D(self): @@ -234,9 +238,10 @@ def search( """Search index-maintenance sequences up to `max_length` ops for one `kind`, across (from_ref -> to_ref). Runs only scenarios in this shard (i % num_shards == shard) so the space can be split across parallel workers. For INVERTED, `fts_version` ("1" or - "2") pins the on-disk FTS format (LANCE_FTS_FORMAT_VERSION) on both sides; both are - Fst token sets and exercise distinct merge paths. Returns failures; stops on the - first when `stop_on_first`.""" + "2") pins the on-disk FTS format on both sides. New Lance versions receive this + through the create-index parameter and old Lance versions receive it through + LANCE_FTS_FORMAT_VERSION. Both are Fst token sets and exercise distinct merge paths. + Returns failures; stops on the first when `stop_on_first`.""" from_venv = venv_factory.get_venv(from_ref) to_venv = venv_factory.get_venv(to_ref) env = {} @@ -256,7 +261,7 @@ def search( if key not in snapshots: snap = base / f"snap_{kind}_{len(snapshots)}" shutil.rmtree(snap, ignore_errors=True) - builder = IndexScenario(kind, snap, setup_tail, []) + builder = IndexScenario(kind, snap, setup_tail, [], fts_version) try: next_idx = from_venv.execute_method(builder, "setup", env) snapshots[key] = (snap, next_idx) @@ -277,7 +282,7 @@ def search( ex_path = base / f"ex_{kind}_{i}" shutil.rmtree(ex_path, ignore_errors=True) shutil.copytree(snap, ex_path) - scenario = IndexScenario(kind, ex_path, setup_tail, exercise) + scenario = IndexScenario(kind, ex_path, setup_tail, exercise, fts_version) scenario.next_idx = next_idx label = describe(kind, from_ref, to_ref, setup_tail, exercise, fts_version) try: diff --git a/python/python/tests/compat/test_scalar_indices.py b/python/python/tests/compat/test_scalar_indices.py index 35022df3b12..c3bf301eee0 100644 --- a/python/python/tests/compat/test_scalar_indices.py +++ b/python/python/tests/compat/test_scalar_indices.py @@ -320,7 +320,9 @@ def create(self): max_rows_per_file=100, data_storage_version=safe_data_storage_version(self.compat_version), ) - dataset.create_scalar_index("text", "INVERTED", with_position=True) + dataset.create_scalar_index( + "text", "INVERTED", with_position=True, format_version=1 + ) def check_read(self): """Verify FTS index can be queried.""" @@ -349,9 +351,7 @@ def check_write(self): def skip_downgrade(self, version: str) -> bool: return version.startswith("0.") - def current_env(self, method_name: str) -> dict[str, str]: - if method_name == "create": + def compat_env(self, version: str, method_name: str) -> dict[str, str]: + if method_name in {"create", "check_write"}: return {"LANCE_FTS_FORMAT_VERSION": "1"} - if method_name == "check_write": - return {"LANCE_FTS_FORMAT_VERSION": "2"} return {} diff --git a/python/python/tests/test_scalar_index.py b/python/python/tests/test_scalar_index.py index b6e882633f5..348d615d3da 100644 --- a/python/python/tests/test_scalar_index.py +++ b/python/python/tests/test_scalar_index.py @@ -4802,9 +4802,11 @@ def test_json_inverted_match_query(tmp_path): assert results.num_rows == 1 -@pytest.mark.parametrize("fts_format_version", ["1", "2"]) -def test_describe_indices(tmp_path, monkeypatch, fts_format_version): - monkeypatch.setenv("LANCE_FTS_FORMAT_VERSION", fts_format_version) +@pytest.mark.parametrize( + ("format_version", "expected_format_version"), + [(1, 1), (2, 2), ("v1", 1), ("v2", 2)], +) +def test_describe_indices(tmp_path, format_version, expected_format_version): data = pa.table( { "id": range(100), @@ -4820,7 +4822,7 @@ def test_describe_indices(tmp_path, monkeypatch, fts_format_version): } ) ds = lance.write_dataset(data, tmp_path) - ds.create_scalar_index("text", index_type="INVERTED") + ds.create_scalar_index("text", index_type="INVERTED", format_version=format_version) indices = ds.describe_indices() assert len(indices) == 1 @@ -4834,7 +4836,7 @@ def test_describe_indices(tmp_path, monkeypatch, fts_format_version): assert indices[0].segments[0].uuid is not None assert indices[0].segments[0].fragment_ids == {0} assert indices[0].segments[0].dataset_version_at_last_update == 1 - assert indices[0].segments[0].index_version == int(fts_format_version) + assert indices[0].segments[0].index_version == expected_format_version assert indices[0].segments[0].created_at is not None assert isinstance(indices[0].segments[0].created_at, datetime) assert indices[0].segments[0].size_bytes is not None @@ -4933,6 +4935,25 @@ def test_describe_indices(tmp_path, monkeypatch, fts_format_version): assert index.num_rows_indexed == 50 +def test_create_inverted_index_defaults_to_v2_and_ignores_env(tmp_path, monkeypatch): + monkeypatch.setenv("LANCE_FTS_FORMAT_VERSION", "1") + data = pa.table({"text": ["document about lance database"]}) + ds = lance.write_dataset(data, tmp_path) + + ds.create_scalar_index("text", index_type="INVERTED") + + indices = ds.describe_indices() + assert indices[0].segments[0].index_version == 2 + + +def test_create_inverted_index_rejects_invalid_format_version(tmp_path): + data = pa.table({"text": ["document about lance database"]}) + ds = lance.write_dataset(data, tmp_path) + + with pytest.raises(ValueError, match="unsupported FTS format version"): + ds.create_scalar_index("text", index_type="INVERTED", format_version="v3") + + def test_vector_filter_fts_search(tmp_path): # Create test data ids = list(range(1, 301)) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 31eaa96a654..f16ae198d72 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -76,6 +76,7 @@ use lance_index::{ FtsPrewarmOptions, IndexParams, IndexType, PrewarmOptions, optimize::OptimizeOptions, progress::{IndexBuildProgress, NoopIndexBuildProgress}, + scalar::inverted::InvertedListFormatVersion, scalar::{FullTextSearchQuery, InvertedIndexParams, ScalarIndexParams}, vector::{ ApproxMode, DEFAULT_QUERY_PARALLELISM, Query as VectorQuery, @@ -2298,6 +2299,23 @@ impl Dataset { if let Some(num_workers) = kwargs.get_item("num_workers")? { params = params.num_workers(num_workers.extract()?); } + if let Some(format_version) = kwargs.get_item("format_version")? + && !format_version.is_none() + { + let value = if let Ok(value) = format_version.cast::() { + value.to_string_lossy().to_string() + } else if let Ok(value) = format_version.extract::() { + value.to_string() + } else { + return Err(PyValueError::new_err( + "format_version must be 1, 2, 'v1', or 'v2'", + )); + }; + let format_version = value + .parse::() + .map_err(|err| PyValueError::new_err(err.to_string()))?; + params = params.format_version(format_version); + } } Box::new(params) } diff --git a/rust/lance-index/src/scalar/inverted.rs b/rust/lance-index/src/scalar/inverted.rs index d0bb0e40d3a..d4e3bfdd400 100644 --- a/rust/lance-index/src/scalar/inverted.rs +++ b/rust/lance-index/src/scalar/inverted.rs @@ -147,6 +147,7 @@ impl InvertedIndexPlugin { } }); + let format_version = params.resolved_format_version(); let details = pbold::InvertedIndexDetails::try_from(¶ms)?; let mut inverted_index = InvertedIndexBuilder::new_with_fragment_mask(params, fragment_mask) @@ -154,7 +155,7 @@ impl InvertedIndexPlugin { let files = inverted_index.update(data, index_store, None).await?; Ok(CreatedIndex { index_details: prost_types::Any::from_msg(&details).unwrap(), - index_version: current_fts_format_version().index_version(), + index_version: format_version.index_version(), files, }) } diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index 17cb18c5e96..332aae2e8da 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -246,6 +246,7 @@ impl InvertedIndexBuilder { fragment_mask: Option, deleted_fragments: RoaringBitmap, ) -> Self { + let format_version = params.resolved_format_version(); Self { params, partitions, @@ -253,8 +254,8 @@ impl InvertedIndexBuilder { src_store: store, token_set_format, fragment_mask, - format_version: current_fts_format_version(), - posting_tail_codec: current_fts_format_version().posting_tail_codec(), + format_version, + posting_tail_codec: format_version.posting_tail_codec(), progress: noop_progress(), deleted_fragments, } @@ -3217,7 +3218,8 @@ mod tests { token_set_format, None, RoaringBitmap::new(), - ); + ) + .with_format_version(InvertedListFormatVersion::V1); builder.write(dest_store.as_ref()).await?; let metadata_reader = dest_store.open_index_file(METADATA_FILE).await?; @@ -3281,6 +3283,14 @@ mod tests { .await?; let index = InvertedIndex::load(src_store, None, &LanceCache::no_cache()).await?; + let derived_params = index.derive_index_params()?; + let derived_params: InvertedIndexParams = + serde_json::from_str(derived_params.params.as_deref().unwrap())?; + assert_eq!( + derived_params.format_version, + Some(InvertedListFormatVersion::V1) + ); + let schema = Arc::new(Schema::new(vec![ Field::new("doc", DataType::Utf8, true), Field::new(ROW_ID, DataType::UInt64, false), diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index c23dc1c4e78..4dcfee03587 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -133,15 +133,21 @@ pub static FTS_SCHEMA: LazyLock = static ROW_ID_SCHEMA: LazyLock = LazyLock::new(|| Arc::new(Schema::new(vec![ROW_ID_FIELD.clone()]))); -fn resolve_fts_format_version( +pub fn resolve_fts_format_version( value: Option<&str>, ) -> std::result::Result { - value.unwrap_or("1").parse() + match value { + Some(value) => value.parse(), + None => Ok(default_fts_format_version()), + } +} + +pub fn default_fts_format_version() -> InvertedListFormatVersion { + InvertedListFormatVersion::V2 } pub fn current_fts_format_version() -> InvertedListFormatVersion { - resolve_fts_format_version(std::env::var("LANCE_FTS_FORMAT_VERSION").ok().as_deref()) - .expect("failed to parse LANCE_FTS_FORMAT_VERSION") + default_fts_format_version() } pub fn max_supported_fts_format_version() -> InvertedListFormatVersion { @@ -150,8 +156,8 @@ pub fn max_supported_fts_format_version() -> InvertedListFormatVersion { #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] pub enum InvertedListFormatVersion { - #[default] V1, + #[default] V2, } @@ -418,6 +424,7 @@ pub struct InvertedIndex { store: Arc, tokenizer: Box, token_set_format: TokenSetFormat, + format_version: InvertedListFormatVersion, pub(crate) partitions: Vec>, corpus_stats: Arc>, // Fragments which are contained in the index, but no longer in the dataset. @@ -430,6 +437,7 @@ impl Debug for InvertedIndex { f.debug_struct("InvertedIndex") .field("params", &self.params) .field("token_set_format", &self.token_set_format) + .field("format_version", &self.format_version) .field("partitions", &self.partitions) .field("deleted_fragments", &self.deleted_fragments) .finish() @@ -473,14 +481,7 @@ async fn resolve_deferred_candidates( impl InvertedIndex { fn format_version(&self) -> InvertedListFormatVersion { - self.partitions - .first() - .map(|partition| { - InvertedListFormatVersion::from_posting_tail_codec( - partition.inverted_list.posting_tail_codec(), - ) - }) - .unwrap_or_else(current_fts_format_version) + self.format_version } fn index_version(&self) -> u32 { @@ -995,6 +996,7 @@ impl InvertedIndex { store: store.clone(), tokenizer, token_set_format: TokenSetFormat::Arrow, + format_version: InvertedListFormatVersion::V1, partitions: vec![Arc::new(InvertedPartition { id: 0, store, @@ -1045,6 +1047,7 @@ impl InvertedIndex { .map(|name| TokenSetFormat::from_str(name)) .transpose()? .unwrap_or(TokenSetFormat::Arrow); + let format_version = parse_format_version_from_metadata(&reader.schema().metadata)?; // Load deleted_fragments if present (optional for backward compatibility) let deleted_fragments = if reader.num_rows() > 0 { @@ -1090,6 +1093,7 @@ impl InvertedIndex { store, tokenizer, token_set_format, + format_version, partitions, corpus_stats: Arc::new(OnceCell::new()), deleted_fragments, @@ -1324,8 +1328,9 @@ impl ScalarIndex for InvertedIndex { // Empty tokenizer metadata only appears in legacy simple-tokenizer indexes. params.base_tokenizer = "simple".to_string(); } + params = params.format_version(self.format_version()); - let params_json = serde_json::to_string(¶ms)?; + let params_json = params.to_training_json()?.to_string(); Ok(ScalarIndexParams { index_type: BuiltinIndexType::Inverted.as_str().to_string(), @@ -5964,10 +5969,10 @@ mod tests { } #[test] - fn test_resolve_fts_format_version_defaults_to_v1() { + fn test_resolve_fts_format_version_defaults_to_v2() { assert_eq!( resolve_fts_format_version(None).unwrap(), - InvertedListFormatVersion::V1 + InvertedListFormatVersion::V2 ); assert_eq!( resolve_fts_format_version(Some("2")).unwrap(), diff --git a/rust/lance-index/src/scalar/inverted/tokenizer.rs b/rust/lance-index/src/scalar/inverted/tokenizer.rs index 5a2a701dc73..c4abb1ce816 100644 --- a/rust/lance-index/src/scalar/inverted/tokenizer.rs +++ b/rust/lance-index/src/scalar/inverted/tokenizer.rs @@ -22,6 +22,9 @@ use crate::pbold; use crate::scalar::inverted::tokenizer::document_tokenizer::{ JsonTokenizer, LanceTokenizer, TextTokenizer, }; +use crate::scalar::inverted::{ + InvertedListFormatVersion, default_fts_format_version, resolve_fts_format_version, +}; pub use lance_tokenizer::Language; use lance_tokenizer::{ AsciiFoldingFilter, IcuTokenizer, LowerCaser, NgramTokenizer, RawTokenizer, RemoveLongFilter, @@ -119,6 +122,18 @@ pub struct InvertedIndexParams { /// The effective worker count is clamped to `[1, num_cpus - 2]`. #[serde(rename = "num_workers", skip_serializing, default)] pub(crate) num_workers: Option, + + /// On-disk FTS format version to write when creating a new index. + /// + /// This is a build-time only parameter and is not persisted with the index. + /// If unset, Lance writes the current default FTS format. + #[serde( + rename = "format_version", + skip_serializing, + default, + deserialize_with = "deserialize_format_version" + )] + pub(crate) format_version: Option, } impl TryFrom<&InvertedIndexParams> for pbold::InvertedIndexDetails { @@ -166,6 +181,7 @@ impl TryFrom<&pbold::InvertedIndexDetails> for InvertedIndexParams { prefix_only: details.prefix_only, memory_limit_mb: defaults.memory_limit_mb, num_workers: defaults.num_workers, + format_version: defaults.format_version, }) } } @@ -182,6 +198,37 @@ fn default_max_ngram_length() -> u32 { 3 } +fn deserialize_format_version<'de, D>( + deserializer: D, +) -> std::result::Result, D::Error> +where + D: serde::Deserializer<'de>, +{ + let value = Option::::deserialize(deserializer)?; + let Some(value) = value else { + return Ok(None); + }; + match value { + serde_json::Value::Null => Ok(None), + serde_json::Value::String(value) => resolve_fts_format_version(Some(&value)) + .map(Some) + .map_err(serde::de::Error::custom), + serde_json::Value::Number(value) => { + let Some(value) = value.as_u64() else { + return Err(serde::de::Error::custom( + "FTS format_version must be 1 or 2", + )); + }; + resolve_fts_format_version(Some(&value.to_string())) + .map(Some) + .map_err(serde::de::Error::custom) + } + other => Err(serde::de::Error::custom(format!( + "FTS format_version must be 1 or 2, got {other}" + ))), + } +} + impl Default for InvertedIndexParams { fn default() -> Self { Self::new("simple".to_owned(), Language::English) @@ -220,6 +267,7 @@ impl InvertedIndexParams { prefix_only: false, memory_limit_mb: None, num_workers: None, + format_version: None, } } @@ -322,6 +370,21 @@ impl InvertedIndexParams { self } + /// Set the on-disk FTS format version to use when creating a new index. + /// + /// If unset, Lance writes the current default FTS format. Existing indexes + /// keep their own on-disk format during update and optimize operations. + pub fn format_version(mut self, format_version: InvertedListFormatVersion) -> Self { + self.format_version = Some(format_version); + self + } + + /// Resolve the requested FTS format version, falling back to Lance's default. + pub fn resolved_format_version(&self) -> InvertedListFormatVersion { + self.format_version + .unwrap_or_else(default_fts_format_version) + } + /// Serialize params for the build/training path, including build-only fields. pub fn to_training_json(&self) -> serde_json::Result { let mut value = serde_json::to_value(self)?; @@ -340,6 +403,12 @@ impl InvertedIndexParams { serde_json::Value::from(num_workers), ); } + if let Some(format_version) = self.format_version { + object.insert( + "format_version".to_string(), + serde_json::Value::from(format_version.index_version()), + ); + } Ok(value) } @@ -443,17 +512,19 @@ pub fn language_model_home() -> Option { #[cfg(test)] mod tests { - use super::InvertedIndexParams; + use super::{InvertedIndexParams, InvertedListFormatVersion}; use lance_tokenizer::TokenStream; #[test] fn test_build_only_fields_are_not_serialized() { let params = InvertedIndexParams::default() .memory_limit_mb(4096) - .num_workers(7); + .num_workers(7) + .format_version(InvertedListFormatVersion::V1); let json = serde_json::to_value(¶ms).unwrap(); assert!(json.get("memory_limit").is_none()); assert!(json.get("num_workers").is_none()); + assert!(json.get("format_version").is_none()); } #[test] @@ -475,23 +546,38 @@ mod tests { let obj = json.as_object_mut().unwrap(); obj.insert("memory_limit".to_string(), serde_json::Value::from(4096)); obj.insert("num_workers".to_string(), serde_json::Value::from(3)); + obj.insert("format_version".to_string(), serde_json::Value::from("v1")); let params: InvertedIndexParams = serde_json::from_value(json).unwrap(); assert_eq!(params.memory_limit_mb, Some(4096)); assert_eq!(params.num_workers, Some(3)); + assert_eq!(params.format_version, Some(InvertedListFormatVersion::V1)); } #[test] fn test_training_json_serializes_build_only_fields() { let params = InvertedIndexParams::default() .memory_limit_mb(4096) - .num_workers(3); + .num_workers(3) + .format_version(InvertedListFormatVersion::V1); let json = params.to_training_json().unwrap(); assert_eq!( json.get("memory_limit"), Some(&serde_json::Value::from(4096)) ); assert_eq!(json.get("num_workers"), Some(&serde_json::Value::from(3))); + assert_eq!( + json.get("format_version"), + Some(&serde_json::Value::from(1)) + ); + } + + #[test] + fn test_default_format_version_resolves_to_v2() { + assert_eq!( + InvertedIndexParams::default().resolved_format_version(), + InvertedListFormatVersion::V2 + ); } #[test] diff --git a/rust/lance/src/dataset/mem_wal/index.rs b/rust/lance/src/dataset/mem_wal/index.rs index 208971f7be6..e5a38275292 100644 --- a/rust/lance/src/dataset/mem_wal/index.rs +++ b/rust/lance/src/dataset/mem_wal/index.rs @@ -32,6 +32,7 @@ use lance_core::datatypes::Schema as LanceSchema; use lance_core::{Error, Result}; use lance_index::pbold; use lance_index::scalar::InvertedIndexParams; +use lance_index::scalar::inverted::InvertedListFormatVersion; use lance_index::vector::hnsw::builder::HnswBuildParams; use lance_linalg::distance::DistanceType; use lance_table::format::IndexMetadata; @@ -146,6 +147,7 @@ impl MemIndexConfig { } else { InvertedIndexParams::default() }; + let params = params.format_version(Self::fts_format_version_from_metadata(index_meta)?); Ok(Self::Fts(FtsIndexConfig::with_params( index_meta.name.clone(), @@ -195,6 +197,21 @@ impl MemIndexConfig { } } + fn fts_format_version_from_metadata( + index_meta: &IndexMetadata, + ) -> Result { + match index_meta.index_version { + // Legacy Arrow FTS indexes did not use the v1/v2 metadata values, but + // the maintained-index path can only write the modern format. + 0 | 1 => Ok(InvertedListFormatVersion::V1), + 2 => Ok(InvertedListFormatVersion::V2), + version => Err(Error::invalid_input(format!( + "FTS index '{}' has unsupported index_version {}; expected 0, 1, or 2", + index_meta.name, version + ))), + } + } + /// Extract field ID and column name from index metadata. fn extract_field_info( index_meta: &IndexMetadata, @@ -869,6 +886,7 @@ mod tests { use arrow_schema::{DataType, Field, Schema as ArrowSchema}; use log::warn; use std::sync::Arc; + use uuid::Uuid; /// Check if an index type is supported and log warning if not. fn check_index_type_supported(index_type: &str) -> bool { @@ -911,6 +929,29 @@ mod tests { .unwrap() } + fn fts_index_metadata(index_version: i32) -> IndexMetadata { + let details = + pbold::InvertedIndexDetails::try_from(&InvertedIndexParams::default()).unwrap(); + let mut value = Vec::new(); + details.encode(&mut value).unwrap(); + + IndexMetadata { + uuid: Uuid::new_v4(), + fields: vec![2], + name: "desc_idx".to_string(), + dataset_version: 1, + fragment_bitmap: None, + index_details: Some(Arc::new(prost_types::Any { + type_url: "type.googleapis.com/lance.index.InvertedIndexDetails".to_string(), + value, + })), + index_version, + created_at: None, + base_id: None, + files: None, + } + } + /// Single-column `id` batch for primary-key lookup tests. fn id_batch(ids: &[i32]) -> RecordBatch { RecordBatch::try_new( @@ -1011,6 +1052,44 @@ mod tests { assert!(!check_index_type_supported("unknown")); } + #[test] + fn fts_from_metadata_preserves_format_version() { + let arrow_schema = create_test_schema(); + let schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap(); + + for (index_version, expected_format_version) in [ + (0, InvertedListFormatVersion::V1), + (1, InvertedListFormatVersion::V1), + (2, InvertedListFormatVersion::V2), + ] { + let config = + MemIndexConfig::fts_from_metadata(&fts_index_metadata(index_version), &schema) + .unwrap(); + + match config { + MemIndexConfig::Fts(config) => { + assert_eq!( + config.params.resolved_format_version(), + expected_format_version + ); + } + _ => unreachable!("fts metadata should create an FTS config"), + } + } + } + + #[test] + fn fts_from_metadata_rejects_unsupported_format_version() { + let arrow_schema = create_test_schema(); + let schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap(); + + let err = MemIndexConfig::fts_from_metadata(&fts_index_metadata(3), &schema).unwrap_err(); + assert!( + err.to_string().contains("unsupported index_version 3"), + "{err}" + ); + } + #[test] fn test_from_configs() { let configs = vec![ diff --git a/rust/lance/src/dataset/mem_wal/index/fts.rs b/rust/lance/src/dataset/mem_wal/index/fts.rs index 8c6c9d85de7..f82b6283636 100644 --- a/rust/lance/src/dataset/mem_wal/index/fts.rs +++ b/rust/lance/src/dataset/mem_wal/index/fts.rs @@ -1699,6 +1699,8 @@ impl FtsMemIndex { let st = self.state.load_full(); let with_position = self.params.has_positions(); + let format_version = self.params.resolved_format_version(); + let posting_tail_codec = format_version.posting_tail_codec(); let total_rows_u64 = total_rows as u64; // Step 1: collect (original_pos, num_tokens) for every doc across all @@ -1716,10 +1718,11 @@ impl FtsMemIndex { } } if all_docs.is_empty() { - return Ok(InnerBuilder::new( + return Ok(InnerBuilder::new_with_format_version( partition_id, with_position, Default::default(), + format_version, )); } @@ -1805,7 +1808,10 @@ impl FtsMemIndex { docs_for_term.sort_by_key(|(doc_id, _, _)| *doc_id); let token_id = tokens.add(token) as usize; debug_assert_eq!(token_id, posting_lists.len()); - posting_lists.push(PostingListBuilder::new(with_position)); + posting_lists.push(PostingListBuilder::new_with_posting_tail_codec( + with_position, + posting_tail_codec, + )); let plb = &mut posting_lists[token_id]; for (doc_id, freq, pos) in docs_for_term { let recorder = if with_position { @@ -1817,7 +1823,12 @@ impl FtsMemIndex { } } - let mut builder = InnerBuilder::new(partition_id, with_position, Default::default()); + let mut builder = InnerBuilder::new_with_format_version( + partition_id, + with_position, + Default::default(), + format_version, + ); builder.set_tokens(tokens); builder.set_docs(docs); builder.set_posting_lists(posting_lists); diff --git a/rust/lance/src/dataset/mem_wal/memtable/flush.rs b/rust/lance/src/dataset/mem_wal/memtable/flush.rs index ebcc06cab44..0d79a607e8f 100644 --- a/rust/lance/src/dataset/mem_wal/memtable/flush.rs +++ b/rust/lance/src/dataset/mem_wal/memtable/flush.rs @@ -641,7 +641,6 @@ impl MemTableFlusher { total_rows: usize, ) -> Result> { use lance_index::pbold; - use lance_index::scalar::inverted::current_fts_format_version; use lance_index::scalar::lance_format::LanceIndexStore; let fts_configs: Vec<_> = index_configs @@ -704,6 +703,7 @@ impl MemTableFlusher { })?; let fragment_ids: roaring::RoaringBitmap = dataset.fragment_bitmap.as_ref().clone(); + let format_version = fts_cfg.params.resolved_format_version(); let index_meta = IndexMetadata { uuid: index_uuid, @@ -712,7 +712,7 @@ impl MemTableFlusher { dataset_version: dataset.version().version, fragment_bitmap: Some(fragment_ids), index_details: Some(Arc::new(index_details)), - index_version: current_fts_format_version().index_version() as i32, + index_version: format_version.index_version() as i32, created_at: None, base_id: None, files: None, @@ -739,22 +739,41 @@ impl MemTableFlusher { use arrow_schema::{DataType, Field, Schema}; use std::sync::Arc; - use lance_index::scalar::inverted::TokenSetFormat; + use lance_index::scalar::inverted::{ + POSITIONS_CODEC_KEY, POSITIONS_CODEC_PACKED_DELTA_V1, POSITIONS_LAYOUT_KEY, + POSITIONS_LAYOUT_SHARED_STREAM_V2, POSTING_TAIL_CODEC_KEY, TokenSetFormat, + }; // Create metadata with params and partitions in schema metadata (this is what InvertedIndex expects) let params_json = serde_json::to_string(&config.params)?; let partitions_json = serde_json::to_string(&[partition_id])?; let token_set_format = TokenSetFormat::default().to_string(); + let format_version = config.params.resolved_format_version(); + let mut metadata = [ + ("params".to_string(), params_json), + ("partitions".to_string(), partitions_json), + ("token_set_format".to_string(), token_set_format), + ( + POSTING_TAIL_CODEC_KEY.to_string(), + format_version.posting_tail_codec().as_str().to_string(), + ), + ] + .into_iter() + .collect::>(); + if config.params.has_positions() && format_version.uses_shared_position_stream() { + metadata.insert( + POSITIONS_LAYOUT_KEY.to_string(), + POSITIONS_LAYOUT_SHARED_STREAM_V2.to_string(), + ); + metadata.insert( + POSITIONS_CODEC_KEY.to_string(), + POSITIONS_CODEC_PACKED_DELTA_V1.to_string(), + ); + } let schema = Arc::new( - Schema::new(vec![Field::new("_placeholder", DataType::Utf8, true)]).with_metadata( - [ - ("params".to_string(), params_json), - ("partitions".to_string(), partitions_json), - ("token_set_format".to_string(), token_set_format), - ] - .into(), - ), + Schema::new(vec![Field::new("_placeholder", DataType::Utf8, true)]) + .with_metadata(metadata), ); // Create a minimal batch (schema metadata is what matters) diff --git a/rust/lance/src/dataset/mem_wal/write.rs b/rust/lance/src/dataset/mem_wal/write.rs index 5faa65d8e7d..086133e77f5 100644 --- a/rust/lance/src/dataset/mem_wal/write.rs +++ b/rust/lance/src/dataset/mem_wal/write.rs @@ -4641,8 +4641,8 @@ mod shard_writer_tests { use arrow_schema::{DataType, Field, Schema as ArrowSchema}; use lance_arrow::FixedSizeListArrayExt; use lance_index::IndexType; - use lance_index::scalar::ScalarIndexParams; - use lance_index::scalar::inverted::InvertedIndexParams; + use lance_index::scalar::inverted::{InvertedIndexParams, InvertedListFormatVersion}; + use lance_index::scalar::{FullTextSearchQuery, ScalarIndexParams}; use lance_index::vector::ivf::IvfBuildParams; use lance_index::vector::pq::builder::PQBuildParams; use lance_linalg::distance::MetricType; @@ -4872,6 +4872,93 @@ mod shard_writer_tests { writer.close().await.unwrap(); } + #[tokio::test] + async fn test_mem_wal_maintained_fts_v1_flush_preserves_format() { + use tempfile::TempDir; + + let vector_dim = 32; + let schema = create_test_schema(vector_dim); + let temp_dir = TempDir::new().expect("Failed to create temp dir"); + let uri = format!("file://{}", temp_dir.path().display()); + + let initial = create_test_batch(&schema, 0, 16, vector_dim); + let batches = RecordBatchIterator::new([Ok(initial)], schema.clone()); + let mut dataset = Dataset::write(batches, &uri, Some(WriteParams::default())) + .await + .expect("Failed to create dataset"); + + let fts_params = + InvertedIndexParams::default().format_version(InvertedListFormatVersion::V1); + dataset + .create_index( + &["text"], + IndexType::Inverted, + Some("text_fts".to_string()), + &fts_params, + false, + ) + .await + .expect("Failed to create v1 FTS index"); + let base_indices = dataset.load_indices().await.unwrap(); + assert_eq!(base_indices.len(), 1); + assert_eq!(base_indices[0].index_version, 1); + + dataset + .initialize_mem_wal() + .maintained_indexes(["text_fts"]) + .execute() + .await + .expect("Failed to initialize MemWAL"); + + let shard_id = Uuid::new_v4(); + let config = ShardWriterConfig::new(shard_id) + .with_durable_write(true) + .with_sync_indexed_write(true); + let writer = dataset + .mem_wal_writer(shard_id, config) + .await + .expect("Failed to create MemWAL writer"); + writer + .put(vec![create_test_batch(&schema, 1_000, 3, vector_dim)]) + .await + .expect("Failed to write MemWAL batch"); + writer.close().await.expect("Failed to close writer"); + + let (store, base_path) = lance_io::object_store::ObjectStore::from_uri(&uri) + .await + .expect("Failed to open store"); + let manifest_store = + super::super::manifest::ShardManifestStore::new(store, &base_path, shard_id, 2); + let manifest = manifest_store + .read_latest() + .await + .expect("Failed to read manifest") + .expect("Manifest should exist"); + assert_eq!(manifest.flushed_generations.len(), 1); + + let flushed = &manifest.flushed_generations[0]; + let gen_uri = format!("{}/_mem_wal/{}/{}", uri, shard_id, flushed.path); + let flushed_dataset = Dataset::open(&gen_uri) + .await + .expect("Failed to open flushed generation"); + let flushed_indices = flushed_dataset.load_indices().await.unwrap(); + assert_eq!(flushed_indices.len(), 1); + assert_eq!(flushed_indices[0].name, "text_fts"); + assert_eq!( + flushed_indices[0].index_version, 1, + "maintained v1 FTS index must flush as v1" + ); + + let results = flushed_dataset + .scan() + .full_text_search(FullTextSearchQuery::new("Sample".to_owned())) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(results.num_rows(), 3); + } + #[tokio::test] async fn test_writer_hnsw_params_override() { use lance_index::vector::hnsw::builder::HnswBuildParams; diff --git a/rust/lance/src/dataset/tests/dataset_index.rs b/rust/lance/src/dataset/tests/dataset_index.rs index 267296c984b..45dc7743d1c 100644 --- a/rust/lance/src/dataset/tests/dataset_index.rs +++ b/rust/lance/src/dataset/tests/dataset_index.rs @@ -34,8 +34,10 @@ use lance_core::utils::tempfile::TempStrDir; use lance_datagen::{BatchCount, Dimension, RowCount, array, gen_batch}; use lance_file::reader::{FileReader, FileReaderOptions}; use lance_file::version::LanceFileVersion; +use lance_index::optimize::OptimizeOptions; use lance_index::scalar::FullTextSearchQuery; use lance_index::scalar::inverted::{ + InvertedListFormatVersion, query::{BooleanQuery, MatchQuery, Occur, Operator, PhraseQuery}, tokenizer::InvertedIndexParams, }; @@ -936,6 +938,68 @@ async fn test_fts_unindexed_data() { assert_eq!(results.num_rows(), 1); } +#[tokio::test] +async fn test_fts_v1_remains_queryable_after_append_optimize() { + let params = InvertedIndexParams::default().format_version(InvertedListFormatVersion::V1); + let text_col = StringArray::from(vec!["alpha original", "beta original"]); + let batch = RecordBatch::try_new( + arrow_schema::Schema::new(vec![Field::new( + "text", + text_col.data_type().to_owned(), + false, + )]) + .into(), + vec![Arc::new(text_col) as ArrayRef], + ) + .unwrap(); + let schema = batch.schema(); + let batches = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema); + let mut dataset = Dataset::write(batches, "memory://test.lance", None) + .await + .unwrap(); + dataset + .create_index(&["text"], IndexType::Inverted, None, ¶ms, true) + .await + .unwrap(); + assert_eq!(dataset.load_indices().await.unwrap()[0].index_version, 1); + + let appended = StringArray::from(vec!["alpha appended"]); + let batch = RecordBatch::try_new( + arrow_schema::Schema::new(vec![Field::new( + "text", + appended.data_type().to_owned(), + false, + )]) + .into(), + vec![Arc::new(appended) as ArrayRef], + ) + .unwrap(); + let schema = batch.schema(); + let batches = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema); + dataset.append(batches, None).await.unwrap(); + dataset + .optimize_indices(&OptimizeOptions::append()) + .await + .unwrap(); + + let results = dataset + .scan() + .full_text_search(FullTextSearchQuery::new("alpha".to_owned())) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(results.num_rows(), 2); + assert!( + dataset + .load_indices() + .await + .unwrap() + .iter() + .all(|index| index.index_version == 1) + ); +} + #[tokio::test] async fn test_fts_unindexed_data_with_stop_words() { // When indexed data has avg_doc_length < 1.0 (e.g. single-word stop words