Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 56 additions & 36 deletions async/unstable_circuit_breaker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ export interface CircuitBreakerOptions<T> {
* without disrupting circuit breaker operation.
*
* @param error The error that caused the failure, or a synthetic error
* when the result was classified as failure by {@linkcode isResultFailure}.
* when the result was classified as failure by
* {@linkcode CircuitBreakerOptions.isResultFailure | isResultFailure}.
* @param failureCount Current number of failures in the window.
* @param totalRequests Current number of total requests in the window.
*/
Expand Down Expand Up @@ -308,6 +309,21 @@ function safeCallback<A extends unknown[]>(
};
}

/**
* Calls `fn(arg)` and returns the result. If `fn` throws, the error is
* reported asynchronously and `undefined` is returned.
*/
function tryCall<A, R>(fn: (arg: A) => R, arg: A): R | undefined {
try {
return fn(arg);
} catch (error) {
queueMicrotask(() => {
throw error;
});
return undefined;
}
}

/**
* A circuit breaker that wraps async operations to prevent cascading failures.
*
Expand All @@ -321,6 +337,11 @@ function safeCallback<A extends unknown[]>(
* requests have been recorded in the window, preventing false positives
* during low traffic.
*
* When state changes occur, callbacks fire in a fixed order:
* - Circuit trips open: `onFailure` → `onStateChange` → `onOpen`
* - Cooldown expires: `onStateChange` → `onHalfOpen`
* - Recovery completes: `onStateChange` → `onClose`
*
* @experimental **UNSTABLE**: New API, yet to be vetted.
*
* @example Usage
Expand Down Expand Up @@ -568,14 +589,8 @@ export class CircuitBreaker<T = unknown> {
try {
result = await fn();
} catch (error) {
try {
if (this.#isFailure(error)) {
this.#handleFailure(error, currentState.state, currentTime);
}
} catch (predicateError) {
queueMicrotask(() => {
throw predicateError;
});
if (tryCall(this.#isFailure, error)) {
this.#handleFailure(error, currentState.state);
}
throw error;
} finally {
Expand All @@ -587,20 +602,12 @@ export class CircuitBreaker<T = unknown> {
}
}

try {
if (this.#isResultFailure(result)) {
const syntheticError = new Error("Result classified as failure");
this.#handleFailure(syntheticError, currentState.state, currentTime);
return result;
}
} catch (predicateError) {
queueMicrotask(() => {
throw predicateError;
});
return result;
const isResultFail = tryCall(this.#isResultFailure, result);
if (isResultFail) {
this.#handleFailure(undefined, currentState.state);
} else if (isResultFail === false) {
this.#handleSuccess(currentState.state);
}

this.#handleSuccess(currentState.state);
return result;
}

Expand All @@ -619,6 +626,8 @@ export class CircuitBreaker<T = unknown> {
* breaker.forceOpen();
* assertEquals(breaker.state, "open");
* ```
*
* @returns void
*/
forceOpen(): void {
const previous = this.#state.state;
Expand All @@ -630,17 +639,18 @@ export class CircuitBreaker<T = unknown> {
openedAt: now,
consecutiveSuccesses: 0,
};
if (previous !== "open") {
this.#onStateChange?.(previous, "open");
this.#onOpen?.(this.#failures.total, this.#requests.total);
}
if (previous === "open") return;
this.#onStateChange?.(previous, "open");
this.#onOpen?.(this.#failures.total, this.#requests.total);
}

/**
* Forces the circuit breaker to closed state and notifies observers.
*
* This is an operational transition that fires {@linkcode onStateChange}
* and {@linkcode onClose} callbacks. Use this when the protected service
* This is an operational transition that fires
* {@linkcode CircuitBreakerOptions.onStateChange | onStateChange} and
* {@linkcode CircuitBreakerOptions.onClose | onClose} callbacks. Use this
* when the protected service
* has recovered and you want observers to be notified.
*
* For silent resets (e.g., in tests), use {@linkcode reset} instead.
Expand All @@ -657,15 +667,16 @@ export class CircuitBreaker<T = unknown> {
* breaker.forceClose();
* assertEquals(breaker.state, "closed");
* ```
*
* @returns void
*/
forceClose(): void {
const previous = this.#state.state;
this.#state = createInitialState();
this.#clearCounters();
if (previous !== "closed") {
this.#onStateChange?.(previous, "closed");
this.#onClose?.();
}
if (previous === "closed") return;
this.#onStateChange?.(previous, "closed");
this.#onClose?.();
}

