Logo ROOT  
Reference Guide
TMPWorkerTree.h
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: G Ganis Jan 2017
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#ifndef ROOT_TMPWorkerTree
13#define ROOT_TMPWorkerTree
14
15#include "TMPWorker.h"
16#include "TFile.h"
17#include "TEntryList.h"
18#include "TEventList.h"
19#include "TH1.h"
20#include "TKey.h"
21#include "TSelector.h"
22#include "TTree.h"
23#include "TTreeCache.h"
24#include "TTreeReader.h"
25
26#include <memory> //unique_ptr
27#include <string>
28#include <sstream>
29#include <type_traits> //std::result_of
30#include <unistd.h> //pid_t
31
32class TMPWorkerTree : public TMPWorker {
33 /// \cond
34// ClassDef(TMPWorkerTree, 0);
35 /// \endcond
36public:
38 TMPWorkerTree(const std::vector<std::string> &fileNames, TEntryList *entries, const std::string &treeName,
39 UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry);
40 TMPWorkerTree(TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry);
41 virtual ~TMPWorkerTree();
42
43 // It doesn't make sense to copy a TMPWorker (each one has a uniq_ptr to its socket)
44 TMPWorkerTree(const TMPWorkerTree &) = delete;
46
47protected:
48
49 void CloseFile();
51 void HandleInput(MPCodeBufPair& msg); ///< Execute instructions received from a MP client
52 void Init(int fd, UInt_t workerN);
53 Int_t LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl,
54 std::string &errmsg);
55 TFile *OpenFile(const std::string& fileName);
56 virtual void Process(UInt_t, MPCodeBufPair &) {}
58 virtual void SendResult() { }
59 void Setup();
61
62 std::vector<std::string> fFileNames; ///< the files to be processed by all workers
63 std::string fTreeName; ///< the name of the tree to be processed
64 TTree *fTree; ///< pointer to the tree to be processed. It is only used if the tree is directly passed to TProcessExecutor::Process as argument
65 TFile *fFile; ///< last open file
66 TEntryList *fEntryList; ///< entrylist
67 ULong64_t fFirstEntry; ///< first entry to br processed
68
69private:
70
71 // TTree cache handling
72 TTreeCache *fTreeCache; ///< instance of the tree cache for the tree
73 Bool_t fTreeCacheIsLearning; ///< Whether cache is in learning phase
74 Bool_t fUseTreeCache; ///< Control usage of the tree cache
75 Long64_t fCacheSize; ///< Cache size
76};
77
78template<class F>
80public:
81 TMPWorkerTreeFunc(F procFunc, const std::vector<std::string> &fileNames, TEntryList *entries,
82 const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
83 : TMPWorkerTree(fileNames, entries, treeName, nWorkers, maxEntries, firstEntry), fProcFunc(procFunc),
85 {
86 }
87 TMPWorkerTreeFunc(F procFunc, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries,
88 ULong64_t firstEntry)
89 : TMPWorkerTree(tree, entries, nWorkers, maxEntries, firstEntry), fProcFunc(procFunc), fReducedResult(),
90 fCanReduce(false)
91 {
92 }
93 virtual ~TMPWorkerTreeFunc() {}
94
95private:
96 void Process(UInt_t code, MPCodeBufPair &msg);
97 void SendResult();
98
99 F fProcFunc; ///< copy the function to be executed
100 typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type fReducedResult; ///< the results of the executions of fProcFunc merged together
101 bool fCanReduce; ///< true if fReducedResult can be reduced with a new result, false until we have produced one result
102};
103
105public:
106 TMPWorkerTreeSel(TSelector &selector, const std::vector<std::string> &fileNames, TEntryList *entries,
107 const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
108 : TMPWorkerTree(fileNames, entries, treeName, nWorkers, maxEntries, firstEntry), fSelector(selector),
109 fCallBegin(true)
110 {
111 }
112 TMPWorkerTreeSel(TSelector &selector, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries,
113 ULong64_t firstEntry)
114 : TMPWorkerTree(tree, entries, nWorkers, maxEntries, firstEntry), fSelector(selector), fCallBegin(true)
115 {
116 }
117 virtual ~TMPWorkerTreeSel() {}
118
119private:
120 void Process(UInt_t code, MPCodeBufPair &msg);
121 void SendResult();
122
123 TSelector &fSelector; ///< pointer to the selector to be used to process the tree. It is null if we are not using a TSelector.
124 bool fCallBegin = true;
125};
126
127//////////////////////////////////////////////////////////////////////////
128/// Auxilliary templated functions
129/// If the user lambda returns a TH1F*, TTree*, TEventList*, we incur in the
130/// problem of that object being automatically owned by the current open file.
131/// For these three types, we call SetDirectory(nullptr) to detach the returned
132/// object from the file we are reading the TTree from.
133/// Note: the only sane case in which this should happen is when a TH1F* is
134/// returned.
135template <class T, typename std::enable_if<std::is_pointer<T>::value && std::is_constructible<TObject *, T>::value &&
136 !std::is_constructible<TCollection *, T>::value>::type * = nullptr>
137void DetachRes(T res)
138{
139 auto th1p = dynamic_cast<TH1*>(res);
140 if(th1p != nullptr) {
141 th1p->SetDirectory(nullptr);
142 return;
143 }
144 auto ttreep = dynamic_cast<TTree*>(res);
145 if(ttreep != nullptr) {
146 ttreep->SetDirectory(nullptr);
147 return;
148 }
149 auto tentrylist = dynamic_cast<TEntryList*>(res);
150 if(tentrylist != nullptr) {
151 tentrylist->SetDirectory(nullptr);
152 return;
153 }
154 auto teventlist = dynamic_cast<TEventList*>(res);
155 if(teventlist != nullptr) {
156 teventlist->SetDirectory(nullptr);
157 return;
158 }
159 return;
160}
161
162// Specialization for TCollections
163template <class T, typename std::enable_if<std::is_pointer<T>::value &&
164 std::is_constructible<TCollection *, T>::value>::type * = nullptr>
165void DetachRes(T res)
166{
167 if (res) {
168 TIter nxo(res);
169 TObject *obj = 0;
170 while ((obj = nxo())) {
171 DetachRes(obj);
172 }
173 }
174}
175
176//////////////////////////////////////////////////////////////////////////
177/// Generic function processing SendResult and Process overload
178
179template<class F>
181{
182 //send back result
183 MPSend(GetSocket(), MPCode::kProcResult, fReducedResult);
184}
185
186template <class F>
188{
189
190 Long64_t start = 0;
191 Long64_t finish = 0;
192 TEntryList *enl = 0;
193 std::string reply, errmsg, sn = "[S" + std::to_string(GetNWorker()) + "]: ";
194 if (LoadTree(code, msg, start, finish, &enl, errmsg) != 0) {
195 reply = sn + errmsg;
196 MPSend(GetSocket(), MPCode::kProcError, reply.c_str());
197 return;
198 }
199
200 // create a TTreeReader that reads this range of entries
201 TTreeReader reader(fTree, enl);
202
203 TTreeReader::EEntryStatus status = reader.SetEntriesRange(start, finish);
204 if(status != TTreeReader::kEntryValid) {
205 reply = sn + "could not set TTreeReader to range " + std::to_string(start) + " " + std::to_string(finish - 1);
206 MPSend(GetSocket(), MPCode::kProcError, reply.c_str());
207 return;
208 }
209
210 //execute function
211 auto res = fProcFunc(reader);
212
213 //detach result from file if needed (currently needed for TH1, TTree, TEventList)
214 DetachRes(res);
215
216 //update the number of processed entries
217 fProcessedEntries += finish - start;
218
219 if(fCanReduce) {
221 fReducedResult = static_cast<decltype(fReducedResult)>(redfunc({res, fReducedResult})); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
222 } else {
223 fCanReduce = true;
224 fReducedResult = res;
225 }
226
227 if(fMaxNEntries == fProcessedEntries)
228 //we are done forever
229 MPSend(GetSocket(), MPCode::kProcResult, fReducedResult);
230 else
231 //we are done for now
232 MPSend(GetSocket(), MPCode::kIdling);
233}
234
235#endif
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
unsigned int UInt_t
Definition: RtypesCore.h:44
long long Long64_t
Definition: RtypesCore.h:71
unsigned long long ULong64_t
Definition: RtypesCore.h:72
int type
Definition: TGX11.cxx:120
void DetachRes(T res)
Auxiliary templated functions. If the user lambda returns a TH1F*, TTree*, TEventList*,...
Merge collection of TObjects.
Definition: PoolUtils.h:35
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:26
virtual void SetDirectory(TDirectory *dir)
Add reference to directory dir. dir can be 0.
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:31
virtual void SetDirectory(TDirectory *dir)
Remove reference to this EventList from current directory and add reference to new directory dir.
Definition: TEventList.cxx:352
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
Definition: TFile.h:53
The TH1 histogram class.
Definition: TH1.h:56
virtual void SetDirectory(TDirectory *dir)
By default when an histogram is created, it is added to the list of histogram objects in the current ...
Definition: TH1.cxx:8393
Templated derivation of TMPWorkerTree handling generic function tree processing.
Definition: TMPWorkerTree.h:79
virtual ~TMPWorkerTreeFunc()
Definition: TMPWorkerTree.h:93
TMPWorkerTreeFunc(F procFunc, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
Definition: TMPWorkerTree.h:87
bool fCanReduce
true if fReducedResult can be reduced with a new result, false until we have produced one result
std::result_of< F(std::reference_wrapper< TTreeReader >)>::type fReducedResult
the results of the executions of fProcFunc merged together
TMPWorkerTreeFunc(F procFunc, const std::vector< std::string > &fileNames, TEntryList *entries, const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
Definition: TMPWorkerTree.h:81
void SendResult()
Generic function processing SendResult and Process overload.
void Process(UInt_t code, MPCodeBufPair &msg)
F fProcFunc
copy the function to be executed
Definition: TMPWorkerTree.h:99
Templated derivation of TMPWorkerTree handling selector tree processing.
void SendResult()
Selector processing SendResult and Process overload.
virtual ~TMPWorkerTreeSel()
TSelector & fSelector
pointer to the selector to be used to process the tree. It is null if we are not using a TSelector.
void Process(UInt_t code, MPCodeBufPair &msg)
Selector specialization.
TMPWorkerTreeSel(TSelector &selector, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
TMPWorkerTreeSel(TSelector &selector, const std::vector< std::string > &fileNames, TEntryList *entries, const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
This class works in conjunction with TTreeProcessorMP, reacting to messages received from it as specif...
Definition: TMPWorkerTree.h:32
Int_t LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl, std::string &errmsg)
Load the required tree and evaluate the processing range.
TMPWorkerTree(const TMPWorkerTree &)=delete
TTree * fTree
pointer to the tree to be processed. It is only used if the tree is directly passed to TProcessExecut...
Definition: TMPWorkerTree.h:64
TFile * fFile
last open file
Definition: TMPWorkerTree.h:65
void Setup()
Auxiliary method for common initializations.
ULong64_t fFirstEntry
first entry to be processed
Definition: TMPWorkerTree.h:67
ULong64_t EvalMaxEntries(ULong64_t maxEntries)
Max entries evaluation.
TTree * RetrieveTree(TFile *fp)
Retrieve a tree from an open file.
TEntryList * fEntryList
entrylist
Definition: TMPWorkerTree.h:66
TMPWorkerTree()
Class constructors.
TMPWorkerTree & operator=(const TMPWorkerTree &)=delete
Bool_t fUseTreeCache
Control usage of the tree cache.
Definition: TMPWorkerTree.h:74
std::vector< std::string > fFileNames
the files to be processed by all workers
Definition: TMPWorkerTree.h:62
Bool_t fTreeCacheIsLearning
Whether cache is in learning phase.
Definition: TMPWorkerTree.h:73
TFile * OpenFile(const std::string &fileName)
Handle file opening.
virtual void SendResult()
Definition: TMPWorkerTree.h:58
void Init(int fd, UInt_t workerN)
Init overload defining max entries.
virtual void Process(UInt_t, MPCodeBufPair &)
Definition: TMPWorkerTree.h:56
virtual ~TMPWorkerTree()
Long64_t fCacheSize
Cache size.
Definition: TMPWorkerTree.h:75
std::string fTreeName
the name of the tree to be processed
Definition: TMPWorkerTree.h:63
void CloseFile()
Handle file closing.
void SetupTreeCache(TTree *tree)
Tree cache handling.
TTreeCache * fTreeCache
instance of the tree cache for the tree
Definition: TMPWorkerTree.h:72
void HandleInput(MPCodeBufPair &msg)
Execute instructions received from a MP client.
This class works in conjunction with TMPClient, reacting to messages received from it as specified by ...
Definition: TMPWorker.h:26
Mother of all ROOT objects.
Definition: TObject.h:37
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
Definition: TSelector.h:33
A cache to speed-up the reading of ROOT datasets.
Definition: TTreeCache.h:35
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition: TTreeReader.h:43
@ kEntryValid
data read okay
Definition: TTreeReader.h:125
EEntryStatus SetEntriesRange(Long64_t beginEntry, Long64_t endEntry)
Set the begin and end entry numbers.
A TTree represents a columnar dataset.
Definition: TTree.h:78
virtual void SetDirectory(TDirectory *dir)
Change the tree's directory.
Definition: TTree.cxx:8813
#define F(x, y, z)
@ kIdling
We are ready for the next task.
Definition: MPCode.h:35
@ kProcError
Tell the client there was an error while processing.
Definition: MPCode.h:44
@ kProcResult
The message contains the result of the processing of a TTree.
Definition: MPCode.h:42
double T(double x)
Definition: ChebyshevPol.h:34
Definition: tree.py:1