From 9785d54d876e6621b0e4b2c4bf077511f3504502 Mon Sep 17 00:00:00 2001 From: Noa Date: Fri, 23 Jan 2026 16:51:30 -0600 Subject: [PATCH 1/2] Rework `JobCores` in order to pin v8 instance threads --- crates/core/src/host/host_controller.rs | 67 +-- crates/core/src/host/module_common.rs | 4 +- crates/core/src/host/module_host.rs | 438 +++++++++--------- crates/core/src/host/v8/mod.rs | 176 ++++--- .../src/host/wasm_common/module_host_actor.rs | 4 +- crates/core/src/host/wasmtime/mod.rs | 23 +- crates/core/src/module_host_context.rs | 21 +- crates/core/src/startup.rs | 4 +- crates/core/src/util/jobs.rs | 321 +++++++------ crates/standalone/src/lib.rs | 23 +- crates/standalone/src/main.rs | 2 +- crates/testing/src/modules.rs | 2 +- 12 files changed, 547 insertions(+), 538 deletions(-) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index bf7fe461a62..d41e3f511b4 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -8,7 +8,6 @@ use crate::db::persistence::PersistenceProvider; use crate::db::relational_db::{self, spawn_view_cleanup_loop, DiskSizeFn, RelationalDB, Txdata}; use crate::db::{self, spawn_tx_metrics_recorder}; use crate::energy::{EnergyMonitor, EnergyQuanta, NullEnergyMonitor}; -use crate::host::module_host::ModuleRuntime as _; use crate::host::v8::V8Runtime; use crate::host::ProcedureCallError; use crate::messages::control_db::{Database, HostType}; @@ -18,7 +17,7 @@ use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager, TransactionOffset}; use crate::subscription::row_list_builder_pool::BsatnRowListBuilderPool; use crate::util::asyncify; -use crate::util::jobs::{JobCores, SingleCoreExecutor}; +use crate::util::jobs::{AllocatedJobCore, JobCores}; use crate::worker_metrics::WORKER_METRICS; use anyhow::{anyhow, bail, Context}; use async_trait::async_trait; @@ -702,39 +701,41 @@ async fn make_module_host( program: Program, energy_monitor: Arc, unregister: impl Fn() + Send + Sync + 'static, - executor: SingleCoreExecutor, + core: AllocatedJobCore, ) -> anyhow::Result<(Program, ModuleHost)> { // `make_actor` is blocking, as it needs to compile the wasm to native code, // which may be computationally expensive - sometimes up to 1s for a large module. // TODO: change back to using `spawn_rayon` here - asyncify runs on tokio blocking // threads, but those aren't for computation. Also, wasmtime uses rayon // to run compilation in parallel, so it'll need to run stuff in rayon anyway. 
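+    // (Assumption, not stated in this patch: `asyncify` is a thin wrapper around
+    // `tokio::task::spawn_blocking` that propagates panics, so everything inside
+    // it runs on a Tokio blocking thread rather than an async worker thread.)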
- asyncify(move || { - let database_identity = replica_ctx.database_identity; + let database_identity = replica_ctx.database_identity; - let mcc = ModuleCreationContext { - replica_ctx, - scheduler, - program: &program, - energy_monitor, - }; + let mcc = ModuleCreationContext { + replica_ctx, + scheduler, + program_hash: program.hash, + energy_monitor, + }; - let start = Instant::now(); - let module_host = match host_type { - HostType::Wasm => { - let (actor, init_inst) = runtimes.wasmtime.make_actor(mcc)?; + match host_type { + HostType::Wasm => { + asyncify(move || { + let start = Instant::now(); + let module = runtimes.wasmtime.make_actor(mcc, &program.bytes, core)?; trace!("wasmtime::make_actor blocked for {:?}", start.elapsed()); - ModuleHost::new(actor, init_inst, unregister, executor, database_identity) - } - HostType::Js => { - let (actor, init_inst) = runtimes.v8.make_actor(mcc)?; - trace!("v8::make_actor blocked for {:?}", start.elapsed()); - ModuleHost::new(actor, init_inst, unregister, executor, database_identity) - } - }; - Ok((program, module_host)) - }) - .await + let module_host = ModuleHost::new(module, unregister, database_identity); + Ok((program, module_host)) + }) + .await + } + HostType::Js => { + let start = Instant::now(); + let module = runtimes.v8.make_actor(mcc, &program.bytes, core).await?; + trace!("v8::make_actor blocked for {:?}", start.elapsed()); + let module_host = ModuleHost::new(module, unregister, database_identity); + Ok((program, module_host)) + } + } } async fn load_program(storage: &ProgramStorage, hash: Hash) -> anyhow::Result { @@ -762,7 +763,7 @@ async fn launch_module( energy_monitor: Arc, module_logs: Option, runtimes: Arc, - executor: SingleCoreExecutor, + core: AllocatedJobCore, bsatn_rlb_pool: BsatnRowListBuilderPool, ) -> anyhow::Result<(Program, LaunchedModule)> { let db_identity = database.database_identity; @@ -780,7 +781,7 @@ async fn launch_module( program, energy_monitor.clone(), on_panic, - executor, + core, ) .await?; @@ -1018,7 +1019,7 @@ impl Host { page_pool: PagePool, database: Database, program: Program, - executor: SingleCoreExecutor, + core: AllocatedJobCore, bsatn_rlb_pool: BsatnRowListBuilderPool, ) -> anyhow::Result> { let (db, _connected_clients) = RelationalDB::open( @@ -1042,7 +1043,7 @@ impl Host { Arc::new(NullEnergyMonitor), None, runtimes.clone(), - executor, + core, bsatn_rlb_pool, ) .await?; @@ -1076,7 +1077,7 @@ impl Host { policy: MigrationPolicy, energy_monitor: Arc, on_panic: impl Fn() + Send + Sync + 'static, - executor: SingleCoreExecutor, + core: AllocatedJobCore, ) -> anyhow::Result { let replica_ctx = &self.replica_ctx; let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db.clone()); @@ -1089,7 +1090,7 @@ impl Host { program, energy_monitor, on_panic, - executor, + core, ) .await?; @@ -1253,7 +1254,7 @@ pub(crate) async fn extract_schema_with_pools( initial_program: program.hash, }; - let core = SingleCoreExecutor::in_current_tokio_runtime(); + let core = AllocatedJobCore::default(); let module_info = Host::try_init_in_memory_to_check(runtimes, page_pool, database, program, core, bsatn_rlb_pool).await?; // this should always succeed, but sometimes it doesn't diff --git a/crates/core/src/host/module_common.rs b/crates/core/src/host/module_common.rs index 56b49227910..f444560132b 100644 --- a/crates/core/src/host/module_common.rs +++ b/crates/core/src/host/module_common.rs @@ -4,7 +4,7 @@ use crate::{ energy::EnergyMonitor, host::{module_host::ModuleInfo, 
wasm_common::module_host_actor::DescribeError, Scheduler}, - module_host_context::ModuleCreationContextLimited, + module_host_context::ModuleCreationContext, replica_context::ReplicaContext, }; use spacetimedb_lib::{Identity, RawModuleDef}; @@ -13,7 +13,7 @@ use std::sync::Arc; /// Builds a [`ModuleCommon`] from a [`RawModuleDef`]. pub fn build_common_module_from_raw( - mcc: ModuleCreationContextLimited, + mcc: ModuleCreationContext, raw_def: RawModuleDef, ) -> Result { // Perform a bunch of validation on the raw definition. diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 012883b84d2..17108cf2c89 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -18,7 +18,6 @@ use crate::host::wasmtime::ModuleInstance; use crate::host::{InvalidFunctionArguments, InvalidViewArguments}; use crate::identity::Identity; use crate::messages::control_db::{Database, HostType}; -use crate::module_host_context::ModuleCreationContext; use crate::replica_context::ReplicaContext; use crate::sql::ast::SchemaViewer; use crate::sql::execute::SqlResult; @@ -27,7 +26,7 @@ use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::tx::DeltaTx; use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilderSource}; use crate::subscription::{execute_plan, execute_plan_for_view}; -use crate::util::jobs::{SingleCoreExecutor, WeakSingleCoreExecutor}; +use crate::util::jobs::SingleCoreExecutor; use crate::vm::check_row_limit; use crate::worker_metrics::WORKER_METRICS; use anyhow::Context; @@ -66,7 +65,6 @@ use spacetimedb_schema::table_name::TableName; use spacetimedb_vm::relation::RelValue; use std::collections::{HashSet, VecDeque}; use std::fmt; -use std::future::Future; use std::sync::atomic::AtomicBool; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; @@ -331,67 +329,78 @@ impl ReducersMap { } } -/// A runtime that can create modules. -pub trait ModuleRuntime { - /// Creates a module based on the context `mcc`. - /// - /// Also returns the initial instance for the module. - fn make_actor(&self, mcc: ModuleCreationContext<'_>) -> anyhow::Result<(Module, Instance)>; +pub enum ModuleWithInstance { + Wasm { + module: super::wasmtime::Module, + executor: SingleCoreExecutor, + init_inst: Box, + }, + Js { + module: super::v8::JsModule, + init_inst: super::v8::JsInstance, + }, } -pub enum Module { - Wasm(super::wasmtime::Module), - Js(super::v8::JsModule), +enum ModuleHostInner { + Wasm(WasmtimeModuleHost), + Js(V8ModuleHost), } -pub enum Instance { - // Box these instances because they're very different sizes, - // which makes Clippy sad and angry. - Wasm(Box), - Js(Box), +struct WasmtimeModuleHost { + executor: SingleCoreExecutor, + instance_manager: ModuleInstanceManager, } -impl Module { - pub fn replica_ctx(&self) -> &Arc { - match self { - Module::Wasm(module) => module.replica_ctx(), - Module::Js(module) => module.replica_ctx(), - } +struct V8ModuleHost { + instance_manager: ModuleInstanceManager, +} + +/// A module; used as a bound on `InstanceManager`. 
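+/// A minimal sketch of how this bound is exercised (illustrative only; the real
+/// consumer is `ModuleInstanceManager` below):
+///
+/// ```ignore
+/// async fn fresh_instance_ok<M: GenericModule>(module: &M) -> bool {
+///     let inst = module.create_instance().await;
+///     !inst.trapped()
+/// }
+/// ```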
+trait GenericModule { + type Instance: GenericModuleInstance; + async fn create_instance(&self) -> Self::Instance; + fn host_type(&self) -> HostType; +} + +trait GenericModuleInstance { + fn trapped(&self) -> bool; +} + +impl GenericModuleInstance for super::wasm_common::module_host_actor::WasmModuleInstance { + fn trapped(&self) -> bool { + self.trapped() } +} - fn scheduler(&self) -> &Scheduler { - match self { - Module::Wasm(module) => module.scheduler(), - Module::Js(module) => module.scheduler(), - } +impl GenericModuleInstance for Box { + fn trapped(&self) -> bool { + (**self).trapped() } +} - fn info(&self) -> Arc { - match self { - Module::Wasm(module) => module.info(), - Module::Js(module) => module.info(), - } +impl GenericModule for super::wasmtime::Module { + type Instance = Box; + async fn create_instance(&self) -> Self::Instance { + Box::new(self.create_instance()) } - async fn create_instance(&self) -> Instance { - match self { - Module::Wasm(module) => Instance::Wasm(Box::new(module.create_instance())), - Module::Js(module) => Instance::Js(Box::new(module.create_instance().await)), - } + fn host_type(&self) -> HostType { + HostType::Wasm + } +} + +impl GenericModule for super::v8::JsModule { + type Instance = super::v8::JsInstance; + async fn create_instance(&self) -> Self::Instance { + self.create_instance().await } fn host_type(&self) -> HostType { - match self { - Module::Wasm(_) => HostType::Wasm, - Module::Js(_) => HostType::Js, - } + HostType::Js } } -impl Instance { +impl GenericModuleInstance for super::v8::JsInstance { fn trapped(&self) -> bool { - match self { - Instance::Wasm(inst) => inst.trapped(), - Instance::Js(inst) => inst.trapped(), - } + self.trapped() } } @@ -724,9 +733,9 @@ impl CallProcedureParams { /// When we introduce procedures, it will be necessary to have multiple instances, /// as each procedure invocation will have its own sandboxed instance, /// and multiple procedures can run concurrently with up to one reducer. -struct ModuleInstanceManager { - instances: VecDeque, - module: Arc, +struct ModuleInstanceManager { + instances: Mutex>, + module: M, create_instance_time_metric: CreateInstanceTimeMetric, } @@ -752,8 +761,8 @@ impl CreateInstanceTimeMetric { } } -impl ModuleInstanceManager { - fn new(module: Arc, init_inst: Instance, database_identity: Identity) -> Self { +impl ModuleInstanceManager { + fn new(module: M, init_inst: M::Instance, database_identity: Identity) -> Self { let host_type = module.host_type(); let create_instance_time_metric = CreateInstanceTimeMetric { metric: WORKER_METRICS @@ -768,17 +777,25 @@ impl ModuleInstanceManager { instances.push_front(init_inst); Self { - instances, + instances: Mutex::new(instances), module, create_instance_time_metric, } } - async fn get_instance(&mut self) -> Instance { - if let Some(inst) = self.instances.pop_back() { + + async fn with_instance(&self, f: impl AsyncFnOnce(M::Instance) -> (R, M::Instance)) -> R { + let inst = self.get_instance().await; + let (res, inst) = f(inst).await; + self.return_instance(inst).await; + res + } + + async fn get_instance(&self) -> M::Instance { + let inst = self.instances.lock().await.pop_back(); + if let Some(inst) = inst { inst } else { let start_time = std::time::Instant::now(); - // TODO: should we be calling `create_instance` on the `SingleCoreExecutor` rather than the calling thread? 
let res = self.module.create_instance().await; let elapsed_time = start_time.elapsed(); self.create_instance_time_metric.observe(elapsed_time); @@ -786,7 +803,7 @@ impl ModuleInstanceManager { } } - fn return_instance(&mut self, inst: Instance) { + async fn return_instance(&self, inst: M::Instance) { if inst.trapped() { // Don't return trapped instances; // they may have left internal data structures in the guest `Instance` @@ -794,18 +811,16 @@ impl ModuleInstanceManager { return; } - self.instances.push_front(inst); + self.instances.lock().await.push_front(inst); } } #[derive(Clone)] pub struct ModuleHost { pub info: Arc, - pub module: Arc, + inner: Arc, /// Called whenever a reducer call on this host panics. on_panic: Arc, - instance_manager: Arc>, - executor: SingleCoreExecutor, /// Marks whether this module has been closed by [`Self::exit`]. /// @@ -817,17 +832,15 @@ impl fmt::Debug for ModuleHost { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ModuleHost") .field("info", &self.info) - .field("module", &Arc::as_ptr(&self.module)) + .field("inner", &Arc::as_ptr(&self.inner)) .finish() } } pub struct WeakModuleHost { info: Arc, - inner: Weak, + inner: Weak, on_panic: Weak, - instance_manager: Weak>, - executor: WeakSingleCoreExecutor, closed: Weak, } @@ -978,27 +991,36 @@ pub struct RefInstance<'a, I: WasmInstance> { impl ModuleHost { pub(super) fn new( - module: Module, - init_inst: Instance, + module: ModuleWithInstance, on_panic: impl Fn() + Send + Sync + 'static, - executor: SingleCoreExecutor, database_identity: Identity, ) -> Self { - let info = module.info(); - let module = Arc::new(module); + let info; + let inner = match module { + ModuleWithInstance::Wasm { + module, + executor, + init_inst, + } => { + info = module.info(); + let instance_manager = ModuleInstanceManager::new(module, init_inst, database_identity); + Arc::new(ModuleHostInner::Wasm(WasmtimeModuleHost { + executor, + instance_manager, + })) + } + ModuleWithInstance::Js { module, init_inst } => { + info = module.info(); + let instance_manager = ModuleInstanceManager::new(module, init_inst, database_identity); + Arc::new(ModuleHostInner::Js(V8ModuleHost { instance_manager })) + } + }; let on_panic = Arc::new(on_panic); - let module_clone = module.clone(); - - let instance_manager = ModuleInstanceManager::new(module_clone, init_inst, database_identity); - let instance_manager = Arc::new(Mutex::new(instance_manager)); - ModuleHost { info, - module, + inner, on_panic, - instance_manager, - executor, closed: Arc::new(AtomicBool::new(false)), } } @@ -1035,42 +1057,43 @@ impl ModuleHost { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - self.guard_closed()?; - - let timer_guard = self.start_call_timer(label); - - let res = self - .executor - .run_sync_job(move || { - drop(timer_guard); - f() - }) - .await; - - Ok(res) + self.on_module_thread_async(label, async move || f()).await } /// Run an async function on the JobThread for this module. /// Similar to `on_module_thread`, but for async functions. 
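+    /// Illustrative call shape (`expensive_count` is a hypothetical helper, not
+    /// part of this patch):
+    ///
+    /// ```ignore
+    /// let row_count = host
+    ///     .on_module_thread_async("count_rows", async move || expensive_count().await)
+    ///     .await?;
+    /// ```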
- pub async fn on_module_thread_async(&self, label: &str, f: Fun) -> Result + pub async fn on_module_thread_async(&self, label: &str, f: F) -> Result where - Fun: (FnOnce() -> Fut) + Send + 'static, - Fut: Future + Send + 'static, + F: AsyncFnOnce() -> R + Send + 'static, R: Send + 'static, { self.guard_closed()?; let timer_guard = self.start_call_timer(label); - let res = self - .executor - .run_job(async move { - drop(timer_guard); - f().await - }) - .await; - - Ok(res) + Ok(match &*self.inner { + ModuleHostInner::Wasm(WasmtimeModuleHost { executor, .. }) => { + executor + .run_job(async move || { + drop(timer_guard); + f().await + }) + .await + } + ModuleHostInner::Js(V8ModuleHost { instance_manager }) => { + instance_manager + .with_instance(async |mut inst| { + let res = inst + .run_on_thread(async move || { + drop(timer_guard); + f().await + }) + .await; + (res, inst) + }) + .await + } + }) } fn start_call_timer(&self, label: &str) -> ScopeGuard<(), impl FnOnce(())> { @@ -1100,16 +1123,15 @@ impl ModuleHost { } /// Run a function for this module which has access to the module instance. - async fn with_instance<'a, Guard, R, F>( - &'a self, + async fn with_instance( + &self, kind: &str, label: &str, + arg: A, timer: impl FnOnce(&str) -> Guard, - work: impl FnOnce(Guard, &'a SingleCoreExecutor, Instance) -> F, - ) -> Result - where - F: Future, - { + work_wasm: impl AsyncFnOnce(Guard, &SingleCoreExecutor, Box, A) -> (R, Box), + work_js: impl AsyncFnOnce(Guard, &mut JsInstance, A) -> R, + ) -> Result { self.guard_closed()?; let timer_guard = timer(label); @@ -1126,78 +1148,59 @@ impl ModuleHost { (self.on_panic)(); }); - // TODO: should we be calling and/or `await`-ing `get_instance` within the below `run_job`? - // Unclear how much overhead this call can have. - let inst = self.instance_manager.lock().await.get_instance().await; - - let (res, inst) = work(timer_guard, &self.executor, inst).await; - - self.instance_manager.lock().await.return_instance(inst); - - Ok(res) - } - - async fn call_async_with_instance(&self, label: &str, work: Fun) -> Result - where - Fun: (FnOnce(Instance) -> Fut) + Send + 'static, - Fut: Future + Send + 'static, - R: Send + 'static, - { - self.with_instance( - "procedure", - label, - |l| self.start_call_timer(l), - |timer_guard, executor, inst| { - executor.run_job(async move { - drop(timer_guard); - work(inst).await - }) - }, - ) - .await + Ok(match &*self.inner { + ModuleHostInner::Wasm(WasmtimeModuleHost { + executor, + instance_manager, + }) => { + instance_manager + .with_instance(async |inst| work_wasm(timer_guard, executor, inst, arg).await) + .await + } + ModuleHostInner::Js(V8ModuleHost { instance_manager }) => { + instance_manager + .with_instance(async |mut inst| (work_js(timer_guard, &mut inst, arg).await, inst)) + .await + } + }) } /// Run a function for this module which has access to the module instance. /// /// For WASM, the function is run on the module's JobThread. /// For V8/JS, the function is run in the current task. - async fn call( + async fn call( &self, label: &str, arg: A, - wasm: impl FnOnce(A, &mut ModuleInstance) -> R + Send + 'static, - js: impl FnOnce(A, Box) -> JF, + wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static, + js: impl AsyncFnOnce(A, &mut JsInstance) -> R, ) -> Result where - JF: Future)>, R: Send + 'static, A: Send + 'static, { self.with_instance( "reducer", label, + arg, |l| self.start_call_timer(l), // Operations on module instances (e.g. 
calling reducers) is blocking, // partially because the computation can potentially take a long time // and partially because interacting with the database requires taking a blocking lock. // So, we run `work` on a dedicated thread with `self.executor`. // This will bubble up any panic that may occur. - |timer_guard, executor, inst| async move { - match inst { - Instance::Wasm(mut inst) => { - executor - .run_sync_job(move || { - drop(timer_guard); - (wasm(arg, &mut inst), Instance::Wasm(inst)) - }) - .await - } - Instance::Js(inst) => { + async move |timer_guard, executor, mut inst, arg| { + executor + .run_job(async move || { drop(timer_guard); - let (res, inst) = js(arg, inst).await; - (res, Instance::Js(inst)) - } - } + (wasm(arg, &mut inst).await, inst) + }) + .await + }, + async move |timer_guard, inst, arg| { + drop(timer_guard); + js(arg, inst).await }, ) .await @@ -1209,8 +1212,8 @@ impl ModuleHost { .call( "disconnect_client", client_id, - |client_id, inst| inst.disconnect_client(client_id), - |client_id, inst| inst.disconnect_client(client_id), + async |client_id, inst| inst.disconnect_client(client_id), + async |client_id, inst| inst.disconnect_client(client_id).await, ) .await { @@ -1258,8 +1261,8 @@ impl ModuleHost { self.call( "call_identity_connected", (caller_auth, caller_connection_id), - |(a, b), inst| inst.call_identity_connected(a, b), - |(a, b), inst| inst.call_identity_connected(a, b), + async |(a, b), inst| inst.call_identity_connected(a, b), + async |(a, b), inst| inst.call_identity_connected(a, b).await, ) .await .map_err(ReducerCallError::from)? @@ -1417,8 +1420,8 @@ impl ModuleHost { self.call( "call_identity_disconnected", (caller_identity, caller_connection_id, drop_view_subscribers), - |(a, b, c), inst| inst.call_identity_disconnected(a, b, c), - |(a, b, c), inst| inst.call_identity_disconnected(a, b, c), + async |(a, b, c), inst| inst.call_identity_disconnected(a, b, c), + async |(a, b, c), inst| inst.call_identity_disconnected(a, b, c).await, ) .await? } @@ -1428,8 +1431,8 @@ impl ModuleHost { self.call( "clear_all_clients", (), - |_, inst| inst.clear_all_clients(), - |_, inst| inst.clear_all_clients(), + async |_, inst| inst.clear_all_clients(), + async |_, inst| inst.clear_all_clients().await, ) .await? } @@ -1491,8 +1494,8 @@ impl ModuleHost { .call( &reducer_def.name, call_reducer_params, - |p, inst| inst.call_reducer(p), - |p, inst| inst.call_reducer(p), + async |p, inst| inst.call_reducer(p), + async |p, inst| inst.call_reducer(p).await, ) .await?) 
} @@ -1560,8 +1563,8 @@ impl ModuleHost { .call( "call_view_add_single_subscription", cmd, - |cmd, inst| inst.call_view(cmd), - |cmd, inst| inst.call_view(cmd), + async |cmd, inst| inst.call_view(cmd), + async |cmd, inst| inst.call_view(cmd).await, ) .await //TODO: handle error better @@ -1593,8 +1596,8 @@ impl ModuleHost { .call( "call_view_add_multi_subscription", cmd, - |cmd, inst| inst.call_view(cmd), - |cmd, inst| inst.call_view(cmd), + async |cmd, inst| inst.call_view(cmd), + async |cmd, inst| inst.call_view(cmd).await, ) .await //TODO: handle error better @@ -1626,8 +1629,8 @@ impl ModuleHost { .call( "call_view_add_legacy_subscription", cmd, - |cmd, inst| inst.call_view(cmd), - |cmd, inst| inst.call_view(cmd), + async |cmd, inst| inst.call_view(cmd), + async |cmd, inst| inst.call_view(cmd).await, ) .await //TODO: handle error better @@ -1660,8 +1663,8 @@ impl ModuleHost { .call( "call_view_sql", cmd, - |cmd, inst| inst.call_view(cmd), - |cmd, inst| inst.call_view(cmd), + async |cmd, inst| inst.call_view(cmd), + async |cmd, inst| inst.call_view(cmd).await, ) .await //TODO: handle error better @@ -1748,31 +1751,20 @@ impl ModuleHost { args, }; - Ok(self - .call_async_with_instance(&procedure_def.name, async move |inst| match inst { - Instance::Wasm(mut inst) => (inst.call_procedure(params).await, Instance::Wasm(inst)), - Instance::Js(inst) => { - let (r, s) = inst.call_procedure(params).await; - (r, Instance::Js(s)) - } - }) - .await?) + Ok(self.call_procedure_with_params(&procedure_def.name, params).await?) } - // This is not reused in `call_procedure_inner` - // due to concerns re. `Timestamp::now`. pub async fn call_procedure_with_params( &self, name: &str, params: CallProcedureParams, ) -> Result { - self.call_async_with_instance(name, async move |inst| match inst { - Instance::Wasm(mut inst) => (inst.call_procedure(params).await, Instance::Wasm(inst)), - Instance::Js(inst) => { - let (r, s) = inst.call_procedure(params).await; - (r, Instance::Js(s)) - } - }) + self.call( + name, + params, + async move |params, inst| inst.call_procedure(params).await, + async move |params, inst| inst.call_procedure(params).await, + ) .await } @@ -1780,25 +1772,11 @@ impl ModuleHost { &self, params: ScheduledFunctionParams, ) -> Result { - self.with_instance( - "scheduled function", - "reducer or procedure", - |l| self.start_call_timer(l), - async move |timer_guard, executor, inst| match inst { - Instance::Wasm(mut inst) => { - executor - .run_job(async move { - drop(timer_guard); - (inst.call_scheduled_function(params).await, Instance::Wasm(inst)) - }) - .await - } - Instance::Js(inst) => { - drop(timer_guard); - let (r, s) = inst.call_scheduled_function(params).await; - (r, Instance::Js(s)) - } - }, + self.call( + "unknown scheduled function", + params, + async move |params, inst| inst.call_scheduled_function(params).await, + async move |params, inst| inst.call_scheduled_function(params).await, ) .await } @@ -1960,8 +1938,8 @@ impl ModuleHost { self.call( "", program, - |p, inst| inst.init_database(p), - |p, inst| inst.init_database(p), + async |p, inst| inst.init_database(p), + async |p, inst| inst.init_database(p).await, ) .await? .map_err(InitDatabaseError::Other) @@ -1976,8 +1954,8 @@ impl ModuleHost { self.call( "", (program, old_module_info, policy), - |(a, b, c), inst| inst.update_database(a, b, c), - |(a, b, c), inst| inst.update_database(a, b, c), + async |(a, b, c), inst| inst.update_database(a, b, c), + async |(a, b, c), inst| inst.update_database(a, b, c).await, ) .await? 
} @@ -1985,12 +1963,12 @@ impl ModuleHost { pub async fn exit(&self) { // As in `Self::marked_closed`, `Relaxed` is sufficient because we're not synchronizing any external state. self.closed.store(true, std::sync::atomic::Ordering::Relaxed); - self.module.scheduler().close(); + self.scheduler().close(); self.exited().await; } pub async fn exited(&self) { - self.module.scheduler().closed().await; + self.scheduler().closed().await; } pub fn inject_logs(&self, log_level: LogLevel, function_name: &str, message: &str) { @@ -2156,10 +2134,8 @@ impl ModuleHost { pub fn downgrade(&self) -> WeakModuleHost { WeakModuleHost { info: self.info.clone(), - inner: Arc::downgrade(&self.module), + inner: Arc::downgrade(&self.inner), on_panic: Arc::downgrade(&self.on_panic), - instance_manager: Arc::downgrade(&self.instance_manager), - executor: self.executor.downgrade(), closed: Arc::downgrade(&self.closed), } } @@ -2177,7 +2153,17 @@ impl ModuleHost { } pub(crate) fn replica_ctx(&self) -> &ReplicaContext { - self.module.replica_ctx() + match &*self.inner { + ModuleHostInner::Wasm(wasm) => wasm.instance_manager.module.replica_ctx(), + ModuleHostInner::Js(js) => js.instance_manager.module.replica_ctx(), + } + } + + fn scheduler(&self) -> &Scheduler { + match &*self.inner { + ModuleHostInner::Wasm(wasm) => wasm.instance_manager.module.scheduler(), + ModuleHostInner::Js(js) => js.instance_manager.module.scheduler(), + } } } @@ -2185,15 +2171,11 @@ impl WeakModuleHost { pub fn upgrade(&self) -> Option { let inner = self.inner.upgrade()?; let on_panic = self.on_panic.upgrade()?; - let instance_manager = self.instance_manager.upgrade()?; - let executor = self.executor.upgrade()?; let closed = self.closed.upgrade()?; Some(ModuleHost { info: self.info.clone(), - module: inner, + inner, on_panic, - instance_manager, - executor, closed, }) } diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index 0abf9203163..d6eedd2359b 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -10,13 +10,13 @@ use self::syscall::{ resolve_sys_module, FnRet, HookFunctions, }; use super::module_common::{build_common_module_from_raw, run_describer, ModuleCommon}; -use super::module_host::{CallProcedureParams, CallReducerParams, Module, ModuleInfo, ModuleRuntime}; +use super::module_host::{CallProcedureParams, CallReducerParams, ModuleInfo, ModuleWithInstance}; use super::UpdateDatabaseResult; use crate::client::ClientActorId; use crate::host::host_controller::CallProcedureReturn; use crate::host::instance_env::{ChunkPool, InstanceEnv, TxSlot}; use crate::host::module_host::{ - call_identity_connected, init_database, ClientConnectedError, Instance, ViewCommand, ViewCommandResult, + call_identity_connected, init_database, ClientConnectedError, ViewCommand, ViewCommandResult, }; use crate::host::scheduler::{CallScheduledFunctionResult, ScheduledFunctionParams}; use crate::host::wasm_common::instrumentation::CallTimes; @@ -27,14 +27,15 @@ use crate::host::wasm_common::module_host_actor::{ }; use crate::host::wasm_common::{RowIters, TimingSpanSet, DESCRIBE_MODULE_DUNDER}; use crate::host::{ModuleHost, ReducerCallError, ReducerCallResult, Scheduler}; -use crate::module_host_context::{ModuleCreationContext, ModuleCreationContextLimited}; +use crate::module_host_context::ModuleCreationContext; use crate::replica_context::ReplicaContext; use crate::subscription::module_subscription_manager::TransactionOffset; -use crate::util::asyncify; +use crate::util::jobs::{AllocatedJobCore, CorePinner, 
LoadBalanceOnDropGuard}; use anyhow::Context as _; use core::any::type_name; use core::str; use enum_as_inner::EnumAsInner; +use futures::future::LocalBoxFuture; use futures::FutureExt; use itertools::Either; use spacetimedb_auth::identity::ConnectionAuthCtx; @@ -44,9 +45,11 @@ use spacetimedb_datastore::traits::Program; use spacetimedb_lib::{ConnectionId, Identity, RawModuleDef, Timestamp}; use spacetimedb_schema::auto_migrate::MigrationPolicy; use spacetimedb_table::static_assert_size; +use std::panic::AssertUnwindSafe; use std::sync::{Arc, LazyLock}; use std::time::Instant; use tokio::sync::oneshot; +use tracing::Instrument; use v8::script_compiler::{compile_module, Source}; use v8::{ scope_with_context, Context, Function, Isolate, Local, MapFnTo, OwnedIsolate, PinScope, ResolveModuleCallback, @@ -70,9 +73,14 @@ pub struct V8Runtime { _priv: (), } -impl ModuleRuntime for V8Runtime { - fn make_actor(&self, mcc: ModuleCreationContext) -> anyhow::Result<(Module, Instance)> { - V8_RUNTIME_GLOBAL.make_actor(mcc) +impl V8Runtime { + pub async fn make_actor( + &self, + mcc: ModuleCreationContext, + program_bytes: &[u8], + core: AllocatedJobCore, + ) -> anyhow::Result { + V8_RUNTIME_GLOBAL.make_actor(mcc, program_bytes, core).await } } @@ -111,28 +119,38 @@ impl V8RuntimeInner { Self { _priv: () } } -} -impl ModuleRuntime for V8RuntimeInner { - fn make_actor(&self, mcc: ModuleCreationContext) -> anyhow::Result<(Module, Instance)> { + async fn make_actor( + &self, + mcc: ModuleCreationContext, + program_bytes: &[u8], + core: AllocatedJobCore, + ) -> anyhow::Result { #![allow(unreachable_code, unused_variables)] log::trace!( "Making new V8 module host actor for database {} with module {}", mcc.replica_ctx.database_identity, - mcc.program.hash, + mcc.program_hash, ); // Convert program to a string. - let program: Arc = str::from_utf8(&mcc.program.bytes)?.into(); + let program: Arc = str::from_utf8(program_bytes)?.into(); // Validate/create the module and spawn the first instance. - let mcc = Either::Right(mcc.into_limited()); - let (common, init_inst) = spawn_instance_worker(program.clone(), mcc)?; + let mcc = Either::Right(mcc); + let load_balance_guard = Arc::new(core.guard); + let core_pinner = core.pinner; + let (common, init_inst) = + spawn_instance_worker(program.clone(), mcc, load_balance_guard.clone(), core_pinner.clone()).await?; + let module = JsModule { + common, + program, + load_balance_guard, + core_pinner, + }; - let module = Module::Js(JsModule { common, program }); - let init_inst = Instance::Js(Box::new(init_inst)); - Ok((module, init_inst)) + Ok(ModuleWithInstance::Js { module, init_inst }) } } @@ -140,6 +158,8 @@ impl ModuleRuntime for V8RuntimeInner { pub struct JsModule { common: ModuleCommon, program: Arc, + load_balance_guard: Arc, + core_pinner: CorePinner, } impl JsModule { @@ -158,14 +178,14 @@ impl JsModule { pub async fn create_instance(&self) -> JsInstance { let program = self.program.clone(); let common = self.common.clone(); + let load_balance_guard = self.load_balance_guard.clone(); + let core_pinner = self.core_pinner.clone(); - asyncify(move || { - // This has to be done in a blocking context because of `blocking_recv`. - let (_, instance) = spawn_instance_worker(program, Either::Left(common)) - .expect("`spawn_instance_worker` should succeed when passed `ModuleCommon`"); - instance - }) - .await + // This has to be done in a blocking context because of `blocking_recv`. 
+ let (_, instance) = spawn_instance_worker(program, Either::Left(common), load_balance_guard, core_pinner) + .await + .expect("`spawn_instance_worker` should succeed when passed `ModuleCommon`"); + instance } } @@ -274,10 +294,10 @@ impl JsInstance { /// Send a request to the worker and wait for a reply. async fn send_recv( - mut self: Box, + &mut self, extract: impl FnOnce(JsWorkerReply) -> Result, request: JsWorkerRequest, - ) -> (T, Box) { + ) -> T { // Send the request. self.request_tx .send_async(request) @@ -295,16 +315,45 @@ impl JsInstance { match extract(reply) { Err(err) => unreachable!("should have received {} but got {err:?}", type_name::()), - Ok(reply) => (reply, self), + Ok(reply) => reply, + } + } + + pub async fn run_on_thread(&mut self, f: F) -> R + where + F: AsyncFnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let span = tracing::Span::current(); + let (tx, rx) = oneshot::channel(); + + let request = JsWorkerRequest::RunFunction(Box::new(move || { + async move { + let result = AssertUnwindSafe(f().instrument(span)).catch_unwind().await; + if let Err(Err(_panic)) = tx.send(result) { + tracing::warn!("uncaught panic on `SingleCoreExecutor`") + } + } + .boxed_local() + })); + + self.request_tx + .send_async(request) + .await + .expect("worker's `request_rx` should be live as `JsInstance::drop` hasn't happened"); + + match rx.await.unwrap() { + Ok(r) => r, + Err(e) => std::panic::resume_unwind(e), } } pub async fn update_database( - self: Box, + &mut self, program: Program, old_module_info: Arc, policy: MigrationPolicy, - ) -> (anyhow::Result, Box) { + ) -> anyhow::Result { self.send_recv( JsWorkerReply::into_update_database, JsWorkerRequest::UpdateDatabase { @@ -316,7 +365,7 @@ impl JsInstance { .await } - pub async fn call_reducer(self: Box, params: CallReducerParams) -> (ReducerCallResult, Box) { + pub async fn call_reducer(&mut self, params: CallReducerParams) -> ReducerCallResult { self.send_recv( JsWorkerReply::into_call_reducer, JsWorkerRequest::CallReducer { params }, @@ -324,16 +373,16 @@ impl JsInstance { .await } - pub async fn clear_all_clients(self: Box) -> (anyhow::Result<()>, Box) { + pub async fn clear_all_clients(&mut self) -> anyhow::Result<()> { self.send_recv(JsWorkerReply::into_clear_all_clients, JsWorkerRequest::ClearAllClients) .await } pub async fn call_identity_connected( - self: Box, + &mut self, caller_auth: ConnectionAuthCtx, caller_connection_id: ConnectionId, - ) -> (Result<(), ClientConnectedError>, Box) { + ) -> Result<(), ClientConnectedError> { self.send_recv( JsWorkerReply::into_call_identity_connected, JsWorkerRequest::CallIdentityConnected(caller_auth, caller_connection_id), @@ -342,11 +391,11 @@ impl JsInstance { } pub async fn call_identity_disconnected( - self: Box, + &mut self, caller_identity: Identity, caller_connection_id: ConnectionId, drop_view_subscribers: bool, - ) -> (Result<(), ReducerCallError>, Box) { + ) -> Result<(), ReducerCallError> { self.send_recv( JsWorkerReply::into_call_identity_disconnected, JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id, drop_view_subscribers), @@ -354,10 +403,7 @@ impl JsInstance { .await } - pub async fn disconnect_client( - self: Box, - client_id: ClientActorId, - ) -> (Result<(), ReducerCallError>, Box) { + pub async fn disconnect_client(&mut self, client_id: ClientActorId) -> Result<(), ReducerCallError> { self.send_recv( JsWorkerReply::into_disconnect_client, JsWorkerRequest::DisconnectClient(client_id), @@ -365,43 +411,37 @@ impl JsInstance { 
.await } - pub async fn init_database( - self: Box, - program: Program, - ) -> (anyhow::Result>, Box) { - let (ret, inst) = self + pub async fn init_database(&mut self, program: Program) -> anyhow::Result> { + *self .send_recv( JsWorkerReply::into_init_database, JsWorkerRequest::InitDatabase(program), ) - .await; - (*ret, inst) + .await } - pub async fn call_procedure(self: Box, params: CallProcedureParams) -> (CallProcedureReturn, Box) { + pub async fn call_procedure(&mut self, params: CallProcedureParams) -> CallProcedureReturn { // Get a handle to the current tokio runtime, and pass it to the worker // so that it can execute futures. let rt = tokio::runtime::Handle::current(); - let (r, s) = self + *self .send_recv( JsWorkerReply::into_call_procedure, JsWorkerRequest::CallProcedure { params, rt }, ) - .await; - (*r, s) + .await } - pub async fn call_view(self: Box, cmd: ViewCommand) -> (ViewCommandResult, Box) { - let (r, s) = self + pub async fn call_view(&mut self, cmd: ViewCommand) -> ViewCommandResult { + *self .send_recv(JsWorkerReply::into_call_view, JsWorkerRequest::CallView { cmd }) - .await; - (*r, s) + .await } pub(in crate::host) async fn call_scheduled_function( - self: Box, + &mut self, params: ScheduledFunctionParams, - ) -> (CallScheduledFunctionResult, Box) { + ) -> CallScheduledFunctionResult { // Get a handle to the current tokio runtime, and pass it to the worker // so that it can execute futures. let rt = tokio::runtime::Handle::current(); @@ -434,6 +474,10 @@ static_assert_size!(JsWorkerReply, 48); // We care about optimizing for `CallReducer` as it happens frequently, // so we don't want to box anything in it. enum JsWorkerRequest { + /// See [`JsInstance::run_on_thread`] + /// + /// This variant does not expect a [`JsWorkerReply`]. + RunFunction(Box LocalBoxFuture<'static, ()> + Send>), /// See [`JsInstance::update_database`]. UpdateDatabase { program: Program, @@ -469,7 +513,7 @@ enum JsWorkerRequest { fn startup_instance_worker<'scope>( scope: &mut PinScope<'scope, '_>, program: Arc, - module_or_mcc: Either, + module_or_mcc: Either, ) -> anyhow::Result<(HookFunctions<'scope>, Either)> { // Start-up the user's module. eval_user_module_catch(scope, &program).map_err(DescribeError::Setup)?; @@ -507,11 +551,13 @@ fn new_isolate() -> OwnedIsolate { /// and that it has been validated. /// In that case, `Ok(_)` should be returned. /// -/// Otherwise, when [`ModuleCreationContextLimited`] is passed, +/// Otherwise, when [`ModuleCreationContext`] is passed, /// this is the first time both the module and instance are created. -fn spawn_instance_worker( +async fn spawn_instance_worker( program: Arc, - module_or_mcc: Either, + module_or_mcc: Either, + load_balance_guard: Arc, + mut core_pinner: CorePinner, ) -> anyhow::Result<(ModuleCommon, JsInstance)> { // Spawn channels for bidirectional communication between worker and instance. // The use-case is SPSC and all channels are rendezvous channels @@ -524,7 +570,12 @@ fn spawn_instance_worker( // This one-shot channel is used for initial startup error handling within the thread. let (result_tx, result_rx) = oneshot::channel(); + let rt = tokio::runtime::Handle::current(); + std::thread::spawn(move || { + let _guard = load_balance_guard; + core_pinner.pin_now(); + // Create the isolate and scope. 
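+        // (V8 `OwnedIsolate`s are not `Send`; the isolate, its context, and every
+        // scope derived from them below must live and die on this one pinned thread.)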
let mut isolate = new_isolate(); scope_with_context!(let scope, &mut isolate, Context::new(scope, Default::default())); @@ -581,8 +632,11 @@ fn spawn_instance_worker( for request in request_rx.iter() { let mut call_reducer = |tx, params| instance_common.call_reducer_with_tx(tx, params, &mut inst); + core_pinner.pin_if_changed(); + use JsWorkerReply::*; match request { + JsWorkerRequest::RunFunction(f) => rt.block_on(f()), JsWorkerRequest::UpdateDatabase { program, old_module_info, @@ -667,7 +721,7 @@ fn spawn_instance_worker( }); // Get the module, if any, and get any setup errors from the worker. - let res: Result = result_rx.blocking_recv().expect("should have a sender"); + let res: Result = result_rx.await.expect("should have a sender"); res.map(|opt_mc| { let inst = JsInstance { request_tx, diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 22fd7a1e050..ab459dd9c47 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -19,7 +19,7 @@ use crate::host::{ }; use crate::identity::Identity; use crate::messages::control_db::HostType; -use crate::module_host_context::ModuleCreationContextLimited; +use crate::module_host_context::ModuleCreationContext; use crate::replica_context::ReplicaContext; use crate::sql::ast::SchemaViewer; use crate::sql::execute::run_with_instance; @@ -294,7 +294,7 @@ pub enum DescribeError { impl WasmModuleHostActor { pub fn new( - mcc: ModuleCreationContextLimited, + mcc: ModuleCreationContext, module: T, ) -> Result<(Self, WasmModuleInstance), InitializationError> { log::trace!( diff --git a/crates/core/src/host/wasmtime/mod.rs b/crates/core/src/host/wasmtime/mod.rs index c565d768483..8bf760ae64b 100644 --- a/crates/core/src/host/wasmtime/mod.rs +++ b/crates/core/src/host/wasmtime/mod.rs @@ -3,8 +3,8 @@ use super::wasm_common::module_host_actor::{InitializationError, WasmModuleHostA use super::wasm_common::{abi, ModuleCreationError}; use crate::energy::{EnergyQuanta, FunctionBudget}; use crate::error::NodesError; -use crate::host::module_host::{Instance, ModuleRuntime}; use crate::module_host_context::ModuleCreationContext; +use crate::util::jobs::AllocatedJobCore; use anyhow::Context; use spacetimedb_paths::server::ServerDataDir; use std::borrow::Cow; @@ -106,13 +106,15 @@ impl WasmtimeRuntime { pub type Module = WasmModuleHostActor; pub type ModuleInstance = WasmModuleInstance; -impl ModuleRuntime for WasmtimeRuntime { - fn make_actor( +impl WasmtimeRuntime { + pub fn make_actor( &self, mcc: ModuleCreationContext, - ) -> anyhow::Result<(super::module_host::Module, super::module_host::Instance)> { + program_bytes: &[u8], + core: AllocatedJobCore, + ) -> anyhow::Result { let module = - wasmtime::Module::new(&self.engine, &mcc.program.bytes).map_err(ModuleCreationError::WasmCompileError)?; + wasmtime::Module::new(&self.engine, program_bytes).map_err(ModuleCreationError::WasmCompileError)?; let func_imports = module .imports() @@ -128,11 +130,12 @@ impl ModuleRuntime for WasmtimeRuntime { let module = WasmtimeModule::new(module); - let (module, init_inst) = WasmModuleHostActor::new(mcc.into_limited(), module)?; - let module = super::module_host::Module::Wasm(module); - let init_inst = Instance::Wasm(Box::new(init_inst)); - - Ok((module, init_inst)) + let (module, init_inst) = WasmModuleHostActor::new(mcc, module)?; + Ok(super::module_host::ModuleWithInstance::Wasm { + module, + executor: core.spawn_async_executor(), + 
+            init_inst: Box::new(init_inst),
+        })
     }
 }
 
diff --git a/crates/core/src/module_host_context.rs b/crates/core/src/module_host_context.rs
index 249e9a6a4c7..50f8c258a37 100644
--- a/crates/core/src/module_host_context.rs
+++ b/crates/core/src/module_host_context.rs
@@ -1,29 +1,10 @@
 use crate::energy::EnergyMonitor;
 use crate::host::scheduler::Scheduler;
 use crate::replica_context::ReplicaContext;
-use spacetimedb_datastore::traits::Program;
 use spacetimedb_sats::hash::Hash;
 use std::sync::Arc;
 
-pub struct ModuleCreationContext<'a> {
-    pub replica_ctx: Arc<ReplicaContext>,
-    pub scheduler: Scheduler,
-    pub program: &'a Program,
-    pub energy_monitor: Arc<dyn EnergyMonitor>,
-}
-
-impl ModuleCreationContext<'_> {
-    pub fn into_limited(self) -> ModuleCreationContextLimited {
-        ModuleCreationContextLimited {
-            replica_ctx: self.replica_ctx,
-            scheduler: self.scheduler,
-            program_hash: self.program.hash,
-            energy_monitor: self.energy_monitor,
-        }
-    }
-}
-
-pub struct ModuleCreationContextLimited {
+pub struct ModuleCreationContext {
     pub replica_ctx: Arc<ReplicaContext>,
     pub scheduler: Scheduler,
     pub program_hash: Hash,
     pub energy_monitor: Arc<dyn EnergyMonitor>,
 }
diff --git a/crates/core/src/startup.rs b/crates/core/src/startup.rs
index 97dcd8a9a3e..f36861cfdd9 100644
--- a/crates/core/src/startup.rs
+++ b/crates/core/src/startup.rs
@@ -447,7 +447,7 @@ impl DatabaseCores {
     /// let mut rt = builder.build().unwrap();
     /// let database_cores = cores.databases.make_database_runners();
     /// ```
-    pub fn make_database_runners(self, global_runtime: &tokio::runtime::Handle) -> JobCores {
-        JobCores::from_pinned_cores(self.0, global_runtime.clone())
+    pub fn make_database_runners(self) -> JobCores {
+        JobCores::from_pinned_cores(self.0)
     }
 }
diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs
index 0a42f59db80..e11d18812dc 100644
--- a/crates/core/src/util/jobs.rs
+++ b/crates/core/src/util/jobs.rs
@@ -1,12 +1,15 @@
-use std::future::Future;
+use std::panic::AssertUnwindSafe;
 use std::sync::{Arc, Mutex, Weak};
 
 use core_affinity::CoreId;
+use futures::future::LocalBoxFuture;
+use futures::FutureExt;
 use indexmap::IndexMap;
 use smallvec::SmallVec;
 use spacetimedb_data_structures::map::HashMap;
 use tokio::runtime;
-use tokio::sync::watch;
+use tokio::sync::{mpsc, oneshot, watch};
+use tracing::Instrument;
 
 /// A handle to a pool of cores for running database WASM code on.
 ///
 /// Construct a `JobCores` via [`Self::from_pinned_cores`] or [`Self::without_pinned_cores`].
 /// A `JobCores` constructed without core pinning, including `from_pinned_cores` on an empty set,
-/// will use the "global" Tokio executor to run database jobs,
-/// rather than creating multiple un-pinned single-threaded runtimes.
-/// This means that long-running reducers or queries may block Tokio worker threads.
+/// will spawn threads that are not pinned to any cores.
 ///
 /// This handle is cheaply cloneable, but at least one handle must be kept alive.
-/// If all instances of it are dropped, the per-thread [`runtime::Runtime`]s will be dropped,
-/// and so will stop executing jobs for databases.
-///
-/// Dropping the last handle on a `JobCores` from an `async` context will panic,
-/// as Tokio doesn't like to shut down nested runtimes.
-/// To avoid this, keep a handle on the `JobCores` alive outside of the `async` runner.
+/// If all instances of it are dropped, load-balancing will no longer occur when
+/// threads exit or new threads are spawned.
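+///
+/// A typical lifecycle, sketched (the wiring is illustrative, not taken from a caller):
+///
+/// ```ignore
+/// let cores = JobCores::from_pinned_cores(core_affinity::get_core_ids().unwrap());
+/// let core = cores.take();                    // reserve a core for one database
+/// let executor = core.spawn_async_executor(); // spawn its pinned job thread
+/// let out = executor.run_job(async || 1 + 1).await;
+/// ```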
 #[derive(Clone)]
 pub struct JobCores {
     inner: JobCoresInner,
 }
 
 #[derive(Clone)]
 enum JobCoresInner {
     PinnedCores(Arc<Mutex<PinnedCoresExecutorManager>>),
-    NoPinning(runtime::Handle),
+    NoPinning,
 }
 
 struct PinnedCoresExecutorManager {
-    /// Channels to request that a [`SingleCoreExecutor`] move to a different Tokio runtime.
+    /// Channels to request that a [`SingleCoreExecutor`] move to a different core.
     ///
-    /// Alongside each channel is the [`CoreId`] of the runtime to which that [`SingleCoreExecutor`] is currently pinned.
-    /// This is used as an index into `self.cores` to make load-balancing decisions when freeing a database executor
-    /// in [`Self::deallocate`].
-    database_executor_move: HashMap<SingleCoreExecutorId, (CoreId, watch::Sender<runtime::Handle>)>,
+    /// The [`CoreId`] that an executor is pinned to is used as an index into
+    /// `self.cores` to make load-balancing decisions when freeing a database
+    /// executor in [`Self::deallocate`].
+    database_executor_move: HashMap<SingleCoreExecutorId, watch::Sender<CoreId>>,
     cores: IndexMap<CoreId, CoreInfo>,
     /// An index into `cores` of the next core to put a new job onto.
     next_core: usize,
     next_id: SingleCoreExecutorId,
 }
 
-/// Stores the [`tokio::Runtime`] pinned to a particular core,
-/// and remembers the [`SingleCoreExecutorId`]s for all databases sharing that executor.
+/// Remembers the [`SingleCoreExecutorId`]s for all databases sharing that core.
+#[derive(Default)]
 struct CoreInfo {
     jobs: SmallVec<[SingleCoreExecutorId; 4]>,
-    tokio_runtime: runtime::Runtime,
-}
-
-impl CoreInfo {
-    fn spawn_executor(id: CoreId) -> CoreInfo {
-        let runtime = runtime::Builder::new_multi_thread()
-            .worker_threads(1)
-            // [`SingleCoreExecutor`]s should only be executing Wasmtime WASM futures,
-            // and so should never be doing [`Tokio::spawn_blocking`] or performing blocking I/O.
-            // However, `max_blocking_threads` will panic if passed 0, so we set a limit of 1
-            // and use `on_thread_start` to log an error when spawning a blocking task.
-            .max_blocking_threads(1)
-            // Enable the timer system so that `procedure_sleep_until` can work.
-            // TODO(procedure-sleep): Remove this.
-            .enable_time()
-            // Enable the IO system so that `procedure_http_request` can work.
-            // TODO(perf): Disable this and move HTTP requests to the global executor?
-            .enable_io()
-            .on_thread_start({
-                use std::sync::atomic::{AtomicBool, Ordering};
-                let already_spawned_worker = AtomicBool::new(false);
-                move || {
-                    // `Ordering::Relaxed`: No synchronization is happening here;
-                    // we're not writing to any other memory or coordinating with any other atomic places.
-                    // We rely on Tokio's infrastructure to impose a happens-before relationship
-                    // between spawning worker threads and spawning blocking threads itself.
-                    if already_spawned_worker.swap(true, Ordering::Relaxed) {
-                        // We're spawning a blocking thread, naughty!
-                        log::error!(
-                            "`JobCores` Tokio runtime for `SingleCoreExecutor` use on core {id:?} spawned a blocking thread!"
-                        );
-                    } else {
-                        // We're spawning our 1 worker, so pin it to the appropriate thread.
-                        core_affinity::set_for_current(id);
-                    }
-                }
-            })
-            .build()
-            .expect("Failed to start Tokio executor for `SingleCoreExecutor`");
-        CoreInfo {
-            jobs: SmallVec::new(),
-            tokio_runtime: runtime,
-        }
-    }
 }
 
 #[derive(Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
 struct SingleCoreExecutorId(usize);
 
 impl JobCores {
-    /// Get a handle on a [`SingleCoreExecutor`] to later run a database's jobs on.
-    pub fn take(&self) -> SingleCoreExecutor {
-        let database_executor_inner = match &self.inner {
-            JobCoresInner::NoPinning(handle) => SingleCoreExecutorInner::without_load_balancing(handle.clone()),
-            JobCoresInner::PinnedCores(manager) => SingleCoreExecutorInner::with_load_balancing(manager),
-        };
-        SingleCoreExecutor::from_inner(database_executor_inner)
+    /// Get an [`AllocatedJobCore`] for a job thread.
+    pub fn take(&self) -> AllocatedJobCore {
+        match &self.inner {
+            JobCoresInner::NoPinning => AllocatedJobCore::default(),
+            JobCoresInner::PinnedCores(manager) => {
+                let manager_weak = Arc::downgrade(manager);
+                let (database_executor_id, pinner) = manager.lock().unwrap().allocate();
+                let guard = LoadBalanceOnDropGuard {
+                    inner: Some((manager_weak, database_executor_id)),
+                };
+                AllocatedJobCore { guard, pinner }
+            }
+        }
     }
 
-    /// Construct a [`JobCores`] which runs one Tokio runtime on each of the `cores`,
+    /// Construct a [`JobCores`] which dedicates one pinned job thread to each of the `cores`.
     ///
     /// If `cores` is empty, this falls back to [`Self::without_pinned_cores`]
-    /// and runs all databases in the `global_runtime`.
-    pub fn from_pinned_cores(cores: impl IntoIterator<Item = CoreId>, global_runtime: runtime::Handle) -> Self {
-        let cores: IndexMap<_, _> = cores.into_iter().map(|id| (id, CoreInfo::spawn_executor(id))).collect();
+    /// and runs all databases on unpinned threads.
+    pub fn from_pinned_cores(cores: impl IntoIterator<Item = CoreId>) -> Self {
+        let cores: IndexMap<_, _> = cores.into_iter().map(|id| (id, CoreInfo::default())).collect();
         let inner = if cfg!(feature = "no-job-core-pinning") || cores.is_empty() {
-            JobCoresInner::NoPinning(global_runtime)
+            JobCoresInner::NoPinning
         } else {
             JobCoresInner::PinnedCores(Arc::new(Mutex::new(PinnedCoresExecutorManager {
                 database_executor_move: HashMap::default(),
                 cores,
                 next_core: 0,
                 next_id: SingleCoreExecutorId(0),
             })))
         };
         Self { inner }
     }
 
     /// This will be used in deployments where there aren't enough available CPU cores
     /// to reserve specific cores for database WASM execution.
-    pub fn without_pinned_cores(global_runtime: runtime::Handle) -> Self {
+    pub const fn without_pinned_cores() -> Self {
         Self {
-            inner: JobCoresInner::NoPinning(global_runtime),
+            inner: JobCoresInner::NoPinning,
         }
     }
 }
 
 impl PinnedCoresExecutorManager {
-    /// Get a [`runtime::Handle`] for running database operations on,
+    /// Get a core for running database operations on,
     /// and store state in `self` necessary to move that database to a new core
     /// for load-balancing purposes.
     ///
     /// The returned [`SingleCoreExecutorId`] identifies this executor,
     /// which should be passed to [`Self::deallocate`] when the database is no longer using this executor.
     /// This is done automatically by [`LoadBalanceOnDropGuard`].
     ///
-    /// The returned `watch::Receiver` stores the Tokio [`runtime::Handle`]
-    /// on which the database should run its compute-intensive jobs.
-    /// This may occasionally be replaced with a new [`runtime::Handle`] to balance databases among available cores,
-    /// so databases should read from the [`watch::Receiver`] when spawning each job,
-    /// and should not spawn long-lived background tasks such as ones which loop over a channel.
-    fn allocate(&mut self) -> (SingleCoreExecutorId, watch::Receiver<runtime::Handle>) {
+    /// The returned [`CorePinner`] stores the [`CoreId`] on which the database
+    /// should run its compute-intensive jobs. This may occasionally be
+    /// replaced to balance databases among available cores, so databases should
+    /// either spawn [`CorePinner::run`] as a thread-local async task, or call
+    /// [`CorePinner::pin_now`] frequently.
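+    ///
+    /// The intended polling pattern on a job thread, sketched (illustrative):
+    ///
+    /// ```ignore
+    /// let (_id, mut pinner) = manager.allocate();
+    /// std::thread::spawn(move || {
+    ///     pinner.pin_now();            // pin once at startup
+    ///     loop {
+    ///         pinner.pin_if_changed(); // cheaply re-pin if load-balancing moved us
+    ///         // ... run one job ...
+    ///     }
+    /// });
+    /// ```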
+ fn allocate(&mut self) -> (SingleCoreExecutorId, CorePinner) { // Determine the next job ID. let database_executor_id = self.next_id; self.next_id.0 += 1; // Put the job ID into the next core. - let (&core_id, runtime_handle) = { - let (core_id, core_info) = self + let core_id = { + let (&core_id, core_info) = self .cores .get_index_mut(self.next_core) .expect("`self.next_core < self.cores.len()`"); core_info.jobs.push(database_executor_id); - (core_id, core_info.tokio_runtime.handle().clone()) + core_id }; // Move the next core one ahead, wrapping around the number of cores we have. self.next_core = (self.next_core + 1) % self.cores.len(); // Record channels and details for moving a job to a different core. - let (move_runtime_tx, move_runtime_rx) = watch::channel(runtime_handle); - self.database_executor_move - .insert(database_executor_id, (core_id, move_runtime_tx)); + let (move_core_tx, move_core_rx) = watch::channel(core_id); + self.database_executor_move.insert(database_executor_id, move_core_tx); - (database_executor_id, move_runtime_rx) + let core_pinner = CorePinner { + move_core_rx: Some(move_core_rx), + }; + (database_executor_id, core_pinner) } /// Mark the executor at `id` as no longer in use, free internal state which tracks it, @@ -198,10 +159,11 @@ impl PinnedCoresExecutorManager { // Determine the `CoreId` that will now have one less job. // The `id`s came from `self.allocate()`, // so there must be a `database_executor_move` for it. - let (freed_core_id, _) = self + let freed_core_id = *self .database_executor_move .remove(&id) - .expect("there should be a `database_executor_move` for `id`"); + .expect("there should be a `database_executor_move` for `id`") + .borrow(); let core_index = self.cores.get_index_of(&freed_core_id).unwrap(); @@ -235,15 +197,56 @@ impl PinnedCoresExecutorManager { // likely to be repinned, while younger ones are liable to bounce around. // Our use of `swap_remove` above makes this not entirely predictable, however. core_info.jobs.push(stolen); - let (ref mut stolen_core_id, migrate_tx) = self.database_executor_move.get_mut(&stolen).unwrap(); - *stolen_core_id = freed_core_id; - migrate_tx.send_replace(core_info.tokio_runtime.handle().clone()); + let migrate_tx = &self.database_executor_move[&stolen]; + migrate_tx.send_replace(freed_core_id); } self.next_core = steal_from_index; } } +#[derive(Default)] +pub struct AllocatedJobCore { + pub guard: LoadBalanceOnDropGuard, + pub pinner: CorePinner, +} + +impl AllocatedJobCore { + pub fn spawn_async_executor(self) -> SingleCoreExecutor { + SingleCoreExecutor::spawn(self) + } +} + +#[derive(Default, Clone)] +pub struct CorePinner { + move_core_rx: Option>, +} + +impl CorePinner { + pub fn pin_now(&mut self) { + if let Some(move_core_rx) = &mut self.move_core_rx { + let core_id = *move_core_rx.borrow_and_update(); + core_affinity::set_for_current(core_id); + } + } + pub fn pin_if_changed(&mut self) { + if let Some(move_core_rx) = &mut self.move_core_rx { + if let Ok(true) = move_core_rx.has_changed() { + let core_id = *move_core_rx.borrow_and_update(); + core_affinity::set_for_current(core_id); + } + } + } + pub async fn run(self) { + if let Some(mut move_core_rx) = self.move_core_rx { + while move_core_rx.changed().await.is_ok() { + let core_id = *move_core_rx.borrow_and_update(); + core_affinity::set_for_current(core_id); + } + } + } +} + /// A handle to a Tokio executor which can be used to run WASM compute for a particular database. 
 ///
 /// Use [`Self::run_job`] to run futures, and [`Self::run_sync_job`] to run functions.
 #[derive(Clone)]
 pub struct SingleCoreExecutor {
     inner: Arc<SingleCoreExecutorInner>,
 }
 
 struct SingleCoreExecutorInner {
-    /// Handle on the [`runtime::Runtime`] where this executor should run jobs.
-    ///
-    /// This will be occasionally updated by [`PinnedCoresExecutorManager::deallocate`]
-    /// to evenly distribute databases across the available runtimes/cores.
-    runtime: watch::Receiver<runtime::Handle>,
-
-    /// [`Drop`] guard which calls [`PinnedCoresExecutorManager::deallocate`] when this database dies,
-    /// allowing another database from a more-contended runtime/core to migrate here.
-    _guard: Option<LoadBalanceOnDropGuard>,
+    /// The sending end of a channel over which we send jobs.
+    job_tx: mpsc::UnboundedSender<Box<dyn FnOnce() -> LocalBoxFuture<'static, ()> + Send>>,
 }
 
-impl SingleCoreExecutorInner {
-    fn without_load_balancing(handle: runtime::Handle) -> Self {
-        SingleCoreExecutorInner {
-            runtime: watch::channel(handle).1,
-            _guard: None,
-        }
-    }
-
-    fn with_load_balancing(manager: &Arc<Mutex<PinnedCoresExecutorManager>>) -> Self {
-        let manager_weak = Arc::downgrade(manager);
-        let (database_executor_id, move_runtime_rx) = manager.lock().unwrap().allocate();
-        SingleCoreExecutorInner {
-            runtime: move_runtime_rx,
-            _guard: Some(LoadBalanceOnDropGuard {
-                manager: manager_weak,
-                database_executor_id,
-            }),
-        }
-    }
-}
-
 impl SingleCoreExecutor {
-    fn from_inner(inner: SingleCoreExecutorInner) -> Self {
-        Self { inner: Arc::new(inner) }
+    /// Spawn a `SingleCoreExecutor` on the given core.
+    fn spawn(core: AllocatedJobCore) -> Self {
+        let AllocatedJobCore { guard, mut pinner } = core;
+
+        let (job_tx, mut job_rx) = mpsc::unbounded_channel();
+
+        let inner = Arc::new(SingleCoreExecutorInner { job_tx });
+
+        let rt = runtime::Handle::current();
+        std::thread::spawn(move || {
+            let _guard = guard;
+            pinner.pin_now();
+
+            let _entered = rt.enter();
+            let local = tokio::task::LocalSet::new();
+
+            let job_loop = async {
+                while let Some(job) = job_rx.recv().await {
+                    local.spawn_local(job());
+                }
+            };
+
+            // Run the pinner on the same task as the job loop, so that the pinner still
+            // being alive doesn't prevent the runtime thread from ending.
+            rt.block_on(local.run_until(super::also_poll(job_loop, pinner.run())));
+
+            // The sender has closed; finish out any remaining tasks left on the set.
+            // This is very important to do - otherwise, in-progress tasks will be
+            // dropped and cancelled.
+            rt.block_on(local)
+        });
+
+        Self { inner }
     }
 
-    /// Create a `SingleCoreExecutor` which runs jobs in [`tokio::runtime::Handle::current`].
+    /// Create a `SingleCoreExecutor` which runs jobs on an unpinned thread
+    /// inside [`tokio::runtime::Handle::current`].
     ///
     /// This method should only be used for short-lived instances which do not perform intense computation,
     /// e.g. to extract the schema by calling `describe_module`.
     pub fn in_current_tokio_runtime() -> Self {
-        Self::from_inner(SingleCoreExecutorInner::without_load_balancing(
-            runtime::Handle::current(),
-        ))
+        Self::spawn(AllocatedJobCore::default())
     }
 
     /// Run a job for this database executor.
-    ///
-    /// `f` must not perform any `Tokio::spawn_blocking` blocking operations.
     pub async fn run_job<F, R>(&self, f: F) -> R
     where
-        F: Future<Output = R> + Send + 'static,
+        F: AsyncFnOnce() -> R + Send + 'static,
         R: Send + 'static,
     {
-        // Clone the handle rather than holding the `watch::Ref` alive
-        // because `watch::Ref` is not `Send`.
     /// Run a job for this database executor.
-    ///
-    /// `f` must not perform any `Tokio::spawn_blocking` blocking operations.
     pub async fn run_job<F, R>(&self, f: F) -> R
     where
-        F: Future<Output = R> + Send + 'static,
+        F: AsyncFnOnce() -> R + Send + 'static,
         R: Send + 'static,
     {
-        // Clone the handle rather than holding the `watch::Ref` alive
-        // because `watch::Ref` is not `Send`.
-        let handle = runtime::Handle::clone(&*self.inner.runtime.borrow());
+        let span = tracing::Span::current();
+        let (tx, rx) = oneshot::channel();
+
+        self.inner
+            .job_tx
+            .send(Box::new(move || {
+                async move {
+                    let result = AssertUnwindSafe(f().instrument(span)).catch_unwind().await;
+                    if let Err(Err(_panic)) = tx.send(result) {
+                        tracing::warn!("uncaught panic on `SingleCoreExecutor`")
+                    }
+                }
+                .boxed_local()
+            }))
+            .unwrap_or_else(|_| panic!("job thread exited"));

-        match handle.spawn(f).await {
+        match rx.await.unwrap() {
             Ok(r) => r,
-            Err(e) => std::panic::resume_unwind(e.into_panic()),
+            Err(e) => std::panic::resume_unwind(e),
         }
     }

@@ -331,40 +345,23 @@
         F: FnOnce() -> R + Send + 'static,
         R: Send + 'static,
     {
-        self.run_job(async { f() }).await
-    }
-
-    pub fn downgrade(&self) -> WeakSingleCoreExecutor {
-        WeakSingleCoreExecutor {
-            inner: Arc::downgrade(&self.inner),
-        }
+        self.run_job(async || f()).await
     }
 }

-/// On drop, tells the [`JobCores`] that this database is no longer occupying its Tokio runtime,
+/// On drop, tells the [`JobCores`] that this database is no longer occupying its core,
 /// allowing databases from more-contended runtimes/cores to migrate there.
-struct LoadBalanceOnDropGuard {
-    manager: Weak<Mutex<PinnedCoresExecutorManager>>,
-    database_executor_id: SingleCoreExecutorId,
+#[derive(Default)]
+pub struct LoadBalanceOnDropGuard {
+    inner: Option<(Weak<Mutex<PinnedCoresExecutorManager>>, SingleCoreExecutorId)>,
 }

 impl Drop for LoadBalanceOnDropGuard {
     fn drop(&mut self) {
-        if let Some(cores) = self.manager.upgrade() {
-            cores.lock().unwrap().deallocate(self.database_executor_id);
+        if let Some((manager, database_executor_id)) = &self.inner {
+            if let Some(cores) = manager.upgrade() {
+                cores.lock().unwrap().deallocate(*database_executor_id);
+            }
         }
     }
 }
-
-/// A weak version of `JobThread` that does not hold the thread open.
-// used in crate::core::module_host::WeakModuleHost
-#[derive(Clone)]
-pub struct WeakSingleCoreExecutor {
-    inner: Weak<SingleCoreExecutorInner>,
-}
-
-impl WeakSingleCoreExecutor {
-    pub fn upgrade(&self) -> Option<SingleCoreExecutor> {
-        self.inner.upgrade().map(|inner| SingleCoreExecutor { inner })
-    }
-}
diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs
index 900042632db..0817d4dbe52 100644
--- a/crates/standalone/src/lib.rs
+++ b/crates/standalone/src/lib.rs
@@ -597,7 +597,7 @@ pub async fn start_server(data_dir: &ServerDataDir, cert_dir: Option<&std::path:
         args.extend(["--jwt-key-dir".as_ref(), cert_dir.as_os_str()])
     }
     let args = start::cli().try_get_matches_from(args)?;
-    start::exec(&args, JobCores::without_pinned_cores(tokio::runtime::Handle::current())).await
+    start::exec(&args, JobCores::without_pinned_cores()).await
 }

 #[cfg(test)]
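For context on the `catch_unwind`/`resume_unwind` dance in `run_job` above, a minimal sketch, assuming tokio's `sync` feature and the `futures` crate. The worker here is an ordinary spawned task rather than the pinned job thread, and the example intentionally ends by re-raising "boom" to show the propagation.

    use futures::FutureExt;
    use std::panic::{resume_unwind, AssertUnwindSafe};
    use tokio::sync::oneshot;

    #[tokio::main(flavor = "current_thread")]
    async fn main() {
        let (tx, rx) = oneshot::channel();
        tokio::spawn(async move {
            // Catch the job's panic instead of letting it kill the worker...
            let result = AssertUnwindSafe(async { panic!("boom") }).catch_unwind().await;
            let _ = tx.send(result);
        });
        match rx.await.unwrap() {
            Ok(()) => println!("job finished"),
            // ...and rethrow it on the caller, as `handle.spawn(f).await` used to.
            Err(panic) => resume_unwind(panic),
        }
    }

This preserves the old behavior where a panicking job surfaced on the calling side, even though jobs now travel over a channel instead of being spawned directly.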
@@ -637,22 +637,13 @@ mod tests {
             websocket: WebSocketOptions::default(),
         };

-        let _env = StandaloneEnv::init(
-            config,
-            &ca,
-            data_dir.clone(),
-            JobCores::without_pinned_cores(tokio::runtime::Handle::current()),
-        )
-        .await?;
+        let _env = StandaloneEnv::init(config, &ca, data_dir.clone(), JobCores::without_pinned_cores()).await?;

         // Ensure that we have a lock.
-        assert!(StandaloneEnv::init(
-            config,
-            &ca,
-            data_dir.clone(),
-            JobCores::without_pinned_cores(tokio::runtime::Handle::current())
-        )
-        .await
-        .is_err());
+        assert!(
+            StandaloneEnv::init(config, &ca, data_dir.clone(), JobCores::without_pinned_cores())
+                .await
+                .is_err()
+        );

         Ok(())
     }
diff --git a/crates/standalone/src/main.rs b/crates/standalone/src/main.rs
index ca266f27d76..10665ff8a09 100644
--- a/crates/standalone/src/main.rs
+++ b/crates/standalone/src/main.rs
@@ -76,7 +76,7 @@ fn main() -> anyhow::Result<()> {
     cores.tokio.configure(&mut builder);
     let rt = builder.build().unwrap();
     cores.rayon.configure(rt.handle());
-    let database_cores = cores.databases.make_database_runners(rt.handle());
+    let database_cores = cores.databases.make_database_runners();

     // Keep a handle on the `database_cores` alive outside of `async_main`
     // and explicitly drop it to avoid dropping it from an `async` context -
diff --git a/crates/testing/src/modules.rs b/crates/testing/src/modules.rs
index 68ae6cf5f44..0e659f144d1 100644
--- a/crates/testing/src/modules.rs
+++ b/crates/testing/src/modules.rs
@@ -204,7 +204,7 @@ impl CompiledModule {
             },
             &certs,
             paths.data_dir.into(),
-            JobCores::without_pinned_cores(tokio::runtime::Handle::current()),
+            JobCores::without_pinned_cores(),
         )
         .await
         .unwrap();

From 9da74d909667454964f240f546265ba3a02801e2 Mon Sep 17 00:00:00 2001
From: Noa
Date: Mon, 26 Jan 2026 14:37:25 -0600
Subject: [PATCH 2/2] Run http_request on a tokio task

---
 crates/core/src/host/instance_env.rs |  7 +++++--
 crates/core/src/host/v8/mod.rs       | 29 ++++++++--------------------
 2 files changed, 13 insertions(+), 23 deletions(-)

diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs
index 9192ef7587a..e846026bd9a 100644
--- a/crates/core/src/host/instance_env.rs
+++ b/crates/core/src/host/instance_env.rs
@@ -10,6 +10,7 @@ use crate::subscription::module_subscription_manager::{from_tx_offset, Transacti
 use crate::util::prometheus_handle::IntGaugeExt;
 use chrono::{DateTime, Utc};
 use core::mem;
+use futures::TryFutureExt;
 use parking_lot::{Mutex, MutexGuard};
 use smallvec::SmallVec;
 use spacetimedb_client_api_messages::energy::EnergyQuanta;
@@ -865,7 +866,8 @@ impl InstanceEnv {
         // TODO(perf): Stash a long-lived `Client` in the env somewhere, rather than building a new one for each call.
         let execute_fut = reqwest::Client::new().execute(reqwest);

-        let response_fut = async {
+        // Run the future that does IO work on a tokio worker thread, where it's more efficient.
+        let response_fut = tokio::spawn(async {
             // `reqwest::Error` may contain sensitive info, namely the full URL with query params.
             // We'll strip those with `strip_query_params_from_eqwest_error`
             // after `await`ing `response_fut` below.
@@ -880,7 +882,8 @@ impl InstanceEnv {
             let body = http_body_util::BodyExt::collect(body).await?.to_bytes();

             Ok((response, body))
-        };
+        })
+        .unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()));

         let database_identity = *self.database_identity();
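The `tokio::spawn` + `TryFutureExt::unwrap_or_else` shape used in `http_request` above, reduced to a sketch with a stand-in for the `reqwest` work. `offload_io` and the file path are illustrative only, and this assumes tokio's `fs` feature.

    use futures::TryFutureExt;

    async fn offload_io() -> Result<usize, std::io::Error> {
        tokio::spawn(async {
            // IO-bound work runs on a tokio worker thread, off the pinned module thread.
            let bytes = tokio::fs::read("/etc/hostname").await?;
            Ok::<usize, std::io::Error>(bytes.len())
        })
        // A `JoinError` here means the spawned task panicked (or was cancelled);
        // rethrow the payload on the caller, as the hunk above does.
        .unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()))
        .await
    }

Keeping the application-level `Result` inside the spawned task and reserving `JoinError` for panics is what lets the caller's error handling stay unchanged.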
diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs
index d6eedd2359b..864260eb6fe 100644
--- a/crates/core/src/host/v8/mod.rs
+++ b/crates/core/src/host/v8/mod.rs
@@ -421,13 +421,10 @@ impl JsInstance {
     }

     pub async fn call_procedure(&mut self, params: CallProcedureParams) -> CallProcedureReturn {
-        // Get a handle to the current tokio runtime, and pass it to the worker
-        // so that it can execute futures.
-        let rt = tokio::runtime::Handle::current();
         *self
             .send_recv(
                 JsWorkerReply::into_call_procedure,
-                JsWorkerRequest::CallProcedure { params, rt },
+                JsWorkerRequest::CallProcedure { params },
             )
             .await
     }
@@ -442,12 +439,9 @@
         &mut self,
         params: ScheduledFunctionParams,
     ) -> CallScheduledFunctionResult {
-        // Get a handle to the current tokio runtime, and pass it to the worker
-        // so that it can execute futures.
-        let rt = tokio::runtime::Handle::current();
         self.send_recv(
             JsWorkerReply::into_call_scheduled_function,
-            JsWorkerRequest::CallScheduledFunction(params, rt),
+            JsWorkerRequest::CallScheduledFunction(params),
         )
         .await
     }
@@ -489,10 +483,7 @@ enum JsWorkerRequest {
     /// See [`JsInstance::call_view`].
     CallView { cmd: ViewCommand },
     /// See [`JsInstance::call_procedure`].
-    CallProcedure {
-        params: CallProcedureParams,
-        rt: tokio::runtime::Handle,
-    },
+    CallProcedure { params: CallProcedureParams },
     /// See [`JsInstance::clear_all_clients`].
     ClearAllClients,
     /// See [`JsInstance::call_identity_connected`].
@@ -504,7 +495,7 @@ enum JsWorkerRequest {
     /// See [`JsInstance::init_database`].
     InitDatabase(Program),
     /// See [`JsInstance::call_scheduled_function`].
-    CallScheduledFunction(ScheduledFunctionParams, tokio::runtime::Handle),
+    CallScheduledFunction(ScheduledFunctionParams),
 }

 /// Performs some of the startup work of [`spawn_instance_worker`].
@@ -576,6 +567,8 @@ async fn spawn_instance_worker(
     let _guard = load_balance_guard;
     core_pinner.pin_now();

+    let _entered = rt.enter();
+
     // Create the isolate and scope.
     let mut isolate = new_isolate();
     scope_with_context!(let scope, &mut isolate, Context::new(scope, Default::default()));
@@ -659,11 +652,7 @@
                 let (res, trapped) = instance_common.handle_cmd(cmd, &mut inst);
                 reply("call_view", JsWorkerReply::CallView(res.into()), trapped);
             }
-            JsWorkerRequest::CallProcedure { params, rt } => {
-                // The callee passed us a handle to their tokio runtime - enter its
-                // context so that we can execute futures.
-                let _guard = rt.enter();
-
+            JsWorkerRequest::CallProcedure { params } => {
                 let (res, trapped) = instance_common
                     .call_procedure(params, &mut inst)
                     .now_or_never()
@@ -707,9 +696,7 @@
                     init_database(replica_ctx, &module_common.info().module_def, program, call_reducer);
                 reply("init_database", InitDatabase(Box::new(res)), trapped);
             }
-            JsWorkerRequest::CallScheduledFunction(params, rt) => {
-                let _guard = rt.enter();
-
+            JsWorkerRequest::CallScheduledFunction(params) => {
                 let (res, trapped) = instance_common
                     .call_scheduled_function(params, &mut inst)
                     .now_or_never()
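The simplification the v8 hunks above rely on, as a standalone sketch: capture the host runtime's handle once when the worker thread starts, instead of threading a `Handle` through every request variant. The `Runtime::new` setup and the worker body are illustrative, assuming tokio's full feature set.

    fn main() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let handle = rt.handle().clone();

        let worker = std::thread::spawn(move || {
            // Entering the context once at startup replaces the old per-request
            // `rt: tokio::runtime::Handle` field on `JsWorkerRequest`.
            let _entered = handle.enter();

            // Synchronous worker code can now spawn onto the host runtime directly.
            let (tx, rx) = std::sync::mpsc::channel();
            tokio::spawn(async move {
                let _ = tx.send(2 + 2);
            });
            rx.recv().unwrap()
        });

        assert_eq!(worker.join().unwrap(), 4);
    }

Because the guard lives for the whole thread, every request handler sees the same runtime context, which is why the per-arm `rt.enter()` calls could be deleted.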