diff --git a/autobot-backend/services/mesh_brain/edge_learner.py b/autobot-backend/services/mesh_brain/edge_learner.py
index 111eb433e..3a990eadb 100644
--- a/autobot-backend/services/mesh_brain/edge_learner.py
+++ b/autobot-backend/services/mesh_brain/edge_learner.py
@@ -37,6 +37,10 @@ class EdgeLearner:
     Consumes rag:feedback:{date} Redis streams from Phase 1 (#2024).
     Reinforces edges between co-retrieved chunks. Creates new CO_RETRIEVED
     edges after co_access_count >= threshold.
+
+    Cursor tracking: _cursors maps stream_key -> last consumed entry ID so
+    that repeated calls to consume_feedback_stream() only read NEW entries
+    instead of re-processing the entire stream from the start. Fix: #2102.
     """
 
     def __init__(
@@ -52,6 +56,9 @@ def __init__(
         self.ema_decay = ema_decay
         self.creation_threshold = creation_threshold
         self.initial_weight = initial_weight
+        # Per-stream cursor: stream_key -> last processed Redis entry ID.
+        # Prevents duplicate processing when the scheduler loops every second.
+        self._cursors: dict[str, str] = {}
 
     async def on_retrieval(self, event: dict) -> None:
         """Process a single retrieval feedback event.
@@ -114,30 +121,52 @@ async def _maybe_create_edge(self, node_a: str, node_b: str) -> None:
         )
 
     async def consume_feedback_stream(self, date_key: str | None = None) -> int:
-        """Consume all events from a dated feedback stream.
+        """Consume only NEW events from a dated feedback stream. Fix: #2102.
+
+        Uses a per-stream cursor (_cursors) to remember the last processed
+        Redis entry ID so repeated calls by the scheduler do not re-process
+        the same events. Starts from the beginning only on the first call
+        for a given date key.
 
         Stream key: rag:feedback:{YYYY-MM-DD}
 
         Returns:
-            Number of events processed.
+            Number of NEW events processed in this call.
         """
         if date_key is None:
             date_key = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d")
         stream_key = f"rag:feedback:{date_key}"
 
-        last_id = "0-0"
+        # Resume from exclusive lower bound. "0-0" reads from the start.
+        # After processing entry "T-S", we store "T-(S+1)" so the next
+        # xrange call excludes the already-processed entry.
+        # Note: _cursors are ephemeral (lost on process restart), which
+        # causes full re-consumption. Acceptable for now; file a follow-up
+        # for Redis-based cursor persistence before Phase 3 production.
+        resume_id = self._cursors.get(stream_key, "0-0")
         processed = 0
         while True:
-            entries = await self.redis.xrange(stream_key, min=last_id, count=100)
+            entries = await self.redis.xrange(stream_key, min=resume_id, count=100)
             if not entries:
                 break
             for entry_id, fields in entries:
                 await self.on_retrieval(fields)
-                last_id = entry_id
+                # Advance cursor past this entry (exclusive lower bound).
+                ts, seq = entry_id.split("-")
+                resume_id = f"{ts}-{int(seq) + 1}"
                 processed += 1
             if len(entries) < 100:
                 break
 
-        logger.info("EdgeLearner: consumed %d events from %s", processed, stream_key)
+        # Persist cursor only when we actually advanced past an entry.
+        if processed > 0:
+            self._cursors[stream_key] = resume_id
+
+        if processed:
+            logger.info(
+                "EdgeLearner: consumed %d new events from %s", processed, stream_key
+            )
+        else:
+            logger.debug("EdgeLearner: no new events in %s", stream_key)
 
         return processed
diff --git a/autobot-backend/services/mesh_brain/edge_learner_test.py b/autobot-backend/services/mesh_brain/edge_learner_test.py
index 5ac264ee5..4141ddf1c 100644
--- a/autobot-backend/services/mesh_brain/edge_learner_test.py
+++ b/autobot-backend/services/mesh_brain/edge_learner_test.py
@@ -192,3 +192,62 @@ async def test_consume_feedback_stream_uses_today_by_default(self):
         today = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d")
         expected_key = f"rag:feedback:{today}"
         redis.xrange.assert_awaited_once_with(expected_key, min="0-0", count=100)
+
+    @pytest.mark.asyncio
+    async def test_cursor_persists_between_calls_no_duplicate_processing(self):
+        """Second call uses exclusive cursor — no re-processing. Fix: #2102."""
+        db = _make_db_mock()
+        redis = _make_redis_mock()
+
+        entry_a = ("1-0", {"final_ranked_ids": json.dumps(["A", "B"])})
+        entry_b = ("2-0", {"final_ranked_ids": json.dumps(["C", "D"])})
+
+        # First call: two entries processed, cursor advances to "2-1".
+        # Second call: xrange(min="2-1") returns nothing — all consumed.
+        redis.xrange = AsyncMock(
+            side_effect=[
+                [entry_a, entry_b],  # first consume call
+                [],  # second call — cursor "2-1" excludes everything
+            ]
+        )
+
+        learner = _make_learner(db, redis)
+
+        count_first = await learner.consume_feedback_stream(date_key="2026-03-23")
+        assert count_first == 2
+
+        db.update_access_count.reset_mock()
+
+        count_second = await learner.consume_feedback_stream(date_key="2026-03-23")
+        assert count_second == 0
+        db.update_access_count.assert_not_awaited()
+
+    @pytest.mark.asyncio
+    async def test_cursor_advances_when_new_entries_arrive(self):
+        """New entries after cursor are consumed. Fix: #2102."""
+        db = _make_db_mock()
+        redis = _make_redis_mock()
+
+        entry_a = ("1-0", {"final_ranked_ids": json.dumps(["A", "B"])})
+        entry_b = ("2-0", {"final_ranked_ids": json.dumps(["C", "D"])})
+        entry_c = ("3-0", {"final_ranked_ids": json.dumps(["E", "F"])})
+
+        # First call: only entry_a, cursor advances to "1-1".
+        # Second call: xrange(min="1-1") returns entry_b + entry_c.
+        redis.xrange = AsyncMock(
+            side_effect=[
+                [entry_a],  # first call
+                [entry_b, entry_c],  # second call — new entries only
+            ]
+        )
+
+        learner = _make_learner(db, redis)
+
+        count_first = await learner.consume_feedback_stream(date_key="2026-03-23")
+        assert count_first == 1
+
+        db.update_access_count.reset_mock()
+
+        count_second = await learner.consume_feedback_stream(date_key="2026-03-23")
+        assert count_second == 2
+        assert db.update_access_count.await_count == 2
diff --git a/autobot-backend/services/rag_service.py b/autobot-backend/services/rag_service.py
index 369a16431..c25dc346a 100644
--- a/autobot-backend/services/rag_service.py
+++ b/autobot-backend/services/rag_service.py
@@ -355,7 +355,8 @@ async def _store_feedback_in_stream(
         """Append retrieval feedback to a dated Redis stream. Issue #1516.
 
         Stream key: rag:feedback:{YYYY-MM-DD} (UTC date).
-        TTL: 7 days so the mesh trainer has a rolling window of signal.
+        TTL: 30 days so Neural Mesh Phase 3 (#2056) can consume the data
+        before it expires. Increased from 7 days — Fix: #2102.
 
         Args:
             query: Raw query string.
@@ -363,7 +364,7 @@ async def _store_feedback_in_stream(
             ranked_ids: Final ordered chunk IDs after reranking.
             complexity: QueryComplexity.value string (Issue #2024).
         """
-        _STREAM_TTL_SECONDS = 7 * 24 * 3600
+        _STREAM_TTL_SECONDS = 30 * 24 * 3600
         date_key = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d")
         stream_key = f"rag:feedback:{date_key}"
         entry = {
diff --git a/autobot-backend/services/rag_service_events_test.py b/autobot-backend/services/rag_service_events_test.py
index cbbb95fd3..4ecb638fa 100644
--- a/autobot-backend/services/rag_service_events_test.py
+++ b/autobot-backend/services/rag_service_events_test.py
@@ -154,7 +154,7 @@ async def test_publish_error_does_not_propagate(self):
 class TestStoreFeedbackInStream:
     """Tests for RAGService._store_feedback_in_stream()."""
 
-    _SEVEN_DAYS_SECONDS = 7 * 24 * 3600
+    _THIRTY_DAYS_SECONDS = 30 * 24 * 3600
 
     def _make_service(self):
         from services.rag_service import RAGService
@@ -237,8 +237,8 @@ async def test_stream_entry_contains_ids_as_json(self):
         assert json.loads(entry["final_ranked_ids"]) == ["b"]
 
     @pytest.mark.asyncio
-    async def test_expire_set_to_seven_days(self):
-        """Stream key TTL is set to 7 days (604800 seconds)."""
+    async def test_expire_set_to_thirty_days(self):
+        """Stream key TTL is set to 30 days (2592000 seconds). Fix: #2102."""
         mock_redis = self._make_redis_mock()
 
         with patch(
@@ -255,7 +255,7 @@ async def test_expire_set_to_thirty_days(self):
 
         assert mock_redis.expire.called
         ttl_arg = mock_redis.expire.call_args[0][1]
-        assert ttl_arg == self._SEVEN_DAYS_SECONDS
+        assert ttl_arg == self._THIRTY_DAYS_SECONDS
 
     @pytest.mark.asyncio
     async def test_redis_unavailable_does_not_raise(self):
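A note on the cursor arithmetic in consume_feedback_stream: XRANGE treats min as an inclusive bound, so the diff makes it effectively exclusive by storing "T-(S+1)" after processing entry "T-S". On Redis 6.2 and later the same thing can be expressed natively by prefixing the ID with "(", avoiding the manual increment. Below is a minimal sketch of both, assuming a redis-py asyncio client created with decode_responses=True (which entry_id.split("-") in the diff also requires, since a raw client returns bytes IDs); the stream key and entry ID are illustrative:

# Sketch: two equivalent ways to resume past an already-processed entry.
# Assumes redis-py's asyncio client; key name and entry ID are illustrative.
import asyncio

import redis.asyncio as aioredis


def next_id(entry_id: str) -> str:
    """Smallest stream ID strictly greater than entry_id ("T-S" -> "T-(S+1)")."""
    ts, seq = entry_id.split("-")
    return f"{ts}-{int(seq) + 1}"


async def main() -> None:
    r = aioredis.Redis(decode_responses=True)  # str entry IDs, as the tests assume
    last = "1774224000000-4"

    # Option A (what the diff does): bump the sequence, keep min inclusive.
    batch_a = await r.xrange("rag:feedback:2026-03-23", min=next_id(last), count=100)

    # Option B (Redis >= 6.2): native exclusive range via the "(" prefix.
    batch_b = await r.xrange("rag:feedback:2026-03-23", min=f"({last}", count=100)

    assert batch_a == batch_b


asyncio.run(main())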
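The follow-up flagged in the code (Redis-based cursor persistence before Phase 3 production) could be as small as mirroring _cursors into a Redis hash. One possible shape is sketched below; the hash key and helper names are hypothetical, not part of this diff:

# Sketch only: durable cursors so a restart does not re-read whole streams.
# "mesh:edge_learner:cursors" is a hypothetical key name.
CURSOR_HASH = "mesh:edge_learner:cursors"


async def load_cursor(redis, stream_key: str) -> str:
    """Return the persisted resume ID, or "0-0" for a never-seen stream."""
    value = await redis.hget(CURSOR_HASH, stream_key)
    return value if value is not None else "0-0"


async def save_cursor(redis, stream_key: str, resume_id: str) -> None:
    """Mirror of the in-memory `self._cursors[stream_key] = resume_id` update."""
    await redis.hset(CURSOR_HASH, stream_key, resume_id)

Since the stream keys are dated and expire after 30 days, this follow-up would also want a periodic HDEL of hash fields whose date keys no longer exist; otherwise the hash grows by one field per day indefinitely.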
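An alternative worth weighing before Phase 3: Redis consumer groups keep the cursor server-side (XREADGROUP with the special ">" ID, plus XACK), which gives restart-safe progress and multi-worker fan-out without hand-rolled ID arithmetic. A minimal sketch, again assuming redis-py's asyncio client; the group and consumer names are illustrative:

# Sketch: consumer-group consumption; delivery state lives in Redis itself.
# Group name "edge_learner" and consumer name "worker-1" are illustrative.
from redis.exceptions import ResponseError


async def consume_with_group(redis, stream_key: str) -> int:
    try:
        # mkstream=True creates the stream if today's key does not exist yet.
        await redis.xgroup_create(stream_key, "edge_learner", id="0", mkstream=True)
    except ResponseError:
        pass  # BUSYGROUP: the group already exists

    processed = 0
    while True:
        # ">" asks for entries never delivered to this group (server-side cursor).
        batches = await redis.xreadgroup(
            "edge_learner", "worker-1", {stream_key: ">"}, count=100
        )
        if not batches:
            break
        for _stream, entries in batches:
            for entry_id, fields in entries:
                # ... process fields, then acknowledge so it is not redelivered ...
                await redis.xack(stream_key, "edge_learner", entry_id)
                processed += 1
    return processed

The tradeoff is extra server-side state (one group per dated key) and a redelivery-semantics decision: acknowledging after processing, as above, yields at-least-once delivery.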
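Finally, on the TTL change in rag_service.py: EXPIRE applies to the whole dated stream key, not to individual entries, and each append refreshes it, so a day's stream lives roughly 30 days past its last write. A sketch of the write path under those assumptions; the helper name and reduced field set are illustrative, as the real method takes more arguments:

# Sketch of the write side: dated stream key plus whole-key TTL.
# store_feedback and its signature are illustrative simplifications.
import json
from datetime import datetime, timezone

_STREAM_TTL_SECONDS = 30 * 24 * 3600  # 2592000, matching the updated test


async def store_feedback(redis, query: str, ranked_ids: list[str]) -> None:
    date_key = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d")
    stream_key = f"rag:feedback:{date_key}"
    await redis.xadd(
        stream_key,
        {"query": query, "final_ranked_ids": json.dumps(ranked_ids)},
    )
    # Whole-key TTL, refreshed on every append to the day's stream.
    await redis.expire(stream_key, _STREAM_TTL_SECONDS)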