Logo ROOT  
Reference Guide
RPageStorageDaos.cxx
Go to the documentation of this file.
1/// \file RPageStorageDaos.cxx
2/// \ingroup NTuple ROOT7
3/// \author Javier Lopez-Gomez <j.lopez@cern.ch>
4/// \date 2020-11-03
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RCluster.hxx>
17#include <ROOT/RClusterPool.hxx>
18#include <ROOT/RField.hxx>
19#include <ROOT/RLogger.hxx>
21#include <ROOT/RNTupleModel.hxx>
22#include <ROOT/RNTupleUtil.hxx>
23#include <ROOT/RNTupleZip.hxx>
24#include <ROOT/RPage.hxx>
26#include <ROOT/RPagePool.hxx>
27#include <ROOT/RDaos.hxx>
29
30#include <RVersion.h>
31#include <TError.h>
32
33#include <algorithm>
34#include <cstdio>
35#include <cstdlib>
36#include <iostream>
37#include <utility>
38#include <regex>
39
40namespace {
41struct RDaosURI {
42 /// \brief UUID of the DAOS pool
43 std::string fPoolUuid;
44 /// \brief Ranks of the service replicas, separated by `_`
45 std::string fSvcReplicas;
46 /// \brief UUID of the container for this RNTuple
47 std::string fContainerUuid;
48};
49
50/**
51 \brief Parse a DAOS RNTuple URI of the form 'daos://pool-uuid:svc_replicas/container_uuid'.
52*/
53RDaosURI ParseDaosURI(std::string_view uri)
54{
55 std::regex re("daos://([[:xdigit:]-]+):([[:digit:]_]+)/([[:xdigit:]-]+)");
56 std::cmatch m;
57 if (!std::regex_match(uri.data(), m, re))
58 throw ROOT::Experimental::RException(R__FAIL("Invalid DAOS pool URI."));
59 return { m[1], m[2], m[3] };
60}
61
62/// \brief Some random distribution/attribute key. TODO: apply recommended schema, i.e.
63/// an OID for each cluster + a dkey for each page.
64static constexpr std::uint64_t kDistributionKey = 0x5a3c69f0cafe4a11;
65static constexpr std::uint64_t kAttributeKey = 0x4243544b5344422d;
66
67static constexpr daos_obj_id_t kOidAnchor{std::uint64_t(-1), 0};
68static constexpr daos_obj_id_t kOidHeader{std::uint64_t(-2), 0};
69static constexpr daos_obj_id_t kOidFooter{std::uint64_t(-3), 0};
70
71static constexpr daos_oclass_id_t kCidMetadata = OC_SX;
72} // namespace
73
74
75////////////////////////////////////////////////////////////////////////////////
76
77
/// \brief Serializes the anchor into `buffer`.
///
/// Writes fVersion, fNBytesHeader, fLenHeader, fNBytesFooter and fLenFooter (one SerializeUInt32 each),
/// followed by fObjClass (SerializeString). If `buffer` is nullptr, nothing is written and only the size
/// is computed.
/// \return SerializeString(fObjClass, nullptr) + 20, where 20 covers the five std::uint32_t fields.
78std::uint32_t
80{
82 if (buffer != nullptr) {
83 auto bytes = reinterpret_cast<unsigned char *>(buffer);
84 bytes += SerializeUInt32(fVersion, bytes);
85 bytes += SerializeUInt32(fNBytesHeader, bytes);
86 bytes += SerializeUInt32(fLenHeader, bytes);
87 bytes += SerializeUInt32(fNBytesFooter, bytes);
88 bytes += SerializeUInt32(fLenFooter, bytes);
89 bytes += SerializeString(fObjClass, bytes);
90 }
91 return SerializeString(fObjClass, nullptr) + 20;
92}
93
/// \brief Reads the anchor fields from `buffer`, in the same order in which Serialize() writes them.
/// NOTE(review): no buffer length is passed in, so the caller must guarantee that `buffer` holds a complete
/// serialized anchor; none of the reads below are bounds-checked.
/// \return The anchor's serialized size, recomputed from the deserialized fObjClass plus 20 bytes for the
/// five std::uint32_t fields.
94std::uint32_t
96{
98 auto bytes = reinterpret_cast<const unsigned char *>(buffer);
99 bytes += DeserializeUInt32(bytes, &fVersion);
100 bytes += DeserializeUInt32(bytes, &fNBytesHeader);
101 bytes += DeserializeUInt32(bytes, &fLenHeader);
102 bytes += DeserializeUInt32(bytes, &fNBytesFooter);
103 bytes += DeserializeUInt32(bytes, &fLenFooter);
104 bytes += DeserializeString(bytes, &fObjClass);
105 return SerializeString(fObjClass, nullptr) + 20;
106}
107
/// \brief Size of the buffer used to hold a serialized anchor (the writer in WriteNTupleAnchor() and the
/// reader in AttachImpl() both allocate and transfer exactly this many bytes).
/// It starts from the serialized size of a default-constructed anchor.
/// NOTE(review): original line 112, which completes this expression, is missing from this listing; presumably it
/// adds room for the longest object class name (RDaosObject::ObjClassId::kOCNameMaxLength) — confirm.
108std::uint32_t
110{
111 return RDaosNTupleAnchor().Serialize(nullptr)
113}
114
115
116////////////////////////////////////////////////////////////////////////////////
117
118
120 const RNTupleWriteOptions &options)
121 : RPageSink(ntupleName, options)
122 , fPageAllocator(std::make_unique<RPageAllocatorHeap>())
123 , fURI(uri)
124{
 // Constructs the DAOS page sink. Only the URI is stored here; the DAOS pool and container are opened
 // later, in CreateImpl(), which parses fURI.
125 R__LOG_WARNING(NTupleLog()) << "The DAOS backend is experimental and still under development. " <<
126 "Do not store real data with this version of RNTuple!";
127 fCompressor = std::make_unique<RNTupleCompressor>();
128 EnableDefaultMetrics("RPageSinkDaos");
129}
130
131
133
134
136{
 // Creates the ntuple on storage:
 // 1. resolve the object class for user data OIDs and record it in the anchor;
 // 2. open the DAOS container named by fURI (with create = true);
 // 3. write the zipped header.
 // The DAOS-specific object class is used if DAOS write options were passed; otherwise the
 // RNTupleWriteOptionsDaos default applies.
137 auto opts = dynamic_cast<RNTupleWriteOptionsDaos *>(fOptions.get());
138 fNTupleAnchor.fObjClass = opts ? opts->GetObjectClass() : RNTupleWriteOptionsDaos().GetObjectClass();
139 auto oclass = RDaosObject::ObjClassId(fNTupleAnchor.fObjClass);
 // Reject object class names that ObjClassId cannot resolve.
140 if (oclass.IsUnknown())
141 throw ROOT::Experimental::RException(R__FAIL("Unknown object class " + fNTupleAnchor.fObjClass));
142
143 auto args = ParseDaosURI(fURI);
144 auto pool = std::make_shared<RDaosPool>(args.fPoolUuid, args.fSvcReplicas);
145 fDaosContainer = std::make_unique<RDaosContainer>(pool, args.fContainerUuid, /*create =*/ true);
146 fDaosContainer->SetDefaultObjectClass(oclass);
147
148 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
149 auto szHeader = descriptor.GetHeaderSize();
150 auto buffer = std::make_unique<unsigned char[]>(szHeader);
151 descriptor.SerializeHeader(buffer.get());
152
 // NOTE(review): zipBuffer is sized to the uncompressed header, which assumes Zip() never emits more bytes
 // than its input — confirm against RNTupleCompressor.
153 auto zipBuffer = std::make_unique<unsigned char[]>(szHeader);
154 auto szZipHeader = fCompressor->Zip(buffer.get(), szHeader, GetWriteOptions().GetCompression(),
155 [&zipBuffer](const void *b, size_t n, size_t o){ memcpy(zipBuffer.get() + o, b, n); } );
156 WriteNTupleHeader(zipBuffer.get(), szZipHeader, szHeader);
157}
158
159
162{
 // Seals (packs and compresses) the page while the zip timers run, adds the uncompressed page size to
 // fSzZip, and delegates the actual write to CommitSealedPageImpl().
163 auto element = columnHandle.fColumn->GetElement();
164 RPageStorage::RSealedPage sealedPage;
165 {
166 RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
167 sealedPage = SealPage(page, *element, GetWriteOptions().GetCompression());
168 }
169
170 fCounters->fSzZip.Add(page.GetNBytes());
171 return CommitSealedPageImpl(columnHandle.fId, sealedPage);
172}
173
174
177 DescriptorId_t /*columnId*/, const RPageStorage::RSealedPage &sealedPage)
178{
 // Every sealed page is stored in a DAOS object of its own. fOid.fetch_add(1) yields the object id
 // (low word), and the page bytes are written under the fixed kDistributionKey/kAttributeKey pair.
 // That object id is returned as the locator position so that readers can fetch the page again.
179 auto offsetData = fOid.fetch_add(1);
180 {
181 RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
182 fDaosContainer->WriteSingleAkey(sealedPage.fBuffer, sealedPage.fSize,
183 {offsetData, 0}, kDistributionKey, kAttributeKey);
184 }
185
 // NOTE(review): the declaration of `result` (original line 186) is missing from this listing.
187 result.fPosition = offsetData;
188 result.fBytesOnStorage = sealedPage.fSize;
189 fCounters->fNPageCommitted.Inc();
190 fCounters->fSzWritePayload.Add(sealedPage.fSize);
 // Accumulated here and reported/reset by CommitClusterImpl().
191 fNBytesCurrentCluster += sealedPage.fSize;
192 return result;
193}
194
195
/// \brief Returns the page payload bytes committed since the previous call and resets that count to zero.
/// Only sealed pages are counted (CommitSealedPageImpl); header, footer and anchor writes are not.
196std::uint64_t
198{
199 return std::exchange(fNBytesCurrentCluster, 0);
200}
201
202
204{
 // Serializes and zips the footer, writes it, then writes the anchor. The anchor has to go last because
 // it records the header/footer sizes that WriteNTupleHeader()/WriteNTupleFooter() store in fNTupleAnchor.
205 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
206 auto szFooter = descriptor.GetFooterSize();
207 auto buffer = std::make_unique<unsigned char []>(szFooter);
208 descriptor.SerializeFooter(buffer.get());
209
 // NOTE(review): as for the header, zipBuffer is sized to the uncompressed footer — confirm Zip() never grows data.
210 auto zipBuffer = std::make_unique<unsigned char []>(szFooter);
211 auto szZipFooter = fCompressor->Zip(buffer.get(), szFooter, GetWriteOptions().GetCompression(),
212 [&zipBuffer](const void *b, size_t n, size_t o){ memcpy(zipBuffer.get() + o, b, n); } );
213 WriteNTupleFooter(zipBuffer.get(), szZipFooter, szFooter);
214 WriteNTupleAnchor();
215}
216
217
219 const void *data, size_t nbytes, size_t lenHeader)
220{
 // Writes the zipped header (nbytes long) to the fixed header object using the metadata object class.
 // Records the compressed (nbytes) and uncompressed (lenHeader) sizes in the anchor.
221 fDaosContainer->WriteSingleAkey(data, nbytes, kOidHeader, kDistributionKey,
222 kAttributeKey, kCidMetadata);
223 fNTupleAnchor.fLenHeader = lenHeader;
224 fNTupleAnchor.fNBytesHeader = nbytes;
225}
226
228 const void *data, size_t nbytes, size_t lenFooter)
229{
 // Writes the zipped footer (nbytes long) to the fixed footer object using the metadata object class.
 // Records the compressed (nbytes) and uncompressed (lenFooter) sizes in the anchor.
230 fDaosContainer->WriteSingleAkey(data, nbytes, kOidFooter, kDistributionKey,
231 kAttributeKey, kCidMetadata);
232 fNTupleAnchor.fLenFooter = lenFooter;
233 fNTupleAnchor.fNBytesFooter = nbytes;
234}
235
 // Serializes fNTupleAnchor into a buffer of RDaosNTupleAnchor::GetSize() bytes and writes that whole buffer
 // to the anchor object. AttachImpl() reads back the same number of bytes. make_unique<unsigned char[]>
 // value-initializes, so any tail beyond the bytes Serialize() writes is zero.
