Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageSinkBuf.cxx
Go to the documentation of this file.
/// \file RPageSinkBuf.cxx
/// \ingroup NTuple ROOT7
/// \author Jakob Blomer <jblomer@cern.ch>
/// \author Max Orok <maxwellorok@gmail.com>
/// \date 2021-03-17
/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
/// is welcome!

/*************************************************************************
 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/
16
18#include <ROOT/RNTupleModel.hxx>
19#include <ROOT/RNTupleZip.hxx>
20#include <ROOT/RPageSinkBuf.hxx>
21
22#include <algorithm>
23
25 : RPageSink(inner->GetNTupleName(), inner->GetWriteOptions())
26 , fMetrics("RPageSinkBuf")
27 , fInnerSink(std::move(inner))
28{
29 fCounters = std::unique_ptr<RCounters>(new RCounters{
30 *fMetrics.MakeCounter<RNTuplePlainCounter*>("ParallelZip", "",
31 "compressing pages in parallel")
32 });
33 fMetrics.ObserveMetrics(fInnerSink->GetMetrics());
34}
35
37 unsigned char * /* serializedHeader */,
38 std::uint32_t /* length */)
39{
40 fBufferedColumns.resize(fDescriptorBuilder.GetDescriptor().GetNColumns());
41 fInnerModel = model.Clone();
42 fInnerSink->Create(*fInnerModel);
43}
44
47{
48 // TODO avoid frequent (de)allocations by holding on to allocated buffers in RColumnBuf
49 RPage bufPage = ReservePage(columnHandle, page.GetNElements());
50 // make sure the page is aware of how many elements it will have
51 bufPage.GrowUnchecked(page.GetNElements());
52 memcpy(bufPage.GetBuffer(), page.GetBuffer(), page.GetNBytes());
53 // Safety: RColumnBuf::iterators are guaranteed to be valid until the
54 // element is destroyed. In other words, all buffered page iterators are
55 // valid until the return value of DrainBufferedPages() goes out of scope in
56 // CommitCluster().
57 RColumnBuf::iterator zipItem =
58 fBufferedColumns.at(columnHandle.fId).BufferPage(columnHandle, bufPage);
59 if (!fTaskScheduler) {
60 return RNTupleLocator{};
61 }
62 fCounters->fParallelZip.SetValue(1);
63 // Thread safety: Each thread works on a distinct zipItem which owns its
64 // compression buffer.
65 zipItem->AllocateSealedPageBuf();
66 R__ASSERT(zipItem->fBuf);
67 auto sealedPage = fBufferedColumns.at(columnHandle.fId).RegisterSealedPage();
68 fTaskScheduler->AddTask([this, zipItem, sealedPage, colId = columnHandle.fId] {
69 *sealedPage = SealPage(zipItem->fPage, *fBufferedColumns.at(colId).GetHandle().fColumn->GetElement(),
70 GetWriteOptions().GetCompression(), zipItem->fBuf.get());
71 zipItem->fSealedPage = &(*sealedPage);
72 });
73
74 // we're feeding bad locators to fOpenPageRanges but it should not matter
75 // because they never get written out
76 return RNTupleLocator{};
77}
78
81 DescriptorId_t columnId, const RSealedPage &sealedPage)
82{
83 fInnerSink->CommitSealedPage(columnId, sealedPage);
84 // we're feeding bad locators to fOpenPageRanges but it should not matter
85 // because they never get written out
86 return RNTupleLocator{};
87}
88
89std::uint64_t
91{
92 if (fTaskScheduler) {
93 fTaskScheduler->Wait();
94 fTaskScheduler->Reset();
95 }
96
97 // If we have only sealed pages in all buffered columns, commit them in a single `CommitSealedPageV()` call
98 bool singleCommitCall = std::all_of(fBufferedColumns.begin(), fBufferedColumns.end(),
99 [](auto &bufColumn) { return bufColumn.HasSealedPagesOnly(); });
100 if (singleCommitCall) {
101 std::vector<RSealedPageGroup> toCommit;
102 toCommit.reserve(fBufferedColumns.size());
103 for (auto &bufColumn : fBufferedColumns) {
104 const auto &sealedPages = bufColumn.GetSealedPages();
105 toCommit.emplace_back(bufColumn.GetHandle().fId, sealedPages.cbegin(), sealedPages.cend());
106 }
107 fInnerSink->CommitSealedPageV(toCommit);
108
109 for (auto &bufColumn : fBufferedColumns) {
110 auto drained = bufColumn.DrainBufferedPages();
111 for (auto &bufPage : std::get<std::deque<RColumnBuf::RPageZipItem>>(drained))
112 ReleasePage(bufPage.fPage);
113 }
114 return fInnerSink->CommitCluster(nEntries);
115 }
116
117 // Otherwise, try to do it per column
118 for (auto &bufColumn : fBufferedColumns) {
119 // In practice, either all (see above) or none of the buffered pages have been sealed, depending on whether
120 // a task scheduler is available. The rare condition of a few columns consisting only of sealed pages should
121 // not happen unless the API is misused.
122 if (bufColumn.HasSealedPagesOnly())
123 throw RException(R__FAIL("only a few columns have all pages sealed"));
124
125 // Slow path: if the buffered column contains both sealed and unsealed pages, commit them one by one.
126 // TODO(jalopezg): coalesce contiguous sealed pages and commit via `CommitSealedPageV()`.
127 auto drained = bufColumn.DrainBufferedPages();
128 for (auto &bufPage : std::get<std::deque<RColumnBuf::RPageZipItem>>(drained)) {
129 if (bufPage.IsSealed()) {
130 fInnerSink->CommitSealedPage(bufColumn.GetHandle().fId, *bufPage.fSealedPage);
131 } else {
132 fInnerSink->CommitPage(bufColumn.GetHandle(), bufPage.fPage);
133 }
134 ReleasePage(bufPage.fPage);
135 }
136 }
137 return fInnerSink->CommitCluster(nEntries);
138}
139
142 std::uint32_t /* length */)
143{
144 fInnerSink->CommitClusterGroup();
145 // We're not using that locator any further, so it is safe to return a dummy one
146 return RNTupleLocator{};
147}
148
149void ROOT::Experimental::Detail::RPageSinkBuf::CommitDatasetImpl(unsigned char * /* serializedFooter */,
150 std::uint32_t /* length */)
151{
152 fInnerSink->CommitDataset();
153}
154
157{
158 return fInnerSink->ReservePage(columnHandle, nElements);
159}
160
162{
163 fInnerSink->ReleasePage(page);
164}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:303
#define R__ASSERT(e)
Definition TError.h:117
void ObserveMetrics(RNTupleMetrics &observee)
CounterPtrT MakeCounter(const std::string &name, Args &&... args)
A non thread-safe integral performance counter.
std::deque< RPageZipItem >::iterator iterator
RNTupleLocator CommitSealedPageImpl(DescriptorId_t columnId, const RSealedPage &sealedPage) final
RPageSinkBuf(std::unique_ptr< RPageSink > inner)
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
std::unique_ptr< RCounters > fCounters
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
std::unique_ptr< RPageSink > fInnerSink
The inner sink, responsible for actually performing I/O.
void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) final
std::uint64_t CommitClusterImpl(NTupleSize_t nEntries) final
Returns the number of bytes written to storage (excluding metadata)
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
void CreateImpl(const RNTupleModel &model, unsigned char *serializedHeader, std::uint32_t length) final
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final
Abstract interface to write data into an ntuple.
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:41
ClusterSize_t::ValueType GetNElements() const
Definition RPage.hxx:83
ClusterSize_t::ValueType GetNBytes() const
The space taken by column elements in the buffer.
Definition RPage.hxx:81
void * GrowUnchecked(ClusterSize_t::ValueType nElements)
Called during writing: returns a pointer after the last element and increases the element counter in ...
Definition RPage.hxx:109
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
The RNTupleModel encapsulates the schema of an ntuple.
std::unique_ptr< RNTupleModel > Clone() const
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
I/O performance counters that get registered in fMetrics.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
Generic information about the physical location of data.