Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -954,11 +954,21 @@ fn spawn_run_handler(
);
}
Err(error) => {
tracing::error!(
actor_id = %ctx.inner().actor_id(),
?error,
"napi run handler failed"
);
let extracted = RivetTransportError::extract(&error);
if extracted.group() == "actor" && extracted.code() == "aborted" {
// The run handler was unwound because the actor is going to
// sleep. This is the expected exit, not a failure.
tracing::debug!(
actor_id = %ctx.inner().actor_id(),
"napi run handler aborted for sleep"
);
} else {
tracing::error!(
actor_id = %ctx.inner().actor_id(),
?error,
"napi run handler failed"
);
}
}
}
})
Expand Down
11 changes: 11 additions & 0 deletions rivetkit-typescript/packages/rivetkit/src/actor/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ export function isRivetErrorLike(
);
}

// Matches the `actor`/`aborted` error that core raises when an in-flight queue
// wait is interrupted because the actor is going to sleep. This is the expected
// way a parked `run` handler unwinds during sleep, not a failure.
export function isActorAbortedError(error: unknown): boolean {
return (
isRivetErrorLike(error) &&
error.group === "actor" &&
error.code === "aborted"
);
}

function isActorSpecifier(value: unknown): value is ActorSpecifier {
return (
typeof value === "object" &&
Expand Down
7 changes: 2 additions & 5 deletions rivetkit-typescript/packages/rivetkit/src/registry/native.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
encodeBridgeRivetError,
forbiddenError,
INTERNAL_ERROR_CODE,
isActorAbortedError,
isRivetErrorLike,
RivetError,
type RivetErrorLike,
Expand Down Expand Up @@ -1802,11 +1803,7 @@ class NativeQueueAdapter {
}
yield message;
} catch (error) {
if (
isRivetErrorLike(error) &&
error.group === "actor" &&
error.code === "aborted"
) {
if (isActorAbortedError(error)) {
return;
}
throw error;
Expand Down
6 changes: 4 additions & 2 deletions rivetkit-typescript/packages/rivetkit/src/workflow/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {
RUN_FUNCTION_CONFIG_SYMBOL,
} from "@/actor/config";
import type { AnyStaticActorInstance } from "@/actor/definition";
import { RivetError } from "@/actor/errors";
import { isActorAbortedError, RivetError } from "@/actor/errors";
import type { EventSchemaConfig, QueueSchemaConfig } from "@/actor/schema";
import type { AnyDatabaseProvider } from "@/common/database/config";
import { stringifyError } from "@/utils";
Expand Down Expand Up @@ -256,7 +256,9 @@ export function workflow<
try {
await handle.result;
} catch (error) {
if (runCtx.abortSignal.aborted) {
// `abortSignal.aborted` is delivered on a separate async hop and
// races the rejection, so detect the sleep abort structurally too.
if (runCtx.abortSignal.aborted || isActorAbortedError(error)) {
return;
}

Expand Down
Loading