Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageStorageFile.cxx
Go to the documentation of this file.
1/// \file RPageStorageFile.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2019-11-25
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RCluster.hxx>
17#include <ROOT/RClusterPool.hxx>
18#include <ROOT/RField.hxx>
19#include <ROOT/RLogger.hxx>
21#include <ROOT/RNTupleModel.hxx>
23#include <ROOT/RNTupleZip.hxx>
24#include <ROOT/RPage.hxx>
26#include <ROOT/RPagePool.hxx>
28#include <ROOT/RRawFile.hxx>
29
30#include <RVersion.h>
31#include <TError.h>
32
33#include <algorithm>
34#include <cstdio>
35#include <cstdlib>
36#include <cstring>
37#include <iostream>
38#include <utility>
39
40#include <atomic>
41#include <condition_variable>
42#include <functional>
43#include <mutex>
44#include <thread>
45#include <queue>
46
48 const RNTupleWriteOptions &options)
49 : RPageSink(ntupleName, options)
50 , fPageAllocator(std::make_unique<RPageAllocatorHeap>())
51{
52 R__LOG_WARNING(NTupleLog()) << "The RNTuple file format will change. " <<
53 "Do not store real data with this version of RNTuple!";
54 fCompressor = std::make_unique<RNTupleCompressor>();
55 EnableDefaultMetrics("RPageSinkFile");
56}
57
58
59ROOT::Experimental::Detail::RPageSinkFile::RPageSinkFile(std::string_view ntupleName, std::string_view path,
60 const RNTupleWriteOptions &options)
61 : RPageSinkFile(ntupleName, options)
62{
63 fWriter = std::unique_ptr<Internal::RNTupleFileWriter>(Internal::RNTupleFileWriter::Recreate(
64 ntupleName, path, options.GetCompression(), options.GetContainerFormat()));
65}
66
67
69 const RNTupleWriteOptions &options)
70 : RPageSinkFile(ntupleName, options)
71{
72 fWriter = std::unique_ptr<Internal::RNTupleFileWriter>(Internal::RNTupleFileWriter::Append(ntupleName, file));
73}
74
75
76ROOT::Experimental::Detail::RPageSinkFile::RPageSinkFile(std::string_view ntupleName, std::string_view path,
77 const RNTupleWriteOptions &options, std::unique_ptr<TFile> &file)
78 : RPageSinkFile(ntupleName, options)
79{
80 fWriter = std::unique_ptr<Internal::RNTupleFileWriter>(
82}
83
84
86{
87}
88
90 unsigned char *serializedHeader, std::uint32_t length)
91{
92 auto zipBuffer = std::make_unique<unsigned char[]>(length);
93 auto szZipHeader = fCompressor->Zip(serializedHeader, length, GetWriteOptions().GetCompression(),
95 fWriter->WriteNTupleHeader(zipBuffer.get(), szZipHeader, length);
96}
97
98
101 const RPageStorage::RSealedPage &sealedPage, std::size_t bytesPacked)
102{
103 std::uint64_t offsetData;
104 {
105 RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
106 offsetData = fWriter->WriteBlob(sealedPage.fBuffer, sealedPage.fSize, bytesPacked);
107 }
108
110 result.fPosition = offsetData;
111 result.fBytesOnStorage = sealedPage.fSize;
112 fCounters->fNPageCommitted.Inc();
113 fCounters->fSzWritePayload.Add(sealedPage.fSize);
114 fNBytesCurrentCluster += sealedPage.fSize;
115 return result;
116}
117
118
121{
122 auto element = columnHandle.fColumn->GetElement();
123 RPageStorage::RSealedPage sealedPage;
124 {
125 RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
126 sealedPage = SealPage(page, *element, GetWriteOptions().GetCompression());
127 }
128
129 fCounters->fSzZip.Add(page.GetNBytes());
130 return WriteSealedPage(sealedPage, element->GetPackedSize(page.GetNElements()));
131}
132
135 const RPageStorage::RSealedPage &sealedPage)
136{
137 const auto bitsOnStorage = RColumnElementBase::GetBitsOnStorage(
138 fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(physicalColumnId).GetModel().GetType());
139 const auto bytesPacked = (bitsOnStorage * sealedPage.fNElements + 7) / 8;
140
141 return WriteSealedPage(sealedPage, bytesPacked);
142}
143
144
145std::uint64_t
147{
148 auto result = fNBytesCurrentCluster;
149 fNBytesCurrentCluster = 0;
150 return result;
151}
152
155 std::uint32_t length)
156{
157 auto bufPageListZip = std::make_unique<unsigned char[]>(length);
158 auto szPageListZip = fCompressor->Zip(serializedPageList, length, GetWriteOptions().GetCompression(),
159 RNTupleCompressor::MakeMemCopyWriter(bufPageListZip.get()));
160
162 result.fBytesOnStorage = szPageListZip;
163 result.fPosition = fWriter->WriteBlob(bufPageListZip.get(), szPageListZip, length);
164 return result;
165}
166
167void ROOT::Experimental::Detail::RPageSinkFile::CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length)
168{
169 auto bufFooterZip = std::make_unique<unsigned char[]>(length);
170 auto szFooterZip = fCompressor->Zip(serializedFooter, length, GetWriteOptions().GetCompression(),
171 RNTupleCompressor::MakeMemCopyWriter(bufFooterZip.get()));
172 fWriter->WriteNTupleFooter(bufFooterZip.get(), szFooterZip, length);
173 fWriter->Commit();
174}
175
178{
179 if (nElements == 0)
180 throw RException(R__FAIL("invalid call: request empty page"));
181 auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
182 return fPageAllocator->NewPage(columnHandle.fPhysicalId, elementSize, nElements);
183}
184
186{
187 fPageAllocator->DeletePage(page);
188}
189
190
191////////////////////////////////////////////////////////////////////////////////
192
194 const RNTupleReadOptions &options)
195 : RPageSource(ntupleName, options),
196 fPagePool(std::make_shared<RPagePool>()),
197 fClusterPool(std::make_unique<RClusterPool>(*this, options.GetClusterBunchSize()))
198{
199 fDecompressor = std::make_unique<RNTupleDecompressor>();
200 EnableDefaultMetrics("RPageSourceFile");
201}
202
203
204ROOT::Experimental::Detail::RPageSourceFile::RPageSourceFile(std::string_view ntupleName, std::string_view path,
205 const RNTupleReadOptions &options)
206 : RPageSourceFile(ntupleName, options)
207{
211}
212
214{
215 fDescriptorBuilder.SetOnDiskHeaderSize(anchor.fNBytesHeader);
216 auto buffer = std::make_unique<unsigned char[]>(anchor.fLenHeader);
217 auto zipBuffer = std::make_unique<unsigned char[]>(anchor.fNBytesHeader);
218 fReader.ReadBuffer(zipBuffer.get(), anchor.fNBytesHeader, anchor.fSeekHeader);
219 fDecompressor->Unzip(zipBuffer.get(), anchor.fNBytesHeader, anchor.fLenHeader, buffer.get());
220 Internal::RNTupleSerializer::DeserializeHeaderV1(buffer.get(), anchor.fLenHeader, fDescriptorBuilder);
221
222 fDescriptorBuilder.AddToOnDiskFooterSize(anchor.fNBytesFooter);
223 buffer = std::make_unique<unsigned char[]>(anchor.fLenFooter);
224 zipBuffer = std::make_unique<unsigned char[]>(anchor.fNBytesFooter);
225 fReader.ReadBuffer(zipBuffer.get(), anchor.fNBytesFooter, anchor.fSeekFooter);
226 fDecompressor->Unzip(zipBuffer.get(), anchor.fNBytesFooter, anchor.fLenFooter, buffer.get());
227 Internal::RNTupleSerializer::DeserializeFooterV1(buffer.get(), anchor.fLenFooter, fDescriptorBuilder);
228}
229
230std::unique_ptr<ROOT::Experimental::Detail::RPageSourceFile>
232 std::string_view path, const RNTupleReadOptions &options)
233{
234 auto pageSource = std::make_unique<RPageSourceFile>("", path, options);
235 pageSource->InitDescriptor(anchor);
236 pageSource->fNTupleName = pageSource->fDescriptorBuilder.GetDescriptor().GetName();
237 return pageSource;
238}
239
241
242
244{
245 // If we constructed the page source with (ntuple name, path), we need to find the anchor first.
246 // Otherwise, the page source was created by OpenFromAnchor() and the header and footer are already processed.
247 if (fDescriptorBuilder.GetDescriptor().GetOnDiskHeaderSize() == 0) {
248 auto anchor = fReader.GetNTuple(fNTupleName).Unwrap();
249 InitDescriptor(anchor);
250 }
251
252 auto ntplDesc = fDescriptorBuilder.MoveDescriptor();
253
254 for (const auto &cgDesc : ntplDesc.GetClusterGroupIterable()) {
255 auto buffer = std::make_unique<unsigned char[]>(cgDesc.GetPageListLength());
256 auto zipBuffer = std::make_unique<unsigned char[]>(cgDesc.GetPageListLocator().fBytesOnStorage);
257 fReader.ReadBuffer(zipBuffer.get(), cgDesc.GetPageListLocator().fBytesOnStorage,
258 cgDesc.GetPageListLocator().GetPosition<std::uint64_t>());
259 fDecompressor->Unzip(zipBuffer.get(), cgDesc.GetPageListLocator().fBytesOnStorage, cgDesc.GetPageListLength(),
260 buffer.get());
261
262 auto clusters = RClusterGroupDescriptorBuilder::GetClusterSummaries(ntplDesc, cgDesc.GetId());
263 Internal::RNTupleSerializer::DeserializePageListV1(buffer.get(), cgDesc.GetPageListLength(), clusters);
264 for (std::size_t i = 0; i < clusters.size(); ++i) {
265 ntplDesc.AddClusterDetails(clusters[i].AddDeferredColumnRanges(ntplDesc).MoveDescriptor().Unwrap());
266 }
267 }
268
269 return ntplDesc;
270}
271
273 const RClusterIndex &clusterIndex,
274 RSealedPage &sealedPage)
275{
276 const auto clusterId = clusterIndex.GetClusterId();
277
279 {
280 auto descriptorGuard = GetSharedDescriptorGuard();
281 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
282 pageInfo = clusterDescriptor.GetPageRange(physicalColumnId).Find(clusterIndex.GetIndex());
283 }
284
285 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
286 sealedPage.fSize = bytesOnStorage;
287 sealedPage.fNElements = pageInfo.fNElements;
288 if (!sealedPage.fBuffer)
289 return;
291 fReader.ReadBuffer(const_cast<void *>(sealedPage.fBuffer), bytesOnStorage,
292 pageInfo.fLocator.GetPosition<std::uint64_t>());
293 } else {
294 memcpy(const_cast<void *>(sealedPage.fBuffer), RPage::GetPageZeroBuffer(), bytesOnStorage);
295 }
296}
297
300 const RClusterInfo &clusterInfo,
301 ClusterSize_t::ValueType idxInCluster)
302{
303 const auto columnId = columnHandle.fPhysicalId;
304 const auto clusterId = clusterInfo.fClusterId;
305 const auto pageInfo = clusterInfo.fPageInfo;
306
307 const auto element = columnHandle.fColumn->GetElement();
308 const auto elementSize = element->GetSize();
309 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
310
311 const void *sealedPageBuffer = nullptr; // points either to directReadBuffer or to a read-only page in the cluster
312 std::unique_ptr<unsigned char []> directReadBuffer; // only used if cluster pool is turned off
313
314 if (pageInfo.fLocator.fType == RNTupleLocator::kTypePageZero) {
315 auto pageZero = RPage::MakePageZero(columnId, elementSize);
316 pageZero.GrowUnchecked(pageInfo.fNElements);
317 pageZero.SetWindow(clusterInfo.fColumnOffset + pageInfo.fFirstInPage,
318 RPage::RClusterInfo(clusterId, clusterInfo.fColumnOffset));
319 fPagePool->RegisterPage(pageZero, RPageDeleter([](const RPage &, void *) {}, nullptr));
320 return pageZero;
321 }
322
323 if (fOptions.GetClusterCache() == RNTupleReadOptions::EClusterCache::kOff) {
324 directReadBuffer = std::unique_ptr<unsigned char[]>(new unsigned char[bytesOnStorage]);
325 fReader.ReadBuffer(directReadBuffer.get(), bytesOnStorage, pageInfo.fLocator.GetPosition<std::uint64_t>());
326 fCounters->fNPageLoaded.Inc();
327 fCounters->fNRead.Inc();
328 fCounters->fSzReadPayload.Add(bytesOnStorage);
329 sealedPageBuffer = directReadBuffer.get();
330 } else {
331 if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
332 fCurrentCluster = fClusterPool->GetCluster(clusterId, fActivePhysicalColumns.ToColumnSet());
333 R__ASSERT(fCurrentCluster->ContainsColumn(columnId));
334
335 auto cachedPage = fPagePool->GetPage(columnId, RClusterIndex(clusterId, idxInCluster));
336 if (!cachedPage.IsNull())
337 return cachedPage;
338
339 ROnDiskPage::Key key(columnId, pageInfo.fPageNo);
340 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
341 R__ASSERT(onDiskPage && (bytesOnStorage == onDiskPage->GetSize()));
342 sealedPageBuffer = onDiskPage->GetAddress();
343 }
344
345 RPage newPage;
346 {
347 RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
348 newPage = UnsealPage({sealedPageBuffer, bytesOnStorage, pageInfo.fNElements}, *element, columnId);
349 fCounters->fSzUnzip.Add(elementSize * pageInfo.fNElements);
350 }
351
352 newPage.SetWindow(clusterInfo.fColumnOffset + pageInfo.fFirstInPage,
353 RPage::RClusterInfo(clusterId, clusterInfo.fColumnOffset));
354 fPagePool->RegisterPage(
355 newPage,
356 RPageDeleter([](const RPage &page, void * /*userData*/) { RPageAllocatorHeap::DeletePage(page); }, nullptr));
357 fCounters->fNPagePopulated.Inc();
358 return newPage;
359}
360
361
363 ColumnHandle_t columnHandle, NTupleSize_t globalIndex)
364{
365 const auto columnId = columnHandle.fPhysicalId;
366 auto cachedPage = fPagePool->GetPage(columnId, globalIndex);
367 if (!cachedPage.IsNull())
368 return cachedPage;
369
370 std::uint64_t idxInCluster;
371 RClusterInfo clusterInfo;
372 {
373 auto descriptorGuard = GetSharedDescriptorGuard();
374 clusterInfo.fClusterId = descriptorGuard->FindClusterId(columnId, globalIndex);
375
376 if (clusterInfo.fClusterId == kInvalidDescriptorId)
377 throw RException(R__FAIL("entry with index " + std::to_string(globalIndex) + " out of bounds"));
378
379 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterInfo.fClusterId);
380 clusterInfo.fColumnOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
381 R__ASSERT(clusterInfo.fColumnOffset <= globalIndex);
382 idxInCluster = globalIndex - clusterInfo.fColumnOffset;
383 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
384 }
385
386 return PopulatePageFromCluster(columnHandle, clusterInfo, idxInCluster);
387}
388
389
391 ColumnHandle_t columnHandle, const RClusterIndex &clusterIndex)
392{
393 const auto clusterId = clusterIndex.GetClusterId();
394 const auto idxInCluster = clusterIndex.GetIndex();
395 const auto columnId = columnHandle.fPhysicalId;
396 auto cachedPage = fPagePool->GetPage(columnId, clusterIndex);
397 if (!cachedPage.IsNull())
398 return cachedPage;
399
400 if (clusterId == kInvalidDescriptorId)
401 throw RException(R__FAIL("entry out of bounds"));
402
403 RClusterInfo clusterInfo;
404 {
405 auto descriptorGuard = GetSharedDescriptorGuard();
406 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
407 clusterInfo.fClusterId = clusterId;
408 clusterInfo.fColumnOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
409 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
410 }
411
412 return PopulatePageFromCluster(columnHandle, clusterInfo, idxInCluster);
413}
414
416{
417 fPagePool->ReturnPage(page);
418}
419
420std::unique_ptr<ROOT::Experimental::Detail::RPageSource> ROOT::Experimental::Detail::RPageSourceFile::Clone() const
421{
422 auto clone = new RPageSourceFile(fNTupleName, fOptions);
423 clone->fFile = fFile->Clone();
424 clone->fReader = Internal::RMiniFileReader(clone->fFile.get());
425 return std::unique_ptr<RPageSourceFile>(clone);
426}
427
428std::unique_ptr<ROOT::Experimental::Detail::RCluster>
430 const RCluster::RKey &clusterKey,
431 std::vector<ROOT::Internal::RRawFile::RIOVec> &readRequests)
432{
433 struct ROnDiskPageLocator {
436 std::uint64_t fOffset = 0;
437 std::uint64_t fSize = 0;
438 std::size_t fBufPos = 0;
439 };
440
441 std::vector<ROnDiskPageLocator> onDiskPages;
442 auto activeSize = 0;
443 auto pageZeroMap = std::make_unique<ROnDiskPageMap>();
444 PrepareLoadCluster(clusterKey, *pageZeroMap,
445 [&](DescriptorId_t physicalColumnId, NTupleSize_t pageNo,
447 const auto &pageLocator = pageInfo.fLocator;
448 activeSize += pageLocator.fBytesOnStorage;
449 onDiskPages.push_back({physicalColumnId, pageNo, pageLocator.GetPosition<std::uint64_t>(),
450 pageLocator.fBytesOnStorage, 0});
451 });
452
453 // Linearize the page requests by file offset
454 std::sort(onDiskPages.begin(), onDiskPages.end(),
455 [](const ROnDiskPageLocator &a, const ROnDiskPageLocator &b) {return a.fOffset < b.fOffset;});
456
457 // In order to coalesce close-by pages, we collect the sizes of the gaps between pages on disk. We then order
458 // the gaps by size, sum them up and find a cutoff for the largest gap that we tolerate when coalescing pages.
459 // The size of the cutoff is given by the fraction of extra bytes we are willing to read in order to reduce
460 // the number of read requests. We thus schedule the lowest number of requests given a tolerable fraction
461 // of extra bytes.
462 // TODO(jblomer): Eventually we may want to select the parameter at runtime according to link latency and speed,
463 // memory consumption, device block size.
464 float maxOverhead = 0.25 * float(activeSize);
465 std::vector<std::size_t> gaps;
466 for (unsigned i = 1; i < onDiskPages.size(); ++i) {
467 gaps.emplace_back(onDiskPages[i].fOffset - (onDiskPages[i-1].fSize + onDiskPages[i-1].fOffset));
468 }
469 std::sort(gaps.begin(), gaps.end());
470 std::size_t gapCut = 0;
471 std::size_t currentGap = 0;
472 float szExtra = 0.0;
473 for (auto g : gaps) {
474 if (g != currentGap) {
475 gapCut = currentGap;
476 currentGap = g;
477 }
478 szExtra += g;
479 if (szExtra > maxOverhead)
480 break;
481 }
482
483 // In a first step, we coalesce the read requests and calculate the cluster buffer size.
484 // In a second step, we'll fix-up the memory destinations for the read calls given the
485 // address of the allocated buffer. We must not touch, however, the read requests from previous
486 // calls to PrepareSingleCluster()
487 const auto currentReadRequestIdx = readRequests.size();
488
490 std::size_t szPayload = 0;
491 std::size_t szOverhead = 0;
492 for (auto &s : onDiskPages) {
493 R__ASSERT(s.fSize > 0);
494 auto readUpTo = req.fOffset + req.fSize;
495 R__ASSERT(s.fOffset >= readUpTo);
496 auto overhead = s.fOffset - readUpTo;
497 szPayload += s.fSize;
498 if (overhead <= gapCut) {
499 szOverhead += overhead;
500 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer) + req.fSize + overhead;
501 req.fSize += overhead + s.fSize;
502 continue;
503 }
504
505 // close the current request and open new one
506 if (req.fSize > 0)
507 readRequests.emplace_back(req);
508
509 req.fBuffer = reinterpret_cast<unsigned char *>(req.fBuffer) + req.fSize;
510 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer);
511
512 req.fOffset = s.fOffset;
513 req.fSize = s.fSize;
514 }
515 readRequests.emplace_back(req);
516 fCounters->fSzReadPayload.Add(szPayload);
517 fCounters->fSzReadOverhead.Add(szOverhead);
518
519 // Register the on disk pages in a page map
520 auto buffer = new unsigned char[reinterpret_cast<intptr_t>(req.fBuffer) + req.fSize];
521 auto pageMap = std::make_unique<ROnDiskPageMapHeap>(std::unique_ptr<unsigned char []>(buffer));
522 for (const auto &s : onDiskPages) {
523 ROnDiskPage::Key key(s.fColumnId, s.fPageNo);
524 pageMap->Register(key, ROnDiskPage(buffer + s.fBufPos, s.fSize));
525 }
526 fCounters->fNPageLoaded.Add(onDiskPages.size());
527 for (auto i = currentReadRequestIdx; i < readRequests.size(); ++i) {
528 readRequests[i].fBuffer = buffer + reinterpret_cast<intptr_t>(readRequests[i].fBuffer);
529 }
530
531 auto cluster = std::make_unique<RCluster>(clusterKey.fClusterId);
532 cluster->Adopt(std::move(pageMap));
533 cluster->Adopt(std::move(pageZeroMap));
534 for (auto colId : clusterKey.fPhysicalColumnSet)
535 cluster->SetColumnAvailable(colId);
536 return cluster;
537}
538
539std::vector<std::unique_ptr<ROOT::Experimental::Detail::RCluster>>
541{
542 fCounters->fNClusterLoaded.Add(clusterKeys.size());
543
544 std::vector<std::unique_ptr<ROOT::Experimental::Detail::RCluster>> clusters;
545 std::vector<ROOT::Internal::RRawFile::RIOVec> readRequests;
546
547 for (auto key: clusterKeys) {
548 clusters.emplace_back(PrepareSingleCluster(key, readRequests));
549 }
550
551 auto nReqs = readRequests.size();
552 {
553 RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
554 fFile->ReadV(&readRequests[0], nReqs);
555 }
556 fCounters->fNReadV.Inc();
557 fCounters->fNRead.Add(nReqs);
558
559 return clusters;
560}
561
562
564{
565 RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
566 fTaskScheduler->Reset();
567
568 const auto clusterId = cluster->GetId();
569 auto descriptorGuard = GetSharedDescriptorGuard();
570 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
571
572 std::vector<std::unique_ptr<RColumnElementBase>> allElements;
573
574 const auto &columnsInCluster = cluster->GetAvailPhysicalColumns();
575 for (const auto columnId : columnsInCluster) {
576 const auto &columnDesc = descriptorGuard->GetColumnDescriptor(columnId);
577
578 allElements.emplace_back(RColumnElementBase::Generate(columnDesc.GetModel().GetType()));
579
580 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
581 std::uint64_t pageNo = 0;
582 std::uint64_t firstInPage = 0;
583 for (const auto &pi : pageRange.fPageInfos) {
584 ROnDiskPage::Key key(columnId, pageNo);
585 auto onDiskPage = cluster->GetOnDiskPage(key);
586 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == pi.fLocator.fBytesOnStorage));
587
588 auto taskFunc = [this, columnId, clusterId, firstInPage, onDiskPage, element = allElements.back().get(),
589 nElements = pi.fNElements,
590 indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex]() {
591 auto newPage = UnsealPage({onDiskPage->GetAddress(), onDiskPage->GetSize(), nElements}, *element, columnId);
592 fCounters->fSzUnzip.Add(element->GetSize() * nElements);
593
594 newPage.SetWindow(indexOffset + firstInPage, RPage::RClusterInfo(clusterId, indexOffset));
595 fPagePool->PreloadPage(
596 newPage,
597 RPageDeleter([](const RPage &page, void * /*userData*/) { RPageAllocatorHeap::DeletePage(page); },
598 nullptr));
599 };
600
601 fTaskScheduler->AddTask(taskFunc);
602
603 firstInPage += pi.fNElements;
604 pageNo++;
605 } // for all pages in column
606 } // for all columns in cluster
607
608 fCounters->fNPagePopulated.Add(cluster->GetNOnDiskPages());
609
610 fTaskScheduler->Wait();
611}
size_t fSize
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:303
#define R__LOG_WARNING(...)
Definition RLogger.hxx:363
#define b(i)
Definition RSha256.hxx:100
#define g(i)
Definition RSha256.hxx:105
#define a(i)
Definition RSha256.hxx:99
#define R__ASSERT(e)
Definition TError.h:118
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Manages a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:155
const ColumnSet_t & GetAvailPhysicalColumns() const
Definition RCluster.hxx:199
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
Definition RCluster.cxx:37
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
If CppT == void, use the default C++ type for the given column type.
RColumnElementBase * GetElement() const
Definition RColumn.hxx:322
static Writer_t MakeMemCopyWriter(unsigned char *dest)
Record wall time and CPU time between construction and destruction.
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:43
Uses standard C++ memory allocation for the column data pages.
static void DeletePage(const RPage &page)
Releases the memory pointed to by page and resets the page's information.
A closure that can free the memory associated with a mapped page.
A thread-safe cache of column pages.
Definition RPagePool.hxx:47
Storage provider that writes ntuple pages into a file.
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final
std::uint64_t CommitClusterImpl(NTupleSize_t nEntries) final
Returns the number of bytes written to storage (excluding metadata)
void CreateImpl(const RNTupleModel &model, unsigned char *serializedHeader, std::uint32_t length) final
RPageSinkFile(std::string_view ntupleName, const RNTupleWriteOptions &options)
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) final
std::unique_ptr< Internal::RNTupleFileWriter > fWriter
RNTupleLocator CommitSealedPageImpl(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
RNTupleLocator WriteSealedPage(const RPageStorage::RSealedPage &sealedPage, std::size_t bytesPacked)
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
Abstract interface to write data into an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
std::unique_ptr< RNTupleCompressor > fCompressor
Helper to zip pages and header/footer; includes a 16MB (kMAXZIPBUF) zip buffer.
Storage provider that reads ntuple pages from a file.
void LoadSealedPage(DescriptorId_t physicalColumnId, const RClusterIndex &clusterIndex, RSealedPage &sealedPage) final
Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage.
std::vector< std::unique_ptr< RCluster > > LoadClusters(std::span< RCluster::RKey > clusterKeys) final
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
void InitDescriptor(const Internal::RFileNTupleAnchor &anchor)
Deserializes header and footer into a minimal descriptor held by fDescriptorBuilder.
Internal::RMiniFileReader fReader
Takes the fFile to read ntuple blobs from it.
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const Internal::RFileNTupleAnchor &anchor, std::string_view path, const RNTupleReadOptions &options)
Used from the RNTuple class to build a datasource if the anchor is already available.
std::unique_ptr< RCluster > PrepareSingleCluster(const RCluster::RKey &clusterKey, std::vector< ROOT::Internal::RRawFile::RIOVec > &readRequests)
Helper function for LoadClusters: it prepares the memory buffer (page map) and the read requests for ...
std::unique_ptr< RPageSource > Clone() const final
The cloned page source creates a new raw file and reader and opens its own file descriptor to the dat...
void UnzipClusterImpl(RCluster *cluster) final
RPageSourceFile(std::string_view ntupleName, const RNTupleReadOptions &options)
std::unique_ptr< ROOT::Internal::RRawFile > fFile
An RRawFile is used to request the necessary byte ranges from a local or a remote file.
RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) final
Allocates and fills a page that contains the index-th element.
RPage PopulatePageFromCluster(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo, ClusterSize_t::ValueType idxInCluster)
Abstract interface to read data from an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
std::unique_ptr< RNTupleDecompressor > fDecompressor
Helper to unzip pages and header/footer; comprises a 16MB (kMAXZIPBUF) unzip buffer.
Stores information about the cluster in which this page resides.
Definition RPage.hxx:49
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:42
std::uint32_t GetNBytes() const
The space taken by column elements in the buffer.
Definition RPage.hxx:84
static RPage MakePageZero(ColumnId_t columnId, ClusterSize_t::ValueType elementSize)
Make a 'zero' page for column columnId (that is comprised of 0x00 bytes only).
Definition RPage.hxx:135
std::uint32_t GetNElements() const
Definition RPage.hxx:86
void SetWindow(const NTupleSize_t rangeFirst, const RClusterInfo &clusterInfo)
Seek the page to a certain position of the column.
Definition RPage.hxx:118
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Definition RPage.cxx:23
Read RNTuple data blocks from a TFile container, provided by a RRawFile.
static RNTupleFileWriter * Append(std::string_view ntupleName, TFile &file)
Add a new RNTuple identified by ntupleName to the existing TFile.
static RNTupleFileWriter * Recreate(std::string_view ntupleName, std::string_view path, int defaultCompression, ENTupleContainerFormat containerFormat)
Create or truncate the local file given by path with the new empty RNTuple identified by ntupleName.
static RResult< void > DeserializePageListV1(const void *buffer, std::uint32_t bufSize, std::vector< RClusterDescriptorBuilder > &clusters)
static RResult< void > DeserializeFooterV1(const void *buffer, std::uint32_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static RResult< void > DeserializeHeaderV1(const void *buffer, std::uint32_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static std::vector< RClusterDescriptorBuilder > GetClusterSummaries(const RNTupleDescriptor &ntplDesc, DescriptorId_t clusterGroupId)
Used to prepare the cluster descriptor builders when loading the page locations for a certain cluster...
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
DescriptorId_t GetClusterId() const
ClusterSize_t::ValueType GetIndex() const
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
The on-storage meta-data of an ntuple.
The RNTupleModel encapsulates the schema of an ntuple.
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
ENTupleContainerFormat GetContainerFormat() const
static std::unique_ptr< RRawFile > Create(std::string_view url, ROptions options=ROptions())
Factory method that returns a suitable concrete implementation according to the transport in the url.
Definition RRawFile.cxx:73
A ROOT file is composed of a header, followed by consecutive data records (TKey instances) with a wel...
Definition TFile.h:53
RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr DescriptorId_t kInvalidDescriptorId
Definition file.py:1
The identifiers that specify the content of a (partial) cluster.
Definition RCluster.hxx:159
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:53
Summarizes cluster-level information that is necessary to populate a certain page.
RClusterDescriptor::RPageRange::RPageInfoExtended fPageInfo
Location of the page on disk.
std::uint64_t fColumnOffset
The first element number of the page's column in the given cluster.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
Entry point for an RNTuple in a ROOT file.
Definition RMiniFile.hxx:65
std::uint32_t fNBytesFooter
The size of the compressed ntuple footer.
Definition RMiniFile.hxx:82
std::uint64_t fSeekFooter
The file offset of the footer excluding the TKey part.
Definition RMiniFile.hxx:80
std::uint32_t fNBytesHeader
The size of the compressed ntuple header.
Definition RMiniFile.hxx:76
std::uint32_t fLenFooter
The size of the uncompressed ntuple footer.
Definition RMiniFile.hxx:84
std::uint64_t fSeekHeader
The file offset of the header excluding the TKey part.
Definition RMiniFile.hxx:74
std::uint32_t fLenHeader
The size of the uncompressed ntuple header.
Definition RMiniFile.hxx:78
We do not need to store the element size / uncompressed page size because we know to which column the...
std::uint32_t fNElements
The sum of the elements of all the pages must match the corresponding fNElements field in fColumnRang...
RNTupleLocator fLocator
The meaning of fLocator depends on the storage backend.
Generic information about the physical location of data.
ELocatorType fType
For non-disk locators, the value for the Type field.
Used for vector reads from multiple offsets into multiple buffers.
Definition RRawFile.hxx:71
std::size_t fSize
The number of desired bytes.
Definition RRawFile.hxx:77
void * fBuffer
The destination for reading.
Definition RRawFile.hxx:73
std::uint64_t fOffset
The file offset.
Definition RRawFile.hxx:75