Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageSinkBuf.cxx
Go to the documentation of this file.
1/// \file RPageSinkBuf.cxx
2/// \author Jakob Blomer <jblomer@cern.ch>
3/// \author Max Orok <maxwellorok@gmail.com>
4/// \author Javier Lopez-Gomez <javier.lopez.gomez@cern.ch>
5/// \date 2021-03-17
6
7/*************************************************************************
8 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
9 * All rights reserved. *
10 * *
11 * For the licensing terms see $ROOTSYS/LICENSE. *
12 * For the list of contributors see $ROOTSYS/README/CREDITS. *
13 *************************************************************************/
14
15#include <ROOT/RNTupleModel.hxx>
16#include <ROOT/RNTupleUtils.hxx>
18#include <ROOT/RNTupleZip.hxx>
19#include <ROOT/RPageSinkBuf.hxx>
20
21#include <algorithm>
22#include <memory>
23
30
32{
33 fBufferedPages.clear();
34 // Each RSealedPage points to the same region as `fBuf` for some element in `fBufferedPages`; thus, no further
35 // clean-up is required
36 fSealedPages.clear();
37}
38
41{
42 fMetrics = RNTupleMetrics("RPageSinkBuf");
43 fCounters = std::make_unique<RCounters>(
44 RCounters{*fMetrics.MakeCounter<RNTuplePlainCounter *>("ParallelZip", "", "compressing pages in parallel"),
45 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallZip", "ns", "wall clock time spent compressing"),
46 *fMetrics.MakeCounter<RNTuplePlainCounter *>("timeWallCriticalSection", "ns",
47 "wall clock time spent in critical sections"),
49 "CPU time spent compressing"),
51 "timeCpuCriticalSection", "ns", "CPU time spent in critical section")});
52 fMetrics.ObserveMetrics(fInnerSink->GetMetrics());
53}
54
56{
57 // Wait for unterminated tasks, if any, as they may still hold a reference to `this`.
58 // This cannot be moved to the base class destructor, given non-static members have been destroyed by the time the
59 // base class destructor is invoked.
60 WaitForAllTasks();
61}
62
65{
66 return ColumnHandle_t{fNColumns++, &column};
67}
68
69void ROOT::Internal::RPageSinkBuf::ConnectFields(const std::vector<ROOT::RFieldBase *> &fields,
71{
72 auto connectField = [&](ROOT::RFieldBase &f) {
73 // Field Zero would have id 0.
74 ++fNFields;
75 f.SetOnDiskId(fNFields);
76 CallConnectPageSinkOnField(f, *this, firstEntry); // issues in turn calls to `AddColumn()`
77 };
78 for (auto *f : fields) {
80 for (auto &descendant : *f) {
82 }
83 }
84 fBufferedColumns.resize(fNColumns);
85}
86
88{
89 return fInnerSink->GetDescriptor();
90}
91
93{
94 ConnectFields(GetFieldZeroOfModel(model).GetMutableSubfields(), 0U);
95
96 fInnerModel = model.Clone();
97 fInnerSink->Init(*fInnerModel);
98}
99
102{
103 ConnectFields(changeset.fAddedFields, firstEntry);
104
105 // The buffered page sink maintains a copy of the RNTupleModel for the inner sink; replicate the changes there
106 // TODO(jalopezg): we should be able, in general, to simplify the buffered sink.
107 auto cloneAddField = [&](const ROOT::RFieldBase *field) {
108 auto cloned = field->Clone(field->GetFieldName());
109 auto p = &(*cloned);
110
111 auto parent = field->GetParent();
112 assert(parent);
113 if (typeid(*parent) != typeid(RFieldZero)) {
114 auto &innerParent = fInnerModel->GetMutableField(parent->GetQualifiedFieldName());
115 assert(dynamic_cast<RRecordField *>(&innerParent));
116 AddItemToRecord(static_cast<RRecordField &>(innerParent), std::move(cloned));
117 } else {
118 fInnerModel->AddField(std::move(cloned));
119 }
120 return p;
121 };
123 auto cloned = field->Clone(field->GetFieldName());
124 auto p = &(*cloned);
127 fieldMap[p] = &fInnerModel->GetConstField(projectedFields.GetSourceField(field)->GetQualifiedFieldName());
128 auto targetIt = cloned->begin();
129 for (auto &f : *field)
130 fieldMap[&(*targetIt++)] =
131 &fInnerModel->GetConstField(projectedFields.GetSourceField(&f)->GetQualifiedFieldName());
132 GetProjectedFieldsOfModel(*fInnerModel).Add(std::move(cloned), fieldMap);
133 return p;
134 };
136 fInnerModel->Unfreeze();
137 std::transform(changeset.fAddedFields.cbegin(), changeset.fAddedFields.cend(),
138 std::back_inserter(innerChangeset.fAddedFields), cloneAddField);
139 std::transform(changeset.fAddedProjectedFields.cbegin(), changeset.fAddedProjectedFields.cend(),
140 std::back_inserter(innerChangeset.fAddedProjectedFields), cloneAddProjectedField);
141 fInnerModel->Freeze();
142 fInnerSink->UpdateSchema(innerChangeset, firstEntry);
143}
144
146{
147 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
148 RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
149 fInnerSink->UpdateExtraTypeInfo(extraTypeInfo);
150}
151
156
158{
159 auto colId = columnHandle.fPhysicalId;
160 const auto &element = *columnHandle.fColumn->GetElement();
161
162 // Safety: References are guaranteed to be valid until the element is destroyed. In other words, all buffered page
163 // elements are valid until DropBufferedPages().
164 auto &zipItem = fBufferedColumns.at(colId).BufferPage(columnHandle);
165 std::size_t maxSealedPageBytes = page.GetNBytes() + GetWriteOptions().GetEnablePageChecksums() * kNBytesPageChecksum;
166 // Do not allocate the buffer yet, in case of IMT we only need it once the task is started.
167 auto &sealedPage = fBufferedColumns.at(colId).RegisterSealedPage();
168
171 R__ASSERT(zipItem.fBuf);
172 };
174 // If the sealed page is smaller than the maximum size (with compression), allocate what is needed and copy the
175 // sealed page content to save memory.
176 auto sealedBufferSize = sealedPage.GetBufferSize();
179 memcpy(buf.get(), sealedPage.GetBuffer(), sealedBufferSize);
180 zipItem.fBuf = std::move(buf);
181 sealedPage.SetBuffer(zipItem.fBuf.get());
182 }
183 };
184
185 // If we already buffer more uncompressed bytes than the approximate zipped cluster size, we assume there is enough
186 // work for other threads to pick up. This limits the buffer usage when sealing / compression tasks are not processed
187 // fast enough, and heuristically reduces the memory usage, especially for big compression factors.
188 std::size_t bufferedUncompressed = fBufferedUncompressed.load();
189 bool enoughWork = bufferedUncompressed > GetWriteOptions().GetApproxZippedClusterSize();
190
191 if (!fTaskScheduler || enoughWork) {
192 allocateBuf();
193 // Seal the page right now, avoiding the allocation and copy, but making sure that the page buffer is not aliased.
194 RSealPageConfig config;
195 config.fPage = &page;
196 config.fElement = &element;
197 config.fCompressionSettings = GetWriteOptions().GetCompression();
198 config.fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
199 config.fAllowAlias = false;
200 config.fBuffer = zipItem.fBuf.get();
201 {
202 RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
203 sealedPage = SealPage(config);
204 }
206 zipItem.fSealedPage = &sealedPage;
207 return;
208 }
209
210 // We will buffer the uncompressed page. Unless work is consumed fast enough, the next page might be compressed
211 // directly.
212 fBufferedUncompressed += page.GetNBytes();
213
214 // TODO avoid frequent (de)allocations by holding on to allocated buffers in RColumnBuf
215 zipItem.fPage = fPageAllocator->NewPage(page.GetElementSize(), page.GetNElements());
216 // make sure the page is aware of how many elements it will have
217 zipItem.fPage.GrowUnchecked(page.GetNElements());
218 assert(zipItem.fPage.GetNBytes() == page.GetNBytes());
219 memcpy(zipItem.fPage.GetBuffer(), page.GetBuffer(), page.GetNBytes());
220
221 fCounters->fParallelZip.SetValue(1);
222 // Thread safety: Each thread works on a distinct zipItem which owns its
223 // compression buffer.
224 fTaskScheduler->AddTask([this, &zipItem, &sealedPage, &element, allocateBuf, shrinkSealedPage] {
225 // The task will consume the uncompressed page. Decrease the atomic counter early so that more work has arrived
226 // when we are done.
227 fBufferedUncompressed -= zipItem.fPage.GetNBytes();
228
229 allocateBuf();
230 RSealPageConfig config;
231 config.fPage = &zipItem.fPage;
232 config.fElement = &element;
233 config.fCompressionSettings = GetWriteOptions().GetCompression();
234 config.fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
235 // Make sure the page buffer is not aliased so that we can free the uncompressed page.
236 config.fAllowAlias = false;
237 config.fBuffer = zipItem.fBuf.get();
238 // TODO: Somehow expose the time spent in zipping via the metrics. Wall time is tricky because the tasks run
239 // in parallel...
240 sealedPage = SealPage(config);
242 zipItem.fSealedPage = &sealedPage;
243 // Release the uncompressed page. This works because the "page allocator must be thread-safe."
244 zipItem.fPage = RPage();
245 });
246}
247
249 const RSealedPage & /*sealedPage*/)
250{
251 throw RException(R__FAIL("should never commit sealed pages to RPageSinkBuf"));
252}
253
255 std::span<ROOT::Internal::RPageStorage::RSealedPageGroup> /*ranges*/)
256{
257 throw RException(R__FAIL("should never commit sealed pages to RPageSinkBuf"));
258}
259
260// We implement both StageCluster() and CommitCluster() because we can call CommitCluster() on the inner sink more
261// efficiently in a single critical section. For parallel writing, it also guarantees that we produce a fully sequential
262// file.
264{
265 WaitForAllTasks();
266 assert(fBufferedUncompressed == 0 && "all buffered pages should have been processed");
267
268 std::vector<RSealedPageGroup> toCommit;
269 toCommit.reserve(fBufferedColumns.size());
270 for (auto &bufColumn : fBufferedColumns) {
271 R__ASSERT(bufColumn.HasSealedPagesOnly());
272 const auto &sealedPages = bufColumn.GetSealedPages();
273 toCommit.emplace_back(bufColumn.GetHandle().fPhysicalId, sealedPages.cbegin(), sealedPages.cend());
274 }
275
276 {
277 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
278 RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
279 fInnerSink->CommitSealedPageV(toCommit);
280
281 for (auto handle : fSuppressedColumns)
282 fInnerSink->CommitSuppressedColumn(handle);
283 fSuppressedColumns.clear();
284
286 }
287
288 for (auto &bufColumn : fBufferedColumns)
289 bufColumn.DropBufferedPages();
290}
291
293{
294 std::uint64_t nbytes;
295 FlushClusterImpl([&] { nbytes = fInnerSink->CommitCluster(nNewEntries); });
296 return nbytes;
297}
298
305
307{
308 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
309 RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
310 fInnerSink->CommitStagedClusters(clusters);
311}
312
314{
315 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
316 RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
317 fInnerSink->CommitClusterGroup();
318}
319
321{
322 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
323 RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
324 return fInnerSink->CommitDataset();
325}
326
331
332std::unique_ptr<ROOT::Internal::RPageSink>
334{
335 return fInnerSink->CloneAsHidden(name, opts);
336}
337
339{
340 fInnerSink->CommitAttributeSet(attrSetName, attrAnchorInfo);
341}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:322
#define f(i)
Definition RSha256.hxx:104
#define g(i)
Definition RSha256.hxx:105
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
winID h TVirtualViewer3D TVirtualGLPainter p
char name[80]
Definition TGX11.cxx:148
A thread-safe integral performance counter.
A collection of Counter objects with a name, a unit, and a description.
void ObserveMetrics(RNTupleMetrics &observee)
CounterPtrT MakeCounter(const std::string &name, Args &&... args)
A non thread-safe integral performance counter.
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
Definition RColumn.hxx:37
std::deque< RPageZipItem > fBufferedPages
Using a deque guarantees that element iterators are never invalidated by appends to the end of the it...
RPageStorage::SealedPageSequence_t fSealedPages
Pages that have already been sealed by a concurrent task.
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements; nElements must be...
std::unique_ptr< RPageSink > CloneAsHidden(std::string_view name, const RNTupleWriteOptions &opts) const final
Creates a new sink with the same underlying storage as this but writing to a different RNTuple named ...
void CommitStagedClusters(std::span< RStagedCluster > clusters) final
Commit staged clusters, logically appending them to the ntuple descriptor.
std::unique_ptr< RCounters > fCounters
std::uint64_t CommitCluster(ROOT::NTupleSize_t nNewEntries) final
Finalize the current cluster and create a new one for the following data.
void UpdateSchema(const RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
void FlushClusterImpl(const std::function< void(void)> &FlushClusterFn)
void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
RPageSinkBuf(std::unique_ptr< RPageSink > inner)
void CommitAttributeSet(std::string_view attrSetName, const RNTupleLink &attrAnchorInfo) final
Adds the given anchor information (name + locator) into the main RNTuple's descriptor as an attribute...
RNTupleLink CommitDatasetImpl() final
const ROOT::RNTupleDescriptor & GetDescriptor() const final
Return the RNTupleDescriptor being constructed.
void UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo) final
Adds an extra type information record to schema.
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries) final
Stage the current cluster and create a new one for the following data.
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final
Write a page to the storage. The column must have been added before.
void CommitSuppressedColumn(ColumnHandle_t columnHandle) final
Commits a suppressed column for the current cluster.
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
void InitImpl(ROOT::RNTupleModel &model) final
std::unique_ptr< RPageSink > fInnerSink
The inner sink, responsible for actually performing I/O.
void ConnectFields(const std::vector< ROOT::RFieldBase * > &fields, ROOT::NTupleSize_t firstEntry)
ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, RColumn &column) final
Register a new column.
An RAII wrapper used to synchronize a page sink. See GetSinkGuard().
Abstract interface to write data into an ntuple.
const ROOT::RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
ROOT::Experimental::Detail::RNTupleMetrics fMetrics
const std::string & GetNTupleName() const
Returns the NTuple name.
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:43
std::unordered_map< const ROOT::RFieldBase *, const ROOT::RFieldBase * > FieldMap_t
The map keys are the projected target fields; the map values are the backing source fields. Note that ...
RResult< void > Add(std::unique_ptr< ROOT::RFieldBase > field, const FieldMap_t &fieldMap)
Adds a new projected field.
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
Field-specific extra type information from the header / extension header.
A field translates read and write calls from/to underlying columns to/from tree values.
The container field for an ntuple model, which itself has no physical representation.
Definition RField.hxx:58
The on-storage metadata of an RNTuple.
The RNTupleModel encapsulates the schema of an RNTuple.
std::unique_ptr< RNTupleModel > Clone() const
Common user-tunable settings for storing RNTuples.
const_iterator begin() const
The field for an untyped record.
ROOT::RFieldZero & GetFieldZeroOfModel(RNTupleModel &model)
void AddItemToRecord(RRecordField &record, std::unique_ptr< RFieldBase > newItem)
Definition RField.cxx:641
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
void CallConnectPageSinkOnField(RFieldBase &, ROOT::Internal::RPageSink &, ROOT::NTupleSize_t firstEntry=0)
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
The incremental changes to an RNTupleModel
I/O performance counters that get registered in fMetrics.
Parameters for the SealPage() method.
bool fWriteChecksum
Adds an 8-byte little-endian xxhash3 checksum to the page payload.
std::uint32_t fCompressionSettings
Compression algorithm and level to apply.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
const ROOT::Internal::RPage * fPage
Input page to be sealed.
bool fAllowAlias
If false, the output buffer must not point to the input page buffer, which would otherwise be an opti...
const ROOT::Internal::RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
Cluster that was staged, but not yet logically appended to the RNTuple.
A sealed page contains the bytes of a page as written to storage (packed & compressed).