ROOT Reference Guide
 
RooBatchCompute.cxx
1/*
2 * Project: RooFit
3 * Authors:
4 * Emmanouil Michalainas, CERN, September 2020
5 *
6 * Copyright (c) 2021, CERN
7 *
8 * Redistribution and use in source and binary forms,
9 * with or without modification, are permitted according to the terms
10 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
11 */
12
13/**
14\file RooBatchCompute.cxx
15\class RbcClass
16\ingroup roofit_dev_docs_batchcompute
17
18This file contains the code for CPU computations using the RooBatchCompute library.
19**/
20
21#include "RooBatchCompute.h"
22#include "RooNaNPacker.h"
23#include "RooVDTHeaders.h"
24#include "Batches.h"
25
26#include <ROOT/RConfig.hxx>
27
28#ifdef ROOBATCHCOMPUTE_USE_IMT
29#include <ROOT/TExecutor.hxx>
30#endif
31
32#include <Math/Util.h>
33
34#include <algorithm>
35#include <functional>
36#include <map>
37#include <queue>
38#include <sstream>
39#include <stdexcept>
40
41#include <vector>
42
43#ifndef RF_ARCH
44#error "RF_ARCH should always be defined"
45#endif
46
47namespace RooBatchCompute {
48namespace RF_ARCH {
49
50namespace {
51
52void fillBatches(Batches &batches, double *output, size_t nEvents, std::size_t nBatches, ArgSpan extraArgs)
53{
54 batches.extra = extraArgs.data();
55 batches.nEvents = nEvents;
56 batches.nBatches = nBatches;
57 batches.nExtra = extraArgs.size();
58 batches.output = output;
59}
60
61void fillArrays(std::span<Batch> arrays, VarSpan vars, std::size_t nEvents)
62{
63 for (std::size_t i = 0; i < vars.size(); i++) {
64 arrays[i]._array = vars[i].data();
65 arrays[i]._isVector = vars[i].empty() || vars[i].size() >= nEvents;
66 }
67}
68
69inline void advance(Batches &batches, std::size_t nEvents)
70{
71 for (std::size_t i = 0; i < batches.nBatches; i++) {
72 Batch &arg = batches.args[i];
73 arg._array += arg._isVector * nEvents;
74 }
75 batches.output += nEvents;
76}
77
78} // namespace
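// Editorial sketch (not part of the ROOT sources): how the helpers above treat
// per-event versus scalar inputs. fillArrays() flags an input as a "vector" only
// if it is empty or holds at least nEvents values; advance() then steps vector
// inputs forward while scalar inputs keep pointing at their single value, so a
// scalar parameter is implicitly broadcast across all events:
//
//    std::vector<double> x{1.0, 2.0, 3.0, 4.0};   // per-event observable
//    std::vector<double> mean{0.5};               // scalar parameter
//    std::vector<double> out(4);
//    std::vector<std::span<const double>> vars{x, mean};
//    Batches batches;
//    std::vector<Batch> arrays(vars.size());
//    fillBatches(batches, out.data(), /*nEvents=*/4, vars.size(), /*extraArgs=*/{});
//    fillArrays(arrays, vars, 4);   // arrays[0]._isVector == true, arrays[1]._isVector == false
//    batches.args = arrays.data();
//    advance(batches, 2);
//    // arrays[0]._array now points at x[2]; arrays[1]._array still points at mean[0];
//    // batches.output now points at out[2].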
79
80std::vector<void (*)(Batches &)> getFunctions();
81
82/// This class overrides some RooBatchComputeInterface functions to provide a
83/// CPU-specific implementation of the library.
84class RooBatchComputeClass : public RooBatchComputeInterface {
85public:
86 RooBatchComputeClass() : _computeFunctions(getFunctions())
87 {
88 // Set the dispatch pointer to this instance of the library upon loading
89 dispatchCPU = this;
90 }
91
92 Architecture architecture() const override { return Architecture::RF_ARCH; };
93 std::string architectureName() const override
94 {
95 // transform to lower case to match the original architecture name passed to the compiler
96#ifdef _QUOTEVAL_ // to quote the value of the preprocessor macro instead of the name
97#error "It's unexpected that _QUOTEVAL_ is defined at this point!"
98#endif
99#define _QUOTEVAL_(x) _QUOTE_(x)
100 std::string out = _QUOTEVAL_(RF_ARCH);
101#undef _QUOTEVAL_
102 std::transform(out.begin(), out.end(), out.begin(), [](unsigned char c) { return std::tolower(c); });
103 return out;
104 };
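// Editorial note: the two-level _QUOTEVAL_/_QUOTE_ expansion above is the usual
// preprocessor idiom for stringifying the value of a macro rather than its name
// (a single level of # would yield the literal text "RF_ARCH"). With _QUOTE_ as
// provided by ROOT's RConfig, and assuming for illustration that this library was
// built with RF_ARCH defined to AVX2:
//
//    _QUOTE_(RF_ARCH)     // -> "RF_ARCH"
//    _QUOTEVAL_(RF_ARCH)  // -> "AVX2", lower-cased above to "avx2"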
105
106 void compute(Config const &, Computer computer, std::span<double> output, VarSpan vars, ArgSpan extraArgs) override;
107 double reduceSum(Config const &, InputArr input, size_t n) override;
108 ReduceNLLOutput reduceNLL(Config const &, std::span<const double> probas, std::span<const double> weights,
109 std::span<const double> offsetProbas) override;
110
111 std::unique_ptr<AbsBufferManager> createBufferManager() const override;
112
113 CudaInterface::CudaEvent *newCudaEvent(bool) const override { throw std::bad_function_call(); }
114 CudaInterface::CudaStream *newCudaStream() const override { throw std::bad_function_call(); }
115 void deleteCudaEvent(CudaInterface::CudaEvent *) const override { throw std::bad_function_call(); }
116 void deleteCudaStream(CudaInterface::CudaStream *) const override { throw std::bad_function_call(); }
117 void cudaEventRecord(CudaInterface::CudaEvent *, CudaInterface::CudaStream *) const override
118 {
119 throw std::bad_function_call();
120 }
121 void cudaStreamWaitForEvent(CudaInterface::CudaStream *, CudaInterface::CudaEvent *) const override
122 {
123 throw std::bad_function_call();
124 }
125 bool cudaStreamIsActive(CudaInterface::CudaStream *) const override { throw std::bad_function_call(); }
126
127private:
128#ifdef ROOBATCHCOMPUTE_USE_IMT
129 void computeIMT(Computer computer, std::span<double> output, VarSpan vars, ArgSpan extraArgs);
130#endif
131
132 const std::vector<void (*)(Batches &)> _computeFunctions;
133};
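// Editorial sketch (not part of the ROOT sources): the self-registration pattern
// used by this class, in minimal form. The constructor stores `this` in a global
// dispatch pointer, and a static instance at the end of this file runs that
// constructor when the architecture-specific shared library is loaded, so the
// rest of RooFit can call through dispatchCPU without knowing which backend was
// selected:
//
//    struct Interface { virtual ~Interface() = default; virtual void compute() = 0; };
//    Interface *dispatch = nullptr;       // owned by the core library
//    struct CpuImpl : Interface {
//       CpuImpl() { dispatch = this; }    // self-registration on construction
//       void compute() override { /* ... */ }
//    };
//    static CpuImpl registration;         // constructed at library load time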
134
135#ifdef ROOBATCHCOMPUTE_USE_IMT
136void RooBatchComputeClass::computeIMT(Computer computer, std::span<double> output, VarSpan vars, ArgSpan extraArgs)
137{
138 std::size_t nEvents = output.size();
139
140 if (nEvents == 0)
141 return;
142 ROOT::Internal::TExecutor ex;
143 std::size_t nThreads = ex.GetPoolSize();
144
145 std::size_t nEventsPerThread = nEvents / nThreads + (nEvents % nThreads > 0);
146
147 // Reset the number of threads to the number we actually need given nEventsPerThread
148 nThreads = nEvents / nEventsPerThread + (nEvents % nEventsPerThread > 0);
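   // For example, with nEvents = 100 and a pool of 8 workers, nEventsPerThread
   // becomes ceil(100/8) = 13 and nThreads stays 8; with nEvents = 10 it becomes
   // ceil(10/8) = 2 and nThreads shrinks to 5, so no task is left without events.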
149
150 auto task = [&](std::size_t idx) -> int {
151 // Set up a Batches object covering ~nEvents/nThreads events and advance it
152 // to this task's slice of the input data.
153 Batches batches;
154 std::vector<Batch> arrays(vars.size());
155 fillBatches(batches, output.data(), nEventsPerThread, vars.size(), extraArgs);
156 fillArrays(arrays, vars, nEvents);
157 batches.args = arrays.data();
158 advance(batches, batches.nEvents * idx);
159
160 // Set the number of events of the last Batches object as the remaining events
161 if (idx == nThreads - 1) {
162 batches.nEvents = nEvents - idx * batches.nEvents;
163 }
164
165 std::size_t events = batches.nEvents;
166 batches.nEvents = bufferSize;
167 while (events > bufferSize) {
168 _computeFunctions[computer](batches);
169 advance(batches, bufferSize);
170 events -= bufferSize;
171 }
172 batches.nEvents = events;
173 _computeFunctions[computer](batches);
174 return 0;
175 };
176
177 std::vector<std::size_t> indices(nThreads);
178 for (unsigned int i = 1; i < nThreads; i++) {
179 indices[i] = i;
180 }
181 ex.Map(task, indices);
182}
183#endif
184
185/** Compute multiple values using optimized functions.
186This method creates a Batches object and passes it to the correct compute function.
187If Implicit Multithreading is enabled, the events to be processed are divided
188equally among the tasks, which are then computed in parallel.
189\param computer An enum specifying the compute function to be used.
190\param output The array where the computation results are stored.
191\param vars A std::span of spans holding the values of the variables involved in the computation.
192\param extraArgs An optional std::span containing extra double values that may participate in the computation. **/
193void RooBatchComputeClass::compute(Config const &, Computer computer, std::span<double> output, VarSpan vars,
194 ArgSpan extraArgs)
195{
196 // In the original implementation of this library, the evaluation was done
 197 // multi-threaded if implicit multi-threading was enabled in ROOT with
198 // ROOT::EnableImplicitMT().
199 //
200 // However, this multithreaded mode was not carefully validated and is
201 // therefore not production ready. One would first have to study the
202 // overhead for different numbers of cores, number of events, and model
 203 // complexity. Then, we should only consider implicit multithreading here if
204 // there is no performance penalty for any scenario, to not surprise the
 205 // users with unexpected slowdowns!
206 //
207 // Note that the priority of investigating this is not high, because RooFit
208 // R & D efforts currently go in the direction of parallelization at the
209 // level of the gradient components, or achieving single-threaded speedup
210 // with automatic differentiation. Furthermore, the single-threaded
211 // performance of the new CPU evaluation backend with the RooBatchCompute
 212 // library is generally much faster than the legacy evaluation backend
213 // already, even if the latter uses multi-threading.
214#ifdef ROOBATCHCOMPUTE_USE_IMT
215 if (ROOT::IsImplicitMTEnabled()) {
216 computeIMT(computer, output, vars, extraArgs);
217 }
218#endif
219
220 std::size_t nEvents = output.size();
221
222 // Set up a single Batches object spanning all events; it is processed below
223 // in chunks of at most bufferSize events.
224 Batches batches;
225 std::vector<Batch> arrays(vars.size());
226 fillBatches(batches, output.data(), nEvents, vars.size(), extraArgs);
227 fillArrays(arrays, vars, nEvents);
228 batches.args = arrays.data();
229
230 std::size_t events = batches.nEvents;
231 batches.nEvents = bufferSize;
232 while (events > bufferSize) {
233 _computeFunctions[computer](batches);
234 advance(batches, bufferSize);
235 events -= bufferSize;
236 }
237 batches.nEvents = events;
238 _computeFunctions[computer](batches);
239}
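// Editorial sketch (not part of the ROOT sources): what a call into this method
// can look like from evaluation code, going through the dispatch pointer.
// Computer::Gaussian is only an assumed example value of the Computer enum, and a
// default-constructed Config is assumed to select the plain CPU path; the spans
// must stay valid for the duration of the call.
//
//    using namespace RooBatchCompute;
//    std::vector<double> x(nEvents), mean{0.0}, sigma{1.0};   // mean and sigma broadcast as scalars
//    std::vector<double> out(nEvents);
//    std::vector<std::span<const double>> vars{x, mean, sigma};
//    dispatchCPU->compute(Config{}, Computer::Gaussian, out, vars, /*extraArgs=*/{});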
240
241namespace {
242
243inline std::pair<double, double> getLog(double prob, ReduceNLLOutput &out)
244{
245 if (std::abs(prob) > 1e6) {
246 out.nLargeValues++;
247 }
248
249 if (prob <= 0.0) {
250 out.nNonPositiveValues++;
251 return {std::log(prob), -prob};
252 }
253
254 if (std::isnan(prob)) {
255 out.nNaNValues++;
256 return {prob, RooNaNPacker::unpackNaN(prob)};
257 }
258
259 return {std::log(prob), 0.0};
260}
261
262} // namespace
263
264double RooBatchComputeClass::reduceSum(Config const &, InputArr input, size_t n)
265{
266 return ROOT::Math::KahanSum<double, 4u>::Accumulate(input, input + n).Sum();
267}
268
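// Editorial note: a compensated (Kahan) sum is used here and in reduceNLL() below
// because a plain running total drops small contributions once it gets large:
// with doubles, 1e16 + 1.0 == 1e16. The compensation term keeps track of what
// each addition lost, e.g.
//
//    ROOT::Math::KahanSum<double> k;
//    k.Add(1e16);
//    for (int i = 0; i < 1000; ++i) k.Add(1.0);   // a naive += loop would still give 1e16
//    // k.Sum() and k.Carry() together preserve the +1000 that would otherwise vanish.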
269ReduceNLLOutput RooBatchComputeClass::reduceNLL(Config const &, std::span<const double> probas,
270 std::span<const double> weights, std::span<const double> offsetProbas)
271{
272 ReduceNLLOutput out;
273
274 double badness = 0.0;
275
276 ROOT::Math::KahanSum<double> nllSum;
277
278 for (std::size_t i = 0; i < probas.size(); ++i) {
279
280 const double eventWeight = weights.size() > 1 ? weights[i] : weights[0];
281
282 if (0. == eventWeight)
283 continue;
284
285 std::pair<double, double> logOut = getLog(probas[i], out);
286 double term = logOut.first;
287 badness += logOut.second;
288
289 if (!offsetProbas.empty()) {
290 term -= std::log(offsetProbas[i]);
291 }
292
293 term *= -eventWeight;
294
295 nllSum.Add(term);
296 }
297
298 out.nllSum = nllSum.Sum();
299 out.nllSumCarry = nllSum.Carry();
300
301 if (badness != 0.) {
302 // Some events with evaluation errors. Return "badness" of errors.
303 out.nllSum = RooNaNPacker::packFloatIntoNaN(badness);
304 out.nllSumCarry = 0.0;
305 }
306
307 return out;
308}
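// Editorial sketch (not part of the ROOT sources): how the "badness" packed above
// reaches the caller. When any probability was non-positive or NaN, the returned
// nllSum is a NaN whose mantissa carries the accumulated badness, and it can be
// recovered with RooNaNPacker on the receiving side (probas and weights are
// assumed to be existing std::span<const double> inputs here):
//
//    ReduceNLLOutput res = dispatchCPU->reduceNLL(Config{}, probas, weights, {});
//    if (std::isnan(res.nllSum)) {
//       float badness = RooNaNPacker::unpackNaN(res.nllSum);   // > 0 if a payload was packed
//       // ... e.g. steer the minimizer away from this parameter region ...
//    }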
309
310namespace {
311
312class ScalarBufferContainer {
313public:
314 ScalarBufferContainer() {}
315 ScalarBufferContainer(std::size_t size)
316 {
317 if (size != 1)
318 throw std::runtime_error("ScalarBufferContainer can only be of size 1");
319 }
320
321 double const *hostReadPtr() const { return &_val; }
322 double const *deviceReadPtr() const { return &_val; }
323
324 double *hostWritePtr() { return &_val; }
325 double *deviceWritePtr() { return &_val; }
326
327 void assignFromHost(std::span<const double> input) { _val = input[0]; }
328 void assignFromDevice(std::span<const double>) { throw std::bad_function_call(); }
329
330private:
331 double _val;
332};
333
334class CPUBufferContainer {
335public:
336 CPUBufferContainer(std::size_t size) : _vec(size) {}
337
338 double const *hostReadPtr() const { return _vec.data(); }
339 double const *deviceReadPtr() const
340 {
341 throw std::bad_function_call();
342 return nullptr;
343 }
344
345 double *hostWritePtr() { return _vec.data(); }
346 double *deviceWritePtr()
347 {
348 throw std::bad_function_call();
349 return nullptr;
350 }
351
352 void assignFromHost(std::span<const double> input) { _vec.assign(input.begin(), input.end()); }
353 void assignFromDevice(std::span<const double>) { throw std::bad_function_call(); }
354
355private:
356 std::vector<double> _vec;
357};
358
359template <class Container>
360class BufferImpl : public AbsBuffer {
361public:
362 using Queue = std::queue<std::unique_ptr<Container>>;
363
364 BufferImpl(std::size_t size, Queue &queue) : _queue{queue}
365 {
366 if (_queue.empty()) {
367 _vec = std::make_unique<Container>(size);
368 } else {
369 _vec = std::move(_queue.front());
370 _queue.pop();
371 }
372 }
373
374 ~BufferImpl() override { _queue.emplace(std::move(_vec)); }
375
376 double const *hostReadPtr() const override { return _vec->hostReadPtr(); }
377 double const *deviceReadPtr() const override { return _vec->deviceReadPtr(); }
378
379 double *hostWritePtr() override { return _vec->hostWritePtr(); }
380 double *deviceWritePtr() override { return _vec->deviceWritePtr(); }
381
382 void assignFromHost(std::span<const double> input) override { _vec->assignFromHost(input); }
383 void assignFromDevice(std::span<const double> input) override { _vec->assignFromDevice(input); }
384
385 Container &vec() { return *_vec; }
386
387private:
388 std::unique_ptr<Container> _vec;
389 Queue &_queue;
390};
391
392using ScalarBuffer = BufferImpl<ScalarBufferContainer>;
393using CPUBuffer = BufferImpl<CPUBufferContainer>;
394
395struct BufferQueuesMaps {
396 std::map<std::size_t, ScalarBuffer::Queue> scalarBufferQueuesMap;
397 std::map<std::size_t, CPUBuffer::Queue> cpuBufferQueuesMap;
398};
399
400class BufferManager : public AbsBufferManager {
401
402public:
403 BufferManager() : _queuesMaps{std::make_unique<BufferQueuesMaps>()} {}
404
405 std::unique_ptr<AbsBuffer> makeScalarBuffer() override
406 {
407 return std::make_unique<ScalarBuffer>(1, _queuesMaps->scalarBufferQueuesMap[1]);
408 }
409 std::unique_ptr<AbsBuffer> makeCpuBuffer(std::size_t size) override
410 {
411 return std::make_unique<CPUBuffer>(size, _queuesMaps->cpuBufferQueuesMap[size]);
412 }
413 std::unique_ptr<AbsBuffer> makeGpuBuffer(std::size_t) override { throw std::bad_function_call(); }
414 std::unique_ptr<AbsBuffer> makePinnedBuffer(std::size_t, CudaInterface::CudaStream * = nullptr) override
415 {
416 throw std::bad_function_call();
417 }
418
419private:
420 std::unique_ptr<BufferQueuesMaps> _queuesMaps;
421};
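// Editorial sketch (not part of the ROOT sources): the maps of queues above
// implement a simple size-keyed recycling scheme. A BufferImpl takes a container
// out of the queue for its size (or allocates a new one if the queue is empty),
// and its destructor puts the container back, so repeated requests for
// equally-sized scratch buffers reuse the same allocation:
//
//    auto mgr = dispatchCPU->createBufferManager();
//    {
//       auto buf = mgr->makeCpuBuffer(1024);   // allocates a 1024-double vector
//       buf->hostWritePtr()[0] = 1.0;
//    }                                         // destructor returns the vector to the queue
//    auto buf2 = mgr->makeCpuBuffer(1024);     // reuses that same vector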
422
423} // namespace
424
425std::unique_ptr<AbsBufferManager> RooBatchComputeClass::createBufferManager() const
426{
427 return std::make_unique<BufferManager>();
428}
429
430/// Static object to trigger the constructor which overwrites the dispatch pointer.
431static RooBatchComputeClass computeObj;
432
433} // End namespace RF_ARCH
434} // End namespace RooBatchCompute