ROOT  6.06/09
Reference Guide
TPoolProcessor.h
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud September 2015
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TPoolProcessor
13 #define ROOT_TPoolProcessor
14 
15 #include "TMPWorker.h"
16 #include "PoolUtils.h"
17 #include "MPCode.h"
18 #include "MPSendRecv.h"
19 #include "TTree.h"
20 #include "TTreeReader.h"
21 #include "TEventList.h"
22 #include "TEntryList.h"
23 #include "TTree.h"
24 #include "TFile.h"
25 #include "TKey.h"
26 #include "TH1.h"
27 #include <memory>
28 #include <string>
29 #include <sstream>
30 #include <type_traits> //std::result_of
31 
32 
33 //If the user lambda returns a TH1F*, TTree*, TEventList*, we incur in the
34 //problem of that object being automatically owned by the current open file.
35 //For these three types, we call SetDirectory(nullptr) to detach the returned
36 //object from the file we are reading the TTree from.
37 //Note: the only sane case in which this should happen is when a TH1F* is
38 //returned.
40 void DetachRes(T res)
41 {
42  auto th1p = dynamic_cast<TH1*>(res);
43  if(th1p != nullptr) {
44  th1p->SetDirectory(nullptr);
45  return;
46  }
47 
48  auto ttreep = dynamic_cast<TTree*>(res);
49  if(ttreep != nullptr) {
50  ttreep->SetDirectory(nullptr);
51  return;
52  }
53 
54  auto tentrylist = dynamic_cast<TEntryList*>(res);
55  if(tentrylist != nullptr) {
56  tentrylist->SetDirectory(nullptr);
57  return;
58  }
59 
60  auto teventlist = dynamic_cast<TEventList*>(res);
61  if(teventlist != nullptr) {
62  teventlist->SetDirectory(nullptr);
63  return;
64  }
65 
66  return;
67 }
68 
69 
70 template<class F>
71 class TPoolProcessor : public TMPWorker {
72 public:
73  TPoolProcessor(F procFunc, const std::vector<std::string>& fileNames, const std::string& treeName, unsigned nWorkers, ULong64_t maxEntries);
74  TPoolProcessor(F procFunc, TTree *tree, unsigned nWorkers, ULong64_t maxEntries);
76 
77  void HandleInput(MPCodeBufPair& msg); ///< Execute instructions received from a TPool client
78  void Init(int fd, unsigned workerN);
79 
80 private:
81  void Process(unsigned code, MPCodeBufPair& msg);
82  TFile *OpenFile(const std::string& fileName);
83  TTree *RetrieveTree(TFile *fp);
85 
86  F fProcFunc; ///< the function to be executed
87  std::vector<std::string> fFileNames; ///< the files to be processed by all workers
88  std::string fTreeName; ///< the name of the tree to be processed
89  TTree *fTree; ///< pointer to the tree to be processed. It is only used if the tree is directly passed to TProcPool::Process as argument
90  unsigned fNWorkers; ///< the number of workers spawned
91  ULong64_t fMaxNEntries; ///< the maximum number of entries to be processed by this worker
92  ULong64_t fProcessedEntries; ///< the number of entries processed by this worker so far
93  typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type fReducedResult; ///< the results of the executions of fProcFunc merged together
94  bool fCanReduce; ///< true if fReducedResult can be reduced with a new result, false until we have produced one result
95 };
96 
97 
98 template<class F>
99 TPoolProcessor<F>::TPoolProcessor(F procFunc, const std::vector<std::string>& fileNames, const std::string& treeName, unsigned nWorkers, ULong64_t maxEntries) : TMPWorker(), fProcFunc(procFunc),
100  fFileNames(fileNames), fTreeName(treeName), fTree(nullptr),
101  fNWorkers(nWorkers), fMaxNEntries(maxEntries),
102  fProcessedEntries(0), fReducedResult(), fCanReduce(false)
103 {}
104 
105 
106 template<class F>
107 TPoolProcessor<F>::TPoolProcessor(F procFunc, TTree *tree, unsigned nWorkers, ULong64_t maxEntries) :
108  TMPWorker(), fProcFunc(procFunc),
109  fFileNames(), fTreeName(), fTree(tree),
110  fNWorkers(nWorkers), fMaxNEntries(maxEntries),
111  fProcessedEntries(0), fReducedResult(), fCanReduce(false)
112 {}
113 
114 
115 template<class F>
117 {
118  unsigned code = msg.first;
119 
120  if (code == PoolCode::kProcRange
121  || code == PoolCode::kProcFile
122  || code == PoolCode::kProcTree) {
123  //execute fProcFunc on a file or a range of entries in a file
124  Process(code, msg);
125  } else if (code == PoolCode::kSendResult) {
126  //send back result
127  MPSend(GetSocket(), PoolCode::kProcResult, fReducedResult);
128  } else {
129  //unknown code received
130  std::string reply = "S" + std::to_string(GetNWorker());
131  reply += ": unknown code received: " + std::to_string(code);
132  MPSend(GetSocket(), MPCode::kError, reply.data());
133  }
134 }
135 
136 
137 template<class F>
138 void TPoolProcessor<F>::Init(int fd, unsigned workerN) {
139  TMPWorker::Init(fd, workerN);
140 
141  fMaxNEntries = EvalMaxEntries(fMaxNEntries);
142 }
143 
144 
145 template<class F>
146 void TPoolProcessor<F>::Process(unsigned code, MPCodeBufPair& msg)
147 {
148  //evaluate the index of the file to process in fFileNames
149  //(we actually don't need the parameter if code == kProcTree)
150  unsigned fileN = 0;
151  unsigned nProcessed = 0;
152  if (code == PoolCode::kProcRange || code == PoolCode::kProcTree) {
153  if (code == PoolCode::kProcTree && !fTree) {
154  // This must be defined
155  std::cerr << "[S]: Process:kProcTree fTree undefined!\n";
156  return;
157  }
158  //retrieve the total number of entries ranges processed so far by TPool
159  nProcessed = ReadBuffer<unsigned>(msg.second.get());
160  //evaluate the file and the entries range to process
161  fileN = nProcessed / fNWorkers;
162  } else {
163  //evaluate the file and the entries range to process
164  fileN = ReadBuffer<unsigned>(msg.second.get());
165  }
166 
167  std::unique_ptr<TFile> fp;
168  TTree *tree = nullptr;
169  if (code != PoolCode::kProcTree ||
170  (code == PoolCode::kProcTree && fTree->GetCurrentFile())) {
171  //open file
172  if (code == PoolCode::kProcTree && fTree->GetCurrentFile()) {
173  // Single tree from file: we need to reopen, because file descriptor gets invalidated across Fork
174  fp.reset(OpenFile(fTree->GetCurrentFile()->GetName()));
175  } else {
176  fp.reset(OpenFile(fFileNames[fileN]));
177  }
178  if (fp == nullptr) {
179  //errors are handled inside OpenFile
180  return;
181  }
182 
183  //retrieve the TTree with the specified name from file
184  //we are not the owner of the TTree object, the file is!
185  tree = RetrieveTree(fp.get());
186  if(tree == nullptr) {
187  //errors are handled inside RetrieveTree
188  return;
189  }
190  } else {
191  // Tree in memory: OK
192  tree = fTree;
193  }
194 
195  //create entries range
196  Long64_t start = 0;
197  Long64_t finish = 0;
198  if (code == PoolCode::kProcRange || code == PoolCode::kProcTree) {
199  //example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
200  //and this worker must take the rangeN-th range
201  unsigned nEntries = tree->GetEntries();
202  unsigned nBunch = nEntries / fNWorkers;
203  unsigned rangeN = nProcessed % fNWorkers;
204  start = rangeN*nBunch;
205  if(rangeN < (fNWorkers-1))
206  finish = (rangeN+1)*nBunch;
207  else
208  finish = nEntries;
209  } else {
210  start = 0;
211  finish = tree->GetEntries();
212  }
213 
214  //check if we are going to reach the max of entries
215  //change finish accordingly
216  if (fMaxNEntries)
217  if (fProcessedEntries + finish - start > fMaxNEntries)
218  finish = start + fMaxNEntries - fProcessedEntries;
219 
220  // create a TTreeReader that reads this range of entries
221  TTreeReader reader(tree);
222  //Set first entry to start-1 so that the next call to TTreeReader::Next() sets the entry to the right value
223  TTreeReader::EEntryStatus status = reader.SetEntriesRange(start-1, finish);
224  if(status != TTreeReader::kEntryValid) {
225  std::string reply = "S" + std::to_string(GetNWorker());
226  reply += ": could not set TTreeReader to range " + std::to_string(start) + " " + std::to_string(finish);
227  MPSend(GetSocket(), PoolCode::kProcError, reply.data());
228  return;
229  }
230 
231  //execute function
232  auto res = fProcFunc(reader);
233 
234  //detach result from file if needed (currently needed for TH1, TTree, TEventList)
235  DetachRes(res);
236 
237  //update the number of processed entries
238  fProcessedEntries += finish - start;
239 
240  if(fCanReduce) {
241  fReducedResult = static_cast<decltype(fReducedResult)>(PoolUtils::ReduceObjects({res, fReducedResult})); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
242  } else {
243  fCanReduce = true;
244  fReducedResult = res;
245  }
246 
247  if(fMaxNEntries == fProcessedEntries)
248  //we are done forever
249  MPSend(GetSocket(), PoolCode::kProcResult, fReducedResult);
250  else
251  //we are done for now
252  MPSend(GetSocket(), PoolCode::kIdling);
253 }
254 
255 
256 template<class F>
257 TFile *TPoolProcessor<F>::OpenFile(const std::string& fileName)
258 {
259 
260  TFile *fp = TFile::Open(fileName.c_str());
261  if (fp == nullptr || fp->IsZombie()) {
262  std::string reply = "S" + std::to_string(GetNWorker());
263  reply.append(": could not open file ");
264  reply.append(fileName);
265  MPSend(GetSocket(), PoolCode::kProcError, reply.data());
266  return nullptr;
267  }
268 
269  return fp;
270 }
271 
272 
273 template<class F>
275 {
276  //retrieve the TTree with the specified name from file
277  //we are not the owner of the TTree object, the file is!
278  TTree *tree = nullptr;
279  if(fTreeName == "") {
280  // retrieve the first TTree
281  // (re-adapted from TEventIter.cxx)
282  if (fp->GetListOfKeys()) {
283  for(auto k : *fp->GetListOfKeys()) {
284  TKey *key = static_cast<TKey*>(k);
285  if (!strcmp(key->GetClassName(), "TTree") || !strcmp(key->GetClassName(), "TNtuple"))
286  tree = static_cast<TTree*>(fp->Get(key->GetName()));
287  }
288  }
289  } else {
290  tree = static_cast<TTree*>(fp->Get(fTreeName.c_str()));
291  }
292  if (tree == nullptr) {
293  std::string reply = "S" + std::to_string(GetNWorker());
294  std::stringstream ss;
295  ss << ": cannot find tree with name " << fTreeName << " in file " << fp->GetName();
296  reply.append(ss.str());
297  MPSend(GetSocket(), PoolCode::kProcError, reply.data());
298  return nullptr;
299  }
300 
301  return tree;
302 }
303 
304 
305 template<class F>
307 {
308  //e.g.: when dividing 10 entries between 3 workers, the first
309  //two will process 10/3 == 3 entries, the last one will process
310  //10 - 2*(10/3) == 4 entries.
311  if(GetNWorker() < fNWorkers-1)
312  return maxEntries/fNWorkers;
313  else
314  return maxEntries - (fNWorkers-1)*(maxEntries/fNWorkers);
315 }
316 
317 #endif
std::result_of< F(std::reference_wrapper< TTreeReader >)>::type fReducedResult
the results of the executions of fProcFunc merged together
bool fCanReduce
true if fReducedResult can be reduced with a new result, false until we have produced one result ...
TObject * ReduceObjects(const std::vector< TObject * > &objs)
Merge collection of TObjects.
Definition: PoolUtils.cxx:11
long long Long64_t
Definition: RtypesCore.h:69
Tell a TPoolProcessor to process the tree that was passed to it at construction time.
Definition: PoolUtils.h:38
Small helper to encapsulate whether to return the value pointed to by the iterator or its address...
TTreeReader is a simple, robust and fast interface to read values from a TTree, TChain or TNtuple...
Definition: TTreeReader.h:48
Tell a TPoolProcessor which tree and entries range to process. The object sent is a TreeRangeInfo...
Definition: PoolUtils.h:37
virtual void SetDirectory(TDirectory *dir)
By default when a histogram is created, it is added to the list of histogram objects in the current ...
Definition: TH1.cxx:8266
double T(double x)
Definition: ChebyshevPol.h:34
ULong64_t fMaxNEntries
the maximum number of entries to be processed by this worker
TFile * OpenFile(const TString &fin)
Definition: tmvaglob.cxx:192
virtual TList * GetListOfKeys() const
std::string fTreeName
the name of the tree to be processed
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
Definition: TFile.h:45
This class works in conjunction with TMPClient, reacting to messages received from it as specified by ...
Definition: TMPWorker.h:20
virtual TObject * Get(const char *namecycle)
Return pointer to object identified by namecycle.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:21
Bool_t IsZombie() const
Definition: TObject.h:141
virtual const char * GetClassName() const
Definition: TKey.h:77
TAlienJobStatus * status
Definition: TAlienJob.cxx:51
Error message.
Definition: MPCode.h:30
TFile * OpenFile(const std::string &fileName)
std::vector< std::string > fFileNames
the files to be processed by all workers
ClassImp(TIterator) Bool_t TIterator return false
Compare two iterator objects.
Definition: TIterator.cxx:20
TTree * RetrieveTree(TFile *fp)
Tell a TPoolProcessor which tree to process. The object sent is a TreeInfo.
Definition: PoolUtils.h:36
We are ready for the next task.
Definition: PoolUtils.h:33
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3851
virtual void SetDirectory(TDirectory *dir)
Add reference to directory dir. dir can be 0.
void Process(unsigned code, MPCodeBufPair &msg)
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition: TKey.h:30
ULong64_t EvalMaxEntries(ULong64_t maxEntries)
void HandleInput(MPCodeBufPair &msg)
Execute instructions received from a TPool client.
F fProcFunc
the function to be executed
virtual void SetDirectory(TDirectory *dir)
Remove reference to this EventList from current directory and add reference to new directory dir...
Definition: TEventList.cxx:349
#define F(x, y, z)
TPoolProcessor(F procFunc, const std::vector< std::string > &fileNames, const std::string &treeName, unsigned nWorkers, ULong64_t maxEntries)
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:33
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
unsigned fNWorkers
the number of workers spawned
The message contains the result of the processing of a TTree.
Definition: PoolUtils.h:39
std::pair< unsigned, std::unique_ptr< TBufferFile >> MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:20
Tell the client there was an error while processing.
Definition: PoolUtils.h:41
virtual void SetDirectory(TDirectory *dir)
Change the tree's directory.
Definition: TTree.cxx:8095
Ask for a kFuncResult/kProcResult.
Definition: PoolUtils.h:34
int type
Definition: TGX11.cxx:120
unsigned long long ULong64_t
Definition: RtypesCore.h:70
void DetachRes(T res)
The TH1 histogram class.
Definition: TH1.h:80
#define nullptr
Definition: Rtypes.h:87
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
Definition: TMPWorker.cxx:55
void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
virtual Long64_t GetEntries() const
Definition: TTree.h:382
A TTree object has a header with a name and a title.
Definition: TTree.h:94
float value
Definition: math.cpp:443
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:27
EEntryStatus SetEntriesRange(Long64_t first, Long64_t last)
Set the range of entries to be processed.
ULong64_t fProcessedEntries
the number of entries processed by this worker so far
TTree * fTree
pointer to the tree to be processed. It is only used if the tree is directly passed to TProcPool::Pro...