Logo ROOT   6.10/09
Reference Guide
TProcessExecutor.hxx
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 // Modified: G Ganis Jan 2017
4 
5 /*************************************************************************
6  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7  * All rights reserved. *
8  * *
9  * For the licensing terms see $ROOTSYS/LICENSE. *
10  * For the list of contributors see $ROOTSYS/README/CREDITS. *
11  *************************************************************************/
12 
13 #ifndef ROOT_TProcessExecutor
14 #define ROOT_TProcessExecutor
15 
16 #include "MPCode.h"
17 #include "MPSendRecv.h"
18 #include "PoolUtils.h"
19 #include "TChain.h"
20 #include "TChainElement.h"
21 #include "TError.h"
22 #include "TFileCollection.h"
23 #include "TFileInfo.h"
24 #include "THashList.h"
25 #include "TMPClient.h"
26 #include "ROOT/TExecutor.hxx"
27 #include "TMPWorkerExecutor.h"
28 #include <algorithm> //std::generate
29 #include <numeric> //std::iota
30 #include <string>
31 #include <type_traits> //std::result_of, std::enable_if
32 #include <functional> //std::reference_wrapper
33 #include <vector>
34 
35 namespace ROOT {
36 
/// Executor that runs the same task several times by forking worker processes
/// (the Map methods call Fork) and collecting the results they send back.
/// It derives publicly from TExecutor<TProcessExecutor> and privately from
/// TMPClient, so TMPClient's members are reachable from outside only through
/// the forwarding methods declared here.
37 class TProcessExecutor : public TExecutor<TProcessExecutor>, private TMPClient {
38 public:
39  explicit TProcessExecutor(unsigned nWorkers = 0); //default number of workers is the number of processors
40  ~TProcessExecutor() = default;
41  //it doesn't make sense for a TProcessExecutor to be copied
42  TProcessExecutor(const TProcessExecutor &) = delete;
43  TProcessExecutor &operator=(const TProcessExecutor &) = delete;
44 
45  // Map
 /// Execute func, which takes no arguments, nTimes; returns the vector of results.
47  template<class F, class Cond = noReferenceCond<F>>
48  auto Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>;
 // NOTE(review): the declaration belonging to the next template header (embedded
 // line 50) is absent from this listing; presumably it is the sequence overload of
 // Map that is defined further below -- confirm against the original header.
49  template<class F, class INTEGER, class Cond = noReferenceCond<F, INTEGER>>
 /// Execute func once per element of args; returns the vector of results.
51  template<class F, class T, class Cond = noReferenceCond<F, T>>
52  auto Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>;
53 
 // Public forwarders to the privately inherited TMPClient accessors.
54  void SetNWorkers(unsigned n) { TMPClient::SetNWorkers(n); }
55  unsigned GetNWorkers() const { return TMPClient::GetNWorkers(); }
56 
 /// Reduce objs to a single T by calling redfunc(objs).
58  template<class T, class R> T Reduce(const std::vector<T> &objs, R redfunc);
59 
60 private:
 /// Receive worker messages until no socket is active, appending results to reslist.
61  template<class T> void Collect(std::vector<T> &reslist);
 /// Handle one message with a code below 1000 (see Collect) and reply to its sender.
62  template<class T> void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
63 
64  void Reset(); // defined outside this header
65  void ReplyToFuncResult(TSocket *s); // defined outside this header
66  void ReplyToIdle(TSocket *s); // defined outside this header; called by HandlePoolCode
67 
68  unsigned fNProcessed; ///< number of arguments already passed to the workers
69  unsigned fNToProcess; ///< total number of arguments to pass to the workers
70 
71  /// A collection of the types of tasks that TProcessExecutor can execute.
72  /// It is used to interpret in the right way and properly reply to the
73  /// messages received (see, for example, TProcessExecutor::HandlePoolCode)
74  enum class ETask : unsigned char {
75  kNoTask, ///< no task is being executed
76  kMap, ///< a Map method with no arguments is being executed
77  kMapWithArg ///< a Map method with arguments is being executed
78  };
79 
80  ETask fTaskType = ETask::kNoTask; ///< the kind of task that is being executed, if any
81 };
82 
83 
84 /************ TEMPLATE METHODS IMPLEMENTATION ******************/
85 
86 //////////////////////////////////////////////////////////////////////////
87 /// Execute func (with no arguments) nTimes in parallel.
88 /// A vector containing the executions' results is returned.
89 /// Functions that take more than zero arguments can be executed (with
90 /// fixed arguments) by wrapping them in a lambda or with std::bind.
/// If forking the workers fails, an error is reported and an empty vector
/// is returned.
/// NOTE(review): the embedded numbering of this listing skips lines 92, 97
/// and 123, so the signature line and two body statements are not visible
/// here -- check the original header before editing this function.
91 template<class F, class Cond>
93 {
94  using retType = decltype(func()); // type of a single execution's result
95  //prepare environment
96  Reset();
98 
99  //fork min(nTimes, number of workers) times: never more workers than executions
100  unsigned oldNWorkers = GetNWorkers();
101  if (nTimes < oldNWorkers)
102  SetNWorkers(nTimes);
103  TMPWorkerExecutor<F> worker(func);
104  bool ok = Fork(worker);
105  SetNWorkers(oldNWorkers); // restore the previously configured worker count
106  if (!ok)
107  {
108  Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
109  return std::vector<retType>();
110  }
111 
112  //give out tasks
113  fNToProcess = nTimes;
114  std::vector<retType> reslist;
115  reslist.reserve(fNToProcess);
116  fNProcessed = Broadcast(MPCode::kExecFunc, fNToProcess); // return value is kept as the count of tasks already handed out
117 
118  //collect results, give out other tasks if needed
119  Collect(reslist);
120 
121  //clean-up and return
122  ReapWorkers();
124  return reslist;
125 }
126 
127 //////////////////////////////////////////////////////////////////////////
128 /// Execute func in parallel, taking an element of an
129 /// std::vector as argument.
130 /// A vector containing the executions' results is returned.
/// If forking the workers fails, an error is reported and an empty vector
/// is returned.
131 // actual implementation of the Map method. all other calls with arguments eventually
132 // call this one
// NOTE(review): the embedded numbering of this listing skips lines 134, 140,
// 162 and 169. The signature line is therefore not visible, and in the lines
// shown here `range` is filled but never used and nothing is sent to the
// workers before Collect -- presumably the missing line 162 does that; confirm
// against the original header.
133 template<class F, class T, class Cond>
135 {
136  //check whether func is callable
137  using retType = decltype(func(args.front()));
138  //prepare environment
139  Reset();
141 
142  //fork min(args.size(), number of workers) times: never more workers than arguments
143  //N.B. from this point onwards, args is filled with undefined (but valid) values, since TMPWorkerExecutor moved its content away
144  unsigned oldNWorkers = GetNWorkers();
145  if (args.size() < oldNWorkers)
146  SetNWorkers(args.size());
147  TMPWorkerExecutor<F, T> worker(func, args);
148  bool ok = Fork(worker);
149  SetNWorkers(oldNWorkers); // restore the previously configured worker count
150  if (!ok)
151  {
152  Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
153  return std::vector<retType>();
154  }
155 
156  //give out tasks
157  fNToProcess = args.size();
158  std::vector<retType> reslist;
159  reslist.reserve(fNToProcess);
160  std::vector<unsigned> range(fNToProcess);
161  std::iota(range.begin(), range.end(), 0); // range = 0, 1, ..., fNToProcess-1
163 
164  //collect results, give out other tasks if needed
165  Collect(reslist);
166 
167  //clean-up and return
168  ReapWorkers();
170  return reslist;
171 }
172 
173 //////////////////////////////////////////////////////////////////////////
174 /// Execute func in parallel, taking an element of a
175 /// sequence as argument.
176 /// A vector containing the executions' results is returned.
/// The sequence is first copied into a std::vector<INTEGER>; the work is then
/// delegated to the std::vector overload of Map.
/// NOTE(review): the signature line (embedded line 178) is absent from this
/// listing, so the exact type of `args` is not visible here; the body only
/// requires size(), begin() and end().
177 template<class F, class INTEGER, class Cond>
179 {
180  std::vector<INTEGER> vargs(args.size());
181  std::copy(args.begin(), args.end(), vargs.begin());
 // NOTE(review): reslist is a const reference bound to the temporary returned
 // by Map, so the return statement below copies the vector instead of moving it.
182  const auto &reslist = Map(func, vargs);
183  return reslist;
184 }
185 
186 //////////////////////////////////////////////////////////////////////////
187 /// "Reduce" an std::vector into a single object by passing a
188 /// function as the second argument defining the reduction operation.
/// \param objs the objects to reduce; passed unchanged to redfunc
/// \param redfunc callable invoked as redfunc(objs); its result type must be exactly T
/// \return the value returned by redfunc(objs)
189 template<class T, class R>
190 T TProcessExecutor::Reduce(const std::vector<T> &objs, R redfunc)
191 {
192  // check we can apply reduce to objs
 // (compile-time check: a redfunc whose result type is not exactly T is rejected,
 // even if that result would be convertible to T)
193  static_assert(std::is_same<decltype(redfunc(objs)), T>::value, "redfunc does not have the correct signature");
194  return redfunc(objs);
195 }
196 
197 //////////////////////////////////////////////////////////////////////////
198 /// Handle message and reply to the worker
/// \param msg pair of message code (msg.first) and payload buffer (msg.second)
/// \param s socket of the worker that sent the message
/// \param reslist results read from the payload are appended to this vector
/// NOTE(review): the embedded numbering of this listing skips lines 205 and
/// 211, i.e. the statement that followed each push_back below is not visible
/// here -- check the original header before editing this function.
199 template<class T>
200 void TProcessExecutor::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
201 {
202  unsigned code = msg.first;
203  if (code == MPCode::kFuncResult) {
 // NOTE(review): ReadBuffer<T>(...) is already a temporary here, so the
 // std::move around it looks redundant.
204  reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
206  } else if (code == MPCode::kIdling) {
207  ReplyToIdle(s);
208  } else if(code == MPCode::kProcResult) {
209  if(msg.second != nullptr) // for this code the payload is optional: only read it when present
210  reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
212  } else if(code == MPCode::kProcError) {
213  const char *str = ReadBuffer<const char*>(msg.second.get());
214  Error("TProcessExecutor::HandlePoolCode", "[E][C] a worker encountered an error: %s\n"
215  "Continuing execution ignoring these entries.", str);
216  ReplyToIdle(s);
217  delete [] str; // the string obtained from ReadBuffer is released here
218  } else {
219  // UNKNOWN CODE
 // NOTE(review): code is unsigned but is formatted with %d, and the message
 // is tagged [W] although it is reported through Error.
220  Error("TProcessExecutor::HandlePoolCode", "[W][C] unknown code received from server. code=%d", code);
221  }
222 }
223 
224 //////////////////////////////////////////////////////////////////////////
225 /// Listen for messages sent by the workers and call the appropriate handler function.
226 /// TProcessExecutor::HandlePoolCode is called on messages with a code < 1000 and
227 /// TMPClient::HandleMPCode is called on messages with a code >= 1000.
/// A socket whose receive fails with MPCode::kRecvError is reported and removed.
/// \param reslist passed on to HandlePoolCode, which appends received results to it
228 template<class T>
229 void TProcessExecutor::Collect(std::vector<T> &reslist)
230 {
231  TMonitor &mon = GetMonitor();
232  mon.ActivateAll();
233  while (mon.GetActive() > 0) { // keep going while the monitor still has active sockets
234  TSocket *s = mon.Select(); // socket for which an event is waiting
235  MPCodeBufPair msg = MPRecv(s);
236  if (msg.first == MPCode::kRecvError) { // checked first, before dispatching on the code's range
237  Error("TProcessExecutor::Collect", "[E][C] Lost connection to a worker");
238  Remove(s);
239  } else if (msg.first < 1000)
240  HandlePoolCode(msg, s, reslist);
241  else
242  HandleMPCode(msg, s);
243  }
244 }
245 
246 } // ROOT namespace
247 
248 #endif
TProcessExecutor(unsigned nWorkers=0)
Class constructor.
void SetNWorkers(unsigned n)
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Definition: TMPClient.cxx:128
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
Namespace for new ROOT classes and functions.
Definition: StringConv.hxx:21
This class works together with TProcessExecutor to allow the execution of functions in server process...
double T(double x)
Definition: ChebyshevPol.h:34
unsigned GetNWorkers() const
Definition: TMPClient.h:40
TProcessExecutor & operator=(const TProcessExecutor &)=delete
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
Error while reading from the socket.
Definition: MPCode.h:51
This class defines an interface to execute the same task multiple times in parallel, possibly with different arguments every time.
Definition: TExecutor.hxx:61
unsigned fNProcessed
number of arguments already passed to the workers
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Definition: TMPClient.cxx:295
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
a Map method with arguments is being executed
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
The message contains the result of the processing of a TTree.
Definition: MPCode.h:42
void Reset()
Reset TProcessExecutor's state.
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
#define F(x, y, z)
The message contains the result of a function execution.
Definition: MPCode.h:33
This class provides a simple interface to execute the same task multiple times in parallel...
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
Execute function with the argument contained in the message.
Definition: MPCode.h:32
a Map method with no arguments is being executed
ETask fTaskType
the kind of task that is being executed, if any
no task is being executed
Used by the client to tell servers to shutdown.
Definition: MPCode.h:49
unsigned fNToProcess
total number of arguments to pass to the workers
A pseudo container class which is a generator of indices.
Definition: TSeq.hxx:66
int type
Definition: TGX11.cxx:120
double func(double *x, double *p)
Definition: stressTF1.cxx:213
Base class for multiprocess applications' clients.
Definition: TMPClient.h:23
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
Definition: TMPClient.h:39
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
Definition: TMPClient.cxx:329
auto Map(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute func (with no arguments) nTimes in parallel.
virtual void ActivateAll()
Activate all de-activated sockets.
Definition: TMonitor.cxx:268
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
Definition: TMPClient.cxx:248
Tell the client there was an error while processing.
Definition: MPCode.h:44
ETask
A collection of the types of tasks that TProcessExecutor can execute.
We are ready for the next task.
Definition: MPCode.h:35
const Int_t n
Definition: legend1.C:16
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
Definition: TMPClient.cxx:308
TMonitor & GetMonitor()
Definition: TMPClient.h:36
unsigned GetNWorkers() const
TRandom3 R
a TMatrixD.
Definition: testIO.cxx:28
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
Definition: MPSendRecv.cxx:54
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
void Error(ErrorHandler_t func, int code, const char *va_(fmt),...)
Write error message and call a handler, if required.
Execute function without arguments.
Definition: MPCode.h:31
T Reduce(const std::vector< T > &objs, R redfunc)
"Reduce" an std::vector into a single object by passing a function as the second argument defining th...