Skip to content

Commit f50b1da

Browse files
Add atomic task claiming and stale task release functionality
- Introduced atomic claiming mechanism in TaskSchedulerManager to prevent duplicate task executions. - Added CLAIM_TIMEOUT_MINUTES constant to define the timeout for stale claimed tasks. - Implemented releaseStaleClaimedTasks method to release tasks that have been claimed for too long. - Refactored scheduleTasks method to utilize the new atomic claim logic. - Updated ScheduledTask model to include claimed_by and claimed_at fields. - Created migration to add claimed fields to the scheduled_tasks table. - Added tests to ensure atomic claim functionality and stale claim release behavior.
1 parent 864f6f7 commit f50b1da

File tree

4 files changed

+638
-68
lines changed

4 files changed

+638
-68
lines changed

ProcessMaker/Managers/TaskSchedulerManager.php

Lines changed: 178 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
use Illuminate\Support\Facades\DB;
1313
use Illuminate\Support\Facades\Log;
1414
use Illuminate\Support\Facades\Schema;
15+
use Illuminate\Support\Str;
1516
use PDOException;
1617
use ProcessMaker\Facades\WorkflowManager;
1718
use ProcessMaker\Jobs\StartEventConditional;
@@ -133,89 +134,198 @@ private function scheduleTask(
133134
}
134135

135136
/**
136-
* Checks the schedule_tasks table to execute jobs
137+
* Timeout in minutes for stale claimed tasks.
138+
* If a task has been claimed for longer than this, it will be released.
139+
*/
140+
const CLAIM_TIMEOUT_MINUTES = 5;
141+
142+
/**
143+
* Checks the schedule_tasks table to execute jobs.
144+
* Uses atomic claim per task to prevent duplicate executions while maintaining
145+
* the original selection logic (nextDate calculation).
137146
*/
138147
public function scheduleTasks()
139148
{
140149
$today = $this->today();
150+
$todayFormatted = $today->format('Y-m-d H:i:s');
151+
141152
try {
142-
/**
143-
* This validation is removed; the database schema should exist before
144-
* any initiation of 'jobs' and 'schedule'.
145-
*
146-
* if (!Schema::hasTable('scheduled_tasks')) {
147-
* return;
148-
* }
149-
*/
150153
$this->removeExpiredLocks();
151154

152-
$tasks = ScheduledTask::cursor();
155+
// 1. Release stale claims (tasks that were claimed but never completed)
156+
$this->releaseStaleClaimedTasks();
157+
158+
// 2. Get candidate tasks using cursor() for memory efficiency
159+
// We filter by unclaimed tasks only, but evaluate nextDate for each
160+
$tasks = ScheduledTask::whereNull('claimed_by')->cursor();
153161

154162
foreach ($tasks as $task) {
155-
try {
156-
$config = json_decode($task->configuration);
157-
158-
$lastExecution = new DateTime($task->last_execution, new DateTimeZone('UTC'));
159-
160-
if ($lastExecution === null) {
161-
continue;
162-
}
163-
$owner = $task->processRequestToken ?: $task->processRequest ?: $task->process;
164-
$ownerDateTime = $owner?->created_at;
165-
$nextDate = $this->nextDate($today, $config, $lastExecution, $ownerDateTime);
166-
167-
// if no execution date exists we go to the next task
168-
if (empty($nextDate)) {
169-
continue;
170-
}
171-
172-
// Since the task scheduler has a presition of 1 minute (crontab)
173-
// the times must be rounded or trucated to the nearest HH:MM:00 before compare
174-
$method = config('app.timer_events_seconds') . 'DateTime';
175-
$todayWithoutSeconds = $this->$method($today);
176-
$nextDateWithoutSeconds = $this->$method($nextDate);
177-
if ($nextDateWithoutSeconds <= $todayWithoutSeconds) {
178-
switch ($task->type) {
179-
case 'TIMER_START_EVENT':
180-
$this->executeTimerStartEvent($task, $config);
181-
$task->last_execution = $today->format('Y-m-d H:i:s');
182-
$task->save();
183-
break;
184-
case 'INTERMEDIATE_TIMER_EVENT':
185-
$executed = $this->executeIntermediateTimerEvent($task, $config);
186-
$task->last_execution = $today->format('Y-m-d H:i:s');
187-
if ($executed) {
188-
$task->save();
189-
}
190-
break;
191-
case 'BOUNDARY_TIMER_EVENT':
192-
$executed = $this->executeBoundaryTimerEvent($task, $config);
193-
$task->last_execution = $today->format('Y-m-d H:i:s');
194-
if ($executed) {
195-
$task->save();
196-
}
197-
break;
198-
case 'SCHEDULED_JOB':
199-
$this->executeScheduledJob($config);
200-
$task->last_execution = $today->format('Y-m-d H:i:s');
201-
$task->save();
202-
break;
203-
default:
204-
throw new Exception('Unknown timer event: ' . $task->type);
205-
}
206-
}
207-
} catch (\Throwable $ex) {
208-
Log::Error('Failed Scheduled Task: ', [
209-
'Task data' => print_r($task->getAttributes(), true),
210-
'Exception' => $ex->__toString(),
211-
]);
212-
}
163+
$this->processTaskWithAtomicClaim($task, $today, $todayFormatted);
213164
}
214165
} catch (PDOException $e) {
215166
Log::error('The connection to the database had problems (scheduleTasks): ' . $e->getMessage());
216167
}
217168
}
218169

