-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathrun_eval.py
More file actions
284 lines (243 loc) · 9.79 KB
/
run_eval.py
File metadata and controls
284 lines (243 loc) · 9.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
"""Main benchmark evaluation script.

Usage:
    uv run python run_eval.py                           # defaults: browser-use-cloud + bu-2-0
    uv run python run_eval.py --browser anchor          # use Anchor Browser provider
    uv run python run_eval.py --browser local_headless  # use local headless Chromium
    uv run python run_eval.py --tasks 5                 # run only 5 tasks

Available browsers: browser-use-cloud (default), anchor, browserbase,
browserless, hyperbrowser, local_headful, local_headless, onkernel,
rebrowser, steel
"""
# Fix for macOS users running under uv without SSL certificates set up:
# default SSL_CERT_FILE to certifi's CA bundle unless it is already set.
import certifi, os
os.environ.setdefault("SSL_CERT_FILE", certifi.where())
import logging
os.environ["BROWSER_USE_SETUP_LOGGING"] = (
"false" # Must be set before importing browser_use
)
logging.basicConfig(
level=logging.CRITICAL
) # Suppress all logs including shutdown warnings
import argparse
import asyncio
import base64, hashlib, json, traceback
from datetime import datetime
from pathlib import Path
from cryptography.fernet import Fernet
from dotenv import load_dotenv
from browser_use import Agent, Browser, ChatGoogle
from browser_use.llm import ChatBrowserUse
from browsers import PROVIDERS, get_provider
from judge import construct_judge_messages, JudgementResult
# Load environment variables (presumably from a local .env file) before the
# API key is read from the environment below.
load_dotenv()
# Judge LLM - always use gemini-2.5-flash for consistent judging across all evaluations
JUDGE_LLM = ChatGoogle(model="gemini-2.5-flash", api_key=os.getenv("GOOGLE_API_KEY"))
# Encrypted benchmark task file, expected next to this script; read by load_tasks().
TASKS_FILE = Path(__file__).parent / "BU_Bench_V1.enc"
# Size of the semaphore created in main(): at most this many tasks run at once.
MAX_CONCURRENT = 3
TASK_TIMEOUT = 1800 # 30 minutes max per task
# The three values below are combined into the run key that names the
# run_data/ directory and the results/ file written by main().
AGENT_FRAMEWORK_NAME = "BrowserUse"
AGENT_FRAMEWORK_VERSION = "0.11.5"
MODEL_NAME = "bu-2-0"
def encode_screenshots(paths: list[str]) -> list[str]:
    """Return the base64 text of every screenshot file in *paths*.

    Entries that do not point at a regular file (missing paths, directories)
    are skipped, so the result may be shorter than the input. Order of the
    remaining entries is preserved.

    Args:
        paths: Filesystem paths of screenshot files.

    Returns:
        One base64-encoded ASCII string per readable file.
    """
    # is_file() rather than exists(): a directory "exists" but cannot be read
    # with read_bytes(), which would abort the whole task instead of skipping.
    return [
        base64.b64encode(candidate.read_bytes()).decode()
        for candidate in map(Path, paths)
        if candidate.is_file()
    ]
def load_tasks() -> list[dict]:
    """Decrypt the benchmark task file and return its parsed JSON content.

    The Fernet key is the URL-safe base64 encoding of the SHA-256 digest of a
    fixed passphrase. The file at TASKS_FILE holds the base64-encoded Fernet
    token; it is decoded, decrypted, and parsed as JSON.
    """
    passphrase_digest = hashlib.sha256(b"BU_Bench_V1").digest()
    fernet_key = base64.urlsafe_b64encode(passphrase_digest)
    token = base64.b64decode(TASKS_FILE.read_text())
    plaintext = Fernet(fernet_key).decrypt(token)
    return json.loads(plaintext)
