Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/StatsdClient/Bufferize/StatsBufferize.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;
using StatsdClient.Statistic;
using StatsdClient.Worker;

Expand Down Expand Up @@ -45,6 +46,8 @@ public void Dispose()
this._worker.Dispose();
}

public Task DisposeAsync() => this._worker.DisposeAsync();

private class WorkerHandler : IAsynchronousWorkerHandler<Stats>
{
private readonly StatsRouter _statsRouter;
Expand Down
16 changes: 16 additions & 0 deletions src/StatsdClient/DogStatsdService.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Diagnostics;
using System.Globalization;
using System.Threading.Tasks;
using StatsdClient.Bufferize;

namespace StatsdClient
Expand Down Expand Up @@ -299,5 +300,20 @@ public void Dispose()
_statsdData?.Dispose();
_statsdData = null;
}

/// <summary>
/// Asynchronously disposes an instance of DogStatsdService.
/// Flushes all metrics.
/// </summary>
/// <returns>A task that represents the asynchronous dispose operation.</returns>
public async Task DisposeAsync()
{
var statsdData = _statsdData;
if (statsdData != null)
{
_statsdData = null;
await statsdData.DisposeAsync().ConfigureAwait(false);
}
}
}
}
8 changes: 8 additions & 0 deletions src/StatsdClient/IDogStatsd.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;

namespace StatsdClient
{
Expand Down Expand Up @@ -200,5 +201,12 @@ void ServiceCheck(
/// </summary>
/// <param name="flushTelemetry">The value indicating whether the telemetry must be flushed.</param>
void Flush(bool flushTelemetry = true);

/// <summary>
/// Asynchronously disposes the instance.
/// Flushes all metrics.
/// </summary>
/// <returns>A task that represents the asynchronous dispose operation.</returns>
Task DisposeAsync();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a member to an interface is a breaking change. Any custom types that implement the interface will not compile anymore.

}
}
32 changes: 27 additions & 5 deletions src/StatsdClient/StatsdData.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;
using StatsdClient.Bufferize;
using StatsdClient.Transport;

