diff --git a/src/Build/BackEnd/Client/MSBuildClient.cs b/src/Build/BackEnd/Client/MSBuildClient.cs
index 4c6a61fb362..f681f0c1bb0 100644
--- a/src/Build/BackEnd/Client/MSBuildClient.cs
+++ b/src/Build/BackEnd/Client/MSBuildClient.cs
@@ -5,10 +5,12 @@
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.IO;
using System.IO.Pipes;
using System.Threading;
+using System.Threading.Tasks;
using Microsoft.Build.BackEnd;
using Microsoft.Build.BackEnd.Client;
using Microsoft.Build.BackEnd.Logging;
@@ -55,11 +57,6 @@ public sealed class MSBuildClient
///
private readonly MSBuildClientExitResult _exitResult;
- ///
- /// Whether MSBuild server finished the build.
- ///
- private bool _buildFinished = false;
-
///
/// Handshake between server and client.
///
@@ -73,7 +70,7 @@ public sealed class MSBuildClient
///
/// The named pipe stream for client-server communication.
///
- private NamedPipeClientStream _nodeStream = null!;
+ private NamedPipeClientStream _nodeStream;
///
/// A way to cache a byte array when writing out packets
@@ -99,7 +96,7 @@ public sealed class MSBuildClient
///
/// Incoming packet pump and redirection.
///
- private MSBuildClientPacketPump _packetPump = null!;
+ private MSBuildClientPacketPump _packetPump;
///
/// PID of the server process this client launched (or null if no launch was attempted /
@@ -132,6 +129,9 @@ public MSBuildClient(string[] commandLine, string msbuildLocation)
CreateNodePipeStream();
}
+#pragma warning disable CS3016 // Arrays as attribute arguments is not CLS-compliant
+ [MemberNotNull(nameof(_nodeStream), nameof(_packetPump))]
+#pragma warning restore CS3016 // Arrays as attribute arguments is not CLS-compliant
private void CreateNodePipeStream()
{
#pragma warning disable SA1111, SA1009 // Closing parenthesis should be on line of last parameter
@@ -145,7 +145,7 @@ private void CreateNodePipeStream()
#endif
);
#pragma warning restore SA1111, SA1009 // Closing parenthesis should be on line of last parameter
- _packetPump = new MSBuildClientPacketPump(_nodeStream);
+ _packetPump = new MSBuildClientPacketPump(_nodeStream, DeserializePacket);
}
///
@@ -156,6 +156,17 @@ private void CreateNodePipeStream()
/// A value of type that indicates whether the build succeeded,
/// or the manner in which it failed.
public MSBuildClientExitResult Execute(CancellationToken cancellationToken)
+ {
+ return ExecuteAsync(cancellationToken).GetAwaiter().GetResult();
+ }
+
+ ///
+ /// Orchestrates the execution of the build on the server, responsible for client-server communication.
+ ///
+ /// Cancellation token.
+ /// A that will complete with a value of type
+ /// that indicates whether the build succeeded, or the manner in which it failed.
+ public async Task ExecuteAsync(CancellationToken cancellationToken)
{
// Command line in one string used only in human readable content.
string descriptiveCommandLine = string.Join(" ", _commandLine);
@@ -209,12 +220,12 @@ public MSBuildClientExitResult Execute(CancellationToken cancellationToken)
// Send build command.
// Let's send it outside the packet pump so that we easier and quicker deal with possible issues with connection to server.
MSBuildEventSource.Log.MSBuildServerBuildStart(descriptiveCommandLine);
- if (TrySendBuildCommand())
+ if (await TrySendBuildCommandAsync().ConfigureAwait(false))
{
_numConsoleWritePackets = 0;
_sizeOfConsoleWritePackets = 0;
- ReadPacketsLoop(cancellationToken);
+ await ReadPacketsLoop(cancellationToken).ConfigureAwait(false);
MSBuildEventSource.Log.MSBuildServerBuildStop(descriptiveCommandLine, _numConsoleWritePackets, _sizeOfConsoleWritePackets, _exitResult.MSBuildClientExitType.ToString(), _exitResult.MSBuildAppExitTypeString ?? string.Empty);
CommunicationsUtilities.Trace("Build finished.");
@@ -238,10 +249,10 @@ public static bool ShutdownServer(CancellationToken cancellationToken)
// Neither commandLine nor msbuildlocation is involved in node shutdown
var client = new MSBuildClient(commandLine: null!, msbuildLocation: null!);
- return client.TryShutdownServer(cancellationToken);
+ return client.TryShutdownServerAsync(cancellationToken).GetAwaiter().GetResult();
}
- private bool TryShutdownServer(CancellationToken cancellationToken)
+ private async Task TryShutdownServerAsync(CancellationToken cancellationToken)
{
CommunicationsUtilities.Trace("Trying shutdown server node.");
@@ -268,13 +279,13 @@ private bool TryShutdownServer(CancellationToken cancellationToken)
return false;
}
- if (!TrySendShutdownCommand())
+ if (!await TrySendShutdownCommandAsync().ConfigureAwait(false))
{
CommunicationsUtilities.Trace("Failed to send shutdown command to the server.");
return false;
}
- ReadPacketsLoop(cancellationToken);
+ await ReadPacketsLoop(cancellationToken).ConfigureAwait(false);
return _exitResult.MSBuildClientExitType == MSBuildClientExitType.Success;
}
@@ -307,51 +318,46 @@ private bool ServerWasBusy()
return serverWasBusy;
}
- private void ReadPacketsLoop(CancellationToken cancellationToken)
+ private INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator) => packetType switch
+ {
+ NodePacketType.ServerNodeConsoleWrite => ServerNodeConsoleWrite.FactoryForDeserialization(translator),
+ NodePacketType.ServerNodeBuildResult => ServerNodeBuildResult.FactoryForDeserialization(translator),
+ _ => throw new InvalidOperationException($"Unexpected packet type {packetType}"),
+ };
+
+ private async Task ReadPacketsLoop(CancellationToken cancellationToken)
{
try
{
// Start packet pump
- using MSBuildClientPacketPump packetPump = _packetPump;
-
- packetPump.RegisterPacketHandler(NodePacketType.ServerNodeConsoleWrite, ServerNodeConsoleWrite.FactoryForDeserialization, packetPump);
- packetPump.RegisterPacketHandler(NodePacketType.ServerNodeBuildResult, ServerNodeBuildResult.FactoryForDeserialization, packetPump);
+ await using MSBuildClientPacketPump packetPump = _packetPump;
packetPump.Start();
- WaitHandle[] waitHandles =
+ while (true)
{
- cancellationToken.WaitHandle,
- packetPump.PacketPumpCompleted,
- packetPump.PacketReceivedEvent
- };
+ bool hasPackets;
+ try
+ {
+ hasPackets = await packetPump.ReceivedPackets.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
+ {
+ await HandleCancellationAsync().ConfigureAwait(false);
- while (!_buildFinished)
- {
- int index = WaitHandle.WaitAny(waitHandles);
- switch (index)
+ // After the cancellation, we want to wait to server gracefully finish the build.
+ // We have to replace the cancelation token, because the thrown OCE would cause to repeatedly hit this branch of code.
+ cancellationToken = CancellationToken.None;
+ continue;
+ }
+
+ if (!hasPackets)
+ {
+ break;
+ }
+
+ while (packetPump.ReceivedPackets.TryRead(out INodePacket? packet))
{
- case 0:
- HandleCancellation();
- // After the cancelation, we want to wait to server gracefuly finish the build.
- // We have to replace the cancelation handle, because WaitAny would cause to repeatedly hit this branch of code.
- waitHandles[0] = CancellationToken.None.WaitHandle;
- break;
-
- case 1:
- HandlePacketPumpCompleted(packetPump);
- break;
-
- case 2:
- while (packetPump.ReceivedPacketsQueue.TryDequeue(out INodePacket? packet) &&
- !_buildFinished)
- {
- if (packet != null)
- {
- HandlePacket(packet);
- }
- }
-
- break;
+ HandlePacket(packet);
}
}
}
@@ -408,13 +414,13 @@ private ConsoleColor QueryConsoleBackgroundColor()
return consoleBackgroundColor;
}
- private bool TrySendPacket(Func packetResolver)
+ private async ValueTask TrySendPacketAsync(Func packetResolver)
{
INodePacket? packet = null;
try
{
packet = packetResolver();
- WritePacket(_nodeStream, packet);
+ await WritePacketAsync(_nodeStream, packet).ConfigureAwait(false);
CommunicationsUtilities.Trace($"Command packet of type '{packet.Type}' sent...");
}
catch (Exception ex)
@@ -489,15 +495,15 @@ private bool TryLaunchServer()
return true;
}
- private bool TrySendBuildCommand() => TrySendPacket(() => GetServerNodeBuildCommand());
+ private ValueTask TrySendBuildCommandAsync() => TrySendPacketAsync(() => GetServerNodeBuildCommand());
- private bool TrySendCancelCommand() => TrySendPacket(() => new ServerNodeBuildCancel());
+ private ValueTask TrySendCancelCommandAsync() => TrySendPacketAsync(() => new ServerNodeBuildCancel());
- private bool TrySendShutdownCommand()
+ private ValueTask TrySendShutdownCommandAsync()
{
CommunicationsUtilities.Trace("Sending shutdown command to server.");
_packetPump.ServerWillDisconnect();
- return TrySendPacket(() => new NodeBuildComplete(false /* no node reuse */));
+ return TrySendPacketAsync(() => new NodeBuildComplete(false /* no node reuse */));
}
private ServerNodeBuildCommand GetServerNodeBuildCommand()
@@ -544,27 +550,13 @@ private ServerNodeBuildCommand GetServerNodeBuildCommand()
///
/// Handle cancellation.
///
- private void HandleCancellation()
+ private async ValueTask HandleCancellationAsync()
{
- TrySendCancelCommand();
+ _ = await TrySendCancelCommandAsync().ConfigureAwait(false);
CommunicationsUtilities.Trace("MSBuild client sent cancellation command.");
}
- ///
- /// Handle when packet pump is completed both successfully or with error.
- ///
- private void HandlePacketPumpCompleted(MSBuildClientPacketPump packetPump)
- {
- if (packetPump.PacketPumpException != null)
- {
- CommunicationsUtilities.Trace($"MSBuild client error: packet pump unexpectedly shut down: {packetPump.PacketPumpException}");
- throw packetPump.PacketPumpException ?? new InternalErrorException("Packet pump unexpectedly shut down");
- }
-
- _buildFinished = true;
- }
-
///
/// Dispatches the packet to the correct handler.
///
@@ -606,7 +598,6 @@ private void HandleServerNodeBuildResult(ServerNodeBuildResult response)
CommunicationsUtilities.Trace($"Build response received: exit code '{response.ExitCode}', exit type '{response.ExitType}'");
_exitResult.MSBuildClientExitType = MSBuildClientExitType.Success;
_exitResult.MSBuildAppExitTypeString = response.ExitType;
- _buildFinished = true;
}
///
@@ -615,13 +606,10 @@ private void HandleServerNodeBuildResult(ServerNodeBuildResult response)
/// Whether the client connected to MSBuild server successfully.
private bool TryConnectToServer(int timeoutMilliseconds)
{
- bool tryAgain = true;
Stopwatch sw = Stopwatch.StartNew();
- while (tryAgain && sw.ElapsedMilliseconds < timeoutMilliseconds)
+ while (sw.ElapsedMilliseconds < timeoutMilliseconds)
{
- tryAgain = false;
-
HandshakeResult result;
bool connected;
try
@@ -662,7 +650,6 @@ private bool TryConnectToServer(int timeoutMilliseconds)
CommunicationsUtilities.Trace($"Retrying to connect to server after {sw.ElapsedMilliseconds} ms");
// This solves race condition for time in which server started but have not yet listen on pipe or
// when it just finished build request and is recycling pipe.
- tryAgain = true;
CreateNodePipeStream();
}
else
@@ -728,7 +715,7 @@ private void LogConnectFailureDiagnostics(int timeoutMilliseconds, bool isTimeou
"If the server child process exited immediately, ensure DOTNET_ROOT is set correctly so the apphost can locate the .NET runtime.");
}
- private void WritePacket(Stream nodeStream, INodePacket packet)
+ private async ValueTask WritePacketAsync(Stream nodeStream, INodePacket packet)
{
MemoryStream memoryStream = _packetMemoryStream;
memoryStream.SetLength(0);
@@ -750,7 +737,11 @@ private void WritePacket(Stream nodeStream, INodePacket packet)
memoryStream.Position = 1;
_binaryWriter.Write(packetStreamLength - 5);
- nodeStream.Write(memoryStream.GetBuffer(), 0, packetStreamLength);
+#if NET
+ await nodeStream.WriteAsync(memoryStream.GetBuffer().AsMemory(0, packetStreamLength)).ConfigureAwait(false);
+#else
+ await nodeStream.WriteAsync(memoryStream.GetBuffer(), 0, packetStreamLength).ConfigureAwait(false);
+#endif
}
}
}
diff --git a/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs b/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs
index de04a127a7a..c775273a568 100644
--- a/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs
+++ b/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs
@@ -3,50 +3,38 @@
using System;
using System.Buffers.Binary;
-using System.Collections.Concurrent;
using System.IO;
using System.Threading;
-
-#if NET
+using System.Threading.Channels;
using System.Threading.Tasks;
-#endif
-
using Microsoft.Build.Internal;
using Microsoft.Build.Shared;
namespace Microsoft.Build.BackEnd.Client
{
- internal sealed class MSBuildClientPacketPump : INodePacketHandler, INodePacketFactory, IDisposable
+ internal sealed class MSBuildClientPacketPump : IAsyncDisposable
{
- ///
- /// The queue of packets we have received but which have not yet been processed.
- ///
- public ConcurrentQueue ReceivedPacketsQueue { get; }
+ private static readonly UnboundedChannelOptions s_channelOptions = new()
+ {
+ AllowSynchronousContinuations = true,
+ SingleReader = true,
+ SingleWriter = true,
+ };
///
- /// Set when packet pump receive packets and put them to .
+ /// The queue of packets we have received but which have not yet been processed.
+ /// Only one reader is allowed to read from this channel at a time.
///
- public AutoResetEvent PacketReceivedEvent { get; }
+ public ChannelReader ReceivedPackets => _receivedPacketsChannel.Reader;
- ///
- /// Set when the packet pump terminates.
- ///
- public ManualResetEvent PacketPumpCompleted { get; }
+ private readonly CancellationTokenSource _shutdownTokenSource;
- ///
- /// Exception appeared when the packet pump unexpectedly terminates (due to connection problems or because of deserialization issues).
- ///
- public Exception? PacketPumpException { get; set; }
+ private readonly Channel _receivedPacketsChannel;
///
- /// Set when packet pump should shutdown.
+ /// The factory method for creating packets based on packet type.
///
- private readonly ManualResetEvent _packetPumpShutdownEvent;
-
- ///
- /// The packet factory.
- ///
- private readonly NodePacketFactory _packetFactory;
+ private readonly Func _packetFactoryMethod;
///
/// The memory stream for a read buffer.
@@ -54,9 +42,9 @@ internal sealed class MSBuildClientPacketPump : INodePacketHandler, INodePacketF
private readonly MemoryStream _readBufferMemoryStream;
///
- /// The thread which runs the asynchronous packet pump
+ /// The task which runs the asynchronous packet pump.
///
- private Thread? _packetPumpThread;
+ private Task? _packetPumpTask;
///
/// The stream from where to read packets.
@@ -73,272 +61,137 @@ internal sealed class MSBuildClientPacketPump : INodePacketHandler, INodePacketF
/// In such case we have sent last packet to server side and we expect
/// it will soon broke pipe connection - unless client do it first.
///
- private bool _isServerDisconnecting;
+ private volatile bool _isServerDisconnecting;
- public MSBuildClientPacketPump(Stream stream)
+ public MSBuildClientPacketPump(Stream stream, Func packetFactoryMethod)
{
ErrorUtilities.VerifyThrowArgumentNull(stream);
+ ErrorUtilities.VerifyThrowArgumentNull(packetFactoryMethod);
_stream = stream;
- _isServerDisconnecting = false;
- _packetFactory = new NodePacketFactory();
-
- ReceivedPacketsQueue = new ConcurrentQueue();
- PacketReceivedEvent = new AutoResetEvent(false);
- PacketPumpCompleted = new ManualResetEvent(false);
- _packetPumpShutdownEvent = new ManualResetEvent(false);
+ _shutdownTokenSource = new CancellationTokenSource();
+ _receivedPacketsChannel = Channel.CreateUnbounded(s_channelOptions);
+ _packetFactoryMethod = packetFactoryMethod;
_readBufferMemoryStream = new MemoryStream();
_binaryReadTranslator = BinaryTranslator.GetReadTranslator(_readBufferMemoryStream, InterningBinaryReader.CreateSharedBuffer());
}
- #region INodePacketFactory Members
-
- ///
- /// Registers a packet handler.
- ///
- /// The packet type for which the handler should be registered.
- /// The factory used to create packets.
- /// The handler for the packets.
- public void RegisterPacketHandler(NodePacketType packetType, NodePacketFactoryMethod factory, INodePacketHandler handler)
- {
- _packetFactory.RegisterPacketHandler(packetType, factory, handler);
- }
-
- ///
- /// Unregisters a packet handler.
- ///
- /// The type of packet for which the handler should be unregistered.
- public void UnregisterPacketHandler(NodePacketType packetType)
- {
- _packetFactory.UnregisterPacketHandler(packetType);
- }
-
- ///
- /// Deserializes and routes a packer to the appropriate handler.
- ///
- /// The node from which the packet was received.
- /// The packet type.
- /// The translator to use as a source for packet data.
- public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator)
- {
- _packetFactory.DeserializeAndRoutePacket(nodeId, packetType, translator);
- }
-
- ///
- /// Deserializes a packet.
- ///
- /// The packet type.
- /// The translator to use as a source for packet data.
- public INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator)
- {
- return _packetFactory.DeserializePacket(packetType, translator);
- }
-
- ///
- /// Routes a packet to the appropriate handler.
- ///
- /// The node id from which the packet was received.
- /// The packet to route.
- public void RoutePacket(int nodeId, INodePacket packet)
- {
- _packetFactory.RoutePacket(nodeId, packet);
- }
-
- #endregion
-
- #region INodePacketHandler Members
-
- ///
- /// Called when a packet has been received.
- ///
- /// The node from which the packet was received.
- /// The packet.
- public void PacketReceived(int node, INodePacket packet)
- {
- ReceivedPacketsQueue.Enqueue(packet);
- PacketReceivedEvent.Set();
- }
-
- #endregion
-
#region Packet Pump
- ///
- /// Initializes the packet pump thread.
- ///
- public void Start()
- {
- _packetPumpThread = new Thread(PacketPumpProc)
- {
- IsBackground = true,
- Name = "MSBuild Client Packet Pump"
- };
- _packetPumpThread.Start();
- }
///
- /// Stops the packet pump thread.
+ /// Initializes the packet pump task.
///
- public void Stop()
- {
- _packetPumpShutdownEvent.Set();
- _packetPumpThread?.Join();
- }
+ public void Start() => _packetPumpTask = Task.Run(RunReadLoopAsync);
- ///
- /// This method handles the packet pump reading. It will terminate when the terminate event is
- /// set.
- ///
- ///
- /// Instead of throwing an exception, puts it in and raises event .
- ///
- private void PacketPumpProc()
- {
- RunReadLoop(_stream, _packetPumpShutdownEvent);
- }
-
- private void RunReadLoop(Stream localStream, ManualResetEvent localPacketPumpShutdownEvent)
+ private async Task RunReadLoopAsync()
{
CommunicationsUtilities.Trace("Entering read loop.");
+ CancellationToken shutdownToken = _shutdownTokenSource.Token;
+ ChannelWriter packetWriter = _receivedPacketsChannel.Writer;
try
{
byte[] headerByte = new byte[5];
-#if FEATURE_APM
- IAsyncResult result = localStream.BeginRead(headerByte, 0, headerByte.Length, null, null);
-#else
- Task readTask = localStream.ReadAsync(headerByte.AsMemory(), CancellationToken.None).AsTask();
-#endif
-
- // Ordering of the wait handles is important. The first signalled wait handle in the array
- // will be returned by WaitAny if multiple wait handles are signalled. We prefer to have the
- // terminate event triggered so that we cannot get into a situation where packets are being
- // spammed to the client and it never gets an opportunity to shutdown.
- WaitHandle[] handles =
- [
- localPacketPumpShutdownEvent,
-#if FEATURE_APM
- result.AsyncWaitHandle
+ while (true)
+ {
+ // Client received a packet header. Read the rest of it.
+#if NET
+ int headerBytesRead = await _stream.ReadAsync(headerByte, shutdownToken).ConfigureAwait(false);
#else
- ((IAsyncResult)readTask).AsyncWaitHandle
+ int headerBytesRead = await _stream.ReadAsync(headerByte, 0, headerByte.Length, shutdownToken).ConfigureAwait(false);
#endif
- ];
- bool continueReading = true;
- do
- {
- int waitId = WaitHandle.WaitAny(handles);
- switch (waitId)
+ if ((headerBytesRead != headerByte.Length) && !shutdownToken.IsCancellationRequested)
{
- case 0:
- // Fulfill the request for shutdown of the message pump.
- CommunicationsUtilities.Trace("Shutdown message pump thread.");
- continueReading = false;
- break;
-
- case 1:
+ // Incomplete read. Abort.
+ if (headerBytesRead == 0)
+ {
+ if (_isServerDisconnecting)
{
- // Client recieved a packet header. Read the rest of it.
- int headerBytesRead = 0;
-#if FEATURE_APM
- headerBytesRead = localStream.EndRead(result);
-#else
- headerBytesRead = readTask.ConfigureAwait(false).GetAwaiter().GetResult();
-#endif
-
- if ((headerBytesRead != headerByte.Length) && !localPacketPumpShutdownEvent.WaitOne(0))
- {
- // Incomplete read. Abort.
- if (headerBytesRead == 0)
- {
- if (_isServerDisconnecting)
- {
- continueReading = false;
- break;
- }
+ break;
+ }
- ErrorUtilities.ThrowInternalError("Server disconnected abruptly");
- }
- else
- {
- ErrorUtilities.ThrowInternalError($"Incomplete header read. {headerBytesRead} of {headerByte.Length} bytes read");
- }
- }
+ ErrorUtilities.ThrowInternalError("Server disconnected abruptly");
+ }
+ else
+ {
+ ErrorUtilities.ThrowInternalError($"Incomplete header read. {headerBytesRead} of {headerByte.Length} bytes read");
+ }
+ }
- NodePacketType packetType = (NodePacketType)Enum.ToObject(typeof(NodePacketType), headerByte[0]);
+ NodePacketType packetType = (NodePacketType)headerByte[0];
- int packetLength = BinaryPrimitives.ReadInt32LittleEndian(new Span(headerByte, 1, 4));
- int packetBytesRead = 0;
+ int packetLength = BinaryPrimitives.ReadInt32LittleEndian(headerByte.AsSpan(1));
+ int packetBytesRead = 0;
- _readBufferMemoryStream.Position = 0;
- _readBufferMemoryStream.SetLength(packetLength);
- byte[] packetData = _readBufferMemoryStream.GetBuffer();
+ _readBufferMemoryStream.Position = 0;
+ _readBufferMemoryStream.SetLength(packetLength);
+ byte[] packetData = _readBufferMemoryStream.GetBuffer();
- while (packetBytesRead < packetLength)
- {
-#if FEATURE_APM
- int bytesRead = localStream.Read(packetData, packetBytesRead, packetLength - packetBytesRead);
+ while (packetBytesRead < packetLength)
+ {
+#if NET
+ int bytesRead = await _stream.ReadAsync(packetData.AsMemory(packetBytesRead, packetLength - packetBytesRead), shutdownToken).ConfigureAwait(false);
#else
- ValueTask bytesReadTask = localStream.ReadAsync(packetData.AsMemory(packetBytesRead, packetLength - packetBytesRead));
- int bytesRead = bytesReadTask.IsCompleted ? bytesReadTask.Result : bytesReadTask.AsTask().ConfigureAwait(false).GetAwaiter().GetResult();
+ int bytesRead = await _stream.ReadAsync(packetData, packetBytesRead, packetLength - packetBytesRead, shutdownToken).ConfigureAwait(false);
#endif
- if (bytesRead == 0)
- {
- // Incomplete read. Abort.
- ErrorUtilities.ThrowInternalError($"Incomplete packet read. {packetBytesRead} of {packetLength} bytes read");
- }
-
- packetBytesRead += bytesRead;
- }
+ if (bytesRead == 0)
+ {
+ // Incomplete read. Abort.
+ ErrorUtilities.ThrowInternalError($"Incomplete packet read. {packetBytesRead} of {packetLength} bytes read");
+ }
- try
- {
- _packetFactory.DeserializeAndRoutePacket(0, packetType, _binaryReadTranslator);
- }
- catch
- {
- // Error while deserializing or handling packet. Logging additional info.
- CommunicationsUtilities.Trace($"Packet factory failed to receive package. Exception while deserializing packet {packetType}.");
- throw;
- }
+ packetBytesRead += bytesRead;
+ }
- if (packetType == NodePacketType.ServerNodeBuildResult)
- {
- continueReading = false;
- }
- else
- {
- // Start reading the next package header.
-#if FEATURE_APM
- result = localStream.BeginRead(headerByte, 0, headerByte.Length, null, null);
- handles[1] = result.AsyncWaitHandle;
-#else
- readTask = localStream.ReadAsync(headerByte.AsMemory(), CancellationToken.None).AsTask();
- handles[1] = ((IAsyncResult)readTask).AsyncWaitHandle;
-#endif
- }
- }
- break;
+ try
+ {
+ INodePacket packet = _packetFactoryMethod(packetType, _binaryReadTranslator);
+ await packetWriter.WriteAsync(packet, shutdownToken).ConfigureAwait(false);
+ }
+ catch
+ {
+ // Error while deserializing or handling packet. Logging additional info.
+ CommunicationsUtilities.Trace($"Packet factory failed to receive package. Exception while deserializing packet {packetType}.");
+ throw;
+ }
- default:
- ErrorUtilities.ThrowInternalError($"WaitId {waitId} out of range.");
- break;
+ if (packetType == NodePacketType.ServerNodeBuildResult)
+ {
+ break;
}
}
- while (continueReading);
+ }
+ catch (OperationCanceledException ex) when (ex.CancellationToken == shutdownToken)
+ {
+ CommunicationsUtilities.Trace("Packet pump shutdown requested.");
}
catch (Exception ex)
{
CommunicationsUtilities.Trace($"Exception occurred in the packet pump: {ex}");
- PacketPumpException = ex;
+ packetWriter.Complete(ex);
}
CommunicationsUtilities.Trace("Ending read loop.");
- PacketPumpCompleted.Set();
+ packetWriter.TryComplete();
}
#endregion
- public void Dispose() => Stop();
+ ///
+ /// Stops the packet pump loop, and waits for it to finish.
+ ///
+ public async ValueTask DisposeAsync()
+ {
+ _shutdownTokenSource.Cancel();
+ if (_packetPumpTask is not null)
+ {
+ await _packetPumpTask.ConfigureAwait(false);
+ }
+
+ // Complete the channel in case Start has not been called yet.
+ _ = _receivedPacketsChannel.Writer.TryComplete();
+ }
///
/// Signalize that from now on we expect server will break connected named pipe.
diff --git a/src/Build/Microsoft.Build.csproj b/src/Build/Microsoft.Build.csproj
index 7f3066e974b..87e353c7535 100644
--- a/src/Build/Microsoft.Build.csproj
+++ b/src/Build/Microsoft.Build.csproj
@@ -74,6 +74,7 @@
+
diff --git a/src/MSBuild/MSBuildClientApp.cs b/src/MSBuild/MSBuildClientApp.cs
index 1282d4fb9e3..8c6b8ef9f3f 100644
--- a/src/MSBuild/MSBuildClientApp.cs
+++ b/src/MSBuild/MSBuildClientApp.cs
@@ -4,6 +4,7 @@
using System;
using System.Globalization;
using System.Threading;
+using System.Threading.Tasks;
using Microsoft.Build.Experimental;
using Microsoft.Build.Framework.Telemetry;
using Microsoft.Build.Shared;
@@ -39,10 +40,10 @@ public static MSBuildApp.ExitType Execute(string[] commandLineArgs, Cancellation
{
string msbuildLocation = BuildEnvironmentHelper.Instance.CurrentMSBuildExePath;
- return Execute(
+ return ExecuteAsync(
commandLineArgs,
msbuildLocation,
- cancellationToken);
+ cancellationToken).GetAwaiter().GetResult();
}
///
@@ -56,10 +57,10 @@ public static MSBuildApp.ExitType Execute(string[] commandLineArgs, Cancellation
/// Cancellation token.
/// A value of type that indicates whether the build succeeded,
/// or the manner in which it failed.
- public static MSBuildApp.ExitType Execute(string[] commandLineArgs, string msbuildLocation, CancellationToken cancellationToken)
+ public static async Task ExecuteAsync(string[] commandLineArgs, string msbuildLocation, CancellationToken cancellationToken)
{
MSBuildClient msbuildClient = new MSBuildClient(commandLineArgs, msbuildLocation);
- MSBuildClientExitResult exitResult = msbuildClient.Execute(cancellationToken);
+ MSBuildClientExitResult exitResult = await msbuildClient.ExecuteAsync(cancellationToken);
if (exitResult.MSBuildClientExitType == MSBuildClientExitType.ServerBusy ||
exitResult.MSBuildClientExitType == MSBuildClientExitType.UnableToConnect ||