diff --git a/drivers/std/src/executor.rs b/drivers/std/src/executor.rs index 0decf3d5..437c5221 100644 --- a/drivers/std/src/executor.rs +++ b/drivers/std/src/executor.rs @@ -1,34 +1,102 @@ use embassy_executor::{Spawner, raw}; +use file_system::DirectBaseOperations; use std::boxed::Box; use std::marker::PhantomData; use std::sync::atomic::AtomicBool; +use std::sync::atomic::AtomicU64; use std::sync::{Condvar, Mutex}; use synchronization::blocking_mutex::raw::CriticalSectionRawMutex; use synchronization::signal::Signal; -use task::SpawnerIdentifier; +use task::{ExecutorStatisticsSnapshot, ExecutorWithStatistics, SpawnerIdentifier}; + +use crate::devices::TimeDevice; /// Single-threaded std-based executor. pub struct Executor { inner: raw::Executor, not_send: PhantomData<*mut ()>, signaler: &'static Signaler, + statistics: &'static ExecutorStatistics, stop: AtomicBool, } +pub struct ExecutorStatistics { + busy_ticks: AtomicU64, + idle_ticks: AtomicU64, +} + +impl ExecutorStatistics { + const fn new() -> Self { + Self { + busy_ticks: AtomicU64::new(0), + idle_ticks: AtomicU64::new(0), + } + } + + fn record_busy(&self, elapsed_ticks: u64) { + self.busy_ticks + .fetch_add(elapsed_ticks, std::sync::atomic::Ordering::Relaxed); + } + + fn record_idle(&self, elapsed_ticks: u64) { + self.idle_ticks + .fetch_add(elapsed_ticks, std::sync::atomic::Ordering::Relaxed); + } +} + +fn get_current_ticks_from_time_device() -> u64 { + let mut current_time = core::time::Duration::default(); + + let current_time_raw = unsafe { + core::slice::from_raw_parts_mut( + &mut current_time as *mut core::time::Duration as *mut u8, + core::mem::size_of::(), + ) + }; + + if TimeDevice.read(current_time_raw, 0).is_err() { + return 0; + } + + current_time.as_nanos() as u64 +} + +impl ExecutorStatistics { + fn snapshot(&self) -> ExecutorStatisticsSnapshot { + ExecutorStatisticsSnapshot { + busy_ticks: self.busy_ticks.load(std::sync::atomic::Ordering::Relaxed), + idle_ticks: self.idle_ticks.load(std::sync::atomic::Ordering::Relaxed), + } + } +} + impl Default for Executor { fn default() -> Self { Self::new() } } +impl ExecutorWithStatistics for Executor { + fn spawner(&'static self) -> Spawner { + Executor::spawner(self) + } + + fn statistics_snapshot(&self) -> Option { + Some(self.statistics.snapshot()) + } +} + impl Executor { /// Create a new Executor. pub fn new() -> Self { let signaler = Box::leak(Box::new(Signaler::new())); + let statistics = Box::leak(Box::new(ExecutorStatistics::new())); + Self { inner: raw::Executor::new(signaler as *mut Signaler as *mut ()), not_send: PhantomData, signaler, + statistics, stop: AtomicBool::new(false), } } @@ -69,17 +137,35 @@ impl Executor { init(self.inner.spawner(), self); while !self.stop.load(std::sync::atomic::Ordering::SeqCst) { + let poll_started = get_current_ticks_from_time_device(); unsafe { self.inner.poll() }; + let poll_ended = get_current_ticks_from_time_device(); + let poll_elapsed = poll_ended.saturating_sub(poll_started); + self.statistics.record_busy(poll_elapsed); + + let wait_started = get_current_ticks_from_time_device(); self.signaler.wait(); + let wait_ended = get_current_ticks_from_time_device(); + let wait_elapsed = wait_ended.saturating_sub(wait_started); + self.statistics.record_idle(wait_elapsed); } } - pub fn start(&'static mut self, init: impl FnOnce(Spawner)) -> ! { + pub fn start(&'static self, init: impl FnOnce(Spawner)) -> ! { init(self.inner.spawner()); loop { + let poll_started = get_current_ticks_from_time_device(); unsafe { self.inner.poll() }; - self.signaler.wait() + let poll_ended = get_current_ticks_from_time_device(); + let poll_elapsed = poll_ended.saturating_sub(poll_started); + self.statistics.record_busy(poll_elapsed); + + let wait_started = get_current_ticks_from_time_device(); + self.signaler.wait(); + let wait_ended = get_current_ticks_from_time_device(); + let wait_elapsed = wait_ended.saturating_sub(wait_started); + self.statistics.record_idle(wait_elapsed); } } } @@ -138,9 +224,13 @@ pub async fn new_thread_executor() -> SpawnerIdentifier { std::thread::spawn(move || { // Use Box::leak to create a 'static reference for this thread's executor let executor = Box::leak(Box::new(Executor::new())); + let executor_ref: &'static Executor = executor; executor.start(move |spawner: Spawner| { - let spawner_id = task_manager.register_spawner(spawner).unwrap(); + let _ = executor_ref; + let spawner_id = task_manager + .register_spawner_with_executor(spawner, Some(executor_ref)) + .unwrap(); signal.signal(spawner_id); }); diff --git a/drivers/wasm/src/executor.rs b/drivers/wasm/src/executor.rs index 41872483..0534b922 100644 --- a/drivers/wasm/src/executor.rs +++ b/drivers/wasm/src/executor.rs @@ -1,4 +1,167 @@ -pub use embassy_executor::Executor; +use core::marker::PhantomData; +use core::mem::MaybeUninit; +use core::ptr; +use core::sync::atomic::{AtomicU64, Ordering}; + +use embassy_executor::{Spawner, raw}; +use file_system::DirectBaseOperations; +use js_sys::Promise; +use task::{ExecutorStatisticsSnapshot, ExecutorWithStatistics}; +use wasm_bindgen::prelude::*; +extern crate alloc; +use alloc::boxed::Box; + +use crate::devices::TimeDevice; + +#[unsafe(export_name = "__pender")] +fn __pender(context: *mut ()) { + let signaler: &'static WasmContext = unsafe { core::mem::transmute(context) }; + let _ = signaler.promise.then(unsafe { signaler.closure.as_mut() }); +} + +struct UninitCell(MaybeUninit>); + +impl UninitCell { + const fn uninit() -> Self { + Self(core::mem::MaybeUninit::uninit()) + } + + unsafe fn as_mut_ptr(&self) -> *mut T { + unsafe { (*self.0.as_ptr()).get() } + } + + unsafe fn write_in_place(&self, func: impl FnOnce() -> T) { + unsafe { ptr::write(self.as_mut_ptr(), func()) }; + } + + unsafe fn as_mut(&self) -> &mut T { + unsafe { &mut *self.as_mut_ptr() } + } +} + +unsafe impl Sync for UninitCell {} + +struct WasmContext { + promise: Promise, + closure: UninitCell>, +} + +impl WasmContext { + fn new() -> Self { + Self { + promise: Promise::resolve(&JsValue::undefined()), + closure: UninitCell::uninit(), + } + } +} + +pub struct Executor { + inner: raw::Executor, + ctx: &'static WasmContext, + statistics: &'static ExecutorStatistics, + not_send: PhantomData<*mut ()>, +} + +pub struct ExecutorStatistics { + busy_ticks: AtomicU64, + idle_ticks: AtomicU64, + last_poll_end_ticks: AtomicU64, +} + +impl ExecutorStatistics { + const fn new() -> Self { + Self { + busy_ticks: AtomicU64::new(0), + idle_ticks: AtomicU64::new(0), + last_poll_end_ticks: AtomicU64::new(0), + } + } + + fn record_poll(&self, poll_start_ticks: u64, poll_end_ticks: u64) { + let previous_poll_end = self + .last_poll_end_ticks + .swap(poll_end_ticks, Ordering::Relaxed); + + if previous_poll_end > 0 && poll_start_ticks > previous_poll_end { + self.idle_ticks + .fetch_add(poll_start_ticks - previous_poll_end, Ordering::Relaxed); + } + + if poll_end_ticks > poll_start_ticks { + self.busy_ticks + .fetch_add(poll_end_ticks - poll_start_ticks, Ordering::Relaxed); + } + } +} + +impl ExecutorStatistics { + fn snapshot(&self) -> ExecutorStatisticsSnapshot { + ExecutorStatisticsSnapshot { + busy_ticks: self.busy_ticks.load(Ordering::Relaxed), + idle_ticks: self.idle_ticks.load(Ordering::Relaxed), + } + } +} + +impl Executor { + pub fn new() -> Self { + let ctx = Box::leak(Box::new(WasmContext::new())); + let statistics = Box::leak(Box::new(ExecutorStatistics::new())); + + Self { + inner: raw::Executor::new(ctx as *mut WasmContext as *mut ()), + ctx, + statistics, + not_send: PhantomData, + } + } + + pub fn start(&'static self, init: impl FnOnce(Spawner)) { + unsafe { + let executor = &self.inner; + let statistics = self.statistics; + let future = Closure::new(move |_| { + let poll_start = get_current_ticks_from_time_device(); + executor.poll(); + let poll_end = get_current_ticks_from_time_device(); + statistics.record_poll(poll_start, poll_end); + }); + self.ctx.closure.write_in_place(|| future); + init(self.inner.spawner()); + } + } + + pub fn spawner(&'static self) -> Spawner { + self.inner.spawner() + } +} + +impl ExecutorWithStatistics for Executor { + fn spawner(&'static self) -> Spawner { + Executor::spawner(self) + } + + fn statistics_snapshot(&self) -> Option { + Some(self.statistics.snapshot()) + } +} + +fn get_current_ticks_from_time_device() -> u64 { + let mut current_time = core::time::Duration::default(); + + let current_time_raw = unsafe { + core::slice::from_raw_parts_mut( + &mut current_time as *mut core::time::Duration as *mut u8, + core::mem::size_of::(), + ) + }; + + if TimeDevice.read(current_time_raw, 0).is_err() { + return 0; + } + + current_time.as_nanos() as u64 +} #[macro_export] macro_rules! instantiate_static_executor { diff --git a/modules/task/src/executor_statistics.rs b/modules/task/src/executor_statistics.rs new file mode 100644 index 00000000..c374036e --- /dev/null +++ b/modules/task/src/executor_statistics.rs @@ -0,0 +1,32 @@ +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub struct ExecutorStatisticsSnapshot { + pub busy_ticks: u64, + pub idle_ticks: u64, +} + +impl ExecutorStatisticsSnapshot { + pub const fn new(busy_ticks: u64, idle_ticks: u64) -> Self { + Self { + busy_ticks, + idle_ticks, + } + } + + pub fn idle_ratio_basis_points(&self) -> u16 { + let total = self.busy_ticks.saturating_add(self.idle_ticks); + + if total == 0 { + return 0; + } + + let ratio = self.idle_ticks.saturating_mul(10_000) / total; + + ratio.min(10_000) as u16 + } +} + +pub trait ExecutorWithStatistics { + fn spawner(&'static self) -> embassy_executor::Spawner; + + fn statistics_snapshot(&self) -> Option; +} diff --git a/modules/task/src/lib.rs b/modules/task/src/lib.rs index 2f86ee6e..bc842ec9 100644 --- a/modules/task/src/lib.rs +++ b/modules/task/src/lib.rs @@ -7,6 +7,7 @@ extern crate std; mod environment_variable; mod error; +mod executor_statistics; mod join_handle; mod manager; mod signal; @@ -18,6 +19,7 @@ use embassy_time::Timer; pub use embassy_executor; pub use environment_variable::*; pub use error::*; +pub use executor_statistics::*; pub use join_handle::*; pub use manager::*; pub use signal::*; diff --git a/modules/task/src/manager/mod.rs b/modules/task/src/manager/mod.rs index a7166dad..b04fd22a 100644 --- a/modules/task/src/manager/mod.rs +++ b/modules/task/src/manager/mod.rs @@ -21,6 +21,7 @@ pub use spawner::*; // Manager module - core Manager structure and initialization +use crate::ExecutorWithStatistics; use crate::manager::Metadata; use alloc::collections::BTreeMap; @@ -42,6 +43,7 @@ pub(crate) struct Inner { pub(crate) tasks: BTreeMap, pub(crate) identifiers: BTreeMap, pub(crate) spawners: BTreeMap, + pub(crate) executors: BTreeMap>, } unsafe impl Send for Manager {} @@ -61,6 +63,7 @@ impl Manager { tasks: BTreeMap::new(), identifiers: BTreeMap::new(), spawners: BTreeMap::new(), + executors: BTreeMap::new(), })) } } diff --git a/modules/task/src/manager/spawner.rs b/modules/task/src/manager/spawner.rs index 4247eb35..0374fe4c 100644 --- a/modules/task/src/manager/spawner.rs +++ b/modules/task/src/manager/spawner.rs @@ -7,8 +7,32 @@ use embassy_executor::Spawner; pub type SpawnerIdentifier = usize; +const IDLE_RATIO_GAP_THRESHOLD_BASIS_POINTS: u16 = 1_000; + +#[derive(Clone, Copy, Debug)] +pub(crate) struct SpawnerCandidate { + pub(crate) identifier: SpawnerIdentifier, + pub(crate) telemetry: Option, + pub(crate) task_count: usize, +} + impl Manager { + pub fn register_executor( + &'static self, + executor: &'static ExecutorType, + ) -> Result { + self.register_spawner_with_executor(executor.spawner(), Some(executor)) + } + pub fn register_spawner(&'static self, spawner: Spawner) -> Result { + self.register_spawner_with_executor(spawner, None) + } + + pub fn register_spawner_with_executor( + &'static self, + spawner: Spawner, + executor: Option<&'static dyn ExecutorWithStatistics>, + ) -> Result { let mut inner = embassy_futures::block_on(self.0.write()); let identifier = Self::find_first_available_identifier( @@ -21,6 +45,8 @@ impl Manager { unreachable!("Spawner identifier already exists"); } + inner.executors.insert(identifier, executor); + Ok(identifier) } @@ -32,6 +58,8 @@ impl Manager { .remove(&identifier) .ok_or(Error::NoSpawnerAvailable)?; + inner.executors.remove(&identifier); + Ok(()) } @@ -41,30 +69,87 @@ impl Manager { return Err(Error::NoSpawnerAvailable); } - let mut map = BTreeMap::new(); + let mut task_count_per_spawner = BTreeMap::new(); for identifier in inner.spawners.keys() { - map.insert(*identifier, 0); // Initialize all spawners with a load of 0 + task_count_per_spawner.insert(*identifier, 0); } for metadata in inner.tasks.values() { - if let Some(load) = map.get_mut(&metadata.spawner_identifier) { - *load += 1; // Increment the load for the spawner + if let Some(load) = task_count_per_spawner.get_mut(&metadata.spawner_identifier) { + *load += 1; + } + } + + let mut candidates = alloc::vec::Vec::new(); + + for (identifier, task_count) in task_count_per_spawner { + let telemetry = inner + .executors + .get(&identifier) + .and_then(|executor| executor.and_then(|executor| executor.statistics_snapshot())); + + candidates.push(SpawnerCandidate { + identifier, + telemetry, + task_count, + }); + } + + Self::choose_spawner_from_candidates(&candidates) + } + + pub(crate) fn choose_spawner_from_candidates( + candidates: &[SpawnerCandidate], + ) -> Result { + if candidates.is_empty() { + return Err(Error::NoSpawnerAvailable); + } + + let mut best_telemetry_candidate = None; + let mut second_best_idle_ratio = 0u16; + + for candidate in candidates { + let Some(snapshot) = candidate.telemetry else { + continue; + }; + + let idle_ratio = snapshot.idle_ratio_basis_points(); + + if let Some((_, best_idle_ratio)) = best_telemetry_candidate { + if idle_ratio > best_idle_ratio { + second_best_idle_ratio = best_idle_ratio; + best_telemetry_candidate = Some((candidate.identifier, idle_ratio)); + } else if idle_ratio > second_best_idle_ratio { + second_best_idle_ratio = idle_ratio; + } + } else { + best_telemetry_candidate = Some((candidate.identifier, idle_ratio)); + } + } + + if let Some((best_identifier, best_idle_ratio)) = best_telemetry_candidate { + let idle_gap = best_idle_ratio.saturating_sub(second_best_idle_ratio); + + if idle_gap >= IDLE_RATIO_GAP_THRESHOLD_BASIS_POINTS { + return Ok(best_identifier); } } - // Find the spawner with the lowest load score - let mut best_index = 0; - let mut best_score = usize::MAX; + let mut best_identifier = candidates[0].identifier; + let mut best_task_count = candidates[0].task_count; - for (identifier, spawner) in map.iter() { - if *spawner < best_score { - best_score = *spawner; - best_index = *identifier; + for candidate in candidates.iter().skip(1) { + if candidate.task_count < best_task_count + || (candidate.task_count == best_task_count + && candidate.identifier < best_identifier) + { + best_task_count = candidate.task_count; + best_identifier = candidate.identifier; } } - Ok(best_index) + Ok(best_identifier) } pub async fn get_spawner(&self, task: TaskIdentifier) -> Result { diff --git a/modules/task/src/manager/tests.rs b/modules/task/src/manager/tests.rs index 4979cd27..b958d38c 100644 --- a/modules/task/src/manager/tests.rs +++ b/modules/task/src/manager/tests.rs @@ -3,10 +3,29 @@ extern crate std; use super::*; use crate::test; -use alloc::{collections::BTreeMap, format, vec::Vec}; +use alloc::{boxed::Box, collections::BTreeMap, format, string::String, vec::Vec}; use core::time::Duration; use users::{GroupIdentifier, UserIdentifier}; +struct TestExecutorWithStatistics { + spawner: embassy_executor::Spawner, + snapshot: Option, +} + +impl crate::ExecutorWithStatistics for TestExecutorWithStatistics { + fn spawner(&'static self) -> embassy_executor::Spawner { + self.spawner + } + + fn statistics_snapshot(&self) -> Option { + self.snapshot + } +} + +async fn read_executor_stats_device(_spawner_identifier: SpawnerIdentifier) -> Result { + Err(Error::NotInitialized) +} + #[test(task_path = crate)] async fn test_get_task_name() { let manager = initialize(); @@ -864,3 +883,121 @@ async fn test_spawner_reuse_after_unregister() { let current_spawner = manager.get_spawner(current_task).await.unwrap(); assert!(current_spawner != usize::MAX); } + +#[test(task_path = crate)] +async fn test_choose_spawner_prefers_statistics_busy_idle_gap() { + let candidates = [ + SpawnerCandidate { + identifier: 1, + telemetry: Some(crate::ExecutorStatisticsSnapshot::new(8_000, 2_000)), + task_count: 4, + }, + SpawnerCandidate { + identifier: 2, + telemetry: Some(crate::ExecutorStatisticsSnapshot::new(4_000, 6_000)), + task_count: 1, + }, + ]; + + let selected = Manager::choose_spawner_from_candidates(&candidates).unwrap(); + + assert_eq!(selected, 2); +} + +#[test(task_path = crate)] +async fn test_choose_spawner_falls_back_when_statistics_gap_small() { + let candidates = [ + SpawnerCandidate { + identifier: 3, + telemetry: Some(crate::ExecutorStatisticsSnapshot::new(4_500, 5_500)), + task_count: 2, + }, + SpawnerCandidate { + identifier: 4, + telemetry: Some(crate::ExecutorStatisticsSnapshot::new(4_200, 5_800)), + task_count: 1, + }, + ]; + + let selected = Manager::choose_spawner_from_candidates(&candidates).unwrap(); + + assert_eq!(selected, 4); +} + +#[test(task_path = crate)] +async fn test_executor_stats_device_is_mounted_and_readable() { + let manager = get_instance(); + let task = manager.get_current_task_identifier().await; + let current_spawner_identifier = manager.get_spawner(task).await.unwrap(); + let spawner = { + let inner = manager.0.read().await; + *inner.spawners.get(¤t_spawner_identifier).unwrap() + }; + + let executor = Box::leak(Box::new(TestExecutorWithStatistics { + spawner, + snapshot: Some(crate::ExecutorStatisticsSnapshot::new(8_000, 2_000)), + })); + + let spawner_identifier = manager.register_executor(executor).unwrap(); + + let output = read_executor_stats_device(spawner_identifier) + .await + .unwrap(); + + assert!(output.contains(&format!("executor_id={spawner_identifier}"))); + assert!(output.contains("busy_ticks=")); + assert!(output.contains("idle_ticks=")); + assert!(output.contains("total_ticks=")); + assert!(output.contains("idle_ratio_bp=")); + + manager.unregister_spawner(spawner_identifier).unwrap(); +} + +#[test(task_path = crate)] +async fn test_executor_stats_device_reports_unavailable_without_telemetry() { + let manager = get_instance(); + let task = manager.get_current_task_identifier().await; + let current_spawner_identifier = manager.get_spawner(task).await.unwrap(); + let spawner = { + let inner = manager.0.read().await; + *inner.spawners.get(¤t_spawner_identifier).unwrap() + }; + + let spawner_identifier = manager.register_spawner(spawner).unwrap(); + + let output = read_executor_stats_device(spawner_identifier) + .await + .unwrap(); + + assert!(output.contains(&format!("executor_id={spawner_identifier}"))); + assert!(output.contains("telemetry=unavailable")); + + manager.unregister_spawner(spawner_identifier).unwrap(); +} + +#[test(task_path = crate)] +async fn test_executor_stats_device_removed_on_unregister() { + let manager = get_instance(); + let task = manager.get_current_task_identifier().await; + let current_spawner_identifier = manager.get_spawner(task).await.unwrap(); + let spawner = { + let inner = manager.0.read().await; + *inner.spawners.get(¤t_spawner_identifier).unwrap() + }; + + let executor = Box::leak(Box::new(TestExecutorWithStatistics { + spawner, + snapshot: Some(crate::ExecutorStatisticsSnapshot::new(4_000, 6_000)), + })); + + let spawner_identifier = manager.register_executor(executor).unwrap(); + + let before_unregister = read_executor_stats_device(spawner_identifier).await; + assert!(before_unregister.is_ok()); + + manager.unregister_spawner(spawner_identifier).unwrap(); + + let after_unregister = read_executor_stats_device(spawner_identifier).await; + assert!(after_unregister.is_err()); +} diff --git a/modules/task/task_macros/src/lib.rs b/modules/task/task_macros/src/lib.rs index 3f20d4ae..b0ffd9a8 100644 --- a/modules/task/task_macros/src/lib.rs +++ b/modules/task/task_macros/src/lib.rs @@ -140,7 +140,10 @@ pub fn test(arguments: TokenStream, input: TokenStream) -> TokenStream { let manager = #task_path::initialize(); unsafe { - __SPAWNER = manager.register_spawner(Spawner).expect("Failed to register spawner"); + let _ = Spawner; + __SPAWNER = manager + .register_spawner_with_executor(Spawner, None) + .expect("Failed to register spawner"); } #task_path::block_on(async move { @@ -270,7 +273,10 @@ pub fn run(arguments: TokenStream, input: TokenStream) -> TokenStream { let manager = #task_path::initialize(); unsafe { - __SPAWNER = manager.register_spawner(Spawner).expect("Failed to register spawner"); + let _ = Spawner; + __SPAWNER = manager + .register_spawner_with_executor(Spawner, None) + .expect("Failed to register spawner"); } #task_path::block_on(async move {