Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/Logsmith/LogManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,35 @@ public static void Shutdown(TimeSpan? timeout = null)
ShutdownAsync(timeout).AsTask().GetAwaiter().GetResult();
}

public static async ValueTask FlushAsync(TimeSpan? timeout = null)
{
var config = _config;
if (config is null) return;

CancellationTokenSource? cts = timeout.HasValue
? new CancellationTokenSource(timeout.Value)
: null;

try
{
var token = cts?.Token ?? CancellationToken.None;
var tasks = new List<Task>();

foreach (var sink in config.Sinks.AllSinks)
{
if (sink is IFlushableLogSink flushable)
tasks.Add(flushable.FlushAsync(token).AsTask());
}

if (tasks.Count > 0)
await Task.WhenAll(tasks);
}
finally
{
cts?.Dispose();
}
}

public static bool IsEnabled(LogLevel level)
{
var config = _config;
Expand Down
28 changes: 26 additions & 2 deletions src/Logsmith/Sinks/BufferedLogSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,19 @@

namespace Logsmith.Sinks;

public abstract class BufferedLogSink : ILogSink, IAsyncDisposable
public abstract class BufferedLogSink : ILogSink, IFlushableLogSink, IAsyncDisposable
{
protected readonly record struct BufferedEntry(
LogEntry Entry,
byte[] Utf8MessageBuffer,
int Utf8MessageLength);
int Utf8MessageLength,
TaskCompletionSource? FlushCompletion = null)
{
internal bool IsFlushSentinel => FlushCompletion is not null;

internal static BufferedEntry CreateFlushSentinel(TaskCompletionSource tcs)
=> new(default, Array.Empty<byte>(), 0, tcs);
};

protected LogLevel MinimumLevel { get; }

Expand Down Expand Up @@ -45,12 +52,29 @@ public void Write(in LogEntry entry, ReadOnlySpan<byte> utf8Message)

protected abstract Task WriteBufferedAsync(BufferedEntry entry, CancellationToken ct);

public ValueTask FlushAsync(CancellationToken cancellationToken = default)
{
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var sentinel = BufferedEntry.CreateFlushSentinel(tcs);

if (!_channel.Writer.TryWrite(sentinel))
tcs.SetResult(); // channel completed or full — nothing to flush

return new ValueTask(tcs.Task.WaitAsync(cancellationToken));
}

private async Task DrainAsync()
{
try
{
await foreach (var entry in _channel.Reader.ReadAllAsync(_cts.Token))
{
if (entry.IsFlushSentinel)
{
entry.FlushCompletion!.SetResult();
continue;
}

try
{
await WriteBufferedAsync(entry, _cts.Token);
Expand Down
6 changes: 6 additions & 0 deletions src/Logsmith/Sinks/IFlushableLogSink.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Logsmith;

public interface IFlushableLogSink : ILogSink
{
ValueTask FlushAsync(CancellationToken cancellationToken = default);
}
203 changes: 203 additions & 0 deletions tests/Logsmith.Tests/FlushTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
using System.Text;
using Logsmith.Formatting;
using Logsmith.Sinks;

namespace Logsmith.Tests;

[TestFixture]
public class FlushTests
{
[SetUp]
public void SetUp() => LogManager.Reset();

[TearDown]
public void TearDown() => LogManager.Reset();

[Test]
public async Task FlushAsync_EntriesBeforeFlush_AreWritten()
{
using var ms = new MemoryStream();
var sink = new StreamSink(ms, formatter: NullLogFormatter.Instance, leaveOpen: true);

var entry = MakeEntry();
sink.Write(in entry, "before-flush"u8);

await sink.FlushAsync();

var content = Encoding.UTF8.GetString(ms.ToArray());
Assert.That(content, Does.Contain("before-flush"));

await sink.DisposeAsync();
}

[Test]
public async Task FlushAsync_EntriesAfterFlush_AreNotBlocked()
{
using var ms = new MemoryStream();
var sink = new StreamSink(ms, formatter: NullLogFormatter.Instance, leaveOpen: true);

var entry = MakeEntry();
sink.Write(in entry, "before"u8);
await sink.FlushAsync();

sink.Write(in entry, "after"u8);
await sink.DisposeAsync();

var content = Encoding.UTF8.GetString(ms.ToArray());
Assert.That(content, Does.Contain("before"));
Assert.That(content, Does.Contain("after"));
}

[Test]
public async Task FlushAsync_WithTimeout_CancelsCleanly()
{
var sink = new SlowBufferedSink(writeDelay: TimeSpan.FromSeconds(10));

var entry = MakeEntry();
sink.Write(in entry, "slow-entry"u8);

Assert.CatchAsync<OperationCanceledException>(async () =>
await sink.FlushAsync(new CancellationTokenSource(TimeSpan.FromMilliseconds(100)).Token));

await sink.DisposeAsync();
}

[Test]
public async Task FlushAsync_EmptyChannel_CompletesImmediately()
{
using var ms = new MemoryStream();
var sink = new StreamSink(ms, formatter: NullLogFormatter.Instance, leaveOpen: true);

// Flush with nothing enqueued — should complete instantly
var flushTask = sink.FlushAsync().AsTask();
var completed = await Task.WhenAny(flushTask, Task.Delay(TimeSpan.FromSeconds(5)));

Assert.That(completed, Is.SameAs(flushTask));

await sink.DisposeAsync();
}

[Test]
public async Task FlushAsync_AfterDispose_CompletesImmediately()
{
using var ms = new MemoryStream();
var sink = new StreamSink(ms, formatter: NullLogFormatter.Instance, leaveOpen: true);

await sink.DisposeAsync();

// Channel is completed — flush should complete immediately
var flushTask = sink.FlushAsync().AsTask();
var completed = await Task.WhenAny(flushTask, Task.Delay(TimeSpan.FromSeconds(5)));

Assert.That(completed, Is.SameAs(flushTask));
}

[Test]
public async Task LogManager_FlushAsync_SkipsNonFlushableSinks()
{
var recording = new RecordingSink();
LogManager.Initialize(c => c.AddSink(recording));

DispatchTestMessage(LogLevel.Information, "test");

// RecordingSink does not implement IFlushableLogSink — should not throw
Assert.DoesNotThrowAsync(async () => await LogManager.FlushAsync());
}

[Test]
public async Task LogManager_FlushAsync_FlushesBufferedSinks()
{
using var ms = new MemoryStream();
LogManager.Initialize(c =>
c.AddStreamSink(ms, leaveOpen: true, formatter: NullLogFormatter.Instance));

DispatchTestMessage(LogLevel.Information, "flush-via-manager");

await LogManager.FlushAsync();

var content = Encoding.UTF8.GetString(ms.ToArray());
Assert.That(content, Does.Contain("flush-via-manager"));
}

[Test]
public async Task LogManager_FlushAsync_WithTimeout_CompletesOrCancels()
{
using var ms = new MemoryStream();
LogManager.Initialize(c =>
c.AddStreamSink(ms, leaveOpen: true, formatter: NullLogFormatter.Instance));

DispatchTestMessage(LogLevel.Information, "timed-flush");

// Short timeout — should still complete since the stream sink is fast
await LogManager.FlushAsync(TimeSpan.FromSeconds(5));

var content = Encoding.UTF8.GetString(ms.ToArray());
Assert.That(content, Does.Contain("timed-flush"));
}

[Test]
public async Task LogManager_FlushAsync_WhenNotInitialized_IsNoOp()
{
// No Initialize called — should not throw
Assert.DoesNotThrowAsync(async () => await LogManager.FlushAsync());
}

[Test]
public async Task FlushAsync_MultipleFlushes_AllComplete()
{
using var ms = new MemoryStream();
var sink = new StreamSink(ms, formatter: NullLogFormatter.Instance, leaveOpen: true);

var entry = MakeEntry();
sink.Write(in entry, "msg1"u8);
await sink.FlushAsync();

sink.Write(in entry, "msg2"u8);
await sink.FlushAsync();

sink.Write(in entry, "msg3"u8);
await sink.FlushAsync();

var content = Encoding.UTF8.GetString(ms.ToArray());
Assert.That(content, Does.Contain("msg1"));
Assert.That(content, Does.Contain("msg2"));
Assert.That(content, Does.Contain("msg3"));

await sink.DisposeAsync();
}

private static LogEntry MakeEntry() => new(
LogLevel.Information, 1, DateTime.UtcNow.Ticks, "Test");

private static void DispatchTestMessage(LogLevel level, string message, string category = "Test")
{
if (!LogManager.IsEnabled(level, category))
return;

var entry = new LogEntry(
level: level,
eventId: 1,
timestampTicks: DateTime.UtcNow.Ticks,
category: category);

var utf8 = Encoding.UTF8.GetBytes(message).AsSpan();

LogManager.Dispatch(in entry, utf8, 0, static (writer, state) => { });
}

private sealed class SlowBufferedSink : BufferedLogSink
{
private readonly TimeSpan _writeDelay;

public SlowBufferedSink(TimeSpan writeDelay)
: base(LogLevel.Trace)
{
_writeDelay = writeDelay;
}

protected override async Task WriteBufferedAsync(BufferedEntry entry, CancellationToken ct)
{
await Task.Delay(_writeDelay, ct);
}
}
}
Loading