Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageSinkBuf.hxx
Go to the documentation of this file.
1/// \file ROOT/RPageSinkBuf.hxx
2/// \ingroup NTuple
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \author Max Orok <maxwellorok@gmail.com>
5/// \author Javier Lopez-Gomez <javier.lopez.gomez@cern.ch>
6/// \date 2021-03-17
7
8/*************************************************************************
9 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#ifndef ROOT_RPageSinkBuf
17#define ROOT_RPageSinkBuf
18
20#include <ROOT/RPageStorage.hxx>
21
22#include <deque>
23#include <functional>
24#include <iterator>
25#include <memory>
26#include <tuple>
27
28namespace ROOT {
29namespace Internal {
30
31// clang-format off
32/**
33\class ROOT::Internal::RPageSinkBuf
34\ingroup NTuple
35\brief Wrapper sink that coalesces cluster column page writes
36*/
37// clang-format on
38class RPageSinkBuf : public RPageSink {
39private:
40 /// A buffered column. The column is not responsible for RPage memory management (i.e. ReservePage),
41 /// which is handled by the enclosing RPageSinkBuf.
42 class RColumnBuf {
43 public:
44 struct RPageZipItem {
46 // Compression scratch buffer for fSealedPage.
47 std::unique_ptr<unsigned char[]> fBuf;
 /// True when fSealedPage is non-null.
49 bool IsSealed() const { return fSealedPage != nullptr; }
50 };
51 public:
 // Copying is disabled; the move constructor is defaulted.
52 RColumnBuf() = default;
53 RColumnBuf(const RColumnBuf&) = delete;
54 RColumnBuf& operator=(const RColumnBuf&) = delete;
55 RColumnBuf(RColumnBuf&&) = default;
58
59 /// Returns a reference to the newly buffered page. The reference remains
60 /// valid until DropBufferedPages().
62 {
63 if (!fCol) {
65 }
66 // Safety: Insertion at the end of a deque never invalidates references
67 // to existing elements.
68 return fBufferedPages.emplace_back();
69 }
 /// Returns the column handle stored in fCol.
70 const RPageStorage::ColumnHandle_t &GetHandle() const { return fCol; }
 /// True when no pages are currently buffered for this column.
71 bool IsEmpty() const { return fBufferedPages.empty(); }
 /// True when the number of sealed pages equals the number of buffered pages.
72 bool HasSealedPagesOnly() const { return fBufferedPages.size() == fSealedPages.size(); }
74
 /// Declared here, defined elsewhere. References handed out by BufferPage() are only
 /// documented as valid until this is called.
75 void DropBufferedPages();
76
77 // The returned reference points to a default-constructed RSealedPage. It can be used
78 // to fill in data after sealing.
80 {
81 return fSealedPages.emplace_back();
82 }
83
84 private:
86 /// Using a deque guarantees that references to existing elements are never invalidated
87 /// by appends to the end of the deque by BufferPage (iterators, in contrast, may be invalidated).
88 std::deque<RPageZipItem> fBufferedPages;
89 /// Pages that have been already sealed by a concurrent task. A vector commit can be issued if all
90 /// buffered pages have been sealed.
91 /// Note that each RSealedPage refers to the same buffer as `fBufferedPages[i].fBuf` for some value of `i`, and
92 /// is thus owned by the corresponding RPageZipItem.
94 };
95
96private:
97 /// I/O performance counters that get registered in fMetrics
106 std::unique_ptr<RCounters> fCounters;
107 /// The inner sink, responsible for actually performing I/O.
108 std::unique_ptr<RPageSink> fInnerSink;
109 /// The buffered page sink maintains a copy of the RNTupleModel for the inner sink.
110 /// For the unbuffered case, the RNTupleModel is instead managed by a RNTupleWriter.
111 std::unique_ptr<ROOT::RNTupleModel> fInnerModel;
112 /// Vector of buffered column pages. Indexed by column id.
113 std::vector<RColumnBuf> fBufferedColumns;
114 /// Columns committed as suppressed are stored and passed to the inner sink at cluster commit
115 std::vector<ColumnHandle_t> fSuppressedColumns;
118
119 void ConnectFields(const std::vector<ROOT::RFieldBase *> &fields, ROOT::NTupleSize_t firstEntry);
120 void FlushClusterImpl(std::function<void(void)> FlushClusterFn);
121
122public:
 /// Takes ownership of the sink passed as `inner`; copying is disabled.
123 explicit RPageSinkBuf(std::unique_ptr<RPageSink> inner);
124 RPageSinkBuf(const RPageSinkBuf&) = delete;
128 ~RPageSinkBuf() override;
129
131
133
 /// Forwards to the inner sink's GetNEntries().
134 ROOT::NTupleSize_t GetNEntries() const final { return fInnerSink->GetNEntries(); }
135
136 void InitImpl(ROOT::RNTupleModel &model) final;
139
141 void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final;
143 void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) final;
144 std::uint64_t CommitCluster(ROOT::NTupleSize_t nNewEntries) final;
145 RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries) final;
146 void CommitStagedClusters(std::span<RStagedCluster> clusters) final;
149
151}; // RPageSinkBuf
152
153} // namespace Internal
154} // namespace ROOT
155
156#endif
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
A thread-safe integral performance counter.
A non thread-safe integral performance counter.
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
Definition RColumn.hxx:38
std::deque< RPageZipItem > fBufferedPages
Using a deque guarantees that element iterators are never invalidated by appends to the end of the it...
RColumnBuf & operator=(const RColumnBuf &)=delete
RColumnBuf & operator=(RColumnBuf &&)=default
RPageStorage::SealedPageSequence_t fSealedPages
Pages that have been already sealed by a concurrent task.
RPageZipItem & BufferPage(RPageStorage::ColumnHandle_t columnHandle)
Returns a reference to the newly buffered page.
RColumnBuf(const RColumnBuf &)=delete
const RPageStorage::SealedPageSequence_t & GetSealedPages() const
RPageStorage::ColumnHandle_t fCol
const RPageStorage::ColumnHandle_t & GetHandle() const
Wrapper sink that coalesces cluster column page writes.
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements; nElements must be...
void CommitStagedClusters(std::span< RStagedCluster > clusters) final
Commit staged clusters, logically appending them to the ntuple descriptor.
std::vector< ColumnHandle_t > fSuppressedColumns
Columns committed as suppressed are stored and passed to the inner sink at cluster commit.
RPageSinkBuf(RPageSinkBuf &&)=default
RPageSinkBuf & operator=(RPageSinkBuf &&)=default
std::unique_ptr< RCounters > fCounters
ROOT::DescriptorId_t fNColumns
std::uint64_t CommitCluster(ROOT::NTupleSize_t nNewEntries) final
Finalize the current cluster and create a new one for the following data.
void FlushClusterImpl(std::function< void(void)> FlushClusterFn)
void UpdateSchema(const RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
RPageSinkBuf(std::unique_ptr< RPageSink > inner)
ROOT::NTupleSize_t GetNEntries() const final
ROOT::DescriptorId_t fNFields
RPageSinkBuf(const RPageSinkBuf &)=delete
const ROOT::RNTupleDescriptor & GetDescriptor() const final
Return the RNTupleDescriptor being constructed.
void UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo) final
Adds an extra type information record to schema.
std::unique_ptr< ROOT::RNTupleModel > fInnerModel
The buffered page sink maintains a copy of the RNTupleModel for the inner sink.
std::vector< RColumnBuf > fBufferedColumns
Vector of buffered column pages. Indexed by column id.
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries) final
Stage the current cluster and create a new one for the following data.
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final
Write a page to the storage. The column must have been added before.
void CommitSuppressedColumn(ColumnHandle_t columnHandle) final
Commits a suppressed column for the current cluster.
RPageSinkBuf & operator=(const RPageSinkBuf &)=delete
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
void InitImpl(ROOT::RNTupleModel &model) final
std::unique_ptr< RPageSink > fInnerSink
The inner sink, responsible for actually performing I/O.
void ConnectFields(const std::vector< ROOT::RFieldBase * > &fields, ROOT::NTupleSize_t firstEntry)
ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, RColumn &column) final
Register a new column.
Abstract interface to write data into an ntuple.
std::deque< RSealedPage > SealedPageSequence_t
RColumnHandle ColumnHandle_t
The column handle identifies a column with the current open page storage.
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:44
Field-specific extra type information from the header / extension header.
The on-storage metadata of an RNTuple.
The RNTupleModel encapsulates the schema of an RNTuple.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
The incremental changes to a RNTupleModel
I/O performance counters that get registered in fMetrics.
ROOT::Experimental::Detail::RNTupleTickCounter< ROOT::Experimental::Detail::RNTupleAtomicCounter > & fTimeCpuZip
ROOT::Experimental::Detail::RNTupleTickCounter< ROOT::Experimental::Detail::RNTuplePlainCounter > & fTimeCpuCriticalSection
ROOT::Experimental::Detail::RNTupleAtomicCounter & fTimeWallZip
ROOT::Experimental::Detail::RNTuplePlainCounter & fParallelZip
ROOT::Experimental::Detail::RNTuplePlainCounter & fTimeWallCriticalSection
A sealed page contains the bytes of a page as written to storage (packed & compressed).