ROOT 6.08/07
Reference Guide
TProcessExecutor.cxx
/* @(#)root/multiproc:$Id$ */
// Author: Enrico Guiraud July 2015

/*************************************************************************
 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#include "TEnv.h"
// [...]
#include "TPoolPlayer.h"

//////////////////////////////////////////////////////////////////////////
///
/// \class TProcessExecutor
/// \brief This class provides a simple interface to execute the same task
/// multiple times in parallel, possibly with different arguments every
/// time. This mimics the behaviour of Python's multiprocessing.Pool.map method.
///
/// ### TProcessExecutor::Map
/// The two possible usages of the Map method are:\n
/// * Map(F func, unsigned nTimes): func is executed nTimes with no arguments
/// * Map(F func, T& args): func is executed on each element of the collection of arguments args
///
/// For either signature, func is executed as many times as needed by a pool of
/// fNWorkers workers; the number of workers can be passed to the constructor
/// or set via SetNWorkers. It defaults to the number of cores.\n
/// A collection containing the result of each execution is returned.\n
/// **Note:** the user is responsible for the deletion of any object that might
/// be created upon execution of func, returned objects included: TProcessExecutor never
/// deletes what it returns; it simply forgets it.\n
/// **Note:** using TProcessExecutor::Map is advisable only when the task to be
/// executed takes more than a few seconds; otherwise the overhead introduced
/// by Map will outweigh the benefits of parallel execution on most machines.
///
/// \param func
/// \parblock
/// a lambda expression, an std::function, a loaded macro, a
/// functor class or a function that takes zero arguments (for the first signature)
/// or one (for the second signature).
/// \endparblock
/// \param args
/// \parblock
/// a standard container (vector, list, deque), an initializer list
/// or a pointer to a TCollection (TList*, TObjArray*, ...).
/// \endparblock
/// **Note:** the version of TProcessExecutor::Map that takes a TCollection* as argument incurs
/// the overhead of copying data from the TCollection to an STL container. Only
/// use it when absolutely necessary.\n
/// **Note:** when the function to be executed takes more arguments than the chosen
/// signature allows (zero or one) but all the additional arguments are fixed, the function
/// can be wrapped in a lambda or via std::bind to give it the right signature.\n
/// **Note:** the user should take care of initializing random seeds differently in each
/// process (e.g. using the process id in the seed). Otherwise several parallel executions
/// might generate the same sequence of pseudo-random numbers.
///
/// #### Return value:
/// If T derives from TCollection, Map returns a TObjArray; otherwise it
/// returns an std::vector. In both cases, the elements in the container
/// will be the objects returned by func.
///
///
/// #### Examples:
///
/// ~~~{.cpp}
/// root[] TProcessExecutor pool; auto hists = pool.Map(CreateHisto, 10);
/// root[] TProcessExecutor pool(2); auto squares = pool.Map([](int a) { return a*a; }, {1,2,3});
/// ~~~
///
/// ### TProcessExecutor::MapReduce
/// This set of methods behaves exactly like Map, but takes an additional
/// function as a third argument. This function is applied to the set of
/// objects returned by the corresponding Map execution to "squash" them
/// into a single object.
///
/// #### Examples:
/// ~~~{.cpp}
/// root[] TProcessExecutor pool; auto ten = pool.MapReduce([]() { return 1; }, 10, [](std::vector<int> v) { return std::accumulate(v.begin(), v.end(), 0); });
/// root[] TProcessExecutor pool; auto hist = pool.MapReduce(CreateAndFillHists, 10, PoolUtils::ReduceObjects);
/// ~~~
///
//////////////////////////////////////////////////////////////////////////
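
// The sketch below illustrates the Map and MapReduce semantics described in
// the class documentation, including the lambda and std::bind wrapping
// suggested for functions with extra fixed arguments. It is not ROOT code:
// SerialMap and SerialMapReduce are hypothetical serial stand-ins (no forking,
// std::vector input only), and Scale, ScaleAllByThree, ScaleAllByThreeBind and
// SumOfSquares are names invented for illustration.

```cpp
#include <functional>
#include <numeric>
#include <vector>

// Hypothetical serial stand-in for Map(func, args): applies func to every
// element of args and collects the results in order.
template <class F, class T>
auto SerialMap(F func, const std::vector<T> &args) -> std::vector<decltype(func(args.front()))>
{
   std::vector<decltype(func(args.front()))> results;
   results.reserve(args.size());
   for (const auto &a : args)
      results.push_back(func(a));
   return results;
}

// Hypothetical serial stand-in for MapReduce: map first, then let redfunc
// squash the collected results into a single object.
template <class F, class T, class R>
auto SerialMapReduce(F func, const std::vector<T> &args, R redfunc)
   -> decltype(redfunc(SerialMap(func, args)))
{
   return redfunc(SerialMap(func, args));
}

// A two-argument function in which only x varies between executions.
inline int Scale(int factor, int x) { return factor * x; }

// Fix factor with a lambda so that the callable takes exactly one argument.
inline std::vector<int> ScaleAllByThree(const std::vector<int> &xs)
{
   return SerialMap([](int x) { return Scale(3, x); }, xs);
}

// The same wrapping expressed with std::bind.
inline std::vector<int> ScaleAllByThreeBind(const std::vector<int> &xs)
{
   using namespace std::placeholders;
   return SerialMap(std::bind(Scale, 3, _1), xs);
}

// Map squares each element; the reducer sums them with std::accumulate.
inline int SumOfSquares(const std::vector<int> &xs)
{
   return SerialMapReduce([](int a) { return a * a; }, xs,
                          [](const std::vector<int> &v) { return std::accumulate(v.begin(), v.end(), 0); });
}
```

// With ROOT available, the wrapped callables would presumably be passed to
// TProcessExecutor::Map in place of SerialMap; that substitution is untested here.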

namespace ROOT {
//////////////////////////////////////////////////////////////////////////
/// Class constructor.
/// nWorkers is the number of times this ROOT session will be forked, i.e.
/// the number of workers that will be spawned.
TProcessExecutor::TProcessExecutor(unsigned nWorkers) : TMPClient(nWorkers)
{
   Reset();
}
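
// Because every worker is a fork of the session, it starts with a copy of the
// parent's state, pseudo-random generators included. The sketch below shows one
// way to follow the class documentation's advice of using the process id in the
// seed. SeedForProcess and MakeWorkerGenerator are hypothetical helpers, and
// std::mt19937 stands in for whatever generator the task actually uses.

```cpp
#include <cstdint>
#include <random>
#include <unistd.h> // getpid (POSIX)

// Combine a user-chosen base seed with a process id.
// Multiplying by an odd constant is a bijection modulo 2^32, so two distinct
// pids always give two distinct seeds for the same baseSeed.
inline std::uint32_t SeedForProcess(std::uint32_t baseSeed, std::uint32_t pid)
{
   return baseSeed ^ (pid * 2654435761u);
}

// Meant to be called inside the function handed to Map, i.e. in the worker
// process, so that getpid() returns the worker's own id.
inline std::mt19937 MakeWorkerGenerator(std::uint32_t baseSeed)
{
   return std::mt19937(SeedForProcess(baseSeed, static_cast<std::uint32_t>(getpid())));
}
```

// Distinct seeds make identical sequences very unlikely but do not guarantee
// statistically independent streams.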

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: memory resident tree
TList* TProcessExecutor::ProcTree(TTree& tree, TSelector& selector, ULong64_t nToProcess)
{
   //prepare environment
   Reset();
   unsigned nWorkers = GetNWorkers();
   selector.Begin(nullptr);

   //fork
   TPoolPlayer worker(selector, &tree, nWorkers, nToProcess/nWorkers);
   bool ok = Fork(worker);
   if(!ok) {
      Error("TProcessExecutor::ProcTree", "[E][C] Could not fork. Aborting operation");
      return nullptr;
   }

   //divide entries equally between workers
   // [...]

   //tell workers to start processing entries
   fNToProcess = nWorkers; //this is the total number of ranges that will be processed by all workers cumulatively
   std::vector<unsigned> args(nWorkers);
   std::iota(args.begin(), args.end(), 0);
   // [...]
   if (fNProcessed < nWorkers)
      Error("TProcessExecutor::ProcTree", "[E][C] There was an error while sending tasks to workers."
            " Some entries might not be processed.");

   //collect results, distribute new tasks
   std::vector<TObject*> outLists;
   Collect(outLists);

   // The first element must be a TList instead of a TSelector List, to avoid duplicate problems with merging
   FixLists(outLists);

   // [...]
   auto outList = static_cast<TList*>(redfunc(outLists));

   TList *selList = selector.GetOutputList();
   for(auto obj : *outList) {
      selList->Add(obj);
   }
   outList->SetOwner(false);
   delete outList;

   selector.Terminate();

   //clean-up and return
   ReapWorkers();
   // [...]
   // outList has been deleted above: return the selector's output list,
   // which now holds the merged objects, instead of a dangling pointer
   return selList;
}
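
// The function above passes nToProcess/nWorkers to each worker and numbers the
// tasks 0..nWorkers-1 with std::iota. The sketch below shows how such an index
// could be turned into an entry range. EntryRange and AllEntryRanges are
// hypothetical helpers; giving the division remainder to the last worker is an
// assumption, since the listing does not show how the worker handles it.

```cpp
#include <cassert>
#include <cstdint>
#include <numeric>
#include <utility>
#include <vector>

// Half-open entry range [first, second) for the worker with the given index.
// Assumption: the remainder of nEntries / nWorkers goes to the last worker.
inline std::pair<std::uint64_t, std::uint64_t>
EntryRange(std::uint64_t nEntries, unsigned nWorkers, unsigned index)
{
   assert(nWorkers > 0 && index < nWorkers);
   const std::uint64_t chunk = nEntries / nWorkers;
   const std::uint64_t start = chunk * index;
   const std::uint64_t end = (index + 1 == nWorkers) ? nEntries : start + chunk;
   return {start, end};
}

// One range per worker, with indices generated by std::iota as in the listing.
inline std::vector<std::pair<std::uint64_t, std::uint64_t>>
AllEntryRanges(std::uint64_t nEntries, unsigned nWorkers)
{
   std::vector<unsigned> indices(nWorkers);
   std::iota(indices.begin(), indices.end(), 0u);
   std::vector<std::pair<std::uint64_t, std::uint64_t>> ranges;
   ranges.reserve(nWorkers);
   for (unsigned i : indices)
      ranges.push_back(EntryRange(nEntries, nWorkers, i));
   return ranges;
}
```

// For 10 entries and 3 workers this yields [0,3), [3,6) and [6,10).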

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a vector of files
TList* TProcessExecutor::ProcTree(const std::vector<std::string>& fileNames, TSelector& selector, const std::string& treeName, ULong64_t nToProcess)
{

   //prepare environment
   Reset();
   unsigned nWorkers = GetNWorkers();
   selector.Begin(nullptr);

   //fork
   TPoolPlayer worker(selector, fileNames, treeName, nWorkers, nToProcess);
   bool ok = Fork(worker);
   if (!ok) {
      Error("TProcessExecutor::ProcTree", "[E][C] Could not fork. Aborting operation");
      return nullptr;
   }

   Int_t procByFile = gEnv->GetValue("MultiProc.TestProcByFile", 0);

   if (procByFile) {
      if (fileNames.size() < nWorkers) {
         // TTree entry granularity: for each file, we divide entries equally between workers
         // [...]
         // Tell workers to start processing entries
         fNToProcess = nWorkers*fileNames.size(); //this is the total number of ranges that will be processed by all workers cumulatively
         std::vector<unsigned> args(nWorkers);
         std::iota(args.begin(), args.end(), 0);
         // [...]
         if (fNProcessed < nWorkers)
            Error("TProcessExecutor::ProcTree", "[E][C] There was an error while sending tasks to workers."
                  " Some entries might not be processed.");
      } else {
         // File granularity: each worker processes one whole file as a single task
         // [...]
         fNToProcess = fileNames.size();
         std::vector<unsigned> args(nWorkers);
         std::iota(args.begin(), args.end(), 0);
         // [...]
         if (fNProcessed < nWorkers)
            Error("TProcessExecutor::ProcTree", "[E][C] There was an error while sending tasks to workers."
                  " Some entries might not be processed.");
      }
   } else {
      // TTree entry granularity: for each file, we divide entries equally between workers
      // [...]
      // Tell workers to start processing entries
      fNToProcess = nWorkers*fileNames.size(); //this is the total number of ranges that will be processed by all workers cumulatively
      std::vector<unsigned> args(nWorkers);
      std::iota(args.begin(), args.end(), 0);
      // [...]
      if (fNProcessed < nWorkers)
         Error("TProcessExecutor::ProcTree", "[E][C] There was an error while sending tasks to workers."
               " Some entries might not be processed.");
   }

   // collect results, distribute new tasks
   std::vector<TObject*> outLists;
   Collect(outLists);

   // The first element must be a TList instead of a TSelector List, to avoid duplicate problems with merging
   FixLists(outLists);

   // [...]
   auto outList = static_cast<TList*>(redfunc(outLists));

   TList *selList = selector.GetOutputList();
   for(auto obj : *outList) {
      selList->Add(obj);
   }
   outList->SetOwner(false);
   delete outList;

   selector.Terminate();

   //clean-up and return
   ReapWorkers();
   // [...]
   // outList has been deleted above: return the selector's output list,
   // which now holds the merged objects, instead of a dangling pointer
   return selList;
}
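
// The branches above pick between file granularity and entry-range granularity
// and set fNToProcess accordingly. The sketch below restates that decision as
// two small functions. Granularity, ChooseGranularity and TotalTasks are
// hypothetical names; procByFile corresponds to the value read from
// "MultiProc.TestProcByFile", which defaults to 0.

```cpp
#include <cstddef>

enum class Granularity { kByFile, kByRange };

// Mirrors the branch structure of the listing: file granularity is used only
// when the flag is set and there are at least as many files as workers.
inline Granularity ChooseGranularity(bool procByFile, std::size_t nFiles, unsigned nWorkers)
{
   return (procByFile && nFiles >= nWorkers) ? Granularity::kByFile : Granularity::kByRange;
}

// Total number of tasks, i.e. the value assigned to fNToProcess in each branch:
// one task per file, or one range per worker for every file.
inline std::size_t TotalTasks(Granularity g, std::size_t nFiles, unsigned nWorkers)
{
   return g == Granularity::kByFile ? nFiles : nWorkers * nFiles;
}
```

// With 8 files and 4 workers, file granularity gives 8 tasks, whereas range
// granularity gives 32.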

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a TFileCollection
TList* TProcessExecutor::ProcTree(TFileCollection& files, TSelector& selector, const std::string& treeName, ULong64_t nToProcess)
{
   std::vector<std::string> fileNames(files.GetNFiles());
   unsigned count = 0;
   for(auto f : *static_cast<THashList*>(files.GetList()))
      fileNames[count++] = static_cast<TFileInfo*>(f)->GetCurrentUrl()->GetUrl();

   TList *rl = ProcTree(fileNames, selector, treeName, nToProcess);
   return rl;
}

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a TChain
TList* TProcessExecutor::ProcTree(TChain& files, TSelector& selector, const std::string& treeName, ULong64_t nToProcess)
{
   TObjArray* filelist = files.GetListOfFiles();
   std::vector<std::string> fileNames(filelist->GetEntries());
   unsigned count = 0;
   for(auto f : *filelist)
      fileNames[count++] = f->GetTitle();

   return ProcTree(fileNames, selector, treeName, nToProcess);
}

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a single file
TList* TProcessExecutor::ProcTree(const std::string& fileName, TSelector& selector, const std::string& treeName, ULong64_t nToProcess)
{
   std::vector<std::string> singleFileName(1, fileName);
   return ProcTree(singleFileName, selector, treeName, nToProcess);
}

/// Fix list of lists before merging (to avoid errors about duplicated objects)
void TProcessExecutor::FixLists(std::vector<TObject*> &lists) {

   // The first element must be a TList instead of a TSelector List, to avoid duplicate problems with merging
   TList *firstlist = new TList;
   TList *oldlist = (TList *) lists[0];
   TIter nxo(oldlist);
   TObject *o = 0;
   while ((o = nxo())) { firstlist->Add(o); }
   oldlist->SetOwner(kFALSE);
   lists.erase(lists.begin());
   lists.insert(lists.begin(), firstlist);
   delete oldlist;
}

//////////////////////////////////////////////////////////////////////////
/// Reset TProcessExecutor's state.
void TProcessExecutor::Reset()
{
   fNProcessed = 0;
   fNToProcess = 0;
   // [...]
}

//////////////////////////////////////////////////////////////////////////
/// Reply to a worker who just sent a result.
/// If another argument to process exists, tell the worker. Otherwise
/// send a shutdown order.
void TProcessExecutor::ReplyToFuncResult(TSocket *s)
{
   if (fNProcessed < fNToProcess) {
      //this cannot be a "greedy worker" task
      if (fTaskType == ETask::kMap)
         // [...]
      else if (fTaskType == ETask::kMapWithArg)
         // [...]
      ++fNProcessed;
   } else //whatever the task is, we are done
      // [...]
}

//////////////////////////////////////////////////////////////////////////
/// Reply to a worker who is idle.
/// If another argument to process exists, tell the worker. Otherwise
/// ask for a result.
void TProcessExecutor::ReplyToIdle(TSocket *s)
{
   if (fNProcessed < fNToProcess) {
      //we are executing a "greedy worker" task
      // [...]
      else if (fTaskType == ETask::kMap)
         // [...]
      else if (fTaskType == ETask::kProcByRange)
         // [...]
      else if (fTaskType == ETask::kProcByFile)
         // [...]
      ++fNProcessed;
   } else
      // [...]
}

} // namespace ROOT
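
// The reply handlers above share one piece of bookkeeping: while fNProcessed is
// below fNToProcess a worker gets another task and the counter is incremented;
// afterwards it is asked for its result. The sketch below models only that
// counter logic, without sockets or messages. Dispatcher, Reply and DrainTasks
// are hypothetical, and using the counter value as the task index is an
// assumption, since the send calls are not visible in the listing.

```cpp
#include <vector>

enum class Reply { kTask, kSendResult };

// Hypothetical, socket-free model of the bookkeeping in ReplyToIdle.
struct Dispatcher {
   unsigned fNProcessed = 0; // tasks already handed out
   unsigned fNToProcess = 0; // total number of tasks

   // Returns what an idle worker would be told. taskIndex is written only
   // when a task is handed out.
   Reply ReplyToIdle(unsigned &taskIndex)
   {
      if (fNProcessed < fNToProcess) {
         taskIndex = fNProcessed; // assumed: next index equals the counter
         ++fNProcessed;
         return Reply::kTask;
      }
      return Reply::kSendResult; // nothing left to hand out
   }
};

// Keeps answering an idle worker until no task is left and returns the
// indices in the order in which they were handed out.
inline std::vector<unsigned> DrainTasks(Dispatcher &d)
{
   std::vector<unsigned> handedOut;
   unsigned idx = 0;
   while (d.ReplyToIdle(idx) == Reply::kTask)
      handedOut.push_back(idx);
   return handedOut;
}
```

// In this model, whichever worker becomes idle first receives the next index,
// which is what makes the scheme "greedy".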