Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RLoopManager.cxx
Go to the documentation of this file.
1/*************************************************************************
2 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
3 * All rights reserved. *
4 * *
5 * For the licensing terms see $ROOTSYS/LICENSE. *
6 * For the list of contributors see $ROOTSYS/README/CREDITS. *
7 *************************************************************************/
8
9#include "RConfigure.h" // R__USE_IMT
10#include "ROOT/RDataSource.hxx"
12#include "ROOT/InternalTreeUtils.hxx" // GetTreeFullPaths
20#include "ROOT/RLogger.hxx"
21#include "RtypesCore.h" // Long64_t
22#include "TStopwatch.h"
23#include "TBranchElement.h"
24#include "TBranchObject.h"
25#include "TChain.h"
26#include "TEntryList.h"
27#include "TFile.h"
28#include "TFriendElement.h"
29#include "TInterpreter.h"
30#include "TROOT.h" // IsImplicitMTEnabled
31#include "TTreeReader.h"
32#include "TTree.h" // For MaxTreeSizeRAII. Revert when #6640 will be solved.
33
34#ifdef R__USE_IMT
37#endif
38
39#include <algorithm>
40#include <atomic>
41#include <cassert>
42#include <exception>
43#include <functional>
44#include <iostream>
45#include <memory>
46#include <stdexcept>
47#include <string>
48#include <sstream>
49#include <thread>
50#include <unordered_map>
51#include <vector>
52#include <set>
53#include <limits> // For MaxTreeSizeRAII. Revert when #6640 will be solved.
54
55using namespace ROOT::Detail::RDF;
56using namespace ROOT::Internal::RDF;
57
58namespace {
/// Accessor for the buffer holding all RDF code currently scheduled for just-in-time compilation.
/// The buffer is a function-local static, so every RLoopManager instance appends to the same
/// string: this lets us lazily jit everything that is needed by all RDFs in one go, which is
/// potentially much faster than jitting each RLoopManager's code separately.
/// \return A mutable reference to the shared code buffer.
static std::string &GetCodeToJit()
{
   static std::string sCodeToJit;
   return sCodeToJit;
}
69
70static bool ContainsLeaf(const std::set<TLeaf *> &leaves, TLeaf *leaf)
71{
72 return (leaves.find(leaf) != leaves.end());
73}
74
75///////////////////////////////////////////////////////////////////////////////
76/// This overload does not check whether the leaf/branch is already in bNamesReg. In case this is a friend leaf/branch,
77/// `allowDuplicates` controls whether we add both `friendname.bname` and `bname` or just the shorter version.
78static void InsertBranchName(std::set<std::string> &bNamesReg, ColumnNames_t &bNames, const std::string &branchName,
79 const std::string &friendName, bool allowDuplicates)
80{
81 if (!friendName.empty()) {
82 // In case of a friend tree, users might prepend its name/alias to the branch names
83 const auto friendBName = friendName + "." + branchName;
84 if (bNamesReg.insert(friendBName).second)
85 bNames.push_back(friendBName);
86 }
87
88 if (allowDuplicates || friendName.empty()) {
89 if (bNamesReg.insert(branchName).second)
90 bNames.push_back(branchName);
91 }
92}
93
94///////////////////////////////////////////////////////////////////////////////
95/// This overload makes sure that the TLeaf has not been already inserted.
96static void InsertBranchName(std::set<std::string> &bNamesReg, ColumnNames_t &bNames, const std::string &branchName,
97 const std::string &friendName, std::set<TLeaf *> &foundLeaves, TLeaf *leaf,
98 bool allowDuplicates)
99{
100 const bool canAdd = allowDuplicates ? true : !ContainsLeaf(foundLeaves, leaf);
101 if (!canAdd) {
102 return;
103 }
104
105 InsertBranchName(bNamesReg, bNames, branchName, friendName, allowDuplicates);
106
107 foundLeaves.insert(leaf);
108}
109
110static void ExploreBranch(TTree &t, std::set<std::string> &bNamesReg, ColumnNames_t &bNames, TBranch *b,
111 std::string prefix, std::string &friendName, bool allowDuplicates)
112{
113 for (auto sb : *b->GetListOfBranches()) {
114 TBranch *subBranch = static_cast<TBranch *>(sb);
115 auto subBranchName = std::string(subBranch->GetName());
116 auto fullName = prefix + subBranchName;
117
118 std::string newPrefix;
119 if (!prefix.empty())
120 newPrefix = fullName + ".";
121
122 ExploreBranch(t, bNamesReg, bNames, subBranch, newPrefix, friendName, allowDuplicates);
123
124 auto branchDirectlyFromTree = t.GetBranch(fullName.c_str());
125 if (!branchDirectlyFromTree)
126 branchDirectlyFromTree = t.FindBranch(fullName.c_str()); // try harder
127 if (branchDirectlyFromTree)
128 InsertBranchName(bNamesReg, bNames, std::string(branchDirectlyFromTree->GetFullName()), friendName,
129 allowDuplicates);
130
131 if (t.GetBranch(subBranchName.c_str()))
132 InsertBranchName(bNamesReg, bNames, subBranchName, friendName, allowDuplicates);
133 }
134}
135
136static void GetBranchNamesImpl(TTree &t, std::set<std::string> &bNamesReg, ColumnNames_t &bNames,
137 std::set<TTree *> &analysedTrees, std::string &friendName, bool allowDuplicates)
138{
139 std::set<TLeaf *> foundLeaves;
140 if (!analysedTrees.insert(&t).second) {
141 return;
142 }
143
144 const auto branches = t.GetListOfBranches();
145 // Getting the branches here triggered the read of the first file of the chain if t is a chain.
146 // We check if a tree has been successfully read, otherwise we throw (see ROOT-9984) to avoid further
147 // operations
148 if (!t.GetTree()) {
149 std::string err("GetBranchNames: error in opening the tree ");
150 err += t.GetName();
151 throw std::runtime_error(err);
152 }
153 if (branches) {
154 for (auto b : *branches) {
155 TBranch *branch = static_cast<TBranch *>(b);
156 const auto branchName = std::string(branch->GetName());
157 if (branch->IsA() == TBranch::Class()) {
158 // Leaf list
159 auto listOfLeaves = branch->GetListOfLeaves();
160 if (listOfLeaves->GetEntriesUnsafe() == 1) {
161 auto leaf = static_cast<TLeaf *>(listOfLeaves->UncheckedAt(0));
162 InsertBranchName(bNamesReg, bNames, branchName, friendName, foundLeaves, leaf, allowDuplicates);
163 }
164
165 for (auto leaf : *listOfLeaves) {
166 auto castLeaf = static_cast<TLeaf *>(leaf);
167 const auto leafName = std::string(leaf->GetName());
168 const auto fullName = branchName + "." + leafName;
169 InsertBranchName(bNamesReg, bNames, fullName, friendName, foundLeaves, castLeaf, allowDuplicates);
170 }
171 } else if (branch->IsA() == TBranchObject::Class()) {
172 // TBranchObject
173 ExploreBranch(t, bNamesReg, bNames, branch, branchName + ".", friendName, allowDuplicates);
174 InsertBranchName(bNamesReg, bNames, branchName, friendName, allowDuplicates);
175 } else {
176 // TBranchElement
177 // Check if there is explicit or implicit dot in the name
178
179 bool dotIsImplied = false;
180 auto be = dynamic_cast<TBranchElement *>(b);
181 if (!be)
182 throw std::runtime_error("GetBranchNames: unsupported branch type");
183 // TClonesArray (3) and STL collection (4)
184 if (be->GetType() == 3 || be->GetType() == 4)
185 dotIsImplied = true;
186
187 if (dotIsImplied || branchName.back() == '.')
188 ExploreBranch(t, bNamesReg, bNames, branch, "", friendName, allowDuplicates);
189 else
190 ExploreBranch(t, bNamesReg, bNames, branch, branchName + ".", friendName, allowDuplicates);
191
192 InsertBranchName(bNamesReg, bNames, branchName, friendName, allowDuplicates);
193 }
194 }
195 }
196
197 // The list of friends needs to be accessed via GetTree()->GetListOfFriends()
198 // (and not via GetListOfFriends() directly), otherwise when `t` is a TChain we
199 // might not recover the list correctly (https://github.com/root-project/root/issues/6741).
200 auto friendTrees = t.GetTree()->GetListOfFriends();
201
202 if (!friendTrees)
203 return;
204
205 for (auto friendTreeObj : *friendTrees) {
206 auto friendTree = ((TFriendElement *)friendTreeObj)->GetTree();
207
208 std::string frName;
209 auto alias = t.GetFriendAlias(friendTree);
210 if (alias != nullptr)
211 frName = std::string(alias);
212 else
213 frName = std::string(friendTree->GetName());
214
215 GetBranchNamesImpl(*friendTree, bNamesReg, bNames, analysedTrees, frName, allowDuplicates);
216 }
217}
218
219static void ThrowIfNSlotsChanged(unsigned int nSlots)
220{
221 const auto currentSlots = RDFInternal::GetNSlots();
222 if (currentSlots != nSlots) {
223 std::string msg = "RLoopManager::Run: when the RDataFrame was constructed the number of slots required was " +
224 std::to_string(nSlots) + ", but when starting the event loop it was " +
225 std::to_string(currentSlots) + ".";
226 if (currentSlots > nSlots)
227 msg += " Maybe EnableImplicitMT() was called after the RDataFrame was constructed?";
228 else
229 msg += " Maybe DisableImplicitMT() was called after the RDataFrame was constructed?";
230 throw std::runtime_error(msg);
231 }
232}
233
234/**
235\struct MaxTreeSizeRAII
236\brief Scope-bound change of `TTree::fgMaxTreeSize`.
237
238This RAII object stores the current value result of `TTree::GetMaxTreeSize`,
239changes it to maximum at construction time and restores it back at destruction
240time. Needed for issue #6523 and should be reverted when #6640 will be solved.
241*/
242struct MaxTreeSizeRAII {
243 Long64_t fOldMaxTreeSize;
244
245 MaxTreeSizeRAII() : fOldMaxTreeSize(TTree::GetMaxTreeSize())
246 {
247 TTree::SetMaxTreeSize(std::numeric_limits<Long64_t>::max());
248 }
249
250 ~MaxTreeSizeRAII() { TTree::SetMaxTreeSize(fOldMaxTreeSize); }
251};
252
253struct DatasetLogInfo {
254 std::string fDataSet;
255 ULong64_t fRangeStart;
256 ULong64_t fRangeEnd;
257 unsigned int fSlot;
258};
259
260std::string LogRangeProcessing(const DatasetLogInfo &info)
261{
262 std::stringstream msg;
263 msg << "Processing " << info.fDataSet << ": entry range [" << info.fRangeStart << "," << info.fRangeEnd - 1
264 << "], using slot " << info.fSlot << " in thread " << std::this_thread::get_id() << '.';
265 return msg.str();
266}
267
268DatasetLogInfo TreeDatasetLogInfo(const TTreeReader &r, unsigned int slot)
269{
270 const auto tree = r.GetTree();
271 const auto chain = dynamic_cast<TChain *>(tree);
272 std::string what;
273 if (chain) {
274 auto files = chain->GetListOfFiles();
275 std::vector<std::string> treeNames;
276 std::vector<std::string> fileNames;
277 for (TObject *f : *files) {
278 treeNames.emplace_back(f->GetName());
279 fileNames.emplace_back(f->GetTitle());
280 }
281 what = "trees {";
282 for (const auto &t : treeNames) {
283 what += t + ",";
284 }
285 what.back() = '}';
286 what += " in files {";
287 for (const auto &f : fileNames) {
288 what += f + ",";
289 }
290 what.back() = '}';
291 } else {
292 assert(tree != nullptr); // to make clang-tidy happy
293 const auto treeName = tree->GetName();
294 what = std::string("tree \"") + treeName + "\"";
295 const auto file = tree->GetCurrentFile();
296 if (file)
297 what += std::string(" in file \"") + file->GetName() + "\"";
298 }
299 const auto entryRange = r.GetEntriesRange();
300 const ULong64_t end = entryRange.second == -1ll ? tree->GetEntries() : entryRange.second;
301 return {std::move(what), static_cast<ULong64_t>(entryRange.first), end, slot};
302}
303
304} // anonymous namespace
305
306namespace ROOT {
307namespace Detail {
308namespace RDF {
309
310/// A RAII object that calls RLoopManager::CleanUpTask at destruction
313 unsigned int fArg;
315
316 RCallCleanUpTask(RLoopManager &lm, unsigned int arg = 0u, TTreeReader *reader = nullptr)
317 : fLoopManager(lm), fArg(arg), fReader(reader)
318 {
319 }
321};
322
323} // namespace RDF
324} // namespace Detail
325} // namespace ROOT
326
327///////////////////////////////////////////////////////////////////////////////
328/// Get all the branches names, including the ones of the friend trees
330{
331 std::set<std::string> bNamesSet;
332 ColumnNames_t bNames;
333 std::set<TTree *> analysedTrees;
334 std::string emptyFrName = "";
335 GetBranchNamesImpl(t, bNamesSet, bNames, analysedTrees, emptyFrName, allowDuplicates);
336 return bNames;
337}
338
340 : fTree(std::shared_ptr<TTree>(tree, [](TTree *) {})), fDefaultColumns(defaultBranches),
341 fNSlots(RDFInternal::GetNSlots()),
342 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kROOTFilesMT : ELoopType::kROOTFiles),
343 fNewSampleNotifier(fNSlots), fSampleInfos(fNSlots)
344{
345}
346
348 : fNEmptyEntries(nEmptyEntries), fNSlots(RDFInternal::GetNSlots()),
349 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kNoFilesMT : ELoopType::kNoFiles), fNewSampleNotifier(fNSlots),
350 fSampleInfos(fNSlots)
351{
352}
353
354RLoopManager::RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches)
355 : fDefaultColumns(defaultBranches), fNSlots(RDFInternal::GetNSlots()),
356 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
357 fDataSource(std::move(ds)), fNewSampleNotifier(fNSlots), fSampleInfos(fNSlots)
358{
359 fDataSource->SetNSlots(fNSlots);
360}
361
362RLoopManager::RLoopManager(ROOT::Internal::RDF::RDatasetSpec &&spec)
363 : fBeginEntry(spec.fEntryRange.fBegin), fEndEntry(spec.fEntryRange.fEnd), fNSlots(RDFInternal::GetNSlots()),
364 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kROOTFilesMT : ELoopType::kROOTFiles),
365 fNewSampleNotifier(fNSlots), fSampleInfos(fNSlots)
366{
367 auto chain = std::make_shared<TChain>(spec.fTreeNames.size() == 1 ? spec.fTreeNames[0].c_str() : "");
368 if (spec.fTreeNames.size() == 1) {
369 // A TChain has a global name, that is the name of single tree
370 // The global name of the chain is also the name of each tree in the list
371 // of files that make the chain.
372 for (const auto &f : spec.fFileNameGlobs)
373 chain->Add(f.c_str());
374 } else {
375 // Some other times, each different file has its own tree name, we need to
376 // reconstruct the full path to the tree in each file and pass that to
377 for (auto i = 0u; i < spec.fFileNameGlobs.size(); i++) {
378 const auto fullpath = spec.fFileNameGlobs[i] + "?#" + spec.fTreeNames[i];
379 chain->Add(fullpath.c_str());
380 }
381 }
382 SetTree(std::move(chain));
383
384 // Create the friends from the list of friends
385 const auto &friendNames = spec.fFriendInfo.fFriendNames;
386 const auto &friendFileNames = spec.fFriendInfo.fFriendFileNames;
387 const auto &friendChainSubNames = spec.fFriendInfo.fFriendChainSubNames;
388 const auto nFriends = friendNames.size();
389
390 for (auto i = 0u; i < nFriends; ++i) {
391 const auto &thisFriendName = friendNames[i].first;
392 const auto &thisFriendAlias = friendNames[i].second;
393 const auto &thisFriendFiles = friendFileNames[i];
394 const auto &thisFriendChainSubNames = friendChainSubNames[i];
395
396 // Build a friend chain
397 auto frChain = std::make_unique<TChain>(thisFriendName.c_str());
398 const auto nFileNames = friendFileNames[i].size();
399 if (thisFriendChainSubNames.empty()) {
400 // If there are no chain subnames, the friend was a TTree. It's safe
401 // to add to the chain the filename directly.
402 for (auto j = 0u; j < nFileNames; ++j) {
403 frChain->Add(thisFriendFiles[j].c_str());
404 }
405 } else {
406 // Otherwise, the new friend chain needs to be built using the nomenclature
407 // "filename?#treename" as argument to `TChain::Add`
408 for (auto j = 0u; j < nFileNames; ++j) {
409 frChain->Add((thisFriendFiles[j] + "?#" + thisFriendChainSubNames[j]).c_str());
410 }
411 }
412
413 // Make it friends with the main chain
414 fTree->AddFriend(frChain.get(), thisFriendAlias.c_str());
415 // the friend trees must have same lifetime as fTree
416 fFriends.emplace_back(std::move(frChain));
417 }
418}
419
420struct RSlotRAII {
422 unsigned int fSlot;
423 RSlotRAII(RSlotStack &slotStack) : fSlotStack(slotStack), fSlot(slotStack.GetSlot()) {}
424 ~RSlotRAII() { fSlotStack.ReturnSlot(fSlot); }
425};
426
427/// Run event loop with no source files, in parallel.
429{
430#ifdef R__USE_IMT
431 RSlotStack slotStack(fNSlots);
432 // Working with an empty tree.
433 // Evenly partition the entries according to fNSlots. Produce around 2 tasks per slot.
434 const auto nEntriesPerSlot = fNEmptyEntries / (fNSlots * 2);
435 auto remainder = fNEmptyEntries % (fNSlots * 2);
436 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
437 ULong64_t start = 0;
438 while (start < fNEmptyEntries) {
439 ULong64_t end = start + nEntriesPerSlot;
440 if (remainder > 0) {
441 ++end;
442 --remainder;
443 }
444 entryRanges.emplace_back(start, end);
445 start = end;
446 }
447
448 // Each task will generate a subrange of entries
449 auto genFunction = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
450 RSlotRAII slotRAII(slotStack);
451 auto slot = slotRAII.fSlot;
452 RCallCleanUpTask cleanup(*this, slot);
453 InitNodeSlots(nullptr, slot);
454 R__LOG_INFO(RDFLogChannel()) << LogRangeProcessing({"an empty source", range.first, range.second, slot});
455 try {
456 UpdateSampleInfo(slot, range);
457 for (auto currEntry = range.first; currEntry < range.second; ++currEntry) {
458 RunAndCheckFilters(slot, currEntry);
459 }
460 } catch (...) {
461 // Error might throw in experiment frameworks like CMSSW
462 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
463 throw;
464 }
465 };
466
468 pool.Foreach(genFunction, entryRanges);
469
470#endif // not implemented otherwise
471}
472
473/// Run event loop with no source files, in sequence.
475{
476 InitNodeSlots(nullptr, 0);
477 R__LOG_INFO(RDFLogChannel()) << LogRangeProcessing({"an empty source", 0, fNEmptyEntries, 0u});
478 RCallCleanUpTask cleanup(*this);
479 try {
480 UpdateSampleInfo(/*slot*/0, {0, fNEmptyEntries});
481 for (ULong64_t currEntry = 0; currEntry < fNEmptyEntries && fNStopsReceived < fNChildren; ++currEntry) {
482 RunAndCheckFilters(0, currEntry);
483 }
484 } catch (...) {
485 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
486 throw;
487 }
488}
489
490/// Run event loop over one or multiple ROOT files, in parallel.
492{
493#ifdef R__USE_IMT
494 RSlotStack slotStack(fNSlots);
495 const auto &entryList = fTree->GetEntryList() ? *fTree->GetEntryList() : TEntryList();
496 auto tp = std::make_unique<ROOT::TTreeProcessorMT>(*fTree, entryList, fNSlots);
497
498 std::atomic<ULong64_t> entryCount(0ull);
499
500 tp->Process([this, &slotStack, &entryCount](TTreeReader &r) -> void {
501 RSlotRAII slotRAII(slotStack);
502 auto slot = slotRAII.fSlot;
503 RCallCleanUpTask cleanup(*this, slot, &r);
504 InitNodeSlots(&r, slot);
505 R__LOG_INFO(RDFLogChannel()) << LogRangeProcessing(TreeDatasetLogInfo(r, slot));
506 const auto entryRange = r.GetEntriesRange(); // we trust TTreeProcessorMT to call SetEntriesRange
507 const auto nEntries = entryRange.second - entryRange.first;
508 auto count = entryCount.fetch_add(nEntries);
509 try {
510 // recursive call to check filters and conditionally execute actions
511 while (r.Next()) {
512 if (fNewSampleNotifier.CheckFlag(slot)) {
513 UpdateSampleInfo(slot, r);
514 }
515 RunAndCheckFilters(slot, count++);
516 }
517 } catch (...) {
518 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
519 throw;
520 }
521 // fNStopsReceived < fNChildren is always true at the moment as we don't support event loop early quitting in
522 // multi-thread runs, but it costs nothing to be safe and future-proof in case we add support for that later.
523 if (r.GetEntryStatus() != TTreeReader::kEntryBeyondEnd && fNStopsReceived < fNChildren) {
524 // something went wrong in the TTreeReader event loop
525 throw std::runtime_error("An error was encountered while processing the data. TTreeReader status code is: " +
526 std::to_string(r.GetEntryStatus()));
527 }
528 });
529#endif // no-op otherwise (will not be called)
530}
531
532/// Run event loop over one or multiple ROOT files, in sequence.
534{
535 TTreeReader r(fTree.get(), fTree->GetEntryList());
536 if (0 == fTree->GetEntriesFast() || fBeginEntry == fEndEntry)
537 return;
538 // Apply the range if there is any
539 // In case of a chain with a total of N entries, calling SetEntriesRange(N + 1, ...) does not error out
540 // This is a bug, reported here: https://github.com/root-project/root/issues/10774
541 if (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
542 if (r.SetEntriesRange(fBeginEntry, fEndEntry) != TTreeReader::kEntryValid)
543 throw std::logic_error("Something went wrong in initializing the TTreeReader.");
544 RCallCleanUpTask cleanup(*this, 0u, &r);
545 InitNodeSlots(&r, 0);
546 R__LOG_INFO(RDFLogChannel()) << LogRangeProcessing(TreeDatasetLogInfo(r, 0u));
547
548 // recursive call to check filters and conditionally execute actions
549 // in the non-MT case processing can be stopped early by ranges, hence the check on fNStopsReceived
550 try {
551 while (r.Next() && fNStopsReceived < fNChildren) {
553 UpdateSampleInfo(/*slot*/0, r);
554 }
555 RunAndCheckFilters(0, r.GetCurrentEntry());
556 }
557 } catch (...) {
558 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
559 throw;
560 }
561 if (r.GetEntryStatus() != TTreeReader::kEntryBeyondEnd && fNStopsReceived < fNChildren) {
562 // something went wrong in the TTreeReader event loop
563 throw std::runtime_error("An error was encountered while processing the data. TTreeReader status code is: " +
564 std::to_string(r.GetEntryStatus()));
565 }
566}
567
568/// Run event loop over data accessed through a DataSource, in sequence.
570{
571 assert(fDataSource != nullptr);
572 fDataSource->Initialise();
573 auto ranges = fDataSource->GetEntryRanges();
574 while (!ranges.empty() && fNStopsReceived < fNChildren) {
575 InitNodeSlots(nullptr, 0u);
576 fDataSource->InitSlot(0u, 0ull);
577 RCallCleanUpTask cleanup(*this);
578 try {
579 for (const auto &range : ranges) {
580 const auto start = range.first;
581 const auto end = range.second;
582 R__LOG_INFO(RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, 0u});
583 for (auto entry = start; entry < end && fNStopsReceived < fNChildren; ++entry) {
584 if (fDataSource->SetEntry(0u, entry)) {
585 RunAndCheckFilters(0u, entry);
586 }
587 }
588 }
589 } catch (...) {
590 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
591 throw;
592 }
593 fDataSource->FinaliseSlot(0u);
594 ranges = fDataSource->GetEntryRanges();
595 }
596 fDataSource->Finalise();
597}
598
599/// Run event loop over data accessed through a DataSource, in parallel.
601{
602#ifdef R__USE_IMT
603 assert(fDataSource != nullptr);
604 RSlotStack slotStack(fNSlots);
606
607 // Each task works on a subrange of entries
608 auto runOnRange = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
609 RSlotRAII slotRAII(slotStack);
610 const auto slot = slotRAII.fSlot;
611 InitNodeSlots(nullptr, slot);
612 RCallCleanUpTask cleanup(*this, slot);
613 fDataSource->InitSlot(slot, range.first);
614 const auto start = range.first;
615 const auto end = range.second;
616 R__LOG_INFO(RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, slot});
617 try {
618 for (auto entry = start; entry < end; ++entry) {
619 if (fDataSource->SetEntry(slot, entry)) {
620 RunAndCheckFilters(slot, entry);
621 }
622 }
623 } catch (...) {
624 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
625 throw;
626 }
627 fDataSource->FinaliseSlot(slot);
628 };
629
630 fDataSource->Initialise();
631 auto ranges = fDataSource->GetEntryRanges();
632 while (!ranges.empty()) {
633 pool.Foreach(runOnRange, ranges);
634 ranges = fDataSource->GetEntryRanges();
635 }
636 fDataSource->Finalise();
637#endif // not implemented otherwise (never called)
638}
639
640/// Execute actions and make sure named filters are called for each event.
641/// Named filters must be called even if the analysis logic would not require it, lest they report confusing results.
642void RLoopManager::RunAndCheckFilters(unsigned int slot, Long64_t entry)
643{
644 // data-block callbacks run before the rest of the graph
645 if (fNewSampleNotifier.CheckFlag(slot)) {
646 for (auto &callback : fSampleCallbacks)
647 callback.second(slot, fSampleInfos[slot]);
649 }
650
651 for (auto &actionPtr : fBookedActions)
652 actionPtr->Run(slot, entry);
653 for (auto &namedFilterPtr : fBookedNamedFilters)
654 namedFilterPtr->CheckFilters(slot, entry);
655 for (auto &callback : fCallbacks)
656 callback(slot);
657}
658
659/// Build TTreeReaderValues for all nodes
660/// This method loops over all filters, actions and other booked objects and
661/// calls their `InitSlot` method, to get them ready for running a task.
663{
664 SetupSampleCallbacks(r, slot);
665 for (auto &ptr : fBookedActions)
666 ptr->InitSlot(r, slot);
667 for (auto &ptr : fBookedFilters)
668 ptr->InitSlot(r, slot);
669 for (auto &ptr : fBookedDefines)
670 ptr->InitSlot(r, slot);
671 for (auto &ptr : fBookedVariations)
672 ptr->InitSlot(r, slot);
673
674 for (auto &callback : fCallbacksOnce)
675 callback(slot);
676}
677
679 if (r != nullptr) {
680 // we need to set a notifier so that we run the callbacks every time we switch to a new TTree
681 // `PrependLink` inserts this notifier into the TTree/TChain's linked list of notifiers
682 fNewSampleNotifier.GetChainNotifyLink(slot).PrependLink(*r->GetTree());
683 }
684 // Whatever the data source, initially set the "new data block" flag:
685 // - for TChains, this ensures that we don't skip the first data block because
686 // the correct tree is already loaded
687 // - for RDataSources and empty sources, which currently don't have data blocks, this
688 // ensures that we run once per task
690}
691
692void RLoopManager::UpdateSampleInfo(unsigned int slot, const std::pair<ULong64_t, ULong64_t> &range) {
694 "Empty source, range: {" + std::to_string(range.first) + ", " + std::to_string(range.second) + "}", range);
695}
696
698 // one GetTree to retrieve the TChain, another to retrieve the underlying TTree
699 auto *tree = r.GetTree()->GetTree();
700 R__ASSERT(tree != nullptr);
701 const std::string treename = ROOT::Internal::TreeUtils::GetTreeFullPaths(*tree)[0];
702 auto *file = tree->GetCurrentFile();
703 const std::string fname = file != nullptr ? file->GetName() : "#inmemorytree#";
704
705
706 std::pair<Long64_t, Long64_t> range = r.GetEntriesRange();
707 R__ASSERT(range.first >= 0);
708 if (range.second == -1) {
709 range.second = tree->GetEntries(); // convert '-1', i.e. 'until the end', to the actual entry number
710 }
711
712 fSampleInfos[slot] = RSampleInfo(fname + "/" + treename, range);
713}
714
715/// Initialize all nodes of the functional graph before running the event loop.
716/// This method is called once per event-loop and performs generic initialization
717/// operations that do not depend on the specific processing slot (i.e. operations
718/// that are common for all threads).
720{
722 for (auto &filter : fBookedFilters)
723 filter->InitNode();
724 for (auto &range : fBookedRanges)
725 range->InitNode();
726 for (auto &ptr : fBookedActions)
727 ptr->Initialize();
728}
729
730/// Perform clean-up operations. To be called at the end of each event loop.
732{
733 fMustRunNamedFilters = false;
734
735 // forget RActions and detach TResultProxies
736 for (auto &ptr : fBookedActions)
737 ptr->Finalize();
738
739 fRunActions.insert(fRunActions.begin(), fBookedActions.begin(), fBookedActions.end());
740 fBookedActions.clear();
741
742 // reset children counts
743 fNChildren = 0;
744 fNStopsReceived = 0;
745 for (auto &ptr : fBookedFilters)
746 ptr->ResetChildrenCount();
747 for (auto &ptr : fBookedRanges)
748 ptr->ResetChildrenCount();
749
750 fCallbacks.clear();
751 fCallbacksOnce.clear();
752 fSampleCallbacks.clear();
753}
754
755/// Perform clean-up operations. To be called at the end of each task execution.
756void RLoopManager::CleanUpTask(TTreeReader *r, unsigned int slot)
757{
758 if (r != nullptr)
759 fNewSampleNotifier.GetChainNotifyLink(slot).RemoveLink(*r->GetTree());
760 for (auto &ptr : fBookedActions)
761 ptr->FinalizeSlot(slot);
762 for (auto &ptr : fBookedFilters)
763 ptr->FinaliseSlot(slot);
764 for (auto &ptr : fBookedDefines)
765 ptr->FinaliseSlot(slot);
766}
767
768/// Add RDF nodes that require just-in-time compilation to the computation graph.
769/// This method also clears the contents of GetCodeToJit().
771{
772 // TODO this should be a read lock unless we find GetCodeToJit non-empty
774
775 const std::string code = std::move(GetCodeToJit());
776 if (code.empty()) {
777 R__LOG_INFO(RDFLogChannel()) << "Nothing to jit and execute.";
778 return;
779 }
780
781 TStopwatch s;
782 s.Start();
783 RDFInternal::InterpreterCalc(code, "RLoopManager::Run");
784 s.Stop();
785 R__LOG_INFO(RDFLogChannel()) << "Just-in-time compilation phase completed"
786 << (s.RealTime() > 1e-3 ? " in " + std::to_string(s.RealTime()) + " seconds." : ".");
787}
788
789/// Trigger counting of number of children nodes for each node of the functional graph.
790/// This is done once before starting the event loop. Each action sends an `increase children count` signal
791/// upstream, which is propagated until RLoopManager. Each time a node receives the signal, it increments its
792/// children counter. Each node only propagates the signal once, even if it receives it multiple times.
793/// Named filters also send an `increase children count` signal, just like actions, as they always execute during
794/// the event loop so the graph branch they belong to must count as active even if it does not end in an action.
796{
797 for (auto &actionPtr : fBookedActions)
798 actionPtr->TriggerChildrenCount();
799 for (auto &namedFilterPtr : fBookedNamedFilters)
800 namedFilterPtr->TriggerChildrenCount();
801}
802
803/// Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
804/// Also perform a few setup and clean-up operations (jit actions if necessary, clear booked actions after the loop...).
806{
807 // Change value of TTree::GetMaxTreeSize only for this scope. Revert when #6640 will be solved.
808 MaxTreeSizeRAII ctxtmts;
809
810 R__LOG_INFO(RDFLogChannel()) << "Starting event loop number " << fNRuns << '.';
811
812 ThrowIfNSlotsChanged(GetNSlots());
813
814 Jit();
815
816 InitNodes();
817
818 TStopwatch s;
819 s.Start();
820 switch (fLoopType) {
824 case ELoopType::kNoFiles: RunEmptySource(); break;
827 }
828 s.Stop();
829
830 CleanUpNodes();
831
832 fNRuns++;
833
834 R__LOG_INFO(RDFLogChannel()) << "Finished event loop number " << fNRuns - 1 << " (" << s.CpuTime() << "s CPU, "
835 << s.RealTime() << "s elapsed).";
836}
837
838/// Return the list of default columns -- empty if none was provided when constructing the RDataFrame
840{
841 return fDefaultColumns;
842}
843
845{
846 return fTree.get();
847}
848
850{
851 fBookedActions.emplace_back(actionPtr);
852 AddSampleCallback(actionPtr, actionPtr->GetSampleCallback());
853}
854
856{
859 fSampleCallbacks.erase(actionPtr);
860}
861
863{
864 fBookedFilters.emplace_back(filterPtr);
865 if (filterPtr->HasName()) {
866 fBookedNamedFilters.emplace_back(filterPtr);
868 }
869}
870
872{
875}
876
878{
879 fBookedRanges.emplace_back(rangePtr);
880}
881
883{
885}
886
888{
889 fBookedDefines.emplace_back(ptr);
890}
891
893{
895 fSampleCallbacks.erase(ptr);
896}
897
899{
900 fBookedVariations.emplace_back(v);
901}
902
904{
906}
907
908// dummy call, end of recursive chain of calls
910{
911 return true;
912}
913
914/// Call `FillReport` on all booked filters
916{
917 for (const auto &fPtr : fBookedNamedFilters)
918 fPtr->FillReport(rep);
919}
920
921void RLoopManager::ToJitExec(const std::string &code) const
922{
924 GetCodeToJit().append(code);
925}
926
927void RLoopManager::RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f)
928{
929 if (everyNEvents == 0ull)
930 fCallbacksOnce.emplace_back(std::move(f), fNSlots);
931 else
932 fCallbacks.emplace_back(everyNEvents, std::move(f), fNSlots);
933}
934
935std::vector<std::string> RLoopManager::GetFiltersNames()
936{
937 std::vector<std::string> filters;
938 for (auto &filter : fBookedFilters) {
939 auto name = (filter->HasName() ? filter->GetName() : "Unnamed Filter");
940 filters.push_back(name);
941 }
942 return filters;
943}
944
945std::vector<RNodeBase *> RLoopManager::GetGraphEdges() const
946{
947 std::vector<RNodeBase *> nodes(fBookedFilters.size() + fBookedRanges.size());
948 auto it = std::copy(fBookedFilters.begin(), fBookedFilters.end(), nodes.begin());
949 std::copy(fBookedRanges.begin(), fBookedRanges.end(), it);
950 return nodes;
951}
952
953std::vector<RDFInternal::RActionBase *> RLoopManager::GetAllActions() const
954{
955 std::vector<RDFInternal::RActionBase *> actions(fBookedActions.size() + fRunActions.size());
956 auto it = std::copy(fBookedActions.begin(), fBookedActions.end(), actions.begin());
957 std::copy(fRunActions.begin(), fRunActions.end(), it);
958 return actions;
959}
960
961std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode> RLoopManager::GetGraph()
962{
963 std::string name;
964 if (fDataSource) {
965 name = fDataSource->GetLabel();
966 } else if (fTree) {
967 name = fTree->GetName();
968 } else {
969 name = std::to_string(fNEmptyEntries);
970 }
971
972 auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(name);
973 thisNode->SetRoot();
974 thisNode->SetCounter(0);
975 return thisNode;
976}
977
978////////////////////////////////////////////////////////////////////////////
979/// Return all valid TTree::Branch names (caching results for subsequent calls).
980/// Never use fBranchNames directly, always request it through this method.
982{
983 if (fValidBranchNames.empty() && fTree) {
984 fValidBranchNames = RDFInternal::GetBranchNames(*fTree, /*allowRepetitions=*/true);
985 }
986 return fValidBranchNames;
987}
988
989bool RLoopManager::HasDSValuePtrs(const std::string &col) const
990{
991 return fDSValuePtrMap.find(col) != fDSValuePtrMap.end();
992}
993
994void RLoopManager::AddDSValuePtrs(const std::string &col, const std::vector<void *> ptrs)
995{
996 fDSValuePtrMap[col] = ptrs;
997}
998
1000{
1001 if (callback)
1002 fSampleCallbacks.insert({nodePtr, std::move(callback)});
1003}
ROOT::R::TRInterface & r
Definition Object.C:4
#define R__LOG_INFO(...)
Definition RLogger.hxx:364
#define b(i)
Definition RSha256.hxx:100
#define f(i)
Definition RSha256.hxx:104
#define e(i)
Definition RSha256.hxx:103
long long Long64_t
Definition RtypesCore.h:80
unsigned long long ULong64_t
Definition RtypesCore.h:81
#define R__ASSERT(e)
Definition TError.h:118
const char * filters[]
char name[80]
Definition TGX11.cxx:110
R__EXTERN TVirtualMutex * gROOTMutex
Definition TROOT.h:63
#define R__LOCKGUARD(mutex)
The head node of a RDF computation graph.
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
void AddDSValuePtrs(const std::string &col, const std::vector< void * > ptrs)
const ColumnNames_t & GetBranchNames()
Return all valid TTree::Branch names (caching results for subsequent calls).
void ToJitExec(const std::string &) const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
std::shared_ptr< TTree > fTree
Shared pointer to the input TTree.
std::vector< std::unique_ptr< TTree > > fFriends
Friends of the fTree. Only used if we constructed fTree ourselves.
std::vector< RDefineBase * > fBookedDefines
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
void Run()
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::vector< RRangeBase * > fBookedRanges
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
std::vector< RFilterBase * > fBookedFilters
std::unordered_map< void *, ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
std::vector< RDFInternal::RCallback > fCallbacks
Registered callbacks.
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph()
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directly, always use GetBranchNames().
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::map< std::string, std::vector< void * > > fDSValuePtrMap
Registry of per-slot value pointers for booked data-source columns.
void SetTree(const std::shared_ptr< TTree > &tree)
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RDFInternal::RVariationBase * > fBookedVariations
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager. This includes Filters and Ranges but not Defines.
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
bool HasDSValuePtrs(const std::string &col) const
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
const std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object. Null if no data-source.
RDFInternal::RNewSampleNotifier fNewSampleNotifier
const ColumnNames_t fDefaultColumns
void Book(RDFInternal::RActionBase *actionPtr)
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes This method loops over all filters, actions and other booked ob...
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
void Deregister(RDFInternal::RActionBase *actionPtr)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
unsigned int fNStopsReceived
Number of times that a child node signaled to stop processing entries.
Definition RNodeBase.hxx:46
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition RNodeBase.hxx:45
virtual ROOT::RDF::SampleCallback_t GetSampleCallback()=0
bool CheckFlag(unsigned int slot) const
TNotifyLink< RNewSampleFlag > & GetChainNotifyLink(unsigned int slot)
This is a helper class that allows picking a slot by resorting to a map indexed by thread ids.
void ReturnSlot(unsigned int slotNumber)
This type includes all parts of RVariation that do not depend on the callable signature.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
This class provides a simple interface to execute the same task multiple times in parallel threads,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
A Branch for the case of an object.
A TTree is a list of TBranches.
Definition TBranch.h:89
TObjArray * GetListOfLeaves()
Definition TBranch.h:243
A chain is a collection of files containing TTree objects.
Definition TChain.h:33
TObjArray * GetListOfFiles() const
Definition TChain.h:110
A List of entry numbers in a TTree or TChain.
Definition TEntryList.h:26
A TFriendElement TF describes a TTree object TF in a file.
A TLeaf describes individual elements of a TBranch See TBranch structure in TTree.
Definition TLeaf.h:57
virtual const char * GetName() const
Returns name of object.
Definition TNamed.h:47
Mother of all ROOT objects.
Definition TObject.h:41
Stopwatch class.
Definition TStopwatch.h:28
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:44
@ kEntryBeyondEnd
last entry loop has reached its end
@ kEntryValid
data read okay
A TTree represents a columnar dataset.
Definition TTree.h:79
virtual TBranch * FindBranch(const char *name)
Return the branch that corresponds to the path 'branchname', which can include the name of the tree or...
Definition TTree.cxx:4832
virtual TBranch * GetBranch(const char *name)
Return pointer to the branch with the given name in this tree or its friends.
Definition TTree.cxx:5279
static void SetMaxTreeSize(Long64_t maxsize=100000000000LL)
Set the maximum size in bytes of a Tree file (static function).
Definition TTree.cxx:9163
virtual TObjArray * GetListOfBranches()
Definition TTree.h:485
virtual TTree * GetTree() const
Definition TTree.h:514
virtual TList * GetListOfFriends() const
Definition TTree.h:487
virtual const char * GetFriendAlias(TTree *) const
If the 'tree' is a friend, this method returns its alias name.
Definition TTree.cxx:6016
ROOT::Experimental::RLogChannel & RDFLogChannel()
Definition RDFUtils.cxx:37
std::vector< std::string > GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
unsigned int GetNSlots()
Definition RDFUtils.cxx:285
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
Definition Utils.hxx:187
Long64_t InterpreterCalc(const std::string &code, const std::string &context="")
Jit code in the interpreter with TInterpreter::Calc, throw in case of errors.
Definition RDFUtils.cxx:329
std::vector< std::string > GetTreeFullPaths(const TTree &tree)
std::vector< std::string > ColumnNames_t
Definition Utils.hxx:35
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with a RDataFrame computation graph via e....
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:558
TMatrixT< Element > & Add(TMatrixT< Element > &target, Element scalar, const TMatrixT< Element > &source)
Modify addition: target += scalar * source.
Definition file.py:1
Definition tree.py:1
static const char * what
Definition stlLoader.cc:6
A RAII object that calls RLoopManager::CleanUpTask at destruction.
RCallCleanUpTask(RLoopManager &lm, unsigned int arg=0u, TTreeReader *reader=nullptr)
RSlotRAII(RSlotStack &slotStack)
RSlotStack & fSlotStack
unsigned int fSlot