Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TMPWorkerTree.h
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: G Ganis Jan 2017
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#ifndef ROOT_TMPWorkerTree
13#define ROOT_TMPWorkerTree
14
15#include "TMPWorker.h"
16#include "TFile.h"
17#include "TEntryList.h"
18#include "TEventList.h"
19#include "TH1.h"
20#include "TKey.h"
21#include "TSelector.h"
22#include "TTree.h"
23#include "TTreeCache.h"
24#include "TTreeReader.h"
25
26#include <memory> //unique_ptr
27#include <string>
28#include <sstream>
29#include <type_traits> //std::result_of
30#include <unistd.h> //pid_t
31#include <vector>
32
33class TMPWorkerTree : public TMPWorker {
34 /// \cond
35// ClassDef(TMPWorkerTree, 0);
36 /// \endcond
37public:
39 TMPWorkerTree(const std::vector<std::string> &fileNames, TEntryList *entries, const std::string &treeName,
40 UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry);
41 TMPWorkerTree(TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry);
42 virtual ~TMPWorkerTree();
43
44 // It doesn't make sense to copy a TMPWorker (each one has a uniq_ptr to its socket)
45 TMPWorkerTree(const TMPWorkerTree &) = delete;
47
48protected:
49
50 void CloseFile();
52 void HandleInput(MPCodeBufPair& msg); ///< Execute instructions received from a MP client
53 void Init(int fd, UInt_t workerN);
54 Int_t LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl,
55 std::string &errmsg);
56 TFile *OpenFile(const std::string& fileName);
57 virtual void Process(UInt_t, MPCodeBufPair &) {}
59 virtual void SendResult() { }
60 void Setup();
62
63 std::vector<std::string> fFileNames; ///< the files to be processed by all workers
64 std::string fTreeName; ///< the name of the tree to be processed
65 TTree *fTree; ///< pointer to the tree to be processed. It is only used if the tree is directly passed to TProcessExecutor::Process as argument
66 TFile *fFile; ///< last open file
67 TEntryList *fEntryList; ///< entrylist
68 ULong64_t fFirstEntry; ///< first entry to br processed
69
70private:
71
72 // TTree cache handling
73 TTreeCache *fTreeCache; ///< instance of the tree cache for the tree
74 Bool_t fTreeCacheIsLearning; ///< Whether cache is in learning phase
75 Bool_t fUseTreeCache; ///< Control usage of the tree cache
76 Long64_t fCacheSize; ///< Cache size
77};
78
79template<class F>
81public:
82 TMPWorkerTreeFunc(F procFunc, const std::vector<std::string> &fileNames, TEntryList *entries,
83 const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
84 : TMPWorkerTree(fileNames, entries, treeName, nWorkers, maxEntries, firstEntry), fProcFunc(procFunc),
86 {
87 }
88 TMPWorkerTreeFunc(F procFunc, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries,
89 ULong64_t firstEntry)
90 : TMPWorkerTree(tree, entries, nWorkers, maxEntries, firstEntry), fProcFunc(procFunc), fReducedResult(),
91 fCanReduce(false)
92 {
93 }
94 virtual ~TMPWorkerTreeFunc() {}
95
96private:
97 void Process(UInt_t code, MPCodeBufPair &msg);
98 void SendResult();
99
100 F fProcFunc; ///< copy the function to be executed
101 typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type fReducedResult; ///< the results of the executions of fProcFunc merged together
102 bool fCanReduce; ///< true if fReducedResult can be reduced with a new result, false until we have produced one result
103};
104
106public:
107 TMPWorkerTreeSel(TSelector &selector, const std::vector<std::string> &fileNames, TEntryList *entries,
108 const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
109 : TMPWorkerTree(fileNames, entries, treeName, nWorkers, maxEntries, firstEntry), fSelector(selector),
110 fCallBegin(true)
111 {
112 }
113 TMPWorkerTreeSel(TSelector &selector, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries,
114 ULong64_t firstEntry)
115 : TMPWorkerTree(tree, entries, nWorkers, maxEntries, firstEntry), fSelector(selector), fCallBegin(true)
116 {
117 }
118 virtual ~TMPWorkerTreeSel() {}
119
120private:
121 void Process(UInt_t code, MPCodeBufPair &msg);
122 void SendResult();
123
124 TSelector &fSelector; ///< pointer to the selector to be used to process the tree. It is null if we are not using a TSelector.
125 bool fCallBegin = true;
126};
127
128//////////////////////////////////////////////////////////////////////////
129/// Auxilliary templated functions
130/// If the user lambda returns a TH1F*, TTree*, TEventList*, we incur in the
131/// problem of that object being automatically owned by the current open file.
132/// For these three types, we call SetDirectory(nullptr) to detach the returned
133/// object from the file we are reading the TTree from.
134/// Note: the only sane case in which this should happen is when a TH1F* is
135/// returned.
136template <class T, typename std::enable_if<std::is_pointer<T>::value && std::is_constructible<TObject *, T>::value &&
137 !std::is_constructible<TCollection *, T>::value>::type * = nullptr>
138void DetachRes(T res)
139{
140 auto th1p = dynamic_cast<TH1*>(res);
141 if(th1p != nullptr) {
142 th1p->SetDirectory(nullptr);
143 return;
144 }
145 auto ttreep = dynamic_cast<TTree*>(res);
146 if(ttreep != nullptr) {
147 ttreep->SetDirectory(nullptr);
148 return;
149 }
150 auto tentrylist = dynamic_cast<TEntryList*>(res);
151 if(tentrylist != nullptr) {
152 tentrylist->SetDirectory(nullptr);
153 return;
154 }
155 auto teventlist = dynamic_cast<TEventList*>(res);
156 if(teventlist != nullptr) {
157 teventlist->SetDirectory(nullptr);
158 return;
159 }
160 return;
161}
162
163// Specialization for TCollections
164template <class T, typename std::enable_if<std::is_pointer<T>::value &&
165 std::is_constructible<TCollection *, T>::value>::type * = nullptr>
166void DetachRes(T res)
167{
168 if (res) {
169 TIter nxo(res);
170 TObject *obj = 0;
171 while ((obj = nxo())) {
172 DetachRes(obj);
173 }
174 }
175}
176
177//////////////////////////////////////////////////////////////////////////
178/// Generic function processing SendResult and Process overload
179
180template<class F>
182{
183 //send back result
184 MPSend(GetSocket(), MPCode::kProcResult, fReducedResult);
185}
186
187template <class F>
189{
190
191 Long64_t start = 0;
192 Long64_t finish = 0;
193 TEntryList *enl = 0;
194 std::string reply, errmsg, sn = "[S" + std::to_string(GetNWorker()) + "]: ";
195 if (LoadTree(code, msg, start, finish, &enl, errmsg) != 0) {
196 reply = sn + errmsg;
197 MPSend(GetSocket(), MPCode::kProcError, reply.c_str());
198 return;
199 }
200
201 // create a TTreeReader that reads this range of entries
202 TTreeReader reader(fTree, enl);
203
204 TTreeReader::EEntryStatus status = reader.SetEntriesRange(start, finish);
205 if(status != TTreeReader::kEntryValid) {
206 reply = sn + "could not set TTreeReader to range " + std::to_string(start) + " " + std::to_string(finish - 1);
207 MPSend(GetSocket(), MPCode::kProcError, reply.c_str());
208 return;
209 }
210
211 //execute function
212 auto res = fProcFunc(reader);
213
214 //detach result from file if needed (currently needed for TH1, TTree, TEventList)
215 DetachRes(res);
216
217 //update the number of processed entries
218 fProcessedEntries += finish - start;
219
220 if(fCanReduce) {
222 fReducedResult = static_cast<decltype(fReducedResult)>(redfunc({res, fReducedResult})); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
223 } else {
224 fCanReduce = true;
225 fReducedResult = res;
226 }
227
228 if(fMaxNEntries == fProcessedEntries)
229 //we are done forever
230 MPSend(GetSocket(), MPCode::kProcResult, fReducedResult);
231 else
232 //we are done for now
233 MPSend(GetSocket(), MPCode::kIdling);
234}
235
236#endif
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition MPSendRecv.h:32
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
unsigned int UInt_t
Definition RtypesCore.h:46
long long Long64_t
Definition RtypesCore.h:73
unsigned long long ULong64_t
Definition RtypesCore.h:74
int type
Definition TGX11.cxx:121
void DetachRes(T res)
Auxiliary templated functions If the user lambda returns a TH1F*, TTree*, TEventList*,...
Merge collection of TObjects.
Definition PoolUtils.h:35
A List of entry numbers in a TTree or TChain.
Definition TEntryList.h:26
virtual void SetDirectory(TDirectory *dir)
Add reference to directory dir. dir can be 0.
A TEventList object is a list of selected events (entries) in a TTree.
Definition TEventList.h:31
virtual void SetDirectory(TDirectory *dir)
Remove reference to this EventList from current directory and add reference to new directory dir.
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
Definition TFile.h:54
TH1 is the base class of all histogram classes in ROOT.
Definition TH1.h:58
virtual void SetDirectory(TDirectory *dir)
By default when an histogram is created, it is added to the list of histogram objects in the current ...
Definition TH1.cxx:8777
Templated derivation of TMPWorkerTree handling generic function tree processing.
virtual ~TMPWorkerTreeFunc()
TMPWorkerTreeFunc(F procFunc, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
bool fCanReduce
true if fReducedResult can be reduced with a new result, false until we have produced one result
std::result_of< F(std::reference_wrapper< TTreeReader >)>::type fReducedResult
the results of the executions of fProcFunc merged together
TMPWorkerTreeFunc(F procFunc, const std::vector< std::string > &fileNames, TEntryList *entries, const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
void SendResult()
Generic function processing SendResult and Process overload.
void Process(UInt_t code, MPCodeBufPair &msg)
F fProcFunc
copy the function to be executed
Templated derivation of TMPWorkerTree handling selector tree processing.
void SendResult()
Selector processing SendResult and Process overload.
virtual ~TMPWorkerTreeSel()
TSelector & fSelector
reference to the selector to be used to process the tree.
void Process(UInt_t code, MPCodeBufPair &msg)
Selector specialization.
TMPWorkerTreeSel(TSelector &selector, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
TMPWorkerTreeSel(TSelector &selector, const std::vector< std::string > &fileNames, TEntryList *entries, const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
This class works in conjunction with TTreeProcessorMP, reacting to messages received from it as specif...
Int_t LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl, std::string &errmsg)
Load the required tree and evaluate the processing range.
TMPWorkerTree(const TMPWorkerTree &)=delete
TTree * fTree
pointer to the tree to be processed. It is only used if the tree is directly passed to TProcessExecut...
TFile * fFile
last open file
void Setup()
Auxiliary method for common initializations.
ULong64_t fFirstEntry
first entry to be processed
ULong64_t EvalMaxEntries(ULong64_t maxEntries)
Max entries evaluation.
TTree * RetrieveTree(TFile *fp)
Retrieve a tree from an open file.
TEntryList * fEntryList
entrylist
TMPWorkerTree()
Class constructors.
TMPWorkerTree & operator=(const TMPWorkerTree &)=delete
Bool_t fUseTreeCache
Control usage of the tree cache.
std::vector< std::string > fFileNames
the files to be processed by all workers
Bool_t fTreeCacheIsLearning
Whether cache is in learning phase.
TFile * OpenFile(const std::string &fileName)
Handle file opening.
virtual void SendResult()
void Init(int fd, UInt_t workerN)
Init overload defining max entries.
virtual void Process(UInt_t, MPCodeBufPair &)
virtual ~TMPWorkerTree()
Long64_t fCacheSize
Cache size.
std::string fTreeName
the name of the tree to be processed
void CloseFile()
Handle file closing.
void SetupTreeCache(TTree *tree)
Tree cache handling.
TTreeCache * fTreeCache
instance of the tree cache for the tree
void HandleInput(MPCodeBufPair &msg)
Execute instructions received from a MP client.
This class works in conjunction with TMPClient, reacting to messages received from it as specified by ...
Definition TMPWorker.h:26
Mother of all ROOT objects.
Definition TObject.h:37
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
Definition TSelector.h:31
A cache to speed-up the reading of ROOT datasets.
Definition TTreeCache.h:32
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:44
@ kEntryValid
data read okay
EEntryStatus SetEntriesRange(Long64_t beginEntry, Long64_t endEntry)
Set the begin and end entry numbers.
A TTree represents a columnar dataset.
Definition TTree.h:79
virtual void SetDirectory(TDirectory *dir)
Change the tree's directory.
Definition TTree.cxx:8925
#define F(x, y, z)
@ kIdling
We are ready for the next task.
Definition MPCode.h:35
@ kProcError
Tell the client there was an error while processing.
Definition MPCode.h:44
@ kProcResult
The message contains the result of the processing of a TTree.
Definition MPCode.h:42
Definition tree.py:1