ROOT  6.06/09
Reference Guide
TProcPool.cxx
Go to the documentation of this file.
1 #include "TProcPool.h"
2 
3 //////////////////////////////////////////////////////////////////////////
4 ///
5 /// \class TProcPool
6 /// \brief This class provides a simple interface to execute the same task
7 /// multiple times in parallel, possibly with different arguments every
8 /// time. This mimics the behaviour of Python's multiprocessing.Pool.map method.
9 ///
10 /// ###TProcPool::Map
11 /// The two possible usages of the Map method are:\n
12 /// * Map(F func, unsigned nTimes): func is executed nTimes with no arguments
13 /// * Map(F func, T& args): func is executed on each element of the collection of arguments args
14 ///
15 /// For either signature, func is executed as many times as needed by a pool of
16 /// fNWorkers workers; the number of workers can be passed to the constructor
17 /// or set via SetNWorkers. It defaults to the number of cores.\n
18 /// A collection containing the result of each execution is returned.\n
19 /// **Note:** the user is responsible for the deletion of any object that might
20 /// be created upon execution of func, returned objects included: TProcPool never
21 /// deletes what it returns, it simply forgets it.\n
22 /// **Note:** the usage of TProcPool::Map is advisable only when the task to be
23 /// executed takes more than a few seconds; otherwise the overhead introduced
24 /// by Map will outweigh the benefits of parallel execution on most machines.
25 ///
26 /// \param func
27 /// \parblock
28 /// a lambda expression, an std::function, a loaded macro, a
29 /// functor class or a function that takes zero arguments (for the first signature)
30 /// or one (for the second signature).
31 /// \endparblock
32 /// \param args
33 /// \parblock
34 /// a standard container (vector, list, deque), an initializer list
35 /// or a pointer to a TCollection (TList*, TObjArray*, ...).
36 /// \endparblock
37 /// **Note:** the version of TProcPool::Map that takes a TCollection* as argument incurs
38 /// the overhead of copying data from the TCollection to an STL container. Only
39 /// use it when absolutely necessary.\n
40 /// **Note:** in cases where the function to be executed takes more than
41 /// zero/one argument but all are fixed except zero/one, the function can be wrapped
42 /// in a lambda or via std::bind to give it the right signature.\n
43 /// **Note:** the user should take care of initializing random seeds differently in each
44 /// process (e.g. using the process id in the seed). Otherwise several parallel executions
45 /// might generate the same sequence of pseudo-random numbers.
46 ///
47 /// #### Return value:
48 /// If T derives from TCollection, Map returns a TObjArray; otherwise it
49 /// returns an std::vector. In both cases, the elements in the container
50 /// will be the objects returned by func.
51 ///
52 ///
53 /// #### Examples:
54 ///
55 /// ~~~{.cpp}
56 /// root[] TProcPool pool; auto hists = pool.Map(CreateHisto, 10);
57 /// root[] TProcPool pool(2); auto squares = pool.Map([](int a) { return a*a; }, {1,2,3});
58 /// ~~~
59 ///
60 /// ###TProcPool::MapReduce
61 /// This set of methods behaves exactly like Map, but takes an additional
62 /// function as a third argument. This function is applied to the set of
63 /// objects returned by the corresponding Map execution to "squash" them
64 /// to a single object.
65 ///
66 /// ####Examples:
67 /// ~~~{.cpp}
68 /// root[] TProcPool pool; auto ten = pool.MapReduce([]() { return 1; }, 10, [](std::vector<int> v) { return std::accumulate(v.begin(), v.end(), 0); });
69 /// root[] TProcPool pool; auto hist = pool.MapReduce(CreateAndFillHists, 10, PoolUtils::ReduceObjects);
70 /// ~~~
71 ///
72 //////////////////////////////////////////////////////////////////////////
73 
74 
75 //////////////////////////////////////////////////////////////////////////
76 /// Class constructor. Forwards nWorkers to the TMPClient base class and then calls Reset().
77 /// \param nWorkers the number of times this ROOT session will be forked, i.e.
78 /// the number of workers that will be spawned.
79 TProcPool::TProcPool(unsigned nWorkers) : TMPClient(nWorkers)
80 {
81  Reset(); // initialise the pool's bookkeeping (Reset zeroes fNProcessed and fNToProcess)
82 }
83 
84 
85 //////////////////////////////////////////////////////////////////////////
86 /// Reset TProcPool's state.
88 {
89  fNProcessed = 0;
90  fNToProcess = 0;
92 }
93 
94 
95 //////////////////////////////////////////////////////////////////////////
96 /// Reply to a worker who just sent a result.
97 /// If another argument to process exists, tell the worker. Otherwise
98 /// send a shutdown order.
100 {
101  if (fNProcessed < fNToProcess) {
102  //this cannot be a "greedy worker" task
103  if (fTask == ETask::kMap)
105  else if (fTask == ETask::kMapWithArg)
107  ++fNProcessed;
108  } else //whatever the task is, we are done
110 }
111 
112 
113 //////////////////////////////////////////////////////////////////////////
114 /// Reply to a worker who is idle.
115 /// If another argument to process exists, tell the worker. Otherwise
116 /// ask for a result.
118 {
119  if (fNProcessed < fNToProcess) {
120  //we are executing a "greedy worker" task
123  else if (fTask == ETask::kMapRed)
125  else if (fTask == ETask::kProcByRange)
127  else if (fTask == ETask::kProcByFile)
129  ++fNProcessed;
130  } else
132 }
a MapReduce method with no arguments is being executed
Tell a TPoolProcessor which tree and entries range to process. The object sent is a TreeRangeInfo...
Definition: PoolUtils.h:37
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
Definition: TProcPool.cxx:117
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:21
a MapReduce method with arguments is being executed
Tell a TPoolProcessor which tree to process. The object sent is a TreeInfo.
Definition: PoolUtils.h:36
no task is being executed
enum TProcPool::ETask fTask
the kind of task that is being executed, if any
a ProcTree method is being executed and each worker will process a certain range of each file ...
unsigned fNToProcess
total number of arguments to pass to the workers
Definition: TProcPool.h:90
TProcPool(unsigned nWorkers=0)
Class constructor.
Definition: TProcPool.cxx:79
Execute function with the argument contained in the message.
Definition: PoolUtils.h:30
Execute function without arguments.
Definition: PoolUtils.h:29
Used by the client to tell servers to shutdown.
Definition: MPCode.h:32
Ask for a kFuncResult/kProcResult.
Definition: PoolUtils.h:34
Base class for multiprocess applications' clients.
Definition: TMPClient.h:23
a ProcTree method is being executed and each worker will process a different file ...
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
Definition: TProcPool.cxx:99
a Map method with no arguments is being executed
a Map method with arguments is being executed
void Reset()
Reset TProcPool's state.
Definition: TProcPool.cxx:87
unsigned fNProcessed
number of arguments already passed to the workers
Definition: TProcPool.h:89