Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 201 additions & 0 deletions src/Netclaw.Actors.Tests/Channels/ExecutionOutputAccumulatorTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
using Netclaw.Actors.Channels;
using Netclaw.Actors.Protocol;
using Netclaw.Configuration;
using Xunit;

namespace Netclaw.Actors.Tests.Channels;

public sealed class ExecutionOutputAccumulatorTests
{
private static readonly SessionId TestSessionId = new("test/session");

[Fact]
public void TextDeltaOutput_accumulates_text()
{
var acc = new ExecutionOutputAccumulator();

var action1 = acc.ProcessOutput(new TextDeltaOutput { SessionId = TestSessionId, Delta = "Hello " });
var action2 = acc.ProcessOutput(new TextDeltaOutput { SessionId = TestSessionId, Delta = "world" });

Assert.Equal(OutputAction.Continue, action1);
Assert.Equal(OutputAction.Continue, action2);
Assert.Equal("Hello world", acc.GetAccumulatedText());
}

[Fact]
public void TextOutput_accumulates_when_no_prior_delta()
{
var acc = new ExecutionOutputAccumulator();

acc.ProcessOutput(new TextOutput { SessionId = TestSessionId, Text = "Full text" });

Assert.Equal("Full text", acc.GetAccumulatedText());
}

[Fact]
public void TextOutput_ignored_after_TextDeltaOutput()
{
var acc = new ExecutionOutputAccumulator();

acc.ProcessOutput(new TextDeltaOutput { SessionId = TestSessionId, Delta = "streamed" });
acc.ProcessOutput(new TextOutput { SessionId = TestSessionId, Text = "assembled" });

Assert.Equal("streamed", acc.GetAccumulatedText());
}

[Fact]
public void TurnCompleted_returns_TurnCompleted_action()
{
var acc = new ExecutionOutputAccumulator();

var action = acc.ProcessOutput(new TurnCompleted { SessionId = TestSessionId, TurnNumber = 1 });

Assert.Equal(OutputAction.TurnCompleted, action);
}

[Fact]
public void ErrorOutput_returns_Error_action_and_stores_details()
{
var acc = new ExecutionOutputAccumulator();
var cause = new InvalidOperationException("inner");

var action = acc.ProcessOutput(new ErrorOutput
{
SessionId = TestSessionId,
Message = "Something failed",
Category = ErrorCategory.ProviderFailure,
Cause = cause
});

Assert.Equal(OutputAction.Error, action);
Assert.Equal("Something failed", acc.LastErrorMessage);
Assert.Equal(ErrorCategory.ProviderFailure, acc.LastErrorCategory);
Assert.Same(cause, acc.LastErrorCause);
}

[Fact]
public void BufferFlush_returns_Continue()
{
var acc = new ExecutionOutputAccumulator();

var action = acc.ProcessOutput(new BufferFlush { SessionId = TestSessionId });

Assert.Equal(OutputAction.Continue, action);
}

[Fact]
public void Tracks_successful_notification_tool_result()
{
var acc = new ExecutionOutputAccumulator();

acc.ProcessOutput(new ToolResultOutput
{
SessionId = TestSessionId,
CallId = "call-1",
ToolName = "send_slack_message",
Result = "Message sent to channel C1."
});

Assert.True(acc.NotifyAttempted);
Assert.False(acc.NotifyFailed);
}

[Fact]
public void Tracks_failed_notification_tool_result()
{
var acc = new ExecutionOutputAccumulator();

acc.ProcessOutput(new ToolResultOutput
{
SessionId = TestSessionId,
CallId = "call-2",
ToolName = "send_slack_message",
Result = "Error: channel not found"
});

Assert.True(acc.NotifyAttempted);
Assert.True(acc.NotifyFailed);
}

[Fact]
public void Ignores_non_notification_tool_results()
{
var acc = new ExecutionOutputAccumulator();

acc.ProcessOutput(new ToolResultOutput
{
SessionId = TestSessionId,
CallId = "call-3",
ToolName = "web_search",
Result = "Found 5 results"
});

Assert.False(acc.NotifyAttempted);
}

// ── BuildNotifyFailureMessage tests ──────────────────────────────────────

[Fact]
public void No_failure_when_no_notify_instructions()
{
var acc = new ExecutionOutputAccumulator();

var result = acc.BuildNotifyFailureMessage(hasNotifyInstructions: false, NotificationPolicy.Required);

Assert.Null(result);
}

[Fact]
public void Required_policy_fails_when_no_notification_sent()
{
var acc = new ExecutionOutputAccumulator();

var result = acc.BuildNotifyFailureMessage(hasNotifyInstructions: true, NotificationPolicy.Required);

Assert.Contains("no notification tool was invoked", result);
}

[Fact]
public void Conditional_policy_succeeds_when_no_notification_sent()
{
var acc = new ExecutionOutputAccumulator();

var result = acc.BuildNotifyFailureMessage(hasNotifyInstructions: true, NotificationPolicy.Conditional);

Assert.Null(result);
}

[Fact]
public void Succeeds_when_notification_attempted_and_succeeded()
{
var acc = new ExecutionOutputAccumulator();
acc.ProcessOutput(new ToolResultOutput
{
SessionId = TestSessionId,
CallId = "call-ok",
ToolName = "send_slack_message",
Result = "Message sent."
});

var result = acc.BuildNotifyFailureMessage(hasNotifyInstructions: true, NotificationPolicy.Required);

Assert.Null(result);
}

[Fact]
public void Fails_when_notification_attempted_and_errored()
{
var acc = new ExecutionOutputAccumulator();
acc.ProcessOutput(new ToolResultOutput
{
SessionId = TestSessionId,
CallId = "call-err",
ToolName = "send_slack_message",
Result = "Error: channel not found"
});

var result = acc.BuildNotifyFailureMessage(hasNotifyInstructions: true, NotificationPolicy.Conditional);

Assert.Contains("channel not found", result);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Akka.Streams;
using Netclaw.Actors.Channels;
using Netclaw.Actors.Protocol;
using Netclaw.Configuration;

namespace Netclaw.Actors.Tests.Channels.TestHelpers;

/// <summary>
/// Fake <see cref="ISessionPipeline"/> that throws a pre-configured exception
/// on <see cref="CreateAsync"/>. Used to test initialization failure paths.
/// </summary>
internal sealed class FailingSessionPipeline(Exception exception) : ISessionPipeline
{
public Task<MaterializedSession> CreateAsync(
SessionId sessionId,
SessionPipelineOptions options,
IMaterializer? materializer = null,
CancellationToken cancellationToken = default) =>
throw exception;

public Task SendFeedbackAsync(IWithSessionId feedback, CancellationToken ct = default) =>
Task.CompletedTask;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using Akka;
using Akka.Streams;
using Akka.Streams.Dsl;
using Netclaw.Actors.Channels;
using Netclaw.Actors.Protocol;
using Netclaw.Configuration;

namespace Netclaw.Actors.Tests.Channels.TestHelpers;

/// <summary>
/// Fake <see cref="ISessionPipeline"/> that replays a scripted sequence of
/// <see cref="SessionOutput"/> events. Input is discarded. Used by
/// execution actor tests (reminders, webhooks) and pipeline handle tests.
/// </summary>
internal sealed class ScriptedSessionPipeline(
Func<SessionId, IReadOnlyList<SessionOutput>> outputFactory) : ISessionPipeline
{
public SessionPipelineOptions? CapturedOptions { get; private set; }

public Task<MaterializedSession> CreateAsync(
SessionId sessionId,
SessionPipelineOptions options,
IMaterializer? materializer = null,
CancellationToken cancellationToken = default)
{
CapturedOptions = options;

var killSwitch = KillSwitches.Shared($"scripted-{sessionId.Value}");

var input = Sink.Ignore<ChannelInput>()
.MapMaterializedValue<NotUsed>(_ => NotUsed.Instance);

var output = Source.From(outputFactory(sessionId).ToList())
.Via(killSwitch.Flow<SessionOutput>());

return Task.FromResult(new MaterializedSession(input, output, killSwitch));
}

public Task SendFeedbackAsync(IWithSessionId feedback, CancellationToken ct = default) =>
Task.CompletedTask;
}
2 changes: 1 addition & 1 deletion src/Netclaw.Actors/Channels/ChannelPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ private static SendUserMessage MapToCommand(
NetclawPaths paths)
{
var turnId = string.IsNullOrWhiteSpace(input.MessageId)
? Guid.NewGuid().ToString("N")[..8]
? IdGen.ShortId()
: input.MessageId!;

var textParts = input.Contents.OfType<TextContent>().Select(t => t.Text);
Expand Down
Loading
Loading