From 89ade282ed8b31fcf07458928651d0b358a42abd Mon Sep 17 00:00:00 2001 From: Theodore Tsirpanis Date: Wed, 13 May 2026 01:46:39 +0300 Subject: [PATCH 1/7] Non-blocking `MSBuildClientPacketPump`. --- src/Build/BackEnd/Client/MSBuildClient.cs | 144 ++++---- .../BackEnd/Client/MSBuildClientPacketPump.cs | 346 ++++++------------ 2 files changed, 174 insertions(+), 316 deletions(-) diff --git a/src/Build/BackEnd/Client/MSBuildClient.cs b/src/Build/BackEnd/Client/MSBuildClient.cs index c1073683256..053b799f777 100644 --- a/src/Build/BackEnd/Client/MSBuildClient.cs +++ b/src/Build/BackEnd/Client/MSBuildClient.cs @@ -5,10 +5,12 @@ using System.Collections; using System.Collections.Generic; using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; using System.Globalization; using System.IO; using System.IO.Pipes; using System.Threading; +using System.Threading.Tasks; using Microsoft.Build.BackEnd; using Microsoft.Build.BackEnd.Client; using Microsoft.Build.BackEnd.Logging; @@ -55,11 +57,6 @@ public sealed class MSBuildClient /// private readonly MSBuildClientExitResult _exitResult; - /// - /// Whether MSBuild server finished the build. - /// - private bool _buildFinished = false; - /// /// Handshake between server and client. /// @@ -73,7 +70,7 @@ public sealed class MSBuildClient /// /// The named pipe stream for client-server communication. /// - private NamedPipeClientStream _nodeStream = null!; + private NamedPipeClientStream _nodeStream; /// /// A way to cache a byte array when writing out packets @@ -99,7 +96,7 @@ public sealed class MSBuildClient /// /// Incoming packet pump and redirection. /// - private MSBuildClientPacketPump _packetPump = null!; + private MSBuildClientPacketPump _packetPump; /// /// Public constructor with parameters. @@ -126,6 +123,9 @@ public MSBuildClient(string[] commandLine, string msbuildLocation) CreateNodePipeStream(); } +#pragma warning disable CS3016 // Arrays as attribute arguments is not CLS-compliant + [MemberNotNull(nameof(_nodeStream), nameof(_packetPump))] +#pragma warning restore CS3016 // Arrays as attribute arguments is not CLS-compliant private void CreateNodePipeStream() { #pragma warning disable SA1111, SA1009 // Closing parenthesis should be on line of last parameter @@ -150,6 +150,17 @@ private void CreateNodePipeStream() /// A value of type that indicates whether the build succeeded, /// or the manner in which it failed. public MSBuildClientExitResult Execute(CancellationToken cancellationToken) + { + return ExecuteAsync(cancellationToken).GetAwaiter().GetResult(); + } + + /// + /// Orchestrates the execution of the build on the server, responsible for client-server communication. + /// + /// Cancellation token. + /// A that will complete with a value of type + /// that indicates whether the build succeeded, or the manner in which it failed. + internal async Task ExecuteAsync(CancellationToken cancellationToken) { // Command line in one string used only in human readable content. string descriptiveCommandLine = string.Join(" ", _commandLine); @@ -203,12 +214,12 @@ public MSBuildClientExitResult Execute(CancellationToken cancellationToken) // Send build command. // Let's send it outside the packet pump so that we easier and quicker deal with possible issues with connection to server. MSBuildEventSource.Log.MSBuildServerBuildStart(descriptiveCommandLine); - if (TrySendBuildCommand()) + if (await TrySendBuildCommandAsync().ConfigureAwait(false)) { _numConsoleWritePackets = 0; _sizeOfConsoleWritePackets = 0; - ReadPacketsLoop(cancellationToken); + await ReadPacketsLoop(cancellationToken).ConfigureAwait(false); MSBuildEventSource.Log.MSBuildServerBuildStop(descriptiveCommandLine, _numConsoleWritePackets, _sizeOfConsoleWritePackets, _exitResult.MSBuildClientExitType.ToString(), _exitResult.MSBuildAppExitTypeString ?? string.Empty); CommunicationsUtilities.Trace("Build finished."); @@ -232,10 +243,10 @@ public static bool ShutdownServer(CancellationToken cancellationToken) // Neither commandLine nor msbuildlocation is involved in node shutdown var client = new MSBuildClient(commandLine: null!, msbuildLocation: null!); - return client.TryShutdownServer(cancellationToken); + return client.TryShutdownServerAsync(cancellationToken).GetAwaiter().GetResult(); } - private bool TryShutdownServer(CancellationToken cancellationToken) + private async Task TryShutdownServerAsync(CancellationToken cancellationToken) { CommunicationsUtilities.Trace("Trying shutdown server node."); @@ -262,13 +273,13 @@ private bool TryShutdownServer(CancellationToken cancellationToken) return false; } - if (!TrySendShutdownCommand()) + if (!await TrySendShutdownCommandAsync().ConfigureAwait(false)) { CommunicationsUtilities.Trace("Failed to send shutdown command to the server."); return false; } - ReadPacketsLoop(cancellationToken); + await ReadPacketsLoop(cancellationToken).ConfigureAwait(false); return _exitResult.MSBuildClientExitType == MSBuildClientExitType.Success; } @@ -301,51 +312,42 @@ private bool ServerWasBusy() return serverWasBusy; } - private void ReadPacketsLoop(CancellationToken cancellationToken) + private async Task ReadPacketsLoop(CancellationToken cancellationToken) { try { // Start packet pump - using MSBuildClientPacketPump packetPump = _packetPump; + await using MSBuildClientPacketPump packetPump = _packetPump; - packetPump.RegisterPacketHandler(NodePacketType.ServerNodeConsoleWrite, ServerNodeConsoleWrite.FactoryForDeserialization, packetPump); - packetPump.RegisterPacketHandler(NodePacketType.ServerNodeBuildResult, ServerNodeBuildResult.FactoryForDeserialization, packetPump); + packetPump.RegisterPacketHandler(NodePacketType.ServerNodeConsoleWrite, ServerNodeConsoleWrite.FactoryForDeserialization); + packetPump.RegisterPacketHandler(NodePacketType.ServerNodeBuildResult, ServerNodeBuildResult.FactoryForDeserialization); packetPump.Start(); - WaitHandle[] waitHandles = + while (true) { - cancellationToken.WaitHandle, - packetPump.PacketPumpCompleted, - packetPump.PacketReceivedEvent - }; + bool hasPackets; + try + { + hasPackets = await packetPump.ReceivedPackets.WaitToReadAsync(cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken) + { + await HandleCancellationAsync().ConfigureAwait(false); - while (!_buildFinished) - { - int index = WaitHandle.WaitAny(waitHandles); - switch (index) + // After the cancelation, we want to wait to server gracefuly finish the build. + // We have to replace the cancelation token, because WaitAny would cause to repeatedly hit this branch of code. + cancellationToken = CancellationToken.None; + continue; + } + + if (!hasPackets) { - case 0: - HandleCancellation(); - // After the cancelation, we want to wait to server gracefuly finish the build. - // We have to replace the cancelation handle, because WaitAny would cause to repeatedly hit this branch of code. - waitHandles[0] = CancellationToken.None.WaitHandle; - break; - - case 1: - HandlePacketPumpCompleted(packetPump); - break; - - case 2: - while (packetPump.ReceivedPacketsQueue.TryDequeue(out INodePacket? packet) && - !_buildFinished) - { - if (packet != null) - { - HandlePacket(packet); - } - } - - break; + break; + } + + while (packetPump.ReceivedPackets.TryRead(out INodePacket? packet)) + { + HandlePacket(packet); } } } @@ -402,13 +404,13 @@ private ConsoleColor QueryConsoleBackgroundColor() return consoleBackgroundColor; } - private bool TrySendPacket(Func packetResolver) + private async ValueTask TrySendPacketAsync(Func packetResolver) { INodePacket? packet = null; try { packet = packetResolver(); - WritePacket(_nodeStream, packet); + await WritePacketAsync(_nodeStream, packet).ConfigureAwait(false); CommunicationsUtilities.Trace($"Command packet of type '{packet.Type}' sent..."); } catch (Exception ex) @@ -472,15 +474,15 @@ private bool TryLaunchServer() return true; } - private bool TrySendBuildCommand() => TrySendPacket(() => GetServerNodeBuildCommand()); + private ValueTask TrySendBuildCommandAsync() => TrySendPacketAsync(() => GetServerNodeBuildCommand()); - private bool TrySendCancelCommand() => TrySendPacket(() => new ServerNodeBuildCancel()); + private ValueTask TrySendCancelCommandAsync() => TrySendPacketAsync(() => new ServerNodeBuildCancel()); - private bool TrySendShutdownCommand() + private ValueTask TrySendShutdownCommandAsync() { CommunicationsUtilities.Trace("Sending shutdown command to server."); _packetPump.ServerWillDisconnect(); - return TrySendPacket(() => new NodeBuildComplete(false /* no node reuse */)); + return TrySendPacketAsync(() => new NodeBuildComplete(false /* no node reuse */)); } private ServerNodeBuildCommand GetServerNodeBuildCommand() @@ -527,27 +529,13 @@ private ServerNodeBuildCommand GetServerNodeBuildCommand() /// /// Handle cancellation. /// - private void HandleCancellation() + private async ValueTask HandleCancellationAsync() { - TrySendCancelCommand(); + _ = await TrySendCancelCommandAsync().ConfigureAwait(false); CommunicationsUtilities.Trace("MSBuild client sent cancellation command."); } - /// - /// Handle when packet pump is completed both successfully or with error. - /// - private void HandlePacketPumpCompleted(MSBuildClientPacketPump packetPump) - { - if (packetPump.PacketPumpException != null) - { - CommunicationsUtilities.Trace($"MSBuild client error: packet pump unexpectedly shut down: {packetPump.PacketPumpException}"); - throw packetPump.PacketPumpException ?? new InternalErrorException("Packet pump unexpectedly shut down"); - } - - _buildFinished = true; - } - /// /// Dispatches the packet to the correct handler. /// @@ -589,7 +577,6 @@ private void HandleServerNodeBuildResult(ServerNodeBuildResult response) CommunicationsUtilities.Trace($"Build response received: exit code '{response.ExitCode}', exit type '{response.ExitType}'"); _exitResult.MSBuildClientExitType = MSBuildClientExitType.Success; _exitResult.MSBuildAppExitTypeString = response.ExitType; - _buildFinished = true; } /// @@ -598,14 +585,10 @@ private void HandleServerNodeBuildResult(ServerNodeBuildResult response) /// Whether the client connected to MSBuild server successfully. private bool TryConnectToServer(int timeoutMilliseconds) { - bool tryAgain = true; Stopwatch sw = Stopwatch.StartNew(); - while (tryAgain && sw.ElapsedMilliseconds < timeoutMilliseconds) + while (sw.ElapsedMilliseconds < timeoutMilliseconds) { - tryAgain = false; - - if (NodeProviderOutOfProcBase.TryConnectToPipeStream( _nodeStream, _pipeName, _handshake, Math.Max(1, timeoutMilliseconds - (int)sw.ElapsedMilliseconds), out HandshakeResult result)) { @@ -618,7 +601,6 @@ private bool TryConnectToServer(int timeoutMilliseconds) CommunicationsUtilities.Trace($"Retrying to connect to server after {sw.ElapsedMilliseconds} ms"); // This solves race condition for time in which server started but have not yet listen on pipe or // when it just finished build request and is recycling pipe. - tryAgain = true; CreateNodePipeStream(); } else @@ -633,7 +615,7 @@ private bool TryConnectToServer(int timeoutMilliseconds) return false; } - private void WritePacket(Stream nodeStream, INodePacket packet) + private async ValueTask WritePacketAsync(Stream nodeStream, INodePacket packet) { MemoryStream memoryStream = _packetMemoryStream; memoryStream.SetLength(0); @@ -655,7 +637,11 @@ private void WritePacket(Stream nodeStream, INodePacket packet) memoryStream.Position = 1; _binaryWriter.Write(packetStreamLength - 5); - nodeStream.Write(memoryStream.GetBuffer(), 0, packetStreamLength); +#if NET + await nodeStream.WriteAsync(memoryStream.GetBuffer().AsMemory(0, packetStreamLength)).ConfigureAwait(false); +#else + await nodeStream.WriteAsync(memoryStream.GetBuffer(), 0, packetStreamLength).ConfigureAwait(false); +#endif } } } diff --git a/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs b/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs index de04a127a7a..04e71ffe8a9 100644 --- a/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs +++ b/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs @@ -3,50 +3,39 @@ using System; using System.Buffers.Binary; -using System.Collections.Concurrent; +using System.Collections.Generic; using System.IO; using System.Threading; - -#if NET +using System.Threading.Channels; using System.Threading.Tasks; -#endif - using Microsoft.Build.Internal; using Microsoft.Build.Shared; namespace Microsoft.Build.BackEnd.Client { - internal sealed class MSBuildClientPacketPump : INodePacketHandler, INodePacketFactory, IDisposable + internal sealed class MSBuildClientPacketPump : IAsyncDisposable { - /// - /// The queue of packets we have received but which have not yet been processed. - /// - public ConcurrentQueue ReceivedPacketsQueue { get; } - - /// - /// Set when packet pump receive packets and put them to . - /// - public AutoResetEvent PacketReceivedEvent { get; } + private static readonly UnboundedChannelOptions s_channelOptions = new() + { + AllowSynchronousContinuations = true, + SingleReader = true, + SingleWriter = true, + }; /// - /// Set when the packet pump terminates. + /// The queue of packets we have received but which have not yet been processed. + /// Only one reader is allowed to read from this channel at a time. /// - public ManualResetEvent PacketPumpCompleted { get; } + public ChannelReader ReceivedPackets => _receivedPacketsChannel.Reader; - /// - /// Exception appeared when the packet pump unexpectedly terminates (due to connection problems or because of deserialization issues). - /// - public Exception? PacketPumpException { get; set; } + private readonly CancellationTokenSource _shutdownTokenSource; - /// - /// Set when packet pump should shutdown. - /// - private readonly ManualResetEvent _packetPumpShutdownEvent; + private readonly Channel _receivedPacketsChannel; /// - /// The packet factory. + /// Mapping of packet types to deserialization methods. /// - private readonly NodePacketFactory _packetFactory; + private readonly Dictionary _packetDeserializationMethods; /// /// The memory stream for a read buffer. @@ -54,9 +43,9 @@ internal sealed class MSBuildClientPacketPump : INodePacketHandler, INodePacketF private readonly MemoryStream _readBufferMemoryStream; /// - /// The thread which runs the asynchronous packet pump + /// The task which runs the asynchronous packet pump. /// - private Thread? _packetPumpThread; + private Task? _packetPumpTask; /// /// The stream from where to read packets. @@ -73,272 +62,155 @@ internal sealed class MSBuildClientPacketPump : INodePacketHandler, INodePacketF /// In such case we have sent last packet to server side and we expect /// it will soon broke pipe connection - unless client do it first. /// - private bool _isServerDisconnecting; + private volatile bool _isServerDisconnecting; public MSBuildClientPacketPump(Stream stream) { ErrorUtilities.VerifyThrowArgumentNull(stream); _stream = stream; - _isServerDisconnecting = false; - _packetFactory = new NodePacketFactory(); - - ReceivedPacketsQueue = new ConcurrentQueue(); - PacketReceivedEvent = new AutoResetEvent(false); - PacketPumpCompleted = new ManualResetEvent(false); - _packetPumpShutdownEvent = new ManualResetEvent(false); + _shutdownTokenSource = new CancellationTokenSource(); + _receivedPacketsChannel = Channel.CreateUnbounded(s_channelOptions); + _packetDeserializationMethods = new Dictionary(); _readBufferMemoryStream = new MemoryStream(); _binaryReadTranslator = BinaryTranslator.GetReadTranslator(_readBufferMemoryStream, InterningBinaryReader.CreateSharedBuffer()); } - #region INodePacketFactory Members - /// /// Registers a packet handler. /// /// The packet type for which the handler should be registered. /// The factory used to create packets. - /// The handler for the packets. - public void RegisterPacketHandler(NodePacketType packetType, NodePacketFactoryMethod factory, INodePacketHandler handler) - { - _packetFactory.RegisterPacketHandler(packetType, factory, handler); - } - - /// - /// Unregisters a packet handler. - /// - /// The type of packet for which the handler should be unregistered. - public void UnregisterPacketHandler(NodePacketType packetType) - { - _packetFactory.UnregisterPacketHandler(packetType); - } - - /// - /// Deserializes and routes a packer to the appropriate handler. - /// - /// The node from which the packet was received. - /// The packet type. - /// The translator to use as a source for packet data. - public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator) - { - _packetFactory.DeserializeAndRoutePacket(nodeId, packetType, translator); - } + public void RegisterPacketHandler(NodePacketType packetType, NodePacketFactoryMethod factory) => + _packetDeserializationMethods.Add(packetType, factory); /// /// Deserializes a packet. /// /// The packet type. /// The translator to use as a source for packet data. - public INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator) + private INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator) { - return _packetFactory.DeserializePacket(packetType, translator); - } + ErrorUtilities.VerifyThrow( + _packetDeserializationMethods.TryGetValue(packetType, out NodePacketFactoryMethod? factory), + $"No packet handler for type {packetType}"); - /// - /// Routes a packet to the appropriate handler. - /// - /// The node id from which the packet was received. - /// The packet to route. - public void RoutePacket(int nodeId, INodePacket packet) - { - _packetFactory.RoutePacket(nodeId, packet); + return factory(translator); } - #endregion - - #region INodePacketHandler Members - - /// - /// Called when a packet has been received. - /// - /// The node from which the packet was received. - /// The packet. - public void PacketReceived(int node, INodePacket packet) - { - ReceivedPacketsQueue.Enqueue(packet); - PacketReceivedEvent.Set(); - } - - #endregion - #region Packet Pump - /// - /// Initializes the packet pump thread. - /// - public void Start() - { - _packetPumpThread = new Thread(PacketPumpProc) - { - IsBackground = true, - Name = "MSBuild Client Packet Pump" - }; - _packetPumpThread.Start(); - } - - /// - /// Stops the packet pump thread. - /// - public void Stop() - { - _packetPumpShutdownEvent.Set(); - _packetPumpThread?.Join(); - } /// - /// This method handles the packet pump reading. It will terminate when the terminate event is - /// set. + /// Initializes the packet pump task. /// - /// - /// Instead of throwing an exception, puts it in and raises event . - /// - private void PacketPumpProc() - { - RunReadLoop(_stream, _packetPumpShutdownEvent); - } + public void Start() => _packetPumpTask = Task.Run(RunReadLoopAsync); - private void RunReadLoop(Stream localStream, ManualResetEvent localPacketPumpShutdownEvent) + private async Task RunReadLoopAsync() { CommunicationsUtilities.Trace("Entering read loop."); + CancellationToken shutdownToken = _shutdownTokenSource.Token; + ChannelWriter packetWriter = _receivedPacketsChannel.Writer; try { byte[] headerByte = new byte[5]; -#if FEATURE_APM - IAsyncResult result = localStream.BeginRead(headerByte, 0, headerByte.Length, null, null); -#else - Task readTask = localStream.ReadAsync(headerByte.AsMemory(), CancellationToken.None).AsTask(); -#endif - - // Ordering of the wait handles is important. The first signalled wait handle in the array - // will be returned by WaitAny if multiple wait handles are signalled. We prefer to have the - // terminate event triggered so that we cannot get into a situation where packets are being - // spammed to the client and it never gets an opportunity to shutdown. - WaitHandle[] handles = - [ - localPacketPumpShutdownEvent, -#if FEATURE_APM - result.AsyncWaitHandle + while (true) + { + // Client recieved a packet header. Read the rest of it. +#if NET + int headerBytesRead = await _stream.ReadAsync(headerByte, shutdownToken).ConfigureAwait(false); #else - ((IAsyncResult)readTask).AsyncWaitHandle + int headerBytesRead = await _stream.ReadAsync(headerByte, 0, headerByte.Length, shutdownToken).ConfigureAwait(false); #endif - ]; - bool continueReading = true; - do - { - int waitId = WaitHandle.WaitAny(handles); - switch (waitId) + if ((headerBytesRead != headerByte.Length) && !shutdownToken.IsCancellationRequested) { - case 0: - // Fulfill the request for shutdown of the message pump. - CommunicationsUtilities.Trace("Shutdown message pump thread."); - continueReading = false; - break; - - case 1: + // Incomplete read. Abort. + if (headerBytesRead == 0) + { + if (_isServerDisconnecting) { - // Client recieved a packet header. Read the rest of it. - int headerBytesRead = 0; -#if FEATURE_APM - headerBytesRead = localStream.EndRead(result); -#else - headerBytesRead = readTask.ConfigureAwait(false).GetAwaiter().GetResult(); -#endif + break; + } - if ((headerBytesRead != headerByte.Length) && !localPacketPumpShutdownEvent.WaitOne(0)) - { - // Incomplete read. Abort. - if (headerBytesRead == 0) - { - if (_isServerDisconnecting) - { - continueReading = false; - break; - } - - ErrorUtilities.ThrowInternalError("Server disconnected abruptly"); - } - else - { - ErrorUtilities.ThrowInternalError($"Incomplete header read. {headerBytesRead} of {headerByte.Length} bytes read"); - } - } - - NodePacketType packetType = (NodePacketType)Enum.ToObject(typeof(NodePacketType), headerByte[0]); - - int packetLength = BinaryPrimitives.ReadInt32LittleEndian(new Span(headerByte, 1, 4)); - int packetBytesRead = 0; - - _readBufferMemoryStream.Position = 0; - _readBufferMemoryStream.SetLength(packetLength); - byte[] packetData = _readBufferMemoryStream.GetBuffer(); - - while (packetBytesRead < packetLength) - { -#if FEATURE_APM - int bytesRead = localStream.Read(packetData, packetBytesRead, packetLength - packetBytesRead); -#else - ValueTask bytesReadTask = localStream.ReadAsync(packetData.AsMemory(packetBytesRead, packetLength - packetBytesRead)); - int bytesRead = bytesReadTask.IsCompleted ? bytesReadTask.Result : bytesReadTask.AsTask().ConfigureAwait(false).GetAwaiter().GetResult(); -#endif - if (bytesRead == 0) - { - // Incomplete read. Abort. - ErrorUtilities.ThrowInternalError($"Incomplete packet read. {packetBytesRead} of {packetLength} bytes read"); - } - - packetBytesRead += bytesRead; - } - - try - { - _packetFactory.DeserializeAndRoutePacket(0, packetType, _binaryReadTranslator); - } - catch - { - // Error while deserializing or handling packet. Logging additional info. - CommunicationsUtilities.Trace($"Packet factory failed to receive package. Exception while deserializing packet {packetType}."); - throw; - } - - if (packetType == NodePacketType.ServerNodeBuildResult) - { - continueReading = false; - } - else - { - // Start reading the next package header. -#if FEATURE_APM - result = localStream.BeginRead(headerByte, 0, headerByte.Length, null, null); - handles[1] = result.AsyncWaitHandle; + ErrorUtilities.ThrowInternalError("Server disconnected abruptly"); + } + else + { + ErrorUtilities.ThrowInternalError($"Incomplete header read. {headerBytesRead} of {headerByte.Length} bytes read"); + } + } + + NodePacketType packetType = (NodePacketType)headerByte[0]; + + int packetLength = BinaryPrimitives.ReadInt32LittleEndian(headerByte.AsSpan(1)); + int packetBytesRead = 0; + + _readBufferMemoryStream.Position = 0; + _readBufferMemoryStream.SetLength(packetLength); + byte[] packetData = _readBufferMemoryStream.GetBuffer(); + + while (packetBytesRead < packetLength) + { +#if NET + int bytesRead = await _stream.ReadAsync(packetData.AsMemory(packetBytesRead, packetLength - packetBytesRead), shutdownToken).ConfigureAwait(false); #else - readTask = localStream.ReadAsync(headerByte.AsMemory(), CancellationToken.None).AsTask(); - handles[1] = ((IAsyncResult)readTask).AsyncWaitHandle; + int bytesRead = await _stream.ReadAsync(packetData, packetBytesRead, packetLength - packetBytesRead, shutdownToken).ConfigureAwait(false); #endif - } - } - break; + if (bytesRead == 0) + { + // Incomplete read. Abort. + ErrorUtilities.ThrowInternalError($"Incomplete packet read. {packetBytesRead} of {packetLength} bytes read"); + } + + packetBytesRead += bytesRead; + } - default: - ErrorUtilities.ThrowInternalError($"WaitId {waitId} out of range."); - break; + try + { + INodePacket packet = DeserializePacket(packetType, _binaryReadTranslator); + await packetWriter.WriteAsync(packet, shutdownToken).ConfigureAwait(false); + } + catch + { + // Error while deserializing or handling packet. Logging additional info. + CommunicationsUtilities.Trace($"Packet factory failed to receive package. Exception while deserializing packet {packetType}."); + throw; + } + + if (packetType == NodePacketType.ServerNodeBuildResult) + { + break; } } - while (continueReading); + } + catch (OperationCanceledException ex) when (ex.CancellationToken == shutdownToken) + { + CommunicationsUtilities.Trace("Packet pump shutdown requested."); } catch (Exception ex) { CommunicationsUtilities.Trace($"Exception occurred in the packet pump: {ex}"); - PacketPumpException = ex; + packetWriter.Complete(ex); } CommunicationsUtilities.Trace("Ending read loop."); - PacketPumpCompleted.Set(); + packetWriter.TryComplete(); } #endregion - public void Dispose() => Stop(); + /// + /// Stops the packet pump loop, and waits for it to finish. + /// + public async ValueTask DisposeAsync() + { + _shutdownTokenSource.Cancel(); + if (_packetPumpTask is not null) + { + await _packetPumpTask.ConfigureAwait(false); + } + } /// /// Signalize that from now on we expect server will break connected named pipe. From 2ea803ff6a05788ab6ed9550d5677082d18f2847 Mon Sep 17 00:00:00 2001 From: Theodore Tsirpanis Date: Wed, 20 May 2026 04:27:17 +0300 Subject: [PATCH 2/7] Make `MSBuildClient.ExecuteAsync` public. --- src/Build/BackEnd/Client/MSBuildClient.cs | 2 +- src/MSBuild/MSBuildClientApp.cs | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/Build/BackEnd/Client/MSBuildClient.cs b/src/Build/BackEnd/Client/MSBuildClient.cs index 733dfd0173c..2c8d91efcce 100644 --- a/src/Build/BackEnd/Client/MSBuildClient.cs +++ b/src/Build/BackEnd/Client/MSBuildClient.cs @@ -166,7 +166,7 @@ public MSBuildClientExitResult Execute(CancellationToken cancellationToken) /// Cancellation token. /// A that will complete with a value of type /// that indicates whether the build succeeded, or the manner in which it failed. - internal async Task ExecuteAsync(CancellationToken cancellationToken) + public async Task ExecuteAsync(CancellationToken cancellationToken) { // Command line in one string used only in human readable content. string descriptiveCommandLine = string.Join(" ", _commandLine); diff --git a/src/MSBuild/MSBuildClientApp.cs b/src/MSBuild/MSBuildClientApp.cs index 1282d4fb9e3..8c6b8ef9f3f 100644 --- a/src/MSBuild/MSBuildClientApp.cs +++ b/src/MSBuild/MSBuildClientApp.cs @@ -4,6 +4,7 @@ using System; using System.Globalization; using System.Threading; +using System.Threading.Tasks; using Microsoft.Build.Experimental; using Microsoft.Build.Framework.Telemetry; using Microsoft.Build.Shared; @@ -39,10 +40,10 @@ public static MSBuildApp.ExitType Execute(string[] commandLineArgs, Cancellation { string msbuildLocation = BuildEnvironmentHelper.Instance.CurrentMSBuildExePath; - return Execute( + return ExecuteAsync( commandLineArgs, msbuildLocation, - cancellationToken); + cancellationToken).GetAwaiter().GetResult(); } /// @@ -56,10 +57,10 @@ public static MSBuildApp.ExitType Execute(string[] commandLineArgs, Cancellation /// Cancellation token. /// A value of type that indicates whether the build succeeded, /// or the manner in which it failed. - public static MSBuildApp.ExitType Execute(string[] commandLineArgs, string msbuildLocation, CancellationToken cancellationToken) + public static async Task ExecuteAsync(string[] commandLineArgs, string msbuildLocation, CancellationToken cancellationToken) { MSBuildClient msbuildClient = new MSBuildClient(commandLineArgs, msbuildLocation); - MSBuildClientExitResult exitResult = msbuildClient.Execute(cancellationToken); + MSBuildClientExitResult exitResult = await msbuildClient.ExecuteAsync(cancellationToken); if (exitResult.MSBuildClientExitType == MSBuildClientExitType.ServerBusy || exitResult.MSBuildClientExitType == MSBuildClientExitType.UnableToConnect || From 94a718bf3dc44660ea7efa4fcb3674faf25342eb Mon Sep 17 00:00:00 2001 From: Theodore Tsirpanis Date: Wed, 20 May 2026 04:29:52 +0300 Subject: [PATCH 3/7] Update comment. --- src/Build/BackEnd/Client/MSBuildClient.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Build/BackEnd/Client/MSBuildClient.cs b/src/Build/BackEnd/Client/MSBuildClient.cs index 2c8d91efcce..09fbef7aa1f 100644 --- a/src/Build/BackEnd/Client/MSBuildClient.cs +++ b/src/Build/BackEnd/Client/MSBuildClient.cs @@ -341,7 +341,7 @@ private async Task ReadPacketsLoop(CancellationToken cancellationToken) await HandleCancellationAsync().ConfigureAwait(false); // After the cancelation, we want to wait to server gracefuly finish the build. - // We have to replace the cancelation token, because WaitAny would cause to repeatedly hit this branch of code. + // We have to replace the cancelation token, because the thrown OCE would cause to repeatedly hit this branch of code. cancellationToken = CancellationToken.None; continue; } From 095dafbcb80c56e688f58cfa23bd37e0f0ad6b4e Mon Sep 17 00:00:00 2001 From: Theodore Tsirpanis Date: Thu, 21 May 2026 02:26:35 +0300 Subject: [PATCH 4/7] Add direct dependency to `System.Threading.Channels` on .NET Framework. Fixes test failures. There are already binding redirects in the app.config. --- src/Build/Microsoft.Build.csproj | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Build/Microsoft.Build.csproj b/src/Build/Microsoft.Build.csproj index 7f3066e974b..87e353c7535 100644 --- a/src/Build/Microsoft.Build.csproj +++ b/src/Build/Microsoft.Build.csproj @@ -74,6 +74,7 @@ + From 6bbe10eed01f5b48799d53229906d2de427a1599 Mon Sep 17 00:00:00 2001 From: Theodore Tsirpanis Date: Fri, 22 May 2026 22:08:31 +0300 Subject: [PATCH 5/7] Simplify packet deserialization; use a simple switch expression instead of a dictionary. --- src/Build/BackEnd/Client/MSBuildClient.cs | 12 ++++--- .../BackEnd/Client/MSBuildClientPacketPump.cs | 34 ++++--------------- 2 files changed, 14 insertions(+), 32 deletions(-) diff --git a/src/Build/BackEnd/Client/MSBuildClient.cs b/src/Build/BackEnd/Client/MSBuildClient.cs index 09fbef7aa1f..560249da24b 100644 --- a/src/Build/BackEnd/Client/MSBuildClient.cs +++ b/src/Build/BackEnd/Client/MSBuildClient.cs @@ -145,7 +145,7 @@ private void CreateNodePipeStream() #endif ); #pragma warning restore SA1111, SA1009 // Closing parenthesis should be on line of last parameter - _packetPump = new MSBuildClientPacketPump(_nodeStream); + _packetPump = new MSBuildClientPacketPump(_nodeStream, DeserializePacket); } /// @@ -318,15 +318,19 @@ private bool ServerWasBusy() return serverWasBusy; } + private INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator) => packetType switch + { + NodePacketType.ServerNodeConsoleWrite => ServerNodeConsoleWrite.FactoryForDeserialization(translator), + NodePacketType.ServerNodeBuildResult => ServerNodeBuildResult.FactoryForDeserialization(translator), + _ => throw new InvalidOperationException($"Unexpected packet type {packetType}"), + }; + private async Task ReadPacketsLoop(CancellationToken cancellationToken) { try { // Start packet pump await using MSBuildClientPacketPump packetPump = _packetPump; - - packetPump.RegisterPacketHandler(NodePacketType.ServerNodeConsoleWrite, ServerNodeConsoleWrite.FactoryForDeserialization); - packetPump.RegisterPacketHandler(NodePacketType.ServerNodeBuildResult, ServerNodeBuildResult.FactoryForDeserialization); packetPump.Start(); while (true) diff --git a/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs b/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs index 04e71ffe8a9..02ad60d2d23 100644 --- a/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs +++ b/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs @@ -3,7 +3,6 @@ using System; using System.Buffers.Binary; -using System.Collections.Generic; using System.IO; using System.Threading; using System.Threading.Channels; @@ -33,9 +32,9 @@ internal sealed class MSBuildClientPacketPump : IAsyncDisposable private readonly Channel _receivedPacketsChannel; /// - /// Mapping of packet types to deserialization methods. + /// The factory method for creating packets based on packet type. /// - private readonly Dictionary _packetDeserializationMethods; + private readonly Func _packetFactoryMethod; /// /// The memory stream for a read buffer. @@ -64,41 +63,20 @@ internal sealed class MSBuildClientPacketPump : IAsyncDisposable /// private volatile bool _isServerDisconnecting; - public MSBuildClientPacketPump(Stream stream) + public MSBuildClientPacketPump(Stream stream, Func packetFactoryMethod) { ErrorUtilities.VerifyThrowArgumentNull(stream); + ErrorUtilities.VerifyThrowArgumentNull(packetFactoryMethod); _stream = stream; _shutdownTokenSource = new CancellationTokenSource(); _receivedPacketsChannel = Channel.CreateUnbounded(s_channelOptions); - _packetDeserializationMethods = new Dictionary(); + _packetFactoryMethod = packetFactoryMethod; _readBufferMemoryStream = new MemoryStream(); _binaryReadTranslator = BinaryTranslator.GetReadTranslator(_readBufferMemoryStream, InterningBinaryReader.CreateSharedBuffer()); } - /// - /// Registers a packet handler. - /// - /// The packet type for which the handler should be registered. - /// The factory used to create packets. - public void RegisterPacketHandler(NodePacketType packetType, NodePacketFactoryMethod factory) => - _packetDeserializationMethods.Add(packetType, factory); - - /// - /// Deserializes a packet. - /// - /// The packet type. - /// The translator to use as a source for packet data. - private INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator) - { - ErrorUtilities.VerifyThrow( - _packetDeserializationMethods.TryGetValue(packetType, out NodePacketFactoryMethod? factory), - $"No packet handler for type {packetType}"); - - return factory(translator); - } - #region Packet Pump /// @@ -169,7 +147,7 @@ private async Task RunReadLoopAsync() try { - INodePacket packet = DeserializePacket(packetType, _binaryReadTranslator); + INodePacket packet = _packetFactoryMethod(packetType, _binaryReadTranslator); await packetWriter.WriteAsync(packet, shutdownToken).ConfigureAwait(false); } catch From 42058101bfa3e80f33413c4209012b786a783601 Mon Sep 17 00:00:00 2001 From: Theodore Tsirpanis Date: Fri, 22 May 2026 22:09:19 +0300 Subject: [PATCH 6/7] Fix spelling in comments. --- src/Build/BackEnd/Client/MSBuildClient.cs | 2 +- src/Build/BackEnd/Client/MSBuildClientPacketPump.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Build/BackEnd/Client/MSBuildClient.cs b/src/Build/BackEnd/Client/MSBuildClient.cs index 560249da24b..f681f0c1bb0 100644 --- a/src/Build/BackEnd/Client/MSBuildClient.cs +++ b/src/Build/BackEnd/Client/MSBuildClient.cs @@ -344,7 +344,7 @@ private async Task ReadPacketsLoop(CancellationToken cancellationToken) { await HandleCancellationAsync().ConfigureAwait(false); - // After the cancelation, we want to wait to server gracefuly finish the build. + // After the cancellation, we want to wait to server gracefully finish the build. // We have to replace the cancelation token, because the thrown OCE would cause to repeatedly hit this branch of code. cancellationToken = CancellationToken.None; continue; diff --git a/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs b/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs index 02ad60d2d23..b0132499d36 100644 --- a/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs +++ b/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs @@ -95,7 +95,7 @@ private async Task RunReadLoopAsync() byte[] headerByte = new byte[5]; while (true) { - // Client recieved a packet header. Read the rest of it. + // Client received a packet header. Read the rest of it. #if NET int headerBytesRead = await _stream.ReadAsync(headerByte, shutdownToken).ConfigureAwait(false); #else From 1384c7e9e159f15a34aa3dc635b99e11bf9f045d Mon Sep 17 00:00:00 2001 From: Theodore Tsirpanis Date: Fri, 22 May 2026 22:36:31 +0300 Subject: [PATCH 7/7] Complete the received packets channel on disposal --- src/Build/BackEnd/Client/MSBuildClientPacketPump.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs b/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs index b0132499d36..c775273a568 100644 --- a/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs +++ b/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs @@ -188,6 +188,9 @@ public async ValueTask DisposeAsync() { await _packetPumpTask.ConfigureAwait(false); } + + // Complete the channel in case Start has not been called yet. + _ = _receivedPacketsChannel.Writer.TryComplete(); } ///