25#ifndef JOIN_CORE_QUEUE_HPP
26#define JOIN_CORE_QUEUE_HPP
46 static constexpr uint64_t
MAGIC = 0x9F7E3B2A8D5C4E1B;
49 alignas (64) std::atomic_uint64_t
_magic;
52 alignas (64) std::atomic_uint64_t
_head;
55 alignas (64) std::atomic_uint64_t
_tail;
67 template <
typename Type>
71 alignas (64) std::atomic_uint64_t
_seq;
77 char _padding[(64 - ((
sizeof (std::atomic_uint64_t) +
sizeof (Type)) % 64)) % 64];
83 template <
typename Type>
96 template <
typename Type,
typename Backend,
typename SyncPolicy>
99 static_assert (std::is_trivially_copyable<Type>::value,
"type must be trivially copyable");
100 static_assert (std::is_trivially_destructible<Type>::value,
"type must be trivially destructible");
110 template <
typename... Args>
118 uint64_t expected = 0;
120 if (
_segment->_sync._magic.compare_exchange_strong (expected, 0xFFFFFFFFFFFFFFFF,
121 std::memory_order_acq_rel))
123 _segment->_sync._head.store (0, std::memory_order_relaxed);
124 _segment->_sync._tail.store (0, std::memory_order_relaxed);
125 _segment->_sync._capacity = _capacity;
126 _segment->_sync._mask = _capacity - 1;
128 for (uint64_t i = 0; i < _capacity; ++i)
130 _segment->_elements[i]._seq.store (i, std::memory_order_relaxed);
146 throw std::runtime_error (
"capacity mismatch");
186 int tryPush (const Type& element) noexcept
188 return _policy.tryPush (_segment, element);
196 int push (
const Type& element)
noexcept
200 while (tryPush (element) == -1)
220 return _policy.tryPop (_segment, element);
228 int pop (Type& element)
noexcept
232 while (tryPop (element) == -1)
255 auto head = _segment->_sync._head.load (std::memory_order_acquire);
256 auto tail = _segment->_sync._tail.load (std::memory_order_relaxed);
270 return _segment->_sync._capacity - pending ();
283 return pending () == _segment->_sync._capacity;
296 return pending () == 0;
306 return _backend.mbind (numa);
315 return _backend.mlock ();
341 uint64_t _capacity = 0;
344 uint64_t _elementSize = 0;
347 uint64_t _totalSize = 0;
362 template <
typename Type,
typename Backend>
381 auto& sync = segment->_sync;
382 uint64_t head = sync._head.load (std::memory_order_relaxed);
383 uint64_t tail = sync._tail.load (std::memory_order_acquire);
391 segment->_elements[head & sync._mask].data = element;
392 sync._head.store (head + 1, std::memory_order_release);
411 auto& sync = segment->_sync;
412 uint64_t tail = sync._tail.load (std::memory_order_relaxed);
413 uint64_t head = sync._head.load (std::memory_order_acquire);
421 element = segment->_elements[tail & sync._mask].data;
422 sync._tail.store (tail + 1, std::memory_order_release);
431 template <
typename Type,
typename Backend>
450 auto& sync = segment->_sync;
451 uint64_t head = sync._head.load (std::memory_order_relaxed);
456 auto* slot = &segment->_elements[head & sync._mask];
457 uint64_t seq = slot->_seq.load (std::memory_order_acquire);
461 if (
JOIN_LIKELY (sync._head.compare_exchange_weak (head, head + 1, std::memory_order_acquire,
462 std::memory_order_relaxed)))
464 slot->data = element;
465 slot->_seq.store (head + 1, std::memory_order_release);
477 head = sync._head.load (std::memory_order_relaxed);
496 auto& sync = segment->_sync;
497 uint64_t tail = sync._tail.load (std::memory_order_relaxed);
498 auto* slot = &segment->_elements[tail & sync._mask];
499 uint64_t seq = slot->_seq.load (std::memory_order_acquire);
507 element = slot->data;
508 slot->_seq.store (tail + sync._capacity, std::memory_order_release);
509 sync._tail.store (tail + 1, std::memory_order_release);
518 template <
typename Type,
typename Backend>
548 auto& sync = segment->_sync;
549 uint64_t tail = sync._tail.load (std::memory_order_relaxed);
554 auto* slot = &segment->_elements[tail & sync._mask];
555 uint64_t seq = slot->_seq.load (std::memory_order_acquire);
557 if (seq == (tail + 1))
559 if (
JOIN_LIKELY (sync._tail.compare_exchange_weak (tail, tail + 1, std::memory_order_acquire,
560 std::memory_order_relaxed)))
562 element = slot->data;
563 slot->_seq.store (tail + sync._capacity, std::memory_order_release);
575 tail = sync._tail.load (std::memory_order_relaxed);
584 template <
typename Backend,
template <
typename,
typename>
class SyncPolicy>
588 template <
typename Type>
adaptive backoff strategy for busy-wait loops.
Definition backoff.hpp:45
queue base class.
Definition queue.hpp:98
uint64_t _elementSize
memory segment element size.
Definition queue.hpp:344
uint64_t _totalSize
total memory size.
Definition queue.hpp:347
SyncPolicy _policy
memory segment sync policy.
Definition queue.hpp:353
BasicQueue(uint64_t capacity, Args &&... args)
create instance.
Definition queue.hpp:111
int mbind(int numa) const noexcept
bind memory to a NUMA node.
Definition queue.hpp:304
int push(const Type &element) noexcept
push element into the ring buffer.
Definition queue.hpp:196
int mlock() const noexcept
lock memory in RAM.
Definition queue.hpp:313
int tryPop(Type &element) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:218
static uint64_t roundPow2(uint64_t v) noexcept
round up to next power of 2.
Definition queue.hpp:324
~BasicQueue() noexcept=default
destroy queue instance.
BasicQueue(BasicQueue &&other)=delete
move constructor.
int pop(Type &element) noexcept
pop element from the ring buffer.
Definition queue.hpp:228
bool full() const noexcept
check if the ring buffer is full.
Definition queue.hpp:277
QueueSegment< Type > * _segment
shared memory segment.
Definition queue.hpp:356
Backend _backend
memory segment backend.
Definition queue.hpp:350
uint64_t _capacity
memory segment capacity.
Definition queue.hpp:341
bool empty() const noexcept
check if the ring buffer is empty.
Definition queue.hpp:290
uint64_t pending() const noexcept
get the number of pending elements for reading.
Definition queue.hpp:249
uint64_t available() const noexcept
get the number of available slots for writing.
Definition queue.hpp:264
Type ValueType
Definition queue.hpp:103
BasicQueue(const BasicQueue &other)=delete
copy constructor.
Definition acceptor.hpp:32
std::error_code make_error_code(join::Errc code) noexcept
Create an std::error_code object.
Definition error.cpp:150
multiple producer multiple consumer ring buffer.
Definition queue.hpp:520
static int tryPush(QueueSegment< Type > *segment, const Type &element) noexcept
try to push element into the ring buffer.
Definition queue.hpp:529
static int tryPop(QueueSegment< Type > *segment, Type &element) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:540
multiple producer single consumer ring buffer.
Definition queue.hpp:433
static int tryPush(QueueSegment< Type > *segment, const Type &element) noexcept
try to push element into the ring buffer.
Definition queue.hpp:442
static int tryPop(QueueSegment< Type > *segment, Type &element) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:488
queue memory segment.
Definition queue.hpp:85
QueueSync _sync
synchronization primitives.
Definition queue.hpp:87
QueueSlot< Type > _elements[]
flexible array of queue slots.
Definition queue.hpp:90
queue slot.
Definition queue.hpp:69
Type data
stored element data.
Definition queue.hpp:74
char _padding[(64 -((sizeof(std::atomic_uint64_t)+sizeof(Type)) % 64)) % 64]
padding to prevent false sharing.
Definition queue.hpp:77
std::atomic_uint64_t _seq
sequence number for synchronization.
Definition queue.hpp:71
queue synchronization primitives.
Definition queue.hpp:44
std::atomic_uint64_t _head
write position.
Definition queue.hpp:52
static constexpr uint64_t MAGIC
magic number for initialization detection.
Definition queue.hpp:46
std::atomic_uint64_t _tail
read position.
Definition queue.hpp:55
uint64_t _capacity
total queue capacity (power of 2).
Definition queue.hpp:58
std::atomic_uint64_t _magic
initialization state atomic.
Definition queue.hpp:49
uint64_t _mask
bit mask for fast modulo.
Definition queue.hpp:61
single producer single consumer ring buffer.
Definition queue.hpp:364
static int tryPop(QueueSegment< Type > *segment, Type &element) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:403
static int tryPush(QueueSegment< Type > *segment, const Type &element) noexcept
try to push element into the ring buffer.
Definition queue.hpp:373
queue forward declarations.
Definition queue.hpp:586
#define JOIN_LIKELY(x)
Definition utils.hpp:46
#define JOIN_UNLIKELY(x)
Definition utils.hpp:47