From b6094d50288e1510244ee3740c8884d2c0642f44 Mon Sep 17 00:00:00 2001 From: mrveiss Date: Mon, 23 Mar 2026 19:55:44 +0200 Subject: [PATCH 01/12] fix(rag): consume feedback streams with cursor tracking (#2102) EdgeLearner now tracks per-stream cursors to consume only new events on each scheduler call, preventing duplicate processing. Stream TTL extended from 7 to 30 days so data survives until Phase 3 consumers are wired. Adds two tests for cursor persistence and advancement. --- .../services/mesh_brain/edge_learner.py | 37 ++++++++-- .../services/mesh_brain/edge_learner_test.py | 69 +++++++++++++++++++ autobot-backend/services/rag_service.py | 5 +- .../services/rag_service_events_test.py | 8 +-- 4 files changed, 107 insertions(+), 12 deletions(-) diff --git a/autobot-backend/services/mesh_brain/edge_learner.py b/autobot-backend/services/mesh_brain/edge_learner.py index 111eb433e..2365f4e50 100644 --- a/autobot-backend/services/mesh_brain/edge_learner.py +++ b/autobot-backend/services/mesh_brain/edge_learner.py @@ -37,6 +37,10 @@ class EdgeLearner: Consumes rag:feedback:{date} Redis streams from Phase 1 (#2024). Reinforces edges between co-retrieved chunks. Creates new CO_RETRIEVED edges after co_access_count >= threshold. + + Cursor tracking: _cursors maps stream_key -> last consumed entry ID so + that repeated calls to consume_feedback_stream() only read NEW entries + instead of re-processing the entire stream from the start. Fix: #2102. """ def __init__( @@ -52,6 +56,9 @@ def __init__( self.ema_decay = ema_decay self.creation_threshold = creation_threshold self.initial_weight = initial_weight + # Per-stream cursor: stream_key -> last processed Redis entry ID. + # Prevents duplicate processing when the scheduler loops every second. + self._cursors: dict[str, str] = {} async def on_retrieval(self, event: dict) -> None: """Process a single retrieval feedback event. 
@@ -114,30 +121,48 @@ async def _maybe_create_edge(self, node_a: str, node_b: str) -> None: ) async def consume_feedback_stream(self, date_key: str | None = None) -> int: - """Consume all events from a dated feedback stream. + """Consume only NEW events from a dated feedback stream. Fix: #2102. + + Uses a per-stream cursor (_cursors) to remember the last processed + Redis entry ID so repeated calls by the scheduler do not re-process + the same events. Starts from the beginning only on the first call + for a given date key. Stream key: rag:feedback:{YYYY-MM-DD} Returns: - Number of events processed. + Number of NEW events processed in this call. """ if date_key is None: date_key = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d") stream_key = f"rag:feedback:{date_key}" - last_id = "0-0" + # Resume from last seen ID; "0-0" means read from the very beginning. + resume_id = self._cursors.get(stream_key, "0-0") processed = 0 while True: - entries = await self.redis.xrange(stream_key, min=last_id, count=100) + entries = await self.redis.xrange(stream_key, min=resume_id, count=100) if not entries: break for entry_id, fields in entries: + # Skip the exact ID we already processed on a previous call. + if entry_id == resume_id and processed == 0: + continue await self.on_retrieval(fields) - last_id = entry_id + resume_id = entry_id processed += 1 if len(entries) < 100: break - logger.info("EdgeLearner: consumed %d events from %s", processed, stream_key) + # Persist cursor only when we actually advanced past at least one entry. 
+ if processed > 0: + self._cursors[stream_key] = resume_id + + if processed: + logger.info( + "EdgeLearner: consumed %d new events from %s", processed, stream_key + ) + else: + logger.debug("EdgeLearner: no new events in %s", stream_key) return processed diff --git a/autobot-backend/services/mesh_brain/edge_learner_test.py b/autobot-backend/services/mesh_brain/edge_learner_test.py index 5ac264ee5..d889f93cb 100644 --- a/autobot-backend/services/mesh_brain/edge_learner_test.py +++ b/autobot-backend/services/mesh_brain/edge_learner_test.py @@ -192,3 +192,72 @@ async def test_consume_feedback_stream_uses_today_by_default(self): today = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d") expected_key = f"rag:feedback:{today}" redis.xrange.assert_awaited_once_with(expected_key, min="0-0", count=100) + + @pytest.mark.asyncio + async def test_cursor_persists_between_calls_no_duplicate_processing(self): + """Second call resumes from cursor — already-seen entry is skipped. Fix: #2102.""" + db = _make_db_mock() + redis = _make_redis_mock() + + entry_a = ("1-0", {"final_ranked_ids": json.dumps(["A", "B"])}) + entry_b = ("2-0", {"final_ranked_ids": json.dumps(["C", "D"])}) + + # First call: two entries, leaving the cursor at "2-0"; second call: only entry_b (the cursor entry) is returned + redis.xrange = AsyncMock( + side_effect=[ + [entry_a, entry_b], # first consume call + [ + entry_b + ], # second call resumes from "2-0"; the min bound is inclusive, so "2-0" comes back + [], # spare page — not consumed, the short page above already ends the loop + ] + ) + + learner = _make_learner(db, redis) + + count_first = await learner.consume_feedback_stream(date_key="2026-03-23") + assert count_first == 2 + + # Reset access_count tracking to measure only the second call's effect + db.update_access_count.reset_mock() + + count_second = await learner.consume_feedback_stream(date_key="2026-03-23") + # Second call sees entry_b returned again at cursor boundary; it should be + # skipped via the cursor-skip guard (entry_id == resume_id and processed==0). 
+ assert count_second == 0 + db.update_access_count.assert_not_awaited() + + @pytest.mark.asyncio + async def test_cursor_advances_when_new_entries_arrive(self): + """New entries written after the first call are consumed on the next call. Fix: #2102.""" + db = _make_db_mock() + redis = _make_redis_mock() + + entry_a = ("1-0", {"final_ranked_ids": json.dumps(["A", "B"])}) + entry_b = ("2-0", {"final_ranked_ids": json.dumps(["C", "D"])}) + entry_c = ("3-0", {"final_ranked_ids": json.dumps(["E", "F"])}) + + # First call: only entry_a exists + # Second call resumes from "1-0"; Redis returns entry_a (the cursor) + entry_b + entry_c + redis.xrange = AsyncMock( + side_effect=[ + [entry_a], # first call + [ + entry_a, + entry_b, + entry_c, + ], # second call (entry_a is the cursor boundary) + ] + ) + + learner = _make_learner(db, redis) + + count_first = await learner.consume_feedback_stream(date_key="2026-03-23") + assert count_first == 1 + + db.update_access_count.reset_mock() + + count_second = await learner.consume_feedback_stream(date_key="2026-03-23") + # entry_a is skipped (cursor boundary); entry_b and entry_c are new + assert count_second == 2 + assert db.update_access_count.await_count == 2 diff --git a/autobot-backend/services/rag_service.py b/autobot-backend/services/rag_service.py index 369a16431..c25dc346a 100644 --- a/autobot-backend/services/rag_service.py +++ b/autobot-backend/services/rag_service.py @@ -355,7 +355,8 @@ async def _store_feedback_in_stream( """Append retrieval feedback to a dated Redis stream. Issue #1516. Stream key: rag:feedback:{YYYY-MM-DD} (UTC date). - TTL: 7 days so the mesh trainer has a rolling window of signal. + TTL: 30 days so Neural Mesh Phase 3 (#2056) can consume the data + before it expires. Increased from 7 days — Fix: #2102. Args: query: Raw query string. @@ -363,7 +364,7 @@ async def _store_feedback_in_stream( ranked_ids: Final ordered chunk IDs after reranking. complexity: QueryComplexity.value string (Issue #2024). 
""" - _STREAM_TTL_SECONDS = 7 * 24 * 3600 + _STREAM_TTL_SECONDS = 30 * 24 * 3600 date_key = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d") stream_key = f"rag:feedback:{date_key}" entry = { diff --git a/autobot-backend/services/rag_service_events_test.py b/autobot-backend/services/rag_service_events_test.py index cbbb95fd3..4ecb638fa 100644 --- a/autobot-backend/services/rag_service_events_test.py +++ b/autobot-backend/services/rag_service_events_test.py @@ -154,7 +154,7 @@ async def test_publish_error_does_not_propagate(self): class TestStoreFeedbackInStream: """Tests for RAGService._store_feedback_in_stream().""" - _SEVEN_DAYS_SECONDS = 7 * 24 * 3600 + _THIRTY_DAYS_SECONDS = 30 * 24 * 3600 def _make_service(self): from services.rag_service import RAGService @@ -237,8 +237,8 @@ async def test_stream_entry_contains_ids_as_json(self): assert json.loads(entry["final_ranked_ids"]) == ["b"] @pytest.mark.asyncio - async def test_expire_set_to_seven_days(self): - """Stream key TTL is set to 7 days (604800 seconds).""" + async def test_expire_set_to_thirty_days(self): + """Stream key TTL is set to 30 days (2592000 seconds). 
Fix: #2102.""" mock_redis = self._make_redis_mock() with patch( @@ -255,7 +255,7 @@ async def test_expire_set_to_seven_days(self): assert mock_redis.expire.called ttl_arg = mock_redis.expire.call_args[0][1] - assert ttl_arg == self._SEVEN_DAYS_SECONDS + assert ttl_arg == self._THIRTY_DAYS_SECONDS @pytest.mark.asyncio async def test_redis_unavailable_does_not_raise(self): From f01adbcd7001604c5e7e4872c3a32c022f84e91b Mon Sep 17 00:00:00 2001 From: Martins Veiss Date: Mon, 23 Mar 2026 18:59:21 +0200 Subject: [PATCH 02/12] fix(frontend): add missing navigation links for registered routes (#2071) (#2100) * fix(frontend): add missing navigation links for 7 registered routes (#2071) * refactor(frontend): data-driven nav with v-for + i18n for all locales (#2071) - Replace 500+ lines of duplicated desktop/mobile nav markup with a single navItems array and v-for loops (App.vue: 1247 -> 857 lines) - Add navItems return value to Options API setup - Add missing nav.* i18n keys to all 10 non-English locale files (ar, de, es, fa, fr, he, lv, pl, pt, ur) using English as placeholder - Keep SLM Admin as a special-case external link --- autobot-frontend/src/App.vue | 231 +++++----------------- autobot-frontend/src/i18n/locales/ar.json | 11 +- autobot-frontend/src/i18n/locales/de.json | 11 +- autobot-frontend/src/i18n/locales/en.json | 7 + autobot-frontend/src/i18n/locales/es.json | 11 +- autobot-frontend/src/i18n/locales/fa.json | 209 ++++++++++---------- autobot-frontend/src/i18n/locales/fr.json | 11 +- autobot-frontend/src/i18n/locales/he.json | 209 ++++++++++---------- autobot-frontend/src/i18n/locales/lv.json | 11 +- autobot-frontend/src/i18n/locales/pl.json | 11 +- autobot-frontend/src/i18n/locales/pt.json | 11 +- autobot-frontend/src/i18n/locales/ur.json | 209 ++++++++++---------- autobot-frontend/src/router/index.ts | 2 +- 13 files changed, 439 insertions(+), 505 deletions(-) diff --git a/autobot-frontend/src/App.vue b/autobot-frontend/src/App.vue index 8407a0a08..3d0052d74 
100644 --- a/autobot-frontend/src/App.vue +++ b/autobot-frontend/src/App.vue @@ -38,107 +38,29 @@