Logo ROOT  
Reference Guide
RPageStorageRaw.cxx
Go to the documentation of this file.
1/// \file RPageStorageRaw.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2018-10-04
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
17#include <ROOT/RColumn.hxx>
18#include <ROOT/RLogger.hxx>
20#include <ROOT/RPage.hxx>
22#include <ROOT/RPagePool.hxx>
23#include <ROOT/RRawFile.hxx>
24
25#include <Compression.h>
26#include <RZip.h>
27#include <TError.h>
28
29#include <cstdio>
30#include <cstring>
31#include <iostream>
32
34 const RNTupleWriteOptions &options)
35 : RPageSink(ntupleName, options)
36 , fMetrics("RPageSinkRaw")
37 , fPageAllocator(std::make_unique<RPageAllocatorHeap>())
38 , fZipBuffer(std::make_unique<std::array<char, kMaxPageSize>>())
39{
40 R__WARNING_HERE("NTuple") << "The RNTuple file format will change. " <<
41 "Do not store real data with this version of RNTuple!";
42 fFile = fopen(std::string(path).c_str(), "wb");
44}
45
47{
48 if (fFile)
49 fclose(fFile);
50}
51
52void ROOT::Experimental::Detail::RPageSinkRaw::Write(const void *buffer, std::size_t nbytes)
53{
54 R__ASSERT(fFile);
55 auto written = fwrite(buffer, 1, nbytes, fFile);
56 R__ASSERT(written == nbytes);
57 fFilePos += written;
58}
59
61{
62 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
63 auto szHeader = descriptor.SerializeHeader(nullptr);
64 auto buffer = new unsigned char[szHeader];
65 descriptor.SerializeHeader(buffer);
66 Write(buffer, szHeader);
67 delete[] buffer;
68 fClusterStart = fFilePos;
69}
70
73{
74 unsigned char *buffer = reinterpret_cast<unsigned char *>(page.GetBuffer());
75 bool isAdoptedBuffer = true;
76 auto packedBytes = page.GetSize();
77 auto element = columnHandle.fColumn->GetElement();
78 const auto isMappable = element->IsMappable();
79
80 if (!isMappable) {
81 packedBytes = (page.GetNElements() * element->GetBitsOnStorage() + 7) / 8;
82 buffer = new unsigned char[packedBytes];
83 element->Pack(buffer, page.GetBuffer(), page.GetNElements());
84 isAdoptedBuffer = false;
85 }
86
87 if (fOptions.GetCompression() % 100 != 0) {
88 R__ASSERT(packedBytes <= kMaxPageSize);
89 auto level = fOptions.GetCompression() % 100;
90 auto algorithm = static_cast<ROOT::RCompressionSetting::EAlgorithm::EValues>(fOptions.GetCompression() / 100);
91 int szZipBuffer = kMaxPageSize;
92 int szSource = packedBytes;
93 char *source = reinterpret_cast<char *>(buffer);
94 int zipBytes = 0;
95 R__zipMultipleAlgorithm(level, &szSource, source, &szZipBuffer, fZipBuffer->data(), &zipBytes, algorithm);
96 if ((zipBytes > 0) && (zipBytes < szSource)) {
97 if (!isAdoptedBuffer)
98 delete[] buffer;
99 buffer = reinterpret_cast<unsigned char *>(fZipBuffer->data());
100 packedBytes = zipBytes;
101 isAdoptedBuffer = true;
102 }
103 }
104
106 result.fPosition = fFilePos;
107 result.fBytesOnStorage = packedBytes;
108 Write(buffer, packedBytes);
109
110 if (!isAdoptedBuffer)
111 delete[] buffer;
112
113 return result;
114}
115
118{
120 result.fPosition = fClusterStart;
121 result.fBytesOnStorage = fFilePos - fClusterStart;
122 fClusterStart = fFilePos;
123 return result;
124}
125
127{
128 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
129 auto szFooter = descriptor.SerializeFooter(nullptr);
130 auto buffer = new unsigned char[szFooter];
131 descriptor.SerializeFooter(buffer);
132 Write(buffer, szFooter);
133 delete[] buffer;
134}
135
138{
139 if (nElements == 0)
140 nElements = kDefaultElementsPerPage;
141 auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
142 return fPageAllocator->NewPage(columnHandle.fId, elementSize, nElements);
143}
144
146{
147 fPageAllocator->DeletePage(page);
148}
149
150////////////////////////////////////////////////////////////////////////////////
151
152
154 ColumnId_t columnId, void *mem, std::size_t elementSize, std::size_t nElements)
155{
156 RPage newPage(columnId, mem, elementSize * nElements, elementSize);
157 newPage.TryGrow(nElements);
158 return newPage;
159}
160
162{
163 if (page.IsNull())
164 return;
165 free(page.GetBuffer());
166}
167
168
169////////////////////////////////////////////////////////////////////////////////
170
171
173 const RNTupleReadOptions &options)
174 : RPageSource(ntupleName, options)
175 , fPageAllocator(std::make_unique<RPageAllocatorFile>())
176 , fPagePool(std::make_shared<RPagePool>())
177 , fUnzipBuffer(std::make_unique<std::array<unsigned char, kMaxPageSize>>())
178 , fMetrics("RPageSourceRaw")
179{
180 fCtrNRead = fMetrics.MakeCounter<decltype(fCtrNRead)>("nRead", "", "number of read() calls");
181 fCtrSzRead = fMetrics.MakeCounter<decltype(fCtrSzRead)>("szRead", "B", "volume read from file");
182 fCtrSzUnzip = fMetrics.MakeCounter<decltype(fCtrSzUnzip)>("szUnzip", "B", "volume after unzipping");
183 fCtrNPages = fMetrics.MakeCounter<decltype(fCtrNPages)>("nPages", "", "number of populated pages");
185 "timeWallRead", "ns", "wall clock time spent reading");
186 fCtrTimeCpuRead = fMetrics.MakeCounter<decltype(fCtrTimeCpuRead)>("timeCpuRead", "ns", "CPU time spent reading");
188 "timeWallUnzip", "ns", "wall clock time spent decompressing");
190 "timeCpuUnzip", "ns", "CPU time spent decompressing");
191}
192
194 const RNTupleReadOptions &options)
195 : RPageSourceRaw(ntupleName, options)
196{
200}
201
202
204{
205}
206
207
208void ROOT::Experimental::Detail::RPageSourceRaw::Read(void *buffer, std::size_t nbytes, std::uint64_t offset)
209{
210 RNTuplePlainTimer timer(*fCtrTimeWallRead, *fCtrTimeCpuRead);
211 auto nread = fFile->ReadAt(buffer, nbytes, offset);
212 R__ASSERT(nread == nbytes);
213 fCtrSzRead->Add(nread);
214 fCtrNRead->Inc();
215}
216
217
219{
220 unsigned char postscript[RNTupleDescriptor::kNBytesPostscript];
221 auto fileSize = fFile->GetSize();
224 auto offset = fileSize - RNTupleDescriptor::kNBytesPostscript;
225 Read(postscript, RNTupleDescriptor::kNBytesPostscript, offset);
226
227 std::uint32_t szHeader;
228 std::uint32_t szFooter;
229 RNTupleDescriptor::LocateMetadata(postscript, szHeader, szFooter);
230 R__ASSERT(fileSize >= szHeader + szFooter);
231
232 unsigned char *header = new unsigned char[szHeader];
233 unsigned char *footer = new unsigned char[szFooter];
234 Read(header, szHeader, 0);
235 Read(footer, szFooter, fileSize - szFooter);
236
237 RNTupleDescriptorBuilder descBuilder;
238 descBuilder.SetFromHeader(header);
239 descBuilder.AddClustersFromFooter(footer);
240 delete[] header;
241 delete[] footer;
242
243 return descBuilder.MoveDescriptor();
244}
245
246
248 ColumnHandle_t columnHandle, const RClusterDescriptor &clusterDescriptor, ClusterSize_t::ValueType clusterIndex)
249{
250 fCtrNPages->Inc();
251 auto columnId = columnHandle.fId;
252 auto clusterId = clusterDescriptor.GetId();
253 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
254
255 // TODO(jblomer): binary search
257 decltype(clusterIndex) firstInPage = 0;
258 for (const auto &pi : pageRange.fPageInfos) {
259 if (firstInPage + pi.fNElements > clusterIndex) {
260 pageInfo = pi;
261 break;
262 }
263 firstInPage += pi.fNElements;
264 }
265 R__ASSERT(firstInPage <= clusterIndex);
266 R__ASSERT((firstInPage + pageInfo.fNElements) > clusterIndex);
267
268 auto element = columnHandle.fColumn->GetElement();
269 auto elementSize = element->GetSize();
270
271 auto pageSize = pageInfo.fLocator.fBytesOnStorage;
272 void *pageBuffer = malloc(std::max(pageSize, static_cast<std::uint32_t>(elementSize * pageInfo.fNElements)));
273 R__ASSERT(pageBuffer);
274 Read(pageBuffer, pageSize, pageInfo.fLocator.fPosition);
275
276 auto bytesOnStorage = (element->GetBitsOnStorage() * pageInfo.fNElements + 7) / 8;
277 if (pageSize != bytesOnStorage) {
278 RNTuplePlainTimer timer(*fCtrTimeWallUnzip, *fCtrTimeCpuUnzip);
279
280 R__ASSERT(bytesOnStorage <= kMaxPageSize);
281 // We do have the unzip information in the column range, but here we simply use the value from
282 // the R__zip header
283 int szUnzipBuffer = kMaxPageSize;
284 int szSource = pageSize;
285 unsigned char *source = reinterpret_cast<unsigned char *>(pageBuffer);
286 int unzipBytes = 0;
287 R__unzip(&szSource, source, &szUnzipBuffer, fUnzipBuffer->data(), &unzipBytes);
288 R__ASSERT(unzipBytes > static_cast<int>(pageSize));
289 memcpy(pageBuffer, fUnzipBuffer->data(), unzipBytes);
290 pageSize = unzipBytes;
291 fCtrSzUnzip->Add(unzipBytes);
292 }
293
294 if (!element->IsMappable()) {
295 pageSize = elementSize * pageInfo.fNElements;
296 auto unpackedBuffer = reinterpret_cast<unsigned char *>(malloc(pageSize));
297 R__ASSERT(unpackedBuffer != nullptr);
298 element->Unpack(unpackedBuffer, pageBuffer, pageInfo.fNElements);
299 free(pageBuffer);
300 pageBuffer = unpackedBuffer;
301 }
302
303 auto indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
304 auto newPage = fPageAllocator->NewPage(columnId, pageBuffer, elementSize, pageInfo.fNElements);
305 newPage.SetWindow(indexOffset + firstInPage, RPage::RClusterInfo(clusterId, indexOffset));
306 fPagePool->RegisterPage(newPage,
307 RPageDeleter([](const RPage &page, void * /*userData*/)
308 {
310 }, nullptr));
311 return newPage;
312}
313
314
316 ColumnHandle_t columnHandle, NTupleSize_t globalIndex)
317{
318 auto columnId = columnHandle.fId;
319 auto cachedPage = fPagePool->GetPage(columnId, globalIndex);
320 if (!cachedPage.IsNull())
321 return cachedPage;
322
323 auto clusterId = fDescriptor.FindClusterId(columnId, globalIndex);
324 R__ASSERT(clusterId != kInvalidDescriptorId);
325 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
326 auto selfOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
327 R__ASSERT(selfOffset <= globalIndex);
328 return PopulatePageFromCluster(columnHandle, clusterDescriptor, globalIndex - selfOffset);
329}
330
331
333 ColumnHandle_t columnHandle, const RClusterIndex &clusterIndex)
334{
335 auto clusterId = clusterIndex.GetClusterId();
336 auto index = clusterIndex.GetIndex();
337 auto columnId = columnHandle.fId;
338 auto cachedPage = fPagePool->GetPage(columnId, clusterIndex);
339 if (!cachedPage.IsNull())
340 return cachedPage;
341
342 R__ASSERT(clusterId != kInvalidDescriptorId);
343 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
344 return PopulatePageFromCluster(columnHandle, clusterDescriptor, index);
345}
346
348{
349 fPagePool->ReturnPage(page);
350}
351
352std::unique_ptr<ROOT::Experimental::Detail::RPageSource> ROOT::Experimental::Detail::RPageSourceRaw::Clone() const
353{
354 auto clone = new RPageSourceRaw(fNTupleName, fOptions);
355 clone->fFile = fFile->Clone();
356 return std::unique_ptr<RPageSourceRaw>(clone);
357}
#define R__WARNING_HERE(GROUP)
Definition: RLogger.hxx:184
#define R__ASSERT(e)
Definition: TError.h:96
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
#define free
Definition: civetweb.c:1539
#define malloc
Definition: civetweb.c:1536
virtual bool IsMappable() const
Derived, typed classes tell whether the on-storage layout is bitwise identical to the memory layout.
RColumnElementBase * GetElement() const
Definition: RColumn.hxx:230
CounterPtrT MakeCounter(const std::string &name, const std::string &unit, const std::string &desc)
Record wall time and CPU time between construction and destruction.
Manages pages read from a raw file.
static RPage NewPage(ColumnId_t columnId, void *mem, std::size_t elementSize, std::size_t nElements)
Uses standard C++ memory allocation for the column data pages.
A closure that can free the memory associated with a mapped page.
A thread-safe cache of column pages.
Definition: RPagePool.hxx:46
RPageSinkRaw(std::string_view ntupleName, std::string_view path, const RNTupleWriteOptions &options)
RClusterDescriptor::RLocator DoCommitCluster(NTupleSize_t nEntries) final
RClusterDescriptor::RLocator DoCommitPage(ColumnHandle_t columnHandle, const RPage &page) final
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements=0) final
Get a new, empty page for the given column that can be filled with up to nElements.
void DoCreate(const RNTupleModel &model) final
void Write(const void *buffer, std::size_t nbytes)
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
Abstract interface to write data into an ntuple.
Storage provider that reads ntuple pages from a raw file.
RNTupleTickCounter< RNTuplePlainCounter > * fCtrTimeCpuRead
RPageSourceRaw(std::string_view ntupleName, const RNTupleReadOptions &options)
void Read(void *buffer, std::size_t nbytes, std::uint64_t offset)
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
std::unique_ptr< ROOT::Internal::RRawFile > fFile
RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) final
Allocates and fills a page that contains the index-th element.
RPage PopulatePageFromCluster(ColumnHandle_t columnHandle, const RClusterDescriptor &clusterDescriptor, ClusterSize_t::ValueType clusterIndex)
RNTupleTickCounter< RNTuplePlainCounter > * fCtrTimeCpuUnzip
std::unique_ptr< RPageSource > Clone() const final
Open the same storage multiple times, e.g. for reading in multiple threads.
Abstract interface to read data from an ntuple.
Stores information about the cluster in which this page resides.
Definition: RPage.hxx:46
A page is a slice of a column that is mapped into memory.
Definition: RPage.hxx:41
ClusterSize_t::ValueType GetNElements() const
Definition: RPage.hxx:83
void * TryGrow(ClusterSize_t::ValueType nElements)
Return a pointer after the last element that has space for nElements new elements.
Definition: RPage.hxx:107
ClusterSize_t::ValueType GetSize() const
The space taken by column elements in the buffer.
Definition: RPage.hxx:81
Meta-data for a set of ntuple clusters.
const RPageRange & GetPageRange(DescriptorId_t columnId) const
const RColumnRange & GetColumnRange(DescriptorId_t columnId) const
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Definition: RNTupleUtil.hxx:83
DescriptorId_t GetClusterId() const
ClusterSize_t::ValueType GetIndex() const
A helper class for piece-wise construction of an RNTupleDescriptor.
The on-storage meta-data of an ntuple.
static void LocateMetadata(const void *postscript, std::uint32_t &szHeader, std::uint32_t &szFooter)
Given kNBytesPostscript bytes, extract the header and footer lengths in bytes.
static constexpr unsigned int kNBytesPostscript
The last few bytes after the footer store the length of footer and header.
The RNTupleModel encapsulates the schema of an ntuple.
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
static std::unique_ptr< RRawFile > Create(std::string_view url, ROptions options=ROptions())
Factory method that returns a suitable concrete implementation according to the transport in the url.
Definition: RRawFile.cxx:73
static constexpr std::uint64_t kUnknownFileSize
Derived classes do not necessarily need to provide file size information but they can return "not kno...
Definition: RRawFile.hxx:43
static constexpr int kFeatureHasSize
GetSize() does not return kUnknownFileSize.
Definition: RRawFile.hxx:49
basic_string_view< char > string_view
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
Definition: RNTupleUtil.hxx:43
std::int64_t ColumnId_t
Uniquely identifies a physical column within the scope of the current process, used to tag pages.
Definition: RNTupleUtil.hxx:75
constexpr DescriptorId_t kInvalidDescriptorId
Definition: RNTupleUtil.hxx:80
static constexpr double pi
NTupleSize_t fFirstElementIndex
A 64bit element index.
Generic information about the physical location of data.
We do not need to store the element size / uncompressed page size because we know to which column the...
RLocator fLocator
The meaning of fLocator depends on the storage backend.
ClusterSize_t fNElements
The sum of the elements of all the pages must match the corresponding fNElements field in fColumnRang...
EValues
Note: this is only temporarily a struct and will become an enum class, hence the name.
Definition: Compression.h:78