Logo ROOT   6.08/07
Reference Guide
TProcessExecutor.hxx
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TProcessExecutor
13 #define ROOT_TProcessExecutor
14 
15 #include "MPCode.h"
16 #include "MPSendRecv.h"
17 #include "PoolUtils.h"
18 #include "TChain.h"
19 #include "TChainElement.h"
20 #include "TError.h"
21 #include "TFileCollection.h"
22 #include "TFileInfo.h"
23 #include "THashList.h"
24 #include "TMPClient.h"
25 #include "ROOT/TExecutor.hxx"
26 #include "TPoolProcessor.h"
27 #include "TPoolWorker.h"
28 #include "TSelector.h"
29 #include "TTreeReader.h"
30 #include <algorithm> //std::generate
31 #include <numeric> //std::iota
32 #include <string>
33 #include <type_traits> //std::result_of, std::enable_if
34 #include <functional> //std::reference_wrapper
35 #include <vector>
36 
37 namespace ROOT {
38 
39 class TProcessExecutor : public TExecutor<TProcessExecutor>, private TMPClient {
40 public:
41  explicit TProcessExecutor(unsigned nWorkers = 0); //default number of workers is the number of processors
42  ~TProcessExecutor() = default;
43  //it doesn't make sense for a TProcessExecutor to be copied
44  TProcessExecutor(const TProcessExecutor &) = delete;
45  TProcessExecutor &operator=(const TProcessExecutor &) = delete;
46 
47  // Map
48  template<class F, class Cond = noReferenceCond<F>>
49  auto Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>;
50  /// \cond
51  template<class F, class INTEGER, class Cond = noReferenceCond<F, INTEGER>>
53  template<class F, class T, class Cond = noReferenceCond<F, T>>
54  auto Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>;
55  /// \endcond
56  using TExecutor<TProcessExecutor>::Map;
57 
58  // ProcTree
59  // these versions requires that procFunc returns a ptr to TObject or inheriting classes and takes a TTreeReader& (both enforced at compile-time)
60  template<class F> auto ProcTree(const std::vector<std::string>& fileNames, F procFunc, const std::string& treeName = "", ULong64_t nToProcess = 0) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
61  template<class F> auto ProcTree(const std::string& fileName, F procFunc, const std::string& treeName = "", ULong64_t nToProcess = 0) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
62  template<class F> auto ProcTree(TFileCollection& files, F procFunc, const std::string& treeName = "", ULong64_t nToProcess = 0) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
63  template<class F> auto ProcTree(TChain& files, F procFunc, const std::string& treeName = "", ULong64_t nToProcess = 0) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
64  template<class F> auto ProcTree(TTree& tree, F procFunc, ULong64_t nToProcess = 0) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
65  // these versions require a TSelector
66  TList* ProcTree(const std::vector<std::string>& fileNames, TSelector& selector, const std::string& treeName = "", ULong64_t nToProcess = 0);
67  TList* ProcTree(const std::string &fileName, TSelector& selector, const std::string& treeName = "", ULong64_t nToProcess = 0);
68  TList* ProcTree(TFileCollection& files, TSelector& selector, const std::string& treeName = "", ULong64_t nToProcess = 0);
69  TList* ProcTree(TChain& files, TSelector& selector, const std::string& treeName = "", ULong64_t nToProcess = 0);
70  TList* ProcTree(TTree& tree, TSelector& selector, ULong64_t nToProcess = 0);
71 
72  void SetNWorkers(unsigned n) { TMPClient::SetNWorkers(n); }
73  unsigned GetNWorkers() const { return TMPClient::GetNWorkers(); }
74 
75  template<class T, class R> T Reduce(const std::vector<T> &objs, R redfunc);
76 
77 private:
78  template<class T> void Collect(std::vector<T> &reslist);
79  template<class T> void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
80 
81  void FixLists(std::vector<TObject*> &lists);
82  void Reset();
83  void ReplyToFuncResult(TSocket *s);
84  void ReplyToIdle(TSocket *s);
85 
86  unsigned fNProcessed; ///< number of arguments already passed to the workers
87  unsigned fNToProcess; ///< total number of arguments to pass to the workers
88 
89  /// A collection of the types of tasks that TProcessExecutor can execute.
90  /// It is used to interpret in the right way and properly reply to the
91  /// messages received (see, for example, TProcessExecutor::HandleInput)
92  enum class ETask : unsigned char {
93  kNoTask, ///< no task is being executed
94  kMap, ///< a Map method with no arguments is being executed
95  kMapWithArg, ///< a Map method with arguments is being executed
96  kProcByRange, ///< a ProcTree method is being executed and each worker will process a certain range of each file
97  kProcByFile, ///< a ProcTree method is being executed and each worker will process a different file
98  };
99 
100  ETask fTaskType = ETask::kNoTask; ///< the kind of task that is being executed, if any
101 };
102 
103 
104 /************ TEMPLATE METHODS IMPLEMENTATION ******************/
105 
106 //////////////////////////////////////////////////////////////////////////
107 /// Execute func (with no arguments) nTimes in parallel.
108 /// A vector containg executions' results is returned.
109 /// Functions that take more than zero arguments can be executed (with
110 /// fixed arguments) by wrapping them in a lambda or with std::bind.
111 template<class F, class Cond>
113 {
114  using retType = decltype(func());
115  //prepare environment
116  Reset();
118 
119  //fork max(nTimes, fNWorkers) times
120  unsigned oldNWorkers = GetNWorkers();
121  if (nTimes < oldNWorkers)
122  SetNWorkers(nTimes);
123  TPoolWorker<F> worker(func);
124  bool ok = Fork(worker);
125  SetNWorkers(oldNWorkers);
126  if (!ok)
127  {
128  Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
129  return std::vector<retType>();
130  }
131 
132  //give out tasks
133  fNToProcess = nTimes;
134  std::vector<retType> reslist;
135  reslist.reserve(fNToProcess);
136  fNProcessed = Broadcast(PoolCode::kExecFunc, fNToProcess);
137 
138  //collect results, give out other tasks if needed
139  Collect(reslist);
140 
141  //clean-up and return
142  ReapWorkers();
144  return reslist;
145 }
146 
147 // tell doxygen to ignore this (\endcond closes the statement)
148 /// \cond
149 
150 // actual implementation of the Map method. all other calls with arguments eventually
151 // call this one
152 template<class F, class T, class Cond>
154 {
155  //check whether func is callable
156  using retType = decltype(func(args.front()));
157  //prepare environment
158  Reset();
160 
161  //fork max(args.size(), fNWorkers) times
162  //N.B. from this point onwards, args is filled with undefined (but valid) values, since TPoolWorker moved its content away
163  unsigned oldNWorkers = GetNWorkers();
164  if (args.size() < oldNWorkers)
165  SetNWorkers(args.size());
166  TPoolWorker<F, T> worker(func, args);
167  bool ok = Fork(worker);
168  SetNWorkers(oldNWorkers);
169  if (!ok)
170  {
171  Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
172  return std::vector<retType>();
173  }
174 
175  //give out tasks
176  fNToProcess = args.size();
177  std::vector<retType> reslist;
178  reslist.reserve(fNToProcess);
179  std::vector<unsigned> range(fNToProcess);
180  std::iota(range.begin(), range.end(), 0);
182 
183  //collect results, give out other tasks if needed
184  Collect(reslist);
185 
186  //clean-up and return
187  ReapWorkers();
189  return reslist;
190 }
191 
192 template<class F, class INTEGER, class Cond>
194 {
195  std::vector<INTEGER> vargs(args.size());
196  std::copy(args.begin(), args.end(), vargs.begin());
197  const auto &reslist = Map(func, vargs);
198  return reslist;
199 }
200 // tell doxygen to stop ignoring code
201 /// \endcond
202 
203 template<class T, class R>
204 T TProcessExecutor::Reduce(const std::vector<T> &objs, R redfunc)
205 {
206  // check we can apply reduce to objs
207  static_assert(std::is_same<decltype(redfunc(objs)), T>::value, "redfunc does not have the correct signature");
208  return redfunc(objs);
209 }
210 
211 template<class F>
212 auto TProcessExecutor::ProcTree(const std::vector<std::string>& fileNames, F procFunc, const std::string& treeName, ULong64_t nToProcess) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
213 {
214  using retType = typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
215  static_assert(std::is_constructible<TObject*, retType>::value, "procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
216 
217  //prepare environment
218  Reset();
219  unsigned nWorkers = GetNWorkers();
220 
221  //fork
222  TPoolProcessor<F> worker(procFunc, fileNames, treeName, nWorkers, nToProcess);
223  bool ok = Fork(worker);
224  if(!ok) {
225  Error("TProcessExecutor::ProcTree", "[E][C] Could not fork. Aborting operation.");
226  return nullptr;
227  }
228 
229  if(fileNames.size() < nWorkers) {
230  //TTree entry granularity. For each file, we divide entries equally between workers
232  //Tell workers to start processing entries
233  fNToProcess = nWorkers*fileNames.size(); //this is the total number of ranges that will be processed by all workers cumulatively
234  std::vector<unsigned> args(nWorkers);
235  std::iota(args.begin(), args.end(), 0);
237  if(fNProcessed < nWorkers)
238  Error("TProcessExecutor::ProcTree", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
239  } else {
240  //file granularity. each worker processes one whole file as a single task
242  fNToProcess = fileNames.size();
243  std::vector<unsigned> args(nWorkers);
244  std::iota(args.begin(), args.end(), 0);
246  if(fNProcessed < nWorkers)
247  Error("TProcessExecutor::ProcTree", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
248  }
249 
250  //collect results, distribute new tasks
251  std::vector<TObject*> reslist;
252  Collect(reslist);
253 
254  //merge
256  auto res = redfunc(reslist);
257 
258  //clean-up and return
259  ReapWorkers();
261  return static_cast<retType>(res);
262 }
263 
264 
265 template<class F>
266 auto TProcessExecutor::ProcTree(const std::string& fileName, F procFunc, const std::string& treeName, ULong64_t nToProcess) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
267 {
268  std::vector<std::string> singleFileName(1, fileName);
269  return ProcTree(singleFileName, procFunc, treeName, nToProcess);
270 }
271 
272 
273 template<class F>
274 auto TProcessExecutor::ProcTree(TFileCollection& files, F procFunc, const std::string& treeName, ULong64_t nToProcess) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
275 {
276  std::vector<std::string> fileNames(files.GetNFiles());
277  unsigned count = 0;
278  for(auto f : *static_cast<THashList*>(files.GetList()))
279  fileNames[count++] = static_cast<TFileInfo*>(f)->GetCurrentUrl()->GetUrl();
280 
281  return ProcTree(fileNames, procFunc, treeName, nToProcess);
282 }
283 
284 
285 template<class F>
286 auto TProcessExecutor::ProcTree(TChain& files, F procFunc, const std::string& treeName, ULong64_t nToProcess) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
287 {
288  TObjArray* filelist = files.GetListOfFiles();
289  std::vector<std::string> fileNames(filelist->GetEntries());
290  unsigned count = 0;
291  for(auto f : *filelist)
292  fileNames[count++] = f->GetTitle();
293 
294  return ProcTree(fileNames, procFunc, treeName, nToProcess);
295 }
296 
297 
298 template<class F>
299 auto TProcessExecutor::ProcTree(TTree& tree, F procFunc, ULong64_t nToProcess) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
300 {
301  using retType = typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
302  static_assert(std::is_constructible<TObject*, retType>::value, "procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
303 
304  //prepare environment
305  Reset();
306  unsigned nWorkers = GetNWorkers();
307 
308  //fork
309  TPoolProcessor<F> worker(procFunc, &tree, nWorkers, nToProcess);
310  bool ok = Fork(worker);
311  if(!ok) {
312  Error("TProcessExecutor::ProcTree", "[E][C] Could not fork. Aborting operation.");
313  return nullptr;
314  }
315 
316  //divide entries equally between workers
318 
319  //tell workers to start processing entries
320  fNToProcess = nWorkers; //this is the total number of ranges that will be processed by all workers cumulatively
321  std::vector<unsigned> args(nWorkers);
322  std::iota(args.begin(), args.end(), 0);
324  if(fNProcessed < nWorkers)
325  Error("TProcessExecutor::ProcTree", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
326 
327  //collect results, distribute new tasks
328  std::vector<TObject*> reslist;
329  Collect(reslist);
330 
331  //merge
333  auto res = redfunc(reslist);
334 
335  //clean-up and return
336  ReapWorkers();
338  return static_cast<retType>(res);
339 }
340 
341 //////////////////////////////////////////////////////////////////////////
342 /// Handle message and reply to the worker
343 template<class T>
344 void TProcessExecutor::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
345 {
346  unsigned code = msg.first;
347  if (code == PoolCode::kFuncResult) {
348  reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
350  } else if (code == PoolCode::kIdling) {
351  ReplyToIdle(s);
352  } else if(code == PoolCode::kProcResult) {
353  if(msg.second != nullptr)
354  reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
356  } else if(code == PoolCode::kProcError) {
357  const char *str = ReadBuffer<const char*>(msg.second.get());
358  Error("TProcessExecutor::HandlePoolCode", "[E][C] a worker encountered an error: %s\n"
359  "Continuing execution ignoring these entries.", str);
360  ReplyToIdle(s);
361  delete [] str;
362  } else {
363  // UNKNOWN CODE
364  Error("TProcessExecutor::HandlePoolCode", "[W][C] unknown code received from server. code=%d", code);
365  }
366 }
367 
368 //////////////////////////////////////////////////////////////////////////
369 /// Listen for messages sent by the workers and call the appropriate handler function.
370 /// TProcessExecutor::HandlePoolCode is called on messages with a code < 1000 and
371 /// TMPClient::HandleMPCode is called on messages with a code >= 1000.
372 template<class T>
373 void TProcessExecutor::Collect(std::vector<T> &reslist)
374 {
375  TMonitor &mon = GetMonitor();
376  mon.ActivateAll();
377  while (mon.GetActive() > 0) {
378  TSocket *s = mon.Select();
379  MPCodeBufPair msg = MPRecv(s);
380  if (msg.first == MPCode::kRecvError) {
381  Error("TProcessExecutor::Collect", "[E][C] Lost connection to a worker");
382  Remove(s);
383  } else if (msg.first < 1000)
384  HandlePoolCode(msg, s, reslist);
385  else
386  HandleMPCode(msg, s);
387  }
388 }
389 
390 } // ROOT namespace
391 
392 #endif
auto Map(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute func (with no arguments) nTimes in parallel.
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
The message contains the result of a function execution.
Definition: PoolUtils.h:33
auto ProcTree(const std::vector< std::string > &fileNames, F procFunc, const std::string &treeName="", ULong64_t nToProcess=0) -> typename std::result_of< F(std::reference_wrapper< TTreeReader >)>::type
An array of TObjects.
Definition: TObjArray.h:39
void SetNWorkers(unsigned n)
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Definition: TMPClient.cxx:99
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
Tell a TPoolProcessor to process the tree that was passed to it at construction time.
Definition: PoolUtils.h:40
Tell a TPoolProcessor which tree and entries range to process. The object sent is a TreeRangeInfo...
Definition: PoolUtils.h:39
This namespace contains pre-defined functions to be used in conjunction with TExecutor::Map and TExecu...
Definition: StringConv.hxx:21
double T(double x)
Definition: ChebyshevPol.h:34
unsigned GetNWorkers() const
Definition: TMPClient.h:40
TProcessExecutor & operator=(const TProcessExecutor &)=delete
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
std::size_t size() const
Definition: TSeq.hxx:150
Error while reading from the socket.
Definition: MPCode.h:34
unsigned fNProcessed
number of arguments already passed to the workers
Tell a TPoolProcessor which tree to process. The object sent is a TreeInfo.
Definition: PoolUtils.h:38
We are ready for the next task.
Definition: PoolUtils.h:35
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Definition: TMPClient.cxx:263
a Map method with arguments is being executed
Merge collection of TObjects.
Definition: PoolUtils.h:63
TProcessExecutor(unsigned nWorkers=0)
Class constructor.
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
A doubly linked list.
Definition: TList.h:47
#define F(x, y, z)
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
Execute function with the argument contained in the message.
Definition: PoolUtils.h:32
a ProcTree method is being executed and each worker will process a certain range of each file ...
a Map method with no arguments is being executed
ETask fTaskType
the kind of task that is being executed, if any
The message contains the result of the processing of a TTree.
Definition: PoolUtils.h:42
no task is being executed
Execute function without arguments.
Definition: PoolUtils.h:31
T Reduce(const std::vector< T > &objs, R redfunc)
Used by the client to tell servers to shutdown.
Definition: MPCode.h:32
Tell the client there was an error while processing.
Definition: PoolUtils.h:44
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
double f(double x)
unsigned fNToProcess
total number of arguments to pass to the workers
A pseudo container class which is a generator of indices.
Definition: TSeq.hxx:66
int type
Definition: TGX11.cxx:120
unsigned long long ULong64_t
Definition: RtypesCore.h:70
This class works together with TProcessExecutor to allow the execution of functions in server process...
Definition: TPoolWorker.h:77
double func(double *x, double *p)
Definition: stressTF1.cxx:213
Base class for multiprocess applications' clients.
Definition: TMPClient.h:23
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
Definition: TMPClient.h:39
void Reset()
Reset TProcessExecutor's state.
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
Definition: TMPClient.cxx:297
a ProcTree method is being executed and each worker will process a different file ...
iterator end() const
Definition: TSeq.hxx:136
Class that contains a list of TFileInfo's and accumulated meta data information about its entries...
A chain is a collection of files containing TTree objects.
Definition: TChain.h:35
Int_t GetEntries() const
Return the number of objects in array (i.e.
Definition: TObjArray.cxx:494
Definition: tree.py:1
A TTree object has a header with a name and a title.
Definition: TTree.h:98
Class describing a generic file including meta information.
Definition: TFileInfo.h:50
virtual void ActivateAll()
Activate all de-activated sockets.
Definition: TMonitor.cxx:268
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
Definition: TMPClient.cxx:216
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
Definition: TSelector.h:39
ETask
A collection of the types of tasks that TProcessExecutor can execute.
void FixLists(std::vector< TObject *> &lists)
Fix list of lists before merging (to avoid errors about duplicated objects)
const Int_t n
Definition: legend1.C:16
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
Definition: TMPClient.cxx:276
TMonitor & GetMonitor()
Definition: TMPClient.h:36
unsigned GetNWorkers() const
TRandom3 R
a TMatrixD.
Definition: testIO.cxx:28
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
Definition: MPSendRecv.cxx:54
void Error(ErrorHandler_t func, int code, const char *va_(fmt),...)
Write error message and call a handler, if required.
iterator begin() const
Definition: TSeq.hxx:133