Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RClusterPool.cxx
Go to the documentation of this file.
1/// \file RClusterPool.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2020-03-11
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RClusterPool.hxx>
18#include <ROOT/RPageStorage.hxx>
19
20#include <TError.h>
21
22#include <algorithm>
23#include <chrono>
24#include <future>
25#include <iostream>
26#include <iterator>
27#include <map>
28#include <memory>
29#include <mutex>
30#include <set>
31#include <utility>
32
34{
   // Body of the ordering predicate. According to the reference text for operator<, items are ordered first by
   // cluster id, then by number of columns, then by the column ids. Only the last stage is visible here; listing
   // lines 35-36, 46 and 48 are not part of this extract.
   //
   // Walk both column sets in lock step.
   // NOTE(review): itr2 is never compared against its own end(); presumably the missing lines guarantee that both
   // sets have the same size before this loop runs -- confirm.
   // NOTE(review): the reference section lists RCluster::ColumnSet_t as an std::unordered_set; if
   // fPhysicalColumnSet has that type, the element pairing depends on each set's iteration order -- confirm that
   // this is intended.
37 for (auto itr1 = fClusterKey.fPhysicalColumnSet.begin(), itr2 = other.fClusterKey.fPhysicalColumnSet.begin();
38 itr1 != fClusterKey.fPhysicalColumnSet.end(); ++itr1, ++itr2) {
   // Equal column ids do not decide the order, move on to the next pair
39 if (*itr1 == *itr2)
40 continue;
   // The first differing pair of column ids decides the order
41 return *itr1 < *itr2;
42 }
43 // *this == other
44 return false;
45 }
47 }
49}
50
52 : fPageSource(pageSource),
53 fClusterBunchSize(clusterBunchSize),
   // The pool is sized to hold two full bunches of clusters
54 fPool(2 * clusterBunchSize),
   // (listing line 55, the remainder of the initializer list, is not part of this extract)
