Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RTaskArena.cxx
Go to the documentation of this file.
1#include "ROOT/RTaskArena.hxx"
3#include "TError.h"
4#include "TROOT.h"
5#include "TSystem.h"
6#include "TThread.h"
7#include <fstream>
8#include <mutex>
9#include <string>
10#include <thread>
11#include <cmath>
12#include "tbb/task_arena.h"
13#include "tbb/global_control.h"
14
15//////////////////////////////////////////////////////////////////////////
16///
17/// \class ROOT::Internal::RTaskArenaWrapper
18/// \brief Wrapper over tbb::task_arena
19///
20/// This class is a wrapper over tbb::task_arena, in order to keep
21/// TBB away from ROOT's headers. We keep a single global instance to be
22/// used by any parallel ROOT class with TBB as a backend.
23///
24/// TThreadExecutor, IMT and any class relying on TBB will get a pointer
25/// to the scheduler through `ROOT::Internal::GetGlobalTaskArena()`, which
26/// will return a reference to the only pointer to the TBB scheduler that
27/// will be active in any ROOT Process.
28///
29/// #### Examples:
30/// ~~~{.cpp}
31/// root[] auto gTA = ROOT::Internal::GetGlobalTaskArena(nWorkers) //get a shared_ptr to the global arena and initialize
32/// //it with nWorkers. Enable thread safety in ROOT
33/// root[] gTA->TaskArenaSize() // Get the current size of the arena (number of worker threads)
34/// root[] gTA->Access() // reference (ROOT::ROpaqueTaskArena &) to the internal tbb::task_arena for interacting directly with it (needed to
35/// //call operations such as execute)
36/// root[] gTA->Access().max_concurrency() // call to tbb::task_arena::max_concurrency()
37/// ~~~
38///
39//////////////////////////////////////////////////////////////////////////
40
41namespace ROOT {
42namespace Internal {
43
44// Honor environment variable `ROOT_MAX_THREADS` if set.
45// Also honor cgroup quotas if set: see https://github.com/oneapi-src/oneTBB/issues/190
47{
48 if (const char *envMaxThreads = gSystem->Getenv("ROOT_MAX_THREADS")) {
49 char *str_end = nullptr;
50 long maxThreads = std::strtol(envMaxThreads, &str_end, 0 /*auto-detect base*/);
51 if (str_end == envMaxThreads && maxThreads == 0) {
52 Error("ROOT::Internal::LogicalCPUBandwidthControl()",
53 "cannot parse number in environment variable ROOT_MAX_THREADS; ignoring.");
54 } else if (maxThreads < 1) {
55 Error("ROOT::Internal::LogicalCPUBandwidthControl()",
56 "environment variable ROOT_MAX_THREADS must be >= 1, but set to %ld; ignoring.",
58 } else
59 return maxThreads;
60 }
61
62#ifdef R__LINUX
63 // Check for CFS bandwidth control
64 std::ifstream f("/sys/fs/cgroup/cpuacct/cpu.cfs_quota_us"); // quota file
65 if (f) {
66 float cfs_quota;
67 f >> cfs_quota;
68 f.close();
69 if (cfs_quota > 0) {
70 f.open("/sys/fs/cgroup/cpuacct/cpu.cfs_period_us"); // period file
71 float cfs_period;
72 f >> cfs_period;
73 f.close();
74 return static_cast<int>(std::ceil(cfs_quota / cfs_period));
75 }
76 }
77#endif
78 return std::thread::hardware_concurrency();
79}
80
81////////////////////////////////////////////////////////////////////////////////
82/// Initializes the tbb::task_arena within RTaskArenaWrapper.
83///
84/// * Can't be reinitialized
85/// * Checks for CPU bandwidth control and avoids oversubscribing
86/// * If no bandwidth control (BC) is in place and maxConcurrency<1, defaults to the default tbb number of threads,
87/// which is CPU affinity aware
88////////////////////////////////////////////////////////////////////////////////
90{
91 const unsigned tbbDefaultNumberThreads = fTBBArena->max_concurrency(); // not initialized, automatic state
93 const unsigned bcCpus = LogicalCPUBandwidthControl();
94 if (maxConcurrency > bcCpus) {
95 Warning("RTaskArenaWrapper", "CPU Bandwith Control Active. Proceeding with %d threads accordingly", bcCpus);
97 }
98 if (maxConcurrency > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) {
99 Warning("RTaskArenaWrapper", "tbb::global_control is active, limiting the number of parallel workers"
100 "from this task arena available for execution.");
101 }
102 fTBBArena->initialize(maxConcurrency);
105}
106
107////////////////////////////////////////////////////////////////////////////////
108/// Initializes the tbb::task_arena within RTaskArenaWrapper by attaching to an
109/// existing arena.
110///
111/// * Can't be reinitialized
112////////////////////////////////////////////////////////////////////////////////
114 : fTBBArena(new ROpaqueTaskArena{tbb::task_arena::attach{}})
115{
116 fTBBArena->initialize(tbb::task_arena::attach{});
117 fNWorkers = fTBBArena->max_concurrency();
119}
120
125
127
129{
130 return fNWorkers;
131}
132////////////////////////////////////////////////////////////////////////////////
133/// Provides access to the wrapped tbb::task_arena.
134////////////////////////////////////////////////////////////////////////////////
139
140std::shared_ptr<ROOT::Internal::RTaskArenaWrapper>
142{
143 static std::weak_ptr<ROOT::Internal::RTaskArenaWrapper> weak_GTAWrapper;
144
145 static std::mutex m;
146 const std::lock_guard<std::mutex> lock{m};
147 if (auto sp = weak_GTAWrapper.lock()) {
148 if (maxConcurrency && (sp->TaskArenaSize() != maxConcurrency)) {
149 Warning("RTaskArenaWrapper", "There's already an active task arena. Proceeding with the current %d threads",
150 sp->TaskArenaSize());
151 }
152 return sp;
153 }
154 std::shared_ptr<ROOT::Internal::RTaskArenaWrapper> sp;
156 sp = std::make_shared<ROOT::Internal::RTaskArenaWrapper>(ROOT::Internal::RTaskArenaWrapper::Attach{});
157 } else {
158 if (config == ROOT::EIMTConfig::kWholeMachine) {
159 maxConcurrency = 0;
160 }
161 sp = std::make_shared<ROOT::Internal::RTaskArenaWrapper>(maxConcurrency);
162 }
164 return sp;
165}
166
167std::shared_ptr<ROOT::Internal::RTaskArenaWrapper> GetGlobalTaskArena(ROOT::EIMTConfig config)
168{
169 if (config >= ROOT::EIMTConfig::kNumConfigs)
170 ::Fatal("ROOT::Internal::GetGlobalTaskArena",
171 "Unsupported enum value %d", (int)config);
172 return GetGlobalTaskArena(0, config);
173}
174
175std::shared_ptr<ROOT::Internal::RTaskArenaWrapper> GetGlobalTaskArena(unsigned maxConcurrency)
176{
178}
179
180} // namespace Internal
181} // namespace ROOT
#define f(i)
Definition RSha256.hxx:104
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:208
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:252
void Fatal(const char *location, const char *msgfmt,...)
Use this function in case of a fatal error. It will abort the program.
Definition TError.cxx:267
R__EXTERN TSystem * gSystem
Definition TSystem.h:582
ROOT::ROpaqueTaskArena & Access()
Provides access to the wrapped tbb::task_arena.
RTaskArenaWrapper(unsigned maxConcurrency=0)
Initializes the tbb::task_arena within RTaskArenaWrapper.
std::unique_ptr< ROOT::ROpaqueTaskArena > fTBBArena
virtual const char * Getenv(const char *env)
Get environment variable.
Definition TSystem.cxx:1680
int LogicalCPUBandwidthControl()
Returns the available number of logical cores.
std::shared_ptr< ROOT::Internal::RTaskArenaWrapper > GetGlobalTaskArena(unsigned maxConcurrency=0)
Factory function returning a shared pointer to the instance of the global RTaskArenaWrapper.
void EnableThreadSafety()
Enable support for multi-threading within the ROOT code in particular, enables the global mutex to ma...
Definition TROOT.cxx:575
EIMTConfig
Definition TROOT.h:83
@ kWholeMachine
Default configuration.
@ kNumConfigs
Number of supported IMT semantic configurations.
@ kExistingTBBArena
Use the existing TBB arena.
Marker for attaching to an existing tbb::task_arena.
TMarker m
Definition textangle.C:8