Logo ROOT   6.10/09
Reference Guide
TMPWorkerTree.h
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: G Ganis Jan 2017
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TMPWorkerTree
13 #define ROOT_TMPWorkerTree
14 
15 #include "TMPWorker.h"
16 #include "TFile.h"
17 #include "TEntryList.h"
18 #include "TEventList.h"
19 #include "TH1.h"
20 #include "TKey.h"
21 #include "TSelector.h"
22 #include "TTree.h"
23 #include "TTreeCache.h"
24 #include "TTreeReader.h"
25 
26 #include <memory> //unique_ptr
27 #include <string>
28 #include <sstream>
29 #include <type_traits> //std::result_of
30 #include <unistd.h> //pid_t
31 
32 class TMPWorkerTree : public TMPWorker {
33  /// \cond
34 // ClassDef(TMPWorkerTree, 0);
35  /// \endcond
36 public:
37  TMPWorkerTree();
38  TMPWorkerTree(const std::vector<std::string> &fileNames, TEntryList *entries, const std::string &treeName,
39  UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry);
40  TMPWorkerTree(TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry);
41  virtual ~TMPWorkerTree();
42 
43  // It doesn't make sense to copy a TMPWorker (each one has a uniq_ptr to its socket)
44  TMPWorkerTree(const TMPWorkerTree &) = delete;
45  TMPWorkerTree &operator=(const TMPWorkerTree &) = delete;
46 
47 protected:
48 
49  void CloseFile();
51  void HandleInput(MPCodeBufPair& msg); ///< Execute instructions received from a MP client
52  void Init(int fd, UInt_t workerN);
53  Int_t LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl,
54  std::string &errmsg);
55  TFile *OpenFile(const std::string& fileName);
56  virtual void Process(UInt_t, MPCodeBufPair &) {}
57  TTree *RetrieveTree(TFile *fp);
58  virtual void SendResult() { }
59  void Setup();
60  void SetupTreeCache(TTree *tree);
61 
62  std::vector<std::string> fFileNames; ///< the files to be processed by all workers
63  std::string fTreeName; ///< the name of the tree to be processed
64  TTree *fTree; ///< pointer to the tree to be processed. It is only used if the tree is directly passed to TProcessExecutor::Process as argument
65  TFile *fFile; ///< last open file
66  TEntryList *fEntryList; ///< entrylist
67  ULong64_t fFirstEntry; ///< first entry to br processed
68 
69 private:
70 
71  // TTree cache handling
72  TTreeCache *fTreeCache; ///< instance of the tree cache for the tree
73  Bool_t fTreeCacheIsLearning; ///< Whether cache is in learning phase
74  Bool_t fUseTreeCache; ///< Control usage of the tree cache
75  Long64_t fCacheSize; ///< Cache size
76 };
77 
78 template<class F>
80 public:
81  TMPWorkerTreeFunc(F procFunc, const std::vector<std::string> &fileNames, TEntryList *entries,
82  const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
83  : TMPWorkerTree(fileNames, entries, treeName, nWorkers, maxEntries, firstEntry), fProcFunc(procFunc),
84  fReducedResult(), fCanReduce(false)
85  {
86  }
87  TMPWorkerTreeFunc(F procFunc, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries,
88  ULong64_t firstEntry)
89  : TMPWorkerTree(tree, entries, nWorkers, maxEntries, firstEntry), fProcFunc(procFunc), fReducedResult(),
90  fCanReduce(false)
91  {
92  }
93  virtual ~TMPWorkerTreeFunc() {}
94 
95 private:
96  void Process(UInt_t code, MPCodeBufPair &msg);
97  void SendResult();
98 
99  F fProcFunc; ///< copy the function to be executed
100  typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type fReducedResult; ///< the results of the executions of fProcFunc merged together
101  bool fCanReduce; ///< true if fReducedResult can be reduced with a new result, false until we have produced one result
102 };
103 
105 public:
106  TMPWorkerTreeSel(TSelector &selector, const std::vector<std::string> &fileNames, TEntryList *entries,
107  const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
108  : TMPWorkerTree(fileNames, entries, treeName, nWorkers, maxEntries, firstEntry), fSelector(selector),
109  fCallBegin(true)
110  {
111  }
112  TMPWorkerTreeSel(TSelector &selector, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries,
113  ULong64_t firstEntry)
114  : TMPWorkerTree(tree, entries, nWorkers, maxEntries, firstEntry), fSelector(selector), fCallBegin(true)
115  {
116  }
117  virtual ~TMPWorkerTreeSel() {}
118 
119 private:
120  void Process(UInt_t code, MPCodeBufPair &msg);
121  void SendResult();
122 
123  TSelector &fSelector; ///< pointer to the selector to be used to process the tree. It is null if we are not using a TSelector.
124  bool fCallBegin = true;
125 };
126 
127 //////////////////////////////////////////////////////////////////////////
128 /// Auxilliary templated functions
129 /// If the user lambda returns a TH1F*, TTree*, TEventList*, we incur in the
130 /// problem of that object being automatically owned by the current open file.
131 /// For these three types, we call SetDirectory(nullptr) to detach the returned
132 /// object from the file we are reading the TTree from.
133 /// Note: the only sane case in which this should happen is when a TH1F* is
134 /// returned.
135 template <class T, typename std::enable_if<std::is_pointer<T>::value && std::is_constructible<TObject *, T>::value &&
136  !std::is_constructible<TCollection *, T>::value>::type * = nullptr>
137 void DetachRes(T res)
138 {
139  auto th1p = dynamic_cast<TH1*>(res);
140  if(th1p != nullptr) {
141  th1p->SetDirectory(nullptr);
142  return;
143  }
144  auto ttreep = dynamic_cast<TTree*>(res);
145  if(ttreep != nullptr) {
146  ttreep->SetDirectory(nullptr);
147  return;
148  }
149  auto tentrylist = dynamic_cast<TEntryList*>(res);
150  if(tentrylist != nullptr) {
151  tentrylist->SetDirectory(nullptr);
152  return;
153  }
154  auto teventlist = dynamic_cast<TEventList*>(res);
155  if(teventlist != nullptr) {
156  teventlist->SetDirectory(nullptr);
157  return;
158  }
159  return;
160 }
161 
162 // Specialization for TCollections
163 template <class T, typename std::enable_if<std::is_pointer<T>::value &&
164  std::is_constructible<TCollection *, T>::value>::type * = nullptr>
165 void DetachRes(T res)
166 {
167  if (res) {
168  TIter nxo(res);
169  TObject *obj = 0;
170  while ((obj = nxo())) {
171  DetachRes(obj);
172  }
173  }
174 }
175 
176 //////////////////////////////////////////////////////////////////////////
177 /// Generic function processing SendResult and Process overload
178 
179 template<class F>
181 {
182  //send back result
183  MPSend(GetSocket(), MPCode::kProcResult, fReducedResult);
184 }
185 
186 template <class F>
188 {
189 
190  Long64_t start = 0;
191  Long64_t finish = 0;
192  TEntryList *enl = 0;
193  std::string reply, errmsg, sn = "[S" + std::to_string(GetNWorker()) + "]: ";
194  if (LoadTree(code, msg, start, finish, &enl, errmsg) != 0) {
195  reply = sn + errmsg;
196  MPSend(GetSocket(), MPCode::kProcError, reply.c_str());
197  return;
198  }
199 
200  // create a TTreeReader that reads this range of entries
201  TTreeReader reader(fTree, enl);
202 
203  TTreeReader::EEntryStatus status = reader.SetEntriesRange(start, finish);
204  if(status != TTreeReader::kEntryValid) {
205  reply = sn + "could not set TTreeReader to range " + std::to_string(start) + " " + std::to_string(finish - 1);
206  MPSend(GetSocket(), MPCode::kProcError, reply.c_str());
207  return;
208  }
209 
210  //execute function
211  auto res = fProcFunc(reader);
212 
213  //detach result from file if needed (currently needed for TH1, TTree, TEventList)
214  DetachRes(res);
215 
216  //update the number of processed entries
217  fProcessedEntries += finish - start;
218 
219  if(fCanReduce) {
220  PoolUtils::ReduceObjects<TObject *> redfunc;
221  fReducedResult = static_cast<decltype(fReducedResult)>(redfunc({res, fReducedResult})); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
222  } else {
223  fCanReduce = true;
224  fReducedResult = res;
225  }
226 
228  //we are done forever
229  MPSend(GetSocket(), MPCode::kProcResult, fReducedResult);
230  else
231  //we are done for now
233 }
234 
235 #endif
void CloseFile()
Handle file closing.
TMPWorkerTreeSel(TSelector &selector, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
TMPWorkerTreeFunc(F procFunc, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
Definition: TMPWorkerTree.h:87
long long Long64_t
Definition: RtypesCore.h:69
void Setup()
Auxilliary method for common initializations.
TFile * OpenFile(const std::string &fileName)
Handle file opening.
TTreeReader is a simple, robust and fast interface to read values from a TTree, TChain or TNtuple...
Definition: TTreeReader.h:43
virtual void SetDirectory(TDirectory *dir)
By default when an histogram is created, it is added to the list of histogram objects in the current ...
Definition: TH1.cxx:8053
double T(double x)
Definition: ChebyshevPol.h:34
std::string fTreeName
the name of the tree to be processed
Definition: TMPWorkerTree.h:63
void DetachRes(T res)
Auxilliary templated functions If the user lambda returns a TH1F*, TTree*, TEventList*, we incur in the problem of that object being automatically owned by the current open file.
A specialized TFileCacheRead object for a TTree.
Definition: TTreeCache.h:30
TSelector & fSelector
pointer to the selector to be used to process the tree. It is null if we are not using a TSelector...
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
Definition: TFile.h:46
This class works in conjuction with TMPClient, reacting to messages received from it as specified by ...
Definition: TMPWorker.h:26
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
TMPWorkerTree()
Class constructors.
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
TMPWorkerTreeSel(TSelector &selector, const std::vector< std::string > &fileNames, TEntryList *entries, const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
ULong64_t fProcessedEntries
the number of entries processed by this worker so far
Definition: TMPWorker.h:51
TTree * fTree
pointer to the tree to be processed. It is only used if the tree is directly passed to TProcessExecut...
Definition: TMPWorkerTree.h:64
TTreeCache * fTreeCache
instance of the tree cache for the tree
Definition: TMPWorkerTree.h:72
Int_t LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl, std::string &errmsg)
Load the requierd tree and evaluate the processing range.
virtual void SetDirectory(TDirectory *dir)
Add reference to directory dir. dir can be 0.
This class works in conjuction with TTreeProcessorMP, reacting to messages received from it as specif...
Definition: TMPWorkerTree.h:32
unsigned GetNWorker() const
Definition: TMPWorker.h:45
bool fCanReduce
true if fReducedResult can be reduced with a new result, false until we have produced one result ...
TMPWorkerTree & operator=(const TMPWorkerTree &)=delete
virtual void SendResult()
Definition: TMPWorkerTree.h:58
virtual ~TMPWorkerTreeFunc()
Definition: TMPWorkerTree.h:93
TTree * RetrieveTree(TFile *fp)
Retrieve a tree from an open file.
void Process(UInt_t code, MPCodeBufPair &msg)
Bool_t fTreeCacheIsLearning
Whether cache is in learning phase.
Definition: TMPWorkerTree.h:73
The message contains the result of the processing of a TTree.
Definition: MPCode.h:42
ULong64_t fMaxNEntries
the maximum number of entries to be processed by this worker
Definition: TMPWorker.h:50
virtual void SetDirectory(TDirectory *dir)
Remove reference to this EventList from current directory and add reference to new directory dir...
Definition: TEventList.cxx:351
#define F(x, y, z)
ULong64_t EvalMaxEntries(ULong64_t maxEntries)
Max entries evaluation.
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
virtual void Process(UInt_t, MPCodeBufPair &)
Definition: TMPWorkerTree.h:56
void SetupTreeCache(TTree *tree)
Tree cache handling.
virtual ~TMPWorkerTreeSel()
unsigned int UInt_t
Definition: RtypesCore.h:42
TSocket * GetSocket()
Definition: TMPWorker.h:43
void Init(int fd, UInt_t workerN)
Init overload definign max entries.
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:31
TMPWorkerTreeFunc(F procFunc, const std::vector< std::string > &fileNames, TEntryList *entries, const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
Definition: TMPWorkerTree.h:81
virtual void SetDirectory(TDirectory *dir)
Change the tree&#39;s directory.
Definition: TTree.cxx:8439
please use SetEntriesRange()!")
Definition: TTreeReader.h:188
int type
Definition: TGX11.cxx:120
unsigned long long ULong64_t
Definition: RtypesCore.h:70
Bool_t fUseTreeCache
Control usage of the tree cache.
Definition: TMPWorkerTree.h:74
TEntryList * fEntryList
entrylist
Definition: TMPWorkerTree.h:66
The TH1 histogram class.
Definition: TH1.h:56
Templated derivation of TMPWorkerTree handlign generic function tree processing.
Definition: TMPWorkerTree.h:79
Long64_t fCacheSize
Cache size.
Definition: TMPWorkerTree.h:75
std::vector< std::string > fFileNames
the files to be processed by all workers
Definition: TMPWorkerTree.h:62
Templated derivation of TMPWorkerTree handlign selector tree processing.
Mother of all ROOT objects.
Definition: TObject.h:37
void HandleInput(MPCodeBufPair &msg)
Execute instructions received from a MP client.
F fProcFunc
copy the function to be executed
Definition: TMPWorkerTree.h:99
virtual ~TMPWorkerTree()
Definition: tree.py:1
A TTree object has a header with a name and a title.
Definition: TTree.h:78
void SendResult()
Generic function processing SendResult and Process overload.
Tell the client there was an error while processing.
Definition: MPCode.h:44
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
Definition: TSelector.h:33
ULong64_t fFirstEntry
first entry to br processed
Definition: TMPWorkerTree.h:67
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:25
We are ready for the next task.
Definition: MPCode.h:35
TFile * fFile
last open file
Definition: TMPWorkerTree.h:65