join 1.0
lightweight network framework library
Loading...
Searching...
No Matches
proactor_epoll_impl.hpp
Go to the documentation of this file.
1
25// =========================================================================
26// CLASS : BasicProactor
27// METHOD : BasicProactor
28// =========================================================================
// NOTE(review): the constructor signature (listing line 29) is missing from this extract; only the
// member-initializer list and the body are visible below.
// _wakeup is an eventfd created non-blocking and close-on-exec; both per-descriptor operation
// tables start with 256 empty (nullptr) slots.
30: _commands (_queueSize)
31, _wakeup (eventfd (0, EFD_NONBLOCK | EFD_CLOEXEC))
32, _readOps (256, nullptr)
33, _writeOps (256, nullptr)
34{
// eventfd () signals failure with -1; report it as std::system_error carrying errno.
35 if (_wakeup == -1)
36 {
37 throw std::system_error (errno, std::system_category (), "eventfd failed"); // LCOV_EXCL_LINE
38 }
39
// Watch the wakeup descriptor for readability only (wantRead=true, wantWrite=false, sync=false).
40 _reactor.addHandler (_wakeup, this, true, false, false);
41}
42
43// =========================================================================
44// CLASS : BasicProactor
45// METHOD : ~BasicProactor
46// =========================================================================
// NOTE(review): the destructor signature (listing line 47) is missing from this extract.
48{
// Request a synchronous stop (sync = true) before the wakeup descriptor is closed.
49 stop (true);
50
// Release the eventfd opened in the constructor.
51 ::close (_wakeup);
52}
53
54// =========================================================================
55// CLASS : BasicProactor
56// METHOD : run
57// =========================================================================
// NOTE(review): the signature of run () (listing line 58) is missing from this extract.
59{
// Delegates to the underlying reactor's run ().
60 _reactor.run ();
61}
62
63// =========================================================================
64// CLASS : BasicProactor
65// METHOD : stop
66// =========================================================================
// Stop the proactor.
//  - Called on the proactor thread: cancels every pending operation in place and asks the reactor
//    to stop with sync = false.
//  - Called from another thread: posts a Stop command through the command queue; when `sync` is
//    true the caller spins (Backoff) until the command has been processed, then stops the reactor.
// @param sync when true, do not return before the Stop command has been handled.
67inline void join::BasicProactor::stop (bool sync) noexcept
68{
// Already on the proactor thread: no command round-trip is needed.
69 if (isProactorThread ())
70 {
71 cancelAllOperations ();
72 _reactor.stop (false);
73 return;
74 }
75
// Nothing to do when the reactor is not running.
76 if (!_reactor.isRunning ())
77 {
78 return;
79 }
80
// Completion flag handed to the Stop command; it lives on this stack frame, so in sync mode the
// function must not return before the command has been processed.
81 std::atomic<bool> done{false};
82
83 if (JOIN_LIKELY (sync))
84 {
// Only the caller that flips _stopping from false to true posts the Stop command; any other
// concurrent synchronous caller just waits until isRunning () reports false.
85 bool expected = false;
86 if (!_stopping.compare_exchange_strong (expected, true, std::memory_order_acq_rel))
87 {
88 Backoff backoff;
89 while (isRunning ())
90 {
91 backoff ();
92 }
93 return;
94 }
95 }
96
// NOTE(review): the result of writeCommand () is ignored. If the command cannot be queued, `done`
// is never set and the synchronous wait below would not terminate - confirm this cannot happen.
97 writeCommand ({CommandType::Stop, nullptr, sync, sync ? &done : nullptr, nullptr});
98
99 if (JOIN_LIKELY (sync))
100 {
// Wait for processCommand () to publish completion (release store paired with this acquire load).
101 Backoff backoff;
102 while (!done.load (std::memory_order_acquire))
103 {
104 backoff ();
105 }
// Re-open the gate for a later synchronous stop.
106 _stopping.store (false, std::memory_order_release);
107 }
108
109 _reactor.stop (sync);
110}
111
112#ifdef JOIN_HAS_NUMA
113// =========================================================================
114// CLASS : BasicProactor
115// METHOD : mbind
116// =========================================================================
117inline int join::BasicProactor::mbind (int numa) const noexcept
118{
119 if (_commands.mbind (numa) == -1)
120 {
121 return -1;
122 }
123
124 return _reactor.mbind (numa);
125}
126#endif
127
128// =========================================================================
129// CLASS : BasicProactor
130// METHOD : mlock
131// =========================================================================
132inline int join::BasicProactor::mlock () const noexcept
133{
134 if (_commands.mlock () == -1)
135 {
136 return -1;
137 }
138
139 return _reactor.mlock ();
140}
141
142// =========================================================================
143// CLASS : BasicProactor
144// METHOD : isRunning
145// =========================================================================
146inline bool join::BasicProactor::isRunning () const noexcept
147{
148 return _reactor.isRunning ();
149}
150
151// =========================================================================
152// CLASS : BasicProactor
153// METHOD : isProactorThread
154// =========================================================================
155inline bool join::BasicProactor::isProactorThread () const noexcept
156{
157 return _reactor.isReactorThread ();
158}
159
160// =========================================================================
161// CLASS : BasicProactor
162// METHOD : writeCommand
163// =========================================================================
164inline int join::BasicProactor::writeCommand (const Command& cmd) noexcept
165{
166 if (JOIN_UNLIKELY (_commands.push (cmd) == -1))
167 {
168 return -1; // LCOV_EXCL_LINE
169 }
170
171 uint64_t value = 1;
172 if (JOIN_UNLIKELY (::write (_wakeup, &value, sizeof (uint64_t)) == -1))
173 {
174 // LCOV_EXCL_START
175 lastError = std::error_code (errno, std::system_category ());
176 return -1;
177 // LCOV_EXCL_STOP
178 }
179
180 return 0;
181}
182
183// =========================================================================
184// CLASS : BasicProactor
185// METHOD : readCommands
186// =========================================================================
187inline void join::BasicProactor::readCommands () noexcept
188{
189 uint64_t count;
190 if (JOIN_UNLIKELY (::read (_wakeup, &count, sizeof (count)) == -1))
191 {
192 return; // LCOV_EXCL_LINE
193 }
194
195 Command cmd;
196 while (_commands.tryPop (cmd) == 0)
197 {
198 processCommand (cmd);
199 }
200}
201
202// =========================================================================
203// CLASS : BasicProactor
204// METHOD : processCommand
205// =========================================================================
206inline void join::BasicProactor::processCommand (const Command& cmd) noexcept
207{
208 int err = 0;
209
210 switch (cmd.type)
211 {
212 case CommandType::Submit:
213 err = submitOperation (cmd.op, cmd.flush);
214 break;
215
216 case CommandType::Cancel:
217 err = cancelOperation (cmd.op, cmd.flush);
218 break;
219
220 case CommandType::Stop:
221 cancelAllOperations ();
222 break;
223
224 default:
225 break;
226 }
227
228 if (JOIN_UNLIKELY (cmd.done))
229 {
230 if (cmd.errc && (err != 0))
231 {
232 *cmd.errc = lastError;
233 }
234 cmd.done->store (true, std::memory_order_release);
235 }
236}
237
238// =========================================================================
239// CLASS : BasicProactor
240// METHOD : submitOperation
241// =========================================================================
// Register `op` in the read or write slot of its descriptor and arm the reactor for that descriptor.
// Returns -1 when the operation is rejected by one of the checks below, otherwise the value
// returned by _reactor.addHandler (). `flush` is not used by this implementation.
// NOTE(review): listing lines 246, 258, 284 and 297 are missing from this extract, so some statements
// of this function (inside the rejection branches and just before the final return) are not visible.
242inline int join::BasicProactor::submitOperation (IoOperation* op, [[maybe_unused]] bool flush) noexcept
243{
244 if (JOIN_UNLIKELY (op == nullptr))
245 {
247 return -1;
248 }
249
250 if (JOIN_UNLIKELY (op->fd () < 0))
251 {
252 lastError = std::make_error_code (std::errc::bad_file_descriptor);
253 return -1;
254 }
255
// Only operations in the Idle state may be submitted.
256 if (JOIN_UNLIKELY (op->state != IoOperation::State::Idle))
257 {
259 return -1;
260 }
261
// A connect is started right away; EINPROGRESS is not treated as a failure here.
262 if (JOIN_UNLIKELY (static_cast<IoOperation::Opcode> (op->code) == IoOperation::Opcode::Connect))
263 {
264 if (JOIN_UNLIKELY (::connect (op->data.connect.fd, op->data.connect.addr, op->data.connect.addrlen) == -1 &&
265 errno != EINPROGRESS))
266 {
267 lastError = std::error_code (errno, std::system_category ());
268 return -1;
269 }
270 }
271
// Grow both tables together so that the descriptor is a valid index in each of them.
272 if (JOIN_UNLIKELY (static_cast<size_t> (op->fd ()) >= _readOps.size ()))
273 {
274 size_t newSize = static_cast<size_t> (op->fd ()) + 1;
275 _readOps.resize (newSize, nullptr);
276 _writeOps.resize (newSize, nullptr);
277 }
278
279 bool isWrite = isWriteOp (op->code);
280
// At most one pending operation per descriptor and per direction.
// NOTE(review): for a Connect operation, ::connect () above has already been issued when this
// check rejects the submission.
281 if (JOIN_UNLIKELY ((isWrite && (_writeOps[op->fd ()] != nullptr)) ||
282 (!isWrite && (_readOps[op->fd ()] != nullptr))))
283 {
285 return -1;
286 }
287
288 if (isWrite)
289 {
290 _writeOps[op->fd ()] = op;
291 }
292 else
293 {
294 _readOps[op->fd ()] = op;
295 }
296
298
// Ask the reactor for exactly the directions that currently have a pending operation.
// NOTE(review): the slot stays populated even if addHandler () reports a failure - confirm intent.
299 return _reactor.addHandler (op->fd (), this, _readOps[op->fd ()] != nullptr, _writeOps[op->fd ()] != nullptr);
300}
301
302// =========================================================================
303// CLASS : BasicProactor
304// METHOD : cancelOperation
305// =========================================================================
// Remove `op` from its descriptor slot, update or remove the reactor registration and dispatch the
// operation with -ECANCELED. Returns -1 when a check below rejects the request, otherwise the value
// returned by the reactor call. `flush` is not used by this implementation.
// NOTE(review): listing lines 310, 320, 322, 332 and 337-338 are missing from this extract; in
// particular the condition guarding the block that opens at listing line 321 is not visible.
306inline int join::BasicProactor::cancelOperation (IoOperation* op, [[maybe_unused]] bool flush) noexcept
307{
308 if (JOIN_UNLIKELY (op == nullptr))
309 {
311 return -1;
312 }
313
314 if (JOIN_UNLIKELY (op->fd () < 0))
315 {
316 lastError = std::make_error_code (std::errc::bad_file_descriptor);
317 return -1;
318 }
319
321 {
323 return -1;
324 }
325
// A descriptor beyond the table size cannot have a registered operation.
326 if (JOIN_UNLIKELY (static_cast<size_t> (op->fd ()) >= _readOps.size ()))
327 {
328 lastError = std::make_error_code (std::errc::bad_file_descriptor);
329 return -1;
330 }
331
333 bool isWrite = isWriteOp (op->code);
334
// Only the operation actually stored in the slot may be cancelled.
335 if (JOIN_UNLIKELY ((isWrite && (_writeOps[op->fd ()] != op)) || (!isWrite && (_readOps[op->fd ()] != op))))
336 {
339 return -1;
340 }
341
342 if (isWrite)
343 {
344 _writeOps[op->fd ()] = nullptr;
345 }
346 else
347 {
348 _readOps[op->fd ()] = nullptr;
349 }
350
351 int ret = 0;
352
// Drop the handler when no direction is pending any more, otherwise re-register with the
// remaining direction only.
353 if (_readOps[op->fd ()] == nullptr && _writeOps[op->fd ()] == nullptr)
354 {
355 ret = _reactor.delHandler (op->fd ());
356 }
357 else
358 {
359 ret = _reactor.addHandler (op->fd (), this, _readOps[op->fd ()] != nullptr, _writeOps[op->fd ()] != nullptr);
360 }
361
// The operation is dispatched as cancelled regardless of the reactor call's result.
362 dispatchOperation (op, -ECANCELED, true);
363
364 return ret;
365}
366
367// =========================================================================
368// CLASS : BasicProactor
369// METHOD : cancelAllOperations
370// =========================================================================
371inline void join::BasicProactor::cancelAllOperations () noexcept
372{
373 for (size_t fd = 0; fd < _readOps.size (); ++fd)
374 {
375 IoOperation* rOp = std::exchange (_readOps[fd], nullptr);
376 IoOperation* wOp = std::exchange (_writeOps[fd], nullptr);
377 if (rOp || wOp)
378 {
379 _reactor.delHandler (fd);
380 }
381 dispatchOperation (rOp, -ECANCELED, true);
382 dispatchOperation (wOp, -ECANCELED, true);
383 }
384}
385
386// =========================================================================
387// CLASS : BasicProactor
388// METHOD : endOperation
389// =========================================================================
390inline void join::BasicProactor::endOperation (IoOperation* op, int result, bool cancelled) noexcept
391{
392 if (JOIN_UNLIKELY (op == nullptr))
393 {
394 return; // LCOV_EXCL_LINE
395 }
396
397 int fd = op->fd ();
398
399 if (JOIN_UNLIKELY (fd < 0 || static_cast<size_t> (fd) >= _readOps.size ()))
400 {
401 return; // LCOV_EXCL_LINE
402 }
403
404 if (isWriteOp (op->code))
405 {
406 _writeOps[fd] = nullptr;
407 }
408 else
409 {
410 _readOps[fd] = nullptr;
411 }
412
413 if (_readOps[fd] == nullptr && _writeOps[fd] == nullptr)
414 {
415 _reactor.delHandler (fd);
416 }
417 else
418 {
419 _reactor.addHandler (fd, this, _readOps[fd] != nullptr, _writeOps[fd] != nullptr);
420 }
421
422 dispatchOperation (op, result, cancelled);
423}
424
425// =========================================================================
426// CLASS : BasicProactor
427// METHOD : isWriteOp
428// =========================================================================
429inline bool join::BasicProactor::isWriteOp (uint8_t code) noexcept
430{
431 return code == static_cast<uint8_t> (IoOperation::Opcode::Connect) ||
432 code == static_cast<uint8_t> (IoOperation::Opcode::Write) ||
433 code == static_cast<uint8_t> (IoOperation::Opcode::WriteFixed) ||
434 code == static_cast<uint8_t> (IoOperation::Opcode::SendMsg) ||
435 code == static_cast<uint8_t> (IoOperation::Opcode::Send);
436}
437
438// =========================================================================
439// CLASS : BasicProactor
440// METHOD : executeOp
441// =========================================================================
// Perform the system call that corresponds to op->code and return its result: a non-negative value
// on success, -errno on failure, -EINVAL for an unhandled code. Calls that can fail with EINTR are
// retried through the enclosing for (;;) loop.
// `op` is dereferenced without a null check.
// NOTE(review): the `case` labels (listing lines 448, 459, 470-471, 481-482, 492, 502, 512 and 523)
// are missing from this extract; the branches below are described by the call they make.
442inline int join::BasicProactor::executeOp (IoOperation* op) noexcept
443{
444 for (;;)
445 {
446 switch (static_cast<IoOperation::Opcode> (op->code))
447 {
// accept4 () branch: returns the accepted descriptor.
449 {
450 int fd = ::accept4 (op->data.accept.fd, op->data.accept.addr, op->data.accept.addrlen,
451 op->data.accept.flags);
452 if (JOIN_UNLIKELY ((fd == -1) && (errno == EINTR)))
453 {
454 continue; // LCOV_EXCL_LINE
455 }
456 return (fd == -1) ? -errno : fd;
457 }
458
// Connect completion branch: reads the socket's SO_ERROR and returns it negated, or 0 when clear.
460 {
461 int err = 0;
462 socklen_t len = sizeof (err);
463 if (JOIN_UNLIKELY (::getsockopt (op->data.connect.fd, SOL_SOCKET, SO_ERROR, &err, &len) == -1))
464 {
465 return -errno;
466 }
467 return JOIN_UNLIKELY (err) ? -err : 0;
468 }
469
// read () branch. NOTE(review): the ssize_t byte count is narrowed to int here and in the
// branches below.
472 {
473 ssize_t n = ::read (op->data.rw.fd, op->data.rw.buf, op->data.rw.len);
474 if (JOIN_UNLIKELY ((n == -1) && (errno == EINTR)))
475 {
476 continue; // LCOV_EXCL_LINE
477 }
478 return (n == -1) ? -errno : static_cast<int> (n);
479 }
480
// write () branch.
483 {
484 ssize_t n = ::write (op->data.rw.fd, op->data.rw.buf, op->data.rw.len);
485 if (JOIN_UNLIKELY ((n == -1) && (errno == EINTR)))
486 {
487 continue; // LCOV_EXCL_LINE
488 }
489 return (n == -1) ? -errno : static_cast<int> (n);
490 }
491
// recvmsg () branch.
493 {
494 ssize_t n = ::recvmsg (op->data.msg.fd, op->data.msg.msg, op->data.msg.flags);
495 if (JOIN_UNLIKELY ((n == -1) && (errno == EINTR)))
496 {
497 continue; // LCOV_EXCL_LINE
498 }
499 return (n == -1) ? -errno : static_cast<int> (n);
500 }
501
// sendmsg () branch.
503 {
504 ssize_t n = ::sendmsg (op->data.msg.fd, op->data.msg.msg, op->data.msg.flags);
505 if (JOIN_UNLIKELY ((n == -1) && (errno == EINTR)))
506 {
507 continue; // LCOV_EXCL_LINE
508 }
509 return (n == -1) ? -errno : static_cast<int> (n);
510 }
511
// recv () branch.
513 {
514 ssize_t n =
515 ::recv (op->data.stream.fd, op->data.stream.buf, op->data.stream.len, op->data.stream.flags);
516 if (JOIN_UNLIKELY ((n == -1) && (errno == EINTR)))
517 {
518 continue; // LCOV_EXCL_LINE
519 }
520 return (n == -1) ? -errno : static_cast<int> (n);
521 }
522
// send () branch.
524 {
525 ssize_t n =
526 ::send (op->data.stream.fd, op->data.stream.buf, op->data.stream.len, op->data.stream.flags);
527 if (JOIN_UNLIKELY ((n == -1) && (errno == EINTR)))
528 {
529 continue; // LCOV_EXCL_LINE
530 }
531 return (n == -1) ? -errno : static_cast<int> (n);
532 }
533
// Any code without a branch above is rejected.
534 default:
535 return -EINVAL;
536 }
537 }
538}
539
540// =========================================================================
541// CLASS : BasicProactor
542// METHOD : onReadable
543// =========================================================================
544inline void join::BasicProactor::onReadable (int fd) noexcept
545{
546 if (JOIN_UNLIKELY (fd == _wakeup))
547 {
548 readCommands ();
549 return;
550 }
551
552 endOperation (_readOps[fd], executeOp (_readOps[fd]), false);
553}
554
555// =========================================================================
556// CLASS : BasicProactor
557// METHOD : onWriteable
558// =========================================================================
559inline void join::BasicProactor::onWriteable (int fd) noexcept
560{
561 endOperation (_writeOps[fd], executeOp (_writeOps[fd]), false);
562}
563
564// =========================================================================
565// CLASS : BasicProactor
566// METHOD : onClose
567// =========================================================================
568inline void join::BasicProactor::onClose (int fd) noexcept
569{
570 IoOperation* rOp = std::exchange (_readOps[fd], nullptr);
571 IoOperation* wOp = std::exchange (_writeOps[fd], nullptr);
572 if (JOIN_LIKELY (rOp || wOp))
573 {
574 _reactor.delHandler (fd);
575 }
576 dispatchOperation (rOp, 0, false);
577 dispatchOperation (wOp, 0, false);
578}
579
580// =========================================================================
581// CLASS : BasicProactor
582// METHOD : onError
583// =========================================================================
584inline void join::BasicProactor::onError (int fd) noexcept
585{
586 IoOperation* rOp = std::exchange (_readOps[fd], nullptr);
587 IoOperation* wOp = std::exchange (_writeOps[fd], nullptr);
588 if (JOIN_LIKELY (rOp || wOp))
589 {
590 _reactor.delHandler (fd);
591 }
592 dispatchOperation (rOp, -ECONNRESET, false);
593 dispatchOperation (wOp, -ECONNRESET, false);
594}
adaptive backoff strategy for busy-wait loops.
Definition backoff.hpp:45
int mlock() const noexcept
lock proactor command queue memory in RAM.
Definition proactor_epoll_impl.hpp:132
bool isRunning() const noexcept
check if the event loop is running.
Definition proactor_epoll_impl.hpp:146
void run()
run the event loop (blocking).
Definition proactor_epoll_impl.hpp:58
~BasicProactor() noexcept
destroy instance.
Definition proactor_epoll_impl.hpp:47
void stop(bool sync=true) noexcept
stop the event loop.
Definition proactor_epoll_impl.hpp:67
bool isProactorThread() const noexcept
check if the calling thread is the proactor thread.
Definition proactor_epoll_impl.hpp:155
BasicProactor()
initialize the proactor and its I/O backend.
Definition proactor_epoll_impl.hpp:29
int addHandler(int fd, EventHandler *handler, bool wantRead=true, bool wantWrite=false, bool sync=true) noexcept
add handler to reactor.
Definition reactor.cpp:100
std::error_code make_error_code(join::Errc code) noexcept
Create an std::error_code object.
Definition error.cpp:150
thread_local std::error_code lastError
last error.
Definition error.cpp:32
Opcode
operation code.
Definition io_operation.hpp:56
#define JOIN_LIKELY(x)
Definition utils.hpp:46
#define JOIN_UNLIKELY(x)
Definition utils.hpp:47