def _generate_passage_id(self, text: str) -> str:
    """Return the ID for a passage, following ``self.passage_id_scheme``.

    Args:
        text: The passage text. It is only consulted by the content-hash
            scheme.

    Returns:
        For the content-hash scheme, the first 16 hex characters of the
        SHA-256 digest of the UTF-8 encoded text, so equal text always
        yields an equal ID. For any other scheme value, the current length
        of ``self.chunks`` rendered as a string (the sequential scheme).
    """
    if self.passage_id_scheme != PASSAGE_ID_SCHEME_CONTENT_HASH:
        # Sequential path: the ID is the number of chunks collected so far.
        chunk_count = len(self.chunks)
        return str(chunk_count)

    # hashlib is only needed on this path, so it is imported lazily.
    from hashlib import sha256

    digest = sha256(text.encode("utf-8")).hexdigest()
    return digest[:16]
if self._warmup: self.warmup() diff --git a/packages/leann-core/src/leann/cli.py b/packages/leann-core/src/leann/cli.py index 15a8a187..7597b04e 100644 --- a/packages/leann-core/src/leann/cli.py +++ b/packages/leann-core/src/leann/cli.py @@ -341,6 +341,16 @@ def create_parser(self) -> argparse.ArgumentParser: default=True, help="Fall back to traditional chunking if AST chunking fails (default: True)", ) + build_parser.add_argument( + "--id-scheme", + choices=["sequential", "content-hash"], + default="sequential", + help=( + "How passage IDs are assigned. 'sequential' (default) keys by insertion " + "order; 'content-hash' uses sha256(text)[:16], stable across file moves " + "and reorderings. See #329." + ), + ) # Watch command watch_parser = subparsers.add_parser( @@ -365,6 +375,26 @@ def create_parser(self) -> argparse.ArgumentParser: help="Report changes without rebuilding (original watch behavior)", ) + migrate_parser = subparsers.add_parser( + "migrate-ids", + help=( + "Rewrite an existing index's passage IDs to content-hash form. " + "Irreversible; back up the index first." + ), + ) + migrate_parser.add_argument("index_name", help="Index name") + migrate_parser.add_argument( + "--dry-run", + action="store_true", + help="Show what would change without writing anything.", + ) + migrate_parser.add_argument( + "-y", + "--yes", + action="store_true", + help="Skip the interactive confirmation prompt.", + ) + # Search command search_parser = subparsers.add_parser("search", help="Search documents") search_parser.add_argument("index_name", help="Index name") @@ -1874,7 +1904,32 @@ def _chunks_for_paths(self, all_texts: list[dict], paths: set[str]) -> list[dict in paths ] + def _existing_index_id_scheme(self, index_path: str) -> Optional[str]: + """Return the passage_id_scheme recorded in an existing index's meta.json. + + Returns None when the index doesn't exist yet or the field isn't + recorded (older indexes pre-#330). 
def migrate_ids(self, args) -> None:
    """Rewrite an existing index's passage IDs to content-hash form.

    Only the files listed below are rewritten; no other index artifact is
    opened by this method:
      - .passages.jsonl : each record's "id" becomes sha256(text)[:16]
      - .passages.idx   : pickled offset map keyed by the new IDs
      - .ids.txt        : one new ID per line, in passage order (only when
                          the file exists)
      - .meta.json      : passage_id_scheme = "content-hash", version = "1.1"

    Identical-text passages hash to the same ID; the later occurrence wins
    in the offset map (dedup), while both records stay in passages.jsonl.

    Every replacement file is staged next to its target first, and nothing
    is swapped into place until all of them were written successfully.
    meta.json is swapped last, so the recorded scheme only flips once the
    data files are in place. Staged files are removed if anything fails.

    Args:
        args: Parsed CLI namespace providing ``index_name``, ``dry_run``
            and ``yes``.

    Returns:
        None. Progress and refusal reasons are printed. The method returns
        early without writing when the index cannot be resolved, a required
        file is missing, the index already uses content-hash IDs, the
        .ids.txt entry count does not match the passage count, ``dry_run``
        is set, or the user declines the confirmation prompt.

    Irreversible. Prompts for confirmation unless --yes is passed.
    """
    import hashlib
    import os

    def _content_id(text: str) -> str:
        # Same derivation the content-hash scheme documents: sha256(text)[:16].
        return hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]

    def _records(path):
        # Yield one parsed JSON object per non-blank line of a .jsonl file.
        with open(path, encoding="utf-8") as handle:
            for raw in handle:
                if raw.strip():
                    yield json.loads(raw)

    def _staged(path):
        # Sibling path used to stage a replacement for `path`.
        return path.with_name(path.name + ".migrate")

    index_name = args.index_name
    index_path = self._resolve_index_path(index_name, purpose="migrate")
    if not index_path:
        return
    meta_path = Path(index_path).with_suffix(".leann.meta.json")
    if not meta_path.exists():
        print(f"Cannot migrate '{index_name}': metadata missing at {meta_path}.")
        return
    with open(meta_path, encoding="utf-8") as f:
        meta = json.load(f)
    # An index without the field is treated as sequential.
    if meta.get("passage_id_scheme", "sequential") == "content-hash":
        print(f"Index '{index_name}' already uses content-hash IDs. Nothing to do.")
        return

    # Locate the sibling artifacts next to the index file.
    index_dir = Path(index_path).parent
    index_base = Path(index_path).name
    passages_file = index_dir / f"{index_base}.passages.jsonl"
    offset_file = index_dir / f"{index_base}.passages.idx"
    base_no_leann = (
        index_base[: -len(".leann")] if index_base.endswith(".leann") else index_base
    )
    idmap_file = index_dir / f"{base_no_leann}.ids.txt"

    for p in (passages_file, offset_file):
        if not p.exists():
            print(f"Cannot migrate: required artifact missing at {p}.")
            return

    # Plan the rewrite up front so the collision count can be shown before
    # committing to anything irreversible.
    new_ids = [_content_id(record["text"]) for record in _records(passages_file)]

    # .ids.txt is rewritten positionally (line i <- passage i). If the two
    # files disagree on the number of entries, that rewrite would silently
    # attach labels to the wrong passages, so refuse instead.
    has_idmap = idmap_file.exists()
    if has_idmap:
        with open(idmap_file, encoding="utf-8") as f:
            label_count = sum(1 for line in f if line.strip())
        if label_count != len(new_ids):
            print(
                f"Cannot migrate: {idmap_file} lists {label_count} IDs but "
                f"{passages_file} holds {len(new_ids)} passages."
            )
            return

    unique_new = len(set(new_ids))
    collisions = len(new_ids) - unique_new
    print(
        f"Migrating '{index_name}': {len(new_ids)} passages → {unique_new} unique "
        f"content-hash IDs ({collisions} collision(s) will dedup)."
    )

    if args.dry_run:
        print("(dry-run; not writing anything)")
        return
    if not args.yes:
        confirm = input(
            "Proceed? This rewrites passages.jsonl, .idx, .ids.txt, .meta.json. [y/N] "
        )
        if confirm.strip().lower() not in ("y", "yes"):
            print("Aborted.")
            return

    # (staged file, final target) pairs, in the order they are swapped in.
    swaps = [
        (_staged(passages_file), passages_file),
        (_staged(offset_file), offset_file),
    ]
    if has_idmap:
        swaps.append((_staged(idmap_file), idmap_file))
    swaps.append((_staged(meta_path), meta_path))
    staged_for = {target: staged for staged, target in swaps}

    try:
        new_offsets: dict[str, int] = {}
        with open(staged_for[passages_file], "w", encoding="utf-8") as dst:
            for record, nid in zip(_records(passages_file), new_ids):
                record["id"] = nid
                # Offset of the record start, taken from the text-mode handle.
                # NOTE(review): presumably the reader seeks with these values
                # on a text-mode handle as well — confirm against the loader.
                offset = dst.tell()
                json.dump(record, dst, ensure_ascii=False)
                dst.write("\n")
                new_offsets[nid] = offset

        with open(staged_for[offset_file], "wb") as f:
            pickle.dump(new_offsets, f)

        if has_idmap:
            with open(staged_for[idmap_file], "w", encoding="utf-8") as f:
                f.writelines(f"{nid}\n" for nid in new_ids)

        meta["passage_id_scheme"] = "content-hash"
        meta["version"] = "1.1"
        with open(staged_for[meta_path], "w", encoding="utf-8") as f:
            json.dump(meta, f, indent=2)

        # Everything is staged; swap the files in. os.replace overwrites an
        # existing destination as a single rename instead of copy + delete.
        for staged, target in swaps:
            os.replace(staged, target)
    finally:
        # After a successful swap none of these exist any more; after a
        # failure this removes whatever was staged so far.
        for staged, _target in swaps:
            staged.unlink(missing_ok=True)

    print(
        f"✓ Migrated '{index_name}' to content-hash IDs. "
        f"{collisions} collisions were deduped."
    )