join 1.0
lightweight network framework library
Loading...
Searching...
No Matches
shared.hpp
Go to the documentation of this file.
1
25#ifndef __JOIN_SHARED_HPP__
26#define __JOIN_SHARED_HPP__
27
28// libjoin.
29#include <join/error.hpp>
30
31// C++.
32#include <utility>
33#include <thread>
34#include <atomic>
35#include <chrono>
36#include <memory>
37#include <string>
38
39// C.
40#include <sys/mman.h>
41#include <sys/stat.h>
42#include <unistd.h>
43#include <fcntl.h>
44#include <cstring>
45
46namespace join
47{
52 {
53 public:
59 SharedMemory (const std::string& name, uint64_t size)
60 : _name (name)
61 , _size (size)
62 {
63 if (_size > static_cast <uint64_t> (std::numeric_limits <off_t>::max ()))
64 {
65 throw std::overflow_error ("size will overflow");
66 }
67 }
68
72 virtual ~SharedMemory ()
73 {
74 this->close ();
75 }
76
81 virtual int open ()
82 {
83 if (this->opened ())
84 {
85 lastError = make_error_code (Errc::InUse);
86 return -1;
87 }
88
89 bool created = true;
90 this->_fd = ::shm_open (this->_name.c_str (), O_CREAT | O_RDWR | O_EXCL | O_CLOEXEC, 0644);
91 if ((this->_fd == -1) && (errno == EEXIST))
92 {
93 created = false;
94 this->_fd = ::shm_open(this->_name.c_str(), O_RDWR | O_CLOEXEC, 0644);
95 }
96
97 if (this->_fd == -1)
98 {
99 lastError = std::make_error_code (static_cast <std::errc> (errno));
100 this->close ();
101 return -1;
102 }
103
104 if (created && (::ftruncate (this->_fd, this->_size) == -1))
105 {
106 lastError = std::make_error_code (static_cast <std::errc> (errno));
107 this->close ();
108 return -1;
109 }
110
111 this->_ptr = ::mmap (nullptr, this->_size, PROT_READ | PROT_WRITE, MAP_SHARED, this->_fd, 0);
112 if (this->_ptr == MAP_FAILED)
113 {
114 lastError = std::make_error_code (static_cast <std::errc> (errno));
115 this->close ();
116 return -1;
117 }
118
119 return 0;
120 }
121
126 bool opened () const noexcept
127 {
128 return (this->_ptr != nullptr) && (this->_ptr != MAP_FAILED);
129 }
130
134 virtual void close () noexcept
135 {
136 if (this->opened ())
137 {
138 ::munmap (this->_ptr, this->_size);
139 }
140
141 if (this->_fd != -1)
142 {
143 ::close (this->_fd);
144 this->_fd = -1;
145 }
146
147 this->_ptr = nullptr;
148 }
149
156 virtual const void* get (uint64_t offset = 0) const
157 {
158 if (offset >= this->_size)
159 {
160 throw std::out_of_range ("offset out of bounds");
161 }
162
163 if (this->_ptr == nullptr)
164 {
165 return nullptr;
166 }
167
168 return static_cast <const char*> (this->_ptr) + offset;
169 }
170
177 virtual void* get (uint64_t offset = 0)
178 {
179 if (offset >= this->_size)
180 {
181 throw std::out_of_range ("offset out of bounds");
182 }
183
184 if (this->_ptr == nullptr)
185 {
186 return nullptr;
187 }
188
189 return static_cast <char*> (this->_ptr) + offset;
190 }
191
196 virtual uint64_t size () const noexcept
197 {
198 return this->_size;
199 }
200
206 static int unlink (const std::string& name) noexcept
207 {
208 if (::shm_unlink (name.c_str ()) == -1)
209 {
210 if (errno == ENOENT)
211 {
212 return 0;
213 }
214 lastError = std::make_error_code (static_cast <std::errc> (errno));
215 return -1;
216 }
217
218 return 0;
219 }
220
221 protected:
223 std::string _name;
224
226 uint64_t _size = 0;
227
229 void* _ptr = nullptr;
230
232 int _fd = -1;
233 };
234
239 {
240 static constexpr uint64_t MAGIC = 0x9F7E3B2A8D5C4E1B;
241 alignas (64) std::atomic_uint64_t _magic;
242 alignas (64) std::atomic_uint64_t _head;
243 alignas (64) std::atomic_uint64_t _tail;
244 alignas (64) uint64_t _elementSize;
245 alignas (64) uint64_t _capacity;
246 };
247
252 {
253 alignas (64) SharedSync _sync;
254 alignas (max_align_t) uint8_t _data[];
255 };
256
260 template <typename Policy>
262 {
263 public:
270 BasicQueue (const std::string& name, uint64_t elementSize = 1472, uint64_t capacity = 144)
271 : SharedMemory (name, (elementSize * capacity) + sizeof (SharedSync))
274 {
275 if ((_capacity != 0) && (_elementSize > (std::numeric_limits <uint64_t>::max () / _capacity)))
276 {
277 throw std::overflow_error ("size will overflow");
278 }
279
281 _totalSize = _userSize + sizeof (SharedSync);
282 }
283
288 BasicQueue (const BasicQueue& other) = delete;
289
295 BasicQueue& operator= (const BasicQueue& other) = delete;
296
300 virtual ~BasicQueue ()
301 {
302 this->close ();
303 }
304
309 int open () override
310 {
311 if (SharedMemory::open () != 0)
312 {
313 return -1;
314 }
315
316 this->_segment = static_cast <SharedSegment*> (this->_ptr);
317
318 uint64_t expected = 0;
319 if (this->_segment->_sync._magic.compare_exchange_strong (expected, 0xFFFFFFFFFFFFFFFF, std::memory_order_acquire, std::memory_order_acquire))
320 {
321 new (&this->_segment->_sync._head) std::atomic_uint64_t (0);
322 new (&this->_segment->_sync._tail) std::atomic_uint64_t (0);
324 this->_segment->_sync._capacity = this->_capacity;
325
326 this->_segment->_sync._magic.store (SharedSync::MAGIC, std::memory_order_release);
327 }
328 else
329 {
330 while (this->_segment->_sync._magic.load (std::memory_order_acquire) != SharedSync::MAGIC)
331 {
332 std::this_thread::yield ();
333 }
334 }
335
336 if ((this->_segment->_sync._elementSize != this->_elementSize) || (this->_segment->_sync._capacity != this->_capacity))
337 {
339 this->close ();
340 return -1;
341 }
342
343 return 0;
344 }
345
349 void close () noexcept override
350 {
351 this->_segment = nullptr;
353 }
354
361 const void* get (uint64_t offset = 0) const override
362 {
363 if (offset > this->_userSize)
364 {
365 throw std::out_of_range ("offset out of bounds");
366 }
367
368 return SharedMemory::get (sizeof (SharedSync) + offset);
369 }
370
377 void* get (uint64_t offset = 0) override
378 {
379 if (offset > this->_userSize)
380 {
381 throw std::out_of_range ("offset out of bounds");
382 }
383
384 return SharedMemory::get (sizeof (SharedSync) + offset);
385 }
386
391 uint64_t elementSize () const noexcept
392 {
393 return this->_elementSize;
394 }
395
400 uint64_t capacity () const noexcept
401 {
402 return this->_capacity;
403 }
404
409 uint64_t size () const noexcept override
410 {
411 return this->_userSize;
412 }
413
414 protected:
416 Policy _policy;
417
419 uint64_t _elementSize = 0;
420
422 uint64_t _capacity = 0;
423
425 uint64_t _totalSize = 0;
426
428 uint64_t _userSize = 0;
429
432 };
433
437 template <typename Policy>
438 class BasicProducer : public BasicQueue <Policy>
439 {
440 public:
447 BasicProducer (const std::string& name, uint64_t elementSize = 1472, uint64_t capacity = 144)
448 : BasicQueue <Policy> (name, elementSize, capacity)
449 {
450 }
451
456 BasicProducer (const BasicProducer& other) = delete;
457
463 BasicProducer& operator= (const BasicProducer& other) = delete;
464
468 ~BasicProducer () = default;
469
475 int tryPush (const void* element) noexcept
476 {
477 return this->_policy.tryPush (this->_segment, element);
478 }
479
485 int push (const void* element) noexcept
486 {
487 return this->_policy.push (this->_segment, element);
488 }
489
496 template <class Rep, class Period>
497 int timedPush (const void* element, std::chrono::duration <Rep, Period> timeout) noexcept
498 {
499 return this->_policy.timedPush (this->_segment, element, timeout);
500 }
501
506 uint64_t available () const noexcept
507 {
508 return this->_policy.available (this->_segment);
509 }
510
515 bool full () const noexcept
516 {
517 return this->_policy.full (this->_segment);
518 }
519 };
520
524 template <typename Policy>
525 class BasicConsumer : public BasicQueue <Policy>
526 {
527 public:
534 BasicConsumer (const std::string& name, uint64_t elementSize = 1472, uint64_t capacity = 144)
535 : BasicQueue <Policy> (name, elementSize, capacity)
536 {
537 }
538
543 BasicConsumer (const BasicConsumer& other) = delete;
544
550 BasicConsumer& operator= (const BasicConsumer& other) = delete;
551
555 ~BasicConsumer () = default;
556
562 int tryPop (void* element) noexcept
563 {
564 return this->_policy.tryPop (this->_segment, element);
565 }
566
572 int pop (void* element) noexcept
573 {
574 return this->_policy.pop (this->_segment, element);
575 }
576
583 template <class Rep, class Period>
584 int timedPop (void* element, std::chrono::duration <Rep, Period> timeout) noexcept
585 {
586 return this->_policy.timedPop (this->_segment, element, timeout);
587 }
588
593 uint64_t pending () const noexcept
594 {
595 return this->_policy.pending (this->_segment);
596 }
597
602 bool empty () const noexcept
603 {
604 return this->_policy.empty (this->_segment);
605 }
606 };
607
611 template <typename OutboundPolicy, typename InboundPolicy>
613 {
614 public:
615 using Outbound = typename OutboundPolicy::Producer;
616 using Inbound = typename InboundPolicy::Consumer;
617
621 enum Side
622 {
625 };
626
634 BasicEndpoint (Side side, const std::string& name, uint64_t elementSize = 1472, uint64_t capacity = 144)
635 : _side (side)
636 , _name (name)
637 {
638 if (this->_side == Side::A)
639 {
640 this->_out = std::make_unique <Outbound> (this->_name + "_AB", elementSize, capacity);
641 this->_in = std::make_unique <Inbound> (this->_name + "_BA", elementSize, capacity);
642 }
643 else
644 {
645 this->_out = std::make_unique <Outbound> (this->_name + "_BA", elementSize, capacity);
646 this->_in = std::make_unique <Inbound> (this->_name + "_AB", elementSize, capacity);
647 }
648 }
649
654 BasicEndpoint (const BasicEndpoint& other) = delete;
655
661 BasicEndpoint& operator= (const BasicEndpoint& other) = delete;
662
667 {
668 this->close();
669 }
670
675 int open ()
676 {
677 if (this->opened ())
678 {
679 lastError = make_error_code (Errc::InUse);
680 return -1;
681 }
682
683 if ((this->_out->open () == -1) || (this->_in->open () == -1))
684 {
685 this->close ();
686 return -1;
687 }
688
689 return 0;
690 }
691
695 void close ()
696 {
697 if (this->_out)
698 {
699 this->_out->close ();
700 }
701
702 if (this->_in)
703 {
704 this->_in->close ();
705 }
706 }
707
712 bool opened () const
713 {
714 return this->_out && this->_out->opened () && this->_in && this->_in->opened ();
715 }
716
722 int trySend (const void* element)
723 {
724 return this->_out->tryPush (element);
725 }
726
732 int send (const void* element)
733 {
734 return this->_out->push (element);
735 }
736
743 template <class Rep, class Period>
744 int timedSend (const void* element, std::chrono::duration <Rep, Period> timeout)
745 {
746 return this->_out->timedPush (element, timeout);
747 }
748
754 int tryReceive (void* element)
755 {
756 return this->_in->tryPop (element);
757 }
758
764 int receive (void* element)
765 {
766 return this->_in->pop (element);
767 }
768
775 template <class Rep, class Period>
776 int timedReceive (void* element, std::chrono::duration <Rep, Period> timeout)
777 {
778 return this->_in->timedPop (element, timeout);
779 }
780
785 uint64_t available () const
786 {
787 return this->_out->available ();
788 }
789
794 bool full () const
795 {
796 return this->_out->full ();
797 }
798
803 uint64_t pending () const
804 {
805 return this->_in->pending ();
806 }
807
812 bool empty () const
813 {
814 return this->_in->empty ();
815 }
816
821 Side side () const
822 {
823 return this->_side;
824 }
825
830 const std::string& name () const
831 {
832 return this->_name;
833 }
834
839 uint64_t elementSize () const
840 {
841 return this->_in->elementSize ();
842 }
843
848 uint64_t capacity () const
849 {
850 return this->_in->capacity ();
851 }
852
853 private:
855 Side _side;
856
858 std::string _name;
859
861 std::unique_ptr <Outbound> _out;
862
864 std::unique_ptr <Inbound> _in;
865 };
866
870 class Spsc
871 {
872 public:
873 using Producer = BasicProducer <Spsc>;
874 using Consumer = BasicConsumer <Spsc>;
875 using Endpoint = BasicEndpoint <Spsc, Spsc>;
876
880 constexpr Spsc () noexcept = default;
881
888 virtual int tryPush (SharedSegment* segment, const void* element) const noexcept
889 {
890 if ((segment == nullptr) || (element == nullptr))
891 {
893 return -1;
894 }
895 uint64_t tail = segment->_sync._tail.load (std::memory_order_acquire);
896 uint64_t head = segment->_sync._head.load (std::memory_order_relaxed);
897 if ((head - tail) == segment->_sync._capacity)
898 {
899 lastError = make_error_code (Errc::TemporaryError);
900 return -1;
901 }
902 auto next = head % segment->_sync._capacity;
903 std::memcpy (segment->_data + (next * segment->_sync._elementSize), element, segment->_sync._elementSize);
904 segment->_sync._head.store (head + 1, std::memory_order_release);
905 return 0;
906 }
907
914 int push (SharedSegment* segment, const void* element) const noexcept
915 {
916 while (tryPush (segment, element) == -1)
917 {
918 if (lastError != Errc::TemporaryError)
919 {
920 return -1;
921 }
922 std::this_thread::yield ();
923 }
924 return 0;
925 }
926
934 template <class Rep, class Period>
935 int timedPush (SharedSegment* segment, const void* element, std::chrono::duration <Rep, Period> timeout) const noexcept
936 {
937 auto const deadline = std::chrono::steady_clock::now () + timeout;
938 while (tryPush (segment, element) == -1)
939 {
940 if (lastError != Errc::TemporaryError)
941 {
942 return -1;
943 }
944 if (std::chrono::steady_clock::now () >= deadline)
945 {
946 lastError = make_error_code (Errc::TimedOut);
947 return -1;
948 }
949 std::this_thread::yield ();
950 }
951 return 0;
952 }
953
960 virtual int tryPop (SharedSegment* segment, void* element) const noexcept
961 {
962 if ((segment == nullptr) || (element == nullptr))
963 {
965 return -1;
966 }
967 uint64_t head = segment->_sync._head.load (std::memory_order_acquire);
968 uint64_t tail = segment->_sync._tail.load (std::memory_order_relaxed);
969 if ((head - tail) == 0)
970 {
972 return -1;
973 }
974 auto next = tail % segment->_sync._capacity;
975 std::memcpy (element, segment->_data + (next * segment->_sync._elementSize), segment->_sync._elementSize);
976 segment->_sync._tail.store (tail + 1, std::memory_order_release);
977 return 0;
978 }
979
986 int pop (SharedSegment* segment, void* element) const noexcept
987 {
988 while (tryPop (segment, element) == -1)
989 {
990 if (lastError != Errc::TemporaryError)
991 {
992 return -1;
993 }
994 std::this_thread::yield ();
995 }
996 return 0;
997 }
998
1006 template <class Rep, class Period>
1007 int timedPop (SharedSegment* segment, void* element, std::chrono::duration <Rep, Period> timeout) const noexcept
1008 {
1009 auto const deadline = std::chrono::steady_clock::now () + timeout;
1010 while (tryPop (segment, element) == -1)
1011 {
1012 if (lastError != Errc::TemporaryError)
1013 {
1014 return -1;
1015 }
1016 if (std::chrono::steady_clock::now () >= deadline)
1017 {
1018 lastError = make_error_code (Errc::TimedOut);
1019 return -1;
1020 }
1021 std::this_thread::yield ();
1022 }
1023 return 0;
1024 }
1025
1031 uint64_t pending (SharedSegment* segment) const noexcept
1032 {
1033 auto head = segment->_sync._head.load (std::memory_order_acquire);
1034 auto tail = segment->_sync._tail.load (std::memory_order_acquire);
1035 return head - tail;
1036 }
1037
1043 uint64_t available (SharedSegment* segment) const noexcept
1044 {
1045 return segment->_sync._capacity - pending (segment);
1046 }
1047
1053 bool full (SharedSegment* segment) const noexcept
1054 {
1055 return pending (segment) == segment->_sync._capacity;
1056 }
1057
1063 bool empty (SharedSegment* segment) const noexcept
1064 {
1065 return pending (segment) == 0;
1066 }
1067 };
1068
1072 class Mpsc : public Spsc
1073 {
1074 public:
1075 using Producer = BasicProducer <Mpsc>;
1076 using Consumer = BasicConsumer <Mpsc>;
1077 using Endpoint = BasicEndpoint <Mpsc, Mpsc>;
1078
1082 constexpr Mpsc () noexcept = default;
1083
1090 int tryPush (SharedSegment* segment, const void* element) const noexcept override
1091 {
1092 if ((segment == nullptr) || (element == nullptr))
1093 {
1094 lastError = make_error_code (Errc::InvalidParam);
1095 return -1;
1096 }
1097 uint64_t head, tail;
1098 do
1099 {
1100 tail = segment->_sync._tail.load (std::memory_order_acquire);
1101 head = segment->_sync._head.load (std::memory_order_relaxed);
1102 if ((head - tail) == segment->_sync._capacity)
1103 {
1105 return -1;
1106 }
1107 }
1108 while (!segment->_sync._head.compare_exchange_weak (head, head + 1, std::memory_order_acquire, std::memory_order_relaxed));
1109 auto slot = head % segment->_sync._capacity;
1110 std::memcpy (segment->_data + (slot * segment->_sync._elementSize), element, segment->_sync._elementSize);
1111 std::atomic_thread_fence (std::memory_order_release);
1112 return 0;
1113 }
1114 };
1115
1119 class Mpmc : public Mpsc
1120 {
1121 public:
1122 using Producer = BasicProducer <Mpmc>;
1123 using Consumer = BasicConsumer <Mpmc>;
1124 using Endpoint = BasicEndpoint <Mpmc, Mpmc>;
1125
1129 constexpr Mpmc () noexcept = default;
1130
1137 int tryPop (SharedSegment* segment, void* element) const noexcept override
1138 {
1139 if ((segment == nullptr) || (element == nullptr))
1140 {
1141 lastError = make_error_code (Errc::InvalidParam);
1142 return -1;
1143 }
1144 uint64_t head, tail;
1145 do
1146 {
1147 head = segment->_sync._head.load (std::memory_order_acquire);
1148 tail = segment->_sync._tail.load (std::memory_order_relaxed);
1149 if ((head - tail) == 0)
1150 {
1152 return -1;
1153 }
1154 }
1155 while (!segment->_sync._tail.compare_exchange_weak (tail, tail + 1, std::memory_order_acquire, std::memory_order_relaxed));
1156 auto slot = tail % segment->_sync._capacity;
1157 std::memcpy (element, segment->_data + (slot * segment->_sync._elementSize), segment->_sync._elementSize);
1158 std::atomic_thread_fence (std::memory_order_release);
1159 return 0;
1160 }
1161 };
1162}
1163
1164#endif
shared memory consumer.
Definition shared.hpp:526
~BasicConsumer()=default
destroy the instance.
BasicConsumer(const std::string &name, uint64_t elementSize=1472, uint64_t capacity=144)
create instance.
Definition shared.hpp:534
int tryPop(void *element) noexcept
try to pop element from the ring buffer.
Definition shared.hpp:562
BasicConsumer & operator=(const BasicConsumer &other)=delete
copy assignment operator.
int timedPop(void *element, std::chrono::duration< Rep, Period > timeout) noexcept
try to pop element from the ring buffer until timeout expire.
Definition shared.hpp:584
int pop(void *element) noexcept
pop element from the ring buffer.
Definition shared.hpp:572
bool empty() const noexcept
check if the ring buffer is empty.
Definition shared.hpp:602
uint64_t pending() const noexcept
get the number of pending elements for reading.
Definition shared.hpp:593
BasicConsumer(const BasicConsumer &other)=delete
copy constructor.
basic endpoint class.
Definition shared.hpp:613
typename InboundPolicy::Consumer Inbound
Definition shared.hpp:616
void close()
close the channel endpoint.
Definition shared.hpp:695
typename OutboundPolicy::Producer Outbound
Definition shared.hpp:615
BasicEndpoint(const BasicEndpoint &other)=delete
copy constructor.
uint64_t capacity() const
get the buffer capacity.
Definition shared.hpp:848
BasicEndpoint(Side side, const std::string &name, uint64_t elementSize=1472, uint64_t capacity=144)
create instance.
Definition shared.hpp:634
Side side() const
get the side this endpoint represents.
Definition shared.hpp:821
bool opened() const
check if the endpoint is open.
Definition shared.hpp:712
bool empty() const
check if inbound queue is empty.
Definition shared.hpp:812
int send(const void *element)
send a message to the peer (blocking).
Definition shared.hpp:732
bool full() const
check if outbound queue is full.
Definition shared.hpp:794
int timedSend(const void *element, std::chrono::duration< Rep, Period > timeout)
send a message to the peer with timeout (blocking).
Definition shared.hpp:744
int timedReceive(void *element, std::chrono::duration< Rep, Period > timeout)
receive a message from the peer with timeout (blocking).
Definition shared.hpp:776
int trySend(const void *element)
try to send a message to the peer (non blocking).
Definition shared.hpp:722
int receive(void *element)
receive a message from the peer (blocking).
Definition shared.hpp:764
int open()
open the channel endpoint.
Definition shared.hpp:675
~BasicEndpoint()
destroy the instance.
Definition shared.hpp:666
const std::string & name() const
get the channel name.
Definition shared.hpp:830
uint64_t available() const
get number of available slots for sending.
Definition shared.hpp:785
Side
endpoint side identifier.
Definition shared.hpp:622
@ B
Definition shared.hpp:624
@ A
Definition shared.hpp:623
BasicEndpoint & operator=(const BasicEndpoint &other)=delete
copy assignment operator.
int tryReceive(void *element)
try to receive a message from the peer (non blocking).
Definition shared.hpp:754
uint64_t elementSize() const
get the element size.
Definition shared.hpp:839
uint64_t pending() const
get number of pending messages.
Definition shared.hpp:803
shared memory producer.
Definition shared.hpp:439
~BasicProducer()=default
destroy the instance.
int push(const void *element) noexcept
push element into the ring buffer.
Definition shared.hpp:485
BasicProducer(const BasicProducer &other)=delete
copy constructor.
BasicProducer(const std::string &name, uint64_t elementSize=1472, uint64_t capacity=144)
create instance.
Definition shared.hpp:447
bool full() const noexcept
check if the ring buffer is full.
Definition shared.hpp:515
int timedPush(const void *element, std::chrono::duration< Rep, Period > timeout) noexcept
try to push element into the ring buffer until timeout expire.
Definition shared.hpp:497
uint64_t available() const noexcept
get the number of available slots for writing.
Definition shared.hpp:506
int tryPush(const void *element) noexcept
try to push element into ring buffer.
Definition shared.hpp:475
BasicProducer & operator=(const BasicProducer &other)=delete
copy assignment operator.
shared memory base class.
Definition shared.hpp:262
BasicQueue(const std::string &name, uint64_t elementSize=1472, uint64_t capacity=144)
create instance.
Definition shared.hpp:270
uint64_t _elementSize
shared memory segment element size.
Definition shared.hpp:419
Policy _policy
shared memory segment policy.
Definition shared.hpp:416
const void * get(uint64_t offset=0) const override
get a const pointer to the shared memory data region at a given offset.
Definition shared.hpp:361
uint64_t _userSize
user shared memory size.
Definition shared.hpp:428
uint64_t size() const noexcept override
get the size of the shared memory region.
Definition shared.hpp:409
BasicQueue & operator=(const BasicQueue &other)=delete
copy assignment operator.
SharedSegment * _segment
shared memory segment.
Definition shared.hpp:431
void * get(uint64_t offset=0) override
get a pointer to the shared memory data region at a given offset.
Definition shared.hpp:377
int open() override
open or create the shared memory segment.
Definition shared.hpp:309
virtual ~BasicQueue()
destroy the instance.
Definition shared.hpp:300
uint64_t _totalSize
total shared memory size.
Definition shared.hpp:425
BasicQueue(const BasicQueue &other)=delete
copy constructor.
uint64_t _capacity
shared memory segment capacity.
Definition shared.hpp:422
uint64_t elementSize() const noexcept
get the element size of the shared memory region.
Definition shared.hpp:391
void close() noexcept override
close the shared memory segment.
Definition shared.hpp:349
uint64_t capacity() const noexcept
get the capacity of the shared memory region.
Definition shared.hpp:400
multiple producer multiple consumer ring buffer policy.
Definition shared.hpp:1120
int tryPop(SharedSegment *segment, void *element) const noexcept override
try to pop element from the ring buffer (lock-free for multiple consumers).
Definition shared.hpp:1137
constexpr Mpmc() noexcept=default
construct the multiple producer multiple consumer ring buffer policy by default.
multiple producer single consumer ring buffer policy.
Definition shared.hpp:1073
int tryPush(SharedSegment *segment, const void *element) const noexcept override
try to push element into the ring buffer (lock-free for multiple producers).
Definition shared.hpp:1090
constexpr Mpsc() noexcept=default
construct the multiple producer single consumer ring buffer policy by default.
shared memory class.
Definition shared.hpp:52
int _fd
shared memory file descriptor.
Definition shared.hpp:232
virtual const void * get(uint64_t offset=0) const
get a const pointer to the shared memory at a given offset.
Definition shared.hpp:156
virtual int open()
open or create the shared memory.
Definition shared.hpp:81
uint64_t _size
shared memory size.
Definition shared.hpp:226
SharedMemory(const std::string &name, uint64_t size)
create instance.
Definition shared.hpp:59
void * _ptr
pointer to mapped shared memory.
Definition shared.hpp:229
virtual void close() noexcept
close the shared memory.
Definition shared.hpp:134
std::string _name
shared memory name.
Definition shared.hpp:223
virtual void * get(uint64_t offset=0)
get a pointer to the shared memory at a given offset.
Definition shared.hpp:177
virtual uint64_t size() const noexcept
get the size of the shared memory.
Definition shared.hpp:196
virtual ~SharedMemory()
destroy the instance.
Definition shared.hpp:72
bool opened() const noexcept
check if shared memory is opened.
Definition shared.hpp:126
static int unlink(const std::string &name) noexcept
destroy synchronization primitives and unlink the shared memory segment.
Definition shared.hpp:206
single producer single consumer ring buffer policy.
Definition shared.hpp:871
virtual int tryPop(SharedSegment *segment, void *element) const noexcept
try to pop element from the ring buffer.
Definition shared.hpp:960
virtual int tryPush(SharedSegment *segment, const void *element) const noexcept
try to push element into the ring buffer.
Definition shared.hpp:888
constexpr Spsc() noexcept=default
construct the single producer single consumer ring buffer policy by default.
int pop(SharedSegment *segment, void *element) const noexcept
pop element from the ring buffer.
Definition shared.hpp:986
uint64_t pending(SharedSegment *segment) const noexcept
get the number of pending elements for reading.
Definition shared.hpp:1031
bool full(SharedSegment *segment) const noexcept
check if the ring buffer is full.
Definition shared.hpp:1053
int push(SharedSegment *segment, const void *element) const noexcept
push element into the ring buffer.
Definition shared.hpp:914
int timedPush(SharedSegment *segment, const void *element, std::chrono::duration< Rep, Period > timeout) const noexcept
try to push element into the ring buffer until timeout expire.
Definition shared.hpp:935
bool empty(SharedSegment *segment) const noexcept
check if the ring buffer is empty.
Definition shared.hpp:1063
int timedPop(SharedSegment *segment, void *element, std::chrono::duration< Rep, Period > timeout) const noexcept
try to pop element from the ring buffer until timeout expire.
Definition shared.hpp:1007
uint64_t available(SharedSegment *segment) const noexcept
get the number of available slots for writing.
Definition shared.hpp:1043
Definition acceptor.hpp:32
std::error_code make_error_code(join::Errc code)
Create an std::error_code object.
Definition error.cpp:154
shared memory segment.
Definition shared.hpp:252
uint8_t _data[]
Definition shared.hpp:254
SharedSync _sync
Definition shared.hpp:253
synchronization primitives.
Definition shared.hpp:239
uint64_t _capacity
Definition shared.hpp:245
std::atomic_uint64_t _magic
Definition shared.hpp:241
static constexpr uint64_t MAGIC
Definition shared.hpp:240
uint64_t _elementSize
Definition shared.hpp:244
std::atomic_uint64_t _tail
Definition shared.hpp:243
std::atomic_uint64_t _head
Definition shared.hpp:242