Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TMPWorkerTree.cxx
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3// Modified: G Ganis Jan 2017
4
5/*************************************************************************
6 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13#include "MPCode.h"
14#include "MPSendRecv.h"
15#include "TError.h"
16#include "TMPWorkerTree.h"
17#include "TEnv.h"
18#include <string>
19
20//////////////////////////////////////////////////////////////////////////
21///
22/// \class TMPWorkerTree
23///
24/// This class works in conjunction with TTreeProcessorMP, reacting to messages
25/// received from it as specified by the Notify and HandleInput methods.
26///
27/// \class TMPWorkerTreeFunc
28///
29/// Templated derivation of TMPWorkerTree handling tree processing with a generic function.
30///
31/// \class TMPWorkerTreeSel
32///
33/// Templated derivation of TMPWorkerTree handling tree processing with a TSelector.
34///
35//////////////////////////////////////////////////////////////////////////
36
37//////////////////////////////////////////////////////////////////////////
38/// Class constructors.
39/// Note that this does not set variables like fPid or fS (worker's socket).\n
40/// These operations are handled by the Init method, which is called after
41/// forking.\n
42/// This separation is in place because the instantiation of a worker
43/// must be done once _before_ forking, while the initialization of the
44/// members must be done _after_ forking by each of the child processes.
46 : TMPWorker(), fFileNames(), fTreeName(), fTree(nullptr), fFile(nullptr), fEntryList(nullptr), fFirstEntry(0),
47 fTreeCache(0), fTreeCacheIsLearning(kFALSE), fUseTreeCache(kTRUE), fCacheSize(-1)
48{
49 Setup();
50}
51
52TMPWorkerTree::TMPWorkerTree(const std::vector<std::string> &fileNames, TEntryList *entries,
53 const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
54 : TMPWorker(nWorkers, maxEntries), fFileNames(fileNames), fTreeName(treeName), fTree(nullptr), fFile(nullptr),
55 fEntryList(entries), fFirstEntry(firstEntry), fTreeCache(0), fTreeCacheIsLearning(kFALSE), fUseTreeCache(kTRUE),
56 fCacheSize(-1)
57{
58 Setup();
59}
60
62 ULong64_t firstEntry)
63 : TMPWorker(nWorkers, maxEntries), fTree(tree), fFile(nullptr), fEntryList(entries), fFirstEntry(firstEntry),
64 fTreeCache(0), fTreeCacheIsLearning(kFALSE), fUseTreeCache(kTRUE), fCacheSize(-1)
65{
66 Setup();
67}
68
70{
71 // Properly close the open file, if any
72 CloseFile();
73}
74
75//////////////////////////////////////////////////////////////////////////
76/// Auxiliary method for common initializations
78{
79 Int_t uc = gEnv->GetValue("MultiProc.UseTreeCache", 1);
80 if (uc != 1) fUseTreeCache = kFALSE;
81 fCacheSize = gEnv->GetValue("MultiProc.CacheSize", -1);
82}
83
84//////////////////////////////////////////////////////////////////////////
85/// Handle file closing.
86
88{
89 // Avoid destroying the cache; must be placed before deleting the trees
90 if (fFile) {
91 if (fTree) fFile->SetCacheRead(0, fTree);
92 delete fFile ;
93 fFile = 0;
94 }
95}
96
97//////////////////////////////////////////////////////////////////////////
98/// Handle file opening.
99
100TFile *TMPWorkerTree::OpenFile(const std::string& fileName)
101{
102
103 TFile *fp = TFile::Open(fileName.c_str());
104 if (fp == nullptr || fp->IsZombie()) {
105 std::stringstream ss;
106 ss << "could not open file " << fileName;
107 std::string errmsg = ss.str();
109 return nullptr;
110 }
111
112 return fp;
113}
114
115//////////////////////////////////////////////////////////////////////////
116/// Retrieve a tree from an open file.
117
119{
120 //retrieve the TTree with the specified name from file
121 //we are not the owner of the TTree object, the file is!
122 TTree *tree = nullptr;
123 if(fTreeName == "") {
124 // retrieve the first TTree
125 // (re-adapted from TEventIter.cxx)
126 if (fp->GetListOfKeys()) {
127 for(auto k : *fp->GetListOfKeys()) {
128 TKey *key = static_cast<TKey*>(k);
129 if (!strcmp(key->GetClassName(), "TTree") || !strcmp(key->GetClassName(), "TNtuple"))
130 tree = static_cast<TTree*>(fp->Get(key->GetName()));
131 }
132 }
133 } else {
134 tree = static_cast<TTree*>(fp->Get(fTreeName.c_str()));
135 }
136 if (tree == nullptr) {
137 std::stringstream ss;
138 ss << "cannot find tree with name " << fTreeName << " in file " << fp->GetName();
139 std::string errmsg = ss.str();
141 return nullptr;
142 }
143
144 return tree;
145}
146
147//////////////////////////////////////////////////////////////////////////
148/// Tree cache handling
149
151{
152 if (fUseTreeCache) {
153 TFile *curfile = tree->GetCurrentFile();
154 if (curfile) {
155 if (!fTreeCache) {
156 tree->SetCacheSize(fCacheSize);
157 fTreeCache = (TTreeCache *)curfile->GetCacheRead(tree);
158 if (fCacheSize < 0) fCacheSize = tree->GetCacheSize();
159 } else {
162 curfile->SetCacheRead(fTreeCache, tree);
163 }
164 if (fTreeCache) {
166 }
167 } else {
168 Warning("SetupTreeCache", "default tree does not have a file attached: corruption? Tree cache untouched");
169 }
170 } else {
171 // Disable the cache
172 tree->SetCacheSize(0);
173 }
174}
175
176//////////////////////////////////////////////////////////////////////////
177/// Init overload defining the maximum number of entries
178
180{
181
182 TMPWorker::Init(fd, workerN);
184}
185
186//////////////////////////////////////////////////////////////////////////
187/// Max entries evaluation
188
190{
191 // E.g.: when dividing 10 entries between 3 workers, the first
192 // two will process 10/3 == 3 entries, the last one will process
193 // 10 - 2*(10/3) == 4 entries.
194 if(GetNWorker() < fNWorkers-1)
195 return maxEntries/fNWorkers;
196 else
197 return maxEntries - (fNWorkers-1)*(maxEntries/fNWorkers);
198}
199
200//////////////////////////////////////////////////////////////////////////
201/// Generic input handling
202
204{
205 UInt_t code = msg.first;
206
207 if (code == MPCode::kProcRange
208 || code == MPCode::kProcFile
209 || code == MPCode::kProcTree) {
210 //execute fProcFunc on a file or a range of entries in a file
211 Process(code, msg);
212 } else if (code == MPCode::kSendResult) {
213 //send back result
214 SendResult();
215 } else {
216 //unknown code received
217 std::string reply = "S" + std::to_string(GetNWorker());
218 reply += ": unknown code received: " + std::to_string(code);
219 MPSend(GetSocket(), MPCode::kError, reply.c_str());
220 }
221}
222
223
224
225//////////////////////////////////////////////////////////////////////////
226/// Selector processing SendResult and Process overload
227
229{
230 //send back result
233}
234
235/// Selector specialization
237{
238 //evaluate the index of the file to process in fFileNames
239 //(we actually don't need the parameter if code == kProcTree)
240
241 Long64_t start = 0;
242 Long64_t finish = 0;
243 TEntryList *enl = 0;
244 std::string errmsg;
245 if (LoadTree(code, msg, start, finish, &enl, errmsg) != 0) {
246 SendError(errmsg);
247 return;
248 }
249
250 if (fCallBegin) {
251 fSelector.SlaveBegin(nullptr);
252 fCallBegin = false;
253 }
254
257 for (Long64_t entry = start; entry < finish; ++entry) {
258 Long64_t e = (enl) ? enl->GetEntry(entry) : entry;
260 }
261
262 // update the number of processed entries
263 fProcessedEntries += finish - start;
264
266
267 return;
268}
269
270/// Load the required tree and evaluate the processing range
271
273 std::string &errmsg)
274{
275 // evaluate the index of the file to process in fFileNames
276 //(we actually don't need the parameter if code == kProcTree)
277
278 start = 0;
279 finish = 0;
280 errmsg = "";
281
282 UInt_t fileN = 0;
283 UInt_t nProcessed = 0;
284 Bool_t setupcache = true;
285
286 std::string mgroot = "[S" + std::to_string(GetNWorker()) + "]: ";
287
288 TTree *tree = 0;
289 if (code == MPCode::kProcTree) {
290
291 mgroot += "MPCode::kProcTree: ";
292
293 // The tree must be defined at this level
294 if(fTree == nullptr) {
295 errmsg = mgroot + std::string("tree undefined!");
296 return -1;
297 }
298
299 //retrieve the total number of entries ranges processed so far by TPool
300 nProcessed = ReadBuffer<UInt_t>(msg.second.get());
301
302 //create entries range
303 //example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
304 //and this worker must take the rangeN-th range
305 Long64_t nEntries = fTree->GetEntries();
306 UInt_t nBunch = nEntries / fNWorkers;
307 UInt_t rangeN = nProcessed % fNWorkers;
308 start = rangeN * nBunch;
309 if (rangeN < (fNWorkers - 1)) {
310 finish = (rangeN+1)*nBunch;
311 } else {
312 finish = nEntries;
313 }
314
315 //process tree
316 tree = fTree;
317 CloseFile(); // May not be needed
318 if (fTree->GetCurrentFile()) {
319 // We need to reopen the file locally (TODO: to understand and fix this)
321 if (!(tree = (TTree *) fFile->Get(fTree->GetName()))) {
322 errmsg = mgroot + std::string("unable to retrieve tree from open file ") +
323 std::string(fTree->GetCurrentFile()->GetName());
324 delete fFile;
325 return -1;
326 }
327 fTree = tree;
328 } else {
329 //errors are handled inside OpenFile
330 errmsg = mgroot + std::string("unable to open file ") + std::string(fTree->GetCurrentFile()->GetName());
331 if (fFile && fFile->IsZombie()) delete fFile;
332 return -1;
333 }
334 }
335
336 } else {
337
338 if (code == MPCode::kProcRange) {
339 mgroot += "MPCode::kProcRange: ";
340 //retrieve the total number of entries ranges processed so far by TPool
341 nProcessed = ReadBuffer<UInt_t>(msg.second.get());
342 //evaluate the file and the entries range to process
343 fileN = nProcessed / fNWorkers;
344 } else if (code == MPCode::kProcFile) {
345 mgroot += "MPCode::kProcFile: ";
346 //evaluate the file and the entries range to process
347 fileN = ReadBuffer<UInt_t>(msg.second.get());
348 } else {
349 errmsg += "MPCode undefined!";
350 return -1;
351 }
352
353 // Open the file if required
354 if (fFile && strcmp(fFileNames[fileN].c_str(), fFile->GetName())) CloseFile();
355 if (!fFile) {
356 fFile = OpenFile(fFileNames[fileN]);
357 if (fFile == nullptr) {
358 // errors are handled inside OpenFile
359 errmsg = mgroot + std::string("unable to open file ") + fFileNames[fileN];
360 return -1;
361 }
362 }
363
364 //retrieve the TTree with the specified name from file
365 //we are not the owner of the TTree object, the file is!
367 if (tree == nullptr) {
368 //errors are handled inside RetrieveTree
369 errmsg = mgroot + std::string("unable to retrieve tree from open file ") + fFileNames[fileN];
370 return -1;
371 }
372
373 // Prepare to setup the cache, if required
374 setupcache = (tree != fTree) ? true : false;
375
376 // Store as reference
377 fTree = tree;
378
379 //create entries range
380 if (code == MPCode::kProcRange) {
381 //example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
382 //and this worker must take the rangeN-th range
383 Long64_t nEntries = tree->GetEntries();
384 UInt_t nBunch = nEntries / fNWorkers;
385 if(nEntries % fNWorkers) nBunch++;
386 UInt_t rangeN = nProcessed % fNWorkers;
387 start = rangeN * nBunch;
388 if(rangeN < (fNWorkers-1))
389 finish = (rangeN+1)*nBunch;
390 else
391 finish = nEntries;
392 } else {
393 start = 0;
394 finish = tree->GetEntries();
395 }
396 }
397
398 // Setup the cache, if required
399 if (setupcache) SetupTreeCache(fTree);
400
401 // Get the entrylist, if required
402 if (fEntryList && enl) {
403 if ((*enl = fEntryList->GetEntryList(fTree->GetName(), TUrl(fFile->GetName()).GetFile()))) {
404 // create entries range
405 if (code == MPCode::kProcRange) {
406 // example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
407 // and this worker must take the rangeN-th range
408 ULong64_t nEntries = (*enl)->GetN();
409 UInt_t nBunch = nEntries / fNWorkers;
410 if (nEntries % fNWorkers) nBunch++;
411 UInt_t rangeN = nProcessed % fNWorkers;
412 start = rangeN * nBunch;
413 if (rangeN < (fNWorkers - 1))
414 finish = (rangeN + 1) * nBunch;
415 else
416 finish = nEntries;
417 } else {
418 start = 0;
419 finish = (*enl)->GetN();
420 }
421 } else {
422 Warning("LoadTree", "failed to get entry list for: %s %s", fTree->GetName(), TUrl(fFile->GetName()).GetFile());
423 }
424 }
425
426 //check if we are going to reach the max of entries
427 //change finish accordingly
428 if (fMaxNEntries)
429 if (fProcessedEntries + finish - start > fMaxNEntries)
430 finish = start + fMaxNEntries - fProcessedEntries;
431
432 if (gDebug > 0 && fFile)
433 Info("LoadTree", "%s %d %d file: %s %lld %lld", mgroot.c_str(), nProcessed, fileN, fFile->GetName(), start,
434 finish);
435
436 return 0;
437}
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition MPSendRecv.h:32
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
#define e(i)
Definition RSha256.hxx:103
const Bool_t kFALSE
Definition RtypesCore.h:92
long long Long64_t
Definition RtypesCore.h:73
unsigned long long ULong64_t
Definition RtypesCore.h:74
const Bool_t kTRUE
Definition RtypesCore.h:91
R__EXTERN TEnv * gEnv
Definition TEnv.h:171
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
Definition TError.cxx:220
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:231
Int_t gDebug
Definition TROOT.cxx:590
TList * GetListOfKeys() const override
TObject * Get(const char *namecycle) override
Return pointer to object identified by namecycle.
A List of entry numbers in a TTree or TChain.
Definition TEntryList.h:26
virtual TEntryList * GetEntryList(const char *treename, const char *filename, Option_t *opt="")
Return the entry list, corresponding to treename and filename By default, the filename is first tried...
virtual Long64_t GetEntry(Int_t index)
Return the number of the entry #index of this TEntryList in the TTree or TChain See also Next().
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition TEnv.cxx:491
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
Definition TFile.h:54
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:3997
virtual void SetCacheRead(TFileCacheRead *cache, TObject *tree=0, ECacheAction action=kDisconnect)
Set a pointer to the read cache.
Definition TFile.cxx:2281
TFileCacheRead * GetCacheRead(const TObject *tree=nullptr) const
Return a pointer to the current read cache.
Definition TFile.cxx:1182
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
virtual const char * GetClassName() const
Definition TKey.h:76
void SendResult()
Selector processing SendResult and Process overload.
TSelector & fSelector
pointer to the selector to be used to process the tree. It is null if we are not using a TSelector.
void Process(UInt_t code, MPCodeBufPair &msg)
Selector specialization.
Int_t LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl, std::string &errmsg)
Load the required tree and evaluate the processing range.
TTree * fTree
pointer to the tree to be processed. It is only used if the tree is directly passed to TProcessExecut...
TFile * fFile
last open file
void Setup()
Auxiliary method for common initializations.
ULong64_t EvalMaxEntries(ULong64_t maxEntries)
Max entries evaluation.
TTree * RetrieveTree(TFile *fp)
Retrieve a tree from an open file.
TEntryList * fEntryList
entrylist
TMPWorkerTree()
Class constructors.
Bool_t fUseTreeCache
Control usage of the tree cache.
std::vector< std::string > fFileNames
the files to be processed by all workers
Bool_t fTreeCacheIsLearning
Whether cache is in learning phase.
TFile * OpenFile(const std::string &fileName)
Handle file opening.
virtual void SendResult()
void Init(int fd, UInt_t workerN)
Init overload defining the maximum number of entries.
virtual void Process(UInt_t, MPCodeBufPair &)
virtual ~TMPWorkerTree()
Long64_t fCacheSize
Cache size.
std::string fTreeName
the name of the tree to be processed
void CloseFile()
Handle file closing.
void SetupTreeCache(TTree *tree)
Tree cache handling.
TTreeCache * fTreeCache
instance of the tree cache for the tree
void HandleInput(MPCodeBufPair &msg)
Execute instructions received from a MP client.
This class works in conjunction with TMPClient, reacting to messages received from it as specified by ...
Definition TMPWorker.h:26
void SendError(const std::string &errmsg, unsigned int code=MPCode::kError)
Error sender.
unsigned GetNWorker() const
Definition TMPWorker.h:42
ULong64_t fMaxNEntries
the maximum number of entries to be processed by this worker
Definition TMPWorker.h:47
ULong64_t fProcessedEntries
the number of entries processed by this worker so far
Definition TMPWorker.h:48
TSocket * GetSocket()
Definition TMPWorker.h:40
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
Definition TMPWorker.cxx:52
unsigned fNWorkers
the number of workers spawned
Definition TMPWorker.h:46
virtual const char * GetName() const
Returns name of object.
Definition TNamed.h:47
R__ALWAYS_INLINE Bool_t IsZombie() const
Definition TObject.h:149
virtual void Init(TTree *)
Definition TSelector.h:53
virtual Bool_t Process(Long64_t)
virtual void SlaveBegin(TTree *)
Definition TSelector.h:55
virtual TList * GetOutputList() const
Definition TSelector.h:69
virtual Bool_t Notify()
This method must be overridden to handle object notification.
Definition TSelector.h:56
virtual void SlaveTerminate()
Definition TSelector.h:70
A cache to speed-up the reading of ROOT datasets.
Definition TTreeCache.h:32
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
virtual Bool_t IsLearning() const
Definition TTreeCache.h:152
virtual void ResetCache()
This will simply clear the cache.
A TTree represents a columnar dataset.
Definition TTree.h:79
TFile * GetCurrentFile() const
Return pointer to the current file.
Definition TTree.cxx:5459
virtual Long64_t GetEntries() const
Definition TTree.h:460
This class represents a WWW compatible URL.
Definition TUrl.h:33
const char * GetFile() const
Definition TUrl.h:69
@ kSendResult
Ask for a kFuncResult/kProcResult.
Definition MPCode.h:36
@ kProcFile
Tell a TMPWorkerTree which tree to process. The object sent is a TreeInfo.
Definition MPCode.h:38
@ kIdling
We are ready for the next task.
Definition MPCode.h:35
@ kError
Error message.
Definition MPCode.h:47
@ kProcRange
Tell a TMPWorkerTree which tree and entries range to process. The object sent is a TreeRangeInfo.
Definition MPCode.h:39
@ kProcTree
Tell a TMPWorkerTree to process the tree that was passed to it at construction time.
Definition MPCode.h:40
@ kProcError
Tell the client there was an error while processing.
Definition MPCode.h:44
@ kProcResult
The message contains the result of the processing of a TTree.
Definition MPCode.h:42
Definition tree.py:1