Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion autobot-backend/agents/agent_orchestration/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
"""

import logging
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from itertools import combinations
from typing import Optional, Protocol, runtime_checkable

Expand All @@ -36,6 +37,9 @@ class AgentConnection:
weight: float
co_success_count: int
co_failure_count: int
last_updated: datetime = field(
default_factory=lambda: datetime.now(tz=timezone.utc)
)


@runtime_checkable
Expand Down Expand Up @@ -82,6 +86,17 @@ async def record_agent_task(
"""Append a task history entry for *agent_id*."""
...

async def delete_weak_connections(
self,
min_weight: float,
inactive_since: datetime,
) -> int:
"""Delete connections below *min_weight* not updated since *inactive_since*.

Returns the number of rows deleted.
"""
...


class AgentTopology:
"""Dynamic DAG of agent connections. Evolves from task outcomes.
Expand Down Expand Up @@ -196,3 +211,28 @@ async def _update_pair(
weight=new_weight,
co_failure_count=conn.co_failure_count + 1,
)

async def prune_weak_connections(
self,
min_weight: float = 0.1,
inactive_days: int = 60,
) -> int:
"""Remove weak, inactive agent connections. Returns count deleted.

Deletes every connection whose weight is below *min_weight* AND whose
*last_updated* timestamp is older than *inactive_days* days.

Issue #2167.
"""
cutoff = datetime.now(tz=timezone.utc) - timedelta(days=inactive_days)
deleted = await self.db.delete_weak_connections(
min_weight=min_weight,
inactive_since=cutoff,
)
logger.info(
"Pruned %d weak connections (min_weight=%.3f, inactive_days=%d)",
deleted,
min_weight,
inactive_days,
)
return deleted
39 changes: 38 additions & 1 deletion autobot-backend/agents/agent_orchestration/topology_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"""

import dataclasses
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock

import pytest
Expand Down Expand Up @@ -39,13 +40,16 @@ def _make_connection(
)


def _make_db(connection: AgentConnection | None = None) -> AsyncMock:
def _make_db(
connection: AgentConnection | None = None, delete_count: int = 0
) -> AsyncMock:
"""Return a mock that satisfies the AgentTopologyDB Protocol."""
db = AsyncMock()
db.get_agent_connections.return_value = [connection] if connection else []
db.get_or_create_agent_connection.return_value = connection or _make_connection()
db.update_agent_connection.return_value = None
db.record_agent_task.return_value = None
db.delete_weak_connections.return_value = delete_count
return db


Expand Down Expand Up @@ -201,4 +205,37 @@ def test_agent_connection_dataclass_fields():
"weight",
"co_success_count",
"co_failure_count",
"last_updated",
}


@pytest.mark.asyncio
async def test_prune_weak_connections_calls_db():
"""prune_weak_connections forwards min_weight and a UTC cutoff to delete_weak_connections."""
db = _make_db(delete_count=0)
topology = AgentTopology(db)

before_call = datetime.now(tz=timezone.utc)
await topology.prune_weak_connections(min_weight=0.2, inactive_days=30)
after_call = datetime.now(tz=timezone.utc)

db.delete_weak_connections.assert_awaited_once()
call_kwargs = db.delete_weak_connections.await_args.kwargs
assert call_kwargs["min_weight"] == 0.2

cutoff: datetime = call_kwargs["inactive_since"]
# The cutoff must be 30 days before the call, within a 1-second tolerance.
expected_low = before_call - timedelta(days=30) - timedelta(seconds=1)
expected_high = after_call - timedelta(days=30) + timedelta(seconds=1)
assert expected_low <= cutoff <= expected_high


@pytest.mark.asyncio
async def test_prune_returns_count():
"""prune_weak_connections returns the integer row count reported by the db."""
db = _make_db(delete_count=7)
topology = AgentTopology(db)

result = await topology.prune_weak_connections()

assert result == 7
Loading