From 000ebfc6ddaa4c348a8a16bb2976f019516d31c5 Mon Sep 17 00:00:00 2001 From: Nikolay Borisenko <22616990+nvborisenko@users.noreply.github.com> Date: Thu, 2 Apr 2026 01:26:39 +0300 Subject: [PATCH 01/12] [dotnet] [bidi] Decouple transport and processing --- dotnet/src/webdriver/BiDi/Broker.cs | 95 ++++++++++++++++++++++------- 1 file changed, 74 insertions(+), 21 deletions(-) diff --git a/dotnet/src/webdriver/BiDi/Broker.cs b/dotnet/src/webdriver/BiDi/Broker.cs index 2cfa324838317..fcc18d6d3f9a4 100644 --- a/dotnet/src/webdriver/BiDi/Broker.cs +++ b/dotnet/src/webdriver/BiDi/Broker.cs @@ -21,6 +21,7 @@ using System.Collections.Concurrent; using System.Text.Json; using System.Text.Json.Serialization.Metadata; +using System.Threading.Channels; using OpenQA.Selenium.BiDi.Session; using OpenQA.Selenium.Internal.Logging; @@ -38,7 +39,13 @@ internal sealed class Broker : IAsyncDisposable private long _currentCommandId; - private readonly Task _receivingMessageTask; + private readonly Channel _receivedMessages = Channel.CreateBounded( + new BoundedChannelOptions(16) { SingleReader = true, SingleWriter = true, FullMode = BoundedChannelFullMode.Wait }); + + private readonly ConcurrentBag _bufferPool = []; + + private readonly Task _receivingTask; + private readonly Task _processingTask; private readonly CancellationTokenSource _receiveMessagesCancellationTokenSource; public Broker(ITransport transport, IBiDi bidi, Func sessionProvider) @@ -48,7 +55,8 @@ public Broker(ITransport transport, IBiDi bidi, Func sessionProv _eventDispatcher = new EventDispatcher(sessionProvider); _receiveMessagesCancellationTokenSource = new CancellationTokenSource(); - _receivingMessageTask = Task.Run(() => ReceiveMessagesLoopAsync(_receiveMessagesCancellationTokenSource.Token)); + _receivingTask = Task.Run(() => ReceiveMessagesAsync(_receiveMessagesCancellationTokenSource.Token)); + _processingTask = Task.Run(ProcessMessagesAsync); } public Task SubscribeAsync(string eventName, EventHandler eventHandler, SubscriptionOptions? options, JsonTypeInfo jsonTypeInfo, CancellationToken cancellationToken) @@ -118,17 +126,24 @@ public async ValueTask DisposeAsync() try { - await _receivingMessageTask.ConfigureAwait(false); + await _receivingTask.ConfigureAwait(false); } catch (OperationCanceledException) when (_receiveMessagesCancellationTokenSource.IsCancellationRequested) { // Expected when cancellation is requested, ignore. } + await _processingTask.ConfigureAwait(false); + _receiveMessagesCancellationTokenSource.Dispose(); await _transport.DisposeAsync().ConfigureAwait(false); + while (_bufferPool.TryTake(out var buffer)) + { + buffer.Dispose(); + } + GC.SuppressFinalize(this); } @@ -281,37 +296,33 @@ private void ProcessReceivedMessage(ReadOnlySpan data) } } - private async Task ReceiveMessagesLoopAsync(CancellationToken cancellationToken) + private async Task ReceiveMessagesAsync(CancellationToken cancellationToken) { - using var receiveBufferWriter = new PooledBufferWriter(); - try { while (!cancellationToken.IsCancellationRequested) { - receiveBufferWriter.Reset(); - - await _transport.ReceiveAsync(receiveBufferWriter, cancellationToken).ConfigureAwait(false); + var buffer = RentBuffer(); - if (_logger.IsEnabled(LogEventLevel.Trace)) + try { + await _transport.ReceiveAsync(buffer, cancellationToken).ConfigureAwait(false); + + if (_logger.IsEnabled(LogEventLevel.Trace)) + { #if NET8_0_OR_GREATER - _logger.Trace($"BiDi RCV <-- {System.Text.Encoding.UTF8.GetString(receiveBufferWriter.WrittenMemory.Span)}"); + _logger.Trace($"BiDi RCV <-- {System.Text.Encoding.UTF8.GetString(buffer.WrittenMemory.Span)}"); #else - _logger.Trace($"BiDi RCV <-- {System.Text.Encoding.UTF8.GetString(receiveBufferWriter.WrittenMemory.ToArray())}"); + _logger.Trace($"BiDi RCV <-- {System.Text.Encoding.UTF8.GetString(buffer.WrittenMemory.ToArray())}"); #endif - } + } - try - { - ProcessReceivedMessage(receiveBufferWriter.WrittenMemory.Span); + await _receivedMessages.Writer.WriteAsync(buffer, cancellationToken).ConfigureAwait(false); } - catch (Exception ex) + catch { - if (_logger.IsEnabled(LogEventLevel.Error)) - { - _logger.Error($"Unhandled error occurred while processing remote message: {ex}"); - } + ReturnBuffer(buffer); + throw; } } } @@ -333,6 +344,48 @@ private async Task ReceiveMessagesLoopAsync(CancellationToken cancellationToken) throw; } + finally + { + _receivedMessages.Writer.TryComplete(); + } + } + + private async Task ProcessMessagesAsync() + { + var reader = _receivedMessages.Reader; + + while (await reader.WaitToReadAsync().ConfigureAwait(false)) + { + while (reader.TryRead(out var buffer)) + { + try + { + ProcessReceivedMessage(buffer.WrittenMemory.Span); + } + catch (Exception ex) + { + if (_logger.IsEnabled(LogEventLevel.Error)) + { + _logger.Error($"Unhandled error occurred while processing remote message: {ex}"); + } + } + finally + { + ReturnBuffer(buffer); + } + } + } + } + + private PooledBufferWriter RentBuffer() + { + return _bufferPool.TryTake(out var buffer) ? buffer : new PooledBufferWriter(); + } + + private void ReturnBuffer(PooledBufferWriter buffer) + { + buffer.Reset(); + _bufferPool.Add(buffer); } private readonly record struct CommandInfo(TaskCompletionSource TaskCompletionSource, JsonTypeInfo JsonResultTypeInfo); From 78567e415d4094ba49c15e59faacecc2237f80a4 Mon Sep 17 00:00:00 2001 From: Nikolay Borisenko <22616990+nvborisenko@users.noreply.github.com> Date: Thu, 2 Apr 2026 02:05:37 +0300 Subject: [PATCH 02/12] Terminal exception --- dotnet/src/webdriver/BiDi/Broker.cs | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/dotnet/src/webdriver/BiDi/Broker.cs b/dotnet/src/webdriver/BiDi/Broker.cs index fcc18d6d3f9a4..9a80f3bc1e6af 100644 --- a/dotnet/src/webdriver/BiDi/Broker.cs +++ b/dotnet/src/webdriver/BiDi/Broker.cs @@ -44,6 +44,8 @@ internal sealed class Broker : IAsyncDisposable private readonly ConcurrentBag _bufferPool = []; + private volatile Exception? _terminalReceiveException; + private readonly Task _receivingTask; private readonly Task _processingTask; private readonly CancellationTokenSource _receiveMessagesCancellationTokenSource; @@ -333,14 +335,9 @@ private async Task ReceiveMessagesAsync(CancellationToken cancellationToken) _logger.Error($"Unhandled error occurred while receiving remote messages: {ex}"); } - // Fail all pending commands, as the connection is likely broken if we failed to receive messages. - foreach (var id in _pendingCommands.Keys) - { - if (_pendingCommands.TryRemove(id, out var pendingCommand)) - { - pendingCommand.TaskCompletionSource.TrySetException(ex); - } - } + // Record the exception so the processing task can fail remaining commands + // after draining already-received responses from the channel. + _terminalReceiveException = ex; throw; } @@ -375,6 +372,19 @@ private async Task ProcessMessagesAsync() } } } + + // Channel is fully drained. Fail any commands that didn't get a response. + var terminalException = _terminalReceiveException; + if (terminalException is not null) + { + foreach (var id in _pendingCommands.Keys) + { + if (_pendingCommands.TryRemove(id, out var pendingCommand)) + { + pendingCommand.TaskCompletionSource.TrySetException(terminalException); + } + } + } } private PooledBufferWriter RentBuffer() From 3748f372d831f81039f1b24d73715246d6d1dc03 Mon Sep 17 00:00:00 2001 From: Nikolay Borisenko <22616990+nvborisenko@users.noreply.github.com> Date: Thu, 2 Apr 2026 02:25:10 +0300 Subject: [PATCH 03/12] Shutdown --- dotnet/src/webdriver/BiDi/Broker.cs | 46 ++++++++++++++++++----------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/dotnet/src/webdriver/BiDi/Broker.cs b/dotnet/src/webdriver/BiDi/Broker.cs index 9a80f3bc1e6af..b9b3ffd6084c8 100644 --- a/dotnet/src/webdriver/BiDi/Broker.cs +++ b/dotnet/src/webdriver/BiDi/Broker.cs @@ -124,26 +124,31 @@ public async ValueTask DisposeAsync() { _receiveMessagesCancellationTokenSource.Cancel(); - await _eventDispatcher.DisposeAsync().ConfigureAwait(false); - try { - await _receivingTask.ConfigureAwait(false); - } - catch (OperationCanceledException) when (_receiveMessagesCancellationTokenSource.IsCancellationRequested) - { - // Expected when cancellation is requested, ignore. - } - - await _processingTask.ConfigureAwait(false); + try + { + await _receivingTask.ConfigureAwait(false); + } + catch (OperationCanceledException) when (_receiveMessagesCancellationTokenSource.IsCancellationRequested) + { + // Expected when cancellation is requested, ignore. + } - _receiveMessagesCancellationTokenSource.Dispose(); + await _transport.DisposeAsync().ConfigureAwait(false); - await _transport.DisposeAsync().ConfigureAwait(false); + await _processingTask.ConfigureAwait(false); - while (_bufferPool.TryTake(out var buffer)) + await _eventDispatcher.DisposeAsync().ConfigureAwait(false); + } + finally { - buffer.Dispose(); + _receiveMessagesCancellationTokenSource.Dispose(); + + while (_bufferPool.TryTake(out var buffer)) + { + buffer.Dispose(); + } } GC.SuppressFinalize(this); @@ -373,16 +378,21 @@ private async Task ProcessMessagesAsync() } } - // Channel is fully drained. Fail any commands that didn't get a response. + // Channel is fully drained. Fail any commands that didn't get a response: + // either with the transport error or cancellation for clean shutdown. var terminalException = _terminalReceiveException; - if (terminalException is not null) + foreach (var id in _pendingCommands.Keys) { - foreach (var id in _pendingCommands.Keys) + if (_pendingCommands.TryRemove(id, out var pendingCommand)) { - if (_pendingCommands.TryRemove(id, out var pendingCommand)) + if (terminalException is not null) { pendingCommand.TaskCompletionSource.TrySetException(terminalException); } + else + { + pendingCommand.TaskCompletionSource.TrySetCanceled(); + } } } } From 20656137e42ff8265f808f9d68c3ac7842fd384a Mon Sep 17 00:00:00 2001 From: Nikolay Borisenko <22616990+nvborisenko@users.noreply.github.com> Date: Thu, 2 Apr 2026 10:38:33 +0300 Subject: [PATCH 04/12] Disposal test --- dotnet/test/webdriver/BiDi/Session/SessionTests.cs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dotnet/test/webdriver/BiDi/Session/SessionTests.cs b/dotnet/test/webdriver/BiDi/Session/SessionTests.cs index eda5184fb435d..61b5e06c3edc6 100644 --- a/dotnet/test/webdriver/BiDi/Session/SessionTests.cs +++ b/dotnet/test/webdriver/BiDi/Session/SessionTests.cs @@ -21,6 +21,13 @@ namespace OpenQA.Selenium.Tests.BiDi.Session; internal class SessionTests : BiDiTestFixture { + [Test] + public async Task ShouldIdempotentDisposal() + { + await bidi.DisposeAsync(); + await bidi.DisposeAsync(); + } + [Test] public async Task CanGetStatus() { From c2fb21f9f4cf5be1d18cd7177e4550493b72d85a Mon Sep 17 00:00:00 2001 From: Nikolay Borisenko <22616990+nvborisenko@users.noreply.github.com> Date: Thu, 2 Apr 2026 10:45:34 +0300 Subject: [PATCH 05/12] Update BiDiFixture.cs --- dotnet/test/webdriver/BiDi/BiDiFixture.cs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dotnet/test/webdriver/BiDi/BiDiFixture.cs b/dotnet/test/webdriver/BiDi/BiDiFixture.cs index 7c3d01674b740..057176d488c5c 100644 --- a/dotnet/test/webdriver/BiDi/BiDiFixture.cs +++ b/dotnet/test/webdriver/BiDi/BiDiFixture.cs @@ -57,7 +57,10 @@ public async Task BiDiTearDown() await bidi.DisposeAsync(); } - driver?.Dispose(); + if (driver is not null) + { + await driver.DisposeAsync(); + } } public class BiDiEnabledDriverOptions : DriverOptions From 519970d6e2f9332d392f4e26120e483e6fbd210f Mon Sep 17 00:00:00 2001 From: Nikolay Borisenko <22616990+nvborisenko@users.noreply.github.com> Date: Thu, 2 Apr 2026 22:29:03 +0300 Subject: [PATCH 06/12] Rename test --- dotnet/test/webdriver/BiDi/Session/SessionTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dotnet/test/webdriver/BiDi/Session/SessionTests.cs b/dotnet/test/webdriver/BiDi/Session/SessionTests.cs index 61b5e06c3edc6..57d3fac77426f 100644 --- a/dotnet/test/webdriver/BiDi/Session/SessionTests.cs +++ b/dotnet/test/webdriver/BiDi/Session/SessionTests.cs @@ -22,7 +22,7 @@ namespace OpenQA.Selenium.Tests.BiDi.Session; internal class SessionTests : BiDiTestFixture { [Test] - public async Task ShouldIdempotentDisposal() + public async Task ShouldHaveIdempotentDisposal() { await bidi.DisposeAsync(); await bidi.DisposeAsync(); From 593b47871f7085c1beed6a5b25a575c0add7f40f Mon Sep 17 00:00:00 2001 From: Nikolay Borisenko <22616990+nvborisenko@users.noreply.github.com> Date: Thu, 2 Apr 2026 22:32:57 +0300 Subject: [PATCH 07/12] Const for potential configuration values --- dotnet/src/webdriver/BiDi/Broker.cs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/dotnet/src/webdriver/BiDi/Broker.cs b/dotnet/src/webdriver/BiDi/Broker.cs index b9b3ffd6084c8..9a0647a51c4d4 100644 --- a/dotnet/src/webdriver/BiDi/Broker.cs +++ b/dotnet/src/webdriver/BiDi/Broker.cs @@ -29,6 +29,12 @@ namespace OpenQA.Selenium.BiDi; internal sealed class Broker : IAsyncDisposable { + // Limits how many received messages can be buffered before backpressure is applied to the transport. + private const int ReceivedMessageQueueCapacity = 16; + + // How long to wait for a command response before cancelling. + private static readonly TimeSpan DefaultCommandTimeout = TimeSpan.FromSeconds(30); + private readonly ILogger _logger = Internal.Logging.Log.GetLogger(); private readonly ITransport _transport; @@ -40,7 +46,7 @@ internal sealed class Broker : IAsyncDisposable private long _currentCommandId; private readonly Channel _receivedMessages = Channel.CreateBounded( - new BoundedChannelOptions(16) { SingleReader = true, SingleWriter = true, FullMode = BoundedChannelFullMode.Wait }); + new BoundedChannelOptions(ReceivedMessageQueueCapacity) { SingleReader = true, SingleWriter = true, FullMode = BoundedChannelFullMode.Wait }); private readonly ConcurrentBag _bufferPool = []; @@ -79,7 +85,7 @@ public async Task ExecuteCommandAsync(TCommand comma ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken) : new CancellationTokenSource(); - var timeout = options?.Timeout ?? TimeSpan.FromSeconds(30); + var timeout = options?.Timeout ?? DefaultCommandTimeout; cts.CancelAfter(timeout); using var sendBuffer = new PooledBufferWriter(); From 3949ab1bf89c34745d0756f86365672b77e46b7c Mon Sep 17 00:00:00 2001 From: Nikolay Borisenko <22616990+nvborisenko@users.noreply.github.com> Date: Thu, 2 Apr 2026 22:35:45 +0300 Subject: [PATCH 08/12] Fail fast commands --- dotnet/src/webdriver/BiDi/Broker.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dotnet/src/webdriver/BiDi/Broker.cs b/dotnet/src/webdriver/BiDi/Broker.cs index 9a0647a51c4d4..0c4fd778d20eb 100644 --- a/dotnet/src/webdriver/BiDi/Broker.cs +++ b/dotnet/src/webdriver/BiDi/Broker.cs @@ -77,6 +77,11 @@ public async Task ExecuteCommandAsync(TCommand comma where TCommand : Command where TResult : EmptyResult { + if (_terminalReceiveException is { } terminalException) + { + throw new BiDiException("The broker is no longer processing messages due to a transport error.", terminalException); + } + command.Id = Interlocked.Increment(ref _currentCommandId); var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); From 27aaec0c7076ab6d615389ecc4ee1e61348bbbc7 Mon Sep 17 00:00:00 2001 From: Nikolay Borisenko <22616990+nvborisenko@users.noreply.github.com> Date: Thu, 2 Apr 2026 22:39:18 +0300 Subject: [PATCH 09/12] Don't throw in processing loop --- dotnet/src/webdriver/BiDi/Broker.cs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/dotnet/src/webdriver/BiDi/Broker.cs b/dotnet/src/webdriver/BiDi/Broker.cs index 0c4fd778d20eb..a8fc598fe8704 100644 --- a/dotnet/src/webdriver/BiDi/Broker.cs +++ b/dotnet/src/webdriver/BiDi/Broker.cs @@ -351,11 +351,8 @@ private async Task ReceiveMessagesAsync(CancellationToken cancellationToken) _logger.Error($"Unhandled error occurred while receiving remote messages: {ex}"); } - // Record the exception so the processing task can fail remaining commands - // after draining already-received responses from the channel. + // Propagated via _terminalReceiveException; not rethrown to keep disposal orderly. _terminalReceiveException = ex; - - throw; } finally { From d18cc1b7faa6e40038826edfac9ccc245137afff Mon Sep 17 00:00:00 2001 From: Nikolay Borisenko <22616990+nvborisenko@users.noreply.github.com> Date: Thu, 2 Apr 2026 22:54:43 +0300 Subject: [PATCH 10/12] Bounded buffer pool --- dotnet/src/webdriver/BiDi/Broker.cs | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/dotnet/src/webdriver/BiDi/Broker.cs b/dotnet/src/webdriver/BiDi/Broker.cs index a8fc598fe8704..135adb0a94456 100644 --- a/dotnet/src/webdriver/BiDi/Broker.cs +++ b/dotnet/src/webdriver/BiDi/Broker.cs @@ -48,7 +48,8 @@ internal sealed class Broker : IAsyncDisposable private readonly Channel _receivedMessages = Channel.CreateBounded( new BoundedChannelOptions(ReceivedMessageQueueCapacity) { SingleReader = true, SingleWriter = true, FullMode = BoundedChannelFullMode.Wait }); - private readonly ConcurrentBag _bufferPool = []; + private readonly Channel _bufferPool = Channel.CreateBounded( + new BoundedChannelOptions(ReceivedMessageQueueCapacity) { SingleReader = false, SingleWriter = false }); private volatile Exception? _terminalReceiveException; @@ -156,7 +157,7 @@ public async ValueTask DisposeAsync() { _receiveMessagesCancellationTokenSource.Dispose(); - while (_bufferPool.TryTake(out var buffer)) + while (_bufferPool.Reader.TryRead(out var buffer)) { buffer.Dispose(); } @@ -407,13 +408,16 @@ private async Task ProcessMessagesAsync() private PooledBufferWriter RentBuffer() { - return _bufferPool.TryTake(out var buffer) ? buffer : new PooledBufferWriter(); + return _bufferPool.Reader.TryRead(out var buffer) ? buffer : new PooledBufferWriter(); } private void ReturnBuffer(PooledBufferWriter buffer) { buffer.Reset(); - _bufferPool.Add(buffer); + if (!_bufferPool.Writer.TryWrite(buffer)) + { + buffer.Dispose(); + } } private readonly record struct CommandInfo(TaskCompletionSource TaskCompletionSource, JsonTypeInfo JsonResultTypeInfo); @@ -440,7 +444,13 @@ public void Reset() _written = 0; } - public void Advance(int count) => _written += count; + public void Advance(int count) + { + if (count < 0) throw new ArgumentOutOfRangeException(nameof(count)); + if (_written + count > (_buffer?.Length ?? 0)) throw new InvalidOperationException("Cannot advance past the end of the buffer."); + + _written += count; + } public Memory GetMemory(int sizeHint = 0) { From a885fcb97868307a880238d07a6e16171d3241b8 Mon Sep 17 00:00:00 2001 From: Nikolay Borisenko <22616990+nvborisenko@users.noreply.github.com> Date: Thu, 2 Apr 2026 22:56:47 +0300 Subject: [PATCH 11/12] Clean --- dotnet/src/webdriver/BiDi/Broker.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dotnet/src/webdriver/BiDi/Broker.cs b/dotnet/src/webdriver/BiDi/Broker.cs index 135adb0a94456..90a5266671553 100644 --- a/dotnet/src/webdriver/BiDi/Broker.cs +++ b/dotnet/src/webdriver/BiDi/Broker.cs @@ -162,8 +162,6 @@ public async ValueTask DisposeAsync() buffer.Dispose(); } } - - GC.SuppressFinalize(this); } private void ProcessReceivedMessage(ReadOnlySpan data) @@ -468,8 +466,10 @@ private void EnsureCapacity(int sizeHint) { var buffer = _buffer ?? throw new ObjectDisposedException(nameof(PooledBufferWriter)); - if (sizeHint <= 0) sizeHint = buffer.Length - _written; - if (sizeHint <= 0) sizeHint = buffer.Length; + if (sizeHint <= 0) + { + sizeHint = Math.Max(1, buffer.Length - _written); + } if (_written + sizeHint > buffer.Length) { From f54d3a0a75f89c13f7345dcfe624c3e16bc48447 Mon Sep 17 00:00:00 2001 From: Nikolay Borisenko <22616990+nvborisenko@users.noreply.github.com> Date: Thu, 2 Apr 2026 23:01:24 +0300 Subject: [PATCH 12/12] Use pool for commands --- dotnet/src/webdriver/BiDi/Broker.cs | 49 ++++++++++++++++------------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/dotnet/src/webdriver/BiDi/Broker.cs b/dotnet/src/webdriver/BiDi/Broker.cs index 90a5266671553..6419709d315d9 100644 --- a/dotnet/src/webdriver/BiDi/Broker.cs +++ b/dotnet/src/webdriver/BiDi/Broker.cs @@ -94,39 +94,46 @@ public async Task ExecuteCommandAsync(TCommand comma var timeout = options?.Timeout ?? DefaultCommandTimeout; cts.CancelAfter(timeout); - using var sendBuffer = new PooledBufferWriter(); + var sendBuffer = RentBuffer(); - using (var writer = new Utf8JsonWriter(sendBuffer)) + try { - JsonSerializer.Serialize(writer, command, jsonCommandTypeInfo); - } + using (var writer = new Utf8JsonWriter(sendBuffer)) + { + JsonSerializer.Serialize(writer, command, jsonCommandTypeInfo); + } - var commandInfo = new CommandInfo(tcs, jsonResultTypeInfo); - _pendingCommands[command.Id] = commandInfo; + var commandInfo = new CommandInfo(tcs, jsonResultTypeInfo); + _pendingCommands[command.Id] = commandInfo; - using var ctsRegistration = cts.Token.Register(() => - { - tcs.TrySetCanceled(cts.Token); - _pendingCommands.TryRemove(command.Id, out _); - }); + using var ctsRegistration = cts.Token.Register(() => + { + tcs.TrySetCanceled(cts.Token); + _pendingCommands.TryRemove(command.Id, out _); + }); - try - { - if (_logger.IsEnabled(LogEventLevel.Trace)) + try { + if (_logger.IsEnabled(LogEventLevel.Trace)) + { #if NET8_0_OR_GREATER - _logger.Trace($"BiDi SND --> {System.Text.Encoding.UTF8.GetString(sendBuffer.WrittenMemory.Span)}"); + _logger.Trace($"BiDi SND --> {System.Text.Encoding.UTF8.GetString(sendBuffer.WrittenMemory.Span)}"); #else - _logger.Trace($"BiDi SND --> {System.Text.Encoding.UTF8.GetString(sendBuffer.WrittenMemory.ToArray())}"); + _logger.Trace($"BiDi SND --> {System.Text.Encoding.UTF8.GetString(sendBuffer.WrittenMemory.ToArray())}"); #endif - } + } - await _transport.SendAsync(sendBuffer.WrittenMemory, cts.Token).ConfigureAwait(false); + await _transport.SendAsync(sendBuffer.WrittenMemory, cts.Token).ConfigureAwait(false); + } + catch + { + _pendingCommands.TryRemove(command.Id, out _); + throw; + } } - catch + finally { - _pendingCommands.TryRemove(command.Id, out _); - throw; + ReturnBuffer(sendBuffer); } return (TResult)await tcs.Task.ConfigureAwait(false);