Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RDFHelpers.hxx
Go to the documentation of this file.
1// Author: Enrico Guiraud, Danilo Piparo CERN 02/2018
2
3/*************************************************************************
4 * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. *
5 * All rights reserved. *
6 * *
7 * For the licensing terms see $ROOTSYS/LICENSE. *
8 * For the list of contributors see $ROOTSYS/README/CREDITS. *
9 *************************************************************************/
10
11// This header contains helper free functions that slim down RDataFrame's programming model
12
13#ifndef ROOT_RDF_HELPERS
14#define ROOT_RDF_HELPERS
15
19#include <ROOT/RResultHandle.hxx> // users of RunGraphs might rely on this transitive include
20#include <ROOT/TypeTraits.hxx>
21
22#include <array>
23#include <chrono>
24#include <fstream>
25#include <functional>
26#include <map>
27#include <memory>
28#include <mutex>
29#include <type_traits>
30#include <utility> // std::index_sequence
31#include <vector>
32
33namespace ROOT {
34namespace Internal {
35namespace RDF {
36template <typename... ArgTypes, typename F>
38{
39 return std::function<bool(ArgTypes...)>([=](ArgTypes... args) mutable { return !f(args...); });
40}
41
42template <typename... ArgTypes, typename Ret, typename... Args>
44{
45 return std::function<bool(ArgTypes...)>([=](ArgTypes... args) mutable { return !f(args...); });
46}
47
48template <typename I, typename T, typename F>
50
51template <std::size_t... N, typename T, typename F>
52class PassAsVecHelper<std::index_sequence<N...>, T, F> {
53 template <std::size_t Idx>
54 using AlwaysT = T;
55 std::decay_t<F> fFunc;
56
57public:
58 PassAsVecHelper(F &&f) : fFunc(std::forward<F>(f)) {}
59 auto operator()(AlwaysT<N>... args) -> decltype(fFunc({args...})) { return fFunc({args...}); }
60};
61
62template <std::size_t N, typename T, typename F>
64{
65 return PassAsVecHelper<std::make_index_sequence<N>, T, F>(std::forward<F>(f));
66}
67
68} // namespace RDF
69} // namespace Internal
70
71namespace RDF {
73
74// clang-format off
75/// Given a callable with signature bool(T1, T2, ...) return a callable with same signature that returns the negated result
76///
77/// The callable must have one single non-template definition of operator(). This is a limitation with respect to
78/// std::not_fn, required for interoperability with RDataFrame.
79// clang-format on
80template <typename F,
81 typename Args = typename ROOT::TypeTraits::CallableTraits<std::decay_t<F>>::arg_types_nodecay,
82 typename Ret = typename ROOT::TypeTraits::CallableTraits<std::decay_t<F>>::ret_type>
83auto Not(F &&f) -> decltype(RDFInternal::NotHelper(Args(), std::forward<F>(f)))
84{
85 static_assert(std::is_same<Ret, bool>::value, "RDF::Not requires a callable that returns a bool.");
86 return RDFInternal::NotHelper(Args(), std::forward<F>(f));
87}
88
89// clang-format off
90/// PassAsVec is a callable generator that allows passing N variables of type T to a function as a single collection.
91///
92/// PassAsVec<N, T>(func) returns a callable that takes N arguments of type T, passes them down to function `func` as
93/// an initializer list `{t1, t2, t3,..., tN}` and returns whatever f({t1, t2, t3, ..., tN}) returns.
94///
95/// Note that for this to work with RDataFrame the type of all columns that the callable is applied to must be exactly T.
96/// Example usage together with RDataFrame ("varX" columns must all be `float` variables):
97/// \code
98/// bool myVecFunc(std::vector<float> args);
99/// df.Filter(PassAsVec<3, float>(myVecFunc), {"var1", "var2", "var3"});
100/// \endcode
101// clang-format on
102template <std::size_t N, typename T, typename F>
104{
105 return RDFInternal::PassAsVecHelper<std::make_index_sequence<N>, T, F>(std::forward<F>(f));
106}
107
108// clang-format off
109/// Create a graphviz representation of the dataframe computation graph, return it as a string.
110/// \param[in] node any node of the graph. Called on the head (first) node, it prints the entire graph. Otherwise, only the branch the node belongs to.
111///
112/// The output can be displayed with a command akin to `dot -Tpng output.dot > output.png && open output.png`.
113///
114/// Note that "hanging" Defines, i.e. Defines without downstream nodes, will not be displayed by SaveGraph as they are
115/// effectively optimized away from the computation graph.
116///
117/// Note that SaveGraph is not thread-safe and must not be called concurrently from different threads.
118// clang-format on
119template <typename NodeType>
120std::string SaveGraph(NodeType node)
121{
123 return helper.RepresentGraph(node);
124}
125
126// clang-format off
127/// Create a graphviz representation of the dataframe computation graph, write it to the specified file.
128/// \param[in] node any node of the graph. Called on the head (first) node, it prints the entire graph. Otherwise, only the branch the node belongs to.
129/// \param[in] outputFile file where to save the representation.
130///
131/// The output can be displayed with a command akin to `dot -Tpng output.dot > output.png && open output.png`.
132///
133/// Note that "hanging" Defines, i.e. Defines without downstream nodes, will not be displayed by SaveGraph as they are
134/// effectively optimized away from the computation graph.
135///
136/// Note that SaveGraph is not thread-safe and must not be called concurrently from different threads.
137// clang-format on
138template <typename NodeType>
139void SaveGraph(NodeType node, const std::string &outputFile)
140{
142 std::string dotGraph = helper.RepresentGraph(node);
143
144 std::ofstream out(outputFile);
145 if (!out.is_open()) {
146 throw std::runtime_error("Could not open output file \"" + outputFile + "\"for reading");
147 }
148
149 out << dotGraph;
150 out.close();
151}
152
153// clang-format off
154/// Cast a RDataFrame node to the common type ROOT::RDF::RNode
155/// \param[in] node Any node of a RDataFrame graph
156// clang-format on
157template <typename NodeType>
159{
160 return node;
161}
162
163// clang-format off
164/// Run the event loops of multiple RDataFrames concurrently.
165/// \param[in] handles A vector of RResultHandles whose event loops should be run.
166/// \return The number of distinct computation graphs that have been processed.
167///
168/// This function triggers the event loop of all computation graphs which relate to the
169/// given RResultHandles. The advantage compared to running the event loop implicitly by accessing the
170/// RResultPtr is that the event loops will run concurrently. Therefore, the overall
171/// computation of all results can be scheduled more efficiently.
172/// It should be noted that user-defined operations (e.g., Filters and Defines) of the different RDataFrame graphs are assumed to be safe to call concurrently.
173/// RDataFrame will pass slot numbers in the range [0, NThread-1] to all helpers used in nodes such as DefineSlot. NThread is the number of threads ROOT was
174/// configured with in EnableImplicitMT().
175/// Slot numbers are unique across all graphs, so no two tasks with the same slot number will run concurrently. Note that it is not guaranteed that each slot
176/// number will be reached in every graph.
177///
178/// ~~~{.cpp}
179/// ROOT::RDataFrame df1("tree1", "file1.root");
180/// auto r1 = df1.Histo1D("var1");
181///
182/// ROOT::RDataFrame df2("tree2", "file2.root");
183/// auto r2 = df2.Sum("var2");
184///
185/// // RResultPtr -> RResultHandle conversion is automatic
186/// ROOT::RDF::RunGraphs({r1, r2});
187/// ~~~
188// clang-format on
189unsigned int RunGraphs(std::vector<RResultHandle> handles);
190
191namespace Experimental {
192
193/// \brief Produce all required systematic variations for the given result.
194/// \param[in] resPtr The result for which variations should be produced.
195/// \return A \ref ROOT::RDF::Experimental::RResultMap "RResultMap" object with full variation names as strings
196/// (e.g. "pt:down") and the corresponding varied results as values.
197///
198/// A given input RResultPtr<T> produces a corresponding RResultMap<T> with a "nominal"
199/// key that will return a value identical to the one contained in the original RResultPtr.
200/// Other keys correspond to the varied values of this result, one for each variation
201/// that the result depends on.
202/// VariationsFor does not trigger the event loop. The event loop is only triggered
203/// upon first access to a valid key, similarly to what happens with RResultPtr.
204///
205/// If the result does not depend, directly or indirectly, from any registered systematic variation, the
206/// returned RResultMap will contain only the "nominal" key.
207///
208/// See RDataFrame's \ref ROOT::RDF::RInterface::Vary() "Vary" method for more information and example usages.
209///
210/// \note Currently, producing variations for the results of \ref ROOT::RDF::RInterface::Display() "Display",
211/// \ref ROOT::RDF::RInterface::Report() "Report" and \ref ROOT::RDF::RInterface::Snapshot() "Snapshot"
212/// actions is not supported.
213//
214// An overview of how systematic variations work internally. Given N variations (including the nominal):
215//
216// RResultMap owns RVariedAction
217// N results N action helpers
218// N previous filters
219// N*#input_cols column readers
220//
221// ...and each RFilter and RDefine knows for what universe it needs to construct column readers ("nominal" by default).
222template <typename T>
224{
226 static_assert(!std::is_same_v<T, SnapshotResult_t>,
227 "Snapshot with variations can only be enabled via RSnapshotOptions.");
228
229 R__ASSERT(resPtr != nullptr && "Calling VariationsFor on an empty RResultPtr");
230
231 // populate parts of the computation graph for which we only have "empty shells", e.g. RJittedActions and
232 // RJittedFilters
233 resPtr.fLoopManager->Jit();
234
235 std::unique_ptr<RDFInternal::RActionBase> variedAction;
236 std::vector<std::shared_ptr<T>> variedResults;
237
238 std::shared_ptr<RDFInternal::RActionBase> nominalAction = resPtr.fActionPtr;
239 std::vector<std::string> variations = nominalAction->GetVariations();
240 const auto nVariations = variations.size();
241
242 if (nVariations > 0) {
243 // clone the result once for each variation
244 variedResults.reserve(nVariations);
245 for (auto i = 0u; i < nVariations; ++i){
246 // implicitly assuming that T is copiable: this should be the case
247 // for all result types in use, as they are copied for each slot
248 variedResults.emplace_back(new T{*resPtr.fObjPtr});
249
250 // Check if the result's type T inherits from TNamed
251 if constexpr (std::is_base_of<TNamed, T>::value) {
252 // Get the current variation name
253 std::string variationName = variations[i];
254 // Replace the colon with an underscore
255 std::replace(variationName.begin(), variationName.end(), ':', '_');
256 // Get a pointer to the corresponding varied result
257 auto &variedResult = variedResults.back();
258 // Set the varied result's name to NOMINALNAME_VARIATIONAME
259 variedResult->SetName((std::string(variedResult->GetName()) + "_" + variationName).c_str());
260 }
261 }
262
263 std::vector<void *> typeErasedResults;
264 typeErasedResults.reserve(variedResults.size());
265 for (auto &res : variedResults)
266 typeErasedResults.emplace_back(&res);
267
268 // Create the RVariedAction and inject it in the computation graph.
269 // This recursively creates all the required varied column readers and upstream nodes of the computation graph.
270 variedAction = nominalAction->MakeVariedAction(std::move(typeErasedResults));
271 }
272
273 return RDFInternal::MakeResultMap<T>(resPtr.fObjPtr, std::move(variedResults), std::move(variations),
274 *resPtr.fLoopManager, std::move(nominalAction), std::move(variedAction));
275}
276
277/// \brief Add ProgressBar to a ROOT::RDF::RNode
278/// \param[in] df RDataFrame node at which ProgressBar is called.
279///
280/// The ProgressBar can be added not only at the RDataFrame head node, but also at any any computational node,
281/// such as Filter or Define.
282/// ###Example usage:
283/// ~~~{.cpp}
284/// ROOT::RDataFrame df("tree", "file.root");
285/// auto df_1 = ROOT::RDF::RNode(df.Filter("x>1"));
286/// ROOT::RDF::Experimental::AddProgressBar(df_1);
287/// ~~~
289
290/// \brief Add ProgressBar to an RDataFrame
291/// \param[in] df RDataFrame for which ProgressBar is called.
292///
293/// This function adds a ProgressBar to display the event statistics in the terminal every
294/// \b m events and every \b n seconds, including elapsed time, currently processed file,
295/// currently processed events, the rate of event processing
296/// and an estimated remaining time (per file being processed).
297/// ProgressBar should be added after the dataframe object (df) is created first:
298/// ~~~{.cpp}
299/// ROOT::RDataFrame df("tree", "file.root");
300/// ROOT::RDF::Experimental::AddProgressBar(df);
301/// ~~~
302/// For more details see ROOT::RDF::Experimental::ProgressHelper Class.
304
305/// @brief Set the number of threads sharing one TH3 in RDataFrame.
306/// When RDF runs multi-threaded, each thread typically clones every histogram in the computation graph.
307/// If this consumes too much memory, N threads can share one clone.
308/// Higher values might slow down RDF because they lead to higher contention on the TH3Ds, but save memory.
309/// Lower values run faster with less contention at the cost of higher memory usage.
310/// @param nThread Number of threads that share a TH3D.
311void ThreadsPerTH3(unsigned int nThread = 1);
312
313/// RDF progress helper.
314/// This class provides callback functions to the RDataFrame. The event statistics
315/// (including elapsed time, currently processed file, currently processed events, the rate of event processing
316/// and an estimated remaining time (per file being processed))
317/// are recorded and printed in the terminal every m events and every n seconds.
318/// ProgressHelper::operator()(unsigned int, T&) is thread safe, and can be used as a callback in MT mode.
319/// ProgressBar should be added after creating the dataframe object (df):
320/// ~~~{.cpp}
321/// ROOT::RDataFrame df("tree", "file.root");
322/// ROOT::RDF::Experimental::AddProgressBar(df);
323/// ~~~
324/// alternatively RDataFrame can be cast to an RNode first giving it more flexibility.
325/// For example, it can be called at any computational node, such as Filter or Define, not only the head node,
326/// with no change to the ProgressBar function itself:
327/// ~~~{.cpp}
328/// ROOT::RDataFrame df("tree", "file.root");
329/// auto df_1 = ROOT::RDF::RNode(df.Filter("x>1"));
330/// ROOT::RDF::Experimental::AddProgressBar(df_1);
331/// ~~~
333private:
334 std::size_t ComputeTotalEvents() const;
335 double EvtPerSec() const;
336 void PrintProgressAndStats(std::ostream &stream, std::size_t currentEventCount,
337 std::chrono::seconds totalElapsedSeconds) const;
338 std::pair<std::size_t, std::chrono::seconds> RecordEvtCountAndTime();
339 void Update();
340
341 bool const fIsTTY;
343
344 std::atomic<std::size_t> fProcessedEvents{0};
345 std::size_t fLastProcessedEvents{0};
346 std::size_t const fIncrement;
347 unsigned int const fNColumns;
348 unsigned int const fTotalFiles;
349
350 std::array<double, 10> fEventsPerSecondStatistics;
352
353 std::chrono::time_point<std::chrono::system_clock> const fBeginTime = std::chrono::system_clock::now();
354 std::chrono::time_point<std::chrono::system_clock> fLastPrintTime = fBeginTime;
355 std::chrono::seconds const fPrintInterval;
356
357 // Mutex to ensure that only one thread updates the progress bar.
358 // Lock this mutex to update any of the members above:
359 std::mutex fUpdateMutex;
360
361 mutable std::mutex fSampleNameToEventEntriesMutex; // Mutex to protect access to the below map
362 std::map<std::string, ULong64_t> fSampleNameToEventEntries; // Filename, events in the file
363
364public:
365 /// Create a progress helper.
366 /// \param increment RDF callbacks are called every `n` events. Pass this `n` here.
367 /// \param totalFiles number of files read in the RDF.
368 /// \param printInterval Update stats every `n` seconds.
369 /// \param useColors Use shell colour codes to colour the output. Automatically disabled when
370 /// we are not writing to a tty.
371 ProgressHelper(std::size_t increment, unsigned int totalFiles, unsigned int printInterval = 0,
372 bool useColors = true);
373 ProgressHelper(ProgressHelper const &) = delete; // The mutexes and atomics won't allow copy/move
375 ~ProgressHelper() = default;
378
379 void RegisterNewSample(unsigned int /*slot*/, const ROOT::RDF::RSampleInfo &id);
380
381 /// Thread-safe callback for RDataFrame.
382 /// It will record elapsed times and event statistics, and print a progress bar every n seconds (set by the
383 /// fPrintInterval). \param slot Ignored. \param value Ignored.
384 template <typename T>
385 void operator()(unsigned int /*slot*/, T & /*value*/)
386 {
387 Update();
388 }
389 void PrintStatsFinal() const;
390};
391} // namespace Experimental
392} // namespace RDF
393} // namespace ROOT
394#endif
#define f(i)
Definition RSha256.hxx:104
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
#define N
TRObject operator()(const T1 &t1) const
std::size_t ComputeTotalEvents() const
Compute total events in all open files.
ProgressHelper(std::size_t increment, unsigned int totalFiles, unsigned int printInterval=0, bool useColors=true)
Create a progress helper.
std::pair< std::size_t, std::chrono::seconds > RecordEvtCountAndTime()
Record current event counts and time stamp, populate evts/s statistics array.
void RegisterNewSample(unsigned int, const ROOT::RDF::RSampleInfo &id)
Register a new sample for completion statistics.
std::chrono::time_point< std::chrono::system_clock > const fBeginTime
void Update()
Record number of events processed and update progress bar.
std::map< std::string, ULong64_t > fSampleNameToEventEntries
ProgressHelper(ProgressHelper const &)=delete
ProgressHelper & operator=(ProgressHelper &&)=delete
ProgressHelper(ProgressHelper &&)=delete
std::chrono::seconds const fPrintInterval
double EvtPerSec() const
Compute a running mean of events/s.
std::atomic< std::size_t > fProcessedEvents
std::array< double, 10 > fEventsPerSecondStatistics
std::chrono::time_point< std::chrono::system_clock > fLastPrintTime
void operator()(unsigned int, T &)
Thread-safe callback for RDataFrame.
void PrintProgressAndStats(std::ostream &stream, std::size_t currentEventCount, std::chrono::seconds totalElapsedSeconds) const
Print event and time statistics.
ProgressHelper & operator=(ProgressHelper const &)=delete
The public interface to the RDataFrame federation of classes.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree ,...
const_iterator begin() const
const_iterator end() const
#define F(x, y, z)
std::function< bool(ArgTypes...)> NotHelper(ROOT::TypeTraits::TypeList< ArgTypes... >, F &&f)
auto PassAsVec(F &&f) -> PassAsVecHelper< std::make_index_sequence< N >, T, F >
void ThreadsPerTH3(unsigned int nThread=1)
Set the number of threads sharing one TH3 in RDataFrame.
RResultMap< T > VariationsFor(RResultPtr< T > resPtr)
Produce all required systematic variations for the given result.
void AddProgressBar(ROOT::RDF::RNode df)
Add ProgressBar to a ROOT::RDF::RNode.
auto Not(F &&f) -> decltype(RDFInternal::NotHelper(Args(), std::forward< F >(f)))
Given a callable with signature bool(T1, T2, ...) return a callable with same signature that returns ...
std::string SaveGraph(NodeType node)
Create a graphviz representation of the dataframe computation graph, return it as a string.
RNode AsRNode(NodeType node)
Cast a RDataFrame node to the common type ROOT::RDF::RNode.
Lightweight storage for a collection of types.