Logo ROOT   6.14/05
Reference Guide
TProcessExecutor.hxx
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 // Modified: G Ganis Jan 2017
4 
5 /*************************************************************************
6  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7  * All rights reserved. *
8  * *
9  * For the licensing terms see $ROOTSYS/LICENSE. *
10  * For the list of contributors see $ROOTSYS/README/CREDITS. *
11  *************************************************************************/
12 
13 #ifndef ROOT_TProcessExecutor
14 #define ROOT_TProcessExecutor
15 
16 #include "MPCode.h"
17 #include "MPSendRecv.h"
18 #include "PoolUtils.h"
19 #include "TError.h"
20 #include "TFileCollection.h"
21 #include "TFileInfo.h"
22 #include "THashList.h"
23 #include "TMPClient.h"
24 #include "ROOT/TExecutor.hxx"
25 #include "TMPWorkerExecutor.h"
26 #include <algorithm> //std::generate
27 #include <numeric> //std::iota
28 #include <string>
29 #include <type_traits> //std::result_of, std::enable_if
30 #include <functional> //std::reference_wrapper
31 #include <vector>
32 
33 namespace ROOT {
34 
35 class TProcessExecutor : public TExecutor<TProcessExecutor>, private TMPClient {
36 public:
37  explicit TProcessExecutor(unsigned nWorkers = 0); //default number of workers is the number of processors
38  ~TProcessExecutor() = default;
39  //it doesn't make sense for a TProcessExecutor to be copied
40  TProcessExecutor(const TProcessExecutor &) = delete;
41  TProcessExecutor &operator=(const TProcessExecutor &) = delete;
42 
43  // Map
45  template<class F, class Cond = noReferenceCond<F>>
46  auto Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>;
47  template<class F, class INTEGER, class Cond = noReferenceCond<F, INTEGER>>
49  template<class F, class T, class Cond = noReferenceCond<F, T>>
50  auto Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>;
51 
52  void SetNWorkers(unsigned n) { TMPClient::SetNWorkers(n); }
53  unsigned GetNWorkers() const { return TMPClient::GetNWorkers(); }
54 
56  template<class F, class R, class Cond = noReferenceCond<F>>
57  auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type;
58  template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
59  auto MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type;
60 
62  template<class T, class R> T Reduce(const std::vector<T> &objs, R redfunc);
63 
64 private:
65  template<class T> void Collect(std::vector<T> &reslist);
66  template<class T> void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
67 
68  void Reset();
70  void ReplyToIdle(TSocket *s);
71 
72  unsigned fNProcessed; ///< number of arguments already passed to the workers
73  unsigned fNToProcess; ///< total number of arguments to pass to the workers
74 
75  /// A collection of the types of tasks that TProcessExecutor can execute.
76  /// It is used to interpret in the right way and properly reply to the
77  /// messages received (see, for example, TProcessExecutor::HandleInput)
78  enum class ETask : unsigned char {
79  kNoTask, ///< no task is being executed
80  kMap, ///< a Map method with no arguments is being executed
81  kMapWithArg, ///< a Map method with arguments is being executed
82  kMapRed, ///< a MapReduce method with no arguments is being executed
83  kMapRedWithArg ///< a MapReduce method with arguments is being executed
84  };
85 
86  ETask fTaskType = ETask::kNoTask; ///< the kind of task that is being executed, if any
87 };
88 
89 
90 /************ TEMPLATE METHODS IMPLEMENTATION ******************/
91 
92 //////////////////////////////////////////////////////////////////////////
93 /// Execute func (with no arguments) nTimes in parallel.
94 /// A vector containg executions' results is returned.
95 /// Functions that take more than zero arguments can be executed (with
96 /// fixed arguments) by wrapping them in a lambda or with std::bind.
97 template<class F, class Cond>
99 {
100  using retType = decltype(func());
101  //prepare environment
102  Reset();
104 
105  //fork max(nTimes, fNWorkers) times
106  unsigned oldNWorkers = GetNWorkers();
107  if (nTimes < oldNWorkers)
108  SetNWorkers(nTimes);
109  TMPWorkerExecutor<F> worker(func);
110  bool ok = Fork(worker);
111  SetNWorkers(oldNWorkers);
112  if (!ok)
113  {
114  Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
115  return std::vector<retType>();
116  }
117 
118  //give out tasks
119  fNToProcess = nTimes;
120  std::vector<retType> reslist;
121  reslist.reserve(fNToProcess);
122  fNProcessed = Broadcast(MPCode::kExecFunc, fNToProcess);
123 
124  //collect results, give out other tasks if needed
125  Collect(reslist);
126 
127  //clean-up and return
128  ReapWorkers();
130  return reslist;
131 }
132 
133 //////////////////////////////////////////////////////////////////////////
134 /// Execute func in parallel, taking an element of an
135 /// std::vector as argument.
136 /// A vector containg executions' results is returned.
137 // actual implementation of the Map method. all other calls with arguments eventually
138 // call this one
139 template<class F, class T, class Cond>
141 {
142  //check whether func is callable
143  using retType = decltype(func(args.front()));
144  //prepare environment
145  Reset();
147 
148  //fork max(args.size(), fNWorkers) times
149  //N.B. from this point onwards, args is filled with undefined (but valid) values, since TMPWorkerExecutor moved its content away
150  unsigned oldNWorkers = GetNWorkers();
151  if (args.size() < oldNWorkers)
152  SetNWorkers(args.size());
153  TMPWorkerExecutor<F, T> worker(func, args);
154  bool ok = Fork(worker);
155  SetNWorkers(oldNWorkers);
156  if (!ok)
157  {
158  Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
159  return std::vector<retType>();
160  }
161 
162  //give out tasks
163  fNToProcess = args.size();
164  std::vector<retType> reslist;
165  reslist.reserve(fNToProcess);
166  std::vector<unsigned> range(fNToProcess);
167  std::iota(range.begin(), range.end(), 0);
169 
170  //collect results, give out other tasks if needed
171  Collect(reslist);
172 
173  //clean-up and return
174  ReapWorkers();
176  return reslist;
177 }
178 
179 //////////////////////////////////////////////////////////////////////////
180 /// Execute func in parallel, taking an element of a
181 /// sequence as argument.
182 /// A vector containg executions' results is returned.
183 template<class F, class INTEGER, class Cond>
185 {
186  std::vector<INTEGER> vargs(args.size());
187  std::copy(args.begin(), args.end(), vargs.begin());
188  const auto &reslist = Map(func, vargs);
189  return reslist;
190 }
191 
192 //////////////////////////////////////////////////////////////////////////
193 /// This method behaves just like Map, but an additional redfunc function
194 /// must be provided. redfunc is applied to the vector Map would return and
195 /// must return the same type as func. In practice, redfunc can be used to
196 /// "squash" the vector returned by Map into a single object by merging,
197 /// adding, mixing the elements of the vector.
198 template<class F, class R, class Cond>
199 auto TProcessExecutor::MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type
200 {
201  using retType = decltype(func());
202  //prepare environment
203  Reset();
205 
206  //fork max(nTimes, fNWorkers) times
207  unsigned oldNWorkers = GetNWorkers();
208  if (nTimes < oldNWorkers)
209  SetNWorkers(nTimes);
210  TMPWorkerExecutor<F, void, R> worker(func, redfunc);
211  bool ok = Fork(worker);
212  SetNWorkers(oldNWorkers);
213  if (!ok) {
214  std::cerr << "[E][C] Could not fork. Aborting operation\n";
215  return retType();
216  }
217 
218  //give workers their first task
219  fNToProcess = nTimes;
220  std::vector<retType> reslist;
221  reslist.reserve(fNToProcess);
223 
224  //collect results/give workers their next task
225  Collect(reslist);
226 
227  //clean-up and return
228  ReapWorkers();
230  return redfunc(reslist);
231 }
232 
233 //////////////////////////////////////////////////////////////////////////
234 /// This method behaves just like Map, but an additional redfunc function
235 /// must be provided. redfunc is applied to the vector Map would return and
236 /// must return the same type as func. In practice, redfunc can be used to
237 /// "squash" the vector returned by Map into a single object by merging,
238 /// adding, mixing the elements of the vector.
239 template<class F, class T, class R, class Cond>
240 auto TProcessExecutor::MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type
241 {
242 
243  using retType = decltype(func(args.front()));
244  //prepare environment
245  Reset();
247 
248  //fork max(args.size(), fNWorkers) times
249  unsigned oldNWorkers = GetNWorkers();
250  if (args.size() < oldNWorkers)
251  SetNWorkers(args.size());
252  TMPWorkerExecutor<F, T, R> worker(func, args, redfunc);
253  bool ok = Fork(worker);
254  SetNWorkers(oldNWorkers);
255  if (!ok) {
256  std::cerr << "[E][C] Could not fork. Aborting operation\n";
257  return decltype(func(args.front()))();
258  }
259 
260  //give workers their first task
261  fNToProcess = args.size();
262  std::vector<retType> reslist;
263  reslist.reserve(fNToProcess);
264  std::vector<unsigned> range(fNToProcess);
265  std::iota(range.begin(), range.end(), 0);
267 
268  //collect results/give workers their next task
269  Collect(reslist);
270 
271  ReapWorkers();
273  return Reduce(reslist, redfunc);
274 }
275 
276 //////////////////////////////////////////////////////////////////////////
277 /// "Reduce" an std::vector into a single object by passing a
278 /// function as the second argument defining the reduction operation.
279 template<class T, class R>
280 T TProcessExecutor::Reduce(const std::vector<T> &objs, R redfunc)
281 {
282  // check we can apply reduce to objs
283  static_assert(std::is_same<decltype(redfunc(objs)), T>::value, "redfunc does not have the correct signature");
284  return redfunc(objs);
285 }
286 
287 //////////////////////////////////////////////////////////////////////////
288 /// Handle message and reply to the worker
289 template<class T>
290 void TProcessExecutor::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
291 {
292  unsigned code = msg.first;
293  if (code == MPCode::kFuncResult) {
294  reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
296  } else if (code == MPCode::kIdling) {
297  ReplyToIdle(s);
298  } else if(code == MPCode::kProcResult) {
299  if(msg.second != nullptr)
300  reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
302  } else if(code == MPCode::kProcError) {
303  const char *str = ReadBuffer<const char*>(msg.second.get());
304  Error("TProcessExecutor::HandlePoolCode", "[E][C] a worker encountered an error: %s\n"
305  "Continuing execution ignoring these entries.", str);
306  ReplyToIdle(s);
307  delete [] str;
308  } else {
309  // UNKNOWN CODE
310  Error("TProcessExecutor::HandlePoolCode", "[W][C] unknown code received from server. code=%d", code);
311  }
312 }
313 
314 //////////////////////////////////////////////////////////////////////////
315 /// Listen for messages sent by the workers and call the appropriate handler function.
316 /// TProcessExecutor::HandlePoolCode is called on messages with a code < 1000 and
317 /// TMPClient::HandleMPCode is called on messages with a code >= 1000.
318 template<class T>
319 void TProcessExecutor::Collect(std::vector<T> &reslist)
320 {
321  TMonitor &mon = GetMonitor();
322  mon.ActivateAll();
323  while (mon.GetActive() > 0) {
324  TSocket *s = mon.Select();
325  MPCodeBufPair msg = MPRecv(s);
326  if (msg.first == MPCode::kRecvError) {
327  Error("TProcessExecutor::Collect", "[E][C] Lost connection to a worker");
328  Remove(s);
329  } else if (msg.first < 1000)
330  HandlePoolCode(msg, s, reslist);
331  else
332  HandleMPCode(msg, s);
333  }
334 }
335 
336 } // ROOT namespace
337 
338 #endif
TProcessExecutor(unsigned nWorkers=0)
Class constructor.
void SetNWorkers(unsigned n)
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Definition: TMPClient.cxx:128
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
Namespace for new ROOT classes and functions.
Definition: StringConv.hxx:21
This class works together with TProcessExecutor to allow the execution of functions in server process...
double T(double x)
Definition: ChebyshevPol.h:34
unsigned GetNWorkers() const
Definition: TMPClient.h:40
TProcessExecutor & operator=(const TProcessExecutor &)=delete
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
Error while reading from the socket.
Definition: MPCode.h:51
This class defines an interface to execute the same task multiple times in parallel, possibly with different arguments every time.
Definition: TExecutor.hxx:61
unsigned fNProcessed
number of arguments already passed to the workers
#define R(a, b, c, d, e, f, g, h, i)
Definition: RSha256.hxx:110
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Definition: TMPClient.cxx:295
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
a Map method with arguments is being executed
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
The message contains the result of the processing of a TTree.
Definition: MPCode.h:42
void Reset()
Reset TProcessExecutor&#39;s state.
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
a MapReduce method with arguments is being executed
#define F(x, y, z)
The message contains the result of a function execution.
Definition: MPCode.h:33
This class provides a simple interface to execute the same task multiple times in parallel...
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
Execute function with the argument contained in the message.
Definition: MPCode.h:32
a Map method with no arguments is being executed
ETask fTaskType
the kind of task that is being executed, if any
no task is being executed
Used by the client to tell servers to shutdown.
Definition: MPCode.h:49
unsigned fNToProcess
total number of arguments to pass to the workers
A pseudo container class which is a generator of indices.
Definition: TSeq.hxx:66
int type
Definition: TGX11.cxx:120
Base class for multiprocess applications&#39; clients.
Definition: TMPClient.h:23
static constexpr double s
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
Definition: TMPClient.h:39
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
Definition: TMPClient.cxx:329
a MapReduce method with no arguments is being executed
auto Map(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute func (with no arguments) nTimes in parallel.
auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of< F()>::type
This method behaves just like Map, but an additional redfunc function must be provided.
virtual void ActivateAll()
Activate all de-activated sockets.
Definition: TMonitor.cxx:268
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
Definition: TMPClient.cxx:248
Tell the client there was an error while processing.
Definition: MPCode.h:44
ETask
A collection of the types of tasks that TProcessExecutor can execute.
We are ready for the next task.
Definition: MPCode.h:35
const Int_t n
Definition: legend1.C:16
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
Definition: TMPClient.cxx:308
TMonitor & GetMonitor()
Definition: TMPClient.h:36
unsigned GetNWorkers() const
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
Definition: MPSendRecv.cxx:54
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
void Error(ErrorHandler_t func, int code, const char *va_(fmt),...)
Write error message and call a handler, if required.
Execute function without arguments.
Definition: MPCode.h:31
T Reduce(const std::vector< T > &objs, R redfunc)
"Reduce" an std::vector into a single object by passing a function as the second argument defining th...