Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageStorageFile.cxx
Go to the documentation of this file.
1/// \file RPageStorageFile.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2019-11-25
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RCluster.hxx>
17#include <ROOT/RClusterPool.hxx>
18#include <ROOT/RField.hxx>
19#include <ROOT/RLogger.hxx>
21#include <ROOT/RNTupleModel.hxx>
23#include <ROOT/RNTupleZip.hxx>
24#include <ROOT/RPage.hxx>
26#include <ROOT/RPagePool.hxx>
28#include <ROOT/RRawFile.hxx>
29
30#include <RVersion.h>
31#include <TError.h>
32
33#include <algorithm>
34#include <cstdio>
35#include <cstdlib>
36#include <iostream>
37#include <utility>
38
39#include <atomic>
40#include <condition_variable>
41#include <functional>
42#include <mutex>
43#include <thread>
44#include <queue>
45
47 const RNTupleWriteOptions &options)
48 : RPageSink(ntupleName, options)
49 , fPageAllocator(std::make_unique<RPageAllocatorHeap>())
50{
51 R__LOG_WARNING(NTupleLog()) << "The RNTuple file format will change. " <<
52 "Do not store real data with this version of RNTuple!";
53 fCompressor = std::make_unique<RNTupleCompressor>();
54 EnableDefaultMetrics("RPageSinkFile");
55}
56
57
58ROOT::Experimental::Detail::RPageSinkFile::RPageSinkFile(std::string_view ntupleName, std::string_view path,
59 const RNTupleWriteOptions &options)
60 : RPageSinkFile(ntupleName, options)
61{
62 fWriter = std::unique_ptr<Internal::RNTupleFileWriter>(Internal::RNTupleFileWriter::Recreate(
63 ntupleName, path, options.GetCompression(), options.GetContainerFormat()));
64}
65
66
68 const RNTupleWriteOptions &options)
69 : RPageSinkFile(ntupleName, options)
70{
71 fWriter = std::unique_ptr<Internal::RNTupleFileWriter>(Internal::RNTupleFileWriter::Append(ntupleName, file));
72}
73
74
75ROOT::Experimental::Detail::RPageSinkFile::RPageSinkFile(std::string_view ntupleName, std::string_view path,
76 const RNTupleWriteOptions &options, std::unique_ptr<TFile> &file)
77 : RPageSinkFile(ntupleName, options)
78{
79 fWriter = std::unique_ptr<Internal::RNTupleFileWriter>(
81}
82
83
85{
86}
87
88
90{
91 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
92 fSerializationContext = Internal::RNTupleSerializer::SerializeHeaderV1(nullptr, descriptor);
93 auto buffer = std::make_unique<unsigned char []>(fSerializationContext.GetHeaderSize());
94 fSerializationContext = Internal::RNTupleSerializer::SerializeHeaderV1(buffer.get(), descriptor);
95
96 auto zipBuffer = std::make_unique<unsigned char[]>(fSerializationContext.GetHeaderSize());
97 auto szZipHeader =
98 fCompressor->Zip(buffer.get(), fSerializationContext.GetHeaderSize(), GetWriteOptions().GetCompression(),
99 [&zipBuffer](const void *b, size_t n, size_t o){ memcpy(zipBuffer.get() + o, b, n); } );
100 fWriter->WriteNTupleHeader(zipBuffer.get(), szZipHeader, fSerializationContext.GetHeaderSize());
101}
102
103
106 const RPageStorage::RSealedPage &sealedPage, std::size_t bytesPacked)
107{
108 std::uint64_t offsetData;
109 {
110 RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
111 offsetData = fWriter->WriteBlob(sealedPage.fBuffer, sealedPage.fSize, bytesPacked);
112 }
113 fClusterMinOffset = std::min(offsetData, fClusterMinOffset);
114 fClusterMaxOffset = std::max(offsetData + sealedPage.fSize, fClusterMaxOffset);
115
116 RNTupleLocator result;
117 result.fPosition = offsetData;
118 result.fBytesOnStorage = sealedPage.fSize;
119 fCounters->fNPageCommitted.Inc();
120 fCounters->fSzWritePayload.Add(sealedPage.fSize);
121 fNBytesCurrentCluster += sealedPage.fSize;
122 return result;
123}
124
125
128{
129 auto element = columnHandle.fColumn->GetElement();
130 RPageStorage::RSealedPage sealedPage;
131 {
132 RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
133 sealedPage = SealPage(page, *element, GetWriteOptions().GetCompression());
134 }
135
136 fCounters->fSzZip.Add(page.GetNBytes());
137 return WriteSealedPage(sealedPage, element->GetPackedSize(page.GetNElements()));
138}
139
140
143 DescriptorId_t columnId, const RPageStorage::RSealedPage &sealedPage)
144{
145 const auto bitsOnStorage = RColumnElementBase::GetBitsOnStorage(
146 fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(columnId).GetModel().GetType());
147 const auto bytesPacked = (bitsOnStorage * sealedPage.fNElements + 7) / 8;
148
149 return WriteSealedPage(sealedPage, bytesPacked);
150}
151
152
153std::uint64_t
155{
156 auto result = fNBytesCurrentCluster;
157 fNBytesCurrentCluster = 0;
158 return result;
159}
160
161
163{
164 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
165
166 std::vector<DescriptorId_t> physClusterIDs;
167 for (const auto &c : descriptor.GetClusterIterable()) {
168 physClusterIDs.emplace_back(fSerializationContext.MapClusterId(c.GetId()));
169 }
170
172 nullptr, descriptor, physClusterIDs, fSerializationContext);
173 auto bufPageList = std::make_unique<unsigned char []>(szPageList);
175 bufPageList.get(), descriptor, physClusterIDs, fSerializationContext);
176
177 auto bufPageListZip = std::make_unique<unsigned char []>(szPageList);
178 auto szPageListZip = fCompressor->Zip(bufPageList.get(), szPageList, GetWriteOptions().GetCompression(),
179 [&bufPageListZip](const void *b, size_t n, size_t o){ memcpy(bufPageListZip.get() + o, b, n); } );
180 auto offPageList = fWriter->WriteBlob(bufPageListZip.get(), szPageListZip, szPageList);
181
183 pageListEnvelope.fUnzippedSize = szPageList;
184 pageListEnvelope.fLocator.fPosition = offPageList;
185 pageListEnvelope.fLocator.fBytesOnStorage = szPageListZip;
186 fSerializationContext.AddClusterGroup(physClusterIDs.size(), pageListEnvelope);
187
188 auto szFooter = Internal::RNTupleSerializer::SerializeFooterV1(nullptr, descriptor, fSerializationContext);
189 auto bufFooter = std::make_unique<unsigned char []>(szFooter);
190 Internal::RNTupleSerializer::SerializeFooterV1(bufFooter.get(), descriptor, fSerializationContext);
191
192 auto bufFooterZip = std::make_unique<unsigned char []>(szFooter);
193 auto szFooterZip = fCompressor->Zip(bufFooter.get(), szFooter, GetWriteOptions().GetCompression(),
194 [&bufFooterZip](const void *b, size_t n, size_t o){ memcpy(bufFooterZip.get() + o, b, n); } );
195 fWriter->WriteNTupleFooter(bufFooterZip.get(), szFooterZip, szFooter);
196 fWriter->Commit();
197}
198
199
202{
203 if (nElements == 0)
204 throw RException(R__FAIL("invalid call: request empty page"));
205 auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
206 return fPageAllocator->NewPage(columnHandle.fId, elementSize, nElements);
207}
208
210{
211 fPageAllocator->DeletePage(page);
212}
213
214
215////////////////////////////////////////////////////////////////////////////////
216
217
219 ColumnId_t columnId, void *mem, std::size_t elementSize, std::size_t nElements)
220{
221 RPage newPage(columnId, mem, elementSize, nElements);
222 newPage.GrowUnchecked(nElements);
223 return newPage;
224}
225
227{
228 if (page.IsNull())
229 return;
230 delete[] reinterpret_cast<unsigned char *>(page.GetBuffer());
231}
232
233
234////////////////////////////////////////////////////////////////////////////////
235
236
238 const RNTupleReadOptions &options)
239 : RPageSource(ntupleName, options)
240 , fPageAllocator(std::make_unique<RPageAllocatorFile>())
241 , fPagePool(std::make_shared<RPagePool>())
242 , fClusterPool(std::make_unique<RClusterPool>(*this, options.GetClusterBunchSize()))
243{
244 fDecompressor = std::make_unique<RNTupleDecompressor>();
245 EnableDefaultMetrics("RPageSourceFile");
246}
247
248
249ROOT::Experimental::Detail::RPageSourceFile::RPageSourceFile(std::string_view ntupleName, std::string_view path,
250 const RNTupleReadOptions &options)
251 : RPageSourceFile(ntupleName, options)
252{
256}
257
258
260
261
263{
264 RNTupleDescriptorBuilder descBuilder;
265 auto ntpl = fReader.GetNTuple(fNTupleName).Unwrap();
266
267 descBuilder.SetOnDiskHeaderSize(ntpl.fNBytesHeader);
268 auto buffer = std::make_unique<unsigned char[]>(ntpl.fLenHeader);
269 auto zipBuffer = std::make_unique<unsigned char[]>(ntpl.fNBytesHeader);
270 fReader.ReadBuffer(zipBuffer.get(), ntpl.fNBytesHeader, ntpl.fSeekHeader);
271 fDecompressor->Unzip(zipBuffer.get(), ntpl.fNBytesHeader, ntpl.fLenHeader, buffer.get());
272 Internal::RNTupleSerializer::DeserializeHeaderV1(buffer.get(), ntpl.fLenHeader, descBuilder);
273
274 buffer = std::make_unique<unsigned char[]>(ntpl.fLenFooter);
275 zipBuffer = std::make_unique<unsigned char[]>(ntpl.fNBytesFooter);
276 fReader.ReadBuffer(zipBuffer.get(), ntpl.fNBytesFooter, ntpl.fSeekFooter);
277 fDecompressor->Unzip(zipBuffer.get(), ntpl.fNBytesFooter, ntpl.fLenFooter, buffer.get());
278 Internal::RNTupleSerializer::DeserializeFooterV1(buffer.get(), ntpl.fLenFooter, descBuilder);
279
280 auto cg = descBuilder.GetClusterGroup(0);
281 descBuilder.SetOnDiskFooterSize(ntpl.fNBytesFooter + cg.fPageListEnvelopeLink.fLocator.fBytesOnStorage);
282 buffer = std::make_unique<unsigned char[]>(cg.fPageListEnvelopeLink.fUnzippedSize);
283 zipBuffer = std::make_unique<unsigned char[]>(cg.fPageListEnvelopeLink.fLocator.fBytesOnStorage);
284 fReader.ReadBuffer(zipBuffer.get(),
285 cg.fPageListEnvelopeLink.fLocator.fBytesOnStorage,
286 cg.fPageListEnvelopeLink.fLocator.fPosition);
287 fDecompressor->Unzip(zipBuffer.get(),
288 cg.fPageListEnvelopeLink.fLocator.fBytesOnStorage,
289 cg.fPageListEnvelopeLink.fUnzippedSize,
290 buffer.get());
291
292 std::vector<RClusterDescriptorBuilder> clusters;
293 Internal::RNTupleSerializer::DeserializePageListV1(buffer.get(), cg.fPageListEnvelopeLink.fUnzippedSize, clusters);
294 for (std::size_t i = 0; i < clusters.size(); ++i) {
295 descBuilder.AddCluster(i, std::move(clusters[i]));
296 }
297
298 return descBuilder.MoveDescriptor();
299}
300
301
303 DescriptorId_t columnId, const RClusterIndex &clusterIndex, RSealedPage &sealedPage)
304{
305 const auto clusterId = clusterIndex.GetClusterId();
306 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
307
308 auto pageInfo = clusterDescriptor.GetPageRange(columnId).Find(clusterIndex.GetIndex());
309
310 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
311 sealedPage.fSize = bytesOnStorage;
312 sealedPage.fNElements = pageInfo.fNElements;
313 if (sealedPage.fBuffer)
314 fReader.ReadBuffer(const_cast<void *>(sealedPage.fBuffer), bytesOnStorage, pageInfo.fLocator.fPosition);
315}
316
318 ColumnHandle_t columnHandle, const RClusterDescriptor &clusterDescriptor, ClusterSize_t::ValueType idxInCluster)
319{
320 const auto columnId = columnHandle.fId;
321 const auto clusterId = clusterDescriptor.GetId();
322
323 auto pageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
324
325 const auto element = columnHandle.fColumn->GetElement();
326 const auto elementSize = element->GetSize();
327 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
328
329 const void *sealedPageBuffer = nullptr; // points either to directReadBuffer or to a read-only page in the cluster
330 std::unique_ptr<unsigned char []> directReadBuffer; // only used if cluster pool is turned off
331
332 if (fOptions.GetClusterCache() == RNTupleReadOptions::EClusterCache::kOff) {
333 directReadBuffer = std::make_unique<unsigned char[]>(bytesOnStorage);
334 fReader.ReadBuffer(directReadBuffer.get(), bytesOnStorage, pageInfo.fLocator.fPosition);
335 fCounters->fNPageLoaded.Inc();
336 fCounters->fNRead.Inc();
337 fCounters->fSzReadPayload.Add(bytesOnStorage);
338 sealedPageBuffer = directReadBuffer.get();
339 } else {
340 if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
341 fCurrentCluster = fClusterPool->GetCluster(clusterId, fActiveColumns);
342 R__ASSERT(fCurrentCluster->ContainsColumn(columnId));
343
344 auto cachedPage = fPagePool->GetPage(columnId, RClusterIndex(clusterId, idxInCluster));
345 if (!cachedPage.IsNull())
346 return cachedPage;
347
348 ROnDiskPage::Key key(columnId, pageInfo.fPageNo);
349 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
350 R__ASSERT(onDiskPage && (bytesOnStorage == onDiskPage->GetSize()));
351 sealedPageBuffer = onDiskPage->GetAddress();
352 }
353
354 std::unique_ptr<unsigned char []> pageBuffer;
355 {
356 RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
357 pageBuffer = UnsealPage({sealedPageBuffer, bytesOnStorage, pageInfo.fNElements}, *element);
358 fCounters->fSzUnzip.Add(elementSize * pageInfo.fNElements);
359 }
360
361 const auto indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
362 auto newPage = fPageAllocator->NewPage(columnId, pageBuffer.release(), elementSize, pageInfo.fNElements);
363 newPage.SetWindow(indexOffset + pageInfo.fFirstInPage, RPage::RClusterInfo(clusterId, indexOffset));
364 fPagePool->RegisterPage(newPage,
365 RPageDeleter([](const RPage &page, void * /*userData*/)
366 {
368 }, nullptr));
369 fCounters->fNPagePopulated.Inc();
370 return newPage;
371}
372
373
375 ColumnHandle_t columnHandle, NTupleSize_t globalIndex)
376{
377 const auto columnId = columnHandle.fId;
378 auto cachedPage = fPagePool->GetPage(columnId, globalIndex);
379 if (!cachedPage.IsNull())
380 return cachedPage;
381
382 const auto clusterId = fDescriptor.FindClusterId(columnId, globalIndex);
383 R__ASSERT(clusterId != kInvalidDescriptorId);
384 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
385 const auto selfOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
386 R__ASSERT(selfOffset <= globalIndex);
387 return PopulatePageFromCluster(columnHandle, clusterDescriptor, globalIndex - selfOffset);
388}
389
390
392 ColumnHandle_t columnHandle, const RClusterIndex &clusterIndex)
393{
394 const auto clusterId = clusterIndex.GetClusterId();
395 const auto idxInCluster = clusterIndex.GetIndex();
396 const auto columnId = columnHandle.fId;
397 auto cachedPage = fPagePool->GetPage(columnId, clusterIndex);
398 if (!cachedPage.IsNull())
399 return cachedPage;
400
401 R__ASSERT(clusterId != kInvalidDescriptorId);
402 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
403 return PopulatePageFromCluster(columnHandle, clusterDescriptor, idxInCluster);
404}
405
407{
408 fPagePool->ReturnPage(page);
409}
410
411std::unique_ptr<ROOT::Experimental::Detail::RPageSource> ROOT::Experimental::Detail::RPageSourceFile::Clone() const
412{
413 auto clone = new RPageSourceFile(fNTupleName, fOptions);
414 clone->fFile = fFile->Clone();
415 clone->fReader = Internal::RMiniFileReader(clone->fFile.get());
416 return std::unique_ptr<RPageSourceFile>(clone);
417}
418
419std::unique_ptr<ROOT::Experimental::Detail::RCluster>
421 const RCluster::RKey &clusterKey,
422 std::vector<ROOT::Internal::RRawFile::RIOVec> &readRequests)
423{
424 struct ROnDiskPageLocator {
427 std::uint64_t fOffset = 0;
428 std::uint64_t fSize = 0;
429 std::size_t fBufPos = 0;
430 };
431
432 const auto &clusterDesc = GetDescriptor().GetClusterDescriptor(clusterKey.fClusterId);
433
434 // Collect the page necessary page meta-data and sum up the total size of the compressed and packed pages
435 std::vector<ROnDiskPageLocator> onDiskPages;
436 auto activeSize = 0;
437 for (auto columnId : clusterKey.fColumnSet) {
438 const auto &pageRange = clusterDesc.GetPageRange(columnId);
439 NTupleSize_t pageNo = 0;
440 for (const auto &pageInfo : pageRange.fPageInfos) {
441 const auto &pageLocator = pageInfo.fLocator;
442 activeSize += pageLocator.fBytesOnStorage;
443 onDiskPages.push_back(
444 {columnId, pageNo, std::uint64_t(pageLocator.fPosition), pageLocator.fBytesOnStorage, 0});
445 ++pageNo;
446 }
447 }
448
449 // Linearize the page requests by file offset
450 std::sort(onDiskPages.begin(), onDiskPages.end(),
451 [](const ROnDiskPageLocator &a, const ROnDiskPageLocator &b) {return a.fOffset < b.fOffset;});
452
453 // In order to coalesce close-by pages, we collect the sizes of the gaps between pages on disk. We then order
454 // the gaps by size, sum them up and find a cutoff for the largest gap that we tolerate when coalescing pages.
455 // The size of the cutoff is given by the fraction of extra bytes we are willing to read in order to reduce
456 // the number of read requests. We thus schedule the lowest number of requests given a tolerable fraction
457 // of extra bytes.
458 // TODO(jblomer): Eventually we may want to select the parameter at runtime according to link latency and speed,
459 // memory consumption, device block size.
460 float maxOverhead = 0.25 * float(activeSize);
461 std::vector<std::size_t> gaps;
462 for (unsigned i = 1; i < onDiskPages.size(); ++i) {
463 gaps.emplace_back(onDiskPages[i].fOffset - (onDiskPages[i-1].fSize + onDiskPages[i-1].fOffset));
464 }
465 std::sort(gaps.begin(), gaps.end());
466 std::size_t gapCut = 0;
467 std::size_t currentGap = 0;
468 float szExtra = 0.0;
469 for (auto g : gaps) {
470 if (g != currentGap) {
471 gapCut = currentGap;
472 currentGap = g;
473 }
474 szExtra += g;
475 if (szExtra > maxOverhead)
476 break;
477 }
478
479 // In a first step, we coalesce the read requests and calculate the cluster buffer size.
480 // In a second step, we'll fix-up the memory destinations for the read calls given the
481 // address of the allocated buffer. We must not touch, however, the read requests from previous
482 // calls to PrepareSingleCluster()
483 const auto currentReadRequestIdx = readRequests.size();
484
486 std::size_t szPayload = 0;
487 std::size_t szOverhead = 0;
488 for (auto &s : onDiskPages) {
489 R__ASSERT(s.fSize > 0);
490 auto readUpTo = req.fOffset + req.fSize;
491 R__ASSERT(s.fOffset >= readUpTo);
492 auto overhead = s.fOffset - readUpTo;
493 szPayload += s.fSize;
494 if (overhead <= gapCut) {
495 szOverhead += overhead;
496 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer) + req.fSize + overhead;
497 req.fSize += overhead + s.fSize;
498 continue;
499 }
500
501 // close the current request and open new one
502 if (req.fSize > 0)
503 readRequests.emplace_back(req);
504
505 req.fBuffer = reinterpret_cast<unsigned char *>(req.fBuffer) + req.fSize;
506 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer);
507
508 req.fOffset = s.fOffset;
509 req.fSize = s.fSize;
510 }
511 readRequests.emplace_back(req);
512 fCounters->fSzReadPayload.Add(szPayload);
513 fCounters->fSzReadOverhead.Add(szOverhead);
514
515 // Register the on disk pages in a page map
516 auto buffer = new unsigned char[reinterpret_cast<intptr_t>(req.fBuffer) + req.fSize];
517 auto pageMap = std::make_unique<ROnDiskPageMapHeap>(std::unique_ptr<unsigned char []>(buffer));
518 for (const auto &s : onDiskPages) {
519 ROnDiskPage::Key key(s.fColumnId, s.fPageNo);
520 pageMap->Register(key, ROnDiskPage(buffer + s.fBufPos, s.fSize));
521 }
522 fCounters->fNPageLoaded.Add(onDiskPages.size());
523 for (auto i = currentReadRequestIdx; i < readRequests.size(); ++i) {
524 readRequests[i].fBuffer = buffer + reinterpret_cast<intptr_t>(readRequests[i].fBuffer);
525 }
526
527 auto cluster = std::make_unique<RCluster>(clusterKey.fClusterId);
528 cluster->Adopt(std::move(pageMap));
529 for (auto colId : clusterKey.fColumnSet)
530 cluster->SetColumnAvailable(colId);
531 return cluster;
532}
533
534std::vector<std::unique_ptr<ROOT::Experimental::Detail::RCluster>>
536{
537 fCounters->fNClusterLoaded.Add(clusterKeys.size());
538
539 std::vector<std::unique_ptr<ROOT::Experimental::Detail::RCluster>> clusters;
540 std::vector<ROOT::Internal::RRawFile::RIOVec> readRequests;
541
542 for (auto key: clusterKeys) {
543 clusters.emplace_back(PrepareSingleCluster(key, readRequests));
544 }
545
546 auto nReqs = readRequests.size();
547 {
548 RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
549 fFile->ReadV(&readRequests[0], nReqs);
550 }
551 fCounters->fNReadV.Inc();
552 fCounters->fNRead.Add(nReqs);
553
554 return clusters;
555}
556
557
559{
560 RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
561 fTaskScheduler->Reset();
562
563 const auto clusterId = cluster->GetId();
564 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
565
566 std::vector<std::unique_ptr<RColumnElementBase>> allElements;
567
568 const auto &columnsInCluster = cluster->GetAvailColumns();
569 for (const auto columnId : columnsInCluster) {
570 const auto &columnDesc = fDescriptor.GetColumnDescriptor(columnId);
571
572 allElements.emplace_back(RColumnElementBase::Generate(columnDesc.GetModel().GetType()));
573
574 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
575 std::uint64_t pageNo = 0;
576 std::uint64_t firstInPage = 0;
577 for (const auto &pi : pageRange.fPageInfos) {
578 ROnDiskPage::Key key(columnId, pageNo);
579 auto onDiskPage = cluster->GetOnDiskPage(key);
580 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == pi.fLocator.fBytesOnStorage));
581
582 auto taskFunc =
583 [this, columnId, clusterId, firstInPage, onDiskPage,
584 element = allElements.back().get(),
585 nElements = pi.fNElements,
586 indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex
587 ] () {
588 auto pageBuffer = UnsealPage({onDiskPage->GetAddress(), onDiskPage->GetSize(), nElements}, *element);
589 fCounters->fSzUnzip.Add(element->GetSize() * nElements);
590
591 auto newPage = fPageAllocator->NewPage(columnId, pageBuffer.release(), element->GetSize(), nElements);
592 newPage.SetWindow(indexOffset + firstInPage, RPage::RClusterInfo(clusterId, indexOffset));
593 fPagePool->PreloadPage(newPage,
594 RPageDeleter([](const RPage &page, void * /*userData*/)
595 {
597 }, nullptr));
598 };
599
600 fTaskScheduler->AddTask(taskFunc);
601
602 firstInPage += pi.fNElements;
603 pageNo++;
604 } // for all pages in column
605 } // for all columns in cluster
606
607 fCounters->fNPagePopulated.Add(cluster->GetNOnDiskPages());
608
609 fTaskScheduler->Wait();
610}
size_t fSize
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:291
#define R__LOG_WARNING(...)
Definition RLogger.hxx:363
#define b(i)
Definition RSha256.hxx:100
#define c(i)
Definition RSha256.hxx:101
#define g(i)
Definition RSha256.hxx:105
#define a(i)
Definition RSha256.hxx:99
#define R__ASSERT(e)
Definition TError.h:118
Manages a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:154
const ColumnSet_t & GetAvailColumns() const
Definition RCluster.hxx:198
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
Definition RCluster.cxx:37
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
RColumnElementBase * GetElement() const
Definition RColumn.hxx:308
Record wall time and CPU time between construction and destruction.
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:43
static RPage NewPage(ColumnId_t columnId, void *mem, std::size_t elementSize, std::size_t nElements)
Uses standard C++ memory allocation for the column data pages.
A closure that can free the memory associated with a mapped page.
A thread-safe cache of column pages.
Definition RPagePool.hxx:47
Storage provider that writes ntuple pages into a file.
RNTupleLocator CommitSealedPageImpl(DescriptorId_t columnId, const RPageStorage::RSealedPage &sealedPage) final
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final
void CreateImpl(const RNTupleModel &model) final
std::uint64_t CommitClusterImpl(NTupleSize_t nEntries) final
Returns the number of bytes written to storage (excluding metadata)
RPageSinkFile(std::string_view ntupleName, const RNTupleWriteOptions &options)
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
std::unique_ptr< Internal::RNTupleFileWriter > fWriter
RNTupleLocator WriteSealedPage(const RPageStorage::RSealedPage &sealedPage, std::size_t bytesPacked)
Abstract interface to write data into an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
std::unique_ptr< RNTupleCompressor > fCompressor
Helper to zip pages and header/footer; includes a 16MB (kMAXZIPBUF) zip buffer.
Storage provider that reads ntuple pages from a file.
void LoadSealedPage(DescriptorId_t columnId, const RClusterIndex &clusterIndex, RSealedPage &sealedPage) final
Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage.
std::vector< std::unique_ptr< RCluster > > LoadClusters(std::span< RCluster::RKey > clusterKeys) final
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
RPage PopulatePageFromCluster(ColumnHandle_t columnHandle, const RClusterDescriptor &clusterDescriptor, ClusterSize_t::ValueType idxInCluster)
Internal::RMiniFileReader fReader
Takes the fFile to read ntuple blobs from it.
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
std::unique_ptr< RCluster > PrepareSingleCluster(const RCluster::RKey &clusterKey, std::vector< ROOT::Internal::RRawFile::RIOVec > &readRequests)
Helper function for LoadClusters: it prepares the memory buffer (page map) and the read requests for ...
std::unique_ptr< RPageSource > Clone() const final
The cloned page source creates a new raw file and reader and opens its own file descriptor to the dat...
void UnzipClusterImpl(RCluster *cluster) final
RPageSourceFile(std::string_view ntupleName, const RNTupleReadOptions &options)
std::unique_ptr< ROOT::Internal::RRawFile > fFile
An RRawFile is used to request the necessary byte ranges from a local or a remote file.
RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) final
Allocates and fills a page that contains the index-th element.
Abstract interface to read data from an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
std::unique_ptr< RNTupleDecompressor > fDecompressor
Helper to unzip pages and header/footer; comprises a 16MB (kMAXZIPBUF) unzip buffer.
Stores information about the cluster in which this page resides.
Definition RPage.hxx:46
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:41
ClusterSize_t::ValueType GetNElements() const
Definition RPage.hxx:83
ClusterSize_t::ValueType GetNBytes() const
The space taken by column elements in the buffer.
Definition RPage.hxx:81
void * GrowUnchecked(ClusterSize_t::ValueType nElements)
Called during writing: returns a pointer after the last element and increases the element counter in ...
Definition RPage.hxx:109
Read RNTuple data blocks from a TFile container, provided by a RRawFile.
static RNTupleFileWriter * Append(std::string_view ntupleName, TFile &file)
Add a new RNTuple identified by ntupleName to the existing TFile.
static RNTupleFileWriter * Recreate(std::string_view ntupleName, std::string_view path, int defaultCompression, ENTupleContainerFormat containerFormat)
Create or truncate the local file given by path with the new empty RNTuple identified by ntupleName.
static std::uint32_t SerializePageListV1(void *buffer, const RNTupleDescriptor &desc, std::span< DescriptorId_t > physClusterIDs, const RContext &context)
static RResult< void > DeserializePageListV1(const void *buffer, std::uint32_t bufSize, std::vector< RClusterDescriptorBuilder > &clusters)
static RContext SerializeHeaderV1(void *buffer, const RNTupleDescriptor &desc)
static RResult< void > DeserializeFooterV1(const void *buffer, std::uint32_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static std::uint32_t SerializeFooterV1(void *buffer, const RNTupleDescriptor &desc, const RContext &context)
static RResult< void > DeserializeHeaderV1(const void *buffer, std::uint32_t bufSize, RNTupleDescriptorBuilder &descBuilder)
Meta-data for a set of ntuple clusters.
const RPageRange & GetPageRange(DescriptorId_t columnId) const
const RColumnRange & GetColumnRange(DescriptorId_t columnId) const
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
DescriptorId_t GetClusterId() const
ClusterSize_t::ValueType GetIndex() const
Base class for all ROOT issued exceptions.
Definition RError.hxx:114
A helper class for piece-wise construction of an RNTupleDescriptor.
Internal::RNTupleSerializer::RClusterGroup GetClusterGroup(std::uint32_t id) const
void AddCluster(DescriptorId_t clusterId, RNTupleVersion version, NTupleSize_t firstEntryIndex, ClusterSize_t nEntries)
The on-storage meta-data of an ntuple.
The RNTupleModel encapsulates the schema of an ntuple.
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
ENTupleContainerFormat GetContainerFormat() const
static std::unique_ptr< RRawFile > Create(std::string_view url, ROptions options=ROptions())
Factory method that returns a suitable concrete implementation according to the transport in the url.
Definition RRawFile.cxx:73
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
Definition TFile.h:54
const Int_t n
Definition legend1.C:16
RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::int64_t ColumnId_t
Uniquely identifies a physical column within the scope of the current process, used to tag pages.
constexpr DescriptorId_t kInvalidDescriptorId
Definition file.py:1
The identifiers that specify the content of a (partial) cluster.
Definition RCluster.hxx:158
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:53
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RPageInfoExtended Find(RClusterSize::ValueType idxInCluster) const
Find the page in the RPageRange that contains the given element. The element must exist.
Generic information about the physical location of data.
Used for vector reads from multiple offsets into multiple buffers.
Definition RRawFile.hxx:71
std::size_t fSize
The number of desired bytes.
Definition RRawFile.hxx:77
void * fBuffer
The destination for reading.
Definition RRawFile.hxx:73
std::uint64_t fOffset
The file offset.
Definition RRawFile.hxx:75