/// \file RClusterPool.cxx
/// \ingroup NTuple ROOT7
/// \author Jakob Blomer <jblomer@cern.ch>
/// \date 2020-03-11
/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
/// is welcome!

/*************************************************************************
 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#include <ROOT/RClusterPool.hxx>
#include <ROOT/RPageStorage.hxx>

#include <TError.h>

#include <algorithm>
#include <chrono>
#include <future>
#include <iostream>
#include <iterator>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <utility>
#include <vector>

/// First order by cluster id, then by number of columns, then by the column ids in fColumns
bool ROOT::Experimental::Detail::RClusterPool::RInFlightCluster::operator<(const RInFlightCluster &other) const
{
   if (fClusterId == other.fClusterId) {
      if (fColumns.size() == other.fColumns.size()) {
         for (auto itr1 = fColumns.begin(), itr2 = other.fColumns.begin(); itr1 != fColumns.end(); ++itr1, ++itr2) {
            if (*itr1 == *itr2)
               continue;
            return *itr1 < *itr2;
         }
         // *this == other
         return false;
      }
      return fColumns.size() < other.fColumns.size();
   }
   return fClusterId < other.fClusterId;
}
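
// Illustrative ordering examples for the comparison above:
//   {fClusterId: 1, any columns}  sorts before  {fClusterId: 2, any columns}   (cluster id first)
//   {fClusterId: 1, 2 columns}    sorts before  {fClusterId: 1, 3 columns}     (then column count)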

ROOT::Experimental::Detail::RClusterPool::RClusterPool(RPageSource &pageSource, unsigned int size)
   : fPageSource(pageSource)
   , fPool(size)
   , fThreadIo(&RClusterPool::ExecReadClusters, this)
   , fThreadUnzip(&RClusterPool::ExecUnzipClusters, this)
{
   R__ASSERT(size > 0);
   fWindowPre = 0;
   fWindowPost = size;
   // Large pools maintain a small look-back window together with the large look-ahead window
   while ((1u << fWindowPre) < (fWindowPost - (fWindowPre + 1))) {
      fWindowPre++;
      fWindowPost--;
   }
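   // Illustrative example: the loop keeps fWindowPre + fWindowPost == size; for size == 10
   // it settles at fWindowPre == 3 and fWindowPost == 7, i.e. a look-back window of 3 clusters
   // and a look-ahead window of 7 (including the currently active cluster)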
}

ROOT::Experimental::Detail::RClusterPool::~RClusterPool()
{
   {
      // Controlled shutdown of the I/O thread
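      // A default-constructed RReadItem carries fClusterId == kInvalidDescriptorId, which
      // ExecReadClusters() interprets as the signal to terminate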
      std::unique_lock<std::mutex> lock(fLockWorkQueue);
      fReadQueue.emplace(RReadItem());
      fCvHasReadWork.notify_one();
   }
   fThreadIo.join();

   {
      // Controlled shutdown of the unzip thread
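      // A default-constructed RUnzipItem carries an empty fCluster, which ExecUnzipClusters()
      // interprets as the signal to terminate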
      std::unique_lock<std::mutex> lock(fLockUnzipQueue);
      fUnzipQueue.emplace(RUnzipItem());
      fCvHasUnzipWork.notify_one();
   }
   fThreadUnzip.join();
}

void ROOT::Experimental::Detail::RClusterPool::ExecUnzipClusters()
{
   while (true) {
      std::vector<RUnzipItem> unzipItems;
      {
         std::unique_lock<std::mutex> lock(fLockUnzipQueue);
         fCvHasUnzipWork.wait(lock, [&]{ return !fUnzipQueue.empty(); });
         while (!fUnzipQueue.empty()) {
            unzipItems.emplace_back(std::move(fUnzipQueue.front()));
            fUnzipQueue.pop();
         }
      }

      for (auto &item : unzipItems) {
         if (!item.fCluster)
            return;

         fPageSource.UnzipCluster(item.fCluster.get());

         // Afterwards the GetCluster() method in the main thread can pick up the cluster
         item.fPromise.set_value(std::move(item.fCluster));
      }
   } // while (true)
}

void ROOT::Experimental::Detail::RClusterPool::ExecReadClusters()
{
   while (true) {
      std::vector<RReadItem> readItems;
      {
         std::unique_lock<std::mutex> lock(fLockWorkQueue);
         fCvHasReadWork.wait(lock, [&]{ return !fReadQueue.empty(); });
         while (!fReadQueue.empty()) {
            readItems.emplace_back(std::move(fReadQueue.front()));
            fReadQueue.pop();
         }
      }

      for (auto &item : readItems) {
         if (item.fClusterId == kInvalidDescriptorId)
            return;

         // TODO(jblomer): the page source needs to be capable of loading multiple clusters in one go
         auto cluster = fPageSource.LoadCluster(item.fClusterId, item.fColumns);

         // Meanwhile, the user might have requested clusters outside the look-ahead window, in which case
         // the cluster is no longer needed; it is then discarded right away instead of being moved to the pool
         bool discard = false;
         {
            std::unique_lock<std::mutex> lock(fLockWorkQueue);
            for (auto &inFlight : fInFlightClusters) {
               if (inFlight.fClusterId != item.fClusterId)
                  continue;
               discard = inFlight.fIsExpired;
               break;
            }
         }
         if (discard) {
            cluster.reset();
            item.fPromise.set_value(std::move(cluster));
         } else {
            // Hand over the loaded cluster pages to the unzip thread
            std::unique_lock<std::mutex> lock(fLockUnzipQueue);
            fUnzipQueue.emplace(RUnzipItem{std::move(cluster), std::move(item.fPromise)});
            fCvHasUnzipWork.notify_one();
         }
      }
   } // while (true)
}

ROOT::Experimental::Detail::RCluster *
ROOT::Experimental::Detail::RClusterPool::FindInPool(DescriptorId_t clusterId) const
{
   for (const auto &cptr : fPool) {
      if (cptr && (cptr->GetId() == clusterId))
         return cptr.get();
   }
   return nullptr;
}

size_t ROOT::Experimental::Detail::RClusterPool::FindFreeSlot() const
{
   auto N = fPool.size();
   for (unsigned i = 0; i < N; ++i) {
      if (!fPool[i])
         return i;
   }

   R__ASSERT(false);
   return N;
}

namespace {

/// Helper class for the (cluster, column list) pairs that should be loaded in the background
class RProvides {
   using DescriptorId_t = ROOT::Experimental::DescriptorId_t;
   using ColumnSet_t = ROOT::Experimental::Detail::RPageSource::ColumnSet_t;

private:
   std::map<DescriptorId_t, ColumnSet_t> fMap;

public:
   void Insert(DescriptorId_t clusterId, const ColumnSet_t &columns)
   {
      fMap.emplace(clusterId, columns);
   }

   bool Contains(DescriptorId_t clusterId) {
      return fMap.count(clusterId) > 0;
   }

   void Erase(DescriptorId_t clusterId, const ColumnSet_t &columns)
   {
      auto itr = fMap.find(clusterId);
      if (itr == fMap.end())
         return;
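      // Keep only the difference itr->second \ columns, i.e. the column ids that are
      // not part of the given column set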
      ColumnSet_t d;
      std::copy_if(itr->second.begin(), itr->second.end(), std::inserter(d, d.end()),
                   [&columns] (DescriptorId_t needle) { return columns.count(needle) == 0; });
      if (d.empty()) {
         fMap.erase(itr);
      } else {
         itr->second = d;
      }
   }

   decltype(fMap)::iterator begin() { return fMap.begin(); }
   decltype(fMap)::iterator end() { return fMap.end(); }
};
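
// Illustrative use of RProvides: after Insert(5, {0, 1}), a call to Erase(5, {0}) leaves only
// column 1 scheduled for cluster 5, and a subsequent Erase(5, {1}) removes cluster 5 entirely.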

} // anonymous namespace

ROOT::Experimental::Detail::RCluster *ROOT::Experimental::Detail::RClusterPool::GetCluster(
   DescriptorId_t clusterId, const RPageSource::ColumnSet_t &columns)
{
   const auto &desc = fPageSource.GetDescriptor();

   // Determine previous cluster ids that we keep if they happen to be in the pool
   std::set<DescriptorId_t> keep;
   auto prev = clusterId;
   for (unsigned int i = 0; i < fWindowPre; ++i) {
      prev = desc.FindPrevClusterId(prev);
      if (prev == kInvalidDescriptorId)
         break;
      keep.insert(prev);
   }

   // Determine following cluster ids and the column ids that we want to make available
   RProvides provide;
   provide.Insert(clusterId, columns);
   auto next = clusterId;
   // TODO(jblomer): instead of a fixed-size window, eventually we should determine the window size based on
   // a user-defined memory limit. The size of the preloaded data can be determined at the beginning of
   // GetCluster from the descriptor and the current contents of fPool.
   for (unsigned int i = 1; i < fWindowPost; ++i) {
      next = desc.FindNextClusterId(next);
      if (next == kInvalidDescriptorId)
         break;
      provide.Insert(next, columns);
   }
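
   // Illustrative example: with fWindowPre == 2, fWindowPost == 4, and contiguous cluster ids,
   // a request for cluster 10 yields keep == {8, 9} and provide == {10, 11, 12, 13}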

   // Clear the cache of clusters that are neither in the look-ahead nor in the look-back window
   for (auto &cptr : fPool) {
      if (!cptr)
         continue;
      if (provide.Contains(cptr->GetId()))
         continue;
      if (keep.count(cptr->GetId()) > 0)
         continue;
      cptr.reset();
   }

   // Move clusters that arrived in the meantime into the cache pool
   {
      // This lock is held during iteration over several data structures: the collection of in-flight clusters,
      // the current pool of cached clusters, and the set of cluster ids to be preloaded.
      // All three collections are expected to be small (certainly < 100, more likely < 10). All operations
      // are non-blocking and move around small items (pointers, ids, etc.). Thus the overall locking time should
      // still be reasonably small and the lock is rarely taken (usually once per cluster).
      std::lock_guard<std::mutex> lockGuard(fLockWorkQueue);

      for (auto itr = fInFlightClusters.begin(); itr != fInFlightClusters.end(); ) {
         R__ASSERT(itr->fFuture.valid());
         itr->fIsExpired = !provide.Contains(itr->fClusterId) && (keep.count(itr->fClusterId) == 0);

         if (itr->fFuture.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
            // Remove the set of columns that are already scheduled for being loaded
            provide.Erase(itr->fClusterId, itr->fColumns);
            ++itr;
            continue;
         }

         auto cptr = itr->fFuture.get();
         // If cptr is nullptr, the cluster expired previously and was released by the I/O thread
         if (!cptr || itr->fIsExpired) {
            cptr.reset();
            itr = fInFlightClusters.erase(itr);
            continue;
         }

         // We either put a fresh cluster into a free slot or we merge the cluster with an existing one
         auto existingCluster = FindInPool(cptr->GetId());
         if (existingCluster) {
            existingCluster->Adopt(std::move(*cptr));
         } else {
            auto idxFreeSlot = FindFreeSlot();
            fPool[idxFreeSlot] = std::move(cptr);
         }
         itr = fInFlightClusters.erase(itr);
      }

      // Determine clusters which get triggered for background loading
      for (auto &cptr : fPool) {
         if (!cptr)
            continue;
         provide.Erase(cptr->GetId(), cptr->GetAvailColumns());
      }

      // Update the work queue and the in-flight cluster list with new requests. We already hold the work queue
      // mutex
      // TODO(jblomer): we should ensure that clusterId is given first to the I/O thread. That is usually the
      // case but it's not ensured by the code
      for (const auto &kv : provide) {
         R__ASSERT(!kv.second.empty());

         RReadItem readItem;
         readItem.fClusterId = kv.first;
         readItem.fColumns = kv.second;

         RInFlightCluster inFlightCluster;
         inFlightCluster.fClusterId = kv.first;
         inFlightCluster.fColumns = kv.second;
         inFlightCluster.fFuture = readItem.fPromise.get_future();
         fInFlightClusters.emplace_back(std::move(inFlightCluster));

         fReadQueue.emplace(std::move(readItem));
      }
      if (!fReadQueue.empty())
         fCvHasReadWork.notify_one();
   } // work queue lock guard

   return WaitFor(clusterId, columns);
}

ROOT::Experimental::Detail::RCluster *ROOT::Experimental::Detail::RClusterPool::WaitFor(
   DescriptorId_t clusterId, const RPageSource::ColumnSet_t &columns)
{
   while (true) {
      // Fast exit: the cluster happens to be already present in the cache pool
      auto result = FindInPool(clusterId);
      if (result) {
         bool hasMissingColumn = false;
         for (auto cid : columns) {
            if (result->ContainsColumn(cid))
               continue;

            hasMissingColumn = true;
            break;
         }
         if (!hasMissingColumn)
            return result;
      }

      // Otherwise the missing data must have been triggered for loading by now, so block and wait
      decltype(fInFlightClusters)::iterator itr;
      {
         std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
         itr = fInFlightClusters.begin();
         for (; itr != fInFlightClusters.end(); ++itr) {
            if (itr->fClusterId == clusterId)
               break;
         }
         R__ASSERT(itr != fInFlightClusters.end());
         // Note that fInFlightClusters is accessed concurrently only by the I/O thread. The I/O thread
         // never changes the structure of the in-flight clusters array (it does not add, remove, or swap elements).
         // Therefore, it is safe to access the element pointed to by itr here even after fLockWorkQueue
         // is released. We need to release the lock before potentially blocking on the cluster future.
      }

      auto cptr = itr->fFuture.get();
      if (result) {
         result->Adopt(std::move(*cptr));
      } else {
         auto idxFreeSlot = FindFreeSlot();
         fPool[idxFreeSlot] = std::move(cptr);
      }

      std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
      fInFlightClusters.erase(itr);
   }
}