Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RLoopManager.hxx
Go to the documentation of this file.
1// Author: Enrico Guiraud, Danilo Piparo CERN 03/2017
2
3/*************************************************************************
4 * Copyright (C) 1995-2022, Rene Brun and Fons Rademakers. *
5 * All rights reserved. *
6 * *
7 * For the licensing terms see $ROOTSYS/LICENSE. *
8 * For the list of contributors see $ROOTSYS/README/CREDITS. *
9 *************************************************************************/
10
11#ifndef ROOT_RLOOPMANAGER
12#define ROOT_RLOOPMANAGER
13
14#include "ROOT/InternalTreeUtils.hxx" // RNoCleanupNotifier
20
21#include <functional>
22#include <limits>
23#include <map>
24#include <memory>
25#include <set>
26#include <string>
27#include <string_view>
28#include <unordered_map>
29#include <unordered_set>
30#include <vector>
31
32// forward declarations
33class TTree;
34class TTreeReader;
35class TDirectory;
36
37namespace ROOT {
38namespace RDF {
39class RCutFlowReport;
40class RDataSource;
41} // ns RDF
42
43namespace Internal {
44namespace RDF {
45std::vector<std::string> GetBranchNames(TTree &t, bool allowDuplicates = true);
46
47class GraphNode;
48class RActionBase;
49class RVariationBase;
50class RDefinesWithReaders;
51class RVariationsWithReaders;
52
53namespace GraphDrawing {
55} // ns GraphDrawing
56
57using Callback_t = std::function<void(unsigned int)>;
58
/// Callable wrapper that invokes a user callback once every fEveryN calls, counted separately per slot.
/// NOTE(review): the declarations of fFun and fEveryN are not visible in this listing; both are
/// initialized by the constructor below.
59class RCallback {
62 std::vector<ULong64_t> fCounters; // one call counter per slot
63
64public:
/// \param[in] everyN Number of calls (per slot) after which the callback is invoked.
/// \param[in] f Callback to invoke; it is moved into fFun.
/// \param[in] nSlots Number of per-slot counters to create, each starting at zero.
65 RCallback(ULong64_t everyN, Callback_t &&f, unsigned int nSlots)
66 : fFun(std::move(f)), fEveryN(everyN), fCounters(nSlots, 0ull)
67 {
68 }
69
/// Increment the counter of `slot`; when it equals fEveryN, reset it to zero and call fFun(slot).
70 void operator()(unsigned int slot)
71 {
72 auto &c = fCounters[slot];
73 ++c;
74 if (c == fEveryN) {
75 c = 0ull; // restart counting for this slot before invoking the callback
76 fFun(slot);
77 }
78 }
79};
80
83 std::vector<int> fHasBeenCalled; // std::vector<bool> is thread-unsafe for our purposes (and generally evil)
84
85public:
/// \param[in] f Callback to invoke; it is moved into fFun.
/// \param[in] nSlots Number of per-slot "already called" flags to create, each starting at 0.
86 ROneTimeCallback(Callback_t &&f, unsigned int nSlots) : fFun(std::move(f)), fHasBeenCalled(nSlots, 0) {}
87
/// Call fFun(slot) the first time this is invoked for `slot`; later invocations for the same slot return immediately.
88 void operator()(unsigned int slot)
89 {
90 if (fHasBeenCalled[slot] == 1)
91 return;
92 fFun(slot);
93 fHasBeenCalled[slot] = 1; // the flag is set only after the callback has returned
94 }
95};
96
97} // namespace RDF
98} // namespace Internal
99} // namespace ROOT
100
101namespace ROOT {
102namespace Detail {
103namespace RDF {
105
106class RFilterBase;
107class RRangeBase;
108class RDefineBase;
110
111/// The head node of a RDF computation graph.
112/// This class is responsible for running the event loop.
113class RLoopManager : public RNodeBase {
114 using ColumnNames_t = std::vector<std::string>;
116
117 friend struct RCallCleanUpTask;
118
119 std::vector<RDFInternal::RActionBase *> fBookedActions; ///< Non-owning pointers to actions to be run
120 std::vector<RDFInternal::RActionBase *> fRunActions; ///< Non-owning pointers to actions already run
121 std::vector<RFilterBase *> fBookedFilters;
122 std::vector<RFilterBase *> fBookedNamedFilters; ///< Contains a subset of fBookedFilters, i.e. only the named filters
123 std::vector<RRangeBase *> fBookedRanges;
124 std::vector<RDefineBase *> fBookedDefines;
125 std::vector<RDFInternal::RVariationBase *> fBookedVariations;
126
127 /// Shared pointer to the input TTree. It does not delete the pointee if the TTree/TChain was passed directly as an
128 /// argument to RDataFrame's ctor (in which case we let users retain ownership).
129 std::shared_ptr<TTree> fTree{nullptr};
131 Long64_t fEndEntry{std::numeric_limits<Long64_t>::max()};
132
133 /// Keys are `fname + "/" + treename` as RSampleInfo::fID; Values are pointers to the corresponding sample
134 std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> fSampleMap;
135 /// Samples need to survive throughout the whole event loop, hence stored as an attribute
136 std::vector<ROOT::RDF::Experimental::RSample> fSamples;
137
138 /// Friends of the fTree. Only used if we constructed fTree ourselves.
139 std::vector<std::unique_ptr<TChain>> fFriends;
141 /// Range of entries created when no data source is specified.
142 std::pair<ULong64_t, ULong64_t> fEmptyEntryRange{};
143 const unsigned int fNSlots{1};
145 const ELoopType fLoopType; ///< The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
146 const std::unique_ptr<RDataSource> fDataSource; ///< Owning pointer to a data-source object. Null if no data-source
147 /// Registered callbacks to be executed every N events.
148 /// The registration happens via the RegisterCallback method.
149 std::vector<RDFInternal::RCallback> fCallbacksEveryNEvents;
150 /// Registered callbacks to invoke just once before running the loop.
151 /// The registration happens via the RegisterCallback method.
152 std::vector<RDFInternal::ROneTimeCallback> fCallbacksOnce;
153 /// Registered callbacks to call at the beginning of each "data block".
154 /// The key is the pointer of the corresponding node in the computation graph (a RDefinePerSample or a RAction).
155 std::unordered_map<void *, ROOT::RDF::SampleCallback_t> fSampleCallbacks;
157 std::vector<ROOT::RDF::RSampleInfo> fSampleInfos;
158 unsigned int fNRuns{0}; ///< Number of event loops run
159
160 /// Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
161 std::vector<std::unordered_map<std::string, std::unique_ptr<RColumnReaderBase>>> fDatasetColumnReaders;
162
163 /// Cache of the tree/chain branch names. Never access directly, always use GetBranchNames().
165
167
168 void RunEmptySourceMT();
169 void RunEmptySource();
170 void RunTreeProcessorMT();
171 void RunTreeReader();
172 void RunDataSourceMT();
173 void RunDataSource();
174 void RunAndCheckFilters(unsigned int slot, Long64_t entry);
175 void InitNodeSlots(TTreeReader *r, unsigned int slot);
176 void InitNodes();
177 void CleanUpNodes();
178 void CleanUpTask(TTreeReader *r, unsigned int slot);
179 void EvalChildrenCounts();
180 void SetupSampleCallbacks(TTreeReader *r, unsigned int slot);
181 void UpdateSampleInfo(unsigned int slot, const std::pair<ULong64_t, ULong64_t> &range);
182 void UpdateSampleInfo(unsigned int slot, TTreeReader &r);
183
184 std::unordered_set<std::string> fCachedColNames;
185 std::set<std::pair<std::string_view, std::unique_ptr<ROOT::Internal::RDF::RDefinesWithReaders>>>
187 std::set<std::pair<std::string_view, std::unique_ptr<ROOT::Internal::RDF::RVariationsWithReaders>>>
189
190public:
191 RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches);
192 RLoopManager(std::unique_ptr<TTree> tree, const ColumnNames_t &defaultBranches);
193 RLoopManager(ULong64_t nEmptyEntries);
194 RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches);
196
197 // Rule of five
198
199 RLoopManager(const RLoopManager &) = delete;
203 ~RLoopManager() = default;
204
206 void Jit();
207 RLoopManager *GetLoopManagerUnchecked() final { return this; }
208 void Run(bool jit = true);
210 TTree *GetTree() const;
213 RDataSource *GetDataSource() const { return fDataSource.get(); }
214 void Register(RDFInternal::RActionBase *actionPtr);
215 void Deregister(RDFInternal::RActionBase *actionPtr);
216 void Register(RFilterBase *filterPtr);
217 void Deregister(RFilterBase *filterPtr);
218 void Register(RRangeBase *rangePtr);
219 void Deregister(RRangeBase *rangePtr);
220 void Register(RDefineBase *definePtr);
221 void Deregister(RDefineBase *definePtr);
224 bool CheckFilters(unsigned int, Long64_t) final;
225 unsigned int GetNSlots() const { return fNSlots; }
226 void Report(ROOT::RDF::RCutFlowReport &rep) const final;
227 /// End of recursive chain of calls, does nothing
229 void SetTree(std::shared_ptr<TTree> tree);
230 void IncrChildrenCount() final { ++fNChildren; }
231 void StopProcessing() final { ++fNStopsReceived; }
232 void ToJitExec(const std::string &) const;
233 void RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f);
234 unsigned int GetNRuns() const { return fNRuns; }
235 bool HasDataSourceColumnReaders(const std::string &col, const std::type_info &ti) const;
236 void AddDataSourceColumnReaders(const std::string &col, std::vector<std::unique_ptr<RColumnReaderBase>> &&readers,
237 const std::type_info &ti);
238 RColumnReaderBase *AddTreeColumnReader(unsigned int slot, const std::string &col,
239 std::unique_ptr<RColumnReaderBase> &&reader, const std::type_info &ti);
240 RColumnReaderBase *GetDatasetColumnReader(unsigned int slot, const std::string &col, const std::type_info &ti) const;
241
242 /// End of recursive chain of calls, does nothing
243 void AddFilterName(std::vector<std::string> &) final {}
244 /// For each booked filter, returns either the name or "Unnamed Filter"
245 std::vector<std::string> GetFiltersNames();
246
247 /// Return all graph edges known to RLoopManager.
248 /// This includes Filters and Ranges but not Defines.
249 std::vector<RNodeBase *> GetGraphEdges() const;
250
251 /// Return all actions, either booked or already run
252 std::vector<RDFInternal::RActionBase *> GetAllActions() const;
253
254 std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>
255 GetGraph(std::unordered_map<void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &visitedMap) final;
256
258
259 void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback);
260
261 void SetEmptyEntryRange(std::pair<ULong64_t, ULong64_t> &&newRange);
263
264 std::unordered_set<std::string> &GetColumnNamesCache() { return fCachedColNames; }
265 std::set<std::pair<std::string_view, std::unique_ptr<ROOT::Internal::RDF::RDefinesWithReaders>>> &
267 {
269 }
270 std::set<std::pair<std::string_view, std::unique_ptr<ROOT::Internal::RDF::RVariationsWithReaders>>> &
272 {
274 }
275};
276
277/// \brief Create an RLoopManager that reads a TChain.
278/// \param[in] datasetName Name of the TChain
279/// \param[in] fileNameGlob File name (or glob) in which the TChain is stored.
280/// \param[in] defaultColumns List of default columns, see
281/// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
/// \param[in] checkFile Defaults to true. NOTE(review): its effect is not documented in this header; confirm
/// against the implementation before relying on it.
282/// \return the RLoopManager instance.
283std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
284CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob,
285 const std::vector<std::string> &defaultColumns, bool checkFile = true);
286
287/// \brief Create an RLoopManager that reads a TChain.
288/// \param[in] datasetName Name of the TChain
289/// \param[in] fileNameGlobs List of file names (potentially globs).
290/// \param[in] defaultColumns List of default columns, see
291/// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
/// \param[in] checkFile Defaults to true. NOTE(review): its effect is not documented in this header; confirm
/// against the implementation before relying on it.
292/// \return the RLoopManager instance.
293std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
294CreateLMFromTTree(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
295 const std::vector<std::string> &defaultColumns, bool checkFile = true);
296
297#ifdef R__HAS_ROOT7
298/// \brief Create an RLoopManager that reads an RNTuple.
299/// \param[in] datasetName Name of the RNTuple
300/// \param[in] fileNameGlob File name (or glob) in which the RNTuple is stored.
301/// \param[in] defaultColumns List of default columns, see
302/// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
303/// \return the RLoopManager instance.
304std::shared_ptr<ROOT::Detail::RDF::RLoopManager> CreateLMFromRNTuple(std::string_view datasetName,
305 std::string_view fileNameGlob,
306 const std::vector<std::string> &defaultColumns);
307
308/// \brief Create an RLoopManager that reads multiple RNTuples chained vertically.
309/// \param[in] datasetName Name of the RNTuple
310/// \param[in] fileNameGlobs List of file names (potentially globs).
311/// \param[in] defaultColumns List of default columns, see
312/// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
313/// \return the RLoopManager instance.
314std::shared_ptr<ROOT::Detail::RDF::RLoopManager> CreateLMFromRNTuple(std::string_view datasetName,
315 const std::vector<std::string> &fileNameGlobs,
316 const std::vector<std::string> &defaultColumns);
317
318/// \brief Create an RLoopManager opening a file and checking the data format of the dataset.
319/// \param[in] datasetName Name of the dataset in the file.
320/// \param[in] fileNameGlob File name (or glob) in which the dataset is stored.
321/// \param[in] defaultColumns List of default columns, see
322/// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
323/// \throws std::invalid_argument if the file could not be opened.
324/// \return an RLoopManager of the appropriate data source.
325std::shared_ptr<ROOT::Detail::RDF::RLoopManager> CreateLMFromFile(std::string_view datasetName,
326 std::string_view fileNameGlob,
327 const std::vector<std::string> &defaultColumns);
328
329/// \brief Create an RLoopManager that reads many files. The first is opened to infer the data source type.
330/// \param[in] datasetName Name of the dataset.
331/// \param[in] fileNameGlobs List of file names (potentially globs).
332/// \param[in] defaultColumns List of default columns, see
333/// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
334/// \throws std::invalid_argument if the file could not be opened.
335/// \return an RLoopManager of the appropriate data source.
336std::shared_ptr<ROOT::Detail::RDF::RLoopManager> CreateLMFromFile(std::string_view datasetName,
337 const std::vector<std::string> &fileNameGlobs,
338 const std::vector<std::string> &defaultColumns);
339#endif
340
341} // namespace RDF
342} // namespace Detail
343} // namespace ROOT
344
345#endif
#define f(i)
Definition RSha256.hxx:104
#define c(i)
Definition RSha256.hxx:101
long long Long64_t
Definition RtypesCore.h:80
unsigned long long ULong64_t
Definition RtypesCore.h:81
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
The head node of a RDF computation graph.
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void SetEmptyEntryRange(std::pair< ULong64_t, ULong64_t > &&newRange)
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
RLoopManager & operator=(RLoopManager &&)=delete
std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > fSampleMap
Keys are fname + "/" + treename as RSampleInfo::fID; Values are pointers to the corresponding sample.
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph(std::unordered_map< void *, std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > > &visitedMap) final
RLoopManager & operator=(const RLoopManager &)=delete
const ColumnNames_t & GetBranchNames()
Return all valid TTree::Branch names (caching results for subsequent calls).
void ToJitExec(const std::string &) const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
::TDirectory * GetDirectory() const
void ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec)
Changes the internal TTree held by the RLoopManager.
void SetTree(std::shared_ptr< TTree > tree)
std::set< std::pair< std::string_view, std::unique_ptr< ROOT::Internal::RDF::RVariationsWithReaders > > > & GetUniqueVariationsWithReaders()
std::shared_ptr< TTree > fTree
Shared pointer to the input TTree.
std::vector< RDefineBase * > fBookedDefines
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
ROOT::Internal::TreeUtils::RNoCleanupNotifier fNoCleanupNotifier
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
RColumnReaderBase * GetDatasetColumnReader(unsigned int slot, const std::string &col, const std::type_info &ti) const
void AddFilterName(std::vector< std::string > &) final
End of recursive chain of calls, does nothing.
std::vector< RRangeBase * > fBookedRanges
std::vector< ROOT::RDF::Experimental::RSample > fSamples
Samples need to survive throughout the whole event loop, hence stored as an attribute.
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
std::unordered_set< std::string > & GetColumnNamesCache()
std::vector< RFilterBase * > fBookedFilters
void Run(bool jit=true)
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::set< std::pair< std::string_view, std::unique_ptr< ROOT::Internal::RDF::RVariationsWithReaders > > > fUniqueVariationsWithReaders
std::unordered_map< void *, ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
RLoopManager(RLoopManager &&)=delete
RColumnReaderBase * AddTreeColumnReader(unsigned int slot, const std::string &col, std::unique_ptr< RColumnReaderBase > &&reader, const std::type_info &ti)
Register a new RTreeColumnReader with this RLoopManager.
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void AddDataSourceColumnReaders(const std::string &col, std::vector< std::unique_ptr< RColumnReaderBase > > &&readers, const std::type_info &ti)
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directly, always use GetBranchNames().
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::vector< RDFInternal::RCallback > fCallbacksEveryNEvents
Registered callbacks to be executed every N events.
std::vector< std::unordered_map< std::string, std::unique_ptr< RColumnReaderBase > > > fDatasetColumnReaders
Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
void Register(RDFInternal::RActionBase *actionPtr)
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RDFInternal::RVariationBase * > fBookedVariations
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager. This includes Filters and Ranges but not Defines.
std::set< std::pair< std::string_view, std::unique_ptr< ROOT::Internal::RDF::RDefinesWithReaders > > > & GetUniqueDefinesWithReaders()
RDataSource * GetDataSource() const
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
void PartialReport(ROOT::RDF::RCutFlowReport &) const final
End of recursive chain of calls, does nothing.
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
RLoopManager(const RLoopManager &)=delete
const std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object.
RDFInternal::RNewSampleNotifier fNewSampleNotifier
std::pair< ULong64_t, ULong64_t > fEmptyEntryRange
Range of entries created when no data source is specified.
std::set< std::pair< std::string_view, std::unique_ptr< ROOT::Internal::RDF::RDefinesWithReaders > > > fUniqueDefinesWithReaders
const ColumnNames_t fDefaultColumns
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes. This method loops over all filters, actions and other booked ob...
std::unordered_set< std::string > fCachedColNames
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
void Deregister(RDFInternal::RActionBase *actionPtr)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
RLoopManager * GetLoopManagerUnchecked() final
std::vector< std::unique_ptr< TChain > > fFriends
Friends of the fTree. Only used if we constructed fTree ourselves.
bool HasDataSourceColumnReaders(const std::string &col, const std::type_info &ti) const
Return true if AddDataSourceColumnReaders was called for column name col.
Base class for non-leaf nodes of the computational graph.
Definition RNodeBase.hxx:43
unsigned int fNStopsReceived
Number of times that a child node signaled to stop processing entries.
Definition RNodeBase.hxx:47
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition RNodeBase.hxx:46
Helper class that provides the operation graph nodes.
RCallback(ULong64_t everyN, Callback_t &&f, unsigned int nSlots)
std::vector< ULong64_t > fCounters
void operator()(unsigned int slot)
ROneTimeCallback(Callback_t &&f, unsigned int nSlots)
This type includes all parts of RVariation that do not depend on the callable signature.
The dataset specification for RDataFrame.
RDataSource defines an API that RDataFrame can use to read arbitrary data formats.
Describe directory structure in memory.
Definition TDirectory.h:45
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:44
A TTree represents a columnar dataset.
Definition TTree.h:79
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns, bool checkFile=true)
Create an RLoopManager that reads a TChain.
std::vector< std::string > GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
std::function< void(unsigned int)> Callback_t
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
A RAII object that calls RLoopManager::CleanUpTask at destruction.