RPageStorage.cxx
1/// \file RPageStorage.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2018-10-04
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RPageStorage.hxx>
18#include <ROOT/RColumn.hxx>
19#include <ROOT/RFieldBase.hxx>
22#include <ROOT/RNTupleModel.hxx>
25#include <ROOT/RPageSinkBuf.hxx>
27#ifdef R__ENABLE_DAOS
29#endif
30
31#include <Compression.h>
32#include <TError.h>
33
34#include <atomic>
35#include <cassert>
36#include <memory>
37#include <string_view>
38#include <unordered_map>
39#include <utility>
40
42 : fMetrics(""), fPageAllocator(std::make_unique<RPageAllocatorHeap>()), fNTupleName(name)
43{
44}
45
47
49{
50 if (!fHasChecksum)
51 return;
52
53 auto charBuf = reinterpret_cast<const unsigned char *>(fBuffer);
54 auto checksumBuf = const_cast<unsigned char *>(charBuf) + GetDataSize();
55 std::uint64_t xxhash3;
56 RNTupleSerializer::SerializeXxHash3(charBuf, GetDataSize(), xxhash3, checksumBuf);
57}
58
61{
62 if (!fHasChecksum)
64
65 auto success = RNTupleSerializer::VerifyXxHash3(reinterpret_cast<const unsigned char *>(fBuffer), GetDataSize());
66 if (!success)
67 return R__FAIL("page checksum verification failed, data corruption detected");
69}
70
72{
73 if (!fHasChecksum)
74 return R__FAIL("invalid attempt to extract non-existing page checksum");
75
76 assert(fBufferSize >= kNBytesPageChecksum);
77 std::uint64_t checksum;
79 reinterpret_cast<const unsigned char *>(fBuffer) + fBufferSize - kNBytesPageChecksum, checksum);
80 return checksum;
81}
82
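The three helpers above implement the sealed-page checksum trailer: the last kNBytesPageChecksum (8) bytes of the buffer hold a little-endian XxHash-3 of the preceding data bytes. Below is a minimal editorial sketch of the same round trip using the serializer helpers; the buffer handling and the include path are assumptions, not code from this file.

// Editorial sketch, not part of RPageStorage.cxx.
#include <ROOT/RNTupleSerialize.hxx> // assumed header for RNTupleSerializer
#include <algorithm>
#include <cstdint>
#include <vector>

bool ChecksumRoundTrip(const std::vector<unsigned char> &payload)
{
   using ROOT::Experimental::Internal::RNTupleSerializer;
   std::vector<unsigned char> buf(payload.size() + 8); // 8 == kNBytesPageChecksum
   std::copy(payload.begin(), payload.end(), buf.begin());
   std::uint64_t xxhash3;
   // Hash the payload bytes and write the checksum right behind them
   RNTupleSerializer::SerializeXxHash3(buf.data(), payload.size(), xxhash3, buf.data() + payload.size());
   // Recompute the hash and compare it with the stored trailer
   return static_cast<bool>(RNTupleSerializer::VerifyXxHash3(buf.data(), payload.size(), xxhash3));
}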
83//------------------------------------------------------------------------------
84
86{
87 for (unsigned i = 0; i < fIDs.size(); ++i) {
88 if (fIDs[i] == physicalColumnID) {
89 fRefCounters[i]++;
90 return;
91 }
92 }
93 fIDs.emplace_back(physicalColumnID);
94 fRefCounters.emplace_back(1);
95}
96
98{
99 for (unsigned i = 0; i < fIDs.size(); ++i) {
100 if (fIDs[i] == physicalColumnID) {
101 if (--fRefCounters[i] == 0) {
102 fIDs.erase(fIDs.begin() + i);
103 fRefCounters.erase(fRefCounters.begin() + i);
104 }
105 return;
106 }
107 }
108}
109
112{
114 for (const auto &id : fIDs)
115 result.insert(id);
116 return result;
117}
118
120{
121 if (fFirstEntry == kInvalidNTupleIndex) {
122 /// Entry range unset, we assume that the entry range covers the complete source
123 return true;
124 }
125
126 if (clusterDesc.GetNEntries() == 0)
127 return true;
128 if ((clusterDesc.GetFirstEntryIndex() + clusterDesc.GetNEntries()) <= fFirstEntry)
129 return false;
130 if (clusterDesc.GetFirstEntryIndex() >= (fFirstEntry + fNEntries))
131 return false;
132 return true;
133}
134
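The overlap test above treats the entry range and the cluster as half-open intervals. A worked example with hypothetical numbers makes the two early-return conditions concrete:

// Editorial example, hypothetical numbers: entry range [100, 300), i.e. fFirstEntry = 100, fNEntries = 200.
//   cluster [  0, 100):  0 + 100 <= 100       -> first check fires, returns false
//   cluster [250, 350):  overlaps [100, 300)  -> returns true
//   cluster [300, 400):  300 >= 100 + 200     -> second check fires, returns false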
136 : RPageStorage(name), fOptions(options)
137{
138}
139
141
142std::unique_ptr<ROOT::Experimental::Internal::RPageSource>
143ROOT::Experimental::Internal::RPageSource::Create(std::string_view ntupleName, std::string_view location,
144 const RNTupleReadOptions &options)
145{
146 if (ntupleName.empty()) {
147 throw RException(R__FAIL("empty RNTuple name"));
148 }
149 if (location.empty()) {
150 throw RException(R__FAIL("empty storage location"));
151 }
152 if (location.find("daos://") == 0)
153#ifdef R__ENABLE_DAOS
154 return std::make_unique<RPageSourceDaos>(ntupleName, location, options);
155#else
156 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
157#endif
158
159 return std::make_unique<RPageSourceFile>(ntupleName, location, options);
160}
161
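A minimal usage sketch of the factory above; the ntuple name and file name are hypothetical. Create() picks the backend from the location prefix and Attach() deserializes header and footer.

// Editorial usage sketch, not part of RPageStorage.cxx.
#include <ROOT/RPageStorage.hxx>

void OpenForReading()
{
   using ROOT::Experimental::Internal::RPageSource;
   auto source = RPageSource::Create("ntpl", "data.root"); // RPageSourceFile unless the location starts with "daos://"
   source->Attach();                                       // loads and deserializes header and footer
   auto nEntries = source->GetNEntries();                  // available once the descriptor is attached
   (void)nEntries;
}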
164{
166 auto physicalId =
167 GetSharedDescriptorGuard()->FindPhysicalColumnId(fieldId, column.GetIndex(), column.GetRepresentationIndex());
168 R__ASSERT(physicalId != kInvalidDescriptorId);
169 fActivePhysicalColumns.Insert(physicalId);
170 return ColumnHandle_t{physicalId, &column};
171}
172
174{
175 fActivePhysicalColumns.Erase(columnHandle.fPhysicalId);
176}
177
179{
180 if ((range.fFirstEntry + range.fNEntries) > GetNEntries()) {
181 throw RException(R__FAIL("invalid entry range"));
182 }
183 fEntryRange = range;
184}
185
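A short sketch of restricting a source to a sub-range of entries; the numbers are hypothetical and must stay within GetNEntries(), otherwise the guard above throws.

// Editorial sketch, hypothetical values.
void RestrictToRange(ROOT::Experimental::Internal::RPageSource &source)
{
   ROOT::Experimental::Internal::RPageSource::REntryRange range;
   range.fFirstEntry = 1000; // first entry to be read
   range.fNEntries = 500;    // fFirstEntry + fNEntries must not exceed source.GetNEntries()
   source.SetEntryRange(range);
}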
187{
188 if (!fHasStructure)
189 LoadStructureImpl();
190 fHasStructure = true;
191}
192
194{
195 LoadStructure();
196 if (!fIsAttached)
197 GetExclDescriptorGuard().MoveIn(AttachImpl());
198 fIsAttached = true;
199}
200
201std::unique_ptr<ROOT::Experimental::Internal::RPageSource> ROOT::Experimental::Internal::RPageSource::Clone() const
202{
203 auto clone = CloneImpl();
204 if (fIsAttached) {
205 clone->GetExclDescriptorGuard().MoveIn(std::move(*GetSharedDescriptorGuard()->Clone()));
206 clone->fHasStructure = true;
207 clone->fIsAttached = true;
208 }
209 return clone;
210}
211
213{
214 return GetSharedDescriptorGuard()->GetNEntries();
215}
216
218{
219 return GetSharedDescriptorGuard()->GetNElements(columnHandle.fPhysicalId);
220}
221
223{
224 // TODO(jblomer) distinguish trees
225 return columnHandle.fPhysicalId;
226}
227
229{
230 if (fTaskScheduler)
231 UnzipClusterImpl(cluster);
232}
233
235{
236 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
237
238 const auto clusterId = cluster->GetId();
239 auto descriptorGuard = GetSharedDescriptorGuard();
240 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
241
242 std::vector<std::unique_ptr<RColumnElementBase>> allElements;
243
244 std::atomic<bool> foundChecksumFailure{false};
245
246 const auto &columnsInCluster = cluster->GetAvailPhysicalColumns();
247 for (const auto columnId : columnsInCluster) {
248 const auto &columnDesc = descriptorGuard->GetColumnDescriptor(columnId);
249
250 allElements.emplace_back(RColumnElementBase::Generate(columnDesc.GetType()));
251
252 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
253 std::uint64_t pageNo = 0;
254 std::uint64_t firstInPage = 0;
255 for (const auto &pi : pageRange.fPageInfos) {
256 ROnDiskPage::Key key(columnId, pageNo);
257 auto onDiskPage = cluster->GetOnDiskPage(key);
258 RSealedPage sealedPage;
259 sealedPage.SetNElements(pi.fNElements);
260 sealedPage.SetHasChecksum(pi.fHasChecksum);
261 sealedPage.SetBufferSize(pi.fLocator.fBytesOnStorage + pi.fHasChecksum * kNBytesPageChecksum);
262 sealedPage.SetBuffer(onDiskPage->GetAddress());
263 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
264
265 auto taskFunc = [this, columnId, clusterId, firstInPage, sealedPage, element = allElements.back().get(),
266 &foundChecksumFailure,
267 indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex]() {
268 auto rv = UnsealPage(sealedPage, *element, columnId);
269 if (!rv) {
270 foundChecksumFailure = true;
271 return;
272 }
273 auto newPage = rv.Unwrap();
274 fCounters->fSzUnzip.Add(element->GetSize() * sealedPage.GetNElements());
275
276 newPage.SetWindow(indexOffset + firstInPage, RPage::RClusterInfo(clusterId, indexOffset));
277 fPagePool.PreloadPage(std::move(newPage));
278 };
279
280 fTaskScheduler->AddTask(taskFunc);
281
282 firstInPage += pi.fNElements;
283 pageNo++;
284 } // for all pages in column
285 } // for all columns in cluster
286
287 fCounters->fNPageUnsealed.Add(cluster->GetNOnDiskPages());
288
289 fTaskScheduler->Wait();
290
291 if (foundChecksumFailure) {
292 throw RException(R__FAIL("page checksum verification failed, data corruption detected"));
293 }
294}
295
297 const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap,
298 std::function<void(DescriptorId_t, NTupleSize_t, const RClusterDescriptor::RPageRange::RPageInfo &)> perPageFunc)
299{
300 auto descriptorGuard = GetSharedDescriptorGuard();
301 const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterKey.fClusterId);
302
303 for (auto physicalColumnId : clusterKey.fPhysicalColumnSet) {
304 if (clusterDesc.GetColumnRange(physicalColumnId).fIsSuppressed)
305 continue;
306
307 const auto &pageRange = clusterDesc.GetPageRange(physicalColumnId);
308 NTupleSize_t pageNo = 0;
309 for (const auto &pageInfo : pageRange.fPageInfos) {
310 if (pageInfo.fLocator.fType == RNTupleLocator::kTypePageZero) {
311 pageZeroMap.Register(
312 ROnDiskPage::Key{physicalColumnId, pageNo},
313 ROnDiskPage(const_cast<void *>(RPage::GetPageZeroBuffer()), pageInfo.fLocator.fBytesOnStorage));
314 } else {
315 perPageFunc(physicalColumnId, pageNo, pageInfo);
316 }
317 ++pageNo;
318 }
319 }
320}
321
324{
325 const auto columnId = columnHandle.fPhysicalId;
326 auto cachedPageRef = fPagePool.GetPage(columnId, globalIndex);
327 if (!cachedPageRef.Get().IsNull())
328 return cachedPageRef;
329
330 std::uint64_t idxInCluster;
331 RClusterInfo clusterInfo;
332 {
333 auto descriptorGuard = GetSharedDescriptorGuard();
334 clusterInfo.fClusterId = descriptorGuard->FindClusterId(columnId, globalIndex);
335
336 if (clusterInfo.fClusterId == kInvalidDescriptorId)
337 throw RException(R__FAIL("entry with index " + std::to_string(globalIndex) + " out of bounds"));
338
339 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterInfo.fClusterId);
340 const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
341 if (columnRange.fIsSuppressed)
342 return RPageRef();
343
344 clusterInfo.fColumnOffset = columnRange.fFirstElementIndex;
345 R__ASSERT(clusterInfo.fColumnOffset <= globalIndex);
346 idxInCluster = globalIndex - clusterInfo.fColumnOffset;
347 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
348 }
349
350 return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
351}
352
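The translation from a global element index to a cluster-local one above is a plain offset subtraction; for example (hypothetical numbers):

// Editorial example: globalIndex = 1234 falls into a cluster whose column range starts at
// fFirstElementIndex = 1000, so the page is looked up with idxInCluster = 1234 - 1000 = 234
// in that cluster's page range via Find(idxInCluster).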
355{
356 const auto clusterId = clusterIndex.GetClusterId();
357 const auto idxInCluster = clusterIndex.GetIndex();
358 const auto columnId = columnHandle.fPhysicalId;
359 auto cachedPageRef = fPagePool.GetPage(columnId, clusterIndex);
360 if (!cachedPageRef.Get().IsNull())
361 return cachedPageRef;
362
363 if (clusterId == kInvalidDescriptorId)
364 throw RException(R__FAIL("entry out of bounds"));
365
366 RClusterInfo clusterInfo;
367 {
368 auto descriptorGuard = GetSharedDescriptorGuard();
369 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
370 const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
371 if (columnRange.fIsSuppressed)
372 return RPageRef();
373
374 clusterInfo.fClusterId = clusterId;
375 clusterInfo.fColumnOffset = columnRange.fFirstElementIndex;
376 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
377 }
378
379 return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
380}
381
383{
384 fMetrics = Detail::RNTupleMetrics(prefix);
385 fCounters = std::make_unique<RCounters>(RCounters{
386 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nReadV", "", "number of vector read requests"),
387 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nRead", "", "number of byte ranges read"),
388 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szReadPayload", "B",
389 "volume read from storage (required)"),
390 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szReadOverhead", "B",
391 "volume read from storage (overhead)"),
392 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szUnzip", "B", "volume after unzipping"),
393 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nClusterLoaded", "",
394 "number of partial clusters preloaded from storage"),
395 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nPageRead", "", "number of pages read from storage"),
396 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nPageUnsealed", "",
397 "number of pages unzipped and decoded"),
398 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallRead", "ns", "wall clock time spent reading"),
399 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallUnzip", "ns",
400 "wall clock time spent decompressing"),
401 *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuRead", "ns",
402 "CPU time spent reading"),
403 *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuUnzip", "ns",
404 "CPU time spent decompressing"),
405 *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
406 "bwRead", "MB/s", "bandwidth compressed bytes read per second", fMetrics,
407 [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
408 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
409 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
410 if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
411 if (auto walltime = timeWallRead->GetValueAsInt()) {
412 double payload = szReadPayload->GetValueAsInt();
413 double overhead = szReadOverhead->GetValueAsInt();
414 // unit: bytes / nanosecond = GB/s
415 return {true, (1000. * (payload + overhead) / walltime)};
416 }
417 }
418 }
419 }
420 return {false, -1.};
421 }),
422 *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
423 "bwReadUnzip", "MB/s", "bandwidth uncompressed bytes read per second", fMetrics,
424 [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
425 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
426 if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
427 if (auto walltime = timeWallRead->GetValueAsInt()) {
428 double unzip = szUnzip->GetValueAsInt();
429 // unit: bytes / nanosecond = GB/s
430 return {true, 1000. * unzip / walltime};
431 }
432 }
433 }
434 return {false, -1.};
435 }),
436 *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
437 "bwUnzip", "MB/s", "decompression bandwidth of uncompressed bytes per second", fMetrics,
438 [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
439 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
440 if (const auto timeWallUnzip = metrics.GetLocalCounter("timeWallUnzip")) {
441 if (auto walltime = timeWallUnzip->GetValueAsInt()) {
442 double unzip = szUnzip->GetValueAsInt();
443 // unit: bytes / nanosecond = GB/s
444 return {true, 1000. * unzip / walltime};
445 }
446 }
447 }
448 return {false, -1.};
449 }),
450 *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
451 "rtReadEfficiency", "", "ratio of payload over all bytes read", fMetrics,
452 [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
453 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
454 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
455 if (auto payload = szReadPayload->GetValueAsInt()) {
456 // r/(r+o) = 1/((r+o)/r) = 1/(1 + o/r)
457 return {true, 1./(1. + (1. * szReadOverhead->GetValueAsInt()) / payload)};
458 }
459 }
460 }
461 return {false, -1.};
462 }),
463 *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
464 "rtCompression", "", "ratio of compressed bytes / uncompressed bytes", fMetrics,
465 [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
466 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
467 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
468 if (auto unzip = szUnzip->GetValueAsInt()) {
469 return {true, (1. * szReadPayload->GetValueAsInt()) / unzip};
470 }
471 }
472 }
473 return {false, -1.};
474 })});
475}
476
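A note on the unit conversion in the bandwidth counters above: the wall-clock counters are kept in nanoseconds, so bytes divided by nanoseconds equals GB/s, and the factor 1000 converts that to the MB/s unit declared for the counter. With hypothetical numbers:

// Editorial example: 2'000'000 bytes read in 4'000'000 ns -> 0.5 bytes/ns = 0.5 GB/s -> 1000 * 0.5 = 500 MB/s.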
479 DescriptorId_t physicalColumnId)
480{
481 // Unsealing a page zero is a no-op. `RPageRange::ExtendToFitColumnRange()` guarantees that the page zero buffer is
482 // large enough to hold `sealedPage.fNElements`
483 if (sealedPage.GetBuffer() == RPage::GetPageZeroBuffer()) {
484 auto page = RPage::MakePageZero(physicalColumnId, element.GetSize());
485 page.GrowUnchecked(sealedPage.GetNElements());
486 return page;
487 }
488
489 auto rv = sealedPage.VerifyChecksumIfEnabled();
490 if (!rv)
491 return R__FORWARD_ERROR(rv);
492
493 const auto bytesPacked = element.GetPackedSize(sealedPage.GetNElements());
494 auto page = fPageAllocator->NewPage(physicalColumnId, element.GetSize(), sealedPage.GetNElements());
495 if (sealedPage.GetDataSize() != bytesPacked) {
496 RNTupleDecompressor::Unzip(sealedPage.GetBuffer(), sealedPage.GetDataSize(), bytesPacked, page.GetBuffer());
497 } else {
498 // We cannot simply map the sealed page as we don't know its lifetime. Specialized page sources
499 // may decide not to use UnsealPage but to implement custom mapping / decompression code instead.
500 // Note that usually pages are compressed.
501 memcpy(page.GetBuffer(), sealedPage.GetBuffer(), bytesPacked);
502 }
503
504 if (!element.IsMappable()) {
505 auto tmp = fPageAllocator->NewPage(physicalColumnId, element.GetSize(), sealedPage.GetNElements());
506 element.Unpack(tmp.GetBuffer(), page.GetBuffer(), sealedPage.GetNElements());
507 page = std::move(tmp);
508 }
509
510 page.GrowUnchecked(sealedPage.GetNElements());
511 return page;
512}
513
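In summary, UnsealPage() reverses the sealing pipeline; the editorial outline below paraphrases the branches above.

// 1. page zero           -> return a zero page, nothing to decompress
// 2. checksum (optional)  -> VerifyChecksumIfEnabled() fails on corrupted data
// 3. unzip or memcpy      -> decompress if the stored size differs from the packed size,
//                            otherwise copy (the sealed buffer's lifetime is not owned here)
// 4. unpack (optional)    -> for non-mappable elements, translate the on-disk layout into the in-memory layout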
514//------------------------------------------------------------------------------
515
517 : RPageStorage(name), fOptions(options.Clone())
518{
519}
520
522
525{
526 assert(config.fPage);
527 assert(config.fElement);
528 assert(config.fBuffer);
529
530 unsigned char *pageBuf = reinterpret_cast<unsigned char *>(config.fPage->GetBuffer());
531 bool isAdoptedBuffer = true;
532 auto nBytesPacked = config.fPage->GetNBytes();
533 auto nBytesChecksum = config.fWriteChecksum * kNBytesPageChecksum;
534
535 if (!config.fElement->IsMappable()) {
536 nBytesPacked = config.fElement->GetPackedSize(config.fPage->GetNElements());
537 pageBuf = new unsigned char[nBytesPacked];
538 isAdoptedBuffer = false;
539 config.fElement->Pack(pageBuf, config.fPage->GetBuffer(), config.fPage->GetNElements());
540 }
541 auto nBytesZipped = nBytesPacked;
542
543 if ((config.fCompressionSetting != 0) || !config.fElement->IsMappable() || !config.fAllowAlias ||
544 config.fWriteChecksum) {
545 nBytesZipped = RNTupleCompressor::Zip(pageBuf, nBytesPacked, config.fCompressionSetting, config.fBuffer);
546 if (!isAdoptedBuffer)
547 delete[] pageBuf;
548 pageBuf = reinterpret_cast<unsigned char *>(config.fBuffer);
549 isAdoptedBuffer = true;
550 }
551
552 R__ASSERT(isAdoptedBuffer);
553
554 RSealedPage sealedPage{pageBuf, static_cast<std::uint32_t>(nBytesZipped + nBytesChecksum),
555 config.fPage->GetNElements(), config.fWriteChecksum};
556 sealedPage.ChecksumIfEnabled();
557
558 return sealedPage;
559}
560
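SealPage() is the mirror image of UnsealPage(); the editorial outline below paraphrases the steps above.

// 1. Pack     -> only for non-mappable elements (on-disk layout differs from the in-memory layout)
// 2. Zip      -> into config.fBuffer; skipped only if compression is off, the element is mappable,
//                aliasing the input page is allowed, and no checksum is requested
// 3. Checksum -> if config.fWriteChecksum is set, an 8-byte XxHash-3 trailer follows the payload,
//                so the sealed buffer size is nBytesZipped + kNBytesPageChecksum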
563{
564 const auto nBytes = page.GetNBytes() + GetWriteOptions().GetEnablePageChecksums() * kNBytesPageChecksum;
565 if (fSealPageBuffer.size() < nBytes)
566 fSealPageBuffer.resize(nBytes);
567
568 RSealPageConfig config;
569 config.fPage = &page;
570 config.fElement = &element;
571 config.fCompressionSetting = GetWriteOptions().GetCompression();
572 config.fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
573 config.fAllowAlias = true;
574 config.fBuffer = fSealPageBuffer.data();
575
576 return SealPage(config);
577}
578
580{
581 for (const auto &cb : fOnDatasetCommitCallbacks)
582 cb(*this);
583 CommitDatasetImpl();
584}
585
588{
589 R__ASSERT(nElements > 0);
590 auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
591 return fPageAllocator->NewPage(columnHandle.fPhysicalId, elementSize, nElements);
592}
593
594//------------------------------------------------------------------------------
595
596std::unique_ptr<ROOT::Experimental::Internal::RPageSink>
597ROOT::Experimental::Internal::RPagePersistentSink::Create(std::string_view ntupleName, std::string_view location,
598 const RNTupleWriteOptions &options)
599{
600 if (ntupleName.empty()) {
601 throw RException(R__FAIL("empty RNTuple name"));
602 }
603 if (location.empty()) {
604 throw RException(R__FAIL("empty storage location"));
605 }
606 if (location.find("daos://") == 0) {
607#ifdef R__ENABLE_DAOS
608 return std::make_unique<RPageSinkDaos>(ntupleName, location, options);
609#else
610 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
611#endif
612 }
613
614 // Otherwise assume that the user wants us to create a file.
615 return std::make_unique<RPageSinkFile>(ntupleName, location, options);
616}
617
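Persistent sinks are normally driven through the RNTuple writer classes; the editorial outline below summarizes the low-level call sequence implemented in this file.

// Create()             -> pick RPageSinkFile or RPageSinkDaos from the location
// Init(model)          -> serialize the header and begin the header extension
// CommitPage() / CommitSealedPage[V]() for every filled page
// CommitCluster()      -> turn the open column/page ranges into a cluster descriptor
// CommitClusterGroup() -> serialize the page list for the clusters committed since the last call
// CommitDataset()      -> run the registered callbacks and serialize the footer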
619 const RNTupleWriteOptions &options)
620 : RPageSink(name, options)
621{
622}
623
625
628{
629 auto columnId = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns();
630 RColumnDescriptorBuilder columnBuilder;
631 columnBuilder.LogicalColumnId(columnId)
632 .PhysicalColumnId(columnId)
633 .FieldId(fieldId)
635 .Type(column.GetType())
636 .Index(column.GetIndex())
639 // For late model extension, we assume that the primary column representation is the active one for the
640 // deferred range. All other representations are suppressed.
641 if (column.GetFirstElementIndex() > 0 && column.GetRepresentationIndex() > 0)
642 columnBuilder.SetSuppressedDeferred();
643 fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
644 return ColumnHandle_t{columnId, &column};
645}
646
648 NTupleSize_t firstEntry)
649{
650 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
651 auto addField = [&](RFieldBase &f) {
652 auto fieldId = descriptor.GetNFields();
653 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
654 fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
655 f.SetOnDiskId(fieldId);
656 CallConnectPageSinkOnField(f, *this, firstEntry); // issues in turn calls to `AddColumn()`
657 };
658 auto addProjectedField = [&](RFieldBase &f) {
659 auto fieldId = descriptor.GetNFields();
660 auto sourceFieldId = changeset.fModel.GetProjectedFields().GetSourceField(&f)->GetOnDiskId();
661 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
662 fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
663 fDescriptorBuilder.AddFieldProjection(sourceFieldId, fieldId);
664 f.SetOnDiskId(fieldId);
665 for (const auto &source : descriptor.GetColumnIterable(sourceFieldId)) {
666 auto targetId = descriptor.GetNLogicalColumns();
667 RColumnDescriptorBuilder columnBuilder;
668 columnBuilder.LogicalColumnId(targetId)
669 .PhysicalColumnId(source.GetLogicalId())
670 .FieldId(fieldId)
671 .BitsOnStorage(source.GetBitsOnStorage())
672 .Type(source.GetType())
673 .Index(source.GetIndex())
674 .RepresentationIndex(source.GetRepresentationIndex());
675 fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
676 }
677 };
678
679 R__ASSERT(firstEntry >= fPrevClusterNEntries);
680 const auto nColumnsBeforeUpdate = descriptor.GetNPhysicalColumns();
681 for (auto f : changeset.fAddedFields) {
682 addField(*f);
683 for (auto &descendant : *f)
684 addField(descendant);
685 }
686 for (auto f : changeset.fAddedProjectedFields) {
687 addProjectedField(*f);
688 for (auto &descendant : *f)
689 addProjectedField(descendant);
690 }
691
692 const auto nColumns = descriptor.GetNPhysicalColumns();
693 for (DescriptorId_t i = nColumnsBeforeUpdate; i < nColumns; ++i) {
695 columnRange.fPhysicalColumnId = i;
696 // We set the first element index in the current cluster to the first element that is part of a materialized page
697 // (i.e., that is part of a page list). For columns created during late model extension, however, the column range
698 // is fixed up as needed by `RClusterDescriptorBuilder::AddExtendedColumnRanges()` on read back.
699 columnRange.fFirstElementIndex = descriptor.GetColumnDescriptor(i).GetFirstElementIndex();
700 columnRange.fNElements = 0;
701 columnRange.fCompressionSettings = GetWriteOptions().GetCompression();
702 fOpenColumnRanges.emplace_back(columnRange);
704 pageRange.fPhysicalColumnId = i;
705 fOpenPageRanges.emplace_back(std::move(pageRange));
706 }
707
708 // Mapping of memory to on-disk column IDs usually happens during serialization of the ntuple header. If the
709 // header was already serialized, this has to be done manually as it is required for page list serialization.
710 if (fSerializationContext.GetHeaderSize() > 0)
711 fSerializationContext.MapSchema(descriptor, /*forHeaderExtension=*/true);
712}
713
715 const RExtraTypeInfoDescriptor &extraTypeInfo)
716{
717 if (extraTypeInfo.GetContentId() != EExtraTypeInfoIds::kStreamerInfo)
718 throw RException(R__FAIL("ROOT bug: unexpected type extra info in UpdateExtraTypeInfo()"));
719
720 fStreamerInfos.merge(RNTupleSerializer::DeserializeStreamerInfos(extraTypeInfo.GetContent()).Unwrap());
721}
722
724{
725 fDescriptorBuilder.SetNTuple(fNTupleName, model.GetDescription());
726 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
727
728 auto &fieldZero = model.GetFieldZero();
729 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(fieldZero).FieldId(0).MakeDescriptor().Unwrap());
730 fieldZero.SetOnDiskId(0);
732
733 RNTupleModelChangeset initialChangeset{model};
734 for (auto f : fieldZero.GetSubFields())
735 initialChangeset.fAddedFields.emplace_back(f);
736 for (auto f : model.GetProjectedFields().GetFieldZero()->GetSubFields())
737 initialChangeset.fAddedProjectedFields.emplace_back(f);
738 UpdateSchema(initialChangeset, 0U);
739
740 fSerializationContext = RNTupleSerializer::SerializeHeader(nullptr, descriptor);
741 auto buffer = std::make_unique<unsigned char[]>(fSerializationContext.GetHeaderSize());
742 fSerializationContext = RNTupleSerializer::SerializeHeader(buffer.get(), descriptor);
743 InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());
744
745 fDescriptorBuilder.BeginHeaderExtension();
746}
747
749{
750 {
751 auto model = descriptor.CreateModel();
752 Init(*model.get());
753 }
754
755 auto clusterId = descriptor.FindClusterId(0, 0);
756
757 while (clusterId != ROOT::Experimental::kInvalidDescriptorId) {
758 auto &cluster = descriptor.GetClusterDescriptor(clusterId);
759 auto nEntries = cluster.GetNEntries();
760
761 RClusterDescriptorBuilder clusterBuilder;
762 clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
763 .FirstEntryIndex(fPrevClusterNEntries)
764 .NEntries(nEntries);
765
766 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
767 R__ASSERT(fOpenColumnRanges[i].fPhysicalColumnId == i);
768 const auto &columnRange = cluster.GetColumnRange(i);
769 R__ASSERT(columnRange.fPhysicalColumnId == i);
770 const auto &pageRange = cluster.GetPageRange(i);
771 R__ASSERT(pageRange.fPhysicalColumnId == i);
772 clusterBuilder.CommitColumnRange(i, fOpenColumnRanges[i].fFirstElementIndex, columnRange.fCompressionSettings,
773 pageRange);
774 fOpenColumnRanges[i].fFirstElementIndex += columnRange.fNElements;
775 }
776 fDescriptorBuilder.AddCluster(clusterBuilder.MoveDescriptor().Unwrap());
777 fPrevClusterNEntries += nEntries;
778
779 clusterId = descriptor.FindNextClusterId(clusterId);
780 }
781}
782
784{
785 fOpenColumnRanges.at(columnHandle.fPhysicalId).fIsSuppressed = true;
786}
787
789{
790 fOpenColumnRanges.at(columnHandle.fPhysicalId).fNElements += page.GetNElements();
791
793 pageInfo.fNElements = page.GetNElements();
794 pageInfo.fLocator = CommitPageImpl(columnHandle, page);
795 pageInfo.fHasChecksum = GetWriteOptions().GetEnablePageChecksums();
796 fOpenPageRanges.at(columnHandle.fPhysicalId).fPageInfos.emplace_back(pageInfo);
797}
798
800 const RPageStorage::RSealedPage &sealedPage)
801{
802 fOpenColumnRanges.at(physicalColumnId).fNElements += sealedPage.GetNElements();
803
805 pageInfo.fNElements = sealedPage.GetNElements();
806 pageInfo.fLocator = CommitSealedPageImpl(physicalColumnId, sealedPage);
807 pageInfo.fHasChecksum = sealedPage.GetHasChecksum();
808 fOpenPageRanges.at(physicalColumnId).fPageInfos.emplace_back(pageInfo);
809}
810
811std::vector<ROOT::Experimental::RNTupleLocator>
813 std::span<RPageStorage::RSealedPageGroup> ranges, const std::vector<bool> &mask)
814{
815 std::vector<ROOT::Experimental::RNTupleLocator> locators;
816 locators.reserve(mask.size());
817 std::size_t i = 0;
818 for (auto &range : ranges) {
819 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
820 if (mask[i++])
821 locators.push_back(CommitSealedPageImpl(range.fPhysicalColumnId, *sealedPageIt));
822 }
823 }
824 locators.shrink_to_fit();
825 return locators;
826}
827
829 std::span<RPageStorage::RSealedPageGroup> ranges)
830{
831 /// Used in the `originalPages` map
832 struct RSealedPageLink {
833 const RSealedPage *fSealedPage = nullptr; ///< Points to the first occurrence of a page with a specific checksum
834 std::size_t fLocatorIdx = 0; ///< The index in the locator vector returned by CommitSealedPageVImpl()
835 };
836
837 std::vector<bool> mask;
838 // For every sealed page, stores the corresponding index in the locator vector returned by CommitSealedPageVImpl()
839 std::vector<std::size_t> locatorIndexes;
840 // Maps page checksums to the first sealed page with that checksum
841 std::unordered_map<std::uint64_t, RSealedPageLink> originalPages;
842 std::size_t iLocator = 0;
843 for (auto &range : ranges) {
844 const auto rangeSize = std::distance(range.fFirst, range.fLast);
845 mask.reserve(mask.size() + rangeSize);
846 locatorIndexes.reserve(locatorIndexes.size() + rangeSize);
847
848 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
849 if (!fFeatures.fCanMergePages || !sealedPageIt->GetHasChecksum()) {
850 mask.emplace_back(true);
851 locatorIndexes.emplace_back(iLocator++);
852 continue;
853 }
854
855 const auto chk = sealedPageIt->GetChecksum().Unwrap();
856 auto itr = originalPages.find(chk);
857 if (itr == originalPages.end()) {
858 originalPages.insert({chk, {&(*sealedPageIt), iLocator}});
859 mask.emplace_back(true);
860 locatorIndexes.emplace_back(iLocator++);
861 continue;
862 }
863
864 const auto *p = itr->second.fSealedPage;
865 if (sealedPageIt->GetDataSize() != p->GetDataSize() ||
866 memcmp(sealedPageIt->GetBuffer(), p->GetBuffer(), p->GetDataSize())) {
867 mask.emplace_back(true);
868 locatorIndexes.emplace_back(iLocator++);
869 continue;
870 }
871
872 mask.emplace_back(false);
873 locatorIndexes.emplace_back(itr->second.fLocatorIdx);
874 }
875
876 mask.shrink_to_fit();
877 locatorIndexes.shrink_to_fit();
878 }
879
880 auto locators = CommitSealedPageVImpl(ranges, mask);
881 unsigned i = 0;
882
883 for (auto &range : ranges) {
884 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
885 fOpenColumnRanges.at(range.fPhysicalColumnId).fNElements += sealedPageIt->GetNElements();
886
888 pageInfo.fNElements = sealedPageIt->GetNElements();
889 pageInfo.fLocator = locators[locatorIndexes[i++]];
890 pageInfo.fHasChecksum = sealedPageIt->GetHasChecksum();
891 fOpenPageRanges.at(range.fPhysicalColumnId).fPageInfos.emplace_back(pageInfo);
892 }
893 }
894}
895
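When fCanMergePages is set and pages carry checksums, byte-identical pages are written only once and the duplicates reuse the locator of the first occurrence. A worked example with hypothetical checksums:

// Editorial example: four incoming sealed pages with checksums A, B, A, C, where the two A pages
// are byte-identical:
//   mask           = {true, true, false, true}  -> only three pages reach CommitSealedPageVImpl()
//   locatorIndexes = {0,    1,    0,     2}     -> the duplicate third page reuses locator 0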
896std::uint64_t
898{
899 auto nbytes = CommitClusterImpl();
900
901 RClusterDescriptorBuilder clusterBuilder;
902 clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
903 .FirstEntryIndex(fPrevClusterNEntries)
904 .NEntries(nNewEntries);
905 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
906 if (fOpenColumnRanges[i].fIsSuppressed) {
907 assert(fOpenPageRanges[i].fPageInfos.empty());
908 clusterBuilder.MarkSuppressedColumnRange(i);
909 } else {
911 fullRange.fPhysicalColumnId = i;
912 std::swap(fullRange, fOpenPageRanges[i]);
913 clusterBuilder.CommitColumnRange(i, fOpenColumnRanges[i].fFirstElementIndex,
914 fOpenColumnRanges[i].fCompressionSettings, fullRange);
915 fOpenColumnRanges[i].fFirstElementIndex += fOpenColumnRanges[i].fNElements;
916 fOpenColumnRanges[i].fNElements = 0;
917 }
918 }
919
920 clusterBuilder.CommitSuppressedColumnRanges(fDescriptorBuilder.GetDescriptor()).ThrowOnError();
921 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
922 if (!fOpenColumnRanges[i].fIsSuppressed)
923 continue;
924 // We reset suppressed columns to the state they would have if they were active (not suppressed).
925 // In particular, we need to reset the first element index to the first element of the next (upcoming) cluster.
926 // This information has been determined for the committed cluster descriptor through
927 // CommitSuppressedColumnRanges(), so we can use the information from the descriptor.
928 const auto &columnRangeFromDesc = clusterBuilder.GetColumnRange(i);
929 fOpenColumnRanges[i].fFirstElementIndex = columnRangeFromDesc.fFirstElementIndex + columnRangeFromDesc.fNElements;
930 fOpenColumnRanges[i].fNElements = 0;
931 fOpenColumnRanges[i].fIsSuppressed = false;
932 }
933
934 fDescriptorBuilder.AddCluster(clusterBuilder.MoveDescriptor().Unwrap());
935 fPrevClusterNEntries += nNewEntries;
936 return nbytes;
937}
938
940{
941 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
942
943 const auto nClusters = descriptor.GetNActiveClusters();
944 std::vector<DescriptorId_t> physClusterIDs;
945 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
946 physClusterIDs.emplace_back(fSerializationContext.MapClusterId(i));
947 }
948
949 auto szPageList = RNTupleSerializer::SerializePageList(nullptr, descriptor, physClusterIDs, fSerializationContext);
950 auto bufPageList = std::make_unique<unsigned char[]>(szPageList);
951 RNTupleSerializer::SerializePageList(bufPageList.get(), descriptor, physClusterIDs, fSerializationContext);
952
953 const auto clusterGroupId = descriptor.GetNClusterGroups();
954 const auto locator = CommitClusterGroupImpl(bufPageList.get(), szPageList);
956 cgBuilder.ClusterGroupId(clusterGroupId).PageListLocator(locator).PageListLength(szPageList);
957 if (fNextClusterInGroup == nClusters) {
958 cgBuilder.MinEntry(0).EntrySpan(0).NClusters(0);
959 } else {
960 const auto &firstClusterDesc = descriptor.GetClusterDescriptor(fNextClusterInGroup);
961 const auto &lastClusterDesc = descriptor.GetClusterDescriptor(nClusters - 1);
962 cgBuilder.MinEntry(firstClusterDesc.GetFirstEntryIndex())
963 .EntrySpan(lastClusterDesc.GetFirstEntryIndex() + lastClusterDesc.GetNEntries() -
964 firstClusterDesc.GetFirstEntryIndex())
965 .NClusters(nClusters - fNextClusterInGroup);
966 }
967 std::vector<DescriptorId_t> clusterIds;
968 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
969 clusterIds.emplace_back(i);
970 }
971 cgBuilder.AddClusters(clusterIds);
972 fDescriptorBuilder.AddClusterGroup(cgBuilder.MoveDescriptor().Unwrap());
973 fSerializationContext.MapClusterGroupId(clusterGroupId);
974
975 fNextClusterInGroup = nClusters;
976}
977
979{
980 if (!fStreamerInfos.empty()) {
981 RExtraTypeInfoDescriptorBuilder extraInfoBuilder;
984 fDescriptorBuilder.AddExtraTypeInfo(extraInfoBuilder.MoveDescriptor().Unwrap());
985 }
986
987 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
988
989 auto szFooter = RNTupleSerializer::SerializeFooter(nullptr, descriptor, fSerializationContext);
990 auto bufFooter = std::make_unique<unsigned char[]>(szFooter);
991 RNTupleSerializer::SerializeFooter(bufFooter.get(), descriptor, fSerializationContext);
992
993 CommitDatasetImpl(bufFooter.get(), szFooter);
994}
995
997{
998 fMetrics = Detail::RNTupleMetrics(prefix);
999 fCounters = std::make_unique<RCounters>(RCounters{
1000 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nPageCommitted", "",
1001 "number of pages committed to storage"),
1002 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szWritePayload", "B",
1003 "volume written for committed pages"),
1004 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szZip", "B", "volume before zipping"),
1005 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallWrite", "ns", "wall clock time spent writing"),
1006 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallZip", "ns", "wall clock time spent compressing"),
1007 *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuWrite", "ns",
1008 "CPU time spent writing"),
1009 *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuZip", "ns",
1010 "CPU time spent compressing")});
1011}
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking)
Definition RError.hxx:294
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:290
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
A thread-safe integral performance counter.
A metric element that computes its floating point value from other counters.
std::int64_t GetValueAsInt() const override
A collection of Counter objects with a name, a unit, and a description.
const RNTuplePerfCounter * GetLocalCounter(std::string_view name) const
Searches counters registered in this object only. Returns nullptr if name is not found.
A counter for CPU ticks that is either thread-safe or not, depending on the base counter type.
Record wall time and CPU time between construction and destruction.
A helper class for piece-wise construction of an RClusterDescriptor.
RResult< RClusterDescriptor > MoveDescriptor()
Move out the full cluster descriptor including page locations.
RClusterDescriptorBuilder & ClusterId(DescriptorId_t clusterId)
RClusterDescriptorBuilder & NEntries(std::uint64_t nEntries)
RClusterDescriptorBuilder & FirstEntryIndex(std::uint64_t firstEntryIndex)
const RClusterDescriptor::RColumnRange & GetColumnRange(DescriptorId_t physicalId)
RResult< void > MarkSuppressedColumnRange(DescriptorId_t physicalId)
Books the given column ID as being suppressed in this cluster.
RResult< void > CommitColumnRange(DescriptorId_t physicalId, std::uint64_t firstElementIndex, std::uint32_t compressionSettings, const RClusterDescriptor::RPageRange &pageRange)
RResult< void > CommitSuppressedColumnRanges(const RNTupleDescriptor &desc)
Sets the first element index and number of elements for all the suppressed column ranges.
A helper class for piece-wise construction of an RClusterGroupDescriptor.
RClusterGroupDescriptorBuilder & PageListLocator(const RNTupleLocator &pageListLocator)
void AddClusters(const std::vector< DescriptorId_t > &clusterIds)
RClusterGroupDescriptorBuilder & MinEntry(std::uint64_t minEntry)
RClusterGroupDescriptorBuilder & ClusterGroupId(DescriptorId_t clusterGroupId)
RClusterGroupDescriptorBuilder & EntrySpan(std::uint64_t entrySpan)
RClusterGroupDescriptorBuilder & NClusters(std::uint32_t nClusters)
RClusterGroupDescriptorBuilder & PageListLength(std::uint64_t pageListLength)
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:152
const ColumnSet_t & GetAvailPhysicalColumns() const
Definition RCluster.hxx:196
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
Definition RCluster.cxx:32
std::unordered_set< DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:154
A helper class for piece-wise construction of an RColumnDescriptor.
RColumnDescriptorBuilder & PhysicalColumnId(DescriptorId_t physicalColumnId)
RColumnDescriptorBuilder & Type(EColumnType type)
RColumnDescriptorBuilder & BitsOnStorage(std::uint16_t bitsOnStorage)
RColumnDescriptorBuilder & RepresentationIndex(std::uint16_t representationIndex)
RColumnDescriptorBuilder & FieldId(DescriptorId_t fieldId)
RColumnDescriptorBuilder & Index(std::uint32_t index)
RColumnDescriptorBuilder & FirstElementIndex(std::uint64_t firstElementIdx)
RResult< RColumnDescriptor > MakeDescriptor() const
Attempt to make a column descriptor.
RColumnDescriptorBuilder & LogicalColumnId(DescriptorId_t logicalColumnId)
A column element encapsulates the translation between basic C++ types and their column representation...
virtual bool IsMappable() const
Derived, typed classes tell whether the on-storage layout is bitwise identical to the memory layout.
virtual void Pack(void *destination, const void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, packing creates an on-disk page from an in-...
virtual void Unpack(void *destination, const void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, unpacking creates a memory page from an on-...
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
If CppT == void, use the default C++ type for the given column type.
std::size_t GetPackedSize(std::size_t nElements=1U) const
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
Definition RColumn.hxx:42
RColumnElementBase * GetElement() const
Definition RColumn.hxx:356
std::uint16_t GetRepresentationIndex() const
Definition RColumn.hxx:360
NTupleSize_t GetFirstElementIndex() const
Definition RColumn.hxx:362
std::uint16_t GetBitsOnStorage() const
Definition RColumn.hxx:358
A helper class for piece-wise construction of an RExtraTypeInfoDescriptor.
RExtraTypeInfoDescriptorBuilder & Content(const std::string &content)
RExtraTypeInfoDescriptorBuilder & ContentId(EExtraTypeInfoIds contentId)
static RFieldDescriptorBuilder FromField(const RFieldBase &field)
Make a new RFieldDescriptorBuilder based off a live NTuple field.
RResult< RFieldDescriptor > MakeDescriptor() const
Attempt to make a field descriptor.
RFieldDescriptorBuilder & FieldId(DescriptorId_t fieldId)
size_t Zip(const void *from, size_t nbytes, int compression, Writer_t fnWriter)
Returns the size of the compressed data.
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size of the from buffer.
static std::uint32_t SerializeXxHash3(const unsigned char *data, std::uint64_t length, std::uint64_t &xxhash3, void *buffer)
Writes an XxHash-3 64-bit checksum of the byte range given by data and length.
static RContext SerializeHeader(void *buffer, const RNTupleDescriptor &desc)
static std::string SerializeStreamerInfos(const StreamerInfoMap_t &infos)
static std::uint32_t SerializePageList(void *buffer, const RNTupleDescriptor &desc, std::span< DescriptorId_t > physClusterIDs, const RContext &context)
static RResult< StreamerInfoMap_t > DeserializeStreamerInfos(const std::string &extraTypeInfoContent)
static RResult< void > VerifyXxHash3(const unsigned char *data, std::uint64_t length, std::uint64_t &xxhash3)
Expects an xxhash3 checksum in the 8 bytes following data + length and verifies it.
static std::uint32_t DeserializeUInt64(const void *buffer, std::uint64_t &val)
static std::uint32_t SerializeFooter(void *buffer, const RNTupleDescriptor &desc, const RContext &context)
A memory region that contains packed and compressed pages.
Definition RCluster.hxx:103
void Register(const ROnDiskPage::Key &key, const ROnDiskPage &onDiskPage)
Inserts information about a page stored in fMemory.
Definition RCluster.hxx:120
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:42
Uses standard C++ memory allocation for the column data pages.
virtual void InitImpl(unsigned char *serializedHeader, std::uint32_t length)=0
virtual std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges, const std::vector< bool > &mask)
Vector commit of preprocessed pages.
void InitFromDescriptor(const RNTupleDescriptor &descriptor)
Initialize sink based on an existing descriptor and fill into the descriptor builder.
std::uint64_t CommitCluster(NTupleSize_t nEntries) final
Finalize the current cluster and create a new one for the following data.
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final
Write a page to the storage. The column must have been added before.
ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) final
Register a new column.
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const RNTupleWriteOptions &options=RNTupleWriteOptions())
Guess the concrete derived page sink from the location.
RPagePersistentSink(std::string_view ntupleName, const RNTupleWriteOptions &options)
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
void CommitSealedPage(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
void CommitSuppressedColumn(ColumnHandle_t columnHandle) final
Commits a suppressed column for the current cluster.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo) final
Adds an extra type information record to schema.
Reference to a page stored in the page pool.
Definition RPagePool.hxx:85
Abstract interface to write data into an ntuple.
RPageSink(std::string_view ntupleName, const RNTupleWriteOptions &options)
void CommitDataset()
Run the registered callbacks and finalize the current cluster and the entire data set.
virtual RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements)
Get a new, empty page for the given column that can be filled with up to nElements; nElements must be...
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element)
Helper for streaming a page.
void LoadStructure()
Loads header and footer without decompressing or deserializing them.
RPageSource(std::string_view ntupleName, const RNTupleReadOptions &fOptions)
void PrepareLoadCluster(const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap, std::function< void(DescriptorId_t, NTupleSize_t, const RClusterDescriptor::RPageRange::RPageInfo &)> perPageFunc)
Prepare a page range read for the column set in clusterKey.
std::unique_ptr< RPageSource > Clone() const
Open the same storage multiple times, e.g.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
void DropColumn(ColumnHandle_t columnHandle) override
Unregisters a column.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const RNTupleReadOptions &options=RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
NTupleSize_t GetNElements(ColumnHandle_t columnHandle)
void UnzipCluster(RCluster *cluster)
Parallel decompression and unpacking of the pages in the given cluster.
ColumnId_t GetColumnId(ColumnHandle_t columnHandle)
virtual RPageRef LoadPage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex)
Allocates and fills a page that contains the index-th element.
ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) override
Register a new column.
void SetEntryRange(const REntryRange &range)
Promise to only read from the given entry range.
virtual void UnzipClusterImpl(RCluster *cluster)
RResult< RPage > UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element, DescriptorId_t physicalColumnId)
Helper for unstreaming a page.
void Attach()
Open the physical storage container and deserialize header and footer.
Common functionality of an ntuple storage for both reading and writing.
Stores information about the cluster in which this page resides.
Definition RPage.hxx:55
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:46
static RPage MakePageZero(ColumnId_t columnId, ClusterSize_t::ValueType elementSize)
Make a 'zero' page for column columnId (that is comprised of 0x00 bytes only).
Definition RPage.hxx:167
std::uint32_t GetNBytes() const
The space taken by column elements in the buffer.
Definition RPage.hxx:122
std::uint32_t GetNElements() const
Definition RPage.hxx:124
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Definition RPage.cxx:25
Records the partition of data into pages for a particular column in a particular cluster.
Meta-data for a set of ntuple clusters.
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
DescriptorId_t GetClusterId() const
ClusterSize_t::ValueType GetIndex() const
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
Field-specific extra type information from the header / extension header.
A field translates read and write calls from/to underlying columns to/from tree values.
std::vector< RFieldBase * > GetSubFields()
Definition RField.cxx:1007
void SetOnDiskId(DescriptorId_t id)
Definition RField.cxx:1049
DescriptorId_t GetOnDiskId() const
The on-storage meta-data of an ntuple.
std::unique_ptr< RNTupleModel > CreateModel(const RCreateModelOptions &options=RCreateModelOptions()) const
Re-create the C++ model from the stored meta-data.
DescriptorId_t FindNextClusterId(DescriptorId_t clusterId) const
DescriptorId_t FindClusterId(DescriptorId_t physicalColumnId, NTupleSize_t index) const
const RClusterDescriptor & GetClusterDescriptor(DescriptorId_t clusterId) const
const RFieldBase * GetSourceField(const RFieldBase *target) const
The RNTupleModel encapsulates the schema of an ntuple.
const std::string & GetDescription() const
const RProjectedFields & GetProjectedFields() const
RFieldZero & GetFieldZero()
Non-const access to the root field is used to commit clusters during writing, and to make adjustments...
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
Definition RError.hxx:281
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition RError.hxx:194
void CallConnectPageSinkOnField(RFieldBase &, RPageSink &, NTupleSize_t firstEntry=0)
Definition RField.cxx:415
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr NTupleSize_t kInvalidNTupleIndex
std::int64_t ColumnId_t
Uniquely identifies a physical column within the scope of the current process, used to tag pages.
constexpr DescriptorId_t kInvalidDescriptorId
The identifiers that specifies the content of a (partial) cluster.
Definition RCluster.hxx:156
The incremental changes to a RNTupleModel
std::vector< RFieldBase * > fAddedProjectedFields
Points to the projected fields in fModel that were added as part of an updater transaction.
std::vector< RFieldBase * > fAddedFields
Points to the fields in fModel that were added as part of an updater transaction.
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:52
Default I/O performance counters that get registered in fMetrics.
const RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
bool fAllowAlias
If false, the output buffer must not point to the input page buffer, which would otherwise be an opti...
int fCompressionSetting
Compression algorithm and level to apply.
bool fWriteChecksum
Adds an 8-byte little-endian xxhash3 checksum to the page payload.
Summarizes cluster-level information that are necessary to load a certain page.
std::uint64_t fColumnOffset
The first element number of the page's column in the given cluster.
RClusterDescriptor::RPageRange::RPageInfoExtended fPageInfo
Location of the page on disk.
Default I/O performance counters that get registered in fMetrics
bool IntersectsWith(const RClusterDescriptor &clusterDesc) const
Returns true if the given cluster has entries within the entry range.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RResult< std::uint64_t > GetChecksum() const
Returns a failure if the sealed page has no checksum.
The window of element indexes of a particular column in a particular cluster.
NTupleSize_t fFirstElementIndex
The global index of the first column element in the cluster.
int fCompressionSettings
The usual format for ROOT compression settings (see Compression.h).
ClusterSize_t fNElements
The number of column elements in the cluster.
We do not need to store the element size / uncompressed page size because we know to which column the...
std::uint32_t fNElements
The sum of the elements of all the pages must match the corresponding fNElements field in fColumnRang...
bool fHasChecksum
If true, the 8 bytes following the serialized page are an xxhash of the on-disk page data.
RNTupleLocator fLocator
The meaning of fLocator depends on the storage backend.