@@ -39,6 +39,7 @@ import {
3939 SCHEDULE_INFRA_RETRY_BASE_MS ,
4040 SCHEDULE_INFRA_RETRY_MAX_ATTEMPTS ,
4141 SCHEDULE_INFRA_RETRY_MAX_MS ,
42+ SCHEDULE_USAGE_LIMIT_BACKOFF_MS ,
4243} from '@/lib/workflows/schedules/execution-limits'
4344import {
4445 type BlockState ,
@@ -76,6 +77,29 @@ function resetScheduleInfraRetryCount(): Pick<WorkflowScheduleUpdate, 'infraRetr
7677 return { infraRetryCount : 0 }
7778}
7879
80+ /**
81+ * Builds the schedule update shared by every path that treats a run as a failure:
82+ * clears the claim, advances to `nextRunAt`, increments the consecutive-failure
83+ * counter, stamps `lastFailedAt`, and auto-disables once `MAX_CONSECUTIVE_FAILURES`
84+ * is reached. Centralizing this keeps all failure branches (preprocessing,
85+ * execution, exhausted infra retries, usage limit) from diverging — only the
86+ * `nextRunAt` cadence differs per caller.
87+ */
88+ export function buildScheduleFailureUpdate (
89+ now : Date ,
90+ nextRunAt : Date | null
91+ ) : WorkflowScheduleUpdate {
92+ return {
93+ updatedAt : now ,
94+ lastQueuedAt : null ,
95+ nextRunAt,
96+ failedCount : incrementScheduleFailedCount ( ) ,
97+ lastFailedAt : now ,
98+ status : scheduleStatusAfterFailedCountIncrement ( ) ,
99+ ...resetScheduleInfraRetryCount ( ) ,
100+ }
101+ }
102+
79103type RunWorkflowResult =
80104 | {
81105 status : 'skip'
@@ -191,15 +215,7 @@ async function retryScheduleAfterInfraFailure({
191215 const nextRunAt = await determineNextRunAfterError ( payload , now , requestId )
192216 await applyScheduleUpdate (
193217 payload . scheduleId ,
194- {
195- updatedAt : now ,
196- nextRunAt,
197- lastQueuedAt : null ,
198- failedCount : incrementScheduleFailedCount ( ) ,
199- lastFailedAt : now ,
200- status : scheduleStatusAfterFailedCountIncrement ( ) ,
201- ...resetScheduleInfraRetryCount ( ) ,
202- } ,
218+ buildScheduleFailureUpdate ( now , nextRunAt ) ,
203219 requestId ,
204220 `Error updating schedule ${ payload . scheduleId } after exhausted infrastructure retries` ,
205221 { expectedLastQueuedAt : claimedAt }
@@ -777,17 +793,26 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
777793 }
778794
779795 case 402 : {
780- logger . warn ( `[${ requestId } ] Usage limit exceeded, scheduling next run` )
796+ /**
797+ * Usage limits are a billing state, not a broken workflow, and only clear
798+ * on billing-period rollover or upgrade. Back off to at most the usage-limit
799+ * cadence (never faster than the schedule's own cadence) so an over-limit
800+ * schedule stops re-running every tick, and count each hit toward the shared
801+ * auto-disable threshold so an abandoned over-limit schedule eventually stops.
802+ * A successful run resets failedCount, so transient overages self-heal.
803+ */
804+ const cronNextRunAt = await calculateNextRunFromDeployment ( payload , requestId )
805+ const backoffRunAt = new Date ( now . getTime ( ) + SCHEDULE_USAGE_LIMIT_BACKOFF_MS )
781806 const nextRunAt =
782- ( await calculateNextRunFromDeployment ( payload , requestId ) ) ??
783- new Date ( now . getTime ( ) + 60 * 60 * 1000 )
807+ cronNextRunAt && cronNextRunAt . getTime ( ) > backoffRunAt . getTime ( )
808+ ? cronNextRunAt
809+ : backoffRunAt
810+ logger . warn ( `[${ requestId } ] Usage limit exceeded, backing off scheduled run` , {
811+ scheduleId : payload . scheduleId ,
812+ nextRunAt : nextRunAt . toISOString ( ) ,
813+ } )
784814 await updateClaimedSchedule (
785- {
786- updatedAt : now ,
787- lastQueuedAt : null ,
788- nextRunAt,
789- ...resetScheduleInfraRetryCount ( ) ,
790- } ,
815+ buildScheduleFailureUpdate ( now , nextRunAt ) ,
791816 `Error updating schedule ${ payload . scheduleId } after usage limit check`
792817 )
793818 return
@@ -809,15 +834,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
809834 const nextRunAt = await determineNextRunAfterError ( payload , now , requestId )
810835
811836 await updateClaimedSchedule (
812- {
813- updatedAt : now ,
814- lastQueuedAt : null ,
815- nextRunAt,
816- failedCount : incrementScheduleFailedCount ( ) ,
817- lastFailedAt : now ,
818- status : scheduleStatusAfterFailedCountIncrement ( ) ,
819- ...resetScheduleInfraRetryCount ( ) ,
820- } ,
837+ buildScheduleFailureUpdate ( now , nextRunAt ) ,
821838 `Error updating schedule ${ payload . scheduleId } after preprocessing failure`
822839 )
823840 return
@@ -914,15 +931,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
914931 const nextRunAt = calculateNextRunTime ( payload , executionResult . blocks )
915932
916933 await updateClaimedSchedule (
917- {
918- updatedAt : now ,
919- lastQueuedAt : null ,
920- nextRunAt,
921- failedCount : incrementScheduleFailedCount ( ) ,
922- lastFailedAt : now ,
923- status : scheduleStatusAfterFailedCountIncrement ( ) ,
924- ...resetScheduleInfraRetryCount ( ) ,
925- } ,
934+ buildScheduleFailureUpdate ( now , nextRunAt ) ,
926935 `Error updating schedule ${ payload . scheduleId } after failure`
927936 )
928937 } catch ( error : unknown ) {
@@ -934,15 +943,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
934943 const nextRunAt = await determineNextRunAfterError ( payload , now , requestId )
935944
936945 await updateClaimedSchedule (
937- {
938- updatedAt : now ,
939- lastQueuedAt : null ,
940- nextRunAt,
941- failedCount : incrementScheduleFailedCount ( ) ,
942- lastFailedAt : now ,
943- status : scheduleStatusAfterFailedCountIncrement ( ) ,
944- ...resetScheduleInfraRetryCount ( ) ,
945- } ,
946+ buildScheduleFailureUpdate ( now , nextRunAt ) ,
946947 `Error updating schedule ${ payload . scheduleId } after execution error`
947948 )
948949 }
0 commit comments