diff --git a/src/StatsdClient/Bufferize/StatsBufferize.cs b/src/StatsdClient/Bufferize/StatsBufferize.cs index 278b89ad..3483bacf 100644 --- a/src/StatsdClient/Bufferize/StatsBufferize.cs +++ b/src/StatsdClient/Bufferize/StatsBufferize.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; using StatsdClient.Statistic; using StatsdClient.Worker; @@ -45,6 +46,8 @@ public void Dispose() this._worker.Dispose(); } + public Task DisposeAsync() => this._worker.DisposeAsync(); + private class WorkerHandler : IAsynchronousWorkerHandler { private readonly StatsRouter _statsRouter; diff --git a/src/StatsdClient/DogStatsdService.cs b/src/StatsdClient/DogStatsdService.cs index 79c11edb..c0bf5c2c 100644 --- a/src/StatsdClient/DogStatsdService.cs +++ b/src/StatsdClient/DogStatsdService.cs @@ -1,6 +1,7 @@ using System; using System.Diagnostics; using System.Globalization; +using System.Threading.Tasks; using StatsdClient.Bufferize; namespace StatsdClient @@ -299,5 +300,20 @@ public void Dispose() _statsdData?.Dispose(); _statsdData = null; } + + /// + /// Asynchronously disposes an instance of DogStatsdService. + /// Flushes all metrics. + /// + /// A task that represents the asynchronous dispose operation. + public async Task DisposeAsync() + { + var statsdData = _statsdData; + if (statsdData != null) + { + _statsdData = null; + await statsdData.DisposeAsync().ConfigureAwait(false); + } + } } } diff --git a/src/StatsdClient/IDogStatsd.cs b/src/StatsdClient/IDogStatsd.cs index a5d1195a..3fe15b14 100644 --- a/src/StatsdClient/IDogStatsd.cs +++ b/src/StatsdClient/IDogStatsd.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; namespace StatsdClient { @@ -200,5 +201,12 @@ void ServiceCheck( /// /// The value indicating whether the telemetry must be flushed. void Flush(bool flushTelemetry = true); + + /// + /// Asynchronously disposes the instance. + /// Flushes all metrics. + /// + /// A task that represents the asynchronous dispose operation. + Task DisposeAsync(); } } diff --git a/src/StatsdClient/StatsdData.cs b/src/StatsdClient/StatsdData.cs index f8ba7610..22cf8557 100644 --- a/src/StatsdClient/StatsdData.cs +++ b/src/StatsdClient/StatsdData.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; using StatsdClient.Bufferize; using StatsdClient.Transport; @@ -7,7 +8,7 @@ namespace StatsdClient internal class StatsdData : IDisposable { private ITransport _transport; - private StatsBufferize _statsBufferize; + private StatsBufferize _statsBufferSize; public StatsdData( MetricsSender metricsSender, @@ -17,7 +18,7 @@ public StatsdData( { MetricsSender = metricsSender; Telemetry = telemetry; - _statsBufferize = statsBufferize; + _statsBufferSize = statsBufferize; _transport = transport; } @@ -27,7 +28,7 @@ public StatsdData( public void Flush(bool flushTelemetry) { - _statsBufferize?.Flush(); + _statsBufferSize?.Flush(); if (flushTelemetry) { Telemetry.Flush(); @@ -42,8 +43,29 @@ public void Dispose() Telemetry?.Dispose(); Telemetry = null; - _statsBufferize?.Dispose(); - _statsBufferize = null; + _statsBufferSize?.Dispose(); + _statsBufferSize = null; + + _transport?.Dispose(); + _transport = null; + + MetricsSender = null; + } + + public async Task DisposeAsync() + { + // _statsBufferize and _telemetry must be disposed before _statsSender to make + // sure _statsSender does not receive data when it is already disposed. + + Telemetry?.Dispose(); + Telemetry = null; + + var statsBufferSize = _statsBufferSize; + if (statsBufferSize != null) + { + await _statsBufferSize.DisposeAsync().ConfigureAwait(false); + _statsBufferSize = null; + } _transport?.Dispose(); _transport = null; diff --git a/src/StatsdClient/Worker/AsynchronousWorker.cs b/src/StatsdClient/Worker/AsynchronousWorker.cs index ed4f72af..dfd8ec4d 100644 --- a/src/StatsdClient/Worker/AsynchronousWorker.cs +++ b/src/StatsdClient/Worker/AsynchronousWorker.cs @@ -85,6 +85,26 @@ public void Dispose() } } + public async Task DisposeAsync() + { + if (!_terminate) + { + Flush(); + _terminate = true; + try + { + await Task.WhenAll(_workers).ConfigureAwait(false); + _flushEvent.Dispose(); + } + catch (Exception e) + { + _optionalExceptionHandler?.Invoke(e); + } + + _workers.Clear(); + } + } + private void Dequeue() { var waitDuration = MinWaitDuration; diff --git a/tests/StatsdClient.Tests/AsyncDisposalTests.cs b/tests/StatsdClient.Tests/AsyncDisposalTests.cs new file mode 100644 index 00000000..516a1ab6 --- /dev/null +++ b/tests/StatsdClient.Tests/AsyncDisposalTests.cs @@ -0,0 +1,125 @@ +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; +using StatsdClient; +using Stopwatch = System.Diagnostics.Stopwatch; + +namespace Tests +{ + [TestFixture] + public class AsyncDisposalTests + { + /// + /// Manual stress / demo. Runs only when invoked explicitly. + /// + [Test] + [Explicit("Manual benchmark for PR docs - demonstrates async vs sync under starvation")] + [Timeout(120000)] + public async Task CompareSyncVsAsync_UnderStarvation() + { + ThreadPool.GetMinThreads(out var origMin, out var origMinIO); + ThreadPool.GetMaxThreads(out var origMax, out var origMaxIO); + + try + { + ThreadPool.SetMinThreads(2, 2); + ThreadPool.SetMaxThreads(4, 4); + + long syncTime; + long asyncTime; + bool syncCompleted; + bool asyncCompleted; + + // --- sync --- + { + var dogstatsd = new DogStatsdService(); + dogstatsd.Configure(new StatsdConfig + { + StatsdServerName = "127.0.0.1", + StatsdPort = 4321, + }); + + var releaseSignal = new ManualResetEventSlim(false); + var tasks = new List(); + + for (var i = 0; i < 10; i++) + { + tasks.Add(Task.Run(() => + { + dogstatsd.Increment("test.metric"); + releaseSignal.Wait(15000); + })); + } + + Thread.Sleep(500); + + var sw = Stopwatch.StartNew(); + var disposeThread = new Thread(dogstatsd.Dispose); + disposeThread.Start(); + syncCompleted = disposeThread.Join(10000); + sw.Stop(); + syncTime = sw.ElapsedMilliseconds; + + releaseSignal.Set(); + + if (!syncCompleted) + { + disposeThread.Join(5000); + } + + Task.WaitAll(tasks.ToArray(), 5000); + releaseSignal.Dispose(); + } + + await Task.Delay(500); + + // --- async --- + { + var dogstatsd = new DogStatsdService(); + dogstatsd.Configure(new StatsdConfig + { + StatsdServerName = "127.0.0.1", + StatsdPort = 4321, + }); + + var releaseSignal = new ManualResetEventSlim(false); + var tasks = new List(); + + for (var i = 0; i < 10; i++) + { + tasks.Add(Task.Run(() => + { + dogstatsd.Increment("test.metric"); + releaseSignal.Wait(15000); + })); + } + + await Task.Delay(500); + + var sw = Stopwatch.StartNew(); + var disposeTask = dogstatsd.DisposeAsync(); + asyncCompleted = await Task.WhenAny(disposeTask, Task.Delay(10000)) == disposeTask; + sw.Stop(); + asyncTime = sw.ElapsedMilliseconds; + + releaseSignal.Set(); + await Task.WhenAll(tasks); + releaseSignal.Dispose(); + } + + TestContext.WriteLine("=== THREAD POOL STARVATION BENCHMARK ==="); + TestContext.WriteLine("Thread pool: min=2, max=4"); + TestContext.WriteLine("Blocking tasks: 10"); + TestContext.WriteLine(); + TestContext.WriteLine($"SYNC Dispose: {(syncCompleted ? $"{syncTime}ms" : $"blocked (>{syncTime}ms)")}"); + TestContext.WriteLine($"ASYNC Dispose: {(asyncCompleted ? $"{asyncTime}ms" : $"blocked (>{asyncTime}ms)")}"); + } + finally + { + ThreadPool.SetMinThreads(origMin, origMinIO); + ThreadPool.SetMaxThreads(origMax, origMaxIO); + } + } + } +}