diff --git a/CMakeLists.txt b/CMakeLists.txt index 24dbb37..27f5779 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -19,6 +19,7 @@ target_include_directories(boost_lockfree INTERFACE include) if (CMAKE_VERSION VERSION_GREATER_EQUAL 3.23 AND BOOST_LOCKFREE_USE_FILE_SET) set(Headers + include/boost/lockfree/mpsc_queue.hpp include/boost/lockfree/spsc_queue.hpp include/boost/lockfree/spsc_value.hpp include/boost/lockfree/policies.hpp diff --git a/doc/lockfree.qbk b/doc/lockfree.qbk index 9c5ef30..d8a2d8b 100644 --- a/doc/lockfree.qbk +++ b/doc/lockfree.qbk @@ -1,7 +1,7 @@ [library Boost.Lockfree [quickbook 1.4] [authors [Blechmann, Tim]] - [copyright 2008-2011 Tim Blechmann] + [copyright 2008-2026 Tim Blechmann] [category algorithms] [purpose lockfree concurrent data structures @@ -115,15 +115,19 @@ lock-freedom: [h2 Data Structures] -_lockfree_ implements four lock-free data structures: +_lockfree_ implements five lock-free data structures: [variablelist [[[classref boost::lockfree::queue]] - [a lock-free multi-producer/multi-consumer queue] + [a lock-free multi-producer/multi-consumer queue (Based on Michael & Scott's algorithm)] + ] + + [[[classref boost::lockfree::mpsc_queue]] + [a multi-producer/single-consumer queue (Based on Dmitry Vyukov's algorithm)] ] [[[classref boost::lockfree::stack]] - [a lock-free multi-producer/multi-consumer stack] + [a lock-free multi-producer/multi-consumer stack (Based on Treiber's algorithm)] ] [[[classref boost::lockfree::spsc_queue]] @@ -278,6 +282,7 @@ _lockfree_ requires a c++14 compliant compiler. Users of MSVC are strongly recom # [@http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.37.3574 Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms by Michael Scott and Maged Michael], In Symposium on Principles of Distributed Computing, pages 267–275, 1996. # [@http://books.google.com/books?id=pFSwuqtJgxYC M. Herlihy & Nir Shavit. 
The Art of Multiprocessor Programming], Morgan Kaufmann Publishers, 2008 +# [@http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue Multiple Producer Single Consumer Lock-Free Queue by Dmitry Vyukov], 2010 [endsect] diff --git a/include/boost/lockfree/detail/freelist.hpp b/include/boost/lockfree/detail/freelist.hpp index 15c2e37..e392d83 100644 --- a/include/boost/lockfree/detail/freelist.hpp +++ b/include/boost/lockfree/detail/freelist.hpp @@ -520,6 +520,14 @@ class fixed_size_freelist : NodeStorage deallocate< ThreadSafe >( index ); } + template < bool ThreadSafe > + void destruct( index_t index ) + { + T* n = NodeStorage::nodes() + index; + n->~T(); + deallocate< ThreadSafe >( index ); + } + template < bool ThreadSafe > void destruct( T* n ) { @@ -673,6 +681,81 @@ using select_freelist_t = typename select_freelist< T, Alloc, IsCompileTimeSized //---------------------------------------------------------------------------------------------------------------------- +template < typename T, typename Alloc = std::allocator< T > > +class alignas( cacheline_bytes ) direct_allocator : Alloc +{ +public: + typedef T* index_t; + typedef T* tagged_node_handle; + + template < typename Allocator > + direct_allocator( Allocator const& alloc, std::size_t = 0 ) : + Alloc( alloc ) + {} + + template < bool ThreadSafe > + void reserve( std::size_t ) + {} + + template < bool ThreadSafe, bool Bounded > + T* construct( void ) + { + T* node = Alloc::allocate( 1 ); + if ( node ) + new ( node ) T(); + return node; + } + + template < bool ThreadSafe, bool Bounded, typename ArgumentType > + T* construct( ArgumentType&& arg ) + { + T* node = Alloc::allocate( 1 ); + if ( node ) + new ( node ) T( std::forward< ArgumentType >( arg ) ); + return node; + } + + template < bool ThreadSafe, bool Bounded, typename ArgumentType1, typename ArgumentType2 > + T* construct( ArgumentType1&& arg1, ArgumentType2&& arg2 ) + { + T* node = Alloc::allocate( 1 ); + if ( node ) 
+ new ( node ) T( std::forward< ArgumentType1 >( arg1 ), std::forward< ArgumentType2 >( arg2 ) ); + return node; + } + + template < bool ThreadSafe > + void destruct( tagged_node_handle node ) + { + if ( node ) { + node->~T(); + Alloc::deallocate( node, 1 ); + } + } + + bool is_lock_free( void ) const + { + return false; + } + + T* get_handle( T* pointer ) const + { + return pointer; + } + + T* get_pointer( T* tptr ) const + { + return tptr; + } + + T* null_handle( void ) const + { + return nullptr; + } +}; + +//---------------------------------------------------------------------------------------------------------------------- + template < typename T, bool IsNodeBased > struct select_tagged_handle { diff --git a/include/boost/lockfree/detail/parameter.hpp b/include/boost/lockfree/detail/parameter.hpp index 12c42a7..608595d 100644 --- a/include/boost/lockfree/detail/parameter.hpp +++ b/include/boost/lockfree/detail/parameter.hpp @@ -82,6 +82,9 @@ template < typename bound_args, bool default_ = false > using extract_allow_multiple_reads = extract_integral_arg_or_default_t< bound_args, tag::allow_multiple_reads, bool, default_ >; +template < typename bound_args, bool default_ = false > +using extract_freelist = extract_integral_arg_or_default_t< bound_args, tag::freelist, bool, default_ >; + //---------------------------------------------------------------------------------------------------------------------- }}} // namespace boost::lockfree::detail diff --git a/include/boost/lockfree/mpsc_queue.hpp b/include/boost/lockfree/mpsc_queue.hpp new file mode 100644 index 0000000..fa2e1db --- /dev/null +++ b/include/boost/lockfree/mpsc_queue.hpp @@ -0,0 +1,671 @@ +// lock-free multi-producer/single-consumer queue +// based on Dmitry Vyukov's MPSC queue algorithm +// +// Copyright (C) 2026 Tim Blechmann +// +// Distributed under the Boost Software License, Version 1.0. 
(See +// accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#ifndef BOOST_LOCKFREE_MPSC_QUEUE_HPP_INCLUDED +#define BOOST_LOCKFREE_MPSC_QUEUE_HPP_INCLUDED + +#include +#ifdef BOOST_HAS_PRAGMA_ONCE +# pragma once +#endif + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include + + +#if defined( _MSC_VER ) +# pragma warning( push ) +# pragma warning( disable : 4324 ) // structure was padded due to __declspec(align()) +#endif + +#if defined( BOOST_INTEL ) && ( BOOST_INTEL_CXX_VERSION > 1000 ) +# pragma warning( push ) +# pragma warning( disable : 488 ) // template parameter unused in declaring parameter types +#endif + + +namespace boost { namespace lockfree { + +#ifndef BOOST_DOXYGEN_INVOKED +namespace detail { + +typedef parameter::parameters< boost::parameter::optional< tag::allocator >, + boost::parameter::optional< tag::capacity >, + boost::parameter::optional< tag::freelist >, + boost::parameter::optional< tag::fixed_sized > > + mpsc_queue_signature; + +} /* namespace detail */ +#endif + + +/** Multi-producer/single-consumer queue based on Dmitry Vyukov's MPSC algorithm. + * + * Push and pop are wait-free. The algorithm is not lock-free in the strict + * sense, though this is more a theoretical problem (see Limitations). + * + * By default, a freelist manages node memory. Freed nodes are recycled + * internally rather than returned to the OS until the queue is destroyed. + * + * \par Limitations + * A good writeup of the limitations of this algorithm is available at + * Ode to a Vyukov Queue. + * + * - \b Push atomicity: push exchanges the tail then links the previous + * node. If a producer thread terminates between these steps, the queue + * becomes inconsistent and stops delivering elements. + * - \b Non-linearizable: per-producer FIFO is preserved, but no global + * ordering across concurrent producers exists. 
pop may return false + * even if the queue is non-empty when a concurrent push is in progress. + * + * \par Policies + * - \ref boost::lockfree::freelist (default \c true):\n + * When enabled, freed nodes enter an internal freelist for reuse, + * avoiding repeated allocation. When disabled, the allocator handles + * every allocation and deallocation. Allocator operations may not be + * lock-free. + * - \ref boost::lockfree::fixed_sized (default \c false):\n + * When enabled, push never performs dynamic allocation, guaranteeing + * lock-free behavior. Nodes reside in a fixed array addressed by + * index. Capacity is limited to the index type's range (typically + * 2^16-2). This is required for lock-free operation on platforms + * lacking double-width CAS. + * - \ref boost::lockfree::capacity (optional):\n + * Sets queue size at compile time. Implies \c fixed_sized. + * - \ref boost::lockfree::allocator (default \c std::allocator). + * + * \par Requirements + * - T must be move-constructible. + * - T must be move-assignable. + * + * \note Only one consumer thread is supported. Multiple producer threads + * are supported. + */ +template < typename T, typename... Options > +#if !defined( BOOST_NO_CXX20_HDR_CONCEPTS ) + requires( std::is_move_constructible_v< T > && std::is_move_assignable_v< T > ) +#endif +class mpsc_queue +{ +private: +#ifndef BOOST_DOXYGEN_INVOKED + + typedef typename detail::mpsc_queue_signature::bind< Options... 
>::type bound_args; + + static constexpr bool use_freelist_default = true; + static constexpr bool use_freelist = detail::extract_freelist< bound_args, use_freelist_default >::value; + + static constexpr bool has_capacity = detail::extract_capacity< bound_args >::has_capacity; + static constexpr size_t capacity + = detail::extract_capacity< bound_args >::capacity + 1; // the queue uses one dummy node + static constexpr bool fixed_sized = detail::extract_fixed_sized< bound_args >::value; + static constexpr bool node_based = !use_freelist || !( has_capacity || fixed_sized ); + static constexpr bool compile_time_sized = use_freelist && has_capacity; + + static constexpr bool capacity_without_freelist = has_capacity && !use_freelist; + static_assert( capacity_without_freelist == false, + "capacity<> argument cannot be used without freelist<> argument" ); + + static constexpr bool can_reserve = use_freelist; + + struct BOOST_MAY_ALIAS node + { + typedef typename detail::select_tagged_handle< node, node_based >::handle_type handle_type; + + template < typename TagT > + node( T const& v, handle_type null_handle, TagT /*next_tag*/ ) : + data( v ), + next( null_handle ) + {} + + template < typename TagT > + node( T&& v, handle_type null_handle, TagT /*next_tag*/ ) : + data( std::move( v ) ), + next( null_handle ) + {} + + template < typename TagT > + node( handle_type null_handle, TagT /*next_tag*/ ) : + next( null_handle ) + {} + + atomic< handle_type > next; + T data; + }; + + typedef detail::extract_allocator_t< bound_args, node > node_allocator; + typedef std::conditional_t< use_freelist, + detail::select_freelist_t< node, node_allocator, compile_time_sized, fixed_sized, capacity >, + detail::direct_allocator< node, node_allocator > > + pool_t; + typedef typename pool_t::tagged_node_handle tagged_node_handle; + typedef typename detail::select_tagged_handle< node, node_based >::handle_type handle_type; + + void initialize() + { + node* n = pool.template construct< true, 
false >( pool.null_handle() ); + handle_type dummy_handle = pool.get_handle( n ); + tail_ = dummy_handle; + head_.store( dummy_handle, memory_order_release ); + } + + struct implementation_defined + { + typedef node_allocator allocator; + typedef std::size_t size_type; + }; + +#endif + +public: + typedef T value_type; + typedef typename implementation_defined::allocator allocator; + typedef typename implementation_defined::size_type size_type; + + /** + * \returns true if the implementation is lock-free. + * + * \warning Only checks whether the head, tail, and freelist can be + * modified in a lock-free manner. On most platforms the entire + * implementation is lock-free when this returns true. + */ + bool is_lock_free() const + { + return head_.is_lock_free() && pool.is_lock_free(); + } + + /** Construct a queue. + * + * Requires a capacity<> argument when freelist is enabled. + */ + mpsc_queue() +#if !defined( BOOST_NO_CXX20_HDR_CONCEPTS ) + requires( has_capacity || !use_freelist ) +#endif + : + pool( node_allocator(), capacity ) + { + BOOST_ASSERT( has_capacity || !use_freelist ); + initialize(); + } + + /** Construct a fixed-sized queue with a custom allocator. + * + * Requires a capacity<> argument when freelist is enabled. + */ +#if !defined( BOOST_NO_CXX20_HDR_CONCEPTS ) + template < typename U > + requires( has_capacity || !use_freelist ) +#else + template < typename U, typename Enabler = std::enable_if< has_capacity || !use_freelist > > +#endif + explicit mpsc_queue( typename boost::allocator_rebind< node_allocator, U >::type const& alloc ) : + pool( alloc, capacity ) + { + initialize(); + } + + /** Construct a fixed-sized queue with a custom allocator. + * + * Requires a capacity<> argument when freelist is enabled. 
+ */ +#if !defined( BOOST_NO_CXX20_HDR_CONCEPTS ) + explicit mpsc_queue( allocator const& alloc ) + requires( has_capacity || !use_freelist ) +#else + template < typename Enabler = std::enable_if< has_capacity || !use_freelist > > + explicit mpsc_queue( allocator const& alloc ) +#endif + : + pool( alloc, capacity ) + { + initialize(); + } + + /** Construct a variable-sized queue. + * + * Allocates \c n nodes initially for the freelist. + * + * Requires that no capacity<> argument is specified and that + * freelist is enabled. + */ +#if !defined( BOOST_NO_CXX20_HDR_CONCEPTS ) + explicit mpsc_queue( size_type n ) + requires( !has_capacity && use_freelist ) +#else + template < typename Enabler = std::enable_if< !has_capacity && use_freelist > > + explicit mpsc_queue( size_type n ) +#endif + : + pool( node_allocator(), n + 1 ) + { + initialize(); + } + + /** Construct a variable-sized queue with a custom allocator. + * + * Allocates \c n nodes initially for the freelist. + * + * Requires that no capacity<> argument is specified and that + * freelist is enabled. 
+ */ +#if !defined( BOOST_NO_CXX20_HDR_CONCEPTS ) + mpsc_queue( size_type n, allocator const& alloc ) + requires( !has_capacity && use_freelist ) +#else + template < typename Enabler = std::enable_if< !has_capacity && use_freelist > > + mpsc_queue( size_type n, allocator const& alloc ) +#endif + : + pool( alloc, n + 1 ) + { + initialize(); + } + + mpsc_queue( const mpsc_queue& ) = delete; + mpsc_queue& operator=( const mpsc_queue& ) = delete; + mpsc_queue( mpsc_queue&& ) = delete; + mpsc_queue& operator=( mpsc_queue&& ) = delete; + + /** \copydoc boost::lockfree::stack::reserve + * */ +#if !defined( BOOST_NO_CXX20_HDR_CONCEPTS ) + void reserve( size_type n ) + requires( can_reserve ) +#else + template < typename Enabler = std::enable_if< can_reserve > > + void reserve( size_type n ) +#endif + { + pool.template reserve< true >( n ); + } + + /** \copydoc boost::lockfree::stack::reserve_unsafe + * */ +#if !defined( BOOST_NO_CXX20_HDR_CONCEPTS ) + void reserve_unsafe( size_type n ) + requires( can_reserve ) +#else + template < typename Enabler = std::enable_if< can_reserve > > + void reserve_unsafe( size_type n ) +#endif + { + pool.template reserve< false >( n ); + } + + /** Destroys the queue and frees all nodes from the freelist. + */ + ~mpsc_queue() + { + consume_all( []( T&& ) {} ); + + pool.template destruct< false >( head_.load( memory_order_relaxed ) ); + } + + /** Check whether the queue is empty. + * + * \returns true if the queue is empty, false otherwise. + * + * \note The result is only accurate when no other thread modifies the + * queue concurrently. + */ + bool empty() const + { + return head_.load( memory_order_acquire ) == tail_; + } + + /** Pushes a value to the queue. + * + * \returns true if the value was pushed, false if node allocation failed. + * + * \note Thread-safe. Multiple producers may call push concurrently. 
+ * When the memory pool is exhausted and the pool is not fixed-sized, + * a new node is allocated via the allocator, which may not be + * lock-free. + */ + bool push( const T& t ) + { + return do_push< false >( t ); + } + + /// \copydoc boost::lockfree::mpsc_queue::push(const T & t) + bool push( T&& t ) + { + return do_push< false >( std::forward< T >( t ) ); + } + + /** Pushes a value to the queue without allocating. + * + * \returns true if the value was pushed, false if the freelist is empty. + * + * \note Thread-safe. Multiple producers may call bounded_push + * concurrently. Unlike push, bounded_push never allocates memory; + * it fails when the freelist is exhausted. + * \throws std::bad_alloc if the allocator throws during node + * construction. + */ +#if !defined( BOOST_NO_CXX20_HDR_CONCEPTS ) + bool bounded_push( const T& t ) + requires( use_freelist ) +#else + template < typename Enabler = std::enable_if< use_freelist > > + bool bounded_push( const T& t ) +#endif + { + return do_push< true >( t ); + } + + /// \copydoc boost::lockfree::mpsc_queue::bounded_push(const T & t) +#if !defined( BOOST_NO_CXX20_HDR_CONCEPTS ) + bool bounded_push( T&& t ) + requires( use_freelist ) +#else + template < typename Enabler = std::enable_if< use_freelist > > + bool bounded_push( T&& t ) +#endif + { + return do_push< true >( std::forward< T >( t ) ); + } + + +private: +#ifndef BOOST_DOXYGEN_INVOKED + template < bool Bounded > + bool do_push( T&& t ) + { + node* n = pool.template construct< true, Bounded >( std::forward< T >( t ), pool.null_handle() ); + return do_push_node( n ); + } + + template < bool Bounded > + bool do_push( T const& t ) + { + node* n = pool.template construct< true, Bounded >( t, pool.null_handle() ); + return do_push_node( n ); + } + + bool do_push_node( node* n ) + { + if ( n == nullptr ) + return false; + + handle_type node_handle = pool.get_handle( n ); + + handle_type old_head = head_.exchange( node_handle, memory_order_acq_rel ); + node* 
old_head_ptr = pool.get_pointer( old_head ); + old_head_ptr->next.store( node_handle, memory_order_release ); + return true; + } +#endif + +public: + /** Pushes a value to the queue without synchronization. + * + * \returns true if the value was pushed, false if node allocation failed. + * + * \note Not thread-safe. Only one producer thread should call + * unsynchronized_push. When the memory pool is exhausted and + * the pool is not fixed-sized, a new node is allocated via the + * allocator, which may not be lock-free. + * \throws std::bad_alloc if the allocator throws during node + * construction. + */ + bool unsynchronized_push( const T& t ) + { + return unsynchronized_push_impl( t ); + } + + /// \copydoc boost::lockfree::mpsc_queue::unsynchronized_push(const T& t) + bool unsynchronized_push( T&& t ) + { + return unsynchronized_push_impl( std::forward< T >( t ) ); + } + +private: +#ifndef BOOST_DOXYGEN_INVOKED + template < typename U > + bool unsynchronized_push_impl( U&& t ) + { + node* n = pool.template construct< false, false >( std::forward< U >( t ), pool.null_handle() ); + + if ( n == nullptr ) + return false; + + handle_type node_handle = pool.get_handle( n ); + + handle_type old_head = head_.load( memory_order_relaxed ); + node* old_head_ptr = pool.get_pointer( old_head ); + old_head_ptr->next.store( node_handle, memory_order_relaxed ); + head_.store( node_handle, memory_order_release ); + return true; + } + +#endif +public: + /** Pops an element from the queue. + * + * \returns true if an element was popped, false if the queue was empty. + * + * \note Thread-safe and wait-free. Only one consumer thread should call + * pop. The output argument may be modified even when the operation + * fails. pop may return false even if the queue is non-empty when + * a concurrent push is in progress (see Limitations). + */ + bool pop( T& ret ) + { + return pop< T >( ret ); + } + + /** Pops an element from the queue. 
+ * + * \tparam U type to receive the popped value; U must be constructible + * from T (explicit or implicit). + * \returns true if an element was popped, false if the queue was empty. + * + * \note Thread-safe and wait-free. Only one consumer thread should call + * pop. The output argument may be modified even when the operation + * fails. pop may return false even if the queue is non-empty when + * a concurrent push is in progress (see Limitations). + */ +#if !defined( BOOST_NO_CXX20_HDR_CONCEPTS ) + template < typename U > + requires( std::is_constructible_v< U, T && > ) +#else + template < typename U, typename Enabler = std::enable_if_t< std::is_constructible< U, T&& >::value > > +#endif + bool pop( U& ret ) + { + return consume_one( [ & ]( T&& arg ) { + ret = U( std::forward< T >( arg ) ); + } ); + } + +#if !defined( BOOST_NO_CXX17_HDR_OPTIONAL ) || defined( BOOST_DOXYGEN_INVOKED ) + /** Pops an element, returning a std::optional. + * + * \returns std::optional containing the value on success, + * std::nullopt if the queue is empty. + * + * \note Thread-safe and wait-free. Only one consumer thread should call + * this overload. + */ + std::optional< T > pop( uses_optional_t ) + { + T to_dequeue; + if ( pop( to_dequeue ) ) + return to_dequeue; + else + return std::nullopt; + } + + /** Pops an element, returning a std::optional. + * + * \tparam U type to receive the popped value; T must be convertible to U. + * \returns std::optional<U> containing the value on success, + * std::nullopt if the queue is empty. + * + * \note Thread-safe and wait-free. Only one consumer thread should call + * this overload. + */ + template < typename U > + std::optional< U > pop( uses_optional_t ) + { + U to_dequeue; + if ( pop( to_dequeue ) ) + return to_dequeue; + else + return std::nullopt; + } +#endif + + /** Pops an element without synchronization. + * + * \returns true if an element was popped, false if the queue was empty. + * + * \note Not thread-safe but wait-free. 
Only one consumer thread should + * call unsynchronized_pop. The output argument may be modified even + * when the operation fails. + */ + bool unsynchronized_pop( T& ret ) + { + return unsynchronized_pop< T >( ret ); + } + + /** Pops an element without synchronization. + * + * \tparam U type to receive the popped value; U must be constructible + * from T (explicit or implicit). + * \returns true if an element was popped, false if the queue was empty. + * + * \note Not thread-safe but wait-free. Only one consumer thread should + * call unsynchronized_pop. + */ +#if !defined( BOOST_NO_CXX20_HDR_CONCEPTS ) + template < typename U > + requires( std::is_constructible_v< U, T && > ) +#else + template < typename U, typename Enabler = std::enable_if_t< std::is_constructible< U, T&& >::value > > +#endif + bool unsynchronized_pop( U& ret ) + { + return unsynchronized_consume_one( [ & ]( T&& arg ) { + ret = U( std::forward< T >( arg ) ); + } ); + } + + /** Consumes one element via a functor. + * + * Pops one element and applies the functor to it. + * + * \returns true if an element was consumed. + * + * \note Thread-safe and wait-free when the functor is thread-safe and + * non-blocking. Only one consumer thread should call this. + */ + template < typename Functor > + bool consume_one( Functor&& f ) + { + handle_type tail = tail_; + node* tail_ptr = pool.get_pointer( tail ); + handle_type next = tail_ptr->next.load( memory_order_acquire ); + node* next_ptr = pool.get_pointer( next ); + + if ( next_ptr == nullptr ) { + // Nothing is linked behind the dummy node: the queue is either empty + // or a producer has exchanged head_ but not yet published its node. + // Both cases are reported as "no element", so head_ is deliberately + // not loaded here; an idle consumer never touches the producers' atomic. + return false; + } + + f( std::move( next_ptr->data ) ); + + tail_ = next; + pool.template destruct< true >( tail ); + return true; + } + + /** Consumes one element via a functor without synchronization. + * + * Pops one element and applies the functor to it. + * + * \returns true if an element was consumed. + * + * \note Not thread-safe but wait-free. 
Only one consumer thread should + * call this. + */ + template < typename Functor > + bool unsynchronized_consume_one( Functor&& f ) + { + handle_type tail = tail_; + node* tail_ptr = pool.get_pointer( tail ); + handle_type next = tail_ptr->next.load( memory_order_relaxed ); + node* next_ptr = pool.get_pointer( next ); + + if ( next_ptr == nullptr ) { + // No node is linked behind the dummy, so there is nothing to + // consume. head_ is not consulted: the previous load only fed two + // identical "return false" branches, so the outcome is the same + // without it. + return false; + } + + f( std::move( next_ptr->data ) ); + + tail_ = next; + pool.template destruct< false >( tail ); + return true; + } + + /** Consumes all elements via a functor. + * + * Sequentially pops all elements and applies the functor to each. + * + * \returns the number of elements consumed. + * + * \note Thread-safe and wait-free when the functor is thread-safe and + * non-blocking. Only one consumer thread should call this. + */ + template < typename Functor > + size_t consume_all( Functor&& f ) + { + size_t element_count = 0; + while ( consume_one( f ) ) + element_count += 1; + + return element_count; + } + +private: +#ifndef BOOST_DOXYGEN_INVOKED + atomic< handle_type > head_ {}; + alignas( detail::cacheline_bytes ) handle_type tail_ {}; + + pool_t pool; +#endif +}; + +}} // namespace boost::lockfree + +#if ( defined( BOOST_INTEL ) && ( BOOST_INTEL_CXX_VERSION > 1000 ) ) || defined( _MSC_VER ) +# pragma warning( pop ) +#endif + +#endif /* BOOST_LOCKFREE_MPSC_QUEUE_HPP_INCLUDED */ diff --git a/include/boost/lockfree/policies.hpp b/include/boost/lockfree/policies.hpp index 037efb5..faab84c 100644 --- a/include/boost/lockfree/policies.hpp +++ b/include/boost/lockfree/policies.hpp @@ -26,6 +26,7 @@ struct allocator; struct fixed_sized; struct capacity; struct allow_multiple_reads; +struct freelist; } // namespace tag template < bool IsFixedSized > @@ -45,6 +46,10 @@ struct allow_multiple_reads : boost::parameter::template_keyword< tag::allow_multiple_reads, std::integral_constant< bool, AllowMultipleReads > > {}; 
+template < bool UseFreelist > +struct freelist : boost::parameter::template_keyword< tag::freelist, std::integral_constant< bool, UseFreelist > > +{}; + #else /** Configures a data structure as \b fixed-sized. @@ -77,6 +82,14 @@ struct allocator; template < bool AllowMultipleReads > struct allow_multiple_reads; +/** Configures the mpsc_queue to use a freelist. + * + * The mpsc_queue can be configured to use a freelist for memory management. + * + * */ +template < bool UseFreelist > +struct freelist; + #endif }} // namespace boost::lockfree diff --git a/include/boost/lockfree/queue.hpp b/include/boost/lockfree/queue.hpp index f04d96a..84da032 100644 --- a/include/boost/lockfree/queue.hpp +++ b/include/boost/lockfree/queue.hpp @@ -120,9 +120,7 @@ class queue next( tagged_node_handle( null_handle, next_tag ) ) {} - template < typename TagT > - node( TagT /*next_tag*/ ) - {} + node() = delete; alignas( detail::cacheline_bytes ) atomic< tagged_node_handle > next; T data; diff --git a/include/boost/lockfree/stack.hpp b/include/boost/lockfree/stack.hpp index 43512e2..fc52f88 100644 --- a/include/boost/lockfree/stack.hpp +++ b/include/boost/lockfree/stack.hpp @@ -93,6 +93,8 @@ class stack v( std::forward< T >( val ) ) {} + node() = delete; + typedef typename detail::select_tagged_handle< node, node_based >::handle_type handle_t; handle_t next; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 728fa2f..075d9ed 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -22,6 +22,9 @@ endif() set(Tests destructor_test freelist_test + mpsc_queue_comprehensive_stress_test + mpsc_queue_stress_test + mpsc_queue_test queue_bounded_stress_test queue_comprehensive_stress_test queue_fixedsize_stress_test diff --git a/test/mpsc_queue_comprehensive_stress_test.cpp b/test/mpsc_queue_comprehensive_stress_test.cpp new file mode 100644 index 0000000..cfb43ac --- /dev/null +++ b/test/mpsc_queue_comprehensive_stress_test.cpp @@ -0,0 +1,29 @@ +// Copyright (C) 2026 Tim 
Blechmann +// +// Distributed under the Boost Software License, Version 1.0. (See +// accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include + +#define BOOST_TEST_MAIN +#ifdef BOOST_LOCKFREE_INCLUDE_TESTS +# include +#else +# include +#endif + +#include "test_common.hpp" + +namespace { + +using comprehensive_stress_tester_mpsc = comprehensive_stress_tester< 10, 1 >; + +} // namespace + +BOOST_AUTO_TEST_CASE( mpsc_queue_comprehensive_stress_1_consumer ) +{ + comprehensive_stress_tester_mpsc tester; + boost::lockfree::mpsc_queue< int > q( 128 ); + tester.run( q ); +} diff --git a/test/mpsc_queue_stress_test.cpp b/test/mpsc_queue_stress_test.cpp new file mode 100644 index 0000000..d8ba535 --- /dev/null +++ b/test/mpsc_queue_stress_test.cpp @@ -0,0 +1,184 @@ +// Copyright (C) 2024 Tim Blechmann +// +// Distributed under the Boost Software License, Version 1.0. (See +// accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include + +#define BOOST_TEST_MAIN +#ifdef BOOST_LOCKFREE_INCLUDE_TESTS +# include +#else +# include +#endif + +#include "test_common.hpp" +#include "test_helpers.hpp" + +#include +#include +#include +#include +#ifdef __VXWORKS__ +# include +#endif + +#include +#include + +namespace impl { + +template < bool Bounded = false > +struct mpsc_queue_stress_tester +{ + static const unsigned int buckets = 1 << 13; +#ifndef BOOST_LOCKFREE_STRESS_TEST + static const long node_count = 5000; +#else + static const long node_count = 5000000; +#endif + const int writer_threads; + + std::atomic< int > writers_finished; + + static_hashed_set< long, buckets > data; + static_hashed_set< long, buckets > dequeued; + std::array< std::set< long >, buckets > returned; + + std::atomic< int > push_count, pop_count; + + explicit mpsc_queue_stress_tester( int writer ) : + writer_threads( writer ), + push_count( 0 ), + pop_count( 0 ) + {} + + template < typename queue > + void add_items( queue& q ) 
+ { + for ( long i = 0; i != node_count; ++i ) { + long id = generate_id< long >(); + + bool inserted = data.insert( id ); + assert( inserted ); + (void)inserted; + + if ( Bounded ) + while ( q.bounded_push( id ) == false ) { +#ifdef __VXWORKS__ + std::this_thread::yield(); +#endif + } + else + while ( q.push( id ) == false ) { +#ifdef __VXWORKS__ + std::this_thread::yield(); +#endif + } + ++push_count; + } + writers_finished += 1; + } + + std::atomic< bool > running; + + template < typename queue > + bool consume_element( queue& q ) + { + long id; + bool ret = q.pop( id ); + + if ( !ret ) + return false; + + bool erased = data.erase( id ); + bool inserted = dequeued.insert( id ); + (void)erased; + (void)inserted; + assert( erased ); + assert( inserted ); + ++pop_count; + return true; + } + + template < typename queue > + void get_items( queue& q ) + { + for ( ;; ) { + bool received_element = consume_element( q ); + if ( received_element ) + continue; + + if ( writers_finished.load() == writer_threads ) + break; + +#ifdef __VXWORKS__ + std::this_thread::yield(); +#endif + } + + while ( consume_element( q ) ) + ; + } + + template < typename queue > + void run( queue& q ) + { + BOOST_WARN( q.is_lock_free() ); + writers_finished.store( 0 ); + + boost::thread_group writer; + boost::thread reader; + + BOOST_TEST_REQUIRE( q.empty() ); + + reader = boost::thread( [ & ] { + get_items( q ); + } ); + + for ( int i = 0; i != writer_threads; ++i ) + writer.create_thread( [ & ] { + add_items( q ); + } ); + + std::cout << "threads created" << std::endl; + + writer.join_all(); + + std::cout << "writer threads joined, waiting for readers" << std::endl; + + reader.join(); + + std::cout << "reader thread joined" << std::endl; + + BOOST_TEST_REQUIRE( data.count_nodes() == (size_t)0 ); + BOOST_TEST_REQUIRE( q.empty() ); + + BOOST_TEST_REQUIRE( push_count == pop_count ); + BOOST_TEST_REQUIRE( push_count == writer_threads * node_count ); + } +}; + +} // namespace impl + +using 
impl::mpsc_queue_stress_tester; + + +BOOST_AUTO_TEST_CASE( mpsc_queue_test_unbounded ) +{ + typedef mpsc_queue_stress_tester< false > tester_type; + std::unique_ptr< tester_type > tester( new tester_type( 1 ) ); + + boost::lockfree::mpsc_queue< long > q( 128 ); + tester->run( q ); +} + +BOOST_AUTO_TEST_CASE( mpsc_queue_test_unbounded_many_writers ) +{ + typedef mpsc_queue_stress_tester< false > tester_type; + std::unique_ptr< tester_type > tester( new tester_type( 4 ) ); + + boost::lockfree::mpsc_queue< long > q( 128 ); + tester->run( q ); +} diff --git a/test/mpsc_queue_test.cpp b/test/mpsc_queue_test.cpp new file mode 100644 index 0000000..9671296 --- /dev/null +++ b/test/mpsc_queue_test.cpp @@ -0,0 +1,410 @@ +// Copyright (C) 2024 Tim Blechmann +// +// Distributed under the Boost Software License, Version 1.0. (See +// accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include + +#include + +#define BOOST_TEST_MAIN +#ifdef BOOST_LOCKFREE_INCLUDE_TESTS +# include +#else +# include +#endif + +#include + + +using namespace boost::lockfree; + +BOOST_AUTO_TEST_CASE( simple_mpsc_queue_test ) +{ + mpsc_queue< int > f( 64 ); + + 
BOOST_TEST_WARN( f.is_lock_free() ); + BOOST_TEST_REQUIRE( f.empty() ); + + int i1( 0 ), i2( 0 ); + + f.unsynchronized_push( 1 ); + f.unsynchronized_push( 2 ); + + BOOST_TEST_REQUIRE( f.unsynchronized_pop( i1 ) ); + BOOST_TEST_REQUIRE( i1 == 1 ); + + BOOST_TEST_REQUIRE( f.unsynchronized_pop( i2 ) ); + BOOST_TEST_REQUIRE( i2 == 2 ); + BOOST_TEST_REQUIRE( f.empty() ); +} + + +BOOST_AUTO_TEST_CASE( mpsc_queue_consume_one_test ) +{ + mpsc_queue< int > f( 64 ); + + BOOST_TEST_WARN( f.is_lock_free() ); + BOOST_TEST_REQUIRE( f.empty() ); + + f.push( 1 ); + f.push( 2 ); + + bool success1 = f.consume_one( []( int i ) { + BOOST_TEST_REQUIRE( i == 1 ); + } ); + + bool success2 = f.consume_one( []( int i ) mutable { + BOOST_TEST_REQUIRE( i == 2 ); + } ); + + BOOST_TEST_REQUIRE( success1 ); + BOOST_TEST_REQUIRE( success2 ); + + BOOST_TEST_REQUIRE( f.empty() ); +} + +BOOST_AUTO_TEST_CASE( mpsc_queue_consume_all_test ) +{ + mpsc_queue< int > f( 64 ); + + BOOST_TEST_WARN( f.is_lock_free() ); + BOOST_TEST_REQUIRE( f.empty() ); + + f.push( 1 ); + f.push( 2 ); + + size_t consumed = f.consume_all( []( int i ) {} ); + + BOOST_TEST_REQUIRE( consumed == 2u ); + + BOOST_TEST_REQUIRE( f.empty() ); +} + + +BOOST_AUTO_TEST_CASE( mpsc_queue_convert_pop_test ) +{ + mpsc_queue< int* > f( 128 ); + BOOST_TEST_REQUIRE( f.empty() ); + f.push( new int( 1 ) ); + f.push( new int( 2 ) ); + f.push( new int( 3 ) ); + f.push( new int( 4 ) ); + + { + int* i1; + + BOOST_TEST_REQUIRE( f.pop( i1 ) ); + BOOST_TEST_REQUIRE( *i1 == 1 ); + delete i1; + } + + + { + boost::shared_ptr< int > i2; + BOOST_TEST_REQUIRE( f.pop( i2 ) ); + BOOST_TEST_REQUIRE( *i2 == 2 ); + } + + { + std::unique_ptr< int > i3; + BOOST_TEST_REQUIRE( f.pop( i3 ) ); + + BOOST_TEST_REQUIRE( *i3 == 3 ); + } + + { + std::shared_ptr< int > i4; + BOOST_TEST_REQUIRE( f.pop( i4 ) ); + + BOOST_TEST_REQUIRE( *i4 == 4 ); + } + + + BOOST_TEST_REQUIRE( f.empty() ); +} + +BOOST_AUTO_TEST_CASE( reserve_test ) +{ + typedef boost::lockfree::mpsc_queue< void* 
> memory_queue; + + memory_queue ms( 1 ); + ms.reserve( 1 ); + ms.reserve_unsafe( 1 ); +} + +BOOST_AUTO_TEST_CASE( mpsc_queue_with_allocator ) +{ + using allocator_type = std::allocator< char >; + + using queue_t = boost::lockfree::mpsc_queue< char, boost::lockfree::allocator< allocator_type > >; + using queue_with_capacity_t + = boost::lockfree::mpsc_queue< char, boost::lockfree::allocator< allocator_type >, boost::lockfree::capacity< 16 > >; + + auto allocator = queue_t::allocator {}; + + { + queue_with_capacity_t q_with_allocator { + allocator, + }; + queue_t q_with_size_and_allocator { + 5, + allocator, + }; + } + { + queue_with_capacity_t q_with_allocator { + allocator_type {}, + }; + queue_t q_with_size_and_allocator { + 5, + allocator_type {}, + }; + } +} + +BOOST_AUTO_TEST_CASE( move_semantics ) +{ + boost::lockfree::mpsc_queue< int, boost::lockfree::capacity< 128 > > stk; + + stk.push( 0 ); + stk.push( 1 ); + + auto two = 2; + stk.push( std::move( two ) ); + + int out; + BOOST_TEST_REQUIRE( stk.pop( out ) ); + BOOST_TEST_REQUIRE( out == 0 ); + + stk.consume_one( []( int one ) { + BOOST_TEST_REQUIRE( one == 1 ); + } ); + + stk.consume_all( []( int ) {} ); +} + +#if !defined( BOOST_NO_CXX17_HDR_OPTIONAL ) + +BOOST_AUTO_TEST_CASE( mpsc_queue_uses_optional ) +{ + boost::lockfree::mpsc_queue< int > stk( 5 ); + + bool pop_to_nullopt = stk.pop( boost::lockfree::uses_optional ) == std::nullopt; + BOOST_TEST_REQUIRE( pop_to_nullopt ); + + stk.push( 53 ); + bool pop_to_optional = stk.pop( boost::lockfree::uses_optional ) == 53; + BOOST_TEST_REQUIRE( pop_to_optional ); +} + +BOOST_AUTO_TEST_CASE( mpsc_queue_uses_optional_capacity ) +{ + boost::lockfree::mpsc_queue< int, boost::lockfree::capacity< 64 > > stk; + + bool pop_to_nullopt = stk.pop( boost::lockfree::uses_optional ) == std::nullopt; + BOOST_TEST_REQUIRE( pop_to_nullopt ); + + stk.push( 53 ); + bool pop_to_optional = stk.pop( boost::lockfree::uses_optional ) == 53; + BOOST_TEST_REQUIRE( pop_to_optional ); +} 
+ +#endif /* !defined( BOOST_NO_CXX17_HDR_OPTIONAL ) */ + +BOOST_AUTO_TEST_CASE( fixed_size_mpsc_queue_test_exhausted ) /* With capacity< 2 > and freelist< true >, two push calls must succeed and the third must fail; pop then yields 1, 2 and fails on the third call. */ +{ + mpsc_queue< int, capacity< 2 >, freelist< true > > f; + + BOOST_TEST_REQUIRE( f.push( 1 ) ); + BOOST_TEST_REQUIRE( f.push( 2 ) ); + BOOST_TEST_REQUIRE( !f.push( 3 ) ); + + int out; + BOOST_TEST_REQUIRE( f.pop( out ) ); + BOOST_TEST_REQUIRE( out == 1 ); + BOOST_TEST_REQUIRE( f.pop( out ) ); + BOOST_TEST_REQUIRE( out == 2 ); + BOOST_TEST_REQUIRE( !f.pop( out ) ); + BOOST_TEST_REQUIRE( f.empty() ); +} + +BOOST_AUTO_TEST_CASE( bounded_mpsc_queue_test_exhausted ) /* Same expectations using bounded_push on a freelist< true > queue built with 2. */ +{ + mpsc_queue< int, freelist< true > > f( 2 ); + + BOOST_TEST_REQUIRE( f.bounded_push( 1 ) ); + BOOST_TEST_REQUIRE( f.bounded_push( 2 ) ); + BOOST_TEST_REQUIRE( !f.bounded_push( 3 ) ); + + int out; + BOOST_TEST_REQUIRE( f.pop( out ) ); + BOOST_TEST_REQUIRE( out == 1 ); + BOOST_TEST_REQUIRE( f.pop( out ) ); + BOOST_TEST_REQUIRE( out == 2 ); + BOOST_TEST_REQUIRE( !f.pop( out ) ); + BOOST_TEST_REQUIRE( f.empty() ); +} + +BOOST_AUTO_TEST_CASE( mpsc_queue_unsynchronized_push_const_ref ) /* unsynchronized_push is fed the const int lvalues 42 and 43; unsynchronized_pop must return them in that order. */ +{ + mpsc_queue< int > f( 64 ); + + BOOST_TEST_REQUIRE( f.empty() ); + + const int a = 42; + const int b = 43; + + f.unsynchronized_push( a ); + f.unsynchronized_push( b ); + + int i1( 0 ), i2( 0 ); + BOOST_TEST_REQUIRE( f.unsynchronized_pop( i1 ) ); + BOOST_TEST_REQUIRE( i1 == 42 ); + BOOST_TEST_REQUIRE( f.unsynchronized_pop( i2 ) ); + BOOST_TEST_REQUIRE( i2 == 43 ); + BOOST_TEST_REQUIRE( f.empty() ); +} + +BOOST_AUTO_TEST_CASE( mpsc_queue_consume_one_capacity_test ) /* On a capacity< 64 > queue consume_one must deliver 10, then 20, then return false once the queue is empty. */ +{ + mpsc_queue< int, capacity< 64 > > f; + + BOOST_TEST_REQUIRE( f.empty() ); + + f.push( 10 ); + f.push( 20 ); + + bool success1 = f.consume_one( []( int i ) { + BOOST_TEST_REQUIRE( i == 10 ); + } ); + + bool success2 = f.consume_one( []( int i ) { + BOOST_TEST_REQUIRE( i == 20 ); + } ); + + BOOST_TEST_REQUIRE( success1 ); + BOOST_TEST_REQUIRE( success2 ); + BOOST_TEST_REQUIRE( !f.consume_one( []( int ) {} ) ); + BOOST_TEST_REQUIRE( f.empty() ); +} + +BOOST_AUTO_TEST_CASE( 
mpsc_queue_consume_all_capacity_test ) /* consume_all must return 3 after three pushes on a capacity< 64 > queue and leave it empty. */ +{ + mpsc_queue< int, capacity< 64 > > f; + + BOOST_TEST_REQUIRE( f.empty() ); + + f.push( 1 ); + f.push( 2 ); + f.push( 3 ); + + size_t consumed = f.consume_all( []( int ) {} ); + + BOOST_TEST_REQUIRE( consumed == 3u ); + BOOST_TEST_REQUIRE( f.empty() ); +} + +BOOST_AUTO_TEST_CASE( mpsc_queue_empty_pop_test ) /* On a queue that was never pushed to, pop, unsynchronized_pop and consume_one must return false and consume_all must return 0. */ +{ + mpsc_queue< int > f( 64 ); + + int out = 0xDEAD; + BOOST_TEST_REQUIRE( !f.pop( out ) ); + BOOST_TEST_REQUIRE( !f.unsynchronized_pop( out ) ); + BOOST_TEST_REQUIRE( !f.consume_one( []( int ) {} ) ); + BOOST_TEST_REQUIRE( f.consume_all( []( int ) {} ) == 0u ); +} + +BOOST_AUTO_TEST_CASE( mpsc_queue_push_pop_many ) /* 100 push calls must all succeed on a queue built with 64, and 100 pops must return 0..99 in order. */ +{ + mpsc_queue< int > f( 64 ); + + for ( int i = 0; i < 100; ++i ) + BOOST_TEST_REQUIRE( f.push( i ) ); + + for ( int i = 0; i < 100; ++i ) { + int out; + BOOST_TEST_REQUIRE( f.pop( out ) ); + BOOST_TEST_REQUIRE( out == i ); + } + BOOST_TEST_REQUIRE( f.empty() ); +} + +BOOST_AUTO_TEST_CASE( mpsc_queue_push_pop_many_capacity ) /* Same 100-element round trip on a capacity< 128 > queue. */ +{ + mpsc_queue< int, capacity< 128 > > f; + + for ( int i = 0; i < 100; ++i ) + BOOST_TEST_REQUIRE( f.push( i ) ); + + for ( int i = 0; i < 100; ++i ) { + int out; + BOOST_TEST_REQUIRE( f.pop( out ) ); + BOOST_TEST_REQUIRE( out == i ); + } + BOOST_TEST_REQUIRE( f.empty() ); +} + +BOOST_AUTO_TEST_CASE( move_only_types ) /* Element type is std::unique_ptr< int >: three are pushed (two temporaries, one via std::move), then retrieved by pop, consume_one and consume_all. */ +{ + boost::lockfree::mpsc_queue< std::unique_ptr< int >, boost::lockfree::capacity< 128 > > stk; + + stk.push( std::make_unique< int >( 0 ) ); + stk.push( std::make_unique< int >( 1 ) ); + + auto two = std::make_unique< int >( 2 ); + stk.push( std::move( two ) ); + + std::unique_ptr< int > out; + BOOST_TEST_REQUIRE( stk.pop( out ) ); + BOOST_TEST_REQUIRE( *out == 0 ); + + stk.consume_one( []( std::unique_ptr< int > one ) { + BOOST_TEST_REQUIRE( *one == 1 ); + } ); + + stk.consume_all( []( std::unique_ptr< int > ) {} ); +}