50#include <unordered_map>
64static std::string &GetCodeToJit()
66 static std::string code;
70static bool ContainsLeaf(
const std::set<TLeaf *> &leaves,
TLeaf *leaf)
72 return (leaves.find(leaf) != leaves.end());
78static void InsertBranchName(std::set<std::string> &bNamesReg,
ColumnNames_t &bNames,
const std::string &branchName,
79 const std::string &friendName,
bool allowDuplicates)
81 if (!friendName.empty()) {
83 const auto friendBName = friendName +
"." + branchName;
84 if (bNamesReg.insert(friendBName).second)
85 bNames.push_back(friendBName);
88 if (allowDuplicates || friendName.empty()) {
89 if (bNamesReg.insert(branchName).second)
90 bNames.push_back(branchName);
96static void InsertBranchName(std::set<std::string> &bNamesReg,
ColumnNames_t &bNames,
const std::string &branchName,
97 const std::string &friendName, std::set<TLeaf *> &foundLeaves,
TLeaf *leaf,
100 const bool canAdd = allowDuplicates ? true : !ContainsLeaf(foundLeaves, leaf);
105 InsertBranchName(bNamesReg, bNames, branchName, friendName, allowDuplicates);
107 foundLeaves.insert(leaf);
111 std::string prefix, std::string &friendName,
bool allowDuplicates)
113 for (
auto sb : *
b->GetListOfBranches()) {
115 auto subBranchName = std::string(subBranch->
GetName());
116 auto fullName = prefix + subBranchName;
118 std::string newPrefix;
120 newPrefix = fullName +
".";
122 ExploreBranch(t, bNamesReg, bNames, subBranch, newPrefix, friendName, allowDuplicates);
124 auto branchDirectlyFromTree = t.
GetBranch(fullName.c_str());
125 if (!branchDirectlyFromTree)
126 branchDirectlyFromTree = t.
FindBranch(fullName.c_str());
127 if (branchDirectlyFromTree)
128 InsertBranchName(bNamesReg, bNames, std::string(branchDirectlyFromTree->GetFullName()), friendName,
132 InsertBranchName(bNamesReg, bNames, subBranchName, friendName, allowDuplicates);
136static void GetBranchNamesImpl(
TTree &t, std::set<std::string> &bNamesReg,
ColumnNames_t &bNames,
137 std::set<TTree *> &analysedTrees, std::string &friendName,
bool allowDuplicates)
139 std::set<TLeaf *> foundLeaves;
140 if (!analysedTrees.insert(&t).second) {
149 std::string err(
"GetBranchNames: error in opening the tree ");
151 throw std::runtime_error(err);
154 for (
auto b : *branches) {
156 const auto branchName = std::string(branch->
GetName());
160 if (listOfLeaves->GetEntriesUnsafe() == 1) {
161 auto leaf =
static_cast<TLeaf *
>(listOfLeaves->UncheckedAt(0));
162 InsertBranchName(bNamesReg, bNames, branchName, friendName, foundLeaves, leaf, allowDuplicates);
165 for (
auto leaf : *listOfLeaves) {
166 auto castLeaf =
static_cast<TLeaf *
>(leaf);
167 const auto leafName = std::string(leaf->
GetName());
168 const auto fullName = branchName +
"." + leafName;
169 InsertBranchName(bNamesReg, bNames, fullName, friendName, foundLeaves, castLeaf, allowDuplicates);
173 ExploreBranch(t, bNamesReg, bNames, branch, branchName +
".", friendName, allowDuplicates);
174 InsertBranchName(bNamesReg, bNames, branchName, friendName, allowDuplicates);
179 bool dotIsImplied =
false;
182 throw std::runtime_error(
"GetBranchNames: unsupported branch type");
184 if (be->GetType() == 3 || be->GetType() == 4)
187 if (dotIsImplied || branchName.back() ==
'.')
188 ExploreBranch(t, bNamesReg, bNames, branch,
"", friendName, allowDuplicates);
190 ExploreBranch(t, bNamesReg, bNames, branch, branchName +
".", friendName, allowDuplicates);
192 InsertBranchName(bNamesReg, bNames, branchName, friendName, allowDuplicates);
205 for (
auto friendTreeObj : *friendTrees) {
210 if (alias !=
nullptr)
211 frName = std::string(alias);
213 frName = std::string(friendTree->GetName());
215 GetBranchNamesImpl(*friendTree, bNamesReg, bNames, analysedTrees, frName, allowDuplicates);
219static void ThrowIfNSlotsChanged(
unsigned int nSlots)
222 if (currentSlots != nSlots) {
223 std::string msg =
"RLoopManager::Run: when the RDataFrame was constructed the number of slots required was " +
224 std::to_string(nSlots) +
", but when starting the event loop it was " +
225 std::to_string(currentSlots) +
".";
226 if (currentSlots > nSlots)
227 msg +=
" Maybe EnableImplicitMT() was called after the RDataFrame was constructed?";
229 msg +=
" Maybe DisableImplicitMT() was called after the RDataFrame was constructed?";
230 throw std::runtime_error(msg);
242struct MaxTreeSizeRAII {
245 MaxTreeSizeRAII() : fOldMaxTreeSize(
TTree::GetMaxTreeSize())
253struct DatasetLogInfo {
254 std::string fDataSet;
260std::string LogRangeProcessing(
const DatasetLogInfo &info)
262 std::stringstream msg;
263 msg <<
"Processing " << info.fDataSet <<
": entry range [" << info.fRangeStart <<
"," << info.fRangeEnd - 1
264 <<
"], using slot " << info.fSlot <<
" in thread " << std::this_thread::get_id() <<
'.';
268DatasetLogInfo TreeDatasetLogInfo(
const TTreeReader &
r,
unsigned int slot)
270 const auto tree =
r.GetTree();
271 const auto chain =
dynamic_cast<TChain *
>(
tree);
274 auto files = chain->GetListOfFiles();
275 std::vector<std::string> treeNames;
276 std::vector<std::string> fileNames;
278 treeNames.emplace_back(
f->GetName());
279 fileNames.emplace_back(
f->GetTitle());
282 for (
const auto &t : treeNames) {
286 what +=
" in files {";
287 for (
const auto &
f : fileNames) {
292 assert(
tree !=
nullptr);
293 const auto treeName =
tree->GetName();
294 what = std::string(
"tree \"") + treeName +
"\"";
295 const auto file =
tree->GetCurrentFile();
297 what += std::string(
" in file \"") +
file->GetName() +
"\"";
299 const auto entryRange =
r.GetEntriesRange();
300 const ULong64_t end = entryRange.second == -1ll ?
tree->GetEntries() : entryRange.second;
301 return {std::move(
what),
static_cast<ULong64_t>(entryRange.first), end, slot};
331 std::set<std::string> bNamesSet;
333 std::set<TTree *> analysedTrees;
334 std::string emptyFrName =
"";
335 GetBranchNamesImpl(t, bNamesSet, bNames, analysedTrees, emptyFrName, allowDuplicates);
340 : fTree(std::shared_ptr<
TTree>(
tree, [](
TTree *) {})), fDefaultColumns(defaultBranches),
343 fNewSampleNotifier(fNSlots), fSampleInfos(fNSlots)
350 fSampleInfos(fNSlots)
357 fDataSource(std::move(ds)), fNewSampleNotifier(fNSlots), fSampleInfos(fNSlots)
378 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
386 entryRanges.emplace_back(start, end);
391 auto genFunction = [
this, &slotStack](
const std::pair<ULong64_t, ULong64_t> &range) {
393 auto slot = slotRAII.
fSlot;
399 for (
auto currEntry = range.first; currEntry < range.second; ++currEntry) {
404 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
410 pool.
Foreach(genFunction, entryRanges);
427 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
438 auto tp = std::make_unique<ROOT::TTreeProcessorMT>(*
fTree, entryList,
fNSlots);
440 std::atomic<ULong64_t> entryCount(0ull);
442 tp->Process([
this, &slotStack, &entryCount](
TTreeReader &
r) ->
void {
444 auto slot = slotRAII.
fSlot;
448 const auto entryRange =
r.GetEntriesRange();
449 const auto nEntries = entryRange.second - entryRange.first;
450 auto count = entryCount.fetch_add(nEntries);
460 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
467 throw std::runtime_error(
"An error was encountered while processing the data. TTreeReader status code is: " +
468 std::to_string(
r.GetEntryStatus()));
478 if (0 ==
fTree->GetEntriesFast())
494 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
499 throw std::runtime_error(
"An error was encountered while processing the data. TTreeReader status code is: " +
500 std::to_string(
r.GetEntryStatus()));
515 for (
const auto &range : ranges) {
516 const auto start = range.first;
517 const auto end = range.second;
526 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
544 auto runOnRange = [
this, &slotStack](
const std::pair<ULong64_t, ULong64_t> &range) {
546 const auto slot = slotRAII.
fSlot;
550 const auto start = range.first;
551 const auto end = range.second;
554 for (
auto entry = start; entry < end; ++entry) {
560 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
568 while (!ranges.empty()) {
569 pool.
Foreach(runOnRange, ranges);
589 actionPtr->Run(slot, entry);
591 namedFilterPtr->CheckFilters(slot, entry);
603 ptr->InitSlot(
r, slot);
605 ptr->InitSlot(
r, slot);
607 ptr->InitSlot(
r, slot);
609 ptr->InitSlot(
r, slot);
631 "Empty source, range: {" + std::to_string(range.first) +
", " + std::to_string(range.second) +
"}", range);
636 auto *
tree =
r.GetTree()->GetTree();
639 auto *
file =
tree->GetCurrentFile();
640 const std::string fname =
file !=
nullptr ?
file->GetName() :
"#inmemorytree#";
643 std::pair<Long64_t, Long64_t> range =
r.GetEntriesRange();
645 if (range.second == -1) {
646 range.second =
tree->GetEntries();
683 ptr->ResetChildrenCount();
685 ptr->ResetChildrenCount();
698 ptr->FinalizeSlot(slot);
700 ptr->FinalizeSlot(slot);
702 ptr->FinalizeSlot(slot);
712 const std::string code = std::move(GetCodeToJit());
723 << (
s.RealTime() > 1
e-3 ?
" in " + std::to_string(
s.RealTime()) +
" seconds." :
".");
735 actionPtr->TriggerChildrenCount();
737 namedFilterPtr->TriggerChildrenCount();
745 MaxTreeSizeRAII ctxtmts;
772 <<
s.RealTime() <<
"s elapsed).";
852 fPtr->FillReport(rep);
858 GetCodeToJit().append(code);
863 if (everyNEvents == 0ull)
871 std::vector<std::string>
filters;
873 auto name = (filter->HasName() ? filter->GetName() :
"Unnamed Filter");
896 std::unordered_map<
void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &visitedMap)
899 auto duplicateRLoopManagerIt = visitedMap.find((
void *)
this);
900 if (duplicateRLoopManagerIt != visitedMap.end())
901 return duplicateRLoopManagerIt->second;
911 auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(
913 visitedMap[(
void *)
this] = thisNode;
unsigned long long ULong64_t
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t b
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
R__EXTERN TVirtualMutex * gROOTMutex
#define R__LOCKGUARD(mutex)
The head node of a RDF computation graph.
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
void AddDSValuePtrs(const std::string &col, const std::vector< void * > ptrs)
const ColumnNames_t & GetBranchNames()
Return all valid TTree::Branch names (caching results for subsequent calls).
void ToJitExec(const std::string &) const
void AddSampleCallback(ROOT::RDF::SampleCallback_t &&callback)
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
bool fMustRunNamedFilters
std::shared_ptr< TTree > fTree
Shared pointer to the input TTree.
const ULong64_t fNEmptyEntries
std::vector< RDefineBase * > fBookedDefines
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
void Run()
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::vector< RRangeBase * > fBookedRanges
std::vector< ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
std::vector< RFilterBase * > fBookedFilters
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
std::vector< RDFInternal::RCallback > fCallbacks
Registered callbacks.
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directly, always use GetBranchNames().
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::map< std::string, std::vector< void * > > fDSValuePtrMap
Registry of per-slot value pointers for booked data-source columns.
const unsigned int fNSlots
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RDFInternal::RVariationBase * > fBookedVariations
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager. This includes Filters and Ranges but not Defines.
unsigned int GetNSlots() const
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
bool HasDSValuePtrs(const std::string &col) const
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
const std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object. Null if no data-source.
RDFInternal::RNewSampleNotifier fNewSampleNotifier
const ColumnNames_t fDefaultColumns
void Book(RDFInternal::RActionBase *actionPtr)
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes. This method loops over all filters, actions and other booked ob...
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph(std::unordered_map< void *, std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > > &visitedMap)
void Deregister(RDFInternal::RActionBase *actionPtr)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
unsigned int fNStopsReceived
Number of times that a child node signaled to stop processing entries.
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
void SetFlag(unsigned int slot)
bool CheckFlag(unsigned int slot) const
void UnsetFlag(unsigned int slot)
TNotifyLink< RNewSampleFlag > & GetChainNotifyLink(unsigned int slot)
This is a helper class that allows picking a slot by means of a map indexed by thread ids.
void ReturnSlot(unsigned int slotNumber)
This type includes all parts of RVariation that do not depend on the callable signature.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
This class provides a simple interface to execute the same task multiple times in parallel threads,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
A Branch for the case of an object.
A TTree is a list of TBranches.
TClass * IsA() const override
TObjArray * GetListOfLeaves()
A chain is a collection of files containing TTree objects.
A List of entry numbers in a TTree or TChain.
A TFriendElement TF describes a TTree object TF in a file.
A TLeaf describes individual elements of a TBranch See TBranch structure in TTree.
const char * GetName() const override
Returns name of object.
Mother of all ROOT objects.
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
@ kEntryBeyondEnd
last entry loop has reached its end
A TTree represents a columnar dataset.
virtual TBranch * FindBranch(const char *name)
Return the branch that corresponds to the path 'branchname', which can include the name of the tree or...
virtual TBranch * GetBranch(const char *name)
Return pointer to the branch with the given name in this tree or its friends.
static void SetMaxTreeSize(Long64_t maxsize=100000000000LL)
Set the maximum size in bytes of a Tree file (static function).
virtual TObjArray * GetListOfBranches()
virtual TTree * GetTree() const
virtual TList * GetListOfFriends() const
virtual const char * GetFriendAlias(TTree *) const
If the 'tree' is a friend, this method returns its alias name.
std::vector< std::string > GetTreeFullPaths(const TTree &tree)
Retrieve the full path(s) to a TTree or the trees in a TChain.
RVec< PromoteTypes< T0, T1 > > remainder(const T0 &x, const RVec< T1 > &v)
ROOT::Experimental::RLogChannel & RDFLogChannel()
std::vector< std::string > GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
Long64_t InterpreterCalc(const std::string &code, const std::string &context="")
Jit code in the interpreter with TInterpreter::Calc, throw in case of errors.
std::vector< std::string > ColumnNames_t
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with a RDataFrame computation graph via e....
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
This file contains a specialised ROOT message handler to test for diagnostics in unit tests.
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
static constexpr double s
A RAII object that calls RLoopManager::CleanUpTask at destruction.
RCallCleanUpTask(RLoopManager &lm, unsigned int arg=0u, TTreeReader *reader=nullptr)
RLoopManager & fLoopManager
RSlotRAII(RSlotStack &slotStack)