Logo ROOT  
Reference Guide
TThreadedObject.hxx
Go to the documentation of this file.
1// @(#)root/thread:$Id$
2// Author: Danilo Piparo, CERN 11/2/2016
3
4/*************************************************************************
5 * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#ifndef ROOT_TThreadedObject
13#define ROOT_TThreadedObject
14
15#include "TList.h"
16#include "TError.h"
17
18#include <functional>
19#include <map>
20#include <memory>
21#include <mutex>
22#include <string>
23#include <thread>
24#include <vector>
25
26#include "ROOT/TSpinMutex.hxx"
27#include "TROOT.h"
28
29class TH1;
30
31namespace ROOT {
32
33 namespace Internal {
34
35 namespace TThreadedObjectUtils {
36
37 /// Get the unique index identifying a TThreadedObject.
38 inline unsigned GetTThreadedObjectIndex() {
39 static unsigned fgTThreadedObjectIndex = 0;
40 return fgTThreadedObjectIndex++;
41 }
42
43 template<typename T, bool ISHISTO = std::is_base_of<TH1,T>::value>
44 struct Detacher{
45 static T* Detach(T* obj) {
46 return obj;
47 }
48 };
49
50 template<typename T>
51 struct Detacher<T, true>{
52 static T* Detach(T* obj) {
53 obj->SetDirectory(nullptr);
54 obj->ResetBit(kMustCleanup);
55 return obj;
56 }
57 };
58
59 /// Return a copy of the object or a "Clone" if the copy constructor is not implemented.
60 template<class T, bool isCopyConstructible = std::is_copy_constructible<T>::value>
61 struct Cloner {
62 static T *Clone(const T *obj, TDirectory* d = nullptr) {
63 T* clone;
64 if (d){
66 clone = new T(*obj);
67 } else {
68 clone = new T(*obj);
69 }
70 return Detacher<T>::Detach(clone);
71 }
72 };
73
74 template<class T>
75 struct Cloner<T, false> {
76 static T *Clone(const T *obj, TDirectory* d = nullptr) {
77 T* clone;
78 if (d){
80 clone = (T*)obj->Clone();
81 } else {
82 clone = (T*)obj->Clone();
83 }
84 return clone;
85 }
86 };
87
88 template<class T, bool ISHISTO = std::is_base_of<TH1,T>::value>
89 struct DirCreator{
90 static std::vector<TDirectory*> Create(unsigned maxSlots) {
91 std::string dirName = "__TThreaded_dir_";
93 std::vector<TDirectory*> dirs;
94 dirs.reserve(maxSlots);
95 for (unsigned i=0; i< maxSlots;++i) {
96 auto dir = gROOT->mkdir((dirName+std::to_string(i)).c_str());
97 dirs.emplace_back(dir);
98 }
99 return dirs;
100 }
101 };
102
103 template<class T>
104 struct DirCreator<T, true>{
105 static std::vector<TDirectory*> Create(unsigned maxSlots) {
106 std::vector<TDirectory*> dirs(maxSlots, nullptr);
107 return dirs;
108 }
109 };
110
111 } // End of namespace TThreadedObjectUtils
112 } // End of namespace Internals
113
114 namespace TThreadedObjectUtils {
115
116 template<class T>
117 using MergeFunctionType = std::function<void(std::shared_ptr<T>, std::vector<std::shared_ptr<T>>&)>;
118 /// Merge TObjects
119 template<class T>
120 void MergeTObjects(std::shared_ptr<T> target, std::vector<std::shared_ptr<T>> &objs)
121 {
122 if (!target) return;
123 TList objTList;
124 // Cannot do better than this
125 for (auto obj : objs) {
126 if (obj && obj != target) objTList.Add(obj.get());
127 }
128 target->Merge(&objTList);
129 }
130 } // end of namespace TThreadedObjectUtils
131
132 /**
133 * \class ROOT::TThreadedObject
134 * \brief A wrapper to make object instances thread private, lazily.
135 * \tparam T Class of the object to be made thread private (e.g. TH1F)
136 * \ingroup Multicore
137 *
138 * A wrapper which makes objects thread private. The methods of the underlying
139 * object can be invoked via the the arrow operator. The object is created in
140 * a specific thread lazily, i.e. upon invocation of one of its methods.
141 * The correct object pointer from within a particular thread can be accessed
142 * with the overloaded arrow operator or with the Get method.
143 * In case an elaborate thread management is in place, e.g. in presence of
144 * stream of operations or "processing slots", it is also possible to
145 * manually select the correct object pointer explicitly.
146 * The default size of the threaded objects is 64. This size can be extended
147 * manually via the fgMaxSlots parameter. The size of individual instances
148 * is automatically extended if the size of the implicit MT pool is bigger
149 * than 64.
150 *
151 */
152 template<class T>
154 public:
155 static unsigned fgMaxSlots; ///< The maximum number of processing slots (distinct threads) which the instances can manage
157 /// Construct the TThreaded object and the "model" of the thread private
158 /// objects.
159 /// \tparam ARGS Arguments of the constructor of T
160 template<class ...ARGS>
162 {
163 const auto imtPoolSize = ROOT::GetImplicitMTPoolSize();
164 fMaxSlots = (64 > imtPoolSize) ? fgMaxSlots : imtPoolSize;
165 fObjPointers = std::vector<std::shared_ptr<T>>(fMaxSlots, nullptr);
167
169 fModel.reset(Internal::TThreadedObjectUtils::Detacher<T>::Detach(new T(std::forward<ARGS>(args)...)));
170 }
171
172 /// Access a particular processing slot. This
173 /// method is *thread-unsafe*: it cannot be invoked from two different
174 /// threads with the same argument.
175 std::shared_ptr<T> GetAtSlot(unsigned i)
176 {
177 if ( i >= fObjPointers.size()) {
178 Warning("TThreadedObject::GetAtSlot", "Maximum number of slots reached.");
179 return nullptr;
180 }
181 auto objPointer = fObjPointers[i];
182 if (!objPointer) {
184 fObjPointers[i] = objPointer;
185 }
186 return objPointer;
187 }
188
189 /// Set the value of a particular slot.
190 void SetAtSlot(unsigned i, std::shared_ptr<T> v)
191 {
192 fObjPointers[i] = v;
193 }
194
195 /// Access a particular slot which corresponds to a single thread.
196 /// This is in general faster than the GetAtSlot method but it is
197 /// responsibility of the caller to make sure that an object is
198 /// initialised for the particular slot.
199 std::shared_ptr<T> GetAtSlotUnchecked(unsigned i) const
200 {
201 return fObjPointers[i];
202 }
203
204 /// Access a particular slot which corresponds to a single thread.
205 /// This overload is faster than the GetAtSlotUnchecked method but
206 /// the caller is responsible to make sure that an object is
207 /// initialised for the particular slot and that the returned pointer
208 /// will not outlive the TThreadedObject that returned it.
209 T* GetAtSlotRaw(unsigned i) const
210 {
211 return fObjPointers[i].get();
212 }
213
214 /// Access the pointer corresponding to the current slot. This method is
215 /// not adequate for being called inside tight loops as it implies a
216 /// lookup in a mapping between the threadIDs and the slot indices.
217 /// A good practice consists in copying the pointer onto the stack and
218 /// proceed with the loop as shown in this work item (psudo-code) which
219 /// will be sent to different threads:
220 /// ~~~{.cpp}
221 /// auto workItem = [](){
222 /// auto objPtr = tthreadedObject.Get();
223 /// for (auto i : ROOT::TSeqI(1000)) {
224 /// // tthreadedObject->FastMethod(i); // don't do this! Inefficient!
225 /// objPtr->FastMethod(i);
226 /// }
227 /// }
228 /// ~~~
229 std::shared_ptr<T> Get()
230 {
232 }
233
234 /// Access the wrapped object and allow to call its methods.
236 {
237 return Get().get();
238 }
239
240 /// Merge all the thread private objects. Can be called once: it does not
241 /// create any new object but destroys the present bookkeping collapsing
242 /// all objects into the one at slot 0.
243 std::shared_ptr<T> Merge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
244 {
245 // We do not return if we already merged.
246 if (fIsMerged) {
247 Warning("TThreadedObject::Merge", "This object was already merged. Returning the previous result.");
248 return fObjPointers[0];
249 }
250 mergeFunction(fObjPointers[0], fObjPointers);
251 fIsMerged = true;
252 return fObjPointers[0];
253 }
254
255 /// Merge all the thread private objects. Can be called many times. It
256 /// does create a new instance of class T to represent the "Sum" object.
257 /// This method is not thread safe: correct or acceptable behaviours
258 /// depend on the nature of T and of the merging function.
259 std::unique_ptr<T> SnapshotMerge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
260 {
261 if (fIsMerged) {
262 Warning("TThreadedObject::SnapshotMerge", "This object was already merged. Returning the previous result.");
263 return std::unique_ptr<T>(Internal::TThreadedObjectUtils::Cloner<T>::Clone(fObjPointers[0].get()));
264 }
266 std::shared_ptr<T> targetPtrShared(targetPtr, [](T *) {});
267 mergeFunction(targetPtrShared, fObjPointers);
268 return std::unique_ptr<T>(targetPtr);
269 }
270
271 private:
272 unsigned fMaxSlots; ///< The size of the instance
273 std::unique_ptr<T> fModel; ///< Use to store a "model" of the object
274 std::vector<std::shared_ptr<T>> fObjPointers; ///< A pointer per thread is kept.
275 std::vector<TDirectory*> fDirectories; ///< A TDirectory per thread is kept.
276 std::map<std::thread::id, unsigned> fThrIDSlotMap; ///< A mapping between the thread IDs and the slots
277 unsigned fCurrMaxSlotIndex = 0; ///< The maximum slot index
278 bool fIsMerged = false; ///< Remember if the objects have been merged already
279 ROOT::TSpinMutex fThrIDSlotMutex; ///< Mutex to protect the ID-slot map access
280
281 /// Get the slot number for this threadID.
283 {
284 const auto thisThreadID = std::this_thread::get_id();
285 unsigned thisIndex;
286 {
287 std::lock_guard<ROOT::TSpinMutex> lg(fThrIDSlotMutex);
288 auto thisSlotNumIt = fThrIDSlotMap.find(thisThreadID);
289 if (thisSlotNumIt != fThrIDSlotMap.end()) return thisSlotNumIt->second;
290 thisIndex = fCurrMaxSlotIndex++;
291 fThrIDSlotMap[thisThreadID] = thisIndex;
292 }
293 return thisIndex;
294 }
295
296 };
297
298 template<class T> unsigned TThreadedObject<T>::fgMaxSlots = 64;
299
300} // End ROOT namespace
301
302#include <sstream>
303
304////////////////////////////////////////////////////////////////////////////////
305/// Print a TThreadedObject at the prompt:
306
307namespace cling {
308 template<class T>
309 std::string printValue(ROOT::TThreadedObject<T> *val)
310 {
311 auto model = ((std::unique_ptr<T>*)(val))->get();
312 std::ostringstream ret;
313 ret << "A wrapper to make object instances thread private, lazily. "
314 << "The model which is replicated is " << printValue(model);
315 return ret.str();
316 }
317}
318
319
320#endif
#define d(i)
Definition: RSha256.hxx:102
void Warning(const char *location, const char *msgfmt,...)
@ kMustCleanup
Definition: TObject.h:340
#define gROOT
Definition: TROOT.h:415
typedef void((*Func_t)())
A spin mutex class which respects the STL interface for mutexes.
Definition: TSpinMutex.hxx:42
A wrapper to make object instances thread private, lazily.
std::map< std::thread::id, unsigned > fThrIDSlotMap
A mapping between the thread IDs and the slots.
unsigned fMaxSlots
The size of the instance.
T * operator->()
Access the wrapped object and allow to call its methods.
void SetAtSlot(unsigned i, std::shared_ptr< T > v)
Set the value of a particular slot.
std::shared_ptr< T > Merge(TThreadedObjectUtils::MergeFunctionType< T > mergeFunction=TThreadedObjectUtils::MergeTObjects< T >)
Merge all the thread private objects.
std::shared_ptr< T > GetAtSlot(unsigned i)
Access a particular processing slot.
std::shared_ptr< T > Get()
Access the pointer corresponding to the current slot.
TThreadedObject(const TThreadedObject &)=delete
std::unique_ptr< T > fModel
Use to store a "model" of the object.
static unsigned fgMaxSlots
The maximum number of processing slots (distinct threads) which the instances can manage.
unsigned fCurrMaxSlotIndex
The maximum slot index.
std::unique_ptr< T > SnapshotMerge(TThreadedObjectUtils::MergeFunctionType< T > mergeFunction=TThreadedObjectUtils::MergeTObjects< T >)
Merge all the thread private objects.
std::shared_ptr< T > GetAtSlotUnchecked(unsigned i) const
Access a particular slot which corresponds to a single thread.
unsigned GetThisSlotNumber()
Get the slot number for this threadID.
ROOT::TSpinMutex fThrIDSlotMutex
Mutex to protect the ID-slot map access.
T * GetAtSlotRaw(unsigned i) const
Access a particular slot which corresponds to a single thread.
std::vector< TDirectory * > fDirectories
A TDirectory per thread is kept.
TThreadedObject(ARGS &&... args)
Construct the TThreaded object and the "model" of the thread private objects.
bool fIsMerged
Remember if the objects have been merged already.
std::vector< std::shared_ptr< T > > fObjPointers
A pointer per thread is kept.
Small helper to keep current directory context.
Definition: TDirectory.h:41
Describe directory structure in memory.
Definition: TDirectory.h:34
The TH1 histogram class.
Definition: TH1.h:56
A doubly linked list.
Definition: TList.h:44
virtual void Add(TObject *obj)
Definition: TList.h:87
unsigned GetTThreadedObjectIndex()
Get the unique index identifying a TThreadedObject.
double T(double x)
Definition: ChebyshevPol.h:34
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
Definition: RExports.h:151
void MergeTObjects(std::shared_ptr< T > target, std::vector< std::shared_ptr< T > > &objs)
Merge TObjects.
std::function< void(std::shared_ptr< T >, std::vector< std::shared_ptr< T > > &)> MergeFunctionType
VSD Structures.
Definition: StringConv.hxx:21
UInt_t GetImplicitMTPoolSize()
Returns the size of the pool used for implicit multi-threading.
Definition: TROOT.cxx:618
static T * Clone(const T *obj, TDirectory *d=nullptr)
Return a copy of the object or a "Clone" if the copy constructor is not implemented.
static T * Clone(const T *obj, TDirectory *d=nullptr)
static std::vector< TDirectory * > Create(unsigned maxSlots)
static std::vector< TDirectory * > Create(unsigned maxSlots)
#define ARGS(alist)
Definition: gifencode.c:10