ROOT 6.14/05
Reference Guide
TTreeProcessorMP.cxx
/* @(#)root/multiproc:$Id$ */
// Author: Enrico Guiraud July 2015
// Modified: G Ganis Jan 2017

/*************************************************************************
 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#include "TEnv.h"
#include "ROOT/TTreeProcessorMP.hxx"
#include "TMPWorkerTree.h"

//////////////////////////////////////////////////////////////////////////
///
/// \class ROOT::TTreeProcessorMP
/// \ingroup Parallelism
/// \brief This class provides an interface to process a TTree dataset
/// in parallel with multi-process technology.
///
/// ### ROOT::TTreeProcessorMP::Process
/// The possible usages of the Process method are the following:\n
/// * Process(<dataset>, F func, const std::string& treeName, ULong64_t nToProcess):
///     func is executed with a TTreeReader& as argument, initialized for
///     the TTree with name treeName, from the dataset <dataset>; nToProcess is the
///     number of entries to process. The dataset can be expressed as:
///       const std::string& fileName -> single file name
///       const std::vector<std::string>& fileNames -> vector of file names
///       TFileCollection& files -> collection of TFileInfo objects
///       TChain& files -> TChain with the file paths
///       TTree& tree -> reference to an existing TTree object
///
/// For legacy code, the following signature is also supported:
/// * Process(<dataset>, TSelector& selector, const std::string& treeName, ULong64_t nToProcess):
///     where selector is a TSelector derived class describing the analysis and the other arguments
///     have the same meaning as above.
///
/// For either set of signatures, the processing function is executed as many times as
/// needed by a pool of fNWorkers workers; the number of workers can be passed to the constructor
/// or set via SetNWorkers. It defaults to the number of cores.\n
/// A collection containing the result of each execution is returned.\n
/// **Note:** the user is responsible for the deletion of any object that might
/// be created upon execution of func, returned objects included: ROOT::TTreeProcessorMP never
/// deletes what it returns, it simply forgets it.\n
/// **Note:** using ROOT::TTreeProcessorMP::Process is advisable only when the task to be
/// executed takes more than a few seconds; otherwise the overhead introduced
/// by Process will outweigh the benefits of parallel execution on most machines.
///
/// \param func
/// \parblock
/// a lambda expression, an std::function, a loaded macro, a
/// functor class or a function that takes a TTreeReader& as its only argument.
/// \endparblock
/// \param args
/// \parblock
/// a standard container (vector, list, deque), an initializer list
/// or a pointer to a TCollection (TList*, TObjArray*, ...).
/// \endparblock
/// **Note:** the version of ROOT::TTreeProcessorMP::Process that takes a TFileCollection& as argument incurs
/// the overhead of copying data from the TCollection to an STL container. Only
/// use it when absolutely necessary.\n
/// **Note:** in cases where the function to be executed takes more arguments than
/// the single TTreeReader& but all the others are fixed, the function can be wrapped
/// in a lambda or via std::bind to give it the right signature.\n
/// **Note:** the user should take care of initializing random seeds differently in each
/// process (e.g. using the process id in the seed). Otherwise several parallel executions
/// might generate the same sequence of pseudo-random numbers.
///
/// #### Return value:
/// Methods taking 'F func' return the return type of F.
/// Methods taking a TSelector return a 'TList *' with the selector output list; the output list
/// content is owned by the caller.
///
/// #### Examples:
///
/// See tutorials/multicore/mp102_readNtuplesFillHistosAndFit.C and tutorials/multicore/mp103_processSelector.C.
///
//////////////////////////////////////////////////////////////////////////

namespace ROOT {
//////////////////////////////////////////////////////////////////////////
/// Class constructor.
/// nWorkers is the number of times this ROOT session will be forked, i.e.
/// the number of workers that will be spawned.
TTreeProcessorMP::TTreeProcessorMP(unsigned nWorkers) : TMPClient(nWorkers)
{
   Reset();
}

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: memory resident tree
TList *TTreeProcessorMP::Process(TTree &tree, TSelector &selector, TEntryList &entries, ULong64_t nToProcess,
                                 ULong64_t jFirst)
{

   // Warn for yet unimplemented functionality
   if (jFirst > 0) {
      Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
      jFirst = 0;
   }

   // prepare environment
   Reset();
   UInt_t nWorkers = GetNWorkers();
   selector.Begin(nullptr);

   // Check the entry list
   TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
   // fork
   TMPWorkerTreeSel worker(selector, &tree, elist, nWorkers, nToProcess / nWorkers, jFirst);
   bool ok = Fork(worker);
   if(!ok) {
      Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation");
      return nullptr;
   }

   // divide entries equally between workers
   fTaskType = ETask::kProcByRange;

   // tell workers to start processing entries
   fNToProcess = nWorkers; // this is the total number of ranges that will be processed by all workers cumulatively
   std::vector<UInt_t> args(nWorkers);
   std::iota(args.begin(), args.end(), 0);
   fNProcessed = Broadcast(MPCode::kProcTree, args);
   if (fNProcessed < nWorkers)
      Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
                                         " Some entries might not be processed.");

   // collect results, distribute new tasks
   std::vector<TObject*> outLists;
   Collect(outLists);

   // The first element must be a TList instead of a TSelectorList, to avoid duplicate problems with merging
   FixLists(outLists);

   PoolUtils::ReduceObjects<TObject *> redfunc;
   auto outList = static_cast<TList*>(redfunc(outLists));

   // Import the resulting list in the selector
   selector.ImportOutput(outList);
   // outList is empty after this: just delete it
   delete outList;

   // Finalize the selector tasks
   selector.Terminate();

   // clean-up and return
   ReapWorkers();
   fTaskType = ETask::kNoTask;
   return selector.GetOutputList();
}

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a vector of files
TList *TTreeProcessorMP::Process(const std::vector<std::string> &fileNames, TSelector &selector, TEntryList &entries,
                                 const std::string &treeName, ULong64_t nToProcess, ULong64_t jFirst)
{

   // Warn for yet unimplemented functionality
   if (jFirst > 0) {
      Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
      jFirst = 0;
   }

   // prepare environment
   Reset();
   UInt_t nWorkers = GetNWorkers();
   selector.Begin(nullptr);

   // Check the entry list
   TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
   // fork
   TMPWorkerTreeSel worker(selector, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
   bool ok = Fork(worker);
   if (!ok) {
      Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation");
      return nullptr;
   }

   Int_t procByFile = gEnv->GetValue("MultiProc.TestProcByFile", 0);

   if (procByFile) {
      if (fileNames.size() < nWorkers) {
         // TTree entry granularity: for each file, we divide entries equally between workers
         fTaskType = ETask::kProcByRange;
         // Tell workers to start processing entries
         fNToProcess = nWorkers*fileNames.size(); // this is the total number of ranges that will be processed by all workers cumulatively
         std::vector<UInt_t> args(nWorkers);
         std::iota(args.begin(), args.end(), 0);
         fNProcessed = Broadcast(MPCode::kProcRange, args);
         if (fNProcessed < nWorkers)
            Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
                                               " Some entries might not be processed");
      } else {
         // File granularity: each worker processes one whole file as a single task
         fTaskType = ETask::kProcByFile;
         fNToProcess = fileNames.size();
         std::vector<UInt_t> args(nWorkers);
         std::iota(args.begin(), args.end(), 0);
         fNProcessed = Broadcast(MPCode::kProcFile, args);
         if (fNProcessed < nWorkers)
            Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
                                               " Some entries might not be processed.");
      }
   } else {
      // TTree entry granularity: for each file, we divide entries equally between workers
      fTaskType = ETask::kProcByRange;
      // Tell workers to start processing entries
      fNToProcess = nWorkers*fileNames.size(); // this is the total number of ranges that will be processed by all workers cumulatively
      std::vector<UInt_t> args(nWorkers);
      std::iota(args.begin(), args.end(), 0);
      fNProcessed = Broadcast(MPCode::kProcRange, args);
      if (fNProcessed < nWorkers)
         Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
                                            " Some entries might not be processed.");
   }

   // collect results, distribute new tasks
   std::vector<TObject*> outLists;
   Collect(outLists);

   // The first element must be a TList instead of a TSelectorList, to avoid duplicate problems with merging
   FixLists(outLists);

   PoolUtils::ReduceObjects<TObject *> redfunc;
   auto outList = static_cast<TList*>(redfunc(outLists));

   // Import the resulting list in the selector
   selector.ImportOutput(outList);
   // outList is empty after this: just delete it
   delete outList;

   // Finalize the selector tasks
   selector.Terminate();

   // clean-up and return
   ReapWorkers();
   fTaskType = ETask::kNoTask;

   return selector.GetOutputList();
}

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a TFileCollection
TList *TTreeProcessorMP::Process(TFileCollection &files, TSelector &selector, TEntryList &entries,
                                 const std::string &treeName, ULong64_t nToProcess, ULong64_t firstEntry)
{
   std::vector<std::string> fileNames(files.GetNFiles());
   UInt_t count = 0;
   for(auto f : *static_cast<THashList*>(files.GetList()))
      fileNames[count++] = static_cast<TFileInfo*>(f)->GetCurrentUrl()->GetUrl();

   TList *rl = Process(fileNames, selector, entries, treeName, nToProcess, firstEntry);
   return rl;
}

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a TChain
TList *TTreeProcessorMP::Process(TChain &files, TSelector &selector, TEntryList &entries, const std::string &treeName,
                                 ULong64_t nToProcess, ULong64_t firstEntry)
{
   TObjArray* filelist = files.GetListOfFiles();
   std::vector<std::string> fileNames(filelist->GetEntries());
   UInt_t count = 0;
   for(auto f : *filelist)
      fileNames[count++] = f->GetTitle();

   return Process(fileNames, selector, entries, treeName, nToProcess, firstEntry);
}

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a single file
TList *TTreeProcessorMP::Process(const std::string &fileName, TSelector &selector, TEntryList &entries,
                                 const std::string &treeName, ULong64_t nToProcess, ULong64_t firstEntry)
{
   std::vector<std::string> singleFileName(1, fileName);
   return Process(singleFileName, selector, entries, treeName, nToProcess, firstEntry);
}

///
/// Versions of the selector-based Process without a TEntryList
///

TList *TTreeProcessorMP::Process(const std::vector<std::string> &fileNames, TSelector &selector,
                                 const std::string &treeName, ULong64_t nToProcess, ULong64_t jFirst)
{
   TEntryList noelist;
   return Process(fileNames, selector, noelist, treeName, nToProcess, jFirst);
}

TList *TTreeProcessorMP::Process(const std::string &fileName, TSelector &selector, const std::string &treeName,
                                 ULong64_t nToProcess, ULong64_t jFirst)
{
   TEntryList noelist;
   return Process(fileName, selector, noelist, treeName, nToProcess, jFirst);
}

TList *TTreeProcessorMP::Process(TFileCollection &files, TSelector &selector, const std::string &treeName,
                                 ULong64_t nToProcess, ULong64_t jFirst)
{
   TEntryList noelist;
   return Process(files, selector, noelist, treeName, nToProcess, jFirst);
}

TList *TTreeProcessorMP::Process(TChain &files, TSelector &selector, const std::string &treeName, ULong64_t nToProcess,
                                 ULong64_t jFirst)
{
   TEntryList noelist;
   return Process(files, selector, noelist, treeName, nToProcess, jFirst);
}

TList *TTreeProcessorMP::Process(TTree &tree, TSelector &selector, ULong64_t nToProcess, ULong64_t jFirst)
{
   TEntryList noelist;
   return Process(tree, selector, noelist, nToProcess, jFirst);
}

/// Fix list of lists before merging (to avoid errors about duplicated objects)
void TTreeProcessorMP::FixLists(std::vector<TObject*> &lists) {

   // The first element must be a TList instead of a TSelectorList, to avoid duplicate problems with merging
   TList *firstlist = new TList;
   TList *oldlist = (TList *) lists[0];
   TIter nxo(oldlist);
   TObject *o = 0;
   while ((o = nxo())) { firstlist->Add(o); }
   oldlist->SetOwner(kFALSE);
   lists.erase(lists.begin());
   lists.insert(lists.begin(), firstlist);
   delete oldlist;
}

//////////////////////////////////////////////////////////////////////////
/// Reset TTreeProcessorMP's state.
void TTreeProcessorMP::Reset()
{
   fNProcessed = 0;
   fNToProcess = 0;
   fTaskType = ETask::kNoTask;
}

//////////////////////////////////////////////////////////////////////////
/// Reply to a worker who is idle.
/// If there are still tasks to process, tell the worker which one to run next.
/// Otherwise ask it for its result.
void TTreeProcessorMP::ReplyToIdle(TSocket *s)
{
   if (fNProcessed < fNToProcess) {
      // we are executing a "greedy worker" task
      if (fTaskType == ETask::kProcByRange)
         MPSend(s, MPCode::kProcRange, fNProcessed);
      else if (fTaskType == ETask::kProcByFile)
         MPSend(s, MPCode::kProcFile, fNProcessed);
      ++fNProcessed;
   } else
      MPSend(s, MPCode::kSendResult);
}

} // namespace ROOT
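
The "greedy worker" scheduling used by Process and ReplyToIdle in the listing can be sketched without ROOT or sockets. In the first round worker i receives task i (the std::iota over args), and every worker that later reports idle receives the next unassigned task index until none are left. This is only an illustration under that reading of the code: GreedyScheduler and SimulateSchedule are hypothetical names that do not exist in ROOT, and the real client sends MPCode messages over a TSocket instead of returning values.

```cpp
#include <cassert>
#include <numeric>
#include <vector>

// Hypothetical stand-in for the client-side bookkeeping (not a ROOT class).
struct GreedyScheduler {
   unsigned nToProcess; // total number of tasks (ranges or files)
   unsigned nProcessed; // number of tasks handed out so far

   // First round: worker i gets task i. Returns the task indices that were sent.
   std::vector<unsigned> Broadcast(unsigned nWorkers)
   {
      assert(nWorkers > 0 && nWorkers <= nToProcess);
      std::vector<unsigned> args(nWorkers);
      std::iota(args.begin(), args.end(), 0u);
      nProcessed = nWorkers;
      return args;
   }

   // Reply to an idle worker: returns true and sets 'task' if work is left,
   // false if the worker should send back its result instead.
   bool ReplyToIdle(unsigned &task)
   {
      if (nProcessed < nToProcess) {
         task = nProcessed++;
         return true;
      }
      return false;
   }
};

// Run the whole schedule and count how many times each task index is assigned.
std::vector<unsigned> SimulateSchedule(unsigned nWorkers, unsigned nTasks)
{
   GreedyScheduler sched{nTasks, 0};
   std::vector<unsigned> timesAssigned(nTasks, 0);
   for (unsigned t : sched.Broadcast(nWorkers))
      ++timesAssigned[t];
   unsigned task = 0;
   while (sched.ReplyToIdle(task))
      ++timesAssigned[task];
   return timesAssigned;
}
```

Calling SimulateSchedule(3, 7), for instance, hands out tasks 0 to 2 in the first round and tasks 3 to 6 one at a time as workers become idle, so faster workers naturally pick up more tasks.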
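
The class documentation notes that random seeds should be initialized differently in each process. One possible way to do this, shown here with the standard library rather than ROOT's own generators, is to mix a common base seed with a per-worker identifier such as the process id. The function names and the choice of std::mt19937 are assumptions made for this sketch, not something the ROOT source prescribes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <random>
#include <vector>

// Build a generator seeded from a shared base seed and a per-worker id
// (for example the process id or the worker index).
std::mt19937 MakeWorkerGenerator(std::uint32_t baseSeed, std::uint32_t workerId)
{
   std::seed_seq seq{baseSeed, workerId};
   return std::mt19937(seq);
}

// Draw the first n numbers a given worker would see.
std::vector<std::uint32_t> DrawSequence(std::uint32_t baseSeed, std::uint32_t workerId, std::size_t n)
{
   assert(n > 0);
   std::mt19937 gen = MakeWorkerGenerator(baseSeed, workerId);
   std::vector<std::uint32_t> out;
   out.reserve(n);
   for (std::size_t i = 0; i < n; ++i)
      out.push_back(static_cast<std::uint32_t>(gen()));
   return out;
}
```

With this scheme a worker reproduces its own sequence when rerun with the same base seed and id, while workers with different ids draw different sequences.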