Expand All @@ -7,7 +8,7 @@ namespace StatsdClient
internal class StatsdData : IDisposable
{
private ITransport _transport;
private StatsBufferize _statsBufferize;
private StatsBufferize _statsBufferSize;

public StatsdData(
MetricsSender metricsSender,
Expand All @@ -17,7 +18,7 @@ public StatsdData(
{
MetricsSender = metricsSender;
Telemetry = telemetry;
_statsBufferize = statsBufferize;
_statsBufferSize = statsBufferize;
_transport = transport;
}

Expand All @@ -27,7 +28,7 @@ public StatsdData(

public void Flush(bool flushTelemetry)
{
_statsBufferize?.Flush();
_statsBufferSize?.Flush();
if (flushTelemetry)
{
Telemetry.Flush();
Expand All @@ -42,8 +43,29 @@ public void Dispose()
Telemetry?.Dispose();
Telemetry = null;

_statsBufferize?.Dispose();
_statsBufferize = null;
_statsBufferSize?.Dispose();
_statsBufferSize = null;

_transport?.Dispose();
_transport = null;

MetricsSender = null;
}

public async Task DisposeAsync()
{
// _statsBufferize and _telemetry must be disposed before _statsSender to make
// sure _statsSender does not receive data when it is already disposed.

Telemetry?.Dispose();
Telemetry = null;

var statsBufferSize = _statsBufferSize;
if (statsBufferSize != null)
{
await _statsBufferSize.DisposeAsync().ConfigureAwait(false);
_statsBufferSize = null;
}

_transport?.Dispose();
_transport = null;
Expand Down
20 changes: 20 additions & 0 deletions src/StatsdClient/Worker/AsynchronousWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,26 @@ public void Dispose()
}
}

public async Task DisposeAsync()
{
if (!_terminate)
{
Flush();
_terminate = true;
try
{
await Task.WhenAll(_workers).ConfigureAwait(false);
_flushEvent.Dispose();
}
catch (Exception e)
{
_optionalExceptionHandler?.Invoke(e);
}

_workers.Clear();
}
}

private void Dequeue()
{
var waitDuration = MinWaitDuration;
Expand Down
125 changes: 125 additions & 0 deletions tests/StatsdClient.Tests/AsyncDisposalTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using StatsdClient;
using Stopwatch = System.Diagnostics.Stopwatch;

namespace Tests
{
[TestFixture]
public class AsyncDisposalTests
{
/// <summary>
/// Manual stress / demo. Runs only when invoked explicitly.
/// </summary>
[Test]
[Explicit("Manual benchmark for PR docs - demonstrates async vs sync under starvation")]
[Timeout(120000)]
public async Task CompareSyncVsAsync_UnderStarvation()
{
ThreadPool.GetMinThreads(out var origMin, out var origMinIO);
ThreadPool.GetMaxThreads(out var origMax, out var origMaxIO);

try
{
ThreadPool.SetMinThreads(2, 2);
ThreadPool.SetMaxThreads(4, 4);

long syncTime;
long asyncTime;
bool syncCompleted;
bool asyncCompleted;

// --- sync ---
{
var dogstatsd = new DogStatsdService();
dogstatsd.Configure(new StatsdConfig
{
StatsdServerName = "127.0.0.1",
StatsdPort = 4321,
});

var releaseSignal = new ManualResetEventSlim(false);
var tasks = new List<Task>();

for (var i = 0; i < 10; i++)
{
tasks.Add(Task.Run(() =>
{
dogstatsd.Increment("test.metric");
releaseSignal.Wait(15000);
}));
}

Thread.Sleep(500);

var sw = Stopwatch.StartNew();
var disposeThread = new Thread(dogstatsd.Dispose);
disposeThread.Start();
syncCompleted = disposeThread.Join(10000);
sw.Stop();
syncTime = sw.ElapsedMilliseconds;

releaseSignal.Set();

if (!syncCompleted)
{
disposeThread.Join(5000);
}

Task.WaitAll(tasks.ToArray(), 5000);
releaseSignal.Dispose();
}

await Task.Delay(500);

// --- async ---
{
var dogstatsd = new DogStatsdService();
dogstatsd.Configure(new StatsdConfig
{
StatsdServerName = "127.0.0.1",
StatsdPort = 4321,
});

var releaseSignal = new ManualResetEventSlim(false);
var tasks = new List<Task>();

for (var i = 0; i < 10; i++)
{
tasks.Add(Task.Run(() =>
{
dogstatsd.Increment("test.metric");
releaseSignal.Wait(15000);
}));
}

await Task.Delay(500);

var sw = Stopwatch.StartNew();
var disposeTask = dogstatsd.DisposeAsync();
asyncCompleted = await Task.WhenAny(disposeTask, Task.Delay(10000)) == disposeTask;
sw.Stop();
asyncTime = sw.ElapsedMilliseconds;

releaseSignal.Set();
await Task.WhenAll(tasks);
releaseSignal.Dispose();
}

TestContext.WriteLine("=== THREAD POOL STARVATION BENCHMARK ===");
TestContext.WriteLine("Thread pool: min=2, max=4");
TestContext.WriteLine("Blocking tasks: 10");
TestContext.WriteLine();
TestContext.WriteLine($"SYNC Dispose: {(syncCompleted ? $"{syncTime}ms" : $"blocked (>{syncTime}ms)")}");
TestContext.WriteLine($"ASYNC Dispose: {(asyncCompleted ? $"{asyncTime}ms" : $"blocked (>{asyncTime}ms)")}");
}
finally
{
ThreadPool.SetMinThreads(origMin, origMinIO);
ThreadPool.SetMaxThreads(origMax, origMaxIO);
}
}
}
}
Loading