25#ifndef JOIN_CORE_QUEUE_HPP
26#define JOIN_CORE_QUEUE_HPP
50 static constexpr uint64_t
MAGIC = 0x9F7E3B2A8D5C4E1B;
53 alignas (64) std::atomic_uint64_t
_magic;
56 alignas (64) std::atomic_uint64_t
_head;
59 alignas (64) std::atomic_uint64_t
_tail;
71 template <
typename Type>
78 char _padding[(64 - (
sizeof (Type) % 64)) % 64];
84 template <
typename Type>
88 alignas (64) std::atomic_uint64_t
_seq;
94 char _padding[(64 - ((
sizeof (std::atomic_uint64_t) +
sizeof (Type)) % 64)) % 64];
100 template <
typename Type,
typename Slot>
111 template <
typename Type,
typename Backend>
118 template <
typename SyncPolicy>
126 template <
typename Type,
typename Backend>
134 template <
typename Type,
typename Backend,
typename SyncPolicy>
137 static_assert (std::is_trivially_copyable<Type>::value,
"type must be trivially copyable");
138 static_assert (std::is_trivially_destructible<Type>::value,
"type must be trivially destructible");
151 template <
typename... Args>
153 : _capacity (roundPow2 (capacity))
154 , _elementSize (sizeof (
Slot))
155 , _totalSize (sizeof (
QueueSync) + (_capacity * _elementSize))
156 , _backend (_totalSize,
std::forward<Args> (args)...)
157 , _segment (static_cast<
Segment*> (_backend.get ()))
159 uint64_t expected = 0;
161 if (_segment->
_sync.
_magic.compare_exchange_strong (expected, 0xFFFFFFFFFFFFFFFF,
162 std::memory_order_acq_rel))
164 _segment->_sync._head.store (0, std::memory_order_relaxed);
165 _segment->_sync._tail.store (0, std::memory_order_relaxed);
166 _segment->_sync._capacity = _capacity;
167 _segment->_sync._mask = _capacity - 1;
169 initSlots<needs_seq<SyncPolicy>::value> ();
171 _segment->_sync._magic.store (QueueSync::MAGIC, std::memory_order_release);
176 while (_segment->_sync._magic.load (std::memory_order_acquire) != QueueSync::MAGIC)
182 if (_segment->_sync._capacity != _capacity)
184 throw std::runtime_error (
"capacity mismatch");
224 int tryPush (const Type& element) noexcept
226 return SyncPolicy::tryPush (_segment, element);
235 ssize_t
tryPush (
const Type* elements,
size_t size)
noexcept
237 return SyncPolicy::tryPush (_segment, elements, size);
245 int push (
const Type& element)
noexcept
249 while (tryPush (element) == -1)
268 int push (
const Type* elements,
size_t size)
noexcept
273 while (pushed < size)
275 ssize_t n = tryPush (elements + pushed, size - pushed);
287 pushed +=
static_cast<uint64_t
> (n);
301 return SyncPolicy::tryPop (_segment, element);
310 ssize_t
tryPop (Type* elements,
size_t size)
noexcept
312 return SyncPolicy::tryPop (_segment, elements, size);
320 int pop (Type& element)
noexcept
324 while (tryPop (element) == -1)
343 int pop (Type* elements,
size_t size)
noexcept
348 while (popped < size)
350 ssize_t n = tryPop (elements + popped, size - popped);
362 popped +=
static_cast<uint64_t
> (n);
379 auto head = _segment->_sync._head.load (std::memory_order_acquire);
380 auto tail = _segment->_sync._tail.load (std::memory_order_relaxed);
394 return _segment->_sync._capacity - pending ();
407 return pending () == _segment->_sync._capacity;
420 return pending () == 0;
429 int mbind (
int numa)
const noexcept
431 return _backend.mbind (numa);
441 return _backend.mlock ();
450 static constexpr uint64_t roundPow2 (uint64_t v)
noexcept
469 template <bool NeedsSeq, typename std::enable_if<!NeedsSeq>::type* =
nullptr>
470 void initSlots () noexcept
478 template <bool NeedsSeq, typename std::enable_if<NeedsSeq>::type* =
nullptr>
479 void initSlots () noexcept
481 for (uint64_t i = 0; i < _capacity; ++i)
483 _segment->_elements[i]._seq.store (i, std::memory_order_relaxed);
488 const uint64_t _capacity = 0;
491 const uint64_t _elementSize = 0;
494 const uint64_t _totalSize = 0;
500 Segment* _segment =
nullptr;
506 template <
typename Type,
typename Backend>
528 auto& sync = segment->_sync;
529 uint64_t head = sync._head.load (std::memory_order_relaxed);
530 uint64_t tail = sync._tail.load (std::memory_order_acquire);
538 segment->_elements[head & sync._mask].data = element;
539 sync._head.store (head + 1, std::memory_order_release);
551 static ssize_t
tryPush (
Segment* segment,
const Type* elements,
size_t size)
noexcept
553 if (
JOIN_UNLIKELY (segment ==
nullptr || elements ==
nullptr || size == 0))
559 auto& sync = segment->_sync;
560 uint64_t head = sync._head.load (std::memory_order_relaxed);
561 uint64_t tail = sync._tail.load (std::memory_order_acquire);
562 uint64_t toWrite = std::min (
static_cast<uint64_t
> (size), sync._capacity - (head - tail));
570 for (uint64_t i = 0; i < toWrite; ++i)
572 segment->_elements[(head + i) & sync._mask].data = elements[i];
575 sync._head.store (head + toWrite, std::memory_order_release);
577 return static_cast<ssize_t
> (toWrite);
596 auto& sync = segment->_sync;
597 uint64_t tail = sync._tail.load (std::memory_order_relaxed);
598 uint64_t head = sync._head.load (std::memory_order_acquire);
606 element = segment->_elements[tail & sync._mask].data;
607 sync._tail.store (tail + 1, std::memory_order_release);
619 static ssize_t
tryPop (
Segment* segment, Type* elements,
size_t size)
noexcept
621 if (
JOIN_UNLIKELY (segment ==
nullptr || elements ==
nullptr || size == 0))
627 auto& sync = segment->_sync;
628 uint64_t tail = sync._tail.load (std::memory_order_relaxed);
629 uint64_t head = sync._head.load (std::memory_order_acquire);
630 uint64_t toRead = std::min (
static_cast<uint64_t
> (size), head - tail);
638 for (uint64_t i = 0; i < toRead; ++i)
640 elements[i] = segment->_elements[(tail + i) & sync._mask].data;
643 sync._tail.store (tail + toRead, std::memory_order_release);
645 return static_cast<ssize_t
> (toRead);
652 template <
typename Type,
typename Backend>
674 auto& sync = segment->_sync;
675 uint64_t head = sync._head.load (std::memory_order_relaxed);
680 auto* slot = &segment->_elements[head & sync._mask];
681 uint64_t seq = slot->_seq.load (std::memory_order_acquire);
685 if (
JOIN_LIKELY (sync._head.compare_exchange_weak (head, head + 1, std::memory_order_acquire,
686 std::memory_order_relaxed)))
688 slot->data = element;
689 slot->_seq.store (head + 1, std::memory_order_release);
701 head = sync._head.load (std::memory_order_relaxed);
713 static ssize_t
tryPush (
Segment* segment,
const Type* elements,
size_t size)
noexcept
715 if (
JOIN_UNLIKELY (segment ==
nullptr || elements ==
nullptr || size == 0))
722 auto& sync = segment->_sync;
723 uint64_t head = sync._head.load (std::memory_order_relaxed);
727 uint64_t tail = sync._tail.load (std::memory_order_acquire);
728 uint64_t toWrite = std::min (
static_cast<uint64_t
> (size), sync._capacity - (head - tail));
736 if (
JOIN_LIKELY (sync._head.compare_exchange_weak (head, head + toWrite, std::memory_order_acquire,
737 std::memory_order_relaxed)))
739 for (uint64_t i = 0; i < toWrite; ++i)
741 auto* slot = &segment->_elements[(head + i) & sync._mask];
742 slot->data = elements[i];
743 slot->_seq.store (head + i + 1, std::memory_order_release);
746 return static_cast<ssize_t
> (toWrite);
769 auto& sync = segment->_sync;
770 uint64_t tail = sync._tail.load (std::memory_order_relaxed);
771 auto* slot = &segment->_elements[tail & sync._mask];
772 uint64_t seq = slot->_seq.load (std::memory_order_acquire);
780 element = slot->data;
781 slot->_seq.store (tail + sync._capacity, std::memory_order_release);
782 sync._tail.store (tail + 1, std::memory_order_release);
794 static ssize_t
tryPop (
Segment* segment, Type* elements,
size_t size)
noexcept
796 if (
JOIN_UNLIKELY (segment ==
nullptr || elements ==
nullptr || size == 0))
802 auto& sync = segment->_sync;
803 uint64_t tail = sync._tail.load (std::memory_order_relaxed);
804 uint64_t head = sync._head.load (std::memory_order_acquire);
805 uint64_t toRead = std::min (
static_cast<uint64_t
> (size), head - tail);
808 for (uint64_t i = 0; i < toRead; ++i)
810 auto* slot = &segment->_elements[(tail + i) & sync._mask];
812 if (
JOIN_UNLIKELY (slot->_seq.load (std::memory_order_acquire) != tail + i + 1))
817 elements[i] = slot->data;
818 slot->_seq.store (tail + i + sync._capacity, std::memory_order_release);
824 sync._tail.store (tail + popped, std::memory_order_release);
825 return static_cast<ssize_t
> (popped);
836 template <
typename Type,
typename Backend>
860 static ssize_t
tryPush (
Segment* segment,
const Type* elements,
size_t size)
noexcept
881 auto& sync = segment->_sync;
882 uint64_t tail = sync._tail.load (std::memory_order_relaxed);
887 auto* slot = &segment->_elements[tail & sync._mask];
888 uint64_t seq = slot->_seq.load (std::memory_order_acquire);
890 if (seq == (tail + 1))
892 if (
JOIN_LIKELY (sync._tail.compare_exchange_weak (tail, tail + 1, std::memory_order_acquire,
893 std::memory_order_relaxed)))
895 element = slot->data;
896 slot->_seq.store (tail + sync._capacity, std::memory_order_release);
908 tail = sync._tail.load (std::memory_order_relaxed);
920 static ssize_t
tryPop (
Segment* segment, Type* elements,
size_t size)
noexcept
922 if (
JOIN_UNLIKELY (segment ==
nullptr || elements ==
nullptr || size == 0))
929 auto& sync = segment->_sync;
930 uint64_t tail = sync._tail.load (std::memory_order_relaxed);
934 uint64_t head = sync._head.load (std::memory_order_acquire);
935 uint64_t toRead = std::min (
static_cast<uint64_t
> (size), head - tail);
944 for (; ready < toRead; ++ready)
946 if (
JOIN_UNLIKELY (segment->_elements[(tail + ready) & sync._mask]._seq.load (
947 std::memory_order_acquire) != tail + ready + 1))
959 if (
JOIN_LIKELY (sync._tail.compare_exchange_weak (tail, tail + ready, std::memory_order_acquire,
960 std::memory_order_relaxed)))
962 for (uint64_t i = 0; i < ready; ++i)
964 auto* slot = &segment->_elements[(tail + i) & sync._mask];
965 elements[i] = slot->data;
966 slot->_seq.store (tail + i + sync._capacity, std::memory_order_release);
969 return static_cast<ssize_t
> (ready);
980 template <
typename Backend,
template <
typename,
typename>
class SyncPolicy>
984 template <
typename Type>
adaptive backoff strategy for busy-wait loops.
Definition backoff.hpp:45
queue base class.
Definition queue.hpp:136
int push(const Type *elements, size_t size) noexcept
push multiple elements into the ring buffer.
Definition queue.hpp:268
BasicQueue(uint64_t capacity, Args &&... args)
create instance.
Definition queue.hpp:152
typename std::conditional< needs_seq< SyncPolicy >::value, QueueSlotFull< Type >, QueueSlotLight< Type > >::type Slot
Definition queue.hpp:142
int push(const Type &element) noexcept
push element into the ring buffer.
Definition queue.hpp:245
int mlock() const noexcept
lock memory in RAM.
Definition queue.hpp:439
ssize_t tryPop(Type *elements, size_t size) noexcept
try to pop multiple elements from the ring buffer.
Definition queue.hpp:310
int tryPop(Type &element) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:299
~BasicQueue() noexcept=default
destroy queue instance.
int pop(Type *elements, size_t size) noexcept
pop multiple elements from the ring buffer.
Definition queue.hpp:343
BasicQueue(BasicQueue &&other)=delete
move constructor.
int pop(Type &element) noexcept
pop element from the ring buffer.
Definition queue.hpp:320
bool full() const noexcept
check if the ring buffer is full.
Definition queue.hpp:401
ssize_t tryPush(const Type *elements, size_t size) noexcept
try to push multiple elements into the ring buffer.
Definition queue.hpp:235
bool empty() const noexcept
check if the ring buffer is empty.
Definition queue.hpp:414
uint64_t pending() const noexcept
get the number of pending elements for reading.
Definition queue.hpp:373
uint64_t available() const noexcept
get the number of available slots for writing.
Definition queue.hpp:388
Type ValueType
Definition queue.hpp:141
BasicQueue(const BasicQueue &other)=delete
copy constructor.
Definition acceptor.hpp:32
std::error_code make_error_code(join::Errc code) noexcept
Create an std::error_code object.
Definition error.cpp:150
multiple producer multiple consumer ring buffer.
Definition queue.hpp:838
static ssize_t tryPop(Segment *segment, Type *elements, size_t size) noexcept
try to pop multiple elements from the ring buffer.
Definition queue.hpp:920
static int tryPop(Segment *segment, Type &element) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:871
static ssize_t tryPush(Segment *segment, const Type *elements, size_t size) noexcept
try to push multiple elements into the ring buffer.
Definition queue.hpp:860
static int tryPush(Segment *segment, const Type &element) noexcept
try to push element into the ring buffer.
Definition queue.hpp:848
multiple producer single consumer ring buffer.
Definition queue.hpp:654
static ssize_t tryPop(Segment *segment, Type *elements, size_t size) noexcept
try to pop multiple elements from the ring buffer.
Definition queue.hpp:794
static int tryPop(Segment *segment, Type &element) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:759
static int tryPush(Segment *segment, const Type &element) noexcept
try to push element into the ring buffer.
Definition queue.hpp:664
static ssize_t tryPush(Segment *segment, const Type *elements, size_t size) noexcept
try to push multiple elements into the ring buffer.
Definition queue.hpp:713
queue memory segment.
Definition queue.hpp:102
Slot _elements[]
flexible array of queue slots.
Definition queue.hpp:107
QueueSync _sync
synchronization primitives.
Definition queue.hpp:104
full queue slot used by MPSC/MPMC.
Definition queue.hpp:86
Type data
stored element data.
Definition queue.hpp:91
std::atomic_uint64_t _seq
sequence number for synchronization.
Definition queue.hpp:88
char _padding[(64 -((sizeof(std::atomic_uint64_t)+sizeof(Type)) % 64)) % 64]
padding to prevent false sharing.
Definition queue.hpp:94
lightweight queue slot used by SPSC.
Definition queue.hpp:73
Type data
stored element data.
Definition queue.hpp:75
char _padding[(64 -(sizeof(Type) % 64)) % 64]
padding to prevent false sharing.
Definition queue.hpp:78
queue synchronization primitives.
Definition queue.hpp:48
std::atomic_uint64_t _head
write position.
Definition queue.hpp:56
static constexpr uint64_t MAGIC
magic number for initialization detection.
Definition queue.hpp:50
std::atomic_uint64_t _tail
read position.
Definition queue.hpp:59
uint64_t _capacity
total queue capacity (power of 2).
Definition queue.hpp:62
std::atomic_uint64_t _magic
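atomic initialization state: 0 while uninitialized, 0xFFFFFFFFFFFFFFFF while one instance is initializing the segment, MAGIC once the segment is ready.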
Definition queue.hpp:53
uint64_t _mask
bit mask for fast modulo.
Definition queue.hpp:65
single producer single consumer ring buffer.
Definition queue.hpp:508
static ssize_t tryPush(Segment *segment, const Type *elements, size_t size) noexcept
try to push multiple elements into the ring buffer.
Definition queue.hpp:551
static ssize_t tryPop(Segment *segment, Type *elements, size_t size) noexcept
try to pop multiple elements from the ring buffer.
Definition queue.hpp:619
static int tryPush(Segment *segment, const Type &element) noexcept
try to push element into the ring buffer.
Definition queue.hpp:518
static int tryPop(Segment *segment, Type &element) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:586
queue forward declarations.
Definition queue.hpp:982
primary trait: all sync policies need sequence numbers by default.
Definition queue.hpp:120
#define JOIN_LIKELY(x)
Definition utils.hpp:46
#define JOIN_UNLIKELY(x)
Definition utils.hpp:47