join 1.0
lightweight network framework library
Loading...
Searching...
No Matches
reactor.hpp
Go to the documentation of this file.
1
25#ifndef JOIN_CORE_REACTOR_HPP
26#define JOIN_CORE_REACTOR_HPP
27
28// libjoin.
29#include <join/thread.hpp>
30#include <join/queue.hpp>
31
32// C++.
33#include <unordered_map>
34#include <unordered_set>
35#include <atomic>
36
37// C.
38#include <sys/epoll.h>
39
40namespace join
41{
46 { // Event handler interface class. NOTE(review): the line carrying the class name is absent from this listing.
47 public:
51 EventHandler () = default; // create instance.
52
57 EventHandler (const EventHandler& other) = default; // copy constructor.
58
64 EventHandler& operator= (const EventHandler& other) = default; // copy assignment operator.
65
70 EventHandler (EventHandler&& other) = default; // move constructor.
71
78
82 virtual ~EventHandler () = default; // destroy instance (virtual, so derived handlers are destroyed correctly through this interface).
83
84 protected:
89 virtual void onReceive ([[maybe_unused]] int fd) // method called when data are ready to be read on handle.
90 {
91 // default implementation does nothing; fd is deliberately unused here.
92 }
93
98 virtual void onClose ([[maybe_unused]] int fd) // method called when handle is closed.
99 {
100 // default implementation does nothing; fd is deliberately unused here.
101 }
102
107 virtual void onError ([[maybe_unused]] int fd) // method called when an error occurred on handle.
108 {
109 // default implementation does nothing; fd is deliberately unused here.
110 }
111
113 friend class Reactor; // gives Reactor access to the protected callbacks declared above.
114 };
115
120 { // Reactor class. NOTE(review): the line carrying the class name is absent from this listing.
121 public:
125 Reactor (); // default constructor.
126
131 Reactor (const Reactor& other) = delete; // copy constructor (deleted: instances cannot be copied).
132
138 Reactor& operator= (const Reactor& other) = delete; // copy assignment operator (deleted).
139
144 Reactor (Reactor&& other) = delete; // move constructor (deleted: instances cannot be moved).
145
151 Reactor& operator= (Reactor&& other) = delete; // move assignment operator (deleted).
152
156 ~Reactor () noexcept; // destroy instance.
157
165 int addHandler (int fd, EventHandler* handler, bool sync = true) noexcept; // add handler to reactor; sync defaults to true (presumably waits for completion — TODO confirm in reactor.cpp).
166
173 int delHandler (int fd, bool sync = true) noexcept; // delete handler from reactor; sync defaults to true (presumably waits for completion — TODO confirm in reactor.cpp).
174
178 void run (); // run the event loop (blocking).
179
184 void stop (bool sync = true) noexcept; // stop the event loop; sync defaults to true.
185
186#ifdef JOIN_HAS_NUMA
192 int mbind (int numa) const noexcept; // only declared when JOIN_HAS_NUMA is defined; presumably binds memory to the given NUMA node — TODO confirm.
193#endif
194
199 int mlock () const noexcept; // lock command queue memory in RAM.
200
205 bool isReactorThread () const noexcept; // check if the calling thread is the reactor thread.
206
207 private:
209 static constexpr size_t _deletedReserve = 64; // compile-time constant; presumably the initial reservation for _deleted — TODO confirm.
210
212 static constexpr size_t _queueSize = 1024; // compile-time constant; presumably the capacity of _commands — TODO confirm.
213
215 static constexpr size_t _maxEvents = 1024; // compile-time constant; presumably the maximum number of events fetched per wait — TODO confirm.
216
220 enum class CommandType // kinds of command carried by a Command.
221 {
222 Add,
223 Del,
224 Stop
225 };
226
230 struct alignas (64) Command // command record, aligned on 64 bytes; element type of the _commands queue.
231 {
232 CommandType type; // which operation is requested.
233 int fd; // descriptor the command applies to.
234 EventHandler* handler; // non-owning handler pointer (presumably only meaningful for Add — TODO confirm).
235 std::atomic<bool>* done; // pointer to a completion flag (presumably set once the command has been processed — TODO confirm).
236 std::atomic<int>* errc; // pointer to an error code slot (presumably filled with the command result — TODO confirm).
237 };
238
245 int registerHandler (int fd, EventHandler* handler) noexcept;
246
252 int unregisterHandler (int fd) noexcept;
253
259 int writeCommand (const Command& cmd) noexcept;
260
265 void processCommand (const Command& cmd) noexcept;
266
270 void readCommands () noexcept;
271
276 void dispatchEvent (const epoll_event& event); // takes one epoll_event; not declared noexcept, unlike the command helpers above.
277
281 void eventLoop ();
282
288 bool isActive (int fd) const noexcept;
289
291 int _wakeup = -1; // starts at -1; presumably a wakeup descriptor — TODO confirm.
292
294 int _epoll = -1; // starts at -1; presumably the epoll descriptor — TODO confirm.
295
297 LocalMem::Mpsc::Queue<Command> _commands; // queue of Command records.
298
300 std::unordered_map<int, EventHandler*> _handlers; // maps an int key (presumably the fd — TODO confirm) to a non-owning handler pointer.
301
303 std::unordered_set<int> _deleted; // set of ints (presumably fds removed during dispatch — TODO confirm).
304
306 std::atomic<bool> _running{false}; // atomic flag, initially false.
307
309 std::atomic<pthread_t> _threadId{0}; // atomic thread id, initially 0.
310 };
311
316 { // Convenience class that owns a Reactor running on a dedicated background thread. NOTE(review): the line carrying the class name is absent from this listing.
317 public:
322 static Reactor* reactor (); // get the global Reactor instance.
323
329 static int affinity (int core); // overload taking a core; presumably sets the reactor thread affinity — TODO confirm.
330
335 static int affinity (); // get reactor thread affinity.
336
342 static int priority (int prio); // overload taking a priority; presumably sets the reactor thread priority — TODO confirm.
343
348 static int priority (); // get reactor thread priority.
349
354 static pthread_t handle (); // get the handle of the reactor thread.
355
356#ifdef JOIN_HAS_NUMA
362 static int mbind (int numa); // only declared when JOIN_HAS_NUMA is defined; presumably binds memory to the given NUMA node — TODO confirm.
363#endif
364
369 static int mlock (); // lock command queue memory in RAM.
370
371 private:
376 static ReactorThread& instance (); // returns a reference to a ReactorThread (presumably the single shared instance — TODO confirm).
377
381 ReactorThread (); // private constructor: cannot be constructed from outside the class.
382
387 ReactorThread (const ReactorThread& other) = delete; // copy constructor (deleted).
388
394 ReactorThread& operator= (const ReactorThread& other) = delete; // copy assignment operator (deleted).
395
400 ReactorThread (ReactorThread&& other) noexcept = delete; // move constructor (deleted).
401
407 ReactorThread& operator= (ReactorThread&& other) noexcept = delete; // move assignment operator (deleted).
408
413
415 Reactor _reactor; // the Reactor held by value.
416
418 Thread _dispatcher; // the Thread held by value (presumably the one running the reactor — TODO confirm).
419 };
420}
421
422#endif
Event handler interface class.
Definition reactor.hpp:46
EventHandler(const EventHandler &other)=default
copy constructor.
virtual void onClose(int fd)
method called when handle is closed.
Definition reactor.hpp:98
virtual void onReceive(int fd)
method called when data are ready to be read on handle.
Definition reactor.hpp:89
virtual ~EventHandler()=default
destroy instance.
virtual void onError(int fd)
method called when an error occurred on handle.
Definition reactor.hpp:107
EventHandler()=default
create instance.
EventHandler & operator=(const EventHandler &other)=default
copy assignment operator.
EventHandler(EventHandler &&other)=default
move constructor.
Convenience class that owns a Reactor running on a dedicated background thread.
Definition reactor.hpp:316
static int affinity()
get reactor thread affinity.
Definition reactor.cpp:467
static int priority()
get reactor thread priority.
Definition reactor.cpp:485
static pthread_t handle()
get the handle of the reactor thread.
Definition reactor.cpp:494
static Reactor * reactor()
get the global Reactor instance.
Definition reactor.cpp:449
static int mlock()
lock command queue memory in RAM.
Definition reactor.cpp:514
Reactor class.
Definition reactor.hpp:120
Reactor(Reactor &&other)=delete
move constructor.
Reactor()
default constructor.
Definition reactor.cpp:46
bool isReactorThread() const noexcept
check if the calling thread is the reactor thread.
Definition reactor.cpp:259
int mlock() const noexcept
lock command queue memory in RAM.
Definition reactor.cpp:250
int delHandler(int fd, bool sync=true) noexcept
delete handler from reactor.
Definition reactor.cpp:150
int addHandler(int fd, EventHandler *handler, bool sync=true) noexcept
add handler to reactor.
Definition reactor.cpp:94
Reactor & operator=(const Reactor &other)=delete
copy assignment operator.
void run()
run the event loop (blocking).
Definition reactor.cpp:200
Reactor(const Reactor &other)=delete
copy constructor.
~Reactor() noexcept
destroy instance.
Definition reactor.cpp:82
void stop(bool sync=true) noexcept
stop the event loop.
Definition reactor.cpp:214
thread class.
Definition thread.hpp:148
Definition acceptor.hpp:32
Definition error.hpp:137