Logo ROOT  
Reference Guide
TReentrantRWLock.cxx
Go to the documentation of this file.
1// @(#)root/thread:$Id$
2// Authors: Enric Tejedor CERN 12/09/2016
3// Philippe Canal FNAL 12/09/2016
4
5/*************************************************************************
6 * Copyright (C) 1995-2016, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13/** \class TReentrantRWLock
14 \brief An implementation of a reentrant read-write lock with a
15 configurable internal mutex/lock (default Spin Lock).
16
17This class provides an implementation of a reentrant read-write lock
18that uses an internal lock and a condition variable to synchronize
19readers and writers when necessary.
20
21The implementation allows a single reader to take the write lock without
22releasing the reader lock. It also allows the writer to take a read lock.
23In other words, the lock is re-entrant for both reading and writing.
24
25The implementation tries to speed up the scenario where readers come
26and go but there is no writer. In that case, readers will not pay the
27price of taking the internal spin lock.
28
29Moreover, this RW lock tries to be fair with writers, giving them the
30possibility to claim the lock and wait for only the remaining readers,
31thus preventing starvation.
32*/
33
34#include "TReentrantRWLock.hxx"
35#include "ROOT/TSpinMutex.hxx"
36#include "TMutex.h"
37#include "TError.h"
38#include <assert.h>
39
40using namespace ROOT;
41
// Debug-only helper macros. When NDEBUG is defined both macros expand to
// nothing, so the checks they perform are compiled out.
42#ifdef NDEBUG
43# define R__MAYBE_AssertReadCountLocIsFromCurrentThread(READERSCOUNTLOC)
44# define R__MAYBE_ASSERT_WITH_LOCAL_LOCK(where,msg,what)
45#else
// Forwards to the member function AssertReadCountLocIsFromCurrentThread().
46# define R__MAYBE_AssertReadCountLocIsFromCurrentThread(READERSCOUNTLOC) \
47 AssertReadCountLocIsFromCurrentThread(READERSCOUNTLOC)
// Opens a scope, takes fMutex, fetches `local` via fRecurseCounts.GetLocal()
// and calls Error(where, "%s", msg) when `what` evaluates to false.
// The expression passed as `what` may refer to `local`. The lock is released
// when the scope introduced by the macro ends.
48#define R__MAYBE_ASSERT_WITH_LOCAL_LOCK(where, msg, what) \
49 { \
50 std::unique_lock<MutexT> lock(fMutex); \
51 auto local = fRecurseCounts.GetLocal(); \
52 if (!(what)) \
53 Error(where, "%s", msg); \
54 }
55#endif
56
// Constructor. Enforces that at most one UniqueLockRecurseCount is ever
// constructed: the first construction sets a function-local static flag,
// and any later construction calls ::Fatal().
// NOTE(review): the flag is a plain `static bool` that is read and written
// without synchronisation here; confirm that instances are never constructed
// concurrently.
57Internal::UniqueLockRecurseCount::UniqueLockRecurseCount()
58{
59 static bool singleton = false;
60 if (singleton) {
61 ::Fatal("UniqueLockRecurseCount Ctor", "Only one TReentrantRWLock using a UniqueLockRecurseCount is allowed.");
62 }
63 singleton = true;
64}
65
66
67////////////////////////////////////////////////////////////////////////////
68/// Acquire the lock in read mode.
  ///
  /// Three paths are taken, depending on the state observed:
  ///  - no writer is flagged: the reader proceeds without taking fMutex here;
  ///  - the calling thread is the current writer: the read lock is granted
  ///    immediately (re-entrancy);
  ///  - another thread is the writer: the reader takes fMutex and, unless it
  ///    already holds read locks, waits on fCond until fWriter is cleared.
  ///
  /// \return the hint returned by fRecurseCounts.IncrementReadCount().
