join 1.0
lightweight network framework library
Loading...
Searching...
No Matches
netlinkmanager.hpp
Go to the documentation of this file.
1
25#ifndef JOIN_FABRIC_NETLINKMANAGER_HPP
26#define JOIN_FABRIC_NETLINKMANAGER_HPP
27
28// libjoin.
29#include <join/condition.hpp>
30#include <join/reactor.hpp>
31#include <join/socket.hpp>
32#include <join/queue.hpp>
33
34// C++.
35#include <unordered_map>
36#include <functional>
37#include <memory>
38#include <atomic>
39
40// C.
41#include <linux/rtnetlink.h>
42#include <linux/netlink.h>
43#include <cstdint>
44#include <cstddef>
45
46namespace join
47{
52 {
53 public:
60
64 NetlinkManager (const NetlinkManager&) = delete;
65
70
75
80
84 virtual ~NetlinkManager ();
85
90 Reactor& reactor () const noexcept;
91
92 protected:
96 void start ();
97
101 void stop ();
102
110 int sendRequest (struct nlmsghdr* nlh, bool sync, std::chrono::milliseconds timeout = std::chrono::seconds (5));
111
// Package a callable and its arguments as a Job and hand it over for execution;
// the listing's own summary describes this as "push a job to be executed on the
// reactor thread".
//
// func: callable to invoke; args: arguments bound to it via std::bind.
// Returns nothing. On the queued path the call does not return until job.done
// has been observed as true.
//
// NOTE(review): the function is declared noexcept, so any exception escaping the
// std::bind / std::function assignment or the callable itself would end in
// std::terminate - confirm this is intended.
117 template <typename Func, typename... Args>
118 void pushJob (Func&& func, Args&&... args) noexcept
119 {
// The job lives on this function's stack; only its address is queued below.
120 Job job;
121 job.func = std::bind (std::forward<Func> (func), std::forward<Args> (args)...);
122
// NOTE(review): listing line 123 is absent from this extract. The braces below
// presumably belong to a conditional that runs the job inline when the caller
// is already on the reactor thread (the page carries a tooltip for
// Reactor::isReactorThread) - confirm against the original header.
124 {
125 job.func ();
126 return;
127 }
128
// Queue a pointer to the stack-allocated job.
129 _jobs.push (&job);
130
// Write an 8-byte counter value of 1 to _wakeup (documented on this page as the
// eventfd used to wake the reactor thread). The write result is deliberately
// unused, hence [[maybe_unused]].
131 uint64_t v = 1;
132 [[maybe_unused]] ssize_t bytes = ::write (_wakeup, &v, sizeof (v));
133
// Busy-wait with Backoff until done is observed true (acquire load). Because
// the function only returns after that, the stack job stays valid while queued.
// Presumably the consumer sets done after running func - verify in
// netlinkmanager.cpp.
134 Backoff backoff;
135 while (!job.done.load (std::memory_order_acquire))
136 {
137 backoff ();
138 }
139 }
140
145 virtual void onReadable (int fd) override final;
146
151 virtual void onMessage (struct nlmsghdr* nlh) = 0;
152
158 void notifyRequest (uint32_t seq, int error = 0);
159
167 static void addAttributes (struct nlmsghdr* nlh, int type, const void* data, int alen);
168
175 static struct rtattr* startNestedAttributes (struct nlmsghdr* nlh, int type);
176
183 static int stopNestedAttributes (struct nlmsghdr* nlh, struct rtattr* nested);
184
// Update a value in place and report whether it changed.
//
// oldVal:  value to update; overwritten with newVal when the two differ.
// newVal:  candidate value, compared against oldVal with operator!=.
// changed: flag returned when an assignment took place.
// Returns changed if oldVal was modified, otherwise static_cast<Flag> (0).
//
// T must support operator!= and copy assignment; Flag must be convertible from
// 0 via static_cast.
191 template <typename T, typename Flag>
192 static Flag updateValue (T& oldVal, const T& newVal, Flag changed)
193 {
194 if (oldVal != newVal)
195 {
196 oldVal = newVal;
197 return changed;
198 }
// Values compared equal: nothing written, report the zero flag.
199 return static_cast<Flag> (0);
200 }
201
203 static constexpr size_t _bufferSize = 16384;
204
206 std::unique_ptr<char[]> _buffer;
207
209 std::atomic<uint32_t> _seq;
210
213 {
215 int error = 0;
216 };
217
219 std::unordered_map<uint32_t, std::unique_ptr<PendingRequest>> _pending;
220
223
// Job to be executed on the reactor thread (per this page's own description).
// pushJob creates one on its stack, queues its address and spins on done.
227 struct Job
228 {
// Function to execute; filled by pushJob with a std::bind result.
230 std::function<void ()> func;
231
// Completion flag, initially false; described on this page as set to true when
// the job has been executed. pushJob polls it with an acquire load.
233 std::atomic<bool> done{false};
234 };
235
237 static constexpr size_t _jobQueueSize = 256;
238
241
243 int _wakeup = -1;
244
247 };
248}
249
250#endif
adaptive backoff strategy for busy-wait loops.
Definition backoff.hpp:45
basic datagram socket class.
Definition socket.hpp:644
virtual int write(const char *data, unsigned long maxSize) noexcept override
write data.
Definition socket.hpp:925
queue base class.
Definition queue.hpp:127
int type() const noexcept
get the protocol communication semantic.
Definition socket.hpp:515
condition variable class.
Definition condition.hpp:42
Event handler interface class.
Definition reactor.hpp:46
class used to protect shared data from being simultaneously accessed by multiple threads.
Definition mutex.hpp:37
base class for netlink-based managers.
Definition netlinkmanager.hpp:52
Reactor & reactor() const noexcept
get the event loop reactor.
Definition netlinkmanager.cpp:62
static void addAttributes(struct nlmsghdr *nlh, int type, const void *data, int alen)
add an attribute to a netlink message.
Definition netlinkmanager.cpp:209
std::atomic< uint32_t > _seq
sequence number.
Definition netlinkmanager.hpp:209
NetlinkManager(uint32_t groups, Reactor &reactor=ReactorThread::reactor())
create instance.
Definition netlinkmanager.cpp:38
static constexpr size_t _bufferSize
internal buffer size.
Definition netlinkmanager.hpp:203
virtual void onReadable(int fd) override final
method called when data is ready to be read on the handle.
Definition netlinkmanager.cpp:146
int _wakeup
eventfd used to wake the reactor thread for pending jobs.
Definition netlinkmanager.hpp:243
int sendRequest(struct nlmsghdr *nlh, bool sync, std::chrono::milliseconds timeout=std::chrono::seconds(5))
send a netlink request, optionally waiting for the ack.
Definition netlinkmanager.cpp:91
Mutex _syncMutex
protection mutex.
Definition netlinkmanager.hpp:222
void pushJob(Func &&func, Args &&... args) noexcept
push a job to be executed on the reactor thread.
Definition netlinkmanager.hpp:118
static Flag updateValue(T &oldVal, const T &newVal, Flag changed)
update a value in place and report whether it changed.
Definition netlinkmanager.hpp:192
void notifyRequest(uint32_t seq, int error=0)
notify a pending synchronous request.
Definition netlinkmanager.cpp:193
NetlinkManager(NetlinkManager &&)=delete
create instance by move.
Reactor & _reactor
event loop reactor.
Definition netlinkmanager.hpp:246
NetlinkManager & operator=(const NetlinkManager &)=delete
assign instance by copy.
void stop()
stop listening for netlink events.
Definition netlinkmanager.cpp:81
NetlinkManager(const NetlinkManager &)=delete
create instance by copy.
static struct rtattr * startNestedAttributes(struct nlmsghdr *nlh, int type)
open a nested attribute block.
Definition netlinkmanager.cpp:224
void start()
start listening for netlink events.
Definition netlinkmanager.cpp:71
std::unique_ptr< char[]> _buffer
internal read buffer.
Definition netlinkmanager.hpp:206
LocalMem::Mpsc::Queue< Job * > _jobs
job queue.
Definition netlinkmanager.hpp:240
virtual void onMessage(struct nlmsghdr *nlh)=0
dispatch a single RTM_* message to the derived class.
static constexpr size_t _jobQueueSize
job queue size.
Definition netlinkmanager.hpp:237
virtual ~NetlinkManager()
destroy instance.
Definition netlinkmanager.cpp:53
std::unordered_map< uint32_t, std::unique_ptr< PendingRequest > > _pending
synchronous requests indexed by sequence number.
Definition netlinkmanager.hpp:219
static int stopNestedAttributes(struct nlmsghdr *nlh, struct rtattr *nested)
close a nested attribute block.
Definition netlinkmanager.cpp:236
static Reactor & reactor()
get the global Reactor instance.
Definition reactor.cpp:514
Reactor class.
Definition reactor.hpp:129
bool isReactorThread() const noexcept
check if the calling thread is the reactor thread.
Definition reactor.cpp:265
Definition acceptor.hpp:32
Definition error.hpp:137
job to be executed on the reactor thread.
Definition netlinkmanager.hpp:228
std::function< void()> func
function to execute.
Definition netlinkmanager.hpp:230
std::atomic< bool > done
set to true when the job has been executed.
Definition netlinkmanager.hpp:233
pending synchronous request.
Definition netlinkmanager.hpp:213
int error
Definition netlinkmanager.hpp:215
Condition cond
Definition netlinkmanager.hpp:214