Logo ROOT  
Reference Guide
TThreadedObject.hxx
Go to the documentation of this file.
1// @(#)root/thread:$Id$
2// Author: Danilo Piparo, CERN 11/2/2016
3
4/*************************************************************************
5 * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#ifndef ROOT_TThreadedObject
13#define ROOT_TThreadedObject
14
15#include "ROOT/TSpinMutex.hxx"
16#include "TDirectory.h"
17#include "TError.h"
18#include "TList.h"
19#include "TROOT.h"
20
21
22#include <algorithm>
23#include <exception>
24#include <deque>
25#include <functional>
26#include <map>
27#include <memory>
28#include <mutex>
29#include <sstream>
30#include <string>
31#include <thread>
32#include <vector>
33
34class TH1;
35
36namespace ROOT {
37
/**
 * \class ROOT::TNumSlots
 * \brief Defines the number of threads in some of ROOT's interfaces.
 *
 * A plain aggregate wrapping an unsigned count, so that a slot count can be
 * told apart from other integral constructor arguments by its type.
 */
struct TNumSlots {
   unsigned int fVal; ///< Number of slots
   /// Two TNumSlots compare equal when they hold the same count.
   friend bool operator==(TNumSlots lhs, TNumSlots rhs) { return lhs.fVal == rhs.fVal; }
   /// Negation of operator==.
   friend bool operator!=(TNumSlots lhs, TNumSlots rhs) { return !(lhs == rhs); }
};
47
48 namespace Internal {
49
50 namespace TThreadedObjectUtils {
51
52
53 template<typename T, bool ISHISTO = std::is_base_of<TH1,T>::value>
54 struct Detacher{
55 static T* Detach(T* obj) {
56 return obj;
57 }
58 };
59
60 template<typename T>
61 struct Detacher<T, true>{
62 static T* Detach(T* obj) {
63 obj->SetDirectory(nullptr);
64 obj->ResetBit(kMustCleanup);
65 return obj;
66 }
67 };
68
69 /// Return a copy of the object or a "Clone" if the copy constructor is not implemented.
70 template<class T, bool isCopyConstructible = std::is_copy_constructible<T>::value>
71 struct Cloner {
72 static T *Clone(const T *obj, TDirectory* d = nullptr) {
73 T* clone;
74 if (d){
76 clone = new T(*obj);
77 } else {
78 clone = new T(*obj);
79 }
80 return Detacher<T>::Detach(clone);
81 }
82 };
83
84 template<class T>
85 struct Cloner<T, false> {
86 static T *Clone(const T *obj, TDirectory* d = nullptr) {
87 T* clone;
88 if (d){
90 clone = (T*)obj->Clone();
91 } else {
92 clone = (T*)obj->Clone();
93 }
94 return clone;
95 }
96 };
97
98 template <class T, bool ISHISTO = std::is_base_of<TH1, T>::value>
99 struct DirCreator {
101 {
102 static unsigned dirCounter = 0;
103 const std::string dirName = "__TThreaded_dir_" + std::to_string(dirCounter++) + "_";
104 return gROOT->mkdir(dirName.c_str());
105 }
106 };
107
108 template <class T>
109 struct DirCreator<T, true> {
110 static TDirectory *Create() { return nullptr; }
111 };
112
113 struct MaxSlots_t {
114 unsigned fVal;
115 operator unsigned() const { return fVal; }
116 MaxSlots_t &operator=(unsigned val)
117 R__DEPRECATED(6, 24,
118 "TThreadedObject now adds new slots as needed, on demand, possibly beyond fgMaxSlots")
119 {
120 fVal = val;
121 return *this;
122 }
123 };
124
125 } // End of namespace TThreadedObjectUtils
126 } // End of namespace Internal
127
128 namespace TThreadedObjectUtils {
129
/// Signature of a merge function: it receives the target object and the
/// collection of all per-slot objects, and is expected to fold the latter
/// into the former.
template<class T>
using MergeFunctionType = std::function<void(std::shared_ptr<T>, std::vector<std::shared_ptr<T>>&)>;
132
133 /// Merge TObjects
134 template<class T>
135 void MergeTObjects(std::shared_ptr<T> target, std::vector<std::shared_ptr<T>> &objs)
136 {
137 if (!target) return;
138 TList objTList;
139 // Cannot do better than this
140 for (auto obj : objs) {
141 if (obj && obj != target) objTList.Add(obj.get());
142 }
143 target->Merge(&objTList);
144 }
145 } // end of namespace TThreadedObjectUtils
146
147 /**
148 * \class ROOT::TThreadedObject
149 * \brief A wrapper to make object instances thread private, lazily.
150 * \tparam T Class of the object to be made thread private (e.g. TH1F)
151 * \ingroup Multicore
152 *
153 * A wrapper which makes objects thread private. The methods of the underlying
154 * object can be invoked via the arrow operator. The object is created in
155 * a specific thread lazily, i.e. upon invocation of one of its methods.
156 * The correct object pointer from within a particular thread can be accessed
157 * with the overloaded arrow operator or with the Get method.
158 * In case an elaborate thread management is in place, e.g. in presence of
159 * stream of operations or "processing slots", it is also possible to
160 * manually select the correct object pointer explicitly.
161 */
162 template<class T>
164 public:
165 /// The initial number of empty processing slots that a TThreadedObject is constructed with by default.
166 /// Deprecated: TThreadedObject grows as more slots are required.
168
170
171 /// Construct the TThreadedObject with initSlots empty slots and the "model" of the thread private objects.
172 /// \param initSlots Set the initial number of slots of the TThreadedObject.
173 /// \tparam ARGS Arguments of the constructor of T
174 ///
175 /// This form of the constructor is useful to manually pre-set the content of a given number of slots
176 /// when used in combination with TThreadedObject::SetAtSlot().
177 template <class... ARGS>
178 TThreadedObject(TNumSlots initSlots, ARGS &&... args) : fIsMerged(false)
179 {
180 const auto nSlots = initSlots.fVal;
181 fObjPointers.resize(nSlots);
182
183 // create at least one directory (we need it for fModel), plus others as needed by the size of fObjPointers
185 for (auto i = 1u; i < nSlots; ++i)
187
189 fModel.reset(Internal::TThreadedObjectUtils::Detacher<T>::Detach(new T(std::forward<ARGS>(args)...)));
190 }
191
192 /// Construct the TThreadedObject and the "model" of the thread private objects.
193 /// \tparam ARGS Arguments of the constructor of T
194 template<class ...ARGS>
196
197 /// Return the number of currently available slot.
198 ///
199 /// The method is safe to call concurrently to other TThreadedObject methods.
200 /// Note that slots could be available but contain no data (i.e. a nullptr) if
201 /// they have not been used yet.
202 unsigned GetNSlots() const
203 {
204 std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
205 return fObjPointers.size();
206 }
207
208 /// Access a particular processing slot.
209 ///
210 /// This method is thread-safe as long as concurrent calls request different slots (i.e. pass a different
211 /// argument) and no thread accesses slot `i` via the arrow operator, so mixing usage of GetAtSlot
212 /// with usage of the arrow operator can be dangerous.
213 std::shared_ptr<T> GetAtSlot(unsigned i)
214 {
215 std::size_t nAvailableSlots;
216 {
217 // fObjPointers can grow due to a concurrent operation on this TThreadedObject, need to lock
218 std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
219 nAvailableSlots = fObjPointers.size();
220 }
221
222 if (i >= nAvailableSlots) {
223 Warning("TThreadedObject::GetAtSlot", "This slot does not exist.");
224 return nullptr;
225 }
226
227 auto &objPointer = fObjPointers[i];
228 if (!objPointer)
230 return objPointer;
231 }
232
233 /// Set the value of a particular slot.
234 ///
235 /// This method is thread-safe as long as concurrent calls access different slots (i.e. pass a different
236 /// argument) and no thread accesses slot `i` via the arrow operator, so mixing usage of SetAtSlot
237 /// with usage of the arrow operator can be dangerous.
238 void SetAtSlot(unsigned i, std::shared_ptr<T> v)
239 {
240 std::size_t nAvailableSlots;
241 {
242 // fObjPointers can grow due to a concurrent operation on this TThreadedObject, need to lock
243 std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
244 nAvailableSlots = fObjPointers.size();
245 }
246
247 if (i >= nAvailableSlots) {
248 Warning("TThreadedObject::SetAtSlot", "This slot does not exist, doing nothing.");
249 return;
250 }
251
252 fObjPointers[i] = v;
253 }
254
255 /// Access a particular slot which corresponds to a single thread.
256 /// This is in general faster than the GetAtSlot method but it is
257 /// responsibility of the caller to make sure that the slot exists
258 /// and to check that the contained object is initialized (and not
259 /// a nullptr).
260 std::shared_ptr<T> GetAtSlotUnchecked(unsigned i) const
261 {
262 return fObjPointers[i];
263 }
264
265 /// Access a particular slot which corresponds to a single thread.
266 /// This overload is faster than the GetAtSlotUnchecked method but
267 /// the caller is responsible to make sure that the slot exists, to
268 /// check that the contained object is initialized and that the returned
269 /// pointer will not outlive the TThreadedObject that returned it, which
270 /// maintains ownership of the actual object.
271 T* GetAtSlotRaw(unsigned i) const
272 {
273 return fObjPointers[i].get();
274 }
275
276 /// Access the pointer corresponding to the current slot. This method is
277 /// not adequate for being called inside tight loops as it implies a
278 /// lookup in a mapping between the threadIDs and the slot indices.
279 /// A good practice consists in copying the pointer onto the stack and
280 /// proceed with the loop as shown in this work item (psudo-code) which
281 /// will be sent to different threads:
282 /// ~~~{.cpp}
283 /// auto workItem = [](){
284 /// auto objPtr = tthreadedObject.Get();
285 /// for (auto i : ROOT::TSeqI(1000)) {
286 /// // tthreadedObject->FastMethod(i); // don't do this! Inefficient!
287 /// objPtr->FastMethod(i);
288 /// }
289 /// }
290 /// ~~~
291 std::shared_ptr<T> Get()
292 {
294 }
295
296 /// Access the wrapped object and allow to call its methods.
298 {
299 return Get().get();
300 }
301
302 /// Merge all the thread private objects. Can be called once: it does not
303 /// create any new object but destroys the present bookkeping collapsing
304 /// all objects into the one at slot 0.
305 std::shared_ptr<T> Merge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
306 {
307 // We do not return if we already merged.
308 if (fIsMerged) {
309 Warning("TThreadedObject::Merge", "This object was already merged. Returning the previous result.");
310 return fObjPointers[0];
311 }
312 // need to convert to std::vector because historically mergeFunction requires a vector
313 auto vecOfObjPtrs = std::vector<std::shared_ptr<T>>(fObjPointers.begin(), fObjPointers.end());
314 mergeFunction(fObjPointers[0], vecOfObjPtrs);
315 fIsMerged = true;
316 return fObjPointers[0];
317 }
318
319 /// Merge all the thread private objects. Can be called many times. It
320 /// does create a new instance of class T to represent the "Sum" object.
321 /// This method is not thread safe: correct or acceptable behaviours
322 /// depend on the nature of T and of the merging function.
323 std::unique_ptr<T> SnapshotMerge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
324 {
325 if (fIsMerged) {
326 Warning("TThreadedObject::SnapshotMerge", "This object was already merged. Returning the previous result.");
327 return std::unique_ptr<T>(Internal::TThreadedObjectUtils::Cloner<T>::Clone(fObjPointers[0].get()));
328 }
330 std::shared_ptr<T> targetPtrShared(targetPtr, [](T *) {});
331 // need to convert to std::vector because historically mergeFunction requires a vector
332 auto vecOfObjPtrs = std::vector<std::shared_ptr<T>>(fObjPointers.begin(), fObjPointers.end());
333 mergeFunction(targetPtrShared, vecOfObjPtrs);
334 return std::unique_ptr<T>(targetPtr);
335 }
336
337 private:
338 std::unique_ptr<T> fModel; ///< Use to store a "model" of the object
339 // std::deque's guarantee that references to the elements are not invalidated when appending new slots
340 std::deque<std::shared_ptr<T>> fObjPointers; ///< An object pointer per slot
341 // If the object is a histogram, we also create dummy directories that the histogram associates with
342 // so we do not pollute gDirectory
343 std::deque<TDirectory*> fDirectories; ///< A TDirectory per slot
344 std::map<std::thread::id, unsigned> fThrIDSlotMap; ///< A mapping between the thread IDs and the slots
345 mutable ROOT::TSpinMutex fSpinMutex; ///< Protects concurrent access to fThrIDSlotMap, fObjPointers
346 bool fIsMerged : 1; ///< Remember if the objects have been merged already
347
348 /// Get the slot number for this threadID, make a slot if needed
350 {
351 const auto thisThreadID = std::this_thread::get_id();
352 std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
353 const auto thisSlotNumIt = fThrIDSlotMap.find(thisThreadID);
354 if (thisSlotNumIt != fThrIDSlotMap.end())
355 return thisSlotNumIt->second;
356 const auto newIndex = fThrIDSlotMap.size();
357 fThrIDSlotMap[thisThreadID] = newIndex;
358 R__ASSERT(newIndex <= fObjPointers.size() && "This should never happen, we should create new slots as needed");
359 if (newIndex == fObjPointers.size()) {
361 fObjPointers.emplace_back(nullptr);
362 }
363 return newIndex;
364 }
365 };
366
367 template<class T> Internal::TThreadedObjectUtils::MaxSlots_t TThreadedObject<T>::fgMaxSlots{64};
368
369} // End ROOT namespace
370
371////////////////////////////////////////////////////////////////////////////////
372/// Print a TThreadedObject at the prompt:
373
374namespace cling {
375 template<class T>
376 std::string printValue(ROOT::TThreadedObject<T> *val)
377 {
378 auto model = ((std::unique_ptr<T>*)(val))->get();
379 std::ostringstream ret;
380 ret << "A wrapper to make object instances thread private, lazily. "
381 << "The model which is replicated is " << printValue(model);
382 return ret.str();
383 }
384}
385
386
387#endif
#define R__DEPRECATED(MAJOR, MINOR, REASON)
Definition: RConfig.hxx:522
#define d(i)
Definition: RSha256.hxx:102
#define R__ASSERT(e)
Definition: TError.h:96
void Warning(const char *location, const char *msgfmt,...)
@ kMustCleanup
Definition: TObject.h:355
#define gROOT
Definition: TROOT.h:406
typedef void((*Func_t)())
A spin mutex class which respects the STL interface for mutexes.
Definition: TSpinMutex.hxx:42
A wrapper to make object instances thread private, lazily.
std::map< std::thread::id, unsigned > fThrIDSlotMap
A mapping between the thread IDs and the slots.
T * operator->()
Access the wrapped object and allow to call its methods.
void SetAtSlot(unsigned i, std::shared_ptr< T > v)
Set the value of a particular slot.
std::shared_ptr< T > Merge(TThreadedObjectUtils::MergeFunctionType< T > mergeFunction=TThreadedObjectUtils::MergeTObjects< T >)
Merge all the thread private objects.
std::deque< TDirectory * > fDirectories
A TDirectory per slot.
std::shared_ptr< T > GetAtSlot(unsigned i)
Access a particular processing slot.
std::shared_ptr< T > Get()
Access the pointer corresponding to the current slot.
ROOT::TSpinMutex fSpinMutex
Protects concurrent access to fThrIDSlotMap, fObjPointers.
TThreadedObject(const TThreadedObject &)=delete
std::unique_ptr< T > fModel
Use to store a "model" of the object.
std::deque< std::shared_ptr< T > > fObjPointers
An object pointer per slot.
std::unique_ptr< T > SnapshotMerge(TThreadedObjectUtils::MergeFunctionType< T > mergeFunction=TThreadedObjectUtils::MergeTObjects< T >)
Merge all the thread private objects.
std::shared_ptr< T > GetAtSlotUnchecked(unsigned i) const
Access a particular slot which corresponds to a single thread.
unsigned GetThisSlotNumber()
Get the slot number for this threadID, make a slot if needed.
static Internal::TThreadedObjectUtils::MaxSlots_t fgMaxSlots
The initial number of empty processing slots that a TThreadedObject is constructed with by default.
TThreadedObject(TNumSlots initSlots, ARGS &&... args)
Construct the TThreadedObject with initSlots empty slots and the "model" of the thread private object...
T * GetAtSlotRaw(unsigned i) const
Access a particular slot which corresponds to a single thread.
unsigned GetNSlots() const
Return the number of currently available slot.
TThreadedObject(ARGS &&... args)
Construct the TThreadedObject and the "model" of the thread private objects.
bool fIsMerged
Remember if the objects have been merged already.
Small helper to keep current directory context.
Definition: TDirectory.h:47
Describe directory structure in memory.
Definition: TDirectory.h:40
The TH1 histogram class.
Definition: TH1.h:56
A doubly linked list.
Definition: TList.h:44
virtual void Add(TObject *obj)
Definition: TList.h:87
double T(double x)
Definition: ChebyshevPol.h:34
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
Definition: RExports.h:151
void MergeTObjects(std::shared_ptr< T > target, std::vector< std::shared_ptr< T > > &objs)
Merge TObjects.
std::function< void(std::shared_ptr< T >, std::vector< std::shared_ptr< T > > &)> MergeFunctionType
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Definition: StringConv.hxx:21
static T * Clone(const T *obj, TDirectory *d=nullptr)
Return a copy of the object or a "Clone" if the copy constructor is not implemented.
static T * Clone(const T *obj, TDirectory *d=nullptr)
MaxSlots_t & operator=(unsigned val) R__DEPRECATED(6
Defines the number of threads in some of ROOT's interfaces.
friend bool operator!=(TNumSlots lhs, TNumSlots rhs)
unsigned int fVal
friend bool operator==(TNumSlots lhs, TNumSlots rhs)
#define ARGS(alist)
Definition: gifencode.c:10