ROOT Reference Guide
RClusterPool.hxx
/// \file ROOT/RClusterPool.hxx
/// \ingroup NTuple ROOT7
/// \author Jakob Blomer <jblomer@cern.ch>
/// \date 2020-03-11
/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
/// is welcome!

/*************************************************************************
 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#ifndef ROOT7_RClusterPool
#define ROOT7_RClusterPool

#include <ROOT/RCluster.hxx>
#include <ROOT/RNTupleUtil.hxx>
#include <ROOT/RPageStorage.hxx> // for ColumnSet_t

#include <condition_variable>
#include <memory>
#include <mutex>
#include <future>
#include <queue>
#include <thread>
#include <set>
#include <vector>

namespace ROOT {
namespace Experimental {
namespace Detail {

class RPageSource;

// clang-format off
/**
\class ROOT::Experimental::Detail::RClusterPool
\ingroup NTuple
\brief Manages a set of clusters containing compressed and packed pages

The cluster pool steers the preloading of (partial) clusters. There is a two-step pipeline: in the first step,
compressed pages are read from clusters into a memory buffer. The second pipeline step decompresses the pages
and pushes them into the page pool. The actual logic of reading and unzipping is implemented by the page source;
the cluster pool only orchestrates the work queues for reading and unzipping. It uses two threads, one for
each pipeline step. The I/O thread for reading waits for data from storage and generates no CPU load. In contrast,
the unzip thread is supposed to submit multi-threaded, CPU-heavy work to the application's task scheduler.

The unzipping step of the pipeline therefore behaves differently depending on whether or not implicit multi-threading
is turned on. If it is turned off, i.e. in a single-threaded environment, the cluster pool only reads the
compressed pages, and the page source has to uncompress the pages later, when data from a page is requested.
*/
// clang-format on
class RClusterPool {
private:
   /// Maximum number of queued cluster requests for the I/O thread. A single request can span multiple clusters.
   static constexpr unsigned int kWorkQueueLimit = 4;

   /// Request to load a subset of the columns of a particular cluster.
   /// Work items come in groups and are executed by the page source.
   struct RReadItem {
      std::promise<std::unique_ptr<RCluster>> fPromise;
      DescriptorId_t fClusterId = kInvalidDescriptorId;
      RPageSource::ColumnSet_t fColumns;
   };

   /// Request to decompress and, if necessary, unpack compressed pages. The unzipped pages
   /// are supposed to be pushed into the page pool by the page source.
   struct RUnzipItem {
      std::unique_ptr<RCluster> fCluster;
      std::promise<std::unique_ptr<RCluster>> fPromise;
   };

   /// Clusters that are currently being processed by the pipeline. Every in-flight cluster has a corresponding
   /// work item, first a read item and then an unzip item.
   struct RInFlightCluster {
      std::future<std::unique_ptr<RCluster>> fFuture;
      DescriptorId_t fClusterId = kInvalidDescriptorId;
      RPageSource::ColumnSet_t fColumns;
      /// By the time a cluster has been loaded, this cluster might not be necessary anymore. This can happen if
      /// there are jumps in the access pattern (i.e. the access pattern deviates from linear access).
      bool fIsExpired = false;

      bool operator ==(const RInFlightCluster &other) const { return fClusterId == other.fClusterId && fColumns == other.fColumns; }
      bool operator !=(const RInFlightCluster &other) const { return !(*this == other); }
      /// First order by cluster id, then by number of columns, then by the column ids in fColumns
      bool operator <(const RInFlightCluster &other) const;
   };

   /// Every cluster pool is responsible for exactly one page source that triggers loading of the clusters
   /// (GetCluster()) and is used for implementing the I/O and cluster memory allocation (RPageSource::LoadCluster()).
   RPageSource &fPageSource;
   /// The number of clusters before the currently active cluster that should stay in the pool if present
   unsigned int fWindowPre;
   /// The number of desired clusters in the pool, including the currently active cluster
   unsigned int fWindowPost;
   /// The cache of clusters around the currently active cluster
   std::vector<std::unique_ptr<RCluster>> fPool;

   /// Protects the shared state between the main thread and the pipeline threads, namely the read and unzip
   /// work queues and the in-flight clusters vector
   std::mutex fLockWorkQueue;
   /// The clusters that were handed off to the I/O thread
   std::vector<RInFlightCluster> fInFlightClusters;
   /// Signals a non-empty I/O work queue
   std::condition_variable fCvHasReadWork;
   /// The communication channel to the I/O thread
   std::queue<RReadItem> fReadQueue;
   /// The lock associated with the fCvHasUnzipWork condition variable
   std::mutex fLockUnzipQueue;
   /// Signals a non-empty unzip work queue
   std::condition_variable fCvHasUnzipWork;
   /// The communication channel between the I/O thread and the unzip thread
   std::queue<RUnzipItem> fUnzipQueue;

   /// The I/O thread calls RPageSource::LoadCluster() asynchronously. The thread is mostly waiting for the
   /// data to arrive (blocked by the kernel) and can therefore safely run in addition to the application's
   /// main threads.
   std::thread fThreadIo;
   /// The unzip thread takes a loaded cluster and calls fPageSource.UnzipCluster() on it. If implicit
   /// multi-threading is turned off, the UnzipCluster() call is a no-op. Otherwise, the UnzipCluster() call
   /// schedules the unzipping of pages using the application's task scheduler.
   std::thread fThreadUnzip;

   /// Every cluster id has at most one corresponding RCluster pointer in the pool
   RCluster *FindInPool(DescriptorId_t clusterId) const;
   /// Returns an index of an unused element in fPool; callers of this function (GetCluster() and WaitFor())
   /// make sure that a free slot actually exists
   size_t FindFreeSlot() const;
   /// The I/O thread routine; there is exactly one I/O thread for every cluster pool
   void ExecReadClusters();
   /// The unzip thread routine, which takes a loaded cluster and passes it to fPageSource.UnzipCluster() (which
   /// might be a no-op if implicit multi-threading is off). Marks the cluster as ready to be picked up by the main thread.
   void ExecUnzipClusters();
   /// Returns the given cluster from the pool, which needs to contain at least the columns `columns`.
   /// Executed at the end of GetCluster() when all missing data pieces have been sent to the load queue.
   /// Ideally, the function returns without blocking if the cluster is already in the pool.
   RCluster *WaitFor(DescriptorId_t clusterId, const RPageSource::ColumnSet_t &columns);

public:
   static constexpr unsigned int kDefaultPoolSize = 4;
   RClusterPool(RPageSource &pageSource, unsigned int size);
   explicit RClusterPool(RPageSource &pageSource) : RClusterPool(pageSource, kDefaultPoolSize) {}
   RClusterPool(const RClusterPool &other) = delete;
   RClusterPool &operator =(const RClusterPool &other) = delete;
   ~RClusterPool();

   unsigned int GetWindowPre() const { return fWindowPre; }
   unsigned int GetWindowPost() const { return fWindowPost; }

   /// Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread load
   /// the cluster into the pool, blocks until done, and then returns it. Along the way, it triggers the background
   /// loading of the following fWindowPost clusters. The returned cluster has at least all the pages of `columns`
   /// and possibly pages of other columns, too. If implicit multi-threading is turned on, the uncompressed pages
   /// of the returned cluster have already been pushed into the page pool associated with the page source upon return.
   /// The cluster remains valid until the next call to GetCluster().
   RCluster *GetCluster(DescriptorId_t clusterId, const RPageSource::ColumnSet_t &columns);
}; // class RClusterPool

} // namespace Detail
} // namespace Experimental
} // namespace ROOT

#endif
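The two-step pipeline described in the class documentation (an I/O thread feeding an unzip thread through work queues guarded by mutexes and condition variables, with std::promise/std::future handing the finished cluster back to the main thread) can be illustrated with the standard library alone. This is a hypothetical sketch, not the RClusterPool implementation: ToyCluster, ToyPipeline, PopWait, PushNotify and the fStop shutdown sentinel are invented names, the "load" and "unzip" steps merely set fields (the real pool delegates them to RPageSource::LoadCluster() and UnzipCluster()), and this header does not show how the real pool stops its threads.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Blocks until the queue is non-empty, then removes and returns its front element
template <typename T>
T PopWait(std::mutex &mtx, std::condition_variable &cv, std::queue<T> &queue) {
   std::unique_lock<std::mutex> lock(mtx);
   cv.wait(lock, [&queue] { return !queue.empty(); });
   T item = std::move(queue.front());
   queue.pop();
   return item;
}

// Appends an element under the lock, then wakes up the consumer thread
template <typename T>
void PushNotify(std::mutex &mtx, std::condition_variable &cv, std::queue<T> &queue, T item) {
   {
      std::lock_guard<std::mutex> lock(mtx);
      queue.push(std::move(item));
   }
   cv.notify_one();
}

// Hypothetical stand-in for RCluster: an id plus a flag set by the "unzip" stage
struct ToyCluster {
   std::uint64_t fId = 0;
   bool fUnzipped = false;
};

// Hypothetical two-stage pipeline: the I/O thread "loads" a cluster and hands it to
// the unzip thread, which fulfills the promise that the main thread waits on
class ToyPipeline {
   struct ReadItem {
      std::promise<std::unique_ptr<ToyCluster>> fPromise;
      std::uint64_t fId = 0;
      bool fStop = false; // shutdown sentinel, an assumption of this sketch
   };
   struct UnzipItem {
      std::unique_ptr<ToyCluster> fCluster;
      std::promise<std::unique_ptr<ToyCluster>> fPromise;
      bool fStop = false;
   };
   std::mutex fLockReadQueue;
   std::condition_variable fCvHasReadWork;
   std::queue<ReadItem> fReadQueue;
   std::mutex fLockUnzipQueue;
   std::condition_variable fCvHasUnzipWork;
   std::queue<UnzipItem> fUnzipQueue;
   // Declared last, so that the locks and queues exist before the threads start
   std::thread fThreadIo;
   std::thread fThreadUnzip;

   void ExecRead() {
      while (true) {
         ReadItem item = PopWait(fLockReadQueue, fCvHasReadWork, fReadQueue);
         UnzipItem next;
         next.fStop = item.fStop;
         next.fPromise = std::move(item.fPromise);
         if (!item.fStop) {
            next.fCluster = std::make_unique<ToyCluster>(); // stands in for reading from storage
            next.fCluster->fId = item.fId;
         }
         PushNotify(fLockUnzipQueue, fCvHasUnzipWork, fUnzipQueue, std::move(next));
         if (item.fStop) return; // the sentinel has been forwarded to the unzip thread
      }
   }
   void ExecUnzip() {
      while (true) {
         UnzipItem item = PopWait(fLockUnzipQueue, fCvHasUnzipWork, fUnzipQueue);
         if (item.fStop) return;
         item.fCluster->fUnzipped = true; // stands in for decompressing the pages
         item.fPromise.set_value(std::move(item.fCluster));
      }
   }
public:
   ToyPipeline() : fThreadIo(&ToyPipeline::ExecRead, this), fThreadUnzip(&ToyPipeline::ExecUnzip, this) {}
   ~ToyPipeline() {
      ReadItem stop;
      stop.fStop = true;
      PushNotify(fLockReadQueue, fCvHasReadWork, fReadQueue, std::move(stop));
      fThreadIo.join();
      fThreadUnzip.join();
   }
   // Queues a read request; the future becomes ready once the cluster has been "unzipped"
   std::future<std::unique_ptr<ToyCluster>> Submit(std::uint64_t id) {
      ReadItem item;
      item.fId = id;
      auto future = item.fPromise.get_future();
      PushNotify(fLockReadQueue, fCvHasReadWork, fReadQueue, std::move(item));
      return future;
   }
};
```

For example, `ToyPipeline pipeline; auto cluster = pipeline.Submit(7).get();` blocks the calling thread until request 7 has passed through both stages and then yields a ToyCluster with fId == 7 and fUnzipped == true. Because both queues are FIFO, requests queued before the stop sentinel are still completed when the pipeline is destroyed.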
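RInFlightCluster only declares operator< here; its definition lives in RClusterPool.cxx. The documented ordering (first by cluster id, then by number of columns, then by the column ids) can be sketched as follows. Because RPageSource::ColumnSet_t is a std::unordered_set<DescriptorId_t>, whose iteration order is unspecified, the sketch sorts copies of both sets before comparing them lexicographically. InFlightKey is a hypothetical type that carries only the two compared fields; this is one possible implementation, not necessarily ROOT's.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <unordered_set>
#include <vector>

// Mirrors the ROOT aliases DescriptorId_t and RPageSource::ColumnSet_t
using DescriptorId_t = std::uint64_t;
using ColumnSet_t = std::unordered_set<DescriptorId_t>;

// Hypothetical key with the two fields that RInFlightCluster compares
struct InFlightKey {
   DescriptorId_t fClusterId;
   ColumnSet_t fColumns;

   // First by cluster id, then by number of columns, then by the column ids
   bool operator<(const InFlightKey &other) const {
      if (fClusterId != other.fClusterId)
         return fClusterId < other.fClusterId;
      if (fColumns.size() != other.fColumns.size())
         return fColumns.size() < other.fColumns.size();
      // An unordered_set has no defined iteration order, so compare sorted copies
      std::vector<DescriptorId_t> lhs(fColumns.begin(), fColumns.end());
      std::vector<DescriptorId_t> rhs(other.fColumns.begin(), other.fColumns.end());
      std::sort(lhs.begin(), lhs.end());
      std::sort(rhs.begin(), rhs.end());
      return std::lexicographical_compare(lhs.begin(), lhs.end(), rhs.begin(), rhs.end());
   }
};
```

Two keys compare as equivalent exactly when they have the same cluster id and the same column set, which agrees with the header's operator==. Such an ordering allows in-flight requests to be kept sorted, for example in a std::set (the header includes <set>); how the real implementation uses it is not visible from this file.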
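The pool is a std::vector<std::unique_ptr<RCluster>> in which every cluster id appears at most once, and FindFreeSlot() is documented to return the index of an unused element whose existence the caller guarantees. A minimal sketch of both lookups follows, assuming (this header does not confirm it) that an unused slot is an empty unique_ptr. PooledCluster, Pool_t and the free functions are hypothetical stand-ins for the member functions defined in RClusterPool.cxx. A linear scan is adequate because the pool only holds a handful of clusters (kDefaultPoolSize is 4).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

using DescriptorId_t = std::uint64_t;

// Hypothetical stand-in for RCluster; only the id matters here
struct PooledCluster {
   DescriptorId_t fId;
};

using Pool_t = std::vector<std::unique_ptr<PooledCluster>>;

// Returns the cluster with the given id, or nullptr on a cache miss.
// Each id occurs at most once in the pool, so the first match is the only one.
PooledCluster *FindInPool(const Pool_t &pool, DescriptorId_t clusterId) {
   for (const auto &slot : pool) {
      if (slot && slot->fId == clusterId)
         return slot.get();
   }
   return nullptr;
}

// Returns the index of an empty slot; like the documented FindFreeSlot(),
// it relies on the caller to guarantee that a free slot exists
std::size_t FindFreeSlot(const Pool_t &pool) {
   for (std::size_t i = 0; i < pool.size(); ++i) {
      if (!pool[i])
         return i;
   }
   assert(false && "no free slot in the pool");
   return pool.size();
}
```

A nullptr from FindInPool corresponds to the cache miss in the GetCluster() description; a cluster loaded after such a miss then needs a slot, which is what FindFreeSlot provides.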
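The header documents fWindowPre (clusters before the active one that stay in the pool if present) and fWindowPost (desired clusters including the active one), but not how successor clusters are found or how the two values are derived from the constructor's size argument; both live in RClusterPool.cxx and the ntuple descriptor. Assuming, purely for illustration, that cluster ids are consecutive integers, the window semantics, and one possible way of setting a flag like fIsExpired after a jump in the access pattern, could look like this. IsInWindow, DesiredClusters, MarkExpired and InFlight are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

using DescriptorId_t = std::uint64_t;

// Assumption: cluster ids are consecutive, so the cluster after `id` is `id + 1`.
// windowPre:  clusters before the active one that may stay in the pool
// windowPost: desired clusters in the pool, including the active one
bool IsInWindow(DescriptorId_t candidate, DescriptorId_t active, unsigned windowPre, unsigned windowPost) {
   if (candidate < active)
      return active - candidate <= windowPre;
   return candidate - active < windowPost;
}

// Ids that should be loaded (or already be in the pool) for the active cluster
std::vector<DescriptorId_t> DesiredClusters(DescriptorId_t active, unsigned windowPost) {
   std::vector<DescriptorId_t> ids;
   for (unsigned i = 0; i < windowPost; ++i)
      ids.push_back(active + i);
   return ids;
}

// Hypothetical in-flight record carrying the fIsExpired flag from the header
struct InFlight {
   DescriptorId_t fClusterId;
   bool fIsExpired;
};

// After a jump in the access pattern, requests outside the window are no longer needed
void MarkExpired(std::vector<InFlight> &inFlight, DescriptorId_t active, unsigned windowPre, unsigned windowPost) {
   for (auto &request : inFlight) {
      if (!IsInWindow(request.fClusterId, active, windowPre, windowPost))
         request.fIsExpired = true;
   }
}
```

With the active cluster 10, windowPre = 1 and windowPost = 3, clusters 9 through 12 are kept, and a request for cluster 3 that is still in flight would be marked expired. The sketch ignores the end of the ntuple, where fewer than windowPost clusters may remain.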