Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageSinkBuf.cxx
Go to the documentation of this file.
1/// \file RPageSinkBuf.cxx
2/// \ingroup NTuple
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \author Max Orok <maxwellorok@gmail.com>
5/// \author Javier Lopez-Gomez <javier.lopez.gomez@cern.ch>
6/// \date 2021-03-17
7
8/*************************************************************************
9 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RNTupleModel.hxx>
18#include <ROOT/RNTupleZip.hxx>
19#include <ROOT/RPageSinkBuf.hxx>
20
21#include <algorithm>
22#include <memory>
23
31
33{
34 fBufferedPages.clear();
35 // Each RSealedPage points to the same region as `fBuf` for some element in `fBufferedPages`; thus, no further
36 // clean-up is required
37 fSealedPages.clear();
38}
39
42{
43 fMetrics = RNTupleMetrics("RPageSinkBuf");
44 fCounters = std::make_unique<RCounters>(
45 RCounters{*fMetrics.MakeCounter<RNTuplePlainCounter *>("ParallelZip", "", "compressing pages in parallel"),
46 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallZip", "ns", "wall clock time spent compressing"),
47 *fMetrics.MakeCounter<RNTuplePlainCounter *>("timeWallCriticalSection", "ns",
48 "wall clock time spent in critical sections"),
50 "CPU time spent compressing"),
52 "timeCpuCriticalSection", "ns", "CPU time spent in critical section")});
53 fMetrics.ObserveMetrics(fInnerSink->GetMetrics());
54}
55
57{
58 // Wait for unterminated tasks, if any, as they may still hold a reference to `this`.
59 // This cannot be moved to the base class destructor, given non-static members have been destroyed by the time the
60 // base class destructor is invoked.
61 WaitForAllTasks();
62}
63
66{
67 return ColumnHandle_t{fNColumns++, &column};
68}
69
70void ROOT::Internal::RPageSinkBuf::ConnectFields(const std::vector<ROOT::RFieldBase *> &fields,
72{
73 auto connectField = [&](ROOT::RFieldBase &f) {
74 // Field Zero would have id 0.
75 ++fNFields;
76 f.SetOnDiskId(fNFields);
77 CallConnectPageSinkOnField(f, *this, firstEntry); // issues in turn calls to `AddColumn()`
78 };
79 for (auto *f : fields) {
81 for (auto &descendant : *f) {
83 }
84 }
85 fBufferedColumns.resize(fNColumns);
86}
87
89{
90 return fInnerSink->GetDescriptor();
91}
92
94{
95 ConnectFields(GetFieldZeroOfModel(model).GetMutableSubfields(), 0U);
96
97 fInnerModel = model.Clone();
98 fInnerSink->Init(*fInnerModel);
99}
100
103{
104 ConnectFields(changeset.fAddedFields, firstEntry);
105
106 // The buffered page sink maintains a copy of the RNTupleModel for the inner sink; replicate the changes there
107 // TODO(jalopezg): we should be able, in general, to simplify the buffered sink.
108 auto cloneAddField = [&](const ROOT::RFieldBase *field) {
109 auto cloned = field->Clone(field->GetFieldName());
110 auto p = &(*cloned);
111 fInnerModel->AddField(std::move(cloned));
112 return p;
113 };
115 auto cloned = field->Clone(field->GetFieldName());
116 auto p = &(*cloned);
119 fieldMap[p] = &fInnerModel->GetConstField(projectedFields.GetSourceField(field)->GetQualifiedFieldName());
120 auto targetIt = cloned->begin();
121 for (auto &f : *field)
122 fieldMap[&(*targetIt++)] =
123 &fInnerModel->GetConstField(projectedFields.GetSourceField(&f)->GetQualifiedFieldName());
124 GetProjectedFieldsOfModel(*fInnerModel).Add(std::move(cloned), fieldMap);
125 return p;
126 };
128 fInnerModel->Unfreeze();
129 std::transform(changeset.fAddedFields.cbegin(), changeset.fAddedFields.cend(),
130 std::back_inserter(innerChangeset.fAddedFields), cloneAddField);
131 std::transform(changeset.fAddedProjectedFields.cbegin(), changeset.fAddedProjectedFields.cend(),
132 std::back_inserter(innerChangeset.fAddedProjectedFields), cloneAddProjectedField);
133 fInnerModel->Freeze();
134 fInnerSink->UpdateSchema(innerChangeset, firstEntry);
135}
136
138{
139 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
140 RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
141 fInnerSink->UpdateExtraTypeInfo(extraTypeInfo);
142}
143
148
150{
151 auto colId = columnHandle.fPhysicalId;
152 const auto &element = *columnHandle.fColumn->GetElement();
153
154 // Safety: References are guaranteed to be valid until the element is destroyed. In other words, all buffered page
155 // elements are valid until DropBufferedPages().
156 auto &zipItem = fBufferedColumns.at(colId).BufferPage(columnHandle);
157 std::size_t maxSealedPageBytes = page.GetNBytes() + GetWriteOptions().GetEnablePageChecksums() * kNBytesPageChecksum;
158 // Do not allocate the buffer yet, in case of IMT we only need it once the task is started.
159 auto &sealedPage = fBufferedColumns.at(colId).RegisterSealedPage();
160
163 R__ASSERT(zipItem.fBuf);
164 };
166 // If the sealed page is smaller than the maximum size (with compression), allocate what is needed and copy the
167 // sealed page content to save memory.
168 auto sealedBufferSize = sealedPage.GetBufferSize();
171 memcpy(buf.get(), sealedPage.GetBuffer(), sealedBufferSize);
172 zipItem.fBuf = std::move(buf);
173 sealedPage.SetBuffer(zipItem.fBuf.get());
174 }
175 };
176
177 if (!fTaskScheduler) {
178 allocateBuf();
179 // Seal the page right now, avoiding the allocation and copy, but making sure that the page buffer is not aliased.
180 RSealPageConfig config;
181 config.fPage = &page;
182 config.fElement = &element;
183 config.fCompressionSettings = GetWriteOptions().GetCompression();
184 config.fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
185 config.fAllowAlias = false;
186 config.fBuffer = zipItem.fBuf.get();
187 {
188 RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
189 sealedPage = SealPage(config);
190 }
192 zipItem.fSealedPage = &sealedPage;
193 return;
194 }
195
196 // TODO avoid frequent (de)allocations by holding on to allocated buffers in RColumnBuf
197 zipItem.fPage = fPageAllocator->NewPage(page.GetElementSize(), page.GetNElements());
198 // make sure the page is aware of how many elements it will have
199 zipItem.fPage.GrowUnchecked(page.GetNElements());
200 memcpy(zipItem.fPage.GetBuffer(), page.GetBuffer(), page.GetNBytes());
201
202 fCounters->fParallelZip.SetValue(1);
203 // Thread safety: Each thread works on a distinct zipItem which owns its
204 // compression buffer.
205 fTaskScheduler->AddTask([this, &zipItem, &sealedPage, &element, allocateBuf, shrinkSealedPage] {
206 allocateBuf();
207 RSealPageConfig config;
208 config.fPage = &zipItem.fPage;
209 config.fElement = &element;
210 config.fCompressionSettings = GetWriteOptions().GetCompression();
211 config.fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
212 // Make sure the page buffer is not aliased so that we can free the uncompressed page.
213 config.fAllowAlias = false;
214 config.fBuffer = zipItem.fBuf.get();
215 // TODO: Somehow expose the time spent in zipping via the metrics. Wall time is tricky because the tasks run
216 // in parallel...
217 sealedPage = SealPage(config);
219 zipItem.fSealedPage = &sealedPage;
220 // Release the uncompressed page. This works because the "page allocator must be thread-safe."
221 zipItem.fPage = RPage();
222 });
223}
224
226 const RSealedPage & /*sealedPage*/)
227{
228 throw RException(R__FAIL("should never commit sealed pages to RPageSinkBuf"));
229}
230
232 std::span<ROOT::Internal::RPageStorage::RSealedPageGroup> /*ranges*/)
233{
234 throw RException(R__FAIL("should never commit sealed pages to RPageSinkBuf"));
235}
236
237// We implement both StageCluster() and CommitCluster() because we can call CommitCluster() on the inner sink more
238// efficiently in a single critical section. For parallel writing, it also guarantees that we produce a fully sequential
239// file.
241{
242 WaitForAllTasks();
243
244 std::vector<RSealedPageGroup> toCommit;
245 toCommit.reserve(fBufferedColumns.size());
246 for (auto &bufColumn : fBufferedColumns) {
247 R__ASSERT(bufColumn.HasSealedPagesOnly());
248 const auto &sealedPages = bufColumn.GetSealedPages();
249 toCommit.emplace_back(bufColumn.GetHandle().fPhysicalId, sealedPages.cbegin(), sealedPages.cend());
250 }
251
252 {
253 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
254 RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
255 fInnerSink->CommitSealedPageV(toCommit);
256
257 for (auto handle : fSuppressedColumns)
258 fInnerSink->CommitSuppressedColumn(handle);
259 fSuppressedColumns.clear();
260
262 }
263
264 for (auto &bufColumn : fBufferedColumns)
265 bufColumn.DropBufferedPages();
266}
267
269{
270 std::uint64_t nbytes;
271 FlushClusterImpl([&] { nbytes = fInnerSink->CommitCluster(nNewEntries); });
272 return nbytes;
273}
274
281
283{
284 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
285 RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
286 fInnerSink->CommitStagedClusters(clusters);
287}
288
290{
291 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
292 RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
293 fInnerSink->CommitClusterGroup();
294}
295
297{
298 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
299 RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
300 fInnerSink->CommitDataset();
301}
302
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:299
#define f(i)
Definition RSha256.hxx:104
#define g(i)
Definition RSha256.hxx:105
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
winID h TVirtualViewer3D TVirtualGLPainter p
A thread-safe integral performance counter.
A collection of Counter objects with a name, a unit, and a description.
void ObserveMetrics(RNTupleMetrics &observee)
CounterPtrT MakeCounter(const std::string &name, Args &&... args)
A non thread-safe integral performance counter.
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
Definition RColumn.hxx:38
std::deque< RPageZipItem > fBufferedPages
Using a deque guarantees that element iterators are never invalidated by appends to the end of the it...
RPageStorage::SealedPageSequence_t fSealedPages
Pages that have already been sealed by a concurrent task.
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements; nElements must be...
void CommitStagedClusters(std::span< RStagedCluster > clusters) final
Commit staged clusters, logically appending them to the ntuple descriptor.
std::unique_ptr< RCounters > fCounters
std::uint64_t CommitCluster(ROOT::NTupleSize_t nNewEntries) final
Finalize the current cluster and create a new one for the following data.
void FlushClusterImpl(std::function< void(void)> FlushClusterFn)
void UpdateSchema(const RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
RPageSinkBuf(std::unique_ptr< RPageSink > inner)
const ROOT::RNTupleDescriptor & GetDescriptor() const final
Return the RNTupleDescriptor being constructed.
void UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo) final
Adds an extra type information record to schema.
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries) final
Stage the current cluster and create a new one for the following data.
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final
Write a page to the storage. The column must have been added before.
void CommitSuppressedColumn(ColumnHandle_t columnHandle) final
Commits a suppressed column for the current cluster.
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
void InitImpl(ROOT::RNTupleModel &model) final
std::unique_ptr< RPageSink > fInnerSink
The inner sink, responsible for actually performing I/O.
void ConnectFields(const std::vector< ROOT::RFieldBase * > &fields, ROOT::NTupleSize_t firstEntry)
ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, RColumn &column) final
Register a new column.
An RAII wrapper used to synchronize a page sink. See GetSinkGuard().
Abstract interface to write data into an ntuple.
const ROOT::RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
ROOT::Experimental::Detail::RNTupleMetrics fMetrics
const std::string & GetNTupleName() const
Returns the NTuple name.
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:44
std::unordered_map< const ROOT::RFieldBase *, const ROOT::RFieldBase * > FieldMap_t
The map keys are the projected target fields; the map values are the backing source fields. Note that ...
RResult< void > Add(std::unique_ptr< ROOT::RFieldBase > field, const FieldMap_t &fieldMap)
Adds a new projected field.
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
Field-specific extra type information from the header / extension header.
A field translates read and write calls from/to underlying columns to/from tree values.
The on-storage metadata of an RNTuple.
The RNTupleModel encapsulates the schema of an RNTuple.
std::unique_ptr< RNTupleModel > Clone() const
const_iterator begin() const
ROOT::RFieldZero & GetFieldZeroOfModel(RNTupleModel &model)
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
void CallConnectPageSinkOnField(RFieldBase &, ROOT::Internal::RPageSink &, ROOT::NTupleSize_t firstEntry=0)
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
The incremental changes to an RNTupleModel
I/O performance counters that get registered in fMetrics.
Parameters for the SealPage() method.
bool fWriteChecksum
Adds an 8-byte little-endian xxhash3 checksum to the page payload.
std::uint32_t fCompressionSettings
Compression algorithm and level to apply.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
const ROOT::Internal::RPage * fPage
Input page to be sealed.
bool fAllowAlias
If false, the output buffer must not point to the input page buffer, which would otherwise be an opti...
const ROOT::Internal::RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
Cluster that was staged, but not yet logically appended to the RNTuple.
A sealed page contains the bytes of a page as written to storage (packed & compressed).