join 1.0
lightweight network framework library
Loading...
Searching...
No Matches
netlinkmanager.hpp
Go to the documentation of this file.
1
25#ifndef JOIN_FABRIC_NETLINKMANAGER_HPP
26#define JOIN_FABRIC_NETLINKMANAGER_HPP
27
28// libjoin.
29#include <join/condition.hpp>
30#include <join/socket.hpp>
31#include <join/queue.hpp>
32
33// C++.
34#include <unordered_map>
35#include <functional>
36#include <memory>
37#include <atomic>
38
39// C.
40#include <linux/rtnetlink.h>
41#include <linux/netlink.h>
42#include <cstdint>
43#include <cstddef>
44
45namespace join
46{
51 {
52 public:
58 NetlinkManager (uint32_t groups, Reactor* reactor = nullptr);
59
63 NetlinkManager (const NetlinkManager&) = delete;
64
69
74
79
83 virtual ~NetlinkManager ();
84
89 Reactor* reactor () const noexcept;
90
91 protected:
95 void start ();
96
100 void stop ();
101
109 int sendRequest (struct nlmsghdr* nlh, bool sync, std::chrono::milliseconds timeout = std::chrono::seconds (5));
110
118 template <class Rep, class Period>
119 int waitResponse (ScopedLock<Mutex>& lock, uint32_t seq, std::chrono::duration<Rep, Period> timeout)
120 {
121 auto inserted = _pending.emplace (seq, std::make_unique<PendingRequest> ());
122 if (!inserted.second)
123 {
124 lastError = make_error_code (Errc::OperationFailed);
125 return -1;
126 }
127
128 if (!inserted.first->second->cond.timedWait (lock, timeout))
129 {
130 _pending.erase (inserted.first);
131 lastError = make_error_code (Errc::TimedOut);
132 return -1;
133 }
134
135 if (inserted.first->second->error != 0)
136 {
137 int err = inserted.first->second->error;
138 _pending.erase (inserted.first);
139 lastError = std::error_code (err, std::generic_category ());
140 return -1;
141 }
142
143 _pending.erase (inserted.first);
144 return 0;
145 }
146
// push a job to be executed on the reactor thread and wait until it has completed.
// func is bound to args into a Job living on this stack frame; a pointer to that Job
// is queued, so this function must not return before the job is marked done.
// NOTE(review): declared noexcept although std::bind / std::function assignment may
// allocate - presumably accepted on purpose, confirm.
152 template <typename Func, typename... Args>
153 void pushJob (Func&& func, Args&&... args) noexcept
154 {
155 Job job;
156 job.func = std::bind (std::forward<Func> (func), std::forward<Args> (args)...);
157
// NOTE(review): the line holding the condition for the block below (listing line 158)
// is absent from this listing - presumably a "called from the reactor thread" check,
// verify against the real header. When it holds, the job runs inline and nothing is queued.
159 {
160 job.func ();
161 return;
162 }
163
// hand the job over by address.
// NOTE(review): the result of push is ignored; if push can fail (e.g. queue full)
// the wait loop below would never terminate - confirm the queue's contract.
164 _jobs.push (&job);
165
// signal _wakeup (eventfd used to wake the reactor thread) with an 8-byte counter
// increment of 1; the write result is deliberately unused.
166 uint64_t v = 1;
167 [[maybe_unused]] ssize_t bytes = ::write (_wakeup, &v, sizeof (v));
168
// busy-wait with backoff until done is observed true (acquire load); done is
// presumably set by whoever drains _jobs after running func - not visible here.
169 Backoff backoff;
170 while (!job.done.load (std::memory_order_acquire))
171 {
172 backoff ();
173 }
174 }
175
180 virtual void onReceive (int fd) override final;
181
186 virtual void onMessage (struct nlmsghdr* nlh) = 0;
187
193 void notifyRequest (uint32_t seq, int error = 0);
194
202 static void addAttributes (struct nlmsghdr* nlh, int type, const void* data, int alen);
203
210 static struct rtattr* startNestedAttributes (struct nlmsghdr* nlh, int type);
211
218 static int stopNestedAttributes (struct nlmsghdr* nlh, struct rtattr* nested);
219
/**
 * @brief update a value in place and report whether it changed.
 * @param oldVal value to update, overwritten with newVal when both differ.
 * @param newVal candidate value.
 * @param changed flag returned when oldVal has been overwritten.
 * @return changed if the value was replaced, otherwise a zero-valued Flag.
 */
template <typename T, typename Flag>
static Flag updateValue (T& oldVal, const T& newVal, Flag changed)
{
    // nothing to do when the stored value already matches.
    if (!(oldVal != newVal))
    {
        return static_cast<Flag> (0);
    }

    oldVal = newVal;
    return changed;
}
236
238 static constexpr size_t _bufferSize = 16384;
239
241 std::unique_ptr<char[]> _buffer;
242
244 std::atomic<uint32_t> _seq;
245
248 {
250 int error = 0;
251 };
252
254 std::unordered_map<uint32_t, std::unique_ptr<PendingRequest>> _pending;
255
258
/**
 * @brief job to be executed on the reactor thread.
 */
struct Job
{
    /// function to execute.
    std::function<void ()> func;

    /// set to true when the job has been executed (starts out false).
    std::atomic<bool> done{false};
};
270
272 static constexpr size_t _jobQueueSize = 256;
273
276
278 int _wakeup = -1;
279
282 };
283}
284
285#endif
adaptive backoff strategy for busy-wait loops.
Definition backoff.hpp:45
basic datagram socket class.
Definition socket.hpp:645
virtual int write(const char *data, unsigned long maxSize) noexcept override
write data.
Definition socket.hpp:926
queue base class.
Definition queue.hpp:136
int type() const noexcept
get the protocol communication semantic.
Definition socket.hpp:516
condition variable class.
Definition condition.hpp:42
class used to protect shared data from being simultaneously accessed by multiple threads.
Definition mutex.hpp:37
base class for netlink-based managers.
Definition netlinkmanager.hpp:51
static void addAttributes(struct nlmsghdr *nlh, int type, const void *data, int alen)
add an attribute to a netlink message.
Definition netlinkmanager.cpp:184
std::atomic< uint32_t > _seq
sequence number.
Definition netlinkmanager.hpp:244
virtual void onReceive(int fd) override final
method called when data are ready to be read on handle.
Definition netlinkmanager.cpp:121
int waitResponse(ScopedLock< Mutex > &lock, uint32_t seq, std::chrono::duration< Rep, Period > timeout)
wait for specific netlink response.
Definition netlinkmanager.hpp:119
static constexpr size_t _bufferSize
internal buffer size.
Definition netlinkmanager.hpp:238
NetlinkManager(uint32_t groups, Reactor *reactor=nullptr)
create instance.
Definition netlinkmanager.cpp:38
int _wakeup
eventfd used to wake the reactor thread for pending jobs.
Definition netlinkmanager.hpp:278
int sendRequest(struct nlmsghdr *nlh, bool sync, std::chrono::milliseconds timeout=std::chrono::seconds(5))
send a netlink request, optionally waiting for the ack.
Definition netlinkmanager.cpp:95
Mutex _syncMutex
protection mutex.
Definition netlinkmanager.hpp:257
void pushJob(Func &&func, Args &&... args) noexcept
push a job to be executed on the reactor thread.
Definition netlinkmanager.hpp:153
static Flag updateValue(T &oldVal, const T &newVal, Flag changed)
update a value in place and report whether it changed.
Definition netlinkmanager.hpp:227
void notifyRequest(uint32_t seq, int error=0)
notify a pending synchronous request.
Definition netlinkmanager.cpp:168
NetlinkManager(NetlinkManager &&)=delete
create instance by move.
Reactor * reactor() const noexcept
get the event loop reactor.
Definition netlinkmanager.cpp:66
Reactor * _reactor
event loop reactor.
Definition netlinkmanager.hpp:281
NetlinkManager & operator=(const NetlinkManager &)=delete
assign instance by copy.
void stop()
stop listening for netlink events.
Definition netlinkmanager.cpp:85
NetlinkManager(const NetlinkManager &)=delete
create instance by copy.
static struct rtattr * startNestedAttributes(struct nlmsghdr *nlh, int type)
open a nested attribute block.
Definition netlinkmanager.cpp:199
void start()
start listening for netlink events.
Definition netlinkmanager.cpp:75
std::unique_ptr< char[]> _buffer
internal read buffer.
Definition netlinkmanager.hpp:241
LocalMem::Mpsc::Queue< Job * > _jobs
job queue.
Definition netlinkmanager.hpp:275
virtual void onMessage(struct nlmsghdr *nlh)=0
dispatch a single RTM_* message to the derived class.
static constexpr size_t _jobQueueSize
job queue size.
Definition netlinkmanager.hpp:272
virtual ~NetlinkManager()
destroy instance.
Definition netlinkmanager.cpp:57
std::unordered_map< uint32_t, std::unique_ptr< PendingRequest > > _pending
synchronous requests indexed by sequence number.
Definition netlinkmanager.hpp:254
static int stopNestedAttributes(struct nlmsghdr *nlh, struct rtattr *nested)
close a nested attribute block.
Definition netlinkmanager.cpp:211
Reactor class.
Definition reactor.hpp:120
bool isReactorThread() const noexcept
check if the calling thread is the reactor thread.
Definition reactor.cpp:259
class owning a mutex for the duration of a scoped block.
Definition mutex.hpp:246
Definition acceptor.hpp:32
Definition error.hpp:137
job to be executed on the reactor thread.
Definition netlinkmanager.hpp:263
std::function< void()> func
function to execute.
Definition netlinkmanager.hpp:265
std::atomic< bool > done
set to true when the job has been executed.
Definition netlinkmanager.hpp:268
pending synchronous request.
Definition netlinkmanager.hpp:248
int error
Definition netlinkmanager.hpp:250
Condition cond
Definition netlinkmanager.hpp:249