237 const auto ntplSize = RDaosNTupleAnchor::GetSize();
238 auto buffer = std::make_unique<unsigned char[]>(ntplSize);
239 fNTupleAnchor.Serialize(buffer.get());
240 fDaosContainer->WriteSingleAkey(buffer.get(), ntplSize, kOidAnchor, kDistributionKey,
241 kAttributeKey, kCidMetadata);
242}
243
246{
 // Allocates a heap page with room for nElements elements of this column's element size.
 // Requests for zero elements are rejected with an RException.
247 if (nElements == 0)
248 throw RException(R__FAIL("invalid call: request empty page"));
249 auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
250 return fPageAllocator->NewPage(columnHandle.fId, elementSize, nElements);
251}
252
254{
 // Pages handed out by ReservePage() come from fPageAllocator, so they are returned there.
255 fPageAllocator->DeletePage(page);
256}
257
258
259////////////////////////////////////////////////////////////////////////////////
260
261
263 ColumnId_t columnId, void *mem, std::size_t elementSize, std::size_t nElements)
264{
 // Wraps `mem` in a page and marks all nElements as present. The callers in this file pass a buffer
 // released from UnsealPage(), so the data is already there. The page takes over `mem`: DeletePage()
 // frees it with delete[], so it must have been allocated with new[].
265 RPage newPage(columnId, mem, elementSize, nElements);
266 newPage.GrowUnchecked(nElements);
267 return newPage;
268}
269
271{
 // Frees the buffer adopted by NewPage(). Null pages own no buffer and are ignored.
272 if (page.IsNull())
273 return;
274 delete[] reinterpret_cast<unsigned char *>(page.GetBuffer());
275}
276
277
278////////////////////////////////////////////////////////////////////////////////
279
280
282 const RNTupleReadOptions &options)
283 : RPageSource(ntupleName, options)
284 , fPageAllocator(std::make_unique<RPageAllocatorDaos>())
285 , fPagePool(std::make_shared<RPagePool>())
286 , fURI(uri)
287 , fClusterPool(std::make_unique<RClusterPool>(*this))
288{
 // The sink opens its container in CreateImpl() with create = true. The source instead connects to the
 // DAOS pool/container eagerly here, without the create flag.
289 fDecompressor = std::make_unique<RNTupleDecompressor>();
290 EnableDefaultMetrics("RPageSourceDaos");
291
292 auto args = ParseDaosURI(uri);
293 auto pool = std::make_shared<RDaosPool>(args.fPoolUuid, args.fSvcReplicas);
294 fDaosContainer = std::make_unique<RDaosContainer>(pool, args.fContainerUuid);
295}
296
297
299
300
302{
 // Reconstructs the ntuple descriptor from storage: reads the anchor, restores the object class it
 // records, then reads and unzips the header and the footer.
303 RNTupleDescriptorBuilder descBuilder;
 // NOTE(review): the declaration of `ntpl` (original line 304) is missing from this listing.
305 const auto ntplSize = RDaosNTupleAnchor::GetSize();
306 auto buffer = std::make_unique<unsigned char[]>(ntplSize);
307 fDaosContainer->ReadSingleAkey(buffer.get(), ntplSize, kOidAnchor, kDistributionKey,
308 kAttributeKey, kCidMetadata);
309 ntpl.Deserialize(buffer.get());
310
 // Subsequent reads through fDaosContainer use the object class recorded when the ntuple was written.
311 auto oclass = RDaosObject::ObjClassId(ntpl.fObjClass);
312 if (oclass.IsUnknown())
313 throw ROOT::Experimental::RException(R__FAIL("Unknown object class " + ntpl.fObjClass));
314 fDaosContainer->SetDefaultObjectClass(oclass);
315
 // The anchor holds the compressed (fNBytes*) and uncompressed (fLen*) sizes of header and footer.
316 buffer = std::make_unique<unsigned char[]>(ntpl.fLenHeader);
317 auto zipBuffer = std::make_unique<unsigned char[]>(ntpl.fNBytesHeader);
318 fDaosContainer->ReadSingleAkey(zipBuffer.get(), ntpl.fNBytesHeader, kOidHeader, kDistributionKey,
319 kAttributeKey, kCidMetadata);
320 fDecompressor->Unzip(zipBuffer.get(), ntpl.fNBytesHeader, ntpl.fLenHeader, buffer.get());
321 descBuilder.SetFromHeader(buffer.get());
322
323 buffer = std::make_unique<unsigned char[]>(ntpl.fLenFooter);
324 zipBuffer = std::make_unique<unsigned char[]>(ntpl.fNBytesFooter);
325 fDaosContainer->ReadSingleAkey(zipBuffer.get(), ntpl.fNBytesFooter, kOidFooter, kDistributionKey,
326 kAttributeKey, kCidMetadata);
327 fDecompressor->Unzip(zipBuffer.get(), ntpl.fNBytesFooter, ntpl.fLenFooter, buffer.get());
328 descBuilder.AddClustersFromFooter(buffer.get());
329
330 return descBuilder.MoveDescriptor();
331}
332
333
335{
 // Reports the container's default object class as a string. AttachImpl() sets it from the anchor.
336 return fDaosContainer->GetDefaultObjectClass().ToString();
337}
338
339
341 DescriptorId_t columnId, const RClusterIndex &clusterIndex, RSealedPage &sealedPage)
342{
 // Always fills in sealedPage.fSize and sealedPage.fNElements. The page bytes are read only if the caller
 // supplied a buffer (fBuffer != nullptr), and that buffer must then hold at least fSize bytes.
343 const auto clusterId = clusterIndex.GetClusterId();
344 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
345
346 auto pageInfo = clusterDescriptor.GetPageRange(columnId).Find(clusterIndex.GetIndex());
347
348 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
349 sealedPage.fSize = bytesOnStorage;
350 sealedPage.fNElements = pageInfo.fNElements;
351 if (sealedPage.fBuffer) {
 // The locator position is the id (low word) of the DAOS object the page was written to.
352 fDaosContainer->ReadSingleAkey(const_cast<void *>(sealedPage.fBuffer), bytesOnStorage,
353 {static_cast<decltype(daos_obj_id_t::lo)>(pageInfo.fLocator.fPosition), 0},
354 kDistributionKey, kAttributeKey);
355 }
356}
357
359 ColumnHandle_t columnHandle, const RClusterDescriptor &clusterDescriptor, ClusterSize_t::ValueType idxInCluster)
360{
 // Produces the unsealed page that contains element idxInCluster of the column in the given cluster,
 // and registers it in the page pool. The sealed bytes come from one of two places:
 // - a direct DAOS read, when the cluster cache is off;
 // - the current cluster, obtained through the cluster pool.
361 const auto columnId = columnHandle.fId;
362 const auto clusterId = clusterDescriptor.GetId();
363
364 auto pageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
365
366 const auto element = columnHandle.fColumn->GetElement();
367 const auto elementSize = element->GetSize();
368 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
369
370 const void *sealedPageBuffer = nullptr; // points either to directReadBuffer or to a read-only page in the cluster
371 std::unique_ptr<unsigned char []> directReadBuffer; // only used if cluster pool is turned off
372
373 if (fOptions.GetClusterCache() == RNTupleReadOptions::EClusterCache::kOff) {
 // NOTE(review): unlike LoadClusters(), this read is not covered by the fTimeWallRead/fTimeCpuRead timers.
374 directReadBuffer = std::make_unique<unsigned char[]>(bytesOnStorage);
375 fDaosContainer->ReadSingleAkey(directReadBuffer.get(), bytesOnStorage,
376 {static_cast<decltype(daos_obj_id_t::lo)>(pageInfo.fLocator.fPosition), 0},
377 kDistributionKey, kAttributeKey);
378 fCounters->fNPageLoaded.Inc();
379 fCounters->fNRead.Inc();
380 fCounters->fSzReadPayload.Add(bytesOnStorage);
381 sealedPageBuffer = directReadBuffer.get();
382 } else {
 // Switch clusters if there is no current cluster, it is a different one, or it lacks this column.
383 if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
384 fCurrentCluster = fClusterPool->GetCluster(clusterId, fActiveColumns);
385 R__ASSERT(fCurrentCluster->ContainsColumn(columnId));
386
 // The page may already be in the pool, e.g. preloaded by UnzipClusterImpl().
387 auto cachedPage = fPagePool->GetPage(columnId, RClusterIndex(clusterId, idxInCluster));
388 if (!cachedPage.IsNull())
389 return cachedPage;
390
391 ROnDiskPage::Key key(columnId, pageInfo.fPageNo);
392 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
393 R__ASSERT(onDiskPage && (bytesOnStorage == onDiskPage->GetSize()));
394 sealedPageBuffer = onDiskPage->GetAddress();
395 }
396
397 std::unique_ptr<unsigned char []> pageBuffer;
398 {
399 RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
400 pageBuffer = UnsealPage({sealedPageBuffer, bytesOnStorage, pageInfo.fNElements}, *element);
401 fCounters->fSzUnzip.Add(elementSize * pageInfo.fNElements);
402 }
403
404 const auto indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
 // Ownership of the unsealed buffer moves into the page (see RPageAllocatorDaos::NewPage).
405 auto newPage = fPageAllocator->NewPage(columnId, pageBuffer.release(), elementSize, pageInfo.fNElements);
406 newPage.SetWindow(indexOffset + pageInfo.fFirstInPage, RPage::RClusterInfo(clusterId, indexOffset));
407 fPagePool->RegisterPage(newPage,
408 RPageDeleter([](const RPage &page, void * /*userData*/)
409 {
 // NOTE(review): the deleter body (original line 410) is missing from this listing.
411 }, nullptr));
412 fCounters->fNPagePopulated.Inc();
413 return newPage;
414}
415
416
418 ColumnHandle_t columnHandle, NTupleSize_t globalIndex)
419{
 // Serves the page from the pool if it is cached. Otherwise finds the cluster that contains globalIndex
 // and populates the page from it, passing the element index relative to the cluster's first element.
420 const auto columnId = columnHandle.fId;
421 auto cachedPage = fPagePool->GetPage(columnId, globalIndex);
422 if (!cachedPage.IsNull())
423 return cachedPage;
424
425 const auto clusterId = fDescriptor.FindClusterId(columnId, globalIndex);
426 R__ASSERT(clusterId != kInvalidDescriptorId);
427 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
428 const auto selfOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
429 R__ASSERT(selfOffset <= globalIndex);
430 return PopulatePageFromCluster(columnHandle, clusterDescriptor, globalIndex - selfOffset);
431}
432
433
435 ColumnHandle_t columnHandle, const RClusterIndex &clusterIndex)
436{
 // Like the global-index overload, but the caller already supplies the cluster id and the in-cluster
 // index, so no cluster lookup is needed.
437 const auto clusterId = clusterIndex.GetClusterId();
438 const auto idxInCluster = clusterIndex.GetIndex();
439 const auto columnId = columnHandle.fId;
440 auto cachedPage = fPagePool->GetPage(columnId, clusterIndex);
441 if (!cachedPage.IsNull())
442 return cachedPage;
443
444 R__ASSERT(clusterId != kInvalidDescriptorId);
445 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
446 return PopulatePageFromCluster(columnHandle, clusterDescriptor, idxInCluster);
447}
448
450{
 // Populated pages are registered with fPagePool together with their deleter, so they are handed back to the pool.
451 fPagePool->ReturnPage(page);
452}
453
454std::unique_ptr<ROOT::Experimental::Detail::RPageSource> ROOT::Experimental::Detail::RPageSourceDaos::Clone() const
455{
456 auto clone = new RPageSourceDaos(fNTupleName, fURI, fOptions);
457 return std::unique_ptr<RPageSourceDaos>(clone);
458}
459
460std::vector<std::unique_ptr<ROOT::Experimental::Detail::RCluster>>
462{
463 std::vector<std::unique_ptr<ROOT::Experimental::Detail::RCluster>> result;
464 for (const auto &clusterKey : clusterKeys) {
465 auto clusterId = clusterKey.fClusterId;
466 fCounters->fNClusterLoaded.Inc();
467
468 const auto &clusterDesc = GetDescriptor().GetClusterDescriptor(clusterId);
469 auto clusterLocator = clusterDesc.GetLocator();
470
471 struct RDaosSealedPageLocator {
472 RDaosSealedPageLocator() = default;
473 RDaosSealedPageLocator(DescriptorId_t c, NTupleSize_t p, std::uint64_t o, std::uint64_t s, std::size_t b)
474 : fColumnId(c), fPageNo(p), fObjectId(o), fSize(s), fBufPos(b) {}
475 DescriptorId_t fColumnId = 0;
476 NTupleSize_t fPageNo = 0;
477 std::uint64_t fObjectId = 0;
478 std::uint64_t fSize = 0;
479 std::size_t fBufPos = 0;
480 };
481
482 // Collect the page necessary page meta-data and sum up the total size of the compressed and packed pages
483 std::vector<RDaosSealedPageLocator> onDiskPages;
484 std::size_t szPayload = 0;
485 for (auto columnId : clusterKey.fColumnSet) {
486 const auto &pageRange = clusterDesc.GetPageRange(columnId);
487 NTupleSize_t pageNo = 0;
488 for (const auto &pageInfo : pageRange.fPageInfos) {
489 const auto &pageLocator = pageInfo.fLocator;
490 onDiskPages.emplace_back(RDaosSealedPageLocator(
491 columnId, pageNo, pageLocator.fPosition, pageLocator.fBytesOnStorage, szPayload));
492 szPayload += pageLocator.fBytesOnStorage;
493 ++pageNo;
494 }
495 }
496
497 // Prepare the input vector for the RDaosContainer::ReadV() call
498 std::vector<RDaosContainer::RWOperation> readRequests;
499 auto buffer = new unsigned char[szPayload];
500 for (auto &s : onDiskPages) {
501 std::vector<d_iov_t> iovs(1);
502 d_iov_set(&iovs[0], buffer + s.fBufPos, s.fSize);
503 readRequests.emplace_back(daos_obj_id_t{s.fObjectId, 0},
504 kDistributionKey, kAttributeKey, iovs);
505 }
506 fCounters->fSzReadPayload.Add(szPayload);
507
508 // Register the on disk pages in a page map
509 auto pageMap = std::make_unique<ROnDiskPageMapHeap>(std::unique_ptr<unsigned char []>(buffer));
510 for (const auto &s : onDiskPages) {
511 ROnDiskPage::Key key(s.fColumnId, s.fPageNo);
512 pageMap->Register(key, ROnDiskPage(buffer + s.fBufPos, s.fSize));
513 }
514 fCounters->fNPageLoaded.Add(onDiskPages.size());
515
516 {
517 RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
518 fDaosContainer->ReadV(readRequests);
519 }
520 fCounters->fNReadV.Inc();
521 fCounters->fNRead.Add(readRequests.size());
522
523 auto cluster = std::make_unique<RCluster>(clusterId);
524 cluster->Adopt(std::move(pageMap));
525 for (auto colId : clusterKey.fColumnSet)
526 cluster->SetColumnAvailable(colId);
527
528 result.emplace_back(std::move(cluster));
529 }
530 return result;
531}
532
533
535{
 // Unseals every on-disk page of `cluster` as a task on fTaskScheduler and preloads the resulting pages
 // into the page pool. Returns only after all tasks have finished (fTaskScheduler->Wait() at the end).
536 RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
537 fTaskScheduler->Reset();
538
539 const auto clusterId = cluster->GetId();
540 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
541
 // One column element per column. The tasks capture raw pointers to these elements, so they must outlive
 // the tasks; they do, because this vector lives until after fTaskScheduler->Wait().
542 std::vector<std::unique_ptr<RColumnElementBase>> allElements;
543
544 const auto &columnsInCluster = cluster->GetAvailColumns();
545 for (const auto columnId : columnsInCluster) {
546 const auto &columnDesc = fDescriptor.GetColumnDescriptor(columnId);
547
548 allElements.emplace_back(RColumnElementBase::Generate(columnDesc.GetModel().GetType()));
549
550 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
551 std::uint64_t pageNo = 0;
552 std::uint64_t firstInPage = 0;
553 for (const auto &pi : pageRange.fPageInfos) {
554 ROnDiskPage::Key key(columnId, pageNo);
555 auto onDiskPage = cluster->GetOnDiskPage(key);
556 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == pi.fLocator.fBytesOnStorage));
557
 // Everything the task needs is captured by value (plus `this`), because the loop variables change
 // before the task runs.
558 auto taskFunc =
559 [this, columnId, clusterId, firstInPage, onDiskPage,
560 element = allElements.back().get(),
561 nElements = pi.fNElements,
562 indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex
563 ] () {
564 auto pageBuffer = UnsealPage({onDiskPage->GetAddress(), onDiskPage->GetSize(), nElements}, *element);
565 fCounters->fSzUnzip.Add(element->GetSize() * nElements);
566
567 auto newPage = fPageAllocator->NewPage(columnId, pageBuffer.release(), element->GetSize(), nElements);
568 newPage.SetWindow(indexOffset + firstInPage, RPage::RClusterInfo(clusterId, indexOffset));
569 fPagePool->PreloadPage(newPage,
570 RPageDeleter([](const RPage &page, void * /*userData*/)
571 {
 // NOTE(review): the deleter body (original line 572) is missing from this listing.
573 }, nullptr));
574 };
575
576 fTaskScheduler->AddTask(taskFunc);
577
578 firstInPage += pi.fNElements;
579 pageNo++;
580 } // for all pages in column
581 } // for all columns in cluster
582
583 fCounters->fNPagePopulated.Add(cluster->GetNOnDiskPages());
584
585 fTaskScheduler->Wait();
586}
size_t fSize
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition: RError.hxx:291
#define R__LOG_WARNING(...)
Definition: RLogger.hxx:363
#define b(i)
Definition: RSha256.hxx:100
#define c(i)
Definition: RSha256.hxx:101
#define R__ASSERT(e)
Definition: TError.h:118
Manages a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
Definition: RCluster.hxx:154
const ColumnSet_t & GetAvailColumns() const
Definition: RCluster.hxx:198
DescriptorId_t GetId() const
Definition: RCluster.hxx:197
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
Definition: RCluster.cxx:37
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
RColumnElementBase * GetElement() const
Definition: RColumn.hxx:308
Record wall time and CPU time between construction and destruction.
A page as being stored on disk, that is packed and compressed.
Definition: RCluster.hxx:43
Manages pages read from a DAOS container.
static RPage NewPage(ColumnId_t columnId, void *mem, std::size_t elementSize, std::size_t nElements)
Uses standard C++ memory allocation for the column data pages.
A closure that can free the memory associated with a mapped page.
A thread-safe cache of column pages.
Definition: RPagePool.hxx:47
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
RPageSinkDaos(std::string_view ntupleName, std::string_view uri, const RNTupleWriteOptions &options)
void WriteNTupleFooter(const void *data, size_t nbytes, size_t lenFooter)
void WriteNTupleHeader(const void *data, size_t nbytes, size_t lenHeader)
std::uint64_t CommitClusterImpl(NTupleSize_t nEntries) final
Returns the number of bytes written to storage (excluding metadata)
void CreateImpl(const RNTupleModel &model) final
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
RClusterDescriptor::RLocator CommitSealedPageImpl(DescriptorId_t columnId, const RPageStorage::RSealedPage &sealedPage) final
RClusterDescriptor::RLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final
Abstract interface to write data into an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
std::unique_ptr< RNTupleCompressor > fCompressor
Helper to zip pages and header/footer; includes a 16MB (kMAXZIPBUF) zip buffer.
Storage provider that reads ntuple pages from a DAOS container.
std::unique_ptr< RDaosContainer > fDaosContainer
A container that stores object data (header/footer, pages, etc.)
void LoadSealedPage(DescriptorId_t columnId, const RClusterIndex &clusterIndex, RSealedPage &sealedPage) final
Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage.
RPage PopulatePageFromCluster(ColumnHandle_t columnHandle, const RClusterDescriptor &clusterDescriptor, ClusterSize_t::ValueType idxInCluster)
RPageSourceDaos(std::string_view ntupleName, std::string_view uri, const RNTupleReadOptions &options)
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
void UnzipClusterImpl(RCluster *cluster) final
std::vector< std::unique_ptr< RCluster > > LoadClusters(std::span< RCluster::RKey > clusterKeys) final
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) final
Allocates and fills a page that contains the index-th element.
std::string GetObjectClass() const
Return the object class used for user data OIDs in this ntuple.
std::unique_ptr< RPageSource > Clone() const final
The cloned page source creates a new connection to the pool/container.
Abstract interface to read data from an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
std::unique_ptr< RNTupleDecompressor > fDecompressor
Helper to unzip pages and header/footer; comprises a 16MB (kMAXZIPBUF) unzip buffer.
Stores information about the cluster in which this page resides.
Definition: RPage.hxx:46
A page is a slice of a column that is mapped into memory.
Definition: RPage.hxx:41
ClusterSize_t::ValueType GetNBytes() const
The space taken by column elements in the buffer.
Definition: RPage.hxx:81
void * GrowUnchecked(ClusterSize_t::ValueType nElements)
Called during writing: returns a pointer after the last element and increases the element counter in ...
Definition: RPage.hxx:109
Meta-data for a set of ntuple clusters.
const RPageRange & GetPageRange(DescriptorId_t columnId) const
const RColumnRange & GetColumnRange(DescriptorId_t columnId) const
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
DescriptorId_t GetClusterId() const
ClusterSize_t::ValueType GetIndex() const
Base class for all ROOT issued exceptions.
Definition: RError.hxx:114
A helper class for piece-wise construction of an RNTupleDescriptor.
The on-storage meta-data of an ntuple.
The RNTupleModel encapsulates the schema of an ntuple.
Common user-tunable settings for reading ntuples.
DAOS-specific user-tunable settings for storing ntuples.
const std::string & GetObjectClass() const
Common user-tunable settings for storing ntuples.
static void d_iov_set(d_iov_t *iov, void *buf, size_t size)
Definition: daos.h:54
uint16_t daos_oclass_id_t
Definition: daos.h:133
@ OC_SX
Definition: daos.h:127
const Int_t n
Definition: legend1.C:16
basic_string_view< char > string_view
Machine-independent serialization functions for fundamental types.
Definition: RNTupleUtil.hxx:36
std::uint32_t SerializeUInt32(std::uint32_t val, void *buffer)
Definition: RNTupleUtil.cxx:85
std::uint32_t DeserializeString(const void *buffer, std::string *val)
std::uint32_t DeserializeUInt32(const void *buffer, std::uint32_t *val)
Definition: RNTupleUtil.cxx:98
std::uint32_t SerializeString(const std::string &val, void *buffer)
RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
Definition: RNTupleUtil.cxx:24
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
Definition: RNTupleUtil.hxx:77
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::int64_t ColumnId_t
Uniquely identifies a physical column within the scope of the current process, used to tag pages.
constexpr DescriptorId_t kInvalidDescriptorId
static constexpr double s
static constexpr double pi
Entry point for an RNTuple in a DAOS container.
std::uint32_t fNBytesFooter
The size of the compressed ntuple footer.
std::uint32_t fNBytesHeader
The size of the compressed ntuple header.
std::string fObjClass
The object class for user data OIDs, e.g. SX
std::uint32_t fVersion
Allows for evolving the struct in future versions.
std::uint32_t Deserialize(const void *buffer)
std::uint32_t fLenHeader
The size of the uncompressed ntuple header.
std::uint32_t Serialize(void *buffer) const
std::uint32_t fLenFooter
The size of the uncompressed ntuple footer.
Wrap around a daos_oclass_id_t.
Definition: RDaos.hxx:70
static constexpr std::size_t kOCNameMaxLength
This limit is currently not defined in any header and any call to daos_oclass_id2name() within DAOS u...
Definition: RDaos.hxx:84
On-disk pages within a page source are identified by the column and page number.
Definition: RCluster.hxx:53
A sealed page contains the bytes of a page as written to storage (packed & compressed).
NTupleSize_t fFirstElementIndex
A 64bit element index.
Generic information about the physical location of data.
RPageInfoExtended Find(RClusterSize::ValueType idxInCluster) const
Find the page in the RPageRange that contains the given element. The element must exist.
auto * m
Definition: textangle.C:8