Logo ROOT  
Reference Guide
TThreadExecutor.hxx
Go to the documentation of this file.
1// @(#)root/thread:$Id$
2// Author: Xavier Valls March 2016
3
4/*************************************************************************
5 * Copyright (C) 1995-2006, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#ifndef ROOT_TThreadExecutor
13#define ROOT_TThreadExecutor
14
15#include "RConfigure.h"
16
17// exclude in case ROOT does not have IMT support
18#ifndef R__USE_IMT
19// No need to error out for dictionaries.
20# if !defined(__ROOTCLING__) && !defined(G__DICTIONARY)
21# error "Cannot use ROOT::TThreadExecutor without defining R__USE_IMT."
22# endif
23#else
24
25#include "ROOT/TExecutor.hxx"
26#include "RTaskArena.hxx"
27#include "TError.h"
28#include <functional>
29#include <memory>
30#include <numeric>
31
32
33namespace ROOT {
34
 /// Executor whose Foreach / Map / MapReduce / Reduce members run a user callable
 /// many times through the private ParallelFor / ParallelReduce primitives.
 /// The class passes itself as the template argument of its TExecutor base.
 /// NOTE(review): this listing elides some source lines inside the class body;
 /// members not shown here may exist in the real header.
35 class TThreadExecutor: public TExecutor<TThreadExecutor> {
36 public:
37
 /// Construct the executor. The meaning of nThreads == 0 is defined by the
 /// out-of-line constructor, which is not visible in this header.
38 explicit TThreadExecutor(UInt_t nThreads = 0u);
39
42
 // Foreach: run func for its side effects only. nChunks == 0 means one
 // ParallelFor iteration per call; otherwise the calls are grouped into chunks.
43 template<class F>
44 void Foreach(F func, unsigned nTimes, unsigned nChunks = 0);
45 template<class F, class INTEGER>
46 void Foreach(F func, ROOT::TSeq<INTEGER> args, unsigned nChunks = 0);
47 /// \cond
48 template<class F, class T>
49 void Foreach(F func, std::initializer_list<T> args, unsigned nChunks = 0);
50 /// \endcond
51 template<class F, class T>
52 void Foreach(F func, std::vector<T> &args, unsigned nChunks = 0);
53 template<class F, class T>
54 void Foreach(F func, const std::vector<T> &args, unsigned nChunks = 0);
55
 // Map: run func and collect one result per invocation in a std::vector.
57 template<class F, class Cond = noReferenceCond<F>>
58 auto Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>;
59 template<class F, class INTEGER, class Cond = noReferenceCond<F, INTEGER>>
60 auto Map(F func, ROOT::TSeq<INTEGER> args) -> std::vector<typename std::result_of<F(INTEGER)>::type>;
61 template<class F, class T, class Cond = noReferenceCond<F, T>>
62 auto Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>;
63
64 // // MapReduce
65 // // the late return types also check at compile-time whether redfunc is compatible with func,
66 // // other than checking that func is compatible with the type of arguments.
67 // // a static_assert check in TThreadExecutor::Reduce is used to check that redfunc is compatible with the type returned by func
 // Overloads taking nChunks first reduce each chunk with redfunc, then reduce
 // the per-chunk results with redfunc again.
69 template<class F, class R, class Cond = noReferenceCond<F>>
70 auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type;
71 template<class F, class R, class Cond = noReferenceCond<F>>
72 auto MapReduce(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> typename std::result_of<F()>::type;
73 template<class F, class INTEGER, class R, class Cond = noReferenceCond<F, INTEGER>>
74 auto MapReduce(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> typename std::result_of<F(INTEGER)>::type;
75 /// \cond
76 template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
77 auto MapReduce(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> typename std::result_of<F(T)>::type;
78 /// \endcond
79 template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
80 auto MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type;
81 template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
82 auto MapReduce(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> typename std::result_of<F(T)>::type;
83
 // Reduce: the first overload takes a binary operator on two elements and
 // forwards to ParallelReduce; the second takes a callable on the whole vector
 // and forwards to SeqReduce. The trailing return types select between them.
85 template<class T, class BINARYOP> auto Reduce(const std::vector<T> &objs, BINARYOP redfunc) -> decltype(redfunc(objs.front(), objs.front()));
86 template<class T, class R> auto Reduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs));
87
 // Defined out of line; presumably the number of worker threads in use -- TODO confirm.
88 unsigned GetPoolSize();
89
90 protected:
 // Chunked Map overloads: each returns one partially reduced value per chunk.
 // They are the building blocks of the MapReduce overloads that take nChunks.
91 template<class F, class R, class Cond = noReferenceCond<F>>
92 auto Map(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F()>::type>;
93 template<class F, class INTEGER, class R, class Cond = noReferenceCond<F, INTEGER>>
94 auto Map(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(INTEGER)>::type>;
95 template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
96 auto Map(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(T)>::type>;
97 template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
98 auto Map(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(T)>::type>;
99
100 private:
 // Non-template primitives implemented outside this header. The template
 // members above call f with indices start, start+step, ... below end.
101 void ParallelFor(unsigned start, unsigned end, unsigned step, const std::function<void(unsigned int i)> &f);
 // Only double and float overloads are declared here.
102 double ParallelReduce(const std::vector<double> &objs, const std::function<double(double a, double b)> &redfunc);
103 float ParallelReduce(const std::vector<float> &objs, const std::function<float(float a, float b)> &redfunc);
 // Applies redfunc to the whole vector in the calling thread.
104 template<class T, class R>
105 auto SeqReduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs));
106
 // Shared handle to the task-arena wrapper declared in RTaskArena.hxx.
107 std::shared_ptr<ROOT::Internal::RTaskArenaWrapper> fTaskArenaW = nullptr;
108 };
109
110 /************ TEMPLATE METHODS IMPLEMENTATION ******************/
111
112 //////////////////////////////////////////////////////////////////////////
113 /// Execute func (with no arguments) nTimes in parallel.
114 /// Functions that take more than zero arguments can be executed (with
115 /// fixed arguments) by wrapping them in a lambda or with std::bind.
116 template<class F>
117 void TThreadExecutor::Foreach(F func, unsigned nTimes, unsigned nChunks) {
118 if (nChunks == 0) {
119 ParallelFor(0U, nTimes, 1, [&](unsigned int){func();});
120 return;
121 }
122
123 unsigned step = (nTimes + nChunks - 1) / nChunks;
124 auto lambda = [&](unsigned int i)
125 {
126 for (unsigned j = 0; j < step && (i + j) < nTimes; j++) {
127 func();
128 }
129 };
130 ParallelFor(0U, nTimes, step, lambda);
131 }
132
133 //////////////////////////////////////////////////////////////////////////
134 /// Execute func in parallel, taking an element of a
135 /// sequence as argument.
136 template<class F, class INTEGER>
137 void TThreadExecutor::Foreach(F func, ROOT::TSeq<INTEGER> args, unsigned nChunks) {
138 if (nChunks == 0) {
139 ParallelFor(*args.begin(), *args.end(), args.step(), [&](unsigned int i){func(i);});
140 return;
141 }
142 unsigned start = *args.begin();
143 unsigned end = *args.end();
144 unsigned seqStep = args.step();
145 unsigned step = (end - start + nChunks - 1) / nChunks; //ceiling the division
146
147 auto lambda = [&](unsigned int i)
148 {
149 for (unsigned j = 0; j < step && (i + j) < end; j+=seqStep) {
150 func(i + j);
151 }
152 };
153 ParallelFor(start, end, step, lambda);
154 }
155
156 /// \cond
157 //////////////////////////////////////////////////////////////////////////
158 /// Execute func in parallel, taking an element of a
159 /// initializer_list as argument.
160 template<class F, class T>
161 void TThreadExecutor::Foreach(F func, std::initializer_list<T> args, unsigned nChunks) {
162 std::vector<T> vargs(std::move(args));
163 Foreach(func, vargs, nChunks);
164 }
165 /// \endcond
166
167 //////////////////////////////////////////////////////////////////////////
168 /// Execute func in parallel, taking an element of an
169 /// std::vector as argument.
170 template<class F, class T>
171 void TThreadExecutor::Foreach(F func, std::vector<T> &args, unsigned nChunks) {
172 unsigned int nToProcess = args.size();
173 if (nChunks == 0) {
174 ParallelFor(0U, nToProcess, 1, [&](unsigned int i){func(args[i]);});
175 return;
176 }
177
178 unsigned step = (nToProcess + nChunks - 1) / nChunks; //ceiling the division
179 auto lambda = [&](unsigned int i)
180 {
181 for (unsigned j = 0; j < step && (i + j) < nToProcess; j++) {
182 func(args[i + j]);
183 }
184 };
185 ParallelFor(0U, nToProcess, step, lambda);
186 }
187
188 //////////////////////////////////////////////////////////////////////////
189 /// Execute func in parallel, taking an element of a std::vector as argument.
190 template<class F, class T>
191 void TThreadExecutor::Foreach(F func, const std::vector<T> &args, unsigned nChunks) {
192 unsigned int nToProcess = args.size();
193 if (nChunks == 0) {
194 ParallelFor(0U, nToProcess, 1, [&](unsigned int i){func(args[i]);});
195 return;
196 }
197
198 unsigned step = (nToProcess + nChunks - 1) / nChunks; //ceiling the division
199 auto lambda = [&](unsigned int i)
200 {
201 for (unsigned j = 0; j < step && (i + j) < nToProcess; j++) {
202 func(args[i + j]);
203 }
204 };
205 ParallelFor(0U, nToProcess, step, lambda);
206 }
207
208 //////////////////////////////////////////////////////////////////////////
209 /// Execute func (with no arguments) nTimes in parallel.
210 /// A vector containg executions' results is returned.
211 /// Functions that take more than zero arguments can be executed (with
212 /// fixed arguments) by wrapping them in a lambda or with std::bind.
213 template<class F, class Cond>
214 auto TThreadExecutor::Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type> {
215 using retType = decltype(func());
216 std::vector<retType> reslist(nTimes);
217 auto lambda = [&](unsigned int i)
218 {
219 reslist[i] = func();
220 };
221 ParallelFor(0U, nTimes, 1, lambda);
222
223 return reslist;
224 }
225
226 //////////////////////////////////////////////////////////////////////////
227 /// Execute func in parallel, taking an element of a
228 /// sequence as argument.
229 /// A vector containg executions' results is returned.
230 template<class F, class INTEGER, class Cond>
231 auto TThreadExecutor::Map(F func, ROOT::TSeq<INTEGER> args) -> std::vector<typename std::result_of<F(INTEGER)>::type> {
232 unsigned start = *args.begin();
233 unsigned end = *args.end();
234 unsigned seqStep = args.step();
235
236 using retType = decltype(func(start));
237 std::vector<retType> reslist(args.size());
238 auto lambda = [&](unsigned int i)
239 {
240 reslist[i] = func(i);
241 };
242 ParallelFor(start, end, seqStep, lambda);
243
244 return reslist;
245 }
246
247 //////////////////////////////////////////////////////////////////////////
248 /// Execute func (with no arguments) nTimes in parallel.
249 /// Divides and groups the executions in nChunks (if it doesn't make sense will reduce the number of chunks) with partial reduction;
250 /// A vector containg partial reductions' results is returned.
251 template<class F, class R, class Cond>
252 auto TThreadExecutor::Map(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F()>::type> {
253 if (nChunks == 0)
254 {
255 return Map(func, nTimes);
256 }
257
258 unsigned step = (nTimes + nChunks - 1) / nChunks;
259 // Avoid empty chunks
260 unsigned actualChunks = (nTimes + step - 1) / step;
261 using retType = decltype(func());
262 std::vector<retType> reslist(actualChunks);
263 auto lambda = [&](unsigned int i)
264 {
265 std::vector<retType> partialResults(std::min(nTimes-i, step));
266 for (unsigned j = 0; j < step && (i + j) < nTimes; j++) {
267 partialResults[j] = func();
268 }
269 reslist[i / step] = Reduce(partialResults, redfunc);
270 };
271 ParallelFor(0U, nTimes, step, lambda);
272
273 return reslist;
274 }
275
276 //////////////////////////////////////////////////////////////////////////
277 /// Execute func in parallel, taking an element of an
278 /// std::vector as argument.
279 /// A vector containg executions' results is returned.
280 // actual implementation of the Map method. all other calls with arguments eventually
281 // call this one
282 template<class F, class T, class Cond>
283 auto TThreadExecutor::Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type> {
284 // //check whether func is callable
285 using retType = decltype(func(args.front()));
286
287 unsigned int nToProcess = args.size();
288 std::vector<retType> reslist(nToProcess);
289
290 auto lambda = [&](unsigned int i)
291 {
292 reslist[i] = func(args[i]);
293 };
294
295 ParallelFor(0U, nToProcess, 1, lambda);
296
297 return reslist;
298 }
299
300 //////////////////////////////////////////////////////////////////////////
301 /// Execute func in parallel, taking an element of a
302 /// sequence as argument.
303 /// Divides and groups the executions in nChunks (if it doesn't make sense will reduce the number of chunks) with partial reduction\n
304 /// A vector containg partial reductions' results is returned.
305 template<class F, class INTEGER, class R, class Cond>
306 auto TThreadExecutor::Map(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(INTEGER)>::type> {
307 if (nChunks == 0)
308 {
309 return Map(func, args);
310 }
311
312 unsigned start = *args.begin();
313 unsigned end = *args.end();
314 unsigned seqStep = args.step();
315 unsigned step = (end - start + nChunks - 1) / nChunks; //ceiling the division
316 // Avoid empty chunks
317 unsigned actualChunks = (end - start + step - 1) / step;
318
319 using retType = decltype(func(start));
320 std::vector<retType> reslist(actualChunks);
321 auto lambda = [&](unsigned int i)
322 {
323 std::vector<retType> partialResults(std::min(end-i, step));
324 for (unsigned j = 0; j < step && (i + j) < end; j+=seqStep) {
325 partialResults[j] = func(i + j);
326 }
327 reslist[i / step] = Reduce(partialResults, redfunc);
328 };
329 ParallelFor(start, end, step, lambda);
330
331 return reslist;
332 }
333
334/// \cond
335 //////////////////////////////////////////////////////////////////////////
336 /// Execute func in parallel, taking an element of an
337 /// std::vector as argument. Divides and groups the executions in nChunks with partial reduction.
338 /// If it doesn't make sense will reduce the number of chunks.\n
339 /// A vector containg partial reductions' results is returned.
340 template<class F, class T, class R, class Cond>
341 auto TThreadExecutor::Map(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(T)>::type> {
342 if (nChunks == 0)
343 {
344 return Map(func, args);
345 }
346
347 unsigned int nToProcess = args.size();
348 unsigned step = (nToProcess + nChunks - 1) / nChunks; //ceiling the division
349 // Avoid empty chunks
350 unsigned actualChunks = (nToProcess + step - 1) / step;
351
352 using retType = decltype(func(args.front()));
353 std::vector<retType> reslist(actualChunks);
354 auto lambda = [&](unsigned int i)
355 {
356 std::vector<T> partialResults(step);
357 for (unsigned j = 0; j < step && (i + j) < nToProcess; j++) {
358 partialResults[j] = func(args[i + j]);
359 }
360 reslist[i / step] = Reduce(partialResults, redfunc);
361 };
362
363 ParallelFor(0U, nToProcess, step, lambda);
364
365 return reslist;
366 }
367
368 //////////////////////////////////////////////////////////////////////////
369 /// Execute func in parallel, taking an element of an
370 /// std::initializer_list as an argument. Divides and groups the executions in nChunks with partial reduction.
371 /// If it doesn't make sense will reduce the number of chunks.\n
372 /// A vector containg partial reductions' results is returned.
373 template<class F, class T, class R, class Cond>
374 auto TThreadExecutor::Map(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(T)>::type> {
375 std::vector<T> vargs(std::move(args));
376 const auto &reslist = Map(func, vargs, redfunc, nChunks);
377 return reslist;
378 }
379/// \endcond
380
381
382 //////////////////////////////////////////////////////////////////////////
383 /// This method behaves just like Map, but an additional redfunc function
384 /// must be provided. redfunc is applied to the vector Map would return and
385 /// must return the same type as func. In practice, redfunc can be used to
386 /// "squash" the vector returned by Map into a single object by merging,
387 /// adding, mixing the elements of the vector.\n
388 /// The fourth argument indicates the number of chunks we want to divide our work in.
389 template<class F, class R, class Cond>
390 auto TThreadExecutor::MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type {
391 return Reduce(Map(func, nTimes), redfunc);
392 }
393
394 template<class F, class R, class Cond>
395 auto TThreadExecutor::MapReduce(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> typename std::result_of<F()>::type {
396 return Reduce(Map(func, nTimes, redfunc, nChunks), redfunc);
397 }
398
399 template<class F, class INTEGER, class R, class Cond>
400 auto TThreadExecutor::MapReduce(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> typename std::result_of<F(INTEGER)>::type {
401 return Reduce(Map(func, args, redfunc, nChunks), redfunc);
402 }
403 /// \cond
404 template<class F, class T, class R, class Cond>
405 auto TThreadExecutor::MapReduce(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> typename std::result_of<F(T)>::type {
406 return Reduce(Map(func, args, redfunc, nChunks), redfunc);
407 }
408 /// \endcond
409
410 template<class F, class T, class R, class Cond>
411 auto TThreadExecutor::MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type {
412 return Reduce(Map(func, args), redfunc);
413 }
414
415 template<class F, class T, class R, class Cond>
416 auto TThreadExecutor::MapReduce(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> typename std::result_of<F(T)>::type {
417 return Reduce(Map(func, args, redfunc, nChunks), redfunc);
418 }
419
420 //////////////////////////////////////////////////////////////////////////
421 /// "Reduce" an std::vector into a single object in parallel by passing a
422 /// binary operator as the second argument to act on pairs of elements of the std::vector.
423 template<class T, class BINARYOP>
424 auto TThreadExecutor::Reduce(const std::vector<T> &objs, BINARYOP redfunc) -> decltype(redfunc(objs.front(), objs.front()))
425 {
426 // check we can apply reduce to objs
427 static_assert(std::is_same<decltype(redfunc(objs.front(), objs.front())), T>::value, "redfunc does not have the correct signature");
428 return ParallelReduce(objs, redfunc);
429 }
430
431 //////////////////////////////////////////////////////////////////////////
432 /// "Reduce" an std::vector into a single object by passing a
433 /// function as the second argument defining the reduction operation.
434 template<class T, class R>
435 auto TThreadExecutor::Reduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs))
436 {
437 // check we can apply reduce to objs
438 static_assert(std::is_same<decltype(redfunc(objs)), T>::value, "redfunc does not have the correct signature");
439 return SeqReduce(objs, redfunc);
440 }
441
 /// Reduce objs by calling redfunc on the whole vector in the calling thread.
 /// \param objs    Values to reduce.
 /// \param redfunc Callable taking the vector; its result is returned unchanged.
442 template<class T, class R>
443 auto TThreadExecutor::SeqReduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs))
444 {
445 return redfunc(objs);
446 }
447
448} // namespace ROOT
449
450#endif // R__USE_IMT
451#endif
#define b(i)
Definition: RSha256.hxx:100
#define f(i)
Definition: RSha256.hxx:104
#define R(a, b, c, d, e, f, g, h, i)
Definition: RSha256.hxx:110
unsigned int UInt_t
Definition: RtypesCore.h:44
int type
Definition: TGX11.cxx:120
This class defines an interface to execute the same task multiple times in parallel,...
Definition: TExecutor.hxx:61
A pseudo container class which is a generator of indices.
Definition: TSeq.hxx:66
iterator begin() const
Definition: TSeq.hxx:163
T step() const
Definition: TSeq.hxx:184
iterator end() const
Definition: TSeq.hxx:166
This class provides a simple interface to execute the same task multiple times in parallel,...
auto SeqReduce(const std::vector< T > &objs, R redfunc) -> decltype(redfunc(objs))
auto Map(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute func (with no arguments) nTimes in parallel.
TThreadExecutor & operator=(TThreadExecutor &)=delete
void ParallelFor(unsigned start, unsigned end, unsigned step, const std::function< void(unsigned int i)> &f)
auto Map(F func, std::vector< T > &args, R redfunc, unsigned nChunks) -> std::vector< typename std::result_of< F(T)>::type >
std::shared_ptr< ROOT::Internal::RTaskArenaWrapper > fTaskArenaW
auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of< F()>::type
This method behaves just like Map, but an additional redfunc function must be provided.
auto Reduce(const std::vector< T > &objs, BINARYOP redfunc) -> decltype(redfunc(objs.front(), objs.front()))
"Reduce" an std::vector into a single object in parallel by passing a binary operator as the second a...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute func (with no arguments) nTimes in parallel.
TThreadExecutor(TThreadExecutor &)=delete
TThreadExecutor(UInt_t nThreads=0u)
Class constructor.
double ParallelReduce(const std::vector< double > &objs, const std::function< double(double a, double b)> &redfunc)
auto Map(F func, std::initializer_list< T > args, R redfunc, unsigned nChunks) -> std::vector< typename std::result_of< F(T)>::type >
#define F(x, y, z)
double T(double x)
Definition: ChebyshevPol.h:34
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
Definition: RExports.h:151
auto Map(Args &&... args) -> decltype(ROOT::Detail::VecOps::MapFromTuple(std::forward_as_tuple(args...), std::make_index_sequence< sizeof...(args) - 1 >()))
Create new collection applying a callable to the elements of the input collection.
Definition: RVec.hxx:910
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Definition: StringConv.hxx:21
auto * a
Definition: textangle.C:12