async def create_browser(browser_provider) -> Browser:
    """Build the Browser that a task should run in.

    Three cases, selected by the provider:
      * no provider (None): browser-use-cloud via the native use_cloud=True path;
      * provider whose connect() yields None: a locally launched browser,
        headless unless the provider's HEADLESS attribute says otherwise;
      * any other provider: attach to the CDP URL returned by connect().
    """
    if browser_provider is not None:
        endpoint = await browser_provider.connect()
        if endpoint is not None:
            return Browser(cdp_url=endpoint)
        # No CDP endpoint means the provider wants a local launch.
        run_headless = getattr(browser_provider, "HEADLESS", True)
        return Browser(headless=run_headless)
    return Browser(use_cloud=True, cloud_timeout=30)
async def run_task(
    task: dict,
    semaphore: asyncio.Semaphore,
    browser_provider=None,
    llm=None,
    run_data_dir: Path | None = None,
) -> dict:
    """Run a single task, judge it, and return a result dict (score 0 on failure).

    Args:
        task: Task record; "confirmed_task" is the prompt given to the agent,
            "task_id" and "answer" are read if present.
        semaphore: Limits how many tasks run concurrently.
        browser_provider: Browser provider module (None = browser-use-cloud).
        llm: LLM to use. Defaults to ChatBrowserUse(model=MODEL_NAME).
        run_data_dir: Directory for trace output. When None, no trace file is
            written and the judged result is still returned.

    Returns:
        A dict with "task_id", "score", "steps", "duration" and "cost".
        A judged run adds "judgement"; a timeout adds "error"; any other
        exception adds "error" and "traceback".
    """
    # Resolved once so every log line and result record uses the same id.
    task_id = task.get("task_id", "unknown")
    async with semaphore:
        try:
            print(f"Running task: {task_id}")
            browser = await create_browser(browser_provider)
            try:
                # To swap model: replace ChatBrowserUse() with your LLM (e.g. ChatOpenAI, ChatAnthropic)
                # You can use any OpenAI API compatible model by changing base_url. You can use ollama too. See https://docs.browser-use.com/supported-models for info
                agent = Agent(
                    task=task["confirmed_task"],
                    # MODEL_NAME keeps the default model in sync with the run key.
                    llm=llm or ChatBrowserUse(model=MODEL_NAME),
                    browser=browser,
                )
                try:
                    agent_history = await asyncio.wait_for(
                        agent.run(), timeout=TASK_TIMEOUT
                    )
                except asyncio.TimeoutError:
                    await browser.stop()
                    print(f"Task {task_id} timed out after {TASK_TIMEOUT}s")
                    return {
                        "task_id": task_id,
                        "score": 0,
                        "steps": 0,
                        "duration": TASK_TIMEOUT,
                        "cost": 0,
                        "error": f"Task timed out after {TASK_TIMEOUT}s",
                    }
            finally:
                # Release the provider session on every exit path (success,
                # timeout, or an exception from the agent or browser.stop()),
                # so a failed task does not leak a remote browser session.
                if browser_provider:
                    await browser_provider.disconnect()

            # Collect task metrics from agent history
            steps = agent_history.number_of_steps()
            duration = agent_history.total_duration_seconds()
            cost = agent_history.usage.total_cost if agent_history.usage else 0

            # Collect judge inputs from agent history
            agent_task = task["confirmed_task"]
            final_result = (
                agent_history.final_result() or "Agent did not return a result"
            )
            agent_steps = agent_history.agent_steps()
            ground_truth = task.get("answer")
            screenshots_b64 = encode_screenshots(
                [p for p in agent_history.screenshot_paths() if p is not None]
            )

            # Run judge
            judge_messages = construct_judge_messages(
                task=agent_task,
                final_result=final_result,
                agent_steps=agent_steps,
                ground_truth=ground_truth,
                screenshots_b64=screenshots_b64,
            )
            response = await JUDGE_LLM.ainvoke(
                judge_messages, output_format=JudgementResult
            )
            judgement: JudgementResult = response.completion
            score = 1 if judgement.verdict else 0
            print(
                f"Task {task_id} completed: score={score}, verdict={judgement.verdict}"
            )
            judgement_data = judgement.model_dump()

            # Save trace to run_data/ (skipped when no directory was given, so
            # a judged task is not turned into a failure by the None default).
            if run_data_dir is not None:
                run_data_dir.mkdir(parents=True, exist_ok=True)
                trace = {
                    "agent_task": agent_task,
                    "final_result": final_result,
                    "agent_steps": agent_steps,
                    "ground_truth": ground_truth,
                    "screenshots_b64": screenshots_b64,
                }
                metrics = {"steps": steps, "duration": duration, "cost": cost}
                (run_data_dir / f"{task_id}.json").write_text(
                    json.dumps(
                        {
                            "agent_trace": trace,
                            "metrics": metrics,
                            "judgement": judgement_data,
                        },
                        indent=2,
                    )
                )

            return {
                "task_id": task_id,
                "score": score,
                "steps": steps,
                "duration": duration,
                "cost": cost,
                "judgement": judgement_data,
            }
        except Exception as e:
            # Top-level boundary for one task: record the failure as a score-0
            # result so the remaining tasks in the gather() keep running.
            error_type = type(e).__name__
            error_msg = f"{error_type}: {e}"
            print(f"Task {task_id} failed: {error_msg}")
            return {
                "task_id": task_id,
                "score": 0,
                "steps": 0,
                "duration": 0,
                "cost": 0,
                "error": error_msg,
                "traceback": traceback.format_exc(),
            }
