Logo ROOT  
Reference Guide
RTaskArena.cxx
Go to the documentation of this file.
1// Require TBB without captured exceptions
2#define TBB_USE_CAPTURED_EXCEPTION 0
3
4#include "ROOT/RTaskArena.hxx"
6#include "TError.h"
7#include "TROOT.h"
8#include "TThread.h"
9#include <fstream>
10#include <mutex>
11#include <thread>
12#include "tbb/task_arena.h"
13#define TBB_PREVIEW_GLOBAL_CONTROL 1 // required for TBB versions preceding 2019_U4
14#include "tbb/global_control.h"
15
16//////////////////////////////////////////////////////////////////////////
17///
18/// \class ROOT::Internal::RTaskArenaWrapper
19/// \ingroup Parallelism
20/// \brief Wrapper over tbb::task_arena
21///
22/// This class is a wrapper over tbb::task_arena, in order to keep
23/// TBB away from ROOT's headers. We keep a single global instance to be
24/// used by any parallel ROOT class with TBB as a backend.
25///
26/// TThreadExecutor, IMT and any class relying on TBB will get a pointer
27/// to the scheduler through `ROOT::Internal::GetGlobalTaskArena()`, which
28/// will return a reference to the only pointer to the TBB scheduler that
29/// will be active in any ROOT Process.
30///
31/// #### Examples:
32/// ~~~{.cpp}
33/// root[] auto gTA = ROOT::Internal::GetGlobalTaskArena(nWorkers) //get a shared_ptr to the global arena and initialize
34/// //it with nWorkers. Enable thread safety in ROOT
35/// root[] gTA->TaskArenaSize() // Get the current size of the arena (number of worker threads)
36/// root[] gTA->Access() // reference to the wrapped tbb::task_arena for interacting directly with it (needed to
37/// //call operations such as execute)
38/// root[] gTA->Access().max_concurrency() // call to tbb::task_arena::max_concurrency()
39/// ~~~
40///
41//////////////////////////////////////////////////////////////////////////
42
43namespace ROOT {
44namespace Internal {
45
////////////////////////////////////////////////////////////////////////////////
/// Returns the available number of logical cores.
///
/// To honor cgroup quotas if set: see https://github.com/oneapi-src/oneTBB/issues/190
///
/// On Linux builds (R__LINUX) the CFS quota and period files are consulted first.
/// When both hold positive, readable values the result is ceil(quota / period).
/// In every other case (no quota file, non-positive quota, unreadable or
/// non-positive period) the function falls back to
/// std::thread::hardware_concurrency().
///
/// \return Number of logical CPUs usable by this process; mirrors
///         std::thread::hardware_concurrency() when no quota applies.
////////////////////////////////////////////////////////////////////////////////
int LogicalCPUBandwithControl()
{
#ifdef R__LINUX
   // Check for CFS bandwidth control
   std::ifstream quotaFile("/sys/fs/cgroup/cpuacct/cpu.cfs_quota_us"); // quota file
   float cfsQuota = 0.f;
   // The extraction doubles as the "file could be opened" test: a stream that
   // failed to open, or that is empty, yields false and leaves cfsQuota at 0.
   if ((quotaFile >> cfsQuota) && cfsQuota > 0.f) {
      std::ifstream periodFile("/sys/fs/cgroup/cpuacct/cpu.cfs_period_us"); // period file
      float cfsPeriod = 0.f;
      // Only divide by a period that was actually read and is positive;
      // otherwise fall through to the hardware concurrency below.
      if ((periodFile >> cfsPeriod) && cfsPeriod > 0.f)
         return static_cast<int>(std::ceil(cfsQuota / cfsPeriod));
   }
#endif
   return static_cast<int>(std::thread::hardware_concurrency());
}
67
68////////////////////////////////////////////////////////////////////////////////
69/// Initializes the tbb::task_arena within RTaskArenaWrapper.
70///
71/// * Can't be reinitialized
72/// * Checks for CPU bandwidth control and avoids oversubscribing
73/// * If no BC in place and maxConcurrency<1, defaults to the default tbb number of threads,
74/// which is CPU affinity aware
75////////////////////////////////////////////////////////////////////////////////
76RTaskArenaWrapper::RTaskArenaWrapper(unsigned maxConcurrency) : fTBBArena(new ROpaqueTaskArena{})
77{
78 const unsigned tbbDefaultNumberThreads = fTBBArena->max_concurrency(); // not initialized, automatic state
79 maxConcurrency = maxConcurrency > 0 ? std::min(maxConcurrency, tbbDefaultNumberThreads) : tbbDefaultNumberThreads;
80 const unsigned bcCpus = LogicalCPUBandwithControl();
81 if (maxConcurrency > bcCpus) {
82 Warning("RTaskArenaWrapper", "CPU Bandwith Control Active. Proceeding with %d threads accordingly", bcCpus);
83 maxConcurrency = bcCpus;
84 }
85 if (maxConcurrency > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) {
86 Warning("RTaskArenaWrapper", "tbb::global_control is active, limiting the number of parallel workers"
87 "from this task arena available for execution.");
88 }
89 fTBBArena->initialize(maxConcurrency);
90 fNWorkers = maxConcurrency;
92}
93
95{
96 fNWorkers = 0u;
97}
98
100
102{
103 return fNWorkers;
104}
105////////////////////////////////////////////////////////////////////////////////
106/// Provides access to the wrapped tbb::task_arena.
107////////////////////////////////////////////////////////////////////////////////
109{
110 return *fTBBArena;
111}
112
113std::shared_ptr<ROOT::Internal::RTaskArenaWrapper> GetGlobalTaskArena(unsigned maxConcurrency)
114{
115 static std::weak_ptr<ROOT::Internal::RTaskArenaWrapper> weak_GTAWrapper;
116
117 static std::mutex m;
118 const std::lock_guard<std::mutex> lock{m};
119 if (auto sp = weak_GTAWrapper.lock()) {
120 if (maxConcurrency && (sp->TaskArenaSize() != maxConcurrency)) {
121 Warning("RTaskArenaWrapper", "There's already an active task arena. Proceeding with the current %d threads",
122 sp->TaskArenaSize());
123 }
124 return sp;
125 }
126 std::shared_ptr<ROOT::Internal::RTaskArenaWrapper> sp(new ROOT::Internal::RTaskArenaWrapper(maxConcurrency));
127 weak_GTAWrapper = sp;
128 return sp;
129}
130
131} // namespace Internal
132} // namespace ROOT
#define f(i)
Definition: RSha256.hxx:104
double ceil(double)
Wrapper for tbb::task_arena.
Definition: RTaskArena.hxx:63
ROOT::ROpaqueTaskArena & Access()
Provides access to the wrapped tbb::task_arena.
Definition: RTaskArena.cxx:108
RTaskArenaWrapper(unsigned maxConcurrency=0)
Initializes the tbb::task_arena within RTaskArenaWrapper.
Definition: RTaskArena.cxx:76
std::unique_ptr< ROOT::ROpaqueTaskArena > fTBBArena
Definition: RTaskArena.hxx:71
std::shared_ptr< ROOT::Internal::RTaskArenaWrapper > GetGlobalTaskArena(unsigned maxConcurrency=0)
Factory function returning a shared pointer to the instance of the global RTaskArenaWrapper.
Definition: RTaskArena.cxx:113
int LogicalCPUBandwithControl()
Returns the available number of logical cores.
Definition: RTaskArena.cxx:47
void Warning(const char *location, const char *va_(fmt),...)
Definition: TClingUtils.h:819
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Definition: RNumpyDS.hxx:30
void EnableThreadSafety()
Enables the global mutex to make ROOT thread safe/aware.
Definition: TROOT.cxx:493
auto * m
Definition: textangle.C:8