39#include <condition_variable>
48 , fMetrics(
"RPageSinkRoot")
52 "Do not store real data with this version of RNTuple!";
62 , fMetrics(
"RPageSinkRoot")
66 "Do not store real data with this version of RNTuple!";
75 , fMetrics(
"RPageSinkRoot")
79 "Do not store real data with this version of RNTuple!";
80 fWriter = std::unique_ptr<Internal::RNTupleFileWriter>(
92 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
93 auto szHeader = descriptor.GetHeaderSize();
94 auto buffer = std::make_unique<unsigned char[]>(szHeader);
95 descriptor.SerializeHeader(buffer.get());
97 auto zipBuffer = std::make_unique<unsigned char[]>(szHeader);
98 auto szZipHeader = fCompressor(buffer.get(), szHeader, fOptions.GetCompression(),
99 [&zipBuffer](
const void *
b,
size_t n,
size_t o){ memcpy(zipBuffer.get() + o, b, n); } );
100 fWriter->WriteNTupleHeader(zipBuffer.get(), szZipHeader, szHeader);
107 unsigned char *buffer =
reinterpret_cast<unsigned char *
>(page.
GetBuffer());
108 bool isAdoptedBuffer =
true;
109 auto packedBytes = page.
GetSize();
111 const auto isMappable = element->
IsMappable();
114 packedBytes = (page.
GetNElements() * element->GetBitsOnStorage() + 7) / 8;
115 buffer =
new unsigned char[packedBytes];
116 isAdoptedBuffer =
false;
119 auto zippedBytes = packedBytes;
121 if (fOptions.GetCompression() != 0) {
122 zippedBytes = fCompressor(buffer, packedBytes, fOptions.GetCompression());
123 if (!isAdoptedBuffer)
125 buffer =
const_cast<unsigned char *
>(
reinterpret_cast<const unsigned char *
>(fCompressor.GetZipBuffer()));
126 isAdoptedBuffer =
true;
129 auto offsetData = fWriter->WriteBlob(buffer, zippedBytes, packedBytes);
130 fClusterMinOffset = std::min(offsetData, fClusterMinOffset);
131 fClusterMaxOffset = std::max(offsetData + zippedBytes, fClusterMaxOffset);
133 if (!isAdoptedBuffer)
149 fClusterMinOffset = std::uint64_t(-1);
150 fClusterMaxOffset = 0;
157 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
158 auto szFooter = descriptor.GetFooterSize();
159 auto buffer = std::make_unique<unsigned char []>(szFooter);
160 descriptor.SerializeFooter(buffer.get());
162 auto zipBuffer = std::make_unique<unsigned char []>(szFooter);
163 auto szZipFooter = fCompressor(buffer.get(), szFooter, fOptions.GetCompression(),
164 [&zipBuffer](
const void *
b,
size_t n,
size_t o){ memcpy(zipBuffer.get() + o, b, n); } );
165 fWriter->WriteNTupleFooter(zipBuffer.get(), szZipFooter, szFooter);
174 nElements = kDefaultElementsPerPage;
176 return fPageAllocator->NewPage(columnHandle.
fId, elementSize, nElements);
181 fPageAllocator->DeletePage(page);
189 ColumnId_t columnId,
void *mem, std::size_t elementSize, std::size_t nElements)
191 RPage newPage(columnId, mem, elementSize * nElements, elementSize);
200 delete[]
reinterpret_cast<unsigned char *
>(page.
GetBuffer());
210 , fMetrics(
"RPageSourceFile")
212 , fPagePool(std::make_shared<
RPagePool>())
222 "number of partial clusters preloaded from storage"),
229 "CPU time spent decompressing"),
232 if (
const auto szReadPayload = metrics.GetCounter(
"RPageSourceFile.szReadPayload")) {
233 if (
const auto szReadOverhead = metrics.GetCounter(
"RPageSourceFile.szReadOverhead")) {
234 if (
const auto timeWallRead = metrics.GetCounter(
"RPageSourceFile.timeWallRead")) {
235 if (
auto walltime = timeWallRead->GetValueAsInt()) {
236 double payload = szReadPayload->GetValueAsInt();
237 double overhead = szReadOverhead->GetValueAsInt();
239 return {
true, (1000. * (payload + overhead) / walltime)};
249 if (
const auto szUnzip = metrics.
GetCounter(
"RPageSourceFile.szUnzip")) {
250 if (
const auto timeWallRead = metrics.
GetCounter(
"RPageSourceFile.timeWallRead")) {
251 if (
auto walltime = timeWallRead->GetValueAsInt()) {
252 double unzip = szUnzip->GetValueAsInt();
254 return {
true, 1000. * unzip / walltime};
263 if (
const auto szUnzip = metrics.
GetCounter(
"RPageSourceFile.szUnzip")) {
264 if (
const auto timeWallUnzip = metrics.
GetCounter(
"RPageSourceFile.timeWallUnzip")) {
265 if (
auto walltime = timeWallUnzip->GetValueAsInt()) {
266 double unzip = szUnzip->GetValueAsInt();
268 return {
true, 1000. * unzip / walltime};
277 if (
const auto szReadPayload = metrics.
GetCounter(
"RPageSourceFile.szReadPayload")) {
278 if (
const auto szReadOverhead = metrics.
GetCounter(
"RPageSourceFile.szReadOverhead")) {
279 if (
auto payload = szReadPayload->GetValueAsInt()) {
281 return {
true, 1./(1. + (1. * szReadOverhead->GetValueAsInt()) / payload)};
290 if (
const auto szReadPayload = metrics.
GetCounter(
"RPageSourceFile.szReadPayload")) {
291 if (
const auto szUnzip = metrics.
GetCounter(
"RPageSourceFile.szUnzip")) {
292 if (
auto unzip = szUnzip->GetValueAsInt()) {
293 return {
true, (1. * szReadPayload->GetValueAsInt()) / unzip};
320 auto ntpl = fReader.GetNTuple(fNTupleName).Unwrap();
322 auto buffer = std::make_unique<unsigned char[]>(ntpl.fLenHeader);
323 auto zipBuffer = std::make_unique<unsigned char[]>(ntpl.fNBytesHeader);
324 fReader.ReadBuffer(zipBuffer.get(), ntpl.fNBytesHeader, ntpl.fSeekHeader);
325 fDecompressor(zipBuffer.get(), ntpl.fNBytesHeader, ntpl.fLenHeader, buffer.get());
328 buffer = std::make_unique<unsigned char[]>(ntpl.fLenFooter);
329 zipBuffer = std::make_unique<unsigned char[]>(ntpl.fNBytesFooter);
330 fReader.ReadBuffer(zipBuffer.get(), ntpl.fNBytesFooter, ntpl.fSeekFooter);
331 fDecompressor(zipBuffer.get(), ntpl.fNBytesFooter, ntpl.fLenFooter, buffer.get());
341 const auto columnId = columnHandle.
fId;
342 const auto clusterId = clusterDescriptor.
GetId();
343 const auto &pageRange = clusterDescriptor.
GetPageRange(columnId);
347 decltype(idxInCluster) firstInPage = 0;
349 for (
const auto &pi : pageRange.fPageInfos) {
350 if (firstInPage + pi.fNElements > idxInCluster) {
361 const auto elementSize = element->
GetSize();
364 const auto bytesPacked = (element->GetBitsOnStorage() * pageInfo.
fNElements + 7) / 8;
365 const auto pageSize = elementSize * pageInfo.
fNElements;
367 auto pageBuffer =
new unsigned char[bytesPacked];
370 fCounters->fNPageLoaded.Inc();
373 if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
374 fCurrentCluster = fClusterPool->GetCluster(clusterId, fActiveColumns);
375 R__ASSERT(fCurrentCluster->ContainsColumn(columnId));
377 auto cachedPage = fPagePool->GetPage(columnId,
RClusterIndex(clusterId, idxInCluster));
378 if (!cachedPage.IsNull())
383 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
385 R__ASSERT(bytesOnStorage == onDiskPage->GetSize());
386 memcpy(pageBuffer, onDiskPage->GetAddress(), onDiskPage->GetSize());
389 if (bytesOnStorage != bytesPacked) {
391 fDecompressor(pageBuffer, bytesOnStorage, bytesPacked);
392 fCounters->fSzUnzip.Add(bytesPacked);
395 if (!element->IsMappable()) {
396 auto unpackedBuffer =
new unsigned char[pageSize];
397 element->Unpack(unpackedBuffer, pageBuffer, pageInfo.
fNElements);
399 pageBuffer = unpackedBuffer;
403 auto newPage = fPageAllocator->NewPage(columnId, pageBuffer, elementSize, pageInfo.
fNElements);
405 fPagePool->RegisterPage(newPage,
410 fCounters->fNPagePopulated.Inc();
418 const auto columnId = columnHandle.
fId;
419 auto cachedPage = fPagePool->GetPage(columnId, globalIndex);
420 if (!cachedPage.IsNull())
423 const auto clusterId = fDescriptor.FindClusterId(columnId, globalIndex);
425 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
426 const auto selfOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
428 return PopulatePageFromCluster(columnHandle, clusterDescriptor, globalIndex - selfOffset);
436 const auto idxInCluster = clusterIndex.
GetIndex();
437 const auto columnId = columnHandle.
fId;
438 auto cachedPage = fPagePool->GetPage(columnId, clusterIndex);
439 if (!cachedPage.IsNull())
443 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
444 return PopulatePageFromCluster(columnHandle, clusterDescriptor, idxInCluster);
449 fPagePool->ReturnPage(page);
455 clone->fFile = fFile->Clone();
457 return std::unique_ptr<RPageSourceFile>(clone);
460std::unique_ptr<ROOT::Experimental::Detail::RCluster>
463 fCounters->fNClusterLoaded.Inc();
465 const auto &clusterDesc = GetDescriptor().GetClusterDescriptor(clusterId);
466 auto clusterLocator = clusterDesc.GetLocator();
467 auto clusterSize = clusterLocator.fBytesOnStorage;
470 struct ROnDiskPageLocator {
471 ROnDiskPageLocator() =
default;
473 : fColumnId(
c), fPageNo(p), fOffset(o),
fSize(s) {}
476 std::uint64_t fOffset = 0;
477 std::uint64_t
fSize = 0;
478 std::size_t fBufPos = 0;
482 std::vector<ROnDiskPageLocator> onDiskPages;
484 for (
auto columnId : columns) {
485 const auto &pageRange = clusterDesc.GetPageRange(columnId);
487 for (
const auto &pageInfo : pageRange.fPageInfos) {
488 const auto &pageLocator = pageInfo.fLocator;
489 activeSize += pageLocator.fBytesOnStorage;
490 onDiskPages.emplace_back(ROnDiskPageLocator(
491 columnId, pageNo, pageLocator.fPosition, pageLocator.fBytesOnStorage));
497 std::sort(onDiskPages.begin(), onDiskPages.end(),
498 [](
const ROnDiskPageLocator &
a,
const ROnDiskPageLocator &
b) {return a.fOffset < b.fOffset;});
507 float maxOverhead = 0.25 * float(activeSize);
508 std::vector<std::size_t> gaps;
509 for (
unsigned i = 1; i < onDiskPages.size(); ++i) {
510 gaps.emplace_back(onDiskPages[i].fOffset - (onDiskPages[i-1].
fSize + onDiskPages[i-1].fOffset));
512 std::sort(gaps.begin(), gaps.end());
513 std::size_t gapCut = 0;
515 for (
auto g : gaps) {
517 if (szExtra > maxOverhead)
523 struct RReadRequest {
524 RReadRequest() =
default;
525 RReadRequest(std::size_t
b, std::uint64_t o, std::uint64_t s) : fBufPos(
b), fOffset(o),
fSize(s) {}
526 std::size_t fBufPos = 0;
527 std::uint64_t fOffset = 0;
528 std::uint64_t
fSize = 0;
530 std::vector<ROOT::Internal::RRawFile::RIOVec> readRequests;
533 std::size_t szPayload = 0;
534 std::size_t szOverhead = 0;
535 for (
auto &s : onDiskPages) {
539 auto overhead = s.fOffset - readUpTo;
540 szPayload += s.fSize;
541 if (overhead <= gapCut) {
542 szOverhead += overhead;
543 s.fBufPos =
reinterpret_cast<intptr_t
>(req.
fBuffer) + req.
fSize + overhead;
544 req.
fSize += overhead + s.fSize;
550 readRequests.emplace_back(req);
553 s.fBufPos =
reinterpret_cast<intptr_t
>(req.
fBuffer);
558 readRequests.emplace_back(req);
559 fCounters->fSzReadPayload.Add(szPayload);
560 fCounters->fSzReadOverhead.Add(szOverhead);
563 auto buffer =
new unsigned char[
reinterpret_cast<intptr_t
>(req.
fBuffer) + req.
fSize];
564 auto pageMap = std::make_unique<ROnDiskPageMapHeap>(std::unique_ptr<
unsigned char []>(buffer));
565 for (
const auto &s : onDiskPages) {
567 pageMap->Register(key,
ROnDiskPage(buffer + s.fBufPos, s.fSize));
569 fCounters->fNPageLoaded.Add(onDiskPages.size());
570 for (
auto &
r : readRequests) {
571 r.fBuffer = buffer +
reinterpret_cast<intptr_t
>(
r.fBuffer);
574 auto nReqs = readRequests.size();
577 fFile->ReadV(&readRequests[0], nReqs);
579 fCounters->fNReadV.Inc();
580 fCounters->fNRead.Add(nReqs);
582 auto cluster = std::make_unique<RCluster>(clusterId);
583 cluster->Adopt(std::move(pageMap));
584 for (
auto colId : columns)
585 cluster->SetColumnAvailable(colId);
593 fTaskScheduler->Reset();
595 const auto clusterId = cluster->
GetId();
596 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
598 std::vector<std::unique_ptr<RColumnElementBase>> allElements;
601 for (
const auto columnId : columnsInCluster) {
602 const auto &columnDesc = fDescriptor.GetColumnDescriptor(columnId);
606 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
607 std::uint64_t pageNo = 0;
608 std::uint64_t firstInPage = 0;
609 for (
const auto &pi : pageRange.fPageInfos) {
613 R__ASSERT(onDiskPage->GetSize() == pi.fLocator.fBytesOnStorage);
616 [
this, columnId, clusterId, firstInPage, onDiskPage,
617 element = allElements.back().get(),
618 nElements = pi.fNElements,
619 indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex
621 const auto bytesPacked = (element->GetBitsOnStorage() * nElements + 7) / 8;
622 const auto pageSize = element->GetSize() * nElements;
624 auto pageBufferPacked =
new unsigned char[bytesPacked];
625 if (onDiskPage->GetSize() != bytesPacked) {
626 fDecompressor(onDiskPage->GetAddress(), onDiskPage->GetSize(), bytesPacked, pageBufferPacked);
627 fCounters->fSzUnzip.Add(bytesPacked);
631 memcpy(pageBufferPacked, onDiskPage->GetAddress(), bytesPacked);
634 auto pageBuffer = pageBufferPacked;
635 if (!element->IsMappable()) {
636 pageBuffer =
new unsigned char[pageSize];
637 element->Unpack(pageBuffer, pageBufferPacked, nElements);
638 delete[] pageBufferPacked;
641 auto newPage = fPageAllocator->NewPage(columnId, pageBuffer, element->GetSize(), nElements);
643 fPagePool->PreloadPage(newPage,
650 fTaskScheduler->AddTask(taskFunc);
652 firstInPage += pi.fNElements;
659 fTaskScheduler->Wait();
#define R__LOG_WARNING(...)
Manages a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
size_t GetNOnDiskPages() const
const std::unordered_set< DescriptorId_t > & GetAvailColumns() const
DescriptorId_t GetId() const
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
virtual bool IsMappable() const
Derived, typed classes tell whether the on-storage layout is bitwise identical to the memory layout.
std::size_t GetSize() const
RColumnElementBase * GetElement() const
A thread-safe integral performance counter.
A metric element that computes its floating point value from other counters.
A collection of Counter objects with a name, a unit, and a description.
const RNTuplePerfCounter * GetCounter(std::string_view name) const
Searches this object and all the observed sub metrics. Returns nullptr if name is not found.
CounterPtrT MakeCounter(const std::string &name, Args &&... args)
An either thread-safe or non thread safe counter for CPU ticks.
Record wall time and CPU time between construction and destruction.
A page as being stored on disk, that is packed and compressed.
Manages pages read from the file.
static RPage NewPage(ColumnId_t columnId, void *mem, std::size_t elementSize, std::size_t nElements)
static void DeletePage(const RPage &page)
Uses standard C++ memory allocation for the column data pages.
A closure that can free the memory associated with a mapped page.
A thread-safe cache of column pages.
RPageSinkFile(std::string_view ntupleName, std::string_view path, const RNTupleWriteOptions &options)
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements=0) final
Get a new, empty page for the given column that can be filled with up to nElements.
void CreateImpl(const RNTupleModel &model) final
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
void CommitDatasetImpl() final
std::unique_ptr< Internal::RNTupleFileWriter > fWriter
RClusterDescriptor::RLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final
RClusterDescriptor::RLocator CommitClusterImpl(NTupleSize_t nEntries) final
Abstract interface to write data into an ntuple.
Storage provider that reads ntuple pages from a file.
RPage PopulatePageFromCluster(ColumnHandle_t columnHandle, const RClusterDescriptor &clusterDescriptor, ClusterSize_t::ValueType idxInCluster)
Internal::RMiniFileReader fReader
Takes the fFile to read ntuple blobs from it.
std::unique_ptr< RCluster > LoadCluster(DescriptorId_t clusterId, const ColumnSet_t &columns) final
Populates all the pages of the given cluster id and columns; it is possible that some columns do not ...
virtual ~RPageSourceFile()
std::unique_ptr< RCounters > fCounters
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
RNTupleDescriptor AttachImpl() final
std::unique_ptr< RPageSource > Clone() const final
The cloned page source creates a new raw file and reader and opens its own file descriptor to the dat...
void UnzipClusterImpl(RCluster *cluster) final
RPageSourceFile(std::string_view ntupleName, const RNTupleReadOptions &options)
std::unique_ptr< ROOT::Internal::RRawFile > fFile
An RRawFile is used to request the necessary byte ranges from a local or a remote file.
RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) final
Allocates and fills a page that contains the index-th element.
RNTupleMetrics fMetrics
Wraps the I/O counters and is observed by the RNTupleReader metrics.
Abstract interface to read data from an ntuple.
std::unordered_set< DescriptorId_t > ColumnSet_t
Derived from the model (fields) that are actually being requested at a given point in time.
Stores information about the cluster in which this page resides.
A page is a slice of a column that is mapped into memory.
ClusterSize_t::ValueType GetNElements() const
void * TryGrow(ClusterSize_t::ValueType nElements)
Return a pointer after the last element that has space for nElements new elements.
ClusterSize_t::ValueType GetSize() const
The space taken by column elements in the buffer.
Read RNTuple data blocks from a TFile container, provided by a RRawFile.
static RNTupleFileWriter * Append(std::string_view ntupleName, TFile &file)
Add a new RNTuple identified by ntupleName to the existing TFile.
static RNTupleFileWriter * Recreate(std::string_view ntupleName, std::string_view path, int defaultCompression, ENTupleContainerFormat containerFormat)
Create or truncate the local file given by path with the new empty RNTuple identified by ntupleName.
Meta-data for a set of ntuple clusters.
const RPageRange & GetPageRange(DescriptorId_t columnId) const
const RColumnRange & GetColumnRange(DescriptorId_t columnId) const
DescriptorId_t GetId() const
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
DescriptorId_t GetClusterId() const
ClusterSize_t::ValueType GetIndex() const
A helper class for piece-wise construction of an RNTupleDescriptor.
RNTupleDescriptor MoveDescriptor()
void SetFromHeader(void *headerBuffer)
void AddClustersFromFooter(void *footerBuffer)
The on-storage meta-data of an ntuple.
The RNTupleModel encapsulates the schema of an ntuple.
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
int GetCompression() const
ENTupleContainerFormat GetContainerFormat() const
static std::unique_ptr< RRawFile > Create(std::string_view url, ROptions options=ROptions())
Factory method that returns a suitable concrete implementation according to the transport in the url.
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::int64_t ColumnId_t
Uniquely identifies a physical column within the scope of the current process, used to tag pages.
constexpr DescriptorId_t kInvalidDescriptorId
On-disk pages within a page source are identified by the column and page number.
I/O performance counters that get registered in fMetrics.
NTupleSize_t fFirstElementIndex
A 64-bit element index.
Generic information about the physical location of data.
std::uint32_t fBytesOnStorage
Used for vector reads from multiple offsets into multiple buffers.
std::size_t fSize
The number of desired bytes.
void * fBuffer
The destination for reading.
std::uint64_t fOffset
The file offset.