join 1.0
lightweight network framework library
Loading...
Searching...
No Matches
threadpool.hpp
Go to the documentation of this file.
1
25#ifndef __JOIN_THREADPOOL_HPP__
26#define __JOIN_THREADPOOL_HPP__
27
28// libjoin.
29#include <join/condition.hpp>
30#include <join/thread.hpp>
31
32// C++.
33#include <functional>
34#include <thread>
35#include <memory>
36#include <atomic>
37#include <vector>
38#include <deque>
39
40namespace join
41{
43 class ThreadPool;
44
49 {
50 public:
56
61
62 private:
66 void work ();
67
69 ThreadPool* _pool = nullptr;
70
72 Thread _thread;
73 };
74
79 {
80 public:
85 ThreadPool (int workers = std::max (int (std::thread::hardware_concurrency ()), 1));
86
90 ~ThreadPool ();
91
95 template <class Function, class... Args>
96 void push (Function&& func, Args&&... args)
97 {
98 ScopedLock <Mutex> lock (_mutex);
99 _jobs.emplace_back (std::bind (std::forward <Function> (func), std::forward <Args> (args)...));
100 _condition.signal ();
101 }
102
106 size_t size ();
107
108 private:
110 std::vector <std::unique_ptr <WorkerThread>> _workers;
111
113 Condition _condition;
114
116 Mutex _mutex;
117
119 std::atomic <bool> _stop;
120
122 std::deque <std::function <void ()>> _jobs;
123
125 friend class WorkerThread;
126 };
127
134 template <class InputIt, class Func>
135 void distribute (InputIt first, InputIt last, Func function)
136 {
137 // check if we have tasks to perform.
138 int count = std::distance (first, last);
139 if (count)
140 {
141 // determine number of threads and tasks per thread to run.
142 int concurrency = std::max (int (std::thread::hardware_concurrency ()), 1);
143 // no need to create more threads than tasks.
144 concurrency = std::min (concurrency, count);
145 int elements = count / concurrency;
146 int rest = count % concurrency;
147 // distribute tasks to threads.
148 std::vector <int> tasks (concurrency, elements);
149 for (int i = 0; i < rest; ++i)
150 {
151 ++tasks[i];
152 }
153
154 // determine the real thread pool size (concurrency minus 1 as we are a thread).
155 std::vector <Thread> pool;
156 int nth = concurrency - 1;
157 pool.reserve (nth);
158
159 // create threads.
160 auto beg = first, end = first;
161 for (int i = 0; i < nth; ++i)
162 {
163 std::advance (end, tasks[i]);
164 pool.emplace_back (function, beg, end);
165 beg = end;
166 }
167
168 // we are a thread so we can help.
169 function (beg, last);
170
171 // wait for threads to terminate.
172 for (auto& thread : pool)
173 {
174 thread.join ();
175 }
176 }
177 }
178
185 template <class InputIt, class Func>
186 void parallelForEach (InputIt first, InputIt last, Func function)
187 {
188 distribute (first, last, [&function] (InputIt beg, InputIt end)
189 {
190 for (; beg != end; ++beg)
191 {
192 function (*beg);
193 }
194 });
195 }
196}
197
198#endif
condition variable class.
Definition condition.hpp:42
void signal() noexcept
unblocks one of the waiting threads.
Definition condition.cpp:60
class used to protect shared data from being simultaneously accessed by multiple threads.
Definition mutex.hpp:37
thread pool class.
Definition threadpool.hpp:79
~ThreadPool()
destroy thread pool.
Definition threadpool.cpp:90
ThreadPool(int workers=std::max(int(std::thread::hardware_concurrency()), 1))
create thread pool.
Definition threadpool.cpp:77
size_t size()
return thread pool size.
Definition threadpool.cpp:101
void push(Function &&func, Args &&... args)
push a job to the work queue.
Definition threadpool.hpp:96
thread class.
Definition thread.hpp:138
worker thread class.
Definition threadpool.hpp:49
~WorkerThread()
destroy worker thread.
Definition threadpool.cpp:45
WorkerThread(ThreadPool &pool)
create worker thread.
Definition threadpool.cpp:35
Definition acceptor.hpp:32
void distribute(InputIt first, InputIt last, Func function)
determine the number of threads and tasks per thread to run and execute them in parallel.
Definition threadpool.hpp:135
void parallelForEach(InputIt first, InputIt last, Func function)
parrallel for each loop.
Definition threadpool.hpp:186