RPageStorage.cxx
1/// \file RPageStorage.cxx
2/// \ingroup NTuple
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2018-10-04
5
6/*************************************************************************
7 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
8 * All rights reserved. *
9 * *
10 * For the licensing terms see $ROOTSYS/LICENSE. *
11 * For the list of contributors see $ROOTSYS/README/CREDITS. *
12 *************************************************************************/
13
14#include <ROOT/RPageStorage.hxx>
15#include <ROOT/RCluster.hxx>
16#include <ROOT/RColumn.hxx>
17#include <ROOT/RFieldBase.hxx>
18#include <ROOT/RNTupleDescriptor.hxx>
19#include <ROOT/RNTupleMetrics.hxx>
20#include <ROOT/RNTupleModel.hxx>
21#include <ROOT/RNTupleSerialize.hxx>
22#include <ROOT/RNTupleZip.hxx>
23#include <ROOT/RPageAllocator.hxx>
24#include <ROOT/RPageSinkBuf.hxx>
25#include <ROOT/RPageStorageFile.hxx>
26#ifdef R__ENABLE_DAOS
27#include <ROOT/RPageStorageDaos.hxx>
28#endif
29
30#include <Compression.h>
31#include <TError.h>
32
33#include <algorithm>
34#include <atomic>
35#include <cassert>
36#include <cstring>
37#include <functional>
38#include <memory>
39#include <string_view>
40#include <unordered_map>
41#include <utility>
42
52
56
62
63ROOT::Internal::RPageStorage::RPageStorage(std::string_view name)
64 : fMetrics(""), fPageAllocator(std::make_unique<ROOT::Internal::RPageAllocatorHeap>()), fNTupleName(name)
65{
66}
67
68ROOT::Internal::RPageStorage::~RPageStorage() {}
69
70void ROOT::Internal::RPageStorage::RSealedPage::ChecksumIfEnabled()
71{
72 if (!fHasChecksum)
73 return;
74
75 auto charBuf = reinterpret_cast<const unsigned char *>(fBuffer);
76 auto checksumBuf = const_cast<unsigned char *>(charBuf) + GetDataSize();
77 std::uint64_t xxhash3;
78 RNTupleSerializer::SerializeXxHash3(charBuf, GetDataSize(), xxhash3, checksumBuf);
79}
80
81ROOT::RResult<void> ROOT::Internal::RPageStorage::RSealedPage::VerifyChecksumIfEnabled() const
82{
83 if (!fHasChecksum)
84 return RResult<void>::Success();
85
86 auto success = RNTupleSerializer::VerifyXxHash3(reinterpret_cast<const unsigned char *>(fBuffer), GetDataSize());
87 if (!success)
88 return R__FAIL("page checksum verification failed, data corruption detected");
89 return RResult<void>::Success();
90}
91
92ROOT::RResult<std::uint64_t> ROOT::Internal::RPageStorage::RSealedPage::GetChecksum() const
93{
94 if (!fHasChecksum)
95 return R__FAIL("invalid attempt to extract non-existing page checksum");
96
97 assert(fBufferSize >= kNBytesPageChecksum);
98 std::uint64_t checksum;
99 RNTupleSerializer::DeserializeUInt64(
100 reinterpret_cast<const unsigned char *>(fBuffer) + fBufferSize - kNBytesPageChecksum, checksum);
101 return checksum;
102}
103
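// Illustrative sketch (not part of the original file): together, the three methods above
// implement the sealed-page layout [payload][8-byte little-endian xxhash3]. Verifying and
// extracting the checksum of a sealed page `sp` (placeholder name) read back from storage:
//
//   sp.VerifyChecksumIfEnabled().ThrowOnError();   // throws on data corruption
//   std::uint64_t chk = sp.GetChecksum().Unwrap(); // fails if checksums are disabled
//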
104//------------------------------------------------------------------------------
105
106void ROOT::Internal::RPageSource::RActivePhysicalColumns::Insert(ROOT::DescriptorId_t physicalColumnId,
107 RColumnElementBase::RIdentifier elementId)
108{
109 auto [itr, _] = fColumnInfos.emplace(physicalColumnId, std::vector<RColumnInfo>());
110 for (auto &columnInfo : itr->second) {
111 if (columnInfo.fElementId == elementId) {
112 columnInfo.fRefCounter++;
113 return;
114 }
115 }
116 itr->second.emplace_back(RColumnInfo{elementId, 1});
117}
118
119void ROOT::Internal::RPageSource::RActivePhysicalColumns::Erase(ROOT::DescriptorId_t physicalColumnId,
120 RColumnElementBase::RIdentifier elementId)
121{
122 auto itr = fColumnInfos.find(physicalColumnId);
123 R__ASSERT(itr != fColumnInfos.end());
124 for (std::size_t i = 0; i < itr->second.size(); ++i) {
125 if (itr->second[i].fElementId != elementId)
126 continue;
127
128 itr->second[i].fRefCounter--;
129 if (itr->second[i].fRefCounter == 0) {
130 itr->second.erase(itr->second.begin() + i);
131 if (itr->second.empty()) {
132 fColumnInfos.erase(itr);
133 }
134 }
135 break;
136 }
137}
138
139ROOT::Internal::RCluster::ColumnSet_t ROOT::Internal::RPageSource::RActivePhysicalColumns::ToColumnSet() const
140{
141 RCluster::ColumnSet_t result;
142 for (const auto &[physicalColumnId, _] : fColumnInfos)
143 result.insert(physicalColumnId);
144 return result;
145}
146
147bool ROOT::Internal::RPageSource::REntryRange::IntersectsWith(const ROOT::RClusterDescriptor &clusterDesc) const
148{
149 if (fFirstEntry == ROOT::kInvalidNTupleIndex) {
150 /// Entry range unset, we assume that the entry range covers the complete source
151 return true;
152 }
153
154 if (clusterDesc.GetNEntries() == 0)
155 return true;
156 if ((clusterDesc.GetFirstEntryIndex() + clusterDesc.GetNEntries()) <= fFirstEntry)
157 return false;
158 if (clusterDesc.GetFirstEntryIndex() >= (fFirstEntry + fNEntries))
159 return false;
160 return true;
161}
162
163ROOT::Internal::RPageSource::RPageSource(std::string_view name, const ROOT::RNTupleReadOptions &options)
164 : RPageStorage(name), fOptions(options)
165{
166}
167
168ROOT::Internal::RPageSource::~RPageSource() {}
169
170std::unique_ptr<ROOT::Internal::RPageSource>
171ROOT::Internal::RPageSource::Create(std::string_view ntupleName, std::string_view location,
172 const ROOT::RNTupleReadOptions &options)
173{
174 if (ntupleName.empty()) {
175 throw RException(R__FAIL("empty RNTuple name"));
176 }
177 if (location.empty()) {
178 throw RException(R__FAIL("empty storage location"));
179 }
180 if (location.find("daos://") == 0)
181#ifdef R__ENABLE_DAOS
182 return std::make_unique<ROOT::Experimental::Internal::RPageSourceDaos>(ntupleName, location, options);
183#else
184 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
185#endif
186
187 return std::make_unique<ROOT::Internal::RPageSourceFile>(ntupleName, location, options);
188}
189
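// Illustrative usage sketch (not part of the original file): opening an RNTuple for reading
// through the page-source interface; "ntpl" and "data.root" are placeholder names.
//
//   auto source = ROOT::Internal::RPageSource::Create("ntpl", "data.root");
//   source->Attach();                      // deserializes header and footer
//   auto nEntries = source->GetNEntries();
//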
190ROOT::Internal::RPageStorage::ColumnHandle_t
191ROOT::Internal::RPageSource::AddColumn(ROOT::DescriptorId_t fieldId, ROOT::Internal::RColumn &column)
192{
193 R__ASSERT(fieldId != ROOT::kInvalidDescriptorId);
194 auto physicalId =
195 GetSharedDescriptorGuard()->FindPhysicalColumnId(fieldId, column.GetIndex(), column.GetRepresentationIndex());
196 R__ASSERT(physicalId != ROOT::kInvalidDescriptorId);
197 fActivePhysicalColumns.Insert(physicalId, column.GetElement()->GetIdentifier());
198 return ColumnHandle_t{physicalId, &column};
199}
200
201void ROOT::Internal::RPageSource::DropColumn(ColumnHandle_t columnHandle)
202{
203 fActivePhysicalColumns.Erase(columnHandle.fPhysicalId, columnHandle.fColumn->GetElement()->GetIdentifier());
204}
205
206void ROOT::Internal::RPageSource::SetEntryRange(const REntryRange &range)
207{
208 if ((range.fFirstEntry + range.fNEntries) > GetNEntries()) {
209 throw RException(R__FAIL("invalid entry range"));
210 }
211 fEntryRange = range;
212}
213
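// Illustrative sketch (not part of the original file): restricting a source to a sub-range
// before reading, e.g. entries [100, 300):
//
//   ROOT::Internal::RPageSource::REntryRange range;
//   range.fFirstEntry = 100;
//   range.fNEntries = 200;
//   source->SetEntryRange(range); // throws if the range exceeds GetNEntries()
//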
214void ROOT::Internal::RPageSource::LoadStructure()
215{
216 if (!fHasStructure)
217 LoadStructureImpl();
218 fHasStructure = true;
219}
220
221void ROOT::Internal::RPageSource::Attach(RNTupleSerializer::EDescriptorDeserializeMode mode)
222{
223 LoadStructure();
224 if (!fIsAttached)
225 GetExclDescriptorGuard().MoveIn(AttachImpl(mode));
226 fIsAttached = true;
227}
228
229std::unique_ptr<ROOT::Internal::RPageSource> ROOT::Internal::RPageSource::Clone() const
230{
231 auto clone = CloneImpl();
232 if (fIsAttached) {
233 clone->GetExclDescriptorGuard().MoveIn(GetSharedDescriptorGuard()->Clone());
234 clone->fHasStructure = true;
235 clone->fIsAttached = true;
236 }
237 return clone;
238}
239
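// Illustrative sketch (not part of the original file): concurrent readers use one clone per
// thread; cloning an attached source copies the descriptor, so the clone does not re-read
// the on-disk metadata.
//
//   auto workerSource = source->Clone(); // independent source for another thread
//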
240ROOT::NTupleSize_t ROOT::Internal::RPageSource::GetNEntries()
241{
242 return GetSharedDescriptorGuard()->GetNEntries();
243}
244
245ROOT::NTupleSize_t ROOT::Internal::RPageSource::GetNElements(ColumnHandle_t columnHandle)
246{
247 return GetSharedDescriptorGuard()->GetNElements(columnHandle.fPhysicalId);
248}
249
250void ROOT::Internal::RPageSource::UnzipCluster(ROOT::Internal::RCluster *cluster)
251{
252 if (fTaskScheduler)
253 UnzipClusterImpl(cluster);
254}
255
256void ROOT::Internal::RPageSource::UnzipClusterImpl(ROOT::Internal::RCluster *cluster)
257{
258 RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
259
260 const auto clusterId = cluster->GetId();
261 auto descriptorGuard = GetSharedDescriptorGuard();
262 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
263
264 fPreloadedClusters[clusterDescriptor.GetFirstEntryIndex()] = clusterId;
265
266 std::atomic<bool> foundChecksumFailure{false};
267
268 std::vector<std::unique_ptr<RColumnElementBase>> allElements;
269 const auto &columnsInCluster = cluster->GetAvailPhysicalColumns();
270 for (const auto columnId : columnsInCluster) {
271 // By the time we unzip a cluster, the set of active columns may have already changed with respect to the
272 // we requested reading the cluster. That doesn't matter much, we simply decompress what is now in the list
273 // of active columns.
274 if (!fActivePhysicalColumns.HasColumnInfos(columnId))
275 continue;
276 const auto &columnInfos = fActivePhysicalColumns.GetColumnInfos(columnId);
277
278 allElements.reserve(allElements.size() + columnInfos.size());
279 for (const auto &info : columnInfos) {
280 allElements.emplace_back(GenerateColumnElement(info.fElementId));
281
282 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
283 std::uint64_t pageNo = 0;
284 std::uint64_t firstInPage = 0;
285 for (const auto &pi : pageRange.GetPageInfos()) {
286 auto onDiskPage = cluster->GetOnDiskPage(ROnDiskPage::Key{columnId, pageNo});
287 RSealedPage sealedPage;
288 sealedPage.SetNElements(pi.GetNElements());
289 sealedPage.SetHasChecksum(pi.HasChecksum());
290 sealedPage.SetBufferSize(pi.GetLocator().GetNBytesOnStorage() + pi.HasChecksum() * kNBytesPageChecksum);
291 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
292 sealedPage.SetBuffer(onDiskPage->GetAddress());
293
294 auto taskFunc = [this, columnId, clusterId, firstInPage, sealedPage, element = allElements.back().get(),
295 &foundChecksumFailure,
296 indexOffset = clusterDescriptor.GetColumnRange(columnId).GetFirstElementIndex()]() {
297 const ROOT::Internal::RPagePool::RKey keyPagePool{columnId, element->GetIdentifier().fInMemoryType};
298 auto rv = UnsealPage(sealedPage, *element);
299 if (!rv) {
300 foundChecksumFailure = true;
301 return;
302 }
303 auto newPage = rv.Unwrap();
304 fCounters->fSzUnzip.Add(element->GetSize() * sealedPage.GetNElements());
305
306 newPage.SetWindow(indexOffset + firstInPage,
307 RPage::RClusterInfo(clusterId, indexOffset));
308 fPagePool.PreloadPage(std::move(newPage), keyPagePool);
309 };
310
311 fTaskScheduler->AddTask(taskFunc);
312
313 firstInPage += pi.GetNElements();
314 pageNo++;
315 } // for all pages in column
316
317 fCounters->fNPageUnsealed.Add(pageNo);
318 } // for all in-memory types of the column
319 } // for all columns in cluster
320
321 fTaskScheduler->Wait();
322
323 if (foundChecksumFailure) {
324 throw RException(R__FAIL("page checksum verification failed, data corruption detected"));
325 }
326}
327
328void ROOT::Internal::RPageSource::PrepareLoadCluster(
329 const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap,
330 std::function<void(ROOT::DescriptorId_t, ROOT::NTupleSize_t, const ROOT::RClusterDescriptor::RPageInfo &)>
331 perPageFunc)
332{
333 auto descriptorGuard = GetSharedDescriptorGuard();
334 const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterKey.fClusterId);
335
336 for (auto physicalColumnId : clusterKey.fPhysicalColumnSet) {
337 if (clusterDesc.GetColumnRange(physicalColumnId).IsSuppressed())
338 continue;
339
340 const auto &pageRange = clusterDesc.GetPageRange(physicalColumnId);
341 std::uint64_t pageNo = 0;
342 for (const auto &pageInfo : pageRange.GetPageInfos()) {
343 if (pageInfo.GetLocator().GetType() == RNTupleLocator::kTypePageZero) {
344 pageZeroMap.Register(ROnDiskPage::Key{physicalColumnId, pageNo},
345 ROnDiskPage(const_cast<void *>(RPage::GetPageZeroBuffer()),
346 pageInfo.GetLocator().GetNBytesOnStorage()));
347 } else {
348 perPageFunc(physicalColumnId, pageNo, pageInfo);
349 }
350 ++pageNo;
351 }
352 }
353}
354
355void ROOT::Internal::RPageSource::UpdateLastUsedCluster(ROOT::DescriptorId_t clusterId)
356{
357 if (fLastUsedCluster == clusterId)
358 return;
359
360 const auto firstEntryIndex =
361 GetSharedDescriptorGuard()->GetClusterDescriptor(clusterId).GetFirstEntryIndex();
362 auto itr = fPreloadedClusters.begin();
363 while ((itr != fPreloadedClusters.end()) && (itr->first < firstEntryIndex)) {
364 fPagePool.Evict(itr->second);
365 itr = fPreloadedClusters.erase(itr);
366 }
367 std::size_t poolWindow = 0;
368 while ((itr != fPreloadedClusters.end()) &&
369 (poolWindow < 2 * RNTupleReadOptionsManip::GetClusterBunchSize(fOptions))) {
370 ++itr;
371 ++poolWindow;
372 }
373 while (itr != fPreloadedClusters.end()) {
374 fPagePool.Evict(itr->second);
375 itr = fPreloadedClusters.erase(itr);
376 }
377
378 fLastUsedCluster = clusterId;
379}
380
381ROOT::Internal::RPageRef
382ROOT::Internal::RPageSource::LoadPage(ColumnHandle_t columnHandle, ROOT::NTupleSize_t globalIndex)
383{
384 const auto columnId = columnHandle.fPhysicalId;
385 const auto columnElementId = columnHandle.fColumn->GetElement()->GetIdentifier();
386 auto cachedPageRef =
387 fPagePool.GetPage(ROOT::Internal::RPagePool::RKey{columnId, columnElementId.fInMemoryType}, globalIndex);
388 if (!cachedPageRef.Get().IsNull()) {
389 UpdateLastUsedCluster(cachedPageRef.Get().GetClusterInfo().GetId());
390 return cachedPageRef;
391 }
392
393 std::uint64_t idxInCluster;
394 RClusterInfo clusterInfo;
395 {
396 auto descriptorGuard = GetSharedDescriptorGuard();
397 clusterInfo.fClusterId = descriptorGuard->FindClusterId(columnId, globalIndex);
398
399 if (clusterInfo.fClusterId == ROOT::kInvalidDescriptorId)
400 throw RException(R__FAIL("entry with index " + std::to_string(globalIndex) + " out of bounds"));
401
402 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterInfo.fClusterId);
403 const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
404 if (columnRange.IsSuppressed())
405 return ROOT::Internal::RPageRef();
406
407 clusterInfo.fColumnOffset = columnRange.GetFirstElementIndex();
408 R__ASSERT(clusterInfo.fColumnOffset <= globalIndex);
409 idxInCluster = globalIndex - clusterInfo.fColumnOffset;
410 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
411 }
412
413 if (clusterInfo.fPageInfo.GetLocator().GetType() == RNTupleLocator::kTypeUnknown)
414 throw RException(R__FAIL("tried to read a page with an unknown locator"));
415
416 UpdateLastUsedCluster(clusterInfo.fClusterId);
417 return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
418}
419
420ROOT::Internal::RPageRef
421ROOT::Internal::RPageSource::LoadPage(ColumnHandle_t columnHandle, RNTupleLocalIndex localIndex)
422{
423 const auto clusterId = localIndex.GetClusterId();
424 const auto idxInCluster = localIndex.GetIndexInCluster();
425 const auto columnId = columnHandle.fPhysicalId;
426 const auto columnElementId = columnHandle.fColumn->GetElement()->GetIdentifier();
427 auto cachedPageRef =
428 fPagePool.GetPage(ROOT::Internal::RPagePool::RKey{columnId, columnElementId.fInMemoryType}, localIndex);
429 if (!cachedPageRef.Get().IsNull()) {
430 UpdateLastUsedCluster(clusterId);
431 return cachedPageRef;
432 }
433
434 if (clusterId == ROOT::kInvalidDescriptorId)
435 throw RException(R__FAIL("entry out of bounds"));
436
437 RClusterInfo clusterInfo;
438 {
439 auto descriptorGuard = GetSharedDescriptorGuard();
440 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
441 const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
442 if (columnRange.IsSuppressed())
443 return ROOT::Internal::RPageRef();
444
445 clusterInfo.fClusterId = clusterId;
446 clusterInfo.fColumnOffset = columnRange.GetFirstElementIndex();
447 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
448 }
449
450 if (clusterInfo.fPageInfo.GetLocator().GetType() == RNTupleLocator::kTypeUnknown)
451 throw RException(R__FAIL("tried to read a page with an unknown locator"));
452
453 UpdateLastUsedCluster(clusterInfo.fClusterId);
454 return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
455}
456
457void ROOT::Internal::RPageSource::EnableDefaultMetrics(const std::string &prefix)
458{
459 fMetrics = RNTupleMetrics(prefix);
460 fCounters = std::make_unique<RCounters>(RCounters{
461 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nReadV", "", "number of vector read requests"),
462 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nRead", "", "number of byte ranges read"),
463 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szReadPayload", "B", "volume read from storage (required)"),
464 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szReadOverhead", "B", "volume read from storage (overhead)"),
465 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szUnzip", "B", "volume after unzipping"),
466 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nClusterLoaded", "",
467 "number of partial clusters preloaded from storage"),
468 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nPageRead", "", "number of pages read from storage"),
469 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nPageUnsealed", "", "number of pages unzipped and decoded"),
470 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallRead", "ns", "wall clock time spent reading"),
471 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallUnzip", "ns", "wall clock time spent decompressing"),
472 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuRead", "ns", "CPU time spent reading"),
473 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuUnzip", "ns",
474 "CPU time spent decompressing"),
475 *fMetrics.MakeCounter<RNTupleCalcPerf *>(
476 "bwRead", "MB/s", "bandwidth compressed bytes read per second", fMetrics,
477 [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
478 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
479 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
480 if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
481 if (auto walltime = timeWallRead->GetValueAsInt()) {
482 double payload = szReadPayload->GetValueAsInt();
483 double overhead = szReadOverhead->GetValueAsInt();
484 // unit: bytes / nanosecond = GB/s
485 return {true, (1000. * (payload + overhead) / walltime)};
486 }
487 }
488 }
489 }
490 return {false, -1.};
491 }),
492 *fMetrics.MakeCounter<RNTupleCalcPerf *>(
493 "bwReadUnzip", "MB/s", "bandwidth uncompressed bytes read per second", fMetrics,
494 [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
495 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
496 if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
497 if (auto walltime = timeWallRead->GetValueAsInt()) {
498 double unzip = szUnzip->GetValueAsInt();
499 // unit: bytes / nanosecond = GB/s
500 return {true, 1000. * unzip / walltime};
501 }
502 }
503 }
504 return {false, -1.};
505 }),
506 *fMetrics.MakeCounter<RNTupleCalcPerf *>(
507 "bwUnzip", "MB/s", "decompression bandwidth of uncompressed bytes per second", fMetrics,
508 [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
509 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
510 if (const auto timeWallUnzip = metrics.GetLocalCounter("timeWallUnzip")) {
511 if (auto walltime = timeWallUnzip->GetValueAsInt()) {
512 double unzip = szUnzip->GetValueAsInt();
513 // unit: bytes / nanosecond = GB/s
514 return {true, 1000. * unzip / walltime};
515 }
516 }
517 }
518 return {false, -1.};
519 }),
520 *fMetrics.MakeCounter<RNTupleCalcPerf *>(
521 "rtReadEfficiency", "", "ratio of payload over all bytes read", fMetrics,
522 [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
523 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
524 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
525 if (auto payload = szReadPayload->GetValueAsInt()) {
526 // r/(r+o) = 1/((r+o)/r) = 1/(1 + o/r)
527 return {true, 1. / (1. + (1. * szReadOverhead->GetValueAsInt()) / payload)};
528 }
529 }
530 }
531 return {false, -1.};
532 }),
533 *fMetrics.MakeCounter<RNTupleCalcPerf *>("rtCompression", "", "ratio of compressed bytes / uncompressed bytes",
534 fMetrics, [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
535 if (const auto szReadPayload =
536 metrics.GetLocalCounter("szReadPayload")) {
537 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
538 if (auto unzip = szUnzip->GetValueAsInt()) {
539 return {true, (1. * szReadPayload->GetValueAsInt()) / unzip};
540 }
541 }
542 }
543 return {false, -1.};
544 })});
545}
546
547ROOT::RResult<ROOT::Internal::RPage>
548ROOT::Internal::RPageSource::UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element)
549{
550 return UnsealPage(sealedPage, element, *fPageAllocator);
551}
552
553ROOT::RResult<ROOT::Internal::RPage>
554ROOT::Internal::RPageSource::UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element,
555 RPageAllocator &pageAlloc)
556{
557 // Unsealing a page zero is a no-op. `RPageRange::ExtendToFitColumnRange()` guarantees that the page zero buffer is
558 // large enough to hold `sealedPage.fNElements`
559 if (sealedPage.GetBuffer() == RPage::GetPageZeroBuffer()) {
560 auto page = pageAlloc.NewPage(element.GetSize(), sealedPage.GetNElements());
561 page.GrowUnchecked(sealedPage.GetNElements());
562 memset(page.GetBuffer(), 0, page.GetNBytes());
563 return page;
564 }
565
566 auto rv = sealedPage.VerifyChecksumIfEnabled();
567 if (!rv)
568 return R__FORWARD_ERROR(rv);
569
570 const auto bytesPacked = element.GetPackedSize(sealedPage.GetNElements());
571 auto page = pageAlloc.NewPage(element.GetPackedSize(), sealedPage.GetNElements());
572 if (sealedPage.GetDataSize() != bytesPacked) {
573 RNTupleDecompressor::Unzip(sealedPage.GetBuffer(), sealedPage.GetDataSize(), bytesPacked,
574 page.GetBuffer());
575 } else {
576 // We cannot simply map the sealed page as we don't know its lifetime. Specialized page sources
577 // may decide not to use UnsealPage but to implement custom mapping / decompression code.
578 // Note that usually pages are compressed.
579 memcpy(page.GetBuffer(), sealedPage.GetBuffer(), bytesPacked);
580 }
581
582 if (!element.IsMappable()) {
583 auto tmp = pageAlloc.NewPage(element.GetSize(), sealedPage.GetNElements());
584 element.Unpack(tmp.GetBuffer(), page.GetBuffer(), sealedPage.GetNElements());
585 page = std::move(tmp);
586 }
587
588 page.GrowUnchecked(sealedPage.GetNElements());
589 return page;
590}
591
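// Illustrative round-trip sketch (not part of the original file): a page sealed by
// RPageSink::SealPage() can be restored with UnsealPage(), given a matching column element
// `element` and a page allocator `alloc` (both placeholder names here):
//
//   auto result = ROOT::Internal::RPageSource::UnsealPage(sealedPage, element, alloc);
//   auto page = result.Unwrap(); // throws on checksum or decompression failure
//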
592//------------------------------------------------------------------------------
593
594bool ROOT::Internal::RWritePageMemoryManager::RColumnInfo::operator>(const RColumnInfo &other) const
595{
596 // Make the sort order unique by adding the physical on-disk column id as a secondary key
597 if (fCurrentPageSize == other.fCurrentPageSize)
598 return fColumn->GetOnDiskId() > other.fColumn->GetOnDiskId();
599 return fCurrentPageSize > other.fCurrentPageSize;
600}
601
602bool ROOT::Internal::RWritePageMemoryManager::TryEvict(std::size_t targetAvailableSize, std::size_t pageSizeLimit)
603{
604 if (fMaxAllocatedBytes - fCurrentAllocatedBytes >= targetAvailableSize)
605 return true;
606
607 auto itr = fColumnsSortedByPageSize.begin();
608 while (itr != fColumnsSortedByPageSize.end()) {
609 if (itr->fCurrentPageSize <= pageSizeLimit)
610 break;
611 if (itr->fCurrentPageSize == itr->fInitialPageSize) {
612 ++itr;
613 continue;
614 }
615
616 // Flushing the current column will invalidate itr
617 auto itrFlush = itr++;
618
619 RColumnInfo next;
620 if (itr != fColumnsSortedByPageSize.end())
621 next = *itr;
622
623 itrFlush->fColumn->Flush();
624 if (fMaxAllocatedBytes - fCurrentAllocatedBytes >= targetAvailableSize)
625 return true;
626
627 if (next.fColumn == nullptr)
628 return false;
629 itr = fColumnsSortedByPageSize.find(next);
630 };
631
632 return false;
633}
634
635bool ROOT::Internal::RWritePageMemoryManager::TryUpdate(ROOT::Internal::RColumn &column, std::size_t newWritePageSize)
636{
637 const RColumnInfo key{&column, column.GetWritePageCapacity(), 0};
638 auto itr = fColumnsSortedByPageSize.find(key);
639 if (itr == fColumnsSortedByPageSize.end()) {
640 if (!TryEvict(newWritePageSize, 0))
641 return false;
642 fColumnsSortedByPageSize.insert({&column, newWritePageSize, newWritePageSize});
643 fCurrentAllocatedBytes += newWritePageSize;
644 return true;
645 }
646
647 RColumnInfo elem = *itr;
648 assert(newWritePageSize >= elem.fInitialPageSize);
649
650 if (newWritePageSize == elem.fCurrentPageSize)
651 return true;
652
653 fColumnsSortedByPageSize.erase(itr);
654
655 if (newWritePageSize < elem.fCurrentPageSize) {
656 // Page got smaller
657 fCurrentAllocatedBytes -= elem.fCurrentPageSize - newWritePageSize;
658 elem.fCurrentPageSize = newWritePageSize;
659 fColumnsSortedByPageSize.insert(elem);
660 return true;
661 }
662
663 // Page got larger, we may need to make space available
664 const auto diffBytes = newWritePageSize - elem.fCurrentPageSize;
665 if (!TryEvict(diffBytes, elem.fCurrentPageSize)) {
666 // Don't change anything, let the calling column flush itself
667 // TODO(jblomer): we may consider skipping the column in TryEvict and thus avoiding erase+insert
668 fColumnsSortedByPageSize.insert(elem);
669 return false;
670 }
671 fCurrentAllocatedBytes += diffBytes;
672 elem.fCurrentPageSize = newWritePageSize;
673 fColumnsSortedByPageSize.insert(elem);
674 return true;
675}
676
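// Illustrative sketch (not part of the original file): the memory manager keeps the sum of
// all write page allocations within the budget configured in the write options, e.g.
//
//   ROOT::RNTupleWriteOptions opts;
//   opts.SetPageBufferBudget(64 * 1024 * 1024); // 64 MiB across all columns
//
// When a column requests a larger write page, TryUpdate() first tries to free memory via
// TryEvict(), which flushes the columns with the largest write pages.
//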
677//------------------------------------------------------------------------------
678
679ROOT::Internal::RPageSink::RPageSink(std::string_view name, const ROOT::RNTupleWriteOptions &options)
680 : RPageStorage(name), fOptions(options.Clone()), fWritePageMemoryManager(options.GetPageBufferBudget())
681{
683}
684
685ROOT::Internal::RPageSink::~RPageSink() {}
686
687ROOT::Internal::RPageStorage::RSealedPage ROOT::Internal::RPageSink::SealPage(const RSealPageConfig &config)
688{
689 assert(config.fPage);
690 assert(config.fElement);
691 assert(config.fBuffer);
692
693 unsigned char *pageBuf = reinterpret_cast<unsigned char *>(config.fPage->GetBuffer());
694 bool isAdoptedBuffer = true;
695 auto nBytesPacked = config.fPage->GetNBytes();
696 auto nBytesChecksum = config.fWriteChecksum * kNBytesPageChecksum;
697
698 if (!config.fElement->IsMappable()) {
699 nBytesPacked = config.fElement->GetPackedSize(config.fPage->GetNElements());
700 pageBuf = new unsigned char[nBytesPacked];
701 isAdoptedBuffer = false;
702 config.fElement->Pack(pageBuf, config.fPage->GetBuffer(), config.fPage->GetNElements());
703 }
704 auto nBytesZipped = nBytesPacked;
705
706 if ((config.fCompressionSettings != 0) || !config.fElement->IsMappable() || !config.fAllowAlias ||
707 config.fWriteChecksum) {
708 nBytesZipped =
709 RNTupleCompressor::Zip(pageBuf, nBytesPacked, config.fCompressionSettings, config.fBuffer);
710 if (!isAdoptedBuffer)
711 delete[] pageBuf;
712 pageBuf = reinterpret_cast<unsigned char *>(config.fBuffer);
713 isAdoptedBuffer = true;
714 }
715
716 R__ASSERT(isAdoptedBuffer);
717
718 RSealedPage sealedPage{pageBuf, nBytesZipped + nBytesChecksum, config.fPage->GetNElements(), config.fWriteChecksum};
719 sealedPage.ChecksumIfEnabled();
720
721 return sealedPage;
722}
723
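// Illustrative sketch (not part of the original file): sealing a page by hand through
// RSealPageConfig; `page`, `element`, and `buf` are placeholders. The member SealPage()
// below fills the same fields from the write options.
//
//   ROOT::Internal::RPageSink::RSealPageConfig cfg;
//   cfg.fPage = &page;
//   cfg.fElement = &element;
//   cfg.fCompressionSettings = 505;  // zstd, level 5
//   cfg.fWriteChecksum = true;
//   cfg.fAllowAlias = false;
//   cfg.fBuffer = buf;               // must be large enough for the sealed output
//   auto sealed = ROOT::Internal::RPageSink::SealPage(cfg);
//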
724ROOT::Internal::RPageStorage::RSealedPage
725ROOT::Internal::RPageSink::SealPage(const RPage &page, const RColumnElementBase &element)
726{
727 const auto nBytes = page.GetNBytes() + GetWriteOptions().GetEnablePageChecksums() * kNBytesPageChecksum;
728 if (fSealPageBuffer.size() < nBytes)
729 fSealPageBuffer.resize(nBytes);
730
731 RSealPageConfig config;
732 config.fPage = &page;
733 config.fElement = &element;
734 config.fCompressionSettings = GetWriteOptions().GetCompression();
735 config.fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
736 config.fAllowAlias = true;
737 config.fBuffer = fSealPageBuffer.data();
738
739 return SealPage(config);
740}
741
742void ROOT::Internal::RPageSink::CommitDataset()
743{
744 for (const auto &cb : fOnDatasetCommitCallbacks)
745 cb(*this);
746 CommitDatasetImpl();
747}
748
749ROOT::Internal::RPage ROOT::Internal::RPageSink::ReservePage(ColumnHandle_t columnHandle, std::size_t nElements)
750{
751 R__ASSERT(nElements > 0);
752 const auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
753 const auto nBytes = elementSize * nElements;
754 if (!fWritePageMemoryManager.TryUpdate(*columnHandle.fColumn, nBytes))
755 return ROOT::Internal::RPage();
756 return fPageAllocator->NewPage(elementSize, nElements);
757}
758
759//------------------------------------------------------------------------------
760
761std::unique_ptr<ROOT::Internal::RPageSink>
762ROOT::Internal::RPagePersistentSink::Create(std::string_view ntupleName, std::string_view location,
763 const ROOT::RNTupleWriteOptions &options)
764{
765 if (ntupleName.empty()) {
766 throw RException(R__FAIL("empty RNTuple name"));
767 }
768 if (location.empty()) {
769 throw RException(R__FAIL("empty storage location"));
770 }
771 if (location.find("daos://") == 0) {
772#ifdef R__ENABLE_DAOS
773 return std::make_unique<ROOT::Experimental::Internal::RPageSinkDaos>(ntupleName, location, options);
774#else
775 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
776#endif
777 }
778
779 // Otherwise assume that the user wants us to create a file.
780 return std::make_unique<ROOT::Internal::RPageSinkFile>(ntupleName, location, options);
781}
782
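// Illustrative sketch (not part of the original file): sinks are normally created behind
// RNTupleWriter, optionally wrapped in a buffered sink; "ntpl" and "data.root" are
// placeholder names.
//
//   auto sink = ROOT::Internal::RPagePersistentSink::Create("ntpl", "data.root");
//   auto buffered = std::make_unique<ROOT::Internal::RPageSinkBuf>(std::move(sink));
//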
783ROOT::Internal::RPagePersistentSink::RPagePersistentSink(std::string_view name,
784 const ROOT::RNTupleWriteOptions &options)
785 : RPageSink(name, options)
786{
787}
788
789ROOT::Internal::RPagePersistentSink::~RPagePersistentSink() {}
790
791ROOT::Internal::RPageStorage::ColumnHandle_t
792ROOT::Internal::RPagePersistentSink::AddColumn(ROOT::DescriptorId_t fieldId, ROOT::Internal::RColumn &column)
793{
794 auto columnId = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns();
795 RColumnDescriptorBuilder columnBuilder;
796 columnBuilder.LogicalColumnId(columnId)
797 .PhysicalColumnId(columnId)
798 .FieldId(fieldId)
799 .BitsOnStorage(column.GetBitsOnStorage())
800 .ValueRange(column.GetValueRange())
801 .Type(column.GetType())
802 .Index(column.GetIndex())
803 .RepresentationIndex(column.GetRepresentationIndex())
804 .FirstElementIndex(column.GetFirstElementIndex());
805 // For late model extension, we assume that the primary column representation is the active one for the
806 // deferred range. All other representations are suppressed.
807 if (column.GetFirstElementIndex() > 0 && column.GetRepresentationIndex() > 0)
808 columnBuilder.SetSuppressedDeferred();
809 fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
810 return ColumnHandle_t{columnId, &column};
811}
812
813void ROOT::Internal::RPagePersistentSink::UpdateSchema(const RNTupleModelChangeset &changeset,
814 ROOT::NTupleSize_t firstEntry)
815{
816 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
817
818 if (descriptor.GetNLogicalColumns() > descriptor.GetNPhysicalColumns()) {
819 // If we already have alias columns, add an offset to the alias columns so that the new physical columns
820 // of the changeset immediately follow the already existing physical columns
821 auto getNColumns = [](const ROOT::RFieldBase &f) -> std::size_t {
822 const auto &reps = f.GetColumnRepresentatives();
823 if (reps.empty())
824 return 0;
825 return reps.size() * reps[0].size();
826 };
827 std::uint32_t nNewPhysicalColumns = 0;
828 for (auto f : changeset.fAddedFields) {
829 nNewPhysicalColumns += getNColumns(*f);
830 for (const auto &descendant : *f)
831 nNewPhysicalColumns += getNColumns(descendant);
832 }
833 fDescriptorBuilder.ShiftAliasColumns(nNewPhysicalColumns);
834 }
835
836 auto addField = [&](ROOT::RFieldBase &f) {
837 auto fieldId = descriptor.GetNFields();
838 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
839 fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
840 f.SetOnDiskId(fieldId);
841 ROOT::Internal::CallConnectPageSinkOnField(f, *this, firstEntry); // issues in turn calls to `AddColumn()`
842 };
843 auto addProjectedField = [&](ROOT::RFieldBase &f) {
844 auto fieldId = descriptor.GetNFields();
845 auto sourceFieldId =
846 GetProjectedFieldsOfModel(changeset.fModel).GetSourceField(&f)->GetOnDiskId();
847 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
848 fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
849 fDescriptorBuilder.AddFieldProjection(sourceFieldId, fieldId);
850 f.SetOnDiskId(fieldId);
851 for (const auto &source : descriptor.GetColumnIterable(sourceFieldId)) {
852 auto targetId = descriptor.GetNLogicalColumns();
853 RColumnDescriptorBuilder columnBuilder;
854 columnBuilder.LogicalColumnId(targetId)
855 .PhysicalColumnId(source.GetLogicalId())
856 .FieldId(fieldId)
857 .BitsOnStorage(source.GetBitsOnStorage())
858 .ValueRange(source.GetValueRange())
859 .Type(source.GetType())
860 .Index(source.GetIndex())
861 .RepresentationIndex(source.GetRepresentationIndex());
862 fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
863 }
864 };
865
866 R__ASSERT(firstEntry >= fPrevClusterNEntries);
867 const auto nColumnsBeforeUpdate = descriptor.GetNPhysicalColumns();
868 for (auto f : changeset.fAddedFields) {
869 addField(*f);
870 for (auto &descendant : *f)
871 addField(descendant);
872 }
873 for (auto f : changeset.fAddedProjectedFields) {
874 addProjectedField(*f);
875 for (auto &descendant : *f)
876 addProjectedField(descendant);
877 }
878
879 const auto nColumns = descriptor.GetNPhysicalColumns();
880 fOpenColumnRanges.reserve(fOpenColumnRanges.size() + (nColumns - nColumnsBeforeUpdate));
881 fOpenPageRanges.reserve(fOpenPageRanges.size() + (nColumns - nColumnsBeforeUpdate));
882 for (ROOT::DescriptorId_t i = nColumnsBeforeUpdate; i < nColumns; ++i) {
883 ROOT::RClusterDescriptor::RColumnRange columnRange;
884 columnRange.SetPhysicalColumnId(i);
885 // We set the first element index in the current cluster to the first element that is part of a materialized page
886 // (i.e., that is part of a page list). For columns created during late model extension, however, the column range
887 // is fixed up as needed by `RClusterDescriptorBuilder::AddExtendedColumnRanges()` on read back.
888 columnRange.SetFirstElementIndex(descriptor.GetColumnDescriptor(i).GetFirstElementIndex());
889 columnRange.SetNElements(0);
890 columnRange.SetCompressionSettings(GetWriteOptions().GetCompression());
891 fOpenColumnRanges.emplace_back(columnRange);
892 ROOT::RClusterDescriptor::RPageRange pageRange;
893 pageRange.SetPhysicalColumnId(i);
894 fOpenPageRanges.emplace_back(std::move(pageRange));
895 }
896
897 // Mapping of memory to on-disk column IDs usually happens during serialization of the ntuple header. If the
898 // header was already serialized, this has to be done manually as it is required for page list serialization.
899 if (fSerializationContext.GetHeaderSize() > 0)
900 fSerializationContext.MapSchema(descriptor, /*forHeaderExtension=*/true);
901}
902
903void ROOT::Internal::RPagePersistentSink::UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo)
904{
905 if (extraTypeInfo.GetContentId() != EExtraTypeInfoIds::kStreamerInfo)
906 throw RException(R__FAIL("ROOT bug: unexpected type extra info in UpdateExtraTypeInfo()"));
907
908 fStreamerInfos.merge(RNTupleSerializer::DeserializeStreamerInfos(extraTypeInfo.GetContent()).Unwrap());
909}
910
911void ROOT::Internal::RPagePersistentSink::InitImpl(ROOT::RNTupleModel &model)
912{
913 fDescriptorBuilder.SetNTuple(fNTupleName, model.GetDescription());
914 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
915
916 auto &fieldZero = GetFieldZeroOfModel(model);
917 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(fieldZero).FieldId(0).MakeDescriptor().Unwrap());
918 fieldZero.SetOnDiskId(0);
919 auto &projectedFields = GetProjectedFieldsOfModel(model);
920 projectedFields.GetFieldZero().SetOnDiskId(0);
921
922 RNTupleModelChangeset initialChangeset{model};
923 initialChangeset.fAddedFields.reserve(fieldZero.GetMutableSubfields().size());
924 for (auto f : fieldZero.GetMutableSubfields())
925 initialChangeset.fAddedFields.emplace_back(f);
926 initialChangeset.fAddedProjectedFields.reserve(projectedFields.GetFieldZero().GetMutableSubfields().size());
927 for (auto f : projectedFields.GetFieldZero().GetMutableSubfields())
928 initialChangeset.fAddedProjectedFields.emplace_back(f);
929 UpdateSchema(initialChangeset, 0U);
930
931 fSerializationContext = RNTupleSerializer::SerializeHeader(nullptr, descriptor).Unwrap();
932 auto buffer = MakeUninitArray<unsigned char>(fSerializationContext.GetHeaderSize());
933 fSerializationContext = RNTupleSerializer::SerializeHeader(buffer.get(), descriptor).Unwrap();
934 InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());
935
936 fDescriptorBuilder.BeginHeaderExtension();
937}
938
939std::unique_ptr<ROOT::RNTupleModel>
940ROOT::Internal::RPagePersistentSink::InitFromDescriptor(const ROOT::RNTupleDescriptor &srcDescriptor, bool copyClusters)
941{
942 // Create new descriptor
943 fDescriptorBuilder.SetSchemaFromExisting(srcDescriptor);
944 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
945
946 // Create column/page ranges
947 const auto nColumns = descriptor.GetNPhysicalColumns();
948 R__ASSERT(fOpenColumnRanges.empty() && fOpenPageRanges.empty());
949 fOpenColumnRanges.reserve(nColumns);
950 fOpenPageRanges.reserve(nColumns);
951 for (ROOT::DescriptorId_t i = 0; i < nColumns; ++i) {
952 const auto &column = descriptor.GetColumnDescriptor(i);
953 ROOT::RClusterDescriptor::RColumnRange columnRange;
954 columnRange.SetPhysicalColumnId(i);
955 columnRange.SetFirstElementIndex(column.GetFirstElementIndex());
956 columnRange.SetNElements(0);
957 columnRange.SetCompressionSettings(GetWriteOptions().GetCompression());
958 fOpenColumnRanges.emplace_back(columnRange);
959 ROOT::RClusterDescriptor::RPageRange pageRange;
960 pageRange.SetPhysicalColumnId(i);
961 fOpenPageRanges.emplace_back(std::move(pageRange));
962 }
963
964 if (copyClusters) {
965 // Clone and add all cluster descriptors
966 auto clusterId = srcDescriptor.FindClusterId(0, 0);
967 while (clusterId != ROOT::kInvalidDescriptorId) {
968 auto &cluster = srcDescriptor.GetClusterDescriptor(clusterId);
969 auto nEntries = cluster.GetNEntries();
970 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
971 R__ASSERT(fOpenColumnRanges[i].GetPhysicalColumnId() == i);
972 if (!cluster.ContainsColumn(i)) // a cluster may not contain a column if that column is deferred
973 break;
974 const auto &columnRange = cluster.GetColumnRange(i);
975 R__ASSERT(columnRange.GetPhysicalColumnId() == i);
976 // TODO: properly handle suppressed columns (check MarkSuppressedColumnRange())
977 fOpenColumnRanges[i].IncrementFirstElementIndex(columnRange.GetNElements());
978 }
979 fDescriptorBuilder.AddCluster(cluster.Clone());
980 fPrevClusterNEntries += nEntries;
981
982 clusterId = srcDescriptor.FindNextClusterId(clusterId);
983 }
984 }
985
986 // Create model
987 ROOT::RNTupleDescriptor::RCreateModelOptions modelOpts;
988 modelOpts.SetReconstructProjections(true);
989 auto model = descriptor.CreateModel(modelOpts);
990 if (!copyClusters) {
991 auto &projectedFields = GetProjectedFieldsOfModel(*model);
992 projectedFields.GetFieldZero().SetOnDiskId(model->GetConstFieldZero().GetOnDiskId());
993 }
994
995 // Serialize header and init from it
996 fSerializationContext = RNTupleSerializer::SerializeHeader(nullptr, descriptor).Unwrap();
997 auto buffer = MakeUninitArray<unsigned char>(fSerializationContext.GetHeaderSize());
998 fSerializationContext = RNTupleSerializer::SerializeHeader(buffer.get(), descriptor).Unwrap();
999 InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());
1000
1001 fDescriptorBuilder.BeginHeaderExtension();
1002
1003 // mark this sink as initialized
1004 fIsInitialized = true;
1005
1006 return model;
1007}
1008
1009void ROOT::Internal::RPagePersistentSink::CommitSuppressedColumn(ColumnHandle_t columnHandle)
1010{
1011 fOpenColumnRanges.at(columnHandle.fPhysicalId).SetIsSuppressed(true);
1012}
1013
1014void ROOT::Internal::RPagePersistentSink::CommitPage(ColumnHandle_t columnHandle, const RPage &page)
1015{
1016 fOpenColumnRanges.at(columnHandle.fPhysicalId).IncrementNElements(page.GetNElements());
1017
1018 ROOT::RClusterDescriptor::RPageInfo pageInfo;
1019 pageInfo.SetNElements(page.GetNElements());
1020 pageInfo.SetLocator(CommitPageImpl(columnHandle, page));
1021 pageInfo.SetHasChecksum(GetWriteOptions().GetEnablePageChecksums());
1022 fOpenPageRanges.at(columnHandle.fPhysicalId).GetPageInfos().emplace_back(pageInfo);
1023}
1024
1025void ROOT::Internal::RPagePersistentSink::CommitSealedPage(ROOT::DescriptorId_t physicalColumnId,
1026 const RPageStorage::RSealedPage &sealedPage)
1027{
1028 fOpenColumnRanges.at(physicalColumnId).IncrementNElements(sealedPage.GetNElements());
1029
1030 ROOT::RClusterDescriptor::RPageInfo pageInfo;
1031 pageInfo.SetNElements(sealedPage.GetNElements());
1032 pageInfo.SetLocator(CommitSealedPageImpl(physicalColumnId, sealedPage));
1033 pageInfo.SetHasChecksum(sealedPage.GetHasChecksum());
1034 fOpenPageRanges.at(physicalColumnId).GetPageInfos().emplace_back(pageInfo);
1035}
1036
1037std::vector<ROOT::RNTupleLocator>
1038ROOT::Internal::RPagePersistentSink::CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges,
1039 const std::vector<bool> &mask)
1040{
1041 std::vector<ROOT::RNTupleLocator> locators;
1042 locators.reserve(mask.size());
1043 std::size_t i = 0;
1044 for (auto &range : ranges) {
1045 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
1046 if (mask[i++])
1047 locators.push_back(CommitSealedPageImpl(range.fPhysicalColumnId, *sealedPageIt));
1048 }
1049 }
1050 locators.shrink_to_fit();
1051 return locators;
1052}
1053
1054void ROOT::Internal::RPagePersistentSink::CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges)
1055{
1056 /// Used in the `originalPages` map
1057 struct RSealedPageLink {
1058 const RSealedPage *fSealedPage = nullptr; ///< Points to the first occurrence of a page with a specific checksum
1059 std::size_t fLocatorIdx = 0; ///< The index in the locator vector returned by CommitSealedPageVImpl()
1060 };
1061
1062 std::vector<bool> mask;
1063 // For every sealed page, stores the corresponding index in the locator vector returned by CommitSealedPageVImpl()
1064 std::vector<std::size_t> locatorIndexes;
1065 // Maps page checksums to the first sealed page with that checksum
1066 std::unordered_map<std::uint64_t, RSealedPageLink> originalPages;
1067 std::size_t iLocator = 0;
1068 for (auto &range : ranges) {
1069 const auto rangeSize = std::distance(range.fFirst, range.fLast);
1070 mask.reserve(mask.size() + rangeSize);
1071 locatorIndexes.reserve(locatorIndexes.size() + rangeSize);
1072
1073 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
1074 if (!fFeatures.fCanMergePages || !fOptions->GetEnableSamePageMerging()) {
1075 mask.emplace_back(true);
1076 locatorIndexes.emplace_back(iLocator++);
1077 continue;
1078 }
1079 // Same page merging requires page checksums - this is checked in the write options
1080 R__ASSERT(sealedPageIt->GetHasChecksum());
1081
1082 const auto chk = sealedPageIt->GetChecksum().Unwrap();
1083 auto itr = originalPages.find(chk);
1084 if (itr == originalPages.end()) {
1085 originalPages.insert({chk, {&(*sealedPageIt), iLocator}});
1086 mask.emplace_back(true);
1087 locatorIndexes.emplace_back(iLocator++);
1088 continue;
1089 }
1090
1091 const auto *p = itr->second.fSealedPage;
1092 if (sealedPageIt->GetDataSize() != p->GetDataSize() ||
1093 memcmp(sealedPageIt->GetBuffer(), p->GetBuffer(), p->GetDataSize())) {
1094 mask.emplace_back(true);
1095 locatorIndexes.emplace_back(iLocator++);
1096 continue;
1097 }
1098
1099 mask.emplace_back(false);
1100 locatorIndexes.emplace_back(itr->second.fLocatorIdx);
1101 }
1102
1103 mask.shrink_to_fit();
1104 locatorIndexes.shrink_to_fit();
1105 }
1106
1107 auto locators = CommitSealedPageVImpl(ranges, mask);
1108 unsigned i = 0;
1109
1110 for (auto &range : ranges) {
1111 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
1112 fOpenColumnRanges.at(range.fPhysicalColumnId).IncrementNElements(sealedPageIt->GetNElements());
1113
1114 ROOT::RClusterDescriptor::RPageInfo pageInfo;
1115 pageInfo.SetNElements(sealedPageIt->GetNElements());
1116 pageInfo.SetLocator(locators[locatorIndexes[i++]]);
1117 pageInfo.SetHasChecksum(sealedPageIt->GetHasChecksum());
1118 fOpenPageRanges.at(range.fPhysicalColumnId).GetPageInfos().emplace_back(pageInfo);
1119 }
1120 }
1121}
1122
1123ROOT::Internal::RPageSink::RStagedCluster
1124ROOT::Internal::RPagePersistentSink::StageCluster(ROOT::NTupleSize_t nNewEntries)
1125{
1126 RStagedCluster stagedCluster;
1127 stagedCluster.fNBytesWritten = StageClusterImpl();
1128 stagedCluster.fNEntries = nNewEntries;
1129
1130 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
1131 RStagedCluster::RColumnInfo columnInfo;
1132 columnInfo.fCompressionSettings = fOpenColumnRanges[i].GetCompressionSettings().value();
1133 if (fOpenColumnRanges[i].IsSuppressed()) {
1134 assert(fOpenPageRanges[i].GetPageInfos().empty());
1135 columnInfo.fPageRange.SetPhysicalColumnId(i);
1136 columnInfo.fIsSuppressed = true;
1137 // We reset suppressed columns to the state they would have if they were active (not suppressed).
1138 fOpenColumnRanges[i].SetNElements(0);
1139 fOpenColumnRanges[i].SetIsSuppressed(false);
1140 } else {
1141 std::swap(columnInfo.fPageRange, fOpenPageRanges[i]);
1142 fOpenPageRanges[i].SetPhysicalColumnId(i);
1143
1144 columnInfo.fNElements = fOpenColumnRanges[i].GetNElements();
1145 fOpenColumnRanges[i].SetNElements(0);
1146 }
1147 stagedCluster.fColumnInfos.push_back(std::move(columnInfo));
1148 }
1149
1150 return stagedCluster;
1151}
1152
1153void ROOT::Internal::RPagePersistentSink::CommitStagedClusters(std::span<RStagedCluster> clusters)
1154{
1155 for (const auto &cluster : clusters) {
1156 RClusterDescriptorBuilder clusterBuilder;
1157 clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
1158 .FirstEntryIndex(fPrevClusterNEntries)
1159 .NEntries(cluster.fNEntries);
1160 for (const auto &columnInfo : cluster.fColumnInfos) {
1161 const auto colId = columnInfo.fPageRange.GetPhysicalColumnId();
1162 if (columnInfo.fIsSuppressed) {
1163 assert(columnInfo.fPageRange.GetPageInfos().empty());
1164 clusterBuilder.MarkSuppressedColumnRange(colId);
1165 } else {
1166 clusterBuilder.CommitColumnRange(colId, fOpenColumnRanges[colId].GetFirstElementIndex(),
1167 columnInfo.fCompressionSettings, columnInfo.fPageRange);
1168 fOpenColumnRanges[colId].IncrementFirstElementIndex(columnInfo.fNElements);
1169 }
1170 }
1171
1172 clusterBuilder.CommitSuppressedColumnRanges(fDescriptorBuilder.GetDescriptor()).ThrowOnError();
1173 for (const auto &columnInfo : cluster.fColumnInfos) {
1174 if (!columnInfo.fIsSuppressed)
1175 continue;
1176 const auto colId = columnInfo.fPageRange.GetPhysicalColumnId();
1177 // For suppressed columns, we need to reset the first element index to the first element of the next (upcoming)
1178 // cluster. This information has been determined for the committed cluster descriptor through
1179 // CommitSuppressedColumnRanges(), so we can use the information from the descriptor.
1180 const auto &columnRangeFromDesc = clusterBuilder.GetColumnRange(colId);
1181 fOpenColumnRanges[colId].SetFirstElementIndex(columnRangeFromDesc.GetFirstElementIndex() +
1182 columnRangeFromDesc.GetNElements());
1183 }
1184
1185 fDescriptorBuilder.AddCluster(clusterBuilder.MoveDescriptor().Unwrap());
1186 fPrevClusterNEntries += cluster.fNEntries;
1187 }
1188}
1189
1190void ROOT::Internal::RPagePersistentSink::CommitClusterGroup()
1191{
1192 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
1193
1194 const auto nClusters = descriptor.GetNActiveClusters();
1195 std::vector<ROOT::DescriptorId_t> physClusterIDs;
1196 physClusterIDs.reserve(nClusters);
1197 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
1198 physClusterIDs.emplace_back(fSerializationContext.MapClusterId(i));
1199 }
1200
1201 auto szPageList =
1202 RNTupleSerializer::SerializePageList(nullptr, descriptor, physClusterIDs, fSerializationContext).Unwrap();
1203 auto bufPageList = MakeUninitArray<unsigned char>(szPageList);
1204 RNTupleSerializer::SerializePageList(bufPageList.get(), descriptor, physClusterIDs, fSerializationContext);
1205
1206 const auto clusterGroupId = descriptor.GetNClusterGroups();
1207 const auto locator = CommitClusterGroupImpl(bufPageList.get(), szPageList);
1208 RClusterGroupDescriptorBuilder cgBuilder;
1209 cgBuilder.ClusterGroupId(clusterGroupId).PageListLocator(locator).PageListLength(szPageList);
1210 if (fNextClusterInGroup == nClusters) {
1211 cgBuilder.MinEntry(0).EntrySpan(0).NClusters(0);
1212 } else {
1213 const auto &firstClusterDesc = descriptor.GetClusterDescriptor(fNextClusterInGroup);
1214 const auto &lastClusterDesc = descriptor.GetClusterDescriptor(nClusters - 1);
1215 cgBuilder.MinEntry(firstClusterDesc.GetFirstEntryIndex())
1216 .EntrySpan(lastClusterDesc.GetFirstEntryIndex() + lastClusterDesc.GetNEntries() -
1217 firstClusterDesc.GetFirstEntryIndex())
1218 .NClusters(nClusters - fNextClusterInGroup);
1219 }
1220 std::vector<ROOT::DescriptorId_t> clusterIds;
1221 clusterIds.reserve(nClusters);
1222 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
1223 clusterIds.emplace_back(i);
1224 }
1225 cgBuilder.AddSortedClusters(clusterIds);
1226 fDescriptorBuilder.AddClusterGroup(cgBuilder.MoveDescriptor().Unwrap());
1227 fSerializationContext.MapClusterGroupId(clusterGroupId);
1228
1229 fNextClusterInGroup = nClusters;
1230}
1231
1232void ROOT::Internal::RPagePersistentSink::CommitDatasetImpl()
1233{
1234 if (!fStreamerInfos.empty()) {
1235 // De-duplicate extra type infos before writing. Usually we won't have them already in the descriptor, but
1236 // this may happen when we are writing back an already-existing RNTuple, e.g. when doing incremental merging.
1237 for (const auto &etDesc : fDescriptorBuilder.GetDescriptor().GetExtraTypeInfoIterable()) {
1238 if (etDesc.GetContentId() == EExtraTypeInfoIds::kStreamerInfo) {
1239 // The specification mandates that the type name for a kStreamerInfo should be empty and the type version
1240 // should be zero.
1241 R__ASSERT(etDesc.GetTypeName().empty());
1242 R__ASSERT(etDesc.GetTypeVersion() == 0);
1243 auto etInfo = RNTupleSerializer::DeserializeStreamerInfos(etDesc.GetContent()).Unwrap();
1244 fStreamerInfos.merge(etInfo);
1245 }
1246 }
1247
1248 RExtraTypeInfoDescriptorBuilder extraInfoBuilder;
1249 extraInfoBuilder.ContentId(EExtraTypeInfoIds::kStreamerInfo)
1250 .Content(RNTupleSerializer::SerializeStreamerInfos(fStreamerInfos));
1251 fDescriptorBuilder.ReplaceExtraTypeInfo(extraInfoBuilder.MoveDescriptor().Unwrap());
1252 }
1253
1254 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
1255
1256 auto szFooter = RNTupleSerializer::SerializeFooter(nullptr, descriptor, fSerializationContext).Unwrap();
1257 auto bufFooter = MakeUninitArray<unsigned char>(szFooter);
1258 RNTupleSerializer::SerializeFooter(bufFooter.get(), descriptor, fSerializationContext);
1259
1260 CommitDatasetImpl(bufFooter.get(), szFooter);
1261}
1262
1263void ROOT::Internal::RPagePersistentSink::EnableDefaultMetrics(const std::string &prefix)
1264{
1265 fMetrics = RNTupleMetrics(prefix);
1266 fCounters = std::make_unique<RCounters>(RCounters{
1267 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nPageCommitted", "", "number of pages committed to storage"),
1268 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szWritePayload", "B", "volume written for committed pages"),
1269 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szZip", "B", "volume before zipping"),
1270 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallWrite", "ns", "wall clock time spent writing"),
1271 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallZip", "ns", "wall clock time spent compressing"),
1272 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuWrite", "ns", "CPU time spent writing"),
1273 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuZip", "ns",
1274 "CPU time spent compressing")});
1275}