Logo ROOT   6.14/05
Reference Guide
RResultPtr.hxx
Go to the documentation of this file.
1 // Author: Enrico Guiraud, Danilo Piparo CERN 03/2017
2 
3 /*************************************************************************
4  * Copyright (C) 1995-2016, Rene Brun and Fons Rademakers. *
5  * All rights reserved. *
6  * *
7  * For the licensing terms see $ROOTSYS/LICENSE. *
8  * For the list of contributors see $ROOTSYS/README/CREDITS. *
9  *************************************************************************/
10 
11 #ifndef ROOT_RRESULTPTR
12 #define ROOT_RRESULTPTR
13 
#include "ROOT/TypeTraits.hxx"
#include "ROOT/RDFNodes.hxx"
#include "TError.h" // Warning

#include <functional>
#include <iterator>
#include <memory>
#include <stdexcept>
#include <utility>
20 
21 namespace ROOT {
22 
23 
namespace RDF {
// Forward declaration of the result wrapper; the MakeResultPtr declarations in ROOT::Detail::RDF need its name.
template <class T>
class RResultPtr;
} // namespace RDF
29 
30 namespace Detail {
31 namespace RDF {
33 // Fwd decl for RResultPtr
34 template <typename T>
35 RResultPtr<T> MakeResultPtr(const std::shared_ptr<T> &r, const std::shared_ptr<RLoopManager> &df,
37 template <typename T>
38 std::pair<RResultPtr<T>, std::shared_ptr<ROOT::Internal::RDF::RActionBase *>>
39 MakeResultPtr(const std::shared_ptr<T> &r, const std::shared_ptr<RLoopManager> &df);
40 } // ns RDF
41 } // ns Detail
42 
43 
44 namespace RDF {
46 namespace RDFDetail = ROOT::Detail::RDF;
47 namespace TTraits = ROOT::TypeTraits;
48 
49 /// Smart pointer for the return type of actions
50 /**
51 \class ROOT::RDF::RResultPtr
52 \ingroup dataframe
53 \brief A wrapper around the result of RDataFrame actions able to trigger calculations lazily.
54 \tparam T Type of the action result
55 
56 A smart pointer which allows to access the result of a RDataFrame action. The
57 methods of the encapsulated object can be accessed via the arrow operator.
58 Upon invocation of the arrow operator or dereferencing (`operator*`), the
59 loop on the events and calculations of all scheduled actions are executed
60 if needed.
61 It is possible to iterate on the result proxy if the proxied object is a collection.
62 ~~~{.cpp}
63 for (auto& myItem : myResultProxy) { ... };
64 ~~~
65 If iteration is not supported by the type of the proxied object, a compilation error is thrown.
66 
67 */
68 template <typename T>
69 class RResultPtr {
70  // private using declarations
71  using SPT_t = std::shared_ptr<T>;
72  using SPTLM_t = std::shared_ptr<RDFDetail::RLoopManager>;
73  using WPTLM_t = std::weak_ptr<RDFDetail::RLoopManager>;
74  using ShrdPtrBool_t = std::shared_ptr<bool>;
75 
76  // friend declarations
77  template <typename T1>
78  friend RResultPtr<T1>
79  RDFDetail::MakeResultPtr(const std::shared_ptr<T1> &, const SPTLM_t &, RDFInternal::RActionBase *);
80  template <typename T1>
81  friend std::pair<RResultPtr<T1>, std::shared_ptr<RDFInternal::RActionBase *>>
82  RDFDetail::MakeResultPtr(const std::shared_ptr<T1> &, const SPTLM_t &);
83  template <class T1, class T2>
84  friend bool operator==(const RResultPtr<T1> &lhs, const RResultPtr<T2> &rhs);
85  template <class T1, class T2>
86  friend bool operator!=(const RResultPtr<T1> &lhs, const RResultPtr<T2> &rhs);
87  template <class T1>
88  friend bool operator==(const RResultPtr<T1> &lhs, std::nullptr_t rhs);
89  template <class T1>
90  friend bool operator==(std::nullptr_t lhs, const RResultPtr<T1> &rhs);
91  template <class T1>
92  friend bool operator!=(const RResultPtr<T1> &lhs, std::nullptr_t rhs);
93  template <class T1>
94  friend bool operator!=(std::nullptr_t lhs, const RResultPtr<T1> &rhs);
95 
96  /// \cond HIDDEN_SYMBOLS
97  template <typename V, bool hasBeginEnd = TTraits::HasBeginAndEnd<V>::value>
98  struct TIterationHelper {
99  using Iterator_t = void;
100  void GetBegin(const V &) { static_assert(sizeof(V) == 0, "It does not make sense to ask begin for this class."); }
101  void GetEnd(const V &) { static_assert(sizeof(V) == 0, "It does not make sense to ask end for this class."); }
102  };
103 
104  template <typename V>
105  struct TIterationHelper<V, true> {
106  using Iterator_t = decltype(std::begin(std::declval<V>()));
107  static Iterator_t GetBegin(const V &v) { return std::begin(v); };
108  static Iterator_t GetEnd(const V &v) { return std::end(v); };
109  };
110  /// \endcond
111 
112  /// State registered also in the RLoopManager until the event loop is executed
113  ShrdPtrBool_t fReadiness = std::make_shared<bool>(false);
114  WPTLM_t fImplWeakPtr; ///< Points to the RLoopManager at the root of the functional graph
115  SPT_t fObjPtr; ///< Shared pointer encapsulating the wrapped result
116  /// Shared_ptr to a _pointer_ to the RDF action that produces this result. It is set at construction time for
117  /// non-jitted actions, and at jitting time for jitted actions (at the time of writing, this means right
118  /// before the event-loop).
119  // N.B. what's on the heap is the _pointer_ to RActionBase, we are _not_ taking shared ownership of a RAction.
120  // This cannot be a unique_ptr because that would disallow copy-construction of TResultProxies.
121  // It cannot be just a pointer to RActionBase because we need something to store in the callback callable that will
122  // be passed to RLoopManager _before_ the pointer to RActionBase is set in the case of jitted actions.
123  std::shared_ptr<RDFInternal::RActionBase *> fActionPtrPtr;
124 
125  /// Triggers the event loop in the RLoopManager instance to which it's associated via the fImplWeakPtr
126  void TriggerRun();
127 
128  /// Get the pointer to the encapsulated result.
129  /// Ownership is not transferred to the caller.
130  /// Triggers event loop and execution of all actions booked in the associated RLoopManager.
131  T *Get()
132  {
133  if (!*fReadiness)
134  TriggerRun();
135  return fObjPtr.get();
136  }
137 
138  RResultPtr(const SPT_t &objPtr, const ShrdPtrBool_t &readiness, const SPTLM_t &loopManager,
139  RDFInternal::RActionBase *actionPtr = nullptr)
140  : fReadiness(readiness), fImplWeakPtr(loopManager), fObjPtr(objPtr),
141  fActionPtrPtr(new (RDFInternal::RActionBase *)(actionPtr))
142  {
143  }
144 
145  std::shared_ptr<RDFInternal::RActionBase *> GetActionPtrPtr() const { return fActionPtrPtr; }
146 
147 public:
148  using Value_t = T; ///< Convenience alias to simplify access to proxied type
149  static constexpr ULong64_t kOnce = 0ull; ///< Convenience definition to express a callback must be executed once
150 
151  RResultPtr() = default;
152  RResultPtr(const RResultPtr &) = default;
153  RResultPtr(RResultPtr &&) = default;
154  RResultPtr &operator=(const RResultPtr &) = default;
155  RResultPtr &operator=(RResultPtr &&) = default;
156  explicit operator bool() const { return bool(fObjPtr); }
157 
158  /// Get a const reference to the encapsulated object.
159  /// Triggers event loop and execution of all actions booked in the associated RLoopManager.
160  const T &GetValue() { return *Get(); }
161 
162  /// Get the pointer to the encapsulated object.
163  /// Triggers event loop and execution of all actions booked in the associated RLoopManager.
164  T *GetPtr() { return Get(); }
165 
166  /// Get a pointer to the encapsulated object.
167  /// Triggers event loop and execution of all actions booked in the associated RLoopManager.
168  T &operator*() { return *Get(); }
169 
170  /// Get a pointer to the encapsulated object.
171  /// Ownership is not transferred to the caller.
172  /// Triggers event loop and execution of all actions booked in the associated RLoopManager.
173  T *operator->() { return Get(); }
174 
175  /// Return an iterator to the beginning of the contained object if this makes
176  /// sense, throw a compilation error otherwise
177  typename TIterationHelper<T>::Iterator_t begin()
178  {
179  if (!*fReadiness)
180  TriggerRun();
181  return TIterationHelper<T>::GetBegin(*fObjPtr);
182  }
183 
184  /// Return an iterator to the end of the contained object if this makes
185  /// sense, throw a compilation error otherwise
186  typename TIterationHelper<T>::Iterator_t end()
187  {
188  if (!*fReadiness)
189  TriggerRun();
190  return TIterationHelper<T>::GetEnd(*fObjPtr);
191  }
192 
193  // clang-format off
194  /// Register a callback that RDataFrame will execute "everyNEvents" on a partial result.
195  ///
196  /// \param[in] everyNEvents Frequency at which the callback will be called, as a number of events processed
197  /// \param[in] callback a callable with signature `void(Value_t&)` where Value_t is the type of the value contained in this RResultPtr
198  /// \return this RResultPtr, to allow chaining of OnPartialResultSlot with other calls
199  ///
200  /// The callback must be a callable (lambda, function, functor class...) that takes a reference to the result type as
201  /// argument and returns nothing. RDataFrame will invoke registered callbacks passing partial action results as
202  /// arguments to them (e.g. a histogram filled with a part of the selected events, a counter incremented only up to a
203  /// certain point, a mean over a subset of the events and so forth).
204  ///
205  /// Callbacks can be used e.g. to inspect partial results of the analysis while the event loop is running. For
206  /// example one can draw an up-to-date version of a result histogram every 100 entries like this:
207  /// \code{.cpp}
208  /// auto h = tdf.Histo1D("x");
209  /// TCanvas c("c","x hist");
210  /// h.OnPartialResult(100, [&c](TH1D &h_) { c.cd(); h_.Draw(); c.Update(); });
211  /// h->Draw(); // event loop runs here, this `Draw` is executed after the event loop is finished
212  /// \endcode
213  ///
214  /// A value of 0 for everyNEvents indicates the callback must be executed only once, before running the event loop.
215  /// A conveniece definition `kOnce` is provided to make this fact more expressive in user code (see snippet below).
216  /// Multiple callbacks can be registered with the same RResultPtr (i.e. results of RDataFrame actions) and will
217  /// be executed sequentially. Callbacks are executed in the order they were registered.
218  /// The type of the value contained in a RResultPtr is also available as RResultPtr<T>::Value_t, e.g.
219  /// \code{.cpp}
220  /// auto h = tdf.Histo1D("x");
221  /// // h.kOnce is 0
222  /// // decltype(h)::Value_t is TH1D
223  /// \endcode
224  ///
225  /// When implicit multi-threading is enabled, the callback:
226  /// - will never be executed by multiple threads concurrently: it needs not be thread-safe. For example the snippet
227  /// above that draws the partial histogram on a canvas works seamlessly in multi-thread event loops.
228  /// - will always be executed "everyNEvents": partial results will "contain" that number of events more from
229  /// one call to the next
230  /// - might be executed by a different worker thread at different times: the value of `std::this_thread::get_id()`
231  /// might change between calls
232  /// To register a callback that is called by _each_ worker thread (concurrently) every N events one can use
233  /// OnPartialResultSlot.
234  // clang-format on
235  RResultPtr<T> &OnPartialResult(ULong64_t everyNEvents, std::function<void(T &)> callback)
236  {
237  auto lm = fImplWeakPtr.lock();
238  if (!lm)
239  throw std::runtime_error("The main RDataFrame is not reachable: did it go out of scope?");
240  const auto nSlots = lm->GetNSlots();
241  auto actionPtrPtr = fActionPtrPtr.get();
242  auto c = [nSlots, actionPtrPtr, callback](unsigned int slot) {
243  if (slot != nSlots - 1)
244  return;
245  auto partialResult = static_cast<Value_t *>((*actionPtrPtr)->PartialUpdate(slot));
246  callback(*partialResult);
247  };
248  lm->RegisterCallback(everyNEvents, std::move(c));
249  return *this;
250  }
251 
252  // clang-format off
253  /// Register a callback that RDataFrame will execute in each worker thread concurrently on that thread's partial result.
254  ///
255  /// \param[in] everyNEvents Frequency at which the callback will be called by each thread, as a number of events processed
256  /// \param[in] a callable with signature `void(unsigned int, Value_t&)` where Value_t is the type of the value contained in this RResultPtr
257  /// \return this RResultPtr, to allow chaining of OnPartialResultSlot with other calls
258  ///
259  /// See `OnPartialResult` for a generic explanation of the callback mechanism.
260  /// Compared to `OnPartialResult`, this method has two major differences:
261  /// - all worker threads invoke the callback once every specified number of events. The event count is per-thread,
262  /// and callback invocation might happen concurrently (i.e. the callback must be thread-safe)
263  /// - the callable must take an extra `unsigned int` parameter corresponding to a multi-thread "processing slot":
264  /// this is a "helper value" to simplify writing thread-safe callbacks: different worker threads might invoke the
265  /// callback concurrently but always with different `slot` numbers.
266  /// - a value of 0 for everyNEvents indicates the callback must be executed once _per slot_.
267  ///
268  /// For example, the following snippet prints out a thread-safe progress bar of the events processed by RDataFrame
269  /// \code
270  /// auto c = tdf.Count(); // any action would do, but `Count` is the most lightweight
271  /// std::string progress;
272  /// std::mutex bar_mutex;
273  /// c.OnPartialResultSlot(nEvents / 100, [&progress, &bar_mutex](unsigned int, ULong64_t &) {
274  /// std::lock_guard<std::mutex> lg(bar_mutex);
275  /// progress.push_back('#');
276  /// std::cout << "\r[" << std::left << std::setw(100) << progress << ']' << std::flush;
277  /// });
278  /// std::cout << "Analysis running..." << std::endl;
279  /// *c; // trigger the event loop by accessing an action's result
280  /// std::cout << "\nDone!" << std::endl;
281  /// \endcode
282  // clang-format on
283  RResultPtr<T> &OnPartialResultSlot(ULong64_t everyNEvents, std::function<void(unsigned int, T &)> callback)
284  {
285  auto lm = fImplWeakPtr.lock();
286  if (!lm)
287  throw std::runtime_error("The main RDataFrame is not reachable: did it go out of scope?");
288  auto actionPtrPtr = fActionPtrPtr.get();
289  auto c = [actionPtrPtr, callback](unsigned int slot) {
290  auto partialResult = static_cast<Value_t *>((*actionPtrPtr)->PartialUpdate(slot));
291  callback(slot, *partialResult);
292  };
293  lm->RegisterCallback(everyNEvents, std::move(c));
294  return *this;
295  }
296 };
297 
298 template <typename T>
300 {
301  auto df = fImplWeakPtr.lock();
302  if (!df) {
303  throw std::runtime_error("The main RDataFrame is not reachable: did it go out of scope?");
304  }
305  df->Run();
306 }
307 
308 template <class T1, class T2>
309 bool operator==(const RResultPtr<T1> &lhs, const RResultPtr<T2> &rhs)
310 {
311  return lhs.fObjPtr == rhs.fObjPtr;
312 }
313 
314 template <class T1, class T2>
315 bool operator!=(const RResultPtr<T1> &lhs, const RResultPtr<T2> &rhs)
316 {
317  return lhs.fObjPtr != rhs.fObjPtr;
318 }
319 
320 template <class T1>
321 bool operator==(const RResultPtr<T1> &lhs, std::nullptr_t rhs)
322 {
323  return lhs.fObjPtr == rhs;
324 }
325 
326 template <class T1>
327 bool operator==(std::nullptr_t lhs, const RResultPtr<T1> &rhs)
328 {
329  return lhs == rhs.fObjPtr;
330 }
331 
332 template <class T1>
333 bool operator!=(const RResultPtr<T1> &lhs, std::nullptr_t rhs)
334 {
335  return lhs.fObjPtr != rhs;
336 }
337 
338 template <class T1>
339 bool operator!=(std::nullptr_t lhs, const RResultPtr<T1> &rhs)
340 {
341  return lhs != rhs.fObjPtr;
342 }
343 
344 } // end NS RDF
345 
346 
347 namespace Detail {
348 namespace RDF {
349 /// Create a RResultPtr and set its pointer to the corresponding RAction
350 /// This overload is invoked by non-jitted actions, as they have access to RAction before constructing RResultPtr.
351 template <typename T>
353 MakeResultPtr(const std::shared_ptr<T> &r, const std::shared_ptr<RLoopManager> &df, RDFInternal::RActionBase *actionPtr)
354 {
355  auto readiness = std::make_shared<bool>(false);
356  auto resPtr = RResultPtr<T>(r, readiness, df, actionPtr);
357  df->Book(readiness);
358  return resPtr;
359 }
360 
361 /// Create a RResultPtr and return it together with its pointer to RAction
362 /// This overload is invoked by jitted actions; the pointer to RAction will be set right before the loop by jitted code
363 template <typename T>
364 std::pair<RResultPtr<T>, std::shared_ptr<RDFInternal::RActionBase *>>
365 MakeResultPtr(const std::shared_ptr<T> &r, const std::shared_ptr<RLoopManager> &df)
366 {
367  auto readiness = std::make_shared<bool>(false);
368  auto resPtr = RResultPtr<T>(r, readiness, df);
369  df->Book(readiness);
370  return std::make_pair(resPtr, resPtr.GetActionPtrPtr());
371 }
372 } // end NS RDF
373 } // end NS Detail
374 } // end NS ROOT
375 
376 #endif // ROOT_RRESULTPTR
T * Get()
Get the pointer to the encapsulated result.
Definition: RResultPtr.hxx:131
RResultPtr< T > & OnPartialResultSlot(ULong64_t everyNEvents, std::function< void(unsigned int, T &)> callback)
Register a callback that RDataFrame will execute in each worker thread concurrently on that thread's ...
Definition: RResultPtr.hxx:283
Smart pointer for the return type of actions.
Definition: RResultPtr.hxx:27
Namespace for new ROOT classes and functions.
Definition: StringConv.hxx:21
double T(double x)
Definition: ChebyshevPol.h:34
std::shared_ptr< RDFDetail::RLoopManager > SPTLM_t
Definition: RResultPtr.hxx:72
RResultPtr(const SPT_t &objPtr, const ShrdPtrBool_t &readiness, const SPTLM_t &loopManager, RDFInternal::RActionBase *actionPtr=nullptr)
Definition: RResultPtr.hxx:138
void TriggerRun()
Triggers the event loop in the RLoopManager instance to which it's associated via the fImplWeakPtr...
Definition: RResultPtr.hxx:299
std::shared_ptr< RDFInternal::RActionBase * > fActionPtrPtr
Shared_ptr to a pointer to the RDF action that produces this result.
Definition: RResultPtr.hxx:123
SPT_t fObjPtr
Shared pointer encapsulating the wrapped result.
Definition: RResultPtr.hxx:115
bool operator==(const RResultPtr< T1 > &lhs, const RResultPtr< T2 > &rhs)
Definition: RResultPtr.hxx:309
T Value_t
Convenience alias to simplify access to proxied type.
Definition: RResultPtr.hxx:148
RResultPtr< T > & OnPartialResult(ULong64_t everyNEvents, std::function< void(T &)> callback)
Register a callback that RDataFrame will execute "everyNEvents" on a partial result.
Definition: RResultPtr.hxx:235
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
Definition: RExports.h:146
bool operator!=(const RResultPtr< T1 > &lhs, const RResultPtr< T2 > &rhs)
Definition: RResultPtr.hxx:315
std::weak_ptr< RDFDetail::RLoopManager > WPTLM_t
Definition: RResultPtr.hxx:73
T * operator->()
Get a pointer to the encapsulated object.
Definition: RResultPtr.hxx:173
ROOT::R::TRInterface & r
Definition: Object.C:4
SVector< double, 2 > v
Definition: Dict.h:5
std::shared_ptr< T > SPT_t
Definition: RResultPtr.hxx:71
std::pair< RResultPtr< T >, std::shared_ptr< RDFInternal::RActionBase * > > MakeResultPtr(const std::shared_ptr< T > &r, const std::shared_ptr< RLoopManager > &df)
Create a RResultPtr and return it together with its pointer to RAction This overload is invoked by ji...
Definition: RResultPtr.hxx:365
TIterationHelper< T >::Iterator_t end()
Return an iterator to the end of the contained object if this makes sense, throw a compilation error ...
Definition: RResultPtr.hxx:186
WPTLM_t fImplWeakPtr
Points to the RLoopManager at the root of the functional graph.
Definition: RResultPtr.hxx:114
unsigned long long ULong64_t
Definition: RtypesCore.h:70
ROOT type_traits extensions.
Definition: TypeTraits.hxx:23
std::shared_ptr< bool > ShrdPtrBool_t
Definition: RResultPtr.hxx:74
Binding & operator=(OUT(*fun)(void))
typedef void((*Func_t)())
T & operator*()
Get a reference to the encapsulated object.
Definition: RResultPtr.hxx:168
T * GetPtr()
Get the pointer to the encapsulated object.
Definition: RResultPtr.hxx:164
#define c(i)
Definition: RSha256.hxx:101
const T & GetValue()
Get a const reference to the encapsulated object.
Definition: RResultPtr.hxx:160
std::shared_ptr< RDFInternal::RActionBase * > GetActionPtrPtr() const
Definition: RResultPtr.hxx:145
TIterationHelper< T >::Iterator_t begin()
Return an iterator to the beginning of the contained object if this makes sense, throw a compilation ...
Definition: RResultPtr.hxx:177