/// \file RPageStorage.cxx
/// \ingroup NTuple ROOT7
/// \author Jakob Blomer <jblomer@cern.ch>
/// \date 2018-10-04
/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
/// is welcome!

/*************************************************************************
 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#include <ROOT/RPageStorage.hxx>
#include <ROOT/RCluster.hxx>
#include <ROOT/RColumn.hxx>
#include <ROOT/RFieldBase.hxx>
#include <ROOT/RNTupleDescriptor.hxx>
#include <ROOT/RNTupleMetrics.hxx>
#include <ROOT/RNTupleModel.hxx>
#include <ROOT/RNTupleSerialize.hxx>
#include <ROOT/RNTupleZip.hxx>
#include <ROOT/RPageSinkBuf.hxx>
#include <ROOT/RPageStorageFile.hxx>
#ifdef R__ENABLE_DAOS
#include <ROOT/RPageStorageDaos.hxx>
#endif

#include <Compression.h>
#include <TError.h>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstring>
#include <functional>
#include <memory>
#include <string_view>
#include <unordered_map>
#include <utility>

ROOT::Experimental::Internal::RPageStorage::RPageStorage(std::string_view name)
   : fMetrics(""), fPageAllocator(std::make_unique<RPageAllocatorHeap>()), fNTupleName(name)
{
}

ROOT::Experimental::Internal::RPageStorage::~RPageStorage() {}

void ROOT::Experimental::Internal::RPageStorage::RSealedPage::ChecksumIfEnabled()
{
   if (!fHasChecksum)
      return;

   auto charBuf = reinterpret_cast<const unsigned char *>(fBuffer);
   auto checksumBuf = const_cast<unsigned char *>(charBuf) + GetDataSize();
   std::uint64_t xxhash3;
   RNTupleSerializer::SerializeXxHash3(charBuf, GetDataSize(), xxhash3, checksumBuf);
}

ROOT::Experimental::RResult<void>
ROOT::Experimental::Internal::RPageStorage::RSealedPage::VerifyChecksumIfEnabled() const
{
   if (!fHasChecksum)
      return RResult<void>::Success();

   auto success = RNTupleSerializer::VerifyXxHash3(reinterpret_cast<const unsigned char *>(fBuffer), GetDataSize());
   if (!success)
      return R__FAIL("page checksum verification failed, data corruption detected");
   return RResult<void>::Success();
}

ROOT::Experimental::RResult<std::uint64_t>
ROOT::Experimental::Internal::RPageStorage::RSealedPage::GetChecksum() const
{
   if (!fHasChecksum)
      return R__FAIL("invalid attempt to extract non-existing page checksum");

   assert(fBufferSize >= kNBytesPageChecksum);
   std::uint64_t checksum;
   RNTupleSerializer::DeserializeUInt64(
      reinterpret_cast<const unsigned char *>(fBuffer) + fBufferSize - kNBytesPageChecksum, checksum);
   return checksum;
}
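
// Editor's note (not part of the original source): a sealed page buffer is laid
// out as [ packed & compressed payload | optional 8-byte little-endian xxhash3 ],
// which is why ChecksumIfEnabled() writes at fBuffer + GetDataSize() and
// GetChecksum() reads the last kNBytesPageChecksum bytes of the buffer.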

//------------------------------------------------------------------------------

void ROOT::Experimental::Internal::RPageSource::RActivePhysicalColumns::Insert(
   DescriptorId_t physicalColumnId, RColumnElementBase::RIdentifier elementId)
{
   auto [itr, _] = fColumnInfos.emplace(physicalColumnId, std::vector<RColumnInfo>());
   for (auto &columnInfo : itr->second) {
      if (columnInfo.fElementId == elementId) {
         columnInfo.fRefCounter++;
         return;
      }
   }
   itr->second.emplace_back(RColumnInfo{elementId, 1});
}

void ROOT::Experimental::Internal::RPageSource::RActivePhysicalColumns::Erase(
   DescriptorId_t physicalColumnId, RColumnElementBase::RIdentifier elementId)
{
   auto itr = fColumnInfos.find(physicalColumnId);
   R__ASSERT(itr != fColumnInfos.end());
   for (std::size_t i = 0; i < itr->second.size(); ++i) {
      if (itr->second[i].fElementId != elementId)
         continue;

      itr->second[i].fRefCounter--;
      if (itr->second[i].fRefCounter == 0) {
         itr->second.erase(itr->second.begin() + i);
         if (itr->second.empty()) {
            fColumnInfos.erase(itr);
         }
      }
      break;
   }
}

ROOT::Experimental::Internal::RCluster::ColumnSet_t
ROOT::Experimental::Internal::RPageSource::RActivePhysicalColumns::ToColumnSet() const
{
   RCluster::ColumnSet_t result;
   for (const auto &[physicalColumnId, _] : fColumnInfos)
      result.insert(physicalColumnId);
   return result;
}
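
// Editor's note: the same physical column can be activated several times,
// possibly with different in-memory representations (element identifiers).
// Insert() therefore keeps one reference counter per (column, element id)
// pair, and Erase() drops the column only once its last user is gone.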

bool ROOT::Experimental::Internal::RPageSource::REntryRange::IntersectsWith(const RClusterDescriptor &clusterDesc) const
{
   if (fFirstEntry == kInvalidNTupleIndex) {
      // Entry range unset, we assume that the entry range covers the complete source
      return true;
   }

   if (clusterDesc.GetNEntries() == 0)
      return true;
   if ((clusterDesc.GetFirstEntryIndex() + clusterDesc.GetNEntries()) <= fFirstEntry)
      return false;
   if (clusterDesc.GetFirstEntryIndex() >= (fFirstEntry + fNEntries))
      return false;
   return true;
}
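
// Worked example (editor's addition): with fFirstEntry = 100 and fNEntries = 50
// the range covers entries [100, 150). A cluster holding entries [80, 100)
// fails the first check (80 + 20 <= 100) and does not intersect; a cluster
// starting at entry 149 intersects, one starting at 150 does not.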

ROOT::Experimental::Internal::RPageSource::RPageSource(std::string_view name, const RNTupleReadOptions &options)
   : RPageStorage(name), fOptions(options)
{
}

ROOT::Experimental::Internal::RPageSource::~RPageSource() {}

std::unique_ptr<ROOT::Experimental::Internal::RPageSource>
ROOT::Experimental::Internal::RPageSource::Create(std::string_view ntupleName, std::string_view location,
                                                  const RNTupleReadOptions &options)
{
   if (ntupleName.empty()) {
      throw RException(R__FAIL("empty RNTuple name"));
   }
   if (location.empty()) {
      throw RException(R__FAIL("empty storage location"));
   }
   if (location.find("daos://") == 0)
#ifdef R__ENABLE_DAOS
      return std::make_unique<RPageSourceDaos>(ntupleName, location, options);
#else
      throw RException(R__FAIL("This RNTuple build does not support DAOS."));
#endif

   return std::make_unique<RPageSourceFile>(ntupleName, location, options);
}
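
// Usage sketch (editor's addition; the ntuple and file names are hypothetical):
//   auto source = RPageSource::Create("ntpl", "data.root");
//   source->Attach(); // deserializes header and footer
// Create() only chooses the concrete backend from the location prefix; the
// meta-data are read on LoadStructure()/Attach().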

ROOT::Experimental::Internal::RPageStorage::ColumnHandle_t
ROOT::Experimental::Internal::RPageSource::AddColumn(DescriptorId_t fieldId, RColumn &column)
{
   R__ASSERT(fieldId != kInvalidDescriptorId);
   auto physicalId =
      GetSharedDescriptorGuard()->FindPhysicalColumnId(fieldId, column.GetIndex(), column.GetRepresentationIndex());
   R__ASSERT(physicalId != kInvalidDescriptorId);
   fActivePhysicalColumns.Insert(physicalId, column.GetElement()->GetIdentifier());
   return ColumnHandle_t{physicalId, &column};
}

void ROOT::Experimental::Internal::RPageSource::DropColumn(ColumnHandle_t columnHandle)
{
   fActivePhysicalColumns.Erase(columnHandle.fPhysicalId, columnHandle.fColumn->GetElement()->GetIdentifier());
}

void ROOT::Experimental::Internal::RPageSource::SetEntryRange(const REntryRange &range)
{
   if ((range.fFirstEntry + range.fNEntries) > GetNEntries()) {
      throw RException(R__FAIL("invalid entry range"));
   }
   fEntryRange = range;
}

void ROOT::Experimental::Internal::RPageSource::LoadStructure()
{
   if (!fHasStructure)
      LoadStructureImpl();
   fHasStructure = true;
}

void ROOT::Experimental::Internal::RPageSource::Attach()
{
   LoadStructure();
   if (!fIsAttached)
      GetExclDescriptorGuard().MoveIn(AttachImpl());
   fIsAttached = true;
}

std::unique_ptr<ROOT::Experimental::Internal::RPageSource> ROOT::Experimental::Internal::RPageSource::Clone() const
{
   auto clone = CloneImpl();
   if (fIsAttached) {
      clone->GetExclDescriptorGuard().MoveIn(std::move(*GetSharedDescriptorGuard()->Clone()));
      clone->fHasStructure = true;
      clone->fIsAttached = true;
   }
   return clone;
}

ROOT::Experimental::NTupleSize_t ROOT::Experimental::Internal::RPageSource::GetNEntries()
{
   return GetSharedDescriptorGuard()->GetNEntries();
}

ROOT::Experimental::NTupleSize_t ROOT::Experimental::Internal::RPageSource::GetNElements(ColumnHandle_t columnHandle)
{
   return GetSharedDescriptorGuard()->GetNElements(columnHandle.fPhysicalId);
}

void ROOT::Experimental::Internal::RPageSource::UnzipCluster(RCluster *cluster)
{
   if (fTaskScheduler)
      UnzipClusterImpl(cluster);
}

void ROOT::Experimental::Internal::RPageSource::UnzipClusterImpl(RCluster *cluster)
{
   Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);

   const auto clusterId = cluster->GetId();
   auto descriptorGuard = GetSharedDescriptorGuard();
   const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);

   fPreloadedClusters[clusterDescriptor.GetFirstEntryIndex()] = clusterId;

   std::atomic<bool> foundChecksumFailure{false};

   std::vector<std::unique_ptr<RColumnElementBase>> allElements;
   const auto &columnsInCluster = cluster->GetAvailPhysicalColumns();
   for (const auto columnId : columnsInCluster) {
      // By the time we unzip a cluster, the set of active columns may have already changed with respect to the
      // moment when we requested reading the cluster. That doesn't matter much, we simply decompress what is now
      // in the list of active columns.
      if (!fActivePhysicalColumns.HasColumnInfos(columnId))
         continue;
      const auto &columnInfos = fActivePhysicalColumns.GetColumnInfos(columnId);

      for (const auto &info : columnInfos) {
         allElements.emplace_back(GenerateColumnElement(info.fElementId));

         const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
         std::uint64_t pageNo = 0;
         std::uint64_t firstInPage = 0;
         for (const auto &pi : pageRange.fPageInfos) {
            auto onDiskPage = cluster->GetOnDiskPage(ROnDiskPage::Key{columnId, pageNo});
            RSealedPage sealedPage;
            sealedPage.SetNElements(pi.fNElements);
            sealedPage.SetHasChecksum(pi.fHasChecksum);
            sealedPage.SetBufferSize(pi.fLocator.GetNBytesOnStorage() + pi.fHasChecksum * kNBytesPageChecksum);
            R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
            sealedPage.SetBuffer(onDiskPage->GetAddress());

            auto taskFunc = [this, columnId, clusterId, firstInPage, sealedPage, element = allElements.back().get(),
                             &foundChecksumFailure,
                             indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex]() {
               const RPagePool::RKey keyPagePool{columnId, element->GetIdentifier().fInMemoryType};
               auto rv = UnsealPage(sealedPage, *element);
               if (!rv) {
                  foundChecksumFailure = true;
                  return;
               }
               auto newPage = rv.Unwrap();
               fCounters->fSzUnzip.Add(element->GetSize() * sealedPage.GetNElements());

               newPage.SetWindow(indexOffset + firstInPage, RPage::RClusterInfo(clusterId, indexOffset));
               fPagePool.PreloadPage(std::move(newPage), keyPagePool);
            };

            fTaskScheduler->AddTask(taskFunc);

            firstInPage += pi.fNElements;
            pageNo++;
         } // for all pages in column

         fCounters->fNPageUnsealed.Add(pageNo);
      } // for all in-memory types of the column
   } // for all columns in cluster

   fTaskScheduler->Wait();

   if (foundChecksumFailure) {
      throw RException(R__FAIL("page checksum verification failed, data corruption detected"));
   }
}
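
// Editor's note: one unsealing task is scheduled per page and joined by Wait().
// Checksum failures are reported through the atomic flag rather than by
// throwing inside the tasks, since exceptions cannot safely propagate out of
// the task scheduler's worker threads; the single exception is thrown here,
// on the calling thread.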

void ROOT::Experimental::Internal::RPageSource::PrepareLoadCluster(
   const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap,
   std::function<void(DescriptorId_t, NTupleSize_t, const RClusterDescriptor::RPageRange::RPageInfo &)> perPageFunc)
{
   auto descriptorGuard = GetSharedDescriptorGuard();
   const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterKey.fClusterId);

   for (auto physicalColumnId : clusterKey.fPhysicalColumnSet) {
      if (clusterDesc.GetColumnRange(physicalColumnId).fIsSuppressed)
         continue;

      const auto &pageRange = clusterDesc.GetPageRange(physicalColumnId);
      NTupleSize_t pageNo = 0;
      for (const auto &pageInfo : pageRange.fPageInfos) {
         if (pageInfo.fLocator.GetType() == RNTupleLocator::kTypePageZero) {
            pageZeroMap.Register(
               ROnDiskPage::Key{physicalColumnId, pageNo},
               ROnDiskPage(const_cast<void *>(RPage::GetPageZeroBuffer()), pageInfo.fLocator.GetNBytesOnStorage()));
         } else {
            perPageFunc(physicalColumnId, pageNo, pageInfo);
         }
         ++pageNo;
      }
   }
}

void ROOT::Experimental::Internal::RPageSource::UpdateLastUsedCluster(DescriptorId_t clusterId)
{
   if (fLastUsedCluster == clusterId)
      return;

   NTupleSize_t firstEntryIndex = GetSharedDescriptorGuard()->GetClusterDescriptor(clusterId).GetFirstEntryIndex();
   auto itr = fPreloadedClusters.begin();
   while ((itr != fPreloadedClusters.end()) && (itr->first < firstEntryIndex)) {
      fPagePool.Evict(itr->second);
      itr = fPreloadedClusters.erase(itr);
   }
   std::size_t poolWindow = 0;
   while ((itr != fPreloadedClusters.end()) && (poolWindow < 2 * fOptions.GetClusterBunchSize())) {
      ++itr;
      ++poolWindow;
   }
   while (itr != fPreloadedClusters.end()) {
      fPagePool.Evict(itr->second);
      itr = fPreloadedClusters.erase(itr);
   }

   fLastUsedCluster = clusterId;
}
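
// Editor's note: on a cluster switch, preloaded clusters that start before the
// newly used cluster are evicted (assuming forward reads), a window of
// 2 * cluster bunch size preloaded clusters is kept, and everything beyond
// that window is evicted as well.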

ROOT::Experimental::Internal::RPageRef
ROOT::Experimental::Internal::RPageSource::LoadPage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex)
{
   const auto columnId = columnHandle.fPhysicalId;
   const auto columnElementId = columnHandle.fColumn->GetElement()->GetIdentifier();
   auto cachedPageRef = fPagePool.GetPage(RPagePool::RKey{columnId, columnElementId.fInMemoryType}, globalIndex);
   if (!cachedPageRef.Get().IsNull()) {
      UpdateLastUsedCluster(cachedPageRef.Get().GetClusterInfo().GetId());
      return cachedPageRef;
   }

   std::uint64_t idxInCluster;
   RClusterInfo clusterInfo;
   {
      auto descriptorGuard = GetSharedDescriptorGuard();
      clusterInfo.fClusterId = descriptorGuard->FindClusterId(columnId, globalIndex);

      if (clusterInfo.fClusterId == kInvalidDescriptorId)
         throw RException(R__FAIL("entry with index " + std::to_string(globalIndex) + " out of bounds"));

      const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterInfo.fClusterId);
      const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
      if (columnRange.fIsSuppressed)
         return RPageRef();

      clusterInfo.fColumnOffset = columnRange.fFirstElementIndex;
      R__ASSERT(clusterInfo.fColumnOffset <= globalIndex);
      idxInCluster = globalIndex - clusterInfo.fColumnOffset;
      clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
   }

   if (clusterInfo.fPageInfo.fLocator.GetType() == RNTupleLocator::kTypeUnknown)
      throw RException(R__FAIL("tried to read a page with an unknown locator"));

   UpdateLastUsedCluster(clusterInfo.fClusterId);
   return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
}

ROOT::Experimental::Internal::RPageRef
ROOT::Experimental::Internal::RPageSource::LoadPage(ColumnHandle_t columnHandle, RClusterIndex clusterIndex)
{
   const auto clusterId = clusterIndex.GetClusterId();
   const auto idxInCluster = clusterIndex.GetIndex();
   const auto columnId = columnHandle.fPhysicalId;
   const auto columnElementId = columnHandle.fColumn->GetElement()->GetIdentifier();
   auto cachedPageRef = fPagePool.GetPage(RPagePool::RKey{columnId, columnElementId.fInMemoryType}, clusterIndex);
   if (!cachedPageRef.Get().IsNull()) {
      UpdateLastUsedCluster(clusterId);
      return cachedPageRef;
   }

   if (clusterId == kInvalidDescriptorId)
      throw RException(R__FAIL("entry out of bounds"));

   RClusterInfo clusterInfo;
   {
      auto descriptorGuard = GetSharedDescriptorGuard();
      const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
      const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
      if (columnRange.fIsSuppressed)
         return RPageRef();

      clusterInfo.fClusterId = clusterId;
      clusterInfo.fColumnOffset = columnRange.fFirstElementIndex;
      clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
   }

   if (clusterInfo.fPageInfo.fLocator.GetType() == RNTupleLocator::kTypeUnknown)
      throw RException(R__FAIL("tried to read a page with an unknown locator"));

   UpdateLastUsedCluster(clusterInfo.fClusterId);
   return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
}

void ROOT::Experimental::Internal::RPageSource::EnableDefaultMetrics(const std::string &prefix)
{
   fMetrics = Detail::RNTupleMetrics(prefix);
   fCounters = std::make_unique<RCounters>(RCounters{
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nReadV", "", "number of vector read requests"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nRead", "", "number of byte ranges read"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szReadPayload", "B",
                                                            "volume read from storage (required)"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szReadOverhead", "B",
                                                            "volume read from storage (overhead)"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szUnzip", "B", "volume after unzipping"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nClusterLoaded", "",
                                                            "number of partial clusters preloaded from storage"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nPageRead", "", "number of pages read from storage"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nPageUnsealed", "",
                                                            "number of pages unzipped and decoded"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallRead", "ns", "wall clock time spent reading"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallUnzip", "ns",
                                                            "wall clock time spent decompressing"),
      *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuRead", "ns",
                                                                                        "CPU time spent reading"),
      *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuUnzip", "ns",
                                                                                        "CPU time spent decompressing"),
      *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
         "bwRead", "MB/s", "bandwidth compressed bytes read per second", fMetrics,
         [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
               if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
                  if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
                     if (auto walltime = timeWallRead->GetValueAsInt()) {
                        double payload = szReadPayload->GetValueAsInt();
                        double overhead = szReadOverhead->GetValueAsInt();
                        // unit: bytes / nanosecond = GB/s; the factor 1000 converts to MB/s
                        return {true, (1000. * (payload + overhead) / walltime)};
                     }
                  }
               }
            }
            return {false, -1.};
         }),
      *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
         "bwReadUnzip", "MB/s", "bandwidth uncompressed bytes read per second", fMetrics,
         [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
               if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
                  if (auto walltime = timeWallRead->GetValueAsInt()) {
                     double unzip = szUnzip->GetValueAsInt();
                     // unit: bytes / nanosecond = GB/s; the factor 1000 converts to MB/s
                     return {true, 1000. * unzip / walltime};
                  }
               }
            }
            return {false, -1.};
         }),
      *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
         "bwUnzip", "MB/s", "decompression bandwidth of uncompressed bytes per second", fMetrics,
         [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
               if (const auto timeWallUnzip = metrics.GetLocalCounter("timeWallUnzip")) {
                  if (auto walltime = timeWallUnzip->GetValueAsInt()) {
                     double unzip = szUnzip->GetValueAsInt();
                     // unit: bytes / nanosecond = GB/s; the factor 1000 converts to MB/s
                     return {true, 1000. * unzip / walltime};
                  }
               }
            }
            return {false, -1.};
         }),
      *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
         "rtReadEfficiency", "", "ratio of payload over all bytes read", fMetrics,
         [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
               if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
                  if (auto payload = szReadPayload->GetValueAsInt()) {
                     // r/(r+o) = 1/((r+o)/r) = 1/(1 + o/r)
                     return {true, 1. / (1. + (1. * szReadOverhead->GetValueAsInt()) / payload)};
                  }
               }
            }
            return {false, -1.};
         }),
      *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
         "rtCompression", "", "ratio of compressed bytes / uncompressed bytes", fMetrics,
         [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
               if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
                  if (auto unzip = szUnzip->GetValueAsInt()) {
                     return {true, (1. * szReadPayload->GetValueAsInt()) / unzip};
                  }
               }
            }
            return {false, -1.};
         })});
}
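
// Usage sketch (editor's addition; the prefix string is hypothetical):
//   source->EnableDefaultMetrics("RPageSourceFile");
//   // ... read data ...
//   const auto *counter = source->GetMetrics().GetLocalCounter("szReadPayload");
// GetLocalCounter() returns nullptr for unknown names, so callers should check
// the pointer before use.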

ROOT::Experimental::RResult<ROOT::Experimental::Internal::RPage>
ROOT::Experimental::Internal::RPageSource::UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element)
{
   return UnsealPage(sealedPage, element, *fPageAllocator);
}

ROOT::Experimental::RResult<ROOT::Experimental::Internal::RPage>
ROOT::Experimental::Internal::RPageSource::UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element,
                                                      RPageAllocator &pageAlloc)
{
   // Unsealing a page zero is a no-op. `RPageRange::ExtendToFitColumnRange()` guarantees that the page zero buffer is
   // large enough to hold `sealedPage.fNElements`.
   if (sealedPage.GetBuffer() == RPage::GetPageZeroBuffer()) {
      auto page = pageAlloc.NewPage(element.GetSize(), sealedPage.GetNElements());
      page.GrowUnchecked(sealedPage.GetNElements());
      memset(page.GetBuffer(), 0, page.GetNBytes());
      return page;
   }

   auto rv = sealedPage.VerifyChecksumIfEnabled();
   if (!rv)
      return R__FORWARD_ERROR(rv);

   const auto bytesPacked = element.GetPackedSize(sealedPage.GetNElements());
   auto page = pageAlloc.NewPage(element.GetPackedSize(), sealedPage.GetNElements());
   if (sealedPage.GetDataSize() != bytesPacked) {
      RNTupleDecompressor::Unzip(sealedPage.GetBuffer(), sealedPage.GetDataSize(), bytesPacked, page.GetBuffer());
   } else {
      // We cannot simply map the sealed page as we don't know its lifetime. Specialized page sources may decide
      // not to use UnsealPage but to implement custom mapping / decompression code. Note that usually pages are
      // compressed.
      memcpy(page.GetBuffer(), sealedPage.GetBuffer(), bytesPacked);
   }

   if (!element.IsMappable()) {
      auto tmp = pageAlloc.NewPage(element.GetSize(), sealedPage.GetNElements());
      element.Unpack(tmp.GetBuffer(), page.GetBuffer(), sealedPage.GetNElements());
      page = std::move(tmp);
   }

   page.GrowUnchecked(sealedPage.GetNElements());
   return page;
}

//------------------------------------------------------------------------------

bool ROOT::Experimental::Internal::RWritePageMemoryManager::RColumnInfo::operator>(const RColumnInfo &other) const
{
   // Make the sort order unique by adding the physical on-disk column id as a secondary key
   if (fCurrentPageSize == other.fCurrentPageSize)
      return fColumn->GetOnDiskId() > other.fColumn->GetOnDiskId();
   return fCurrentPageSize > other.fCurrentPageSize;
}

bool ROOT::Experimental::Internal::RWritePageMemoryManager::TryEvict(std::size_t targetAvailableSize,
                                                                     std::size_t pageSizeLimit)
{
   if (fMaxAllocatedBytes - fCurrentAllocatedBytes >= targetAvailableSize)
      return true;

   auto itr = fColumnsSortedByPageSize.begin();
   while (itr != fColumnsSortedByPageSize.end()) {
      if (itr->fCurrentPageSize <= pageSizeLimit)
         break;
      if (itr->fCurrentPageSize == itr->fInitialPageSize) {
         ++itr;
         continue;
      }

      // Flushing the current column will invalidate itr
      auto itrFlush = itr++;

      RColumnInfo next;
      if (itr != fColumnsSortedByPageSize.end())
         next = *itr;

      itrFlush->fColumn->Flush();
      if (fMaxAllocatedBytes - fCurrentAllocatedBytes >= targetAvailableSize)
         return true;

      if (next.fColumn == nullptr)
         return false;
      itr = fColumnsSortedByPageSize.find(next);
   }

   return false;
}

bool ROOT::Experimental::Internal::RWritePageMemoryManager::TryUpdate(RColumn &column, std::size_t newWritePageSize)
{
   const RColumnInfo key{&column, column.GetWritePageCapacity(), 0};
   auto itr = fColumnsSortedByPageSize.find(key);
   if (itr == fColumnsSortedByPageSize.end()) {
      if (!TryEvict(newWritePageSize, 0))
         return false;
      fColumnsSortedByPageSize.insert({&column, newWritePageSize, newWritePageSize});
      fCurrentAllocatedBytes += newWritePageSize;
      return true;
   }

   RColumnInfo elem{*itr};
   assert(newWritePageSize >= elem.fInitialPageSize);

   if (newWritePageSize == elem.fCurrentPageSize)
      return true;

   fColumnsSortedByPageSize.erase(itr);

   if (newWritePageSize < elem.fCurrentPageSize) {
      // Page got smaller
      fCurrentAllocatedBytes -= elem.fCurrentPageSize - newWritePageSize;
      elem.fCurrentPageSize = newWritePageSize;
      fColumnsSortedByPageSize.insert(elem);
      return true;
   }

   // Page got larger, we may need to make space available
   const auto diffBytes = newWritePageSize - elem.fCurrentPageSize;
   if (!TryEvict(diffBytes, elem.fCurrentPageSize)) {
      // Don't change anything, let the calling column flush itself
      // TODO(jblomer): we may consider skipping the column in TryEvict and thus avoiding erase+insert
      fColumnsSortedByPageSize.insert(elem);
      return false;
   }
   fCurrentAllocatedBytes += diffBytes;
   elem.fCurrentPageSize = newWritePageSize;
   fColumnsSortedByPageSize.insert(elem);
   return true;
}
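
// Worked example (editor's addition, sizes hypothetical): with a fully
// allocated 1 MiB budget, growing a column's write page from 64 KiB to 128 KiB
// calls TryEvict(64 KiB, 64 KiB). Columns whose current page exceeds 64 KiB
// are flushed in order of decreasing page size until 64 KiB of headroom is
// available; if that fails, TryUpdate() returns false and the calling column
// flushes itself instead.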

//------------------------------------------------------------------------------

ROOT::Experimental::Internal::RPageSink::RPageSink(std::string_view name, const RNTupleWriteOptions &options)
   : RPageStorage(name), fOptions(options.Clone()), fWritePageMemoryManager(options.GetPageBufferBudget())
{
}

ROOT::Experimental::Internal::RPageSink::~RPageSink() {}

ROOT::Experimental::Internal::RPageStorage::RSealedPage
ROOT::Experimental::Internal::RPageSink::SealPage(const RSealPageConfig &config)
{
   assert(config.fPage);
   assert(config.fElement);
   assert(config.fBuffer);

   unsigned char *pageBuf = reinterpret_cast<unsigned char *>(config.fPage->GetBuffer());
   bool isAdoptedBuffer = true;
   auto nBytesPacked = config.fPage->GetNBytes();
   auto nBytesChecksum = config.fWriteChecksum * kNBytesPageChecksum;

   if (!config.fElement->IsMappable()) {
      nBytesPacked = config.fElement->GetPackedSize(config.fPage->GetNElements());
      pageBuf = new unsigned char[nBytesPacked];
      isAdoptedBuffer = false;
      config.fElement->Pack(pageBuf, config.fPage->GetBuffer(), config.fPage->GetNElements());
   }
   auto nBytesZipped = nBytesPacked;

   if ((config.fCompressionSetting != 0) || !config.fElement->IsMappable() || !config.fAllowAlias ||
       config.fWriteChecksum) {
      nBytesZipped = RNTupleCompressor::Zip(pageBuf, nBytesPacked, config.fCompressionSetting, config.fBuffer);
      if (!isAdoptedBuffer)
         delete[] pageBuf;
      pageBuf = reinterpret_cast<unsigned char *>(config.fBuffer);
      isAdoptedBuffer = true;
   }

   R__ASSERT(isAdoptedBuffer);

   RSealedPage sealedPage{pageBuf, nBytesZipped + nBytesChecksum, config.fPage->GetNElements(), config.fWriteChecksum};
   sealedPage.ChecksumIfEnabled();

   return sealedPage;
}
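
// Editor's note: sealing runs pack -> zip -> checksum. The zip step is entered
// even for uncompressed data whenever the caller disallows aliasing the input
// page or requests a checksum, so that the sealed bytes always end up in
// config.fBuffer with room for the trailing checksum.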

ROOT::Experimental::Internal::RPageStorage::RSealedPage
ROOT::Experimental::Internal::RPageSink::SealPage(const RPage &page, const RColumnElementBase &element)
{
   const auto nBytes = page.GetNBytes() + GetWriteOptions().GetEnablePageChecksums() * kNBytesPageChecksum;
   if (fSealPageBuffer.size() < nBytes)
      fSealPageBuffer.resize(nBytes);

   RSealPageConfig config;
   config.fPage = &page;
   config.fElement = &element;
   config.fCompressionSetting = GetWriteOptions().GetCompression();
   config.fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
   config.fAllowAlias = true;
   config.fBuffer = fSealPageBuffer.data();

   return SealPage(config);
}

void ROOT::Experimental::Internal::RPageSink::CommitDataset()
{
   for (const auto &cb : fOnDatasetCommitCallbacks)
      cb(*this);
   CommitDatasetImpl();
}

ROOT::Experimental::Internal::RPage
ROOT::Experimental::Internal::RPageSink::ReservePage(ColumnHandle_t columnHandle, std::size_t nElements)
{
   R__ASSERT(nElements > 0);
   const auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
   const auto nBytes = elementSize * nElements;
   if (!fWritePageMemoryManager.TryUpdate(*columnHandle.fColumn, nBytes))
      return RPage();
   return fPageAllocator->NewPage(elementSize, nElements);
}

//------------------------------------------------------------------------------

std::unique_ptr<ROOT::Experimental::Internal::RPageSink>
ROOT::Experimental::Internal::RPagePersistentSink::Create(std::string_view ntupleName, std::string_view location,
                                                          const RNTupleWriteOptions &options)
{
   if (ntupleName.empty()) {
      throw RException(R__FAIL("empty RNTuple name"));
   }
   if (location.empty()) {
      throw RException(R__FAIL("empty storage location"));
   }
   if (location.find("daos://") == 0) {
#ifdef R__ENABLE_DAOS
      return std::make_unique<RPageSinkDaos>(ntupleName, location, options);
#else
      throw RException(R__FAIL("This RNTuple build does not support DAOS."));
#endif
   }

   // Otherwise assume that the user wants us to create a file.
   return std::make_unique<RPageSinkFile>(ntupleName, location, options);
}

ROOT::Experimental::Internal::RPagePersistentSink::RPagePersistentSink(std::string_view name,
                                                                       const RNTupleWriteOptions &options)
   : RPageSink(name, options)
{
}

ROOT::Experimental::Internal::RPagePersistentSink::~RPagePersistentSink() {}

ROOT::Experimental::Internal::RPageStorage::ColumnHandle_t
ROOT::Experimental::Internal::RPagePersistentSink::AddColumn(DescriptorId_t fieldId, RColumn &column)
{
   auto columnId = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns();
   RColumnDescriptorBuilder columnBuilder;
   columnBuilder.LogicalColumnId(columnId)
      .PhysicalColumnId(columnId)
      .FieldId(fieldId)
      .BitsOnStorage(column.GetBitsOnStorage())
      .ValueRange(column.GetValueRange())
      .Type(column.GetType())
      .Index(column.GetIndex())
      .RepresentationIndex(column.GetRepresentationIndex())
      .FirstElementIndex(column.GetFirstElementIndex());
   // For late model extension, we assume that the primary column representation is the active one for the
   // deferred range. All other representations are suppressed.
   if (column.GetFirstElementIndex() > 0 && column.GetRepresentationIndex() > 0)
      columnBuilder.SetSuppressedDeferred();
   fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
   return ColumnHandle_t{columnId, &column};
}

void ROOT::Experimental::Internal::RPagePersistentSink::UpdateSchema(const RNTupleModelChangeset &changeset,
                                                                     NTupleSize_t firstEntry)
{
   const auto &descriptor = fDescriptorBuilder.GetDescriptor();

   if (descriptor.GetNLogicalColumns() > descriptor.GetNPhysicalColumns()) {
      // If we already have alias columns, add an offset to the alias columns so that the new physical columns
      // of the changeset immediately follow the already existing physical columns
      auto getNColumns = [](const RFieldBase &f) -> std::size_t {
         const auto &reps = f.GetColumnRepresentatives();
         if (reps.empty())
            return 0;
         return reps.size() * reps[0].size();
      };
      std::uint32_t nNewPhysicalColumns = 0;
      for (auto f : changeset.fAddedFields) {
         nNewPhysicalColumns += getNColumns(*f);
         for (const auto &descendant : *f)
            nNewPhysicalColumns += getNColumns(descendant);
      }
      fDescriptorBuilder.ShiftAliasColumns(nNewPhysicalColumns);
   }

   auto addField = [&](RFieldBase &f) {
      auto fieldId = descriptor.GetNFields();
      fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
      fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
      f.SetOnDiskId(fieldId);
      CallConnectPageSinkOnField(f, *this, firstEntry); // issues in turn calls to `AddColumn()`
   };
   auto addProjectedField = [&](RFieldBase &f) {
      auto fieldId = descriptor.GetNFields();
      auto sourceFieldId = GetProjectedFieldsOfModel(changeset.fModel).GetSourceField(&f)->GetOnDiskId();
      fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
      fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
      fDescriptorBuilder.AddFieldProjection(sourceFieldId, fieldId);
      f.SetOnDiskId(fieldId);
      for (const auto &source : descriptor.GetColumnIterable(sourceFieldId)) {
         auto targetId = descriptor.GetNLogicalColumns();
         RColumnDescriptorBuilder columnBuilder;
         columnBuilder.LogicalColumnId(targetId)
            .PhysicalColumnId(source.GetLogicalId())
            .FieldId(fieldId)
            .BitsOnStorage(source.GetBitsOnStorage())
            .ValueRange(source.GetValueRange())
            .Type(source.GetType())
            .Index(source.GetIndex())
            .RepresentationIndex(source.GetRepresentationIndex());
         fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
      }
   };

   R__ASSERT(firstEntry >= fPrevClusterNEntries);
   const auto nColumnsBeforeUpdate = descriptor.GetNPhysicalColumns();
   for (auto f : changeset.fAddedFields) {
      addField(*f);
      for (auto &descendant : *f)
         addField(descendant);
   }
   for (auto f : changeset.fAddedProjectedFields) {
      addProjectedField(*f);
      for (auto &descendant : *f)
         addProjectedField(descendant);
   }

   const auto nColumns = descriptor.GetNPhysicalColumns();
   for (DescriptorId_t i = nColumnsBeforeUpdate; i < nColumns; ++i) {
      RClusterDescriptor::RColumnRange columnRange;
      columnRange.fPhysicalColumnId = i;
      // We set the first element index in the current cluster to the first element that is part of a materialized
      // page (i.e., that is part of a page list). For columns created during late model extension, however, the
      // column range is fixed up as needed by `RClusterDescriptorBuilder::AddExtendedColumnRanges()` on read back.
      columnRange.fFirstElementIndex = descriptor.GetColumnDescriptor(i).GetFirstElementIndex();
      columnRange.fNElements = 0;
      columnRange.fCompressionSettings = GetWriteOptions().GetCompression();
      fOpenColumnRanges.emplace_back(columnRange);
      RClusterDescriptor::RPageRange pageRange;
      pageRange.fPhysicalColumnId = i;
      fOpenPageRanges.emplace_back(std::move(pageRange));
   }

   // Mapping of memory to on-disk column IDs usually happens during serialization of the ntuple header. If the
   // header was already serialized, this has to be done manually as it is required for page list serialization.
   if (fSerializationContext.GetHeaderSize() > 0)
      fSerializationContext.MapSchema(descriptor, /*forHeaderExtension=*/true);
}
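
// Editor's note: UpdateSchema() serves both the initial schema (InitImpl()
// below feeds it the full model as one changeset with firstEntry == 0) and
// late model extension, where it receives the incremental changeset and the
// first entry number of the extension.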

void ROOT::Experimental::Internal::RPagePersistentSink::UpdateExtraTypeInfo(
   const RExtraTypeInfoDescriptor &extraTypeInfo)
{
   if (extraTypeInfo.GetContentId() != EExtraTypeInfoIds::kStreamerInfo)
      throw RException(R__FAIL("ROOT bug: unexpected type extra info in UpdateExtraTypeInfo()"));

   fStreamerInfos.merge(RNTupleSerializer::DeserializeStreamerInfos(extraTypeInfo.GetContent()).Unwrap());
}

void ROOT::Experimental::Internal::RPagePersistentSink::InitImpl(RNTupleModel &model)
{
   fDescriptorBuilder.SetNTuple(fNTupleName, model.GetDescription());
   const auto &descriptor = fDescriptorBuilder.GetDescriptor();

   auto &fieldZero = Internal::GetFieldZeroOfModel(model);
   fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(fieldZero).FieldId(0).MakeDescriptor().Unwrap());
   fieldZero.SetOnDiskId(0);
   auto &projectedFields = GetProjectedFieldsOfModel(model);
   projectedFields.GetFieldZero().SetOnDiskId(0);

   RNTupleModelChangeset initialChangeset{model};
   for (auto f : fieldZero.GetSubFields())
      initialChangeset.fAddedFields.emplace_back(f);
   for (auto f : projectedFields.GetFieldZero().GetSubFields())
      initialChangeset.fAddedProjectedFields.emplace_back(f);
   UpdateSchema(initialChangeset, 0U);

   fSerializationContext = RNTupleSerializer::SerializeHeader(nullptr, descriptor);
   auto buffer = MakeUninitArray<unsigned char>(fSerializationContext.GetHeaderSize());
   fSerializationContext = RNTupleSerializer::SerializeHeader(buffer.get(), descriptor);
   InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());

   fDescriptorBuilder.BeginHeaderExtension();
}
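
// Editor's note: RNTupleSerializer::SerializeHeader() is deliberately called
// twice: the first pass with a nullptr buffer only computes the serialized
// size, the second pass writes into the allocated buffer. CommitClusterGroup()
// and CommitDatasetImpl() below use the same size-then-write pattern for the
// page list and the footer.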

void ROOT::Experimental::Internal::RPagePersistentSink::InitFromDescriptor(const RNTupleDescriptor &descriptor)
{
   {
      auto model = descriptor.CreateModel();
      Init(*model.get());
   }

   auto clusterId = descriptor.FindClusterId(0, 0);

   while (clusterId != ROOT::Experimental::kInvalidDescriptorId) {
      auto &cluster = descriptor.GetClusterDescriptor(clusterId);
      auto nEntries = cluster.GetNEntries();

      RClusterDescriptorBuilder clusterBuilder;
      clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
         .FirstEntryIndex(fPrevClusterNEntries)
         .NEntries(nEntries);

      for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
         R__ASSERT(fOpenColumnRanges[i].fPhysicalColumnId == i);
         const auto &columnRange = cluster.GetColumnRange(i);
         R__ASSERT(columnRange.fPhysicalColumnId == i);
         const auto &pageRange = cluster.GetPageRange(i);
         R__ASSERT(pageRange.fPhysicalColumnId == i);
         clusterBuilder.CommitColumnRange(i, fOpenColumnRanges[i].fFirstElementIndex, columnRange.fCompressionSettings,
                                          pageRange);
         fOpenColumnRanges[i].fFirstElementIndex += columnRange.fNElements;
      }
      fDescriptorBuilder.AddCluster(clusterBuilder.MoveDescriptor().Unwrap());
      fPrevClusterNEntries += nEntries;

      clusterId = descriptor.FindNextClusterId(clusterId);
   }
}

void ROOT::Experimental::Internal::RPagePersistentSink::CommitSuppressedColumn(ColumnHandle_t columnHandle)
{
   fOpenColumnRanges.at(columnHandle.fPhysicalId).fIsSuppressed = true;
}

void ROOT::Experimental::Internal::RPagePersistentSink::CommitPage(ColumnHandle_t columnHandle, const RPage &page)
{
   fOpenColumnRanges.at(columnHandle.fPhysicalId).fNElements += page.GetNElements();

   RClusterDescriptor::RPageRange::RPageInfo pageInfo;
   pageInfo.fNElements = page.GetNElements();
   pageInfo.fLocator = CommitPageImpl(columnHandle, page);
   pageInfo.fHasChecksum = GetWriteOptions().GetEnablePageChecksums();
   fOpenPageRanges.at(columnHandle.fPhysicalId).fPageInfos.emplace_back(pageInfo);
}

void ROOT::Experimental::Internal::RPagePersistentSink::CommitSealedPage(DescriptorId_t physicalColumnId,
                                                                         const RPageStorage::RSealedPage &sealedPage)
{
   fOpenColumnRanges.at(physicalColumnId).fNElements += sealedPage.GetNElements();

   RClusterDescriptor::RPageRange::RPageInfo pageInfo;
   pageInfo.fNElements = sealedPage.GetNElements();
   pageInfo.fLocator = CommitSealedPageImpl(physicalColumnId, sealedPage);
   pageInfo.fHasChecksum = sealedPage.GetHasChecksum();
   fOpenPageRanges.at(physicalColumnId).fPageInfos.emplace_back(pageInfo);
}

std::vector<ROOT::Experimental::RNTupleLocator>
ROOT::Experimental::Internal::RPagePersistentSink::CommitSealedPageVImpl(
   std::span<RPageStorage::RSealedPageGroup> ranges, const std::vector<bool> &mask)
{
   std::vector<ROOT::Experimental::RNTupleLocator> locators;
   locators.reserve(mask.size());
   std::size_t i = 0;
   for (auto &range : ranges) {
      for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
         if (mask[i++])
            locators.push_back(CommitSealedPageImpl(range.fPhysicalColumnId, *sealedPageIt));
      }
   }
   locators.shrink_to_fit();
   return locators;
}

void ROOT::Experimental::Internal::RPagePersistentSink::CommitSealedPageV(
   std::span<RPageStorage::RSealedPageGroup> ranges)
{
   // Used in the `originalPages` map
   struct RSealedPageLink {
      const RSealedPage *fSealedPage = nullptr; ///< Points to the first occurrence of a page with a specific checksum
      std::size_t fLocatorIdx = 0;              ///< The index in the locator vector returned by CommitSealedPageVImpl()
   };

   std::vector<bool> mask;
   // For every sealed page, stores the corresponding index in the locator vector returned by CommitSealedPageVImpl()
   std::vector<std::size_t> locatorIndexes;
   // Maps page checksums to the first sealed page with that checksum
   std::unordered_map<std::uint64_t, RSealedPageLink> originalPages;
   std::size_t iLocator = 0;
   for (auto &range : ranges) {
      const auto rangeSize = std::distance(range.fFirst, range.fLast);
      mask.reserve(mask.size() + rangeSize);
      locatorIndexes.reserve(locatorIndexes.size() + rangeSize);

      for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
         if (!fFeatures.fCanMergePages || !fOptions->GetEnableSamePageMerging()) {
            mask.emplace_back(true);
            locatorIndexes.emplace_back(iLocator++);
            continue;
         }
         // Same page merging requires page checksums - this is checked in the write options
         R__ASSERT(sealedPageIt->GetHasChecksum());

         const auto chk = sealedPageIt->GetChecksum().Unwrap();
         auto itr = originalPages.find(chk);
         if (itr == originalPages.end()) {
            originalPages.insert({chk, {&(*sealedPageIt), iLocator}});
            mask.emplace_back(true);
            locatorIndexes.emplace_back(iLocator++);
            continue;
         }

         const auto *p = itr->second.fSealedPage;
         if (sealedPageIt->GetDataSize() != p->GetDataSize() ||
             memcmp(sealedPageIt->GetBuffer(), p->GetBuffer(), p->GetDataSize())) {
            mask.emplace_back(true);
            locatorIndexes.emplace_back(iLocator++);
            continue;
         }

         mask.emplace_back(false);
         locatorIndexes.emplace_back(itr->second.fLocatorIdx);
      }

      mask.shrink_to_fit();
      locatorIndexes.shrink_to_fit();
   }

   auto locators = CommitSealedPageVImpl(ranges, mask);
   unsigned i = 0;

   for (auto &range : ranges) {
      for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
         fOpenColumnRanges.at(range.fPhysicalColumnId).fNElements += sealedPageIt->GetNElements();

         RClusterDescriptor::RPageRange::RPageInfo pageInfo;
         pageInfo.fNElements = sealedPageIt->GetNElements();
         pageInfo.fLocator = locators[locatorIndexes[i++]];
         pageInfo.fHasChecksum = sealedPageIt->GetHasChecksum();
         fOpenPageRanges.at(range.fPhysicalColumnId).fPageInfos.emplace_back(pageInfo);
      }
   }
}
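
// Worked example (editor's addition): for three sealed pages with checksums
// A, B, A where both A pages are byte-identical, mask becomes {1, 1, 0} and
// locatorIndexes becomes {0, 1, 0}. CommitSealedPageVImpl() writes only the
// first two pages; the third page entry reuses the locator of the first, so
// the duplicate payload is stored once.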

ROOT::Experimental::Internal::RPageSink::RStagedCluster
ROOT::Experimental::Internal::RPagePersistentSink::StageCluster(NTupleSize_t nNewEntries)
{
   RStagedCluster stagedCluster;
   stagedCluster.fNBytesWritten = StageClusterImpl();
   stagedCluster.fNEntries = nNewEntries;

   for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
      RStagedCluster::RColumnInfo columnInfo;
      if (fOpenColumnRanges[i].fIsSuppressed) {
         assert(fOpenPageRanges[i].fPageInfos.empty());
         columnInfo.fPageRange.fPhysicalColumnId = i;
         columnInfo.fIsSuppressed = true;
         // We reset suppressed columns to the state they would have if they were active (not suppressed).
         fOpenColumnRanges[i].fNElements = 0;
         fOpenColumnRanges[i].fIsSuppressed = false;
      } else {
         std::swap(columnInfo.fPageRange, fOpenPageRanges[i]);
         fOpenPageRanges[i].fPhysicalColumnId = i;

         columnInfo.fNElements = fOpenColumnRanges[i].fNElements;
         fOpenColumnRanges[i].fNElements = 0;
      }
      stagedCluster.fColumnInfos.push_back(std::move(columnInfo));
   }

   return stagedCluster;
}

void ROOT::Experimental::Internal::RPagePersistentSink::CommitStagedClusters(std::span<RStagedCluster> clusters)
{
   for (const auto &cluster : clusters) {
      RClusterDescriptorBuilder clusterBuilder;
      clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
         .FirstEntryIndex(fPrevClusterNEntries)
         .NEntries(cluster.fNEntries);
      for (const auto &columnInfo : cluster.fColumnInfos) {
         DescriptorId_t colId = columnInfo.fPageRange.fPhysicalColumnId;
         if (columnInfo.fIsSuppressed) {
            assert(columnInfo.fPageRange.fPageInfos.empty());
            clusterBuilder.MarkSuppressedColumnRange(colId);
         } else {
            clusterBuilder.CommitColumnRange(colId, fOpenColumnRanges[colId].fFirstElementIndex,
                                             fOpenColumnRanges[colId].fCompressionSettings, columnInfo.fPageRange);
            fOpenColumnRanges[colId].fFirstElementIndex += columnInfo.fNElements;
         }
      }

      clusterBuilder.CommitSuppressedColumnRanges(fDescriptorBuilder.GetDescriptor()).ThrowOnError();
      for (const auto &columnInfo : cluster.fColumnInfos) {
         if (!columnInfo.fIsSuppressed)
            continue;
         DescriptorId_t colId = columnInfo.fPageRange.fPhysicalColumnId;
         // For suppressed columns, we need to reset the first element index to the first element of the next
         // (upcoming) cluster. This information has been determined for the committed cluster descriptor through
         // CommitSuppressedColumnRanges(), so we can use the information from the descriptor.
         const auto &columnRangeFromDesc = clusterBuilder.GetColumnRange(colId);
         fOpenColumnRanges[colId].fFirstElementIndex =
            columnRangeFromDesc.fFirstElementIndex + columnRangeFromDesc.fNElements;
      }

      fDescriptorBuilder.AddCluster(clusterBuilder.MoveDescriptor().Unwrap());
      fPrevClusterNEntries += cluster.fNEntries;
   }
}

void ROOT::Experimental::Internal::RPagePersistentSink::CommitClusterGroup()
{
   const auto &descriptor = fDescriptorBuilder.GetDescriptor();

   const auto nClusters = descriptor.GetNActiveClusters();
   std::vector<DescriptorId_t> physClusterIDs;
   for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
      physClusterIDs.emplace_back(fSerializationContext.MapClusterId(i));
   }

   auto szPageList = RNTupleSerializer::SerializePageList(nullptr, descriptor, physClusterIDs, fSerializationContext);
   auto bufPageList = MakeUninitArray<unsigned char>(szPageList);
   RNTupleSerializer::SerializePageList(bufPageList.get(), descriptor, physClusterIDs, fSerializationContext);

   const auto clusterGroupId = descriptor.GetNClusterGroups();
   const auto locator = CommitClusterGroupImpl(bufPageList.get(), szPageList);
   RClusterGroupDescriptorBuilder cgBuilder;
   cgBuilder.ClusterGroupId(clusterGroupId).PageListLocator(locator).PageListLength(szPageList);
   if (fNextClusterInGroup == nClusters) {
      cgBuilder.MinEntry(0).EntrySpan(0).NClusters(0);
   } else {
      const auto &firstClusterDesc = descriptor.GetClusterDescriptor(fNextClusterInGroup);
      const auto &lastClusterDesc = descriptor.GetClusterDescriptor(nClusters - 1);
      cgBuilder.MinEntry(firstClusterDesc.GetFirstEntryIndex())
         .EntrySpan(lastClusterDesc.GetFirstEntryIndex() + lastClusterDesc.GetNEntries() -
                    firstClusterDesc.GetFirstEntryIndex())
         .NClusters(nClusters - fNextClusterInGroup);
   }
   std::vector<DescriptorId_t> clusterIds;
   for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
      clusterIds.emplace_back(i);
   }
   cgBuilder.AddSortedClusters(clusterIds);
   fDescriptorBuilder.AddClusterGroup(cgBuilder.MoveDescriptor().Unwrap());
   fSerializationContext.MapClusterGroupId(clusterGroupId);

   fNextClusterInGroup = nClusters;
}

void ROOT::Experimental::Internal::RPagePersistentSink::CommitDatasetImpl()
{
   if (!fStreamerInfos.empty()) {
      RExtraTypeInfoDescriptorBuilder extraInfoBuilder;
      extraInfoBuilder.ContentId(EExtraTypeInfoIds::kStreamerInfo)
         .Content(RNTupleSerializer::SerializeStreamerInfos(fStreamerInfos));
      fDescriptorBuilder.AddExtraTypeInfo(extraInfoBuilder.MoveDescriptor().Unwrap());
   }

   const auto &descriptor = fDescriptorBuilder.GetDescriptor();

   auto szFooter = RNTupleSerializer::SerializeFooter(nullptr, descriptor, fSerializationContext);
   auto bufFooter = MakeUninitArray<unsigned char>(szFooter);
   RNTupleSerializer::SerializeFooter(bufFooter.get(), descriptor, fSerializationContext);

   CommitDatasetImpl(bufFooter.get(), szFooter);
}

void ROOT::Experimental::Internal::RPagePersistentSink::EnableDefaultMetrics(const std::string &prefix)
{
   fMetrics = Detail::RNTupleMetrics(prefix);
   fCounters = std::make_unique<RCounters>(RCounters{
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nPageCommitted", "",
                                                            "number of pages committed to storage"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szWritePayload", "B",
                                                            "volume written for committed pages"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szZip", "B", "volume before zipping"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallWrite", "ns", "wall clock time spent writing"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallZip", "ns", "wall clock time spent compressing"),
      *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuWrite", "ns",
                                                                                        "CPU time spent writing"),
      *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuZip", "ns",
                                                                                        "CPU time spent compressing")});
}