Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TTreeProcessorMP.hxx
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3// Modified: G Ganis Jan 2017
4
5/*************************************************************************
6 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13#ifndef ROOT_TTreeProcessorMP
14#define ROOT_TTreeProcessorMP
15
16#include "MPCode.h"
17#include "MPSendRecv.h"
18#include "PoolUtils.h"
19#include "ROOT/TypeTraits.hxx" // InvokeResult_t
20#include "TChain.h"
21#include "TChainElement.h"
22#include "TError.h"
23#include "TFileCollection.h"
24#include "TFileInfo.h"
25#include "THashList.h"
26#include "TMPClient.h"
27#include "TMPWorkerTree.h"
28#include "TSelector.h"
29#include "TTreeReader.h"
30#include <algorithm> //std::generate
31#include <numeric> //std::iota
32#include <string>
33#include <functional> //std::reference_wrapper
34#include <vector>
35
36namespace ROOT {
37
/// \brief Interface to process a TTree dataset in parallel with multiple worker processes.
///
/// The class derives privately from TMPClient. Its public API is a family of Process
/// overloads: the dataset can be given as a list of file names, a single file name, a
/// TFileCollection, a TChain or a TTree, and the work to apply is either a functor taking
/// a TTreeReader& or a TSelector. Each flavour exists with and without a TEntryList.
38class TTreeProcessorMP : private TMPClient {
// Shorthand for the result type of invoking F with Args...
39 template <typename F, typename... Args>
40 using InvokeResult_t = ROOT::TypeTraits::InvokeResult_t<F, Args...>;
41
42public:
43 explicit TTreeProcessorMP(UInt_t nWorkers = 0); //default number of workers is the number of processors
44 ~TTreeProcessorMP() = default;
45 // It doesn't make sense for a TTreeProcessorMP to be copied
48
49 /// \brief Process a TTree dataset with a functor
50 /// \tparam F functor returning a pointer to TObject or inheriting classes and
51 /// taking a TTreeReader& (both enforced at compile-time)
52 ///
53 /// Dataset definition:
54 /// \param[in] fileNames vector of strings with the paths of the files with the TTree to process
55 /// \param[in] fileName string with the path of the file with the TTree to process
56 /// \param[in] collection TFileCollection with the files with the TTree to process
57 /// \param[in] chain TChain with the files with the TTree to process
58 /// \param[in] tree TTree to process
59 ///
60 /// \param[in] entries TEntryList to filter the dataset
61 /// \param[in] treeName Name of the TTree to process
62 /// \param[in] nToProcess Number of entries to process (0 means all)
63 /// \param[in] jFirst First entry to process (0 means the first of the first file)
64 ///
65 template<class F> auto Process(const std::vector<std::string>& fileNames, F procFunc, TEntryList &entries,
66 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
68 template<class F> auto Process(const std::string& fileName, F procFunc, TEntryList &entries,
69 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
71 template<class F> auto Process(TFileCollection& collection, F procFunc, TEntryList &entries,
72 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
74 template<class F> auto Process(TChain& chain, F procFunc, TEntryList &entries,
75 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
77 template<class F> auto Process(TTree& tree, F procFunc, TEntryList &entries,
78 ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
80
81 /// \brief Process a TTree dataset with a functor: version without entry list
82 /// \tparam F functor returning a pointer to TObject or inheriting classes and
83 /// taking a TTreeReader& (both enforced at compile-time)
84 ///
85 /// Dataset definition:
86 /// \param[in] fileNames vector of strings with the paths of the files with the TTree to process
87 /// \param[in] fileName string with the path of the file with the TTree to process
88 /// \param[in] collection TFileCollection with the files with the TTree to process
89 /// \param[in] chain TChain with the files with the TTree to process
90 /// \param[in] tree TTree to process
91 ///
92 /// \param[in] treeName Name of the TTree to process
93 /// \param[in] nToProcess Number of entries to process (0 means all)
94 /// \param[in] jFirst First entry to process (0 means the first of the first file)
95 ///
96 template<class F> auto Process(const std::vector<std::string>& fileNames, F procFunc,
97 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
99 template<class F> auto Process(const std::string& fileName, F procFunc,
100 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
102 template<class F> auto Process(TFileCollection& files, F procFunc,
103 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
105 template<class F> auto Process(TChain& files, F procFunc,
106 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
108 template<class F> auto Process(TTree& tree, F procFunc, ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
110
111
112 /// \brief Process a TTree dataset with a selector
113 ///
114 /// Dataset definition:
115 /// \param[in] fileNames vector of strings with the paths of the files with the TTree to process
116 /// \param[in] fileName string with the path of the file with the TTree to process
117 /// \param[in] collection TFileCollection with the files with the TTree to process
118 /// \param[in] chain TChain with the files with the TTree to process
119 /// \param[in] tree TTree to process
120 ///
121 /// \param[in] selector Instance of TSelector to be applied to the dataset
122 /// \param[in] entries TEntryList to filter the dataset
123 /// \param[in] treeName Name of the TTree to process
124 /// \param[in] nToProcess Number of entries to process (0 means all)
125 /// \param[in] jFirst First entry to process (0 means the first of the first file)
126 ///
127 // these versions require a TSelector
128 TList* Process(const std::vector<std::string>& fileNames, TSelector& selector, TEntryList &entries,
129 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
130 TList* Process(const std::string &fileName, TSelector& selector, TEntryList &entries,
131 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
132 TList* Process(TFileCollection& files, TSelector& selector, TEntryList &entries,
133 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
134 TList* Process(TChain& files, TSelector& selector, TEntryList &entries,
135 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
136 TList* Process(TTree& tree, TSelector& selector, TEntryList &entries,
137 ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
138
139
140 /// \brief Process a TTree dataset with a selector: version without entry list
141 ///
142 /// Dataset definition:
143 /// \param[in] fileNames vector of strings with the paths of the files with the TTree to process
144 /// \param[in] fileName string with the path of the file with the TTree to process
145 /// \param[in] collection TFileCollection with the files with the TTree to process
146 /// \param[in] chain TChain with the files with the TTree to process
147 /// \param[in] tree TTree to process
148 ///
149 /// \param[in] selector Instance of TSelector to be applied to the dataset
150 /// \param[in] treeName Name of the TTree to process
151 /// \param[in] nToProcess Number of entries to process (0 means all)
152 /// \param[in] jFirst First entry to process (0 means the first of the first file)
153 ///
154 // these versions require a TSelector
155 TList* Process(const std::vector<std::string>& fileNames, TSelector& selector,
156 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
157 TList* Process(const std::string &fileName, TSelector& selector,
158 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
159 TList* Process(TFileCollection& files, TSelector& selector,
160 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
161 TList* Process(TChain& files, TSelector& selector,
162 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
163 TList* Process(TTree& tree, TSelector& selector, ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
164
// Number of workers, forwarded from the privately inherited TMPClient.
166 unsigned GetNWorkers() const { return TMPClient::GetNWorkers(); }
167
168private:
// Listen for messages sent by the workers and call the appropriate handler function.
169 template<class T> void Collect(std::vector<T> &reslist);
// Handle a message received from a worker and reply to it.
170 template<class T> void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
171
// Fix list of lists before merging (to avoid errors about duplicated objects).
172 void FixLists(std::vector<TObject*> &lists);
// Reset TTreeProcessorMP's state.
173 void Reset();
// Reply to a worker who is idle.
174 void ReplyToIdle(TSocket *s);
175
176 unsigned fNProcessed; ///< number of arguments already passed to the workers
177 unsigned fNToProcess; ///< total number of arguments to pass to the workers
178
179 /// A collection of the types of tasks that TTreeProcessorMP can execute.
180 /// It is used to interpret in the right way and properly reply to the
181 /// messages received (see, for example, TTreeProcessorMP::HandleInput)
182 enum class ETask : unsigned char {
183 kNoTask, ///< no task is being executed
184 kProcByRange, ///< a Process method is being executed and each worker will process a certain range of each file
185 kProcByFile ///< a Process method is being executed and each worker will process a different file
186 };
187
188 ETask fTaskType = ETask::kNoTask; ///< the kind of task that is being executed, if any
189};
190
/// Process the tree `treeName` found in each of `fileNames` with the functor `procFunc`.
///
/// Steps: reset the internal state, fork the workers, broadcast the initial tasks, collect
/// the partial results, reduce them with `redfunc` and return the result cast to `retType`.
/// When there are fewer files than workers the work is split by entry range within each
/// file (ETask::kProcByRange); otherwise each task is a whole file (ETask::kProcByFile).
///
/// `entries` is passed to the workers only if entries.IsValid(); otherwise nullptr is passed.
/// A `jFirst` greater than 0 is not supported: a warning is emitted and 0 is used instead.
/// Returns nullptr if the workers could not be forked.
///
/// NOTE(review): `retType` and `redfunc` are used below, but their declarations (and the
/// trailing return type of this function) are not visible in this listing - confirm against
/// the original header.
191template<class F>
192auto TTreeProcessorMP::Process(const std::vector<std::string>& fileNames, F procFunc, TEntryList &entries,
193 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
195{
// Compile-time check that the functor's result converts to TObject*.
197 static_assert(std::is_constructible<TObject*, retType>::value,
198 "procFunc must return a pointer to a class inheriting from TObject,"
199 " and must take a reference to TTreeReader as the only argument");
200
201 // Warn for yet unimplemented functionality
202 if (jFirst > 0) {
203 Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
204 jFirst = 0;
205 }
206
207 //prepare environment
208 Reset();
209 unsigned nWorkers = GetNWorkers();
210
211 // Check the entry list: an invalid list is handed to the worker as nullptr
212 TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
213 //fork
214 TMPWorkerTreeFunc<F> worker(procFunc, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
215 bool ok = Fork(worker);
216 if(!ok) {
217 Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation.");
218 return nullptr;
219 }
220
221
222 if(fileNames.size() < nWorkers) {
223 //TTree entry granularity. For each file, we divide entries equally between workers
224 fTaskType = ETask::kProcByRange;
225 //Tell workers to start processing entries
226 fNToProcess = nWorkers*fileNames.size(); //this is the total number of ranges that will be processed by all workers cumulatively
// args is filled with 0, 1, ..., nWorkers-1 and broadcast to the workers
227 std::vector<unsigned> args(nWorkers);
228 std::iota(args.begin(), args.end(), 0);
229 fNProcessed = Broadcast(MPCode::kProcRange, args);
// A Broadcast result below nWorkers is reported as a send failure
230 if(fNProcessed < nWorkers)
231 Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
232 } else {
233 //file granularity. each worker processes one whole file as a single task
234 fTaskType = ETask::kProcByFile;
235 fNToProcess = fileNames.size();
// args is filled with 0, 1, ..., nWorkers-1 and broadcast to the workers
236 std::vector<unsigned> args(nWorkers);
237 std::iota(args.begin(), args.end(), 0);
238 fNProcessed = Broadcast(MPCode::kProcFile, args);
// A Broadcast result below nWorkers is reported as a send failure
239 if(fNProcessed < nWorkers)
240 Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
241 }
242
243 //collect results, distribute new tasks
244 std::vector<TObject*> reslist;
245 Collect(reslist);
246
247 //merge
249 auto res = redfunc(reslist);
250
251 //clean-up and return
252 ReapWorkers();
253 fTaskType = ETask::kNoTask;
254 return static_cast<retType>(res);
255}
256
257
/// Single-file convenience overload: wraps `fileName` in a one-element vector and forwards
/// every other argument unchanged to the vector-of-file-names overload.
258template<class F>
259auto TTreeProcessorMP::Process(const std::string& fileName, F procFunc, TEntryList &entries,
260 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
262{
263 std::vector<std::string> singleFileName(1, fileName);
264 return Process(singleFileName, procFunc, entries, treeName, nToProcess, jFirst);
265}
266
267
/// Overload that takes the dataset from `files`: the URL of every entry in files.GetList()
/// is copied into a vector of file names, which is then forwarded to the
/// vector-of-file-names overload together with the remaining arguments.
///
/// NOTE(review): the line carrying the function name and its leading parameters is absent
/// from this listing; the body uses `files`, `procFunc` and `entries`. The static_cast
/// below presumably relies on the list holding TFileInfo objects - confirm.
268template<class F>
270 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
272{
// One slot per file reported by the collection
273 std::vector<std::string> fileNames(files.GetNFiles());
274 unsigned count = 0;
275 for(auto f : *static_cast<THashList*>(files.GetList()))
276 fileNames[count++] = static_cast<TFileInfo*>(f)->GetCurrentUrl()->GetUrl();
277
278 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
279}
280
281
/// TChain overload: collects GetTitle() of every element of files.GetListOfFiles() into a
/// vector of strings and forwards to the vector-of-file-names overload.
/// NOTE(review): presumably the title of each chain element is the file path - confirm.
282template<class F>
283auto TTreeProcessorMP::Process(TChain& files, F procFunc, TEntryList &entries,
284 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
286{
287 TObjArray* filelist = files.GetListOfFiles();
// One slot per entry of the chain's file list
288 std::vector<std::string> fileNames(filelist->GetEntries());
289 unsigned count = 0;
290 for(auto f : *filelist)
291 fileNames[count++] = f->GetTitle();
292
293 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
294}
295
296
/// Process an in-memory `tree` with the functor `procFunc`.
///
/// Steps: reset the internal state, fork the workers (each is given a pointer to `tree`),
/// broadcast MPCode::kProcTree, collect the partial results, reduce them with `redfunc`
/// and return the result cast to `retType`. The work is always split by entry range, one
/// range per worker.
///
/// `entries` is passed to the workers only if entries.IsValid(); otherwise nullptr is passed.
/// A `jFirst` greater than 0 is not supported: a warning is emitted and 0 is used instead.
/// Returns nullptr if the workers could not be forked.
///
/// NOTE(review): `retType` and `redfunc` are used below, but their declarations (and the
/// trailing return type of this function) are not visible in this listing - confirm against
/// the original header.
297template<class F>
298auto TTreeProcessorMP::Process(TTree& tree, F procFunc, TEntryList &entries,
299 ULong64_t nToProcess, ULong64_t jFirst)
301{
// Compile-time check that the functor's result converts to TObject*.
303 static_assert(std::is_constructible<TObject*, retType>::value, "procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
304
305 // Warn for yet unimplemented functionality
306 if (jFirst > 0) {
307 Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
308 jFirst = 0;
309 }
310
311 //prepare environment
312 Reset();
313 unsigned nWorkers = GetNWorkers();
314
315 // Check the entry list: an invalid list is handed to the worker as nullptr
316 TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
317 //fork
318 TMPWorkerTreeFunc<F> worker(procFunc, &tree, elist, nWorkers, nToProcess, jFirst);
319 bool ok = Fork(worker);
320 if(!ok) {
321 Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation.");
322 return nullptr;
323 }
324
325 //divide entries equally between workers
326 fTaskType = ETask::kProcByRange;
327
328 //tell workers to start processing entries
329 fNToProcess = nWorkers; //this is the total number of ranges that will be processed by all workers cumulatively
// args is filled with 0, 1, ..., nWorkers-1 and broadcast to the workers
330 std::vector<unsigned> args(nWorkers);
331 std::iota(args.begin(), args.end(), 0);
332 fNProcessed = Broadcast(MPCode::kProcTree, args);
// A Broadcast result below nWorkers is reported as a send failure
333 if(fNProcessed < nWorkers)
334 Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
335
336 //collect results, distribute new tasks
337 std::vector<TObject*> reslist;
338 Collect(reslist);
339
340 //merge
342 auto res = redfunc(reslist);
343
344 //clean-up and return
345 ReapWorkers();
346 fTaskType = ETask::kNoTask;
347 return static_cast<retType>(res);
348}
349
350
351///
352/// No TEntryList versions of generic processor
353///
354
/// File-names overload without an entry list: forwards to the TEntryList overload with a
/// default-constructed TEntryList.
355template<class F>
356auto TTreeProcessorMP::Process(const std::vector<std::string>& fileNames, F procFunc,
357 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
359{
360 TEntryList noelist;
361 return Process(fileNames, procFunc, noelist, treeName, nToProcess, jFirst);
362}
363
364
/// Single-file overload without an entry list: forwards to the TEntryList overload with a
/// default-constructed TEntryList.
365template<class F>
366auto TTreeProcessorMP::Process(const std::string& fileName, F procFunc,
367 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
369{
370 TEntryList noelist;
371 return Process(fileName, procFunc, noelist, treeName, nToProcess, jFirst);
372}
373
374
/// Overload without an entry list: forwards `files` to the TEntryList overload with a
/// default-constructed TEntryList.
/// NOTE(review): the line carrying the function name and its leading parameters is absent
/// from this listing; the body uses `files` and `procFunc`.
375template<class F>
377 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
379{
380 TEntryList noelist;
381 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
382}
383
384
/// TChain overload without an entry list: forwards to the TEntryList overload with a
/// default-constructed TEntryList.
385template<class F>
386auto TTreeProcessorMP::Process(TChain& files, F procFunc,
387 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
389{
390 TEntryList noelist;
391 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
392}
393
394
/// TTree overload without an entry list: forwards to the TEntryList overload with a
/// default-constructed TEntryList.
395template<class F>
396auto TTreeProcessorMP::Process(TTree& tree, F procFunc,
397 ULong64_t nToProcess, ULong64_t jFirst)
399{
400 TEntryList noelist;
401 return Process(tree, procFunc, noelist, nToProcess, jFirst);
402}
403
404//////////////////////////////////////////////////////////////////////////
405/// Handle message and reply to the worker
///
/// Dispatches on the code stored in msg.first:
///  - MPCode::kIdling: reply to the idle worker via ReplyToIdle.
///  - MPCode::kProcResult: if the message carries a buffer, read a T from it and append it
///    to reslist.
///  - MPCode::kProcError: read the error text from the buffer, report it, then reply to the
///    worker via ReplyToIdle.
///  - anything else: report an unknown code.
///
/// \param[in] msg code/buffer pair received from the worker
/// \param[in] s socket of the worker that sent the message
/// \param[in,out] reslist vector to which a received result is appended
///
/// NOTE(review): the embedded numbering skips a line at the end of the kProcResult branch,
/// so a statement may be missing from this listing - confirm against the original header.
406template<class T>
407void TTreeProcessorMP::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
408{
409 unsigned code = msg.first;
410 if (code == MPCode::kIdling) {
411 ReplyToIdle(s);
412 } else if(code == MPCode::kProcResult) {
413 if(msg.second != nullptr)
// NOTE(review): std::move is applied directly to a function-call result; it looks redundant
// if ReadBuffer<T> returns by value - confirm.
414 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
416 } else if(code == MPCode::kProcError) {
// The string is released with delete[] below, so ReadBuffer<const char*> presumably returns
// a heap-allocated array owned by the caller - confirm.
417 const char *str = ReadBuffer<const char*>(msg.second.get());
418 Error("TTreeProcessorMP::HandlePoolCode", "[E][C] a worker encountered an error: %s\n"
419 "Continuing execution ignoring these entries.", str);
420 ReplyToIdle(s);
421 delete [] str;
422 } else {
423 // UNKNOWN CODE
// NOTE(review): `code` is unsigned but is formatted with %d; %u would match the type.
424 Error("TTreeProcessorMP::HandlePoolCode", "[W][C] unknown code received from server. code=%d", code);
425 }
426}
427
428//////////////////////////////////////////////////////////////////////////
429/// Listen for messages sent by the workers and call the appropriate handler function.
430/// TTreeProcessorMP::HandlePoolCode is called on messages with a code < 1000 and
431/// TMPClient::HandleMPCode is called on messages with a code >= 1000.
///
/// All sockets of the monitor are activated first; the loop then runs for as long as the
/// monitor reports at least one active socket. A message with code MPCode::kRecvError is
/// reported as a lost connection and the corresponding socket is removed.
///
/// \param[in,out] reslist vector handed to HandlePoolCode, which appends received results
432template<class T>
433void TTreeProcessorMP::Collect(std::vector<T> &reslist)
434{
435 TMonitor &mon = GetMonitor();
436 mon.ActivateAll();
437 while (mon.GetActive() > 0) {
// Pick a socket with a pending event and read one message from it
438 TSocket *s = mon.Select();
439 MPCodeBufPair msg = MPRecv(s);
440 if (msg.first == MPCode::kRecvError) {
441 Error("TTreeProcessorMP::Collect", "[E][C] Lost connection to a worker");
442 Remove(s);
443 } else if (msg.first < 1000)
444 HandlePoolCode(msg, s, reslist);
445 else
446 HandleMPCode(msg, s);
447 }
448}
449
450} // ROOT namespace
451
452#endif
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition MPSendRecv.h:32
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
#define f(i)
Definition RSha256.hxx:104
unsigned long long ULong64_t
Definition RtypesCore.h:81
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:185
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:229
Merge collection of TObjects.
Definition PoolUtils.h:35
This class provides an interface to process a TTree dataset in parallel with multi-process technology...
ROOT::TypeTraits::InvokeResult_t< F, Args... > InvokeResult_t
unsigned fNProcessed
number of arguments already passed to the workers
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
TTreeProcessorMP(const TTreeProcessorMP &)=delete
void Reset()
Reset TTreeProcessorMP's state.
void FixLists(std::vector< TObject * > &lists)
Fix list of lists before merging (to avoid errors about duplicated objects)
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
ETask
A collection of the types of tasks that TTreeProcessorMP can execute.
@ kNoTask
no task is being executed
@ kProcByRange
a Process method is being executed and each worker will process a certain range of each file
@ kProcByFile
a Process method is being executed and each worker will process a different file
ETask fTaskType
the kind of task that is being executed, if any
unsigned fNToProcess
total number of arguments to pass to the workers
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
auto Process(const std::vector< std::string > &fileNames, F procFunc, TEntryList &entries, const std::string &treeName="", ULong64_t nToProcess=0, ULong64_t jFirst=0) -> InvokeResult_t< F, std::reference_wrapper< TTreeReader > >
Process a TTree dataset with a functor.
TTreeProcessorMP & operator=(const TTreeProcessorMP &)=delete
A chain is a collection of files containing TTree objects.
Definition TChain.h:33
A List of entry numbers in a TTree or TChain.
Definition TEntryList.h:26
Class that contains a list of TFileInfo's and accumulated meta data information about its entries.
Class describing a generic file including meta information.
Definition TFileInfo.h:39
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
Definition THashList.h:34
A doubly linked list.
Definition TList.h:38
Base class for multiprocess applications' clients.
Definition TMPClient.h:23
unsigned GetNWorkers() const
Definition TMPClient.h:40
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
Definition TMPClient.h:39
TMonitor & GetMonitor()
Definition TMPClient.h:36
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Templated derivation of TMPWorkerTree handling generic function tree processing.
virtual void ActivateAll()
Activate all de-activated sockets.
Definition TMonitor.cxx:268
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition TMonitor.cxx:322
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition TMonitor.cxx:438
An array of TObjects.
Definition TObjArray.h:31
Int_t GetEntries() const override
Return the number of objects in array (i.e.
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
Definition TSelector.h:31
A TTree represents a columnar dataset.
Definition TTree.h:79
const Int_t n
Definition legend1.C:16
#define F(x, y, z)
@ kProcFile
Tell a TMPWorkerTree which tree to process. The object sent is a TreeInfo.
Definition MPCode.h:38
@ kRecvError
Error while reading from the socket.
Definition MPCode.h:51
@ kIdling
We are ready for the next task.
Definition MPCode.h:35
@ kProcRange
Tell a TMPWorkerTree which tree and entries range to process. The object sent is a TreeRangeInfo.
Definition MPCode.h:39
@ kShutdownOrder
Used by the client to tell servers to shutdown.
Definition MPCode.h:49
@ kProcTree
Tell a TMPWorkerTree to process the tree that was passed to it at construction time.
Definition MPCode.h:40
@ kProcError
Tell the client there was an error while processing.
Definition MPCode.h:44
@ kProcResult
The message contains the result of the processing of a TTree.
Definition MPCode.h:42
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...