From 5dcfe9486bf00a0736f90a6cf589f4b3f1e867bd Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Sat, 9 May 2026 13:42:49 +0000 Subject: [PATCH 1/3] fix(watchdog): two-phase timeout + consume prompt_progress keepalives The processing watchdog was killing healthy LLM requests during slot contention and cold prefill (91K tokens, ~10 min silent). Three fixes: 1. Split watchdog into PrefillTimeout (1800s default) and InterDeltaTimeout (FirstTokenTimeout, 600s). Start generous, promote on first delta. 2. Request `return_progress: true` from llama-server and fix ParseStreamingUpdates to yield keepalives for content-less data events (e.g. prompt_progress) instead of silently dropping them. 3. Forward SSE comment lines as keepalives and send watchdog-refresh on first buffered text/thinking delta. --- .../LlmSessionStreamingTimeoutTests.cs | 1 + .../Sessions/LlmSessionWatchdogTests.cs | 1 + .../Sessions/Handlers/ProcessingWatchdog.cs | 19 +++++++++++++++++++ .../Sessions/LlmSessionActor.cs | 5 +++-- .../Sessions/Pipelines/SessionLlmInvoker.cs | 10 ++++++++++ .../Schemas/netclaw-config.v1.schema.json | 9 ++++++++- src/Netclaw.Configuration/SessionConfig.cs | 19 ++++++++++++++++--- .../OpenAiCompatibleChatClientTests.cs | 3 ++- .../SelfHosted/OpenAiCompatibleChatClient.cs | 18 +++++++++++++++++- 9 files changed, 77 insertions(+), 8 deletions(-) diff --git a/src/Netclaw.Actors.Tests/Sessions/LlmSessionStreamingTimeoutTests.cs b/src/Netclaw.Actors.Tests/Sessions/LlmSessionStreamingTimeoutTests.cs index 28ae71c4..36cc2f63 100644 --- a/src/Netclaw.Actors.Tests/Sessions/LlmSessionStreamingTimeoutTests.cs +++ b/src/Netclaw.Actors.Tests/Sessions/LlmSessionStreamingTimeoutTests.cs @@ -31,6 +31,7 @@ protected override void ConfigureSessionServices(IServiceCollection services) }); services.AddSingleton(new SessionConfig { + PrefillTimeout = TimeSpan.FromSeconds(2), FirstTokenTimeout = TimeSpan.FromSeconds(2), ToolExecutionTimeout = TimeSpan.FromSeconds(10), SidecarLlmTimeout = TimeSpan.FromSeconds(10), diff --git a/src/Netclaw.Actors.Tests/Sessions/LlmSessionWatchdogTests.cs b/src/Netclaw.Actors.Tests/Sessions/LlmSessionWatchdogTests.cs index 8c2f80f2..dcdfd836 100644 --- a/src/Netclaw.Actors.Tests/Sessions/LlmSessionWatchdogTests.cs +++ b/src/Netclaw.Actors.Tests/Sessions/LlmSessionWatchdogTests.cs @@ -29,6 +29,7 @@ protected override void ConfigureSessionServices(IServiceCollection services) }); services.AddSingleton(new SessionConfig { + PrefillTimeout = TimeSpan.FromSeconds(1), FirstTokenTimeout = TimeSpan.FromSeconds(1), ToolExecutionTimeout = TimeSpan.FromSeconds(1), SidecarLlmTimeout = TimeSpan.FromSeconds(1), diff --git a/src/Netclaw.Actors/Sessions/Handlers/ProcessingWatchdog.cs b/src/Netclaw.Actors/Sessions/Handlers/ProcessingWatchdog.cs index 99d4fa6e..e77bc996 100644 --- a/src/Netclaw.Actors/Sessions/Handlers/ProcessingWatchdog.cs +++ b/src/Netclaw.Actors/Sessions/Handlers/ProcessingWatchdog.cs @@ -47,6 +47,25 @@ public void Stop(ITimerScheduler timers) _operationName = null; } + /// + /// Switch from the generous prefill budget to the tighter inter-delta timeout. + /// Called once when the first real streaming delta arrives. + /// + public void Promote(TimeSpan interDeltaTimeout, ITimerScheduler timers) + { + if (_operationName is not "llm-call") + return; + + timers.StartSingleTimer( + TimerKey, + new ProcessingWatchdogExpired + { + OperationId = _operationId, + OperationName = _operationName + }, + interDeltaTimeout); + } + /// /// Refresh the watchdog timer for an active LLM call (streaming keepalive). /// Only refreshes if the current operation is "llm-call". diff --git a/src/Netclaw.Actors/Sessions/LlmSessionActor.cs b/src/Netclaw.Actors/Sessions/LlmSessionActor.cs index 9ad63bcc..2b8a75a1 100644 --- a/src/Netclaw.Actors/Sessions/LlmSessionActor.cs +++ b/src/Netclaw.Actors/Sessions/LlmSessionActor.cs @@ -492,6 +492,8 @@ private void Processing() Command(msg => { if (msg.CallId != _activeCallId) return; // stale delta from cancelled call + if (!_anyContentStreamed) + _watchdog.Promote(_config.FirstTokenTimeout, Timers); _anyContentStreamed = true; _watchdog.Refresh(_config.FirstTokenTimeout, Timers); @@ -2408,7 +2410,6 @@ private void FireLlmCall(string? recallQuery = null, bool forceNoTools = false) var self = Self; var client = _chatClient; - var timeout = _config.FirstTokenTimeout; var exposedTools = ResolveExposedToolsForCurrentTurn(); ChatOptions? options = null; @@ -2420,7 +2421,7 @@ private void FireLlmCall(string? recallQuery = null, bool forceNoTools = false) }; } - _watchdog.Start("llm-call", timeout, Timers); + _watchdog.Start("llm-call", _config.PrefillTimeout, Timers); TurnLog().Info("turn_llm_call_start messages={MessageCount} toolsEnabled={ToolsEnabled} forceNoTools={ForceNoTools} callId={CallId}", messages.Count, diff --git a/src/Netclaw.Actors/Sessions/Pipelines/SessionLlmInvoker.cs b/src/Netclaw.Actors/Sessions/Pipelines/SessionLlmInvoker.cs index c6cc35d4..b3317ff3 100644 --- a/src/Netclaw.Actors/Sessions/Pipelines/SessionLlmInvoker.cs +++ b/src/Netclaw.Actors/Sessions/Pipelines/SessionLlmInvoker.cs @@ -91,6 +91,11 @@ public static async Task StreamAsync( if (textDeltaCount == 1) { pendingTextDelta = text.Text; + self.Tell(new LlmResponseDeltaReceived + { + Content = EmptyTextContent, + CallId = callId + }); } else { @@ -114,6 +119,11 @@ public static async Task StreamAsync( if (thinkingDeltaCount == 1) { pendingThinkingDelta = thinking.Text; + self.Tell(new LlmResponseDeltaReceived + { + Content = EmptyTextContent, + CallId = callId + }); } else { diff --git a/src/Netclaw.Configuration/Schemas/netclaw-config.v1.schema.json b/src/Netclaw.Configuration/Schemas/netclaw-config.v1.schema.json index d88409bb..afb7cfdc 100644 --- a/src/Netclaw.Configuration/Schemas/netclaw-config.v1.schema.json +++ b/src/Netclaw.Configuration/Schemas/netclaw-config.v1.schema.json @@ -145,7 +145,14 @@ "minimum": 10, "maximum": 1800, "default": 600, - "description": "Maximum inactivity in seconds for LLM streaming calls. Used as both the initial wait for the first token and the silence threshold between deltas (resets on each delta)." + "description": "Maximum silence in seconds between consecutive LLM streaming deltas. After the first delta arrives, the watchdog uses this as the inter-delta timeout (timer resets on each delta)." + }, + "PrefillTimeoutSeconds": { + "type": "integer", + "minimum": 60, + "maximum": 7200, + "default": 1800, + "description": "Maximum time in seconds to wait for the first streaming delta from the LLM. Covers queue wait and prompt prefill on self-hosted backends. After the first delta, the watchdog switches to FirstTokenTimeoutSeconds." }, "CompactionThreshold": { "type": "number", diff --git a/src/Netclaw.Configuration/SessionConfig.cs b/src/Netclaw.Configuration/SessionConfig.cs index d9e8bd18..a67597fc 100644 --- a/src/Netclaw.Configuration/SessionConfig.cs +++ b/src/Netclaw.Configuration/SessionConfig.cs @@ -63,13 +63,22 @@ public sealed record SessionConfig public TimeSpan SidecarLlmTimeout { get; init; } = TimeSpan.FromSeconds(90); /// - /// Maximum inactivity timeout for LLM streaming calls. Used both as the - /// initial wait for the first token (prefill phase) and as the silence - /// threshold between consecutive deltas — the timer resets on every delta. + /// Maximum inactivity timeout between consecutive LLM streaming deltas. + /// The timer resets on every delta. Once the first delta arrives the watchdog + /// switches from to this tighter budget. /// Falls back to if not explicitly configured. /// public TimeSpan FirstTokenTimeout { get; init; } = TimeSpan.FromSeconds(600); + /// + /// Maximum time to wait for the first streaming delta from the LLM (covers + /// queue wait + prompt prefill). Generous default because self-hosted backends + /// can be legitimately silent for 10+ minutes during slot contention and cold + /// prefill of large contexts. After the first delta, the watchdog switches to + /// . + /// + public TimeSpan PrefillTimeout { get; init; } = TimeSpan.FromSeconds(1800); + /// /// Internal tuning constants. Bindable from config for development/testing /// but not part of the documented operator surface. @@ -99,6 +108,9 @@ public static SessionConfig BindFromConfiguration(IConfigurationSection section) FirstTokenTimeout = raw.FirstTokenTimeoutSeconds > 0 ? TimeSpan.FromSeconds(raw.FirstTokenTimeoutSeconds) : raw.TurnLlmTimeoutSeconds != 180 ? turnLlmTimeout : TimeSpan.FromSeconds(600), + PrefillTimeout = raw.PrefillTimeoutSeconds > 0 + ? TimeSpan.FromSeconds(raw.PrefillTimeoutSeconds) + : TimeSpan.FromSeconds(1800), Tuning = tuning, }; } @@ -150,5 +162,6 @@ private sealed record RawSessionConfig public int ToolExecutionTimeoutSeconds { get; init; } = 90; public int SidecarLlmTimeoutSeconds { get; init; } = 90; public int FirstTokenTimeoutSeconds { get; init; } + public int PrefillTimeoutSeconds { get; init; } } } diff --git a/src/Netclaw.Daemon.Tests/Configuration/OpenAiCompatibleChatClientTests.cs b/src/Netclaw.Daemon.Tests/Configuration/OpenAiCompatibleChatClientTests.cs index 0cb14497..07ddbffc 100644 --- a/src/Netclaw.Daemon.Tests/Configuration/OpenAiCompatibleChatClientTests.cs +++ b/src/Netclaw.Daemon.Tests/Configuration/OpenAiCompatibleChatClientTests.cs @@ -60,7 +60,8 @@ public async Task StreamsReasoningAndTextDeltas_FromOfficialSpectrum() await foreach (var update in client.GetStreamingResponseAsync([new ChatMessage(ChatRole.User, "hello")], cancellationToken: TestContext.Current.CancellationToken)) updates.Add(update); - Assert.Equal(3, updates.Count); + Assert.Equal(4, updates.Count); + Assert.Contains(updates, u => u.Contents.Count == 0 && u.FinishReason is null); // keepalive from content-less initial chunk Assert.Contains(updates, u => u.Contents.OfType().Any(c => c.Text == "Thinking")); Assert.Contains(updates, u => u.Contents.OfType().Any(c => c.Text == "Hello")); Assert.Contains(updates, u => u.FinishReason == ChatFinishReason.Stop); diff --git a/src/Netclaw.Providers/SelfHosted/OpenAiCompatibleChatClient.cs b/src/Netclaw.Providers/SelfHosted/OpenAiCompatibleChatClient.cs index 2339e69d..af6a701f 100644 --- a/src/Netclaw.Providers/SelfHosted/OpenAiCompatibleChatClient.cs +++ b/src/Netclaw.Providers/SelfHosted/OpenAiCompatibleChatClient.cs @@ -74,9 +74,17 @@ public async IAsyncEnumerable GetStreamingResponseAsync( if (line is null) break; - if (string.IsNullOrWhiteSpace(line) || !line.StartsWith("data:", StringComparison.Ordinal)) + if (string.IsNullOrWhiteSpace(line)) continue; + if (!line.StartsWith("data:", StringComparison.Ordinal)) + { + // SSE comment lines (`:` prefix) and event-type lines — yield keepalive + // so the watchdog knows the connection is alive during prefill/queuing. + yield return KeepaliveUpdate; + continue; + } + var ssePayload = line[5..].Trim(); if (ssePayload == "[DONE]") break; @@ -173,6 +181,9 @@ private JsonObject BuildPayload(IEnumerable messages, ChatOptions? if (stream) { body["stream_options"] = new JsonObject { ["include_usage"] = true }; + // llama-server sends prefill progress as SSE data events when enabled. + // Harmless on servers that don't support it (unknown fields are ignored). + body["return_progress"] = true; } if (options?.Temperature is { } temperature) @@ -576,7 +587,12 @@ private static IEnumerable ParseStreamingUpdates(JsonElement contents.Add(new UsageContent(usage)); if (contents.Count == 0 && finishReason is null) + { + // Content-less data events (e.g. prompt_progress during prefill) — yield + // keepalive so the watchdog timer resets while the server is working. + yield return KeepaliveUpdate; yield break; + } yield return new ChatResponseUpdate(ChatRole.Assistant, contents) { From 41c7268f982d762d53a6b1d0ca5bcb3ef17b92ec Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Sat, 9 May 2026 13:48:13 +0000 Subject: [PATCH 2/3] refactor(watchdog): deduplicate Promote/Refresh and fix double timer restart Promote() then Refresh() with the same timeout restarted the timer twice on first delta. Restructured to call only one per delta. Extracted shared RestartLlmTimer() to eliminate identical method bodies. --- .../Sessions/Handlers/ProcessingWatchdog.cs | 17 ++++------------- src/Netclaw.Actors/Sessions/LlmSessionActor.cs | 9 +++++++-- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/src/Netclaw.Actors/Sessions/Handlers/ProcessingWatchdog.cs b/src/Netclaw.Actors/Sessions/Handlers/ProcessingWatchdog.cs index e77bc996..c553d10a 100644 --- a/src/Netclaw.Actors/Sessions/Handlers/ProcessingWatchdog.cs +++ b/src/Netclaw.Actors/Sessions/Handlers/ProcessingWatchdog.cs @@ -52,25 +52,16 @@ public void Stop(ITimerScheduler timers) /// Called once when the first real streaming delta arrives. /// public void Promote(TimeSpan interDeltaTimeout, ITimerScheduler timers) - { - if (_operationName is not "llm-call") - return; - - timers.StartSingleTimer( - TimerKey, - new ProcessingWatchdogExpired - { - OperationId = _operationId, - OperationName = _operationName - }, - interDeltaTimeout); - } + => RestartLlmTimer(interDeltaTimeout, timers); /// /// Refresh the watchdog timer for an active LLM call (streaming keepalive). /// Only refreshes if the current operation is "llm-call". /// public void Refresh(TimeSpan timeout, ITimerScheduler timers) + => RestartLlmTimer(timeout, timers); + + private void RestartLlmTimer(TimeSpan timeout, ITimerScheduler timers) { if (_operationName is not "llm-call") return; diff --git a/src/Netclaw.Actors/Sessions/LlmSessionActor.cs b/src/Netclaw.Actors/Sessions/LlmSessionActor.cs index 2b8a75a1..f5410a7e 100644 --- a/src/Netclaw.Actors/Sessions/LlmSessionActor.cs +++ b/src/Netclaw.Actors/Sessions/LlmSessionActor.cs @@ -493,9 +493,14 @@ private void Processing() { if (msg.CallId != _activeCallId) return; // stale delta from cancelled call if (!_anyContentStreamed) + { + _anyContentStreamed = true; _watchdog.Promote(_config.FirstTokenTimeout, Timers); - _anyContentStreamed = true; - _watchdog.Refresh(_config.FirstTokenTimeout, Timers); + } + else + { + _watchdog.Refresh(_config.FirstTokenTimeout, Timers); + } switch (msg.Content) { From 021345f16a8f2479628c2ff43e4f4e03b327e6a3 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Sat, 9 May 2026 13:51:57 +0000 Subject: [PATCH 3/3] refactor(watchdog): replace operation name string literals with constants Extract LlmCall, ToolExecution, Compaction constants on ProcessingWatchdog and replace all 7 call sites in LlmSessionActor. --- .../Sessions/Handlers/ProcessingWatchdog.cs | 8 ++++++-- src/Netclaw.Actors/Sessions/LlmSessionActor.cs | 16 ++++++++-------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/Netclaw.Actors/Sessions/Handlers/ProcessingWatchdog.cs b/src/Netclaw.Actors/Sessions/Handlers/ProcessingWatchdog.cs index c553d10a..4b24cb1d 100644 --- a/src/Netclaw.Actors/Sessions/Handlers/ProcessingWatchdog.cs +++ b/src/Netclaw.Actors/Sessions/Handlers/ProcessingWatchdog.cs @@ -13,6 +13,10 @@ namespace Netclaw.Actors.Sessions.Handlers; /// internal sealed class ProcessingWatchdog { + public const string LlmCall = "llm-call"; + public const string ToolExecution = "tool-execution"; + public const string Compaction = "compaction"; + private static readonly object TimerKey = new(); private long _operationId; private string? _operationName; @@ -56,14 +60,14 @@ public void Promote(TimeSpan interDeltaTimeout, ITimerScheduler timers) /// /// Refresh the watchdog timer for an active LLM call (streaming keepalive). - /// Only refreshes if the current operation is "llm-call". + /// Only refreshes if the current operation is . /// public void Refresh(TimeSpan timeout, ITimerScheduler timers) => RestartLlmTimer(timeout, timers); private void RestartLlmTimer(TimeSpan timeout, ITimerScheduler timers) { - if (_operationName is not "llm-call") + if (_operationName is not LlmCall) return; timers.StartSingleTimer( diff --git a/src/Netclaw.Actors/Sessions/LlmSessionActor.cs b/src/Netclaw.Actors/Sessions/LlmSessionActor.cs index f5410a7e..eef6bdf1 100644 --- a/src/Netclaw.Actors/Sessions/LlmSessionActor.cs +++ b/src/Netclaw.Actors/Sessions/LlmSessionActor.cs @@ -915,8 +915,8 @@ await _approvalService.RecordApprovalAsync( var timeout = msg.OperationName switch { - "tool-execution" => _config.ToolExecutionTimeout, - "llm-call" => _config.FirstTokenTimeout, + ProcessingWatchdog.ToolExecution => _config.ToolExecutionTimeout, + ProcessingWatchdog.LlmCall => _config.FirstTokenTimeout, _ => _config.TurnLlmTimeout }; @@ -1104,7 +1104,7 @@ private void Compacting() Command(msg => { var timeout = GetCompactionTimeout(); - _watchdog.Start("compaction", timeout, Timers); + _watchdog.Start(ProcessingWatchdog.Compaction, timeout, Timers); var operationId = _watchdog.CurrentOperationId; var stateSnapshot = _state; @@ -1450,7 +1450,7 @@ private void CommandSessionContextMessages() } private bool IsCurrentCompactionOperation(long operationId) - => _watchdog.IsCurrentOperation("compaction", operationId); + => _watchdog.IsCurrentOperation(ProcessingWatchdog.Compaction, operationId); /// @@ -1607,7 +1607,7 @@ private void HandleToolCallResponse( var maxInlineToolResultChars = _config.Tuning.MaxInlineToolResultChars; var toolExecutionTimeout = _config.ToolExecutionTimeout; - _watchdog.Start("tool-execution", toolExecutionTimeout, Timers); + _watchdog.Start(ProcessingWatchdog.ToolExecution, toolExecutionTimeout, Timers); // Capture subscriber snapshot for subagent activity notifications. // These are emitted directly from the tool execution thread via Tell(), @@ -2426,7 +2426,7 @@ private void FireLlmCall(string? recallQuery = null, bool forceNoTools = false) }; } - _watchdog.Start("llm-call", _config.PrefillTimeout, Timers); + _watchdog.Start(ProcessingWatchdog.LlmCall, _config.PrefillTimeout, Timers); TurnLog().Info("turn_llm_call_start messages={MessageCount} toolsEnabled={ToolsEnabled} forceNoTools={ForceNoTools} callId={CallId}", messages.Count, @@ -2985,7 +2985,7 @@ private void EmitUsageOutput(UsageDetails usage) private void PauseToolExecutionWatchdogForApprovalWait(string callId) { - if (!string.Equals(_watchdog.CurrentOperationName, "tool-execution", StringComparison.Ordinal)) + if (!string.Equals(_watchdog.CurrentOperationName, ProcessingWatchdog.ToolExecution, StringComparison.Ordinal)) return; _watchdog.Stop(Timers); @@ -3003,7 +3003,7 @@ private void ResumeToolExecutionWatchdogAfterApprovalWait() if (_watchdog.CurrentOperationName is not null) return; - _watchdog.Start("tool-execution", _config.ToolExecutionTimeout, Timers); + _watchdog.Start(ProcessingWatchdog.ToolExecution, _config.ToolExecutionTimeout, Timers); _log.Info("Resumed tool-execution watchdog after approval response"); }