25 :
RPageSink(inner->GetNTupleName(), inner->GetWriteOptions())
26 , fMetrics(
"RPageSinkBuf")
27 , fInnerSink(std::move(inner))
31 "compressing pages in parallel")
40 fBufferedColumns.resize(fDescriptorBuilder.GetDescriptor().GetNColumns());
41 fInnerModel = model.
Clone();
42 fInnerSink->Create(*fInnerModel);
58 fBufferedColumns.at(columnHandle.
fId).BufferPage(columnHandle, bufPage);
59 if (!fTaskScheduler) {
62 fCounters->fParallelZip.SetValue(1);
65 zipItem->AllocateSealedPageBuf();
67 auto sealedPage = fBufferedColumns.at(columnHandle.
fId).RegisterSealedPage();
68 fTaskScheduler->AddTask([
this, zipItem, sealedPage, colId = columnHandle.
fId] {
69 *sealedPage = SealPage(zipItem->fPage, *fBufferedColumns.at(colId).GetHandle().fColumn->GetElement(),
70 GetWriteOptions().GetCompression(), zipItem->fBuf.get());
71 zipItem->fSealedPage = &(*sealedPage);
83 fInnerSink->CommitSealedPage(columnId, sealedPage);
93 fTaskScheduler->Wait();
94 fTaskScheduler->Reset();
98 bool singleCommitCall = std::all_of(fBufferedColumns.begin(), fBufferedColumns.end(),
99 [](
auto &bufColumn) { return bufColumn.HasSealedPagesOnly(); });
100 if (singleCommitCall) {
101 std::vector<RSealedPageGroup> toCommit;
102 toCommit.reserve(fBufferedColumns.size());
103 for (
auto &bufColumn : fBufferedColumns) {
104 const auto &sealedPages = bufColumn.GetSealedPages();
105 toCommit.emplace_back(bufColumn.GetHandle().fId, sealedPages.cbegin(), sealedPages.cend());
107 fInnerSink->CommitSealedPageV(toCommit);
109 for (
auto &bufColumn : fBufferedColumns) {
110 auto drained = bufColumn.DrainBufferedPages();
111 for (
auto &bufPage : std::get<std::deque<RColumnBuf::RPageZipItem>>(drained))
112 ReleasePage(bufPage.fPage);
114 return fInnerSink->CommitCluster(nEntries);
118 for (
auto &bufColumn : fBufferedColumns) {
122 if (bufColumn.HasSealedPagesOnly())
127 auto drained = bufColumn.DrainBufferedPages();
128 for (
auto &bufPage : std::get<std::deque<RColumnBuf::RPageZipItem>>(drained)) {
129 if (bufPage.IsSealed()) {
130 fInnerSink->CommitSealedPage(bufColumn.GetHandle().fId, *bufPage.fSealedPage);
132 fInnerSink->CommitPage(bufColumn.GetHandle(), bufPage.fPage);
134 ReleasePage(bufPage.fPage);
137 return fInnerSink->CommitCluster(nEntries);
144 fInnerSink->CommitClusterGroup();
152 fInnerSink->CommitDataset();
158 return fInnerSink->ReservePage(columnHandle, nElements);
163 fInnerSink->ReleasePage(page);
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
void ObserveMetrics(RNTupleMetrics &observee)
CounterPtrT MakeCounter(const std::string &name, Args &&... args)
A non thread-safe integral performance counter.
std::deque< RPageZipItem >::iterator iterator
RNTupleLocator CommitSealedPageImpl(DescriptorId_t columnId, const RSealedPage &sealedPage) final
RPageSinkBuf(std::unique_ptr< RPageSink > inner)
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
std::unique_ptr< RCounters > fCounters
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
std::unique_ptr< RPageSink > fInnerSink
The inner sink, responsible for actually performing I/O.
void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) final
std::uint64_t CommitClusterImpl(NTupleSize_t nEntries) final
Returns the number of bytes written to storage (excluding metadata)
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
void CreateImpl(const RNTupleModel &model, unsigned char *serializedHeader, std::uint32_t length) final
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final
Abstract interface to write data into an ntuple.
A page is a slice of a column that is mapped into memory.
ClusterSize_t::ValueType GetNElements() const
ClusterSize_t::ValueType GetNBytes() const
The space taken by column elements in the buffer.
void * GrowUnchecked(ClusterSize_t::ValueType nElements)
Called during writing: returns a pointer after the last element and increases the element counter in ...
Base class for all ROOT issued exceptions.
The RNTupleModel encapsulates the schema of an ntuple.
std::unique_ptr< RNTupleModel > Clone() const
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
I/O performance counters that get registered in fMetrics.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
Generic information about the physical location of data.