54 fPool(2 * clusterBunchSize),
64 std::unique_lock<std::mutex> lock(fLockWorkQueue);
66 fCvHasReadWork.notify_one();
73 std::deque<RReadItem> readItems;
76 std::unique_lock<std::mutex> lock(fLockWorkQueue);
77 fCvHasReadWork.wait(lock, [&]{
return !fReadQueue.empty(); });
78 std::swap(readItems, fReadQueue);
81 while (!readItems.empty()) {
82 std::vector<RCluster::RKey> clusterKeys;
83 std::int64_t bunchId = -1;
84 for (
unsigned i = 0; i < readItems.size(); ++i) {
85 const auto &item = readItems[i];
92 if ((bunchId >= 0) && (item.fBunchId != bunchId))
94 bunchId = item.fBunchId;
95 clusterKeys.emplace_back(item.fClusterKey);
98 auto clusters = fPageSource.LoadClusters(clusterKeys);
99 for (std::size_t i = 0; i < clusters.size(); ++i) {
104 std::unique_lock<std::mutex> lock(fLockWorkQueue);
105 discard = std::any_of(fInFlightClusters.begin(), fInFlightClusters.end(),
106 [thisClusterId = clusters[i]->GetId()](
auto &inFlight) {
107 return inFlight.fClusterKey.fClusterId == thisClusterId && inFlight.fIsExpired;
113 readItems[i].fPromise.set_value(
nullptr);
115 readItems[i].fPromise.set_value(std::move(clusters[i]));
118 readItems.erase(readItems.begin(), readItems.begin() + clusters.size());
126 for (
const auto &cptr : fPool) {
127 if (cptr && (cptr->GetId() == clusterId))
135 auto N = fPool.size();
136 for (
unsigned i = 0; i <
N; ++i) {
155 std::int64_t fBunchId = -1;
157 ColumnSet_t fPhysicalColumnSet;
160 static constexpr std::int64_t kFlagRequired = 0x01;
161 static constexpr std::int64_t kFlagLast = 0x02;
164 std::map<DescriptorId_t, RInfo> fMap;
169 fMap.emplace(clusterId, info);
172 bool Contains(DescriptorId_t clusterId) {
173 return fMap.count(clusterId) > 0;
176 std::size_t GetSize()
const {
return fMap.size(); }
178 void Erase(DescriptorId_t clusterId,
const ColumnSet_t &physicalColumns)
180 auto itr = fMap.find(clusterId);
181 if (itr == fMap.end())
184 std::copy_if(itr->second.fPhysicalColumnSet.begin(), itr->second.fPhysicalColumnSet.end(),
185 std::inserter(
d,
d.end()),
186 [&physicalColumns](DescriptorId_t needle) { return physicalColumns.count(needle) == 0; });
190 itr->second.fPhysicalColumnSet =
d;
194 decltype(fMap)::iterator begin() {
return fMap.begin(); }
195 decltype(fMap)::iterator
end() {
return fMap.end(); }
204 std::set<DescriptorId_t> keep;
207 auto descriptorGuard = fPageSource.GetSharedDescriptorGuard();
210 auto prev = clusterId;
211 for (
unsigned int i = 0; i < fWindowPre; ++i) {
212 prev = descriptorGuard->FindPrevClusterId(prev);
219 RProvides::RInfo provideInfo;
220 provideInfo.fPhysicalColumnSet = physicalColumns;
221 provideInfo.fBunchId = fBunchId;
222 provideInfo.fFlags = RProvides::kFlagRequired;
223 for (
DescriptorId_t i = 0, next = clusterId; i < 2 * fClusterBunchSize; ++i) {
224 if (i == fClusterBunchSize)
225 provideInfo.fBunchId = ++fBunchId;
228 next = descriptorGuard->FindNextClusterId(cid);
230 if (!fPageSource.GetEntryRange().IntersectsWith(descriptorGuard->GetClusterDescriptor(next)))
234 provideInfo.fFlags |= RProvides::kFlagLast;
236 provide.Insert(cid, provideInfo);
240 provideInfo.fFlags = 0;
245 for (
auto &cptr : fPool) {
248 if (provide.Contains(cptr->GetId()) > 0)
250 if (keep.count(cptr->GetId()) > 0)
262 std::lock_guard<std::mutex> lockGuard(fLockWorkQueue);
264 for (
auto itr = fInFlightClusters.begin(); itr != fInFlightClusters.end(); ) {
267 !provide.Contains(itr->fClusterKey.fClusterId) && (keep.count(itr->fClusterKey.fClusterId) == 0);
269 if (itr->fFuture.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
271 provide.Erase(itr->fClusterKey.fClusterId, itr->fClusterKey.fPhysicalColumnSet);
276 auto cptr = itr->fFuture.get();
278 if (!cptr || itr->fIsExpired) {
280 itr = fInFlightClusters.erase(itr);
285 fPageSource.UnzipCluster(cptr.get());
288 auto existingCluster = FindInPool(cptr->GetId());
289 if (existingCluster) {
290 existingCluster->Adopt(std::move(*cptr));
292 auto idxFreeSlot = FindFreeSlot();
293 fPool[idxFreeSlot] = std::move(cptr);
295 itr = fInFlightClusters.erase(itr);
299 for (
auto &cptr : fPool) {
302 provide.Erase(cptr->GetId(), cptr->GetAvailPhysicalColumns());
306 bool skipPrefetch =
false;
307 if (provide.GetSize() < fClusterBunchSize) {
309 for (
const auto &kv : provide) {
310 if ((kv.second.fFlags & (RProvides::kFlagRequired | RProvides::kFlagLast)) == 0)
312 skipPrefetch =
false;
322 for (
const auto &kv : provide) {
323 R__ASSERT(!kv.second.fPhysicalColumnSet.empty());
327 readItem.
fBunchId = kv.second.fBunchId;
334 fInFlightClusters.emplace_back(std::move(inFlightCluster));
336 fReadQueue.emplace_back(std::move(readItem));
338 if (!fReadQueue.empty())
339 fCvHasReadWork.notify_one();
343 return WaitFor(clusterId, physicalColumns);
352 auto result = FindInPool(clusterId);
354 bool hasMissingColumn =
false;
355 for (
auto cid : physicalColumns) {
356 if (
result->ContainsColumn(cid))
359 hasMissingColumn =
true;
362 if (!hasMissingColumn)
367 decltype(fInFlightClusters)::iterator itr;
369 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
370 itr = fInFlightClusters.begin();
371 for (; itr != fInFlightClusters.end(); ++itr) {
372 if (itr->fClusterKey.fClusterId == clusterId)
375 R__ASSERT(itr != fInFlightClusters.end());
382 auto cptr = itr->fFuture.get();
387 fPageSource.UnzipCluster(cptr.get());
390 result->Adopt(std::move(*cptr));
392 auto idxFreeSlot = FindFreeSlot();
393 fPool[idxFreeSlot] = std::move(cptr);
396 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
397 fInFlightClusters.erase(itr);
404 decltype(fInFlightClusters)::iterator itr;
406 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
407 itr = fInFlightClusters.begin();
408 if (itr == fInFlightClusters.end())
414 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
415 fInFlightClusters.erase(itr);
#define R__unlikely(expr)
std::ios_base::fmtflags fFlags
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Manages a set of clusters containing compressed and packed pages.
void WaitForInFlightClusters()
Used by the unit tests to drain the queue of clusters to be preloaded.
RCluster * GetCluster(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
RCluster * FindInPool(DescriptorId_t clusterId) const
Every cluster id has at most one corresponding RCluster pointer in the pool.
unsigned int fClusterBunchSize
The number of clusters that are being read in a single vector read.
RCluster * WaitFor(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the given cluster from the pool, which needs to contain at least the columns physicalColumns.
std::vector< std::unique_ptr< RCluster > > fPool
The cache of clusters around the currently active cluster.
RClusterPool(RPageSource &pageSource, unsigned int clusterBunchSize)
void ExecReadClusters()
The I/O thread routine, there is exactly one I/O thread in-flight for every cluster pool.
RPageSource & fPageSource
Every cluster pool is responsible for exactly one page source that triggers loading of the clusters (...
size_t FindFreeSlot() const
Returns an index of an unused element in fPool; callers of this function (GetCluster() and WaitFor())...
std::thread fThreadIo
The I/O thread calls RPageSource::LoadClusters() asynchronously.
An in-memory subset of the packed and compressed pages of a cluster.
std::unordered_set< DescriptorId_t > ColumnSet_t
Abstract interface to read data from an ntuple.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr ClusterSize_t kInvalidClusterIndex(std::uint64_t(-1))
constexpr DescriptorId_t kInvalidDescriptorId
void Erase(const T &that, std::vector< T > &v)
Erases that element from vector v.
Clusters that are currently being processed by the pipeline.
std::future< std::unique_ptr< RCluster > > fFuture
RCluster::RKey fClusterKey
bool operator<(const RInFlightCluster &other) const
First order by cluster id, then by number of columns, then by the column ids in fColumns.
Request to load a subset of the columns of a particular cluster.
RCluster::RKey fClusterKey
std::int64_t fBunchId
Items with different bunch ids are scheduled for different vector reads.
std::promise< std::unique_ptr< RCluster > > fPromise
DescriptorId_t fClusterId
ColumnSet_t fPhysicalColumnSet