69template <typename MutexT, typename RecurseCountsT>
71{
   // Announce the attempt before looking at fWriter: WriteLock() spins until
   // fReaderReservation is back to zero after it has set fWriter.
72 ++fReaderReservation;
73
74 // if (fReaders == std::numeric_limits<decltype(fReaders)>::max()) {
75 // ::Fatal("TRWSpinLock::WriteLock", "Too many recursions in TRWSpinLock!");
76 // }
77
78 auto local = fRecurseCounts.GetLocal();
79
80 TVirtualRWMutex::Hint_t *hint = nullptr;
81
82 if (!fWriter) {
83 // There is no writer, go freely to the critical section
84 ++fReaders;
85 --fReaderReservation;
86
   // fMutex is handed to IncrementReadCount since it is not held on this path.
87 hint = fRecurseCounts.IncrementReadCount(local, fMutex);
88
89 } else if (fRecurseCounts.IsCurrentWriter(local)) {
90
91 --fReaderReservation;
92 // This can run concurrently with another thread trying to get
93 // the read lock and ending up in the next section ("Wait for writers, if any")
94 // which need to also get the local readers count and thus can
95 // modify the map.
96 hint = fRecurseCounts.IncrementReadCount(local, fMutex);
97 ++fReaders;
98
99 } else {
100 // A writer claimed the RW lock, we will need to wait on the
101 // internal lock
102 --fReaderReservation;
103
104 std::unique_lock<MutexT> lock(fMutex);
105
106 // Wait for writers, if any
107 if (fWriter && fRecurseCounts.IsNotCurrentWriter(local)) {
108 auto readerCount = fRecurseCounts.GetLocalReadersCount(local);
109 if (readerCount == 0)
110 fCond.wait(lock, [this] { return !fWriter; });
111 // else
112 // There is a writer **but** we have outstanding readers
113 // locks, this must mean that the writer is actually
114 // waiting on this thread to release its read locks.
115 // This can be done in only two ways:
116 // * request the writer lock
117 // * release the reader lock
118 // Either way, this thread needs to proceed to
119 // be able to reach a point whether it does one
120 // of the two.
121 }
122
    // fMutex is already held here, hence the overload without the mutex argument.
123 hint = fRecurseCounts.IncrementReadCount(local);
124
125 // This RW lock now belongs to the readers
126 ++fReaders;
127
128 lock.unlock();
129 }
130
131 return hint;
132}
133
134//////////////////////////////////////////////////////////////////////////
135/// Release the lock in read mode.
   ///
   /// \param hint if non-null, it is reinterpreted as a pointer to the calling
   ///        thread's reader count; if null, that counter is looked up through
   ///        fRecurseCounts while holding fMutex.
136template <typename MutexT, typename RecurseCountsT>
138{
139 size_t *localReaderCount;
140 if (!hint) {
141 // This should be very rare.
142 auto local = fRecurseCounts.GetLocal();
143 std::lock_guard<MutexT> lock(fMutex);
144 localReaderCount = &(fRecurseCounts.GetLocalReadersCount(local));
145 } else {
146 localReaderCount = reinterpret_cast<size_t*>(hint);
147 }
148
149 --fReaders;
    // Only take the mutex and notify when a writer has announced itself
    // (fWriterReservation) and the global reader count has dropped to zero.
150 if (fWriterReservation && fReaders == 0) {
151 // We still need to lock here to prevent interleaving with a writer
152 std::lock_guard<MutexT> lock(fMutex);
153
154 --(*localReaderCount);
155
156 // Make sure you wake up a writer, if any
157 // Note: spurious wakeups are okay, fReaders
158 // will be checked again in WriteLock
159 fCond.notify_all();
160 } else {
161
162 --(*localReaderCount);
163 }
164}
165
166//////////////////////////////////////////////////////////////////////////
167/// Acquire the lock in write mode.
   ///
   /// The calling thread's own read locks are temporarily subtracted from
   /// fReaders so that they do not block the wait for the remaining readers;
   /// they are added back before returning.
   ///
   /// \return the address of the calling thread's reader count, cast to
   ///         TVirtualRWMutex::Hint_t*.
