TThreadExecutor.cxx
// Require TBB without captured exceptions
#define TBB_USE_CAPTURED_EXCEPTION 0

#if !defined(_MSC_VER)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wshadow"
#endif
#include "tbb/tbb.h"
#define TBB_PREVIEW_GLOBAL_CONTROL 1 // required for TBB versions preceding 2019_U4
#include "tbb/global_control.h"
#if !defined(_MSC_VER)
#pragma GCC diagnostic pop
#endif

//////////////////////////////////////////////////////////////////////////
///
/// \class ROOT::TThreadExecutor
/// \brief This class provides a simple interface to execute the same task
/// multiple times in parallel threads, possibly with different arguments every
/// time.
///
/// ### ROOT::TThreadExecutor::Map
/// This class inherits its interfaces from ROOT::TExecutorCRTP\n, adapting them for multithreaded
/// parallelism, and extends them by supporting:
/// * Parallel `Foreach` operations.
/// * Custom task granularity and partial reduction, by specifying a reduction function
/// and the number of chunks as extra parameters for the Map call. This is especially useful
/// to reduce the size of intermediate results when dealing with a sizeable number of elements
/// in the input data.
///
/// The two possible usages of the Map method are:\n
/// * Map(F func, unsigned nTimes): func is executed nTimes with no arguments
/// * Map(F func, T& args): func is executed on each element of the collection of arguments args
///
/// For either signature, func is executed as many times as needed by a pool of
/// nThreads threads, where nThreads typically defaults to the number of cores.\n
/// A collection containing the result of each execution is returned.\n
/// **Note:** the user is responsible for the deletion of any object that might
/// be created upon execution of func, returned objects included: ROOT::TThreadExecutor never
/// deletes what it returns, it simply forgets it.\n
///
/// \param func
/// \parblock
/// a callable object, such as a lambda expression, an std::function, a
/// functor object or a function that takes zero arguments (for the first signature)
/// or one (for the second signature).
/// \endparblock
/// \param args
/// \parblock
/// a standard vector, a ROOT::TSeq of integer type or an initializer list for the second signature.
/// An integer only for the first.
/// \endparblock
/// **Note:** in cases where the function to be executed takes more than
/// zero/one argument but all are fixed except zero/one, the function can be wrapped
/// in a lambda or via std::bind to give it the right signature.\n
///
/// #### Return value:
/// An std::vector. The elements in the container
/// will be the objects returned by func.
///
///
/// #### Examples:
///
/// ~~~{.cpp}
/// root[] ROOT::TThreadExecutor pool; auto hists = pool.Map(CreateHisto, 10);
/// root[] ROOT::TThreadExecutor pool(2); auto squares = pool.Map([](int a) { return a*a; }, {1,2,3});
/// ~~~
///
/// ### ROOT::TThreadExecutor::MapReduce
/// This set of methods behaves exactly like Map, but takes an additional
/// function as a third argument. This function is applied to the set of
/// objects returned by the corresponding Map execution to "squash" them
/// into a single object. Because the number of chunks may be optimized, this
/// function must not depend on the size of the vector returned by Map.
///
/// If this function is a binary operator, the "squashing" will be performed in parallel.
/// This is exclusive to ROOT::TThreadExecutor and is not available in any other
/// ROOT::TExecutorCRTP-derived class.\n
///
/// An integer can be passed as the fourth argument to indicate the number of chunks into which the work is divided.
/// This may be useful to avoid the overhead introduced when running really short tasks.
///
/// #### Examples:
/// ~~~{.cpp}
/// root[] ROOT::TThreadExecutor pool; auto ten = pool.MapReduce([]() { return 1; }, 10, [](const std::vector<int> &v) { return std::accumulate(v.begin(), v.end(), 0); });
/// root[] ROOT::TThreadExecutor pool; auto hist = pool.MapReduce(CreateAndFillHists, 10, PoolUtils::ReduceObjects);
/// ~~~
///
//////////////////////////////////////////////////////////////////////////
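
/*
The chunking and partial reduction described above can be pictured with a
minimal, sequential sketch in plain standard C++. ChunkedMap and
ChunkedMapReduce are hypothetical names introduced only for this illustration:
they are not ROOT functions, they run on a single thread, and the way the
input is split into chunks here is an assumption, not ROOT's actual policy.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <vector>

// Hypothetical helper (not ROOT code): apply func to every element of args,
// chunk by chunk, and squash each chunk's results with redfunc right away.
// The returned vector holds one partial result per chunk, at most nChunks.
template <typename F, typename T, typename R>
auto ChunkedMap(F func, const std::vector<T> &args, R redfunc, std::size_t nChunks)
   -> std::vector<decltype(func(args.front()))>
{
   using Result_t = decltype(func(args.front()));
   std::vector<Result_t> partials;
   if (args.empty())
      return partials;
   nChunks = std::max<std::size_t>(1, std::min(nChunks, args.size()));
   const std::size_t chunkSize = (args.size() + nChunks - 1) / nChunks; // ceiling division
   for (std::size_t begin = 0; begin < args.size(); begin += chunkSize) {
      const std::size_t end = std::min(begin + chunkSize, args.size());
      std::vector<Result_t> mapped;
      for (std::size_t i = begin; i < end; ++i)
         mapped.push_back(func(args[i]));
      partials.push_back(redfunc(mapped)); // partial reduction of one chunk
   }
   return partials;
}

// Hypothetical helper (not ROOT code): reduce the per-chunk partial results
// once more with the same reduction function to obtain a single object.
template <typename F, typename T, typename R>
auto ChunkedMapReduce(F func, const std::vector<T> &args, R redfunc, std::size_t nChunks)
   -> decltype(func(args.front()))
{
   return redfunc(ChunkedMap(func, args, redfunc, nChunks));
}
```

In this sketch the reduction function is called on vectors of different sizes:
once per chunk and once on the vector of partial results. A sum gives the same
answer for any number of chunks, whereas a reduction that depends on the vector
size, such as an average, generally does not.
*/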

/*
VERY IMPORTANT NOTE ABOUT WORK ISOLATION

We enclose the parallel_for and parallel_reduce invocations in a
this_task_arena::isolate because we want to prevent a thread from starting to
execute an outer task while the task it is running has spawned subtasks, e.g.
with a parallel_for, and is waiting for those inner tasks to complete.

While this change has a negligible performance impact, it has benefits for
several applications, for example big parallelised HEP frameworks and
RDataFrame analyses.
- For HEP frameworks, without work isolation, it can happen that a huge
framework task is pulled in by a yielding ROOT task.
This delays the processing of the event that is interrupted by the
long task.
For example, work isolation prevents a very long simulation task from being
pulled in by the idle task during the wait caused by the parallel flushing of
baskets.
- For RDataFrame analyses we want to guarantee that each entry is processed from
the beginning to the end without TBB interrupting it to pull in other work items.
As a corollary, the usage of ROOT (or TBB in work isolation mode) in actions
and transformations guarantees that each entry is processed from the beginning to
the end without being interrupted by the processing of outer tasks.
*/

namespace ROOT {
namespace Internal {

/// A helper function to implement the TThreadExecutor::ParallelReduce methods
template<typename T>
static T ParallelReduceHelper(const std::vector<T> &objs, const std::function<T(T a, T b)> &redfunc)
{
   using BRange_t = tbb::blocked_range<decltype(objs.begin())>;

   auto pred = [redfunc](BRange_t const & range, T init) {
      return std::accumulate(range.begin(), range.end(), init, redfunc);
   };

   BRange_t objRange(objs.begin(), objs.end());

   return tbb::this_task_arena::isolate([&] {
      return tbb::parallel_reduce(objRange, T{}, pred, redfunc);
   });

}

} // End NS Internal
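
/*
The helper above passes T{} to tbb::parallel_reduce as the identity value, so
every sub-range is folded starting from T{} before the partial results are
joined with redfunc. The following is a minimal, sequential sketch of that
pattern in plain standard C++. BlockedReduce and its blockSize parameter are
hypothetical and only stand in for the splitting that TBB performs; this is
not TBB or ROOT code.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <numeric>
#include <vector>

// Hypothetical, sequential stand-in for a blocked reduction: each block is
// folded starting from the identity T{}, then the per-block results are
// joined with the same reduction function.
template <typename T>
T BlockedReduce(const std::vector<T> &objs, const std::function<T(T, T)> &redfunc, std::size_t blockSize)
{
   blockSize = std::max<std::size_t>(1, blockSize);
   T result{}; // the running result also starts from the identity
   for (std::size_t begin = 0; begin < objs.size(); begin += blockSize) {
      const std::size_t end = std::min(begin + blockSize, objs.size());
      // Fold one block, seeded with the identity T{}.
      const T partial = std::accumulate(objs.begin() + begin, objs.begin() + end, T{}, redfunc);
      result = redfunc(result, partial);
   }
   return result;
}
```

Under these assumptions the result only matches a plain fold of the elements
when T{} is a neutral element of redfunc. Addition works as expected, while a
product of doubles collapses to 0 and a maximum over negative values is
clamped to 0.
*/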
138
139//////////////////////////////////////////////////////////////////////////
140/// \brief Class constructor.
141/// If the scheduler is active (e.g. because another TThreadExecutor is in flight, or ROOT::EnableImplicitMT() was
142/// called), work with the current pool of threads.
143/// If not, initialize the pool of threads, spawning nThreads. nThreads' default value, 0, initializes the
144/// pool with as many logical threads as are available in the system (see NLogicalCores in RTaskArenaWrapper.cxx).
145///
146/// At construction time, TThreadExecutor automatically enables ROOT's thread-safety locks as per calling
147/// ROOT::EnableThreadSafety().
TThreadExecutor::TThreadExecutor(UInt_t nThreads)
{
   fTaskArenaW = ROOT::Internal::GetGlobalTaskArena(nThreads);
}

//////////////////////////////////////////////////////////////////////////
/// \brief Execute a function in parallel over the indices of a loop.
///
/// \param start Start index of the loop.
/// \param end End index of the loop.
/// \param step Step size of the loop.
/// \param f function to execute.
void TThreadExecutor::ParallelFor(unsigned start, unsigned end, unsigned step,
                                  const std::function<void(unsigned int i)> &f)
{
   if (GetPoolSize() > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) {
      Warning("TThreadExecutor::ParallelFor",
              "tbb::global_control is limiting the number of parallel workers."
              " Proceeding with %zu threads this time",
              tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism));
   }
   fTaskArenaW->Access().execute([&] {
      tbb::this_task_arena::isolate([&] {
         tbb::parallel_for(start, end, step, f);
      });
   });
}
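
/*
ParallelFor forwards to tbb::parallel_for(start, end, step, f), which invokes f
for the indices start, start + step, ... that are strictly below end, in no
guaranteed order. The following minimal sketch reproduces that index pattern
with std::thread instead of TBB. StridedParallelFor and its nThreads parameter
are hypothetical names used only for illustration, and the way iterations are
assigned to threads is an assumption that does not reflect TBB's scheduler.

```cpp
#include <cassert>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical helper (not ROOT or TBB code): run f(i) for
// i = start, start + step, ... while i < end, spread over nThreads threads.
inline void StridedParallelFor(unsigned start, unsigned end, unsigned step,
                               const std::function<void(unsigned)> &f, unsigned nThreads = 2)
{
   if (step == 0 || start >= end)
      return; // nothing to do
   if (nThreads == 0)
      nThreads = 1;
   const unsigned nIter = (end - start + step - 1) / step; // number of visited indices
   std::vector<std::thread> workers;
   for (unsigned t = 0; t < nThreads; ++t) {
      workers.emplace_back([=, &f] {
         // Worker t handles iterations t, t + nThreads, t + 2 * nThreads, ...
         for (unsigned k = t; k < nIter; k += nThreads)
            f(start + k * step);
      });
   }
   for (auto &w : workers)
      w.join();
}
```

Because several threads may call f at the same time, f should only touch data
that is private to its index or otherwise protected, for example one slot of a
pre-sized vector per index.
*/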

//////////////////////////////////////////////////////////////////////////
/// \brief "Reduce" in parallel an std::vector<double> into a single double value
///
/// \param objs A vector of elements to combine.
/// \param redfunc Reduction function to combine the elements of the vector `objs`.
/// \return A value result of combining the vector elements into a single object of the same type.
double TThreadExecutor::ParallelReduce(const std::vector<double> &objs,
                                       const std::function<double(double a, double b)> &redfunc)
{
   if (GetPoolSize() > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) {
      Warning("TThreadExecutor::ParallelReduce",
              "tbb::global_control is limiting the number of parallel workers."
              " Proceeding with %zu threads this time",
              tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism));
   }
   return fTaskArenaW->Access().execute([&] { return ROOT::Internal::ParallelReduceHelper<double>(objs, redfunc); });
}

//////////////////////////////////////////////////////////////////////////
/// \brief "Reduce" in parallel an std::vector<float> into a single float value
///
/// \param objs A vector of elements to combine.
/// \param redfunc Reduction function to combine the elements of the vector `objs`.
/// \return A value result of combining the vector elements into a single object of the same type.
float TThreadExecutor::ParallelReduce(const std::vector<float> &objs,
                                      const std::function<float(float a, float b)> &redfunc)
{
   if (GetPoolSize() > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) {
      Warning("TThreadExecutor::ParallelReduce",
              "tbb::global_control is limiting the number of parallel workers."
              " Proceeding with %zu threads this time",
              tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism));
   }
   return fTaskArenaW->Access().execute([&] { return ROOT::Internal::ParallelReduceHelper<float>(objs, redfunc); });
}

//////////////////////////////////////////////////////////////////////////
/// \brief Returns the number of worker threads in the task arena.
/// \return the number of worker threads assigned to the task arena.
unsigned TThreadExecutor::GetPoolSize() const
{
   return fTaskArenaW->TaskArenaSize();
}

} // namespace ROOT