170+
/**
171+
* Release tasks that have been claimed for too long (stale claims).
172+
* This handles cases where a process crashed after claiming tasks.
173+
*/
174+
private function releaseStaleClaimedTasks(): void
175+
{
176+
$staleThreshold = Carbon::now()->subMinutes(self::CLAIM_TIMEOUT_MINUTES);
177+
178+
ScheduledTask::whereNotNull('claimed_by')
179+
->where('claimed_at', '<', $staleThreshold)
180+
->update([
181+
'claimed_by' => null,
182+
'claimed_at' => null,
183+
]);
184+
}
185+
186+
/**
187+
* Process a task with atomic claim to prevent duplicate execution.
188+
* This maintains the original selection logic (nextDate calculation) while
189+
* adding protection against concurrent execution.
190+
*
191+
* @param ScheduledTask $task The task to evaluate and potentially execute
192+
* @param DateTime $today Current datetime
193+
* @param string $todayFormatted Formatted datetime string
194+
*/
195+
private function processTaskWithAtomicClaim(ScheduledTask $task, DateTime $today, string $todayFormatted): void
196+
{
197+
try {
198+
$config = json_decode($task->configuration);
199+
$lastExecution = new DateTime($task->last_execution, new DateTimeZone('UTC'));
200+
201+
if ($lastExecution === null) {
202+
return;
203+
}
204+
205+
$owner = $task->processRequestToken ?: $task->processRequest ?: $task->process;
206+
$ownerDateTime = $owner?->created_at;
207+
$nextDate = $this->nextDate($today, $config, $lastExecution, $ownerDateTime);
208+
209+
// If no execution date exists, skip this task
210+
if (empty($nextDate)) {
211+
return;
212+
}
213+
214+
// Since the task scheduler has a precision of 1 minute (crontab)
215+
// the times must be rounded or truncated to the nearest HH:MM:00 before compare
216+
$method = config('app.timer_events_seconds') . 'DateTime';
217+
$todayWithoutSeconds = $this->$method($today);
218+
$nextDateWithoutSeconds = $this->$method($nextDate);
219+
220+
// Only proceed if the task should execute now
221+
if ($nextDateWithoutSeconds > $todayWithoutSeconds) {
222+
return;
223+
}
224+
225+
// Try to atomically claim this specific task
226+
$claimed = $this->claimTask($task->id, $todayFormatted);
227+
228+
if (!$claimed) {
229+
// Another process already claimed this task, skip it
230+
return;
231+
}
232+
233+
// Re-fetch the task to get fresh data after claiming
234+
$task = ScheduledTask::find($task->id);
235+
if (!$task) {
236+
return;
237+
}
238+
239+
// Execute the task
240+
$this->executeTask($task, $config, $todayFormatted);
241+
242+
} catch (\Throwable $ex) {
243+
Log::error('Failed Scheduled Task: ', [
244+
'Task data' => print_r($task->getAttributes(), true),
245+
'Exception' => $ex->__toString(),
246+
]);
247+
// Release task on error so it can be retried
248+
$this->releaseTask($task);
249+
}
250+
}
251+
252+
/**
253+
* Atomically claim a single task for execution.
254+
* Uses UPDATE with WHERE to ensure only one process can claim it.
255+
*
256+
* @param int $taskId The task ID to claim
257+
* @param string $todayFormatted Current datetime formatted
258+
* @return bool True if successfully claimed, false if already claimed by another process
259+
*/
260+
private function claimTask(int $taskId, string $todayFormatted): bool
261+
{
262+
$claimId = Str::uuid()->toString();
263+
264+
$affected = DB::table('scheduled_tasks')
265+
->where('id', $taskId)
266+
->whereNull('claimed_by')
267+
->update([
268+
'claimed_by' => $claimId,
269+
'claimed_at' => $todayFormatted,
270+
]);
271+
272+
return $affected > 0;
273+
}
274+
275+
/**
276+
* Execute a task based on its type.
277+
*
278+
* @param ScheduledTask $task The task to execute
279+
* @param object $config Task configuration
280+
* @param string $todayFormatted Formatted datetime for last_execution
281+
*/
282+
private function executeTask(ScheduledTask $task, object $config, string $todayFormatted): void
283+
{
284+
$executed = false;
285+
286+
switch ($task->type) {
287+
case 'TIMER_START_EVENT':
288+
$this->executeTimerStartEvent($task, $config);
289+
$executed = true;
290+
break;
291+
case 'INTERMEDIATE_TIMER_EVENT':
292+
$executed = $this->executeIntermediateTimerEvent($task, $config);
293+
break;
294+
case 'BOUNDARY_TIMER_EVENT':
295+
$executed = $this->executeBoundaryTimerEvent($task, $config);
296+
break;
297+
case 'SCHEDULED_JOB':
298+
$this->executeScheduledJob($config);
299+
$executed = true;
300+
break;
301+
default:
302+
throw new Exception('Unknown timer event: ' . $task->type);
303+
}
304+
305+
if ($executed) {
306+
// Update last_execution and release claim
307+
$task->last_execution = $todayFormatted;
308+
$task->claimed_by = null;
309+
$task->claimed_at = null;
310+
$task->save();
311+
} else {
312+
// Release claim without updating last_execution
313+
$this->releaseTask($task);
314+
}
315+
}
316+
317+
/**
318+
* Release a task claim without updating last_execution.
319+
*
320+
* @param ScheduledTask $task The task to release
321+
*/
322+
private function releaseTask(ScheduledTask $task): void
323+
{
324+
$task->claimed_by = null;
325+
$task->claimed_at = null;
326+
$task->save();
327+
}
328+
219329
/**
220330
* Create a scheduled job
221331
*

ProcessMaker/Models/ScheduledTask.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ class ScheduledTask extends ProcessMakerModel
1515

1616
protected $fillable = [
1717
'process_id', 'process_request_id', 'process_request_token_id', 'configuration',
18+
'type', 'last_execution', 'claimed_by', 'claimed_at',
19+
];
20+
21+
protected $casts = [
22+
'claimed_at' => 'datetime',
1823
];
1924

2025
public static function rules()
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
<?php
2+
3+
use Illuminate\Database\Migrations\Migration;
4+
use Illuminate\Database\Schema\Blueprint;
5+
use Illuminate\Support\Facades\Schema;
6+
7+
return new class extends Migration
8+
{
9+
/**
10+
* Run the migrations.
11+
*
12+
* @return void
13+
*/
14+
public function up()
15+
{
16+
Schema::table('scheduled_tasks', function (Blueprint $table) {
17+
$table->string('claimed_by', 36)->nullable()->after('configuration');
18+
$table->dateTime('claimed_at')->nullable()->after('claimed_by');
19+
20+
// Index for faster queries when claiming tasks
21+
$table->index(['claimed_by', 'claimed_at']);
22+
});
23+
}
24+
25+
/**
26+
* Reverse the migrations.
27+
*
28+
* @return void
29+
*/
30+
public function down()
31+
{
32+
Schema::table('scheduled_tasks', function (Blueprint $table) {
33+
$table->dropIndex(['claimed_by', 'claimed_at']);
34+
$table->dropColumn(['claimed_by', 'claimed_at']);
35+
});
36+
}
37+
};

0 commit comments

Comments
 (0)