TThreadExecutor.cxx
#if !defined(_MSC_VER)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wshadow"
#endif
#include "tbb/tbb.h"
#define TBB_PREVIEW_GLOBAL_CONTROL 1 // required for TBB versions preceding 2019_U4
#include "tbb/global_control.h"
#if !defined(_MSC_VER)
#pragma GCC diagnostic pop
#endif

//////////////////////////////////////////////////////////////////////////
///
/// \class ROOT::TThreadExecutor
/// \ingroup Parallelism
/// \brief This class provides a simple interface to execute the same task
/// multiple times in parallel threads, possibly with different arguments every
/// time.
///
/// ### ROOT::TThreadExecutor::Map
/// This class inherits its interfaces from ROOT::TExecutorCRTP, adapts them for multithreaded
/// parallelism, and extends them with support for:
/// * Parallel `Foreach` operations.
/// * Custom task granularity and partial reduction, by specifying a reduction function
/// and the number of chunks as extra parameters for the Map call. This is especially useful
/// to reduce the size of intermediate results when dealing with a sizeable number of elements
/// in the input data.
///
/// The two possible usages of the Map method are:\n
/// * Map(F func, unsigned nTimes): func is executed nTimes with no arguments
/// * Map(F func, T& args): func is executed on each element of the collection of arguments args
///
/// For either signature, func is executed as many times as needed by a pool of
/// nThreads threads, where nThreads typically defaults to the number of cores.\n
/// A collection containing the result of each execution is returned.\n
/// **Note:** the user is responsible for the deletion of any object that might
/// be created upon execution of func, returned objects included: ROOT::TThreadExecutor never
/// deletes what it returns, it simply forgets it.\n
///
/// \param func
/// \parblock
/// a callable object, such as a lambda expression, an std::function, a
/// functor object or a function that takes zero arguments (for the first signature)
/// or one (for the second signature).
/// \endparblock
/// \param args
/// \parblock
/// a standard vector, a ROOT::TSeq of integer type or an initializer list for the second signature.
/// An integer only for the first.
/// \endparblock
/// **Note:** in cases where the function to be executed takes more than zero (first signature)
/// or one (second signature) argument, but all the additional arguments are fixed, the function
/// can be wrapped in a lambda or via std::bind to give it the right signature.\n
///
/// #### Return value:
/// An std::vector. The elements in the container
/// will be the objects returned by func.
///
///
/// #### Examples:
///
/// ~~~{.cpp}
/// root[] ROOT::TThreadExecutor pool; auto hists = pool.Map(CreateHisto, 10);
/// root[] ROOT::TThreadExecutor pool(2); auto squares = pool.Map([](int a) { return a*a; }, {1,2,3});
/// ~~~
///
/// ### ROOT::TThreadExecutor::MapReduce
/// This set of methods behaves exactly like Map, but takes an additional
/// function as a third argument. This function is applied to the set of
/// objects returned by the corresponding Map execution to "squash" them
/// into a single object. The result of this function should not depend on the size of
/// the vector it receives, because the number of chunks may be optimized.
///
/// If this function is a binary operator, the "squashing" will be performed in parallel.
/// This is exclusive to ROOT::TThreadExecutor and is not available in any other
/// ROOT::TExecutorCRTP-derived class.\n
///
/// An integer can be passed as the fourth argument to indicate the number of chunks into which the work is divided.
/// This may be useful to avoid the overhead introduced when running really short tasks.
///
/// #### Examples:
/// ~~~{.cpp}
/// root[] ROOT::TThreadExecutor pool; auto ten = pool.MapReduce([]() { return 1; }, 10, [](const std::vector<int> &v) { return std::accumulate(v.begin(), v.end(), 0); })
/// root[] ROOT::TThreadExecutor pool; auto hist = pool.MapReduce(CreateAndFillHists, 10, PoolUtils::ReduceObjects);
/// ~~~
///
//////////////////////////////////////////////////////////////////////////

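/*
Illustrative sketch, not part of ROOT. The chunked MapReduce described in the class
documentation above can be modelled sequentially with the standard library alone.
MapReduceChunked, SumInts and CountInts are hypothetical names introduced here, and the
two-level scheme (reduce each chunk, then reduce the partial results) is an assumption
about how partial reduction behaves, not a copy of the TThreadExecutor implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <numeric>
#include <vector>

// Hypothetical helper, NOT part of ROOT: sequential model of a chunked map-reduce.
// func is called once per index in [0, nTimes); redfunc squashes a vector of results.
template <typename T>
T MapReduceChunked(const std::function<T(unsigned)> &func, unsigned nTimes,
                   const std::function<T(const std::vector<T> &)> &redfunc, unsigned nChunks)
{
   assert(nTimes > 0 && nChunks > 0);
   nChunks = std::min(nChunks, nTimes);
   // Ceiling division: every chunk except possibly the last holds `step` elements.
   const unsigned step = (nTimes + nChunks - 1) / nChunks;

   std::vector<T> partial; // one reduced value per chunk
   for (unsigned begin = 0; begin < nTimes; begin += step) {
      const unsigned end = std::min(begin + step, nTimes);
      std::vector<T> chunk;
      for (unsigned i = begin; i < end; ++i)
         chunk.push_back(func(i)); // the "map" step
      partial.push_back(redfunc(chunk)); // partial reduction of this chunk
   }
   return redfunc(partial); // final reduction over the partial results
}

// A reduction whose result does not depend on how the input is split: summing.
inline int SumInts(const std::vector<int> &v)
{
   return std::accumulate(v.begin(), v.end(), 0);
}

// A reduction that depends on the size of the vector it receives: counting elements.
inline int CountInts(const std::vector<int> &v)
{
   return static_cast<int>(v.size());
}
```

With ten mapped values of 1, SumInts yields 10 for any number of chunks, whereas CountInts
only counts the partial results it is handed in the final step (3 when nChunks is 3). This
is why the reduction function should not depend on the size of the vector it receives.
*/
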
/*
VERY IMPORTANT NOTE ABOUT WORK ISOLATION

We enclose the parallel_for and parallel_reduce invocations in a
tbb::this_task_arena::isolate call because we want to prevent a thread from
starting to execute an outer task when the task it's running has spawned subtasks,
e.g. with a parallel_for, and is waiting on inner tasks to be completed.

While this change has a negligible performance impact, it has benefits for
several applications, for example big parallelised HEP frameworks and
RDataFrame analyses.
- For HEP frameworks, without work isolation, it can happen that a huge
framework task is pulled by a yielding ROOT task.
This delays the processing of the event that is interrupted by the
long task.
For example, work isolation prevents a very long simulation task from being
pulled in by the idle task during the wait due to the parallel flushing of baskets.
- For RDataFrame analyses we want to guarantee that each entry is processed from
the beginning to the end without TBB interrupting it to pull in other work items.
As a corollary, the usage of ROOT (or TBB in work isolation mode) in actions
and transformations guarantees that each entry is processed from the beginning to
the end without being interrupted by the processing of outer tasks.
*/

namespace ROOT {
namespace Internal {

/// A helper function to implement the TThreadExecutor::ParallelReduce methods
template<typename T>
static T ParallelReduceHelper(const std::vector<T> &objs, const std::function<T(T a, T b)> &redfunc)
{
   using BRange_t = tbb::blocked_range<decltype(objs.begin())>;

   // Sequentially fold one sub-range of the input, starting from `init`.
   auto pred = [redfunc](BRange_t const & range, T init) {
      return std::accumulate(range.begin(), range.end(), init, redfunc);
   };

   BRange_t objRange(objs.begin(), objs.end());

   // T{} serves as the identity of the reduction: redfunc(T{}, x) is expected to return x.
   return tbb::this_task_arena::isolate([&] {
      return tbb::parallel_reduce(objRange, T{}, pred, redfunc);
   });
}

} // End NS Internal

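/*
Illustrative sketch, not part of ROOT or TBB. The helper above seeds the reduction with a
value-initialized T, so T{} has to act as an identity for redfunc. The sequential model
below uses only the standard library to make that visible. BlockedReduce and blockSize are
hypothetical names; in the real call TBB picks the block boundaries itself and may process
blocks concurrently.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <numeric>
#include <vector>

// Hypothetical, sequential model (NOT the TBB implementation) of a blocked reduction
// that seeds every block with a value-initialized T.
template <typename T>
T BlockedReduce(const std::vector<T> &objs, const std::function<T(T, T)> &redfunc, std::size_t blockSize)
{
   assert(blockSize > 0);
   T result{}; // plays the role of the identity passed to the reduction
   for (std::size_t begin = 0; begin < objs.size(); begin += blockSize) {
      const std::size_t end = std::min(begin + blockSize, objs.size());
      // Fold one block, starting from T{} (0 for arithmetic types).
      const T partial = std::accumulate(objs.begin() + begin, objs.begin() + end, T{}, redfunc);
      // Join the block result with what has been reduced so far.
      result = redfunc(result, partial);
   }
   return result;
}
```

For {1, 2, 3, 4}, a sum reduces to 10 in this model, while a product reduces to 0 because
every block starts from 0. Reductions for which a value-initialized T is not neutral
(products, minima or maxima over values of one sign) therefore need special care.
*/
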
//////////////////////////////////////////////////////////////////////////
/// \brief Class constructor.
/// If the scheduler is active (e.g. because another TThreadExecutor is in flight, or ROOT::EnableImplicitMT() was
/// called), work with the current pool of threads.
/// If not, initialize the pool of threads, spawning nThreads. nThreads' default value, 0, initializes the
/// pool with as many logical threads as are available in the system (see NLogicalCores in RTaskArenaWrapper.cxx).
///
/// At construction time, TThreadExecutor automatically enables ROOT's thread-safety locks, as if by calling
/// ROOT::EnableThreadSafety().
TThreadExecutor::TThreadExecutor(UInt_t nThreads)
{
   fTaskArenaW = ROOT::Internal::GetGlobalTaskArena(nThreads);
}

//////////////////////////////////////////////////////////////////////////
/// \brief Execute a function in parallel over the indices of a loop.
///
/// \param start Start index of the loop.
/// \param end End index of the loop (exclusive).
/// \param step Step size of the loop.
/// \param f Function to execute; it receives the current index.
void TThreadExecutor::ParallelFor(unsigned int start, unsigned int end, unsigned step,
                                  const std::function<void(unsigned int i)> &f)
{
   if (GetPoolSize() > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) {
      Warning("TThreadExecutor::ParallelFor",
              "tbb::global_control is limiting the number of parallel workers."
              " Proceeding with %zu threads this time",
              tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism));
   }
   fTaskArenaW->Access().execute([&] {
      tbb::this_task_arena::isolate([&] {
         tbb::parallel_for(start, end, step, f);
      });
   });
}

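/*
Illustrative sketch, not part of ROOT or TBB. It lists which indices the function passed to
ParallelFor can expect to receive, assuming the half-open range [start, end) and a positive
step used by the index form of tbb::parallel_for. StridedIndices is a hypothetical name.
In the parallel call the same indices are visited in no fixed order and possibly
concurrently, so the function has to be safe to call from several threads.

```cpp
#include <cassert>
#include <vector>

// Hypothetical helper, NOT part of ROOT or TBB: collects the indices visited by a
// strided loop over [start, end). Assumes end + step does not overflow unsigned.
inline std::vector<unsigned> StridedIndices(unsigned start, unsigned end, unsigned step)
{
   assert(step > 0); // a zero step would never terminate
   std::vector<unsigned> indices;
   for (unsigned i = start; i < end; i += step)
      indices.push_back(i);
   return indices;
}
```

For example, StridedIndices(0, 10, 3) contains 0, 3, 6 and 9; the end index itself is never
visited, and an empty range produces no calls at all.
*/
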
//////////////////////////////////////////////////////////////////////////
/// \brief "Reduce" in parallel an std::vector<double> into a single double value
///
/// \param objs A vector of elements to combine.
/// \param redfunc Reduction function to combine the elements of the vector `objs`.
/// \return The result of combining the vector elements into a single value of the same type.
double TThreadExecutor::ParallelReduce(const std::vector<double> &objs,
                                       const std::function<double(double a, double b)> &redfunc)
{
   if (GetPoolSize() > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) {
      Warning("TThreadExecutor::ParallelReduce",
              "tbb::global_control is limiting the number of parallel workers."
              " Proceeding with %zu threads this time",
              tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism));
   }
   return fTaskArenaW->Access().execute([&] { return ROOT::Internal::ParallelReduceHelper<double>(objs, redfunc); });
}

//////////////////////////////////////////////////////////////////////////
/// \brief "Reduce" in parallel an std::vector<float> into a single float value
///
/// \param objs A vector of elements to combine.
/// \param redfunc Reduction function to combine the elements of the vector `objs`.
/// \return The result of combining the vector elements into a single value of the same type.
float TThreadExecutor::ParallelReduce(const std::vector<float> &objs,
                                      const std::function<float(float a, float b)> &redfunc)
{
   if (GetPoolSize() > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) {
      Warning("TThreadExecutor::ParallelReduce",
              "tbb::global_control is limiting the number of parallel workers."
              " Proceeding with %zu threads this time",
              tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism));
   }
   return fTaskArenaW->Access().execute([&] { return ROOT::Internal::ParallelReduceHelper<float>(objs, redfunc); });
}

//////////////////////////////////////////////////////////////////////////
/// \brief Returns the number of worker threads in the task arena.
/// \return the number of worker threads assigned to the task arena.
unsigned TThreadExecutor::GetPoolSize() const
{
   return fTaskArenaW->TaskArenaSize();
}

} // namespace ROOT