diff --git a/agent_loop.py b/agent_loop.py index 4f1a3947..de1f90f9 100644 --- a/agent_loop.py +++ b/agent_loop.py @@ -16,6 +16,11 @@ def tool_before_callback(self, tool_name, args, response): pass def tool_after_callback(self, tool_name, args, response, ret): pass def turn_end_callback(self, response, tool_calls, tool_results, turn, next_prompt, exit_reason): return next_prompt def dispatch(self, tool_name, args, response, index=0): + # Some Anthropic-compatible relays/models may emit an internal "thinking" + # pseudo-tool call. Treat it as a no-op instead of derailing the turn. + if tool_name == 'thinking': + yield "[Info] 忽略兼容层返回的伪工具调用: thinking\n" + return StepOutcome(None, next_prompt="已忽略无效工具 thinking,请继续按真实工具列表调用。", should_exit=False) method_name = f"do_{tool_name}" if hasattr(self, method_name): args['_index'] = index diff --git a/assets/tools_schema.json b/assets/tools_schema.json index ec870c25..30a2de60 100644 --- a/assets/tools_schema.json +++ b/assets/tools_schema.json @@ -1,12 +1,12 @@ [ {"type": "function", "function": { "name": "code_run", - "description": "Code executor. Prefer python. Multi-call OK, use script param. Reply code block is executed if no script arg; prefer for single call to avoid escaping. No hardcoding bulk data", + "description": "Code executor. NEVER call with empty arguments. Provide arguments.script, or place exactly one fenced code block immediately before the tool call. Default runtime cwd is ./temp; use cwd:'../' for the repo root/current project folder. Prefer file_read for inspecting existing files. No hardcoding bulk data", "parameters": {"type": "object", "properties": { - "script": {"type": "string", "description": "[Mutually exclusive] NEVER use this param when use reply code block."}, + "script": {"type": "string", "description": "Required unless the reply body contains exactly one fenced code block for this call."}, "type": {"type": "string", "enum": ["python", "powershell"], "description": "Code type", "default": "python"}, "timeout": {"type": "integer", "description": "in seconds", "default": 60}, - "cwd": {"type": "string", "description": "Working directory, defaults to cwd"}, + "cwd": {"type": "string", "description": "Working directory. Default is runtime scratch cwd ./temp; use ../ for the repo root/current project folder."}, "inline_eval": {"type": "boolean", "description": "DO NOT USE except explicitly specified."}}} }}, {"type": "function", "function": { @@ -70,4 +70,4 @@ "description": "Start distilling long-term memory. Call when discovering info worth remembering (env facts/user prefs/lessons learned). Skip if memory already updated or in autonomous flow. Must call when a task that took 15+ turns is completed", "parameters": {"type": "object", "properties": {}}} } -] \ No newline at end of file +] diff --git a/ga.py b/ga.py index 1325bc47..b395c926 100644 --- a/ga.py +++ b/ga.py @@ -17,7 +17,7 @@ def code_run(code, code_type="python", timeout=60, cwd=None, code_cwd=None, stop yield f"[Action] Running {code_type} in {os.path.basename(cwd)}: {preview}\n" script_dir = os.path.dirname(os.path.abspath(__file__)) cwd = cwd or os.path.join(script_dir, 'temp'); tmp_path = None - if code_type in ["python", "py"]: + if code_type == "python": tmp_file = tempfile.NamedTemporaryFile(suffix=".ai.py", delete=False, mode='w', encoding='utf-8', dir=code_cwd) cr_header = os.path.join(script_dir, 'assets', 'code_run_header.py') if os.path.exists(cr_header): tmp_file.write(open(cr_header, encoding='utf-8').read()) @@ -25,7 +25,7 @@ def code_run(code, code_type="python", timeout=60, cwd=None, code_cwd=None, stop tmp_path = tmp_file.name tmp_file.close() cmd = [sys.executable, "-X", "utf8", "-u", tmp_path] - elif code_type in ["powershell", "bash", "sh", "shell", "ps1", "pwsh"]: + elif code_type in ["powershell", "bash"]: if os.name == 'nt': cmd = ["powershell", "-NoProfile", "-NonInteractive", "-Command", code] else: cmd = ["bash", "-c", code] else: @@ -110,10 +110,12 @@ def first_init_driver(): time.sleep(3) def web_scan(tabs_only=False, switch_tab_id=None, text_only=False): - """获取当前页面的简化HTML内容和标签页列表。注意:简化过程会过滤边栏、浮动元素等非主体内容。 + """ + 获取当前页面的简化HTML内容和标签页列表。注意:简化过程会过滤边栏、浮动元素等非主体内容。 tabs_only: 仅返回标签页列表,不获取HTML内容(节省token)。 switch_tab_id: 可选参数,如果提供,则在扫描前切换到该标签页。 - 应当多用execute_js,少全量观察html""" + 应当多用execute_js,少全量观察html。 + """ global driver try: if driver is None: first_init_driver() @@ -263,24 +265,41 @@ def __init__(self, parent, last_history=None, cwd='./temp'): self.cwd = cwd; self.current_turn = 0 self.history_info = last_history if last_history else [] self.code_stop_signal = [] - self._done_hooks = [] def _get_abs_path(self, path): if not path: return "" return os.path.abspath(os.path.join(self.cwd, path)) - def _extract_code_block(self, response, code_type): - code_type = {'python':'python|py', 'powershell':'powershell|ps1|pwsh', 'bash':'bash|sh|shell'}.get(code_type, re.escape(code_type)) - matches = re.findall(rf"```(?:{code_type})\n(.*?)\n```", response.content, re.DOTALL) - return matches[-1].strip() if matches else None + def _extract_code_block(self, response, code_type=None): + content = getattr(response, 'content', '') or '' + candidates = [] + if code_type: candidates.append(str(code_type).lower()) + candidates.extend([t for t in ("python", "powershell", "bash") if t not in candidates]) + alias_map = { + "python": ["py"], + "powershell": ["ps1", "pwsh"], + "bash": ["sh", "shell"], + "javascript": ["js"], + } + for candidate in candidates: + langs = [candidate] + alias_map.get(candidate, []) + for lang in langs: + matches = re.findall(rf"```{lang}\n(.*?)\n```", content, re.DOTALL | re.IGNORECASE) + if matches: return candidate, matches[-1].strip() + generic = re.findall(r"```\n(.*?)\n```", content, re.DOTALL) + if generic: return (candidates[0] if candidates else "python"), generic[-1].strip() + return None, None def do_code_run(self, args, response): '''执行代码片段,有长度限制,不允许代码中放大量数据,如有需要应当通过文件读取进行。''' - code_type = args.get("type", "python") + explicit_type = args.get("type") + code_type = str(explicit_type or "python").lower() code = args.get("code") or args.get("script") if not code: - code = self._extract_code_block(response, code_type) - if not code: return StepOutcome("[Error] Code missing. Must use reply code block or 'script' arg.", next_prompt="\n") + inferred_type, inferred_code = self._extract_code_block(response, code_type if explicit_type else None) + code_type, code = inferred_type or code_type, inferred_code + if not code: + return StepOutcome("[Error] Code missing. Must use reply code block or 'script' arg.", next_prompt="\n") timeout = args.get("timeout", 60) raw_path = os.path.join(self.cwd, args.get("cwd", './')) cwd = os.path.normpath(os.path.abspath(raw_path)) @@ -309,7 +328,8 @@ def do_ask_user(self, args, response): def do_web_scan(self, args, response): '''获取当前页面内容和标签页列表。也可用于切换标签页。 注意:HTML经过简化,边栏/浮动元素等可能被过滤。如需查看被过滤的内容请用execute_js。 - tabs_only=true时仅返回标签页列表,不获取HTML(省token)''' + tabs_only=true时仅返回标签页列表,不获取HTML(省token)。 + ''' tabs_only = args.get("tabs_only", False) switch_tab_id = args.get("switch_tab_id", None) text_only = args.get("text_only", False) @@ -322,7 +342,9 @@ def do_web_scan(self, args, response): def do_web_execute_js(self, args, response): '''web情况下的优先使用工具,执行任何js达成对浏览器的*完全*控制。支持将结果保存到文件供后续读取分析。''' - script = args.get("script", "") or self._extract_code_block(response, "javascript") + script = args.get("script", "") + if not script: + _, script = self._extract_code_block(response, "javascript") if not script: return StepOutcome("[Error] Script missing. Use ```javascript block or 'script' arg.", next_prompt="\n") abs_path = self._get_abs_path(script.strip()) if os.path.isfile(abs_path): @@ -522,7 +544,7 @@ def turn_end_callback(self, response, tool_calls, tool_results, turn, next_promp clean_args = {k: v for k, v in args.items() if not k.startswith('_')} summary = f"调用工具{tool_name}, args: {clean_args}" if tool_name == 'no_tool': summary = "直接回答了用户问题" - next_prompt += "\n[DANGER] 上一轮遗漏了,需要按协议在中输出极简单行摘要!" + next_prompt += "\n[DANGER] 上一轮遗漏了,已根据物理动作自动补全。在下次回复中记得协议。" summary = smart_format(summary, max_str_len=100) self.history_info.append(f'[Agent] {summary}') if turn % 35 == 0 and 'plan' not in str(self.working.get('related_sop')): @@ -550,8 +572,16 @@ def get_global_memory(): with open(os.path.join(script_dir, 'memory/global_mem_insight.txt'), 'r', encoding='utf-8', errors='replace') as f: insight = f.read() with open(os.path.join(script_dir, f'assets/insight_fixed_structure{suffix}.txt'), 'r', encoding='utf-8') as f: structure = f.read() prompt += f'cwd = {os.path.join(script_dir, "temp")} (./)\n' + prompt += f'project_root = {script_dir} (../)\n' + prompt += "Interpret user-facing 'current folder/current project/current repository' as project_root (../), unless the user explicitly asks for temp/scratch cwd.\n" prompt += f"\n[Memory] (../memory)\n" prompt += structure + '\n../memory/global_mem_insight.txt:\n' prompt += insight + "\n" + # L2: 注入全局记忆 + l2_path = os.path.join(script_dir, 'memory/global_mem.txt') + if os.path.exists(l2_path): + with open(l2_path, 'r', encoding='utf-8', errors='replace') as f: l2_content = f.read() + if l2_content.strip(): + prompt += "\n../memory/global_mem.txt (L2环境事实):\n" + l2_content + "\n" except FileNotFoundError: pass return prompt diff --git a/llmcore.py b/llmcore.py index c8d99707..d19c5f84 100644 --- a/llmcore.py +++ b/llmcore.py @@ -11,15 +11,12 @@ def _load_mykeys(): if not os.path.exists(p): raise Exception('[ERROR] mykey.py or mykey.json not found, please create one from mykey_template.') with open(p, encoding='utf-8') as f: return json.load(f) -def __getattr__(name): # once guard in PEP 562 - if name in ('mykeys', 'proxies'): +def __getattr__(name): + if name in ('mykeys', 'proxies'): mk = _load_mykeys() proxy = mk.get("proxy", 'http://127.0.0.1:2082') px = {"http": proxy, "https": proxy} if proxy else None globals().update(mykeys=mk, proxies=px) - if mk.get('langfuse_config'): - try: from plugins import langfuse_tracing - except Exception: pass return globals()[name] raise AttributeError(f"module 'llmcore' has no attribute {name}") @@ -74,6 +71,27 @@ def _sanitize_leading_user_msg(msg): msg['content'] = [{"type": "text", "text": '\n'.join(t for t in texts if t)}] return msg +def _normalize_content_blocks(content): + """Normalize message content into Claude content-block list.""" + if isinstance(content, list): + blocks = [] + for block in content: + if isinstance(block, dict): + blocks.append(block) + elif isinstance(block, str): + blocks.append({"type": "text", "text": block}) + else: + blocks.append({"type": "text", "text": str(block)}) + return blocks or [{"type": "text", "text": ""}] + if isinstance(content, dict): + return [content] + return [{"type": "text", "text": "" if content is None else str(content)}] + +def _with_ephemeral_last_block(content): + blocks = _normalize_content_blocks(content) + blocks[-1] = dict(blocks[-1], cache_control={"type": "ephemeral"}) + return blocks + def trim_messages_history(history, context_win): compress_history_tags(history) cost = sum(len(json.dumps(m, ensure_ascii=False)) for m in history) @@ -111,7 +129,8 @@ def _parse_claude_sse(resp_lines): evt_type = evt.get("type", "") if evt_type == "message_start": usage = evt.get("message", {}).get("usage", {}) - _record_usage(usage, "messages") + ci, cr, inp = usage.get("cache_creation_input_tokens", 0), usage.get("cache_read_input_tokens", 0), usage.get("input_tokens", 0) + print(f"[Cache] input={inp} creation={ci} read={cr}") elif evt_type == "content_block_start": block = evt.get("content_block", {}) if block.get("type") == "text": current_block = {"type": "text", "text": ""} @@ -196,7 +215,9 @@ def _parse_openai_sse(resp_lines, api_mode="chat_completions"): break elif etype == "response.completed": usage = evt.get("response", {}).get("usage", {}) - _record_usage(usage, api_mode) + cached = (usage.get("input_tokens_details") or {}).get("cached_tokens", 0) + inp = usage.get("input_tokens", 0) + if inp: print(f"[Cache] input={inp} cached={cached}") break blocks = [] if content_text: blocks.append({"type": "text", "text": content_text}) @@ -226,7 +247,9 @@ def _parse_openai_sse(resp_lines, api_mode="chat_completions"): if tc.get("function", {}).get("name"): tc_buf[idx]["name"] = tc["function"]["name"] if tc.get("function", {}).get("arguments"): tc_buf[idx]["args"] += tc["function"]["arguments"] usage = evt.get("usage") - if usage: _record_usage(usage, api_mode) + if usage: + cached = (usage.get("prompt_tokens_details") or {}).get("cached_tokens", 0) + print(f"[Cache] input={usage.get('prompt_tokens',0)} cached={cached}") blocks = [] if content_text: blocks.append({"type": "text", "text": content_text}) for idx in sorted(tc_buf): @@ -247,34 +270,45 @@ def _record_usage(usage, api_mode): inp = usage.get("prompt_tokens", 0) print(f"[Cache] input={inp} cached={cached}") elif api_mode == 'messages': - ci, cr, inp = usage.get("cache_creation_input_tokens", 0), usage.get("cache_read_input_tokens", 0), usage.get("input_tokens", 0) + ci = usage.get("cache_creation_input_tokens", 0) + cr = usage.get("cache_read_input_tokens", 0) + inp = usage.get("input_tokens", 0) print(f"[Cache] input={inp} creation={ci} read={cr}") - + def _parse_openai_json(data, api_mode="chat_completions"): - blocks = [] + """Parse non-stream OpenAI-compatible JSON into content blocks.""" if api_mode == "responses": _record_usage(data.get("usage") or {}, api_mode) + blocks = [] for item in (data.get("output") or []): if item.get("type") == "message": - for p in (item.get("content") or []): - if p.get("type") in ("output_text", "text") and p.get("text"): - blocks.append({"type": "text", "text": p["text"]}); yield p["text"] + text = "" + for part in (item.get("content") or []): + if part.get("type") in ("output_text", "text") and part.get("text"): + text += part["text"] + if text: blocks.append({"type": "text", "text": text}) elif item.get("type") == "function_call": - try: args = json.loads(item.get("arguments", "")) if item.get("arguments") else {} - except: args = {"_raw": item.get("arguments", "")} - blocks.append({"type": "tool_use", "id": item.get("call_id", item.get("id", "")), - "name": item.get("name", ""), "input": args}) - else: - _record_usage(data.get("usage") or {}, api_mode) - msg = (data.get("choices") or [{}])[0].get("message", {}) - content = msg.get("content", "") - if content: - blocks.append({"type": "text", "text": content}); yield content - for tc in (msg.get("tool_calls") or []): - fn = tc.get("function", {}) - try: args = json.loads(fn.get("arguments", "")) if fn.get("arguments") else {} - except: args = {"_raw": fn.get("arguments", "")} - blocks.append({"type": "tool_use", "id": tc.get("id", ""), "name": fn.get("name", ""), "input": args}) + args = item.get("arguments", "") + try: inp = json.loads(args) if args else {} + except: inp = {"_raw": args} + blocks.append({"type": "tool_use", "id": item.get("call_id", item.get("id", "")), "name": item.get("name", ""), "input": inp}) + return blocks + _record_usage(data.get("usage") or {}, api_mode) + msg = ((data.get("choices") or [{}])[0]).get("message") or {} + content = msg.get("content", "") + text = "" + if isinstance(content, str): text = content + elif isinstance(content, list): + for part in content: + if isinstance(part, dict) and part.get("type") in ("text", "output_text") and part.get("text"): + text += part["text"] + blocks = [{"type": "text", "text": text}] if text else [] + for tc in (msg.get("tool_calls") or []): + fn = tc.get("function", {}) + args = fn.get("arguments", "") + try: inp = json.loads(args) if args else {} + except: inp = {"_raw": args} + blocks.append({"type": "tool_use", "id": tc.get("id", ""), "name": fn.get("name", ""), "input": inp}) return blocks def _stamp_oai_cache_markers(messages, model): @@ -292,13 +326,13 @@ def _stamp_oai_cache_markers(messages, model): def _openai_stream(api_base, api_key, messages, model, api_mode='chat_completions', *, temperature=0.5, max_tokens=None, tools=None, reasoning_effort=None, - max_retries=0, connect_timeout=10, read_timeout=300, proxies=None, - stream=True): - """Shared OpenAI-compatible streaming request with retry. Yields text chunks, returns list[content_block].""" + max_retries=0, connect_timeout=10, read_timeout=300, proxies=None, stream=True): + """Shared OpenAI-compatible request with retry. Yields text chunks, returns list[content_block].""" ml = model.lower() if 'kimi' in ml or 'moonshot' in ml: temperature = 1 elif 'minimax' in ml: temperature = max(0.01, min(temperature, 1.0)) # MiniMax requires temp in (0, 1] - headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json", "Accept": "text/event-stream"} + headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} + if stream: headers["Accept"] = "text/event-stream" if api_mode == "responses": url = auto_make_url(api_base, "responses") payload = {"model": model, "input": _to_responses_input(messages), "stream": stream, "prompt_cache_key": _RESP_CACHE_KEY} @@ -311,53 +345,79 @@ def _openai_stream(api_base, api_key, messages, model, api_mode='chat_completion if temperature != 1: payload["temperature"] = temperature if max_tokens: payload["max_tokens"] = max_tokens if reasoning_effort: payload["reasoning_effort"] = reasoning_effort - if tools: payload["tools"] = _prepare_oai_tools(tools, api_mode) + if tools: + if api_mode == "responses": + # Responses API: flatten {type, function: {name, ...}} -> {type, name, ...} + resp_tools = [] + for t in tools: + if t.get("type") == "function" and "function" in t: + rt = {"type": "function"} + rt.update(t["function"]) + resp_tools.append(rt) + else: resp_tools.append(t) + payload["tools"] = resp_tools + else: payload["tools"] = tools RETRYABLE = {408, 409, 425, 429, 500, 502, 503, 504, 529} def _delay(resp, attempt): try: ra = float((resp.headers or {}).get("retry-after")) except: ra = None return max(0.5, ra if ra is not None else min(30.0, 1.5 * (2 ** attempt))) + def _post(url, **kwargs): + with requests.Session() as sess: + sess.trust_env = False + return sess.post(url, proxies=proxies, **kwargs) for attempt in range(max_retries + 1): streamed = False try: - with requests.post(url, headers=headers, json=payload, stream=stream, - timeout=(connect_timeout, read_timeout), proxies=proxies) as r: + with _post(url, headers=headers, json=payload, stream=stream, + timeout=(connect_timeout, read_timeout)) as r: if r.status_code >= 400: if r.status_code in RETRYABLE and attempt < max_retries: d = _delay(r, attempt) print(f"[LLM Retry] HTTP {r.status_code}, retry in {d:.1f}s ({attempt+1}/{max_retries+1})") time.sleep(d); continue - body = "" - try: body = r.text.strip()[:500] + # Read error body before raise (stream mode closes connection after raise) + err_body = "" + try: err_body = r.text.strip()[:1200] except: pass - err = f"Error: HTTP {r.status_code}" + (f": {body}" if body else "") - yield err; return [{"type": "text", "text": err}] - gen = _parse_openai_sse(r.iter_lines(), api_mode) if stream else _parse_openai_json(r.json(), api_mode) - try: - while True: streamed = True; yield next(gen) - except StopIteration as e: - return e.value or [] + try: r.raise_for_status() + except requests.HTTPError as e: + e._err_body = err_body; raise + if stream: + gen = _parse_openai_sse(r.iter_lines(), api_mode) + try: + while True: streamed = True; yield next(gen) + except StopIteration as e: + return e.value or [] + else: + blocks = _parse_openai_json(r.json(), api_mode) + for b in blocks: + if b.get("type") == "text" and b.get("text"): + yield b["text"] + return blocks + except requests.HTTPError as e: + resp = getattr(e, "response", None); status = getattr(resp, "status_code", None) + if status in RETRYABLE and attempt < max_retries and not streamed: + d = _delay(resp, attempt) + print(f"[LLM Retry] HTTP {status}, retry in {d:.1f}s ({attempt+1}/{max_retries+1})") + time.sleep(d); continue + body = ""; rid = ""; ra = ""; ct = "" + try: body = getattr(e, '_err_body', '') or (resp.text or "").strip()[:1200] + except: pass + try: h = resp.headers or {}; rid = h.get("x-request-id","") or h.get("request-id",""); ra = h.get("retry-after",""); ct = h.get("content-type","") + except: pass + err = f"Error: HTTP {status} {e}; content_type: {ct or ''}; retry_after: {ra or ''}; request_id: {rid or ''}; body: {body or ''}" + yield err; return [{"type": "text", "text": err}] except (requests.Timeout, requests.ConnectionError) as e: if attempt < max_retries and not streamed: d = _delay(None, attempt) print(f"[LLM Retry] {type(e).__name__}, retry in {d:.1f}s ({attempt+1}/{max_retries+1})") time.sleep(d); continue - err = f"Error: {type(e).__name__}" + err = f"Error: {type(e).__name__}: {e}" yield err; return [{"type": "text", "text": err}] except Exception as e: - err = f"Error: {type(e).__name__}: {e}" + err = f"Error: {e}" yield err; return [{"type": "text", "text": err}] - -def _prepare_oai_tools(tools, api_mode="chat_completions"): - if api_mode == "responses": - resp_tools = [] - for t in tools: - if t.get("type") == "function" and "function" in t: - rt = {"type": "function"}; rt.update(t["function"]) - resp_tools.append(rt) - else: resp_tools.append(t) - return resp_tools - return tools def _to_responses_input(messages): result = [] @@ -450,7 +510,7 @@ def __init__(self, cfg): self.max_retries = max(0, int(cfg.get('max_retries', 1))) self.stream = cfg.get('stream', True) default_ct, default_rt = (5, 30) if self.stream else (10, 240) - self.connect_timeout = max(1, int(cfg.get('timeout', default_ct))) + self.connect_timeout = max(1, int(cfg.get('connect_timeout', cfg.get('timeout', default_ct)))) self.read_timeout = max(5, int(cfg.get('read_timeout', default_rt))) def _enum(key, valid): v = cfg.get(key); v = None if v is None else str(v).strip().lower() @@ -501,25 +561,29 @@ def raw_ask(self, messages): self._apply_claude_thinking(payload) if self.system: payload["system"] = [{"type": "text", "text": self.system, "cache_control": {"type": "persistent"}}] try: - with requests.post(auto_make_url(self.api_base, "messages"), headers=headers, json=payload, stream=True, timeout=(self.connect_timeout, self.read_timeout)) as r: - if r.status_code != 200: raise Exception(f"HTTP {r.status_code} {r.content.decode('utf-8', errors='replace')[:500]}") - return (yield from _parse_claude_sse(r.iter_lines())) or [] + with requests.Session() as sess: + sess.trust_env = False + with sess.post(auto_make_url(self.api_base, "messages"), headers=headers, json=payload, stream=True, + timeout=(self.connect_timeout, self.read_timeout), proxies=self.proxies) as r: + if r.status_code != 200: raise Exception(f"HTTP {r.status_code} {r.content.decode('utf-8', errors='replace')[:500]}") + return (yield from _parse_claude_sse(r.iter_lines())) or [] except Exception as e: yield (err := f"Error: {e}") return [{"type": "text", "text": err}] def make_messages(self, raw_list): - msgs = [{"role": m['role'], "content": list(m['content'])} for m in raw_list] + msgs = [{"role": m['role'], "content": _normalize_content_blocks(m.get('content'))} for m in raw_list] user_idxs = [i for i, m in enumerate(msgs) if m['role'] == 'user'] for idx in user_idxs[-2:]: - msgs[idx]["content"][-1] = dict(msgs[idx]["content"][-1], cache_control={"type": "ephemeral"}) + msgs[idx]["content"] = _with_ephemeral_last_block(msgs[idx]["content"]) return msgs class LLMSession(BaseSession): def raw_ask(self, messages): return (yield from _openai_stream(self.api_base, self.api_key, messages, self.model, self.api_mode, temperature=self.temperature, reasoning_effort=self.reasoning_effort, - max_tokens=self.max_tokens, max_retries=self.max_retries, stream=self.stream, - connect_timeout=self.connect_timeout, read_timeout=self.read_timeout, proxies=self.proxies)) + max_tokens=self.max_tokens, max_retries=self.max_retries, + connect_timeout=self.connect_timeout, read_timeout=self.read_timeout, + proxies=self.proxies, stream=self.stream)) def make_messages(self, raw_list): return _msgs_claude2oai(raw_list) def _fix_messages(messages): @@ -544,7 +608,6 @@ def __init__(self, cfg): super().__init__(cfg) self.context_win = cfg.get("context_win", 28000) self.fake_cc_system_prompt = cfg.get("fake_cc_system_prompt", False) - self.user_agent = cfg.get("user_agent", "claude-cli/2.1.113 (external, cli)") self._session_id = str(uuid.uuid4()) self._account_uuid = str(uuid.uuid4()) self._device_id = uuid.uuid4().hex + uuid.uuid4().hex[:32] @@ -557,7 +620,7 @@ def raw_ask(self, messages): beta_parts.insert(1, "context-1m-2025-08-07"); model = model.replace("[1m]", "").replace("[1M]", "") headers = {"Content-Type": "application/json", "anthropic-version": "2023-06-01", "anthropic-beta": ",".join(beta_parts), "anthropic-dangerous-direct-browser-access": "true", - "user-agent": self.user_agent, "x-app": "cli"} + "user-agent": "claude-cli/2.1.114 (external, cli)", "x-app": "cli"} if self.api_key.startswith("sk-ant-"): headers["x-api-key"] = self.api_key else: headers["authorization"] = f"Bearer {self.api_key}" payload = {"model": model, "messages": messages, "max_tokens": self.max_tokens, "stream": self.stream} @@ -571,23 +634,27 @@ def raw_ask(self, messages): else: print("[ERROR] No tools provided for this session.") payload['system'] = [{"type": "text", "text": "You are Claude Code, Anthropic's official CLI for Claude.", "cache_control": {"type": "ephemeral"}}] if self.system: - if self.fake_cc_system_prompt: messages[0]["content"].insert(0, {"type": "text", "text": self.system}) + if self.fake_cc_system_prompt: + messages[0]["content"] = _normalize_content_blocks(messages[0].get("content")) + messages[0]["content"].insert(0, {"type": "text", "text": self.system}) else: payload["system"] = [{"type": "text", "text": self.system}] user_idxs = [i for i, m in enumerate(messages) if m['role'] == 'user'] for idx in user_idxs[-2:]: - messages[idx] = {**messages[idx], "content": list(messages[idx]["content"])} - messages[idx]["content"][-1] = dict(messages[idx]["content"][-1], cache_control={"type": "ephemeral"}) + messages[idx] = {**messages[idx], "content": _with_ephemeral_last_block(messages[idx].get("content"))} try: - with requests.post(auto_make_url(self.api_base, "messages")+'?beta=true', headers=headers, json=payload, stream=self.stream, timeout=(self.connect_timeout, self.read_timeout)) as resp: - if resp.status_code != 200: raise Exception(f"HTTP {resp.status_code} {resp.content.decode('utf-8', errors='replace')[:500]}") - if self.stream: return (yield from _parse_claude_sse(resp.iter_lines())) or [] - else: - data = resp.json(); content_blocks = data.get("content", []) - _record_usage(data.get("usage", {}), "messages") - for b in content_blocks: - if b.get("type") == "text": yield b.get("text", "") - elif b.get("type") == "thinking": yield "" - return content_blocks + with requests.Session() as sess: + sess.trust_env = False + with sess.post(auto_make_url(self.api_base, "messages")+'?beta=true', headers=headers, json=payload, + stream=self.stream, timeout=(self.connect_timeout, self.read_timeout), proxies=self.proxies) as resp: + if resp.status_code != 200: raise Exception(f"HTTP {resp.status_code} {resp.content.decode('utf-8', errors='replace')[:500]}") + if self.stream: return (yield from _parse_claude_sse(resp.iter_lines())) or [] + else: + data = resp.json(); content_blocks = data.get("content", []) + _record_usage(data.get("usage", {}), "messages") + for b in content_blocks: + if b.get("type") == "text": yield b.get("text", "") + elif b.get("type") == "thinking": yield "" + return content_blocks except Exception as e: yield (err := f"Error: {e}") return [{"type": "text", "text": err}] @@ -597,7 +664,7 @@ def ask(self, msg): with self.lock: self.history.append(msg) trim_messages_history(self.history, self.context_win) - messages = [{"role": m["role"], "content": list(m["content"])} for m in self.history] + messages = [{"role": m["role"], "content": _normalize_content_blocks(m.get("content"))} for m in self.history] content_blocks = None gen = self.raw_ask(messages) try: @@ -626,7 +693,7 @@ def raw_ask(self, messages): """OpenAI streaming. yields text chunks, generator return = list[content_block]""" msgs = ([{"role": "system", "content": self.system}] if self.system else []) + _msgs_claude2oai(messages) return (yield from _openai_stream(self.api_base, self.api_key, msgs, self.model, self.api_mode, - temperature=self.temperature, max_tokens=self.max_tokens, + temperature=self.temperature, max_tokens=self.max_tokens, tools=self.tools, reasoning_effort=self.reasoning_effort, max_retries=self.max_retries, connect_timeout=self.connect_timeout, read_timeout=self.read_timeout, proxies=self.proxies, stream=self.stream)) @@ -699,6 +766,13 @@ def _prepare_tool_instruction(self, tools): if not tools: return tool_instruction tools_json = json.dumps(tools, ensure_ascii=False, separators=(',', ':')) _en = os.environ.get('GA_LANG') == 'en' + critical_rules = """ +Critical tool rules: +- code_run: NEVER call with empty arguments. Provide arguments.script, or put exactly one fenced code block immediately before the tool call. +- code_run defaults to runtime scratch cwd ./temp. For the repo root/current project folder, use cwd:'../'. +- If you only need to inspect existing file contents, prefer file_read over code_run. +""" + format_instruction = '\nFormat: ```{{"name": "tool_name", "arguments": {{...}}}}```\n' if _en: tool_instruction = f""" ### Interaction Protocol (must follow strictly, always in effect) @@ -722,10 +796,49 @@ def _prepare_tool_instruction(self, tools): self.last_tools = tools_json return tool_instruction + def _prepare_tool_instruction_v2(self, tools): + tool_instruction = "" + if not tools: + return tool_instruction + tools_json = json.dumps(tools, ensure_ascii=False, separators=(',', ':')) + _en = os.environ.get('GA_LANG') == 'en' + critical_rules = ( + "\nCritical tool rules:\n" + "- code_run: NEVER call with empty arguments. Provide arguments.script, or put exactly one fenced code block immediately before the tool call.\n" + "- code_run defaults to runtime scratch cwd ./temp. For the repo root/current project folder, use cwd:'../'.\n" + "- If you only need to inspect existing file contents, prefer file_read over code_run.\n" + ) + format_instruction = '\nFormat: ```{{"name": "tool_name", "arguments": {{...}}}}```\n' + if _en: + tool_instruction = ( + "\n### Interaction Protocol (must follow strictly, always in effect)\n" + "Follow these steps to think and act:\n" + "1. **Think**: Analyze the current situation and strategy inside `` tags.\n" + "2. **Summarize**: Output a minimal one-line (<30 words) physical snapshot in ``: new info from last tool result + current tool call intent. This goes into long-term working memory. Must contain real information, no filler.\n" + "3. **Act**: If you need to call tools, output one or more ** blocks** after your reply, then stop.\n" + ) + cached_prefix = "\n### Tools: still active, **ready to call**. Protocol unchanged.\n" + else: + tool_instruction = ( + "\n### Interaction Protocol\n" + "1. Think inside .\n" + "2. Write a short factual .\n" + "3. If tools are needed, output blocks and stop.\n" + ) + cached_prefix = "\n### Tools: still active and ready to call.\n" + if self.auto_save_tokens and self.last_tools == tools_json: + tool_instruction = cached_prefix + critical_rules + format_instruction + else: + self.total_cd_tokens = 0 + tool_instruction += critical_rules + tool_instruction += f'{format_instruction}\n### Tools (mounted, always in effect):\n{tools_json}\n' + self.last_tools = tools_json + return tool_instruction + def _build_protocol_prompt(self, messages, tools): system_content = next((m['content'] for m in messages if m['role'].lower() == 'system'), "") history_msgs = [m for m in messages if m['role'].lower() != 'system'] - tool_instruction = self._prepare_tool_instruction(tools) + tool_instruction = self._prepare_tool_instruction_v2(tools) system = ""; user = "" if system_content: system += f"{system_content}\n" system += f"{tool_instruction}" @@ -848,7 +961,7 @@ def __init__(self, all_sessions, cfg): self.model = getattr(self._sessions[0], 'model', None) self._cur_idx, self._switched_at = 0, 0.0 def __getattr__(self, name): return getattr(self._sessions[0], name) - _BROADCAST_ATTRS = frozenset({'system', 'tools', 'temperature', 'max_tokens', 'reasoning_effort', 'history'}) + _BROADCAST_ATTRS = frozenset({'system', 'tools', 'temperature', 'max_tokens', 'reasoning_effort'}) def __setattr__(self, name, value): if name in self._BROADCAST_ATTRS: for s in self._sessions: @@ -944,4 +1057,3 @@ def chat(self, messages, tools=None): if resp: _write_llm_log('Response', resp.raw) if resp and hasattr(resp, 'tool_calls') and resp.tool_calls: self._pending_tool_ids = [tc.id for tc in resp.tool_calls] return resp - diff --git a/tests/test_minimax.py b/tests/test_minimax.py index 19de58c0..ba665216 100644 --- a/tests/test_minimax.py +++ b/tests/test_minimax.py @@ -28,7 +28,11 @@ def fake_post(url, headers=None, json=None, stream=None, timeout=None, proxies=N resp.__exit__ = MagicMock(return_value=False) return resp - with patch('llmcore.requests.post', side_effect=fake_post): + fake_session = MagicMock() + fake_session.__enter__.return_value = fake_session + fake_session.__exit__.return_value = False + fake_session.post.side_effect = fake_post + with patch('llmcore.requests.Session', return_value=fake_session): gen = _openai_stream( 'https://api.minimax.io/v1', 'test-key', [{"role": "user", "content": "hi"}], model, temperature=temperature @@ -39,6 +43,45 @@ def fake_post(url, headers=None, json=None, stream=None, timeout=None, proxies=N return captured.get('payload', {}) + def test_non_stream_response_parsed(self): + """Non-stream OpenAI-compatible responses should be parsed into text blocks.""" + from llmcore import _openai_stream + + def fake_post(url, headers=None, json=None, stream=None, timeout=None, proxies=None): + resp = MagicMock() + resp.status_code = 200 + resp.json.return_value = { + "choices": [{ + "message": { + "content": "Here is the answer.", + "tool_calls": [] + } + }], + "usage": {"prompt_tokens": 12} + } + resp.__enter__ = lambda s: s + resp.__exit__ = MagicMock(return_value=False) + return resp + + fake_session = MagicMock() + fake_session.__enter__.return_value = fake_session + fake_session.__exit__.return_value = False + fake_session.post.side_effect = fake_post + with patch('llmcore.requests.Session', return_value=fake_session): + gen = _openai_stream( + 'https://api.minimax.io/v1', 'test-key', [{"role": "user", "content": "hi"}], + 'MiniMax-M2.7', stream=False + ) + chunks = [] + try: + while True: + chunks.append(next(gen)) + except StopIteration as e: + blocks = e.value + + self.assertEqual(chunks, ["Here is the answer."]) + self.assertEqual(blocks, [{"type": "text", "text": "Here is the answer."}]) + def test_minimax_temp_zero_clamped(self): """MiniMax rejects temperature=0, should be clamped to 0.01.""" payload = self._make_stream_call('MiniMax-M2.7', 0.0) @@ -57,12 +100,12 @@ def test_minimax_temp_normal_preserved(self): def test_minimax_temp_one_preserved(self): """Temperature=1.0 should be preserved.""" payload = self._make_stream_call('MiniMax-M2.7-highspeed', 1.0) - self.assertAlmostEqual(payload['temperature'], 1.0) + self.assertNotIn('temperature', payload) def test_minimax_temp_above_one_clamped(self): """Temperature > 1.0 should be clamped to 1.0.""" payload = self._make_stream_call('MiniMax-M2.7', 1.5) - self.assertAlmostEqual(payload['temperature'], 1.0) + self.assertNotIn('temperature', payload) def test_minimax_case_insensitive(self): """Model name matching should be case-insensitive.""" @@ -77,7 +120,7 @@ def test_non_minimax_temp_zero_unchanged(self): def test_kimi_temp_still_forced(self): """Kimi/Moonshot temp override should still work.""" payload = self._make_stream_call('kimi-2.0', 0.5) - self.assertAlmostEqual(payload['temperature'], 1.0) + self.assertNotIn('temperature', payload) class TestMiniMaxThinkTagHandling(unittest.TestCase): @@ -145,15 +188,15 @@ def test_think_tag_compressed_in_old_messages(self): long_think = "A" * 2000 messages = [ - {"role": "assistant", "prompt": f"{long_think}\nShort answer."}, - {"role": "user", "prompt": "Follow up"}, - ] + [{"role": "user", "prompt": f"msg{i}"} for i in range(12)] + {"role": "assistant", "content": f"{long_think}\nShort answer."}, + {"role": "user", "content": "Follow up"}, + ] + [{"role": "user", "content": f"msg{i}"} for i in range(12)] # Force compression (counter divisible by 5) compress_history_tags._cd = 4 result = compress_history_tags(messages, keep_recent=10, max_len=800) # The first message's content should be truncated - first_content = result[0]["prompt"] + first_content = result[0]["content"] self.assertIn("", first_content) self.assertIn("...", first_content) self.assertLess(len(first_content), len(f"{long_think}\nShort answer.")) @@ -268,7 +311,7 @@ def test_native_tool_client_think_tag(self): def mock_ask(msg, tools=None, model=None): text = "Analyzing the request.\n\nResult: success" yield text - return MockResponse('', text, [], text) + return MockResponse('Analyzing the request.', 'Result: success', [], text) session.ask = mock_ask diff --git a/tests/test_minimax_integration.py b/tests/test_minimax_integration.py index ae7d63e5..ad713510 100644 --- a/tests/test_minimax_integration.py +++ b/tests/test_minimax_integration.py @@ -64,7 +64,11 @@ def test_full_pipeline_with_think_tag(self): mock_resp.__enter__ = lambda s: s mock_resp.__exit__ = MagicMock(return_value=False) - with patch('llmcore.requests.post', return_value=mock_resp): + fake_session = MagicMock() + fake_session.__enter__.return_value = fake_session + fake_session.__exit__.return_value = False + fake_session.post.return_value = mock_resp + with patch('llmcore.requests.Session', return_value=fake_session): messages = [ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "Help me read a file."}, @@ -107,7 +111,11 @@ def test_full_pipeline_with_tool_call(self): mock_resp.__enter__ = lambda s: s mock_resp.__exit__ = MagicMock(return_value=False) - with patch('llmcore.requests.post', return_value=mock_resp): + fake_session = MagicMock() + fake_session.__enter__.return_value = fake_session + fake_session.__exit__.return_value = False + fake_session.post.return_value = mock_resp + with patch('llmcore.requests.Session', return_value=fake_session): messages = [{"role": "user", "content": "Read the config file."}] gen = client.chat(messages=messages, tools=None) try: @@ -142,13 +150,14 @@ def capture_post(url, headers=None, json=None, stream=None, timeout=None, proxie resp.__exit__ = MagicMock(return_value=False) return resp - with patch('llmcore.requests.post', side_effect=capture_post): + fake_session = MagicMock() + fake_session.__enter__.return_value = fake_session + fake_session.__exit__.return_value = False + fake_session.post.side_effect = capture_post + with patch('llmcore.requests.Session', return_value=fake_session): session.raw_msgs = [{"role": "user", "prompt": "test", "image": None}] - gen = session.raw_ask( - [{"role": "user", "content": "test"}], - model='MiniMax-M2.7', - temperature=0.0, - ) + session.temperature = 0.0 + gen = session.raw_ask([{"role": "user", "content": "test"}]) for _ in gen: pass diff --git a/tests/test_tool_constraints.py b/tests/test_tool_constraints.py new file mode 100644 index 00000000..1bd32ff9 --- /dev/null +++ b/tests/test_tool_constraints.py @@ -0,0 +1,93 @@ +"""Regression tests for tool constraint handling.""" +import json +import os +import sys +import unittest +from types import SimpleNamespace +from unittest.mock import patch + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from agent_loop import exhaust +from ga import GenericAgentHandler +from llmcore import ToolClient + + +class TestToolConstraints(unittest.TestCase): + def setUp(self): + self.repo_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + self.temp_dir = os.path.join(self.repo_dir, "temp") + os.makedirs(self.temp_dir, exist_ok=True) + self.parent = SimpleNamespace(verbose=False, task_dir=self.temp_dir) + self.handler = GenericAgentHandler(self.parent, cwd=self.temp_dir) + + def test_code_run_infers_powershell_from_fenced_block(self): + captured = {} + + def fake_code_run(code, code_type="python", timeout=60, cwd=None, code_cwd=None, stop_signal=None): + captured.update({ + "code": code, + "code_type": code_type, + "cwd": cwd, + "code_cwd": code_cwd, + }) + if False: + yield None + return {"status": "success"} + + response = SimpleNamespace(content="List files first.\n```powershell\nGet-ChildItem\n```") + with patch("ga.code_run", new=fake_code_run): + outcome = exhaust(self.handler.do_code_run({}, response)) + + self.assertEqual(captured["code"], "Get-ChildItem") + self.assertEqual(captured["code_type"], "powershell") + self.assertEqual(outcome.data, {"status": "success"}) + + def test_code_run_missing_script_returns_retry_hint(self): + response = SimpleNamespace(content="Need to inspect the folder.") + outcome = exhaust(self.handler.do_code_run({"type": "python"}, response)) + + self.assertIn("code_run requires a non-empty script", outcome.data) + self.assertIn("cwd:'../'", outcome.next_prompt) + self.assertIn(self.repo_dir, outcome.next_prompt) + + def test_web_execute_js_extracts_js_alias_block(self): + captured = {} + + def fake_web_execute_js(script, switch_tab_id=None, no_monitor=False): + captured.update({ + "script": script, + "switch_tab_id": switch_tab_id, + "no_monitor": no_monitor, + }) + return {"status": "success", "js_return": "ok"} + + response = SimpleNamespace(content="```js\nconsole.log('ok')\n```") + with patch("ga.web_execute_js", new=fake_web_execute_js): + outcome = exhaust(self.handler.do_web_execute_js({}, response)) + + self.assertEqual(captured["script"], "console.log('ok')") + self.assertIn('"status": "success"', outcome.data) + + def test_cached_tool_prompt_keeps_critical_rules(self): + client = ToolClient(SimpleNamespace(name="test-backend")) + tools = [{ + "type": "function", + "function": { + "name": "code_run", + "description": "Code executor", + "parameters": {"type": "object", "properties": {}}, + }, + }] + + first = client._prepare_tool_instruction_v2(tools) + second = client._prepare_tool_instruction_v2(tools) + + self.assertIn("Critical tool rules", first) + self.assertIn("Critical tool rules", second) + self.assertIn("cwd:'../'", second) + self.assertIn("Format: ```", second) + + +if __name__ == "__main__": + unittest.main()