Logo ROOT   6.14/05
Reference Guide
TMPWorkerTree.cxx
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 // Modified: G Ganis Jan 2017
4 
5 /*************************************************************************
6  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7  * All rights reserved. *
8  * *
9  * For the licensing terms see $ROOTSYS/LICENSE. *
10  * For the list of contributors see $ROOTSYS/README/CREDITS. *
11  *************************************************************************/
12 
13 #include "MPCode.h"
14 #include "MPSendRecv.h"
15 #include "TError.h"
16 #include "TMPWorkerTree.h"
17 #include "TSystem.h"
18 #include "TEnv.h"
19 #include <string>
20 
21 //////////////////////////////////////////////////////////////////////////
22 ///
23 /// \class TMPWorkerTree
24 ///
25 /// This class works in conjunction with TTreeProcessorMP, reacting to messages
26 /// received from it as specified by the Notify and HandleInput methods.
27 ///
28 /// \class TMPWorkerTreeFunc
29 ///
30 /// Templated derivation of TMPWorkerTree handlign generic function tree processing.
31 ///
32 /// \class TMPWorkerTreeSel
33 ///
34 /// Templated derivation of TMPWorkerTree handlign selector tree processing.
35 ///
36 //////////////////////////////////////////////////////////////////////////
37 
38 //////////////////////////////////////////////////////////////////////////
39 /// Class constructors.
40 /// Note that this does not set variables like fPid or fS (worker's socket).\n
41 /// These operations are handled by the Init method, which is called after
42 /// forking.\n
43 /// This separation is in place because the instantiation of a worker
44 /// must be done once _before_ forking, while the initialization of the
45 /// members must be done _after_ forking by each of the children processes.
47  : TMPWorker(), fFileNames(), fTreeName(), fTree(nullptr), fFile(nullptr), fEntryList(nullptr), fFirstEntry(0),
48  fTreeCache(0), fTreeCacheIsLearning(kFALSE), fUseTreeCache(kTRUE), fCacheSize(-1)
49 {
50  Setup();
51 }
52 
53 TMPWorkerTree::TMPWorkerTree(const std::vector<std::string> &fileNames, TEntryList *entries,
54  const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
55  : TMPWorker(nWorkers, maxEntries), fFileNames(fileNames), fTreeName(treeName), fTree(nullptr), fFile(nullptr),
57  fCacheSize(-1)
58 {
59  Setup();
60 }
61 
63  ULong64_t firstEntry)
64  : TMPWorker(nWorkers, maxEntries), fTree(tree), fFile(nullptr), fEntryList(entries), fFirstEntry(firstEntry),
66 {
67  Setup();
68 }
69 
71 {
72  // Properly close the open file, if any
73  CloseFile();
74 }
75 
76 //////////////////////////////////////////////////////////////////////////
77 /// Auxiliary method for common initializations
79 {
80  Int_t uc = gEnv->GetValue("MultiProc.UseTreeCache", 0);
81  if (uc != 1) fUseTreeCache = kFALSE;
82  fCacheSize = gEnv->GetValue("MultiProc.CacheSize", -1);
83 }
84 
85 //////////////////////////////////////////////////////////////////////////
86 /// Handle file closing.
87 
89 {
90  // Avoid destroying the cache; must be placed before deleting the trees
91  if (fFile) {
92  if (fTree) fFile->SetCacheRead(0, fTree);
93  delete fFile ;
94  fFile = 0;
95  }
96 }
97 
98 //////////////////////////////////////////////////////////////////////////
99 /// Handle file opening.
100 
101 TFile *TMPWorkerTree::OpenFile(const std::string& fileName)
102 {
103 
104  TFile *fp = TFile::Open(fileName.c_str());
105  if (fp == nullptr || fp->IsZombie()) {
106  std::stringstream ss;
107  ss << "could not open file " << fileName;
108  std::string errmsg = ss.str();
109  SendError(errmsg, MPCode::kProcError);
110  return nullptr;
111  }
112 
113  return fp;
114 }
115 
116 //////////////////////////////////////////////////////////////////////////
117 /// Retrieve a tree from an open file.
118 
120 {
121  //retrieve the TTree with the specified name from file
122  //we are not the owner of the TTree object, the file is!
123  TTree *tree = nullptr;
124  if(fTreeName == "") {
125  // retrieve the first TTree
126  // (re-adapted from TEventIter.cxx)
127  if (fp->GetListOfKeys()) {
128  for(auto k : *fp->GetListOfKeys()) {
129  TKey *key = static_cast<TKey*>(k);
130  if (!strcmp(key->GetClassName(), "TTree") || !strcmp(key->GetClassName(), "TNtuple"))
131  tree = static_cast<TTree*>(fp->Get(key->GetName()));
132  }
133  }
134  } else {
135  tree = static_cast<TTree*>(fp->Get(fTreeName.c_str()));
136  }
137  if (tree == nullptr) {
138  std::stringstream ss;
139  ss << "cannot find tree with name " << fTreeName << " in file " << fp->GetName();
140  std::string errmsg = ss.str();
141  SendError(errmsg, MPCode::kProcError);
142  return nullptr;
143  }
144 
145  return tree;
146 }
147 
148 //////////////////////////////////////////////////////////////////////////
149 /// Tree cache handling
150 
152 {
153  if (fUseTreeCache) {
154  TFile *curfile = tree->GetCurrentFile();
155  if (curfile) {
156  if (!fTreeCache) {
157  tree->SetCacheSize(fCacheSize);
158  fTreeCache = (TTreeCache *)curfile->GetCacheRead(tree);
159  if (fCacheSize < 0) fCacheSize = tree->GetCacheSize();
160  } else {
162  curfile->SetCacheRead(fTreeCache, tree);
163  fTreeCache->UpdateBranches(tree);
164  }
165  if (fTreeCache) {
168  Info("SetupTreeCache","the tree cache is in learning phase");
169  }
170  } else {
171  Warning("SetupTreeCache", "default tree does not have a file attached: corruption? Tree cache untouched");
172  }
173  } else {
174  // Disable the cache
175  tree->SetCacheSize(0);
176  }
177 }
178 
179 //////////////////////////////////////////////////////////////////////////
180 /// Init overload defining max entries
181 
183 {
184 
185  TMPWorker::Init(fd, workerN);
187 }
188 
189 //////////////////////////////////////////////////////////////////////////
190 /// Max entries evaluation
191 
193 {
194  // E.g.: when dividing 10 entries between 3 workers, the first
195  // two will process 10/3 == 3 entries, the last one will process
196  // 10 - 2*(10/3) == 4 entries.
197  if(GetNWorker() < fNWorkers-1)
198  return maxEntries/fNWorkers;
199  else
200  return maxEntries - (fNWorkers-1)*(maxEntries/fNWorkers);
201 }
202 
203 //////////////////////////////////////////////////////////////////////////
204 /// Generic input handling
205 
207 {
208  UInt_t code = msg.first;
209 
210  if (code == MPCode::kProcRange
211  || code == MPCode::kProcFile
212  || code == MPCode::kProcTree) {
213  //execute fProcFunc on a file or a range of entries in a file
214  Process(code, msg);
215  } else if (code == MPCode::kSendResult) {
216  //send back result
217  SendResult();
218  } else {
219  //unknown code received
220  std::string reply = "S" + std::to_string(GetNWorker());
221  reply += ": unknown code received: " + std::to_string(code);
222  MPSend(GetSocket(), MPCode::kError, reply.c_str());
223  }
224 }
225 
226 
227 
228 //////////////////////////////////////////////////////////////////////////
229 /// Selector processing SendResult and Process overload
230 
232 {
233  //send back result
234  fSelector.SlaveTerminate();
235  MPSend(GetSocket(), MPCode::kProcResult, fSelector.GetOutputList());
236 }
237 
238 /// Selector specialization
240 {
241  //evaluate the index of the file to process in fFileNames
242  //(we actually don't need the parameter if code == kProcTree)
243 
244  Long64_t start = 0;
245  Long64_t finish = 0;
246  TEntryList *enl = 0;
247  std::string errmsg;
248  if (LoadTree(code, msg, start, finish, &enl, errmsg) != 0) {
249  SendError(errmsg);
250  return;
251  }
252 
253  if (fCallBegin) {
254  fSelector.SlaveBegin(nullptr);
255  fCallBegin = false;
256  }
257 
258  fSelector.Init(fTree);
259  fSelector.Notify();
260  for (Long64_t entry = start; entry < finish; ++entry) {
261  Long64_t e = (enl) ? enl->GetEntry(entry) : entry;
262  fSelector.Process(e);
263  }
264 
265  // update the number of processed entries
266  fProcessedEntries += finish - start;
267 
269 
270  return;
271 }
272 
273 /// Load the requierd tree and evaluate the processing range
274 
276  std::string &errmsg)
277 {
278  // evaluate the index of the file to process in fFileNames
279  //(we actually don't need the parameter if code == kProcTree)
280 
281  start = 0;
282  finish = 0;
283  errmsg = "";
284 
285  UInt_t fileN = 0;
286  UInt_t nProcessed = 0;
287  Bool_t setupcache = true;
288 
289  std::string mgroot = "[S" + std::to_string(GetNWorker()) + "]: ";
290 
291  TTree *tree = 0;
292  if (code == MPCode::kProcTree) {
293 
294  mgroot += "MPCode::kProcTree: ";
295 
296  // The tree must be defined at this level
297  if(fTree == nullptr) {
298  errmsg = mgroot + std::string("tree undefined!");
299  return -1;
300  }
301 
302  //retrieve the total number of entries ranges processed so far by TPool
303  nProcessed = ReadBuffer<UInt_t>(msg.second.get());
304 
305  //create entries range
306  //example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
307  //and this worker must take the rangeN-th range
308  Long64_t nEntries = fTree->GetEntries();
309  UInt_t nBunch = nEntries / fNWorkers;
310  UInt_t rangeN = nProcessed % fNWorkers;
311  start = rangeN * nBunch;
312  if (rangeN < (fNWorkers - 1)) {
313  finish = (rangeN+1)*nBunch;
314  } else {
315  finish = nEntries;
316  }
317 
318  //process tree
319  tree = fTree;
320  CloseFile(); // May not be needed
321  if (fTree->GetCurrentFile()) {
322  // We need to reopen the file locally (TODO: to understand and fix this)
323  if ((fFile = TFile::Open(fTree->GetCurrentFile()->GetName())) && !fFile->IsZombie()) {
324  if (!(tree = (TTree *) fFile->Get(fTree->GetName()))) {
325  errmsg = mgroot + std::string("unable to retrieve tree from open file ") +
326  std::string(fTree->GetCurrentFile()->GetName());
327  delete fFile;
328  return -1;
329  }
330  fTree = tree;
331  } else {
332  //errors are handled inside OpenFile
333  errmsg = mgroot + std::string("unable to open file ") + std::string(fTree->GetCurrentFile()->GetName());
334  if (fFile && fFile->IsZombie()) delete fFile;
335  return -1;
336  }
337  }
338 
339  } else {
340 
341  if (code == MPCode::kProcRange) {
342  mgroot += "MPCode::kProcRange: ";
343  //retrieve the total number of entries ranges processed so far by TPool
344  nProcessed = ReadBuffer<UInt_t>(msg.second.get());
345  //evaluate the file and the entries range to process
346  fileN = nProcessed / fNWorkers;
347  } else if (code == MPCode::kProcFile) {
348  mgroot += "MPCode::kProcFile: ";
349  //evaluate the file and the entries range to process
350  fileN = ReadBuffer<UInt_t>(msg.second.get());
351  } else {
352  errmsg += "MPCode undefined!";
353  return -1;
354  }
355 
356  // Open the file if required
357  if (fFile && strcmp(fFileNames[fileN].c_str(), fFile->GetName())) CloseFile();
358  if (!fFile) {
359  fFile = OpenFile(fFileNames[fileN]);
360  if (fFile == nullptr) {
361  // errors are handled inside OpenFile
362  errmsg = mgroot + std::string("unable to open file ") + fFileNames[fileN];
363  return -1;
364  }
365  }
366 
367  //retrieve the TTree with the specified name from file
368  //we are not the owner of the TTree object, the file is!
369  tree = RetrieveTree(fFile);
370  if (tree == nullptr) {
371  //errors are handled inside RetrieveTree
372  errmsg = mgroot + std::string("unable to retrieve tree from open file ") + fFileNames[fileN];
373  return -1;
374  }
375 
376  // Prepare to setup the cache, if required
377  setupcache = (tree != fTree) ? true : false;
378 
379  // Store as reference
380  fTree = tree;
381 
382  //create entries range
383  if (code == MPCode::kProcRange) {
384  //example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
385  //and this worker must take the rangeN-th range
386  Long64_t nEntries = tree->GetEntries();
387  UInt_t nBunch = nEntries / fNWorkers;
388  if(nEntries % fNWorkers) nBunch++;
389  UInt_t rangeN = nProcessed % fNWorkers;
390  start = rangeN * nBunch;
391  if(rangeN < (fNWorkers-1))
392  finish = (rangeN+1)*nBunch;
393  else
394  finish = nEntries;
395  } else {
396  start = 0;
397  finish = tree->GetEntries();
398  }
399  }
400 
401  // Setup the cache, if required
402  if (setupcache) SetupTreeCache(fTree);
403 
404  // Get the entrylist, if required
405  if (fEntryList && enl) {
406  if ((*enl = fEntryList->GetEntryList(fTree->GetName(), TUrl(fFile->GetName()).GetFile()))) {
407  // create entries range
408  if (code == MPCode::kProcRange) {
409  // example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
410  // and this worker must take the rangeN-th range
411  ULong64_t nEntries = (*enl)->GetN();
412  UInt_t nBunch = nEntries / fNWorkers;
413  if (nEntries % fNWorkers) nBunch++;
414  UInt_t rangeN = nProcessed % fNWorkers;
415  start = rangeN * nBunch;
416  if (rangeN < (fNWorkers - 1))
417  finish = (rangeN + 1) * nBunch;
418  else
419  finish = nEntries;
420  } else {
421  start = 0;
422  finish = (*enl)->GetN();
423  }
424  } else {
425  Warning("LoadTree", "failed to get entry list for: %s %s", fTree->GetName(), TUrl(fFile->GetName()).GetFile());
426  }
427  }
428 
429  //check if we are going to reach the max of entries
430  //change finish accordingly
431  if (fMaxNEntries)
432  if (fProcessedEntries + finish - start > fMaxNEntries)
433  finish = start + fMaxNEntries - fProcessedEntries;
434 
435  if (gDebug > 0 && fFile)
436  Info("LoadTree", "%s %d %d file: %s %lld %lld", mgroot.c_str(), nProcessed, fileN, fFile->GetName(), start,
437  finish);
438 
439  return 0;
440 }
void CloseFile()
Handle file closing.
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
long long Long64_t
Definition: RtypesCore.h:69
void Setup()
Auxiliary method for common initializations.
TFile * OpenFile(const std::string &fileName)
Handle file opening.
virtual void SetCacheRead(TFileCacheRead *cache, TObject *tree=0, ECacheAction action=kDisconnect)
Set a pointer to the read cache.
Definition: TFile.cxx:2265
This class represents a WWW compatible URL.
Definition: TUrl.h:35
virtual const char * GetClassName() const
Definition: TKey.h:71
std::string fTreeName
the name of the tree to be processed
Definition: TMPWorkerTree.h:63
A specialized TFileCacheRead object for a TTree.
Definition: TTreeCache.h:35
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
Definition: TFile.h:47
This class works in conjuction with TMPClient, reacting to messages received from it as specified by ...
Definition: TMPWorker.h:26
virtual TObject * Get(const char *namecycle)
Return pointer to object identified by namecycle.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
void Process(UInt_t code, MPCodeBufPair &msg)
Selector specialization.
TMPWorkerTree()
Class constructors.
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
Error message.
Definition: MPCode.h:47
ULong64_t fProcessedEntries
the number of entries processed by this worker so far
Definition: TMPWorker.h:51
TTree * fTree
pointer to the tree to be processed. It is only used if the tree is directly passed to TProcessExecut...
Definition: TMPWorkerTree.h:64
TTreeCache * fTreeCache
instance of the tree cache for the tree
Definition: TMPWorkerTree.h:72
Int_t LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl, std::string &errmsg)
Load the required tree and evaluate the processing range.
void SendError(const std::string &errmsg, unsigned int code=MPCode::kError)
Error sender.
Definition: TMPWorker.cxx:115
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3976
unsigned GetNWorker() const
Definition: TMPWorker.h:45
virtual Long64_t GetCacheSize() const
Definition: TTree.h:374
void SendResult()
Selector processing SendResult and Process overload.
void Info(const char *location, const char *msgfmt,...)
virtual void SendResult()
Definition: TMPWorkerTree.h:58
virtual void ResetCache()
This will simply clear the cache.
TTree * RetrieveTree(TFile *fp)
Retrieve a tree from an open file.
Ask for a kFuncResult/kProcResult.
Definition: MPCode.h:36
Bool_t fTreeCacheIsLearning
Whether cache is in learning phase.
Definition: TMPWorkerTree.h:73
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition: TKey.h:24
The message contains the result of the processing of a TTree.
Definition: MPCode.h:42
ULong64_t fMaxNEntries
the maximum number of entries to be processed by this worker
Definition: TMPWorker.h:50
R__ALWAYS_INLINE Bool_t IsZombie() const
Definition: TObject.h:134
Tell a TMPWorkerTree which tree to process. The object sent is a TreeInfo.
Definition: MPCode.h:38
ULong64_t EvalMaxEntries(ULong64_t maxEntries)
Max entries evaluation.
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
virtual void Process(UInt_t, MPCodeBufPair &)
Definition: TMPWorkerTree.h:56
void SetupTreeCache(TTree *tree)
Tree cache handling.
Tell a TMPWorkerTree which tree and entries range to process. The object sent is a TreeRangeInfo...
Definition: MPCode.h:39
unsigned fNWorkers
the number of workers spawned
Definition: TMPWorker.h:49
unsigned int UInt_t
Definition: RtypesCore.h:42
TFile * GetCurrentFile() const
Return pointer to the current file.
Definition: TTree.cxx:5205
virtual Long64_t GetEntry(Int_t index)
Return the number of the entry #index of this TEntryList in the TTree or TChain See also Next()...
Definition: TEntryList.cxx:653
TSocket * GetSocket()
Definition: TMPWorker.h:43
void Init(int fd, UInt_t workerN)
Init overload defining max entries.
Tell a TMPWorkerTree to process the tree that was passed to it at construction time.
Definition: MPCode.h:40
TFileCacheRead * GetCacheRead(const TObject *tree=0) const
Return a pointer to the current read cache.
Definition: TFile.cxx:1215
void Warning(const char *location, const char *msgfmt,...)
const Bool_t kFALSE
Definition: RtypesCore.h:88
R__EXTERN TEnv * gEnv
Definition: TEnv.h:171
unsigned long long ULong64_t
Definition: RtypesCore.h:70
Bool_t fUseTreeCache
Control usage of the tree cache.
Definition: TMPWorkerTree.h:74
TEntryList * fEntryList
entrylist
Definition: TMPWorkerTree.h:66
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
Definition: TRolke.cxx:630
Long64_t fCacheSize
Cache size.
Definition: TMPWorkerTree.h:75
virtual Int_t SetCacheSize(Long64_t cachesize=-1)
Set maximum size of the file cache .
Definition: TTree.cxx:8270
std::vector< std::string > fFileNames
the files to be processed by all workers
Definition: TMPWorkerTree.h:62
virtual Long64_t GetEntries() const
Definition: TTree.h:384
virtual Bool_t IsLearning() const
Definition: TTreeCache.h:153
void HandleInput(MPCodeBufPair &msg)
Execute instructions received from a MP client.
virtual ~TMPWorkerTree()
virtual TList * GetListOfKeys() const
virtual TEntryList * GetEntryList(const char *treename, const char *filename, Option_t *opt="")
Return the entry list, corresponding to treename and filename By default, the filename is first tried...
Definition: TEntryList.cxx:777
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
Definition: TMPWorker.cxx:55
R__EXTERN Int_t gDebug
Definition: Rtypes.h:86
Definition: tree.py:1
A TTree object has a header with a name and a title.
Definition: TTree.h:70
Tell the client there was an error while processing.
Definition: MPCode.h:44
ULong64_t fFirstEntry
first entry to be processed
Definition: TMPWorkerTree.h:67
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition: TEnv.cxx:491
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:25
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
We are ready for the next task.
Definition: MPCode.h:35
const Bool_t kTRUE
Definition: RtypesCore.h:87
TFile * fFile
last open file
Definition: TMPWorkerTree.h:65