Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageStorageFile.cxx
Go to the documentation of this file.
1/// \file RPageStorageFile.cxx
2/// \author Jakob Blomer <jblomer@cern.ch>
3/// \date 2019-11-25
4
5/*************************************************************************
6 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13#include <ROOT/RCluster.hxx>
14#include <ROOT/RLogger.hxx>
16#include <ROOT/RNTupleModel.hxx>
18#include <ROOT/RNTupleZip.hxx>
19#include <ROOT/RPage.hxx>
21#include <ROOT/RPagePool.hxx>
23#include <ROOT/RRawFile.hxx>
25#include <ROOT/RNTupleTypes.hxx>
26#include <ROOT/RNTupleUtils.hxx>
27
28#include <RVersion.h>
29#include <TDirectory.h>
30#include <TError.h>
32
33#include <algorithm>
34#include <cstdio>
35#include <cstdlib>
36#include <cstring>
37#include <iterator>
38#include <limits>
39#include <utility>
40
41#include <functional>
42#include <mutex>
43
58
65
72
79
86
87ROOT::Internal::RPageSinkFile::RPageSinkFile(std::unique_ptr<ROOT::Internal::RNTupleFileWriter> writer,
88 const ROOT::RNTupleWriteOptions &options)
89 : RPageSinkFile(writer->GetNTupleName(), options)
90{
91 fWriter = std::move(writer);
92}
93
95
97{
// Compresses the serialized header with the sink's configured compression setting and hands the
// compressed bytes plus the uncompressed length to the file writer.
// NOTE(review): the declaration of zipBuffer is not part of this listing; it is presumably large
// enough to hold the compressed header — confirm.
99 auto szZipHeader =
100 RNTupleCompressor::Zip(serializedHeader, length, GetWriteOptions().GetCompression(), zipBuffer.get());
101 fWriter->WriteNTupleHeader(zipBuffer.get(), szZipHeader, length);
102}
103
106{
// Collects the streamer infos of the class-like fields added by `changeset` into fInfosOfClassFields,
// so that they can later be handed to the file writer (UpdateStreamerInfos) when the dataset is committed.
108
// Records the streamer info of `field` if it is a class, streamer, or SoA field; other fields are ignored.
// Throws RException if the class has no streamer info for the field's type version.
// Infos are keyed by their streamer-info number; an entry with the same number is overwritten.
109 auto fnAddStreamerInfo = [this](const ROOT::RFieldBase *field) {
110 const TClass *cl = nullptr;
111 if (auto classField = dynamic_cast<const RClassField *>(field)) {
112 cl = classField->GetClass();
113 } else if (auto streamerField = dynamic_cast<const RStreamerField *>(field)) {
114 cl = streamerField->GetClass();
115 } else if (auto soaField = dynamic_cast<const ROOT::Experimental::RSoAField *>(field)) {
116 cl = soaField->GetSoAClass();
117 }
118 if (!cl)
119 return;
120
121 auto streamerInfo = cl->GetStreamerInfo(field->GetTypeVersion());
122 if (!streamerInfo) {
123 throw RException(R__FAIL(std::string("cannot get streamerInfo for ") + cl->GetName() + " [" +
124 std::to_string(field->GetTypeVersion()) + "]"));
125 }
126 fInfosOfClassFields[streamerInfo->GetNumber()] = streamerInfo;
127 };
128
// Visit every newly added top-level field and all of its subfields.
// NOTE(review): the per-field calls are on lines not shown in this listing; presumably they invoke
// fnAddStreamerInfo on `field` and on each `subField` — confirm.
129 for (const auto field : changeset.fAddedFields) {
131 for (const auto &subField : *field) {
133 }
134 }
135}
136
139{
// Writes one sealed page as a blob and returns a locator pointing at it.
// The locator records the page's data size (GetDataSize()), whereas the payload counter and the
// per-cluster byte count use the full buffer size (GetBufferSize()); elsewhere in this file the buffer
// size is the on-storage size plus the optional page checksum.
140 std::uint64_t offsetData;
141 {
// Only the blob write itself is timed.
142 RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
143 offsetData = fWriter->WriteBlob(sealedPage.GetBuffer(), sealedPage.GetBufferSize(), bytesPacked);
144 }
145
147 result.SetPosition(offsetData);
148 result.SetNBytesOnStorage(sealedPage.GetDataSize());
149 fCounters->fNPageCommitted.Inc();
150 fCounters->fSzWritePayload.Add(sealedPage.GetBufferSize());
151 fNBytesCurrentCluster += sealedPage.GetBufferSize();
152 return result;
153}
154
157{
// Seals an in-memory page using the column's element description and writes it via WriteSealedPage.
// The packed size passed along is derived from the element and the page's element count; the fSzZip
// counter receives the in-memory page size (page.GetNBytes()).
158 auto element = columnHandle.fColumn->GetElement();
160 {
// Time only the sealing step.
161 RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
162 sealedPage = SealPage(page, *element);
163 }
164
165 fCounters->fSzZip.Add(page.GetNBytes());
166 return WriteSealedPage(sealedPage, element->GetPackedSize(page.GetNElements()));
167}
168
171{
// Writes a page that has already been sealed by the caller. The packed byte size is derived from the
// column's on-disk bit width: nBits * nElements, rounded up to whole bytes.
172 const auto nBits = fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(physicalColumnId).GetBitsOnStorage();
173 const auto bytesPacked = (nBits * sealedPage.GetNElements() + 7) / 8;
174 return WriteSealedPage(sealedPage, bytesPacked);
175}
176
178{
// Writes all pages of `batch` back-to-back into a single reserved blob of batch.fSize bytes and appends
// one locator per page (in batch order) to `locators`. Afterwards the batch is reset so it can be reused.
// The whole operation, including the loop, is covered by the write timer.
179 RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
180
181 std::uint64_t offset = fWriter->ReserveBlob(batch.fSize, batch.fBytesPacked);
182
183 locators.reserve(locators.size() + batch.fSealedPages.size());
184
185 for (const auto *pagePtr : batch.fSealedPages) {
186 fWriter->WriteIntoReservedBlob(pagePtr->GetBuffer(), pagePtr->GetBufferSize(), offset);
188 locator.SetPosition(offset);
189 locator.SetNBytesOnStorage(pagePtr->GetDataSize());
190 locators.push_back(locator);
// Pages are laid out contiguously: the next page starts right after this page's full buffer.
191 offset += pagePtr->GetBufferSize();
192 }
193
194 fCounters->fNPageCommitted.Add(batch.fSealedPages.size());
195 fCounters->fSzWritePayload.Add(batch.fSize);
196 fNBytesCurrentCluster += batch.fSize;
197
198 batch.fSize = 0;
199 batch.fBytesPacked = 0;
200 batch.fSealedPages.clear();
201}
202
203std::vector<ROOT::RNTupleLocator>
204ROOT::Internal::RPageSinkFile::CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges,
205 const std::vector<bool> &mask)
206{
// Writes the sealed pages of `ranges` that are selected by `mask` and returns one locator per written
// page, in input order. `mask` has one entry per page, counted across all ranges in sequence.
// Selected pages are accumulated into batches that are each written as one blob of at most maxKeySize
// bytes; a page that alone exceeds maxKeySize is written individually after flushing the pending batch.
207 const std::uint64_t maxKeySize = fOptions->GetMaxKeySize();
208
210 std::vector<RNTupleLocator> locators;
211
// Running index into `mask`, advanced for every page, whether or not it is selected.
212 std::size_t iPage = 0;
213 for (auto rangeIt = ranges.begin(); rangeIt != ranges.end(); ++rangeIt) {
214 auto &range = *rangeIt;
215 if (range.fFirst == range.fLast) {
216 // Skip empty ranges, they might not have a physical column ID!
217 continue;
218 }
219
220 const auto bitsOnStorage =
221 fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(range.fPhysicalColumnId).GetBitsOnStorage();
222
223 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt, ++iPage) {
224 if (!mask[iPage])
225 continue;
226
227 const auto bytesPacked = (bitsOnStorage * sealedPageIt->GetNElements() + 7) / 8;
228
229 if (batch.fSize > 0 && batch.fSize + sealedPageIt->GetBufferSize() > maxKeySize) {
230 /**
231 * Adding this page would exceed maxKeySize. Since we always want to write into a single key
232 * with vectorized writes, we commit the current set of pages before proceeding.
233 * NOTE: we do this *before* checking if sealedPageIt->GetBufferSize() > maxKeySize to guarantee that
234 * we always flush the current batch before doing an individual WriteBlob. This way we
235 * preserve the assumption that a CommitBatch always contains a sequential set of pages.
236 */
237 CommitBatchOfPages(batch, locators);
238 }
239
240 if (sealedPageIt->GetBufferSize() > maxKeySize) {
241 // This page alone is bigger than maxKeySize: save it by itself, since it will need to be
242 // split into multiple keys.
243
244 // Since this check implies the previous check on batchSize + newSize > maxSize, we should
245 // already have committed the current batch before writing this page.
246 assert(batch.fSize == 0);
247
248 std::uint64_t offset =
249 fWriter->WriteBlob(sealedPageIt->GetBuffer(), sealedPageIt->GetBufferSize(), bytesPacked);
251 locator.SetPosition(offset);
252 locator.SetNBytesOnStorage(sealedPageIt->GetDataSize());
253 locators.push_back(locator);
254
255 fCounters->fNPageCommitted.Inc();
256 fCounters->fSzWritePayload.Add(sealedPageIt->GetBufferSize());
257 fNBytesCurrentCluster += sealedPageIt->GetBufferSize();
258
259 } else {
// The batch keeps non-owning pointers into `ranges`; they are consumed by CommitBatchOfPages within this call.
260 batch.fSealedPages.emplace_back(&(*sealedPageIt));
261 batch.fSize += sealedPageIt->GetBufferSize();
262 batch.fBytesPacked += bytesPacked;
263 }
264 }
265 }
266
// Flush whatever is left in the last, partially filled batch.
267 if (batch.fSize > 0) {
268 CommitBatchOfPages(batch, locators);
269 }
270
271 return locators;
272}
273
275{
// Returns the number of bytes written since the last call (the current cluster's payload) and resets
// the counter so the next cluster starts from zero.
276 auto result = fNBytesCurrentCluster;
277 fNBytesCurrentCluster = 0;
278 return result;
279}
280
283{
// Compresses the serialized page list with the configured compression setting, writes it as a blob,
// and returns a locator holding its file position and compressed size.
285 auto szPageListZip =
286 RNTupleCompressor::Zip(serializedPageList, length, GetWriteOptions().GetCompression(), bufPageListZip.get());
287
289 result.SetNBytesOnStorage(szPageListZip);
290 result.SetPosition(fWriter->WriteBlob(bufPageListZip.get(), szPageListZip, length));
291 return result;
292}
293
296{
// Finalizes the dataset. It merges streamer infos recorded as extra type information into
// fInfosOfClassFields and registers them with the writer. It then compresses and writes the serialized
// footer, and commits the file with the configured compression setting. The writer's Commit() result is returned.
297 // Add the streamer info records from streamer fields: because of runtime polymorphism we may need to add additional
298 // types not covered by the type names of the class fields
299 for (const auto &extraTypeInfo : fDescriptorBuilder.GetDescriptor().GetExtraTypeInfoIterable()) {
301 continue;
302 // Ideally, we would avoid deserializing the streamer info records of the streamer fields that we just serialized.
303 // However, this happens only once at the end of writing and only when streamer fields are used, so the
304 // preference here is for code simplicity.
305 fInfosOfClassFields.merge(RNTupleSerializer::DeserializeStreamerInfos(extraTypeInfo.GetContent()).Unwrap());
306 }
307 fWriter->UpdateStreamerInfos(fInfosOfClassFields);
308
310 auto szFooterZip =
311 RNTupleCompressor::Zip(serializedFooter, length, GetWriteOptions().GetCompression(), bufFooterZip.get());
312 fWriter->WriteNTupleFooter(bufFooterZip.get(), szFooterZip, length);
313 return fWriter->Commit(GetWriteOptions().GetCompression());
314}
315
316std::unique_ptr<ROOT::Internal::RPageSink>
318{
// Creates a new file sink that writes through a hidden clone of this sink's writer, registered under
// `name`, and configured with `opts`.
319 auto writer = fWriter->CloneAsHidden(name);
320 auto cloned = std::unique_ptr<RPageSinkFile>(new RPageSinkFile(std::move(writer), opts));
321 return cloned;
322}
323
324////////////////////////////////////////////////////////////////////////////////
325
328{
// Sets up the default metrics plus the file-specific ones.
// - "szSkip": cumulative seek distance, excluding header/footer reads.
// - "szFile": total file size.
// - "randomness": szSkip / (szReadPayload + szReadOverhead).
// - "sparseness": (szReadPayload + szReadOverhead) / file size.
// Each computed metric reports {false, -1.} while its inputs are unavailable: the file size is still 0,
// a counter is missing, or, for randomness, nothing has been read yet.
// NOTE(review): the szFile and sparseness lambdas capture `this` (they read fFileSize), so the metrics
// object must not outlive this page source — confirm against RNTupleMetrics ownership.
329 EnableDefaultMetrics("RPageSourceFile");
330 fFileCounters = std::make_unique<RFileCounters>(RFileCounters{
331 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szSkip", "B",
332 "cumulative seek distance (excluding header/footer reads)"),
334 "szFile", "B", "total file size", fMetrics,
335 [this](const RNTupleMetrics &) -> std::pair<bool, double> {
336 if (fFileSize > 0)
337 return {true, static_cast<double>(fFileSize)};
338 return {false, -1.};
339 }),
341 "randomness", "",
342 "ratio of seek distance to bytes read (excluding file structure reads)", fMetrics,
343 [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
344 if (const auto szSkip = metrics.GetLocalCounter("szSkip")) {
345 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
346 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
347 auto totalRead = szReadPayload->GetValueAsInt() + szReadOverhead->GetValueAsInt();
348 if (totalRead > 0) {
349 return {true, (1. * szSkip->GetValueAsInt()) / totalRead};
350 }
351 }
352 }
353 }
354 return {false, -1.};
355 }),
357 "sparseness", "",
358 "ratio of bytes read to total file size (excluding file structure reads)", fMetrics,
359 [this](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
360 if (fFileSize > 0) {
361 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
362 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
363 auto totalRead = szReadPayload->GetValueAsInt() + szReadOverhead->GetValueAsInt();
364 return {true, (1. * totalRead) / fFileSize};
365 }
366 }
367 }
368 return {false, -1.};
369 })});
370}
371
373 std::unique_ptr<ROOT::Internal::RRawFile> file,
374 const ROOT::RNTupleReadOptions &options)
375 : RPageSourceFile(ntupleName, options)
376{
// Takes ownership of the raw file that all subsequent reads go through.
// NOTE(review): the remaining statements of this constructor are not part of this listing.
377 fFile = std::move(file);
380}
381
382ROOT::Internal::RPageSourceFile::RPageSourceFile(std::string_view ntupleName, std::string_view path,
383 const ROOT::RNTupleReadOptions &options)
384 : RPageSourceFile(ntupleName, ROOT::Internal::RRawFile::Create(path), options)
385{
386}
387
388std::unique_ptr<ROOT::Internal::RPageSourceFile>
390{
// Creates a page source for the RNTuple described by `anchor`. The anchor must have been streamed from
// a ROOT file; otherwise RException is thrown. The raw file used for reading is chosen by the exact class
// name of the anchor's file (the branch bodies are not part of this listing). The returned source has an
// empty name and already holds the anchor.
391 if (!anchor.fFile)
392 throw RException(R__FAIL("This RNTuple object was not streamed from a ROOT file (TFile or descendant)"));
393
394 std::unique_ptr<ROOT::Internal::RRawFile> rawFile;
395 // For local TFiles, TDavixFile, TCurlFile, and TNetXNGFile, we want to open a new RRawFile to take advantage of the
396 // faster reading. We check the exact class name to avoid classes inheriting in ROOT (for example TMemFile) or in
397 // experiment frameworks.
398 std::string className = anchor.fFile->IsA()->GetName();
399 auto url = anchor.fFile->GetEndpointUrl();
400 auto protocol = std::string(url->GetProtocol());
401 if (className == "TFile") {
403 } else if (className == "TDavixFile" || className == "TCurlFile" || className == "TNetXNGFile") {
405 } else {
407 }
408
409 auto pageSource = std::make_unique<RPageSourceFile>("", std::move(rawFile), options);
410 pageSource->fAnchor = anchor;
411 // NOTE: fNTupleName gets set only upon Attach().
412 return pageSource;
413}
414
416{
// Stops the cluster pool's background thread.
// NOTE(review): the signature is not part of this listing; this is presumably the destructor — confirm.
417 fClusterPool.StopBackgroundThread();
418}
419
420std::unique_ptr<ROOT::Internal::RPageSource>
422 const ROOT::RNTupleReadOptions &options)
423{
// Reads the anchor referenced by `anchorLink` from this source's file and returns a new page source that
// reads through a clone of the same raw file. The link must use a file locator; this is checked only by
// assert, so it is not enforced in release builds. The new source's name stays empty until it is attached.
424 assert(anchorLink.fLocator.GetType() == RNTupleLocator::kTypeFile);
425
426 const auto anchorPos = anchorLink.fLocator.GetPosition<std::uint64_t>();
427 auto anchor =
428 fReader.GetNTupleProperAtOffset(anchorPos, anchorLink.fLocator.GetNBytesOnStorage(), anchorLink.fLength).Unwrap();
429 auto pageSource = std::make_unique<RPageSourceFile>("", fFile->Clone(), options);
430 pageSource->fAnchor = anchor;
431 // NOTE: fNTupleName gets set only upon Attach().
432 return pageSource;
433}
434
436{
// Locates the anchor if necessary, configures the descriptor builder from it, and reads the compressed
// header and footer into fStructureBuffer. The buffer layout is:
// [compressed header | compressed footer | scratch space of max(uncompressed header, footer) bytes].
// Two plain reads are used if vectored reads are unavailable (fewer than 2 requests allowed) or exceed
// the readv size limits; otherwise a single ReadV fetches both.
437 // If we constructed the page source with (ntuple name, path), we need to find the anchor first.
438 // Otherwise, the page source was created from an anchor (e.g. by CreateFromAnchor()), which already set fAnchor.
439 if (!fAnchor) {
440 fAnchor = fReader.GetNTuple(fNTupleName).Unwrap();
441 }
442 fReader.SetMaxKeySize(fAnchor->GetMaxKeySize());
443
444 fDescriptorBuilder.SetVersion(fAnchor->GetVersionEpoch(), fAnchor->GetVersionMajor(), fAnchor->GetVersionMinor(),
445 fAnchor->GetVersionPatch());
446 fDescriptorBuilder.SetOnDiskHeaderSize(fAnchor->GetNBytesHeader());
447 fDescriptorBuilder.AddToOnDiskFooterSize(fAnchor->GetNBytesFooter());
448
449 // Reserve enough space for the compressed and the uncompressed header/footer (see AttachImpl)
450 const auto bufSize = fAnchor->GetNBytesHeader() + fAnchor->GetNBytesFooter() +
451 std::max(fAnchor->GetLenHeader(), fAnchor->GetLenFooter());
452 fStructureBuffer.fBuffer = MakeUninitArray<unsigned char>(bufSize);
453 fStructureBuffer.fPtrHeader = fStructureBuffer.fBuffer.get();
454 fStructureBuffer.fPtrFooter = fStructureBuffer.fBuffer.get() + fAnchor->GetNBytesHeader();
455
456 auto readvLimits = fFile->GetReadVLimits();
457 // Never try to vectorize reads to a split key
458 readvLimits.fMaxSingleSize = std::min<size_t>(readvLimits.fMaxSingleSize, fAnchor->GetMaxKeySize());
459
460 if ((readvLimits.fMaxReqs < 2) ||
461 (std::max(fAnchor->GetNBytesHeader(), fAnchor->GetNBytesFooter()) > readvLimits.fMaxSingleSize) ||
462 (fAnchor->GetNBytesHeader() + fAnchor->GetNBytesFooter() > readvLimits.fMaxTotalSize)) {
463 RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
464 fReader.ReadBuffer(fStructureBuffer.fPtrHeader, fAnchor->GetNBytesHeader(), fAnchor->GetSeekHeader());
465 fReader.ReadBuffer(fStructureBuffer.fPtrFooter, fAnchor->GetNBytesFooter(), fAnchor->GetSeekFooter());
466 fCounters->fNRead.Add(2);
467 } else {
468 RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
469 R__ASSERT(fAnchor->GetNBytesHeader() < std::numeric_limits<std::size_t>::max());
470 R__ASSERT(fAnchor->GetNBytesFooter() < std::numeric_limits<std::size_t>::max());
471 ROOT::Internal::RRawFile::RIOVec readRequests[2] = {{fStructureBuffer.fPtrHeader, fAnchor->GetSeekHeader(),
472 static_cast<std::size_t>(fAnchor->GetNBytesHeader()), 0},
473 {fStructureBuffer.fPtrFooter, fAnchor->GetSeekFooter(),
474 static_cast<std::size_t>(fAnchor->GetNBytesFooter()), 0}};
475 fFile->ReadV(readRequests, 2);
476 fCounters->fNReadV.Inc();
477 }
478}
479
481{
// Decompresses and deserializes the header and footer previously loaded into fStructureBuffer, then
// reads, decompresses and deserializes the page list of every cluster group (`mode` is forwarded to the
// page-list deserializer). Returns the resulting descriptor. As side effects it sets fNTupleName if it
// was empty, turns off raw-file buffering, and records the file size.
// The scratch area right after the compressed footer receives the uncompressed header and then the footer.
482 auto unzipBuf = reinterpret_cast<unsigned char *>(fStructureBuffer.fPtrFooter) + fAnchor->GetNBytesFooter();
483
484 RNTupleDecompressor::Unzip(fStructureBuffer.fPtrHeader, fAnchor->GetNBytesHeader(), fAnchor->GetLenHeader(),
485 unzipBuf);
486 RNTupleSerializer::DeserializeHeader(unzipBuf, fAnchor->GetLenHeader(), fDescriptorBuilder);
487
488 RNTupleDecompressor::Unzip(fStructureBuffer.fPtrFooter, fAnchor->GetNBytesFooter(), fAnchor->GetLenFooter(),
489 unzipBuf);
490 RNTupleSerializer::DeserializeFooter(unzipBuf, fAnchor->GetLenFooter(), fDescriptorBuilder);
491
492 auto desc = fDescriptorBuilder.MoveDescriptor();
493
494 // fNTupleName is empty if we created this source from an anchor (e.g. via CreateFromAnchor). If that's the case, this is the
495 // earliest we can set the name.
496 if (fNTupleName.empty())
497 fNTupleName = desc.GetName();
498
// One buffer reused for all cluster groups (it only grows). The uncompressed page list goes at the front;
// the compressed bytes are read into the area right behind it.
499 std::vector<unsigned char> buffer;
500 for (const auto &cgDesc : desc.GetClusterGroupIterable()) {
501 buffer.resize(std::max<size_t>(buffer.size(),
502 cgDesc.GetPageListLength() + cgDesc.GetPageListLocator().GetNBytesOnStorage()));
503 auto *zipBuffer = buffer.data() + cgDesc.GetPageListLength();
504 fReader.ReadBuffer(zipBuffer, cgDesc.GetPageListLocator().GetNBytesOnStorage(),
505 cgDesc.GetPageListLocator().GetPosition<std::uint64_t>());
506 RNTupleDecompressor::Unzip(zipBuffer, cgDesc.GetPageListLocator().GetNBytesOnStorage(),
507 cgDesc.GetPageListLength(), buffer.data());
508
509 RNTupleSerializer::DeserializePageList(buffer.data(), cgDesc.GetPageListLength(), cgDesc.GetId(), desc, mode);
510 }
511
512 // For the page reads, we rely on the I/O scheduler to define the read requests
513 fFile->SetBuffering(false);
514
515 // Set file size once after buffering is turned off
516 fFileSize = fFile->GetSize();
517
518 return desc;
519}
520
523{
// Fills `sealedPage` for the page of `physicalColumnId` that contains `localIndex`.
// The buffer size, element count and checksum flag are always set. If the caller supplied no buffer,
// the function returns after that, so the caller can size a buffer first. Otherwise the page bytes are
// read from the file, or copied from the shared zero page for page-zero locators. The checksum is then
// verified if enabled; a mismatch is raised via ThrowOnError().
524 const auto clusterId = localIndex.GetClusterId();
525
527 {
// Hold the shared descriptor guard only while looking up the page info.
528 auto descriptorGuard = GetSharedDescriptorGuard();
529 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
530 pageInfo = clusterDescriptor.GetPageRange(physicalColumnId).Find(localIndex.GetIndexInCluster());
531 }
532
533 sealedPage.SetBufferSize(pageInfo.GetLocator().GetNBytesOnStorage() + pageInfo.HasChecksum() * kNBytesPageChecksum);
534 sealedPage.SetNElements(pageInfo.GetNElements());
535 sealedPage.SetHasChecksum(pageInfo.HasChecksum());
536 if (!sealedPage.GetBuffer())
537 return;
538 if (pageInfo.GetLocator().GetType() != RNTupleLocator::kTypePageZero) {
539 fReader.ReadBuffer(const_cast<void *>(sealedPage.GetBuffer()), sealedPage.GetBufferSize(),
540 pageInfo.GetLocator().GetPosition<std::uint64_t>());
541 } else {
542 assert(!pageInfo.HasChecksum());
543 memcpy(const_cast<void *>(sealedPage.GetBuffer()), ROOT::Internal::RPage::GetPageZeroBuffer(),
544 sealedPage.GetBufferSize());
545 }
546
547 sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
548}
549
553{
// Produces the in-memory page described by `clusterInfo` for the column behind `columnHandle` and
// registers it in the page pool.
// - Page-zero locators yield a zero-filled page without any I/O.
// - With the cluster cache turned off, the sealed page is read directly from the file; this path also
//   tracks the seek distance.
// - Otherwise the current cluster is (re)fetched from the cluster pool if needed, and the sealed bytes are
//   taken from its on-disk page map.
// In the last two cases the page is then unsealed and registered.
554 const auto columnId = columnHandle.fPhysicalId;
555 const auto clusterId = clusterInfo.fClusterId;
556 const auto pageInfo = clusterInfo.fPageInfo;
557
558 const auto element = columnHandle.fColumn->GetElement();
559 const auto elementSize = element->GetSize();
560 const auto elementInMemoryType = element->GetIdentifier().fInMemoryType;
561
562 if (pageInfo.GetLocator().GetType() == RNTupleLocator::kTypePageZero) {
563 auto pageZero = fPageAllocator->NewPage(elementSize, pageInfo.GetNElements());
564 pageZero.GrowUnchecked(pageInfo.GetNElements());
565 memset(pageZero.GetBuffer(), 0, pageZero.GetNBytes());
566 pageZero.SetWindow(clusterInfo.fColumnOffset + pageInfo.GetFirstElementIndex(),
568 return fPagePool.RegisterPage(std::move(pageZero), RPagePool::RKey{columnId, elementInMemoryType});
569 }
570
572 sealedPage.SetNElements(pageInfo.GetNElements());
573 sealedPage.SetHasChecksum(pageInfo.HasChecksum());
574 sealedPage.SetBufferSize(pageInfo.GetLocator().GetNBytesOnStorage() + pageInfo.HasChecksum() * kNBytesPageChecksum);
575 std::unique_ptr<unsigned char[]> directReadBuffer; // only used if cluster pool is turned off
576
// Direct-read path: no cluster cache, one read per page.
577 if (fOptions.GetClusterCache() == ROOT::RNTupleReadOptions::EClusterCache::kOff) {
579 {
580 RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
581 const auto offset = pageInfo.GetLocator().GetPosition<std::uint64_t>();
582 // Track seek distance (excluding file structure reads)
583 R__ASSERT(fFileCounters);
584 if (fLastOffset != 0) {
585 const auto distance = static_cast<std::uint64_t>(std::abs(
586 static_cast<std::int64_t>(offset) - static_cast<std::int64_t>(fLastOffset)));
587 fFileCounters->fSzSkip.Add(distance);
588 }
589 fReader.ReadBuffer(directReadBuffer.get(), sealedPage.GetBufferSize(), offset);
590 fLastOffset = offset + sealedPage.GetBufferSize();
591 }
592 fCounters->fNPageRead.Inc();
593 fCounters->fNRead.Inc();
594 fCounters->fSzReadPayload.Add(sealedPage.GetBufferSize());
595 sealedPage.SetBuffer(directReadBuffer.get());
596 } else {
597 if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
598 fCurrentCluster = fClusterPool.GetCluster(clusterId, fActivePhysicalColumns.ToColumnSet());
599 R__ASSERT(fCurrentCluster->ContainsColumn(columnId));
600
601 auto cachedPageRef =
603 if (!cachedPageRef.Get().IsNull())
604 return cachedPageRef;
605
606 ROnDiskPage::Key key(columnId, pageInfo.GetPageNumber());
607 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
608 R__ASSERT(onDiskPage && (sealedPage.GetBufferSize() == onDiskPage->GetSize()));
609 sealedPage.SetBuffer(onDiskPage->GetAddress());
610 }
611
613 {
614 RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
615 newPage = UnsealPage(sealedPage, *element).Unwrap();
616 fCounters->fSzUnzip.Add(elementSize * pageInfo.GetNElements());
617 }
618
619 newPage.SetWindow(clusterInfo.fColumnOffset + pageInfo.GetFirstElementIndex(),
621 fCounters->fNPageUnsealed.Inc();
622 return fPagePool.RegisterPage(std::move(newPage), RPagePool::RKey{columnId, elementInMemoryType});
623}
624
625std::unique_ptr<ROOT::Internal::RPageSource> ROOT::Internal::RPageSourceFile::CloneImpl() const
626{
627 auto clone = new RPageSourceFile(fNTupleName, fOptions);
628 clone->fFile = fFile->Clone();
629 clone->fReader = ROOT::Internal::RMiniFileReader(clone->fFile.get());
630 return std::unique_ptr<RPageSourceFile>(clone);
631}
632
633std::unique_ptr<ROOT::Internal::RCluster>
635 std::vector<ROOT::Internal::RRawFile::RIOVec> &readRequests)
636{
637 struct ROnDiskPageLocator {
638 ROOT::DescriptorId_t fColumnId = 0;
639 ROOT::NTupleSize_t fPageNo = 0;
640 std::uint64_t fOffset = 0;
641 std::uint64_t fSize = 0;
642 std::size_t fBufPos = 0;
643 };
644
645 std::vector<ROnDiskPageLocator> onDiskPages;
646 auto activeSize = 0;
647 auto pageZeroMap = std::make_unique<ROnDiskPageMap>();
648 PrepareLoadCluster(
652 const auto &pageLocator = pageInfo.GetLocator();
654 throw RException(R__FAIL("tried to read a page with an unknown locator"));
655 const auto nBytes = pageLocator.GetNBytesOnStorage() + pageInfo.HasChecksum() * kNBytesPageChecksum;
657 onDiskPages.push_back({physicalColumnId, pageNo, pageLocator.GetPosition<std::uint64_t>(), nBytes, 0});
658 });
659
660 // Linearize the page requests by file offset
661 std::sort(onDiskPages.begin(), onDiskPages.end(),
662 [](const ROnDiskPageLocator &a, const ROnDiskPageLocator &b) { return a.fOffset < b.fOffset; });
663
664 // In order to coalesce close-by pages, we collect the sizes of the gaps between pages on disk. We then order
665 // the gaps by size, sum them up and find a cutoff for the largest gap that we tolerate when coalescing pages.
666 // The size of the cutoff is given by the fraction of extra bytes we are willing to read in order to reduce
667 // the number of read requests. We thus schedule the lowest number of requests given a tolerable fraction
668 // of extra bytes.
669 // TODO(jblomer): Eventually we may want to select the parameter at runtime according to link latency and speed,
670 // memory consumption, device block size.
671 float maxOverhead = 0.25 * float(activeSize);
672 std::vector<std::size_t> gaps;
673 if (onDiskPages.size())
674 gaps.reserve(onDiskPages.size() - 1);
675 for (unsigned i = 1; i < onDiskPages.size(); ++i) {
676 std::int64_t gap =
677 static_cast<int64_t>(onDiskPages[i].fOffset) - (onDiskPages[i - 1].fSize + onDiskPages[i - 1].fOffset);
678 gaps.emplace_back(std::max(gap, std::int64_t(0)));
679 // If the pages overlap, substract the overlapped bytes from `activeSize`
680 activeSize += std::min(gap, std::int64_t(0));
681 }
682 std::sort(gaps.begin(), gaps.end());
683 std::size_t gapCut = 0;
684 std::size_t currentGap = 0;
685 float szExtra = 0.0;
686 for (auto g : gaps) {
687 if (g != currentGap) {
689 currentGap = g;
690 }
691 szExtra += g;
692 if (szExtra > maxOverhead)
693 break;
694 }
695
696 // In a first step, we coalesce the read requests and calculate the cluster buffer size.
697 // In a second step, we'll fix-up the memory destinations for the read calls given the
698 // address of the allocated buffer. We must not touch, however, the read requests from previous
699 // calls to PrepareSingleCluster()
700 const auto currentReadRequestIdx = readRequests.size();
701
703 // To simplify the first loop iteration, pretend an empty request starting at the first page's fOffset.
704 if (!onDiskPages.empty())
705 req.fOffset = onDiskPages[0].fOffset;
706 std::size_t szPayload = 0;
707 std::size_t szOverhead = 0;
708 const std::uint64_t maxKeySize = fReader.GetMaxKeySize();
709 for (auto &s : onDiskPages) {
710 R__ASSERT(s.fSize > 0);
711 const std::int64_t readUpTo = req.fOffset + req.fSize;
712 // Note: byte ranges of pages may overlap
713 const std::uint64_t overhead = std::max(static_cast<std::int64_t>(s.fOffset) - readUpTo, std::int64_t(0));
714 const std::uint64_t extent = std::max(static_cast<std::int64_t>(s.fOffset + s.fSize) - readUpTo, std::int64_t(0));
715 if (req.fSize + extent < maxKeySize && overhead <= gapCut) {
718 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer) + s.fOffset - req.fOffset;
719 req.fSize += extent;
720 continue;
721 }
722
723 // close the current request and open new one
724 if (req.fSize > 0)
725 readRequests.emplace_back(req);
726
727 req.fBuffer = reinterpret_cast<unsigned char *>(req.fBuffer) + req.fSize;
728 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer);
729
730 szPayload += s.fSize;
731 req.fOffset = s.fOffset;
732 req.fSize = s.fSize;
733 }
734 readRequests.emplace_back(req);
735 fCounters->fSzReadPayload.Add(szPayload);
736 fCounters->fSzReadOverhead.Add(szOverhead);
737
738 // Register the on disk pages in a page map
739 auto buffer = new unsigned char[reinterpret_cast<intptr_t>(req.fBuffer) + req.fSize];
740 auto pageMap = std::make_unique<ROOT::Internal::ROnDiskPageMapHeap>(std::unique_ptr<unsigned char[]>(buffer));
741 for (const auto &s : onDiskPages) {
742 ROnDiskPage::Key key(s.fColumnId, s.fPageNo);
743 pageMap->Register(key, ROnDiskPage(buffer + s.fBufPos, s.fSize));
744 }
745 fCounters->fNPageRead.Add(onDiskPages.size());
746 for (auto i = currentReadRequestIdx; i < readRequests.size(); ++i) {
747 readRequests[i].fBuffer = buffer + reinterpret_cast<intptr_t>(readRequests[i].fBuffer);
748 }
749
750 auto cluster = std::make_unique<RCluster>(clusterKey.fClusterId);
751 cluster->Adopt(std::move(pageMap));
752 cluster->Adopt(std::move(pageZeroMap));
753 for (auto colId : clusterKey.fPhysicalColumnSet)
754 cluster->SetColumnAvailable(colId);
755 return cluster;
756}
757
758std::vector<std::unique_ptr<ROOT::Internal::RCluster>>
760{
// Prepares one RCluster per key, collecting all their read requests, then executes those requests in
// batches. A batch is issued as one ReadV when it has more than one request within the readv limits;
// otherwise a single plain read is issued. The seek distance between consecutive requests is accumulated
// in fSzSkip. Returns the clusters in the order of `clusterKeys`.
761 fCounters->fNClusterLoaded.Add(clusterKeys.size());
762
763 std::vector<std::unique_ptr<ROOT::Internal::RCluster>> clusters;
764 std::vector<ROOT::Internal::RRawFile::RIOVec> readRequests;
765
766 clusters.reserve(clusterKeys.size());
767 for (auto key : clusterKeys) {
768 clusters.emplace_back(PrepareSingleCluster(key, readRequests));
769 }
770
771 auto nReqs = readRequests.size();
772 auto readvLimits = fFile->GetReadVLimits();
773 // We never want to do vectorized reads of split blobs, so we limit our single size to maxKeySize.
774 readvLimits.fMaxSingleSize = std::min<size_t>(readvLimits.fMaxSingleSize, fReader.GetMaxKeySize());
775
776 std::size_t iReq = 0;
777 while (nReqs > 0) {
778 auto nBatch = std::min(nReqs, readvLimits.fMaxReqs);
779
780 if (readvLimits.HasSizeLimit()) {
781 std::uint64_t totalSize = 0;
782 for (std::size_t i = 0; i < nBatch; ++i) {
783 if (readRequests[iReq + i].fSize > readvLimits.fMaxSingleSize) {
784 nBatch = i;
785 break;
786 }
787
788 totalSize += readRequests[iReq + i].fSize;
789 if (totalSize > readvLimits.fMaxTotalSize) {
790 nBatch = i;
791 break;
792 }
793 }
794 }
795
// If the next request does not fit the readv limits, nBatch is 0 here. That request is still issued on its
// own by the single-read branch below, so count it now; otherwise the seek statistics (fSzSkip, fLastOffset)
// would miss it.
if (nBatch == 0)
nBatch = 1;
796 // Track seek distance for each read request (excluding file structure reads)
797 R__ASSERT(fFileCounters);
798 for (std::size_t i = 0; i < nBatch; ++i) {
799 const auto offset = readRequests[iReq + i].fOffset;
800 if (fLastOffset != 0) {
801 const auto distance = static_cast<std::uint64_t>(std::abs(
802 static_cast<std::int64_t>(offset) - static_cast<std::int64_t>(fLastOffset)));
803 fFileCounters->fSzSkip.Add(distance);
804 }
805 fLastOffset = offset + readRequests[iReq + i].fSize;
806 }
807
808 if (nBatch <= 1) {
809 nBatch = 1;
810 RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
811 fReader.ReadBuffer(readRequests[iReq].fBuffer, readRequests[iReq].fSize, readRequests[iReq].fOffset);
812 } else {
813 RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
814 fFile->ReadV(&readRequests[iReq], nBatch);
815 }
816 fCounters->fNReadV.Inc();
817 fCounters->fNRead.Add(nBatch);
818
819 iReq += nBatch;
820 nReqs -= nBatch;
821 }
822
823 return clusters;
824}
825
827{
// Delegates to the mini-file reader to load the streamer info records of the underlying file.
828 fReader.LoadStreamerInfo();
829}
fBuffer
dim_t fSize
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:299
#define b(i)
Definition RSha256.hxx:100
#define g(i)
Definition RSha256.hxx:105
#define a(i)
Definition RSha256.hxx:99
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t mask
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Option_t Option_t TPoint TPoint const char mode
char name[80]
Definition TGX11.cxx:148
A thread-safe integral performance counter.
A metric element that computes its floating point value from other counters.
A collection of Counter objects with a name, a unit, and a description.
CounterPtrT MakeCounter(const std::string &name, Args &&... args)
An interface to read from, or write to, a ROOT file, as well as performing other common operations.
Definition RFile.hxx:252
The SoA field provides I/O for an in-memory SoA layout linked to an on-disk collection of the underly...
Definition RFieldSoA.hxx:54
Manages a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:147
Read RNTuple data blocks from a TFile container, provided by a RRawFile.
Definition RMiniFile.hxx:60
Helper class to compress data blocks in the ROOT compression frame format.
static std::size_t Zip(const void *from, std::size_t nbytes, int compression, void *to)
Returns the size of the compressed data, written into the provided output buffer.
Helper class to uncompress data blocks in the ROOT compression frame format.
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size of the from buffer.
Write RNTuple data blocks in a TFile or a bare file container.
static std::unique_ptr< RNTupleFileWriter > Append(std::string_view ntupleName, TDirectory &fileOrDirectory, std::uint64_t maxKeySize, bool isHidden)
The directory parameter can also be a TFile object (TFile inherits from TDirectory).
static std::unique_ptr< RNTupleFileWriter > Recreate(std::string_view ntupleName, std::string_view path, EContainerFormat containerFormat, const ROOT::RNTupleWriteOptions &options)
Create or truncate the local file given by path with the new empty RNTuple identified by ntupleName.
A helper class for serializing and deserialization of the RNTuple binary format.
static RResult< void > DeserializePageList(const void *buffer, std::uint64_t bufSize, ROOT::DescriptorId_t clusterGroupId, RNTupleDescriptor &desc, EDescriptorDeserializeMode mode)
static RResult< void > DeserializeFooter(const void *buffer, std::uint64_t bufSize, ROOT::Internal::RNTupleDescriptorBuilder &descBuilder)
static RResult< StreamerInfoMap_t > DeserializeStreamerInfos(const std::string &extraTypeInfoContent)
static RResult< void > DeserializeHeader(const void *buffer, std::uint64_t bufSize, ROOT::Internal::RNTupleDescriptorBuilder &descBuilder)
A memory region that contains packed and compressed pages.
Definition RCluster.hxx:98
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:40
Base class for a sink with a physical storage backend.
void UpdateSchema(const ROOT::Internal::RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) override
Incorporate incremental changes to the model into the ntuple descriptor.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
A thread-safe cache of pages loaded from the page source.
Definition RPagePool.hxx:46
Reference to a page stored in the page pool.
Storage provider that writes ntuple pages into a file.
void CommitBatchOfPages(CommitBatch &batch, std::vector< RNTupleLocator > &locators)
Subroutine of CommitSealedPageVImpl, used to perform a vector write of the (multi-)range of pages con...
RPageSinkFile(std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options)
std::unique_ptr< RPageSink > CloneAsHidden(std::string_view name, const ROOT::RNTupleWriteOptions &opts) const override
Creates a new sink with the same underlying storage as this but writing to a different RNTuple named ...
std::uint64_t StageClusterImpl() final
Returns the number of bytes written to storage (excluding metadata)
void InitImpl(unsigned char *serializedHeader, std::uint32_t length) final
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) override
RNTupleLocator WriteSealedPage(const RPageStorage::RSealedPage &sealedPage, std::size_t bytesPacked)
We pass bytesPacked so that TFile::ls() reports a reasonable value for the compression ratio of the c...
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
RNTupleLocator CommitSealedPageImpl(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
RNTupleLink CommitDatasetImpl() final
std::unique_ptr< ROOT::Internal::RNTupleFileWriter > fWriter
void UpdateSchema(const ROOT::Internal::RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges, const std::vector< bool > &mask) final
Vector commit of preprocessed pages.
Storage provider that reads ntuple pages from a file.
std::int64_t fFileSize
Total file size, set once in AttachImpl()
std::unique_ptr< ROOT::Internal::RCluster > PrepareSingleCluster(const ROOT::Internal::RCluster::RKey &clusterKey, std::vector< RRawFile::RIOVec > &readRequests)
Helper function for LoadClusters: it prepares the memory buffer (page map) and the read requests for ...
std::unique_ptr< RPageSource > OpenWithDifferentAnchor(const ROOT::Internal::RNTupleLink &anchorLink, const ROOT::RNTupleReadOptions &options={}) final
Creates a new PageSource using the same underlying file as this but referring to a different RNTuple,...
RPageRef LoadPageImpl(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo, ROOT::NTupleSize_t idxInCluster) final
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
ROOT::RNTupleDescriptor AttachImpl(RNTupleSerializer::EDescriptorDeserializeMode mode) final
LoadStructureImpl() has been called before AttachImpl() is called
std::vector< std::unique_ptr< ROOT::Internal::RCluster > > LoadClusters(std::span< ROOT::Internal::RCluster::RKey > clusterKeys) final
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
std::unique_ptr< RFileCounters > fFileCounters
RPageSourceFile(std::string_view ntupleName, const ROOT::RNTupleReadOptions &options)
std::unique_ptr< RPageSource > CloneImpl() const final
The cloned page source creates a new raw file and reader and opens its own file descriptor to the dat...
void LoadStreamerInfo() final
Forces the loading of ROOT StreamerInfo from the underlying file.
std::unique_ptr< RRawFile > fFile
An RRawFile is used to request the necessary byte ranges from a local or a remote file.
ROOT::Internal::RMiniFileReader fReader
Takes the fFile to read ntuple blobs from it.
void LoadSealedPage(ROOT::DescriptorId_t physicalColumnId, RNTupleLocalIndex localIndex, RSealedPage &sealedPage) final
Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage.
Abstract interface to read data from an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
ROOT::Experimental::Detail::RNTupleMetrics fMetrics
Stores information about the cluster in which this page resides.
Definition RPage.hxx:52
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:43
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Definition RPage.cxx:22
The RRawFileTFile wraps an open TFile, but does not take ownership.
The RRawFile provides read-only access to local and remote files.
Definition RRawFile.hxx:43
static std::unique_ptr< RRawFile > Create(std::string_view url, ROptions options=ROptions())
Factory method that returns a suitable concrete implementation according to the transport in the url.
Definition RRawFile.cxx:64
The field for a class with dictionary.
Definition RField.hxx:135
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
A field translates read and write calls from/to underlying columns to/from tree values.
The on-storage metadata of an RNTuple.
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Generic information about the physical location of data.
Common user-tunable settings for reading RNTuples.
Common user-tunable settings for storing RNTuples.
std::uint64_t GetMaxKeySize() const
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:67
const_iterator begin() const
const_iterator end() const
The field for a class using ROOT standard streaming.
Definition RField.hxx:235
TClass instances represent classes, structs and namespaces in the ROOT type system.
Definition TClass.h:84
TVirtualStreamerInfo * GetStreamerInfo(Int_t version=0, Bool_t isTransient=kFALSE) const
returns a pointer to the TVirtualStreamerInfo object for version. If the object does not exist,...
Definition TClass.cxx:4657
Describe directory structure in memory.
Definition TDirectory.h:45
const char * GetName() const override
Returns name of object.
Definition TNamed.h:49
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
The identifiers that specify the content of a (partial) cluster.
Definition RCluster.hxx:151
The incremental changes to an RNTupleModel.
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:50
File-specific I/O performance counters.
Summarizes cluster-level information that is necessary to load a certain page.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
Used for vector reads from multiple offsets into multiple buffers.
Definition RRawFile.hxx:61
Information about a single page in the context of a cluster's page range.