Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 94 additions & 4 deletions drivers/std/src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,102 @@
use embassy_executor::{Spawner, raw};
use file_system::DirectBaseOperations;
use std::boxed::Box;
use std::marker::PhantomData;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::{Condvar, Mutex};
use synchronization::blocking_mutex::raw::CriticalSectionRawMutex;
use synchronization::signal::Signal;
use task::SpawnerIdentifier;
use task::{ExecutorStatisticsSnapshot, ExecutorWithStatistics, SpawnerIdentifier};

use crate::devices::TimeDevice;

/// Single-threaded std-based executor.
pub struct Executor {
inner: raw::Executor,
not_send: PhantomData<*mut ()>,
signaler: &'static Signaler,
statistics: &'static ExecutorStatistics,
stop: AtomicBool,
}

pub struct ExecutorStatistics {
busy_ticks: AtomicU64,
idle_ticks: AtomicU64,
}

impl ExecutorStatistics {
const fn new() -> Self {
Self {
busy_ticks: AtomicU64::new(0),
idle_ticks: AtomicU64::new(0),
}
}

fn record_busy(&self, elapsed_ticks: u64) {
self.busy_ticks
.fetch_add(elapsed_ticks, std::sync::atomic::Ordering::Relaxed);
}

fn record_idle(&self, elapsed_ticks: u64) {
self.idle_ticks
.fetch_add(elapsed_ticks, std::sync::atomic::Ordering::Relaxed);
}
}

fn get_current_ticks_from_time_device() -> u64 {
let mut current_time = core::time::Duration::default();

let current_time_raw = unsafe {
core::slice::from_raw_parts_mut(
&mut current_time as *mut core::time::Duration as *mut u8,
core::mem::size_of::<core::time::Duration>(),
)
};

if TimeDevice.read(current_time_raw, 0).is_err() {
return 0;
}

current_time.as_nanos() as u64
}

impl ExecutorStatistics {
fn snapshot(&self) -> ExecutorStatisticsSnapshot {
ExecutorStatisticsSnapshot {
busy_ticks: self.busy_ticks.load(std::sync::atomic::Ordering::Relaxed),
idle_ticks: self.idle_ticks.load(std::sync::atomic::Ordering::Relaxed),
}
}
}

impl Default for Executor {
fn default() -> Self {
Self::new()
}
}

impl ExecutorWithStatistics for Executor {
fn spawner(&'static self) -> Spawner {
Executor::spawner(self)
}

fn statistics_snapshot(&self) -> Option<ExecutorStatisticsSnapshot> {
Some(self.statistics.snapshot())
}
}

