Logo ROOT  
Reference Guide
RLoopManager.cxx
Go to the documentation of this file.
1#include "RConfigure.h" // R__USE_IMT
8#include "RtypesCore.h" // Long64_t
9#include "TBranchElement.h"
10#include "TBranchObject.h"
11#include "TEntryList.h"
12#include "TFriendElement.h"
13#include "TInterpreter.h"
14#include "TROOT.h" // IsImplicitMTEnabled
15#include "TTreeReader.h"
16#include "TTree.h" // For MaxTreeSizeRAII. Revert when #6640 will be solved.
17
18#ifdef R__USE_IMT
21#endif
22
23#include <atomic>
24#include <exception>
25#include <functional>
26#include <iostream>
27#include <memory>
28#include <stdexcept>
29#include <string>
30#include <unordered_map>
31#include <vector>
32#include <set>
33#include <limits> // For MaxTreeSizeRAII. Revert when #6640 will be solved.
34
35using namespace ROOT::Detail::RDF;
36using namespace ROOT::Internal::RDF;
37
38namespace {
/// Accessor for the process-wide buffer of RDF code awaiting just-in-time compilation.
/// All RLoopManager instances append to the same string, so that the accumulated code
/// can be handed to cling in a single pass rather than once per RLoopManager, which is
/// potentially much faster.
/// \return A mutable reference to the shared buffer.
static std::string &GetCodeToJit()
{
   static std::string sPendingCode{};
   return sPendingCode;
}
49
50static bool ContainsLeaf(const std::set<TLeaf *> &leaves, TLeaf *leaf)
51{
52 return (leaves.find(leaf) != leaves.end());
53}
54
55///////////////////////////////////////////////////////////////////////////////
56/// This overload does not perform any check on the duplicates.
57/// It is used for TBranch objects.
58static void UpdateList(std::set<std::string> &bNamesReg, ColumnNames_t &bNames, const std::string &branchName,
59 const std::string &friendName)
60{
61
62 if (!friendName.empty()) {
63 // In case of a friend tree, users might prepend its name/alias to the branch names
64 const auto friendBName = friendName + "." + branchName;
65 if (bNamesReg.insert(friendBName).second)
66 bNames.push_back(friendBName);
67 }
68
69 if (bNamesReg.insert(branchName).second)
70 bNames.push_back(branchName);
71}
72
73///////////////////////////////////////////////////////////////////////////////
74/// This overloads makes sure that the TLeaf has not been already inserted.
75static void UpdateList(std::set<std::string> &bNamesReg, ColumnNames_t &bNames, const std::string &branchName,
76 const std::string &friendName, std::set<TLeaf *> &foundLeaves, TLeaf *leaf, bool allowDuplicates)
77{
78 const bool canAdd = allowDuplicates ? true : !ContainsLeaf(foundLeaves, leaf);
79 if (!canAdd) {
80 return;
81 }
82
83 UpdateList(bNamesReg, bNames, branchName, friendName);
84
85 foundLeaves.insert(leaf);
86}
87
88static void ExploreBranch(TTree &t, std::set<std::string> &bNamesReg, ColumnNames_t &bNames, TBranch *b,
89 std::string prefix, std::string &friendName)
90{
91 for (auto sb : *b->GetListOfBranches()) {
92 TBranch *subBranch = static_cast<TBranch *>(sb);
93 auto subBranchName = std::string(subBranch->GetName());
94 auto fullName = prefix + subBranchName;
95
96 std::string newPrefix;
97 if (!prefix.empty())
98 newPrefix = fullName + ".";
99
100 ExploreBranch(t, bNamesReg, bNames, subBranch, newPrefix, friendName);
101
102 if (t.GetBranch(fullName.c_str()) || t.FindBranch(fullName.c_str()))
103 UpdateList(bNamesReg, bNames, fullName, friendName);
104
105 if (t.GetBranch(subBranchName.c_str()))
106 UpdateList(bNamesReg, bNames, subBranchName, friendName);
107 }
108}
109
110static void GetBranchNamesImpl(TTree &t, std::set<std::string> &bNamesReg, ColumnNames_t &bNames,
111 std::set<TTree *> &analysedTrees, std::string &friendName, bool allowDuplicates)
112{
113 std::set<TLeaf *> foundLeaves;
114 if (!analysedTrees.insert(&t).second) {
115 return;
116 }
117
118 const auto branches = t.GetListOfBranches();
119 // Getting the branches here triggered the read of the first file of the chain if t is a chain.
120 // We check if a tree has been successfully read, otherwise we throw (see ROOT-9984) to avoid further
121 // operations
122 if (!t.GetTree()) {
123 std::string err("GetBranchNames: error in opening the tree ");
124 err += t.GetName();
125 throw std::runtime_error(err);
126 }
127 if (branches) {
128 for (auto b : *branches) {
129 TBranch *branch = static_cast<TBranch *>(b);
130 const auto branchName = std::string(branch->GetName());
131 if (branch->IsA() == TBranch::Class()) {
132 // Leaf list
133 auto listOfLeaves = branch->GetListOfLeaves();
134 if (listOfLeaves->GetEntries() == 1) {
135 auto leaf = static_cast<TLeaf *>(listOfLeaves->At(0));
136 const auto leafName = std::string(leaf->GetName());
137 if (leafName == branchName) {
138 UpdateList(bNamesReg, bNames, branchName, friendName, foundLeaves, leaf, allowDuplicates);
139 }
140 }
141
142 for (auto leaf : *listOfLeaves) {
143 auto castLeaf = static_cast<TLeaf *>(leaf);
144 const auto leafName = std::string(leaf->GetName());
145 const auto fullName = branchName + "." + leafName;
146 UpdateList(bNamesReg, bNames, fullName, friendName, foundLeaves, castLeaf, allowDuplicates);
147 }
148 } else if (branch->IsA() == TBranchObject::Class()) {
149 // TBranchObject
150 ExploreBranch(t, bNamesReg, bNames, branch, branchName + ".", friendName);
151 UpdateList(bNamesReg, bNames, branchName, friendName);
152 } else {
153 // TBranchElement
154 // Check if there is explicit or implicit dot in the name
155
156 bool dotIsImplied = false;
157 auto be = dynamic_cast<TBranchElement *>(b);
158 if (!be)
159 throw std::runtime_error("GetBranchNames: unsupported branch type");
160 // TClonesArray (3) and STL collection (4)
161 if (be->GetType() == 3 || be->GetType() == 4)
162 dotIsImplied = true;
163
164 if (dotIsImplied || branchName.back() == '.')
165 ExploreBranch(t, bNamesReg, bNames, branch, "", friendName);
166 else
167 ExploreBranch(t, bNamesReg, bNames, branch, branchName + ".", friendName);
168
169 UpdateList(bNamesReg, bNames, branchName, friendName);
170 }
171 }
172 }
173
174 auto friendTrees = t.GetListOfFriends();
175
176 if (!friendTrees)
177 return;
178
179 for (auto friendTreeObj : *friendTrees) {
180 auto friendTree = ((TFriendElement *)friendTreeObj)->GetTree();
181
182 std::string frName;
183 auto alias = t.GetFriendAlias(friendTree);
184 if (alias != nullptr)
185 frName = std::string(alias);
186 else
187 frName = std::string(friendTree->GetName());
188
189 GetBranchNamesImpl(*friendTree, bNamesReg, bNames, analysedTrees, frName, allowDuplicates);
190 }
191}
192
193static void ThrowIfPoolSizeChanged(unsigned int nSlots)
194{
195 const auto poolSize = ROOT::GetThreadPoolSize();
196 const bool isSingleThreadRun = (poolSize == 0 && nSlots == 1);
197 if (!isSingleThreadRun && poolSize != nSlots) {
198 std::string msg = "RLoopManager::Run: when the RDataFrame was constructed the size of the thread pool was " +
199 std::to_string(nSlots) + ", but when starting the event loop it was " +
200 std::to_string(poolSize) + ".";
201 if (poolSize > nSlots)
202 msg += " Maybe EnableImplicitMT() was called after the RDataFrame was constructed?";
203 else
204 msg += " Maybe DisableImplicitMT() was called after the RDataFrame was constructed?";
205 throw std::runtime_error(msg);
206 }
207}
208
209/**
210\struct MaxTreeSizeRAII
211\brief Scope-bound change of `TTree::fgMaxTreeSize`.
212
213This RAII object stores the current value result of `TTree::GetMaxTreeSize`,
214changes it to maximum at construction time and restores it back at destruction
215time. Needed for issue #6523 and should be reverted when #6640 will be solved.
216*/
217struct MaxTreeSizeRAII {
218 Long64_t fOldMaxTreeSize;
219
220 MaxTreeSizeRAII() : fOldMaxTreeSize(TTree::GetMaxTreeSize())
221 {
222 TTree::SetMaxTreeSize(std::numeric_limits<Long64_t>::max());
223 }
224
225 ~MaxTreeSizeRAII() { TTree::SetMaxTreeSize(fOldMaxTreeSize); }
226};
227
228} // anonymous namespace
229
230///////////////////////////////////////////////////////////////////////////////
231/// Get all the branches names, including the ones of the friend trees
232ColumnNames_t ROOT::Internal::RDF::GetBranchNames(TTree &t, bool allowDuplicates)
233{
234 std::set<std::string> bNamesSet;
235 ColumnNames_t bNames;
236 std::set<TTree *> analysedTrees;
237 std::string emptyFrName = "";
238 GetBranchNamesImpl(t, bNamesSet, bNames, analysedTrees, emptyFrName, allowDuplicates);
239 return bNames;
240}
241
242RLoopManager::RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
243 : fTree(std::shared_ptr<TTree>(tree, [](TTree *) {})), fDefaultColumns(defaultBranches),
244 fNSlots(RDFInternal::GetNSlots()),
245 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kROOTFilesMT : ELoopType::kROOTFiles)
246{
247}
248
250 : fNEmptyEntries(nEmptyEntries), fNSlots(RDFInternal::GetNSlots()),
251 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kNoFilesMT : ELoopType::kNoFiles)
252{
253}
254
255RLoopManager::RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches)
256 : fDefaultColumns(defaultBranches), fNSlots(RDFInternal::GetNSlots()),
257 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
258 fDataSource(std::move(ds))
259{
260 fDataSource->SetNSlots(fNSlots);
261}
262
263// ROOT-9559: we cannot handle indexed friends
265{
266 auto friends = fTree->GetListOfFriends();
267 if (!friends)
268 return;
269 for (auto friendElObj : *friends) {
270 auto friendEl = static_cast<TFriendElement *>(friendElObj);
271 auto friendTree = friendEl->GetTree();
272 if (friendTree && friendTree->GetTreeIndex()) {
273 std::string err = fTree->GetName();
274 err += " has a friend, \"";
275 err += friendTree->GetName();
276 err += "\", which has an index. This is not supported.";
277 throw std::runtime_error(err);
278 }
279 }
280}
281
282/// Run event loop with no source files, in parallel.
284{
285#ifdef R__USE_IMT
286 RSlotStack slotStack(fNSlots);
287 // Working with an empty tree.
288 // Evenly partition the entries according to fNSlots. Produce around 2 tasks per slot.
289 const auto nEntriesPerSlot = fNEmptyEntries / (fNSlots * 2);
290 auto remainder = fNEmptyEntries % (fNSlots * 2);
291 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
292 ULong64_t start = 0;
293 while (start < fNEmptyEntries) {
294 ULong64_t end = start + nEntriesPerSlot;
295 if (remainder > 0) {
296 ++end;
297 --remainder;
298 }
299 entryRanges.emplace_back(start, end);
300 start = end;
301 }
302
303 // Each task will generate a subrange of entries
304 auto genFunction = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
305 auto slot = slotStack.GetSlot();
306 InitNodeSlots(nullptr, slot);
307 try {
308 for (auto currEntry = range.first; currEntry < range.second; ++currEntry) {
309 RunAndCheckFilters(slot, currEntry);
310 }
311 } catch (...) {
312 CleanUpTask(slot);
313 // Error might throw in experiment frameworks like CMSSW
314 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
315 throw;
316 }
317 CleanUpTask(slot);
318 slotStack.ReturnSlot(slot);
319 };
320
322 pool.Foreach(genFunction, entryRanges);
323
324#endif // not implemented otherwise
325}
326
327/// Run event loop with no source files, in sequence.
329{
330 InitNodeSlots(nullptr, 0);
331 try {
332 for (ULong64_t currEntry = 0; currEntry < fNEmptyEntries && fNStopsReceived < fNChildren; ++currEntry) {
333 RunAndCheckFilters(0, currEntry);
334 }
335 } catch (...) {
336 CleanUpTask(0u);
337 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
338 throw;
339 }
340 CleanUpTask(0u);
341}
342
343/// Run event loop over one or multiple ROOT files, in parallel.
345{
346#ifdef R__USE_IMT
348 RSlotStack slotStack(fNSlots);
349 const auto &entryList = fTree->GetEntryList() ? *fTree->GetEntryList() : TEntryList();
350 auto tp = std::make_unique<ROOT::TTreeProcessorMT>(*fTree, entryList, fNSlots);
351
352 std::atomic<ULong64_t> entryCount(0ull);
353
354 tp->Process([this, &slotStack, &entryCount](TTreeReader &r) -> void {
355 auto slot = slotStack.GetSlot();
356 InitNodeSlots(&r, slot);
357 const auto entryRange = r.GetEntriesRange(); // we trust TTreeProcessorMT to call SetEntriesRange
358 const auto nEntries = entryRange.second - entryRange.first;
359 auto count = entryCount.fetch_add(nEntries);
360 try {
361 // recursive call to check filters and conditionally execute actions
362 while (r.Next()) {
363 RunAndCheckFilters(slot, count++);
364 }
365 } catch (...) {
366 CleanUpTask(slot);
367 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
368 throw;
369 }
370 CleanUpTask(slot);
371 slotStack.ReturnSlot(slot);
372 });
373#endif // no-op otherwise (will not be called)
374}
375
376/// Run event loop over one or multiple ROOT files, in sequence.
378{
380 TTreeReader r(fTree.get(), fTree->GetEntryList());
381 if (0 == fTree->GetEntriesFast())
382 return;
383 InitNodeSlots(&r, 0);
384
385 // recursive call to check filters and conditionally execute actions
386 // in the non-MT case processing can be stopped early by ranges, hence the check on fNStopsReceived
387 try {
388 while (r.Next() && fNStopsReceived < fNChildren) {
389 RunAndCheckFilters(0, r.GetCurrentEntry());
390 }
391 } catch (...) {
392 CleanUpTask(0u);
393 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
394 throw;
395 }
396 if (r.GetEntryStatus() != TTreeReader::kEntryNotFound && fNStopsReceived < fNChildren) {
397 // something went wrong in the TTreeReader event loop
398 throw std::runtime_error("An error was encountered while processing the data. TTreeReader status code is: " +
399 std::to_string(r.GetEntryStatus()));
400 }
401 CleanUpTask(0u);
402}
403
404/// Run event loop over data accessed through a DataSource, in sequence.
406{
407 R__ASSERT(fDataSource != nullptr);
408 fDataSource->Initialise();
409 auto ranges = fDataSource->GetEntryRanges();
410 while (!ranges.empty() && fNStopsReceived < fNChildren) {
411 InitNodeSlots(nullptr, 0u);
412 fDataSource->InitSlot(0u, 0ull);
413 try {
414 for (const auto &range : ranges) {
415 auto end = range.second;
416 for (auto entry = range.first; entry < end && fNStopsReceived < fNChildren; ++entry) {
417 if (fDataSource->SetEntry(0u, entry)) {
418 RunAndCheckFilters(0u, entry);
419 }
420 }
421 }
422 } catch (...) {
423 CleanUpTask(0u);
424 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
425 throw;
426 }
427 CleanUpTask(0u);
428 fDataSource->FinaliseSlot(0u);
429 ranges = fDataSource->GetEntryRanges();
430 }
431 fDataSource->Finalise();
432}
433
434/// Run event loop over data accessed through a DataSource, in parallel.
436{
437#ifdef R__USE_IMT
438 R__ASSERT(fDataSource != nullptr);
439 RSlotStack slotStack(fNSlots);
441
442 // Each task works on a subrange of entries
443 auto runOnRange = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
444 const auto slot = slotStack.GetSlot();
445 InitNodeSlots(nullptr, slot);
446 fDataSource->InitSlot(slot, range.first);
447 const auto end = range.second;
448 try {
449 for (auto entry = range.first; entry < end; ++entry) {
450 if (fDataSource->SetEntry(slot, entry)) {
451 RunAndCheckFilters(slot, entry);
452 }
453 }
454 } catch (...) {
455 CleanUpTask(slot);
456 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
457 throw;
458 }
459 CleanUpTask(slot);
460 fDataSource->FinaliseSlot(slot);
461 slotStack.ReturnSlot(slot);
462 };
463
464 fDataSource->Initialise();
465 auto ranges = fDataSource->GetEntryRanges();
466 while (!ranges.empty()) {
467 pool.Foreach(runOnRange, ranges);
468 ranges = fDataSource->GetEntryRanges();
469 }
470 fDataSource->Finalise();
471#endif // not implemented otherwise (never called)
472}
473
474/// Execute actions and make sure named filters are called for each event.
475/// Named filters must be called even if the analysis logic would not require it, lest they report confusing results.
476void RLoopManager::RunAndCheckFilters(unsigned int slot, Long64_t entry)
477{
478 for (auto &actionPtr : fBookedActions)
479 actionPtr->Run(slot, entry);
480 for (auto &namedFilterPtr : fBookedNamedFilters)
481 namedFilterPtr->CheckFilters(slot, entry);
482 for (auto &callback : fCallbacks)
483 callback(slot);
484}
485
486/// Build TTreeReaderValues for all nodes
487/// This method loops over all filters, actions and other booked objects and
488/// calls their `InitRDFValues` methods. It is called once per node per slot, before
489/// running the event loop. It also informs each node of the TTreeReader that
490/// a particular slot will be using.
492{
493 for (auto &ptr : fBookedActions)
494 ptr->InitSlot(r, slot);
495 for (auto &ptr : fBookedFilters)
496 ptr->InitSlot(r, slot);
497 for (auto &callback : fCallbacksOnce)
498 callback(slot);
499}
500
501/// Initialize all nodes of the functional graph before running the event loop.
502/// This method is called once per event-loop and performs generic initialization
503/// operations that do not depend on the specific processing slot (i.e. operations
504/// that are common for all threads).
506{
508 for (auto &filter : fBookedFilters)
509 filter->InitNode();
510 for (auto &range : fBookedRanges)
511 range->InitNode();
512 for (auto &ptr : fBookedActions)
513 ptr->Initialize();
514}
515
516/// Perform clean-up operations. To be called at the end of each event loop.
518{
519 fMustRunNamedFilters = false;
520
521 // forget RActions and detach TResultProxies
522 for (auto &ptr : fBookedActions)
523 ptr->Finalize();
524
525 fRunActions.insert(fRunActions.begin(), fBookedActions.begin(), fBookedActions.end());
526 fBookedActions.clear();
527
528 // reset children counts
529 fNChildren = 0;
530 fNStopsReceived = 0;
531 for (auto &ptr : fBookedFilters)
532 ptr->ResetChildrenCount();
533 for (auto &ptr : fBookedRanges)
534 ptr->ResetChildrenCount();
535
536 fCallbacks.clear();
537 fCallbacksOnce.clear();
538}
539
540/// Perform clean-up operations. To be called at the end of each task execution.
541void RLoopManager::CleanUpTask(unsigned int slot)
542{
543 for (auto &ptr : fBookedActions)
544 ptr->FinalizeSlot(slot);
545 for (auto &ptr : fBookedFilters)
546 ptr->ClearTask(slot);
547}
548
549/// Add RDF nodes that require just-in-time compilation to the computation graph.
550/// This method also clears the contents of GetCodeToJit().
552{
553 const std::string code = std::move(GetCodeToJit());
554 if (code.empty())
555 return;
556
557 RDFInternal::InterpreterCalc(code, "RLoopManager::Run");
558}
559
560/// Trigger counting of number of children nodes for each node of the functional graph.
561/// This is done once before starting the event loop. Each action sends an `increase children count` signal
562/// upstream, which is propagated until RLoopManager. Each time a node receives the signal, in increments its
563/// children counter. Each node only propagates the signal once, even if it receives it multiple times.
564/// Named filters also send an `increase children count` signal, just like actions, as they always execute during
565/// the event loop so the graph branch they belong to must count as active even if it does not end in an action.
567{
568 for (auto &actionPtr : fBookedActions)
569 actionPtr->TriggerChildrenCount();
570 for (auto &namedFilterPtr : fBookedNamedFilters)
571 namedFilterPtr->TriggerChildrenCount();
572}
573
574/// Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
575/// Also perform a few setup and clean-up operations (jit actions if necessary, clear booked actions after the loop...).
577{
578 // Change value of TTree::GetMaxTreeSize only for this scope. Revert when #6640 will be solved.
579 MaxTreeSizeRAII ctxtmts;
580
581 ThrowIfPoolSizeChanged(GetNSlots());
582
583 Jit();
584
585 InitNodes();
586
587 switch (fLoopType) {
591 case ELoopType::kNoFiles: RunEmptySource(); break;
594 }
595
596 CleanUpNodes();
597
598 fNRuns++;
599}
600
601/// Return the list of default columns -- empty if none was provided when constructing the RDataFrame
602const ColumnNames_t &RLoopManager::GetDefaultColumnNames() const
603{
604 return fDefaultColumns;
605}
606
608{
609 return fTree.get();
610}
611
613{
614 fBookedActions.emplace_back(actionPtr);
615}
616
618{
619 RDFInternal::Erase(actionPtr, fRunActions);
620 RDFInternal::Erase(actionPtr, fBookedActions);
621}
622
624{
625 fBookedFilters.emplace_back(filterPtr);
626 if (filterPtr->HasName()) {
627 fBookedNamedFilters.emplace_back(filterPtr);
629 }
630}
631
633{
634 RDFInternal::Erase(filterPtr, fBookedFilters);
635 RDFInternal::Erase(filterPtr, fBookedNamedFilters);
636}
637
639{
640 fBookedRanges.emplace_back(rangePtr);
641}
642
644{
645 RDFInternal::Erase(rangePtr, fBookedRanges);
646}
647
648// dummy call, end of recursive chain of calls
650{
651 return true;
652}
653
654/// Call `FillReport` on all booked filters
656{
657 for (const auto &fPtr : fBookedNamedFilters)
658 fPtr->FillReport(rep);
659}
660
661void RLoopManager::ToJitExec(const std::string &code) const
662{
663 GetCodeToJit().append(code);
664}
665
666void RLoopManager::RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f)
667{
668 if (everyNEvents == 0ull)
669 fCallbacksOnce.emplace_back(std::move(f), fNSlots);
670 else
671 fCallbacks.emplace_back(everyNEvents, std::move(f), fNSlots);
672}
673
674std::vector<std::string> RLoopManager::GetFiltersNames()
675{
676 std::vector<std::string> filters;
677 for (auto &filter : fBookedFilters) {
678 auto name = (filter->HasName() ? filter->GetName() : "Unnamed Filter");
679 filters.push_back(name);
680 }
681 return filters;
682}
683
684std::vector<RDFInternal::RActionBase *> RLoopManager::GetAllActions()
685{
686 std::vector<RDFInternal::RActionBase *> actions;
687 actions.insert(actions.begin(), fBookedActions.begin(), fBookedActions.end());
688 actions.insert(actions.begin(), fRunActions.begin(), fRunActions.end());
689 return actions;
690}
691
692std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode> RLoopManager::GetGraph()
693{
694 std::string name;
695 if (fDataSource) {
696 name = fDataSource->GetLabel();
697 } else if (fTree) {
698 name = fTree->GetName();
699 } else {
700 name = std::to_string(fNEmptyEntries);
701 }
702
703 auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(name);
704 thisNode->SetRoot();
705 thisNode->SetCounter(0);
706 return thisNode;
707}
708
709////////////////////////////////////////////////////////////////////////////
710/// Return all valid TTree::Branch names (caching results for subsequent calls).
711/// Never use fBranchNames directy, always request it through this method.
712const ColumnNames_t &RLoopManager::GetBranchNames()
713{
714 if (fValidBranchNames.empty() && fTree) {
715 fValidBranchNames = RDFInternal::GetBranchNames(*fTree, /*allowRepetitions=*/true);
716 }
717 return fValidBranchNames;
718}
void Class()
Definition: Class.C:29
ROOT::R::TRInterface & r
Definition: Object.C:4
#define b(i)
Definition: RSha256.hxx:100
#define f(i)
Definition: RSha256.hxx:104
long long Long64_t
Definition: RtypesCore.h:71
unsigned long long ULong64_t
Definition: RtypesCore.h:72
#define R__ASSERT(e)
Definition: TError.h:96
const char * filters[]
char name[80]
Definition: TGX11.cxx:109
RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
const ColumnNames_t & GetBranchNames()
Return all valid TTree::Branch names (caching results for subsequent calls).
void ToJitExec(const std::string &) const
std::shared_ptr< TTree > fTree
Shared pointer to the input TTree.
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
std::vector< RDFInternal::RActionBase * > GetAllActions()
For all the actions, either booked or run.
void CleanUpTask(unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
void Run()
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::vector< RRangeBase * > fBookedRanges
std::vector< TCallback > fCallbacks
Registered callbacks.
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
std::vector< RFilterBase * > fBookedFilters
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph()
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directly, always use GetBranchNames().
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
unsigned int GetNSlots() const
std::vector< TOneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
const std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object. Null if no data-source.
const ColumnNames_t fDefaultColumns
void Book(RDFInternal::RActionBase *actionPtr)
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes This method loops over all filters, actions and other booked ob...
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
void Deregister(RDFInternal::RActionBase *actionPtr)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
unsigned int fNStopsReceived
Number of times that a children node signaled to stop processing entries.
Definition: RNodeBase.hxx:45
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition: RNodeBase.hxx:44
This is a helper class that allows picking a slot by resorting to a map indexed by thread ids.
Definition: RSlotStack.hxx:26
void ReturnSlot(unsigned int slotNumber)
Definition: RSlotStack.cxx:23
This class provides a simple interface to execute the same task multiple times in parallel,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute func (with no arguments) nTimes in parallel.
A Branch for the case of an object.
A TTree is a list of TBranches.
Definition: TBranch.h:91
TObjArray * GetListOfLeaves()
Definition: TBranch.h:245
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:26
A TFriendElement TF describes a TTree object TF in a file.
virtual TTree * GetTree()
Return pointer to friend TTree.
A TLeaf describes individual elements of a TBranch See TBranch structure in TTree.
Definition: TLeaf.h:49
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition: TTreeReader.h:43
@ kEntryNotFound
the tree entry number does not exist
Definition: TTreeReader.h:128
A TTree represents a columnar dataset.
Definition: TTree.h:78
virtual TBranch * FindBranch(const char *name)
Return the branch that correspond to the path 'branchname', which can include the name of the tree or...
Definition: TTree.cxx:4762
virtual TBranch * GetBranch(const char *name)
Return pointer to the branch with the given name in this tree or its friends.
Definition: TTree.cxx:5209
static void SetMaxTreeSize(Long64_t maxsize=100000000000LL)
Set the maximum size in bytes of a Tree file (static function).
Definition: TTree.cxx:9044
virtual TObjArray * GetListOfBranches()
Definition: TTree.h:482
virtual TTree * GetTree() const
Definition: TTree.h:511
virtual TList * GetListOfFriends() const
Definition: TTree.h:484
virtual const char * GetFriendAlias(TTree *) const
If the 'tree' is a friend, this method returns its alias name.
Definition: TTree.cxx:5937
ColumnNames_t GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
unsigned int GetNSlots()
Definition: RDFUtils.cxx:270
Long64_t InterpreterCalc(const std::string &code, const std::string &context)
Definition: RDFUtils.cxx:312
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
Definition: RExports.h:151
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Definition: StringConv.hxx:21
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition: TROOT.cxx:557
UInt_t GetThreadPoolSize()
Returns the size of ROOT's thread pool.
Definition: TROOT.cxx:564
Definition: tree.py:1