Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RLoopManager.cxx
Go to the documentation of this file.
1/*************************************************************************
2 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
3 * All rights reserved. *
4 * *
5 * For the licensing terms see $ROOTSYS/LICENSE. *
6 * For the list of contributors see $ROOTSYS/README/CREDITS. *
7 *************************************************************************/
8
9#include "RConfigure.h" // R__USE_IMT
10#include "ROOT/RDataSource.hxx"
12#include "ROOT/InternalTreeUtils.hxx" // GetTreeFullPaths
15#include "ROOT/RDF/RDefineReader.hxx" // RDefinesWithReaders
20#include "ROOT/RDF/RVariationReader.hxx" // RVariationsWithReaders
21#include "ROOT/RLogger.hxx"
22#include "ROOT/RNTuple.hxx"
23#include "ROOT/RNTupleDS.hxx"
24#include "RtypesCore.h" // Long64_t
25#include "TStopwatch.h"
26#include "TBranchElement.h"
27#include "TBranchObject.h"
28#include "TChain.h"
29#include "TEntryList.h"
30#include "TFile.h"
31#include "TFriendElement.h"
32#include "TROOT.h" // IsImplicitMTEnabled, gCoreMutex, R__*_LOCKGUARD
33#include "TTreeReader.h"
34#include "TTree.h" // For MaxTreeSizeRAII. Revert when #6640 will be solved.
35#include "ROOT/InternalTreeUtils.hxx" // GetTopLevelBranchNames
36
37#include "ROOT/RTTreeDS.hxx"
38
39#ifdef R__USE_IMT
42#include "ROOT/RSlotStack.hxx"
43#endif
44
45#ifdef R__UNIX
46// Functions needed to perform EOS XRootD redirection in ChangeSpec
47#include "TEnv.h"
48#include "TSystem.h"
49#ifndef R__FBSD
50#include <sys/xattr.h>
51#else
52#include <sys/extattr.h>
53#endif
54#ifdef R__MACOSX
55/* On macOS getxattr takes two extra arguments that should be set to 0 */
56#define getxattr(path, name, value, size) getxattr(path, name, value, size, 0u, 0)
57#endif
58#ifdef R__FBSD
59#define getxattr(path, name, value, size) extattr_get_file(path, EXTATTR_NAMESPACE_USER, name, value, size)
60#endif
61#endif
62
63#include <algorithm>
64#include <atomic>
65#include <cassert>
66#include <functional>
67#include <iostream>
68#include <memory>
69#include <stdexcept>
70#include <string>
71#include <sstream>
72#include <thread>
73#include <unordered_map>
74#include <vector>
75#include <set>
76#include <limits> // For MaxTreeSizeRAII. Revert when #6640 will be solved.
77
78using namespace ROOT::Detail::RDF;
79using namespace ROOT::Internal::RDF;
80
81namespace {
82/// A helper function that returns all RDF code that is currently scheduled for just-in-time compilation.
83/// This allows different RLoopManager instances to share these data.
84/// We want RLoopManagers to be able to add their code to a global "code to execute via cling",
85/// so that, lazily, we can jit everything that's needed by all RDFs in one go, which is potentially
86/// much faster than jitting each RLoopManager's code separately.
87std::string &GetCodeToJit()
88{
89 static std::string code;
90 return code;
91}
92
93bool ContainsLeaf(const std::set<TLeaf *> &leaves, TLeaf *leaf)
94{
95 return (leaves.find(leaf) != leaves.end());
96}
97
98///////////////////////////////////////////////////////////////////////////////
99/// This overload does not check whether the leaf/branch is already in bNamesReg. In case this is a friend leaf/branch,
100/// `allowDuplicates` controls whether we add both `friendname.bname` and `bname` or just the shorter version.
101void InsertBranchName(std::set<std::string> &bNamesReg, ColumnNames_t &bNames, const std::string &branchName,
102 const std::string &friendName, bool allowDuplicates)
103{
104 if (!friendName.empty()) {
105 // In case of a friend tree, users might prepend its name/alias to the branch names
106 const auto friendBName = friendName + "." + branchName;
107 if (bNamesReg.insert(friendBName).second)
108 bNames.push_back(friendBName);
109 }
110
111 if (allowDuplicates || friendName.empty()) {
112 if (bNamesReg.insert(branchName).second)
113 bNames.push_back(branchName);
114 }
115}
116
117///////////////////////////////////////////////////////////////////////////////
118/// This overload makes sure that the TLeaf has not been already inserted.
119void InsertBranchName(std::set<std::string> &bNamesReg, ColumnNames_t &bNames, const std::string &branchName,
120 const std::string &friendName, std::set<TLeaf *> &foundLeaves, TLeaf *leaf,
121 bool allowDuplicates)
122{
124 if (!canAdd) {
125 return;
126 }
127
129
130 foundLeaves.insert(leaf);
131}
132
133void ExploreBranch(TTree &t, std::set<std::string> &bNamesReg, ColumnNames_t &bNames, TBranch *b,
134 std::string prefix, std::string &friendName, bool allowDuplicates)
135{
136 // We want to avoid situations of overlap between the prefix and the
137 // sub-branch name that might happen when the branch is composite, e.g.
138 // prefix=reco_Wdecay2_from_tbar_4vect_NOSYS.fCoordinates.
139 // subBranchName=fCoordinates.fPt
140 // which would lead to a repetition of fCoordinates in the output branch name
141 // Boundary to search for the token before the last dot
142 auto prefixEndingDot = std::string::npos;
143 if (!prefix.empty() && prefix.back() == '.')
144 prefixEndingDot = prefix.size() - 2;
145 std::string lastPrefixToken{};
146 if (auto prefixLastRealDot = prefix.find_last_of('.', prefixEndingDot); prefixLastRealDot != std::string::npos)
148
149 for (auto sb : *b->GetListOfBranches()) {
150 TBranch *subBranch = static_cast<TBranch *>(sb);
151 auto subBranchName = std::string(subBranch->GetName());
152 auto fullName = prefix + subBranchName;
153
154 if (auto subNameFirstDot = subBranchName.find_first_of('.'); subNameFirstDot != std::string::npos) {
155 // Concatenate the prefix to the sub-branch name without overlaps
156 if (!lastPrefixToken.empty() && lastPrefixToken == subBranchName.substr(0, subNameFirstDot))
157 fullName = prefix + subBranchName.substr(subNameFirstDot + 1);
158 }
159
160 std::string newPrefix;
161 if (!prefix.empty())
162 newPrefix = fullName + ".";
163
165
166 auto branchDirectlyFromTree = t.GetBranch(fullName.c_str());
168 branchDirectlyFromTree = t.FindBranch(fullName.c_str()); // try harder
172
173 if (bNamesReg.find(subBranchName) == bNamesReg.end() && t.GetBranch(subBranchName.c_str()))
175 }
176}
177
178void GetBranchNamesImpl(TTree &t, std::set<std::string> &bNamesReg, ColumnNames_t &bNames,
179 std::set<TTree *> &analysedTrees, std::string &friendName, bool allowDuplicates)
180{
181 std::set<TLeaf *> foundLeaves;
182 if (!analysedTrees.insert(&t).second) {
183 return;
184 }
185
186 const auto branches = t.GetListOfBranches();
187 // Getting the branches here triggered the read of the first file of the chain if t is a chain.
188 // We check if a tree has been successfully read, otherwise we throw (see ROOT-9984) to avoid further
189 // operations
190 if (!t.GetTree()) {
191 std::string err("GetBranchNames: error in opening the tree ");
192 err += t.GetName();
193 throw std::runtime_error(err);
194 }
195 if (branches) {
196 for (auto b : *branches) {
197 TBranch *branch = static_cast<TBranch *>(b);
198 const auto branchName = std::string(branch->GetName());
199 if (branch->IsA() == TBranch::Class()) {
200 // Leaf list
201 auto listOfLeaves = branch->GetListOfLeaves();
202 if (listOfLeaves->GetEntriesUnsafe() == 1) {
203 auto leaf = static_cast<TLeaf *>(listOfLeaves->UncheckedAt(0));
205 }
206
207 for (auto leaf : *listOfLeaves) {
208 auto castLeaf = static_cast<TLeaf *>(leaf);
209 const auto leafName = std::string(leaf->GetName());
210 const auto fullName = branchName + "." + leafName;
212 }
213 } else if (branch->IsA() == TBranchObject::Class()) {
214 // TBranchObject
217 } else {
218 // TBranchElement
219 // Check if there is explicit or implicit dot in the name
220
221 bool dotIsImplied = false;
222 auto be = dynamic_cast<TBranchElement *>(b);
223 if (!be)
224 throw std::runtime_error("GetBranchNames: unsupported branch type");
225 // TClonesArray (3) and STL collection (4)
226 if (be->GetType() == 3 || be->GetType() == 4)
227 dotIsImplied = true;
228
229 if (dotIsImplied || branchName.back() == '.')
231 else
233
235 }
236 }
237 }
238
239 // The list of friends needs to be accessed via GetTree()->GetListOfFriends()
240 // (and not via GetListOfFriends() directly), otherwise when `t` is a TChain we
241 // might not recover the list correctly (https://github.com/root-project/root/issues/6741).
242 auto friendTrees = t.GetTree()->GetListOfFriends();
243
244 if (!friendTrees)
245 return;
246
247 for (auto friendTreeObj : *friendTrees) {
248 auto friendTree = ((TFriendElement *)friendTreeObj)->GetTree();
249
250 std::string frName;
252 if (alias != nullptr)
253 frName = std::string(alias);
254 else
255 frName = std::string(friendTree->GetName());
256
258 }
259}
260
261void ThrowIfNSlotsChanged(unsigned int nSlots)
262{
264 if (currentSlots != nSlots) {
265 std::string msg = "RLoopManager::Run: when the RDataFrame was constructed the number of slots required was " +
266 std::to_string(nSlots) + ", but when starting the event loop it was " +
267 std::to_string(currentSlots) + ".";
268 if (currentSlots > nSlots)
269 msg += " Maybe EnableImplicitMT() was called after the RDataFrame was constructed?";
270 else
271 msg += " Maybe DisableImplicitMT() was called after the RDataFrame was constructed?";
272 throw std::runtime_error(msg);
273 }
274}
275
276/**
277\struct MaxTreeSizeRAII
278\brief Scope-bound change of `TTree::fgMaxTreeSize`.
279
280This RAII object stores the current value result of `TTree::GetMaxTreeSize`,
281changes it to maximum at construction time and restores it back at destruction
282time. Needed for issue #6523 and should be reverted when #6640 will be solved.
283*/
284struct MaxTreeSizeRAII {
285 Long64_t fOldMaxTreeSize;
286
287 MaxTreeSizeRAII() : fOldMaxTreeSize(TTree::GetMaxTreeSize())
288 {
289 TTree::SetMaxTreeSize(std::numeric_limits<Long64_t>::max());
290 }
291
292 ~MaxTreeSizeRAII() { TTree::SetMaxTreeSize(fOldMaxTreeSize); }
293};
294
295struct DatasetLogInfo {
296 std::string fDataSet;
297 ULong64_t fRangeStart;
298 ULong64_t fRangeEnd;
299 unsigned int fSlot;
300};
301
302std::string LogRangeProcessing(const DatasetLogInfo &info)
303{
304 std::stringstream msg;
305 msg << "Processing " << info.fDataSet << ": entry range [" << info.fRangeStart << "," << info.fRangeEnd - 1
306 << "], using slot " << info.fSlot << " in thread " << std::this_thread::get_id() << '.';
307 return msg.str();
308}
309
310auto MakeDatasetColReadersKey(std::string_view colName, const std::type_info &ti)
311{
312 // We use a combination of column name and column type name as the key because in some cases we might end up
313 // with concrete readers that use different types for the same column, e.g. std::vector and RVec here:
314 // df.Sum<vector<int>>("stdVectorBranch");
315 // df.Sum<RVecI>("stdVectorBranch");
316 return std::string(colName) + ':' + ti.name();
317}
318} // anonymous namespace
319
320namespace ROOT {
321namespace Detail {
322namespace RDF {
323
324/// A RAII object that calls RLoopManager::CleanUpTask at destruction
336
337} // namespace RDF
338} // namespace Detail
339} // namespace ROOT
340
341///////////////////////////////////////////////////////////////////////////////
342/// Get all the branches names, including the ones of the friend trees
344{
345 std::set<std::string> bNamesSet;
347 std::set<TTree *> analysedTrees;
348 std::string emptyFrName = "";
350 return bNames;
351}
352
353ROOT::Detail::RDF::RLoopManager::RLoopManager(const ROOT::Detail::RDF::ColumnNames_t &defaultColumns)
354 : fDefaultColumns(defaultColumns),
355 fNSlots(RDFInternal::GetNSlots()),
356 fNewSampleNotifier(fNSlots),
357 fSampleInfos(fNSlots),
358 fDatasetColumnReaders(fNSlots)
359{
360}
361
363 : fDefaultColumns(defaultBranches),
364 fNSlots(RDFInternal::GetNSlots()),
365 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
366 fDataSource(std::make_unique<ROOT::Internal::RDF::RTTreeDS>(ROOT::Internal::RDF::MakeAliasedSharedPtr(tree))),
367 fNewSampleNotifier(fNSlots),
368 fSampleInfos(fNSlots),
369 fDatasetColumnReaders(fNSlots)
370{
371 fDataSource->SetNSlots(fNSlots);
372}
373
375 : fEmptyEntryRange(0, nEmptyEntries),
376 fNSlots(RDFInternal::GetNSlots()),
377 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kNoFilesMT : ELoopType::kNoFiles),
378 fNewSampleNotifier(fNSlots),
379 fSampleInfos(fNSlots),
380 fDatasetColumnReaders(fNSlots)
381{
382}
383
384RLoopManager::RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches)
385 : fDefaultColumns(defaultBranches),
386 fNSlots(RDFInternal::GetNSlots()),
387 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
388 fDataSource(std::move(ds)),
389 fNewSampleNotifier(fNSlots),
390 fSampleInfos(fNSlots),
391 fDatasetColumnReaders(fNSlots)
392{
393 fDataSource->SetNSlots(fNSlots);
394}
395
397 : fNSlots(RDFInternal::GetNSlots()),
398 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
399 fNewSampleNotifier(fNSlots),
400 fSampleInfos(fNSlots),
401 fDatasetColumnReaders(fNSlots)
402{
403 ChangeSpec(std::move(spec));
404}
405
406#ifdef R__UNIX
407namespace {
408std::optional<std::string> GetRedirectedSampleId(std::string_view path, std::string_view datasetName)
409{
410 // Mimick the redirection done in TFile::Open to see if the path points to a FUSE-mounted EOS path.
411 // If so, we create a redirected sample ID with the full xroot URL.
412 TString expandedUrl(path.data());
414 if (gEnv->GetValue("TFile.CrossProtocolRedirects", 1) == 1) {
415 TUrl fileurl(expandedUrl, /* default is file */ kTRUE);
416 if (strcmp(fileurl.GetProtocol(), "file") == 0) {
417 ssize_t len = getxattr(fileurl.GetFile(), "eos.url.xroot", nullptr, 0);
418 if (len > 0) {
419 std::string xurl(len, 0);
420 std::string fileNameFromUrl{fileurl.GetFile()};
421 if (getxattr(fileNameFromUrl.c_str(), "eos.url.xroot", &xurl[0], len) == len) {
422 // Sometimes the `getxattr` call may return an invalid URL due
423 // to the POSIX attribute not being yet completely filled by EOS.
424 if (auto baseName = fileNameFromUrl.substr(fileNameFromUrl.find_last_of("/") + 1);
425 std::equal(baseName.crbegin(), baseName.crend(), xurl.crbegin())) {
426 return xurl + '/' + datasetName.data();
427 }
428 }
429 }
430 }
431 }
432
433 return std::nullopt;
434}
435} // namespace
436#endif
437
438/**
439 * @brief Changes the internal TTree held by the RLoopManager.
440 *
441 * @warning This method may lead to potentially dangerous interactions if used
442 * after the construction of the RDataFrame. Changing the specification makes
443 * sense *if and only if* the schema of the dataset is *unchanged*, i.e. the
444 * new specification refers to exactly the same number of columns, with the
445 * same names and types. The actual use case of this method is moving the
446 * processing of the same RDataFrame to a different range of entries of the
447 * same dataset (which may be stored in a different set of files).
448 *
449 * @param spec The specification of the dataset to be adopted.
450 */
452{
453 // Change the range of entries to be processed
454 fBeginEntry = spec.GetEntryRangeBegin();
455 fEndEntry = spec.GetEntryRangeEnd();
456
457 // Store the samples
458 fSamples = spec.MoveOutSamples();
459 fSampleMap.clear();
460
461 // Create the internal main chain
463 for (auto &sample : fSamples) {
464 const auto &trees = sample.GetTreeNames();
465 const auto &files = sample.GetFileNameGlobs();
466 for (std::size_t i = 0ul; i < files.size(); ++i) {
467 // We need to use `<filename>?#<treename>` as an argument to TChain::Add
468 // (see https://github.com/root-project/root/pull/8820 for why)
469 const auto fullpath = files[i] + "?#" + trees[i];
470 chain->Add(fullpath.c_str());
471 // ...but instead we use `<filename>/<treename>` as a sample ID (cannot
472 // change this easily because of backward compatibility: the sample ID
473 // is exposed to users via RSampleInfo and DefinePerSample).
474 const auto sampleId = files[i] + '/' + trees[i];
475 fSampleMap.insert({sampleId, &sample});
476#ifdef R__UNIX
477 // Also add redirected EOS xroot URL when available
479 fSampleMap.insert({redirectedSampleId.value(), &sample});
480#endif
481 }
482 }
483 fDataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(std::move(chain), spec.GetFriendInfo());
484 fDataSource->SetNSlots(fNSlots);
485 for (unsigned int slot{}; slot < fNSlots; slot++) {
486 for (auto &v : fDatasetColumnReaders[slot])
487 v.second.reset();
488 }
489}
490
491/// Run event loop with no source files, in parallel.
493{
494#ifdef R__USE_IMT
495 std::shared_ptr<ROOT::Internal::RSlotStack> slotStack = SlotStack();
496 // Working with an empty tree.
497 // Evenly partition the entries according to fNSlots. Produce around 2 tasks per slot.
498 const auto nEmptyEntries = GetNEmptyEntries();
499 const auto nEntriesPerSlot = nEmptyEntries / (fNSlots * 2);
500 auto remainder = nEmptyEntries % (fNSlots * 2);
501 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
502 ULong64_t begin = fEmptyEntryRange.first;
503 while (begin < fEmptyEntryRange.second) {
504 ULong64_t end = begin + nEntriesPerSlot;
505 if (remainder > 0) {
506 ++end;
507 --remainder;
508 }
509 entryRanges.emplace_back(begin, end);
510 begin = end;
511 }
512
513 // Each task will generate a subrange of entries
514 auto genFunction = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
516 auto slot = slotRAII.fSlot;
517 RCallCleanUpTask cleanup(*this, slot);
518 InitNodeSlots(nullptr, slot);
519 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({"an empty source", range.first, range.second, slot});
520 try {
522 for (auto currEntry = range.first; currEntry < range.second; ++currEntry) {
524 }
525 } catch (...) {
526 // Error might throw in experiment frameworks like CMSSW
527 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
528 throw;
529 }
530 };
531
533 pool.Foreach(genFunction, entryRanges);
534
535#endif // not implemented otherwise
536}
537
538/// Run event loop with no source files, in sequence.
540{
541 InitNodeSlots(nullptr, 0);
543 {"an empty source", fEmptyEntryRange.first, fEmptyEntryRange.second, 0u});
544 RCallCleanUpTask cleanup(*this);
545 try {
550 }
551 } catch (...) {
552 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
553 throw;
554 }
555}
556
557#ifdef R__USE_IMT
558namespace {
559/// Return true on succesful entry read.
560///
561/// TTreeReader encodes successful reads in the `kEntryValid` enum value, but
562/// there can be other situations where the read is still valid. For now, these
563/// are:
564/// - If there was no match of the current entry in one or more friend trees
565/// according to their respective indexes.
566/// - If there was a missing branch at the start of a new tree in the dataset.
567///
568/// In such situations, although the entry is not complete, the processing
569/// should not be aborted and nodes of the computation graph will take action
570/// accordingly.
572{
573 treeReader.Next();
574 switch (treeReader.GetEntryStatus()) {
575 case TTreeReader::kEntryValid: return true;
576 case TTreeReader::kIndexedFriendNoMatch: return true;
578 default: return false;
579 }
580}
581} // namespace
582#endif
583
584namespace {
585struct DSRunRAII {
587 DSRunRAII(ROOT::RDF::RDataSource &ds, const std::set<std::string> &suppressErrorsForMissingColumns) : fDS(ds)
588 {
590 }
591 ~DSRunRAII() { fDS.Finalize(); }
592};
593} // namespace
594
597 unsigned int fSlot;
600 TTreeReader *treeReader = nullptr)
601 : fLM(lm), fSlot(slot), fTreeReader(treeReader)
602 {
603 fLM.InitNodeSlots(fTreeReader, fSlot);
604 fLM.GetDataSource()->InitSlot(fSlot, firstEntry);
605 }
607};
608
609/// Run event loop over data accessed through a DataSource, in sequence.
611{
612 assert(fDataSource != nullptr);
613 // Shortcut if the entry range would result in not reading anything
614 if (fBeginEntry == fEndEntry)
615 return;
616 // Apply global entry range if necessary
617 if (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
618 fDataSource->SetGlobalEntryRange(std::make_pair<std::uint64_t, std::uint64_t>(fBeginEntry, fEndEntry));
619 // Initialize data source and book finalization
621 // Ensure cleanup task is always called at the end. Notably, this also resets the column readers for those data
622 // sources that need it (currently only TTree).
623 RCallCleanUpTask cleanup(*this);
624
625 // Main event loop. We start with an empty vector of ranges because we need to initialize the nodes and the data
626 // source before the first call to GetEntryRanges, since it could trigger reading (currently only happens with
627 // TTree).
628 std::uint64_t processedEntries{};
629 std::vector<std::pair<ULong64_t, ULong64_t>> ranges{};
630 do {
631
633
634 ranges = fDataSource->GetEntryRanges();
635
637
638 try {
639 for (const auto &range : ranges) {
640 const auto start = range.first;
641 const auto end = range.second;
642 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, 0u});
643 for (auto entry = start; entry < end && fNStopsReceived < fNChildren; ++entry) {
644 if (fDataSource->SetEntry(0u, entry)) {
646 }
648 }
649 }
650 } catch (...) {
651 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
652 throw;
653 }
654
655 } while (!ranges.empty() && fNStopsReceived < fNChildren);
656
658
659 if (fEndEntry != std::numeric_limits<Long64_t>::max() &&
660 static_cast<std::uint64_t>(fEndEntry - fBeginEntry) > processedEntries) {
661 std::ostringstream buf{};
662 buf << "RDataFrame stopped processing after ";
663 buf << processedEntries;
664 buf << " entries, whereas an entry range (begin=";
665 buf << fBeginEntry;
666 buf << ",end=";
667 buf << fEndEntry;
668 buf << ") was requested. Consider adjusting the end value of the entry range to a maximum of ";
669 buf << (fBeginEntry + processedEntries);
670 buf << ".";
671 Warning("RDataFrame::Run", "%s", buf.str().c_str());
672 }
673}
674
675/// Run event loop over data accessed through a DataSource, in parallel.
677{
678#ifdef R__USE_IMT
679 assert(fDataSource != nullptr);
680 // Shortcut if the entry range would result in not reading anything
681 if (fBeginEntry == fEndEntry)
682 return;
683 // Apply global entry range if necessary
684 if (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
685 fDataSource->SetGlobalEntryRange(std::make_pair<std::uint64_t, std::uint64_t>(fBeginEntry, fEndEntry));
686
688
690
691#endif // not implemented otherwise (never called)
692}
693
694/// Execute actions and make sure named filters are called for each event.
695/// Named filters must be called even if the analysis logic would not require it, lest they report confusing results.
697{
698 // data-block callbacks run before the rest of the graph
700 for (auto &callback : fSampleCallbacks)
701 callback.second(slot, fSampleInfos[slot]);
703 }
704
705 for (auto *actionPtr : fBookedActions)
706 actionPtr->Run(slot, entry);
708 namedFilterPtr->CheckFilters(slot, entry);
709 for (auto &callback : fCallbacksEveryNEvents)
710 callback(slot);
711}
712
713/// Build TTreeReaderValues for all nodes
714/// This method loops over all filters, actions and other booked objects and
715/// calls their `InitSlot` method, to get them ready for running a task.
717{
719 for (auto *ptr : fBookedActions)
720 ptr->InitSlot(r, slot);
721 for (auto *ptr : fBookedFilters)
722 ptr->InitSlot(r, slot);
723 for (auto *ptr : fBookedDefines)
724 ptr->InitSlot(r, slot);
725 for (auto *ptr : fBookedVariations)
726 ptr->InitSlot(r, slot);
727
728 for (auto &callback : fCallbacksOnce)
729 callback(slot);
730}
731
733 if (r != nullptr) {
734 // we need to set a notifier so that we run the callbacks every time we switch to a new TTree
735 // `PrependLink` inserts this notifier into the TTree/TChain's linked list of notifiers
736 fNewSampleNotifier.GetChainNotifyLink(slot).PrependLink(*r->GetTree());
737 }
738 // Whatever the data source, initially set the "new data block" flag:
739 // - for TChains, this ensures that we don't skip the first data block because
740 // the correct tree is already loaded
741 // - for RDataSources and empty sources, which currently don't have data blocks, this
742 // ensures that we run once per task
744}
745
746void RLoopManager::UpdateSampleInfo(unsigned int slot, const std::pair<ULong64_t, ULong64_t> &range) {
748 "Empty source, range: {" + std::to_string(range.first) + ", " + std::to_string(range.second) + "}", range);
749}
750
752 // one GetTree to retrieve the TChain, another to retrieve the underlying TTree
753 auto *tree = r.GetTree()->GetTree();
754 R__ASSERT(tree != nullptr);
755 const std::string treename = ROOT::Internal::TreeUtils::GetTreeFullPaths(*tree)[0];
756 auto *file = tree->GetCurrentFile();
757 const std::string fname = file != nullptr ? file->GetName() : "#inmemorytree#";
758
759 std::pair<Long64_t, Long64_t> range = r.GetEntriesRange();
760 R__ASSERT(range.first >= 0);
761 if (range.second == -1) {
762 range.second = tree->GetEntries(); // convert '-1', i.e. 'until the end', to the actual entry number
763 }
764 // If the tree is stored in a subdirectory, treename will be the full path to it starting with the root directory '/'
765 const std::string &id = fname + (treename.rfind('/', 0) == 0 ? "" : "/") + treename;
766 if (fSampleMap.empty()) {
768 } else {
769 if (fSampleMap.find(id) == fSampleMap.end())
770 throw std::runtime_error("Full sample identifier '" + id + "' cannot be found in the available samples.");
772 }
773}
774
775/// Create a slot stack with the desired number of slots or reuse a shared instance.
776/// When a LoopManager runs in isolation, it will create its own slot stack from the
777/// number of slots. When it runs as part of RunGraphs(), each loop manager will be
778/// assigned a shared slot stack, so dataframe helpers can be shared in a thread-safe
779/// manner.
780std::shared_ptr<ROOT::Internal::RSlotStack> RLoopManager::SlotStack() const
781{
782#ifdef R__USE_IMT
783 if (auto shared = fSlotStack.lock(); shared) {
784 return shared;
785 } else {
786 return std::make_shared<ROOT::Internal::RSlotStack>(fNSlots);
787 }
788#else
789 return nullptr;
790#endif
791}
792
793/// Initialize all nodes of the functional graph before running the event loop.
794/// This method is called once per event-loop and performs generic initialization
795/// operations that do not depend on the specific processing slot (i.e. operations
796/// that are common for all threads).
798{
800 for (auto *filter : fBookedFilters)
801 filter->InitNode();
802 for (auto *range : fBookedRanges)
803 range->InitNode();
804 for (auto *ptr : fBookedActions)
805 ptr->Initialize();
806}
807
808/// Perform clean-up operations. To be called at the end of each event loop.
810{
811 fMustRunNamedFilters = false;
812
813 // forget RActions and detach TResultProxies
814 for (auto *ptr : fBookedActions)
815 ptr->Finalize();
816
817 fRunActions.insert(fRunActions.begin(), fBookedActions.begin(), fBookedActions.end());
818 fBookedActions.clear();
819
820 // reset children counts
821 fNChildren = 0;
822 fNStopsReceived = 0;
823 for (auto *ptr : fBookedFilters)
824 ptr->ResetChildrenCount();
825 for (auto *ptr : fBookedRanges)
826 ptr->ResetChildrenCount();
827
829 fCallbacksOnce.clear();
830}
831
832/// Perform clean-up operations. To be called at the end of each task execution.
834{
835 if (r != nullptr)
836 fNewSampleNotifier.GetChainNotifyLink(slot).RemoveLink(*r->GetTree());
837 for (auto *ptr : fBookedActions)
838 ptr->FinalizeSlot(slot);
839 for (auto *ptr : fBookedFilters)
840 ptr->FinalizeSlot(slot);
841 for (auto *ptr : fBookedDefines)
842 ptr->FinalizeSlot(slot);
843
844 if (auto ds = GetDataSource(); ds && ds->GetLabel() == "TTreeDS") {
845 // we are reading from a tree/chain and we need to re-create the RTreeColumnReaders at every task
846 // because the TTreeReader object changes at every task
847 for (auto &v : fDatasetColumnReaders[slot])
848 v.second.reset();
849 }
850}
851
852/// Add RDF nodes that require just-in-time compilation to the computation graph.
853/// This method also clears the contents of GetCodeToJit().
855{
856 {
858 if (GetCodeToJit().empty()) {
859 R__LOG_INFO(RDFLogChannel()) << "Nothing to jit and execute.";
860 return;
861 }
862 }
863
864 const std::string code = []() {
866 return std::move(GetCodeToJit());
867 }();
868
869 TStopwatch s;
870 s.Start();
871 RDFInternal::InterpreterCalc(code, "RLoopManager::Run");
872 s.Stop();
873 R__LOG_INFO(RDFLogChannel()) << "Just-in-time compilation phase completed"
874 << (s.RealTime() > 1e-3 ? " in " + std::to_string(s.RealTime()) + " seconds."
875 : " in less than 1ms.");
876}
877
878/// Trigger counting of number of children nodes for each node of the functional graph.
879/// This is done once before starting the event loop. Each action sends an `increase children count` signal
880/// upstream, which is propagated until RLoopManager. Each time a node receives the signal, in increments its
881/// children counter. Each node only propagates the signal once, even if it receives it multiple times.
882/// Named filters also send an `increase children count` signal, just like actions, as they always execute during
883/// the event loop so the graph branch they belong to must count as active even if it does not end in an action.
885{
886 for (auto *actionPtr : fBookedActions)
887 actionPtr->TriggerChildrenCount();
889 namedFilterPtr->TriggerChildrenCount();
890}
891
892/// Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
893/// Also perform a few setup and clean-up operations (jit actions if necessary, clear booked actions after the loop...).
894/// The jitting phase is skipped if the `jit` parameter is `false` (unsafe, use with care).
896{
897 // Change value of TTree::GetMaxTreeSize only for this scope. Revert when #6640 will be solved.
898 MaxTreeSizeRAII ctxtmts;
899
900 R__LOG_INFO(RDFLogChannel()) << "Starting event loop number " << fNRuns << '.';
901
903
904 if (jit)
905 Jit();
906
907 InitNodes();
908
909 // Exceptions can occur during the event loop. In order to ensure proper cleanup of nodes
910 // we use RAII: even in case of an exception, the destructor of the object is invoked and
911 // all the cleanup takes place.
912 class NodesCleanerRAII {
914
915 public:
917 ~NodesCleanerRAII() { fRLM.CleanUpNodes(); }
918 };
919
921
922 TStopwatch s;
923 s.Start();
924
925 switch (fLoopType) {
927 throw std::runtime_error("RDataFrame: executing the computation graph without a data source, aborting.");
928 break;
931 case ELoopType::kNoFiles: RunEmptySource(); break;
933 }
934 s.Stop();
935
936 fNRuns++;
937
938 R__LOG_INFO(RDFLogChannel()) << "Finished event loop number " << fNRuns - 1 << " (" << s.CpuTime() << "s CPU, "
939 << s.RealTime() << "s elapsed).";
940}
941
942/// Return the list of default columns -- empty if none was provided when constructing the RDataFrame
947
949{
950 // This is currently called in SnapshotTTreeHelper[MT] to retrieve the task-local input TTree. It is not guaranteed
951 // that the same RLoopManager will always have the same input TTree for its entire lifetime, notably it could be
952 // changed by ChangeSpec when moving to a different entry range.
953 if (auto *treeDS = dynamic_cast<ROOT::Internal::RDF::RTTreeDS *>(fDataSource.get())) {
954 return treeDS->GetTree();
955 }
956 return nullptr;
957}
958
964
971
973{
974 fBookedFilters.emplace_back(filterPtr);
975 if (filterPtr->HasName()) {
976 fBookedNamedFilters.emplace_back(filterPtr);
978 }
979}
980
986
991
996
998{
999 fBookedDefines.emplace_back(ptr);
1000}
1001
1007
1012
1017
1018// dummy call, end of recursive chain of calls
1020{
1021 return true;
1022}
1023
1024/// Call `FillReport` on all booked filters
1026{
1027 for (const auto *fPtr : fBookedNamedFilters)
1028 fPtr->FillReport(rep);
1029}
1030
1031void RLoopManager::ToJitExec(const std::string &code) const
1032{
1034 GetCodeToJit().append(code);
1035}
1036
1037void RLoopManager::RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f)
1038{
1039 if (everyNEvents == 0ull)
1040 fCallbacksOnce.emplace_back(std::move(f), fNSlots);
1041 else
1042 fCallbacksEveryNEvents.emplace_back(everyNEvents, std::move(f), fNSlots);
1043}
1044
1045std::vector<std::string> RLoopManager::GetFiltersNames()
1046{
1047 std::vector<std::string> filters;
1048 for (auto *filter : fBookedFilters) {
1049 auto name = (filter->HasName() ? filter->GetName() : "Unnamed Filter");
1050 filters.push_back(name);
1051 }
1052 return filters;
1053}
1054
1055std::vector<RNodeBase *> RLoopManager::GetGraphEdges() const
1056{
1057 std::vector<RNodeBase *> nodes(fBookedFilters.size() + fBookedRanges.size());
1058 auto it = std::copy(fBookedFilters.begin(), fBookedFilters.end(), nodes.begin());
1059 std::copy(fBookedRanges.begin(), fBookedRanges.end(), it);
1060 return nodes;
1061}
1062
1063std::vector<RDFInternal::RActionBase *> RLoopManager::GetAllActions() const
1064{
1065 std::vector<RDFInternal::RActionBase *> actions(fBookedActions.size() + fRunActions.size());
1066 auto it = std::copy(fBookedActions.begin(), fBookedActions.end(), actions.begin());
1067 std::copy(fRunActions.begin(), fRunActions.end(), it);
1068 return actions;
1069}
1070
1071std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode> RLoopManager::GetGraph(
1072 std::unordered_map<void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &visitedMap)
1073{
1074 // If there is already a node for the RLoopManager return it. If there is not, return a new one.
1075 auto duplicateRLoopManagerIt = visitedMap.find((void *)this);
1077 return duplicateRLoopManagerIt->second;
1078
1079 std::string name;
1080 if (fDataSource) {
1081 name = fDataSource->GetLabel();
1082 } else {
1083 name = "Empty source\\nEntries: " + std::to_string(GetNEmptyEntries());
1084 }
1085 auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(
1087 visitedMap[(void *)this] = thisNode;
1088 return thisNode;
1089}
1090
1091////////////////////////////////////////////////////////////////////////////
1092/// Return all valid TTree::Branch names (caching results for subsequent calls).
1093/// Never use fBranchNames directy, always request it through this method.
1095{
1096 if (fValidBranchNames.empty() && fDataSource) {
1097 fValidBranchNames = fDataSource->GetColumnNames();
1098 }
1099 return fValidBranchNames;
1100}
1101
1102/// Return true if AddDataSourceColumnReaders was called for column name col.
1103bool RLoopManager::HasDataSourceColumnReaders(std::string_view col, const std::type_info &ti) const
1104{
1105 const auto key = MakeDatasetColReadersKey(col, ti);
1106 assert(fDataSource != nullptr);
1107 // since data source column readers are always added for all slots at the same time,
1108 // if the reader is present for slot 0 we have it for all other slots as well.
1109 auto it = fDatasetColumnReaders[0].find(key);
1110 return (it != fDatasetColumnReaders[0].end() && it->second);
1111}
1112
1114 std::vector<std::unique_ptr<RColumnReaderBase>> &&readers,
1115 const std::type_info &ti)
1116{
1117 const auto key = MakeDatasetColReadersKey(col, ti);
1118 assert(fDataSource != nullptr && !HasDataSourceColumnReaders(col, ti));
1119 assert(readers.size() == fNSlots);
1120
1121 for (auto slot = 0u; slot < fNSlots; ++slot) {
1122 fDatasetColumnReaders[slot][key] = std::move(readers[slot]);
1123 }
1124}
1125
1126// Differently from AddDataSourceColumnReaders, this can be called from multiple threads concurrently
1127/// \brief Register a new RTreeColumnReader with this RLoopManager.
1128/// \return A shared pointer to the inserted column reader.
1130 std::unique_ptr<RColumnReaderBase> &&reader,
1131 const std::type_info &ti)
1132{
1134 const auto key = MakeDatasetColReadersKey(col, ti);
1135 // if a reader for this column and this slot was already there, we are doing something wrong
1136 assert(readers.find(key) == readers.end() || readers[key] == nullptr);
1137 auto *rptr = reader.get();
1138 readers[key] = std::move(reader);
1139 return rptr;
1140}
1141
1143 const std::type_info &ti, TTreeReader *treeReader)
1144{
1146 const auto key = MakeDatasetColReadersKey(col, ti);
1147 // if a reader for this column and this slot was already there, we are doing something wrong
1148 assert(readers.find(key) == readers.end() || readers[key] == nullptr);
1149 assert(fDataSource && "Missing RDataSource to add column reader.");
1150
1152
1153 return readers[key].get();
1154}
1155
1157RLoopManager::GetDatasetColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti) const
1158{
1159 const auto key = MakeDatasetColReadersKey(col, ti);
1160 if (auto it = fDatasetColumnReaders[slot].find(key); it != fDatasetColumnReaders[slot].end() && it->second)
1161 return it->second.get();
1162 else
1163 return nullptr;
1164}
1165
1167{
1168 if (callback)
1169 fSampleCallbacks.insert({nodePtr, std::move(callback)});
1170}
1171
1172void RLoopManager::SetEmptyEntryRange(std::pair<ULong64_t, ULong64_t> &&newRange)
1173{
1174 fEmptyEntryRange = std::move(newRange);
1175}
1176
1178{
1179 fBeginEntry = begin;
1180 fEndEntry = end;
1181}
1182
1184{
1185 fTTreeLifeline = std::move(lifeline);
1186}
1187
1188/**
1189 * \brief Helper function to open a file (or the first file from a glob).
1190 * This function is used at construction time of an RDataFrame, to check the
1191 * concrete type of the dataset stored inside the file.
1192 */
1193std::unique_ptr<TFile> OpenFileWithSanityChecks(std::string_view fileNameGlob)
1194{
1195 // Follow same logic in TChain::Add to find the correct string to look for globbing:
1196 // - If the extension ".root" is present in the file name, pass along the basename.
1197 // - If not, use the "?" token to delimit the part of the string which represents the basename.
1198 // - Otherwise, pass the full filename.
1199 auto &&baseNameAndQuery = [&fileNameGlob]() {
1200 constexpr std::string_view delim{".root"};
1201 if (auto &&it = std::find_end(fileNameGlob.begin(), fileNameGlob.end(), delim.begin(), delim.end());
1202 it != fileNameGlob.end()) {
1203 auto &&distanceToEndOfDelim = std::distance(fileNameGlob.begin(), it + delim.length());
1204 return std::make_pair(fileNameGlob.substr(0, distanceToEndOfDelim), fileNameGlob.substr(distanceToEndOfDelim));
1205 } else if (auto &&lastQuestionMark = fileNameGlob.find_last_of('?'); lastQuestionMark != std::string_view::npos)
1206 return std::make_pair(fileNameGlob.substr(0, lastQuestionMark), fileNameGlob.substr(lastQuestionMark));
1207 else
1208 return std::make_pair(fileNameGlob, std::string_view{});
1209 }();
1210 // Captured structured bindings variable are only valid since C++20
1211 auto &&baseName = baseNameAndQuery.first;
1212 auto &&query = baseNameAndQuery.second;
1213
1214 const auto nameHasWildcard = [&baseName]() {
1215 constexpr std::array<char, 4> wildCards{'[', ']', '*', '?'}; // Wildcards accepted by TChain::Add
1216 return std::any_of(wildCards.begin(), wildCards.end(),
1217 [&baseName](auto &&wc) { return baseName.find(wc) != std::string_view::npos; });
1218 }();
1219
1220 // Open first file in case of glob, suppose all files in the glob use the same data format
1221 std::string fileToOpen{nameHasWildcard
1222 ? ROOT::Internal::TreeUtils::ExpandGlob(std::string{baseName})[0] + std::string{query}
1223 : fileNameGlob};
1224
1225 ::TDirectory::TContext ctxt; // Avoid changing gDirectory;
1226 std::unique_ptr<TFile> inFile{TFile::Open(fileToOpen.c_str(), "READ_WITHOUT_GLOBALREGISTRATION")};
1227 if (!inFile || inFile->IsZombie())
1228 throw std::invalid_argument("RDataFrame: could not open file \"" + fileToOpen + "\".");
1229
1230 return inFile;
1231}
1232
1233std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1236{
1237 // Introduce the same behaviour as in CreateLMFromFile for consistency.
1238 // Creating an RDataFrame with a non-existing file will throw early rather
1239 // than wait for the start of the graph execution.
1240 if (checkFile) {
1242 }
1243
1244 auto dataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(datasetName, fileNameGlob);
1245 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1246 return lm;
1247}
1248
1249std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1250ROOT::Detail::RDF::CreateLMFromTTree(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1251 const std::vector<std::string> &defaultColumns, bool checkFile)
1252{
1253 if (fileNameGlobs.size() == 0)
1254 throw std::invalid_argument("RDataFrame: empty list of input files.");
1255 // Introduce the same behaviour as in CreateLMFromFile for consistency.
1256 // Creating an RDataFrame with a non-existing file will throw early rather
1257 // than wait for the start of the graph execution.
1258 if (checkFile) {
1260 }
1261 auto dataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(datasetName, fileNameGlobs);
1262 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1263 return lm;
1264}
1265
1266std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1269{
1270 auto dataSource = std::make_unique<ROOT::RDF::RNTupleDS>(datasetName, fileNameGlob);
1271 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1272 return lm;
1273}
1274
1275std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1276ROOT::Detail::RDF::CreateLMFromRNTuple(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1278{
1279 auto dataSource = std::make_unique<ROOT::RDF::RNTupleDS>(datasetName, fileNameGlobs);
1280 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1281 return lm;
1282}
1283
1284std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1287{
1288
1290
1291 if (inFile->Get<TTree>(datasetName.data())) {
1292 return CreateLMFromTTree(datasetName, fileNameGlob, defaultColumns, /*checkFile=*/false);
1293 } else if (inFile->Get<ROOT::RNTuple>(datasetName.data())) {
1295 }
1296
1297 throw std::invalid_argument("RDataFrame: unsupported data format for dataset \"" + std::string(datasetName) +
1298 "\" in file \"" + inFile->GetName() + "\".");
1299}
1300
1301std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1302ROOT::Detail::RDF::CreateLMFromFile(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1304{
1305
1306 if (fileNameGlobs.size() == 0)
1307 throw std::invalid_argument("RDataFrame: empty list of input files.");
1308
1310
1311 if (inFile->Get<TTree>(datasetName.data())) {
1312 return CreateLMFromTTree(datasetName, fileNameGlobs, defaultColumns, /*checkFile=*/false);
1313 } else if (inFile->Get<ROOT::RNTuple>(datasetName.data())) {
1315 }
1316
1317 throw std::invalid_argument("RDataFrame: unsupported data format for dataset \"" + std::string(datasetName) +
1318 "\" in file \"" + inFile->GetName() + "\".");
1319}
1320
1321// outlined to pin virtual table
1323
1324void ROOT::Detail::RDF::RLoopManager::SetDataSource(std::unique_ptr<ROOT::RDF::RDataSource> dataSource)
1325{
1326 if (dataSource) {
1327 fDataSource = std::move(dataSource);
1328 fDataSource->SetNSlots(fNSlots);
1329 fLoopType = ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource;
1330 }
1331}
1332
1333void ROOT::Detail::RDF::RLoopManager::DataSourceThreadTask(const std::pair<ULong64_t, ULong64_t> &entryRange,
1335 std::atomic<ULong64_t> &entryCount)
1336{
1337#ifdef R__USE_IMT
1339 const auto &slot = slotRAII.fSlot;
1340
1341 const auto &[start, end] = entryRange;
1342 const auto nEntries = end - start;
1343 entryCount.fetch_add(nEntries);
1344
1345 RCallCleanUpTask cleanup(*this, slot);
1346 RDSRangeRAII _{*this, slot, start};
1347
1348 fSampleInfos[slot] = ROOT::Internal::RDF::CreateSampleInfo(*fDataSource, slot, fSampleMap);
1349
1350 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, slot});
1351
1352 try {
1353 for (auto entry = start; entry < end; ++entry) {
1354 if (fDataSource->SetEntry(slot, entry)) {
1355 RunAndCheckFilters(slot, entry);
1356 }
1357 }
1358 } catch (...) {
1359 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
1360 throw;
1361 }
1362 fDataSource->FinalizeSlot(slot);
1363#else
1364 (void)entryRange;
1365 (void)slotStack;
1366 (void)entryCount;
1367#endif
1368}
1369
1371 std::atomic<ULong64_t> &entryCount)
1372{
1373#ifdef R__USE_IMT
1375 const auto &slot = slotRAII.fSlot;
1376
1377 const auto entryRange = treeReader.GetEntriesRange(); // we trust TTreeProcessorMT to call SetEntriesRange
1378 const auto &[start, end] = entryRange;
1379 const auto nEntries = end - start;
1380 auto count = entryCount.fetch_add(nEntries);
1381
1382 RDSRangeRAII _{*this, slot, static_cast<ULong64_t>(start), &treeReader};
1383 RCallCleanUpTask cleanup(*this, slot, &treeReader);
1384
1386 {fDataSource->GetLabel(), static_cast<ULong64_t>(start), static_cast<ULong64_t>(end), slot});
1387 try {
1388 // recursive call to check filters and conditionally execute actions
1390 if (fNewSampleNotifier.CheckFlag(slot)) {
1391 UpdateSampleInfo(slot, treeReader);
1392 }
1393 RunAndCheckFilters(slot, count++);
1394 }
1395 } catch (...) {
1396 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
1397 throw;
1398 }
1399 // fNStopsReceived < fNChildren is always true at the moment as we don't support event loop early quitting in
1400 // multi-thread runs, but it costs nothing to be safe and future-proof in case we add support for that later.
1401 if (treeReader.GetEntryStatus() != TTreeReader::kEntryBeyondEnd && fNStopsReceived < fNChildren) {
1402 // something went wrong in the TTreeReader event loop
1403 throw std::runtime_error("An error was encountered while processing the data. TTreeReader status code is: " +
1404 std::to_string(treeReader.GetEntryStatus()));
1405 }
1406#else
1407 (void)treeReader;
1408 (void)slotStack;
1409 (void)entryCount;
1410#endif
1411}
#define R__LOG_DEBUG(DEBUGLEVEL,...)
Definition RLogger.hxx:360
#define R__LOG_INFO(...)
Definition RLogger.hxx:359
std::unique_ptr< TFile > OpenFileWithSanityChecks(std::string_view fileNameGlob)
Helper function to open a file (or the first file from a glob).
#define b(i)
Definition RSha256.hxx:100
#define f(i)
Definition RSha256.hxx:104
#define e(i)
Definition RSha256.hxx:103
long long Long64_t
Definition RtypesCore.h:69
unsigned long long ULong64_t
Definition RtypesCore.h:70
constexpr Bool_t kTRUE
Definition RtypesCore.h:93
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
R__EXTERN TEnv * gEnv
Definition TEnv.h:170
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:229
const char * filters[]
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
char name[80]
Definition TGX11.cxx:110
R__EXTERN TSystem * gSystem
Definition TSystem.h:572
#define R__WRITE_LOCKGUARD(mutex)
#define R__READ_LOCKGUARD(mutex)
#define _(A, B)
Definition cfortran.h:108
The head node of a RDF computation graph.
RColumnReaderBase * AddDataSourceColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti, TTreeReader *treeReader)
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void SetEmptyEntryRange(std::pair< ULong64_t, ULong64_t > &&newRange)
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > fSampleMap
Keys are fname + "/" + treename as RSampleInfo::fID; Values are pointers to the corresponding sample.
void AddDataSourceColumnReaders(std::string_view col, std::vector< std::unique_ptr< RColumnReaderBase > > &&readers, const std::type_info &ti)
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph(std::unordered_map< void *, std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > > &visitedMap) final
const ColumnNames_t & GetBranchNames()
Return all valid TTree::Branch names (caching results for subsequent calls).
void ToJitExec(const std::string &) const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
std::set< std::string > fSuppressErrorsForMissingBranches
void ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec)
Changes the internal TTree held by the RLoopManager.
std::weak_ptr< ROOT::Internal::RSlotStack > fSlotStack
Pointer to a shared slot stack in case this instance runs concurrently with others:
std::vector< RDefineBase * > fBookedDefines
void TTreeThreadTask(TTreeReader &treeReader, ROOT::Internal::RSlotStack &slotStack, std::atomic< ULong64_t > &entryCount)
The task run by every thread on an entry range (known by the input TTreeReader), for the TTree data s...
RColumnReaderBase * AddTreeColumnReader(unsigned int slot, std::string_view col, std::unique_ptr< RColumnReaderBase > &&reader, const std::type_info &ti)
Register a new RTreeColumnReader with this RLoopManager.
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
RLoopManager(const ColumnNames_t &defaultColumns={})
std::vector< RRangeBase * > fBookedRanges
std::vector< ROOT::RDF::Experimental::RSample > fSamples
Samples need to survive throughout the whole event loop, hence stored as an attribute.
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
void ChangeBeginAndEndEntries(Long64_t begin, Long64_t end)
std::vector< RFilterBase * > fBookedFilters
void Run(bool jit=true)
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::unordered_map< void *, ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directy, always use GetBranchNames().
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::vector< RDFInternal::RCallback > fCallbacksEveryNEvents
Registered callbacks to be executed every N events.
std::vector< std::unordered_map< std::string, std::unique_ptr< RColumnReaderBase > > > fDatasetColumnReaders
Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
void Register(RDFInternal::RActionBase *actionPtr)
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RDFInternal::RVariationBase * > fBookedVariations
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager This includes Filters and Ranges but not Defines.
RDataSource * GetDataSource() const
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
RDFInternal::RNewSampleNotifier fNewSampleNotifier
std::pair< ULong64_t, ULong64_t > fEmptyEntryRange
Range of entries created when no data source is specified.
std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object.
void DataSourceThreadTask(const std::pair< ULong64_t, ULong64_t > &entryRange, ROOT::Internal::RSlotStack &slotStack, std::atomic< ULong64_t > &entryCount)
The task run by every thread on the input entry range, for the generic RDataSource.
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes This method loops over all filters, actions and other booked ob...
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void SetDataSource(std::unique_ptr< ROOT::RDF::RDataSource > dataSource)
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void SetTTreeLifeline(std::any lifeline)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
RColumnReaderBase * GetDatasetColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti) const
std::shared_ptr< ROOT::Internal::RSlotStack > SlotStack() const
Create a slot stack with the desired number of slots or reuse a shared instance.
void Deregister(RDFInternal::RActionBase *actionPtr)
ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
bool HasDataSourceColumnReaders(std::string_view col, const std::type_info &ti) const
Return true if AddDataSourceColumnReaders was called for column name col.
unsigned int fNStopsReceived
Number of times that a children node signaled to stop processing entries.
Definition RNodeBase.hxx:47
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition RNodeBase.hxx:46
bool CheckFlag(unsigned int slot) const
TNotifyLink< RNewSampleFlag > & GetChainNotifyLink(unsigned int slot)
This type includes all parts of RVariation that do not depend on the callable signature.
A thread-safe list of N indexes (0 to size - 1).
The dataset specification for RDataFrame.
RDataSource defines an API that RDataFrame can use to read arbitrary data formats.
virtual void Finalize()
Convenience method called after concluding an event-loop.
virtual void InitSlot(unsigned int, ULong64_t)
Convenience method called at the start of the data processing associated to a slot.
virtual void FinalizeSlot(unsigned int)
Convenience method called at the end of the data processing associated to a slot.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:65
const_iterator begin() const
const_iterator end() const
This class provides a simple interface to execute the same task multiple times in parallel threads,...
A Branch for the case of an object.
static TClass * Class()
A TTree is a list of TBranches.
Definition TBranch.h:93
static TClass * Class()
TDirectory::TContext keeps track and restore the current directory.
Definition TDirectory.h:89
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition TEnv.cxx:491
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:4112
A TFriendElement TF describes a TTree object TF in a file.
A TLeaf describes individual elements of a TBranch See TBranch structure in TTree.
Definition TLeaf.h:57
const char * GetName() const override
Returns name of object.
Definition TNamed.h:49
Stopwatch class.
Definition TStopwatch.h:28
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
Basic string class.
Definition TString.h:139
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
Definition TSystem.cxx:1287
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:46
@ kIndexedFriendNoMatch
A friend with TTreeIndex doesn't have an entry for this index.
@ kMissingBranchWhenSwitchingTree
A branch was not found when switching to the next TTree in the chain.
@ kEntryBeyondEnd
last entry loop has reached its end
@ kEntryValid
data read okay
A TTree represents a columnar dataset.
Definition TTree.h:84
virtual TBranch * FindBranch(const char *name)
Return the branch that correspond to the path 'branchname', which can include the name of the tree or...
Definition TTree.cxx:4856
virtual TBranch * GetBranch(const char *name)
Return pointer to the branch with the given name in this tree or its friends.
Definition TTree.cxx:5309
static void SetMaxTreeSize(Long64_t maxsize=100000000000LL)
Set the maximum size in bytes of a Tree file (static function).
Definition TTree.cxx:9319
virtual TObjArray * GetListOfBranches()
Definition TTree.h:540
virtual TTree * GetTree() const
Definition TTree.h:569
virtual const char * GetFriendAlias(TTree *) const
If the 'tree' is a friend, this method returns its alias name.
Definition TTree.cxx:6052
This class represents a WWW compatible URL.
Definition TUrl.h:33
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns, bool checkFile=true)
Create an RLoopManager that reads a TChain.
ROOT::RLogChannel & RDFLogChannel()
Definition RDFUtils.cxx:41
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromFile(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns)
Create an RLoopManager opening a file and checking the data format of the dataset.
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromRNTuple(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns)
Create an RLoopManager that reads an RNTuple.
void RunFinalChecks(const ROOT::RDF::RDataSource &ds, bool nodesLeftNotRun)
Definition RDFUtils.cxx:587
ROOT::RDF::RSampleInfo CreateSampleInfo(const ROOT::RDF::RDataSource &ds, unsigned int slot, const std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > &sampleMap)
Definition RDFUtils.cxx:580
std::vector< std::string > GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
unsigned int GetNSlots()
Definition RDFUtils.cxx:311
void CallInitializeWithOpts(ROOT::RDF::RDataSource &ds, const std::set< std::string > &suppressErrorsForMissingColumns)
Definition RDFUtils.cxx:569
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
Definition Utils.hxx:200
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > CreateColumnReader(ROOT::RDF::RDataSource &ds, unsigned int slot, std::string_view col, const std::type_info &tid, TTreeReader *treeReader)
Definition RDFUtils.cxx:598
void InterpreterCalc(const std::string &code, const std::string &context="")
Jit code in the interpreter with TInterpreter::Calc, throw in case of errors.
Definition RDFUtils.cxx:355
void ProcessMT(ROOT::RDF::RDataSource &ds, ROOT::Detail::RDF::RLoopManager &lm)
Definition RDFUtils.cxx:592
std::vector< std::string > GetTreeFullPaths(const TTree &tree)
std::unique_ptr< TChain > MakeChainForMT(const std::string &name="", const std::string &title="")
Create a TChain object with options that avoid common causes of thread contention.
std::vector< std::string > ExpandGlob(const std::string &glob)
Expands input glob into a collection of full paths to files.
auto MakeAliasedSharedPtr(T *rawPtr)
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
std::vector< std::string > ColumnNames_t
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:595
R__EXTERN TVirtualRWMutex * gCoreMutex
A RAII object that calls RLoopManager::CleanUpTask at destruction.
RCallCleanUpTask(RLoopManager &lm, unsigned int arg=0u, TTreeReader *reader=nullptr)
ROOT::Detail::RDF::RLoopManager & fLM
RDSRangeRAII(ROOT::Detail::RDF::RLoopManager &lm, unsigned int slot, ULong64_t firstEntry, TTreeReader *treeReader=nullptr)
A RAII object to pop and push slot numbers from a RSlotStack object.