16#ifndef ROOT7_RClusterPool
17#define ROOT7_RClusterPool
23#include <condition_variable>
33namespace Experimental {
64 std::promise<std::unique_ptr<RCluster>>
fPromise;
73 std::promise<std::unique_ptr<RCluster>>
fPromise;
79 std::future<std::unique_ptr<RCluster>>
fFuture;
100 std::vector<std::unique_ptr<RCluster>>
fPool;
Manages a set of clusters containing compressed and packed pages.
unsigned int GetWindowPre() const
std::mutex fLockWorkQueue
Protects the shared state between the main thread and the pipeline threads, namely the read and unzip...
RCluster * FindInPool(DescriptorId_t clusterId) const
Every cluster id has at most one corresponding RCluster pointer in the pool.
std::vector< std::unique_ptr< RCluster > > fPool
The cache of clusters around the currently active cluster.
void ExecUnzipClusters()
The unzip thread routine which takes a loaded cluster and passes it to fPageSource....
RClusterPool(RPageSource &pageSource)
size_t FindFreeSlot() const
Returns an index of an unused element in fPool; callers of this function (GetCluster() and WaitFor())...
std::mutex fLockUnzipQueue
The lock associated with the fCvHasUnzipWork condition variable.
unsigned int GetWindowPost() const
static constexpr unsigned int kDefaultPoolSize
RClusterPool(const RClusterPool &other)=delete
RCluster * WaitFor(DescriptorId_t clusterId, const RPageSource::ColumnSet_t &columns)
Returns the given cluster from the pool, which needs to contain at least the columns columns.
unsigned int fWindowPost
The number of desired clusters in the pool, including the currently active cluster.
void ExecReadClusters()
The I/O thread routine, there is exactly one I/O thread in-flight for every cluster pool.
RClusterPool & operator=(const RClusterPool &other)=delete
std::thread fThreadUnzip
The unzip thread takes a loaded cluster and calls fPageSource->UnzipCluster() on it.
RPageSource & fPageSource
Every cluster pool is responsible for exactly one page source that triggers loading of the clusters (...
std::thread fThreadIo
The I/O thread calls RPageSource::LoadCluster() asynchronously.
std::condition_variable fCvHasReadWork
Signals a non-empty I/O work queue.
std::queue< RUnzipItem > fUnzipQueue
The communication channel between the I/O thread and the unzip thread.
std::queue< RReadItem > fReadQueue
The communication channel to the I/O thread.
std::condition_variable fCvHasUnzipWork
Signals a non-empty unzip work queue.
std::vector< RInFlightCluster > fInFlightClusters
The clusters that were handed off to the I/O thread.
unsigned int fWindowPre
The number of clusters before the currently active cluster that should stay in the pool if present.
RCluster * GetCluster(DescriptorId_t clusterId, const RPageSource::ColumnSet_t &columns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
static constexpr unsigned int kWorkQueueLimit
Maximum number of queued cluster requests for the I/O thread. A single request can span multiple clus...
An in-memory subset of the packed and compressed pages of a cluster.
Abstract interface to read data from an ntuple.
std::unordered_set< DescriptorId_t > ColumnSet_t
Derived from the model (fields) that are actually being requested at a given point in time.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr DescriptorId_t kInvalidDescriptorId
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Clusters that are currently being processed by the pipeline.
bool operator<(const RInFlightCluster &other) const
First order by cluster id, then by number of columns, then by the column ids in fColumns.
RPageSource::ColumnSet_t fColumns
bool fIsExpired
By the time a cluster has been loaded, this cluster might not be necessary anymore.
bool operator!=(const RInFlightCluster &other) const
bool operator==(const RInFlightCluster &other) const
std::future< std::unique_ptr< RCluster > > fFuture
DescriptorId_t fClusterId
Request to load a subset of the columns of a particular cluster.
DescriptorId_t fClusterId
std::promise< std::unique_ptr< RCluster > > fPromise
RPageSource::ColumnSet_t fColumns
Request to decompress and if necessary unpack compressed pages.
std::unique_ptr< RCluster > fCluster
std::promise< std::unique_ptr< RCluster > > fPromise