Logo ROOT   master
Reference Guide
TThreadExecutor.hxx
Go to the documentation of this file.
1 // @(#)root/thread:$Id$
2 // Author: Xavier Valls March 2016
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2006, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TThreadExecutor
13 #define ROOT_TThreadExecutor
14 
15 #include "RConfigure.h"
16 
17 // exclude in case ROOT does not have IMT support
18 #ifndef R__USE_IMT
19 // No need to error out for dictionaries.
20 # if !defined(__ROOTCLING__) && !defined(G__DICTIONARY)
21 # error "Cannot use ROOT::TThreadExecutor without defining R__USE_IMT."
22 # endif
23 #else
24 
25 #include "ROOT/TExecutor.hxx"
26 #include "ROOT/TPoolManager.hxx"
27 #include "TError.h"
28 #include <functional>
29 #include <memory>
30 #include <numeric>
31 
32 namespace ROOT {
33 
  /// Executor exposing Foreach / Map / MapReduce / Reduce to run the same task
  /// multiple times in parallel, possibly with a different argument each time.
  /// Derives from TExecutor<TThreadExecutor>.
  /// NOTE(review): the numbering of this listing skips lines 40, 55, 59, 67 and 83,
  /// so some declarations appear to be missing from this view (for instance the
  /// template header numbered 58 is not followed by a declaration) -- confirm
  /// against the original header before relying on this class body.
34  class TThreadExecutor: public TExecutor<TThreadExecutor> {
35  public:
36 
  /// Construct the executor. nThreads defaults to 0; what 0 selects is decided
  /// by the out-of-line constructor, which is not part of this header.
37  explicit TThreadExecutor(UInt_t nThreads = 0u);
38 
  /// Copy construction is disabled.
39  TThreadExecutor(TThreadExecutor &) = delete;
41 
  // Foreach: run func for its side effects only (void return). nChunks == 0
  // means "no grouping"; otherwise the executions are grouped into chunks.
42  template<class F>
43  void Foreach(F func, unsigned nTimes, unsigned nChunks = 0);
44  template<class F, class INTEGER>
45  void Foreach(F func, ROOT::TSeq<INTEGER> args, unsigned nChunks = 0);
46  /// \cond
47  template<class F, class T>
48  void Foreach(F func, std::initializer_list<T> args, unsigned nChunks = 0);
49  /// \endcond
50  template<class F, class T>
51  void Foreach(F func, std::vector<T> &args, unsigned nChunks = 0);
52  template<class F, class T>
53  void Foreach(F func, const std::vector<T> &args, unsigned nChunks = 0);
54 
  // Map: run func and collect one result per execution in a std::vector.
56  template<class F, class Cond = noReferenceCond<F>>
57  auto Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>;
58  template<class F, class INTEGER, class Cond = noReferenceCond<F, INTEGER>>
60  template<class F, class T, class Cond = noReferenceCond<F, T>>
61  auto Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>;
62 
63  // // MapReduce
64  // // the late return types also check at compile-time whether redfunc is compatible with func,
65  // // other than checking that func is compatible with the type of arguments.
66  // // a static_assert check in TThreadExecutor::Reduce is used to check that redfunc is compatible with the type returned by func
68  template<class F, class R, class Cond = noReferenceCond<F>>
69  auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type;
70  template<class F, class R, class Cond = noReferenceCond<F>>
71  auto MapReduce(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> typename std::result_of<F()>::type;
72  template<class F, class INTEGER, class R, class Cond = noReferenceCond<F, INTEGER>>
73  auto MapReduce(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> typename std::result_of<F(INTEGER)>::type;
74  /// \cond
75  template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
76  auto MapReduce(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> typename std::result_of<F(T)>::type;
77  /// \endcond
78  template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
79  auto MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type;
80  template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
81  auto MapReduce(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> typename std::result_of<F(T)>::type;
82 
  // Reduce: the first overload takes a binary operator applied to pairs of
  // elements, the second a callable that receives the whole vector.
84  template<class T, class BINARYOP> auto Reduce(const std::vector<T> &objs, BINARYOP redfunc) -> decltype(redfunc(objs.front(), objs.front()));
85  template<class T, class R> auto Reduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs));
86 
  /// Defined out of line; presumably reports the size of the underlying pool -- TODO confirm.
87  unsigned GetPoolSize();
88 
89  protected:
  // Chunked Map overloads: executions are grouped into nChunks and each group
  // is reduced with redfunc, so the returned vector holds partial reductions.
90  template<class F, class R, class Cond = noReferenceCond<F>>
91  auto Map(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F()>::type>;
92  template<class F, class INTEGER, class R, class Cond = noReferenceCond<F, INTEGER>>
93  auto Map(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(INTEGER)>::type>;
94  template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
95  auto Map(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(T)>::type>;
96  template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
97  auto Map(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(T)>::type>;
98 
99  private:
  // Non-template back ends, defined out of line. ParallelFor is called by the
  // templates in this header with (start, end, step) and a callable taking the index.
100  void ParallelFor(unsigned start, unsigned end, unsigned step, const std::function<void(unsigned int i)> &f);
  // Binary reductions exist only for vectors of double and of float.
101  double ParallelReduce(const std::vector<double> &objs, const std::function<double(double a, double b)> &redfunc);
102  float ParallelReduce(const std::vector<float> &objs, const std::function<float(float a, float b)> &redfunc);
  // Applies redfunc to the whole vector in the calling thread.
103  template<class T, class R>
104  auto SeqReduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs));
105 
  // Shared handle to the pool manager; initialised to nullptr here.
106  std::shared_ptr<ROOT::Internal::TPoolManager> fSched = nullptr;
107  };
108 
109  /************ TEMPLATE METHODS IMPLEMENTATION ******************/
110 
111  //////////////////////////////////////////////////////////////////////////
112  /// Execute func (with no arguments) nTimes in parallel.
113  /// Functions that take more than zero arguments can be executed (with
114  /// fixed arguments) by wrapping them in a lambda or with std::bind.
115  template<class F>
116  void TThreadExecutor::Foreach(F func, unsigned nTimes, unsigned nChunks) {
117  if (nChunks == 0) {
118  ParallelFor(0U, nTimes, 1, [&](unsigned int){func();});
119  return;
120  }
121 
122  unsigned step = (nTimes + nChunks - 1) / nChunks;
123  auto lambda = [&](unsigned int i)
124  {
125  for (unsigned j = 0; j < step && (i + j) < nTimes; j++) {
126  func();
127  }
128  };
129  ParallelFor(0U, nTimes, step, lambda);
130  }
131 
132  //////////////////////////////////////////////////////////////////////////
133  /// Execute func in parallel, taking an element of a
134  /// sequence as argument.
135  template<class F, class INTEGER>
136  void TThreadExecutor::Foreach(F func, ROOT::TSeq<INTEGER> args, unsigned nChunks) {
137  if (nChunks == 0) {
138  ParallelFor(*args.begin(), *args.end(), args.step(), [&](unsigned int i){func(i);});
139  return;
140  }
141  unsigned start = *args.begin();
142  unsigned end = *args.end();
143  unsigned seqStep = args.step();
144  unsigned step = (end - start + nChunks - 1) / nChunks; //ceiling the division
145 
146  auto lambda = [&](unsigned int i)
147  {
148  for (unsigned j = 0; j < step && (i + j) < end; j+=seqStep) {
149  func(i + j);
150  }
151  };
152  ParallelFor(start, end, step, lambda);
153  }
154 
155  /// \cond
156  //////////////////////////////////////////////////////////////////////////
157  /// Execute func in parallel, taking an element of a
158  /// initializer_list as argument.
159  template<class F, class T>
160  void TThreadExecutor::Foreach(F func, std::initializer_list<T> args, unsigned nChunks) {
161  std::vector<T> vargs(std::move(args));
162  Foreach(func, vargs, nChunks);
163  }
164  /// \endcond
165 
166  //////////////////////////////////////////////////////////////////////////
167  /// Execute func in parallel, taking an element of an
168  /// std::vector as argument.
169  template<class F, class T>
170  void TThreadExecutor::Foreach(F func, std::vector<T> &args, unsigned nChunks) {
171  unsigned int nToProcess = args.size();
172  if (nChunks == 0) {
173  ParallelFor(0U, nToProcess, 1, [&](unsigned int i){func(args[i]);});
174  return;
175  }
176 
177  unsigned step = (nToProcess + nChunks - 1) / nChunks; //ceiling the division
178  auto lambda = [&](unsigned int i)
179  {
180  for (unsigned j = 0; j < step && (i + j) < nToProcess; j++) {
181  func(args[i + j]);
182  }
183  };
184  ParallelFor(0U, nToProcess, step, lambda);
185  }
186 
187  //////////////////////////////////////////////////////////////////////////
188  /// Execute func in parallel, taking an element of a std::vector as argument.
189  template<class F, class T>
190  void TThreadExecutor::Foreach(F func, const std::vector<T> &args, unsigned nChunks) {
191  unsigned int nToProcess = args.size();
192  if (nChunks == 0) {
193  ParallelFor(0U, nToProcess, 1, [&](unsigned int i){func(args[i]);});
194  return;
195  }
196 
197  unsigned step = (nToProcess + nChunks - 1) / nChunks; //ceiling the division
198  auto lambda = [&](unsigned int i)
199  {
200  for (unsigned j = 0; j < step && (i + j) < nToProcess; j++) {
201  func(args[i + j]);
202  }
203  };
204  ParallelFor(0U, nToProcess, step, lambda);
205  }
206 
207  //////////////////////////////////////////////////////////////////////////
208  /// Execute func (with no arguments) nTimes in parallel.
209  /// A vector containg executions' results is returned.
210  /// Functions that take more than zero arguments can be executed (with
211  /// fixed arguments) by wrapping them in a lambda or with std::bind.
212  template<class F, class Cond>
214  using retType = decltype(func());
215  std::vector<retType> reslist(nTimes);
216  auto lambda = [&](unsigned int i)
217  {
218  reslist[i] = func();
219  };
220  ParallelFor(0U, nTimes, 1, lambda);
221 
222  return reslist;
223  }
224 
225  //////////////////////////////////////////////////////////////////////////
226  /// Execute func in parallel, taking an element of a
227  /// sequence as argument.
228  /// A vector containg executions' results is returned.
229  template<class F, class INTEGER, class Cond>
231  unsigned start = *args.begin();
232  unsigned end = *args.end();
233  unsigned seqStep = args.step();
234 
235  using retType = decltype(func(start));
236  std::vector<retType> reslist(args.size());
237  auto lambda = [&](unsigned int i)
238  {
239  reslist[i] = func(i);
240  };
241  ParallelFor(start, end, seqStep, lambda);
242 
243  return reslist;
244  }
245 
246  //////////////////////////////////////////////////////////////////////////
247  /// Execute func (with no arguments) nTimes in parallel.
248  /// Divides and groups the executions in nChunks (if it doesn't make sense will reduce the number of chunks) with partial reduction;
249  /// A vector containg partial reductions' results is returned.
250  template<class F, class R, class Cond>
251  auto TThreadExecutor::Map(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F()>::type> {
252  if (nChunks == 0)
253  {
254  return Map(func, nTimes);
255  }
256 
257  unsigned step = (nTimes + nChunks - 1) / nChunks;
258  // Avoid empty chunks
259  unsigned actualChunks = (nTimes + step - 1) / step;
260  using retType = decltype(func());
261  std::vector<retType> reslist(actualChunks);
262  auto lambda = [&](unsigned int i)
263  {
264  std::vector<retType> partialResults(std::min(nTimes-i, step));
265  for (unsigned j = 0; j < step && (i + j) < nTimes; j++) {
266  partialResults[j] = func();
267  }
268  reslist[i / step] = Reduce(partialResults, redfunc);
269  };
270  ParallelFor(0U, nTimes, step, lambda);
271 
272  return reslist;
273  }
274 
275  //////////////////////////////////////////////////////////////////////////
276  /// Execute func in parallel, taking an element of an
277  /// std::vector as argument.
278  /// A vector containg executions' results is returned.
279  // actual implementation of the Map method. all other calls with arguments eventually
280  // call this one
281  template<class F, class T, class Cond>
283  // //check whether func is callable
284  using retType = decltype(func(args.front()));
285 
286  unsigned int nToProcess = args.size();
287  std::vector<retType> reslist(nToProcess);
288 
289  auto lambda = [&](unsigned int i)
290  {
291  reslist[i] = func(args[i]);
292  };
293 
294  ParallelFor(0U, nToProcess, 1, lambda);
295 
296  return reslist;
297  }
298 
299  //////////////////////////////////////////////////////////////////////////
300  /// Execute func in parallel, taking an element of a
301  /// sequence as argument.
302  /// Divides and groups the executions in nChunks (if it doesn't make sense will reduce the number of chunks) with partial reduction\n
303  /// A vector containg partial reductions' results is returned.
304  template<class F, class INTEGER, class R, class Cond>
306  if (nChunks == 0)
307  {
308  return Map(func, args);
309  }
310 
311  unsigned start = *args.begin();
312  unsigned end = *args.end();
313  unsigned seqStep = args.step();
314  unsigned step = (end - start + nChunks - 1) / nChunks; //ceiling the division
315  // Avoid empty chunks
316  unsigned actualChunks = (end - start + step - 1) / step;
317 
318  using retType = decltype(func(start));
319  std::vector<retType> reslist(actualChunks);
320  auto lambda = [&](unsigned int i)
321  {
322  std::vector<retType> partialResults(std::min(end-i, step));
323  for (unsigned j = 0; j < step && (i + j) < end; j+=seqStep) {
324  partialResults[j] = func(i + j);
325  }
326  reslist[i / step] = Reduce(partialResults, redfunc);
327  };
328  ParallelFor(start, end, step, lambda);
329 
330  return reslist;
331  }
332 
333 /// \cond
334  //////////////////////////////////////////////////////////////////////////
335  /// Execute func in parallel, taking an element of an
336  /// std::vector as argument. Divides and groups the executions in nChunks with partial reduction.
337  /// If it doesn't make sense will reduce the number of chunks.\n
338  /// A vector containg partial reductions' results is returned.
339  template<class F, class T, class R, class Cond>
340  auto TThreadExecutor::Map(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(T)>::type> {
341  if (nChunks == 0)
342  {
343  return Map(func, args);
344  }
345 
346  unsigned int nToProcess = args.size();
347  unsigned step = (nToProcess + nChunks - 1) / nChunks; //ceiling the division
348  // Avoid empty chunks
349  unsigned actualChunks = (nToProcess + step - 1) / step;
350 
351  using retType = decltype(func(args.front()));
352  std::vector<retType> reslist(actualChunks);
353  auto lambda = [&](unsigned int i)
354  {
355  std::vector<T> partialResults(step);
356  for (unsigned j = 0; j < step && (i + j) < nToProcess; j++) {
357  partialResults[j] = func(args[i + j]);
358  }
359  reslist[i / step] = Reduce(partialResults, redfunc);
360  };
361 
362  ParallelFor(0U, nToProcess, step, lambda);
363 
364  return reslist;
365  }
366 
367  //////////////////////////////////////////////////////////////////////////
368  /// Execute func in parallel, taking an element of an
369  /// std::initializer_list as an argument. Divides and groups the executions in nChunks with partial reduction.
370  /// If it doesn't make sense will reduce the number of chunks.\n
371  /// A vector containg partial reductions' results is returned.
372  template<class F, class T, class R, class Cond>
373  auto TThreadExecutor::Map(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(T)>::type> {
374  std::vector<T> vargs(std::move(args));
375  const auto &reslist = Map(func, vargs, redfunc, nChunks);
376  return reslist;
377  }
378 /// \endcond
379 
380 
381  //////////////////////////////////////////////////////////////////////////
382  /// This method behaves just like Map, but an additional redfunc function
383  /// must be provided. redfunc is applied to the vector Map would return and
384  /// must return the same type as func. In practice, redfunc can be used to
385  /// "squash" the vector returned by Map into a single object by merging,
386  /// adding, mixing the elements of the vector.\n
387  /// The fourth argument indicates the number of chunks we want to divide our work in.
388  template<class F, class R, class Cond>
389  auto TThreadExecutor::MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type {
390  return Reduce(Map(func, nTimes), redfunc);
391  }
392 
393  template<class F, class R, class Cond>
394  auto TThreadExecutor::MapReduce(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> typename std::result_of<F()>::type {
395  return Reduce(Map(func, nTimes, redfunc, nChunks), redfunc);
396  }
397 
398  template<class F, class INTEGER, class R, class Cond>
399  auto TThreadExecutor::MapReduce(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> typename std::result_of<F(INTEGER)>::type {
400  return Reduce(Map(func, args, redfunc, nChunks), redfunc);
401  }
402  /// \cond
403  template<class F, class T, class R, class Cond>
404  auto TThreadExecutor::MapReduce(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> typename std::result_of<F(T)>::type {
405  return Reduce(Map(func, args, redfunc, nChunks), redfunc);
406  }
407  /// \endcond
408 
409  template<class F, class T, class R, class Cond>
410  auto TThreadExecutor::MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type {
411  return Reduce(Map(func, args), redfunc);
412  }
413 
414  template<class F, class T, class R, class Cond>
415  auto TThreadExecutor::MapReduce(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> typename std::result_of<F(T)>::type {
416  return Reduce(Map(func, args, redfunc, nChunks), redfunc);
417  }
418 
419  //////////////////////////////////////////////////////////////////////////
420  /// "Reduce" an std::vector into a single object in parallel by passing a
421  /// binary operator as the second argument to act on pairs of elements of the std::vector.
422  template<class T, class BINARYOP>
423  auto TThreadExecutor::Reduce(const std::vector<T> &objs, BINARYOP redfunc) -> decltype(redfunc(objs.front(), objs.front()))
424  {
425  // check we can apply reduce to objs
426  static_assert(std::is_same<decltype(redfunc(objs.front(), objs.front())), T>::value, "redfunc does not have the correct signature");
427  return ParallelReduce(objs, redfunc);
428  }
429 
430  //////////////////////////////////////////////////////////////////////////
431  /// "Reduce" an std::vector into a single object by passing a
432  /// function as the second argument defining the reduction operation.
433  template<class T, class R>
434  auto TThreadExecutor::Reduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs))
435  {
436  // check we can apply reduce to objs
437  static_assert(std::is_same<decltype(redfunc(objs)), T>::value, "redfunc does not have the correct signature");
438  return SeqReduce(objs, redfunc);
439  }
440 
441  template<class T, class R>
442  auto TThreadExecutor::SeqReduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs))
443  {
444  return redfunc(objs);
445  }
446 
447 } // namespace ROOT
448 
449 #endif // R__USE_IMT
450 #endif
Returns the available number of logical cores.
Definition: StringConv.hxx:21
TThreadExecutor(UInt_t nThreads=0u)
Class constructor.
double T(double x)
Definition: ChebyshevPol.h:34
auto SeqReduce(const std::vector< T > &objs, R redfunc) -> decltype(redfunc(objs))
void ParallelFor(unsigned start, unsigned end, unsigned step, const std::function< void(unsigned int i)> &f)
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute func (with no arguments) nTimes in parallel.
#define f(i)
Definition: RSha256.hxx:104
This class defines an interface to execute the same task multiple times in parallel, possibly with different arguments every time.
Definition: TExecutor.hxx:61
#define R(a, b, c, d, e, f, g, h, i)
Definition: RSha256.hxx:110
double ParallelReduce(const std::vector< double > &objs, const std::function< double(double a, double b)> &redfunc)
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
Definition: RExports.h:151
auto Reduce(const std::vector< T > &objs, BINARYOP redfunc) -> decltype(redfunc(objs.front(), objs.front()))
"Reduce" an std::vector into a single object in parallel by passing a binary operator as the second a...
This class provides a simple interface to execute the same task multiple times in parallel...
#define F(x, y, z)
TThreadExecutor & operator=(TThreadExecutor &)=delete
auto * a
Definition: textangle.C:12
unsigned int UInt_t
Definition: RtypesCore.h:44
T step() const
Definition: TSeq.hxx:184
A pseudo container class which is a generator of indices.
Definition: TSeq.hxx:66
int type
Definition: TGX11.cxx:120
auto Map(Args &&... args) -> decltype(ROOT::Detail::VecOps::MapFromTuple(std::forward_as_tuple(args...), std::make_index_sequence< sizeof...(args) - 1 >()))
Create new collection applying a callable to the elements of the input collection.
Definition: RVec.hxx:909
iterator end() const
Definition: TSeq.hxx:166
auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of< F()>::type
This method behaves just like Map, but an additional redfunc function must be provided.
auto Map(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute func (with no arguments) nTimes in parallel.
you should not use this method at all Int_t Int_t Double_t Double_t Double_t Int_t Double_t Double_t Double_t Double_t b
Definition: TRolke.cxx:630
std::shared_ptr< ROOT::Internal::TPoolManager > fSched
iterator begin() const
Definition: TSeq.hxx:163