Skip to content
203 changes: 203 additions & 0 deletions src/Netclaw.Actors.Tests/Channels/ExecutionOutputAccumulatorTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
using Netclaw.Actors.Channels;
using Netclaw.Actors.Protocol;
using Netclaw.Configuration;
using Netclaw.Tools;
using Xunit;

namespace Netclaw.Actors.Tests.Channels;

public sealed class ExecutionOutputAccumulatorTests
{
private static readonly SessionId TestSessionId = new("test/session");
private static readonly ToolName TestNotifyTool = new("send_slack_message");

[Fact]
public void TextDeltaOutput_accumulates_text()
{
var acc = new ExecutionOutputAccumulator(TestNotifyTool);

var action1 = acc.ProcessOutput(new TextDeltaOutput { SessionId = TestSessionId, Delta = "Hello " });
var action2 = acc.ProcessOutput(new TextDeltaOutput { SessionId = TestSessionId, Delta = "world" });

Assert.Equal(OutputAction.Continue, action1);
Assert.Equal(OutputAction.Continue, action2);
Assert.Equal("Hello world", acc.GetAccumulatedText());
}

[Fact]
public void TextOutput_accumulates_when_no_prior_delta()
{
var acc = new ExecutionOutputAccumulator(TestNotifyTool);

acc.ProcessOutput(new TextOutput { SessionId = TestSessionId, Text = "Full text" });

Assert.Equal("Full text", acc.GetAccumulatedText());
}

[Fact]
public void TextOutput_ignored_after_TextDeltaOutput()
{
var acc = new ExecutionOutputAccumulator(TestNotifyTool);

acc.ProcessOutput(new TextDeltaOutput { SessionId = TestSessionId, Delta = "streamed" });
acc.ProcessOutput(new TextOutput { SessionId = TestSessionId, Text = "assembled" });

Assert.Equal("streamed", acc.GetAccumulatedText());
}

[Fact]
public void TurnCompleted_returns_TurnCompleted_action()
{
var acc = new ExecutionOutputAccumulator(TestNotifyTool);

var action = acc.ProcessOutput(new TurnCompleted { SessionId = TestSessionId, TurnNumber = 1 });

Assert.Equal(OutputAction.TurnCompleted, action);
}

[Fact]
public void ErrorOutput_returns_Error_action_and_stores_details()
{
var acc = new ExecutionOutputAccumulator(TestNotifyTool);
var cause = new InvalidOperationException("inner");

var action = acc.ProcessOutput(new ErrorOutput
{
SessionId = TestSessionId,
Message = "Something failed",
Category = ErrorCategory.ProviderFailure,
Cause = cause
});

Assert.Equal(OutputAction.Error, action);
Assert.Equal("Something failed", acc.LastErrorMessage);
Assert.Equal(ErrorCategory.ProviderFailure, acc.LastErrorCategory);
Assert.Same(cause, acc.LastErrorCause);
}

[Fact]
public void BufferFlush_returns_Continue()
{
var acc = new ExecutionOutputAccumulator(TestNotifyTool);

var action = acc.ProcessOutput(new BufferFlush { SessionId = TestSessionId });

Assert.Equal(OutputAction.Continue, action);
}

[Fact]
public void Tracks_successful_notification_tool_result()
{
var acc = new ExecutionOutputAccumulator(TestNotifyTool);

acc.ProcessOutput(new ToolResultOutput
{
SessionId = TestSessionId,
CallId = "call-1",
ToolName = "send_slack_message",
Result = "Message sent to channel C1."
});

Assert.True(acc.NotifyAttempted);
Assert.False(acc.NotifyFailed);
}

[Fact]
public void Tracks_failed_notification_tool_result()
{
var acc = new ExecutionOutputAccumulator(TestNotifyTool);

acc.ProcessOutput(new ToolResultOutput
{
SessionId = TestSessionId,
CallId = "call-2",
ToolName = "send_slack_message",
Result = "Error: channel not found"
});

Assert.True(acc.NotifyAttempted);
Assert.True(acc.NotifyFailed);
}

[Fact]
public void Ignores_non_notification_tool_results()
{
var acc = new ExecutionOutputAccumulator(TestNotifyTool);

acc.ProcessOutput(new ToolResultOutput
{
SessionId = TestSessionId,
CallId = "call-3",
ToolName = "web_search",
Result = "Found 5 results"
});

Assert.False(acc.NotifyAttempted);
}

// ── BuildNotifyFailureMessage tests ──────────────────────────────────────

[Fact]
public void No_failure_when_no_notify_instructions()
{
var acc = new ExecutionOutputAccumulator(TestNotifyTool);

var result = acc.BuildNotifyFailureMessage(hasNotifyInstructions: false, NotificationPolicy.Required);

Assert.Null(result);
}

[Fact]
public void Required_policy_fails_when_no_notification_sent()
{
var acc = new ExecutionOutputAccumulator(TestNotifyTool);

var result = acc.BuildNotifyFailureMessage(hasNotifyInstructions: true, NotificationPolicy.Required);

Assert.Contains("no notification tool was invoked", result);
}

[Fact]
public void Conditional_policy_succeeds_when_no_notification_sent()
{
var acc = new ExecutionOutputAccumulator(TestNotifyTool);

var result = acc.BuildNotifyFailureMessage(hasNotifyInstructions: true, NotificationPolicy.Conditional);

Assert.Null(result);
}

[Fact]
public void Succeeds_when_notification_attempted_and_succeeded()
{
var acc = new ExecutionOutputAccumulator(TestNotifyTool);
acc.ProcessOutput(new ToolResultOutput
{
SessionId = TestSessionId,
CallId = "call-ok",
ToolName = "send_slack_message",
Result = "Message sent."
});

var result = acc.BuildNotifyFailureMessage(hasNotifyInstructions: true, NotificationPolicy.Required);

Assert.Null(result);
}

[Fact]
public void Fails_when_notification_attempted_and_errored()
{
var acc = new ExecutionOutputAccumulator(TestNotifyTool);
acc.ProcessOutput(new ToolResultOutput
{
SessionId = TestSessionId,
CallId = "call-err",
ToolName = "send_slack_message",
Result = "Error: channel not found"
});

var result = acc.BuildNotifyFailureMessage(hasNotifyInstructions: true, NotificationPolicy.Conditional);

Assert.Contains("channel not found", result);
}
}
7 changes: 7 additions & 0 deletions src/Netclaw.Actors.Tests/Channels/SlackActorHierarchyTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Akka.Actor;
using Akka.Configuration;
using Akka.Hosting;
using Akka.Hosting.TestKit;
using Microsoft.Extensions.DependencyInjection;
Expand All @@ -15,6 +16,12 @@ namespace Netclaw.Actors.Tests.Channels;

