-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTaskStateManager.cs
More file actions
137 lines (113 loc) · 5.02 KB
/
TaskStateManager.cs
File metadata and controls
137 lines (113 loc) · 5.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
using System.Diagnostics;
using AsyncKeyedLock;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Options;
namespace TaskTurnstile;
internal sealed class TaskStateManager(ITaskStateStore store, TaskTurnstileOptions options) : ITaskStateManager
{
private readonly AsyncKeyedLocker<string> _locker = new();
private readonly IMemoryCache _localCache = new MemoryCache(Options.Create(new MemoryCacheOptions()));
private const int DefaultPollIntervalMs = 250;
public Task<bool> IsRunningAsync(string taskName, CancellationToken cancellationToken = default) =>
store.IsRunningAsync(taskName, cancellationToken);
public async Task<bool> CanStartAsync(string taskName, CancellationToken cancellationToken = default)
{
if (!await store.IsRunningAsync(taskName, cancellationToken))
return true;
return await store.IsExpiredAsync(taskName, cancellationToken);
}
public async Task RunAsync(string taskName, Func<CancellationToken, Task> work, TimeSpan? maxRuntime = null, CancellationToken cancellationToken = default)
{
await WaitAndStartAsync(taskName, maxRuntime ?? options.DefaultMaxRuntime, DefaultPollIntervalMs, null, cancellationToken);
try
{
await work(cancellationToken);
}
finally
{
await TryStopAsync(taskName);
}
}
public async Task<bool> TryRunAsync(string taskName, Func<CancellationToken, Task> work, TimeSpan? maxRuntime = null, CancellationToken cancellationToken = default)
{
if (!await StartAsync(taskName, maxRuntime, cancellationToken))
return false;
try
{
await work(cancellationToken);
return true;
}
finally
{
await TryStopAsync(taskName);
}
}
public async Task<TryRunResult<T>> TryRunAsync<T>(string taskName, Func<CancellationToken, Task<T>> work, TimeSpan? maxRuntime = null, CancellationToken cancellationToken = default)
{
if (!await StartAsync(taskName, maxRuntime, cancellationToken))
return TryRunResult<T>.Skipped;
try
{
var value = await work(cancellationToken);
return TryRunResult<T>.Ran(value);
}
finally
{
await TryStopAsync(taskName);
}
}
public Task WaitAsync(string taskName, CancellationToken cancellationToken = default) =>
WaitAndStartAsync(taskName, options.DefaultMaxRuntime, DefaultPollIntervalMs, null, cancellationToken);
public Task WaitAsync(string taskName, int pollIntervalMs, int? maxWaitMs = null, CancellationToken cancellationToken = default) =>
WaitAndStartAsync(taskName, options.DefaultMaxRuntime, pollIntervalMs, maxWaitMs, cancellationToken);
private async Task WaitAndStartAsync(string taskName, TimeSpan? maxRuntime, int pollIntervalMs, int? maxWaitMs, CancellationToken cancellationToken)
{
var stopwatch = Stopwatch.StartNew();
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
// Fast path: if this instance started the task and it hasn't expired locally,
// skip the backing store query entirely.
if (_localCache.TryGetValue(taskName, out _))
{
if (maxWaitMs.HasValue && stopwatch.ElapsedMilliseconds >= maxWaitMs.Value)
throw new TimeoutException($"Task '{taskName}' did not become available within {maxWaitMs.Value}ms.");
await Task.Delay(pollIntervalMs, cancellationToken);
continue;
}
if (await StartAsync(taskName, maxRuntime, cancellationToken))
return;
if (maxWaitMs.HasValue && stopwatch.ElapsedMilliseconds >= maxWaitMs.Value)
throw new TimeoutException($"Task '{taskName}' did not become available within {maxWaitMs.Value}ms.");
await Task.Delay(pollIntervalMs, cancellationToken);
}
}
public async Task<bool> StartAsync(string taskName, TimeSpan? maxRuntime = null, CancellationToken cancellationToken = default)
{
using (await _locker.LockAsync(taskName, cancellationToken))
{
if (!await CanStartAsync(taskName, cancellationToken))
return false;
await store.SetRunningAsync(taskName, maxRuntime ?? options.DefaultMaxRuntime, cancellationToken);
var effectiveRuntime = maxRuntime ?? options.DefaultMaxRuntime;
if (effectiveRuntime.HasValue)
_localCache.Set(taskName, true, absoluteExpirationRelativeToNow: effectiveRuntime.Value);
else
_localCache.Set(taskName, true);
return true;
}
}
public async Task<bool> TryStopAsync(string taskName)
{
try
{
_localCache.Remove(taskName);
await store.SetStoppedAsync(taskName, CancellationToken.None);
return true;
}
catch
{
return false;
}
}
}