Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 35 additions & 6 deletions autobot-backend/services/mesh_brain/edge_learner.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ class EdgeLearner:
Consumes rag:feedback:{date} Redis streams from Phase 1 (#2024).
Reinforces edges between co-retrieved chunks. Creates new CO_RETRIEVED
edges after co_access_count >= threshold.

Cursor tracking: _cursors maps stream_key -> last consumed entry ID so
that repeated calls to consume_feedback_stream() only read NEW entries
instead of re-processing the entire stream from the start. Fix: #2102.
"""

def __init__(
Expand All @@ -52,6 +56,9 @@ def __init__(
self.ema_decay = ema_decay
self.creation_threshold = creation_threshold
self.initial_weight = initial_weight
# Per-stream cursor: stream_key -> last processed Redis entry ID.
# Prevents duplicate processing when the scheduler loops every second.
self._cursors: dict[str, str] = {}

async def on_retrieval(self, event: dict) -> None:
"""Process a single retrieval feedback event.
Expand Down Expand Up @@ -114,30 +121,52 @@ async def _maybe_create_edge(self, node_a: str, node_b: str) -> None:
)

async def consume_feedback_stream(self, date_key: str | None = None) -> int:
"""Consume all events from a dated feedback stream.
"""Consume only NEW events from a dated feedback stream. Fix: #2102.

Uses a per-stream cursor (_cursors) to remember the last processed
Redis entry ID so repeated calls by the scheduler do not re-process
the same events. Starts from the beginning only on the first call
for a given date key.

Stream key: rag:feedback:{YYYY-MM-DD}

Returns:
Number of events processed.
Number of NEW events processed in this call.
"""
if date_key is None:
date_key = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d")

stream_key = f"rag:feedback:{date_key}"
last_id = "0-0"
# Resume from exclusive lower bound. "0-0" reads from the start.
# After processing entry "T-S", we store "T-(S+1)" so the next
# xrange call excludes the already-processed entry.
# Note: _cursors are ephemeral (lost on process restart), which
# causes full re-consumption. Acceptable for now; file a follow-up
# for Redis-based cursor persistence before Phase 3 production.
resume_id = self._cursors.get(stream_key, "0-0")
processed = 0

while True:
entries = await self.redis.xrange(stream_key, min=last_id, count=100)
entries = await self.redis.xrange(stream_key, min=resume_id, count=100)
if not entries:
break
for entry_id, fields in entries:
await self.on_retrieval(fields)
last_id = entry_id
# Advance cursor past this entry (exclusive lower bound).
ts, seq = entry_id.split("-")
resume_id = f"{ts}-{int(seq) + 1}"
processed += 1
if len(entries) < 100:
break

logger.info("EdgeLearner: consumed %d events from %s", processed, stream_key)
# Persist cursor only when we actually advanced past an entry.
if processed > 0:
self._cursors[stream_key] = resume_id

if processed:
logger.info(
"EdgeLearner: consumed %d new events from %s", processed, stream_key
)
else:
logger.debug("EdgeLearner: no new events in %s", stream_key)
return processed
59 changes: 59 additions & 0 deletions autobot-backend/services/mesh_brain/edge_learner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,62 @@ async def test_consume_feedback_stream_uses_today_by_default(self):
today = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d")
expected_key = f"rag:feedback:{today}"
redis.xrange.assert_awaited_once_with(expected_key, min="0-0", count=100)

@pytest.mark.asyncio
async def test_cursor_persists_between_calls_no_duplicate_processing(self):
"""Second call uses exclusive cursor — no re-processing. Fix: #2102."""
db = _make_db_mock()
redis = _make_redis_mock()

entry_a = ("1-0", {"final_ranked_ids": json.dumps(["A", "B"])})
entry_b = ("2-0", {"final_ranked_ids": json.dumps(["C", "D"])})

