Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RClusterPool.cxx
Go to the documentation of this file.
1/// \file RClusterPool.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2020-03-11
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RClusterPool.hxx>
18#include <ROOT/RPageStorage.hxx>
19
20#include <TError.h>
21
22#include <algorithm>
23#include <chrono>
24#include <future>
25#include <iostream>
26#include <iterator>
27#include <map>
28#include <memory>
29#include <mutex>
30#include <set>
31#include <utility>
32
34{
35 if (fClusterKey.fClusterId == other.fClusterKey.fClusterId) {
36 if (fClusterKey.fPhysicalColumnSet.size() == other.fClusterKey.fPhysicalColumnSet.size()) {
37 for (auto itr1 = fClusterKey.fPhysicalColumnSet.begin(), itr2 = other.fClusterKey.fPhysicalColumnSet.begin();
38 itr1 != fClusterKey.fPhysicalColumnSet.end(); ++itr1, ++itr2) {
39 if (*itr1 == *itr2)
40 continue;
41 return *itr1 < *itr2;
42 }
43 // *this == other
44 return false;
45 }
46 return fClusterKey.fPhysicalColumnSet.size() < other.fClusterKey.fPhysicalColumnSet.size();
47 }
48 return fClusterKey.fClusterId < other.fClusterKey.fClusterId;
49}
50
52 : fPageSource(pageSource),
53 fClusterBunchSize(clusterBunchSize),
   // The pool gets twice as many slots as there are clusters in a single bunch
