Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RLoopManager.hxx
Go to the documentation of this file.
1// Author: Enrico Guiraud, Danilo Piparo CERN 03/2017
2
3/*************************************************************************
4 * Copyright (C) 1995-2022, Rene Brun and Fons Rademakers. *
5 * All rights reserved. *
6 * *
7 * For the licensing terms see $ROOTSYS/LICENSE. *
8 * For the list of contributors see $ROOTSYS/README/CREDITS. *
9 *************************************************************************/
10
11#ifndef ROOT_RLOOPMANAGER
12#define ROOT_RLOOPMANAGER
13
14#include "ROOT/InternalTreeUtils.hxx" // RNoCleanupNotifier
20#include "ROOT/RDF/Utils.hxx"
21
22#include <functional>
23#include <limits>
24#include <map>
25#include <memory>
26#include <set>
27#include <string>
28#include <string_view>
29#include <unordered_map>
30#include <unordered_set>
31#include <vector>
32
33// forward declarations
34class TTree;
35class TTreeReader;
36class TDirectory;
37
38namespace ROOT {
39namespace RDF {
40class RCutFlowReport;
41class RDataSource;
42} // ns RDF
43
44namespace Internal {
45namespace RDF {
46std::vector<std::string> GetBranchNames(TTree &t, bool allowDuplicates = true);
47
48class GraphNode;
49class RActionBase;
50class RVariationBase;
51class RDefinesWithReaders;
52class RVariationsWithReaders;
53
54namespace GraphDrawing {
56} // ns GraphDrawing
57
58using Callback_t = std::function<void(unsigned int)>;
59
/// Wraps a callable so that it is invoked once every fEveryN calls, with calls counted separately for each slot.
// NOTE(review): the declarations of fFun and fEveryN (listing lines 61-62) are missing from this excerpt.
60class RCallback {
 /// One counter per slot: number of calls seen for that slot since the callback last fired.
63 std::vector<ULong64_t> fCounters;
64
65public:
 /// \param[in] everyN Value stored in fEveryN: the per-slot call count at which the callback is invoked.
 /// \param[in] f Callable to invoke; it is moved into fFun.
 /// \param[in] nSlots Number of per-slot counters to create, all starting at zero.
66 RCallback(ULong64_t everyN, Callback_t &&f, unsigned int nSlots)
67 : fFun(std::move(f)), fEveryN(everyN), fCounters(nSlots, 0ull)
68 {
69 }
70
 /// Record one call for `slot`; when that slot's counter reaches fEveryN, reset it to zero and invoke fFun(slot).
 // NOTE(review): if fEveryN were 0 the equality below could not hold until the counter wraps around;
 // presumably callers never construct an RCallback with everyN == 0 -- confirm at the call site.
71 void operator()(unsigned int slot)
72 {
 // `c` is a reference, so the increment and the reset below write straight into fCounters[slot].
73 auto &c = fCounters[slot];
74 ++c;
75 if (c == fEveryN) {
76 c = 0ull;
77 fFun(slot);
78 }
79 }
80};
81
84 std::vector<int> fHasBeenCalled; // std::vector<bool> is thread-unsafe for our purposes (and generally evil)
85
86public:
 /// Move the callable `f` into fFun and create one "has been called" flag per slot, all initially 0.
87 ROneTimeCallback(Callback_t &&f, unsigned int nSlots) : fFun(std::move(f)), fHasBeenCalled(nSlots, 0) {}
88
 /// Invoke fFun(slot) the first time this is called for `slot`; later calls for the same slot return immediately.
89 void operator()(unsigned int slot)
90 {
91 if (fHasBeenCalled[slot] == 1)
92 return;
93 fFun(slot);
 // The flag is set only after fFun returns, so it stays 0 if fFun exits via an exception.
94 fHasBeenCalled[slot] = 1;
95 }
96};
97
98} // namespace RDF
99} // namespace Internal
100} // namespace ROOT
101
102namespace ROOT {
103namespace Detail {
104namespace RDF {
106
107class RFilterBase;
108class RRangeBase;
109class RDefineBase;
111
112/// The head node of an RDF computation graph.
113/// This class is responsible for running the event loop.
114class RLoopManager : public RNodeBase {
115 using ColumnNames_t = std::vector<std::string>;
117
118 friend struct RCallCleanUpTask;
119
120 std::vector<RDFInternal::RActionBase *> fBookedActions; ///< Non-owning pointers to actions to be run
121 std::vector<RDFInternal::RActionBase *> fRunActions; ///< Non-owning pointers to actions already run
122 std::vector<RFilterBase *> fBookedFilters;
123 std::vector<RFilterBase *> fBookedNamedFilters; ///< Contains a subset of fBookedFilters, i.e. only the named filters
124 std::vector<RRangeBase *> fBookedRanges;
125 std::vector<RDefineBase *> fBookedDefines;
126 std::vector<RDFInternal::RVariationBase *> fBookedVariations;
127
128 /// Shared pointer to the input TTree. It does not delete the pointee if the TTree/TChain was passed directly as an
129 /// argument to RDataFrame's ctor (in which case we let users retain ownership).
130 std::shared_ptr<TTree> fTree{nullptr};
132 Long64_t fEndEntry{std::numeric_limits<Long64_t>::max()};
133
134 /// Keys are `fname + "/" + treename` as RSampleInfo::fID; Values are pointers to the corresponding sample
135 std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> fSampleMap;
136 /// Samples need to survive throughout the whole event loop, hence stored as an attribute
137 std::vector<ROOT::RDF::Experimental::RSample> fSamples;
138
139 /// Friends of the fTree. Only used if we constructed fTree ourselves.
140 std::vector<std::unique_ptr<TChain>> fFriends;
142 /// Range of entries created when no data source is specified.
143 std::pair<ULong64_t, ULong64_t> fEmptyEntryRange{};
144 const unsigned int fNSlots{1};
146 const ELoopType fLoopType; ///< The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
147 const std::unique_ptr<RDataSource> fDataSource; ///< Owning pointer to a data-source object. Null if no data-source
148 /// Registered callbacks to be executed every N events.
149 /// The registration happens via the RegisterCallback method.
150 std::vector<RDFInternal::RCallback> fCallbacksEveryNEvents;
151 /// Registered callbacks to invoke just once before running the loop.
152 /// The registration happens via the RegisterCallback method.
153 std::vector<RDFInternal::ROneTimeCallback> fCallbacksOnce;
154 /// Registered callbacks to call at the beginning of each "data block".
155 /// The key is the pointer of the corresponding node in the computation graph (an RDefinePerSample or an RAction).
156 std::unordered_map<void *, ROOT::RDF::SampleCallback_t> fSampleCallbacks;
158 std::vector<ROOT::RDF::RSampleInfo> fSampleInfos;
159 unsigned int fNRuns{0}; ///< Number of event loops run
160
161 /// Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
162 std::vector<std::unordered_map<std::string, std::unique_ptr<RColumnReaderBase>>> fDatasetColumnReaders;
163
164 /// Cache of the tree/chain branch names. Never access directly, always use GetBranchNames().
166
168
169 void RunEmptySourceMT();
170 void RunEmptySource();
171 void RunTreeProcessorMT();
172 void RunTreeReader();
173 void RunDataSourceMT();
174 void RunDataSource();
175 void RunAndCheckFilters(unsigned int slot, Long64_t entry);
176 void InitNodeSlots(TTreeReader *r, unsigned int slot);
177 void InitNodes();
178 void CleanUpNodes();
179 void CleanUpTask(TTreeReader *r, unsigned int slot);
180 void EvalChildrenCounts();
181 void SetupSampleCallbacks(TTreeReader *r, unsigned int slot);
182 void UpdateSampleInfo(unsigned int slot, const std::pair<ULong64_t, ULong64_t> &range);
183 void UpdateSampleInfo(unsigned int slot, TTreeReader &r);
184
186 std::set<std::pair<std::string_view, std::unique_ptr<ROOT::Internal::RDF::RDefinesWithReaders>>>
188 std::set<std::pair<std::string_view, std::unique_ptr<ROOT::Internal::RDF::RVariationsWithReaders>>>
190
191public:
192 RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches);
193 RLoopManager(std::unique_ptr<TTree> tree, const ColumnNames_t &defaultBranches);
194 RLoopManager(ULong64_t nEmptyEntries);
195 RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches);
197
198 // Rule of five
199
200 RLoopManager(const RLoopManager &) = delete;
204 ~RLoopManager() = default;
205
207 void Jit();
208 RLoopManager *GetLoopManagerUnchecked() final { return this; }
209 void Run(bool jit = true);
211 TTree *GetTree() const;
214 RDataSource *GetDataSource() const { return fDataSource.get(); }
215 void Register(RDFInternal::RActionBase *actionPtr);
216 void Deregister(RDFInternal::RActionBase *actionPtr);
217 void Register(RFilterBase *filterPtr);
218 void Deregister(RFilterBase *filterPtr);
219 void Register(RRangeBase *rangePtr);
220 void Deregister(RRangeBase *rangePtr);
221 void Register(RDefineBase *definePtr);
222 void Deregister(RDefineBase *definePtr);
225 bool CheckFilters(unsigned int, Long64_t) final;
226 unsigned int GetNSlots() const { return fNSlots; }
227 void Report(ROOT::RDF::RCutFlowReport &rep) const final;
228 /// End of recursive chain of calls, does nothing
230 void SetTree(std::shared_ptr<TTree> tree);
231 void IncrChildrenCount() final { ++fNChildren; }
232 void StopProcessing() final { ++fNStopsReceived; }
233 void ToJitExec(const std::string &) const;
234 void RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f);
235 unsigned int GetNRuns() const { return fNRuns; }
236 bool HasDataSourceColumnReaders(const std::string &col, const std::type_info &ti) const;
237 void AddDataSourceColumnReaders(const std::string &col, std::vector<std::unique_ptr<RColumnReaderBase>> &&readers,
238 const std::type_info &ti);
239 RColumnReaderBase *AddTreeColumnReader(unsigned int slot, const std::string &col,
240 std::unique_ptr<RColumnReaderBase> &&reader, const std::type_info &ti);
241 RColumnReaderBase *GetDatasetColumnReader(unsigned int slot, const std::string &col, const std::type_info &ti) const;
242
243 /// End of recursive chain of calls, does nothing
244 void AddFilterName(std::vector<std::string> &) final {}
245 /// For each booked filter, returns either the name or "Unnamed Filter"
246 std::vector<std::string> GetFiltersNames();
247
248 /// Return all graph edges known to RLoopManager.
249 /// This includes Filters and Ranges but not Defines.
250 std::vector<RNodeBase *> GetGraphEdges() const;
251
252 /// Return all actions, either booked or already run
253 std::vector<RDFInternal::RActionBase *> GetAllActions() const;
254
255 std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>
256 GetGraph(std::unordered_map<void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &visitedMap) final;
257
259
260 void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback);
261
262 void SetEmptyEntryRange(std::pair<ULong64_t, ULong64_t> &&newRange);
264
266 std::set<std::pair<std::string_view, std::unique_ptr<ROOT::Internal::RDF::RDefinesWithReaders>>> &
268 {
270 }
271 std::set<std::pair<std::string_view, std::unique_ptr<ROOT::Internal::RDF::RVariationsWithReaders>>> &
273 {
275 }
276};
277
278/// \brief Create an RLoopManager that reads a TChain.
279/// \param[in] datasetName Name of the TChain
280/// \param[in] fileNameGlob File name (or glob) in which the TChain is stored.
281/// \param[in] defaultColumns List of default columns, see
282/// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
283/// \return the RLoopManager instance.
284std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
285CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob,
286 const std::vector<std::string> &defaultColumns, bool checkFile = true);
287
288/// \brief Create an RLoopManager that reads a TChain.
289/// \param[in] datasetName Name of the TChain
290/// \param[in] fileNameGlobs List of file names (potentially globs).
291/// \param[in] defaultColumns List of default columns, see
292/// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
293/// \return the RLoopManager instance.
294std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
295CreateLMFromTTree(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
296 const std::vector<std::string> &defaultColumns, bool checkFile = true);
297
298#ifdef R__HAS_ROOT7
299/// \brief Create an RLoopManager that reads an RNTuple.
300/// \param[in] datasetName Name of the RNTuple
301/// \param[in] fileNameGlob File name (or glob) in which the RNTuple is stored.
302/// \param[in] defaultColumns List of default columns, see
303/// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
304/// \return the RLoopManager instance.
305std::shared_ptr<ROOT::Detail::RDF::RLoopManager> CreateLMFromRNTuple(std::string_view datasetName,
306 std::string_view fileNameGlob,
307 const std::vector<std::string> &defaultColumns);
308
309/// \brief Create an RLoopManager that reads multiple RNTuples chained vertically.
310/// \param[in] datasetName Name of the RNTuple
311/// \param[in] fileNameGlobs List of file names (potentially globs).
312/// \param[in] defaultColumns List of default columns, see
313/// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
314/// \return the RLoopManager instance.
315std::shared_ptr<ROOT::Detail::RDF::RLoopManager> CreateLMFromRNTuple(std::string_view datasetName,
316 const std::vector<std::string> &fileNameGlobs,
317 const std::vector<std::string> &defaultColumns);
318
319/// \brief Create an RLoopManager opening a file and checking the data format of the dataset.
320/// \param[in] datasetName Name of the dataset in the file.
321/// \param[in] fileNameGlob File name (or glob) in which the dataset is stored.
322/// \param[in] defaultColumns List of default columns, see
323/// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
324/// \throws std::invalid_argument if the file could not be opened.
325/// \return an RLoopManager of the appropriate data source.
326std::shared_ptr<ROOT::Detail::RDF::RLoopManager> CreateLMFromFile(std::string_view datasetName,
327 std::string_view fileNameGlob,
328 const std::vector<std::string> &defaultColumns);
329
330/// \brief Create an RLoopManager that reads many files. The first is opened to infer the data source type.
331/// \param[in] datasetName Name of the dataset.
332/// \param[in] fileNameGlobs List of file names (potentially globs).
333/// \param[in] defaultColumns List of default columns, see
334/// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
335/// \throws std::invalid_argument if the file could not be opened.
336/// \return an RLoopManager of the appropriate data source.
337std::shared_ptr<ROOT::Detail::RDF::RLoopManager> CreateLMFromFile(std::string_view datasetName,
338 const std::vector<std::string> &fileNameGlobs,
339 const std::vector<std::string> &defaultColumns);
340#endif
341
342} // namespace RDF
343} // namespace Detail
344} // namespace ROOT
345
346#endif
#define f(i)
Definition RSha256.hxx:104
#define c(i)
Definition RSha256.hxx:101
long long Long64_t
Definition RtypesCore.h:69
unsigned long long ULong64_t
Definition RtypesCore.h:70
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
The head node of an RDF computation graph.
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void SetEmptyEntryRange(std::pair< ULong64_t, ULong64_t > &&newRange)
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
RLoopManager & operator=(RLoopManager &&)=delete
std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > fSampleMap
Keys are fname + "/" + treename as RSampleInfo::fID; Values are pointers to the corresponding sample.
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph(std::unordered_map< void *, std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > > &visitedMap) final
RLoopManager & operator=(const RLoopManager &)=delete
const ColumnNames_t & GetBranchNames()
Return all valid TTree::Branch names (caching results for subsequent calls).
void ToJitExec(const std::string &) const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
::TDirectory * GetDirectory() const
void ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec)
Changes the internal TTree held by the RLoopManager.
void SetTree(std::shared_ptr< TTree > tree)
std::set< std::pair< std::string_view, std::unique_ptr< ROOT::Internal::RDF::RVariationsWithReaders > > > & GetUniqueVariationsWithReaders()
std::shared_ptr< TTree > fTree
Shared pointer to the input TTree.
std::vector< RDefineBase * > fBookedDefines
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
ROOT::Internal::TreeUtils::RNoCleanupNotifier fNoCleanupNotifier
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
RColumnReaderBase * GetDatasetColumnReader(unsigned int slot, const std::string &col, const std::type_info &ti) const
void AddFilterName(std::vector< std::string > &) final
End of recursive chain of calls, does nothing.
std::vector< RRangeBase * > fBookedRanges
std::vector< ROOT::RDF::Experimental::RSample > fSamples
Samples need to survive throughout the whole event loop, hence stored as an attribute.
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
std::vector< RFilterBase * > fBookedFilters
void Run(bool jit=true)
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::set< std::pair< std::string_view, std::unique_ptr< ROOT::Internal::RDF::RVariationsWithReaders > > > fUniqueVariationsWithReaders
std::unordered_map< void *, ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
RLoopManager(RLoopManager &&)=delete
RColumnReaderBase * AddTreeColumnReader(unsigned int slot, const std::string &col, std::unique_ptr< RColumnReaderBase > &&reader, const std::type_info &ti)
Register a new RTreeColumnReader with this RLoopManager.
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void AddDataSourceColumnReaders(const std::string &col, std::vector< std::unique_ptr< RColumnReaderBase > > &&readers, const std::type_info &ti)
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directly, always use GetBranchNames().
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::vector< RDFInternal::RCallback > fCallbacksEveryNEvents
Registered callbacks to be executed every N events.
std::vector< std::unordered_map< std::string, std::unique_ptr< RColumnReaderBase > > > fDatasetColumnReaders
Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
void Register(RDFInternal::RActionBase *actionPtr)
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RDFInternal::RVariationBase * > fBookedVariations
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager. This includes Filters and Ranges but not Defines.
std::set< std::pair< std::string_view, std::unique_ptr< ROOT::Internal::RDF::RDefinesWithReaders > > > & GetUniqueDefinesWithReaders()
RDataSource * GetDataSource() const
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
void PartialReport(ROOT::RDF::RCutFlowReport &) const final
End of recursive chain of calls, does nothing.
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
RLoopManager(const RLoopManager &)=delete
const std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object.
RDFInternal::RNewSampleNotifier fNewSampleNotifier
std::pair< ULong64_t, ULong64_t > fEmptyEntryRange
Range of entries created when no data source is specified.
std::set< std::pair< std::string_view, std::unique_ptr< ROOT::Internal::RDF::RDefinesWithReaders > > > fUniqueDefinesWithReaders
const ColumnNames_t fDefaultColumns
ROOT::Internal::RDF::RStringCache fCachedColNames
ROOT::Internal::RDF::RStringCache & GetColumnNamesCache()
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes. This method loops over all filters, actions and other booked ob...
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
void Deregister(RDFInternal::RActionBase *actionPtr)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
RLoopManager * GetLoopManagerUnchecked() final
std::vector< std::unique_ptr< TChain > > fFriends
Friends of the fTree. Only used if we constructed fTree ourselves.
bool HasDataSourceColumnReaders(const std::string &col, const std::type_info &ti) const
Return true if AddDataSourceColumnReaders was called for column name col.
Base class for non-leaf nodes of the computational graph.
Definition RNodeBase.hxx:43
unsigned int fNStopsReceived
Number of times that a child node signaled to stop processing entries.
Definition RNodeBase.hxx:47
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition RNodeBase.hxx:46
Helper class that provides the operation graph nodes.
RCallback(ULong64_t everyN, Callback_t &&f, unsigned int nSlots)
std::vector< ULong64_t > fCounters
void operator()(unsigned int slot)
ROneTimeCallback(Callback_t &&f, unsigned int nSlots)
A Thread-safe cache for strings.
Definition Utils.hxx:286
This type includes all parts of RVariation that do not depend on the callable signature.
The dataset specification for RDataFrame.
RDataSource defines an API that RDataFrame can use to read arbitrary data formats.
Describe directory structure in memory.
Definition TDirectory.h:45
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:44
A TTree represents a columnar dataset.
Definition TTree.h:79
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns, bool checkFile=true)
Create an RLoopManager that reads a TChain.
std::vector< std::string > GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
std::function< void(unsigned int)> Callback_t
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
A RAII object that calls RLoopManager::CleanUpTask at destruction.