Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TMPWorkerTree.cxx
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3// Modified: G Ganis Jan 2017
4
5/*************************************************************************
6 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13#include "MPCode.h"
14#include "MPSendRecv.h"
15#include "TError.h"
16#include "TMPWorkerTree.h"
17#include "TEnv.h"
18#include <string>
19
20//////////////////////////////////////////////////////////////////////////
21///
22/// \class TMPWorkerTree
23///
24/// This class works in conjunction with TTreeProcessorMP, reacting to messages
25/// received from it as specified by the Notify and HandleInput methods.
26///
27/// \class TMPWorkerTreeFunc
28///
29/// Templated derivation of TMPWorkerTree handling generic function tree processing.
30///
31/// \class TMPWorkerTreeSel
32///
33/// Templated derivation of TMPWorkerTree handling selector tree processing.
34///
35//////////////////////////////////////////////////////////////////////////
36
37//////////////////////////////////////////////////////////////////////////
38/// Class constructors.
39/// Note that this does not set variables like fPid or fS (worker's socket).\n
40/// These operations are handled by the Init method, which is called after
41/// forking.\n
42/// This separation is in place because the instantiation of a worker
43/// must be done once _before_ forking, while the initialization of the
44/// members must be done _after_ forking by each of the children processes.
45
47 : TMPWorker(), fFileNames(), fTreeName(), fTree(nullptr), fFile(nullptr), fEntryList(nullptr), fFirstEntry(0),
48 fTreeCache(nullptr), fTreeCacheIsLearning(false), fUseTreeCache(true), fCacheSize(-1)
49{
50 Setup();
51}
52
53TMPWorkerTree::TMPWorkerTree(const std::vector<std::string> &fileNames, TEntryList *entries,
54 const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
55 : TMPWorker(nWorkers, maxEntries), fFileNames(fileNames), fTreeName(treeName), fTree(nullptr), fFile(nullptr),
56 fEntryList(entries), fFirstEntry(firstEntry), fTreeCache(nullptr), fTreeCacheIsLearning(false), fUseTreeCache(true),
57 fCacheSize(-1)
58{
59 Setup();
60}
61
62TMPWorkerTree::TMPWorkerTree(TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries,
63 ULong64_t firstEntry)
64 : TMPWorker(nWorkers, maxEntries), fTree(tree), fFile(nullptr), fEntryList(entries), fFirstEntry(firstEntry),
65 fTreeCache(nullptr), fTreeCacheIsLearning(false), fUseTreeCache(true), fCacheSize(-1)
66{
67 Setup();
68}
69
71{
72 // Properly close the open file, if any
73 CloseFile();
74}
75
76//////////////////////////////////////////////////////////////////////////
77/// Auxiliary method for common initialization
79{
80 Int_t uc = gEnv->GetValue("MultiProc.UseTreeCache", 1);
81 if (uc != 1) fUseTreeCache = false;
82 fCacheSize = gEnv->GetValue("MultiProc.CacheSize", -1);
83}
84
85//////////////////////////////////////////////////////////////////////////
86/// Handle file closing.
87
89{
90 // Avoid destroying the cache; must be placed before deleting the trees
91 if (fFile) {
92 if (fTree) fFile->SetCacheRead(nullptr, fTree);
93 delete fFile ;
94 fFile = nullptr;
95 }
96}
97
98//////////////////////////////////////////////////////////////////////////
99/// Handle file opening.
100
101TFile *TMPWorkerTree::OpenFile(const std::string& fileName)
102{
103
104 TFile *fp = TFile::Open(fileName.c_str());
105 if (fp == nullptr || fp->IsZombie()) {
106 std::stringstream ss;
107 ss << "could not open file " << fileName;
108 std::string errmsg = ss.str();
110 return nullptr;
111 }
112
113 return fp;
114}
115
116//////////////////////////////////////////////////////////////////////////
117/// Retrieve a tree from an open file.
118
120{
121 //retrieve the TTree with the specified name from file
122 //we are not the owner of the TTree object, the file is!
123 TTree *tree = nullptr;
124 if(fTreeName.empty()) {
125 // retrieve the first TTree
126 // (re-adapted from TEventIter.cxx)
127 if (fp->GetListOfKeys()) {
128 for(auto k : *fp->GetListOfKeys()) {
129 TKey *key = static_cast<TKey*>(k);
130 if (!strcmp(key->GetClassName(), "TTree") || !strcmp(key->GetClassName(), "TNtuple"))
131 tree = static_cast<TTree*>(fp->Get(key->GetName()));
132 }
133 }
134 } else {
135 tree = static_cast<TTree*>(fp->Get(fTreeName.c_str()));
136 }
137 if (tree == nullptr) {
138 std::stringstream ss;
139 ss << "cannot find tree with name " << fTreeName << " in file " << fp->GetName();
140 std::string errmsg = ss.str();
142 return nullptr;
143 }
144
145 return tree;
146}
147
148//////////////////////////////////////////////////////////////////////////
149/// Tree cache handling
150
152{
153 if (fUseTreeCache) {
154 TFile *curfile = tree->GetCurrentFile();
155 if (curfile) {
156 if (!fTreeCache) {
157 tree->SetCacheSize(fCacheSize);
158 fTreeCache = (TTreeCache *)curfile->GetCacheRead(tree);
159 if (fCacheSize < 0) fCacheSize = tree->GetCacheSize();
160 } else {
163 curfile->SetCacheRead(fTreeCache, tree);
164 }
165 if (fTreeCache) {
167 }
168 } else {
169 Warning("SetupTreeCache", "default tree does not have a file attached: corruption? Tree cache untouched");
170 }
171 } else {
172 // Disable the cache
173 tree->SetCacheSize(0);
174 }
175}
176
177//////////////////////////////////////////////////////////////////////////
178/// Init overload defining max entries
179
181{
182
183 TMPWorker::Init(fd, workerN);
185}
186
187//////////////////////////////////////////////////////////////////////////
188/// Max entries evaluation
189
191{
192 // E.g.: when dividing 10 entries between 3 workers, the first
193 // two will process 10/3 == 3 entries, the last one will process
194 // 10 - 2*(10/3) == 4 entries.
195 if(GetNWorker() < fNWorkers-1)
196 return maxEntries/fNWorkers;
197 else
198 return maxEntries - (fNWorkers-1)*(maxEntries/fNWorkers);
199}
200
201//////////////////////////////////////////////////////////////////////////
202/// Generic input handling
203
205{
206 UInt_t code = msg.first;
207
208 if (code == MPCode::kProcRange
209 || code == MPCode::kProcFile
210 || code == MPCode::kProcTree) {
211 //execute fProcFunc on a file or a range of entries in a file
212 Process(code, msg);
213 } else if (code == MPCode::kSendResult) {
214 //send back result
215 SendResult();
216 } else {
217 //unknown code received
218 std::string reply = "S" + std::to_string(GetNWorker());
219 reply += ": unknown code received: " + std::to_string(code);
220 MPSend(GetSocket(), MPCode::kError, reply.c_str());
221 }
222}
223
224
225
226//////////////////////////////////////////////////////////////////////////
227/// Selector processing SendResult and Process overload
228
230{
231 //send back result
234}
235
236//////////////////////////////////////////////////////////////////////////
237/// Selector specialization
238
240{
241 //evaluate the index of the file to process in fFileNames
242 //(we actually don't need the parameter if code == kProcTree)
243
244 Long64_t start = 0;
245 Long64_t finish = 0;
246 TEntryList *enl = nullptr;
247 std::string errmsg;
248 if (LoadTree(code, msg, start, finish, &enl, errmsg) != 0) {
249 SendError(errmsg);
250 return;
251 }
252
253 if (fCallBegin) {
254 fSelector.SlaveBegin(nullptr);
255 fCallBegin = false;
256 }
257
260 for (Long64_t entry = start; entry < finish; ++entry) {
261 Long64_t e = (enl) ? enl->GetEntry(entry) : entry;
263 }
264
265 // update the number of processed entries
266 fProcessedEntries += finish - start;
267
269
270 return;
271}
272
273//////////////////////////////////////////////////////////////////////////
274/// Load the required tree and evaluate the processing range
275
277 std::string &errmsg)
278{
279 // evaluate the index of the file to process in fFileNames
280 //(we actually don't need the parameter if code == kProcTree)
281
282 start = 0;
283 finish = 0;
284 errmsg = "";
285
286 UInt_t fileN = 0;
287 UInt_t nProcessed = 0;
288 bool setupcache = true;
289
290 std::string mgroot = "[S" + std::to_string(GetNWorker()) + "]: ";
291
292 TTree *tree = nullptr;
293 if (code == MPCode::kProcTree) {
294
295 mgroot += "MPCode::kProcTree: ";
296
297 // The tree must be defined at this level
298 if(fTree == nullptr) {
299 errmsg = mgroot + std::string("tree undefined!");
300 return -1;
301 }
302
303 //retrieve the total number of entries ranges processed so far by TPool
304 nProcessed = ReadBuffer<UInt_t>(msg.second.get());
305
306 //create entries range
307 //example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
308 //and this worker must take the rangeN-th range
309 Long64_t nEntries = fTree->GetEntries();
310 UInt_t nBunch = nEntries / fNWorkers;
311 UInt_t rangeN = nProcessed % fNWorkers;
312 start = rangeN * nBunch;
313 if (start >= nEntries) {
314 start = finish = nEntries;
315 }
316 else if (rangeN < (fNWorkers - 1)) {
317 finish = (rangeN+1)*nBunch;
318 } else {
319 finish = nEntries;
320 }
321
322
323 //process tree
324 tree = fTree;
325 CloseFile(); // May not be needed
326 if (fTree->GetCurrentFile()) {
327 // We need to reopen the file locally (TODO: to understand and fix this)
329 if (!(tree = (TTree *) fFile->Get(fTree->GetName()))) {
330 errmsg = mgroot + std::string("unable to retrieve tree from open file ") +
331 std::string(fTree->GetCurrentFile()->GetName());
332 delete fFile;
333 return -1;
334 }
335 fTree = tree;
336 } else {
337 //errors are handled inside OpenFile
338 errmsg = mgroot + std::string("unable to open file ") + std::string(fTree->GetCurrentFile()->GetName());
339 if (fFile && fFile->IsZombie()) delete fFile;
340 return -1;
341 }
342 }
343
344 } else {
345
346 if (code == MPCode::kProcRange) {
347 mgroot += "MPCode::kProcRange: ";
348 //retrieve the total number of entries ranges processed so far by TPool
349 nProcessed = ReadBuffer<UInt_t>(msg.second.get());
350 //evaluate the file and the entries range to process
351 fileN = nProcessed / fNWorkers;
352 } else if (code == MPCode::kProcFile) {
353 mgroot += "MPCode::kProcFile: ";
354 //evaluate the file and the entries range to process
355 fileN = ReadBuffer<UInt_t>(msg.second.get());
356 } else {
357 errmsg += "MPCode undefined!";
358 return -1;
359 }
360
361 // Open the file if required
362 if (fFile && strcmp(fFileNames[fileN].c_str(), fFile->GetName())) CloseFile();
363 if (!fFile) {
364 fFile = OpenFile(fFileNames[fileN]);
365 if (fFile == nullptr) {
366 // errors are handled inside OpenFile
367 errmsg = mgroot + std::string("unable to open file ") + fFileNames[fileN];
368 return -1;
369 }
370 }
371
372 //retrieve the TTree with the specified name from file
373 //we are not the owner of the TTree object, the file is!
374 tree = RetrieveTree(fFile);
375 if (tree == nullptr) {
376 //errors are handled inside RetrieveTree
377 errmsg = mgroot + std::string("unable to retrieve tree from open file ") + fFileNames[fileN];
378 return -1;
379 }
380
381 // Prepare to setup the cache, if required
382 setupcache = (tree != fTree) ? true : false;
383
384 // Store as reference
385 fTree = tree;
386
387 //create entries range
388 if (code == MPCode::kProcRange) {
389 //example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
390 //and this worker must take the rangeN-th range
391 Long64_t nEntries = tree->GetEntries();
392 UInt_t nBunch = nEntries / fNWorkers;
393 if(nEntries % fNWorkers) nBunch++;
394 UInt_t rangeN = nProcessed % fNWorkers;
395 start = rangeN * nBunch;
396 if (start >= nEntries)
397 start = finish = nEntries;
398 else if(rangeN < (fNWorkers-1))
399 finish = (rangeN+1)*nBunch;
400 else
401 finish = nEntries;
402 } else {
403 start = 0;
404 finish = tree->GetEntries();
405 }
406 }
407
408 // Setup the cache, if required
409 if (setupcache) SetupTreeCache(fTree);
410
411 // Get the entrylist, if required
412 if (fEntryList && enl) {
413 if ((*enl = fEntryList->GetEntryList(fTree->GetName(), TUrl(fFile->GetName()).GetFile()))) {
414 // create entries range
415 if (code == MPCode::kProcRange) {
416 // example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
417 // and this worker must take the rangeN-th range
418 Long64_t nEntries = (*enl)->GetN();
419 UInt_t nBunch = nEntries / fNWorkers;
420 if (nEntries % fNWorkers) nBunch++;
421 UInt_t rangeN = nProcessed % fNWorkers;
422 start = rangeN * nBunch;
423 if (start >= nEntries) {
424 start = finish = nEntries;
425 } else if (rangeN < (fNWorkers - 1))
426 finish = (rangeN + 1) * nBunch;
427 else
428 finish = nEntries;
429 } else {
430 start = 0;
431 finish = (*enl)->GetN();
432 }
433 } else {
434 Warning("LoadTree", "failed to get entry list for: %s %s", fTree->GetName(), TUrl(fFile->GetName()).GetFile());
435 }
436 }
437
438 //check if we are going to reach the max of entries
439 //change finish accordingly
440 if (fMaxNEntries)
441 if (fProcessedEntries + finish - start > fMaxNEntries)
442 finish = start + fMaxNEntries - fProcessedEntries;
443
444 if (gDebug > 0 && fFile)
445 Info("LoadTree", "%s %d %d file: %s %lld %lld", mgroot.c_str(), nProcessed, fileN, fFile->GetName(), start,
446 finish);
447
448 return 0;
449}
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition MPSendRecv.h:32
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
#define e(i)
Definition RSha256.hxx:103
long long Long64_t
Definition RtypesCore.h:69
unsigned long long ULong64_t
Definition RtypesCore.h:70
R__EXTERN TEnv * gEnv
Definition TEnv.h:170
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
Definition TError.cxx:218
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:229
Int_t gDebug
Definition TROOT.cxx:597
TList * GetListOfKeys() const override
TObject * Get(const char *namecycle) override
Return pointer to object identified by namecycle.
A List of entry numbers in a TTree or TChain.
Definition TEntryList.h:26
virtual TEntryList * GetEntryList(const char *treename, const char *filename, Option_t *opt="")
Return the entry list, corresponding to treename and filename By default, the filename is first tried...
virtual Long64_t GetEntry(Long64_t index)
Return the number of the entry #index of this TEntryList in the TTree or TChain See also Next().
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition TEnv.cxx:491
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:53
virtual void SetCacheRead(TFileCacheRead *cache, TObject *tree=nullptr, ECacheAction action=kDisconnect)
Set a pointer to the read cache.
Definition TFile.cxx:2365
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:4089
TFileCacheRead * GetCacheRead(const TObject *tree=nullptr) const
Return a pointer to the current read cache.
Definition TFile.cxx:1262
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
virtual const char * GetClassName() const
Definition TKey.h:75
void SendResult() override
Selector processing SendResult and Process overload.
TSelector & fSelector
reference to the selector to be used to process the tree (a reference, so it cannot be null; it is only meaningful when processing with a TSelector).
void Process(UInt_t code, MPCodeBufPair &msg) override
Selector specialization.
Int_t LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl, std::string &errmsg)
Load the required tree and evaluate the processing range.
TTree * fTree
pointer to the tree to be processed. It is only used if the tree is directly passed to TProcessExecut...
TFile * fFile
last open file
~TMPWorkerTree() override
void Init(int fd, UInt_t workerN) override
Init overload defining max entries.
void HandleInput(MPCodeBufPair &msg) override
Execute instructions received from an MP client.
void Setup()
Auxiliary method for common initialization.
ULong64_t EvalMaxEntries(ULong64_t maxEntries)
Max entries evaluation.
TTree * RetrieveTree(TFile *fp)
Retrieve a tree from an open file.
TEntryList * fEntryList
entrylist
TMPWorkerTree()
Class constructors.
bool fUseTreeCache
Control usage of the tree cache.
std::vector< std::string > fFileNames
the files to be processed by all workers
TFile * OpenFile(const std::string &fileName)
Handle file opening.
virtual void SendResult()
virtual void Process(UInt_t, MPCodeBufPair &)
Long64_t fCacheSize
Cache size.
std::string fTreeName
the name of the tree to be processed
void CloseFile()
Handle file closing.
void SetupTreeCache(TTree *tree)
Tree cache handling.
TTreeCache * fTreeCache
instance of the tree cache for the tree
bool fTreeCacheIsLearning
Whether cache is in learning phase.
This class works in conjunction with TMPClient, reacting to messages received from it as specified by ...
Definition TMPWorker.h:25
void SendError(const std::string &errmsg, unsigned int code=MPCode::kError)
Error sender.
unsigned GetNWorker() const
Definition TMPWorker.h:41
ULong64_t fMaxNEntries
the maximum number of entries to be processed by this worker
Definition TMPWorker.h:46
ULong64_t fProcessedEntries
the number of entries processed by this worker so far
Definition TMPWorker.h:47
TSocket * GetSocket()
Definition TMPWorker.h:39
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
Definition TMPWorker.cxx:52
unsigned fNWorkers
the number of workers spawned
Definition TMPWorker.h:45
const char * GetName() const override
Returns name of object.
Definition TNamed.h:47
R__ALWAYS_INLINE Bool_t IsZombie() const
Definition TObject.h:153
virtual void Init(TTree *)
Definition TSelector.h:53
virtual bool Process(Long64_t)
The Process() function is called for each entry in the tree (or possibly keyed object in the case of ...
virtual void SlaveBegin(TTree *)
Definition TSelector.h:55
bool Notify() override
This method must be overridden to handle object notification (the base implementation is no-op).
Definition TSelector.h:56
virtual TList * GetOutputList() const
Definition TSelector.h:69
virtual void SlaveTerminate()
Definition TSelector.h:70
A cache to speed-up the reading of ROOT datasets.
Definition TTreeCache.h:32
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
bool IsLearning() const override
Definition TTreeCache.h:152
virtual void ResetCache()
This will simply clear the cache.
A TTree represents a columnar dataset.
Definition TTree.h:79
TFile * GetCurrentFile() const
Return pointer to the current file.
Definition TTree.cxx:5479
virtual Long64_t GetEntries() const
Definition TTree.h:463
This class represents a WWW compatible URL.
Definition TUrl.h:33
const char * GetFile() const
Definition TUrl.h:69
@ kSendResult
Ask for a kFuncResult/kProcResult.
Definition MPCode.h:36
@ kProcFile
Tell a TMPWorkerTree which tree to process. The object sent is a TreeInfo.
Definition MPCode.h:38
@ kIdling
We are ready for the next task.
Definition MPCode.h:35
@ kError
Error message.
Definition MPCode.h:47
@ kProcRange
Tell a TMPWorkerTree which tree and entries range to process. The object sent is a TreeRangeInfo.
Definition MPCode.h:39
@ kProcTree
Tell a TMPWorkerTree to process the tree that was passed to it at construction time.
Definition MPCode.h:40
@ kProcError
Tell the client there was an error while processing.
Definition MPCode.h:44
@ kProcResult
The message contains the result of the processing of a TTree.
Definition MPCode.h:42