一个无锁队列

    科技2026-02-06  1

    在github上,发现一个非阻塞多消费者多生产者无锁队列,基于c++11实现,并且只有一个头文件,而这个头文件只有200多行代码。

    alignas、false sharing和true sharing

    alignas是c++11的一个关键字,限制内存地址的开始位置。 这里有个要注意的地方,用alignas修饰struct/class时,假设使用malloc分配了内存,在用new在此内存上构造一个对象,会出现不符合alignas要求的情况。

    struct alignas(256) A { int a_; }; int main() { void* p = malloc(sizeof(A)); A* pA = new (p) A(); /* 这里的内存开始地址就不一定符合alignas(256)要求 */ A* pB = new A(); /* 这里的内存开始地址符合alignas(256)要求 */ return 0; } /* 出现这种区别原因是因为new有多个重载版本,其中一个如下 */ void* __CRTDECL operator new( size_t _Size, std::align_val_t _Al );

    false sharing,简单来说就是当两个成员变量在同一个cache line时,其中一个成员变量被修改,会导致cache line无效,另一个成员变量就没法享受cache带来的好处了。所以有时候要避免false sharing。

    true sharing,如果一个结构体的sizeof的大小小于或等于cache line大小,那么这个结构体的成员间就存在true sharing。

    c++17提供了std::hardware_destructive_interference_size来避免false sharing,std::hardware_constructive_interference_size来检测是否存在true sharing。

    memory order

    memory_order_acquire——用于一个原子变量的load操作,所有读、写不能reorder到此load之前。并且,如果其他线程对同一个原子变量进行release的store操作,那么这个线程的所有写操作对于load操作所在的线程可见。

    memory_order_release——用于一个原子变量的store操作,所有读、写不能reorder到此store之后。并且,此store所在的线程的所有写操作对于对同一个原子变量进行acquire的load操作所在的线程可见。

    MPMCQueue

    MPMCQueue内部维护一个数组Slots,长度为Capacity,两个下标Head和Tail。Head与Tail只能向前,每次push时,Head加1;每次pop时,Tail加1。故而可以通过Head或者Tail除以数组长度得到现在是多少轮,通过Head或者Tail对数组长度求模得到数组的下标。数组Slots中每个元素都有一个Turn表明现在是多少轮,可读还是可写。还有一个T是用于保存队列的数据。

    假设数组满了的情况下,再尝试添加元素,结果是失败返回或者busy wait,不会覆写之前的数据。

    /* Copyright (c) 2020 Erik Rigtorp <erik@rigtorp.se> Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #pragma once #include <atomic> #include <cassert> #include <cstddef> // offsetof #include <limits> #include <memory> #include <new> // std::hardware_destructive_interference_size #include <stdexcept> #ifndef __cpp_aligned_new #ifdef _WIN32 #include <malloc.h> // _aligned_malloc #else #include <stdlib.h> // posix_memalign #endif #endif namespace rigtorp { namespace mpmc { #ifdef __cpp_lib_hardware_interference_size static constexpr size_t hardwareInterferenceSize = std::hardware_destructive_interference_size; #else static constexpr size_t hardwareInterferenceSize = 64; #endif #if defined(__cpp_aligned_new) template <typename T> using AlignedAllocator = std::allocator<T>; #else template <typename T> struct AlignedAllocator { using value_type = T; T *allocate(std::size_t n) { if (n > std::numeric_limits<std::size_t>::max() / sizeof(T)) { throw std::bad_array_new_length(); } #ifdef _WIN32 auto *p = static_cast<T *>(_aligned_malloc(sizeof(T) * n, alignof(T))); if (p == nullptr) { throw std::bad_alloc(); } #else T *p; if (posix_memalign(reinterpret_cast<void **>(&p), alignof(T), sizeof(T) * n) != 0) { throw std::bad_alloc(); } #endif return p; } void deallocate(T *p, std::size_t) { #ifdef WIN32 _aligned_free(p); #else free(p); #endif } }; #endif template <typename T> struct Slot { ~Slot() noexcept { if (turn & 1) { destroy(); } } template <typename... Args> void construct(Args &&... args) noexcept { static_assert(std::is_nothrow_constructible<T, Args &&...>::value, "T must be nothrow constructible with Args&&..."); new (&storage) T(std::forward<Args>(args)...); } void destroy() noexcept { static_assert(std::is_nothrow_destructible<T>::value, "T must be nothrow destructible"); reinterpret_cast<T *>(&storage)->~T(); } T &&move() noexcept { return reinterpret_cast<T &&>(storage); } // Align to avoid false sharing between adjacent slots alignas(hardwareInterferenceSize) std::atomic<size_t> turn = {0}; typename std::aligned_storage<sizeof(T), alignof(T)>::type storage; }; template <typename T, typename Allocator = AlignedAllocator<Slot<T>>> class Queue { private: static_assert(std::is_nothrow_copy_assignable<T>::value || std::is_nothrow_move_assignable<T>::value, "T must be nothrow copy or move assignable"); static_assert(std::is_nothrow_destructible<T>::value, "T must be nothrow destructible"); public: explicit Queue(const size_t capacity, const Allocator &allocator = Allocator()) : capacity_(capacity), allocator_(allocator), head_(0), tail_(0) { if (capacity_ < 1) { throw std::invalid_argument("capacity < 1"); } // Allocate one extra slot to prevent false sharing on the last slot slots_ = allocator_.allocate(capacity_ + 1); // Allocators are not required to honor alignment for over-aligned types // (see http://eel.is/c++draft/allocator.requirements#10) so we verify // alignment here if (reinterpret_cast<size_t>(slots_) % alignof(Slot<T>) != 0) { allocator_.deallocate(slots_, capacity_ + 1); throw std::bad_alloc(); } for (size_t i = 0; i < capacity_; ++i) { new (&slots_[i]) Slot<T>(); } static_assert( alignof(Slot<T>) == hardwareInterferenceSize, "Slot must be aligned to cache line boundary to prevent false sharing"); static_assert(sizeof(Slot<T>) % hardwareInterferenceSize == 0, "Slot size must be a multiple of cache line size to prevent " "false sharing between adjacent slots"); static_assert(sizeof(Queue) % hardwareInterferenceSize == 0, "Queue size must be a multiple of cache line size to " "prevent false sharing between adjacent queues"); static_assert( offsetof(Queue, tail_) - offsetof(Queue, head_) == static_cast<std::ptrdiff_t>(hardwareInterferenceSize), "head and tail must be a cache line apart to prevent false sharing"); } ~Queue() noexcept { for (size_t i = 0; i < capacity_; ++i) { slots_[i].~Slot(); } allocator_.deallocate(slots_, capacity_ + 1); } // non-copyable and non-movable Queue(const Queue &) = delete; Queue &operator=(const Queue &) = delete; template <typename... Args> void emplace(Args &&... args) noexcept { static_assert(std::is_nothrow_constructible<T, Args &&...>::value, "T must be nothrow constructible with Args&&..."); auto const head = head_.fetch_add(1); auto &slot = slots_[idx(head)]; while (turn(head) * 2 != slot.turn.load(std::memory_order_acquire)) ; slot.construct(std::forward<Args>(args)...); slot.turn.store(turn(head) * 2 + 1, std::memory_order_release); } template <typename... Args> bool try_emplace(Args &&... args) noexcept { static_assert(std::is_nothrow_constructible<T, Args &&...>::value, "T must be nothrow constructible with Args&&..."); auto head = head_.load(std::memory_order_acquire); for (;;) { auto &slot = slots_[idx(head)]; if (turn(head) * 2 == slot.turn.load(std::memory_order_acquire)) { if (head_.compare_exchange_strong(head, head + 1)) { slot.construct(std::forward<Args>(args)...); slot.turn.store(turn(head) * 2 + 1, std::memory_order_release); return true; } } else { auto const prevHead = head; head = head_.load(std::memory_order_acquire); if (head == prevHead) { return false; } } } } void push(const T &v) noexcept { static_assert(std::is_nothrow_copy_constructible<T>::value, "T must be nothrow copy constructible"); emplace(v); } template <typename P, typename = typename std::enable_if< std::is_nothrow_constructible<T, P &&>::value>::type> void push(P &&v) noexcept { emplace(std::forward<P>(v)); } bool try_push(const T &v) noexcept { static_assert(std::is_nothrow_copy_constructible<T>::value, "T must be nothrow copy constructible"); return try_emplace(v); } template <typename P, typename = typename std::enable_if< std::is_nothrow_constructible<T, P &&>::value>::type> bool try_push(P &&v) noexcept { return try_emplace(std::forward<P>(v)); } void pop(T &v) noexcept { auto const tail = tail_.fetch_add(1); auto &slot = slots_[idx(tail)]; while (turn(tail) * 2 + 1 != slot.turn.load(std::memory_order_acquire)) ; v = slot.move(); slot.destroy(); slot.turn.store(turn(tail) * 2 + 2, std::memory_order_release); } bool try_pop(T &v) noexcept { auto tail = tail_.load(std::memory_order_acquire); for (;;) { auto &slot = slots_[idx(tail)]; if (turn(tail) * 2 + 1 == slot.turn.load(std::memory_order_acquire)) { if (tail_.compare_exchange_strong(tail, tail + 1)) { v = slot.move(); slot.destroy(); slot.turn.store(turn(tail) * 2 + 2, std::memory_order_release); return true; } } else { auto const prevTail = tail; tail = tail_.load(std::memory_order_acquire); if (tail == prevTail) { return false; } } } } private: constexpr size_t idx(size_t i) const noexcept { return i % capacity_; } constexpr size_t turn(size_t i) const noexcept { return i / capacity_; } private: const size_t capacity_; Slot<T> *slots_; #if defined(__has_cpp_attribute) && __has_cpp_attribute(no_unique_address) Allocator allocator_ [[no_unique_address]]; #else Allocator allocator_; #endif // Align to avoid false sharing between head_ and tail_ alignas(hardwareInterferenceSize) std::atomic<size_t> head_; alignas(hardwareInterferenceSize) std::atomic<size_t> tail_; }; } // namespace mpmc template <typename T, typename Allocator = mpmc::AlignedAllocator<mpmc::Slot<T>>> using MPMCQueue = mpmc::Queue<T, Allocator>; } // namespace rigtorp
    Processed: 0.034, SQL: 12