join 1.0
lightweight network framework library
Loading...
Searching...
No Matches
queue.hpp
Go to the documentation of this file.
1
25#ifndef JOIN_CORE_QUEUE_HPP
26#define JOIN_CORE_QUEUE_HPP
27
28// libjoin.
29#include <join/backoff.hpp>
30#include <join/memory.hpp>
31#include <join/utils.hpp>
32
33// C++.
34#include <type_traits>
35#include <stdexcept>
36#include <algorithm>
37#include <atomic>
38
39// C.
40#include <sys/types.h>
41
42namespace join
43{
    /// queue synchronization primitives.
    /// Placed at the start of the queue memory segment (which may be shared
    /// between processes — see BasicQueue's magic handshake). Each member is
    /// aligned to its own 64-byte cache line to avoid false sharing between
    /// producers and consumers.
    struct QueueSync
    {
        /// magic number for initialization detection: _magic reads this value
        /// once the segment is fully initialized.
        static constexpr uint64_t MAGIC = 0x9F7E3B2A8D5C4E1B;

        /// initialization state atomic: 0 = untouched, an all-ones sentinel
        /// while one owner initializes, MAGIC when ready (see BasicQueue ctor).
        alignas (64) std::atomic_uint64_t _magic;

        /// write position (monotonically increasing; wrapped with _mask).
        alignas (64) std::atomic_uint64_t _head;

        /// read position (monotonically increasing; wrapped with _mask).
        alignas (64) std::atomic_uint64_t _tail;

        /// total queue capacity (always a power of 2 — see roundPow2).
        alignas (64) uint64_t _capacity;

        /// bit mask (_capacity - 1) for fast modulo when indexing slots.
        alignas (64) uint64_t _mask;
    };
67
71 template <typename Type>
73 {
75 Type data;
76
78 char _padding[(64 - (sizeof (Type) % 64)) % 64];
79 };
80
84 template <typename Type>
86 {
88 alignas (64) std::atomic_uint64_t _seq;
89
91 Type data;
92
94 char _padding[(64 - ((sizeof (std::atomic_uint64_t) + sizeof (Type)) % 64)) % 64];
95 };
96
100 template <typename Type, typename Slot>
102 {
104 alignas (64) QueueSync _sync;
105
107 Slot _elements[];
108 };
109
    // forward declarations.
    template <typename Type, typename Backend>
    struct Spsc;

    /// primary trait: all sync policies need per-slot sequence numbers by
    /// default, so BasicQueue selects QueueSlotFull for them.
    template <typename SyncPolicy>
    struct needs_seq : std::true_type
    {
    };

    /// specialization: the single-producer/single-consumer policy needs no
    /// sequence numbers (head/tail counters suffice), so it gets the
    /// lightweight slot.
    template <typename Type, typename Backend>
    struct needs_seq<Spsc<Type, Backend>> : std::false_type
    {
    };
130
    /// queue base class: fixed-capacity ring buffer stored in a memory segment
    /// owned by Backend, synchronized according to SyncPolicy (Spsc/Mpsc/Mpmc).
    /// NOTE(review): extraction dropped the class-declaration line here
    /// (doxygen lists it as "BasicQueue", queue.hpp:136) and, a few lines
    /// below, a nested alias (presumably "using Segment = ..." — the ctor uses
    /// an otherwise-undeclared "Segment" type). Restore from upstream.
    template <typename Type, typename Backend, typename SyncPolicy>
    {
        // elements are moved in and out of (possibly shared) memory by plain
        // assignment, so only trivial types are supported.
        static_assert (std::is_trivially_copyable<Type>::value, "type must be trivially copyable");
        static_assert (std::is_trivially_destructible<Type>::value, "type must be trivially destructible");

    public:
        using ValueType = Type;
        // slot layout depends on the policy: SPSC takes the light slot, the
        // multi-producer policies take the sequence-numbered slot.
        using Slot =
        typename std::conditional<needs_seq<SyncPolicy>::value, QueueSlotFull<Type>, QueueSlotLight<Type>>::type;
        /// create instance.
        /// Obtains the backing storage from Backend and initializes the shared
        /// segment exactly once across all owners: the first caller to swing
        /// _magic from 0 to a sentinel performs initialization and then
        /// publishes MAGIC; every other caller spins (with backoff) until
        /// MAGIC is visible.
        /// @param capacity requested capacity (rounded up to a power of 2).
        /// @param args extra arguments forwarded to the Backend constructor.
        /// @throws std::runtime_error if an already-initialized segment was
        /// created with a different capacity than requested here.
        template <typename... Args>
        explicit BasicQueue (uint64_t capacity, Args&&... args)
        : _capacity (roundPow2 (capacity))
        , _elementSize (sizeof (Slot))
        , _totalSize (sizeof (QueueSync) + (_capacity * _elementSize))
        , _backend (_totalSize, std::forward<Args> (args)...)
        , _segment (static_cast<Segment*> (_backend.get ()))
        {
            uint64_t expected = 0;

            // claim initialization: 0 -> all-ones sentinel marks "in progress".
            // assumes the backend zero-initializes fresh segments — TODO confirm.
            if (_segment->_sync._magic.compare_exchange_strong (expected, 0xFFFFFFFFFFFFFFFF,
                                                                std::memory_order_acq_rel))
            {
                _segment->_sync._head.store (0, std::memory_order_relaxed);
                _segment->_sync._tail.store (0, std::memory_order_relaxed);
                _segment->_sync._capacity = _capacity;
                _segment->_sync._mask = _capacity - 1;

                initSlots<needs_seq<SyncPolicy>::value> ();

                // release publishes all of the above to threads that acquire MAGIC.
                _segment->_sync._magic.store (QueueSync::MAGIC, std::memory_order_release);
            }
            else
            {
                // someone else owns initialization: wait until it completes.
                Backoff backoff;
                while (_segment->_sync._magic.load (std::memory_order_acquire) != QueueSync::MAGIC)
                {
                    backoff (); // LCOV_EXCL_LINE
                }
            }

            // attaching with a mismatched capacity would corrupt indexing.
            if (_segment->_sync._capacity != _capacity)
            {
                throw std::runtime_error ("capacity mismatch");
            }
        }
187
        /// copy constructor (deleted: the queue wraps a unique backend mapping).
        BasicQueue (const BasicQueue& other) = delete;

        /// copy assignment operator (deleted).
        BasicQueue& operator= (const BasicQueue& other) = delete;

        /// move constructor (deleted).
        BasicQueue (BasicQueue&& other) = delete;

        /// move assignment operator (deleted).
        BasicQueue& operator= (BasicQueue&& other) = delete;

        /// destroy queue instance; storage teardown is delegated to _backend.
        ~BasicQueue () noexcept = default;
218
224 int tryPush (const Type& element) noexcept
225 {
226 return SyncPolicy::tryPush (_segment, element);
227 }
228
235 ssize_t tryPush (const Type* elements, size_t size) noexcept
236 {
237 return SyncPolicy::tryPush (_segment, elements, size);
238 }
239
245 int push (const Type& element) noexcept
246 {
247 Backoff backoff;
248
249 while (tryPush (element) == -1)
250 {
251 if (JOIN_UNLIKELY (lastError != Errc::TemporaryError))
252 {
253 return -1;
254 }
255
256 backoff ();
257 }
258
259 return 0;
260 }
261
268 int push (const Type* elements, size_t size) noexcept
269 {
270 Backoff backoff;
271 uint64_t pushed = 0;
272
273 while (pushed < size)
274 {
275 ssize_t n = tryPush (elements + pushed, size - pushed);
276 if (n == -1)
277 {
278 if (JOIN_UNLIKELY (lastError != Errc::TemporaryError))
279 {
280 return -1;
281 }
282
283 backoff ();
284 }
285 else
286 {
287 pushed += static_cast<uint64_t> (n);
288 }
289 }
290
291 return 0;
292 }
293
299 int tryPop (Type& element) noexcept
300 {
301 return SyncPolicy::tryPop (_segment, element);
302 }
303
310 ssize_t tryPop (Type* elements, size_t size) noexcept
311 {
312 return SyncPolicy::tryPop (_segment, elements, size);
313 }
314
320 int pop (Type& element) noexcept
321 {
322 Backoff backoff;
323
324 while (tryPop (element) == -1)
325 {
326 if (JOIN_UNLIKELY (lastError != Errc::TemporaryError))
327 {
328 return -1;
329 }
330
331 backoff ();
332 }
333
334 return 0;
335 }
336
343 int pop (Type* elements, size_t size) noexcept
344 {
345 Backoff backoff;
346 uint64_t popped = 0;
347
348 while (popped < size)
349 {
350 ssize_t n = tryPop (elements + popped, size - popped);
351 if (n == -1)
352 {
353 if (JOIN_UNLIKELY (lastError != Errc::TemporaryError))
354 {
355 return -1;
356 }
357
358 backoff ();
359 }
360 else
361 {
362 popped += static_cast<uint64_t> (n);
363 }
364 }
365
366 return 0;
367 }
368
373 uint64_t pending () const noexcept
374 {
375 if (JOIN_UNLIKELY (_segment == nullptr))
376 {
377 return 0; // LCOV_EXCL_LINE
378 }
379 auto head = _segment->_sync._head.load (std::memory_order_acquire);
380 auto tail = _segment->_sync._tail.load (std::memory_order_relaxed);
381 return head - tail;
382 }
383
388 uint64_t available () const noexcept
389 {
390 if (JOIN_UNLIKELY (_segment == nullptr))
391 {
392 return 0; // LCOV_EXCL_LINE
393 }
394 return _segment->_sync._capacity - pending ();
395 }
396
401 bool full () const noexcept
402 {
403 if (JOIN_UNLIKELY (_segment == nullptr))
404 {
405 return false; // LCOV_EXCL_LINE
406 }
407 return pending () == _segment->_sync._capacity;
408 }
409
414 bool empty () const noexcept
415 {
416 if (JOIN_UNLIKELY (_segment == nullptr))
417 {
418 return true; // LCOV_EXCL_LINE
419 }
420 return pending () == 0;
421 }
422
#ifdef JOIN_HAS_NUMA
        /// bind the queue memory to the given NUMA node.
        /// Delegates entirely to the backend.
        /// @param numa NUMA node index.
        /// @return the backend's mbind result — presumably 0 on success and
        /// -1 on failure; confirm against the Backend implementation.
        int mbind (int numa) const noexcept
        {
            return _backend.mbind (numa);
        }
#endif

        /// lock the queue memory in RAM (prevent it from being paged out).
        /// Delegates entirely to the backend.
        /// @return the backend's mlock result.
        int mlock () const noexcept
        {
            return _backend.mlock ();
        }

    private:
450 static constexpr uint64_t roundPow2 (uint64_t v) noexcept
451 {
452 if (v == 0)
453 {
454 return 1;
455 }
456 v--;
457 v |= v >> 1;
458 v |= v >> 2;
459 v |= v >> 4;
460 v |= v >> 8;
461 v |= v >> 16;
462 v |= v >> 32;
463 return v + 1;
464 }
465
        /// initialize slots — overload selected (via enable_if) when the
        /// policy does not use per-slot sequence numbers (SPSC): the light
        /// slot carries no state that needs seeding.
        template <bool NeedsSeq, typename std::enable_if<!NeedsSeq>::type* = nullptr>
        void initSlots () noexcept
        {
            // nothing to initialize.
        }
474
        /// initialize slots — overload selected (via enable_if) when the
        /// policy uses per-slot sequence numbers (MPSC/MPMC). Seeds each
        /// slot's sequence with its own index, marking every slot free for
        /// the first lap of producers.
        template <bool NeedsSeq, typename std::enable_if<NeedsSeq>::type* = nullptr>
        void initSlots () noexcept
        {
            for (uint64_t i = 0; i < _capacity; ++i)
            {
                _segment->_elements[i]._seq.store (i, std::memory_order_relaxed);
            }
        }
486
        /// total queue capacity (power of 2), cached locally for validation
        /// against the shared segment.
        const uint64_t _capacity = 0;

        /// size of one slot in bytes.
        const uint64_t _elementSize = 0;

        /// total size of the memory segment in bytes (sync block + slots).
        const uint64_t _totalSize = 0;

        /// backend providing and owning the memory segment.
        Backend _backend;

        /// pointer to the mapped queue segment (lives inside _backend's memory).
        Segment* _segment = nullptr;
    };
502
    /// single producer single consumer ring buffer policy.
    /// The producer exclusively writes _head and the consumer exclusively
    /// writes _tail; each side acquires the other's counter, so no per-slot
    /// sequence numbers are needed. Only valid with exactly one producer
    /// thread and one consumer thread.
    template <typename Type, typename Backend>
    struct Spsc
    {
        // NOTE(review): a declaration was lost in extraction here — likely a
        // "using Segment = ..." alias that the signatures below refer to.

        /// try to push element into the ring buffer.
        /// @param segment queue memory segment.
        /// @param element element to push.
        /// @return 0 on success, -1 on failure (lastError: InvalidParam for a
        /// null segment, TemporaryError when the ring is full).
        static int tryPush (Segment* segment, const Type& element) noexcept
        {
            if (JOIN_UNLIKELY (segment == nullptr))
            {
                // LCOV_EXCL_START
                lastError = make_error_code (Errc::InvalidParam);
                return -1;
                // LCOV_EXCL_STOP
            }

            auto& sync = segment->_sync;
            // relaxed is enough for _head (only this thread writes it);
            // acquire on _tail pairs with the consumer's release store.
            uint64_t head = sync._head.load (std::memory_order_relaxed);
            uint64_t tail = sync._tail.load (std::memory_order_acquire);

            if (JOIN_UNLIKELY ((head - tail) == sync._capacity))
            {
                lastError = make_error_code (Errc::TemporaryError);
                return -1;
            }

            segment->_elements[head & sync._mask].data = element;
            // release publishes the slot write together with the new head.
            sync._head.store (head + 1, std::memory_order_release);

            return 0;
        }

        /// try to push multiple elements into the ring buffer.
        /// @param segment queue memory segment.
        /// @param elements array of elements to push.
        /// @param size number of elements in the array.
        /// @return number of elements pushed (may be less than size), -1 on
        /// failure (InvalidParam on bad arguments, TemporaryError when full).
        static ssize_t tryPush (Segment* segment, const Type* elements, size_t size) noexcept
        {
            if (JOIN_UNLIKELY (segment == nullptr || elements == nullptr || size == 0))
            {
                lastError = make_error_code (Errc::InvalidParam);
                return -1;
            }

            auto& sync = segment->_sync;
            uint64_t head = sync._head.load (std::memory_order_relaxed);
            uint64_t tail = sync._tail.load (std::memory_order_acquire);
            // clamp the batch to the free space currently visible.
            uint64_t toWrite = std::min (static_cast<uint64_t> (size), sync._capacity - (head - tail));

            if (JOIN_UNLIKELY (toWrite == 0))
            {
                lastError = make_error_code (Errc::TemporaryError);
                return -1;
            }

            for (uint64_t i = 0; i < toWrite; ++i)
            {
                segment->_elements[(head + i) & sync._mask].data = elements[i];
            }

            // publish the whole batch with a single release store.
            sync._head.store (head + toWrite, std::memory_order_release);

            return static_cast<ssize_t> (toWrite);
        }

        /// try to pop element from the ring buffer.
        /// @param segment queue memory segment.
        /// @param element popped element (written on success).
        /// @return 0 on success, -1 on failure (InvalidParam for a null
        /// segment, TemporaryError when the ring is empty).
        static int tryPop (Segment* segment, Type& element) noexcept
        {
            if (JOIN_UNLIKELY (segment == nullptr))
            {
                // LCOV_EXCL_START
                lastError = make_error_code (Errc::InvalidParam);
                return -1;
                // LCOV_EXCL_STOP
            }

            auto& sync = segment->_sync;
            // relaxed is enough for _tail (only this thread writes it);
            // acquire on _head makes the producer's slot writes visible.
            uint64_t tail = sync._tail.load (std::memory_order_relaxed);
            uint64_t head = sync._head.load (std::memory_order_acquire);

            if (JOIN_UNLIKELY (head == tail))
            {
                lastError = make_error_code (Errc::TemporaryError);
                return -1;
            }

            element = segment->_elements[tail & sync._mask].data;
            // release hands the slot back to the producer.
            sync._tail.store (tail + 1, std::memory_order_release);

            return 0;
        }

        /// try to pop multiple elements from the ring buffer.
        /// @param segment queue memory segment.
        /// @param elements destination array.
        /// @param size maximum number of elements to pop.
        /// @return number of elements popped (may be less than size), -1 on
        /// failure (InvalidParam on bad arguments, TemporaryError when empty).
        static ssize_t tryPop (Segment* segment, Type* elements, size_t size) noexcept
        {
            if (JOIN_UNLIKELY (segment == nullptr || elements == nullptr || size == 0))
            {
                lastError = make_error_code (Errc::InvalidParam);
                return -1;
            }

            auto& sync = segment->_sync;
            uint64_t tail = sync._tail.load (std::memory_order_relaxed);
            uint64_t head = sync._head.load (std::memory_order_acquire);
            // clamp the batch to the data currently visible.
            uint64_t toRead = std::min (static_cast<uint64_t> (size), head - tail);

            if (JOIN_UNLIKELY (toRead == 0))
            {
                lastError = make_error_code (Errc::TemporaryError);
                return -1;
            }

            for (uint64_t i = 0; i < toRead; ++i)
            {
                elements[i] = segment->_elements[(tail + i) & sync._mask].data;
            }

            // hand the whole batch of slots back with a single release store.
            sync._tail.store (tail + toRead, std::memory_order_release);

            return static_cast<ssize_t> (toRead);
        }
    };
648
    /// multiple producer single consumer ring buffer policy.
    /// Producers claim slots by CAS on _head; a per-slot sequence number
    /// (QueueSlotFull) tells the consumer when a claimed slot has actually
    /// been written (Vyukov-style protocol). The pop side assumes a single
    /// consumer thread.
    template <typename Type, typename Backend>
    struct Mpsc
    {
        // NOTE(review): a declaration was lost in extraction here — likely a
        // "using Segment = ..." alias that the signatures below refer to.

        /// try to push element into the ring buffer.
        /// @param segment queue memory segment.
        /// @param element element to push.
        /// @return 0 on success, -1 on failure (InvalidParam for a null
        /// segment, TemporaryError when the ring is full).
        static int tryPush (Segment* segment, const Type& element) noexcept
        {
            if (JOIN_UNLIKELY (segment == nullptr))
            {
                // LCOV_EXCL_START
                lastError = make_error_code (Errc::InvalidParam);
                return -1;
                // LCOV_EXCL_STOP
            }

            auto& sync = segment->_sync;
            uint64_t head = sync._head.load (std::memory_order_relaxed);
            Backoff backoff;

            for (;;)
            {
                auto* slot = &segment->_elements[head & sync._mask];
                uint64_t seq = slot->_seq.load (std::memory_order_acquire);

                // seq == head : slot is free for this lap — try to claim it.
                // seq <  head : slot still holds last lap's data — ring full.
                // seq >  head : another producer claimed it — reload and retry.
                if (seq == head)
                {
                    if (JOIN_LIKELY (sync._head.compare_exchange_weak (head, head + 1, std::memory_order_acquire,
                                                                       std::memory_order_relaxed)))
                    {
                        slot->data = element;
                        // seq = head + 1 marks the slot as filled for the consumer.
                        slot->_seq.store (head + 1, std::memory_order_release);
                        return 0;
                    }
                }
                else if (JOIN_UNLIKELY (seq < head))
                {
                    lastError = make_error_code (Errc::TemporaryError);
                    return -1;
                }
                else
                {
                    backoff ();
                    head = sync._head.load (std::memory_order_relaxed);
                }
            }
        }

        /// try to push multiple elements into the ring buffer.
        /// Reserves a contiguous range [head, head + toWrite) with one CAS,
        /// then fills and publishes each slot in order.
        /// @param segment queue memory segment.
        /// @param elements array of elements to push.
        /// @param size number of elements in the array.
        /// @return number of elements pushed (may be less than size), -1 on
        /// failure (InvalidParam on bad arguments, TemporaryError when full).
        static ssize_t tryPush (Segment* segment, const Type* elements, size_t size) noexcept
        {
            if (JOIN_UNLIKELY (segment == nullptr || elements == nullptr || size == 0))
            {
                lastError = make_error_code (Errc::InvalidParam);
                return -1;
            }

            Backoff backoff;
            auto& sync = segment->_sync;
            uint64_t head = sync._head.load (std::memory_order_relaxed);

            for (;;)
            {
                uint64_t tail = sync._tail.load (std::memory_order_acquire);
                // free space derived from tail: slots below tail + capacity
                // have been recycled by the (single) consumer.
                uint64_t toWrite = std::min (static_cast<uint64_t> (size), sync._capacity - (head - tail));

                if (JOIN_UNLIKELY (toWrite == 0))
                {
                    lastError = make_error_code (Errc::TemporaryError);
                    return -1;
                }

                if (JOIN_LIKELY (sync._head.compare_exchange_weak (head, head + toWrite, std::memory_order_acquire,
                                                                   std::memory_order_relaxed)))
                {
                    for (uint64_t i = 0; i < toWrite; ++i)
                    {
                        auto* slot = &segment->_elements[(head + i) & sync._mask];
                        slot->data = elements[i];
                        // publish each slot as soon as it is written.
                        slot->_seq.store (head + i + 1, std::memory_order_release);
                    }

                    return static_cast<ssize_t> (toWrite);
                }

                // CAS failed: head was refreshed by compare_exchange_weak.
                backoff ();
            }
        }

        /// try to pop element from the ring buffer (single consumer only).
        /// @param segment queue memory segment.
        /// @param element popped element (written on success).
        /// @return 0 on success, -1 on failure (InvalidParam for a null
        /// segment, TemporaryError when no filled slot is available).
        static int tryPop (Segment* segment, Type& element) noexcept
        {
            if (JOIN_UNLIKELY (segment == nullptr))
            {
                // LCOV_EXCL_START
                lastError = make_error_code (Errc::InvalidParam);
                return -1;
                // LCOV_EXCL_STOP
            }

            auto& sync = segment->_sync;
            uint64_t tail = sync._tail.load (std::memory_order_relaxed);
            auto* slot = &segment->_elements[tail & sync._mask];
            uint64_t seq = slot->_seq.load (std::memory_order_acquire);

            // seq == tail + 1 means the producer finished writing this slot.
            if (JOIN_UNLIKELY (seq != tail + 1))
            {
                lastError = make_error_code (Errc::TemporaryError);
                return -1;
            }

            element = slot->data;
            // recycle: seq = tail + capacity frees the slot for the next lap.
            slot->_seq.store (tail + sync._capacity, std::memory_order_release);
            sync._tail.store (tail + 1, std::memory_order_release);

            return 0;
        }

        /// try to pop multiple elements from the ring buffer (single consumer
        /// only). Reads consecutive slots until one is not yet published,
        /// returning a partial batch if needed.
        /// @param segment queue memory segment.
        /// @param elements destination array.
        /// @param size maximum number of elements to pop.
        /// @return number of elements popped (may be less than size), -1 on
        /// failure (InvalidParam on bad arguments, TemporaryError when empty).
        static ssize_t tryPop (Segment* segment, Type* elements, size_t size) noexcept
        {
            if (JOIN_UNLIKELY (segment == nullptr || elements == nullptr || size == 0))
            {
                lastError = make_error_code (Errc::InvalidParam);
                return -1;
            }

            auto& sync = segment->_sync;
            uint64_t tail = sync._tail.load (std::memory_order_relaxed);
            uint64_t head = sync._head.load (std::memory_order_acquire);
            uint64_t toRead = std::min (static_cast<uint64_t> (size), head - tail);
            uint64_t popped = 0;

            for (uint64_t i = 0; i < toRead; ++i)
            {
                auto* slot = &segment->_elements[(tail + i) & sync._mask];

                // stop at the first slot a producer has claimed but not yet
                // published (seq != index + 1).
                if (JOIN_UNLIKELY (slot->_seq.load (std::memory_order_acquire) != tail + i + 1))
                {
                    break;
                }

                elements[i] = slot->data;
                slot->_seq.store (tail + i + sync._capacity, std::memory_order_release);
                ++popped;
            }

            if (JOIN_LIKELY (popped > 0))
            {
                sync._tail.store (tail + popped, std::memory_order_release);
                return static_cast<ssize_t> (popped);
            }

            lastError = make_error_code (Errc::TemporaryError);
            return -1;
        }
    };
832
    /// multiple producer multiple consumer ring buffer policy.
    /// The producer side is identical to MPSC and is reused; the consumer
    /// side additionally claims slots with CAS on _tail so several consumer
    /// threads can pop concurrently.
    template <typename Type, typename Backend>
    struct Mpmc
    {
        // NOTE(review): a declaration was lost in extraction here — likely a
        // "using Segment = ..." alias that the signatures below refer to.

        /// try to push element into the ring buffer (delegates to Mpsc).
        /// @param segment queue memory segment.
        /// @param element element to push.
        /// @return 0 on success, -1 on failure.
        static int tryPush (Segment* segment, const Type& element) noexcept
        {
            return Mpsc<Type, Backend>::tryPush (segment, element);
        }

        /// try to push multiple elements into the ring buffer (delegates to
        /// Mpsc).
        /// @param segment queue memory segment.
        /// @param elements array of elements to push.
        /// @param size number of elements in the array.
        /// @return number of elements pushed, -1 on failure.
        static ssize_t tryPush (Segment* segment, const Type* elements, size_t size) noexcept
        {
            return Mpsc<Type, Backend>::tryPush (segment, elements, size);
        }

        /// try to pop element from the ring buffer.
        /// @param segment queue memory segment.
        /// @param element popped element (written on success).
        /// @return 0 on success, -1 on failure (InvalidParam for a null
        /// segment, TemporaryError when no filled slot is available).
        static int tryPop (Segment* segment, Type& element) noexcept
        {
            if (JOIN_UNLIKELY (segment == nullptr))
            {
                // LCOV_EXCL_START
                lastError = make_error_code (Errc::InvalidParam);
                return -1;
                // LCOV_EXCL_STOP
            }

            auto& sync = segment->_sync;
            uint64_t tail = sync._tail.load (std::memory_order_relaxed);
            Backoff backoff;

            for (;;)
            {
                auto* slot = &segment->_elements[tail & sync._mask];
                uint64_t seq = slot->_seq.load (std::memory_order_acquire);

                // seq == tail + 1 : slot is filled — try to claim it.
                // seq <  tail + 1 : slot not yet written — ring empty.
                // seq >  tail + 1 : another consumer claimed it — reload.
                if (seq == (tail + 1))
                {
                    if (JOIN_LIKELY (sync._tail.compare_exchange_weak (tail, tail + 1, std::memory_order_acquire,
                                                                       std::memory_order_relaxed)))
                    {
                        element = slot->data;
                        // recycle the slot for the producers' next lap.
                        slot->_seq.store (tail + sync._capacity, std::memory_order_release);
                        return 0;
                    }
                }
                else if (JOIN_UNLIKELY (seq < (tail + 1)))
                {
                    lastError = make_error_code (Errc::TemporaryError);
                    return -1;
                }
                else
                {
                    backoff ();
                    tail = sync._tail.load (std::memory_order_relaxed);
                }
            }
        }

        /// try to pop multiple elements from the ring buffer.
        /// Scans how many consecutive slots are published, claims that range
        /// with one CAS on _tail, then copies the data out; once claimed, no
        /// producer can recycle those slots until their seq is bumped here.
        /// @param segment queue memory segment.
        /// @param elements destination array.
        /// @param size maximum number of elements to pop.
        /// @return number of elements popped (may be less than size), -1 on
        /// failure (InvalidParam on bad arguments, TemporaryError when empty).
        static ssize_t tryPop (Segment* segment, Type* elements, size_t size) noexcept
        {
            if (JOIN_UNLIKELY (segment == nullptr || elements == nullptr || size == 0))
            {
                lastError = make_error_code (Errc::InvalidParam);
                return -1;
            }

            Backoff backoff;
            auto& sync = segment->_sync;
            uint64_t tail = sync._tail.load (std::memory_order_relaxed);

            for (;;)
            {
                uint64_t head = sync._head.load (std::memory_order_acquire);
                uint64_t toRead = std::min (static_cast<uint64_t> (size), head - tail);

                if (JOIN_UNLIKELY (toRead == 0))
                {
                    lastError = make_error_code (Errc::TemporaryError);
                    return -1;
                }

                // count how many consecutive slots are actually published.
                uint64_t ready = 0;
                for (; ready < toRead; ++ready)
                {
                    if (JOIN_UNLIKELY (segment->_elements[(tail + ready) & sync._mask]._seq.load (
                                       std::memory_order_acquire) != tail + ready + 1))
                    {
                        break;
                    }
                }

                if (JOIN_UNLIKELY (ready == 0))
                {
                    lastError = make_error_code (Errc::TemporaryError);
                    return -1;
                }

                if (JOIN_LIKELY (sync._tail.compare_exchange_weak (tail, tail + ready, std::memory_order_acquire,
                                                                   std::memory_order_relaxed)))
                {
                    for (uint64_t i = 0; i < ready; ++i)
                    {
                        auto* slot = &segment->_elements[(tail + i) & sync._mask];
                        elements[i] = slot->data;
                        slot->_seq.store (tail + i + sync._capacity, std::memory_order_release);
                    }

                    return static_cast<ssize_t> (ready);
                }

                // CAS failed: tail was refreshed by compare_exchange_weak.
                backoff ();
            }
        }
    };
976
    /// policy-binding helper over BasicQueue (doxygen brief: "queue forward
    /// declarations").
    /// NOTE(review): extraction dropped this struct's name and the nested
    /// member declaration (only the template headers survive) — restore this
    /// block from the upstream queue.hpp before relying on it.
    template <typename Backend, template <typename, typename> class SyncPolicy>
    {
        template <typename Type>
    };
987}
988
989#endif
adaptive backoff strategy for busy-wait loops.
Definition backoff.hpp:45
queue base class.
Definition queue.hpp:136
int push(const Type *elements, size_t size) noexcept
push multiple elements into the ring buffer.
Definition queue.hpp:268
BasicQueue(uint64_t capacity, Args &&... args)
create instance.
Definition queue.hpp:152
typename std::conditional< needs_seq< SyncPolicy >::value, QueueSlotFull< Type >, QueueSlotLight< Type > >::type Slot
Definition queue.hpp:142
int push(const Type &element) noexcept
push element into the ring buffer.
Definition queue.hpp:245
int mlock() const noexcept
lock memory in RAM.
Definition queue.hpp:439
ssize_t tryPop(Type *elements, size_t size) noexcept
try to pop multiple elements from the ring buffer.
Definition queue.hpp:310
int tryPop(Type &element) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:299
~BasicQueue() noexcept=default
destroy queue instance.
int pop(Type *elements, size_t size) noexcept
pop multiple elements from the ring buffer.
Definition queue.hpp:343
BasicQueue(BasicQueue &&other)=delete
move constructor.
int pop(Type &element) noexcept
pop element from the ring buffer.
Definition queue.hpp:320
bool full() const noexcept
check if the ring buffer is full.
Definition queue.hpp:401
ssize_t tryPush(const Type *elements, size_t size) noexcept
try to push multiple elements into the ring buffer.
Definition queue.hpp:235
bool empty() const noexcept
check if the ring buffer is empty.
Definition queue.hpp:414
uint64_t pending() const noexcept
get the number of pending elements for reading.
Definition queue.hpp:373
uint64_t available() const noexcept
get the number of available slots for writing.
Definition queue.hpp:388
Type ValueType
Definition queue.hpp:141
BasicQueue(const BasicQueue &other)=delete
copy constructor.
Definition acceptor.hpp:32
std::error_code make_error_code(join::Errc code) noexcept
Create an std::error_code object.
Definition error.cpp:150
Definition error.hpp:137
multiple producer multiple consumer ring buffer.
Definition queue.hpp:838
static ssize_t tryPop(Segment *segment, Type *elements, size_t size) noexcept
try to pop multiple elements from the ring buffer.
Definition queue.hpp:920
static int tryPop(Segment *segment, Type &element) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:871
static ssize_t tryPush(Segment *segment, const Type *elements, size_t size) noexcept
try to push multiple elements into the ring buffer.
Definition queue.hpp:860
static int tryPush(Segment *segment, const Type &element) noexcept
try to push element into the ring buffer.
Definition queue.hpp:848
multiple producer single consumer ring buffer.
Definition queue.hpp:654
static ssize_t tryPop(Segment *segment, Type *elements, size_t size) noexcept
try to pop multiple elements from the ring buffer.
Definition queue.hpp:794
static int tryPop(Segment *segment, Type &element) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:759
static int tryPush(Segment *segment, const Type &element) noexcept
try to push element into the ring buffer.
Definition queue.hpp:664
static ssize_t tryPush(Segment *segment, const Type *elements, size_t size) noexcept
try to push multiple elements into the ring buffer.
Definition queue.hpp:713
queue memory segment.
Definition queue.hpp:102
Slot _elements[]
flexible array of queue slots.
Definition queue.hpp:107
QueueSync _sync
synchronization primitives.
Definition queue.hpp:104
full queue slot used by MPSC/MPMC.
Definition queue.hpp:86
Type data
stored element data.
Definition queue.hpp:91
std::atomic_uint64_t _seq
sequence number for synchronization.
Definition queue.hpp:88
char _padding[(64 -((sizeof(std::atomic_uint64_t)+sizeof(Type)) % 64)) % 64]
padding to prevent false sharing.
Definition queue.hpp:94
lightweight queue slot used by SPSC.
Definition queue.hpp:73
Type data
stored element data.
Definition queue.hpp:75
char _padding[(64 -(sizeof(Type) % 64)) % 64]
padding to prevent false sharing.
Definition queue.hpp:78
queue synchronization primitives.
Definition queue.hpp:48
std::atomic_uint64_t _head
write position.
Definition queue.hpp:56
static constexpr uint64_t MAGIC
magic number for initialization detection.
Definition queue.hpp:50
std::atomic_uint64_t _tail
read position.
Definition queue.hpp:59
uint64_t _capacity
total queue capacity (power of 2).
Definition queue.hpp:62
std::atomic_uint64_t _magic
initialization state atomic.
Definition queue.hpp:53
uint64_t _mask
bit mask for fast modulo.
Definition queue.hpp:65
single producer single consumer ring buffer.
Definition queue.hpp:508
static ssize_t tryPush(Segment *segment, const Type *elements, size_t size) noexcept
try to push multiple elements into the ring buffer.
Definition queue.hpp:551
static ssize_t tryPop(Segment *segment, Type *elements, size_t size) noexcept
try to pop multiple elements from the ring buffer.
Definition queue.hpp:619
static int tryPush(Segment *segment, const Type &element) noexcept
try to push element into the ring buffer.
Definition queue.hpp:518
static int tryPop(Segment *segment, Type &element) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:586
queue forward declarations.
Definition queue.hpp:982
primary trait: all sync policies need sequence numbers by default.
Definition queue.hpp:120
#define JOIN_LIKELY(x)
Definition utils.hpp:46
#define JOIN_UNLIKELY(x)
Definition utils.hpp:47