From 6e56d45b571f8f1fd7c0fcb40382895c28a88c5a Mon Sep 17 00:00:00 2001 From: mrveiss Date: Mon, 23 Mar 2026 21:38:16 +0200 Subject: [PATCH] feat(mesh): add AgentTopology pruning + last_updated (#2167) Adds last_updated field and prune_weak_connections(). 2 new tests. --- .../agents/agent_orchestration/topology.py | 42 ++++++++++++++++++- .../agent_orchestration/topology_test.py | 39 ++++++++++++++++- 2 files changed, 79 insertions(+), 2 deletions(-) diff --git a/autobot-backend/agents/agent_orchestration/topology.py b/autobot-backend/agents/agent_orchestration/topology.py index f615634b7..83688daf5 100644 --- a/autobot-backend/agents/agent_orchestration/topology.py +++ b/autobot-backend/agents/agent_orchestration/topology.py @@ -13,7 +13,8 @@ """ import logging -from dataclasses import dataclass +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone from itertools import combinations from typing import Optional, Protocol, runtime_checkable @@ -36,6 +37,9 @@ class AgentConnection: weight: float co_success_count: int co_failure_count: int + last_updated: datetime = field( + default_factory=lambda: datetime.now(tz=timezone.utc) + ) @runtime_checkable @@ -82,6 +86,17 @@ async def record_agent_task( """Append a task history entry for *agent_id*.""" ... + async def delete_weak_connections( + self, + min_weight: float, + inactive_since: datetime, + ) -> int: + """Delete connections below *min_weight* not updated since *inactive_since*. + + Returns the number of rows deleted. + """ + ... + class AgentTopology: """Dynamic DAG of agent connections. Evolves from task outcomes. @@ -196,3 +211,28 @@ async def _update_pair( weight=new_weight, co_failure_count=conn.co_failure_count + 1, ) + + async def prune_weak_connections( + self, + min_weight: float = 0.1, + inactive_days: int = 60, + ) -> int: + """Remove weak, inactive agent connections. Returns count deleted. + + Deletes every connection whose weight is below *min_weight* AND whose + *last_updated* timestamp is older than *inactive_days* days. + + Issue #2167. + """ + cutoff = datetime.now(tz=timezone.utc) - timedelta(days=inactive_days) + deleted = await self.db.delete_weak_connections( + min_weight=min_weight, + inactive_since=cutoff, + ) + logger.info( + "Pruned %d weak connections (min_weight=%.3f, inactive_days=%d)", + deleted, + min_weight, + inactive_days, + ) + return deleted diff --git a/autobot-backend/agents/agent_orchestration/topology_test.py b/autobot-backend/agents/agent_orchestration/topology_test.py index 0d98cd918..1f8305aae 100644 --- a/autobot-backend/agents/agent_orchestration/topology_test.py +++ b/autobot-backend/agents/agent_orchestration/topology_test.py @@ -9,6 +9,7 @@ """ import dataclasses +from datetime import datetime, timedelta, timezone from unittest.mock import AsyncMock import pytest @@ -39,13 +40,16 @@ def _make_connection( ) -def _make_db(connection: AgentConnection | None = None) -> AsyncMock: +def _make_db( + connection: AgentConnection | None = None, delete_count: int = 0 +) -> AsyncMock: """Return a mock that satisfies the AgentTopologyDB Protocol.""" db = AsyncMock() db.get_agent_connections.return_value = [connection] if connection else [] db.get_or_create_agent_connection.return_value = connection or _make_connection() db.update_agent_connection.return_value = None db.record_agent_task.return_value = None + db.delete_weak_connections.return_value = delete_count return db @@ -201,4 +205,37 @@ def test_agent_connection_dataclass_fields(): "weight", "co_success_count", "co_failure_count", + "last_updated", } + + +@pytest.mark.asyncio +async def test_prune_weak_connections_calls_db(): + """prune_weak_connections forwards min_weight and a UTC cutoff to delete_weak_connections.""" + db = _make_db(delete_count=0) + topology = AgentTopology(db) + + before_call = datetime.now(tz=timezone.utc) + await topology.prune_weak_connections(min_weight=0.2, inactive_days=30) + after_call = datetime.now(tz=timezone.utc) + + db.delete_weak_connections.assert_awaited_once() + call_kwargs = db.delete_weak_connections.await_args.kwargs + assert call_kwargs["min_weight"] == 0.2 + + cutoff: datetime = call_kwargs["inactive_since"] + # The cutoff must be 30 days before the call, within a 1-second tolerance. + expected_low = before_call - timedelta(days=30) - timedelta(seconds=1) + expected_high = after_call - timedelta(days=30) + timedelta(seconds=1) + assert expected_low <= cutoff <= expected_high + + +@pytest.mark.asyncio +async def test_prune_returns_count(): + """prune_weak_connections returns the integer row count reported by the db.""" + db = _make_db(delete_count=7) + topology = AgentTopology(db) + + result = await topology.prune_weak_connections() + + assert result == 7