-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmonitor.py
More file actions
193 lines (160 loc) · 6.3 KB
/
monitor.py
File metadata and controls
193 lines (160 loc) · 6.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
"""
monitor.py — Reference Agent Health Monitor
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Reads each agent's deduplication state file and queries Numbers Mainnet
to report:
- Cumulative registered-ID count per agent (number of entries in each state file; an all-time proxy for activity, not today-only)
- Actual on-chain stats from mainnet.num.network
- Daily target vs. actual comparison
Run manually:
python monitor.py
Run as a cron job for daily Slack summary (add to crontab):
0 9 * * * cd /opt/numbers-agents && /opt/numbers-agents/venv/bin/python monitor.py --slack
"""
import argparse
import json
import os
import sys
import time
from datetime import date, datetime, timezone
from pathlib import Path
import httpx
from dotenv import load_dotenv
load_dotenv()
# Directory holding the per-agent "<agent>_seen.json" deduplication files
# (override with the STATE_DIR environment variable).
STATE_DIR = Path(os.environ.get("STATE_DIR", "./state"))

# Webhook URL the report is POSTed to; None when SLACK_WEBHOOK_URL is unset.
SLACK_WEBHOOK = os.environ.get("SLACK_WEBHOOK_URL")

# One row per reference agent: (state-file key, daily target, display label).
_AGENT_ROWS = (
    ("provart", 500, "ProvArt (#1)"),
    ("newsprove", 300, "NewsProve (#2)"),
    ("agentlog", 200, "AgentLog (#3)"),
    ("dataprove", 200, "DataProve (#4)"),
    ("socialprove", 200, "SocialProve (#5)"),
    ("researchprove", 150, "ResearchProve (#6)"),
    ("codeprove", 50, "CodeProve (#7)"),
)

# Agent key -> {"target": registrations per day, "label": name shown in reports}.
AGENTS = {
    key: {"target": daily_target, "label": label}
    for key, daily_target, label in _AGENT_ROWS
}

# Combined daily registration target across all agents.
TOTAL_TARGET = sum(spec["target"] for spec in AGENTS.values())  # 1,600
# ── State file reader ─────────────────────────────────────────────────────────
def read_state_count(agent: str) -> int:
    """
    Return the count of unique IDs registered by an agent.

    The count is the number of entries in ``STATE_DIR/<agent>_seen.json``.
    This is a cumulative total (all-time), not today-only.
    Use as a proxy for relative activity.

    Args:
        agent: Agent key used as the state file name prefix.

    Returns:
        Number of entries in the state file, or 0 when the file is missing,
        unreadable, not valid JSON, or does not contain a JSON array/object.
    """
    path = STATE_DIR / f"{agent}_seen.json"
    try:
        with open(path, encoding="utf-8") as f:
            seen = json.load(f)
    except (OSError, ValueError):
        # OSError covers missing/unreadable files; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError. A broken state file
        # must not take the whole report down.
        return 0
    # A JSON scalar has no meaningful entry count (len() of a number raises,
    # len() of a string would count characters), so only count containers.
    if not isinstance(seen, (list, dict)):
        return 0
    return len(seen)
# ── Mainnet stats ─────────────────────────────────────────────────────────────
def fetch_mainnet_stats() -> dict | None:
    """
    Query mainnet.num.network for current transaction stats.

    The candidate stats endpoints are tried in order; the first one that
    answers HTTP 200 with a JSON object is used.

    Returns:
        The decoded stats object, or None when no endpoint produced a usable
        response. Each failed attempt is reported on stderr (stdout is left
        untouched so ``--json`` output stays parseable).
    """
    urls_to_try = [
        "https://mainnet.num.network/api/v2/stats",
        "https://mainnet.num.network/api/stats",
    ]
    for url in urls_to_try:
        try:
            resp = httpx.get(
                url,
                headers={"Accept": "application/json"},
                timeout=10,
            )
            if resp.status_code != 200:
                print(
                    f"Mainnet stats: {url} returned HTTP {resp.status_code}",
                    file=sys.stderr,
                )
                continue
            stats = resp.json()
        except Exception as exc:
            # Best-effort probe: any transport or decoding problem just moves
            # on to the next endpoint, but is reported instead of being
            # swallowed silently.
            print(f"Mainnet stats: {url} failed: {exc}", file=sys.stderr)
            continue
        if isinstance(stats, dict):
            return stats
        # Callers use dict.get() on the result, so a JSON array or scalar is
        # not a usable answer.
        print(
            f"Mainnet stats: {url} returned a non-object JSON body",
            file=sys.stderr,
        )
    return None
