Logo ROOT   6.14/05
Reference Guide
RDFNodes.cxx
Go to the documentation of this file.
1 // Author: Enrico Guiraud, Danilo Piparo CERN 03/2017
2 
3 /*************************************************************************
4  * Copyright (C) 1995-2016, Rene Brun and Fons Rademakers. *
5  * All rights reserved. *
6  * *
7  * For the licensing terms see $ROOTSYS/LICENSE. *
8  * For the list of contributors see $ROOTSYS/README/CREDITS. *
9  *************************************************************************/
10 
11 #include "RConfigure.h" // R__USE_IMT
12 #include "ROOT/RCutFlowReport.hxx"
13 #include "ROOT/RDFNodes.hxx"
14 #include "ROOT/RDFUtils.hxx"
15 #include "ROOT/RDataSource.hxx"
17 #include "ROOT/RStringView.hxx"
18 #include "TTree.h"
19 #ifdef R__USE_IMT
20 #include "ROOT/TThreadExecutor.hxx"
21 #endif
22 #include <limits.h>
23 #include <cassert>
24 #include <functional>
25 #include <map>
26 #include <memory>
27 #include <mutex>
28 #include <numeric>
29 #include <stdexcept>
30 #include <string>
31 #include <utility>
32 #include <vector>
33 
34 #include "RtypesCore.h" // Long64_t
35 #include "TInterpreter.h"
36 #include "TROOT.h" // IsImplicitMTEnabled
37 #include "TTreeReader.h"
38 
39 class TDirectory;
40 namespace ROOT {
41 class TSpinMutex;
42 } // namespace ROOT
43 
44 using namespace ROOT::Detail::RDF;
45 using namespace ROOT::Internal::RDF;
46 
47 namespace ROOT {
48 namespace Internal {
49 namespace RDF {
50 
51 RActionBase::RActionBase(RLoopManager *implPtr, const unsigned int nSlots) : fLoopManager(implPtr), fNSlots(nSlots)
52 {
53 }
54 
55 } // end NS RDF
56 } // end NS Internal
57 } // end NS ROOT
58 
60  const bool isDSColumn)
61  : fLoopManager(implPtr), fName(name), fNSlots(nSlots), fIsDataSourceColumn(isDSColumn)
62 {
63 }
64 
65 // pin vtable. Work around cling JIT issue.
67 
68 std::string RCustomColumnBase::GetName() const
69 {
70  return fName;
71 }
72 
74 {
75  return fLoopManager;
76 }
77 
79 {
80  fLastCheckedEntry = std::vector<Long64_t>(fNSlots, -1);
81 }
82 
83 RFilterBase::RFilterBase(RLoopManager *implPtr, std::string_view name, const unsigned int nSlots)
84  : fLoopManager(implPtr), fLastResult(nSlots), fAccepted(nSlots), fRejected(nSlots), fName(name), fNSlots(nSlots)
85 {
86 }
87 
89 {
90  return fLoopManager;
91 }
92 
94 {
95  return !fName.empty();
96 };
97 
99 {
100  if (fName.empty()) // FillReport is no-op for unnamed filters
101  return;
102  const auto accepted = std::accumulate(fAccepted.begin(), fAccepted.end(), 0ULL);
103  const auto all = accepted + std::accumulate(fRejected.begin(), fRejected.end(), 0ULL);
104  rep.AddCut({fName, accepted, all});
105 }
106 
108 {
109  fLastCheckedEntry = std::vector<Long64_t>(fNSlots, -1);
110  if (!fName.empty()) // if this is a named filter we care about its report count
112 }
113 
114 void RJittedFilter::SetFilter(std::unique_ptr<RFilterBase> f)
115 {
116  fConcreteFilter = std::move(f);
117 }
118 
119 void RJittedFilter::InitSlot(TTreeReader *r, unsigned int slot)
120 {
121  R__ASSERT(fConcreteFilter != nullptr);
122  fConcreteFilter->InitSlot(r, slot);
123 }
124 
125 bool RJittedFilter::CheckFilters(unsigned int slot, Long64_t entry)
126 {
127  R__ASSERT(fConcreteFilter != nullptr);
128  return fConcreteFilter->CheckFilters(slot, entry);
129 }
130 
132 {
133  R__ASSERT(fConcreteFilter != nullptr);
134  fConcreteFilter->Report(cr);
135 }
136 
138 {
139  R__ASSERT(fConcreteFilter != nullptr);
140  fConcreteFilter->PartialReport(cr);
141 }
142 
144 {
145  R__ASSERT(fConcreteFilter != nullptr);
146  fConcreteFilter->FillReport(cr);
147 }
148 
150 {
151  R__ASSERT(fConcreteFilter != nullptr);
152  fConcreteFilter->IncrChildrenCount();
153 }
154 
156 {
157  R__ASSERT(fConcreteFilter != nullptr);
158  fConcreteFilter->StopProcessing();
159 }
160 
162 {
163  R__ASSERT(fConcreteFilter != nullptr);
164  fConcreteFilter->ResetChildrenCount();
165 }
166 
168 {
169  R__ASSERT(fConcreteFilter != nullptr);
170  fConcreteFilter->TriggerChildrenCount();
171 }
172 
174 {
175  R__ASSERT(fConcreteFilter != nullptr);
176  fConcreteFilter->ResetReportCount();
177 }
178 
179 void RJittedFilter::ClearValueReaders(unsigned int slot)
180 {
181  R__ASSERT(fConcreteFilter != nullptr);
182  fConcreteFilter->ClearValueReaders(slot);
183 }
184 
186 {
187  R__ASSERT(fConcreteFilter != nullptr);
188  fConcreteFilter->InitNode();
189 }
190 
191 void TSlotStack::ReturnSlot(unsigned int slotNumber)
192 {
193  auto &index = GetIndex();
194  auto &count = GetCount();
195  assert(count > 0U && "TSlotStack has a reference count relative to an index which will become negative.");
196  count--;
197  if (0U == count) {
198  index = UINT_MAX;
199  std::lock_guard<ROOT::TSpinMutex> guard(fMutex);
200  fBuf[fCursor++] = slotNumber;
201  assert(fCursor <= fBuf.size() && "TSlotStack assumes that at most a fixed number of values can be present in the "
202  "stack. fCursor is greater than the size of the internal buffer. This violates "
203  "such assumption.");
204  }
205 }
206 
207 unsigned int TSlotStack::GetSlot()
208 {
209  auto &index = GetIndex();
210  auto &count = GetCount();
211  count++;
212  if (UINT_MAX != index)
213  return index;
214  std::lock_guard<ROOT::TSpinMutex> guard(fMutex);
215  assert(fCursor > 0 && "TSlotStack assumes that a value can be always obtained. In this case fCursor is <=0 and this "
216  "violates such assumption.");
217  index = fBuf[--fCursor];
218  return index;
219 }
220 
221 RLoopManager::RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
222  : fTree(std::shared_ptr<TTree>(tree, [](TTree *) {})), fDefaultColumns(defaultBranches),
225 {
226 }
227 
229  : fNEmptyEntries(nEmptyEntries), fNSlots(RDFInternal::GetNSlots()),
230  fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kNoFilesMT : ELoopType::kNoFiles)
231 {
232 }
233 
234 RLoopManager::RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches)
235  : fDefaultColumns(defaultBranches), fNSlots(RDFInternal::GetNSlots()),
236  fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
237  fDataSource(std::move(ds))
238 {
239  fDataSource->SetNSlots(fNSlots);
240 }
241 
242 /// Run event loop with no source files, in parallel.
244 {
245 #ifdef R__USE_IMT
246  TSlotStack slotStack(fNSlots);
247  // Working with an empty tree.
248  // Evenly partition the entries according to fNSlots. Produce around 2 tasks per slot.
249  const auto nEntriesPerSlot = fNEmptyEntries / (fNSlots * 2);
250  auto remainder = fNEmptyEntries % (fNSlots * 2);
251  std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
252  ULong64_t start = 0;
253  while (start < fNEmptyEntries) {
254  ULong64_t end = start + nEntriesPerSlot;
255  if (remainder > 0) {
256  ++end;
257  --remainder;
258  }
259  entryRanges.emplace_back(start, end);
260  start = end;
261  }
262 
263  // Each task will generate a subrange of entries
264  auto genFunction = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
265  auto slot = slotStack.GetSlot();
266  InitNodeSlots(nullptr, slot);
267  for (auto currEntry = range.first; currEntry < range.second; ++currEntry) {
268  RunAndCheckFilters(slot, currEntry);
269  }
270  CleanUpTask(slot);
271  slotStack.ReturnSlot(slot);
272  };
273 
275  pool.Foreach(genFunction, entryRanges);
276 
277 #endif // not implemented otherwise
278 }
279 
280 /// Run event loop with no source files, in sequence.
282 {
283  InitNodeSlots(nullptr, 0);
284  for (ULong64_t currEntry = 0; currEntry < fNEmptyEntries && fNStopsReceived < fNChildren; ++currEntry) {
285  RunAndCheckFilters(0, currEntry);
286  }
287 }
288 
289 /// Run event loop over one or multiple ROOT files, in parallel.
291 {
292 #ifdef R__USE_IMT
293  TSlotStack slotStack(fNSlots);
294  using ttpmt_t = ROOT::TTreeProcessorMT;
295  std::unique_ptr<ttpmt_t> tp;
296  tp.reset(new ttpmt_t(*fTree));
297 
298  tp->Process([this, &slotStack](TTreeReader &r) -> void {
299  auto slot = slotStack.GetSlot();
300  InitNodeSlots(&r, slot);
301  // recursive call to check filters and conditionally execute actions
302  while (r.Next()) {
304  }
305  CleanUpTask(slot);
306  slotStack.ReturnSlot(slot);
307  });
308 #endif // no-op otherwise (will not be called)
309 }
310 
311 /// Run event loop over one or multiple ROOT files, in sequence.
313 {
314  TTreeReader r(fTree.get());
315  if (0 == fTree->GetEntriesFast())
316  return;
317  InitNodeSlots(&r, 0);
318 
319  // recursive call to check filters and conditionally execute actions
320  // in the non-MT case processing can be stopped early by ranges, hence the check on fNStopsReceived
321  while (r.Next() && fNStopsReceived < fNChildren) {
322  RunAndCheckFilters(0, r.GetCurrentEntry());
323  }
324  fTree->GetEntry(0);
325 }
326 
327 /// Run event loop over data accessed through a DataSource, in sequence.
329 {
330  assert(fDataSource != nullptr);
331  fDataSource->Initialise();
332  auto ranges = fDataSource->GetEntryRanges();
333  while (!ranges.empty()) {
334  InitNodeSlots(nullptr, 0u);
335  fDataSource->InitSlot(0u, 0ull);
336  for (const auto &range : ranges) {
337  auto end = range.second;
338  for (auto entry = range.first; entry < end; ++entry) {
339  if (fDataSource->SetEntry(0u, entry)) {
340  RunAndCheckFilters(0u, entry);
341  }
342  }
343  }
344  fDataSource->FinaliseSlot(0u);
345  ranges = fDataSource->GetEntryRanges();
346  }
347  fDataSource->Finalise();
348 }
349 
350 /// Run event loop over data accessed through a DataSource, in parallel.
352 {
353 #ifdef R__USE_IMT
354  assert(fDataSource != nullptr);
355  TSlotStack slotStack(fNSlots);
357 
358  // Each task works on a subrange of entries
359  auto runOnRange = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
360  const auto slot = slotStack.GetSlot();
361  InitNodeSlots(nullptr, slot);
362  fDataSource->InitSlot(slot, range.first);
363  const auto end = range.second;
364  for (auto entry = range.first; entry < end; ++entry) {
365  if (fDataSource->SetEntry(slot, entry)) {
366  RunAndCheckFilters(slot, entry);
367  }
368  }
369  CleanUpTask(slot);
370  fDataSource->FinaliseSlot(slot);
371  slotStack.ReturnSlot(slot);
372  };
373 
374  fDataSource->Initialise();
375  auto ranges = fDataSource->GetEntryRanges();
376  while (!ranges.empty()) {
377  pool.Foreach(runOnRange, ranges);
378  ranges = fDataSource->GetEntryRanges();
379  }
380  fDataSource->Finalise();
381 #endif // not implemented otherwise (never called)
382 }
383 
384 /// Execute actions and make sure named filters are called for each event.
385 /// Named filters must be called even if the analysis logic would not require it, lest they report confusing results.
386 void RLoopManager::RunAndCheckFilters(unsigned int slot, Long64_t entry)
387 {
388  for (auto &actionPtr : fBookedActions)
389  actionPtr->Run(slot, entry);
390  for (auto &namedFilterPtr : fBookedNamedFilters)
391  namedFilterPtr->CheckFilters(slot, entry);
392  for (auto &callback : fCallbacks)
393  callback(slot);
394 }
395 
396 /// Build TTreeReaderValues for all nodes
397 /// This method loops over all filters, actions and other booked objects and
398 /// calls their `InitRDFValues` methods. It is called once per node per slot, before
399 /// running the event loop. It also informs each node of the TTreeReader that
400 /// a particular slot will be using.
401 void RLoopManager::InitNodeSlots(TTreeReader *r, unsigned int slot)
402 {
403  // booked branches must be initialized first because other nodes might need to point to the values they encapsulate
404  for (auto &bookedBranch : fBookedCustomColumns)
405  bookedBranch.second->InitSlot(r, slot);
406  for (auto &ptr : fBookedActions)
407  ptr->InitSlot(r, slot);
408  for (auto &ptr : fBookedFilters)
409  ptr->InitSlot(r, slot);
410  for (auto &callback : fCallbacksOnce)
411  callback(slot);
412 }
413 
414 /// Initialize all nodes of the functional graph before running the event loop.
415 /// This method is called once per event-loop and performs generic initialization
416 /// operations that do not depend on the specific processing slot (i.e. operations
417 /// that are common for all threads).
419 {
421  for (auto &filter : fBookedFilters)
422  filter->InitNode();
423  for (auto &customColumn : fBookedCustomColumns)
424  customColumn.second->InitNode();
425  for (auto &range : fBookedRanges)
426  range->InitNode();
427  for (auto &ptr : fBookedActions)
428  ptr->Initialize();
429 }
430 
431 /// Perform clean-up operations. To be called at the end of each event loop.
433 {
434  fMustRunNamedFilters = false;
435 
436  // forget RActions and detach TResultProxies
437  fBookedActions.clear();
438  for (auto readiness : fResProxyReadiness) {
439  *readiness = true;
440  }
441  fResProxyReadiness.clear();
442 
443  // reset children counts
444  fNChildren = 0;
445  fNStopsReceived = 0;
446  for (auto &ptr : fBookedFilters)
447  ptr->ResetChildrenCount();
448  for (auto &ptr : fBookedRanges)
449  ptr->ResetChildrenCount();
450  for (auto &ptr : fBookedRanges)
451  ptr->ResetChildrenCount();
452 
453  fCallbacks.clear();
454  fCallbacksOnce.clear();
455 }
456 
457 /// Perform clean-up operations. To be called at the end of each task execution.
458 void RLoopManager::CleanUpTask(unsigned int slot)
459 {
460  for (auto &ptr : fBookedActions)
461  ptr->FinalizeSlot(slot);
462  for (auto &ptr : fBookedFilters)
463  ptr->ClearValueReaders(slot);
464  for (auto &pair : fBookedCustomColumns)
465  pair.second->ClearValueReaders(slot);
466 }
467 
468 /// Jit all actions that required runtime column type inference, and clean the `fToJit` member variable.
470 {
471  auto error = TInterpreter::EErrorCode::kNoError;
472  gInterpreter->Calc(fToJit.c_str(), &error);
473  if (TInterpreter::EErrorCode::kNoError != error) {
474  std::string exceptionText =
475  "An error occurred while jitting. The lines above might indicate the cause of the crash\n";
476  throw std::runtime_error(exceptionText.c_str());
477  }
478  fToJit.clear();
479 }
480 
481 /// Trigger counting of number of children nodes for each node of the functional graph.
482 /// This is done once before starting the event loop. Each action sends an `increase children count` signal
483 /// upstream, which is propagated until RLoopManager. Each time a node receives the signal, it increments its
484 /// children counter. Each node only propagates the signal once, even if it receives it multiple times.
485 /// Named filters also send an `increase children count` signal, just like actions, as they always execute during
486 /// the event loop so the graph branch they belong to must count as active even if it does not end in an action.
488 {
489  for (auto &actionPtr : fBookedActions)
490  actionPtr->TriggerChildrenCount();
491  for (auto &namedFilterPtr : fBookedNamedFilters)
492  namedFilterPtr->TriggerChildrenCount();
493 }
494 
495 unsigned int RLoopManager::GetNextID() const
496 {
497  static unsigned int id = 0;
498  ++id;
499  return id;
500 }
501 
502 /// Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
503 /// Also perform a few setup and clean-up operations (jit actions if necessary, clear booked actions after the loop...).
505 {
506  if (!fToJit.empty())
507  JitActions();
508 
509  InitNodes();
510 
511  switch (fLoopType) {
515  case ELoopType::kNoFiles: RunEmptySource(); break;
516  case ELoopType::kROOTFiles: RunTreeReader(); break;
517  case ELoopType::kDataSource: RunDataSource(); break;
518  }
519 
520  CleanUpNodes();
521 }
522 
524 {
525  return this;
526 }
527 
528 /// Return the list of default columns -- empty if none was provided when constructing the RDataFrame
529 const ColumnNames_t &RLoopManager::GetDefaultColumnNames() const
530 {
531  return fDefaultColumns;
532 }
533 
534 TTree *RLoopManager::GetTree() const
535 {
536  return fTree.get();
537 }
538 
540 {
541  return fDirPtr;
542 }
543 
544 void RLoopManager::Book(const ActionBasePtr_t &actionPtr)
545 {
546  fBookedActions.emplace_back(actionPtr);
547 }
548 
549 void RLoopManager::Book(const FilterBasePtr_t &filterPtr)
550 {
551  fBookedFilters.emplace_back(filterPtr);
552  if (filterPtr->HasName()) {
553  fBookedNamedFilters.emplace_back(filterPtr);
554  fMustRunNamedFilters = true;
555  }
556 }
557 
559 {
560  const auto &name = columnPtr->GetName();
561  fBookedCustomColumns[name] = columnPtr;
562 }
563 
564 void RLoopManager::Book(const std::shared_ptr<bool> &readinessPtr)
565 {
566  fResProxyReadiness.emplace_back(readinessPtr);
567 }
568 
569 void RLoopManager::Book(const RangeBasePtr_t &rangePtr)
570 {
571  fBookedRanges.emplace_back(rangePtr);
572 }
573 
574 // dummy call, end of recursive chain of calls
575 bool RLoopManager::CheckFilters(int, unsigned int)
576 {
577  return true;
578 }
579 
580 /// Call `FillReport` on all booked filters
582 {
583  for (const auto &fPtr : fBookedNamedFilters)
584  fPtr->FillReport(rep);
585 }
586 
587 void RLoopManager::RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f)
588 {
589  if (everyNEvents == 0ull)
590  fCallbacksOnce.emplace_back(std::move(f), fNSlots);
591  else
592  fCallbacks.emplace_back(everyNEvents, std::move(f), fNSlots);
593 }
594 
595 RRangeBase::RRangeBase(RLoopManager *implPtr, unsigned int start, unsigned int stop, unsigned int stride,
596  const unsigned int nSlots)
597  : fLoopManager(implPtr), fStart(start), fStop(stop), fStride(stride), fNSlots(nSlots)
598 {
599 }
600 
602 {
603  return fLoopManager;
604 }
605 
607 {
608  fLastCheckedEntry = -1;
609  fNProcessedEntries = 0;
610  fHasStopped = false;
611 }
void Foreach(F func, unsigned nTimes)
Execute func (with no arguments) nTimes in parallel.
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
Definition: RDFNodes.cxx:312
const unsigned int fNSlots
Number of thread slots used by this node.
Definition: RDFNodes.hxx:343
void JitActions()
Jit all actions that required runtime column type inference, and clean the fToJit member variable...
Definition: RDFNodes.cxx:469
void AddCut(TCutInfo &&ci)
RRangeBase(RLoopManager *implPtr, unsigned int start, unsigned int stop, unsigned int stride, const unsigned int nSlots)
Definition: RDFNodes.cxx:595
long long Long64_t
Definition: RtypesCore.h:69
TTreeReader is a simple, robust and fast interface to read values from a TTree, TChain or TNtuple...
Definition: TTreeReader.h:43
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
Definition: RDFNodes.cxx:432
Namespace for new ROOT classes and functions.
Definition: StringConv.hxx:21
std::shared_ptr< RFilterBase > FilterBasePtr_t
Definition: RDFNodes.hxx:83
Long64_t GetCurrentEntry() const
Returns the index of the current entry being read.
Definition: TTreeReader.h:215
std::shared_ptr< RDFInternal::RActionBase > ActionBasePtr_t
Definition: RDFNodes.hxx:78
std::shared_ptr< TTree > fTree
Definition: RDFNodes.hxx:140
void InitSlot(TTreeReader *r, unsigned int slot) override final
Definition: RDFNodes.cxx:119
bool CheckFilters(unsigned int slot, Long64_t entry) override final
Definition: RDFNodes.cxx:125
::TDirectory *const fDirPtr
Definition: RDFNodes.hxx:139
RLoopManager * fLoopManager
A raw pointer to the RLoopManager at the root of this functional graph.
Definition: RDFNodes.hxx:564
FilterBaseVec_t fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
Definition: RDFNodes.hxx:134
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
Definition: RDFNodes.cxx:587
ActionBaseVec_t fBookedActions
Definition: RDFNodes.hxx:132
#define R__ASSERT(e)
Definition: TError.h:96
void TriggerChildrenCount() override final
Definition: RDFNodes.cxx:167
#define f(i)
Definition: RSha256.hxx:104
const ColumnNames_t fDefaultColumns
Definition: RDFNodes.hxx:143
RLoopManager * GetLoopManagerUnchecked() const
Definition: RDFNodes.cxx:73
#define gInterpreter
Definition: TInterpreter.h:527
unsigned int GetNSlots()
Definition: RDFUtils.cxx:244
STL namespace.
RFilterBase(RLoopManager *df, std::string_view name, const unsigned int nSlots)
Definition: RDFNodes.cxx:83
RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
Definition: RDFNodes.cxx:221
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
Definition: RDFNodes.hxx:149
RCustomColumnBase(RLoopManager *df, std::string_view name, const unsigned int nSlots, const bool isDSColumn)
Definition: RDFNodes.cxx:59
std::map< std::string, RCustomColumnBasePtr_t > fBookedCustomColumns
Definition: RDFNodes.hxx:135
void InitNode() override final
Definition: RDFNodes.cxx:185
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
Definition: RDFNodes.cxx:290
void Run()
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source...
Definition: RDFNodes.cxx:504
RLoopManager * fLoopManager
A raw pointer to the RLoopManager at the root of this functional graph.
Definition: RDFNodes.hxx:340
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
Definition: RDFNodes.cxx:351
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame...
Definition: RDFNodes.cxx:529
RLoopManager * GetLoopManagerUnchecked()
Definition: RDFNodes.cxx:523
void ReturnSlot(unsigned int slotNumber)
Definition: RDFNodes.cxx:191
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
Definition: RDFNodes.cxx:243
unsigned int GetNSlots() const
Definition: RDFNodes.hxx:197
RLoopManager * fLoopManager
A raw pointer to the RLoopManager at the root of this functional graph.
Definition: RDFNodes.hxx:723
::TDirectory * GetDirectory() const
Definition: RDFNodes.cxx:539
void RunEmptySource()
Run event loop with no source files, in sequence.
Definition: RDFNodes.cxx:281
bool CheckFilters(int, unsigned int)
Definition: RDFNodes.cxx:575
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
Definition: RExports.h:146
XFontStruct * id
Definition: TGX11.cxx:108
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
Definition: RDFNodes.cxx:487
virtual void ResetReportCount()
Definition: RDFNodes.hxx:595
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
Definition: RDFNodes.cxx:328
RLoopManager * GetLoopManagerUnchecked() const
Definition: RDFNodes.cxx:88
void Report(ROOT::RDF::RCutFlowReport &) const override final
Definition: RDFNodes.cxx:131
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes. This method loops over all filters, actions and other booked ob...
Definition: RDFNodes.cxx:401
unsigned int fNStopsReceived
Number of times that a children node signaled to stop processing entries.
Definition: RDFNodes.hxx:148
const unsigned int fNSlots
Definition: RDFNodes.hxx:145
This class provides a simple interface to execute the same task multiple times in parallel...
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition: RDFNodes.hxx:147
ROOT::R::TRInterface & r
Definition: Object.C:4
std::vector< TCallback > fCallbacks
Registered callbacks.
Definition: RDFNodes.hxx:154
std::shared_ptr< RCustomColumnBase > RCustomColumnBasePtr_t
Definition: RDFNodes.hxx:81
RLoopManager * GetLoopManagerUnchecked() const
Definition: RDFNodes.cxx:601
std::vector< ULong64_t > fAccepted
Definition: RDFNodes.hxx:568
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
Definition: RDFNodes.cxx:386
std::vector< std::shared_ptr< bool > > fResProxyReadiness
Definition: RDFNodes.hxx:138
std::string fToJit
string containing all BuildAndBook actions that should be jitted before running
Definition: RDFNodes.hxx:150
std::vector< Long64_t > fLastCheckedEntry
Definition: RDFNodes.hxx:446
const ULong64_t fNEmptyEntries
Definition: RDFNodes.hxx:144
void StopProcessing() override final
Definition: RDFNodes.cxx:155
void Report(ROOT::RDF::RCutFlowReport &rep) const
Call FillReport on all booked filters.
Definition: RDFNodes.cxx:581
void CleanUpTask(unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
Definition: RDFNodes.cxx:458
unsigned int GetNextID() const
Definition: RDFNodes.cxx:495
Describe directory structure in memory.
Definition: TDirectory.h:34
void ResetReportCount() override final
Definition: RDFNodes.cxx:173
unsigned long long ULong64_t
Definition: RtypesCore.h:70
void FillReport(ROOT::RDF::RCutFlowReport &) const override final
Definition: RDFNodes.cxx:143
void PartialReport(ROOT::RDF::RCutFlowReport &) const override final
Definition: RDFNodes.cxx:137
basic_string_view< char > string_view
Definition: RStringView.hxx:35
std::shared_ptr< RRangeBase > RangeBasePtr_t
Definition: RDFNodes.hxx:86
void ClearValueReaders(unsigned int slot) override final
Definition: RDFNodes.cxx:179
void ResetChildrenCount() override final
Definition: RDFNodes.cxx:161
virtual void FillReport(ROOT::RDF::RCutFlowReport &) const
Definition: RDFNodes.cxx:98
bool fHasStopped
True if the end of the range has been reached.
Definition: RDFNodes.hxx:733
const std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object. Null if no data-source.
Definition: RDFNodes.hxx:151
std::vector< Long64_t > fLastCheckedEntry
Definition: RDFNodes.hxx:566
void SetFilter(std::unique_ptr< RFilterBase > f)
Definition: RDFNodes.cxx:114
Bool_t Next()
Move to the next entry (or index of the TEntryList if that is set).
Definition: TTreeReader.h:171
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition: TROOT.cxx:607
std::vector< ULong64_t > fRejected
Definition: RDFNodes.hxx:569
const unsigned int fNSlots
number of thread slots used by this node, inherited from parent node.
Definition: RDFNodes.hxx:444
const unsigned int fNSlots
Number of thread slots used by this node, inherited from parent node.
Definition: RDFNodes.hxx:573
Definition: tree.py:1
RLoopManager * fLoopManager
A raw pointer to the RLoopManager at the root of this functional graph.
Definition: RDFNodes.hxx:439
FilterBaseVec_t fBookedFilters
Definition: RDFNodes.hxx:133
void IncrChildrenCount() override final
Definition: RDFNodes.cxx:149
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
Definition: RDFNodes.cxx:418
A class to process the entries of a TTree in parallel.
char name[80]
Definition: TGX11.cxx:109
std::vector< TOneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
Definition: RDFNodes.hxx:155
void Book(const ActionBasePtr_t &actionPtr)
Definition: RDFNodes.cxx:544