Logo ROOT   6.16/01
Reference Guide
RLoopManager.hxx
Go to the documentation of this file.
1// Author: Enrico Guiraud, Danilo Piparo CERN 03/2017
2
3/*************************************************************************
4 * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. *
5 * All rights reserved. *
6 * *
7 * For the licensing terms see $ROOTSYS/LICENSE. *
8 * For the list of contributors see $ROOTSYS/README/CREDITS. *
9 *************************************************************************/
10
11#ifndef ROOT_RLOOPMANAGER
12#define ROOT_RLOOPMANAGER
13
16
17#include <functional>
18#include <map>
19#include <memory>
20#include <string>
21#include <vector>
22
23// forward declarations
24class TTreeReader;
25
26namespace ROOT {
27namespace RDF {
28class RCutFlowReport;
29class RDataSource;
30} // ns RDF
31
32namespace Internal {
33namespace RDF {
34ColumnNames_t GetBranchNames(TTree &t, bool allowDuplicates = true);
35
36class RActionBase;
37class GraphNode;
38
39namespace GraphDrawing {
41} // ns GraphDrawing
42} // ns RDF
43} // ns Internal
44
45namespace Detail {
46namespace RDF {
47using namespace ROOT::TypeTraits;
49
50class RCustomColumnBase;
51class RFilterBase;
52class RRangeBase;
53
54/// The head node of a RDF computation graph.
55/// This class is responsible for running the event loop.
56class RLoopManager : public RNodeBase {
60 using Callback_t = std::function<void(unsigned int)>;
61 class TCallback {
64 std::vector<ULong64_t> fCounters;
65
66 public:
67 TCallback(ULong64_t everyN, Callback_t &&f, unsigned int nSlots)
68 : fFun(std::move(f)), fEveryN(everyN), fCounters(nSlots, 0ull)
69 {
70 }
71
/// Each call increments the counter belonging to `slot`; once that counter equals fEveryN
/// it is reset to zero and fFun(slot) is invoked, so the count starts over for that slot.
/// NOTE(review): the counter is incremented before the comparison, so with fEveryN == 0 the
/// test would only succeed after the counter wraps around — confirm callers never pass 0.
72 void operator()(unsigned int slot)
73 {
74 auto &c = fCounters[slot];
75 ++c;
76 if (c == fEveryN) {
77 c = 0ull;
78 fFun(slot);
79 }
80 }
81 };
82
85 std::vector<int> fHasBeenCalled; // std::vector<bool> is thread-unsafe for our purposes (and generally evil)
86
87 public:
88 TOneTimeCallback(Callback_t &&f, unsigned int nSlots) : fFun(std::move(f)), fHasBeenCalled(nSlots, 0) {}
89
/// Invokes fFun(slot) the first time this is called for a given slot; subsequent calls for
/// the same slot return immediately. The per-slot flag is set only after fFun(slot) returns.
90 void operator()(unsigned int slot)
91 {
92 if (fHasBeenCalled[slot] == 1)
93 return;
94 fFun(slot);
95 fHasBeenCalled[slot] = 1;
96 }
97 };
98
99 std::vector<RDFInternal::RActionBase *> fBookedActions; ///< Non-owning pointers to actions to be run
100 std::vector<RDFInternal::RActionBase *> fRunActions; ///< Non-owning pointers to actions already run
101 std::vector<RFilterBase *> fBookedFilters;
102 std::vector<RFilterBase *> fBookedNamedFilters; ///< Contains a subset of fBookedFilters, i.e. only the named filters
103 std::vector<RRangeBase *> fBookedRanges;
104
105 /// Shared pointer to the input TTree. It does not delete the pointee if the TTree/TChain was passed directly as an
106 /// argument to RDataFrame's ctor (in which case we let users retain ownership).
107 std::shared_ptr<TTree> fTree{nullptr};
110 const unsigned int fNSlots{1};
112 const ELoopType fLoopType; ///< The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
113 std::string fToJit; ///< code that should be jitted and executed right before the event loop
114 const std::unique_ptr<RDataSource> fDataSource; ///< Owning pointer to a data-source object. Null if no data-source
115 std::map<std::string, std::string> fAliasColumnNameMap; ///< ColumnNameAlias-columnName pairs
116 std::vector<TCallback> fCallbacks; ///< Registered callbacks
117 std::vector<TOneTimeCallback> fCallbacksOnce; ///< Registered callbacks to invoke just once before running the loop
118 /// A unique ID that identifies the computation graph that starts with this RLoopManager.
119 /// Used, for example, to jit objects in a namespace reserved for this computation graph
120 const unsigned int fID = GetNextID();
121
122 std::vector<RCustomColumnBase *> fCustomColumns; ///< Non-owning container of all custom columns created so far.
123 /// Cache of the tree/chain branch names. Never access directly, always use GetBranchNames().
125
126 void RunEmptySourceMT();
127 void RunEmptySource();
128 void RunTreeProcessorMT();
129 void RunTreeReader();
130 void RunDataSourceMT();
131 void RunDataSource();
132 void RunAndCheckFilters(unsigned int slot, Long64_t entry);
133 void InitNodeSlots(TTreeReader *r, unsigned int slot);
134 void InitNodes();
135 void CleanUpNodes();
136 void CleanUpTask(unsigned int slot);
137 void EvalChildrenCounts();
138 static unsigned int GetNextID();
139
140public:
141 RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches);
142 RLoopManager(ULong64_t nEmptyEntries);
143 RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches);
144 RLoopManager(const RLoopManager &) = delete;
146
147 void BuildJittedNodes();
148 RLoopManager *GetLoopManagerUnchecked() final { return this; }
149 void Run();
151 TTree *GetTree() const;
154 RDataSource *GetDataSource() const { return fDataSource.get(); }
155 void Book(RDFInternal::RActionBase *actionPtr);
156 void Deregister(RDFInternal::RActionBase *actionPtr);
157 void Book(RFilterBase *filterPtr);
158 void Deregister(RFilterBase *filterPtr);
159 void Book(RRangeBase *rangePtr);
160 void Deregister(RRangeBase *rangePtr);
161 bool CheckFilters(unsigned int, Long64_t) final;
162 unsigned int GetNSlots() const { return fNSlots; }
164 void Report(ROOT::RDF::RCutFlowReport &rep) const final;
165 /// End of recursive chain of calls, does nothing
167 void SetTree(const std::shared_ptr<TTree> &tree) { fTree = tree; }
168 void IncrChildrenCount() final { ++fNChildren; }
169 void StopProcessing() final { ++fNStopsReceived; }
170 void ToJit(const std::string &s) { fToJit.append(s); }
171 void AddColumnAlias(const std::string &alias, const std::string &colName) { fAliasColumnNameMap[alias] = colName; }
172 const std::map<std::string, std::string> &GetAliasMap() const { return fAliasColumnNameMap; }
173 void RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f);
174 unsigned int GetID() const { return fID; }
175
176 /// End of recursive chain of calls, does nothing
177 void AddFilterName(std::vector<std::string> &) {}
178 /// For each booked filter, returns either the name or "Unnamed Filter"
179 std::vector<std::string> GetFiltersNames();
180
181 /// For all the actions, either booked or run
182 std::vector<RDFInternal::RActionBase *> GetAllActions();
183
184 void RegisterCustomColumn(RCustomColumnBase *column) { fCustomColumns.push_back(column); }
185
187 {
188 fCustomColumns.erase(std::remove(fCustomColumns.begin(), fCustomColumns.end(), column), fCustomColumns.end());
189 }
190
191 std::vector<RDFInternal::RActionBase *> GetBookedActions() { return fBookedActions; }
192 std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode> GetGraph();
193
195};
196
197} // ns RDF
198} // ns Detail
199} // ns ROOT
200
201#endif
ROOT::R::TRInterface & r
Definition: Object.C:4
#define f(i)
Definition: RSha256.hxx:104
#define c(i)
Definition: RSha256.hxx:101
long long Long64_t
Definition: RtypesCore.h:69
unsigned long long ULong64_t
Definition: RtypesCore.h:70
typedef void((*Func_t)())
TCallback(ULong64_t everyN, Callback_t &&f, unsigned int nSlots)
TOneTimeCallback(Callback_t &&f, unsigned int nSlots)
The head node of a RDF computation graph.
RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
const unsigned int fID
A unique ID that identifies the computation graph that starts with this RLoopManager.
bool CheckFilters(unsigned int, Long64_t) final
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
const std::map< std::string, std::string > & GetAliasMap() const
std::function< void(unsigned int)> Callback_t
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
void ToJit(const std::string &s)
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
ULong64_t GetNEmptyEntries() const
void DeRegisterCustomColumn(RCustomColumnBase *column)
RLoopManager & operator=(const RLoopManager &)=delete
const ColumnNames_t & GetBranchNames()
Return all valid TTree::Branch names (caching results for subsequent calls).
::TDirectory * GetDirectory() const
std::vector< RDFInternal::RActionBase * > GetBookedActions()
std::shared_ptr< TTree > fTree
Shared pointer to the input TTree.
void RegisterCustomColumn(RCustomColumnBase *column)
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
std::vector< RDFInternal::RActionBase * > GetAllActions()
For all the actions, either booked or run.
void CleanUpTask(unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
void Run()
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
void AddFilterName(std::vector< std::string > &)
End of recursive chain of calls, does nothing.
std::vector< RRangeBase * > fBookedRanges
std::map< std::string, std::string > fAliasColumnNameMap
ColumnNameAlias-columnName pairs.
std::vector< TCallback > fCallbacks
Registered callbacks.
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
std::vector< RFilterBase * > fBookedFilters
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph()
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void AddColumnAlias(const std::string &alias, const std::string &colName)
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directly, always use GetBranchNames().
std::vector< RCustomColumnBase * > fCustomColumns
Non-owning container of all custom columns created so far.
void SetTree(const std::shared_ptr< TTree > &tree)
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::string fToJit
code that should be jitted and executed right before the event loop
void BuildJittedNodes()
Jit all actions that required runtime column type inference, and clean the fToJit member variable.
RDataSource * GetDataSource() const
unsigned int GetNSlots() const
std::vector< TOneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
void PartialReport(ROOT::RDF::RCutFlowReport &) const final
End of recursive chain of calls, does nothing.
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
RLoopManager(const RLoopManager &)=delete
const std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object. Null if no data-source.
static unsigned int GetNextID()
const ColumnNames_t fDefaultColumns
void Book(RDFInternal::RActionBase *actionPtr)
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes. This method loops over all filters, actions and other booked ob...
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
void Deregister(RDFInternal::RActionBase *actionPtr)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
RLoopManager * GetLoopManagerUnchecked() final
Base class for non-leaf nodes of the computational graph.
Definition: RNodeBase.hxx:41
unsigned int fNStopsReceived
Number of times that a child node signaled to stop processing entries.
Definition: RNodeBase.hxx:45
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition: RNodeBase.hxx:44
Helper class that provides the operation graph nodes.
Class used to create the operation graph to be printed in the dot representation.
Definition: GraphNode.hxx:26
RDataSource defines an API that RDataFrame can use to read arbitrary data formats.
Describe directory structure in memory.
Definition: TDirectory.h:34
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition: TTreeReader.h:44
A TTree object has a header with a name and a title.
Definition: TTree.h:71
ColumnNames_t GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
Definition: RExports.h:151
ROOT type_traits extensions.
Definition: TypeTraits.hxx:23
Namespace for new ROOT classes and functions.
Definition: StringConv.hxx:21
ROOT::Detail::RDF::ColumnNames_t ColumnNames_t
Definition: RDataFrame.cxx:790
static constexpr double s
STL namespace.
Definition: tree.py:1