From d1a1928c54a0b93df28951c8c9d1578123370b28 Mon Sep 17 00:00:00 2001 From: SerPeter <43622448+serpeter@users.noreply.github.com> Date: Tue, 3 Mar 2026 20:22:55 +0100 Subject: [PATCH 1/6] refactor(indexing): remove Tier 1 consumer, simplify to two-tier pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tier 1 was a pure pass-through converting FileChanged → ASTDirty with zero value-add. Remove it to reduce latency, eliminate an extra Valkey stream hop, and simplify the architecture. Before: Watcher → file-changed → Tier1 → ast-dirty → Tier2 → Tier3 After: Watcher → file-changed → Tier2 → embed-dirty → Tier3 --- .gitattributes | 0 src/code_atlas/events.py | 13 +-- src/code_atlas/indexing/__init__.py | 2 - src/code_atlas/indexing/consumers.py | 64 +--------- src/code_atlas/indexing/daemon.py | 13 +-- src/code_atlas/indexing/orchestrator.py | 117 +++++-------------- tests/integration/indexing/test_consumers.py | 42 ------- 7 files changed, 43 insertions(+), 208 deletions(-) create mode 100644 .gitattributes diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..e69de29 diff --git a/src/code_atlas/events.py b/src/code_atlas/events.py index d856afe..e3731bb 100644 --- a/src/code_atlas/events.py +++ b/src/code_atlas/events.py @@ -44,15 +44,6 @@ class EntityRef: file_path: str -@dataclass(frozen=True) -class ASTDirty: - """A single file needs AST re-parsing (published by Tier 1, consumed by Tier 2).""" - - path: str - project_name: str = "" # monorepo sub-project (forwarded from FileChanged) - project_root: str = "" # absolute path to project root (forwarded from FileChanged) - - @dataclass(frozen=True) class EmbedDirty: """A single entity needs re-embedding (published by Tier 2, consumed by Tier 3).""" @@ -62,7 +53,7 @@ class EmbedDirty: # Type alias for any pipeline event -Event = FileChanged | ASTDirty | EmbedDirty +Event = FileChanged | EmbedDirty class Significance(StrEnum): @@ -83,14 +74,12 @@ class Topic(StrEnum): """Redis Stream keys for the pipeline.""" FILE_CHANGED = "file-changed" - AST_DIRTY = "ast-dirty" EMBED_DIRTY = "embed-dirty" # Map topic → event class for deserialization _TOPIC_EVENT_MAP: dict[Topic, type[Event]] = { Topic.FILE_CHANGED: FileChanged, - Topic.AST_DIRTY: ASTDirty, Topic.EMBED_DIRTY: EmbedDirty, } diff --git a/src/code_atlas/indexing/__init__.py b/src/code_atlas/indexing/__init__.py index 7da88c7..1b7764a 100644 --- a/src/code_atlas/indexing/__init__.py +++ b/src/code_atlas/indexing/__init__.py @@ -4,7 +4,6 @@ from code_atlas.indexing.consumers import ( BatchPolicy, - Tier1GraphConsumer, Tier2ASTConsumer, Tier3EmbedConsumer, TierConsumer, @@ -34,7 +33,6 @@ "IndexResult", "StalenessChecker", "StalenessInfo", - "Tier1GraphConsumer", "Tier2ASTConsumer", "Tier3EmbedConsumer", "TierConsumer", diff --git a/src/code_atlas/indexing/consumers.py b/src/code_atlas/indexing/consumers.py index 8be2407..b0da9c5 100644 --- a/src/code_atlas/indexing/consumers.py +++ b/src/code_atlas/indexing/consumers.py @@ -1,8 +1,6 @@ -"""Tiered consumer pipeline for event-driven indexing. +"""Two-tier consumer pipeline for event-driven indexing. -Three tiers form a linear pipeline, each pulling at its own pace: - - FileChanged → Tier 1 (graph metadata) → ASTDirty → Tier 2 (AST parse) + FileChanged → Tier 2 (hash gate + AST parse + diff) → significance gate → EmbedDirty → Tier 3 (embeddings) Each tier uses batch-pull with configurable time/count policy and @@ -21,7 +19,6 @@ from loguru import logger from code_atlas.events import ( - ASTDirty, EmbedDirty, EntityRef, Event, @@ -105,7 +102,7 @@ def _matches_project(self, event: Event) -> bool: if self._project_filter is None: return True pn = "" - if isinstance(event, (FileChanged, ASTDirty)): + if isinstance(event, FileChanged): pn = event.project_name elif isinstance(event, EmbedDirty): # EmbedDirty doesn't carry project_name directly — always accept @@ -260,55 +257,6 @@ async def run(self) -> None: # noqa: PLR0912, PLR0915 logger.debug("{} stopped", self.consumer_name) -# --------------------------------------------------------------------------- -# Tier 1: Graph metadata (cheap, fast) -# --------------------------------------------------------------------------- - - -class Tier1GraphConsumer(TierConsumer): - """Tier 1: Update file-level graph metadata, always publish ASTDirty downstream.""" - - def __init__( - self, bus: EventBus, graph: GraphClient, settings: AtlasSettings, *, project_filter: set[str] | None = None - ) -> None: - super().__init__( - bus=bus, - input_topic=Topic.FILE_CHANGED, - group="tier1-graph", - consumer_name="tier1-graph-0", - policy=BatchPolicy(time_window_s=0.5, max_batch_size=50), - project_filter=project_filter, - ) - self.graph = graph - self.settings = settings - - def dedup_key(self, event: Event) -> str: - if isinstance(event, FileChanged): - return event.path - return super().dedup_key(event) - - async def process_batch(self, events: list[Event], batch_id: str) -> None: - with _tracer.start_as_current_span("tier1.process_batch", attributes={"batch_id": batch_id}): - # Group files by (project_name, project_root) — monorepo batches can mix sub-projects - groups: dict[tuple[str, str], list[str]] = {} - for e in events: - if isinstance(e, FileChanged): - key = (e.project_name, e.project_root) - groups.setdefault(key, []).append(e.path) - - total = sum(len(p) for p in groups.values()) - logger.debug("Tier1 batch {}: {} file(s) in {} group(s)", batch_id, total, len(groups)) - - # TODO: Update Memgraph file nodes (timestamps, staleness flags) - - # Publish one ASTDirty per file — Tier 2 decides significance - for (project_name, project_root), paths in groups.items(): - await self.bus.publish_many( - Topic.AST_DIRTY, - [ASTDirty(path=p, project_name=project_name, project_root=project_root) for p in paths], - ) - - # --------------------------------------------------------------------------- # Tier 2: AST parse + graph write (medium cost) # --------------------------------------------------------------------------- @@ -380,7 +328,7 @@ def __init__( ) -> None: super().__init__( bus=bus, - input_topic=Topic.AST_DIRTY, + input_topic=Topic.FILE_CHANGED, group="tier2-ast", consumer_name="tier2-ast-0", policy=policy or BatchPolicy(time_window_s=3.0, max_batch_size=30), @@ -406,7 +354,7 @@ def __init__( self._pending_project_names: set[str] = set() def dedup_key(self, event: Event) -> str: - if isinstance(event, ASTDirty): + if isinstance(event, FileChanged): return event.path return super().dedup_key(event) @@ -493,7 +441,7 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: # no # Group paths by (project_name, project_root) — monorepo batches can mix sub-projects groups: dict[tuple[str, str], list[str]] = {} for e in events: - if isinstance(e, ASTDirty): + if isinstance(e, FileChanged): key = (e.project_name, e.project_root) groups.setdefault(key, []).append(e.path) diff --git a/src/code_atlas/indexing/daemon.py b/src/code_atlas/indexing/daemon.py index 46cc28a..800d623 100644 --- a/src/code_atlas/indexing/daemon.py +++ b/src/code_atlas/indexing/daemon.py @@ -1,7 +1,7 @@ """Daemon manager — reusable watcher + pipeline lifecycle. Encapsulates the EventBus, FileWatcher, EmbedClient, EmbedCache, -and Tier 1/2/3 consumers. Used by both the CLI (``atlas watch``, +and Tier 2/3 consumers. Used by both the CLI (``atlas watch``, ``atlas daemon start``) and the MCP server for auto-indexing. """ @@ -14,7 +14,7 @@ from loguru import logger from code_atlas.events import EventBus -from code_atlas.indexing.consumers import Tier1GraphConsumer, Tier2ASTConsumer, Tier3EmbedConsumer +from code_atlas.indexing.consumers import Tier2ASTConsumer, Tier3EmbedConsumer from code_atlas.indexing.orchestrator import FileScope, detect_sub_projects from code_atlas.indexing.watcher import FileWatcher from code_atlas.search.embeddings import EmbedCache, EmbedClient @@ -31,9 +31,7 @@ class DaemonManager: _bus: EventBus | None = field(default=None, repr=False) _watcher: FileWatcher | None = field(default=None, repr=False) - _consumers: list[Tier1GraphConsumer | Tier2ASTConsumer | Tier3EmbedConsumer] = field( - default_factory=list, repr=False - ) + _consumers: list[Tier2ASTConsumer | Tier3EmbedConsumer] = field(default_factory=list, repr=False) _tasks: list[asyncio.Task[None]] = field(default_factory=list, repr=False) _cache: EmbedCache | None = field(default=None, repr=False) _embed: EmbedClient | None = field(default=None, repr=False) @@ -79,8 +77,7 @@ async def start( cache = EmbedCache(settings.redis, settings.embeddings) self._cache = cache - consumers: list[Tier1GraphConsumer | Tier2ASTConsumer | Tier3EmbedConsumer] = [ - Tier1GraphConsumer(bus, graph, settings), + consumers: list[Tier2ASTConsumer | Tier3EmbedConsumer] = [ Tier2ASTConsumer(bus, graph, settings), ] if embed is not None: @@ -146,7 +143,7 @@ async def _run_watcher(self) -> None: logger.exception("File watcher crashed") @staticmethod - async def _run_consumer(consumer: Tier1GraphConsumer | Tier2ASTConsumer | Tier3EmbedConsumer) -> None: + async def _run_consumer(consumer: Tier2ASTConsumer | Tier3EmbedConsumer) -> None: """Run a consumer, catching exceptions so one failure doesn't crash the rest.""" try: await consumer.run() diff --git a/src/code_atlas/indexing/orchestrator.py b/src/code_atlas/indexing/orchestrator.py index 6f1ba80..5577efe 100644 --- a/src/code_atlas/indexing/orchestrator.py +++ b/src/code_atlas/indexing/orchestrator.py @@ -17,8 +17,8 @@ import pathspec from loguru import logger -from code_atlas.events import ASTDirty, Event, EventBus, FileChanged, Topic -from code_atlas.indexing.consumers import BatchPolicy, Tier1GraphConsumer, Tier2ASTConsumer, Tier3EmbedConsumer +from code_atlas.events import Event, EventBus, FileChanged, Topic +from code_atlas.indexing.consumers import BatchPolicy, Tier2ASTConsumer, Tier3EmbedConsumer from code_atlas.parsing.ast import get_language_for_file from code_atlas.search.embeddings import EmbedCache, EmbedClient from code_atlas.settings import derive_project_name, resolve_git_dir @@ -848,29 +848,21 @@ async def _run_pipeline( project_root: Path | None = None, project_filter: set[str] | None = None, on_drain_progress: Callable[[int, int, int], None] | None = None, - skip_tier1: bool = False, + reindex_mode: bool = False, ) -> Tier2ASTConsumer: """Start inline tier consumers and wait for the pipeline to drain. Returns the Tier2 consumer so callers can read accumulated stats. - When *skip_tier1* is True, Tier 1 is not created and reindex-tuned - policies are used for faster polling. + When *reindex_mode* is True, reindex-tuned policies are used for + faster polling. """ - tier1: Tier1GraphConsumer | None = None - task1: asyncio.Task[None] | None = None - - if not skip_tier1: - await bus.ensure_group(Topic.FILE_CHANGED, "tier1-graph") - tier1 = Tier1GraphConsumer(bus, graph, settings, project_filter=project_filter) - task1 = asyncio.create_task(tier1.run()) - - await bus.ensure_group(Topic.AST_DIRTY, "tier2-ast") + await bus.ensure_group(Topic.FILE_CHANGED, "tier2-ast") # Reindex-tuned policies: flush immediately, short blocking reads - t2_policy = BatchPolicy(time_window_s=0, max_batch_size=30, block_ms=50) if skip_tier1 else None + t2_policy = BatchPolicy(time_window_s=0, max_batch_size=30, block_ms=50) if reindex_mode else None t3_policy = ( BatchPolicy(time_window_s=1.0, max_batch_size=embed.batch_size, block_ms=50) - if skip_tier1 and embed is not None + if reindex_mode and embed is not None else None ) @@ -899,22 +891,17 @@ async def _run_pipeline( drain_timeout_s, embed_enabled=embed is not None, on_drain_progress=on_drain_progress, - skip_tier1=skip_tier1, - settle_s=2.0 if skip_tier1 else 8.0, + settle_s=2.0, ) finally: - if tier1 is not None: - tier1.stop() tier2.stop() if tier3 is not None: tier3.stop() await asyncio.sleep(0.5) - if task1 is not None: - task1.cancel() task2.cancel() if tier3_task is not None: tier3_task.cancel() - for t in [task1, task2, tier3_task]: + for t in [task2, tier3_task]: if t is not None: with contextlib.suppress(asyncio.CancelledError): await t @@ -1095,25 +1082,10 @@ async def _publish_events( *, project_name: str = "", project_root: str = "", - skip_tier1: bool = False, ) -> int: - """Publish FileChanged (or ASTDirty when skip_tier1) events and return the count published.""" - if skip_tier1: - # Publish ASTDirty directly, bypassing Tier 1 - if mode == "delta": - events: list[Event] = [ - ASTDirty(path=fp, project_name=project_name, project_root=project_root) - for fp in decision.files_added | decision.files_modified | decision.files_deleted - ] - else: - events = [ASTDirty(path=fp, project_name=project_name, project_root=project_root) for fp in files] - if events: - await bus.publish_many(Topic.AST_DIRTY, events) - logger.debug("Published {} ASTDirty events (skip_tier1, {})", len(events), mode) - return len(events) - + """Publish FileChanged events and return the count published.""" if mode == "delta": - events = [] + events: list[Event] = [] events.extend( FileChanged(path=fp, change_type="created", project_name=project_name, project_root=project_root) for fp in decision.files_added @@ -1251,13 +1223,11 @@ async def _index_project_inner( # noqa: PLR0915 # 5. Sort files for optimal resolution order (deep modules before __init__.py) files = _sort_files_for_indexing(files) - # 6. Publish events (skip Tier 1 for full reindex — ASTDirty directly) - skip_tier1 = full_reindex or decision.mode == "full" - published = await _publish_events( - bus, decision.mode, files, decision, project_name=project_name, skip_tier1=skip_tier1 - ) + # 6. Publish events + published = await _publish_events(bus, decision.mode, files, decision, project_name=project_name) - # 6. Start inline consumers and wait for drain + # 7. Start inline consumers and wait for drain + reindex_mode = full_reindex or decision.mode == "full" t2stats = None if published > 0: tier2 = await _run_pipeline( @@ -1270,7 +1240,7 @@ async def _index_project_inner( # noqa: PLR0915 project_root=project_root, project_filter={project_name}, on_drain_progress=on_drain_progress, - skip_tier1=skip_tier1, + reindex_mode=reindex_mode, ) t2stats = tier2.stats @@ -1377,7 +1347,6 @@ async def _publish_project( files: list[str], *, full_reindex: bool = False, - skip_tier1: bool = False, ) -> _ProjectPublishResult: """Scan, decide delta/full, create packages, and publish events for one project. @@ -1400,10 +1369,7 @@ async def _publish_project( # Sort files for optimal resolution order files = _sort_files_for_indexing(files) - # Determine skip_tier1 from caller hint or decision mode - effective_skip = skip_tier1 or decision.mode == "full" - - # Publish events (ASTDirty directly when skipping Tier 1) + # Publish events published = await _publish_events( bus, decision.mode, @@ -1411,7 +1377,6 @@ async def _publish_project( decision, project_name=project_name, project_root=str(project_root), - skip_tier1=effective_skip, ) return _ProjectPublishResult( @@ -1493,27 +1458,20 @@ async def _index_monorepo_inner( # noqa: PLR0912, PLR0915 await cache.clear() # --- Start shared consumers (once for entire monorepo) --- - skip_tier1 = full_reindex # full reindex publishes ASTDirty directly - - tier1: Tier1GraphConsumer | None = None - if not skip_tier1: - await bus.ensure_group(Topic.FILE_CHANGED, "tier1-graph") - tier1 = Tier1GraphConsumer(bus, graph, settings) + reindex_mode = full_reindex - await bus.ensure_group(Topic.AST_DIRTY, "tier2-ast") + await bus.ensure_group(Topic.FILE_CHANGED, "tier2-ast") - t2_policy = BatchPolicy(time_window_s=0, max_batch_size=30, block_ms=50) if skip_tier1 else None + t2_policy = BatchPolicy(time_window_s=0, max_batch_size=30, block_ms=50) if reindex_mode else None t3_policy = ( BatchPolicy(time_window_s=1.0, max_batch_size=embed.batch_size, block_ms=50) - if skip_tier1 and embed is not None + if reindex_mode and embed is not None else None ) tier2 = Tier2ASTConsumer(bus, graph, settings, project_root=project_root, policy=t2_policy) consumer_tasks: list[asyncio.Task[None]] = [] - if tier1 is not None: - consumer_tasks.append(asyncio.create_task(tier1.run())) consumer_tasks.append(asyncio.create_task(tier2.run())) tier3: Tier3EmbedConsumer | None = None @@ -1542,7 +1500,6 @@ async def _index_monorepo_inner( # noqa: PLR0912, PLR0915 sub.root, sub_files, full_reindex=full_reindex, - skip_tier1=skip_tier1, ) publish_results.append(pr) @@ -1566,7 +1523,6 @@ async def _index_monorepo_inner( # noqa: PLR0912, PLR0915 project_root, root_only_files, full_reindex=full_reindex, - skip_tier1=skip_tier1, ) publish_results.append(root_pr) @@ -1579,14 +1535,11 @@ async def _index_monorepo_inner( # noqa: PLR0912, PLR0915 drain_timeout_s, embed_enabled=embed is not None, on_drain_progress=on_drain_progress, - skip_tier1=skip_tier1, - settle_s=2.0 if skip_tier1 else 8.0, + settle_s=2.0, ) finally: # --- Tear down consumers (once) --- - if tier1 is not None: - tier1.stop() tier2.stop() if tier3 is not None: tier3.stop() @@ -1672,27 +1625,21 @@ async def _wait_for_drain( *, embed_enabled: bool = True, on_drain_progress: Callable[[int, int, int], None] | None = None, - skip_tier1: bool = False, settle_s: float = 2.0, ) -> None: - """Poll stream groups until Tier 1, Tier 2, and (optionally) Tier 3 are drained. - - When *skip_tier1* is True, Tier 1 polling is skipped (events go directly - to Tier 2). + """Poll stream groups until Tier 2 and (optionally) Tier 3 are drained. If *on_drain_progress* is provided, it is called each poll cycle with ``(t1_remaining, t2_remaining, t3_remaining)`` so callers can display - pipeline progress to the user. + pipeline progress to the user. ``t1_remaining`` is always 0 (kept for + callback signature compatibility). """ deadline = time.monotonic() + timeout_s settled_since: float | None = None poll_interval = 0.5 while time.monotonic() < deadline: - queries: list[tuple[Topic, str]] = [] - if not skip_tier1: - queries.append((Topic.FILE_CHANGED, "tier1-graph")) - queries.append((Topic.AST_DIRTY, "tier2-ast")) + queries: list[tuple[Topic, str]] = [(Topic.FILE_CHANGED, "tier2-ast")] if embed_enabled: queries.append((Topic.EMBED_DIRTY, "tier3-embed")) @@ -1700,14 +1647,13 @@ async def _wait_for_drain( # Build a topic→remaining map so we don't need fragile index tracking remaining = {topic: info["pending"] + info["lag"] for (topic, _), info in zip(queries, infos, strict=True)} - t1_remaining = remaining.get(Topic.FILE_CHANGED, 0) - t2_remaining = remaining.get(Topic.AST_DIRTY, 0) + t2_remaining = remaining.get(Topic.FILE_CHANGED, 0) t3_remaining = remaining.get(Topic.EMBED_DIRTY, 0) if on_drain_progress is not None: - on_drain_progress(t1_remaining, t2_remaining, t3_remaining) + on_drain_progress(0, t2_remaining, t3_remaining) - if t1_remaining == 0 and t2_remaining == 0 and t3_remaining == 0: + if t2_remaining == 0 and t3_remaining == 0: if settled_since is None: settled_since = time.monotonic() elif time.monotonic() - settled_since >= settle_s: @@ -1722,9 +1668,8 @@ async def _wait_for_drain( await asyncio.sleep(poll_interval) logger.warning( - "Pipeline drain timed out after {:.0f}s — t1={} t2={} t3={} (raw={})", + "Pipeline drain timed out after {:.0f}s — t2={} t3={} (raw={})", timeout_s, - t1_remaining, t2_remaining, t3_remaining, infos, diff --git a/tests/integration/indexing/test_consumers.py b/tests/integration/indexing/test_consumers.py index 9302768..3de8c27 100644 --- a/tests/integration/indexing/test_consumers.py +++ b/tests/integration/indexing/test_consumers.py @@ -5,20 +5,14 @@ from __future__ import annotations -import asyncio - import pytest from code_atlas.events import ( - ASTDirty, EventBus, FileChanged, Topic, decode_event, ) -from code_atlas.indexing.consumers import Tier1GraphConsumer -from code_atlas.settings import AtlasSettings -from tests.conftest import NO_EMBED # All tests in this module require a live Redis/Valkey pytestmark = pytest.mark.integration @@ -102,39 +96,3 @@ async def test_dedup_within_batch(event_bus: EventBus) -> None: assert len(pending) == 1 assert pending["src/main.py"].timestamp == 1004.0 - - -@pytest.mark.usefixtures("_clean_streams") -async def test_tier1_publishes_downstream(event_bus: EventBus) -> None: - """Run Tier1 briefly, verify ASTDirty appears on the ast-dirty stream.""" - # Set up consumer group for downstream - await event_bus.ensure_group(Topic.AST_DIRTY, "test-downstream") - - # Publish a FileChanged event - await event_bus.publish( - Topic.FILE_CHANGED, - FileChanged(path="src/app.py", change_type="modified", timestamp=2000.0), - ) - - # Run Tier1 for a short period then stop - # Tier1 needs graph + settings but we're only testing event flow here; - # it doesn't call graph in its current implementation. - from unittest.mock import AsyncMock - - mock_graph = AsyncMock() - test_settings = AtlasSettings(embeddings=NO_EMBED) - tier1 = Tier1GraphConsumer(event_bus, mock_graph, test_settings) - - async def stop_after_delay() -> None: - await asyncio.sleep(1.5) - tier1.stop() - - await asyncio.gather(tier1.run(), stop_after_delay()) - - # Read from the downstream ast-dirty stream - messages = await event_bus.read_batch(Topic.AST_DIRTY, "test-downstream", "test-ds-0", count=10, block_ms=500) - assert len(messages) >= 1 - - event = decode_event(Topic.AST_DIRTY, messages[0][1]) - assert isinstance(event, ASTDirty) - assert event.path == "src/app.py" From 39b3b00c94e81a09f2fe521222018f0a0fe9fd46 Mon Sep 17 00:00:00 2001 From: SerPeter <43622448+serpeter@users.noreply.github.com> Date: Tue, 3 Mar 2026 20:40:58 +0100 Subject: [PATCH 2/6] feat(indexing): add file hash gate to skip unchanged files Compute SHA-256 of file contents before parsing and compare against stored hashes in Memgraph. Files with matching hashes are skipped entirely, avoiding unnecessary AST parsing and graph writes. - strip_whitespace mode normalizes formatting before hashing so formatter-only changes (e.g. ruff format) are ignored - Hash gate is bypassed for deleted files and full reindexes (where stored hashes are empty) - Pre-read file bytes are passed to the parser to avoid double I/O --- .gitattributes | 84 +++++++++++++++++++++ src/code_atlas/graph/client.py | 37 +++++++++ src/code_atlas/indexing/consumers.py | 109 +++++++++++++++++++++++---- src/code_atlas/settings.py | 4 + 4 files changed, 220 insertions(+), 14 deletions(-) diff --git a/.gitattributes b/.gitattributes index e69de29..066588d 100644 --- a/.gitattributes +++ b/.gitattributes @@ -0,0 +1,84 @@ +# Default +# ================== +* text=auto eol=lf + +# Python Source files +# ================= +*.pxd text diff=python +*.py text diff=python +*.py3 text diff=python +*.pyw text diff=python +*.pyx text diff=python +*.pyz text diff=python +*.pyi text diff=python + +# Python Binary files +# ================= +*.db binary +*.p binary +*.pkl binary +*.pickle binary +*.pyc binary export-ignore +*.pyo binary export-ignore +*.pyd binary + +# Jupyter notebook +# ================= +*.ipynb text + +# ML models +# ================= +*.h5 filter=lfs diff=lfs merge=lfs -text +*.onnx filter=lfs diff=lfs merge=lfs -text +*.model filter=lfs diff=lfs merge=lfs -text +*.msgpack filter=lfs diff=lfs merge=lfs -text +*.pb filter=lfs diff=lfs merge=lfs -text +*.pt filter=lfs diff=lfs merge=lfs -text +*.pth filter=lfs diff=lfs merge=lfs -text +pytorch_model.bin filter=lfs diff=lfs merge=lfs -text +tokenizer.json filter=lfs diff=lfs merge=lfs -text +unigram.json filter=lfs diff=lfs merge=lfs -text + +# Data files +# ================= +*.csv filter=lfs diff=lfs merge=lfs -text +*.tsv filter=lfs diff=lfs merge=lfs -text +*.parquet filter=lfs diff=lfs merge=lfs + +# Presentation files +# ================= +*.pptx filter=lfs diff=lfs merge=lfs -text +*.word filter=lfs diff=lfs merge=lfs -text +*.xlsx filter=lfs diff=lfs merge=lfs -text +*.xls filter=lfs diff=lfs merge=lfs -text +*.pdf filter=lfs diff=lfs merge=lfs -text + +# Archives +# ================= +*.7z filter=lfs diff=lfs merge=lfs -text +*.br filter=lfs diff=lfs merge=lfs -text +*.gz filter=lfs diff=lfs merge=lfs -text +*.tar filter=lfs diff=lfs merge=lfs -text +*.tgz filter=lfs diff=lfs merge=lfs -text +*.tar.gz filter=lfs diff=lfs merge=lfs -text +*.zip filter=lfs diff=lfs merge=lfs -text +*.rar filter=lfs diff=lfs merge=lfs -text + +# Image files +# ================= +*.jpg filter=lfs diff=lfs merge=lfs -text +*.jpeg filter=lfs diff=lfs merge=lfs -text +*.png filter=lfs diff=lfs merge=lfs -text +*.gif filter=lfs diff=lfs merge=lfs -text +*.webp filter=lfs diff=lfs merge=lfs -text +*.bmp filter=lfs diff=lfs merge=lfs -text +*.svg filter=lfs diff=lfs merge=lfs -text +*.tiff filter=lfs diff=lfs merge=lfs -text + +# Other +# ================= +# Windows - keep CRLF +*.exe filter=lfs diff=lfs merge=lfs -text +*.bat text eol=crlf +*.cmd text eol=crlf +*.ps1 text eol=crlf diff --git a/src/code_atlas/graph/client.py b/src/code_atlas/graph/client.py index 13aa59b..a2e84be 100644 --- a/src/code_atlas/graph/client.py +++ b/src/code_atlas/graph/client.py @@ -655,6 +655,43 @@ async def merge_project_node(self, project_name: str, **metadata: Any) -> None: props, ) + async def get_batch_file_hashes( + self, + project_name: str, + file_paths: list[str], + ) -> dict[str, str | None]: + """Return ``{file_path: file_hash}`` for Module/Package nodes in one RTT. + + Returns ``None`` for files that have no stored hash. + """ + if not file_paths: + return {} + records = await self.execute( + f"UNWIND $fps AS fp " + f"MATCH (n {{project_name: $p, file_path: fp}}) " + f"WHERE n:{NodeLabel.MODULE} OR n:{NodeLabel.PACKAGE} " + "RETURN DISTINCT n.file_path AS fp, n.file_hash AS fh", + {"p": project_name, "fps": file_paths}, + ) + return {r["fp"]: r["fh"] for r in records} + + async def set_batch_file_hashes( + self, + project_name: str, + file_hashes: dict[str, str], + ) -> None: + """Write ``file_hash`` on Module/Package nodes for each file path.""" + if not file_hashes: + return + params = [{"fp": fp, "fh": fh} for fp, fh in file_hashes.items()] + await self.execute_write( + f"UNWIND $items AS item " + f"MATCH (n {{project_name: $p, file_path: item.fp}}) " + f"WHERE n:{NodeLabel.MODULE} OR n:{NodeLabel.PACKAGE} " + "SET n.file_hash = item.fh", + {"p": project_name, "items": params}, + ) + async def merge_package_node(self, project_name: str, qualified_name: str, name: str, file_path: str) -> None: """Create or update a Package node by uid.""" uid = f"{project_name}:{qualified_name}" diff --git a/src/code_atlas/indexing/consumers.py b/src/code_atlas/indexing/consumers.py index b0da9c5..2afe685 100644 --- a/src/code_atlas/indexing/consumers.py +++ b/src/code_atlas/indexing/consumers.py @@ -10,6 +10,8 @@ from __future__ import annotations import asyncio +import hashlib +import re import uuid from abc import ABC, abstractmethod from dataclasses import dataclass @@ -42,6 +44,24 @@ _tracer = get_tracer(__name__) +_COLLAPSE_BLANK_RE = re.compile(rb"\n{3,}") + + +def _compute_file_hash(source: bytes, *, strip_whitespace: bool = True) -> str: + """Compute a short SHA-256 hash of file contents. + + When *strip_whitespace* is True: strip leading/trailing whitespace per + line, collapse consecutive blank lines, then hash. This makes the gate + ignore formatting-only changes (e.g. ``ruff format``). + """ + if strip_whitespace: + lines = [line.strip() for line in source.splitlines()] + normalized = b"\n".join(lines) + normalized = _COLLAPSE_BLANK_RE.sub(b"\n\n", normalized) + return hashlib.sha256(normalized).hexdigest()[:16] + return hashlib.sha256(source).hexdigest()[:16] + + # --------------------------------------------------------------------------- # Batch policy # --------------------------------------------------------------------------- @@ -400,22 +420,26 @@ async def _parse_file( file_path: str, *, project_root: Path | None = None, + source: bytes | None = None, ) -> _ParsedFileData | None: """Parse and detect a single file without graph writes. Returns ``None`` for unreadable/unsupported files, ``_SENTINEL_DELETED`` for deleted files, or a ``_ParsedFileData`` with parsed results. + + If *source* is provided, it is used directly (avoids re-reading from disk + when the hash gate has already read the file). """ root = project_root if project_root is not None else self._project_root - full_path = root / file_path - if not full_path.is_file(): - return _SENTINEL_DELETED - - try: - source = full_path.read_bytes() - except OSError: - logger.warning("Tier2: cannot read {}", file_path) - return None + if source is None: + full_path = root / file_path + if not full_path.is_file(): + return _SENTINEL_DELETED + try: + source = full_path.read_bytes() + except OSError: + logger.warning("Tier2: cannot read {}", file_path) + return None parsed = parse_file(file_path, source, project_name, max_source_chars=self.settings.index.max_source_chars) if parsed is None: @@ -459,15 +483,66 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: # no for (event_project_name, event_project_root), unique_paths in groups.items(): project_name = event_project_name or derive_project_name(Path(self.settings.project_root)) effective_root = Path(event_project_root) if event_project_root else None + root = effective_root if effective_root is not None else self._project_root + + # 0. File hash gate — read files, compute hashes, skip unchanged + use_hash_gate = self.settings.index.file_hash_gate + strip_ws = self.settings.index.strip_whitespace + file_sources: dict[str, bytes] = {} # file_path → source bytes (pre-read) + deleted_files: list[str] = [] + + # Separate deleted files (always process) and read live files + live_paths: list[str] = [] + for fp in unique_paths: + full_path = root / fp + if not full_path.is_file(): + deleted_files.append(fp) + else: + try: + file_sources[fp] = full_path.read_bytes() + live_paths.append(fp) + except OSError: + logger.warning("Tier2: cannot read {}", fp) + + # Apply hash gate to live files + if use_hash_gate and live_paths: + new_hashes = { + fp: _compute_file_hash(file_sources[fp], strip_whitespace=strip_ws) for fp in live_paths + } + stored_hashes = await self.graph.get_batch_file_hashes(project_name, live_paths) + + gate_passed: list[str] = [] + for fp in live_paths: + stored = stored_hashes.get(fp) + if stored is not None and stored == new_hashes[fp]: + self.stats.files_skipped += 1 + else: + gate_passed.append(fp) + + hash_skipped = len(live_paths) - len(gate_passed) + if hash_skipped: + logger.debug( + "Tier2 batch {}: hash gate skipped {}/{} file(s)", + batch_id, + hash_skipped, + len(live_paths), + ) + live_paths = gate_passed + else: + new_hashes = {} # 1. Parse loop (async, per-file) — no graph writes parsed_files: dict[str, _ParsedFileData] = {} - deleted_files: list[str] = [] - for file_idx, file_path in enumerate(unique_paths, 1): + for file_idx, file_path in enumerate(live_paths, 1): if file_idx % 50 == 0: - logger.debug("Tier2 batch {}: parsed {}/{} files", batch_id, file_idx, len(unique_paths)) - pfd = await self._parse_file(project_name, file_path, project_root=effective_root) + logger.debug("Tier2 batch {}: parsed {}/{} files", batch_id, file_idx, len(live_paths)) + pfd = await self._parse_file( + project_name, + file_path, + project_root=effective_root, + source=file_sources.get(file_path), + ) if pfd is _SENTINEL_DELETED: deleted_files.append(file_path) elif pfd is not None: @@ -556,7 +631,13 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: # no text_hash = EmbedCache.hash_text(text) embed_candidates[entity.qualified_name] = (ref, text_hash) - # 6. Accumulate rels for deferred resolution + # 6. Write back file hashes for processed files + if new_hashes: + hashes_to_write = {fp: new_hashes[fp] for fp in parsed_files if fp in new_hashes} + if hashes_to_write: + await self.graph.set_batch_file_hashes(project_name, hashes_to_write) + + # 7. Accumulate rels for deferred resolution group_import_rels = [r for pfd in parsed_files.values() for r in pfd.import_rels] group_call_rels = [r for pfd in parsed_files.values() for r in pfd.call_rels] group_type_rels = [r for pfd in parsed_files.values() for r in pfd.type_rels] diff --git a/src/code_atlas/settings.py b/src/code_atlas/settings.py index aca7aab..58ed790 100644 --- a/src/code_atlas/settings.py +++ b/src/code_atlas/settings.py @@ -272,6 +272,10 @@ class IndexSettings(BaseSettings): description="Stale index behavior: 'warn' (annotate), 'lock' (refuse), 'ignore' (skip).", ) max_source_chars: int = Field(default=2000, description="Max characters for entity source text (0 to disable).") + file_hash_gate: bool = Field(default=True, description="Skip files whose content hash hasn't changed.") + strip_whitespace: bool = Field( + default=True, description="Normalize whitespace before hashing (ignores formatting-only changes)." + ) class ObservabilitySettings(BaseSettings): From 533ac73e01ce4285fb15c49513096da05637104f Mon Sep 17 00:00:00 2001 From: SerPeter <43622448+serpeter@users.noreply.github.com> Date: Tue, 3 Mar 2026 20:45:51 +0100 Subject: [PATCH 3/6] feat(indexing): add per-file cooldown for daemon mode --- src/code_atlas/indexing/consumers.py | 59 ++++++++++++++++++++++++++++ src/code_atlas/indexing/daemon.py | 2 +- src/code_atlas/settings.py | 1 + 3 files changed, 61 insertions(+), 1 deletion(-) diff --git a/src/code_atlas/indexing/consumers.py b/src/code_atlas/indexing/consumers.py index 2afe685..aeae0f0 100644 --- a/src/code_atlas/indexing/consumers.py +++ b/src/code_atlas/indexing/consumers.py @@ -10,6 +10,7 @@ from __future__ import annotations import asyncio +import contextlib import hashlib import re import uuid @@ -345,6 +346,7 @@ def __init__( project_root: Path | None = None, project_filter: set[str] | None = None, policy: BatchPolicy | None = None, + cooldown_s: float = 0.0, ) -> None: super().__init__( bus=bus, @@ -360,6 +362,12 @@ def __init__( self.stats = Tier2Stats() self._detectors = get_enabled_detectors(settings.detectors.enabled) + # Per-file cooldown state (daemon mode) + self._cooldown_s = cooldown_s + self._cooldowns: dict[str, float] = {} # file_path → expiry (monotonic) + self._deferred: dict[str, FileChanged] = {} # file_path → latest deferred event + self._deferred_drain_task: asyncio.Task[None] | None = None + # Deferred resolution state — accumulate rels across batches, flush periodically. # In reindex mode (time_window_s=0, block_ms=50) use larger intervals to skip # redundant resolution; daemon mode (default policy) resolves every batch. @@ -378,6 +386,32 @@ def dedup_key(self, event: Event) -> str: return event.path return super().dedup_key(event) + async def _pre_run(self) -> None: + if self._cooldown_s > 0: + self._deferred_drain_task = asyncio.create_task(self._drain_deferred_loop()) + + async def _post_run(self) -> None: + if self._deferred_drain_task is not None: + self._deferred_drain_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._deferred_drain_task + self._deferred_drain_task = None + + async def _drain_deferred_loop(self) -> None: + """Re-publish deferred events whose cooldown has expired. + + Runs as a background task so deferred events are eventually + processed even if no new events arrive to trigger ``process_batch``. + """ + while not self._stop: + await asyncio.sleep(2.0) + now = asyncio.get_event_loop().time() + expired = [(fp, ev) for fp, ev in self._deferred.items() if now >= self._cooldowns.get(fp, 0)] + for fp, _ in expired: + del self._deferred[fp] + if expired: + await self.bus.publish_many(Topic.FILE_CHANGED, [ev for _, ev in expired]) + async def run(self) -> None: try: await super().run() @@ -462,6 +496,25 @@ async def _parse_file( async def process_batch(self, events: list[Event], batch_id: str) -> None: # noqa: PLR0912, PLR0915 with _tracer.start_as_current_span("tier2.process_batch", attributes={"batch_id": batch_id}) as span: + # Per-file cooldown filter: defer events for recently-processed files + if self._cooldown_s > 0: + now = asyncio.get_event_loop().time() + # Clean expired cooldowns + self._cooldowns = {fp: exp for fp, exp in self._cooldowns.items() if exp > now} + processable: list[Event] = [] + deferred_count = 0 + for ev in events: + if isinstance(ev, FileChanged) and ev.path in self._cooldowns: + self._deferred[ev.path] = ev # latest overwrites + deferred_count += 1 + else: + processable.append(ev) + if deferred_count: + logger.debug("Tier2 batch {}: {} event(s) deferred by cooldown", batch_id, deferred_count) + events = processable + if not events: + return + # Group paths by (project_name, project_root) — monorepo batches can mix sub-projects groups: dict[tuple[str, str], list[str]] = {} for e in events: @@ -647,6 +700,12 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: # no self._pending_type_rels.extend(group_type_rels) self._pending_project_names.add(project_name) + # 8. Set per-file cooldown for processed files + if self._cooldown_s > 0: + expiry = asyncio.get_event_loop().time() + self._cooldown_s + for fp in list(parsed_files) + deleted_files: + self._cooldowns[fp] = expiry + self._batches_since_resolve += 1 now = asyncio.get_event_loop().time() if ( diff --git a/src/code_atlas/indexing/daemon.py b/src/code_atlas/indexing/daemon.py index 800d623..85b40c8 100644 --- a/src/code_atlas/indexing/daemon.py +++ b/src/code_atlas/indexing/daemon.py @@ -78,7 +78,7 @@ async def start( self._cache = cache consumers: list[Tier2ASTConsumer | Tier3EmbedConsumer] = [ - Tier2ASTConsumer(bus, graph, settings), + Tier2ASTConsumer(bus, graph, settings, cooldown_s=settings.watcher.cooldown_s), ] if embed is not None: consumers.append(Tier3EmbedConsumer(bus, graph, embed, cache=cache)) diff --git a/src/code_atlas/settings.py b/src/code_atlas/settings.py index 58ed790..4e9657f 100644 --- a/src/code_atlas/settings.py +++ b/src/code_atlas/settings.py @@ -293,6 +293,7 @@ class WatcherSettings(BaseSettings): debounce_s: float = Field(default=5.0, description="Debounce timer in seconds (resets per change).") max_wait_s: float = Field(default=30.0, description="Max-wait ceiling in seconds (per batch).") + cooldown_s: float = Field(default=10.0, description="Per-file cooldown after processing (seconds). 0 disables.") class McpSettings(BaseSettings): From b7c9db3af34474232b5429db965de60d21fc79cb Mon Sep 17 00:00:00 2001 From: SerPeter <43622448+serpeter@users.noreply.github.com> Date: Tue, 3 Mar 2026 21:34:11 +0100 Subject: [PATCH 4/6] test(indexing): add integration tests for two-tier pipeline, hash gate, and cooldown --- tests/integration/indexing/test_consumers.py | 274 ++++++++++++++++++- 1 file changed, 273 insertions(+), 1 deletion(-) diff --git a/tests/integration/indexing/test_consumers.py b/tests/integration/indexing/test_consumers.py index 3de8c27..3824559 100644 --- a/tests/integration/indexing/test_consumers.py +++ b/tests/integration/indexing/test_consumers.py @@ -5,6 +5,10 @@ from __future__ import annotations +import asyncio +import time +from typing import TYPE_CHECKING + import pytest from code_atlas.events import ( @@ -13,6 +17,11 @@ Topic, decode_event, ) +from code_atlas.indexing.consumers import BatchPolicy, Tier2ASTConsumer + +if TYPE_CHECKING: + from code_atlas.graph.client import GraphClient + from code_atlas.settings import AtlasSettings # All tests in this module require a live Redis/Valkey pytestmark = pytest.mark.integration @@ -35,8 +44,15 @@ async def _clean_streams(event_bus: EventBus): await event_bus._redis.delete(key) +def _write_python_file(root, rel_path: str, content: str) -> None: + """Write a Python file under *root* at the given relative path.""" + full = root / rel_path + full.parent.mkdir(parents=True, exist_ok=True) + full.write_text(content, encoding="utf-8") + + # --------------------------------------------------------------------------- -# Tests +# EventBus tests # --------------------------------------------------------------------------- @@ -96,3 +112,259 @@ async def test_dedup_within_batch(event_bus: EventBus) -> None: assert len(pending) == 1 assert pending["src/main.py"].timestamp == 1004.0 + + +# --------------------------------------------------------------------------- +# Tier 2 consumer tests +# --------------------------------------------------------------------------- + + +@pytest.mark.usefixtures("_clean_streams") +async def test_tier2_consumes_file_changed( + event_bus: EventBus, + graph_client: GraphClient, + settings: AtlasSettings, +) -> None: + """Tier 2 consumes FileChanged from the file-changed topic and writes entities to graph.""" + await graph_client.ensure_schema() + + # Write a Python file for Tier 2 to parse + _write_python_file(settings.project_root, "hello.py", "def greet(name: str) -> str:\n return f'Hello {name}'\n") + + consumer = Tier2ASTConsumer( + event_bus, + graph_client, + settings, + policy=BatchPolicy(time_window_s=0, max_batch_size=10, block_ms=50), + ) + + # Publish a FileChanged and let the consumer process it + project_name = settings.project_root.resolve().name + await event_bus.publish( + Topic.FILE_CHANGED, + FileChanged( + path="hello.py", + change_type="created", + timestamp=time.time(), + project_name=project_name, + project_root=str(settings.project_root), + ), + ) + + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(1.0) + consumer.stop() + await asyncio.wait_for(task, timeout=5.0) + + assert consumer.stats.files_processed >= 1 + assert consumer.stats.entities_added >= 1 + + +@pytest.mark.usefixtures("_clean_streams") +async def test_file_hash_gate_skips_unchanged( + event_bus: EventBus, + graph_client: GraphClient, + settings: AtlasSettings, +) -> None: + """Hash gate skips a file when content hasn't changed between runs.""" + await graph_client.ensure_schema() + + _write_python_file(settings.project_root, "stable.py", "X = 42\n") + + project_name = settings.project_root.resolve().name + ev = FileChanged( + path="stable.py", + change_type="modified", + timestamp=time.time(), + project_name=project_name, + project_root=str(settings.project_root), + ) + + # First run: processes the file and stores its hash + c1 = Tier2ASTConsumer( + event_bus, + graph_client, + settings, + policy=BatchPolicy(time_window_s=0, max_batch_size=10, block_ms=50), + ) + await event_bus.publish(Topic.FILE_CHANGED, ev) + task = asyncio.create_task(c1.run()) + await asyncio.sleep(1.0) + c1.stop() + await asyncio.wait_for(task, timeout=5.0) + assert c1.stats.files_processed >= 1 + + # Second run: same file, same content — should be skipped + c2 = Tier2ASTConsumer( + event_bus, + graph_client, + settings, + policy=BatchPolicy(time_window_s=0, max_batch_size=10, block_ms=50), + ) + await event_bus.publish(Topic.FILE_CHANGED, ev) + task = asyncio.create_task(c2.run()) + await asyncio.sleep(1.0) + c2.stop() + await asyncio.wait_for(task, timeout=5.0) + + assert c2.stats.files_skipped >= 1 + assert c2.stats.files_processed == 0 + + +@pytest.mark.usefixtures("_clean_streams") +async def test_file_hash_gate_processes_modified( + event_bus: EventBus, + graph_client: GraphClient, + settings: AtlasSettings, +) -> None: + """Hash gate allows a file through when content changes between runs.""" + await graph_client.ensure_schema() + + _write_python_file(settings.project_root, "changing.py", "X = 1\n") + + project_name = settings.project_root.resolve().name + ev = FileChanged( + path="changing.py", + change_type="modified", + timestamp=time.time(), + project_name=project_name, + project_root=str(settings.project_root), + ) + + # First run + c1 = Tier2ASTConsumer( + event_bus, + graph_client, + settings, + policy=BatchPolicy(time_window_s=0, max_batch_size=10, block_ms=50), + ) + await event_bus.publish(Topic.FILE_CHANGED, ev) + task = asyncio.create_task(c1.run()) + await asyncio.sleep(1.0) + c1.stop() + await asyncio.wait_for(task, timeout=5.0) + assert c1.stats.files_processed >= 1 + + # Modify the file + _write_python_file(settings.project_root, "changing.py", "X = 2\nY = 3\n") + + # Second run: changed content — should process again + c2 = Tier2ASTConsumer( + event_bus, + graph_client, + settings, + policy=BatchPolicy(time_window_s=0, max_batch_size=10, block_ms=50), + ) + await event_bus.publish(Topic.FILE_CHANGED, ev) + task = asyncio.create_task(c2.run()) + await asyncio.sleep(1.0) + c2.stop() + await asyncio.wait_for(task, timeout=5.0) + + assert c2.stats.files_processed >= 1 + + +@pytest.mark.usefixtures("_clean_streams") +async def test_cooldown_defers_rapid_edits( + event_bus: EventBus, + graph_client: GraphClient, + settings: AtlasSettings, +) -> None: + """Per-file cooldown defers rapid re-edits so only the first is processed immediately.""" + await graph_client.ensure_schema() + + _write_python_file(settings.project_root, "rapid.py", "A = 1\n") + + project_name = settings.project_root.resolve().name + + consumer = Tier2ASTConsumer( + event_bus, + graph_client, + settings, + policy=BatchPolicy(time_window_s=0, max_batch_size=10, block_ms=50), + cooldown_s=60.0, # Long cooldown — second event should be deferred + ) + + # Publish first event + await event_bus.publish( + Topic.FILE_CHANGED, + FileChanged( + path="rapid.py", + change_type="modified", + timestamp=time.time(), + project_name=project_name, + project_root=str(settings.project_root), + ), + ) + + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(1.0) + + # First event should be processed + assert consumer.stats.files_processed >= 1 + first_processed = consumer.stats.files_processed + + # Publish a second event for the same file — should be deferred + await event_bus.publish( + Topic.FILE_CHANGED, + FileChanged( + path="rapid.py", + change_type="modified", + timestamp=time.time(), + project_name=project_name, + project_root=str(settings.project_root), + ), + ) + await asyncio.sleep(1.0) + + consumer.stop() + await asyncio.wait_for(task, timeout=5.0) + + # Second event deferred — files_processed should not have increased + assert consumer.stats.files_processed == first_processed + assert "rapid.py" in consumer._deferred + + +@pytest.mark.usefixtures("_clean_streams") +async def test_cooldown_disabled_processes_all( + event_bus: EventBus, + graph_client: GraphClient, + settings: AtlasSettings, +) -> None: + """With cooldown_s=0, all events are processed immediately (reindex mode).""" + await graph_client.ensure_schema() + + _write_python_file(settings.project_root, "nodelay.py", "Z = 1\n") + + project_name = settings.project_root.resolve().name + + consumer = Tier2ASTConsumer( + event_bus, + graph_client, + settings, + policy=BatchPolicy(time_window_s=0, max_batch_size=10, block_ms=50), + cooldown_s=0.0, # No cooldown + ) + + # Publish two events for the same file + for i in range(2): + await event_bus.publish( + Topic.FILE_CHANGED, + FileChanged( + path="nodelay.py", + change_type="modified", + timestamp=time.time() + i, + project_name=project_name, + project_root=str(settings.project_root), + ), + ) + # Small gap so they arrive in separate batches + await asyncio.sleep(0.1) + + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(2.0) + consumer.stop() + await asyncio.wait_for(task, timeout=5.0) + + # Both events processed — no deferral + assert not consumer._deferred From 168c9eb8638572e180cc58a2a51c0501ffbfbbf8 Mon Sep 17 00:00:00 2001 From: SerPeter <43622448+serpeter@users.noreply.github.com> Date: Tue, 3 Mar 2026 22:43:57 +0100 Subject: [PATCH 5/6] refactor(indexing): rename Tier 2/3 to AST/Embed stage across code and docs --- CHANGELOG.md | 6 +- CLAUDE.md | 8 +- docs/adr/0004-event-driven-tiered-pipeline.md | 67 ++++++----- docs/adr/0005-deployment-process-model.md | 22 ++-- docs/adr/0006-pure-python-tree-sitter.md | 8 +- docs/architecture.md | 64 +++++------ docs/benchmarks.md | 16 +-- docs/guides/repo-guidelines.md | 2 +- scripts/profile_index.py | 68 ++++++------ src/code_atlas/events.py | 2 +- src/code_atlas/indexing/__init__.py | 8 +- src/code_atlas/indexing/consumers.py | 70 ++++++------ src/code_atlas/indexing/daemon.py | 14 +-- src/code_atlas/indexing/orchestrator.py | 104 +++++++++--------- src/code_atlas/search/embeddings.py | 2 +- src/code_atlas/settings.py | 2 +- tests/conftest.py | 2 +- tests/integration/indexing/test_consumers.py | 24 ++-- tests/unit/search/test_embeddings.py | 22 ++-- 19 files changed, 246 insertions(+), 265 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 454257b..07d273f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,7 +21,7 @@ - **ci**: Resolve ty check failures with --all-extras in CI ([`3f74816`](https://github.com/SerPeter/code-atlas/commit/3f7481635091d2d676aed75c3fbcaa5db4332242)) -- **consumers**: Group batches by project in Tier1/Tier2 +- **consumers**: Group batches by project in AST/Embed consumers ([#2](https://github.com/SerPeter/code-atlas/pull/2), [`5107b24`](https://github.com/SerPeter/code-atlas/commit/5107b24a7dfbcb44cadc7917f632ae6a9743c057)) @@ -190,7 +190,7 @@ - **docs**: Add markdown parser with tree-sitter-markdown ([`e8d372c`](https://github.com/SerPeter/code-atlas/commit/e8d372c162652d6d73d1f66da5e14a61fcb2136a)) -- **embeddings**: Add EmbedClient with litellm routing and Tier 3 pipeline +- **embeddings**: Add EmbedClient with litellm routing and embed pipeline ([`ad7c972`](https://github.com/SerPeter/code-atlas/commit/ad7c9726f2e48fdb8746b50547089c5c483bcb75)) - **embeddings**: Add three-tier embedding cache with Valkey backend @@ -241,7 +241,7 @@ - **naming**: Worktree-aware naming and monorepo sub-project prefixing ([`2acdfb3`](https://github.com/SerPeter/code-atlas/commit/2acdfb33ba4b486f966272a01cf8a37f670661f6)) -- **parser**: Add py-tree-sitter parser, implement Tier 2 pipeline, drop Rust +- **parser**: Add py-tree-sitter parser, implement AST pipeline, drop Rust ([`d56e7d2`](https://github.com/SerPeter/code-atlas/commit/d56e7d2a686ec279a52d85bbc4903f4d85f51a4e)) - **parsing**: Add multi-language support (10 languages, 7 modules) diff --git a/CLAUDE.md b/CLAUDE.md index c4c639a..41ca211 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -51,7 +51,7 @@ src/code_atlas/ ├── __init__.py # __version__ only ├── schema.py # Graph schema (labels, relationships, DDL generators) ├── settings.py # Pydantic configuration (atlas.toml + env vars) -├── events.py # Event types (FileChanged, ASTDirty, EmbedDirty) + Valkey Streams EventBus +├── events.py # Event types (FileChanged, EmbedDirty) + Valkey Streams EventBus ├── telemetry.py # OpenTelemetry integration ├── cli.py # Typer CLI entrypoint (index, search, status, mcp, daemon commands) │ @@ -69,7 +69,7 @@ src/code_atlas/ │ ├── indexing/ │ ├── orchestrator.py # Full-index, monorepo detection, staleness checking -│ ├── consumers.py # Tier 1/2/3 event consumers (batch-pull pattern) +│ ├── consumers.py # AST + Embed event consumers (batch-pull pattern) │ ├── watcher.py # Filesystem watcher (watchfiles + hybrid debounce) │ └── daemon.py # Daemon lifecycle manager (watcher + pipeline) │ @@ -78,13 +78,13 @@ src/code_atlas/ └── health.py # Infrastructure health checks + diagnostics ``` -**Event Pipeline:** File Watcher → Valkey Streams → Tier 1 (graph metadata) → Tier 2 (AST diff + gate) → Tier 3 (embeddings) → Memgraph +**Event Pipeline:** File Watcher → Valkey Streams → AST stage (hash gate + parse + diff) → Embed stage (embeddings) → Memgraph **Query Pipeline:** MCP Server → Query Router → [Graph Search | Vector Search | BM25 Search] → RRF Fusion → Results **Deployment:** Daemon (`atlas daemon start`) for indexing + MCP (`atlas mcp`) per agent session, decoupled via Valkey + Memgraph -**Event model:** Events are atomic — one logical change per event (one file per ASTDirty, one entity per EmbedDirty). Never bundle lists of work items into a single event; use `EventBus.publish_many()` for network-efficient batch publishing. The consumer's `max_batch_size` must directly control work volume, not just message count. +**Event model:** Events are atomic — one logical change per event (one file per FileChanged, one entity per EmbedDirty). Never bundle lists of work items into a single event; use `EventBus.publish_many()` for network-efficient batch publishing. The consumer's `max_batch_size` must directly control work volume, not just message count. **Infrastructure:** Memgraph (graph DB, port 7687), TEI (embeddings, port 8080), Valkey (event bus, port 6379) diff --git a/docs/adr/0004-event-driven-tiered-pipeline.md b/docs/adr/0004-event-driven-tiered-pipeline.md index b817c98..3664c2f 100644 --- a/docs/adr/0004-event-driven-tiered-pipeline.md +++ b/docs/adr/0004-event-driven-tiered-pipeline.md @@ -45,53 +45,50 @@ Redis Streams provide the pub/sub backbone with consumer groups: Typed frozen dataclasses with JSON serialization for Redis transport: - `FileChanged(path, change_type, timestamp)` — published by file watcher -- `ASTDirty(paths, batch_id)` — published by Tier 1 -- `EmbedDirty(entities: list[EntityRef], significance, batch_id)` — published by Tier 2 +- `EmbedDirty(entities: list[EntityRef], significance, batch_id)` — published by AST stage -### Three-Stream Pipeline +### Two-Stage Pipeline ``` - atlas:file-changed atlas:ast-dirty atlas:embed-dirty - stream stream stream - │ │ │ - ┌──────▼───────┐ ┌──────▼───────┐ ┌──────▼───────┐ - File Watcher ────► │ Tier 1 │ ──────► │ Tier 2 │ ─gate─►│ Tier 3 │ - │ Graph Metadata│ always │ AST Diff + │ only │ Embeddings │ - │ (0.5s batch) │ │ Graph Update │ if sig │ (15s batch) │ - └──────────────┘ │ (3s batch) │ change └──────────────┘ - └──────────────┘ + atlas:file-changed atlas:embed-dirty + stream stream + │ │ + ┌──────▼───────┐ ┌──────▼───────┐ + File Watcher ────► │ AST Stage │ ─────── significance gate ───► │ Embed Stage │ + │ hash gate + │ only if semantically changed │ Embeddings │ + │ parse + diff │ │ (15s batch) │ + │ (3s batch) │ └──────────────┘ + └──────────────┘ ``` -Each tier pulls at its own pace via `XREADGROUP`, deduplicates within its batch window, and publishes downstream only if -warranted. +Each stage pulls at its own pace via `XREADGROUP`, deduplicates within its batch window, and publishes downstream only +if warranted. ### Per-Consumer Batch Policy -| Tier | Window | Max Batch | Dedup Key | -| -------------- | ------ | --------- | --------------------- | -| Tier 1 (Graph) | 0.5s | 50 | File path | -| Tier 2 (AST) | 3.0s | 20 | File path | -| Tier 3 (Embed) | 15.0s | 100 | Entity qualified name | +| Stage | Window | Max Batch | Dedup Key | +| ----- | ------ | --------- | --------------------- | +| AST | 3.0s | 30 | File path | +| Embed | 15.0s | 100 | Entity qualified name | Hybrid batching: flush when count OR time threshold hit, whichever first. Same file changed 5× in window = 1 work item. ### Event Data Flow ``` -FileChanged ASTDirty EmbedDirty -┌─────────────┐ ┌──────────────────┐ ┌──────────────────────────┐ -│ path: str │ │ paths: [str] │ │ entities: [EntityRef] │ -│ change_type │ ─Tier 1──► │ batch_id: str │ ─Tier 2─►│ significance: str │ -│ timestamp │ └──────────────────┘ gate │ batch_id: str │ -└─────────────┘ └──────────────────────────┘ - EntityRef: - qualified_name, node_type, - file_path +FileChanged EmbedDirty +┌─────────────┐ ┌──────────────────────────┐ +│ path: str │ │ entity: EntityRef │ +│ change_type │ ─── AST stage ── sig gate ────────► │ significance: str │ +│ timestamp │ └──────────────────────────┘ +└─────────────┘ EntityRef: + qualified_name, node_type, + file_path ``` -### Significance Gating (Tier 2 → 3) +### Significance Gating (AST → Embed) -Tier 2 evaluates whether a change is semantically significant enough to warrant re-embedding: +The AST stage evaluates whether a change is semantically significant enough to warrant re-embedding: | Condition | Level | Action | | --------------------------- | -------- | ------------------- | @@ -115,25 +112,25 @@ own retries through this mechanism, avoiding the need for a separate dead-letter - Cheap operations (staleness flags, graph metadata) are near-instant — MCP queries reflect changes within ~1s - Expensive operations (embeddings) only run when semantically justified — significant cost reduction - Decoupled stages can be developed, tested, and scaled independently -- Batching per tier matches the cost profile of each operation +- Batching per stage matches the cost profile of each operation - Multi-process from day one — no rewrite needed when scaling - Dual-use of Valkey for event bus + embedding cache -- Natural extension point: new tiers or event types can be added without restructuring +- Natural extension point: new stages or event types can be added without restructuring ### Negative - More architectural complexity than a simple "reindex everything on change" - Significance threshold heuristics need tuning and may produce false negatives (skipping re-embeds that should have happened) -- Debugging event flow across tiers is harder than a linear pipeline +- Debugging event flow across stages is harder than a linear pipeline - Additional infrastructure dependency (Valkey), though lightweight ### Risks - Threshold tuning: too aggressive = stale embeddings, too conservative = excessive TEI calls. Need observability on gate decisions. -- Event ordering: if Tier 2 processes file A before file B, but B depends on A's entities, the diff may be incorrect. - Batch boundaries must align with dependency boundaries. +- Event ordering: if the AST stage processes file A before file B, but B depends on A's entities, the diff may be + incorrect. Batch boundaries must align with dependency boundaries. - Complexity creep: the event bus must stay simple. If we find ourselves adding routing rules, dead-letter queues, or retry logic, we've gone too far. diff --git a/docs/adr/0005-deployment-process-model.md b/docs/adr/0005-deployment-process-model.md index 3807d8c..ce4e57e 100644 --- a/docs/adr/0005-deployment-process-model.md +++ b/docs/adr/0005-deployment-process-model.md @@ -95,12 +95,12 @@ decoupled via Valkey Streams and Memgraph: └───────┬────────┘ ▼ ┌────────────────┐ - │ Create Consumer│ Idempotent XGROUP CREATE for all 3 streams + │ Create Consumer│ Idempotent XGROUP CREATE for pipeline streams │ Groups │ └───────┬────────┘ ▼ ┌────────────────┐ - │ Start Tier │ asyncio.gather(tier1.run(), tier2.run(), tier3.run()) + │ Start Pipeline │ asyncio.gather(ast.run(), embed.run()) │ Consumers │ └───────┬────────┘ ▼ @@ -111,7 +111,7 @@ decoupled via Valkey Streams and Memgraph: ▼ ┌────────────────┐ Git-based fast path: diff stored_commit..HEAD │ Reconcile │ Fallback: mtime comparison for non-git or rebases - │ (progressive) │ Enqueue stale files → Tier 1 → 2 → 3 + │ (progressive) │ Enqueue stale files → AST → Embed └───────┬────────┘ ▼ ┌────────────────┐ @@ -230,17 +230,11 @@ Queries: Agent calls MCP tools ─────► Memgraph ◄──── Da ### Data Flow at Runtime ``` - ┌──────────┐ FileChanged ┌─────────┐ ASTDirty ┌─────────┐ - │ File │ ──► events ────────► │ Tier 1 │ ──► events ─────────► │ Tier 2 │ - │ Watcher │ (Valkey Stream) │ (graph) │ (Valkey Stream) │ (AST) │ - └──────────┘ └─────────┘ └────┬────┘ - gate │ - EmbedDirty│ - (if sig) │ - ┌────▼────┐ - │ Tier 3 │ - │ (embed) │ - └────┬────┘ + ┌──────────┐ FileChanged ┌───────────┐ EmbedDirty ┌───────────┐ + │ File │ ──► events ────────► │ AST Stage │ ──► events ──────► │ Embed │ + │ Watcher │ (Valkey Stream) │ (parse) │ (Valkey Stream) │ Stage │ + └──────────┘ └─────┬─────┘ └─────┬────┘ + │ │ │ ┌──────────┐ │ Agent ◄──── MCP Server ◄──── reads │ Memgraph │ ◄──── writes ────────┘ diff --git a/docs/adr/0006-pure-python-tree-sitter.md b/docs/adr/0006-pure-python-tree-sitter.md index 0b4adc0..e217c52 100644 --- a/docs/adr/0006-pure-python-tree-sitter.md +++ b/docs/adr/0006-pure-python-tree-sitter.md @@ -17,7 +17,7 @@ actual cost breakdown: - **Subprocess overhead** (spawn, JSON serialization, IPC) exceeded the parse time itself for typical files - **Build complexity** required both `uv` and `cargo` toolchains in dev/CI/Docker - **Contributor friction** — Rust was isolated to one component, but still required a full toolchain install -- **Parallelism** is already handled by the event bus (multiple Tier 2 consumer instances via Valkey Streams), not by +- **Parallelism** is already handled by the event bus (multiple AST consumer instances via Valkey Streams), not by Rust's threading model Meanwhile, `py-tree-sitter` uses the exact same C parsing library (tree-sitter) via Python bindings. The grammar @@ -25,13 +25,13 @@ packages (`tree-sitter-python`, etc.) ship pre-compiled wheels — no compilatio ## Decision -Drop the Rust binary (`crates/atlas-parser`) and use **py-tree-sitter** called in-process within the Tier 2 pipeline +Drop the Rust binary (`crates/atlas-parser`) and use **py-tree-sitter** called in-process within the AST pipeline consumer. The parser module lives at `src/code_atlas/parser.py`. ### Architecture ``` -Tier 2 Consumer +AST Consumer └── parser.parse_file(path, source, project_name) └── tree-sitter C engine (via py-tree-sitter bindings) └── tree-sitter-python grammar (pre-compiled wheel) @@ -39,7 +39,7 @@ Tier 2 Consumer ### Parallelism Model -Multiple Tier 2 consumer instances can run concurrently — each pulls from the `atlas:ast-dirty` Valkey Stream via its +Multiple AST consumer instances can run concurrently — each pulls from the `atlas:file-changed` Valkey Stream via its own consumer group member. This gives process-level parallelism without the GIL concern, since each consumer is an independent process. diff --git a/docs/architecture.md b/docs/architecture.md index 0239dc3..5754cc4 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -25,9 +25,8 @@ graph TB subgraph "Code Atlas — Daemon" FW[File Watcher] - T1[Tier 1: Graph Metadata] - T2[Tier 2: AST Diff] - T3[Tier 3: Embeddings] + AST[AST Stage] + EMB[Embed Stage] end subgraph "Code Atlas — MCP" @@ -72,15 +71,12 @@ graph TB BS --> MG FW --> VK - VK --> T1 - T1 --> VK - VK --> T2 - T2 --> VK - VK --> T3 - T1 --> MG - T2 --> MG - T3 --> MG - T3 --> TEI + VK --> AST + AST --> VK + VK --> EMB + AST --> MG + EMB --> MG + EMB --> TEI ``` ## Component Architecture @@ -111,34 +107,28 @@ Spawned per agent session via `atlas mcp`, it reads Memgraph directly with no de ### Event-Driven Pipeline -The indexing pipeline is event-driven, with three tiers of increasing cost connected via Valkey (Redis) Streams. Each -tier pulls at its own pace, deduplicates within its batch window, and gates downstream work based on significance. +The indexing pipeline is event-driven, with two stages of increasing cost connected via Valkey (Redis) Streams. Each +stage pulls at its own pace, deduplicates within its batch window, and gates downstream work based on significance. ```mermaid graph LR FW[File Watcher] -->|FileChanged| S1[atlas:file-changed] - S1 -->|XREADGROUP| T1[Tier 1: Graph Metadata
0.5s batch, dedup by path] - T1 -->|ASTDirty| S2[atlas:ast-dirty] - S2 -->|XREADGROUP| T2[Tier 2: AST Diff
3s batch, significance gate] - T2 -->|EmbedDirty| S3[atlas:embed-dirty] - S3 -->|XREADGROUP| T3[Tier 3: Embeddings
15s batch, dedup by entity] - T1 -->|write| MG[(Memgraph)] - T2 -->|write| MG - T3 -->|write| MG - T3 -->|embed| TEI[TEI] + S1 -->|XREADGROUP| AST[AST Stage
hash gate + parse + diff] + AST -->|EmbedDirty| S2[atlas:embed-dirty] + S2 -->|XREADGROUP| EMB[Embed Stage
dedup by entity] + AST -->|write| MG[(Memgraph)] + EMB -->|write| MG + EMB -->|embed| TEI[TEI] ``` -**Tier 1 — Graph Metadata** (cheap, ~0.5s batch): Updates file node timestamps and staleness flags. Always publishes -`ASTDirty` downstream. +**AST Stage** (medium cost, ~3s batch): Applies a file hash gate to skip unchanged files, re-parses AST via tree-sitter, +diffs entities, updates graph nodes/edges. Evaluates a significance gate — trivial changes (whitespace, formatting) stop +here; semantic changes (signature, body, docstring) publish `EmbedDirty` to the Embed stage. -**Tier 2 — AST Diff** (medium, ~3s batch): Re-parses AST via tree-sitter, diffs entities, updates graph nodes/edges. -Evaluates a significance gate — trivial changes (whitespace, formatting) stop here; semantic changes (signature, body, -docstring) publish `EmbedDirty` to Tier 3. +**Embed Stage** (expensive, ~15s batch): Re-embeds affected entities via TEI, writes vectors to Memgraph. Deduplicates +by entity qualified name across all events in the batch. -**Tier 3 — Embeddings** (expensive, ~15s batch): Re-embeds affected entities via TEI, writes vectors to Memgraph. -Deduplicates by entity qualified name across all events in the batch. - -**Significance Gate (Tier 2 → 3):** +**Significance Gate (AST → Embed):** - Whitespace/formatting only → stop - Non-docstring comment → stop @@ -259,17 +249,17 @@ MCP server is spawned per agent session. ```bash docker compose up -d # Memgraph (7687) + Valkey (6379) docker compose --profile tei up -d # Include TEI (8080) for local embeddings -atlas daemon start # File watcher + tier consumers (long-running) +atlas daemon start # File watcher + AST/Embed consumers (long-running) atlas mcp # MCP server — stdio (Claude Code, Cursor) atlas mcp --transport http # MCP server — Streamable HTTP (VS Code, JetBrains) ``` -The daemon publishes file change events to Valkey Streams, where tier consumers process them and write to Memgraph. The -MCP server reads Memgraph directly — no dependency on the daemon for queries, so agents can query immediately even with -a stale index. +The daemon publishes file change events to Valkey Streams, where the AST and Embed consumers process them and write to +Memgraph. The MCP server reads Memgraph directly — no dependency on the daemon for queries, so agents can query +immediately even with a stale index. On startup, the daemon runs a reconciliation pass: compares filesystem state against the index and enqueues stale files -through the pipeline progressively (Tier 1 first, then 2, then 3). +through the pipeline progressively (AST stage first, then Embed stage). See [ADR-0005](adr/0005-deployment-process-model.md) for full rationale. diff --git a/docs/benchmarks.md b/docs/benchmarks.md index dcbcd14..e90716f 100644 --- a/docs/benchmarks.md +++ b/docs/benchmarks.md @@ -5,14 +5,14 @@ Measured on the code-atlas repo (107 Python files, 2,706 entities) with local TEI embeddings. See `scripts/profile_index.py --full`. -| Stage | Wall Time | Notes | -| ------------------- | --------- | ---------------------------------------------- | -| Scan + packages | 0.1s | File discovery + package hierarchy | -| Tier 2 (AST+graph) | ~26s | Parse, upsert, resolve imports/calls/types | -| Tier 3 (embeddings) | ~52s | Embed API + graph writes (8 concurrent) | -| **Total** | **55s** | Embedding-bound; cached reindex is much faster | - -Tier 2 and Tier 3 overlap — embedding starts as soon as the first batch of entities is written. The bottleneck is the +| Stage | Wall Time | Notes | +| ------------------ | --------- | ---------------------------------------------- | +| Scan + packages | 0.1s | File discovery + package hierarchy | +| AST (parse+graph) | ~26s | Parse, upsert, resolve imports/calls/types | +| Embed (embeddings) | ~52s | Embed API + graph writes (8 concurrent) | +| **Total** | **55s** | Embedding-bound; cached reindex is much faster | + +AST and Embed stages overlap — embedding starts as soon as the first batch of entities is written. The bottleneck is the embedding API (75.8s cumulative across 8 workers, ~3.2s avg per batch of 128 entities). ## Parse-Only Throughput diff --git a/docs/guides/repo-guidelines.md b/docs/guides/repo-guidelines.md index 8bcf9d8..1681f0d 100644 --- a/docs/guides/repo-guidelines.md +++ b/docs/guides/repo-guidelines.md @@ -103,7 +103,7 @@ invisible to AST parsing. ```python # Do -class Tier2Consumer(TierConsumer): ... +class ASTConsumer(TierConsumer): ... # Don't Consumer = make_consumer(TierConsumer, features=["ast"]) diff --git a/scripts/profile_index.py b/scripts/profile_index.py index 1d42eb9..138445d 100644 --- a/scripts/profile_index.py +++ b/scripts/profile_index.py @@ -209,25 +209,25 @@ def patch_all(): orch._wait_for_drain = timed_async("orch.wait_for_drain")(orch._wait_for_drain) # --- Parser --- - parser_mod.parse_file = timed_sync("tier2.parse_file")(parser_mod.parse_file) + parser_mod.parse_file = timed_sync("ast.parse_file")(parser_mod.parse_file) # --- Detectors --- - det_mod.run_detectors = timed_async("tier2.run_detectors")(det_mod.run_detectors) + det_mod.run_detectors = timed_async("ast.run_detectors")(det_mod.run_detectors) # --- Graph client methods --- - gc.GraphClient.upsert_file_entities = timed_async("tier2.upsert_file_entities")(gc.GraphClient.upsert_file_entities) - gc.GraphClient.upsert_batch_entities = timed_async("tier2.upsert_batch_entities")( + gc.GraphClient.upsert_file_entities = timed_async("ast.upsert_file_entities")(gc.GraphClient.upsert_file_entities) + gc.GraphClient.upsert_batch_entities = timed_async("ast.upsert_batch_entities")( gc.GraphClient.upsert_batch_entities ) - gc.GraphClient.resolve_imports = timed_async("tier2.resolve_imports")(gc.GraphClient.resolve_imports) - gc.GraphClient.build_resolution_lookup = timed_async("tier2.build_resolution_lookup")( + gc.GraphClient.resolve_imports = timed_async("ast.resolve_imports")(gc.GraphClient.resolve_imports) + gc.GraphClient.build_resolution_lookup = timed_async("ast.build_resolution_lookup")( gc.GraphClient.build_resolution_lookup ) - gc.GraphClient.resolve_calls = timed_async("tier2.resolve_calls")(gc.GraphClient.resolve_calls) - gc.GraphClient.resolve_type_refs = timed_async("tier2.resolve_type_refs")(gc.GraphClient.resolve_type_refs) + gc.GraphClient.resolve_calls = timed_async("ast.resolve_calls")(gc.GraphClient.resolve_calls) + gc.GraphClient.resolve_type_refs = timed_async("ast.resolve_type_refs")(gc.GraphClient.resolve_type_refs) gc.GraphClient.merge_package_batch = timed_async("orch.merge_package_batch")(gc.GraphClient.merge_package_batch) - gc.GraphClient.delete_file_entities = timed_async("tier2.delete_file_entities")(gc.GraphClient.delete_file_entities) - gc.GraphClient.apply_property_enrichments = timed_async("tier2.apply_enrichments")( + gc.GraphClient.delete_file_entities = timed_async("ast.delete_file_entities")(gc.GraphClient.delete_file_entities) + gc.GraphClient.apply_property_enrichments = timed_async("ast.apply_enrichments")( gc.GraphClient.apply_property_enrichments ) @@ -249,16 +249,16 @@ def patch_all(): gc.GraphClient._recreate_batch_relationships ) - # --- Tier 3 --- - gc.GraphClient.read_entity_texts = timed_async("tier3.read_entity_texts")(gc.GraphClient.read_entity_texts) - gc.GraphClient.write_embeddings_and_hashes = timed_async("tier3.write_embeddings_and_hashes")( + # --- Embed stage --- + gc.GraphClient.read_entity_texts = timed_async("embed.read_entity_texts")(gc.GraphClient.read_entity_texts) + gc.GraphClient.write_embeddings_and_hashes = timed_async("embed.write_embeddings_and_hashes")( gc.GraphClient.write_embeddings_and_hashes ) - emb_mod.EmbedClient.embed_batch = timed_async("tier3.embed_api")(emb_mod.EmbedClient.embed_batch) + emb_mod.EmbedClient.embed_batch = timed_async("embed.embed_api")(emb_mod.EmbedClient.embed_batch) - # Tier 3 cache - emb_mod.EmbedCache.get_many = timed_async("tier3.cache_get_many")(emb_mod.EmbedCache.get_many) - emb_mod.EmbedCache.put_many = timed_async("tier3.cache_put_many")(emb_mod.EmbedCache.put_many) + # Embed cache + emb_mod.EmbedCache.get_many = timed_async("embed.cache_get_many")(emb_mod.EmbedCache.get_many) + emb_mod.EmbedCache.put_many = timed_async("embed.cache_put_many")(emb_mod.EmbedCache.put_many) # --- EventBus --- EventBus.publish_many = timed_async("bus.publish_many")(EventBus.publish_many) @@ -348,17 +348,17 @@ def print_report(wall_time: float): "orch.wait_for_drain", "orch.merge_package_batch", ], - "Tier 2 (AST + Graph)": [ - "tier2.parse_file", - "tier2.run_detectors", - "tier2.upsert_file_entities", - "tier2.upsert_batch_entities", - "tier2.delete_file_entities", - "tier2.apply_enrichments", - "tier2.resolve_imports", - "tier2.build_resolution_lookup", - "tier2.resolve_calls", - "tier2.resolve_type_refs", + "AST Stage (Parse + Graph)": [ + "ast.parse_file", + "ast.run_detectors", + "ast.upsert_file_entities", + "ast.upsert_batch_entities", + "ast.delete_file_entities", + "ast.apply_enrichments", + "ast.resolve_imports", + "ast.build_resolution_lookup", + "ast.resolve_calls", + "ast.resolve_type_refs", # Upsert inner breakdown "upsert.get_hashes", "upsert.get_batch_hashes", @@ -369,12 +369,12 @@ def print_report(wall_time: float): "upsert.recreate_rels", "upsert.recreate_batch_rels", ], - "Tier 3 (Embeddings)": [ - "tier3.read_entity_texts", - "tier3.cache_get_many", - "tier3.embed_api", - "tier3.cache_put_many", - "tier3.write_embeddings_and_hashes", + "Embed Stage (Embeddings)": [ + "embed.read_entity_texts", + "embed.cache_get_many", + "embed.embed_api", + "embed.cache_put_many", + "embed.write_embeddings_and_hashes", ], "Event Bus (Valkey)": [ "bus.publish_many", diff --git a/src/code_atlas/events.py b/src/code_atlas/events.py index e3731bb..aeab3bd 100644 --- a/src/code_atlas/events.py +++ b/src/code_atlas/events.py @@ -46,7 +46,7 @@ class EntityRef: @dataclass(frozen=True) class EmbedDirty: - """A single entity needs re-embedding (published by Tier 2, consumed by Tier 3).""" + """A single entity needs re-embedding (published by AST stage, consumed by Embed stage).""" entity: EntityRef significance: str # "MODERATE" | "HIGH" diff --git a/src/code_atlas/indexing/__init__.py b/src/code_atlas/indexing/__init__.py index 1b7764a..cbefce9 100644 --- a/src/code_atlas/indexing/__init__.py +++ b/src/code_atlas/indexing/__init__.py @@ -3,9 +3,9 @@ from __future__ import annotations from code_atlas.indexing.consumers import ( + ASTConsumer, BatchPolicy, - Tier2ASTConsumer, - Tier3EmbedConsumer, + EmbedConsumer, TierConsumer, ) from code_atlas.indexing.daemon import DaemonManager @@ -24,17 +24,17 @@ from code_atlas.indexing.watcher import FileWatcher __all__ = [ + "ASTConsumer", "BatchPolicy", "DaemonManager", "DeltaStats", "DetectedProject", + "EmbedConsumer", "FileScope", "FileWatcher", "IndexResult", "StalenessChecker", "StalenessInfo", - "Tier2ASTConsumer", - "Tier3EmbedConsumer", "TierConsumer", "classify_file_project", "detect_sub_projects", diff --git a/src/code_atlas/indexing/consumers.py b/src/code_atlas/indexing/consumers.py index aeae0f0..05ceda2 100644 --- a/src/code_atlas/indexing/consumers.py +++ b/src/code_atlas/indexing/consumers.py @@ -1,9 +1,9 @@ -"""Two-tier consumer pipeline for event-driven indexing. +"""Two-stage consumer pipeline for event-driven indexing. - FileChanged → Tier 2 (hash gate + AST parse + diff) - → significance gate → EmbedDirty → Tier 3 (embeddings) + FileChanged → AST stage (hash gate + AST parse + diff) + → significance gate → EmbedDirty → Embed stage (embeddings) -Each tier uses batch-pull with configurable time/count policy and +Each stage uses batch-pull with configurable time/count policy and deduplicates within its batch window. """ @@ -279,10 +279,10 @@ async def run(self) -> None: # noqa: PLR0912, PLR0915 # --------------------------------------------------------------------------- -# Tier 2: AST parse + graph write (medium cost) +# AST stage: parse + graph write (medium cost) # --------------------------------------------------------------------------- -# Significance levels for the Tier 2 → Tier 3 gate +# Significance levels for the AST → Embed gate # # | Condition | Level | Action | # |----------------------------------|----------|---------------| @@ -304,8 +304,8 @@ async def run(self) -> None: # noqa: PLR0912, PLR0915 @dataclass -class Tier2Stats: - """Accumulated delta statistics for Tier 2 processing.""" +class ASTStats: + """Accumulated delta statistics for AST stage processing.""" files_processed: int = 0 files_skipped: int = 0 @@ -334,8 +334,8 @@ class _ParsedFileData: ) -class Tier2ASTConsumer(TierConsumer): - """Tier 2: Parse AST via tree-sitter, write entities to graph, publish EmbedDirty.""" +class ASTConsumer(TierConsumer): + """AST stage: Parse AST via tree-sitter, write entities to graph, publish EmbedDirty.""" def __init__( self, @@ -351,15 +351,15 @@ def __init__( super().__init__( bus=bus, input_topic=Topic.FILE_CHANGED, - group="tier2-ast", - consumer_name="tier2-ast-0", + group="ast", + consumer_name="ast-0", policy=policy or BatchPolicy(time_window_s=3.0, max_batch_size=30), project_filter=project_filter, ) self.graph = graph self.settings = settings self._project_root = project_root or Path(settings.project_root) - self.stats = Tier2Stats() + self.stats = ASTStats() self._detectors = get_enabled_detectors(settings.detectors.enabled) # Per-file cooldown state (daemon mode) @@ -472,12 +472,12 @@ async def _parse_file( try: source = full_path.read_bytes() except OSError: - logger.warning("Tier2: cannot read {}", file_path) + logger.warning("AST: cannot read {}", file_path) return None parsed = parse_file(file_path, source, project_name, max_source_chars=self.settings.index.max_source_chars) if parsed is None: - logger.debug("Tier2: unsupported language for {}", file_path) + logger.debug("AST: unsupported language for {}", file_path) return None det_result = await run_detectors(self._detectors, parsed, project_name, self.graph) @@ -495,7 +495,7 @@ async def _parse_file( ) async def process_batch(self, events: list[Event], batch_id: str) -> None: # noqa: PLR0912, PLR0915 - with _tracer.start_as_current_span("tier2.process_batch", attributes={"batch_id": batch_id}) as span: + with _tracer.start_as_current_span("ast.process_batch", attributes={"batch_id": batch_id}) as span: # Per-file cooldown filter: defer events for recently-processed files if self._cooldown_s > 0: now = asyncio.get_event_loop().time() @@ -510,7 +510,7 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: # no else: processable.append(ev) if deferred_count: - logger.debug("Tier2 batch {}: {} event(s) deferred by cooldown", batch_id, deferred_count) + logger.debug("AST batch {}: {} event(s) deferred by cooldown", batch_id, deferred_count) events = processable if not events: return @@ -526,7 +526,7 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: # no groups = {key: list(dict.fromkeys(paths)) for key, paths in groups.items()} total_paths = sum(len(p) for p in groups.values()) - logger.debug("Tier2 batch {}: {} unique path(s) in {} group(s)", batch_id, total_paths, len(groups)) + logger.debug("AST batch {}: {} unique path(s) in {} group(s)", batch_id, total_paths, len(groups)) embed_candidates: dict[str, tuple[EntityRef, str]] = {} # uid → (ref, text_hash) skipped_before = self.stats.files_skipped @@ -555,7 +555,7 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: # no file_sources[fp] = full_path.read_bytes() live_paths.append(fp) except OSError: - logger.warning("Tier2: cannot read {}", fp) + logger.warning("AST: cannot read {}", fp) # Apply hash gate to live files if use_hash_gate and live_paths: @@ -575,7 +575,7 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: # no hash_skipped = len(live_paths) - len(gate_passed) if hash_skipped: logger.debug( - "Tier2 batch {}: hash gate skipped {}/{} file(s)", + "AST batch {}: hash gate skipped {}/{} file(s)", batch_id, hash_skipped, len(live_paths), @@ -589,7 +589,7 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: # no for file_idx, file_path in enumerate(live_paths, 1): if file_idx % 50 == 0: - logger.debug("Tier2 batch {}: parsed {}/{} files", batch_id, file_idx, len(live_paths)) + logger.debug("AST batch {}: parsed {}/{} files", batch_id, file_idx, len(live_paths)) pfd = await self._parse_file( project_name, file_path, @@ -603,7 +603,7 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: # no # 2. Handle deleted files for fp in deleted_files: - logger.debug("Tier2: file deleted, removing entities for {}", fp) + logger.debug("AST: file deleted, removing entities for {}", fp) deleted = await self.graph.delete_file_entities(project_name, fp) self.stats.files_deleted += 1 self.stats.entities_deleted += len(deleted) @@ -718,7 +718,7 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: # no span.set_attribute("entities_changed", total_changed) logger.debug( - "Tier2 batch {}: {} files, {} skipped, {} entities changed", + "AST batch {}: {} files, {} skipped, {} entities changed", batch_id, total_paths, self.stats.files_skipped - skipped_before, @@ -752,14 +752,14 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: # no # --------------------------------------------------------------------------- -# Tier 3: Embeddings (expensive, heavily batched) +# Embed stage: Embeddings (expensive, heavily batched) # --------------------------------------------------------------------------- -class Tier3EmbedConsumer(TierConsumer): - """Tier 3: Re-embed entities via TEI. Deduplicates by qualified name. +class EmbedConsumer(TierConsumer): + """Embed stage: Re-embed entities via TEI. Deduplicates by qualified name. - Implements a three-tier lookup to minimize expensive embedding API calls: + Implements a three-level lookup to minimize expensive embedding API calls: 1. **Graph hit** — node already has ``embed_hash`` matching current text (free). 2. **Valkey cache hit** — vector stored in Redis from a previous run (1 round-trip). 3. **API call** — embed via TEI / cloud provider (expensive). @@ -780,8 +780,8 @@ def __init__( super().__init__( bus=bus, input_topic=Topic.EMBED_DIRTY, - group="tier3-embed", - consumer_name="tier3-embed", + group="embed", + consumer_name="embed", policy=policy or BatchPolicy( time_window_s=10.0, @@ -875,7 +875,7 @@ async def _embed_and_store(self, need_embed: list[tuple[str, str, str]]) -> list return result async def process_batch(self, events: list[Event], batch_id: str) -> None: - with _tracer.start_as_current_span("tier3.process_batch", attributes={"batch_id": batch_id}) as span: + with _tracer.start_as_current_span("embed.process_batch", attributes={"batch_id": batch_id}) as span: # Collect and deduplicate entities across all events in the batch seen: dict[str, EntityRef] = {} for e in events: @@ -883,7 +883,7 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: seen[e.entity.qualified_name] = e.entity entities = list(seen.values()) - logger.debug("Tier3 batch {}: {} unique entity(ies)", batch_id, len(entities)) + logger.debug("Embed batch {}: {} unique entity(ies)", batch_id, len(entities)) if not entities: return @@ -917,7 +917,7 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: if not to_process: elapsed = asyncio.get_event_loop().time() - t0 logger.debug( - "Tier3 batch {}: {} entities, {} graph hits, 0 cache hits, 0 embedded ({:.1f}s)", + "Embed batch {}: {} entities, {} graph hits, 0 cache hits, 0 embedded ({:.1f}s)", batch_id, total, graph_hits, @@ -933,10 +933,10 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: # Serialized via _write_lock to avoid Memgraph write-lock contention. all_resolved = cache_resolved + api_vectors if all_resolved: - with _tracer.start_as_current_span("tier3.write_lock_wait"): + with _tracer.start_as_current_span("embed.write_lock_wait"): await self._write_lock.acquire() try: - with _tracer.start_as_current_span("tier3.write_embeddings"): + with _tracer.start_as_current_span("embed.write_embeddings"): write_labels = [uid_to_label[uid] for uid, _, _ in all_resolved] if uid_to_label else None await self.graph.write_embeddings_and_hashes(all_resolved, labels=write_labels) finally: @@ -951,7 +951,7 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: get_metrics().embedding_latency.record(elapsed) logger.debug( - "Tier3 batch {}: {} entities, {} graph hits, {} cache hits, {} embedded ({:.1f}s)", + "Embed batch {}: {} entities, {} graph hits, {} cache hits, {} embedded ({:.1f}s)", batch_id, total, graph_hits, diff --git a/src/code_atlas/indexing/daemon.py b/src/code_atlas/indexing/daemon.py index 85b40c8..eebec20 100644 --- a/src/code_atlas/indexing/daemon.py +++ b/src/code_atlas/indexing/daemon.py @@ -1,7 +1,7 @@ """Daemon manager — reusable watcher + pipeline lifecycle. Encapsulates the EventBus, FileWatcher, EmbedClient, EmbedCache, -and Tier 2/3 consumers. Used by both the CLI (``atlas watch``, +and AST/Embed consumers. Used by both the CLI (``atlas watch``, ``atlas daemon start``) and the MCP server for auto-indexing. """ @@ -14,7 +14,7 @@ from loguru import logger from code_atlas.events import EventBus -from code_atlas.indexing.consumers import Tier2ASTConsumer, Tier3EmbedConsumer +from code_atlas.indexing.consumers import ASTConsumer, EmbedConsumer from code_atlas.indexing.orchestrator import FileScope, detect_sub_projects from code_atlas.indexing.watcher import FileWatcher from code_atlas.search.embeddings import EmbedCache, EmbedClient @@ -31,7 +31,7 @@ class DaemonManager: _bus: EventBus | None = field(default=None, repr=False) _watcher: FileWatcher | None = field(default=None, repr=False) - _consumers: list[Tier2ASTConsumer | Tier3EmbedConsumer] = field(default_factory=list, repr=False) + _consumers: list[ASTConsumer | EmbedConsumer] = field(default_factory=list, repr=False) _tasks: list[asyncio.Task[None]] = field(default_factory=list, repr=False) _cache: EmbedCache | None = field(default=None, repr=False) _embed: EmbedClient | None = field(default=None, repr=False) @@ -77,11 +77,11 @@ async def start( cache = EmbedCache(settings.redis, settings.embeddings) self._cache = cache - consumers: list[Tier2ASTConsumer | Tier3EmbedConsumer] = [ - Tier2ASTConsumer(bus, graph, settings, cooldown_s=settings.watcher.cooldown_s), + consumers: list[ASTConsumer | EmbedConsumer] = [ + ASTConsumer(bus, graph, settings, cooldown_s=settings.watcher.cooldown_s), ] if embed is not None: - consumers.append(Tier3EmbedConsumer(bus, graph, embed, cache=cache)) + consumers.append(EmbedConsumer(bus, graph, embed, cache=cache)) self._consumers = consumers if include_watcher: @@ -143,7 +143,7 @@ async def _run_watcher(self) -> None: logger.exception("File watcher crashed") @staticmethod - async def _run_consumer(consumer: Tier2ASTConsumer | Tier3EmbedConsumer) -> None: + async def _run_consumer(consumer: ASTConsumer | EmbedConsumer) -> None: """Run a consumer, catching exceptions so one failure doesn't crash the rest.""" try: await consumer.run() diff --git a/src/code_atlas/indexing/orchestrator.py b/src/code_atlas/indexing/orchestrator.py index 5577efe..8af38e1 100644 --- a/src/code_atlas/indexing/orchestrator.py +++ b/src/code_atlas/indexing/orchestrator.py @@ -18,7 +18,7 @@ from loguru import logger from code_atlas.events import Event, EventBus, FileChanged, Topic -from code_atlas.indexing.consumers import BatchPolicy, Tier2ASTConsumer, Tier3EmbedConsumer +from code_atlas.indexing.consumers import ASTConsumer, BatchPolicy, EmbedConsumer from code_atlas.parsing.ast import get_language_for_file from code_atlas.search.embeddings import EmbedCache, EmbedClient from code_atlas.settings import derive_project_name, resolve_git_dir @@ -849,41 +849,41 @@ async def _run_pipeline( project_filter: set[str] | None = None, on_drain_progress: Callable[[int, int, int], None] | None = None, reindex_mode: bool = False, -) -> Tier2ASTConsumer: - """Start inline tier consumers and wait for the pipeline to drain. +) -> ASTConsumer: + """Start inline consumers and wait for the pipeline to drain. - Returns the Tier2 consumer so callers can read accumulated stats. + Returns the AST consumer so callers can read accumulated stats. When *reindex_mode* is True, reindex-tuned policies are used for faster polling. """ - await bus.ensure_group(Topic.FILE_CHANGED, "tier2-ast") + await bus.ensure_group(Topic.FILE_CHANGED, "ast") # Reindex-tuned policies: flush immediately, short blocking reads - t2_policy = BatchPolicy(time_window_s=0, max_batch_size=30, block_ms=50) if reindex_mode else None - t3_policy = ( + ast_policy = BatchPolicy(time_window_s=0, max_batch_size=30, block_ms=50) if reindex_mode else None + embed_policy = ( BatchPolicy(time_window_s=1.0, max_batch_size=embed.batch_size, block_ms=50) if reindex_mode and embed is not None else None ) - tier2 = Tier2ASTConsumer( - bus, graph, settings, project_root=project_root, project_filter=project_filter, policy=t2_policy + ast_consumer = ASTConsumer( + bus, graph, settings, project_root=project_root, project_filter=project_filter, policy=ast_policy ) - task2 = asyncio.create_task(tier2.run()) + ast_task = asyncio.create_task(ast_consumer.run()) - tier3: Tier3EmbedConsumer | None = None - tier3_task: asyncio.Task[None] | None = None + embed_consumer: EmbedConsumer | None = None + embed_task: asyncio.Task[None] | None = None if embed is not None: - await bus.ensure_group(Topic.EMBED_DIRTY, "tier3-embed") - tier3 = Tier3EmbedConsumer( + await bus.ensure_group(Topic.EMBED_DIRTY, "embed") + embed_consumer = EmbedConsumer( bus, graph, embed, cache=cache, project_filter=project_filter, - policy=t3_policy, + policy=embed_policy, ) - tier3_task = asyncio.create_task(tier3.run()) + embed_task = asyncio.create_task(embed_consumer.run()) try: await _wait_for_drain( @@ -894,21 +894,21 @@ async def _run_pipeline( settle_s=2.0, ) finally: - tier2.stop() - if tier3 is not None: - tier3.stop() + ast_consumer.stop() + if embed_consumer is not None: + embed_consumer.stop() await asyncio.sleep(0.5) - task2.cancel() - if tier3_task is not None: - tier3_task.cancel() - for t in [task2, tier3_task]: + ast_task.cancel() + if embed_task is not None: + embed_task.cancel() + for t in [ast_task, embed_task]: if t is not None: with contextlib.suppress(asyncio.CancelledError): await t if cache is not None: await cache.close() - return tier2 + return ast_consumer @dataclass @@ -1144,7 +1144,7 @@ async def index_project( 3. Decide full vs. delta mode (git diff, threshold check) 4. Create Project + Package hierarchy in the graph 5. Publish FileChanged events to Valkey (all or delta-only) - 6. Run inline Tier 1 + Tier 2 consumers until the pipeline drains + 6. Run inline AST + Embed consumers until the pipeline drains 7. Update Project metadata (counts, git hash, delta stats) In monorepo mode, *project_name* and *project_root* override the @@ -1228,9 +1228,9 @@ async def _index_project_inner( # noqa: PLR0915 # 7. Start inline consumers and wait for drain reindex_mode = full_reindex or decision.mode == "full" - t2stats = None + ast_stats = None if published > 0: - tier2 = await _run_pipeline( + ast_consumer = await _run_pipeline( bus, graph, settings, @@ -1242,7 +1242,7 @@ async def _index_project_inner( # noqa: PLR0915 on_drain_progress=on_drain_progress, reindex_mode=reindex_mode, ) - t2stats = tier2.stats + ast_stats = ast_consumer.stats # 7. Set dependency versions on ExternalPackage nodes dep_versions = _parse_dependency_versions(project_root) @@ -1267,7 +1267,7 @@ async def _index_project_inner( # noqa: PLR0915 await graph.update_project_metadata(project_name, **metadata) duration = time.monotonic() - start - delta_stats = _build_delta_stats(decision, t2stats) if decision.mode == "delta" else None + delta_stats = _build_delta_stats(decision, ast_stats) if decision.mode == "delta" else None logger.debug( "Indexing complete ({}): {} files scanned, {} published, {} entities, {:.1f}s", @@ -1460,25 +1460,25 @@ async def _index_monorepo_inner( # noqa: PLR0912, PLR0915 # --- Start shared consumers (once for entire monorepo) --- reindex_mode = full_reindex - await bus.ensure_group(Topic.FILE_CHANGED, "tier2-ast") + await bus.ensure_group(Topic.FILE_CHANGED, "ast") - t2_policy = BatchPolicy(time_window_s=0, max_batch_size=30, block_ms=50) if reindex_mode else None - t3_policy = ( + ast_policy = BatchPolicy(time_window_s=0, max_batch_size=30, block_ms=50) if reindex_mode else None + embed_policy = ( BatchPolicy(time_window_s=1.0, max_batch_size=embed.batch_size, block_ms=50) if reindex_mode and embed is not None else None ) - tier2 = Tier2ASTConsumer(bus, graph, settings, project_root=project_root, policy=t2_policy) + ast_consumer = ASTConsumer(bus, graph, settings, project_root=project_root, policy=ast_policy) consumer_tasks: list[asyncio.Task[None]] = [] - consumer_tasks.append(asyncio.create_task(tier2.run())) + consumer_tasks.append(asyncio.create_task(ast_consumer.run())) - tier3: Tier3EmbedConsumer | None = None + embed_consumer: EmbedConsumer | None = None if embed is not None: - await bus.ensure_group(Topic.EMBED_DIRTY, "tier3-embed") - tier3 = Tier3EmbedConsumer(bus, graph, embed, cache=cache, policy=t3_policy) - consumer_tasks.append(asyncio.create_task(tier3.run())) + await bus.ensure_group(Topic.EMBED_DIRTY, "embed") + embed_consumer = EmbedConsumer(bus, graph, embed, cache=cache, policy=embed_policy) + consumer_tasks.append(asyncio.create_task(embed_consumer.run())) start = time.monotonic() publish_results: list[_ProjectPublishResult] = [] @@ -1529,7 +1529,7 @@ async def _index_monorepo_inner( # noqa: PLR0912, PLR0915 if on_progress is not None: on_progress(root_name, total, total) - # --- Wait for ALL tiers to drain (once) --- + # --- Wait for ALL stages to drain (once) --- await _wait_for_drain( bus, drain_timeout_s, @@ -1540,9 +1540,9 @@ async def _index_monorepo_inner( # noqa: PLR0912, PLR0915 finally: # --- Tear down consumers (once) --- - tier2.stop() - if tier3 is not None: - tier3.stop() + ast_consumer.stop() + if embed_consumer is not None: + embed_consumer.stop() await asyncio.sleep(0.5) for task in consumer_tasks: task.cancel() @@ -1578,7 +1578,7 @@ async def _index_monorepo_inner( # noqa: PLR0912, PLR0915 # Use shared start time — in monorepo mode all projects share one pipeline, # so per-project publish timestamps don't reflect actual processing duration. duration = time.monotonic() - start - delta_stats = _build_delta_stats(pr.decision, tier2.stats) if pr.mode == "delta" else None + delta_stats = _build_delta_stats(pr.decision, ast_consumer.stats) if pr.mode == "delta" else None results.append( IndexResult( files_scanned=pr.files_scanned, @@ -1606,16 +1606,16 @@ async def _index_monorepo_inner( # noqa: PLR0912, PLR0915 return results -def _build_delta_stats(decision: _DeltaDecision, t2stats: Any) -> DeltaStats: - """Build DeltaStats from the decision and Tier2 stats.""" +def _build_delta_stats(decision: _DeltaDecision, ast_stats: Any) -> DeltaStats: + """Build DeltaStats from the decision and AST consumer stats.""" return DeltaStats( files_added=len(decision.files_added), files_modified=len(decision.files_modified), files_deleted=len(decision.files_deleted), - entities_added=t2stats.entities_added if t2stats else 0, - entities_modified=t2stats.entities_modified if t2stats else 0, - entities_deleted=t2stats.entities_deleted if t2stats else 0, - entities_unchanged=t2stats.entities_unchanged if t2stats else 0, + entities_added=ast_stats.entities_added if ast_stats else 0, + entities_modified=ast_stats.entities_modified if ast_stats else 0, + entities_deleted=ast_stats.entities_deleted if ast_stats else 0, + entities_unchanged=ast_stats.entities_unchanged if ast_stats else 0, ) @@ -1627,7 +1627,7 @@ async def _wait_for_drain( on_drain_progress: Callable[[int, int, int], None] | None = None, settle_s: float = 2.0, ) -> None: - """Poll stream groups until Tier 2 and (optionally) Tier 3 are drained. + """Poll stream groups until AST and (optionally) Embed consumers are drained. If *on_drain_progress* is provided, it is called each poll cycle with ``(t1_remaining, t2_remaining, t3_remaining)`` so callers can display @@ -1639,9 +1639,9 @@ async def _wait_for_drain( poll_interval = 0.5 while time.monotonic() < deadline: - queries: list[tuple[Topic, str]] = [(Topic.FILE_CHANGED, "tier2-ast")] + queries: list[tuple[Topic, str]] = [(Topic.FILE_CHANGED, "ast")] if embed_enabled: - queries.append((Topic.EMBED_DIRTY, "tier3-embed")) + queries.append((Topic.EMBED_DIRTY, "embed")) infos = await bus.stream_group_info_multi(queries) diff --git a/src/code_atlas/search/embeddings.py b/src/code_atlas/search/embeddings.py index 3eb76d5..70ef286 100644 --- a/src/code_atlas/search/embeddings.py +++ b/src/code_atlas/search/embeddings.py @@ -461,7 +461,7 @@ def _build_doc_section_text(qualified_name: str, docstring: str) -> str: """Build embed text for DocSection nodes. The ``qualified_name`` encodes the header breadcrumb - (e.g. ``docs/architecture.md > Architecture > Event Pipeline > Tier 2``). + (e.g. ``docs/architecture.md > Architecture > Event Pipeline > AST Stage``). """ # Split on " > " to get file path and section headers breadcrumb_parts = qualified_name.split(" > ") diff --git a/src/code_atlas/settings.py b/src/code_atlas/settings.py index 4e9657f..75396c8 100644 --- a/src/code_atlas/settings.py +++ b/src/code_atlas/settings.py @@ -188,7 +188,7 @@ class EmbeddingSettings(BaseSettings): dimension: int | None = Field(default=None, description="Embedding vector dimension. Auto-detected when None.") batch_size: int | None = Field(default=None, description="Max texts per embedding API call. Auto from provider.") max_concurrency: int | None = Field( - default=None, description="Max concurrent embedding API calls / Tier 3 consumers. Auto from provider." + default=None, description="Max concurrent embedding API calls / embed consumers. Auto from provider." ) timeout_s: float = Field(default=30.0, description="Timeout in seconds for embedding API calls.") truncate_ratio: float = Field( diff --git a/tests/conftest.py b/tests/conftest.py index c3f5634..04e24d3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,5 +12,5 @@ """Shortened drain timeout for integration tests (default 600s is too long).""" NO_EMBED = EmbeddingSettings(enabled=False) -"""Embedding settings that disable Tier3 entirely — use for pipeline tests +"""Embedding settings that disable the embed stage entirely — use for pipeline tests that don't need real or mocked embeddings.""" diff --git a/tests/integration/indexing/test_consumers.py b/tests/integration/indexing/test_consumers.py index 3824559..53c7e61 100644 --- a/tests/integration/indexing/test_consumers.py +++ b/tests/integration/indexing/test_consumers.py @@ -17,7 +17,7 @@ Topic, decode_event, ) -from code_atlas.indexing.consumers import BatchPolicy, Tier2ASTConsumer +from code_atlas.indexing.consumers import ASTConsumer, BatchPolicy if TYPE_CHECKING: from code_atlas.graph.client import GraphClient @@ -115,23 +115,23 @@ async def test_dedup_within_batch(event_bus: EventBus) -> None: # --------------------------------------------------------------------------- -# Tier 2 consumer tests +# AST consumer tests # --------------------------------------------------------------------------- @pytest.mark.usefixtures("_clean_streams") -async def test_tier2_consumes_file_changed( +async def test_ast_consumes_file_changed( event_bus: EventBus, graph_client: GraphClient, settings: AtlasSettings, ) -> None: - """Tier 2 consumes FileChanged from the file-changed topic and writes entities to graph.""" + """AST consumer processes FileChanged from the file-changed topic and writes entities to graph.""" await graph_client.ensure_schema() - # Write a Python file for Tier 2 to parse + # Write a Python file for the AST consumer to parse _write_python_file(settings.project_root, "hello.py", "def greet(name: str) -> str:\n return f'Hello {name}'\n") - consumer = Tier2ASTConsumer( + consumer = ASTConsumer( event_bus, graph_client, settings, @@ -181,7 +181,7 @@ async def test_file_hash_gate_skips_unchanged( ) # First run: processes the file and stores its hash - c1 = Tier2ASTConsumer( + c1 = ASTConsumer( event_bus, graph_client, settings, @@ -195,7 +195,7 @@ async def test_file_hash_gate_skips_unchanged( assert c1.stats.files_processed >= 1 # Second run: same file, same content — should be skipped - c2 = Tier2ASTConsumer( + c2 = ASTConsumer( event_bus, graph_client, settings, @@ -232,7 +232,7 @@ async def test_file_hash_gate_processes_modified( ) # First run - c1 = Tier2ASTConsumer( + c1 = ASTConsumer( event_bus, graph_client, settings, @@ -249,7 +249,7 @@ async def test_file_hash_gate_processes_modified( _write_python_file(settings.project_root, "changing.py", "X = 2\nY = 3\n") # Second run: changed content — should process again - c2 = Tier2ASTConsumer( + c2 = ASTConsumer( event_bus, graph_client, settings, @@ -277,7 +277,7 @@ async def test_cooldown_defers_rapid_edits( project_name = settings.project_root.resolve().name - consumer = Tier2ASTConsumer( + consumer = ASTConsumer( event_bus, graph_client, settings, @@ -338,7 +338,7 @@ async def test_cooldown_disabled_processes_all( project_name = settings.project_root.resolve().name - consumer = Tier2ASTConsumer( + consumer = ASTConsumer( event_bus, graph_client, settings, diff --git a/tests/unit/search/test_embeddings.py b/tests/unit/search/test_embeddings.py index a588845..baa7255 100644 --- a/tests/unit/search/test_embeddings.py +++ b/tests/unit/search/test_embeddings.py @@ -10,7 +10,7 @@ import pytest from code_atlas.events import EmbedDirty, EntityRef -from code_atlas.indexing.consumers import Tier3EmbedConsumer +from code_atlas.indexing.consumers import EmbedConsumer from code_atlas.search.embeddings import EmbedCache, EmbedClient, EmbeddingError, build_embed_text from code_atlas.settings import EmbeddingSettings @@ -89,15 +89,15 @@ def test_module(self): def test_doc_section(self): props = { "_label": "DocSection", - "qualified_name": "docs/architecture.md > Architecture > Event Pipeline > Tier 2", + "qualified_name": "docs/architecture.md > Architecture > Event Pipeline > AST Stage", "kind": "", "signature": "", - "docstring": "The AST parsing tier processes file changes...", + "docstring": "The AST stage processes file changes...", } text = build_embed_text(props) assert "File: docs/architecture.md" in text - assert "Section: Architecture > Event Pipeline > Tier 2" in text - assert '"""The AST parsing tier processes file changes..."""' in text + assert "Section: Architecture > Event Pipeline > AST Stage" in text + assert '"""The AST stage processes file changes..."""' in text def test_empty_qualified_name_returns_empty(self): props = {"_label": "Callable", "qualified_name": "", "kind": "function"} @@ -353,12 +353,12 @@ async def test_health_check_failure(self): # --------------------------------------------------------------------------- -# Tier3 cache integration tests (mocked graph + embed + cache) +# Embed consumer cache integration tests (mocked graph + embed + cache) # --------------------------------------------------------------------------- -class TestTier3CacheLookup: - """Test the three-tier lookup logic in Tier3EmbedConsumer with mocks.""" +class TestEmbedCacheLookup: + """Test the three-level lookup logic in EmbedConsumer with mocks.""" @staticmethod def _make_entity_ref(qn: str) -> EntityRef: @@ -402,7 +402,7 @@ async def test_graph_hit_skips_all(self): ] ) - consumer = Tier3EmbedConsumer(bus, graph, embed, cache=cache) + consumer = EmbedConsumer(bus, graph, embed, cache=cache) entity = self._make_entity_ref("foo.bar") event = self._make_embed_dirty(entity) @@ -440,7 +440,7 @@ async def test_cache_hit_skips_embed(self): ) cache.get_many = AsyncMock(return_value={text_hash: cached_vec}) - consumer = Tier3EmbedConsumer(bus, graph, embed, cache=cache) + consumer = EmbedConsumer(bus, graph, embed, cache=cache) entity = self._make_entity_ref("foo.bar") event = self._make_embed_dirty(entity) @@ -483,7 +483,7 @@ async def test_cache_miss_embeds_and_stores(self): cache.get_many = AsyncMock(return_value={}) # no cache hit embed.embed_batch = AsyncMock(return_value=[api_vec]) - consumer = Tier3EmbedConsumer(bus, graph, embed, cache=cache) + consumer = EmbedConsumer(bus, graph, embed, cache=cache) entity = self._make_entity_ref("foo.bar") event = self._make_embed_dirty(entity) From 50101e0e6996c5ff5b0cd4dc723ab410eb8e4c8c Mon Sep 17 00:00:00 2001 From: SerPeter <43622448+serpeter@users.noreply.github.com> Date: Wed, 4 Mar 2026 00:18:42 +0100 Subject: [PATCH 6/6] =?UTF-8?q?fix(indexing):=20address=20PR=20feedback=20?= =?UTF-8?q?=E2=80=94=20rstrip,=20contract=20fix,=20public=20stats?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/code_atlas/graph/client.py | 5 ++++- src/code_atlas/indexing/consumers.py | 11 +++++++---- tests/integration/indexing/test_consumers.py | 6 +++--- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/code_atlas/graph/client.py b/src/code_atlas/graph/client.py index a2e84be..9eff43d 100644 --- a/src/code_atlas/graph/client.py +++ b/src/code_atlas/graph/client.py @@ -673,7 +673,10 @@ async def get_batch_file_hashes( "RETURN DISTINCT n.file_path AS fp, n.file_hash AS fh", {"p": project_name, "fps": file_paths}, ) - return {r["fp"]: r["fh"] for r in records} + result: dict[str, str | None] = dict.fromkeys(file_paths) + for r in records: + result[r["fp"]] = r["fh"] + return result async def set_batch_file_hashes( self, diff --git a/src/code_atlas/indexing/consumers.py b/src/code_atlas/indexing/consumers.py index 05ceda2..c676c10 100644 --- a/src/code_atlas/indexing/consumers.py +++ b/src/code_atlas/indexing/consumers.py @@ -51,12 +51,13 @@ def _compute_file_hash(source: bytes, *, strip_whitespace: bool = True) -> str: """Compute a short SHA-256 hash of file contents. - When *strip_whitespace* is True: strip leading/trailing whitespace per - line, collapse consecutive blank lines, then hash. This makes the gate - ignore formatting-only changes (e.g. ``ruff format``). + When *strip_whitespace* is True: strip trailing whitespace per line, + collapse consecutive blank lines, then hash. This makes the gate + ignore formatting-only changes (e.g. ``ruff format``) while preserving + leading indentation for indentation-sensitive languages. """ if strip_whitespace: - lines = [line.strip() for line in source.splitlines()] + lines = [line.rstrip() for line in source.splitlines()] normalized = b"\n".join(lines) normalized = _COLLAPSE_BLANK_RE.sub(b"\n\n", normalized) return hashlib.sha256(normalized).hexdigest()[:16] @@ -309,6 +310,7 @@ class ASTStats: files_processed: int = 0 files_skipped: int = 0 + files_deferred: int = 0 files_deleted: int = 0 entities_added: int = 0 entities_modified: int = 0 @@ -510,6 +512,7 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: # no else: processable.append(ev) if deferred_count: + self.stats.files_deferred += deferred_count logger.debug("AST batch {}: {} event(s) deferred by cooldown", batch_id, deferred_count) events = processable if not events: diff --git a/tests/integration/indexing/test_consumers.py b/tests/integration/indexing/test_consumers.py index 53c7e61..fb5bbdc 100644 --- a/tests/integration/indexing/test_consumers.py +++ b/tests/integration/indexing/test_consumers.py @@ -322,7 +322,7 @@ async def test_cooldown_defers_rapid_edits( # Second event deferred — files_processed should not have increased assert consumer.stats.files_processed == first_processed - assert "rapid.py" in consumer._deferred + assert consumer.stats.files_deferred >= 1 @pytest.mark.usefixtures("_clean_streams") @@ -366,5 +366,5 @@ async def test_cooldown_disabled_processes_all( consumer.stop() await asyncio.wait_for(task, timeout=5.0) - # Both events processed — no deferral - assert not consumer._deferred + # No deferral when cooldown is disabled + assert consumer.stats.files_deferred == 0