Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 87 additions & 14 deletions agents/agents/dynamic_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
Wallet context is injected using the same pattern as static agents:
- OG balance fetched via 0G Chain RPC (urllib, not httpx)
- CoinGecko prices fetched via urllib
- SaucerSwap pools and Bonzo Finance markets fetched via urllib
- Concrete holdings prepended to user query
"""
import asyncio
Expand Down Expand Up @@ -68,6 +69,60 @@ def _fetch_prices() -> str:
return "Price data temporarily unavailable"


def _fetch_saucerswap_pools() -> str:
"""Fetch top SaucerSwap liquidity pools via urllib."""
try:
req = urllib.request.Request(
"https://api.saucerswap.finance/v2/pools",
headers={"Accept": "application/json"},
)
with urllib.request.urlopen(req, timeout=10) as resp:
pools = json.loads(resp.read())
sorted_pools = sorted(pools, key=lambda p: float(p.get("tvl", 0) or 0), reverse=True)
result = [
{
"pair": f"{p.get('tokenA', {}).get('symbol', '?')}/{p.get('tokenB', {}).get('symbol', '?')}",
"tvl_usd": round(float(p.get("tvl", 0) or 0)),
"apr_pct": round(float(p.get("apr", 0) or 0), 2),
}
for p in sorted_pools[:10]
if float(p.get("tvl", 0) or 0) > 10000
]
return json.dumps(result, indent=2)
except Exception as e:
logger.debug("SaucerSwap fetch failed: %s", e)
return json.dumps([
{"pair": "HBAR/USDC", "tvl_usd": 5000000, "apr_pct": 12.5},
{"pair": "HBAR/HBARX", "tvl_usd": 3000000, "apr_pct": 18.2},
{"pair": "USDC/USDT", "tvl_usd": 8000000, "apr_pct": 4.1},
{"note": "SaucerSwap API unavailable — showing fallback data"},
], indent=2)


def _fetch_bonzo_markets() -> str:
"""Fetch Bonzo Finance lending/borrowing markets via urllib."""
try:
req = urllib.request.Request(
"https://api.bonzo.finance/v1/markets",
headers={"Accept": "application/json"},
)
with urllib.request.urlopen(req, timeout=10) as resp:
return resp.read().decode()
except Exception as e:
logger.debug("Bonzo Finance fetch failed: %s", e)
return json.dumps({
"protocol": "Bonzo Finance",
"chain": "Hedera",
"markets": [
{"asset": "HBAR", "supply_apy": 3.5, "borrow_apy": 5.2, "tvl": 8000000},
{"asset": "USDC", "supply_apy": 6.1, "borrow_apy": 8.4, "tvl": 12000000},
{"asset": "HBARX", "supply_apy": 7.8, "borrow_apy": 10.1, "tvl": 3000000},
{"asset": "SAUCE", "supply_apy": 9.2, "borrow_apy": 12.5, "tvl": 1500000},
],
"note": "Bonzo Finance API unavailable — showing fallback data",
}, indent=2)


def _call_anthropic(system: str, user_message: str) -> str:
"""Call Anthropic Messages API via raw SSL socket (bypasses httpx)."""
body = json.dumps({
Expand Down Expand Up @@ -144,16 +199,23 @@ def __init__(self, name: str, description: str, system_prompt: str, price_per_ca

async def execute(self, query: str, wallet_address: str | None = None) -> str:
try:
# Fetch wallet context and prices in background threads (same pattern as static agents)
# Fetch all context in parallel (wallet, prices, Hedera DeFi data)
wallet_banner = ""
holdings_line = ""
price_context = ""

# Always fetch Hedera DeFi data (SaucerSwap + Bonzo) in parallel
fetches = [
asyncio.to_thread(_fetch_saucerswap_pools),
asyncio.to_thread(_fetch_bonzo_markets),
]

if wallet_address:
bal_data, prices = await asyncio.gather(
asyncio.to_thread(_fetch_wallet_balance, wallet_address),
asyncio.to_thread(_fetch_prices),
)
fetches.insert(0, asyncio.to_thread(_fetch_wallet_balance, wallet_address))
fetches.insert(1, asyncio.to_thread(_fetch_prices))
results = await asyncio.gather(*fetches)
bal_data, prices, saucerswap_data, bonzo_data = results[0], results[1], results[2], results[3]

bal = bal_data["balance"]
sym = bal_data["symbol"]
chain = bal_data["chain"]
Expand All @@ -167,19 +229,30 @@ async def execute(self, query: str, wallet_address: str | None = None) -> str:
f"That is 100% of my on-chain portfolio. "
)
price_context = prices
else:
results = await asyncio.gather(*fetches)
saucerswap_data, bonzo_data = results[0], results[1]

# Enhance system prompt with context injection rules
enhanced_system = self.system_prompt

# Inject Hedera DeFi data
enhanced_system += (
f"\n\nSAUCERSWAP DEX — TOP LIQUIDITY POOLS (live from SaucerSwap API):\n{saucerswap_data}\n\n"
f"BONZO FINANCE — LENDING/BORROWING MARKETS (live from Bonzo Finance API):\n{bonzo_data}\n\n"
)

if price_context:
enhanced_system += (
f"\n\nCURRENT MARKET PRICES (live from CoinGecko):\n{price_context}\n\n"
"IMPORTANT RULES:\n"
"- NEVER ask the user for more information. Always analyze with whatever data is provided.\n"
"- Use the real-time prices above in your analysis.\n"
"- If the user mentions token allocations (e.g. '80% ETH'), treat those as their portfolio.\n"
"- Always give concrete numbers, tables, and actionable recommendations.\n"
"- Format output as clean markdown with tables."
)
enhanced_system += f"CURRENT MARKET PRICES (live from CoinGecko):\n{price_context}\n\n"

enhanced_system += (
"IMPORTANT RULES:\n"
"- NEVER ask the user for more information. Always analyze with whatever data is provided.\n"
"- Use the real-time data above in your analysis.\n"
"- If the user mentions token allocations (e.g. '80% ETH'), treat those as their portfolio.\n"
"- Always give concrete numbers, tables, and actionable recommendations.\n"
"- Format output as clean markdown with tables."
)

user_message = holdings_line + query
llm_result = await asyncio.to_thread(_call_anthropic, enhanced_system, user_message)
Expand Down
4 changes: 4 additions & 0 deletions agents/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@ class RegisterAgentRequest(BaseModel):
system_prompt: str
token_id: int
price_per_call: float = 0.001
x402_enabled: bool = False
allow_cross_agent: bool = False


@app.post("/agents/register")
Expand All @@ -370,6 +372,8 @@ async def register_dynamic_agent(body: RegisterAgentRequest) -> AgentResponse:
system_prompt=body.system_prompt,
token_id=body.token_id,
price_per_call=body.price_per_call,
x402_enabled=body.x402_enabled,
allow_cross_agent=body.allow_cross_agent,
)
# Add to the live orchestrator registry so it's immediately executable
AGENT_REGISTRY[agent_id] = agent
Expand Down
48 changes: 44 additions & 4 deletions agents/dynamic_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
_token_map: dict[int, str] = {}
_hedera_info: dict[str, dict[str, str]] = {} # agent_id -> {account, inbound, outbound}
_afc_earned: dict[str, float] = {} # agent_id -> cumulative AFC earned
_x402_configs: dict[str, dict] = {} # agent_id -> {x402_enabled, price_afc, allow_cross_agent, max_budget_afc}


def _read_store() -> list[dict]:
Expand All @@ -36,12 +37,13 @@ def _write_store(entries: list[dict]) -> None:

def load_dynamic_agents() -> dict[str, DynamicAgent]:
"""Load all dynamic agents from disk. Called at startup."""
global _dynamic_agents, _token_map, _hedera_info, _afc_earned
global _dynamic_agents, _token_map, _hedera_info, _afc_earned, _x402_configs
entries = _read_store()
agents: dict[str, DynamicAgent] = {}
tmap: dict[int, str] = {}
hinfo: dict[str, dict[str, str]] = {}
afc: dict[str, float] = {}
x402: dict[str, dict] = {}

for entry in entries:
agent_id = entry["agent_id"]
Expand All @@ -58,12 +60,20 @@ def load_dynamic_agents() -> dict[str, DynamicAgent]:
hinfo[agent_id] = entry["hedera"]
if "afc_earned" in entry:
afc[agent_id] = entry["afc_earned"]
if entry.get("x402_enabled"):
x402[agent_id] = {
"x402_enabled": entry.get("x402_enabled", False),
"price_afc": entry.get("price_afc", 1.0),
"allow_cross_agent": entry.get("allow_cross_agent", False),
"max_budget_afc": entry.get("max_budget_afc", 5.0),
}

with _lock:
_dynamic_agents = agents
_token_map = tmap
_hedera_info = hinfo
_afc_earned = afc
_x402_configs = x402

logger.info(f"Loaded {len(agents)} dynamic agents from disk")
return agents
Expand All @@ -76,6 +86,10 @@ def register_agent(
system_prompt: str,
token_id: int,
price_per_call: float = 0.001,
x402_enabled: bool = False,
allow_cross_agent: bool = False,
price_afc: float = 1.0,
max_budget_afc: float = 5.0,
) -> DynamicAgent:
"""Create, persist, and register a new dynamic agent."""
with _lock:
Expand All @@ -91,19 +105,33 @@ def register_agent(
_dynamic_agents[agent_id] = agent
_token_map[token_id] = agent_id

if x402_enabled:
_x402_configs[agent_id] = {
"x402_enabled": True,
"price_afc": price_afc,
"allow_cross_agent": allow_cross_agent,
"max_budget_afc": max_budget_afc,
}

# Persist
entries = _read_store()
entries.append({
entry: dict = {
"agent_id": agent_id,
"name": name,
"description": description,
"system_prompt": system_prompt,
"token_id": token_id,
"price_per_call": price_per_call,
})
}
if x402_enabled:
entry["x402_enabled"] = True
entry["price_afc"] = price_afc
entry["allow_cross_agent"] = allow_cross_agent
entry["max_budget_afc"] = max_budget_afc
entries.append(entry)
_write_store(entries)

logger.info(f"Registered dynamic agent: {agent_id} (tokenId={token_id})")
logger.info(f"Registered dynamic agent: {agent_id} (tokenId={token_id}, x402={x402_enabled})")
return agent


Expand Down Expand Up @@ -198,3 +226,15 @@ def get_afc_balances() -> dict[int, float]:
for token_id, agent_id in _token_map.items():
result[token_id] = _afc_earned.get(agent_id, 0.0)
return result


def get_x402_config(agent_id: str) -> dict | None:
"""Get x402 config for a dynamic agent, or None if not x402-enabled."""
with _lock:
return _x402_configs.get(agent_id)


def get_x402_enabled_agents() -> list[str]:
"""Return agent_ids of all dynamic agents with x402 enabled."""
with _lock:
return [aid for aid, cfg in _x402_configs.items() if cfg.get("x402_enabled")]
80 changes: 77 additions & 3 deletions agents/x402/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,31 @@ def is_authorized_on_chain(token_id: int, wallet_address: str) -> bool:


def get_registry_config(token_id: int) -> dict:
"""Read agent config from the on-chain AgentRegistry contract."""
"""Read agent config from the on-chain AgentRegistry contract.

