Logo ROOT  
Reference Guide
RClusterPool.cxx
Go to the documentation of this file.
1/// \file RClusterPool.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2020-03-11
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RClusterPool.hxx>
18#include <ROOT/RPageStorage.hxx>
19
20#include <TError.h>
21
22#include <algorithm>
23#include <chrono>
24#include <future>
25#include <iostream>
26#include <iterator>
27#include <map>
28#include <memory>
29#include <mutex>
30#include <set>
31#include <utility>
32
34{
// NOTE(review): the function signature and the leading cluster-id comparison
// (original lines 33/35, per the member doc "first order by cluster id") are
// not visible in this extract; the code below is the tie-break on column sets.
 36 if (fClusterKey.fColumnSet.size() == other.fClusterKey.fColumnSet.size()) {
// Equal-sized sets: compare element-wise; the first differing column id decides.
 37 for (auto itr1 = fClusterKey.fColumnSet.begin(), itr2 = other.fClusterKey.fColumnSet.begin();
 38 itr1 != fClusterKey.fColumnSet.end(); ++itr1, ++itr2)
 39 {
 40 if (*itr1 == *itr2)
 41 continue;
 42 return *itr1 < *itr2;
 43 }
 44 // *this == other
 45 return false;
 46 }
// Different sizes: the smaller column set orders first.
 47 return fClusterKey.fColumnSet.size() < other.fClusterKey.fColumnSet.size();
 48 }
 50}
