diff --git a/Cargo.toml b/Cargo.toml index 3cc23aa71..376aa6ad5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,6 @@ cliclack = "0.4.0" console = "0.16.2" ctrlc = "3.5.0" ecmascript_atomics = { version = "0.2.3" } -ecmascript_futex = { version = "0.1.0" } fast-float = "0.2.0" hashbrown = "0.16.1" lexical = { version = "7.0.5", default-features = false, features = [ diff --git a/nova_vm/Cargo.toml b/nova_vm/Cargo.toml index e4842090a..9945aad9e 100644 --- a/nova_vm/Cargo.toml +++ b/nova_vm/Cargo.toml @@ -14,7 +14,6 @@ categories.workspace = true [dependencies] ahash = { workspace = true } ecmascript_atomics = { workspace = true, optional = true } -ecmascript_futex = { workspace = true, optional = true } fast-float = { workspace = true } hashbrown = { workspace = true } lexical = { workspace = true } @@ -61,7 +60,7 @@ atomics = [ "array-buffer", "shared-array-buffer", "dep:ecmascript_atomics", - "dep:ecmascript_futex", + ] date = [] json = ["dep:sonic-rs"] diff --git a/nova_vm/src/ecmascript/builtins/structured_data/atomics_object.rs b/nova_vm/src/ecmascript/builtins/structured_data/atomics_object.rs index 1736ec2d3..506817463 100644 --- a/nova_vm/src/ecmascript/builtins/structured_data/atomics_object.rs +++ b/nova_vm/src/ecmascript/builtins/structured_data/atomics_object.rs @@ -5,13 +5,15 @@ use std::{ hint::assert_unchecked, ops::ControlFlow, - sync::{Arc, atomic::AtomicBool}, + sync::{ + Arc, + atomic::{AtomicBool, Ordering as StdOrdering}, + }, thread::{self, JoinHandle}, time::Duration, }; use ecmascript_atomics::Ordering; -use ecmascript_futex::{ECMAScriptAtomicWait, FutexError}; use crate::{ ecmascript::{ @@ -19,13 +21,13 @@ use crate::{ BigInt, Builtin, ExceptionType, InnerJob, Job, JsResult, Number, Numeric, OrdinaryObject, Promise, PromiseCapability, Realm, SharedArrayBuffer, SharedDataBlock, SharedTypedArray, String, TryError, TryResult, TypedArrayAbstractOperations, - TypedArrayWithBufferWitnessRecords, Value, builders::OrdinaryObjectBuilder, - compare_exchange_in_buffer, for_any_typed_array, get_modify_set_value_in_buffer, - get_value_from_buffer, make_typed_array_with_buffer_witness_record, - number_convert_to_integer_or_infinity, set_value_in_buffer, to_big_int, to_big_int64, - to_big_int64_big_int, to_index, to_int32, to_int32_number, to_integer_number_or_infinity, - to_integer_or_infinity, to_number, try_result_into_js, try_to_index, unwrap_try, - validate_index, validate_typed_array, + TypedArrayWithBufferWitnessRecords, Value, WaitResult, WaiterRecord, + builders::OrdinaryObjectBuilder, compare_exchange_in_buffer, for_any_typed_array, + get_modify_set_value_in_buffer, get_value_from_buffer, + make_typed_array_with_buffer_witness_record, number_convert_to_integer_or_infinity, + set_value_in_buffer, to_big_int, to_big_int64, to_big_int64_big_int, to_index, to_int32, + to_int32_number, to_integer_number_or_infinity, to_integer_or_infinity, to_number, + try_result_into_js, try_to_index, unwrap_try, validate_index, validate_typed_array, }, engine::{Bindable, GcScope, Global, NoGcScope, Scopable}, heap::{ObjectEntry, WellKnownSymbols}, @@ -666,36 +668,34 @@ impl AtomicsObject { }; // 6. Let block be buffer.[[ArrayBufferData]]. // 8. Let WL be GetWaiterList(block, byteIndexInBuffer). - let is_big_int_64_array = matches!(typed_array, AnyTypedArray::SharedBigInt64Array(_)); - let slot = buffer.as_slice(agent).slice_from(byte_index_in_buffer); - let n = if is_big_int_64_array { - // SAFETY: offset was checked. - let slot = unsafe { slot.as_aligned::().unwrap_unchecked() }; - if c == usize::MAX { - // Force the notify count down into a reasonable range: the - // ecmascript_futex may return usize::MAX if the OS doesn't - // give us a count number. - slot.notify_all().min(i32::MAX as usize) - } else { - slot.notify_many(c) - } - } else { - // SAFETY: offset was checked. - let slot = unsafe { slot.as_aligned::().unwrap_unchecked() }; - if c == usize::MAX { - // Force the notify count down into a reasonable range: the - // ecmascript_futex may return usize::MAX if the OS doesn't - // give us a count number. - slot.notify_all().min(i32::MAX as usize) - } else { - slot.notify_many(c) - } - }; + let data_block = buffer.get_data_block(agent); // 9. Perform EnterCriticalSection(WL). + // SAFETY: buffer is a valid SharedArrayBuffer and cannot be detached. A 0-sized SAB has a + // dangling data block with no backing allocation, but `get_waiters` returns `None` in that case. + let mut n = 0; + let Some(waiters) = (unsafe { data_block.get_waiters() }) else { + return Ok(0.into()); + }; + + let mut guard = waiters.lock().unwrap(); // 10. Let S be RemoveWaiters(WL, c). + let Some(list) = guard.get_list_mut(byte_index_in_buffer) else { + return Ok(0.into()); + }; + // 11. For each element W of S, do - // a. Perform NotifyWaiter(WL, W). + while n < c { + let Some(w) = list.pop() else { + break; + }; + // a. Perform NotifyWaiter(WL, W). + w.notify_waiters(); + n += 1; + } + // 12. Perform LeaveCriticalSection(WL). + drop(guard); + // 13. Let n be the number of elements in S. // 14. Return 𝔽(n). Ok(Number::from_usize(agent, n, gc).into()) @@ -1486,37 +1486,47 @@ fn do_wait_critical<'gc, const IS_ASYNC: bool, const IS_I64: bool>( // 28. Perform AddWaiter(WL, waiterRecord). // 29. If mode is sync, then if !IS_ASYNC { - // a. Perform SuspendThisAgent(WL, waiterRecord). - let result = if IS_I64 { - let v = v as u64; - // SAFETY: buffer is still live and index was checked. + let data_block = buffer.get_data_block(agent); + // SAFETY: buffer is a valid SharedArrayBuffer and cannot be detached. A 0-sized SAB would + // have a dangling data block, but Atomics.wait requires `byteIndex` to be within bounds, + // so a 0-sized SAB would have been rejected earlier with a RangeError. + let waiters = unsafe { data_block.get_or_init_waiters() }; + let waiter_record = WaiterRecord::new_shared(); + let mut guard = waiters.lock().unwrap(); + + // Re-read value under critical section to avoid TOCTOU race. + let slot = data_block.as_racy_slice().slice_from(byte_index_in_buffer); + let v_changed = if IS_I64 { let slot = unsafe { slot.as_aligned::().unwrap_unchecked() }; - if t == u64::MAX { - slot.wait(v) - } else { - slot.wait_timeout(v, Duration::from_millis(t)) - } + v as u64 != slot.load(Ordering::SeqCst) } else { - let v = v as u32; - // SAFETY: buffer is still live and index was checked. let slot = unsafe { slot.as_aligned::().unwrap_unchecked() }; - if t == u64::MAX { - slot.wait(v) - } else { - slot.wait_timeout(v, Duration::from_millis(t)) - } + v as i32 as u32 != slot.load(Ordering::SeqCst) }; - // 31. Perform LeaveCriticalSection(WL). - // 32. If mode is sync, return waiterRecord.[[Result]]. + if v_changed { + return BUILTIN_STRING_MEMORY.not_equal.into(); + } - match result { - Ok(_) => BUILTIN_STRING_MEMORY.ok.into(), - Err(err) => match err { - FutexError::Timeout => BUILTIN_STRING_MEMORY.timed_out.into(), - FutexError::NotEqual => BUILTIN_STRING_MEMORY.not_equal.into(), - FutexError::Unknown => panic!(), - }, + // a. Perform SuspendThisAgent(WL, waiterRecord). + guard.push_to_list(byte_index_in_buffer, waiter_record.clone()); + + if t == u64::MAX { + waiter_record.wait(guard); + } else { + let dur = Duration::from_millis(t); + let (new_guard, timeout) = waiter_record.wait_timeout(guard, dur); + guard = new_guard; + if timeout.timed_out() { + guard.remove_from_list(byte_index_in_buffer, waiter_record); + + // 31. Perform LeaveCriticalSection(WL). + // 32. If mode is sync, return waiterRecord.[[Result]]. + return BUILTIN_STRING_MEMORY.timed_out.into(); + } } + // 31. Perform LeaveCriticalSection(WL). + // 32. If mode is sync, return waiterRecord.[[Result]]. + BUILTIN_STRING_MEMORY.ok.into() } else { let promise_capability = PromiseCapability::new(agent, gc); let promise = Global::new(agent, promise_capability.promise.unbind()); @@ -1623,7 +1633,7 @@ fn create_wait_result_object<'gc>( #[derive(Debug)] struct WaitAsyncJobInner { promise_to_resolve: Global>, - join_handle: JoinHandle>, + join_handle: JoinHandle, _has_timeout: bool, } @@ -1662,18 +1672,8 @@ impl WaitAsyncJob { // c. Perform LeaveCriticalSection(WL). let promise_capability = PromiseCapability::from_promise(promise, true); let result = match result { - Ok(_) => BUILTIN_STRING_MEMORY.ok.into(), - Err(FutexError::NotEqual) => BUILTIN_STRING_MEMORY.ok.into(), - Err(FutexError::Timeout) => BUILTIN_STRING_MEMORY.timed_out.into(), - Err(FutexError::Unknown) => { - let error = agent.throw_exception_with_static_message( - ExceptionType::Error, - "unknown error occurred", - gc, - ); - promise_capability.reject(agent, error.value(), gc); - return Ok(()); - } + WaitResult::Ok | WaitResult::NotEqual => BUILTIN_STRING_MEMORY.ok.into(), + WaitResult::TimedOut => BUILTIN_STRING_MEMORY.timed_out.into(), }; unwrap_try(promise_capability.try_resolve(agent, result, gc)); // d. Return unused. @@ -1701,28 +1701,45 @@ fn enqueue_atomics_wait_async_job( let signal = Arc::new(AtomicBool::new(false)); let s = signal.clone(); let handle = thread::spawn(move || { + // SAFETY: buffer is a cloned SharedDataBlock; non-dangling. + let waiters = unsafe { buffer.get_or_init_waiters() }; + let waiter_record = WaiterRecord::new_shared(); + let mut guard = waiters.lock().unwrap(); + + // Re-check the value under the critical section. let slot = buffer.as_racy_slice().slice_from(byte_index_in_buffer); - if IS_I64 { - let v = v as u64; - // SAFETY: buffer is still live and index was checked. + let v_not_equal = if IS_I64 { let slot = unsafe { slot.as_aligned::().unwrap_unchecked() }; - s.store(true, std::sync::atomic::Ordering::Release); - if t == u64::MAX { - slot.wait(v) - } else { - slot.wait_timeout(v, Duration::from_millis(t)) - } + v as u64 != slot.load(Ordering::SeqCst) } else { - let v = v as i32 as u32; - // SAFETY: buffer is still live and index was checked. let slot = unsafe { slot.as_aligned::().unwrap_unchecked() }; - s.store(true, std::sync::atomic::Ordering::Release); - if t == u64::MAX { - slot.wait(v) - } else { - slot.wait_timeout(v, Duration::from_millis(t)) + v as i32 as u32 != slot.load(Ordering::SeqCst) + }; + + // Signal the main thread that we have the lock and are about to sleep. + s.store(true, StdOrdering::Release); + + if v_not_equal { + return WaitResult::NotEqual; + } + + guard.push_to_list(byte_index_in_buffer, waiter_record.clone()); + + if t == u64::MAX { + waiter_record.wait(guard); + } else { + let dur = Duration::from_millis(t); + let (new_guard, timeout) = waiter_record.wait_timeout(guard, dur); + guard = new_guard; + if timeout.timed_out() { + guard.remove_from_list(byte_index_in_buffer, waiter_record); + + // 31. Perform LeaveCriticalSection(WL). + // 32. If mode is sync, return waiterRecord.[[Result]]. + return WaitResult::TimedOut; } } + WaitResult::Ok }); let wait_async_job = Job { realm: Some(Global::new(agent, agent.current_realm(gc).unbind())), @@ -1732,7 +1749,7 @@ fn enqueue_atomics_wait_async_job( _has_timeout: t != u64::MAX, }))), }; - while !signal.load(std::sync::atomic::Ordering::Acquire) { + while !signal.load(StdOrdering::Acquire) { // Wait until the thread has started up and is about to go to sleep. } // 2. Let now be the time value (UTC) identifying the current time. diff --git a/nova_vm/src/ecmascript/types/spec/data_block.rs b/nova_vm/src/ecmascript/types/spec/data_block.rs index 3d6738abc..77fa0214f 100644 --- a/nova_vm/src/ecmascript/types/spec/data_block.rs +++ b/nova_vm/src/ecmascript/types/spec/data_block.rs @@ -5,9 +5,15 @@ //! ### [6.2.9 Data Blocks](https://tc39.es/ecma262/#sec-data-blocks) #[cfg(feature = "shared-array-buffer")] -use core::sync::atomic::{AtomicUsize, Ordering}; -#[cfg(feature = "shared-array-buffer")] -use std::hint::assert_unchecked; +use std::{ + collections::hash_map::Entry, + hint::assert_unchecked, + sync::{ + Arc, Condvar, Mutex, MutexGuard, WaitTimeoutResult, + atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering}, + }, + time::Duration, +}; use core::{ f32, f64, @@ -419,6 +425,127 @@ impl SharedDataBlockMaxByteLength { } } +#[cfg(feature = "shared-array-buffer")] +#[derive(Default)] +pub(crate) struct WaiterRecord { + condvar: Condvar, + notified: AtomicBool, +} + +#[cfg(feature = "shared-array-buffer")] +impl WaiterRecord { + pub(crate) fn new_shared() -> Arc { + Arc::new(Self::default()) + } + + pub(crate) fn notify_waiters(self: Arc) { + self.notified + .store(true, std::sync::atomic::Ordering::Relaxed); + self.condvar.notify_all(); + } + + pub(crate) fn wait<'a, T>(self: &Arc, guard: MutexGuard<'a, T>) { + let lock_result = self + .condvar + .wait_while(guard, |_| !self.notified.load(Ordering::Relaxed)); + match lock_result { + Ok(_) => (), + Err(e) => panic!( + "Another thread panicked while holding the waiter list lock, poisoning it: {e:?}" + ), + } + } + + pub(crate) fn wait_timeout<'a, T>( + self: &Arc, + guard: MutexGuard<'a, T>, + dur: Duration, + ) -> (MutexGuard<'a, T>, WaitTimeoutResult) { + let lock_result = self + .condvar + .wait_timeout_while(guard, dur, |_| !self.notified.load(Ordering::Relaxed)); + + match lock_result { + Ok(result) => result, + Err(e) => panic!( + "Another thread panicked while holding the waiter list lock, poisoning it: {e:?}" + ), + } + } +} + +/// Result of an `Atomics.wait` or `Atomics.waitAsync` operation. +#[derive(Debug)] +#[cfg(feature = "shared-array-buffer")] +pub(crate) enum WaitResult { + Ok, + TimedOut, + NotEqual, +} + +#[cfg(feature = "shared-array-buffer")] +#[derive(Default)] +#[repr(transparent)] +pub(crate) struct WaiterList { + waiters: std::collections::VecDeque>, +} + +impl WaiterList { + pub(crate) fn is_empty(&self) -> bool { + self.waiters.is_empty() + } + + pub(crate) fn pop(&mut self) -> Option> { + self.waiters.pop_front() + } + + pub(crate) fn push(&mut self, w: Arc) { + self.waiters.push_back(w); + } + + pub(crate) fn remove(&mut self, w: Arc) -> bool { + let Some(index) = self + .waiters + .iter() + .enumerate() + .find(|(_, e)| Arc::ptr_eq(e, &w)) + .map(|(i, _)| i) + else { + return false; + }; + self.waiters.remove(index); + true + } +} + +#[cfg(feature = "shared-array-buffer")] +#[repr(transparent)] +#[derive(Default)] +pub(crate) struct WaiterLists { + map: std::collections::HashMap, +} + +impl WaiterLists { + pub(crate) fn get_list_mut(&mut self, index: usize) -> Option<&mut WaiterList> { + self.map.get_mut(&index) + } + + pub(crate) fn push_to_list(&mut self, index: usize, w: Arc) { + self.map.entry(index).or_default().push(w); + } + + pub(crate) fn remove_from_list(&mut self, index: usize, w: Arc) { + match self.map.entry(index) { + Entry::Occupied(mut entry) => { + if entry.get_mut().remove(w) && entry.get().is_empty() { + entry.remove(); + } + } + Entry::Vacant(_) => {} + } + } +} + /// # [6.2.9 Data Blocks](https://tc39.es/ecma262/#sec-data-blocks) /// /// The Shared Data Block specification type is used to describe a distinct and @@ -439,6 +566,7 @@ impl SharedDataBlockMaxByteLength { /// ```rust,ignore /// #[repr(C)] /// struct StaticSharedDataBuffer { +/// waiters: AtomicPtr, /// rc: AtomicUsize, /// bytes: [RacyU8; N], /// } @@ -449,12 +577,18 @@ impl SharedDataBlockMaxByteLength { /// #[repr(C)] /// struct GrowableSharedDataBuffer { /// byte_length: AtomicUsize, +/// waiters: AtomicPtr, /// rc: AtomicUsize, /// bytes: [RacyU8; N], /// } /// ``` /// -/// The `ptr` field points to the start of the `bytes` +/// The `ptr` field points to the start of the `bytes`. +/// +/// The `waiters` pointer is initially null. It is lazily initialized via +/// a compare-and-swap the first time any thread calls `Atomics.wait` or +/// `Atomics.waitAsync` on this block. Its lifetime is managed by the +/// buffer's existing reference count. /// /// Note that the "viewed" byte length of the buffer is defined inside the /// buffer when the SharedDataBlock is growable. @@ -518,8 +652,8 @@ impl Drop for SharedDataBlock { return; } let growable = self.is_growable(); - // SAFETY: SharedDataBlock guarantees we have a AtomicUsize allocated - // before the bytes. + // SAFETY: SharedDataBlock guarantees we have waiters_ptr and rc + // allocated before the bytes. rc is 1 slot before ptr. let rc_ptr = unsafe { self.ptr.as_ptr().cast::().sub(1) }; { // SAFETY: the RC is definitely still allocated, as we haven't @@ -545,24 +679,36 @@ impl Drop for SharedDataBlock { return; } } + + // We are the last holder. Drop the waiter map if it was initialized. + // SAFETY: non-dangling, and we're the sole owner now. + let waiters_ptr = unsafe { self.get_waiters_ptr() }; + let waiters = waiters_ptr.load(Ordering::Acquire); + if !waiters.is_null() { + // SAFETY: the pointer was allocated via Box::into_raw in + // get_or_init_waiters, and we are the last holder. + let _ = unsafe { Box::from_raw(waiters) }; + } + let max_byte_length = self.max_byte_length(); - // SAFETY: if we're here then we're the last holder of the data block. let (size, base_ptr) = if growable { - // This is a growable SharedDataBlock that we're working with here. - // SAFETY: layout guaranteed by type unsafe { ( - max_byte_length - .unchecked_add(core::mem::size_of::<(AtomicUsize, AtomicUsize)>()), - rc_ptr.sub(1), + max_byte_length.unchecked_add(core::mem::size_of::<( + AtomicUsize, + AtomicUsize, + AtomicUsize, + )>()), + rc_ptr.sub(2), ) } } else { unsafe { ( - max_byte_length.unchecked_add(core::mem::size_of::()), - rc_ptr, + max_byte_length + .unchecked_add(core::mem::size_of::<(AtomicUsize, AtomicUsize)>()), + rc_ptr.sub(1), ) } }; @@ -570,8 +716,8 @@ impl Drop for SharedDataBlock { // SAFETY: As per the CAS loop on the reference count, we are the only // referrer to the racy memory. We can thus deallocate the ECMAScript // memory; this effectively grows our Rust memory from being just the - // RC and possible byte length value, into also containing the byte - // data. + // Waiters pointer, RC, and possible byte length value, into also + // containing the byte data. let _ = unsafe { memory.exit() }; // SAFETY: layout guaranteed by type. let layout = unsafe { Layout::from_size_align(size, 8).unwrap_unchecked() }; @@ -616,10 +762,10 @@ impl SharedDataBlock { use ecmascript_atomics::RacyMemory; let alloc_size = if growable { // Growable SharedArrayBuffer - size.checked_add(core::mem::size_of::<(AtomicUsize, AtomicUsize)>())? + size.checked_add(core::mem::size_of::<(AtomicUsize, AtomicUsize, AtomicUsize)>())? } else { // Static SharedArrayBuffer - size.checked_add(core::mem::size_of::())? + size.checked_add(core::mem::size_of::<(AtomicUsize, AtomicUsize)>())? }; let Ok(layout) = Layout::from_size_align(alloc_size, 8) else { return None; @@ -633,23 +779,26 @@ impl SharedDataBlock { // SAFETY: properly allocated, everything is fine. unsafe { base_ptr.write(byte_length) }; // SAFETY: allocation size is - // (AtomicUsize, AtomicUsize, [AtomicU8; max_byte_length]) - unsafe { base_ptr.add(1) } + // (AtomicUsize, AtomicUsize, AtomicUsize, [AtomicU8; max_byte_length]) + // Skip byte_length and waiters to reach rc. + unsafe { base_ptr.add(2) } } else { - base_ptr + // Skip waiters to reach rc. + unsafe { base_ptr.add(1) } }; { // SAFETY: we're the only owner of this data. unsafe { rc_ptr.write(1) }; } - // SAFETY: the pointer is len + usize + + // SAFETY: ptr is past waiters_ptr and rc let ptr = unsafe { rc_ptr.add(1) }; // SAFETY: ptr does point to size bytes of readable and writable // Rust memory. After this call, that memory is deallocated and we // receive a new RacyMemory in its stead. Reads and writes through // it are undefined behaviour. Note though that we still have the - // RC and possible length values before the pointer; those are in - // normal Rust memory. + // Waiters pointer, RC, and possible length values before the + // pointer; those are in normal Rust memory. let ptr = unsafe { RacyMemory::::enter(ptr.cast(), size) }; Some(Self { ptr: ptr.as_slice().into_raw_parts().0, @@ -673,7 +822,7 @@ impl SharedDataBlock { /// /// Must not be a dangling SharedDataBlock. unsafe fn get_rc(&self) -> &AtomicUsize { - // SAFETY: type guarantees layout + // SAFETY: type guarantees layout; rc is 1 slot before ptr. unsafe { self.ptr.as_ptr().cast::().sub(1).as_ref() } } @@ -684,7 +833,7 @@ impl SharedDataBlock { /// Must be a growable, non-dangling SharedDataBlock. unsafe fn get_byte_length(&self) -> &AtomicUsize { // SAFETY: caller guarantees growable; type guarantees layout. - unsafe { self.ptr.as_ptr().cast::().sub(2).as_ref() } + unsafe { self.ptr.as_ptr().cast::().sub(3).as_ref() } } /// Returns the byte length of the SharedArrayBuffer. @@ -731,6 +880,86 @@ impl SharedDataBlock { self.max_byte_length.is_growable() } + /// Get a reference to the atomic waiters pointer. + /// + /// ## Safety + /// + /// Must not be a dangling SharedDataBlock. + unsafe fn get_waiters_ptr(&self) -> &AtomicPtr> { + // SAFETY: type guarantees layout; waiters_ptr is 2 slots before ptr. + unsafe { + self.ptr + .as_ptr() + .cast::>>() + .sub(2) + .as_ref() + } + } + + /// Get or lazily initialize the shared waiter map for this data block. + /// + /// On first call, allocates a new `SharedWaiterMap` and attempts to + /// store it via compare-and-swap. If another thread wins the race, + /// the locally allocated map is dropped and the winner's map is used. + /// + /// ## Safety + /// + /// Must not be a dangling SharedDataBlock. + pub(crate) unsafe fn get_or_init_waiters(&self) -> &Mutex { + // SAFETY: caller guarantees non-dangling. + let waiters_atomic = unsafe { self.get_waiters_ptr() }; + let current = waiters_atomic.load(Ordering::Acquire); + if !current.is_null() { + // SAFETY: non-null means it was previously initialized; the + // buffer RC keeps the allocation alive. + return unsafe { &*current }; + } + + let new_map = Box::into_raw(Box::default()); + match waiters_atomic.compare_exchange( + core::ptr::null_mut(), + new_map, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // We won the race; our map is now the canonical one. + // SAFETY: we just stored it and the buffer RC keeps it alive. + unsafe { &*new_map } + } + Err(winner) => { + // Another thread already initialized the waiters pointer. + // Drop our allocation and use theirs. + // SAFETY: new_map was just allocated by us and never shared. + let _ = unsafe { Box::from_raw(new_map) }; + // SAFETY: winner is the non-null pointer stored by the + // winning thread; the buffer RC keeps it alive. + unsafe { &*winner } + } + } + } + + /// Get the shared waiter map if it has been initialized. + /// + /// Returns `None` if no thread has ever called `get_or_init_waiters` on + /// this block (i.e. no `Atomics.wait` / `Atomics.waitAsync` has occurred). + /// + /// ## Safety + /// + /// Must not be a dangling SharedDataBlock. + pub(crate) unsafe fn get_waiters(&self) -> Option<&Mutex> { + // SAFETY: caller guarantees non-dangling. + let waiters_atomic = unsafe { self.get_waiters_ptr() }; + let current = waiters_atomic.load(Ordering::Acquire); + if current.is_null() { + None + } else { + // SAFETY: non-null means it was previously initialized; the + // buffer RC keeps the allocation alive. + Some(unsafe { &*current }) + } + } + /// Read a value at the given aligned offset and with the given ordering. /// /// Returns `None` if the offset is not correctly aligned or the index is diff --git a/tests/expectations.json b/tests/expectations.json index 3c5e54df2..161c5d5d4 100644 --- a/tests/expectations.json +++ b/tests/expectations.json @@ -7065,4 +7065,4 @@ "staging/sm/syntax/yield-as-identifier.js": "FAIL", "staging/source-phase-imports/import-source-source-text-module.js": "FAIL", "staging/top-level-await/tla-hang-entry.js": "FAIL" -} \ No newline at end of file +} diff --git a/tests/metrics.json b/tests/metrics.json index 9cbff87df..a98986a42 100644 --- a/tests/metrics.json +++ b/tests/metrics.json @@ -2,10 +2,10 @@ "results": { "crash": 52, "fail": 6959, - "pass": 40341, + "pass": 40349, "skip": 3326, "timeout": 18, "unresolved": 37 }, "total": 50733 -} \ No newline at end of file +}