168template <typename MutexT, typename RecurseCountsT>
170{
    // Announce the writer: ReadUnLock() checks fWriterReservation to decide
    // whether it has to take fMutex and notify fCond.
171 ++fWriterReservation;
172
173 std::unique_lock<MutexT> lock(fMutex);
174
175 auto local = fRecurseCounts.GetLocal();
176
177 // Release this thread's reader lock(s)
178 auto &readerCount = fRecurseCounts.GetLocalReadersCount(local);
179 TVirtualRWMutex::Hint_t *hint = reinterpret_cast<TVirtualRWMutex::Hint_t *>(&readerCount);
180
181 fReaders -= readerCount;
182
183 // Wait for other writers, if any
184 if (fWriter && fRecurseCounts.IsNotCurrentWriter(local)) {
185 if (readerCount && fReaders == 0) {
186 // we decrease fReaders to zero, let's wake up the
187 // other writer.
188 fCond.notify_all();
189 }
190 fCond.wait(lock, [this] { return !fWriter; });
191 }
192
193 // Claim the lock for this writer
194 fWriter = true;
195 fRecurseCounts.SetIsWriter(local);
196
197 // Wait until all reader reservations finish
    // Note: this is a busy-wait performed while fMutex is held.
198 while (fReaderReservation) {
199 };
200
201 // Wait for remaining readers
202 fCond.wait(lock, [this] { return fReaders == 0; });
203
204 // Restore this thread's reader lock(s)
205 fReaders += readerCount;
206
207 --fWriterReservation;
208
209 lock.unlock();
210
211 return hint;
212}
213
214//////////////////////////////////////////////////////////////////////////
215/// Release the lock in write mode.
   ///
   /// Reports an error and returns without changing anything if the lock is
   /// not write-locked (fWriter is false or the write recursion count is zero).
   /// Otherwise the write recursion count is decremented; only when it reaches
   /// zero is the writer flag cleared and every thread waiting on fCond woken.
216template <typename MutexT, typename RecurseCountsT>
218{
219 // We need to lock here to prevent interleaving with a reader
220 std::lock_guard<MutexT> lock(fMutex);
221
222 if (!fWriter || fRecurseCounts.fWriteRecurse == 0) {
223 Error("TReentrantRWLock::WriteUnLock", "Write lock already released for %p", this);
224 return;
225 }
226
227 fRecurseCounts.DecrementWriteCount();
228
    // Outermost write lock released: give up ownership.
229 if (!fRecurseCounts.fWriteRecurse) {
230 fWriter = false;
231
232 auto local = fRecurseCounts.GetLocal();
233 fRecurseCounts.ResetIsWriter(local);
234
235 // Notify all potential readers/writers that are waiting
236 fCond.notify_all();
237 }
238}
239namespace {
    // Snapshot of the lock state, as filled in by GetStateBefore().
    // The template parameters are not used by the members; they only tie the
    // type to one TReentrantRWLock instantiation.
240template <typename MutexT, typename RecurseCountsT>
241struct TReentrantRWLockState: public TVirtualRWMutex::State {
242 size_t *fReadersCountLoc = nullptr; // address of the snapshotting thread's reader count
243 int fReadersCount = 0; // value of that reader count when the snapshot was taken
244 size_t fWriteRecurse = 0; // write recursion level recorded by the snapshot
245};
246
    // Difference between the current lock state and an earlier snapshot,
    // as computed by Rewind() and consumed by Apply().
247template <typename MutexT, typename RecurseCountsT>
248struct TReentrantRWLockStateDelta: public TVirtualRWMutex::StateDelta {
249 size_t *fReadersCountLoc = nullptr; // address of the thread's reader count the delta refers to
250 int fDeltaReadersCount = 0; // current reader count minus the snapshot's reader count
251 int fDeltaWriteRecurse = 0; // current write recursion level minus the snapshot's level
252};
253}
254
255//////////////////////////////////////////////////////////////////////////
256/// Get the lock state before the most recent write lock was taken.
   ///
   /// The calling thread must currently hold the write lock.
   ///
   /// \return a snapshot holding the address and current value of the calling
   ///         thread's reader count and the write recursion level minus one;
   ///         nullptr (after reporting an error) if the lock is not
   ///         write-locked or the calling thread is not the current writer.
