25#ifndef JOIN_CORE_QUEUE_HPP
26#define JOIN_CORE_QUEUE_HPP
50 static constexpr uint64_t
MAGIC = 0x9F7E3B2A8D5C4E1B;
53 alignas (64) std::atomic_uint64_t
_magic;
56 alignas (64) std::atomic_uint64_t
_head;
59 alignas (64) std::atomic_uint64_t
_tail;
65 template <
typename Type>
75 template <
typename Type>
79 alignas (64) std::atomic_uint64_t
_seq;
85 char _padding[(64 - ((
sizeof (std::atomic_uint64_t) +
sizeof (Type)) % 64)) % 64];
91 template <
typename Type,
typename Slot>
102 template <
typename Type,
typename Backend>
109 template <
typename SyncPolicy>
117 template <
typename Type,
typename Backend>
125 template <
typename Type,
typename Backend,
typename SyncPolicy>
128 static_assert (std::is_trivially_copyable<Type>::value,
"type must be trivially copyable");
129 static_assert (std::is_trivially_destructible<Type>::value,
"type must be trivially destructible");
142 template <
typename... Args>
144 : _capacity (roundPow2 (capacity))
145 , _mask (_capacity - 1)
146 , _elementSize (sizeof (
Slot))
147 , _totalSize (sizeof (
QueueSync) + (_capacity * _elementSize))
148 , _backend (_totalSize,
std::forward<Args> (args)...)
149 , _segment (static_cast<
Segment*> (_backend.get ()))
151 uint64_t expected = 0;
153 if (_segment->
_sync.
_magic.compare_exchange_strong (expected, 0xFFFFFFFFFFFFFFFF,
154 std::memory_order_acq_rel))
156 _segment->_sync._head.store (0, std::memory_order_relaxed);
157 _segment->_sync._tail.store (0, std::memory_order_relaxed);
159 initSlots<needs_seq<SyncPolicy>::value> ();
161 _segment->_sync._magic.store (QueueSync::MAGIC, std::memory_order_release);
166 while (_segment->_sync._magic.load (std::memory_order_acquire) != QueueSync::MAGIC)
209 int tryPush (const Type& element) noexcept
211 return SyncPolicy::tryPush (_segment, element, _cachedTail, _capacity, _mask);
220 ssize_t
tryPush (
const Type* elements,
size_t size)
noexcept
222 return SyncPolicy::tryPush (_segment, elements, size, _cachedTail, _capacity, _mask);
230 int push (
const Type& element)
noexcept
234 while (tryPush (element) == -1)
253 int push (
const Type* elements,
size_t size)
noexcept
258 while (pushed < size)
260 ssize_t n = tryPush (elements + pushed, size - pushed);
272 pushed +=
static_cast<uint64_t
> (n);
286 return SyncPolicy::tryPop (_segment, element, _cachedHead, _capacity, _mask);
295 ssize_t
tryPop (Type* elements,
size_t size)
noexcept
297 return SyncPolicy::tryPop (_segment, elements, size, _cachedHead, _capacity, _mask);
305 int pop (Type& element)
noexcept
309 while (tryPop (element) == -1)
328 int pop (Type* elements,
size_t size)
noexcept
333 while (popped < size)
335 ssize_t n = tryPop (elements + popped, size - popped);
347 popped +=
static_cast<uint64_t
> (n);
364 auto head = _segment->_sync._head.load (std::memory_order_acquire);
365 auto tail = _segment->_sync._tail.load (std::memory_order_relaxed);
379 return _capacity - pending ();
392 return pending () == _capacity;
405 return pending () == 0;
414 int mbind (
int numa)
const noexcept
416 return _backend.mbind (numa);
426 return _backend.mlock ();
435 static constexpr uint64_t roundPow2 (uint64_t v)
noexcept
454 template <bool NeedsSeq, typename std::enable_if<!NeedsSeq>::type* =
nullptr>
455 void initSlots () noexcept
463 template <bool NeedsSeq, typename std::enable_if<NeedsSeq>::type* =
nullptr>
464 void initSlots () noexcept
466 for (uint64_t i = 0; i < _capacity; ++i)
468 _segment->_elements[i]._seq.store (i, std::memory_order_relaxed);
473 const uint64_t _capacity = 0;
476 const uint64_t _mask = 0;
479 const uint64_t _elementSize = 0;
482 const uint64_t _totalSize = 0;
488 Segment* _segment =
nullptr;
491 alignas (64) uint64_t _cachedTail = 0;
494 alignas (64) uint64_t _cachedHead = 0;
500 template <
typename Type,
typename Backend>
515 static int tryPush (
Segment* segment,
const Type& element, uint64_t& cachedTail, uint64_t capacity,
516 uint64_t mask)
noexcept
526 auto& sync = segment->_sync;
527 uint64_t head = sync._head.load (std::memory_order_relaxed);
531 cachedTail = sync._tail.load (std::memory_order_acquire);
532 if ((head - cachedTail) == capacity)
539 segment->_elements[head & mask].data = element;
540 sync._head.store (head + 1, std::memory_order_release);
555 static ssize_t
tryPush (
Segment* segment,
const Type* elements,
size_t size, uint64_t& cachedTail,
556 uint64_t capacity, uint64_t mask)
noexcept
558 if (
JOIN_UNLIKELY (segment ==
nullptr || elements ==
nullptr || size == 0))
564 auto& sync = segment->_sync;
565 uint64_t head = sync._head.load (std::memory_order_relaxed);
566 uint64_t avail = capacity - (head - cachedTail);
570 cachedTail = sync._tail.load (std::memory_order_acquire);
571 avail = capacity - (head - cachedTail);
579 uint64_t toWrite = std::min (
static_cast<uint64_t
> (size), avail);
581 for (uint64_t i = 0; i < toWrite; ++i)
583 segment->_elements[(head + i) & mask].data = elements[i];
586 sync._head.store (head + toWrite, std::memory_order_release);
588 return static_cast<ssize_t
> (toWrite);
600 static int tryPop (
Segment* segment, Type& element, uint64_t& cachedHead, uint64_t ,
601 uint64_t mask)
noexcept
611 auto& sync = segment->_sync;
612 uint64_t tail = sync._tail.load (std::memory_order_relaxed);
614 if (cachedHead == tail)
616 cachedHead = sync._head.load (std::memory_order_acquire);
617 if (cachedHead == tail)
624 element = segment->_elements[tail & mask].data;
625 sync._tail.store (tail + 1, std::memory_order_release);
640 static ssize_t
tryPop (
Segment* segment, Type* elements,
size_t size, uint64_t& cachedHead,
641 uint64_t , uint64_t mask)
noexcept
643 if (
JOIN_UNLIKELY (segment ==
nullptr || elements ==
nullptr || size == 0))
649 auto& sync = segment->_sync;
650 uint64_t tail = sync._tail.load (std::memory_order_relaxed);
651 uint64_t pending = cachedHead - tail;
655 cachedHead = sync._head.load (std::memory_order_acquire);
656 pending = cachedHead - tail;
664 uint64_t toRead = std::min (
static_cast<uint64_t
> (size), pending);
666 for (uint64_t i = 0; i < toRead; ++i)
668 elements[i] = segment->_elements[(tail + i) & mask].data;
671 sync._tail.store (tail + toRead, std::memory_order_release);
673 return static_cast<ssize_t
> (toRead);
680 template <
typename Type,
typename Backend>
695 static int tryPush (
Segment* segment,
const Type& element, uint64_t& , uint64_t ,
696 uint64_t mask)
noexcept
706 auto& sync = segment->_sync;
707 uint64_t head = sync._head.load (std::memory_order_relaxed);
712 auto* slot = &segment->_elements[head & mask];
713 uint64_t seq = slot->_seq.load (std::memory_order_acquire);
717 if (
JOIN_LIKELY (sync._head.compare_exchange_weak (head, head + 1, std::memory_order_acquire,
718 std::memory_order_relaxed)))
720 slot->data = element;
721 slot->_seq.store (head + 1, std::memory_order_release);
733 head = sync._head.load (std::memory_order_relaxed);
748 static ssize_t
tryPush (
Segment* segment,
const Type* elements,
size_t size, uint64_t& ,
749 uint64_t capacity, uint64_t mask)
noexcept
751 if (
JOIN_UNLIKELY (segment ==
nullptr || elements ==
nullptr || size == 0))
758 auto& sync = segment->_sync;
759 uint64_t head = sync._head.load (std::memory_order_relaxed);
763 uint64_t tail = sync._tail.load (std::memory_order_acquire);
764 uint64_t toWrite = std::min (
static_cast<uint64_t
> (size), capacity - (head - tail));
772 if (
JOIN_LIKELY (sync._head.compare_exchange_weak (head, head + toWrite, std::memory_order_acquire,
773 std::memory_order_relaxed)))
775 for (uint64_t i = 0; i < toWrite; ++i)
777 auto* slot = &segment->_elements[(head + i) & mask];
778 slot->data = elements[i];
779 slot->_seq.store (head + i + 1, std::memory_order_release);
782 return static_cast<ssize_t
> (toWrite);
798 static int tryPop (
Segment* segment, Type& element, uint64_t& , uint64_t capacity,
799 uint64_t mask)
noexcept
809 auto& sync = segment->_sync;
810 uint64_t tail = sync._tail.load (std::memory_order_relaxed);
811 auto* slot = &segment->_elements[tail & mask];
812 uint64_t seq = slot->_seq.load (std::memory_order_acquire);
820 element = slot->data;
821 slot->_seq.store (tail + capacity, std::memory_order_release);
822 sync._tail.store (tail + 1, std::memory_order_release);
837 static ssize_t
tryPop (
Segment* segment, Type* elements,
size_t size, uint64_t& ,
838 uint64_t capacity, uint64_t mask)
noexcept
840 if (
JOIN_UNLIKELY (segment ==
nullptr || elements ==
nullptr || size == 0))
846 auto& sync = segment->_sync;
847 uint64_t tail = sync._tail.load (std::memory_order_relaxed);
848 uint64_t head = sync._head.load (std::memory_order_acquire);
849 uint64_t toRead = std::min (
static_cast<uint64_t
> (size), head - tail);
852 for (uint64_t i = 0; i < toRead; ++i)
854 auto* slot = &segment->_elements[(tail + i) & mask];
856 if (
JOIN_UNLIKELY (slot->_seq.load (std::memory_order_acquire) != tail + i + 1))
861 elements[i] = slot->data;
862 slot->_seq.store (tail + i + capacity, std::memory_order_release);
868 sync._tail.store (tail + popped, std::memory_order_release);
869 return static_cast<ssize_t
> (popped);
880 template <
typename Type,
typename Backend>
895 static int tryPush (
Segment* segment,
const Type& element, uint64_t& cachedTail, uint64_t capacity,
896 uint64_t mask)
noexcept
911 static ssize_t
tryPush (
Segment* segment,
const Type* elements,
size_t size, uint64_t& ,
912 uint64_t capacity, uint64_t mask)
noexcept
914 if (
JOIN_UNLIKELY (segment ==
nullptr || elements ==
nullptr || size == 0))
921 auto& sync = segment->_sync;
922 uint64_t head = sync._head.load (std::memory_order_relaxed);
926 uint64_t tail = sync._tail.load (std::memory_order_acquire);
927 uint64_t toWrite = std::min (
static_cast<uint64_t
> (size), capacity - (head - tail));
935 if (
JOIN_LIKELY (sync._head.compare_exchange_weak (head, head + toWrite, std::memory_order_acquire,
936 std::memory_order_relaxed)))
938 for (uint64_t i = 0; i < toWrite; ++i)
940 auto* slot = &segment->_elements[(head + i) & mask];
942 while (slot->_seq.load (std::memory_order_acquire) != head + i)
946 slot->data = elements[i];
947 slot->_seq.store (head + i + 1, std::memory_order_release);
950 return static_cast<ssize_t
> (toWrite);
966 static int tryPop (
Segment* segment, Type& element, uint64_t& , uint64_t capacity,
967 uint64_t mask)
noexcept
977 auto& sync = segment->_sync;
978 uint64_t tail = sync._tail.load (std::memory_order_relaxed);
983 auto* slot = &segment->_elements[tail & mask];
984 uint64_t seq = slot->_seq.load (std::memory_order_acquire);
986 if (seq == (tail + 1))
988 Type local = slot->data;
989 if (
JOIN_LIKELY (sync._tail.compare_exchange_weak (tail, tail + 1, std::memory_order_acquire,
990 std::memory_order_relaxed)))
993 slot->_seq.store (tail + capacity, std::memory_order_release);
1005 tail = sync._tail.load (std::memory_order_relaxed);
1020 static ssize_t
tryPop (
Segment* segment, Type* elements,
size_t size, uint64_t& ,
1021 uint64_t capacity, uint64_t mask)
noexcept
1023 if (
JOIN_UNLIKELY (segment ==
nullptr || elements ==
nullptr || size == 0))
1030 auto& sync = segment->_sync;
1031 uint64_t tail = sync._tail.load (std::memory_order_relaxed);
1035 uint64_t head = sync._head.load (std::memory_order_acquire);
1036 uint64_t toRead = std::min (
static_cast<uint64_t
> (size), head - tail);
1045 for (; ready < toRead; ++ready)
1047 auto* slot = &segment->_elements[(tail + ready) & mask];
1048 if (
JOIN_UNLIKELY (slot->_seq.load (std::memory_order_acquire) != tail + ready + 1))
1052 elements[ready] = slot->data;
1061 if (
JOIN_LIKELY (sync._tail.compare_exchange_weak (tail, tail + ready, std::memory_order_acquire,
1062 std::memory_order_relaxed)))
1064 for (uint64_t i = 0; i < ready; ++i)
1066 segment->_elements[(tail + i) & mask]._seq.store (tail + i + capacity,
1067 std::memory_order_release);
1070 return static_cast<ssize_t
> (ready);
1081 template <
typename Backend,
template <
typename,
typename>
class SyncPolicy>
1085 template <
typename Type>
adaptive backoff strategy for busy-wait loops.
Definition backoff.hpp:45
queue base class.
Definition queue.hpp:127
int push(const Type *elements, size_t size) noexcept
push multiple elements into the ring buffer.
Definition queue.hpp:253
BasicQueue(uint64_t capacity, Args &&... args)
create instance.
Definition queue.hpp:143
typename std::conditional< needs_seq< SyncPolicy >::value, QueueSlotFull< Type >, QueueSlotLight< Type > >::type Slot
Definition queue.hpp:133
int push(const Type &element) noexcept
push element into the ring buffer.
Definition queue.hpp:230
int mlock() const noexcept
lock memory in RAM.
Definition queue.hpp:424
ssize_t tryPop(Type *elements, size_t size) noexcept
try to pop multiple elements from the ring buffer.
Definition queue.hpp:295
int tryPop(Type &element) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:284
~BasicQueue() noexcept=default
destroy queue instance.
int pop(Type *elements, size_t size) noexcept
pop multiple elements from the ring buffer.
Definition queue.hpp:328
BasicQueue(BasicQueue &&other)=delete
move constructor.
int pop(Type &element) noexcept
pop element from the ring buffer.
Definition queue.hpp:305
bool full() const noexcept
check if the ring buffer is full.
Definition queue.hpp:386
ssize_t tryPush(const Type *elements, size_t size) noexcept
try to push multiple elements into the ring buffer.
Definition queue.hpp:220
bool empty() const noexcept
check if the ring buffer is empty.
Definition queue.hpp:399
uint64_t pending() const noexcept
get the number of pending elements for reading.
Definition queue.hpp:358
uint64_t available() const noexcept
get the number of available slots for writing.
Definition queue.hpp:373
Type ValueType
Definition queue.hpp:132
BasicQueue(const BasicQueue &other)=delete
copy constructor.
Definition acceptor.hpp:32
std::error_code make_error_code(join::Errc code) noexcept
Create an std::error_code object.
Definition error.cpp:150
multiple producer multiple consumer ring buffer.
Definition queue.hpp:882
static int tryPush(Segment *segment, const Type &element, uint64_t &cachedTail, uint64_t capacity, uint64_t mask) noexcept
try to push element into the ring buffer.
Definition queue.hpp:895
static ssize_t tryPush(Segment *segment, const Type *elements, size_t size, uint64_t &, uint64_t capacity, uint64_t mask) noexcept
try to push multiple elements into the ring buffer.
Definition queue.hpp:911
static int tryPop(Segment *segment, Type &element, uint64_t &, uint64_t capacity, uint64_t mask) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:966
static ssize_t tryPop(Segment *segment, Type *elements, size_t size, uint64_t &, uint64_t capacity, uint64_t mask) noexcept
try to pop multiple elements from the ring buffer.
Definition queue.hpp:1020
multiple producer single consumer ring buffer.
Definition queue.hpp:682
static ssize_t tryPush(Segment *segment, const Type *elements, size_t size, uint64_t &, uint64_t capacity, uint64_t mask) noexcept
try to push multiple elements into the ring buffer.
Definition queue.hpp:748
static int tryPush(Segment *segment, const Type &element, uint64_t &, uint64_t, uint64_t mask) noexcept
try to push element into the ring buffer.
Definition queue.hpp:695
static int tryPop(Segment *segment, Type &element, uint64_t &, uint64_t capacity, uint64_t mask) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:798
static ssize_t tryPop(Segment *segment, Type *elements, size_t size, uint64_t &, uint64_t capacity, uint64_t mask) noexcept
try to pop multiple elements from the ring buffer.
Definition queue.hpp:837
queue memory segment.
Definition queue.hpp:93
Slot _elements[]
flexible array of queue slots.
Definition queue.hpp:98
QueueSync _sync
synchronization primitives.
Definition queue.hpp:95
full queue slot used by MPSC/MPMC.
Definition queue.hpp:77
Type data
stored element data.
Definition queue.hpp:82
std::atomic_uint64_t _seq
sequence number for synchronization.
Definition queue.hpp:79
char _padding[(64 -((sizeof(std::atomic_uint64_t)+sizeof(Type)) % 64)) % 64]
padding to prevent false sharing.
Definition queue.hpp:85
lightweight queue slot used by SPSC.
Definition queue.hpp:67
Type data
stored element data.
Definition queue.hpp:69
queue synchronization primitives.
Definition queue.hpp:48
std::atomic_uint64_t _head
write position.
Definition queue.hpp:56
static constexpr uint64_t MAGIC
magic number for initialization detection.
Definition queue.hpp:50
std::atomic_uint64_t _tail
read position.
Definition queue.hpp:59
std::atomic_uint64_t _magic
initialization state atomic.
Definition queue.hpp:53
single producer single consumer ring buffer.
Definition queue.hpp:502
static ssize_t tryPop(Segment *segment, Type *elements, size_t size, uint64_t &cachedHead, uint64_t, uint64_t mask) noexcept
try to pop multiple elements from the ring buffer.
Definition queue.hpp:640
static int tryPush(Segment *segment, const Type &element, uint64_t &cachedTail, uint64_t capacity, uint64_t mask) noexcept
try to push element into the ring buffer.
Definition queue.hpp:515
static int tryPop(Segment *segment, Type &element, uint64_t &cachedHead, uint64_t, uint64_t mask) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:600
static ssize_t tryPush(Segment *segment, const Type *elements, size_t size, uint64_t &cachedTail, uint64_t capacity, uint64_t mask) noexcept
try to push multiple elements into the ring buffer.
Definition queue.hpp:555
queue forward declarations.
Definition queue.hpp:1083
primary trait: all sync policies need sequence numbers by default.
Definition queue.hpp:111
#define JOIN_LIKELY(x)
Definition utils.hpp:46
#define JOIN_UNLIKELY(x)
Definition utils.hpp:47