Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RDFHelpers.hxx
Go to the documentation of this file.
1// Author: Enrico Guiraud, Danilo Piparo CERN 02/2018
2
3/*************************************************************************
4 * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. *
5 * All rights reserved. *
6 * *
7 * For the licensing terms see $ROOTSYS/LICENSE. *
8 * For the list of contributors see $ROOTSYS/README/CREDITS. *
9 *************************************************************************/
10
11// This header contains helper free functions that slim down RDataFrame's programming model
12
13#ifndef ROOT_RDF_HELPERS
14#define ROOT_RDF_HELPERS
15
19#include <ROOT/RResultHandle.hxx> // users of RunGraphs might rely on this transitive include
20#include <ROOT/TypeTraits.hxx>
21
22#include <array>
23#include <chrono>
24#include <fstream>
25#include <functional>
26#include <map>
27#include <memory>
28#include <mutex>
29#include <type_traits>
30#include <utility> // std::index_sequence
31#include <vector>
32
33namespace ROOT {
34namespace Internal {
35namespace RDF {
36template <typename... ArgTypes, typename F>
38{
39 return std::function<bool(ArgTypes...)>([=](ArgTypes... args) mutable { return !f(args...); });
40}
41
42template <typename... ArgTypes, typename Ret, typename... Args>
44{
45 return std::function<bool(ArgTypes...)>([=](ArgTypes... args) mutable { return !f(args...); });
46}
47
48template <typename I, typename T, typename F>
50
51template <std::size_t... N, typename T, typename F>
52class PassAsVecHelper<std::index_sequence<N...>, T, F> {
53 template <std::size_t Idx>
54 using AlwaysT = T;
55 std::decay_t<F> fFunc;
56
57public:
58 PassAsVecHelper(F &&f) : fFunc(std::forward<F>(f)) {}
59 auto operator()(AlwaysT<N>... args) -> decltype(fFunc({args...})) { return fFunc({args...}); }
60};
61
62template <std::size_t N, typename T, typename F>
64{
65 return PassAsVecHelper<std::make_index_sequence<N>, T, F>(std::forward<F>(f));
66}
67
68} // namespace RDF
69} // namespace Internal
70
71namespace RDF {
73
74// clang-format off
75/// Given a callable with signature bool(T1, T2, ...) return a callable with same signature that returns the negated result
76///
77/// The callable must have one single non-template definition of operator(). This is a limitation with respect to
78/// std::not_fn, required for interoperability with RDataFrame.
79// clang-format on
80template <typename F,
81 typename Args = typename ROOT::TypeTraits::CallableTraits<std::decay_t<F>>::arg_types_nodecay,
82 typename Ret = typename ROOT::TypeTraits::CallableTraits<std::decay_t<F>>::ret_type>
83auto Not(F &&f) -> decltype(RDFInternal::NotHelper(Args(), std::forward<F>(f)))
84{
85 static_assert(std::is_same<Ret, bool>::value, "RDF::Not requires a callable that returns a bool.");
86 return RDFInternal::NotHelper(Args(), std::forward<F>(f));
87}
88
89// clang-format off
90/// PassAsVec is a callable generator that allows passing N variables of type T to a function as a single collection.
91///
92/// PassAsVec<N, T>(func) returns a callable that takes N arguments of type T, passes them down to function `func` as
93/// an initializer list `{t1, t2, t3,..., tN}` and returns whatever f({t1, t2, t3, ..., tN}) returns.
94///
95/// Note that for this to work with RDataFrame the type of all columns that the callable is applied to must be exactly T.
96/// Example usage together with RDataFrame ("varX" columns must all be `float` variables):
97/// \code
98/// bool myVecFunc(std::vector<float> args);
99/// df.Filter(PassAsVec<3, float>(myVecFunc), {"var1", "var2", "var3"});
100/// \endcode
101// clang-format on
102template <std::size_t N, typename T, typename F>
104{
105 return RDFInternal::PassAsVecHelper<std::make_index_sequence<N>, T, F>(std::forward<F>(f));
106}
107
108// clang-format off
109/// Create a graphviz representation of the dataframe computation graph, return it as a string.
110/// \param[in] node any node of the graph. Called on the head (first) node, it prints the entire graph. Otherwise, only the branch the node belongs to.
111///
112/// The output can be displayed with a command akin to `dot -Tpng output.dot > output.png && open output.png`.
113///
114/// Note that "hanging" Defines, i.e. Defines without downstream nodes, will not be displayed by SaveGraph as they are
115/// effectively optimized away from the computation graph.
116///
117/// Note that SaveGraph is not thread-safe and must not be called concurrently from different threads.
118// clang-format on
119template <typename NodeType>
120std::string SaveGraph(NodeType node)
121{
123 return helper.RepresentGraph(node);
124}
125
126// clang-format off
127/// Create a graphviz representation of the dataframe computation graph, write it to the specified file.
128/// \param[in] node any node of the graph. Called on the head (first) node, it prints the entire graph. Otherwise, only the branch the node belongs to.
129/// \param[in] outputFile file where to save the representation.
130///
131/// The output can be displayed with a command akin to `dot -Tpng output.dot > output.png && open output.png`.
132///
133/// Note that "hanging" Defines, i.e. Defines without downstream nodes, will not be displayed by SaveGraph as they are
134/// effectively optimized away from the computation graph.
135///
136/// Note that SaveGraph is not thread-safe and must not be called concurrently from different threads.
137// clang-format on
138template <typename NodeType>
139void SaveGraph(NodeType node, const std::string &outputFile)
140{
142 std::string dotGraph = helper.RepresentGraph(node);
143
144 std::ofstream out(outputFile);
145 if (!out.is_open()) {
146 throw std::runtime_error("Could not open output file \"" + outputFile + "\"for reading");
147 }
148
149 out << dotGraph;
150 out.close();
151}
152
153// clang-format off
154/// Cast a RDataFrame node to the common type ROOT::RDF::RNode
155/// \param[in] node Any node of a RDataFrame graph
156// clang-format on
157template <typename NodeType>
159{
160 return node;
161}
162
163// clang-format off
164/// Run the event loops of multiple RDataFrames concurrently.
165/// \param[in] handles A vector of RResultHandles whose event loops should be run.
166/// \return The number of distinct computation graphs that have been processed.
167///
168/// This function triggers the event loop of all computation graphs which relate to the
169/// given RResultHandles. The advantage compared to running the event loop implicitly by accessing the
170/// RResultPtr is that the event loops will run concurrently. Therefore, the overall
171/// computation of all results can be scheduled more efficiently.
172/// It should be noted that user-defined operations (e.g., Filters and Defines) of the different RDataFrame graphs are assumed to be safe to call concurrently.
173/// RDataFrame will pass slot numbers in the range [0, NThread-1] to all helpers used in nodes such as DefineSlot. NThread is the number of threads ROOT was
174/// configured with in EnableImplicitMT().
175/// Slot numbers are unique across all graphs, so no two tasks with the same slot number will run concurrently. Note that it is not guaranteed that each slot
176/// number will be reached in every graph.
177///
178/// ~~~{.cpp}
179/// ROOT::RDataFrame df1("tree1", "file1.root");
180/// auto r1 = df1.Histo1D("var1");
181///
182/// ROOT::RDataFrame df2("tree2", "file2.root");
183/// auto r2 = df2.Sum("var2");
184///
185/// // RResultPtr -> RResultHandle conversion is automatic
186/// ROOT::RDF::RunGraphs({r1, r2});
187/// ~~~
188// clang-format on
189unsigned int RunGraphs(std::vector<RResultHandle> handles);
190
191namespace Experimental {
192
193/// \brief Produce all required systematic variations for the given result.
194/// \param[in] resPtr The result for which variations should be produced.
195/// \return A \ref ROOT::RDF::Experimental::RResultMap "RResultMap" object with full variation names as strings
196/// (e.g. "pt:down") and the corresponding varied results as values.
197///
198/// A given input RResultPtr<T> produces a corresponding RResultMap<T> with a "nominal"
199/// key that will return a value identical to the one contained in the original RResultPtr.
200/// Other keys correspond to the varied values of this result, one for each variation
201/// that the result depends on.
202/// VariationsFor does not trigger the event loop. The event loop is only triggered
203/// upon first access to a valid key, similarly to what happens with RResultPtr.
204///
205/// If the result does not depend, directly or indirectly, on any registered systematic variation, the
206/// returned RResultMap will contain only the "nominal" key.
207///
208/// See RDataFrame's \ref ROOT::RDF::RInterface::Vary() "Vary" method for more information and example usages.
209///
210/// \note Currently, producing variations for the results of \ref ROOT::RDF::RInterface::Display() "Display",
211/// \ref ROOT::RDF::RInterface::Report() "Report" and \ref ROOT::RDF::RInterface::Snapshot() "Snapshot"
212/// actions is not supported.
213//
214// An overview of how systematic variations work internally. Given N variations (including the nominal):
215//
216// RResultMap owns RVariedAction
217// N results N action helpers
218// N previous filters
219// N*#input_cols column readers
220//
221// ...and each RFilter and RDefine knows for what universe it needs to construct column readers ("nominal" by default).
222template <typename T>
224{
225 R__ASSERT(resPtr != nullptr && "Calling VariationsFor on an empty RResultPtr");
226
227 // populate parts of the computation graph for which we only have "empty shells", e.g. RJittedActions and
228 // RJittedFilters
229 resPtr.fLoopManager->Jit();
230
231 std::unique_ptr<RDFInternal::RActionBase> variedAction;
232 std::vector<std::shared_ptr<T>> variedResults;
233
234 std::shared_ptr<RDFInternal::RActionBase> nominalAction = resPtr.fActionPtr;
235 std::vector<std::string> variations = nominalAction->GetVariations();
236 const auto nVariations = variations.size();
237
238 if (nVariations > 0) {
239 // clone the result once for each variation
240 variedResults.reserve(nVariations);
241 for (auto i = 0u; i < nVariations; ++i){
242 // implicitly assuming that T is copyable: this should be the case
243 // for all result types in use, as they are copied for each slot
244 variedResults.emplace_back(new T{*resPtr.fObjPtr});
245
246 // Check if the result's type T inherits from TNamed
247 if constexpr (std::is_base_of<TNamed, T>::value) {
248 // Get the current variation name
249 std::string variationName = variations[i];
250 // Replace the colon with an underscore
251 std::replace(variationName.begin(), variationName.end(), ':', '_');
252 // Get a pointer to the corresponding varied result
253 auto &variedResult = variedResults.back();
254 // Set the varied result's name to NOMINALNAME_VARIATIONNAME
255 variedResult->SetName((std::string(variedResult->GetName()) + "_" + variationName).c_str());
256 }
257 }
258
259 std::vector<void *> typeErasedResults;
260 typeErasedResults.reserve(variedResults.size());
261 for (auto &res : variedResults)
262 typeErasedResults.emplace_back(&res);
263
264 // Create the RVariedAction and inject it in the computation graph.
265 // This recursively creates all the required varied column readers and upstream nodes of the computation graph.
266 variedAction = nominalAction->MakeVariedAction(std::move(typeErasedResults));
267 }
268
269 return RDFInternal::MakeResultMap<T>(resPtr.fObjPtr, std::move(variedResults), std::move(variations),
270 *resPtr.fLoopManager, std::move(nominalAction), std::move(variedAction));
271}
272
275
276/// \brief Add ProgressBar to a ROOT::RDF::RNode
277/// \param[in] df RDataFrame node at which ProgressBar is called.
278///
279/// The ProgressBar can be added not only at the RDataFrame head node, but also at any computational node,
280/// such as Filter or Define.
281/// ###Example usage:
282/// ~~~{.cpp}
283/// ROOT::RDataFrame df("tree", "file.root");
284/// auto df_1 = ROOT::RDF::RNode(df.Filter("x>1"));
285/// ROOT::RDF::Experimental::AddProgressBar(df_1);
286/// ~~~
288
289/// \brief Add ProgressBar to an RDataFrame
290/// \param[in] df RDataFrame for which ProgressBar is called.
291///
292/// This function adds a ProgressBar to display the event statistics in the terminal every
293/// \b m events and every \b n seconds, including elapsed time, currently processed file,
294/// currently processed events, the rate of event processing
295/// and an estimated remaining time (per file being processed).
296/// ProgressBar should be added after the dataframe object (df) is created:
297/// ~~~{.cpp}
298/// ROOT::RDataFrame df("tree", "file.root");
299/// ROOT::RDF::Experimental::AddProgressBar(df);
300/// ~~~
301/// For more details see ROOT::RDF::Experimental::ProgressHelper Class.
303
304/// @brief Set the number of threads sharing one TH3 in RDataFrame.
305/// When RDF runs multi-threaded, each thread typically clones every histogram in the computation graph.
306/// If this consumes too much memory, N threads can share one clone.
307/// Higher values might slow down RDF because they lead to higher contention on the TH3Ds, but save memory.
308/// Lower values run faster with less contention at the cost of higher memory usage.
309/// @param nThread Number of threads that share a TH3D.
310void ThreadsPerTH3(unsigned int nThread = 1);
311
313
314/// RDF progress helper.
315/// This class provides callback functions to the RDataFrame. The event statistics
316/// (including elapsed time, currently processed file, currently processed events, the rate of event processing
317/// and an estimated remaining time (per file being processed))
318/// are recorded and printed in the terminal every m events and every n seconds.
319/// ProgressHelper::operator()(unsigned int, T&) is thread safe, and can be used as a callback in MT mode.
320/// ProgressBar should be added after creating the dataframe object (df):
321/// ~~~{.cpp}
322/// ROOT::RDataFrame df("tree", "file.root");
323/// ROOT::RDF::Experimental::AddProgressBar(df);
324/// ~~~
325/// Alternatively, the RDataFrame can be cast to an RNode first, which gives more flexibility.
326/// For example, it can be called at any computational node, such as Filter or Define, not only the head node,
327/// with no change to the ProgressBar function itself:
328/// ~~~{.cpp}
329/// ROOT::RDataFrame df("tree", "file.root");
330/// auto df_1 = ROOT::RDF::RNode(df.Filter("x>1"));
331/// ROOT::RDF::Experimental::AddProgressBar(df_1);
332/// ~~~
334private:
335 double EvtPerSec() const;
336 std::pair<std::size_t, std::chrono::seconds> RecordEvtCountAndTime();
337 void PrintStats(std::ostream &stream, std::size_t currentEventCount, std::chrono::seconds totalElapsedSeconds) const;
338 void PrintStatsFinal(std::ostream &stream, std::chrono::seconds totalElapsedSeconds) const;
339 void PrintProgressBar(std::ostream &stream, std::size_t currentEventCount) const;
340
341 std::chrono::time_point<std::chrono::system_clock> fBeginTime = std::chrono::system_clock::now();
342 std::chrono::time_point<std::chrono::system_clock> fLastPrintTime = fBeginTime;
343 std::chrono::seconds fPrintInterval{1};
344
345 std::atomic<std::size_t> fProcessedEvents{0};
346 std::size_t fLastProcessedEvents{0};
347 std::size_t fIncrement;
348
350 std::map<std::string, ULong64_t> fSampleNameToEventEntries; // Filename, events in the file
351
352 std::array<double, 20> fEventsPerSecondStatistics;
354
355 unsigned int fBarWidth;
356 unsigned int fTotalFiles;
357
358 std::mutex fPrintMutex;
359 bool fIsTTY;
361
362 std::shared_ptr<TTree> fTree{nullptr};
363
364public:
365 /// Create a progress helper.
366 /// \param increment RDF callbacks are called every `n` events. Pass this `n` here.
367 /// \param totalFiles read total number of files in the RDF.
368 /// \param progressBarWidth Number of characters the progress bar will occupy.
369 /// \param printInterval Update the stats every `n` seconds.
370 /// \param useColors Use shell colour codes to colour the output. Automatically disabled when
371 /// we are not writing to a tty.
372 ProgressHelper(std::size_t increment, unsigned int totalFiles = 1, unsigned int progressBarWidth = 40,
373 unsigned int printInterval = 1, bool useColors = true);
374
375 ~ProgressHelper() = default;
376
377 friend class ProgressBarAction;
378
379 /// Register a new sample for completion statistics.
380 /// \see ROOT::RDF::RInterface::DefinePerSample().
381 /// The *id.AsString()* refers to the name of the currently processed file.
382 /// The idea is to populate the event entries in the *fSampleNameToEventEntries* map
383 /// by selecting the greater of the two values:
384 /// *id.EntryRange().second* which is the upper event entry range of the processed sample
385 /// and the current value of the event entries in the *fSampleNameToEventEntries* map.
386 /// In the single threaded case, the two numbers are the same as the entry range corresponds
387 /// to the number of events in an individual file (each sample is simply a single file).
388 /// In the multithreaded case, the idea is to accumulate the higher event entry value until
389 /// the total number of events in a given file is reached.
390 void registerNewSample(unsigned int /*slot*/, const ROOT::RDF::RSampleInfo &id)
391 {
392 std::lock_guard<std::mutex> lock(fSampleNameToEventEntriesMutex);
393 fSampleNameToEventEntries[id.AsString()] =
394 std::max(id.EntryRange().second, fSampleNameToEventEntries[id.AsString()]);
395 }
396
397 /// Thread-safe callback for RDataFrame.
398 /// It will record elapsed times and event statistics, and print a progress bar every n seconds (set by the
399 /// fPrintInterval). \param slot Ignored. \param value Ignored.
400 template <typename T>
401 void operator()(unsigned int /*slot*/, T &value)
402 {
404 }
405 // clang-format off
406 /// Thread-safe callback for RDataFrame.
407 /// It will record elapsed times and event statistics, and print a progress bar every n seconds (set by the fPrintInterval).
408 /// \param value Ignored.
409 // clang-format on
410 template <typename T>
411 void operator()(T & /*value*/)
412 {
413 using namespace std::chrono;
414 // ***************************************************
415 // Warning: Here, everything needs to be thread safe:
416 // ***************************************************
418
419 // We only print every n seconds.
420 if (duration_cast<seconds>(system_clock::now() - fLastPrintTime) < fPrintInterval) {
421 return;
422 }
423
424 // ***************************************************
425 // Protected by lock from here:
426 // ***************************************************
427 if (!fPrintMutex.try_lock())
428 return;
429 std::lock_guard<std::mutex> lockGuard(fPrintMutex, std::adopt_lock);
430
431 std::size_t eventCount;
432 seconds elapsedSeconds;
434
435 if (fIsTTY)
436 std::cout << "\r";
437
438 PrintProgressBar(std::cout, eventCount);
440
441 if (fIsTTY)
442 std::cout << std::flush;
443 else
444 std::cout << std::endl;
445 }
446
447 std::size_t ComputeNEventsSoFar() const
448 {
449 std::unique_lock<std::mutex> lock(fSampleNameToEventEntriesMutex);
450 std::size_t result = 0;
451 for (const auto &item : fSampleNameToEventEntries)
452 result += item.second;
453 return result;
454 }
455
456 unsigned int ComputeCurrentFileIdx() const
457 {
458 std::unique_lock<std::mutex> lock(fSampleNameToEventEntriesMutex);
459 return fSampleNameToEventEntries.size();
460 }
461};
462} // namespace Experimental
463} // namespace RDF
464} // namespace ROOT
465#endif
#define f(i)
Definition RSha256.hxx:104
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
#define N
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
TRObject operator()(const T1 &t1) const
std::pair< std::size_t, std::chrono::seconds > RecordEvtCountAndTime()
Record current event counts and time stamp, populate evts/s statistics array.
void registerNewSample(unsigned int, const ROOT::RDF::RSampleInfo &id)
Register a new sample for completion statistics.
void PrintProgressBar(std::ostream &stream, std::size_t currentEventCount) const
Print a progress bar of width ProgressHelper::fBarWidth if fGetNEventsOfCurrentFile is known.
void operator()(unsigned int, T &value)
Thread-safe callback for RDataFrame.
void PrintStats(std::ostream &stream, std::size_t currentEventCount, std::chrono::seconds totalElapsedSeconds) const
Print event and time statistics.
std::map< std::string, ULong64_t > fSampleNameToEventEntries
std::array< double, 20 > fEventsPerSecondStatistics
double EvtPerSec() const
Compute a running mean of events/s.
std::atomic< std::size_t > fProcessedEvents
ProgressHelper(std::size_t increment, unsigned int totalFiles=1, unsigned int progressBarWidth=40, unsigned int printInterval=1, bool useColors=true)
Create a progress helper.
std::chrono::time_point< std::chrono::system_clock > fLastPrintTime
std::chrono::time_point< std::chrono::system_clock > fBeginTime
void PrintStatsFinal(std::ostream &stream, std::chrono::seconds totalElapsedSeconds) const
void operator()(T &)
Thread-safe callback for RDataFrame.
The public interface to the RDataFrame federation of classes.
Smart pointer for the return type of actions.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree ,...
const_iterator begin() const
const_iterator end() const
#define F(x, y, z)
std::function< bool(ArgTypes...)> NotHelper(ROOT::TypeTraits::TypeList< ArgTypes... >, F &&f)
auto PassAsVec(F &&f) -> PassAsVecHelper< std::make_index_sequence< N >, T, F >
void ThreadsPerTH3(unsigned int nThread=1)
Set the number of threads sharing one TH3 in RDataFrame.
RResultMap< T > VariationsFor(RResultPtr< T > resPtr)
Produce all required systematic variations for the given result.
void AddProgressBar(ROOT::RDF::RNode df)
Add ProgressBar to a ROOT::RDF::RNode.
auto Not(F &&f) -> decltype(RDFInternal::NotHelper(Args(), std::forward< F >(f)))
Given a callable with signature bool(T1, T2, ...) return a callable with same signature that returns ...
std::string SaveGraph(NodeType node)
Create a graphviz representation of the dataframe computation graph, return it as a string.
RNode AsRNode(NodeType node)
Cast a RDataFrame node to the common type ROOT::RDF::RNode.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Lightweight storage for a collection of types.