join 1.0
lightweight network framework library
Loading...
Searching...
No Matches
reactor.hpp
Go to the documentation of this file.
1
25#ifndef JOIN_CORE_REACTOR_HPP
26#define JOIN_CORE_REACTOR_HPP
27
28// libjoin.
29#include <join/thread.hpp>
30#include <join/queue.hpp>
31
32// C++.
33#include <unordered_map>
34#include <unordered_set>
35#include <atomic>
36
37// C.
38#include <sys/epoll.h>
39
40namespace join
41{
46 {
47 public:
/** @brief create instance (compiler-generated default constructor). */
51 EventHandler () = default;
52
/**
 * @brief copy constructor (compiler-generated).
 * @param other other object to copy.
 */
57 EventHandler (const EventHandler& other) = default;
58
/**
 * @brief copy assignment operator (compiler-generated).
 * @param other other object to assign.
 * @return reference to the current object.
 */
64 EventHandler& operator= (const EventHandler& other) = default;
65
/**
 * @brief move constructor (compiler-generated).
 * @param other other object to move.
 */
70 EventHandler (EventHandler&& other) = default;
71
// NOTE(review): listing lines 72-77 are not visible here; a move assignment operator is
// presumably declared there -- confirm against the original header.
78
/** @brief destroy instance; virtual so that derived handlers can be destroyed through a base pointer. */
82 virtual ~EventHandler () = default;
83
84 protected:
/**
 * @brief method called when data are ready to be read on the handle.
 * @param fd handle concerned by the event (ignored by this default implementation).
 * @note the default implementation is empty; being virtual, it can be overridden by derived classes.
 */
89 virtual void onReadable ([[maybe_unused]] int fd)
90 {
91 // do nothing.
92 }
93
/**
 * @brief method called when data are ready to be written on the handle.
 * @param fd handle concerned by the event (ignored by this default implementation).
 * @note the default implementation is empty; being virtual, it can be overridden by derived classes.
 */
98 virtual void onWriteable ([[maybe_unused]] int fd)
99 {
100 // do nothing.
101 }
102
/**
 * @brief method called when the handle was closed by the peer.
 * @param fd handle concerned by the event (ignored by this default implementation).
 * @note the default implementation is empty; being virtual, it can be overridden by derived classes.
 */
107 virtual void onClose ([[maybe_unused]] int fd)
108 {
109 // do nothing.
110 }
111
/**
 * @brief method called when an error occurred on the handle.
 * @param fd handle concerned by the event (ignored by this default implementation).
 * @note the default implementation is empty; being virtual, it can be overridden by derived classes.
 */
116 virtual void onError ([[maybe_unused]] int fd)
117 {
118 // do nothing.
119 }
120
// friendship grants Reactor access to the non-public members of this class,
// including the protected event callbacks declared above.
122 friend class Reactor;
123 };
124
129 {
130 public:
/** @brief default constructor. */
134 Reactor ();
135
/**
 * @brief copy constructor (deleted: a Reactor cannot be copied).
 * @param other other object to copy.
 */
140 Reactor (const Reactor& other) = delete;
141
/**
 * @brief copy assignment operator (deleted: a Reactor cannot be copy-assigned).
 * @param other other object to assign.
 */
147 Reactor& operator= (const Reactor& other) = delete;
148
/**
 * @brief move constructor (deleted: a Reactor cannot be moved).
 * @param other other object to move.
 */
153 Reactor (Reactor&& other) = delete;
154
/**
 * @brief move assignment operator (deleted: a Reactor cannot be move-assigned).
 * @param other other object to assign.
 */
160 Reactor& operator= (Reactor&& other) = delete;
161
/** @brief destroy instance. */
165 ~Reactor () noexcept;
166
/**
 * @brief add handler to reactor.
 * @param fd handle to associate with the handler.
 * @param handler event handler to add.
 * @param wantRead defaults to true; presumably requests read-readiness notifications -- confirm in reactor.cpp.
 * @param wantWrite defaults to false; presumably requests write-readiness notifications -- confirm in reactor.cpp.
 * @param sync defaults to true; presumably makes the call wait until the request was processed -- confirm in reactor.cpp.
 * @return integer status; NOTE(review): presumably 0 on success and -1 on failure -- confirm in reactor.cpp.
 */
176 int addHandler (int fd, EventHandler* handler, bool wantRead = true, bool wantWrite = false,
177 bool sync = true) noexcept;
178
/**
 * @brief delete handler from reactor.
 * @param fd handle whose handler must be deleted.
 * @param sync defaults to true; presumably makes the call wait until the request was processed -- confirm in reactor.cpp.
 * @return integer status; NOTE(review): presumably 0 on success and -1 on failure -- confirm in reactor.cpp.
 */
185 int delHandler (int fd, bool sync = true) noexcept;
186
/**
 * @brief run the event loop (blocking).
 * @note not declared noexcept, unlike most of the other public methods of this class.
 */
190 void run ();
191
/**
 * @brief stop the event loop.
 * @param sync defaults to true; presumably makes the call wait until the loop has stopped -- confirm in reactor.cpp.
 */
196 void stop (bool sync = true) noexcept;
197
// only declared when JOIN_HAS_NUMA is defined.
198#ifdef JOIN_HAS_NUMA
/**
 * NOTE(review): no description is visible for this method; presumably binds reactor memory
 * to the given NUMA node -- confirm in reactor.cpp.
 * @param numa presumably the NUMA node identifier -- confirm.
 * @return integer status; meaning to be confirmed in reactor.cpp.
 */
204 int mbind (int numa) const noexcept;
205#endif
206
/**
 * @brief lock command queue memory in RAM.
 * @return integer status; NOTE(review): presumably 0 on success and -1 on failure -- confirm in reactor.cpp.
 */
211 int mlock () const noexcept;
212
/**
 * @brief check if the event loop is running.
 * @return true if the event loop is running, false otherwise.
 */
217 bool isRunning () const noexcept;
218
/**
 * @brief check if the calling thread is the reactor thread.
 * @return true if the calling thread is the reactor thread, false otherwise.
 */
223 bool isReactorThread () const noexcept;
224
225 private:
// NOTE(review): presumably the initial capacity reserved for the _deleted set -- confirm in reactor.cpp.
227 static constexpr size_t _deletedReserve = 64;
228
// NOTE(review): presumably the capacity of the _commands queue -- confirm in reactor.cpp.
230 static constexpr size_t _queueSize = 1024;
231
// NOTE(review): presumably the maximum number of epoll events fetched per wait -- confirm in reactor.cpp.
233 static constexpr size_t _maxEvents = 1024;
234
/**
 * @brief kind of request carried by a Command.
 * NOTE(review): the enumerators presumably map to addHandler, delHandler and stop respectively -- confirm in reactor.cpp.
 */
238 enum class CommandType
239 {
240 Add,
241 Del,
242 Stop
243 };
244
/**
 * @brief request record stored in the _commands queue; aligned on a 64-byte boundary.
 * NOTE(review): the per-field roles below are inferred from names and types -- confirm in reactor.cpp.
 */
248 struct alignas (64) Command
249 {
250 CommandType type;           // kind of request.
251 int fd;                     // handle the request applies to.
252 uint32_t events;            // presumably the epoll event mask to register -- confirm.
253 EventHandler* handler;      // handler concerned by the request (non-owning raw pointer).
254 std::atomic<bool>* done;    // presumably a completion flag set once the request was processed -- confirm.
255 std::error_code* errc;      // presumably receives the error reported while processing -- confirm.
256 };
257
/**
 * NOTE(review): no description is visible; presumably performs the actual registration of
 * the handler for the given handle and event mask -- confirm in reactor.cpp.
 * @param fd handle to register.
 * @param handler handler to associate with the handle.
 * @param events event mask (presumably epoll flags -- confirm).
 * @return integer status; meaning to be confirmed in reactor.cpp.
 */
265 int registerHandler (int fd, EventHandler* handler, uint32_t events) noexcept;
266
/**
 * NOTE(review): no description is visible; presumably the counterpart of registerHandler
 * that removes the handler associated with the handle -- confirm in reactor.cpp.
 * @param fd handle to unregister.
 * @return integer status; meaning to be confirmed in reactor.cpp.
 */
272 int unregisterHandler (int fd) noexcept;
273
/**
 * NOTE(review): no description is visible; presumably submits a command to the reactor
 * (likely through the _commands queue) -- confirm in reactor.cpp.
 * @param cmd command to submit.
 * @return integer status; meaning to be confirmed in reactor.cpp.
 */
279 int writeCommand (const Command& cmd) noexcept;
280
/**
 * NOTE(review): no description is visible; presumably executes a single command -- confirm in reactor.cpp.
 * @param cmd command to process.
 */
285 void processCommand (const Command& cmd) noexcept;
286
/**
 * NOTE(review): no description is visible; presumably drains and processes pending commands -- confirm in reactor.cpp.
 */
290 void readCommands () noexcept;
291
/**
 * NOTE(review): no description is visible; presumably routes one epoll event to the
 * matching EventHandler callback -- confirm in reactor.cpp.
 * @param event epoll event to dispatch.
 * @note not declared noexcept.
 */
296 void dispatchEvent (const epoll_event& event);
297
/**
 * NOTE(review): no description is visible; presumably the loop body executed by run -- confirm in reactor.cpp.
 * @note not declared noexcept.
 */
301 void eventLoop ();
302
/**
 * NOTE(review): no description is visible; presumably tells whether the handle is currently
 * registered and not pending deletion -- confirm in reactor.cpp.
 * @param fd handle to check.
 * @return boolean result; exact meaning to be confirmed in reactor.cpp.
 */
308 bool isActive (int fd) const noexcept;
309
// initialized to -1; NOTE(review): presumably a descriptor used to wake up the event loop -- confirm in reactor.cpp.
311 int _wakeup = -1;
312
// initialized to -1; NOTE(review): presumably the epoll instance descriptor -- confirm in reactor.cpp.
314 int _epoll = -1;
315
// queue of Command records; NOTE(review): the type name suggests a multi-producer/single-consumer queue -- confirm in join/queue.hpp.
317 LocalMem::Mpsc::Queue<Command> _commands;
318
// map from int keys to non-owning EventHandler pointers; NOTE(review): keys are presumably handles (fd) -- confirm.
320 std::unordered_map<int, EventHandler*> _handlers;
321
// set of int values; NOTE(review): presumably handles deleted but not yet purged -- confirm in reactor.cpp.
323 std::unordered_set<int> _deleted;
324
// atomic flag, initially false; NOTE(review): presumably the state reported by isRunning -- confirm in reactor.cpp.
326 std::atomic<bool> _running{false};
327
// sentinel thread identifier obtained by casting -1 to pthread_t.
329 static constexpr pthread_t _invalidThreadId = static_cast<pthread_t> (-1);
330
// atomic thread identifier, initialized to the _invalidThreadId sentinel;
// NOTE(review): presumably compared against the caller in isReactorThread -- confirm in reactor.cpp.
332 std::atomic<pthread_t> _threadId{_invalidThreadId};
333 };
334
339 {
340 public:
/**
 * @brief get the global Reactor instance.
 * @return reference to the Reactor.
 */
345 static Reactor& reactor ();
346
/**
 * NOTE(review): no description is visible for this overload; presumably sets the reactor
 * thread affinity -- confirm in reactor.cpp.
 * @param core presumably the CPU core identifier -- confirm.
 * @return integer status; meaning to be confirmed in reactor.cpp.
 */
352 static int affinity (int core);
353
/**
 * @brief get reactor thread affinity.
 * @return the reactor thread affinity; NOTE(review): encoding of the value to be confirmed in reactor.cpp.
 */
358 static int affinity ();
359
/**
 * NOTE(review): no description is visible for this overload; presumably sets the reactor
 * thread priority -- confirm in reactor.cpp.
 * @param prio presumably the priority to apply -- confirm.
 * @return integer status; meaning to be confirmed in reactor.cpp.
 */
365 static int priority (int prio);
366
/**
 * @brief get reactor thread priority.
 * @return the reactor thread priority.
 */
371 static int priority ();
372
/**
 * @brief get the handle of the reactor thread.
 * @return pthread handle of the reactor thread.
 */
377 static pthread_t handle ();
378
// only declared when JOIN_HAS_NUMA is defined.
379#ifdef JOIN_HAS_NUMA
/**
 * NOTE(review): no description is visible for this method; presumably forwards to
 * Reactor::mbind on the owned reactor -- confirm in reactor.cpp.
 * @param numa presumably the NUMA node identifier -- confirm.
 * @return integer status; meaning to be confirmed in reactor.cpp.
 */
385 static int mbind (int numa);
386#endif
387
/**
 * @brief lock command queue memory in RAM.
 * @return integer status; NOTE(review): presumably 0 on success and -1 on failure -- confirm in reactor.cpp.
 */
392 static int mlock ();
393
394 private:
/**
 * NOTE(review): no description is visible; presumably returns the unique ReactorThread
 * object backing the static accessors -- confirm in reactor.cpp.
 * @return reference to a ReactorThread.
 */
399 static ReactorThread& instance ();
400
/** @brief default constructor; private, so instances cannot be created from outside the class. */
404 ReactorThread ();
405
/**
 * @brief copy constructor (deleted).
 * @param other other object to copy.
 */
410 ReactorThread (const ReactorThread& other) = delete;
411
/**
 * @brief copy assignment operator (deleted).
 * @param other other object to assign.
 */
417 ReactorThread& operator= (const ReactorThread& other) = delete;
418
/**
 * @brief move constructor (deleted).
 * @param other other object to move.
 */
423 ReactorThread (ReactorThread&& other) noexcept = delete;
424
/**
 * @brief move assignment operator (deleted).
 * @param other other object to assign.
 */
430 ReactorThread& operator= (ReactorThread&& other) noexcept = delete;
431
436
// reactor owned by this object.
438 Reactor _reactor;
439
// thread object; NOTE(review): presumably the background thread that runs _reactor -- confirm in reactor.cpp.
441 Thread _dispatcher;
442 };
443}
444
445#endif
Event handler interface class.
Definition reactor.hpp:46
EventHandler(const EventHandler &other)=default
copy constructor.
virtual void onClose(int fd)
method called when the handle has been closed by the peer.
Definition reactor.hpp:107
virtual ~EventHandler()=default
destroy instance.
virtual void onError(int fd)
method called when an error has occurred on the handle.
Definition reactor.hpp:116
EventHandler()=default
create instance.
virtual void onReadable(int fd)
method called when data are ready to be read on the handle.
Definition reactor.hpp:89
EventHandler & operator=(const EventHandler &other)=default
copy assignment operator.
EventHandler(EventHandler &&other)=default
move constructor.
virtual void onWriteable(int fd)
method called when data are ready to be written on the handle.
Definition reactor.hpp:98
Convenience class that owns a Reactor running on a dedicated background thread.
Definition reactor.hpp:339
static int affinity()
get reactor thread affinity.
Definition reactor.cpp:532
static int priority()
get reactor thread priority.
Definition reactor.cpp:550
static pthread_t handle()
get the handle of the reactor thread.
Definition reactor.cpp:559
static Reactor & reactor()
get the global Reactor instance.
Definition reactor.cpp:514
static int mlock()
lock command queue memory in RAM.
Definition reactor.cpp:579
Reactor class.
Definition reactor.hpp:129
bool isRunning() const noexcept
check if the event loop is running.
Definition reactor.cpp:256
Reactor(Reactor &&other)=delete
move constructor.
Reactor()
default constructor.
Definition reactor.cpp:46
bool isReactorThread() const noexcept
check if the calling thread is the reactor thread.
Definition reactor.cpp:265
int mlock() const noexcept
lock command queue memory in RAM.
Definition reactor.cpp:247
int delHandler(int fd, bool sync=true) noexcept
delete handler from reactor.
Definition reactor.cpp:155
Reactor & operator=(const Reactor &other)=delete
copy assignment operator.
void run()
run the event loop (blocking).
Definition reactor.cpp:198
Reactor(const Reactor &other)=delete
copy constructor.
~Reactor() noexcept
destroy instance.
Definition reactor.cpp:88
void stop(bool sync=true) noexcept
stop the event loop.
Definition reactor.cpp:212
int addHandler(int fd, EventHandler *handler, bool wantRead=true, bool wantWrite=false, bool sync=true) noexcept
add handler to reactor.
Definition reactor.cpp:100
thread class.
Definition thread.hpp:148
Definition acceptor.hpp:32
Definition error.hpp:137