Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public sealed class PreviousAttemptReference
public static class HelixJobMonitorUtilities
{
public static bool AreNonMonitorJobsComplete(IEnumerable<AzureDevOpsTimelineRecord> records, string jobMonitorName)
=> GetRelevantJobRecords(records, jobMonitorName).All(IsTerminal);
=> GetRelevantNonMonitorJobRecords(records, jobMonitorName).All(IsTerminal);

public static bool HasFailedNonMonitorJobs(IEnumerable<AzureDevOpsTimelineRecord> records, string jobMonitorName)
=> HasFailedNonMonitorJobs(records, jobMonitorName, new HashSet<string>(StringComparer.OrdinalIgnoreCase));
Expand All @@ -65,12 +65,17 @@ public static bool HasFailedNonMonitorJobs(
IEnumerable<AzureDevOpsTimelineRecord> records,
string jobMonitorName,
IReadOnlySet<string> ignoredJobNames)
=> GetRelevantJobRecords(records, jobMonitorName)
=> GetRelevantNonMonitorJobRecords(records, jobMonitorName)
.Where(r => ignoredJobNames == null || !ignoredJobNames.Contains(r.ReferenceName))
.Any(r =>
string.Equals(r.Result, "failed", StringComparison.OrdinalIgnoreCase)
|| string.Equals(r.Result, "canceled", StringComparison.OrdinalIgnoreCase));

public static IEnumerable<AzureDevOpsTimelineRecord> GetRelevantNonMonitorJobRecords(
IEnumerable<AzureDevOpsTimelineRecord> records,
string jobMonitorName)
=> GetRelevantJobRecords(records, jobMonitorName);

/// <summary>
/// Returns the subset of <paramref name="records"/> that belongs to the pipeline stage
/// named <paramref name="stageName"/>, including the Stage record itself and any
Expand Down
67 changes: 56 additions & 11 deletions src/Microsoft.DotNet.Helix/JobMonitor/JobMonitorRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ private async Task<int> RunCoreAsync(CancellationToken cancellationToken)
// (typically Status="running", Finished=null), which would cause ReportTimeout and
// CancelInFlightHelixJobsAsync to treat already-completed jobs as still in flight.
Dictionary<string, HelixJobInfo> associatedJobs = new(StringComparer.OrdinalIgnoreCase);
var latestTimelineRecords = new List<AzureDevOpsTimelineRecord>();

try
{
Expand All @@ -127,6 +128,7 @@ private async Task<int> RunCoreAsync(CancellationToken cancellationToken)
return await RunLoopAsync(
processedHelixJobs,
associatedJobs,
latestTimelineRecords,
jobResubmission,
cancellationToken);
}
Expand All @@ -138,7 +140,7 @@ private async Task<int> RunCoreAsync(CancellationToken cancellationToken)
// lost (and tests that observe UploadedJobNames immediately after a cancelled
// run see a partial set).
await WaitForPendingTestResultUploadsAsync(CancellationToken.None);
ReportTimeout(associatedJobs.Values, processedHelixJobs);
ReportTimeout(associatedJobs.Values, processedHelixJobs, latestTimelineRecords);

// On cancellation (typically the overall timeout), proactively cancel any
// Helix jobs we know about that haven't finished yet so they don't keep
Expand Down Expand Up @@ -200,6 +202,7 @@ private async Task CancelInFlightHelixJobsAsync(
private async Task<int> RunLoopAsync(
HashSet<string> processedHelixJobs,
Dictionary<string, HelixJobInfo> associatedJobs,
List<AzureDevOpsTimelineRecord> latestTimelineRecords,
JobResubmissionResult jobResubmission,
CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -229,6 +232,8 @@ private async Task<int> RunLoopAsync(
string.IsNullOrEmpty(j.StageName)
|| string.Equals(j.StageName, _options.StageName, StringComparison.OrdinalIgnoreCase))
];
latestTimelineRecords.Clear();
latestTimelineRecords.AddRange(timelineRecords);

// Overwrite per poll so the cached entry reflects the latest Helix-side state
// (in particular, the Finished timestamp transitioning from null to a value).
Expand Down Expand Up @@ -954,28 +959,68 @@ public int GetHashCode((string ChainKey, string WorkItemName) obj)

private void ReportTimeout(
IEnumerable<HelixJobInfo> latestAssociatedJobs,
HashSet<string> processedHelixJobs)
HashSet<string> processedHelixJobs,
IReadOnlyList<AzureDevOpsTimelineRecord> latestTimelineRecords)
{
var timeout = TimeSpan.FromMinutes(_options.MaximumWaitMinutes);
var unfinishedJobs = latestAssociatedJobs
var unfinishedHelixJobs = GetLatestHelixJobAttempts(latestAssociatedJobs)
.Where(j => !j.IsCompleted || !processedHelixJobs.Contains(j.JobName))
.OrderBy(j => j.JobName, StringComparer.OrdinalIgnoreCase)
.ToList();
var inProgressPipelineJobs = GetInProgressNonMonitorPipelineJobs(latestTimelineRecords, _options.JobMonitorName)
.OrderBy(r => r.Name ?? r.ReferenceName ?? r.Identifier ?? string.Empty, StringComparer.OrdinalIgnoreCase)
.ToList();

if (unfinishedJobs.Count == 0)
if (unfinishedHelixJobs.Count == 0 && inProgressPipelineJobs.Count == 0)
{
_logger.LogCritical("Helix Job Monitor timed out after {TimeoutMinutes} minute(s) ({Timeout}). No unfinished Helix jobs were tracked at the time of timeout.",
_logger.LogCritical("Helix Job Monitor timed out after {TimeoutMinutes} minute(s) ({Timeout}). No unfinished Helix or Azure DevOps jobs were tracked at the time of timeout.",
timeout.TotalMinutes,
timeout);
return;
}

_logger.LogError(
$"Helix Job Monitor timed out after {{TimeoutMinutes}} minute(s) ({{Timeout}}). {{UnfinishedCount}} Helix job(s) had not finished:{Environment.NewLine}- {{UnfinishedJobs}}{Environment.NewLine}",
timeout.TotalMinutes,
timeout,
unfinishedJobs.Count,
string.Join(Environment.NewLine + "- ", unfinishedJobs.Select(j => $"{j.DetailsUri} ({j.QueueId})")));
if (unfinishedHelixJobs.Count > 0)
{
_logger.LogError(
$"Helix Job Monitor timed out after {{TimeoutMinutes}} minute(s) ({{Timeout}}). {{UnfinishedCount}} Helix job(s) were unfinished or unprocessed:{Environment.NewLine}- {{UnfinishedJobs}}{Environment.NewLine}",
timeout.TotalMinutes,
timeout,
unfinishedHelixJobs.Count,
string.Join(Environment.NewLine + "- ", unfinishedHelixJobs.Select(FormatUnfinishedHelixJobForTimeoutLog)));
Comment thread
premun marked this conversation as resolved.
}

if (inProgressPipelineJobs.Count > 0)
{
_logger.LogError(
$"At timeout, {{InProgressCount}} non-monitor Azure DevOps pipeline job(s) were still in progress or queued:{Environment.NewLine}- {{InProgressJobs}}{Environment.NewLine}",
inProgressPipelineJobs.Count,
string.Join(Environment.NewLine + "- ", inProgressPipelineJobs.Select(FormatInProgressPipelineJobForTimeoutLog)));
}
Comment thread
mmitche marked this conversation as resolved.
}

private static IEnumerable<AzureDevOpsTimelineRecord> GetInProgressNonMonitorPipelineJobs(
IReadOnlyList<AzureDevOpsTimelineRecord> timelineRecords,
string jobMonitorName)
=> HelixJobMonitorUtilities.GetRelevantNonMonitorJobRecords(timelineRecords, jobMonitorName)
.Where(r => !string.Equals(r.State, "completed", StringComparison.OrdinalIgnoreCase));

private static string FormatUnfinishedHelixJobForTimeoutLog(HelixJobInfo helixJob)
{
string workItemCountText = helixJob.InitialWorkItemCount?.ToString() ?? "unknown";
return $"{helixJob.DisplayName} [status={helixJob.Status}, initialWorkItems={workItemCountText}]{Environment.NewLine} {helixJob.DetailsUri}";
}

private static string FormatInProgressPipelineJobForTimeoutLog(AzureDevOpsTimelineRecord timelineRecord)
{
string state = string.IsNullOrEmpty(timelineRecord.State) ? "unknown" : timelineRecord.State;
string result = string.IsNullOrEmpty(timelineRecord.Result) ? "none" : timelineRecord.Result;
string name = string.IsNullOrEmpty(timelineRecord.Name) ? timelineRecord.ReferenceName : timelineRecord.Name;
if (string.IsNullOrEmpty(name))
{
name = timelineRecord.Identifier;
}

return $"{name} [state={state}, result={result}]";
}

public void Dispose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,7 @@ public async Task MonitorTimesOut_CancelsLatestInFlightHelixJobs()
{
var azdo = new FakeAzureDevOpsService();
var helix = new FakeHelixService();
var logger = new RecordingLogger();

azdo.AddTimelineResponse(
MonitorJob(),
Expand All @@ -1241,7 +1242,7 @@ public async Task MonitorTimesOut_CancelsLatestInFlightHelixJobs()
});

using var cts = new CancellationTokenSource();
var runner = new JobMonitorRunner(DefaultOptions(), NullLogger.Instance, azdo, helix,
var runner = new JobMonitorRunner(DefaultOptions(), logger, azdo, helix,
async (_, _) =>
{
Task completed = await Task.WhenAny(azdo.UploadCompleted.Task, Task.Delay(TimeSpan.FromSeconds(5)));
Expand All @@ -1256,6 +1257,15 @@ public async Task MonitorTimesOut_CancelsLatestInFlightHelixJobs()
Assert.Equal(
["helix-new-attempt", "helix-running", "helix-waiting"],
helix.CanceledJobs.OrderBy(jobName => jobName, StringComparer.OrdinalIgnoreCase).ToArray());
Assert.Contains(logger.Messages, message =>
message.Contains("Helix job(s) were unfinished or unprocessed", StringComparison.Ordinal)
&& message.Contains("helix-new-attempt", StringComparison.Ordinal)
&& message.Contains("status=running", StringComparison.Ordinal)
&& message.Contains("helix-waiting", StringComparison.Ordinal)
&& message.Contains("status=waiting", StringComparison.Ordinal));
Assert.Contains(logger.Messages, message =>
message.Contains("non-monitor Azure DevOps pipeline job(s) were still in progress or queued", StringComparison.Ordinal)
&& message.Contains("Test Linux [state=inProgress, result=none]", StringComparison.Ordinal));
}

/// <summary>
Expand Down
Loading