51
// Constructor initializer list; the signature line and two further initializers
// (original lines 52, 56-57 — presumably the fThreadIo/fThreadUnzip thread
// members, TODO confirm against the repository source) are missing from this
// extract.  The pool holds 2 * clusterBunchSize slots: one bunch being read
// ahead while another bunch is in use.
53 : fPageSource(pageSource)
54 , fClusterBunchSize(clusterBunchSize)
55 , fPool(2 * clusterBunchSize)
58{
// A bunch size of zero would make the pool useless and break the read loop.
59 R__ASSERT(clusterBunchSize > 0);
60}
61
// Destructor body (signature line missing from this extract).  Shuts down both
// worker threads by enqueueing a default-constructed sentinel item for each
// (the workers interpret an empty/invalid item as the stop signal) and joining.
63{
64 {
65 // Controlled shutdown of the I/O thread
66 std::unique_lock<std::mutex> lock(fLockWorkQueue);
// Default-constructed RReadItem carries kInvalidDescriptorId -> thread exits.
67 fReadQueue.emplace(RReadItem());
68 fCvHasReadWork.notify_one();
69 }
70 fThreadIo.join();
71
72 {
73 // Controlled shutdown of the unzip thread
74 std::unique_lock<std::mutex> lock(fLockUnzipQueue);
// Default-constructed RUnzipItem has a null fCluster -> thread exits.
75 fUnzipQueue.emplace(RUnzipItem());
76 fCvHasUnzipWork.notify_one();
77 }
78 fThreadUnzip.join();
79}
80
// Unzip-thread routine (signature line missing from this extract).  Repeatedly
// drains the unzip queue under the lock, then processes the drained items
// outside the lock so decompression never blocks producers.
82{
83 while (true) {
84 std::vector<RUnzipItem> unzipItems;
85 {
86 std::unique_lock<std::mutex> lock(fLockUnzipQueue);
// Sleep until there is at least one item; predicate guards spurious wakeups.
87 fCvHasUnzipWork.wait(lock, [&]{ return !fUnzipQueue.empty(); });
88 while (!fUnzipQueue.empty()) {
89 unzipItems.emplace_back(std::move(fUnzipQueue.front()));
90 fUnzipQueue.pop();
91 }
92 }
93
94 for (auto &item : unzipItems) {
// A null cluster is the shutdown sentinel enqueued by the destructor.
95 if (!item.fCluster)
96 return;
97
98 fPageSource.UnzipCluster(item.fCluster.get());
99
100 // Afterwards the GetCluster() method in the main thread can pick-up the cluster
101 item.fPromise.set_value(std::move(item.fCluster));
102 }
103 } // while (true)
104}
105
// I/O-thread routine (signature line missing from this extract).  Collects
// read requests belonging to the same bunch, issues one vector read via
// LoadClusters(), and hands the results to the unzip thread — unless the
// request expired meanwhile, in which case the cluster is dropped.
107{
108 while (true) {
109 std::vector<RReadItem> readItems;
110 std::vector<RCluster::RKey> clusterKeys;
111 std::int64_t bunchId = -1;
112 {
113 std::unique_lock<std::mutex> lock(fLockWorkQueue);
114 fCvHasReadWork.wait(lock, [&]{ return !fReadQueue.empty(); });
115 while (!fReadQueue.empty()) {
// kInvalidDescriptorId is the shutdown sentinel enqueued by the destructor.
116 if (fReadQueue.front().fClusterKey.fClusterId == kInvalidDescriptorId) {
117 fReadQueue.pop();
118 return;
119 }
120
// Only batch items of one bunch per vector read; a different bunch id marks
// the start of the next batch and is left in the queue for the next round.
121 if ((bunchId >= 0) && (fReadQueue.front().fBunchId != bunchId))
122 break;
123 readItems.emplace_back(std::move(fReadQueue.front()));
124 fReadQueue.pop();
125 bunchId = readItems.back().fBunchId;
126 clusterKeys.emplace_back(readItems.back().fClusterKey);
127 }
128 }
129
// Blocking vector read of all requested clusters, done without holding locks.
130 auto clusters = fPageSource.LoadClusters(clusterKeys);
131
132 for (std::size_t i = 0; i < clusters.size(); ++i) {
133 // Meanwhile, the user might have requested clusters outside the look-ahead window, so that we don't
134 // need the cluster anymore, in which case we simply discard it right away, before moving it to the pool
135 bool discard = false;
136 {
137 std::unique_lock<std::mutex> lock(fLockWorkQueue);
138 for (auto &inFlight : fInFlightClusters) {
139 if (inFlight.fClusterKey.fClusterId != clusters[i]->GetId())
140 continue;
141 discard = inFlight.fIsExpired;
142 break;
143 }
144 }
145 if (discard) {
// Fulfill the promise with a null pointer; GetCluster() treats that as expired.
146 clusters[i].reset();
147 readItems[i].fPromise.set_value(std::move(clusters[i]));
148 } else {
149 // Hand-over the loaded cluster pages to the unzip thread
150 std::unique_lock<std::mutex> lock(fLockUnzipQueue);
151 fUnzipQueue.emplace(RUnzipItem{std::move(clusters[i]), std::move(readItems[i].fPromise)});
152 fCvHasUnzipWork.notify_one();
153 }
154 }
155 } // while (true)
156}
157
// Linear scan of the pool for a cluster with the given id (signature line
// missing from this extract; per the member doc it returns RCluster* or
// nullptr).  A linear scan is fine: the pool holds 2 * fClusterBunchSize slots.
160{
161 for (const auto &cptr : fPool) {
162 if (cptr && (cptr->GetId() == clusterId))
163 return cptr.get();
164 }
165 return nullptr;
166}
167
// Returns the index of the first empty slot in fPool (signature line missing
// from this extract).  Callers must guarantee a free slot exists — eviction in
// GetCluster() runs before insertion — hence the hard assert on exhaustion.
169{
170 auto N = fPool.size();
171 for (unsigned i = 0; i < N; ++i) {
172 if (!fPool[i])
173 return i;
174 }
175
// Unreachable by contract; returning N would be an out-of-range index.
176 R__ASSERT(false);
177 return N;
178}
179
180
181namespace {

183/// Helper class for the (cluster, column list) pairs that should be loaded in the background
184class RProvides {
// NOTE(review): original lines 185-186 are missing from this extract —
// presumably the type aliases for DescriptorId_t and ColumnSet_t; confirm
// against the repository source.

188public:
 /// Per-cluster scheduling info: which bunch it belongs to, whether it is
 /// required/last, and which columns to load.
189 struct RInfo {
190 std::int64_t fBunchId = -1;
191 std::int64_t fFlags = 0;
192 ColumnSet_t fColumnSet;
193 };

 // kFlagRequired: the cluster the caller actually asked for;
 // kFlagLast: the final cluster of the ntuple (no successor).
195 static constexpr std::int64_t kFlagRequired = 0x01;
196 static constexpr std::int64_t kFlagLast = 0x02;

198private:
199 std::map<DescriptorId_t, RInfo> fMap;

201public:
 /// Registers a cluster for background loading. Note that map::emplace is a
 /// no-op if the cluster id is already present (first insertion wins).
202 void Insert(DescriptorId_t clusterId, const RInfo &info)
203 {
204 fMap.emplace(clusterId, info);
205 }

206
207 bool Contains(DescriptorId_t clusterId) {
208 return fMap.count(clusterId) > 0;
209 }

210
211 std::size_t GetSize() const { return fMap.size(); }

 /// Removes the given columns from the cluster's scheduled column set; drops
 /// the cluster entirely once no columns remain to be loaded.
213 void Erase(DescriptorId_t clusterId, const ColumnSet_t &columns)
214 {
215 auto itr = fMap.find(clusterId);
216 if (itr == fMap.end())
217 return;
 // d = scheduled columns minus the ones already covered by `columns`
218 ColumnSet_t d;
219 std::copy_if(itr->second.fColumnSet.begin(), itr->second.fColumnSet.end(), std::inserter(d, d.end()),
220 [&columns] (DescriptorId_t needle) { return columns.count(needle) == 0; });
221 if (d.empty()) {
222 fMap.erase(itr);
223 } else {
224 itr->second.fColumnSet = d;
225 }
226 }

 // Iteration over (cluster id, RInfo) pairs in ascending cluster-id order.
228 decltype(fMap)::iterator begin() { return fMap.begin(); }
229 decltype(fMap)::iterator end() { return fMap.end(); }
230};

232} // anonymous namespace
233
// GetCluster() body (the signature's opening lines, original 234-235, are
// missing from this extract).  Computes the look-back (keep) and look-ahead
// (provide) windows, evicts stale pool entries, harvests finished in-flight
// loads, schedules new read requests, and finally blocks in WaitFor() until
// the requested cluster with the requested columns is available.
 236 DescriptorId_t clusterId, const RCluster::ColumnSet_t &columns)
