join 1.0
lightweight network framework library
Loading...
Searching...
No Matches
queue.hpp
Go to the documentation of this file.
1
25#ifndef JOIN_CORE_QUEUE_HPP
26#define JOIN_CORE_QUEUE_HPP
27
28// libjoin.
29#include <join/backoff.hpp>
30#include <join/memory.hpp>
31#include <join/utils.hpp>
32
33// C++.
34#include <type_traits>
35#include <stdexcept>
36#include <algorithm>
37#include <atomic>
38
39// C.
40#include <sys/types.h>
41
42namespace join
43{
/**
 * @brief queue synchronization primitives.
 *
 * Every field is aligned on its own 64 byte boundary.
 */
struct QueueSync
{
    /// magic number for initialization detection.
    static constexpr uint64_t MAGIC = 0x9F7E3B2A8D5C4E1BULL;

    /// initialization state atomic.
    alignas (64) std::atomic<uint64_t> _magic;

    /// write position.
    alignas (64) std::atomic<uint64_t> _head;

    /// read position.
    alignas (64) std::atomic<uint64_t> _tail;
};
61
65 template <typename Type>
67 {
69 Type data;
70 };
71
75 template <typename Type>
77 {
79 alignas (64) std::atomic_uint64_t _seq;
80
82 Type data;
83
85 char _padding[(64 - ((sizeof (std::atomic_uint64_t) + sizeof (Type)) % 64)) % 64];
86 };
87
91 template <typename Type, typename Slot>
93 {
95 alignas (64) QueueSync _sync;
96
98 Slot _elements[];
99 };
100
101 // forward declarations.
102 template <typename Type, typename Backend>
103 struct Spsc;
104
/**
 * @brief primary trait: all sync policies need sequence numbers by default.
 */
template <typename SyncPolicy>
struct needs_seq : std::integral_constant<bool, true>
{
};
113
117 template <typename Type, typename Backend>
118 struct needs_seq<Spsc<Type, Backend>> : std::false_type
119 {
120 };
121
125 template <typename Type, typename Backend, typename SyncPolicy>
127 {
128 static_assert (std::is_trivially_copyable<Type>::value, "type must be trivially copyable");
129 static_assert (std::is_trivially_destructible<Type>::value, "type must be trivially destructible");
130
131 public:
132 using ValueType = Type;
133 using Slot =
134 typename std::conditional<needs_seq<SyncPolicy>::value, QueueSlotFull<Type>, QueueSlotLight<Type>>::type;
136
142 template <typename... Args>
143 explicit BasicQueue (uint64_t capacity, Args&&... args)
144 : _capacity (roundPow2 (capacity))
145 , _mask (_capacity - 1)
146 , _elementSize (sizeof (Slot))
147 , _totalSize (sizeof (QueueSync) + (_capacity * _elementSize))
148 , _backend (_totalSize, std::forward<Args> (args)...)
149 , _segment (static_cast<Segment*> (_backend.get ()))
150 {
151 uint64_t expected = 0;
152
153 if (_segment->_sync._magic.compare_exchange_strong (expected, 0xFFFFFFFFFFFFFFFF,
154 std::memory_order_acq_rel))
155 {
156 _segment->_sync._head.store (0, std::memory_order_relaxed);
157 _segment->_sync._tail.store (0, std::memory_order_relaxed);
158
159 initSlots<needs_seq<SyncPolicy>::value> ();
160
161 _segment->_sync._magic.store (QueueSync::MAGIC, std::memory_order_release);
162 }
163 else
164 {
165 Backoff backoff;
166 while (_segment->_sync._magic.load (std::memory_order_acquire) != QueueSync::MAGIC)
167 {
168 backoff (); // LCOV_EXCL_LINE
169 }
170 }
171 }
172
177 BasicQueue (const BasicQueue& other) = delete;
178
184 BasicQueue& operator= (const BasicQueue& other) = delete;
185
190 BasicQueue (BasicQueue&& other) = delete;
191
197 BasicQueue& operator= (BasicQueue&& other) = delete;
198
202 ~BasicQueue () noexcept = default;
203
209 int tryPush (const Type& element) noexcept
210 {
211 return SyncPolicy::tryPush (_segment, element, _cachedTail, _capacity, _mask);
212 }
213
220 ssize_t tryPush (const Type* elements, size_t size) noexcept
221 {
222 return SyncPolicy::tryPush (_segment, elements, size, _cachedTail, _capacity, _mask);
223 }
224
230 int push (const Type& element) noexcept
231 {
232 Backoff backoff;
233
234 while (tryPush (element) == -1)
235 {
236 if (JOIN_UNLIKELY (lastError != Errc::TemporaryError))
237 {
238 return -1; // LCOV_EXCL_LINE
239 }
240
241 backoff ();
242 }
243
244 return 0;
245 }
246
253 int push (const Type* elements, size_t size) noexcept
254 {
255 Backoff backoff;
256 uint64_t pushed = 0;
257
258 while (pushed < size)
259 {
260 ssize_t n = tryPush (elements + pushed, size - pushed);
261 if (n == -1)
262 {
263 if (JOIN_UNLIKELY (lastError != Errc::TemporaryError))
264 {
265 return -1;
266 }
267
268 backoff (); // LCOV_EXCL_LINE
269 }
270 else
271 {
272 pushed += static_cast<uint64_t> (n);
273 }
274 }
275
276 return 0;
277 }
278
284 int tryPop (Type& element) noexcept
285 {
286 return SyncPolicy::tryPop (_segment, element, _cachedHead, _capacity, _mask);
287 }
288
295 ssize_t tryPop (Type* elements, size_t size) noexcept
296 {
297 return SyncPolicy::tryPop (_segment, elements, size, _cachedHead, _capacity, _mask);
298 }
299
305 int pop (Type& element) noexcept
306 {
307 Backoff backoff;
308
309 while (tryPop (element) == -1)
310 {
311 if (JOIN_UNLIKELY (lastError != Errc::TemporaryError))
312 {
313 return -1; // LCOV_EXCL_LINE
314 }
315
316 backoff ();
317 }
318
319 return 0;
320 }
321
328 int pop (Type* elements, size_t size) noexcept
329 {
330 Backoff backoff;
331 uint64_t popped = 0;
332
333 while (popped < size)
334 {
335 ssize_t n = tryPop (elements + popped, size - popped);
336 if (n == -1)
337 {
338 if (JOIN_UNLIKELY (lastError != Errc::TemporaryError))
339 {
340 return -1;
341 }
342
343 backoff (); // LCOV_EXCL_LINE
344 }
345 else
346 {
347 popped += static_cast<uint64_t> (n);
348 }
349 }
350
351 return 0;
352 }
353
358 uint64_t pending () const noexcept
359 {
360 if (JOIN_UNLIKELY (_segment == nullptr))
361 {
362 return 0; // LCOV_EXCL_LINE
363 }
364 auto head = _segment->_sync._head.load (std::memory_order_acquire);
365 auto tail = _segment->_sync._tail.load (std::memory_order_relaxed);
366 return head - tail;
367 }
368
373 uint64_t available () const noexcept
374 {
375 if (JOIN_UNLIKELY (_segment == nullptr))
376 {
377 return 0; // LCOV_EXCL_LINE
378 }
379 return _capacity - pending ();
380 }
381
386 bool full () const noexcept
387 {
388 if (JOIN_UNLIKELY (_segment == nullptr))
389 {
390 return false; // LCOV_EXCL_LINE
391 }
392 return pending () == _capacity;
393 }
394
399 bool empty () const noexcept
400 {
401 if (JOIN_UNLIKELY (_segment == nullptr))
402 {
403 return true; // LCOV_EXCL_LINE
404 }
405 return pending () == 0;
406 }
407
408#ifdef JOIN_HAS_NUMA
414 int mbind (int numa) const noexcept
415 {
416 return _backend.mbind (numa);
417 }
418#endif
419
424 int mlock () const noexcept
425 {
426 return _backend.mlock ();
427 }
428
429 private:
/**
 * @brief round a value up to a power of two.
 * @param v value to round.
 * @return 1 if v is 0, v if it is already a power of two, otherwise the next power of two above v.
 */
static constexpr uint64_t roundPow2 (uint64_t v) noexcept
{
    if (v == 0)
    {
        return 1;
    }

    // smear the highest set bit of (v - 1) into every lower bit, then add one.
    --v;
    for (unsigned shift = 1; shift < 64; shift <<= 1)
    {
        v |= v >> shift;
    }

    return v + 1;
}
450
/**
 * @brief slot initialization selected when NeedsSeq is false: slots carry no sequence number.
 */
template <bool NeedsSeq, typename std::enable_if<!NeedsSeq>::type* = nullptr>
void initSlots () noexcept
{
    // no per slot state to set up.
}
459
463 template <bool NeedsSeq, typename std::enable_if<NeedsSeq>::type* = nullptr>
464 void initSlots () noexcept
465 {
466 for (uint64_t i = 0; i < _capacity; ++i)
467 {
468 _segment->_elements[i]._seq.store (i, std::memory_order_relaxed);
469 }
470 }
471
473 const uint64_t _capacity = 0;
474
476 const uint64_t _mask = 0;
477
479 const uint64_t _elementSize = 0;
480
482 const uint64_t _totalSize = 0;
483
485 Backend _backend;
486
488 Segment* _segment = nullptr;
489
491 alignas (64) uint64_t _cachedTail = 0;
492
494 alignas (64) uint64_t _cachedHead = 0;
495 };
496
500 template <typename Type, typename Backend>
501 struct Spsc
502 {
505
515 static int tryPush (Segment* segment, const Type& element, uint64_t& cachedTail, uint64_t capacity,
516 uint64_t mask) noexcept
517 {
518 if (JOIN_UNLIKELY (segment == nullptr))
519 {
520 // LCOV_EXCL_START
521 lastError = make_error_code (Errc::InvalidParam);
522 return -1;
523 // LCOV_EXCL_STOP
524 }
525
526 auto& sync = segment->_sync;
527 uint64_t head = sync._head.load (std::memory_order_relaxed);
528
529 if (JOIN_UNLIKELY ((head - cachedTail) == capacity))
530 {
531 cachedTail = sync._tail.load (std::memory_order_acquire);
532 if ((head - cachedTail) == capacity)
533 {
534 lastError = make_error_code (Errc::TemporaryError);
535 return -1;
536 }
537 }
538
539 segment->_elements[head & mask].data = element;
540 sync._head.store (head + 1, std::memory_order_release);
541
542 return 0;
543 }
544
555 static ssize_t tryPush (Segment* segment, const Type* elements, size_t size, uint64_t& cachedTail,
556 uint64_t capacity, uint64_t mask) noexcept
557 {
558 if (JOIN_UNLIKELY (segment == nullptr || elements == nullptr || size == 0))
559 {
560 lastError = make_error_code (Errc::InvalidParam);
561 return -1;
562 }
563
564 auto& sync = segment->_sync;
565 uint64_t head = sync._head.load (std::memory_order_relaxed);
566 uint64_t avail = capacity - (head - cachedTail);
567
568 if (JOIN_UNLIKELY (avail == 0))
569 {
570 cachedTail = sync._tail.load (std::memory_order_acquire);
571 avail = capacity - (head - cachedTail);
572 if (avail == 0)
573 {
574 lastError = make_error_code (Errc::TemporaryError);
575 return -1;
576 }
577 }
578
579 uint64_t toWrite = std::min (static_cast<uint64_t> (size), avail);
580
581 for (uint64_t i = 0; i < toWrite; ++i)
582 {
583 segment->_elements[(head + i) & mask].data = elements[i];
584 }
585
586 sync._head.store (head + toWrite, std::memory_order_release);
587
588 return static_cast<ssize_t> (toWrite);
589 }
590
600 static int tryPop (Segment* segment, Type& element, uint64_t& cachedHead, uint64_t /*capacity*/,
601 uint64_t mask) noexcept
602 {
603 if (JOIN_UNLIKELY (segment == nullptr))
604 {
605 // LCOV_EXCL_START
606 lastError = make_error_code (Errc::InvalidParam);
607 return -1;
608 // LCOV_EXCL_STOP
609 }
610
611 auto& sync = segment->_sync;
612 uint64_t tail = sync._tail.load (std::memory_order_relaxed);
613
614 if (cachedHead == tail)
615 {
616 cachedHead = sync._head.load (std::memory_order_acquire);
617 if (cachedHead == tail)
618 {
619 lastError = make_error_code (Errc::TemporaryError);
620 return -1;
621 }
622 }
623
624 element = segment->_elements[tail & mask].data;
625 sync._tail.store (tail + 1, std::memory_order_release);
626
627 return 0;
628 }
629
640 static ssize_t tryPop (Segment* segment, Type* elements, size_t size, uint64_t& cachedHead,
641 uint64_t /*capacity*/, uint64_t mask) noexcept
642 {
643 if (JOIN_UNLIKELY (segment == nullptr || elements == nullptr || size == 0))
644 {
645 lastError = make_error_code (Errc::InvalidParam);
646 return -1;
647 }
648
649 auto& sync = segment->_sync;
650 uint64_t tail = sync._tail.load (std::memory_order_relaxed);
651 uint64_t pending = cachedHead - tail;
652
653 if (pending == 0)
654 {
655 cachedHead = sync._head.load (std::memory_order_acquire);
656 pending = cachedHead - tail;
657 if (pending == 0)
658 {
659 lastError = make_error_code (Errc::TemporaryError);
660 return -1;
661 }
662 }
663
664 uint64_t toRead = std::min (static_cast<uint64_t> (size), pending);
665
666 for (uint64_t i = 0; i < toRead; ++i)
667 {
668 elements[i] = segment->_elements[(tail + i) & mask].data;
669 }
670
671 sync._tail.store (tail + toRead, std::memory_order_release);
672
673 return static_cast<ssize_t> (toRead);
674 }
675 };
676
680 template <typename Type, typename Backend>
681 struct Mpsc
682 {
685
695 static int tryPush (Segment* segment, const Type& element, uint64_t& /*cachedTail*/, uint64_t /*capacity*/,
696 uint64_t mask) noexcept
697 {
698 if (JOIN_UNLIKELY (segment == nullptr))
699 {
700 // LCOV_EXCL_START
701 lastError = make_error_code (Errc::InvalidParam);
702 return -1;
703 // LCOV_EXCL_STOP
704 }
705
706 auto& sync = segment->_sync;
707 uint64_t head = sync._head.load (std::memory_order_relaxed);
708 Backoff backoff;
709
710 for (;;)
711 {
712 auto* slot = &segment->_elements[head & mask];
713 uint64_t seq = slot->_seq.load (std::memory_order_acquire);
714
715 if (seq == head)
716 {
717 if (JOIN_LIKELY (sync._head.compare_exchange_weak (head, head + 1, std::memory_order_acquire,
718 std::memory_order_relaxed)))
719 {
720 slot->data = element;
721 slot->_seq.store (head + 1, std::memory_order_release);
722 return 0;
723 }
724 }
725 else if (JOIN_UNLIKELY (seq < head))
726 {
727 lastError = make_error_code (Errc::TemporaryError);
728 return -1;
729 }
730 else
731 {
732 backoff ();
733 head = sync._head.load (std::memory_order_relaxed);
734 }
735 }
736 }
737
748 static ssize_t tryPush (Segment* segment, const Type* elements, size_t size, uint64_t& /*cachedTail*/,
749 uint64_t capacity, uint64_t mask) noexcept
750 {
751 if (JOIN_UNLIKELY (segment == nullptr || elements == nullptr || size == 0))
752 {
753 lastError = make_error_code (Errc::InvalidParam);
754 return -1;
755 }
756
757 Backoff backoff;
758 auto& sync = segment->_sync;
759 uint64_t head = sync._head.load (std::memory_order_relaxed);
760
761 for (;;)
762 {
763 uint64_t tail = sync._tail.load (std::memory_order_acquire);
764 uint64_t toWrite = std::min (static_cast<uint64_t> (size), capacity - (head - tail));
765
766 if (JOIN_UNLIKELY (toWrite == 0))
767 {
768 lastError = make_error_code (Errc::TemporaryError);
769 return -1;
770 }
771
772 if (JOIN_LIKELY (sync._head.compare_exchange_weak (head, head + toWrite, std::memory_order_acquire,
773 std::memory_order_relaxed)))
774 {
775 for (uint64_t i = 0; i < toWrite; ++i)
776 {
777 auto* slot = &segment->_elements[(head + i) & mask];
778 slot->data = elements[i];
779 slot->_seq.store (head + i + 1, std::memory_order_release);
780 }
781
782 return static_cast<ssize_t> (toWrite);
783 }
784
785 backoff (); // LCOV_EXCL_LINE
786 }
787 }
788
798 static int tryPop (Segment* segment, Type& element, uint64_t& /*cachedHead*/, uint64_t capacity,
799 uint64_t mask) noexcept
800 {
801 if (JOIN_UNLIKELY (segment == nullptr))
802 {
803 // LCOV_EXCL_START
804 lastError = make_error_code (Errc::InvalidParam);
805 return -1;
806 // LCOV_EXCL_STOP
807 }
808
809 auto& sync = segment->_sync;
810 uint64_t tail = sync._tail.load (std::memory_order_relaxed);
811 auto* slot = &segment->_elements[tail & mask];
812 uint64_t seq = slot->_seq.load (std::memory_order_acquire);
813
814 if (JOIN_UNLIKELY (seq != tail + 1))
815 {
816 lastError = make_error_code (Errc::TemporaryError);
817 return -1;
818 }
819
820 element = slot->data;
821 slot->_seq.store (tail + capacity, std::memory_order_release);
822 sync._tail.store (tail + 1, std::memory_order_release);
823
824 return 0;
825 }
826
837 static ssize_t tryPop (Segment* segment, Type* elements, size_t size, uint64_t& /*cachedHead*/,
838 uint64_t capacity, uint64_t mask) noexcept
839 {
840 if (JOIN_UNLIKELY (segment == nullptr || elements == nullptr || size == 0))
841 {
842 lastError = make_error_code (Errc::InvalidParam);
843 return -1;
844 }
845
846 auto& sync = segment->_sync;
847 uint64_t tail = sync._tail.load (std::memory_order_relaxed);
848 uint64_t head = sync._head.load (std::memory_order_acquire);
849 uint64_t toRead = std::min (static_cast<uint64_t> (size), head - tail);
850 uint64_t popped = 0;
851
852 for (uint64_t i = 0; i < toRead; ++i)
853 {
854 auto* slot = &segment->_elements[(tail + i) & mask];
855
856 if (JOIN_UNLIKELY (slot->_seq.load (std::memory_order_acquire) != tail + i + 1))
857 {
858 break;
859 }
860
861 elements[i] = slot->data;
862 slot->_seq.store (tail + i + capacity, std::memory_order_release);
863 ++popped;
864 }
865
866 if (JOIN_LIKELY (popped > 0))
867 {
868 sync._tail.store (tail + popped, std::memory_order_release);
869 return static_cast<ssize_t> (popped);
870 }
871
872 lastError = make_error_code (Errc::TemporaryError);
873 return -1;
874 }
875 };
876
880 template <typename Type, typename Backend>
881 struct Mpmc
882 {
885
895 static int tryPush (Segment* segment, const Type& element, uint64_t& cachedTail, uint64_t capacity,
896 uint64_t mask) noexcept
897 {
898 return Mpsc<Type, Backend>::tryPush (segment, element, cachedTail, capacity, mask);
899 }
900
911 static ssize_t tryPush (Segment* segment, const Type* elements, size_t size, uint64_t& /*cachedTail*/,
912 uint64_t capacity, uint64_t mask) noexcept
913 {
914 if (JOIN_UNLIKELY (segment == nullptr || elements == nullptr || size == 0))
915 {
916 lastError = make_error_code (Errc::InvalidParam);
917 return -1;
918 }
919
920 Backoff backoff;
921 auto& sync = segment->_sync;
922 uint64_t head = sync._head.load (std::memory_order_relaxed);
923
924 for (;;)
925 {
926 uint64_t tail = sync._tail.load (std::memory_order_acquire);
927 uint64_t toWrite = std::min (static_cast<uint64_t> (size), capacity - (head - tail));
928
929 if (JOIN_UNLIKELY (toWrite == 0))
930 {
931 lastError = make_error_code (Errc::TemporaryError);
932 return -1;
933 }
934
935 if (JOIN_LIKELY (sync._head.compare_exchange_weak (head, head + toWrite, std::memory_order_acquire,
936 std::memory_order_relaxed)))
937 {
938 for (uint64_t i = 0; i < toWrite; ++i)
939 {
940 auto* slot = &segment->_elements[(head + i) & mask];
941 Backoff slotBackoff;
942 while (slot->_seq.load (std::memory_order_acquire) != head + i)
943 {
944 slotBackoff (); // LCOV_EXCL_LINE
945 }
946 slot->data = elements[i];
947 slot->_seq.store (head + i + 1, std::memory_order_release);
948 }
949
950 return static_cast<ssize_t> (toWrite);
951 }
952
953 backoff (); // LCOV_EXCL_LINE
954 }
955 }
956
966 static int tryPop (Segment* segment, Type& element, uint64_t& /*cachedHead*/, uint64_t capacity,
967 uint64_t mask) noexcept
968 {
969 if (JOIN_UNLIKELY (segment == nullptr))
970 {
971 // LCOV_EXCL_START
972 lastError = make_error_code (Errc::InvalidParam);
973 return -1;
974 // LCOV_EXCL_STOP
975 }
976
977 auto& sync = segment->_sync;
978 uint64_t tail = sync._tail.load (std::memory_order_relaxed);
979 Backoff backoff;
980
981 for (;;)
982 {
983 auto* slot = &segment->_elements[tail & mask];
984 uint64_t seq = slot->_seq.load (std::memory_order_acquire);
985
986 if (seq == (tail + 1))
987 {
988 Type local = slot->data;
989 if (JOIN_LIKELY (sync._tail.compare_exchange_weak (tail, tail + 1, std::memory_order_acquire,
990 std::memory_order_relaxed)))
991 {
992 element = local;
993 slot->_seq.store (tail + capacity, std::memory_order_release);
994 return 0;
995 }
996 }
997 else if (JOIN_UNLIKELY (seq < (tail + 1)))
998 {
999 lastError = make_error_code (Errc::TemporaryError);
1000 return -1;
1001 }
1002 else
1003 {
1004 backoff ();
1005 tail = sync._tail.load (std::memory_order_relaxed);
1006 }
1007 }
1008 }
1009
1020 static ssize_t tryPop (Segment* segment, Type* elements, size_t size, uint64_t& /*cachedHead*/,
1021 uint64_t capacity, uint64_t mask) noexcept
1022 {
1023 if (JOIN_UNLIKELY (segment == nullptr || elements == nullptr || size == 0))
1024 {
1025 lastError = make_error_code (Errc::InvalidParam);
1026 return -1;
1027 }
1028
1029 Backoff backoff;
1030 auto& sync = segment->_sync;
1031 uint64_t tail = sync._tail.load (std::memory_order_relaxed);
1032
1033 for (;;)
1034 {
1035 uint64_t head = sync._head.load (std::memory_order_acquire);
1036 uint64_t toRead = std::min (static_cast<uint64_t> (size), head - tail);
1037
1038 if (JOIN_UNLIKELY (toRead == 0))
1039 {
1040 lastError = make_error_code (Errc::TemporaryError);
1041 return -1;
1042 }
1043
1044 uint64_t ready = 0;
1045 for (; ready < toRead; ++ready)
1046 {
1047 auto* slot = &segment->_elements[(tail + ready) & mask];
1048 if (JOIN_UNLIKELY (slot->_seq.load (std::memory_order_acquire) != tail + ready + 1))
1049 {
1050 break;
1051 }
1052 elements[ready] = slot->data;
1053 }
1054
1055 if (JOIN_UNLIKELY (ready == 0))
1056 {
1057 lastError = make_error_code (Errc::TemporaryError);
1058 return -1;
1059 }
1060
1061 if (JOIN_LIKELY (sync._tail.compare_exchange_weak (tail, tail + ready, std::memory_order_acquire,
1062 std::memory_order_relaxed)))
1063 {
1064 for (uint64_t i = 0; i < ready; ++i)
1065 {
1066 segment->_elements[(tail + i) & mask]._seq.store (tail + i + capacity,
1067 std::memory_order_release);
1068 }
1069
1070 return static_cast<ssize_t> (ready);
1071 }
1072
1073 backoff (); // LCOV_EXCL_LINE
1074 }
1075 }
1076 };
1077
1081 template <typename Backend, template <typename, typename> class SyncPolicy>
1083 {
1085 template <typename Type>
1087 };
1088}
1089
1090#endif
adaptive backoff strategy for busy-wait loops.
Definition backoff.hpp:45
queue base class.
Definition queue.hpp:127
int push(const Type *elements, size_t size) noexcept
push multiple elements into the ring buffer.
Definition queue.hpp:253
BasicQueue(uint64_t capacity, Args &&... args)
create instance.
Definition queue.hpp:143
typename std::conditional< needs_seq< SyncPolicy >::value, QueueSlotFull< Type >, QueueSlotLight< Type > >::type Slot
Definition queue.hpp:133
int push(const Type &element) noexcept
push element into the ring buffer.
Definition queue.hpp:230
int mlock() const noexcept
lock memory in RAM.
Definition queue.hpp:424
ssize_t tryPop(Type *elements, size_t size) noexcept
try to pop multiple elements from the ring buffer.
Definition queue.hpp:295
int tryPop(Type &element) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:284
~BasicQueue() noexcept=default
destroy queue instance.
int pop(Type *elements, size_t size) noexcept
pop multiple elements from the ring buffer.
Definition queue.hpp:328
BasicQueue(BasicQueue &&other)=delete
move constructor.
int pop(Type &element) noexcept
pop element from the ring buffer.
Definition queue.hpp:305
bool full() const noexcept
check if the ring buffer is full.
Definition queue.hpp:386
ssize_t tryPush(const Type *elements, size_t size) noexcept
try to push multiple elements into the ring buffer.
Definition queue.hpp:220
bool empty() const noexcept
check if the ring buffer is empty.
Definition queue.hpp:399
uint64_t pending() const noexcept
get the number of pending elements for reading.
Definition queue.hpp:358
uint64_t available() const noexcept
get the number of available slots for writing.
Definition queue.hpp:373
Type ValueType
Definition queue.hpp:132
BasicQueue(const BasicQueue &other)=delete
copy constructor.
Definition acceptor.hpp:32
std::error_code make_error_code(join::Errc code) noexcept
Create an std::error_code object.
Definition error.cpp:150
Definition error.hpp:137
multiple producer multiple consumer ring buffer.
Definition queue.hpp:882
static int tryPush(Segment *segment, const Type &element, uint64_t &cachedTail, uint64_t capacity, uint64_t mask) noexcept
try to push element into the ring buffer.
Definition queue.hpp:895
static ssize_t tryPush(Segment *segment, const Type *elements, size_t size, uint64_t &, uint64_t capacity, uint64_t mask) noexcept
try to push multiple elements into the ring buffer.
Definition queue.hpp:911
static int tryPop(Segment *segment, Type &element, uint64_t &, uint64_t capacity, uint64_t mask) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:966
static ssize_t tryPop(Segment *segment, Type *elements, size_t size, uint64_t &, uint64_t capacity, uint64_t mask) noexcept
try to pop multiple elements from the ring buffer.
Definition queue.hpp:1020
multiple producer single consumer ring buffer.
Definition queue.hpp:682
static ssize_t tryPush(Segment *segment, const Type *elements, size_t size, uint64_t &, uint64_t capacity, uint64_t mask) noexcept
try to push multiple elements into the ring buffer.
Definition queue.hpp:748
static int tryPush(Segment *segment, const Type &element, uint64_t &, uint64_t, uint64_t mask) noexcept
try to push element into the ring buffer.
Definition queue.hpp:695
static int tryPop(Segment *segment, Type &element, uint64_t &, uint64_t capacity, uint64_t mask) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:798
static ssize_t tryPop(Segment *segment, Type *elements, size_t size, uint64_t &, uint64_t capacity, uint64_t mask) noexcept
try to pop multiple elements from the ring buffer.
Definition queue.hpp:837
queue memory segment.
Definition queue.hpp:93
Slot _elements[]
flexible array of queue slots.
Definition queue.hpp:98
QueueSync _sync
synchronization primitives.
Definition queue.hpp:95
full queue slot used by MPSC/MPMC.
Definition queue.hpp:77
Type data
stored element data.
Definition queue.hpp:82
std::atomic_uint64_t _seq
sequence number for synchronization.
Definition queue.hpp:79
char _padding[(64 -((sizeof(std::atomic_uint64_t)+sizeof(Type)) % 64)) % 64]
padding to prevent false sharing.
Definition queue.hpp:85
lightweight queue slot used by SPSC.
Definition queue.hpp:67
Type data
stored element data.
Definition queue.hpp:69
queue synchronization primitives.
Definition queue.hpp:48
std::atomic_uint64_t _head
write position.
Definition queue.hpp:56
static constexpr uint64_t MAGIC
magic number for initialization detection.
Definition queue.hpp:50
std::atomic_uint64_t _tail
read position.
Definition queue.hpp:59
std::atomic_uint64_t _magic
initialization state atomic.
Definition queue.hpp:53
single producer single consumer ring buffer.
Definition queue.hpp:502
static ssize_t tryPop(Segment *segment, Type *elements, size_t size, uint64_t &cachedHead, uint64_t, uint64_t mask) noexcept
try to pop multiple elements from the ring buffer.
Definition queue.hpp:640
static int tryPush(Segment *segment, const Type &element, uint64_t &cachedTail, uint64_t capacity, uint64_t mask) noexcept
try to push element into the ring buffer.
Definition queue.hpp:515
static int tryPop(Segment *segment, Type &element, uint64_t &cachedHead, uint64_t, uint64_t mask) noexcept
try to pop element from the ring buffer.
Definition queue.hpp:600
static ssize_t tryPush(Segment *segment, const Type *elements, size_t size, uint64_t &cachedTail, uint64_t capacity, uint64_t mask) noexcept
try to push multiple elements into the ring buffer.
Definition queue.hpp:555
queue forward declarations.
Definition queue.hpp:1083
primary trait: all sync policies need sequence numbers by default.
Definition queue.hpp:111
#define JOIN_LIKELY(x)
Definition utils.hpp:46
#define JOIN_UNLIKELY(x)
Definition utils.hpp:47