38 itr1 !=
fClusterKey.fPhysicalColumnSet.end(); ++itr1, ++itr2) {
90 std::deque<RReadItem> readItems;
98 while (!readItems.empty()) {
99 std::vector<RCluster::RKey> clusterKeys;
100 std::int64_t bunchId = -1;
101 for (
unsigned i = 0; i < readItems.size(); ++i) {
102 const auto &item = readItems[i];
109 if ((bunchId >= 0) && (item.fBunchId != bunchId))
111 bunchId = item.fBunchId;
112 clusterKeys.emplace_back(item.fClusterKey);
115 auto clusters =
fPageSource.LoadClusters(clusterKeys);
116 for (std::size_t i = 0; i < clusters.size(); ++i) {
117 readItems[i].fPromise.set_value(std::move(clusters[i]));
119 readItems.erase(readItems.begin(), readItems.begin() + clusters.size());
133 std::int64_t fBunchId = -1;
134 std::int64_t fFlags = 0;
135 ColumnSet_t fPhysicalColumnSet;
138 static constexpr std::int64_t kFlagRequired = 0x01;
139 static constexpr std::int64_t kFlagLast = 0x02;
142 std::map<DescriptorId_t, RInfo> fMap;
147 fMap.emplace(clusterId, info);
150 bool Contains(DescriptorId_t clusterId) {
151 return fMap.count(clusterId) > 0;
/// Returns the number of entries currently stored in fMap.
154 std::size_t GetSize()
const {
return fMap.size(); }
156 void Erase(DescriptorId_t clusterId,
const ColumnSet_t &physicalColumns)
158 auto itr = fMap.find(clusterId);
159 if (itr == fMap.end())
162 std::copy_if(itr->second.fPhysicalColumnSet.begin(), itr->second.fPhysicalColumnSet.end(),
163 std::inserter(
d,
d.end()),
164 [&physicalColumns](DescriptorId_t needle) { return physicalColumns.count(needle) == 0; });
168 itr->second.fPhysicalColumnSet =
d;
/// Returns a mutable iterator to the first entry of fMap.
/// Having begin()/end() members lets the object be traversed with a range-based for loop.
172 decltype(fMap)::iterator begin() {
return fMap.begin(); }
/// Returns the past-the-end iterator of fMap; the counterpart of begin().
173 decltype(fMap)::iterator end() {
return fMap.end(); }
178ROOT::Internal::RCluster *
183 std::unordered_set<ROOT::DescriptorId_t> keep{
fPageSource.GetPinnedClusters()};
185 auto descriptorGuard =
fPageSource.GetSharedDescriptorGuard();
188 next = descriptorGuard->FindNextClusterId(next);
190 !
fPageSource.GetEntryRange().IntersectsWith(descriptorGuard->GetClusterDescriptor(next))) {
200 auto descriptorGuard =
fPageSource.GetSharedDescriptorGuard();
203 RProvides::RInfo provideInfo;
204 provideInfo.fPhysicalColumnSet = physicalColumns;
206 provideInfo.fFlags = RProvides::kFlagRequired;
212 next = descriptorGuard->FindNextClusterId(cid);
214 if (!
fPageSource.GetEntryRange().IntersectsWith(descriptorGuard->GetClusterDescriptor(next)))
218 provideInfo.fFlags |= RProvides::kFlagLast;
220 provide.Insert(cid, provideInfo);
224 provideInfo.fFlags = 0;
229 for (
auto itr =
fPool.begin(); itr !=
fPool.end();) {
230 if (provide.Contains(itr->first)) {
234 if (keep.count(itr->first) > 0) {
238 itr =
fPool.erase(itr);
253 if (itr->fFuture.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
255 provide.Erase(itr->fClusterKey.fClusterId, itr->fClusterKey.fPhysicalColumnSet);
260 auto cptr = itr->fFuture.get();
263 const bool isExpired =
264 !provide.Contains(itr->fClusterKey.fClusterId) && (keep.count(itr->fClusterKey.fClusterId) == 0);
275 auto existingCluster =
fPool.find(cptr->GetId());
276 if (existingCluster !=
fPool.end()) {
277 existingCluster->second->Adopt(std::move(*cptr));
279 const auto cid = cptr->GetId();
280 fPool.emplace(cid, std::move(cptr));
287 for (
const auto &[
_, cptr] :
fPool) {
288 provide.Erase(cptr->GetId(), cptr->GetAvailPhysicalColumns());
292 bool skipPrefetch =
false;
295 for (
const auto &kv : provide) {
296 if ((kv.second.fFlags & (RProvides::kFlagRequired | RProvides::kFlagLast)) == 0)
298 skipPrefetch =
false;
308 for (
const auto &kv : provide) {
309 R__ASSERT(!kv.second.fPhysicalColumnSet.empty());
313 readItem.
fBunchId = kv.second.fBunchId;
329 return WaitFor(clusterId, physicalColumns);
339 bool hasMissingColumn =
false;
340 for (
auto cid : physicalColumns) {
341 if (
result->second->ContainsColumn(cid))
344 hasMissingColumn =
true;
347 if (!hasMissingColumn)
348 return result->second.get();
354 std::lock_guard<std::mutex> lockGuardInFlightClusters(
fLockWorkQueue);
357 if (itr->fClusterKey.fClusterId == clusterId)
367 auto cptr = itr->fFuture.get();
375 result->second->Adopt(std::move(*cptr));
377 const auto cid = cptr->GetId();
378 fPool.emplace(cid, std::move(cptr));
382 std::lock_guard<std::mutex> lockGuardInFlightClusters(
fLockWorkQueue);
392 std::lock_guard<std::mutex> lockGuardInFlightClusters(
fLockWorkQueue);
395 itr->fFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
#define R__unlikely(expr)
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
A thread-safe integral performance counter.
A thread-safe integral performance counter.
RCluster * WaitFor(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the given cluster from the pool, which needs to contain at least the columns physicalColumns.
std::condition_variable fCvHasReadWork
Signals a non-empty I/O work queue.
ROOT::Experimental::Detail::RNTupleMetrics fMetrics
The cluster pool counters are observed by the page source.
unsigned int fClusterBunchSize
The number of clusters that are being read in a single vector read.
void WaitForInFlightClusters()
Used by the unit tests to drain the queue of clusters to be preloaded.
std::vector< RInFlightCluster > fInFlightClusters
The clusters that were handed off to the I/O thread.
std::unordered_map< ROOT::DescriptorId_t, std::unique_ptr< RCluster > > fPool
The cache of active clusters and their successors.
std::unique_ptr< RCounters > fCounters
std::deque< RReadItem > fReadQueue
The communication channel to the I/O thread.
void StopBackgroundThread()
Stop the I/O background thread. No-op if already stopped. Called by the destructor.
void ExecReadClusters()
The I/O thread routine, there is exactly one I/O thread in-flight for every cluster pool.
RCluster * GetCluster(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
RClusterPool(ROOT::Internal::RPageSource &pageSource, unsigned int clusterBunchSize)
std::int64_t fBunchId
Used as an ever-growing counter in GetCluster() to separate bunches of clusters from each other.
void StartBackgroundThread()
Spawn the I/O background thread. No-op if already started.
std::mutex fLockWorkQueue
Protects the shared state between the main thread and the I/O thread, namely the work queue and the i...
ROOT::Internal::RPageSource & fPageSource
Every cluster pool is responsible for exactly one page source that triggers loading of the clusters (...
std::thread fThreadIo
The I/O thread calls RPageSource::LoadClusters() asynchronously.
An in-memory subset of the packed and compressed pages of a cluster.
std::unordered_set< ROOT::DescriptorId_t > ColumnSet_t
Abstract interface to read data from an ntuple.
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr NTupleSize_t kInvalidNTupleIndex
constexpr DescriptorId_t kInvalidDescriptorId
Performance counters that get registered in fMetrics.
Clusters that are currently being processed by the pipeline.
bool operator<(const RInFlightCluster &other) const
First order by cluster id, then by number of columns, then by the column ids in fColumns.
RCluster::RKey fClusterKey
std::future< std::unique_ptr< RCluster > > fFuture
Request to load a subset of the columns of a particular cluster.
RCluster::RKey fClusterKey
std::int64_t fBunchId
Items with different bunch ids are scheduled for different vector reads.
std::promise< std::unique_ptr< RCluster > > fPromise
ROOT::DescriptorId_t fClusterId
ColumnSet_t fPhysicalColumnSet