36 , fMetrics(
"RPageSinkRaw")
38 , fZipBuffer(std::make_unique<std::array<char, kMaxPageSize>>())
41 "Do not store real data with this version of RNTuple!";
42 fFile = fopen(std::string(path).c_str(),
"wb");
55 auto written = fwrite(buffer, 1, nbytes, fFile);
62 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
63 auto szHeader = descriptor.SerializeHeader(
nullptr);
64 auto buffer =
new unsigned char[szHeader];
65 descriptor.SerializeHeader(buffer);
66 Write(buffer, szHeader);
68 fClusterStart = fFilePos;
74 unsigned char *buffer =
reinterpret_cast<unsigned char *
>(page.
GetBuffer());
75 bool isAdoptedBuffer =
true;
76 auto packedBytes = page.
GetSize();
81 packedBytes = (page.
GetNElements() * element->GetBitsOnStorage() + 7) / 8;
82 buffer =
new unsigned char[packedBytes];
84 isAdoptedBuffer =
false;
87 if (fOptions.GetCompression() % 100 != 0) {
89 auto level = fOptions.GetCompression() % 100;
91 int szZipBuffer = kMaxPageSize;
92 int szSource = packedBytes;
93 char *source =
reinterpret_cast<char *
>(buffer);
95 R__zipMultipleAlgorithm(level, &szSource, source, &szZipBuffer, fZipBuffer->data(), &zipBytes, algorithm);
96 if ((zipBytes > 0) && (zipBytes < szSource)) {
99 buffer =
reinterpret_cast<unsigned char *
>(fZipBuffer->data());
100 packedBytes = zipBytes;
101 isAdoptedBuffer =
true;
108 Write(buffer, packedBytes);
110 if (!isAdoptedBuffer)
122 fClusterStart = fFilePos;
128 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
129 auto szFooter = descriptor.SerializeFooter(
nullptr);
130 auto buffer =
new unsigned char[szFooter];
131 descriptor.SerializeFooter(buffer);
132 Write(buffer, szFooter);
140 nElements = kDefaultElementsPerPage;
142 return fPageAllocator->NewPage(columnHandle.
fId, elementSize, nElements);
147 fPageAllocator->DeletePage(page);
154 ColumnId_t columnId,
void *mem, std::size_t elementSize, std::size_t nElements)
156 RPage newPage(columnId, mem, elementSize * nElements, elementSize);
176 , fPagePool(std::make_shared<
RPagePool>())
177 , fUnzipBuffer(std::make_unique<std::array<unsigned char, kMaxPageSize>>())
178 , fMetrics(
"RPageSourceRaw")
185 "timeWallRead",
"ns",
"wall clock time spent reading");
188 "timeWallUnzip",
"ns",
"wall clock time spent decompressing");
190 "timeCpuUnzip",
"ns",
"CPU time spent decompressing");
211 auto nread = fFile->ReadAt(buffer, nbytes, offset);
213 fCtrSzRead->Add(nread);
221 auto fileSize = fFile->GetSize();
227 std::uint32_t szHeader;
228 std::uint32_t szFooter;
230 R__ASSERT(fileSize >= szHeader + szFooter);
232 unsigned char *header =
new unsigned char[szHeader];
233 unsigned char *footer =
new unsigned char[szFooter];
234 Read(header, szHeader, 0);
235 Read(footer, szFooter, fileSize - szFooter);
251 auto columnId = columnHandle.
fId;
252 auto clusterId = clusterDescriptor.
GetId();
253 const auto &pageRange = clusterDescriptor.
GetPageRange(columnId);
257 decltype(clusterIndex) firstInPage = 0;
258 for (
const auto &
pi : pageRange.fPageInfos) {
259 if (firstInPage +
pi.fNElements > clusterIndex) {
263 firstInPage +=
pi.fNElements;
269 auto elementSize = element->
GetSize();
272 void *pageBuffer =
malloc(std::max(pageSize,
static_cast<std::uint32_t
>(elementSize * pageInfo.
fNElements)));
276 auto bytesOnStorage = (element->GetBitsOnStorage() * pageInfo.
fNElements + 7) / 8;
277 if (pageSize != bytesOnStorage) {
280 R__ASSERT(bytesOnStorage <= kMaxPageSize);
283 int szUnzipBuffer = kMaxPageSize;
284 int szSource = pageSize;
285 unsigned char *source =
reinterpret_cast<unsigned char *
>(pageBuffer);
287 R__unzip(&szSource, source, &szUnzipBuffer, fUnzipBuffer->data(), &unzipBytes);
288 R__ASSERT(unzipBytes >
static_cast<int>(pageSize));
289 memcpy(pageBuffer, fUnzipBuffer->data(), unzipBytes);
290 pageSize = unzipBytes;
291 fCtrSzUnzip->Add(unzipBytes);
294 if (!element->IsMappable()) {
296 auto unpackedBuffer =
reinterpret_cast<unsigned char *
>(
malloc(pageSize));
298 element->Unpack(unpackedBuffer, pageBuffer, pageInfo.
fNElements);
300 pageBuffer = unpackedBuffer;
304 auto newPage = fPageAllocator->NewPage(columnId, pageBuffer, elementSize, pageInfo.
fNElements);
306 fPagePool->RegisterPage(newPage,
318 auto columnId = columnHandle.
fId;
319 auto cachedPage = fPagePool->GetPage(columnId, globalIndex);
320 if (!cachedPage.IsNull())
323 auto clusterId = fDescriptor.FindClusterId(columnId, globalIndex);
325 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
326 auto selfOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
328 return PopulatePageFromCluster(columnHandle, clusterDescriptor, globalIndex - selfOffset);
336 auto index = clusterIndex.
GetIndex();
337 auto columnId = columnHandle.
fId;
338 auto cachedPage = fPagePool->GetPage(columnId, clusterIndex);
339 if (!cachedPage.IsNull())
343 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
344 return PopulatePageFromCluster(columnHandle, clusterDescriptor, index);
349 fPagePool->ReturnPage(page);
355 clone->fFile = fFile->Clone();
356 return std::unique_ptr<RPageSourceRaw>(clone);
#define R__WARNING_HERE(GROUP)
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
virtual bool IsMappable() const
Derived, typed classes tell whether the on-storage layout is bitwise identical to the memory layout.
std::size_t GetSize() const
RColumnElementBase * GetElement() const
CounterPtrT MakeCounter(const std::string &name, const std::string &unit, const std::string &desc)
Record wall time and CPU time between construction and destruction.
Manages pages read from a raw file.
static RPage NewPage(ColumnId_t columnId, void *mem, std::size_t elementSize, std::size_t nElements)
static void DeletePage(const RPage &page)
Uses standard C++ memory allocation for the column data pages.
A closure that can free the memory associated with a mapped page.
A thread-safe cache of column pages.
RPageSinkRaw(std::string_view ntupleName, std::string_view path, const RNTupleWriteOptions &options)
RClusterDescriptor::RLocator DoCommitCluster(NTupleSize_t nEntries) final
RClusterDescriptor::RLocator DoCommitPage(ColumnHandle_t columnHandle, const RPage &page) final
void DoCommitDataset() final
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements=0) final
Get a new, empty page for the given column that can be filled with up to nElements.
void DoCreate(const RNTupleModel &model) final
void Write(const void *buffer, std::size_t nbytes)
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
Abstract interface to write data into an ntuple.
Storage provider that reads ntuple pages from a raw file.
RNTuplePlainCounter * fCtrNPages
RNTupleDescriptor DoAttach() final
RNTupleTickCounter< RNTuplePlainCounter > * fCtrTimeCpuRead
RPageSourceRaw(std::string_view ntupleName, const RNTupleReadOptions &options)
void Read(void *buffer, std::size_t nbytes, std::uint64_t offset)
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
RNTuplePlainCounter * fCtrSzRead
virtual ~RPageSourceRaw()
std::unique_ptr< ROOT::Internal::RRawFile > fFile
RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) final
Allocates and fills a page that contains the index-th element.
RPage PopulatePageFromCluster(ColumnHandle_t columnHandle, const RClusterDescriptor &clusterDescriptor, ClusterSize_t::ValueType clusterIndex)
RNTuplePlainCounter * fCtrTimeWallUnzip
RNTuplePlainCounter * fCtrTimeWallRead
RNTuplePlainCounter * fCtrSzUnzip
RNTuplePlainCounter * fCtrNRead
RNTupleTickCounter< RNTuplePlainCounter > * fCtrTimeCpuUnzip
std::unique_ptr< RPageSource > Clone() const final
Open the same storage multiple times, e.g. for reading in multiple threads.
Abstract interface to read data from an ntuple.
Stores information about the cluster in which this page resides.
A page is a slice of a column that is mapped into memory.
ClusterSize_t::ValueType GetNElements() const
void * TryGrow(ClusterSize_t::ValueType nElements)
Return a pointer after the last element that has space for nElements new elements.
ClusterSize_t::ValueType GetSize() const
The space taken by column elements in the buffer.
Meta-data for a set of ntuple clusters.
const RPageRange & GetPageRange(DescriptorId_t columnId) const
const RColumnRange & GetColumnRange(DescriptorId_t columnId) const
DescriptorId_t GetId() const
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
DescriptorId_t GetClusterId() const
ClusterSize_t::ValueType GetIndex() const
A helper class for piece-wise construction of an RNTupleDescriptor.
RNTupleDescriptor MoveDescriptor()
void SetFromHeader(void *headerBuffer)
void AddClustersFromFooter(void *footerBuffer)
The on-storage meta-data of an ntuple.
static void LocateMetadata(const void *postscript, std::uint32_t &szHeader, std::uint32_t &szFooter)
Given kNBytesPostscript bytes, extract the header and footer lengths in bytes.
static constexpr unsigned int kNBytesPostscript
The last few bytes after the footer store the length of footer and header.
The RNTupleModel encapsulates the schema of an ntuple.
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
static std::unique_ptr< RRawFile > Create(std::string_view url, ROptions options=ROptions())
Factory method that returns a suitable concrete implementation according to the transport in the url.
static constexpr std::uint64_t kUnknownFileSize
Derived classes do not necessarily need to provide file size information but they can return "not kno...
static constexpr int kFeatureHasSize
GetSize() does not return kUnknownFileSize.
basic_string_view< char > string_view
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::int64_t ColumnId_t
Uniquely identifies a physical column within the scope of the current process, used to tag pages.
constexpr DescriptorId_t kInvalidDescriptorId
static constexpr double pi
NTupleSize_t fFirstElementIndex
A 64bit element index.
Generic information about the physical location of data.
std::uint32_t fBytesOnStorage
EValues
Note: this is only temporarily a struct and will become an enum class, hence the name.