52#include <sys/extattr.h> 
   56#define getxattr(path, name, value, size) getxattr(path, name, value, size, 0u, 0) 
   59#define getxattr(path, name, value, size) extattr_get_file(path, EXTATTR_NAMESPACE_USER, name, value, size) 
   73#include <unordered_map> 
   89   static std::string code;
 
   97      std::string 
msg = 
"RLoopManager::Run: when the RDataFrame was constructed the number of slots required was " +
 
   98                        std::to_string(
nSlots) + 
", but when starting the event loop it was " +
 
  101         msg += 
" Maybe EnableImplicitMT() was called after the RDataFrame was constructed?";
 
  103         msg += 
" Maybe DisableImplicitMT() was called after the RDataFrame was constructed?";
 
  104      throw std::runtime_error(
msg);
 
  116struct MaxTreeSizeRAII {
 
  119   MaxTreeSizeRAII() : fOldMaxTreeSize(
TTree::GetMaxTreeSize())
 
  127struct DatasetLogInfo {
 
  128   std::string fDataSet;
 
  136   std::stringstream 
msg;
 
  137   msg << 
"Processing " << 
info.fDataSet << 
": entry range [" << 
info.fRangeStart << 
"," << 
info.fRangeEnd - 1
 
  138       << 
"], using slot " << 
info.fSlot << 
" in thread " << std::this_thread::get_id() << 
'.';
 
  148   return std::string(
colName) + 
':' + 
ti.name();
 
  164      constexpr std::string_view 
delim{
".root"};
 
  172         return std::make_pair(
fileNameGlob, std::string_view{});
 
  179   if (
baseName.find_first_of(
"[]*?") != std::string_view::npos) { 
 
  181      if (expanded.empty())
 
  182         throw std::invalid_argument{
"RDataFrame: The glob expression '" + std::string{
baseName} +
 
  183                                     "' did not match any files."};
 
  191      throw std::invalid_argument(
"RDataFrame: could not open file \"" + 
fileToOpen + 
"\".");
 
 
  220     fNewSampleNotifier(fNSlots),
 
  221     fSampleInfos(fNSlots),
 
  222     fDatasetColumnReaders(fNSlots)
 
  231     fNewSampleNotifier(fNSlots),
 
  232     fSampleInfos(fNSlots),
 
  233     fDatasetColumnReaders(fNSlots)
 
 
  242     fNewSampleNotifier(fNSlots),
 
  243     fSampleInfos(fNSlots),
 
  244     fDatasetColumnReaders(fNSlots)
 
 
  252     fDataSource(std::
move(
ds)),
 
  253     fNewSampleNotifier(fNSlots),
 
  254     fSampleInfos(fNSlots),
 
  255     fDatasetColumnReaders(fNSlots)
 
 
  263     fNewSampleNotifier(fNSlots),
 
  264     fSampleInfos(fNSlots),
 
  265     fDatasetColumnReaders(fNSlots)
 
 
  278   if (
gEnv->
GetValue(
"TFile.CrossProtocolRedirects", 1) == 1) {
 
  341            for (std::size_t i = 0
ul; i < 
files.size(); ++i) {
 
  358         fDataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(std::move(chain), 
spec.GetFriendInfo());
 
  367            for (std::size_t i = 0
ul; i < 
files.size(); ++i) {
 
  385            throw std::runtime_error(
 
  386               "More than one RNTuple name was found, please make sure to use RNTuples with the same name.");
 
  390      fDataSource->SetNSlots(fNSlots);
 
  393         for (
auto &
v : fDatasetColumnReaders[
slot])
 
  397      throw std::invalid_argument(
 
  398         "RDataFrame: unsupported data format for dataset. Make sure you use TTree or RNTuple.");
 
 
  412   std::vector<std::pair<ULong64_t, ULong64_t>> 
entryRanges;
 
  438         std::cerr << 
"RDataFrame::Run: event loop was interrupted\n";
 
 
  463      std::cerr << 
"RDataFrame::Run: event loop was interrupted\n";
 
 
  489   default: 
return false;
 
  540   std::vector<std::pair<ULong64_t, ULong64_t>> ranges{};
 
  550         for (
const auto &
range : ranges) {
 
  551            const auto start = 
range.first;
 
  552            const auto end = 
range.second;
 
  562         std::cerr << 
"RDataFrame::Run: event loop was interrupted\n";
 
  570   if (
fEndEntry != std::numeric_limits<Long64_t>::max() &&
 
  572      std::ostringstream buf{};
 
  573      buf << 
"RDataFrame stopped processing after ";
 
  575      buf << 
" entries, whereas an entry range (begin=";
 
  579      buf << 
") was requested. Consider adjusting the end value of the entry range to a maximum of ";
 
  582      Warning(
"RDataFrame::Run", 
"%s", buf.str().c_str());
 
 
  631      ptr->InitSlot(
r, 
slot);
 
  633      ptr->InitSlot(
r, 
slot);
 
  635      ptr->InitSlot(
r, 
slot);
 
  637      ptr->InitSlot(
r, 
slot);
 
 
  659      "Empty source, range: {" + std::to_string(
range.first) + 
", " + std::to_string(
range.second) + 
"}", 
range);
 
 
  664   auto *tree = 
r.GetTree()->GetTree();
 
  667   auto *file = tree->GetCurrentFile();
 
  668   const std::string 
fname = file != 
nullptr ? file->GetName() : 
"#inmemorytree#";
 
  670   std::pair<Long64_t, Long64_t> 
range = 
r.GetEntriesRange();
 
  672   if (
range.second == -1) {
 
  673      range.second = tree->GetEntries(); 
 
  681         throw std::runtime_error(
"Full sample identifier '" + 
id + 
"' cannot be found in the available samples.");
 
 
  697      return std::make_shared<ROOT::Internal::RSlotStack>(
fNSlots);
 
 
  735      ptr->ResetChildrenCount();
 
  737      ptr->ResetChildrenCount();
 
 
  749      ptr->FinalizeSlot(
slot);
 
  751      ptr->FinalizeSlot(
slot);
 
  753      ptr->FinalizeSlot(
slot);
 
 
  775   const std::string code = []() {
 
  786                                                        : 
" in less than 1ms.");
 
 
  838      throw std::runtime_error(
"RDataFrame: executing the computation graph without a data source, aborting.");
 
 
  928      fPtr->FillReport(
rep);
 
 
  947   std::vector<std::string> 
filters;
 
  949      auto name = (filter->HasName() ? filter->GetName() : 
"Unnamed Filter");
 
 
  972   std::unordered_map<
void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &
visitedMap)
 
  985   auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(
 
 
 1003                                              std::vector<std::unique_ptr<RColumnReaderBase>> &&
readers,
 
 1004                                              const std::type_info &
ti)
 
 
 1034      return it->second.get();
 
 
 1058   fTTreeLifeline = std::move(
lifeline);
 
 
 1061std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
 
 1077std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
 
 1082      throw std::invalid_argument(
"RDataFrame: empty list of input files.");
 
 
 1094std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
 
 1103std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
 
 1112std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
 
 1125   throw std::invalid_argument(
"RDataFrame: unsupported data format for dataset \"" + std::string(
datasetName) +
 
 1126                               "\" in file \"" + 
inFile->GetName() + 
"\".");
 
 
 1129std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
 
 1135      throw std::invalid_argument(
"RDataFrame: empty list of input files.");
 
 1145   throw std::invalid_argument(
"RDataFrame: unsupported data format for dataset \"" + std::string(
datasetName) +
 
 1146                               "\" in file \"" + 
inFile->GetName() + 
"\".");
 
 
 1156      fDataSource->SetNSlots(fNSlots);
 
 
 1182         if (fDataSource->SetEntry(
slot, 
entry)) {
 
 1187      std::cerr << 
"RDataFrame::Run: event loop was interrupted\n";
 
 1190   fDataSource->FinalizeSlot(
slot);
 
 
 1218         if (fNewSampleNotifier.CheckFlag(
slot)) {
 
 1221         RunAndCheckFilters(
slot, count++);
 
 1224      std::cerr << 
"RDataFrame::Run: event loop was interrupted\n";
 
 1231      throw std::runtime_error(
"An error was encountered while processing the data. TTreeReader status code is: " +
 
 1232                               std::to_string(
treeReader.GetEntryStatus()));
 
 
#define R__LOG_DEBUG(DEBUGLEVEL,...)
 
std::unique_ptr< TFile > OpenFileWithSanityChecks(std::string_view fileNameGlob)
Helper function to open a file (or the first file from a glob).
 
Basic types used by ROOT and required by TInterpreter.
 
long long Long64_t
Portable signed long integer 8 bytes.
 
unsigned long long ULong64_t
Portable unsigned long integer 8 bytes.
 
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
 
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
 
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
 
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
 
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
 
R__EXTERN TSystem * gSystem
 
#define R__WRITE_LOCKGUARD(mutex)
 
#define R__READ_LOCKGUARD(mutex)
 
The head node of a RDF computation graph.
 
RColumnReaderBase * AddDataSourceColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti, TTreeReader *treeReader)
 
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
 
unsigned int fNRuns
Number of event loops run.
 
bool CheckFilters(unsigned int, Long64_t) final
 
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
 
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
 
void RunEmptySource()
Run event loop with no source files, in sequence.
 
void SetEmptyEntryRange(std::pair< ULong64_t, ULong64_t > &&newRange)
 
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
 
void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)
 
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
 
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
 
ULong64_t GetNEmptyEntries() const
 
std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > fSampleMap
Keys are fname + "/" + treename as RSampleInfo::fID; Values are pointers to the corresponding sample.
 
void AddDataSourceColumnReaders(std::string_view col, std::vector< std::unique_ptr< RColumnReaderBase > > &&readers, const std::type_info &ti)
 
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph(std::unordered_map< void *, std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > > &visitedMap) final
 
void ToJitExec(const std::string &) const
 
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
 
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
 
std::set< std::string > fSuppressErrorsForMissingBranches
 
bool fMustRunNamedFilters
 
void ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec)
Changes the internal TTree held by the RLoopManager.
 
std::weak_ptr< ROOT::Internal::RSlotStack > fSlotStack
Pointer to a shared slot stack in case this instance runs concurrently with others:
 
std::vector< RDefineBase * > fBookedDefines
 
void TTreeThreadTask(TTreeReader &treeReader, ROOT::Internal::RSlotStack &slotStack, std::atomic< ULong64_t > &entryCount)
The task run by every thread on an entry range (known by the input TTreeReader), for the TTree data s...
 
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
 
RLoopManager(const ColumnNames_t &defaultColumns={})
 
std::vector< RRangeBase * > fBookedRanges
 
std::vector< ROOT::RDF::Experimental::RSample > fSamples
Samples need to survive throughout the whole event loop, hence stored as an attribute.
 
std::vector< std::string > ColumnNames_t
 
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
 
void ChangeBeginAndEndEntries(Long64_t begin, Long64_t end)
 
std::vector< RFilterBase * > fBookedFilters
 
void Run(bool jit=true)
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
 
std::unordered_map< void *, ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
 
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
 
ColumnNames_t fDefaultColumns
 
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
 
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
 
std::vector< RDFInternal::RCallback > fCallbacksEveryNEvents
Registered callbacks to be executed every N events.
 
std::vector< std::unordered_map< std::string, std::unique_ptr< RColumnReaderBase > > > fDatasetColumnReaders
Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
 
void Register(RDFInternal::RActionBase *actionPtr)
 
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
 
std::vector< RDFInternal::RVariationBase * > fBookedVariations
 
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager. This includes Filters and Ranges but not Defines.
 
RDataSource * GetDataSource() const
 
unsigned int GetNSlots() const
 
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
 
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
 
RDFInternal::RNewSampleNotifier fNewSampleNotifier
 
std::pair< ULong64_t, ULong64_t > fEmptyEntryRange
Range of entries created when no data source is specified.
 
std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object.
 
void DataSourceThreadTask(const std::pair< ULong64_t, ULong64_t > &entryRange, ROOT::Internal::RSlotStack &slotStack, std::atomic< ULong64_t > &entryCount)
The task run by every thread on the input entry range, for the generic RDataSource.
 
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes. This method loops over all filters, actions and other booked ob...
 
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
 
void SetDataSource(std::unique_ptr< ROOT::RDF::RDataSource > dataSource)
 
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
 
void SetTTreeLifeline(std::any lifeline)
 
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
 
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
 
RColumnReaderBase * GetDatasetColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti) const
 
std::shared_ptr< ROOT::Internal::RSlotStack > SlotStack() const
Create a slot stack with the desired number of slots or reuse a shared instance.
 
void Deregister(RDFInternal::RActionBase *actionPtr)
 
ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
 
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
 
bool HasDataSourceColumnReaders(std::string_view col, const std::type_info &ti) const
Return true if AddDataSourceColumnReaders was called for column name col.
 
unsigned int fNStopsReceived
Number of times that a children node signaled to stop processing entries.
 
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
 
void SetFlag(unsigned int slot)
 
bool CheckFlag(unsigned int slot) const
 
void UnsetFlag(unsigned int slot)
 
TNotifyLink< RNewSampleFlag > & GetChainNotifyLink(unsigned int slot)
 
This type includes all parts of RVariation that do not depend on the callable signature.
 
A thread-safe list of N indexes (0 to size - 1).
 
The dataset specification for RDataFrame.
 
RDataSource defines an API that RDataFrame can use to read arbitrary data formats.
 
virtual void Finalize()
Convenience method called after concluding an event-loop.
 
virtual void InitSlot(unsigned int, ULong64_t)
Convenience method called at the start of the data processing associated to a slot.
 
virtual void FinalizeSlot(unsigned int)
Convenience method called at the end of the data processing associated to a slot.
 
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
 
Representation of an RNTuple data set in a ROOT file.
 
const_iterator begin() const
 
const_iterator end() const
 
This class provides a simple interface to execute the same task multiple times in parallel threads,...
 
TDirectory::TContext keeps track of and restores the current directory.
 
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
 
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
 
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
 
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
 
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
 
void Stop()
Stop the stopwatch.
 
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
 
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
 
@ kIndexedFriendNoMatch
A friend with TTreeIndex doesn't have an entry for this index.
 
@ kMissingBranchWhenSwitchingTree
A branch was not found when switching to the next TTree in the chain.
 
@ kEntryBeyondEnd
The entry loop has reached its end.
 
@ kEntryValid
data read okay
 
A TTree represents a columnar dataset.
 
static void SetMaxTreeSize(Long64_t maxsize=100000000000LL)
Set the maximum size in bytes of a Tree file (static function).
 
This class represents a WWW compatible URL.
 
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns, bool checkFile=true)
Create an RLoopManager that reads a TChain.
 
ROOT::RLogChannel & RDFLogChannel()
 
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromFile(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns)
Create an RLoopManager opening a file and checking the data format of the dataset.
 
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromRNTuple(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns)
Create an RLoopManager that reads an RNTuple.
 
void RunFinalChecks(const ROOT::RDF::RDataSource &ds, bool nodesLeftNotRun)
 
ROOT::RDF::RSampleInfo CreateSampleInfo(const ROOT::RDF::RDataSource &ds, unsigned int slot, const std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > &sampleMap)
 
void CallInitializeWithOpts(ROOT::RDF::RDataSource &ds, const std::set< std::string > &suppressErrorsForMissingColumns)
 
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
 
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > CreateColumnReader(ROOT::RDF::RDataSource &ds, unsigned int slot, std::string_view col, const std::type_info &tid, TTreeReader *treeReader)
 
void InterpreterCalc(const std::string &code, const std::string &context="")
Jit code in the interpreter with TInterpreter::Calc, throw in case of errors.
 
void ProcessMT(ROOT::RDF::RDataSource &ds, ROOT::Detail::RDF::RLoopManager &lm)
 
std::vector< std::string > GetTreeFullPaths(const TTree &tree)
 
std::unique_ptr< TChain > MakeChainForMT(const std::string &name="", const std::string &title="")
Create a TChain object with options that avoid common causes of thread contention.
 
std::vector< std::string > ExpandGlob(const std::string &glob)
Expands input glob into a collection of full paths to files.
 
auto MakeAliasedSharedPtr(T *rawPtr)
 
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
 
std::vector< std::string > ColumnNames_t
 
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
 
R__EXTERN TVirtualRWMutex * gCoreMutex
 
A RAII object that calls RLoopManager::CleanUpTask at destruction.
 
RCallCleanUpTask(RLoopManager &lm, unsigned int arg=0u, TTreeReader *reader=nullptr)
 
RLoopManager & fLoopManager
 
ROOT::Detail::RDF::RLoopManager & fLM
 
RDSRangeRAII(ROOT::Detail::RDF::RLoopManager &lm, unsigned int slot, ULong64_t firstEntry, TTreeReader *treeReader=nullptr)
 
TTreeReader * fTreeReader
 
A RAII object to pop and push slot numbers from a RSlotStack object.