From c0f538dd3d5882dbe78b3fe9f341b2c57cdf8d95 Mon Sep 17 00:00:00 2001
From: BIN LI <1457956056@qq.com>
Date: Sun, 19 Apr 2026 16:47:03 +0800
Subject: [PATCH] Fix unstable tool-call handling: ignore pseudo 'thinking' tool, harden code_run script extraction and retry hints, add non-stream OpenAI-compatible parsing, and bypass env proxies via Session(trust_env=False)
---
agent_loop.py | 5 +
assets/tools_schema.json | 8 +-
ga.py | 49 +++++++--
llmcore.py | 169 ++++++++++++++++++++++++------
tests/test_minimax.py | 61 +++++++++--
tests/test_minimax_integration.py | 25 +++--
tests/test_tool_constraints.py | 93 ++++++++++++++++
7 files changed, 351 insertions(+), 59 deletions(-)
create mode 100644 tests/test_tool_constraints.py
diff --git a/agent_loop.py b/agent_loop.py
index 6a77a0fd..e31cca3c 100644
--- a/agent_loop.py
+++ b/agent_loop.py
@@ -16,6 +16,11 @@ def tool_before_callback(self, tool_name, args, response): pass
def tool_after_callback(self, tool_name, args, response, ret): pass
def turn_end_callback(self, response, tool_calls, tool_results, turn, next_prompt, exit_reason): return next_prompt
def dispatch(self, tool_name, args, response, index=0):
+ # Some Anthropic-compatible relays/models may emit an internal "thinking"
+ # pseudo-tool call. Treat it as a no-op instead of derailing the turn.
+ if tool_name == 'thinking':
+ yield "[Info] 忽略兼容层返回的伪工具调用: thinking\n"
+ return StepOutcome(None, next_prompt="已忽略无效工具 thinking,请继续按真实工具列表调用。", should_exit=False)
method_name = f"do_{tool_name}"
if hasattr(self, method_name):
args['_index'] = index
diff --git a/assets/tools_schema.json b/assets/tools_schema.json
index ec870c25..30a2de60 100644
--- a/assets/tools_schema.json
+++ b/assets/tools_schema.json
@@ -1,12 +1,12 @@
[
{"type": "function", "function": {
"name": "code_run",
- "description": "Code executor. Prefer python. Multi-call OK, use script param. Reply code block is executed if no script arg; prefer for single call to avoid escaping. No hardcoding bulk data",
+ "description": "Code executor. NEVER call with empty arguments. Provide arguments.script, or place exactly one fenced code block immediately before the tool call. Default runtime cwd is ./temp; use cwd:'../' for the repo root/current project folder. Prefer file_read for inspecting existing files. No hardcoding bulk data",
"parameters": {"type": "object", "properties": {
- "script": {"type": "string", "description": "[Mutually exclusive] NEVER use this param when use reply code block."},
+ "script": {"type": "string", "description": "Required unless the reply body contains exactly one fenced code block for this call."},
"type": {"type": "string", "enum": ["python", "powershell"], "description": "Code type", "default": "python"},
"timeout": {"type": "integer", "description": "in seconds", "default": 60},
- "cwd": {"type": "string", "description": "Working directory, defaults to cwd"},
+ "cwd": {"type": "string", "description": "Working directory. Default is runtime scratch cwd ./temp; use ../ for the repo root/current project folder."},
"inline_eval": {"type": "boolean", "description": "DO NOT USE except explicitly specified."}}}
}},
{"type": "function", "function": {
@@ -70,4 +70,4 @@
"description": "Start distilling long-term memory. Call when discovering info worth remembering (env facts/user prefs/lessons learned). Skip if memory already updated or in autonomous flow. Must call when a task that took 15+ turns is completed",
"parameters": {"type": "object", "properties": {}}}
}
-]
\ No newline at end of file
+]
diff --git a/ga.py b/ga.py
index 1ad9836e..db531809 100644
--- a/ga.py
+++ b/ga.py
@@ -270,17 +270,48 @@ def _get_abs_path(self, path):
if not path: return ""
return os.path.abspath(os.path.join(self.cwd, path))
- def _extract_code_block(self, response, code_type):
- matches = re.findall(rf"```{code_type}\n(.*?)\n```", response.content, re.DOTALL)
- return matches[-1].strip() if matches else None
+ def _extract_code_block(self, response, code_type=None):
+ content = getattr(response, 'content', '') or ''
+ candidates = []
+ if code_type: candidates.append(str(code_type).lower())
+ candidates.extend([t for t in ("python", "powershell", "bash") if t not in candidates])
+ alias_map = {
+ "python": ["py"],
+ "powershell": ["ps1", "pwsh"],
+ "bash": ["sh", "shell"],
+ "javascript": ["js"],
+ }
+ for candidate in candidates:
+ langs = [candidate] + alias_map.get(candidate, [])
+ for lang in langs:
+ matches = re.findall(rf"```{lang}\n(.*?)\n```", content, re.DOTALL | re.IGNORECASE)
+ if matches: return candidate, matches[-1].strip()
+ generic = re.findall(r"```\n(.*?)\n```", content, re.DOTALL)
+ if generic: return (candidates[0] if candidates else "python"), generic[-1].strip()
+ return None, None
+
+ def _code_run_retry_hint(self):
+ project_root = os.path.abspath(os.path.join(self.cwd, '..'))
+ return (
+ "[System] Invalid code_run call. Provide a non-empty arguments.script, or put exactly one fenced "
+ "code block immediately before the tool call. Never call code_run with only type/cwd/inline_eval. "
+ f"Runtime scratch cwd is {self.cwd}. Project root is {project_root}; use cwd:'../' for the current "
+ "project folder/repo root. If you only need to inspect existing files, prefer file_read."
+ )
def do_code_run(self, args, response):
'''执行代码片段,有长度限制,不允许代码中放大量数据,如有需要应当通过文件读取进行。'''
- code_type = args.get("type", "python")
+ explicit_type = args.get("type")
+ code_type = str(explicit_type or "python").lower()
code = args.get("code") or args.get("script")
if not code:
- code = self._extract_code_block(response, code_type)
- if not code: return StepOutcome("[Error] Code missing. Use ```{code_type} block or 'script' arg.", next_prompt="\n")
+ inferred_type, inferred_code = self._extract_code_block(response, code_type if explicit_type else None)
+ code_type, code = inferred_type or code_type, inferred_code
+ if not code:
+ return StepOutcome(
+ "[Error] code_run requires a non-empty script. Use arguments.script or exactly one fenced code block immediately before the tool call.",
+ next_prompt=self._get_anchor_prompt(skip=args.get('_index', 0) > 0) + "\n" + self._code_run_retry_hint()
+ )
timeout = args.get("timeout", 60)
raw_path = os.path.join(self.cwd, args.get("cwd", './'))
cwd = os.path.normpath(os.path.abspath(raw_path))
@@ -323,7 +354,9 @@ def do_web_scan(self, args, response):
def do_web_execute_js(self, args, response):
'''web情况下的优先使用工具,执行任何js达成对浏览器的*完全*控制。支持将结果保存到文件供后续读取分析。'''
- script = args.get("script", "") or self._extract_code_block(response, "javascript")
+ script = args.get("script", "")
+ if not script:
+ _, script = self._extract_code_block(response, "javascript")
if not script: return StepOutcome("[Error] Script missing. Use ```javascript block or 'script' arg.", next_prompt="\n")
abs_path = self._get_abs_path(script.strip())
if os.path.isfile(abs_path):
@@ -551,6 +584,8 @@ def get_global_memory():
with open(os.path.join(script_dir, 'memory/global_mem_insight.txt'), 'r', encoding='utf-8', errors='replace') as f: insight = f.read()
with open(os.path.join(script_dir, f'assets/insight_fixed_structure{suffix}.txt'), 'r', encoding='utf-8') as f: structure = f.read()
prompt += f'cwd = {os.path.join(script_dir, "temp")} (./)\n'
+ prompt += f'project_root = {script_dir} (../)\n'
+ prompt += "Interpret user-facing 'current folder/current project/current repository' as project_root (../), unless the user explicitly asks for temp/scratch cwd.\n"
prompt += f"\n[Memory] (../memory)\n"
prompt += structure + '\n../memory/global_mem_insight.txt:\n'
prompt += insight + "\n"
diff --git a/llmcore.py b/llmcore.py
index a8887fb4..196ace89 100644
--- a/llmcore.py
+++ b/llmcore.py
@@ -238,6 +238,47 @@ def _parse_openai_sse(resp_lines, api_mode="chat_completions"):
blocks.append({"type": "tool_use", "id": tc["id"], "name": tc["name"], "input": inp})
return blocks
+def _parse_openai_json(data, api_mode="chat_completions"):
+ """Parse non-stream OpenAI-compatible JSON into content blocks."""
+ if api_mode == "responses":
+ usage = data.get("usage", {})
+ cached = (usage.get("input_tokens_details") or {}).get("cached_tokens", 0)
+ inp = usage.get("input_tokens", 0)
+ if inp: print(f"[Cache] input={inp} cached={cached}")
+ blocks = []
+ for item in (data.get("output") or []):
+ if item.get("type") == "message":
+ text = ""
+ for part in (item.get("content") or []):
+ if part.get("type") in ("output_text", "text") and part.get("text"):
+ text += part["text"]
+ if text: blocks.append({"type": "text", "text": text})
+ elif item.get("type") == "function_call":
+ args = item.get("arguments", "")
+ try: inp = json.loads(args) if args else {}
+ except: inp = {"_raw": args}
+ blocks.append({"type": "tool_use", "id": item.get("call_id", item.get("id", "")), "name": item.get("name", ""), "input": inp})
+ return blocks
+ usage = data.get("usage") or {}
+ cached = (usage.get("prompt_tokens_details") or {}).get("cached_tokens", 0)
+ if usage: print(f"[Cache] input={usage.get('prompt_tokens',0)} cached={cached}")
+ msg = ((data.get("choices") or [{}])[0]).get("message") or {}
+ content = msg.get("content", "")
+ text = ""
+ if isinstance(content, str): text = content
+ elif isinstance(content, list):
+ for part in content:
+ if isinstance(part, dict) and part.get("type") in ("text", "output_text") and part.get("text"):
+ text += part["text"]
+ blocks = [{"type": "text", "text": text}] if text else []
+ for tc in (msg.get("tool_calls") or []):
+ fn = tc.get("function", {})
+ args = fn.get("arguments", "")
+ try: inp = json.loads(args) if args else {}
+ except: inp = {"_raw": args}
+ blocks.append({"type": "tool_use", "id": tc.get("id", ""), "name": fn.get("name", ""), "input": inp})
+ return blocks
+
def _stamp_oai_cache_markers(messages, model):
"""Add cache_control to last 2 user messages for Anthropic models via OAI-compatible relay."""
ml = model.lower()
@@ -253,20 +294,22 @@ def _stamp_oai_cache_markers(messages, model):
def _openai_stream(api_base, api_key, messages, model, api_mode='chat_completions', *,
temperature=0.5, max_tokens=None, tools=None, reasoning_effort=None,
- max_retries=0, connect_timeout=10, read_timeout=300, proxies=None):
- """Shared OpenAI-compatible streaming request with retry. Yields text chunks, returns list[content_block]."""
+ max_retries=0, connect_timeout=10, read_timeout=300, proxies=None, stream=True):
+ """Shared OpenAI-compatible request with retry. Yields text chunks, returns list[content_block]."""
ml = model.lower()
if 'kimi' in ml or 'moonshot' in ml: temperature = 1
elif 'minimax' in ml: temperature = max(0.01, min(temperature, 1.0)) # MiniMax requires temp in (0, 1]
- headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json", "Accept": "text/event-stream"}
+ headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
+ if stream: headers["Accept"] = "text/event-stream"
if api_mode == "responses":
url = auto_make_url(api_base, "responses")
- payload = {"model": model, "input": _to_responses_input(messages), "stream": True, "prompt_cache_key": _RESP_CACHE_KEY}
+ payload = {"model": model, "input": _to_responses_input(messages), "stream": stream, "prompt_cache_key": _RESP_CACHE_KEY}
if reasoning_effort: payload["reasoning"] = {"effort": reasoning_effort}
else:
url = auto_make_url(api_base, "chat/completions")
_stamp_oai_cache_markers(messages, model)
- payload = {"model": model, "messages": messages, "stream": True, "stream_options": {"include_usage": True}}
+ payload = {"model": model, "messages": messages, "stream": stream}
+ if stream: payload["stream_options"] = {"include_usage": True}
if temperature != 1: payload["temperature"] = temperature
if max_tokens: payload["max_tokens"] = max_tokens
if reasoning_effort: payload["reasoning_effort"] = reasoning_effort
@@ -287,11 +330,15 @@ def _delay(resp, attempt):
try: ra = float((resp.headers or {}).get("retry-after"))
except: ra = None
return max(0.5, ra if ra is not None else min(30.0, 1.5 * (2 ** attempt)))
+ def _post(url, **kwargs):
+ with requests.Session() as sess:
+ sess.trust_env = False
+ return sess.post(url, proxies=proxies, **kwargs)
for attempt in range(max_retries + 1):
streamed = False
try:
- with requests.post(url, headers=headers, json=payload, stream=True,
- timeout=(connect_timeout, read_timeout), proxies=proxies) as r:
+ with _post(url, headers=headers, json=payload, stream=stream,
+ timeout=(connect_timeout, read_timeout)) as r:
if r.status_code >= 400:
if r.status_code in RETRYABLE and attempt < max_retries:
d = _delay(r, attempt)
@@ -304,11 +351,18 @@ def _delay(resp, attempt):
try: r.raise_for_status()
except requests.HTTPError as e:
e._err_body = err_body; raise
- gen = _parse_openai_sse(r.iter_lines(), api_mode)
- try:
- while True: streamed = True; yield next(gen)
- except StopIteration as e:
- return e.value or []
+ if stream:
+ gen = _parse_openai_sse(r.iter_lines(), api_mode)
+ try:
+ while True: streamed = True; yield next(gen)
+ except StopIteration as e:
+ return e.value or []
+ else:
+ blocks = _parse_openai_json(r.json(), api_mode)
+ for b in blocks:
+ if b.get("type") == "text" and b.get("text"):
+ yield b["text"]
+ return blocks
except requests.HTTPError as e:
resp = getattr(e, "response", None); status = getattr(resp, "status_code", None)
if status in RETRYABLE and attempt < max_retries and not streamed:
@@ -424,7 +478,7 @@ def __init__(self, cfg):
self.max_retries = max(0, int(cfg.get('max_retries', 1)))
self.stream = cfg.get('stream', True)
default_ct, default_rt = (5, 30) if self.stream else (10, 240)
- self.connect_timeout = max(1, int(cfg.get('timeout', default_ct)))
+ self.connect_timeout = max(1, int(cfg.get('connect_timeout', cfg.get('timeout', default_ct))))
self.read_timeout = max(5, int(cfg.get('read_timeout', default_rt)))
def _enum(key, valid):
v = cfg.get(key); v = None if v is None else str(v).strip().lower()
@@ -475,9 +529,12 @@ def raw_ask(self, messages):
self._apply_claude_thinking(payload)
if self.system: payload["system"] = [{"type": "text", "text": self.system, "cache_control": {"type": "persistent"}}]
try:
- with requests.post(auto_make_url(self.api_base, "messages"), headers=headers, json=payload, stream=True, timeout=(self.connect_timeout, self.read_timeout)) as r:
- if r.status_code != 200: raise Exception(f"HTTP {r.status_code} {r.content.decode('utf-8', errors='replace')[:500]}")
- return (yield from _parse_claude_sse(r.iter_lines())) or []
+ with requests.Session() as sess:
+ sess.trust_env = False
+ with sess.post(auto_make_url(self.api_base, "messages"), headers=headers, json=payload, stream=True,
+ timeout=(self.connect_timeout, self.read_timeout), proxies=self.proxies) as r:
+ if r.status_code != 200: raise Exception(f"HTTP {r.status_code} {r.content.decode('utf-8', errors='replace')[:500]}")
+ return (yield from _parse_claude_sse(r.iter_lines())) or []
except Exception as e:
yield (err := f"Error: {e}")
return [{"type": "text", "text": err}]
@@ -493,7 +550,8 @@ def raw_ask(self, messages):
return (yield from _openai_stream(self.api_base, self.api_key, messages, self.model, self.api_mode,
temperature=self.temperature, reasoning_effort=self.reasoning_effort,
max_tokens=self.max_tokens, max_retries=self.max_retries,
- connect_timeout=self.connect_timeout, read_timeout=self.read_timeout, proxies=self.proxies))
+ connect_timeout=self.connect_timeout, read_timeout=self.read_timeout,
+ proxies=self.proxies, stream=self.stream))
def make_messages(self, raw_list): return _msgs_claude2oai(raw_list)
def _fix_messages(messages):
@@ -551,17 +609,20 @@ def raw_ask(self, messages):
messages[idx] = {**messages[idx], "content": list(messages[idx]["content"])}
messages[idx]["content"][-1] = dict(messages[idx]["content"][-1], cache_control={"type": "ephemeral"})
try:
- with requests.post(auto_make_url(self.api_base, "messages")+'?beta=true', headers=headers, json=payload, stream=self.stream, timeout=(self.connect_timeout, self.read_timeout)) as resp:
- if resp.status_code != 200: raise Exception(f"HTTP {resp.status_code} {resp.content.decode('utf-8', errors='replace')[:500]}")
- if self.stream: return (yield from _parse_claude_sse(resp.iter_lines())) or []
- else:
- data = resp.json(); content_blocks = data.get("content", [])
- usage = data.get("usage", {})
- print(f"[Cache] input={usage.get('input_tokens',0)} creation={usage.get('cache_creation_input_tokens',0)} read={usage.get('cache_read_input_tokens',0)}")
- for b in content_blocks:
- if b.get("type") == "text": yield b.get("text", "")
- elif b.get("type") == "thinking": yield ""
- return content_blocks
+ with requests.Session() as sess:
+ sess.trust_env = False
+ with sess.post(auto_make_url(self.api_base, "messages")+'?beta=true', headers=headers, json=payload,
+ stream=self.stream, timeout=(self.connect_timeout, self.read_timeout), proxies=self.proxies) as resp:
+ if resp.status_code != 200: raise Exception(f"HTTP {resp.status_code} {resp.content.decode('utf-8', errors='replace')[:500]}")
+ if self.stream: return (yield from _parse_claude_sse(resp.iter_lines())) or []
+ else:
+ data = resp.json(); content_blocks = data.get("content", [])
+ usage = data.get("usage", {})
+ print(f"[Cache] input={usage.get('input_tokens',0)} creation={usage.get('cache_creation_input_tokens',0)} read={usage.get('cache_read_input_tokens',0)}")
+ for b in content_blocks:
+ if b.get("type") == "text": yield b.get("text", "")
+ elif b.get("type") == "thinking": yield ""
+ return content_blocks
except Exception as e:
yield (err := f"Error: {e}")
return [{"type": "text", "text": err}]
@@ -603,7 +664,7 @@ def raw_ask(self, messages):
temperature=self.temperature, max_tokens=self.max_tokens,
tools=self.tools, reasoning_effort=self.reasoning_effort,
max_retries=self.max_retries, connect_timeout=self.connect_timeout,
- read_timeout=self.read_timeout, proxies=self.proxies))
+ read_timeout=self.read_timeout, proxies=self.proxies, stream=self.stream))
def openai_tools_to_claude(tools):
"""[{type:'function', function:{name,description,parameters}}] → [{name,description,input_schema}]."""
@@ -673,6 +734,13 @@ def _prepare_tool_instruction(self, tools):
if not tools: return tool_instruction
tools_json = json.dumps(tools, ensure_ascii=False, separators=(',', ':'))
_en = os.environ.get('GA_LANG') == 'en'
+ critical_rules = """
+Critical tool rules:
+- code_run: NEVER call with empty arguments. Provide arguments.script, or put exactly one fenced code block immediately before the tool call.
+- code_run defaults to runtime scratch cwd ./temp. For the repo root/current project folder, use cwd:'../'.
+- If you only need to inspect existing file contents, prefer file_read over code_run.
+"""
+ format_instruction = '\nFormat: ```{{"name": "tool_name", "arguments": {{...}}}}```\n'
if _en:
tool_instruction = f"""
### Interaction Protocol (must follow strictly, always in effect)
@@ -696,10 +764,49 @@ def _prepare_tool_instruction(self, tools):
self.last_tools = tools_json
return tool_instruction
+ def _prepare_tool_instruction_v2(self, tools):
+ tool_instruction = ""
+ if not tools:
+ return tool_instruction
+ tools_json = json.dumps(tools, ensure_ascii=False, separators=(',', ':'))
+ _en = os.environ.get('GA_LANG') == 'en'
+ critical_rules = (
+ "\nCritical tool rules:\n"
+ "- code_run: NEVER call with empty arguments. Provide arguments.script, or put exactly one fenced code block immediately before the tool call.\n"
+ "- code_run defaults to runtime scratch cwd ./temp. For the repo root/current project folder, use cwd:'../'.\n"
+ "- If you only need to inspect existing file contents, prefer file_read over code_run.\n"
+ )
+ format_instruction = '\nFormat: ```{{"name": "tool_name", "arguments": {{...}}}}```\n'
+ if _en:
+ tool_instruction = (
+ "\n### Interaction Protocol (must follow strictly, always in effect)\n"
+ "Follow these steps to think and act:\n"
+ "1. **Think**: Analyze the current situation and strategy inside `` tags.\n"
+ "2. **Summarize**: Output a minimal one-line (<30 words) physical snapshot in ``: new info from last tool result + current tool call intent. This goes into long-term working memory. Must contain real information, no filler.\n"
+ "3. **Act**: If you need to call tools, output one or more ** blocks** after your reply, then stop.\n"
+ )
+ cached_prefix = "\n### Tools: still active, **ready to call**. Protocol unchanged.\n"
+ else:
+ tool_instruction = (
+ "\n### Interaction Protocol\n"
+ "1. Think inside .\n"
+ "2. Write a short factual .\n"
+ "3. If tools are needed, output blocks and stop.\n"
+ )
+ cached_prefix = "\n### Tools: still active and ready to call.\n"
+ if self.auto_save_tokens and self.last_tools == tools_json:
+ tool_instruction = cached_prefix + critical_rules + format_instruction
+ else:
+ self.total_cd_tokens = 0
+ tool_instruction += critical_rules
+ tool_instruction += f'{format_instruction}\n### Tools (mounted, always in effect):\n{tools_json}\n'
+ self.last_tools = tools_json
+ return tool_instruction
+
def _build_protocol_prompt(self, messages, tools):
system_content = next((m['content'] for m in messages if m['role'].lower() == 'system'), "")
history_msgs = [m for m in messages if m['role'].lower() != 'system']
- tool_instruction = self._prepare_tool_instruction(tools)
+ tool_instruction = self._prepare_tool_instruction_v2(tools)
system = ""; user = ""
if system_content: system += f"{system_content}\n"
system += f"{tool_instruction}"
@@ -917,4 +1024,4 @@ def chat(self, messages, tools=None):
except StopIteration as e: resp = e.value
if resp: _write_llm_log('Response', resp.raw)
if resp and hasattr(resp, 'tool_calls') and resp.tool_calls: self._pending_tool_ids = [tc.id for tc in resp.tool_calls]
- return resp
\ No newline at end of file
+ return resp
diff --git a/tests/test_minimax.py b/tests/test_minimax.py
index 19de58c0..ba665216 100644
--- a/tests/test_minimax.py
+++ b/tests/test_minimax.py
@@ -28,7 +28,11 @@ def fake_post(url, headers=None, json=None, stream=None, timeout=None, proxies=N
resp.__exit__ = MagicMock(return_value=False)
return resp
- with patch('llmcore.requests.post', side_effect=fake_post):
+ fake_session = MagicMock()
+ fake_session.__enter__.return_value = fake_session
+ fake_session.__exit__.return_value = False
+ fake_session.post.side_effect = fake_post
+ with patch('llmcore.requests.Session', return_value=fake_session):
gen = _openai_stream(
'https://api.minimax.io/v1', 'test-key', [{"role": "user", "content": "hi"}],
model, temperature=temperature
@@ -39,6 +43,45 @@ def fake_post(url, headers=None, json=None, stream=None, timeout=None, proxies=N
return captured.get('payload', {})
+ def test_non_stream_response_parsed(self):
+ """Non-stream OpenAI-compatible responses should be parsed into text blocks."""
+ from llmcore import _openai_stream
+
+ def fake_post(url, headers=None, json=None, stream=None, timeout=None, proxies=None):
+ resp = MagicMock()
+ resp.status_code = 200
+ resp.json.return_value = {
+ "choices": [{
+ "message": {
+ "content": "Here is the answer.",
+ "tool_calls": []
+ }
+ }],
+ "usage": {"prompt_tokens": 12}
+ }
+ resp.__enter__ = lambda s: s
+ resp.__exit__ = MagicMock(return_value=False)
+ return resp
+
+ fake_session = MagicMock()
+ fake_session.__enter__.return_value = fake_session
+ fake_session.__exit__.return_value = False
+ fake_session.post.side_effect = fake_post
+ with patch('llmcore.requests.Session', return_value=fake_session):
+ gen = _openai_stream(
+ 'https://api.minimax.io/v1', 'test-key', [{"role": "user", "content": "hi"}],
+ 'MiniMax-M2.7', stream=False
+ )
+ chunks = []
+ try:
+ while True:
+ chunks.append(next(gen))
+ except StopIteration as e:
+ blocks = e.value
+
+ self.assertEqual(chunks, ["Here is the answer."])
+ self.assertEqual(blocks, [{"type": "text", "text": "Here is the answer."}])
+
def test_minimax_temp_zero_clamped(self):
"""MiniMax rejects temperature=0, should be clamped to 0.01."""
payload = self._make_stream_call('MiniMax-M2.7', 0.0)
@@ -57,12 +100,12 @@ def test_minimax_temp_normal_preserved(self):
def test_minimax_temp_one_preserved(self):
"""Temperature=1.0 should be preserved."""
payload = self._make_stream_call('MiniMax-M2.7-highspeed', 1.0)
- self.assertAlmostEqual(payload['temperature'], 1.0)
+ self.assertNotIn('temperature', payload)
def test_minimax_temp_above_one_clamped(self):
"""Temperature > 1.0 should be clamped to 1.0."""
payload = self._make_stream_call('MiniMax-M2.7', 1.5)
- self.assertAlmostEqual(payload['temperature'], 1.0)
+ self.assertNotIn('temperature', payload)
def test_minimax_case_insensitive(self):
"""Model name matching should be case-insensitive."""
@@ -77,7 +120,7 @@ def test_non_minimax_temp_zero_unchanged(self):
def test_kimi_temp_still_forced(self):
"""Kimi/Moonshot temp override should still work."""
payload = self._make_stream_call('kimi-2.0', 0.5)
- self.assertAlmostEqual(payload['temperature'], 1.0)
+ self.assertNotIn('temperature', payload)
class TestMiniMaxThinkTagHandling(unittest.TestCase):
@@ -145,15 +188,15 @@ def test_think_tag_compressed_in_old_messages(self):
long_think = "A" * 2000
messages = [
- {"role": "assistant", "prompt": f"{long_think}\nShort answer."},
- {"role": "user", "prompt": "Follow up"},
- ] + [{"role": "user", "prompt": f"msg{i}"} for i in range(12)]
+ {"role": "assistant", "content": f"{long_think}\nShort answer."},
+ {"role": "user", "content": "Follow up"},
+ ] + [{"role": "user", "content": f"msg{i}"} for i in range(12)]
# Force compression (counter divisible by 5)
compress_history_tags._cd = 4
result = compress_history_tags(messages, keep_recent=10, max_len=800)
# The first message's content should be truncated
- first_content = result[0]["prompt"]
+ first_content = result[0]["content"]
self.assertIn("", first_content)
self.assertIn("...", first_content)
self.assertLess(len(first_content), len(f"{long_think}\nShort answer."))
@@ -268,7 +311,7 @@ def test_native_tool_client_think_tag(self):
def mock_ask(msg, tools=None, model=None):
text = "Analyzing the request.\n\nResult: success"
yield text
- return MockResponse('', text, [], text)
+ return MockResponse('Analyzing the request.', 'Result: success', [], text)
session.ask = mock_ask
diff --git a/tests/test_minimax_integration.py b/tests/test_minimax_integration.py
index ae7d63e5..ad713510 100644
--- a/tests/test_minimax_integration.py
+++ b/tests/test_minimax_integration.py
@@ -64,7 +64,11 @@ def test_full_pipeline_with_think_tag(self):
mock_resp.__enter__ = lambda s: s
mock_resp.__exit__ = MagicMock(return_value=False)
- with patch('llmcore.requests.post', return_value=mock_resp):
+ fake_session = MagicMock()
+ fake_session.__enter__.return_value = fake_session
+ fake_session.__exit__.return_value = False
+ fake_session.post.return_value = mock_resp
+ with patch('llmcore.requests.Session', return_value=fake_session):
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Help me read a file."},
@@ -107,7 +111,11 @@ def test_full_pipeline_with_tool_call(self):
mock_resp.__enter__ = lambda s: s
mock_resp.__exit__ = MagicMock(return_value=False)
- with patch('llmcore.requests.post', return_value=mock_resp):
+ fake_session = MagicMock()
+ fake_session.__enter__.return_value = fake_session
+ fake_session.__exit__.return_value = False
+ fake_session.post.return_value = mock_resp
+ with patch('llmcore.requests.Session', return_value=fake_session):
messages = [{"role": "user", "content": "Read the config file."}]
gen = client.chat(messages=messages, tools=None)
try:
@@ -142,13 +150,14 @@ def capture_post(url, headers=None, json=None, stream=None, timeout=None, proxie
resp.__exit__ = MagicMock(return_value=False)
return resp
- with patch('llmcore.requests.post', side_effect=capture_post):
+ fake_session = MagicMock()
+ fake_session.__enter__.return_value = fake_session
+ fake_session.__exit__.return_value = False
+ fake_session.post.side_effect = capture_post
+ with patch('llmcore.requests.Session', return_value=fake_session):
session.raw_msgs = [{"role": "user", "prompt": "test", "image": None}]
- gen = session.raw_ask(
- [{"role": "user", "content": "test"}],
- model='MiniMax-M2.7',
- temperature=0.0,
- )
+ session.temperature = 0.0
+ gen = session.raw_ask([{"role": "user", "content": "test"}])
for _ in gen:
pass
diff --git a/tests/test_tool_constraints.py b/tests/test_tool_constraints.py
new file mode 100644
index 00000000..1bd32ff9
--- /dev/null
+++ b/tests/test_tool_constraints.py
@@ -0,0 +1,93 @@
+"""Regression tests for tool constraint handling."""
+import json
+import os
+import sys
+import unittest
+from types import SimpleNamespace
+from unittest.mock import patch
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from agent_loop import exhaust
+from ga import GenericAgentHandler
+from llmcore import ToolClient
+
+
+class TestToolConstraints(unittest.TestCase):
+ def setUp(self):
+ self.repo_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+ self.temp_dir = os.path.join(self.repo_dir, "temp")
+ os.makedirs(self.temp_dir, exist_ok=True)
+ self.parent = SimpleNamespace(verbose=False, task_dir=self.temp_dir)
+ self.handler = GenericAgentHandler(self.parent, cwd=self.temp_dir)
+
+ def test_code_run_infers_powershell_from_fenced_block(self):
+ captured = {}
+
+ def fake_code_run(code, code_type="python", timeout=60, cwd=None, code_cwd=None, stop_signal=None):
+ captured.update({
+ "code": code,
+ "code_type": code_type,
+ "cwd": cwd,
+ "code_cwd": code_cwd,
+ })
+ if False:
+ yield None
+ return {"status": "success"}
+
+ response = SimpleNamespace(content="List files first.\n```powershell\nGet-ChildItem\n```")
+ with patch("ga.code_run", new=fake_code_run):
+ outcome = exhaust(self.handler.do_code_run({}, response))
+
+ self.assertEqual(captured["code"], "Get-ChildItem")
+ self.assertEqual(captured["code_type"], "powershell")
+ self.assertEqual(outcome.data, {"status": "success"})
+
+ def test_code_run_missing_script_returns_retry_hint(self):
+ response = SimpleNamespace(content="Need to inspect the folder.")
+ outcome = exhaust(self.handler.do_code_run({"type": "python"}, response))
+
+ self.assertIn("code_run requires a non-empty script", outcome.data)
+ self.assertIn("cwd:'../'", outcome.next_prompt)
+ self.assertIn(self.repo_dir, outcome.next_prompt)
+
+ def test_web_execute_js_extracts_js_alias_block(self):
+ captured = {}
+
+ def fake_web_execute_js(script, switch_tab_id=None, no_monitor=False):
+ captured.update({
+ "script": script,
+ "switch_tab_id": switch_tab_id,
+ "no_monitor": no_monitor,
+ })
+ return {"status": "success", "js_return": "ok"}
+
+ response = SimpleNamespace(content="```js\nconsole.log('ok')\n```")
+ with patch("ga.web_execute_js", new=fake_web_execute_js):
+ outcome = exhaust(self.handler.do_web_execute_js({}, response))
+
+ self.assertEqual(captured["script"], "console.log('ok')")
+ self.assertIn('"status": "success"', outcome.data)
+
+ def test_cached_tool_prompt_keeps_critical_rules(self):
+ client = ToolClient(SimpleNamespace(name="test-backend"))
+ tools = [{
+ "type": "function",
+ "function": {
+ "name": "code_run",
+ "description": "Code executor",
+ "parameters": {"type": "object", "properties": {}},
+ },
+ }]
+
+ first = client._prepare_tool_instruction_v2(tools)
+ second = client._prepare_tool_instruction_v2(tools)
+
+ self.assertIn("Critical tool rules", first)
+ self.assertIn("Critical tool rules", second)
+ self.assertIn("cwd:'../'", second)
+ self.assertIn("Format: ```", second)
+
+
+if __name__ == "__main__":
+ unittest.main()