257
258template <typename MutexT, typename RecurseCountsT>
259std::unique_ptr<TVirtualRWMutex::State>
261{
262 using State_t = TReentrantRWLockState<MutexT, RecurseCountsT>;
263
264 if (!fWriter) {
265 Error("TReentrantRWLock::GetStateBefore()", "Must be write locked!");
266 return nullptr;
267 }
268
269 auto local = fRecurseCounts.GetLocal();
270 if (fRecurseCounts.IsNotCurrentWriter(local)) {
271 Error("TReentrantRWLock::GetStateBefore()", "Not holding the write lock!");
272 return nullptr;
273 }
274
275 std::unique_ptr<State_t> pState(new State_t);
276 {
    // fMutex is held only for the lookup of the per-thread reader count.
277 std::lock_guard<MutexT> lock(fMutex);
278 pState->fReadersCountLoc = &(fRecurseCounts.GetLocalReadersCount(local));
279 }
280 pState->fReadersCount = *(pState->fReadersCountLoc);
281 // *Before* the most recent write lock (that is required by GetStateBefore())
282 // was taken, the write recursion level was `fWriteRecurse - 1`
283 pState->fWriteRecurse = fRecurseCounts.fWriteRecurse - 1;
284
285#if __GNUC__ < 7
286 // older version of gcc can not convert implicitly from
287 // unique_ptr of derived to unique_ptr of base
288 using BaseState_t = TVirtualRWMutex::State;
289 return std::unique_ptr<BaseState_t>(pState.release());
290#else
291 return pState;
292#endif
293}
294
295//////////////////////////////////////////////////////////////////////////
296/// Rewind to an earlier mutex state, returning the delta.
   ///
   /// \param earlierState snapshot to rewind to. It is static_cast to this
   ///        lock's TReentrantRWLockState type without any check, so it is
   ///        assumed to come from GetStateBefore() on a lock of the same
   ///        type - TODO confirm with callers.
   /// \return the difference between the current state and `earlierState`,
   ///         or nullptr (after reporting an error) if either count in the
   ///         snapshot is larger than the current one.
   ///
   /// If the write recursion level did not change, an error is reported but
   /// the function still goes on to rewind the read locks.
297
298template <typename MutexT, typename RecurseCountsT>
299std::unique_ptr<TVirtualRWMutex::StateDelta>
301 using State_t = TReentrantRWLockState<MutexT, RecurseCountsT>;
302 using StateDelta_t = TReentrantRWLockStateDelta<MutexT, RecurseCountsT>;
303 auto& typedState = static_cast<const State_t&>(earlierState);
304
305 R__MAYBE_AssertReadCountLocIsFromCurrentThread(typedState.fReadersCountLoc);
306
307 std::unique_ptr<StateDelta_t> pStateDelta(new StateDelta_t);
308 pStateDelta->fReadersCountLoc = typedState.fReadersCountLoc;
309 pStateDelta->fDeltaReadersCount = *typedState.fReadersCountLoc - typedState.fReadersCount;
310 pStateDelta->fDeltaWriteRecurse = fRecurseCounts.fWriteRecurse - typedState.fWriteRecurse;
311
312 if (pStateDelta->fDeltaReadersCount < 0) {
313 Error("TReentrantRWLock::Rewind", "Inconsistent read lock count!");
314 return nullptr;
315 }
316
317 if (pStateDelta->fDeltaWriteRecurse < 0) {
318 Error("TReentrantRWLock::Rewind", "Inconsistent write lock count!");
319 return nullptr;
320 }
321
322 auto hint = reinterpret_cast<TVirtualRWMutex::Hint_t *>(typedState.fReadersCountLoc);
323 if (pStateDelta->fDeltaWriteRecurse != 0) {
324 R__MAYBE_ASSERT_WITH_LOCAL_LOCK("TReentrantRWLock::Rewind",
325 "Lock rewinded from a thread that does not own the Write lock",
326 fWriter && !fRecurseCounts.IsNotCurrentWriter(local)); // local defined in macro
327
328 // Claim a recurse-state +1 to be able to call Unlock() below.
329 fRecurseCounts.fWriteRecurse = typedState.fWriteRecurse + 1;
330 // Release this thread's write lock
331 WriteUnLock(hint);
332 } else {
333 Error("TReentrantRWLock::Rewind", "has been called with no write lock held.");
334 }
335
336 if (pStateDelta->fDeltaReadersCount != 0) {
337 // Claim a recurse-state +1 to be able to call Unlock() below.
338 *typedState.fReadersCountLoc = typedState.fReadersCount + 1;
339 // ReadersCount and Readers can be read/updated before actually holding the R/W lock
340 // (see the WriteLock and ReadLock methods), but fReaders is an atomic, so that should be fine.
341 // However, this does not account for other changes to fReaders between
342 // the snapshot and the rewind ... unless the lock held is a WriteLock
343 // (the actual use case), in which case no other thread can update fReaders;
344 // we also assume that the "user code" is balanced and releases all read locks it takes.
345 fReaders = typedState.fReadersCount + 1;
346 // Release this thread's reader lock(s)
347 ReadUnLock(hint);
348 }
349 // else earlierState and *this are identical!
350
351 return std::unique_ptr<TVirtualRWMutex::StateDelta>(std::move(pStateDelta));
352}
353
354//////////////////////////////////////////////////////////////////////////
355/// Re-apply a delta.
   ///
   /// \param state delta to re-apply; it is static_cast to this lock's
   ///        TReentrantRWLockStateDelta type without any check, so it is
   ///        presumably expected to come from Rewind() - TODO confirm.
   ///
   /// Reports an error and returns without touching the lock if `state` is
   /// empty or if either of its deltas is negative. Otherwise the write lock
   /// and/or read lock is taken once and the counters are then raised by the
   /// remainder of the respective delta.
