30: _commands (_queueSize)
31, _wakeup (eventfd (0, EFD_NONBLOCK | EFD_CLOEXEC))
32, _readOps (256, nullptr)
33, _writeOps (256, nullptr)
37 throw std::system_error (errno, std::system_category (),
"eventfd failed");
40 _reactor.
addHandler (_wakeup,
this,
true,
false,
false);
69 if (isProactorThread ())
71 cancelAllOperations ();
72 _reactor.stop (
false);
76 if (!_reactor.isRunning ())
81 std::atomic<bool> done{
false};
85 bool expected =
false;
86 if (!_stopping.compare_exchange_strong (expected,
true, std::memory_order_acq_rel))
97 writeCommand ({CommandType::Stop,
nullptr, sync, sync ? &done :
nullptr,
nullptr});
102 while (!done.load (std::memory_order_acquire))
106 _stopping.store (
false, std::memory_order_release);
109 _reactor.stop (sync);
117inline int join::BasicProactor::mbind (
int numa)
const noexcept
119 if (_commands.mbind (numa) == -1)
124 return _reactor.mbind (numa);
134 if (_commands.mlock () == -1)
139 return _reactor.mlock ();
148 return _reactor.isRunning ();
157 return _reactor.isReactorThread ();
164inline int join::BasicProactor::writeCommand (
const Command& cmd)
noexcept
172 if (
JOIN_UNLIKELY (::write (_wakeup, &value,
sizeof (uint64_t)) == -1))
175 lastError = std::error_code (errno, std::system_category ());
187inline void join::BasicProactor::readCommands () noexcept
190 if (
JOIN_UNLIKELY (::read (_wakeup, &count,
sizeof (count)) == -1))
196 while (_commands.tryPop (cmd) == 0)
198 processCommand (cmd);
206inline void join::BasicProactor::processCommand (
const Command& cmd)
noexcept
212 case CommandType::Submit:
213 err = submitOperation (cmd.op, cmd.flush);
216 case CommandType::Cancel:
217 err = cancelOperation (cmd.op, cmd.flush);
220 case CommandType::Stop:
221 cancelAllOperations ();
230 if (cmd.errc && (err != 0))
234 cmd.done->store (
true, std::memory_order_release);
242inline int join::BasicProactor::submitOperation (IoOperation* op, [[maybe_unused]]
bool flush)
noexcept
252 lastError = std::make_error_code (std::errc::bad_file_descriptor);
264 if (
JOIN_UNLIKELY (::connect (op->data.connect.fd, op->data.connect.addr, op->data.connect.addrlen) == -1 &&
265 errno != EINPROGRESS))
267 lastError = std::error_code (errno, std::system_category ());
272 if (
JOIN_UNLIKELY (
static_cast<size_t> (op->fd ()) >= _readOps.size ()))
274 size_t newSize =
static_cast<size_t> (op->fd ()) + 1;
275 _readOps.resize (newSize,
nullptr);
276 _writeOps.resize (newSize,
nullptr);
279 bool isWrite = isWriteOp (op->code);
281 if (
JOIN_UNLIKELY ((isWrite && (_writeOps[op->fd ()] !=
nullptr)) ||
282 (!isWrite && (_readOps[op->fd ()] !=
nullptr))))
290 _writeOps[op->fd ()] = op;
294 _readOps[op->fd ()] = op;
299 return _reactor.addHandler (op->fd (),
this, _readOps[op->fd ()] !=
nullptr, _writeOps[op->fd ()] !=
nullptr);
306inline int join::BasicProactor::cancelOperation (IoOperation* op, [[maybe_unused]]
bool flush)
noexcept
316 lastError = std::make_error_code (std::errc::bad_file_descriptor);
326 if (
JOIN_UNLIKELY (
static_cast<size_t> (op->fd ()) >= _readOps.size ()))
328 lastError = std::make_error_code (std::errc::bad_file_descriptor);
333 bool isWrite = isWriteOp (op->code);
335 if (
JOIN_UNLIKELY ((isWrite && (_writeOps[op->fd ()] != op)) || (!isWrite && (_readOps[op->fd ()] != op))))
344 _writeOps[op->fd ()] =
nullptr;
348 _readOps[op->fd ()] =
nullptr;
353 if (_readOps[op->fd ()] ==
nullptr && _writeOps[op->fd ()] ==
nullptr)
355 ret = _reactor.delHandler (op->fd ());
359 ret = _reactor.addHandler (op->fd (),
this, _readOps[op->fd ()] !=
nullptr, _writeOps[op->fd ()] !=
nullptr);
362 dispatchOperation (op, -ECANCELED,
true);
371inline void join::BasicProactor::cancelAllOperations () noexcept
373 for (
size_t fd = 0; fd < _readOps.size (); ++fd)
375 IoOperation* rOp = std::exchange (_readOps[fd],
nullptr);
376 IoOperation* wOp = std::exchange (_writeOps[fd],
nullptr);
379 _reactor.delHandler (fd);
381 dispatchOperation (rOp, -ECANCELED,
true);
382 dispatchOperation (wOp, -ECANCELED,
true);
390inline void join::BasicProactor::endOperation (IoOperation* op,
int result,
bool cancelled)
noexcept
399 if (
JOIN_UNLIKELY (fd < 0 ||
static_cast<size_t> (fd) >= _readOps.size ()))
404 if (isWriteOp (op->code))
406 _writeOps[fd] =
nullptr;
410 _readOps[fd] =
nullptr;
413 if (_readOps[fd] ==
nullptr && _writeOps[fd] ==
nullptr)
415 _reactor.delHandler (fd);
419 _reactor.addHandler (fd,
this, _readOps[fd] !=
nullptr, _writeOps[fd] !=
nullptr);
422 dispatchOperation (op, result, cancelled);
429inline bool join::BasicProactor::isWriteOp (uint8_t code)
noexcept
442inline int join::BasicProactor::executeOp (IoOperation* op)
noexcept
450 int fd = ::accept4 (op->data.accept.fd, op->data.accept.addr, op->data.accept.addrlen,
451 op->data.accept.flags);
456 return (fd == -1) ? -errno : fd;
462 socklen_t len =
sizeof (err);
463 if (
JOIN_UNLIKELY (::getsockopt (op->data.connect.fd, SOL_SOCKET, SO_ERROR, &err, &len) == -1))
473 ssize_t n = ::read (op->data.rw.fd, op->data.rw.buf, op->data.rw.len);
478 return (n == -1) ? -errno :
static_cast<int> (n);
484 ssize_t n = ::write (op->data.rw.fd, op->data.rw.buf, op->data.rw.len);
489 return (n == -1) ? -errno :
static_cast<int> (n);
494 ssize_t n = ::recvmsg (op->data.msg.fd, op->data.msg.msg, op->data.msg.flags);
499 return (n == -1) ? -errno :
static_cast<int> (n);
504 ssize_t n = ::sendmsg (op->data.msg.fd, op->data.msg.msg, op->data.msg.flags);
509 return (n == -1) ? -errno :
static_cast<int> (n);
515 ::recv (op->data.stream.fd, op->data.stream.buf, op->data.stream.len, op->data.stream.flags);
520 return (n == -1) ? -errno :
static_cast<int> (n);
526 ::send (op->data.stream.fd, op->data.stream.buf, op->data.stream.len, op->data.stream.flags);
531 return (n == -1) ? -errno :
static_cast<int> (n);
544inline void join::BasicProactor::onReadable (
int fd)
noexcept
552 endOperation (_readOps[fd], executeOp (_readOps[fd]),
false);
559inline void join::BasicProactor::onWriteable (
int fd)
noexcept
561 endOperation (_writeOps[fd], executeOp (_writeOps[fd]),
false);
568inline void join::BasicProactor::onClose (
int fd)
noexcept
570 IoOperation* rOp = std::exchange (_readOps[fd],
nullptr);
571 IoOperation* wOp = std::exchange (_writeOps[fd],
nullptr);
574 _reactor.delHandler (fd);
576 dispatchOperation (rOp, 0,
false);
577 dispatchOperation (wOp, 0,
false);
584inline void join::BasicProactor::onError (
int fd)
noexcept
586 IoOperation* rOp = std::exchange (_readOps[fd],
nullptr);
587 IoOperation* wOp = std::exchange (_writeOps[fd],
nullptr);
590 _reactor.delHandler (fd);
592 dispatchOperation (rOp, -ECONNRESET,
false);
593 dispatchOperation (wOp, -ECONNRESET,
false);
adaptive backoff strategy for busy-wait loops.
Definition backoff.hpp:45
int mlock() const noexcept
lock proactor command queue memory in RAM.
Definition proactor_epoll_impl.hpp:132
bool isRunning() const noexcept
check if the event loop is running.
Definition proactor_epoll_impl.hpp:146
void run()
run the event loop (blocking).
Definition proactor_epoll_impl.hpp:58
~BasicProactor() noexcept
destroy instance.
Definition proactor_epoll_impl.hpp:47
void stop(bool sync=true) noexcept
stop the event loop.
Definition proactor_epoll_impl.hpp:67
bool isProactorThread() const noexcept
check if the calling thread is the proactor thread.
Definition proactor_epoll_impl.hpp:155
BasicProactor()
initialize the proactor and its I/O backend.
Definition proactor_epoll_impl.hpp:29
int addHandler(int fd, EventHandler *handler, bool wantRead=true, bool wantWrite=false, bool sync=true) noexcept
add handler to reactor.
Definition reactor.cpp:100
std::error_code make_error_code(join::Errc code) noexcept
Create an std::error_code object.
Definition error.cpp:150
thread_local std::error_code lastError
last error.
Definition error.cpp:32
Opcode
operation code.
Definition io_operation.hpp:56
#define JOIN_LIKELY(x)
Definition utils.hpp:46
#define JOIN_UNLIKELY(x)
Definition utils.hpp:47