TTreeProcessorMP.cxx
/* @(#)root/multiproc:$Id$ */
// Author: Enrico Guiraud July 2015
// Modified: G Ganis Jan 2017

/*************************************************************************
 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#include "TEnv.h"
#include "ROOT/TTreeProcessorMP.hxx"
#include "TMPWorkerTree.h"

//////////////////////////////////////////////////////////////////////////
///
/// \class ROOT::TTreeProcessorMP
/// \ingroup Parallelism
/// \brief This class provides an interface to process a TTree dataset
///        in parallel with multi-process technology
///
/// ### ROOT::TTreeProcessorMP::Process
/// The possible usages of the Process method are the following:\n
/// * `Process(<dataset>, F func, const std::string& treeName, ULong64_t nToProcess)`:
///     func is executed nToProcess times with a TTreeReader& as argument, initialized for
///     the TTree named treeName from the dataset `<dataset>`. The dataset can be
///     expressed as:
///     \code{.cpp}
///     const std::string& fileName               -> single file name
///     const std::vector<std::string>& fileNames -> vector of file names
///     TFileCollection& files                    -> collection of TFileInfo objects
///     TChain& files                             -> TChain with the file paths
///     TTree& tree                               -> Reference to an existing TTree object
///     \endcode
/// For backward compatibility, the following signature is also supported:
/// * `Process(<dataset>, TSelector& selector, const std::string& treeName, ULong64_t nToProcess)`:
///     where selector is a TSelector-derived class describing the analysis and the other arguments
///     have the same meaning as above.
///
/// For either set of signatures, the processing function is executed as many times as
/// needed by a pool of fNWorkers workers; the number of workers can be passed to the constructor
/// or set via SetNWorkers. It defaults to the number of cores.\n
/// A collection containing the result of each execution is returned.\n
/// **Note:** the user is responsible for the deletion of any object that might
/// be created upon execution of func, returned objects included: ROOT::TTreeProcessorMP never
/// deletes what it returns; it simply forgets it.\n
/// **Note:** using ROOT::TTreeProcessorMP::Process is advisable only when the task to be
/// executed takes more than a few seconds; otherwise the overhead introduced
/// by Process will outweigh the benefits of parallel execution on most machines.
///
/// \param func
/// \parblock
/// a lambda expression, a std::function, a loaded macro, a
/// functor class or a function that takes zero arguments (for the first signature)
/// or one (for the second signature).
/// \endparblock
/// \param args
/// \parblock
/// a standard container (vector, list, deque), an initializer list
/// or a pointer to a TCollection (TList*, TObjArray*, ...).
/// \endparblock
/// **Note:** the version of ROOT::TTreeProcessorMP::Process that takes a TFileCollection as argument incurs
/// the overhead of copying data from the TCollection to an STL container. Only
/// use it when absolutely necessary.\n
/// **Note:** in cases where the function to be executed takes more than
/// zero/one argument but all are fixed except zero/one, the function can be wrapped
/// in a lambda or via std::bind to give it the right signature.\n
/// **Note:** the user should take care of initializing random seeds differently in each
/// process (e.g. using the process id in the seed). Otherwise several parallel executions
/// might generate the same sequence of pseudo-random numbers.
///
/// #### Return value:
/// Methods taking 'F func' return the return type of F.
/// Methods taking a TSelector return a 'TList *' with the selector output list; the output list
/// content is owned by the caller.
///
/// #### Examples:
///
/// See tutorials/multicore/mp102_readNtuplesFillHistosAndFit.C and tutorials/multicore/mp103_processSelector.C.
///
//////////////////////////////////////////////////////////////////////////
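
The note on random seeds in the class documentation can be illustrated with a small sketch that does not depend on ROOT. It assumes each worker has some distinct integer identifier (a process id or a worker index would do); `MakeWorkerRng`, `DrawSample` and `baseSeed` are hypothetical names chosen for this illustration and are not part of TTreeProcessorMP.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <random>
#include <vector>

// Hypothetical helper (not part of ROOT): build a generator whose seed
// combines a common base seed with a per-worker identifier, so that two
// workers do not start from the same generator state.
inline std::mt19937 MakeWorkerRng(std::uint32_t baseSeed, std::uint32_t workerId)
{
   std::seed_seq seq{baseSeed, workerId};
   return std::mt19937(seq);
}

// Draw n values from the generator of the given worker.
inline std::vector<std::uint32_t> DrawSample(std::uint32_t baseSeed, std::uint32_t workerId, std::size_t n)
{
   assert(n > 0 && "sample size must be positive");
   auto rng = MakeWorkerRng(baseSeed, workerId);
   std::vector<std::uint32_t> sample;
   sample.reserve(n);
   for (std::size_t i = 0; i < n; ++i)
      sample.push_back(static_cast<std::uint32_t>(rng()));
   return sample;
}
```

With this scheme the same (base seed, worker id) pair always reproduces the same sequence, while workers with different ids draw different sequences. In a forked worker the identifier could be the process id, as the note suggests.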

namespace ROOT {
//////////////////////////////////////////////////////////////////////////
/// Class constructor.
/// nWorkers is the number of times this ROOT session will be forked, i.e.
/// the number of workers that will be spawned.
TTreeProcessorMP::TTreeProcessorMP(UInt_t nWorkers) : TMPClient(nWorkers)
{
   Reset();
}

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: memory resident tree
TList *TTreeProcessorMP::Process(TTree &tree, TSelector &selector, TEntryList &entries, ULong64_t nToProcess,
                                 ULong64_t jFirst)
{

   // Warn for yet unimplemented functionality
   if (jFirst > 0) {
      Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
      jFirst = 0;
   }

   //prepare environment
   Reset();
   UInt_t nWorkers = GetNWorkers();
   selector.Begin(nullptr);

   // Check the entry list
   TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
   //fork
   TMPWorkerTreeSel worker(selector, &tree, elist, nWorkers, nToProcess / nWorkers, jFirst);
   bool ok = Fork(worker);
   if(!ok) {
      Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation");
      return nullptr;
   }

   //divide entries equally between workers
   fTaskType = ETask::kProcByRange;

   //tell workers to start processing entries
   fNToProcess = nWorkers; //this is the total number of ranges that will be processed by all workers cumulatively
   std::vector<UInt_t> args(nWorkers);
   std::iota(args.begin(), args.end(), 0);
   fNProcessed = Broadcast(MPCode::kProcTree, args);
   if (fNProcessed < nWorkers)
      Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
                                         " Some entries might not be processed.");

   //collect results, distribute new tasks
   std::vector<TObject*> outLists;
   Collect(outLists);

   // The first element must be a TList instead of a TSelector List, to avoid duplicate problems with merging
   FixLists(outLists);

   ROOT::ExecutorUtils::ReduceObjects<TObject *> redfunc;
   auto outList = static_cast<TList*>(redfunc(outLists));

   // Import the resulting list in the selector
   selector.ImportOutput(outList);
   // outList is empty after this: just delete it
   delete outList;

   // Finalize the selector tasks
   selector.Terminate();

   //clean-up and return
   ReapWorkers();
   fTaskType = ETask::kNoTask;
   return selector.GetOutputList();
}
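
The worker constructed above receives `nToProcess / nWorkers`, which is an integer division and therefore truncates. As a generic illustration of how a known number of entries can be split into contiguous per-worker ranges without losing the remainder, a minimal sketch follows. `SplitEntries` is a hypothetical helper written for this note; it is not the logic used inside TMPWorkerTreeSel.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical helper (not ROOT code): split nEntries entries into nWorkers
// contiguous half-open ranges [first, last). The first (nEntries % nWorkers)
// ranges receive one extra entry so that no entry is dropped.
inline std::vector<std::pair<std::uint64_t, std::uint64_t>>
SplitEntries(std::uint64_t nEntries, unsigned nWorkers)
{
   assert(nWorkers > 0);
   const std::uint64_t base = nEntries / nWorkers;
   const std::uint64_t extra = nEntries % nWorkers;
   std::vector<std::pair<std::uint64_t, std::uint64_t>> ranges;
   ranges.reserve(nWorkers);
   std::uint64_t first = 0;
   for (unsigned w = 0; w < nWorkers; ++w) {
      const std::uint64_t len = base + (w < extra ? 1 : 0);
      ranges.emplace_back(first, first + len);
      first += len;
   }
   return ranges;
}
```

For example, `SplitEntries(10, 3)` yields the ranges [0, 4), [4, 7) and [7, 10).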

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a vector of files
TList *TTreeProcessorMP::Process(const std::vector<std::string> &fileNames, TSelector &selector, TEntryList &entries,
                                 const std::string &treeName, ULong64_t nToProcess, ULong64_t jFirst)
{

   // Warn for yet unimplemented functionality
   if (jFirst > 0) {
      Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
      jFirst = 0;
   }

   //prepare environment
   Reset();
   UInt_t nWorkers = GetNWorkers();
   selector.Begin(nullptr);

   // Check the entry list
   TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
   //fork
   TMPWorkerTreeSel worker(selector, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
   bool ok = Fork(worker);
   if (!ok) {
      Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation");
      return nullptr;
   }

   Int_t procByFile = gEnv->GetValue("MultiProc.TestProcByFile", 0);

   if (procByFile) {
      if (fileNames.size() < nWorkers) {
         // TTree entry granularity: for each file, we divide entries equally between workers
         fTaskType = ETask::kProcByRange;
         // Tell workers to start processing entries
         fNToProcess = nWorkers*fileNames.size(); //this is the total number of ranges that will be processed by all workers cumulatively
         std::vector<UInt_t> args(nWorkers);
         std::iota(args.begin(), args.end(), 0);
         fNProcessed = Broadcast(MPCode::kProcRange, args);
         if (fNProcessed < nWorkers)
            Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
                                               " Some entries might not be processed");
      } else {
         // File granularity: each worker processes one whole file as a single task
         fTaskType = ETask::kProcByFile;
         fNToProcess = fileNames.size();
         std::vector<UInt_t> args(nWorkers);
         std::iota(args.begin(), args.end(), 0);
         fNProcessed = Broadcast(MPCode::kProcFile, args);
         if (fNProcessed < nWorkers)
            Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
                                               " Some entries might not be processed.");
      }
   } else {
      // TTree entry granularity: for each file, we divide entries equally between workers
      fTaskType = ETask::kProcByRange;
      // Tell workers to start processing entries
      fNToProcess = nWorkers*fileNames.size(); //this is the total number of ranges that will be processed by all workers cumulatively
      std::vector<UInt_t> args(nWorkers);
      std::iota(args.begin(), args.end(), 0);
      fNProcessed = Broadcast(MPCode::kProcRange, args);
      if (fNProcessed < nWorkers)
         Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
                                            " Some entries might not be processed.");
   }

   // collect results, distribute new tasks
   std::vector<TObject*> outLists;
   Collect(outLists);

   // The first element must be a TList instead of a TSelector List, to avoid duplicate problems with merging
   FixLists(outLists);

   ROOT::ExecutorUtils::ReduceObjects<TObject *> redfunc;
   auto outList = static_cast<TList*>(redfunc(outLists));

   // Import the resulting list in the selector
   selector.ImportOutput(outList);
   // outList is empty after this: just delete it
   delete outList;

   // Finalize the selector tasks
   selector.Terminate();

   //clean-up and return
   ReapWorkers();
   fTaskType = ETask::kNoTask;

   return selector.GetOutputList();
}
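
The branches in the function above choose between two task granularities and set the total task count accordingly: one task per file when whole files are distributed, and one task per (file, worker) pair when each file is split into ranges. The sketch below restates that decision in isolation. `Granularity`, `ChooseGranularity` and `TotalTasks` are hypothetical names introduced for this illustration only.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical names, not part of ROOT.
enum class Granularity { kByRange, kByFile };

// Same decision as the branches in the listing: whole files are used as
// tasks only when this was requested and there is at least one file per worker.
inline Granularity ChooseGranularity(bool procByFile, std::size_t nFiles, unsigned nWorkers)
{
   assert(nWorkers > 0);
   return (procByFile && nFiles >= nWorkers) ? Granularity::kByFile : Granularity::kByRange;
}

// Total number of tasks handed out over the whole run.
inline std::size_t TotalTasks(Granularity g, std::size_t nFiles, unsigned nWorkers)
{
   return g == Granularity::kByFile ? nFiles : nFiles * nWorkers;
}
```

With 2 files and 4 workers, range granularity is selected even if file granularity was requested, giving 8 tasks; with 8 files and 4 workers, file granularity gives 8 tasks as well, but each task is a whole file.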

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a TFileCollection
TList *TTreeProcessorMP::Process(TFileCollection &files, TSelector &selector, TEntryList &entries,
                                 const std::string &treeName, ULong64_t nToProcess, ULong64_t firstEntry)
{
   std::vector<std::string> fileNames(files.GetNFiles());
   UInt_t count = 0;
   for(auto f : *static_cast<THashList*>(files.GetList()))
      fileNames[count++] = static_cast<TFileInfo*>(f)->GetCurrentUrl()->GetUrl();

   TList *rl = Process(fileNames, selector, entries, treeName, nToProcess, firstEntry);
   return rl;
}

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a TChain
TList *TTreeProcessorMP::Process(TChain &files, TSelector &selector, TEntryList &entries, const std::string &treeName,
                                 ULong64_t nToProcess, ULong64_t firstEntry)
{
   TObjArray* filelist = files.GetListOfFiles();
   std::vector<std::string> fileNames(filelist->GetEntries());
   UInt_t count = 0;
   for(auto f : *filelist)
      fileNames[count++] = f->GetTitle();

   return Process(fileNames, selector, entries, treeName, nToProcess, firstEntry);
}

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a single file
TList *TTreeProcessorMP::Process(const std::string &fileName, TSelector &selector, TEntryList &entries,
                                 const std::string &treeName, ULong64_t nToProcess, ULong64_t firstEntry)
{
   std::vector<std::string> singleFileName(1, fileName);
   return Process(singleFileName, selector, entries, treeName, nToProcess, firstEntry);
}

///
/// No TEntryList versions of selector processor
///

TList *TTreeProcessorMP::Process(const std::vector<std::string> &fileNames, TSelector &selector,
                                 const std::string &treeName, ULong64_t nToProcess, ULong64_t jFirst)
{
   TEntryList noelist;
   return Process(fileNames, selector, noelist, treeName, nToProcess, jFirst);
}

TList *TTreeProcessorMP::Process(const std::string &fileName, TSelector &selector, const std::string &treeName,
                                 ULong64_t nToProcess, ULong64_t jFirst)
{
   TEntryList noelist;
   return Process(fileName, selector, noelist, treeName, nToProcess, jFirst);
}

TList *TTreeProcessorMP::Process(TFileCollection &files, TSelector &selector, const std::string &treeName,
                                 ULong64_t nToProcess, ULong64_t jFirst)
{
   TEntryList noelist;
   return Process(files, selector, noelist, treeName, nToProcess, jFirst);
}

TList *TTreeProcessorMP::Process(TChain &files, TSelector &selector, const std::string &treeName, ULong64_t nToProcess,
                                 ULong64_t jFirst)
{
   TEntryList noelist;
   return Process(files, selector, noelist, treeName, nToProcess, jFirst);
}

TList *TTreeProcessorMP::Process(TTree &tree, TSelector &selector, ULong64_t nToProcess, ULong64_t jFirst)
{
   TEntryList noelist;
   return Process(tree, selector, noelist, nToProcess, jFirst);
}

/// Fix list of lists before merging (to avoid errors about duplicated objects)
void TTreeProcessorMP::FixLists(std::vector<TObject*> &lists) {

   // The first element must be a TList instead of a TSelector List, to avoid duplicate problems with merging
   TList *firstlist = new TList;
   TList *oldlist = (TList *) lists[0];
   TIter nxo(oldlist);
   TObject *o = 0;
   while ((o = nxo())) { firstlist->Add(o); }
   oldlist->SetOwner(kFALSE);
   lists.erase(lists.begin());
   lists.insert(lists.begin(), firstlist);
   delete oldlist;
}

//////////////////////////////////////////////////////////////////////////
/// Reset TTreeProcessorMP's state.
void TTreeProcessorMP::Reset()
{
   fNProcessed = 0;
   fNToProcess = 0;
   fTaskType = ETask::kNoTask;
}

//////////////////////////////////////////////////////////////////////////
/// Reply to a worker who is idle.
/// If there are still events to process, tell the worker; otherwise
/// ask for a result.
void TTreeProcessorMP::ReplyToIdle(TSocket *s)
{
   if (fNProcessed < fNToProcess) {
      //we are executing a "greedy worker" task
      if (fTaskType == ETask::kProcByRange)
         MPSend(s, MPCode::kProcRange, fNProcessed);
      else if (fTaskType == ETask::kProcByFile)
         MPSend(s, MPCode::kProcFile, fNProcessed);
      ++fNProcessed;
   } else
      MPSend(s, MPCode::kSendResult);
}

} // namespace ROOT
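
The "greedy worker" scheme used by ReplyToIdle can be simulated without sockets or forking. The sketch below keeps the same two counters, hands one task index to each idle worker until the counter reaches the total, and then signals that a result is wanted. `Dispatcher` and `Simulate` are hypothetical stand-ins written for this note, and the round-robin polling order is an assumption: real workers report idle whenever they finish.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical stand-in for the client-side counters used by ReplyToIdle.
struct Dispatcher {
   unsigned nProcessed = 0; // tasks already handed out
   unsigned nToProcess = 0; // total number of tasks

   // Returns the index of the next task for an idle worker, or -1 once all
   // tasks have been handed out and the worker should send back its result.
   int ReplyToIdle()
   {
      if (nProcessed < nToProcess)
         return static_cast<int>(nProcessed++);
      return -1;
   }
};

// Simulate nWorkers workers and return how many tasks each one received.
// The initial broadcast gives task i to worker i; afterwards workers are
// polled in round-robin order (an assumption made for this illustration).
inline std::vector<unsigned> Simulate(unsigned nWorkers, unsigned nTasks)
{
   assert(nWorkers > 0);
   Dispatcher d;
   d.nToProcess = nTasks;
   d.nProcessed = std::min(nWorkers, nTasks);
   std::vector<unsigned> perWorker(nWorkers, 0);
   for (unsigned w = 0; w < d.nProcessed; ++w)
      perWorker[w] = 1;
   bool anyWork = true;
   while (anyWork) {
      anyWork = false;
      for (unsigned w = 0; w < nWorkers; ++w) {
         if (d.ReplyToIdle() >= 0) {
            ++perWorker[w];
            anyWork = true;
         }
      }
   }
   return perWorker;
}
```

With 3 workers and 8 tasks, the simulation hands out 3, 3 and 2 tasks respectively. The property that matters is that every task index is handed out exactly once, whichever worker asks first.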