11 #include "RConfigure.h" 51 RActionBase::RActionBase(
RLoopManager *implPtr,
const unsigned int nSlots) : fLoopManager(implPtr), fNSlots(nSlots)
60 const bool isDSColumn)
95 return !
fName.empty();
103 const auto all = accepted + std::accumulate(
fRejected.begin(),
fRejected.end(), 0ULL);
116 fConcreteFilter = std::move(f);
122 fConcreteFilter->InitSlot(r, slot);
128 return fConcreteFilter->CheckFilters(slot, entry);
134 fConcreteFilter->Report(cr);
140 fConcreteFilter->PartialReport(cr);
146 fConcreteFilter->FillReport(cr);
152 fConcreteFilter->IncrChildrenCount();
158 fConcreteFilter->StopProcessing();
164 fConcreteFilter->ResetChildrenCount();
170 fConcreteFilter->TriggerChildrenCount();
176 fConcreteFilter->ResetReportCount();
182 fConcreteFilter->ClearValueReaders(slot);
188 fConcreteFilter->InitNode();
191 void TSlotStack::ReturnSlot(
unsigned int slotNumber)
193 auto &index = GetIndex();
194 auto &count = GetCount();
195 assert(count > 0U &&
"TSlotStack has a reference count relative to an index which will become negative.");
199 std::lock_guard<ROOT::TSpinMutex> guard(fMutex);
200 fBuf[fCursor++] = slotNumber;
201 assert(fCursor <= fBuf.size() &&
"TSlotStack assumes that at most a fixed number of values can be present in the " 202 "stack. fCursor is greater than the size of the internal buffer. This violates " 207 unsigned int TSlotStack::GetSlot()
209 auto &index = GetIndex();
210 auto &count = GetCount();
212 if (UINT_MAX != index)
214 std::lock_guard<ROOT::TSpinMutex> guard(fMutex);
215 assert(fCursor > 0 &&
"TSlotStack assumes that a value can be always obtained. In this case fCursor is <=0 and this " 216 "violates such assumption.");
217 index = fBuf[--fCursor];
222 : fTree(
std::shared_ptr<TTree>(tree, [](TTree *) {})),
fDefaultColumns(defaultBranches),
251 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
259 entryRanges.emplace_back(start, end);
264 auto genFunction = [
this, &slotStack](
const std::pair<ULong64_t, ULong64_t> &range) {
265 auto slot = slotStack.
GetSlot();
267 for (
auto currEntry = range.first; currEntry < range.second; ++currEntry) {
275 pool.
Foreach(genFunction, entryRanges);
277 #endif // not implemented otherwise 295 std::unique_ptr<ttpmt_t> tp;
296 tp.reset(
new ttpmt_t(*
fTree));
298 tp->Process([
this, &slotStack](
TTreeReader &
r) ->
void {
299 auto slot = slotStack.
GetSlot();
308 #endif // no-op otherwise (will not be called) 315 if (0 ==
fTree->GetEntriesFast())
333 while (!ranges.empty()) {
336 for (
const auto &range : ranges) {
337 auto end = range.second;
338 for (
auto entry = range.first; entry < end; ++entry) {
359 auto runOnRange = [
this, &slotStack](
const std::pair<ULong64_t, ULong64_t> &range) {
360 const auto slot = slotStack.
GetSlot();
363 const auto end = range.second;
364 for (
auto entry = range.first; entry < end; ++entry) {
376 while (!ranges.empty()) {
377 pool.
Foreach(runOnRange, ranges);
381 #endif // not implemented otherwise (never called) 389 actionPtr->Run(slot, entry);
391 namedFilterPtr->CheckFilters(slot, entry);
405 bookedBranch.second->InitSlot(r, slot);
407 ptr->InitSlot(r, slot);
409 ptr->InitSlot(r, slot);
424 customColumn.second->InitNode();
441 fResProxyReadiness.clear();
447 ptr->ResetChildrenCount();
449 ptr->ResetChildrenCount();
450 for (
auto &ptr : fBookedRanges)
451 ptr->ResetChildrenCount();
461 ptr->FinalizeSlot(slot);
463 ptr->ClearValueReaders(slot);
465 pair.second->ClearValueReaders(slot);
471 auto error = TInterpreter::EErrorCode::kNoError;
473 if (TInterpreter::EErrorCode::kNoError != error) {
474 std::string exceptionText =
475 "An error occurred while jitting. The lines above might indicate the cause of the crash\n";
476 throw std::runtime_error(exceptionText.c_str());
490 actionPtr->TriggerChildrenCount();
492 namedFilterPtr->TriggerChildrenCount();
497 static unsigned int id = 0;
552 if (filterPtr->HasName()) {
560 const auto &
name = columnPtr->GetName();
584 fPtr->FillReport(rep);
589 if (everyNEvents == 0ull)
596 const unsigned int nSlots)
597 : fLoopManager(implPtr), fStart(start), fStop(stop), fStride(stride),
fNSlots(nSlots)
void Foreach(F func, unsigned nTimes)
Execute func (with no arguments) nTimes in parallel.
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
const unsigned int fNSlots
Number of thread slots used by this node.
void JitActions()
Jit all actions that required runtime column type inference, and clean the fToJit member variable...
void AddCut(TCutInfo &&ci)
RRangeBase(RLoopManager *implPtr, unsigned int start, unsigned int stop, unsigned int stride, const unsigned int nSlots)
TTreeReader is a simple, robust and fast interface to read values from a TTree, TChain or TNtuple...
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
std::string GetName() const
Namespace for new ROOT classes and functions.
std::shared_ptr< RFilterBase > FilterBasePtr_t
Long64_t GetCurrentEntry() const
Returns the index of the current entry being read.
std::shared_ptr< RDFInternal::RActionBase > ActionBasePtr_t
std::shared_ptr< TTree > fTree
void InitSlot(TTreeReader *r, unsigned int slot) override final
bool CheckFilters(unsigned int slot, Long64_t entry) override final
::TDirectory *const fDirPtr
bool fMustRunNamedFilters
RLoopManager * fLoopManager
A raw pointer to the RLoopManager at the root of this functional graph.
FilterBaseVec_t fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
ActionBaseVec_t fBookedActions
void TriggerChildrenCount() override final
virtual ~RCustomColumnBase()
const ColumnNames_t fDefaultColumns
RLoopManager * GetLoopManagerUnchecked() const
RFilterBase(RLoopManager *df, std::string_view name, const unsigned int nSlots)
RangeBaseVec_t fBookedRanges
RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
RCustomColumnBase(RLoopManager *df, std::string_view name, const unsigned int nSlots, const bool isDSColumn)
std::map< std::string, RCustomColumnBasePtr_t > fBookedCustomColumns
void InitNode() override final
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
void Run()
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source...
RLoopManager * fLoopManager
A raw pointer to the RLoopManager at the root of this functional graph.
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame...
RLoopManager * GetLoopManagerUnchecked()
void ReturnSlot(unsigned int slotNumber)
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
unsigned int GetNSlots() const
RLoopManager * fLoopManager
A raw pointer to the RLoopManager at the root of this functional graph.
::TDirectory * GetDirectory() const
void RunEmptySource()
Run event loop with no source files, in sequence.
bool CheckFilters(int, unsigned int)
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
virtual void ResetReportCount()
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
RLoopManager * GetLoopManagerUnchecked() const
void Report(ROOT::RDF::RCutFlowReport &) const override final
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes This method loops over all filters, actions and other booked ob...
unsigned int fNStopsReceived
Number of times that a children node signaled to stop processing entries.
const unsigned int fNSlots
This class provides a simple interface to execute the same task multiple times in parallel...
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
std::vector< TCallback > fCallbacks
Registered callbacks.
std::shared_ptr< RCustomColumnBase > RCustomColumnBasePtr_t
RLoopManager * GetLoopManagerUnchecked() const
std::vector< ULong64_t > fAccepted
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
std::vector< std::shared_ptr< bool > > fResProxyReadiness
std::string fToJit
string containing all BuildAndBook actions that should be jitted before running
std::vector< Long64_t > fLastCheckedEntry
Long64_t fLastCheckedEntry
const ULong64_t fNEmptyEntries
void StopProcessing() override final
void Report(ROOT::RDF::RCutFlowReport &rep) const
Call FillReport on all booked filters.
void CleanUpTask(unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
unsigned int GetNextID() const
Describe directory structure in memory.
ULong64_t fNProcessedEntries
void ResetReportCount() override final
unsigned long long ULong64_t
void FillReport(ROOT::RDF::RCutFlowReport &) const override final
void PartialReport(ROOT::RDF::RCutFlowReport &) const override final
basic_string_view< char > string_view
std::shared_ptr< RRangeBase > RangeBasePtr_t
void ClearValueReaders(unsigned int slot) override final
void ResetChildrenCount() override final
virtual void FillReport(ROOT::RDF::RCutFlowReport &) const
bool fHasStopped
True if the end of the range has been reached.
const std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object. Null if no data-source.
std::vector< Long64_t > fLastCheckedEntry
void SetFilter(std::unique_ptr< RFilterBase > f)
Bool_t Next()
Move to the next entry (or index of the TEntryList if that is set).
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
std::vector< ULong64_t > fRejected
const unsigned int fNSlots
number of thread slots used by this node, inherited from parent node.
const unsigned int fNSlots
Number of thread slots used by this node, inherited from parent node.
RLoopManager * fLoopManager
A raw pointer to the RLoopManager at the root of this functional graph.
FilterBaseVec_t fBookedFilters
void IncrChildrenCount() override final
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
A class to process the entries of a TTree in parallel.
std::vector< TOneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void Book(const ActionBasePtr_t &actionPtr)