Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ cliclack = "0.4.0"
console = "0.16.2"
ctrlc = "3.5.0"
ecmascript_atomics = { version = "0.2.3" }
ecmascript_futex = { version = "0.1.0" }
fast-float = "0.2.0"
hashbrown = "0.16.1"
lexical = { version = "7.0.5", default-features = false, features = [
Expand Down
3 changes: 1 addition & 2 deletions nova_vm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ categories.workspace = true
[dependencies]
ahash = { workspace = true }
ecmascript_atomics = { workspace = true, optional = true }
ecmascript_futex = { workspace = true, optional = true }
fast-float = { workspace = true }
hashbrown = { workspace = true }
lexical = { workspace = true }
Expand Down Expand Up @@ -61,7 +60,7 @@ atomics = [
"array-buffer",
"shared-array-buffer",
"dep:ecmascript_atomics",
"dep:ecmascript_futex",

]
date = []
json = ["dep:sonic-rs"]
Expand Down
197 changes: 107 additions & 90 deletions nova_vm/src/ecmascript/builtins/structured_data/atomics_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,29 @@
use std::{
hint::assert_unchecked,
ops::ControlFlow,
sync::{Arc, atomic::AtomicBool},
sync::{
Arc,
atomic::{AtomicBool, Ordering as StdOrdering},
},
thread::{self, JoinHandle},
time::Duration,
};

use ecmascript_atomics::Ordering;
use ecmascript_futex::{ECMAScriptAtomicWait, FutexError};

use crate::{
ecmascript::{
Agent, AnyArrayBuffer, AnyTypedArray, ArgumentsList, BUILTIN_STRING_MEMORY, Behaviour,
BigInt, Builtin, ExceptionType, InnerJob, Job, JsResult, Number, Numeric, OrdinaryObject,
Promise, PromiseCapability, Realm, SharedArrayBuffer, SharedDataBlock, SharedTypedArray,
String, TryError, TryResult, TypedArrayAbstractOperations,
TypedArrayWithBufferWitnessRecords, Value, builders::OrdinaryObjectBuilder,
compare_exchange_in_buffer, for_any_typed_array, get_modify_set_value_in_buffer,
get_value_from_buffer, make_typed_array_with_buffer_witness_record,
number_convert_to_integer_or_infinity, set_value_in_buffer, to_big_int, to_big_int64,
to_big_int64_big_int, to_index, to_int32, to_int32_number, to_integer_number_or_infinity,
to_integer_or_infinity, to_number, try_result_into_js, try_to_index, unwrap_try,
validate_index, validate_typed_array,
TypedArrayWithBufferWitnessRecords, Value, WaitResult, WaiterRecord,
builders::OrdinaryObjectBuilder, compare_exchange_in_buffer, for_any_typed_array,
get_modify_set_value_in_buffer, get_value_from_buffer,
make_typed_array_with_buffer_witness_record, number_convert_to_integer_or_infinity,
set_value_in_buffer, to_big_int, to_big_int64, to_big_int64_big_int, to_index, to_int32,
to_int32_number, to_integer_number_or_infinity, to_integer_or_infinity, to_number,
try_result_into_js, try_to_index, unwrap_try, validate_index, validate_typed_array,
},
engine::{Bindable, GcScope, Global, NoGcScope, Scopable},
heap::{ObjectEntry, WellKnownSymbols},
Expand Down Expand Up @@ -666,36 +668,34 @@ impl AtomicsObject {
};
// 6. Let block be buffer.[[ArrayBufferData]].
// 8. Let WL be GetWaiterList(block, byteIndexInBuffer).
let is_big_int_64_array = matches!(typed_array, AnyTypedArray::SharedBigInt64Array(_));
let slot = buffer.as_slice(agent).slice_from(byte_index_in_buffer);
let n = if is_big_int_64_array {
// SAFETY: offset was checked.
let slot = unsafe { slot.as_aligned::<u64>().unwrap_unchecked() };
if c == usize::MAX {
// Force the notify count down into a reasonable range: the
// ecmascript_futex may return usize::MAX if the OS doesn't
// give us a count number.
slot.notify_all().min(i32::MAX as usize)
} else {
slot.notify_many(c)
}
} else {
// SAFETY: offset was checked.
let slot = unsafe { slot.as_aligned::<u32>().unwrap_unchecked() };
if c == usize::MAX {
// Force the notify count down into a reasonable range: the
// ecmascript_futex may return usize::MAX if the OS doesn't
// give us a count number.
slot.notify_all().min(i32::MAX as usize)
} else {
slot.notify_many(c)
}
};
let data_block = buffer.get_data_block(agent);
// 9. Perform EnterCriticalSection(WL).
// SAFETY: buffer is a valid SharedArrayBuffer and cannot be detached. A 0-sized SAB has a
// dangling data block with no backing allocation, but `get_waiters` returns `None` in that case.
let mut n = 0;
let Some(waiters) = (unsafe { data_block.get_waiters() }) else {
return Ok(0.into());
};

let mut guard = waiters.lock().unwrap();
// 10. Let S be RemoveWaiters(WL, c).
let Some(list) = guard.get_list_mut(byte_index_in_buffer) else {
return Ok(0.into());
};

// 11. For each element W of S, do
// a. Perform NotifyWaiter(WL, W).
while n < c {
let Some(w) = list.pop() else {
break;
};
// a. Perform NotifyWaiter(WL, W).
w.notify_waiters();
n += 1;
}

// 12. Perform LeaveCriticalSection(WL).
drop(guard);

// 13. Let n be the number of elements in S.
// 14. Return 𝔽(n).
Ok(Number::from_usize(agent, n, gc).into())
Expand Down Expand Up @@ -1486,37 +1486,47 @@ fn do_wait_critical<'gc, const IS_ASYNC: bool, const IS_I64: bool>(
// 28. Perform AddWaiter(WL, waiterRecord).
// 29. If mode is sync, then
if !IS_ASYNC {
// a. Perform SuspendThisAgent(WL, waiterRecord).
let result = if IS_I64 {
let v = v as u64;
// SAFETY: buffer is still live and index was checked.
let data_block = buffer.get_data_block(agent);
// SAFETY: buffer is a valid SharedArrayBuffer and cannot be detached. A 0-sized SAB would
// have a dangling data block, but Atomics.wait requires `byteIndex` to be within bounds,
// so a 0-sized SAB would have been rejected earlier with a RangeError.
let waiters = unsafe { data_block.get_or_init_waiters() };
let waiter_record = WaiterRecord::new_shared();
let mut guard = waiters.lock().unwrap();

// Re-read value under critical section to avoid TOCTOU race.
let slot = data_block.as_racy_slice().slice_from(byte_index_in_buffer);
let v_changed = if IS_I64 {
let slot = unsafe { slot.as_aligned::<u64>().unwrap_unchecked() };
if t == u64::MAX {
slot.wait(v)
} else {
slot.wait_timeout(v, Duration::from_millis(t))
}
v as u64 != slot.load(Ordering::SeqCst)
} else {
let v = v as u32;
// SAFETY: buffer is still live and index was checked.
let slot = unsafe { slot.as_aligned::<u32>().unwrap_unchecked() };
if t == u64::MAX {
slot.wait(v)
} else {
slot.wait_timeout(v, Duration::from_millis(t))
}
v as i32 as u32 != slot.load(Ordering::SeqCst)
};
// 31. Perform LeaveCriticalSection(WL).
// 32. If mode is sync, return waiterRecord.[[Result]].
if v_changed {
return BUILTIN_STRING_MEMORY.not_equal.into();
}

match result {
Ok(_) => BUILTIN_STRING_MEMORY.ok.into(),
Err(err) => match err {
FutexError::Timeout => BUILTIN_STRING_MEMORY.timed_out.into(),
FutexError::NotEqual => BUILTIN_STRING_MEMORY.not_equal.into(),
FutexError::Unknown => panic!(),
},
// a. Perform SuspendThisAgent(WL, waiterRecord).
guard.push_to_list(byte_index_in_buffer, waiter_record.clone());

if t == u64::MAX {
waiter_record.wait(guard);
} else {
let dur = Duration::from_millis(t);
let (new_guard, timeout) = waiter_record.wait_timeout(guard, dur);
guard = new_guard;
if timeout.timed_out() {
guard.remove_from_list(byte_index_in_buffer, waiter_record);

// 31. Perform LeaveCriticalSection(WL).
// 32. If mode is sync, return waiterRecord.[[Result]].
return BUILTIN_STRING_MEMORY.timed_out.into();
}
}
// 31. Perform LeaveCriticalSection(WL).
// 32. If mode is sync, return waiterRecord.[[Result]].
BUILTIN_STRING_MEMORY.ok.into()
} else {
let promise_capability = PromiseCapability::new(agent, gc);
let promise = Global::new(agent, promise_capability.promise.unbind());
Expand Down Expand Up @@ -1623,7 +1633,7 @@ fn create_wait_result_object<'gc>(
#[derive(Debug)]
struct WaitAsyncJobInner {
promise_to_resolve: Global<Promise<'static>>,
join_handle: JoinHandle<Result<(), FutexError>>,
join_handle: JoinHandle<WaitResult>,
_has_timeout: bool,
}

Expand Down Expand Up @@ -1662,18 +1672,8 @@ impl WaitAsyncJob {
// c. Perform LeaveCriticalSection(WL).
let promise_capability = PromiseCapability::from_promise(promise, true);
let result = match result {
Ok(_) => BUILTIN_STRING_MEMORY.ok.into(),
Err(FutexError::NotEqual) => BUILTIN_STRING_MEMORY.ok.into(),
Err(FutexError::Timeout) => BUILTIN_STRING_MEMORY.timed_out.into(),
Err(FutexError::Unknown) => {
let error = agent.throw_exception_with_static_message(
ExceptionType::Error,
"unknown error occurred",
gc,
);
promise_capability.reject(agent, error.value(), gc);
return Ok(());
}
WaitResult::Ok | WaitResult::NotEqual => BUILTIN_STRING_MEMORY.ok.into(),
WaitResult::TimedOut => BUILTIN_STRING_MEMORY.timed_out.into(),
};
unwrap_try(promise_capability.try_resolve(agent, result, gc));
// d. Return unused.
Expand Down Expand Up @@ -1701,28 +1701,45 @@ fn enqueue_atomics_wait_async_job<const IS_I64: bool>(
let signal = Arc::new(AtomicBool::new(false));
let s = signal.clone();
let handle = thread::spawn(move || {
// SAFETY: buffer is a cloned SharedDataBlock; non-dangling.
let waiters = unsafe { buffer.get_or_init_waiters() };
let waiter_record = WaiterRecord::new_shared();
let mut guard = waiters.lock().unwrap();

// Re-check the value under the critical section.
let slot = buffer.as_racy_slice().slice_from(byte_index_in_buffer);
if IS_I64 {
let v = v as u64;
// SAFETY: buffer is still live and index was checked.
let v_not_equal = if IS_I64 {
let slot = unsafe { slot.as_aligned::<u64>().unwrap_unchecked() };
s.store(true, std::sync::atomic::Ordering::Release);
if t == u64::MAX {
slot.wait(v)
} else {
slot.wait_timeout(v, Duration::from_millis(t))
}
v as u64 != slot.load(Ordering::SeqCst)
} else {
let v = v as i32 as u32;
// SAFETY: buffer is still live and index was checked.
let slot = unsafe { slot.as_aligned::<u32>().unwrap_unchecked() };
s.store(true, std::sync::atomic::Ordering::Release);
if t == u64::MAX {
slot.wait(v)
} else {
slot.wait_timeout(v, Duration::from_millis(t))
v as i32 as u32 != slot.load(Ordering::SeqCst)
};

// Signal the main thread that we have the lock and are about to sleep.
s.store(true, StdOrdering::Release);

if v_not_equal {
return WaitResult::NotEqual;
}

guard.push_to_list(byte_index_in_buffer, waiter_record.clone());

if t == u64::MAX {
waiter_record.wait(guard);
} else {
let dur = Duration::from_millis(t);
let (new_guard, timeout) = waiter_record.wait_timeout(guard, dur);
guard = new_guard;
if timeout.timed_out() {
guard.remove_from_list(byte_index_in_buffer, waiter_record);

// 31. Perform LeaveCriticalSection(WL).
// 32. If mode is sync, return waiterRecord.[[Result]].
return WaitResult::TimedOut;
}
}
WaitResult::Ok
});
let wait_async_job = Job {
realm: Some(Global::new(agent, agent.current_realm(gc).unbind())),
Expand All @@ -1732,7 +1749,7 @@ fn enqueue_atomics_wait_async_job<const IS_I64: bool>(
_has_timeout: t != u64::MAX,
}))),
};
while !signal.load(std::sync::atomic::Ordering::Acquire) {
while !signal.load(StdOrdering::Acquire) {
// Wait until the thread has started up and is about to go to sleep.
}
// 2. Let now be the time value (UTC) identifying the current time.
Expand Down
Loading