Logo ROOT  
Reference Guide
TProcessExecutor.hxx
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 // Modified: G Ganis Jan 2017
4 
5 /*************************************************************************
6  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7  * All rights reserved. *
8  * *
9  * For the licensing terms see $ROOTSYS/LICENSE. *
10  * For the list of contributors see $ROOTSYS/README/CREDITS. *
11  *************************************************************************/
12 
13 #ifndef ROOT_TProcessExecutor
14 #define ROOT_TProcessExecutor
15 
16 #include "MPCode.h"
17 #include "MPSendRecv.h"
18 #include "PoolUtils.h"
19 #include "TError.h"
20 #include "TFileCollection.h"
21 #include "TFileInfo.h"
22 #include "THashList.h"
23 #include "TMPClient.h"
24 #include "ROOT/TExecutor.hxx"
25 #include "TMPWorkerExecutor.h"
26 #include <algorithm> //std::generate
27 #include <numeric> //std::iota
28 #include <string>
29 #include <type_traits> //std::result_of, std::enable_if
30 #include <functional> //std::reference_wrapper
31 #include <vector>
32 
33 namespace ROOT {
34 
35 class TProcessExecutor : public TExecutor<TProcessExecutor>, private TMPClient {
36 public:
37  explicit TProcessExecutor(unsigned nWorkers = 0); //default number of workers is the number of processors
38  ~TProcessExecutor() = default;
39  //it doesn't make sense for a TProcessExecutor to be copied
42 
43  // Map
45  template<class F, class Cond = noReferenceCond<F>>
46  auto Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>;
47  template<class F, class INTEGER, class Cond = noReferenceCond<F, INTEGER>>
48  auto Map(F func, ROOT::TSeq<INTEGER> args) -> std::vector<typename std::result_of<F(INTEGER)>::type>;
49  template<class F, class T, class Cond = noReferenceCond<F, T>>
50  auto Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>;
51 
52  void SetNWorkers(unsigned n) { TMPClient::SetNWorkers(n); }
53  unsigned GetNWorkers() const { return TMPClient::GetNWorkers(); }
54 
56  template<class F, class R, class Cond = noReferenceCond<F>>
57  auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type;
58  template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
59  auto MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type;
60 
62  template<class T, class R> T Reduce(const std::vector<T> &objs, R redfunc);
63 
64 private:
65  template<class T> void Collect(std::vector<T> &reslist);
66  template<class T> void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
67 
68  void Reset();
70  void ReplyToIdle(TSocket *s);
71 
72  unsigned fNProcessed; ///< number of arguments already passed to the workers
73  unsigned fNToProcess; ///< total number of arguments to pass to the workers
74 
75  /// A collection of the types of tasks that TProcessExecutor can execute.
76  /// It is used to interpret in the right way and properly reply to the
77  /// messages received (see, for example, TProcessExecutor::HandleInput)
78  enum class ETask : unsigned char {
79  kNoTask, ///< no task is being executed
80  kMap, ///< a Map method with no arguments is being executed
81  kMapWithArg, ///< a Map method with arguments is being executed
82  kMapRed, ///< a MapReduce method with no arguments is being executed
83  kMapRedWithArg ///< a MapReduce method with arguments is being executed
84  };
85 
86  ETask fTaskType = ETask::kNoTask; ///< the kind of task that is being executed, if any
87 };
88 
89 
90 /************ TEMPLATE METHODS IMPLEMENTATION ******************/
91 
92 //////////////////////////////////////////////////////////////////////////
93 /// Execute func (with no arguments) nTimes in parallel.
94 /// A vector containg executions' results is returned.
95 /// Functions that take more than zero arguments can be executed (with
96 /// fixed arguments) by wrapping them in a lambda or with std::bind.
97 template<class F, class Cond>
98 auto TProcessExecutor::Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>
99 {
100  using retType = decltype(func());
101  //prepare environment
102  Reset();
103  fTaskType = ETask::kMap;
104 
105  //fork max(nTimes, fNWorkers) times
106  unsigned oldNWorkers = GetNWorkers();
107  if (nTimes < oldNWorkers)
108  SetNWorkers(nTimes);
109  TMPWorkerExecutor<F> worker(func);
110  bool ok = Fork(worker);
111  SetNWorkers(oldNWorkers);
112  if (!ok)
113  {
114  Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
115  return std::vector<retType>();
116  }
117 
118  //give out tasks
119  fNToProcess = nTimes;
120  std::vector<retType> reslist;
121  reslist.reserve(fNToProcess);
122  fNProcessed = Broadcast(MPCode::kExecFunc, fNToProcess);
123 
124  //collect results, give out other tasks if needed
125  Collect(reslist);
126 
127  //clean-up and return
128  ReapWorkers();
129  fTaskType = ETask::kNoTask;
130  return reslist;
131 }
132 
133 //////////////////////////////////////////////////////////////////////////
134 /// Execute func in parallel, taking an element of an
135 /// std::vector as argument.
136 /// A vector containg executions' results is returned.
137 // actual implementation of the Map method. all other calls with arguments eventually
138 // call this one
139 template<class F, class T, class Cond>
140 auto TProcessExecutor::Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>
141 {
142  //check whether func is callable
143  using retType = decltype(func(args.front()));
144  //prepare environment
145  Reset();
146  fTaskType = ETask::kMapWithArg;
147 
148  //fork max(args.size(), fNWorkers) times
149  //N.B. from this point onwards, args is filled with undefined (but valid) values, since TMPWorkerExecutor moved its content away
150  unsigned oldNWorkers = GetNWorkers();
151  if (args.size() < oldNWorkers)
152  SetNWorkers(args.size());
153  TMPWorkerExecutor<F, T> worker(func, args);
154  bool ok = Fork(worker);
155  SetNWorkers(oldNWorkers);
156  if (!ok)
157  {
158  Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
159  return std::vector<retType>();
160  }
161 
162  //give out tasks
163  fNToProcess = args.size();
164  std::vector<retType> reslist;
165  reslist.reserve(fNToProcess);
166  std::vector<unsigned> range(fNToProcess);
167  std::iota(range.begin(), range.end(), 0);
168  fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
169 
170  //collect results, give out other tasks if needed
171  Collect(reslist);
172 
173  //clean-up and return
174  ReapWorkers();
175  fTaskType = ETask::kNoTask;
176  return reslist;
177 }
178 
179 //////////////////////////////////////////////////////////////////////////
180 /// Execute func in parallel, taking an element of a
181 /// sequence as argument.
182 /// A vector containg executions' results is returned.
183 template<class F, class INTEGER, class Cond>
184 auto TProcessExecutor::Map(F func, ROOT::TSeq<INTEGER> args) -> std::vector<typename std::result_of<F(INTEGER)>::type>
185 {
186  std::vector<INTEGER> vargs(args.size());
187  std::copy(args.begin(), args.end(), vargs.begin());
188  const auto &reslist = Map(func, vargs);
189  return reslist;
190 }
191 
192 //////////////////////////////////////////////////////////////////////////
193 /// This method behaves just like Map, but an additional redfunc function
194 /// must be provided. redfunc is applied to the vector Map would return and
195 /// must return the same type as func. In practice, redfunc can be used to
196 /// "squash" the vector returned by Map into a single object by merging,
197 /// adding, mixing the elements of the vector.
198 template<class F, class R, class Cond>
199 auto TProcessExecutor::MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type
200 {
201  using retType = decltype(func());
202  //prepare environment
203  Reset();
204  fTaskType= ETask::kMapRed;
205 
206  //fork max(nTimes, fNWorkers) times
207  unsigned oldNWorkers = GetNWorkers();
208  if (nTimes < oldNWorkers)
209  SetNWorkers(nTimes);
210  TMPWorkerExecutor<F, void, R> worker(func, redfunc);
211  bool ok = Fork(worker);
212  SetNWorkers(oldNWorkers);
213  if (!ok) {
214  std::cerr << "[E][C] Could not fork. Aborting operation\n";
215  return retType();
216  }
217 
218  //give workers their first task
219  fNToProcess = nTimes;
220  std::vector<retType> reslist;
221  reslist.reserve(fNToProcess);
222  fNProcessed = Broadcast(MPCode::kExecFunc, fNToProcess);
223 
224  //collect results/give workers their next task
225  Collect(reslist);
226 
227  //clean-up and return
228  ReapWorkers();
229  fTaskType= ETask::kNoTask;
230  return redfunc(reslist);
231 }
232 
233 //////////////////////////////////////////////////////////////////////////
234 /// This method behaves just like Map, but an additional redfunc function
235 /// must be provided. redfunc is applied to the vector Map would return and
236 /// must return the same type as func. In practice, redfunc can be used to
237 /// "squash" the vector returned by Map into a single object by merging,
238 /// adding, mixing the elements of the vector.
239 template<class F, class T, class R, class Cond>
240 auto TProcessExecutor::MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type
241 {
242 
243  using retType = decltype(func(args.front()));
244  //prepare environment
245  Reset();
246  fTaskType= ETask::kMapRedWithArg;
247 
248  //fork max(args.size(), fNWorkers) times
249  unsigned oldNWorkers = GetNWorkers();
250  if (args.size() < oldNWorkers)
251  SetNWorkers(args.size());
252  TMPWorkerExecutor<F, T, R> worker(func, args, redfunc);
253  bool ok = Fork(worker);
254  SetNWorkers(oldNWorkers);
255  if (!ok) {
256  std::cerr << "[E][C] Could not fork. Aborting operation\n";
257  return decltype(func(args.front()))();
258  }
259 
260  //give workers their first task
261  fNToProcess = args.size();
262  std::vector<retType> reslist;
263  reslist.reserve(fNToProcess);
264  std::vector<unsigned> range(fNToProcess);
265  std::iota(range.begin(), range.end(), 0);
266  fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
267 
268  //collect results/give workers their next task
269  Collect(reslist);
270 
271  ReapWorkers();
272  fTaskType= ETask::kNoTask;
273  return Reduce(reslist, redfunc);
274 }
275 
276 //////////////////////////////////////////////////////////////////////////
277 /// "Reduce" an std::vector into a single object by passing a
278 /// function as the second argument defining the reduction operation.
279 template<class T, class R>
280 T TProcessExecutor::Reduce(const std::vector<T> &objs, R redfunc)
281 {
282  // check we can apply reduce to objs
283  static_assert(std::is_same<decltype(redfunc(objs)), T>::value, "redfunc does not have the correct signature");
284  return redfunc(objs);
285 }
286 
287 //////////////////////////////////////////////////////////////////////////
288 /// Handle message and reply to the worker
289 template<class T>
290 void TProcessExecutor::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
291 {
292  unsigned code = msg.first;
293  if (code == MPCode::kFuncResult) {
294  reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
296  } else if (code == MPCode::kIdling) {
297  ReplyToIdle(s);
298  } else if(code == MPCode::kProcResult) {
299  if(msg.second != nullptr)
300  reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
302  } else if(code == MPCode::kProcError) {
303  const char *str = ReadBuffer<const char*>(msg.second.get());
304  Error("TProcessExecutor::HandlePoolCode", "[E][C] a worker encountered an error: %s\n"
305  "Continuing execution ignoring these entries.", str);
306  ReplyToIdle(s);
307  delete [] str;
308  } else {
309  // UNKNOWN CODE
310  Error("TProcessExecutor::HandlePoolCode", "[W][C] unknown code received from server. code=%d", code);
311  }
312 }
313 
314 //////////////////////////////////////////////////////////////////////////
315 /// Listen for messages sent by the workers and call the appropriate handler function.
316 /// TProcessExecutor::HandlePoolCode is called on messages with a code < 1000 and
317 /// TMPClient::HandleMPCode is called on messages with a code >= 1000.
318 template<class T>
319 void TProcessExecutor::Collect(std::vector<T> &reslist)
320 {
321  TMonitor &mon = GetMonitor();
322  mon.ActivateAll();
323  while (mon.GetActive() > 0) {
324  TSocket *s = mon.Select();
325  MPCodeBufPair msg = MPRecv(s);
326  if (msg.first == MPCode::kRecvError) {
327  Error("TProcessExecutor::Collect", "[E][C] Lost connection to a worker");
328  Remove(s);
329  } else if (msg.first < 1000)
330  HandlePoolCode(msg, s, reslist);
331  else
332  HandleMPCode(msg, s);
333  }
334 }
335 
336 } // ROOT namespace
337 
338 #endif
MPCode::kProcResult
@ kProcResult
The message contains the result of the processing of a TTree.
Definition: MPCode.h:42
n
const Int_t n
Definition: legend1.C:16
ROOT::TProcessExecutor::HandlePoolCode
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
Definition: TProcessExecutor.hxx:290
ROOT::TProcessExecutor::ETask
ETask
A collection of the types of tasks that TProcessExecutor can execute.
Definition: TProcessExecutor.hxx:78
MPSend
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
ROOT::TProcessExecutor::MapReduce
auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of< F()>::type
This method behaves just like Map, but an additional redfunc function must be provided.
Definition: TProcessExecutor.hxx:199
TMPClient.h
ROOT::TExecutor
This class defines an interface to execute the same task multiple times in parallel,...
Definition: TExecutor.hxx:61
F
#define F(x, y, z)
ROOT::TProcessExecutor::SetNWorkers
void SetNWorkers(unsigned n)
Definition: TProcessExecutor.hxx:52
ROOT::TProcessExecutor::ReplyToIdle
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
Definition: TProcessExecutor.cxx:125
MPCode::kIdling
@ kIdling
We are ready for the next task.
Definition: MPCode.h:35
TMPClient::Remove
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Definition: TMPClient.cxx:300
ROOT::TProcessExecutor::Reset
void Reset()
Reset TProcessExecutor's state.
Definition: TProcessExecutor.cxx:96
MPCode.h
TGeant4Unit::s
static constexpr double s
Definition: TGeant4SystemOfUnits.h:162
ROOT::TProcessExecutor::ReplyToFuncResult
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
Definition: TProcessExecutor.cxx:107
ROOT::TProcessExecutor::Reduce
T Reduce(const std::vector< T > &objs, R redfunc)
"Reduce" an std::vector into a single object by passing a function as the second argument defining th...
Definition: TProcessExecutor.hxx:280
TMonitor
Definition: TMonitor.h:36
TMPWorkerExecutor
This class works together with TProcessExecutor to allow the execution of functions in server process...
Definition: TMPWorkerExecutor.h:77
TMPClient::GetMonitor
TMonitor & GetMonitor()
Definition: TMPClient.h:36
TMPClient::HandleMPCode
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
Definition: TMPClient.cxx:334
ROOT::TProcessExecutor::TProcessExecutor
TProcessExecutor(const TProcessExecutor &)=delete
TClassEdit::kMap
@ kMap
Definition: TClassEdit.h:98
R
#define R(a, b, c, d, e, f, g, h, i)
Definition: RSha256.hxx:110
TSocket
Definition: TSocket.h:41
ROOT::TProcessExecutor::fNProcessed
unsigned fNProcessed
number of arguments already passed to the workers
Definition: TProcessExecutor.hxx:72
ROOT::TProcessExecutor::ETask::kMapRed
@ kMapRed
a MapReduce method with no arguments is being executed
MPCode::kExecFunc
@ kExecFunc
Execute function without arguments.
Definition: MPCode.h:31
ROOT::TProcessExecutor::ETask::kMapWithArg
@ kMapWithArg
a Map method with arguments is being executed
TFileInfo.h
ROOT::TProcessExecutor::fTaskType
ETask fTaskType
the kind of task that is being executed, if any
Definition: TProcessExecutor.hxx:86
TMPWorkerExecutor< F, void, R >
Definition: TMPWorkerExecutor.h:127
ROOT::TProcessExecutor::fNToProcess
unsigned fNToProcess
total number of arguments to pass to the workers
Definition: TProcessExecutor.hxx:73
TMonitor::Select
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
TMPClient::GetNWorkers
unsigned GetNWorkers() const
Definition: TMPClient.h:40
ROOT::TProcessExecutor::ETask::kMap
@ kMap
a Map method with no arguments is being executed
TFileCollection.h
PoolUtils.h
ROOT::TProcessExecutor::Map
auto Map(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute func (with no arguments) nTimes in parallel.
Definition: TProcessExecutor.hxx:98
MPCode::kProcError
@ kProcError
Tell the client there was an error while processing.
Definition: MPCode.h:44
THashList.h
TMPClient
Base class for multiprocess applications' clients.
Definition: TMPClient.h:23
ROOT::TProcessExecutor::Collect
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
Definition: TProcessExecutor.hxx:319
MPCode::kRecvError
@ kRecvError
Error while reading from the socket.
Definition: MPCode.h:51
ROOT::TProcessExecutor
This class provides a simple interface to execute the same task multiple times in parallel,...
Definition: TProcessExecutor.hxx:35
ROOT::TProcessExecutor::GetNWorkers
unsigned GetNWorkers() const
Definition: TProcessExecutor.hxx:53
MPRecv
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
Definition: MPSendRecv.cxx:54
MPCode::kFuncResult
@ kFuncResult
The message contains the result of a function execution.
Definition: MPCode.h:33
ROOT::TProcessExecutor::ETask::kMapRedWithArg
@ kMapRedWithArg
a MapReduce method with arguments is being executed
TExecutor.hxx
ROOT::TProcessExecutor::ETask::kNoTask
@ kNoTask
no task is being executed
ROOT::TProcessExecutor::TProcessExecutor
TProcessExecutor(unsigned nWorkers=0)
Class constructor.
Definition: TProcessExecutor.cxx:89
TMPClient::SetNWorkers
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
Definition: TMPClient.h:39
ROOT::Math::Chebyshev::T
double T(double x)
Definition: ChebyshevPol.h:34
MPCode::kShutdownOrder
@ kShutdownOrder
Used by the client to tell servers to shutdown.
Definition: MPCode.h:49
TMonitor::GetActive
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
MPCodeBufPair
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:32
MPSendRecv.h
TMonitor::ActivateAll
virtual void ActivateAll()
Activate all de-activated sockets.
Definition: TMonitor.cxx:268
ROOT::TSeq
A pseudo container class which is a generator of indices.
Definition: TSeq.hxx:66
type
int type
Definition: TGX11.cxx:121
TMPWorkerExecutor.h
ROOT::VecOps::Map
auto Map(Args &&... args) -> decltype(ROOT::Detail::VecOps::MapFromTuple(std::forward_as_tuple(args...), std::make_index_sequence< sizeof...(args) - 1 >()))
Create new collection applying a callable to the elements of the input collection.
Definition: RVec.hxx:909
ROOT
VSD Structures.
Definition: StringConv.hxx:21
ROOT::TProcessExecutor::~TProcessExecutor
~TProcessExecutor()=default
MPCode::kExecFuncWithArg
@ kExecFuncWithArg
Execute function with the argument contained in the message.
Definition: MPCode.h:32
Error
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition: TError.cxx:187
TError.h
ROOT::TProcessExecutor::operator=
TProcessExecutor & operator=(const TProcessExecutor &)=delete