diff --git a/src/Netclaw.Actors.Tests/Channels/ExecutionOutputAccumulatorTests.cs b/src/Netclaw.Actors.Tests/Channels/ExecutionOutputAccumulatorTests.cs new file mode 100644 index 00000000..bbc2acf1 --- /dev/null +++ b/src/Netclaw.Actors.Tests/Channels/ExecutionOutputAccumulatorTests.cs @@ -0,0 +1,203 @@ +using Netclaw.Actors.Channels; +using Netclaw.Actors.Protocol; +using Netclaw.Configuration; +using Netclaw.Tools; +using Xunit; + +namespace Netclaw.Actors.Tests.Channels; + +public sealed class ExecutionOutputAccumulatorTests +{ + private static readonly SessionId TestSessionId = new("test/session"); + private static readonly ToolName TestNotifyTool = new("send_slack_message"); + + [Fact] + public void TextDeltaOutput_accumulates_text() + { + var acc = new ExecutionOutputAccumulator(TestNotifyTool); + + var action1 = acc.ProcessOutput(new TextDeltaOutput { SessionId = TestSessionId, Delta = "Hello " }); + var action2 = acc.ProcessOutput(new TextDeltaOutput { SessionId = TestSessionId, Delta = "world" }); + + Assert.Equal(OutputAction.Continue, action1); + Assert.Equal(OutputAction.Continue, action2); + Assert.Equal("Hello world", acc.GetAccumulatedText()); + } + + [Fact] + public void TextOutput_accumulates_when_no_prior_delta() + { + var acc = new ExecutionOutputAccumulator(TestNotifyTool); + + acc.ProcessOutput(new TextOutput { SessionId = TestSessionId, Text = "Full text" }); + + Assert.Equal("Full text", acc.GetAccumulatedText()); + } + + [Fact] + public void TextOutput_ignored_after_TextDeltaOutput() + { + var acc = new ExecutionOutputAccumulator(TestNotifyTool); + + acc.ProcessOutput(new TextDeltaOutput { SessionId = TestSessionId, Delta = "streamed" }); + acc.ProcessOutput(new TextOutput { SessionId = TestSessionId, Text = "assembled" }); + + Assert.Equal("streamed", acc.GetAccumulatedText()); + } + + [Fact] + public void TurnCompleted_returns_TurnCompleted_action() + { + var acc = new ExecutionOutputAccumulator(TestNotifyTool); + + var action = acc.ProcessOutput(new TurnCompleted { SessionId = TestSessionId, TurnNumber = 1 }); + + Assert.Equal(OutputAction.TurnCompleted, action); + } + + [Fact] + public void ErrorOutput_returns_Error_action_and_stores_details() + { + var acc = new ExecutionOutputAccumulator(TestNotifyTool); + var cause = new InvalidOperationException("inner"); + + var action = acc.ProcessOutput(new ErrorOutput + { + SessionId = TestSessionId, + Message = "Something failed", + Category = ErrorCategory.ProviderFailure, + Cause = cause + }); + + Assert.Equal(OutputAction.Error, action); + Assert.Equal("Something failed", acc.LastErrorMessage); + Assert.Equal(ErrorCategory.ProviderFailure, acc.LastErrorCategory); + Assert.Same(cause, acc.LastErrorCause); + } + + [Fact] + public void BufferFlush_returns_Continue() + { + var acc = new ExecutionOutputAccumulator(TestNotifyTool); + + var action = acc.ProcessOutput(new BufferFlush { SessionId = TestSessionId }); + + Assert.Equal(OutputAction.Continue, action); + } + + [Fact] + public void Tracks_successful_notification_tool_result() + { + var acc = new ExecutionOutputAccumulator(TestNotifyTool); + + acc.ProcessOutput(new ToolResultOutput + { + SessionId = TestSessionId, + CallId = "call-1", + ToolName = "send_slack_message", + Result = "Message sent to channel C1." + }); + + Assert.True(acc.NotifyAttempted); + Assert.False(acc.NotifyFailed); + } + + [Fact] + public void Tracks_failed_notification_tool_result() + { + var acc = new ExecutionOutputAccumulator(TestNotifyTool); + + acc.ProcessOutput(new ToolResultOutput + { + SessionId = TestSessionId, + CallId = "call-2", + ToolName = "send_slack_message", + Result = "Error: channel not found" + }); + + Assert.True(acc.NotifyAttempted); + Assert.True(acc.NotifyFailed); + } + + [Fact] + public void Ignores_non_notification_tool_results() + { + var acc = new ExecutionOutputAccumulator(TestNotifyTool); + + acc.ProcessOutput(new ToolResultOutput + { + SessionId = TestSessionId, + CallId = "call-3", + ToolName = "web_search", + Result = "Found 5 results" + }); + + Assert.False(acc.NotifyAttempted); + } + + // ── BuildNotifyFailureMessage tests ────────────────────────────────────── + + [Fact] + public void No_failure_when_no_notify_instructions() + { + var acc = new ExecutionOutputAccumulator(TestNotifyTool); + + var result = acc.BuildNotifyFailureMessage(hasNotifyInstructions: false, NotificationPolicy.Required); + + Assert.Null(result); + } + + [Fact] + public void Required_policy_fails_when_no_notification_sent() + { + var acc = new ExecutionOutputAccumulator(TestNotifyTool); + + var result = acc.BuildNotifyFailureMessage(hasNotifyInstructions: true, NotificationPolicy.Required); + + Assert.Contains("no notification tool was invoked", result); + } + + [Fact] + public void Conditional_policy_succeeds_when_no_notification_sent() + { + var acc = new ExecutionOutputAccumulator(TestNotifyTool); + + var result = acc.BuildNotifyFailureMessage(hasNotifyInstructions: true, NotificationPolicy.Conditional); + + Assert.Null(result); + } + + [Fact] + public void Succeeds_when_notification_attempted_and_succeeded() + { + var acc = new ExecutionOutputAccumulator(TestNotifyTool); + acc.ProcessOutput(new ToolResultOutput + { + SessionId = TestSessionId, + CallId = "call-ok", + ToolName = "send_slack_message", + Result = "Message sent." + }); + + var result = acc.BuildNotifyFailureMessage(hasNotifyInstructions: true, NotificationPolicy.Required); + + Assert.Null(result); + } + + [Fact] + public void Fails_when_notification_attempted_and_errored() + { + var acc = new ExecutionOutputAccumulator(TestNotifyTool); + acc.ProcessOutput(new ToolResultOutput + { + SessionId = TestSessionId, + CallId = "call-err", + ToolName = "send_slack_message", + Result = "Error: channel not found" + }); + + var result = acc.BuildNotifyFailureMessage(hasNotifyInstructions: true, NotificationPolicy.Conditional); + + Assert.Contains("channel not found", result); + } +} diff --git a/src/Netclaw.Actors.Tests/Channels/SlackActorHierarchyTests.cs b/src/Netclaw.Actors.Tests/Channels/SlackActorHierarchyTests.cs index 028a0a20..d11a7b10 100644 --- a/src/Netclaw.Actors.Tests/Channels/SlackActorHierarchyTests.cs +++ b/src/Netclaw.Actors.Tests/Channels/SlackActorHierarchyTests.cs @@ -1,4 +1,5 @@ using Akka.Actor; +using Akka.Configuration; using Akka.Hosting; using Akka.Hosting.TestKit; using Microsoft.Extensions.DependencyInjection; @@ -15,6 +16,12 @@ namespace Netclaw.Actors.Tests.Channels; public sealed class SlackActorHierarchyTests(ITestOutputHelper output) : TestKit(output: output) { + // Raise the default ExpectMsg timeout from 3s to 5s to prevent flaky failures + // under CI ThreadPool pressure (multiple test classes spin up parallel IHost + + // ActorSystem instances, competing for ThreadPool threads). + protected override Config? Config => + ConfigurationFactory.ParseString("akka.test.default-timeout = 5s"); + protected override void ConfigureServices(HostBuilderContext context, IServiceCollection services) { } diff --git a/src/Netclaw.Actors.Tests/Channels/TestHelpers/FailingSessionPipeline.cs b/src/Netclaw.Actors.Tests/Channels/TestHelpers/FailingSessionPipeline.cs new file mode 100644 index 00000000..dfac604c --- /dev/null +++ b/src/Netclaw.Actors.Tests/Channels/TestHelpers/FailingSessionPipeline.cs @@ -0,0 +1,23 @@ +using Akka.Streams; +using Netclaw.Actors.Channels; +using Netclaw.Actors.Protocol; +using Netclaw.Configuration; + +namespace Netclaw.Actors.Tests.Channels.TestHelpers; + +/// +/// Fake that throws a pre-configured exception +/// on . Used to test initialization failure paths. +/// +internal sealed class FailingSessionPipeline(Exception exception) : ISessionPipeline +{ + public Task CreateAsync( + SessionId sessionId, + SessionPipelineOptions options, + IMaterializer? materializer = null, + CancellationToken cancellationToken = default) => + throw exception; + + public Task SendFeedbackAsync(IWithSessionId feedback, CancellationToken ct = default) => + Task.CompletedTask; +} diff --git a/src/Netclaw.Actors.Tests/Channels/TestHelpers/ScriptedSessionPipeline.cs b/src/Netclaw.Actors.Tests/Channels/TestHelpers/ScriptedSessionPipeline.cs new file mode 100644 index 00000000..61efad3a --- /dev/null +++ b/src/Netclaw.Actors.Tests/Channels/TestHelpers/ScriptedSessionPipeline.cs @@ -0,0 +1,41 @@ +using Akka; +using Akka.Streams; +using Akka.Streams.Dsl; +using Netclaw.Actors.Channels; +using Netclaw.Actors.Protocol; +using Netclaw.Configuration; + +namespace Netclaw.Actors.Tests.Channels.TestHelpers; + +/// +/// Fake that replays a scripted sequence of +/// events. Input is discarded. Used by +/// execution actor tests (reminders, webhooks) and pipeline handle tests. +/// +internal sealed class ScriptedSessionPipeline( + Func> outputFactory) : ISessionPipeline +{ + public SessionPipelineOptions? CapturedOptions { get; private set; } + + public Task CreateAsync( + SessionId sessionId, + SessionPipelineOptions options, + IMaterializer? materializer = null, + CancellationToken cancellationToken = default) + { + CapturedOptions = options; + + var killSwitch = KillSwitches.Shared($"scripted-{sessionId.Value}"); + + var input = Sink.Ignore() + .MapMaterializedValue(_ => NotUsed.Instance); + + var output = Source.From(outputFactory(sessionId).ToList()) + .Via(killSwitch.Flow()); + + return Task.FromResult(new MaterializedSession(input, output, killSwitch)); + } + + public Task SendFeedbackAsync(IWithSessionId feedback, CancellationToken ct = default) => + Task.CompletedTask; +} diff --git a/src/Netclaw.Actors.Tests/Sessions/ErrorCorrelationTests.cs b/src/Netclaw.Actors.Tests/Sessions/ErrorCorrelationTests.cs index f9023e31..494ce98b 100644 --- a/src/Netclaw.Actors.Tests/Sessions/ErrorCorrelationTests.cs +++ b/src/Netclaw.Actors.Tests/Sessions/ErrorCorrelationTests.cs @@ -1,4 +1,3 @@ -using System.Runtime.CompilerServices; using Akka.Actor; using Akka.Hosting; using Akka.Hosting.TestKit; @@ -178,18 +177,17 @@ public Task GetResponseAsync( CancellationToken cancellationToken = default) => Task.FromException(new NotSupportedException("Streaming path only.")); - public async IAsyncEnumerable GetStreamingResponseAsync( + public IAsyncEnumerable GetStreamingResponseAsync( IEnumerable messages, ChatOptions? options = null, - [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - await Task.Yield(); - // GetException() returns non-null, but the compiler cannot prove it, - // so the null check keeps yield break reachable and avoids CS0162. - var ex = GetException(); - if (ex is not null) throw ex; - yield break; - } + CancellationToken cancellationToken = default) + // Throw synchronously so the LlmCallFailed message is enqueued in + // the actor's mailbox before the actor finishes processing the + // SendUserMessage. Using Task.Yield() here defers the exception to + // the thread pool, creating a race where LlmCallFailed delivery + // depends on thread pool scheduling and can exceed ExpectMsgAsync + // timeouts under load. + => throw GetException(); private Exception GetException() => ThrowTimeout ? (Exception)new TimeoutException("Simulated provider timeout") diff --git a/src/Netclaw.Actors/Channels/ChannelPipeline.cs b/src/Netclaw.Actors/Channels/ChannelPipeline.cs index b4320e8d..ab7f38cc 100644 --- a/src/Netclaw.Actors/Channels/ChannelPipeline.cs +++ b/src/Netclaw.Actors/Channels/ChannelPipeline.cs @@ -302,7 +302,7 @@ private static SendUserMessage MapToCommand( NetclawPaths paths) { var turnId = string.IsNullOrWhiteSpace(input.MessageId) - ? Guid.NewGuid().ToString("N")[..8] + ? IdGen.ShortId() : input.MessageId!; var textParts = input.Contents.OfType().Select(t => t.Text); diff --git a/src/Netclaw.Actors/Channels/ExecutionOutputAccumulator.cs b/src/Netclaw.Actors/Channels/ExecutionOutputAccumulator.cs new file mode 100644 index 00000000..065979a0 --- /dev/null +++ b/src/Netclaw.Actors/Channels/ExecutionOutputAccumulator.cs @@ -0,0 +1,173 @@ +using System.Text; +using Netclaw.Actors.Protocol; +using Netclaw.Configuration; +using Netclaw.Tools; + +namespace Netclaw.Actors.Channels; + +/// +/// What the caller should do after processing an output via +/// . +/// +public enum OutputAction +{ + /// Output was accumulated; no caller action needed. + Continue, + + /// Turn completed; caller should finalize and stop. + TurnCompleted, + + /// Error received; caller should report failure and stop. + Error +} + +/// +/// Tracks output accumulation and notification result state for fire-and-forget +/// execution actors (reminders, webhooks). Pure C# �� no Akka dependency. +/// +public sealed class ExecutionOutputAccumulator +{ + private readonly ToolName _notificationToolName; + private readonly Action? _onNotifyTracked; + private readonly StringBuilder _buffer = new(); + private bool _sawTextDelta; + private bool _notifyAttempted; + private bool _notifyFailed; + private string? _notifyFailureDetail; + + /// + /// Creates an accumulator. + /// + /// + /// The tool whose results are tracked for notification success/failure. + /// The accumulator has no channel knowledge — callers supply the tool they care about. + /// + /// + /// Optional callback invoked when a matching tool result is processed. + /// Parameters: (toolName, callId, succeeded). + /// + public ExecutionOutputAccumulator( + ToolName notificationToolName, + Action? onNotifyTracked = null) + { + _notificationToolName = notificationToolName; + _onNotifyTracked = onNotifyTracked; + } + + /// + /// The error message from the most recent , + /// or null if no error has been received. + /// + public string? LastErrorMessage { get; private set; } + + /// + /// The underlying exception from the most recent , + /// or null if no error has been received or the error had no cause. + /// + public Exception? LastErrorCause { get; private set; } + + /// + /// The of the most recent error, if any. + /// + public ErrorCategory? LastErrorCategory { get; private set; } + + /// + /// Returns the accumulated text output, trimmed. + /// + public string GetAccumulatedText() => _buffer.ToString().Trim(); + + /// + /// Whether a notification tool was invoked during this execution. + /// + public bool NotifyAttempted => _notifyAttempted; + + /// + /// Whether the most recent notification attempt failed. + /// + public bool NotifyFailed => _notifyFailed; + + /// + /// Processes a and returns the action the caller should take. + /// + public OutputAction ProcessOutput(SessionOutput output) + { + switch (output) + { + case TextDeltaOutput delta: + _buffer.Append(delta.Delta); + _sawTextDelta = true; + return OutputAction.Continue; + + case TextOutput text: + if (!_sawTextDelta) + _buffer.Append(text.Text); + return OutputAction.Continue; + + case ToolResultOutput toolResult: + TrackNotificationResult(toolResult); + return OutputAction.Continue; + + case BufferFlush: + return OutputAction.Continue; + + case TurnCompleted: + return OutputAction.TurnCompleted; + + case ErrorOutput err: + LastErrorMessage = err.Message; + LastErrorCause = err.Cause; + LastErrorCategory = err.Category; + return OutputAction.Error; + + default: + return OutputAction.Continue; + } + } + + /// + /// Evaluates notification policy to determine if the execution should be + /// considered a failure due to notification issues. Returns null on + /// success, or an error message string describing the notification failure. + /// + /// Whether notification instructions were configured. + /// The notification policy for this execution. + public string? BuildNotifyFailureMessage(bool hasNotifyInstructions, NotificationPolicy notifyPolicy) + { + if (!hasNotifyInstructions) + return null; + + if (!_notifyAttempted) + { + if (notifyPolicy == NotificationPolicy.Conditional) + return null; + + return "Notification instructions were provided but no notification tool was invoked."; + } + + if (_notifyFailed) + return _notifyFailureDetail ?? "Notification tool returned an unspecified error."; + + return null; + } + + private void TrackNotificationResult(ToolResultOutput toolResult) + { + if (!string.Equals(toolResult.ToolName, _notificationToolName.Value, StringComparison.Ordinal)) + return; + + _notifyAttempted = true; + + var result = toolResult.Result?.Trim() ?? string.Empty; + if (result.StartsWith("Error", StringComparison.OrdinalIgnoreCase)) + { + _notifyFailed = true; + _notifyFailureDetail = result; + _onNotifyTracked?.Invoke(toolResult.ToolName, toolResult.CallId, false); + return; + } + + _notifyFailed = false; + _notifyFailureDetail = null; + _onNotifyTracked?.Invoke(toolResult.ToolName, toolResult.CallId, true); + } +} diff --git a/src/Netclaw.Actors/Channels/SessionPipelineHandle.cs b/src/Netclaw.Actors/Channels/SessionPipelineHandle.cs new file mode 100644 index 00000000..b59c75a1 --- /dev/null +++ b/src/Netclaw.Actors/Channels/SessionPipelineHandle.cs @@ -0,0 +1,221 @@ +using System.Threading.Channels; +using Akka.Actor; +using Akka.Event; +using Akka.Streams; +using Akka.Streams.Dsl; +using Netclaw.Actors.Protocol; +using Netclaw.Configuration; + +namespace Netclaw.Actors.Channels; + +/// +/// Manages the lifecycle of a materialized session pipeline on behalf of an +/// owning actor. Not thread-safe — designed for use within a single actor context +/// (no concurrent access). Supports two modes: +/// +/// Long-lived (with reinitialization) for binding actors (Slack, SignalR) +/// Short-lived (fire-and-forget) for execution actors (Reminders, Webhooks) +/// +/// +public sealed class SessionPipelineHandle +{ + private readonly ISessionPipeline _pipeline; + private readonly ILoggingAdapter _log; + private readonly string _materializerNamePrefix; + + private ActorMaterializer? _materializer; + private MaterializedSession? _session; + private ChannelWriter? _inputQueue; + private int _pipelineGeneration; + private bool _isReinitializing; + + // Stored from first InitializeWithChannelAsync for reinit + private IActorContext? _storedContext; + private SessionId? _storedSessionId; + private SessionPipelineOptions? _storedOptions; + private Action? _storedOnOutput; + private Action? _storedOnStreamTerminated; + + public SessionPipelineHandle( + ISessionPipeline pipeline, + ILoggingAdapter log, + string materializerNamePrefix) + { + _pipeline = pipeline; + _log = log; + _materializerNamePrefix = materializerNamePrefix; + } + + /// The current pipeline generation, for OutputStreamTerminated filtering. + public int Generation => _pipelineGeneration; + + /// The for long-lived actors to write input. + /// Null if not initialized or if initialized via queue mode. + public ChannelWriter? InputQueue => _inputQueue; + + /// Whether the handle has been initialized (session is not null). + public bool IsInitialized => _session is not null; + + /// + /// Idempotent pipeline creation for long-lived actors that use + /// for ongoing input. Stores all parameters for use by . + /// + public async Task> InitializeWithChannelAsync( + IActorContext context, + SessionId sessionId, + SessionPipelineOptions options, + Action onOutput, + Action onStreamTerminated, + CancellationToken cancellationToken = default) + { + if (_session is not null) + return _inputQueue!; + + // Store for reinit + _storedContext = context; + _storedSessionId = sessionId; + _storedOptions = options; + _storedOnOutput = onOutput; + _storedOnStreamTerminated = onStreamTerminated; + + _log.Info("Initializing {0} session pipeline", _materializerNamePrefix); + + _materializer = context.Materializer(namePrefix: _materializerNamePrefix); + + var materialized = await _pipeline.CreateAsync( + sessionId, options, + materializer: _materializer, + cancellationToken: cancellationToken); + + var inputQueue = Source.Channel(512, true) + .ToMaterialized(materialized.Input, Keep.Left) + .Run(_materializer); + + var generation = ++_pipelineGeneration; + var outputCompletion = materialized.Output + .ToMaterialized( + Sink.ForEach(onOutput), + Keep.Right) + .Run(_materializer); + + _ = outputCompletion.ContinueWith(t => + { + var cause = t.IsFaulted ? t.Exception?.GetBaseException() : null; + onStreamTerminated(generation, cause); + }, + CancellationToken.None, + TaskContinuationOptions.ExecuteSynchronously, + TaskScheduler.Default); + + _session = materialized; + _inputQueue = inputQueue; + + _log.Info("{0} session pipeline initialized", _materializerNamePrefix); + return inputQueue; + } + + /// + /// Pipeline creation for fire-and-forget execution actors that use + /// , offer input once, and complete. + /// Does not wire stream-terminated detection (the actor stops on TurnCompleted/ErrorOutput). + /// + public async Task> InitializeWithQueueAsync( + IActorContext context, + SessionId sessionId, + SessionPipelineOptions options, + Action onOutput, + CancellationToken cancellationToken = default) + { + _log.Info("Initializing {0} execution pipeline", _materializerNamePrefix); + + _materializer = context.Materializer(namePrefix: _materializerNamePrefix); + + var materialized = await _pipeline.CreateAsync( + sessionId, options, + materializer: _materializer, + cancellationToken: cancellationToken); + + var inputQueue = Source.Queue(8, OverflowStrategy.Backpressure) + .ToMaterialized(materialized.Input, Keep.Left) + .Run(_materializer); + + materialized.Output + .To(Sink.ForEach(onOutput)) + .Run(_materializer); + + _session = materialized; + + _log.Info("{0} execution pipeline initialized", _materializerNamePrefix); + return inputQueue; + } + + /// + /// Tears down the current pipeline and re-initializes using the parameters + /// stored from the original call. + /// Guards against concurrent reinitialization. On failure, invokes + /// . + /// + public async Task ReinitializeAsync(string reason, Action onReinitFailed) + { + if (_isReinitializing) + return; + + if (_storedContext is null || _storedSessionId is null || _storedOptions is null + || _storedOnOutput is null || _storedOnStreamTerminated is null) + { + _log.Warning("Cannot reinitialize {0} pipeline: not yet initialized via channel mode", _materializerNamePrefix); + return; + } + + _isReinitializing = true; + try + { + _log.Warning("Reinitializing {0} session pipeline: {1}", _materializerNamePrefix, reason); + + _inputQueue?.TryComplete(); + _inputQueue = null; + + if (_session is not null) + { + await _session.DisposeAsync(); + _session = null; + } + + _materializer?.Dispose(); + _materializer = null; + + await InitializeWithChannelAsync( + _storedContext, _storedSessionId.Value, _storedOptions, + _storedOnOutput, _storedOnStreamTerminated); + } + catch (Exception ex) + { + _log.Error(ex, "{0} pipeline reinitialization failed; scheduling retry", _materializerNamePrefix); + onReinitFailed(); + } + finally + { + _isReinitializing = false; + } + } + + /// + /// Synchronous cleanup for actor PostStop. Completes input queue, + /// disposes session and materializer. + /// + public void Dispose() + { + _inputQueue?.TryComplete(); + + try + { + _session?.DisposeAsync().AsTask().GetAwaiter().GetResult(); + } + catch (Exception ex) + { + _log.Debug(ex, "Failed to dispose {0} session during cleanup", _materializerNamePrefix); + } + + _materializer?.Dispose(); + } +} diff --git a/src/Netclaw.Actors/Reminders/ReminderExecutionActor.cs b/src/Netclaw.Actors/Reminders/ReminderExecutionActor.cs index 85afbeaf..8226c80b 100644 --- a/src/Netclaw.Actors/Reminders/ReminderExecutionActor.cs +++ b/src/Netclaw.Actors/Reminders/ReminderExecutionActor.cs @@ -1,15 +1,13 @@ -using System.Text; using Akka.Actor; using Akka.Event; using Akka.Hosting; using Akka.Reminders; -using Akka.Streams; -using Akka.Streams.Dsl; using Microsoft.Extensions.AI; using Netclaw.Actors.Channels; using Netclaw.Actors.Hosting; using Netclaw.Actors.Protocol; using Netclaw.Configuration; +using Netclaw.Tools; namespace Netclaw.Actors.Reminders; @@ -28,7 +26,6 @@ internal sealed class ReminderExecutionActor : ReceiveActor private readonly Guid _executionId; private readonly ReminderDefinition _definition; - private readonly ISessionPipeline _pipeline; private readonly ReminderHistoryStore _historyStore; private readonly TimeProvider _timeProvider; private readonly ReminderEnvelope? _envelope; @@ -36,18 +33,12 @@ internal sealed class ReminderExecutionActor : ReceiveActor private readonly DateTimeOffset _dispatchedAt; private IReminderClient? _reminderClient; - private readonly StringBuilder _buffer = new(); - private bool _sawTextDelta; + private readonly SessionPipelineHandle _handle; + private readonly ExecutionOutputAccumulator _accumulator; private bool _completed; - private bool _notifyAttempted; - private bool _notifyFailed; - private string? _notifyFailureDetail; private string? _sessionIdValue; private HistoryRecord? _pendingHistory; - private ActorMaterializer? _materializer; - private MaterializedSession? _session; - private bool IsModeB => _envelope is not null; public static Props CreateProps( @@ -69,12 +60,21 @@ public ReminderExecutionActor( { _executionId = executionId; _definition = definition; - _pipeline = pipeline; _historyStore = historyStore; _timeProvider = timeProvider; _envelope = envelope; _dispatchedAt = timeProvider.GetUtcNow(); _log = Context.GetLogger(); + _handle = new SessionPipelineHandle(pipeline, _log, "reminder-exec"); + _accumulator = new ExecutionOutputAccumulator(new ToolName("send_slack_message"), (tool, callId, succeeded) => + { + if (succeeded) + _log.Info("ReminderExecution NotifySucceeded: execution_id={0} reminder_id={1} tool={2} call_id={3}", + _executionId, _definition.Id, tool, callId); + else + _log.Warning("ReminderExecution NotifyFailed: execution_id={0} reminder_id={1} tool={2} call_id={3}", + _executionId, _definition.Id, tool, callId); + }); Context.SetReceiveTimeout(TimeSpan.FromSeconds(ExecutionTimeoutSeconds)); @@ -119,34 +119,25 @@ private async Task InitializeAsync() _log.Info( $"ReminderExecution Initialized: execution_id={_executionId} reminder_id={_definition.Id} session_id={sessionId.Value} audience={audience} source=stored-definition"); - _materializer = Context.Materializer(namePrefix: "reminder-exec"); - - var materialized = await _pipeline.CreateAsync(sessionId, new SessionPipelineOptions - { - ChannelType = Channels.ChannelType.Reminder, - DefaultAudience = audience, - DefaultBoundary = SecurityPolicyDefaults.LocalDaemonBoundary, - DefaultPrincipal = PrincipalClassification.VerifiedAutomation, - DefaultProvenance = new SourceProvenance + var self = Self; + var inputQueue = await _handle.InitializeWithQueueAsync( + Context, + sessionId, + new SessionPipelineOptions { - TransportAuthenticity = TransportAuthenticity.LocalProcess, - PayloadTaint = PayloadTaint.Trusted, - SourceKind = "reminder" + ChannelType = Channels.ChannelType.Reminder, + DefaultAudience = audience, + DefaultBoundary = SecurityPolicyDefaults.LocalDaemonBoundary, + DefaultPrincipal = PrincipalClassification.VerifiedAutomation, + DefaultProvenance = new SourceProvenance + { + TransportAuthenticity = TransportAuthenticity.LocalProcess, + PayloadTaint = PayloadTaint.Trusted, + SourceKind = "reminder" + }, + Filter = OutputFilter.TextStreaming | OutputFilter.ToolCalls }, - Filter = OutputFilter.TextStreaming | OutputFilter.ToolCalls - }, materializer: _materializer); - - var self = Self; - - var inputQueue = Source.Queue(8, OverflowStrategy.Backpressure) - .ToMaterialized(materialized.Input, Keep.Left) - .Run(_materializer); - - materialized.Output - .To(Sink.ForEach(output => self.Tell(new ExecutionOutput(output)))) - .Run(_materializer); - - _session = materialized; + output => self.Tell(new ExecutionOutput(output))); var prompt = BuildPrompt(_definition); @@ -300,48 +291,34 @@ private static string BuildPrompt(ReminderDefinition definition) private void HandleOutput(ExecutionOutput wrapper) { - switch (wrapper.Output) + var action = _accumulator.ProcessOutput(wrapper.Output); + switch (action) { - case TextDeltaOutput delta: - _buffer.Append(delta.Delta); - _sawTextDelta = true; - break; - - case TextOutput text: - if (!_sawTextDelta) - _buffer.Append(text.Text); - break; - - case ToolResultOutput toolResult: - TrackNotificationResult(toolResult); - break; - - case BufferFlush: - // Reminder accumulates full text for final result -- no mid-turn flush needed. - break; - - case TurnCompleted: - var result = _buffer.ToString().Trim(); - var notifyFailureMessage = BuildNotifyFailureMessage(); + case OutputAction.TurnCompleted: + { + var result = _accumulator.GetAccumulatedText(); + var notifyFailureMessage = _accumulator.BuildNotifyFailureMessage( + !string.IsNullOrWhiteSpace(_definition.NotifyInstructions), + _definition.NotifyPolicy); var success = notifyFailureMessage is null; _log.Info( - $"ReminderExecution Completed: execution_id={_executionId} reminder_id={_definition.Id} title={_definition.Title} success={success} output_length={result.Length} notify_attempted={_notifyAttempted} notify_failed={_notifyFailed} dispatched_at={_dispatchedAt} completed_at={_timeProvider.GetUtcNow()}"); - - if (string.IsNullOrWhiteSpace(result)) - result = $"(Reminder '{_definition.Title}' executed but produced no output)"; + $"ReminderExecution Completed: execution_id={_executionId} reminder_id={_definition.Id} title={_definition.Title} success={success} output_length={result.Length} notify_attempted={_accumulator.NotifyAttempted} notify_failed={_accumulator.NotifyFailed} dispatched_at={_dispatchedAt} completed_at={_timeProvider.GetUtcNow()}"); ReportAndStop(success, notifyFailureMessage); break; + } - case ErrorOutput err: + case OutputAction.Error: + { var completedAt = _timeProvider.GetUtcNow(); - var failedMsg = $"ReminderExecution Failed: execution_id={_executionId} reminder_id={_definition.Id} title={_definition.Title} success=false error_type={err.Category} error_message={err.Message} dispatched_at={_dispatchedAt} completed_at={completedAt}"; - if (err.Cause is not null) - _log.Error(err.Cause, "{0}\n{1}", failedMsg, err.Cause.ToString()); + var failedMsg = $"ReminderExecution Failed: execution_id={_executionId} reminder_id={_definition.Id} title={_definition.Title} success=false error_type={_accumulator.LastErrorCategory} error_message={_accumulator.LastErrorMessage} dispatched_at={_dispatchedAt} completed_at={completedAt}"; + if (_accumulator.LastErrorCause is not null) + _log.Error(_accumulator.LastErrorCause, "{0}\n{1}", failedMsg, _accumulator.LastErrorCause.ToString()); else _log.Warning("{0}", failedMsg); - ReportAndStop(false, err.Message); + ReportAndStop(false, _accumulator.LastErrorMessage); break; + } } } @@ -374,58 +351,6 @@ private void ReportAndStop(bool success, string? errorMessage = null) Context.Stop(Self); } - private void TrackNotificationResult(ToolResultOutput toolResult) - { - if (!string.Equals(toolResult.ToolName, "send_slack_message", StringComparison.Ordinal)) - return; - - _notifyAttempted = true; - - var result = toolResult.Result?.Trim() ?? string.Empty; - if (result.StartsWith("Error", StringComparison.OrdinalIgnoreCase)) - { - _notifyFailed = true; - _notifyFailureDetail = result; - _log.Warning( - "ReminderExecution NotifyFailed: execution_id={0} reminder_id={1} tool={2} call_id={3} detail={4}", - _executionId, - _definition.Id, - toolResult.ToolName, - toolResult.CallId, - result); - return; - } - - _notifyFailed = false; - _notifyFailureDetail = null; - _log.Info( - "ReminderExecution NotifySucceeded: execution_id={0} reminder_id={1} tool={2} call_id={3}", - _executionId, - _definition.Id, - toolResult.ToolName, - toolResult.CallId); - } - - private string? BuildNotifyFailureMessage() - { - if (string.IsNullOrWhiteSpace(_definition.NotifyInstructions)) - return null; - - if (!_notifyAttempted) - { - // Conditional policy: the LLM decided nothing warranted notification — that's OK. - if (_definition.NotifyPolicy == NotificationPolicy.Conditional) - return null; - - return "Notification instructions were provided but no notification tool was invoked."; - } - - if (_notifyFailed) - return _notifyFailureDetail ?? "Notification tool returned an unspecified error."; - - return null; - } - protected override void PostStop() { if (_pendingHistory is not null) @@ -443,8 +368,7 @@ protected override void PostStop() try { - _session?.DisposeAsync().AsTask().GetAwaiter().GetResult(); - _materializer?.Dispose(); + _handle.Dispose(); } catch (Exception ex) { diff --git a/src/Netclaw.Actors/Reminders/ReminderIdGenerator.cs b/src/Netclaw.Actors/Reminders/ReminderIdGenerator.cs index 0e19b8dd..392b4c44 100644 --- a/src/Netclaw.Actors/Reminders/ReminderIdGenerator.cs +++ b/src/Netclaw.Actors/Reminders/ReminderIdGenerator.cs @@ -1,4 +1,5 @@ using System.Text.RegularExpressions; +using Netclaw.Configuration; namespace Netclaw.Actors.Reminders; @@ -29,7 +30,7 @@ public static string Normalize(string id) public static ReminderId Generate(string title) { var slug = Normalize(title); - var suffix = Guid.NewGuid().ToString("N")[..6]; + var suffix = IdGen.Suffix(); return new ReminderId($"{slug}-{suffix}"); } } diff --git a/src/Netclaw.Actors/Reminders/ReminderManagerActor.cs b/src/Netclaw.Actors/Reminders/ReminderManagerActor.cs index b7a181b3..449b3d04 100644 --- a/src/Netclaw.Actors/Reminders/ReminderManagerActor.cs +++ b/src/Netclaw.Actors/Reminders/ReminderManagerActor.cs @@ -526,22 +526,19 @@ private async Task HandleExecutionCompletedAsync(ReminderExecutionCompleted comp FailurePauseThreshold, completed.ErrorMessage); - _notificationSink.Emit(new OperationalAlert - { - AlertId = Guid.NewGuid().ToString("N")[..12], - Type = "reminder.execution.failed", - Category = AlertType.ReminderExecutionFailed, - Summary = $"Reminder '{title}' execution failed: {completed.ErrorMessage}", - Timestamp = _timeProvider.GetUtcNow(), - Severity = "warning", - Source = completed.Id.Value, - Context = new Dictionary + _notificationSink.Emit(OperationalAlert.Create( + _timeProvider, + "reminder.execution.failed", + AlertType.ReminderExecutionFailed, + $"Reminder '{title}' execution failed: {completed.ErrorMessage}", + AlertSeverity.Warning, + source: completed.Id.Value, + context: new Dictionary { ["reminderId"] = completed.Id.Value, ["title"] = title, ["error"] = completed.ErrorMessage ?? "unknown", - } - }); + })); if (count >= FailurePauseThreshold) { @@ -549,22 +546,19 @@ private async Task HandleExecutionCompletedAsync(ReminderExecutionCompleted comp completed.Id.Value, FailurePauseThreshold); - _notificationSink.Emit(new OperationalAlert - { - AlertId = Guid.NewGuid().ToString("N")[..12], - Type = "reminder.auto_disabled", - Category = AlertType.ReminderAutoDisabled, - Summary = $"Reminder '{title}' disabled after {count} consecutive failures", - Timestamp = _timeProvider.GetUtcNow(), - Severity = "critical", - Source = completed.Id.Value, - Context = new Dictionary + _notificationSink.Emit(OperationalAlert.Create( + _timeProvider, + "reminder.auto_disabled", + AlertType.ReminderAutoDisabled, + $"Reminder '{title}' disabled after {count} consecutive failures", + AlertSeverity.Critical, + source: completed.Id.Value, + context: new Dictionary { ["reminderId"] = completed.Id.Value, ["title"] = title, ["failureCount"] = count.ToString(), - } - }); + })); await DisableReminderInternalAsync(completed.Id); _failureCounts.Remove(completed.Id); diff --git a/src/Netclaw.Actors/Sessions/LlmSessionActor.cs b/src/Netclaw.Actors/Sessions/LlmSessionActor.cs index c296461e..ee818ff6 100644 --- a/src/Netclaw.Actors/Sessions/LlmSessionActor.cs +++ b/src/Netclaw.Actors/Sessions/LlmSessionActor.cs @@ -2921,7 +2921,7 @@ private void BindTurnTelemetry(MessageSource? source) _activeMessageId = sourceMessageId; _activeTurnId = source?.TurnId ?? sourceMessageId - ?? Guid.NewGuid().ToString("N")[..8]; + ?? IdGen.ShortId(); _activeChannelType = source?.ChannelType; CrashContextSnapshot.Update( diff --git a/src/Netclaw.Channels.Slack/SlackChannel.cs b/src/Netclaw.Channels.Slack/SlackChannel.cs index cf227696..b5404776 100644 --- a/src/Netclaw.Channels.Slack/SlackChannel.cs +++ b/src/Netclaw.Channels.Slack/SlackChannel.cs @@ -172,17 +172,14 @@ public async Task StartAsync(CancellationToken cancellationToken) } catch (Exception ex) when (!cancellationToken.IsCancellationRequested) { - _notificationSink.Emit(new OperationalAlert - { - AlertId = Guid.NewGuid().ToString("N")[..12], - Type = "channel.disconnected", - Category = AlertType.ChannelDisconnected, - Summary = $"Slack channel failed to connect: {ex.Message}", - Timestamp = _timeProvider.GetUtcNow(), - Severity = "warning", - Source = "slack", - Context = new Dictionary { ["channel"] = "slack" } - }); + _notificationSink.Emit(OperationalAlert.Create( + _timeProvider, + "channel.disconnected", + AlertType.ChannelDisconnected, + $"Slack channel failed to connect: {ex.Message}", + AlertSeverity.Warning, + source: "slack", + context: new Dictionary { ["channel"] = "slack" })); throw; } } diff --git a/src/Netclaw.Channels.Slack/SlackConversationActor.cs b/src/Netclaw.Channels.Slack/SlackConversationActor.cs index 6a5274cb..a808d44d 100644 --- a/src/Netclaw.Channels.Slack/SlackConversationActor.cs +++ b/src/Netclaw.Channels.Slack/SlackConversationActor.cs @@ -3,6 +3,7 @@ using Netclaw.Actors.Channels; using Netclaw.Actors.Protocol; using Netclaw.Channels.Telemetry; +using Netclaw.Configuration; namespace Netclaw.Channels.Slack; @@ -106,7 +107,7 @@ public SlackConversationActor(SlackChannelId conversationId, SlackGatewayDepende var sessionId = new SessionId($"{_conversationId.Value}/{threadTs.Value}"); var turnId = string.IsNullOrWhiteSpace(message.EventId.Value) - ? Guid.NewGuid().ToString("N")[..8] + ? IdGen.ShortId() : message.EventId.Value; var log = _log .WithContext("SlackThreadTs", threadTs.Value) diff --git a/src/Netclaw.Channels.Slack/SlackThreadBindingActor.cs b/src/Netclaw.Channels.Slack/SlackThreadBindingActor.cs index e24296bd..ab433be5 100644 --- a/src/Netclaw.Channels.Slack/SlackThreadBindingActor.cs +++ b/src/Netclaw.Channels.Slack/SlackThreadBindingActor.cs @@ -4,8 +4,6 @@ using Akka.Actor; using Akka.Event; using Akka.Persistence; -using Akka.Streams; -using Akka.Streams.Dsl; using Microsoft.Extensions.AI; using Netclaw.Actors.Channels; using Netclaw.Actors.Protocol; @@ -32,11 +30,7 @@ internal sealed class SlackThreadBindingActor : ReceivePersistentActor, IWithTim private PostResult? _lastFailedPost; private readonly List _pendingApprovalRequests = []; - private ActorMaterializer? _materializer; - private MaterializedSession? _session; - private ChannelWriter? _inputQueue; - private int _pipelineGeneration; - private bool _isReinitializing; + private readonly SessionPipelineHandle _handle; private bool _threadHistoryFetchAttempted; private SlackEventTs? _cursorTs; private static readonly object ReinitializeTimerKey = new(); @@ -59,6 +53,7 @@ public SlackThreadBindingActor( _threadTs = threadTs; _dependencies = dependencies; _promptInjectionDetector = dependencies.PromptInjectionDetector ?? new NullPromptInjectionDetector(); + _handle = new SessionPipelineHandle(dependencies.Pipeline, Context.GetLogger(), "slack-thread"); _log = Context.GetLogger() .WithContext("Adapter", "slack") .WithContext("SessionId", _sessionId.Value) @@ -89,9 +84,7 @@ protected override void PreStart() protected override void PostStop() { - _inputQueue?.TryComplete(); - _session?.DisposeAsync().AsTask().GetAwaiter().GetResult(); - _materializer?.Dispose(); + _handle.Dispose(); base.PostStop(); } @@ -128,7 +121,7 @@ private void Active() CommandAsync(HandleOutputAsync); Command(msg => { - if (msg.Generation != _pipelineGeneration) + if (msg.Generation != _handle.Generation) return; var reason = msg.Cause is null @@ -184,7 +177,7 @@ private async Task HandleTrustedReminderAsync(DeliverTrustedSessionTurn message) return; } - var writer = _inputQueue; + var writer = _handle.InputQueue; if (writer is null) { _log.Warning("Slack thread input queue is not initialized; rejecting Mode B reminder"); @@ -300,7 +293,7 @@ await ProcessInboundAttachmentsAsync( return; } - var writer = _inputQueue; + var writer = _handle.InputQueue; if (writer is null) { _log.Warning("Slack thread input queue is not initialized; dropping inbound message"); @@ -689,62 +682,35 @@ public sealed record Accepted(string Line, DataContent? Inline) : AttachmentInge public sealed record Rejected(string UserFacingReason) : AttachmentIngestResult; } + private SessionPipelineOptions BuildOptions() => new() + { + ChannelType = Actors.Channels.ChannelType.Slack, + DefaultAudience = TrustAudience.Public, + DefaultBoundary = SecurityPolicyDefaults.SlackWorkspaceBoundary, + DefaultPrincipal = PrincipalClassification.UntrustedExternal, + DefaultProvenance = new SourceProvenance + { + TransportAuthenticity = TransportAuthenticity.Verified, + PayloadTaint = PayloadTaint.Public, + SourceKind = "slack" + }, + Filter = OutputFilter.Text | OutputFilter.Files + }; + private async Task EnsureInitializedAsync() { - if (_session is not null) + if (_handle.IsInitialized) return; - _log.Info("Initializing Slack thread binding pipeline"); var self = Self; - - // Create a materializer scoped to this actor — all stream actors become - // children and are stopped automatically when this actor passivates. - _materializer = Context.Materializer(namePrefix: "slack-thread"); - using var initCts = new CancellationTokenSource(OperationTimeout); - var materialized = await _dependencies.Pipeline.CreateAsync( + await _handle.InitializeWithChannelAsync( + Context, _sessionId, - new SessionPipelineOptions - { - ChannelType = Actors.Channels.ChannelType.Slack, - DefaultAudience = TrustAudience.Public, - DefaultBoundary = SecurityPolicyDefaults.SlackWorkspaceBoundary, - DefaultPrincipal = PrincipalClassification.UntrustedExternal, - DefaultProvenance = new SourceProvenance - { - TransportAuthenticity = TransportAuthenticity.Verified, - PayloadTaint = PayloadTaint.Public, - SourceKind = "slack" - }, - Filter = OutputFilter.Text | OutputFilter.Files - }, - materializer: _materializer, - cancellationToken: initCts.Token); - - var inputQueue = Source.Channel(512, true) - .ToMaterialized(materialized.Input, Keep.Left) - .Run(_materializer); - - var generation = ++_pipelineGeneration; - var outputCompletion = materialized.Output - .ToMaterialized( - Sink.ForEach(output => self.Tell(new ThreadOutput(output))), - Keep.Right) - .Run(_materializer); - - _ = outputCompletion.ContinueWith(t => - { - var cause = t.IsFaulted ? t.Exception?.GetBaseException() : null; - self.Tell(new OutputStreamTerminated(generation, cause)); - }, - CancellationToken.None, - TaskContinuationOptions.ExecuteSynchronously, - TaskScheduler.Default); - - _session = materialized; - _inputQueue = inputQueue; - - _log.Info("Slack thread binding pipeline initialized"); + BuildOptions(), + output => self.Tell(new ThreadOutput(output)), + (gen, cause) => self.Tell(new OutputStreamTerminated(gen, cause)), + initCts.Token); } private async Task BuildInputForInboundAsync( @@ -1035,40 +1001,12 @@ private static List MergeGapWithLiveContents( private async Task ReinitializePipelineAsync(string reason) { - if (_isReinitializing) - return; - - _isReinitializing = true; - try - { - _log.Warning("Reinitializing Slack thread pipeline: {Reason}", reason); - - _inputQueue?.TryComplete(); - _inputQueue = null; - - if (_session is not null) - { - await _session.DisposeAsync(); - _session = null; - } - - _materializer?.Dispose(); - _materializer = null; - - await EnsureInitializedAsync(); - } - catch (Exception ex) - { - _log.Error(ex, "Slack thread pipeline reinitialization failed; scheduling retry"); - Timers.StartSingleTimer( + await _handle.ReinitializeAsync( + reason, + () => Timers.StartSingleTimer( ReinitializeTimerKey, new ReinitializePipeline("retry after failed reinit"), - TimeSpan.FromSeconds(2)); - } - finally - { - _isReinitializing = false; - } + TimeSpan.FromSeconds(2))); } private async Task HandleOutputAsync(ThreadOutput threadOutput) diff --git a/src/Netclaw.Channels.Slack/Webhooks/SlackWebhookPayloadBuilder.cs b/src/Netclaw.Channels.Slack/Webhooks/SlackWebhookPayloadBuilder.cs index f66a27b5..09a87f6f 100644 --- a/src/Netclaw.Channels.Slack/Webhooks/SlackWebhookPayloadBuilder.cs +++ b/src/Netclaw.Channels.Slack/Webhooks/SlackWebhookPayloadBuilder.cs @@ -71,11 +71,11 @@ private static List BuildFields(OperationalAlert alert) return fields; } - private static string SeverityEmoji(string severity) => severity.ToLowerInvariant() switch + private static string SeverityEmoji(AlertSeverity severity) => severity switch { - "critical" => ":red_circle:", - "warning" => ":warning:", - "info" => ":information_source:", + AlertSeverity.Critical => ":red_circle:", + AlertSeverity.Warning => ":warning:", + AlertSeverity.Info => ":information_source:", _ => ":grey_question:", }; } diff --git a/src/Netclaw.Cli/Doctor/CrashLogHelper.cs b/src/Netclaw.Cli/Doctor/CrashLogHelper.cs new file mode 100644 index 00000000..2d8315a7 --- /dev/null +++ b/src/Netclaw.Cli/Doctor/CrashLogHelper.cs @@ -0,0 +1,48 @@ +using System.Globalization; + +namespace Netclaw.Cli.Doctor; + +/// +/// Shared crash-log helpers used by +/// and . +/// +internal static class CrashLogHelper +{ + /// + /// Returns true if the daemon's PID file was written after the crash log, + /// indicating the daemon has restarted since the crash occurred. + /// + public static bool IsCrashLogStale(FileInfo crashLog, string pidFilePath) + { + var pidFile = new FileInfo(pidFilePath); + return pidFile.Exists && pidFile.LastWriteTimeUtc > crashLog.LastWriteTimeUtc; + } + + /// + /// Attempts to extract a UTC timestamp from a crash log filename with the format + /// crash-YYYYMMDD-HHMMSS.log (with optional suffixes after the timestamp). + /// Returns null if the filename does not match. + /// + public static DateTimeOffset? TryParseCrashTimestamp(string fileName) + { + var stem = Path.GetFileNameWithoutExtension(fileName); + const string prefix = "crash-"; + if (!stem.StartsWith(prefix, StringComparison.Ordinal)) + return null; + + var payload = stem[prefix.Length..]; + if (payload.Length < 15) + return null; + + var timestampPart = payload[..15]; + if (!DateTimeOffset.TryParseExact( + timestampPart, + "yyyyMMdd-HHmmss", + CultureInfo.InvariantCulture, + DateTimeStyles.AssumeUniversal, + out var parsed)) + return null; + + return parsed.ToUniversalTime(); + } +} diff --git a/src/Netclaw.Cli/Doctor/DaemonCrashDoctorCheck.cs b/src/Netclaw.Cli/Doctor/DaemonCrashDoctorCheck.cs index ee45cd70..0955a1e7 100644 --- a/src/Netclaw.Cli/Doctor/DaemonCrashDoctorCheck.cs +++ b/src/Netclaw.Cli/Doctor/DaemonCrashDoctorCheck.cs @@ -1,4 +1,3 @@ -using System.Globalization; using Netclaw.Configuration; namespace Netclaw.Cli.Doctor; @@ -23,11 +22,11 @@ public Task RunAsync(CancellationToken cancellationToken = de } var latest = recentCrashes[0]; - var occurredAt = TryParseCrashTimestamp(latest.Name) - ?.ToString("u", CultureInfo.InvariantCulture) - ?? latest.LastWriteTimeUtc.ToString("u", CultureInfo.InvariantCulture); + var occurredAt = CrashLogHelper.TryParseCrashTimestamp(latest.Name) + ?.ToString("u", System.Globalization.CultureInfo.InvariantCulture) + ?? latest.LastWriteTimeUtc.ToString("u", System.Globalization.CultureInfo.InvariantCulture); - var restartedSinceLatestCrash = IsCrashLogStale(latest, paths.PidFilePath); + var restartedSinceLatestCrash = CrashLogHelper.IsCrashLogStale(latest, paths.PidFilePath); var restartNote = restartedSinceLatestCrash ? "daemon appears to have restarted since this crash" : "daemon has not recorded a newer PID timestamp since this crash"; @@ -67,33 +66,4 @@ private static bool IsDaemonCrashLog(FileInfo crashLog) } } - private static bool IsCrashLogStale(FileInfo crashLog, string pidFilePath) - { - var pidFile = new FileInfo(pidFilePath); - return pidFile.Exists && pidFile.LastWriteTimeUtc > crashLog.LastWriteTimeUtc; - } - - private static DateTimeOffset? TryParseCrashTimestamp(string fileName) - { - // crash-YYYYMMDD-HHMMSS.log (+ optional suffixes) - var stem = Path.GetFileNameWithoutExtension(fileName); - const string prefix = "crash-"; - if (!stem.StartsWith(prefix, StringComparison.Ordinal)) - return null; - - var payload = stem[prefix.Length..]; - if (payload.Length < 15) - return null; - - var timestampPart = payload[..15]; - if (!DateTimeOffset.TryParseExact( - timestampPart, - "yyyyMMdd-HHmmss", - CultureInfo.InvariantCulture, - DateTimeStyles.AssumeUniversal, - out var parsed)) - return null; - - return parsed.ToUniversalTime(); - } } diff --git a/src/Netclaw.Cli/Doctor/DoctorFixService.cs b/src/Netclaw.Cli/Doctor/DoctorFixService.cs index 2ddbe06b..8b56d44a 100644 --- a/src/Netclaw.Cli/Doctor/DoctorFixService.cs +++ b/src/Netclaw.Cli/Doctor/DoctorFixService.cs @@ -40,7 +40,7 @@ public Task BuildPlanAsync(CancellationToken cancellationToken = appliedFixes.Add("configVersion"); } - if (obj["Slack"] is JsonObject slack && ReadBool(slack, "Enabled")) + if (obj["Slack"] is JsonObject slack && DoctorJsonConfigReader.ReadBool(slack, "Enabled")) { var hasAllowedChannels = slack["AllowedChannelIds"] is JsonArray { Count: > 0 }; var hasDefaultChannel = !string.IsNullOrWhiteSpace(slack["DefaultChannelId"]?.GetValue()) @@ -53,7 +53,7 @@ public Task BuildPlanAsync(CancellationToken cancellationToken = } } - if (obj["Telemetry"] is JsonObject telemetry && ReadBool(telemetry, "Enabled")) + if (obj["Telemetry"] is JsonObject telemetry && DoctorJsonConfigReader.ReadBool(telemetry, "Enabled")) { telemetry["Otlp"] ??= new JsonObject(); if (telemetry["Otlp"] is JsonObject otlp @@ -150,8 +150,6 @@ public async Task ApplyAsync(DoctorFixPlan plan, CancellationToken cancellationT await File.WriteAllTextAsync(fix.FilePath, fix.UpdatedText, cancellationToken); } - private static bool ReadBool(JsonObject obj, string property) - => obj[property] is JsonValue v && v.TryGetValue(out var b) && b; } public sealed record DoctorFixPlan(IReadOnlyList Fixes) diff --git a/src/Netclaw.Cli/Doctor/DoctorJsonConfigReader.cs b/src/Netclaw.Cli/Doctor/DoctorJsonConfigReader.cs index 7c2580a6..b97731e7 100644 --- a/src/Netclaw.Cli/Doctor/DoctorJsonConfigReader.cs +++ b/src/Netclaw.Cli/Doctor/DoctorJsonConfigReader.cs @@ -5,6 +5,28 @@ namespace Netclaw.Cli.Doctor; internal static class DoctorJsonConfigReader { + /// + /// Reads a boolean property from a . Returns false + /// if the property is missing, not a boolean, or not true. + /// + public static bool ReadBool(JsonObject obj, string property) + => obj[property] is JsonValue v && v.TryGetValue(out var b) && b; + + /// + /// Reads a string array property from a . Returns an empty + /// list if the property is missing or not an array. + /// + public static List ReadStringArray(JsonObject obj, string property) + { + if (obj[property] is not JsonArray arr) + return []; + + return arr.Select(v => v?.GetValue()) + .Where(s => !string.IsNullOrWhiteSpace(s)) + .Cast() + .ToList(); + } + public static (JsonObject? Root, DoctorCheckResult? Error) TryReadConfig(NetclawPaths paths) { if (!File.Exists(paths.NetclawConfigPath)) diff --git a/src/Netclaw.Cli/Doctor/SlackAclDoctorCheck.cs b/src/Netclaw.Cli/Doctor/SlackAclDoctorCheck.cs index cc7e3c73..2256e879 100644 --- a/src/Netclaw.Cli/Doctor/SlackAclDoctorCheck.cs +++ b/src/Netclaw.Cli/Doctor/SlackAclDoctorCheck.cs @@ -11,10 +11,10 @@ public Task RunAsync(CancellationToken cancellationToken = de if (readError is not null) return Task.FromResult(DoctorCheckResult.Pass("Slack ACL", "Skipped (base config is missing or invalid).")); - if (root!["Slack"] is not JsonObject slack || !ReadBool(slack, "Enabled")) + if (root!["Slack"] is not JsonObject slack || !DoctorJsonConfigReader.ReadBool(slack, "Enabled")) return Task.FromResult(DoctorCheckResult.Pass("Slack ACL", "Slack connector disabled or not configured.")); - var hasAllowedChannels = ReadStringArray(slack, "AllowedChannelIds").Count > 0; + var hasAllowedChannels = DoctorJsonConfigReader.ReadStringArray(slack, "AllowedChannelIds").Count > 0; var hasDefaultChannel = !string.IsNullOrWhiteSpace(slack["DefaultChannelId"]?.GetValue()) || !string.IsNullOrWhiteSpace(slack["DefaultChannelName"]?.GetValue()); @@ -26,8 +26,8 @@ public Task RunAsync(CancellationToken cancellationToken = de "Set `Slack:AllowedChannelIds` or `Slack:DefaultChannelId`/`Slack:DefaultChannelName`.")); } - var allowDirectMessages = ReadBool(slack, "AllowDirectMessages"); - var allowedUserIds = ReadStringArray(slack, "AllowedUserIds"); + var allowDirectMessages = DoctorJsonConfigReader.ReadBool(slack, "AllowDirectMessages"); + var allowedUserIds = DoctorJsonConfigReader.ReadStringArray(slack, "AllowedUserIds"); if (allowDirectMessages && allowedUserIds.Count == 0) { @@ -39,15 +39,4 @@ public Task RunAsync(CancellationToken cancellationToken = de return Task.FromResult(DoctorCheckResult.Pass("Slack ACL", "Slack channel policy has explicit channel scope.")); } - - private static bool ReadBool(JsonObject obj, string property) - => obj[property] is JsonValue v && v.TryGetValue(out var b) && b; - - private static List ReadStringArray(JsonObject obj, string property) - { - if (obj[property] is not JsonArray arr) - return []; - - return arr.Select(v => v?.GetValue()).Where(s => !string.IsNullOrWhiteSpace(s)).Cast().ToList(); - } } diff --git a/src/Netclaw.Cli/Doctor/SlackAuthDoctorCheck.cs b/src/Netclaw.Cli/Doctor/SlackAuthDoctorCheck.cs index d79c203d..953d83ad 100644 --- a/src/Netclaw.Cli/Doctor/SlackAuthDoctorCheck.cs +++ b/src/Netclaw.Cli/Doctor/SlackAuthDoctorCheck.cs @@ -16,7 +16,7 @@ public async Task RunAsync(CancellationToken cancellationToke if (readError is not null) return DoctorCheckResult.Pass(CheckName, "Skipped (base config is missing or invalid)."); - if (root!["Slack"] is not JsonObject slack || !ReadBool(slack, "Enabled")) + if (root!["Slack"] is not JsonObject slack || !DoctorJsonConfigReader.ReadBool(slack, "Enabled")) return DoctorCheckResult.Pass(CheckName, "Slack is disabled."); // Read bot token from secrets.json @@ -74,6 +74,4 @@ private static (string? Token, string? Error) ReadBotToken(NetclawPaths paths) } } - private static bool ReadBool(JsonObject obj, string property) - => obj[property] is JsonValue v && v.TryGetValue(out var b) && b; } diff --git a/src/Netclaw.Cli/Doctor/SqliteProvisioningDoctorCheck.cs b/src/Netclaw.Cli/Doctor/SqliteProvisioningDoctorCheck.cs index ac862259..22d3b975 100644 --- a/src/Netclaw.Cli/Doctor/SqliteProvisioningDoctorCheck.cs +++ b/src/Netclaw.Cli/Doctor/SqliteProvisioningDoctorCheck.cs @@ -1,4 +1,3 @@ -using System.Globalization; using System.Text.Json.Nodes; using Netclaw.Configuration; @@ -46,16 +45,16 @@ public Task RunAsync(CancellationToken cancellationToken = de } // If the daemon was restarted after the crash, the crash log is stale. - if (IsCrashLogStale(latestCrash, paths.PidFilePath)) + if (CrashLogHelper.IsCrashLogStale(latestCrash, paths.PidFilePath)) { return Task.FromResult(DoctorCheckResult.Pass( CheckName, $"SQLite crash detected ({latestCrash.Name}) but daemon has been restarted since.")); } - var occurredAt = TryParseCrashTimestamp(latestCrash.Name) - ?.ToString("u", CultureInfo.InvariantCulture) - ?? latestCrash.LastWriteTimeUtc.ToString("u", CultureInfo.InvariantCulture); + var occurredAt = CrashLogHelper.TryParseCrashTimestamp(latestCrash.Name) + ?.ToString("u", System.Globalization.CultureInfo.InvariantCulture) + ?? latestCrash.LastWriteTimeUtc.ToString("u", System.Globalization.CultureInfo.InvariantCulture); return Task.FromResult(DoctorCheckResult.Error( CheckName, @@ -100,33 +99,4 @@ private static bool LooksLikeSqliteProvisioningFailure(string crashText) || crashText.Contains("SchemaMigrator.MigrateAsync", StringComparison.Ordinal); } - /// - /// Returns true if the PID file was written after the crash log, meaning the daemon - /// was restarted since the crash and the crash log is stale. - /// - private static bool IsCrashLogStale(FileInfo crashLog, string pidFilePath) - { - var pidFile = new FileInfo(pidFilePath); - return pidFile.Exists && pidFile.LastWriteTimeUtc > crashLog.LastWriteTimeUtc; - } - - private static DateTimeOffset? TryParseCrashTimestamp(string fileName) - { - // crash-YYYYMMDD-HHMMSS.log - var stem = Path.GetFileNameWithoutExtension(fileName); - const string prefix = "crash-"; - if (!stem.StartsWith(prefix, StringComparison.Ordinal)) - return null; - - var payload = stem[prefix.Length..]; - if (!DateTimeOffset.TryParseExact( - payload, - "yyyyMMdd-HHmmss", - CultureInfo.InvariantCulture, - DateTimeStyles.AssumeUniversal, - out var parsed)) - return null; - - return parsed.ToUniversalTime(); - } } diff --git a/src/Netclaw.Cli/Doctor/TelemetryDoctorCheck.cs b/src/Netclaw.Cli/Doctor/TelemetryDoctorCheck.cs index bf23a1b3..bc5fc708 100644 --- a/src/Netclaw.Cli/Doctor/TelemetryDoctorCheck.cs +++ b/src/Netclaw.Cli/Doctor/TelemetryDoctorCheck.cs @@ -11,7 +11,7 @@ public Task RunAsync(CancellationToken cancellationToken = de if (readError is not null) return Task.FromResult(DoctorCheckResult.Pass("Telemetry", "Skipped (base config is missing or invalid).")); - if (root!["Telemetry"] is not JsonObject telemetry || !ReadBool(telemetry, "Enabled")) + if (root!["Telemetry"] is not JsonObject telemetry || !DoctorJsonConfigReader.ReadBool(telemetry, "Enabled")) return Task.FromResult(DoctorCheckResult.Pass("Telemetry", "Telemetry disabled or not configured.")); var endpoint = telemetry["Otlp"]?["Endpoint"]?.GetValue(); @@ -33,7 +33,4 @@ public Task RunAsync(CancellationToken cancellationToken = de return Task.FromResult(DoctorCheckResult.Pass("Telemetry", "Telemetry endpoint is valid.")); } - - private static bool ReadBool(JsonObject obj, string property) - => obj[property] is JsonValue v && v.TryGetValue(out var b) && b; } diff --git a/src/Netclaw.Cli/Tui/ModelManagerViewModel.cs b/src/Netclaw.Cli/Tui/ModelManagerViewModel.cs index fb88e5a9..6406b3c7 100644 --- a/src/Netclaw.Cli/Tui/ModelManagerViewModel.cs +++ b/src/Netclaw.Cli/Tui/ModelManagerViewModel.cs @@ -292,7 +292,7 @@ internal async Task ProbeProviderAsync() _probeCts = new CancellationTokenSource(); var ct = _probeCts.Token; var providerType = provider.Entry.Type; - var probeId = Guid.NewGuid().ToString("N")[..8]; + var probeId = IdGen.ShortId(); var stopwatch = Stopwatch.StartNew(); Exception? probeException = null; diff --git a/src/Netclaw.Cli/Tui/ProviderManagerViewModel.cs b/src/Netclaw.Cli/Tui/ProviderManagerViewModel.cs index f3ac22c2..7fe29d1f 100644 --- a/src/Netclaw.Cli/Tui/ProviderManagerViewModel.cs +++ b/src/Netclaw.Cli/Tui/ProviderManagerViewModel.cs @@ -719,7 +719,7 @@ internal async Task ProbeProviderAsync() _probeCts = new CancellationTokenSource(); var ct = _probeCts.Token; var providerType = NewProviderType ?? "unknown"; - var probeId = Guid.NewGuid().ToString("N")[..8]; + var probeId = IdGen.ShortId(); var stopwatch = Stopwatch.StartNew(); Exception? probeException = null; @@ -856,7 +856,7 @@ p.ConfiguredName is not null && return candidate; } - return $"my-{type}-{Guid.NewGuid().ToString("N")[..6]}"; + return $"my-{type}-{IdGen.Suffix()}"; } private void ClearAddState() diff --git a/src/Netclaw.Configuration.Tests/OperationalAlertCreateTests.cs b/src/Netclaw.Configuration.Tests/OperationalAlertCreateTests.cs new file mode 100644 index 00000000..329c46e4 --- /dev/null +++ b/src/Netclaw.Configuration.Tests/OperationalAlertCreateTests.cs @@ -0,0 +1,59 @@ +using Microsoft.Extensions.Time.Testing; +using Xunit; + +namespace Netclaw.Configuration.Tests; + +public sealed class OperationalAlertCreateTests +{ + [Fact] + public void Create_sets_all_properties_correctly() + { + var fakeTime = new FakeTimeProvider(new DateTimeOffset(2026, 4, 16, 12, 0, 0, TimeSpan.Zero)); + var context = new Dictionary { ["error"] = "timeout" }; + + var alert = OperationalAlert.Create( + fakeTime, + type: "provider.unreachable", + category: AlertType.ProviderUnreachable, + summary: "LLM provider unreachable", + severity: AlertSeverity.Critical, + source: "anthropic", + context: context); + + Assert.Equal("provider.unreachable", alert.Type); + Assert.Equal(AlertType.ProviderUnreachable, alert.Category); + Assert.Equal("LLM provider unreachable", alert.Summary); + Assert.Equal(AlertSeverity.Critical, alert.Severity); + Assert.Equal("anthropic", alert.Source); + Assert.Same(context, alert.Context); + Assert.Equal(fakeTime.GetUtcNow(), alert.Timestamp); + } + + [Fact] + public void Create_generates_12_char_AlertId() + { + var alert = OperationalAlert.Create( + TimeProvider.System, + type: "test.alert", + category: AlertType.DaemonStarted, + summary: "Test", + severity: AlertSeverity.Info); + + Assert.Equal(12, alert.AlertId.Length); + Assert.Matches("^[0-9a-f]{12}$", alert.AlertId); + } + + [Fact] + public void Create_defaults_source_and_context_to_null() + { + var alert = OperationalAlert.Create( + TimeProvider.System, + type: "test.alert", + category: AlertType.DaemonStarted, + summary: "Test", + severity: AlertSeverity.Info); + + Assert.Null(alert.Source); + Assert.Null(alert.Context); + } +} diff --git a/src/Netclaw.Configuration/IdGen.cs b/src/Netclaw.Configuration/IdGen.cs new file mode 100644 index 00000000..cb6a2d28 --- /dev/null +++ b/src/Netclaw.Configuration/IdGen.cs @@ -0,0 +1,20 @@ +namespace Netclaw.Configuration; + +/// +/// Centralized ID generation with purpose-named methods to replace scattered +/// Guid.NewGuid().ToString("N")[..N] patterns across the codebase. +/// +public static class IdGen +{ + /// 12-character alert ID for . + public static string AlertId() => Guid.NewGuid().ToString("N")[..12]; + + /// 8-character short ID for turn IDs, message IDs, and probe IDs. + public static string ShortId() => Guid.NewGuid().ToString("N")[..8]; + + /// 6-character suffix for reminder IDs and similar short suffixes. + public static string Suffix() => Guid.NewGuid().ToString("N")[..6]; + + /// Full 32-character hex string for state tokens, temp directories, and other non-truncated uses. + public static string Full() => Guid.NewGuid().ToString("N"); +} diff --git a/src/Netclaw.Configuration/OperationalAlert.cs b/src/Netclaw.Configuration/OperationalAlert.cs index d8de66fb..55ebab32 100644 --- a/src/Netclaw.Configuration/OperationalAlert.cs +++ b/src/Netclaw.Configuration/OperationalAlert.cs @@ -1,5 +1,15 @@ namespace Netclaw.Configuration; +/// +/// Severity levels for operational alerts. +/// +public enum AlertSeverity +{ + Info, + Warning, + Critical +} + /// /// Categories of operational alerts that can be emitted by daemon components. /// @@ -42,8 +52,8 @@ public sealed record OperationalAlert /// UTC timestamp when the event occurred. public required DateTimeOffset Timestamp { get; init; } - /// Severity level: "info", "warning", "critical". - public required string Severity { get; init; } + /// Severity level. + public required AlertSeverity Severity { get; init; } /// /// Stable source identifier for deduplication (e.g., MCP server name, channel name). @@ -62,4 +72,27 @@ public sealed record OperationalAlert /// Deduplication key built from Type and Source. /// public string DeduplicationKey => Source is not null ? $"{Type}:{Source}" : Type; + + /// + /// Creates an with a generated + /// and current timestamp from the provided . + /// + public static OperationalAlert Create( + TimeProvider timeProvider, + string type, + AlertType category, + string summary, + AlertSeverity severity, + string? source = null, + Dictionary? context = null) => new() + { + AlertId = IdGen.AlertId(), + Type = type, + Category = category, + Summary = summary, + Timestamp = timeProvider.GetUtcNow(), + Severity = severity, + Source = source, + Context = context + }; } diff --git a/src/Netclaw.Daemon.Tests/Services/BinaryUpdateCheckServiceTests.cs b/src/Netclaw.Daemon.Tests/Services/BinaryUpdateCheckServiceTests.cs index 5237e3d3..bb5a46f6 100644 --- a/src/Netclaw.Daemon.Tests/Services/BinaryUpdateCheckServiceTests.cs +++ b/src/Netclaw.Daemon.Tests/Services/BinaryUpdateCheckServiceTests.cs @@ -74,7 +74,7 @@ public async Task CheckAndNotify_EmitsUpdateAvailableAlert() var alert = sink.Alerts[0]; Assert.Equal(AlertType.UpdateAvailable, alert.Category); Assert.Equal("update.available", alert.Type); - Assert.Equal("info", alert.Severity); + Assert.Equal(AlertSeverity.Info, alert.Severity); Assert.Contains("0.2.0", alert.Summary); } diff --git a/src/Netclaw.Daemon.Tests/Services/DaemonLifecycleNotifierTests.cs b/src/Netclaw.Daemon.Tests/Services/DaemonLifecycleNotifierTests.cs index 651c92ee..011424d0 100644 --- a/src/Netclaw.Daemon.Tests/Services/DaemonLifecycleNotifierTests.cs +++ b/src/Netclaw.Daemon.Tests/Services/DaemonLifecycleNotifierTests.cs @@ -26,7 +26,7 @@ public void NotifyStarted_EmitsDaemonStartedAlert() var alert = Assert.Single(_sink.Alerts); Assert.Equal("daemon.started", alert.Type); Assert.Equal(AlertType.DaemonStarted, alert.Category); - Assert.Equal("info", alert.Severity); + Assert.Equal(AlertSeverity.Info, alert.Severity); Assert.NotNull(alert.Context); Assert.True(alert.Context.ContainsKey("pid")); Assert.Equal(Environment.ProcessId.ToString(), alert.Context["pid"]); @@ -40,7 +40,7 @@ public void NotifyShutdown_EmitsDaemonStoppingAlert() var alert = Assert.Single(_sink.Alerts); Assert.Equal("daemon.stopping", alert.Type); Assert.Equal(AlertType.DaemonStopping, alert.Category); - Assert.Equal("info", alert.Severity); + Assert.Equal(AlertSeverity.Info, alert.Severity); Assert.NotNull(alert.Context); Assert.Equal("cli-stop", alert.Context["reason"]); Assert.Equal(Environment.ProcessId.ToString(), alert.Context["pid"]); @@ -70,7 +70,7 @@ public void NotifyCrashing_EmitsCriticalAlert() var alert = Assert.Single(_sink.Alerts); Assert.Equal("daemon.crashing", alert.Type); Assert.Equal(AlertType.DaemonCrashed, alert.Category); - Assert.Equal("critical", alert.Severity); + Assert.Equal(AlertSeverity.Critical, alert.Severity); Assert.NotNull(alert.Context); Assert.Equal("daemon-unhandled", alert.Context["reason"]); Assert.Equal("/tmp/crash-20260414-182900.log", alert.Context["crashLogPath"]); diff --git a/src/Netclaw.Daemon.Tests/Services/SlackBlockKitWebhookFormatterTests.cs b/src/Netclaw.Daemon.Tests/Services/SlackBlockKitWebhookFormatterTests.cs index e2b7575f..39db24aa 100644 --- a/src/Netclaw.Daemon.Tests/Services/SlackBlockKitWebhookFormatterTests.cs +++ b/src/Netclaw.Daemon.Tests/Services/SlackBlockKitWebhookFormatterTests.cs @@ -12,7 +12,7 @@ public sealed class SlackBlockKitWebhookFormatterTests : IAsyncDisposable { private static OperationalAlert CreateAlert( string type = "mcp.auth.expired", - string severity = "warning", + AlertSeverity severity = AlertSeverity.Warning, string? source = null, Dictionary? context = null) { @@ -43,7 +43,7 @@ public void Build_ProducesPayloadWithTextField() Assert.True(root.TryGetProperty("text", out var text)); Assert.Contains("mcp.auth.expired", text.GetString()); - Assert.Contains("warning", text.GetString()); + Assert.Contains("Warning", text.GetString()); } [Fact] @@ -107,11 +107,10 @@ public void Build_OmitsContextBlock_WhenAlertContextIsNull() } [Theory] - [InlineData("critical", ":red_circle:")] - [InlineData("warning", ":warning:")] - [InlineData("info", ":information_source:")] - [InlineData("unknown", ":grey_question:")] - public void Build_SeverityEmoji_MapsCorrectly(string severity, string expectedEmoji) + [InlineData(AlertSeverity.Critical, ":red_circle:")] + [InlineData(AlertSeverity.Warning, ":warning:")] + [InlineData(AlertSeverity.Info, ":information_source:")] + public void Build_SeverityEmoji_MapsCorrectly(AlertSeverity severity, string expectedEmoji) { var alert = CreateAlert(severity: severity); diff --git a/src/Netclaw.Daemon.Tests/Services/WebhookNotificationServiceTests.cs b/src/Netclaw.Daemon.Tests/Services/WebhookNotificationServiceTests.cs index 78bf09a1..9294549c 100644 --- a/src/Netclaw.Daemon.Tests/Services/WebhookNotificationServiceTests.cs +++ b/src/Netclaw.Daemon.Tests/Services/WebhookNotificationServiceTests.cs @@ -21,7 +21,7 @@ private static OperationalAlert CreateAlert( Category = category, Summary = $"Test alert: {type}", Timestamp = DateTimeOffset.UtcNow, - Severity = "warning", + Severity = AlertSeverity.Warning, Source = source, Context = source is not null ? new Dictionary { ["serverName"] = source } diff --git a/src/Netclaw.Daemon/Configuration/AlertingChatClientDecorator.cs b/src/Netclaw.Daemon/Configuration/AlertingChatClientDecorator.cs index aa05b6f0..059d79b4 100644 --- a/src/Netclaw.Daemon/Configuration/AlertingChatClientDecorator.cs +++ b/src/Netclaw.Daemon/Configuration/AlertingChatClientDecorator.cs @@ -74,16 +74,13 @@ private async IAsyncEnumerable StreamWithAlertingAsync( private void EmitUnreachableAlert(Exception ex) { - _notificationSink.Emit(new OperationalAlert - { - AlertId = Guid.NewGuid().ToString("N")[..12], - Type = "provider.unreachable", - Category = AlertType.ProviderUnreachable, - Summary = "LLM provider unreachable — no fallback configured", - Timestamp = _timeProvider.GetUtcNow(), - Severity = "critical", - Context = new Dictionary { ["error"] = ex.Message } - }); + _notificationSink.Emit(OperationalAlert.Create( + _timeProvider, + "provider.unreachable", + AlertType.ProviderUnreachable, + "LLM provider unreachable — no fallback configured", + AlertSeverity.Critical, + context: new Dictionary { ["error"] = ex.Message })); } public object? GetService(Type serviceType, object? serviceKey = null) diff --git a/src/Netclaw.Daemon/Configuration/FailoverChatClient.cs b/src/Netclaw.Daemon/Configuration/FailoverChatClient.cs index af518e78..45cc4d22 100644 --- a/src/Netclaw.Daemon/Configuration/FailoverChatClient.cs +++ b/src/Netclaw.Daemon/Configuration/FailoverChatClient.cs @@ -166,30 +166,24 @@ private async IAsyncEnumerable StreamWithFailoverAsync( private void EmitFailoverAlert(Exception ex) { - _notificationSink.Emit(new OperationalAlert - { - AlertId = Guid.NewGuid().ToString("N")[..12], - Type = "provider.failover", - Category = AlertType.ProviderFailover, - Summary = "Primary LLM provider failed, failing over to fallback", - Timestamp = _timeProvider.GetUtcNow(), - Severity = "warning", - Context = new Dictionary { ["error"] = ex.Message } - }); + _notificationSink.Emit(OperationalAlert.Create( + _timeProvider, + "provider.failover", + AlertType.ProviderFailover, + "Primary LLM provider failed, failing over to fallback", + AlertSeverity.Warning, + context: new Dictionary { ["error"] = ex.Message })); } private void EmitUnreachableAlert(Exception ex) { - _notificationSink.Emit(new OperationalAlert - { - AlertId = Guid.NewGuid().ToString("N")[..12], - Type = "provider.unreachable", - Category = AlertType.ProviderUnreachable, - Summary = "All LLM providers failed — primary and fallback both unreachable", - Timestamp = _timeProvider.GetUtcNow(), - Severity = "critical", - Context = new Dictionary { ["error"] = ex.Message } - }); + _notificationSink.Emit(OperationalAlert.Create( + _timeProvider, + "provider.unreachable", + AlertType.ProviderUnreachable, + "All LLM providers failed — primary and fallback both unreachable", + AlertSeverity.Critical, + context: new Dictionary { ["error"] = ex.Message })); } public object? GetService(Type serviceType, object? serviceKey = null) diff --git a/src/Netclaw.Daemon/Gateway/SignalRSessionActor.cs b/src/Netclaw.Daemon/Gateway/SignalRSessionActor.cs index a6896e93..3043bc74 100644 --- a/src/Netclaw.Daemon/Gateway/SignalRSessionActor.cs +++ b/src/Netclaw.Daemon/Gateway/SignalRSessionActor.cs @@ -1,8 +1,6 @@ using System.Threading.Channels; using Akka.Actor; using Akka.Event; -using Akka.Streams; -using Akka.Streams.Dsl; using Microsoft.AspNetCore.SignalR; using Netclaw.Actors.Channels; using Netclaw.Actors.Protocol; @@ -12,28 +10,19 @@ namespace Netclaw.Daemon.Gateway; /// /// Per-session binding actor for SignalR connections. -/// Owns a scoped to this actor context so that +/// Uses for pipeline lifecycle so that /// all Akka.Streams stage actors become children and are automatically stopped -/// when this actor stops — eliminating the StreamSupervisor actor leak that -/// occurred when streams were materialized at the system level. +/// when this actor stops. /// -/// -/// Mirrors the pattern used by SlackThreadBindingActor for Slack sessions. -/// internal sealed class SignalRSessionActor : ReceiveActor, IWithUnboundedStash, IWithTimers { private readonly SessionId _sessionId; - private readonly ISessionPipeline _pipeline; private readonly IHubContext _hubContext; private readonly ILoggingAdapter _log; - private ActorMaterializer? _materializer; - private MaterializedSession? _session; - private ChannelWriter? _inputQueue; + private readonly SessionPipelineHandle _handle; private SignalRConnectionId _currentConnectionId; private Actors.Channels.ChannelType _channelType = Actors.Channels.ChannelType.Tui; - private int _pipelineGeneration; - private bool _isReinitializing; private static readonly TimeSpan PipelineInitTimeout = TimeSpan.FromSeconds(15); private static readonly object ReinitializeTimerKey = new(); @@ -47,11 +36,11 @@ public SignalRSessionActor( IHubContext hubContext) { _sessionId = new SessionId(entityId); - _pipeline = pipeline; _hubContext = hubContext; _log = Context.GetLogger() .WithContext("Adapter", "signalr") .WithContext("SessionId", _sessionId.Value); + _handle = new SessionPipelineHandle(pipeline, _log, "signalr"); Initializing(); } @@ -60,6 +49,20 @@ public static Props CreateProps(string entityId, ISessionPipeline pipeline, IHubContext hubContext) => Props.Create(() => new SignalRSessionActor(entityId, pipeline, hubContext)); + private SessionPipelineOptions BuildOptions() => new() + { + ChannelType = _channelType, + DefaultAudience = TrustAudience.Personal, + DefaultBoundary = SecurityPolicyDefaults.LocalDaemonBoundary, + DefaultPrincipal = PrincipalClassification.Operator, + DefaultProvenance = new SourceProvenance + { + TransportAuthenticity = TransportAuthenticity.LocalProcess, + PayloadTaint = PayloadTaint.Trusted, + SourceKind = "signalr" + } + }; + private void Initializing() { ReceiveAsync(async msg => @@ -80,10 +83,6 @@ private void Initializing() } }); - // A reminder can fire against a passivated session with no - // connected client, so we initialize the pipeline lazily without - // a prior StartSignalRSession. Output is dropped if no client is - // connected; the turn still persists via TurnRecorded. ReceiveAsync(async msg => { try @@ -114,7 +113,7 @@ private void Active() ReceiveAsync(async msg => { - var writer = _inputQueue; + var writer = _handle.InputQueue; if (writer is null) { _log.Warning("Input queue not initialized; dropping message for session {SessionId}", _sessionId.Value); @@ -157,7 +156,7 @@ private void Active() Receive(msg => { - if (msg.Generation != _pipelineGeneration) + if (msg.Generation != _handle.Generation) return; var reason = msg.Cause is null @@ -168,7 +167,15 @@ private void Active() Self.Tell(new ReinitializePipeline(reason)); }); - ReceiveAsync(async msg => await ReinitializePipelineAsync(msg.Reason)); + ReceiveAsync(async msg => + { + await _handle.ReinitializeAsync( + msg.Reason, + () => Timers.StartSingleTimer( + ReinitializeTimerKey, + new ReinitializePipeline("retry after failed reinit"), + TimeSpan.FromSeconds(2))); + }); Receive(_ => { @@ -180,7 +187,7 @@ private void Active() { var ackTarget = Sender; - var writer = _inputQueue; + var writer = _handle.InputQueue; if (writer is null) { _log.Warning( @@ -227,105 +234,26 @@ private void Active() }); } - protected override void PostStop() - { - _inputQueue?.TryComplete(); - _session?.DisposeAsync().AsTask().GetAwaiter().GetResult(); - _materializer?.Dispose(); - base.PostStop(); - } - private async Task EnsureInitializedAsync() { - if (_session is not null) + if (_handle.IsInitialized) return; - _log.Info("Initializing SignalR session pipeline"); var self = Self; - - _materializer = Context.Materializer(namePrefix: "signalr"); - using var initCts = new CancellationTokenSource(PipelineInitTimeout); - var materialized = await _pipeline.CreateAsync( + await _handle.InitializeWithChannelAsync( + Context, _sessionId, - new SessionPipelineOptions - { - ChannelType = _channelType, - DefaultAudience = TrustAudience.Personal, - DefaultBoundary = SecurityPolicyDefaults.LocalDaemonBoundary, - DefaultPrincipal = PrincipalClassification.Operator, - DefaultProvenance = new SourceProvenance - { - TransportAuthenticity = TransportAuthenticity.LocalProcess, - PayloadTaint = PayloadTaint.Trusted, - SourceKind = "signalr" - } - }, - materializer: _materializer, - cancellationToken: initCts.Token); - - var inputQueue = Source.Channel(512, true) - .ToMaterialized(materialized.Input, Keep.Left) - .Run(_materializer); - - var generation = ++_pipelineGeneration; - var outputCompletion = materialized.Output - .ToMaterialized( - Sink.ForEach(output => self.Tell(new OutputReceived(output))), - Keep.Right) - .Run(_materializer); - - _ = outputCompletion.ContinueWith(t => - { - var cause = t.IsFaulted ? t.Exception?.GetBaseException() : null; - self.Tell(new OutputStreamTerminated(generation, cause)); - }, - CancellationToken.None, - TaskContinuationOptions.ExecuteSynchronously, - TaskScheduler.Default); - - _session = materialized; - _inputQueue = inputQueue; - - _log.Info("SignalR session pipeline initialized"); + BuildOptions(), + output => self.Tell(new OutputReceived(output)), + (gen, cause) => self.Tell(new OutputStreamTerminated(gen, cause)), + initCts.Token); } - private async Task ReinitializePipelineAsync(string reason) + protected override void PostStop() { - if (_isReinitializing) - return; - - _isReinitializing = true; - try - { - _log.Warning("Reinitializing SignalR session pipeline: {Reason}", reason); - - _inputQueue?.TryComplete(); - _inputQueue = null; - - if (_session is not null) - { - await _session.DisposeAsync(); - _session = null; - } - - _materializer?.Dispose(); - _materializer = null; - - await EnsureInitializedAsync(); - } - catch (Exception ex) - { - _log.Error(ex, "SignalR pipeline reinitialization failed; scheduling retry"); - Timers.StartSingleTimer( - ReinitializeTimerKey, - new ReinitializePipeline("retry after failed reinit"), - TimeSpan.FromSeconds(2)); - } - finally - { - _isReinitializing = false; - } + _handle.Dispose(); + base.PostStop(); } // ─── Message protocol ─────────────────────────────────────────────────── @@ -364,13 +292,6 @@ internal sealed record ShutdownSignalRSession(SessionId SessionId) : ISignalRSes /// /// Routes messages to children by session ID. -/// Channel-internal messages implement -/// and match first. Upstream protocol messages (e.g. -/// for Mode B reminder -/// re-entry) implement the public -/// interface and are routed via the fallback pattern below — this keeps -/// internal while still allowing -/// shared protocol messages to reach the correct session actor. /// internal sealed class SignalRMessageExtractor : Akka.Cluster.Sharding.HashCodeMessageExtractor { diff --git a/src/Netclaw.Daemon/Mcp/McpClientManager.cs b/src/Netclaw.Daemon/Mcp/McpClientManager.cs index f129b895..9548ab05 100644 --- a/src/Netclaw.Daemon/Mcp/McpClientManager.cs +++ b/src/Netclaw.Daemon/Mcp/McpClientManager.cs @@ -606,36 +606,30 @@ internal static McpServerStatus CreateUnreachableStatus(McpServerName serverName private void EmitAuthAlert(McpServerName serverName, string summary, string reason) { - _notificationSink.Emit(new OperationalAlert - { - AlertId = Guid.NewGuid().ToString("N")[..12], - Type = "mcp.auth.expired", - Category = AlertType.McpAuthExpired, - Summary = summary, - Timestamp = _timeProvider.GetUtcNow(), - Severity = "warning", - Source = serverName.Value, - Context = new Dictionary + _notificationSink.Emit(OperationalAlert.Create( + _timeProvider, + "mcp.auth.expired", + AlertType.McpAuthExpired, + summary, + AlertSeverity.Warning, + source: serverName.Value, + context: new Dictionary { ["serverName"] = serverName.Value, ["reason"] = reason, - } - }); + })); } private void EmitDisconnectedAlert(McpServerName serverName, string summary) { - _notificationSink.Emit(new OperationalAlert - { - AlertId = Guid.NewGuid().ToString("N")[..12], - Type = "mcp.server.disconnected", - Category = AlertType.McpServerDisconnected, - Summary = summary, - Timestamp = _timeProvider.GetUtcNow(), - Severity = "warning", - Source = serverName.Value, - Context = new Dictionary { ["serverName"] = serverName.Value } - }); + _notificationSink.Emit(OperationalAlert.Create( + _timeProvider, + "mcp.server.disconnected", + AlertType.McpServerDisconnected, + summary, + AlertSeverity.Warning, + source: serverName.Value, + context: new Dictionary { ["serverName"] = serverName.Value })); } private static IEnumerable? ParseScopes(string? scopeString) diff --git a/src/Netclaw.Daemon/Mcp/McpOAuthService.cs b/src/Netclaw.Daemon/Mcp/McpOAuthService.cs index 4c0af6ee..4c2e95e8 100644 --- a/src/Netclaw.Daemon/Mcp/McpOAuthService.cs +++ b/src/Netclaw.Daemon/Mcp/McpOAuthService.cs @@ -346,21 +346,18 @@ public async Task CompleteAuthorizationAsync(string code, string state, Cancella { _logger.LogWarning("Access token expired for MCP server '{Name}' with no refresh token", serverName.Value); - _notificationSink.Emit(new OperationalAlert - { - AlertId = Guid.NewGuid().ToString("N")[..12], - Type = "mcp.auth.expired", - Category = AlertType.McpAuthExpired, - Summary = $"MCP server '{serverName.Value}' access token expired with no refresh token. Run: netclaw mcp auth {serverName.Value}", - Timestamp = _timeProvider.GetUtcNow(), - Severity = "warning", - Source = serverName.Value, - Context = new Dictionary + _notificationSink.Emit(OperationalAlert.Create( + _timeProvider, + "mcp.auth.expired", + AlertType.McpAuthExpired, + $"MCP server '{serverName.Value}' access token expired with no refresh token. Run: netclaw mcp auth {serverName.Value}", + AlertSeverity.Warning, + source: serverName.Value, + context: new Dictionary { ["serverName"] = serverName.Value, ["reason"] = "no_refresh_token", - } - }); + })); return null; } @@ -404,21 +401,18 @@ public async Task CompleteAuthorizationAsync(string code, string state, Cancella _tokens.TryRemove(serverName, out _); PersistTokens(); - _notificationSink.Emit(new OperationalAlert - { - AlertId = Guid.NewGuid().ToString("N")[..12], - Type = "mcp.auth.expired", - Category = AlertType.McpAuthExpired, - Summary = $"MCP server '{serverName.Value}' refresh token rejected (invalid_grant). Run: netclaw mcp auth {serverName.Value}", - Timestamp = _timeProvider.GetUtcNow(), - Severity = "warning", - Source = serverName.Value, - Context = new Dictionary + _notificationSink.Emit(OperationalAlert.Create( + _timeProvider, + "mcp.auth.expired", + AlertType.McpAuthExpired, + $"MCP server '{serverName.Value}' refresh token rejected (invalid_grant). Run: netclaw mcp auth {serverName.Value}", + AlertSeverity.Warning, + source: serverName.Value, + context: new Dictionary { ["serverName"] = serverName.Value, ["reason"] = "invalid_grant", - } - }); + })); return null; } diff --git a/src/Netclaw.Daemon/Services/BinaryUpdateCheckService.cs b/src/Netclaw.Daemon/Services/BinaryUpdateCheckService.cs index 23ef8db9..921ba11d 100644 --- a/src/Netclaw.Daemon/Services/BinaryUpdateCheckService.cs +++ b/src/Netclaw.Daemon/Services/BinaryUpdateCheckService.cs @@ -91,20 +91,17 @@ internal async Task CheckAndNotifyAsync(CancellationToken cancellationToken) private void EmitUpdateAlert(UpdateCheckResult result) { - _notificationSink?.Emit(new OperationalAlert - { - AlertId = Guid.NewGuid().ToString("N"), - Type = "update.available", - Category = AlertType.UpdateAvailable, - Summary = $"Netclaw update available: {result.CurrentVersion} → {result.LatestVersion}. Run 'netclaw update' to upgrade.", - Timestamp = _timeProvider.GetUtcNow(), - Severity = "info", - Source = result.LatestVersion, - Context = new Dictionary + _notificationSink?.Emit(OperationalAlert.Create( + _timeProvider, + "update.available", + AlertType.UpdateAvailable, + $"Netclaw update available: {result.CurrentVersion} → {result.LatestVersion}. Run 'netclaw update' to upgrade.", + AlertSeverity.Info, + source: result.LatestVersion, + context: new Dictionary { ["currentVersion"] = result.CurrentVersion, ["latestVersion"] = result.LatestVersion, - }, - }); + })); } } diff --git a/src/Netclaw.Daemon/Services/DaemonLifecycleNotifier.cs b/src/Netclaw.Daemon/Services/DaemonLifecycleNotifier.cs index 14194e5f..02c11883 100644 --- a/src/Netclaw.Daemon/Services/DaemonLifecycleNotifier.cs +++ b/src/Netclaw.Daemon/Services/DaemonLifecycleNotifier.cs @@ -33,20 +33,17 @@ public void NotifyStarted() var pid = Environment.ProcessId; _logger.LogInformation("Netclaw daemon started (PID {Pid})", pid); - _sink.Emit(new OperationalAlert - { - AlertId = Guid.NewGuid().ToString("N")[..12], - Type = "daemon.started", - Category = AlertType.DaemonStarted, - Severity = "info", - Summary = "Netclaw daemon started", - Timestamp = _timeProvider.GetUtcNow(), - Source = pid.ToString(CultureInfo.InvariantCulture), - Context = new Dictionary + _sink.Emit(OperationalAlert.Create( + _timeProvider, + type: "daemon.started", + category: AlertType.DaemonStarted, + summary: "Netclaw daemon started", + severity: AlertSeverity.Info, + source: pid.ToString(CultureInfo.InvariantCulture), + context: new Dictionary { ["pid"] = pid.ToString(CultureInfo.InvariantCulture), - }, - }); + })); } /// @@ -72,17 +69,14 @@ public void NotifyShutdown(string reason, IReadOnlyDictionary? a context[pair.Key] = pair.Value; } - _sink.Emit(new OperationalAlert - { - AlertId = Guid.NewGuid().ToString("N")[..12], - Type = "daemon.stopping", - Category = AlertType.DaemonStopping, - Severity = "info", - Summary = $"Netclaw daemon stopping: {reason}", - Timestamp = _timeProvider.GetUtcNow(), - Source = pid.ToString(CultureInfo.InvariantCulture), - Context = context, - }); + _sink.Emit(OperationalAlert.Create( + _timeProvider, + type: "daemon.stopping", + category: AlertType.DaemonStopping, + summary: $"Netclaw daemon stopping: {reason}", + severity: AlertSeverity.Info, + source: pid.ToString(CultureInfo.InvariantCulture), + context: context)); } /// @@ -127,17 +121,14 @@ public void NotifyCrashing( try { - _sink.Emit(new OperationalAlert - { - AlertId = Guid.NewGuid().ToString("N")[..12], - Type = "daemon.crashing", - Category = AlertType.DaemonCrashed, - Severity = "critical", - Summary = $"Netclaw daemon crashing: {reason} ({exceptionType})", - Timestamp = _timeProvider.GetUtcNow(), - Source = pid.ToString(CultureInfo.InvariantCulture), - Context = context, - }); + _sink.Emit(OperationalAlert.Create( + _timeProvider, + type: "daemon.crashing", + category: AlertType.DaemonCrashed, + summary: $"Netclaw daemon crashing: {reason} ({exceptionType})", + severity: AlertSeverity.Critical, + source: pid.ToString(CultureInfo.InvariantCulture), + context: context)); } catch (Exception ex) { diff --git a/src/Netclaw.Daemon/Services/WebhookNotificationService.cs b/src/Netclaw.Daemon/Services/WebhookNotificationService.cs index cb16c087..527541dc 100644 --- a/src/Netclaw.Daemon/Services/WebhookNotificationService.cs +++ b/src/Netclaw.Daemon/Services/WebhookNotificationService.cs @@ -230,7 +230,7 @@ private static JsonContent BuildContent(WebhookTarget target, OperationalAlert a { AlertId = alert.AlertId, Type = alert.Type, - Severity = alert.Severity, + Severity = alert.Severity.ToString().ToLowerInvariant(), Summary = alert.Summary, Timestamp = alert.Timestamp, Source = "netclaw", diff --git a/src/Netclaw.Daemon/Webhooks/WebhookEndpointRouteBuilderExtensions.cs b/src/Netclaw.Daemon/Webhooks/WebhookEndpointRouteBuilderExtensions.cs index 37326557..3449d489 100644 --- a/src/Netclaw.Daemon/Webhooks/WebhookEndpointRouteBuilderExtensions.cs +++ b/src/Netclaw.Daemon/Webhooks/WebhookEndpointRouteBuilderExtensions.cs @@ -127,23 +127,20 @@ public static IEndpointRouteBuilder MapWebhookEndpoints(this IEndpointRouteBuild "Webhook accepted route={Route} reason={Reason} remote_ip={RemoteIp} delivery_id={DeliveryId} event_type={EventType}", registeredRoute.Name, "accepted", remoteIp, verification.DeliveryId, verification.EventType); - notificationSink.Emit(new OperationalAlert - { - AlertId = Guid.NewGuid().ToString("N")[..12], - Type = "webhook.received", - Category = AlertType.WebhookReceived, - Summary = $"Webhook '{registeredRoute.Name}' received event '{verification.EventType ?? "unknown"}'", - Timestamp = now, - Severity = "info", - Source = registeredRoute.Name, - Context = new Dictionary + notificationSink.Emit(OperationalAlert.Create( + timeProvider, + "webhook.received", + AlertType.WebhookReceived, + $"Webhook '{registeredRoute.Name}' received event '{verification.EventType ?? "unknown"}'", + AlertSeverity.Info, + source: registeredRoute.Name, + context: new Dictionary { ["route"] = registeredRoute.Name, ["event"] = verification.EventType ?? "unknown", ["deliveryId"] = verification.DeliveryId ?? "generated", ["sessionId"] = sessionId.Value, - } - }); + })); executionService.StartInvocation(invocation); return Results.Json( diff --git a/src/Netclaw.Daemon/Webhooks/WebhookExecutionActor.cs b/src/Netclaw.Daemon/Webhooks/WebhookExecutionActor.cs index 8c797407..1bf72199 100644 --- a/src/Netclaw.Daemon/Webhooks/WebhookExecutionActor.cs +++ b/src/Netclaw.Daemon/Webhooks/WebhookExecutionActor.cs @@ -1,33 +1,25 @@ -using System.Text; using Akka.Actor; using Akka.Event; -using Akka.Streams; -using Akka.Streams.Dsl; using Microsoft.Extensions.AI; using Netclaw.Actors.Channels; using Netclaw.Actors.Protocol; using Netclaw.Configuration; +using Netclaw.Tools; namespace Netclaw.Daemon.Webhooks; internal sealed class WebhookExecutionActor : ReceiveActor { private readonly WebhookInvocation _invocation; - private readonly ISessionPipeline _pipeline; private readonly WebhooksConfig _config; private readonly TimeProvider _timeProvider; private readonly ILoggingAdapter _log; private readonly DateTimeOffset _dispatchedAt; - private readonly StringBuilder _buffer = new(); - private bool _sawTextDelta; + private readonly SessionPipelineHandle _handle; + private static readonly ToolName NotificationTool = new("send_slack_message"); + private readonly ExecutionOutputAccumulator _accumulator = new(NotificationTool); private bool _completed; - private bool _notifyAttempted; - private bool _notifyFailed; - private string? _notifyFailureDetail; - - private ActorMaterializer? _materializer; - private MaterializedSession? _session; public static Props CreateProps( WebhookInvocation invocation, @@ -43,11 +35,11 @@ public WebhookExecutionActor( TimeProvider timeProvider) { _invocation = invocation; - _pipeline = pipeline; _config = config; _timeProvider = timeProvider; _dispatchedAt = timeProvider.GetUtcNow(); _log = Context.GetLogger(); + _handle = new SessionPipelineHandle(pipeline, _log, "webhook-exec"); Context.SetReceiveTimeout(TimeSpan.FromSeconds(_config.ExecutionTimeoutSeconds)); @@ -74,35 +66,27 @@ private async Task InitializeAsync() { try { - _materializer = Context.Materializer(namePrefix: "webhook-exec"); - - var materialized = await _pipeline.CreateAsync(_invocation.SessionId, new SessionPipelineOptions - { - ChannelType = ChannelType.Webhook, - DefaultAudience = _invocation.Route.Config.Audience, - DefaultBoundary = SecurityPolicyDefaults.ResolveBoundaryFromAudience(_invocation.Route.Config.Audience), - DefaultPrincipal = PrincipalClassification.VerifiedAutomation, - DefaultProvenance = new SourceProvenance + var self = Self; + var inputQueue = await _handle.InitializeWithQueueAsync( + Context, + _invocation.SessionId, + new SessionPipelineOptions { - TransportAuthenticity = TransportAuthenticity.Verified, - PayloadTaint = ToPayloadTaint(_invocation.Route.Config.Audience), - SourceKind = _invocation.EventType ?? _invocation.Route.Name, - SourceScope = _invocation.Route.Name + ChannelType = ChannelType.Webhook, + DefaultAudience = _invocation.Route.Config.Audience, + DefaultBoundary = SecurityPolicyDefaults.ResolveBoundaryFromAudience(_invocation.Route.Config.Audience), + DefaultPrincipal = PrincipalClassification.VerifiedAutomation, + DefaultProvenance = new SourceProvenance + { + TransportAuthenticity = TransportAuthenticity.Verified, + PayloadTaint = ToPayloadTaint(_invocation.Route.Config.Audience), + SourceKind = _invocation.EventType ?? _invocation.Route.Name, + SourceScope = _invocation.Route.Name + }, + Filter = OutputFilter.TextStreaming | OutputFilter.ToolCalls, + PromptOverlay = _invocation.Route.BuildPromptOverlay() }, - Filter = OutputFilter.TextStreaming | OutputFilter.ToolCalls, - PromptOverlay = _invocation.Route.BuildPromptOverlay() - }, materializer: _materializer); - - var self = Self; - var inputQueue = Source.Queue(8, OverflowStrategy.Backpressure) - .ToMaterialized(materialized.Input, Keep.Left) - .Run(_materializer); - - materialized.Output - .To(Sink.ForEach(output => self.Tell(new ExecutionOutput(output)))) - .Run(_materializer); - - _session = materialized; + output => self.Tell(new ExecutionOutput(output))); await inputQueue.OfferAsync(new ChannelInput { @@ -126,31 +110,19 @@ await inputQueue.OfferAsync(new ChannelInput private void HandleOutput(ExecutionOutput wrapper) { - switch (wrapper.Output) + var action = _accumulator.ProcessOutput(wrapper.Output); + switch (action) { - case TextDeltaOutput delta: - _buffer.Append(delta.Delta); - _sawTextDelta = true; - break; - - case TextOutput text: - if (!_sawTextDelta) - _buffer.Append(text.Text); - break; - - case ToolResultOutput toolResult: - TrackNotificationResult(toolResult); - break; - - case BufferFlush: - break; - - case TurnCompleted: - ReportAndStop(BuildNotifyFailureMessage() is null, BuildNotifyFailureMessage()); + case OutputAction.TurnCompleted: + { + var hasNotify = !string.IsNullOrWhiteSpace(_invocation.Route.BuildDefaultNotifyInstructions()) + || !string.IsNullOrWhiteSpace(_invocation.Route.Config.NotifyInstructions); + var failureMsg = _accumulator.BuildNotifyFailureMessage(hasNotify, _invocation.Route.Config.NotifyPolicy); + ReportAndStop(failureMsg is null, failureMsg); break; - - case ErrorOutput err: - ReportAndStop(false, err.Message); + } + case OutputAction.Error: + ReportAndStop(false, _accumulator.LastErrorMessage); break; } } @@ -173,52 +145,11 @@ private void ReportAndStop(bool success, string? errorMessage) Context.Stop(Self); } - private void TrackNotificationResult(ToolResultOutput toolResult) - { - if (!string.Equals(toolResult.ToolName, "send_slack_message", StringComparison.Ordinal)) - return; - - _notifyAttempted = true; - var result = toolResult.Result?.Trim() ?? string.Empty; - if (result.StartsWith("Error", StringComparison.OrdinalIgnoreCase)) - { - _notifyFailed = true; - _notifyFailureDetail = result; - return; - } - - _notifyFailed = false; - _notifyFailureDetail = null; - } - - private string? BuildNotifyFailureMessage() - { - if (string.IsNullOrWhiteSpace(_invocation.Route.BuildDefaultNotifyInstructions()) - && string.IsNullOrWhiteSpace(_invocation.Route.Config.NotifyInstructions)) - { - return null; - } - - if (!_notifyAttempted) - { - if (_invocation.Route.Config.NotifyPolicy == NotificationPolicy.Conditional) - return null; - - return "Notification instructions were provided but no notification tool was invoked."; - } - - if (_notifyFailed) - return _notifyFailureDetail ?? "Notification tool returned an unspecified error."; - - return null; - } - protected override void PostStop() { try { - _session?.DisposeAsync().AsTask().GetAwaiter().GetResult(); - _materializer?.Dispose(); + _handle.Dispose(); } catch (Exception ex) { diff --git a/src/Netclaw.Daemon/Webhooks/WebhookRouteCatalog.cs b/src/Netclaw.Daemon/Webhooks/WebhookRouteCatalog.cs index 72779a3f..fb7c5b92 100644 --- a/src/Netclaw.Daemon/Webhooks/WebhookRouteCatalog.cs +++ b/src/Netclaw.Daemon/Webhooks/WebhookRouteCatalog.cs @@ -236,22 +236,19 @@ private void RemoveRoute(string routeName, string reason, string filePath, bool if (!emitAlert) return; - _notificationSink.Emit(new OperationalAlert - { - AlertId = Guid.NewGuid().ToString("N")[..12], - Type = "webhook.route.invalid", - Category = AlertType.WebhookRouteInvalid, - Summary = $"Webhook route '{routeName}' is unavailable: {reason}", - Timestamp = _timeProvider.GetUtcNow(), - Severity = "warning", - Source = routeName, - Context = new Dictionary + _notificationSink.Emit(OperationalAlert.Create( + _timeProvider, + "webhook.route.invalid", + AlertType.WebhookRouteInvalid, + $"Webhook route '{routeName}' is unavailable: {reason}", + AlertSeverity.Warning, + source: routeName, + context: new Dictionary { ["route"] = routeName, ["file"] = filePath, ["reason"] = reason, - } - }); + })); } private string GetRouteFilePath(string routeName)