Skip to content

Commit f56a0e4

Browse files
fix(schedules): count usage lim error schedule as failed run (#4853)
* fix(schedules): count usage lim error schedule as failed run * remove backoff logic
1 parent e2c2d9a commit f56a0e4

3 files changed

Lines changed: 51 additions & 62 deletions

File tree

apps/sim/app/api/schedules/execute/route.test.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,15 @@ vi.mock('@/background/schedule-execution', () => ({
5050
executeScheduleJob: mockExecuteScheduleJob,
5151
executeJobInline: mockExecuteJobInline,
5252
releaseScheduleLock: mockReleaseScheduleLock,
53+
buildScheduleFailureUpdate: (now: Date, nextRunAt: Date | null) => ({
54+
updatedAt: now,
55+
lastQueuedAt: null,
56+
nextRunAt,
57+
failedCount: { type: 'sql' },
58+
lastFailedAt: now,
59+
status: { type: 'sql' },
60+
infraRetryCount: 0,
61+
}),
5362
}))
5463

5564
vi.mock('@/lib/core/config/feature-flags', () => mockFeatureFlags)

apps/sim/app/api/schedules/execute/route.ts

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@ import {
2727
SCHEDULE_WORKFLOW_ENQUEUE_LIMIT,
2828
} from '@/lib/workflows/schedules/execution-limits'
2929
import {
30+
buildScheduleFailureUpdate,
3031
executeJobInline,
3132
executeScheduleJob,
3233
releaseScheduleLock,
3334
type ScheduleExecutionPayload,
3435
} from '@/background/schedule-execution'
35-
import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants'
3636

3737
export const dynamic = 'force-dynamic'
3838
export const maxDuration = 3600
@@ -321,15 +321,7 @@ async function markClaimedScheduleFailed(
321321
const now = new Date()
322322
await db
323323
.update(workflowSchedule)
324-
.set({
325-
updatedAt: now,
326-
lastQueuedAt: null,
327-
lastFailedAt: now,
328-
nextRunAt: getScheduleNextRunAt(schedule, now),
329-
failedCount: sql`COALESCE(${workflowSchedule.failedCount}, 0) + 1`,
330-
status: sql`CASE WHEN COALESCE(${workflowSchedule.failedCount}, 0) + 1 >= ${MAX_CONSECUTIVE_FAILURES} THEN 'disabled' ELSE 'active' END`,
331-
infraRetryCount: 0,
332-
})
324+
.set(buildScheduleFailureUpdate(now, getScheduleNextRunAt(schedule, now)))
333325
.where(
334326
and(
335327
eq(workflowSchedule.id, schedule.id),
@@ -482,15 +474,7 @@ async function recoverStaleDatabaseScheduleJobs(now: Date): Promise<void> {
482474

483475
await tx
484476
.update(workflowSchedule)
485-
.set({
486-
updatedAt: now,
487-
lastQueuedAt: null,
488-
lastFailedAt: now,
489-
nextRunAt: getScheduleNextRunAt(payload, now),
490-
failedCount: sql`COALESCE(${workflowSchedule.failedCount}, 0) + 1`,
491-
status: sql`CASE WHEN COALESCE(${workflowSchedule.failedCount}, 0) + 1 >= ${MAX_CONSECUTIVE_FAILURES} THEN 'disabled' ELSE 'active' END`,
492-
infraRetryCount: 0,
493-
})
477+
.set(buildScheduleFailureUpdate(now, getScheduleNextRunAt(payload, now)))
494478
.where(
495479
and(
496480
eq(workflowSchedule.id, payload.scheduleId),

apps/sim/background/schedule-execution.ts

Lines changed: 39 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,29 @@ function resetScheduleInfraRetryCount(): Pick<WorkflowScheduleUpdate, 'infraRetr
7676
return { infraRetryCount: 0 }
7777
}
7878

79+
/**
80+
* Builds the schedule update shared by every path that treats a run as a failure:
81+
* clears the claim, advances to `nextRunAt`, increments the consecutive-failure
82+
* counter, stamps `lastFailedAt`, and auto-disables once `MAX_CONSECUTIVE_FAILURES`
83+
* is reached. Centralizing this keeps all failure branches (preprocessing,
84+
* execution, exhausted infra retries, usage limit) from diverging — only the
85+
* `nextRunAt` cadence differs per caller.
86+
*/
87+
export function buildScheduleFailureUpdate(
88+
now: Date,
89+
nextRunAt: Date | null
90+
): WorkflowScheduleUpdate {
91+
return {
92+
updatedAt: now,
93+
lastQueuedAt: null,
94+
nextRunAt,
95+
failedCount: incrementScheduleFailedCount(),
96+
lastFailedAt: now,
97+
status: scheduleStatusAfterFailedCountIncrement(),
98+
...resetScheduleInfraRetryCount(),
99+
}
100+
}
101+
79102
type RunWorkflowResult =
80103
| {
81104
status: 'skip'
@@ -191,15 +214,7 @@ async function retryScheduleAfterInfraFailure({
191214
const nextRunAt = await determineNextRunAfterError(payload, now, requestId)
192215
await applyScheduleUpdate(
193216
payload.scheduleId,
194-
{
195-
updatedAt: now,
196-
nextRunAt,
197-
lastQueuedAt: null,
198-
failedCount: incrementScheduleFailedCount(),
199-
lastFailedAt: now,
200-
status: scheduleStatusAfterFailedCountIncrement(),
201-
...resetScheduleInfraRetryCount(),
202-
},
217+
buildScheduleFailureUpdate(now, nextRunAt),
203218
requestId,
204219
`Error updating schedule ${payload.scheduleId} after exhausted infrastructure retries`,
205220
{ expectedLastQueuedAt: claimedAt }
@@ -777,17 +792,22 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
777792
}
778793

779794
case 402: {
780-
logger.warn(`[${requestId}] Usage limit exceeded, scheduling next run`)
795+
/**
796+
* Usage limits are a billing state, not a broken workflow, but they only
797+
* clear on billing-period rollover or upgrade. Keep retrying at the normal
798+
* cadence, but count each hit toward the shared auto-disable threshold so an
799+
* abandoned over-limit schedule eventually stops instead of running forever.
800+
* A successful run resets failedCount, so transient overages self-heal.
801+
*/
781802
const nextRunAt =
782803
(await calculateNextRunFromDeployment(payload, requestId)) ??
783804
new Date(now.getTime() + 60 * 60 * 1000)
805+
logger.warn(`[${requestId}] Usage limit exceeded, counting as failed run`, {
806+
scheduleId: payload.scheduleId,
807+
nextRunAt: nextRunAt.toISOString(),
808+
})
784809
await updateClaimedSchedule(
785-
{
786-
updatedAt: now,
787-
lastQueuedAt: null,
788-
nextRunAt,
789-
...resetScheduleInfraRetryCount(),
790-
},
810+
buildScheduleFailureUpdate(now, nextRunAt),
791811
`Error updating schedule ${payload.scheduleId} after usage limit check`
792812
)
793813
return
@@ -809,15 +829,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
809829
const nextRunAt = await determineNextRunAfterError(payload, now, requestId)
810830

811831
await updateClaimedSchedule(
812-
{
813-
updatedAt: now,
814-
lastQueuedAt: null,
815-
nextRunAt,
816-
failedCount: incrementScheduleFailedCount(),
817-
lastFailedAt: now,
818-
status: scheduleStatusAfterFailedCountIncrement(),
819-
...resetScheduleInfraRetryCount(),
820-
},
832+
buildScheduleFailureUpdate(now, nextRunAt),
821833
`Error updating schedule ${payload.scheduleId} after preprocessing failure`
822834
)
823835
return
@@ -914,15 +926,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
914926
const nextRunAt = calculateNextRunTime(payload, executionResult.blocks)
915927

916928
await updateClaimedSchedule(
917-
{
918-
updatedAt: now,
919-
lastQueuedAt: null,
920-
nextRunAt,
921-
failedCount: incrementScheduleFailedCount(),
922-
lastFailedAt: now,
923-
status: scheduleStatusAfterFailedCountIncrement(),
924-
...resetScheduleInfraRetryCount(),
925-
},
929+
buildScheduleFailureUpdate(now, nextRunAt),
926930
`Error updating schedule ${payload.scheduleId} after failure`
927931
)
928932
} catch (error: unknown) {
@@ -934,15 +938,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
934938
const nextRunAt = await determineNextRunAfterError(payload, now, requestId)
935939

936940
await updateClaimedSchedule(
937-
{
938-
updatedAt: now,
939-
lastQueuedAt: null,
940-
nextRunAt,
941-
failedCount: incrementScheduleFailedCount(),
942-
lastFailedAt: now,
943-
status: scheduleStatusAfterFailedCountIncrement(),
944-
...resetScheduleInfraRetryCount(),
945-
},
941+
buildScheduleFailureUpdate(now, nextRunAt),
946942
`Error updating schedule ${payload.scheduleId} after execution error`
947943
)
948944
}

0 commit comments

Comments
 (0)