56{
   // A bunch size of zero is rejected; note that the check only runs after the members above were initialized
57 R__ASSERT(clusterBunchSize > 0);
58}
59
61{
   // Presumably the destructor body (the signature line is not part of this extract): asks the I/O thread to
   // stop and blocks until it has finished.
62 {
63 // Controlled shutdown of the I/O thread
64 std::unique_lock<std::mutex> lock(fLockWorkQueue);
   // Presumably a default-constructed RReadItem carries kInvalidDescriptorId as its cluster id, which is the
   // cancellation marker checked by the I/O thread routine -- confirm against the RReadItem declaration.
65 fReadQueue.emplace_back(RReadItem());
66 fCvHasReadWork.notify_one();
67 }
   // The work queue lock is released before joining
68 fThreadIo.join();
69}
70
72{
   // Appears to be the body of ExecReadClusters(), which the reference section describes as the I/O thread
   // routine (the signature line is not part of this extract). The routine repeatedly takes over the complete
   // read queue, loads the requested clusters bunch by bunch through fPageSource.LoadClusters() and fulfils the
   // promise of every processed read item. It returns when it encounters the cancellation marker.
73 std::deque<RReadItem> readItems;
74 while (true) {
75 {
76 std::unique_lock<std::mutex> lock(fLockWorkQueue);
   // Sleep until there is at least one queued item, then take over the whole queue so that the lock is not
   // held while loading
77 fCvHasReadWork.wait(lock, [&]{ return !fReadQueue.empty(); });
78 std::swap(readItems, fReadQueue);
79 }
80
81 while (!readItems.empty()) {
   // Collect the keys of the leading run of items that share the same bunch id; bunchId == -1 means that no
   // item has been accepted yet in this pass
82 std::vector<RCluster::RKey> clusterKeys;
83 std::int64_t bunchId = -1;
84 for (unsigned i = 0; i < readItems.size(); ++i) {
85 const auto &item = readItems[i];
86 // `kInvalidDescriptorId` is used as a marker for thread cancellation. Such item causes the
87 // thread to terminate; thus, it must appear last in the queue.
88 if (R__unlikely(item.fClusterKey.fClusterId == kInvalidDescriptorId)) {
89 R__ASSERT(i == (readItems.size() - 1));
   // Returning here skips LoadClusters() for any keys already collected in this pass
90 return;
91 }
92 if ((bunchId >= 0) && (item.fBunchId != bunchId))
93 break;
94 bunchId = item.fBunchId;
95 clusterKeys.emplace_back(item.fClusterKey);
96 }
97
   // The code below indexes `clusters` and `readItems` with the same i, i.e. it assumes that LoadClusters()
   // returns the clusters in the order of the given keys -- TODO confirm against RPageSource
98 auto clusters = fPageSource.LoadClusters(clusterKeys);
99 for (std::size_t i = 0; i < clusters.size(); ++i) {
100 // Meanwhile, the user might have requested clusters outside the look-ahead window, so that we don't
101 // need the cluster anymore, in which case we simply discard it right away, before moving it to the pool
102 bool discard;
103 {
   // fInFlightClusters is only inspected while holding the work queue lock
104 std::unique_lock<std::mutex> lock(fLockWorkQueue);
105 discard = std::any_of(fInFlightClusters.begin(), fInFlightClusters.end(),
106 [thisClusterId = clusters[i]->GetId()](auto &inFlight) {
107 return inFlight.fClusterKey.fClusterId == thisClusterId && inFlight.fIsExpired;
108 });
109 }
110 if (discard) {
111 clusters[i].reset();
112 // clusters[i] is now nullptr; also return this via the promise.
113 readItems[i].fPromise.set_value(nullptr);
114 } else {
115 readItems[i].fPromise.set_value(std::move(clusters[i]));
116 }
117 }
   // Drop the items that were just served; remaining items (other bunches) are handled by the next pass
118 readItems.erase(readItems.begin(), readItems.begin() + clusters.size());
119 }
120 } // while (true)
121}
122
125{
   // Appears to be the body of FindInPool(DescriptorId_t clusterId) from the reference section (the signature
   // lines are not part of this extract). Scans the pool front to back and returns a non-owning pointer to the
   // first non-empty slot whose cluster id equals clusterId; returns nullptr if there is no such slot.
126 for (const auto &cptr : fPool) {
127 if (cptr && (cptr->GetId() == clusterId))
128 return cptr.get();
129 }
130 return nullptr;
131}
132
134{
   // Appears to be the body of FindFreeSlot() from the reference section (the signature line is not part of this
   // extract). Returns the index of the first empty (null) slot in fPool.
135 auto N = fPool.size();
136 for (unsigned i = 0; i < N; ++i) {
137 if (!fPool[i])
138 return i;
139 }
140
   // A completely filled pool is treated as a fatal error; the return statement below is only reached if
   // R__ASSERT returns, in which case the result N is one past the last valid index
141 R__ASSERT(false);
142 return N;
143}
144
145
146namespace {
147
148/// Helper class for the (cluster, column list) pairs that should be loaded in the background
149class RProvides {
152
153public:
   /// Per-cluster request information
154 struct RInfo {
   /// Bunch the cluster is assigned to; -1 until set
155 std::int64_t fBunchId = -1;
   /// Bitwise combination of the kFlag... constants below
156 std::int64_t fFlags = 0;
   /// The columns that still need to be made available for the cluster
157 ColumnSet_t fPhysicalColumnSet;
158 };
159
   /// Set by RClusterPool::GetCluster() on the first entry it inserts, i.e. the explicitly requested cluster
160 static constexpr std::int64_t kFlagRequired = 0x01;
   /// Set by RClusterPool::GetCluster() on the entry for which no following cluster id was found
161 static constexpr std::int64_t kFlagLast = 0x02;
162
163private:
   /// Maps cluster ids to their request information; std::map iterates in ascending order of cluster id
164 std::map<DescriptorId_t, RInfo> fMap;
165
166public:
   /// Registers info for clusterId. std::map::emplace leaves an already existing entry untouched.
167 void Insert(DescriptorId_t clusterId, const RInfo &info)
168 {
169 fMap.emplace(clusterId, info);
170 }
171
   /// Returns true if there is an entry for clusterId
172 bool Contains(DescriptorId_t clusterId) {
173 return fMap.count(clusterId) > 0;
174 }
175
   /// Returns the number of clusters that have an entry
176 std::size_t GetSize() const { return fMap.size(); }
177
   /// Removes physicalColumns from the column set registered for clusterId. If no column remains, the whole
   /// entry is removed. Does nothing if there is no entry for clusterId.
178 void Erase(DescriptorId_t clusterId, const ColumnSet_t &physicalColumns)
179 {
180 auto itr = fMap.find(clusterId);
181 if (itr == fMap.end())
182 return;
   // d collects the registered columns that are _not_ contained in physicalColumns
183 ColumnSet_t d;
184 std::copy_if(itr->second.fPhysicalColumnSet.begin(), itr->second.fPhysicalColumnSet.end(),
185 std::inserter(d, d.end()),
186 [&physicalColumns](DescriptorId_t needle) { return physicalColumns.count(needle) == 0; });
187 if (d.empty()) {
188 fMap.erase(itr);
189 } else {
190 itr->second.fPhysicalColumnSet = d;
191 }
192 }
193
   // begin() and end() allow for range-based for loops over the (cluster id, RInfo) pairs
194 decltype(fMap)::iterator begin() { return fMap.begin(); }
195 decltype(fMap)::iterator end() { return fMap.end(); }
196};
197
198} // anonymous namespace
199
202 const RCluster::ColumnSet_t &physicalColumns)
203{
   // Appears to be RClusterPool::GetCluster() from the reference section (the first signature lines are not part
   // of this extract). Steps performed before handing over to WaitFor():
   //   1. compute the look-back set `keep` and the look-ahead request map `provide`
   //   2. evict pooled clusters that are in neither of the two
   //   3. move in-flight clusters that have finished loading into the pool
   //   4. queue read requests for the clusters and columns that are still missing
204 std::set<DescriptorId_t> keep;
205 RProvides provide;
206 {
207 auto descriptorGuard = fPageSource.GetSharedDescriptorGuard();
208
209 // Determine previous cluster ids that we keep if they happen to be in the pool
210 auto prev = clusterId;
211 for (unsigned int i = 0; i < fWindowPre; ++i) {
212 prev = descriptorGuard->FindPrevClusterId(prev);
213 if (prev == kInvalidDescriptorId)
214 break;
215 keep.insert(prev);
216 }
217
218 // Determine following cluster ids and the column ids that we want to make available
219 RProvides::RInfo provideInfo;
220 provideInfo.fPhysicalColumnSet = physicalColumns;
221 provideInfo.fBunchId = fBunchId;
   // Only the first inserted entry, i.e. the requested cluster itself, is flagged as required; the flags are
   // cleared at the end of every iteration
222 provideInfo.fFlags = RProvides::kFlagRequired;
   // Look ahead by at most two bunches; entries of the second bunch get a new bunch id
223 for (DescriptorId_t i = 0, next = clusterId; i < 2 * fClusterBunchSize; ++i) {
224 if (i == fClusterBunchSize)
225 provideInfo.fBunchId = ++fBunchId;
226
227 auto cid = next;
228 next = descriptorGuard->FindNextClusterId(cid);
   // NOTE(review): `next` is compared against kInvalidClusterIndex here but against kInvalidDescriptorId
   // everywhere else in this function -- confirm which constant is intended.
   // (listing line 231, the statement controlled by the inner if, is not part of this extract)
229 if (next != kInvalidClusterIndex) {
230 if (!fPageSource.GetEntryRange().IntersectsWith(descriptorGuard->GetClusterDescriptor(next)))
232 }
233 if (next == kInvalidDescriptorId)
234 provideInfo.fFlags |= RProvides::kFlagLast;
235
236 provide.Insert(cid, provideInfo);
237
238 if (next == kInvalidDescriptorId)
239 break;
240 provideInfo.fFlags = 0;
241 }
242 } // descriptorGuard
243
244 // Clear the cache from clusters not in the look-ahead or the look-back window
245 for (auto &cptr : fPool) {
246 if (!cptr)
247 continue;
   // Contains() already returns a bool, the comparison with 0 does not change the result
248 if (provide.Contains(cptr->GetId()) > 0)
249 continue;
250 if (keep.count(cptr->GetId()) > 0)
251 continue;
252 cptr.reset();
253 }
254
255 // Move clusters that meanwhile arrived into cache pool
256 {
257 // This lock is held during iteration over several data structures: the collection of in-flight clusters,
258 // the current pool of cached clusters, and the set of cluster ids to be preloaded.
259 // All three collections are expected to be small (certainly < 100, more likely < 10). All operations
260 // are non-blocking and moving around small items (pointers, ids, etc). Thus the overall locking time should
261 // still be reasonably small and the lock is rarely taken (usually once per cluster).
262 std::lock_guard<std::mutex> lockGuard(fLockWorkQueue);
263
264 for (auto itr = fInFlightClusters.begin(); itr != fInFlightClusters.end(); ) {
265 R__ASSERT(itr->fFuture.valid());
   // An in-flight cluster is expired if it is in neither the look-ahead nor the look-back window
266 itr->fIsExpired =
267 !provide.Contains(itr->fClusterKey.fClusterId) && (keep.count(itr->fClusterKey.fClusterId) == 0);
268
   // Zero timeout: only probes whether the result is available, does not block
269 if (itr->fFuture.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
270 // Remove the set of columns that are already scheduled for being loaded
271 provide.Erase(itr->fClusterKey.fClusterId, itr->fClusterKey.fPhysicalColumnSet);
272 ++itr;
273 continue;
274 }
275
276 auto cptr = itr->fFuture.get();
277 // If cptr is nullptr, the cluster expired previously and was released by the I/O thread
278 if (!cptr || itr->fIsExpired) {
279 cptr.reset();
280 itr = fInFlightClusters.erase(itr);
281 continue;
282 }
283
284 // Noop unless the page source has a task scheduler
285 fPageSource.UnzipCluster(cptr.get());
286
287 // We either put a fresh cluster into a free slot or we merge the cluster with an existing one
288 auto existingCluster = FindInPool(cptr->GetId());
289 if (existingCluster) {
290 existingCluster->Adopt(std::move(*cptr));
291 } else {
292 auto idxFreeSlot = FindFreeSlot();
293 fPool[idxFreeSlot] = std::move(cptr);
294 }
295 itr = fInFlightClusters.erase(itr);
296 }
297
298 // Determine clusters which get triggered for background loading
   // Columns that are already available in the pool do not need to be requested again
299 for (auto &cptr : fPool) {
300 if (!cptr)
301 continue;
302 provide.Erase(cptr->GetId(), cptr->GetAvailPhysicalColumns());
303 }
304
305 // Figure out if enough work accumulated to justify I/O calls
   // Prefetching is skipped only if less than a full bunch is pending and none of the pending entries is
   // flagged as required or last
306 bool skipPrefetch = false;
307 if (provide.GetSize() < fClusterBunchSize) {
308 skipPrefetch = true;
309 for (const auto &kv : provide) {
310 if ((kv.second.fFlags & (RProvides::kFlagRequired | RProvides::kFlagLast)) == 0)
311 continue;
312 skipPrefetch = false;
313 break;
314 }
315 }
316
317 // Update the work queue and the in-flight cluster list with new requests. We already hold the work queue
318 // mutex
319 // TODO(jblomer): we should ensure that clusterId is given first to the I/O thread. That is usually the
320 // case but it's not ensured by the code
321 if (!skipPrefetch) {
322 for (const auto &kv : provide) {
323 R__ASSERT(!kv.second.fPhysicalColumnSet.empty());
324
   // Every request is represented twice: by the read item that goes to the I/O thread (holding the
   // promise) and by the in-flight entry that stays with the pool (holding the corresponding future)
325 RReadItem readItem;
326 readItem.fClusterKey.fClusterId = kv.first;
327 readItem.fBunchId = kv.second.fBunchId;
328 readItem.fClusterKey.fPhysicalColumnSet = kv.second.fPhysicalColumnSet;
329
330 RInFlightCluster inFlightCluster;
331 inFlightCluster.fClusterKey.fClusterId = kv.first;
332 inFlightCluster.fClusterKey.fPhysicalColumnSet = kv.second.fPhysicalColumnSet;
333 inFlightCluster.fFuture = readItem.fPromise.get_future();
334 fInFlightClusters.emplace_back(std::move(inFlightCluster));
335
336 fReadQueue.emplace_back(std::move(readItem));
337 }
338 if (!fReadQueue.empty())
339 fCvHasReadWork.notify_one();
340 }
341 } // work queue lock guard
342
343 return WaitFor(clusterId, physicalColumns);
344}
345
348 const RCluster::ColumnSet_t &physicalColumns)
349{
   // Appears to be RClusterPool::WaitFor() from the reference section (the first signature lines are not part of
   // this extract). Loops until the pool holds the cluster clusterId with all of physicalColumns and returns a
   // non-owning pointer to it. In every iteration that does not return, the first in-flight entry for clusterId
   // is awaited and its result is moved or merged into the pool.
350 while (true) {
351 // Fast exit: the cluster happens to be already present in the cache pool
352 auto result = FindInPool(clusterId);
353 if (result) {
354 bool hasMissingColumn = false;
355 for (auto cid : physicalColumns) {
356 if (result->ContainsColumn(cid))
357 continue;
358
359 hasMissingColumn = true;
360 break;
361 }
362 if (!hasMissingColumn)
363 return result;
364 }
365
366 // Otherwise the missing data must have been triggered for loading by now, so block and wait
367 decltype(fInFlightClusters)::iterator itr;
368 {
369 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
370 itr = fInFlightClusters.begin();
371 for (; itr != fInFlightClusters.end(); ++itr) {
372 if (itr->fClusterKey.fClusterId == clusterId)
373 break;
374 }
   // Fatal error if nothing is in flight for the cluster
375 R__ASSERT(itr != fInFlightClusters.end());
376 // Note that the fInFlightClusters is accessed concurrently only by the I/O thread. The I/O thread
377 // never changes the structure of the in-flight clusters array (it does not add, remove, or swap elements).
378 // Therefore, it is safe to access the element pointed to by itr here even after fLockWorkQueue
379 // is released. We need to release the lock before potentially blocking on the cluster future.
380 }
381
   // Blocks until the I/O thread has set the value of the corresponding promise
382 auto cptr = itr->fFuture.get();
383 // We were blocked waiting for the cluster, so assume that nobody discarded it.
384 R__ASSERT(cptr != nullptr);
385
386 // Noop unless the page source has a task scheduler
387 fPageSource.UnzipCluster(cptr.get());
388
   // `result` still refers to the pool lookup from the beginning of this iteration: merge into the cluster
   // that is already pooled or else place the new cluster into a free slot
389 if (result) {
390 result->Adopt(std::move(*cptr));
391 } else {
392 auto idxFreeSlot = FindFreeSlot();
393 fPool[idxFreeSlot] = std::move(cptr);
394 }
395
   // The in-flight entry is consumed; the next iteration checks the pool again
396 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
397 fInFlightClusters.erase(itr);
398 }
399}
400
402{
   // Appears to be the body of WaitForInFlightClusters(), which the reference section describes as being used by
   // the unit tests to drain the queue of clusters to be preloaded (the signature line is not part of this
   // extract). Repeatedly waits for the first in-flight entry to become ready and removes it; returns once no
   // in-flight entries are left.
403 while (true) {
404 decltype(fInFlightClusters)::iterator itr;
405 {
406 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
407 itr = fInFlightClusters.begin();
408 if (itr == fInFlightClusters.end())
409 return;
410 }
411
   // Wait without holding the lock. The result is not retrieved from the future and hence not placed into
   // fPool.
412 itr->fFuture.wait();
413
414 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
415 fInFlightClusters.erase(itr);
416 }
417}
#define R__unlikely(expr)
Definition RConfig.hxx:578
std::ios_base::fmtflags fFlags
#define d(i)
Definition RSha256.hxx:102
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
#define N
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Manages a set of clusters containing compressed and packed pages.
void WaitForInFlightClusters()
Used by the unit tests to drain the queue of clusters to be preloaded.
RCluster * GetCluster(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
RCluster * FindInPool(DescriptorId_t clusterId) const
Every cluster id has at most one corresponding RCluster pointer in the pool.
unsigned int fClusterBunchSize
The number of clusters that are being read in a single vector read.
RCluster * WaitFor(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the given cluster from the pool, which needs to contain at least the columns physicalColumns.
std::vector< std::unique_ptr< RCluster > > fPool
The cache of clusters around the currently active cluster.
RClusterPool(RPageSource &pageSource, unsigned int clusterBunchSize)
void ExecReadClusters()
The I/O thread routine, there is exactly one I/O thread in-flight for every cluster pool.
RPageSource & fPageSource
Every cluster pool is responsible for exactly one page source that triggers loading of the clusters (...
size_t FindFreeSlot() const
Returns an index of an unused element in fPool; callers of this function (GetCluster() and WaitFor())...
std::thread fThreadIo
The I/O thread calls RPageSource::LoadClusters() asynchronously.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:152
std::unordered_set< DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:154
Abstract interface to read data from an ntuple.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr ClusterSize_t kInvalidClusterIndex(std::uint64_t(-1))
constexpr DescriptorId_t kInvalidDescriptorId
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
Definition Utils.hxx:189
Clusters that are currently being processed by the pipeline.
std::future< std::unique_ptr< RCluster > > fFuture
bool operator<(const RInFlightCluster &other) const
First order by cluster id, then by number of columns, then by the column ids in fColumns.
Request to load a subset of the columns of a particular cluster.
std::int64_t fBunchId
Items with different bunch ids are scheduled for different vector reads.
std::promise< std::unique_ptr< RCluster > > fPromise