join 1.0
lightweight network framework library
Loading...
Searching...
No Matches
proactor_uring_impl.hpp
Go to the documentation of this file.
1
25// =========================================================================
26// CLASS : BasicProactor
27// METHOD : BasicProactor
28// =========================================================================
29template <typename Policy>
31: _commands (_queueSize)
32, _wakeup (initWakeup (is_default<Policy>{}))
33{
34 static_assert (has_spin<Policy>::value || !has_sqpoll<Policy>::value, "spin required for sq poll policy");
35
36 io_uring_params params{};
37 params.flags = Policy::flags;
38 initCqEntries (params, has_cq_entries<Policy>{});
39 initSqThreadIdle (params, has_sq_thread_idle<Policy>{});
40 initSqThreadCpu (params, has_sq_thread_cpu<Policy>{});
41
42 if (io_uring_queue_init_params (Policy::sqEntries, &_ring, &params) < 0)
43 {
44 // LCOV_EXCL_START
45 ::close (_wakeup);
46 throw std::system_error (errno, std::system_category (), "io_uring_queue_init failed");
47 // LCOV_EXCL_STOP
48 }
49
50 initWakeupOp (is_default<Policy>{});
51 _pendingOps.reserve (Policy::sqEntries);
52}
53
54// =========================================================================
55// CLASS : BasicProactor
56// METHOD : ~BasicProactor
57// =========================================================================
58template <typename Policy>
60{
61 stop (true);
62
63 io_uring_queue_exit (&_ring);
64
65 if (_wakeup != -1)
66 {
67 ::close (_wakeup);
68 }
69}
70
71// =========================================================================
72// CLASS : BasicProactor
73// METHOD : flush
74// =========================================================================
75template <typename Policy>
76int join::BasicProactor<Policy>::flush (bool sync) noexcept
77{
78 if (isProactorThread ())
79 {
80 if (JOIN_UNLIKELY (io_uring_submit (&_ring) < 0))
81 {
82 // LCOV_EXCL_START
83 lastError = std::error_code (errno, std::system_category ());
84 return -1;
85 // LCOV_EXCL_STOP
86 }
87 return 0;
88 }
89
90 std::atomic<bool> done{false}, *pdone = nullptr;
91 std::error_code errc, *perrc = nullptr;
92
93 if (JOIN_UNLIKELY (sync))
94 {
95 pdone = &done;
96 perrc = &errc;
97 }
98
99 if (JOIN_UNLIKELY (writeCommand ({CommandType::Flush, nullptr, false, pdone, perrc}) == -1))
100 {
101 return -1; // LCOV_EXCL_LINE
102 }
103
104 if (JOIN_UNLIKELY (sync))
105 {
106 Backoff backoff;
107 while (!done.load (std::memory_order_acquire))
108 {
109 backoff ();
110 }
111
112 if (JOIN_UNLIKELY (errc))
113 {
114 lastError = errc;
115 return -1;
116 }
117 }
118
119 return 0;
120}
121
122// =========================================================================
123// CLASS : BasicProactor
124// METHOD : run
125// =========================================================================
126template <typename Policy>
128{
129 _threadId.store (pthread_self (), std::memory_order_release);
130
131 _running.store (true, std::memory_order_release);
132 eventLoop ();
133
134 _threadId.store (_invalidThreadId, std::memory_order_release);
135}
136
137// =========================================================================
138// CLASS : BasicProactor
139// METHOD : stop
140// =========================================================================
141template <typename Policy>
142void join::BasicProactor<Policy>::stop (bool sync) noexcept
143{
144 if (isProactorThread ())
145 {
146 _running.store (false, std::memory_order_release);
147 cancelAllOperations ();
148 eventLoop ();
149 return;
150 }
151
152 if (!isRunning ())
153 {
154 return;
155 }
156
157 if (JOIN_LIKELY (sync))
158 {
159 bool expected = false;
160 if (!_stopping.compare_exchange_strong (expected, true, std::memory_order_acq_rel))
161 {
162 Backoff backoff;
163 while (_threadId.load (std::memory_order_acquire) != _invalidThreadId)
164 {
165 backoff ();
166 }
167 return;
168 }
169 }
170
171 writeCommand ({CommandType::Stop, nullptr, sync, nullptr, nullptr});
172
173 if (JOIN_LIKELY (sync))
174 {
175 Backoff backoff;
176 while (_threadId.load (std::memory_order_acquire) != _invalidThreadId)
177 {
178 backoff ();
179 }
180 _stopping.store (false, std::memory_order_release);
181 }
182}
183
184// =========================================================================
185// CLASS : BasicProactor
186// METHOD : registerBuffers
187// =========================================================================
188template <typename Policy>
189int join::BasicProactor<Policy>::registerBuffers (const std::vector<iovec>& iovecs) noexcept
190{
191 int ret = io_uring_register_buffers (&_ring, iovecs.data (), iovecs.size ());
192 if (JOIN_UNLIKELY (ret < 0))
193 {
194 lastError = std::error_code (-ret, std::system_category ());
195 return -1;
196 }
197
198 return 0;
199}
200
201// =========================================================================
202// CLASS : BasicProactor
203// METHOD : unregisterBuffers
204// =========================================================================
205template <typename Policy>
207{
208 int ret = io_uring_unregister_buffers (&_ring);
209 if (JOIN_UNLIKELY (ret < 0))
210 {
211 lastError = std::error_code (-ret, std::system_category ());
212 return -1;
213 }
214
215 return 0;
216}
217
218#ifdef JOIN_HAS_NUMA
219// =========================================================================
220// CLASS : BasicProactor
221// METHOD : mbind
222// =========================================================================
223template <typename Policy>
224int join::BasicProactor<Policy>::mbind (int numa) const noexcept
225{
226 return _commands.mbind (numa);
227}
228#endif
229
230// =========================================================================
231// CLASS : BasicProactor
232// METHOD : mlock
233// =========================================================================
234template <typename Policy>
235int join::BasicProactor<Policy>::mlock () const noexcept
236{
237 return _commands.mlock ();
238}
239
240// =========================================================================
241// CLASS : BasicProactor
242// METHOD : isRunning
243// =========================================================================
244template <typename Policy>
245bool join::BasicProactor<Policy>::isRunning () const noexcept
246{
247 return _running.load (std::memory_order_acquire);
248}
249
250// =========================================================================
251// CLASS : BasicProactor
252// METHOD : isProactorThread
253// =========================================================================
254template <typename Policy>
256{
257 return _threadId.load (std::memory_order_acquire) == pthread_self ();
258}
259
260// =========================================================================
261// CLASS : BasicProactor
262// METHOD : initWakeup
263// =========================================================================
264template <typename Policy>
265int join::BasicProactor<Policy>::initWakeup (std::true_type) noexcept
266{
267 return eventfd (0, EFD_NONBLOCK | EFD_CLOEXEC);
268}
269
270// =========================================================================
271// CLASS : BasicProactor
272// METHOD : initWakeup
273// =========================================================================
274template <typename Policy>
275int join::BasicProactor<Policy>::initWakeup (std::false_type) noexcept
276{
277 return -1;
278}
279
280// =========================================================================
281// CLASS : BasicProactor
282// METHOD : initWakeupOp
283// =========================================================================
284template <typename Policy>
285void join::BasicProactor<Policy>::initWakeupOp (std::true_type) noexcept
286{
287 _wakeupOp = IoOperation::makeRead (_wakeup, &_wakeupBuf, sizeof (_wakeupBuf), nullptr);
288}
289
290// =========================================================================
291// CLASS : BasicProactor
292// METHOD : initWakeupOp
293// =========================================================================
294template <typename Policy>
295void join::BasicProactor<Policy>::initWakeupOp (std::false_type) noexcept
296{
297 // no-op.
298}
299
300// =========================================================================
301// CLASS : BasicProactor
302// METHOD : initCqEntries
303// =========================================================================
304template <typename Policy>
305void join::BasicProactor<Policy>::initCqEntries (io_uring_params&, std::false_type) noexcept
306{
307 // no-op.
308}
309
310// =========================================================================
311// CLASS : BasicProactor
312// METHOD : initCqEntries
313// =========================================================================
314template <typename Policy>
315void join::BasicProactor<Policy>::initCqEntries (io_uring_params& params, std::true_type) noexcept
316{
317 params.flags |= IORING_SETUP_CQSIZE;
318 params.cq_entries = Policy::cqEntries;
319}
320
321// =========================================================================
322// CLASS : BasicProactor
323// METHOD : initSqThreadIdle
324// =========================================================================
325template <typename Policy>
326void join::BasicProactor<Policy>::initSqThreadIdle (io_uring_params&, std::false_type) noexcept
327{
328 // no-op.
329}
330
331// =========================================================================
332// CLASS : BasicProactor
333// METHOD : initSqThreadIdle
334// =========================================================================
335template <typename Policy>
336void join::BasicProactor<Policy>::initSqThreadIdle (io_uring_params& params, std::true_type) noexcept
337{
338 params.sq_thread_idle = Policy::sqThreadIdle;
339}
340
341// =========================================================================
342// CLASS : BasicProactor
343// METHOD : initSqThreadCpu
344// =========================================================================
345template <typename Policy>
346void join::BasicProactor<Policy>::initSqThreadCpu (io_uring_params&, std::false_type) noexcept
347{
348 // no-op.
349}
350
351// =========================================================================
352// CLASS : BasicProactor
353// METHOD : initSqThreadCpu
354// =========================================================================
355template <typename Policy>
356void join::BasicProactor<Policy>::initSqThreadCpu (io_uring_params& params, std::true_type) noexcept
357{
358 params.sq_thread_cpu = Policy::sqThreadCpu;
359}
360
361// =========================================================================
362// CLASS : BasicProactor
363// METHOD : writeCommand
364// =========================================================================
365template <typename Policy>
366int join::BasicProactor<Policy>::writeCommand (const Command& cmd) noexcept
367{
368 return writeCommand (cmd, is_default<Policy>{});
369}
370
371// =========================================================================
372// CLASS : BasicProactor
373// METHOD : writeCommand
374// =========================================================================
375template <typename Policy>
376int join::BasicProactor<Policy>::writeCommand (const Command& cmd, std::true_type) noexcept
377{
378 if (JOIN_UNLIKELY (_commands.push (cmd) == -1))
379 {
380 return -1; // LCOV_EXCL_LINE
381 }
382
383 uint64_t value = 1;
384 if (JOIN_UNLIKELY (::write (_wakeup, &value, sizeof (uint64_t)) == -1))
385 {
386 // LCOV_EXCL_START
387 lastError = std::error_code (errno, std::system_category ());
388 return -1;
389 // LCOV_EXCL_STOP
390 }
391
392 return 0;
393}
394
395// =========================================================================
396// CLASS : BasicProactor
397// METHOD : writeCommand
398// =========================================================================
399template <typename Policy>
400int join::BasicProactor<Policy>::writeCommand (const Command& cmd, std::false_type) noexcept
401{
402 return _commands.push (cmd);
403}
404
405// =========================================================================
406// CLASS : BasicProactor
407// METHOD : readCommands
408// =========================================================================
409template <typename Policy>
411{
412 Command cmd;
413 while (_commands.tryPop (cmd) == 0)
414 {
415 processCommand (cmd);
416 }
417}
418
419// =========================================================================
420// CLASS : BasicProactor
421// METHOD : processCommand
422// =========================================================================
423template <typename Policy>
424void join::BasicProactor<Policy>::processCommand (const Command& cmd) noexcept
425{
426 int err = 0;
427
428 switch (cmd.type)
429 {
430 case CommandType::Submit:
431 err = submitOperation (cmd.op, cmd.flush);
432 break;
433
434 case CommandType::Cancel:
435 err = cancelOperation (cmd.op, cmd.flush);
436 break;
437
438 case CommandType::Stop:
439 _running.store (false, std::memory_order_release);
440 cancelAllOperations ();
441 if (JOIN_UNLIKELY (cmd.flush))
442 {
443 io_uring_submit (&_ring);
444 }
445 break;
446
447 case CommandType::Flush:
448 if (JOIN_UNLIKELY (io_uring_submit (&_ring) < 0))
449 {
450 // LCOV_EXCL_START
451 lastError = std::error_code (errno, std::system_category ());
452 err = -1;
453 // LCOV_EXCL_STOP
454 }
455 break;
456
457 default:
458 break;
459 }
460
461 if (JOIN_UNLIKELY (cmd.done))
462 {
463 if (cmd.errc && (err != 0))
464 {
465 *cmd.errc = lastError;
466 }
467 cmd.done->store (true, std::memory_order_release);
468 }
469}
470
471// =========================================================================
472// CLASS : BasicProactor
473// METHOD : submitOperation
474// =========================================================================
475template <typename Policy>
476int join::BasicProactor<Policy>::submitOperation (IoOperation* op, bool flush) noexcept
477{
478 if (JOIN_UNLIKELY (op == nullptr))
479 {
480 lastError = make_error_code (Errc::InvalidParam);
481 return -1;
482 }
483
484 if (JOIN_UNLIKELY (op->fd () < 0))
485 {
486 lastError = std::make_error_code (std::errc::bad_file_descriptor);
487 return -1;
488 }
489
490 if (JOIN_UNLIKELY (op->state != IoOperation::State::Idle))
491 {
492 lastError = make_error_code (Errc::OperationFailed);
493 return -1;
494 }
495
496 io_uring_sqe* sqe = getSqe ();
497 if (JOIN_UNLIKELY (sqe == nullptr))
498 {
499 // LCOV_EXCL_START
500 lastError = make_error_code (Errc::OperationFailed);
501 return -1;
502 // LCOV_EXCL_STOP
503 }
504
505 prepareSqe (sqe, op);
506 op->state = IoOperation::State::Submitted;
507 op->index = static_cast<uint32_t> (_pendingOps.size ());
508 _pendingOps.push_back (op);
509
510 if (JOIN_UNLIKELY (flush))
511 {
512 io_uring_submit (&_ring);
513 }
514
515 return 0;
516}
517
518// =========================================================================
519// CLASS : BasicProactor
520// METHOD : cancelOperation
521// =========================================================================
522template <typename Policy>
523int join::BasicProactor<Policy>::cancelOperation (IoOperation* op, bool flush) noexcept
524{
525 if (JOIN_UNLIKELY (op == nullptr))
526 {
527 lastError = make_error_code (Errc::InvalidParam);
528 return -1;
529 }
530
531 if (JOIN_UNLIKELY (op->fd () < 0))
532 {
533 lastError = std::make_error_code (std::errc::bad_file_descriptor);
534 return -1;
535 }
536
537 if (JOIN_UNLIKELY (op->state != IoOperation::State::Submitted))
538 {
539 lastError = make_error_code (Errc::OperationFailed);
540 return -1;
541 }
542
543 if (JOIN_UNLIKELY (op->index >= _pendingOps.size () || _pendingOps[op->index] != op))
544 {
545 lastError = make_error_code (Errc::InvalidParam);
546 return -1;
547 }
548
549 io_uring_sqe* sqe = getSqe ();
550 if (JOIN_UNLIKELY (sqe == nullptr))
551 {
552 // LCOV_EXCL_START
553 lastError = make_error_code (Errc::OperationFailed);
554 return -1;
555 // LCOV_EXCL_STOP
556 }
557
558 op->state = IoOperation::State::Cancelling;
559 io_uring_prep_cancel (sqe, op, 0);
560 io_uring_sqe_set_data (sqe, nullptr);
561
562 if (JOIN_UNLIKELY (flush))
563 {
564 io_uring_submit (&_ring);
565 }
566
567 return 0;
568}
569
570// =========================================================================
571// CLASS : BasicProactor
572// METHOD : cancelAllOperations
573// =========================================================================
574template <typename Policy>
576{
577 for (IoOperation* op : _pendingOps)
578 {
579 cancelOperation (op, false);
580 }
581}
582
583// =========================================================================
584// CLASS : BasicProactor
585// METHOD : endOperation
586// =========================================================================
587template <typename Policy>
588void join::BasicProactor<Policy>::endOperation (IoOperation* op, int result, bool cancelled) noexcept
589{
590 if (JOIN_UNLIKELY (op == nullptr))
591 {
592 return; // LCOV_EXCL_LINE
593 }
594
595 if (JOIN_LIKELY (op->index < _pendingOps.size () && _pendingOps[op->index] == op))
596 {
597 IoOperation* last = _pendingOps.back ();
598 _pendingOps[op->index] = last;
599 last->index = op->index;
600 _pendingOps.pop_back ();
601 }
602
603 dispatchOperation (op, result, cancelled);
604}
605
606// =========================================================================
607// CLASS : BasicProactor
608// METHOD : getSqe
609// =========================================================================
610template <typename Policy>
611io_uring_sqe* join::BasicProactor<Policy>::getSqe () noexcept
612{
613 io_uring_sqe* sqe = io_uring_get_sqe (&_ring);
614 if (JOIN_UNLIKELY (sqe == nullptr))
615 {
616 io_uring_submit (&_ring);
617 sqe = io_uring_get_sqe (&_ring);
618 }
619
620 return sqe;
621}
622
623// =========================================================================
624// CLASS : BasicProactor
625// METHOD : prepareSqe
626// =========================================================================
627template <typename Policy>
628void join::BasicProactor<Policy>::prepareSqe (io_uring_sqe* sqe, IoOperation* op) noexcept
629{
630 switch (static_cast<IoOperation::Opcode> (op->code))
631 {
632 case IoOperation::Opcode::Accept:
633 io_uring_prep_accept (sqe, op->data.accept.fd, op->data.accept.addr, op->data.accept.addrlen,
634 op->data.accept.flags);
635 break;
636
637 case IoOperation::Opcode::Connect:
638 io_uring_prep_connect (sqe, op->data.connect.fd, op->data.connect.addr, op->data.connect.addrlen);
639 break;
640
641 case IoOperation::Opcode::Read:
642 io_uring_prep_read (sqe, op->data.rw.fd, op->data.rw.buf, op->data.rw.len, 0);
643 break;
644
645 case IoOperation::Opcode::Write:
646 io_uring_prep_write (sqe, op->data.rw.fd, op->data.rw.buf, op->data.rw.len, 0);
647 break;
648
649 case IoOperation::Opcode::ReadFixed:
650 io_uring_prep_read_fixed (sqe, op->data.rw.fd, op->data.rw.buf, op->data.rw.len, 0, op->data.rw.index);
651 break;
652
653 case IoOperation::Opcode::WriteFixed:
654 io_uring_prep_write_fixed (sqe, op->data.rw.fd, op->data.rw.buf, op->data.rw.len, 0, op->data.rw.index);
655 break;
656
657 case IoOperation::Opcode::RecvMsg:
658 io_uring_prep_recvmsg (sqe, op->data.msg.fd, op->data.msg.msg, op->data.msg.flags);
659 break;
660
661 case IoOperation::Opcode::SendMsg:
662 io_uring_prep_sendmsg (sqe, op->data.msg.fd, op->data.msg.msg, op->data.msg.flags);
663 break;
664
665 case IoOperation::Opcode::Recv:
666 io_uring_prep_recv (sqe, op->data.stream.fd, op->data.stream.buf, op->data.stream.len,
667 op->data.stream.flags);
668 break;
669
670 case IoOperation::Opcode::Send:
671 io_uring_prep_send (sqe, op->data.stream.fd, op->data.stream.buf, op->data.stream.len,
672 op->data.stream.flags);
673 break;
674
675 default:
676 io_uring_prep_nop (sqe);
677 }
678
679 io_uring_sqe_set_data (sqe, op);
680
681 if (JOIN_UNLIKELY (op->linked))
682 {
683 sqe->flags |= IOSQE_IO_LINK;
684 }
685}
686
687// =========================================================================
688// CLASS : BasicProactor
689// METHOD : rearmWakeup
690// =========================================================================
691template <typename Policy>
693{
694 io_uring_sqe* sqe = getSqe ();
695
696 if (JOIN_LIKELY (sqe != nullptr))
697 {
698 prepareSqe (sqe, &_wakeupOp);
699 _wakeupOp.state = IoOperation::State::Submitted;
700 io_uring_submit (&_ring);
701 }
702}
703
704// =========================================================================
705// CLASS : BasicProactor
706// METHOD : dispatchCqe
707// =========================================================================
708template <typename Policy>
709void join::BasicProactor<Policy>::dispatchCqe (io_uring_cqe* cqe) noexcept
710{
711 dispatchCqe (cqe, is_default<Policy>{});
712}
713
714// =========================================================================
715// CLASS : BasicProactor
716// METHOD : dispatchCqe
717// =========================================================================
718template <typename Policy>
719void join::BasicProactor<Policy>::dispatchCqe (io_uring_cqe* cqe, std::true_type) noexcept
720{
721 IoOperation* op = static_cast<IoOperation*> (io_uring_cqe_get_data (cqe));
722 if (JOIN_UNLIKELY (op == nullptr))
723 {
724 return;
725 }
726
727 if (JOIN_UNLIKELY (op == &_wakeupOp))
728 {
729 _wakeupOp.state = IoOperation::State::Idle;
730 readCommands ();
731 if (JOIN_LIKELY (_running.load (std::memory_order_acquire)))
732 {
733 rearmWakeup ();
734 }
735 return;
736 }
737
738 if (JOIN_UNLIKELY (op->state == IoOperation::State::Idle))
739 {
740 return; // LCOV_EXCL_LINE
741 }
742
743 int result = cqe->res;
744 bool cancelled = (result < 0) && (result == -ECANCELED || op->state == IoOperation::State::Cancelling);
745 endOperation (op, result, cancelled);
746}
747
748// =========================================================================
749// CLASS : BasicProactor
750// METHOD : dispatchCqe
751// =========================================================================
752template <typename Policy>
753void join::BasicProactor<Policy>::dispatchCqe (io_uring_cqe* cqe, std::false_type) noexcept
754{
755 IoOperation* op = static_cast<IoOperation*> (io_uring_cqe_get_data (cqe));
756 if (JOIN_UNLIKELY (op == nullptr))
757 {
758 return;
759 }
760
761 if (JOIN_UNLIKELY (op->state == IoOperation::State::Idle))
762 {
763 return; // LCOV_EXCL_LINE
764 }
765
766 int result = cqe->res;
767 bool cancelled = (result < 0) && (result == -ECANCELED || op->state == IoOperation::State::Cancelling);
768 endOperation (op, result, cancelled);
769}
770
771// =========================================================================
772// CLASS : BasicProactor
773// METHOD : eventLoop
774// =========================================================================
775template <typename Policy>
777{
778 eventLoop (has_spin<Policy>{}, has_sqpoll<Policy>{});
779}
780
781// =========================================================================
782// CLASS : BasicProactor
783// METHOD : eventLoop
784// =========================================================================
785template <typename Policy>
786void join::BasicProactor<Policy>::eventLoop (std::false_type, std::false_type) noexcept
787{
788 if (JOIN_LIKELY (_running.load (std::memory_order_acquire)))
789 {
790 rearmWakeup ();
791 }
792
793 while (_running.load (std::memory_order_acquire) || !_pendingOps.empty ())
794 {
795 io_uring_cqe* cqe = nullptr;
796
797 if (JOIN_LIKELY (_running.load (std::memory_order_acquire)))
798 {
799 if (JOIN_UNLIKELY (io_uring_wait_cqe (&_ring, &cqe) < 0))
800 {
801 continue; // LCOV_EXCL_LINE
802 }
803 }
804 else
805 {
806 // LCOV_EXCL_START
807 io_uring_submit (&_ring);
808 if (JOIN_UNLIKELY (io_uring_peek_cqe (&_ring, &cqe) != 0))
809 {
810 continue;
811 }
812 // LCOV_EXCL_STOP
813 }
814
815 do
816 {
817 dispatchCqe (cqe);
818 io_uring_cqe_seen (&_ring, cqe);
819 }
820 while (io_uring_peek_cqe (&_ring, &cqe) == 0);
821 }
822}
823
824// =========================================================================
825// CLASS : BasicProactor
826// METHOD : eventLoop
827// =========================================================================
828template <typename Policy>
829void join::BasicProactor<Policy>::eventLoop (std::true_type, std::false_type) noexcept
830{
831 Backoff backoff (Policy::spin);
832 bool running;
833
834 while ((running = _running.load (std::memory_order_acquire)) || !_pendingOps.empty ())
835 {
836 if (JOIN_LIKELY (running))
837 {
838 readCommands ();
839 }
840 else
841 {
842 io_uring_submit (&_ring); // LCOV_EXCL_LINE
843 }
844
845 io_uring_cqe* cqe = nullptr;
846 if (JOIN_UNLIKELY (io_uring_peek_cqe (&_ring, &cqe) != 0))
847 {
848 backoff ();
849 continue;
850 }
851
852 do
853 {
854 dispatchCqe (cqe);
855 io_uring_cqe_seen (&_ring, cqe);
856 }
857 while (io_uring_peek_cqe (&_ring, &cqe) == 0);
858
859 backoff.reset ();
860 }
861}
862
863// =========================================================================
864// CLASS : BasicProactor
865// METHOD : eventLoop
866// =========================================================================
867template <typename Policy>
868void join::BasicProactor<Policy>::eventLoop (std::true_type, std::true_type) noexcept
869{
870 Backoff backoff (Policy::spin);
871 bool running;
872
873 while ((running = _running.load (std::memory_order_acquire)) || !_pendingOps.empty ())
874 {
875 if (JOIN_LIKELY (running))
876 {
877 readCommands ();
878
879 if (JOIN_UNLIKELY (IO_URING_READ_ONCE (*_ring.sq.kflags) & IORING_SQ_NEED_WAKEUP))
880 {
881 io_uring_enter (_ring.ring_fd, 0, 0, IORING_ENTER_SQ_WAKEUP, nullptr);
882 }
883 }
884 else
885 {
886 io_uring_submit (&_ring); // LCOV_EXCL_LINE
887 }
888
889 io_uring_cqe* cqe = nullptr;
890 if (JOIN_UNLIKELY (io_uring_peek_cqe (&_ring, &cqe) != 0))
891 {
892 backoff ();
893 continue;
894 }
895
896 do
897 {
898 dispatchCqe (cqe);
899 io_uring_cqe_seen (&_ring, cqe);
900 }
901 while (io_uring_peek_cqe (&_ring, &cqe) == 0);
902
903 backoff.reset ();
904 }
905}
basic proactor class.
Definition proactor.hpp:152
int mlock() const noexcept
lock proactor command queue memory in RAM.
Definition proactor_epoll_impl.hpp:132
bool isRunning() const noexcept
check if the event loop is running.
Definition proactor_epoll_impl.hpp:146
void run()
run the event loop (blocking).
Definition proactor_epoll_impl.hpp:58
~BasicProactor() noexcept
destroy instance.
Definition proactor_epoll_impl.hpp:47
void stop(bool sync=true) noexcept
stop the event loop.
Definition proactor_epoll_impl.hpp:67
bool isProactorThread() const noexcept
check if the calling thread is the proactor thread.
Definition proactor_epoll_impl.hpp:155
BasicProactor()
initialize the proactor and its I/O backend.
Definition proactor_epoll_impl.hpp:29
std::error_code make_error_code(join::Errc code) noexcept
Create an std::error_code object.
Definition error.cpp:150
#define JOIN_LIKELY(x)
Definition utils.hpp:46
#define JOIN_UNLIKELY(x)
Definition utils.hpp:47