Logo ROOT   6.12/07
Reference Guide
TProcessExecutor.hxx
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 // Modified: G Ganis Jan 2017
4 
5 /*************************************************************************
6  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7  * All rights reserved. *
8  * *
9  * For the licensing terms see $ROOTSYS/LICENSE. *
10  * For the list of contributors see $ROOTSYS/README/CREDITS. *
11  *************************************************************************/
12 
13 #ifndef ROOT_TProcessExecutor
14 #define ROOT_TProcessExecutor
15 
16 #include "MPCode.h"
17 #include "MPSendRecv.h"
18 #include "PoolUtils.h"
19 #include "TChain.h"
20 #include "TChainElement.h"
21 #include "TError.h"
22 #include "TFileCollection.h"
23 #include "TFileInfo.h"
24 #include "THashList.h"
25 #include "TMPClient.h"
26 #include "ROOT/TExecutor.hxx"
27 #include "TMPWorkerExecutor.h"
28 #include <algorithm> //std::generate
29 #include <numeric> //std::iota
30 #include <string>
31 #include <type_traits> //std::result_of, std::enable_if
32 #include <functional> //std::reference_wrapper
33 #include <vector>
34 
35 namespace ROOT {
36 
/// Executor exposing Map / MapReduce / Reduce entry points whose work is handed to worker
/// processes created with Fork(). It derives publicly from TExecutor<TProcessExecutor>
/// and privately from TMPClient, whose SetNWorkers/GetNWorkers are re-exposed below.
// NOTE(review): this listing skips source lines 46, 50, 57, 63 and 71 of the class body,
// so some declarations are not visible here -- confirm against the upstream header.
37 class TProcessExecutor : public TExecutor<TProcessExecutor>, private TMPClient {
38 public:
39  explicit TProcessExecutor(unsigned nWorkers = 0); //default number of workers is the number of processors
40  ~TProcessExecutor() = default;
41  //it doesn't make sense for a TProcessExecutor to be copied
42  TProcessExecutor(const TProcessExecutor &) = delete;
43  TProcessExecutor &operator=(const TProcessExecutor &) = delete;
44 
45  // Map
 // Overload executing func() nTimes; returns one result per execution.
47  template<class F, class Cond = noReferenceCond<F>>
48  auto Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>;
 // NOTE(review): the template header below has lost its declaration (source line 50 is missing
 // from the listing); presumably it belongs to the Map overload that takes a sequence, which is
 // defined later in this file -- confirm against the upstream header.
49  template<class F, class INTEGER, class Cond = noReferenceCond<F, INTEGER>>
 // Overload executing func once per element of args; returns one result per element.
51  template<class F, class T, class Cond = noReferenceCond<F, T>>
52  auto Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>;
53 
 // Forwarders to the privately inherited TMPClient accessors.
54  void SetNWorkers(unsigned n) { TMPClient::SetNWorkers(n); }
55  unsigned GetNWorkers() const { return TMPClient::GetNWorkers(); }
56 
 // MapReduce: same inputs as Map plus a reduction callable; returns a single value.
58  template<class F, class R, class Cond = noReferenceCond<F>>
59  auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type;
60  template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
61  auto MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type;
62 
 // Applies redfunc to the whole vector and returns its result (statically checked to be T).
64  template<class T, class R> T Reduce(const std::vector<T> &objs, R redfunc);
65 
66 private:
 // Receive loop over the workers' sockets and per-message dispatcher, respectively.
67  template<class T> void Collect(std::vector<T> &reslist);
68  template<class T> void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
69 
70  void Reset();
 // NOTE(review): the page's cross-reference list mentions ReplyToFuncResult(TSocket *s), which is
 // not declared in the visible lines; presumably it sits on the skipped source line 71 -- confirm.
72  void ReplyToIdle(TSocket *s);
73 
74  unsigned fNProcessed; ///< number of arguments already passed to the workers
75  unsigned fNToProcess; ///< total number of arguments to pass to the workers
76 
77  /// A collection of the types of tasks that TProcessExecutor can execute.
78  /// It is used to interpret in the right way and properly reply to the
79  /// messages received (see, for example, TProcessExecutor::HandleInput)
 // NOTE(review): no HandleInput member is declared in this listing; the visible message handler
 // is HandlePoolCode -- confirm whether the reference above is stale.
80  enum class ETask : unsigned char {
81  kNoTask, ///< no task is being executed
82  kMap, ///< a Map method with no arguments is being executed
83  kMapWithArg, ///< a Map method with arguments is being executed
84  kMapRed, ///< a MapReduce method with no arguments is being executed
85  kMapRedWithArg ///< a MapReduce method with arguments is being executed
86  };
87 
88  ETask fTaskType = ETask::kNoTask; ///< the kind of task that is being executed, if any
89 };
90 
91 
92 /************ TEMPLATE METHODS IMPLEMENTATION ******************/
93 
94 //////////////////////////////////////////////////////////////////////////
95 /// Execute func (with no arguments) nTimes in parallel.
96 /// A vector containing the executions' results is returned.
97 /// Functions that take more than zero arguments can be executed (with
98 /// fixed arguments) by wrapping them in a lambda or with std::bind.
/// If the workers cannot be forked, an error is reported and an empty vector is returned.
99 template<class F, class Cond>
// NOTE(review): the signature line (source line 100) is missing from this listing. Per the class
// declaration it is presumably
//   auto TProcessExecutor::Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>
// -- confirm against the upstream header.
101 {
102  using retType = decltype(func());
103  //prepare environment
104  Reset();
 // NOTE(review): source line 105 is missing from this listing -- confirm against the upstream header.
106 
107  //fork min(nTimes, current number of workers) workers: the worker count is lowered to nTimes
 //when fewer executions than workers are requested, and restored right after Fork()
108  unsigned oldNWorkers = GetNWorkers();
109  if (nTimes < oldNWorkers)
110  SetNWorkers(nTimes);
111  TMPWorkerExecutor<F> worker(func);
112  bool ok = Fork(worker);
113  SetNWorkers(oldNWorkers);
114  if (!ok)
115  {
116  Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
117  return std::vector<retType>();
118  }
119 
120  //give out tasks
121  fNToProcess = nTimes;
122  std::vector<retType> reslist;
123  reslist.reserve(fNToProcess);
 //Broadcast's return value becomes the count of executions already handed out
124  fNProcessed = Broadcast(MPCode::kExecFunc, fNToProcess);
125 
126  //collect results, give out other tasks if needed
127  Collect(reslist);
128 
129  //clean-up and return
130  ReapWorkers();
 // NOTE(review): source line 131 is missing from this listing -- confirm against the upstream header.
132  return reslist;
133 }
134 
135 //////////////////////////////////////////////////////////////////////////
136 /// Execute func in parallel, taking an element of an
137 /// std::vector as argument.
138 /// A vector containing the executions' results is returned.
/// If the workers cannot be forked, an error is reported and an empty vector is returned.
139 // actual implementation of the Map method. all other calls with arguments eventually
140 // call this one
141 template<class F, class T, class Cond>
// NOTE(review): the signature line (source line 142) is missing from this listing. Per the class
// declaration it is presumably
//   auto TProcessExecutor::Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>
// -- confirm against the upstream header.
143 {
144  //check whether func is callable
145  using retType = decltype(func(args.front()));
146  //prepare environment
147  Reset();
 // NOTE(review): source line 148 is missing from this listing -- confirm against the upstream header.
149 
150  //fork min(args.size(), current number of workers) workers: the worker count is lowered to
 //args.size() when there are fewer arguments than workers, and restored right after Fork()
151  //N.B. from this point onwards, args is filled with undefined (but valid) values, since TMPWorkerExecutor moved its content away
152  unsigned oldNWorkers = GetNWorkers();
153  if (args.size() < oldNWorkers)
154  SetNWorkers(args.size());
155  TMPWorkerExecutor<F, T> worker(func, args);
156  bool ok = Fork(worker);
157  SetNWorkers(oldNWorkers);
158  if (!ok)
159  {
160  Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
161  return std::vector<retType>();
162  }
163 
164  //give out tasks
165  fNToProcess = args.size();
166  std::vector<retType> reslist;
167  reslist.reserve(fNToProcess);
 //range holds the indices 0 .. fNToProcess-1
168  std::vector<unsigned> range(fNToProcess);
169  std::iota(range.begin(), range.end(), 0);
 // NOTE(review): source line 170 is missing from this listing, and `range` is not used by any
 // visible line; presumably the missing line hands these indices to the workers -- confirm.
171 
172  //collect results, give out other tasks if needed
173  Collect(reslist);
174 
175  //clean-up and return
176  ReapWorkers();
 // NOTE(review): source line 177 is missing from this listing -- confirm against the upstream header.
178  return reslist;
179 }
180 
181 //////////////////////////////////////////////////////////////////////////
182 /// Execute func in parallel, taking an element of a
183 /// sequence as argument.
184 /// A vector containing the executions' results is returned.
/// The sequence is first copied into a std::vector<INTEGER>, which is then passed
/// to the Map overload taking a vector.
185 template<class F, class INTEGER, class Cond>
// NOTE(review): the signature line (source line 186) is missing from this listing, so the exact
// type of `args` is not visible here; the body only requires size(), begin() and end() -- confirm
// against the upstream header.
187 {
188  std::vector<INTEGER> vargs(args.size());
189  std::copy(args.begin(), args.end(), vargs.begin());
 // NOTE(review): reslist is bound by const reference and then returned by value, so the result
 // vector is copied rather than moved; returning the Map call directly would avoid that.
190  const auto &reslist = Map(func, vargs);
191  return reslist;
192 }
193 
194 //////////////////////////////////////////////////////////////////////////
195 /// This method behaves just like Map, but an additional redfunc function
196 /// must be provided. redfunc is applied to the vector Map would return and
197 /// must return the same type as func. In practice, redfunc can be used to
198 /// "squash" the vector returned by Map into a single object by merging,
199 /// adding, mixing the elements of the vector.
/// If the workers cannot be forked, a message is printed and a value-initialized
/// result is returned.
200 template<class F, class R, class Cond>
201 auto TProcessExecutor::MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type
202 {
203  using retType = decltype(func());
204  //prepare environment
205  Reset();
 // NOTE(review): source line 206 is missing from this listing -- confirm against the upstream header.
207 
208  //fork min(nTimes, current number of workers) workers: the worker count is lowered to nTimes
 //when fewer executions than workers are requested, and restored right after Fork()
209  unsigned oldNWorkers = GetNWorkers();
210  if (nTimes < oldNWorkers)
211  SetNWorkers(nTimes);
212  TMPWorkerExecutor<F, void, R> worker(func, redfunc);
213  bool ok = Fork(worker);
214  SetNWorkers(oldNWorkers);
215  if (!ok) {
 // NOTE(review): the Map overloads report this failure through Error(); this one writes to
 // std::cerr, and <iostream> is not among this header's direct includes.
216  std::cerr << "[E][C] Could not fork. Aborting operation\n";
217  return retType();
218  }
219 
220  //give workers their first task
221  fNToProcess = nTimes;
222  std::vector<retType> reslist;
223  reslist.reserve(fNToProcess);
 // NOTE(review): source line 224 is missing from this listing; the Map overload without
 // arguments broadcasts MPCode::kExecFunc at this point, presumably this one does too -- confirm.
225 
226  //collect results/give workers their next task
227  Collect(reslist);
228 
229  //clean-up and return
230  ReapWorkers();
 // NOTE(review): source line 231 is missing from this listing -- confirm against the upstream header.
 //redfunc is invoked directly here, without the return-type check performed by Reduce()
232  return redfunc(reslist);
233 }
234 
235 //////////////////////////////////////////////////////////////////////////
236 /// This method behaves just like Map, but an additional redfunc function
237 /// must be provided. redfunc is applied to the vector Map would return and
238 /// must return the same type as func. In practice, redfunc can be used to
239 /// "squash" the vector returned by Map into a single object by merging,
240 /// adding, mixing the elements of the vector.
/// If the workers cannot be forked, a message is printed and a value-initialized
/// result is returned.
241 template<class F, class T, class R, class Cond>
242 auto TProcessExecutor::MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type
243 {
244 
245  using retType = decltype(func(args.front()));
246  //prepare environment
247  Reset();
 // NOTE(review): source line 248 is missing from this listing -- confirm against the upstream header.
249 
250  //fork min(args.size(), current number of workers) workers: the worker count is lowered to
 //args.size() when there are fewer arguments than workers, and restored right after Fork()
251  unsigned oldNWorkers = GetNWorkers();
252  if (args.size() < oldNWorkers)
253  SetNWorkers(args.size());
254  TMPWorkerExecutor<F, T, R> worker(func, args, redfunc);
255  bool ok = Fork(worker);
256  SetNWorkers(oldNWorkers);
257  if (!ok) {
 // NOTE(review): the Map overloads report this failure through Error(); this one writes to
 // std::cerr, and <iostream> is not among this header's direct includes.
258  std::cerr << "[E][C] Could not fork. Aborting operation\n";
 //same type as retType, spelled out in full
259  return decltype(func(args.front()))();
260  }
261 
262  //give workers their first task
263  fNToProcess = args.size();
264  std::vector<retType> reslist;
265  reslist.reserve(fNToProcess);
 //range holds the indices 0 .. fNToProcess-1
266  std::vector<unsigned> range(fNToProcess);
267  std::iota(range.begin(), range.end(), 0);
 // NOTE(review): source line 268 is missing from this listing, and `range` is not used by any
 // visible line; presumably the missing line hands these indices to the workers -- confirm.
269 
270  //collect results/give workers their next task
271  Collect(reslist);
272 
273  ReapWorkers();
 // NOTE(review): source line 274 is missing from this listing -- confirm against the upstream header.
 //Reduce() statically checks that redfunc(reslist) has type retType before calling it
275  return Reduce(reslist, redfunc);
276 }
277 
278 //////////////////////////////////////////////////////////////////////////
279 /// "Reduce" an std::vector into a single object by passing a
280 /// function as the second argument defining the reduction operation.
281 template<class T, class R>
282 T TProcessExecutor::Reduce(const std::vector<T> &objs, R redfunc)
283 {
284  // check we can apply reduce to objs
285  static_assert(std::is_same<decltype(redfunc(objs)), T>::value, "redfunc does not have the correct signature");
286  return redfunc(objs);
287 }
288 
289 //////////////////////////////////////////////////////////////////////////
290 /// Handle message and reply to the worker
/// msg.first is the message code and msg.second the (possibly null) buffer carrying the payload;
/// s is the socket the message came from. Results read from the buffer are appended to reslist.
291 template<class T>
292 void TProcessExecutor::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
293 {
294  unsigned code = msg.first;
295  if (code == MPCode::kFuncResult) {
 //result of a function execution: deserialize it and store it
296  reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
 // NOTE(review): source line 297 is missing from this listing; presumably it replies to the
 // worker that sent the result -- confirm against the upstream header.
298  } else if (code == MPCode::kIdling) {
299  ReplyToIdle(s);
300  } else if(code == MPCode::kProcResult) {
 //the buffer may be absent for this code, hence the null check
301  if(msg.second != nullptr)
302  reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
 // NOTE(review): source line 303 is missing from this listing -- confirm against the upstream header.
304  } else if(code == MPCode::kProcError) {
 //the worker sent an error description: log it, then treat the worker as idle
305  const char *str = ReadBuffer<const char*>(msg.second.get());
306  Error("TProcessExecutor::HandlePoolCode", "[E][C] a worker encountered an error: %s\n"
307  "Continuing execution ignoring these entries.", str);
308  ReplyToIdle(s);
 //the string obtained from ReadBuffer is released here with delete []
309  delete [] str;
310  } else {
311  // UNKNOWN CODE
 // NOTE(review): code is unsigned but is formatted with %d; %u would match its type.
312  Error("TProcessExecutor::HandlePoolCode", "[W][C] unknown code received from server. code=%d", code);
313  }
314 }
315 
316 //////////////////////////////////////////////////////////////////////////
317 /// Listen for messages sent by the workers and call the appropriate handler function.
318 /// TProcessExecutor::HandlePoolCode is called on messages with a code < 1000 and
319 /// TMPClient::HandleMPCode is called on messages with a code >= 1000.
320 template<class T>
321 void TProcessExecutor::Collect(std::vector<T> &reslist)
322 {
323  TMonitor &mon = GetMonitor();
324  mon.ActivateAll();
325  while (mon.GetActive() > 0) {
326  TSocket *s = mon.Select();
327  MPCodeBufPair msg = MPRecv(s);
328  if (msg.first == MPCode::kRecvError) {
329  Error("TProcessExecutor::Collect", "[E][C] Lost connection to a worker");
330  Remove(s);
331  } else if (msg.first < 1000)
332  HandlePoolCode(msg, s, reslist);
333  else
334  HandleMPCode(msg, s);
335  }
336 }
337 
338 } // ROOT namespace
339 
340 #endif
TProcessExecutor(unsigned nWorkers=0)
Class constructor.
void SetNWorkers(unsigned n)
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Definition: TMPClient.cxx:128
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
Namespace for new ROOT classes and functions.
Definition: StringConv.hxx:21
This class works together with TProcessExecutor to allow the execution of functions in server process...
double T(double x)
Definition: ChebyshevPol.h:34
unsigned GetNWorkers() const
Definition: TMPClient.h:40
TProcessExecutor & operator=(const TProcessExecutor &)=delete
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
Error while reading from the socket.
Definition: MPCode.h:51
This class defines an interface to execute the same task multiple times in parallel, possibly with different arguments every time.
Definition: TExecutor.hxx:61
unsigned fNProcessed
number of arguments already passed to the workers
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Definition: TMPClient.cxx:295
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
a Map method with arguments is being executed
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
The message contains the result of the processing of a TTree.
Definition: MPCode.h:42
void Reset()
Reset TProcessExecutor's state.
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
a MapReduce method with arguments is being executed
#define F(x, y, z)
The message contains the result of a function execution.
Definition: MPCode.h:33
This class provides a simple interface to execute the same task multiple times in parallel...
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
Execute function with the argument contained in the message.
Definition: MPCode.h:32
a Map method with no arguments is being executed
ETask fTaskType
the kind of task that is being executed, if any
no task is being executed
Used by the client to tell servers to shutdown.
Definition: MPCode.h:49
unsigned fNToProcess
total number of arguments to pass to the workers
A pseudo container class which is a generator of indices.
Definition: TSeq.hxx:66
int type
Definition: TGX11.cxx:120
Base class for multiprocess applications' clients.
Definition: TMPClient.h:23
static constexpr double s
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
Definition: TMPClient.h:39
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
Definition: TMPClient.cxx:329
a MapReduce method with no arguments is being executed
auto Map(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute func (with no arguments) nTimes in parallel.
auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of< F()>::type
This method behaves just like Map, but an additional redfunc function must be provided.
virtual void ActivateAll()
Activate all de-activated sockets.
Definition: TMonitor.cxx:268
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
Definition: TMPClient.cxx:248
Tell the client there was an error while processing.
Definition: MPCode.h:44
constexpr Double_t R()
Definition: TMath.h:213
ETask
A collection of the types of tasks that TProcessExecutor can execute.
We are ready for the next task.
Definition: MPCode.h:35
const Int_t n
Definition: legend1.C:16
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
Definition: TMPClient.cxx:308
TMonitor & GetMonitor()
Definition: TMPClient.h:36
unsigned GetNWorkers() const
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
Definition: MPSendRecv.cxx:54
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
void Error(ErrorHandler_t func, int code, const char *va_(fmt),...)
Write error message and call a handler, if required.
Execute function without arguments.
Definition: MPCode.h:31
T Reduce(const std::vector< T > &objs, R redfunc)
"Reduce" an std::vector into a single object by passing a function as the second argument defining th...