diff --git a/src/Netclaw.Actors.Tests/Channels/ExecutionOutputAccumulatorTests.cs b/src/Netclaw.Actors.Tests/Channels/ExecutionOutputAccumulatorTests.cs
new file mode 100644
index 00000000..bbc2acf1
--- /dev/null
+++ b/src/Netclaw.Actors.Tests/Channels/ExecutionOutputAccumulatorTests.cs
@@ -0,0 +1,203 @@
+using Netclaw.Actors.Channels;
+using Netclaw.Actors.Protocol;
+using Netclaw.Configuration;
+using Netclaw.Tools;
+using Xunit;
+
+namespace Netclaw.Actors.Tests.Channels;
+
+public sealed class ExecutionOutputAccumulatorTests
+{
+ private static readonly SessionId TestSessionId = new("test/session");
+ private static readonly ToolName TestNotifyTool = new("send_slack_message");
+
+ [Fact]
+ public void TextDeltaOutput_accumulates_text()
+ {
+ var acc = new ExecutionOutputAccumulator(TestNotifyTool);
+
+ var action1 = acc.ProcessOutput(new TextDeltaOutput { SessionId = TestSessionId, Delta = "Hello " });
+ var action2 = acc.ProcessOutput(new TextDeltaOutput { SessionId = TestSessionId, Delta = "world" });
+
+ Assert.Equal(OutputAction.Continue, action1);
+ Assert.Equal(OutputAction.Continue, action2);
+ Assert.Equal("Hello world", acc.GetAccumulatedText());
+ }
+
+ [Fact]
+ public void TextOutput_accumulates_when_no_prior_delta()
+ {
+ var acc = new ExecutionOutputAccumulator(TestNotifyTool);
+
+ acc.ProcessOutput(new TextOutput { SessionId = TestSessionId, Text = "Full text" });
+
+ Assert.Equal("Full text", acc.GetAccumulatedText());
+ }
+
+ [Fact]
+ public void TextOutput_ignored_after_TextDeltaOutput()
+ {
+ var acc = new ExecutionOutputAccumulator(TestNotifyTool);
+
+ acc.ProcessOutput(new TextDeltaOutput { SessionId = TestSessionId, Delta = "streamed" });
+ acc.ProcessOutput(new TextOutput { SessionId = TestSessionId, Text = "assembled" });
+
+ Assert.Equal("streamed", acc.GetAccumulatedText());
+ }
+
+ [Fact]
+ public void TurnCompleted_returns_TurnCompleted_action()
+ {
+ var acc = new ExecutionOutputAccumulator(TestNotifyTool);
+
+ var action = acc.ProcessOutput(new TurnCompleted { SessionId = TestSessionId, TurnNumber = 1 });
+
+ Assert.Equal(OutputAction.TurnCompleted, action);
+ }
+
+ [Fact]
+ public void ErrorOutput_returns_Error_action_and_stores_details()
+ {
+ var acc = new ExecutionOutputAccumulator(TestNotifyTool);
+ var cause = new InvalidOperationException("inner");
+
+ var action = acc.ProcessOutput(new ErrorOutput
+ {
+ SessionId = TestSessionId,
+ Message = "Something failed",
+ Category = ErrorCategory.ProviderFailure,
+ Cause = cause
+ });
+
+ Assert.Equal(OutputAction.Error, action);
+ Assert.Equal("Something failed", acc.LastErrorMessage);
+ Assert.Equal(ErrorCategory.ProviderFailure, acc.LastErrorCategory);
+ Assert.Same(cause, acc.LastErrorCause);
+ }
+
+ [Fact]
+ public void BufferFlush_returns_Continue()
+ {
+ var acc = new ExecutionOutputAccumulator(TestNotifyTool);
+
+ var action = acc.ProcessOutput(new BufferFlush { SessionId = TestSessionId });
+
+ Assert.Equal(OutputAction.Continue, action);
+ }
+
+ [Fact]
+ public void Tracks_successful_notification_tool_result()
+ {
+ var acc = new ExecutionOutputAccumulator(TestNotifyTool);
+
+ acc.ProcessOutput(new ToolResultOutput
+ {
+ SessionId = TestSessionId,
+ CallId = "call-1",
+ ToolName = "send_slack_message",
+ Result = "Message sent to channel C1."
+ });
+
+ Assert.True(acc.NotifyAttempted);
+ Assert.False(acc.NotifyFailed);
+ }
+
+ [Fact]
+ public void Tracks_failed_notification_tool_result()
+ {
+ var acc = new ExecutionOutputAccumulator(TestNotifyTool);
+
+ acc.ProcessOutput(new ToolResultOutput
+ {
+ SessionId = TestSessionId,
+ CallId = "call-2",
+ ToolName = "send_slack_message",
+ Result = "Error: channel not found"
+ });
+
+ Assert.True(acc.NotifyAttempted);
+ Assert.True(acc.NotifyFailed);
+ }
+
+ [Fact]
+ public void Ignores_non_notification_tool_results()
+ {
+ var acc = new ExecutionOutputAccumulator(TestNotifyTool);
+
+ acc.ProcessOutput(new ToolResultOutput
+ {
+ SessionId = TestSessionId,
+ CallId = "call-3",
+ ToolName = "web_search",
+ Result = "Found 5 results"
+ });
+
+ Assert.False(acc.NotifyAttempted);
+ }
+
+ // ── BuildNotifyFailureMessage tests ──────────────────────────────────────
+
+ [Fact]
+ public void No_failure_when_no_notify_instructions()
+ {
+ var acc = new ExecutionOutputAccumulator(TestNotifyTool);
+
+ var result = acc.BuildNotifyFailureMessage(hasNotifyInstructions: false, NotificationPolicy.Required);
+
+ Assert.Null(result);
+ }
+
+ [Fact]
+ public void Required_policy_fails_when_no_notification_sent()
+ {
+ var acc = new ExecutionOutputAccumulator(TestNotifyTool);
+
+ var result = acc.BuildNotifyFailureMessage(hasNotifyInstructions: true, NotificationPolicy.Required);
+
+ Assert.Contains("no notification tool was invoked", result);
+ }
+
+ [Fact]
+ public void Conditional_policy_succeeds_when_no_notification_sent()
+ {
+ var acc = new ExecutionOutputAccumulator(TestNotifyTool);
+
+ var result = acc.BuildNotifyFailureMessage(hasNotifyInstructions: true, NotificationPolicy.Conditional);
+
+ Assert.Null(result);
+ }
+
+ [Fact]
+ public void Succeeds_when_notification_attempted_and_succeeded()
+ {
+ var acc = new ExecutionOutputAccumulator(TestNotifyTool);
+ acc.ProcessOutput(new ToolResultOutput
+ {
+ SessionId = TestSessionId,
+ CallId = "call-ok",
+ ToolName = "send_slack_message",
+ Result = "Message sent."
+ });
+
+ var result = acc.BuildNotifyFailureMessage(hasNotifyInstructions: true, NotificationPolicy.Required);
+
+ Assert.Null(result);
+ }
+
+ [Fact]
+ public void Fails_when_notification_attempted_and_errored()
+ {
+ var acc = new ExecutionOutputAccumulator(TestNotifyTool);
+ acc.ProcessOutput(new ToolResultOutput
+ {
+ SessionId = TestSessionId,
+ CallId = "call-err",
+ ToolName = "send_slack_message",
+ Result = "Error: channel not found"
+ });
+
+ var result = acc.BuildNotifyFailureMessage(hasNotifyInstructions: true, NotificationPolicy.Conditional);
+
+ Assert.Contains("channel not found", result);
+ }
+}
diff --git a/src/Netclaw.Actors.Tests/Channels/SlackActorHierarchyTests.cs b/src/Netclaw.Actors.Tests/Channels/SlackActorHierarchyTests.cs
index 028a0a20..d11a7b10 100644
--- a/src/Netclaw.Actors.Tests/Channels/SlackActorHierarchyTests.cs
+++ b/src/Netclaw.Actors.Tests/Channels/SlackActorHierarchyTests.cs
@@ -1,4 +1,5 @@
using Akka.Actor;
+using Akka.Configuration;
using Akka.Hosting;
using Akka.Hosting.TestKit;
using Microsoft.Extensions.DependencyInjection;
@@ -15,6 +16,12 @@ namespace Netclaw.Actors.Tests.Channels;
public sealed class SlackActorHierarchyTests(ITestOutputHelper output) : TestKit(output: output)
{
+ // Raise the default ExpectMsg timeout from 3s to 5s to prevent flaky failures
+ // under CI ThreadPool pressure (multiple test classes spin up parallel IHost +
+ // ActorSystem instances, competing for ThreadPool threads).
+ protected override Config? Config =>
+ ConfigurationFactory.ParseString("akka.test.default-timeout = 5s");
+
protected override void ConfigureServices(HostBuilderContext context, IServiceCollection services)
{
}
diff --git a/src/Netclaw.Actors.Tests/Channels/TestHelpers/FailingSessionPipeline.cs b/src/Netclaw.Actors.Tests/Channels/TestHelpers/FailingSessionPipeline.cs
new file mode 100644
index 00000000..dfac604c
--- /dev/null
+++ b/src/Netclaw.Actors.Tests/Channels/TestHelpers/FailingSessionPipeline.cs
@@ -0,0 +1,23 @@
+using Akka.Streams;
+using Netclaw.Actors.Channels;
+using Netclaw.Actors.Protocol;
+using Netclaw.Configuration;
+
+namespace Netclaw.Actors.Tests.Channels.TestHelpers;
+
+///
+/// Fake that throws a pre-configured exception
+/// on . Used to test initialization failure paths.
+///
+internal sealed class FailingSessionPipeline(Exception exception) : ISessionPipeline
+{
+ public Task CreateAsync(
+ SessionId sessionId,
+ SessionPipelineOptions options,
+ IMaterializer? materializer = null,
+ CancellationToken cancellationToken = default) =>
+ throw exception;
+
+ public Task SendFeedbackAsync(IWithSessionId feedback, CancellationToken ct = default) =>
+ Task.CompletedTask;
+}
diff --git a/src/Netclaw.Actors.Tests/Channels/TestHelpers/ScriptedSessionPipeline.cs b/src/Netclaw.Actors.Tests/Channels/TestHelpers/ScriptedSessionPipeline.cs
new file mode 100644
index 00000000..61efad3a
--- /dev/null
+++ b/src/Netclaw.Actors.Tests/Channels/TestHelpers/ScriptedSessionPipeline.cs
@@ -0,0 +1,41 @@
+using Akka;
+using Akka.Streams;
+using Akka.Streams.Dsl;
+using Netclaw.Actors.Channels;
+using Netclaw.Actors.Protocol;
+using Netclaw.Configuration;
+
+namespace Netclaw.Actors.Tests.Channels.TestHelpers;
+
+///
+/// Fake that replays a scripted sequence of
+/// events. Input is discarded. Used by
+/// execution actor tests (reminders, webhooks) and pipeline handle tests.
+///
+internal sealed class ScriptedSessionPipeline(
+ Func> outputFactory) : ISessionPipeline
+{
+ public SessionPipelineOptions? CapturedOptions { get; private set; }
+
+ public Task CreateAsync(
+ SessionId sessionId,
+ SessionPipelineOptions options,
+ IMaterializer? materializer = null,
+ CancellationToken cancellationToken = default)
+ {
+ CapturedOptions = options;
+
+ var killSwitch = KillSwitches.Shared($"scripted-{sessionId.Value}");
+
+ var input = Sink.Ignore()
+ .MapMaterializedValue(_ => NotUsed.Instance);
+
+ var output = Source.From(outputFactory(sessionId).ToList())
+ .Via(killSwitch.Flow());
+
+ return Task.FromResult(new MaterializedSession(input, output, killSwitch));
+ }
+
+ public Task SendFeedbackAsync(IWithSessionId feedback, CancellationToken ct = default) =>
+ Task.CompletedTask;
+}
diff --git a/src/Netclaw.Actors.Tests/Sessions/ErrorCorrelationTests.cs b/src/Netclaw.Actors.Tests/Sessions/ErrorCorrelationTests.cs
index f9023e31..494ce98b 100644
--- a/src/Netclaw.Actors.Tests/Sessions/ErrorCorrelationTests.cs
+++ b/src/Netclaw.Actors.Tests/Sessions/ErrorCorrelationTests.cs
@@ -1,4 +1,3 @@
-using System.Runtime.CompilerServices;
using Akka.Actor;
using Akka.Hosting;
using Akka.Hosting.TestKit;
@@ -178,18 +177,17 @@ public Task GetResponseAsync(
CancellationToken cancellationToken = default)
=> Task.FromException(new NotSupportedException("Streaming path only."));
- public async IAsyncEnumerable GetStreamingResponseAsync(
+ public IAsyncEnumerable GetStreamingResponseAsync(
IEnumerable messages,
ChatOptions? options = null,
- [EnumeratorCancellation] CancellationToken cancellationToken = default)
- {
- await Task.Yield();
- // GetException() returns non-null, but the compiler cannot prove it,
- // so the null check keeps yield break reachable and avoids CS0162.
- var ex = GetException();
- if (ex is not null) throw ex;
- yield break;
- }
+ CancellationToken cancellationToken = default)
+ // Throw synchronously so the LlmCallFailed message is enqueued in
+ // the actor's mailbox before the actor finishes processing the
+ // SendUserMessage. Using Task.Yield() here defers the exception to
+ // the thread pool, creating a race where LlmCallFailed delivery
+ // depends on thread pool scheduling and can exceed ExpectMsgAsync
+ // timeouts under load.
+ => throw GetException();
private Exception GetException() => ThrowTimeout
? (Exception)new TimeoutException("Simulated provider timeout")
diff --git a/src/Netclaw.Actors/Channels/ChannelPipeline.cs b/src/Netclaw.Actors/Channels/ChannelPipeline.cs
index b4320e8d..ab7f38cc 100644
--- a/src/Netclaw.Actors/Channels/ChannelPipeline.cs
+++ b/src/Netclaw.Actors/Channels/ChannelPipeline.cs
@@ -302,7 +302,7 @@ private static SendUserMessage MapToCommand(
NetclawPaths paths)
{
var turnId = string.IsNullOrWhiteSpace(input.MessageId)
- ? Guid.NewGuid().ToString("N")[..8]
+ ? IdGen.ShortId()
: input.MessageId!;
var textParts = input.Contents.OfType().Select(t => t.Text);
diff --git a/src/Netclaw.Actors/Channels/ExecutionOutputAccumulator.cs b/src/Netclaw.Actors/Channels/ExecutionOutputAccumulator.cs
new file mode 100644
index 00000000..065979a0
--- /dev/null
+++ b/src/Netclaw.Actors/Channels/ExecutionOutputAccumulator.cs
@@ -0,0 +1,173 @@
+using System.Text;
+using Netclaw.Actors.Protocol;
+using Netclaw.Configuration;
+using Netclaw.Tools;
+
+namespace Netclaw.Actors.Channels;
+
+///
+/// What the caller should do after processing an output via
+/// .
+///
+public enum OutputAction
+{
+ /// Output was accumulated; no caller action needed.
+ Continue,
+
+ /// Turn completed; caller should finalize and stop.
+ TurnCompleted,
+
+ /// Error received; caller should report failure and stop.
+ Error
+}
+
+///
+/// Tracks output accumulation and notification result state for fire-and-forget
+/// execution actors (reminders, webhooks). Pure C# �� no Akka dependency.
+///
+public sealed class ExecutionOutputAccumulator
+{
+ private readonly ToolName _notificationToolName;
+ private readonly Action? _onNotifyTracked;
+ private readonly StringBuilder _buffer = new();
+ private bool _sawTextDelta;
+ private bool _notifyAttempted;
+ private bool _notifyFailed;
+ private string? _notifyFailureDetail;
+
+ ///
+ /// Creates an accumulator.
+ ///
+ ///
+ /// The tool whose results are tracked for notification success/failure.
+ /// The accumulator has no channel knowledge — callers supply the tool they care about.
+ ///
+ ///
+ /// Optional callback invoked when a matching tool result is processed.
+ /// Parameters: (toolName, callId, succeeded).
+ ///
+ public ExecutionOutputAccumulator(
+ ToolName notificationToolName,
+ Action? onNotifyTracked = null)
+ {
+ _notificationToolName = notificationToolName;
+ _onNotifyTracked = onNotifyTracked;
+ }
+
+ ///
+ /// The error message from the most recent ,
+ /// or null if no error has been received.
+ ///
+ public string? LastErrorMessage { get; private set; }
+
+ ///
+ /// The underlying exception from the most recent ,
+ /// or null if no error has been received or the error had no cause.
+ ///
+ public Exception? LastErrorCause { get; private set; }
+
+ ///
+ /// The of the most recent error, if any.
+ ///
+ public ErrorCategory? LastErrorCategory { get; private set; }
+
+ ///
+ /// Returns the accumulated text output, trimmed.
+ ///
+ public string GetAccumulatedText() => _buffer.ToString().Trim();
+
+ ///
+ /// Whether a notification tool was invoked during this execution.
+ ///
+ public bool NotifyAttempted => _notifyAttempted;
+
+ ///
+ /// Whether the most recent notification attempt failed.
+ ///
+ public bool NotifyFailed => _notifyFailed;
+
+ ///
+ /// Processes a and returns the action the caller should take.
+ ///
+ public OutputAction ProcessOutput(SessionOutput output)
+ {
+ switch (output)
+ {
+ case TextDeltaOutput delta:
+ _buffer.Append(delta.Delta);
+ _sawTextDelta = true;
+ return OutputAction.Continue;
+
+ case TextOutput text:
+ if (!_sawTextDelta)
+ _buffer.Append(text.Text);
+ return OutputAction.Continue;
+
+ case ToolResultOutput toolResult:
+ TrackNotificationResult(toolResult);
+ return OutputAction.Continue;
+
+ case BufferFlush:
+ return OutputAction.Continue;
+
+ case TurnCompleted:
+ return OutputAction.TurnCompleted;
+
+ case ErrorOutput err:
+ LastErrorMessage = err.Message;
+ LastErrorCause = err.Cause;
+ LastErrorCategory = err.Category;
+ return OutputAction.Error;
+
+ default:
+ return OutputAction.Continue;
+ }
+ }
+
+ ///
+ /// Evaluates notification policy to determine if the execution should be
+ /// considered a failure due to notification issues. Returns null on
+ /// success, or an error message string describing the notification failure.
+ ///
+ /// Whether notification instructions were configured.
+ /// The notification policy for this execution.
+ public string? BuildNotifyFailureMessage(bool hasNotifyInstructions, NotificationPolicy notifyPolicy)
+ {
+ if (!hasNotifyInstructions)
+ return null;
+
+ if (!_notifyAttempted)
+ {
+ if (notifyPolicy == NotificationPolicy.Conditional)
+ return null;
+
+ return "Notification instructions were provided but no notification tool was invoked.";
+ }
+
+ if (_notifyFailed)
+ return _notifyFailureDetail ?? "Notification tool returned an unspecified error.";
+
+ return null;
+ }
+
+ private void TrackNotificationResult(ToolResultOutput toolResult)
+ {
+ if (!string.Equals(toolResult.ToolName, _notificationToolName.Value, StringComparison.Ordinal))
+ return;
+
+ _notifyAttempted = true;
+
+ var result = toolResult.Result?.Trim() ?? string.Empty;
+ if (result.StartsWith("Error", StringComparison.OrdinalIgnoreCase))
+ {
+ _notifyFailed = true;
+ _notifyFailureDetail = result;
+ _onNotifyTracked?.Invoke(toolResult.ToolName, toolResult.CallId, false);
+ return;
+ }
+
+ _notifyFailed = false;
+ _notifyFailureDetail = null;
+ _onNotifyTracked?.Invoke(toolResult.ToolName, toolResult.CallId, true);
+ }
+}
diff --git a/src/Netclaw.Actors/Channels/SessionPipelineHandle.cs b/src/Netclaw.Actors/Channels/SessionPipelineHandle.cs
new file mode 100644
index 00000000..b59c75a1
--- /dev/null
+++ b/src/Netclaw.Actors/Channels/SessionPipelineHandle.cs
@@ -0,0 +1,221 @@
+using System.Threading.Channels;
+using Akka.Actor;
+using Akka.Event;
+using Akka.Streams;
+using Akka.Streams.Dsl;
+using Netclaw.Actors.Protocol;
+using Netclaw.Configuration;
+
+namespace Netclaw.Actors.Channels;
+
+///
+/// Manages the lifecycle of a materialized session pipeline on behalf of an
+/// owning actor. Not thread-safe — designed for use within a single actor context
+/// (no concurrent access). Supports two modes:
+///
+/// - Long-lived (with reinitialization) for binding actors (Slack, SignalR)
+/// - Short-lived (fire-and-forget) for execution actors (Reminders, Webhooks)
+///
+///
+public sealed class SessionPipelineHandle
+{
+ private readonly ISessionPipeline _pipeline;
+ private readonly ILoggingAdapter _log;
+ private readonly string _materializerNamePrefix;
+
+ private ActorMaterializer? _materializer;
+ private MaterializedSession? _session;
+ private ChannelWriter? _inputQueue;
+ private int _pipelineGeneration;
+ private bool _isReinitializing;
+
+ // Stored from first InitializeWithChannelAsync for reinit
+ private IActorContext? _storedContext;
+ private SessionId? _storedSessionId;
+ private SessionPipelineOptions? _storedOptions;
+ private Action? _storedOnOutput;
+ private Action? _storedOnStreamTerminated;
+
+ public SessionPipelineHandle(
+ ISessionPipeline pipeline,
+ ILoggingAdapter log,
+ string materializerNamePrefix)
+ {
+ _pipeline = pipeline;
+ _log = log;
+ _materializerNamePrefix = materializerNamePrefix;
+ }
+
+ /// The current pipeline generation, for OutputStreamTerminated filtering.
+ public int Generation => _pipelineGeneration;
+
+ /// The for long-lived actors to write input.
+ /// Null if not initialized or if initialized via queue mode.
+ public ChannelWriter? InputQueue => _inputQueue;
+
+ /// Whether the handle has been initialized (session is not null).
+ public bool IsInitialized => _session is not null;
+
+ ///
+ /// Idempotent pipeline creation for long-lived actors that use
+ /// for ongoing input. Stores all parameters for use by .
+ ///
+ public async Task> InitializeWithChannelAsync(
+ IActorContext context,
+ SessionId sessionId,
+ SessionPipelineOptions options,
+ Action onOutput,
+ Action onStreamTerminated,
+ CancellationToken cancellationToken = default)
+ {
+ if (_session is not null)
+ return _inputQueue!;
+
+ // Store for reinit
+ _storedContext = context;
+ _storedSessionId = sessionId;
+ _storedOptions = options;
+ _storedOnOutput = onOutput;
+ _storedOnStreamTerminated = onStreamTerminated;
+
+ _log.Info("Initializing {0} session pipeline", _materializerNamePrefix);
+
+ _materializer = context.Materializer(namePrefix: _materializerNamePrefix);
+
+ var materialized = await _pipeline.CreateAsync(
+ sessionId, options,
+ materializer: _materializer,
+ cancellationToken: cancellationToken);
+
+ var inputQueue = Source.Channel(512, true)
+ .ToMaterialized(materialized.Input, Keep.Left)
+ .Run(_materializer);
+
+ var generation = ++_pipelineGeneration;
+ var outputCompletion = materialized.Output
+ .ToMaterialized(
+ Sink.ForEach(onOutput),
+ Keep.Right)
+ .Run(_materializer);
+
+ _ = outputCompletion.ContinueWith(t =>
+ {
+ var cause = t.IsFaulted ? t.Exception?.GetBaseException() : null;
+ onStreamTerminated(generation, cause);
+ },
+ CancellationToken.None,
+ TaskContinuationOptions.ExecuteSynchronously,
+ TaskScheduler.Default);
+
+ _session = materialized;
+ _inputQueue = inputQueue;
+
+ _log.Info("{0} session pipeline initialized", _materializerNamePrefix);
+ return inputQueue;
+ }
+
+ ///
+ /// Pipeline creation for fire-and-forget execution actors that use
+ /// , offer input once, and complete.
+ /// Does not wire stream-terminated detection (the actor stops on TurnCompleted/ErrorOutput).
+ ///
+ public async Task> InitializeWithQueueAsync(
+ IActorContext context,
+ SessionId sessionId,
+ SessionPipelineOptions options,
+ Action onOutput,
+ CancellationToken cancellationToken = default)
+ {
+ _log.Info("Initializing {0} execution pipeline", _materializerNamePrefix);
+
+ _materializer = context.Materializer(namePrefix: _materializerNamePrefix);
+
+ var materialized = await _pipeline.CreateAsync(
+ sessionId, options,
+ materializer: _materializer,
+ cancellationToken: cancellationToken);
+
+ var inputQueue = Source.Queue(8, OverflowStrategy.Backpressure)
+ .ToMaterialized(materialized.Input, Keep.Left)
+ .Run(_materializer);
+
+ materialized.Output
+ .To(Sink.ForEach(onOutput))
+ .Run(_materializer);
+
+ _session = materialized;
+
+ _log.Info("{0} execution pipeline initialized", _materializerNamePrefix);
+ return inputQueue;
+ }
+
+ ///
+ /// Tears down the current pipeline and re-initializes using the parameters
+ /// stored from the original call.
+ /// Guards against concurrent reinitialization. On failure, invokes
+ /// .
+ ///
+ public async Task ReinitializeAsync(string reason, Action onReinitFailed)
+ {
+ if (_isReinitializing)
+ return;
+
+ if (_storedContext is null || _storedSessionId is null || _storedOptions is null
+ || _storedOnOutput is null || _storedOnStreamTerminated is null)
+ {
+ _log.Warning("Cannot reinitialize {0} pipeline: not yet initialized via channel mode", _materializerNamePrefix);
+ return;
+ }
+
+ _isReinitializing = true;
+ try
+ {
+ _log.Warning("Reinitializing {0} session pipeline: {1}", _materializerNamePrefix, reason);
+
+ _inputQueue?.TryComplete();
+ _inputQueue = null;
+
+ if (_session is not null)
+ {
+ await _session.DisposeAsync();
+ _session = null;
+ }
+
+ _materializer?.Dispose();
+ _materializer = null;
+
+ await InitializeWithChannelAsync(
+ _storedContext, _storedSessionId.Value, _storedOptions,
+ _storedOnOutput, _storedOnStreamTerminated);
+ }
+ catch (Exception ex)
+ {
+ _log.Error(ex, "{0} pipeline reinitialization failed; scheduling retry", _materializerNamePrefix);
+ onReinitFailed();
+ }
+ finally
+ {
+ _isReinitializing = false;
+ }
+ }
+
+ ///
+ /// Synchronous cleanup for actor PostStop. Completes input queue,
+ /// disposes session and materializer.
+ ///
+ public void Dispose()
+ {
+ _inputQueue?.TryComplete();
+
+ try
+ {
+ _session?.DisposeAsync().AsTask().GetAwaiter().GetResult();
+ }
+ catch (Exception ex)
+ {
+ _log.Debug(ex, "Failed to dispose {0} session during cleanup", _materializerNamePrefix);
+ }
+
+ _materializer?.Dispose();
+ }
+}
diff --git a/src/Netclaw.Actors/Reminders/ReminderExecutionActor.cs b/src/Netclaw.Actors/Reminders/ReminderExecutionActor.cs
index 85afbeaf..8226c80b 100644
--- a/src/Netclaw.Actors/Reminders/ReminderExecutionActor.cs
+++ b/src/Netclaw.Actors/Reminders/ReminderExecutionActor.cs
@@ -1,15 +1,13 @@
-using System.Text;
using Akka.Actor;
using Akka.Event;
using Akka.Hosting;
using Akka.Reminders;
-using Akka.Streams;
-using Akka.Streams.Dsl;
using Microsoft.Extensions.AI;
using Netclaw.Actors.Channels;
using Netclaw.Actors.Hosting;
using Netclaw.Actors.Protocol;
using Netclaw.Configuration;
+using Netclaw.Tools;
namespace Netclaw.Actors.Reminders;
@@ -28,7 +26,6 @@ internal sealed class ReminderExecutionActor : ReceiveActor
private readonly Guid _executionId;
private readonly ReminderDefinition _definition;
- private readonly ISessionPipeline _pipeline;
private readonly ReminderHistoryStore _historyStore;
private readonly TimeProvider _timeProvider;
private readonly ReminderEnvelope? _envelope;
@@ -36,18 +33,12 @@ internal sealed class ReminderExecutionActor : ReceiveActor
private readonly DateTimeOffset _dispatchedAt;
private IReminderClient? _reminderClient;
- private readonly StringBuilder _buffer = new();
- private bool _sawTextDelta;
+ private readonly SessionPipelineHandle _handle;
+ private readonly ExecutionOutputAccumulator _accumulator;
private bool _completed;
- private bool _notifyAttempted;
- private bool _notifyFailed;
- private string? _notifyFailureDetail;
private string? _sessionIdValue;
private HistoryRecord? _pendingHistory;
- private ActorMaterializer? _materializer;
- private MaterializedSession? _session;
-
private bool IsModeB => _envelope is not null;
public static Props CreateProps(
@@ -69,12 +60,21 @@ public ReminderExecutionActor(
{
_executionId = executionId;
_definition = definition;
- _pipeline = pipeline;
_historyStore = historyStore;
_timeProvider = timeProvider;
_envelope = envelope;
_dispatchedAt = timeProvider.GetUtcNow();
_log = Context.GetLogger();
+ _handle = new SessionPipelineHandle(pipeline, _log, "reminder-exec");
+ _accumulator = new ExecutionOutputAccumulator(new ToolName("send_slack_message"), (tool, callId, succeeded) =>
+ {
+ if (succeeded)
+ _log.Info("ReminderExecution NotifySucceeded: execution_id={0} reminder_id={1} tool={2} call_id={3}",
+ _executionId, _definition.Id, tool, callId);
+ else
+ _log.Warning("ReminderExecution NotifyFailed: execution_id={0} reminder_id={1} tool={2} call_id={3}",
+ _executionId, _definition.Id, tool, callId);
+ });
Context.SetReceiveTimeout(TimeSpan.FromSeconds(ExecutionTimeoutSeconds));
@@ -119,34 +119,25 @@ private async Task InitializeAsync()
_log.Info(
$"ReminderExecution Initialized: execution_id={_executionId} reminder_id={_definition.Id} session_id={sessionId.Value} audience={audience} source=stored-definition");
- _materializer = Context.Materializer(namePrefix: "reminder-exec");
-
- var materialized = await _pipeline.CreateAsync(sessionId, new SessionPipelineOptions
- {
- ChannelType = Channels.ChannelType.Reminder,
- DefaultAudience = audience,
- DefaultBoundary = SecurityPolicyDefaults.LocalDaemonBoundary,
- DefaultPrincipal = PrincipalClassification.VerifiedAutomation,
- DefaultProvenance = new SourceProvenance
+ var self = Self;
+ var inputQueue = await _handle.InitializeWithQueueAsync(
+ Context,
+ sessionId,
+ new SessionPipelineOptions
{
- TransportAuthenticity = TransportAuthenticity.LocalProcess,
- PayloadTaint = PayloadTaint.Trusted,
- SourceKind = "reminder"
+ ChannelType = Channels.ChannelType.Reminder,
+ DefaultAudience = audience,
+ DefaultBoundary = SecurityPolicyDefaults.LocalDaemonBoundary,
+ DefaultPrincipal = PrincipalClassification.VerifiedAutomation,
+ DefaultProvenance = new SourceProvenance
+ {
+ TransportAuthenticity = TransportAuthenticity.LocalProcess,
+ PayloadTaint = PayloadTaint.Trusted,
+ SourceKind = "reminder"
+ },
+ Filter = OutputFilter.TextStreaming | OutputFilter.ToolCalls
},
- Filter = OutputFilter.TextStreaming | OutputFilter.ToolCalls
- }, materializer: _materializer);
-
- var self = Self;
-
- var inputQueue = Source.Queue(8, OverflowStrategy.Backpressure)
- .ToMaterialized(materialized.Input, Keep.Left)
- .Run(_materializer);
-
- materialized.Output
- .To(Sink.ForEach(output => self.Tell(new ExecutionOutput(output))))
- .Run(_materializer);
-
- _session = materialized;
+ output => self.Tell(new ExecutionOutput(output)));
var prompt = BuildPrompt(_definition);
@@ -300,48 +291,34 @@ private static string BuildPrompt(ReminderDefinition definition)
private void HandleOutput(ExecutionOutput wrapper)
{
- switch (wrapper.Output)
+ var action = _accumulator.ProcessOutput(wrapper.Output);
+ switch (action)
{
- case TextDeltaOutput delta:
- _buffer.Append(delta.Delta);
- _sawTextDelta = true;
- break;
-
- case TextOutput text:
- if (!_sawTextDelta)
- _buffer.Append(text.Text);
- break;
-
- case ToolResultOutput toolResult:
- TrackNotificationResult(toolResult);
- break;
-
- case BufferFlush:
- // Reminder accumulates full text for final result -- no mid-turn flush needed.
- break;
-
- case TurnCompleted:
- var result = _buffer.ToString().Trim();
- var notifyFailureMessage = BuildNotifyFailureMessage();
+ case OutputAction.TurnCompleted:
+ {
+ var result = _accumulator.GetAccumulatedText();
+ var notifyFailureMessage = _accumulator.BuildNotifyFailureMessage(
+ !string.IsNullOrWhiteSpace(_definition.NotifyInstructions),
+ _definition.NotifyPolicy);
var success = notifyFailureMessage is null;
_log.Info(
- $"ReminderExecution Completed: execution_id={_executionId} reminder_id={_definition.Id} title={_definition.Title} success={success} output_length={result.Length} notify_attempted={_notifyAttempted} notify_failed={_notifyFailed} dispatched_at={_dispatchedAt} completed_at={_timeProvider.GetUtcNow()}");
-
- if (string.IsNullOrWhiteSpace(result))
- result = $"(Reminder '{_definition.Title}' executed but produced no output)";
+ $"ReminderExecution Completed: execution_id={_executionId} reminder_id={_definition.Id} title={_definition.Title} success={success} output_length={result.Length} notify_attempted={_accumulator.NotifyAttempted} notify_failed={_accumulator.NotifyFailed} dispatched_at={_dispatchedAt} completed_at={_timeProvider.GetUtcNow()}");
ReportAndStop(success, notifyFailureMessage);
break;
+ }
- case ErrorOutput err:
+ case OutputAction.Error:
+ {
var completedAt = _timeProvider.GetUtcNow();
- var failedMsg = $"ReminderExecution Failed: execution_id={_executionId} reminder_id={_definition.Id} title={_definition.Title} success=false error_type={err.Category} error_message={err.Message} dispatched_at={_dispatchedAt} completed_at={completedAt}";
- if (err.Cause is not null)
- _log.Error(err.Cause, "{0}\n{1}", failedMsg, err.Cause.ToString());
+ var failedMsg = $"ReminderExecution Failed: execution_id={_executionId} reminder_id={_definition.Id} title={_definition.Title} success=false error_type={_accumulator.LastErrorCategory} error_message={_accumulator.LastErrorMessage} dispatched_at={_dispatchedAt} completed_at={completedAt}";
+ if (_accumulator.LastErrorCause is not null)
+ _log.Error(_accumulator.LastErrorCause, "{0}\n{1}", failedMsg, _accumulator.LastErrorCause.ToString());
else
_log.Warning("{0}", failedMsg);
- ReportAndStop(false, err.Message);
+ ReportAndStop(false, _accumulator.LastErrorMessage);
break;
+ }
}
}
@@ -374,58 +351,6 @@ private void ReportAndStop(bool success, string? errorMessage = null)
Context.Stop(Self);
}
- private void TrackNotificationResult(ToolResultOutput toolResult)
- {
- if (!string.Equals(toolResult.ToolName, "send_slack_message", StringComparison.Ordinal))
- return;
-
- _notifyAttempted = true;
-
- var result = toolResult.Result?.Trim() ?? string.Empty;
- if (result.StartsWith("Error", StringComparison.OrdinalIgnoreCase))
- {
- _notifyFailed = true;
- _notifyFailureDetail = result;
- _log.Warning(
- "ReminderExecution NotifyFailed: execution_id={0} reminder_id={1} tool={2} call_id={3} detail={4}",
- _executionId,
- _definition.Id,
- toolResult.ToolName,
- toolResult.CallId,
- result);
- return;
- }
-
- _notifyFailed = false;
- _notifyFailureDetail = null;
- _log.Info(
- "ReminderExecution NotifySucceeded: execution_id={0} reminder_id={1} tool={2} call_id={3}",
- _executionId,
- _definition.Id,
- toolResult.ToolName,
- toolResult.CallId);
- }
-
- private string? BuildNotifyFailureMessage()
- {
- if (string.IsNullOrWhiteSpace(_definition.NotifyInstructions))
- return null;
-
- if (!_notifyAttempted)
- {
- // Conditional policy: the LLM decided nothing warranted notification — that's OK.
- if (_definition.NotifyPolicy == NotificationPolicy.Conditional)
- return null;
-
- return "Notification instructions were provided but no notification tool was invoked.";
- }
-
- if (_notifyFailed)
- return _notifyFailureDetail ?? "Notification tool returned an unspecified error.";
-
- return null;
- }
-
protected override void PostStop()
{
if (_pendingHistory is not null)
@@ -443,8 +368,7 @@ protected override void PostStop()
try
{
- _session?.DisposeAsync().AsTask().GetAwaiter().GetResult();
- _materializer?.Dispose();
+ _handle.Dispose();
}
catch (Exception ex)
{
diff --git a/src/Netclaw.Actors/Reminders/ReminderIdGenerator.cs b/src/Netclaw.Actors/Reminders/ReminderIdGenerator.cs
index 0e19b8dd..392b4c44 100644
--- a/src/Netclaw.Actors/Reminders/ReminderIdGenerator.cs
+++ b/src/Netclaw.Actors/Reminders/ReminderIdGenerator.cs
@@ -1,4 +1,5 @@
using System.Text.RegularExpressions;
+using Netclaw.Configuration;
namespace Netclaw.Actors.Reminders;
@@ -29,7 +30,7 @@ public static string Normalize(string id)
public static ReminderId Generate(string title)
{
var slug = Normalize(title);
- var suffix = Guid.NewGuid().ToString("N")[..6];
+ var suffix = IdGen.Suffix();
return new ReminderId($"{slug}-{suffix}");
}
}
diff --git a/src/Netclaw.Actors/Reminders/ReminderManagerActor.cs b/src/Netclaw.Actors/Reminders/ReminderManagerActor.cs
index b7a181b3..449b3d04 100644
--- a/src/Netclaw.Actors/Reminders/ReminderManagerActor.cs
+++ b/src/Netclaw.Actors/Reminders/ReminderManagerActor.cs
@@ -526,22 +526,19 @@ private async Task HandleExecutionCompletedAsync(ReminderExecutionCompleted comp
FailurePauseThreshold,
completed.ErrorMessage);
- _notificationSink.Emit(new OperationalAlert
- {
- AlertId = Guid.NewGuid().ToString("N")[..12],
- Type = "reminder.execution.failed",
- Category = AlertType.ReminderExecutionFailed,
- Summary = $"Reminder '{title}' execution failed: {completed.ErrorMessage}",
- Timestamp = _timeProvider.GetUtcNow(),
- Severity = "warning",
- Source = completed.Id.Value,
- Context = new Dictionary
+ _notificationSink.Emit(OperationalAlert.Create(
+ _timeProvider,
+ "reminder.execution.failed",
+ AlertType.ReminderExecutionFailed,
+ $"Reminder '{title}' execution failed: {completed.ErrorMessage}",
+ AlertSeverity.Warning,
+ source: completed.Id.Value,
+ context: new Dictionary
{
["reminderId"] = completed.Id.Value,
["title"] = title,
["error"] = completed.ErrorMessage ?? "unknown",
- }
- });
+ }));
if (count >= FailurePauseThreshold)
{
@@ -549,22 +546,19 @@ private async Task HandleExecutionCompletedAsync(ReminderExecutionCompleted comp
completed.Id.Value,
FailurePauseThreshold);
- _notificationSink.Emit(new OperationalAlert
- {
- AlertId = Guid.NewGuid().ToString("N")[..12],
- Type = "reminder.auto_disabled",
- Category = AlertType.ReminderAutoDisabled,
- Summary = $"Reminder '{title}' disabled after {count} consecutive failures",
- Timestamp = _timeProvider.GetUtcNow(),
- Severity = "critical",
- Source = completed.Id.Value,
- Context = new Dictionary
+ _notificationSink.Emit(OperationalAlert.Create(
+ _timeProvider,
+ "reminder.auto_disabled",
+ AlertType.ReminderAutoDisabled,
+ $"Reminder '{title}' disabled after {count} consecutive failures",
+ AlertSeverity.Critical,
+ source: completed.Id.Value,
+ context: new Dictionary
{
["reminderId"] = completed.Id.Value,
["title"] = title,
["failureCount"] = count.ToString(),
- }
- });
+ }));
await DisableReminderInternalAsync(completed.Id);
_failureCounts.Remove(completed.Id);
diff --git a/src/Netclaw.Actors/Sessions/LlmSessionActor.cs b/src/Netclaw.Actors/Sessions/LlmSessionActor.cs
index c296461e..ee818ff6 100644
--- a/src/Netclaw.Actors/Sessions/LlmSessionActor.cs
+++ b/src/Netclaw.Actors/Sessions/LlmSessionActor.cs
@@ -2921,7 +2921,7 @@ private void BindTurnTelemetry(MessageSource? source)
_activeMessageId = sourceMessageId;
_activeTurnId = source?.TurnId
?? sourceMessageId
- ?? Guid.NewGuid().ToString("N")[..8];
+ ?? IdGen.ShortId();
_activeChannelType = source?.ChannelType;
CrashContextSnapshot.Update(
diff --git a/src/Netclaw.Channels.Slack/SlackChannel.cs b/src/Netclaw.Channels.Slack/SlackChannel.cs
index cf227696..b5404776 100644
--- a/src/Netclaw.Channels.Slack/SlackChannel.cs
+++ b/src/Netclaw.Channels.Slack/SlackChannel.cs
@@ -172,17 +172,14 @@ public async Task StartAsync(CancellationToken cancellationToken)
}
catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
{
- _notificationSink.Emit(new OperationalAlert
- {
- AlertId = Guid.NewGuid().ToString("N")[..12],
- Type = "channel.disconnected",
- Category = AlertType.ChannelDisconnected,
- Summary = $"Slack channel failed to connect: {ex.Message}",
- Timestamp = _timeProvider.GetUtcNow(),
- Severity = "warning",
- Source = "slack",
- Context = new Dictionary { ["channel"] = "slack" }
- });
+ _notificationSink.Emit(OperationalAlert.Create(
+ _timeProvider,
+ "channel.disconnected",
+ AlertType.ChannelDisconnected,
+ $"Slack channel failed to connect: {ex.Message}",
+ AlertSeverity.Warning,
+ source: "slack",
+ context: new Dictionary { ["channel"] = "slack" }));
throw;
}
}
diff --git a/src/Netclaw.Channels.Slack/SlackConversationActor.cs b/src/Netclaw.Channels.Slack/SlackConversationActor.cs
index 6a5274cb..a808d44d 100644
--- a/src/Netclaw.Channels.Slack/SlackConversationActor.cs
+++ b/src/Netclaw.Channels.Slack/SlackConversationActor.cs
@@ -3,6 +3,7 @@
using Netclaw.Actors.Channels;
using Netclaw.Actors.Protocol;
using Netclaw.Channels.Telemetry;
+using Netclaw.Configuration;
namespace Netclaw.Channels.Slack;
@@ -106,7 +107,7 @@ public SlackConversationActor(SlackChannelId conversationId, SlackGatewayDepende
var sessionId = new SessionId($"{_conversationId.Value}/{threadTs.Value}");
var turnId = string.IsNullOrWhiteSpace(message.EventId.Value)
- ? Guid.NewGuid().ToString("N")[..8]
+ ? IdGen.ShortId()
: message.EventId.Value;
var log = _log
.WithContext("SlackThreadTs", threadTs.Value)
diff --git a/src/Netclaw.Channels.Slack/SlackThreadBindingActor.cs b/src/Netclaw.Channels.Slack/SlackThreadBindingActor.cs
index e24296bd..ab433be5 100644
--- a/src/Netclaw.Channels.Slack/SlackThreadBindingActor.cs
+++ b/src/Netclaw.Channels.Slack/SlackThreadBindingActor.cs
@@ -4,8 +4,6 @@
using Akka.Actor;
using Akka.Event;
using Akka.Persistence;
-using Akka.Streams;
-using Akka.Streams.Dsl;
using Microsoft.Extensions.AI;
using Netclaw.Actors.Channels;
using Netclaw.Actors.Protocol;
@@ -32,11 +30,7 @@ internal sealed class SlackThreadBindingActor : ReceivePersistentActor, IWithTim
private PostResult? _lastFailedPost;
private readonly List _pendingApprovalRequests = [];
- private ActorMaterializer? _materializer;
- private MaterializedSession? _session;
- private ChannelWriter? _inputQueue;
- private int _pipelineGeneration;
- private bool _isReinitializing;
+ private readonly SessionPipelineHandle _handle;
private bool _threadHistoryFetchAttempted;
private SlackEventTs? _cursorTs;
private static readonly object ReinitializeTimerKey = new();
@@ -59,6 +53,7 @@ public SlackThreadBindingActor(
_threadTs = threadTs;
_dependencies = dependencies;
_promptInjectionDetector = dependencies.PromptInjectionDetector ?? new NullPromptInjectionDetector();
+ _handle = new SessionPipelineHandle(dependencies.Pipeline, Context.GetLogger(), "slack-thread");
_log = Context.GetLogger()
.WithContext("Adapter", "slack")
.WithContext("SessionId", _sessionId.Value)
@@ -89,9 +84,7 @@ protected override void PreStart()
protected override void PostStop()
{
- _inputQueue?.TryComplete();
- _session?.DisposeAsync().AsTask().GetAwaiter().GetResult();
- _materializer?.Dispose();
+ _handle.Dispose();
base.PostStop();
}
@@ -128,7 +121,7 @@ private void Active()
CommandAsync(HandleOutputAsync);
Command(msg =>
{
- if (msg.Generation != _pipelineGeneration)
+ if (msg.Generation != _handle.Generation)
return;
var reason = msg.Cause is null
@@ -184,7 +177,7 @@ private async Task HandleTrustedReminderAsync(DeliverTrustedSessionTurn message)
return;
}
- var writer = _inputQueue;
+ var writer = _handle.InputQueue;
if (writer is null)
{
_log.Warning("Slack thread input queue is not initialized; rejecting Mode B reminder");
@@ -300,7 +293,7 @@ await ProcessInboundAttachmentsAsync(
return;
}
- var writer = _inputQueue;
+ var writer = _handle.InputQueue;
if (writer is null)
{
_log.Warning("Slack thread input queue is not initialized; dropping inbound message");
@@ -689,62 +682,35 @@ public sealed record Accepted(string Line, DataContent? Inline) : AttachmentInge
public sealed record Rejected(string UserFacingReason) : AttachmentIngestResult;
}
+ private SessionPipelineOptions BuildOptions() => new()
+ {
+ ChannelType = Actors.Channels.ChannelType.Slack,
+ DefaultAudience = TrustAudience.Public,
+ DefaultBoundary = SecurityPolicyDefaults.SlackWorkspaceBoundary,
+ DefaultPrincipal = PrincipalClassification.UntrustedExternal,
+ DefaultProvenance = new SourceProvenance
+ {
+ TransportAuthenticity = TransportAuthenticity.Verified,
+ PayloadTaint = PayloadTaint.Public,
+ SourceKind = "slack"
+ },
+ Filter = OutputFilter.Text | OutputFilter.Files
+ };
+
private async Task EnsureInitializedAsync()
{
- if (_session is not null)
+ if (_handle.IsInitialized)
return;
- _log.Info("Initializing Slack thread binding pipeline");
var self = Self;
-
- // Create a materializer scoped to this actor — all stream actors become
- // children and are stopped automatically when this actor passivates.
- _materializer = Context.Materializer(namePrefix: "slack-thread");
-
using var initCts = new CancellationTokenSource(OperationTimeout);
- var materialized = await _dependencies.Pipeline.CreateAsync(
+ await _handle.InitializeWithChannelAsync(
+ Context,
_sessionId,
- new SessionPipelineOptions
- {
- ChannelType = Actors.Channels.ChannelType.Slack,
- DefaultAudience = TrustAudience.Public,
- DefaultBoundary = SecurityPolicyDefaults.SlackWorkspaceBoundary,
- DefaultPrincipal = PrincipalClassification.UntrustedExternal,
- DefaultProvenance = new SourceProvenance
- {
- TransportAuthenticity = TransportAuthenticity.Verified,
- PayloadTaint = PayloadTaint.Public,
- SourceKind = "slack"
- },
- Filter = OutputFilter.Text | OutputFilter.Files
- },
- materializer: _materializer,
- cancellationToken: initCts.Token);
-
- var inputQueue = Source.Channel(512, true)
- .ToMaterialized(materialized.Input, Keep.Left)
- .Run(_materializer);
-
- var generation = ++_pipelineGeneration;
- var outputCompletion = materialized.Output
- .ToMaterialized(
- Sink.ForEach(output => self.Tell(new ThreadOutput(output))),
- Keep.Right)
- .Run(_materializer);
-
- _ = outputCompletion.ContinueWith(t =>
- {
- var cause = t.IsFaulted ? t.Exception?.GetBaseException() : null;
- self.Tell(new OutputStreamTerminated(generation, cause));
- },
- CancellationToken.None,
- TaskContinuationOptions.ExecuteSynchronously,
- TaskScheduler.Default);
-
- _session = materialized;
- _inputQueue = inputQueue;
-
- _log.Info("Slack thread binding pipeline initialized");
+ BuildOptions(),
+ output => self.Tell(new ThreadOutput(output)),
+ (gen, cause) => self.Tell(new OutputStreamTerminated(gen, cause)),
+ initCts.Token);
}
private async Task BuildInputForInboundAsync(
@@ -1035,40 +1001,12 @@ private static List MergeGapWithLiveContents(
private async Task ReinitializePipelineAsync(string reason)
{
- if (_isReinitializing)
- return;
-
- _isReinitializing = true;
- try
- {
- _log.Warning("Reinitializing Slack thread pipeline: {Reason}", reason);
-
- _inputQueue?.TryComplete();
- _inputQueue = null;
-
- if (_session is not null)
- {
- await _session.DisposeAsync();
- _session = null;
- }
-
- _materializer?.Dispose();
- _materializer = null;
-
- await EnsureInitializedAsync();
- }
- catch (Exception ex)
- {
- _log.Error(ex, "Slack thread pipeline reinitialization failed; scheduling retry");
- Timers.StartSingleTimer(
+ await _handle.ReinitializeAsync(
+ reason,
+ () => Timers.StartSingleTimer(
ReinitializeTimerKey,
new ReinitializePipeline("retry after failed reinit"),
- TimeSpan.FromSeconds(2));
- }
- finally
- {
- _isReinitializing = false;
- }
+ TimeSpan.FromSeconds(2)));
}
private async Task HandleOutputAsync(ThreadOutput threadOutput)
diff --git a/src/Netclaw.Channels.Slack/Webhooks/SlackWebhookPayloadBuilder.cs b/src/Netclaw.Channels.Slack/Webhooks/SlackWebhookPayloadBuilder.cs
index f66a27b5..09a87f6f 100644
--- a/src/Netclaw.Channels.Slack/Webhooks/SlackWebhookPayloadBuilder.cs
+++ b/src/Netclaw.Channels.Slack/Webhooks/SlackWebhookPayloadBuilder.cs
@@ -71,11 +71,11 @@ private static List