diff --git a/src/Netclaw.Actors.Tests/Sessions/LlmSessionStreamingTimeoutTests.cs b/src/Netclaw.Actors.Tests/Sessions/LlmSessionStreamingTimeoutTests.cs
index 28ae71c4..36cc2f63 100644
--- a/src/Netclaw.Actors.Tests/Sessions/LlmSessionStreamingTimeoutTests.cs
+++ b/src/Netclaw.Actors.Tests/Sessions/LlmSessionStreamingTimeoutTests.cs
@@ -31,6 +31,7 @@ protected override void ConfigureSessionServices(IServiceCollection services)
         });
         services.AddSingleton(new SessionConfig
         {
+            PrefillTimeout = TimeSpan.FromSeconds(2),
             FirstTokenTimeout = TimeSpan.FromSeconds(2),
             ToolExecutionTimeout = TimeSpan.FromSeconds(10),
             SidecarLlmTimeout = TimeSpan.FromSeconds(10),
diff --git a/src/Netclaw.Actors.Tests/Sessions/LlmSessionWatchdogTests.cs b/src/Netclaw.Actors.Tests/Sessions/LlmSessionWatchdogTests.cs
index 8c2f80f2..dcdfd836 100644
--- a/src/Netclaw.Actors.Tests/Sessions/LlmSessionWatchdogTests.cs
+++ b/src/Netclaw.Actors.Tests/Sessions/LlmSessionWatchdogTests.cs
@@ -29,6 +29,7 @@ protected override void ConfigureSessionServices(IServiceCollection services)
         });
         services.AddSingleton(new SessionConfig
        {
+            PrefillTimeout = TimeSpan.FromSeconds(1),
             FirstTokenTimeout = TimeSpan.FromSeconds(1),
             ToolExecutionTimeout = TimeSpan.FromSeconds(1),
             SidecarLlmTimeout = TimeSpan.FromSeconds(1),
diff --git a/src/Netclaw.Actors/Sessions/Handlers/ProcessingWatchdog.cs b/src/Netclaw.Actors/Sessions/Handlers/ProcessingWatchdog.cs
index 99d4fa6e..4b24cb1d 100644
--- a/src/Netclaw.Actors/Sessions/Handlers/ProcessingWatchdog.cs
+++ b/src/Netclaw.Actors/Sessions/Handlers/ProcessingWatchdog.cs
@@ -13,6 +13,10 @@ namespace Netclaw.Actors.Sessions.Handlers;
 /// </summary>
 internal sealed class ProcessingWatchdog
 {
+    public const string LlmCall = "llm-call";
+    public const string ToolExecution = "tool-execution";
+    public const string Compaction = "compaction";
+
     private static readonly object TimerKey = new();
     private long _operationId;
     private string? _operationName;
@@ -47,13 +51,23 @@ public void Stop(ITimerScheduler timers)
         _operationName = null;
     }
 
+    /// <summary>
+    /// Switch from the generous prefill budget to the tighter inter-delta timeout.
+    /// Called once when the first real streaming delta arrives.
+    /// </summary>
+    public void Promote(TimeSpan interDeltaTimeout, ITimerScheduler timers)
+        => RestartLlmTimer(interDeltaTimeout, timers);
+
     /// <summary>
     /// Refresh the watchdog timer for an active LLM call (streaming keepalive).
-    /// Only refreshes if the current operation is "llm-call".
+    /// Only refreshes if the current operation is <see cref="LlmCall"/>.
     /// </summary>
     public void Refresh(TimeSpan timeout, ITimerScheduler timers)
+        => RestartLlmTimer(timeout, timers);
+
+    private void RestartLlmTimer(TimeSpan timeout, ITimerScheduler timers)
     {
-        if (_operationName is not "llm-call")
+        if (_operationName is not LlmCall)
             return;
 
         timers.StartSingleTimer(
diff --git a/src/Netclaw.Actors/Sessions/LlmSessionActor.cs b/src/Netclaw.Actors/Sessions/LlmSessionActor.cs
index 9ad63bcc..eef6bdf1 100644
--- a/src/Netclaw.Actors/Sessions/LlmSessionActor.cs
+++ b/src/Netclaw.Actors/Sessions/LlmSessionActor.cs
@@ -492,8 +492,15 @@ private void Processing()
         Command(msg =>
         {
             if (msg.CallId != _activeCallId) return; // stale delta from cancelled call
-            _anyContentStreamed = true;
-            _watchdog.Refresh(_config.FirstTokenTimeout, Timers);
+            if (!_anyContentStreamed)
+            {
+                _anyContentStreamed = true;
+                _watchdog.Promote(_config.FirstTokenTimeout, Timers);
+            }
+            else
+            {
+                _watchdog.Refresh(_config.FirstTokenTimeout, Timers);
+            }
 
             switch (msg.Content)
             {
@@ -908,8 +915,8 @@ await _approvalService.RecordApprovalAsync(
 
             var timeout = msg.OperationName switch
             {
-                "tool-execution" => _config.ToolExecutionTimeout,
-                "llm-call" => _config.FirstTokenTimeout,
+                ProcessingWatchdog.ToolExecution => _config.ToolExecutionTimeout,
+                ProcessingWatchdog.LlmCall => _config.FirstTokenTimeout,
                 _ => _config.TurnLlmTimeout
             };
 
@@ -1097,7 +1104,7 @@ private void Compacting()
         Command(msg =>
         {
             var timeout = GetCompactionTimeout();
-            _watchdog.Start("compaction", timeout, Timers);
+            _watchdog.Start(ProcessingWatchdog.Compaction, timeout, Timers);
             var operationId = _watchdog.CurrentOperationId;
             var stateSnapshot = _state;
 
@@ -1443,7 +1450,7 @@ private void CommandSessionContextMessages()
     }
 
     private bool IsCurrentCompactionOperation(long operationId)
-        => _watchdog.IsCurrentOperation("compaction", operationId);
+        => _watchdog.IsCurrentOperation(ProcessingWatchdog.Compaction, operationId);
 
     /// <summary>
@@ -1600,7 +1607,7 @@ private void HandleToolCallResponse(
         var maxInlineToolResultChars = _config.Tuning.MaxInlineToolResultChars;
         var toolExecutionTimeout = _config.ToolExecutionTimeout;
 
-        _watchdog.Start("tool-execution", toolExecutionTimeout, Timers);
+        _watchdog.Start(ProcessingWatchdog.ToolExecution, toolExecutionTimeout, Timers);
 
         // Capture subscriber snapshot for subagent activity notifications.
         // These are emitted directly from the tool execution thread via Tell(),
@@ -2408,7 +2415,6 @@ private void FireLlmCall(string? recallQuery = null, bool forceNoTools = false)
 
         var self = Self;
         var client = _chatClient;
-        var timeout = _config.FirstTokenTimeout;
         var exposedTools = ResolveExposedToolsForCurrentTurn();
 
         ChatOptions? options = null;
@@ -2420,7 +2426,7 @@ private void FireLlmCall(string? recallQuery = null, bool forceNoTools = false)
             };
         }
 
-        _watchdog.Start("llm-call", timeout, Timers);
+        _watchdog.Start(ProcessingWatchdog.LlmCall, _config.PrefillTimeout, Timers);
 
         TurnLog().Info("turn_llm_call_start messages={MessageCount} toolsEnabled={ToolsEnabled} forceNoTools={ForceNoTools} callId={CallId}",
             messages.Count,
@@ -2979,7 +2985,7 @@ private void EmitUsageOutput(UsageDetails usage)
 
     private void PauseToolExecutionWatchdogForApprovalWait(string callId)
     {
-        if (!string.Equals(_watchdog.CurrentOperationName, "tool-execution", StringComparison.Ordinal))
+        if (!string.Equals(_watchdog.CurrentOperationName, ProcessingWatchdog.ToolExecution, StringComparison.Ordinal))
             return;
 
         _watchdog.Stop(Timers);
@@ -2997,7 +3003,7 @@ private void ResumeToolExecutionWatchdogAfterApprovalWait()
         if (_watchdog.CurrentOperationName is not null)
             return;
 
-        _watchdog.Start("tool-execution", _config.ToolExecutionTimeout, Timers);
+        _watchdog.Start(ProcessingWatchdog.ToolExecution, _config.ToolExecutionTimeout, Timers);
         _log.Info("Resumed tool-execution watchdog after approval response");
     }
 
diff --git a/src/Netclaw.Actors/Sessions/Pipelines/SessionLlmInvoker.cs b/src/Netclaw.Actors/Sessions/Pipelines/SessionLlmInvoker.cs
index c6cc35d4..b3317ff3 100644
--- a/src/Netclaw.Actors/Sessions/Pipelines/SessionLlmInvoker.cs
+++ b/src/Netclaw.Actors/Sessions/Pipelines/SessionLlmInvoker.cs
@@ -91,6 +91,11 @@ public static async Task StreamAsync(
                 if (textDeltaCount == 1)
                 {
                     pendingTextDelta = text.Text;
+                    self.Tell(new LlmResponseDeltaReceived
+                    {
+                        Content = EmptyTextContent,
+                        CallId = callId
+                    });
                 }
                 else
                 {
@@ -114,6 +119,11 @@ public static async Task StreamAsync(
                 if (thinkingDeltaCount == 1)
                 {
                     pendingThinkingDelta = thinking.Text;
+                    self.Tell(new LlmResponseDeltaReceived
+                    {
+                        Content = EmptyTextContent,
+                        CallId = callId
+                    });
                 }
                 else
                 {
diff --git a/src/Netclaw.Configuration/Schemas/netclaw-config.v1.schema.json b/src/Netclaw.Configuration/Schemas/netclaw-config.v1.schema.json
index d88409bb..afb7cfdc 100644
--- a/src/Netclaw.Configuration/Schemas/netclaw-config.v1.schema.json
+++ b/src/Netclaw.Configuration/Schemas/netclaw-config.v1.schema.json
@@ -145,7 +145,14 @@
       "minimum": 10,
       "maximum": 1800,
       "default": 600,
-      "description": "Maximum inactivity in seconds for LLM streaming calls. Used as both the initial wait for the first token and the silence threshold between deltas (resets on each delta)."
+      "description": "Maximum silence in seconds between consecutive LLM streaming deltas. After the first delta arrives, the watchdog uses this as the inter-delta timeout (timer resets on each delta)."
+    },
+    "PrefillTimeoutSeconds": {
+      "type": "integer",
+      "minimum": 60,
+      "maximum": 7200,
+      "default": 1800,
+      "description": "Maximum time in seconds to wait for the first streaming delta from the LLM. Covers queue wait and prompt prefill on self-hosted backends. After the first delta, the watchdog switches to FirstTokenTimeoutSeconds."
     },
     "CompactionThreshold": {
       "type": "number",
diff --git a/src/Netclaw.Configuration/SessionConfig.cs b/src/Netclaw.Configuration/SessionConfig.cs
index d9e8bd18..a67597fc 100644
--- a/src/Netclaw.Configuration/SessionConfig.cs
+++ b/src/Netclaw.Configuration/SessionConfig.cs
@@ -63,13 +63,22 @@ public sealed record SessionConfig
     public TimeSpan SidecarLlmTimeout { get; init; } = TimeSpan.FromSeconds(90);
 
     /// <summary>
-    /// Maximum inactivity timeout for LLM streaming calls. Used both as the
-    /// initial wait for the first token (prefill phase) and as the silence
-    /// threshold between consecutive deltas — the timer resets on every delta.
+    /// Maximum inactivity timeout between consecutive LLM streaming deltas.
+    /// The timer resets on every delta. Once the first delta arrives the watchdog
+    /// switches from <see cref="PrefillTimeout"/> to this tighter budget.
     /// Falls back to <see cref="TurnLlmTimeout"/> if not explicitly configured.
     /// </summary>
     public TimeSpan FirstTokenTimeout { get; init; } = TimeSpan.FromSeconds(600);
 
+    /// <summary>
+    /// Maximum time to wait for the first streaming delta from the LLM (covers
+    /// queue wait + prompt prefill). Generous default because self-hosted backends
+    /// can be legitimately silent for 10+ minutes during slot contention and cold
+    /// prefill of large contexts. After the first delta, the watchdog switches to
+    /// <see cref="FirstTokenTimeout"/>.
+    /// </summary>
+    public TimeSpan PrefillTimeout { get; init; } = TimeSpan.FromSeconds(1800);
+
     /// <summary>
     /// Internal tuning constants. Bindable from config for development/testing
     /// but not part of the documented operator surface.
@@ -99,6 +108,9 @@ public static SessionConfig BindFromConfiguration(IConfigurationSection section)
             FirstTokenTimeout = raw.FirstTokenTimeoutSeconds > 0
                 ? TimeSpan.FromSeconds(raw.FirstTokenTimeoutSeconds)
                 : raw.TurnLlmTimeoutSeconds != 180 ? turnLlmTimeout : TimeSpan.FromSeconds(600),
+            PrefillTimeout = raw.PrefillTimeoutSeconds > 0
+                ? TimeSpan.FromSeconds(raw.PrefillTimeoutSeconds)
+                : TimeSpan.FromSeconds(1800),
             Tuning = tuning,
         };
     }
@@ -150,5 +162,6 @@ private sealed record RawSessionConfig
         public int ToolExecutionTimeoutSeconds { get; init; } = 90;
         public int SidecarLlmTimeoutSeconds { get; init; } = 90;
         public int FirstTokenTimeoutSeconds { get; init; }
+        public int PrefillTimeoutSeconds { get; init; }
     }
 }
diff --git a/src/Netclaw.Daemon.Tests/Configuration/OpenAiCompatibleChatClientTests.cs b/src/Netclaw.Daemon.Tests/Configuration/OpenAiCompatibleChatClientTests.cs
index 0cb14497..07ddbffc 100644
--- a/src/Netclaw.Daemon.Tests/Configuration/OpenAiCompatibleChatClientTests.cs
+++ b/src/Netclaw.Daemon.Tests/Configuration/OpenAiCompatibleChatClientTests.cs
@@ -60,7 +60,8 @@ public async Task StreamsReasoningAndTextDeltas_FromOfficialSpectrum()
         await foreach (var update in client.GetStreamingResponseAsync([new ChatMessage(ChatRole.User, "hello")], cancellationToken: TestContext.Current.CancellationToken))
             updates.Add(update);
 
-        Assert.Equal(3, updates.Count);
+        Assert.Equal(4, updates.Count);
+        Assert.Contains(updates, u => u.Contents.Count == 0 && u.FinishReason is null); // keepalive from content-less initial chunk
         Assert.Contains(updates, u => u.Contents.OfType().Any(c => c.Text == "Thinking"));
         Assert.Contains(updates, u => u.Contents.OfType().Any(c => c.Text == "Hello"));
         Assert.Contains(updates, u => u.FinishReason == ChatFinishReason.Stop);
diff --git a/src/Netclaw.Providers/SelfHosted/OpenAiCompatibleChatClient.cs b/src/Netclaw.Providers/SelfHosted/OpenAiCompatibleChatClient.cs
index 2339e69d..af6a701f 100644
--- a/src/Netclaw.Providers/SelfHosted/OpenAiCompatibleChatClient.cs
+++ b/src/Netclaw.Providers/SelfHosted/OpenAiCompatibleChatClient.cs
@@ -74,9 +74,17 @@ public async IAsyncEnumerable<ChatResponseUpdate> GetStreamingResponseAsync(
             if (line is null)
                 break;
 
-            if (string.IsNullOrWhiteSpace(line) || !line.StartsWith("data:", StringComparison.Ordinal))
+            if (string.IsNullOrWhiteSpace(line))
                 continue;
 
+            if (!line.StartsWith("data:", StringComparison.Ordinal))
+            {
+                // SSE comment lines (`:` prefix) and event-type lines — yield keepalive
+                // so the watchdog knows the connection is alive during prefill/queuing.
+                yield return KeepaliveUpdate;
+                continue;
+            }
+
             var ssePayload = line[5..].Trim();
             if (ssePayload == "[DONE]")
                 break;
@@ -173,6 +181,9 @@ private JsonObject BuildPayload(IEnumerable<ChatMessage> messages, ChatOptions?
         if (stream)
         {
             body["stream_options"] = new JsonObject { ["include_usage"] = true };
+            // llama-server sends prefill progress as SSE data events when enabled.
+            // Harmless on servers that don't support it (unknown fields are ignored).
+            body["return_progress"] = true;
         }
 
         if (options?.Temperature is { } temperature)
@@ -576,7 +587,12 @@ private static IEnumerable<ChatResponseUpdate> ParseStreamingUpdates(JsonElement
             contents.Add(new UsageContent(usage));
 
         if (contents.Count == 0 && finishReason is null)
+        {
+            // Content-less data events (e.g. prompt_progress during prefill) — yield
+            // keepalive so the watchdog timer resets while the server is working.
+            yield return KeepaliveUpdate;
             yield break;
+        }
 
         yield return new ChatResponseUpdate(ChatRole.Assistant, contents)
         {