48#include <unordered_map>
62static std::string &GetCodeToJit()
64 static std::string code;
68static bool ContainsLeaf(
const std::set<TLeaf *> &leaves,
TLeaf *leaf)
70 return (leaves.find(leaf) != leaves.end());
76static void InsertBranchName(std::set<std::string> &bNamesReg,
ColumnNames_t &bNames,
const std::string &branchName,
77 const std::string &friendName,
bool allowDuplicates)
79 if (!friendName.empty()) {
81 const auto friendBName = friendName +
"." + branchName;
82 if (bNamesReg.insert(friendBName).second)
83 bNames.push_back(friendBName);
86 if (allowDuplicates || friendName.empty()) {
87 if (bNamesReg.insert(branchName).second)
88 bNames.push_back(branchName);
94static void InsertBranchName(std::set<std::string> &bNamesReg,
ColumnNames_t &bNames,
const std::string &branchName,
95 const std::string &friendName, std::set<TLeaf *> &foundLeaves,
TLeaf *leaf,
98 const bool canAdd = allowDuplicates ? true : !ContainsLeaf(foundLeaves, leaf);
103 InsertBranchName(bNamesReg, bNames, branchName, friendName, allowDuplicates);
105 foundLeaves.insert(leaf);
109 std::string prefix, std::string &friendName,
bool allowDuplicates)
111 for (
auto sb : *
b->GetListOfBranches()) {
113 auto subBranchName = std::string(subBranch->
GetName());
114 auto fullName = prefix + subBranchName;
116 std::string newPrefix;
118 newPrefix = fullName +
".";
120 ExploreBranch(t, bNamesReg, bNames, subBranch, newPrefix, friendName, allowDuplicates);
122 auto branchDirectlyFromTree = t.
GetBranch(fullName.c_str());
123 if (!branchDirectlyFromTree)
124 branchDirectlyFromTree = t.
FindBranch(fullName.c_str());
125 if (branchDirectlyFromTree)
126 InsertBranchName(bNamesReg, bNames, std::string(branchDirectlyFromTree->GetFullName()), friendName,
129 if (bNamesReg.find(subBranchName) == bNamesReg.end() && t.
GetBranch(subBranchName.c_str()))
130 InsertBranchName(bNamesReg, bNames, subBranchName, friendName, allowDuplicates);
134static void GetBranchNamesImpl(
TTree &t, std::set<std::string> &bNamesReg,
ColumnNames_t &bNames,
135 std::set<TTree *> &analysedTrees, std::string &friendName,
bool allowDuplicates)
137 std::set<TLeaf *> foundLeaves;
138 if (!analysedTrees.insert(&t).second) {
147 std::string err(
"GetBranchNames: error in opening the tree ");
149 throw std::runtime_error(err);
152 for (
auto b : *branches) {
154 const auto branchName = std::string(branch->
GetName());
158 if (listOfLeaves->GetEntriesUnsafe() == 1) {
159 auto leaf =
static_cast<TLeaf *
>(listOfLeaves->UncheckedAt(0));
160 InsertBranchName(bNamesReg, bNames, branchName, friendName, foundLeaves, leaf, allowDuplicates);
163 for (
auto leaf : *listOfLeaves) {
164 auto castLeaf =
static_cast<TLeaf *
>(leaf);
165 const auto leafName = std::string(leaf->
GetName());
166 const auto fullName = branchName +
"." + leafName;
167 InsertBranchName(bNamesReg, bNames, fullName, friendName, foundLeaves, castLeaf, allowDuplicates);
171 ExploreBranch(t, bNamesReg, bNames, branch, branchName +
".", friendName, allowDuplicates);
172 InsertBranchName(bNamesReg, bNames, branchName, friendName, allowDuplicates);
177 bool dotIsImplied =
false;
180 throw std::runtime_error(
"GetBranchNames: unsupported branch type");
182 if (be->GetType() == 3 || be->GetType() == 4)
185 if (dotIsImplied || branchName.back() ==
'.')
186 ExploreBranch(t, bNamesReg, bNames, branch,
"", friendName, allowDuplicates);
188 ExploreBranch(t, bNamesReg, bNames, branch, branchName +
".", friendName, allowDuplicates);
190 InsertBranchName(bNamesReg, bNames, branchName, friendName, allowDuplicates);
203 for (
auto friendTreeObj : *friendTrees) {
208 if (alias !=
nullptr)
209 frName = std::string(alias);
211 frName = std::string(friendTree->GetName());
213 GetBranchNamesImpl(*friendTree, bNamesReg, bNames, analysedTrees, frName, allowDuplicates);
217static void ThrowIfNSlotsChanged(
unsigned int nSlots)
220 if (currentSlots != nSlots) {
221 std::string msg =
"RLoopManager::Run: when the RDataFrame was constructed the number of slots required was " +
222 std::to_string(nSlots) +
", but when starting the event loop it was " +
223 std::to_string(currentSlots) +
".";
224 if (currentSlots > nSlots)
225 msg +=
" Maybe EnableImplicitMT() was called after the RDataFrame was constructed?";
227 msg +=
" Maybe DisableImplicitMT() was called after the RDataFrame was constructed?";
228 throw std::runtime_error(msg);
240struct MaxTreeSizeRAII {
243 MaxTreeSizeRAII() : fOldMaxTreeSize(
TTree::GetMaxTreeSize())
251struct DatasetLogInfo {
252 std::string fDataSet;
258std::string LogRangeProcessing(
const DatasetLogInfo &info)
260 std::stringstream msg;
261 msg <<
"Processing " << info.fDataSet <<
": entry range [" << info.fRangeStart <<
"," << info.fRangeEnd - 1
262 <<
"], using slot " << info.fSlot <<
" in thread " << std::this_thread::get_id() <<
'.';
266DatasetLogInfo TreeDatasetLogInfo(
const TTreeReader &
r,
unsigned int slot)
268 const auto tree =
r.GetTree();
269 const auto chain =
dynamic_cast<TChain *
>(
tree);
273 std::vector<std::string> treeNames;
274 std::vector<std::string> fileNames;
276 treeNames.emplace_back(
f->GetName());
277 fileNames.emplace_back(
f->GetTitle());
280 for (
const auto &t : treeNames) {
284 what +=
" in files {";
285 for (
const auto &
f : fileNames) {
290 assert(
tree !=
nullptr);
291 const auto treeName =
tree->GetName();
292 what = std::string(
"tree \"") + treeName +
"\"";
293 const auto file =
tree->GetCurrentFile();
295 what += std::string(
" in file \"") +
file->GetName() +
"\"";
297 const auto entryRange =
r.GetEntriesRange();
298 const ULong64_t end = entryRange.second == -1ll ?
tree->GetEntries() : entryRange.second;
299 return {std::move(
what),
static_cast<ULong64_t>(entryRange.first), end, slot};
302static auto MakeDatasetColReadersKey(
const std::string &colName,
const std::type_info &ti)
308 return colName +
':' + ti.name();
337 std::set<std::string> bNamesSet;
339 std::set<TTree *> analysedTrees;
340 std::string emptyFrName =
"";
341 GetBranchNamesImpl(t, bNamesSet, bNames, analysedTrees, emptyFrName, allowDuplicates);
346 : fTree(std::shared_ptr<
TTree>(
tree, [](
TTree *) {})), fDefaultColumns(defaultBranches),
349 fNewSampleNotifier(fNSlots), fSampleInfos(fNSlots), fDatasetColumnReaders(fNSlots)
354 : fEmptyEntryRange(0, nEmptyEntries),
357 fNewSampleNotifier(fNSlots),
358 fSampleInfos(fNSlots),
359 fDatasetColumnReaders(fNSlots)
366 fDataSource(std::move(ds)), fNewSampleNotifier(fNSlots), fSampleInfos(fNSlots), fDatasetColumnReaders(fNSlots)
374 fNewSampleNotifier(fNSlots),
375 fSampleInfos(fNSlots),
376 fDatasetColumnReaders(fNSlots)
407 const auto &trees = sample.GetTreeNames();
408 const auto &files = sample.GetFileNameGlobs();
409 for (std::size_t i = 0ul; i < files.size(); ++i) {
412 const auto fullpath = files[i] +
"?#" + trees[i];
413 chain->Add(fullpath.c_str());
417 const auto sampleId = files[i] +
'/' + trees[i];
424 const auto &friendInfo = spec.GetFriendInfo();
426 for (std::size_t i = 0ul; i <
fFriends.size(); i++) {
427 const auto &thisFriendAlias = friendInfo.fFriendNames[i].second;
428 fTree->AddFriend(
fFriends[i].get(), thisFriendAlias.c_str());
440 const auto nEntriesPerSlot = nEmptyEntries / (
fNSlots * 2);
441 auto remainder = nEmptyEntries % (
fNSlots * 2);
442 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
450 entryRanges.emplace_back(begin, end);
455 auto genFunction = [
this, &slotStack](
const std::pair<ULong64_t, ULong64_t> &range) {
457 auto slot = slotRAII.
fSlot;
463 for (
auto currEntry = range.first; currEntry < range.second; ++currEntry) {
468 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
474 pool.
Foreach(genFunction, entryRanges);
493 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
508 : std::make_unique<ROOT::TTreeProcessorMT>(*
fTree, entryList,
fNSlots);
510 std::atomic<ULong64_t> entryCount(0ull);
512 tp->Process([
this, &slotStack, &entryCount](
TTreeReader &
r) ->
void {
514 auto slot = slotRAII.
fSlot;
518 const auto entryRange =
r.GetEntriesRange();
519 const auto nEntries = entryRange.second - entryRange.first;
520 auto count = entryCount.fetch_add(nEntries);
530 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
537 throw std::runtime_error(
"An error was encountered while processing the data. TTreeReader status code is: " +
538 std::to_string(
r.GetEntryStatus()));
555 throw std::logic_error(
"Something went wrong in initializing the TTreeReader.");
571 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
576 throw std::runtime_error(
"An error was encountered while processing the data. TTreeReader status code is: " +
577 std::to_string(
r.GetEntryStatus()));
592 for (
const auto &range : ranges) {
593 const auto start = range.first;
594 const auto end = range.second;
603 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
621 auto runOnRange = [
this, &slotStack](
const std::pair<ULong64_t, ULong64_t> &range) {
623 const auto slot = slotRAII.
fSlot;
627 const auto start = range.first;
628 const auto end = range.second;
631 for (
auto entry = start; entry < end; ++entry) {
637 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
645 while (!ranges.empty()) {
646 pool.
Foreach(runOnRange, ranges);
665 actionPtr->Run(slot, entry);
667 namedFilterPtr->CheckFilters(slot, entry);
679 ptr->InitSlot(
r, slot);
681 ptr->InitSlot(
r, slot);
683 ptr->InitSlot(
r, slot);
685 ptr->InitSlot(
r, slot);
707 "Empty source, range: {" + std::to_string(range.first) +
", " + std::to_string(range.second) +
"}", range);
712 auto *
tree =
r.GetTree()->GetTree();
715 auto *
file =
tree->GetCurrentFile();
716 const std::string fname =
file !=
nullptr ?
file->GetName() :
"#inmemorytree#";
718 std::pair<Long64_t, Long64_t> range =
r.GetEntriesRange();
720 if (range.second == -1) {
721 range.second =
tree->GetEntries();
723 const std::string &
id = fname +
'/' + treename;
758 ptr->ResetChildrenCount();
760 ptr->ResetChildrenCount();
772 ptr->FinalizeSlot(slot);
774 ptr->FinalizeSlot(slot);
776 ptr->FinalizeSlot(slot);
793 const std::string code = std::move(GetCodeToJit());
805 :
" in less than 1ms.");
817 actionPtr->TriggerChildrenCount();
819 namedFilterPtr->TriggerChildrenCount();
828 MaxTreeSizeRAII ctxtmts;
842 class NodesCleanerRAII {
846 NodesCleanerRAII(
RLoopManager &thisRLM) : fRLM(thisRLM) {}
850 NodesCleanerRAII runKeeper(*
this);
951 fPtr->FillReport(rep);
966 GetCodeToJit().append(code);
971 if (everyNEvents == 0ull)
979 std::vector<std::string>
filters;
981 auto name = (filter->HasName() ? filter->GetName() :
"Unnamed Filter");
1004 std::unordered_map<
void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &visitedMap)
1007 auto duplicateRLoopManagerIt = visitedMap.find((
void *)
this);
1008 if (duplicateRLoopManagerIt != visitedMap.end())
1009 return duplicateRLoopManagerIt->second;
1019 auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(
1021 visitedMap[(
void *)
this] = thisNode;
1039 const auto key = MakeDatasetColReadersKey(col, ti);
1047 std::vector<std::unique_ptr<RColumnReaderBase>> &&readers,
1048 const std::type_info &ti)
1050 const auto key = MakeDatasetColReadersKey(col, ti);
1052 assert(readers.size() ==
fNSlots);
1054 for (
auto slot = 0u; slot <
fNSlots; ++slot) {
1063 std::unique_ptr<RColumnReaderBase> &&reader,
1064 const std::type_info &ti)
1067 const auto key = MakeDatasetColReadersKey(col, ti);
1069 assert(readers.find(key) == readers.end() || readers[key] ==
nullptr);
1070 auto *rptr = reader.get();
1071 readers[key] = std::move(reader);
1078 const auto key = MakeDatasetColReadersKey(col, ti);
1081 return it->second.get();
#define R__LOG_DEBUG(DEBUGLEVEL,...)
unsigned long long ULong64_t
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
R__EXTERN TVirtualMutex * gROOTMutex
#define R__LOCKGUARD(mutex)
The head node of a RDF computation graph.
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void SetEmptyEntryRange(std::pair< ULong64_t, ULong64_t > &&newRange)
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
ULong64_t GetNEmptyEntries() const
std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > fSampleMap
Keys are fname + "/" + treename as RSampleInfo::fID; Values are pointers to the corresponding sample.
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph(std::unordered_map< void *, std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > > &visitedMap) final
const ColumnNames_t & GetBranchNames()
Return all valid TTree::Branch names (caching results for subsequent calls).
void ToJitExec(const std::string &) const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
bool fMustRunNamedFilters
void ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec)
Changes the internal TTree held by the RLoopManager.
void SetTree(std::shared_ptr< TTree > tree)
std::shared_ptr< TTree > fTree
Shared pointer to the input TTree.
std::vector< RDefineBase * > fBookedDefines
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
ROOT::Internal::TreeUtils::RNoCleanupNotifier fNoCleanupNotifier
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
RColumnReaderBase * GetDatasetColumnReader(unsigned int slot, const std::string &col, const std::type_info &ti) const
std::vector< RRangeBase * > fBookedRanges
std::vector< ROOT::RDF::Experimental::RSample > fSamples
Samples need to survive throughout the whole event loop, hence stored as an attribute.
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
std::vector< RFilterBase * > fBookedFilters
void Run(bool jit=true)
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::unordered_map< void *, ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
RColumnReaderBase * AddTreeColumnReader(unsigned int slot, const std::string &col, std::unique_ptr< RColumnReaderBase > &&reader, const std::type_info &ti)
Register a new RTreeColumnReader with this RLoopManager.
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void AddDataSourceColumnReaders(const std::string &col, std::vector< std::unique_ptr< RColumnReaderBase > > &&readers, const std::type_info &ti)
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directly, always use GetBranchNames().
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::vector< RDFInternal::RCallback > fCallbacksEveryNEvents
Registered callbacks to be executed every N events.
std::vector< std::unordered_map< std::string, std::unique_ptr< RColumnReaderBase > > > fDatasetColumnReaders
Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
const unsigned int fNSlots
void Register(RDFInternal::RActionBase *actionPtr)
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RDFInternal::RVariationBase * > fBookedVariations
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager. This includes Filters and Ranges but not Defines.
unsigned int GetNSlots() const
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
const std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object.
RDFInternal::RNewSampleNotifier fNewSampleNotifier
std::pair< ULong64_t, ULong64_t > fEmptyEntryRange
Range of entries created when no data source is specified.
const ColumnNames_t fDefaultColumns
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes. This method loops over all filters, actions and other booked ob...
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
void Deregister(RDFInternal::RActionBase *actionPtr)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
std::vector< std::unique_ptr< TChain > > fFriends
Friends of the fTree. Only used if we constructed fTree ourselves.
bool HasDataSourceColumnReaders(const std::string &col, const std::type_info &ti) const
Return true if AddDataSourceColumnReaders was called for column name col.
unsigned int fNStopsReceived
Number of times that a child node signaled to stop processing entries.
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
virtual ROOT::RDF::SampleCallback_t GetSampleCallback()=0
void SetFlag(unsigned int slot)
bool CheckFlag(unsigned int slot) const
void UnsetFlag(unsigned int slot)
TNotifyLink< RNewSampleFlag > & GetChainNotifyLink(unsigned int slot)
This type includes all parts of RVariation that do not depend on the callable signature.
A thread-safe stack of N indexes (0 to size - 1).
void RegisterChain(TChain &c)
A dataset specification for RDataFrame.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
This class provides a simple interface to execute the same task multiple times in parallel threads,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
A Branch for the case of an object.
A TTree is a list of TBranches.
TClass * IsA() const override
TObjArray * GetListOfLeaves()
A chain is a collection of files containing TTree objects.
TObjArray * GetListOfFiles() const
A List of entry numbers in a TTree or TChain.
A TFriendElement TF describes a TTree object TF in a file.
A TLeaf describes individual elements of a TBranch See TBranch structure in TTree.
const char * GetName() const override
Returns name of object.
Mother of all ROOT objects.
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
@ kEntryBeyondEnd
last entry loop has reached its end
@ kEntryValid
data read okay
A TTree represents a columnar dataset.
virtual TBranch * FindBranch(const char *name)
Return the branch that corresponds to the path 'branchname', which can include the name of the tree or...
virtual TBranch * GetBranch(const char *name)
Return pointer to the branch with the given name in this tree or its friends.
static void SetMaxTreeSize(Long64_t maxsize=100000000000LL)
Set the maximum size in bytes of a Tree file (static function).
virtual TObjArray * GetListOfBranches()
virtual TTree * GetTree() const
virtual TList * GetListOfFriends() const
virtual const char * GetFriendAlias(TTree *) const
If the 'tree' is a friend, this method returns its alias name.
ROOT::Experimental::RLogChannel & RDFLogChannel()
std::vector< std::string > GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
Long64_t InterpreterCalc(const std::string &code, const std::string &context="")
Jit code in the interpreter with TInterpreter::Calc, throw in case of errors.
std::vector< std::string > GetTreeFullPaths(const TTree &tree)
std::unique_ptr< TChain > MakeChainForMT(const std::string &name="", const std::string &title="")
Create a TChain object with options that avoid common causes of thread contention.
std::vector< std::unique_ptr< TChain > > MakeFriends(const ROOT::TreeUtils::RFriendInfo &finfo)
Create friends from the main TTree.
std::vector< std::string > ColumnNames_t
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with a RDataFrame computation graph via e....
This file contains a specialised ROOT message handler to test for diagnostics in unit tests.
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
A RAII object that calls RLoopManager::CleanUpTask at destruction.
RCallCleanUpTask(RLoopManager &lm, unsigned int arg=0u, TTreeReader *reader=nullptr)
RLoopManager & fLoopManager
A RAII object to pop and push slot numbers from a RSlotStack object.