diff --git a/.gitattributes b/.gitattributes
new file mode 100644
index 0000000..066588d
--- /dev/null
+++ b/.gitattributes
@@ -0,0 +1,84 @@
+# Default
+# ==================
+* text=auto eol=lf
+
+# Python Source files
+# =================
+*.pxd text diff=python
+*.py text diff=python
+*.py3 text diff=python
+*.pyw text diff=python
+*.pyx text diff=python
+*.pyz text diff=python
+*.pyi text diff=python
+
+# Python Binary files
+# =================
+*.db binary
+*.p binary
+*.pkl binary
+*.pickle binary
+*.pyc binary export-ignore
+*.pyo binary export-ignore
+*.pyd binary
+
+# Jupyter notebook
+# =================
+*.ipynb text
+
+# ML models
+# =================
+*.h5 filter=lfs diff=lfs merge=lfs -text
+*.onnx filter=lfs diff=lfs merge=lfs -text
+*.model filter=lfs diff=lfs merge=lfs -text
+*.msgpack filter=lfs diff=lfs merge=lfs -text
+*.pb filter=lfs diff=lfs merge=lfs -text
+*.pt filter=lfs diff=lfs merge=lfs -text
+*.pth filter=lfs diff=lfs merge=lfs -text
+pytorch_model.bin filter=lfs diff=lfs merge=lfs -text
+tokenizer.json filter=lfs diff=lfs merge=lfs -text
+unigram.json filter=lfs diff=lfs merge=lfs -text
+
+# Data files
+# =================
+*.csv filter=lfs diff=lfs merge=lfs -text
+*.tsv filter=lfs diff=lfs merge=lfs -text
+*.parquet filter=lfs diff=lfs merge=lfs -text
+
+# Presentation files
+# =================
+*.pptx filter=lfs diff=lfs merge=lfs -text
+*.docx filter=lfs diff=lfs merge=lfs -text
+*.xlsx filter=lfs diff=lfs merge=lfs -text
+*.xls filter=lfs diff=lfs merge=lfs -text
+*.pdf filter=lfs diff=lfs merge=lfs -text
+
+# Archives
+# =================
+*.7z filter=lfs diff=lfs merge=lfs -text
+*.br filter=lfs diff=lfs merge=lfs -text
+*.gz filter=lfs diff=lfs merge=lfs -text
+*.tar filter=lfs diff=lfs merge=lfs -text
+*.tgz filter=lfs diff=lfs merge=lfs -text
+*.tar.gz filter=lfs diff=lfs merge=lfs -text
+*.zip filter=lfs diff=lfs merge=lfs -text
+*.rar filter=lfs diff=lfs merge=lfs -text
+
+# Image files
+# =================
+*.jpg filter=lfs diff=lfs merge=lfs -text
+*.jpeg filter=lfs diff=lfs merge=lfs -text
+*.png filter=lfs diff=lfs merge=lfs -text
+*.gif filter=lfs diff=lfs merge=lfs -text
+*.webp filter=lfs diff=lfs merge=lfs -text
+*.bmp filter=lfs diff=lfs merge=lfs -text
+*.svg filter=lfs diff=lfs merge=lfs -text
+*.tiff filter=lfs diff=lfs merge=lfs -text
+
+# Other
+# =================
+*.exe filter=lfs diff=lfs merge=lfs -text
+# Windows scripts - keep CRLF
+*.bat text eol=crlf
+*.cmd text eol=crlf
+*.ps1 text eol=crlf
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 454257b..07d273f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,7 +21,7 @@
- **ci**: Resolve ty check failures with --all-extras in CI
([`3f74816`](https://github.com/SerPeter/code-atlas/commit/3f7481635091d2d676aed75c3fbcaa5db4332242))
-- **consumers**: Group batches by project in Tier1/Tier2
+- **consumers**: Group batches by project in AST/Embed consumers
([#2](https://github.com/SerPeter/code-atlas/pull/2),
[`5107b24`](https://github.com/SerPeter/code-atlas/commit/5107b24a7dfbcb44cadc7917f632ae6a9743c057))
@@ -190,7 +190,7 @@
- **docs**: Add markdown parser with tree-sitter-markdown
([`e8d372c`](https://github.com/SerPeter/code-atlas/commit/e8d372c162652d6d73d1f66da5e14a61fcb2136a))
-- **embeddings**: Add EmbedClient with litellm routing and Tier 3 pipeline
+- **embeddings**: Add EmbedClient with litellm routing and embed pipeline
([`ad7c972`](https://github.com/SerPeter/code-atlas/commit/ad7c9726f2e48fdb8746b50547089c5c483bcb75))
- **embeddings**: Add three-tier embedding cache with Valkey backend
@@ -241,7 +241,7 @@
- **naming**: Worktree-aware naming and monorepo sub-project prefixing
([`2acdfb3`](https://github.com/SerPeter/code-atlas/commit/2acdfb33ba4b486f966272a01cf8a37f670661f6))
-- **parser**: Add py-tree-sitter parser, implement Tier 2 pipeline, drop Rust
+- **parser**: Add py-tree-sitter parser, implement AST pipeline, drop Rust
([`d56e7d2`](https://github.com/SerPeter/code-atlas/commit/d56e7d2a686ec279a52d85bbc4903f4d85f51a4e))
- **parsing**: Add multi-language support (10 languages, 7 modules)
diff --git a/CLAUDE.md b/CLAUDE.md
index c4c639a..41ca211 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -51,7 +51,7 @@ src/code_atlas/
├── __init__.py # __version__ only
├── schema.py # Graph schema (labels, relationships, DDL generators)
├── settings.py # Pydantic configuration (atlas.toml + env vars)
-├── events.py # Event types (FileChanged, ASTDirty, EmbedDirty) + Valkey Streams EventBus
+├── events.py # Event types (FileChanged, EmbedDirty) + Valkey Streams EventBus
├── telemetry.py # OpenTelemetry integration
├── cli.py # Typer CLI entrypoint (index, search, status, mcp, daemon commands)
│
@@ -69,7 +69,7 @@ src/code_atlas/
│
├── indexing/
│ ├── orchestrator.py # Full-index, monorepo detection, staleness checking
-│ ├── consumers.py # Tier 1/2/3 event consumers (batch-pull pattern)
+│ ├── consumers.py # AST + Embed event consumers (batch-pull pattern)
│ ├── watcher.py # Filesystem watcher (watchfiles + hybrid debounce)
│ └── daemon.py # Daemon lifecycle manager (watcher + pipeline)
│
@@ -78,13 +78,13 @@ src/code_atlas/
└── health.py # Infrastructure health checks + diagnostics
```
-**Event Pipeline:** File Watcher → Valkey Streams → Tier 1 (graph metadata) → Tier 2 (AST diff + gate) → Tier 3 (embeddings) → Memgraph
+**Event Pipeline:** File Watcher → Valkey Streams → AST stage (hash gate + parse + diff) → Embed stage (embeddings) → Memgraph
**Query Pipeline:** MCP Server → Query Router → [Graph Search | Vector Search | BM25 Search] → RRF Fusion → Results
**Deployment:** Daemon (`atlas daemon start`) for indexing + MCP (`atlas mcp`) per agent session, decoupled via Valkey + Memgraph
-**Event model:** Events are atomic — one logical change per event (one file per ASTDirty, one entity per EmbedDirty). Never bundle lists of work items into a single event; use `EventBus.publish_many()` for network-efficient batch publishing. The consumer's `max_batch_size` must directly control work volume, not just message count.
+**Event model:** Events are atomic — one logical change per event (one file per FileChanged, one entity per EmbedDirty). Never bundle lists of work items into a single event; use `EventBus.publish_many()` for network-efficient batch publishing. The consumer's `max_batch_size` must directly control work volume, not just message count.
**Infrastructure:** Memgraph (graph DB, port 7687), TEI (embeddings, port 8080), Valkey (event bus, port 6379)
diff --git a/docs/adr/0004-event-driven-tiered-pipeline.md b/docs/adr/0004-event-driven-tiered-pipeline.md
index b817c98..3664c2f 100644
--- a/docs/adr/0004-event-driven-tiered-pipeline.md
+++ b/docs/adr/0004-event-driven-tiered-pipeline.md
@@ -45,53 +45,50 @@ Redis Streams provide the pub/sub backbone with consumer groups:
Typed frozen dataclasses with JSON serialization for Redis transport:
- `FileChanged(path, change_type, timestamp)` — published by file watcher
-- `ASTDirty(paths, batch_id)` — published by Tier 1
-- `EmbedDirty(entities: list[EntityRef], significance, batch_id)` — published by Tier 2
+- `EmbedDirty(entities: list[EntityRef], significance, batch_id)` — published by AST stage
-### Three-Stream Pipeline
+### Two-Stage Pipeline
```
- atlas:file-changed atlas:ast-dirty atlas:embed-dirty
- stream stream stream
- │ │ │
- ┌──────▼───────┐ ┌──────▼───────┐ ┌──────▼───────┐
- File Watcher ────► │ Tier 1 │ ──────► │ Tier 2 │ ─gate─►│ Tier 3 │
- │ Graph Metadata│ always │ AST Diff + │ only │ Embeddings │
- │ (0.5s batch) │ │ Graph Update │ if sig │ (15s batch) │
- └──────────────┘ │ (3s batch) │ change └──────────────┘
- └──────────────┘
+ atlas:file-changed atlas:embed-dirty
+ stream stream
+ │ │
+ ┌──────▼───────┐ ┌──────▼───────┐
+ File Watcher ────► │ AST Stage │ ─────── significance gate ───► │ Embed Stage │
+ │ hash gate + │ only if semantically changed │ Embeddings │
+ │ parse + diff │ │ (15s batch) │
+ │ (3s batch) │ └──────────────┘
+ └──────────────┘
```
-Each tier pulls at its own pace via `XREADGROUP`, deduplicates within its batch window, and publishes downstream only if
-warranted.
+Each stage pulls at its own pace via `XREADGROUP`, deduplicates within its batch window, and publishes downstream only
+if warranted.
### Per-Consumer Batch Policy
-| Tier | Window | Max Batch | Dedup Key |
-| -------------- | ------ | --------- | --------------------- |
-| Tier 1 (Graph) | 0.5s | 50 | File path |
-| Tier 2 (AST) | 3.0s | 20 | File path |
-| Tier 3 (Embed) | 15.0s | 100 | Entity qualified name |
+| Stage | Window | Max Batch | Dedup Key |
+| ----- | ------ | --------- | --------------------- |
+| AST | 3.0s | 30 | File path |
+| Embed | 15.0s | 100 | Entity qualified name |
Hybrid batching: flush when count OR time threshold hit, whichever first. Same file changed 5× in window = 1 work item.
### Event Data Flow
```
-FileChanged ASTDirty EmbedDirty
-┌─────────────┐ ┌──────────────────┐ ┌──────────────────────────┐
-│ path: str │ │ paths: [str] │ │ entities: [EntityRef] │
-│ change_type │ ─Tier 1──► │ batch_id: str │ ─Tier 2─►│ significance: str │
-│ timestamp │ └──────────────────┘ gate │ batch_id: str │
-└─────────────┘ └──────────────────────────┘
- EntityRef:
- qualified_name, node_type,
- file_path
+FileChanged EmbedDirty
+┌─────────────┐ ┌──────────────────────────┐
+│ path: str │ │ entity: EntityRef │
+│ change_type │ ─── AST stage ── sig gate ────────► │ significance: str │
+│ timestamp │ └──────────────────────────┘
+└─────────────┘ EntityRef:
+ qualified_name, node_type,
+ file_path
```
-### Significance Gating (Tier 2 → 3)
+### Significance Gating (AST → Embed)
-Tier 2 evaluates whether a change is semantically significant enough to warrant re-embedding:
+The AST stage evaluates whether a change is semantically significant enough to warrant re-embedding:
| Condition | Level | Action |
| --------------------------- | -------- | ------------------- |
@@ -115,25 +112,25 @@ own retries through this mechanism, avoiding the need for a separate dead-letter
- Cheap operations (staleness flags, graph metadata) are near-instant — MCP queries reflect changes within ~1s
- Expensive operations (embeddings) only run when semantically justified — significant cost reduction
- Decoupled stages can be developed, tested, and scaled independently
-- Batching per tier matches the cost profile of each operation
+- Batching per stage matches the cost profile of each operation
- Multi-process from day one — no rewrite needed when scaling
- Dual-use of Valkey for event bus + embedding cache
-- Natural extension point: new tiers or event types can be added without restructuring
+- Natural extension point: new stages or event types can be added without restructuring
### Negative
- More architectural complexity than a simple "reindex everything on change"
- Significance threshold heuristics need tuning and may produce false negatives (skipping re-embeds that should have
happened)
-- Debugging event flow across tiers is harder than a linear pipeline
+- Debugging event flow across stages is harder than a linear pipeline
- Additional infrastructure dependency (Valkey), though lightweight
### Risks
- Threshold tuning: too aggressive = stale embeddings, too conservative = excessive TEI calls. Need observability on
gate decisions.
-- Event ordering: if Tier 2 processes file A before file B, but B depends on A's entities, the diff may be incorrect.
- Batch boundaries must align with dependency boundaries.
+- Event ordering: if the AST stage processes file A before file B, but B depends on A's entities, the diff may be
+ incorrect. Batch boundaries must align with dependency boundaries.
- Complexity creep: the event bus must stay simple. If we find ourselves adding routing rules, dead-letter queues, or
retry logic, we've gone too far.
diff --git a/docs/adr/0005-deployment-process-model.md b/docs/adr/0005-deployment-process-model.md
index 3807d8c..ce4e57e 100644
--- a/docs/adr/0005-deployment-process-model.md
+++ b/docs/adr/0005-deployment-process-model.md
@@ -95,12 +95,12 @@ decoupled via Valkey Streams and Memgraph:
└───────┬────────┘
▼
┌────────────────┐
- │ Create Consumer│ Idempotent XGROUP CREATE for all 3 streams
+ │ Create Consumer│ Idempotent XGROUP CREATE for pipeline streams
│ Groups │
└───────┬────────┘
▼
┌────────────────┐
- │ Start Tier │ asyncio.gather(tier1.run(), tier2.run(), tier3.run())
+ │ Start Pipeline │ asyncio.gather(ast.run(), embed.run())
│ Consumers │
└───────┬────────┘
▼
@@ -111,7 +111,7 @@ decoupled via Valkey Streams and Memgraph:
▼
┌────────────────┐ Git-based fast path: diff stored_commit..HEAD
│ Reconcile │ Fallback: mtime comparison for non-git or rebases
- │ (progressive) │ Enqueue stale files → Tier 1 → 2 → 3
+ │ (progressive) │ Enqueue stale files → AST → Embed
└───────┬────────┘
▼
┌────────────────┐
@@ -230,17 +230,11 @@ Queries: Agent calls MCP tools ─────► Memgraph ◄──── Da
### Data Flow at Runtime
```
- ┌──────────┐ FileChanged ┌─────────┐ ASTDirty ┌─────────┐
- │ File │ ──► events ────────► │ Tier 1 │ ──► events ─────────► │ Tier 2 │
- │ Watcher │ (Valkey Stream) │ (graph) │ (Valkey Stream) │ (AST) │
- └──────────┘ └─────────┘ └────┬────┘
- gate │
- EmbedDirty│
- (if sig) │
- ┌────▼────┐
- │ Tier 3 │
- │ (embed) │
- └────┬────┘
+ ┌──────────┐ FileChanged ┌───────────┐ EmbedDirty ┌───────────┐
+ │ File │ ──► events ────────► │ AST Stage │ ──► events ──────► │ Embed │
+ │ Watcher │ (Valkey Stream) │ (parse) │ (Valkey Stream) │ Stage │
+ └──────────┘ └─────┬─────┘ └─────┬────┘
+ │ │
│
┌──────────┐ │
Agent ◄──── MCP Server ◄──── reads │ Memgraph │ ◄──── writes ────────┘
diff --git a/docs/adr/0006-pure-python-tree-sitter.md b/docs/adr/0006-pure-python-tree-sitter.md
index 0b4adc0..e217c52 100644
--- a/docs/adr/0006-pure-python-tree-sitter.md
+++ b/docs/adr/0006-pure-python-tree-sitter.md
@@ -17,7 +17,7 @@ actual cost breakdown:
- **Subprocess overhead** (spawn, JSON serialization, IPC) exceeded the parse time itself for typical files
- **Build complexity** required both `uv` and `cargo` toolchains in dev/CI/Docker
- **Contributor friction** — Rust was isolated to one component, but still required a full toolchain install
-- **Parallelism** is already handled by the event bus (multiple Tier 2 consumer instances via Valkey Streams), not by
+- **Parallelism** is already handled by the event bus (multiple AST consumer instances via Valkey Streams), not by
Rust's threading model
Meanwhile, `py-tree-sitter` uses the exact same C parsing library (tree-sitter) via Python bindings. The grammar
@@ -25,13 +25,13 @@ packages (`tree-sitter-python`, etc.) ship pre-compiled wheels — no compilatio
## Decision
-Drop the Rust binary (`crates/atlas-parser`) and use **py-tree-sitter** called in-process within the Tier 2 pipeline
+Drop the Rust binary (`crates/atlas-parser`) and use **py-tree-sitter** called in-process within the AST pipeline
consumer. The parser module lives at `src/code_atlas/parser.py`.
### Architecture
```
-Tier 2 Consumer
+AST Consumer
└── parser.parse_file(path, source, project_name)
└── tree-sitter C engine (via py-tree-sitter bindings)
└── tree-sitter-python grammar (pre-compiled wheel)
@@ -39,7 +39,7 @@ Tier 2 Consumer
### Parallelism Model
-Multiple Tier 2 consumer instances can run concurrently — each pulls from the `atlas:ast-dirty` Valkey Stream via its
+Multiple AST consumer instances can run concurrently — each pulls from the `atlas:file-changed` Valkey Stream via its
own consumer group member. This gives process-level parallelism without the GIL concern, since each consumer is an
independent process.
diff --git a/docs/architecture.md b/docs/architecture.md
index 0239dc3..5754cc4 100644
--- a/docs/architecture.md
+++ b/docs/architecture.md
@@ -25,9 +25,8 @@ graph TB
subgraph "Code Atlas — Daemon"
FW[File Watcher]
- T1[Tier 1: Graph Metadata]
- T2[Tier 2: AST Diff]
- T3[Tier 3: Embeddings]
+ AST[AST Stage]
+ EMB[Embed Stage]
end
subgraph "Code Atlas — MCP"
@@ -72,15 +71,12 @@ graph TB
BS --> MG
FW --> VK
- VK --> T1
- T1 --> VK
- VK --> T2
- T2 --> VK
- VK --> T3
- T1 --> MG
- T2 --> MG
- T3 --> MG
- T3 --> TEI
+ VK --> AST
+ AST --> VK
+ VK --> EMB
+ AST --> MG
+ EMB --> MG
+ EMB --> TEI
```
## Component Architecture
@@ -111,34 +107,28 @@ Spawned per agent session via `atlas mcp`, it reads Memgraph directly with no de
### Event-Driven Pipeline
-The indexing pipeline is event-driven, with three tiers of increasing cost connected via Valkey (Redis) Streams. Each
-tier pulls at its own pace, deduplicates within its batch window, and gates downstream work based on significance.
+The indexing pipeline is event-driven, with two stages of increasing cost connected via Valkey (Redis) Streams. Each
+stage pulls at its own pace, deduplicates within its batch window, and gates downstream work based on significance.
```mermaid
graph LR
FW[File Watcher] -->|FileChanged| S1[atlas:file-changed]
- S1 -->|XREADGROUP| T1[Tier 1: Graph Metadata
0.5s batch, dedup by path]
- T1 -->|ASTDirty| S2[atlas:ast-dirty]
- S2 -->|XREADGROUP| T2[Tier 2: AST Diff
3s batch, significance gate]
- T2 -->|EmbedDirty| S3[atlas:embed-dirty]
- S3 -->|XREADGROUP| T3[Tier 3: Embeddings
15s batch, dedup by entity]
- T1 -->|write| MG[(Memgraph)]
- T2 -->|write| MG
- T3 -->|write| MG
- T3 -->|embed| TEI[TEI]
+ S1 -->|XREADGROUP| AST[AST Stage
hash gate + parse + diff]
+ AST -->|EmbedDirty| S2[atlas:embed-dirty]
+ S2 -->|XREADGROUP| EMB[Embed Stage
dedup by entity]
+ AST -->|write| MG[(Memgraph)]
+ EMB -->|write| MG
+ EMB -->|embed| TEI[TEI]
```
-**Tier 1 — Graph Metadata** (cheap, ~0.5s batch): Updates file node timestamps and staleness flags. Always publishes
-`ASTDirty` downstream.
+**AST Stage** (medium cost, ~3s batch): Applies a file hash gate to skip unchanged files, re-parses AST via tree-sitter,
+diffs entities, updates graph nodes/edges. Evaluates a significance gate — trivial changes (whitespace, formatting) stop
+here; semantic changes (signature, body, docstring) publish `EmbedDirty` to the Embed stage.
-**Tier 2 — AST Diff** (medium, ~3s batch): Re-parses AST via tree-sitter, diffs entities, updates graph nodes/edges.
-Evaluates a significance gate — trivial changes (whitespace, formatting) stop here; semantic changes (signature, body,
-docstring) publish `EmbedDirty` to Tier 3.
+**Embed Stage** (expensive, ~15s batch): Re-embeds affected entities via TEI, writes vectors to Memgraph. Deduplicates
+by entity qualified name across all events in the batch.
-**Tier 3 — Embeddings** (expensive, ~15s batch): Re-embeds affected entities via TEI, writes vectors to Memgraph.
-Deduplicates by entity qualified name across all events in the batch.
-
-**Significance Gate (Tier 2 → 3):**
+**Significance Gate (AST → Embed):**
- Whitespace/formatting only → stop
- Non-docstring comment → stop
@@ -259,17 +249,17 @@ MCP server is spawned per agent session.
```bash
docker compose up -d # Memgraph (7687) + Valkey (6379)
docker compose --profile tei up -d # Include TEI (8080) for local embeddings
-atlas daemon start # File watcher + tier consumers (long-running)
+atlas daemon start # File watcher + AST/Embed consumers (long-running)
atlas mcp # MCP server — stdio (Claude Code, Cursor)
atlas mcp --transport http # MCP server — Streamable HTTP (VS Code, JetBrains)
```
-The daemon publishes file change events to Valkey Streams, where tier consumers process them and write to Memgraph. The
-MCP server reads Memgraph directly — no dependency on the daemon for queries, so agents can query immediately even with
-a stale index.
+The daemon publishes file change events to Valkey Streams, where the AST and Embed consumers process them and write to
+Memgraph. The MCP server reads Memgraph directly — no dependency on the daemon for queries, so agents can query
+immediately even with a stale index.
On startup, the daemon runs a reconciliation pass: compares filesystem state against the index and enqueues stale files
-through the pipeline progressively (Tier 1 first, then 2, then 3).
+through the pipeline progressively (AST stage first, then Embed stage).
See [ADR-0005](adr/0005-deployment-process-model.md) for full rationale.
diff --git a/docs/benchmarks.md b/docs/benchmarks.md
index dcbcd14..e90716f 100644
--- a/docs/benchmarks.md
+++ b/docs/benchmarks.md
@@ -5,14 +5,14 @@
Measured on the code-atlas repo (107 Python files, 2,706 entities) with local TEI embeddings. See
`scripts/profile_index.py --full`.
-| Stage | Wall Time | Notes |
-| ------------------- | --------- | ---------------------------------------------- |
-| Scan + packages | 0.1s | File discovery + package hierarchy |
-| Tier 2 (AST+graph) | ~26s | Parse, upsert, resolve imports/calls/types |
-| Tier 3 (embeddings) | ~52s | Embed API + graph writes (8 concurrent) |
-| **Total** | **55s** | Embedding-bound; cached reindex is much faster |
-
-Tier 2 and Tier 3 overlap — embedding starts as soon as the first batch of entities is written. The bottleneck is the
+| Stage | Wall Time | Notes |
+| ------------------ | --------- | ---------------------------------------------- |
+| Scan + packages | 0.1s | File discovery + package hierarchy |
+| AST (parse+graph) | ~26s | Parse, upsert, resolve imports/calls/types |
+| Embed (embeddings) | ~52s | Embed API + graph writes (8 concurrent) |
+| **Total** | **55s** | Embedding-bound; cached reindex is much faster |
+
+AST and Embed stages overlap — embedding starts as soon as the first batch of entities is written. The bottleneck is the
embedding API (75.8s cumulative across 8 workers, ~3.2s avg per batch of 128 entities).
## Parse-Only Throughput
diff --git a/docs/guides/repo-guidelines.md b/docs/guides/repo-guidelines.md
index 8bcf9d8..1681f0d 100644
--- a/docs/guides/repo-guidelines.md
+++ b/docs/guides/repo-guidelines.md
@@ -103,7 +103,7 @@ invisible to AST parsing.
```python
# Do
-class Tier2Consumer(TierConsumer): ...
+class ASTConsumer(TierConsumer): ...
# Don't
Consumer = make_consumer(TierConsumer, features=["ast"])
diff --git a/scripts/profile_index.py b/scripts/profile_index.py
index 1d42eb9..138445d 100644
--- a/scripts/profile_index.py
+++ b/scripts/profile_index.py
@@ -209,25 +209,25 @@ def patch_all():
orch._wait_for_drain = timed_async("orch.wait_for_drain")(orch._wait_for_drain)
# --- Parser ---
- parser_mod.parse_file = timed_sync("tier2.parse_file")(parser_mod.parse_file)
+ parser_mod.parse_file = timed_sync("ast.parse_file")(parser_mod.parse_file)
# --- Detectors ---
- det_mod.run_detectors = timed_async("tier2.run_detectors")(det_mod.run_detectors)
+ det_mod.run_detectors = timed_async("ast.run_detectors")(det_mod.run_detectors)
# --- Graph client methods ---
- gc.GraphClient.upsert_file_entities = timed_async("tier2.upsert_file_entities")(gc.GraphClient.upsert_file_entities)
- gc.GraphClient.upsert_batch_entities = timed_async("tier2.upsert_batch_entities")(
+ gc.GraphClient.upsert_file_entities = timed_async("ast.upsert_file_entities")(gc.GraphClient.upsert_file_entities)
+ gc.GraphClient.upsert_batch_entities = timed_async("ast.upsert_batch_entities")(
gc.GraphClient.upsert_batch_entities
)
- gc.GraphClient.resolve_imports = timed_async("tier2.resolve_imports")(gc.GraphClient.resolve_imports)
- gc.GraphClient.build_resolution_lookup = timed_async("tier2.build_resolution_lookup")(
+ gc.GraphClient.resolve_imports = timed_async("ast.resolve_imports")(gc.GraphClient.resolve_imports)
+ gc.GraphClient.build_resolution_lookup = timed_async("ast.build_resolution_lookup")(
gc.GraphClient.build_resolution_lookup
)
- gc.GraphClient.resolve_calls = timed_async("tier2.resolve_calls")(gc.GraphClient.resolve_calls)
- gc.GraphClient.resolve_type_refs = timed_async("tier2.resolve_type_refs")(gc.GraphClient.resolve_type_refs)
+ gc.GraphClient.resolve_calls = timed_async("ast.resolve_calls")(gc.GraphClient.resolve_calls)
+ gc.GraphClient.resolve_type_refs = timed_async("ast.resolve_type_refs")(gc.GraphClient.resolve_type_refs)
gc.GraphClient.merge_package_batch = timed_async("orch.merge_package_batch")(gc.GraphClient.merge_package_batch)
- gc.GraphClient.delete_file_entities = timed_async("tier2.delete_file_entities")(gc.GraphClient.delete_file_entities)
- gc.GraphClient.apply_property_enrichments = timed_async("tier2.apply_enrichments")(
+ gc.GraphClient.delete_file_entities = timed_async("ast.delete_file_entities")(gc.GraphClient.delete_file_entities)
+ gc.GraphClient.apply_property_enrichments = timed_async("ast.apply_enrichments")(
gc.GraphClient.apply_property_enrichments
)
@@ -249,16 +249,16 @@ def patch_all():
gc.GraphClient._recreate_batch_relationships
)
- # --- Tier 3 ---
- gc.GraphClient.read_entity_texts = timed_async("tier3.read_entity_texts")(gc.GraphClient.read_entity_texts)
- gc.GraphClient.write_embeddings_and_hashes = timed_async("tier3.write_embeddings_and_hashes")(
+ # --- Embed stage ---
+ gc.GraphClient.read_entity_texts = timed_async("embed.read_entity_texts")(gc.GraphClient.read_entity_texts)
+ gc.GraphClient.write_embeddings_and_hashes = timed_async("embed.write_embeddings_and_hashes")(
gc.GraphClient.write_embeddings_and_hashes
)
- emb_mod.EmbedClient.embed_batch = timed_async("tier3.embed_api")(emb_mod.EmbedClient.embed_batch)
+ emb_mod.EmbedClient.embed_batch = timed_async("embed.embed_api")(emb_mod.EmbedClient.embed_batch)
- # Tier 3 cache
- emb_mod.EmbedCache.get_many = timed_async("tier3.cache_get_many")(emb_mod.EmbedCache.get_many)
- emb_mod.EmbedCache.put_many = timed_async("tier3.cache_put_many")(emb_mod.EmbedCache.put_many)
+ # Embed cache
+ emb_mod.EmbedCache.get_many = timed_async("embed.cache_get_many")(emb_mod.EmbedCache.get_many)
+ emb_mod.EmbedCache.put_many = timed_async("embed.cache_put_many")(emb_mod.EmbedCache.put_many)
# --- EventBus ---
EventBus.publish_many = timed_async("bus.publish_many")(EventBus.publish_many)
@@ -348,17 +348,17 @@ def print_report(wall_time: float):
"orch.wait_for_drain",
"orch.merge_package_batch",
],
- "Tier 2 (AST + Graph)": [
- "tier2.parse_file",
- "tier2.run_detectors",
- "tier2.upsert_file_entities",
- "tier2.upsert_batch_entities",
- "tier2.delete_file_entities",
- "tier2.apply_enrichments",
- "tier2.resolve_imports",
- "tier2.build_resolution_lookup",
- "tier2.resolve_calls",
- "tier2.resolve_type_refs",
+ "AST Stage (Parse + Graph)": [
+ "ast.parse_file",
+ "ast.run_detectors",
+ "ast.upsert_file_entities",
+ "ast.upsert_batch_entities",
+ "ast.delete_file_entities",
+ "ast.apply_enrichments",
+ "ast.resolve_imports",
+ "ast.build_resolution_lookup",
+ "ast.resolve_calls",
+ "ast.resolve_type_refs",
# Upsert inner breakdown
"upsert.get_hashes",
"upsert.get_batch_hashes",
@@ -369,12 +369,12 @@ def print_report(wall_time: float):
"upsert.recreate_rels",
"upsert.recreate_batch_rels",
],
- "Tier 3 (Embeddings)": [
- "tier3.read_entity_texts",
- "tier3.cache_get_many",
- "tier3.embed_api",
- "tier3.cache_put_many",
- "tier3.write_embeddings_and_hashes",
+ "Embed Stage (Embeddings)": [
+ "embed.read_entity_texts",
+ "embed.cache_get_many",
+ "embed.embed_api",
+ "embed.cache_put_many",
+ "embed.write_embeddings_and_hashes",
],
"Event Bus (Valkey)": [
"bus.publish_many",
diff --git a/src/code_atlas/events.py b/src/code_atlas/events.py
index d856afe..aeab3bd 100644
--- a/src/code_atlas/events.py
+++ b/src/code_atlas/events.py
@@ -44,25 +44,16 @@ class EntityRef:
file_path: str
-@dataclass(frozen=True)
-class ASTDirty:
- """A single file needs AST re-parsing (published by Tier 1, consumed by Tier 2)."""
-
- path: str
- project_name: str = "" # monorepo sub-project (forwarded from FileChanged)
- project_root: str = "" # absolute path to project root (forwarded from FileChanged)
-
-
@dataclass(frozen=True)
class EmbedDirty:
- """A single entity needs re-embedding (published by Tier 2, consumed by Tier 3)."""
+ """A single entity needs re-embedding (published by AST stage, consumed by Embed stage)."""
entity: EntityRef
significance: str # "MODERATE" | "HIGH"
# Type alias for any pipeline event
-Event = FileChanged | ASTDirty | EmbedDirty
+Event = FileChanged | EmbedDirty
class Significance(StrEnum):
@@ -83,14 +74,12 @@ class Topic(StrEnum):
"""Redis Stream keys for the pipeline."""
FILE_CHANGED = "file-changed"
- AST_DIRTY = "ast-dirty"
EMBED_DIRTY = "embed-dirty"
# Map topic → event class for deserialization
_TOPIC_EVENT_MAP: dict[Topic, type[Event]] = {
Topic.FILE_CHANGED: FileChanged,
- Topic.AST_DIRTY: ASTDirty,
Topic.EMBED_DIRTY: EmbedDirty,
}
diff --git a/src/code_atlas/graph/client.py b/src/code_atlas/graph/client.py
index 13aa59b..9eff43d 100644
--- a/src/code_atlas/graph/client.py
+++ b/src/code_atlas/graph/client.py
@@ -655,6 +655,46 @@ async def merge_project_node(self, project_name: str, **metadata: Any) -> None:
props,
)
+ async def get_batch_file_hashes(
+ self,
+ project_name: str,
+ file_paths: list[str],
+ ) -> dict[str, str | None]:
+ """Return ``{file_path: file_hash}`` for Module/Package nodes in one RTT.
+
+ Returns ``None`` for files that have no stored hash.
+ """
+ if not file_paths:
+ return {}
+ records = await self.execute(
+ f"UNWIND $fps AS fp "
+ f"MATCH (n {{project_name: $p, file_path: fp}}) "
+ f"WHERE n:{NodeLabel.MODULE} OR n:{NodeLabel.PACKAGE} "
+ "RETURN DISTINCT n.file_path AS fp, n.file_hash AS fh",
+ {"p": project_name, "fps": file_paths},
+ )
+ result: dict[str, str | None] = dict.fromkeys(file_paths)
+ for r in records:
+ result[r["fp"]] = r["fh"]
+ return result
+
+ async def set_batch_file_hashes(
+ self,
+ project_name: str,
+ file_hashes: dict[str, str],
+ ) -> None:
+ """Write ``file_hash`` on Module/Package nodes for each file path."""
+ if not file_hashes:
+ return
+ params = [{"fp": fp, "fh": fh} for fp, fh in file_hashes.items()]
+ await self.execute_write(
+ f"UNWIND $items AS item "
+ f"MATCH (n {{project_name: $p, file_path: item.fp}}) "
+ f"WHERE n:{NodeLabel.MODULE} OR n:{NodeLabel.PACKAGE} "
+ "SET n.file_hash = item.fh",
+ {"p": project_name, "items": params},
+ )
+
async def merge_package_node(self, project_name: str, qualified_name: str, name: str, file_path: str) -> None:
"""Create or update a Package node by uid."""
uid = f"{project_name}:{qualified_name}"
diff --git a/src/code_atlas/indexing/__init__.py b/src/code_atlas/indexing/__init__.py
index 7da88c7..cbefce9 100644
--- a/src/code_atlas/indexing/__init__.py
+++ b/src/code_atlas/indexing/__init__.py
@@ -3,10 +3,9 @@
from __future__ import annotations
from code_atlas.indexing.consumers import (
+ ASTConsumer,
BatchPolicy,
- Tier1GraphConsumer,
- Tier2ASTConsumer,
- Tier3EmbedConsumer,
+ EmbedConsumer,
TierConsumer,
)
from code_atlas.indexing.daemon import DaemonManager
@@ -25,18 +24,17 @@
from code_atlas.indexing.watcher import FileWatcher
__all__ = [
+ "ASTConsumer",
"BatchPolicy",
"DaemonManager",
"DeltaStats",
"DetectedProject",
+ "EmbedConsumer",
"FileScope",
"FileWatcher",
"IndexResult",
"StalenessChecker",
"StalenessInfo",
- "Tier1GraphConsumer",
- "Tier2ASTConsumer",
- "Tier3EmbedConsumer",
"TierConsumer",
"classify_file_project",
"detect_sub_projects",
diff --git a/src/code_atlas/indexing/consumers.py b/src/code_atlas/indexing/consumers.py
index 8be2407..c676c10 100644
--- a/src/code_atlas/indexing/consumers.py
+++ b/src/code_atlas/indexing/consumers.py
@@ -1,17 +1,18 @@
-"""Tiered consumer pipeline for event-driven indexing.
+"""Two-stage consumer pipeline for event-driven indexing.
-Three tiers form a linear pipeline, each pulling at its own pace:
+ FileChanged → AST stage (hash gate + AST parse + diff)
+ → significance gate → EmbedDirty → Embed stage (embeddings)
- FileChanged → Tier 1 (graph metadata) → ASTDirty → Tier 2 (AST parse)
- → significance gate → EmbedDirty → Tier 3 (embeddings)
-
-Each tier uses batch-pull with configurable time/count policy and
+Each stage uses batch-pull with configurable time/count policy and
deduplicates within its batch window.
"""
from __future__ import annotations
import asyncio
+import contextlib
+import hashlib
+import re
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass
@@ -21,7 +22,6 @@
from loguru import logger
from code_atlas.events import (
- ASTDirty,
EmbedDirty,
EntityRef,
Event,
@@ -45,6 +45,25 @@
_tracer = get_tracer(__name__)
+_COLLAPSE_BLANK_RE = re.compile(rb"\n{3,}")
+
+
+def _compute_file_hash(source: bytes, *, strip_whitespace: bool = True) -> str:
+ """Compute a short SHA-256 hash of file contents.
+
+ When *strip_whitespace* is True: strip trailing whitespace per line,
+ collapse consecutive blank lines, then hash. This makes the gate
+ ignore formatting-only changes (e.g. ``ruff format``) while preserving
+ leading indentation for indentation-sensitive languages.
+ """
+ if strip_whitespace:
+ lines = [line.rstrip() for line in source.splitlines()]
+ normalized = b"\n".join(lines)
+ normalized = _COLLAPSE_BLANK_RE.sub(b"\n\n", normalized)
+ return hashlib.sha256(normalized).hexdigest()[:16]
+ return hashlib.sha256(source).hexdigest()[:16]
+
+
# ---------------------------------------------------------------------------
# Batch policy
# ---------------------------------------------------------------------------
@@ -105,7 +124,7 @@ def _matches_project(self, event: Event) -> bool:
if self._project_filter is None:
return True
pn = ""
- if isinstance(event, (FileChanged, ASTDirty)):
+ if isinstance(event, FileChanged):
pn = event.project_name
elif isinstance(event, EmbedDirty):
# EmbedDirty doesn't carry project_name directly — always accept
@@ -261,59 +280,10 @@ async def run(self) -> None: # noqa: PLR0912, PLR0915
# ---------------------------------------------------------------------------
-# Tier 1: Graph metadata (cheap, fast)
+# AST stage: parse + graph write (medium cost)
# ---------------------------------------------------------------------------
-
-class Tier1GraphConsumer(TierConsumer):
- """Tier 1: Update file-level graph metadata, always publish ASTDirty downstream."""
-
- def __init__(
- self, bus: EventBus, graph: GraphClient, settings: AtlasSettings, *, project_filter: set[str] | None = None
- ) -> None:
- super().__init__(
- bus=bus,
- input_topic=Topic.FILE_CHANGED,
- group="tier1-graph",
- consumer_name="tier1-graph-0",
- policy=BatchPolicy(time_window_s=0.5, max_batch_size=50),
- project_filter=project_filter,
- )
- self.graph = graph
- self.settings = settings
-
- def dedup_key(self, event: Event) -> str:
- if isinstance(event, FileChanged):
- return event.path
- return super().dedup_key(event)
-
- async def process_batch(self, events: list[Event], batch_id: str) -> None:
- with _tracer.start_as_current_span("tier1.process_batch", attributes={"batch_id": batch_id}):
- # Group files by (project_name, project_root) — monorepo batches can mix sub-projects
- groups: dict[tuple[str, str], list[str]] = {}
- for e in events:
- if isinstance(e, FileChanged):
- key = (e.project_name, e.project_root)
- groups.setdefault(key, []).append(e.path)
-
- total = sum(len(p) for p in groups.values())
- logger.debug("Tier1 batch {}: {} file(s) in {} group(s)", batch_id, total, len(groups))
-
- # TODO: Update Memgraph file nodes (timestamps, staleness flags)
-
- # Publish one ASTDirty per file — Tier 2 decides significance
- for (project_name, project_root), paths in groups.items():
- await self.bus.publish_many(
- Topic.AST_DIRTY,
- [ASTDirty(path=p, project_name=project_name, project_root=project_root) for p in paths],
- )
-
-
-# ---------------------------------------------------------------------------
-# Tier 2: AST parse + graph write (medium cost)
-# ---------------------------------------------------------------------------
-
-# Significance levels for the Tier 2 → Tier 3 gate
+# Significance levels for the AST → Embed gate
#
# | Condition | Level | Action |
# |----------------------------------|----------|---------------|
@@ -335,11 +305,12 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None:
@dataclass
-class Tier2Stats:
- """Accumulated delta statistics for Tier 2 processing."""
+class ASTStats:
+ """Accumulated delta statistics for AST stage processing."""
files_processed: int = 0
files_skipped: int = 0
+ files_deferred: int = 0
files_deleted: int = 0
entities_added: int = 0
entities_modified: int = 0
@@ -365,8 +336,8 @@ class _ParsedFileData:
)
-class Tier2ASTConsumer(TierConsumer):
- """Tier 2: Parse AST via tree-sitter, write entities to graph, publish EmbedDirty."""
+class ASTConsumer(TierConsumer):
+ """AST stage: Parse AST via tree-sitter, write entities to graph, publish EmbedDirty."""
def __init__(
self,
@@ -377,21 +348,28 @@ def __init__(
project_root: Path | None = None,
project_filter: set[str] | None = None,
policy: BatchPolicy | None = None,
+ cooldown_s: float = 0.0,
) -> None:
super().__init__(
bus=bus,
- input_topic=Topic.AST_DIRTY,
- group="tier2-ast",
- consumer_name="tier2-ast-0",
+ input_topic=Topic.FILE_CHANGED,
+ group="ast",
+ consumer_name="ast-0",
policy=policy or BatchPolicy(time_window_s=3.0, max_batch_size=30),
project_filter=project_filter,
)
self.graph = graph
self.settings = settings
self._project_root = project_root or Path(settings.project_root)
- self.stats = Tier2Stats()
+ self.stats = ASTStats()
self._detectors = get_enabled_detectors(settings.detectors.enabled)
+ # Per-file cooldown state (daemon mode)
+ self._cooldown_s = cooldown_s
+ self._cooldowns: dict[str, float] = {} # file_path → expiry (monotonic)
+ self._deferred: dict[str, FileChanged] = {} # file_path → latest deferred event
+ self._deferred_drain_task: asyncio.Task[None] | None = None
+
# Deferred resolution state — accumulate rels across batches, flush periodically.
# In reindex mode (time_window_s=0, block_ms=50) use larger intervals to skip
# redundant resolution; daemon mode (default policy) resolves every batch.
@@ -406,10 +384,36 @@ def __init__(
self._pending_project_names: set[str] = set()
def dedup_key(self, event: Event) -> str:
- if isinstance(event, ASTDirty):
+ if isinstance(event, FileChanged):
return event.path
return super().dedup_key(event)
+ async def _pre_run(self) -> None:
+ if self._cooldown_s > 0:
+ self._deferred_drain_task = asyncio.create_task(self._drain_deferred_loop())
+
+ async def _post_run(self) -> None:
+ if self._deferred_drain_task is not None:
+ self._deferred_drain_task.cancel()
+ with contextlib.suppress(asyncio.CancelledError):
+ await self._deferred_drain_task
+ self._deferred_drain_task = None
+
+ async def _drain_deferred_loop(self) -> None:
+ """Re-publish deferred events whose cooldown has expired.
+
+ Runs as a background task so deferred events are eventually
+ processed even if no new events arrive to trigger ``process_batch``.
+ """
+ while not self._stop:
+ await asyncio.sleep(2.0)
+ now = asyncio.get_event_loop().time()
+ expired = [(fp, ev) for fp, ev in self._deferred.items() if now >= self._cooldowns.get(fp, 0)]
+ for fp, _ in expired:
+ del self._deferred[fp]
+ if expired:
+ await self.bus.publish_many(Topic.FILE_CHANGED, [ev for _, ev in expired])
+
async def run(self) -> None:
try:
await super().run()
@@ -452,26 +456,30 @@ async def _parse_file(
file_path: str,
*,
project_root: Path | None = None,
+ source: bytes | None = None,
) -> _ParsedFileData | None:
"""Parse and detect a single file without graph writes.
Returns ``None`` for unreadable/unsupported files, ``_SENTINEL_DELETED``
for deleted files, or a ``_ParsedFileData`` with parsed results.
+
+ If *source* is provided, it is used directly (avoids re-reading from disk
+ when the hash gate has already read the file).
"""
root = project_root if project_root is not None else self._project_root
- full_path = root / file_path
- if not full_path.is_file():
- return _SENTINEL_DELETED
-
- try:
- source = full_path.read_bytes()
- except OSError:
- logger.warning("Tier2: cannot read {}", file_path)
- return None
+ if source is None:
+ full_path = root / file_path
+ if not full_path.is_file():
+ return _SENTINEL_DELETED
+ try:
+ source = full_path.read_bytes()
+ except OSError:
+ logger.warning("AST: cannot read {}", file_path)
+ return None
parsed = parse_file(file_path, source, project_name, max_source_chars=self.settings.index.max_source_chars)
if parsed is None:
- logger.debug("Tier2: unsupported language for {}", file_path)
+ logger.debug("AST: unsupported language for {}", file_path)
return None
det_result = await run_detectors(self._detectors, parsed, project_name, self.graph)
@@ -489,11 +497,31 @@ async def _parse_file(
)
async def process_batch(self, events: list[Event], batch_id: str) -> None: # noqa: PLR0912, PLR0915
- with _tracer.start_as_current_span("tier2.process_batch", attributes={"batch_id": batch_id}) as span:
+ with _tracer.start_as_current_span("ast.process_batch", attributes={"batch_id": batch_id}) as span:
+ # Per-file cooldown filter: defer events for recently-processed files
+ if self._cooldown_s > 0:
+ now = asyncio.get_event_loop().time()
+ # Clean expired cooldowns
+ self._cooldowns = {fp: exp for fp, exp in self._cooldowns.items() if exp > now}
+ processable: list[Event] = []
+ deferred_count = 0
+ for ev in events:
+ if isinstance(ev, FileChanged) and ev.path in self._cooldowns:
+ self._deferred[ev.path] = ev # latest overwrites
+ deferred_count += 1
+ else:
+ processable.append(ev)
+ if deferred_count:
+ self.stats.files_deferred += deferred_count
+ logger.debug("AST batch {}: {} event(s) deferred by cooldown", batch_id, deferred_count)
+ events = processable
+ if not events:
+ return
+
# Group paths by (project_name, project_root) — monorepo batches can mix sub-projects
groups: dict[tuple[str, str], list[str]] = {}
for e in events:
- if isinstance(e, ASTDirty):
+ if isinstance(e, FileChanged):
key = (e.project_name, e.project_root)
groups.setdefault(key, []).append(e.path)
@@ -501,7 +529,7 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: # no
groups = {key: list(dict.fromkeys(paths)) for key, paths in groups.items()}
total_paths = sum(len(p) for p in groups.values())
- logger.debug("Tier2 batch {}: {} unique path(s) in {} group(s)", batch_id, total_paths, len(groups))
+ logger.debug("AST batch {}: {} unique path(s) in {} group(s)", batch_id, total_paths, len(groups))
embed_candidates: dict[str, tuple[EntityRef, str]] = {} # uid → (ref, text_hash)
skipped_before = self.stats.files_skipped
@@ -511,15 +539,66 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: # no
for (event_project_name, event_project_root), unique_paths in groups.items():
project_name = event_project_name or derive_project_name(Path(self.settings.project_root))
effective_root = Path(event_project_root) if event_project_root else None
+ root = effective_root if effective_root is not None else self._project_root
+
+ # 0. File hash gate — read files, compute hashes, skip unchanged
+ use_hash_gate = self.settings.index.file_hash_gate
+ strip_ws = self.settings.index.strip_whitespace
+ file_sources: dict[str, bytes] = {} # file_path → source bytes (pre-read)
+ deleted_files: list[str] = []
+
+ # Separate deleted files (always process) and read live files
+ live_paths: list[str] = []
+ for fp in unique_paths:
+ full_path = root / fp
+ if not full_path.is_file():
+ deleted_files.append(fp)
+ else:
+ try:
+ file_sources[fp] = full_path.read_bytes()
+ live_paths.append(fp)
+ except OSError:
+ logger.warning("AST: cannot read {}", fp)
+
+ # Apply hash gate to live files
+ if use_hash_gate and live_paths:
+ new_hashes = {
+ fp: _compute_file_hash(file_sources[fp], strip_whitespace=strip_ws) for fp in live_paths
+ }
+ stored_hashes = await self.graph.get_batch_file_hashes(project_name, live_paths)
+
+ gate_passed: list[str] = []
+ for fp in live_paths:
+ stored = stored_hashes.get(fp)
+ if stored is not None and stored == new_hashes[fp]:
+ self.stats.files_skipped += 1
+ else:
+ gate_passed.append(fp)
+
+ hash_skipped = len(live_paths) - len(gate_passed)
+ if hash_skipped:
+ logger.debug(
+ "AST batch {}: hash gate skipped {}/{} file(s)",
+ batch_id,
+ hash_skipped,
+ len(live_paths),
+ )
+ live_paths = gate_passed
+ else:
+ new_hashes = {}
# 1. Parse loop (async, per-file) — no graph writes
parsed_files: dict[str, _ParsedFileData] = {}
- deleted_files: list[str] = []
- for file_idx, file_path in enumerate(unique_paths, 1):
+ for file_idx, file_path in enumerate(live_paths, 1):
if file_idx % 50 == 0:
- logger.debug("Tier2 batch {}: parsed {}/{} files", batch_id, file_idx, len(unique_paths))
- pfd = await self._parse_file(project_name, file_path, project_root=effective_root)
+ logger.debug("AST batch {}: parsed {}/{} files", batch_id, file_idx, len(live_paths))
+ pfd = await self._parse_file(
+ project_name,
+ file_path,
+ project_root=effective_root,
+ source=file_sources.get(file_path),
+ )
if pfd is _SENTINEL_DELETED:
deleted_files.append(file_path)
elif pfd is not None:
@@ -527,7 +606,7 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: # no
# 2. Handle deleted files
for fp in deleted_files:
- logger.debug("Tier2: file deleted, removing entities for {}", fp)
+ logger.debug("AST: file deleted, removing entities for {}", fp)
deleted = await self.graph.delete_file_entities(project_name, fp)
self.stats.files_deleted += 1
self.stats.entities_deleted += len(deleted)
@@ -608,7 +687,13 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: # no
text_hash = EmbedCache.hash_text(text)
embed_candidates[entity.qualified_name] = (ref, text_hash)
- # 6. Accumulate rels for deferred resolution
+ # 6. Write back file hashes for processed files
+ if new_hashes:
+ hashes_to_write = {fp: new_hashes[fp] for fp in parsed_files if fp in new_hashes}
+ if hashes_to_write:
+ await self.graph.set_batch_file_hashes(project_name, hashes_to_write)
+
+ # 7. Accumulate rels for deferred resolution
group_import_rels = [r for pfd in parsed_files.values() for r in pfd.import_rels]
group_call_rels = [r for pfd in parsed_files.values() for r in pfd.call_rels]
group_type_rels = [r for pfd in parsed_files.values() for r in pfd.type_rels]
@@ -618,6 +703,12 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: # no
self._pending_type_rels.extend(group_type_rels)
self._pending_project_names.add(project_name)
+ # 8. Set per-file cooldown for processed files
+ if self._cooldown_s > 0:
+ expiry = asyncio.get_event_loop().time() + self._cooldown_s
+ for fp in list(parsed_files) + deleted_files:
+ self._cooldowns[fp] = expiry
+
self._batches_since_resolve += 1
now = asyncio.get_event_loop().time()
if (
@@ -630,7 +721,7 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: # no
span.set_attribute("entities_changed", total_changed)
logger.debug(
- "Tier2 batch {}: {} files, {} skipped, {} entities changed",
+ "AST batch {}: {} files, {} skipped, {} entities changed",
batch_id,
total_paths,
self.stats.files_skipped - skipped_before,
@@ -664,14 +755,14 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None: # no
# ---------------------------------------------------------------------------
-# Tier 3: Embeddings (expensive, heavily batched)
+# Embed stage: Embeddings (expensive, heavily batched)
# ---------------------------------------------------------------------------
-class Tier3EmbedConsumer(TierConsumer):
- """Tier 3: Re-embed entities via TEI. Deduplicates by qualified name.
+class EmbedConsumer(TierConsumer):
+ """Embed stage: Re-embed entities via TEI. Deduplicates by qualified name.
- Implements a three-tier lookup to minimize expensive embedding API calls:
+ Implements a three-level lookup to minimize expensive embedding API calls:
1. **Graph hit** — node already has ``embed_hash`` matching current text (free).
2. **Valkey cache hit** — vector stored in Redis from a previous run (1 round-trip).
3. **API call** — embed via TEI / cloud provider (expensive).
@@ -692,8 +783,8 @@ def __init__(
super().__init__(
bus=bus,
input_topic=Topic.EMBED_DIRTY,
- group="tier3-embed",
- consumer_name="tier3-embed",
+ group="embed",
+ consumer_name="embed",
policy=policy
or BatchPolicy(
time_window_s=10.0,
@@ -787,7 +878,7 @@ async def _embed_and_store(self, need_embed: list[tuple[str, str, str]]) -> list
return result
async def process_batch(self, events: list[Event], batch_id: str) -> None:
- with _tracer.start_as_current_span("tier3.process_batch", attributes={"batch_id": batch_id}) as span:
+ with _tracer.start_as_current_span("embed.process_batch", attributes={"batch_id": batch_id}) as span:
# Collect and deduplicate entities across all events in the batch
seen: dict[str, EntityRef] = {}
for e in events:
@@ -795,7 +886,7 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None:
seen[e.entity.qualified_name] = e.entity
entities = list(seen.values())
- logger.debug("Tier3 batch {}: {} unique entity(ies)", batch_id, len(entities))
+ logger.debug("Embed batch {}: {} unique entity(ies)", batch_id, len(entities))
if not entities:
return
@@ -829,7 +920,7 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None:
if not to_process:
elapsed = asyncio.get_event_loop().time() - t0
logger.debug(
- "Tier3 batch {}: {} entities, {} graph hits, 0 cache hits, 0 embedded ({:.1f}s)",
+ "Embed batch {}: {} entities, {} graph hits, 0 cache hits, 0 embedded ({:.1f}s)",
batch_id,
total,
graph_hits,
@@ -845,10 +936,10 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None:
# Serialized via _write_lock to avoid Memgraph write-lock contention.
all_resolved = cache_resolved + api_vectors
if all_resolved:
- with _tracer.start_as_current_span("tier3.write_lock_wait"):
+ with _tracer.start_as_current_span("embed.write_lock_wait"):
await self._write_lock.acquire()
try:
- with _tracer.start_as_current_span("tier3.write_embeddings"):
+ with _tracer.start_as_current_span("embed.write_embeddings"):
write_labels = [uid_to_label[uid] for uid, _, _ in all_resolved] if uid_to_label else None
await self.graph.write_embeddings_and_hashes(all_resolved, labels=write_labels)
finally:
@@ -863,7 +954,7 @@ async def process_batch(self, events: list[Event], batch_id: str) -> None:
get_metrics().embedding_latency.record(elapsed)
logger.debug(
- "Tier3 batch {}: {} entities, {} graph hits, {} cache hits, {} embedded ({:.1f}s)",
+ "Embed batch {}: {} entities, {} graph hits, {} cache hits, {} embedded ({:.1f}s)",
batch_id,
total,
graph_hits,
diff --git a/src/code_atlas/indexing/daemon.py b/src/code_atlas/indexing/daemon.py
index 46cc28a..eebec20 100644
--- a/src/code_atlas/indexing/daemon.py
+++ b/src/code_atlas/indexing/daemon.py
@@ -1,7 +1,7 @@
"""Daemon manager — reusable watcher + pipeline lifecycle.
Encapsulates the EventBus, FileWatcher, EmbedClient, EmbedCache,
-and Tier 1/2/3 consumers. Used by both the CLI (``atlas watch``,
+and AST/Embed consumers. Used by both the CLI (``atlas watch``,
``atlas daemon start``) and the MCP server for auto-indexing.
"""
@@ -14,7 +14,7 @@
from loguru import logger
from code_atlas.events import EventBus
-from code_atlas.indexing.consumers import Tier1GraphConsumer, Tier2ASTConsumer, Tier3EmbedConsumer
+from code_atlas.indexing.consumers import ASTConsumer, EmbedConsumer
from code_atlas.indexing.orchestrator import FileScope, detect_sub_projects
from code_atlas.indexing.watcher import FileWatcher
from code_atlas.search.embeddings import EmbedCache, EmbedClient
@@ -31,9 +31,7 @@ class DaemonManager:
_bus: EventBus | None = field(default=None, repr=False)
_watcher: FileWatcher | None = field(default=None, repr=False)
- _consumers: list[Tier1GraphConsumer | Tier2ASTConsumer | Tier3EmbedConsumer] = field(
- default_factory=list, repr=False
- )
+ _consumers: list[ASTConsumer | EmbedConsumer] = field(default_factory=list, repr=False)
_tasks: list[asyncio.Task[None]] = field(default_factory=list, repr=False)
_cache: EmbedCache | None = field(default=None, repr=False)
_embed: EmbedClient | None = field(default=None, repr=False)
@@ -79,12 +77,11 @@ async def start(
cache = EmbedCache(settings.redis, settings.embeddings)
self._cache = cache
- consumers: list[Tier1GraphConsumer | Tier2ASTConsumer | Tier3EmbedConsumer] = [
- Tier1GraphConsumer(bus, graph, settings),
- Tier2ASTConsumer(bus, graph, settings),
+ consumers: list[ASTConsumer | EmbedConsumer] = [
+ ASTConsumer(bus, graph, settings, cooldown_s=settings.watcher.cooldown_s),
]
if embed is not None:
- consumers.append(Tier3EmbedConsumer(bus, graph, embed, cache=cache))
+ consumers.append(EmbedConsumer(bus, graph, embed, cache=cache))
self._consumers = consumers
if include_watcher:
@@ -146,7 +143,7 @@ async def _run_watcher(self) -> None:
logger.exception("File watcher crashed")
@staticmethod
- async def _run_consumer(consumer: Tier1GraphConsumer | Tier2ASTConsumer | Tier3EmbedConsumer) -> None:
+ async def _run_consumer(consumer: ASTConsumer | EmbedConsumer) -> None:
"""Run a consumer, catching exceptions so one failure doesn't crash the rest."""
try:
await consumer.run()
diff --git a/src/code_atlas/indexing/orchestrator.py b/src/code_atlas/indexing/orchestrator.py
index 6f1ba80..8af38e1 100644
--- a/src/code_atlas/indexing/orchestrator.py
+++ b/src/code_atlas/indexing/orchestrator.py
@@ -17,8 +17,8 @@
import pathspec
from loguru import logger
-from code_atlas.events import ASTDirty, Event, EventBus, FileChanged, Topic
-from code_atlas.indexing.consumers import BatchPolicy, Tier1GraphConsumer, Tier2ASTConsumer, Tier3EmbedConsumer
+from code_atlas.events import Event, EventBus, FileChanged, Topic
+from code_atlas.indexing.consumers import ASTConsumer, BatchPolicy, EmbedConsumer
from code_atlas.parsing.ast import get_language_for_file
from code_atlas.search.embeddings import EmbedCache, EmbedClient
from code_atlas.settings import derive_project_name, resolve_git_dir
@@ -848,50 +848,42 @@ async def _run_pipeline(
project_root: Path | None = None,
project_filter: set[str] | None = None,
on_drain_progress: Callable[[int, int, int], None] | None = None,
- skip_tier1: bool = False,
-) -> Tier2ASTConsumer:
- """Start inline tier consumers and wait for the pipeline to drain.
+ reindex_mode: bool = False,
+) -> ASTConsumer:
+ """Start inline consumers and wait for the pipeline to drain.
- Returns the Tier2 consumer so callers can read accumulated stats.
- When *skip_tier1* is True, Tier 1 is not created and reindex-tuned
- policies are used for faster polling.
+ Returns the AST consumer so callers can read accumulated stats.
+ When *reindex_mode* is True, reindex-tuned policies are used for
+ faster polling.
"""
- tier1: Tier1GraphConsumer | None = None
- task1: asyncio.Task[None] | None = None
-
- if not skip_tier1:
- await bus.ensure_group(Topic.FILE_CHANGED, "tier1-graph")
- tier1 = Tier1GraphConsumer(bus, graph, settings, project_filter=project_filter)
- task1 = asyncio.create_task(tier1.run())
-
- await bus.ensure_group(Topic.AST_DIRTY, "tier2-ast")
+ await bus.ensure_group(Topic.FILE_CHANGED, "ast")
# Reindex-tuned policies: flush immediately, short blocking reads
- t2_policy = BatchPolicy(time_window_s=0, max_batch_size=30, block_ms=50) if skip_tier1 else None
- t3_policy = (
+ ast_policy = BatchPolicy(time_window_s=0, max_batch_size=30, block_ms=50) if reindex_mode else None
+ embed_policy = (
BatchPolicy(time_window_s=1.0, max_batch_size=embed.batch_size, block_ms=50)
- if skip_tier1 and embed is not None
+ if reindex_mode and embed is not None
else None
)
- tier2 = Tier2ASTConsumer(
- bus, graph, settings, project_root=project_root, project_filter=project_filter, policy=t2_policy
+ ast_consumer = ASTConsumer(
+ bus, graph, settings, project_root=project_root, project_filter=project_filter, policy=ast_policy
)
- task2 = asyncio.create_task(tier2.run())
+ ast_task = asyncio.create_task(ast_consumer.run())
- tier3: Tier3EmbedConsumer | None = None
- tier3_task: asyncio.Task[None] | None = None
+ embed_consumer: EmbedConsumer | None = None
+ embed_task: asyncio.Task[None] | None = None
if embed is not None:
- await bus.ensure_group(Topic.EMBED_DIRTY, "tier3-embed")
- tier3 = Tier3EmbedConsumer(
+ await bus.ensure_group(Topic.EMBED_DIRTY, "embed")
+ embed_consumer = EmbedConsumer(
bus,
graph,
embed,
cache=cache,
project_filter=project_filter,
- policy=t3_policy,
+ policy=embed_policy,
)
- tier3_task = asyncio.create_task(tier3.run())
+ embed_task = asyncio.create_task(embed_consumer.run())
try:
await _wait_for_drain(
@@ -899,29 +891,24 @@ async def _run_pipeline(
drain_timeout_s,
embed_enabled=embed is not None,
on_drain_progress=on_drain_progress,
- skip_tier1=skip_tier1,
- settle_s=2.0 if skip_tier1 else 8.0,
+ settle_s=2.0,
)
finally:
- if tier1 is not None:
- tier1.stop()
- tier2.stop()
- if tier3 is not None:
- tier3.stop()
+ ast_consumer.stop()
+ if embed_consumer is not None:
+ embed_consumer.stop()
await asyncio.sleep(0.5)
- if task1 is not None:
- task1.cancel()
- task2.cancel()
- if tier3_task is not None:
- tier3_task.cancel()
- for t in [task1, task2, tier3_task]:
+ ast_task.cancel()
+ if embed_task is not None:
+ embed_task.cancel()
+ for t in [ast_task, embed_task]:
if t is not None:
with contextlib.suppress(asyncio.CancelledError):
await t
if cache is not None:
await cache.close()
- return tier2
+ return ast_consumer
@dataclass
@@ -1095,25 +1082,10 @@ async def _publish_events(
*,
project_name: str = "",
project_root: str = "",
- skip_tier1: bool = False,
) -> int:
- """Publish FileChanged (or ASTDirty when skip_tier1) events and return the count published."""
- if skip_tier1:
- # Publish ASTDirty directly, bypassing Tier 1
- if mode == "delta":
- events: list[Event] = [
- ASTDirty(path=fp, project_name=project_name, project_root=project_root)
- for fp in decision.files_added | decision.files_modified | decision.files_deleted
- ]
- else:
- events = [ASTDirty(path=fp, project_name=project_name, project_root=project_root) for fp in files]
- if events:
- await bus.publish_many(Topic.AST_DIRTY, events)
- logger.debug("Published {} ASTDirty events (skip_tier1, {})", len(events), mode)
- return len(events)
-
+ """Publish FileChanged events and return the count published."""
if mode == "delta":
- events = []
+ events: list[Event] = []
events.extend(
FileChanged(path=fp, change_type="created", project_name=project_name, project_root=project_root)
for fp in decision.files_added
@@ -1172,7 +1144,7 @@ async def index_project(
3. Decide full vs. delta mode (git diff, threshold check)
4. Create Project + Package hierarchy in the graph
5. Publish FileChanged events to Valkey (all or delta-only)
- 6. Run inline Tier 1 + Tier 2 consumers until the pipeline drains
+ 6. Run inline AST + Embed consumers until the pipeline drains
7. Update Project metadata (counts, git hash, delta stats)
In monorepo mode, *project_name* and *project_root* override the
@@ -1251,16 +1223,14 @@ async def _index_project_inner( # noqa: PLR0915
# 5. Sort files for optimal resolution order (deep modules before __init__.py)
files = _sort_files_for_indexing(files)
- # 6. Publish events (skip Tier 1 for full reindex — ASTDirty directly)
- skip_tier1 = full_reindex or decision.mode == "full"
- published = await _publish_events(
- bus, decision.mode, files, decision, project_name=project_name, skip_tier1=skip_tier1
- )
+ # 6. Publish events
+ published = await _publish_events(bus, decision.mode, files, decision, project_name=project_name)
- # 6. Start inline consumers and wait for drain
- t2stats = None
+ # 7. Start inline consumers and wait for drain
+ reindex_mode = full_reindex or decision.mode == "full"
+ ast_stats = None
if published > 0:
- tier2 = await _run_pipeline(
+ ast_consumer = await _run_pipeline(
bus,
graph,
settings,
@@ -1270,9 +1240,9 @@ async def _index_project_inner( # noqa: PLR0915
project_root=project_root,
project_filter={project_name},
on_drain_progress=on_drain_progress,
- skip_tier1=skip_tier1,
+ reindex_mode=reindex_mode,
)
- t2stats = tier2.stats
+ ast_stats = ast_consumer.stats
# 7. Set dependency versions on ExternalPackage nodes
dep_versions = _parse_dependency_versions(project_root)
@@ -1297,7 +1267,7 @@ async def _index_project_inner( # noqa: PLR0915
await graph.update_project_metadata(project_name, **metadata)
duration = time.monotonic() - start
- delta_stats = _build_delta_stats(decision, t2stats) if decision.mode == "delta" else None
+ delta_stats = _build_delta_stats(decision, ast_stats) if decision.mode == "delta" else None
logger.debug(
"Indexing complete ({}): {} files scanned, {} published, {} entities, {:.1f}s",
@@ -1377,7 +1347,6 @@ async def _publish_project(
files: list[str],
*,
full_reindex: bool = False,
- skip_tier1: bool = False,
) -> _ProjectPublishResult:
"""Scan, decide delta/full, create packages, and publish events for one project.
@@ -1400,10 +1369,7 @@ async def _publish_project(
# Sort files for optimal resolution order
files = _sort_files_for_indexing(files)
- # Determine skip_tier1 from caller hint or decision mode
- effective_skip = skip_tier1 or decision.mode == "full"
-
- # Publish events (ASTDirty directly when skipping Tier 1)
+ # Publish events
published = await _publish_events(
bus,
decision.mode,
@@ -1411,7 +1377,6 @@ async def _publish_project(
decision,
project_name=project_name,
project_root=str(project_root),
- skip_tier1=effective_skip,
)
return _ProjectPublishResult(
@@ -1493,34 +1458,27 @@ async def _index_monorepo_inner( # noqa: PLR0912, PLR0915
await cache.clear()
# --- Start shared consumers (once for entire monorepo) ---
- skip_tier1 = full_reindex # full reindex publishes ASTDirty directly
-
- tier1: Tier1GraphConsumer | None = None
- if not skip_tier1:
- await bus.ensure_group(Topic.FILE_CHANGED, "tier1-graph")
- tier1 = Tier1GraphConsumer(bus, graph, settings)
+ reindex_mode = full_reindex
- await bus.ensure_group(Topic.AST_DIRTY, "tier2-ast")
+ await bus.ensure_group(Topic.FILE_CHANGED, "ast")
- t2_policy = BatchPolicy(time_window_s=0, max_batch_size=30, block_ms=50) if skip_tier1 else None
- t3_policy = (
+ ast_policy = BatchPolicy(time_window_s=0, max_batch_size=30, block_ms=50) if reindex_mode else None
+ embed_policy = (
BatchPolicy(time_window_s=1.0, max_batch_size=embed.batch_size, block_ms=50)
- if skip_tier1 and embed is not None
+ if reindex_mode and embed is not None
else None
)
- tier2 = Tier2ASTConsumer(bus, graph, settings, project_root=project_root, policy=t2_policy)
+ ast_consumer = ASTConsumer(bus, graph, settings, project_root=project_root, policy=ast_policy)
consumer_tasks: list[asyncio.Task[None]] = []
- if tier1 is not None:
- consumer_tasks.append(asyncio.create_task(tier1.run()))
- consumer_tasks.append(asyncio.create_task(tier2.run()))
+ consumer_tasks.append(asyncio.create_task(ast_consumer.run()))
- tier3: Tier3EmbedConsumer | None = None
+ embed_consumer: EmbedConsumer | None = None
if embed is not None:
- await bus.ensure_group(Topic.EMBED_DIRTY, "tier3-embed")
- tier3 = Tier3EmbedConsumer(bus, graph, embed, cache=cache, policy=t3_policy)
- consumer_tasks.append(asyncio.create_task(tier3.run()))
+ await bus.ensure_group(Topic.EMBED_DIRTY, "embed")
+ embed_consumer = EmbedConsumer(bus, graph, embed, cache=cache, policy=embed_policy)
+ consumer_tasks.append(asyncio.create_task(embed_consumer.run()))
start = time.monotonic()
publish_results: list[_ProjectPublishResult] = []
@@ -1542,7 +1500,6 @@ async def _index_monorepo_inner( # noqa: PLR0912, PLR0915
sub.root,
sub_files,
full_reindex=full_reindex,
- skip_tier1=skip_tier1,
)
publish_results.append(pr)
@@ -1566,30 +1523,26 @@ async def _index_monorepo_inner( # noqa: PLR0912, PLR0915
project_root,
root_only_files,
full_reindex=full_reindex,
- skip_tier1=skip_tier1,
)
publish_results.append(root_pr)
if on_progress is not None:
on_progress(root_name, total, total)
- # --- Wait for ALL tiers to drain (once) ---
+ # --- Wait for ALL stages to drain (once) ---
await _wait_for_drain(
bus,
drain_timeout_s,
embed_enabled=embed is not None,
on_drain_progress=on_drain_progress,
- skip_tier1=skip_tier1,
- settle_s=2.0 if skip_tier1 else 8.0,
+ settle_s=2.0,
)
finally:
# --- Tear down consumers (once) ---
- if tier1 is not None:
- tier1.stop()
- tier2.stop()
- if tier3 is not None:
- tier3.stop()
+ ast_consumer.stop()
+ if embed_consumer is not None:
+ embed_consumer.stop()
await asyncio.sleep(0.5)
for task in consumer_tasks:
task.cancel()
@@ -1625,7 +1578,7 @@ async def _index_monorepo_inner( # noqa: PLR0912, PLR0915
# Use shared start time — in monorepo mode all projects share one pipeline,
# so per-project publish timestamps don't reflect actual processing duration.
duration = time.monotonic() - start
- delta_stats = _build_delta_stats(pr.decision, tier2.stats) if pr.mode == "delta" else None
+ delta_stats = _build_delta_stats(pr.decision, ast_consumer.stats) if pr.mode == "delta" else None
results.append(
IndexResult(
files_scanned=pr.files_scanned,
@@ -1653,16 +1606,16 @@ async def _index_monorepo_inner( # noqa: PLR0912, PLR0915
return results
-def _build_delta_stats(decision: _DeltaDecision, t2stats: Any) -> DeltaStats:
- """Build DeltaStats from the decision and Tier2 stats."""
+def _build_delta_stats(decision: _DeltaDecision, ast_stats: Any) -> DeltaStats:
+ """Build DeltaStats from the decision and AST consumer stats."""
return DeltaStats(
files_added=len(decision.files_added),
files_modified=len(decision.files_modified),
files_deleted=len(decision.files_deleted),
- entities_added=t2stats.entities_added if t2stats else 0,
- entities_modified=t2stats.entities_modified if t2stats else 0,
- entities_deleted=t2stats.entities_deleted if t2stats else 0,
- entities_unchanged=t2stats.entities_unchanged if t2stats else 0,
+ entities_added=ast_stats.entities_added if ast_stats else 0,
+ entities_modified=ast_stats.entities_modified if ast_stats else 0,
+ entities_deleted=ast_stats.entities_deleted if ast_stats else 0,
+ entities_unchanged=ast_stats.entities_unchanged if ast_stats else 0,
)
@@ -1672,42 +1625,35 @@ async def _wait_for_drain(
*,
embed_enabled: bool = True,
on_drain_progress: Callable[[int, int, int], None] | None = None,
- skip_tier1: bool = False,
settle_s: float = 2.0,
) -> None:
- """Poll stream groups until Tier 1, Tier 2, and (optionally) Tier 3 are drained.
-
- When *skip_tier1* is True, Tier 1 polling is skipped (events go directly
- to Tier 2).
+    """Poll stream groups until the AST consumer and (optionally) the embed consumer are drained.
If *on_drain_progress* is provided, it is called each poll cycle with
``(t1_remaining, t2_remaining, t3_remaining)`` so callers can display
- pipeline progress to the user.
+ pipeline progress to the user. ``t1_remaining`` is always 0 (kept for
+ callback signature compatibility).
"""
deadline = time.monotonic() + timeout_s
settled_since: float | None = None
poll_interval = 0.5
while time.monotonic() < deadline:
- queries: list[tuple[Topic, str]] = []
- if not skip_tier1:
- queries.append((Topic.FILE_CHANGED, "tier1-graph"))
- queries.append((Topic.AST_DIRTY, "tier2-ast"))
+ queries: list[tuple[Topic, str]] = [(Topic.FILE_CHANGED, "ast")]
if embed_enabled:
- queries.append((Topic.EMBED_DIRTY, "tier3-embed"))
+ queries.append((Topic.EMBED_DIRTY, "embed"))
infos = await bus.stream_group_info_multi(queries)
# Build a topic→remaining map so we don't need fragile index tracking
remaining = {topic: info["pending"] + info["lag"] for (topic, _), info in zip(queries, infos, strict=True)}
- t1_remaining = remaining.get(Topic.FILE_CHANGED, 0)
- t2_remaining = remaining.get(Topic.AST_DIRTY, 0)
+ t2_remaining = remaining.get(Topic.FILE_CHANGED, 0)
t3_remaining = remaining.get(Topic.EMBED_DIRTY, 0)
if on_drain_progress is not None:
- on_drain_progress(t1_remaining, t2_remaining, t3_remaining)
+ on_drain_progress(0, t2_remaining, t3_remaining)
- if t1_remaining == 0 and t2_remaining == 0 and t3_remaining == 0:
+ if t2_remaining == 0 and t3_remaining == 0:
if settled_since is None:
settled_since = time.monotonic()
elif time.monotonic() - settled_since >= settle_s:
@@ -1722,9 +1668,8 @@ async def _wait_for_drain(
await asyncio.sleep(poll_interval)
logger.warning(
- "Pipeline drain timed out after {:.0f}s — t1={} t2={} t3={} (raw={})",
+ "Pipeline drain timed out after {:.0f}s — t2={} t3={} (raw={})",
timeout_s,
- t1_remaining,
t2_remaining,
t3_remaining,
infos,
diff --git a/src/code_atlas/search/embeddings.py b/src/code_atlas/search/embeddings.py
index 3eb76d5..70ef286 100644
--- a/src/code_atlas/search/embeddings.py
+++ b/src/code_atlas/search/embeddings.py
@@ -461,7 +461,7 @@ def _build_doc_section_text(qualified_name: str, docstring: str) -> str:
"""Build embed text for DocSection nodes.
The ``qualified_name`` encodes the header breadcrumb
- (e.g. ``docs/architecture.md > Architecture > Event Pipeline > Tier 2``).
+ (e.g. ``docs/architecture.md > Architecture > Event Pipeline > AST Stage``).
"""
# Split on " > " to get file path and section headers
breadcrumb_parts = qualified_name.split(" > ")
diff --git a/src/code_atlas/settings.py b/src/code_atlas/settings.py
index aca7aab..75396c8 100644
--- a/src/code_atlas/settings.py
+++ b/src/code_atlas/settings.py
@@ -188,7 +188,7 @@ class EmbeddingSettings(BaseSettings):
dimension: int | None = Field(default=None, description="Embedding vector dimension. Auto-detected when None.")
batch_size: int | None = Field(default=None, description="Max texts per embedding API call. Auto from provider.")
max_concurrency: int | None = Field(
- default=None, description="Max concurrent embedding API calls / Tier 3 consumers. Auto from provider."
+ default=None, description="Max concurrent embedding API calls / embed consumers. Auto from provider."
)
timeout_s: float = Field(default=30.0, description="Timeout in seconds for embedding API calls.")
truncate_ratio: float = Field(
@@ -272,6 +272,10 @@ class IndexSettings(BaseSettings):
description="Stale index behavior: 'warn' (annotate), 'lock' (refuse), 'ignore' (skip).",
)
max_source_chars: int = Field(default=2000, description="Max characters for entity source text (0 to disable).")
+ file_hash_gate: bool = Field(default=True, description="Skip files whose content hash hasn't changed.")
+ strip_whitespace: bool = Field(
+ default=True, description="Normalize whitespace before hashing (ignores formatting-only changes)."
+ )
class ObservabilitySettings(BaseSettings):
@@ -289,6 +293,7 @@ class WatcherSettings(BaseSettings):
debounce_s: float = Field(default=5.0, description="Debounce timer in seconds (resets per change).")
max_wait_s: float = Field(default=30.0, description="Max-wait ceiling in seconds (per batch).")
+ cooldown_s: float = Field(default=10.0, description="Per-file cooldown after processing (seconds). 0 disables.")
class McpSettings(BaseSettings):
diff --git a/tests/conftest.py b/tests/conftest.py
index c3f5634..04e24d3 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -12,5 +12,5 @@
"""Shortened drain timeout for integration tests (default 600s is too long)."""
NO_EMBED = EmbeddingSettings(enabled=False)
-"""Embedding settings that disable Tier3 entirely — use for pipeline tests
+"""Embedding settings that disable the embed stage entirely — use for pipeline tests
that don't need real or mocked embeddings."""
diff --git a/tests/integration/indexing/test_consumers.py b/tests/integration/indexing/test_consumers.py
index 9302768..fb5bbdc 100644
--- a/tests/integration/indexing/test_consumers.py
+++ b/tests/integration/indexing/test_consumers.py
@@ -6,19 +6,22 @@
from __future__ import annotations
import asyncio
+import time
+from typing import TYPE_CHECKING
import pytest
from code_atlas.events import (
- ASTDirty,
EventBus,
FileChanged,
Topic,
decode_event,
)
-from code_atlas.indexing.consumers import Tier1GraphConsumer
-from code_atlas.settings import AtlasSettings
-from tests.conftest import NO_EMBED
+from code_atlas.indexing.consumers import ASTConsumer, BatchPolicy
+
+if TYPE_CHECKING:
+ from code_atlas.graph.client import GraphClient
+ from code_atlas.settings import AtlasSettings
# All tests in this module require a live Redis/Valkey
pytestmark = pytest.mark.integration
@@ -41,8 +44,15 @@ async def _clean_streams(event_bus: EventBus):
await event_bus._redis.delete(key)
+def _write_python_file(root, rel_path: str, content: str) -> None:
+ """Write a Python file under *root* at the given relative path."""
+ full = root / rel_path
+ full.parent.mkdir(parents=True, exist_ok=True)
+ full.write_text(content, encoding="utf-8")
+
+
# ---------------------------------------------------------------------------
-# Tests
+# EventBus tests
# ---------------------------------------------------------------------------
@@ -104,37 +114,257 @@ async def test_dedup_within_batch(event_bus: EventBus) -> None:
assert pending["src/main.py"].timestamp == 1004.0
+# ---------------------------------------------------------------------------
+# AST consumer tests
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.usefixtures("_clean_streams")
+async def test_ast_consumes_file_changed(
+ event_bus: EventBus,
+ graph_client: GraphClient,
+ settings: AtlasSettings,
+) -> None:
+    """AST consumer processes FileChanged from the file-changed topic and writes entities to the graph."""
+ await graph_client.ensure_schema()
+
+ # Write a Python file for the AST consumer to parse
+ _write_python_file(settings.project_root, "hello.py", "def greet(name: str) -> str:\n return f'Hello {name}'\n")
+
+ consumer = ASTConsumer(
+ event_bus,
+ graph_client,
+ settings,
+ policy=BatchPolicy(time_window_s=0, max_batch_size=10, block_ms=50),
+ )
+
+ # Publish a FileChanged and let the consumer process it
+ project_name = settings.project_root.resolve().name
+ await event_bus.publish(
+ Topic.FILE_CHANGED,
+ FileChanged(
+ path="hello.py",
+ change_type="created",
+ timestamp=time.time(),
+ project_name=project_name,
+ project_root=str(settings.project_root),
+ ),
+ )
+
+ task = asyncio.create_task(consumer.run())
+ await asyncio.sleep(1.0)
+ consumer.stop()
+ await asyncio.wait_for(task, timeout=5.0)
+
+ assert consumer.stats.files_processed >= 1
+ assert consumer.stats.entities_added >= 1
+
+
+@pytest.mark.usefixtures("_clean_streams")
+async def test_file_hash_gate_skips_unchanged(
+ event_bus: EventBus,
+ graph_client: GraphClient,
+ settings: AtlasSettings,
+) -> None:
+ """Hash gate skips a file when content hasn't changed between runs."""
+ await graph_client.ensure_schema()
+
+ _write_python_file(settings.project_root, "stable.py", "X = 42\n")
+
+ project_name = settings.project_root.resolve().name
+ ev = FileChanged(
+ path="stable.py",
+ change_type="modified",
+ timestamp=time.time(),
+ project_name=project_name,
+ project_root=str(settings.project_root),
+ )
+
+ # First run: processes the file and stores its hash
+ c1 = ASTConsumer(
+ event_bus,
+ graph_client,
+ settings,
+ policy=BatchPolicy(time_window_s=0, max_batch_size=10, block_ms=50),
+ )
+ await event_bus.publish(Topic.FILE_CHANGED, ev)
+ task = asyncio.create_task(c1.run())
+ await asyncio.sleep(1.0)
+ c1.stop()
+ await asyncio.wait_for(task, timeout=5.0)
+ assert c1.stats.files_processed >= 1
+
+ # Second run: same file, same content — should be skipped
+ c2 = ASTConsumer(
+ event_bus,
+ graph_client,
+ settings,
+ policy=BatchPolicy(time_window_s=0, max_batch_size=10, block_ms=50),
+ )
+ await event_bus.publish(Topic.FILE_CHANGED, ev)
+ task = asyncio.create_task(c2.run())
+ await asyncio.sleep(1.0)
+ c2.stop()
+ await asyncio.wait_for(task, timeout=5.0)
+
+ assert c2.stats.files_skipped >= 1
+ assert c2.stats.files_processed == 0
+
+
+@pytest.mark.usefixtures("_clean_streams")
+async def test_file_hash_gate_processes_modified(
+ event_bus: EventBus,
+ graph_client: GraphClient,
+ settings: AtlasSettings,
+) -> None:
+ """Hash gate allows a file through when content changes between runs."""
+ await graph_client.ensure_schema()
+
+ _write_python_file(settings.project_root, "changing.py", "X = 1\n")
+
+ project_name = settings.project_root.resolve().name
+ ev = FileChanged(
+ path="changing.py",
+ change_type="modified",
+ timestamp=time.time(),
+ project_name=project_name,
+ project_root=str(settings.project_root),
+ )
+
+ # First run
+ c1 = ASTConsumer(
+ event_bus,
+ graph_client,
+ settings,
+ policy=BatchPolicy(time_window_s=0, max_batch_size=10, block_ms=50),
+ )
+ await event_bus.publish(Topic.FILE_CHANGED, ev)
+ task = asyncio.create_task(c1.run())
+ await asyncio.sleep(1.0)
+ c1.stop()
+ await asyncio.wait_for(task, timeout=5.0)
+ assert c1.stats.files_processed >= 1
+
+ # Modify the file
+ _write_python_file(settings.project_root, "changing.py", "X = 2\nY = 3\n")
+
+ # Second run: changed content — should process again
+ c2 = ASTConsumer(
+ event_bus,
+ graph_client,
+ settings,
+ policy=BatchPolicy(time_window_s=0, max_batch_size=10, block_ms=50),
+ )
+ await event_bus.publish(Topic.FILE_CHANGED, ev)
+ task = asyncio.create_task(c2.run())
+ await asyncio.sleep(1.0)
+ c2.stop()
+ await asyncio.wait_for(task, timeout=5.0)
+
+ assert c2.stats.files_processed >= 1
+
+
@pytest.mark.usefixtures("_clean_streams")
-async def test_tier1_publishes_downstream(event_bus: EventBus) -> None:
- """Run Tier1 briefly, verify ASTDirty appears on the ast-dirty stream."""
- # Set up consumer group for downstream
- await event_bus.ensure_group(Topic.AST_DIRTY, "test-downstream")
+async def test_cooldown_defers_rapid_edits(
+ event_bus: EventBus,
+ graph_client: GraphClient,
+ settings: AtlasSettings,
+) -> None:
+ """Per-file cooldown defers rapid re-edits so only the first is processed immediately."""
+ await graph_client.ensure_schema()
+
+ _write_python_file(settings.project_root, "rapid.py", "A = 1\n")
+
+ project_name = settings.project_root.resolve().name
+
+ consumer = ASTConsumer(
+ event_bus,
+ graph_client,
+ settings,
+ policy=BatchPolicy(time_window_s=0, max_batch_size=10, block_ms=50),
+ cooldown_s=60.0, # Long cooldown — second event should be deferred
+ )
- # Publish a FileChanged event
+ # Publish first event
await event_bus.publish(
Topic.FILE_CHANGED,
- FileChanged(path="src/app.py", change_type="modified", timestamp=2000.0),
+ FileChanged(
+ path="rapid.py",
+ change_type="modified",
+ timestamp=time.time(),
+ project_name=project_name,
+ project_root=str(settings.project_root),
+ ),
)
- # Run Tier1 for a short period then stop
- # Tier1 needs graph + settings but we're only testing event flow here;
- # it doesn't call graph in its current implementation.
- from unittest.mock import AsyncMock
+ task = asyncio.create_task(consumer.run())
+ await asyncio.sleep(1.0)
- mock_graph = AsyncMock()
- test_settings = AtlasSettings(embeddings=NO_EMBED)
- tier1 = Tier1GraphConsumer(event_bus, mock_graph, test_settings)
+ # First event should be processed
+ assert consumer.stats.files_processed >= 1
+ first_processed = consumer.stats.files_processed
- async def stop_after_delay() -> None:
- await asyncio.sleep(1.5)
- tier1.stop()
+ # Publish a second event for the same file — should be deferred
+ await event_bus.publish(
+ Topic.FILE_CHANGED,
+ FileChanged(
+ path="rapid.py",
+ change_type="modified",
+ timestamp=time.time(),
+ project_name=project_name,
+ project_root=str(settings.project_root),
+ ),
+ )
+ await asyncio.sleep(1.0)
+
+ consumer.stop()
+ await asyncio.wait_for(task, timeout=5.0)
- await asyncio.gather(tier1.run(), stop_after_delay())
+ # Second event deferred — files_processed should not have increased
+ assert consumer.stats.files_processed == first_processed
+ assert consumer.stats.files_deferred >= 1
+
+
+@pytest.mark.usefixtures("_clean_streams")
+async def test_cooldown_disabled_processes_all(
+ event_bus: EventBus,
+ graph_client: GraphClient,
+ settings: AtlasSettings,
+) -> None:
+ """With cooldown_s=0, all events are processed immediately (reindex mode)."""
+ await graph_client.ensure_schema()
+
+ _write_python_file(settings.project_root, "nodelay.py", "Z = 1\n")
+
+ project_name = settings.project_root.resolve().name
+
+ consumer = ASTConsumer(
+ event_bus,
+ graph_client,
+ settings,
+ policy=BatchPolicy(time_window_s=0, max_batch_size=10, block_ms=50),
+ cooldown_s=0.0, # No cooldown
+ )
+
+ # Publish two events for the same file
+ for i in range(2):
+ await event_bus.publish(
+ Topic.FILE_CHANGED,
+ FileChanged(
+ path="nodelay.py",
+ change_type="modified",
+ timestamp=time.time() + i,
+ project_name=project_name,
+ project_root=str(settings.project_root),
+ ),
+ )
+ # Small gap so they arrive in separate batches
+ await asyncio.sleep(0.1)
- # Read from the downstream ast-dirty stream
- messages = await event_bus.read_batch(Topic.AST_DIRTY, "test-downstream", "test-ds-0", count=10, block_ms=500)
- assert len(messages) >= 1
+ task = asyncio.create_task(consumer.run())
+ await asyncio.sleep(2.0)
+ consumer.stop()
+ await asyncio.wait_for(task, timeout=5.0)
- event = decode_event(Topic.AST_DIRTY, messages[0][1])
- assert isinstance(event, ASTDirty)
- assert event.path == "src/app.py"
+ # No deferral when cooldown is disabled
+ assert consumer.stats.files_deferred == 0
diff --git a/tests/unit/search/test_embeddings.py b/tests/unit/search/test_embeddings.py
index a588845..baa7255 100644
--- a/tests/unit/search/test_embeddings.py
+++ b/tests/unit/search/test_embeddings.py
@@ -10,7 +10,7 @@
import pytest
from code_atlas.events import EmbedDirty, EntityRef
-from code_atlas.indexing.consumers import Tier3EmbedConsumer
+from code_atlas.indexing.consumers import EmbedConsumer
from code_atlas.search.embeddings import EmbedCache, EmbedClient, EmbeddingError, build_embed_text
from code_atlas.settings import EmbeddingSettings
@@ -89,15 +89,15 @@ def test_module(self):
def test_doc_section(self):
props = {
"_label": "DocSection",
- "qualified_name": "docs/architecture.md > Architecture > Event Pipeline > Tier 2",
+ "qualified_name": "docs/architecture.md > Architecture > Event Pipeline > AST Stage",
"kind": "",
"signature": "",
- "docstring": "The AST parsing tier processes file changes...",
+ "docstring": "The AST stage processes file changes...",
}
text = build_embed_text(props)
assert "File: docs/architecture.md" in text
- assert "Section: Architecture > Event Pipeline > Tier 2" in text
- assert '"""The AST parsing tier processes file changes..."""' in text
+ assert "Section: Architecture > Event Pipeline > AST Stage" in text
+ assert '"""The AST stage processes file changes..."""' in text
def test_empty_qualified_name_returns_empty(self):
props = {"_label": "Callable", "qualified_name": "", "kind": "function"}
@@ -353,12 +353,12 @@ async def test_health_check_failure(self):
# ---------------------------------------------------------------------------
-# Tier3 cache integration tests (mocked graph + embed + cache)
+# Embed consumer cache integration tests (mocked graph + embed + cache)
# ---------------------------------------------------------------------------
-class TestTier3CacheLookup:
- """Test the three-tier lookup logic in Tier3EmbedConsumer with mocks."""
+class TestEmbedCacheLookup:
+ """Test the three-level lookup logic in EmbedConsumer with mocks."""
@staticmethod
def _make_entity_ref(qn: str) -> EntityRef:
@@ -402,7 +402,7 @@ async def test_graph_hit_skips_all(self):
]
)
- consumer = Tier3EmbedConsumer(bus, graph, embed, cache=cache)
+ consumer = EmbedConsumer(bus, graph, embed, cache=cache)
entity = self._make_entity_ref("foo.bar")
event = self._make_embed_dirty(entity)
@@ -440,7 +440,7 @@ async def test_cache_hit_skips_embed(self):
)
cache.get_many = AsyncMock(return_value={text_hash: cached_vec})
- consumer = Tier3EmbedConsumer(bus, graph, embed, cache=cache)
+ consumer = EmbedConsumer(bus, graph, embed, cache=cache)
entity = self._make_entity_ref("foo.bar")
event = self._make_embed_dirty(entity)
@@ -483,7 +483,7 @@ async def test_cache_miss_embeds_and_stores(self):
cache.get_many = AsyncMock(return_value={}) # no cache hit
embed.embed_batch = AsyncMock(return_value=[api_vec])
- consumer = Tier3EmbedConsumer(bus, graph, embed, cache=cache)
+ consumer = EmbedConsumer(bus, graph, embed, cache=cache)
entity = self._make_entity_ref("foo.bar")
event = self._make_embed_dirty(entity)