Logo ROOT  
Reference Guide
TThreadedObject.hxx
Go to the documentation of this file.
1 // @(#)root/thread:$Id$
2 // Author: Danilo Piparo, CERN 11/2/2016
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TThreadedObject
13 #define ROOT_TThreadedObject
14 
15 #include "ROOT/TSpinMutex.hxx"
16 #include "TDirectory.h"
17 #include "TError.h"
18 #include "TList.h"
19 #include "TROOT.h"
20 
21 
22 #include <algorithm>
23 #include <exception>
24 #include <deque>
25 #include <functional>
26 #include <map>
27 #include <memory>
28 #include <mutex>
29 #include <sstream>
30 #include <string>
31 #include <thread>
32 #include <vector>
33 
34 class TH1;
35 
36 namespace ROOT {
37 
38  /**
39  * \class ROOT::TNumSlots
40  * \brief Defines the number of threads in some of ROOT's interfaces.
41  */
42  struct TNumSlots {
43  unsigned int fVal; // number of slots
44  friend bool operator==(TNumSlots lhs, TNumSlots rhs) { return lhs.fVal == rhs.fVal; }
45  friend bool operator!=(TNumSlots lhs, TNumSlots rhs) { return lhs.fVal != rhs.fVal; }
46  };
47 
48  namespace Internal {
49 
50  namespace TThreadedObjectUtils {
51 
52 
53  template<typename T, bool ISHISTO = std::is_base_of<TH1,T>::value>
54  struct Detacher{
55  static T* Detach(T* obj) {
56  return obj;
57  }
58  };
59 
60  template<typename T>
61  struct Detacher<T, true>{
62  static T* Detach(T* obj) {
63  obj->SetDirectory(nullptr);
64  obj->ResetBit(kMustCleanup);
65  return obj;
66  }
67  };
68 
69  /// Return a copy of the object or a "Clone" if the copy constructor is not implemented.
70  template<class T, bool isCopyConstructible = std::is_copy_constructible<T>::value>
71  struct Cloner {
72  static T *Clone(const T *obj, TDirectory* d = nullptr) {
73  T* clone;
74  if (d){
75  TDirectory::TContext ctxt(d);
76  clone = new T(*obj);
77  } else {
78  clone = new T(*obj);
79  }
80  return Detacher<T>::Detach(clone);
81  }
82  };
83 
84  template<class T>
85  struct Cloner<T, false> {
86  static T *Clone(const T *obj, TDirectory* d = nullptr) {
87  T* clone;
88  if (d){
89  TDirectory::TContext ctxt(d);
90  clone = (T*)obj->Clone();
91  } else {
92  clone = (T*)obj->Clone();
93  }
94  return clone;
95  }
96  };
97 
98  template <class T, bool ISHISTO = std::is_base_of<TH1, T>::value>
99  struct DirCreator {
100  static TDirectory *Create()
101  {
102  static unsigned dirCounter = 0;
103  const std::string dirName = "__TThreaded_dir_" + std::to_string(dirCounter++) + "_";
104  return gROOT->mkdir(dirName.c_str());
105  }
106  };
107 
108  template <class T>
109  struct DirCreator<T, true> {
110  static TDirectory *Create() { return nullptr; }
111  };
112 
113  } // End of namespace TThreadedObjectUtils
114  } // End of namespace Internal
115 
116  namespace TThreadedObjectUtils {
117 
118  template<class T>
119  using MergeFunctionType = std::function<void(std::shared_ptr<T>, std::vector<std::shared_ptr<T>>&)>;
120 
121  /// Merge TObjects
122  template<class T>
123  void MergeTObjects(std::shared_ptr<T> target, std::vector<std::shared_ptr<T>> &objs)
124  {
125  if (!target) return;
126  TList objTList;
127  // Cannot do better than this
128  for (auto obj : objs) {
129  if (obj && obj != target) objTList.Add(obj.get());
130  }
131  target->Merge(&objTList);
132  }
133  } // end of namespace TThreadedObjectUtils
134 
135  /**
136  * \class ROOT::TThreadedObject
137  * \brief A wrapper to make object instances thread private, lazily.
138  * \tparam T Class of the object to be made thread private (e.g. TH1F)
139  * \ingroup Multicore
140  *
141  * A wrapper which makes objects thread private. The methods of the underlying
142  * object can be invoked via the arrow operator. The object is created in
143  * a specific thread lazily, i.e. upon invocation of one of its methods.
144  * The correct object pointer from within a particular thread can be accessed
145  * with the overloaded arrow operator or with the Get method.
146  * In case an elaborate thread management is in place, e.g. in presence of
147  * stream of operations or "processing slots", it is also possible to
148  * manually select the correct object pointer explicitly.
149  */
150  template<class T>
152  public:
153  /// The initial number of empty processing slots that a TThreadedObject is constructed with by default.
154  /// Deprecated: TThreadedObject grows as more slots are required.
155  static constexpr const TNumSlots fgMaxSlots{64};
156 
158 
159  /// Construct the TThreadedObject with initSlots empty slots and the "model" of the thread private objects.
160  /// \param initSlots Set the initial number of slots of the TThreadedObject.
161  /// \tparam ARGS Arguments of the constructor of T
162  ///
163  /// This form of the constructor is useful to manually pre-set the content of a given number of slots
164  /// when used in combination with TThreadedObject::SetAtSlot().
165  template <class... ARGS>
166  TThreadedObject(TNumSlots initSlots, ARGS &&... args) : fIsMerged(false)
167  {
168  const auto nSlots = initSlots.fVal;
169  fObjPointers.resize(nSlots);
170 
171  // create at least one directory (we need it for fModel), plus others as needed by the size of fObjPointers
173  for (auto i = 1u; i < nSlots; ++i)
175 
177  fModel.reset(Internal::TThreadedObjectUtils::Detacher<T>::Detach(new T(std::forward<ARGS>(args)...)));
178  }
179 
180  /// Construct the TThreadedObject and the "model" of the thread private objects.
181  /// \tparam ARGS Arguments of the constructor of T
182  template<class ...ARGS>
183  TThreadedObject(ARGS&&... args) : TThreadedObject(fgMaxSlots, args...) { }
184 
185  /// Return the number of currently available slot.
186  ///
187  /// The method is safe to call concurrently to other TThreadedObject methods.
188  /// Note that slots could be available but contain no data (i.e. a nullptr) if
189  /// they have not been used yet.
190  unsigned GetNSlots() const
191  {
192  std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
193  return fObjPointers.size();
194  }
195 
196  /// Access a particular processing slot.
197  ///
198  /// This method is thread-safe as long as concurrent calls request different slots (i.e. pass a different
199  /// argument) and no thread accesses slot `i` via the arrow operator, so mixing usage of GetAtSlot
200  /// with usage of the arrow operator can be dangerous.
201  std::shared_ptr<T> GetAtSlot(unsigned i)
202  {
203  std::size_t nAvailableSlots;
204  {
205  // fObjPointers can grow due to a concurrent operation on this TThreadedObject, need to lock
206  std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
207  nAvailableSlots = fObjPointers.size();
208  }
209 
210  if (i >= nAvailableSlots) {
211  Warning("TThreadedObject::GetAtSlot", "This slot does not exist.");
212  return nullptr;
213  }
214 
215  auto &objPointer = fObjPointers[i];
216  if (!objPointer)
218  return objPointer;
219  }
220 
221  /// Set the value of a particular slot.
222  ///
223  /// This method is thread-safe as long as concurrent calls access different slots (i.e. pass a different
224  /// argument) and no thread accesses slot `i` via the arrow operator, so mixing usage of SetAtSlot
225  /// with usage of the arrow operator can be dangerous.
226  void SetAtSlot(unsigned i, std::shared_ptr<T> v)
227  {
228  std::size_t nAvailableSlots;
229  {
230  // fObjPointers can grow due to a concurrent operation on this TThreadedObject, need to lock
231  std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
232  nAvailableSlots = fObjPointers.size();
233  }
234 
235  if (i >= nAvailableSlots) {
236  Warning("TThreadedObject::SetAtSlot", "This slot does not exist, doing nothing.");
237  return;
238  }
239 
240  fObjPointers[i] = v;
241  }
242 
243  /// Access a particular slot which corresponds to a single thread.
244  /// This is in general faster than the GetAtSlot method but it is
245  /// responsibility of the caller to make sure that the slot exists
246  /// and to check that the contained object is initialized (and not
247  /// a nullptr).
248  std::shared_ptr<T> GetAtSlotUnchecked(unsigned i) const
249  {
250  return fObjPointers[i];
251  }
252 
253  /// Access a particular slot which corresponds to a single thread.
254  /// This overload is faster than the GetAtSlotUnchecked method but
255  /// the caller is responsible to make sure that the slot exists, to
256  /// check that the contained object is initialized and that the returned
257  /// pointer will not outlive the TThreadedObject that returned it, which
258  /// maintains ownership of the actual object.
259  T* GetAtSlotRaw(unsigned i) const
260  {
261  return fObjPointers[i].get();
262  }
263 
264  /// Access the pointer corresponding to the current slot. This method is
265  /// not adequate for being called inside tight loops as it implies a
266  /// lookup in a mapping between the threadIDs and the slot indices.
267  /// A good practice consists in copying the pointer onto the stack and
268  /// proceed with the loop as shown in this work item (psudo-code) which
269  /// will be sent to different threads:
270  /// ~~~{.cpp}
271  /// auto workItem = [](){
272  /// auto objPtr = tthreadedObject.Get();
273  /// for (auto i : ROOT::TSeqI(1000)) {
274  /// // tthreadedObject->FastMethod(i); // don't do this! Inefficient!
275  /// objPtr->FastMethod(i);
276  /// }
277  /// }
278  /// ~~~
279  std::shared_ptr<T> Get()
280  {
281  return GetAtSlot(GetThisSlotNumber());
282  }
283 
284  /// Access the wrapped object and allow to call its methods.
286  {
287  return Get().get();
288  }
289 
290  /// Merge all the thread private objects. Can be called once: it does not
291  /// create any new object but destroys the present bookkeping collapsing
292  /// all objects into the one at slot 0.
293  std::shared_ptr<T> Merge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
294  {
295  // We do not return if we already merged.
296  if (fIsMerged) {
297  Warning("TThreadedObject::Merge", "This object was already merged. Returning the previous result.");
298  return fObjPointers[0];
299  }
300  // need to convert to std::vector because historically mergeFunction requires a vector
301  auto vecOfObjPtrs = std::vector<std::shared_ptr<T>>(fObjPointers.begin(), fObjPointers.end());
302  mergeFunction(fObjPointers[0], vecOfObjPtrs);
303  fIsMerged = true;
304  return fObjPointers[0];
305  }
306 
307  /// Merge all the thread private objects. Can be called many times. It
308  /// does create a new instance of class T to represent the "Sum" object.
309  /// This method is not thread safe: correct or acceptable behaviours
310  /// depend on the nature of T and of the merging function.
311  std::unique_ptr<T> SnapshotMerge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
312  {
313  if (fIsMerged) {
314  Warning("TThreadedObject::SnapshotMerge", "This object was already merged. Returning the previous result.");
315  return std::unique_ptr<T>(Internal::TThreadedObjectUtils::Cloner<T>::Clone(fObjPointers[0].get()));
316  }
318  std::shared_ptr<T> targetPtrShared(targetPtr, [](T *) {});
319  // need to convert to std::vector because historically mergeFunction requires a vector
320  auto vecOfObjPtrs = std::vector<std::shared_ptr<T>>(fObjPointers.begin(), fObjPointers.end());
321  mergeFunction(targetPtrShared, vecOfObjPtrs);
322  return std::unique_ptr<T>(targetPtr);
323  }
324 
325  private:
326  std::unique_ptr<T> fModel; ///< Use to store a "model" of the object
327  // std::deque's guarantee that references to the elements are not invalidated when appending new slots
328  std::deque<std::shared_ptr<T>> fObjPointers; ///< An object pointer per slot
329  // If the object is a histogram, we also create dummy directories that the histogram associates with
330  // so we do not pollute gDirectory
331  std::deque<TDirectory*> fDirectories; ///< A TDirectory per slot
332  std::map<std::thread::id, unsigned> fThrIDSlotMap; ///< A mapping between the thread IDs and the slots
333  mutable ROOT::TSpinMutex fSpinMutex; ///< Protects concurrent access to fThrIDSlotMap, fObjPointers
334  bool fIsMerged : 1; ///< Remember if the objects have been merged already
335 
336  /// Get the slot number for this threadID, make a slot if needed
337  unsigned GetThisSlotNumber()
338  {
339  const auto thisThreadID = std::this_thread::get_id();
340  std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
341  const auto thisSlotNumIt = fThrIDSlotMap.find(thisThreadID);
342  if (thisSlotNumIt != fThrIDSlotMap.end())
343  return thisSlotNumIt->second;
344  const auto newIndex = fThrIDSlotMap.size();
345  fThrIDSlotMap[thisThreadID] = newIndex;
346  R__ASSERT(newIndex <= fObjPointers.size() && "This should never happen, we should create new slots as needed");
347  if (newIndex == fObjPointers.size()) {
349  fObjPointers.emplace_back(nullptr);
350  }
351  return newIndex;
352  }
353  };
354 
355  template<class T>
356  constexpr const TNumSlots TThreadedObject<T>::fgMaxSlots;
357 
358 } // End ROOT namespace
359 
360 ////////////////////////////////////////////////////////////////////////////////
361 /// Print a TThreadedObject at the prompt:
362 
363 namespace cling {
364  template<class T>
365  std::string printValue(ROOT::TThreadedObject<T> *val)
366  {
367  auto model = ((std::unique_ptr<T>*)(val))->get();
368  std::ostringstream ret;
369  ret << "A wrapper to make object instances thread private, lazily. "
370  << "The model which is replicated is " << printValue(model);
371  return ret.str();
372  }
373 }
374 
375 
376 #endif
ROOT::TThreadedObject::TThreadedObject
TThreadedObject(const TThreadedObject &)=delete
ROOT::TThreadedObject::fObjPointers
std::deque< std::shared_ptr< T > > fObjPointers
An object pointer per slot.
Definition: TThreadedObject.hxx:328
ROOT::TNumSlots::operator!=
friend bool operator!=(TNumSlots lhs, TNumSlots rhs)
Definition: TThreadedObject.hxx:45
Warning
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition: TError.cxx:231
TDirectory.h
ROOT::TThreadedObject::fIsMerged
bool fIsMerged
Remember if the objects have been merged already.
Definition: TThreadedObject.hxx:334
ROOT::TThreadedObjectUtils::MergeFunctionType
std::function< void(std::shared_ptr< T >, std::vector< std::shared_ptr< T > > &)> MergeFunctionType
Definition: TThreadedObject.hxx:119
ROOT::TSpinMutex
A spin mutex class which respects the STL interface for mutexes.
Definition: TSpinMutex.hxx:42
ROOT::TThreadedObject::Get
std::shared_ptr< T > Get()
Access the pointer corresponding to the current slot.
Definition: TThreadedObject.hxx:279
ROOT::TThreadedObject::operator->
T * operator->()
Access the wrapped object and allow to call its methods.
Definition: TThreadedObject.hxx:285
TSpinMutex.hxx
ROOT::TThreadedObject::fModel
std::unique_ptr< T > fModel
Use to store a "model" of the object.
Definition: TThreadedObject.hxx:326
TDirectory::TContext
TDirectory::TContext keeps track and restore the current directory.
Definition: TDirectory.h:89
TList.h
ROOT::TThreadedObject::fThrIDSlotMap
std::map< std::thread::id, unsigned > fThrIDSlotMap
A mapping between the thread IDs and the slots.
Definition: TThreadedObject.hxx:332
ROOT::TThreadedObject::SetAtSlot
void SetAtSlot(unsigned i, std::shared_ptr< T > v)
Set the value of a particular slot.
Definition: TThreadedObject.hxx:226
v
@ v
Definition: rootcling_impl.cxx:3664
ROOT::Internal::TThreadedObjectUtils::Detacher
Definition: TThreadedObject.hxx:54
ROOT::TThreadedObject::TThreadedObject
TThreadedObject(TNumSlots initSlots, ARGS &&... args)
Construct the TThreadedObject with initSlots empty slots and the "model" of the thread private object...
Definition: TThreadedObject.hxx:166
TROOT.h
ROOT::TThreadedObject::GetAtSlotRaw
T * GetAtSlotRaw(unsigned i) const
Access a particular slot which corresponds to a single thread.
Definition: TThreadedObject.hxx:259
ROOT::Internal::TThreadedObjectUtils::Detacher::Detach
static T * Detach(T *obj)
Definition: TThreadedObject.hxx:55
ROOT::TThreadedObject::fDirectories
std::deque< TDirectory * > fDirectories
A TDirectory per slot.
Definition: TThreadedObject.hxx:331
ROOT::TThreadedObject::GetThisSlotNumber
unsigned GetThisSlotNumber()
Get the slot number for this threadID, make a slot if needed.
Definition: TThreadedObject.hxx:337
ROOT::TNumSlots::fVal
unsigned int fVal
Definition: TThreadedObject.hxx:43
ROOT::TNumSlots
Defines the number of threads in some of ROOT's interfaces.
Definition: TThreadedObject.hxx:42
ROOT::R::function
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
Definition: RExports.h:151
ROOT::TThreadedObject::fgMaxSlots
static constexpr const TNumSlots fgMaxSlots
The initial number of empty processing slots that a TThreadedObject is constructed with by default.
Definition: TThreadedObject.hxx:155
ROOT::Internal::TThreadedObjectUtils::DirCreator::Create
static TDirectory * Create()
Definition: TThreadedObject.hxx:100
ROOT::TThreadedObject::GetNSlots
unsigned GetNSlots() const
Return the number of currently available slot.
Definition: TThreadedObject.hxx:190
ROOT::TThreadedObject
A wrapper to make object instances thread private, lazily.
Definition: TThreadedObject.hxx:151
ROOT::TNumSlots::operator==
friend bool operator==(TNumSlots lhs, TNumSlots rhs)
Definition: TThreadedObject.hxx:44
ROOT::TThreadedObject::GetAtSlot
std::shared_ptr< T > GetAtSlot(unsigned i)
Access a particular processing slot.
Definition: TThreadedObject.hxx:201
void
typedef void((*Func_t)())
ROOT::Internal::TThreadedObjectUtils::Cloner
Return a copy of the object or a "Clone" if the copy constructor is not implemented.
Definition: TThreadedObject.hxx:71
R__ASSERT
#define R__ASSERT(e)
Definition: TError.h:118
ROOT::Internal::TThreadedObjectUtils::Cloner< T, false >::Clone
static T * Clone(const T *obj, TDirectory *d=nullptr)
Definition: TThreadedObject.hxx:86
kMustCleanup
@ kMustCleanup
Definition: TObject.h:355
TList::Add
virtual void Add(TObject *obj)
Definition: TList.h:87
TH1
TH1 is the base class of all histogram classes in ROOT.
Definition: TH1.h:58
ROOT::TThreadedObject::TThreadedObject
TThreadedObject(ARGS &&... args)
Construct the TThreadedObject and the "model" of the thread private objects.
Definition: TThreadedObject.hxx:183
ROOT::Math::Chebyshev::T
double T(double x)
Definition: ChebyshevPol.h:34
d
#define d(i)
Definition: RSha256.hxx:102
TDirectory
Describe directory structure in memory.
Definition: TDirectory.h:45
ROOT::TThreadedObject::SnapshotMerge
std::unique_ptr< T > SnapshotMerge(TThreadedObjectUtils::MergeFunctionType< T > mergeFunction=TThreadedObjectUtils::MergeTObjects< T >)
Merge all the thread private objects.
Definition: TThreadedObject.hxx:311
ROOT::TThreadedObject::GetAtSlotUnchecked
std::shared_ptr< T > GetAtSlotUnchecked(unsigned i) const
Access a particular slot which corresponds to a single thread.
Definition: TThreadedObject.hxx:248
make_cnn_model.model
model
Definition: make_cnn_model.py:6
ARGS
#define ARGS(alist)
Definition: gifencode.c:10
ROOT::Internal::TThreadedObjectUtils::DirCreator
Definition: TThreadedObject.hxx:99
ROOT::TThreadedObject::Merge
std::shared_ptr< T > Merge(TThreadedObjectUtils::MergeFunctionType< T > mergeFunction=TThreadedObjectUtils::MergeTObjects< T >)
Merge all the thread private objects.
Definition: TThreadedObject.hxx:293
ROOT::TThreadedObjectUtils::MergeTObjects
void MergeTObjects(std::shared_ptr< T > target, std::vector< std::shared_ptr< T >> &objs)
Merge TObjects.
Definition: TThreadedObject.hxx:123
ROOT::TThreadedObject::fSpinMutex
ROOT::TSpinMutex fSpinMutex
Protects concurrent access to fThrIDSlotMap, fObjPointers.
Definition: TThreadedObject.hxx:333
ROOT
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Definition: EExecutionPolicy.hxx:4
ROOT::Internal::TThreadedObjectUtils::DirCreator< T, true >::Create
static TDirectory * Create()
Definition: TThreadedObject.hxx:110
TList
A doubly linked list.
Definition: TList.h:44
gROOT
#define gROOT
Definition: TROOT.h:406
ROOT::Internal::TThreadedObjectUtils::Detacher< T, true >::Detach
static T * Detach(T *obj)
Definition: TThreadedObject.hxx:62
TError.h
ROOT::Internal::TThreadedObjectUtils::Cloner::Clone
static T * Clone(const T *obj, TDirectory *d=nullptr)
Definition: TThreadedObject.hxx:72