Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageStorage.cxx
Go to the documentation of this file.
1/// \file RPageStorage.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2018-10-04
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RPageStorage.hxx>
18#include <ROOT/RColumn.hxx>
19#include <ROOT/RFieldBase.hxx>
22#include <ROOT/RNTupleModel.hxx>
25#include <ROOT/RPageSinkBuf.hxx>
27#ifdef R__ENABLE_DAOS
29#endif
30
31#include <Compression.h>
32#include <TError.h>
33
34#include <algorithm>
35#include <atomic>
36#include <cassert>
37#include <functional>
38#include <memory>
39#include <string_view>
40#include <unordered_map>
41#include <utility>
42
44 : fMetrics(""), fPageAllocator(std::make_unique<RPageAllocatorHeap>()), fNTupleName(name)
45{
46}
47
49
51{
52 if (!fHasChecksum)
53 return;
54
55 auto charBuf = reinterpret_cast<const unsigned char *>(fBuffer);
56 auto checksumBuf = const_cast<unsigned char *>(charBuf) + GetDataSize();
57 std::uint64_t xxhash3;
58 RNTupleSerializer::SerializeXxHash3(charBuf, GetDataSize(), xxhash3, checksumBuf);
59}
60
63{
64 if (!fHasChecksum)
66
67 auto success = RNTupleSerializer::VerifyXxHash3(reinterpret_cast<const unsigned char *>(fBuffer), GetDataSize());
68 if (!success)
69 return R__FAIL("page checksum verification failed, data corruption detected");
71}
72
74{
75 if (!fHasChecksum)
76 return R__FAIL("invalid attempt to extract non-existing page checksum");
77
78 assert(fBufferSize >= kNBytesPageChecksum);
79 std::uint64_t checksum;
81 reinterpret_cast<const unsigned char *>(fBuffer) + fBufferSize - kNBytesPageChecksum, checksum);
82 return checksum;
83}
84
85//------------------------------------------------------------------------------
86
88 DescriptorId_t physicalColumnId, RColumnElementBase::RIdentifier elementId)
89{
90 auto [itr, _] = fColumnInfos.emplace(physicalColumnId, std::vector<RColumnInfo>());
91 for (auto &columnInfo : itr->second) {
92 if (columnInfo.fElementId == elementId) {
93 columnInfo.fRefCounter++;
94 return;
95 }
96 }
97 itr->second.emplace_back(RColumnInfo{elementId, 1});
98}
99
102{
103 auto itr = fColumnInfos.find(physicalColumnId);
104 R__ASSERT(itr != fColumnInfos.end());
105 for (std::size_t i = 0; i < itr->second.size(); ++i) {
106 if (itr->second[i].fElementId != elementId)
107 continue;
108
109 itr->second[i].fRefCounter--;
110 if (itr->second[i].fRefCounter == 0) {
111 itr->second.erase(itr->second.begin() + i);
112 if (itr->second.empty()) {
113 fColumnInfos.erase(itr);
114 }
115 }
116 break;
117 }
118}
119
122{
124 for (const auto &[physicalColumnId, _] : fColumnInfos)
125 result.insert(physicalColumnId);
126 return result;
127}
128
130{
131 if (fFirstEntry == kInvalidNTupleIndex) {
132 /// Entry range unset, we assume that the entry range covers the complete source
133 return true;
134 }
135
136 if (clusterDesc.GetNEntries() == 0)
137 return true;
138 if ((clusterDesc.GetFirstEntryIndex() + clusterDesc.GetNEntries()) <= fFirstEntry)
139 return false;
140 if (clusterDesc.GetFirstEntryIndex() >= (fFirstEntry + fNEntries))
141 return false;
142 return true;
143}
144
146 : RPageStorage(name), fOptions(options)
147{
148}
149
151
152std::unique_ptr<ROOT::Experimental::Internal::RPageSource>
153ROOT::Experimental::Internal::RPageSource::Create(std::string_view ntupleName, std::string_view location,
154 const RNTupleReadOptions &options)
155{
156 if (ntupleName.empty()) {
157 throw RException(R__FAIL("empty RNTuple name"));
158 }
159 if (location.empty()) {
160 throw RException(R__FAIL("empty storage location"));
161 }
162 if (location.find("daos://") == 0)
163#ifdef R__ENABLE_DAOS
164 return std::make_unique<RPageSourceDaos>(ntupleName, location, options);
165#else
166 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
167#endif
168
169 return std::make_unique<RPageSourceFile>(ntupleName, location, options);
170}
171
174{
176 auto physicalId =
177 GetSharedDescriptorGuard()->FindPhysicalColumnId(fieldId, column.GetIndex(), column.GetRepresentationIndex());
178 R__ASSERT(physicalId != kInvalidDescriptorId);
179 fActivePhysicalColumns.Insert(physicalId, column.GetElement()->GetIdentifier());
180 return ColumnHandle_t{physicalId, &column};
181}
182
184{
185 fActivePhysicalColumns.Erase(columnHandle.fPhysicalId, columnHandle.fColumn->GetElement()->GetIdentifier());
186}
187
189{
190 if ((range.fFirstEntry + range.fNEntries) > GetNEntries()) {
191 throw RException(R__FAIL("invalid entry range"));
192 }
193 fEntryRange = range;
194}
195
197{
198 if (!fHasStructure)
199 LoadStructureImpl();
200 fHasStructure = true;
201}
202
204{
205 LoadStructure();
206 if (!fIsAttached)
207 GetExclDescriptorGuard().MoveIn(AttachImpl());
208 fIsAttached = true;
209}
210
211std::unique_ptr<ROOT::Experimental::Internal::RPageSource> ROOT::Experimental::Internal::RPageSource::Clone() const
212{
213 auto clone = CloneImpl();
214 if (fIsAttached) {
215 clone->GetExclDescriptorGuard().MoveIn(std::move(*GetSharedDescriptorGuard()->Clone()));
216 clone->fHasStructure = true;
217 clone->fIsAttached = true;
218 }
219 return clone;
220}
221
223{
224 return GetSharedDescriptorGuard()->GetNEntries();
225}
226
228{
229 return GetSharedDescriptorGuard()->GetNElements(columnHandle.fPhysicalId);
230}
231
233{
234 if (fTaskScheduler)
235 UnzipClusterImpl(cluster);
236}
237
239{
240 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
241
242 const auto clusterId = cluster->GetId();
243 auto descriptorGuard = GetSharedDescriptorGuard();
244 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
245
246 std::atomic<bool> foundChecksumFailure{false};
247
248 std::vector<std::unique_ptr<RColumnElementBase>> allElements;
249 const auto &columnsInCluster = cluster->GetAvailPhysicalColumns();
250 for (const auto columnId : columnsInCluster) {
251 // By the time we unzip a cluster, the set of active columns may have already changed wrt. to the moment when
252 // we requested reading the cluster. That doesn't matter much, we simply decompress what is now in the list
253 // of active columns.
254 if (!fActivePhysicalColumns.HasColumnInfos(columnId))
255 continue;
256 const auto &columnInfos = fActivePhysicalColumns.GetColumnInfos(columnId);
257
258 for (const auto &info : columnInfos) {
259 allElements.emplace_back(GenerateColumnElement(info.fElementId));
260
261 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
262 std::uint64_t pageNo = 0;
263 std::uint64_t firstInPage = 0;
264 for (const auto &pi : pageRange.fPageInfos) {
265 ROnDiskPage::Key key(columnId, pageNo);
266 auto onDiskPage = cluster->GetOnDiskPage(key);
267 RSealedPage sealedPage;
268 sealedPage.SetNElements(pi.fNElements);
269 sealedPage.SetHasChecksum(pi.fHasChecksum);
270 sealedPage.SetBufferSize(pi.fLocator.fBytesOnStorage + pi.fHasChecksum * kNBytesPageChecksum);
271 sealedPage.SetBuffer(onDiskPage->GetAddress());
272 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
273
274 auto taskFunc = [this, columnId, clusterId, firstInPage, sealedPage, element = allElements.back().get(),
275 &foundChecksumFailure,
276 indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex]() {
277 auto rv = UnsealPage(sealedPage, *element, columnId);
278 if (!rv) {
279 foundChecksumFailure = true;
280 return;
281 }
282 auto newPage = rv.Unwrap();
283 fCounters->fSzUnzip.Add(element->GetSize() * sealedPage.GetNElements());
284
285 newPage.SetWindow(indexOffset + firstInPage, RPage::RClusterInfo(clusterId, indexOffset));
286 fPagePool.PreloadPage(std::move(newPage), element->GetIdentifier().fInMemoryType);
287 };
288
289 fTaskScheduler->AddTask(taskFunc);
290
291 firstInPage += pi.fNElements;
292 pageNo++;
293 } // for all pages in column
294
295 fCounters->fNPageUnsealed.Add(pageNo);
296 } // for all in-memory types of the column
297 } // for all columns in cluster
298
299 fTaskScheduler->Wait();
300
301 if (foundChecksumFailure) {
302 throw RException(R__FAIL("page checksum verification failed, data corruption detected"));
303 }
304}
305
307 const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap,
308 std::function<void(DescriptorId_t, NTupleSize_t, const RClusterDescriptor::RPageRange::RPageInfo &)> perPageFunc)
309{
310 auto descriptorGuard = GetSharedDescriptorGuard();
311 const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterKey.fClusterId);
312
313 for (auto physicalColumnId : clusterKey.fPhysicalColumnSet) {
314 if (clusterDesc.GetColumnRange(physicalColumnId).fIsSuppressed)
315 continue;
316
317 const auto &pageRange = clusterDesc.GetPageRange(physicalColumnId);
318 NTupleSize_t pageNo = 0;
319 for (const auto &pageInfo : pageRange.fPageInfos) {
320 if (pageInfo.fLocator.fType == RNTupleLocator::kTypePageZero) {
321 pageZeroMap.Register(
322 ROnDiskPage::Key{physicalColumnId, pageNo},
323 ROnDiskPage(const_cast<void *>(RPage::GetPageZeroBuffer()), pageInfo.fLocator.fBytesOnStorage));
324 } else {
325 perPageFunc(physicalColumnId, pageNo, pageInfo);
326 }
327 ++pageNo;
328 }
329 }
330}
331
334{
335 const auto columnId = columnHandle.fPhysicalId;
336 const auto columnElementId = columnHandle.fColumn->GetElement()->GetIdentifier();
337 auto cachedPageRef = fPagePool.GetPage(columnId, columnElementId.fInMemoryType, globalIndex);
338 if (!cachedPageRef.Get().IsNull())
339 return cachedPageRef;
340
341 std::uint64_t idxInCluster;
342 RClusterInfo clusterInfo;
343 {
344 auto descriptorGuard = GetSharedDescriptorGuard();
345 clusterInfo.fClusterId = descriptorGuard->FindClusterId(columnId, globalIndex);
346
347 if (clusterInfo.fClusterId == kInvalidDescriptorId)
348 throw RException(R__FAIL("entry with index " + std::to_string(globalIndex) + " out of bounds"));
349
350 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterInfo.fClusterId);
351 const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
352 if (columnRange.fIsSuppressed)
353 return RPageRef();
354
355 clusterInfo.fColumnOffset = columnRange.fFirstElementIndex;
356 R__ASSERT(clusterInfo.fColumnOffset <= globalIndex);
357 idxInCluster = globalIndex - clusterInfo.fColumnOffset;
358 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
359 }
360
362 throw RException(R__FAIL("tried to read a page with an unknown locator"));
363
364 return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
365}
366
369{
370 const auto clusterId = clusterIndex.GetClusterId();
371 const auto idxInCluster = clusterIndex.GetIndex();
372 const auto columnId = columnHandle.fPhysicalId;
373 const auto columnElementId = columnHandle.fColumn->GetElement()->GetIdentifier();
374 auto cachedPageRef = fPagePool.GetPage(columnId, columnElementId.fInMemoryType, clusterIndex);
375 if (!cachedPageRef.Get().IsNull())
376 return cachedPageRef;
377
378 if (clusterId == kInvalidDescriptorId)
379 throw RException(R__FAIL("entry out of bounds"));
380
381 RClusterInfo clusterInfo;
382 {
383 auto descriptorGuard = GetSharedDescriptorGuard();
384 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
385 const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
386 if (columnRange.fIsSuppressed)
387 return RPageRef();
388
389 clusterInfo.fClusterId = clusterId;
390 clusterInfo.fColumnOffset = columnRange.fFirstElementIndex;
391 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
392 }
393
395 throw RException(R__FAIL("tried to read a page with an unknown locator"));
396
397 return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
398}
399
401{
402 fMetrics = Detail::RNTupleMetrics(prefix);
403 fCounters = std::make_unique<RCounters>(RCounters{
404 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nReadV", "", "number of vector read requests"),
405 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nRead", "", "number of byte ranges read"),
406 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szReadPayload", "B",
407 "volume read from storage (required)"),
408 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szReadOverhead", "B",
409 "volume read from storage (overhead)"),
410 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szUnzip", "B", "volume after unzipping"),
411 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nClusterLoaded", "",
412 "number of partial clusters preloaded from storage"),
413 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nPageRead", "", "number of pages read from storage"),
414 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nPageUnsealed", "",
415 "number of pages unzipped and decoded"),
416 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallRead", "ns", "wall clock time spent reading"),
417 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallUnzip", "ns",
418 "wall clock time spent decompressing"),
419 *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuRead", "ns",
420 "CPU time spent reading"),
421 *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuUnzip", "ns",
422 "CPU time spent decompressing"),
423 *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
424 "bwRead", "MB/s", "bandwidth compressed bytes read per second", fMetrics,
425 [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
426 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
427 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
428 if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
429 if (auto walltime = timeWallRead->GetValueAsInt()) {
430 double payload = szReadPayload->GetValueAsInt();
431 double overhead = szReadOverhead->GetValueAsInt();
432 // unit: bytes / nanosecond = GB/s
433 return {true, (1000. * (payload + overhead) / walltime)};
434 }
435 }
436 }
437 }
438 return {false, -1.};
439 }),
440 *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
441 "bwReadUnzip", "MB/s", "bandwidth uncompressed bytes read per second", fMetrics,
442 [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
443 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
444 if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
445 if (auto walltime = timeWallRead->GetValueAsInt()) {
446 double unzip = szUnzip->GetValueAsInt();
447 // unit: bytes / nanosecond = GB/s
448 return {true, 1000. * unzip / walltime};
449 }
450 }
451 }
452 return {false, -1.};
453 }),
454 *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
455 "bwUnzip", "MB/s", "decompression bandwidth of uncompressed bytes per second", fMetrics,
456 [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
457 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
458 if (const auto timeWallUnzip = metrics.GetLocalCounter("timeWallUnzip")) {
459 if (auto walltime = timeWallUnzip->GetValueAsInt()) {
460 double unzip = szUnzip->GetValueAsInt();
461 // unit: bytes / nanosecond = GB/s
462 return {true, 1000. * unzip / walltime};
463 }
464 }
465 }
466 return {false, -1.};
467 }),
468 *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
469 "rtReadEfficiency", "", "ratio of payload over all bytes read", fMetrics,
470 [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
471 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
472 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
473 if (auto payload = szReadPayload->GetValueAsInt()) {
474 // r/(r+o) = 1/((r+o)/r) = 1/(1 + o/r)
475 return {true, 1. / (1. + (1. * szReadOverhead->GetValueAsInt()) / payload)};
476 }
477 }
478 }
479 return {false, -1.};
480 }),
481 *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
482 "rtCompression", "", "ratio of compressed bytes / uncompressed bytes", fMetrics,
483 [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
484 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
485 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
486 if (auto unzip = szUnzip->GetValueAsInt()) {
487 return {true, (1. * szReadPayload->GetValueAsInt()) / unzip};
488 }
489 }
490 }
491 return {false, -1.};
492 })});
493}
494
497 DescriptorId_t physicalColumnId)
498{
499 return UnsealPage(sealedPage, element, physicalColumnId, *fPageAllocator);
500}
501
504 DescriptorId_t physicalColumnId, RPageAllocator &pageAlloc)
505{
506 // Unsealing a page zero is a no-op. `RPageRange::ExtendToFitColumnRange()` guarantees that the page zero buffer is
507 // large enough to hold `sealedPage.fNElements`
508 if (sealedPage.GetBuffer() == RPage::GetPageZeroBuffer()) {
509 auto page = RPage::MakePageZero(physicalColumnId, element.GetSize());
510 page.GrowUnchecked(sealedPage.GetNElements());
511 return page;
512 }
513
514 auto rv = sealedPage.VerifyChecksumIfEnabled();
515 if (!rv)
516 return R__FORWARD_ERROR(rv);
517
518 const auto bytesPacked = element.GetPackedSize(sealedPage.GetNElements());
519 auto page = pageAlloc.NewPage(physicalColumnId, element.GetPackedSize(), sealedPage.GetNElements());
520 if (sealedPage.GetDataSize() != bytesPacked) {
521 RNTupleDecompressor::Unzip(sealedPage.GetBuffer(), sealedPage.GetDataSize(), bytesPacked, page.GetBuffer());
522 } else {
523 // We cannot simply map the sealed page as we don't know its life time. Specialized page sources
524 // may decide to implement to not use UnsealPage but to custom mapping / decompression code.
525 // Note that usually pages are compressed.
526 memcpy(page.GetBuffer(), sealedPage.GetBuffer(), bytesPacked);
527 }
528
529 if (!element.IsMappable()) {
530 auto tmp = pageAlloc.NewPage(physicalColumnId, element.GetSize(), sealedPage.GetNElements());
531 element.Unpack(tmp.GetBuffer(), page.GetBuffer(), sealedPage.GetNElements());
532 page = std::move(tmp);
533 }
534
535 page.GrowUnchecked(sealedPage.GetNElements());
536 return page;
537}
538
539//------------------------------------------------------------------------------
540
542{
543 // Make the sort order unique by adding the physical on-disk column id as a secondary key
544 if (fCurrentPageSize == other.fCurrentPageSize)
545 return fColumn->GetOnDiskId() > other.fColumn->GetOnDiskId();
546 return fCurrentPageSize > other.fCurrentPageSize;
547}
548
550 std::size_t pageSizeLimit)
551{
552 if (fMaxAllocatedBytes - fCurrentAllocatedBytes >= targetAvailableSize)
553 return true;
554
555 auto itr = fColumnsSortedByPageSize.begin();
556 while (itr != fColumnsSortedByPageSize.end()) {
557 if (itr->fCurrentPageSize <= pageSizeLimit)
558 break;
559 if (itr->fCurrentPageSize == itr->fInitialPageSize) {
560 ++itr;
561 continue;
562 }
563
564 // Flushing the current column will invalidate itr
565 auto itrFlush = itr++;
566
567 RColumnInfo next;
568 if (itr != fColumnsSortedByPageSize.end())
569 next = *itr;
570
571 itrFlush->fColumn->Flush();
572 if (fMaxAllocatedBytes - fCurrentAllocatedBytes >= targetAvailableSize)
573 return true;
574
575 if (next.fColumn == nullptr)
576 return false;
577 itr = fColumnsSortedByPageSize.find(next);
578 };
579
580 return false;
581}
582
584{
585 const RColumnInfo key{&column, column.GetWritePageCapacity(), 0};
586 auto itr = fColumnsSortedByPageSize.find(key);
587 if (itr == fColumnsSortedByPageSize.end()) {
588 if (!TryEvict(newWritePageSize, 0))
589 return false;
590 fColumnsSortedByPageSize.insert({&column, newWritePageSize, newWritePageSize});
591 fCurrentAllocatedBytes += newWritePageSize;
592 return true;
593 }
594
595 RColumnInfo elem{*itr};
596 assert(newWritePageSize >= elem.fInitialPageSize);
597
598 if (newWritePageSize == elem.fCurrentPageSize)
599 return true;
600
601 fColumnsSortedByPageSize.erase(itr);
602
603 if (newWritePageSize < elem.fCurrentPageSize) {
604 // Page got smaller
605 fCurrentAllocatedBytes -= elem.fCurrentPageSize - newWritePageSize;
606 elem.fCurrentPageSize = newWritePageSize;
607 fColumnsSortedByPageSize.insert(elem);
608 return true;
609 }
610
611 // Page got larger, we may need to make space available
612 const auto diffBytes = newWritePageSize - elem.fCurrentPageSize;
613 if (!TryEvict(diffBytes, elem.fCurrentPageSize)) {
614 // Don't change anything, let the calling column flush itself
615 // TODO(jblomer): we may consider skipping the column in TryEvict and thus avoiding erase+insert
616 fColumnsSortedByPageSize.insert(elem);
617 return false;
618 }
619 fCurrentAllocatedBytes += diffBytes;
620 elem.fCurrentPageSize = newWritePageSize;
621 fColumnsSortedByPageSize.insert(elem);
622 return true;
623}
624
625//------------------------------------------------------------------------------
626
628 : RPageStorage(name), fOptions(options.Clone()), fWritePageMemoryManager(options.GetPageBufferBudget())
629{
631}
632
634
637{
638 assert(config.fPage);
639 assert(config.fElement);
640 assert(config.fBuffer);
641
642 unsigned char *pageBuf = reinterpret_cast<unsigned char *>(config.fPage->GetBuffer());
643 bool isAdoptedBuffer = true;
644 auto nBytesPacked = config.fPage->GetNBytes();
645 auto nBytesChecksum = config.fWriteChecksum * kNBytesPageChecksum;
646
647 if (!config.fElement->IsMappable()) {
648 nBytesPacked = config.fElement->GetPackedSize(config.fPage->GetNElements());
649 pageBuf = new unsigned char[nBytesPacked];
650 isAdoptedBuffer = false;
651 config.fElement->Pack(pageBuf, config.fPage->GetBuffer(), config.fPage->GetNElements());
652 }
653 auto nBytesZipped = nBytesPacked;
654
655 if ((config.fCompressionSetting != 0) || !config.fElement->IsMappable() || !config.fAllowAlias ||
656 config.fWriteChecksum) {
657 nBytesZipped = RNTupleCompressor::Zip(pageBuf, nBytesPacked, config.fCompressionSetting, config.fBuffer);
658 if (!isAdoptedBuffer)
659 delete[] pageBuf;
660 pageBuf = reinterpret_cast<unsigned char *>(config.fBuffer);
661 isAdoptedBuffer = true;
662 }
663
664 R__ASSERT(isAdoptedBuffer);
665
666 RSealedPage sealedPage{pageBuf, nBytesZipped + nBytesChecksum, config.fPage->GetNElements(), config.fWriteChecksum};
667 sealedPage.ChecksumIfEnabled();
668
669 return sealedPage;
670}
671
674{
675 const auto nBytes = page.GetNBytes() + GetWriteOptions().GetEnablePageChecksums() * kNBytesPageChecksum;
676 if (fSealPageBuffer.size() < nBytes)
677 fSealPageBuffer.resize(nBytes);
678
679 RSealPageConfig config;
680 config.fPage = &page;
681 config.fElement = &element;
682 config.fCompressionSetting = GetWriteOptions().GetCompression();
683 config.fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
684 config.fAllowAlias = true;
685 config.fBuffer = fSealPageBuffer.data();
686
687 return SealPage(config);
688}
689
691{
692 for (const auto &cb : fOnDatasetCommitCallbacks)
693 cb(*this);
694 CommitDatasetImpl();
695}
696
699{
700 R__ASSERT(nElements > 0);
701 const auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
702 const auto nBytes = elementSize * nElements;
703 if (!fWritePageMemoryManager.TryUpdate(*columnHandle.fColumn, nBytes))
704 return RPage();
705 return fPageAllocator->NewPage(columnHandle.fPhysicalId, elementSize, nElements);
706}
707
708//------------------------------------------------------------------------------
709
710std::unique_ptr<ROOT::Experimental::Internal::RPageSink>
711ROOT::Experimental::Internal::RPagePersistentSink::Create(std::string_view ntupleName, std::string_view location,
712 const RNTupleWriteOptions &options)
713{
714 if (ntupleName.empty()) {
715 throw RException(R__FAIL("empty RNTuple name"));
716 }
717 if (location.empty()) {
718 throw RException(R__FAIL("empty storage location"));
719 }
720 if (location.find("daos://") == 0) {
721#ifdef R__ENABLE_DAOS
722 return std::make_unique<RPageSinkDaos>(ntupleName, location, options);
723#else
724 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
725#endif
726 }
727
728 // Otherwise assume that the user wants us to create a file.
729 return std::make_unique<RPageSinkFile>(ntupleName, location, options);
730}
731
733 const RNTupleWriteOptions &options)
734 : RPageSink(name, options)
735{
736}
737
739
742{
743 auto columnId = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns();
744 RColumnDescriptorBuilder columnBuilder;
745 columnBuilder.LogicalColumnId(columnId)
746 .PhysicalColumnId(columnId)
747 .FieldId(fieldId)
749 .ValueRange(column.GetValueRange())
750 .Type(column.GetType())
751 .Index(column.GetIndex())
754 // For late model extension, we assume that the primary column representation is the active one for the
755 // deferred range. All other representations are suppressed.
756 if (column.GetFirstElementIndex() > 0 && column.GetRepresentationIndex() > 0)
757 columnBuilder.SetSuppressedDeferred();
758 fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
759 return ColumnHandle_t{columnId, &column};
760}
761
763 NTupleSize_t firstEntry)
764{
765 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
766
767 if (descriptor.GetNLogicalColumns() > descriptor.GetNPhysicalColumns()) {
768 // If we already have alias columns, add an offset to the alias columns so that the new physical columns
769 // of the changeset follow immediately the already existing physical columns
770 auto getNColumns = [](const RFieldBase &f) -> std::size_t {
771 const auto &reps = f.GetColumnRepresentatives();
772 if (reps.empty())
773 return 0;
774 return reps.size() * reps[0].size();
775 };
776 std::uint32_t nNewPhysicalColumns = 0;
777 for (auto f : changeset.fAddedFields) {
778 nNewPhysicalColumns += getNColumns(*f);
779 for (const auto &descendant : *f)
780 nNewPhysicalColumns += getNColumns(descendant);
781 }
782 fDescriptorBuilder.ShiftAliasColumns(nNewPhysicalColumns);
783 }
784
785 auto addField = [&](RFieldBase &f) {
786 auto fieldId = descriptor.GetNFields();
787 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
788 fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
789 f.SetOnDiskId(fieldId);
790 CallConnectPageSinkOnField(f, *this, firstEntry); // issues in turn calls to `AddColumn()`
791 };
792 auto addProjectedField = [&](RFieldBase &f) {
793 auto fieldId = descriptor.GetNFields();
794 auto sourceFieldId = GetProjectedFieldsOfModel(changeset.fModel).GetSourceField(&f)->GetOnDiskId();
795 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
796 fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
797 fDescriptorBuilder.AddFieldProjection(sourceFieldId, fieldId);
798 f.SetOnDiskId(fieldId);
799 for (const auto &source : descriptor.GetColumnIterable(sourceFieldId)) {
800 auto targetId = descriptor.GetNLogicalColumns();
801 RColumnDescriptorBuilder columnBuilder;
802 columnBuilder.LogicalColumnId(targetId)
803 .PhysicalColumnId(source.GetLogicalId())
804 .FieldId(fieldId)
805 .BitsOnStorage(source.GetBitsOnStorage())
806 .ValueRange(source.GetValueRange())
807 .Type(source.GetType())
808 .Index(source.GetIndex())
809 .RepresentationIndex(source.GetRepresentationIndex());
810 fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
811 }
812 };
813
814 R__ASSERT(firstEntry >= fPrevClusterNEntries);
815 const auto nColumnsBeforeUpdate = descriptor.GetNPhysicalColumns();
816 for (auto f : changeset.fAddedFields) {
817 addField(*f);
818 for (auto &descendant : *f)
819 addField(descendant);
820 }
821 for (auto f : changeset.fAddedProjectedFields) {
822 addProjectedField(*f);
823 for (auto &descendant : *f)
824 addProjectedField(descendant);
825 }
826
827 const auto nColumns = descriptor.GetNPhysicalColumns();
828 for (DescriptorId_t i = nColumnsBeforeUpdate; i < nColumns; ++i) {
830 columnRange.fPhysicalColumnId = i;
831 // We set the first element index in the current cluster to the first element that is part of a materialized page
832 // (i.e., that is part of a page list). For columns created during late model extension, however, the column range
833 // is fixed up as needed by `RClusterDescriptorBuilder::AddExtendedColumnRanges()` on read back.
834 columnRange.fFirstElementIndex = descriptor.GetColumnDescriptor(i).GetFirstElementIndex();
835 columnRange.fNElements = 0;
836 columnRange.fCompressionSettings = GetWriteOptions().GetCompression();
837 fOpenColumnRanges.emplace_back(columnRange);
839 pageRange.fPhysicalColumnId = i;
840 fOpenPageRanges.emplace_back(std::move(pageRange));
841 }
842
843 // Mapping of memory to on-disk column IDs usually happens during serialization of the ntuple header. If the
844 // header was already serialized, this has to be done manually as it is required for page list serialization.
845 if (fSerializationContext.GetHeaderSize() > 0)
846 fSerializationContext.MapSchema(descriptor, /*forHeaderExtension=*/true);
847}
848
850 const RExtraTypeInfoDescriptor &extraTypeInfo)
851{
852 if (extraTypeInfo.GetContentId() != EExtraTypeInfoIds::kStreamerInfo)
853 throw RException(R__FAIL("ROOT bug: unexpected type extra info in UpdateExtraTypeInfo()"));
854
855 fStreamerInfos.merge(RNTupleSerializer::DeserializeStreamerInfos(extraTypeInfo.GetContent()).Unwrap());
856}
857
859{
860 fDescriptorBuilder.SetNTuple(fNTupleName, model.GetDescription());
861 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
862
863 auto &fieldZero = Internal::GetFieldZeroOfModel(model);
864 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(fieldZero).FieldId(0).MakeDescriptor().Unwrap());
865 fieldZero.SetOnDiskId(0);
866 auto &projectedFields = GetProjectedFieldsOfModel(model);
867 projectedFields.GetFieldZero().SetOnDiskId(0);
868
869 RNTupleModelChangeset initialChangeset{model};
870 for (auto f : fieldZero.GetSubFields())
871 initialChangeset.fAddedFields.emplace_back(f);
872 for (auto f : projectedFields.GetFieldZero().GetSubFields())
873 initialChangeset.fAddedProjectedFields.emplace_back(f);
874 UpdateSchema(initialChangeset, 0U);
875
876 fSerializationContext = RNTupleSerializer::SerializeHeader(nullptr, descriptor);
877 auto buffer = std::make_unique<unsigned char[]>(fSerializationContext.GetHeaderSize());
878 fSerializationContext = RNTupleSerializer::SerializeHeader(buffer.get(), descriptor);
879 InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());
880
881 fDescriptorBuilder.BeginHeaderExtension();
882}
883
885{
886 {
887 auto model = descriptor.CreateModel();
888 Init(*model.get());
889 }
890
891 auto clusterId = descriptor.FindClusterId(0, 0);
892
893 while (clusterId != ROOT::Experimental::kInvalidDescriptorId) {
894 auto &cluster = descriptor.GetClusterDescriptor(clusterId);
895 auto nEntries = cluster.GetNEntries();
896
897 RClusterDescriptorBuilder clusterBuilder;
898 clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
899 .FirstEntryIndex(fPrevClusterNEntries)
900 .NEntries(nEntries);
901
902 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
903 R__ASSERT(fOpenColumnRanges[i].fPhysicalColumnId == i);
904 const auto &columnRange = cluster.GetColumnRange(i);
905 R__ASSERT(columnRange.fPhysicalColumnId == i);
906 const auto &pageRange = cluster.GetPageRange(i);
907 R__ASSERT(pageRange.fPhysicalColumnId == i);
908 clusterBuilder.CommitColumnRange(i, fOpenColumnRanges[i].fFirstElementIndex, columnRange.fCompressionSettings,
909 pageRange);
910 fOpenColumnRanges[i].fFirstElementIndex += columnRange.fNElements;
911 }
912 fDescriptorBuilder.AddCluster(clusterBuilder.MoveDescriptor().Unwrap());
913 fPrevClusterNEntries += nEntries;
914
915 clusterId = descriptor.FindNextClusterId(clusterId);
916 }
917}
918
920{
921 fOpenColumnRanges.at(columnHandle.fPhysicalId).fIsSuppressed = true;
922}
923
925{
926 fOpenColumnRanges.at(columnHandle.fPhysicalId).fNElements += page.GetNElements();
927
929 pageInfo.fNElements = page.GetNElements();
930 pageInfo.fLocator = CommitPageImpl(columnHandle, page);
931 pageInfo.fHasChecksum = GetWriteOptions().GetEnablePageChecksums();
932 fOpenPageRanges.at(columnHandle.fPhysicalId).fPageInfos.emplace_back(pageInfo);
933}
934
936 const RPageStorage::RSealedPage &sealedPage)
937{
938 fOpenColumnRanges.at(physicalColumnId).fNElements += sealedPage.GetNElements();
939
941 pageInfo.fNElements = sealedPage.GetNElements();
942 pageInfo.fLocator = CommitSealedPageImpl(physicalColumnId, sealedPage);
943 pageInfo.fHasChecksum = sealedPage.GetHasChecksum();
944 fOpenPageRanges.at(physicalColumnId).fPageInfos.emplace_back(pageInfo);
945}
946
947std::vector<ROOT::Experimental::RNTupleLocator>
949 std::span<RPageStorage::RSealedPageGroup> ranges, const std::vector<bool> &mask)
950{
951 std::vector<ROOT::Experimental::RNTupleLocator> locators;
952 locators.reserve(mask.size());
953 std::size_t i = 0;
954 for (auto &range : ranges) {
955 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
956 if (mask[i++])
957 locators.push_back(CommitSealedPageImpl(range.fPhysicalColumnId, *sealedPageIt));
958 }
959 }
960 locators.shrink_to_fit();
961 return locators;
962}
963
965 std::span<RPageStorage::RSealedPageGroup> ranges)
966{
967 /// Used in the `originalPages` map
968 struct RSealedPageLink {
969 const RSealedPage *fSealedPage = nullptr; ///< Points to the first occurrence of a page with a specific checksum
970 std::size_t fLocatorIdx = 0; ///< The index in the locator vector returned by CommitSealedPageVImpl()
971 };
972
973 std::vector<bool> mask;
974 // For every sealed page, stores the corresponding index in the locator vector returned by CommitSealedPageVImpl()
975 std::vector<std::size_t> locatorIndexes;
976 // Maps page checksums to the first sealed page with that checksum
977 std::unordered_map<std::uint64_t, RSealedPageLink> originalPages;
978 std::size_t iLocator = 0;
979 for (auto &range : ranges) {
980 const auto rangeSize = std::distance(range.fFirst, range.fLast);
981 mask.reserve(mask.size() + rangeSize);
982 locatorIndexes.reserve(locatorIndexes.size() + rangeSize);
983
984 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
985 if (!fFeatures.fCanMergePages || !sealedPageIt->GetHasChecksum()) {
986 mask.emplace_back(true);
987 locatorIndexes.emplace_back(iLocator++);
988 continue;
989 }
990
991 const auto chk = sealedPageIt->GetChecksum().Unwrap();
992 auto itr = originalPages.find(chk);
993 if (itr == originalPages.end()) {
994 originalPages.insert({chk, {&(*sealedPageIt), iLocator}});
995 mask.emplace_back(true);
996 locatorIndexes.emplace_back(iLocator++);
997 continue;
998 }
999
1000 const auto *p = itr->second.fSealedPage;
1001 if (sealedPageIt->GetDataSize() != p->GetDataSize() ||
1002 memcmp(sealedPageIt->GetBuffer(), p->GetBuffer(), p->GetDataSize())) {
1003 mask.emplace_back(true);
1004 locatorIndexes.emplace_back(iLocator++);
1005 continue;
1006 }
1007
1008 mask.emplace_back(false);
1009 locatorIndexes.emplace_back(itr->second.fLocatorIdx);
1010 }
1011
1012 mask.shrink_to_fit();
1013 locatorIndexes.shrink_to_fit();
1014 }
1015
1016 auto locators = CommitSealedPageVImpl(ranges, mask);
1017 unsigned i = 0;
1018
1019 for (auto &range : ranges) {
1020 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
1021 fOpenColumnRanges.at(range.fPhysicalColumnId).fNElements += sealedPageIt->GetNElements();
1022
1024 pageInfo.fNElements = sealedPageIt->GetNElements();
1025 pageInfo.fLocator = locators[locatorIndexes[i++]];
1026 pageInfo.fHasChecksum = sealedPageIt->GetHasChecksum();
1027 fOpenPageRanges.at(range.fPhysicalColumnId).fPageInfos.emplace_back(pageInfo);
1028 }
1029 }
1030}
1031
1034{
1035 RStagedCluster stagedCluster;
1036 stagedCluster.fNBytesWritten = StageClusterImpl();
1037 stagedCluster.fNEntries = nNewEntries;
1038
1039 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
1040 RStagedCluster::RColumnInfo columnInfo;
1041 if (fOpenColumnRanges[i].fIsSuppressed) {
1042 assert(fOpenPageRanges[i].fPageInfos.empty());
1043 columnInfo.fPageRange.fPhysicalColumnId = i;
1044 columnInfo.fIsSuppressed = true;
1045 // We reset suppressed columns to the state they would have if they were active (not suppressed).
1046 fOpenColumnRanges[i].fNElements = 0;
1047 fOpenColumnRanges[i].fIsSuppressed = false;
1048 } else {
1049 std::swap(columnInfo.fPageRange, fOpenPageRanges[i]);
1050 fOpenPageRanges[i].fPhysicalColumnId = i;
1051
1052 columnInfo.fNElements = fOpenColumnRanges[i].fNElements;
1053 fOpenColumnRanges[i].fNElements = 0;
1054 }
1055 stagedCluster.fColumnInfos.push_back(std::move(columnInfo));
1056 }
1057
1058 return stagedCluster;
1059}
1060
1062{
1063 for (const auto &cluster : clusters) {
1064 RClusterDescriptorBuilder clusterBuilder;
1065 clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
1066 .FirstEntryIndex(fPrevClusterNEntries)
1067 .NEntries(cluster.fNEntries);
1068 for (const auto &columnInfo : cluster.fColumnInfos) {
1069 DescriptorId_t colId = columnInfo.fPageRange.fPhysicalColumnId;
1070 if (columnInfo.fIsSuppressed) {
1071 assert(columnInfo.fPageRange.fPageInfos.empty());
1072 clusterBuilder.MarkSuppressedColumnRange(colId);
1073 } else {
1074 clusterBuilder.CommitColumnRange(colId, fOpenColumnRanges[colId].fFirstElementIndex,
1075 fOpenColumnRanges[colId].fCompressionSettings, columnInfo.fPageRange);
1076 fOpenColumnRanges[colId].fFirstElementIndex += columnInfo.fNElements;
1077 }
1078 }
1079
1080 clusterBuilder.CommitSuppressedColumnRanges(fDescriptorBuilder.GetDescriptor()).ThrowOnError();
1081 for (const auto &columnInfo : cluster.fColumnInfos) {
1082 if (!columnInfo.fIsSuppressed)
1083 continue;
1084 DescriptorId_t colId = columnInfo.fPageRange.fPhysicalColumnId;
1085 // For suppressed columns, we need to reset the first element index to the first element of the next (upcoming)
1086 // cluster. This information has been determined for the committed cluster descriptor through
1087 // CommitSuppressedColumnRanges(), so we can use the information from the descriptor.
1088 const auto &columnRangeFromDesc = clusterBuilder.GetColumnRange(colId);
1089 fOpenColumnRanges[colId].fFirstElementIndex =
1090 columnRangeFromDesc.fFirstElementIndex + columnRangeFromDesc.fNElements;
1091 }
1092
1093 fDescriptorBuilder.AddCluster(clusterBuilder.MoveDescriptor().Unwrap());
1094 fPrevClusterNEntries += cluster.fNEntries;
1095 }
1096}
1097
1099{
1100 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
1101
1102 const auto nClusters = descriptor.GetNActiveClusters();
1103 std::vector<DescriptorId_t> physClusterIDs;
1104 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
1105 physClusterIDs.emplace_back(fSerializationContext.MapClusterId(i));
1106 }
1107
1108 auto szPageList = RNTupleSerializer::SerializePageList(nullptr, descriptor, physClusterIDs, fSerializationContext);
1109 auto bufPageList = std::make_unique<unsigned char[]>(szPageList);
1110 RNTupleSerializer::SerializePageList(bufPageList.get(), descriptor, physClusterIDs, fSerializationContext);
1111
1112 const auto clusterGroupId = descriptor.GetNClusterGroups();
1113 const auto locator = CommitClusterGroupImpl(bufPageList.get(), szPageList);
1115 cgBuilder.ClusterGroupId(clusterGroupId).PageListLocator(locator).PageListLength(szPageList);
1116 if (fNextClusterInGroup == nClusters) {
1117 cgBuilder.MinEntry(0).EntrySpan(0).NClusters(0);
1118 } else {
1119 const auto &firstClusterDesc = descriptor.GetClusterDescriptor(fNextClusterInGroup);
1120 const auto &lastClusterDesc = descriptor.GetClusterDescriptor(nClusters - 1);
1121 cgBuilder.MinEntry(firstClusterDesc.GetFirstEntryIndex())
1122 .EntrySpan(lastClusterDesc.GetFirstEntryIndex() + lastClusterDesc.GetNEntries() -
1123 firstClusterDesc.GetFirstEntryIndex())
1124 .NClusters(nClusters - fNextClusterInGroup);
1125 }
1126 std::vector<DescriptorId_t> clusterIds;
1127 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
1128 clusterIds.emplace_back(i);
1129 }
1130 cgBuilder.AddClusters(clusterIds);
1131 fDescriptorBuilder.AddClusterGroup(cgBuilder.MoveDescriptor().Unwrap());
1132 fSerializationContext.MapClusterGroupId(clusterGroupId);
1133
1134 fNextClusterInGroup = nClusters;
1135}
1136
1138{
1139 if (!fStreamerInfos.empty()) {
1140 RExtraTypeInfoDescriptorBuilder extraInfoBuilder;
1143 fDescriptorBuilder.AddExtraTypeInfo(extraInfoBuilder.MoveDescriptor().Unwrap());
1144 }
1145
1146 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
1147
1148 auto szFooter = RNTupleSerializer::SerializeFooter(nullptr, descriptor, fSerializationContext);
1149 auto bufFooter = std::make_unique<unsigned char[]>(szFooter);
1150 RNTupleSerializer::SerializeFooter(bufFooter.get(), descriptor, fSerializationContext);
1151
1152 CommitDatasetImpl(bufFooter.get(), szFooter);
1153}
1154
1156{
1157 fMetrics = Detail::RNTupleMetrics(prefix);
1158 fCounters = std::make_unique<RCounters>(RCounters{
1159 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nPageCommitted", "",
1160 "number of pages committed to storage"),
1161 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szWritePayload", "B",
1162 "volume written for committed pages"),
1163 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szZip", "B", "volume before zipping"),
1164 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallWrite", "ns", "wall clock time spent writing"),
1165 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallZip", "ns", "wall clock time spent compressing"),
1166 *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuWrite", "ns",
1167 "CPU time spent writing"),
1168 *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuZip", "ns",
1169 "CPU time spent compressing")});
1170}
fBuffer
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking)
Definition RError.hxx:294
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:290
#define f(i)
Definition RSha256.hxx:104
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
winID h TVirtualViewer3D TVirtualGLPainter p
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t mask
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
char name[80]
Definition TGX11.cxx:110
#define _(A, B)
Definition cfortran.h:108
A thread-safe integral performance counter.
A metric element that computes its floating point value from other counters.
std::int64_t GetValueAsInt() const override
A collection of Counter objects with a name, a unit, and a description.
const RNTuplePerfCounter * GetLocalCounter(std::string_view name) const
Searches counters registered in this object only. Returns nullptr if name is not found.
An either thread-safe or non thread safe counter for CPU ticks.
Record wall time and CPU time between construction and destruction.
A helper class for piece-wise construction of an RClusterDescriptor.
RResult< RClusterDescriptor > MoveDescriptor()
Move out the full cluster descriptor including page locations.
RClusterDescriptorBuilder & ClusterId(DescriptorId_t clusterId)
RClusterDescriptorBuilder & NEntries(std::uint64_t nEntries)
RClusterDescriptorBuilder & FirstEntryIndex(std::uint64_t firstEntryIndex)
const RClusterDescriptor::RColumnRange & GetColumnRange(DescriptorId_t physicalId)
RResult< void > MarkSuppressedColumnRange(DescriptorId_t physicalId)
Books the given column ID as being suppressed in this cluster.
RResult< void > CommitColumnRange(DescriptorId_t physicalId, std::uint64_t firstElementIndex, std::uint32_t compressionSettings, const RClusterDescriptor::RPageRange &pageRange)
RResult< void > CommitSuppressedColumnRanges(const RNTupleDescriptor &desc)
Sets the first element index and number of elements for all the suppressed column ranges.
A helper class for piece-wise construction of an RClusterGroupDescriptor.
RClusterGroupDescriptorBuilder & PageListLocator(const RNTupleLocator &pageListLocator)
void AddClusters(const std::vector< DescriptorId_t > &clusterIds)
RClusterGroupDescriptorBuilder & MinEntry(std::uint64_t minEntry)
RClusterGroupDescriptorBuilder & ClusterGroupId(DescriptorId_t clusterGroupId)
RClusterGroupDescriptorBuilder & EntrySpan(std::uint64_t entrySpan)
RClusterGroupDescriptorBuilder & NClusters(std::uint32_t nClusters)
RClusterGroupDescriptorBuilder & PageListLength(std::uint64_t pageListLength)
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:152
const ColumnSet_t & GetAvailPhysicalColumns() const
Definition RCluster.hxx:196
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
Definition RCluster.cxx:32
std::unordered_set< DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:154
A helper class for piece-wise construction of an RColumnDescriptor.
RColumnDescriptorBuilder & PhysicalColumnId(DescriptorId_t physicalColumnId)
RColumnDescriptorBuilder & Type(EColumnType type)
RColumnDescriptorBuilder & BitsOnStorage(std::uint16_t bitsOnStorage)
RColumnDescriptorBuilder & RepresentationIndex(std::uint16_t representationIndex)
RColumnDescriptorBuilder & FieldId(DescriptorId_t fieldId)
RColumnDescriptorBuilder & Index(std::uint32_t index)
RColumnDescriptorBuilder & FirstElementIndex(std::uint64_t firstElementIdx)
RResult< RColumnDescriptor > MakeDescriptor() const
Attempt to make a column descriptor.
RColumnDescriptorBuilder & LogicalColumnId(DescriptorId_t logicalColumnId)
RColumnDescriptorBuilder & ValueRange(double min, double max)
A column element encapsulates the translation between basic C++ types and their column representation...
virtual RIdentifier GetIdentifier() const =0
virtual bool IsMappable() const
Derived, typed classes tell whether the on-storage layout is bitwise identical to the memory layout.
virtual void Pack(void *destination, const void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, packing creates an on-disk page from an in-...
virtual void Unpack(void *destination, const void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, unpacking creates a memory page from an on-...
std::size_t GetPackedSize(std::size_t nElements=1U) const
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
Definition RColumn.hxx:40
RColumnElementBase * GetElement() const
Definition RColumn.hxx:329
DescriptorId_t GetOnDiskId() const
Definition RColumn.hxx:343
std::optional< std::pair< double, double > > GetValueRange() const
Definition RColumn.hxx:336
std::uint16_t GetRepresentationIndex() const
Definition RColumn.hxx:342
std::size_t GetWritePageCapacity() const
Definition RColumn.hxx:351
NTupleSize_t GetFirstElementIndex() const
Definition RColumn.hxx:344
std::uint16_t GetBitsOnStorage() const
Definition RColumn.hxx:331
A helper class for piece-wise construction of an RExtraTypeInfoDescriptor.
RExtraTypeInfoDescriptorBuilder & Content(const std::string &content)
RExtraTypeInfoDescriptorBuilder & ContentId(EExtraTypeInfoIds contentId)
static RFieldDescriptorBuilder FromField(const RFieldBase &field)
Make a new RFieldDescriptorBuilder based off a live NTuple field.
RResult< RFieldDescriptor > MakeDescriptor() const
Attempt to make a field descriptor.
RFieldDescriptorBuilder & FieldId(DescriptorId_t fieldId)
size_t Zip(const void *from, size_t nbytes, int compression, Writer_t fnWriter)
Returns the size of the compressed data.
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size of the from buffer.
static std::uint32_t SerializeXxHash3(const unsigned char *data, std::uint64_t length, std::uint64_t &xxhash3, void *buffer)
Writes a XxHash-3 64bit checksum of the byte range given by data and length.
static RContext SerializeHeader(void *buffer, const RNTupleDescriptor &desc)
static std::string SerializeStreamerInfos(const StreamerInfoMap_t &infos)
static std::uint32_t SerializePageList(void *buffer, const RNTupleDescriptor &desc, std::span< DescriptorId_t > physClusterIDs, const RContext &context)
static RResult< StreamerInfoMap_t > DeserializeStreamerInfos(const std::string &extraTypeInfoContent)
static RResult< void > VerifyXxHash3(const unsigned char *data, std::uint64_t length, std::uint64_t &xxhash3)
Expects an xxhash3 checksum in the 8 bytes following data + length and verifies it.
static std::uint32_t DeserializeUInt64(const void *buffer, std::uint64_t &val)
static std::uint32_t SerializeFooter(void *buffer, const RNTupleDescriptor &desc, const RContext &context)
A memory region that contains packed and compressed pages.
Definition RCluster.hxx:103
void Register(const ROnDiskPage::Key &key, const ROnDiskPage &onDiskPage)
Inserts information about a page stored in fMemory.
Definition RCluster.hxx:120
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:42
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
virtual RPage NewPage(ColumnId_t columnId, std::size_t elementSize, std::size_t nElements)=0
Reserves memory large enough to hold nElements of the given size.
ColumnHandle_t AddColumn(DescriptorId_t fieldId, RColumn &column) final
Register a new column.
RStagedCluster StageCluster(NTupleSize_t nNewEntries) final
Stage the current cluster and create a new one for the following data.
virtual void InitImpl(unsigned char *serializedHeader, std::uint32_t length)=0
virtual std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges, const std::vector< bool > &mask)
Vector commit of preprocessed pages.
void InitFromDescriptor(const RNTupleDescriptor &descriptor)
Initialize sink based on an existing descriptor and fill into the descriptor builder.
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final
Write a page to the storage. The column must have been added before.
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const RNTupleWriteOptions &options=RNTupleWriteOptions())
Guess the concrete derived page source from the location.
RPagePersistentSink(std::string_view ntupleName, const RNTupleWriteOptions &options)
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
void CommitSealedPage(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
void CommitSuppressedColumn(ColumnHandle_t columnHandle) final
Commits a suppressed column for the current cluster.
void CommitStagedClusters(std::span< RStagedCluster > clusters) final
Commit staged clusters, logically appending them to the ntuple descriptor.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo) final
Adds an extra type information record to schema.
Reference to a page stored in the page pool.
Definition RPagePool.hxx:93
Abstract interface to write data into an ntuple.
RPageSink(std::string_view ntupleName, const RNTupleWriteOptions &options)
void CommitDataset()
Run the registered callbacks and finalize the current cluster and the entire data set.
virtual RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements)
Get a new, empty page for the given column that can be filled with up to nElements; nElements must be...
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element)
Helper for streaming a page.
void Insert(DescriptorId_t physicalColumnId, RColumnElementBase::RIdentifier elementId)
void Erase(DescriptorId_t physicalColumnId, RColumnElementBase::RIdentifier elementId)
ColumnHandle_t AddColumn(DescriptorId_t fieldId, RColumn &column) override
Register a new column.
void LoadStructure()
Loads header and footer without decompressing or deserializing them.
RPageSource(std::string_view ntupleName, const RNTupleReadOptions &fOptions)
void PrepareLoadCluster(const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap, std::function< void(DescriptorId_t, NTupleSize_t, const RClusterDescriptor::RPageRange::RPageInfo &)> perPageFunc)
Prepare a page range read for the column set in clusterKey.
std::unique_ptr< RPageSource > Clone() const
Open the same storage multiple time, e.g.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
void DropColumn(ColumnHandle_t columnHandle) override
Unregisters a column.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const RNTupleReadOptions &options=RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
NTupleSize_t GetNElements(ColumnHandle_t columnHandle)
void UnzipCluster(RCluster *cluster)
Parallel decompression and unpacking of the pages in the given cluster.
virtual RPageRef LoadPage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex)
Allocates and fills a page that contains the index-th element.
void SetEntryRange(const REntryRange &range)
Promise to only read from the given entry range.
virtual void UnzipClusterImpl(RCluster *cluster)
void Attach()
Open the physical storage container and deserialize header and footer.
static RResult< RPage > UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element, DescriptorId_t physicalColumnId, RPageAllocator &pageAlloc)
Helper for unstreaming a page.
Common functionality of an ntuple storage for both reading and writing.
Stores information about the cluster in which this page resides.
Definition RPage.hxx:55
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:46
std::size_t GetNBytes() const
The space taken by column elements in the buffer.
Definition RPage.hxx:122
static RPage MakePageZero(ColumnId_t columnId, ClusterSize_t::ValueType elementSize)
Make a 'zero' page for column columnId (that is comprised of 0x00 bytes only).
Definition RPage.hxx:174
std::uint32_t GetNElements() const
Definition RPage.hxx:131
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Definition RPage.cxx:25
const RFieldBase * GetSourceField(const RFieldBase *target) const
bool TryUpdate(RColumn &column, std::size_t newWritePageSize)
Try to register the new write page size for the given column.
bool TryEvict(std::size_t targetAvailableSize, std::size_t pageSizeLimit)
Flush columns in order of allocated write page size until the sum of all write page allocations leave...
Records the partition of data into pages for a particular column in a particular cluster.
Meta-data for a set of ntuple clusters.
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
DescriptorId_t GetClusterId() const
ClusterSize_t::ValueType GetIndex() const
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
Field specific extra type information from the header / extension header.
A field translates read and write calls from/to underlying columns to/from tree values.
DescriptorId_t GetOnDiskId() const
The on-storage meta-data of an ntuple.
std::unique_ptr< RNTupleModel > CreateModel(const RCreateModelOptions &options=RCreateModelOptions()) const
Re-create the C++ model from the stored meta-data.
DescriptorId_t FindNextClusterId(DescriptorId_t clusterId) const
DescriptorId_t FindClusterId(DescriptorId_t physicalColumnId, NTupleSize_t index) const
const RClusterDescriptor & GetClusterDescriptor(DescriptorId_t clusterId) const
The RNTupleModel encapsulates the schema of an ntuple.
const std::string & GetDescription() const
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
Definition RError.hxx:281
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition RError.hxx:194
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, EColumnType onDiskType)
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
RResult< void > EnsureValidNameForRNTuple(std::string_view name, std::string_view where)
Check whether a given string is a valid name according to the RNTuple specification.
void CallConnectPageSinkOnField(RFieldBase &, RPageSink &, NTupleSize_t firstEntry=0)
Definition RField.cxx:406
RFieldZero & GetFieldZeroOfModel(RNTupleModel &model)
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr NTupleSize_t kInvalidNTupleIndex
constexpr DescriptorId_t kInvalidDescriptorId
The identifiers that specifies the content of a (partial) cluster.
Definition RCluster.hxx:156
Every concrete RColumnElement type is identified by its on-disk type (column type) and the in-memory ...
The incremental changes to a RNTupleModel
std::vector< RFieldBase * > fAddedProjectedFields
Points to the projected fields in fModel that were added as part of an updater transaction.
std::vector< RFieldBase * > fAddedFields
Points to the fields in fModel that were added as part of an updater transaction.
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:52
Default I/O performance counters that get registered in fMetrics.
const RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
bool fAllowAlias
If false, the output buffer must not point to the input page buffer, which would otherwise be an opti...
int fCompressionSetting
Compression algorithm and level to apply.
bool fWriteChecksum
Adds a 8 byte little-endian xxhash3 checksum to the page payload.
Cluster that was staged, but not yet logically appended to the RNTuple.
Summarizes cluster-level information that are necessary to load a certain page.
std::uint64_t fColumnOffset
The first element number of the page's column in the given cluster.
RClusterDescriptor::RPageRange::RPageInfoExtended fPageInfo
Location of the page on disk.
Default I/O performance counters that get registered in fMetrics
bool IntersectsWith(const RClusterDescriptor &clusterDesc) const
Returns true if the given cluster has entries within the entry range.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RResult< std::uint64_t > GetChecksum() const
Returns a failure if the sealed page has no checksum.
The window of element indexes of a particular column in a particular cluster.
NTupleSize_t fFirstElementIndex
The global index of the first column element in the cluster.
int fCompressionSettings
The usual format for ROOT compression settings (see Compression.h).
ClusterSize_t fNElements
The number of column elements in the cluster.
We do not need to store the element size / uncompressed page size because we know to which column the...
std::uint32_t fNElements
The sum of the elements of all the pages must match the corresponding fNElements field in fColumnRang...
bool fHasChecksum
If true, the 8 bytes following the serialized page are an xxhash of the on-disk page data.
RNTupleLocator fLocator
The meaning of fLocator depends on the storage backend.
ELocatorType fType
For non-disk locators, the value for the Type field.