Logo ROOT  
Reference Guide
RPageStorageDaos.cxx
Go to the documentation of this file.
1/// \file RPageStorageDaos.cxx
2/// \ingroup NTuple ROOT7
3/// \author Javier Lopez-Gomez <j.lopez@cern.ch>
4/// \date 2020-11-03
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RCluster.hxx>
17#include <ROOT/RClusterPool.hxx>
18#include <ROOT/RField.hxx>
19#include <ROOT/RLogger.hxx>
21#include <ROOT/RNTupleModel.hxx>
23#include <ROOT/RNTupleUtil.hxx>
24#include <ROOT/RNTupleZip.hxx>
25#include <ROOT/RPage.hxx>
27#include <ROOT/RPagePool.hxx>
28#include <ROOT/RDaos.hxx>
30
31#include <RVersion.h>
32#include <TError.h>
33
34#include <algorithm>
35#include <cstdio>
36#include <cstdlib>
37#include <iostream>
38#include <limits>
39#include <utility>
40#include <regex>
41
42namespace {
45
/// \brief RNTuple page-DAOS mappings. Selects how `GetPageDaosKey()` turns a (cluster ID, column ID, page counter)
/// triple into a DAOS (object ID, distribution key, attribute key) triple.
enum EDaosMapping {
   /// The cluster ID is the object ID; the column ID is the distribution key and the page counter the attribute key.
   kOidPerCluster = 0,
   /// The page counter is the object ID; the pre-defined default distribution and attribute keys are used.
   kOidPerPage = 1
};
48
/// \brief The three components that address one value in the DAOS object store. Instances are built by
/// `GetPageDaosKey()` and their members are passed on to the container's read/write calls in this file.
49struct RDaosKey {
 /// Object ID
50 daos_obj_id_t fOid;
 /// Distribution key within the object
51 DistributionKey_t fDkey;
 /// Attribute key within the distribution key
52 AttributeKey_t fAkey;
53};
54
55/// \brief Pre-defined keys for object store. `kDistributionKeyDefault` is the distribution key for metadata and
56/// pagelist values; optionally it can be used for ntuple pages (if under the `kOidPerPage` mapping strategy).
57/// `kAttributeKeyDefault` is the attribute key for ntuple pages under `kOidPerPage`.
58/// `kAttributeKey{Anchor,Header,Footer}` are the respective attribute keys for anchor/header/footer metadata elements.
59static constexpr DistributionKey_t kDistributionKeyDefault = 0x5a3c69f0cafe4a11;
60static constexpr AttributeKey_t kAttributeKeyDefault = 0x4243544b53444229;
61static constexpr AttributeKey_t kAttributeKeyAnchor = 0x4243544b5344422a;
62static constexpr AttributeKey_t kAttributeKeyHeader = 0x4243544b5344422b;
63static constexpr AttributeKey_t kAttributeKeyFooter = 0x4243544b5344422c;
64
65/// \brief Pre-defined object IDs for metadata (holds anchor/header/footer) and clusters' pagelists.
66static constexpr daos_obj_id_t kOidMetadata{std::uint64_t(-1), 0};
67static constexpr daos_obj_id_t kOidPageList{std::uint64_t(-2), 0};
68
/// \brief Object class passed along with every read/write of the metadata and pagelist objects in this file.
69static constexpr daos_oclass_id_t kCidMetadata = OC_SX;
70
/// \brief Page-DAOS mapping used as the template argument of every `GetPageDaosKey<>` call in this file.
71static constexpr EDaosMapping kDefaultDaosMapping = kOidPerCluster;
72
73template <EDaosMapping mapping>
74RDaosKey GetPageDaosKey(long unsigned clusterId, long unsigned columnId, long unsigned pageCount)
75{
76 if constexpr (mapping == kOidPerCluster) {
77 return RDaosKey{daos_obj_id_t{static_cast<decltype(daos_obj_id_t::lo)>(clusterId), 0},
78 static_cast<DistributionKey_t>(columnId), static_cast<AttributeKey_t>(pageCount)};
79 } else if constexpr (mapping == kOidPerPage) {
80 return RDaosKey{daos_obj_id_t{static_cast<decltype(daos_obj_id_t::lo)>(pageCount), 0}, kDistributionKeyDefault,
81 kAttributeKeyDefault};
82 }
83}
84
/// \brief The two components of a DAOS RNTuple URI, as returned by `ParseDaosURI()`.
85struct RDaosURI {
86 /// \brief Label of the DAOS pool
87 std::string fPoolLabel;
88 /// \brief Label of the container for this RNTuple
89 std::string fContainerLabel;
90};
91
92/**
93 \brief Parse a DAOS RNTuple URI of the form 'daos://pool_id/container_id'.
94*/
95RDaosURI ParseDaosURI(std::string_view uri)
96{
97 std::regex re("daos://([^/]+)/(.+)");
98 std::cmatch m;
99 if (!std::regex_match(uri.data(), m, re))
100 throw ROOT::Experimental::RException(R__FAIL("Invalid DAOS pool URI."));
101 return {m[1], m[2]};
102}
103} // namespace
104
105
106////////////////////////////////////////////////////////////////////////////////
107
108std::uint32_t
110{
111 using RNTupleSerializer = ROOT::Experimental::Internal::RNTupleSerializer;
112 if (buffer != nullptr) {
113 auto bytes = reinterpret_cast<unsigned char *>(buffer);
114 bytes += RNTupleSerializer::SerializeUInt32(fVersion, bytes);
115 bytes += RNTupleSerializer::SerializeUInt32(fNBytesHeader, bytes);
116 bytes += RNTupleSerializer::SerializeUInt32(fLenHeader, bytes);
117 bytes += RNTupleSerializer::SerializeUInt32(fNBytesFooter, bytes);
118 bytes += RNTupleSerializer::SerializeUInt32(fLenFooter, bytes);
119 bytes += RNTupleSerializer::SerializeString(fObjClass, bytes);
120 }
121 return RNTupleSerializer::SerializeString(fObjClass, nullptr) + 20;
122}
123
125ROOT::Experimental::Detail::RDaosNTupleAnchor::Deserialize(const void *buffer, std::uint32_t bufSize)
126{
127 if (bufSize < 20)
128 return R__FAIL("DAOS anchor too short");
129
130 using RNTupleSerializer = ROOT::Experimental::Internal::RNTupleSerializer;
131 auto bytes = reinterpret_cast<const unsigned char *>(buffer);
132 bytes += RNTupleSerializer::DeserializeUInt32(bytes, fVersion);
133 bytes += RNTupleSerializer::DeserializeUInt32(bytes, fNBytesHeader);
134 bytes += RNTupleSerializer::DeserializeUInt32(bytes, fLenHeader);
135 bytes += RNTupleSerializer::DeserializeUInt32(bytes, fNBytesFooter);
136 bytes += RNTupleSerializer::DeserializeUInt32(bytes, fLenFooter);
137 auto result = RNTupleSerializer::DeserializeString(bytes, bufSize - 20, fObjClass);
138 if (!result)
139 return R__FORWARD_ERROR(result);
140 return result.Unwrap() + 20;
141}
142
143std::uint32_t
145{
146 return RDaosNTupleAnchor().Serialize(nullptr)
148}
149
150
151////////////////////////////////////////////////////////////////////////////////
152
153
155 const RNTupleWriteOptions &options)
156 : RPageSink(ntupleName, options)
157 , fPageAllocator(std::make_unique<RPageAllocatorHeap>())
158 , fURI(uri)
159{
160 R__LOG_WARNING(NTupleLog()) << "The DAOS backend is experimental and still under development. " <<
161 "Do not store real data with this version of RNTuple!";
162 fCompressor = std::make_unique<RNTupleCompressor>();
163 EnableDefaultMetrics("RPageSinkDaos");
164}
165
166
168
170 unsigned char *serializedHeader, std::uint32_t length)
171{
172 auto opts = dynamic_cast<RNTupleWriteOptionsDaos *>(fOptions.get());
173 fNTupleAnchor.fObjClass = opts ? opts->GetObjectClass() : RNTupleWriteOptionsDaos().GetObjectClass();
174 auto oclass = RDaosObject::ObjClassId(fNTupleAnchor.fObjClass);
175 if (oclass.IsUnknown())
176 throw ROOT::Experimental::RException(R__FAIL("Unknown object class " + fNTupleAnchor.fObjClass));
177
178 auto args = ParseDaosURI(fURI);
179 auto pool = std::make_shared<RDaosPool>(args.fPoolLabel);
180 fDaosContainer = std::make_unique<RDaosContainer>(pool, args.fContainerLabel, /*create =*/true);
181 fDaosContainer->SetDefaultObjectClass(oclass);
182
183 auto zipBuffer = std::make_unique<unsigned char[]>(length);
184 auto szZipHeader = fCompressor->Zip(serializedHeader, length, GetWriteOptions().GetCompression(),
185 RNTupleCompressor::MakeMemCopyWriter(zipBuffer.get()));
186 WriteNTupleHeader(zipBuffer.get(), szZipHeader, length);
187}
188
189
192{
193 auto element = columnHandle.fColumn->GetElement();
194 RPageStorage::RSealedPage sealedPage;
195 {
196 RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
197 sealedPage = SealPage(page, *element, GetWriteOptions().GetCompression());
198 }
199
200 fCounters->fSzZip.Add(page.GetNBytes());
201 return CommitSealedPageImpl(columnHandle.fId, sealedPage);
202}
203
206 const RPageStorage::RSealedPage &sealedPage)
207{
208 auto offsetData = fPageId.fetch_add(1);
209 DescriptorId_t clusterId = fDescriptorBuilder.GetDescriptor().GetNClusters();
210
211 {
212 RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
213 RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(clusterId, columnId, offsetData);
214 fDaosContainer->WriteSingleAkey(sealedPage.fBuffer, sealedPage.fSize, daosKey.fOid, daosKey.fDkey, daosKey.fAkey);
215 }
216
218 result.fPosition = offsetData;
219 result.fBytesOnStorage = sealedPage.fSize;
220 fCounters->fNPageCommitted.Inc();
221 fCounters->fSzWritePayload.Add(sealedPage.fSize);
222 fNBytesCurrentCluster += sealedPage.fSize;
223 return result;
224}
225
226std::vector<ROOT::Experimental::RNTupleLocator>
227ROOT::Experimental::Detail::RPageSinkDaos::CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges)
228{
230 std::vector<ROOT::Experimental::RNTupleLocator> locators;
231 size_t nPages =
232 std::accumulate(ranges.begin(), ranges.end(), 0, [](size_t c, const RPageStorage::RSealedPageGroup &r) {
233 return c + std::distance(r.fFirst, r.fLast);
234 });
235 locators.reserve(nPages);
236
237 DescriptorId_t clusterId = fDescriptorBuilder.GetDescriptor().GetNClusters();
238 std::size_t szPayload = 0;
239
240 /// Aggregate batch of requests by object ID and distribution key, determined by the ntuple-DAOS mapping
241 for (auto &range : ranges) {
242 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
243 const RPageStorage::RSealedPage &s = *sealedPageIt;
244 d_iov_t pageIov;
245 d_iov_set(&pageIov, const_cast<void *>(s.fBuffer), s.fSize);
246 auto offsetData = fPageId.fetch_add(1);
247
248 RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(clusterId, range.fColumnId, offsetData);
249 auto odPair = RDaosContainer::ROidDkeyPair{daosKey.fOid, daosKey.fDkey};
250 auto [it, ret] = writeRequests.emplace(odPair, RDaosContainer::RWOperation(odPair));
251 it->second.insert(daosKey.fAkey, pageIov);
252
253 RNTupleLocator locator;
254 locator.fPosition = offsetData;
255 locator.fBytesOnStorage = s.fSize;
256 locators.push_back(locator);
257
258 szPayload += s.fSize;
259 }
260 }
261 fNBytesCurrentCluster += szPayload;
262
263 {
264 RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
265 if (int err = fDaosContainer->WriteV(writeRequests))
266 throw ROOT::Experimental::RException(R__FAIL("WriteV: error" + std::string(d_errstr(err))));
267 }
268
269 fCounters->fNPageCommitted.Add(nPages);
270 fCounters->fSzWritePayload.Add(szPayload);
271
272 return locators;
273}
274
275std::uint64_t
277{
278 return std::exchange(fNBytesCurrentCluster, 0);
279}
280
283 std::uint32_t length)
284{
285 auto bufPageListZip = std::make_unique<unsigned char[]>(length);
286 auto szPageListZip = fCompressor->Zip(serializedPageList, length, GetWriteOptions().GetCompression(),
287 RNTupleCompressor::MakeMemCopyWriter(bufPageListZip.get()));
288
289 auto offsetData = fClusterGroupId.fetch_add(1);
290 fDaosContainer->WriteSingleAkey(bufPageListZip.get(), szPageListZip, kOidPageList, kDistributionKeyDefault,
291 offsetData, kCidMetadata);
293 result.fPosition = offsetData;
294 result.fBytesOnStorage = szPageListZip;
295 fCounters->fSzWritePayload.Add(szPageListZip);
296 return result;
297}
298
299void ROOT::Experimental::Detail::RPageSinkDaos::CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length)
300{
301 auto bufFooterZip = std::make_unique<unsigned char[]>(length);
302 auto szFooterZip = fCompressor->Zip(serializedFooter, length, GetWriteOptions().GetCompression(),
303 RNTupleCompressor::MakeMemCopyWriter(bufFooterZip.get()));
304 WriteNTupleFooter(bufFooterZip.get(), szFooterZip, length);
305 WriteNTupleAnchor();
306}
307
309 const void *data, size_t nbytes, size_t lenHeader)
310{
311 fDaosContainer->WriteSingleAkey(data, nbytes, kOidMetadata, kDistributionKeyDefault, kAttributeKeyHeader,
312 kCidMetadata);
313 fNTupleAnchor.fLenHeader = lenHeader;
314 fNTupleAnchor.fNBytesHeader = nbytes;
315}
316
318 const void *data, size_t nbytes, size_t lenFooter)
319{
320 fDaosContainer->WriteSingleAkey(data, nbytes, kOidMetadata, kDistributionKeyDefault, kAttributeKeyFooter,
321 kCidMetadata);
322 fNTupleAnchor.fLenFooter = lenFooter;
323 fNTupleAnchor.fNBytesFooter = nbytes;
324}
325
327 const auto ntplSize = RDaosNTupleAnchor::GetSize();
328 auto buffer = std::make_unique<unsigned char[]>(ntplSize);
329 fNTupleAnchor.Serialize(buffer.get());
330 fDaosContainer->WriteSingleAkey(buffer.get(), ntplSize, kOidMetadata, kDistributionKeyDefault, kAttributeKeyAnchor,
331 kCidMetadata);
332}
333
336{
337 if (nElements == 0)
338 throw RException(R__FAIL("invalid call: request empty page"));
339 auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
340 return fPageAllocator->NewPage(columnHandle.fId, elementSize, nElements);
341}
342
344{
345 fPageAllocator->DeletePage(page);
346}
347
348
349////////////////////////////////////////////////////////////////////////////////
350
351
353 ColumnId_t columnId, void *mem, std::size_t elementSize, std::size_t nElements)
354{
355 RPage newPage(columnId, mem, elementSize, nElements);
356 newPage.GrowUnchecked(nElements);
357 return newPage;
358}
359
361{
362 if (page.IsNull())
363 return;
364 delete[] reinterpret_cast<unsigned char *>(page.GetBuffer());
365}
366
367
368////////////////////////////////////////////////////////////////////////////////
369
371 const RNTupleReadOptions &options)
372 : RPageSource(ntupleName, options), fPageAllocator(std::make_unique<RPageAllocatorDaos>()),
373 fPagePool(std::make_shared<RPagePool>()), fURI(uri),
374 fClusterPool(std::make_unique<RClusterPool>(*this, options.GetClusterBunchSize()))
375{
376 fDecompressor = std::make_unique<RNTupleDecompressor>();
377 EnableDefaultMetrics("RPageSourceDaos");
378
379 auto args = ParseDaosURI(uri);
380 auto pool = std::make_shared<RDaosPool>(args.fPoolLabel);
381 fDaosContainer = std::make_unique<RDaosContainer>(pool, args.fContainerLabel);
382}
383
384
386
387
389{
390 RNTupleDescriptorBuilder descBuilder;
392 const auto ntplSize = RDaosNTupleAnchor::GetSize();
393 auto buffer = std::make_unique<unsigned char[]>(ntplSize);
394 fDaosContainer->ReadSingleAkey(buffer.get(), ntplSize, kOidMetadata, kDistributionKeyDefault, kAttributeKeyAnchor,
395 kCidMetadata);
396 ntpl.Deserialize(buffer.get(), ntplSize).Unwrap();
397
398 auto oclass = RDaosObject::ObjClassId(ntpl.fObjClass);
399 if (oclass.IsUnknown())
400 throw ROOT::Experimental::RException(R__FAIL("Unknown object class " + ntpl.fObjClass));
401 fDaosContainer->SetDefaultObjectClass(oclass);
402
403 descBuilder.SetOnDiskHeaderSize(ntpl.fNBytesHeader);
404 buffer = std::make_unique<unsigned char[]>(ntpl.fLenHeader);
405 auto zipBuffer = std::make_unique<unsigned char[]>(ntpl.fNBytesHeader);
406 fDaosContainer->ReadSingleAkey(zipBuffer.get(), ntpl.fNBytesHeader, kOidMetadata, kDistributionKeyDefault,
407 kAttributeKeyHeader, kCidMetadata);
408 fDecompressor->Unzip(zipBuffer.get(), ntpl.fNBytesHeader, ntpl.fLenHeader, buffer.get());
409 Internal::RNTupleSerializer::DeserializeHeaderV1(buffer.get(), ntpl.fLenHeader, descBuilder);
410
411 descBuilder.AddToOnDiskFooterSize(ntpl.fNBytesFooter);
412 buffer = std::make_unique<unsigned char[]>(ntpl.fLenFooter);
413 zipBuffer = std::make_unique<unsigned char[]>(ntpl.fNBytesFooter);
414 fDaosContainer->ReadSingleAkey(zipBuffer.get(), ntpl.fNBytesFooter, kOidMetadata, kDistributionKeyDefault,
415 kAttributeKeyFooter, kCidMetadata);
416 fDecompressor->Unzip(zipBuffer.get(), ntpl.fNBytesFooter, ntpl.fLenFooter, buffer.get());
417 Internal::RNTupleSerializer::DeserializeFooterV1(buffer.get(), ntpl.fLenFooter, descBuilder);
418
419 auto ntplDesc = descBuilder.MoveDescriptor();
420
421 for (const auto &cgDesc : ntplDesc.GetClusterGroupIterable()) {
422 buffer = std::make_unique<unsigned char[]>(cgDesc.GetPageListLength());
423 zipBuffer = std::make_unique<unsigned char[]>(cgDesc.GetPageListLocator().fBytesOnStorage);
424 fDaosContainer->ReadSingleAkey(zipBuffer.get(), cgDesc.GetPageListLocator().fBytesOnStorage, kOidPageList,
425 kDistributionKeyDefault, cgDesc.GetPageListLocator().fPosition, kCidMetadata);
426 fDecompressor->Unzip(zipBuffer.get(), cgDesc.GetPageListLocator().fBytesOnStorage, cgDesc.GetPageListLength(),
427 buffer.get());
428
429 auto clusters = RClusterGroupDescriptorBuilder::GetClusterSummaries(ntplDesc, cgDesc.GetId());
430 Internal::RNTupleSerializer::DeserializePageListV1(buffer.get(), cgDesc.GetPageListLength(), clusters);
431 for (std::size_t i = 0; i < clusters.size(); ++i) {
432 ntplDesc.AddClusterDetails(clusters[i].MoveDescriptor().Unwrap());
433 }
434 }
435
436 return ntplDesc;
437}
438
439
441{
442 return fDaosContainer->GetDefaultObjectClass().ToString();
443}
444
445
447 DescriptorId_t columnId, const RClusterIndex &clusterIndex, RSealedPage &sealedPage)
448{
449 const auto clusterId = clusterIndex.GetClusterId();
450
452 {
453 auto descriptorGuard = GetSharedDescriptorGuard();
454 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
455 pageInfo = clusterDescriptor.GetPageRange(columnId).Find(clusterIndex.GetIndex());
456 }
457
458 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
459 sealedPage.fSize = bytesOnStorage;
460 sealedPage.fNElements = pageInfo.fNElements;
461 if (sealedPage.fBuffer) {
462 RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(clusterId, columnId, pageInfo.fLocator.fPosition);
463 fDaosContainer->ReadSingleAkey(const_cast<void *>(sealedPage.fBuffer), bytesOnStorage, daosKey.fOid,
464 daosKey.fDkey, daosKey.fAkey);
465 }
466}
467
470 const RClusterInfo &clusterInfo,
471 ClusterSize_t::ValueType idxInCluster)
472{
473 const auto columnId = columnHandle.fId;
474 const auto clusterId = clusterInfo.fClusterId;
475 const auto &pageInfo = clusterInfo.fPageInfo;
476
477 const auto element = columnHandle.fColumn->GetElement();
478 const auto elementSize = element->GetSize();
479 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
480
481 const void *sealedPageBuffer = nullptr; // points either to directReadBuffer or to a read-only page in the cluster
482 std::unique_ptr<unsigned char []> directReadBuffer; // only used if cluster pool is turned off
483
484 if (fOptions.GetClusterCache() == RNTupleReadOptions::EClusterCache::kOff) {
485 directReadBuffer = std::make_unique<unsigned char[]>(bytesOnStorage);
486 RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(clusterId, columnId, pageInfo.fLocator.fPosition);
487 fDaosContainer->ReadSingleAkey(directReadBuffer.get(), bytesOnStorage, daosKey.fOid, daosKey.fDkey,
488 daosKey.fAkey);
489 fCounters->fNPageLoaded.Inc();
490 fCounters->fNRead.Inc();
491 fCounters->fSzReadPayload.Add(bytesOnStorage);
492 sealedPageBuffer = directReadBuffer.get();
493 } else {
494 if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
495 fCurrentCluster = fClusterPool->GetCluster(clusterId, fActiveColumns);
496 R__ASSERT(fCurrentCluster->ContainsColumn(columnId));
497
498 auto cachedPage = fPagePool->GetPage(columnId, RClusterIndex(clusterId, idxInCluster));
499 if (!cachedPage.IsNull())
500 return cachedPage;
501
502 ROnDiskPage::Key key(columnId, pageInfo.fPageNo);
503 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
504 R__ASSERT(onDiskPage && (bytesOnStorage == onDiskPage->GetSize()));
505 sealedPageBuffer = onDiskPage->GetAddress();
506 }
507
508 std::unique_ptr<unsigned char []> pageBuffer;
509 {
510 RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
511 pageBuffer = UnsealPage({sealedPageBuffer, bytesOnStorage, pageInfo.fNElements}, *element);
512 fCounters->fSzUnzip.Add(elementSize * pageInfo.fNElements);
513 }
514
515 auto newPage = fPageAllocator->NewPage(columnId, pageBuffer.release(), elementSize, pageInfo.fNElements);
516 newPage.SetWindow(clusterInfo.fColumnOffset + pageInfo.fFirstInPage,
517 RPage::RClusterInfo(clusterId, clusterInfo.fColumnOffset));
518 fPagePool->RegisterPage(newPage,
519 RPageDeleter([](const RPage &page, void * /*userData*/)
520 {
522 }, nullptr));
523 fCounters->fNPagePopulated.Inc();
524 return newPage;
525}
526
527
529 ColumnHandle_t columnHandle, NTupleSize_t globalIndex)
530{
531 const auto columnId = columnHandle.fId;
532 auto cachedPage = fPagePool->GetPage(columnId, globalIndex);
533 if (!cachedPage.IsNull())
534 return cachedPage;
535
536 std::uint64_t idxInCluster;
537 RClusterInfo clusterInfo;
538 {
539 auto descriptorGuard = GetSharedDescriptorGuard();
540 clusterInfo.fClusterId = descriptorGuard->FindClusterId(columnId, globalIndex);
542
543 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterInfo.fClusterId);
544 clusterInfo.fColumnOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
545 R__ASSERT(clusterInfo.fColumnOffset <= globalIndex);
546 idxInCluster = globalIndex - clusterInfo.fColumnOffset;
547 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
548 }
549 return PopulatePageFromCluster(columnHandle, clusterInfo, idxInCluster);
550}
551
552
554 ColumnHandle_t columnHandle, const RClusterIndex &clusterIndex)
555{
556 const auto clusterId = clusterIndex.GetClusterId();
557 const auto idxInCluster = clusterIndex.GetIndex();
558 const auto columnId = columnHandle.fId;
559 auto cachedPage = fPagePool->GetPage(columnId, clusterIndex);
560 if (!cachedPage.IsNull())
561 return cachedPage;
562
563 R__ASSERT(clusterId != kInvalidDescriptorId);
564 RClusterInfo clusterInfo;
565 {
566 auto descriptorGuard = GetSharedDescriptorGuard();
567 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
568 clusterInfo.fClusterId = clusterId;
569 clusterInfo.fColumnOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
570 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
571 }
572
573 return PopulatePageFromCluster(columnHandle, clusterInfo, idxInCluster);
574}
575
577{
578 fPagePool->ReturnPage(page);
579}
580
581std::unique_ptr<ROOT::Experimental::Detail::RPageSource> ROOT::Experimental::Detail::RPageSourceDaos::Clone() const
582{
583 auto clone = new RPageSourceDaos(fNTupleName, fURI, fOptions);
584 return std::unique_ptr<RPageSourceDaos>(clone);
585}
586
587std::vector<std::unique_ptr<ROOT::Experimental::Detail::RCluster>>
589{
590 std::vector<std::unique_ptr<ROOT::Experimental::Detail::RCluster>> result;
591
592 struct RDaosSealedPageLocator {
593 RDaosSealedPageLocator() = default;
594 RDaosSealedPageLocator(DescriptorId_t cl, DescriptorId_t co, NTupleSize_t p, std::uint64_t o, std::uint64_t s,
595 std::size_t b)
596 : fClusterId(cl), fColumnId(co), fPageNo(p), fObjectId(o), fSize(s), fBufPos(b)
597 {
598 }
599 DescriptorId_t fClusterId = 0;
600 DescriptorId_t fColumnId = 0;
601 NTupleSize_t fPageNo = 0;
602 std::uint64_t fObjectId = 0;
603 std::uint64_t fSize = 0;
604 std::size_t fBufPos = 0;
605 };
606
607 std::vector<unsigned char *> clusterBuffers(clusterKeys.size());
608 std::vector<std::unique_ptr<ROnDiskPageMapHeap>> pageMaps(clusterKeys.size());
610
611 std::size_t szPayload = 0;
612 unsigned nPages = 0;
613
614 for (unsigned i = 0; i < clusterKeys.size(); ++i) {
615 const auto &clusterKey = clusterKeys[i];
616 auto clusterId = clusterKey.fClusterId;
617 std::vector<RDaosSealedPageLocator> onDiskClusterPages;
618
619 unsigned clusterBufSz = 0;
620 fCounters->fNClusterLoaded.Inc();
621 {
622 auto descriptorGuard = GetSharedDescriptorGuard();
623 const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterId);
624
625 // Collect the necessary page meta-data and sum up the total size of the compressed and packed pages
626 for (auto columnId : clusterKey.fColumnSet) {
627 const auto &pageRange = clusterDesc.GetPageRange(columnId);
628 NTupleSize_t columnPageCount = 0;
629 for (const auto &pageInfo : pageRange.fPageInfos) {
630 const auto &pageLocator = pageInfo.fLocator;
631 onDiskClusterPages.push_back(RDaosSealedPageLocator(clusterId, columnId, columnPageCount,
632 pageLocator.fPosition, pageLocator.fBytesOnStorage,
633 clusterBufSz));
634 ++columnPageCount;
635 clusterBufSz += pageLocator.fBytesOnStorage;
636 }
637 nPages += columnPageCount;
638 }
639 }
640 szPayload += clusterBufSz;
641
642 clusterBuffers[i] = new unsigned char[clusterBufSz];
643 pageMaps[i] = std::make_unique<ROnDiskPageMapHeap>(std::unique_ptr<unsigned char[]>(clusterBuffers[i]));
644
645 // Fill the cluster page maps and the input dictionary for the RDaosContainer::ReadV() call
646 for (const auto &s : onDiskClusterPages) {
647 // Register the on disk pages in a page map
648 ROnDiskPage::Key key(s.fColumnId, s.fPageNo);
649 pageMaps[i]->Register(key, ROnDiskPage(clusterBuffers[i] + s.fBufPos, s.fSize));
650
651 // Prepare new read request batched up by object ID and distribution key
652 d_iov_t iov;
653 d_iov_set(&iov, clusterBuffers[i] + s.fBufPos, s.fSize);
654
655 RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(s.fClusterId, s.fColumnId, s.fObjectId);
656 auto odPair = RDaosContainer::ROidDkeyPair{daosKey.fOid, daosKey.fDkey};
657 auto [it, ret] = readRequests.emplace(odPair, RDaosContainer::RWOperation(odPair));
658 it->second.insert(daosKey.fAkey, iov);
659 }
660 }
661 fCounters->fNPageLoaded.Add(nPages);
662 fCounters->fSzReadPayload.Add(szPayload);
663
664 {
665 RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
666 if (int err = fDaosContainer->ReadV(readRequests))
667 throw ROOT::Experimental::RException(R__FAIL("ReadV: error" + std::string(d_errstr(err))));
668 }
669 fCounters->fNReadV.Inc();
670 fCounters->fNRead.Add(nPages);
671
672 // Assign each cluster its page map
673 for (unsigned i = 0; i < clusterKeys.size(); ++i) {
674 auto cluster = std::make_unique<RCluster>(clusterKeys[i].fClusterId);
675 cluster->Adopt(std::move(pageMaps[i]));
676 for (auto colId : clusterKeys[i].fColumnSet)
677 cluster->SetColumnAvailable(colId);
678
679 result.emplace_back(std::move(cluster));
680 }
681 return result;
682}
683
685{
686 RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
687 fTaskScheduler->Reset();
688
689 const auto clusterId = cluster->GetId();
690 auto descriptorGuard = GetSharedDescriptorGuard();
691 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
692
693 std::vector<std::unique_ptr<RColumnElementBase>> allElements;
694
695 const auto &columnsInCluster = cluster->GetAvailColumns();
696 for (const auto columnId : columnsInCluster) {
697 const auto &columnDesc = descriptorGuard->GetColumnDescriptor(columnId);
698
699 allElements.emplace_back(RColumnElementBase::Generate(columnDesc.GetModel().GetType()));
700
701 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
702 std::uint64_t pageNo = 0;
703 std::uint64_t firstInPage = 0;
704 for (const auto &pi : pageRange.fPageInfos) {
705 ROnDiskPage::Key key(columnId, pageNo);
706 auto onDiskPage = cluster->GetOnDiskPage(key);
707 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == pi.fLocator.fBytesOnStorage));
708
709 auto taskFunc =
710 [this, columnId, clusterId, firstInPage, onDiskPage,
711 element = allElements.back().get(),
712 nElements = pi.fNElements,
713 indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex
714 ] () {
715 auto pageBuffer = UnsealPage({onDiskPage->GetAddress(), onDiskPage->GetSize(), nElements}, *element);
716 fCounters->fSzUnzip.Add(element->GetSize() * nElements);
717
718 auto newPage = fPageAllocator->NewPage(columnId, pageBuffer.release(), element->GetSize(), nElements);
719 newPage.SetWindow(indexOffset + firstInPage, RPage::RClusterInfo(clusterId, indexOffset));
720 fPagePool->PreloadPage(newPage,
721 RPageDeleter([](const RPage &page, void * /*userData*/)
722 {
724 }, nullptr));
725 };
726
727 fTaskScheduler->AddTask(taskFunc);
728
729 firstInPage += pi.fNElements;
730 pageNo++;
731 } // for all pages in column
732 } // for all columns in cluster
733
734 fCounters->fNPagePopulated.Add(cluster->GetNOnDiskPages());
735
736 fTaskScheduler->Wait();
737}
size_t fSize
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking)
Definition: RError.hxx:295
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition: RError.hxx:291
#define R__LOG_WARNING(...)
Definition: RLogger.hxx:363
#define c(i)
Definition: RSha256.hxx:101
#define R__ASSERT(e)
Definition: TError.h:118
winID h TVirtualViewer3D TVirtualGLPainter p
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void data
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t b
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t bytes
Manages a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
Definition: RCluster.hxx:154
const ColumnSet_t & GetAvailColumns() const
Definition: RCluster.hxx:198
DescriptorId_t GetId() const
Definition: RCluster.hxx:197
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
Definition: RCluster.cxx:37
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
RColumnElementBase * GetElement() const
Definition: RColumn.hxx:308
std::unordered_map< ROidDkeyPair, RWOperation, ROidDkeyPair::Hash > MultiObjectRWOperation_t
Definition: RDaos.hxx:199
RDaosObject::DistributionKey_t DistributionKey_t
Definition: RDaos.hxx:151
RDaosObject::AttributeKey_t AttributeKey_t
Definition: RDaos.hxx:152
static Writer_t MakeMemCopyWriter(unsigned char *dest)
Definition: RNTupleZip.hxx:49
Record wall time and CPU time between construction and destruction.
A page as being stored on disk, that is packed and compressed.
Definition: RCluster.hxx:43
Manages pages read from a DAOS container.
static RPage NewPage(ColumnId_t columnId, void *mem, std::size_t elementSize, std::size_t nElements)
Uses standard C++ memory allocation for the column data pages.
A closure that can free the memory associated with a mapped page.
A thread-safe cache of column pages.
Definition: RPagePool.hxx:47
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
RPageSinkDaos(std::string_view ntupleName, std::string_view uri, const RNTupleWriteOptions &options)
RNTupleLocator CommitSealedPageImpl(DescriptorId_t columnId, const RPageStorage::RSealedPage &sealedPage) final
void WriteNTupleFooter(const void *data, size_t nbytes, size_t lenFooter)
void WriteNTupleHeader(const void *data, size_t nbytes, size_t lenHeader)
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
std::uint64_t CommitClusterImpl(NTupleSize_t nEntries) final
Returns the number of bytes written to storage (excluding metadata)
void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) final
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final
void CreateImpl(const RNTupleModel &model, unsigned char *serializedHeader, std::uint32_t length) final
std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges) final
Vector commit of preprocessed pages.
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
Abstract interface to write data into an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
std::unique_ptr< RNTupleCompressor > fCompressor
Helper to zip pages and header/footer; includes a 16MB (kMAXZIPBUF) zip buffer.
Storage provider that reads ntuple pages from a DAOS container.
std::unique_ptr< RDaosContainer > fDaosContainer
A container that stores object data (header/footer, pages, etc.)
void LoadSealedPage(DescriptorId_t columnId, const RClusterIndex &clusterIndex, RSealedPage &sealedPage) final
Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage.
RPageSourceDaos(std::string_view ntupleName, std::string_view uri, const RNTupleReadOptions &options)
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
void UnzipClusterImpl(RCluster *cluster) final
std::vector< std::unique_ptr< RCluster > > LoadClusters(std::span< RCluster::RKey > clusterKeys) final
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) final
Allocates and fills a page that contains the index-th element.
RPage PopulatePageFromCluster(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo, ClusterSize_t::ValueType idxInCluster)
std::string GetObjectClass() const
Return the object class used for user data OIDs in this ntuple.
std::unique_ptr< RPageSource > Clone() const final
The cloned page source creates a new connection to the pool/container.
Abstract interface to read data from an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
std::unique_ptr< RNTupleDecompressor > fDecompressor
Helper to unzip pages and header/footer; comprises a 16MB (kMAXZIPBUF) unzip buffer.
Stores information about the cluster in which this page resides.
Definition: RPage.hxx:46
A page is a slice of a column that is mapped into memory.
Definition: RPage.hxx:41
ClusterSize_t::ValueType GetNBytes() const
The space taken by column elements in the buffer.
Definition: RPage.hxx:81
void * GrowUnchecked(ClusterSize_t::ValueType nElements)
Called during writing: returns a pointer after the last element and increases the element counter in ...
Definition: RPage.hxx:109
A helper class for serialization and deserialization of the RNTuple binary format.
static RResult< void > DeserializePageListV1(const void *buffer, std::uint32_t bufSize, std::vector< RClusterDescriptorBuilder > &clusters)
static RResult< void > DeserializeFooterV1(const void *buffer, std::uint32_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static RResult< void > DeserializeHeaderV1(const void *buffer, std::uint32_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static std::vector< RClusterDescriptorBuilder > GetClusterSummaries(const RNTupleDescriptor &ntplDesc, DescriptorId_t clusterGroupId)
Used to prepare the cluster descriptor builders when loading the page locations for a certain cluster...
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Definition: RNTupleUtil.hxx:87
DescriptorId_t GetClusterId() const
ClusterSize_t::ValueType GetIndex() const
Base class for all ROOT issued exceptions.
Definition: RError.hxx:114
A helper class for piece-wise construction of an RNTupleDescriptor.
void AddToOnDiskFooterSize(std::uint64_t size)
The real footer size also includes the page list envelopes.
The on-storage meta-data of an ntuple.
The RNTupleModel encapsulates the schema of an ntuple.
Common user-tunable settings for reading ntuples.
DAOS-specific user-tunable settings for storing ntuples.
const std::string & GetObjectClass() const
Common user-tunable settings for storing ntuples.
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition: RError.hxx:195
const char * d_errstr(int rc)
static void d_iov_set(d_iov_t *iov, void *buf, size_t size)
Definition: daos.h:50
uint16_t daos_oclass_id_t
Definition: daos.h:135
@ OC_SX
Definition: daos.h:129
basic_string_view< char > string_view
RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
Definition: RNTupleUtil.cxx:24
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
Definition: RNTupleUtil.hxx:47
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
Definition: RNTupleUtil.hxx:83
std::int64_t ColumnId_t
Uniquely identifies a physical column within the scope of the current process, used to tag pages.
Definition: RNTupleUtil.hxx:79
constexpr DescriptorId_t kInvalidDescriptorId
Definition: RNTupleUtil.hxx:84
static constexpr double s
static constexpr double pi
A pair of <object ID, distribution key> that can be used to issue a fetch/update request for multiple...
Definition: RDaos.hxx:157
Describes a read/write operation on multiple objects; see the ReadV/WriteV functions.
Definition: RDaos.hxx:181
Entry point for an RNTuple in a DAOS container.
std::uint32_t fNBytesFooter
The size of the compressed ntuple footer.
std::uint32_t fNBytesHeader
The size of the compressed ntuple header.
std::string fObjClass
The object class for user data OIDs, e.g. SX
std::uint32_t fVersion
Allows for evolving the struct in future versions.
RResult< std::uint32_t > Deserialize(const void *buffer, std::uint32_t bufSize)
std::uint32_t fLenHeader
The size of the uncompressed ntuple header.
std::uint32_t Serialize(void *buffer) const
std::uint32_t fLenFooter
The size of the uncompressed ntuple footer.
Wrap around a daos_oclass_id_t.
Definition: RDaos.hxx:94
static constexpr std::size_t kOCNameMaxLength
This limit is currently not defined in any header and any call to daos_oclass_id2name() within DAOS u...
Definition: RDaos.hxx:107
On-disk pages within a page source are identified by the column and page number.
Definition: RCluster.hxx:53
Summarizes cluster-level information that is necessary to populate a certain page.
RClusterDescriptor::RPageRange::RPageInfoExtended fPageInfo
Location of the page on disk.
std::uint64_t fColumnOffset
The first element number of the page's column in the given cluster.
A range of sealed pages referring to the same column that can be used for vector commit.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
We do not need to store the element size / uncompressed page size because we know to which column the...
RNTupleLocator fLocator
The meaning of fLocator depends on the storage backend.
ClusterSize_t fNElements
The sum of the elements of all the pages must match the corresponding fNElements field in fColumnRang...
Generic information about the physical location of data.
iovec for memory buffer
Definition: daos.h:37
uint64_t lo
Definition: daos.h:146
TMarker m
Definition: textangle.C:8