356
357template <typename MutexT, typename RecurseCountsT>
358void TReentrantRWLock<MutexT, RecurseCountsT>::Apply(std::unique_ptr<StateDelta> &&state) {
359 if (!state) {
360 Error("TReentrantRWLock::Apply", "Cannot apply empty delta!");
361 return;
362 }
363
364 using StateDelta_t = TReentrantRWLockStateDelta<MutexT, RecurseCountsT>;
365 const StateDelta_t* typedDelta = static_cast<const StateDelta_t*>(state.get());
366
367 if (typedDelta->fDeltaWriteRecurse < 0) {
368 Error("TReentrantRWLock::Apply", "Negative write recurse count delta!");
369 return;
370 }
371 if (typedDelta->fDeltaReadersCount < 0) {
372 Error("TReentrantRWLock::Apply", "Negative read count delta!");
373 return;
374 }
375 R__MAYBE_AssertReadCountLocIsFromCurrentThread(typedDelta->fReadersCountLoc);
376
377 if (typedDelta->fDeltaWriteRecurse != 0) {
378 WriteLock();
    // "- 1": presumably because WriteLock() above already accounts for one
    // recursion level - TODO confirm against SetIsWriter().
379 fRecurseCounts.fWriteRecurse += typedDelta->fDeltaWriteRecurse - 1;
380 }
381 if (typedDelta->fDeltaReadersCount != 0) {
382 ReadLock();
383 // "- 1" due to ReadLock() above.
384 fReaders += typedDelta->fDeltaReadersCount - 1;
385 *typedDelta->fReadersCountLoc += typedDelta->fDeltaReadersCount - 1;
386 }
387}
388
389//////////////////////////////////////////////////////////////////////////
390/// Assert that presumedLocalReadersCount really matches the local read count.
391/// Print an error message if not.
   ///
   /// The comparison is between addresses: the location of the calling
   /// thread's reader count is looked up while holding fMutex and compared
   /// with the pointer passed in. The counter values are not compared.
392
393template <typename MutexT, typename RecurseCountsT>
395{
396 auto local = fRecurseCounts.GetLocal();
397 size_t* localReadersCount;
398 {
    // fMutex is held only for the lookup; the comparison happens after it is released.
399 std::lock_guard<MutexT> lock(fMutex);
400 localReadersCount = &(fRecurseCounts.GetLocalReadersCount(local));
401 }
402 if (localReadersCount != presumedLocalReadersCount) {
403 Error("TReentrantRWLock::AssertReadCountLocIsFromCurrentThread", "ReadersCount is from different thread!");
404 }
405}
406
407
408namespace ROOT {
412
416
417#ifdef R__HAS_TBB
420#endif
421}
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition: TError.cxx:187
void Fatal(const char *location, const char *msgfmt,...)
Use this function in case of a fatal error. It will abort the program.
Definition: TError.cxx:245
#define R__MAYBE_AssertReadCountLocIsFromCurrentThread(READERSCOUNTLOC)
#define R__MAYBE_ASSERT_WITH_LOCAL_LOCK(where, msg, what)
void WriteUnLock(TVirtualRWMutex::Hint_t *)
Release the lock in write mode.
std::unique_ptr< StateDelta > Rewind(const State &earlierState)
Rewind to an earlier mutex state, returning the delta.
TVirtualRWMutex::Hint_t * ReadLock()
Acquire the lock in read mode.
TVirtualRWMutex::Hint_t * WriteLock()
Acquire the lock in write mode.
void ReadUnLock(TVirtualRWMutex::Hint_t *)
Release the lock in read mode.
void AssertReadCountLocIsFromCurrentThread(const size_t *presumedLocalReadersCount)
Assert that presumedLocalReadersCount really matches the local read count.
std::unique_ptr< State > GetStateBefore()
Get the lock state before the most recent write lock was taken.
void Apply(std::unique_ptr< StateDelta > &&delta)
Re-apply a delta.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
State as returned by GetStateDelta() that can be passed to Restore()
Earlier lock state as returned by GetState() that can be passed to Restore()