Logo ROOT  
Reference Guide
 
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Loading...
Searching...
No Matches
RPageStorageDaos.cxx
Go to the documentation of this file.
1/// \file RPageStorageDaos.cxx
2/// \ingroup NTuple ROOT7
3/// \author Javier Lopez-Gomez <j.lopez@cern.ch>
4/// \date 2020-11-03
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RCluster.hxx>
17#include <ROOT/RClusterPool.hxx>
18#include <ROOT/RLogger.hxx>
20#include <ROOT/RNTupleModel.hxx>
23#include <ROOT/RNTupleUtil.hxx>
24#include <ROOT/RNTupleZip.hxx>
25#include <ROOT/RPage.hxx>
27#include <ROOT/RPagePool.hxx>
28#include <ROOT/RDaos.hxx>
30
31#include <RVersion.h>
32#include <TError.h>
33
34#include <algorithm>
35#include <cstdio>
36#include <cstdlib>
37#include <limits>
38#include <utility>
39#include <regex>
40#include <cassert>
41
42namespace {
46
/// \brief Strategies for laying out RNTuple pages in the DAOS object store, i.e. how a page's
/// (ntuple, cluster, column, page counter) coordinates are translated into an object ID plus keys.
enum EDaosMapping {
   /// One object per (ntuple, cluster): the column id selects the distribution key and the page counter
   /// selects the attribute key.
   kOidPerCluster = 0,
   /// One object per (ntuple, page counter): pages are stored under `kAttributeKeyDefault`.
   kOidPerPage = 1
};
49
50struct RDaosKey {
51 daos_obj_id_t fOid;
52 DistributionKey_t fDkey;
53 AttributeKey_t fAkey;
54};
55
56/// \brief Pre-defined keys for object store. `kDistributionKeyDefault` is the distribution key for metadata and
57/// pagelist values; optionally it can be used for ntuple pages (if under the `kOidPerPage` mapping strategy).
58/// `kAttributeKeyDefault` is the attribute key for ntuple pages under `kOidPerPage`.
59/// `kAttributeKey{Anchor,Header,Footer}` are the respective attribute keys for anchor/header/footer metadata elements.
60static constexpr DistributionKey_t kDistributionKeyDefault = 0x5a3c69f0cafe4a11;
61static constexpr AttributeKey_t kAttributeKeyDefault = 0x4243544b53444229;
62static constexpr AttributeKey_t kAttributeKeyAnchor = 0x4243544b5344422a;
63static constexpr AttributeKey_t kAttributeKeyHeader = 0x4243544b5344422b;
64static constexpr AttributeKey_t kAttributeKeyFooter = 0x4243544b5344422c;
65
66/// \brief Pre-defined 64 LSb of the OIDs for ntuple metadata (holds anchor/header/footer) and clusters' pagelists.
67static constexpr decltype(daos_obj_id_t::lo) kOidLowMetadata = -1;
68static constexpr decltype(daos_obj_id_t::lo) kOidLowPageList = -2;
69
70static constexpr daos_oclass_id_t kCidMetadata = OC_SX;
71
72static constexpr EDaosMapping kDefaultDaosMapping = kOidPerCluster;
73
74template <EDaosMapping mapping>
76 long unsigned columnId, long unsigned pageCount)
77{
78 if constexpr (mapping == kOidPerCluster) {
79 return RDaosKey{daos_obj_id_t{static_cast<decltype(daos_obj_id_t::lo)>(clusterId),
80 static_cast<decltype(daos_obj_id_t::hi)>(ntplId)},
81 static_cast<DistributionKey_t>(columnId), static_cast<AttributeKey_t>(pageCount)};
82 } else if constexpr (mapping == kOidPerPage) {
83 return RDaosKey{daos_obj_id_t{static_cast<decltype(daos_obj_id_t::lo)>(pageCount),
84 static_cast<decltype(daos_obj_id_t::hi)>(ntplId)},
86 }
87}
88
/// \brief Components of a `daos://<pool>/<container>` URI.
struct RDaosURI {
   std::string fPoolLabel;      ///< Label of the DAOS pool
   std::string fContainerLabel; ///< Label of the container for this RNTuple
};
95
96/**
97 \brief Parse a DAOS RNTuple URI of the form 'daos://pool_id/container_id'.
98*/
99RDaosURI ParseDaosURI(std::string_view uri)
100{
101 std::regex re("daos://([^/]+)/(.+)");
102 std::cmatch m;
103 if (!std::regex_match(uri.data(), m, re))
104 throw ROOT::Experimental::RException(R__FAIL("Invalid DAOS pool URI."));
105 return {m[1], m[2]};
106}
107
108/// \brief Unpacks a 64-bit RNTuple page locator address for object stores into a pair of 32-bit values:
109/// the attribute key under which the cage is stored and the offset within that cage to access the page.
110std::pair<uint32_t, uint32_t> DecodeDaosPagePosition(const ROOT::Experimental::RNTupleLocatorObject64 &address)
111{
112 auto position = static_cast<uint32_t>(address.fLocation & 0xFFFFFFFF);
113 auto offset = static_cast<uint32_t>(address.fLocation >> 32);
114 return {position, offset};
115}
116
117/// \brief Packs an attribute key together with an offset within its contents into a single 64-bit address.
118/// The offset is kept in the MSb half and defaults to zero, which is the case when caging is disabled.
120{
121 uint64_t address = (position & 0xFFFFFFFF) | (offset << 32);
123}
124
125/// \brief Helper structure concentrating the functionality required to locate an ntuple within a DAOS container.
126/// It includes a hashing function that converts the RNTuple's name into a 32-bit identifier; this value is used to
127/// index the subspace for the ntuple among all objects in the container. A zero-value hash value is reserved for
128/// storing any future metadata related to container-wide management; a zero-index ntuple is thus disallowed and
129/// remapped to "1". Once the index is computed, `InitNTupleDescriptorBuilder()` can be called to return a
130/// partially-filled builder with the ntuple's anchor, header and footer, lacking only pagelists. Upon that call,
131/// a copy of the anchor is stored in `fAnchor`.
132struct RDaosContainerNTupleLocator {
133 std::string fName{};
134 ntuple_index_t fIndex{};
135 std::optional<ROOT::Experimental::Internal::RDaosNTupleAnchor> fAnchor;
136 static const ntuple_index_t kReservedIndex = 0;
137
138 RDaosContainerNTupleLocator() = default;
139 explicit RDaosContainerNTupleLocator(const std::string &ntupleName) : fName(ntupleName), fIndex(Hash(ntupleName)){};
140
141 bool IsValid() { return fAnchor.has_value() && fAnchor->fNBytesHeader; }
142 [[nodiscard]] ntuple_index_t GetIndex() const { return fIndex; };
143 static ntuple_index_t Hash(const std::string &ntupleName)
144 {
145 // Convert string to numeric representation via `std::hash`.
146 uint64_t h = std::hash<std::string>{}(ntupleName);
147 // Fold the hash into 32-bit using `boost::hash_combine()` algorithm and magic number.
148 auto seed = static_cast<uint32_t>(h >> 32);
149 seed ^= static_cast<uint32_t>(h & 0xffffffff) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
150 auto hash = static_cast<ntuple_index_t>(seed);
151 return (hash == kReservedIndex) ? kReservedIndex + 1 : hash;
152 }
153
156 {
157 std::unique_ptr<unsigned char[]> buffer, zipBuffer;
158 auto &anchor = fAnchor.emplace();
159 int err;
160
162 daos_obj_id_t oidMetadata{kOidLowMetadata, static_cast<decltype(daos_obj_id_t::hi)>(this->GetIndex())};
163
164 buffer = std::make_unique<unsigned char[]>(anchorSize);
165 if ((err = cont.ReadSingleAkey(buffer.get(), anchorSize, oidMetadata, kDistributionKeyDefault,
167 return err;
168 }
169
170 anchor.Deserialize(buffer.get(), anchorSize).Unwrap();
171 if (anchor.fVersionEpoch != ROOT::RNTuple::kVersionEpoch) {
173 R__FAIL("unsupported RNTuple epoch version: " + std::to_string(anchor.fVersionEpoch)));
174 }
175
176 builder.SetOnDiskHeaderSize(anchor.fNBytesHeader);
177 buffer = std::make_unique<unsigned char[]>(anchor.fLenHeader);
178 zipBuffer = std::make_unique<unsigned char[]>(anchor.fNBytesHeader);
179 if ((err = cont.ReadSingleAkey(zipBuffer.get(), anchor.fNBytesHeader, oidMetadata, kDistributionKeyDefault,
181 return err;
183 buffer.get());
185
186 builder.AddToOnDiskFooterSize(anchor.fNBytesFooter);
187 buffer = std::make_unique<unsigned char[]>(anchor.fLenFooter);
188 zipBuffer = std::make_unique<unsigned char[]>(anchor.fNBytesFooter);
189 if ((err = cont.ReadSingleAkey(zipBuffer.get(), anchor.fNBytesFooter, oidMetadata, kDistributionKeyDefault,
191 return err;
193 buffer.get());
195
196 return 0;
197 }
198
199 static std::pair<RDaosContainerNTupleLocator, ROOT::Experimental::Internal::RNTupleDescriptorBuilder>
201 {
202 auto result = std::make_pair(RDaosContainerNTupleLocator(ntupleName),
204
205 auto &loc = result.first;
206 auto &builder = result.second;
207
208 if (int err = loc.InitNTupleDescriptorBuilder(cont, builder); !err) {
209 if (ntupleName.empty() || ntupleName != builder.GetDescriptor().GetName()) {
210 // Hash already taken by a differently-named ntuple.
212 R__FAIL("LocateNTuple: ntuple name '" + ntupleName + "' unavailable in this container."));
213 }
214 }
215 return result;
216 }
217};
218
219} // anonymous namespace
220
221////////////////////////////////////////////////////////////////////////////////
222
241
244{
245 if (bufSize < 32)
246 return R__FAIL("DAOS anchor too short");
247
249 auto bytes = reinterpret_cast<const unsigned char *>(buffer);
251 if (fVersionAnchor != RDaosNTupleAnchor().fVersionAnchor) {
252 return R__FAIL("unsupported DAOS anchor version: " + std::to_string(fVersionAnchor));
253 }
254
264 if (!result)
265 return R__FORWARD_ERROR(result);
266 return result.Unwrap() + 32;
267}
268
273
274////////////////////////////////////////////////////////////////////////////////
275
277 const RNTupleWriteOptions &options)
278 : RPagePersistentSink(ntupleName, options), fURI(uri)
279{
280 static std::once_flag once;
281 std::call_once(once, []() {
282 R__LOG_WARNING(NTupleLog()) << "The DAOS backend is experimental and still under development. "
283 << "Do not store real data with this version of RNTuple!";
284 });
285 fCompressor = std::make_unique<RNTupleCompressor>();
286 EnableDefaultMetrics("RPageSinkDaos");
287}
288
290
292{
293 auto opts = dynamic_cast<RNTupleWriteOptionsDaos *>(fOptions.get());
294 fNTupleAnchor.fObjClass = opts ? opts->GetObjectClass() : RNTupleWriteOptionsDaos().GetObjectClass();
295 auto oclass = RDaosObject::ObjClassId(fNTupleAnchor.fObjClass);
296 if (oclass.IsUnknown())
297 throw ROOT::Experimental::RException(R__FAIL("Unknown object class " + fNTupleAnchor.fObjClass));
298
299 size_t cageSz = opts ? opts->GetMaxCageSize() : RNTupleWriteOptionsDaos().GetMaxCageSize();
300 size_t pageSz = opts ? opts->GetMaxUnzippedPageSize() : RNTupleWriteOptionsDaos().GetMaxUnzippedPageSize();
301 fCageSizeLimit = std::max(cageSz, pageSz);
302
303 auto args = ParseDaosURI(fURI);
304 auto pool = std::make_shared<RDaosPool>(args.fPoolLabel);
305
306 fDaosContainer = std::make_unique<RDaosContainer>(pool, args.fContainerLabel, /*create =*/true);
307 fDaosContainer->SetDefaultObjectClass(oclass);
308
309 auto [locator, _] = RDaosContainerNTupleLocator::LocateNTuple(*fDaosContainer, fNTupleName);
310 fNTupleIndex = locator.GetIndex();
311
312 auto zipBuffer = std::make_unique<unsigned char[]>(length);
313 auto szZipHeader = fCompressor->Zip(serializedHeader, length, GetWriteOptions().GetCompression(),
315 WriteNTupleHeader(zipBuffer.get(), szZipHeader, length);
316}
317
320{
321 auto element = columnHandle.fColumn->GetElement();
323 {
324 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
325 sealedPage = SealPage(page, *element);
326 }
327
328 fCounters->fSzZip.Add(page.GetNBytes());
329 return CommitSealedPageImpl(columnHandle.fPhysicalId, sealedPage);
330}
331
335{
336 auto offsetData = fPageId.fetch_add(1);
337 DescriptorId_t clusterId = fDescriptorBuilder.GetDescriptor().GetNActiveClusters();
338
339 {
340 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
342 fDaosContainer->WriteSingleAkey(sealedPage.GetBuffer(), sealedPage.GetBufferSize(), daosKey.fOid, daosKey.fDkey,
343 daosKey.fAkey);
344 }
345
348 result.fBytesOnStorage = sealedPage.GetDataSize();
350 fCounters->fNPageCommitted.Inc();
351 fCounters->fSzWritePayload.Add(sealedPage.GetBufferSize());
352 fNBytesCurrentCluster += sealedPage.GetBufferSize();
353 return result;
354}
355
356std::vector<ROOT::Experimental::RNTupleLocator>
357ROOT::Experimental::Internal::RPageSinkDaos::CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges,
358 const std::vector<bool> &mask)
359{
361 std::vector<ROOT::Experimental::RNTupleLocator> locators;
362 auto nPages = mask.size();
363 locators.reserve(nPages);
364
365 const uint32_t maxCageSz = fCageSizeLimit;
366 const bool useCaging = fCageSizeLimit > 0;
367 const std::uint8_t locatorFlags = useCaging ? EDaosLocatorFlags::kCagedPage : 0;
368
369 DescriptorId_t clusterId = fDescriptorBuilder.GetDescriptor().GetNActiveClusters();
370 int64_t payloadSz = 0;
371 std::size_t positionOffset;
372 uint32_t positionIndex;
373
374 /// Aggregate batch of requests by object ID and distribution key, determined by the ntuple-DAOS mapping
375 for (auto &range : ranges) {
376 positionOffset = 0;
377 /// Under caging, the atomic page counter is fetch-incremented for every column range to get the position of its
378 /// first cage and indicate the next one, also ensuring subsequent pages of different columns do not end up caged
379 /// together. This increment is not necessary in the absence of caging, as each page is trivially caged.
380 positionIndex = useCaging ? fPageId.fetch_add(1) : fPageId.load();
381
382 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
384
386 positionOffset = 0;
387 positionIndex = fPageId.fetch_add(1);
388 }
389
391 d_iov_set(&pageIov, const_cast<void *>(s.GetBuffer()), s.GetBufferSize());
392
393 RDaosKey daosKey =
394 GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, clusterId, range.fPhysicalColumnId, positionIndex);
397 it->second.Insert(daosKey.fAkey, pageIov);
398
401 locator.fBytesOnStorage = s.GetDataSize();
403 locator.fReserved = locatorFlags;
404 locators.push_back(locator);
405
408 }
409 }
410 fNBytesCurrentCluster += payloadSz;
411
412 {
413 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
414 if (int err = fDaosContainer->WriteV(writeRequests))
415 throw ROOT::Experimental::RException(R__FAIL("WriteV: error" + std::string(d_errstr(err))));
416 }
417
418 fCounters->fNPageCommitted.Add(nPages);
419 fCounters->fSzWritePayload.Add(payloadSz);
420
421 return locators;
422}
423
425{
426 return std::exchange(fNBytesCurrentCluster, 0);
427}
428
431 std::uint32_t length)
432{
433 auto bufPageListZip = std::make_unique<unsigned char[]>(length);
434 auto szPageListZip = fCompressor->Zip(serializedPageList, length, GetWriteOptions().GetCompression(),
436
437 auto offsetData = fClusterGroupId.fetch_add(1);
438 fDaosContainer->WriteSingleAkey(
440 daos_obj_id_t{kOidLowPageList, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)}, kDistributionKeyDefault,
444 result.fBytesOnStorage = szPageListZip;
446 fCounters->fSzWritePayload.Add(static_cast<int64_t>(szPageListZip));
447 return result;
448}
449
451 std::uint32_t length)
452{
453 auto bufFooterZip = std::make_unique<unsigned char[]>(length);
454 auto szFooterZip = fCompressor->Zip(serializedFooter, length, GetWriteOptions().GetCompression(),
456 WriteNTupleFooter(bufFooterZip.get(), szFooterZip, length);
457 WriteNTupleAnchor();
458}
459
461{
462 fDaosContainer->WriteSingleAkey(
463 data, nbytes, daos_obj_id_t{kOidLowMetadata, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)},
465 fNTupleAnchor.fLenHeader = lenHeader;
466 fNTupleAnchor.fNBytesHeader = nbytes;
467}
468
470{
471 fDaosContainer->WriteSingleAkey(
472 data, nbytes, daos_obj_id_t{kOidLowMetadata, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)},
474 fNTupleAnchor.fLenFooter = lenFooter;
475 fNTupleAnchor.fNBytesFooter = nbytes;
476}
477
479{
481 auto buffer = std::make_unique<unsigned char[]>(ntplSize);
482 fNTupleAnchor.Serialize(buffer.get());
483 fDaosContainer->WriteSingleAkey(
484 buffer.get(), ntplSize, daos_obj_id_t{kOidLowMetadata, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)},
486}
487
488////////////////////////////////////////////////////////////////////////////////
489
491 const RNTupleReadOptions &options)
492 : RPageSource(ntupleName, options),
493 fURI(uri),
494 fClusterPool(std::make_unique<RClusterPool>(*this, options.GetClusterBunchSize()))
495{
496 EnableDefaultMetrics("RPageSourceDaos");
497
498 auto args = ParseDaosURI(uri);
499 auto pool = std::make_shared<RDaosPool>(args.fPoolLabel);
500 fDaosContainer = std::make_unique<RDaosContainer>(pool, args.fContainerLabel);
501}
502
504
506{
508 std::unique_ptr<unsigned char[]> buffer, zipBuffer;
509
510 auto [locator, descBuilder] = RDaosContainerNTupleLocator::LocateNTuple(*fDaosContainer, fNTupleName);
511 if (!locator.IsValid())
513 R__FAIL("Attach: requested ntuple '" + fNTupleName + "' is not present in DAOS container."));
514
515 auto oclass = RDaosObject::ObjClassId(locator.fAnchor->fObjClass);
516 if (oclass.IsUnknown())
517 throw ROOT::Experimental::RException(R__FAIL("Attach: unknown object class " + locator.fAnchor->fObjClass));
518
519 fDaosContainer->SetDefaultObjectClass(oclass);
520 fNTupleIndex = locator.GetIndex();
522
523 auto desc = descBuilder.MoveDescriptor();
524
525 for (const auto &cgDesc : desc.GetClusterGroupIterable()) {
526 buffer = std::make_unique<unsigned char[]>(cgDesc.GetPageListLength());
527 zipBuffer = std::make_unique<unsigned char[]>(cgDesc.GetPageListLocator().fBytesOnStorage);
528 fDaosContainer->ReadSingleAkey(
529 zipBuffer.get(), cgDesc.GetPageListLocator().fBytesOnStorage, oidPageList, kDistributionKeyDefault,
530 cgDesc.GetPageListLocator().GetPosition<RNTupleLocatorObject64>().fLocation, kCidMetadata);
531 RNTupleDecompressor::Unzip(zipBuffer.get(), cgDesc.GetPageListLocator().fBytesOnStorage,
532 cgDesc.GetPageListLength(), buffer.get());
533
534 RNTupleSerializer::DeserializePageList(buffer.get(), cgDesc.GetPageListLength(), cgDesc.GetId(), desc);
535 }
536
537 return desc;
538}
539
541{
542 return fDaosContainer->GetDefaultObjectClass().ToString();
543}
544
547{
548 const auto clusterId = clusterIndex.GetClusterId();
549
551 {
552 auto descriptorGuard = GetSharedDescriptorGuard();
553 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
554 pageInfo = clusterDescriptor.GetPageRange(physicalColumnId).Find(clusterIndex.GetIndex());
555 }
556
557 sealedPage.SetBufferSize(pageInfo.fLocator.fBytesOnStorage + pageInfo.fHasChecksum * kNBytesPageChecksum);
558 sealedPage.SetNElements(pageInfo.fNElements);
559 sealedPage.SetHasChecksum(pageInfo.fHasChecksum);
560 if (!sealedPage.GetBuffer())
561 return;
562
563 if (pageInfo.fLocator.fType == RNTupleLocator::kTypePageZero) {
564 assert(!pageInfo.fHasChecksum);
565 memcpy(const_cast<void *>(sealedPage.GetBuffer()), RPage::GetPageZeroBuffer(), sealedPage.GetBufferSize());
566 return;
567 }
568
569 if (pageInfo.fLocator.fReserved & EDaosLocatorFlags::kCagedPage) {
570 // Suboptimal but hard to do differently: we load the full cage up to and including the requested page.
571 // In practice, individual LoadSealedPage calls are rare and usually full clusters are buffered.
572 // The support for extracting individual pages from a cage makes testing easier, however.
573 const auto [position, offset] = DecodeDaosPagePosition(pageInfo.fLocator.GetPosition<RNTupleLocatorObject64>());
574 RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, clusterId, physicalColumnId, position);
575 const auto bufSize = offset + sealedPage.GetBufferSize();
576 auto cageHeadBuffer = std::make_unique<unsigned char[]>(bufSize);
577 fDaosContainer->ReadSingleAkey(cageHeadBuffer.get(), bufSize, daosKey.fOid, daosKey.fDkey, daosKey.fAkey);
578 memcpy(const_cast<void *>(sealedPage.GetBuffer()), cageHeadBuffer.get() + offset, sealedPage.GetBufferSize());
579 } else {
581 fNTupleIndex, clusterId, physicalColumnId, pageInfo.fLocator.GetPosition<RNTupleLocatorObject64>().fLocation);
582 fDaosContainer->ReadSingleAkey(const_cast<void *>(sealedPage.GetBuffer()), sealedPage.GetBufferSize(),
583 daosKey.fOid, daosKey.fDkey, daosKey.fAkey);
584 }
585
586 sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
587}
588
593{
594 const auto columnId = columnHandle.fPhysicalId;
595 const auto clusterId = clusterInfo.fClusterId;
596 const auto &pageInfo = clusterInfo.fPageInfo;
597
598 const auto element = columnHandle.fColumn->GetElement();
599 const auto elementSize = element->GetSize();
600 const auto elementInMemoryType = element->GetIdentifier().fInMemoryType;
601
602 if (pageInfo.fLocator.fType == RNTupleLocator::kTypePageZero) {
604 pageZero.GrowUnchecked(pageInfo.fNElements);
605 pageZero.SetWindow(clusterInfo.fColumnOffset + pageInfo.fFirstInPage,
607 return fPagePool.RegisterPage(std::move(pageZero), elementInMemoryType);
608 }
609
611 sealedPage.SetNElements(pageInfo.fNElements);
612 sealedPage.SetHasChecksum(pageInfo.fHasChecksum);
613 sealedPage.SetBufferSize(pageInfo.fLocator.fBytesOnStorage + pageInfo.fHasChecksum * kNBytesPageChecksum);
614 std::unique_ptr<unsigned char[]> directReadBuffer; // only used if cluster pool is turned off
615
616 if (fOptions.GetClusterCache() == RNTupleReadOptions::EClusterCache::kOff) {
617 if (pageInfo.fLocator.fReserved & EDaosLocatorFlags::kCagedPage) {
619 R__FAIL("accessing caged pages is only supported in conjunction with cluster cache"));
620 }
621
622 directReadBuffer = std::unique_ptr<unsigned char[]>(new unsigned char[sealedPage.GetBufferSize()]);
624 fNTupleIndex, clusterId, columnId, pageInfo.fLocator.GetPosition<RNTupleLocatorObject64>().fLocation);
625 fDaosContainer->ReadSingleAkey(directReadBuffer.get(), sealedPage.GetBufferSize(), daosKey.fOid, daosKey.fDkey,
626 daosKey.fAkey);
627 fCounters->fNPageRead.Inc();
628 fCounters->fNRead.Inc();
629 fCounters->fSzReadPayload.Add(sealedPage.GetBufferSize());
630 sealedPage.SetBuffer(directReadBuffer.get());
631 } else {
632 if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
633 fCurrentCluster = fClusterPool->GetCluster(clusterId, fActivePhysicalColumns.ToColumnSet());
634 R__ASSERT(fCurrentCluster->ContainsColumn(columnId));
635
637 if (!cachedPageRef.Get().IsNull())
638 return cachedPageRef;
639
640 ROnDiskPage::Key key(columnId, pageInfo.fPageNo);
641 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
642 R__ASSERT(onDiskPage && (sealedPage.GetBufferSize() == onDiskPage->GetSize()));
643 sealedPage.SetBuffer(onDiskPage->GetAddress());
644 }
645
647 {
648 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
649 newPage = UnsealPage(sealedPage, *element, columnId).Unwrap();
650 fCounters->fSzUnzip.Add(elementSize * pageInfo.fNElements);
651 }
652
653 newPage.SetWindow(clusterInfo.fColumnOffset + pageInfo.fFirstInPage,
655 fCounters->fNPageUnsealed.Inc();
656 return fPagePool.RegisterPage(std::move(newPage), elementInMemoryType);
657}
658
659std::unique_ptr<ROOT::Experimental::Internal::RPageSource>
661{
662 auto clone = new RPageSourceDaos(fNTupleName, fURI, fOptions);
663 return std::unique_ptr<RPageSourceDaos>(clone);
664}
665
666std::vector<std::unique_ptr<ROOT::Experimental::Internal::RCluster>>
668{
670 DescriptorId_t fClusterId = 0;
671 DescriptorId_t fColumnId = 0;
672 NTupleSize_t fPageNo = 0;
673 std::uint64_t fPosition = 0;
674 std::uint64_t fCageOffset = 0;
675 std::uint64_t fDataSize = 0; // page payload
676 std::uint64_t fBufferSize = 0; // page payload + checksum (if available)
677 };
678
679 // Prepares read requests for a single cluster; `readRequests` is modified by this function. Requests are coalesced
680 // by OID and distribution key.
681 // TODO(jalopezg): this may be a private member function; that, however, requires additional changes given that
682 // `RDaosContainer::MultiObjectRWOperation_t` cannot be forward-declared
685 auto clusterId = clusterKey.fClusterId;
686 // Group page locators by their position in the object store; with caging enabled, this facilitates the
687 // processing of cages' requests together into a single IOV to be loaded.
688 std::unordered_map<std::uint32_t, std::vector<RDaosSealedPageLocator>> onDiskPages;
689
690 unsigned clusterBufSz = 0, nPages = 0;
691 auto pageZeroMap = std::make_unique<ROnDiskPageMap>();
692 PrepareLoadCluster(
696 const auto &pageLocator = pageInfo.fLocator;
697 uint32_t position, offset;
698 std::tie(position, offset) = DecodeDaosPagePosition(pageLocator.GetPosition<RNTupleLocatorObject64>());
699 auto [itLoc, _] = onDiskPages.emplace(position, std::vector<RDaosSealedPageLocator>());
700 auto pageBufferSize = pageLocator.fBytesOnStorage + pageInfo.fHasChecksum * kNBytesPageChecksum;
701
702 itLoc->second.push_back(
703 {clusterId, physicalColumnId, pageNo, position, offset, pageLocator.fBytesOnStorage, pageBufferSize});
704 ++nPages;
706 });
707
708 auto clusterBuffer = new unsigned char[clusterBufSz];
709 auto pageMap = std::make_unique<ROnDiskPageMapHeap>(std::unique_ptr<unsigned char[]>(clusterBuffer));
710
712 // Fill the cluster page map and the read requests for the RDaosContainer::ReadV() call
713 for (auto &[cageIndex, pageVec] : onDiskPages) {
714 auto columnId = pageVec[0].fColumnId; // All pages in a cage belong to the same column
715 std::size_t cageSz = 0;
716
717 for (auto &s : pageVec) {
718 assert(columnId == s.fColumnId);
719 assert(cageIndex == s.fPosition);
720 // Register the on disk pages in a page map
721 ROnDiskPage::Key key(s.fColumnId, s.fPageNo);
722 pageMap->Register(key, ROnDiskPage(cageBuffer + s.fCageOffset, s.fBufferSize));
723 cageSz += s.fBufferSize;
724 }
725
726 // Prepare new read request batched up by object ID and distribution key
727 d_iov_t iov;
729
733 itReq->second.Insert(daosKey.fAkey, iov);
734
736 }
737 fCounters->fNPageRead.Add(nPages);
738 fCounters->fSzReadPayload.Add(clusterBufSz);
739
740 auto cluster = std::make_unique<RCluster>(clusterId);
741 cluster->Adopt(std::move(pageMap));
742 cluster->Adopt(std::move(pageZeroMap));
743 for (auto colId : clusterKey.fPhysicalColumnSet)
744 cluster->SetColumnAvailable(colId);
745 return cluster;
746 };
747
748 fCounters->fNClusterLoaded.Add(clusterKeys.size());
749
750 std::vector<std::unique_ptr<ROOT::Experimental::Internal::RCluster>> clusters;
752 for (auto key : clusterKeys) {
753 clusters.emplace_back(fnPrepareSingleCluster(key, readRequests));
754 }
755
756 {
757 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
758 if (int err = fDaosContainer->ReadV(readRequests))
759 throw ROOT::Experimental::RException(R__FAIL("ReadV: error" + std::string(d_errstr(err))));
760 }
761 fCounters->fNReadV.Inc();
762 fCounters->fNRead.Add(readRequests.size());
763
764 return clusters;
765}
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking)
Definition RError.hxx:294
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:290
#define R__LOG_WARNING(...)
Definition RLogger.hxx:363
#define h(i)
Definition RSha256.hxx:106
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void data
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t mask
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t bytes
UInt_t Hash(const TString &s)
Definition TString.h:494
#define _(A, B)
Definition cfortran.h:108
Managed a set of clusters containing compressed and packed pages.
A RDaosContainer provides read/write access to objects in a given container.
Definition RDaos.hxx:157
RDaosObject::DistributionKey_t DistributionKey_t
Definition RDaos.hxx:160
std::unordered_map< ROidDkeyPair, RWOperation, ROidDkeyPair::Hash > MultiObjectRWOperation_t
Definition RDaos.hxx:231
RDaosObject::AttributeKey_t AttributeKey_t
Definition RDaos.hxx:161
static Writer_t MakeMemCopyWriter(unsigned char *dest)
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The `nbytes` parameter provides the size of the `from` buffer.
A helper class for piece-wise construction of an RNTupleDescriptor.
void AddToOnDiskFooterSize(std::uint64_t size)
The real footer size also includes the page list envelopes.
A helper class for serializing and deserialization of the RNTuple binary format.
static std::uint32_t DeserializeUInt16(const void *buffer, std::uint16_t &val)
static RResult< void > DeserializeHeader(const void *buffer, std::uint64_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static RResult< void > DeserializeFooter(const void *buffer, std::uint64_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static std::uint32_t SerializeString(const std::string &val, void *buffer)
static std::uint32_t DeserializeUInt32(const void *buffer, std::uint32_t &val)
static std::uint32_t SerializeUInt64(std::uint64_t val, void *buffer)
static RResult< void > DeserializePageList(const void *buffer, std::uint64_t bufSize, DescriptorId_t clusterGroupId, RNTupleDescriptor &desc)
static std::uint32_t DeserializeUInt64(const void *buffer, std::uint64_t &val)
static RResult< std::uint32_t > DeserializeString(const void *buffer, std::uint64_t bufSize, std::string &val)
static std::uint32_t SerializeUInt16(std::uint16_t val, void *buffer)
static std::uint32_t SerializeUInt32(std::uint32_t val, void *buffer)
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:42
Base class for a sink with a physical storage backend.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
Reference to a page stored in the page pool.
Definition RPagePool.hxx:93
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
RPageSinkDaos(std::string_view ntupleName, std::string_view uri, const RNTupleWriteOptions &options)
void WriteNTupleFooter(const void *data, size_t nbytes, size_t lenFooter)
std::uint64_t StageClusterImpl() final
Returns the number of bytes written to storage (excluding metadata)
void WriteNTupleHeader(const void *data, size_t nbytes, size_t lenHeader)
void InitImpl(unsigned char *serializedHeader, std::uint32_t length) final
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final
RNTupleLocator CommitSealedPageImpl(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges, const std::vector< bool > &mask) final
Vector commit of preprocessed pages.
std::unique_ptr< RNTupleCompressor > fCompressor
Helper to zip pages and header/footer; includes a 16MB (kMAXZIPBUF) zip buffer.
Storage provider that reads ntuple pages from a DAOS container.
std::string GetObjectClass() const
Return the object class used for user data OIDs in this ntuple.
RPageRef LoadPageImpl(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo, ClusterSize_t::ValueType idxInCluster) final
std::vector< std::unique_ptr< RCluster > > LoadClusters(std::span< RCluster::RKey > clusterKeys) final
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
RPageSourceDaos(std::string_view ntupleName, std::string_view uri, const RNTupleReadOptions &options)
void LoadSealedPage(DescriptorId_t physicalColumnId, RClusterIndex clusterIndex, RSealedPage &sealedPage) final
Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage.
RNTupleDescriptor AttachImpl() final
LoadStructureImpl() has been called before AttachImpl() is called
std::unique_ptr< RDaosContainer > fDaosContainer
A container that stores object data (header/footer, pages, etc.)
std::unique_ptr< RPageSource > CloneImpl() const final
The cloned page source creates a new connection to the pool/container.
Abstract interface to read data from an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
Stores information about the cluster in which this page resides.
Definition RPage.hxx:55
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:46
static RPage MakePageZero(ColumnId_t columnId, ClusterSize_t::ValueType elementSize)
Make a 'zero' page for column columnId (that is comprised of 0x00 bytes only).
Definition RPage.hxx:174
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Definition RPage.cxx:25
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
The on-storage meta-data of an ntuple.
Common user-tunable settings for reading ntuples.
DAOS-specific user-tunable settings for storing ntuples.
Common user-tunable settings for storing ntuples.
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition RError.hxx:194
static constexpr std::uint16_t kVersionEpoch
Definition RNTuple.hxx:79
const char * d_errstr(int rc)
static void d_iov_set(d_iov_t *iov, void *buf, size_t size)
Definition daos.h:50
uint16_t daos_oclass_id_t
Definition daos.h:135
@ OC_SX
Definition daos.h:129
RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distriniguishes elements of the same type within a descriptor, e.g. different fields.
The identifiers that specifies the content of a (partial) cluster.
Definition RCluster.hxx:156
A pair of <object ID, distribution key> that can be used to issue a fetch/update request for multiple...
Definition RDaos.hxx:166
Describes a read/write operation on multiple attribute keys under the same object ID and distribution...
Definition RDaos.hxx:190
Entry point for an RNTuple in a DAOS container.
std::uint32_t fNBytesFooter
The size of the compressed ntuple footer.
RResult< std::uint32_t > Deserialize(const void *buffer, std::uint32_t bufSize)
std::uint64_t fVersionAnchor
Allows for evolving the struct in future versions.
std::string fObjClass
The object class for user data OIDs, e.g. SX
std::uint16_t fVersionEpoch
Version of the binary format supported by the writer.
std::uint32_t fLenHeader
The size of the uncompressed ntuple header.
std::uint32_t fLenFooter
The size of the uncompressed ntuple footer.
std::uint32_t fNBytesHeader
The size of the compressed ntuple header.
static constexpr std::size_t kOCNameMaxLength
This limit is currently not defined in any header and any call to daos_oclass_id2name() within DAOS u...
Definition RDaos.hxx:108
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:52
Summarizes cluster-level information that are necessary to load a certain page.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
We do not need to store the element size / uncompressed page size because we know to which column the...
RNTupleLocator payload that is common for object stores using 64bit location information.
Generic information about the physical location of data.
iovec for memory buffer
Definition daos.h:37
uint64_t hi
Definition daos.h:147
uint64_t lo
Definition daos.h:146
TMarker m
Definition textangle.C:8