237{
238 std::set<DescriptorId_t> keep;
239 RProvides provide;
240 {
241 auto descriptorGuard = fPageSource.GetSharedDescriptorGuard();
242
243 // Determine previous cluster ids that we keep if they happen to be in the pool
244 auto prev = clusterId;
245 for (unsigned int i = 0; i < fWindowPre; ++i) {
246 prev = descriptorGuard->FindPrevClusterId(prev);
247 if (prev == kInvalidDescriptorId)
248 break;
249 keep.insert(prev);
250 }
251
252 // Determine following cluster ids and the column ids that we want to make available
253 RProvides::RInfo provideInfo;
254 provideInfo.fColumnSet = columns;
255 provideInfo.fBunchId = fBunchId;
256 provideInfo.fFlags = RProvides::kFlagRequired;
// Window of two bunches: the first bunch (starting at clusterId) plus one
// read-ahead bunch with a fresh bunch id so the I/O thread batches separately.
257 for (DescriptorId_t i = 0, next = clusterId; i < 2 * fClusterBunchSize; ++i) {
258 if (i == fClusterBunchSize)
259 provideInfo.fBunchId = ++fBunchId;
260
261 auto cid = next;
262 next = descriptorGuard->FindNextClusterId(cid);
263 if (next == kInvalidDescriptorId)
264 provideInfo.fFlags |= RProvides::kFlagLast;
265
266 provide.Insert(cid, provideInfo);
267
268 if (next == kInvalidDescriptorId)
269 break;
// Only the first cluster is flagged required; followers are opportunistic.
270 provideInfo.fFlags = 0;
271 }
272 } // descriptorGuard
273
274 // Clear the cache from clusters not in the look-ahead or the look-back window
275 for (auto &cptr : fPool) {
276 if (!cptr)
277 continue;
// NOTE(review): Contains() returns bool, so the `> 0` is redundant (harmless).
278 if (provide.Contains(cptr->GetId()) > 0)
279 continue;
280 if (keep.count(cptr->GetId()) > 0)
281 continue;
282 cptr.reset();
283 }
284
285 // Move clusters that meanwhile arrived into cache pool
286 {
287 // This lock is held during iteration over several data structures: the collection of in-flight clusters,
288 // the current pool of cached clusters, and the set of cluster ids to be preloaded.
289 // All three collections are expected to be small (certainly < 100, more likely < 10). All operations
290 // are non-blocking and moving around small items (pointers, ids, etc). Thus the overall locking time should
291 // still be reasonably small and the lock is rarely taken (usually once per cluster).
292 std::lock_guard<std::mutex> lockGuard(fLockWorkQueue);
293
294 for (auto itr = fInFlightClusters.begin(); itr != fInFlightClusters.end(); ) {
295 R__ASSERT(itr->fFuture.valid());
// Mark requests that fell out of both windows so the I/O thread can drop them.
296 itr->fIsExpired =
297 !provide.Contains(itr->fClusterKey.fClusterId) && (keep.count(itr->fClusterKey.fClusterId) == 0);
298
// wait_for with zero timeout: a non-blocking readiness poll of the future.
299 if (itr->fFuture.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
300 // Remove the set of columns that are already scheduled for being loaded
301 provide.Erase(itr->fClusterKey.fClusterId, itr->fClusterKey.fColumnSet);
302 ++itr;
303 continue;
304 }
305
306 auto cptr = itr->fFuture.get();
307 // If cptr is nullptr, the cluster expired previously and was released by the I/O thread
308 if (!cptr || itr->fIsExpired) {
309 cptr.reset();
310 itr = fInFlightClusters.erase(itr);
311 continue;
312 }
313
314 // We either put a fresh cluster into a free slot or we merge the cluster with an existing one
315 auto existingCluster = FindInPool(cptr->GetId());
316 if (existingCluster) {
317 existingCluster->Adopt(std::move(*cptr));
318 } else {
319 auto idxFreeSlot = FindFreeSlot();
320 fPool[idxFreeSlot] = std::move(cptr);
321 }
322 itr = fInFlightClusters.erase(itr);
323 }
324
325 // Determine clusters which get triggered for background loading
326 for (auto &cptr : fPool) {
327 if (!cptr)
328 continue;
// Columns already cached for a pooled cluster need not be scheduled again.
329 provide.Erase(cptr->GetId(), cptr->GetAvailColumns());
330 }
331
332 // Figure out if enough work accumulated to justify I/O calls
// Prefetch anyway, even for a small batch, if it contains the required
// cluster or the last cluster of the ntuple (nothing more would ever come).
333 bool skipPrefetch = false;
334 if (provide.GetSize() < fClusterBunchSize) {
335 skipPrefetch = true;
336 for (const auto &kv : provide) {
337 if ((kv.second.fFlags & (RProvides::kFlagRequired | RProvides::kFlagLast)) == 0)
338 continue;
339 skipPrefetch = false;
340 break;
341 }
342 }
343
344 // Update the work queue and the in-flight cluster list with new requests. We already hold the work queue
345 // mutex
346 // TODO(jblomer): we should ensure that clusterId is given first to the I/O thread. That is usually the
347 // case but it's not ensured by the code
348 if (!skipPrefetch) {
349 for (const auto &kv : provide) {
350 R__ASSERT(!kv.second.fColumnSet.empty());
351
352 RReadItem readItem;
353 readItem.fClusterKey.fClusterId = kv.first;
354 readItem.fBunchId = kv.second.fBunchId;
355 readItem.fClusterKey.fColumnSet = kv.second.fColumnSet;
356
// Pair the queued read with an in-flight record holding the matching future.
357 RInFlightCluster inFlightCluster;
358 inFlightCluster.fClusterKey.fClusterId = kv.first;
359 inFlightCluster.fClusterKey.fColumnSet = kv.second.fColumnSet;
360 inFlightCluster.fFuture = readItem.fPromise.get_future();
361 fInFlightClusters.emplace_back(std::move(inFlightCluster));
362
363 fReadQueue.emplace(std::move(readItem));
364 }
365 if (fReadQueue.size() > 0)
366 fCvHasReadWork.notify_one();
367 }
368 } // work queue lock guard
369
370 return WaitFor(clusterId, columns);
371}
372
373
// WaitFor() body (the signature's opening lines, original 374-375, are missing
// from this extract).  Loops until the pool contains the cluster with all
// requested columns, blocking on the corresponding in-flight future otherwise.
 376 DescriptorId_t clusterId, const RCluster::ColumnSet_t &columns)
