38 itr1 !=
fClusterKey.fPhysicalColumnSet.end(); ++itr1, ++itr2) {
54 fPool(2 * clusterBunchSize),
73 std::deque<RReadItem> readItems;
81 while (!readItems.empty()) {
82 std::vector<RCluster::RKey> clusterKeys;
83 std::int64_t bunchId = -1;
84 for (
unsigned i = 0;
i < readItems.size(); ++
i) {
85 const auto &item = readItems[
i];
92 if ((bunchId >= 0) && (item.fBunchId != bunchId))
94 bunchId = item.fBunchId;
95 clusterKeys.emplace_back(item.fClusterKey);
98 auto clusters =
fPageSource.LoadClusters(clusterKeys);
99 for (std::size_t
i = 0;
i < clusters.size(); ++
i) {
106 [thisClusterId = clusters[
i]->GetId()](
auto &inFlight) {
107 return inFlight.fClusterKey.fClusterId == thisClusterId && inFlight.fIsExpired;
113 readItems[
i].fPromise.set_value(std::move(clusters[
i]));
115 readItems.erase(readItems.begin(), readItems.begin() + clusters.size());
123 for (
const auto &cptr :
fPool) {
124 if (cptr && (cptr->GetId() == clusterId))
133 for (
unsigned i = 0;
i <
N; ++
i) {
152 std::int64_t fBunchId = -1;
153 std::int64_t fFlags = 0;
154 ColumnSet_t fPhysicalColumnSet;
157 static constexpr std::int64_t kFlagRequired = 0x01;
158 static constexpr std::int64_t kFlagLast = 0x02;
161 std::map<DescriptorId_t, RInfo> fMap;
166 fMap.emplace(clusterId, info);
169 bool Contains(DescriptorId_t clusterId) {
170 return fMap.count(clusterId) > 0;
173 std::size_t GetSize()
const {
return fMap.size(); }
175 void Erase(DescriptorId_t clusterId,
const ColumnSet_t &physicalColumns)
177 auto itr = fMap.find(clusterId);
178 if (itr == fMap.end())
181 std::copy_if(itr->second.fPhysicalColumnSet.begin(), itr->second.fPhysicalColumnSet.end(),
182 std::inserter(
d,
d.end()),
183 [&physicalColumns](DescriptorId_t needle) { return physicalColumns.count(needle) == 0; });
187 itr->second.fPhysicalColumnSet =
d;
191 decltype(fMap)::iterator begin() {
return fMap.begin(); }
192 decltype(fMap)::iterator
end() {
return fMap.end(); }
197ROOT::Experimental::Internal::RCluster *
201 std::set<DescriptorId_t> keep;
204 auto descriptorGuard =
fPageSource.GetSharedDescriptorGuard();
207 auto prev = clusterId;
209 prev = descriptorGuard->FindPrevClusterId(prev);
216 RProvides::RInfo provideInfo;
217 provideInfo.fPhysicalColumnSet = physicalColumns;
219 provideInfo.fFlags = RProvides::kFlagRequired;
225 next = descriptorGuard->FindNextClusterId(cid);
227 if (!
fPageSource.GetEntryRange().IntersectsWith(descriptorGuard->GetClusterDescriptor(next)))
231 provideInfo.fFlags |= RProvides::kFlagLast;
233 provide.Insert(cid, provideInfo);
237 provideInfo.fFlags = 0;
242 for (
auto &cptr :
fPool) {
245 if (provide.Contains(cptr->GetId()) > 0)
247 if (keep.count(cptr->GetId()) > 0)
264 !provide.Contains(itr->fClusterKey.fClusterId) && (keep.count(itr->fClusterKey.fClusterId) == 0);
266 if (itr->fFuture.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
268 provide.Erase(itr->fClusterKey.fClusterId, itr->fClusterKey.fPhysicalColumnSet);
273 auto cptr = itr->fFuture.get();
275 if (!cptr || itr->fIsExpired) {
282 auto existingCluster =
FindInPool(cptr->GetId());
283 if (existingCluster) {
284 existingCluster->Adopt(std::move(*cptr));
287 fPool[idxFreeSlot] = std::move(cptr);
293 for (
auto &cptr :
fPool) {
296 provide.Erase(cptr->GetId(), cptr->GetAvailPhysicalColumns());
300 bool skipPrefetch =
false;
303 for (
const auto &kv : provide) {
304 if ((kv.second.fFlags & (RProvides::kFlagRequired | RProvides::kFlagLast)) == 0)
306 skipPrefetch =
false;
316 for (
const auto &kv : provide) {
317 R__ASSERT(!kv.second.fPhysicalColumnSet.empty());
321 readItem.
fBunchId = kv.second.fBunchId;
337 return WaitFor(clusterId, physicalColumns);
348 bool hasMissingColumn =
false;
349 for (
auto cid : physicalColumns) {
350 if (
result->ContainsColumn(cid))
353 hasMissingColumn =
true;
356 if (!hasMissingColumn)
363 std::lock_guard<std::mutex> lockGuardInFlightClusters(
fLockWorkQueue);
366 if (itr->fClusterKey.fClusterId == clusterId)
376 auto cptr = itr->fFuture.get();
380 result->Adopt(std::move(*cptr));
383 fPool[idxFreeSlot] = std::move(cptr);
386 std::lock_guard<std::mutex> lockGuardInFlightClusters(
fLockWorkQueue);
396 std::lock_guard<std::mutex> lockGuardInFlightClusters(
fLockWorkQueue);
404 std::lock_guard<std::mutex> lockGuardInFlightClusters(
fLockWorkQueue);
#define R__unlikely(expr)
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
unsigned int fWindowPre
The number of clusters before the currently active cluster that should stay in the pool if present Re...
void WaitForInFlightClusters()
Used by the unit tests to drain the queue of clusters to be preloaded.
std::condition_variable fCvHasReadWork
Signals a non-empty I/O work queue.
RCluster * GetCluster(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
RCluster * FindInPool(DescriptorId_t clusterId) const
Every cluster id has at most one corresponding RCluster pointer in the pool.
unsigned int fClusterBunchSize
The number of clusters that are being read in a single vector read.
RCluster * WaitFor(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the given cluster from the pool, which needs to contain at least the columns physicalColumns.
std::vector< std::unique_ptr< RCluster > > fPool
The cache of clusters around the currently active cluster.
RClusterPool(RPageSource &pageSource, unsigned int clusterBunchSize)
void ExecReadClusters()
The I/O thread routine; there is exactly one I/O thread in-flight for every cluster pool.
std::mutex fLockWorkQueue
Protects the shared state between the main thread and the I/O thread, namely the work queue and the i...
RPageSource & fPageSource
Every cluster pool is responsible for exactly one page source that triggers loading of the clusters (...
std::deque< RReadItem > fReadQueue
The communication channel to the I/O thread.
std::vector< RInFlightCluster > fInFlightClusters
The clusters that were handed off to the I/O thread.
size_t FindFreeSlot() const
Returns an index of an unused element in fPool; callers of this function (GetCluster() and WaitFor())...
std::int64_t fBunchId
Used as an ever-growing counter in GetCluster() to separate bunches of clusters from each other.
std::thread fThreadIo
The I/O thread calls RPageSource::LoadClusters() asynchronously.
An in-memory subset of the packed and compressed pages of a cluster.
std::unordered_set< DescriptorId_t > ColumnSet_t
Abstract interface to read data from an ntuple.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr ClusterSize_t kInvalidClusterIndex(std::uint64_t(-1))
constexpr DescriptorId_t kInvalidDescriptorId
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
Clusters that are currently being processed by the pipeline.
std::future< std::unique_ptr< RCluster > > fFuture
RCluster::RKey fClusterKey
bool operator<(const RInFlightCluster &other) const
First order by cluster id, then by number of columns, then by the column ids in fColumns.
Request to load a subset of the columns of a particular cluster.
RCluster::RKey fClusterKey
std::int64_t fBunchId
Items with different bunch ids are scheduled for different vector reads.
std::promise< std::unique_ptr< RCluster > > fPromise
DescriptorId_t fClusterId
ColumnSet_t fPhysicalColumnSet