Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/kimi_cli/wire/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ def __init__(self, soul: Soul):
# soul running stuffs
self._soul = soul
self._cancel_event: asyncio.Event | None = None
self._turn_active: bool = False
"""True only while the soul is actively running a turn (between run_soul start and return)."""
self._pending_requests: dict[str, Request] = {}
"""Maps JSON RPC message IDs to pending `Request`s."""
self._client_supports_question: bool = False
Expand Down Expand Up @@ -540,6 +542,7 @@ async def _handle_prompt(
)

self._cancel_event = asyncio.Event()
self._turn_active = True
try:
runtime = self._soul.runtime if isinstance(self._soul, KimiSoul) else None
await run_soul(
Expand Down Expand Up @@ -580,6 +583,9 @@ async def _handle_prompt(
result={"status": Statuses.CANCELLED},
)
finally:
# Mark the turn as inactive immediately so concurrent steer/cancel
# handlers see the correct state before _cancel_event is cleared.
self._turn_active = False
# Clean up any remaining pending requests from this turn.
# After run_soul() returns, the soul and all subagents are done,
# so any unresolved requests are stale.
Expand Down Expand Up @@ -609,7 +615,7 @@ async def _handle_prompt(
async def _handle_steer(
self, msg: JSONRPCSteerMessage
) -> JSONRPCSuccessResponse | JSONRPCErrorResponse:
if not isinstance(self._soul, KimiSoul) or not self._is_streaming:
if not isinstance(self._soul, KimiSoul) or not self._turn_active:
return JSONRPCErrorResponse(
id=msg.id,
error=JSONRPCErrorObject(
Expand Down
26 changes: 26 additions & 0 deletions tests/core/test_wire_server_steer.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ async def test_handle_steer_queues_input_when_streaming(

monkeypatch.setattr(soul, "steer", lambda user_input: queued.append(user_input))
server._cancel_event = asyncio.Event()
server._turn_active = True

response = await server._handle_steer(
JSONRPCSteerMessage(
Expand All @@ -79,6 +80,31 @@ async def test_handle_steer_queues_input_when_streaming(
assert queued == [[TextPart(text="follow-up")]]


@pytest.mark.asyncio
async def test_handle_steer_rejects_after_turn_ends(
runtime: Runtime,
tmp_path: Path,
) -> None:
"""Steer sent after run_soul() returns but before _cancel_event is cleared should be rejected."""
soul = _make_soul(runtime, tmp_path)
server = WireServer(soul)

# Simulate the state after run_soul() returns but before finally cleanup:
# _cancel_event is still set (not yet None), but _turn_active is False.
server._cancel_event = asyncio.Event()
server._turn_active = False

response = await server._handle_steer(
JSONRPCSteerMessage(
id="1",
params=JSONRPCSteerMessage.Params(user_input=[TextPart(text="too late")]),
)
)

assert isinstance(response, JSONRPCErrorResponse)
assert response.error.code == ErrorCodes.INVALID_STATE


@pytest.mark.asyncio
async def test_shutdown_rejects_foreground_approval_in_runtime(
runtime: Runtime,
Expand Down
Loading