RPageStorage.cxx
1/// \file RPageStorage.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2018-10-04
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RPageStorage.hxx>
18#include <ROOT/RColumn.hxx>
19#include <ROOT/RField.hxx>
22#include <ROOT/RNTupleModel.hxx>
23#include <ROOT/RPagePool.hxx>
24#include <ROOT/RPageSinkBuf.hxx>
26#include <ROOT/RStringView.hxx>
27#ifdef R__ENABLE_DAOS
29#endif
30
31#include <Compression.h>
32#include <TError.h>
33
34#include <utility>
35
36
38{
39}
40
42{
43}
44
45
46//------------------------------------------------------------------------------
47
49{
50 for (unsigned i = 0; i < fIDs.size(); ++i) {
51 if (fIDs[i] == physicalColumnID) {
52 fRefCounters[i]++;
53 return;
54 }
55 }
56 fIDs.emplace_back(physicalColumnID);
57 fRefCounters.emplace_back(1);
58}
59
61{
62 for (unsigned i = 0; i < fIDs.size(); ++i) {
63 if (fIDs[i] == physicalColumnID) {
64 if (--fRefCounters[i] == 0) {
65 fIDs.erase(fIDs.begin() + i);
66 fRefCounters.erase(fRefCounters.begin() + i);
67 }
68 return;
69 }
70 }
71}
72
75{
77 for (const auto &id : fIDs)
78 result.insert(id);
79 return result;
80}
81
83 : RPageStorage(name), fMetrics(""), fOptions(options)
84{
85}
86
88{
89}
90
91std::unique_ptr<ROOT::Experimental::Detail::RPageSource> ROOT::Experimental::Detail::RPageSource::Create(
92 std::string_view ntupleName, std::string_view location, const RNTupleReadOptions &options)
93{
94 if (ntupleName.empty()) {
95 throw RException(R__FAIL("empty RNTuple name"));
96 }
97 if (location.empty()) {
98 throw RException(R__FAIL("empty storage location"));
99 }
100 if (location.find("daos://") == 0)
101#ifdef R__ENABLE_DAOS
102 return std::make_unique<RPageSourceDaos>(ntupleName, location, options);
103#else
104 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
105#endif
106
107 return std::make_unique<RPageSourceFile>(ntupleName, location, options);
108}
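// Usage sketch (illustrative; the ntuple name "ntpl" and file "data.root" are assumptions):
//   auto source = ROOT::Experimental::Detail::RPageSource::Create("ntpl", "data.root");
//   source->Attach();                        // load the ntuple metadata (header and footer)
//   auto nEntries = source->GetNEntries();
// A location starting with "daos://" selects the DAOS backend (if compiled in); any other
// location is handled by the file-based page source.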
109
112{
114 auto physicalId = GetSharedDescriptorGuard()->FindPhysicalColumnId(fieldId, column.GetIndex());
115 R__ASSERT(physicalId != kInvalidDescriptorId);
116 fActivePhysicalColumns.Insert(physicalId);
117 return ColumnHandle_t{physicalId, &column};
118}
119
121{
122 fActivePhysicalColumns.Erase(columnHandle.fPhysicalId);
123}
124
126{
127 return GetSharedDescriptorGuard()->GetNEntries();
128}
129
131{
132 return GetSharedDescriptorGuard()->GetNElements(columnHandle.fPhysicalId);
133}
134
136{
137 // TODO(jblomer) distinguish trees
138 return columnHandle.fPhysicalId;
139}
140
142{
143 if (fTaskScheduler)
144 UnzipClusterImpl(cluster);
145}
146
147
149 const RSealedPage &sealedPage, const RColumnElementBase &element)
150{
151 const auto bytesPacked = element.GetPackedSize(sealedPage.fNElements);
152 const auto pageSize = element.GetSize() * sealedPage.fNElements;
153
154 // TODO(jblomer): We might be able to do better memory handling for unsealing pages than a new malloc for every
155 // new page.
156 auto pageBuffer = std::make_unique<unsigned char[]>(bytesPacked);
157 if (sealedPage.fSize != bytesPacked) {
158 fDecompressor->Unzip(sealedPage.fBuffer, sealedPage.fSize, bytesPacked, pageBuffer.get());
159 } else {
 160 // We cannot simply map the sealed page because we do not know its lifetime. Specialized page sources
 161 // may decide not to use UnsealPage but to implement custom mapping / decompression code instead.
 162 // Note that pages are usually compressed.
163 memcpy(pageBuffer.get(), sealedPage.fBuffer, bytesPacked);
164 }
165
166 if (!element.IsMappable()) {
167 auto unpackedBuffer = new unsigned char[pageSize];
168 element.Unpack(unpackedBuffer, pageBuffer.get(), sealedPage.fNElements);
169 pageBuffer = std::unique_ptr<unsigned char []>(unpackedBuffer);
170 }
171
172 return pageBuffer;
173}
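// Note on UnsealPage(): a sealed page holds the bytes exactly as written to storage, i.e. packed
// and usually compressed. Unsealing therefore runs up to two steps: the buffer is decompressed
// unless its size already equals the packed size, and it is unpacked through the column element
// unless the on-disk layout is bitwise identical to the in-memory layout (IsMappable()).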
174
176{
177 fMetrics = RNTupleMetrics(prefix);
178 fCounters = std::unique_ptr<RCounters>(new RCounters{
179 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("nReadV", "", "number of vector read requests"),
180 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("nRead", "", "number of byte ranges read"),
181 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("szReadPayload", "B", "volume read from storage (required)"),
182 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("szReadOverhead", "B", "volume read from storage (overhead)"),
183 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("szUnzip", "B", "volume after unzipping"),
184 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("nClusterLoaded", "",
185 "number of partial clusters preloaded from storage"),
186 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("nPageLoaded", "", "number of pages loaded from storage"),
187 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("nPagePopulated", "", "number of populated pages"),
188 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("timeWallRead", "ns", "wall clock time spent reading"),
189 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("timeWallUnzip", "ns", "wall clock time spent decompressing"),
190 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter>*>("timeCpuRead", "ns", "CPU time spent reading"),
191 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter>*> ("timeCpuUnzip", "ns",
192 "CPU time spent decompressing"),
193 *fMetrics.MakeCounter<RNTupleCalcPerf*> ("bwRead", "MB/s", "bandwidth compressed bytes read per second",
194 fMetrics, [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
195 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
196 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
197 if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
198 if (auto walltime = timeWallRead->GetValueAsInt()) {
199 double payload = szReadPayload->GetValueAsInt();
200 double overhead = szReadOverhead->GetValueAsInt();
201 // unit: bytes / nanosecond = GB/s
202 return {true, (1000. * (payload + overhead) / walltime)};
203 }
204 }
205 }
206 }
207 return {false, -1.};
208 }
209 ),
210 *fMetrics.MakeCounter<RNTupleCalcPerf*> ("bwReadUnzip", "MB/s", "bandwidth uncompressed bytes read per second",
211 fMetrics, [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
212 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
213 if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
214 if (auto walltime = timeWallRead->GetValueAsInt()) {
215 double unzip = szUnzip->GetValueAsInt();
216 // unit: bytes / nanosecond = GB/s
217 return {true, 1000. * unzip / walltime};
218 }
219 }
220 }
221 return {false, -1.};
222 }
223 ),
224 *fMetrics.MakeCounter<RNTupleCalcPerf*> ("bwUnzip", "MB/s", "decompression bandwidth of uncompressed bytes per second",
225 fMetrics, [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
226 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
227 if (const auto timeWallUnzip = metrics.GetLocalCounter("timeWallUnzip")) {
228 if (auto walltime = timeWallUnzip->GetValueAsInt()) {
229 double unzip = szUnzip->GetValueAsInt();
230 // unit: bytes / nanosecond = GB/s
231 return {true, 1000. * unzip / walltime};
232 }
233 }
234 }
235 return {false, -1.};
236 }
237 ),
238 *fMetrics.MakeCounter<RNTupleCalcPerf*> ("rtReadEfficiency", "", "ratio of payload over all bytes read",
239 fMetrics, [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
240 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
241 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
242 if (auto payload = szReadPayload->GetValueAsInt()) {
243 // r/(r+o) = 1/((r+o)/r) = 1/(1 + o/r)
244 return {true, 1./(1. + (1. * szReadOverhead->GetValueAsInt()) / payload)};
245 }
246 }
247 }
248 return {false, -1.};
249 }
250 ),
251 *fMetrics.MakeCounter<RNTupleCalcPerf*> ("rtCompression", "", "ratio of compressed bytes / uncompressed bytes",
252 fMetrics, [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
253 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
254 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
255 if (auto unzip = szUnzip->GetValueAsInt()) {
256 return {true, (1. * szReadPayload->GetValueAsInt()) / unzip};
257 }
258 }
259 }
260 return {false, -1.};
261 }
262 )
263 });
264}
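// The counters registered above are exposed through RNTupleMetrics. A sketch of how they are
// typically inspected from user code (assumes an RNTupleReader instance `reader`; metrics have
// to be enabled before reading so that the counters are active):
//   reader->EnableMetrics();
//   // ... read entries ...
//   reader->PrintInfo(ROOT::Experimental::ENTupleInfo::kMetrics);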
265
266
267//------------------------------------------------------------------------------
268
269
271 : RPageStorage(name), fMetrics(""), fOptions(options.Clone())
272{
273}
274
276{
277}
278
279std::unique_ptr<ROOT::Experimental::Detail::RPageSink> ROOT::Experimental::Detail::RPageSink::Create(
280 std::string_view ntupleName, std::string_view location, const RNTupleWriteOptions &options)
281{
282 if (ntupleName.empty()) {
283 throw RException(R__FAIL("empty RNTuple name"));
284 }
285 if (location.empty()) {
286 throw RException(R__FAIL("empty storage location"));
287 }
288 std::unique_ptr<ROOT::Experimental::Detail::RPageSink> realSink;
289 if (location.find("daos://") == 0) {
290#ifdef R__ENABLE_DAOS
291 realSink = std::make_unique<RPageSinkDaos>(ntupleName, location, options);
292#else
293 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
294#endif
295 } else {
296 realSink = std::make_unique<RPageSinkFile>(ntupleName, location, options);
297 }
298
299 if (options.GetUseBufferedWrite())
300 return std::make_unique<RPageSinkBuf>(std::move(realSink));
301 return realSink;
302}
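// Usage sketch (illustrative; the ntuple and file names are assumptions):
//   ROOT::Experimental::RNTupleWriteOptions options;
//   auto sink = ROOT::Experimental::Detail::RPageSink::Create("ntpl", "data.root", options);
// As for page sources, a "daos://" location selects the DAOS backend. With buffered writing
// enabled in the options, the concrete sink is transparently wrapped into an RPageSinkBuf.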
303
306{
307 auto columnId = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns();
308 fDescriptorBuilder.AddColumn(columnId, columnId, fieldId, column.GetModel(), column.GetIndex());
309 return ColumnHandle_t{columnId, &column};
310}
311
313{
314 fDescriptorBuilder.SetNTuple(fNTupleName, model.GetDescription());
315 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
316
317 auto &fieldZero = *model.GetFieldZero();
318 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(fieldZero).FieldId(0).MakeDescriptor().Unwrap());
319 fieldZero.SetOnDiskId(0);
320 for (auto &f : fieldZero) {
321 auto fieldId = descriptor.GetNFields();
322 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
323 fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
324 f.SetOnDiskId(fieldId);
325 f.ConnectPageSink(*this); // issues in turn one or several calls to AddColumn()
326 }
327
329 for (auto &f : *model.GetProjectedFields().GetFieldZero()) {
330 auto fieldId = descriptor.GetNFields();
331 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
332 fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
333 f.SetOnDiskId(fieldId);
334 auto sourceFieldId = model.GetProjectedFields().GetSourceField(&f)->GetOnDiskId();
335 for (const auto &source : descriptor.GetColumnIterable(sourceFieldId)) {
336 auto targetId = descriptor.GetNLogicalColumns();
337 fDescriptorBuilder.AddColumn(targetId, source.GetLogicalId(), fieldId, source.GetModel(), source.GetIndex());
338 }
339 }
340
341 auto nColumns = descriptor.GetNPhysicalColumns();
342 for (DescriptorId_t i = 0; i < nColumns; ++i) {
344 columnRange.fPhysicalColumnId = i;
345 columnRange.fFirstElementIndex = 0;
346 columnRange.fNElements = 0;
347 columnRange.fCompressionSettings = GetWriteOptions().GetCompression();
348 fOpenColumnRanges.emplace_back(columnRange);
350 pageRange.fPhysicalColumnId = i;
351 fOpenPageRanges.emplace_back(std::move(pageRange));
352 }
353
354 fSerializationContext = Internal::RNTupleSerializer::SerializeHeaderV1(nullptr, descriptor);
355 auto buffer = std::make_unique<unsigned char[]>(fSerializationContext.GetHeaderSize());
356 fSerializationContext = Internal::RNTupleSerializer::SerializeHeaderV1(buffer.get(), descriptor);
357
358 CreateImpl(model, buffer.get(), fSerializationContext.GetHeaderSize());
359}
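// Note on the header serialization above: SerializeHeaderV1() is called twice, first with a
// nullptr buffer to determine the required size and then again to write into the allocated
// buffer. The page list and footer serialization in CommitClusterGroup() and CommitDataset()
// below follow the same two-pass pattern.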
360
361
363{
364 fOpenColumnRanges.at(columnHandle.fPhysicalId).fNElements += page.GetNElements();
365
367 pageInfo.fNElements = page.GetNElements();
368 pageInfo.fLocator = CommitPageImpl(columnHandle, page);
369 fOpenPageRanges.at(columnHandle.fPhysicalId).fPageInfos.emplace_back(pageInfo);
370}
371
373 ROOT::Experimental::DescriptorId_t physicalColumnId,
375{
376 fOpenColumnRanges.at(physicalColumnId).fNElements += sealedPage.fNElements;
377
379 pageInfo.fNElements = sealedPage.fNElements;
380 pageInfo.fLocator = CommitSealedPageImpl(physicalColumnId, sealedPage);
381 fOpenPageRanges.at(physicalColumnId).fPageInfos.emplace_back(pageInfo);
382}
383
384std::vector<ROOT::Experimental::RNTupleLocator>
385ROOT::Experimental::Detail::RPageSink::CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges)
386{
387 std::vector<ROOT::Experimental::RNTupleLocator> locators;
388 for (auto &range : ranges) {
389 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt)
390 locators.push_back(CommitSealedPageImpl(range.fPhysicalColumnId, *sealedPageIt));
391 }
392 return locators;
393}
394
395void ROOT::Experimental::Detail::RPageSink::CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges)
396{
397 auto locators = CommitSealedPageVImpl(ranges);
398 unsigned i = 0;
399
400 for (auto &range : ranges) {
401 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
402 fOpenColumnRanges.at(range.fPhysicalColumnId).fNElements += sealedPageIt->fNElements;
403
405 pageInfo.fNElements = sealedPageIt->fNElements;
406 pageInfo.fLocator = locators[i++];
407 fOpenPageRanges.at(range.fPhysicalColumnId).fPageInfos.emplace_back(pageInfo);
408 }
409 }
410}
411
413{
414 auto nbytes = CommitClusterImpl(nEntries);
415
416 R__ASSERT((nEntries - fPrevClusterNEntries) < ClusterSize_t(-1));
417 auto nEntriesInCluster = ClusterSize_t(nEntries - fPrevClusterNEntries);
418 RClusterDescriptorBuilder clusterBuilder(fDescriptorBuilder.GetDescriptor().GetNClusters(), fPrevClusterNEntries,
419 nEntriesInCluster);
420 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
422 fullRange.fPhysicalColumnId = i;
423 std::swap(fullRange, fOpenPageRanges[i]);
424 clusterBuilder.CommitColumnRange(i, fOpenColumnRanges[i].fFirstElementIndex,
425 fOpenColumnRanges[i].fCompressionSettings, fullRange);
426 fOpenColumnRanges[i].fFirstElementIndex += fOpenColumnRanges[i].fNElements;
427 fOpenColumnRanges[i].fNElements = 0;
428 }
429 fDescriptorBuilder.AddClusterWithDetails(clusterBuilder.MoveDescriptor().Unwrap());
430 fPrevClusterNEntries = nEntries;
431 return nbytes;
432}
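// Note on CommitCluster(): for every column, the currently open page range is swapped into a
// fresh range and handed to the cluster descriptor builder, which leaves fOpenPageRanges[i]
// empty for the next cluster; the column's first element index is then advanced by the number
// of committed elements and its element counter is reset to zero.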
433
435{
436 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
437
438 const auto nClusters = descriptor.GetNClusters();
439 std::vector<DescriptorId_t> physClusterIDs;
440 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
441 physClusterIDs.emplace_back(fSerializationContext.MapClusterId(i));
442 }
443
444 auto szPageList =
445 Internal::RNTupleSerializer::SerializePageListV1(nullptr, descriptor, physClusterIDs, fSerializationContext);
446 auto bufPageList = std::make_unique<unsigned char[]>(szPageList);
447 Internal::RNTupleSerializer::SerializePageListV1(bufPageList.get(), descriptor, physClusterIDs,
448 fSerializationContext);
449
450 const auto clusterGroupId = descriptor.GetNClusterGroups();
451 const auto locator = CommitClusterGroupImpl(bufPageList.get(), szPageList);
453 cgBuilder.ClusterGroupId(clusterGroupId).PageListLocator(locator).PageListLength(szPageList);
454 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
455 cgBuilder.AddCluster(i);
456 }
457 fDescriptorBuilder.AddClusterGroup(std::move(cgBuilder));
458 fSerializationContext.MapClusterGroupId(clusterGroupId);
459
460 fNextClusterInGroup = nClusters;
461}
462
464{
465 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
466
467 auto szFooter = Internal::RNTupleSerializer::SerializeFooterV1(nullptr, descriptor, fSerializationContext);
468 auto bufFooter = std::make_unique<unsigned char[]>(szFooter);
469 Internal::RNTupleSerializer::SerializeFooterV1(bufFooter.get(), descriptor, fSerializationContext);
470
471 CommitDatasetImpl(bufFooter.get(), szFooter);
472}
473
476 const RColumnElementBase &element, int compressionSetting, void *buf)
477{
478 unsigned char *pageBuf = reinterpret_cast<unsigned char *>(page.GetBuffer());
479 bool isAdoptedBuffer = true;
480 auto packedBytes = page.GetNBytes();
481
482 if (!element.IsMappable()) {
483 packedBytes = element.GetPackedSize(page.GetNElements());
484 pageBuf = new unsigned char[packedBytes];
485 isAdoptedBuffer = false;
486 element.Pack(pageBuf, page.GetBuffer(), page.GetNElements());
487 }
488 auto zippedBytes = packedBytes;
489
490 if ((compressionSetting != 0) || !element.IsMappable()) {
491 zippedBytes = RNTupleCompressor::Zip(pageBuf, packedBytes, compressionSetting, buf);
492 if (!isAdoptedBuffer)
493 delete[] pageBuf;
494 pageBuf = reinterpret_cast<unsigned char *>(buf);
495 isAdoptedBuffer = true;
496 }
497
498 R__ASSERT(isAdoptedBuffer);
499
500 return RSealedPage{pageBuf, static_cast<std::uint32_t>(zippedBytes), page.GetNElements()};
501}
502
505 const RPage &page, const RColumnElementBase &element, int compressionSetting)
506{
507 R__ASSERT(fCompressor);
508 return SealPage(page, element, compressionSetting, fCompressor->GetZipBuffer());
509}
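// Usage note on SealPage(): sealing is the inverse of UnsealPage(). The page is packed through
// the column element if its in-memory layout is not mappable, and it is run through
// RNTupleCompressor::Zip() if a non-zero compression setting is requested or the page had to be
// packed. The caller provides the output buffer, which must be able to hold the packed page
// because compression is not guaranteed to shrink it; the member overload directly above uses
// the sink's own compressor buffer for this purpose.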
510
512{
513 fMetrics = RNTupleMetrics(prefix);
514 fCounters = std::unique_ptr<RCounters>(new RCounters{
515 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("nPageCommitted", "", "number of pages committed to storage"),
516 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("szWritePayload", "B", "volume written for committed pages"),
517 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("szZip", "B", "volume before zipping"),
518 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("timeWallWrite", "ns", "wall clock time spent writing"),
519 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("timeWallZip", "ns", "wall clock time spent compressing"),
520 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter>*>("timeCpuWrite", "ns", "CPU time spent writing"),
521 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter>*> ("timeCpuZip", "ns",
522 "CPU time spent compressing")
523 });
524}
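The sinks and sources implemented in this file are normally driven indirectly through the
user-facing RNTuple classes: RNTupleWriter creates a page sink via RPageSink::Create() and
RNTupleReader attaches a page source created by RPageSource::Create(). A minimal end-to-end
sketch (the field, ntuple and file names are assumptions chosen for illustration):

   #include <ROOT/RNTuple.hxx>
   #include <ROOT/RNTupleModel.hxx>

   void WriteAndRead()
   {
      auto model = ROOT::Experimental::RNTupleModel::Create();
      auto px = model->MakeField<float>("px");
      {
         auto writer = ROOT::Experimental::RNTupleWriter::Recreate(std::move(model), "ntpl", "data.root");
         *px = 1.0;
         writer->Fill();   // column pages are flushed to the RPageSink as they fill up
      }                    // ~RNTupleWriter commits the last cluster, the cluster group and the footer
      auto reader = ROOT::Experimental::RNTupleReader::Open("ntpl", "data.root");
      reader->Show(0);     // pages are populated on demand through the RPageSource
   }

The definitions below are the cross-referenced symbols used in the listing above.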
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition: RError.hxx:303
#define R__ASSERT(e)
Definition: TError.h:118
An in-memory subset of the packed and compressed pages of a cluster.
Definition: RCluster.hxx:155
std::unordered_set< DescriptorId_t > ColumnSet_t
Definition: RCluster.hxx:157
virtual void Pack(void *destination, void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, packing creates an on-disk page from an in-...
virtual bool IsMappable() const
Derived, typed classes tell whether the on-storage layout is bitwise identical to the memory layout.
virtual void Unpack(void *destination, void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, unpacking creates a memory page from an on-...
std::size_t GetPackedSize(std::size_t nElements) const
const RColumnModel & GetModel() const
Definition: RColumn.hxx:310
std::uint32_t GetIndex() const
Definition: RColumn.hxx:311
void SetOnDiskId(DescriptorId_t id)
Definition: RField.hxx:341
DescriptorId_t GetOnDiskId() const
Definition: RField.hxx:340
A thread-safe integral performance counter.
A metric element that computes its floating point value from other counters.
std::int64_t GetValueAsInt() const override
size_t Zip(const void *from, size_t nbytes, int compression, Writer_t fnWriter)
Returns the size of the compressed data.
Definition: RNTupleZip.hxx:63
A collection of Counter objects with a name, a unit, and a description.
const RNTuplePerfCounter * GetLocalCounter(std::string_view name) const
Searches counters registered in this object only. Returns nullptr if name is not found.
An either thread-safe or non-thread-safe counter for CPU ticks.
void CommitDataset()
Finalize the current cluster and the entire data set.
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element, int compressionSetting)
Helper for streaming a page.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges)
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
RPageSink(std::string_view ntupleName, const RNTupleWriteOptions &options)
void CommitPage(ColumnHandle_t columnHandle, const RPage &page)
Write a page to the storage. The column must have been added before.
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const RNTupleWriteOptions &options=RNTupleWriteOptions())
Guess the concrete derived page sink from the file name (location)
std::uint64_t CommitCluster(NTupleSize_t nEntries)
Finalize the current cluster and create a new one for the following data.
void CommitClusterGroup()
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
virtual std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges)
Vector commit of preprocessed pages.
void CommitSealedPage(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage)
Write a preprocessed page to storage. The column must have been added before.
ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) final
Register a new column.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
std::unique_ptr< unsigned char[]> UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element)
Helper for unstreaming a page.
void DropColumn(ColumnHandle_t columnHandle) override
Unregisters a column.
NTupleSize_t GetNElements(ColumnHandle_t columnHandle)
ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) override
Register a new column.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const RNTupleReadOptions &options=RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
RPageSource(std::string_view ntupleName, const RNTupleReadOptions &fOptions)
void UnzipCluster(RCluster *cluster)
Parallel decompression and unpacking of the pages in the given cluster.
ColumnId_t GetColumnId(ColumnHandle_t columnHandle)
Common functionality of an ntuple storage for both reading and writing.
A page is a slice of a column that is mapped into memory.
Definition: RPage.hxx:41
std::uint32_t GetNBytes() const
The space taken by column elements in the buffer.
Definition: RPage.hxx:81
std::uint32_t GetNElements() const
Definition: RPage.hxx:83
static std::uint32_t SerializePageListV1(void *buffer, const RNTupleDescriptor &desc, std::span< DescriptorId_t > physClusterIDs, const RContext &context)
static RContext SerializeHeaderV1(void *buffer, const RNTupleDescriptor &desc)
static std::uint32_t SerializeFooterV1(void *buffer, const RNTupleDescriptor &desc, const RContext &context)
A helper class for piece-wise construction of an RClusterDescriptor.
RResult< void > CommitColumnRange(DescriptorId_t physicalId, std::uint64_t firstElementIndex, std::uint32_t compressionSettings, const RClusterDescriptor::RPageRange &pageRange)
RResult< RClusterDescriptor > MoveDescriptor()
Move out the full cluster descriptor including page locations.
A helper class for piece-wise construction of an RClusterGroupDescriptor.
RClusterGroupDescriptorBuilder & ClusterGroupId(DescriptorId_t clusterGroupId)
RClusterGroupDescriptorBuilder & PageListLength(std::uint32_t pageListLength)
RClusterGroupDescriptorBuilder & PageListLocator(const RNTupleLocator &pageListLocator)
Base class for all ROOT issued exceptions.
Definition: RError.hxx:78
static RFieldDescriptorBuilder FromField(const Detail::RFieldBase &field)
Make a new RFieldDescriptorBuilder based off a live NTuple field.
const Detail::RFieldBase * GetSourceField(const Detail::RFieldBase *target) const
The RNTupleModel encapsulates the schema of an ntuple.
std::string GetDescription() const
RFieldZero * GetFieldZero() const
const RProjectedFields & GetProjectedFields() const
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
basic_string_view< char > string_view
void swap(RDirectoryEntry &e1, RDirectoryEntry &e2) noexcept
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
Definition: RNTupleUtil.hxx:48
RClusterSize ClusterSize_t
Definition: RNTupleUtil.hxx:63
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::int64_t ColumnId_t
Uniquely identifies a physical column within the scope of the current process, used to tag pages.
Definition: RNTupleUtil.hxx:96
constexpr DescriptorId_t kInvalidDescriptorId
Default I/O performance counters that get registered in fMetrics.
Default I/O performance counters that get registered in fMetrics.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
The window of element indexes of a particular column in a particular cluster.
std::int64_t fCompressionSettings
The usual format for ROOT compression settings (see Compression.h).
NTupleSize_t fFirstElementIndex
A 64bit element index.
ClusterSize_t fNElements
The number of column elements in the cluster.
We do not need to store the element size / uncompressed page size because we know to which column the...
std::uint32_t fNElements
The sum of the elements of all the pages must match the corresponding fNElements field in fColumnRang...
RNTupleLocator fLocator
The meaning of fLocator depends on the storage backend.
Records the partition of data into pages for a particular column in a particular cluster.