diff --git a/CHANGELOG.md b/CHANGELOG.md index 4eff0d6e2..52bfc1607 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ Only write entries that are worth mentioning to users. ## Unreleased +- Core: Improve session startup resilience — `--continue`/`--resume` now tolerate malformed `context.jsonl` records and corrupted subagent, background-task, or notification artifacts; the CLI skips invalid persisted state where possible instead of failing to restore the session - Grep: Add `include_ignored` parameter to search files excluded by `.gitignore` — when set to `true`, ripgrep's `--no-ignore` flag is enabled, allowing searches in gitignored artifacts such as build outputs or `node_modules`; sensitive files (like `.env`) remain filtered by the sensitive-file protection layer; defaults to `false` to preserve existing behavior - Core: Add sensitive file protection to Grep and Read tools — `.env`, SSH private keys (`id_rsa`, `id_ed25519`, `id_ecdsa`), and cloud credentials (`.aws/credentials`, `.gcp/credentials`) are now detected and blocked; Grep filters them from results with a warning, Read rejects them outright; `.env.example`/`.env.sample`/`.env.template` are exempted - Core: Fix parallel foreground subagent approval requests hanging the session — in interactive shell mode, `_set_active_approval_sink` no longer flushes pending approval requests to the live view sink (which cannot render approval modals); requests stay in the pending queue for the prompt modal path; also adds a 300-second timeout to `wait_for_response` so that any unresolved approval request eventually raises `ApprovalCancelledError` instead of hanging forever diff --git a/docs/en/release-notes/changelog.md b/docs/en/release-notes/changelog.md index 6e52c2a20..e800ea71e 100644 --- a/docs/en/release-notes/changelog.md +++ b/docs/en/release-notes/changelog.md @@ -4,8 +4,9 @@ This page documents the changes in each Kimi Code CLI release. ## Unreleased +- Core: Improve session startup resilience — `--continue`/`--resume` now tolerate malformed `context.jsonl` records and corrupted subagent, background-task, or notification artifacts; the CLI skips invalid persisted state where possible instead of failing to restore the session +- Grep: Add `include_ignored` parameter to search files excluded by `.gitignore` — when set to `true`, ripgrep's `--no-ignore` flag is enabled, allowing searches in gitignored artifacts such as build outputs or `node_modules`; sensitive files (like `.env`) remain filtered by the sensitive-file protection layer; defaults to `false` to preserve existing behavior - CLI: Improve `kimi export` session export UX — `kimi export` now previews the previous session for the current working directory and asks for confirmation, showing the session ID, title, and last user-message time; adds `--yes` to skip confirmation; also fixes explicit session-ID invocations where `--output` after the argument was incorrectly parsed as a subcommand -- Grep: Add `include_ignored` parameter to search files excluded by `.gitignore` — when set to `true`, ripgrep's `--no-ignore` flag is enabled, allowing searches in files like `.env` or `node_modules` that are normally gitignored; defaults to `false` to preserve existing behavior - Core: Add sensitive file protection to Grep and Read tools — `.env`, SSH private keys (`id_rsa`, `id_ed25519`, `id_ecdsa`), and cloud credentials (`.aws/credentials`, `.gcp/credentials`) are now detected and blocked; Grep filters them from results with a warning, Read rejects them outright; `.env.example`/`.env.sample`/`.env.template` are exempted - Core: Fix parallel foreground subagent approval requests hanging the session — in interactive shell mode, `_set_active_approval_sink` no longer flushes pending approval requests to the live view sink (which cannot render approval modals); requests stay in the pending queue for the prompt modal path; also adds a 300-second timeout to `wait_for_response` so that any unresolved approval request eventually raises `ApprovalCancelledError` instead of hanging forever - CLI: Add `--session`/`--resume` (`-S`/`-r`) flag to resume sessions — without an argument opens an interactive session picker (shell UI only); with a session ID resumes that specific session; replaces the reverted `--pick-session`/`--list-sessions` design with a unified optional-value flag diff --git a/docs/zh/release-notes/changelog.md b/docs/zh/release-notes/changelog.md index 6fbc8042d..a85a75dbc 100644 --- a/docs/zh/release-notes/changelog.md +++ b/docs/zh/release-notes/changelog.md @@ -4,6 +4,7 @@ ## 未发布 +- Core:提升会话启动恢复的鲁棒性——`--continue`/`--resume` 现在可容忍损坏的 `context.jsonl` 记录,以及损坏的子 Agent、后台任务或通知持久化工件;CLI 会尽可能跳过无效状态并继续恢复会话,而不是直接启动失败 - CLI:改进 `kimi export` 会话导出体验——`kimi export` 现在默认预览并确认当前工作目录的上一个会话,显示会话 ID、标题和最后一条用户消息时间;新增 `--yes` 跳过确认;同时修复显式会话 ID 时 `--output` 放在参数后面会被错误解析为子命令的问题 - Grep:新增 `include_ignored` 参数,支持搜索被 `.gitignore` 排除的文件——设为 `true` 时启用 ripgrep 的 `--no-ignore` 标志,可搜索构建产物或 `node_modules` 等通常被忽略的文件;敏感文件(如 `.env`)仍由敏感文件保护层过滤;默认 `false`,不影响现有行为 - Core:为 Grep 和 Read 工具添加敏感文件保护——`.env`、SSH 私钥(`id_rsa`、`id_ed25519`、`id_ecdsa`)和云凭据(`.aws/credentials`、`.gcp/credentials`)会被检测并拦截;Grep 从结果中过滤并显示警告,Read 直接拒绝读取;`.env.example`/`.env.sample`/`.env.template` 不受影响 diff --git a/src/kimi_cli/background/store.py b/src/kimi_cli/background/store.py index ff3552793..70d625289 100644 --- a/src/kimi_cli/background/store.py +++ b/src/kimi_cli/background/store.py @@ -4,7 +4,10 @@ import re from pathlib import Path +from pydantic import BaseModel, ValidationError + from kimi_cli.utils.io import atomic_json_write +from kimi_cli.utils.logging import logger from .models import ( TaskConsumerState, @@ -104,7 +107,12 @@ def read_runtime(self, task_id: str) -> TaskRuntime: path = self.runtime_path(task_id) if not path.exists(): return TaskRuntime() - return TaskRuntime.model_validate_json(path.read_text(encoding="utf-8")) + return _read_json_model( + path, + TaskRuntime, + fallback=TaskRuntime(updated_at=0), + artifact="task runtime", + ) def write_control(self, task_id: str, control: TaskControl) -> None: atomic_json_write(control.model_dump(mode="json"), self.control_path(task_id)) @@ -113,7 +121,12 @@ def read_control(self, task_id: str) -> TaskControl: path = self.control_path(task_id) if not path.exists(): return TaskControl() - return TaskControl.model_validate_json(path.read_text(encoding="utf-8")) + return _read_json_model( + path, + TaskControl, + fallback=TaskControl(), + artifact="task control", + ) def write_consumer(self, task_id: str, consumer: TaskConsumerState) -> None: atomic_json_write(consumer.model_dump(mode="json"), self.consumer_path(task_id)) @@ -122,7 +135,12 @@ def read_consumer(self, task_id: str) -> TaskConsumerState: path = self.consumer_path(task_id) if not path.exists(): return TaskConsumerState() - return TaskConsumerState.model_validate_json(path.read_text(encoding="utf-8")) + return _read_json_model( + path, + TaskConsumerState, + fallback=TaskConsumerState(), + artifact="task consumer state", + ) def merged_view(self, task_id: str) -> TaskView: return TaskView( @@ -133,7 +151,17 @@ def merged_view(self, task_id: str) -> TaskView: ) def list_views(self) -> list[TaskView]: - views = [self.merged_view(task_id) for task_id in self.list_task_ids()] + views: list[TaskView] = [] + for task_id in self.list_task_ids(): + try: + views.append(self.merged_view(task_id)) + except (OSError, ValidationError, ValueError, UnicodeDecodeError) as exc: + logger.warning( + "Skipping invalid background task {task_id} from {path}: {error}", + task_id=task_id, + path=self.root / task_id / self.SPEC_FILE, + error=exc, + ) views.sort( key=lambda view: view.runtime.updated_at or view.spec.created_at, reverse=True, @@ -194,3 +222,16 @@ def tail_output(self, task_id: str, max_bytes: int, max_lines: int) -> str: if len(lines) > max_lines: lines = lines[-max_lines:] return "\n".join(lines) + + +def _read_json_model[T: BaseModel](path: Path, model: type[T], *, fallback: T, artifact: str) -> T: + try: + return model.model_validate_json(path.read_text(encoding="utf-8")) + except (OSError, ValidationError, ValueError, UnicodeDecodeError) as exc: + logger.warning( + "Failed to read {artifact} from {path}; using defaults: {error}", + artifact=artifact, + path=path, + error=exc, + ) + return fallback diff --git a/src/kimi_cli/notifications/store.py b/src/kimi_cli/notifications/store.py index f1f0601ed..393f8e815 100644 --- a/src/kimi_cli/notifications/store.py +++ b/src/kimi_cli/notifications/store.py @@ -3,7 +3,10 @@ import re from pathlib import Path +from pydantic import ValidationError + from kimi_cli.utils.io import atomic_json_write +from kimi_cli.utils.logging import logger from .models import NotificationDelivery, NotificationEvent, NotificationView @@ -80,7 +83,15 @@ def read_delivery(self, notification_id: str) -> NotificationDelivery: path = self.delivery_path(notification_id) if not path.exists(): return NotificationDelivery() - return NotificationDelivery.model_validate_json(path.read_text(encoding="utf-8")) + try: + return NotificationDelivery.model_validate_json(path.read_text(encoding="utf-8")) + except (OSError, ValidationError, ValueError, UnicodeDecodeError) as exc: + logger.warning( + "Failed to read notification delivery {path}; using defaults: {error}", + path=path, + error=exc, + ) + return NotificationDelivery() def write_delivery(self, notification_id: str, delivery: NotificationDelivery) -> None: atomic_json_write(delivery.model_dump(mode="json"), self.delivery_path(notification_id)) @@ -92,8 +103,16 @@ def merged_view(self, notification_id: str) -> NotificationView: ) def list_views(self) -> list[NotificationView]: - views = [ - self.merged_view(notification_id) for notification_id in self.list_notification_ids() - ] + views: list[NotificationView] = [] + for notification_id in self.list_notification_ids(): + try: + views.append(self.merged_view(notification_id)) + except (OSError, ValidationError, ValueError, UnicodeDecodeError) as exc: + logger.warning( + "Skipping invalid notification {notification_id} from {path}: {error}", + notification_id=notification_id, + path=self.root / notification_id / self.EVENT_FILE, + error=exc, + ) views.sort(key=lambda view: view.event.created_at, reverse=True) return views diff --git a/src/kimi_cli/soul/context.py b/src/kimi_cli/soul/context.py index e5af074c9..864935d9d 100644 --- a/src/kimi_cli/soul/context.py +++ b/src/kimi_cli/soul/context.py @@ -4,10 +4,12 @@ import json from collections.abc import Sequence from pathlib import Path +from typing import Any, cast import aiofiles import aiofiles.os from kosong.message import Message +from pydantic import ValidationError from kimi_cli.soul.compaction import estimate_text_tokens from kimi_cli.soul.message import system @@ -38,24 +40,26 @@ async def restore(self) -> bool: return False messages_after_last_usage: list[Message] = [] - async with aiofiles.open(self._file_backend, encoding="utf-8") as f: + async with aiofiles.open(self._file_backend, encoding="utf-8", errors="replace") as f: + line_no = 0 async for line in f: + line_no += 1 if not line.strip(): continue - line_json = json.loads(line, strict=False) - if line_json["role"] == "_system_prompt": - self._system_prompt = line_json["content"] + line_json = self._parse_context_line( + line, + file_backend=self._file_backend, + line_no=line_no, + ) + if line_json is None: continue - if line_json["role"] == "_usage": - self._token_count = line_json["token_count"] - messages_after_last_usage.clear() - continue - if line_json["role"] == "_checkpoint": - self._next_checkpoint_id = line_json["id"] + 1 - continue - message = Message.model_validate(line_json) - self._history.append(message) - messages_after_last_usage.append(message) + self._apply_context_record( + line_json, + history=self._history, + messages_after_last_usage=messages_after_last_usage, + file_backend=self._file_backend, + line_no=line_no, + ) self._pending_token_estimate = estimate_text_tokens(messages_after_last_usage) return True @@ -164,29 +168,34 @@ async def revert_to(self, checkpoint_id: int): self._system_prompt = None messages_after_last_usage: list[Message] = [] async with ( - aiofiles.open(rotated_file_path, encoding="utf-8") as old_file, + aiofiles.open(rotated_file_path, encoding="utf-8", errors="replace") as old_file, aiofiles.open(self._file_backend, "w", encoding="utf-8") as new_file, ): + line_no = 0 async for line in old_file: + line_no += 1 if not line.strip(): continue - line_json = json.loads(line, strict=False) - if line_json["role"] == "_checkpoint" and line_json["id"] == checkpoint_id: + line_json = self._parse_context_line( + line, + file_backend=rotated_file_path, + line_no=line_no, + ) + if line_json is None: + continue + if line_json.get("role") == "_checkpoint" and line_json.get("id") == checkpoint_id: break - await new_file.write(line) - if line_json["role"] == "_system_prompt": - self._system_prompt = line_json["content"] - elif line_json["role"] == "_usage": - self._token_count = line_json["token_count"] - messages_after_last_usage.clear() - elif line_json["role"] == "_checkpoint": - self._next_checkpoint_id = line_json["id"] + 1 - else: - message = Message.model_validate(line_json) - self._history.append(message) - messages_after_last_usage.append(message) + keep_line = self._apply_context_record( + line_json, + history=self._history, + messages_after_last_usage=messages_after_last_usage, + file_backend=rotated_file_path, + line_no=line_no, + ) + if keep_line: + await new_file.write(line) self._pending_token_estimate = estimate_text_tokens(messages_after_last_usage) @@ -237,3 +246,94 @@ async def update_token_count(self, token_count: int): async with aiofiles.open(self._file_backend, "a", encoding="utf-8") as f: await f.write(json.dumps({"role": "_usage", "token_count": token_count}) + "\n") + + def _parse_context_line( + self, + line: str, + *, + file_backend: Path, + line_no: int, + ) -> dict[str, Any] | None: + try: + line_json = json.loads(line, strict=False) + except json.JSONDecodeError as exc: + logger.warning( + "Skipping malformed context line {line_no} in {file}: {error}", + line_no=line_no, + file=file_backend, + error=exc, + ) + return None + if not isinstance(line_json, dict): + logger.warning( + "Skipping non-object context line {line_no} in {file}", + line_no=line_no, + file=file_backend, + ) + return None + return cast(dict[str, Any], line_json) + + def _apply_context_record( + self, + line_json: dict[str, Any], + *, + history: list[Message], + messages_after_last_usage: list[Message], + file_backend: Path, + line_no: int, + ) -> bool: + role = line_json.get("role") + if not isinstance(role, str): + logger.warning( + "Skipping context line {line_no} in {file}: missing or invalid role", + line_no=line_no, + file=file_backend, + ) + return False + if role == "_system_prompt": + content = line_json.get("content") + if not isinstance(content, str): + logger.warning( + "Skipping invalid system prompt line {line_no} in {file}", + line_no=line_no, + file=file_backend, + ) + return False + self._system_prompt = content + return True + if role == "_usage": + token_count = line_json.get("token_count") + if not isinstance(token_count, int): + logger.warning( + "Skipping invalid usage line {line_no} in {file}", + line_no=line_no, + file=file_backend, + ) + return False + self._token_count = token_count + messages_after_last_usage.clear() + return True + if role == "_checkpoint": + checkpoint_id = line_json.get("id") + if not isinstance(checkpoint_id, int): + logger.warning( + "Skipping invalid checkpoint line {line_no} in {file}", + line_no=line_no, + file=file_backend, + ) + return False + self._next_checkpoint_id = checkpoint_id + 1 + return True + try: + message = Message.model_validate(line_json) + except ValidationError as exc: + logger.warning( + "Skipping invalid context message line {line_no} in {file}: {error}", + line_no=line_no, + file=file_backend, + error=exc, + ) + return False + history.append(message) + messages_after_last_usage.append(message) + return True diff --git a/src/kimi_cli/subagents/store.py b/src/kimi_cli/subagents/store.py index ac5285568..f6c295832 100644 --- a/src/kimi_cli/subagents/store.py +++ b/src/kimi_cli/subagents/store.py @@ -1,26 +1,63 @@ from __future__ import annotations +import json import shutil from dataclasses import asdict from pathlib import Path from typing import Any, cast +from pydantic import BaseModel, ValidationError + from kimi_cli.session import Session from kimi_cli.subagents.models import AgentInstanceRecord, AgentLaunchSpec, SubagentStatus from kimi_cli.utils.io import atomic_json_write +from kimi_cli.utils.logging import logger + + +class _AgentLaunchSpecPayload(BaseModel): + agent_id: str + subagent_type: str + model_override: str | None + effective_model: str | None + created_at: float + + +class _AgentInstanceRecordPayload(BaseModel): + agent_id: str + subagent_type: str + status: str + description: str + created_at: float + updated_at: float + last_task_id: str | None = None + launch_spec: _AgentLaunchSpecPayload + + +_VALID_SUBAGENT_STATUSES = cast( + tuple[str, ...], + ("idle", "running_foreground", "running_background", "completed", "failed", "killed"), +) def _record_from_dict(data: dict[str, Any]) -> AgentInstanceRecord: - launch_spec = data["launch_spec"] + payload = _AgentInstanceRecordPayload.model_validate(data) + if payload.status not in _VALID_SUBAGENT_STATUSES: + raise ValueError(f"Invalid subagent status: {payload.status!r}") return AgentInstanceRecord( - agent_id=data["agent_id"], - subagent_type=data["subagent_type"], - status=data["status"], - description=data["description"], - created_at=data["created_at"], - updated_at=data["updated_at"], - last_task_id=data.get("last_task_id"), - launch_spec=AgentLaunchSpec(**launch_spec), + agent_id=payload.agent_id, + subagent_type=payload.subagent_type, + status=cast(SubagentStatus, payload.status), + description=payload.description, + created_at=payload.created_at, + updated_at=payload.updated_at, + last_task_id=payload.last_task_id, + launch_spec=AgentLaunchSpec( + agent_id=payload.launch_spec.agent_id, + subagent_type=payload.launch_spec.subagent_type, + model_override=payload.launch_spec.model_override, + effective_model=payload.launch_spec.effective_model, + created_at=payload.launch_spec.created_at, + ), ) @@ -89,9 +126,7 @@ def get_instance(self, agent_id: str) -> AgentInstanceRecord | None: meta = self.meta_path(agent_id) if not meta.exists(): return None - import json - - return _record_from_dict(json.loads(meta.read_text(encoding="utf-8"))) + return _load_instance_record(meta) def require_instance(self, agent_id: str) -> AgentInstanceRecord: record = self.get_instance(agent_id) @@ -135,9 +170,10 @@ def list_instances(self) -> list[AgentInstanceRecord]: meta = path / "meta.json" if not meta.exists(): continue - import json - - records.append(_record_from_dict(json.loads(meta.read_text(encoding="utf-8")))) + record = _load_instance_record(meta) + if record is None: + continue + records.append(record) records.sort(key=lambda record: record.updated_at, reverse=True) return records @@ -146,3 +182,15 @@ def delete_instance(self, agent_id: str) -> None: if not instance_dir.exists(): return shutil.rmtree(instance_dir) + + +def _load_instance_record(meta_path: Path) -> AgentInstanceRecord | None: + try: + return _record_from_dict(json.loads(meta_path.read_text(encoding="utf-8"))) + except (OSError, json.JSONDecodeError, ValidationError, TypeError, ValueError) as exc: + logger.warning( + "Skipping invalid subagent metadata {path}: {error}", + path=meta_path, + error=exc, + ) + return None diff --git a/tests/background/test_manager.py b/tests/background/test_manager.py index 6b9d6afb9..bce59f8b5 100644 --- a/tests/background/test_manager.py +++ b/tests/background/test_manager.py @@ -813,6 +813,35 @@ def test_reconcile_recovers_and_publishes_lost_notification(runtime): assert notification.event.source_id == spec.id +def test_reconcile_marks_task_lost_when_runtime_json_is_corrupted(runtime): + manager = runtime.background_tasks + store = manager.store + spec = TaskSpec( + id="b2222226", + kind="bash", + session_id=runtime.session.id, + description="corrupted runtime task", + tool_call_id="tool-3e", + created_at=time.time() - 60, + command="sleep 10", + shell_name="bash", + shell_path="/bin/bash", + cwd=str(runtime.session.work_dir), + timeout_s=60, + ) + store.create_task(spec) + store.runtime_path(spec.id).write_text('{"status":"running"', encoding="utf-8") + + published = manager.reconcile(limit=4) + + assert len(published) == 1 + recovered = store.merged_view(spec.id) + assert recovered.runtime.status == "lost" + notification = runtime.notifications.store.merged_view(published[0]) + assert notification.event.type == "task.lost" + assert notification.event.source_id == spec.id + + def test_reconcile_does_not_republish_same_terminal_notification(runtime): manager = runtime.background_tasks store = manager.store diff --git a/tests/background/test_store.py b/tests/background/test_store.py index 192f41066..3ecb0d9a9 100644 --- a/tests/background/test_store.py +++ b/tests/background/test_store.py @@ -1,5 +1,8 @@ from __future__ import annotations +import json +import time + from kimi_cli.background import BackgroundTaskStore, TaskSpec @@ -90,3 +93,116 @@ def test_list_views_skips_invalid_task_directories(runtime): views = store.list_views() assert len(views) == 1 assert views[0].spec.id == "b8888888" + + +def test_list_views_skips_invalid_task_id_directories_with_spec_file(runtime): + store = BackgroundTaskStore(runtime.session.context_file.parent / "tasks") + valid = TaskSpec( + id="b8888887", + kind="bash", + session_id=runtime.session.id, + description="valid task", + tool_call_id="call-3b", + command="echo ok", + shell_name="bash", + shell_path="/bin/bash", + cwd=str(runtime.session.work_dir), + timeout_s=60, + ) + store.create_task(valid) + + invalid_dir = store.root / "bad-task!" + invalid_dir.mkdir(parents=True, exist_ok=True) + (invalid_dir / store.SPEC_FILE).write_text("{}", encoding="utf-8") + + views = store.list_views() + + assert [view.spec.id for view in views] == ["b8888887"] + + +def test_read_runtime_invalid_json_returns_default(runtime): + store = BackgroundTaskStore(runtime.session.context_file.parent / "tasks") + spec = TaskSpec( + id="b9999998", + kind="bash", + session_id=runtime.session.id, + description="runtime fallback", + tool_call_id="call-4", + command="echo ok", + shell_name="bash", + shell_path="/bin/bash", + cwd=str(runtime.session.work_dir), + timeout_s=60, + ) + store.create_task(spec) + store.runtime_path(spec.id).write_text('{"status":"running"', encoding="utf-8") + + runtime_state = store.read_runtime(spec.id) + + assert runtime_state.status == "created" + assert runtime_state.worker_pid is None + assert runtime_state.updated_at == 0 + + +def test_list_views_skips_task_with_corrupted_spec(runtime): + store = BackgroundTaskStore(runtime.session.context_file.parent / "tasks") + valid = TaskSpec( + id="b9999996", + kind="bash", + session_id=runtime.session.id, + description="valid task", + tool_call_id="call-5", + command="echo ok", + shell_name="bash", + shell_path="/bin/bash", + cwd=str(runtime.session.work_dir), + timeout_s=60, + ) + store.create_task(valid) + + bad_dir = store.task_dir("b9999997") + (bad_dir / store.SPEC_FILE).write_text(json.dumps({"oops": 1}), encoding="utf-8") + + views = store.list_views() + + assert [view.spec.id for view in views] == ["b9999996"] + + +def test_list_views_uses_spec_created_at_when_runtime_is_corrupted(runtime): + store = BackgroundTaskStore(runtime.session.context_file.parent / "tasks") + older = time.time() - 60 + newer = time.time() - 10 + older_spec = TaskSpec( + id="b9999994", + kind="bash", + session_id=runtime.session.id, + description="older task", + tool_call_id="call-6", + created_at=older, + command="echo ok", + shell_name="bash", + shell_path="/bin/bash", + cwd=str(runtime.session.work_dir), + timeout_s=60, + ) + newer_spec = TaskSpec( + id="b9999995", + kind="bash", + session_id=runtime.session.id, + description="newer task", + tool_call_id="call-7", + created_at=newer, + command="echo ok", + shell_name="bash", + shell_path="/bin/bash", + cwd=str(runtime.session.work_dir), + timeout_s=60, + ) + store.create_task(older_spec) + store.create_task(newer_spec) + store.runtime_path(older_spec.id).write_text('{"status":"running"', encoding="utf-8") + store.runtime_path(newer_spec.id).write_text('{"status":"running"', encoding="utf-8") + + views = store.list_views() + + assert [view.spec.id for view in views] == ["b9999995", "b9999994"] diff --git a/tests/core/test_context.py b/tests/core/test_context.py index 0896760c4..2ff20dfb6 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -155,6 +155,48 @@ async def test_restore_with_all_record_types(tmp_path: Path) -> None: assert len(ctx.history) == 2 +@pytest.mark.asyncio +async def test_restore_skips_malformed_trailing_line(tmp_path: Path) -> None: + path = tmp_path / "context.jsonl" + valid_lines = [ + {"role": "_system_prompt", "content": "Frozen prompt"}, + _message_dict("user", "Hello"), + ] + path.write_text( + "".join(json.dumps(line) + "\n" for line in valid_lines) + + '{"role":"assistant","content":"unterminated\n', + encoding="utf-8", + ) + + ctx = Context(file_backend=path) + restored = await ctx.restore() + + assert restored is True + assert ctx.system_prompt == "Frozen prompt" + assert len(ctx.history) == 1 + assert ctx.history[0].role == "user" + + +@pytest.mark.asyncio +async def test_restore_skips_truncated_utf8_trailing_line(tmp_path: Path) -> None: + path = tmp_path / "context.jsonl" + valid_prefix = ( + json.dumps({"role": "_system_prompt", "content": "Frozen prompt"}, ensure_ascii=False) + + "\n" + + json.dumps(_message_dict("user", "Hello"), ensure_ascii=False) + + "\n" + ).encode("utf-8") + path.write_bytes(valid_prefix + b'{"role":"assistant","content":"\xe4\xb8\n') + + ctx = Context(file_backend=path) + restored = await ctx.restore() + + assert restored is True + assert ctx.system_prompt == "Frozen prompt" + assert len(ctx.history) == 1 + assert ctx.history[0].role == "user" + + # --- clear tests --- @@ -238,6 +280,87 @@ async def test_revert_preserves_system_prompt_in_file(tmp_path: Path) -> None: assert lines[0] == {"role": "_system_prompt", "content": "File preserved"} +@pytest.mark.asyncio +async def test_revert_skips_malformed_line_before_checkpoint(tmp_path: Path) -> None: + path = tmp_path / "context.jsonl" + path.write_text( + "".join( + [ + json.dumps({"role": "_system_prompt", "content": "Recovered prompt"}) + "\n", + json.dumps(_message_dict("user", "Before bad line")) + "\n", + '{"role":"assistant","content":"unterminated\n', + json.dumps({"role": "_checkpoint", "id": 0}) + "\n", + ] + ), + encoding="utf-8", + ) + + ctx = Context(file_backend=path) + await ctx.restore() + await ctx.revert_to(0) + + lines = _read_lines(path) + assert lines == [ + {"role": "_system_prompt", "content": "Recovered prompt"}, + _message_dict("user", "Before bad line"), + ] + + +@pytest.mark.asyncio +async def test_revert_skips_structurally_invalid_json_object_before_checkpoint( + tmp_path: Path, +) -> None: + path = tmp_path / "context.jsonl" + path.write_text( + "".join( + [ + json.dumps({"role": "_system_prompt", "content": "Recovered prompt"}) + "\n", + json.dumps(_message_dict("user", "Before bad line")) + "\n", + json.dumps({"oops": 1}) + "\n", + json.dumps({"role": "_checkpoint", "id": 0}) + "\n", + ] + ), + encoding="utf-8", + ) + + ctx = Context(file_backend=path) + await ctx.restore() + await ctx.revert_to(0) + + lines = _read_lines(path) + assert lines == [ + {"role": "_system_prompt", "content": "Recovered prompt"}, + _message_dict("user", "Before bad line"), + ] + + +@pytest.mark.asyncio +async def test_revert_skips_truncated_utf8_line_before_checkpoint(tmp_path: Path) -> None: + path = tmp_path / "context.jsonl" + path.write_bytes( + ( + json.dumps( + {"role": "_system_prompt", "content": "Recovered prompt"}, ensure_ascii=False + ) + + "\n" + + json.dumps(_message_dict("user", "Before bad line"), ensure_ascii=False) + + "\n" + ).encode("utf-8") + + b'{"role":"assistant","content":"\xe4\xb8\n' + + (json.dumps({"role": "_checkpoint", "id": 0}) + "\n").encode("utf-8") + ) + + ctx = Context(file_backend=path) + await ctx.restore() + await ctx.revert_to(0) + + lines = _read_lines(path) + assert lines == [ + {"role": "_system_prompt", "content": "Recovered prompt"}, + _message_dict("user", "Before bad line"), + ] + + # --- round-trip tests --- diff --git a/tests/core/test_subagent_store.py b/tests/core/test_subagent_store.py index 17bbe3639..1a3cbb78c 100644 --- a/tests/core/test_subagent_store.py +++ b/tests/core/test_subagent_store.py @@ -1,5 +1,6 @@ from __future__ import annotations +import json import time from kimi_cli.subagents import AgentLaunchSpec, SubagentStore @@ -97,3 +98,101 @@ def test_update_instance_does_not_touch_auxiliary_files(session) -> None: } assert after == before + + +def test_list_instances_skips_corrupted_meta(session) -> None: + store = SubagentStore(session) + store.create_instance( + agent_id="a4444444", + description="valid task", + launch_spec=AgentLaunchSpec( + agent_id="a4444444", + subagent_type="coder", + model_override=None, + effective_model=None, + ), + ) + bad_dir = store.instance_dir("a5555555", create=True) + (bad_dir / "meta.json").write_text('{"agent_id":"a5555555","launch_spec":', encoding="utf-8") + + records = store.list_instances() + + assert [record.agent_id for record in records] == ["a4444444"] + + +def test_get_instance_returns_none_for_corrupted_meta(session) -> None: + store = SubagentStore(session) + bad_dir = store.instance_dir("a6666666", create=True) + (bad_dir / "meta.json").write_text(json.dumps({"oops": 1}), encoding="utf-8") + + assert store.get_instance("a6666666") is None + + +def test_get_instance_allows_missing_last_task_id_for_legacy_meta(session) -> None: + store = SubagentStore(session) + legacy_dir = store.instance_dir("a6767676", create=True) + (legacy_dir / "meta.json").write_text( + json.dumps( + { + "agent_id": "a6767676", + "subagent_type": "coder", + "status": "idle", + "description": "legacy task", + "created_at": 1.0, + "updated_at": 2.0, + "launch_spec": { + "agent_id": "a6767676", + "subagent_type": "coder", + "model_override": None, + "effective_model": None, + "created_at": 1.0, + }, + } + ), + encoding="utf-8", + ) + + record = store.get_instance("a6767676") + + assert record is not None + assert record.last_task_id is None + + +def test_list_instances_skips_meta_with_invalid_field_types(session) -> None: + store = SubagentStore(session) + store.create_instance( + agent_id="a7777777", + description="valid task", + launch_spec=AgentLaunchSpec( + agent_id="a7777777", + subagent_type="coder", + model_override=None, + effective_model=None, + ), + ) + bad_dir = store.instance_dir("a8888888", create=True) + (bad_dir / "meta.json").write_text( + json.dumps( + { + "agent_id": "a8888888", + "subagent_type": "coder", + "status": "idle", + "description": "bad task", + "created_at": "not-a-number", + "updated_at": "also-bad", + "last_task_id": None, + "launch_spec": { + "agent_id": "a8888888", + "subagent_type": "coder", + "model_override": None, + "effective_model": None, + "created_at": "not-a-number", + }, + } + ), + encoding="utf-8", + ) + + records = store.list_instances() + + assert [record.agent_id for record in records] == ["a7777777"] diff --git a/tests/notifications/test_notification_manager.py b/tests/notifications/test_notification_manager.py index c4b4f0122..0ea00ae7e 100644 --- a/tests/notifications/test_notification_manager.py +++ b/tests/notifications/test_notification_manager.py @@ -1,5 +1,6 @@ from __future__ import annotations +import json import time import pytest @@ -145,6 +146,28 @@ def test_list_views_skips_incomplete_notification_directories(runtime) -> None: assert views[0].event.id == event.id +def test_list_views_skips_invalid_notification_id_directories(runtime) -> None: + manager = runtime.notifications + event = NotificationEvent( + id=manager.new_id(), + category="task", + type="task.completed", + source_kind="background_task", + source_id="b7654321", + title="Task completed", + body="done", + ) + manager.publish(event) + + invalid_dir = manager.store.root / "n-bad!" + invalid_dir.mkdir(parents=True, exist_ok=True) + (invalid_dir / manager.store.EVENT_FILE).write_text("{}", encoding="utf-8") + + views = manager.store.list_views() + + assert [view.event.id for view in views] == [event.id] + + @pytest.mark.asyncio async def test_deliver_pending_runs_shared_claim_and_ack_flow(runtime) -> None: manager = runtime.notifications @@ -203,3 +226,68 @@ async def _boom(_view) -> None: assert delivered == [] stored = manager.store.merged_view(event.id) assert stored.delivery.sinks["wire"].status == "claimed" + + +def test_list_views_skips_notification_with_corrupted_event(runtime) -> None: + manager = runtime.notifications + event = manager.publish( + NotificationEvent( + id=manager.new_id(), + category="system", + type="system.info", + source_kind="test", + source_id="source-1", + title="Info", + body="hello", + ) + ) + bad_id = "ncorrupt1" + bad_dir = manager.store.notification_dir(bad_id) + (bad_dir / manager.store.EVENT_FILE).write_text('{"id":"ncorrupt1"', encoding="utf-8") + + views = manager.store.list_views() + + assert [view.event.id for view in views] == [event.event.id] + + +def test_read_delivery_invalid_json_returns_default(runtime) -> None: + manager = runtime.notifications + event = manager.publish( + NotificationEvent( + id=manager.new_id(), + category="system", + type="system.info", + source_kind="test", + source_id="source-1", + title="Info", + body="hello", + ) + ) + manager.store.delivery_path(event.event.id).write_text('{"sinks":', encoding="utf-8") + + delivery = manager.store.read_delivery(event.event.id) + + assert delivery.sinks == {} + + +def test_recover_skips_notification_with_structurally_invalid_event(runtime) -> None: + manager = runtime.notifications + event = manager.publish( + NotificationEvent( + id=manager.new_id(), + category="system", + type="system.info", + source_kind="test", + source_id="source-1", + title="Info", + body="hello", + ) + ) + bad_id = "ncorrupt2" + bad_dir = manager.store.notification_dir(bad_id) + (bad_dir / manager.store.EVENT_FILE).write_text(json.dumps({"oops": 1}), encoding="utf-8") + + manager.recover() + + views = manager.store.list_views() + assert [view.event.id for view in views] == [event.event.id]