# First call: two entries processed, cursor advances to "2-1".
# Second call: xrange(min="2-1") returns nothing — all consumed.
redis.xrange = AsyncMock(
side_effect=[
[entry_a, entry_b], # first consume call
[], # second call — cursor "2-1" excludes everything
]
)

learner = _make_learner(db, redis)

count_first = await learner.consume_feedback_stream(date_key="2026-03-23")
assert count_first == 2

db.update_access_count.reset_mock()

count_second = await learner.consume_feedback_stream(date_key="2026-03-23")
assert count_second == 0
db.update_access_count.assert_not_awaited()

@pytest.mark.asyncio
async def test_cursor_advances_when_new_entries_arrive(self):
"""New entries after cursor are consumed. Fix: #2102."""
db = _make_db_mock()
redis = _make_redis_mock()

entry_a = ("1-0", {"final_ranked_ids": json.dumps(["A", "B"])})
entry_b = ("2-0", {"final_ranked_ids": json.dumps(["C", "D"])})
entry_c = ("3-0", {"final_ranked_ids": json.dumps(["E", "F"])})

# First call: only entry_a, cursor advances to "1-1".
# Second call: xrange(min="1-1") returns entry_b + entry_c.
redis.xrange = AsyncMock(
side_effect=[
[entry_a], # first call
[entry_b, entry_c], # second call — new entries only
]
)

learner = _make_learner(db, redis)

count_first = await learner.consume_feedback_stream(date_key="2026-03-23")
assert count_first == 1

db.update_access_count.reset_mock()

count_second = await learner.consume_feedback_stream(date_key="2026-03-23")
assert count_second == 2
assert db.update_access_count.await_count == 2
5 changes: 3 additions & 2 deletions autobot-backend/services/rag_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,15 +355,16 @@ async def _store_feedback_in_stream(
"""Append retrieval feedback to a dated Redis stream. Issue #1516.

Stream key: rag:feedback:{YYYY-MM-DD} (UTC date).
TTL: 7 days so the mesh trainer has a rolling window of signal.
TTL: 30 days so Neural Mesh Phase 3 (#2056) can consume the data
before it expires. Increased from 7 days — Fix: #2102.

Args:
query: Raw query string.
retrieved_ids: Chunk IDs retrieved before reranking.
ranked_ids: Final ordered chunk IDs after reranking.
complexity: QueryComplexity.value string (Issue #2024).
"""
_STREAM_TTL_SECONDS = 7 * 24 * 3600
_STREAM_TTL_SECONDS = 30 * 24 * 3600
date_key = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d")
stream_key = f"rag:feedback:{date_key}"
entry = {
Expand Down
8 changes: 4 additions & 4 deletions autobot-backend/services/rag_service_events_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ async def test_publish_error_does_not_propagate(self):
class TestStoreFeedbackInStream:
"""Tests for RAGService._store_feedback_in_stream()."""

_SEVEN_DAYS_SECONDS = 7 * 24 * 3600
_THIRTY_DAYS_SECONDS = 30 * 24 * 3600

def _make_service(self):
from services.rag_service import RAGService
Expand Down Expand Up @@ -237,8 +237,8 @@ async def test_stream_entry_contains_ids_as_json(self):
assert json.loads(entry["final_ranked_ids"]) == ["b"]

@pytest.mark.asyncio
async def test_expire_set_to_seven_days(self):
"""Stream key TTL is set to 7 days (604800 seconds)."""
async def test_expire_set_to_thirty_days(self):
"""Stream key TTL is set to 30 days (2592000 seconds). Fix: #2102."""
mock_redis = self._make_redis_mock()

with patch(
Expand All @@ -255,7 +255,7 @@ async def test_expire_set_to_seven_days(self):

assert mock_redis.expire.called
ttl_arg = mock_redis.expire.call_args[0][1]
assert ttl_arg == self._SEVEN_DAYS_SECONDS
assert ttl_arg == self._THIRTY_DAYS_SECONDS

@pytest.mark.asyncio
async def test_redis_unavailable_does_not_raise(self):
Expand Down
Loading