join 1.0
lightweight network framework library
Loading...
Searching...
No Matches
threadpool.hpp
Go to the documentation of this file.
1
25#ifndef JOIN_CORE_THREADPOOL_HPP
26#define JOIN_CORE_THREADPOOL_HPP
27
28// libjoin.
29#include <join/condition.hpp>
30#include <join/thread.hpp>
31#include <join/cpu.hpp>
32
33// C++.
34#include <functional>
35#include <memory>
36#include <atomic>
37#include <vector>
38#include <deque>
39
40namespace join
41{
43 class ThreadPool;
44
49 {
50 private:
56
57 public:
62 WorkerThread (const WorkerThread& other) = delete;
63
69 WorkerThread& operator= (const WorkerThread& other) = delete;
70
75 WorkerThread (WorkerThread&& other) = delete;
76
83
87 ~WorkerThread () noexcept;
88
89 private:
93 void work ();
94
96 ThreadPool* _pool = nullptr;
97
99 Thread _thread;
100
102 friend class ThreadPool;
103 };
104
109 {
110 public:
115 ThreadPool (int workers = int (CpuTopology::instance ()->cores ().size ()));
116
121 ThreadPool (const ThreadPool& other) = delete;
122
128 ThreadPool& operator= (const ThreadPool& other) = delete;
129
134 ThreadPool (ThreadPool&& other) = delete;
135
141 ThreadPool& operator= (ThreadPool&& other) = delete;
142
146 ~ThreadPool () noexcept;
147
153 template <class Function, class... Args>
154 void push (Function&& func, Args&&... args)
155 {
156 ScopedLock<Mutex> lock (_mutex);
157 _jobs.emplace_back (std::bind (std::forward<Function> (func), std::forward<Args> (args)...));
158 _condition.signal ();
159 }
160
164 size_t size () const noexcept;
165
166 private:
168 std::vector<std::unique_ptr<WorkerThread>> _workers;
169
171 Condition _condition;
172
174 Mutex _mutex;
175
177 std::atomic<bool> _stop;
178
180 std::deque<std::function<void ()>> _jobs;
181
183 friend class WorkerThread;
184 };
185
192 template <class InputIt, class Func>
193 void distribute (InputIt first, InputIt last, Func function)
194 {
195 // check if we have tasks to perform.
196 int count = std::distance (first, last);
197 if (count)
198 {
199 // determine number of threads and tasks per thread to run.
200 int concurrency = int (CpuTopology::instance ()->cores ().size ());
201 // no need to create more threads than tasks.
202 concurrency = std::min (concurrency, count);
203 int elements = count / concurrency;
204 int rest = count % concurrency;
205 // distribute tasks to threads.
206 std::vector<int> tasks (concurrency, elements);
207 for (int i = 0; i < rest; ++i)
208 {
209 ++tasks[i];
210 }
211
212 // determine the real thread pool size (concurrency minus 1 as we are a thread).
213 std::vector<Thread> pool;
214 int nth = concurrency - 1;
215 pool.reserve (nth);
216
217 // create threads.
218 auto beg = first, end = first;
219 for (int i = 0; i < nth; ++i)
220 {
221 std::advance (end, tasks[i]);
222 pool.emplace_back (function, beg, end);
223 beg = end;
224 }
225
226 // we are a thread so we can help.
227 function (beg, last);
228
229 // wait for threads to terminate.
230 for (auto& thread : pool)
231 {
232 thread.join ();
233 }
234 }
235 }
236
243 template <class InputIt, class Func>
244 void parallelForEach (InputIt first, InputIt last, Func function)
245 {
246 distribute (first, last, [&function] (InputIt beg, InputIt end) {
247 for (; beg != end; ++beg)
248 {
249 function (*beg);
250 }
251 });
252 }
253}
254
255#endif
condition variable class.
Definition condition.hpp:42
static const CpuTopology * instance()
get instance.
Definition cpu.cpp:50
class used to protect shared data from being simultaneously accessed by multiple threads.
Definition mutex.hpp:37
class owning a mutex for the duration of a scoped block.
Definition mutex.hpp:246
thread pool class.
Definition threadpool.hpp:109
ThreadPool(ThreadPool &&other)=delete
move constructor.
ThreadPool(const ThreadPool &other)=delete
copy constructor.
thread class.
Definition thread.hpp:148
worker thread class.
Definition threadpool.hpp:49
WorkerThread & operator=(const WorkerThread &other)=delete
copy assignment.
friend class ThreadPool
friendship with ThreadPool.
Definition threadpool.hpp:102
WorkerThread(WorkerThread &&other)=delete
move constructor.
~WorkerThread() noexcept
destroy worker thread.
Definition threadpool.cpp:47
WorkerThread(const WorkerThread &other)=delete
copy constructor.
Definition acceptor.hpp:32
void distribute(InputIt first, InputIt last, Func function)
determine the number of threads and tasks per thread to run and execute them in parallel.
Definition threadpool.hpp:193
void parallelForEach(InputIt first, InputIt last, Func function)
parallel for each loop.
Definition threadpool.hpp:244
Definition error.hpp:137