public sealed class SlackActorHierarchyTests(ITestOutputHelper output) : TestKit(output: output)
{
// Raise the default ExpectMsg timeout from 3s to 5s to prevent flaky failures
// under CI ThreadPool pressure (multiple test classes spin up parallel IHost +
// ActorSystem instances, competing for ThreadPool threads).
protected override Config? Config =>
ConfigurationFactory.ParseString("akka.test.default-timeout = 5s");

protected override void ConfigureServices(HostBuilderContext context, IServiceCollection services)
{
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Akka.Streams;
using Netclaw.Actors.Channels;
using Netclaw.Actors.Protocol;
using Netclaw.Configuration;

namespace Netclaw.Actors.Tests.Channels.TestHelpers;

/// <summary>
/// Fake <see cref="ISessionPipeline"/> that throws a pre-configured exception
/// on <see cref="CreateAsync"/>. Used to test initialization failure paths.
/// </summary>
internal sealed class FailingSessionPipeline(Exception exception) : ISessionPipeline
{
public Task<MaterializedSession> CreateAsync(
SessionId sessionId,
SessionPipelineOptions options,
IMaterializer? materializer = null,
CancellationToken cancellationToken = default) =>
throw exception;

public Task SendFeedbackAsync(IWithSessionId feedback, CancellationToken ct = default) =>
Task.CompletedTask;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using Akka;
using Akka.Streams;
using Akka.Streams.Dsl;
using Netclaw.Actors.Channels;
using Netclaw.Actors.Protocol;
using Netclaw.Configuration;

namespace Netclaw.Actors.Tests.Channels.TestHelpers;

/// <summary>
/// Fake <see cref="ISessionPipeline"/> that replays a scripted sequence of
/// <see cref="SessionOutput"/> events. Input is discarded. Used by
/// execution actor tests (reminders, webhooks) and pipeline handle tests.
/// </summary>
internal sealed class ScriptedSessionPipeline(
Func<SessionId, IReadOnlyList<SessionOutput>> outputFactory) : ISessionPipeline
{
public SessionPipelineOptions? CapturedOptions { get; private set; }

public Task<MaterializedSession> CreateAsync(
SessionId sessionId,
SessionPipelineOptions options,
IMaterializer? materializer = null,
CancellationToken cancellationToken = default)
{
CapturedOptions = options;

var killSwitch = KillSwitches.Shared($"scripted-{sessionId.Value}");

var input = Sink.Ignore<ChannelInput>()
.MapMaterializedValue<NotUsed>(_ => NotUsed.Instance);

var output = Source.From(outputFactory(sessionId).ToList())
.Via(killSwitch.Flow<SessionOutput>());

return Task.FromResult(new MaterializedSession(input, output, killSwitch));
}

public Task SendFeedbackAsync(IWithSessionId feedback, CancellationToken ct = default) =>
Task.CompletedTask;
}
20 changes: 9 additions & 11 deletions src/Netclaw.Actors.Tests/Sessions/ErrorCorrelationTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System.Runtime.CompilerServices;
using Akka.Actor;
using Akka.Hosting;
using Akka.Hosting.TestKit;
Expand Down Expand Up @@ -178,18 +177,17 @@ public Task<ChatResponse> GetResponseAsync(
CancellationToken cancellationToken = default)
=> Task.FromException<ChatResponse>(new NotSupportedException("Streaming path only."));

public async IAsyncEnumerable<ChatResponseUpdate> GetStreamingResponseAsync(
public IAsyncEnumerable<ChatResponseUpdate> GetStreamingResponseAsync(
IEnumerable<ChatMessage> messages,
ChatOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await Task.Yield();
// GetException() returns non-null, but the compiler cannot prove it,
// so the null check keeps yield break reachable and avoids CS0162.
var ex = GetException();
if (ex is not null) throw ex;
yield break;
}
CancellationToken cancellationToken = default)
// Throw synchronously so the LlmCallFailed message is enqueued in
// the actor's mailbox before the actor finishes processing the
// SendUserMessage. Using Task.Yield() here defers the exception to
// the thread pool, creating a race where LlmCallFailed delivery
// depends on thread pool scheduling and can exceed ExpectMsgAsync
// timeouts under load.
=> throw GetException();

private Exception GetException() => ThrowTimeout
? (Exception)new TimeoutException("Simulated provider timeout")
Expand Down
2 changes: 1 addition & 1 deletion src/Netclaw.Actors/Channels/ChannelPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ private static SendUserMessage MapToCommand(
NetclawPaths paths)
{
var turnId = string.IsNullOrWhiteSpace(input.MessageId)
? Guid.NewGuid().ToString("N")[..8]
? IdGen.ShortId()
: input.MessageId!;

var textParts = input.Contents.OfType<TextContent>().Select(t => t.Text);
Expand Down
Loading
Loading