From af73d201835dbd3a1b617190bf593f027caa14c6 Mon Sep 17 00:00:00 2001 From: Maximo Bautista Date: Wed, 4 Feb 2026 16:44:42 -0500 Subject: [PATCH 1/4] Making needed DisposeAsync methods --- src/StatsdClient/Bufferize/StatsBufferize.cs | 3 ++ src/StatsdClient/DogStatsdService.cs | 16 ++++++++++ src/StatsdClient/IDogStatsd.cs | 8 +++++ src/StatsdClient/StatsdData.cs | 32 ++++++++++++++++--- src/StatsdClient/Worker/AsynchronousWorker.cs | 22 ++++++++++++- 5 files changed, 75 insertions(+), 6 deletions(-) diff --git a/src/StatsdClient/Bufferize/StatsBufferize.cs b/src/StatsdClient/Bufferize/StatsBufferize.cs index 278b89ad..3483bacf 100644 --- a/src/StatsdClient/Bufferize/StatsBufferize.cs +++ b/src/StatsdClient/Bufferize/StatsBufferize.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; using StatsdClient.Statistic; using StatsdClient.Worker; @@ -45,6 +46,8 @@ public void Dispose() this._worker.Dispose(); } + public Task DisposeAsync() => this._worker.DisposeAsync(); + private class WorkerHandler : IAsynchronousWorkerHandler { private readonly StatsRouter _statsRouter; diff --git a/src/StatsdClient/DogStatsdService.cs b/src/StatsdClient/DogStatsdService.cs index 79c11edb..c0bf5c2c 100644 --- a/src/StatsdClient/DogStatsdService.cs +++ b/src/StatsdClient/DogStatsdService.cs @@ -1,6 +1,7 @@ using System; using System.Diagnostics; using System.Globalization; +using System.Threading.Tasks; using StatsdClient.Bufferize; namespace StatsdClient @@ -299,5 +300,20 @@ public void Dispose() _statsdData?.Dispose(); _statsdData = null; } + + /// + /// Asynchronously disposes an instance of DogStatsdService. + /// Flushes all metrics. + /// + /// A task that represents the asynchronous dispose operation. + public async Task DisposeAsync() + { + var statsdData = _statsdData; + if (statsdData != null) + { + _statsdData = null; + await statsdData.DisposeAsync().ConfigureAwait(false); + } + } } } diff --git a/src/StatsdClient/IDogStatsd.cs b/src/StatsdClient/IDogStatsd.cs index a5d1195a..3fe15b14 100644 --- a/src/StatsdClient/IDogStatsd.cs +++ b/src/StatsdClient/IDogStatsd.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; namespace StatsdClient { @@ -200,5 +201,12 @@ void ServiceCheck( /// /// The value indicating whether the telemetry must be flushed. void Flush(bool flushTelemetry = true); + + /// + /// Asynchronously disposes the instance. + /// Flushes all metrics. + /// + /// A task that represents the asynchronous dispose operation. + Task DisposeAsync(); } } diff --git a/src/StatsdClient/StatsdData.cs b/src/StatsdClient/StatsdData.cs index f8ba7610..22cf8557 100644 --- a/src/StatsdClient/StatsdData.cs +++ b/src/StatsdClient/StatsdData.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; using StatsdClient.Bufferize; using StatsdClient.Transport; @@ -7,7 +8,7 @@ namespace StatsdClient internal class StatsdData : IDisposable { private ITransport _transport; - private StatsBufferize _statsBufferize; + private StatsBufferize _statsBufferSize; public StatsdData( MetricsSender metricsSender, @@ -17,7 +18,7 @@ public StatsdData( { MetricsSender = metricsSender; Telemetry = telemetry; - _statsBufferize = statsBufferize; + _statsBufferSize = statsBufferize; _transport = transport; } @@ -27,7 +28,7 @@ public StatsdData( public void Flush(bool flushTelemetry) { - _statsBufferize?.Flush(); + _statsBufferSize?.Flush(); if (flushTelemetry) { Telemetry.Flush(); @@ -42,8 +43,29 @@ public void Dispose() Telemetry?.Dispose(); Telemetry = null; - _statsBufferize?.Dispose(); - _statsBufferize = null; + _statsBufferSize?.Dispose(); + _statsBufferSize = null; + + _transport?.Dispose(); + _transport = null; + + MetricsSender = null; + } + + public async Task DisposeAsync() + { + // _statsBufferize and _telemetry must be disposed before _statsSender to make + // sure _statsSender does not receive data when it is already disposed. + + Telemetry?.Dispose(); + Telemetry = null; + + var statsBufferSize = _statsBufferSize; + if (statsBufferSize != null) + { + await _statsBufferSize.DisposeAsync().ConfigureAwait(false); + _statsBufferSize = null; + } _transport?.Dispose(); _transport = null; diff --git a/src/StatsdClient/Worker/AsynchronousWorker.cs b/src/StatsdClient/Worker/AsynchronousWorker.cs index ed4f72af..0b67d5a7 100644 --- a/src/StatsdClient/Worker/AsynchronousWorker.cs +++ b/src/StatsdClient/Worker/AsynchronousWorker.cs @@ -85,11 +85,31 @@ public void Dispose() } } + public async Task DisposeAsync() + { + if (!_terminate) + { + Flush(); + _terminate = true; + try + { + await Task.WhenAll(_workers).ConfigureAwait(false); + _flushEvent.Dispose(); + } + catch (Exception e) + { + _optionalExceptionHandler?.Invoke(e); + } + + _workers.Clear(); + } + } + private void Dequeue() { var waitDuration = MinWaitDuration; - while (true) + while (!_terminate) { try { From 60c5e0d27a08ac240757e9b15e9cb1cd6ab244e9 Mon Sep 17 00:00:00 2001 From: Maximo Bautista Date: Fri, 6 Feb 2026 16:41:36 -0500 Subject: [PATCH 2/4] Adding some test to show usage of DisposeAsync --- .../StatsdClient.Tests/AsyncDisposalTests.cs | 193 ++++++++++++++++++ 1 file changed, 193 insertions(+) create mode 100644 tests/StatsdClient.Tests/AsyncDisposalTests.cs diff --git a/tests/StatsdClient.Tests/AsyncDisposalTests.cs b/tests/StatsdClient.Tests/AsyncDisposalTests.cs new file mode 100644 index 00000000..96b21431 --- /dev/null +++ b/tests/StatsdClient.Tests/AsyncDisposalTests.cs @@ -0,0 +1,193 @@ +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; +using StatsdClient; +using Stopwatch = System.Diagnostics.Stopwatch; + +namespace Tests +{ + [TestFixture] + public class AsyncDisposalTests + { + [Test] + [Timeout(10000)] + public async Task DisposeAsync_CanBeCalledMultipleTimes() + { + var dogstatsd = new DogStatsdService(); + dogstatsd.Configure(new StatsdConfig + { + StatsdServerName = "127.0.0.1", + StatsdPort = 4321, + }); + + var sendTasks = new List(); + for (var i = 0; i < 50; i++) + { + sendTasks.Add(Task.Run(() => dogstatsd.Increment("test.metric"))); + } + + await Task.WhenAll(sendTasks); + + var disposeTasks = new[] + { + dogstatsd.DisposeAsync(), + dogstatsd.DisposeAsync(), + dogstatsd.DisposeAsync() + }; + + await Task.WhenAll(disposeTasks); + } + + [Test] + [Timeout(10000)] + public async Task DisposeAsync_Completes_WithInFlightMetrics() + { + var dogstatsd = new DogStatsdService(); + dogstatsd.Configure(new StatsdConfig + { + StatsdServerName = "127.0.0.1", + StatsdPort = 4321, + }); + + var cts = new CancellationTokenSource(); + var spamTask = Task.Run( + async () => + { + while (!cts.Token.IsCancellationRequested) + { + dogstatsd.Increment("test.metric"); + await Task.Yield(); + } + }, + cts.Token); + + await Task.Delay(200); + + await dogstatsd.DisposeAsync(); + + cts.Cancel(); + + try + { + await spamTask; + } + catch (TaskCanceledException) + { + // expected + } + } + + /// + /// Manual stress / demo. Runs only when invoked explicitly. + /// + [Test] + [Explicit("Manual benchmark for PR docs - demonstrates async vs sync under starvation")] + [Timeout(120000)] + public async Task CompareSyncVsAsync_UnderStarvation() + { + ThreadPool.GetMinThreads(out var origMin, out var origMinIO); + ThreadPool.GetMaxThreads(out var origMax, out var origMaxIO); + + try + { + ThreadPool.SetMinThreads(2, 2); + ThreadPool.SetMaxThreads(4, 4); + + long syncTime; + long asyncTime; + bool syncCompleted; + bool asyncCompleted; + + // --- sync --- + { + var dogstatsd = new DogStatsdService(); + dogstatsd.Configure(new StatsdConfig + { + StatsdServerName = "127.0.0.1", + StatsdPort = 4321, + }); + + var releaseSignal = new ManualResetEventSlim(false); + var tasks = new List(); + + for (var i = 0; i < 10; i++) + { + tasks.Add(Task.Run(() => + { + dogstatsd.Increment("test.metric"); + releaseSignal.Wait(15000); + })); + } + + Thread.Sleep(500); + + var sw = Stopwatch.StartNew(); + var disposeThread = new Thread(dogstatsd.Dispose); + disposeThread.Start(); + syncCompleted = disposeThread.Join(10000); + sw.Stop(); + syncTime = sw.ElapsedMilliseconds; + + releaseSignal.Set(); + + if (!syncCompleted) + { + disposeThread.Join(5000); + } + + Task.WaitAll(tasks.ToArray(), 5000); + releaseSignal.Dispose(); + } + + await Task.Delay(500); + + // --- async --- + { + var dogstatsd = new DogStatsdService(); + dogstatsd.Configure(new StatsdConfig + { + StatsdServerName = "127.0.0.1", + StatsdPort = 4321, + }); + + var releaseSignal = new ManualResetEventSlim(false); + var tasks = new List(); + + for (var i = 0; i < 10; i++) + { + tasks.Add(Task.Run(() => + { + dogstatsd.Increment("test.metric"); + releaseSignal.Wait(15000); + })); + } + + await Task.Delay(500); + + var sw = Stopwatch.StartNew(); + var disposeTask = dogstatsd.DisposeAsync(); + asyncCompleted = await Task.WhenAny(disposeTask, Task.Delay(10000)) == disposeTask; + sw.Stop(); + asyncTime = sw.ElapsedMilliseconds; + + releaseSignal.Set(); + await Task.WhenAll(tasks); + releaseSignal.Dispose(); + } + + TestContext.WriteLine("=== THREAD POOL STARVATION BENCHMARK ==="); + TestContext.WriteLine("Thread pool: min=2, max=4"); + TestContext.WriteLine("Blocking tasks: 10"); + TestContext.WriteLine(); + TestContext.WriteLine($"SYNC Dispose: {(syncCompleted ? $"{syncTime}ms" : $"blocked (>{syncTime}ms)")}"); + TestContext.WriteLine($"ASYNC Dispose: {(asyncCompleted ? $"{asyncTime}ms" : $"blocked (>{asyncTime}ms)")}"); + } + finally + { + ThreadPool.SetMinThreads(origMin, origMinIO); + ThreadPool.SetMaxThreads(origMax, origMaxIO); + } + } + } +} From 6dc399f6bec0d80e4404b84b9e43a252c4a5ed51 Mon Sep 17 00:00:00 2001 From: Maximo Bautista Date: Fri, 6 Feb 2026 17:31:21 -0500 Subject: [PATCH 3/4] Fixing errors and applying comments changes --- src/StatsdClient/Worker/AsynchronousWorker.cs | 2 +- .../StatsdClient.Tests/AsyncDisposalTests.cs | 29 ------------------- 2 files changed, 1 insertion(+), 30 deletions(-) diff --git a/src/StatsdClient/Worker/AsynchronousWorker.cs b/src/StatsdClient/Worker/AsynchronousWorker.cs index 0b67d5a7..dfd8ec4d 100644 --- a/src/StatsdClient/Worker/AsynchronousWorker.cs +++ b/src/StatsdClient/Worker/AsynchronousWorker.cs @@ -109,7 +109,7 @@ private void Dequeue() { var waitDuration = MinWaitDuration; - while (!_terminate) + while (true) { try { diff --git a/tests/StatsdClient.Tests/AsyncDisposalTests.cs b/tests/StatsdClient.Tests/AsyncDisposalTests.cs index 96b21431..386284e7 100644 --- a/tests/StatsdClient.Tests/AsyncDisposalTests.cs +++ b/tests/StatsdClient.Tests/AsyncDisposalTests.cs @@ -10,35 +10,6 @@ namespace Tests [TestFixture] public class AsyncDisposalTests { - [Test] - [Timeout(10000)] - public async Task DisposeAsync_CanBeCalledMultipleTimes() - { - var dogstatsd = new DogStatsdService(); - dogstatsd.Configure(new StatsdConfig - { - StatsdServerName = "127.0.0.1", - StatsdPort = 4321, - }); - - var sendTasks = new List(); - for (var i = 0; i < 50; i++) - { - sendTasks.Add(Task.Run(() => dogstatsd.Increment("test.metric"))); - } - - await Task.WhenAll(sendTasks); - - var disposeTasks = new[] - { - dogstatsd.DisposeAsync(), - dogstatsd.DisposeAsync(), - dogstatsd.DisposeAsync() - }; - - await Task.WhenAll(disposeTasks); - } - [Test] [Timeout(10000)] public async Task DisposeAsync_Completes_WithInFlightMetrics() From 3c465fa0d0eff43397d77779254b62a5db620022 Mon Sep 17 00:00:00 2001 From: Maximo Bautista Date: Fri, 6 Feb 2026 18:40:58 -0500 Subject: [PATCH 4/4] Removing test on favor of comparison one --- .../StatsdClient.Tests/AsyncDisposalTests.cs | 39 ------------------- 1 file changed, 39 deletions(-) diff --git a/tests/StatsdClient.Tests/AsyncDisposalTests.cs b/tests/StatsdClient.Tests/AsyncDisposalTests.cs index 386284e7..516a1ab6 100644 --- a/tests/StatsdClient.Tests/AsyncDisposalTests.cs +++ b/tests/StatsdClient.Tests/AsyncDisposalTests.cs @@ -10,45 +10,6 @@ namespace Tests [TestFixture] public class AsyncDisposalTests { - [Test] - [Timeout(10000)] - public async Task DisposeAsync_Completes_WithInFlightMetrics() - { - var dogstatsd = new DogStatsdService(); - dogstatsd.Configure(new StatsdConfig - { - StatsdServerName = "127.0.0.1", - StatsdPort = 4321, - }); - - var cts = new CancellationTokenSource(); - var spamTask = Task.Run( - async () => - { - while (!cts.Token.IsCancellationRequested) - { - dogstatsd.Increment("test.metric"); - await Task.Yield(); - } - }, - cts.Token); - - await Task.Delay(200); - - await dogstatsd.DisposeAsync(); - - cts.Cancel(); - - try - { - await spamTask; - } - catch (TaskCanceledException) - { - // expected - } - } - /// /// Manual stress / demo. Runs only when invoked explicitly. ///