377{
378 while (true) {
379 // Fast exit: the cluster happens to be already present in the cache pool
380 auto result = FindInPool(clusterId);
381 if (result) {
382 bool hasMissingColumn = false;
383 for (auto cid : columns) {
384 if (result->ContainsColumn(cid))
385 continue;
386
387 hasMissingColumn = true;
388 break;
389 }
390 if (!hasMissingColumn)
391 return result;
392 }
393
394 // Otherwise the missing data must have been triggered for loading by now, so block and wait
395 decltype(fInFlightClusters)::iterator itr;
396 {
397 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
398 itr = fInFlightClusters.begin();
399 for (; itr != fInFlightClusters.end(); ++itr) {
400 if (itr->fClusterKey.fClusterId == clusterId)
401 break;
402 }
// Contract: GetCluster() scheduled the request before calling WaitFor().
403 R__ASSERT(itr != fInFlightClusters.end());
404 // Note that the fInFlightClusters is accessed concurrently only by the I/O thread. The I/O thread
405 // never changes the structure of the in-flight clusters array (it does not add, remove, or swap elements).
406 // Therefore, it is safe to access the element pointed to by itr here even after fLockWorkQueue
407 // is released. We need to release the lock before potentially blocking on the cluster future.
408 }
409
// Blocks until the I/O + unzip pipeline fulfills the promise for this cluster.
410 auto cptr = itr->fFuture.get();
// Merge into the partially-loaded pooled cluster, or claim a free slot.
411 if (result) {
412 result->Adopt(std::move(*cptr));
413 } else {
414 auto idxFreeSlot = FindFreeSlot();
415 fPool[idxFreeSlot] = std::move(cptr);
416 }
417
418 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
419 fInFlightClusters.erase(itr);
420 }
421}
422
423
// Drains all in-flight cluster requests (signature line missing from this
// extract); per the member doc this is used by unit tests.  Repeatedly waits
// for the front request and erases it until the in-flight list is empty.
425{
426 while (true) {
427 decltype(fInFlightClusters)::iterator itr;
428 {
429 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
430 itr = fInFlightClusters.begin();
431 if (itr == fInFlightClusters.end())
432 return;
433 }
434
// Safe to touch *itr without the lock: only this thread mutates the list
// structure (same rationale as documented in WaitFor()).
435 itr->fFuture.wait();
436
437 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
438 fInFlightClusters.erase(itr);
439 }
440}
unsigned int fFlags
#define d(i)
Definition: RSha256.hxx:102
#define R__ASSERT(e)
Definition: TError.h:118
#define N
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Manages a set of clusters containing compressed and packed pages.
RCluster * FindInPool(DescriptorId_t clusterId) const
Every cluster id has at most one corresponding RCluster pointer in the pool.
std::vector< std::unique_ptr< RCluster > > fPool
The cache of clusters around the currently active cluster.
void ExecUnzipClusters()
The unzip thread routine which takes a loaded cluster and passes it to fPageSource....
size_t FindFreeSlot() const
Returns an index of an unused element in fPool; callers of this function (GetCluster() and WaitFor())...
void WaitForInFlightClusters()
Used by the unit tests to drain the queue of clusters to be preloaded.
RClusterPool(RPageSource &pageSource, unsigned int clusterBunchSize)
void ExecReadClusters()
The I/O thread routine, there is exactly one I/O thread in-flight for every cluster pool.
std::thread fThreadUnzip
The unzip thread takes a loaded cluster and passes it to fPageSource->UnzipCluster() on it.
RCluster * GetCluster(DescriptorId_t clusterId, const RCluster::ColumnSet_t &columns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
RPageSource & fPageSource
Every cluster pool is responsible for exactly one page source that triggers loading of the clusters (...
std::thread fThreadIo
The I/O thread calls RPageSource::LoadClusters() asynchronously.
unsigned int fClusterBunchSize
The number of clusters that are being read in a single vector read.
RCluster * WaitFor(DescriptorId_t clusterId, const RCluster::ColumnSet_t &columns)
Returns the given cluster from the pool, which needs to contain at least the columns columns.
An in-memory subset of the packed and compressed pages of a cluster.
Definition: RCluster.hxx:154
std::unordered_set< DescriptorId_t > ColumnSet_t
Definition: RCluster.hxx:156
Abstract interface to read data from an ntuple.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
Definition: RNTupleUtil.hxx:83
constexpr DescriptorId_t kInvalidDescriptorId
Definition: RNTupleUtil.hxx:84
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
Definition: Utils.hxx:187
Clusters that are currently being processed by the pipeline.
bool operator<(const RInFlightCluster &other) const
First order by cluster id, then by number of columns, then by the column ids in fColumns.
std::future< std::unique_ptr< RCluster > > fFuture
Request to load a subset of the columns of a particular cluster.
std::int64_t fBunchId
Items with different bunch ids are scheduled for different vector reads.
std::promise< std::unique_ptr< RCluster > > fPromise
Request to decompress and if necessary unpack compressed pages.