join 1.0
lightweight network framework library
Loading...
Searching...
No Matches
queue.hpp
Go to the documentation of this file.
1
25#ifndef JOIN_CORE_QUEUE_HPP
26#define JOIN_CORE_QUEUE_HPP
27
28// libjoin.
29#include <join/backoff.hpp>
30#include <join/memory.hpp>
31#include <join/utils.hpp>
32
33// C++.
34#include <type_traits>
35#include <stdexcept>
36#include <atomic>
37
38namespace join
39{
    /**
     * @brief queue synchronization primitives shared through the memory segment.
     *
     * Every field sits on its own 64-byte boundary (alignas) so that producer
     * and consumer updates do not false-share a cache line.
     */
    struct QueueSync
    {
        /// magic number for initialization detection (segment is ready once
        /// _magic holds this value).
        static constexpr uint64_t MAGIC = 0x9F7E3B2A8D5C4E1B;

        /// initialization state atomic (0 = untouched, transient sentinel
        /// while one instance initializes, MAGIC once published).
        alignas (64) std::atomic_uint64_t _magic;

        /// write position (monotonically increasing; wrapped with _mask).
        alignas (64) std::atomic_uint64_t _head;

        /// read position (monotonically increasing; wrapped with _mask).
        alignas (64) std::atomic_uint64_t _tail;

        /// total queue capacity (power of 2 — see BasicQueue::roundPow2).
        alignas (64) uint64_t _capacity;

        /// bit mask for fast modulo (_capacity - 1).
        alignas (64) uint64_t _mask;
    };
63
    /**
     * @brief queue slot: a per-slot sequence number plus the stored element.
     * @tparam Type stored element type.
     */
    template <typename Type>
    struct QueueSlot
    {
        /// sequence number for synchronization (used by the Mpsc/Mpmc
        /// policies to hand slots between producers and consumers;
        /// Spsc ignores it).
        alignas (64) std::atomic_uint64_t _seq;

        /// stored element data.
        Type data;

        /// padding to prevent false sharing by rounding the payload up to a
        /// cache-line multiple.
        /// NOTE(review): when (sizeof(_seq) + sizeof(Type)) is an exact
        /// multiple of 64 this becomes a zero-length array — a GNU/Clang
        /// extension, not standard C++. The formula also assumes
        /// alignof(Type) <= 8 — TODO confirm both on supported toolchains.
        char _padding[(64 - ((sizeof (std::atomic_uint64_t) + sizeof (Type)) % 64)) % 64];
    };
79
83 template <typename Type>
85 {
87 alignas (64) QueueSync _sync;
88
91 };
92
96 template <typename Type, typename Backend, typename SyncPolicy>
98 {
99 static_assert (std::is_trivially_copyable<Type>::value, "type must be trivially copyable");
100 static_assert (std::is_trivially_destructible<Type>::value, "type must be trivially destructible");
101
102 public:
103 using ValueType = Type;
104
110 template <typename... Args>
111 BasicQueue (uint64_t capacity, Args&&... args)
112 : _capacity (roundPow2 (capacity))
113 , _elementSize (sizeof (QueueSlot<Type>))
114 , _totalSize (sizeof (QueueSync) + (_capacity * _elementSize))
115 , _backend (_totalSize, std::forward<Args> (args)...)
116 , _segment (static_cast<QueueSegment<Type>*> (_backend.get ()))
117 {
118 uint64_t expected = 0;
119
120 if (_segment->_sync._magic.compare_exchange_strong (expected, 0xFFFFFFFFFFFFFFFF,
121 std::memory_order_acq_rel))
122 {
123 _segment->_sync._head.store (0, std::memory_order_relaxed);
124 _segment->_sync._tail.store (0, std::memory_order_relaxed);
125 _segment->_sync._capacity = _capacity;
126 _segment->_sync._mask = _capacity - 1;
127
128 for (uint64_t i = 0; i < _capacity; ++i)
129 {
130 _segment->_elements[i]._seq.store (i, std::memory_order_relaxed);
131 }
132
133 _segment->_sync._magic.store (QueueSync::MAGIC, std::memory_order_release);
134 }
135 else
136 {
137 Backoff backoff;
138 while (_segment->_sync._magic.load (std::memory_order_acquire) != QueueSync::MAGIC)
139 {
140 backoff ();
141 }
142 }
143
144 if (_segment->_sync._capacity != _capacity)
145 {
146 throw std::runtime_error ("capacity mismatch");
147 }
148 }
149
        /// copy constructor (deleted: queue is non-copyable).
        BasicQueue (const BasicQueue& other) = delete;

        /// copy assignment operator (deleted).
        BasicQueue& operator= (const BasicQueue& other) = delete;

        /// move constructor (deleted: queue is non-movable).
        BasicQueue (BasicQueue&& other) = delete;

        /// move assignment operator (deleted).
        BasicQueue& operator= (BasicQueue&& other) = delete;

        /// destroy queue instance (defaulted: members clean up themselves).
        ~BasicQueue () noexcept = default;
180
        /**
         * @brief try to push element into the ring buffer (non-blocking).
         * @param element element to push.
         * @return 0 on success, -1 on failure (lastError is set by the policy).
         */
        int tryPush (const Type& element) noexcept
        {
            return _policy.tryPush (_segment, element);
        }
190
196 int push (const Type& element) noexcept
197 {
198 Backoff backoff;
199
200 while (tryPush (element) == -1)
201 {
202 if (JOIN_UNLIKELY (lastError != Errc::TemporaryError))
203 {
204 return -1;
205 }
206
207 backoff ();
208 }
209
210 return 0;
211 }
212
        /**
         * @brief try to pop element from the ring buffer (non-blocking).
         * @param element reference receiving the popped element.
         * @return 0 on success, -1 on failure (lastError is set by the policy).
         */
        int tryPop (Type& element) noexcept
        {
            return _policy.tryPop (_segment, element);
        }
222
228 int pop (Type& element) noexcept
229 {
230 Backoff backoff;
231
232 while (tryPop (element) == -1)
233 {
234 if (JOIN_UNLIKELY (lastError != Errc::TemporaryError))
235 {
236 return -1;
237 }
238
239 backoff ();
240 }
241
242 return 0;
243 }
244
249 uint64_t pending () const noexcept
250 {
251 if (JOIN_UNLIKELY (_segment == nullptr))
252 {
253 return 0;
254 }
255 auto head = _segment->_sync._head.load (std::memory_order_acquire);
256 auto tail = _segment->_sync._tail.load (std::memory_order_relaxed);
257 return head - tail;
258 }
259
        /**
         * @brief get the number of available slots for writing.
         * @return free slot count (0 when the segment is unmapped).
         */
        uint64_t available () const noexcept
        {
            if (JOIN_UNLIKELY (_segment == nullptr))
            {
                return 0;
            }
            return _segment->_sync._capacity - pending ();
        }
272
277 bool full () const noexcept
278 {
279 if (JOIN_UNLIKELY (_segment == nullptr))
280 {
281 return false;
282 }
283 return pending () == _segment->_sync._capacity;
284 }
285
290 bool empty () const noexcept
291 {
292 if (JOIN_UNLIKELY (_segment == nullptr))
293 {
294 return true;
295 }
296 return pending () == 0;
297 }
298
        /**
         * @brief bind the segment memory to a NUMA node.
         * @param numa NUMA node index.
         * @return the backend's result — presumably 0 on success and -1 on
         *         error like the rest of the API; confirm against the backend.
         */
        int mbind (int numa) const noexcept
        {
            return _backend.mbind (numa);
        }
308
        /**
         * @brief lock the segment memory in RAM (prevent paging).
         * @return the backend's result — presumably 0 on success and -1 on
         *         error like the rest of the API; confirm against the backend.
         */
        int mlock () const noexcept
        {
            return _backend.mlock ();
        }
317
318 protected:
324 static uint64_t roundPow2 (uint64_t v) noexcept
325 {
326 if (v == 0)
327 {
328 return 1;
329 }
330 v--;
331 v |= v >> 1;
332 v |= v >> 2;
333 v |= v >> 4;
334 v |= v >> 8;
335 v |= v >> 16;
336 v |= v >> 32;
337 return v + 1;
338 }
339
        /// memory segment capacity in elements (power of 2).
        uint64_t _capacity = 0;

        /// memory segment element size in bytes (sizeof (QueueSlot<Type>)).
        uint64_t _elementSize = 0;

        /// total memory size in bytes (sync header + capacity * element size).
        uint64_t _totalSize = 0;

        /// memory segment backend (provides get()/mbind()/mlock()).
        Backend _backend;

        /// memory segment sync policy (e.g. Spsc/Mpsc/Mpmc).
        SyncPolicy _policy;

        /// shared memory segment view over the backend's memory.
        QueueSegment<Type>* _segment = nullptr;
357 };
358
362 template <typename Type, typename Backend>
363 struct Spsc
364 {
366
373 static int tryPush (QueueSegment<Type>* segment, const Type& element) noexcept
374 {
375 if (JOIN_UNLIKELY (segment == nullptr))
376 {
377 lastError = make_error_code (Errc::InvalidParam);
378 return -1;
379 }
380
381 auto& sync = segment->_sync;
382 uint64_t head = sync._head.load (std::memory_order_relaxed);
383 uint64_t tail = sync._tail.load (std::memory_order_acquire);
384
385 if (JOIN_UNLIKELY ((head - tail) == sync._capacity))
386 {
387 lastError = make_error_code (Errc::TemporaryError);
388 return -1;
389 }
390
391 segment->_elements[head & sync._mask].data = element;
392 sync._head.store (head + 1, std::memory_order_release);
393
394 return 0;
395 }
396
403 static int tryPop (QueueSegment<Type>* segment, Type& element) noexcept
404 {
405 if (JOIN_UNLIKELY (segment == nullptr))
406 {
407 lastError = make_error_code (Errc::InvalidParam);
408 return -1;
409 }
410
411 auto& sync = segment->_sync;
412 uint64_t tail = sync._tail.load (std::memory_order_relaxed);
413 uint64_t head = sync._head.load (std::memory_order_acquire);
414
415 if (JOIN_UNLIKELY (head == tail))
416 {
417 lastError = make_error_code (Errc::TemporaryError);
418 return -1;
419 }
420
421 element = segment->_elements[tail & sync._mask].data;
422 sync._tail.store (tail + 1, std::memory_order_release);
423
424 return 0;
425 }
426 };
427
    /**
     * @brief multiple producer single consumer ring buffer policy.
     *
     * Producers reserve a ticket by CAS-ing _head and publish their write
     * through the slot's sequence number (cf. Vyukov's bounded queue):
     * _seq == pos        -> slot free for ticket pos,
     * _seq == pos + 1    -> slot written, ready for the consumer,
     * _seq == pos + cap  -> slot recycled for the next lap.
     *
     * @tparam Type stored element type.
     * @tparam Backend memory segment backend type.
     */
    template <typename Type, typename Backend>
    struct Mpsc
    {
        /**
         * @brief try to push element into the ring buffer.
         * @param segment queue memory segment.
         * @param element element to push.
         * @return 0 on success, -1 on failure; lastError is InvalidParam for
         *         a null segment, TemporaryError when the buffer is full.
         */
        static int tryPush (QueueSegment<Type>* segment, const Type& element) noexcept
        {
            if (JOIN_UNLIKELY (segment == nullptr))
            {
                lastError = make_error_code (Errc::InvalidParam);
                return -1;
            }

            auto& sync = segment->_sync;
            uint64_t head = sync._head.load (std::memory_order_relaxed);
            Backoff backoff;

            for (;;)
            {
                auto* slot = &segment->_elements[head & sync._mask];
                // acquire pairs with the consumer's release when it recycles
                // the slot in tryPop.
                uint64_t seq = slot->_seq.load (std::memory_order_acquire);

                if (seq == head)
                {
                    // slot is free for this ticket: claim it by advancing
                    // _head; on failure the CAS refreshes 'head' and we loop.
                    if (JOIN_LIKELY (sync._head.compare_exchange_weak (head, head + 1, std::memory_order_acquire,
                                                                      std::memory_order_relaxed)))
                    {
                        slot->data = element;
                        // publish: the consumer waits for _seq == head + 1.
                        slot->_seq.store (head + 1, std::memory_order_release);
                        return 0;
                    }
                }
                else if (JOIN_UNLIKELY (seq < head))
                {
                    // slot still holds the previous lap's element: buffer full.
                    lastError = make_error_code (Errc::TemporaryError);
                    return -1;
                }
                else
                {
                    // another producer got this ticket first: back off, reload.
                    backoff ();
                    head = sync._head.load (std::memory_order_relaxed);
                }
            }
        }

        /**
         * @brief try to pop element from the ring buffer (single consumer).
         * @param segment queue memory segment.
         * @param element reference receiving the popped element.
         * @return 0 on success, -1 on failure; lastError is InvalidParam for
         *         a null segment, TemporaryError when the buffer is empty.
         */
        static int tryPop (QueueSegment<Type>* segment, Type& element) noexcept
        {
            if (JOIN_UNLIKELY (segment == nullptr))
            {
                lastError = make_error_code (Errc::InvalidParam);
                return -1;
            }

            auto& sync = segment->_sync;
            // single consumer: _tail is only ever advanced by this thread,
            // so a relaxed load is sufficient here.
            uint64_t tail = sync._tail.load (std::memory_order_relaxed);
            auto* slot = &segment->_elements[tail & sync._mask];
            // acquire pairs with the producer's release publish of the slot.
            uint64_t seq = slot->_seq.load (std::memory_order_acquire);

            // _seq == tail + 1 means the producer finished writing this slot.
            if (JOIN_UNLIKELY (seq != tail + 1))
            {
                lastError = make_error_code (Errc::TemporaryError);
                return -1;
            }

            element = slot->data;
            // recycle the slot for the next lap (ticket tail + capacity).
            slot->_seq.store (tail + sync._capacity, std::memory_order_release);
            sync._tail.store (tail + 1, std::memory_order_release);

            return 0;
        }
    };
514
    /**
     * @brief multiple producer multiple consumer ring buffer policy.
     *
     * Reuses the Mpsc producer side; consumers additionally contend on _tail
     * with a CAS, mirroring the producer protocol.
     *
     * @tparam Type stored element type.
     * @tparam Backend memory segment backend type.
     */
    template <typename Type, typename Backend>
    struct Mpmc
    {
        /**
         * @brief try to push element into the ring buffer.
         * @param segment queue memory segment.
         * @param element element to push.
         * @return 0 on success, -1 on failure (lastError set by Mpsc::tryPush).
         */
        static int tryPush (QueueSegment<Type>* segment, const Type& element) noexcept
        {
            // the multi-producer push protocol is identical to Mpsc's.
            return Mpsc<Type, Backend>::tryPush (segment, element);
        }

        /**
         * @brief try to pop element from the ring buffer.
         * @param segment queue memory segment.
         * @param element reference receiving the popped element.
         * @return 0 on success, -1 on failure; lastError is InvalidParam for
         *         a null segment, TemporaryError when the buffer is empty.
         */
        static int tryPop (QueueSegment<Type>* segment, Type& element) noexcept
        {
            if (JOIN_UNLIKELY (segment == nullptr))
            {
                lastError = make_error_code (Errc::InvalidParam);
                return -1;
            }

            auto& sync = segment->_sync;
            uint64_t tail = sync._tail.load (std::memory_order_relaxed);
            Backoff backoff;

            for (;;)
            {
                auto* slot = &segment->_elements[tail & sync._mask];
                // acquire pairs with the producer's release publish of the slot.
                uint64_t seq = slot->_seq.load (std::memory_order_acquire);

                if (seq == (tail + 1))
                {
                    // slot is ready: claim this ticket by advancing _tail;
                    // on failure the CAS refreshes 'tail' and we loop.
                    if (JOIN_LIKELY (sync._tail.compare_exchange_weak (tail, tail + 1, std::memory_order_acquire,
                                                                      std::memory_order_relaxed)))
                    {
                        element = slot->data;
                        // recycle the slot for the next lap (ticket tail + capacity).
                        slot->_seq.store (tail + sync._capacity, std::memory_order_release);
                        return 0;
                    }
                }
                else if (JOIN_UNLIKELY (seq < (tail + 1)))
                {
                    // the producer has not written this slot yet: buffer empty.
                    lastError = make_error_code (Errc::TemporaryError);
                    return -1;
                }
                else
                {
                    // another consumer got this ticket first: back off, reload.
                    backoff ();
                    tail = sync._tail.load (std::memory_order_relaxed);
                }
            }
        }
    };
580
584 template <typename Backend, template <typename, typename> class SyncPolicy>
586 {
588 template <typename Type>
590 };
591}
592
593#endif
adaptive backoff strategy for busy-wait loops.
Definition backoff.hpp:45
queue base class.
Definition queue.hpp:98
uint64_t _elementSize
memory segment element size.
Definition queue.hpp:344
uint64_t _totalSize
total memory size.
Definition queue.hpp:347
SyncPolicy _policy
memory segment sync policy.
Definition queue.hpp:353
BasicQueue(uint64_t capacity, Args &&... args)
create instance.
Definition queue.hpp:111
int mbind(int numa) const noexcept
bind memory to a NUMA node.
Definition queue.hpp:304
int push(const Type &element) noexcept
push element into the ring buffer.
Definition queue.hpp:196
int mlock() const noexcept
lock memory in RAM.
Definition queue.hpp:313
int tryPop(Type &element) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:218
static uint64_t roundPow2(uint64_t v) noexcept
round up to next power of 2.
Definition queue.hpp:324
~BasicQueue() noexcept=default
destroy queue instance.
BasicQueue(BasicQueue &&other)=delete
move constructor.
int pop(Type &element) noexcept
pop element from the ring buffer.
Definition queue.hpp:228
bool full() const noexcept
check if the ring buffer is full.
Definition queue.hpp:277
QueueSegment< Type > * _segment
shared memory segment.
Definition queue.hpp:356
Backend _backend
memory segment backend.
Definition queue.hpp:350
uint64_t _capacity
memory segment capacity.
Definition queue.hpp:341
bool empty() const noexcept
check if the ring buffer is empty.
Definition queue.hpp:290
uint64_t pending() const noexcept
get the number of pending elements for reading.
Definition queue.hpp:249
uint64_t available() const noexcept
get the number of available slots for writing.
Definition queue.hpp:264
Type ValueType
Definition queue.hpp:103
BasicQueue(const BasicQueue &other)=delete
copy constructor.
Definition acceptor.hpp:32
std::error_code make_error_code(join::Errc code) noexcept
Create an std::error_code object.
Definition error.cpp:150
Definition error.hpp:137
multiple producer multiple consumer ring buffer.
Definition queue.hpp:520
static int tryPush(QueueSegment< Type > *segment, const Type &element) noexcept
try to push element into the ring buffer.
Definition queue.hpp:529
static int tryPop(QueueSegment< Type > *segment, Type &element) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:540
multiple producer single consumer ring buffer.
Definition queue.hpp:433
static int tryPush(QueueSegment< Type > *segment, const Type &element) noexcept
try to push element into the ring buffer.
Definition queue.hpp:442
static int tryPop(QueueSegment< Type > *segment, Type &element) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:488
queue memory segment.
Definition queue.hpp:85
QueueSync _sync
synchronization primitives.
Definition queue.hpp:87
QueueSlot< Type > _elements[]
flexible array of queue slots.
Definition queue.hpp:90
queue slot.
Definition queue.hpp:69
Type data
stored element data.
Definition queue.hpp:74
char _padding[(64 -((sizeof(std::atomic_uint64_t)+sizeof(Type)) % 64)) % 64]
padding to prevent false sharing.
Definition queue.hpp:77
std::atomic_uint64_t _seq
sequence number for synchronization.
Definition queue.hpp:71
queue synchronization primitives.
Definition queue.hpp:44
std::atomic_uint64_t _head
write position.
Definition queue.hpp:52
static constexpr uint64_t MAGIC
magic number for initialization detection.
Definition queue.hpp:46
std::atomic_uint64_t _tail
read position.
Definition queue.hpp:55
uint64_t _capacity
total queue capacity (power of 2).
Definition queue.hpp:58
std::atomic_uint64_t _magic
initialization state atomic.
Definition queue.hpp:49
uint64_t _mask
bit mask for fast modulo.
Definition queue.hpp:61
single producer single consumer ring buffer.
Definition queue.hpp:364
static int tryPop(QueueSegment< Type > *segment, Type &element) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:403
static int tryPush(QueueSegment< Type > *segment, const Type &element) noexcept
try to push element into the ring buffer.
Definition queue.hpp:373
queue forward declarations.
Definition queue.hpp:586
#define JOIN_LIKELY(x)
Definition utils.hpp:46
#define JOIN_UNLIKELY(x)
Definition utils.hpp:47