Logo ROOT  
Reference Guide
TProcessExecutor.cxx
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3// Modified: G Ganis Jan 2017
4
5/*************************************************************************
6 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
14
15//////////////////////////////////////////////////////////////////////////
16///
17/// \class ROOT::TProcessExecutor
18/// \ingroup Parallelism
19/// \brief This class provides a simple interface to execute the same task
20/// multiple times in parallel, possibly with different arguments every
21/// time. This mimics the behaviour of python's pool.Map method.
22///
23/// ###ROOT::TProcessExecutor::Map
24/// This class inherits its interfaces from ROOT::TExecutor\n.
25/// The two possible usages of the Map method are:\n
26/// * Map(F func, unsigned nTimes): func is executed nTimes with no arguments
27/// * Map(F func, T& args): func is executed on each element of the collection of arguments args
28///
29/// For either signature, func is executed as many times as needed by a pool of
30/// fNWorkers workers; the number of workers can be passed to the constructor
31/// or set via SetNWorkers. It defaults to the number of cores.\n
32/// A collection containing the result of each execution is returned.\n
33/// **Note:** the user is responsible for the deletion of any object that might
34/// be created upon execution of func, returned objects included: ROOT::TProcessExecutor never
35/// deletes what it returns, it simply forgets it.\n
36/// **Note:** that the usage of ROOT::TProcessExecutor::Map is indicated only when the task to be
37/// executed takes more than a few seconds, otherwise the overhead introduced
38/// by Map will outrun the benefits of parallel execution on most machines.
39///
40/// \param func
41/// \parblock
42/// a lambda expression, an std::function, a loaded macro, a
43/// functor class or a function that takes zero arguments (for the first signature)
44/// or one (for the second signature).
45/// \endparblock
46/// \param args
47/// \parblock
48/// a standard vector, a ROOT::TSeq of integer type or an initializer list for the second signature.
49/// An integer only for the first.
50/// \endparblock
51/// **Note:** in cases where the function to be executed takes more than
52/// zero/one argument but all are fixed except zero/one, the function can be wrapped
53/// in a lambda or via std::bind to give it the right signature.\n
54/// **Note:** the user should take care of initializing random seeds differently in each
55/// process (e.g. using the process id in the seed). Otherwise several parallel executions
56/// might generate the same sequence of pseudo-random numbers.
57///
58/// #### Return value:
59/// An std::vector. The elements in the container
60/// will be the objects returned by func.
61///
62///
63/// #### Examples:
64///
65/// ~~~{.cpp}
66/// root[] ROOT::TProcessExecutor pool; auto hists = pool.Map(CreateHisto, 10);
67/// root[] ROOT::TProcessExecutor pool(2); auto squares = pool.Map([](int a) { return a*a; }, {1,2,3});
68/// ~~~
69///
70/// ###ROOT::TProcessExecutor::MapReduce
71/// This set of methods behaves exactly like Map, but takes an additional
72/// function as a third argument. This function is applied to the set of
73/// objects returned by the corresponding Map execution to "squash" them
74/// to a single object.
75///
76/// ####Examples:
77/// ~~~{.cpp}
78/// root[] ROOT::TProcessExecutor pool; auto ten = pool.MapReduce([]() { return 1; }, 10, [](std::vector<int> v) { return std::accumulate(v.begin(), v.end(), 0); })
79/// root[] ROOT::TProcessExecutor pool; auto hist = pool.MapReduce(CreateAndFillHists, 10, PoolUtils::ReduceObjects);
80/// ~~~
81///
82//////////////////////////////////////////////////////////////////////////
83
84namespace ROOT {
85//////////////////////////////////////////////////////////////////////////
86/// Class constructor.
87/// nWorkers is the number of times this ROOT session will be forked, i.e.
88/// the number of workers that will be spawned.
89TProcessExecutor::TProcessExecutor(unsigned nWorkers) : TMPClient(nWorkers)
90{
91 Reset();
92}
93
94//////////////////////////////////////////////////////////////////////////
95/// Reset TProcessExecutor's state.
97{
98 fNProcessed = 0;
99 fNToProcess = 0;
101}
102
103//////////////////////////////////////////////////////////////////////////
104/// Reply to a worker who just sent a result.
105/// If another argument to process exists, tell the worker. Otherwise
106/// send a shutdown order.
108{
109 if (fNProcessed < fNToProcess) {
110 //this cannot be a "greedy worker" task
111 if (fTaskType == ETask::kMap)
113 else if (fTaskType == ETask::kMapWithArg)
115 ++fNProcessed;
116 } else //whatever the task is, we are done
118}
119
120
121//////////////////////////////////////////////////////////////////////////
122/// Reply to a worker who is idle.
123/// If another argument to process exists, tell the worker. Otherwise
124/// ask for a result
126{
127 if (fNProcessed < fNToProcess) {
128 //we are executing a "greedy worker" task
131 else if (fTaskType == ETask::kMapRed)
133 ++fNProcessed;
134 } else
136}
137
138} // namespace ROOT
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
ETask fTaskType
the kind of task that is being executed, if any
@ kNoTask
no task is being executed
@ kMapWithArg
a Map method with arguments is being executed
@ kMapRed
a MapReduce method with no arguments is being executed
@ kMapRedWithArg
a MapReduce method with arguments is being executed
@ kMap
a Map method with no arguments is being executed
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
unsigned fNProcessed
number of arguments already passed to the workers
TProcessExecutor(unsigned nWorkers=0)
Class constructor.
unsigned fNToProcess
total number of arguments to pass to the workers
void Reset()
Reset TProcessExecutor's state.
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
Base class for multiprocess applications' clients.
Definition: TMPClient.h:23
@ kSendResult
Ask for a kFuncResult/kProcResult.
Definition: MPCode.h:36
@ kExecFuncWithArg
Execute function with the argument contained in the message.
Definition: MPCode.h:32
@ kShutdownOrder
Used by the client to tell servers to shutdown.
Definition: MPCode.h:49
@ kExecFunc
Execute function without arguments.
Definition: MPCode.h:31
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Definition: StringConv.hxx:21
static constexpr double s