Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageStorageFile.cxx
Go to the documentation of this file.
1/// \file RPageStorageFile.cxx
2/// \ingroup NTuple
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2019-11-25
5
6/*************************************************************************
7 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
8 * All rights reserved. *
9 * *
10 * For the licensing terms see $ROOTSYS/LICENSE. *
11 * For the list of contributors see $ROOTSYS/README/CREDITS. *
12 *************************************************************************/
13
14#include <ROOT/RCluster.hxx>
15#include <ROOT/RClusterPool.hxx>
16#include <ROOT/RLogger.hxx>
18#include <ROOT/RNTupleModel.hxx>
20#include <ROOT/RNTupleZip.hxx>
21#include <ROOT/RPage.hxx>
23#include <ROOT/RPagePool.hxx>
25#include <ROOT/RRawFile.hxx>
27#include <ROOT/RNTupleUtil.hxx>
28
29#include <RVersion.h>
30#include <TDirectory.h>
31#include <TError.h>
32
33#include <algorithm>
34#include <cstdio>
35#include <cstdlib>
36#include <cstring>
37#include <iterator>
38#include <limits>
39#include <utility>
40
41#include <functional>
42#include <mutex>
43
55
62
69
76
78
80{
82 auto szZipHeader =
83 RNTupleCompressor::Zip(serializedHeader, length, GetWriteOptions().GetCompression(), zipBuffer.get());
84 fWriter->WriteNTupleHeader(zipBuffer.get(), szZipHeader, length);
85}
86
89{
90 std::uint64_t offsetData;
91 {
92 RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
93 offsetData = fWriter->WriteBlob(sealedPage.GetBuffer(), sealedPage.GetBufferSize(), bytesPacked);
94 }
95
97 result.SetPosition(offsetData);
98 result.SetNBytesOnStorage(sealedPage.GetDataSize());
99 fCounters->fNPageCommitted.Inc();
100 fCounters->fSzWritePayload.Add(sealedPage.GetBufferSize());
101 fNBytesCurrentCluster += sealedPage.GetBufferSize();
102 return result;
103}
104
107{
108 auto element = columnHandle.fColumn->GetElement();
110 {
111 RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
112 sealedPage = SealPage(page, *element);
113 }
114
115 fCounters->fSzZip.Add(page.GetNBytes());
116 return WriteSealedPage(sealedPage, element->GetPackedSize(page.GetNElements()));
117}
118
121{
122 const auto nBits = fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(physicalColumnId).GetBitsOnStorage();
123 const auto bytesPacked = (nBits * sealedPage.GetNElements() + 7) / 8;
124 return WriteSealedPage(sealedPage, bytesPacked);
125}
126
128{
129 RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
130
131 std::uint64_t offset = fWriter->ReserveBlob(batch.fSize, batch.fBytesPacked);
132
133 locators.reserve(locators.size() + batch.fSealedPages.size());
134
135 for (const auto *pagePtr : batch.fSealedPages) {
136 fWriter->WriteIntoReservedBlob(pagePtr->GetBuffer(), pagePtr->GetBufferSize(), offset);
138 locator.SetPosition(offset);
139 locator.SetNBytesOnStorage(pagePtr->GetDataSize());
140 locators.push_back(locator);
141 offset += pagePtr->GetBufferSize();
142 }
143
144 fCounters->fNPageCommitted.Add(batch.fSealedPages.size());
145 fCounters->fSzWritePayload.Add(batch.fSize);
146 fNBytesCurrentCluster += batch.fSize;
147
148 batch.fSize = 0;
149 batch.fBytesPacked = 0;
150 batch.fSealedPages.clear();
151}
152
/// Vector commit of sealed pages.
///
/// Walks all page ranges and commits every page whose entry in `mask` is set. Pages are accumulated in a batch
/// that is flushed through CommitBatchOfPages() whenever adding the next page would push the batch size above the
/// maximum key size (taken from the write options). A page whose buffer alone exceeds the maximum key size is
/// written individually with fWriter->WriteBlob().
///
/// \param ranges Groups of sealed pages; each non-empty group is looked up by its physical column ID.
/// \param mask One flag per page, indexed by the running page index over all ranges; pages with a false flag are
///             skipped.
/// \return The locators of the committed pages, in the order in which the pages are visited.
///
/// NOTE(review): the declarations of `batch` and `locator` are not visible in this listing (the corresponding
/// original lines are absent) — presumably a CommitBatch and an RNTupleLocator; confirm against the real source.
153std::vector<ROOT::RNTupleLocator>
154ROOT::Internal::RPageSinkFile::CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges,
155 const std::vector<bool> &mask)
156{
// Upper bound for the size of one batch; a single page above this size is written on its own further down.
157 const std::uint64_t maxKeySize = fOptions->GetMaxKeySize();
158
160 std::vector<RNTupleLocator> locators;
161
// Running page index across all ranges; used to index into `mask`.
162 std::size_t iPage = 0;
163 for (auto rangeIt = ranges.begin(); rangeIt != ranges.end(); ++rangeIt) {
164 auto &range = *rangeIt;
165 if (range.fFirst == range.fLast) {
166 // Skip empty ranges, they might not have a physical column ID!
167 continue;
168 }
169
170 const auto bitsOnStorage =
171 fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(range.fPhysicalColumnId).GetBitsOnStorage();
172
173 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt, ++iPage) {
174 if (!mask[iPage])
175 continue;
176
// Size in bytes of the packed elements: bits on storage times number of elements, rounded up to whole bytes.
177 const auto bytesPacked = (bitsOnStorage * sealedPageIt->GetNElements() + 7) / 8;
178
179 if (batch.fSize > 0 && batch.fSize + sealedPageIt->GetBufferSize() > maxKeySize) {
180 /**
181 * Adding this page would exceed maxKeySize. Since we always want to write into a single key
182 * with vectorized writes, we commit the current set of pages before proceeding.
183 * NOTE: we do this *before* checking if sealedPageIt->GetBufferSize() > maxKeySize to guarantee that
184 * we always flush the current batch before doing an individual WriteBlob. This way we
185 * preserve the assumption that a CommitBatch always contains a sequential set of pages.
186 */
187 CommitBatchOfPages(batch, locators);
188 }
189
190 if (sealedPageIt->GetBufferSize() > maxKeySize) {
191 // This page alone is bigger than maxKeySize: save it by itself, since it will need to be
192 // split into multiple keys.
193
194 // Since this check implies the previous check on batchSize + newSize > maxSize, we should
195 // already have committed the current batch before writing this page.
196 assert(batch.fSize == 0);
197
198 std::uint64_t offset =
199 fWriter->WriteBlob(sealedPageIt->GetBuffer(), sealedPageIt->GetBufferSize(), bytesPacked);
201 locator.SetPosition(offset);
202 locator.SetNBytesOnStorage(sealedPageIt->GetDataSize());
203 locators.push_back(locator);
204
// Same bookkeeping as for a batched commit, but for this single page only.
205 fCounters->fNPageCommitted.Inc();
206 fCounters->fSzWritePayload.Add(sealedPageIt->GetBufferSize());
207 fNBytesCurrentCluster += sealedPageIt->GetBufferSize();
208
209 } else {
// The page fits: only remember a pointer to it; the actual write happens when the batch is committed.
210 batch.fSealedPages.emplace_back(&(*sealedPageIt));
211 batch.fSize += sealedPageIt->GetBufferSize();
212 batch.fBytesPacked += bytesPacked;
213 }
214 }
215 }
216
// Flush whatever is still pending after the last page.
217 if (batch.fSize > 0) {
218 CommitBatchOfPages(batch, locators);
219 }
220
221 return locators;
222}
223
225{
226 auto result = fNBytesCurrentCluster;
227 fNBytesCurrentCluster = 0;
228 return result;
229}
230
233{
235 auto szPageListZip =
236 RNTupleCompressor::Zip(serializedPageList, length, GetWriteOptions().GetCompression(), bufPageListZip.get());
237
239 result.SetNBytesOnStorage(szPageListZip);
240 result.SetPosition(fWriter->WriteBlob(bufPageListZip.get(), szPageListZip, length));
241 return result;
242}
243
245{
246 fWriter->UpdateStreamerInfos(fDescriptorBuilder.BuildStreamerInfos());
248 auto szFooterZip =
249 RNTupleCompressor::Zip(serializedFooter, length, GetWriteOptions().GetCompression(), bufFooterZip.get());
250 fWriter->WriteNTupleFooter(bufFooterZip.get(), szFooterZip, length);
251 fWriter->Commit(GetWriteOptions().GetCompression());
252}
253
254////////////////////////////////////////////////////////////////////////////////
255
258 fClusterPool(
259 std::make_unique<RClusterPool>(*this, ROOT::Internal::RNTupleReadOptionsManip::GetClusterBunchSize(opts)))
260{
261 EnableDefaultMetrics("RPageSourceFile");
262}
263
265 std::unique_ptr<ROOT::Internal::RRawFile> file,
266 const ROOT::RNTupleReadOptions &options)
267 : RPageSourceFile(ntupleName, options)
268{
269 fFile = std::move(file);
272}
273
/// Constructs a page source for the RNTuple named `ntupleName` stored at `path`.
///
/// The raw file is created from `path` through RRawFile::Create() and handed, together with the name and the
/// read options, to the (name, raw file, options) constructor; this constructor does no further work itself.
274ROOT::Internal::RPageSourceFile::RPageSourceFile(std::string_view ntupleName, std::string_view path,
275 const ROOT::RNTupleReadOptions &options)
276 : RPageSourceFile(ntupleName, ROOT::Internal::RRawFile::Create(path), options)
277{
278}
279
280std::unique_ptr<ROOT::Internal::RPageSourceFile>
282{
283 if (!anchor.fFile)
284 throw RException(R__FAIL("This RNTuple object was not streamed from a ROOT file (TFile or descendant)"));
285
286 std::unique_ptr<ROOT::Internal::RRawFile> rawFile;
287 // For local TFiles, TDavixFile, and TNetXNGFile, we want to open a new RRawFile to take advantage of the faster
288 // reading. We check the exact class name to avoid classes inheriting in ROOT (for example TMemFile) or in
289 // experiment frameworks.
290 std::string className = anchor.fFile->IsA()->GetName();
291 auto url = anchor.fFile->GetEndpointUrl();
292 auto protocol = std::string(url->GetProtocol());
293 if (className == "TFile") {
295 } else if (className == "TDavixFile" || className == "TNetXNGFile") {
297 } else {
299 }
300
301 auto pageSource = std::make_unique<RPageSourceFile>("", std::move(rawFile), options);
302 pageSource->fAnchor = anchor;
303 pageSource->fNTupleName = pageSource->fDescriptorBuilder.GetDescriptor().GetName();
304 return pageSource;
305}
306
308
310{
311 // If we constructed the page source with (ntuple name, path), we need to find the anchor first.
312 // Otherwise, the page source was created by OpenFromAnchor()
313 if (!fAnchor) {
314 fAnchor = fReader.GetNTuple(fNTupleName).Unwrap();
315 }
316 fReader.SetMaxKeySize(fAnchor->GetMaxKeySize());
317
318 // TODO(jblomer): can the epoch check be factored out across anchors?
319 if (fAnchor->GetVersionEpoch() != RNTuple::kVersionEpoch) {
320 throw RException(R__FAIL("unsupported RNTuple epoch version: " + std::to_string(fAnchor->GetVersionEpoch())));
321 }
322
323 fDescriptorBuilder.SetOnDiskHeaderSize(fAnchor->GetNBytesHeader());
324 fDescriptorBuilder.AddToOnDiskFooterSize(fAnchor->GetNBytesFooter());
325
326 // Reserve enough space for the compressed and the uncompressed header/footer (see AttachImpl)
327 const auto bufSize = fAnchor->GetNBytesHeader() + fAnchor->GetNBytesFooter() +
328 std::max(fAnchor->GetLenHeader(), fAnchor->GetLenFooter());
329 fStructureBuffer.fBuffer = MakeUninitArray<unsigned char>(bufSize);
330 fStructureBuffer.fPtrHeader = fStructureBuffer.fBuffer.get();
331 fStructureBuffer.fPtrFooter = fStructureBuffer.fBuffer.get() + fAnchor->GetNBytesHeader();
332
333 auto readvLimits = fFile->GetReadVLimits();
334 // Never try to vectorize reads to a split key
335 readvLimits.fMaxSingleSize = std::min<size_t>(readvLimits.fMaxSingleSize, fAnchor->GetMaxKeySize());
336
337 if ((readvLimits.fMaxReqs < 2) ||
338 (std::max(fAnchor->GetNBytesHeader(), fAnchor->GetNBytesFooter()) > readvLimits.fMaxSingleSize) ||
339 (fAnchor->GetNBytesHeader() + fAnchor->GetNBytesFooter() > readvLimits.fMaxTotalSize)) {
340 RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
341 fReader.ReadBuffer(fStructureBuffer.fPtrHeader, fAnchor->GetNBytesHeader(), fAnchor->GetSeekHeader());
342 fReader.ReadBuffer(fStructureBuffer.fPtrFooter, fAnchor->GetNBytesFooter(), fAnchor->GetSeekFooter());
343 fCounters->fNRead.Add(2);
344 } else {
345 RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
346 R__ASSERT(fAnchor->GetNBytesHeader() < std::numeric_limits<std::size_t>::max());
347 R__ASSERT(fAnchor->GetNBytesFooter() < std::numeric_limits<std::size_t>::max());
348 ROOT::Internal::RRawFile::RIOVec readRequests[2] = {{fStructureBuffer.fPtrHeader, fAnchor->GetSeekHeader(),
349 static_cast<std::size_t>(fAnchor->GetNBytesHeader()), 0},
350 {fStructureBuffer.fPtrFooter, fAnchor->GetSeekFooter(),
351 static_cast<std::size_t>(fAnchor->GetNBytesFooter()), 0}};
352 fFile->ReadV(readRequests, 2);
353 fCounters->fNReadV.Inc();
354 }
355}
356
358{
359 auto unzipBuf = reinterpret_cast<unsigned char *>(fStructureBuffer.fPtrFooter) + fAnchor->GetNBytesFooter();
360
361 RNTupleDecompressor::Unzip(fStructureBuffer.fPtrHeader, fAnchor->GetNBytesHeader(), fAnchor->GetLenHeader(),
362 unzipBuf);
363 RNTupleSerializer::DeserializeHeader(unzipBuf, fAnchor->GetLenHeader(), fDescriptorBuilder);
364
365 RNTupleDecompressor::Unzip(fStructureBuffer.fPtrFooter, fAnchor->GetNBytesFooter(), fAnchor->GetLenFooter(),
366 unzipBuf);
367 RNTupleSerializer::DeserializeFooter(unzipBuf, fAnchor->GetLenFooter(), fDescriptorBuilder);
368
369 auto desc = fDescriptorBuilder.MoveDescriptor();
370
371 std::vector<unsigned char> buffer;
372 for (const auto &cgDesc : desc.GetClusterGroupIterable()) {
373 buffer.resize(std::max<size_t>(buffer.size(),
374 cgDesc.GetPageListLength() + cgDesc.GetPageListLocator().GetNBytesOnStorage()));
375 auto *zipBuffer = buffer.data() + cgDesc.GetPageListLength();
376 fReader.ReadBuffer(zipBuffer, cgDesc.GetPageListLocator().GetNBytesOnStorage(),
377 cgDesc.GetPageListLocator().GetPosition<std::uint64_t>());
378 RNTupleDecompressor::Unzip(zipBuffer, cgDesc.GetPageListLocator().GetNBytesOnStorage(),
379 cgDesc.GetPageListLength(), buffer.data());
380
381 RNTupleSerializer::DeserializePageList(buffer.data(), cgDesc.GetPageListLength(), cgDesc.GetId(), desc, mode);
382 }
383
384 // For the page reads, we rely on the I/O scheduler to define the read requests
385 fFile->SetBuffering(false);
386
387 return desc;
388}
389
392{
393 const auto clusterId = localIndex.GetClusterId();
394
396 {
397 auto descriptorGuard = GetSharedDescriptorGuard();
398 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
399 pageInfo = clusterDescriptor.GetPageRange(physicalColumnId).Find(localIndex.GetIndexInCluster());
400 }
401
402 sealedPage.SetBufferSize(pageInfo.GetLocator().GetNBytesOnStorage() + pageInfo.HasChecksum() * kNBytesPageChecksum);
403 sealedPage.SetNElements(pageInfo.GetNElements());
404 sealedPage.SetHasChecksum(pageInfo.HasChecksum());
405 if (!sealedPage.GetBuffer())
406 return;
407 if (pageInfo.GetLocator().GetType() != RNTupleLocator::kTypePageZero) {
408 fReader.ReadBuffer(const_cast<void *>(sealedPage.GetBuffer()), sealedPage.GetBufferSize(),
409 pageInfo.GetLocator().GetPosition<std::uint64_t>());
410 } else {
411 assert(!pageInfo.HasChecksum());
412 memcpy(const_cast<void *>(sealedPage.GetBuffer()), ROOT::Internal::RPage::GetPageZeroBuffer(),
413 sealedPage.GetBufferSize());
414 }
415
416 sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
417}
418
422{
423 const auto columnId = columnHandle.fPhysicalId;
424 const auto clusterId = clusterInfo.fClusterId;
425 const auto pageInfo = clusterInfo.fPageInfo;
426
427 const auto element = columnHandle.fColumn->GetElement();
428 const auto elementSize = element->GetSize();
429 const auto elementInMemoryType = element->GetIdentifier().fInMemoryType;
430
431 if (pageInfo.GetLocator().GetType() == RNTupleLocator::kTypePageZero) {
432 auto pageZero = fPageAllocator->NewPage(elementSize, pageInfo.GetNElements());
433 pageZero.GrowUnchecked(pageInfo.GetNElements());
434 memset(pageZero.GetBuffer(), 0, pageZero.GetNBytes());
435 pageZero.SetWindow(clusterInfo.fColumnOffset + pageInfo.GetFirstElementIndex(),
437 return fPagePool.RegisterPage(std::move(pageZero), RPagePool::RKey{columnId, elementInMemoryType});
438 }
439
441 sealedPage.SetNElements(pageInfo.GetNElements());
442 sealedPage.SetHasChecksum(pageInfo.HasChecksum());
443 sealedPage.SetBufferSize(pageInfo.GetLocator().GetNBytesOnStorage() + pageInfo.HasChecksum() * kNBytesPageChecksum);
444 std::unique_ptr<unsigned char[]> directReadBuffer; // only used if cluster pool is turned off
445
446 if (fOptions.GetClusterCache() == ROOT::RNTupleReadOptions::EClusterCache::kOff) {
448 {
449 RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
450 fReader.ReadBuffer(directReadBuffer.get(), sealedPage.GetBufferSize(),
451 pageInfo.GetLocator().GetPosition<std::uint64_t>());
452 }
453 fCounters->fNPageRead.Inc();
454 fCounters->fNRead.Inc();
455 fCounters->fSzReadPayload.Add(sealedPage.GetBufferSize());
456 sealedPage.SetBuffer(directReadBuffer.get());
457 } else {
458 if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
459 fCurrentCluster = fClusterPool->GetCluster(clusterId, fActivePhysicalColumns.ToColumnSet());
460 R__ASSERT(fCurrentCluster->ContainsColumn(columnId));
461
462 auto cachedPageRef =
464 if (!cachedPageRef.Get().IsNull())
465 return cachedPageRef;
466
467 ROnDiskPage::Key key(columnId, pageInfo.GetPageNumber());
468 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
469 R__ASSERT(onDiskPage && (sealedPage.GetBufferSize() == onDiskPage->GetSize()));
470 sealedPage.SetBuffer(onDiskPage->GetAddress());
471 }
472
474 {
475 RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
476 newPage = UnsealPage(sealedPage, *element).Unwrap();
477 fCounters->fSzUnzip.Add(elementSize * pageInfo.GetNElements());
478 }
479
480 newPage.SetWindow(clusterInfo.fColumnOffset + pageInfo.GetFirstElementIndex(),
482 fCounters->fNPageUnsealed.Inc();
483 return fPagePool.RegisterPage(std::move(newPage), RPagePool::RKey{columnId, elementInMemoryType});
484}
485
486std::unique_ptr<ROOT::Internal::RPageSource> ROOT::Internal::RPageSourceFile::CloneImpl() const
487{
488 auto clone = new RPageSourceFile(fNTupleName, fOptions);
489 clone->fFile = fFile->Clone();
490 clone->fReader = ROOT::Internal::RMiniFileReader(clone->fFile.get());
491 return std::unique_ptr<RPageSourceFile>(clone);
492}
493
494std::unique_ptr<ROOT::Internal::RCluster>
496 std::vector<ROOT::Internal::RRawFile::RIOVec> &readRequests)
497{
498 struct ROnDiskPageLocator {
499 ROOT::DescriptorId_t fColumnId = 0;
500 ROOT::NTupleSize_t fPageNo = 0;
501 std::uint64_t fOffset = 0;
502 std::uint64_t fSize = 0;
503 std::size_t fBufPos = 0;
504 };
505
506 std::vector<ROnDiskPageLocator> onDiskPages;
507 auto activeSize = 0;
508 auto pageZeroMap = std::make_unique<ROnDiskPageMap>();
509 PrepareLoadCluster(clusterKey, *pageZeroMap,
512 const auto &pageLocator = pageInfo.GetLocator();
514 throw RException(R__FAIL("tried to read a page with an unknown locator"));
515 const auto nBytes =
516 pageLocator.GetNBytesOnStorage() + pageInfo.HasChecksum() * kNBytesPageChecksum;
518 onDiskPages.push_back(
519 {physicalColumnId, pageNo, pageLocator.GetPosition<std::uint64_t>(), nBytes, 0});
520 });
521
522 // Linearize the page requests by file offset
523 std::sort(onDiskPages.begin(), onDiskPages.end(),
524 [](const ROnDiskPageLocator &a, const ROnDiskPageLocator &b) { return a.fOffset < b.fOffset; });
525
526 // In order to coalesce close-by pages, we collect the sizes of the gaps between pages on disk. We then order
527 // the gaps by size, sum them up and find a cutoff for the largest gap that we tolerate when coalescing pages.
528 // The size of the cutoff is given by the fraction of extra bytes we are willing to read in order to reduce
529 // the number of read requests. We thus schedule the lowest number of requests given a tolerable fraction
530 // of extra bytes.
531 // TODO(jblomer): Eventually we may want to select the parameter at runtime according to link latency and speed,
532 // memory consumption, device block size.
533 float maxOverhead = 0.25 * float(activeSize);
534 std::vector<std::size_t> gaps;
535 if (onDiskPages.size())
536 gaps.reserve(onDiskPages.size() - 1);
537 for (unsigned i = 1; i < onDiskPages.size(); ++i) {
538 std::int64_t gap =
539 static_cast<int64_t>(onDiskPages[i].fOffset) - (onDiskPages[i - 1].fSize + onDiskPages[i - 1].fOffset);
540 gaps.emplace_back(std::max(gap, std::int64_t(0)));
541 // If the pages overlap, subtract the overlapped bytes from `activeSize`
542 activeSize += std::min(gap, std::int64_t(0));
543 }
544 std::sort(gaps.begin(), gaps.end());
545 std::size_t gapCut = 0;
546 std::size_t currentGap = 0;
547 float szExtra = 0.0;
548 for (auto g : gaps) {
549 if (g != currentGap) {
551 currentGap = g;
552 }
553 szExtra += g;
554 if (szExtra > maxOverhead)
555 break;
556 }
557
558 // In a first step, we coalesce the read requests and calculate the cluster buffer size.
559 // In a second step, we'll fix-up the memory destinations for the read calls given the
560 // address of the allocated buffer. We must not touch, however, the read requests from previous
561 // calls to PrepareSingleCluster()
562 const auto currentReadRequestIdx = readRequests.size();
563
565 // To simplify the first loop iteration, pretend an empty request starting at the first page's fOffset.
566 if (!onDiskPages.empty())
567 req.fOffset = onDiskPages[0].fOffset;
568 std::size_t szPayload = 0;
569 std::size_t szOverhead = 0;
570 const std::uint64_t maxKeySize = fReader.GetMaxKeySize();
571 for (auto &s : onDiskPages) {
572 R__ASSERT(s.fSize > 0);
573 const std::int64_t readUpTo = req.fOffset + req.fSize;
574 // Note: byte ranges of pages may overlap
575 const std::uint64_t overhead = std::max(static_cast<std::int64_t>(s.fOffset) - readUpTo, std::int64_t(0));
576 const std::uint64_t extent = std::max(static_cast<std::int64_t>(s.fOffset + s.fSize) - readUpTo, std::int64_t(0));
577 if (req.fSize + extent < maxKeySize && overhead <= gapCut) {
580 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer) + s.fOffset - req.fOffset;
581 req.fSize += extent;
582 continue;
583 }
584
585 // close the current request and open new one
586 if (req.fSize > 0)
587 readRequests.emplace_back(req);
588
589 req.fBuffer = reinterpret_cast<unsigned char *>(req.fBuffer) + req.fSize;
590 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer);
591
592 szPayload += s.fSize;
593 req.fOffset = s.fOffset;
594 req.fSize = s.fSize;
595 }
596 readRequests.emplace_back(req);
597 fCounters->fSzReadPayload.Add(szPayload);
598 fCounters->fSzReadOverhead.Add(szOverhead);
599
600 // Register the on disk pages in a page map
601 auto buffer = new unsigned char[reinterpret_cast<intptr_t>(req.fBuffer) + req.fSize];
602 auto pageMap = std::make_unique<ROOT::Internal::ROnDiskPageMapHeap>(std::unique_ptr<unsigned char[]>(buffer));
603 for (const auto &s : onDiskPages) {
604 ROnDiskPage::Key key(s.fColumnId, s.fPageNo);
605 pageMap->Register(key, ROnDiskPage(buffer + s.fBufPos, s.fSize));
606 }
607 fCounters->fNPageRead.Add(onDiskPages.size());
608 for (auto i = currentReadRequestIdx; i < readRequests.size(); ++i) {
609 readRequests[i].fBuffer = buffer + reinterpret_cast<intptr_t>(readRequests[i].fBuffer);
610 }
611
612 auto cluster = std::make_unique<RCluster>(clusterKey.fClusterId);
613 cluster->Adopt(std::move(pageMap));
614 cluster->Adopt(std::move(pageZeroMap));
615 for (auto colId : clusterKey.fPhysicalColumnSet)
616 cluster->SetColumnAvailable(colId);
617 return cluster;
618}
619
620std::vector<std::unique_ptr<ROOT::Internal::RCluster>>
622{
623 fCounters->fNClusterLoaded.Add(clusterKeys.size());
624
625 std::vector<std::unique_ptr<ROOT::Internal::RCluster>> clusters;
626 std::vector<ROOT::Internal::RRawFile::RIOVec> readRequests;
627
628 clusters.reserve(clusterKeys.size());
629 for (auto key : clusterKeys) {
630 clusters.emplace_back(PrepareSingleCluster(key, readRequests));
631 }
632
633 auto nReqs = readRequests.size();
634 auto readvLimits = fFile->GetReadVLimits();
635 // We never want to do vectorized reads of split blobs, so we limit our single size to maxKeySize.
636 readvLimits.fMaxSingleSize = std::min<size_t>(readvLimits.fMaxSingleSize, fReader.GetMaxKeySize());
637
638 int iReq = 0;
639 while (nReqs > 0) {
640 auto nBatch = std::min(nReqs, readvLimits.fMaxReqs);
641
642 if (readvLimits.HasSizeLimit()) {
643 std::uint64_t totalSize = 0;
644 for (std::size_t i = 0; i < nBatch; ++i) {
645 if (readRequests[iReq + i].fSize > readvLimits.fMaxSingleSize) {
646 nBatch = i;
647 break;
648 }
649
650 totalSize += readRequests[iReq + i].fSize;
651 if (totalSize > readvLimits.fMaxTotalSize) {
652 nBatch = i;
653 break;
654 }
655 }
656 }
657
658 if (nBatch <= 1) {
659 nBatch = 1;
660 RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
661 fReader.ReadBuffer(readRequests[iReq].fBuffer, readRequests[iReq].fSize, readRequests[iReq].fOffset);
662 } else {
663 RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
664 fFile->ReadV(&readRequests[iReq], nBatch);
665 }
666 fCounters->fNReadV.Inc();
667 fCounters->fNRead.Add(nBatch);
668
669 iReq += nBatch;
670 nReqs -= nBatch;
671 }
672
673 return clusters;
674}
fBuffer
dim_t fSize
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:299
#define b(i)
Definition RSha256.hxx:100
#define g(i)
Definition RSha256.hxx:105
#define a(i)
Definition RSha256.hxx:99
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t mask
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Option_t Option_t TPoint TPoint const char mode
Manages a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:148
Read RNTuple data blocks from a TFile container, provided by a RRawFile.
Definition RMiniFile.hxx:55
Helper class to compress data blocks in the ROOT compression frame format.
static std::size_t Zip(const void *from, std::size_t nbytes, int compression, void *to)
Returns the size of the compressed data, written into the provided output buffer.
Helper class to uncompress data blocks in the ROOT compression frame format.
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size of the from buffer.
Write RNTuple data blocks in a TFile or a bare file container.
static std::unique_ptr< RNTupleFileWriter > Recreate(std::string_view ntupleName, std::string_view path, EContainerFormat containerFormat, const ROOT::RNTupleWriteOptions &options)
Create or truncate the local file given by path with the new empty RNTuple identified by ntupleName.
static std::unique_ptr< RNTupleFileWriter > Append(std::string_view ntupleName, TDirectory &fileOrDirectory, std::uint64_t maxKeySize)
The directory parameter can also be a TFile object (TFile inherits from TDirectory).
A helper class for serializing and deserialization of the RNTuple binary format.
static RResult< void > DeserializePageList(const void *buffer, std::uint64_t bufSize, ROOT::DescriptorId_t clusterGroupId, RNTupleDescriptor &desc, EDescriptorDeserializeMode mode)
static RResult< void > DeserializeFooter(const void *buffer, std::uint64_t bufSize, ROOT::Internal::RNTupleDescriptorBuilder &descBuilder)
static RResult< void > DeserializeHeader(const void *buffer, std::uint64_t bufSize, ROOT::Internal::RNTupleDescriptorBuilder &descBuilder)
A memory region that contains packed and compressed pages.
Definition RCluster.hxx:99
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:41
Base class for a sink with a physical storage backend.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
A thread-safe cache of pages loaded from the page source.
Definition RPagePool.hxx:44
Reference to a page stored in the page pool.
Storage provider that writes ntuple pages into a file.
void CommitBatchOfPages(CommitBatch &batch, std::vector< RNTupleLocator > &locators)
Subroutine of CommitSealedPageVImpl, used to perform a vector write of the (multi-)range of pages con...
RPageSinkFile(std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options)
std::uint64_t StageClusterImpl() final
Returns the number of bytes written to storage (excluding metadata)
void InitImpl(unsigned char *serializedHeader, std::uint32_t length) final
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) override
RNTupleLocator WriteSealedPage(const RPageStorage::RSealedPage &sealedPage, std::size_t bytesPacked)
We pass bytesPacked so that TFile::ls() reports a reasonable value for the compression ratio of the c...
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
RNTupleLocator CommitSealedPageImpl(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
std::unique_ptr< ROOT::Internal::RNTupleFileWriter > fWriter
std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges, const std::vector< bool > &mask) final
Vector commit of preprocessed pages.
Storage provider that reads ntuple pages from a file.
std::unique_ptr< ROOT::Internal::RCluster > PrepareSingleCluster(const ROOT::Internal::RCluster::RKey &clusterKey, std::vector< RRawFile::RIOVec > &readRequests)
Helper function for LoadClusters: it prepares the memory buffer (page map) and the read requests for ...
RPageRef LoadPageImpl(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo, ROOT::NTupleSize_t idxInCluster) final
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
ROOT::RNTupleDescriptor AttachImpl(RNTupleSerializer::EDescriptorDeserializeMode mode) final
LoadStructureImpl() has been called before AttachImpl() is called
std::vector< std::unique_ptr< ROOT::Internal::RCluster > > LoadClusters(std::span< ROOT::Internal::RCluster::RKey > clusterKeys) final
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
RPageSourceFile(std::string_view ntupleName, const ROOT::RNTupleReadOptions &options)
std::unique_ptr< RPageSource > CloneImpl() const final
The cloned page source creates a new raw file and reader and opens its own file descriptor to the dat...
std::unique_ptr< RRawFile > fFile
An RRawFile is used to request the necessary byte ranges from a local or a remote file.
ROOT::Internal::RMiniFileReader fReader
Takes the fFile to read ntuple blobs from it.
void LoadSealedPage(ROOT::DescriptorId_t physicalColumnId, RNTupleLocalIndex localIndex, RSealedPage &sealedPage) final
Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage.
Abstract interface to read data from an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
Stores information about the cluster in which this page resides.
Definition RPage.hxx:53
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:44
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Definition RPage.cxx:23
The RRawFileTFile wraps an open TFile, but does not take ownership.
The RRawFile provides read-only access to local and remote files.
Definition RRawFile.hxx:43
static std::unique_ptr< RRawFile > Create(std::string_view url, ROptions options=ROptions())
Factory method that returns a suitable concrete implementation according to the transport in the url.
Definition RRawFile.cxx:64
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
The on-storage metadata of an RNTuple.
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Generic information about the physical location of data.
Common user-tunable settings for reading RNTuples.
Common user-tunable settings for storing RNTuples.
std::uint64_t GetMaxKeySize() const
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:67
static constexpr std::uint16_t kVersionEpoch
Definition RNTuple.hxx:78
const_iterator begin() const
const_iterator end() const
Describe directory structure in memory.
Definition TDirectory.h:45
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
The identifiers that specifies the content of a (partial) cluster.
Definition RCluster.hxx:152
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:51
Summarizes cluster-level information that is necessary to load a certain page.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
Used for vector reads from multiple offsets into multiple buffers.
Definition RRawFile.hxx:61
Information about a single page in the context of a cluster's page range.