Dynamic agents (token_id > 2) are not registered on-chain, so we check
the local dynamic registry first to avoid the contract returning default
(all-false) values for unknown token IDs.
"""
# Dynamic agents: check local registry first (contract returns zeros for unknown IDs)
if token_id > 2:
from dynamic_registry import get_token_map, get_x402_config
tmap = get_token_map()
agent_id = tmap.get(token_id)
if agent_id:
x402_cfg = get_x402_config(agent_id)
if x402_cfg:
hedera_accounts = get_full_hedera_accounts()
return {
"agent_hedera_account": hedera_accounts.get(token_id, ""),
"owner_hedera_account": OPERATOR_HEDERA_ACCOUNT,
"x402_enabled": x402_cfg.get("x402_enabled", False),
"price_afc": x402_cfg.get("price_afc", 1.0),
"price_usdt": 0.01,
"max_budget_afc": x402_cfg.get("max_budget_afc", 5.0),
"allow_cross_agent": x402_cfg.get("allow_cross_agent", False),
}

try:
if not AGENT_REGISTRY_ADDRESS:
raise ValueError("AGENT_REGISTRY_ADDRESS not set")
Expand Down Expand Up @@ -144,10 +168,60 @@ def _local_fallback_config(token_id: int) -> dict:
1: {"price_afc": 1.50, "price_usdt": 0.015, "max_budget_afc": 3.00, "allow_cross_agent": True},
2: {"price_afc": 0.50, "price_usdt": 0.005, "max_budget_afc": 2.00, "allow_cross_agent": False},
}
d = defaults.get(token_id, defaults[0])
if token_id in defaults:
d = defaults[token_id]
else:
# Check dynamic registry for non-static agents
from dynamic_registry import get_token_map, get_x402_config
tmap = get_token_map()
agent_id = tmap.get(token_id)
if agent_id:
x402_cfg = get_x402_config(agent_id)
if x402_cfg:
hedera_accounts = get_full_hedera_accounts()
return {
"agent_hedera_account": hedera_accounts.get(token_id, ""),
"owner_hedera_account": OPERATOR_HEDERA_ACCOUNT,
"x402_enabled": x402_cfg.get("x402_enabled", False),
"price_afc": x402_cfg.get("price_afc", 1.0),
"price_usdt": 0.01,
"max_budget_afc": x402_cfg.get("max_budget_afc", 5.0),
"allow_cross_agent": x402_cfg.get("allow_cross_agent", False),
}
d = {"price_afc": 1.00, "price_usdt": 0.01, "max_budget_afc": 5.00, "allow_cross_agent": False}
return {
"agent_hedera_account": AGENT_HEDERA_ACCOUNTS.get(token_id, ""),
"agent_hedera_account": AGENT_HEDERA_ACCOUNTS.get(token_id, get_full_hedera_accounts().get(token_id, "")),
"owner_hedera_account": OPERATOR_HEDERA_ACCOUNT,
"x402_enabled": True,
**d,
}


# ─── Dynamic-aware lookup functions ───────────────────────────


def get_full_agent_name_to_token_id() -> dict[str, int]:
"""Merge static AGENT_NAME_TO_TOKEN_ID with dynamic agents."""
from dynamic_registry import get_token_map
merged = dict(AGENT_NAME_TO_TOKEN_ID)
tmap = get_token_map()
for token_id, agent_id in tmap.items():
if agent_id not in merged:
merged[agent_id] = token_id
return merged


def get_full_hedera_accounts() -> dict[int, str]:
"""Merge static AGENT_HEDERA_ACCOUNTS with dynamic agents."""
from dynamic_registry import get_all_hedera_accounts
return get_all_hedera_accounts()


def get_full_cross_agent_recommendations(caller_name: str) -> list[str]:
"""Static recommendations + all dynamic agents with x402 enabled."""
from dynamic_registry import get_x402_enabled_agents
static = list(CROSS_AGENT_RECOMMENDATIONS.get(caller_name, []))
for agent_id in get_x402_enabled_agents():
if agent_id != caller_name and agent_id not in static:
static.append(agent_id)
return static
Loading
Loading