ROOT Reference Guide
 
RClusterPool.hxx
/// \file ROOT/RClusterPool.hxx
/// \ingroup NTuple ROOT7
/// \author Jakob Blomer <jblomer@cern.ch>
/// \date 2020-03-11
/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
/// is welcome!

/*************************************************************************
 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#ifndef ROOT7_RClusterPool
#define ROOT7_RClusterPool

#include <ROOT/RCluster.hxx>
#include <ROOT/RNTupleUtil.hxx>
#include <ROOT/RPageStorage.hxx> // for ColumnSet_t

#include <condition_variable>
#include <memory>
#include <mutex>
#include <future>
#include <queue>
#include <thread>
#include <set>
#include <vector>

namespace ROOT {
namespace Experimental {
namespace Detail {

class RPageSource;

// clang-format off
/**
\class ROOT::Experimental::Detail::RClusterPool
\ingroup NTuple
\brief Manages a set of clusters containing compressed and packed pages

The cluster pool steers the preloading of (partial) clusters. There is a two-step pipeline: in the first step,
compressed pages are read from clusters into a memory buffer. The second pipeline step decompresses the pages
and pushes them into the page pool. The actual logic of reading and unzipping is implemented by the page source.
The cluster pool only orchestrates the work queues for reading and unzipping. It uses two threads, one for
each pipeline step. The I/O thread for reading waits for data from storage and generates no CPU load. In contrast,
the unzip thread is supposed to submit multi-threaded, CPU-heavy work to the application's task scheduler.

The unzipping step of the pipeline therefore behaves differently depending on whether or not implicit multi-threading
is turned on. If it is turned off, i.e. in a single-threaded environment, the cluster pool will only read the
compressed pages and the page source has to uncompress pages at a later point when data from the page is requested.
*/
// clang-format on
class RClusterPool {
private:
   /// Maximum number of queued cluster requests for the I/O thread. A single request can span multiple clusters.
   static constexpr unsigned int kWorkQueueLimit = 4;

   /// Request to load a subset of the columns of a particular cluster.
   /// Work items come in groups and are executed by the page source.
   struct RReadItem {
      std::promise<std::unique_ptr<RCluster>> fPromise;
   };

   /// Request to decompress and if necessary unpack compressed pages. The unzipped pages
   /// are supposed to be pushed into the page pool by the page source.
   struct RUnzipItem {
      std::unique_ptr<RCluster> fCluster;
      std::promise<std::unique_ptr<RCluster>> fPromise;
   };

   /// Clusters that are currently being processed by the pipeline. Every in-flight cluster has a corresponding
   /// work item, first a read item and then an unzip item.
   struct RInFlightCluster {
      std::future<std::unique_ptr<RCluster>> fFuture;
      DescriptorId_t fClusterId = kInvalidDescriptorId;
      RPageSource::ColumnSet_t fColumns;
      /// By the time a cluster has been loaded, this cluster might not be necessary anymore. This can happen if
      /// there are jumps in the access pattern (i.e. the access pattern deviates from linear access).
      bool fIsExpired = false;

      bool operator ==(const RInFlightCluster &other) const { return fClusterId == other.fClusterId && fColumns == other.fColumns; }
      bool operator !=(const RInFlightCluster &other) const { return !(*this == other); }
      /// First order by cluster id, then by number of columns, then by the column ids in fColumns
      bool operator <(const RInFlightCluster &other) const;
   };

   /// Every cluster pool is responsible for exactly one page source that triggers loading of the clusters
   /// (GetCluster()) and is used for implementing the I/O and cluster memory allocation (RPageSource::LoadCluster()).
   RPageSource &fPageSource;
   /// The number of clusters before the currently active cluster that should stay in the pool if present
   unsigned int fWindowPre;
   /// The number of desired clusters in the pool, including the currently active cluster
   unsigned int fWindowPost;
   /// The cache of clusters around the currently active cluster
   std::vector<std::unique_ptr<RCluster>> fPool;

   /// Protects the shared state between the main thread and the pipeline threads, namely the read and unzip
   /// work queues and the in-flight clusters vector
   std::mutex fLockWorkQueue;
   /// The clusters that were handed off to the I/O thread
   std::vector<RInFlightCluster> fInFlightClusters;
   /// Signals a non-empty I/O work queue
   std::condition_variable fCvHasReadWork;
   /// The communication channel to the I/O thread
   std::queue<RReadItem> fReadQueue;
   /// The lock associated with the fCvHasUnzipWork condition variable
   std::mutex fLockUnzipQueue;
   /// Signals a non-empty unzip work queue
   std::condition_variable fCvHasUnzipWork;
   /// The communication channel between the I/O thread and the unzip thread
   std::queue<RUnzipItem> fUnzipQueue;

   /// The I/O thread calls RPageSource::LoadCluster() asynchronously. The thread is mostly waiting for the
   /// data to arrive (blocked by the kernel) and therefore can safely run in addition to the application
   /// main threads.
   std::thread fThreadIo;
   /// The unzip thread takes a loaded cluster and passes it to fPageSource->UnzipCluster(). If implicit
   /// multi-threading is turned off, the UnzipCluster() call is a no-op. Otherwise, the UnzipCluster() call
   /// schedules the unzipping of pages using the application's task scheduler.
   std::thread fThreadUnzip;

   /// Every cluster id has at most one corresponding RCluster pointer in the pool
   RCluster *FindInPool(DescriptorId_t clusterId) const;
   /// Returns an index of an unused element in fPool; callers of this function (GetCluster() and WaitFor())
   /// make sure that a free slot actually exists
   size_t FindFreeSlot() const;
   /// The I/O thread routine; there is exactly one I/O thread in flight for every cluster pool
   void ExecReadClusters();
   /// The unzip thread routine, which takes a loaded cluster and passes it to fPageSource.UnzipCluster (which
   /// might be a no-op if IMT is off). Marks the cluster as ready to be picked up by the main thread.
   void ExecUnzipClusters();
   /// Returns the given cluster from the pool, which needs to contain at least the columns `columns`.
   /// Executed at the end of GetCluster when all missing data pieces have been sent to the load queue.
   /// Ideally, the function returns without blocking if the cluster is already in the pool.
   RCluster *WaitFor(DescriptorId_t clusterId, const RPageSource::ColumnSet_t &columns);

public:
   static constexpr unsigned int kDefaultPoolSize = 4;
   RClusterPool(RPageSource &pageSource, unsigned int size);
   explicit RClusterPool(RPageSource &pageSource) : RClusterPool(pageSource, kDefaultPoolSize) {}
   RClusterPool(const RClusterPool &other) = delete;
   RClusterPool &operator =(const RClusterPool &other) = delete;

   unsigned int GetWindowPre() const { return fWindowPre; }
   unsigned int GetWindowPost() const { return fWindowPost; }

   /// Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread load
   /// the cluster into the pool, blocks until done, and then returns it. Along the way, triggers the background
   /// loading of the following fWindowPost clusters. The returned cluster has at least all the pages of `columns`
   /// and possibly pages of other columns, too. If implicit multi-threading is turned on, the uncompressed pages
   /// of the returned cluster are already pushed into the page pool associated with the page source upon return.
   /// The cluster remains valid until the next call to GetCluster().
   RCluster *GetCluster(DescriptorId_t clusterId, const RPageSource::ColumnSet_t &columns);
}; // class RClusterPool

} // namespace Detail
} // namespace Experimental
} // namespace ROOT

#endif
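
The two-step pipeline described in the class documentation (a read queue served by an I/O thread, an unzip queue served by a second thread, and a promise/future pair that hands the finished cluster back to the caller) can be illustrated with a small standalone sketch. This is not the ROOT implementation: `Cluster`, `WorkQueue`, `ReadItem`, `UnzipItem`, and `RunPipeline` are hypothetical names, the "read" and "unzip" steps are placeholders, and the stop convention (a negative cluster id, then an empty cluster pointer) is an assumption made for the example.

```cpp
#include <cassert>
#include <condition_variable>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for RCluster: a payload plus a flag set by the unzip step.
struct Cluster {
   std::string fData;
   bool fIsUnzipped = false;
};

// Minimal blocking queue (hypothetical helper): a mutex, a condition variable
// that signals "queue is non-empty", and the queue itself.
template <typename T>
class WorkQueue {
   std::mutex fLock;
   std::condition_variable fCvHasWork;
   std::queue<T> fQueue;

public:
   void Push(T item)
   {
      {
         std::lock_guard<std::mutex> guard(fLock);
         fQueue.push(std::move(item));
      }
      fCvHasWork.notify_one();
   }
   T Pop()
   {
      std::unique_lock<std::mutex> guard(fLock);
      fCvHasWork.wait(guard, [this] { return !fQueue.empty(); });
      T item = std::move(fQueue.front());
      fQueue.pop();
      return item;
   }
};

struct ReadItem {
   int fClusterId = -1; // assumption: a negative id asks the I/O thread to stop
   std::promise<std::unique_ptr<Cluster>> fPromise;
};

struct UnzipItem {
   std::unique_ptr<Cluster> fCluster; // assumption: an empty pointer asks the unzip thread to stop
   std::promise<std::unique_ptr<Cluster>> fPromise;
};

// Sends nClusters requests through both stages and returns how many came back unzipped.
int RunPipeline(int nClusters)
{
   WorkQueue<ReadItem> readQueue;
   WorkQueue<UnzipItem> unzipQueue;

   std::thread threadIo([&] {
      while (true) {
         ReadItem item = readQueue.Pop();
         if (item.fClusterId < 0) {
            unzipQueue.Push(UnzipItem{}); // forward the stop request to the second stage
            return;
         }
         auto cluster = std::make_unique<Cluster>(); // placeholder for the actual read
         cluster->fData = "cluster " + std::to_string(item.fClusterId);
         unzipQueue.Push(UnzipItem{std::move(cluster), std::move(item.fPromise)});
      }
   });
   std::thread threadUnzip([&] {
      while (true) {
         UnzipItem item = unzipQueue.Pop();
         if (!item.fCluster)
            return;
         item.fCluster->fIsUnzipped = true; // placeholder for the actual decompression
         item.fPromise.set_value(std::move(item.fCluster)); // cluster is ready for pickup
      }
   });

   // Main thread: enqueue the requests and keep one future per in-flight cluster.
   std::vector<std::future<std::unique_ptr<Cluster>>> inFlight;
   for (int i = 0; i < nClusters; ++i) {
      ReadItem item;
      item.fClusterId = i;
      inFlight.push_back(item.fPromise.get_future());
      readQueue.Push(std::move(item));
   }
   int nReady = 0;
   for (auto &future : inFlight) {
      std::unique_ptr<Cluster> cluster = future.get(); // blocks until both stages are done
      assert(cluster);
      if (cluster->fIsUnzipped)
         ++nReady;
   }
   readQueue.Push(ReadItem{}); // default-constructed item carries the stop id
   threadIo.join();
   threadUnzip.join();
   return nReady;
}
```

The promise travels with the work item from the read queue to the unzip queue, so the main thread only ever waits on a single future per cluster, regardless of how many stages the cluster passes through.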
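
The ordering documented for `RInFlightCluster::operator <` (first by cluster id, then by number of columns, then by the column ids) is only declared in this header. The following is a minimal sketch of one way such a comparison could be written, not the ROOT implementation. `InFlightKey` is a hypothetical stand-in for the real struct, and the column set is assumed to be an ordered `std::set` so that comparing the ids element by element is deterministic.

```cpp
#include <cstdint>
#include <set>

using DescriptorId_t = std::uint64_t;
// Assumption for this sketch: an ordered set, so iteration order is well defined.
using ColumnSet_t = std::set<DescriptorId_t>;

// Hypothetical stand-in holding only the fields that take part in the ordering.
struct InFlightKey {
   DescriptorId_t fClusterId = 0;
   ColumnSet_t fColumns;
};

inline bool operator<(const InFlightKey &a, const InFlightKey &b)
{
   // 1. Order by cluster id.
   if (a.fClusterId != b.fClusterId)
      return a.fClusterId < b.fClusterId;
   // 2. Same cluster: the request with fewer columns sorts first.
   if (a.fColumns.size() != b.fColumns.size())
      return a.fColumns.size() < b.fColumns.size();
   // 3. Same size: lexicographical comparison of the sorted column ids.
   return a.fColumns < b.fColumns;
}
```

With this ordering, two requests compare as equivalent exactly when they name the same cluster and the same column set, which matches the `operator ==` defined inline in the header.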