Logo ROOT  
Reference Guide
RPageStorageFile.cxx
Go to the documentation of this file.
1/// \file RPageStorageFile.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2019-11-25
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RCluster.hxx>
17#include <ROOT/RClusterPool.hxx>
18#include <ROOT/RField.hxx>
19#include <ROOT/RLogger.hxx>
21#include <ROOT/RNTupleModel.hxx>
23#include <ROOT/RNTupleZip.hxx>
24#include <ROOT/RPage.hxx>
26#include <ROOT/RPagePool.hxx>
28#include <ROOT/RRawFile.hxx>
29
30#include <RVersion.h>
31#include <TError.h>
32
33#include <algorithm>
34#include <cstdio>
35#include <cstdlib>
36#include <iostream>
37#include <utility>
38
39#include <atomic>
40#include <condition_variable>
41#include <functional>
42#include <mutex>
43#include <thread>
44#include <queue>
45
47 const RNTupleWriteOptions &options)
48 : RPageSink(ntupleName, options)
49 , fPageAllocator(std::make_unique<RPageAllocatorHeap>())
50{
51 R__LOG_WARNING(NTupleLog()) << "The RNTuple file format will change. " <<
52 "Do not store real data with this version of RNTuple!";
53 fCompressor = std::make_unique<RNTupleCompressor>();
54 EnableDefaultMetrics("RPageSinkFile");
55}
56
57
59 const RNTupleWriteOptions &options)
60 : RPageSinkFile(ntupleName, options)
61{
62 fWriter = std::unique_ptr<Internal::RNTupleFileWriter>(Internal::RNTupleFileWriter::Recreate(
63 ntupleName, path, options.GetCompression(), options.GetContainerFormat()));
64}
65
66
68 const RNTupleWriteOptions &options)
69 : RPageSinkFile(ntupleName, options)
70{
71 fWriter = std::unique_ptr<Internal::RNTupleFileWriter>(Internal::RNTupleFileWriter::Append(ntupleName, file));
72}
73
74
76 const RNTupleWriteOptions &options, std::unique_ptr<TFile> &file)
77 : RPageSinkFile(ntupleName, options)
78{
79 fWriter = std::unique_ptr<Internal::RNTupleFileWriter>(
81}
82
83
85{
86}
87
89 unsigned char *serializedHeader, std::uint32_t length)
90{
91 auto zipBuffer = std::make_unique<unsigned char[]>(length);
92 auto szZipHeader = fCompressor->Zip(serializedHeader, length, GetWriteOptions().GetCompression(),
94 fWriter->WriteNTupleHeader(zipBuffer.get(), szZipHeader, length);
95}
96
97
100 const RPageStorage::RSealedPage &sealedPage, std::size_t bytesPacked)
101{
102 std::uint64_t offsetData;
103 {
104 RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
105 offsetData = fWriter->WriteBlob(sealedPage.fBuffer, sealedPage.fSize, bytesPacked);
106 }
107
109 result.fPosition = offsetData;
110 result.fBytesOnStorage = sealedPage.fSize;
111 fCounters->fNPageCommitted.Inc();
112 fCounters->fSzWritePayload.Add(sealedPage.fSize);
113 fNBytesCurrentCluster += sealedPage.fSize;
114 return result;
115}
116
117
120{
121 auto element = columnHandle.fColumn->GetElement();
122 RPageStorage::RSealedPage sealedPage;
123 {
124 RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
125 sealedPage = SealPage(page, *element, GetWriteOptions().GetCompression());
126 }
127
128 fCounters->fSzZip.Add(page.GetNBytes());
129 return WriteSealedPage(sealedPage, element->GetPackedSize(page.GetNElements()));
130}
131
132
135 DescriptorId_t columnId, const RPageStorage::RSealedPage &sealedPage)
136{
137 const auto bitsOnStorage = RColumnElementBase::GetBitsOnStorage(
138 fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(columnId).GetModel().GetType());
139 const auto bytesPacked = (bitsOnStorage * sealedPage.fNElements + 7) / 8;
140
141 return WriteSealedPage(sealedPage, bytesPacked);
142}
143
144
145std::uint64_t
147{
148 auto result = fNBytesCurrentCluster;
149 fNBytesCurrentCluster = 0;
150 return result;
151}
152
155 std::uint32_t length)
156{
157 auto bufPageListZip = std::make_unique<unsigned char[]>(length);
158 auto szPageListZip = fCompressor->Zip(serializedPageList, length, GetWriteOptions().GetCompression(),
159 RNTupleCompressor::MakeMemCopyWriter(bufPageListZip.get()));
160
162 result.fBytesOnStorage = szPageListZip;
163 result.fPosition = fWriter->WriteBlob(bufPageListZip.get(), szPageListZip, length);
164 return result;
165}
166
167void ROOT::Experimental::Detail::RPageSinkFile::CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length)
168{
169 auto bufFooterZip = std::make_unique<unsigned char[]>(length);
170 auto szFooterZip = fCompressor->Zip(serializedFooter, length, GetWriteOptions().GetCompression(),
171 RNTupleCompressor::MakeMemCopyWriter(bufFooterZip.get()));
172 fWriter->WriteNTupleFooter(bufFooterZip.get(), szFooterZip, length);
173 fWriter->Commit();
174}
175
178{
179 if (nElements == 0)
180 throw RException(R__FAIL("invalid call: request empty page"));
181 auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
182 return fPageAllocator->NewPage(columnHandle.fId, elementSize, nElements);
183}
184
186{
187 fPageAllocator->DeletePage(page);
188}
189
190
191////////////////////////////////////////////////////////////////////////////////
192
193
195 ColumnId_t columnId, void *mem, std::size_t elementSize, std::size_t nElements)
196{
197 RPage newPage(columnId, mem, elementSize, nElements);
198 newPage.GrowUnchecked(nElements);
199 return newPage;
200}
201
203{
204 if (page.IsNull())
205 return;
206 delete[] reinterpret_cast<unsigned char *>(page.GetBuffer());
207}
208
209
210////////////////////////////////////////////////////////////////////////////////
211
212
214 const RNTupleReadOptions &options)
215 : RPageSource(ntupleName, options)
216 , fPageAllocator(std::make_unique<RPageAllocatorFile>())
217 , fPagePool(std::make_shared<RPagePool>())
218 , fClusterPool(std::make_unique<RClusterPool>(*this, options.GetClusterBunchSize()))
219{
220 fDecompressor = std::make_unique<RNTupleDecompressor>();
221 EnableDefaultMetrics("RPageSourceFile");
222}
223
224
226 const RNTupleReadOptions &options)
227 : RPageSourceFile(ntupleName, options)
228{
232}
233
235{
236 fDescriptorBuilder.SetOnDiskHeaderSize(anchor.fNBytesHeader);
237 auto buffer = std::make_unique<unsigned char[]>(anchor.fLenHeader);
238 auto zipBuffer = std::make_unique<unsigned char[]>(anchor.fNBytesHeader);
239 fReader.ReadBuffer(zipBuffer.get(), anchor.fNBytesHeader, anchor.fSeekHeader);
240 fDecompressor->Unzip(zipBuffer.get(), anchor.fNBytesHeader, anchor.fLenHeader, buffer.get());
241 Internal::RNTupleSerializer::DeserializeHeaderV1(buffer.get(), anchor.fLenHeader, fDescriptorBuilder);
242
243 fDescriptorBuilder.AddToOnDiskFooterSize(anchor.fNBytesFooter);
244 buffer = std::make_unique<unsigned char[]>(anchor.fLenFooter);
245 zipBuffer = std::make_unique<unsigned char[]>(anchor.fNBytesFooter);
246 fReader.ReadBuffer(zipBuffer.get(), anchor.fNBytesFooter, anchor.fSeekFooter);
247 fDecompressor->Unzip(zipBuffer.get(), anchor.fNBytesFooter, anchor.fLenFooter, buffer.get());
248 Internal::RNTupleSerializer::DeserializeFooterV1(buffer.get(), anchor.fLenFooter, fDescriptorBuilder);
249}
250
251std::unique_ptr<ROOT::Experimental::Detail::RPageSourceFile>
253 std::string_view path, const RNTupleReadOptions &options)
254{
255 auto pageSource = std::make_unique<RPageSourceFile>("", path, options);
256 pageSource->InitDescriptor(anchor);
257 pageSource->fNTupleName = pageSource->fDescriptorBuilder.GetDescriptor().GetName();
258 return pageSource;
259}
260
262
263
265{
266 // If we constructed the page source with (ntuple name, path), we need to find the anchor first.
267 // Otherwise, the page source was created by CreateFromAnchor() and the header and footer are already processed.
268 if (fDescriptorBuilder.GetDescriptor().GetOnDiskHeaderSize() == 0) {
269 auto anchor = fReader.GetNTuple(fNTupleName).Unwrap();
270 InitDescriptor(anchor);
271 }
272
273 auto ntplDesc = fDescriptorBuilder.MoveDescriptor();
274
275 for (const auto &cgDesc : ntplDesc.GetClusterGroupIterable()) {
276 auto buffer = std::make_unique<unsigned char[]>(cgDesc.GetPageListLength());
277 auto zipBuffer = std::make_unique<unsigned char[]>(cgDesc.GetPageListLocator().fBytesOnStorage);
278 fReader.ReadBuffer(zipBuffer.get(), cgDesc.GetPageListLocator().fBytesOnStorage,
279 cgDesc.GetPageListLocator().fPosition);
280 fDecompressor->Unzip(zipBuffer.get(), cgDesc.GetPageListLocator().fBytesOnStorage, cgDesc.GetPageListLength(),
281 buffer.get());
282
283 auto clusters = RClusterGroupDescriptorBuilder::GetClusterSummaries(ntplDesc, cgDesc.GetId());
284 Internal::RNTupleSerializer::DeserializePageListV1(buffer.get(), cgDesc.GetPageListLength(), clusters);
285 for (std::size_t i = 0; i < clusters.size(); ++i) {
286 ntplDesc.AddClusterDetails(clusters[i].MoveDescriptor().Unwrap());
287 }
288 }
289
290 return ntplDesc;
291}
292
293
295 DescriptorId_t columnId, const RClusterIndex &clusterIndex, RSealedPage &sealedPage)
296{
297 const auto clusterId = clusterIndex.GetClusterId();
298
300 {
301 auto descriptorGuard = GetSharedDescriptorGuard();
302 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
303 pageInfo = clusterDescriptor.GetPageRange(columnId).Find(clusterIndex.GetIndex());
304 }
305
306 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
307 sealedPage.fSize = bytesOnStorage;
308 sealedPage.fNElements = pageInfo.fNElements;
309 if (sealedPage.fBuffer)
310 fReader.ReadBuffer(const_cast<void *>(sealedPage.fBuffer), bytesOnStorage, pageInfo.fLocator.fPosition);
311}
312
315 const RClusterInfo &clusterInfo,
316 ClusterSize_t::ValueType idxInCluster)
317{
318 const auto columnId = columnHandle.fId;
319 const auto clusterId = clusterInfo.fClusterId;
320 const auto pageInfo = clusterInfo.fPageInfo;
321
322 const auto element = columnHandle.fColumn->GetElement();
323 const auto elementSize = element->GetSize();
324 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
325
326 const void *sealedPageBuffer = nullptr; // points either to directReadBuffer or to a read-only page in the cluster
327 std::unique_ptr<unsigned char []> directReadBuffer; // only used if cluster pool is turned off
328
329 if (fOptions.GetClusterCache() == RNTupleReadOptions::EClusterCache::kOff) {
330 directReadBuffer = std::make_unique<unsigned char[]>(bytesOnStorage);
331 fReader.ReadBuffer(directReadBuffer.get(), bytesOnStorage, pageInfo.fLocator.fPosition);
332 fCounters->fNPageLoaded.Inc();
333 fCounters->fNRead.Inc();
334 fCounters->fSzReadPayload.Add(bytesOnStorage);
335 sealedPageBuffer = directReadBuffer.get();
336 } else {
337 if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
338 fCurrentCluster = fClusterPool->GetCluster(clusterId, fActiveColumns);
339 R__ASSERT(fCurrentCluster->ContainsColumn(columnId));
340
341 auto cachedPage = fPagePool->GetPage(columnId, RClusterIndex(clusterId, idxInCluster));
342 if (!cachedPage.IsNull())
343 return cachedPage;
344
345 ROnDiskPage::Key key(columnId, pageInfo.fPageNo);
346 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
347 R__ASSERT(onDiskPage && (bytesOnStorage == onDiskPage->GetSize()));
348 sealedPageBuffer = onDiskPage->GetAddress();
349 }
350
351 std::unique_ptr<unsigned char []> pageBuffer;
352 {
353 RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
354 pageBuffer = UnsealPage({sealedPageBuffer, bytesOnStorage, pageInfo.fNElements}, *element);
355 fCounters->fSzUnzip.Add(elementSize * pageInfo.fNElements);
356 }
357
358 auto newPage = fPageAllocator->NewPage(columnId, pageBuffer.release(), elementSize, pageInfo.fNElements);
359 newPage.SetWindow(clusterInfo.fColumnOffset + pageInfo.fFirstInPage,
360 RPage::RClusterInfo(clusterId, clusterInfo.fColumnOffset));
361 fPagePool->RegisterPage(newPage,
362 RPageDeleter([](const RPage &page, void * /*userData*/)
363 {
365 }, nullptr));
366 fCounters->fNPagePopulated.Inc();
367 return newPage;
368}
369
370
372 ColumnHandle_t columnHandle, NTupleSize_t globalIndex)
373{
374 const auto columnId = columnHandle.fId;
375 auto cachedPage = fPagePool->GetPage(columnId, globalIndex);
376 if (!cachedPage.IsNull())
377 return cachedPage;
378
379 std::uint64_t idxInCluster;
380 RClusterInfo clusterInfo;
381 {
382 auto descriptorGuard = GetSharedDescriptorGuard();
383 clusterInfo.fClusterId = descriptorGuard->FindClusterId(columnId, globalIndex);
385
386 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterInfo.fClusterId);
387 clusterInfo.fColumnOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
388 R__ASSERT(clusterInfo.fColumnOffset <= globalIndex);
389 idxInCluster = globalIndex - clusterInfo.fColumnOffset;
390 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
391 }
392
393 return PopulatePageFromCluster(columnHandle, clusterInfo, idxInCluster);
394}
395
396
398 ColumnHandle_t columnHandle, const RClusterIndex &clusterIndex)
399{
400 const auto clusterId = clusterIndex.GetClusterId();
401 const auto idxInCluster = clusterIndex.GetIndex();
402 const auto columnId = columnHandle.fId;
403 auto cachedPage = fPagePool->GetPage(columnId, clusterIndex);
404 if (!cachedPage.IsNull())
405 return cachedPage;
406
407 R__ASSERT(clusterId != kInvalidDescriptorId);
408 RClusterInfo clusterInfo;
409 {
410 auto descriptorGuard = GetSharedDescriptorGuard();
411 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
412 clusterInfo.fClusterId = clusterId;
413 clusterInfo.fColumnOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
414 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
415 }
416
417 return PopulatePageFromCluster(columnHandle, clusterInfo, idxInCluster);
418}
419
421{
422 fPagePool->ReturnPage(page);
423}
424
425std::unique_ptr<ROOT::Experimental::Detail::RPageSource> ROOT::Experimental::Detail::RPageSourceFile::Clone() const
426{
427 auto clone = new RPageSourceFile(fNTupleName, fOptions);
428 clone->fFile = fFile->Clone();
429 clone->fReader = Internal::RMiniFileReader(clone->fFile.get());
430 return std::unique_ptr<RPageSourceFile>(clone);
431}
432
433std::unique_ptr<ROOT::Experimental::Detail::RCluster>
435 const RCluster::RKey &clusterKey,
436 std::vector<ROOT::Internal::RRawFile::RIOVec> &readRequests)
437{
438 struct ROnDiskPageLocator {
441 std::uint64_t fOffset = 0;
442 std::uint64_t fSize = 0;
443 std::size_t fBufPos = 0;
444 };
445
446 std::vector<ROnDiskPageLocator> onDiskPages;
447 auto activeSize = 0;
448 {
449 auto descriptorGuard = GetSharedDescriptorGuard();
450 const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterKey.fClusterId);
451
452 // Collect the necessary page meta-data and sum up the total size of the compressed and packed pages
453 for (auto columnId : clusterKey.fColumnSet) {
454 const auto &pageRange = clusterDesc.GetPageRange(columnId);
455 NTupleSize_t pageNo = 0;
456 for (const auto &pageInfo : pageRange.fPageInfos) {
457 const auto &pageLocator = pageInfo.fLocator;
458 activeSize += pageLocator.fBytesOnStorage;
459 onDiskPages.push_back(
460 {columnId, pageNo, std::uint64_t(pageLocator.fPosition), pageLocator.fBytesOnStorage, 0});
461 ++pageNo;
462 }
463 }
464 }
465
466 // Linearize the page requests by file offset
467 std::sort(onDiskPages.begin(), onDiskPages.end(),
468 [](const ROnDiskPageLocator &a, const ROnDiskPageLocator &b) {return a.fOffset < b.fOffset;});
469
470 // In order to coalesce close-by pages, we collect the sizes of the gaps between pages on disk. We then order
471 // the gaps by size, sum them up and find a cutoff for the largest gap that we tolerate when coalescing pages.
472 // The size of the cutoff is given by the fraction of extra bytes we are willing to read in order to reduce
473 // the number of read requests. We thus schedule the lowest number of requests given a tolerable fraction
474 // of extra bytes.
475 // TODO(jblomer): Eventually we may want to select the parameter at runtime according to link latency and speed,
476 // memory consumption, device block size.
477 float maxOverhead = 0.25 * float(activeSize);
478 std::vector<std::size_t> gaps;
479 for (unsigned i = 1; i < onDiskPages.size(); ++i) {
480 gaps.emplace_back(onDiskPages[i].fOffset - (onDiskPages[i-1].fSize + onDiskPages[i-1].fOffset));
481 }
482 std::sort(gaps.begin(), gaps.end());
483 std::size_t gapCut = 0;
484 std::size_t currentGap = 0;
485 float szExtra = 0.0;
486 for (auto g : gaps) {
487 if (g != currentGap) {
488 gapCut = currentGap;
489 currentGap = g;
490 }
491 szExtra += g;
492 if (szExtra > maxOverhead)
493 break;
494 }
495
496 // In a first step, we coalesce the read requests and calculate the cluster buffer size.
497 // In a second step, we'll fix-up the memory destinations for the read calls given the
498 // address of the allocated buffer. We must not touch, however, the read requests from previous
499 // calls to PrepareSingleCluster()
500 const auto currentReadRequestIdx = readRequests.size();
501
503 std::size_t szPayload = 0;
504 std::size_t szOverhead = 0;
505 for (auto &s : onDiskPages) {
506 R__ASSERT(s.fSize > 0);
507 auto readUpTo = req.fOffset + req.fSize;
508 R__ASSERT(s.fOffset >= readUpTo);
509 auto overhead = s.fOffset - readUpTo;
510 szPayload += s.fSize;
511 if (overhead <= gapCut) {
512 szOverhead += overhead;
513 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer) + req.fSize + overhead;
514 req.fSize += overhead + s.fSize;
515 continue;
516 }
517
518 // close the current request and open new one
519 if (req.fSize > 0)
520 readRequests.emplace_back(req);
521
522 req.fBuffer = reinterpret_cast<unsigned char *>(req.fBuffer) + req.fSize;
523 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer);
524
525 req.fOffset = s.fOffset;
526 req.fSize = s.fSize;
527 }
528 readRequests.emplace_back(req);
529 fCounters->fSzReadPayload.Add(szPayload);
530 fCounters->fSzReadOverhead.Add(szOverhead);
531
532 // Register the on disk pages in a page map
533 auto buffer = new unsigned char[reinterpret_cast<intptr_t>(req.fBuffer) + req.fSize];
534 auto pageMap = std::make_unique<ROnDiskPageMapHeap>(std::unique_ptr<unsigned char []>(buffer));
535 for (const auto &s : onDiskPages) {
536 ROnDiskPage::Key key(s.fColumnId, s.fPageNo);
537 pageMap->Register(key, ROnDiskPage(buffer + s.fBufPos, s.fSize));
538 }
539 fCounters->fNPageLoaded.Add(onDiskPages.size());
540 for (auto i = currentReadRequestIdx; i < readRequests.size(); ++i) {
541 readRequests[i].fBuffer = buffer + reinterpret_cast<intptr_t>(readRequests[i].fBuffer);
542 }
543
544 auto cluster = std::make_unique<RCluster>(clusterKey.fClusterId);
545 cluster->Adopt(std::move(pageMap));
546 for (auto colId : clusterKey.fColumnSet)
547 cluster->SetColumnAvailable(colId);
548 return cluster;
549}
550
551std::vector<std::unique_ptr<ROOT::Experimental::Detail::RCluster>>
553{
554 fCounters->fNClusterLoaded.Add(clusterKeys.size());
555
556 std::vector<std::unique_ptr<ROOT::Experimental::Detail::RCluster>> clusters;
557 std::vector<ROOT::Internal::RRawFile::RIOVec> readRequests;
558
559 for (auto key: clusterKeys) {
560 clusters.emplace_back(PrepareSingleCluster(key, readRequests));
561 }
562
563 auto nReqs = readRequests.size();
564 {
565 RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
566 fFile->ReadV(&readRequests[0], nReqs);
567 }
568 fCounters->fNReadV.Inc();
569 fCounters->fNRead.Add(nReqs);
570
571 return clusters;
572}
573
574
576{
577 RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
578 fTaskScheduler->Reset();
579
580 const auto clusterId = cluster->GetId();
581 auto descriptorGuard = GetSharedDescriptorGuard();
582 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
583
584 std::vector<std::unique_ptr<RColumnElementBase>> allElements;
585
586 const auto &columnsInCluster = cluster->GetAvailColumns();
587 for (const auto columnId : columnsInCluster) {
588 const auto &columnDesc = descriptorGuard->GetColumnDescriptor(columnId);
589
590 allElements.emplace_back(RColumnElementBase::Generate(columnDesc.GetModel().GetType()));
591
592 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
593 std::uint64_t pageNo = 0;
594 std::uint64_t firstInPage = 0;
595 for (const auto &pi : pageRange.fPageInfos) {
596 ROnDiskPage::Key key(columnId, pageNo);
597 auto onDiskPage = cluster->GetOnDiskPage(key);
598 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == pi.fLocator.fBytesOnStorage));
599
600 auto taskFunc =
601 [this, columnId, clusterId, firstInPage, onDiskPage,
602 element = allElements.back().get(),
603 nElements = pi.fNElements,
604 indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex
605 ] () {
606 auto pageBuffer = UnsealPage({onDiskPage->GetAddress(), onDiskPage->GetSize(), nElements}, *element);
607 fCounters->fSzUnzip.Add(element->GetSize() * nElements);
608
609 auto newPage = fPageAllocator->NewPage(columnId, pageBuffer.release(), element->GetSize(), nElements);
610 newPage.SetWindow(indexOffset + firstInPage, RPage::RClusterInfo(clusterId, indexOffset));
611 fPagePool->PreloadPage(newPage,
612 RPageDeleter([](const RPage &page, void * /*userData*/)
613 {
615 }, nullptr));
616 };
617
618 fTaskScheduler->AddTask(taskFunc);
619
620 firstInPage += pi.fNElements;
621 pageNo++;
622 } // for all pages in column
623 } // for all columns in cluster
624
625 fCounters->fNPagePopulated.Add(cluster->GetNOnDiskPages());
626
627 fTaskScheduler->Wait();
628}
size_t fSize
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition: RError.hxx:291
#define R__LOG_WARNING(...)
Definition: RLogger.hxx:363
#define R__ASSERT(e)
Definition: TError.h:118
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t b
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t g
Manages a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
Definition: RCluster.hxx:154
const ColumnSet_t & GetAvailColumns() const
Definition: RCluster.hxx:198
DescriptorId_t GetId() const
Definition: RCluster.hxx:197
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
Definition: RCluster.cxx:37
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
RColumnElementBase * GetElement() const
Definition: RColumn.hxx:308
static Writer_t MakeMemCopyWriter(unsigned char *dest)
Definition: RNTupleZip.hxx:49
Record wall time and CPU time between construction and destruction.
A page as being stored on disk, that is packed and compressed.
Definition: RCluster.hxx:43
Manages pages read from the file.
static RPage NewPage(ColumnId_t columnId, void *mem, std::size_t elementSize, std::size_t nElements)
Uses standard C++ memory allocation for the column data pages.
A closure that can free the memory associated with a mapped page.
A thread-safe cache of column pages.
Definition: RPagePool.hxx:47
Storage provider that writes ntuple pages into a file.
RNTupleLocator CommitSealedPageImpl(DescriptorId_t columnId, const RPageStorage::RSealedPage &sealedPage) final
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final
std::uint64_t CommitClusterImpl(NTupleSize_t nEntries) final
Returns the number of bytes written to storage (excluding metadata)
void CreateImpl(const RNTupleModel &model, unsigned char *serializedHeader, std::uint32_t length) final
RPageSinkFile(std::string_view ntupleName, const RNTupleWriteOptions &options)
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) final
std::unique_ptr< Internal::RNTupleFileWriter > fWriter
RNTupleLocator WriteSealedPage(const RPageStorage::RSealedPage &sealedPage, std::size_t bytesPacked)
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
Abstract interface to write data into an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
std::unique_ptr< RNTupleCompressor > fCompressor
Helper to zip pages and header/footer; includes a 16MB (kMAXZIPBUF) zip buffer.
Storage provider that reads ntuple pages from a file.
void LoadSealedPage(DescriptorId_t columnId, const RClusterIndex &clusterIndex, RSealedPage &sealedPage) final
Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage.
std::vector< std::unique_ptr< RCluster > > LoadClusters(std::span< RCluster::RKey > clusterKeys) final
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
void InitDescriptor(const Internal::RFileNTupleAnchor &anchor)
Deserializes the header and footer into a minimal descriptor held by fDescriptorBuilder.
Internal::RMiniFileReader fReader
Takes the fFile to read ntuple blobs from it.
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const Internal::RFileNTupleAnchor &anchor, std::string_view path, const RNTupleReadOptions &options)
Used from the RNTuple class to build a datasource if the anchor is already available.
std::unique_ptr< RCluster > PrepareSingleCluster(const RCluster::RKey &clusterKey, std::vector< ROOT::Internal::RRawFile::RIOVec > &readRequests)
Helper function for LoadClusters: it prepares the memory buffer (page map) and the read requests for ...
std::unique_ptr< RPageSource > Clone() const final
The cloned page source creates a new raw file and reader and opens its own file descriptor to the dat...
void UnzipClusterImpl(RCluster *cluster) final
RPageSourceFile(std::string_view ntupleName, const RNTupleReadOptions &options)
std::unique_ptr< ROOT::Internal::RRawFile > fFile
An RRawFile is used to request the necessary byte ranges from a local or a remote file.
RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) final
Allocates and fills a page that contains the index-th element.
RPage PopulatePageFromCluster(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo, ClusterSize_t::ValueType idxInCluster)
Abstract interface to read data from an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
std::unique_ptr< RNTupleDecompressor > fDecompressor
Helper to unzip pages and header/footer; comprises a 16MB (kMAXZIPBUF) unzip buffer.
Stores information about the cluster in which this page resides.
Definition: RPage.hxx:46
A page is a slice of a column that is mapped into memory.
Definition: RPage.hxx:41
ClusterSize_t::ValueType GetNElements() const
Definition: RPage.hxx:83
ClusterSize_t::ValueType GetNBytes() const
The space taken by column elements in the buffer.
Definition: RPage.hxx:81
void * GrowUnchecked(ClusterSize_t::ValueType nElements)
Called during writing: returns a pointer after the last element and increases the element counter in ...
Definition: RPage.hxx:109
Read RNTuple data blocks from a TFile container, provided by a RRawFile.
Definition: RMiniFile.hxx:102
static RNTupleFileWriter * Append(std::string_view ntupleName, TFile &file)
Add a new RNTuple identified by ntupleName to the existing TFile.
Definition: RMiniFile.cxx:1250
static RNTupleFileWriter * Recreate(std::string_view ntupleName, std::string_view path, int defaultCompression, ENTupleContainerFormat containerFormat)
Create or truncate the local file given by path with the new empty RNTuple identified by ntupleName.
Definition: RMiniFile.cxx:1203
static RResult< void > DeserializePageListV1(const void *buffer, std::uint32_t bufSize, std::vector< RClusterDescriptorBuilder > &clusters)
static RResult< void > DeserializeFooterV1(const void *buffer, std::uint32_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static RResult< void > DeserializeHeaderV1(const void *buffer, std::uint32_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static std::vector< RClusterDescriptorBuilder > GetClusterSummaries(const RNTupleDescriptor &ntplDesc, DescriptorId_t clusterGroupId)
Used to prepare the cluster descriptor builders when loading the page locations for a certain cluster...
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Definition: RNTupleUtil.hxx:87
DescriptorId_t GetClusterId() const
ClusterSize_t::ValueType GetIndex() const
Base class for all ROOT issued exceptions.
Definition: RError.hxx:114
The on-storage meta-data of an ntuple.
The RNTupleModel encapsulates the schema of an ntuple.
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
ENTupleContainerFormat GetContainerFormat() const
static std::unique_ptr< RRawFile > Create(std::string_view url, ROptions options=ROptions())
Factory method that returns a suitable concrete implementation according to the transport in the url.
Definition: RRawFile.cxx:73
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
Definition: TFile.h:54
basic_string_view< char > string_view
RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
Definition: RNTupleUtil.cxx:24
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
Definition: RNTupleUtil.hxx:47
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
Definition: RNTupleUtil.hxx:83
std::int64_t ColumnId_t
Uniquely identifies a physical column within the scope of the current process, used to tag pages.
Definition: RNTupleUtil.hxx:79
constexpr DescriptorId_t kInvalidDescriptorId
Definition: RNTupleUtil.hxx:84
static constexpr double s
static constexpr double pi
Definition: file.py:1
The identifiers that specify the content of a (partial) cluster.
Definition: RCluster.hxx:158
On-disk pages within a page source are identified by the column and page number.
Definition: RCluster.hxx:53
Summarizes cluster-level information that is necessary to populate a certain page.
RClusterDescriptor::RPageRange::RPageInfoExtended fPageInfo
Location of the page on disk.
std::uint64_t fColumnOffset
The first element number of the page's column in the given cluster.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
Entry point for an RNTuple in a ROOT file.
Definition: RMiniFile.hxx:65
std::uint32_t fNBytesFooter
The size of the compressed ntuple footer.
Definition: RMiniFile.hxx:82
std::uint64_t fSeekFooter
The file offset of the footer excluding the TKey part.
Definition: RMiniFile.hxx:80
std::uint32_t fNBytesHeader
The size of the compressed ntuple header.
Definition: RMiniFile.hxx:76
std::uint32_t fLenFooter
The size of the uncompressed ntuple footer.
Definition: RMiniFile.hxx:84
std::uint64_t fSeekHeader
The file offset of the header excluding the TKey part.
Definition: RMiniFile.hxx:74
std::uint32_t fLenHeader
The size of the uncompressed ntuple header.
Definition: RMiniFile.hxx:78
We do not need to store the element size / uncompressed page size because we know to which column the...
RNTupleLocator fLocator
The meaning of fLocator depends on the storage backend.
ClusterSize_t fNElements
The sum of the elements of all the pages must match the corresponding fNElements field in fColumnRang...
Generic information about the physical location of data.
Used for vector reads from multiple offsets into multiple buffers.
Definition: RRawFile.hxx:71
std::size_t fSize
The number of desired bytes.
Definition: RRawFile.hxx:77
void * fBuffer
The destination for reading.
Definition: RRawFile.hxx:73
std::uint64_t fOffset
The file offset.
Definition: RRawFile.hxx:75
TArc a
Definition: textangle.C:12