29template <
typename Policy>
31: _commands (_queueSize)
32, _wakeup (initWakeup (is_default<Policy>{}))
34 static_assert (has_spin<Policy>::value || !has_sqpoll<Policy>::value,
"spin required for sq poll policy");
36 io_uring_params params{};
37 params.flags = Policy::flags;
38 initCqEntries (params, has_cq_entries<Policy>{});
39 initSqThreadIdle (params, has_sq_thread_idle<Policy>{});
40 initSqThreadCpu (params, has_sq_thread_cpu<Policy>{});
42 if (io_uring_queue_init_params (Policy::sqEntries, &_ring, &params) < 0)
46 throw std::system_error (errno, std::system_category (),
"io_uring_queue_init failed");
50 initWakeupOp (is_default<Policy>{});
51 _pendingOps.reserve (Policy::sqEntries);
58template <
typename Policy>
63 io_uring_queue_exit (&_ring);
75template <
typename Policy>
78 if (isProactorThread ())
83 lastError = std::error_code (errno, std::system_category ());
90 std::atomic<bool> done{
false}, *pdone =
nullptr;
91 std::error_code errc, *perrc =
nullptr;
99 if (
JOIN_UNLIKELY (writeCommand ({CommandType::Flush,
nullptr,
false, pdone, perrc}) == -1))
107 while (!done.load (std::memory_order_acquire))
126template <
typename Policy>
129 _threadId.store (pthread_self (), std::memory_order_release);
131 _running.store (
true, std::memory_order_release);
134 _threadId.store (_invalidThreadId, std::memory_order_release);
141template <
typename Policy>
144 if (isProactorThread ())
146 _running.store (
false, std::memory_order_release);
147 cancelAllOperations ();
159 bool expected =
false;
160 if (!_stopping.compare_exchange_strong (expected,
true, std::memory_order_acq_rel))
163 while (_threadId.load (std::memory_order_acquire) != _invalidThreadId)
171 writeCommand ({CommandType::Stop,
nullptr, sync,
nullptr,
nullptr});
176 while (_threadId.load (std::memory_order_acquire) != _invalidThreadId)
180 _stopping.store (
false, std::memory_order_release);
188template <
typename Policy>
191 int ret = io_uring_register_buffers (&_ring, iovecs.data (), iovecs.size ());
194 lastError = std::error_code (-ret, std::system_category ());
205template <
typename Policy>
208 int ret = io_uring_unregister_buffers (&_ring);
211 lastError = std::error_code (-ret, std::system_category ());
223template <
typename Policy>
226 return _commands.mbind (numa);
234template <
typename Policy>
237 return _commands.
mlock ();
244template <
typename Policy>
247 return _running.load (std::memory_order_acquire);
254template <
typename Policy>
257 return _threadId.load (std::memory_order_acquire) == pthread_self ();
264template <
typename Policy>
267 return eventfd (0, EFD_NONBLOCK | EFD_CLOEXEC);
274template <
typename Policy>
284template <
typename Policy>
287 _wakeupOp = IoOperation::makeRead (_wakeup, &_wakeupBuf,
sizeof (_wakeupBuf),
nullptr);
294template <
typename Policy>
304template <
typename Policy>
314template <
typename Policy>
317 params.flags |= IORING_SETUP_CQSIZE;
318 params.cq_entries = Policy::cqEntries;
325template <
typename Policy>
335template <
typename Policy>
338 params.sq_thread_idle = Policy::sqThreadIdle;
345template <
typename Policy>
355template <
typename Policy>
358 params.sq_thread_cpu = Policy::sqThreadCpu;
365template <
typename Policy>
368 return writeCommand (cmd, is_default<Policy>{});
375template <
typename Policy>
384 if (
JOIN_UNLIKELY (::write (_wakeup, &value,
sizeof (uint64_t)) == -1))
387 lastError = std::error_code (errno, std::system_category ());
399template <
typename Policy>
402 return _commands.push (cmd);
409template <
typename Policy>
413 while (_commands.tryPop (cmd) == 0)
415 processCommand (cmd);
423template <
typename Policy>
430 case CommandType::Submit:
431 err = submitOperation (cmd.op, cmd.flush);
434 case CommandType::Cancel:
435 err = cancelOperation (cmd.op, cmd.flush);
438 case CommandType::Stop:
439 _running.store (
false, std::memory_order_release);
440 cancelAllOperations ();
443 io_uring_submit (&_ring);
447 case CommandType::Flush:
451 lastError = std::error_code (errno, std::system_category ());
463 if (cmd.errc && (err != 0))
465 *cmd.errc = lastError;
467 cmd.done->store (
true, std::memory_order_release);
475template <
typename Policy>
486 lastError = std::make_error_code (std::errc::bad_file_descriptor);
496 io_uring_sqe* sqe = getSqe ();
505 prepareSqe (sqe, op);
506 op->state = IoOperation::State::Submitted;
507 op->index =
static_cast<uint32_t
> (_pendingOps.size ());
508 _pendingOps.push_back (op);
512 io_uring_submit (&_ring);
522template <
typename Policy>
533 lastError = std::make_error_code (std::errc::bad_file_descriptor);
537 if (
JOIN_UNLIKELY (op->state != IoOperation::State::Submitted))
543 if (
JOIN_UNLIKELY (op->index >= _pendingOps.size () || _pendingOps[op->index] != op))
549 io_uring_sqe* sqe = getSqe ();
558 op->state = IoOperation::State::Cancelling;
559 io_uring_prep_cancel (sqe, op, 0);
560 io_uring_sqe_set_data (sqe,
nullptr);
564 io_uring_submit (&_ring);
574template <
typename Policy>
577 for (IoOperation* op : _pendingOps)
579 cancelOperation (op,
false);
587template <
typename Policy>
595 if (
JOIN_LIKELY (op->index < _pendingOps.size () && _pendingOps[op->index] == op))
597 IoOperation* last = _pendingOps.back ();
598 _pendingOps[op->index] = last;
599 last->index = op->index;
600 _pendingOps.pop_back ();
603 dispatchOperation (op, result, cancelled);
610template <
typename Policy>
613 io_uring_sqe* sqe = io_uring_get_sqe (&_ring);
616 io_uring_submit (&_ring);
617 sqe = io_uring_get_sqe (&_ring);
627template <
typename Policy>
630 switch (
static_cast<IoOperation::Opcode
> (op->code))
632 case IoOperation::Opcode::Accept:
633 io_uring_prep_accept (sqe, op->data.accept.fd, op->data.accept.addr, op->data.accept.addrlen,
634 op->data.accept.flags);
637 case IoOperation::Opcode::Connect:
638 io_uring_prep_connect (sqe, op->data.connect.fd, op->data.connect.addr, op->data.connect.addrlen);
641 case IoOperation::Opcode::Read:
642 io_uring_prep_read (sqe, op->data.rw.fd, op->data.rw.buf, op->data.rw.len, 0);
645 case IoOperation::Opcode::Write:
646 io_uring_prep_write (sqe, op->data.rw.fd, op->data.rw.buf, op->data.rw.len, 0);
649 case IoOperation::Opcode::ReadFixed:
650 io_uring_prep_read_fixed (sqe, op->data.rw.fd, op->data.rw.buf, op->data.rw.len, 0, op->data.rw.index);
653 case IoOperation::Opcode::WriteFixed:
654 io_uring_prep_write_fixed (sqe, op->data.rw.fd, op->data.rw.buf, op->data.rw.len, 0, op->data.rw.index);
657 case IoOperation::Opcode::RecvMsg:
658 io_uring_prep_recvmsg (sqe, op->data.msg.fd, op->data.msg.msg, op->data.msg.flags);
661 case IoOperation::Opcode::SendMsg:
662 io_uring_prep_sendmsg (sqe, op->data.msg.fd, op->data.msg.msg, op->data.msg.flags);
665 case IoOperation::Opcode::Recv:
666 io_uring_prep_recv (sqe, op->data.stream.fd, op->data.stream.buf, op->data.stream.len,
667 op->data.stream.flags);
670 case IoOperation::Opcode::Send:
671 io_uring_prep_send (sqe, op->data.stream.fd, op->data.stream.buf, op->data.stream.len,
672 op->data.stream.flags);
676 io_uring_prep_nop (sqe);
679 io_uring_sqe_set_data (sqe, op);
683 sqe->flags |= IOSQE_IO_LINK;
691template <
typename Policy>
694 io_uring_sqe* sqe = getSqe ();
698 prepareSqe (sqe, &_wakeupOp);
699 _wakeupOp.state = IoOperation::State::Submitted;
700 io_uring_submit (&_ring);
708template <
typename Policy>
711 dispatchCqe (cqe, is_default<Policy>{});
718template <
typename Policy>
721 IoOperation* op =
static_cast<IoOperation*
> (io_uring_cqe_get_data (cqe));
729 _wakeupOp.state = IoOperation::State::Idle;
731 if (
JOIN_LIKELY (_running.load (std::memory_order_acquire)))
743 int result = cqe->res;
744 bool cancelled = (result < 0) && (result == -ECANCELED || op->state == IoOperation::State::Cancelling);
745 endOperation (op, result, cancelled);
752template <
typename Policy>
755 IoOperation* op =
static_cast<IoOperation*
> (io_uring_cqe_get_data (cqe));
766 int result = cqe->res;
767 bool cancelled = (result < 0) && (result == -ECANCELED || op->state == IoOperation::State::Cancelling);
768 endOperation (op, result, cancelled);
775template <
typename Policy>
778 eventLoop (has_spin<Policy>{}, has_sqpoll<Policy>{});
785template <
typename Policy>
788 if (
JOIN_LIKELY (_running.load (std::memory_order_acquire)))
793 while (_running.load (std::memory_order_acquire) || !_pendingOps.empty ())
795 io_uring_cqe* cqe =
nullptr;
797 if (
JOIN_LIKELY (_running.load (std::memory_order_acquire)))
807 io_uring_submit (&_ring);
818 io_uring_cqe_seen (&_ring, cqe);
820 while (io_uring_peek_cqe (&_ring, &cqe) == 0);
828template <
typename Policy>
831 Backoff backoff (Policy::spin);
834 while ((running = _running.load (std::memory_order_acquire)) || !_pendingOps.empty ())
842 io_uring_submit (&_ring);
845 io_uring_cqe* cqe =
nullptr;
855 io_uring_cqe_seen (&_ring, cqe);
857 while (io_uring_peek_cqe (&_ring, &cqe) == 0);
867template <
typename Policy>
870 Backoff backoff (Policy::spin);
873 while ((running = _running.load (std::memory_order_acquire)) || !_pendingOps.empty ())
879 if (
JOIN_UNLIKELY (IO_URING_READ_ONCE (*_ring.sq.kflags) & IORING_SQ_NEED_WAKEUP))
881 io_uring_enter (_ring.ring_fd, 0, 0, IORING_ENTER_SQ_WAKEUP,
nullptr);
886 io_uring_submit (&_ring);
889 io_uring_cqe* cqe =
nullptr;
899 io_uring_cqe_seen (&_ring, cqe);
901 while (io_uring_peek_cqe (&_ring, &cqe) == 0);
basic proactor class.
Definition proactor.hpp:152
int mlock() const noexcept
lock proactor command queue memory in RAM.
Definition proactor_epoll_impl.hpp:132
bool isRunning() const noexcept
check if the event loop is running.
Definition proactor_epoll_impl.hpp:146
void run()
run the event loop (blocking).
Definition proactor_epoll_impl.hpp:58
~BasicProactor() noexcept
destroy instance.
Definition proactor_epoll_impl.hpp:47
void stop(bool sync=true) noexcept
stop the event loop.
Definition proactor_epoll_impl.hpp:67
bool isProactorThread() const noexcept
check if the calling thread is the proactor thread.
Definition proactor_epoll_impl.hpp:155
BasicProactor()
initialize the proactor and its I/O backend.
Definition proactor_epoll_impl.hpp:29
std::error_code make_error_code(join::Errc code) noexcept
Create an std::error_code object.
Definition error.cpp:150
#define JOIN_LIKELY(x)
Definition utils.hpp:46
#define JOIN_UNLIKELY(x)
Definition utils.hpp:47