54 fPool(2 * clusterBunchSize),
56{
   // NOTE(review): the listing skips source lines 51 and 55, so the constructor signature and one member
   // initializer are not visible here -- confirm against the original file before editing this block.
   // A bunch size of zero is treated as a programming error.
57 R__ASSERT(clusterBunchSize > 0);
58}
59
61{
   // NOTE(review): the signature line (source line 60) is missing from this listing; the body shuts down and
   // joins the I/O thread, so this is presumably the RClusterPool destructor -- confirm.
62 {
63 // Controlled shutdown of the I/O thread
   // A default-constructed RReadItem is queued under the work queue lock and the I/O thread is woken up.
   // Presumably such an item carries kInvalidDescriptorId as cluster id, which ExecReadClusters() treats
   // as its termination marker -- verify against the RReadItem declaration.
64 std::unique_lock<std::mutex> lock(fLockWorkQueue);
65 fReadQueue.emplace_back(RReadItem());
66 fCvHasReadWork.notify_one();
67 }
   // The lock is released at this point; block until the I/O thread has returned
68 fThreadIo.join();
69}
70
72{
   // Main loop of the I/O thread: waits until read items are queued, loads the requested clusters bunch by
   // bunch through the page source and fulfills the promise of every processed read item. The function only
   // returns when it encounters the termination marker item.
73 std::deque<RReadItem> readItems;
74 while (true) {
75 {
   // Take over the entire queue at once, so that the lock is only held for the wait and the swap
76 std::unique_lock<std::mutex> lock(fLockWorkQueue);
77 fCvHasReadWork.wait(lock, [&]{ return !fReadQueue.empty(); });
78 std::swap(readItems, fReadQueue);
79 }
80
81 while (!readItems.empty()) {
   // Collect the cluster keys of the leading run of items that share the same bunch id
82 std::vector<RCluster::RKey> clusterKeys;
83 std::int64_t bunchId = -1;
84 for (unsigned i = 0; i < readItems.size(); ++i) {
85 const auto &item = readItems[i];
86 // `kInvalidDescriptorId` is used as a marker for thread cancellation. Such item causes the
87 // thread to terminate; thus, it must appear last in the queue.
88 if (R__unlikely(item.fClusterKey.fClusterId == kInvalidDescriptorId)) {
89 R__ASSERT(i == (readItems.size() - 1));
90 return;
91 }
92 if ((bunchId >= 0) && (item.fBunchId != bunchId))
93 break;
94 bunchId = item.fBunchId;
95 clusterKeys.emplace_back(item.fClusterKey);
96 }
97
   // A single LoadClusters() call serves the whole bunch.
   // NOTE(review): the code below pairs clusters[i] with readItems[i], i.e. it assumes one returned cluster
   // per requested key, in the order of the keys -- confirm against RPageSource::LoadClusters().
98 auto clusters = fPageSource.LoadClusters(clusterKeys);
99 for (std::size_t i = 0; i < clusters.size(); ++i) {
100 // Meanwhile, the user might have requested clusters outside the look-ahead window, so that we don't
101 // need the cluster anymore, in which case we simply discard it right away, before moving it to the pool
102 bool discard;
103 {
104 std::unique_lock<std::mutex> lock(fLockWorkQueue);
105 discard = std::any_of(fInFlightClusters.begin(), fInFlightClusters.end(),
106 [thisClusterId = clusters[i]->GetId()](auto &inFlight) {
107 return inFlight.fClusterKey.fClusterId == thisClusterId && inFlight.fIsExpired;
108 });
109 }
110 if (discard) {
111 clusters[i].reset();
112 }
    // The promise is always fulfilled; a discarded cluster is handed over as a null pointer
113 readItems[i].fPromise.set_value(std::move(clusters[i]));
114 }
    // Drop the read items of the bunch that has just been processed
115 readItems.erase(readItems.begin(), readItems.begin() + clusters.size());
116 }
117 } // while (true)
118}
119
122{
123 for (const auto &cptr : fPool) {
124 if (cptr && (cptr->GetId() == clusterId))
125 return cptr.get();
126 }
127 return nullptr;
128}
129
131{
132 auto N = fPool.size();
133 for (unsigned i = 0; i < N; ++i) {
134 if (!fPool[i])
135 return i;
136 }
137
138 R__ASSERT(false);
139 return N;
140}
141
142
143namespace {
144
145/// Helper class for the (cluster, column list) pairs that should be loaded in the background
146class RProvides {
149
150public:
151 struct RInfo {
152 std::int64_t fBunchId = -1;
153 std::int64_t fFlags = 0;
154 ColumnSet_t fPhysicalColumnSet;
155 };
156
157 static constexpr std::int64_t kFlagRequired = 0x01;
158 static constexpr std::int64_t kFlagLast = 0x02;
159
160private:
161 std::map<DescriptorId_t, RInfo> fMap;
162
163public:
164 void Insert(DescriptorId_t clusterId, const RInfo &info)
165 {
166 fMap.emplace(clusterId, info);
167 }
168
169 bool Contains(DescriptorId_t clusterId) {
170 return fMap.count(clusterId) > 0;
171 }
172
173 std::size_t GetSize() const { return fMap.size(); }
174
175 void Erase(DescriptorId_t clusterId, const ColumnSet_t &physicalColumns)
176 {
177 auto itr = fMap.find(clusterId);
178 if (itr == fMap.end())
179 return;
180 ColumnSet_t d;
181 std::copy_if(itr->second.fPhysicalColumnSet.begin(), itr->second.fPhysicalColumnSet.end(),
182 std::inserter(d, d.end()),
183 [&physicalColumns](DescriptorId_t needle) { return physicalColumns.count(needle) == 0; });
184 if (d.empty()) {
185 fMap.erase(itr);
186 } else {
187 itr->second.fPhysicalColumnSet = d;
188 }
189 }
190
191 decltype(fMap)::iterator begin() { return fMap.begin(); }
192 decltype(fMap)::iterator end() { return fMap.end(); }
193};
194
195} // anonymous namespace
196
/// Returns the cluster `clusterId` with at least the columns `physicalColumns`; the hand-over itself (and any
/// blocking on the I/O thread) is delegated to WaitFor(). Before that, the call
///   1. determines the look-back clusters to keep and the look-ahead clusters and columns to provide,
///   2. evicts pool entries that belong to neither of the two windows,
///   3. moves clusters that finished loading from the in-flight list into the pool, and
///   4. queues read requests for the data that is still missing and wakes up the I/O thread.
/// NOTE(review): this listing skips source lines 198 and 228 (a part of the signature and the statement
/// guarded by the entry range check) -- confirm against the original file before changing any code.
197ROOT::Experimental::Internal::RCluster *
199 const RCluster::ColumnSet_t &physicalColumns)
200{
201 std::set<DescriptorId_t> keep;
202 RProvides provide;
203 {
204 auto descriptorGuard = fPageSource.GetSharedDescriptorGuard();
205
206 // Determine previous cluster ids that we keep if they happen to be in the pool
207 auto prev = clusterId;
208 for (unsigned int i = 0; i < fWindowPre; ++i) {
209 prev = descriptorGuard->FindPrevClusterId(prev);
210 if (prev == kInvalidDescriptorId)
211 break;
212 keep.insert(prev);
213 }
214
215 // Determine following cluster ids and the column ids that we want to make available
    // Up to 2 * fClusterBunchSize clusters are considered, starting with clusterId itself. The first half is
    // assigned the current bunch id, the second half a newly drawn one.
216 RProvides::RInfo provideInfo;
217 provideInfo.fPhysicalColumnSet = physicalColumns;
218 provideInfo.fBunchId = fBunchId;
219 provideInfo.fFlags = RProvides::kFlagRequired;
220 for (DescriptorId_t i = 0, next = clusterId; i < 2 * fClusterBunchSize; ++i) {
221 if (i == fClusterBunchSize)
222 provideInfo.fBunchId = ++fBunchId;
223
224 auto cid = next;
225 next = descriptorGuard->FindNextClusterId(cid);
226 if (next != kInvalidClusterIndex) {
227 if (!fPageSource.GetEntryRange().IntersectsWith(descriptorGuard->GetClusterDescriptor(next)))
    // NOTE(review): the statement controlled by the condition above (source line 228) is missing here
229 }
230 if (next == kInvalidDescriptorId)
231 provideInfo.fFlags |= RProvides::kFlagLast;
232
233 provide.Insert(cid, provideInfo);
234
235 if (next == kInvalidDescriptorId)
236 break;
    // Only the first inserted cluster, i.e. the requested one, carries kFlagRequired
237 provideInfo.fFlags = 0;
238 }
239 } // descriptorGuard
240
241 // Clear the cache from clusters that are in neither the look-ahead nor the look-back window
242 for (auto &cptr : fPool) {
243 if (!cptr)
244 continue;
245 if (provide.Contains(cptr->GetId()) > 0)
246 continue;
247 if (keep.count(cptr->GetId()) > 0)
248 continue;
249 cptr.reset();
250 }
251
252 // Move clusters that meanwhile arrived into cache pool
253 {
254 // This lock is held during iteration over several data structures: the collection of in-flight clusters,
255 // the current pool of cached clusters, and the set of cluster ids to be preloaded.
256 // All three collections are expected to be small (certainly < 100, more likely < 10). All operations
257 // are non-blocking and moving around small items (pointers, ids, etc). Thus the overall locking time should
258 // still be reasonably small and the lock is rarely taken (usually once per cluster).
259 std::lock_guard<std::mutex> lockGuard(fLockWorkQueue);
260
261 for (auto itr = fInFlightClusters.begin(); itr != fInFlightClusters.end(); ) {
262 R__ASSERT(itr->fFuture.valid());
    // Mark in-flight clusters outside of both windows; the I/O thread reads this flag to discard them
263 itr->fIsExpired =
264 !provide.Contains(itr->fClusterKey.fClusterId) && (keep.count(itr->fClusterKey.fClusterId) == 0);
265
    // Zero timeout: only poll whether the cluster has arrived, never block while holding the lock
266 if (itr->fFuture.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
267 // Remove the set of columns that are already scheduled for being loaded
268 provide.Erase(itr->fClusterKey.fClusterId, itr->fClusterKey.fPhysicalColumnSet);
269 ++itr;
270 continue;
271 }
272
273 auto cptr = itr->fFuture.get();
274 // If cptr is nullptr, the cluster expired previously and was released by the I/O thread
275 if (!cptr || itr->fIsExpired) {
276 cptr.reset();
277 itr = fInFlightClusters.erase(itr);
278 continue;
279 }
280
281 // We either put a fresh cluster into a free slot or we merge the cluster with an existing one
282 auto existingCluster = FindInPool(cptr->GetId());
283 if (existingCluster) {
284 existingCluster->Adopt(std::move(*cptr));
285 } else {
286 auto idxFreeSlot = FindFreeSlot();
287 fPool[idxFreeSlot] = std::move(cptr);
288 }
289 itr = fInFlightClusters.erase(itr);
290 }
291
292 // Determine clusters which get triggered for background loading
    // Columns that are already available in the pool do not need to be requested again
293 for (auto &cptr : fPool) {
294 if (!cptr)
295 continue;
296 provide.Erase(cptr->GetId(), cptr->GetAvailPhysicalColumns());
297 }
298
299 // Figure out if enough work accumulated to justify I/O calls
    // Fewer clusters than a full bunch are only requested if one of them is flagged as required or as last
300 bool skipPrefetch = false;
301 if (provide.GetSize() < fClusterBunchSize) {
302 skipPrefetch = true;
303 for (const auto &kv : provide) {
304 if ((kv.second.fFlags & (RProvides::kFlagRequired | RProvides::kFlagLast)) == 0)
305 continue;
306 skipPrefetch = false;
307 break;
308 }
309 }
310
311 // Update the work queue and the in-flight cluster list with new requests. We already hold the work queue
312 // mutex
313 // TODO(jblomer): we should ensure that clusterId is given first to the I/O thread. That is usually the
314 // case but it's not ensured by the code
315 if (!skipPrefetch) {
316 for (const auto &kv : provide) {
317 R__ASSERT(!kv.second.fPhysicalColumnSet.empty());
318
319 RReadItem readItem;
320 readItem.fClusterKey.fClusterId = kv.first;
321 readItem.fBunchId = kv.second.fBunchId;
322 readItem.fClusterKey.fPhysicalColumnSet = kv.second.fPhysicalColumnSet;
323
    // The in-flight entry holds the future that belongs to the promise of the read item
324 RInFlightCluster inFlightCluster;
325 inFlightCluster.fClusterKey.fClusterId = kv.first;
326 inFlightCluster.fClusterKey.fPhysicalColumnSet = kv.second.fPhysicalColumnSet;
327 inFlightCluster.fFuture = readItem.fPromise.get_future();
328 fInFlightClusters.emplace_back(std::move(inFlightCluster));
329
330 fReadQueue.emplace_back(std::move(readItem));
331 }
332 if (!fReadQueue.empty())
333 fCvHasReadWork.notify_one();
334 }
335 } // work queue lock guard
336
337 return WaitFor(clusterId, physicalColumns);
338}
339
342 const RCluster::ColumnSet_t &physicalColumns)
343{
    // Returns the pooled cluster `clusterId` once it contains all of `physicalColumns`. As long as columns
    // are missing, the function blocks on the first in-flight request for that cluster, moves or merges
    // the loaded data into the pool and checks again.
    // NOTE(review): the first lines of the signature (source lines 340-341) are missing from this listing.
344 while (true) {
345 // Fast exit: the cluster happens to be already present in the cache pool
346 auto result = FindInPool(clusterId);
347 if (result) {
348 bool hasMissingColumn = false;
349 for (auto cid : physicalColumns) {
350 if (result->ContainsColumn(cid))
351 continue;
352
353 hasMissingColumn = true;
354 break;
355 }
356 if (!hasMissingColumn)
357 return result;
358 }
359
360 // Otherwise the missing data must have been triggered for loading by now, so block and wait
361 decltype(fInFlightClusters)::iterator itr;
362 {
363 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
364 itr = fInFlightClusters.begin();
365 for (; itr != fInFlightClusters.end(); ++itr) {
366 if (itr->fClusterKey.fClusterId == clusterId)
367 break;
368 }
369 R__ASSERT(itr != fInFlightClusters.end());
370 // Note that the fInFlightClusters is accessed concurrently only by the I/O thread. The I/O thread
371 // never changes the structure of the in-flight clusters array (it does not add, remove, or swap elements).
372 // Therefore, it is safe to access the element pointed to by itr here even after fLockWorkQueue
373 // is released. We need to release the lock before potentially blocking on the cluster future.
374 }
375
    // Blocks until the I/O thread has fulfilled the promise that belongs to this in-flight request
376 auto cptr = itr->fFuture.get();
377 if (result) {
    // The cluster is already pooled with a subset of the columns: merge the new data into it
378 // Noop unless the page source has a task scheduler
379 fPageSource.UnzipCluster(cptr.get());
380 result->Adopt(std::move(*cptr));
381 } else {
382 auto idxFreeSlot = FindFreeSlot();
383 fPool[idxFreeSlot] = std::move(cptr);
384 }
385
    // The request has been consumed; the in-flight list is only modified while holding the lock
386 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
387 fInFlightClusters.erase(itr);
388 }
389}
390
392{
    // Drains the list of in-flight clusters: repeatedly waits for the first entry to become ready and
    // removes it, until the list is empty. The loaded clusters themselves are not moved into the pool.
393 while (true) {
394 decltype(fInFlightClusters)::iterator itr;
395 {
396 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
397 itr = fInFlightClusters.begin();
398 if (itr == fInFlightClusters.end())
399 return;
400 }
401
    // Block without holding the lock, so that the I/O thread can make progress in the meantime
402 itr->fFuture.wait();
403
404 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
405 fInFlightClusters.erase(itr);
406 }
407}
#define R__unlikely(expr)
Definition RConfig.hxx:578
#define d(i)
Definition RSha256.hxx:102
#define R__ASSERT(e)
Definition TError.h:118
#define N
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Int_t i
unsigned int fWindowPre
The number of clusters before the currently active cluster that should stay in the pool if present Re...
void WaitForInFlightClusters()
Used by the unit tests to drain the queue of clusters to be preloaded.
std::condition_variable fCvHasReadWork
Signals a non-empty I/O work queue.
RCluster * GetCluster(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
RCluster * FindInPool(DescriptorId_t clusterId) const
Every cluster id has at most one corresponding RCluster pointer in the pool.
unsigned int fClusterBunchSize
The number of clusters that are being read in a single vector read.
RCluster * WaitFor(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the given cluster from the pool, which needs to contain at least the columns physicalColumns.
std::vector< std::unique_ptr< RCluster > > fPool
The cache of clusters around the currently active cluster.
RClusterPool(RPageSource &pageSource, unsigned int clusterBunchSize)
void ExecReadClusters()
The I/O thread routine, there is exactly one I/O thread in-flight for every cluster pool.
std::mutex fLockWorkQueue
Protects the shared state between the main thread and the I/O thread, namely the work queue and the i...
RPageSource & fPageSource
Every cluster pool is responsible for exactly one page source that triggers loading of the clusters (...
std::deque< RReadItem > fReadQueue
The communication channel to the I/O thread.
std::vector< RInFlightCluster > fInFlightClusters
The clusters that were handed off to the I/O thread.
size_t FindFreeSlot() const
Returns an index of an unused element in fPool; callers of this function (GetCluster() and WaitFor())...
std::int64_t fBunchId
Used as an ever-growing counter in GetCluster() to separate bunches of clusters from each other.
std::thread fThreadIo
The I/O thread calls RPageSource::LoadClusters() asynchronously.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:152
std::unordered_set< DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:154
Abstract interface to read data from an ntuple.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr ClusterSize_t kInvalidClusterIndex(std::uint64_t(-1))
constexpr DescriptorId_t kInvalidDescriptorId
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
Definition Utils.hxx:189
Clusters that are currently being processed by the pipeline.
std::future< std::unique_ptr< RCluster > > fFuture
bool operator<(const RInFlightCluster &other) const
First order by cluster id, then by number of columns, then by the column ids in fColumns.
Request to load a subset of the columns of a particular cluster.
std::int64_t fBunchId
Items with different bunch ids are scheduled for different vector reads.
std::promise< std::unique_ptr< RCluster > > fPromise