Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageStorageFile.cxx
Go to the documentation of this file.
1/// \file RPageStorageFile.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2019-11-25
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RCluster.hxx>
17#include <ROOT/RClusterPool.hxx>
18#include <ROOT/RField.hxx>
19#include <ROOT/RLogger.hxx>
21#include <ROOT/RNTupleModel.hxx>
23#include <ROOT/RNTupleZip.hxx>
24#include <ROOT/RPage.hxx>
26#include <ROOT/RPagePool.hxx>
28#include <ROOT/RRawFile.hxx>
30
31#include <RVersion.h>
32#include <TError.h>
33#include <TFile.h>
34
35#include <algorithm>
36#include <cstdio>
37#include <cstdlib>
38#include <cstring>
39#include <limits>
40#include <utility>
41
42#include <functional>
43#include <mutex>
44
46 const RNTupleWriteOptions &options)
47 : RPagePersistentSink(ntupleName, options), fPageAllocator(std::make_unique<RPageAllocatorHeap>())
48{
49 static std::once_flag once;
50 std::call_once(once, []() {
51 R__LOG_WARNING(NTupleLog()) << "The RNTuple file format will change. "
52 << "Do not store real data with this version of RNTuple!";
53 });
54 fCompressor = std::make_unique<RNTupleCompressor>();
55 EnableDefaultMetrics("RPageSinkFile");
56}
57
58ROOT::Experimental::Internal::RPageSinkFile::RPageSinkFile(std::string_view ntupleName, std::string_view path,
59 const RNTupleWriteOptions &options)
60 : RPageSinkFile(ntupleName, options)
61{
62 fWriter = RNTupleFileWriter::Recreate(ntupleName, path, options.GetCompression(),
64}
65
67 const RNTupleWriteOptions &options)
68 : RPageSinkFile(ntupleName, options)
69{
70 fWriter = RNTupleFileWriter::Append(ntupleName, file);
71}
72
74
75void ROOT::Experimental::Internal::RPageSinkFile::InitImpl(unsigned char *serializedHeader, std::uint32_t length)
76{
77 auto zipBuffer = std::make_unique<unsigned char[]>(length);
78 auto szZipHeader = fCompressor->Zip(serializedHeader, length, GetWriteOptions().GetCompression(),
80 fWriter->WriteNTupleHeader(zipBuffer.get(), szZipHeader, length);
81}
82
85 std::size_t bytesPacked)
86{
87 std::uint64_t offsetData;
88 {
89 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
90 offsetData = fWriter->WriteBlob(sealedPage.GetBuffer(), sealedPage.GetSize(), bytesPacked);
91 }
92
94 result.fPosition = offsetData;
95 result.fBytesOnStorage = sealedPage.GetSize();
96 fCounters->fNPageCommitted.Inc();
97 fCounters->fSzWritePayload.Add(sealedPage.GetSize());
98 fNBytesCurrentCluster += sealedPage.GetSize();
99 return result;
100}
101
104{
105 auto element = columnHandle.fColumn->GetElement();
106 RPageStorage::RSealedPage sealedPage;
107 {
108 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
109 sealedPage = SealPage(page, *element);
110 }
111
112 fCounters->fSzZip.Add(page.GetNBytes());
113 return WriteSealedPage(sealedPage, element->GetPackedSize(page.GetNElements()));
114}
115
118 const RPageStorage::RSealedPage &sealedPage)
119{
120 const auto bitsOnStorage = RColumnElementBase::GetBitsOnStorage(
121 fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(physicalColumnId).GetModel().GetType());
122 const auto bytesPacked = (bitsOnStorage * sealedPage.GetNElements() + 7) / 8;
123
124 return WriteSealedPage(sealedPage, bytesPacked);
125}
126
127std::vector<ROOT::Experimental::RNTupleLocator>
128ROOT::Experimental::Internal::RPageSinkFile::CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges)
129{
130 size_t size = 0, bytesPacked = 0;
131 for (auto &range : ranges) {
132 if (range.fFirst == range.fLast) {
133 // Skip empty ranges, they might not have a physical column ID!
134 continue;
135 }
136
137 const auto bitsOnStorage = RColumnElementBase::GetBitsOnStorage(
138 fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(range.fPhysicalColumnId).GetModel().GetType());
139 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
140 size += sealedPageIt->GetSize();
141 bytesPacked += (bitsOnStorage * sealedPageIt->GetNElements() + 7) / 8;
142 }
143 }
144 if (size >= std::numeric_limits<std::int32_t>::max() || bytesPacked >= std::numeric_limits<std::int32_t>::max()) {
145 // Cannot fit it into one key, fall back to one key per page.
146 // TODO: Remove once there is support for large keys.
148 }
149
150 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
151 // Reserve a blob that is large enough to hold all pages.
152 std::uint64_t offset = fWriter->ReserveBlob(size, bytesPacked);
153
154 // Now write the individual pages and record their locators.
155 std::vector<ROOT::Experimental::RNTupleLocator> locators;
156 for (auto &range : ranges) {
157 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
158 fWriter->WriteIntoReservedBlob(sealedPageIt->GetBuffer(), sealedPageIt->GetSize(), offset);
159 RNTupleLocator locator;
160 locator.fPosition = offset;
161 locator.fBytesOnStorage = sealedPageIt->GetSize();
162 locators.push_back(locator);
163 offset += sealedPageIt->GetSize();
164 }
165 }
166
167 fCounters->fNPageCommitted.Add(locators.size());
168 fCounters->fSzWritePayload.Add(size);
169 fNBytesCurrentCluster += size;
170
171 return locators;
172}
173
175{
176 auto result = fNBytesCurrentCluster;
177 fNBytesCurrentCluster = 0;
178 return result;
179}
180
183 std::uint32_t length)
184{
185 auto bufPageListZip = std::make_unique<unsigned char[]>(length);
186 auto szPageListZip = fCompressor->Zip(serializedPageList, length, GetWriteOptions().GetCompression(),
187 RNTupleCompressor::MakeMemCopyWriter(bufPageListZip.get()));
188
190 result.fBytesOnStorage = szPageListZip;
191 result.fPosition = fWriter->WriteBlob(bufPageListZip.get(), szPageListZip, length);
192 return result;
193}
194
196 std::uint32_t length)
197{
198 auto bufFooterZip = std::make_unique<unsigned char[]>(length);
199 auto szFooterZip = fCompressor->Zip(serializedFooter, length, GetWriteOptions().GetCompression(),
200 RNTupleCompressor::MakeMemCopyWriter(bufFooterZip.get()));
201 fWriter->WriteNTupleFooter(bufFooterZip.get(), szFooterZip, length);
202 fWriter->Commit();
203}
204
207{
208 if (nElements == 0)
209 throw RException(R__FAIL("invalid call: request empty page"));
210 auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
211 return fPageAllocator->NewPage(columnHandle.fPhysicalId, elementSize, nElements);
212}
213
215{
216 fPageAllocator->DeletePage(page);
217}
218
219////////////////////////////////////////////////////////////////////////////////
220
222 const RNTupleReadOptions &options)
223 : RPageSource(ntupleName, options),
224 fClusterPool(std::make_unique<RClusterPool>(*this, options.GetClusterBunchSize()))
225{
226 fDecompressor = std::make_unique<RNTupleDecompressor>();
227 EnableDefaultMetrics("RPageSourceFile");
228}
229
231 std::unique_ptr<ROOT::Internal::RRawFile> file,
232 const RNTupleReadOptions &options)
233 : RPageSourceFile(ntupleName, options)
234{
235 fFile = std::move(file);
238}
239
240ROOT::Experimental::Internal::RPageSourceFile::RPageSourceFile(std::string_view ntupleName, std::string_view path,
241 const RNTupleReadOptions &options)
242 : RPageSourceFile(ntupleName, ROOT::Internal::RRawFile::Create(path), options)
243{
244}
245
247{
248 // TODO(jblomer): can the epoch check be factored out across anchors?
249 if (anchor.GetVersionEpoch() != RNTuple::kVersionEpoch) {
250 throw RException(R__FAIL("unsupported RNTuple epoch version: " + std::to_string(anchor.GetVersionEpoch())));
251 }
252 if (anchor.GetVersionEpoch() == 0) {
253 static std::once_flag once;
254 std::call_once(once, [&anchor]() {
255 R__LOG_WARNING(NTupleLog()) << "Pre-release format version: RC " << anchor.GetVersionMajor();
256 });
257 }
258
259 fDescriptorBuilder.SetOnDiskHeaderSize(anchor.GetNBytesHeader());
260 fDescriptorBuilder.AddToOnDiskFooterSize(anchor.GetNBytesFooter());
261
262 const auto bufSize =
263 anchor.GetNBytesHeader() + anchor.GetNBytesFooter() + std::max(anchor.GetLenHeader(), anchor.GetLenFooter());
264 auto buffer = std::make_unique<unsigned char[]>(bufSize);
265 auto headerBuf = buffer.get();
266 auto footerBuf = headerBuf + anchor.GetNBytesHeader();
267 auto unzipBuf = footerBuf + anchor.GetNBytesFooter();
268
269 auto readvLimits = fFile->GetReadVLimits();
270 if ((readvLimits.fMaxReqs < 2) ||
271 (std::max(anchor.GetNBytesHeader(), anchor.GetNBytesFooter()) > readvLimits.fMaxSingleSize) ||
272 (anchor.GetNBytesHeader() + anchor.GetNBytesFooter() > readvLimits.fMaxTotalSize)) {
273 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
274 fReader.ReadBuffer(headerBuf, anchor.GetNBytesHeader(), anchor.GetSeekHeader());
275 fReader.ReadBuffer(footerBuf, anchor.GetNBytesFooter(), anchor.GetSeekFooter());
276 fCounters->fNRead.Add(2);
277 } else {
278 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
279 ROOT::Internal::RRawFile::RIOVec readRequests[2] = {
280 {headerBuf, anchor.GetSeekHeader(), anchor.GetNBytesHeader(), 0},
281 {footerBuf, anchor.GetSeekFooter(), anchor.GetNBytesFooter(), 0}};
282 fFile->ReadV(readRequests, 2);
283 fCounters->fNReadV.Inc();
284 }
285
286 fDecompressor->Unzip(headerBuf, anchor.GetNBytesHeader(), anchor.GetLenHeader(), unzipBuf);
287 RNTupleSerializer::DeserializeHeader(unzipBuf, anchor.GetLenHeader(), fDescriptorBuilder);
288
289 fDecompressor->Unzip(footerBuf, anchor.GetNBytesFooter(), anchor.GetLenFooter(), unzipBuf);
290 RNTupleSerializer::DeserializeFooter(unzipBuf, anchor.GetLenFooter(), fDescriptorBuilder);
291}
292
293std::unique_ptr<ROOT::Experimental::Internal::RPageSourceFile>
295 const RNTupleReadOptions &options)
296{
297 if (!anchor.fFile)
298 throw RException(R__FAIL("This RNTuple object was not streamed from a ROOT file (TFile or descendant)"));
299
300 std::unique_ptr<ROOT::Internal::RRawFile> rawFile;
301 // For local TFiles, TDavixFile, and TNetXNGFile, we want to open a new RRawFile to take advantage of the faster
302 // reading. We check the exact class name to avoid classes inheriting in ROOT (for example TMemFile) or in
303 // experiment frameworks.
304 std::string className = anchor.fFile->IsA()->GetName();
305 auto url = anchor.fFile->GetEndpointUrl();
306 auto protocol = std::string(url->GetProtocol());
307 if (className == "TFile") {
308 rawFile = ROOT::Internal::RRawFile::Create(url->GetFile());
309 } else if (className == "TDavixFile" || className == "TNetXNGFile") {
310 rawFile = ROOT::Internal::RRawFile::Create(url->GetUrl());
311 } else {
312 rawFile.reset(new ROOT::Internal::RRawFileTFile(anchor.fFile));
313 }
314
315 auto pageSource = std::make_unique<RPageSourceFile>("", std::move(rawFile), options);
316 pageSource->InitDescriptor(anchor);
317 pageSource->fNTupleName = pageSource->fDescriptorBuilder.GetDescriptor().GetName();
318 return pageSource;
319}
320
322
324{
325 // If we constructed the page source with (ntuple name, path), we need to find the anchor first.
326 // Otherwise, the page source was created by OpenFromAnchor() and the header and footer are already processed.
327 if (fDescriptorBuilder.GetDescriptor().GetOnDiskHeaderSize() == 0) {
328 auto anchor = fReader.GetNTuple(fNTupleName).Unwrap();
329 InitDescriptor(anchor);
330 }
331
332 auto desc = fDescriptorBuilder.MoveDescriptor();
333
334 std::vector<unsigned char> buffer;
335 for (const auto &cgDesc : desc.GetClusterGroupIterable()) {
336 buffer.resize(
337 std::max<size_t>(buffer.size(), cgDesc.GetPageListLength() + cgDesc.GetPageListLocator().fBytesOnStorage));
338 auto *zipBuffer = buffer.data() + cgDesc.GetPageListLength();
339 fReader.ReadBuffer(zipBuffer, cgDesc.GetPageListLocator().fBytesOnStorage,
340 cgDesc.GetPageListLocator().GetPosition<std::uint64_t>());
341 fDecompressor->Unzip(zipBuffer, cgDesc.GetPageListLocator().fBytesOnStorage, cgDesc.GetPageListLength(),
342 buffer.data());
343
344 RNTupleSerializer::DeserializePageList(buffer.data(), cgDesc.GetPageListLength(), cgDesc.GetId(), desc);
345 }
346
347 // For the page reads, we rely on the I/O scheduler to define the read requests
348 fFile->SetBuffering(false);
349
350 return desc;
351}
352
354 RClusterIndex clusterIndex, RSealedPage &sealedPage)
355{
356 const auto clusterId = clusterIndex.GetClusterId();
357
359 {
360 auto descriptorGuard = GetSharedDescriptorGuard();
361 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
362 pageInfo = clusterDescriptor.GetPageRange(physicalColumnId).Find(clusterIndex.GetIndex());
363 }
364
365 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
366 sealedPage.SetSize(bytesOnStorage);
367 sealedPage.SetNElements(pageInfo.fNElements);
368 if (!sealedPage.GetBuffer())
369 return;
371 fReader.ReadBuffer(const_cast<void *>(sealedPage.GetBuffer()), bytesOnStorage,
372 pageInfo.fLocator.GetPosition<std::uint64_t>());
373 } else {
374 memcpy(const_cast<void *>(sealedPage.GetBuffer()), RPage::GetPageZeroBuffer(), bytesOnStorage);
375 }
376}
377
380 const RClusterInfo &clusterInfo,
381 ClusterSize_t::ValueType idxInCluster)
382{
383 const auto columnId = columnHandle.fPhysicalId;
384 const auto clusterId = clusterInfo.fClusterId;
385 const auto pageInfo = clusterInfo.fPageInfo;
386
387 const auto element = columnHandle.fColumn->GetElement();
388 const auto elementSize = element->GetSize();
389 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
390
391 const void *sealedPageBuffer = nullptr; // points either to directReadBuffer or to a read-only page in the cluster
392 std::unique_ptr<unsigned char[]> directReadBuffer; // only used if cluster pool is turned off
393
394 if (pageInfo.fLocator.fType == RNTupleLocator::kTypePageZero) {
395 auto pageZero = RPage::MakePageZero(columnId, elementSize);
396 pageZero.GrowUnchecked(pageInfo.fNElements);
397 pageZero.SetWindow(clusterInfo.fColumnOffset + pageInfo.fFirstInPage,
398 RPage::RClusterInfo(clusterId, clusterInfo.fColumnOffset));
399 fPagePool->RegisterPage(pageZero, RPageDeleter([](const RPage &, void *) {}, nullptr));
400 return pageZero;
401 }
402
403 if (fOptions.GetClusterCache() == RNTupleReadOptions::EClusterCache::kOff) {
404 directReadBuffer = std::unique_ptr<unsigned char[]>(new unsigned char[bytesOnStorage]);
405 fReader.ReadBuffer(directReadBuffer.get(), bytesOnStorage, pageInfo.fLocator.GetPosition<std::uint64_t>());
406 fCounters->fNPageLoaded.Inc();
407 fCounters->fNRead.Inc();
408 fCounters->fSzReadPayload.Add(bytesOnStorage);
409 sealedPageBuffer = directReadBuffer.get();
410 } else {
411 if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
412 fCurrentCluster = fClusterPool->GetCluster(clusterId, fActivePhysicalColumns.ToColumnSet());
413 R__ASSERT(fCurrentCluster->ContainsColumn(columnId));
414
415 auto cachedPage = fPagePool->GetPage(columnId, RClusterIndex(clusterId, idxInCluster));
416 if (!cachedPage.IsNull())
417 return cachedPage;
418
419 ROnDiskPage::Key key(columnId, pageInfo.fPageNo);
420 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
421 R__ASSERT(onDiskPage && (bytesOnStorage == onDiskPage->GetSize()));
422 sealedPageBuffer = onDiskPage->GetAddress();
423 }
424
425 RPage newPage;
426 {
427 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
428 newPage = UnsealPage({sealedPageBuffer, bytesOnStorage, pageInfo.fNElements}, *element, columnId);
429 fCounters->fSzUnzip.Add(elementSize * pageInfo.fNElements);
430 }
431
432 newPage.SetWindow(clusterInfo.fColumnOffset + pageInfo.fFirstInPage,
433 RPage::RClusterInfo(clusterId, clusterInfo.fColumnOffset));
434 fPagePool->RegisterPage(
435 newPage, RPageDeleter([](const RPage &page, void *) { RPageAllocatorHeap::DeletePage(page); }, nullptr));
436 fCounters->fNPagePopulated.Inc();
437 return newPage;
438}
439
442{
443 const auto columnId = columnHandle.fPhysicalId;
444 auto cachedPage = fPagePool->GetPage(columnId, globalIndex);
445 if (!cachedPage.IsNull())
446 return cachedPage;
447
448 std::uint64_t idxInCluster;
449 RClusterInfo clusterInfo;
450 {
451 auto descriptorGuard = GetSharedDescriptorGuard();
452 clusterInfo.fClusterId = descriptorGuard->FindClusterId(columnId, globalIndex);
453
454 if (clusterInfo.fClusterId == kInvalidDescriptorId)
455 throw RException(R__FAIL("entry with index " + std::to_string(globalIndex) + " out of bounds"));
456
457 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterInfo.fClusterId);
458 clusterInfo.fColumnOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
459 R__ASSERT(clusterInfo.fColumnOffset <= globalIndex);
460 idxInCluster = globalIndex - clusterInfo.fColumnOffset;
461 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
462 }
463
464 return PopulatePageFromCluster(columnHandle, clusterInfo, idxInCluster);
465}
466
469{
470 const auto clusterId = clusterIndex.GetClusterId();
471 const auto idxInCluster = clusterIndex.GetIndex();
472 const auto columnId = columnHandle.fPhysicalId;
473 auto cachedPage = fPagePool->GetPage(columnId, clusterIndex);
474 if (!cachedPage.IsNull())
475 return cachedPage;
476
477 if (clusterId == kInvalidDescriptorId)
478 throw RException(R__FAIL("entry out of bounds"));
479
480 RClusterInfo clusterInfo;
481 {
482 auto descriptorGuard = GetSharedDescriptorGuard();
483 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
484 clusterInfo.fClusterId = clusterId;
485 clusterInfo.fColumnOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
486 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
487 }
488
489 return PopulatePageFromCluster(columnHandle, clusterInfo, idxInCluster);
490}
491
493{
494 fPagePool->ReturnPage(page);
495}
496
497std::unique_ptr<ROOT::Experimental::Internal::RPageSource> ROOT::Experimental::Internal::RPageSourceFile::Clone() const
498{
499 auto clone = new RPageSourceFile(fNTupleName, fOptions);
500 clone->fFile = fFile->Clone();
501 clone->fReader = RMiniFileReader(clone->fFile.get());
502 return std::unique_ptr<RPageSourceFile>(clone);
503}
504
505std::unique_ptr<ROOT::Experimental::Internal::RCluster>
507 const RCluster::RKey &clusterKey, std::vector<ROOT::Internal::RRawFile::RIOVec> &readRequests)
508{
509 struct ROnDiskPageLocator {
512 std::uint64_t fOffset = 0;
513 std::uint64_t fSize = 0;
514 std::size_t fBufPos = 0;
515 };
516
517 std::vector<ROnDiskPageLocator> onDiskPages;
518 auto activeSize = 0;
519 auto pageZeroMap = std::make_unique<ROnDiskPageMap>();
520 PrepareLoadCluster(clusterKey, *pageZeroMap,
521 [&](DescriptorId_t physicalColumnId, NTupleSize_t pageNo,
523 const auto &pageLocator = pageInfo.fLocator;
524 activeSize += pageLocator.fBytesOnStorage;
525 onDiskPages.push_back({physicalColumnId, pageNo, pageLocator.GetPosition<std::uint64_t>(),
526 pageLocator.fBytesOnStorage, 0});
527 });
528
529 // Linearize the page requests by file offset
530 std::sort(onDiskPages.begin(), onDiskPages.end(),
531 [](const ROnDiskPageLocator &a, const ROnDiskPageLocator &b) { return a.fOffset < b.fOffset; });
532
533 // In order to coalesce close-by pages, we collect the sizes of the gaps between pages on disk. We then order
534 // the gaps by size, sum them up and find a cutoff for the largest gap that we tolerate when coalescing pages.
535 // The size of the cutoff is given by the fraction of extra bytes we are willing to read in order to reduce
536 // the number of read requests. We thus schedule the lowest number of requests given a tolerable fraction
537 // of extra bytes.
538 // TODO(jblomer): Eventually we may want to select the parameter at runtime according to link latency and speed,
539 // memory consumption, device block size.
540 float maxOverhead = 0.25 * float(activeSize);
541 std::vector<std::size_t> gaps;
542 if (onDiskPages.size())
543 gaps.reserve(onDiskPages.size() - 1);
544 for (unsigned i = 1; i < onDiskPages.size(); ++i) {
545 gaps.emplace_back(onDiskPages[i].fOffset - (onDiskPages[i - 1].fSize + onDiskPages[i - 1].fOffset));
546 }
547 std::sort(gaps.begin(), gaps.end());
548 std::size_t gapCut = 0;
549 std::size_t currentGap = 0;
550 float szExtra = 0.0;
551 for (auto g : gaps) {
552 if (g != currentGap) {
553 gapCut = currentGap;
554 currentGap = g;
555 }
556 szExtra += g;
557 if (szExtra > maxOverhead)
558 break;
559 }
560
561 // In a first step, we coalesce the read requests and calculate the cluster buffer size.
562 // In a second step, we'll fix-up the memory destinations for the read calls given the
563 // address of the allocated buffer. We must not touch, however, the read requests from previous
564 // calls to PrepareSingleCluster()
565 const auto currentReadRequestIdx = readRequests.size();
566
568 std::size_t szPayload = 0;
569 std::size_t szOverhead = 0;
570 for (auto &s : onDiskPages) {
571 R__ASSERT(s.fSize > 0);
572 auto readUpTo = req.fOffset + req.fSize;
573 R__ASSERT(s.fOffset >= readUpTo);
574 auto overhead = s.fOffset - readUpTo;
575 szPayload += s.fSize;
576 if (overhead <= gapCut) {
577 szOverhead += overhead;
578 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer) + req.fSize + overhead;
579 req.fSize += overhead + s.fSize;
580 continue;
581 }
582
583 // close the current request and open new one
584 if (req.fSize > 0)
585 readRequests.emplace_back(req);
586
587 req.fBuffer = reinterpret_cast<unsigned char *>(req.fBuffer) + req.fSize;
588 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer);
589
590 req.fOffset = s.fOffset;
591 req.fSize = s.fSize;
592 }
593 readRequests.emplace_back(req);
594 fCounters->fSzReadPayload.Add(szPayload);
595 fCounters->fSzReadOverhead.Add(szOverhead);
596
597 // Register the on disk pages in a page map
598 auto buffer = new unsigned char[reinterpret_cast<intptr_t>(req.fBuffer) + req.fSize];
599 auto pageMap = std::make_unique<ROnDiskPageMapHeap>(std::unique_ptr<unsigned char[]>(buffer));
600 for (const auto &s : onDiskPages) {
601 ROnDiskPage::Key key(s.fColumnId, s.fPageNo);
602 pageMap->Register(key, ROnDiskPage(buffer + s.fBufPos, s.fSize));
603 }
604 fCounters->fNPageLoaded.Add(onDiskPages.size());
605 for (auto i = currentReadRequestIdx; i < readRequests.size(); ++i) {
606 readRequests[i].fBuffer = buffer + reinterpret_cast<intptr_t>(readRequests[i].fBuffer);
607 }
608
609 auto cluster = std::make_unique<RCluster>(clusterKey.fClusterId);
610 cluster->Adopt(std::move(pageMap));
611 cluster->Adopt(std::move(pageZeroMap));
612 for (auto colId : clusterKey.fPhysicalColumnSet)
613 cluster->SetColumnAvailable(colId);
614 return cluster;
615}
616
617std::vector<std::unique_ptr<ROOT::Experimental::Internal::RCluster>>
619{
620 fCounters->fNClusterLoaded.Add(clusterKeys.size());
621
622 std::vector<std::unique_ptr<ROOT::Experimental::Internal::RCluster>> clusters;
623 std::vector<ROOT::Internal::RRawFile::RIOVec> readRequests;
624
625 clusters.reserve(clusterKeys.size());
626 for (auto key : clusterKeys) {
627 clusters.emplace_back(PrepareSingleCluster(key, readRequests));
628 }
629
630 auto nReqs = readRequests.size();
631 auto readvLimits = fFile->GetReadVLimits();
632
633 int iReq = 0;
634 while (nReqs > 0) {
635 auto nBatch = std::min(nReqs, readvLimits.fMaxReqs);
636
637 if (readvLimits.HasSizeLimit()) {
638 std::uint64_t totalSize = 0;
639 for (std::size_t i = 0; i < nBatch; ++i) {
640 if (readRequests[iReq + i].fSize > readvLimits.fMaxSingleSize) {
641 nBatch = i;
642 break;
643 }
644
645 totalSize += readRequests[iReq + i].fSize;
646 if (totalSize > readvLimits.fMaxTotalSize) {
647 nBatch = i;
648 break;
649 }
650 }
651 }
652
653 if (nBatch <= 1) {
654 nBatch = 1;
655 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
656 fFile->ReadAt(readRequests[iReq].fBuffer, readRequests[iReq].fSize, readRequests[iReq].fOffset);
657 } else {
658 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
659 fFile->ReadV(&readRequests[iReq], nBatch);
660 }
661 fCounters->fNReadV.Inc();
662 fCounters->fNRead.Add(nBatch);
663
664 iReq += nBatch;
665 nReqs -= nBatch;
666 }
667
668 return clusters;
669}
fBuffer
dim_t fSize
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:290
#define R__LOG_WARNING(...)
Definition RLogger.hxx:363
#define b(i)
Definition RSha256.hxx:100
#define g(i)
Definition RSha256.hxx:105
#define a(i)
Definition RSha256.hxx:99
TObject * clone(const char *newname) const override
Definition RooChi2Var.h:9
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Record wall time and CPU time between construction and destruction.
Manages a set of clusters containing compressed and packed pages.
RColumnElementBase * GetElement() const
Definition RColumn.hxx:336
Read RNTuple data blocks from a TFile container, provided by a RRawFile.
Definition RMiniFile.hxx:54
static Writer_t MakeMemCopyWriter(unsigned char *dest)
static std::unique_ptr< RNTupleFileWriter > Append(std::string_view ntupleName, TFile &file)
Add a new RNTuple identified by ntupleName to the existing TFile.
static std::unique_ptr< RNTupleFileWriter > Recreate(std::string_view ntupleName, std::string_view path, int defaultCompression, EContainerFormat containerFormat)
Create or truncate the local file given by path with the new empty RNTuple identified by ntupleName.
static RResult< void > DeserializeHeader(const void *buffer, std::uint64_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static RResult< void > DeserializeFooter(const void *buffer, std::uint64_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static RResult< void > DeserializePageList(const void *buffer, std::uint64_t bufSize, DescriptorId_t clusterGroupId, RNTupleDescriptor &desc)
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:42
Uses standard C++ memory allocation for the column data pages.
static void DeletePage(const RPage &page)
Releases the memory pointed to by page and resets the page's information.
A closure that can free the memory associated with a mapped page.
Base class for a sink with a physical storage backend.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
virtual std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges)
Vector commit of preprocessed pages.
Storage provider that writes ntuple pages into a file.
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges) final
Vector commit of preprocessed pages.
std::uint64_t CommitClusterImpl() final
Returns the number of bytes written to storage (excluding metadata)
RNTupleLocator WriteSealedPage(const RPageStorage::RSealedPage &sealedPage, std::size_t bytesPacked)
void InitImpl(unsigned char *serializedHeader, std::uint32_t length) final
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final
RNTupleLocator CommitSealedPageImpl(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
RPageSinkFile(std::string_view ntupleName, const RNTupleWriteOptions &options)
std::unique_ptr< RNTupleFileWriter > fWriter
std::unique_ptr< RNTupleCompressor > fCompressor
Helper to zip pages and header/footer; includes a 16MB (kMAXZIPBUF) zip buffer.
Storage provider that reads ntuple pages from a file.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const RNTupleReadOptions &options=RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) final
Allocates and fills a page that contains the index-th element.
RPageSourceFile(std::string_view ntupleName, const RNTupleReadOptions &options)
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
std::vector< std::unique_ptr< RCluster > > LoadClusters(std::span< RCluster::RKey > clusterKeys) final
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
std::unique_ptr< RPageSource > Clone() const final
The cloned page source creates a new raw file and reader and opens its own file descriptor to the dat...
std::unique_ptr< RCluster > PrepareSingleCluster(const RCluster::RKey &clusterKey, std::vector< ROOT::Internal::RRawFile::RIOVec > &readRequests)
Helper function for LoadClusters: it prepares the memory buffer (page map) and the read requests for ...
RMiniFileReader fReader
Takes the fFile to read ntuple blobs from it.
RPage PopulatePageFromCluster(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo, ClusterSize_t::ValueType idxInCluster)
void LoadSealedPage(DescriptorId_t physicalColumnId, RClusterIndex clusterIndex, RSealedPage &sealedPage) final
Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage.
void InitDescriptor(const RNTuple &anchor)
Deserializes the header and footer into a minimal descriptor held by fDescriptorBuilder.
std::unique_ptr< ROOT::Internal::RRawFile > fFile
An RRawFile is used to request the necessary byte ranges from a local or a remote file.
Abstract interface to read data from an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
std::unique_ptr< RNTupleDecompressor > fDecompressor
Helper to unzip pages and header/footer; comprises a 16MB (kMAXZIPBUF) unzip buffer.
Stores information about the cluster in which this page resides.
Definition RPage.hxx:48
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:41
static RPage MakePageZero(ColumnId_t columnId, ClusterSize_t::ValueType elementSize)
Make a 'zero' page for column columnId (consisting only of 0x00 bytes).
Definition RPage.hxx:134
std::uint32_t GetNBytes() const
The space taken by column elements in the buffer.
Definition RPage.hxx:83
std::uint32_t GetNElements() const
Definition RPage.hxx:84
void SetWindow(const NTupleSize_t rangeFirst, const RClusterInfo &clusterInfo)
Seek the page to a certain position of the column.
Definition RPage.hxx:117
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Definition RPage.cxx:18
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
DescriptorId_t GetClusterId() const
ClusterSize_t::ValueType GetIndex() const
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
The on-storage meta-data of an ntuple.
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:61
std::uint64_t GetLenFooter() const
Definition RNTuple.hxx:116
static constexpr std::uint16_t kVersionEpoch
Definition RNTuple.hxx:67
std::uint64_t GetNBytesHeader() const
Definition RNTuple.hxx:111
std::uint64_t GetLenHeader() const
Definition RNTuple.hxx:112
std::uint16_t GetVersionEpoch() const
Definition RNTuple.hxx:105
std::uint64_t GetNBytesFooter() const
Definition RNTuple.hxx:115
TFile * fFile
! The file from which the ntuple was streamed, registered in the custom streamer
Definition RNTuple.hxx:99
std::uint64_t GetSeekHeader() const
Definition RNTuple.hxx:110
std::uint16_t GetVersionMajor() const
Definition RNTuple.hxx:106
std::uint64_t GetSeekFooter() const
Definition RNTuple.hxx:114
The RRawFileTFile wraps an open TFile, but does not take ownership.
The RRawFile provides read-only access to local and remote files.
Definition RRawFile.hxx:43
static std::unique_ptr< RRawFile > Create(std::string_view url, ROptions options=ROptions())
Factory method that returns a suitable concrete implementation according to the transport in the url.
Definition RRawFile.cxx:64
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:53
virtual const TUrl * GetEndpointUrl() const
Definition TFile.h:235
TClass * IsA() const override
Definition TFile.h:344
const char * GetName() const override
Returns name of object.
Definition TNamed.h:47
virtual TObject * Clone(const char *newname="") const
Make a clone of an object using the Streamer facility.
Definition TObject.cxx:223
RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr DescriptorId_t kInvalidDescriptorId
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
The identifiers that specify the content of a (partial) cluster.
Definition RCluster.hxx:156
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:52
Summarizes cluster-level information that is necessary to populate a certain page.
RClusterDescriptor::RPageRange::RPageInfoExtended fPageInfo
Location of the page on disk.
std::uint64_t fColumnOffset
The first element number of the page's column in the given cluster.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
We do not need to store the element size / uncompressed page size because we know to which column the...
std::uint32_t fNElements
The sum of the elements of all the pages must match the corresponding fNElements field in fColumnRang...
RNTupleLocator fLocator
The meaning of fLocator depends on the storage backend.
Generic information about the physical location of data.
ELocatorType fType
For non-disk locators, the value for the Type field.
std::variant< std::uint64_t, std::string, RNTupleLocatorObject64 > fPosition
Simple on-disk locators consisting of a 64-bit offset use variant type uint64_t; extended locators ha...
Used for vector reads from multiple offsets into multiple buffers.
Definition RRawFile.hxx:61
std::size_t fSize
The number of desired bytes.
Definition RRawFile.hxx:67
void * fBuffer
The destination for reading.
Definition RRawFile.hxx:63
std::uint64_t fOffset
The file offset.
Definition RRawFile.hxx:65