join 1.0
lightweight network framework library
Loading...
Searching...
No Matches
proactor.hpp
Go to the documentation of this file.
1
25#ifndef JOIN_CORE_PROACTOR_HPP
26#define JOIN_CORE_PROACTOR_HPP
27
28// libjoin.
29#include <join/io_operation.hpp>
30#ifdef JOIN_HAS_IO_URING
31#include <join/io_policy.hpp>
32#else
33#include <join/reactor.hpp>
34#endif
35#include <join/backoff.hpp>
36#include <join/thread.hpp>
37#include <join/queue.hpp>
38
39// C++.
40#include <utility>
41#include <vector>
42
43// C.
44#include <sys/eventfd.h>
45#include <cerrno>
46
47namespace join
48{
49 class CompletionHandler;
50#ifdef JOIN_HAS_IO_URING
51 template <typename Policy>
52 class BasicProactor;
53 template <typename Policy>
54 class BasicProactorThread;
55
56 using Proactor = BasicProactor<IoDefaultPolicy>;
57 using HybridProactor = BasicProactor<IoHybridPolicy>;
58 using SqpollProactor = BasicProactor<IoSqpollPolicy>;
59 using ProactorThread = BasicProactorThread<IoDefaultPolicy>;
60 using HybridProactorThread = BasicProactorThread<IoHybridPolicy>;
61 using SqpollProactorThread = BasicProactorThread<IoSqpollPolicy>;
62#else
63 class BasicProactor;
64 class BasicProactorThread;
65
68#endif
69}
70
75{
77#ifdef JOIN_HAS_IO_URING
78 template <typename Policy>
79 friend class join::BasicProactor;
80#else
81 friend class join::BasicProactor;
82#endif
83
84public:
88 CompletionHandler () = default;
89
94 CompletionHandler (const CompletionHandler& other) = default;
95
102
108
115
119 virtual ~CompletionHandler () = default;
120
121protected:
127 virtual void onComplete ([[maybe_unused]] IoOperation* op, [[maybe_unused]] int result)
128 {
129 // do nothing.
130 }
131
137 virtual void onCancel ([[maybe_unused]] IoOperation* op, [[maybe_unused]] int result)
138 {
139 // do nothing.
140 }
141};
142
146#ifdef JOIN_HAS_IO_URING
147template <typename Policy = join::IoDefaultPolicy>
149#else
151#endif
152{
153public:
157 explicit BasicProactor ();
158
163 BasicProactor (const BasicProactor& other) = delete;
164
170 BasicProactor& operator= (const BasicProactor& other) = delete;
171
176 BasicProactor (BasicProactor&& other) = delete;
177
184
188 ~BasicProactor () noexcept;
189
197 int submit (IoOperation* op, bool flush = false, bool sync = false) noexcept;
198
206 int cancel (IoOperation* op, bool flush = false, bool sync = false) noexcept;
207
208#ifdef JOIN_HAS_IO_URING
214 int flush (bool sync = false) noexcept;
215#endif
216
220 void run ();
221
226 void stop (bool sync = true) noexcept;
227
228#ifdef JOIN_HAS_IO_URING
234 int registerBuffers (const std::vector<iovec>& iovecs) noexcept;
235
240 int unregisterBuffers () noexcept;
241#endif
242
243#ifdef JOIN_HAS_NUMA
249 int mbind (int numa) const noexcept;
250#endif
251
256 int mlock () const noexcept;
257
262 bool isRunning () const noexcept;
263
268 bool isProactorThread () const noexcept;
269
270private:
271#ifdef JOIN_HAS_IO_URING
276 int initWakeup (std::true_type) noexcept;
277
282 int initWakeup (std::false_type) noexcept;
283
287 void initWakeupOp (std::true_type) noexcept;
288
292 void initWakeupOp (std::false_type) noexcept;
293
298 void initCqEntries (io_uring_params&, std::false_type) noexcept;
299
304 void initCqEntries (io_uring_params& params, std::true_type) noexcept;
305
310 void initSqThreadIdle (io_uring_params&, std::false_type) noexcept;
311
316 void initSqThreadIdle (io_uring_params& params, std::true_type) noexcept;
317
322 void initSqThreadCpu (io_uring_params&, std::false_type) noexcept;
323
328 void initSqThreadCpu (io_uring_params& params, std::true_type) noexcept;
329#endif
330
334 enum class CommandType
335 {
336 Submit,
337 Cancel,
338 Stop,
339#ifdef JOIN_HAS_IO_URING
340 Flush,
341#endif
342 };
343
347 struct alignas (64) Command
348 {
349 CommandType type;
350 IoOperation* op;
351 bool flush;
352 std::atomic<bool>* done;
353 std::error_code* errc;
354 };
355
361 int writeCommand (const Command& cmd) noexcept;
362
363#ifdef JOIN_HAS_IO_URING
369 int writeCommand (const Command& cmd, std::true_type) noexcept;
370
376 int writeCommand (const Command& cmd, std::false_type) noexcept;
377#endif
378
382 void readCommands () noexcept;
383
388 void processCommand (const Command& cmd) noexcept;
389
396 int submitOperation (IoOperation* op, bool flush) noexcept;
397
404 int cancelOperation (IoOperation* op, bool flush) noexcept;
405
409 void cancelAllOperations () noexcept;
410
417 void dispatchOperation (IoOperation* op, int result, bool cancelled) noexcept;
418
425 void endOperation (IoOperation* op, int result, bool cancelled = false) noexcept;
426
427#ifdef JOIN_HAS_IO_URING
432 io_uring_sqe* getSqe () noexcept;
433
439 void prepareSqe (io_uring_sqe* sqe, IoOperation* op) noexcept;
440
444 void rearmWakeup () noexcept;
445
450 void dispatchCqe (io_uring_cqe* cqe) noexcept;
451
456 void dispatchCqe (io_uring_cqe* cqe, std::true_type) noexcept;
457
462 void dispatchCqe (io_uring_cqe* cqe, std::false_type) noexcept;
463
467 void eventLoop () noexcept;
468
472 void eventLoop (std::false_type, std::false_type) noexcept;
473
477 void eventLoop (std::true_type, std::false_type) noexcept;
478
482 void eventLoop (std::true_type, std::true_type) noexcept;
483#else
489 static bool isWriteOp (uint8_t code) noexcept;
490
496 static int executeOp (IoOperation* op) noexcept;
497
502 void onReadable (int fd) noexcept override;
503
508 void onWriteable (int fd) noexcept override;
509
514 void onClose (int fd) noexcept override;
515
520 void onError (int fd) noexcept override;
521#endif
522
524 static constexpr size_t _queueSize = 1024;
525
528
530 std::atomic<bool> _stopping{false};
531
533 int _wakeup = -1;
534
535#ifdef JOIN_HAS_IO_URING
537 uint64_t _wakeupBuf = 0;
538
540 IoOperation _wakeupOp = {};
541
543 io_uring _ring = {};
544
546 std::vector<IoOperation*> _pendingOps;
547
549 static constexpr pthread_t _invalidThreadId = static_cast<pthread_t> (-1);
550
552 std::atomic<pthread_t> _threadId{_invalidThreadId};
553
555 std::atomic<bool> _running{false};
556#else
558 std::vector<IoOperation*> _readOps;
559
561 std::vector<IoOperation*> _writeOps;
562
564 Reactor _reactor;
565#endif
566};
567
568// =========================================================================
569// CLASS : BasicProactor
570// METHOD : submit
571// =========================================================================
572#ifdef JOIN_HAS_IO_URING
573template <typename Policy>
574int join::BasicProactor<Policy>::submit (IoOperation* op, bool flush, bool sync) noexcept
575#else
576inline int join::BasicProactor::submit (IoOperation* op, bool flush, bool sync) noexcept
577#endif
578{
579 if (isProactorThread ())
580 {
581 return submitOperation (op, flush);
582 }
583
584 std::atomic<bool> done{false}, *pdone = nullptr;
585 std::error_code errc, *perrc = nullptr;
586
587 if (JOIN_UNLIKELY (sync))
588 {
589 pdone = &done;
590 perrc = &errc;
591 }
592
593 if (JOIN_UNLIKELY (writeCommand ({CommandType::Submit, op, flush, pdone, perrc}) == -1))
594 {
595 return -1; // LCOV_EXCL_LINE
596 }
597
598 if (JOIN_UNLIKELY (sync))
599 {
600 Backoff backoff;
601 while (!done.load (std::memory_order_acquire))
602 {
603 backoff ();
604 }
605
606 if (JOIN_UNLIKELY (errc))
607 {
608 lastError = errc;
609 return -1;
610 }
611 }
612
613 return 0;
614}
615
616// =========================================================================
617// CLASS : BasicProactor
618// METHOD : cancel
619// =========================================================================
620#ifdef JOIN_HAS_IO_URING
621template <typename Policy>
622int join::BasicProactor<Policy>::cancel (IoOperation* op, bool flush, bool sync) noexcept
623#else
624inline int join::BasicProactor::cancel (IoOperation* op, bool flush, bool sync) noexcept
625#endif
626{
627 if (isProactorThread ())
628 {
629 return cancelOperation (op, flush);
630 }
631
632 std::atomic<bool> done{false}, *pdone = nullptr;
633 std::error_code errc, *perrc = nullptr;
634
635 if (JOIN_UNLIKELY (sync))
636 {
637 pdone = &done;
638 perrc = &errc;
639 }
640
641 if (JOIN_UNLIKELY (writeCommand ({CommandType::Cancel, op, flush, pdone, perrc}) == -1))
642 {
643 return -1; // LCOV_EXCL_LINE
644 }
645
646 if (JOIN_UNLIKELY (sync))
647 {
648 Backoff backoff;
649 while (!done.load (std::memory_order_acquire))
650 {
651 backoff ();
652 }
653
654 if (JOIN_UNLIKELY (errc))
655 {
656 lastError = errc;
657 return -1;
658 }
659 }
660
661 return 0;
662}
663
664// =========================================================================
665// CLASS : BasicProactor
666// METHOD : dispatchOperation
667// =========================================================================
668#ifdef JOIN_HAS_IO_URING
669template <typename Policy>
670void join::BasicProactor<Policy>::dispatchOperation (IoOperation* op, int result, bool cancelled) noexcept
671#else
672inline void join::BasicProactor::dispatchOperation (IoOperation* op, int result, bool cancelled) noexcept
673#endif
674{
675 if (JOIN_UNLIKELY (op == nullptr))
676 {
677 return; // LCOV_EXCL_LINE
678 }
679
680 if (JOIN_LIKELY (op->handler))
681 {
682 if (cancelled)
683 {
684 op->handler->onCancel (op, result);
685 }
686 else
687 {
688 op->handler->onComplete (op, result);
689 }
690 }
691
692 op->state = IoOperation::State::Idle;
693}
694
695#ifdef JOIN_HAS_IO_URING
697#else
699#endif
700
704#ifdef JOIN_HAS_IO_URING
705template <typename Policy = join::IoDefaultPolicy>
707#else
709#endif
710{
711public:
716#ifdef JOIN_HAS_IO_URING
718#else
720#endif
721 {
722 return instance ()._proactor;
723 }
724
730 static int affinity (int core)
731 {
732 return instance ()._dispatcher.affinity (core);
733 }
734
739 static int affinity () noexcept
740 {
741 return instance ()._dispatcher.affinity ();
742 }
743
749 static int priority (int prio)
750 {
751 return instance ()._dispatcher.priority (prio);
752 }
753
758 static int priority () noexcept
759 {
760 return instance ()._dispatcher.priority ();
761 }
762
767 static pthread_t handle () noexcept
768 {
769 return instance ()._dispatcher.handle ();
770 }
771
772#ifdef JOIN_HAS_NUMA
778 static int mbind (int numa) noexcept
779 {
780 return instance ()._proactor.mbind (numa);
781 }
782#endif
783
788 static int mlock () noexcept
789 {
790 return instance ()._proactor.mlock ();
791 }
792
793private:
798 static BasicProactorThread& instance ()
799 {
800 static BasicProactorThread proactorThread;
801 return proactorThread;
802 }
803
807 BasicProactorThread ()
808 {
809 _dispatcher = Thread ([this] () {
810 _proactor.run ();
811 });
812 }
813
818 BasicProactorThread (const BasicProactorThread&) = delete;
819
825 BasicProactorThread& operator= (const BasicProactorThread&) = delete;
826
831 BasicProactorThread (BasicProactorThread&&) = delete;
832
838 BasicProactorThread& operator= (BasicProactorThread&&) = delete;
839
843 ~BasicProactorThread ()
844 {
845 _proactor.stop ();
846 _dispatcher.join ();
847 }
848
849#ifdef JOIN_HAS_IO_URING
851 BasicProactor<Policy> _proactor;
852#else
854 BasicProactor _proactor;
855#endif
856
858 Thread _dispatcher;
859};
860
861#endif
adaptive backoff strategy for busy-wait loops.
Definition backoff.hpp:45
Convenience class that owns a Proactor running on a dedicated background thread.
Definition proactor.hpp:710
static int affinity() noexcept
get proactor thread affinity.
Definition proactor.hpp:739
static int priority(int prio)
set proactor thread scheduling priority.
Definition proactor.hpp:749
static int mlock() noexcept
lock proactor command queue memory in RAM.
Definition proactor.hpp:788
static int affinity(int core)
set proactor thread affinity.
Definition proactor.hpp:730
static pthread_t handle() noexcept
get the native handle of the proactor thread.
Definition proactor.hpp:767
static BasicProactor & proactor()
get the Proactor instance owned by the singleton ProactorThread.
Definition proactor.hpp:719
static int priority() noexcept
get proactor thread scheduling priority.
Definition proactor.hpp:758
basic proactor class.
Definition proactor.hpp:152
int mlock() const noexcept
lock proactor command queue memory in RAM.
Definition proactor_epoll_impl.hpp:132
int cancel(IoOperation *op, bool flush=false, bool sync=false) noexcept
cancel an in-flight operation.
Definition proactor.hpp:624
bool isRunning() const noexcept
check if the event loop is running.
Definition proactor_epoll_impl.hpp:146
void run()
run the event loop (blocking).
Definition proactor_epoll_impl.hpp:58
BasicProactor(const BasicProactor &other)=delete
copy constructor.
BasicProactor(BasicProactor &&other)=delete
move constructor.
~BasicProactor() noexcept
destroy instance.
Definition proactor_epoll_impl.hpp:47
int submit(IoOperation *op, bool flush=false, bool sync=false) noexcept
submit an asynchronous operation to the proactor.
Definition proactor.hpp:576
BasicProactor & operator=(const BasicProactor &other)=delete
copy assignment operator.
void stop(bool sync=true) noexcept
stop the event loop.
Definition proactor_epoll_impl.hpp:67
bool isProactorThread() const noexcept
check if the calling thread is the proactor thread.
Definition proactor_epoll_impl.hpp:155
BasicProactor()
initialize the proactor and its I/O backend.
Definition proactor_epoll_impl.hpp:29
completion handler interface class.
Definition proactor.hpp:75
CompletionHandler()=default
create instance.
virtual ~CompletionHandler()=default
destroy instance.
CompletionHandler(CompletionHandler &&other)=default
move constructor.
CompletionHandler & operator=(const CompletionHandler &other)=default
copy assignment operator.
virtual void onComplete(IoOperation *op, int result)
method called when an operation completes successfully.
Definition proactor.hpp:127
CompletionHandler(const CompletionHandler &other)=default
copy constructor.
virtual void onCancel(IoOperation *op, int result)
method called when an operation is cancelled.
Definition proactor.hpp:137
Event handler interface class.
Definition reactor.hpp:46
friend class Reactor
friendship with reactor.
Definition reactor.hpp:122
void join() noexcept
block the current thread until the running thread finishes its execution.
Definition thread.cpp:252
int affinity(int core)
set thread affinity.
Definition thread.cpp:106
int priority(int prio)
set thread priority.
Definition thread.cpp:169
pthread_t handle() const noexcept
get the handle of the thread of execution. @return thread of execution handle.
Definition thread.cpp:301
Definition acceptor.hpp:32
BasicProactorThread ProactorThread
Definition proactor.hpp:67
BasicProactor Proactor
Definition proactor.hpp:66
Definition error.hpp:137
Describes a single asynchronous operation submitted to the Proactor.
Definition io_operation.hpp:41
BasicQueue< Type, Backend, SyncPolicy< Type, Backend > > Queue
queue type alias combining backend and policy.
Definition queue.hpp:1086
#define JOIN_LIKELY(x)
Definition utils.hpp:46
#define JOIN_UNLIKELY(x)
Definition utils.hpp:47