Logo ROOT   6.08/07
Reference Guide
TPoolProcessor.h
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud September 2015.
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TPoolProcessor
13 #define ROOT_TPoolProcessor
14 
15 #include "MPCode.h"
16 #include "MPSendRecv.h"
17 #include "PoolUtils.h"
18 #include "TError.h"
19 #include "TEntryList.h"
20 #include "TEventList.h"
21 #include "TFile.h"
22 #include "TH1.h"
23 #include "TKey.h"
24 #include "TMPWorker.h"
25 #include "TTree.h"
26 #include "TTreeReader.h"
27 #include <memory>
28 #include <string>
29 #include <sstream>
30 #include <type_traits> //std::result_of
31 
32 
33 //If the user lambda returns a TH1F*, TTree*, TEventList*, we incur in the
34 //problem of that object being automatically owned by the current open file.
35 //For these three types, we call SetDirectory(nullptr) to detach the returned
36 //object from the file we are reading the TTree from.
37 //Note: the only sane case in which this should happen is when a TH1F* is
38 //returned.
39 template<class T, typename std::enable_if<std::is_pointer<T>::value && std::is_constructible<TObject*, T>::value>::type* = nullptr>
40 void DetachRes(T res)
41 {
42  auto th1p = dynamic_cast<TH1*>(res);
43  if(th1p != nullptr) {
44  th1p->SetDirectory(nullptr);
45  return;
46  }
47 
48  auto ttreep = dynamic_cast<TTree*>(res);
49  if(ttreep != nullptr) {
50  ttreep->SetDirectory(nullptr);
51  return;
52  }
53 
54  auto tentrylist = dynamic_cast<TEntryList*>(res);
55  if(tentrylist != nullptr) {
56  tentrylist->SetDirectory(nullptr);
57  return;
58  }
59 
60  auto teventlist = dynamic_cast<TEventList*>(res);
61  if(teventlist != nullptr) {
62  teventlist->SetDirectory(nullptr);
63  return;
64  }
65 
66  return;
67 }
68 
69 
// Worker class (derived from TMPWorker) that executes a user function of type F
// on a TTreeReader and keeps the merged result of those executions.
// fReducedResult has the type F returns when invoked with a
// std::reference_wrapper<TTreeReader>.
// NOTE(review): this listing skips its own line numbers 75 and 82, so one public and
// one private declaration may be missing from this view — confirm against the real header.
70 template<class F>
71 class TPoolProcessor : public TMPWorker {
72 public:
73  TPoolProcessor(F procFunc, const std::vector<std::string>& fileNames, const std::string& treeName, unsigned nWorkers, ULong64_t maxEntries);
74  TPoolProcessor(F procFunc, TTree *tree, unsigned nWorkers, ULong64_t maxEntries);
76 
77  void HandleInput(MPCodeBufPair& msg); ///< Execute instructions received from a TPool client
78  void Init(int fd, unsigned workerN);
79 
80 private:
81  void Process(unsigned code, MPCodeBufPair& msg);
83 
84  F fProcFunc; ///< the function to be executed
85  typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type fReducedResult; ///< the results of the executions of fProcFunc merged together
86  bool fCanReduce; ///< true if fReducedResult can be reduced with a new result, false until we have produced one result
87 };
88 
89 
90 template<class F>
91 TPoolProcessor<F>::TPoolProcessor(F procFunc, const std::vector<std::string>& fileNames,
92  const std::string& treeName, unsigned nWorkers, ULong64_t maxEntries)
93  : TMPWorker(fileNames, treeName, nWorkers, maxEntries),
94  fProcFunc(procFunc), fReducedResult(), fCanReduce(false)
95 {}
96 
97 
98 template<class F>
99 TPoolProcessor<F>::TPoolProcessor(F procFunc, TTree *tree, unsigned nWorkers, ULong64_t maxEntries)
100  : TMPWorker(tree, nWorkers, maxEntries),
101  fProcFunc(procFunc), fReducedResult(), fCanReduce(false)
102 {}
103 
104 
// Dispatches a received message according to its code (msg.first):
//  - kProcRange, kProcFile and kProcTree are forwarded to Process();
//  - kSendResult is the request to send the result back;
//  - any other code is answered with an MPCode::kError message naming the worker and the code.
// NOTE(review): the listing jumps from line 105 to line 107, so the line carrying the
// function signature is not visible in this view.
105 template<class F>
107 {
108  unsigned code = msg.first;
109 
110  if (code == PoolCode::kProcRange
111  || code == PoolCode::kProcFile
112  || code == PoolCode::kProcTree) {
113  //execute fProcFunc on a file or a range of entries in a file
114  Process(code, msg);
115  } else if (code == PoolCode::kSendResult) {
116  //send back result
// NOTE(review): listing line 117 (presumably the statement that actually sends the result) is missing from this view.
118  } else {
119  //unknown code received
120  std::string reply = "S" + std::to_string(GetNWorker());
121  reply += ": unknown code received: " + std::to_string(code);
122  MPSend(GetSocket(), MPCode::kError, reply.data());
123  }
124 }
125 
126 
// Initializes the worker by forwarding fd and workerN to TMPWorker::Init.
// NOTE(review): listing line 131 is missing from this view, so any further step
// performed after the base-class call cannot be seen here.
127 template<class F>
128 void TPoolProcessor<F>::Init(int fd, unsigned workerN) {
129  TMPWorker::Init(fd, workerN);
130 
132 }
133 
134 
// Processes one task: determines which file/tree and which range of entries to
// read, runs fProcFunc on a TTreeReader restricted to that range, detaches the
// result from the file and merges it into fReducedResult.
//  code: the task code; kProcRange, kProcFile and kProcTree are the values tested here.
//  msg:  the received message; its buffer (msg.second) is read as a single unsigned.
// On failure to open the file, retrieve the tree or set the entries range the
// function returns early without touching fReducedResult.
135 template<class F>
136 void TPoolProcessor<F>::Process(unsigned code, MPCodeBufPair& msg)
137 {
138  //evaluate the index of the file to process in fFileNames
139  //(we actually don't need the parameter if code == kProcTree)
140  unsigned fileN = 0;
141  unsigned nProcessed = 0;
142  if (code == PoolCode::kProcRange || code == PoolCode::kProcTree) {
143  if (code == PoolCode::kProcTree && !fTree) {
144  // This must be defined
145  Error("TPoolProcessor::Process", "[S]: Process:kProcTree fTree undefined!\n");
146  return;
147  }
148  //retrieve the total number of entries ranges processed so far by TPool
149  nProcessed = ReadBuffer<unsigned>(msg.second.get());
150  //evaluate the file and the entries range to process
151  fileN = nProcessed / fNWorkers;
152  } else {
153  //evaluate the file and the entries range to process
154  fileN = ReadBuffer<unsigned>(msg.second.get());
155  }
156 
157  std::unique_ptr<TFile> fp;
158  TTree *tree = nullptr;
159  if (code != PoolCode::kProcTree ||
160  (code == PoolCode::kProcTree && fTree->GetCurrentFile())) {
161  //open file
162  if (code == PoolCode::kProcTree && fTree->GetCurrentFile()) {
163  // Single tree from file: we need to reopen, because file descriptor gets invalidated across Fork
164  fp.reset(OpenFile(fTree->GetCurrentFile()->GetName()));
165  } else {
166  fp.reset(OpenFile(fFileNames[fileN]));
167  }
168  if (fp == nullptr) {
169  //errors are handled inside OpenFile
170  return;
171  }
172 
173  //retrieve the TTree with the specified name from file
174  //we are not the owner of the TTree object, the file is!
175  tree = RetrieveTree(fp.get());
176  if(tree == nullptr) {
177  //errors are handled inside RetrieveTree
178  return;
179  }
180  } else {
181  // Tree in memory: OK
182  tree = fTree;
183  }
184 
185  // Setup the cache, if required
186  SetupTreeCache(tree);
187 
188  //create entries range
189  Long64_t start = 0;
190  Long64_t finish = 0;
191  if (code == PoolCode::kProcRange || code == PoolCode::kProcTree) {
192  //example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
193  //and this worker must take the rangeN-th range
// NOTE(review): the entry count is stored in an unsigned and the range bounds are
// computed in unsigned arithmetic before being assigned to the Long64_t start/finish;
// an entry count that does not fit in unsigned would be truncated here — confirm whether intended.
194  unsigned nEntries = tree->GetEntries();
195  unsigned nBunch = nEntries / fNWorkers;
196  unsigned rangeN = nProcessed % fNWorkers;
197  start = rangeN*nBunch;
198  if(rangeN < (fNWorkers-1))
199  finish = (rangeN+1)*nBunch;
200  else
201  finish = nEntries;
202  } else {
203  start = 0;
204  finish = tree->GetEntries();
205  }
206 
207  //check if we are going to reach the max of entries
208  //change finish accordingly
//a zero fMaxNEntries skips this clamp entirely
209  if (fMaxNEntries)
210  if (fProcessedEntries + finish - start > fMaxNEntries)
211  finish = start + fMaxNEntries - fProcessedEntries;
212 
213  // create a TTreeReader that reads this range of entries
214  TTreeReader reader(tree);
215  TTreeReader::EEntryStatus status = reader.SetEntriesRange(start, finish);
216  if(status != TTreeReader::kEntryValid) {
217  std::string reply = "S" + std::to_string(GetNWorker());
218  reply += ": could not set TTreeReader to range " + std::to_string(start) + " " + std::to_string(finish);
219  MPSend(GetSocket(), PoolCode::kProcError, reply.data());
220  return;
221  }
222 
223  //execute function
224  auto res = fProcFunc(reader);
225 
226  //detach result from file if needed (currently needed for TH1, TTree, TEntryList, TEventList)
227  DetachRes(res);
228 
229  //update the number of processed entries
230  fProcessedEntries += finish - start;
231 
//the first result is stored as is; later results are merged with the stored one
232  if(fCanReduce) {
233  PoolUtils::ReduceObjects<TObject *> redfunc;
234  fReducedResult = static_cast<decltype(fReducedResult)>(redfunc({res, fReducedResult})); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
235  } else {
236  fCanReduce = true;
237  fReducedResult = res;
238  }
239 
// NOTE(review): listing lines 242 and 245 (the statements executed by the two branches
// below) are missing from this view; only the branch comments are visible.
240  if(fMaxNEntries == fProcessedEntries)
241  //we are done forever
243  else
244  //we are done for now
246 }
247 
// Computes this worker's share of maxEntries: every worker whose number is
// below fNWorkers-1 gets maxEntries/fNWorkers, the remaining worker gets what
// is left after the others have taken their share (i.e. it absorbs the remainder).
// NOTE(review): the listing jumps from line 248 to line 250, so the line carrying the
// function signature is not visible in this view.
248 template<class F>
250 {
251  //e.g.: when dividing 10 entries between 3 workers, the first
252  //two will process 10/3 == 3 entries, the last one will process
253  //10 - 2*(10/3) == 4 entries.
254  if(GetNWorker() < fNWorkers-1)
255  return maxEntries/fNWorkers;
256  else
257  return maxEntries - (fNWorkers-1)*(maxEntries/fNWorkers);
258 }
259 
260 #endif
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
std::result_of< F(std::reference_wrapper< TTreeReader >)>::type fReducedResult
the results of the executions of fProcFunc merged together
bool fCanReduce
true if fReducedResult can be reduced with a new result, false until we have produced one result ...
long long Long64_t
Definition: RtypesCore.h:69
Tell a TPoolProcessor to process the tree that was passed to it at construction time.
Definition: PoolUtils.h:40
TTreeReader is a simple, robust and fast interface to read values from a TTree, TChain or TNtuple...
Definition: TTreeReader.h:48
Tell a TPoolProcessor which tree and entries range to process. The object sent is a TreeRangeInfo...
Definition: PoolUtils.h:39
TFile * OpenFile(const std::string &fileName)
Handle file opening.
Definition: TMPWorker.cxx:185
virtual void SetDirectory(TDirectory *dir)
By default, when a histogram is created, it is added to the list of histogram objects in the current ...
Definition: TH1.cxx:8051
double T(double x)
Definition: ChebyshevPol.h:34
This class works in conjunction with TMPClient, reacting to messages received from it as specified by ...
Definition: TMPWorker.h:30
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
Error message.
Definition: MPCode.h:30
ULong64_t fProcessedEntries
the number of entries processed by this worker so far
Definition: TMPWorker.h:57
Tell a TPoolProcessor which tree to process. The object sent is a TreeInfo.
Definition: PoolUtils.h:38
We are ready for the next task.
Definition: PoolUtils.h:35
virtual void SetDirectory(TDirectory *dir)
Add reference to directory dir. dir can be 0.
unsigned GetNWorker() const
Definition: TMPWorker.h:47
void Process(unsigned code, MPCodeBufPair &msg)
TTree * fTree
pointer to the tree to be processed. It is only used if the tree is directly passed to TProcessExecut...
Definition: TMPWorker.h:53
ULong64_t EvalMaxEntries(ULong64_t maxEntries)
ULong64_t fMaxNEntries
the maximum number of entries to be processed by this worker
Definition: TMPWorker.h:56
void Error(const char *location, const char *msgfmt,...)
void HandleInput(MPCodeBufPair &msg)
Execute instructions received from a TPool client.
F fProcFunc
the function to be executed
void SetupTreeCache(TTree *tree)
Tree cache handling.
Definition: TMPWorker.cxx:236
EEntryStatus SetEntriesRange(Long64_t beginEntry, Long64_t endEntry)
Set the range of entries to be processed.
virtual void SetDirectory(TDirectory *dir)
Remove reference to this EventList from current directory and add reference to new directory dir...
Definition: TEventList.cxx:351
#define F(x, y, z)
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
unsigned fNWorkers
the number of workers spawned
Definition: TMPWorker.h:55
TFile * GetCurrentFile() const
Return pointer to the current file.
Definition: TTree.cxx:5053
TSocket * GetSocket()
Definition: TMPWorker.h:45
TPoolProcessor(F procFunc, const std::vector< std::string > &fileNames, const std::string &treeName, unsigned nWorkers, ULong64_t maxEntries)
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:33
The message contains the result of the processing of a TTree.
Definition: PoolUtils.h:42
Tell the client there was an error while processing.
Definition: PoolUtils.h:44
virtual void SetDirectory(TDirectory *dir)
Change the tree's directory.
Definition: TTree.cxx:8326
Ask for a kFuncResult/kProcResult.
Definition: PoolUtils.h:36
int type
Definition: TGX11.cxx:120
unsigned long long ULong64_t
Definition: RtypesCore.h:70
void DetachRes(T res)
The TH1 histogram class.
Definition: TH1.h:80
virtual Long64_t GetEntries() const
Definition: TTree.h:393
std::vector< std::string > fFileNames
the files to be processed by all workers
Definition: TMPWorker.h:51
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
Definition: TMPWorker.cxx:111
void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
Definition: tree.py:1
A TTree object has a header with a name and a title.
Definition: TTree.h:98
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:27
TTree * RetrieveTree(TFile *fp)
Retrieve a tree from an open file.
Definition: TMPWorker.cxx:204