Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RDFHelpers.cxx
Go to the documentation of this file.
1// Author: Stefan Wunsch, Enrico Guiraud CERN 09/2020
2
3/*************************************************************************
4 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
5 * All rights reserved. *
6 * *
7 * For the licensing terms see $ROOTSYS/LICENSE. *
8 * For the list of contributors see $ROOTSYS/README/CREDITS. *
9 *************************************************************************/
10
11#include "ROOT/RDFHelpers.hxx"
12
13#include "ROOT/RDF/RActionImpl.hxx" // for RActionImpl
14#include "ROOT/RDF/RFilterBase.hxx" // for RDFInternal
15#include "ROOT/RDF/RLoopManager.hxx" // for RLoopManager
16#include "ROOT/RDF/Utils.hxx"
17#include "ROOT/RResultHandle.hxx" // for RResultHandle, RunGraphs
18
19#include "TROOT.h" // IsImplicitMTEnabled
20#include "TError.h" // Warning
21#include "TStopwatch.h"
22#include "RConfigure.h" // R__USE_IMT
23#include "ROOT/RLogger.hxx"
24#include "ROOT/RSlotStack.hxx"
25#ifdef R__USE_IMT
27#endif // R__USE_IMT
28
29#include <algorithm>
30#include <cstdio>
31#include <iomanip>
32#include <iostream>
33#include <set>
34#include <sstream>
35
36#include <numeric>
37
38#if defined(_WIN32) || defined(_WIN64)
39#define WIN32_LEAN_AND_MEAN
40#define VC_EXTRALEAN
41#include <io.h>
42#include <Windows.h>
43#else
44#include <sys/ioctl.h>
45#include <unistd.h>
46#endif
47
48class TTreeReader;
49
50namespace {
51// Get terminal size for progress bar
52// TODO: Put this in core libraries?
53int get_tty_size()
54{
55#if defined(_WIN32) || defined(_WIN64)
56 if (!_isatty(_fileno(stdout)))
57 return 0;
58 int width = 0;
61 width = (int)(csbi.srWindow.Right - csbi.srWindow.Left + 1);
62 return width;
63#else
64 int width = 0;
65 struct winsize w;
67 width = (int)(w.ws_col);
68 return width;
69#endif
70}
71
72/// Restore an output stream to its previous state using RAII.
73struct RestoreStreamState {
74 RestoreStreamState(std::ostream &stream) : fStream(stream) { fPreservedState.copyfmt(stream); }
75 ~RestoreStreamState() { fStream.copyfmt(fPreservedState); }
76
77 std::ostream &fStream;
78 std::ios fPreservedState{nullptr};
79};
80
81/// Format std::chrono::seconds as `1:30m`.
82std::string prettyPrint(std::chrono::seconds const &elapsedSeconds)
83{
84 std::stringstream str;
85 auto h = std::chrono::duration_cast<std::chrono::hours>(elapsedSeconds);
86 auto m = std::chrono::duration_cast<std::chrono::minutes>(elapsedSeconds - h);
87 auto s = (elapsedSeconds - h - m).count();
88
89 if (h.count() > 0)
90 str << h.count() << ':' << std::setw(2) << std::right << std::setfill('0');
91 str << m.count() << ':' << std::setw(2) << std::right << std::setfill('0') << s;
92 str << (h.count() > 0 ? 'h' : 'm');
93
94 return str.str();
95}
96
97} // namespace
98
100
101unsigned int ROOT::RDF::RunGraphs(std::vector<RResultHandle> handles)
102{
103 if (handles.empty()) {
104 Warning("RunGraphs", "Got an empty list of handles, now quitting.");
105 return 0u;
106 }
107
108 // Check that there are results which have not yet been run
109 const unsigned int nToRun =
110 std::count_if(handles.begin(), handles.end(), [](const auto &h) { return !h.IsReady(); });
111 if (nToRun < handles.size()) {
112 Warning("RunGraphs", "Got %zu handles from which %zu link to results which are already ready.", handles.size(),
113 handles.size() - nToRun);
114 }
115 if (nToRun == 0u)
116 return 0u;
117
118 // Find the unique event loops
119 auto sameGraph = [](const RResultHandle &a, const RResultHandle &b) { return a.fLoopManager < b.fLoopManager; };
120 std::set<RResultHandle, decltype(sameGraph)> s(handles.begin(), handles.end(), sameGraph);
121 std::vector<RResultHandle> uniqueLoops(s.begin(), s.end());
122
123 // Trigger jitting. One call is enough to jit the code required by all computation graphs.
125 sw.Start();
126 {
130 // a very high verbosity was requested, let's not silence anything
131 uniqueLoops[0].fLoopManager->Jit();
132 } else {
133 // silence logs from RLoopManager::Jit: RunGraphs does its own logging
135 uniqueLoops[0].fLoopManager->Jit();
136 }
137 }
138 sw.Stop();
140 << "Just-in-time compilation phase for RunGraphs (" << uniqueLoops.size()
141 << " unique computation graphs) completed"
142 << (sw.RealTime() > 1e-3 ? " in " + std::to_string(sw.RealTime()) + " seconds." : " in less than 1ms.");
143
144 // Trigger the unique event loops
145 auto slotStack = std::make_shared<ROOT::Internal::RSlotStack>(ROOT::GetThreadPoolSize());
146 auto run = [&slotStack](RResultHandle &h) {
147 if (h.fLoopManager) {
148 h.fLoopManager->SetSlotStack(slotStack);
149 h.fLoopManager->Run(/*jit=*/false);
150 }
151 };
152
153 sw.Start();
154#ifdef R__USE_IMT
157 } else {
158#endif
159 std::for_each(uniqueLoops.begin(), uniqueLoops.end(), run);
160#ifdef R__USE_IMT
161 }
162#endif
163 sw.Stop();
165 << "Finished RunGraphs run (" << uniqueLoops.size() << " unique computation graphs, " << sw.CpuTime() << "s CPU, "
166 << sw.RealTime() << "s elapsed).";
167
168 return uniqueLoops.size();
169}
170
171namespace ROOT::RDF::Experimental {
172
173void ThreadsPerTH3(unsigned int N)
174{
176}
177
178ProgressHelper::ProgressHelper(std::size_t increment, unsigned int totalFiles, unsigned int printInterval,
179 bool useColors)
180 :
182 fIsTTY{_isatty(_fileno(stdout)) != 0},
183 fUseShellColours{false && useColors},
184#else
185 fIsTTY{isatty(fileno(stdout)) == 1},
186 fUseShellColours{useColors && fIsTTY}, // Control characters only with terminals.
187#endif
188 fIncrement{increment},
189 fNColumns(fIsTTY ? (get_tty_size() == 0 ? 60 : get_tty_size()) : 50),
190 fTotalFiles{totalFiles},
191 fPrintInterval{printInterval == 0 ? (fIsTTY ? 1 : 10) : printInterval}
192{
193 std::fill(fEventsPerSecondStatistics.begin(), fEventsPerSecondStatistics.end(), -1.);
194}
195
196/// Register a new sample for completion statistics.
197/// \see ROOT::RDF::RInterface::DefinePerSample().
198/// The *id.AsString()* refers to the name of the currently processed file.
199/// The idea is to populate the event entries in the *fSampleNameToEventEntries* map
200/// by selecting the greater of the two values:
201/// *id.EntryRange().second* which is the upper event entry range of the processed sample
202/// and the current value of the event entries in the *fSampleNameToEventEntries* map.
203/// In the single threaded case, the two numbers are the same as the entry range corresponds
204/// to the number of events in an individual file (each sample is simply a single file).
205/// In the multithreaded case, the idea is to accumulate the higher event entry value until
206/// the total number of events in a given file is reached.
207void ProgressHelper::RegisterNewSample(unsigned int /*slot*/, const ROOT::RDF::RSampleInfo &id)
208{
209 std::scoped_lock lock(fSampleNameToEventEntriesMutex);
210 fSampleNameToEventEntries[id.AsString()] = std::max(id.NEntriesTotal(), fSampleNameToEventEntries[id.AsString()]);
211}
212
213/// Compute a running mean of events/s.
215{
216 double sum = 0;
217 unsigned int n = 0;
218 for (auto item : fEventsPerSecondStatistics) {
219 if (item >= 0.) {
220 sum += item;
221 ++n;
222 }
223 }
224
225 return n > 0 ? sum / n : 0.;
226}
227
228/// Compute total events in all open files.
230{
231 std::scoped_lock lock(fSampleNameToEventEntriesMutex);
232 std::size_t result = 0;
233 for (const auto &item : fSampleNameToEventEntries)
234 result += item.second;
235 return result;
236}
237
238/// Record current event counts and time stamp, populate evts/s statistics array.
239/// The function assumes that a lock on the update mutex is held.
259
260/// Print event and time statistics.
261void ProgressHelper::PrintProgressAndStats(std::ostream &stream, std::size_t currentEventCount,
262 std::chrono::seconds elapsedSeconds) const
263{
264 std::ostringstream buffer;
265 auto evtpersec = EvtPerSec();
267 std::size_t currentFileIdx;
268 {
269 std::scoped_lock lock(fSampleNameToEventEntriesMutex);
271 }
272
273 if (totalEventsInOpenFiles == 0)
274 return;
275
276 double completion = 0.;
280 } else {
282 }
283
284 // Print the bar
285 {
286 const auto barWidth = fNColumns / 4;
287 unsigned int nBar = std::min(completion, 1.) * barWidth;
288 std::string bars(std::max(nBar, 1u), '=');
289 bars.back() = (nBar == barWidth) ? '=' : '>';
290
292 buffer << "\033[33m";
293 buffer << '|' << std::setfill(' ') << std::setw(barWidth) << std::left << bars << "| ";
295 buffer << "\033[0m";
296 }
297
298 // Elapsed time
299 buffer << "[";
301 buffer << "\033[35m";
302 buffer << "Elapsed: " << prettyPrint(elapsedSeconds) << " ";
304 buffer << "\033[0m";
305 buffer << "files: " << currentFileIdx << " / " << fTotalFiles << " ";
306
307 // Event counts:
309 buffer << "\033[32m";
310
311 buffer << "events: " << currentEventCount;
312 if (totalEventsInOpenFiles != 0) {
313 buffer << " / " << std::scientific << std::setprecision(2);
315 buffer << totalEventsInOpenFiles;
316 else
317 buffer << "(" << totalEventsInOpenFiles << " + x)";
318 }
319 buffer << " ";
320
322 buffer << "\033[0m";
323
324 // events/s
325 buffer << std::scientific << std::setprecision(2) << evtpersec << " evt/s";
326
327 // Time statistics:
328 // As long as not all files have been opened, estimate when 100% completion will be reached
329 // based on current completion elapsed time. (This assumes that unopened files have about the
330 // same size as the files that have been seen.)
331 // Once the total event count is known, use "missing events / evt/s".
332 if (totalEventsInOpenFiles != 0) {
334 buffer << "\033[35m";
335
336 std::chrono::seconds remainingSeconds;
339 std::chrono::seconds{static_cast<long long>((ComputeTotalEvents() - currentEventCount) / evtpersec)};
340 } else {
342 std::chrono::seconds{static_cast<long long>(elapsedSeconds.count() / completion - elapsedSeconds.count())};
343 }
344 buffer << " remaining ca.: " << prettyPrint(remainingSeconds);
345
347 buffer << "\033[0m";
348 }
349
350 buffer << "]";
351
352 RestoreStreamState restore(stream);
353 stream << std::left << std::setw(fNColumns - 1) << buffer.str();
354}
355
357{
358 auto &stream = std::cout;
359 RestoreStreamState restore(stream);
360 const auto elapsedSeconds =
361 std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - fBeginTime);
362 const auto totalEvents = ComputeTotalEvents();
363
364 // The next line resets the current line output in the terminal.
365 // Brings the cursor at the beginning ('\r'), prints whitespace with the
366 // same length as the terminal size, then resets the cursor again so we
367 // can print the final stats on a clean line.
368 if (fIsTTY)
369 stream << '\r' << std::string(fNColumns, ' ') << '\r';
370
372 stream << "\033[35m";
373 stream << "["
374 << "Total elapsed time: " << prettyPrint(elapsedSeconds) << " ";
376 stream << "\033[0m";
377 stream << "processed files: " << fTotalFiles << " ";
378
379 // Event counts:
381 stream << "\033[32m";
382
383 stream << "processed events: " << totalEvents;
384
386 stream << "\033[0m";
387
388 stream << " " << std::scientific << std::setprecision(2) << (double)totalEvents / elapsedSeconds.count()
389 << " evt/s";
390
391 stream << "]\n";
392}
393
394/// Record number of events processed and update progress bar.
395/// This function will atomically record elapsed times and event statistics, and one thread will udpate the progress bar
396/// every n seconds (set by the fPrintInterval).
398{
399 using namespace std::chrono;
400 // ***************************************************
401 // Warning: Here, everything needs to be thread safe:
402 // ***************************************************
403 fProcessedEvents.fetch_add(fIncrement, std::memory_order_relaxed);
404
405 // We only print every n seconds.
406 if (duration_cast<seconds>(system_clock::now() - fLastPrintTime) < fPrintInterval) {
407 return;
408 }
409
410 // ******************************************************
411 // Update the progress bar. Only one thread can proceed.
412 // ******************************************************
413 std::unique_lock<std::mutex> lockGuard(fUpdateMutex, std::try_to_lock);
414 if (!lockGuard)
415 return;
416
418
419 if (fIsTTY)
420 std::cout << "\r";
421
423
424 if (fIsTTY)
425 std::cout << std::flush;
426 else
427 std::cout << std::endl;
428}
429
430class ProgressBarAction final : public ROOT::Detail::RDF::RActionImpl<ProgressBarAction> {
431public:
432 using Result_t = int;
433
434private:
435 std::shared_ptr<ProgressHelper> fHelper;
436 std::shared_ptr<int> fDummyResult = std::make_shared<int>();
437
438public:
439 ProgressBarAction(std::shared_ptr<ProgressHelper> r) : fHelper(std::move(r)) {}
440
441 std::shared_ptr<Result_t> GetResultPtr() const { return fDummyResult; }
442
443 void Initialize() {}
444 void InitTask(TTreeReader *, unsigned int) {}
445
446 void Exec(unsigned int) {}
447
448 void Finalize() { fHelper->PrintStatsFinal(); }
449
450 std::string GetActionName() { return "ProgressBar"; }
451 // dummy implementation of PartialUpdate
452 int &PartialUpdate(unsigned int) { return *fDummyResult; }
453
455 {
456 return
457 [this](unsigned int slot, const ROOT::RDF::RSampleInfo &id) { this->fHelper->RegisterNewSample(slot, id); };
458 }
459};
460
462{
463 constexpr std::size_t callbackEveryNEvents = 1000;
464 auto total_files = node.GetNFiles();
465 auto progress = std::make_shared<ProgressHelper>(callbackEveryNEvents, total_files);
466 ProgressBarAction c(progress);
467 auto r = node.Book<>(c);
468 r.OnPartialResultSlot(callbackEveryNEvents, [progress](unsigned int slot, auto &&arg) { (*progress)(slot, arg); });
469}
470
472{
473 auto node = ROOT::RDF::AsRNode(dataframe);
475}
476
477} // namespace ROOT::RDF::Experimental
#define R__LOG_INFO(...)
Definition RLogger.hxx:359
#define b(i)
Definition RSha256.hxx:100
#define c(i)
Definition RSha256.hxx:101
#define a(i)
Definition RSha256.hxx:99
#define h(i)
Definition RSha256.hxx:106
#define e(i)
Definition RSha256.hxx:103
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:252
#define N
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize id
Option_t Option_t width
TCanvas * bars()
Definition bars.C:1
Base class for action helpers, see RInterface::Book() for more information.
ROOT::RDF::SampleCallback_t GetSampleCallback() final
Override this method to register a callback that is executed before the processing a new data sample ...
std::shared_ptr< ProgressHelper > fHelper
ProgressBarAction(std::shared_ptr< ProgressHelper > r)
void InitTask(TTreeReader *, unsigned int)
std::shared_ptr< Result_t > GetResultPtr() const
std::size_t ComputeTotalEvents() const
Compute total events in all open files.
ProgressHelper(std::size_t increment, unsigned int totalFiles, unsigned int printInterval=0, bool useColors=true)
Create a progress helper.
std::pair< std::size_t, std::chrono::seconds > RecordEvtCountAndTime()
Record current event counts and time stamp, populate evts/s statistics array.
void RegisterNewSample(unsigned int, const ROOT::RDF::RSampleInfo &id)
Register a new sample for completion statistics.
std::chrono::time_point< std::chrono::system_clock > const fBeginTime
void Update()
Record number of events processed and update progress bar.
std::map< std::string, ULong64_t > fSampleNameToEventEntries
std::chrono::seconds const fPrintInterval
double EvtPerSec() const
Compute a running mean of events/s.
std::atomic< std::size_t > fProcessedEvents
std::array< double, 10 > fEventsPerSecondStatistics
std::chrono::time_point< std::chrono::system_clock > fLastPrintTime
void PrintProgressAndStats(std::ostream &stream, std::size_t currentEventCount, std::chrono::seconds totalElapsedSeconds) const
Print event and time statistics.
The public interface to the RDataFrame federation of classes.
RResultPtr< typename std::decay_t< Helper >::Result_t > Book(Helper &&helper, const ColumnNames_t &columns={})
Book execution of a custom action using a user-defined helper object.
A type-erased version of RResultPtr and RResultMap.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree ,...
ELogLevel GetEffectiveVerbosity(const RLogManager &mgr) const
Definition RLogger.hxx:310
static RLogManager & Get()
Definition RLogger.cxx:60
Change the verbosity level (global or specific to the RLogChannel passed to the constructor) for the ...
Definition RLogger.hxx:240
const_iterator begin() const
const_iterator end() const
This class provides a simple interface to execute the same task multiple times in parallel threads,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
Stopwatch class.
Definition TStopwatch.h:28
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:46
const Int_t n
Definition legend1.C:16
ROOT::RLogChannel & RDFLogChannel()
Definition RDFUtils.cxx:43
unsigned int & NThreadPerTH3()
Obtain or set the number of threads that will share a clone of a thread-safe 3D histogram.
Definition RDFUtils.cxx:76
RLogChannel & GetChannelOrManager()
Definition RLogger.hxx:299
void ThreadsPerTH3(unsigned int nThread=1)
Set the number of threads sharing one TH3 in RDataFrame.
void AddProgressBar(ROOT::RDF::RNode df)
Add ProgressBar to a ROOT::RDF::RNode.
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
unsigned int RunGraphs(std::vector< RResultHandle > handles)
Run the event loops of multiple RDataFrames concurrently.
RNode AsRNode(NodeType node)
Cast a RDataFrame node to the common type ROOT::RDF::RNode.
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:600
UInt_t GetThreadPoolSize()
Returns the size of ROOT's thread pool.
Definition TROOT.cxx:607
@ kDebug
Debug information; only useful for developers; can have added verbosity up to 255-kDebug.
@ kError
An error.
TMarker m
Definition textangle.C:8
static uint64_t sum(uint64_t i)
Definition Factory.cxx:2338