async def main():
    """Parse CLI options, run the selected tasks, and record a run summary.

    Tasks are run concurrently (bounded by MAX_CONCURRENT). Per-task traces go
    to a timestamped directory under run_data/; the aggregate summary is
    appended to the JSON list in results/<run_key>.json and printed.
    """
    parser = argparse.ArgumentParser(description="Run BU_Bench_V1 evaluation")
    parser.add_argument(
        "--browser",
        default="browser-use-cloud",
        choices=["browser-use-cloud"] + PROVIDERS,
        help="Browser provider (default: browser-use-cloud)",
    )
    parser.add_argument(
        "--tasks",
        type=int,
        default=None,
        help="Number of tasks to run (default: all)",
    )
    args = parser.parse_args()
    # Reject 0 and negatives up front: 0 used to be treated as "run all" and a
    # negative count silently sliced tasks off the end of the list.
    if args.tasks is not None and args.tasks < 1:
        parser.error("--tasks must be a positive integer")

    # Resolve browser provider (None = use native browser-use-cloud path)
    browser_name = args.browser
    if browser_name == "browser-use-cloud":
        browser_provider = None
    else:
        browser_provider = get_provider(browser_name)

    # Build run key and paths
    run_start = datetime.now().strftime("%Y%m%d_%H%M%S")
    run_key = f"{AGENT_FRAMEWORK_NAME}_{AGENT_FRAMEWORK_VERSION}_browser_{browser_name}_model_{MODEL_NAME}"
    run_data_dir = (
        Path(__file__).parent / "run_data" / f"{run_key}_start_at_{run_start}"
    )
    results_file = Path(__file__).parent / "results" / f"{run_key}.json"

    tasks = load_tasks()
    if args.tasks is not None:
        tasks = tasks[: args.tasks]

    sem = asyncio.Semaphore(MAX_CONCURRENT)
    results = await asyncio.gather(
        *[
            run_task(
                t, sem, browser_provider=browser_provider, run_data_dir=run_data_dir
            )
            for t in tasks
        ]
    )

    # Aggregate metrics
    successful = sum(1 for r in results if r.get("score") == 1)
    total_steps = sum(r.get("steps", 0) for r in results)
    total_duration = sum(r.get("duration", 0) for r in results)
    total_cost = sum(r.get("cost", 0) for r in results)

    # Save results (append to existing runs)
    results_file.parent.mkdir(parents=True, exist_ok=True)
    runs = json.loads(results_file.read_text()) if results_file.exists() else []
    runs.append(
        {
            "run_start": run_start,
            "tasks_completed": len(results),
            "tasks_successful": successful,
            "total_steps": total_steps,
            "total_duration": total_duration,
            "total_cost": total_cost,
        }
    )
    results_file.write_text(json.dumps(runs, indent=2))
    print(
        f"Run complete: {successful}/{len(results)} tasks successful, {total_steps} steps, {total_duration:.1f}s, ${total_cost:.2f}"
    )
# Script entry point: run the async main() to completion when this file is
# executed directly (not when it is imported).
if __name__ == "__main__":
    asyncio.run(main())