Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 47 additions & 5 deletions lib/orchestrator-intervention/engine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ describe("orchestrator intervention engine", () => {
}
});

it("requeues a refining issue when a hold policy matches", async () => {
it("does not auto-requeue a refining issue when a hold policy matches", async () => {
const h = await createTestHarness();
try {
const issue = h.provider.seedIssue({ iid: 77, title: "Blocked", labels: ["Refining"] });
Expand Down Expand Up @@ -79,12 +79,54 @@ describe("orchestrator intervention engine", () => {
source: "worker",
});

assert.equal(executions[0]?.executed, true);
assert.equal(executions[0]?.executed, false);
assert.match(executions[0]?.error ?? "", /not allowed from HOLD state/i);
const updated = await h.provider.getIssue(77);
assert.ok(updated.labels.includes("To Do"));
assert.ok(updated.labels.includes("Refining"));
assert.ok(!updated.labels.includes("To Do"));
const comments = await h.provider.listComments(77);
assert.equal(comments.length, 1);
assert.match(comments[0]!.body, /Requeued after blocked/);
assert.equal(comments.length, 0);
} finally {
await h.cleanup();
}
});

it("does not auto-queue a refining issue via queue_issue after a blocked hold event", async () => {
const h = await createTestHarness();
try {
const issue = h.provider.seedIssue({ iid: 78, title: "Blocked", labels: ["Refining"] });
await upsertInterventionPolicy(h.workspaceDir, h.project.slug, {
id: "queue-blocked",
title: "Queue blocked issues",
mode: "auto",
issueId: 78,
event: { type: "workflow.hold", result: "blocked" },
action: { type: "queue_issue", issueId: 78 },
});

const executions = await recordAndApplyInterventionEvent({
workspaceDir: h.workspaceDir,
channelId: h.channelId,
agentId: "main",
project: h.project,
workflow: h.workflow,
provider: h.provider,
issue,
runCommand: h.runCommand,
}, {
eventType: "workflow.hold",
issueId: 78,
result: "blocked",
fromState: "Doing",
toState: "Refining",
source: "worker",
});

assert.equal(executions[0]?.executed, false);
assert.match(executions[0]?.error ?? "", /automatic queue_issue is not allowed from HOLD state/i);
const updated = await h.provider.getIssue(78);
assert.ok(updated.labels.includes("Refining"));
assert.ok(!updated.labels.includes("To Do"));
} finally {
await h.cleanup();
}
Expand Down
6 changes: 6 additions & 0 deletions lib/orchestrator-intervention/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ async function executePolicyAction(
if (!currentLabel) throw new Error("issue has no recognized workflow label");
const currentState = findStateByLabel(ctx.workflow, currentLabel);
if (!currentState) throw new Error(`unknown state for ${currentLabel}`);
if (currentState.type === StateType.HOLD) {
throw new Error(`automatic requeue is not allowed from HOLD state ${currentLabel}; require explicit human restart via task_start(confirmHoldRestart=true)`);
}
const target = resolveTarget(ctx.workflow, currentLabel, currentState);
if (target.transitioned) {
await ctx.provider.transitionLabel(issue.iid, currentLabel, target.targetLabel);
Expand All @@ -192,6 +195,9 @@ async function executePolicyAction(
if (!currentLabel) throw new Error(`issue #${targetIssueId} has no recognized workflow label`);
const currentState = findStateByLabel(ctx.workflow, currentLabel);
if (!currentState) throw new Error(`unknown state for ${currentLabel}`);
if (currentState.type === StateType.HOLD) {
throw new Error(`automatic queue_issue is not allowed from HOLD state ${currentLabel}; require explicit human restart via task_start(confirmHoldRestart=true)`);
}
const target = resolveTarget(ctx.workflow, currentLabel, currentState);
if (target.transitioned) {
await ctx.provider.transitionLabel(targetIssueId, currentLabel, target.targetLabel);
Expand Down
63 changes: 62 additions & 1 deletion lib/services/heartbeat/health.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,75 @@
import { describe, it, beforeEach, afterEach } from "node:test";
import assert from "node:assert";
import { createTestHarness, type TestHarness } from "../../testing/index.js";
import { scanOrphanedLabels } from "./health.js";
import { checkWorkerHealth, scanOrphanedLabels } from "./health.js";
import { PrState } from "../../providers/provider.js";
import { writeProjects, type ProjectsData } from "../../projects/index.js";

// ---------------------------------------------------------------------------
// Test suite
// ---------------------------------------------------------------------------

// Regression suite for checkWorkerHealth: a repair derived from an outdated
// read of the issue must not move an issue that has since entered a hold
// label ("Refining") back to the queue label ("To Do").
describe("checkWorkerHealth", () => {
let h: TestHarness;

// The harness is created inside each test, so guard against a test that
// failed before assigning it.
afterEach(async () => {
if (h) await h.cleanup();
});

it("should not revert a blocked hold back to queue from a stale active snapshot", async () => {
// Worker record says the developer slot is active on issue 42, has no
// session key, and came from the "To Do" label.
h = await createTestHarness({
workers: {
developer: { active: true, issueId: "42", sessionKey: null, previousLabel: "To Do" },
},
});

h.provider.seedIssue({ iid: 42, title: "Blocked issue", labels: ["Doing"] });

// Wrap getIssue so that only the FIRST read is stale: the caller receives a
// copy carrying the labels as they were ("Doing"), while the object returned
// by the real getIssue is mutated to carry "Refining" instead of "Doing".
// Every later read passes straight through.
// NOTE(review): this relies on the test provider handing back its stored
// issue object (so the mutation is visible to later reads) — confirm against
// the provider implementation.
const originalGetIssue = h.provider.getIssue.bind(h.provider);
let firstRead = true;
h.provider.getIssue = async (issueId: number) => {
const issue = await originalGetIssue(issueId);
if (!firstRead) return issue;
firstRead = false;

// Copy first, then mutate: the copy keeps the pre-mutation labels.
const snapshot = { ...issue, labels: [...issue.labels] };
issue.labels = issue.labels.filter((label) => label !== "Doing");
issue.labels.push("Refining");
return snapshot;
};

// runCommand is stubbed with a successful, output-free result.
const fixes = await checkWorkerHealth({
workspaceDir: h.workspaceDir,
projectSlug: h.project.slug,
project: h.project,
role: "developer",
autoFix: true,
provider: h.provider,
sessions: null,
workflow: h.workflow,
runCommand: async () => ({
stdout: "",
stderr: "",
code: 0,
signal: null,
killed: false,
termination: "exit",
}),
});

// Exactly one fix is reported and marked fixed, but it must not record a
// label revert.
assert.strictEqual(fixes.length, 1);
assert.strictEqual(fixes[0]!.fixed, true);
assert.strictEqual(fixes[0]!.labelReverted, undefined, "stale repair should skip queue revert");

// Read through the unwrapped getIssue so the stub cannot influence the check.
const issue = await originalGetIssue(42);
assert.ok(issue.labels.includes("Refining"), `Expected Refining to survive, got: ${issue.labels}`);
assert.ok(!issue.labels.includes("To Do"), `Expected no To Do requeue, got: ${issue.labels}`);

// No transitionLabel call may have reached the provider at all.
const transitions = h.provider.callsTo("transitionLabel");
assert.strictEqual(transitions.length, 0, "should not transition back to queue from stale state");
});
});

describe("scanOrphanedLabels", () => {
let h: TestHarness;

Expand Down
10 changes: 10 additions & 0 deletions lib/services/heartbeat/health.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,16 @@ export async function checkWorkerHealth(opts: {
async function revertLabel(fix: HealthFix, from: StateLabel, to: StateLabel) {
if (!issueIdNum) return;
try {
const latestIssue = await fetchIssue(provider, issueIdNum);
const liveLabel = latestIssue
? getCurrentStateLabel(latestIssue.labels, workflow)
: null;

// Do not overwrite a newer workflow state with a stale health repair.
// This specifically protects blocked HOLD transitions like Doing -> Refining
// from being clobbered back into queue by a stale heartbeat snapshot.
if (liveLabel !== from) return;

await provider.transitionLabel(issueIdNum, from, to);
await recordLoopDiagnostic(workspaceDir, "health_requeue", {
project: project.name,
Expand Down
58 changes: 54 additions & 4 deletions lib/tools/tasks/orchestrator-intervention.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const pluginCtx = {
};

describe("orchestrator_intervention tool", () => {
it("saves and lists policies", async () => {
it("saves and lists safe policies", async () => {
const h = await createTestHarness();
try {
const tool = createOrchestratorInterventionTool(pluginCtx as any)({
Expand All @@ -24,9 +24,9 @@ describe("orchestrator_intervention tool", () => {
channelId: h.channelId,
action: "set_policy",
policy: {
title: "Requeue blocked dev",
title: "Comment on blocked dev",
event: { type: "workflow.hold", role: "developer", result: "blocked" },
action: { type: "requeue", message: "Try again" },
action: { type: "comment", message: "Need human decision" },
},
});

Expand All @@ -37,7 +37,57 @@ describe("orchestrator_intervention tool", () => {

const details = listed.details as { policies: Array<{ title: string }> };
assert.equal(details.policies.length, 1);
assert.equal(details.policies[0]?.title, "Requeue blocked dev");
assert.equal(details.policies[0]?.title, "Comment on blocked dev");
} finally {
await h.cleanup();
}
});

it("rejects auto requeue policies for hold events", async () => {
  const harness = await createTestHarness();
  try {
    // Build the tool the same way the plugin does: factory first, then bind
    // it to this harness's workspace.
    const toolFactory = createOrchestratorInterventionTool(pluginCtx as any);
    const tool = toolFactory({
      workspaceDir: harness.workspaceDir,
      messageChannel: "telegram",
    });

    // Saving a policy that pairs a workflow.hold event with a requeue action
    // must reject with the hold-policy error.
    const savePolicy = tool.execute("1", {
      channelId: harness.channelId,
      action: "set_policy",
      policy: {
        title: "Requeue blocked dev",
        event: { type: "workflow.hold", role: "developer", result: "blocked" },
        action: { type: "requeue", message: "Try again" },
      },
    });

    await assert.rejects(savePolicy, /not allowed for workflow\.hold policies/);
  } finally {
    await harness.cleanup();
  }
});

it("rejects auto queue_issue policies for hold events", async () => {
const h = await createTestHarness();
try {
const tool = createOrchestratorInterventionTool(pluginCtx as any)({
workspaceDir: h.workspaceDir,
messageChannel: "telegram",
});

await assert.rejects(
tool.execute("1", {
channelId: h.channelId,
action: "set_policy",
policy: {
title: "Queue blocked dev",
event: { type: "workflow.hold", role: "developer", result: "blocked" },
action: { type: "queue_issue", issueId: 42 },
},
}),
/not allowed for workflow\.hold policies/,
);
} finally {
await h.cleanup();
}
Expand Down
7 changes: 7 additions & 0 deletions lib/tools/tasks/orchestrator-intervention.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ Supported action types: ${ORCHESTRATOR_INTERVENTION_ACTION_TYPES.join(", ")}`,
action: payload.action as OrchestratorInterventionPolicy["action"],
updatedBy: toolCtx.sessionKey ?? toolCtx.agentId,
};
if (policy.mode === "auto"
&& policy.event.type === "workflow.hold"
&& (policy.action.type === "requeue" || policy.action.type === "queue_issue")) {
throw new Error(
`auto ${policy.action.type} is not allowed for workflow.hold policies. Hold states like Refining require explicit human restart.`,
);
}
const saved = await upsertInterventionPolicy(workspaceDir, project.slug, policy);
await auditLog(workspaceDir, "orchestrator_intervention_policy_set", {
project: project.name,
Expand Down
37 changes: 37 additions & 0 deletions lib/tools/tasks/task-start.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { describe, it } from "node:test";
import assert from "node:assert/strict";
import { StateType } from "../../workflow/types.js";
import { assertExplicitHoldRestart, resolveTarget } from "./task-start.js";
import { DEFAULT_WORKFLOW } from "../../workflow/defaults.js";

// Unit tests for the hold-restart guard and for target resolution in task_start.
describe("task_start", () => {
  const ISSUE_ID = 42;

  it("requires explicit confirmation to restart an issue from Refining", () => {
    // A HOLD state labelled "Refining" without the flag must be refused.
    const restartWithoutFlag = () =>
      assertExplicitHoldRestart(
        ISSUE_ID,
        DEFAULT_WORKFLOW,
        "Refining",
        { label: "Refining", type: StateType.HOLD, description: "hold", color: "#000000" },
        false,
      );

    assert.throws(restartWithoutFlag, /confirmHoldRestart: true/);
  });

  it("allows the normal Planning start path without confirmHoldRestart", () => {
    assert.doesNotThrow(() =>
      assertExplicitHoldRestart(
        ISSUE_ID,
        DEFAULT_WORKFLOW,
        "Planning",
        DEFAULT_WORKFLOW.states.planning!,
        false,
      ),
    );
  });

  it("allows explicit restart from Refining when confirmHoldRestart is true", () => {
    // Same state as the first test, but with the flag set.
    const restartWithFlag = () =>
      assertExplicitHoldRestart(
        ISSUE_ID,
        DEFAULT_WORKFLOW,
        "Refining",
        { label: "Refining", type: StateType.HOLD, description: "hold", color: "#000000" },
        true,
      );

    assert.doesNotThrow(restartWithFlag);
  });

  it("resolves Refining to the developer queue when restart is explicitly confirmed", () => {
    const { transitioned, targetLabel } = resolveTarget(
      DEFAULT_WORKFLOW,
      "Refining",
      DEFAULT_WORKFLOW.states.refining!,
    );

    assert.equal(transitioned, true);
    assert.equal(targetLabel, "To Do");
  });
});
27 changes: 25 additions & 2 deletions lib/tools/tasks/task-start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import {
import {
getCurrentStateLabel,
findStateByLabel,
findStateKeyByLabel,
getRoleLabelColor,
} from "../../workflow/index.js";
import { getLevelsForRole } from "../../roles/index.js";
Expand All @@ -39,7 +38,8 @@ Optionally set a level hint (e.g. "junior", "senior") so the heartbeat dispatche

Examples:
- Start work: { channelId: "-1003844794417", issueId: 42 } → advances to next queue
- With level: { channelId: "-1003844794417", issueId: 42, level: "junior" } → advances + hints junior`,
- With level: { channelId: "-1003844794417", issueId: 42, level: "junior" } → advances + hints junior
- Restart from Refining: { channelId: "-1003844794417", issueId: 42, confirmHoldRestart: true } → explicitly leaves blocked/rework hold`,
parameters: {
type: "object",
required: ["channelId", "issueId"],
Expand All @@ -60,13 +60,18 @@ Examples:
type: "string",
description: "Optional level hint for dispatch (e.g. 'junior', 'senior'). Applied as a label so the heartbeat respects it.",
},
confirmHoldRestart: {
type: "boolean",
description: "Required when restarting an issue from blocked/rework hold states like Refining. Normal Planning starts do not need it.",
},
},
},

async execute(_id: string, params: Record<string, unknown>) {
const channelId = resolveChannelId(toolCtx, params.channelId as string | undefined);
const issueId = params.issueId as number;
const levelHint = params.level as string | undefined;
const confirmHoldRestart = params.confirmHoldRestart === true;
const workspaceDir = requireWorkspaceDir(toolCtx);

const messageThreadId = params.messageThreadId as number | undefined;
Expand All @@ -91,6 +96,7 @@ Examples:
if (!currentState) {
throw new Error(`No state config for label "${currentLabel}".`);
}
assertExplicitHoldRestart(issueId, workflow, currentLabel, currentState, confirmHoldRestart);

// Determine target based on current state type
const { targetLabel, targetState, transitioned } = resolveTarget(
Expand Down Expand Up @@ -172,6 +178,23 @@ Examples:
* - ACTIVE: error (already being worked on)
* - TERMINAL: error (issue is closed)
*/
/**
 * Guards a start request against silently leaving a hold state.
 *
 * Returns without error when the current state is not of type HOLD, when the
 * current label is the label of the workflow's initial state, or when the
 * caller passed `confirmHoldRestart`.
 *
 * @param issueId - Issue number, used only in the error message.
 * @param workflow - Workflow whose `initial` key selects the exempt state.
 * @param currentLabel - Label the issue currently carries.
 * @param currentState - State config resolved for `currentLabel`.
 * @param confirmHoldRestart - Explicit opt-in to restart from a hold state.
 * @throws Error when the issue sits in a non-initial HOLD state and
 *   `confirmHoldRestart` is false.
 */
export function assertExplicitHoldRestart(
  issueId: number,
  workflow: WorkflowConfig,
  currentLabel: string,
  currentState: StateConfig,
  confirmHoldRestart: boolean,
): void {
  if (currentState.type === StateType.HOLD) {
    // The workflow's initial state is exempt; if it cannot be resolved, its
    // label is undefined and no label is exempt.
    const exemptLabel = workflow.states[workflow.initial]?.label;
    const needsConfirmation = currentLabel !== exemptLabel;

    if (needsConfirmation && !confirmHoldRestart) {
      throw new Error(`Issue #${issueId} is in hold state "${currentLabel}". Restart requires explicit confirmHoldRestart: true.`);
    }
  }
}

export function resolveTarget(
workflow: WorkflowConfig,
currentLabel: string,
Expand Down