Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/KubernetesClient/Watcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,16 @@ Task<TR> AttachCancellationToken<TR>(Task<TR> task)
{
if (!task.IsCompleted)
{
// Observe any exception from the original task to prevent an
// UnobservedTaskException when the continuation below is cancelled
// before the original task faults (e.g. the transport tears down the
// connection after cancellation).
_ = task.ContinueWith(
static t => { _ = t.Exception; },
CancellationToken.None,
TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);

// here to pass cancellationToken into task
return task.ContinueWith(t => t.GetAwaiter().GetResult(), cancellationToken);
}
Expand All @@ -174,7 +184,11 @@ Task<TR> AttachCancellationToken<TR>(Task<TR> task)
for (; ; )
{
// ReadLineAsync will return null when we've reached the end of the stream.
#if NET7_0_OR_GREATER
var line = await streamReader.ReadLineAsync(cancellationToken).ConfigureAwait(false);
#else
var line = await AttachCancellationToken(streamReader.ReadLineAsync()).ConfigureAwait(false);
#endif

cancellationToken.ThrowIfCancellationRequested();

Expand Down
65 changes: 65 additions & 0 deletions tests/KubernetesClient.Tests/WatchTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1028,5 +1028,70 @@ public async Task AsyncEnumerableWatchErrorHandling()
Assert.True(watchCompleted.IsSet);
}
}

[Fact]
public async Task CancellationDoesNotLeaveUnobservedTaskException()
{
// Regression test for https://github.com/kubernetes-client/csharp/issues/1813
// When the cancellation token is cancelled while a read is in flight and the
// underlying task subsequently faults (e.g. transport-level IOException after the
// connection is torn down), the faulting task must be observed so that no
// TaskScheduler.UnobservedTaskException is raised when it is finalized.
var unobservedExceptions = new List<Exception>();
void Handler(object sender, UnobservedTaskExceptionEventArgs e)
{
unobservedExceptions.Add(e.Exception);
}

TaskScheduler.UnobservedTaskException += Handler;
try
{
// Run the cancellation scenario in a separate, non-inlined method so that all
// references to the orphaned task go out of scope before we force a collection.
await RunCancelledWatchAsync().ConfigureAwait(true);

// Force the orphaned task to be finalized; without observing its exception this
// would raise TaskScheduler.UnobservedTaskException.
for (var i = 0; i < 5; i++)
{
GC.Collect();
GC.WaitForPendingFinalizers();
await Task.Delay(50).ConfigureAwait(true);
}

Assert.Empty(unobservedExceptions);
}
finally
{
TaskScheduler.UnobservedTaskException -= Handler;
}
}

[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.NoInlining)]
private static async Task RunCancelledWatchAsync()
{
using var cts = new CancellationTokenSource();
var faultReader = new TaskCompletionSource<TextReader>(TaskCreationOptions.RunContinuationsAsynchronously);

Func<Task<TextReader>> streamReaderCreator = () => faultReader.Task;

var enumerator = Watcher<V1Pod>
.CreateWatchEventEnumerator(streamReaderCreator, onError: null, cancellationToken: cts.Token)
.GetAsyncEnumerator(cts.Token);

// Start the enumeration; this awaits the (not yet completed) creator task.
var moveNext = enumerator.MoveNextAsync();

// Cancel before the creator task completes. The cancellation-aware continuation
// is cancelled, but the original creator task is still pending.
cts.Cancel();

await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await moveNext.ConfigureAwait(true)).ConfigureAwait(true);

await enumerator.DisposeAsync().ConfigureAwait(true);

// Now fault the original creator task, mimicking the transport tear-down.
faultReader.SetException(new IOException("The request was aborted."));
}
}
}
Loading