Logo ROOT  
Reference Guide
 
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Loading...
Searching...
No Matches
RDFHelpers.cxx
Go to the documentation of this file.
1// Author: Stefan Wunsch, Enrico Guiraud CERN 09/2020
2
3/*************************************************************************
4 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
5 * All rights reserved. *
6 * *
7 * For the licensing terms see $ROOTSYS/LICENSE. *
8 * For the list of contributors see $ROOTSYS/README/CREDITS. *
9 *************************************************************************/
10
11#include "ROOT/RDFHelpers.hxx"
12#include "TROOT.h" // IsImplicitMTEnabled
13#include "TError.h" // Warning
14#include "TStopwatch.h"
15#include "RConfigure.h" // R__USE_IMT
16#include "ROOT/RLogger.hxx"
17#include "ROOT/RDF/RLoopManager.hxx" // for RLoopManager
18#include "ROOT/RDF/Utils.hxx"
19#include "ROOT/RResultHandle.hxx" // for RResultHandle, RunGraphs
20#ifdef R__USE_IMT
22#endif // R__USE_IMT
23
24#include <algorithm>
25#include <iostream>
26#include <set>
27#include <cstdio>
28
29// TODO, this function should be part of core libraries
30#include <numeric>
31#if (!defined(_WIN32)) && (!defined(_WIN64))
32#include <unistd.h>
33#endif
34
35#if defined(_WIN32) || defined(_WIN64)
36#define WIN32_LEAN_AND_MEAN
37#define VC_EXTRALEAN
38#include <io.h>
39#include <Windows.h>
40#else
41#include <sys/ioctl.h>
42#endif
43
44// Get terminal size for progress bar
46{
47#if defined(_WIN32) || defined(_WIN64)
48 if (!_isatty(_fileno(stdout)))
49 return 0;
50 int width = 0;
53 width = (int)(csbi.srWindow.Right - csbi.srWindow.Left + 1);
54 return width;
55#else
56 int width = 0;
57 struct winsize w;
59 width = (int)(w.ws_col);
60 return width;
61#endif
62}
63
65
/// Trigger the event loops behind the given result handles, running each distinct loop manager once.
///
/// \param[in] handles Result handles; several of them may refer to the same loop manager.
/// \return 0 if `handles` is empty or no handle is still pending, otherwise the number of
///         distinct loop managers found among `handles`.
// NOTE(review): this listing lost some source lines (embedded numbers 89, 92-94, 99, 104, 117-118
// and 126 are absent), including the declaration of `sw`, the condition that opens the `else`
// branches and the heads of the two `<<` logging statements. Check the upstream file before
// changing any code here.
66unsigned int ROOT::RDF::RunGraphs(std::vector<RResultHandle> handles)
67{
68 if (handles.empty()) {
69 Warning("RunGraphs", "Got an empty list of handles, now quitting.");
70 return 0u;
71 }
72
73 // Check that there are results which have not yet been run
74 const unsigned int nToRun =
75 std::count_if(handles.begin(), handles.end(), [](const auto &h) { return !h.IsReady(); });
// Some handles are already ready: warn, but keep going as long as at least one is still pending.
76 if (nToRun < handles.size()) {
77 Warning("RunGraphs", "Got %zu handles from which %zu link to results which are already ready.", handles.size(),
78 handles.size() - nToRun);
79 }
80 if (nToRun == 0u)
81 return 0u;
82
83 // Find the unique event loops
// The comparator orders handles by loop-manager pointer, so the set keeps a single handle per
// distinct fLoopManager.
// NOTE(review): the set is filled from every handle, including those whose result is already ready.
84 auto sameGraph = [](const RResultHandle &a, const RResultHandle &b) { return a.fLoopManager < b.fLoopManager; };
85 std::set<RResultHandle, decltype(sameGraph)> s(handles.begin(), handles.end(), sameGraph);
86 std::vector<RResultHandle> uniqueLoops(s.begin(), s.end());
87
88 // Trigger jitting. One call is enough to jit the code required by all computation graphs.
// NOTE(review): fLoopManager is dereferenced below without the null check that `run` performs
// further down.
90 sw.Start();
91 {
95 // a very high verbosity was requested, let's not silence anything
96 uniqueLoops[0].fLoopManager->Jit();
97 } else {
98 // silence logs from RLoopManager::Jit: RunGraphs does its own logging
100 uniqueLoops[0].fLoopManager->Jit();
101 }
102 }
103 sw.Stop();
// Report the jitting time; anything at or below 1e-3 s is reported as "less than 1ms".
105 << "Just-in-time compilation phase for RunGraphs (" << uniqueLoops.size()
106 << " unique computation graphs) completed"
107 << (sw.RealTime() > 1e-3 ? " in " + std::to_string(sw.RealTime()) + " seconds." : " in less than 1ms.");
108
109 // Trigger the unique event loops
// Handles without a loop manager are skipped; jit=false because jitting was already triggered above.
110 auto run = [](RResultHandle &h) {
111 if (h.fLoopManager)
112 h.fLoopManager->Run(/*jit=*/false);
113 };
114
115 sw.Start();
// Only the sequential fallback is visible here; the R__USE_IMT branch lines are missing from
// this listing.
116#ifdef R__USE_IMT
119 } else {
120#endif
121 std::for_each(uniqueLoops.begin(), uniqueLoops.end(), run);
122#ifdef R__USE_IMT
123 }
124#endif
125 sw.Stop();
127 << "Finished RunGraphs run (" << uniqueLoops.size() << " unique computation graphs, " << sw.CpuTime() << "s CPU, "
128 << sw.RealTime() << "s elapsed).";
129
// size() is narrowed implicitly to the unsigned int return type.
130 return uniqueLoops.size();
131}
132
134{
135 throw std::logic_error("Varying a Snapshot result is not implemented yet.");
136}
137
138namespace ROOT {
139namespace RDF {
140
141namespace Experimental {
142
144 unsigned int printInterval, bool useColors)
145 : fPrintInterval(printInterval),
146 fIncrement{increment},
147 fBarWidth{progressBarWidth = int(get_tty_size() / 4)},
148 fTotalFiles{totalFiles},
150 fIsTTY{_isatty(_fileno(stdout)) != 0},
151 fUseShellColours{false && useColors}
152#else
153 fIsTTY{isatty(fileno(stdout)) == 1},
154 fUseShellColours{useColors && fIsTTY} // Control characters only with terminals.
155#endif
156{
157}
158
159/// Compute a running mean of events/s.
170
171/// Record current event counts and time stamp, populate evts/s statistics array.
190
namespace {

/// Scope guard for the formatting state of an output stream.
///
/// On construction it records the stream's format flags, fill character and floating-point
/// precision; on destruction it writes them back. Manipulators applied while the guard is alive
/// (`std::scientific`, `std::setfill`, `std::setprecision`, ...) therefore do not leak into the
/// caller's stream. The field width is not saved because it is reset by every formatted insertion.
struct RestoreStreamState {
   /// \param[in] stream Stream whose state is captured; it must outlive the guard.
   explicit RestoreStreamState(std::ostream &stream)
      : fStream(stream), fFlags(stream.flags()), fFillChar(stream.fill()), fPrecision(stream.precision())
   {
   }
   ~RestoreStreamState()
   {
      fStream.flags(fFlags);
      fStream.fill(fFillChar);
      // flags() does not cover precision, so it has to be restored separately.
      fStream.precision(fPrecision);
   }

   // A copy would restore the same snapshot a second time; the guard is meant to be a plain local.
   RestoreStreamState(const RestoreStreamState &) = delete;
   RestoreStreamState &operator=(const RestoreStreamState &) = delete;

   std::ostream &fStream;
   std::ios_base::fmtflags fFlags;
   std::ostream::char_type fFillChar;
   std::streamsize fPrecision;
};

/// Format std::chrono::seconds as `1:30m`, or as `1:02:05h` once a full hour has elapsed.
///
/// Minutes (when hours are shown) and seconds are zero-padded to two digits; the leading field
/// is printed unpadded. The caller's stream formatting is left untouched.
///
/// \param[in] stream Stream to write to.
/// \param[in] elapsedSeconds Duration to print.
/// \return `stream`, to allow chaining.
std::ostream &operator<<(std::ostream &stream, std::chrono::seconds elapsedSeconds)
{
   RestoreStreamState restore(stream);
   const auto hours = std::chrono::duration_cast<std::chrono::hours>(elapsedSeconds);
   const auto minutes = std::chrono::duration_cast<std::chrono::minutes>(elapsedSeconds - hours);
   const auto seconds = (elapsedSeconds - hours - minutes).count();
   const bool showHours = hours.count() > 0;

   // The manipulators trailing the hours apply to the minutes field printed right after.
   if (showHours)
      stream << hours.count() << ':' << std::setw(2) << std::right << std::setfill('0');
   stream << minutes.count() << ':' << std::setw(2) << std::right << std::setfill('0') << seconds;
   return stream << (showHours ? 'h' : 'm');
}

} // namespace
221
222/// Print event and time statistics.
/// \param[in] stream Stream the statistics are written to.
/// \param[in] currentEventCount Number of events processed so far, printed after "processed evts: ".
/// \param[in] elapsedSeconds Elapsed time, printed after "Elapsed time: ".
// NOTE(review): this listing lost some source lines (embedded numbers 228, 229, 232, 236, 241,
// 250, 258 and 264 are absent). The definitions of `currentFileIdx` and `GetNEventsOfCurrentFile`
// and whatever preceded each escape-sequence line are therefore not visible; the escape
// sequences may have been conditional. Check the upstream file before changing code here.
223void ProgressHelper::PrintStats(std::ostream &stream, std::size_t currentEventCount,
224 std::chrono::seconds elapsedSeconds) const
225{
// Guard object for the stream's formatting state while std::scientific / std::setprecision are in use.
226 RestoreStreamState restore(stream);
227 auto evtpersec = EvtPerSec();
230 auto totalFiles = fTotalFiles;
231
// "\033[35m" selects magenta foreground, "\033[32m" green, "\033[0m" resets all attributes (ANSI SGR).
233 stream << "\033[35m";
234 stream << "["
235 << "Elapsed time: " << elapsedSeconds << " ";
237 stream << "\033[0m";
238 stream << "processing file: " << currentFileIdx << " / " << totalFiles << " ";
239
240 // Event counts:
242 stream << "\033[32m";
243
244 stream << "processed evts: " << currentEventCount;
// The " / <total>" part is only printed when the total is non-zero.
245 if (GetNEventsOfCurrentFile != 0) {
246 stream << " / " << std::scientific << std::setprecision(2) << GetNEventsOfCurrentFile;
247 }
248 stream << " ";
249
251 stream << "\033[0m";
252
253 // events/s
254 stream << std::scientific << std::setprecision(2) << evtpersec << " evt/s";
255
256 // Time statistics:
257 if (GetNEventsOfCurrentFile != 0) {
259 stream << "\033[35m";
// Remaining time estimate = events still to process / current rate.
// NOTE(review): no guard against evtpersec == 0 is visible here; the quotient would then not be
// representable as long long — confirm against the upstream file.
260 std::chrono::seconds remainingSeconds(
261 static_cast<long long>((ComputeNEventsSoFar() - currentEventCount) / evtpersec));
262 stream << " " << remainingSeconds << " "
263 << " remaining time (per file being processed)";
265 stream << "\033[0m";
266 }
267
268 stream << "] ";
269}
270
/// Print the closing summary line: total elapsed time, processed files and processed events.
/// \param[in] stream Stream the summary is written to.
/// \param[in] elapsedSeconds Total elapsed time, printed after "Total elapsed time: ".
// NOTE(review): this listing lost some source lines (embedded numbers 274, 277, 281, 286 and 294
// are absent). The definition of `totalEvents` and whatever preceded each escape-sequence line
// are therefore not visible; the escape sequences may have been conditional.
271void ProgressHelper::PrintStatsFinal(std::ostream &stream, std::chrono::seconds elapsedSeconds) const
272{
// Guard object for the stream's formatting state while std::scientific / std::setprecision are in use.
273 RestoreStreamState restore(stream);
275 auto totalFiles = fTotalFiles;
276
// "\033[35m" selects magenta foreground, "\033[32m" green, "\033[0m" resets all attributes (ANSI SGR).
278 stream << "\033[35m";
279 stream << "["
280 << "Total elapsed time: " << elapsedSeconds << " ";
282 stream << "\033[0m";
// The same value is printed on both sides of the slash.
283 stream << "processed files: " << totalFiles << " / " << totalFiles << " ";
284
285 // Event counts:
287 stream << "\033[32m";
288
// The total is printed once with the stream's current formatting and, when non-zero, a second
// time in scientific notation with two digits of precision.
289 stream << "processed evts: " << totalEvents;
290 if (totalEvents != 0) {
291 stream << " / " << std::scientific << std::setprecision(2) << totalEvents;
292 }
293
295 stream << "\033[0m";
296
297 stream << "] ";
298}
299
300/// Print a progress bar of width `ProgressHelper::fBarWidth` if `fGetNEventsOfCurrentFile` is known.
/// \param[in] stream Stream the bar is written to.
/// \param[in] currentEventCount Number of events processed so far.
// NOTE(review): this listing lost some source lines (embedded numbers 303, 304, 309, 315 and 318
// are absent). The condition guarding the early `return`, the definition of `completion` and
// whatever preceded each escape-sequence line are therefore not visible.
301void ProgressHelper::PrintProgressBar(std::ostream &stream, std::size_t currentEventCount) const
302{
305 return;
306
307 RestoreStreamState restore(stream);
308
// Number of filled cells: completion is capped at 1, so nBar never exceeds fBarWidth.
310 unsigned int nBar = std::min(completion, 1.) * fBarWidth;
311
// At least one character is always drawn, which also keeps bars.back() valid.
312 std::string bars(std::max(nBar, 1u), '=');
// The last character is '>' while the bar is still growing and '=' once it is full.
313 bars.back() = (nBar == fBarWidth) ? '=' : '>';
314
// "\033[33m" selects yellow foreground, "\033[0m" resets all attributes (ANSI SGR).
316 stream << "\033[33m";
// Left-align the bar and pad with spaces to fBarWidth, so the closing '|' stays in a fixed column.
317 stream << '|' << std::setfill(' ') << std::setw(fBarWidth) << std::left << bars << "| ";
319 stream << "\033[0m";
320}
321//*/
322
323class ProgressBarAction final : public ROOT::Detail::RDF::RActionImpl<ProgressBarAction> {
324public:
325 using Result_t = int;
326
327private:
328 std::shared_ptr<ProgressHelper> fHelper;
329 std::shared_ptr<int> fDummyResult = std::make_shared<int>();
330
331public:
332 ProgressBarAction(std::shared_ptr<ProgressHelper> r) : fHelper(std::move(r)) {}
333
334 std::shared_ptr<Result_t> GetResultPtr() const { return fDummyResult; }
335
336 void Initialize() {}
337 void InitTask(TTreeReader *, unsigned int) {}
338
339 void Exec(unsigned int) {}
340
341 void Finalize()
342 {
343 std::mutex fPrintMutex;
344 if (!fPrintMutex.try_lock())
345 return;
346 std::lock_guard<std::mutex> lockGuard(fPrintMutex, std::adopt_lock);
347 const auto &[eventCount, elapsedSeconds] = fHelper->RecordEvtCountAndTime();
348
349 // The next line resets the current line output in the terminal.
350 // Brings the cursor at the beginning ('\r'), prints whitespace with the
351 // same length as the terminal size, then resets the cursor again so we
352 // can print the final stats on a clean line.
353 std::cout << '\r' << std::string(get_tty_size(), ' ') << '\r';
354 fHelper->PrintStatsFinal(std::cout, elapsedSeconds);
355 std::cout << '\n';
356 }
357
358 std::string GetActionName() { return "ProgressBar"; }
359 // dummy implementation of PartialUpdate
360 int &PartialUpdate(unsigned int) { return *fDummyResult; }
361
363 {
364 return [this](unsigned int slot, const ROOT::RDF::RSampleInfo &id) {
365 this->fHelper->registerNewSample(slot, id);
366 return this->fHelper->ComputeNEventsSoFar();
367 };
368 }
369};
370
372{
373 auto total_files = node.GetNFiles();
374 auto progress = std::make_shared<ProgressHelper>(1000, total_files);
375 ProgressBarAction c(progress);
376 auto r = node.Book<>(c);
377 r.OnPartialResultSlot(1000, [progress](unsigned int slot, auto &&arg) { (*progress)(slot, arg); });
378}
379
381{
382 auto node = ROOT::RDF::AsRNode(dataframe);
384}
385} // namespace Experimental
386} // namespace RDF
387} // namespace ROOT
int get_tty_size()
std::ostream & fStream
std::ostream::char_type fFillChar
std::ios_base::fmtflags fFlags
#define R__LOG_INFO(...)
Definition RLogger.hxx:359
#define b(i)
Definition RSha256.hxx:100
#define c(i)
Definition RSha256.hxx:101
#define a(i)
Definition RSha256.hxx:99
#define h(i)
Definition RSha256.hxx:106
#define e(i)
Definition RSha256.hxx:103
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:229
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize id
Option_t Option_t width
TCanvas * bars()
Definition bars.C:1
Base class for action helpers, see RInterface::Book() for more information.
ROOT::RDF::SampleCallback_t GetSampleCallback() final
Override this method to register a callback that is executed before processing a new data sample ...
std::shared_ptr< ProgressHelper > fHelper
ProgressBarAction(std::shared_ptr< ProgressHelper > r)
void InitTask(TTreeReader *, unsigned int)
std::shared_ptr< Result_t > GetResultPtr() const
std::pair< std::size_t, std::chrono::seconds > RecordEvtCountAndTime()
Record current event counts and time stamp, populate evts/s statistics array.
void PrintProgressBar(std::ostream &stream, std::size_t currentEventCount) const
Print a progress bar of width ProgressHelper::fBarWidth if fGetNEventsOfCurrentFile is known.
void PrintStats(std::ostream &stream, std::size_t currentEventCount, std::chrono::seconds totalElapsedSeconds) const
Print event and time statistics.
std::array< double, 20 > fEventsPerSecondStatistics
double EvtPerSec() const
Compute a running mean of events/s.
std::atomic< std::size_t > fProcessedEvents
ProgressHelper(std::size_t increment, unsigned int totalFiles=1, unsigned int progressBarWidth=40, unsigned int printInterval=1, bool useColors=true)
Create a progress helper.
std::chrono::time_point< std::chrono::system_clock > fLastPrintTime
std::chrono::time_point< std::chrono::system_clock > fBeginTime
void PrintStatsFinal(std::ostream &stream, std::chrono::seconds totalElapsedSeconds) const
The public interface to the RDataFrame federation of classes.
RResultPtr< typename std::decay_t< Helper >::Result_t > Book(Helper &&helper, const ColumnNames_t &columns={})
Book execution of a custom action using a user-defined helper object.
A type-erased version of RResultPtr and RResultMap.
Smart pointer for the return type of actions.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree ,...
ELogLevel GetEffectiveVerbosity(const RLogManager &mgr) const
Definition RLogger.hxx:310
static RLogManager & Get()
Definition RLogger.cxx:60
Change the verbosity level (global or specific to the RLogChannel passed to the constructor) for the ...
Definition RLogger.hxx:240
const_iterator begin() const
const_iterator end() const
This class provides a simple interface to execute the same task multiple times in parallel threads,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
Stopwatch class.
Definition TStopwatch.h:28
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:46
ROOT::RLogChannel & RDFLogChannel()
Definition RDFUtils.cxx:37
RLogChannel & GetChannelOrManager()
Definition RLogger.hxx:299
RResultMap< T > VariationsFor(RResultPtr< T > resPtr)
Produce all required systematic variations for the given result.
void AddProgressBar(ROOT::RDF::RNode df)
Add ProgressBar to a ROOT::RDF::RNode.
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
unsigned int RunGraphs(std::vector< RResultHandle > handles)
Trigger the event loop of multiple RDataFrames concurrently.
std::ostream & operator<<(std::ostream &os, const RDFDescription &description)
RNode AsRNode(NodeType node)
Cast a RDataFrame node to the common type ROOT::RDF::RNode.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:570
@ kDebug
Debug information; only useful for developers; can have added verbosity up to 255-kDebug.
@ kError
An error.
TMarker m
Definition textangle.C:8