diff --git a/packages/durabletask-js/src/testing/in-memory-backend.ts b/packages/durabletask-js/src/testing/in-memory-backend.ts index 2c96acb..64f6824 100644 --- a/packages/durabletask-js/src/testing/in-memory-backend.ts +++ b/packages/durabletask-js/src/testing/in-memory-backend.ts @@ -62,6 +62,7 @@ export class InMemoryOrchestrationBackend { private readonly activityQueue: ActivityWorkItem[] = []; private readonly stateWaiters: Map = new Map(); private readonly pendingTimers: Set> = new Set(); + private readonly instanceTimers: Map>> = new Map(); private nextCompletionToken: number = 1; private readonly maxHistorySize: number; @@ -217,6 +218,7 @@ export class InMemoryOrchestrationBackend { this.instances.delete(instanceId); this.stateWaiters.delete(instanceId); + this.cancelInstanceTimers(instanceId); return true; } @@ -394,6 +396,7 @@ export class InMemoryOrchestrationBackend { clearTimeout(timer); } this.pendingTimers.clear(); + this.instanceTimers.clear(); } /** @@ -543,6 +546,7 @@ export class InMemoryOrchestrationBackend { const timerHandle = setTimeout(() => { this.pendingTimers.delete(timerHandle); + this.removeInstanceTimer(instance.instanceId, timerHandle); const currentInstance = this.instances.get(instance.instanceId); if (currentInstance && !this.isTerminalStatus(currentInstance.status)) { const timerFiredEvent = pbh.newTimerFiredEvent(timerId, fireAt); @@ -552,6 +556,7 @@ export class InMemoryOrchestrationBackend { } }, delay); this.pendingTimers.add(timerHandle); + this.addInstanceTimer(instance.instanceId, timerHandle); } private processCreateSubOrchestrationAction(instance: OrchestrationInstance, action: pb.OrchestratorAction): void { @@ -638,6 +643,36 @@ export class InMemoryOrchestrationBackend { } } + private addInstanceTimer(instanceId: string, timerHandle: ReturnType): void { + let timers = this.instanceTimers.get(instanceId); + if (!timers) { + timers = new Set(); + this.instanceTimers.set(instanceId, timers); + } + timers.add(timerHandle); + } + + private removeInstanceTimer(instanceId: string, timerHandle: ReturnType): void { + const timers = this.instanceTimers.get(instanceId); + if (timers) { + timers.delete(timerHandle); + if (timers.size === 0) { + this.instanceTimers.delete(instanceId); + } + } + } + + private cancelInstanceTimers(instanceId: string): void { + const timers = this.instanceTimers.get(instanceId); + if (timers) { + for (const timer of timers) { + clearTimeout(timer); + this.pendingTimers.delete(timer); + } + this.instanceTimers.delete(instanceId); + } + } + private notifyWaiters(instanceId: string): void { const instance = this.instances.get(instanceId); const waiters = this.stateWaiters.get(instanceId); diff --git a/packages/durabletask-js/test/in-memory-backend.spec.ts b/packages/durabletask-js/test/in-memory-backend.spec.ts index 66fe6a9..a4d6d3b 100644 --- a/packages/durabletask-js/test/in-memory-backend.spec.ts +++ b/packages/durabletask-js/test/in-memory-backend.spec.ts @@ -356,6 +356,73 @@ describe("In-Memory Backend", () => { expect(state).toBeUndefined(); }); + it("should cancel pending timers when purging a terminated orchestration", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + // Create a timer far in the future — it will still be pending when we terminate + yield ctx.createTimer(3600); + return "done"; + }; + + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator); + // Wait for the orchestration to start so the timer action is processed by the backend + await client.waitForOrchestrationStart(id, false, 5); + + // Terminate while the long timer is still pending + await client.terminateOrchestration(id, "terminated"); + const state = await client.waitForOrchestrationCompletion(id, true, 10); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.TERMINATED); + + // Timer should still be pending before purge + const pendingTimersBefore = (backend as any).pendingTimers.size; + expect(pendingTimersBefore).toBeGreaterThan(0); + + // Purge the terminated orchestration + const result = await client.purgeOrchestration(id); + expect(result.deletedInstanceCount).toEqual(1); + + // After purge, pending timers for this instance should be cancelled + expect((backend as any).pendingTimers.size).toBe(0); + expect((backend as any).instanceTimers.size).toBe(0); + }); + + it("should cancel pending timers for only the purged orchestration", async () => { + const timerOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + yield ctx.createTimer(3600); + return "done"; + }; + + const waitOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + yield ctx.createTimer(7200); + return "done"; + }; + + worker.addOrchestrator(timerOrchestrator); + worker.addOrchestrator(waitOrchestrator); + await worker.start(); + + // Start two orchestrations that both create long timers + const id1 = await client.scheduleNewOrchestration(timerOrchestrator); + const id2 = await client.scheduleNewOrchestration(waitOrchestrator); + + await client.waitForOrchestrationStart(id1, false, 5); + await client.waitForOrchestrationStart(id2, false, 5); + + // Terminate and purge only the first orchestration + await client.terminateOrchestration(id1, "terminated"); + await client.waitForOrchestrationCompletion(id1, false, 10); + + const result = await client.purgeOrchestration(id1); + expect(result.deletedInstanceCount).toEqual(1); + + // The second orchestration's timer should still be pending + expect((backend as any).pendingTimers.size).toBe(1); + expect((backend as any).instanceTimers.has(id2)).toBe(true); + expect((backend as any).instanceTimers.has(id1)).toBe(false); + }); + it("should allow reusing instance IDs after reset", async () => { const orchestrator: TOrchestrator = async (_: OrchestrationContext, input: number) => { return input * 2;