Logo ROOT  
Reference Guide
RLoopManager.hxx
Go to the documentation of this file.
1// Author: Enrico Guiraud, Danilo Piparo CERN 03/2017
2
3/*************************************************************************
4 * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. *
5 * All rights reserved. *
6 * *
7 * For the licensing terms see $ROOTSYS/LICENSE. *
8 * For the list of contributors see $ROOTSYS/README/CREDITS. *
9 *************************************************************************/
10
11#ifndef ROOT_RLOOPMANAGER
12#define ROOT_RLOOPMANAGER
13
17
18#include <functional>
19#include <map>
20#include <memory>
21#include <string>
22#include <vector>
23
24// forward declarations
25class TTree;
26class TTreeReader;
27class TDirectory;
28
29namespace ROOT {
30namespace RDF {
31class RCutFlowReport;
32class RDataSource;
33} // ns RDF
34
35namespace Internal {
36namespace RDF {
37std::vector<std::string> GetBranchNames(TTree &t, bool allowDuplicates = true);
38
39class RActionBase;
40class GraphNode;
41
42namespace GraphDrawing {
44} // ns GraphDrawing
45
46using Callback_t = std::function<void(unsigned int)>;
47
48class RCallback { // Calls fFun(slot) once every fEveryN invocations, counting invocations separately for each slot (fFun/fEveryN declarations, listing lines 49-50, are missing from this extract)
51 std::vector<ULong64_t> fCounters; ///< Per-slot number of invocations since fFun last fired; nSlots entries, all starting at 0
52
53public:
54 RCallback(ULong64_t everyN, Callback_t &&f, unsigned int nSlots) // everyN: firing period; f: callback receiving the slot index; nSlots: number of slots (one counter each)
55 : fFun(std::move(f)), fEveryN(everyN), fCounters(nSlots, 0ull)
56 {
57 }
58
59 void operator()(unsigned int slot) // Count one invocation for `slot`; every fEveryN-th one resets that slot's counter and calls fFun(slot)
60 {
61 auto &c = fCounters[slot]; // unchecked access: slot must be < nSlots
62 ++c;
63 if (c == fEveryN) { // NOTE(review): c is incremented before this test, so with everyN == 0 it only matches after the 64-bit counter wraps, i.e. effectively never
64 c = 0ull;
65 fFun(slot); // the counter is reset before fFun runs
66 }
67 }
68};
69
72 std::vector<int> fHasBeenCalled; // One flag per slot (1 = fFun already ran for that slot). std::vector<bool> is thread-unsafe for our purposes (and generally evil): its packed bits mean writes for different slots can touch the same word. (Class header and fFun declaration, listing lines 70-71, are missing from this extract.)
73
74public:
75 ROneTimeCallback(Callback_t &&f, unsigned int nSlots) : fFun(std::move(f)), fHasBeenCalled(nSlots, 0) {} // all nSlots flags start at 0 (not yet called)
76
77 void operator()(unsigned int slot) // Call fFun(slot) the first time this is invoked for `slot`; later invocations for that slot are no-ops
78 {
79 if (fHasBeenCalled[slot] == 1) // unchecked access: slot must be < nSlots
80 return;
81 fFun(slot);
82 fHasBeenCalled[slot] = 1; // set only after fFun returns, so if fFun throws, the next invocation for this slot calls it again
83 }
84};
85
86} // ns RDF
87} // ns Internal
88
89namespace Detail {
90namespace RDF {
92
93class RFilterBase;
94class RRangeBase;
96
97/// The head node of a RDF computation graph.
98/// This class is responsible for running the event loop.
99class RLoopManager : public RNodeBase {
100 using ColumnNames_t = std::vector<std::string>;
102
103 friend struct RCallCleanUpTask;
104
105 std::vector<RDFInternal::RActionBase *> fBookedActions; ///< Non-owning pointers to actions to be run
106 std::vector<RDFInternal::RActionBase *> fRunActions; ///< Non-owning pointers to actions already run
107 std::vector<RFilterBase *> fBookedFilters; ///< Booked filters (raw pointers; NOTE(review): presumably non-owning like fBookedActions - confirm)
108 std::vector<RFilterBase *> fBookedNamedFilters; ///< Contains a subset of fBookedFilters, i.e. only the named filters
109 std::vector<RRangeBase *> fBookedRanges; ///< Booked ranges (raw pointers; NOTE(review): presumably non-owning like fBookedActions - confirm)
110
111 /// Shared pointer to the input TTree. It does not delete the pointee if the TTree/TChain was passed directly as an
112 /// argument to RDataFrame's ctor (in which case we let users retain ownership).
113 std::shared_ptr<TTree> fTree{nullptr};
116 const unsigned int fNSlots{1}; ///< Number of processing slots, returned by GetNSlots()
118 const ELoopType fLoopType; ///< The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
119 const std::unique_ptr<RDataSource> fDataSource; ///< Owning pointer to a data-source object. Null if no data-source
120 std::map<std::string, std::string> fAliasColumnNameMap; ///< ColumnNameAlias-columnName pairs
121 std::vector<RDFInternal::RCallback> fCallbacks; ///< Registered callbacks
122 /// Registered callbacks to invoke just once before running the loop
123 std::vector<RDFInternal::ROneTimeCallback> fCallbacksOnce;
124 /// Registered callbacks to call at the beginning of each "data block"
125 std::vector<ROOT::RDF::SampleCallback_t> fSampleCallbacks;
127 std::vector<ROOT::RDF::RSampleInfo> fSampleInfos; ///< NOTE(review): presumably the current sample info per slot, updated by UpdateSampleInfo() - confirm
128 unsigned int fNRuns{0}; ///< Number of event loops run
129
130 /// Registry of per-slot value pointers for booked data-source columns
131 std::map<std::string, std::vector<void *>> fDSValuePtrMap;
132
133 /// Cache of the tree/chain branch names. Never access directly, always use GetBranchNames(). (The declaration this documents, `ColumnNames_t fValidBranchNames`, is missing from this extract.)
135
137 void RunEmptySourceMT(); ///< Run event loop with no source files, in parallel
138 void RunEmptySource(); ///< Run event loop with no source files, in sequence
139 void RunTreeProcessorMT(); ///< Run event loop over one or multiple ROOT files, in parallel
140 void RunTreeReader(); ///< Run event loop over one or multiple ROOT files, in sequence
141 void RunDataSourceMT(); ///< Run event loop over data accessed through a DataSource, in parallel
142 void RunDataSource(); ///< Run event loop over data accessed through a DataSource, in sequence
143 void RunAndCheckFilters(unsigned int slot, Long64_t entry); ///< Execute actions and make sure named filters are called for each event
144 void InitNodeSlots(TTreeReader *r, unsigned int slot); ///< Build TTreeReaderValues for all nodes
145 void InitNodes(); ///< Initialize all nodes of the functional graph before running the event loop
146 void CleanUpNodes(); ///< Perform clean-up operations. To be called at the end of each event loop
147 void CleanUpTask(TTreeReader *r, unsigned int slot); ///< Perform clean-up operations. To be called at the end of each task execution
148 void EvalChildrenCounts(); ///< Trigger counting of number of children nodes for each node of the functional graph
149 void SetupSampleCallbacks(TTreeReader *r, unsigned int slot);
150 void UpdateSampleInfo(unsigned int slot, const std::pair<ULong64_t, ULong64_t> &range);
151 void UpdateSampleInfo(unsigned int slot, TTreeReader &r);
152
153public:
154 RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches);
155 RLoopManager(ULong64_t nEmptyEntries); // NOTE(review): not `explicit`, so a ULong64_t converts implicitly to RLoopManager
156 RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches);
157 RLoopManager(const RLoopManager &) = delete;
159
161 void Jit(); ///< Add RDF nodes that require just-in-time compilation to the computation graph
162 RLoopManager *GetLoopManagerUnchecked() final { return this; }
163 void Run(); ///< Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source
165 TTree *GetTree() const;
168 RDataSource *GetDataSource() const { return fDataSource.get(); }
169 void Book(RDFInternal::RActionBase *actionPtr);
170 void Deregister(RDFInternal::RActionBase *actionPtr);
171 void Book(RFilterBase *filterPtr);
172 void Deregister(RFilterBase *filterPtr);
173 void Book(RRangeBase *rangePtr);
174 void Deregister(RRangeBase *rangePtr);
175 bool CheckFilters(unsigned int, Long64_t) final;
176 unsigned int GetNSlots() const { return fNSlots; }
177 void Report(ROOT::RDF::RCutFlowReport &rep) const final; ///< Call FillReport on all booked filters
178 /// End of recursive chain of calls, does nothing. (Documents PartialReport(), declared on listing line 179, which is missing from this extract.)
180 void SetTree(const std::shared_ptr<TTree> &tree) { fTree = tree; }
181 void IncrChildrenCount() final { ++fNChildren; }
182 void StopProcessing() final { ++fNStopsReceived; }
183 void ToJitExec(const std::string &) const;
184 void AddColumnAlias(const std::string &alias, const std::string &colName) { fAliasColumnNameMap[alias] = colName; }
185 const std::map<std::string, std::string> &GetAliasMap() const { return fAliasColumnNameMap; }
186 void RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f);
187 unsigned int GetNRuns() const { return fNRuns; }
188 bool HasDSValuePtrs(const std::string &col) const;
189 const std::map<std::string, std::vector<void *>> &GetDSValuePtrs() const { return fDSValuePtrMap; }
190 void AddDSValuePtrs(const std::string &col, const std::vector<void *> ptrs); // NOTE(review): `ptrs` is a by-value `const` vector, so it is always copied and the callee cannot move from it; consider `std::vector<void *> ptrs` or `const std::vector<void *> &ptrs`
191
192 /// End of recursive chain of calls, does nothing
193 void AddFilterName(std::vector<std::string> &) {}
194 /// For each booked filter, returns either the name or "Unnamed Filter"
195 std::vector<std::string> GetFiltersNames();
196
197 /// Return all graph edges known to RLoopManager
198 /// This includes Filters and Ranges but not Defines.
199 std::vector<RNodeBase *> GetGraphEdges() const;
200
201 /// Return all actions, either booked or already run
202 std::vector<RDFInternal::RActionBase *> GetAllActions() const;
203
204 std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode> GetGraph();
205
207
209};
210
211} // ns RDF
212} // ns Detail
213} // ns ROOT
214
215#endif
typedef void(GLAPIENTRYP _GLUfuncptr)(void)
ROOT::R::TRInterface & r
Definition: Object.C:4
#define f(i)
Definition: RSha256.hxx:104
#define c(i)
Definition: RSha256.hxx:101
long long Long64_t
Definition: RtypesCore.h:80
unsigned long long ULong64_t
Definition: RtypesCore.h:81
The head node of a RDF computation graph.
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
const std::map< std::string, std::string > & GetAliasMap() const
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
ULong64_t GetNEmptyEntries() const
RLoopManager & operator=(const RLoopManager &)=delete
void AddDSValuePtrs(const std::string &col, const std::vector< void * > ptrs)
const ColumnNames_t & GetBranchNames()
Return all valid TTree::Branch names (caching results for subsequent calls).
void ToJitExec(const std::string &) const
void AddSampleCallback(ROOT::RDF::SampleCallback_t &&callback)
unsigned int GetNRuns() const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
::TDirectory * GetDirectory() const
std::shared_ptr< TTree > fTree
Shared pointer to the input TTree.
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
void Run()
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
void AddFilterName(std::vector< std::string > &)
End of recursive chain of calls, does nothing.
std::vector< RRangeBase * > fBookedRanges
std::vector< ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::map< std::string, std::string > fAliasColumnNameMap
ColumnNameAlias-columnName pairs.
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
std::vector< RFilterBase * > fBookedFilters
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
std::vector< RDFInternal::RCallback > fCallbacks
Registered callbacks.
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph()
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
void AddColumnAlias(const std::string &alias, const std::string &colName)
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directly, always use GetBranchNames().
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
const std::map< std::string, std::vector< void * > > & GetDSValuePtrs() const
std::map< std::string, std::vector< void * > > fDSValuePtrMap
Registry of per-slot value pointers for booked data-source columns.
void SetTree(const std::shared_ptr< TTree > &tree)
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager. This includes Filters and Ranges but not Defines.
RDataSource * GetDataSource() const
unsigned int GetNSlots() const
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
void PartialReport(ROOT::RDF::RCutFlowReport &) const final
End of recursive chain of calls, does nothing.
bool HasDSValuePtrs(const std::string &col) const
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
RLoopManager(const RLoopManager &)=delete
const std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object. Null if no data-source.
RDFInternal::RNewSampleNotifier fNewSampleNotifier
const ColumnNames_t fDefaultColumns
void Book(RDFInternal::RActionBase *actionPtr)
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes. This method loops over all filters, actions and other booked ob...
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
void Deregister(RDFInternal::RActionBase *actionPtr)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
RLoopManager * GetLoopManagerUnchecked() final
Base class for non-leaf nodes of the computational graph.
Definition: RNodeBase.hxx:41
unsigned int fNStopsReceived
Number of times that a children node signaled to stop processing entries.
Definition: RNodeBase.hxx:45
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition: RNodeBase.hxx:44
Helper class that provides the operation graph nodes.
Class used to create the operation graph to be printed in the dot representation.
Definition: GraphNode.hxx:26
RCallback(ULong64_t everyN, Callback_t &&f, unsigned int nSlots)
std::vector< ULong64_t > fCounters
void operator()(unsigned int slot)
void operator()(unsigned int slot)
ROneTimeCallback(Callback_t &&f, unsigned int nSlots)
RDataSource defines an API that RDataFrame can use to read arbitrary data formats.
Describe directory structure in memory.
Definition: TDirectory.h:45
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition: TTreeReader.h:44
A TTree represents a columnar dataset.
Definition: TTree.h:79
std::vector< std::string > GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
std::function< void(unsigned int)> Callback_t
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with a RDataFrame computation graph via e....
Definition: RSampleInfo.hxx:84
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
Definition: RExports.h:150
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow forward-declaring tb...
Definition: tree.py:1
A RAII object that calls RLoopManager::CleanUpTask at destruction.