impl Executor {
/// Create a new Executor.
pub fn new() -> Self {
let signaler = Box::leak(Box::new(Signaler::new()));
let statistics = Box::leak(Box::new(ExecutorStatistics::new()));

Self {
inner: raw::Executor::new(signaler as *mut Signaler as *mut ()),
not_send: PhantomData,
signaler,
statistics,
stop: AtomicBool::new(false),
}
}
Expand Down Expand Up @@ -69,17 +137,35 @@ impl Executor {
init(self.inner.spawner(), self);

while !self.stop.load(std::sync::atomic::Ordering::SeqCst) {
let poll_started = get_current_ticks_from_time_device();
unsafe { self.inner.poll() };
let poll_ended = get_current_ticks_from_time_device();
let poll_elapsed = poll_ended.saturating_sub(poll_started);
self.statistics.record_busy(poll_elapsed);

let wait_started = get_current_ticks_from_time_device();
self.signaler.wait();
let wait_ended = get_current_ticks_from_time_device();
let wait_elapsed = wait_ended.saturating_sub(wait_started);
self.statistics.record_idle(wait_elapsed);
}
}

pub fn start(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
pub fn start(&'static self, init: impl FnOnce(Spawner)) -> ! {
init(self.inner.spawner());

loop {
let poll_started = get_current_ticks_from_time_device();
unsafe { self.inner.poll() };
self.signaler.wait()
let poll_ended = get_current_ticks_from_time_device();
let poll_elapsed = poll_ended.saturating_sub(poll_started);
self.statistics.record_busy(poll_elapsed);

let wait_started = get_current_ticks_from_time_device();
self.signaler.wait();
let wait_ended = get_current_ticks_from_time_device();
let wait_elapsed = wait_ended.saturating_sub(wait_started);
self.statistics.record_idle(wait_elapsed);
}
}
}
Expand Down Expand Up @@ -138,9 +224,13 @@ pub async fn new_thread_executor() -> SpawnerIdentifier {
std::thread::spawn(move || {
// Use Box::leak to create a 'static reference for this thread's executor
let executor = Box::leak(Box::new(Executor::new()));
let executor_ref: &'static Executor = executor;

executor.start(move |spawner: Spawner| {
let spawner_id = task_manager.register_spawner(spawner).unwrap();
let _ = executor_ref;
let spawner_id = task_manager
.register_spawner_with_executor(spawner, Some(executor_ref))
.unwrap();

signal.signal(spawner_id);
});
Expand Down
165 changes: 164 additions & 1 deletion drivers/wasm/src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,167 @@
pub use embassy_executor::Executor;
use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::ptr;
use core::sync::atomic::{AtomicU64, Ordering};

use embassy_executor::{Spawner, raw};
use file_system::DirectBaseOperations;
use js_sys::Promise;
use task::{ExecutorStatisticsSnapshot, ExecutorWithStatistics};
use wasm_bindgen::prelude::*;
extern crate alloc;
use alloc::boxed::Box;

use crate::devices::TimeDevice;

#[unsafe(export_name = "__pender")]
fn __pender(context: *mut ()) {
let signaler: &'static WasmContext = unsafe { core::mem::transmute(context) };
let _ = signaler.promise.then(unsafe { signaler.closure.as_mut() });
}

struct UninitCell<T>(MaybeUninit<core::cell::UnsafeCell<T>>);

impl<T> UninitCell<T> {
const fn uninit() -> Self {
Self(core::mem::MaybeUninit::uninit())
}

unsafe fn as_mut_ptr(&self) -> *mut T {
unsafe { (*self.0.as_ptr()).get() }
}

unsafe fn write_in_place(&self, func: impl FnOnce() -> T) {
unsafe { ptr::write(self.as_mut_ptr(), func()) };
}

unsafe fn as_mut(&self) -> &mut T {
unsafe { &mut *self.as_mut_ptr() }
}
}

unsafe impl<T> Sync for UninitCell<T> {}

struct WasmContext {
promise: Promise,
closure: UninitCell<Closure<dyn FnMut(JsValue)>>,
}

impl WasmContext {
fn new() -> Self {
Self {
promise: Promise::resolve(&JsValue::undefined()),
closure: UninitCell::uninit(),
}
}
}

pub struct Executor {
inner: raw::Executor,
ctx: &'static WasmContext,
statistics: &'static ExecutorStatistics,
not_send: PhantomData<*mut ()>,
}

pub struct ExecutorStatistics {
busy_ticks: AtomicU64,
idle_ticks: AtomicU64,
last_poll_end_ticks: AtomicU64,
}

impl ExecutorStatistics {
const fn new() -> Self {
Self {
busy_ticks: AtomicU64::new(0),
idle_ticks: AtomicU64::new(0),
last_poll_end_ticks: AtomicU64::new(0),
}
}

fn record_poll(&self, poll_start_ticks: u64, poll_end_ticks: u64) {
let previous_poll_end = self
.last_poll_end_ticks
.swap(poll_end_ticks, Ordering::Relaxed);

if previous_poll_end > 0 && poll_start_ticks > previous_poll_end {
self.idle_ticks
.fetch_add(poll_start_ticks - previous_poll_end, Ordering::Relaxed);
}

if poll_end_ticks > poll_start_ticks {
self.busy_ticks
.fetch_add(poll_end_ticks - poll_start_ticks, Ordering::Relaxed);
}
}
}

impl ExecutorStatistics {
fn snapshot(&self) -> ExecutorStatisticsSnapshot {
ExecutorStatisticsSnapshot {
busy_ticks: self.busy_ticks.load(Ordering::Relaxed),
idle_ticks: self.idle_ticks.load(Ordering::Relaxed),
}
}
}

impl Executor {
pub fn new() -> Self {
let ctx = Box::leak(Box::new(WasmContext::new()));
let statistics = Box::leak(Box::new(ExecutorStatistics::new()));

Self {
inner: raw::Executor::new(ctx as *mut WasmContext as *mut ()),
ctx,
statistics,
not_send: PhantomData,
}
}

pub fn start(&'static self, init: impl FnOnce(Spawner)) {
unsafe {
let executor = &self.inner;
let statistics = self.statistics;
let future = Closure::new(move |_| {
let poll_start = get_current_ticks_from_time_device();
executor.poll();
let poll_end = get_current_ticks_from_time_device();
statistics.record_poll(poll_start, poll_end);
});
self.ctx.closure.write_in_place(|| future);
init(self.inner.spawner());
}
}

pub fn spawner(&'static self) -> Spawner {
self.inner.spawner()
}
}

impl ExecutorWithStatistics for Executor {
fn spawner(&'static self) -> Spawner {
Executor::spawner(self)
}

fn statistics_snapshot(&self) -> Option<ExecutorStatisticsSnapshot> {
Some(self.statistics.snapshot())
}
}

fn get_current_ticks_from_time_device() -> u64 {
let mut current_time = core::time::Duration::default();

let current_time_raw = unsafe {
core::slice::from_raw_parts_mut(
&mut current_time as *mut core::time::Duration as *mut u8,
core::mem::size_of::<core::time::Duration>(),
)
};

if TimeDevice.read(current_time_raw, 0).is_err() {
return 0;
}

current_time.as_nanos() as u64
}

#[macro_export]
macro_rules! instantiate_static_executor {
Expand Down
32 changes: 32 additions & 0 deletions modules/task/src/executor_statistics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct ExecutorStatisticsSnapshot {
pub busy_ticks: u64,
pub idle_ticks: u64,
}

impl ExecutorStatisticsSnapshot {
pub const fn new(busy_ticks: u64, idle_ticks: u64) -> Self {
Self {
busy_ticks,
idle_ticks,
}
}

pub fn idle_ratio_basis_points(&self) -> u16 {
let total = self.busy_ticks.saturating_add(self.idle_ticks);

if total == 0 {
return 0;
}

let ratio = self.idle_ticks.saturating_mul(10_000) / total;

ratio.min(10_000) as u16
}
}

pub trait ExecutorWithStatistics {
fn spawner(&'static self) -> embassy_executor::Spawner;

fn statistics_snapshot(&self) -> Option<ExecutorStatisticsSnapshot>;
}
2 changes: 2 additions & 0 deletions modules/task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ extern crate std;

mod environment_variable;
mod error;
mod executor_statistics;
mod join_handle;
mod manager;
mod signal;
Expand All @@ -18,6 +19,7 @@ use embassy_time::Timer;
pub use embassy_executor;
pub use environment_variable::*;
pub use error::*;
pub use executor_statistics::*;
pub use join_handle::*;
pub use manager::*;
pub use signal::*;
Expand Down
3 changes: 3 additions & 0 deletions modules/task/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub use spawner::*;

// Manager module - core Manager structure and initialization

use crate::ExecutorWithStatistics;
use crate::manager::Metadata;

use alloc::collections::BTreeMap;
Expand All @@ -42,6 +43,7 @@ pub(crate) struct Inner {
pub(crate) tasks: BTreeMap<TaskIdentifier, Metadata>,
pub(crate) identifiers: BTreeMap<usize, TaskIdentifier>,
pub(crate) spawners: BTreeMap<usize, ::embassy_executor::Spawner>,
pub(crate) executors: BTreeMap<usize, Option<&'static dyn ExecutorWithStatistics>>,
}

unsafe impl Send for Manager {}
Expand All @@ -61,6 +63,7 @@ impl Manager {
tasks: BTreeMap::new(),
identifiers: BTreeMap::new(),
spawners: BTreeMap::new(),
executors: BTreeMap::new(),
}))
}
}
Loading
Loading