25#ifndef JOIN_CORE_PROACTOR_HPP
26#define JOIN_CORE_PROACTOR_HPP
30#ifdef JOIN_HAS_IO_URING
44#include <sys/eventfd.h>
49 class CompletionHandler;
50#ifdef JOIN_HAS_IO_URING
51 template <
typename Policy>
53 template <
typename Policy>
54 class BasicProactorThread;
56 using Proactor = BasicProactor<IoDefaultPolicy>;
57 using HybridProactor = BasicProactor<IoHybridPolicy>;
58 using SqpollProactor = BasicProactor<IoSqpollPolicy>;
60 using HybridProactorThread = BasicProactorThread<IoHybridPolicy>;
61 using SqpollProactorThread = BasicProactorThread<IoSqpollPolicy>;
64 class BasicProactorThread;
77#ifdef JOIN_HAS_IO_URING
78 template <
typename Policy>
146#ifdef JOIN_HAS_IO_URING
147template <
typename Policy = join::IoDefaultPolicy>
208#ifdef JOIN_HAS_IO_URING
214 int flush (
bool sync =
false) noexcept;
226 void stop (
bool sync =
true) noexcept;
228#ifdef JOIN_HAS_IO_URING
234 int registerBuffers (
const std::vector<iovec>& iovecs)
noexcept;
240 int unregisterBuffers () noexcept;
249 int mbind (
int numa)
const noexcept;
256 int mlock () const noexcept;
271#ifdef JOIN_HAS_IO_URING
276 int initWakeup (std::true_type)
noexcept;
282 int initWakeup (std::false_type)
noexcept;
287 void initWakeupOp (std::true_type)
noexcept;
292 void initWakeupOp (std::false_type)
noexcept;
298 void initCqEntries (io_uring_params&, std::false_type)
noexcept;
304 void initCqEntries (io_uring_params& params, std::true_type)
noexcept;
310 void initSqThreadIdle (io_uring_params&, std::false_type)
noexcept;
316 void initSqThreadIdle (io_uring_params& params, std::true_type)
noexcept;
322 void initSqThreadCpu (io_uring_params&, std::false_type)
noexcept;
328 void initSqThreadCpu (io_uring_params& params, std::true_type)
noexcept;
334 enum class CommandType
339#ifdef JOIN_HAS_IO_URING
347 struct alignas (64) Command
352 std::atomic<bool>* done;
353 std::error_code* errc;
361 int writeCommand (
const Command& cmd)
noexcept;
363#ifdef JOIN_HAS_IO_URING
369 int writeCommand (
const Command& cmd, std::true_type)
noexcept;
376 int writeCommand (
const Command& cmd, std::false_type)
noexcept;
382 void readCommands () noexcept;
388 void processCommand (const Command& cmd) noexcept;
396 int submitOperation (IoOperation* op,
bool flush) noexcept;
404 int cancelOperation (IoOperation* op,
bool flush) noexcept;
409 void cancelAllOperations () noexcept;
417 void dispatchOperation (IoOperation* op,
int result,
bool cancelled) noexcept;
425 void endOperation (IoOperation* op,
int result,
bool cancelled = false) noexcept;
427#ifdef JOIN_HAS_IO_URING
432 io_uring_sqe* getSqe () noexcept;
439 void prepareSqe (io_uring_sqe* sqe, IoOperation* op) noexcept;
444 void rearmWakeup () noexcept;
450 void dispatchCqe (io_uring_cqe* cqe) noexcept;
456 void dispatchCqe (io_uring_cqe* cqe,
std::true_type) noexcept;
462 void dispatchCqe (io_uring_cqe* cqe,
std::false_type) noexcept;
467 void eventLoop () noexcept;
472 void eventLoop (
std::false_type,
std::false_type) noexcept;
477 void eventLoop (
std::true_type,
std::false_type) noexcept;
482 void eventLoop (
std::true_type,
std::true_type) noexcept;
489 static bool isWriteOp (uint8_t code)
noexcept;
496 static int executeOp (IoOperation* op)
noexcept;
502 void onReadable (
int fd)
noexcept override;
508 void onWriteable (
int fd)
noexcept override;
514 void onClose (
int fd)
noexcept override;
520 void onError (
int fd)
noexcept override;
524 static constexpr size_t _queueSize = 1024;
530 std::atomic<bool> _stopping{
false};
535#ifdef JOIN_HAS_IO_URING
537 uint64_t _wakeupBuf = 0;
540 IoOperation _wakeupOp = {};
546 std::vector<IoOperation*> _pendingOps;
549 static constexpr pthread_t _invalidThreadId =
static_cast<pthread_t
> (-1);
552 std::atomic<pthread_t> _threadId{_invalidThreadId};
555 std::atomic<bool> _running{
false};
558 std::vector<IoOperation*> _readOps;
561 std::vector<IoOperation*> _writeOps;
572#ifdef JOIN_HAS_IO_URING
573template <
typename Policy>
579 if (isProactorThread ())
581 return submitOperation (op, flush);
584 std::atomic<bool> done{
false}, *pdone =
nullptr;
585 std::error_code errc, *perrc =
nullptr;
593 if (
JOIN_UNLIKELY (writeCommand ({CommandType::Submit, op, flush, pdone, perrc}) == -1))
601 while (!done.load (std::memory_order_acquire))
620#ifdef JOIN_HAS_IO_URING
621template <
typename Policy>
627 if (isProactorThread ())
629 return cancelOperation (op, flush);
632 std::atomic<bool> done{
false}, *pdone =
nullptr;
633 std::error_code errc, *perrc =
nullptr;
641 if (
JOIN_UNLIKELY (writeCommand ({CommandType::Cancel, op, flush, pdone, perrc}) == -1))
649 while (!done.load (std::memory_order_acquire))
668#ifdef JOIN_HAS_IO_URING
669template <
typename Policy>
672inline void join::BasicProactor::dispatchOperation (
IoOperation* op,
int result,
bool cancelled)
noexcept
684 op->handler->onCancel (op, result);
688 op->handler->onComplete (op, result);
692 op->state = IoOperation::State::Idle;
695#ifdef JOIN_HAS_IO_URING
704#ifdef JOIN_HAS_IO_URING
705template <
typename Policy = join::IoDefaultPolicy>
716#ifdef JOIN_HAS_IO_URING
722 return instance ()._proactor;
732 return instance ()._dispatcher.
affinity (core);
741 return instance ()._dispatcher.
affinity ();
751 return instance ()._dispatcher.
priority (prio);
760 return instance ()._dispatcher.
priority ();
769 return instance ()._dispatcher.
handle ();
778 static int mbind (
int numa)
noexcept
780 return instance ()._proactor.mbind (numa);
790 return instance ()._proactor.
mlock ();
801 return proactorThread;
807 BasicProactorThread ()
809 _dispatcher = Thread ([
this] () {
818 BasicProactorThread (
const BasicProactorThread&) =
delete;
825 BasicProactorThread& operator= (
const BasicProactorThread&) =
delete;
831 BasicProactorThread (BasicProactorThread&&) =
delete;
838 BasicProactorThread& operator= (BasicProactorThread&&) =
delete;
843 ~BasicProactorThread ()
849#ifdef JOIN_HAS_IO_URING
851 BasicProactor<Policy> _proactor;
854 BasicProactor _proactor;
adaptive backoff strategy for busy-wait loops.
Definition backoff.hpp:45
Convenience class that owns a Proactor running on a dedicated background thread.
Definition proactor.hpp:710
static int affinity() noexcept
get proactor thread affinity.
Definition proactor.hpp:739
static int priority(int prio)
set proactor thread scheduling priority.
Definition proactor.hpp:749
static int mlock() noexcept
lock proactor command queue memory in RAM.
Definition proactor.hpp:788
static int affinity(int core)
set proactor thread affinity.
Definition proactor.hpp:730
static pthread_t handle() noexcept
get the native handle of the proactor thread.
Definition proactor.hpp:767
static BasicProactor & proactor()
get the Proactor instance owned by the singleton ProactorThread.
Definition proactor.hpp:719
static int priority() noexcept
get proactor thread scheduling priority.
Definition proactor.hpp:758
basic proactor class.
Definition proactor.hpp:152
int mlock() const noexcept
lock proactor command queue memory in RAM.
Definition proactor_epoll_impl.hpp:132
int cancel(IoOperation *op, bool flush=false, bool sync=false) noexcept
cancel an in-flight operation.
Definition proactor.hpp:624
bool isRunning() const noexcept
check if the event loop is running.
Definition proactor_epoll_impl.hpp:146
void run()
run the event loop (blocking).
Definition proactor_epoll_impl.hpp:58
BasicProactor(const BasicProactor &other)=delete
copy constructor.
BasicProactor(BasicProactor &&other)=delete
move constructor.
~BasicProactor() noexcept
destroy instance.
Definition proactor_epoll_impl.hpp:47
int submit(IoOperation *op, bool flush=false, bool sync=false) noexcept
submit an asynchronous operation to the proactor.
Definition proactor.hpp:576
BasicProactor & operator=(const BasicProactor &other)=delete
copy assignment operator.
void stop(bool sync=true) noexcept
stop the event loop.
Definition proactor_epoll_impl.hpp:67
bool isProactorThread() const noexcept
check if the calling thread is the proactor thread.
Definition proactor_epoll_impl.hpp:155
BasicProactor()
initialize the proactor and its I/O backend.
Definition proactor_epoll_impl.hpp:29
completion handler interface class.
Definition proactor.hpp:75
CompletionHandler()=default
create instance.
virtual ~CompletionHandler()=default
destroy instance.
CompletionHandler(CompletionHandler &&other)=default
move constructor.
CompletionHandler & operator=(const CompletionHandler &other)=default
copy assignment operator.
virtual void onComplete(IoOperation *op, int result)
method called when an operation completes successfully.
Definition proactor.hpp:127
CompletionHandler(const CompletionHandler &other)=default
copy constructor.
virtual void onCancel(IoOperation *op, int result)
method called when an operation is cancelled.
Definition proactor.hpp:137
Event handler interface class.
Definition reactor.hpp:46
friend class Reactor
friendship with reactor.
Definition reactor.hpp:122
void join() noexcept
block the current thread until the running thread finishes its execution.
Definition thread.cpp:252
int affinity(int core)
set thread affinity.
Definition thread.cpp:106
int priority(int prio)
set thread priority.
Definition thread.cpp:169
pthread_t handle() const noexcept
get the handle of the thread of execution. @retunr thread of execution handle.
Definition thread.cpp:301
Definition acceptor.hpp:32
BasicProactorThread ProactorThread
Definition proactor.hpp:67
BasicProactor Proactor
Definition proactor.hpp:66
Describes a single asynchronous operation submitted to the Proactor.
Definition io_operation.hpp:41
BasicQueue< Type, Backend, SyncPolicy< Type, Backend > > Queue
queue type alias combining backend and policy.
Definition queue.hpp:1086
#define JOIN_LIKELY(x)
Definition utils.hpp:46
#define JOIN_UNLIKELY(x)
Definition utils.hpp:47