Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageStorage.cxx
Go to the documentation of this file.
1/// \file RPageStorage.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2018-10-04
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RPageStorage.hxx>
18#include <ROOT/RColumn.hxx>
19#include <ROOT/RFieldBase.hxx>
22#include <ROOT/RNTupleModel.hxx>
25#include <ROOT/RPageSinkBuf.hxx>
27#ifdef R__ENABLE_DAOS
29#endif
30
31#include <Compression.h>
32#include <TError.h>
33
34#include <algorithm>
35#include <atomic>
36#include <cassert>
37#include <functional>
38#include <memory>
39#include <string_view>
40#include <unordered_map>
41#include <utility>
42
44 : fMetrics(""), fPageAllocator(std::make_unique<RPageAllocatorHeap>()), fNTupleName(name)
45{
46}
47
49
51{
52 if (!fHasChecksum)
53 return;
54
55 auto charBuf = reinterpret_cast<const unsigned char *>(fBuffer);
56 auto checksumBuf = const_cast<unsigned char *>(charBuf) + GetDataSize();
57 std::uint64_t xxhash3;
58 RNTupleSerializer::SerializeXxHash3(charBuf, GetDataSize(), xxhash3, checksumBuf);
59}
60
63{
64 if (!fHasChecksum)
66
67 auto success = RNTupleSerializer::VerifyXxHash3(reinterpret_cast<const unsigned char *>(fBuffer), GetDataSize());
68 if (!success)
69 return R__FAIL("page checksum verification failed, data corruption detected");
71}
72
74{
75 if (!fHasChecksum)
76 return R__FAIL("invalid attempt to extract non-existing page checksum");
77
78 assert(fBufferSize >= kNBytesPageChecksum);
79 std::uint64_t checksum;
81 reinterpret_cast<const unsigned char *>(fBuffer) + fBufferSize - kNBytesPageChecksum, checksum);
82 return checksum;
83}
84
85//------------------------------------------------------------------------------
86
88{
89 for (unsigned i = 0; i < fIDs.size(); ++i) {
90 if (fIDs[i] == physicalColumnID) {
91 fRefCounters[i]++;
92 return;
93 }
94 }
95 fIDs.emplace_back(physicalColumnID);
96 fRefCounters.emplace_back(1);
97}
98
100{
101 for (unsigned i = 0; i < fIDs.size(); ++i) {
102 if (fIDs[i] == physicalColumnID) {
103 if (--fRefCounters[i] == 0) {
104 fIDs.erase(fIDs.begin() + i);
105 fRefCounters.erase(fRefCounters.begin() + i);
106 }
107 return;
108 }
109 }
110}
111
114{
116 for (const auto &id : fIDs)
117 result.insert(id);
118 return result;
119}
120
122{
123 if (fFirstEntry == kInvalidNTupleIndex) {
124 /// Entry range unset, we assume that the entry range covers the complete source
125 return true;
126 }
127
128 if (clusterDesc.GetNEntries() == 0)
129 return true;
130 if ((clusterDesc.GetFirstEntryIndex() + clusterDesc.GetNEntries()) <= fFirstEntry)
131 return false;
132 if (clusterDesc.GetFirstEntryIndex() >= (fFirstEntry + fNEntries))
133 return false;
134 return true;
135}
136
138 : RPageStorage(name), fOptions(options)
139{
140}
141
143
144std::unique_ptr<ROOT::Experimental::Internal::RPageSource>
145ROOT::Experimental::Internal::RPageSource::Create(std::string_view ntupleName, std::string_view location,
146 const RNTupleReadOptions &options)
147{
148 if (ntupleName.empty()) {
149 throw RException(R__FAIL("empty RNTuple name"));
150 }
151 if (location.empty()) {
152 throw RException(R__FAIL("empty storage location"));
153 }
154 if (location.find("daos://") == 0)
155#ifdef R__ENABLE_DAOS
156 return std::make_unique<RPageSourceDaos>(ntupleName, location, options);
157#else
158 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
159#endif
160
161 return std::make_unique<RPageSourceFile>(ntupleName, location, options);
162}
163
166{
168 auto physicalId =
169 GetSharedDescriptorGuard()->FindPhysicalColumnId(fieldId, column.GetIndex(), column.GetRepresentationIndex());
170 R__ASSERT(physicalId != kInvalidDescriptorId);
171 fActivePhysicalColumns.Insert(physicalId);
172 return ColumnHandle_t{physicalId, &column};
173}
174
176{
177 fActivePhysicalColumns.Erase(columnHandle.fPhysicalId);
178}
179
181{
182 if ((range.fFirstEntry + range.fNEntries) > GetNEntries()) {
183 throw RException(R__FAIL("invalid entry range"));
184 }
185 fEntryRange = range;
186}
187
189{
190 if (!fHasStructure)
191 LoadStructureImpl();
192 fHasStructure = true;
193}
194
196{
197 LoadStructure();
198 if (!fIsAttached)
199 GetExclDescriptorGuard().MoveIn(AttachImpl());
200 fIsAttached = true;
201}
202
203std::unique_ptr<ROOT::Experimental::Internal::RPageSource> ROOT::Experimental::Internal::RPageSource::Clone() const
204{
205 auto clone = CloneImpl();
206 if (fIsAttached) {
207 clone->GetExclDescriptorGuard().MoveIn(std::move(*GetSharedDescriptorGuard()->Clone()));
208 clone->fHasStructure = true;
209 clone->fIsAttached = true;
210 }
211 return clone;
212}
213
215{
216 return GetSharedDescriptorGuard()->GetNEntries();
217}
218
220{
221 return GetSharedDescriptorGuard()->GetNElements(columnHandle.fPhysicalId);
222}
223
225{
226 if (fTaskScheduler)
227 UnzipClusterImpl(cluster);
228}
229
231{
232 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
233
234 const auto clusterId = cluster->GetId();
235 auto descriptorGuard = GetSharedDescriptorGuard();
236 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
237
238 std::vector<std::unique_ptr<RColumnElementBase>> allElements;
239
240 std::atomic<bool> foundChecksumFailure{false};
241
242 const auto &columnsInCluster = cluster->GetAvailPhysicalColumns();
243 for (const auto columnId : columnsInCluster) {
244 const auto &columnDesc = descriptorGuard->GetColumnDescriptor(columnId);
245
246 allElements.emplace_back(RColumnElementBase::Generate(columnDesc.GetType()));
247
248 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
249 std::uint64_t pageNo = 0;
250 std::uint64_t firstInPage = 0;
251 for (const auto &pi : pageRange.fPageInfos) {
252 ROnDiskPage::Key key(columnId, pageNo);
253 auto onDiskPage = cluster->GetOnDiskPage(key);
254 RSealedPage sealedPage;
255 sealedPage.SetNElements(pi.fNElements);
256 sealedPage.SetHasChecksum(pi.fHasChecksum);
257 sealedPage.SetBufferSize(pi.fLocator.fBytesOnStorage + pi.fHasChecksum * kNBytesPageChecksum);
258 sealedPage.SetBuffer(onDiskPage->GetAddress());
259 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
260
261 auto taskFunc = [this, columnId, clusterId, firstInPage, sealedPage, element = allElements.back().get(),
262 &foundChecksumFailure,
263 indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex]() {
264 auto rv = UnsealPage(sealedPage, *element, columnId);
265 if (!rv) {
266 foundChecksumFailure = true;
267 return;
268 }
269 auto newPage = rv.Unwrap();
270 fCounters->fSzUnzip.Add(element->GetSize() * sealedPage.GetNElements());
271
272 newPage.SetWindow(indexOffset + firstInPage, RPage::RClusterInfo(clusterId, indexOffset));
273 fPagePool.PreloadPage(std::move(newPage));
274 };
275
276 fTaskScheduler->AddTask(taskFunc);
277
278 firstInPage += pi.fNElements;
279 pageNo++;
280 } // for all pages in column
281 } // for all columns in cluster
282
283 fCounters->fNPageUnsealed.Add(cluster->GetNOnDiskPages());
284
285 fTaskScheduler->Wait();
286
287 if (foundChecksumFailure) {
288 throw RException(R__FAIL("page checksum verification failed, data corruption detected"));
289 }
290}
291
293 const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap,
294 std::function<void(DescriptorId_t, NTupleSize_t, const RClusterDescriptor::RPageRange::RPageInfo &)> perPageFunc)
295{
296 auto descriptorGuard = GetSharedDescriptorGuard();
297 const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterKey.fClusterId);
298
299 for (auto physicalColumnId : clusterKey.fPhysicalColumnSet) {
300 if (clusterDesc.GetColumnRange(physicalColumnId).fIsSuppressed)
301 continue;
302
303 const auto &pageRange = clusterDesc.GetPageRange(physicalColumnId);
304 NTupleSize_t pageNo = 0;
305 for (const auto &pageInfo : pageRange.fPageInfos) {
306 if (pageInfo.fLocator.fType == RNTupleLocator::kTypePageZero) {
307 pageZeroMap.Register(
308 ROnDiskPage::Key{physicalColumnId, pageNo},
309 ROnDiskPage(const_cast<void *>(RPage::GetPageZeroBuffer()), pageInfo.fLocator.fBytesOnStorage));
310 } else {
311 perPageFunc(physicalColumnId, pageNo, pageInfo);
312 }
313 ++pageNo;
314 }
315 }
316}
317
320{
321 const auto columnId = columnHandle.fPhysicalId;
322 auto cachedPageRef = fPagePool.GetPage(columnId, globalIndex);
323 if (!cachedPageRef.Get().IsNull())
324 return cachedPageRef;
325
326 std::uint64_t idxInCluster;
327 RClusterInfo clusterInfo;
328 {
329 auto descriptorGuard = GetSharedDescriptorGuard();
330 clusterInfo.fClusterId = descriptorGuard->FindClusterId(columnId, globalIndex);
331
332 if (clusterInfo.fClusterId == kInvalidDescriptorId)
333 throw RException(R__FAIL("entry with index " + std::to_string(globalIndex) + " out of bounds"));
334
335 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterInfo.fClusterId);
336 const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
337 if (columnRange.fIsSuppressed)
338 return RPageRef();
339
340 clusterInfo.fColumnOffset = columnRange.fFirstElementIndex;
341 R__ASSERT(clusterInfo.fColumnOffset <= globalIndex);
342 idxInCluster = globalIndex - clusterInfo.fColumnOffset;
343 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
344 }
345
347 throw RException(R__FAIL("tried to read a page with an unknown locator"));
348
349 return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
350}
351
354{
355 const auto clusterId = clusterIndex.GetClusterId();
356 const auto idxInCluster = clusterIndex.GetIndex();
357 const auto columnId = columnHandle.fPhysicalId;
358 auto cachedPageRef = fPagePool.GetPage(columnId, clusterIndex);
359 if (!cachedPageRef.Get().IsNull())
360 return cachedPageRef;
361
362 if (clusterId == kInvalidDescriptorId)
363 throw RException(R__FAIL("entry out of bounds"));
364
365 RClusterInfo clusterInfo;
366 {
367 auto descriptorGuard = GetSharedDescriptorGuard();
368 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
369 const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
370 if (columnRange.fIsSuppressed)
371 return RPageRef();
372
373 clusterInfo.fClusterId = clusterId;
374 clusterInfo.fColumnOffset = columnRange.fFirstElementIndex;
375 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
376 }
377
379 throw RException(R__FAIL("tried to read a page with an unknown locator"));
380
381 return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
382}
383
385{
386 fMetrics = Detail::RNTupleMetrics(prefix);
387 fCounters = std::make_unique<RCounters>(RCounters{
388 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nReadV", "", "number of vector read requests"),
389 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nRead", "", "number of byte ranges read"),
390 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szReadPayload", "B",
391 "volume read from storage (required)"),
392 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szReadOverhead", "B",
393 "volume read from storage (overhead)"),
394 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szUnzip", "B", "volume after unzipping"),
395 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nClusterLoaded", "",
396 "number of partial clusters preloaded from storage"),
397 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nPageRead", "", "number of pages read from storage"),
398 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nPageUnsealed", "",
399 "number of pages unzipped and decoded"),
400 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallRead", "ns", "wall clock time spent reading"),
401 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallUnzip", "ns",
402 "wall clock time spent decompressing"),
403 *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuRead", "ns",
404 "CPU time spent reading"),
405 *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuUnzip", "ns",
406 "CPU time spent decompressing"),
407 *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
408 "bwRead", "MB/s", "bandwidth compressed bytes read per second", fMetrics,
409 [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
410 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
411 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
412 if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
413 if (auto walltime = timeWallRead->GetValueAsInt()) {
414 double payload = szReadPayload->GetValueAsInt();
415 double overhead = szReadOverhead->GetValueAsInt();
416 // unit: bytes / nanosecond = GB/s
417 return {true, (1000. * (payload + overhead) / walltime)};
418 }
419 }
420 }
421 }
422 return {false, -1.};
423 }),
424 *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
425 "bwReadUnzip", "MB/s", "bandwidth uncompressed bytes read per second", fMetrics,
426 [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
427 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
428 if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
429 if (auto walltime = timeWallRead->GetValueAsInt()) {
430 double unzip = szUnzip->GetValueAsInt();
431 // unit: bytes / nanosecond = GB/s
432 return {true, 1000. * unzip / walltime};
433 }
434 }
435 }
436 return {false, -1.};
437 }),
438 *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
439 "bwUnzip", "MB/s", "decompression bandwidth of uncompressed bytes per second", fMetrics,
440 [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
441 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
442 if (const auto timeWallUnzip = metrics.GetLocalCounter("timeWallUnzip")) {
443 if (auto walltime = timeWallUnzip->GetValueAsInt()) {
444 double unzip = szUnzip->GetValueAsInt();
445 // unit: bytes / nanosecond = GB/s
446 return {true, 1000. * unzip / walltime};
447 }
448 }
449 }
450 return {false, -1.};
451 }),
452 *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
453 "rtReadEfficiency", "", "ratio of payload over all bytes read", fMetrics,
454 [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
455 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
456 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
457 if (auto payload = szReadPayload->GetValueAsInt()) {
458 // r/(r+o) = 1/((r+o)/r) = 1/(1 + o/r)
459 return {true, 1. / (1. + (1. * szReadOverhead->GetValueAsInt()) / payload)};
460 }
461 }
462 }
463 return {false, -1.};
464 }),
465 *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
466 "rtCompression", "", "ratio of compressed bytes / uncompressed bytes", fMetrics,
467 [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
468 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
469 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
470 if (auto unzip = szUnzip->GetValueAsInt()) {
471 return {true, (1. * szReadPayload->GetValueAsInt()) / unzip};
472 }
473 }
474 }
475 return {false, -1.};
476 })});
477}
478
481 DescriptorId_t physicalColumnId)
482{
483 return UnsealPage(sealedPage, element, physicalColumnId, *fPageAllocator);
484}
485
488 DescriptorId_t physicalColumnId, RPageAllocator &pageAlloc)
489{
490 // Unsealing a page zero is a no-op. `RPageRange::ExtendToFitColumnRange()` guarantees that the page zero buffer is
491 // large enough to hold `sealedPage.fNElements`
492 if (sealedPage.GetBuffer() == RPage::GetPageZeroBuffer()) {
493 auto page = RPage::MakePageZero(physicalColumnId, element.GetSize());
494 page.GrowUnchecked(sealedPage.GetNElements());
495 return page;
496 }
497
498 auto rv = sealedPage.VerifyChecksumIfEnabled();
499 if (!rv)
500 return R__FORWARD_ERROR(rv);
501
502 const auto bytesPacked = element.GetPackedSize(sealedPage.GetNElements());
503 auto page = pageAlloc.NewPage(physicalColumnId, element.GetPackedSize(), sealedPage.GetNElements());
504 if (sealedPage.GetDataSize() != bytesPacked) {
505 RNTupleDecompressor::Unzip(sealedPage.GetBuffer(), sealedPage.GetDataSize(), bytesPacked, page.GetBuffer());
506 } else {
507 // We cannot simply map the sealed page as we don't know its life time. Specialized page sources
508 // may decide to implement to not use UnsealPage but to custom mapping / decompression code.
509 // Note that usually pages are compressed.
510 memcpy(page.GetBuffer(), sealedPage.GetBuffer(), bytesPacked);
511 }
512
513 if (!element.IsMappable()) {
514 auto tmp = pageAlloc.NewPage(physicalColumnId, element.GetSize(), sealedPage.GetNElements());
515 element.Unpack(tmp.GetBuffer(), page.GetBuffer(), sealedPage.GetNElements());
516 page = std::move(tmp);
517 }
518
519 page.GrowUnchecked(sealedPage.GetNElements());
520 return page;
522
523//------------------------------------------------------------------------------
524
526{
527 // Make the sort order unique by adding the physical on-disk column id as a secondary key
528 if (fCurrentPageSize == other.fCurrentPageSize)
529 return fColumn->GetOnDiskId() > other.fColumn->GetOnDiskId();
530 return fCurrentPageSize > other.fCurrentPageSize;
531}
532
534 std::size_t pageSizeLimit)
536 if (fMaxAllocatedBytes - fCurrentAllocatedBytes >= targetAvailableSize)
537 return true;
538
539 auto itr = fColumnsSortedByPageSize.begin();
540 while (itr != fColumnsSortedByPageSize.end()) {
541 if (itr->fCurrentPageSize <= pageSizeLimit)
542 break;
543 if (itr->fCurrentPageSize == itr->fInitialPageSize) {
544 ++itr;
545 continue;
546 }
547
548 // Flushing the current column will invalidate itr
549 auto itrFlush = itr++;
550
551 RColumnInfo next;
552 if (itr != fColumnsSortedByPageSize.end())
553 next = *itr;
554
555 itrFlush->fColumn->Flush();
556 if (fMaxAllocatedBytes - fCurrentAllocatedBytes >= targetAvailableSize)
557 return true;
558
559 if (next.fColumn == nullptr)
560 return false;
561 itr = fColumnsSortedByPageSize.find(next);
562 };
563
564 return false;
565}
566
568{
569 const RColumnInfo key{&column, column.GetWritePageCapacity(), 0};
570 auto itr = fColumnsSortedByPageSize.find(key);
571 if (itr == fColumnsSortedByPageSize.end()) {
572 if (!TryEvict(newWritePageSize, 0))
573 return false;
574 fColumnsSortedByPageSize.insert({&column, newWritePageSize, newWritePageSize});
575 fCurrentAllocatedBytes += newWritePageSize;
576 return true;
577 }
578
579 RColumnInfo elem{*itr};
580 assert(newWritePageSize >= elem.fInitialPageSize);
581
582 if (newWritePageSize == elem.fCurrentPageSize)
583 return true;
584
585 fColumnsSortedByPageSize.erase(itr);
586
587 if (newWritePageSize < elem.fCurrentPageSize) {
588 // Page got smaller
589 fCurrentAllocatedBytes -= elem.fCurrentPageSize - newWritePageSize;
590 elem.fCurrentPageSize = newWritePageSize;
591 fColumnsSortedByPageSize.insert(elem);
592 return true;
593 }
594
595 // Page got larger, we may need to make space available
596 const auto diffBytes = newWritePageSize - elem.fCurrentPageSize;
597 if (!TryEvict(diffBytes, elem.fCurrentPageSize)) {
598 // Don't change anything, let the calling column flush itself
599 // TODO(jblomer): we may consider skipping the column in TryEvict and thus avoiding erase+insert
600 fColumnsSortedByPageSize.insert(elem);
601 return false;
602 }
603 fCurrentAllocatedBytes += diffBytes;
604 elem.fCurrentPageSize = newWritePageSize;
605 fColumnsSortedByPageSize.insert(elem);
606 return true;
607}
608
609//------------------------------------------------------------------------------
610
612 : RPageStorage(name), fOptions(options.Clone()), fWritePageMemoryManager(options.GetPageBufferBudget())
613{
614}
615
617
620{
621 assert(config.fPage);
622 assert(config.fElement);
623 assert(config.fBuffer);
624
625 unsigned char *pageBuf = reinterpret_cast<unsigned char *>(config.fPage->GetBuffer());
626 bool isAdoptedBuffer = true;
627 auto nBytesPacked = config.fPage->GetNBytes();
628 auto nBytesChecksum = config.fWriteChecksum * kNBytesPageChecksum;
629
630 if (!config.fElement->IsMappable()) {
631 nBytesPacked = config.fElement->GetPackedSize(config.fPage->GetNElements());
632 pageBuf = new unsigned char[nBytesPacked];
633 isAdoptedBuffer = false;
634 config.fElement->Pack(pageBuf, config.fPage->GetBuffer(), config.fPage->GetNElements());
635 }
636 auto nBytesZipped = nBytesPacked;
637
638 if ((config.fCompressionSetting != 0) || !config.fElement->IsMappable() || !config.fAllowAlias ||
639 config.fWriteChecksum) {
640 nBytesZipped = RNTupleCompressor::Zip(pageBuf, nBytesPacked, config.fCompressionSetting, config.fBuffer);
641 if (!isAdoptedBuffer)
642 delete[] pageBuf;
643 pageBuf = reinterpret_cast<unsigned char *>(config.fBuffer);
644 isAdoptedBuffer = true;
645 }
646
647 R__ASSERT(isAdoptedBuffer);
648
649 RSealedPage sealedPage{pageBuf, nBytesZipped + nBytesChecksum, config.fPage->GetNElements(), config.fWriteChecksum};
650 sealedPage.ChecksumIfEnabled();
651
652 return sealedPage;
653}
654
657{
658 const auto nBytes = page.GetNBytes() + GetWriteOptions().GetEnablePageChecksums() * kNBytesPageChecksum;
659 if (fSealPageBuffer.size() < nBytes)
660 fSealPageBuffer.resize(nBytes);
661
662 RSealPageConfig config;
663 config.fPage = &page;
664 config.fElement = &element;
665 config.fCompressionSetting = GetWriteOptions().GetCompression();
666 config.fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
667 config.fAllowAlias = true;
668 config.fBuffer = fSealPageBuffer.data();
669
670 return SealPage(config);
671}
672
674{
675 for (const auto &cb : fOnDatasetCommitCallbacks)
676 cb(*this);
677 CommitDatasetImpl();
678}
679
682{
683 R__ASSERT(nElements > 0);
684 const auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
685 const auto nBytes = elementSize * nElements;
686 if (!fWritePageMemoryManager.TryUpdate(*columnHandle.fColumn, nBytes))
687 return RPage();
688 return fPageAllocator->NewPage(columnHandle.fPhysicalId, elementSize, nElements);
689}
690
691//------------------------------------------------------------------------------
692
693std::unique_ptr<ROOT::Experimental::Internal::RPageSink>
694ROOT::Experimental::Internal::RPagePersistentSink::Create(std::string_view ntupleName, std::string_view location,
695 const RNTupleWriteOptions &options)
696{
697 if (ntupleName.empty()) {
698 throw RException(R__FAIL("empty RNTuple name"));
699 }
700 if (location.empty()) {
701 throw RException(R__FAIL("empty storage location"));
702 }
703 if (location.find("daos://") == 0) {
704#ifdef R__ENABLE_DAOS
705 return std::make_unique<RPageSinkDaos>(ntupleName, location, options);
706#else
707 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
708#endif
709 }
710
711 // Otherwise assume that the user wants us to create a file.
712 return std::make_unique<RPageSinkFile>(ntupleName, location, options);
713}
714
716 const RNTupleWriteOptions &options)
717 : RPageSink(name, options)
718{
719}
720
722
725{
726 auto columnId = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns();
727 RColumnDescriptorBuilder columnBuilder;
728 columnBuilder.LogicalColumnId(columnId)
729 .PhysicalColumnId(columnId)
730 .FieldId(fieldId)
732 .ValueRange(column.GetValueRange())
733 .Type(column.GetType())
734 .Index(column.GetIndex())
737 // For late model extension, we assume that the primary column representation is the active one for the
738 // deferred range. All other representations are suppressed.
739 if (column.GetFirstElementIndex() > 0 && column.GetRepresentationIndex() > 0)
740 columnBuilder.SetSuppressedDeferred();
741 fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
742 return ColumnHandle_t{columnId, &column};
743}
744
746 NTupleSize_t firstEntry)
747{
748 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
749
750 if (descriptor.GetNLogicalColumns() > descriptor.GetNPhysicalColumns()) {
751 // If we already have alias columns, add an offset to the alias columns so that the new physical columns
752 // of the changeset follow immediately the already existing physical columns
753 auto getNColumns = [](const RFieldBase &f) -> std::size_t {
754 const auto &reps = f.GetColumnRepresentatives();
755 if (reps.empty())
756 return 0;
757 return reps.size() * reps[0].size();
758 };
759 std::uint32_t nNewPhysicalColumns = 0;
760 for (auto f : changeset.fAddedFields) {
761 nNewPhysicalColumns += getNColumns(*f);
762 for (const auto &descendant : *f)
763 nNewPhysicalColumns += getNColumns(descendant);
764 }
765 fDescriptorBuilder.ShiftAliasColumns(nNewPhysicalColumns);
766 }
767
768 auto addField = [&](RFieldBase &f) {
769 auto fieldId = descriptor.GetNFields();
770 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
771 fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
772 f.SetOnDiskId(fieldId);
773 CallConnectPageSinkOnField(f, *this, firstEntry); // issues in turn calls to `AddColumn()`
774 };
775 auto addProjectedField = [&](RFieldBase &f) {
776 auto fieldId = descriptor.GetNFields();
777 auto sourceFieldId = GetProjectedFieldsOfModel(changeset.fModel).GetSourceField(&f)->GetOnDiskId();
778 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
779 fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
780 fDescriptorBuilder.AddFieldProjection(sourceFieldId, fieldId);
781 f.SetOnDiskId(fieldId);
782 for (const auto &source : descriptor.GetColumnIterable(sourceFieldId)) {
783 auto targetId = descriptor.GetNLogicalColumns();
784 RColumnDescriptorBuilder columnBuilder;
785 columnBuilder.LogicalColumnId(targetId)
786 .PhysicalColumnId(source.GetLogicalId())
787 .FieldId(fieldId)
788 .BitsOnStorage(source.GetBitsOnStorage())
789 .ValueRange(source.GetValueRange())
790 .Type(source.GetType())
791 .Index(source.GetIndex())
792 .RepresentationIndex(source.GetRepresentationIndex());
793 fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
794 }
795 };
796
797 R__ASSERT(firstEntry >= fPrevClusterNEntries);
798 const auto nColumnsBeforeUpdate = descriptor.GetNPhysicalColumns();
799 for (auto f : changeset.fAddedFields) {
800 addField(*f);
801 for (auto &descendant : *f)
802 addField(descendant);
803 }
804 for (auto f : changeset.fAddedProjectedFields) {
805 addProjectedField(*f);
806 for (auto &descendant : *f)
807 addProjectedField(descendant);
808 }
809
810 const auto nColumns = descriptor.GetNPhysicalColumns();
811 for (DescriptorId_t i = nColumnsBeforeUpdate; i < nColumns; ++i) {
813 columnRange.fPhysicalColumnId = i;
814 // We set the first element index in the current cluster to the first element that is part of a materialized page
815 // (i.e., that is part of a page list). For columns created during late model extension, however, the column range
816 // is fixed up as needed by `RClusterDescriptorBuilder::AddExtendedColumnRanges()` on read back.
817 columnRange.fFirstElementIndex = descriptor.GetColumnDescriptor(i).GetFirstElementIndex();
818 columnRange.fNElements = 0;
819 columnRange.fCompressionSettings = GetWriteOptions().GetCompression();
820 fOpenColumnRanges.emplace_back(columnRange);
822 pageRange.fPhysicalColumnId = i;
823 fOpenPageRanges.emplace_back(std::move(pageRange));
824 }
825
826 // Mapping of memory to on-disk column IDs usually happens during serialization of the ntuple header. If the
827 // header was already serialized, this has to be done manually as it is required for page list serialization.
828 if (fSerializationContext.GetHeaderSize() > 0)
829 fSerializationContext.MapSchema(descriptor, /*forHeaderExtension=*/true);
830}
831
833 const RExtraTypeInfoDescriptor &extraTypeInfo)
834{
835 if (extraTypeInfo.GetContentId() != EExtraTypeInfoIds::kStreamerInfo)
836 throw RException(R__FAIL("ROOT bug: unexpected type extra info in UpdateExtraTypeInfo()"));
837
838 fStreamerInfos.merge(RNTupleSerializer::DeserializeStreamerInfos(extraTypeInfo.GetContent()).Unwrap());
839}
840
842{
843 fDescriptorBuilder.SetNTuple(fNTupleName, model.GetDescription());
844 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
845
846 auto &fieldZero = Internal::GetFieldZeroOfModel(model);
847 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(fieldZero).FieldId(0).MakeDescriptor().Unwrap());
848 fieldZero.SetOnDiskId(0);
849 auto &projectedFields = GetProjectedFieldsOfModel(model);
850 projectedFields.GetFieldZero().SetOnDiskId(0);
851
852 RNTupleModelChangeset initialChangeset{model};
853 for (auto f : fieldZero.GetSubFields())
854 initialChangeset.fAddedFields.emplace_back(f);
855 for (auto f : projectedFields.GetFieldZero().GetSubFields())
856 initialChangeset.fAddedProjectedFields.emplace_back(f);
857 UpdateSchema(initialChangeset, 0U);
858
859 fSerializationContext = RNTupleSerializer::SerializeHeader(nullptr, descriptor);
860 auto buffer = std::make_unique<unsigned char[]>(fSerializationContext.GetHeaderSize());
861 fSerializationContext = RNTupleSerializer::SerializeHeader(buffer.get(), descriptor);
862 InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());
863
864 fDescriptorBuilder.BeginHeaderExtension();
865}
866
868{
869 {
870 auto model = descriptor.CreateModel();
871 Init(*model.get());
872 }
873
874 auto clusterId = descriptor.FindClusterId(0, 0);
875
876 while (clusterId != ROOT::Experimental::kInvalidDescriptorId) {
877 auto &cluster = descriptor.GetClusterDescriptor(clusterId);
878 auto nEntries = cluster.GetNEntries();
879
880 RClusterDescriptorBuilder clusterBuilder;
881 clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
882 .FirstEntryIndex(fPrevClusterNEntries)
883 .NEntries(nEntries);
884
885 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
886 R__ASSERT(fOpenColumnRanges[i].fPhysicalColumnId == i);
887 const auto &columnRange = cluster.GetColumnRange(i);
888 R__ASSERT(columnRange.fPhysicalColumnId == i);
889 const auto &pageRange = cluster.GetPageRange(i);
890 R__ASSERT(pageRange.fPhysicalColumnId == i);
891 clusterBuilder.CommitColumnRange(i, fOpenColumnRanges[i].fFirstElementIndex, columnRange.fCompressionSettings,
892 pageRange);
893 fOpenColumnRanges[i].fFirstElementIndex += columnRange.fNElements;
894 }
895 fDescriptorBuilder.AddCluster(clusterBuilder.MoveDescriptor().Unwrap());
896 fPrevClusterNEntries += nEntries;
897
898 clusterId = descriptor.FindNextClusterId(clusterId);
899 }
900}
901
903{
904 fOpenColumnRanges.at(columnHandle.fPhysicalId).fIsSuppressed = true;
905}
906
908{
909 fOpenColumnRanges.at(columnHandle.fPhysicalId).fNElements += page.GetNElements();
910
912 pageInfo.fNElements = page.GetNElements();
913 pageInfo.fLocator = CommitPageImpl(columnHandle, page);
914 pageInfo.fHasChecksum = GetWriteOptions().GetEnablePageChecksums();
915 fOpenPageRanges.at(columnHandle.fPhysicalId).fPageInfos.emplace_back(pageInfo);
916}
917
919 const RPageStorage::RSealedPage &sealedPage)
920{
921 fOpenColumnRanges.at(physicalColumnId).fNElements += sealedPage.GetNElements();
922
924 pageInfo.fNElements = sealedPage.GetNElements();
925 pageInfo.fLocator = CommitSealedPageImpl(physicalColumnId, sealedPage);
926 pageInfo.fHasChecksum = sealedPage.GetHasChecksum();
927 fOpenPageRanges.at(physicalColumnId).fPageInfos.emplace_back(pageInfo);
928}
929
930std::vector<ROOT::Experimental::RNTupleLocator>
932 std::span<RPageStorage::RSealedPageGroup> ranges, const std::vector<bool> &mask)
933{
934 std::vector<ROOT::Experimental::RNTupleLocator> locators;
935 locators.reserve(mask.size());
936 std::size_t i = 0;
937 for (auto &range : ranges) {
938 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
939 if (mask[i++])
940 locators.push_back(CommitSealedPageImpl(range.fPhysicalColumnId, *sealedPageIt));
941 }
942 }
943 locators.shrink_to_fit();
944 return locators;
945}
946
948 std::span<RPageStorage::RSealedPageGroup> ranges)
949{
950 /// Used in the `originalPages` map
951 struct RSealedPageLink {
952 const RSealedPage *fSealedPage = nullptr; ///< Points to the first occurrence of a page with a specific checksum
953 std::size_t fLocatorIdx = 0; ///< The index in the locator vector returned by CommitSealedPageVImpl()
954 };
955
956 std::vector<bool> mask;
957 // For every sealed page, stores the corresponding index in the locator vector returned by CommitSealedPageVImpl()
958 std::vector<std::size_t> locatorIndexes;
959 // Maps page checksums to the first sealed page with that checksum
960 std::unordered_map<std::uint64_t, RSealedPageLink> originalPages;
961 std::size_t iLocator = 0;
962 for (auto &range : ranges) {
963 const auto rangeSize = std::distance(range.fFirst, range.fLast);
964 mask.reserve(mask.size() + rangeSize);
965 locatorIndexes.reserve(locatorIndexes.size() + rangeSize);
966
967 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
968 if (!fFeatures.fCanMergePages || !sealedPageIt->GetHasChecksum()) {
969 mask.emplace_back(true);
970 locatorIndexes.emplace_back(iLocator++);
971 continue;
972 }
973
974 const auto chk = sealedPageIt->GetChecksum().Unwrap();
975 auto itr = originalPages.find(chk);
976 if (itr == originalPages.end()) {
977 originalPages.insert({chk, {&(*sealedPageIt), iLocator}});
978 mask.emplace_back(true);
979 locatorIndexes.emplace_back(iLocator++);
980 continue;
981 }
982
983 const auto *p = itr->second.fSealedPage;
984 if (sealedPageIt->GetDataSize() != p->GetDataSize() ||
985 memcmp(sealedPageIt->GetBuffer(), p->GetBuffer(), p->GetDataSize())) {
986 mask.emplace_back(true);
987 locatorIndexes.emplace_back(iLocator++);
988 continue;
989 }
990
991 mask.emplace_back(false);
992 locatorIndexes.emplace_back(itr->second.fLocatorIdx);
993 }
994
995 mask.shrink_to_fit();
996 locatorIndexes.shrink_to_fit();
997 }
998
999 auto locators = CommitSealedPageVImpl(ranges, mask);
1000 unsigned i = 0;
1001
1002 for (auto &range : ranges) {
1003 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
1004 fOpenColumnRanges.at(range.fPhysicalColumnId).fNElements += sealedPageIt->GetNElements();
1005
1007 pageInfo.fNElements = sealedPageIt->GetNElements();
1008 pageInfo.fLocator = locators[locatorIndexes[i++]];
1009 pageInfo.fHasChecksum = sealedPageIt->GetHasChecksum();
1010 fOpenPageRanges.at(range.fPhysicalColumnId).fPageInfos.emplace_back(pageInfo);
1011 }
1012 }
1013}
1014
1017{
1018 RStagedCluster stagedCluster;
1019 stagedCluster.fNBytesWritten = StageClusterImpl();
1020 stagedCluster.fNEntries = nNewEntries;
1021
1022 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
1023 RStagedCluster::RColumnInfo columnInfo;
1024 if (fOpenColumnRanges[i].fIsSuppressed) {
1025 assert(fOpenPageRanges[i].fPageInfos.empty());
1026 columnInfo.fPageRange.fPhysicalColumnId = i;
1027 columnInfo.fIsSuppressed = true;
1028 // We reset suppressed columns to the state they would have if they were active (not suppressed).
1029 fOpenColumnRanges[i].fNElements = 0;
1030 fOpenColumnRanges[i].fIsSuppressed = false;
1031 } else {
1032 std::swap(columnInfo.fPageRange, fOpenPageRanges[i]);
1033 fOpenPageRanges[i].fPhysicalColumnId = i;
1034
1035 columnInfo.fNElements = fOpenColumnRanges[i].fNElements;
1036 fOpenColumnRanges[i].fNElements = 0;
1037 }
1038 stagedCluster.fColumnInfos.push_back(std::move(columnInfo));
1039 }
1040
1041 return stagedCluster;
1042}
1043
1045{
1046 for (const auto &cluster : clusters) {
1047 RClusterDescriptorBuilder clusterBuilder;
1048 clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
1049 .FirstEntryIndex(fPrevClusterNEntries)
1050 .NEntries(cluster.fNEntries);
1051 for (const auto &columnInfo : cluster.fColumnInfos) {
1052 DescriptorId_t colId = columnInfo.fPageRange.fPhysicalColumnId;
1053 if (columnInfo.fIsSuppressed) {
1054 assert(columnInfo.fPageRange.fPageInfos.empty());
1055 clusterBuilder.MarkSuppressedColumnRange(colId);
1056 } else {
1057 clusterBuilder.CommitColumnRange(colId, fOpenColumnRanges[colId].fFirstElementIndex,
1058 fOpenColumnRanges[colId].fCompressionSettings, columnInfo.fPageRange);
1059 fOpenColumnRanges[colId].fFirstElementIndex += columnInfo.fNElements;
1060 }
1061 }
1062
1063 clusterBuilder.CommitSuppressedColumnRanges(fDescriptorBuilder.GetDescriptor()).ThrowOnError();
1064 for (const auto &columnInfo : cluster.fColumnInfos) {
1065 if (!columnInfo.fIsSuppressed)
1066 continue;
1067 DescriptorId_t colId = columnInfo.fPageRange.fPhysicalColumnId;
1068 // For suppressed columns, we need to reset the first element index to the first element of the next (upcoming)
1069 // cluster. This information has been determined for the committed cluster descriptor through
1070 // CommitSuppressedColumnRanges(), so we can use the information from the descriptor.
1071 const auto &columnRangeFromDesc = clusterBuilder.GetColumnRange(colId);
1072 fOpenColumnRanges[colId].fFirstElementIndex =
1073 columnRangeFromDesc.fFirstElementIndex + columnRangeFromDesc.fNElements;
1074 }
1075
1076 fDescriptorBuilder.AddCluster(clusterBuilder.MoveDescriptor().Unwrap());
1077 fPrevClusterNEntries += cluster.fNEntries;
1078 }
1079}
1080
1082{
1083 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
1084
1085 const auto nClusters = descriptor.GetNActiveClusters();
1086 std::vector<DescriptorId_t> physClusterIDs;
1087 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
1088 physClusterIDs.emplace_back(fSerializationContext.MapClusterId(i));
1089 }
1090
1091 auto szPageList = RNTupleSerializer::SerializePageList(nullptr, descriptor, physClusterIDs, fSerializationContext);
1092 auto bufPageList = std::make_unique<unsigned char[]>(szPageList);
1093 RNTupleSerializer::SerializePageList(bufPageList.get(), descriptor, physClusterIDs, fSerializationContext);
1094
1095 const auto clusterGroupId = descriptor.GetNClusterGroups();
1096 const auto locator = CommitClusterGroupImpl(bufPageList.get(), szPageList);
1098 cgBuilder.ClusterGroupId(clusterGroupId).PageListLocator(locator).PageListLength(szPageList);
1099 if (fNextClusterInGroup == nClusters) {
1100 cgBuilder.MinEntry(0).EntrySpan(0).NClusters(0);
1101 } else {
1102 const auto &firstClusterDesc = descriptor.GetClusterDescriptor(fNextClusterInGroup);
1103 const auto &lastClusterDesc = descriptor.GetClusterDescriptor(nClusters - 1);
1104 cgBuilder.MinEntry(firstClusterDesc.GetFirstEntryIndex())
1105 .EntrySpan(lastClusterDesc.GetFirstEntryIndex() + lastClusterDesc.GetNEntries() -
1106 firstClusterDesc.GetFirstEntryIndex())
1107 .NClusters(nClusters - fNextClusterInGroup);
1108 }
1109 std::vector<DescriptorId_t> clusterIds;
1110 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
1111 clusterIds.emplace_back(i);
1112 }
1113 cgBuilder.AddClusters(clusterIds);
1114 fDescriptorBuilder.AddClusterGroup(cgBuilder.MoveDescriptor().Unwrap());
1115 fSerializationContext.MapClusterGroupId(clusterGroupId);
1116
1117 fNextClusterInGroup = nClusters;
1118}
1119
1121{
1122 if (!fStreamerInfos.empty()) {
1123 RExtraTypeInfoDescriptorBuilder extraInfoBuilder;
1126 fDescriptorBuilder.AddExtraTypeInfo(extraInfoBuilder.MoveDescriptor().Unwrap());
1127 }
1128
1129 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
1130
1131 auto szFooter = RNTupleSerializer::SerializeFooter(nullptr, descriptor, fSerializationContext);
1132 auto bufFooter = std::make_unique<unsigned char[]>(szFooter);
1133 RNTupleSerializer::SerializeFooter(bufFooter.get(), descriptor, fSerializationContext);
1134
1135 CommitDatasetImpl(bufFooter.get(), szFooter);
1136}
1137
1139{
1140 fMetrics = Detail::RNTupleMetrics(prefix);
1141 fCounters = std::make_unique<RCounters>(RCounters{
1142 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nPageCommitted", "",
1143 "number of pages committed to storage"),
1144 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szWritePayload", "B",
1145 "volume written for committed pages"),
1146 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szZip", "B", "volume before zipping"),
1147 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallWrite", "ns", "wall clock time spent writing"),
1148 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallZip", "ns", "wall clock time spent compressing"),
1149 *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuWrite", "ns",
1150 "CPU time spent writing"),
1151 *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuZip", "ns",
1152 "CPU time spent compressing")});
1153}
fBuffer
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking)
Definition RError.hxx:294
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:290
#define f(i)
Definition RSha256.hxx:104
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
winID h TVirtualViewer3D TVirtualGLPainter p
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t mask
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
char name[80]
Definition TGX11.cxx:110
A thread-safe integral performance counter.
A metric element that computes its floating point value from other counters.
std::int64_t GetValueAsInt() const override
A collection of Counter objects with a name, a unit, and a description.
const RNTuplePerfCounter * GetLocalCounter(std::string_view name) const
Searches counters registered in this object only. Returns nullptr if name is not found.
An either thread-safe or non thread safe counter for CPU ticks.
Record wall time and CPU time between construction and destruction.
A helper class for piece-wise construction of an RClusterDescriptor.
RResult< RClusterDescriptor > MoveDescriptor()
Move out the full cluster descriptor including page locations.
RClusterDescriptorBuilder & ClusterId(DescriptorId_t clusterId)
RClusterDescriptorBuilder & NEntries(std::uint64_t nEntries)
RClusterDescriptorBuilder & FirstEntryIndex(std::uint64_t firstEntryIndex)
const RClusterDescriptor::RColumnRange & GetColumnRange(DescriptorId_t physicalId)
RResult< void > MarkSuppressedColumnRange(DescriptorId_t physicalId)
Books the given column ID as being suppressed in this cluster.
RResult< void > CommitColumnRange(DescriptorId_t physicalId, std::uint64_t firstElementIndex, std::uint32_t compressionSettings, const RClusterDescriptor::RPageRange &pageRange)
RResult< void > CommitSuppressedColumnRanges(const RNTupleDescriptor &desc)
Sets the first element index and number of elements for all the suppressed column ranges.
A helper class for piece-wise construction of an RClusterGroupDescriptor.
RClusterGroupDescriptorBuilder & PageListLocator(const RNTupleLocator &pageListLocator)
void AddClusters(const std::vector< DescriptorId_t > &clusterIds)
RClusterGroupDescriptorBuilder & MinEntry(std::uint64_t minEntry)
RClusterGroupDescriptorBuilder & ClusterGroupId(DescriptorId_t clusterGroupId)
RClusterGroupDescriptorBuilder & EntrySpan(std::uint64_t entrySpan)
RClusterGroupDescriptorBuilder & NClusters(std::uint32_t nClusters)
RClusterGroupDescriptorBuilder & PageListLength(std::uint64_t pageListLength)
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:152
const ColumnSet_t & GetAvailPhysicalColumns() const
Definition RCluster.hxx:196
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
Definition RCluster.cxx:32
std::unordered_set< DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:154
A helper class for piece-wise construction of an RColumnDescriptor.
RColumnDescriptorBuilder & PhysicalColumnId(DescriptorId_t physicalColumnId)
RColumnDescriptorBuilder & Type(EColumnType type)
RColumnDescriptorBuilder & BitsOnStorage(std::uint16_t bitsOnStorage)
RColumnDescriptorBuilder & RepresentationIndex(std::uint16_t representationIndex)
RColumnDescriptorBuilder & FieldId(DescriptorId_t fieldId)
RColumnDescriptorBuilder & Index(std::uint32_t index)
RColumnDescriptorBuilder & FirstElementIndex(std::uint64_t firstElementIdx)
RResult< RColumnDescriptor > MakeDescriptor() const
Attempt to make a column descriptor.
RColumnDescriptorBuilder & LogicalColumnId(DescriptorId_t logicalColumnId)
RColumnDescriptorBuilder & ValueRange(double min, double max)
A column element encapsulates the translation between basic C++ types and their column representation...
virtual bool IsMappable() const
Derived, typed classes tell whether the on-storage layout is bitwise identical to the memory layout.
virtual void Pack(void *destination, const void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, packing creates an on-disk page from an in-...
virtual void Unpack(void *destination, const void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, unpacking creates a memory page from an on-...
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
If CppT == void, use the default C++ type for the given column type.
std::size_t GetPackedSize(std::size_t nElements=1U) const
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
Definition RColumn.hxx:40
RColumnElementBase * GetElement() const
Definition RColumn.hxx:329
DescriptorId_t GetOnDiskId() const
Definition RColumn.hxx:343
std::optional< std::pair< double, double > > GetValueRange() const
Definition RColumn.hxx:336
std::uint16_t GetRepresentationIndex() const
Definition RColumn.hxx:342
std::size_t GetWritePageCapacity() const
Definition RColumn.hxx:351
NTupleSize_t GetFirstElementIndex() const
Definition RColumn.hxx:344
std::uint16_t GetBitsOnStorage() const
Definition RColumn.hxx:331
A helper class for piece-wise construction of an RExtraTypeInfoDescriptor.
RExtraTypeInfoDescriptorBuilder & Content(const std::string &content)
RExtraTypeInfoDescriptorBuilder & ContentId(EExtraTypeInfoIds contentId)
static RFieldDescriptorBuilder FromField(const RFieldBase &field)
Make a new RFieldDescriptorBuilder based off a live NTuple field.
RResult< RFieldDescriptor > MakeDescriptor() const
Attempt to make a field descriptor.
RFieldDescriptorBuilder & FieldId(DescriptorId_t fieldId)
size_t Zip(const void *from, size_t nbytes, int compression, Writer_t fnWriter)
Returns the size of the compressed data.
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size of the from buffer.
static std::uint32_t SerializeXxHash3(const unsigned char *data, std::uint64_t length, std::uint64_t &xxhash3, void *buffer)
Writes a XxHash-3 64bit checksum of the byte range given by data and length.
static RContext SerializeHeader(void *buffer, const RNTupleDescriptor &desc)
static std::string SerializeStreamerInfos(const StreamerInfoMap_t &infos)
static std::uint32_t SerializePageList(void *buffer, const RNTupleDescriptor &desc, std::span< DescriptorId_t > physClusterIDs, const RContext &context)
static RResult< StreamerInfoMap_t > DeserializeStreamerInfos(const std::string &extraTypeInfoContent)
static RResult< void > VerifyXxHash3(const unsigned char *data, std::uint64_t length, std::uint64_t &xxhash3)
Expects an xxhash3 checksum in the 8 bytes following data + length and verifies it.
static std::uint32_t DeserializeUInt64(const void *buffer, std::uint64_t &val)
static std::uint32_t SerializeFooter(void *buffer, const RNTupleDescriptor &desc, const RContext &context)
A memory region that contains packed and compressed pages.
Definition RCluster.hxx:103
void Register(const ROnDiskPage::Key &key, const ROnDiskPage &onDiskPage)
Inserts information about a page stored in fMemory.
Definition RCluster.hxx:120
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:42
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
virtual RPage NewPage(ColumnId_t columnId, std::size_t elementSize, std::size_t nElements)=0
Reserves memory large enough to hold nElements of the given size.
ColumnHandle_t AddColumn(DescriptorId_t fieldId, RColumn &column) final
Register a new column.
RStagedCluster StageCluster(NTupleSize_t nNewEntries) final
Stage the current cluster and create a new one for the following data.
virtual void InitImpl(unsigned char *serializedHeader, std::uint32_t length)=0
virtual std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges, const std::vector< bool > &mask)
Vector commit of preprocessed pages.
void InitFromDescriptor(const RNTupleDescriptor &descriptor)
Initialize sink based on an existing descriptor and fill into the descriptor builder.
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final
Write a page to the storage. The column must have been added before.
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const RNTupleWriteOptions &options=RNTupleWriteOptions())
Guess the concrete derived page source from the location.
RPagePersistentSink(std::string_view ntupleName, const RNTupleWriteOptions &options)
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
void CommitSealedPage(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
void CommitSuppressedColumn(ColumnHandle_t columnHandle) final
Commits a suppressed column for the current cluster.
void CommitStagedClusters(std::span< RStagedCluster > clusters) final
Commit staged clusters, logically appending them to the ntuple descriptor.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo) final
Adds an extra type information record to schema.
Reference to a page stored in the page pool.
Definition RPagePool.hxx:85
Abstract interface to write data into an ntuple.
RPageSink(std::string_view ntupleName, const RNTupleWriteOptions &options)
void CommitDataset()
Run the registered callbacks and finalize the current cluster and the entire data set.
virtual RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements)
Get a new, empty page for the given column that can be filled with up to nElements; nElements must be...
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element)
Helper for streaming a page.
ColumnHandle_t AddColumn(DescriptorId_t fieldId, RColumn &column) override
Register a new column.
void LoadStructure()
Loads header and footer without decompressing or deserializing them.
RPageSource(std::string_view ntupleName, const RNTupleReadOptions &fOptions)
void PrepareLoadCluster(const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap, std::function< void(DescriptorId_t, NTupleSize_t, const RClusterDescriptor::RPageRange::RPageInfo &)> perPageFunc)
Prepare a page range read for the column set in clusterKey.
std::unique_ptr< RPageSource > Clone() const
Open the same storage multiple time, e.g.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
void DropColumn(ColumnHandle_t columnHandle) override
Unregisters a column.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const RNTupleReadOptions &options=RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
NTupleSize_t GetNElements(ColumnHandle_t columnHandle)
void UnzipCluster(RCluster *cluster)
Parallel decompression and unpacking of the pages in the given cluster.
virtual RPageRef LoadPage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex)
Allocates and fills a page that contains the index-th element.
void SetEntryRange(const REntryRange &range)
Promise to only read from the given entry range.
virtual void UnzipClusterImpl(RCluster *cluster)
void Attach()
Open the physical storage container and deserialize header and footer.
static RResult< RPage > UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element, DescriptorId_t physicalColumnId, RPageAllocator &pageAlloc)
Helper for unstreaming a page.
Common functionality of an ntuple storage for both reading and writing.
Stores information about the cluster in which this page resides.
Definition RPage.hxx:55
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:46
std::size_t GetNBytes() const
The space taken by column elements in the buffer.
Definition RPage.hxx:122
static RPage MakePageZero(ColumnId_t columnId, ClusterSize_t::ValueType elementSize)
Make a 'zero' page for column columnId (that is comprised of 0x00 bytes only).
Definition RPage.hxx:174
std::uint32_t GetNElements() const
Definition RPage.hxx:131
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Definition RPage.cxx:25
const RFieldBase * GetSourceField(const RFieldBase *target) const
bool TryUpdate(RColumn &column, std::size_t newWritePageSize)
Try to register the new write page size for the given column.
bool TryEvict(std::size_t targetAvailableSize, std::size_t pageSizeLimit)
Flush columns in order of allocated write page size until the sum of all write page allocations leave...
Records the partition of data into pages for a particular column in a particular cluster.
Meta-data for a set of ntuple clusters.
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
DescriptorId_t GetClusterId() const
ClusterSize_t::ValueType GetIndex() const
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
Field specific extra type information from the header / extension header.
A field translates read and write calls from/to underlying columns to/from tree values.
DescriptorId_t GetOnDiskId() const
The on-storage meta-data of an ntuple.
std::unique_ptr< RNTupleModel > CreateModel(const RCreateModelOptions &options=RCreateModelOptions()) const
Re-create the C++ model from the stored meta-data.
DescriptorId_t FindNextClusterId(DescriptorId_t clusterId) const
DescriptorId_t FindClusterId(DescriptorId_t physicalColumnId, NTupleSize_t index) const
const RClusterDescriptor & GetClusterDescriptor(DescriptorId_t clusterId) const
The RNTupleModel encapsulates the schema of an ntuple.
const std::string & GetDescription() const
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
Definition RError.hxx:281
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition RError.hxx:194
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
void CallConnectPageSinkOnField(RFieldBase &, RPageSink &, NTupleSize_t firstEntry=0)
Definition RField.cxx:406
RFieldZero & GetFieldZeroOfModel(RNTupleModel &model)
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr NTupleSize_t kInvalidNTupleIndex
constexpr DescriptorId_t kInvalidDescriptorId
The identifiers that specifies the content of a (partial) cluster.
Definition RCluster.hxx:156
The incremental changes to a RNTupleModel
std::vector< RFieldBase * > fAddedProjectedFields
Points to the projected fields in fModel that were added as part of an updater transaction.
std::vector< RFieldBase * > fAddedFields
Points to the fields in fModel that were added as part of an updater transaction.
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:52
Default I/O performance counters that get registered in fMetrics.
const RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
bool fAllowAlias
If false, the output buffer must not point to the input page buffer, which would otherwise be an opti...
int fCompressionSetting
Compression algorithm and level to apply.
bool fWriteChecksum
Adds a 8 byte little-endian xxhash3 checksum to the page payload.
Cluster that was staged, but not yet logically appended to the RNTuple.
Summarizes cluster-level information that are necessary to load a certain page.
std::uint64_t fColumnOffset
The first element number of the page's column in the given cluster.
RClusterDescriptor::RPageRange::RPageInfoExtended fPageInfo
Location of the page on disk.
Default I/O performance counters that get registered in fMetrics
bool IntersectsWith(const RClusterDescriptor &clusterDesc) const
Returns true if the given cluster has entries within the entry range.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RResult< std::uint64_t > GetChecksum() const
Returns a failure if the sealed page has no checksum.
The window of element indexes of a particular column in a particular cluster.
NTupleSize_t fFirstElementIndex
The global index of the first column element in the cluster.
int fCompressionSettings
The usual format for ROOT compression settings (see Compression.h).
ClusterSize_t fNElements
The number of column elements in the cluster.
We do not need to store the element size / uncompressed page size because we know to which column the...
std::uint32_t fNElements
The sum of the elements of all the pages must match the corresponding fNElements field in fColumnRang...
bool fHasChecksum
If true, the 8 bytes following the serialized page are an xxhash of the on-disk page data.
RNTupleLocator fLocator
The meaning of fLocator depends on the storage backend.
ELocatorType fType
For non-disk locators, the value for the Type field.