feat: scheduled pipelines + dependency cascade fix #80
Add SchedulePipeline MCP tool and CLI support for creating recurring or one-time schedules that trigger multi-step pipelines. Each trigger creates fresh tasks with linear dependencies (step N depends on step N-1).

Bug fixes:

- Dependency failure cascade: failed/cancelled upstream tasks now cascade cancellation to dependents instead of incorrectly unblocking them
- Queue handler race condition: fast-path dependencyState check prevents blocked tasks from being enqueued before dependency rows are written

Features:

- SchedulePipeline MCP tool (2-20 steps, cron/one-time, per-step agent)
- CLI: `--pipeline --step` flags for schedule create
- CancelSchedule: optional cancelTasks flag for in-flight pipeline tasks
- ListSchedules: isPipeline/stepCount indicators
- GetSchedule: pipelineSteps in response
- Database migration 8: pipeline_steps, pipeline_task_ids columns
- 188 new tests (1,467 total), zero regressions
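The "step N depends on step N-1" wiring can be sketched as follows (hypothetical shapes; the real Task and step types live in the Backbeat codebase):

```typescript
// Hypothetical sketch of the linear-dependency chain created on each trigger;
// the real Task types are defined in the Backbeat codebase.
interface PipelineTask {
  id: string;
  prompt: string;
  dependsOn: string[]; // task IDs this task is blocked on
}

// Each trigger creates fresh tasks: step 0 depends on nothing,
// every later step depends only on its predecessor.
function buildPipelineTasks(triggerId: string, steps: string[]): PipelineTask[] {
  const tasks: PipelineTask[] = [];
  for (let i = 0; i < steps.length; i++) {
    tasks.push({
      id: `${triggerId}-step-${i}`,
      prompt: steps[i],
      dependsOn: i === 0 ? [] : [tasks[i - 1].id],
    });
  }
  return tasks;
}

const tasks = buildPipelineTasks("run-42", ["lint", "test", "deploy"]);
```

Only step 0 is immediately runnable; every later step stays blocked until its predecessor completes.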
Confidence Score: 4/5
Sequence Diagram

```mermaid
sequenceDiagram
    participant SE as ScheduleExecutor
    participant SH as ScheduleHandler
    participant TR as TaskRepo
    participant SR as ScheduleRepo
    participant EB as EventBus
    participant DH as DependencyHandler
    participant QH as QueueHandler
    SE->>EB: emit(ScheduleTriggered)
    EB->>SH: handleScheduleTriggered()
    SH->>SH: resolveAfterScheduleTaskId()
    SH->>SH: handlePipelineTrigger(schedule, steps)
    loop For each step i (0..N-1)
        SH->>TR: save(task_i, dependsOn=[task_{i-1}])
        TR-->>SH: ok(task_i)
    end
    SH->>SR: recordExecution(lastTaskId, allTaskIds)
    SH->>SR: updateSchedule(runCount, nextRunAt)
    loop TaskDelegated for each step
        SH->>EB: emit(TaskDelegated, task_i)
        EB->>DH: handleTaskDelegated()
        Note over DH: Register dependency rows for step i
        EB->>QH: handleTaskPersisted()
        alt task.dependencyState === blocked
            QH-->>EB: skip enqueue (fast-path)
        else not blocked (step 0)
            QH->>EB: emit(TaskEnqueued)
        end
    end
    SH->>EB: emit(ScheduleExecuted, lastTaskId)
    Note over DH: When step i completes...
    DH->>DH: getDependencies(step_{i+1})
    alt all deps completed
        DH->>EB: emit(TaskUnblocked, step_{i+1})
        QH->>EB: emit(TaskEnqueued, step_{i+1})
    else any dep failed/cancelled
        DH->>EB: emit(TaskCancellationRequested, step_{i+1})
        Note over DH: Cascade continues recursively
    end
```
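The diagram encodes two decisions: the queue handler's fast path (skip enqueue while blocked) and the dependency handler's cascade-vs-unblock choice. A minimal sketch, with names and shapes that are illustrative rather than the real implementation:

```typescript
// Illustrative sketch of the two decisions in the diagram; not the real
// Backbeat implementation.
type DepStatus = "pending" | "completed" | "failed" | "cancelled";

// QueueHandler fast path: a task persisted as blocked is never enqueued,
// even if its dependency rows have not been written yet.
function shouldEnqueue(dependencyState: "none" | "blocked" | "unblocked"): boolean {
  return dependencyState !== "blocked";
}

// DependencyHandler: when an upstream task reaches a terminal state,
// unblock only if every dependency completed; cascade a cancellation
// if any dependency failed or was cancelled.
function nextAction(depStatuses: DepStatus[]): "unblock" | "cancel" | "wait" {
  if (depStatuses.some((s) => s === "failed" || s === "cancelled")) return "cancel";
  if (depStatuses.every((s) => s === "completed")) return "unblock";
  return "wait";
}
```

The cascade is what the bug fix restores: before it, a failed dependency could fall into the unblock branch.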
Last reviewed commit: 209c3dc
```typescript
this.logger.info('Injected afterSchedule dependency on pipeline step 0', {
  scheduleId,
  afterScheduleId: schedule.afterScheduleId,
  dependsOnTaskId: latestExecution.taskId,
```
HIGH: Duplicated afterScheduleId resolution logic
The handlePipelineTrigger method contains inline afterScheduleId resolution (lines 327-345 approximately) that duplicates the extracted resolveAfterScheduleDependency helper (lines 455-483). The single-task path correctly calls the shared helper, but the pipeline path re-implements the same business logic:
- Fetch execution history
- Get latest execution
- Check if task exists and is in terminal state
- Return undefined or the task ID
Impact: Two places to maintain the same logic. If afterScheduleId resolution rules change (e.g., checking multiple executions, different terminal state semantics), both paths must be updated independently.
Fix: Refactor resolveAfterScheduleDependency to return the resolved TaskId | undefined instead of a modified template, so both handleSingleTaskTrigger and handlePipelineTrigger consume the same primitive:
```typescript
private async resolveAfterScheduleTaskId(afterScheduleId: ScheduleId): Promise<TaskId | undefined> {
  const historyResult = await this.scheduleRepo.getExecutionHistory(afterScheduleId, 1);
  if (!historyResult.ok || historyResult.value.length === 0) return undefined;
  const latestExecution = historyResult.value[0];
  if (!latestExecution.taskId) return undefined;
  const depTaskResult = await this.taskRepo.findById(latestExecution.taskId);
  if (!depTaskResult.ok || !depTaskResult.value || isTerminalState(depTaskResult.value.status)) {
    return undefined;
  }
  return latestExecution.taskId;
}
```

Flagged by: Architecture, Complexity, Consistency reviews
Code Review Summary: PR #80 - Scheduled Pipelines

Status: CHANGES_REQUESTED

This PR introduces scheduled pipelines with dependency failure cascade fixes. The feature is well-structured overall, but there are 8 HIGH severity and 5 MEDIUM severity blocking issues that should be addressed before merge. Below is a deduplicated summary based on comprehensive reviews from 10 reviewers (Architecture, Complexity, Consistency, Database, Documentation, Performance, Regression, Security, Tests, TypeScript).

BLOCKING ISSUES

Critical

1. TASK-DEPENDENCIES.md contradicts cascade behavior
HIGH Issues

2. Duplicated afterScheduleId resolution logic
```typescript
private async resolveAfterScheduleTaskId(afterScheduleId: ScheduleId): Promise<TaskId | undefined> {
  const historyResult = await this.scheduleRepo.getExecutionHistory(afterScheduleId, 1);
  if (!historyResult.ok || historyResult.value.length === 0) return undefined;
  const latestExecution = historyResult.value[0];
  if (!latestExecution.taskId) return undefined;
  const depTaskResult = await this.taskRepo.findById(latestExecution.taskId);
  if (!depTaskResult.ok || !depTaskResult.value || isTerminalState(depTaskResult.value.status)) return undefined;
  return latestExecution.taskId;
}
```

3. createSchedule not using validateScheduleTiming
4. Pipeline task creation loop not wrapped in transaction
```typescript
const txResult = await this.taskRepo.transaction(async (txRepo) => {
  for (let i = 0; i < steps.length; i++) {
    // ... build task ...
    const saveResult = await txRepo.save(task);
    if (!saveResult.ok) return saveResult;
    savedTasks.push(task);
  }
  return ok(undefined);
});
if (!txResult.ok) {
  await this.recordFailedExecution(...);
  return txResult;
}
```

5. MCP adapter tests use simulate helpers instead of real code
6. Missing release notes for v0.6.0
7. No JSDoc on cancelSchedule updated signature
```typescript
/**
 * @param cancelTasks - If true, also cancel in-flight tasks from the latest execution
 */
```

8. No JSDoc on createScheduledPipeline
MEDIUM Issues

9. Pipeline cleanup bypasses event system
10. Validation duplication in CLI
11. Non-null assertion on pipelineSteps
```typescript
// Option A: Accept as parameter
private async handlePipelineTrigger(
  schedule: Schedule,
  triggeredAt: number,
  steps: readonly PipelineStepRequest[]
): Promise<Result<void>> { ... }

// Option B: Local guard
if (!schedule.pipelineSteps || schedule.pipelineSteps.length === 0) {
  return err(new BackbeatError(ErrorCode.INVALID_INPUT, 'Pipeline requires steps'));
}
const steps = schedule.pipelineSteps; // TypeScript narrows correctly
```

12. Missing Zod validation on pipeline_task_ids JSON parse
```typescript
const PipelineTaskIdsSchema = z.array(z.string().min(1));
if (data.pipeline_task_ids) {
  try {
    const parsed = JSON.parse(data.pipeline_task_ids);
    const validated = PipelineTaskIdsSchema.parse(parsed);
    pipelineTaskIds = validated.map((id) => TaskId(id));
  } catch {
    pipelineTaskIds = undefined;
  }
}
```

13. No service-level test for cancelSchedule with cancelTasks=true
RECOMMENDATION

CHANGES_REQUESTED - The blocking issues should be resolved before merge.

Priority 1 (Critical/High - Required):

Priority 2 (Medium - Should Fix):

POSITIVE OBSERVATIONS
Files Requiring Changes
Claude Code - Code Review for Backbeat PR #80
…k in handleSchedulePipeline

The handleSchedulePipeline handler used undefined as the nextRunAt fallback, while all other schedule handlers (handleScheduleTask, handleListSchedules, handleGetSchedule) used null. This caused inconsistent JSON serialization — undefined omits the field entirely, while null includes it explicitly as "nextRunAt": null.

Co-Authored-By: Claude <noreply@anthropic.com>
…hedule

Replace ~80 lines of inline timing validation in createSchedule with a call to the existing validateScheduleTiming helper. The helper's JSDoc already claims it is shared between createSchedule and createScheduledPipeline, but createSchedule was never refactored to use it. Now both methods follow the same pattern.

Co-Authored-By: Claude <noreply@anthropic.com>
Replace bare `as string[]` type assertion with PipelineTaskIdsSchema Zod validation, matching the pattern used by pipeline_steps. This ensures malformed JSON (e.g., non-string elements, empty strings) is caught at the database boundary rather than silently propagated.

Co-Authored-By: Claude <noreply@anthropic.com>
…lease notes

TASK-DEPENDENCIES.md contradicted running code: the v0.6.0 cascade cancellation behavior (failed/cancelled deps auto-cancel dependents) was not reflected in the Event Flow diagram, Error Handling examples, or Best Practices section. Updated all sections to match the actual DependencyHandler implementation.

Created missing RELEASE_NOTES_v0.6.0.md covering scheduled pipelines, dependency cascade fix, queue handler race condition fix, migration 8, SchedulePipeline MCP tool, and CLI pipeline flags.

Co-Authored-By: Claude <noreply@anthropic.com>
…type-safety

- Extract resolveAfterScheduleTaskId() returning TaskId | undefined, replacing the old resolveAfterScheduleDependency() that returned a modified task template. Both single-task and pipeline paths now call the same shared helper, eliminating ~18 lines of duplicated logic.
- Pass pipelineSteps as a typed parameter (NonNullable) to handlePipelineTrigger, removing the non-null assertion (!).
- Add ARCHITECTURE EXCEPTION comment to pipeline cleanup code explaining why direct taskRepo.update() is correct (no events emitted yet).
- Add TODO for async transaction wrapping (better-sqlite3 limitation).

Co-Authored-By: Claude <noreply@anthropic.com>
…rage

Add 3 tests for the cancelTasks branch in ScheduleManagerService.cancelSchedule():

- Pipeline execution with pipelineTaskIds emits TaskCancellationRequested per task
- Single taskId fallback when no pipelineTaskIds present
- cancelTasks=false does not emit TaskCancellationRequested

Co-Authored-By: Claude <noreply@anthropic.com>
```typescript
if (cancelTasks) {
  const historyResult = await this.scheduleRepository.getExecutionHistory(scheduleId, 1);
  if (historyResult.ok && historyResult.value.length > 0) {
    const latestExecution = historyResult.value[0];
    const taskIds = latestExecution.pipelineTaskIds ?? (latestExecution.taskId ? [latestExecution.taskId] : []);
    for (const taskId of taskIds) {
      const cancelResult = await this.eventBus.emit('TaskCancellationRequested', {
        taskId,
        reason: `Schedule ${scheduleId} cancelled`,
      });
      if (!cancelResult.ok) {
        this.logger.warn('Failed to cancel pipeline task', {
          taskId,
          scheduleId,
          error: cancelResult.error.message,
        });
      }
    }
    this.logger.info('Cancelled in-flight pipeline tasks', {
      scheduleId,
      taskCount: taskIds.length,
    });
  }
}
```
cancelTasks only covers the single latest execution
getExecutionHistory(scheduleId, 1) limits the lookup to one record. For CRON schedules with short intervals and slow multi-step pipelines, it is possible for a second trigger to fire before the first pipeline finishes. In that situation --cancel-tasks / cancelTasks: true will cancel the newly-created tasks from the latest run but leave the still-running tasks from the previous run untouched.
Consider fetching a small window (e.g., the last 5 executions) and deduplicating task IDs before cancelling:
```typescript
const historyResult = await this.scheduleRepository.getExecutionHistory(scheduleId, 5);
if (historyResult.ok) {
  const taskIdSet = new Set<TaskId>();
  for (const execution of historyResult.value) {
    if (execution.pipelineTaskIds) {
      execution.pipelineTaskIds.forEach((id) => taskIdSet.add(id));
    } else if (execution.taskId) {
      taskIdSet.add(execution.taskId);
    }
  }
  for (const taskId of taskIdSet) { ... }
}
```

At a minimum, the doc-comment / tool description should mention that only the most recent execution is targeted.
Chained schedules (afterScheduleId) resolve the predecessor's execution.taskId to check if it's terminal. Storing firstTaskId caused the chain to fire when step 1 completed, not the full pipeline. Now stores lastTaskId so chaining waits for pipeline completion.
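The recording change above can be sketched with a simplified execution shape (the field names here are assumptions; the real recordExecution lives in ScheduleRepo):

```typescript
// Hypothetical execution record; illustrates why the *last* task ID is stored.
interface Execution {
  scheduleId: string;
  taskId: string; // what afterScheduleId chaining checks for terminal state
  pipelineTaskIds: string[];
}

function recordPipelineExecution(scheduleId: string, taskIds: string[]): Execution {
  return {
    scheduleId,
    // Before the fix this was taskIds[0] (firstTaskId), so a chained
    // schedule fired as soon as step 1 completed; storing the final step's
    // ID makes chaining wait for the whole pipeline.
    taskId: taskIds[taskIds.length - 1],
    pipelineTaskIds: taskIds,
  };
}
```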
…ascade check

When getDependencies returned an error, the code fell through to the unblock path — potentially unblocking a task whose dependency actually failed. Now logs a warning and skips the task instead.
…ectory

validatePath() resolves relative/symlink paths to absolute, but the validation loop discarded the result — the schedule stored the original un-normalized path. Now builds a normalizedSteps array with resolved paths from validatePath().
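A sketch of keeping the normalized result, with a stand-in validatePath (the real helper resolves symlinks and relative paths; this stub only illustrates the data flow):

```typescript
// Stand-in for the real validatePath(); it only illustrates returning a
// normalized value instead of discarding it.
function validatePath(p: string): string {
  return p.startsWith("/") ? p : `/resolved/${p}`;
}

interface Step {
  prompt: string;
  workingDirectory?: string;
}

// Build normalizedSteps so the schedule stores the resolved paths.
function normalizeSteps(steps: Step[]): Step[] {
  return steps.map((step) =>
    step.workingDirectory !== undefined
      ? { ...step, workingDirectory: validatePath(step.workingDirectory) }
      : step
  );
}
```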
…iledExecution

The helper hardcoded the "Failed to create task: " prefix. The pipeline callsite already passed "Pipeline failed at step N: ..." — resulting in double-wrapping in the audit trail. Moved the prefix to the single-task callsite; the helper now passes errorMessage directly.
Field name implied task IDs but carried a boolean. Renamed to cancelTasksRequested to accurately describe its semantics.
…ernary

Replace two identical 7-line ternary chains in schedule create (single-task and pipeline modes) with the existing toMissedRunPolicy() from schedule-manager. Guards with an undefined check to preserve CLI pass-through semantics.
…eline error

Step 0 is the only task that becomes runnable — all later steps block on it. If its TaskDelegated emit fails, the pipeline is orphaned forever. Now cancels all saved tasks instead of continuing best-effort.

Also warn when positional prompt words are silently ignored in --pipeline mode, and fix Biome formatting in the schedule-manager test.
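The orphan-prevention path can be sketched as follows (the emit and cancel signatures are assumptions):

```typescript
// Sketch of the step-0 failure path: if delegating step 0 fails, cancel
// every task already saved rather than leaving the pipeline blocked forever.
function handleStepZeroDelegation(
  savedTaskIds: string[],
  emitSucceeded: boolean,
  cancel: (taskId: string) => void
): "running" | "cancelled" {
  if (!emitSucceeded) {
    // Steps 1..N-1 all block on step 0, so if its TaskDelegated emit fails
    // none of them would ever run; cancel everything already saved.
    for (const taskId of savedTaskIds) cancel(taskId);
    return "cancelled";
  }
  return "running";
}
```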
```typescript
} else if (arg === '--step' && next) {
  pipelineSteps.push(next);
  i++;
```
--step without --pipeline silently discards all steps
pipelineSteps is populated whenever --step appears, but is only consumed in the isPipeline branch. In single-task mode the array is never checked. A user who accidentally omits --pipeline ends up with a single-task schedule and no indication that the --step flags were ignored.
For example:
```shell
beat schedule create "run ci" --step "lint" --step "test" --cron "0 9 * * *"
# Creates a single-task schedule for "run ci"; lint/test steps are silently dropped
```

Add a guard before entering single-task mode:
```diff
  } else if (arg === '--step' && next) {
    pipelineSteps.push(next);
    i++;
+ } else if (arg.startsWith('-')) {
```
And after the isPipeline block returns, before the single-task prompt check:
```typescript
if (pipelineSteps.length > 0) {
  ui.error('--step requires --pipeline. Did you mean: beat schedule create --pipeline --step "..." --step "..."');
  process.exit(1);
}
```
Summary
- `SchedulePipeline` MCP tool and `--pipeline --step` CLI flags for creating cron or one-time schedules that trigger multi-step pipelines (2–20 steps). Each trigger creates fresh tasks with linear dependencies.
- Fast-path `dependencyState` check prevents blocked tasks from being enqueued before dependency rows are written to DB.

Changes (21 files, +2180 / -183)
Bug fixes:

- `dependency-handler.ts` — cascade cancellation on failed/cancelled upstream
- `queue-handler.ts` — fast-path blocked task check via `dependencyState`

Core:
- `domain.ts` — `pipelineSteps` on Schedule, `ScheduledPipelineCreateRequest`
- `interfaces.ts` — `createScheduledPipeline()`, `cancelTasks` on cancel
- `database.ts` — migration 8: `pipeline_steps`, `pipeline_task_ids` columns
- `schedule-repository.ts` — JSON round-trip with Zod validation
- `schedule-manager.ts` — `createScheduledPipeline()`, shared timing validation, `cancelTasks` support
- `schedule-handler.ts` — `handlePipelineTrigger()` with partial save failure cleanup

Adapters & CLI:
- `mcp-adapter.ts` — `SchedulePipeline` tool, enhanced `CancelSchedule`/`ListSchedules`/`GetSchedule`
- `schedule.ts` — `--pipeline --step` flags, `--cancel-tasks` on cancel

Tests (188 new, 1,467 total):
Docs:
Closes #78
Test plan
- `npm run test:handlers` — 115 passed (dependency cascade + queue handler + schedule handler pipeline)
- `npm run test:services` — 141 passed (schedule manager createScheduledPipeline)
- `npm run test:implementations` — 302 passed (schedule repo pipeline round-trip)
- `npm run test:adapters` — 55 passed (SchedulePipeline tool, CancelSchedule cancelTasks)
- `npm run test:cli` — 150 passed (--pipeline --step, --cancel-tasks)
- `npm run test:all` — 1,467 tests, zero regressions
- `npm run build` — clean