diff --git a/src/Logsmith/LogManager.cs b/src/Logsmith/LogManager.cs index cc9a4e3..e797eef 100644 --- a/src/Logsmith/LogManager.cs +++ b/src/Logsmith/LogManager.cs @@ -84,6 +84,35 @@ public static void Shutdown(TimeSpan? timeout = null) ShutdownAsync(timeout).AsTask().GetAwaiter().GetResult(); } + public static async ValueTask FlushAsync(TimeSpan? timeout = null) + { + var config = _config; + if (config is null) return; + + CancellationTokenSource? cts = timeout.HasValue + ? new CancellationTokenSource(timeout.Value) + : null; + + try + { + var token = cts?.Token ?? CancellationToken.None; + var tasks = new List(); + + foreach (var sink in config.Sinks.AllSinks) + { + if (sink is IFlushableLogSink flushable) + tasks.Add(flushable.FlushAsync(token).AsTask()); + } + + if (tasks.Count > 0) + await Task.WhenAll(tasks); + } + finally + { + cts?.Dispose(); + } + } + public static bool IsEnabled(LogLevel level) { var config = _config; diff --git a/src/Logsmith/Sinks/BufferedLogSink.cs b/src/Logsmith/Sinks/BufferedLogSink.cs index 910289a..48a7586 100644 --- a/src/Logsmith/Sinks/BufferedLogSink.cs +++ b/src/Logsmith/Sinks/BufferedLogSink.cs @@ -3,12 +3,19 @@ namespace Logsmith.Sinks; -public abstract class BufferedLogSink : ILogSink, IAsyncDisposable +public abstract class BufferedLogSink : ILogSink, IFlushableLogSink, IAsyncDisposable { protected readonly record struct BufferedEntry( LogEntry Entry, byte[] Utf8MessageBuffer, - int Utf8MessageLength); + int Utf8MessageLength, + TaskCompletionSource? FlushCompletion = null) + { + internal bool IsFlushSentinel => FlushCompletion is not null; + + internal static BufferedEntry CreateFlushSentinel(TaskCompletionSource tcs) + => new(default, Array.Empty(), 0, tcs); + }; protected LogLevel MinimumLevel { get; } @@ -45,12 +52,29 @@ public void Write(in LogEntry entry, ReadOnlySpan utf8Message) protected abstract Task WriteBufferedAsync(BufferedEntry entry, CancellationToken ct); + public ValueTask FlushAsync(CancellationToken cancellationToken = default) + { + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var sentinel = BufferedEntry.CreateFlushSentinel(tcs); + + if (!_channel.Writer.TryWrite(sentinel)) + tcs.SetResult(); // channel completed or full — nothing to flush + + return new ValueTask(tcs.Task.WaitAsync(cancellationToken)); + } + private async Task DrainAsync() { try { await foreach (var entry in _channel.Reader.ReadAllAsync(_cts.Token)) { + if (entry.IsFlushSentinel) + { + entry.FlushCompletion!.SetResult(); + continue; + } + try { await WriteBufferedAsync(entry, _cts.Token); diff --git a/src/Logsmith/Sinks/IFlushableLogSink.cs b/src/Logsmith/Sinks/IFlushableLogSink.cs new file mode 100644 index 0000000..f276927 --- /dev/null +++ b/src/Logsmith/Sinks/IFlushableLogSink.cs @@ -0,0 +1,6 @@ +namespace Logsmith; + +public interface IFlushableLogSink : ILogSink +{ + ValueTask FlushAsync(CancellationToken cancellationToken = default); +} diff --git a/tests/Logsmith.Tests/FlushTests.cs b/tests/Logsmith.Tests/FlushTests.cs new file mode 100644 index 0000000..65bbadc --- /dev/null +++ b/tests/Logsmith.Tests/FlushTests.cs @@ -0,0 +1,203 @@ +using System.Text; +using Logsmith.Formatting; +using Logsmith.Sinks; + +namespace Logsmith.Tests; + +[TestFixture] +public class FlushTests +{ + [SetUp] + public void SetUp() => LogManager.Reset(); + + [TearDown] + public void TearDown() => LogManager.Reset(); + + [Test] + public async Task FlushAsync_EntriesBeforeFlush_AreWritten() + { + using var ms = new MemoryStream(); + var sink = new StreamSink(ms, formatter: NullLogFormatter.Instance, leaveOpen: true); + + var entry = MakeEntry(); + sink.Write(in entry, "before-flush"u8); + + await sink.FlushAsync(); + + var content = Encoding.UTF8.GetString(ms.ToArray()); + Assert.That(content, Does.Contain("before-flush")); + + await sink.DisposeAsync(); + } + + [Test] + public async Task FlushAsync_EntriesAfterFlush_AreNotBlocked() + { + using var ms = new MemoryStream(); + var sink = new StreamSink(ms, formatter: NullLogFormatter.Instance, leaveOpen: true); + + var entry = MakeEntry(); + sink.Write(in entry, "before"u8); + await sink.FlushAsync(); + + sink.Write(in entry, "after"u8); + await sink.DisposeAsync(); + + var content = Encoding.UTF8.GetString(ms.ToArray()); + Assert.That(content, Does.Contain("before")); + Assert.That(content, Does.Contain("after")); + } + + [Test] + public async Task FlushAsync_WithTimeout_CancelsCleanly() + { + var sink = new SlowBufferedSink(writeDelay: TimeSpan.FromSeconds(10)); + + var entry = MakeEntry(); + sink.Write(in entry, "slow-entry"u8); + + Assert.CatchAsync(async () => + await sink.FlushAsync(new CancellationTokenSource(TimeSpan.FromMilliseconds(100)).Token)); + + await sink.DisposeAsync(); + } + + [Test] + public async Task FlushAsync_EmptyChannel_CompletesImmediately() + { + using var ms = new MemoryStream(); + var sink = new StreamSink(ms, formatter: NullLogFormatter.Instance, leaveOpen: true); + + // Flush with nothing enqueued — should complete instantly + var flushTask = sink.FlushAsync().AsTask(); + var completed = await Task.WhenAny(flushTask, Task.Delay(TimeSpan.FromSeconds(5))); + + Assert.That(completed, Is.SameAs(flushTask)); + + await sink.DisposeAsync(); + } + + [Test] + public async Task FlushAsync_AfterDispose_CompletesImmediately() + { + using var ms = new MemoryStream(); + var sink = new StreamSink(ms, formatter: NullLogFormatter.Instance, leaveOpen: true); + + await sink.DisposeAsync(); + + // Channel is completed — flush should complete immediately + var flushTask = sink.FlushAsync().AsTask(); + var completed = await Task.WhenAny(flushTask, Task.Delay(TimeSpan.FromSeconds(5))); + + Assert.That(completed, Is.SameAs(flushTask)); + } + + [Test] + public async Task LogManager_FlushAsync_SkipsNonFlushableSinks() + { + var recording = new RecordingSink(); + LogManager.Initialize(c => c.AddSink(recording)); + + DispatchTestMessage(LogLevel.Information, "test"); + + // RecordingSink does not implement IFlushableLogSink — should not throw + Assert.DoesNotThrowAsync(async () => await LogManager.FlushAsync()); + } + + [Test] + public async Task LogManager_FlushAsync_FlushesBufferedSinks() + { + using var ms = new MemoryStream(); + LogManager.Initialize(c => + c.AddStreamSink(ms, leaveOpen: true, formatter: NullLogFormatter.Instance)); + + DispatchTestMessage(LogLevel.Information, "flush-via-manager"); + + await LogManager.FlushAsync(); + + var content = Encoding.UTF8.GetString(ms.ToArray()); + Assert.That(content, Does.Contain("flush-via-manager")); + } + + [Test] + public async Task LogManager_FlushAsync_WithTimeout_CompletesOrCancels() + { + using var ms = new MemoryStream(); + LogManager.Initialize(c => + c.AddStreamSink(ms, leaveOpen: true, formatter: NullLogFormatter.Instance)); + + DispatchTestMessage(LogLevel.Information, "timed-flush"); + + // Short timeout — should still complete since the stream sink is fast + await LogManager.FlushAsync(TimeSpan.FromSeconds(5)); + + var content = Encoding.UTF8.GetString(ms.ToArray()); + Assert.That(content, Does.Contain("timed-flush")); + } + + [Test] + public async Task LogManager_FlushAsync_WhenNotInitialized_IsNoOp() + { + // No Initialize called — should not throw + Assert.DoesNotThrowAsync(async () => await LogManager.FlushAsync()); + } + + [Test] + public async Task FlushAsync_MultipleFlushes_AllComplete() + { + using var ms = new MemoryStream(); + var sink = new StreamSink(ms, formatter: NullLogFormatter.Instance, leaveOpen: true); + + var entry = MakeEntry(); + sink.Write(in entry, "msg1"u8); + await sink.FlushAsync(); + + sink.Write(in entry, "msg2"u8); + await sink.FlushAsync(); + + sink.Write(in entry, "msg3"u8); + await sink.FlushAsync(); + + var content = Encoding.UTF8.GetString(ms.ToArray()); + Assert.That(content, Does.Contain("msg1")); + Assert.That(content, Does.Contain("msg2")); + Assert.That(content, Does.Contain("msg3")); + + await sink.DisposeAsync(); + } + + private static LogEntry MakeEntry() => new( + LogLevel.Information, 1, DateTime.UtcNow.Ticks, "Test"); + + private static void DispatchTestMessage(LogLevel level, string message, string category = "Test") + { + if (!LogManager.IsEnabled(level, category)) + return; + + var entry = new LogEntry( + level: level, + eventId: 1, + timestampTicks: DateTime.UtcNow.Ticks, + category: category); + + var utf8 = Encoding.UTF8.GetBytes(message).AsSpan(); + + LogManager.Dispatch(in entry, utf8, 0, static (writer, state) => { }); + } + + private sealed class SlowBufferedSink : BufferedLogSink + { + private readonly TimeSpan _writeDelay; + + public SlowBufferedSink(TimeSpan writeDelay) + : base(LogLevel.Trace) + { + _writeDelay = writeDelay; + } + + protected override async Task WriteBufferedAsync(BufferedEntry entry, CancellationToken ct) + { + await Task.Delay(_writeDelay, ct); + } + } +}