Logo ROOT   6.18/05
Reference Guide
TMPWorkerTree.cxx
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3// Modified: G Ganis Jan 2017
4
5/*************************************************************************
6 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13#include "MPCode.h"
14#include "MPSendRecv.h"
15#include "TError.h"
16#include "TMPWorkerTree.h"
17#include "TSystem.h"
18#include "TEnv.h"
19#include <string>
20
21//////////////////////////////////////////////////////////////////////////
22///
23/// \class TMPWorkerTree
24///
25/// This class works in conjuction with TTreeProcessorMP, reacting to messages
26/// received from it as specified by the Notify and HandleInput methods.
27///
28/// \class TMPWorkerTreeFunc
29///
30/// Templated derivation of TMPWorkerTree handlign generic function tree processing.
31///
32/// \class TMPWorkerTreeSel
33///
34/// Templated derivation of TMPWorkerTree handlign selector tree processing.
35///
36//////////////////////////////////////////////////////////////////////////
37
38//////////////////////////////////////////////////////////////////////////
39/// Class constructors.
40/// Note that this does not set variables like fPid or fS (worker's socket).\n
41/// These operations are handled by the Init method, which is called after
42/// forking.\n
43/// This separation is in place because the instantiation of a worker
44/// must be done once _before_ forking, while the initialization of the
45/// members must be done _after_ forking by each of the children processes.
47 : TMPWorker(), fFileNames(), fTreeName(), fTree(nullptr), fFile(nullptr), fEntryList(nullptr), fFirstEntry(0),
48 fTreeCache(0), fTreeCacheIsLearning(kFALSE), fUseTreeCache(kTRUE), fCacheSize(-1)
49{
50 Setup();
51}
52
53TMPWorkerTree::TMPWorkerTree(const std::vector<std::string> &fileNames, TEntryList *entries,
54 const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
55 : TMPWorker(nWorkers, maxEntries), fFileNames(fileNames), fTreeName(treeName), fTree(nullptr), fFile(nullptr),
56 fEntryList(entries), fFirstEntry(firstEntry), fTreeCache(0), fTreeCacheIsLearning(kFALSE), fUseTreeCache(kTRUE),
57 fCacheSize(-1)
58{
59 Setup();
60}
61
63 ULong64_t firstEntry)
64 : TMPWorker(nWorkers, maxEntries), fTree(tree), fFile(nullptr), fEntryList(entries), fFirstEntry(firstEntry),
65 fTreeCache(0), fTreeCacheIsLearning(kFALSE), fUseTreeCache(kTRUE), fCacheSize(-1)
66{
67 Setup();
68}
69
71{
72 // Properly close the open file, if any
73 CloseFile();
74}
75
76//////////////////////////////////////////////////////////////////////////
77/// Auxilliary method for common initializations
79{
80 Int_t uc = gEnv->GetValue("MultiProc.UseTreeCache", 0);
81 if (uc != 1) fUseTreeCache = kFALSE;
82 fCacheSize = gEnv->GetValue("MultiProc.CacheSize", -1);
83}
84
85//////////////////////////////////////////////////////////////////////////
86/// Handle file closing.
87
89{
90 // Avoid destroying the cache; must be placed before deleting the trees
91 if (fFile) {
92 if (fTree) fFile->SetCacheRead(0, fTree);
93 delete fFile ;
94 fFile = 0;
95 }
96}
97
98//////////////////////////////////////////////////////////////////////////
99/// Handle file opening.
100
101TFile *TMPWorkerTree::OpenFile(const std::string& fileName)
102{
103
104 TFile *fp = TFile::Open(fileName.c_str());
105 if (fp == nullptr || fp->IsZombie()) {
106 std::stringstream ss;
107 ss << "could not open file " << fileName;
108 std::string errmsg = ss.str();
110 return nullptr;
111 }
112
113 return fp;
114}
115
116//////////////////////////////////////////////////////////////////////////
117/// Retrieve a tree from an open file.
118
120{
121 //retrieve the TTree with the specified name from file
122 //we are not the owner of the TTree object, the file is!
123 TTree *tree = nullptr;
124 if(fTreeName == "") {
125 // retrieve the first TTree
126 // (re-adapted from TEventIter.cxx)
127 if (fp->GetListOfKeys()) {
128 for(auto k : *fp->GetListOfKeys()) {
129 TKey *key = static_cast<TKey*>(k);
130 if (!strcmp(key->GetClassName(), "TTree") || !strcmp(key->GetClassName(), "TNtuple"))
131 tree = static_cast<TTree*>(fp->Get(key->GetName()));
132 }
133 }
134 } else {
135 tree = static_cast<TTree*>(fp->Get(fTreeName.c_str()));
136 }
137 if (tree == nullptr) {
138 std::stringstream ss;
139 ss << "cannot find tree with name " << fTreeName << " in file " << fp->GetName();
140 std::string errmsg = ss.str();
142 return nullptr;
143 }
144
145 return tree;
146}
147
148//////////////////////////////////////////////////////////////////////////
149/// Tree cache handling
150
152{
153 if (fUseTreeCache) {
154 TFile *curfile = tree->GetCurrentFile();
155 if (curfile) {
156 if (!fTreeCache) {
157 tree->SetCacheSize(fCacheSize);
158 fTreeCache = (TTreeCache *)curfile->GetCacheRead(tree);
159 if (fCacheSize < 0) fCacheSize = tree->GetCacheSize();
160 } else {
162 curfile->SetCacheRead(fTreeCache, tree);
164 }
165 if (fTreeCache) {
168 Info("SetupTreeCache","the tree cache is in learning phase");
169 }
170 } else {
171 Warning("SetupTreeCache", "default tree does not have a file attached: corruption? Tree cache untouched");
172 }
173 } else {
174 // Disable the cache
175 tree->SetCacheSize(0);
176 }
177}
178
179//////////////////////////////////////////////////////////////////////////
180/// Init overload definign max entries
181
183{
184
185 TMPWorker::Init(fd, workerN);
187}
188
189//////////////////////////////////////////////////////////////////////////
190/// Max entries evaluation
191
193{
194 // E.g.: when dividing 10 entries between 3 workers, the first
195 // two will process 10/3 == 3 entries, the last one will process
196 // 10 - 2*(10/3) == 4 entries.
197 if(GetNWorker() < fNWorkers-1)
198 return maxEntries/fNWorkers;
199 else
200 return maxEntries - (fNWorkers-1)*(maxEntries/fNWorkers);
201}
202
203//////////////////////////////////////////////////////////////////////////
204/// Generic input handling
205
207{
208 UInt_t code = msg.first;
209
210 if (code == MPCode::kProcRange
211 || code == MPCode::kProcFile
212 || code == MPCode::kProcTree) {
213 //execute fProcFunc on a file or a range of entries in a file
214 Process(code, msg);
215 } else if (code == MPCode::kSendResult) {
216 //send back result
217 SendResult();
218 } else {
219 //unknown code received
220 std::string reply = "S" + std::to_string(GetNWorker());
221 reply += ": unknown code received: " + std::to_string(code);
222 MPSend(GetSocket(), MPCode::kError, reply.c_str());
223 }
224}
225
226
227
228//////////////////////////////////////////////////////////////////////////
229/// Selector processing SendResult and Process overload
230
232{
233 //send back result
236}
237
238/// Selector specialization
240{
241 //evaluate the index of the file to process in fFileNames
242 //(we actually don't need the parameter if code == kProcTree)
243
244 Long64_t start = 0;
245 Long64_t finish = 0;
246 TEntryList *enl = 0;
247 std::string errmsg;
248 if (LoadTree(code, msg, start, finish, &enl, errmsg) != 0) {
249 SendError(errmsg);
250 return;
251 }
252
253 if (fCallBegin) {
254 fSelector.SlaveBegin(nullptr);
255 fCallBegin = false;
256 }
257
260 for (Long64_t entry = start; entry < finish; ++entry) {
261 Long64_t e = (enl) ? enl->GetEntry(entry) : entry;
263 }
264
265 // update the number of processed entries
266 fProcessedEntries += finish - start;
267
269
270 return;
271}
272
273/// Load the requierd tree and evaluate the processing range
274
276 std::string &errmsg)
277{
278 // evaluate the index of the file to process in fFileNames
279 //(we actually don't need the parameter if code == kProcTree)
280
281 start = 0;
282 finish = 0;
283 errmsg = "";
284
285 UInt_t fileN = 0;
286 UInt_t nProcessed = 0;
287 Bool_t setupcache = true;
288
289 std::string mgroot = "[S" + std::to_string(GetNWorker()) + "]: ";
290
291 TTree *tree = 0;
292 if (code == MPCode::kProcTree) {
293
294 mgroot += "MPCode::kProcTree: ";
295
296 // The tree must be defined at this level
297 if(fTree == nullptr) {
298 errmsg = mgroot + std::string("tree undefined!");
299 return -1;
300 }
301
302 //retrieve the total number of entries ranges processed so far by TPool
303 nProcessed = ReadBuffer<UInt_t>(msg.second.get());
304
305 //create entries range
306 //example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
307 //and this worker must take the rangeN-th range
308 Long64_t nEntries = fTree->GetEntries();
309 UInt_t nBunch = nEntries / fNWorkers;
310 UInt_t rangeN = nProcessed % fNWorkers;
311 start = rangeN * nBunch;
312 if (rangeN < (fNWorkers - 1)) {
313 finish = (rangeN+1)*nBunch;
314 } else {
315 finish = nEntries;
316 }
317
318 //process tree
319 tree = fTree;
320 CloseFile(); // May not be needed
321 if (fTree->GetCurrentFile()) {
322 // We need to reopen the file locally (TODO: to understand and fix this)
324 if (!(tree = (TTree *) fFile->Get(fTree->GetName()))) {
325 errmsg = mgroot + std::string("unable to retrieve tree from open file ") +
326 std::string(fTree->GetCurrentFile()->GetName());
327 delete fFile;
328 return -1;
329 }
330 fTree = tree;
331 } else {
332 //errors are handled inside OpenFile
333 errmsg = mgroot + std::string("unable to open file ") + std::string(fTree->GetCurrentFile()->GetName());
334 if (fFile && fFile->IsZombie()) delete fFile;
335 return -1;
336 }
337 }
338
339 } else {
340
341 if (code == MPCode::kProcRange) {
342 mgroot += "MPCode::kProcRange: ";
343 //retrieve the total number of entries ranges processed so far by TPool
344 nProcessed = ReadBuffer<UInt_t>(msg.second.get());
345 //evaluate the file and the entries range to process
346 fileN = nProcessed / fNWorkers;
347 } else if (code == MPCode::kProcFile) {
348 mgroot += "MPCode::kProcFile: ";
349 //evaluate the file and the entries range to process
350 fileN = ReadBuffer<UInt_t>(msg.second.get());
351 } else {
352 errmsg += "MPCode undefined!";
353 return -1;
354 }
355
356 // Open the file if required
357 if (fFile && strcmp(fFileNames[fileN].c_str(), fFile->GetName())) CloseFile();
358 if (!fFile) {
359 fFile = OpenFile(fFileNames[fileN]);
360 if (fFile == nullptr) {
361 // errors are handled inside OpenFile
362 errmsg = mgroot + std::string("unable to open file ") + fFileNames[fileN];
363 return -1;
364 }
365 }
366
367 //retrieve the TTree with the specified name from file
368 //we are not the owner of the TTree object, the file is!
370 if (tree == nullptr) {
371 //errors are handled inside RetrieveTree
372 errmsg = mgroot + std::string("unable to retrieve tree from open file ") + fFileNames[fileN];
373 return -1;
374 }
375
376 // Prepare to setup the cache, if required
377 setupcache = (tree != fTree) ? true : false;
378
379 // Store as reference
380 fTree = tree;
381
382 //create entries range
383 if (code == MPCode::kProcRange) {
384 //example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
385 //and this worker must take the rangeN-th range
386 Long64_t nEntries = tree->GetEntries();
387 UInt_t nBunch = nEntries / fNWorkers;
388 if(nEntries % fNWorkers) nBunch++;
389 UInt_t rangeN = nProcessed % fNWorkers;
390 start = rangeN * nBunch;
391 if(rangeN < (fNWorkers-1))
392 finish = (rangeN+1)*nBunch;
393 else
394 finish = nEntries;
395 } else {
396 start = 0;
397 finish = tree->GetEntries();
398 }
399 }
400
401 // Setup the cache, if required
402 if (setupcache) SetupTreeCache(fTree);
403
404 // Get the entrylist, if required
405 if (fEntryList && enl) {
406 if ((*enl = fEntryList->GetEntryList(fTree->GetName(), TUrl(fFile->GetName()).GetFile()))) {
407 // create entries range
408 if (code == MPCode::kProcRange) {
409 // example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
410 // and this worker must take the rangeN-th range
411 ULong64_t nEntries = (*enl)->GetN();
412 UInt_t nBunch = nEntries / fNWorkers;
413 if (nEntries % fNWorkers) nBunch++;
414 UInt_t rangeN = nProcessed % fNWorkers;
415 start = rangeN * nBunch;
416 if (rangeN < (fNWorkers - 1))
417 finish = (rangeN + 1) * nBunch;
418 else
419 finish = nEntries;
420 } else {
421 start = 0;
422 finish = (*enl)->GetN();
423 }
424 } else {
425 Warning("LoadTree", "failed to get entry list for: %s %s", fTree->GetName(), TUrl(fFile->GetName()).GetFile());
426 }
427 }
428
429 //check if we are going to reach the max of entries
430 //change finish accordingly
431 if (fMaxNEntries)
432 if (fProcessedEntries + finish - start > fMaxNEntries)
433 finish = start + fMaxNEntries - fProcessedEntries;
434
435 if (gDebug > 0 && fFile)
436 Info("LoadTree", "%s %d %d file: %s %lld %lld", mgroot.c_str(), nProcessed, fileN, fFile->GetName(), start,
437 finish);
438
439 return 0;
440}
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
#define e(i)
Definition: RSha256.hxx:103
int Int_t
Definition: RtypesCore.h:41
unsigned int UInt_t
Definition: RtypesCore.h:42
const Bool_t kFALSE
Definition: RtypesCore.h:88
bool Bool_t
Definition: RtypesCore.h:59
long long Long64_t
Definition: RtypesCore.h:69
unsigned long long ULong64_t
Definition: RtypesCore.h:70
const Bool_t kTRUE
Definition: RtypesCore.h:87
R__EXTERN Int_t gDebug
Definition: Rtypes.h:91
R__EXTERN TEnv * gEnv
Definition: TEnv.h:171
void Info(const char *location, const char *msgfmt,...)
void Warning(const char *location, const char *msgfmt,...)
virtual TObject * Get(const char *namecycle)
Return pointer to object identified by namecycle.
virtual TList * GetListOfKeys() const
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:26
virtual TEntryList * GetEntryList(const char *treename, const char *filename, Option_t *opt="")
Return the entry list, corresponding to treename and filename By default, the filename is first tried...
Definition: TEntryList.cxx:785
virtual Long64_t GetEntry(Int_t index)
Return the number of the entry #index of this TEntryList in the TTree or TChain See also Next().
Definition: TEntryList.cxx:655
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition: TEnv.cxx:491
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
Definition: TFile.h:48
TFileCacheRead * GetCacheRead(const TObject *tree=0) const
Return a pointer to the current read cache.
Definition: TFile.cxx:1217
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseGeneralPurpose, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3980
virtual void SetCacheRead(TFileCacheRead *cache, TObject *tree=0, ECacheAction action=kDisconnect)
Set a pointer to the read cache.
Definition: TFile.cxx:2266
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition: TKey.h:24
virtual const char * GetClassName() const
Definition: TKey.h:71
void SendResult()
Selector processing SendResult and Process overload.
TSelector & fSelector
pointer to the selector to be used to process the tree. It is null if we are not using a TSelector.
void Process(UInt_t code, MPCodeBufPair &msg)
Selector specialization.
Int_t LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl, std::string &errmsg)
Load the requierd tree and evaluate the processing range.
TTree * fTree
pointer to the tree to be processed. It is only used if the tree is directly passed to TProcessExecut...
Definition: TMPWorkerTree.h:64
TFile * fFile
last open file
Definition: TMPWorkerTree.h:65
void Setup()
Auxilliary method for common initializations.
ULong64_t EvalMaxEntries(ULong64_t maxEntries)
Max entries evaluation.
TTree * RetrieveTree(TFile *fp)
Retrieve a tree from an open file.
TEntryList * fEntryList
entrylist
Definition: TMPWorkerTree.h:66
TMPWorkerTree()
Class constructors.
Bool_t fUseTreeCache
Control usage of the tree cache.
Definition: TMPWorkerTree.h:74
std::vector< std::string > fFileNames
the files to be processed by all workers
Definition: TMPWorkerTree.h:62
Bool_t fTreeCacheIsLearning
Whether cache is in learning phase.
Definition: TMPWorkerTree.h:73
TFile * OpenFile(const std::string &fileName)
Handle file opening.
virtual void SendResult()
Definition: TMPWorkerTree.h:58
void Init(int fd, UInt_t workerN)
Init overload definign max entries.
virtual void Process(UInt_t, MPCodeBufPair &)
Definition: TMPWorkerTree.h:56
virtual ~TMPWorkerTree()
Long64_t fCacheSize
Cache size.
Definition: TMPWorkerTree.h:75
std::string fTreeName
the name of the tree to be processed
Definition: TMPWorkerTree.h:63
void CloseFile()
Handle file closing.
void SetupTreeCache(TTree *tree)
Tree cache handling.
TTreeCache * fTreeCache
instance of the tree cache for the tree
Definition: TMPWorkerTree.h:72
void HandleInput(MPCodeBufPair &msg)
Execute instructions received from a MP client.
This class works in conjuction with TMPClient, reacting to messages received from it as specified by ...
Definition: TMPWorker.h:26
void SendError(const std::string &errmsg, unsigned int code=MPCode::kError)
Error sender.
Definition: TMPWorker.cxx:115
unsigned GetNWorker() const
Definition: TMPWorker.h:42
ULong64_t fMaxNEntries
the maximum number of entries to be processed by this worker
Definition: TMPWorker.h:47
ULong64_t fProcessedEntries
the number of entries processed by this worker so far
Definition: TMPWorker.h:48
TSocket * GetSocket()
Definition: TMPWorker.h:40
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
Definition: TMPWorker.cxx:55
unsigned fNWorkers
the number of workers spawned
Definition: TMPWorker.h:46
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
R__ALWAYS_INLINE Bool_t IsZombie() const
Definition: TObject.h:134
virtual void Init(TTree *)
Definition: TSelector.h:55
virtual Bool_t Process(Long64_t)
Definition: TSelector.cxx:330
virtual void SlaveBegin(TTree *)
Definition: TSelector.h:57
virtual TList * GetOutputList() const
Definition: TSelector.h:71
virtual Bool_t Notify()
This method must be overridden to handle object notification.
Definition: TSelector.h:58
virtual void SlaveTerminate()
Definition: TSelector.h:72
A cache to speed-up the reading of ROOT datasets.
Definition: TTreeCache.h:35
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
virtual Bool_t IsLearning() const
Definition: TTreeCache.h:155
virtual void ResetCache()
This will simply clear the cache.
A TTree represents a columnar dataset.
Definition: TTree.h:71
TFile * GetCurrentFile() const
Return pointer to the current file.
Definition: TTree.cxx:5263
virtual Long64_t GetEntries() const
Definition: TTree.h:402
This class represents a WWW compatible URL.
Definition: TUrl.h:35
const char * GetFile() const
Definition: TUrl.h:72
@ kSendResult
Ask for a kFuncResult/kProcResult.
Definition: MPCode.h:36
@ kProcFile
Tell a TMPWorkerTree which tree to process. The object sent is a TreeInfo.
Definition: MPCode.h:38
@ kIdling
We are ready for the next task.
Definition: MPCode.h:35
@ kError
Error message.
Definition: MPCode.h:47
@ kProcRange
Tell a TMPWorkerTree which tree and entries range to process. The object sent is a TreeRangeInfo.
Definition: MPCode.h:39
@ kProcTree
Tell a TMPWorkerTree to process the tree that was passed to it at construction time.
Definition: MPCode.h:40
@ kProcError
Tell the client there was an error while processing.
Definition: MPCode.h:44
@ kProcResult
The message contains the result of the processing of a TTree.
Definition: MPCode.h:42
Definition: tree.py:1