Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions agent_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ def tool_before_callback(self, tool_name, args, response): pass
def tool_after_callback(self, tool_name, args, response, ret):
    """No-op by default: accepts the tool name, its args, the response and the result, and returns None.

    NOTE(review): presumably an override point invoked once a tool has run — confirm against the caller.
    """
    return None
def turn_end_callback(self, response, tool_calls, tool_results, turn, next_prompt, exit_reason):
    """Return ``next_prompt`` untouched; every other argument is ignored by this default implementation."""
    return next_prompt
def dispatch(self, tool_name, args, response, index=0):
# Some Anthropic-compatible relays/models may emit an internal "thinking"
# pseudo-tool call. Treat it as a no-op instead of derailing the turn.
if tool_name == 'thinking':
yield "[Info] 忽略兼容层返回的伪工具调用: thinking\n"
return StepOutcome(None, next_prompt="已忽略无效工具 thinking,请继续按真实工具列表调用。", should_exit=False)
method_name = f"do_{tool_name}"
if hasattr(self, method_name):
args['_index'] = index
Expand Down
8 changes: 4 additions & 4 deletions assets/tools_schema.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
[
{"type": "function", "function": {
"name": "code_run",
"description": "Code executor. Prefer python. Multi-call OK, use script param. Reply code block is executed if no script arg; prefer for single call to avoid escaping. No hardcoding bulk data",
"description": "Code executor. NEVER call with empty arguments. Provide arguments.script, or place exactly one fenced code block immediately before the tool call. Default runtime cwd is ./temp; use cwd:'../' for the repo root/current project folder. Prefer file_read for inspecting existing files. No hardcoding bulk data",
"parameters": {"type": "object", "properties": {
"script": {"type": "string", "description": "[Mutually exclusive] NEVER use this param when use reply code block."},
"script": {"type": "string", "description": "Required unless the reply body contains exactly one fenced code block for this call."},
"type": {"type": "string", "enum": ["python", "powershell"], "description": "Code type", "default": "python"},
"timeout": {"type": "integer", "description": "in seconds", "default": 60},
"cwd": {"type": "string", "description": "Working directory, defaults to cwd"},
"cwd": {"type": "string", "description": "Working directory. Default is runtime scratch cwd ./temp; use ../ for the repo root/current project folder."},
"inline_eval": {"type": "boolean", "description": "DO NOT USE unless explicitly specified."}}}
}},
{"type": "function", "function": {
Expand Down Expand Up @@ -70,4 +70,4 @@
"description": "Start distilling long-term memory. Call when discovering info worth remembering (env facts/user prefs/lessons learned). Skip if memory already updated or in autonomous flow. Must call when a task that took 15+ turns is completed",
"parameters": {"type": "object", "properties": {}}}
}
]
]
60 changes: 45 additions & 15 deletions ga.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ def code_run(code, code_type="python", timeout=60, cwd=None, code_cwd=None, stop
yield f"[Action] Running {code_type} in {os.path.basename(cwd)}: {preview}\n"
script_dir = os.path.dirname(os.path.abspath(__file__))
cwd = cwd or os.path.join(script_dir, 'temp'); tmp_path = None
if code_type in ["python", "py"]:
if code_type == "python":
tmp_file = tempfile.NamedTemporaryFile(suffix=".ai.py", delete=False, mode='w', encoding='utf-8', dir=code_cwd)
cr_header = os.path.join(script_dir, 'assets', 'code_run_header.py')
if os.path.exists(cr_header): tmp_file.write(open(cr_header, encoding='utf-8').read())
tmp_file.write(code)
tmp_path = tmp_file.name
tmp_file.close()
cmd = [sys.executable, "-X", "utf8", "-u", tmp_path]
elif code_type in ["powershell", "bash", "sh", "shell", "ps1", "pwsh"]:
elif code_type in ["powershell", "bash"]:
if os.name == 'nt': cmd = ["powershell", "-NoProfile", "-NonInteractive", "-Command", code]
else: cmd = ["bash", "-c", code]
else:
Expand Down Expand Up @@ -110,10 +110,12 @@ def first_init_driver():
time.sleep(3)

def web_scan(tabs_only=False, switch_tab_id=None, text_only=False):
"""获取当前页面的简化HTML内容和标签页列表。注意:简化过程会过滤边栏、浮动元素等非主体内容。
"""
获取当前页面的简化HTML内容和标签页列表。注意:简化过程会过滤边栏、浮动元素等非主体内容。
tabs_only: 仅返回标签页列表,不获取HTML内容(节省token)。
switch_tab_id: 可选参数,如果提供,则在扫描前切换到该标签页。
应当多用execute_js,少全量观察html"""
应当多用execute_js,少全量观察html。
"""
global driver
try:
if driver is None: first_init_driver()
Expand Down Expand Up @@ -263,24 +265,41 @@ def __init__(self, parent, last_history=None, cwd='./temp'):
self.cwd = cwd; self.current_turn = 0
self.history_info = last_history if last_history else []
self.code_stop_signal = []
self._done_hooks = []

def _get_abs_path(self, path):
    """Resolve ``path`` against ``self.cwd`` and return the absolute result.

    A falsy ``path`` (``None`` or an empty string) yields an empty string
    instead of the working directory itself.
    """
    if path:
        joined = os.path.join(self.cwd, path)
        return os.path.abspath(joined)
    return ""

# Return the last fenced code block in response.content tagged as code_type, or None
# when no such block exists.
# The three canonical types are widened to an alternation of their aliases
# (python|py, powershell|ps1|pwsh, bash|sh|shell); any other type is regex-escaped
# and matched literally.
def _extract_code_block(self, response, code_type):
code_type = {'python':'python|py', 'powershell':'powershell|ps1|pwsh', 'bash':'bash|sh|shell'}.get(code_type, re.escape(code_type))
# DOTALL lets a block body span several lines; the lazy group stops at the first closing fence.
matches = re.findall(rf"```(?:{code_type})\n(.*?)\n```", response.content, re.DOTALL)
return matches[-1].strip() if matches else None
def _extract_code_block(self, response, code_type=None):
    """Find the fenced code block to execute in ``response.content``.

    Languages are tried in priority order: ``code_type`` first (when given),
    then ``python``, ``powershell`` and ``bash``. For each language the
    canonical tag and its aliases are matched together, case-insensitively,
    and the *last* matching block in the reply wins. If no tagged block is
    found, the last untagged block is used and attributed to the
    highest-priority language.

    Args:
        response: Object whose ``content`` attribute holds the reply text;
            a missing or ``None`` ``content`` is treated as empty.
        code_type: Preferred language tag, or ``None`` to infer it.

    Returns:
        ``(language, code)`` with ``code`` stripped of surrounding
        whitespace, or ``(None, None)`` when the reply has no usable block.
    """
    content = getattr(response, 'content', '') or ''
    candidates = []
    if code_type:
        candidates.append(str(code_type).lower())
    candidates.extend([t for t in ("python", "powershell", "bash") if t not in candidates])
    alias_map = {
        "python": ["py"],
        "powershell": ["ps1", "pwsh"],
        "bash": ["sh", "shell"],
        "javascript": ["js"],
    }
    for candidate in candidates:
        # Escape every tag (a caller-supplied type such as "c++" must be
        # matched literally) and search all aliases in a single alternation
        # so that the last block wins regardless of which alias tagged it.
        tags = "|".join(re.escape(tag) for tag in [candidate] + alias_map.get(candidate, []))
        matches = re.findall(rf"```(?:{tags})\n(.*?)\n```", content, re.DOTALL | re.IGNORECASE)
        if matches:
            return candidate, matches[-1].strip()
    # Untagged fallback: pair fences left to right so the closing fence of a
    # tagged block (e.g. ```json) is never mistaken for the opening fence of
    # an untagged one, then keep only blocks whose info string is empty.
    fenced = re.findall(r"```([^\n`]*)\n(.*?)\n```", content, re.DOTALL)
    untagged = [body for tag, body in fenced if not tag]
    if untagged:
        # ``candidates`` always holds at least the three default languages.
        return candidates[0], untagged[-1].strip()
    return None, None

def do_code_run(self, args, response):
'''执行代码片段,有长度限制,不允许代码中放大量数据,如有需要应当通过文件读取进行。'''
code_type = args.get("type", "python")
explicit_type = args.get("type")
code_type = str(explicit_type or "python").lower()
code = args.get("code") or args.get("script")
if not code:
code = self._extract_code_block(response, code_type)
if not code: return StepOutcome("[Error] Code missing. Must use reply code block or 'script' arg.", next_prompt="\n")
inferred_type, inferred_code = self._extract_code_block(response, code_type if explicit_type else None)
code_type, code = inferred_type or code_type, inferred_code
if not code:
return StepOutcome("[Error] Code missing. Must use reply code block or 'script' arg.", next_prompt="\n")
timeout = args.get("timeout", 60)
raw_path = os.path.join(self.cwd, args.get("cwd", './'))
cwd = os.path.normpath(os.path.abspath(raw_path))
Expand Down Expand Up @@ -309,7 +328,8 @@ def do_ask_user(self, args, response):
def do_web_scan(self, args, response):
'''获取当前页面内容和标签页列表。也可用于切换标签页。
注意:HTML经过简化,边栏/浮动元素等可能被过滤。如需查看被过滤的内容请用execute_js。
tabs_only=true时仅返回标签页列表,不获取HTML(省token)'''
tabs_only=true时仅返回标签页列表,不获取HTML(省token)。
'''
tabs_only = args.get("tabs_only", False)
switch_tab_id = args.get("switch_tab_id", None)
text_only = args.get("text_only", False)
Expand All @@ -322,7 +342,9 @@ def do_web_scan(self, args, response):

def do_web_execute_js(self, args, response):
'''web情况下的优先使用工具,执行任何js达成对浏览器的*完全*控制。支持将结果保存到文件供后续读取分析。'''
script = args.get("script", "") or self._extract_code_block(response, "javascript")
script = args.get("script", "")
if not script:
_, script = self._extract_code_block(response, "javascript")
if not script: return StepOutcome("[Error] Script missing. Use ```javascript block or 'script' arg.", next_prompt="\n")
abs_path = self._get_abs_path(script.strip())
if os.path.isfile(abs_path):
Expand Down Expand Up @@ -522,7 +544,7 @@ def turn_end_callback(self, response, tool_calls, tool_results, turn, next_promp
clean_args = {k: v for k, v in args.items() if not k.startswith('_')}
summary = f"调用工具{tool_name}, args: {clean_args}"
if tool_name == 'no_tool': summary = "直接回答了用户问题"
next_prompt += "\n[DANGER] 上一轮遗漏了<summary>,需要按协议在<summary>中输出极简单行摘要!"
next_prompt += "\n[DANGER] 上一轮遗漏了<summary>,已根据物理动作自动补全。在下次回复中记得<summary>协议。"
summary = smart_format(summary, max_str_len=100)
self.history_info.append(f'[Agent] {summary}')
if turn % 35 == 0 and 'plan' not in str(self.working.get('related_sop')):
Expand Down Expand Up @@ -550,8 +572,16 @@ def get_global_memory():
with open(os.path.join(script_dir, 'memory/global_mem_insight.txt'), 'r', encoding='utf-8', errors='replace') as f: insight = f.read()
with open(os.path.join(script_dir, f'assets/insight_fixed_structure{suffix}.txt'), 'r', encoding='utf-8') as f: structure = f.read()
prompt += f'cwd = {os.path.join(script_dir, "temp")} (./)\n'
prompt += f'project_root = {script_dir} (../)\n'
prompt += "Interpret user-facing 'current folder/current project/current repository' as project_root (../), unless the user explicitly asks for temp/scratch cwd.\n"
prompt += f"\n[Memory] (../memory)\n"
prompt += structure + '\n../memory/global_mem_insight.txt:\n'
prompt += insight + "\n"
# L2: 注入全局记忆
l2_path = os.path.join(script_dir, 'memory/global_mem.txt')
if os.path.exists(l2_path):
with open(l2_path, 'r', encoding='utf-8', errors='replace') as f: l2_content = f.read()
if l2_content.strip():
prompt += "\n../memory/global_mem.txt (L2环境事实):\n" + l2_content + "\n"
except FileNotFoundError: pass
return prompt
Loading