Logo ROOT   6.08/07
Reference Guide
TThreadedObject.hxx
Go to the documentation of this file.
1 // @(#)root/thread:$Id$
2 // Author: Danilo Piparo, CERN 11/2/2016
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2016, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TThreadedObject
13 #define ROOT_TThreadedObject
14 
15 #include "TList.h"
16 #include "TError.h"
17 
18 #include <functional>
19 #include <map>
20 #include <memory>
21 #include <mutex>
22 #include <string>
23 #include <thread>
24 #include <vector>
25 
26 #include "ROOT/TSpinMutex.hxx"
27 #include "TROOT.h"
28 
29 namespace ROOT {
30 
31  namespace Internal {
32 
33  namespace TThreadedObjectUtils {
34 
/// Hand out the next index used to tell TThreadedObject instances apart.
/// The counter is a function-local static that starts at 0 and grows by one
/// on every call; the value returned is the one held *before* the increment.
inline unsigned GetTThreadedObjectIndex()
{
   static unsigned nextIndex = 0;
   const unsigned assignedIndex = nextIndex;
   ++nextIndex;
   return assignedIndex;
}
40 
41  /// Return a copy of the object or a "Clone" if the copy constructor is not implemented.
42  template<class T, bool isCopyConstructible = std::is_copy_constructible<T>::value>
43  struct Cloner {
44  static T *Clone(const T *obj, TDirectory* d = nullptr) {
45  T* clone;
46  if (d){
47  TDirectory::TContext ctxt(d);
48  clone = new T(*obj);
49  } else {
50  clone = new T(*obj);
51  }
52  return clone;
53  }
54  };
55 
56  template<class T>
57  struct Cloner<T, false> {
58  static T *Clone(const T *obj, TDirectory* d = nullptr) {
59  T* clone;
60  if (d){
61  TDirectory::TContext ctxt(d);
62  clone = (T*)obj->Clone();
63  } else {
64  clone = (T*)obj->Clone();
65  }
66  return clone;
67  }
68  };
69 
70  } // End of namespace TThreadedObjectUtils
71  } // End of namespace Internal
72 
73  namespace TThreadedObjectUtils {
74 
/// Signature of the callables accepted as merge functions: the first argument
/// is the target that receives the result, the second the collection of
/// objects to be merged into it.
template<class T>
using MergeFunctionType =
   std::function<void(std::shared_ptr<T>, std::vector<std::shared_ptr<T>> &)>;
77  /// Merge TObjects
78  template<class T>
79  void MergeTObjects(std::shared_ptr<T> target, std::vector<std::shared_ptr<T>> &objs)
80  {
81  if (!target) return;
82  TList objTList;
83  // Cannot do better than this
84  for (auto obj : objs) {
85  if (obj && obj != target) objTList.Add(obj.get());
86  }
87  target->Merge(&objTList);
88  }
89  } // end of namespace TThreadedObjectUtils
90 
91  /**
92  * \class ROOT::TThreadedObject
93  * \brief A wrapper to make object instances thread private, lazily.
94  * \tparam T Class of the object to be made thread private (e.g. TH1F)
95  * \ingroup Multicore
96  *
97  * A wrapper which makes objects thread private. The methods of the underlying
98  * object can be invoked via the arrow operator. The object is created in
99  * a specific thread lazily, i.e. upon invocation of one of its methods.
100  * The correct object pointer from within a particular thread can be accessed
101  * with the overloaded arrow operator or with the Get method.
102  * In case an elaborate thread management is in place, e.g. in presence of
103  * stream of operations or "processing slots", it is also possible to
104  * manually select the correct object pointer explicitly.
105  */
106  template<class T>
108  public:
109  static unsigned fgMaxSlots; ///< The maximum number of processing slots (distinct threads) which the instances can manage
110  TThreadedObject(const TThreadedObject&) = delete;
111  /// Construct the TThreaded object and the "model" of the thread private
112  /// objects.
113  /// \tparam ARGS Arguments of the constructor of T
114  template<class ...ARGS>
115  TThreadedObject(ARGS&&... args): fObjPointers(fgMaxSlots, nullptr)
116  {
117  fDirectories.reserve(fgMaxSlots);
118 
119  std::string dirName = "__TThreaded_dir_";
120  dirName += std::to_string(ROOT::Internal::TThreadedObjectUtils::GetTThreadedObjectIndex()) + "_";
121  for (unsigned i=0; i< fgMaxSlots;++i) {
122  fDirectories.emplace_back(gROOT->mkdir((dirName+std::to_string(i)).c_str()));
123  }
124 
125  TDirectory::TContext ctxt(fDirectories[0]);
126  fModel.reset(new T(std::forward<ARGS>(args)...));
127  }
128 
129  /// Access a particular processing slot. This
130  /// method is *thread-unsafe*: it cannot be invoked from two different
131  /// threads with the same argument.
132  std::shared_ptr<T> GetAtSlot(unsigned i)
133  {
134  if ( i >= fObjPointers.size()) {
135  Warning("TThreadedObject::Merge", "Maximum number of slots reached.");
136  return nullptr;
137  }
138  auto objPointer = fObjPointers[i];
139  if (!objPointer) {
140  objPointer.reset(Internal::TThreadedObjectUtils::Cloner<T>::Clone(fModel.get(), fDirectories[i]));
141  fObjPointers[i] = objPointer;
142  }
143  return objPointer;
144  }
145 
146  /// Access a particular slot which corresponds to a single thread.
147  /// This is in general faster than the GetAtSlot method but it is
148  /// responsibility of the caller to make sure that an object is
149  /// initialised for the particular slot.
150  std::shared_ptr<T> GetAtSlotUnchecked(unsigned i) const
151  {
152  return fObjPointers[i];
153  }
154 
155  /// Access the pointer corresponding to the current slot. This method is
156  /// not adequate for being called inside tight loops as it implies a
157  /// lookup in a mapping between the threadIDs and the slot indices.
158  /// A good practice consists in copying the pointer onto the stack and
159  /// proceed with the loop as shown in this work item (psudo-code) which
160  /// will be sent to different threads:
161  /// ~~~{.cpp}
162  /// auto workItem = [](){
163  /// auto objPtr = tthreadedObject.Get();
164  /// for (auto i : ROOT::TSeqI(1000)) {
165  /// // tthreadedObject->FastMethod(i); // don't do this! Inefficient!
166  /// objPtr->FastMethod(i);
167  /// }
168  /// }
169  /// ~~~
170  std::shared_ptr<T> Get()
171  {
172  return GetAtSlot(GetThisSlotNumber());
173  }
174 
175  /// Access the wrapped object and allow to call its methods.
177  {
178  return Get().get();
179  }
180 
181  /// Merge all the thread private objects. Can be called once: it does not
182  /// create any new object but destroys the present bookkeping collapsing
183  /// all objects into the one at slot 0.
184  std::shared_ptr<T> Merge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
185  {
186  // We do not return if we already merged.
187  if (fIsMerged) {
188  Warning("TThreadedObject::Merge", "This object was already merged. Returning the previous result.");
189  return fObjPointers[0];
190  }
191  mergeFunction(fObjPointers[0], fObjPointers);
192  fIsMerged = true;
193  return fObjPointers[0];
194  }
195 
196  /// Merge all the thread private objects. Can be called many times. It
197  /// does create a new instance of class T to represent the "Sum" object.
198  /// This method is not thread safe: correct or acceptable behaviours
199  /// depend on the nature of T and of the merging function.
200  std::unique_ptr<T> SnapshotMerge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
201  {
202  if (fIsMerged) {
203  Warning("TThreadedObject::SnapshotMerge", "This object was already merged. Returning the previous result.");
204  return std::unique_ptr<T>(Internal::TThreadedObjectUtils::Cloner<T>::Clone(fObjPointers[0].get()));
205  }
206  auto targetPtr = Internal::TThreadedObjectUtils::Cloner<T>::Clone(fModel.get());
207  std::shared_ptr<T> targetPtrShared(targetPtr, [](T *) {});
208  mergeFunction(targetPtrShared, fObjPointers);
209  return std::unique_ptr<T>(targetPtr);
210  }
211 
212  private:
213  std::unique_ptr<T> fModel; ///< Use to store a "model" of the object
214  std::vector<std::shared_ptr<T>> fObjPointers; ///< A pointer per thread is kept.
215  std::vector<TDirectory*> fDirectories; ///< A TDirectory per thread is kept.
216  std::map<std::thread::id, unsigned> fThrIDSlotMap; ///< A mapping between the thread IDs and the slots
217  unsigned fCurrMaxSlotIndex = 0; ///< The maximum slot index
218  bool fIsMerged = false; ///< Remember if the objects have been merged already
219  ROOT::TSpinMutex fThrIDSlotMutex; ///< Mutex to protect the ID-slot map access
220 
221  /// Get the slot number for this threadID.
222  unsigned GetThisSlotNumber()
223  {
224  const auto thisThreadID = std::this_thread::get_id();
225  unsigned thisIndex;
226  {
227  std::lock_guard<ROOT::TSpinMutex> lg(fThrIDSlotMutex);
228  auto thisSlotNumIt = fThrIDSlotMap.find(thisThreadID);
229  if (thisSlotNumIt != fThrIDSlotMap.end()) return thisSlotNumIt->second;
230  thisIndex = fCurrMaxSlotIndex++;
231  fThrIDSlotMap[thisThreadID] = thisIndex;
232  }
233  return thisIndex;
234  }
235 
236  };
237 
238  template<class T> unsigned TThreadedObject<T>::fgMaxSlots = 64;
239 
240 } // End ROOT namespace
241 
242 #include <sstream>
243 
244 ////////////////////////////////////////////////////////////////////////////////
245 /// Print a TThreadedObject at the prompt:
246 
247 namespace cling {
248  template<class T>
250  {
251  auto model = ((std::unique_ptr<T>*)(val))->get();
252  std::ostringstream ret;
253  ret << "A wrapper to make object instances thread private, lazily. "
254  << "The model which is replicated is " << printValue(model);
255  return ret.str();
256  }
257 }
258 
259 
260 #endif
std::shared_ptr< T > GetAtSlotUnchecked(unsigned i) const
Access a particular slot which corresponds to a single thread.
#define ARGS(alist)
Definition: gifencode.c:10
T * operator->()
Access the wrapped object and allow to call its methods.
This namespace contains pre-defined functions to be used in conjunction with TExecutor::Map and TExecu...
Definition: StringConv.hxx:21
double T(double x)
Definition: ChebyshevPol.h:34
A wrapper to make object instances thread private, lazily.
#define gROOT
Definition: TROOT.h:364
std::string printValue(ROOT::TThreadedObject< T > *val)
A spin mutex class which respects the STL interface for mutexes.
Definition: TSpinMutex.hxx:40
std::unique_ptr< T > fModel
Use to store a "model" of the object.
std::shared_ptr< T > Get()
Access the pointer corresponding to the current slot.
std::vector< std::shared_ptr< T > > fObjPointers
A pointer per thread is kept.
std::map< std::thread::id, unsigned > fThrIDSlotMap
A mapping between the thread IDs and the slots.
A doubly linked list.
Definition: TList.h:47
unsigned GetThisSlotNumber()
Get the slot number for this threadID.
static unsigned fgMaxSlots
The maximum number of processing slots (distinct threads) which the instances can manage...
std::shared_ptr< T > GetAtSlot(unsigned i)
Access a particular processing slot.
void Warning(const char *location, const char *msgfmt,...)
void MergeTObjects(std::shared_ptr< T > target, std::vector< std::shared_ptr< T >> &objs)
Merge TObjects.
static T * Clone(const T *obj, TDirectory *d=nullptr)
Describe directory structure in memory.
Definition: TDirectory.h:44
Print a TSeq at the prompt:
Definition: TDatime.h:114
std::shared_ptr< T > Merge(TThreadedObjectUtils::MergeFunctionType< T > mergeFunction=TThreadedObjectUtils::MergeTObjects< T >)
Merge all the thread private objects.
std::vector< TDirectory * > fDirectories
A TDirectory per thread is kept.
ROOT::TSpinMutex fThrIDSlotMutex
Mutex to protect the ID-slot map access.
std::function< void(std::shared_ptr< T >, std::vector< std::shared_ptr< T > > &)> MergeFunctionType
#define nullptr
Definition: Rtypes.h:87
virtual void Add(TObject *obj)
Definition: TList.h:81
static T * Clone(const T *obj, TDirectory *d=nullptr)
unsigned GetTThreadedObjectIndex()
Get the unique index identifying a TThreadedObject.
TThreadedObject(ARGS &&... args)
Construct the TThreaded object and the "model" of the thread private objects.
Return a copy of the object or a "Clone" if the copy constructor is not implemented.
std::unique_ptr< T > SnapshotMerge(TThreadedObjectUtils::MergeFunctionType< T > mergeFunction=TThreadedObjectUtils::MergeTObjects< T >)
Merge all the thread private objects.