Logo ROOT   6.14/05
Reference Guide
RDFActionHelpers.hxx
Go to the documentation of this file.
1 // Author: Enrico Guiraud, Danilo Piparo CERN 12/2016
2 
3 /*************************************************************************
4  * Copyright (C) 1995-2016, Rene Brun and Fons Rademakers. *
5  * All rights reserved. *
6  * *
7  * For the licensing terms see $ROOTSYS/LICENSE. *
8  * For the list of contributors see $ROOTSYS/README/CREDITS. *
9  *************************************************************************/
10 
11 #ifndef ROOT_RDFOPERATIONS
12 #define ROOT_RDFOPERATIONS
13 
#include <algorithm>
#include <deque>
#include <limits>
#include <memory>
#include <stack>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
22 
23 #include "Compression.h"
25 #include "ROOT/RStringView.hxx"
26 #include "ROOT/RVec.hxx"
27 #include "ROOT/TBufferMerger.hxx" // for SnapshotHelper
28 #include "ROOT/RCutFlowReport.hxx"
29 #include "ROOT/RDFUtils.hxx"
30 #include "ROOT/RMakeUnique.hxx"
32 #include "ROOT/TThreadedObject.hxx"
33 #include "ROOT/TypeTraits.hxx"
34 #include "RtypesCore.h"
35 #include "TBranch.h"
36 #include "TClassEdit.h"
37 #include "TDirectory.h"
38 #include "TFile.h" // for SnapshotHelper
39 #include "TH1.h"
40 #include "TLeaf.h"
41 #include "TObjArray.h"
42 #include "TObject.h"
43 #include "TTree.h"
44 #include "TTreeReader.h" // for SnapshotHelper
45 
46 /// \cond HIDDEN_SYMBOLS
47 
48 namespace ROOT {
49 namespace Detail {
50 namespace RDF {
51 
/// CRTP base of the action helpers in this file.
/// It exposes CallFinalizeTask(slot): when the derived Helper declares a FinalizeTask member the call is forwarded
/// to it, otherwise the call compiles to nothing.
template <typename Helper>
class RActionImpl
{
public:
   /// Viable only if `&T::FinalizeTask` is well-formed (SFINAE in the trailing return type):
   /// forwards the slot to the derived helper.
   template <typename T = Helper>
   auto CallFinalizeTask(unsigned int slot) -> decltype(&T::FinalizeTask, void())
   {
      Helper &derived = *static_cast<Helper *>(this);
      derived.FinalizeTask(slot);
   }

   /// Catch-all overload, less specialized because of its trailing parameter pack:
   /// used when the helper has no FinalizeTask, and does nothing.
   template <typename... Args>
   void CallFinalizeTask(unsigned int, Args...)
   {
   }
};
66 
67 } // namespace RDF
68 } // namespace Detail
69 
70 namespace Internal {
71 namespace RDF {
// Make the type traits (TypeList, CallableTraits, IsContainer, ...), RVec and the RDF public/detail names usable
// unqualified by the helpers below.
using namespace ROOT::TypeTraits;
using namespace ROOT::VecOps;
using namespace ROOT::RDF;
using namespace ROOT::Detail::RDF;

/// Histogram type filled by FillHelper; also the default HIST of FillTOHelper.
using Hist_t = ::TH1D;
78 
/// The container type holding each thread's (slot's) partial result in an action helper.
// std::vector<bool> must not be used: it packs its elements into bits, so a reference to a single thread-local
// result could not be returned. std::deque<bool> is used instead in that case. Having one alias for this container
// also makes it easy to swap the underlying type, e.g. if false sharing between thread-local results becomes an issue.
template <typename T>
using Results = typename std::conditional<!std::is_same<T, bool>::value, std::vector<T>, std::deque<T>>::type;
83 template <typename T>
84 using Results = typename std::conditional<std::is_same<T, bool>::value, std::deque<T>, std::vector<T>>::type;
85 
86 
87 template <typename F>
88 class ForeachSlotHelper : public RActionImpl<ForeachSlotHelper<F>> {
89  F fCallable;
90 
91 public:
92  using ColumnTypes_t = RemoveFirstParameter_t<typename CallableTraits<F>::arg_types>;
93  ForeachSlotHelper(F &&f) : fCallable(f) {}
94  ForeachSlotHelper(ForeachSlotHelper &&) = default;
95  ForeachSlotHelper(const ForeachSlotHelper &) = delete;
96 
97  void InitTask(TTreeReader *, unsigned int) {}
98 
99  template <typename... Args>
100  void Exec(unsigned int slot, Args &&... args)
101  {
102  // check that the decayed types of Args are the same as the branch types
103  static_assert(std::is_same<TypeList<typename std::decay<Args>::type...>, ColumnTypes_t>::value, "");
104  fCallable(slot, std::forward<Args>(args)...);
105  }
106 
107  void Initialize() { /* noop */}
108 
109  void Finalize() { /* noop */}
110 };
111 
/// Action helper for Count. Keeps a per-slot partial count; the constructor, Exec, Finalize and PartialUpdate are
/// defined out of line (not visible here), presumably combining the partial counts into the shared result.
class CountHelper : public RActionImpl<CountHelper> {
   const std::shared_ptr<ULong64_t> fResultCount; // final result, shared with the caller
   Results<ULong64_t> fCounts;                   // per-slot partial counts (see Results)

public:
   using ColumnTypes_t = TypeList<>; // Count reads no column
   CountHelper(const std::shared_ptr<ULong64_t> &resultCount, const unsigned int nSlots);
   CountHelper(CountHelper &&) = default;
   CountHelper(const CountHelper &) = delete;
   void InitTask(TTreeReader *, unsigned int) {}
   void Exec(unsigned int slot);
   void Initialize() { /* noop */}
   void Finalize();
   ULong64_t &PartialUpdate(unsigned int slot);
};
127 
128 template <typename ProxiedVal_t>
129 class ReportHelper : public RActionImpl<ReportHelper<ProxiedVal_t>> {
130  const std::shared_ptr<RCutFlowReport> fReport;
131  // Here we have a weak pointer since we need to keep track of the validity
132  // of the proxied node. It can happen that the user does not trigger the
133  // event loop by looking into the RResultPtr and the chain goes out of scope
134  // before the Finalize method is invoked.
135  std::weak_ptr<ProxiedVal_t> fProxiedWPtr;
136  bool fReturnEmptyReport;
137 
138 public:
139  using ColumnTypes_t = TypeList<>;
140  ReportHelper(const std::shared_ptr<RCutFlowReport> &report, const std::shared_ptr<ProxiedVal_t> &pp, bool emptyRep)
141  : fReport(report), fProxiedWPtr(pp), fReturnEmptyReport(emptyRep){};
142  ReportHelper(ReportHelper &&) = default;
143  ReportHelper(const ReportHelper &) = delete;
144  void InitTask(TTreeReader *, unsigned int) {}
145  void Exec(unsigned int /* slot */) {}
146  void Initialize() { /* noop */}
147  void Finalize()
148  {
149  // We need the weak_ptr in order to avoid crashes at tear down
150  if (!fReturnEmptyReport && !fProxiedWPtr.expired())
151  fProxiedWPtr.lock()->Report(*fReport);
152  }
153 };
154 
155 class FillHelper : public RActionImpl<FillHelper> {
156  // this sets a total initial size of 16 MB for the buffers (can increase)
157  static constexpr unsigned int fgTotalBufSize = 2097152;
158  using BufEl_t = double;
159  using Buf_t = std::vector<BufEl_t>;
160 
161  std::vector<Buf_t> fBuffers;
162  std::vector<Buf_t> fWBuffers;
163  const std::shared_ptr<Hist_t> fResultHist;
164  unsigned int fNSlots;
165  unsigned int fBufSize;
166  /// Histograms containing "snapshots" of partial results. Non-null only if a registered callback requires it.
167  Results<std::unique_ptr<Hist_t>> fPartialHists;
168  Buf_t fMin;
169  Buf_t fMax;
170 
171  void UpdateMinMax(unsigned int slot, double v);
172 
173 public:
174  FillHelper(const std::shared_ptr<Hist_t> &h, const unsigned int nSlots);
175  FillHelper(FillHelper &&) = default;
176  FillHelper(const FillHelper &) = delete;
177  void InitTask(TTreeReader *, unsigned int) {}
178  void Exec(unsigned int slot, double v);
179  void Exec(unsigned int slot, double v, double w);
180 
181  template <typename T, typename std::enable_if<IsContainer<T>::value, int>::type = 0>
182  void Exec(unsigned int slot, const T &vs)
183  {
184  auto &thisBuf = fBuffers[slot];
185  for (auto &v : vs) {
186  UpdateMinMax(slot, v);
187  thisBuf.emplace_back(v); // TODO: Can be optimised in case T == BufEl_t
188  }
189  }
190 
191  template <typename T, typename W,
192  typename std::enable_if<IsContainer<T>::value && IsContainer<W>::value, int>::type = 0>
193  void Exec(unsigned int slot, const T &vs, const W &ws)
194  {
195  auto &thisBuf = fBuffers[slot];
196  for (auto &v : vs) {
197  UpdateMinMax(slot, v);
198  thisBuf.emplace_back(v); // TODO: Can be optimised in case T == BufEl_t
199  }
200 
201  auto &thisWBuf = fWBuffers[slot];
202  for (auto &w : ws) {
203  thisWBuf.emplace_back(w); // TODO: Can be optimised in case T == BufEl_t
204  }
205  }
206 
207  Hist_t &PartialUpdate(unsigned int);
208 
209  void Initialize() { /* noop */}
210 
211  void Finalize();
212 };
213 
// Explicit instantiation declarations for FillHelper's container Exec overloads with common element types:
// translation units including this header will not instantiate these themselves. The matching explicit
// instantiation definitions are presumably in the implementation file (not visible here).
extern template void FillHelper::Exec(unsigned int, const std::vector<float> &);
extern template void FillHelper::Exec(unsigned int, const std::vector<double> &);
extern template void FillHelper::Exec(unsigned int, const std::vector<char> &);
extern template void FillHelper::Exec(unsigned int, const std::vector<int> &);
extern template void FillHelper::Exec(unsigned int, const std::vector<unsigned int> &);
extern template void FillHelper::Exec(unsigned int, const std::vector<float> &, const std::vector<float> &);
extern template void FillHelper::Exec(unsigned int, const std::vector<double> &, const std::vector<double> &);
extern template void FillHelper::Exec(unsigned int, const std::vector<char> &, const std::vector<char> &);
extern template void FillHelper::Exec(unsigned int, const std::vector<int> &, const std::vector<int> &);
extern template void
FillHelper::Exec(unsigned int, const std::vector<unsigned int> &, const std::vector<unsigned int> &);
225 
226 template <typename HIST = Hist_t>
227 class FillTOHelper : public RActionImpl<FillTOHelper<HIST>> {
228  std::unique_ptr<TThreadedObject<HIST>> fTo;
229 
230 public:
231  FillTOHelper(FillTOHelper &&) = default;
232  FillTOHelper(const FillTOHelper &) = delete;
233 
234  FillTOHelper(const std::shared_ptr<HIST> &h, const unsigned int nSlots) : fTo(new TThreadedObject<HIST>(*h))
235  {
236  fTo->SetAtSlot(0, h);
237  // Initialise all other slots
238  for (unsigned int i = 0; i < nSlots; ++i) {
239  fTo->GetAtSlot(i);
240  }
241  }
242 
243  void InitTask(TTreeReader *, unsigned int) {}
244 
245  void Exec(unsigned int slot, double x0) // 1D histos
246  {
247  fTo->GetAtSlotRaw(slot)->Fill(x0);
248  }
249 
250  void Exec(unsigned int slot, double x0, double x1) // 1D weighted and 2D histos
251  {
252  fTo->GetAtSlotRaw(slot)->Fill(x0, x1);
253  }
254 
255  void Exec(unsigned int slot, double x0, double x1, double x2) // 2D weighted and 3D histos
256  {
257  fTo->GetAtSlotRaw(slot)->Fill(x0, x1, x2);
258  }
259 
260  void Exec(unsigned int slot, double x0, double x1, double x2, double x3) // 3D weighted histos
261  {
262  fTo->GetAtSlotRaw(slot)->Fill(x0, x1, x2, x3);
263  }
264 
265  template <typename X0, typename std::enable_if<IsContainer<X0>::value, int>::type = 0>
266  void Exec(unsigned int slot, const X0 &x0s)
267  {
268  auto thisSlotH = fTo->GetAtSlotRaw(slot);
269  for (auto &x0 : x0s) {
270  thisSlotH->Fill(x0); // TODO: Can be optimised in case T == vector<double>
271  }
272  }
273 
274  template <typename X0, typename X1,
275  typename std::enable_if<IsContainer<X0>::value && IsContainer<X1>::value, int>::type = 0>
276  void Exec(unsigned int slot, const X0 &x0s, const X1 &x1s)
277  {
278  auto thisSlotH = fTo->GetAtSlotRaw(slot);
279  if (x0s.size() != x1s.size()) {
280  throw std::runtime_error("Cannot fill histogram with values in containers of different sizes.");
281  }
282  auto x0sIt = std::begin(x0s);
283  const auto x0sEnd = std::end(x0s);
284  auto x1sIt = std::begin(x1s);
285  for (; x0sIt != x0sEnd; x0sIt++, x1sIt++) {
286  thisSlotH->Fill(*x0sIt, *x1sIt); // TODO: Can be optimised in case T == vector<double>
287  }
288  }
289 
290  template <typename X0, typename X1, typename X2,
291  typename std::enable_if<IsContainer<X0>::value && IsContainer<X1>::value && IsContainer<X2>::value,
292  int>::type = 0>
293  void Exec(unsigned int slot, const X0 &x0s, const X1 &x1s, const X2 &x2s)
294  {
295  auto thisSlotH = fTo->GetAtSlotRaw(slot);
296  if (!(x0s.size() == x1s.size() && x1s.size() == x2s.size())) {
297  throw std::runtime_error("Cannot fill histogram with values in containers of different sizes.");
298  }
299  auto x0sIt = std::begin(x0s);
300  const auto x0sEnd = std::end(x0s);
301  auto x1sIt = std::begin(x1s);
302  auto x2sIt = std::begin(x2s);
303  for (; x0sIt != x0sEnd; x0sIt++, x1sIt++, x2sIt++) {
304  thisSlotH->Fill(*x0sIt, *x1sIt, *x2sIt); // TODO: Can be optimised in case T == vector<double>
305  }
306  }
307  template <typename X0, typename X1, typename X2, typename X3,
308  typename std::enable_if<IsContainer<X0>::value && IsContainer<X1>::value && IsContainer<X2>::value &&
310  int>::type = 0>
311  void Exec(unsigned int slot, const X0 &x0s, const X1 &x1s, const X2 &x2s, const X3 &x3s)
312  {
313  auto thisSlotH = fTo->GetAtSlotRaw(slot);
314  if (!(x0s.size() == x1s.size() && x1s.size() == x2s.size() && x1s.size() == x3s.size())) {
315  throw std::runtime_error("Cannot fill histogram with values in containers of different sizes.");
316  }
317  auto x0sIt = std::begin(x0s);
318  const auto x0sEnd = std::end(x0s);
319  auto x1sIt = std::begin(x1s);
320  auto x2sIt = std::begin(x2s);
321  auto x3sIt = std::begin(x3s);
322  for (; x0sIt != x0sEnd; x0sIt++, x1sIt++, x2sIt++, x3sIt++) {
323  thisSlotH->Fill(*x0sIt, *x1sIt, *x2sIt, *x3sIt); // TODO: Can be optimised in case T == vector<double>
324  }
325  }
326 
327  void Initialize() { /* noop */}
328 
329  void Finalize() { fTo->Merge(); }
330 
331  HIST &PartialUpdate(unsigned int slot) { return *fTo->GetAtSlotRaw(slot); }
332 };
333 
334 // In case of the take helper we have 4 cases:
335 // 1. The column is not an RVec, the collection is not a vector
336 // 2. The column is not an RVec, the collection is a vector
337 // 3. The column is an RVec, the collection is not a vector
338 // 4. The column is an RVec, the collection is a vector
339 
340 // Case 1.: The column is not an RVec, the collection is not a vector
341 // No optimisations, no transformations: just copies.
342 template <typename RealT_t, typename T, typename COLL>
343 class TakeHelper : public RActionImpl<TakeHelper<RealT_t, T, COLL>> {
344  Results<std::shared_ptr<COLL>> fColls;
345 
346 public:
347  using ColumnTypes_t = TypeList<T>;
348  TakeHelper(const std::shared_ptr<COLL> &resultColl, const unsigned int nSlots)
349  {
350  fColls.emplace_back(resultColl);
351  for (unsigned int i = 1; i < nSlots; ++i)
352  fColls.emplace_back(std::make_shared<COLL>());
353  }
354  TakeHelper(TakeHelper &&) = default;
355  TakeHelper(const TakeHelper &) = delete;
356 
357  void InitTask(TTreeReader *, unsigned int) {}
358 
359  void Exec(unsigned int slot, T &v) { fColls[slot]->emplace_back(v); }
360 
361  void Initialize() { /* noop */}
362 
363  void Finalize()
364  {
365  auto rColl = fColls[0];
366  for (unsigned int i = 1; i < fColls.size(); ++i) {
367  auto &coll = fColls[i];
368  for (T &v : *coll) {
369  rColl->emplace_back(v);
370  }
371  }
372  }
373 
374  COLL &PartialUpdate(unsigned int slot) { return *fColls[slot].get(); }
375 };
376 
377 // Case 2.: The column is not an RVec, the collection is a vector
378 // Optimisations, no transformations: just copies.
379 template <typename RealT_t, typename T>
380 class TakeHelper<RealT_t, T, std::vector<T>> : public RActionImpl<TakeHelper<RealT_t, T, std::vector<T>>> {
381  Results<std::shared_ptr<std::vector<T>>> fColls;
382 
383 public:
384  using ColumnTypes_t = TypeList<T>;
385  TakeHelper(const std::shared_ptr<std::vector<T>> &resultColl, const unsigned int nSlots)
386  {
387  fColls.emplace_back(resultColl);
388  for (unsigned int i = 1; i < nSlots; ++i) {
389  auto v = std::make_shared<std::vector<T>>();
390  v->reserve(1024);
391  fColls.emplace_back(v);
392  }
393  }
394  TakeHelper(TakeHelper &&) = default;
395  TakeHelper(const TakeHelper &) = delete;
396 
397  void InitTask(TTreeReader *, unsigned int) {}
398 
399  void Exec(unsigned int slot, T &v) { fColls[slot]->emplace_back(v); }
400 
401  void Initialize() { /* noop */}
402 
403  // This is optimised to treat vectors
404  void Finalize()
405  {
406  ULong64_t totSize = 0;
407  for (auto &coll : fColls)
408  totSize += coll->size();
409  auto rColl = fColls[0];
410  rColl->reserve(totSize);
411  for (unsigned int i = 1; i < fColls.size(); ++i) {
412  auto &coll = fColls[i];
413  rColl->insert(rColl->end(), coll->begin(), coll->end());
414  }
415  }
416 
417  std::vector<T> &PartialUpdate(unsigned int slot) { return *fColls[slot]; }
418 };
419 
420 // Case 3.: The column is a RVec, the collection is not a vector
421 // No optimisations, transformations from RVecs to vectors
422 template <typename RealT_t, typename COLL>
423 class TakeHelper<RealT_t, RVec<RealT_t>, COLL> : public RActionImpl<TakeHelper<RealT_t, RVec<RealT_t>, COLL>> {
424  Results<std::shared_ptr<COLL>> fColls;
425 
426 public:
427  using ColumnTypes_t = TypeList<RVec<RealT_t>>;
428  TakeHelper(const std::shared_ptr<COLL> &resultColl, const unsigned int nSlots)
429  {
430  fColls.emplace_back(resultColl);
431  for (unsigned int i = 1; i < nSlots; ++i)
432  fColls.emplace_back(std::make_shared<COLL>());
433  }
434  TakeHelper(TakeHelper &&) = default;
435  TakeHelper(const TakeHelper &) = delete;
436 
437  void InitTask(TTreeReader *, unsigned int) {}
438 
439  void Exec(unsigned int slot, RVec<RealT_t> av) { fColls[slot]->emplace_back(av.begin(), av.end()); }
440 
441  void Initialize() { /* noop */}
442 
443  void Finalize()
444  {
445  auto rColl = fColls[0];
446  for (unsigned int i = 1; i < fColls.size(); ++i) {
447  auto &coll = fColls[i];
448  for (auto &v : *coll) {
449  rColl->emplace_back(v);
450  }
451  }
452  }
453 };
454 
455 // Case 4.: The column is an RVec, the collection is a vector
456 // Optimisations, transformations from RVecs to vectors
457 template <typename RealT_t>
458 class TakeHelper<RealT_t, RVec<RealT_t>, std::vector<RealT_t>>
459  : public RActionImpl<TakeHelper<RealT_t, RVec<RealT_t>, std::vector<RealT_t>>> {
460 
461  Results<std::shared_ptr<std::vector<std::vector<RealT_t>>>> fColls;
462 
463 public:
464  using ColumnTypes_t = TypeList<RVec<RealT_t>>;
465  TakeHelper(const std::shared_ptr<std::vector<std::vector<RealT_t>>> &resultColl, const unsigned int nSlots)
466  {
467  fColls.emplace_back(resultColl);
468  for (unsigned int i = 1; i < nSlots; ++i) {
469  auto v = std::make_shared<std::vector<RealT_t>>();
470  v->reserve(1024);
471  fColls.emplace_back(v);
472  }
473  }
474  TakeHelper(TakeHelper &&) = default;
475  TakeHelper(const TakeHelper &) = delete;
476 
477  void InitTask(TTreeReader *, unsigned int) {}
478 
479  void Exec(unsigned int slot, RVec<RealT_t> av) { fColls[slot]->emplace_back(av.begin(), av.end()); }
480 
481  void Initialize() { /* noop */}
482 
483  // This is optimised to treat vectors
484  void Finalize()
485  {
486  ULong64_t totSize = 0;
487  for (auto &coll : fColls)
488  totSize += coll->size();
489  auto rColl = fColls[0];
490  rColl->reserve(totSize);
491  for (unsigned int i = 1; i < fColls.size(); ++i) {
492  auto &coll = fColls[i];
493  rColl->insert(rColl->end(), coll->begin(), coll->end());
494  }
495  }
496 };
497 
498 template <typename ResultType>
499 class MinHelper : public RActionImpl<MinHelper<ResultType>> {
500  const std::shared_ptr<ResultType> fResultMin;
501  Results<ResultType> fMins;
502 
503 public:
504  MinHelper(MinHelper &&) = default;
505  MinHelper(const std::shared_ptr<ResultType> &minVPtr, const unsigned int nSlots)
506  : fResultMin(minVPtr), fMins(nSlots, std::numeric_limits<ResultType>::max())
507  {
508  }
509 
510  void Exec(unsigned int slot, ResultType v) { fMins[slot] = std::min(v, fMins[slot]); }
511 
512  void InitTask(TTreeReader *, unsigned int) {}
513 
514  template <typename T, typename std::enable_if<IsContainer<T>::value, int>::type = 0>
515  void Exec(unsigned int slot, const T &vs)
516  {
517  for (auto &&v : vs)
518  fMins[slot] = std::min(v, fMins[slot]);
519  }
520 
521  void Initialize() { /* noop */}
522 
523  void Finalize()
524  {
525  *fResultMin = std::numeric_limits<ResultType>::max();
526  for (auto &m : fMins)
527  *fResultMin = std::min(m, *fResultMin);
528  }
529 
530  ResultType &PartialUpdate(unsigned int slot) { return fMins[slot]; }
531 };
532 
533 // TODO
534 // extern template void MinHelper::Exec(unsigned int, const std::vector<float> &);
535 // extern template void MinHelper::Exec(unsigned int, const std::vector<double> &);
536 // extern template void MinHelper::Exec(unsigned int, const std::vector<char> &);
537 // extern template void MinHelper::Exec(unsigned int, const std::vector<int> &);
538 // extern template void MinHelper::Exec(unsigned int, const std::vector<unsigned int> &);
539 
540 template <typename ResultType>
541 class MaxHelper : public RActionImpl<MaxHelper<ResultType>> {
542  const std::shared_ptr<ResultType> fResultMax;
543  Results<ResultType> fMaxs;
544 
545 public:
546  MaxHelper(MaxHelper &&) = default;
547  MaxHelper(const MaxHelper &) = delete;
548  MaxHelper(const std::shared_ptr<ResultType> &maxVPtr, const unsigned int nSlots)
549  : fResultMax(maxVPtr), fMaxs(nSlots, std::numeric_limits<ResultType>::lowest())
550  {
551  }
552 
553  void InitTask(TTreeReader *, unsigned int) {}
554  void Exec(unsigned int slot, ResultType v) { fMaxs[slot] = std::max(v, fMaxs[slot]); }
555 
556  template <typename T, typename std::enable_if<IsContainer<T>::value, int>::type = 0>
557  void Exec(unsigned int slot, const T &vs)
558  {
559  for (auto &&v : vs)
560  fMaxs[slot] = std::max((ResultType)v, fMaxs[slot]);
561  }
562 
563  void Initialize() { /* noop */}
564 
565  void Finalize()
566  {
567  *fResultMax = std::numeric_limits<ResultType>::lowest();
568  for (auto &m : fMaxs) {
569  *fResultMax = std::max(m, *fResultMax);
570  }
571  }
572 
573  ResultType &PartialUpdate(unsigned int slot) { return fMaxs[slot]; }
574 };
575 
576 // TODO
577 // extern template void MaxHelper::Exec(unsigned int, const std::vector<float> &);
578 // extern template void MaxHelper::Exec(unsigned int, const std::vector<double> &);
579 // extern template void MaxHelper::Exec(unsigned int, const std::vector<char> &);
580 // extern template void MaxHelper::Exec(unsigned int, const std::vector<int> &);
581 // extern template void MaxHelper::Exec(unsigned int, const std::vector<unsigned int> &);
582 
583 template <typename ResultType>
584 class SumHelper : public RActionImpl<SumHelper<ResultType>> {
585  const std::shared_ptr<ResultType> fResultSum;
586  Results<ResultType> fSums;
587 
588  /// Evaluate neutral element for this type and the sum operation.
589  /// This is assumed to be any_value - any_value if operator- is defined
590  /// for the type, otherwise a default-constructed ResultType{} is used.
591  template <typename T = ResultType>
592  auto NeutralElement(const T &v, int /*overloadresolver*/) -> decltype(v - v)
593  {
594  return v - v;
595  }
596 
597  template <typename T = ResultType, typename Dummy = int>
598  ResultType NeutralElement(const T &, Dummy) // this overload has lower priority thanks to the template arg
599  {
600  return ResultType{};
601  }
602 
603 public:
604  SumHelper(SumHelper &&) = default;
605  SumHelper(const SumHelper &) = delete;
606  SumHelper(const std::shared_ptr<ResultType> &sumVPtr, const unsigned int nSlots)
607  : fResultSum(sumVPtr), fSums(nSlots, NeutralElement(*sumVPtr, -1))
608  {
609  }
610 
611  void InitTask(TTreeReader *, unsigned int) {}
612  void Exec(unsigned int slot, ResultType v) { fSums[slot] += v; }
613 
614  template <typename T, typename std::enable_if<IsContainer<T>::value, int>::type = 0>
615  void Exec(unsigned int slot, const T &vs)
616  {
617  for (auto &&v : vs)
618  fSums[slot] += static_cast<ResultType>(v);
619  }
620 
621  void Initialize() { /* noop */}
622 
623  void Finalize()
624  {
625  for (auto &m : fSums)
626  *fResultSum += m;
627  }
628 
629  ResultType &PartialUpdate(unsigned int slot) { return fSums[slot]; }
630 };
631 
632 class MeanHelper : public RActionImpl<MeanHelper> {
633  const std::shared_ptr<double> fResultMean;
634  std::vector<ULong64_t> fCounts;
635  std::vector<double> fSums;
636  std::vector<double> fPartialMeans;
637 
638 public:
639  MeanHelper(const std::shared_ptr<double> &meanVPtr, const unsigned int nSlots);
640  MeanHelper(MeanHelper &&) = default;
641  MeanHelper(const MeanHelper &) = delete;
642  void InitTask(TTreeReader *, unsigned int) {}
643  void Exec(unsigned int slot, double v);
644 
645  template <typename T, typename std::enable_if<IsContainer<T>::value, int>::type = 0>
646  void Exec(unsigned int slot, const T &vs)
647  {
648  for (auto &&v : vs) {
649  fSums[slot] += v;
650  fCounts[slot]++;
651  }
652  }
653 
654  void Initialize() { /* noop */}
655 
656  void Finalize();
657 
658  double &PartialUpdate(unsigned int slot);
659 };
660 
// Explicit instantiation declarations for MeanHelper's container Exec with common element types: translation units
// including this header will not instantiate these themselves. The matching explicit instantiation definitions are
// presumably in the implementation file (not visible here).
extern template void MeanHelper::Exec(unsigned int, const std::vector<float> &);
extern template void MeanHelper::Exec(unsigned int, const std::vector<double> &);
extern template void MeanHelper::Exec(unsigned int, const std::vector<char> &);
extern template void MeanHelper::Exec(unsigned int, const std::vector<int> &);
extern template void MeanHelper::Exec(unsigned int, const std::vector<unsigned int> &);
666 
667 /// Helper function for SnapshotHelper and SnapshotHelperMT. It creates new branches for the output TTree of a Snapshot.
668 template <typename T>
669 void SetBranchesHelper(TTree * /*inputTree*/, TTree &outputTree, const std::string & /*validName*/,
670  const std::string &name, T *address)
671 {
672  outputTree.Branch(name.c_str(), address);
673 }
674 
675 /// Helper function for SnapshotHelper and SnapshotHelperMT. It creates new branches for the output TTree of a Snapshot.
676 /// This overload is called for columns of type `RVec<T>`. For RDF, these can represent:
677 /// 1. c-style arrays in ROOT files, so we are sure that there are input trees to which we can ask the correct branch title
678 /// 2. RVecs coming from a custom column or a source
679 /// 3. vectors coming from ROOT files
680 template <typename T>
681 void SetBranchesHelper(TTree *inputTree, TTree &outputTree, const std::string &validName, const std::string &name,
682  RVec<T> *ab)
683 {
684  // Treat 2. and 3.:
685  // 2. RVec coming from a custom column or a source
686  // 3. RVec coming from a column on disk of type vector (the RVec is adopting the data of that vector)
687  auto *const inputBranch = inputTree ? inputTree->GetBranch(validName.c_str()) : nullptr;
688  auto mustWriteRVec =
689  !inputBranch || ROOT::ESTLType::kSTLvector == TClassEdit::IsSTLCont(inputBranch->GetClassName());
690  if (mustWriteRVec) {
691  outputTree.Branch(name.c_str(), reinterpret_cast<typename RVec<T>::Impl_t *>(ab));
692  return;
693  }
694 
695  // Treat 1, the C-array case
696  auto *const leaf = static_cast<TLeaf *>(inputBranch->GetListOfLeaves()->UncheckedAt(0));
697  const auto bname = leaf->GetName();
698  const auto counterStr =
699  leaf->GetLeafCount() ? std::string(leaf->GetLeafCount()->GetName()) : std::to_string(leaf->GetLenStatic());
700  const auto btype = leaf->GetTypeName();
701  const auto rootbtype = TypeName2ROOTTypeName(btype);
702  const auto leaflist = std::string(bname) + "[" + counterStr + "]/" + rootbtype;
703  auto *const outputBranch = outputTree.Branch(name.c_str(), ab->data(), leaflist.c_str());
704  outputBranch->SetTitle(inputBranch->GetTitle());
705 }
706 
707 /// Helper object for a single-thread Snapshot action
708 template <typename... BranchTypes>
709 class SnapshotHelper : public RActionImpl<SnapshotHelper<BranchTypes...>> {
710  const std::string fFileName;
711  const std::string fDirName;
712  const std::string fTreeName;
713  const RSnapshotOptions fOptions;
714  std::unique_ptr<TFile> fOutputFile;
715  std::unique_ptr<TTree> fOutputTree; // must be a ptr because TTrees are not copy/move constructible
716  bool fIsFirstEvent{true};
717  const ColumnNames_t fInputBranchNames; // This contains the resolved aliases
718  const ColumnNames_t fOutputBranchNames;
719  TTree *fInputTree = nullptr; // Current input tree. Set at initialization time (`InitTask`)
720 
721 public:
722  SnapshotHelper(std::string_view filename, std::string_view dirname, std::string_view treename,
723  const ColumnNames_t &vbnames, const ColumnNames_t &bnames, const RSnapshotOptions &options)
724  : fFileName(filename), fDirName(dirname), fTreeName(treename), fOptions(options), fInputBranchNames(vbnames),
725  fOutputBranchNames(ReplaceDotWithUnderscore(bnames))
726  {
727  }
728 
729  SnapshotHelper(const SnapshotHelper &) = delete;
730  SnapshotHelper(SnapshotHelper &&) = default;
731 
732  void InitTask(TTreeReader *r, unsigned int /* slot */)
733  {
734  if (!r) // empty source, nothing to do
735  return;
736  fInputTree = r->GetTree();
737  // AddClone guarantees that if the input file changes the branches of the output tree are updated with the new
738  // addresses of the branch values
739  fInputTree->AddClone(fOutputTree.get());
740  }
741 
742  void Exec(unsigned int /* slot */, BranchTypes &... values)
743  {
744  if (fIsFirstEvent) {
745  using ind_t = std::index_sequence_for<BranchTypes...>;
746  SetBranches(values..., ind_t());
747  }
748  fOutputTree->Fill();
749  }
750 
751  template <std::size_t... S>
752  void SetBranches(BranchTypes &... values, std::index_sequence<S...> /*dummy*/)
753  {
754  // call TTree::Branch on all variadic template arguments
755  int expander[] = {
756  (SetBranchesHelper(fInputTree, *fOutputTree, fInputBranchNames[S], fOutputBranchNames[S], &values), 0)..., 0};
757  (void)expander; // avoid unused variable warnings for older compilers such as gcc 4.9
758  fIsFirstEvent = false;
759  }
760 
761  void Initialize()
762  {
763  fOutputFile.reset(
764  TFile::Open(fFileName.c_str(), fOptions.fMode.c_str(), /*ftitle=*/"",
765  ROOT::CompressionSettings(fOptions.fCompressionAlgorithm, fOptions.fCompressionLevel)));
766 
767  if (!fDirName.empty()) {
768  fOutputFile->mkdir(fDirName.c_str());
769  fOutputFile->cd(fDirName.c_str());
770  }
771 
772  fOutputTree.reset(
773  new TTree(fTreeName.c_str(), fTreeName.c_str(), fOptions.fSplitLevel, /*dir=*/fOutputFile.get()));
774 
775  if (fOptions.fAutoFlush)
776  fOutputTree->SetAutoFlush(fOptions.fAutoFlush);
777  }
778 
779  void Finalize()
780  {
781  if (fOutputFile && fOutputTree) {
782  ::TDirectory::TContext ctxt(fOutputFile->GetDirectory(fDirName.c_str()));
783  fOutputTree->Write();
784  } else {
785  Warning("Snapshot", "A lazy Snapshot action was booked but never triggered.");
786  }
787 
788  }
789 };
790 
791 /// Helper object for a multi-thread Snapshot action
792 template <typename... BranchTypes>
793 class SnapshotHelperMT : public RActionImpl<SnapshotHelperMT<BranchTypes...>> {
794  const unsigned int fNSlots;
795  std::unique_ptr<ROOT::Experimental::TBufferMerger> fMerger; // must use a ptr because TBufferMerger is not movable
796  std::vector<std::shared_ptr<ROOT::Experimental::TBufferMergerFile>> fOutputFiles;
797  std::vector<std::stack<std::unique_ptr<TTree>>> fOutputTrees;
798  std::vector<int> fIsFirstEvent; // vector<bool> is evil
799  const std::string fFileName; // name of the output file name
800  const std::string fDirName; // name of TFile subdirectory in which output must be written (possibly empty)
801  const std::string fTreeName; // name of output tree
802  const RSnapshotOptions fOptions; // struct holding options to pass down to TFile and TTree in this action
803  const ColumnNames_t fInputBranchNames; // This contains the resolved aliases
804  const ColumnNames_t fOutputBranchNames;
805  std::vector<TTree *> fInputTrees; // Current input trees. Set at initialization time (`InitTask`)
806 
807 public:
808  using ColumnTypes_t = TypeList<BranchTypes...>;
809  SnapshotHelperMT(const unsigned int nSlots, std::string_view filename, std::string_view dirname,
810  std::string_view treename, const ColumnNames_t &vbnames, const ColumnNames_t &bnames,
811  const RSnapshotOptions &options)
812  : fNSlots(nSlots), fOutputFiles(fNSlots), fOutputTrees(fNSlots), fIsFirstEvent(fNSlots, 1), fFileName(filename),
813  fDirName(dirname), fTreeName(treename), fOptions(options), fInputBranchNames(vbnames),
814  fOutputBranchNames(ReplaceDotWithUnderscore(bnames)), fInputTrees(fNSlots)
815  {
816  }
817  SnapshotHelperMT(const SnapshotHelperMT &) = delete;
818  SnapshotHelperMT(SnapshotHelperMT &&) = default;
819 
820  void InitTask(TTreeReader *r, unsigned int slot)
821  {
822  ::TDirectory::TContext c; // do not let tasks change the thread-local gDirectory
823  if (!fOutputFiles[slot]) {
824  // first time this thread executes something, let's create a TBufferMerger output directory
825  fOutputFiles[slot] = fMerger->GetFile();
826  }
827  TDirectory *treeDirectory = fOutputFiles[slot].get();
828  if (!fDirName.empty()) {
829  treeDirectory = fOutputFiles[slot]->mkdir(fDirName.c_str());
830  }
831  // re-create output tree as we need to create its branches again, with new input variables
832  // TODO we could instead create the output tree and its branches, change addresses of input variables in each task
833  fOutputTrees[slot].emplace(
834  std::make_unique<TTree>(fTreeName.c_str(), fTreeName.c_str(), fOptions.fSplitLevel, /*dir=*/treeDirectory));
835  if (fOptions.fAutoFlush)
836  fOutputTrees[slot].top()->SetAutoFlush(fOptions.fAutoFlush);
837  if (r) {
838  // not an empty-source RDF
839  fInputTrees[slot] = r->GetTree();
840  // AddClone guarantees that if the input file changes the branches of the output tree are updated with the new
841  // addresses of the branch values. We need this in case of friend trees with different cluster granularity
842  // than the main tree.
843  // FIXME: AddClone might result in many many (safe) warnings printed by TTree::CopyAddresses, see ROOT-9487.
844  const auto friendsListPtr = fInputTrees[slot]->GetListOfFriends();
845  if (friendsListPtr && friendsListPtr->GetEntries() > 0)
846  fInputTrees[slot]->AddClone(fOutputTrees[slot].top().get());
847  }
848  fIsFirstEvent[slot] = 1; // reset first event flag for this slot
849  }
850 
851  void FinalizeTask(unsigned int slot)
852  {
853  if (fOutputTrees[slot].top()->GetEntries() > 0)
854  fOutputFiles[slot]->Write();
855  // clear now to avoid concurrent destruction of output trees and input tree (which has them listed as fClones)
856  fOutputTrees[slot].pop();
857  }
858 
859  void Exec(unsigned int slot, BranchTypes &... values)
860  {
861  if (fIsFirstEvent[slot]) {
862  using ind_t = std::index_sequence_for<BranchTypes...>;
863  SetBranches(slot, values..., ind_t());
864  fIsFirstEvent[slot] = 0;
865  }
866  fOutputTrees[slot].top()->Fill();
867  auto entries = fOutputTrees[slot].top()->GetEntries();
868  auto autoFlush = fOutputTrees[slot].top()->GetAutoFlush();
869  if ((autoFlush > 0) && (entries % autoFlush == 0))
870  fOutputFiles[slot]->Write();
871  }
872 
873  template <std::size_t... S>
874  void SetBranches(unsigned int slot, BranchTypes &... values, std::index_sequence<S...> /*dummy*/)
875  {
876  // hack to call TTree::Branch on all variadic template arguments
877  int expander[] = {(SetBranchesHelper(fInputTrees[slot], *fOutputTrees[slot].top(), fInputBranchNames[S],
878  fOutputBranchNames[S], &values),
879  0)...,
880  0};
881  (void)expander; // avoid unused variable warnings for older compilers such as gcc 4.9
882  (void)slot; // avoid unused variable warnings in gcc6.2
883  }
884 
885  void Initialize()
886  {
887  const auto cs = ROOT::CompressionSettings(fOptions.fCompressionAlgorithm, fOptions.fCompressionLevel);
888  fMerger.reset(new ROOT::Experimental::TBufferMerger(fFileName.c_str(), fOptions.fMode.c_str(), cs));
889  }
890 
891  void Finalize()
892  {
893  auto fileWritten = false;
894  for (auto &file : fOutputFiles) {
895  if (file) {
896  file->Write();
897  fileWritten = true;
898  }
899  }
900 
901  if (!fileWritten) {
902  Warning("Snapshot", "A lazy Snapshot action was booked but never triggered.");
903  }
904  }
905 
906 };
907 
908 template <typename Acc, typename Merge, typename R, typename T, typename U,
909  bool MustCopyAssign = std::is_same<R, U>::value>
910 class AggregateHelper : public RActionImpl<AggregateHelper<Acc, Merge, R, T, U, MustCopyAssign>> {
911  Acc fAggregate;
912  Merge fMerge;
913  const std::shared_ptr<U> fResult;
914  Results<U> fAggregators;
915 
916 public:
917  using ColumnTypes_t = TypeList<T>;
918  AggregateHelper(Acc &&f, Merge &&m, const std::shared_ptr<U> &result, const unsigned int nSlots)
919  : fAggregate(std::move(f)), fMerge(std::move(m)), fResult(result), fAggregators(nSlots, *result)
920  {
921  }
922  AggregateHelper(AggregateHelper &&) = default;
923  AggregateHelper(const AggregateHelper &) = delete;
924 
925  void InitTask(TTreeReader *, unsigned int) {}
926 
928  void Exec(unsigned int slot, const T &value)
929  {
930  fAggregators[slot] = fAggregate(fAggregators[slot], value);
931  }
932 
934  void Exec(unsigned int slot, const T &value)
935  {
936  fAggregate(fAggregators[slot], value);
937  }
938 
939  void Initialize() { /* noop */}
940 
941  template <typename MergeRet = typename CallableTraits<Merge>::ret_type,
942  bool MergeAll = std::is_same<void, MergeRet>::value>
943  typename std::enable_if<MergeAll, void>::type Finalize()
944  {
945  fMerge(fAggregators);
946  *fResult = fAggregators[0];
947  }
948 
949  template <typename MergeRet = typename CallableTraits<Merge>::ret_type,
950  bool MergeTwoByTwo = std::is_same<U, MergeRet>::value>
951  typename std::enable_if<MergeTwoByTwo, void>::type Finalize(...) // ... needed to let compiler distinguish overloads
952  {
953  for (auto &acc : fAggregators)
954  *fResult = fMerge(*fResult, acc);
955  }
956 
957  U &PartialUpdate(unsigned int slot) { return fAggregators[slot]; }
958 };
959 
960 } // end of NS RDF
961 } // end of NS Internal
962 } // end of NS ROOT
963 
964 /// \endcond
965 
966 #endif
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
A TLeaf describes individual elements of a TBranch See TBranch structure in TTree.
Definition: TLeaf.h:32
ROOT::ESTLType IsSTLCont(std::string_view type)
type : type name: vector<list<classA,allocator>,allocator> result: 0 : not stl container code of cont...
auto * m
Definition: textangle.C:8
TTreeReader is a simple, robust and fast interface to read values from a TTree, TChain or TNtuple...
Definition: TTreeReader.h:43
Namespace for new ROOT classes and functions.
Definition: StringConv.hxx:21
void ws()
Definition: ws.C:62
double T(double x)
Definition: ChebyshevPol.h:34
T * data() noexcept
Definition: RVec.hxx:239
#define f(i)
Definition: RSha256.hxx:104
iterator begin() noexcept
Definition: RVec.hxx:242
STL namespace.
#define R(a, b, c, d, e, f, g, h, i)
Definition: RSha256.hxx:110
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3976
typename std::vector< T, ::ROOT::Detail::VecOps::RAdoptAllocator< T > > Impl_t
Definition: RVec.hxx:148
TTree * GetTree() const
Definition: TTreeReader.h:162
static const double x2[5]
A collection of options to steer the creation of the dataset on file.
std::vector< std::string > ReplaceDotWithUnderscore(const std::vector< std::string > &columnNames)
Replace occurrences of '.' with '_' in each of the given column names.
Definition: RDFUtils.cxx:257
A "std::vector"-like collection of values implementing handy operation to analyse them...
Definition: RVec.hxx:146
char TypeName2ROOTTypeName(const std::string &b)
Convert type name (e.g.
Definition: RDFUtils.cxx:217
RooArgSet S(const RooAbsArg &v1)
#define F(x, y, z)
ROOT::R::TRInterface & r
Definition: Object.C:4
void Initialize(Bool_t useTMVAStyle=kTRUE)
Definition: tmvaglob.cxx:176
iterator end() noexcept
Definition: RVec.hxx:245
SVector< double, 2 > v
Definition: Dict.h:5
TBufferMerger is a class to facilitate writing data in parallel from multiple threads, while writing to a single output file.
void Warning(const char *location, const char *msgfmt,...)
#define h(i)
Definition: RSha256.hxx:106
Lightweight storage for a collection of types.
Definition: TypeTraits.hxx:27
static const double x1[5]
Describe directory structure in memory.
Definition: TDirectory.h:34
int type
Definition: TGX11.cxx:120
unsigned long long ULong64_t
Definition: RtypesCore.h:70
basic_string_view< char > string_view
Definition: RStringView.hxx:35
ROOT type_traits extensions.
Definition: TypeTraits.hxx:23
make_index_sequence< sizeof...(_Tp)> index_sequence_for
typedef void((*Func_t)())
std::shared_ptr< TBufferMergerFile > GetFile()
Returns a TBufferMergerFile to which data can be written.
Definition: file.py:1
THist< 1, double, THistStatContent, THistStatUncertainty > TH1D
Definition: THist.hxx:284
Check for container traits.
Definition: TypeTraits.hxx:98
#define c(i)
Definition: RSha256.hxx:101
char name[80]
Definition: TGX11.cxx:109
static const double x3[11]