# ── Reporting ─────────────────────────────────────────────────────────────────
def render_report(mainnet: dict | None) -> str:
today = date.today().isoformat()
ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
lines = [
f"Numbers Protocol Reference Agents — Status Report",
f"Generated: {ts}",
f"",
f"{'Agent':<24} {'State IDs':>10} {'Target/day':>12}",
f"{'-'*52}",
]
total_ids = 0
for agent, info in AGENTS.items():
count = read_state_count(agent)
total_ids += count
indicator = "OK" if count > 0 else "WARN (no state)"
lines.append(
f"{info['label']:<24} {count:>10,} {info['target']:>10}/day [{indicator}]"
)
lines += [
f"{'-'*52}",
f"{'Total registered IDs':<24} {total_ids:>10,} {TOTAL_TARGET:>10}/day (target)",
f"",
]
# Mainnet stats
def _fmt(v):
return f"{int(v):,}" if str(v).isdigit() else str(v)
if mainnet:
lines += [
f"Numbers Mainnet (live)",
f" Total transactions: {_fmt(mainnet.get('total_transactions', 'n/a'))}",
f" Transactions today: {mainnet.get('transactions_today', 'n/a')}",
f" Total wallets: {_fmt(mainnet.get('total_addresses', 'n/a'))}",
]
else:
lines.append("Numbers Mainnet: could not reach explorer API")
lines += [
f"",
f"State directory: {STATE_DIR.resolve()}",
f"Note: 'State IDs' is a cumulative all-time count, not today-only.",
f" Check agent logs for today's registration counts.",
]
return "\n".join(lines)
def post_slack(text: str) -> None:
    """
    Send *text* to the configured Slack webhook as a fenced code block.

    Nothing is posted when SLACK_WEBHOOK is unset; that case and any failure
    while posting are reported on stderr rather than raised.
    """
    webhook_url = SLACK_WEBHOOK
    if webhook_url:
        try:
            # Triple backticks make Slack render the report in monospace.
            payload = {"text": f"```\n{text}\n```"}
            httpx.post(webhook_url, json=payload, timeout=10).raise_for_status()
            print("Slack report posted.")
        except Exception as err:
            print(f"Slack post failed: {err}", file=sys.stderr)
    else:
        print("SLACK_WEBHOOK_URL not set — skipping Slack post", file=sys.stderr)
# ── Entry point ───────────────────────────────────────────────────────────────
def main():
    """
    Command-line entry point.

    Fetches the mainnet stats once, then either prints a JSON document
    (``--json``) or prints the text report and, with ``--slack``, also posts
    it to Slack. ``--json`` takes precedence: nothing is posted in that mode.
    """
    cli = argparse.ArgumentParser(description="Reference Agent Monitor")
    cli.add_argument("--slack", action="store_true", help="Post report to Slack webhook")
    cli.add_argument("--json", action="store_true", help="Output raw JSON instead of text")
    opts = cli.parse_args()

    mainnet = fetch_mainnet_stats()

    if not opts.json:
        report = render_report(mainnet)
        print(report)
        if opts.slack:
            post_slack(report)
        return

    # Machine-readable mode: same per-agent numbers as the text report.
    generated_at = datetime.now(timezone.utc).isoformat()
    per_agent = {}
    for agent, info in AGENTS.items():
        per_agent[agent] = {
            "state_ids": read_state_count(agent),
            "target_per_day": info["target"],
        }
    payload = {
        "generated_at": generated_at,
        "agents": per_agent,
        "total_target_per_day": TOTAL_TARGET,
        "mainnet": mainnet,
    }
    print(json.dumps(payload, indent=2))


if __name__ == "__main__":
    main()