Merged
@@ -31,6 +31,7 @@ protected override void ConfigureSessionServices(IServiceCollection services)
});
services.AddSingleton(new SessionConfig
{
+PrefillTimeout = TimeSpan.FromSeconds(2),
FirstTokenTimeout = TimeSpan.FromSeconds(2),
ToolExecutionTimeout = TimeSpan.FromSeconds(10),
SidecarLlmTimeout = TimeSpan.FromSeconds(10),
@@ -29,6 +29,7 @@ protected override void ConfigureSessionServices(IServiceCollection services)
});
services.AddSingleton(new SessionConfig
{
+PrefillTimeout = TimeSpan.FromSeconds(1),
FirstTokenTimeout = TimeSpan.FromSeconds(1),
ToolExecutionTimeout = TimeSpan.FromSeconds(1),
SidecarLlmTimeout = TimeSpan.FromSeconds(1),
18 changes: 16 additions & 2 deletions src/Netclaw.Actors/Sessions/Handlers/ProcessingWatchdog.cs
@@ -13,6 +13,10 @@ namespace Netclaw.Actors.Sessions.Handlers;
/// </summary>
internal sealed class ProcessingWatchdog
{
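+// Operation names shared with LlmSessionActor; replaces scattered string literals.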
+public const string LlmCall = "llm-call";
+public const string ToolExecution = "tool-execution";
+public const string Compaction = "compaction";

private static readonly object TimerKey = new();
private long _operationId;
private string? _operationName;
@@ -47,13 +51,23 @@ public void Stop(ITimerScheduler timers)
_operationName = null;
}

+/// <summary>
+/// Switch from the generous prefill budget to the tighter inter-delta timeout.
+/// Called once when the first real streaming delta arrives.
+/// </summary>
+public void Promote(TimeSpan interDeltaTimeout, ITimerScheduler timers)
+    => RestartLlmTimer(interDeltaTimeout, timers);

/// <summary>
/// Refresh the watchdog timer for an active LLM call (streaming keepalive).
-/// Only refreshes if the current operation is "llm-call".
+/// Only refreshes if the current operation is <see cref="LlmCall"/>.
/// </summary>
public void Refresh(TimeSpan timeout, ITimerScheduler timers)
=> RestartLlmTimer(timeout, timers);

private void RestartLlmTimer(TimeSpan timeout, ITimerScheduler timers)
{
-if (_operationName is not "llm-call")
+if (_operationName is not LlmCall)
return;

timers.StartSingleTimer(
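Taken together, the watchdog now runs a two-phase budget for LLM calls. A condensed sketch of the intended call sequence (all names are from this PR; the real call sites are in LlmSessionActor below):

// Phase 1: arm with the generous budget covering queue wait + prompt prefill.
_watchdog.Start(ProcessingWatchdog.LlmCall, _config.PrefillTimeout, Timers);

// First delta for the active call arrives: switch to the tighter inter-delta budget.
_watchdog.Promote(_config.FirstTokenTimeout, Timers);

// Every later delta: reset the silence timer.
_watchdog.Refresh(_config.FirstTokenTimeout, Timers);

// Response complete, or call cancelled: disarm.
_watchdog.Stop(Timers);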
28 changes: 17 additions & 11 deletions src/Netclaw.Actors/Sessions/LlmSessionActor.cs
@@ -492,8 +492,15 @@ private void Processing()
Command<LlmResponseDeltaReceived>(msg =>
{
if (msg.CallId != _activeCallId) return; // stale delta from cancelled call
-_anyContentStreamed = true;
-_watchdog.Refresh(_config.FirstTokenTimeout, Timers);
+if (!_anyContentStreamed)
+{
+    _anyContentStreamed = true;
+    _watchdog.Promote(_config.FirstTokenTimeout, Timers);
+}
+else
+{
+    _watchdog.Refresh(_config.FirstTokenTimeout, Timers);
+}

switch (msg.Content)
{
@@ -908,8 +915,8 @@ await _approvalService.RecordApprovalAsync(

var timeout = msg.OperationName switch
{
-    "tool-execution" => _config.ToolExecutionTimeout,
-    "llm-call" => _config.FirstTokenTimeout,
+    ProcessingWatchdog.ToolExecution => _config.ToolExecutionTimeout,
+    ProcessingWatchdog.LlmCall => _config.FirstTokenTimeout,
_ => _config.TurnLlmTimeout
};

@@ -1097,7 +1104,7 @@ private void Compacting()
Command<CompactionTriggered>(msg =>
{
var timeout = GetCompactionTimeout();
-_watchdog.Start("compaction", timeout, Timers);
+_watchdog.Start(ProcessingWatchdog.Compaction, timeout, Timers);

var operationId = _watchdog.CurrentOperationId;
var stateSnapshot = _state;
@@ -1443,7 +1450,7 @@ private void CommandSessionContextMessages()
}

private bool IsCurrentCompactionOperation(long operationId)
-=> _watchdog.IsCurrentOperation("compaction", operationId);
+=> _watchdog.IsCurrentOperation(ProcessingWatchdog.Compaction, operationId);


/// <summary>
@@ -1600,7 +1607,7 @@ private void HandleToolCallResponse(
var maxInlineToolResultChars = _config.Tuning.MaxInlineToolResultChars;
var toolExecutionTimeout = _config.ToolExecutionTimeout;

-_watchdog.Start("tool-execution", toolExecutionTimeout, Timers);
+_watchdog.Start(ProcessingWatchdog.ToolExecution, toolExecutionTimeout, Timers);

// Capture subscriber snapshot for subagent activity notifications.
// These are emitted directly from the tool execution thread via Tell(),
@@ -2408,7 +2415,6 @@ private void FireLlmCall(string? recallQuery = null, bool forceNoTools = false)

var self = Self;
var client = _chatClient;
-var timeout = _config.FirstTokenTimeout;

var exposedTools = ResolveExposedToolsForCurrentTurn();
ChatOptions? options = null;
@@ -2420,7 +2426,7 @@
};
}

-_watchdog.Start("llm-call", timeout, Timers);
+_watchdog.Start(ProcessingWatchdog.LlmCall, _config.PrefillTimeout, Timers);

TurnLog().Info("turn_llm_call_start messages={MessageCount} toolsEnabled={ToolsEnabled} forceNoTools={ForceNoTools} callId={CallId}",
messages.Count,
@@ -2979,7 +2985,7 @@ private void EmitUsageOutput(UsageDetails usage)

private void PauseToolExecutionWatchdogForApprovalWait(string callId)
{
-if (!string.Equals(_watchdog.CurrentOperationName, "tool-execution", StringComparison.Ordinal))
+if (!string.Equals(_watchdog.CurrentOperationName, ProcessingWatchdog.ToolExecution, StringComparison.Ordinal))
return;

_watchdog.Stop(Timers);
@@ -2997,7 +3003,7 @@ private void ResumeToolExecutionWatchdogAfterApprovalWait()
if (_watchdog.CurrentOperationName is not null)
return;

-_watchdog.Start("tool-execution", _config.ToolExecutionTimeout, Timers);
+_watchdog.Start(ProcessingWatchdog.ToolExecution, _config.ToolExecutionTimeout, Timers);
_log.Info("Resumed tool-execution watchdog after approval response");
}

10 changes: 10 additions & 0 deletions src/Netclaw.Actors/Sessions/Pipelines/SessionLlmInvoker.cs
@@ -91,6 +91,11 @@ public static async Task<LlmResponseReceived> StreamAsync(
if (textDeltaCount == 1)
{
pendingTextDelta = text.Text;
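+// The first text delta is buffered in pendingTextDelta rather than forwarded,
+// so Tell a content-less keepalive: it lets the actor promote the watchdog off
+// the prefill budget as soon as streaming actually begins.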
+self.Tell(new LlmResponseDeltaReceived
+{
+    Content = EmptyTextContent,
+    CallId = callId
+});
}
else
{
@@ -114,6 +119,11 @@
if (thinkingDeltaCount == 1)
{
pendingThinkingDelta = thinking.Text;
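+// Same trick for the first thinking delta, so the watchdog is promoted even
+// when the model streams reasoning before any visible text.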
+self.Tell(new LlmResponseDeltaReceived
+{
+    Content = EmptyTextContent,
+    CallId = callId
+});
}
else
{
@@ -145,7 +145,14 @@
"minimum": 10,
"maximum": 1800,
"default": 600,
-"description": "Maximum inactivity in seconds for LLM streaming calls. Used as both the initial wait for the first token and the silence threshold between deltas (resets on each delta)."
+"description": "Maximum silence in seconds between consecutive LLM streaming deltas. After the first delta arrives, the watchdog uses this as the inter-delta timeout (timer resets on each delta)."
},
+"PrefillTimeoutSeconds": {
+    "type": "integer",
+    "minimum": 60,
+    "maximum": 7200,
+    "default": 1800,
+    "description": "Maximum time in seconds to wait for the first streaming delta from the LLM. Covers queue wait and prompt prefill on self-hosted backends. After the first delta, the watchdog switches to FirstTokenTimeoutSeconds."
+},
"CompactionThreshold": {
"type": "number",
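For operators, a minimal config fragment exercising the new knob might look like this (a sketch; key names come from the schema above, and any enclosing section name depends on how SessionConfig is mounted):

{
  "PrefillTimeoutSeconds": 1800,
  "FirstTokenTimeoutSeconds": 300
}

With these values a session may sit silent for up to 30 minutes before the first delta, but never more than 5 minutes between deltas afterwards.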
19 changes: 16 additions & 3 deletions src/Netclaw.Configuration/SessionConfig.cs
@@ -63,13 +63,22 @@ public sealed record SessionConfig
public TimeSpan SidecarLlmTimeout { get; init; } = TimeSpan.FromSeconds(90);

/// <summary>
-/// Maximum inactivity timeout for LLM streaming calls. Used both as the
-/// initial wait for the first token (prefill phase) and as the silence
-/// threshold between consecutive deltas — the timer resets on every delta.
+/// Maximum inactivity timeout between consecutive LLM streaming deltas.
+/// The timer resets on every delta. Once the first delta arrives the watchdog
+/// switches from <see cref="PrefillTimeout"/> to this tighter budget.
+/// Falls back to <see cref="TurnLlmTimeout"/> if not explicitly configured.
/// </summary>
public TimeSpan FirstTokenTimeout { get; init; } = TimeSpan.FromSeconds(600);

+/// <summary>
+/// Maximum time to wait for the first streaming delta from the LLM (covers
+/// queue wait + prompt prefill). Generous default because self-hosted backends
+/// can be legitimately silent for 10+ minutes during slot contention and cold
+/// prefill of large contexts. After the first delta, the watchdog switches to
+/// <see cref="FirstTokenTimeout"/>.
+/// </summary>
+public TimeSpan PrefillTimeout { get; init; } = TimeSpan.FromSeconds(1800);

/// <summary>
/// Internal tuning constants. Bindable from config for development/testing
/// but not part of the documented operator surface.
@@ -99,6 +108,9 @@ public static SessionConfig BindFromConfiguration(IConfigurationSection section)
FirstTokenTimeout = raw.FirstTokenTimeoutSeconds > 0
? TimeSpan.FromSeconds(raw.FirstTokenTimeoutSeconds)
: raw.TurnLlmTimeoutSeconds != 180 ? turnLlmTimeout : TimeSpan.FromSeconds(600),
+PrefillTimeout = raw.PrefillTimeoutSeconds > 0
+    ? TimeSpan.FromSeconds(raw.PrefillTimeoutSeconds)
+    : TimeSpan.FromSeconds(1800),
Tuning = tuning,
};
}
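A quick sketch of the fallback semantics above (the "Session" section name is hypothetical; only BindFromConfiguration is from this file):

using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

var section = new ConfigurationBuilder()
    .AddInMemoryCollection(new Dictionary<string, string?>
    {
        ["Session:FirstTokenTimeoutSeconds"] = "120",
        // PrefillTimeoutSeconds intentionally omitted
    })
    .Build()
    .GetSection("Session");

var config = SessionConfig.BindFromConfiguration(section);
// config.FirstTokenTimeout -> 00:02:00 (explicitly configured)
// config.PrefillTimeout    -> 00:30:00 (the 1800 s default kicks in)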
@@ -150,5 +162,6 @@ private sealed record RawSessionConfig
public int ToolExecutionTimeoutSeconds { get; init; } = 90;
public int SidecarLlmTimeoutSeconds { get; init; } = 90;
public int FirstTokenTimeoutSeconds { get; init; }
+public int PrefillTimeoutSeconds { get; init; }
}
}
@@ -60,7 +60,8 @@ public async Task StreamsReasoningAndTextDeltas_FromOfficialSpectrum()
await foreach (var update in client.GetStreamingResponseAsync([new ChatMessage(ChatRole.User, "hello")], cancellationToken: TestContext.Current.CancellationToken))
updates.Add(update);

-Assert.Equal(3, updates.Count);
+Assert.Equal(4, updates.Count);
+Assert.Contains(updates, u => u.Contents.Count == 0 && u.FinishReason is null); // keepalive from content-less initial chunk
Assert.Contains(updates, u => u.Contents.OfType<TextReasoningContent>().Any(c => c.Text == "Thinking"));
Assert.Contains(updates, u => u.Contents.OfType<TextContent>().Any(c => c.Text == "Hello"));
Assert.Contains(updates, u => u.FinishReason == ChatFinishReason.Stop);
18 changes: 17 additions & 1 deletion src/Netclaw.Providers/SelfHosted/OpenAiCompatibleChatClient.cs
@@ -74,9 +74,17 @@ public async IAsyncEnumerable<ChatResponseUpdate> GetStreamingResponseAsync(
if (line is null)
break;

-if (string.IsNullOrWhiteSpace(line) || !line.StartsWith("data:", StringComparison.Ordinal))
+if (string.IsNullOrWhiteSpace(line))
continue;

+if (!line.StartsWith("data:", StringComparison.Ordinal))
+{
+    // SSE comment lines (`:` prefix) and event-type lines — yield keepalive
+    // so the watchdog knows the connection is alive during prefill/queuing.
+    yield return KeepaliveUpdate;
+    continue;
+}

var ssePayload = line[5..].Trim();
if (ssePayload == "[DONE]")
break;
@@ -173,6 +181,9 @@ private JsonObject BuildPayload(IEnumerable<ChatMessage> messages, ChatOptions?
if (stream)
{
body["stream_options"] = new JsonObject { ["include_usage"] = true };
+// llama-server sends prefill progress as SSE data events when enabled.
+// Harmless on servers that don't support it (unknown fields are ignored).
+body["return_progress"] = true;
Collaborator Author commented: Encourages llama-server to send us progress updates during prefill et al.

}

if (options?.Temperature is { } temperature)
@@ -576,7 +587,12 @@ private static IEnumerable<ChatResponseUpdate> ParseStreamingUpdates(JsonElement
contents.Add(new UsageContent(usage));

if (contents.Count == 0 && finishReason is null)
+{
+    // Content-less data events (e.g. prompt_progress during prefill) — yield
+    // keepalive so the watchdog timer resets while the server is working.
+    yield return KeepaliveUpdate;
Collaborator Author commented: Should prevent netclaw's watchdog from aggressively nuking sessions when the model is sending back progress reports, thinking-only updates, etc.

yield break;
+}

yield return new ChatResponseUpdate(ChatRole.Assistant, contents)
{
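For reference, an illustrative SSE exchange this client now tolerates during prefill (shapes are assumptions based on llama-server's return_progress behaviour; exact field names vary by server version):

: heartbeat comment line                                                 -> KeepaliveUpdate
data: {"choices":[],"prompt_progress":{"processed":512,"total":4096}}    -> content-less, KeepaliveUpdate
data: {"choices":[{"delta":{"content":"Hello"}}]}                        -> first real delta, watchdog promotes
data: [DONE]                                                             -> stream ends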