Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TProcessExecutor.hxx
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3// Modified: G Ganis Jan 2017
4
5/*************************************************************************
6 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13#ifndef ROOT_TProcessExecutor
14#define ROOT_TProcessExecutor
15
16#include "MPCode.h"
17#include "MPSendRecv.h"
18#include "PoolUtils.h"
19#include "ROOT/RConfig.hxx" //R__DEPRECATED
21#include "ROOT/TSeq.hxx"
22#include "TError.h"
23#include "TFileCollection.h"
24#include "TFileInfo.h"
25#include "THashList.h"
26#include "TMPClient.h"
27#include "TMPWorkerExecutor.h"
28
29#include <algorithm> //std::generate
30#include <numeric> //std::iota
31#include <string>
32#include <type_traits> //std::result_of, std::enable_if
33#include <functional> //std::reference_wrapper
34#include <vector>
35
36namespace ROOT {
37
38class TProcessExecutor : public TExecutorCRTP<TProcessExecutor>, private TMPClient {
40public:
41 explicit TProcessExecutor(unsigned nWorkers = 0); //default number of workers is the number of processors
42 ~TProcessExecutor() = default;
43 //it doesn't make sense for a TProcessExecutor to be copied
46
47 // Map
48 //
50
51 // MapReduce
52 // Redefinition of the MapReduce classes of the base class, to adapt them to
53 // TProcessExecutor's logic
55 template<class F, class R, class Cond = noReferenceCond<F>>
56 auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type;
57 template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
58 auto MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type;
59 template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
60 auto MapReduce(F func, const std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type;
61
62 // Reduce
63 //
65
67
68 /// \deprecated Use GetPoolSize()
69 unsigned GetNWorkers() const _R__DEPRECATED_626("Please use GetPoolSize instead") { return TMPClient::GetNWorkers(); }
70
71 //////////////////////////////////////////////////////////////////////////
72 /// \brief Return the number of pooled parallel workers.
73 ///
74 /// \return The number of workers in the pool.
75 unsigned GetPoolSize() const { return TMPClient::GetNWorkers(); }
76
77private:
78 // Implementation of the Map functions declared in the parent class (TExecutorCRTP)
79 //
80 template<class F, class Cond = noReferenceCond<F>>
81 auto MapImpl(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>;
82 template<class F, class INTEGER, class Cond = noReferenceCond<F, INTEGER>>
83 auto MapImpl(F func, ROOT::TSeq<INTEGER> args) -> std::vector<typename std::result_of<F(INTEGER)>::type>;
84 template<class F, class T, class Cond = noReferenceCond<F, T>>
85 auto MapImpl(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>;
86 template<class F, class T, class Cond = noReferenceCond<F, T>>
87 auto MapImpl(F func, const std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>;
88
89 template<class T> void Collect(std::vector<T> &reslist);
90 template<class T> void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
91
92 void Reset();
94 void ReplyToIdle(TSocket *s);
95
96 unsigned fNProcessed; ///< number of arguments already passed to the workers
97 unsigned fNToProcess; ///< total number of arguments to pass to the workers
98
99 /// A collection of the types of tasks that TProcessExecutor can execute.
100 /// It is used to interpret in the right way and properly reply to the
101 /// messages received (see, for example, TProcessExecutor::HandleInput)
102 enum class ETask : unsigned char {
103 kNoTask, ///< no task is being executed
104 kMap, ///< a Map method with no arguments is being executed
105 kMapWithArg, ///< a Map method with arguments is being executed
106 kMapRed, ///< a MapReduce method with no arguments is being executed
107 kMapRedWithArg ///< a MapReduce method with arguments is being executed
108 };
109
110 ETask fTaskType = ETask::kNoTask; ///< the kind of task that is being executed, if any
111};
112
113
114/************ TEMPLATE METHODS IMPLEMENTATION ******************/
115
116//////////////////////////////////////////////////////////////////////////
117/// \brief Execute a function without arguments several times in parallel.
118/// Implementation of the Map method.
119///
120/// \copydetails TExecutorCRTP::Map(F func,unsigned nTimes)
121template<class F, class Cond>
122auto TProcessExecutor::MapImpl(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>
123{
124 using retType = decltype(func());
125 //prepare environment
126 Reset();
127 fTaskType = ETask::kMap;
128
129 //fork max(nTimes, fNWorkers) times
130 unsigned oldNWorkers = GetPoolSize();
131 if (nTimes < oldNWorkers)
132 SetNWorkers(nTimes);
133 TMPWorkerExecutor<F> worker(func);
134 bool ok = Fork(worker);
135 SetNWorkers(oldNWorkers);
136 if (!ok)
137 {
138 Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
139 return std::vector<retType>();
140 }
141
142 //give out tasks
143 fNToProcess = nTimes;
144 std::vector<retType> reslist;
145 reslist.reserve(fNToProcess);
146 fNProcessed = Broadcast(MPCode::kExecFunc, fNToProcess);
147
148 //collect results, give out other tasks if needed
149 Collect(reslist);
150
151 //clean-up and return
152 ReapWorkers();
153 fTaskType = ETask::kNoTask;
154 return reslist;
155}
156
157//////////////////////////////////////////////////////////////////////////
158/// \brief Execute a function over the elements of a vector in parallel
159/// Implementation of the Map method.
160///
161/// \copydetails TExecutorCRTP::Map(F func,std::vector<T> &args)
162template<class F, class T, class Cond>
163auto TProcessExecutor::MapImpl(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>
164{
165 //check whether func is callable
166 using retType = decltype(func(args.front()));
167 //prepare environment
168 Reset();
169 fTaskType = ETask::kMapWithArg;
170
171 //fork max(args.size(), fNWorkers) times
172 //N.B. from this point onwards, args is filled with undefined (but valid) values, since TMPWorkerExecutor moved its content away
173 unsigned oldNWorkers = GetPoolSize();
174 if (args.size() < oldNWorkers)
175 SetNWorkers(args.size());
176 TMPWorkerExecutor<F, T> worker(func, args);
177 bool ok = Fork(worker);
178 SetNWorkers(oldNWorkers);
179 if (!ok)
180 {
181 Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
182 return std::vector<retType>();
183 }
184
185 //give out tasks
186 fNToProcess = args.size();
187 std::vector<retType> reslist;
188 reslist.reserve(fNToProcess);
189 std::vector<unsigned> range(fNToProcess);
190 std::iota(range.begin(), range.end(), 0);
191 fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
192
193 //collect results, give out other tasks if needed
194 Collect(reslist);
195
196 //clean-up and return
197 ReapWorkers();
198 fTaskType = ETask::kNoTask;
199 return reslist;
200}
201
202//////////////////////////////////////////////////////////////////////////
203/// \brief Execute a function over the elements of an immutable vector in parallel
204/// Implementation of the Map method.
205///
206/// \copydetails TExecutorCRTP::Map(F func,const std::vector<T> &args)
207template<class F, class T, class Cond>
208auto TProcessExecutor::MapImpl(F func, const std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>
209{
210 //check whether func is callable
211 using retType = decltype(func(args.front()));
212 //prepare environment
213 Reset();
214 fTaskType = ETask::kMapWithArg;
215
216 //fork max(args.size(), fNWorkers) times
217 //N.B. from this point onwards, args is filled with undefined (but valid) values, since TMPWorkerExecutor moved its content away
218 unsigned oldNWorkers = GetPoolSize();
219 if (args.size() < oldNWorkers)
220 SetNWorkers(args.size());
221 TMPWorkerExecutor<F, T> worker(func, args);
222 bool ok = Fork(worker);
223 SetNWorkers(oldNWorkers);
224 if (!ok)
225 {
226 Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
227 return std::vector<retType>();
228 }
229
230 //give out tasks
231 fNToProcess = args.size();
232 std::vector<retType> reslist;
233 reslist.reserve(fNToProcess);
234 std::vector<unsigned> range(fNToProcess);
235 std::iota(range.begin(), range.end(), 0);
236 fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
237
238 //collect results, give out other tasks if needed
239 Collect(reslist);
240
241 //clean-up and return
242 ReapWorkers();
243 fTaskType = ETask::kNoTask;
244 return reslist;
245}
246
247//////////////////////////////////////////////////////////////////////////
248/// \brief Execute a function over a sequence of indexes in parallel.
249/// Implementation of the Map method.
250///
251/// \copydetails TExecutorCRTP::Map(F func,ROOT::TSeq<INTEGER> args)
252template<class F, class INTEGER, class Cond>
253auto TProcessExecutor::MapImpl(F func, ROOT::TSeq<INTEGER> args) -> std::vector<typename std::result_of<F(INTEGER)>::type>
254{
255 std::vector<INTEGER> vargs(args.size());
256 std::copy(args.begin(), args.end(), vargs.begin());
257 const auto &reslist = Map(func, vargs);
258 return reslist;
259}
260
261//////////////////////////////////////////////////////////////////////////
262/// \brief Execute a function `nTimes` in parallel (Map) and accumulate the results into a single value (Reduce).
263/// \copydetails ROOT::Internal::TExecutor::MapReduce(F func,unsigned nTimes,R redfunc)
264template<class F, class R, class Cond>
265auto TProcessExecutor::MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type
266{
267 using retType = decltype(func());
268 //prepare environment
269 Reset();
270 fTaskType= ETask::kMapRed;
271
272 //fork max(nTimes, fNWorkers) times
273 unsigned oldNWorkers = GetPoolSize();
274 if (nTimes < oldNWorkers)
275 SetNWorkers(nTimes);
276 TMPWorkerExecutor<F, void, R> worker(func, redfunc);
277 bool ok = Fork(worker);
278 SetNWorkers(oldNWorkers);
279 if (!ok) {
280 std::cerr << "[E][C] Could not fork. Aborting operation\n";
281 return retType();
282 }
283
284 //give workers their first task
285 fNToProcess = nTimes;
286 std::vector<retType> reslist;
287 reslist.reserve(fNToProcess);
288 fNProcessed = Broadcast(MPCode::kExecFunc, fNToProcess);
289
290 //collect results/give workers their next task
291 Collect(reslist);
292
293 //clean-up and return
294 ReapWorkers();
295 fTaskType= ETask::kNoTask;
296 return redfunc(reslist);
297}
298
299//////////////////////////////////////////////////////////////////////////
300/// \brief Execute a function in parallel over the elements of a vector (Map) and accumulate the results into a single value (Reduce).
301/// Benefits from partial reduction into `nChunks` intermediate results.
302///
303/// \copydetails ROOT::Internal::TExecutor::MapReduce(F func,std::vector<T> &args,R redfunc,unsigned nChunks).
304template<class F, class T, class R, class Cond>
305auto TProcessExecutor::MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type
306{
307
308 using retType = decltype(func(args.front()));
309 //prepare environment
310 Reset();
311 fTaskType= ETask::kMapRedWithArg;
312
313 //fork max(args.size(), fNWorkers) times
314 unsigned oldNWorkers = GetPoolSize();
315 if (args.size() < oldNWorkers)
316 SetNWorkers(args.size());
317 TMPWorkerExecutor<F, T, R> worker(func, args, redfunc);
318 bool ok = Fork(worker);
319 SetNWorkers(oldNWorkers);
320 if (!ok) {
321 std::cerr << "[E][C] Could not fork. Aborting operation\n";
322 return decltype(func(args.front()))();
323 }
324
325 //give workers their first task
326 fNToProcess = args.size();
327 std::vector<retType> reslist;
328 reslist.reserve(fNToProcess);
329 std::vector<unsigned> range(fNToProcess);
330 std::iota(range.begin(), range.end(), 0);
331 fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
332
333 //collect results/give workers their next task
334 Collect(reslist);
335
336 ReapWorkers();
337 fTaskType= ETask::kNoTask;
338 return Reduce(reslist, redfunc);
339}
340
341//////////////////////////////////////////////////////////////////////////
342/// \brief Execute a function in parallel over the elements of an immutable vector (Map) and accumulate the results into a single value (Reduce).
343/// Benefits from partial reduction into `nChunks` intermediate results.
344///
345/// \copydetails ROOT::Internal::TExecutor::MapReduce(F func,const std::vector<T> &args,R redfunc,unsigned nChunks).
346template<class F, class T, class R, class Cond>
347auto TProcessExecutor::MapReduce(F func, const std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type
348{
349
350 using retType = decltype(func(args.front()));
351 //prepare environment
352 Reset();
353 fTaskType= ETask::kMapRedWithArg;
354
355 //fork max(args.size(), fNWorkers) times
356 unsigned oldNWorkers = GetPoolSize();
357 if (args.size() < oldNWorkers)
358 SetNWorkers(args.size());
359 TMPWorkerExecutor<F, T, R> worker(func, args, redfunc);
360 bool ok = Fork(worker);
361 SetNWorkers(oldNWorkers);
362 if (!ok) {
363 std::cerr << "[E][C] Could not fork. Aborting operation\n";
364 return decltype(func(args.front()))();
365 }
366
367 //give workers their first task
368 fNToProcess = args.size();
369 std::vector<retType> reslist;
370 reslist.reserve(fNToProcess);
371 std::vector<unsigned> range(fNToProcess);
372 std::iota(range.begin(), range.end(), 0);
373 fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
374
375 //collect results/give workers their next task
376 Collect(reslist);
377
378 ReapWorkers();
379 fTaskType= ETask::kNoTask;
380 return Reduce(reslist, redfunc);
381}
382
383//////////////////////////////////////////////////////////////////////////
384/// Handle message and reply to the worker
385template<class T>
386void TProcessExecutor::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
387{
388 unsigned code = msg.first;
389 if (code == MPCode::kFuncResult) {
390 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
392 } else if (code == MPCode::kIdling) {
393 ReplyToIdle(s);
394 } else if(code == MPCode::kProcResult) {
395 if(msg.second != nullptr)
396 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
398 } else if(code == MPCode::kProcError) {
399 const char *str = ReadBuffer<const char*>(msg.second.get());
400 Error("TProcessExecutor::HandlePoolCode", "[E][C] a worker encountered an error: %s\n"
401 "Continuing execution ignoring these entries.", str);
402 ReplyToIdle(s);
403 delete [] str;
404 } else {
405 // UNKNOWN CODE
406 Error("TProcessExecutor::HandlePoolCode", "[W][C] unknown code received from server. code=%d", code);
407 }
408}
409
410//////////////////////////////////////////////////////////////////////////
411/// Listen for messages sent by the workers and call the appropriate handler function.
412/// TProcessExecutor::HandlePoolCode is called on messages with a code < 1000 and
413/// TMPClient::HandleMPCode is called on messages with a code >= 1000.
414template<class T>
415void TProcessExecutor::Collect(std::vector<T> &reslist)
416{
417 TMonitor &mon = GetMonitor();
418 mon.ActivateAll();
419 while (mon.GetActive() > 0) {
420 TSocket *s = mon.Select();
421 MPCodeBufPair msg = MPRecv(s);
422 if (msg.first == MPCode::kRecvError) {
423 Error("TProcessExecutor::Collect", "[E][C] Lost connection to a worker");
424 Remove(s);
425 } else if (msg.first < 1000)
426 HandlePoolCode(msg, s, reslist);
427 else
428 HandleMPCode(msg, s);
429 }
430}
431
432} // ROOT namespace
433
434#endif
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition MPSendRecv.h:32
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
#define _R__DEPRECATED_626(REASON)
Definition RConfig.hxx:512
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:187
int type
Definition TGX11.cxx:121
This class defines an interface to execute the same task multiple times, possibly in parallel and wit...
auto Map(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute a function without arguments several times.
T * Reduce(const std::vector< T * > &mergeObjs)
"Reduce" an std::vector into a single object by using the object's Merge method.
This class provides a simple interface to execute the same task multiple times in parallel,...
ETask fTaskType
the kind of task that is being executed, if any
unsigned GetPoolSize() const
Return the number of pooled parallel workers.
ETask
A collection of the types of tasks that TProcessExecutor can execute.
@ kNoTask
no task is being executed
@ kMapWithArg
a Map method with arguments is being executed
@ kMapRed
a MapReduce method with no arguments is being executed
@ kMapRedWithArg
a MapReduce method with arguments is being executed
@ kMap
a Map method with no arguments is being executed
TProcessExecutor & operator=(const TProcessExecutor &)=delete
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
unsigned fNProcessed
number of arguments already passed to the workers
auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of< F()>::type
Execute a function nTimes in parallel (Map) and accumulate the results into a single value (Reduce).
auto MapImpl(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute a function without arguments several times in parallel.
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
unsigned GetNWorkers() const _R__DEPRECATED_626("Please use GetPoolSize instead")
TProcessExecutor(const TProcessExecutor &)=delete
unsigned fNToProcess
total number of arguments to pass to the workers
void Reset()
Reset TProcessExecutor's state.
void SetNWorkers(unsigned n)
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
A pseudo container class which is a generator of indices.
Definition TSeq.hxx:66
Base class for multiprocess applications' clients.
Definition TMPClient.h:23
unsigned GetNWorkers() const
Definition TMPClient.h:40
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
Definition TMPClient.h:39
TMonitor & GetMonitor()
Definition TMPClient.h:36
void Remove(TSocket *s)
Remove a certain socket from the monitor.
This class works together with TProcessExecutor to allow the execution of functions in server process...
virtual void ActivateAll()
Activate all de-activated sockets.
Definition TMonitor.cxx:268
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition TMonitor.cxx:322
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition TMonitor.cxx:438
const Int_t n
Definition legend1.C:16
#define F(x, y, z)
@ kRecvError
Error while reading from the socket.
Definition MPCode.h:51
@ kIdling
We are ready for the next task.
Definition MPCode.h:35
@ kFuncResult
The message contains the result of a function execution.
Definition MPCode.h:33
@ kExecFuncWithArg
Execute function with the argument contained in the message.
Definition MPCode.h:32
@ kShutdownOrder
Used by the client to tell servers to shutdown.
Definition MPCode.h:49
@ kProcError
Tell the client there was an error while processing.
Definition MPCode.h:44
@ kExecFunc
Execute function without arguments.
Definition MPCode.h:31
@ kProcResult
The message contains the result of the processing of a TTree.
Definition MPCode.h:42
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...