25#ifndef __JOIN_SHARED_HPP__
26#define __JOIN_SHARED_HPP__
63 if (
_size >
static_cast <uint64_t
> (std::numeric_limits <off_t>::max ()))
65 throw std::overflow_error (
"size will overflow");
90 this->
_fd = ::shm_open (this->
_name.c_str (), O_CREAT | O_RDWR | O_EXCL | O_CLOEXEC, 0644);
91 if ((this->
_fd == -1) && (errno == EEXIST))
94 this->
_fd = ::shm_open(this->
_name.c_str(), O_RDWR | O_CLOEXEC, 0644);
99 lastError = std::make_error_code (
static_cast <std::errc
> (errno));
104 if (created && (::ftruncate (this->
_fd, this->
_size) == -1))
106 lastError = std::make_error_code (
static_cast <std::errc
> (errno));
111 this->
_ptr = ::mmap (
nullptr, this->
_size, PROT_READ | PROT_WRITE, MAP_SHARED, this->
_fd, 0);
112 if (this->
_ptr == MAP_FAILED)
114 lastError = std::make_error_code (
static_cast <std::errc
> (errno));
128 return (this->
_ptr !=
nullptr) && (this->
_ptr != MAP_FAILED);
147 this->
_ptr =
nullptr;
156 virtual const void*
get (uint64_t offset = 0)
const
158 if (offset >= this->
_size)
160 throw std::out_of_range (
"offset out of bounds");
163 if (this->
_ptr ==
nullptr)
168 return static_cast <const char*
> (this->
_ptr) + offset;
177 virtual void*
get (uint64_t offset = 0)
179 if (offset >= this->
_size)
181 throw std::out_of_range (
"offset out of bounds");
184 if (this->
_ptr ==
nullptr)
189 return static_cast <char*
> (this->
_ptr) + offset;
196 virtual uint64_t
size () const noexcept
206 static int unlink (
const std::string& name)
noexcept
208 if (::shm_unlink (name.c_str ()) == -1)
214 lastError = std::make_error_code (
static_cast <std::errc
> (errno));
240 static constexpr uint64_t
MAGIC = 0x9F7E3B2A8D5C4E1B;
241 alignas (64) std::atomic_uint64_t
_magic;
242 alignas (64) std::atomic_uint64_t
_head;
243 alignas (64) std::atomic_uint64_t
_tail;
254 alignas (max_align_t) uint8_t
_data[];
260 template <
typename Policy>
277 throw std::overflow_error (
"size will overflow");
318 uint64_t expected = 0;
319 if (this->
_segment->
_sync.
_magic.compare_exchange_strong (expected, 0xFFFFFFFFFFFFFFFF, std::memory_order_acquire, std::memory_order_acquire))
332 std::this_thread::yield ();
361 const void*
get (uint64_t offset = 0)
const override
365 throw std::out_of_range (
"offset out of bounds");
377 void*
get (uint64_t offset = 0)
override
381 throw std::out_of_range (
"offset out of bounds");
409 uint64_t
size () const noexcept
override
437 template <
typename Policy>
485 int push (
const void* element)
noexcept
496 template <
class Rep,
class Period>
497 int timedPush (
const void* element, std::chrono::duration <Rep, Period> timeout)
noexcept
524 template <
typename Policy>
572 int pop (
void* element)
noexcept
583 template <
class Rep,
class Period>
584 int timedPop (
void* element, std::chrono::duration <Rep, Period> timeout)
noexcept
611 template <
typename OutboundPolicy,
typename InboundPolicy>
615 using Outbound =
typename OutboundPolicy::Producer;
616 using Inbound =
typename InboundPolicy::Consumer;
683 if ((this->_out->open () == -1) || (this->_in->open () == -1))
699 this->_out->close ();
714 return this->_out && this->_out->opened () && this->_in && this->_in->opened ();
724 return this->_out->tryPush (element);
734 return this->_out->push (element);
743 template <
class Rep,
class Period>
744 int timedSend (
const void* element, std::chrono::duration <Rep, Period> timeout)
746 return this->_out->timedPush (element, timeout);
756 return this->_in->tryPop (element);
766 return this->_in->pop (element);
775 template <
class Rep,
class Period>
776 int timedReceive (
void* element, std::chrono::duration <Rep, Period> timeout)
778 return this->_in->timedPop (element, timeout);
787 return this->_out->available ();
796 return this->_out->full ();
805 return this->_in->pending ();
814 return this->_in->empty ();
830 const std::string&
name ()
const
841 return this->_in->elementSize ();
850 return this->_in->capacity ();
861 std::unique_ptr <Outbound> _out;
864 std::unique_ptr <Inbound> _in;
880 constexpr Spsc () noexcept = default;
890 if ((segment ==
nullptr) || (element ==
nullptr))
895 uint64_t tail = segment->_sync._tail.load (std::memory_order_acquire);
896 uint64_t head = segment->_sync._head.load (std::memory_order_relaxed);
897 if ((head - tail) == segment->_sync._capacity)
899 lastError = make_error_code (Errc::TemporaryError);
902 auto next = head % segment->_sync._capacity;
903 std::memcpy (segment->_data + (next * segment->_sync._elementSize), element, segment->_sync._elementSize);
904 segment->_sync._head.store (head + 1, std::memory_order_release);
916 while (
tryPush (segment, element) == -1)
922 std::this_thread::yield ();
934 template <
class Rep,
class Period>
937 auto const deadline = std::chrono::steady_clock::now () + timeout;
938 while (
tryPush (segment, element) == -1)
944 if (std::chrono::steady_clock::now () >= deadline)
949 std::this_thread::yield ();
962 if ((segment ==
nullptr) || (element ==
nullptr))
967 uint64_t head = segment->_sync._head.load (std::memory_order_acquire);
968 uint64_t tail = segment->_sync._tail.load (std::memory_order_relaxed);
969 if ((head - tail) == 0)
974 auto next = tail % segment->_sync._capacity;
975 std::memcpy (element, segment->_data + (next * segment->_sync._elementSize), segment->_sync._elementSize);
976 segment->_sync._tail.store (tail + 1, std::memory_order_release);
988 while (
tryPop (segment, element) == -1)
994 std::this_thread::yield ();
1006 template <
class Rep,
class Period>
1009 auto const deadline = std::chrono::steady_clock::now () + timeout;
1010 while (
tryPop (segment, element) == -1)
1016 if (std::chrono::steady_clock::now () >= deadline)
1021 std::this_thread::yield ();
1033 auto head = segment->_sync._head.load (std::memory_order_acquire);
1034 auto tail = segment->_sync._tail.load (std::memory_order_acquire);
1045 return segment->_sync._capacity -
pending (segment);
1055 return pending (segment) == segment->_sync._capacity;
1065 return pending (segment) == 0;
1082 constexpr Mpsc () noexcept = default;
1092 if ((segment ==
nullptr) || (element ==
nullptr))
1097 uint64_t head, tail;
1100 tail = segment->_sync._tail.load (std::memory_order_acquire);
1101 head = segment->_sync._head.load (std::memory_order_relaxed);
1102 if ((head - tail) == segment->_sync._capacity)
1108 while (!segment->_sync._head.compare_exchange_weak (head, head + 1, std::memory_order_acquire, std::memory_order_relaxed));
1109 auto slot = head % segment->_sync._capacity;
1110 std::memcpy (segment->_data + (slot * segment->_sync._elementSize), element, segment->_sync._elementSize);
1111 std::atomic_thread_fence (std::memory_order_release);
1129 constexpr Mpmc () noexcept = default;
1139 if ((segment ==
nullptr) || (element ==
nullptr))
1144 uint64_t head, tail;
1147 head = segment->_sync._head.load (std::memory_order_acquire);
1148 tail = segment->_sync._tail.load (std::memory_order_relaxed);
1149 if ((head - tail) == 0)
1155 while (!segment->_sync._tail.compare_exchange_weak (tail, tail + 1, std::memory_order_acquire, std::memory_order_relaxed));
1156 auto slot = tail % segment->_sync._capacity;
1157 std::memcpy (element, segment->_data + (slot * segment->_sync._elementSize), segment->_sync._elementSize);
1158 std::atomic_thread_fence (std::memory_order_release);
shared memory consumer.
Definition shared.hpp:526
~BasicConsumer()=default
destroy the instance.
BasicConsumer(const std::string &name, uint64_t elementSize=1472, uint64_t capacity=144)
create instance.
Definition shared.hpp:534
int tryPop(void *element) noexcept
try to pop element from the ring buffer.
Definition shared.hpp:562
BasicConsumer & operator=(const BasicConsumer &other)=delete
copy assignment operator.
int timedPop(void *element, std::chrono::duration< Rep, Period > timeout) noexcept
try to pop element from the ring buffer until the timeout expires.
Definition shared.hpp:584
int pop(void *element) noexcept
pop element from the ring buffer.
Definition shared.hpp:572
bool empty() const noexcept
check if the ring buffer is empty.
Definition shared.hpp:602
uint64_t pending() const noexcept
get the number of pending elements for reading.
Definition shared.hpp:593
BasicConsumer(const BasicConsumer &other)=delete
copy constructor.
basic endpoint class.
Definition shared.hpp:613
typename InboundPolicy::Consumer Inbound
Definition shared.hpp:616
void close()
close the channel endpoint.
Definition shared.hpp:695
typename OutboundPolicy::Producer Outbound
Definition shared.hpp:615
BasicEndpoint(const BasicEndpoint &other)=delete
copy constructor.
uint64_t capacity() const
get the buffer capacity.
Definition shared.hpp:848
BasicEndpoint(Side side, const std::string &name, uint64_t elementSize=1472, uint64_t capacity=144)
create instance.
Definition shared.hpp:634
Side side() const
get the side this endpoint represents.
Definition shared.hpp:821
bool opened() const
check if the endpoint is open.
Definition shared.hpp:712
bool empty() const
check if inbound queue is empty.
Definition shared.hpp:812
int send(const void *element)
send a message to the peer (blocking).
Definition shared.hpp:732
bool full() const
check if outbound queue is full.
Definition shared.hpp:794
int timedSend(const void *element, std::chrono::duration< Rep, Period > timeout)
send a message to the peer with timeout (blocking).
Definition shared.hpp:744
int timedReceive(void *element, std::chrono::duration< Rep, Period > timeout)
receive a message from the peer with timeout (blocking).
Definition shared.hpp:776
int trySend(const void *element)
try to send a message to the peer (non blocking).
Definition shared.hpp:722
int receive(void *element)
receive a message from the peer (blocking).
Definition shared.hpp:764
int open()
open the channel endpoint.
Definition shared.hpp:675
~BasicEndpoint()
destroy the instance.
Definition shared.hpp:666
const std::string & name() const
get the channel name.
Definition shared.hpp:830
uint64_t available() const
get number of available slots for sending.
Definition shared.hpp:785
Side
endpoint side identifier.
Definition shared.hpp:622
@ B
Definition shared.hpp:624
@ A
Definition shared.hpp:623
BasicEndpoint & operator=(const BasicEndpoint &other)=delete
copy assignment operator.
int tryReceive(void *element)
try to receive a message from the peer (non blocking).
Definition shared.hpp:754
uint64_t elementSize() const
get the element size.
Definition shared.hpp:839
uint64_t pending() const
get number of pending messages.
Definition shared.hpp:803
shared memory producer.
Definition shared.hpp:439
~BasicProducer()=default
destroy the instance.
int push(const void *element) noexcept
push element into the ring buffer.
Definition shared.hpp:485
BasicProducer(const BasicProducer &other)=delete
copy constructor.
BasicProducer(const std::string &name, uint64_t elementSize=1472, uint64_t capacity=144)
create instance.
Definition shared.hpp:447
bool full() const noexcept
check if the ring buffer is full.
Definition shared.hpp:515
int timedPush(const void *element, std::chrono::duration< Rep, Period > timeout) noexcept
try to push element into the ring buffer until the timeout expires.
Definition shared.hpp:497
uint64_t available() const noexcept
get the number of available slots for writing.
Definition shared.hpp:506
int tryPush(const void *element) noexcept
try to push element into the ring buffer.
Definition shared.hpp:475
BasicProducer & operator=(const BasicProducer &other)=delete
copy assignment operator.
shared memory base class.
Definition shared.hpp:262
BasicQueue(const std::string &name, uint64_t elementSize=1472, uint64_t capacity=144)
create instance.
Definition shared.hpp:270
uint64_t _elementSize
shared memory segment element size.
Definition shared.hpp:419
Policy _policy
shared memory segment policy.
Definition shared.hpp:416
const void * get(uint64_t offset=0) const override
get a const pointer to the shared memory data region at a given offset.
Definition shared.hpp:361
uint64_t _userSize
user shared memory size.
Definition shared.hpp:428
uint64_t size() const noexcept override
get the size of the shared memory region.
Definition shared.hpp:409
BasicQueue & operator=(const BasicQueue &other)=delete
copy assignment operator.
SharedSegment * _segment
shared memory segment.
Definition shared.hpp:431
void * get(uint64_t offset=0) override
get a pointer to the shared memory data region at a given offset.
Definition shared.hpp:377
int open() override
open or create the shared memory segment.
Definition shared.hpp:309
virtual ~BasicQueue()
destroy the instance.
Definition shared.hpp:300
uint64_t _totalSize
total shared memory size.
Definition shared.hpp:425
BasicQueue(const BasicQueue &other)=delete
copy constructor.
uint64_t _capacity
shared memory segment capacity.
Definition shared.hpp:422
uint64_t elementSize() const noexcept
get the element size of the shared memory region.
Definition shared.hpp:391
void close() noexcept override
close the shared memory segment.
Definition shared.hpp:349
uint64_t capacity() const noexcept
get the capacity of the shared memory region.
Definition shared.hpp:400
multiple producer multiple consumer ring buffer policy.
Definition shared.hpp:1120
int tryPop(SharedSegment *segment, void *element) const noexcept override
try to pop element from the ring buffer (lock-free for multiple consumers).
Definition shared.hpp:1137
constexpr Mpmc() noexcept=default
construct the multiple producer multiple consumer ring buffer policy by default.
multiple producer single consumer ring buffer policy.
Definition shared.hpp:1073
int tryPush(SharedSegment *segment, const void *element) const noexcept override
try to push element into the ring buffer (lock-free for multiple producers).
Definition shared.hpp:1090
constexpr Mpsc() noexcept=default
construct the multiple producer single consumer ring buffer policy by default.
shared memory class.
Definition shared.hpp:52
int _fd
shared memory file descriptor.
Definition shared.hpp:232
virtual const void * get(uint64_t offset=0) const
get a const pointer to the shared memory at a given offset.
Definition shared.hpp:156
virtual int open()
open or create the shared memory.
Definition shared.hpp:81
uint64_t _size
shared memory size.
Definition shared.hpp:226
SharedMemory(const std::string &name, uint64_t size)
create instance.
Definition shared.hpp:59
void * _ptr
pointer to mapped shared memory.
Definition shared.hpp:229
virtual void close() noexcept
close the shared memory.
Definition shared.hpp:134
std::string _name
shared memory name.
Definition shared.hpp:223
virtual void * get(uint64_t offset=0)
get a pointer to the shared memory at a given offset.
Definition shared.hpp:177
virtual uint64_t size() const noexcept
get the size of the shared memory.
Definition shared.hpp:196
virtual ~SharedMemory()
destroy the instance.
Definition shared.hpp:72
bool opened() const noexcept
check if shared memory is opened.
Definition shared.hpp:126
static int unlink(const std::string &name) noexcept
destroy synchronization primitives and unlink the shared memory segment.
Definition shared.hpp:206
single producer single consumer ring buffer policy.
Definition shared.hpp:871
virtual int tryPop(SharedSegment *segment, void *element) const noexcept
try to pop element from the ring buffer.
Definition shared.hpp:960
virtual int tryPush(SharedSegment *segment, const void *element) const noexcept
try to push element into the ring buffer.
Definition shared.hpp:888
constexpr Spsc() noexcept=default
construct the single producer single consumer ring buffer policy by default.
int pop(SharedSegment *segment, void *element) const noexcept
pop element from the ring buffer.
Definition shared.hpp:986
uint64_t pending(SharedSegment *segment) const noexcept
get the number of pending elements for reading.
Definition shared.hpp:1031
bool full(SharedSegment *segment) const noexcept
check if the ring buffer is full.
Definition shared.hpp:1053
int push(SharedSegment *segment, const void *element) const noexcept
push element into the ring buffer.
Definition shared.hpp:914
int timedPush(SharedSegment *segment, const void *element, std::chrono::duration< Rep, Period > timeout) const noexcept
try to push element into the ring buffer until the timeout expires.
Definition shared.hpp:935
bool empty(SharedSegment *segment) const noexcept
check if the ring buffer is empty.
Definition shared.hpp:1063
int timedPop(SharedSegment *segment, void *element, std::chrono::duration< Rep, Period > timeout) const noexcept
try to pop element from the ring buffer until the timeout expires.
Definition shared.hpp:1007
uint64_t available(SharedSegment *segment) const noexcept
get the number of available slots for writing.
Definition shared.hpp:1043
Definition acceptor.hpp:32
std::error_code make_error_code(join::Errc code)
Create an std::error_code object.
Definition error.cpp:154
shared memory segment.
Definition shared.hpp:252
uint8_t _data[]
Definition shared.hpp:254
SharedSync _sync
Definition shared.hpp:253
synchronization primitives.
Definition shared.hpp:239
uint64_t _capacity
Definition shared.hpp:245
std::atomic_uint64_t _magic
Definition shared.hpp:241
static constexpr uint64_t MAGIC
Definition shared.hpp:240
uint64_t _elementSize
Definition shared.hpp:244
std::atomic_uint64_t _tail
Definition shared.hpp:243
std::atomic_uint64_t _head
Definition shared.hpp:242