/**
Expand All @@ -686,6 +697,8 @@ export class CircuitBreaker<T = unknown> {
* breaker.reset();
* assertEquals(breaker.state, "closed");
* ```
*
* @returns void
*/
reset(): void {
this.#state = createInitialState();
Expand Down Expand Up @@ -739,7 +752,6 @@ export class CircuitBreaker<T = unknown> {
#handleFailure(
error: unknown,
previousState: CircuitState,
now: number,
): void {
this.#failures.increment();
const totalRequests = this.#requests.total;
Expand All @@ -749,11 +761,15 @@ export class CircuitBreaker<T = unknown> {
(totalRequests >= this.#minimumThroughput &&
failureCount / totalRequests >= this.#failureRateThreshold);

const existingOpenedAt = this.#state.state === "open"
? this.#state.openedAt
: undefined;

if (shouldOpen) {
this.#state = {
...this.#state,
state: "open",
openedAt: now,
openedAt: existingOpenedAt ?? Date.now(),
consecutiveSuccesses: 0,
};
} else {
Expand All @@ -763,8 +779,12 @@ export class CircuitBreaker<T = unknown> {
};
}

this.#onFailure?.(error, failureCount, totalRequests);
if (shouldOpen) {
this.#onFailure?.(
error ?? new Error("Result classified as failure"),
failureCount,
totalRequests,
);
if (shouldOpen && existingOpenedAt === undefined) {
this.#onStateChange?.(previousState, "open");
this.#onOpen?.(failureCount, totalRequests);
}
Expand Down
70 changes: 70 additions & 0 deletions async/unstable_circuit_breaker_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,23 @@ Deno.test("CircuitBreaker.execute() propagates thrown errors", async () => {
);
});

Deno.test("CircuitBreaker.execute() propagates synchronous throws and counts them as failures", async () => {
const breaker = new CircuitBreaker({
failureRateThreshold: 1,
minimumThroughput: 1,
});

await assertRejects(
() =>
breaker.execute(() => {
throw new Error("sync boom");
}),
Error,
"sync boom",
);
assertEquals(breaker.state, "open");
});

// ---------------------------------------------------------------------------
// Failure-rate tripping
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -454,6 +471,59 @@ Deno.test("CircuitBreaker.execute() prevents stale half_open success from closin
assertEquals(breaker.state, "open");
});

Deno.test("CircuitBreaker.execute() fires callbacks once when concurrent half_open requests both fail", async () => {
using time = new FakeTime();

const transitions: Array<[CircuitState, CircuitState]> = [];
let openCallCount = 0;

const breaker = new CircuitBreaker({
failureRateThreshold: 1,
minimumThroughput: 1,
cooldownMs: 1000,
halfOpenMaxConcurrent: 2,
onStateChange: (from, to) => transitions.push([from, to]),
onOpen: () => openCallCount++,
});

await failN(breaker, 1);
time.tick(1001);

let rejectA: ((e: Error) => void) | undefined;
let rejectB: ((e: Error) => void) | undefined;

const promiseA = breaker.execute(
() =>
new Promise<string>((_r, rej) => {
rejectA = (e) => rej(e);
}),
);
const promiseB = breaker.execute(
() =>
new Promise<string>((_r, rej) => {
rejectB = (e) => rej(e);
}),
);

rejectA?.(new Error("fail A"));
rejectB?.(new Error("fail B"));

try {
await promiseA;
} catch { /* expected */ }
try {
await promiseB;
} catch { /* expected */ }

assertEquals(breaker.state, "open");
assertEquals(
transitions.filter(([from, to]) => from === "half_open" && to === "open")
.length,
1,
);
assertEquals(openCallCount, 2); // 1 initial trip + 1 half_open reopen
});

// ---------------------------------------------------------------------------
// Predicates: isFailure / isResultFailure
// ---------------------------------------------------------------------------
Expand Down
Loading