ROOT 6.10/09
Reference Guide
TTreeProcessorMP.cxx
/* @(#)root/multiproc:$Id$ */
// Author: Enrico Guiraud July 2015
// Modified: G Ganis Jan 2017

/*************************************************************************
 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#include "TEnv.h"
#include "ROOT/TTreeProcessorMP.hxx"
#include "TMPWorkerTree.h"

//////////////////////////////////////////////////////////////////////////
///
/// \class ROOT::TTreeProcessorMP
/// \brief This class provides an interface to process a TTree dataset
/// in parallel with multi-process technology
///
/// ### ROOT::TTreeProcessorMP::Process
/// The possible usages of the Process method are the following:\n
/// * Process(<dataset>, F func, const std::string& treeName, ULong64_t nToProcess):
///   func is executed nToProcess times with a TTreeReader& as argument, initialized for
///   the TTree with name treeName, from the dataset <dataset>. The dataset can be
///   expressed as:
///     const std::string& fileName -> single file name
///     const std::vector<std::string>& fileNames -> vector of file names
///     TFileCollection& files -> collection of TFileInfo objects
///     TChain& files -> TChain with the file paths
///     TTree& tree -> reference to an existing TTree object
///
/// For legacy code, the following signature is also supported:
/// * Process(<dataset>, TSelector& selector, const std::string& treeName, ULong64_t nToProcess):
///   where selector is a TSelector derived class describing the analysis and the other arguments
///   have the same meaning as above.
///
/// For either set of signatures, the processing function is executed as many times as
/// needed by a pool of fNWorkers workers; the number of workers can be passed to the constructor
/// or set via SetNWorkers. It defaults to the number of cores.\n
/// A collection containing the result of each execution is returned.\n
/// **Note:** the user is responsible for the deletion of any object that might
/// be created upon execution of func, returned objects included: ROOT::TTreeProcessorMP never
/// deletes what it returns; it simply forgets it.\n
/// **Note:** using ROOT::TTreeProcessorMP::Process is advisable only when the task to be
/// executed takes more than a few seconds; otherwise the overhead introduced
/// by Process will outweigh the benefits of parallel execution on most machines.
///
/// \param func
/// \parblock
/// a lambda expression, an std::function, a loaded macro, a
/// functor class or a function that takes a TTreeReader& as its only argument.
/// \endparblock
/// \param args
/// \parblock
/// a standard container (vector, list, deque), an initializer list
/// or a pointer to a TCollection (TList*, TObjArray*, ...).
/// \endparblock
/// **Note:** the version of ROOT::TTreeProcessorMP::Process that takes a TFileCollection& as argument incurs
/// the overhead of copying data from the TCollection to an STL container. Only
/// use it when absolutely necessary.\n
/// **Note:** in cases where the function to be executed takes additional arguments
/// whose values are all fixed, the function can be wrapped
/// in a lambda or via std::bind to give it the right signature.\n
/// **Note:** the user should take care of initializing random seeds differently in each
/// process (e.g. using the process id in the seed). Otherwise several parallel executions
/// might generate the same sequence of pseudo-random numbers.
///
/// #### Return value:
/// Methods taking 'F func' return the return type of F.
/// Methods taking a TSelector return a 'TList *' with the selector output list; the output list
/// content is owned by the caller.
///
/// #### Examples:
///
/// See tutorials/multicore/mp102_readNtuplesFillHistosAndFit.C and tutorials/multicore/mp103_processSelector.C.
///
//////////////////////////////////////////////////////////////////////////

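// Illustration only, not part of TTreeProcessorMP.cxx: the note in the class
// documentation about wrapping a function in a lambda or via std::bind can be
// sketched without ROOT. FakeReader, countScaled, viaLambda, viaBind and
// adaptersAgree are hypothetical names; FakeReader merely stands in for the
// TTreeReader& that Process passes to the user function.

```cpp
#include <cassert>
#include <functional>

// Hypothetical stand-in for TTreeReader: it only carries an entry count.
struct FakeReader {
   int nEntries;
};

// Hypothetical analysis function with one extra, fixed argument (scale).
int countScaled(FakeReader &reader, int scale)
{
   return reader.nEntries * scale;
}

// Fix scale = 3 with a lambda: the result takes only the reader.
auto viaLambda = [](FakeReader &reader) { return countScaled(reader, 3); };

// Same adaptation with std::bind; _1 marks the argument left open.
std::function<int(FakeReader &)> viaBind = std::bind(countScaled, std::placeholders::_1, 3);

// Small self-check: both adapters must give the same answer.
bool adaptersAgree(FakeReader &reader)
{
   assert(reader.nEntries >= 0);
   return viaLambda(reader) == viaBind(reader);
}
```

// Either adapter has the single-argument shape described for func; a lambda
// is usually easier to read than the std::bind form.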
namespace ROOT {
//////////////////////////////////////////////////////////////////////////
/// Class constructor.
/// nWorkers is the number of times this ROOT session will be forked, i.e.
/// the number of workers that will be spawned.
TTreeProcessorMP::TTreeProcessorMP(unsigned nWorkers) : TMPClient(nWorkers)
{
   Reset();
}

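// Illustration only, not part of TTreeProcessorMP.cxx: forked workers start
// from a copy of the parent's state, which is why the class documentation asks
// for a different random seed in each process. A minimal sketch, assuming a
// worker index is available (the process id mentioned in the documentation
// would serve the same purpose); workerSeed, allWorkerSeeds and
// makeWorkerEngine are hypothetical helpers.

```cpp
#include <cassert>
#include <cstdint>
#include <random>
#include <vector>

// Derive a per-worker seed from a common base seed and the worker index.
// Unsigned arithmetic wraps around, so this is well defined for any input.
std::uint32_t workerSeed(std::uint32_t baseSeed, unsigned workerIndex)
{
   return static_cast<std::uint32_t>(baseSeed + workerIndex);
}

// List the seeds that nWorkers workers would use.
std::vector<std::uint32_t> allWorkerSeeds(std::uint32_t baseSeed, unsigned nWorkers)
{
   assert(nWorkers > 0);
   std::vector<std::uint32_t> seeds;
   seeds.reserve(nWorkers);
   for (unsigned w = 0; w < nWorkers; ++w)
      seeds.push_back(workerSeed(baseSeed, w));
   return seeds;
}

// Build the engine a given worker would draw from.
std::mt19937 makeWorkerEngine(std::uint32_t baseSeed, unsigned workerIndex)
{
   return std::mt19937(workerSeed(baseSeed, workerIndex));
}
```

// The same (baseSeed, workerIndex) pair always reproduces the same sequence,
// which keeps a parallel run repeatable while the workers stay decorrelated.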
//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: memory resident tree
TList *TTreeProcessorMP::Process(TTree &tree, TSelector &selector, TEntryList &entries, ULong64_t nToProcess,
                                 ULong64_t jFirst)
{

   // Warn for yet unimplemented functionality
   if (jFirst > 0) {
      Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
      jFirst = 0;
   }

   // prepare environment
   Reset();
   UInt_t nWorkers = GetNWorkers();
   selector.Begin(nullptr);

   // Check the entry list
   TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
   // fork
   TMPWorkerTreeSel worker(selector, &tree, elist, nWorkers, nToProcess / nWorkers, jFirst);
   bool ok = Fork(worker);
   if (!ok) {
      Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation");
      return nullptr;
   }

   // divide entries equally between workers
   fTaskType = ETask::kProcByRange;

   // tell workers to start processing entries
   fNToProcess = nWorkers; // this is the total number of ranges that will be processed by all workers cumulatively
   std::vector<UInt_t> args(nWorkers);
   std::iota(args.begin(), args.end(), 0);
   fNProcessed = Broadcast(MPCode::kProcTree, args);
   if (fNProcessed < nWorkers)
      Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
                                         " Some entries might not be processed.");

   // collect results, distribute new tasks
   std::vector<TObject *> outLists;
   Collect(outLists);

   // The first element must be a TList instead of a TSelector List, to avoid duplicate problems with merging
   FixLists(outLists);

   PoolUtils::ReduceObjects<TObject *> redfunc;
   auto outList = static_cast<TList *>(redfunc(outLists));

   // Import the resulting list in the selector
   selector.ImportOutput(outList);
   // outList is empty after this: just delete it
   delete outList;

   // Finalize the selector tasks
   selector.Terminate();

   // clean-up and return
   ReapWorkers();
   fTaskType = ETask::kNoTask;
   return selector.GetOutputList();
}

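// Illustration only, not part of TTreeProcessorMP.cxx: the function above hands
// each worker a budget of nToProcess / nWorkers entries, and integer division
// drops any remainder. The sketch below shows one way to split an entry count
// into contiguous per-worker ranges without losing entries. splitEntries is a
// hypothetical helper; how TMPWorkerTreeSel actually assigns ranges is not
// shown in this file.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

using EntryRange = std::pair<std::uint64_t, std::uint64_t>; // [begin, end)

// Split nEntries into nWorkers contiguous ranges. The first
// (nEntries % nWorkers) workers receive one extra entry.
std::vector<EntryRange> splitEntries(std::uint64_t nEntries, unsigned nWorkers)
{
   assert(nWorkers > 0);
   const std::uint64_t base = nEntries / nWorkers;
   const std::uint64_t extra = nEntries % nWorkers;

   std::vector<EntryRange> ranges;
   ranges.reserve(nWorkers);
   std::uint64_t begin = 0;
   for (unsigned w = 0; w < nWorkers; ++w) {
      const std::uint64_t len = base + (w < extra ? 1 : 0);
      ranges.emplace_back(begin, begin + len);
      begin += len;
   }
   return ranges;
}
```

// For 10 entries and 4 workers this yields ranges of length 3, 3, 2 and 2,
// whereas a plain 10 / 4 budget would cover only 8 entries.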
//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a vector of files
TList *TTreeProcessorMP::Process(const std::vector<std::string> &fileNames, TSelector &selector, TEntryList &entries,
                                 const std::string &treeName, ULong64_t nToProcess, ULong64_t jFirst)
{

   // Warn for yet unimplemented functionality
   if (jFirst > 0) {
      Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
      jFirst = 0;
   }

   // prepare environment
   Reset();
   UInt_t nWorkers = GetNWorkers();
   selector.Begin(nullptr);

   // Check the entry list
   TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
   // fork
   TMPWorkerTreeSel worker(selector, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
   bool ok = Fork(worker);
   if (!ok) {
      Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation");
      return nullptr;
   }

   Int_t procByFile = gEnv->GetValue("MultiProc.TestProcByFile", 0);

   if (procByFile) {
      if (fileNames.size() < nWorkers) {
         // TTree entry granularity: for each file, we divide entries equally between workers
         fTaskType = ETask::kProcByRange;
         // Tell workers to start processing entries
         fNToProcess = nWorkers * fileNames.size(); // this is the total number of ranges that will be processed by all workers cumulatively
         std::vector<UInt_t> args(nWorkers);
         std::iota(args.begin(), args.end(), 0);
         fNProcessed = Broadcast(MPCode::kProcRange, args);
         if (fNProcessed < nWorkers)
            Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
                                               " Some entries might not be processed.");
      } else {
         // File granularity: each worker processes one whole file as a single task
         fTaskType = ETask::kProcByFile;
         fNToProcess = fileNames.size();
         std::vector<UInt_t> args(nWorkers);
         std::iota(args.begin(), args.end(), 0);
         fNProcessed = Broadcast(MPCode::kProcFile, args);
         if (fNProcessed < nWorkers)
            Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
                                               " Some entries might not be processed.");
      }
   } else {
      // TTree entry granularity: for each file, we divide entries equally between workers
      fTaskType = ETask::kProcByRange;
      // Tell workers to start processing entries
      fNToProcess = nWorkers * fileNames.size(); // this is the total number of ranges that will be processed by all workers cumulatively
      std::vector<UInt_t> args(nWorkers);
      std::iota(args.begin(), args.end(), 0);
      fNProcessed = Broadcast(MPCode::kProcRange, args);
      if (fNProcessed < nWorkers)
         Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
                                            " Some entries might not be processed.");
   }

   // collect results, distribute new tasks
   std::vector<TObject *> outLists;
   Collect(outLists);

   // The first element must be a TList instead of a TSelector List, to avoid duplicate problems with merging
   FixLists(outLists);

   PoolUtils::ReduceObjects<TObject *> redfunc;
   auto outList = static_cast<TList *>(redfunc(outLists));

   // Import the resulting list in the selector
   selector.ImportOutput(outList);
   // outList is empty after this: just delete it
   delete outList;

   // Finalize the selector tasks
   selector.Terminate();

   // clean-up and return
   ReapWorkers();
   fTaskType = ETask::kNoTask;

   return selector.GetOutputList();
}

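// Illustration only, not part of TTreeProcessorMP.cxx: the three branches above
// reduce to a single decision between file granularity and entry-range
// granularity. The sketch below restates that decision as a pure function.
// TaskKind, TaskPlan and planTasks are hypothetical names, not ROOT types.

```cpp
#include <cassert>
#include <cstddef>

// Local stand-in for the two task types used above.
enum class TaskKind { ByRange, ByFile };

struct TaskPlan {
   TaskKind kind;
   std::size_t nToProcess; // total number of tasks handed out to all workers
};

// File granularity is chosen only when it was requested AND there are at
// least as many files as workers; otherwise every file is split into one
// range per worker.
TaskPlan planTasks(std::size_t nFiles, unsigned nWorkers, bool procByFile)
{
   assert(nWorkers > 0);
   if (procByFile && nFiles >= nWorkers)
      return {TaskKind::ByFile, nFiles};
   return {TaskKind::ByRange, nWorkers * nFiles};
}
```

// With 2 files and 4 workers the plan is 8 ranges even if file granularity was
// requested, since whole-file tasks would leave two workers without work.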
//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a TFileCollection
TList *TTreeProcessorMP::Process(TFileCollection &files, TSelector &selector, TEntryList &entries,
                                 const std::string &treeName, ULong64_t nToProcess, ULong64_t firstEntry)
{
   std::vector<std::string> fileNames(files.GetNFiles());
   UInt_t count = 0;
   for (auto f : *static_cast<THashList *>(files.GetList()))
      fileNames[count++] = static_cast<TFileInfo *>(f)->GetCurrentUrl()->GetUrl();

   TList *rl = Process(fileNames, selector, entries, treeName, nToProcess, firstEntry);
   return rl;
}

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a TChain
TList *TTreeProcessorMP::Process(TChain &files, TSelector &selector, TEntryList &entries, const std::string &treeName,
                                 ULong64_t nToProcess, ULong64_t firstEntry)
{
   TObjArray *filelist = files.GetListOfFiles();
   std::vector<std::string> fileNames(filelist->GetEntries());
   UInt_t count = 0;
   for (auto f : *filelist)
      fileNames[count++] = f->GetTitle();

   return Process(fileNames, selector, entries, treeName, nToProcess, firstEntry);
}

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a single file
TList *TTreeProcessorMP::Process(const std::string &fileName, TSelector &selector, TEntryList &entries,
                                 const std::string &treeName, ULong64_t nToProcess, ULong64_t firstEntry)
{
   std::vector<std::string> singleFileName(1, fileName);
   return Process(singleFileName, selector, entries, treeName, nToProcess, firstEntry);
}

///
/// No TEntryList versions of selector processor
///

TList *TTreeProcessorMP::Process(const std::vector<std::string> &fileNames, TSelector &selector,
                                 const std::string &treeName, ULong64_t nToProcess, ULong64_t jFirst)
{
   TEntryList noelist;
   return Process(fileNames, selector, noelist, treeName, nToProcess, jFirst);
}

TList *TTreeProcessorMP::Process(const std::string &fileName, TSelector &selector, const std::string &treeName,
                                 ULong64_t nToProcess, ULong64_t jFirst)
{
   TEntryList noelist;
   return Process(fileName, selector, noelist, treeName, nToProcess, jFirst);
}

TList *TTreeProcessorMP::Process(TFileCollection &files, TSelector &selector, const std::string &treeName,
                                 ULong64_t nToProcess, ULong64_t jFirst)
{
   TEntryList noelist;
   return Process(files, selector, noelist, treeName, nToProcess, jFirst);
}

TList *TTreeProcessorMP::Process(TChain &files, TSelector &selector, const std::string &treeName, ULong64_t nToProcess,
                                 ULong64_t jFirst)
{
   TEntryList noelist;
   return Process(files, selector, noelist, treeName, nToProcess, jFirst);
}

TList *TTreeProcessorMP::Process(TTree &tree, TSelector &selector, ULong64_t nToProcess, ULong64_t jFirst)
{
   TEntryList noelist;
   return Process(tree, selector, noelist, nToProcess, jFirst);
}

/// Fix list of lists before merging (to avoid errors about duplicated objects)
void TTreeProcessorMP::FixLists(std::vector<TObject *> &lists)
{

   // The first element must be a TList instead of a TSelector List, to avoid duplicate problems with merging
   TList *firstlist = new TList;
   TList *oldlist = (TList *)lists[0];
   TIter nxo(oldlist);
   TObject *o = 0;
   while ((o = nxo())) {
      firstlist->Add(o);
   }
   oldlist->SetOwner(kFALSE);
   lists.erase(lists.begin());
   lists.insert(lists.begin(), firstlist);
   delete oldlist;
}

//////////////////////////////////////////////////////////////////////////
/// Reset TTreeProcessorMP's state.
void TTreeProcessorMP::Reset()
{
   fNProcessed = 0;
   fNToProcess = 0;
   fTaskType = ETask::kNoTask;
}

//////////////////////////////////////////////////////////////////////////
/// Reply to a worker who is idle.
/// If there are still events to process, tell the worker; otherwise
/// ask for a result.
void TTreeProcessorMP::ReplyToIdle(TSocket *s)
{
   if (fNProcessed < fNToProcess) {
      // we are executing a "greedy worker" task
      if (fTaskType == ETask::kProcByRange)
         MPSend(s, MPCode::kProcRange, fNProcessed);
      else if (fTaskType == ETask::kProcByFile)
         MPSend(s, MPCode::kProcFile, fNProcessed);
      ++fNProcessed;
   } else
      MPSend(s, MPCode::kSendResult);
}

} // namespace ROOT
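
// Illustration only, not part of TTreeProcessorMP.cxx: ReplyToIdle implements a
// "greedy worker" scheme in which the initial broadcast gives every worker one
// task and each idle worker then receives the next unassigned task index until
// none are left. The sketch below models only the counters; Dispatcher, start
// and replyToIdle are hypothetical names, and -1 stands in for the "send your
// result" message.

```cpp
#include <cassert>

// Stand-in for the client-side bookkeeping (fNProcessed / fNToProcess).
struct Dispatcher {
   unsigned nProcessed = 0; // tasks handed out so far
   unsigned nToProcess = 0; // total tasks to hand out

   // Initial broadcast: task indices 0..nWorkers-1 go out, one per worker.
   void start(unsigned nWorkers, unsigned totalTasks)
   {
      assert(nWorkers > 0 && totalTasks >= nWorkers);
      nToProcess = totalTasks;
      nProcessed = nWorkers;
   }

   // Called when a worker reports idle: returns the next task index, or -1
   // when all tasks have been handed out and the worker should send its result.
   long replyToIdle()
   {
      if (nProcessed < nToProcess)
         return static_cast<long>(nProcessed++);
      return -1;
   }
};
```

// Because tasks are handed out on demand, a fast worker simply asks again
// sooner and ends up processing more tasks than a slow one.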