Logo ROOT  
Reference Guide
TTreeProcessorMP.cxx
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3// Modified: G Ganis Jan 2017
4
5/*************************************************************************
6 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13#include "TEnv.h"
15#include "TMPWorkerTree.h"
16
17//////////////////////////////////////////////////////////////////////////
18///
19/// \class ROOT::TTreeProcessorMP
20/// \ingroup Parallelism
21/// \brief This class provides an interface to process a TTree dataset
22/// in parallel with multi-process technology
23///
24/// ###ROOT::TTreeProcessorMP::Process
25/// The possible usages of the Process method are the following:\n
26/// * Process(<dataset>, F func, const std::string& treeName, ULong64_t nToProcess):
27/// func is executed nToProcess times with argument a TTreeReader&, initialized for
28/// the TTree with name treeName, from the dataset <dataset>. The dataset can be
29/// expressed as:
30/// const std::string& fileName -> single file name
31/// const std::vector<std::string>& fileNames -> vector of file names
32/// TFileCollection& files -> collection of TFileInfo objects
33/// TChain& files -> TChain with the file paths
34/// TTree& tree -> Reference to an existing TTree object
35///
36/// For legacy, the following signature is also supported:
37/// * Process(<dataset>, TSelector& selector, const std::string& treeName, ULong64_t nToProcess):
38/// where selector is a TSelector derived class describing the analysis and the other arguments
39/// have the same meaning as above.
40///
41/// For either set of signatures, the processing function is executed as many times as
42/// needed by a pool of fNWorkers workers; the number of workers can be passed to the constructor
43/// or set via SetNWorkers. It defaults to the number of cores.\n
44/// A collection containing the result of each execution is returned.\n
45/// **Note:** the user is responsible for the deletion of any object that might
46/// be created upon execution of func, returned objects included: ROOT::TTreeProcessorMP never
47/// deletes what it returns, it simply forgets it.\n
48/// **Note:** that the usage of ROOT::TTreeProcessorMP::Process is indicated only when the task to be
49/// executed takes more than a few seconds, otherwise the overhead introduced
50/// by Process will outrun the benefits of parallel execution on most machines.
51///
52/// \param func
53/// \parblock
54/// a lambda expression, an std::function, a loaded macro, a
55/// functor class or a function that takes zero arguments (for the first signature)
56/// or one (for the second signature).
57/// \endparblock
58/// \param args
59/// \parblock
60/// a standard container (vector, list, deque), an initializer list
61/// or a pointer to a TCollection (TList*, TObjArray*, ...).
62/// \endparblock
63/// **Note:** the version of ROOT::TTreeProcessorMP::Process that takes a TFileCollection* as argument incurs
64/// in the overhead of copying data from the TCollection to an STL container. Only
65/// use it when absolutely necessary.\n
66/// **Note:** in cases where the function to be executed takes more than
67/// zero/one argument but all are fixed except zero/one, the function can be wrapped
68/// in a lambda or via std::bind to give it the right signature.\n
69/// **Note:** the user should take care of initializing random seeds differently in each
70/// process (e.g. using the process id in the seed). Otherwise several parallel executions
71/// might generate the same sequence of pseudo-random numbers.
72///
73/// #### Return value:
74/// Methods taking 'F func' return the return type of F.
75/// Methods taking a TSelector return a 'TList *' with the selector output list; the output list
76/// content is owned by the caller.
77///
78/// #### Examples:
79///
80/// See tutorials/multicore/mp102_readNtuplesFillHistosAndFit.C and tutorials/multicore/mp103__processSelector.C .
81///
82//////////////////////////////////////////////////////////////////////////
83
84namespace ROOT {
85//////////////////////////////////////////////////////////////////////////
86/// Class constructor.
87/// nWorkers is the number of times this ROOT session will be forked, i.e.
88/// the number of workers that will be spawned.
90{
91 Reset();
92}
93
94//////////////////////////////////////////////////////////////////////////
95/// TSelector-based tree processing: memory resident tree
96TList *TTreeProcessorMP::Process(TTree &tree, TSelector &selector, TEntryList &entries, ULong64_t nToProcess,
97 ULong64_t jFirst)
98{
99
100 // Warn for yet unimplemented functionality
101 if (jFirst > 0) {
102 Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
103 jFirst = 0;
104 }
105
106 //prepare environment
107 Reset();
108 UInt_t nWorkers = GetNWorkers();
109 selector.Begin(nullptr);
110
111 // Check the entry list
112 TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
113 //fork
114 TMPWorkerTreeSel worker(selector, &tree, elist, nWorkers, nToProcess / nWorkers, jFirst);
115 bool ok = Fork(worker);
116 if(!ok) {
117 Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation");
118 return nullptr;
119 }
120
121 //divide entries equally between workers
122 fTaskType = ETask::kProcByRange;
123
124 //tell workers to start processing entries
125 fNToProcess = nWorkers; //this is the total number of ranges that will be processed by all workers cumulatively
126 std::vector<UInt_t> args(nWorkers);
127 std::iota(args.begin(), args.end(), 0);
128 fNProcessed = Broadcast(MPCode::kProcTree, args);
129 if (fNProcessed < nWorkers)
130 Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
131 " Some entries might not be processed.");
132
133 //collect results, distribute new tasks
134 std::vector<TObject*> outLists;
135 Collect(outLists);
136
137 // The first element must be a TList instead of a TSelector List, to avoid duplicate problems with merging
138 FixLists(outLists);
139
141 auto outList = static_cast<TList*>(redfunc(outLists));
142
143 // Import the resulting list in the selector
144 selector.ImportOutput(outList);
145 // outList is empty after this: just delete it
146 delete outList;
147
148 // Finalize the selector tasks
149 selector.Terminate();
150
151 //clean-up and return
152 ReapWorkers();
153 fTaskType = ETask::kNoTask;
154 return selector.GetOutputList();
155}
156
157//////////////////////////////////////////////////////////////////////////
158/// TSelector-based tree processing: dataset as a vector of files
159TList *TTreeProcessorMP::Process(const std::vector<std::string> &fileNames, TSelector &selector, TEntryList &entries,
160 const std::string &treeName, ULong64_t nToProcess, ULong64_t jFirst)
161{
162
163 // Warn for yet unimplemented functionality
164 if (jFirst > 0) {
165 Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
166 jFirst = 0;
167 }
168
169 //prepare environment
170 Reset();
171 UInt_t nWorkers = GetNWorkers();
172 selector.Begin(nullptr);
173
174 // Check the entry list
175 TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
176 //fork
177 TMPWorkerTreeSel worker(selector, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
178 bool ok = Fork(worker);
179 if (!ok) {
180 Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation");
181 return nullptr;
182 }
183
184 Int_t procByFile = gEnv->GetValue("MultiProc.TestProcByFile", 0);
185
186 if (procByFile) {
187 if (fileNames.size() < nWorkers) {
188 // TTree entry granularity: for each file, we divide entries equally between workers
189 fTaskType = ETask::kProcByRange;
190 // Tell workers to start processing entries
191 fNToProcess = nWorkers*fileNames.size(); //this is the total number of ranges that will be processed by all workers cumulatively
192 std::vector<UInt_t> args(nWorkers);
193 std::iota(args.begin(), args.end(), 0);
194 fNProcessed = Broadcast(MPCode::kProcRange, args);
195 if (fNProcessed < nWorkers)
196 Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
197 " Some entries might not be processed");
198 } else {
199 // File granularity: each worker processes one whole file as a single task
200 fTaskType = ETask::kProcByFile;
201 fNToProcess = fileNames.size();
202 std::vector<UInt_t> args(nWorkers);
203 std::iota(args.begin(), args.end(), 0);
204 fNProcessed = Broadcast(MPCode::kProcFile, args);
205 if (fNProcessed < nWorkers)
206 Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
207 " Some entries might not be processed.");
208 }
209 } else {
210 // TTree entry granularity: for each file, we divide entries equally between workers
211 fTaskType = ETask::kProcByRange;
212 // Tell workers to start processing entries
213 fNToProcess = nWorkers*fileNames.size(); //this is the total number of ranges that will be processed by all workers cumulatively
214 std::vector<UInt_t> args(nWorkers);
215 std::iota(args.begin(), args.end(), 0);
216 fNProcessed = Broadcast(MPCode::kProcRange, args);
217 if (fNProcessed < nWorkers)
218 Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
219 " Some entries might not be processed.");
220 }
221
222 // collect results, distribute new tasks
223 std::vector<TObject*> outLists;
224 Collect(outLists);
225
226 // The first element must be a TList instead of a TSelector List, to avoid duplicate problems with merging
227 FixLists(outLists);
228
230 auto outList = static_cast<TList*>(redfunc(outLists));
231
232 // Import the resulting list in the selector
233 selector.ImportOutput(outList);
234 // outList is empty after this: just delete it
235 delete outList;
236
237 // Finalize the selector tasks
238 selector.Terminate();
239
240 //clean-up and return
241 ReapWorkers();
242 fTaskType = ETask::kNoTask;
243
244 return selector.GetOutputList();
245}
246
247//////////////////////////////////////////////////////////////////////////
248/// TSelector-based tree processing: dataset as a TFileCollection
249TList *TTreeProcessorMP::Process(TFileCollection &files, TSelector &selector, TEntryList &entries,
250 const std::string &treeName, ULong64_t nToProcess, ULong64_t firstEntry)
251{
252 std::vector<std::string> fileNames(files.GetNFiles());
253 UInt_t count = 0;
254 for(auto f : *static_cast<THashList*>(files.GetList()))
255 fileNames[count++] = static_cast<TFileInfo*>(f)->GetCurrentUrl()->GetUrl();
256
257 TList *rl = Process(fileNames, selector, entries, treeName, nToProcess, firstEntry);
258 return rl;
259}
260
261//////////////////////////////////////////////////////////////////////////
262/// TSelector-based tree processing: dataset as a TChain
263TList *TTreeProcessorMP::Process(TChain &files, TSelector &selector, TEntryList &entries, const std::string &treeName,
264 ULong64_t nToProcess, ULong64_t firstEntry)
265{
266 TObjArray* filelist = files.GetListOfFiles();
267 std::vector<std::string> fileNames(filelist->GetEntries());
268 UInt_t count = 0;
269 for(auto f : *filelist)
270 fileNames[count++] = f->GetTitle();
271
272 return Process(fileNames, selector, entries, treeName, nToProcess, firstEntry);
273}
274
275//////////////////////////////////////////////////////////////////////////
276/// TSelector-based tree processing: dataset as a single file
277TList *TTreeProcessorMP::Process(const std::string &fileName, TSelector &selector, TEntryList &entries,
278 const std::string &treeName, ULong64_t nToProcess, ULong64_t firstEntry)
279{
280 std::vector<std::string> singleFileName(1, fileName);
281 return Process(singleFileName, selector, entries, treeName, nToProcess, firstEntry);
282}
283
284///
285/// No TEntryList versions of selector processor
286///
287
288TList *TTreeProcessorMP::Process(const std::vector<std::string> &fileNames, TSelector &selector,
289 const std::string &treeName, ULong64_t nToProcess, ULong64_t jFirst)
290{
291 TEntryList noelist;
292 return Process(fileNames, selector, noelist, treeName, nToProcess, jFirst);
293}
294
295TList *TTreeProcessorMP::Process(const std::string &fileName, TSelector &selector, const std::string &treeName,
296 ULong64_t nToProcess, ULong64_t jFirst)
297{
298 TEntryList noelist;
299 return Process(fileName, selector, noelist, treeName, nToProcess, jFirst);
300}
301
302TList *TTreeProcessorMP::Process(TFileCollection &files, TSelector &selector, const std::string &treeName,
303 ULong64_t nToProcess, ULong64_t jFirst)
304{
305 TEntryList noelist;
306 return Process(files, selector, noelist, treeName, nToProcess, jFirst);
307}
308
309TList *TTreeProcessorMP::Process(TChain &files, TSelector &selector, const std::string &treeName, ULong64_t nToProcess,
310 ULong64_t jFirst)
311{
312 TEntryList noelist;
313 return Process(files, selector, noelist, treeName, nToProcess, jFirst);
314}
315
316TList *TTreeProcessorMP::Process(TTree &tree, TSelector &selector, ULong64_t nToProcess, ULong64_t jFirst)
317{
318 TEntryList noelist;
319 return Process(tree, selector, noelist, nToProcess, jFirst);
320}
321
322/// Fix list of lists before merging (to avoid errors about duplicated objects)
323void TTreeProcessorMP::FixLists(std::vector<TObject*> &lists) {
324
325 // The first element must be a TList instead of a TSelector List, to avoid duplicate problems with merging
326 TList *firstlist = new TList;
327 TList *oldlist = (TList *) lists[0];
328 TIter nxo(oldlist);
329 TObject *o = 0;
330 while ((o = nxo())) { firstlist->Add(o); }
331 oldlist->SetOwner(kFALSE);
332 lists.erase(lists.begin());
333 lists.insert(lists.begin(), firstlist);
334 delete oldlist;
335}
336
337//////////////////////////////////////////////////////////////////////////
338/// Reset TTreeProcessorMP's state.
339void TTreeProcessorMP::Reset()
340{
341 fNProcessed = 0;
342 fNToProcess = 0;
343 fTaskType = ETask::kNoTask;
344}
345
346//////////////////////////////////////////////////////////////////////////
347/// Reply to a worker who is idle.
348/// If still events to process, tell the worker. Otherwise
349/// ask for a result
350void TTreeProcessorMP::ReplyToIdle(TSocket *s)
351{
352 if (fNProcessed < fNToProcess) {
353 //we are executing a "greedy worker" task
354 if (fTaskType == ETask::kProcByRange)
355 MPSend(s, MPCode::kProcRange, fNProcessed);
356 else if (fTaskType == ETask::kProcByFile)
357 MPSend(s, MPCode::kProcFile, fNProcessed);
358 ++fNProcessed;
359 } else
361}
362
363} // namespace ROOT
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
#define f(i)
Definition: RSha256.hxx:104
int Int_t
Definition: RtypesCore.h:45
unsigned int UInt_t
Definition: RtypesCore.h:46
const Bool_t kFALSE
Definition: RtypesCore.h:101
unsigned long long ULong64_t
Definition: RtypesCore.h:81
R__EXTERN TEnv * gEnv
Definition: TEnv.h:170
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition: TError.cxx:187
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition: TError.cxx:231
Merge collection of TObjects.
Definition: PoolUtils.h:35
TTreeProcessorMP(unsigned nWorkers=0)
A chain is a collection of files containing TTree objects.
Definition: TChain.h:33
TObjArray * GetListOfFiles() const
Definition: TChain.h:107
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:26
Bool_t IsValid() const
Definition: TEntryList.h:83
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition: TEnv.cxx:491
Class that contains a list of TFileInfo's and accumulated meta data information about its entries.
THashList * GetList()
Long64_t GetNFiles() const
Class describing a generic file including meta information.
Definition: TFileInfo.h:39
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
Definition: THashList.h:34
A doubly linked list.
Definition: TList.h:44
virtual void Add(TObject *obj)
Definition: TList.h:87
Base class for multiprocess applications' clients.
Definition: TMPClient.h:23
Templated derivation of TMPWorkerTree handlign selector tree processing.
An array of TObjects.
Definition: TObjArray.h:37
Int_t GetEntries() const
Return the number of objects in array (i.e.
Definition: TObjArray.cxx:523
Mother of all ROOT objects.
Definition: TObject.h:37
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
Definition: TSelector.h:31
virtual void ImportOutput(TList *output)
Imports the content of 'output' in the internal output list.
Definition: TSelector.cxx:270
virtual TList * GetOutputList() const
Definition: TSelector.h:69
virtual void Begin(TTree *)
Definition: TSelector.h:54
virtual void Terminate()
Definition: TSelector.h:71
A TTree represents a columnar dataset.
Definition: TTree.h:79
@ kSendResult
Ask for a kFuncResult/kProcResult.
Definition: MPCode.h:36
@ kProcFile
Tell a TMPWorkerTree which tree to process. The object sent is a TreeInfo.
Definition: MPCode.h:38
@ kProcRange
Tell a TMPWorkerTree which tree and entries range to process. The object sent is a TreeRangeInfo.
Definition: MPCode.h:39
@ kProcTree
Tell a TMPWorkerTree to process the tree that was passed to it at construction time.
Definition: MPCode.h:40
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
static constexpr double s
Definition: tree.py:1