Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageStorageDaos.cxx
Go to the documentation of this file.
1/// \file RPageStorageDaos.cxx
2/// \ingroup NTuple ROOT7
3/// \author Javier Lopez-Gomez <j.lopez@cern.ch>
4/// \date 2020-11-03
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RCluster.hxx>
17#include <ROOT/RClusterPool.hxx>
18#include <ROOT/RField.hxx>
19#include <ROOT/RLogger.hxx>
21#include <ROOT/RNTupleModel.hxx>
24#include <ROOT/RNTupleUtil.hxx>
25#include <ROOT/RNTupleZip.hxx>
26#include <ROOT/RPage.hxx>
28#include <ROOT/RPagePool.hxx>
29#include <ROOT/RDaos.hxx>
31
32#include <RVersion.h>
33#include <TError.h>
34
35#include <algorithm>
36#include <cstdio>
37#include <cstdlib>
38#include <limits>
39#include <utility>
40#include <regex>
41#include <cassert>
42
43namespace {
47
/// \brief Strategies for mapping RNTuple pages onto DAOS object IDs and keys (see `GetPageDaosKey()`).
enum EDaosMapping {
   /// One object per cluster; the column ID is the distribution key and the page/cage index the attribute key.
   kOidPerCluster,
   /// One object per page/cage index; the default distribution and attribute keys are used.
   kOidPerPage
};
50
51struct RDaosKey {
52 daos_obj_id_t fOid;
53 DistributionKey_t fDkey;
54 AttributeKey_t fAkey;
55};
56
57/// \brief Pre-defined keys for object store. `kDistributionKeyDefault` is the distribution key for metadata and
58/// pagelist values; optionally it can be used for ntuple pages (if under the `kOidPerPage` mapping strategy).
59/// `kAttributeKeyDefault` is the attribute key for ntuple pages under `kOidPerPage`.
60/// `kAttributeKey{Anchor,Header,Footer}` are the respective attribute keys for anchor/header/footer metadata elements.
61static constexpr DistributionKey_t kDistributionKeyDefault = 0x5a3c69f0cafe4a11;
62static constexpr AttributeKey_t kAttributeKeyDefault = 0x4243544b53444229;
63static constexpr AttributeKey_t kAttributeKeyAnchor = 0x4243544b5344422a;
64static constexpr AttributeKey_t kAttributeKeyHeader = 0x4243544b5344422b;
65static constexpr AttributeKey_t kAttributeKeyFooter = 0x4243544b5344422c;
66
67/// \brief Pre-defined 64 LSb of the OIDs for ntuple metadata (holds anchor/header/footer) and clusters' pagelists.
68static constexpr decltype(daos_obj_id_t::lo) kOidLowMetadata = -1;
69static constexpr decltype(daos_obj_id_t::lo) kOidLowPageList = -2;
70
71static constexpr daos_oclass_id_t kCidMetadata = OC_SX;
72
73static constexpr EDaosMapping kDefaultDaosMapping = kOidPerCluster;
74
75template <EDaosMapping mapping>
76RDaosKey GetPageDaosKey(ROOT::Experimental::Internal::ntuple_index_t ntplId, long unsigned clusterId,
77 long unsigned columnId, long unsigned pageCount)
78{
79 if constexpr (mapping == kOidPerCluster) {
80 return RDaosKey{daos_obj_id_t{static_cast<decltype(daos_obj_id_t::lo)>(clusterId),
81 static_cast<decltype(daos_obj_id_t::hi)>(ntplId)},
82 static_cast<DistributionKey_t>(columnId), static_cast<AttributeKey_t>(pageCount)};
83 } else if constexpr (mapping == kOidPerPage) {
84 return RDaosKey{daos_obj_id_t{static_cast<decltype(daos_obj_id_t::lo)>(pageCount),
85 static_cast<decltype(daos_obj_id_t::hi)>(ntplId)},
86 kDistributionKeyDefault, kAttributeKeyDefault};
87 }
88}
89
/// \brief Components of a `daos://<pool>/<container>` URI, as produced by `ParseDaosURI()`.
struct RDaosURI {
   std::string fPoolLabel;      ///< Label of the DAOS pool
   std::string fContainerLabel; ///< Label of the container for this RNTuple
};
96
97/**
98 \brief Parse a DAOS RNTuple URI of the form 'daos://pool_id/container_id'.
99*/
100RDaosURI ParseDaosURI(std::string_view uri)
101{
102 std::regex re("daos://([^/]+)/(.+)");
103 std::cmatch m;
104 if (!std::regex_match(uri.data(), m, re))
105 throw ROOT::Experimental::RException(R__FAIL("Invalid DAOS pool URI."));
106 return {m[1], m[2]};
107}
108
109/// \brief Unpacks a 64-bit RNTuple page locator address for object stores into a pair of 32-bit values:
110/// the attribute key under which the cage is stored and the offset within that cage to access the page.
111std::pair<uint32_t, uint32_t> DecodeDaosPagePosition(const ROOT::Experimental::RNTupleLocatorObject64 &address)
112{
113 auto position = static_cast<uint32_t>(address.fLocation & 0xFFFFFFFF);
114 auto offset = static_cast<uint32_t>(address.fLocation >> 32);
115 return {position, offset};
116}
117
118/// \brief Packs an attribute key together with an offset within its contents into a single 64-bit address.
119/// The offset is kept in the MSb half and defaults to zero, which is the case when caging is disabled.
120ROOT::Experimental::RNTupleLocatorObject64 EncodeDaosPagePosition(uint64_t position, uint64_t offset = 0)
121{
122 uint64_t address = (position & 0xFFFFFFFF) | (offset << 32);
124}
125
126/// \brief Helper structure concentrating the functionality required to locate an ntuple within a DAOS container.
127/// It includes a hashing function that converts the RNTuple's name into a 32-bit identifier; this value is used to
128/// index the subspace for the ntuple among all objects in the container. A zero-value hash value is reserved for
129/// storing any future metadata related to container-wide management; a zero-index ntuple is thus disallowed and
130/// remapped to "1". Once the index is computed, `InitNTupleDescriptorBuilder()` can be called to return a
131/// partially-filled builder with the ntuple's anchor, header and footer, lacking only pagelists. Upon that call,
132/// a copy of the anchor is stored in `fAnchor`.
133struct RDaosContainerNTupleLocator {
134 std::string fName{};
135 ntuple_index_t fIndex{};
136 std::optional<ROOT::Experimental::Internal::RDaosNTupleAnchor> fAnchor;
137 static const ntuple_index_t kReservedIndex = 0;
138
139 RDaosContainerNTupleLocator() = default;
140 explicit RDaosContainerNTupleLocator(const std::string &ntupleName) : fName(ntupleName), fIndex(Hash(ntupleName)){};
141
142 bool IsValid() { return fAnchor.has_value() && fAnchor->fNBytesHeader; }
143 [[nodiscard]] ntuple_index_t GetIndex() const { return fIndex; };
144 static ntuple_index_t Hash(const std::string &ntupleName)
145 {
146 // Convert string to numeric representation via `std::hash`.
147 uint64_t h = std::hash<std::string>{}(ntupleName);
148 // Fold the hash into 32-bit using `boost::hash_combine()` algorithm and magic number.
149 auto seed = static_cast<uint32_t>(h >> 32);
150 seed ^= static_cast<uint32_t>(h & 0xffffffff) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
151 auto hash = static_cast<ntuple_index_t>(seed);
152 return (hash == kReservedIndex) ? kReservedIndex + 1 : hash;
153 }
154
155 int InitNTupleDescriptorBuilder(ROOT::Experimental::Internal::RDaosContainer &cont,
158 {
159 std::unique_ptr<unsigned char[]> buffer, zipBuffer;
160 auto &anchor = fAnchor.emplace();
161 int err;
162
164 daos_obj_id_t oidMetadata{kOidLowMetadata, static_cast<decltype(daos_obj_id_t::hi)>(this->GetIndex())};
165
166 buffer = std::make_unique<unsigned char[]>(anchorSize);
167 if ((err = cont.ReadSingleAkey(buffer.get(), anchorSize, oidMetadata, kDistributionKeyDefault,
168 kAttributeKeyAnchor, kCidMetadata))) {
169 return err;
170 }
171
172 anchor.Deserialize(buffer.get(), anchorSize).Unwrap();
173 if (anchor.fVersionEpoch != ROOT::Experimental::RNTuple::kVersionEpoch) {
175 R__FAIL("unsupported RNTuple epoch version: " + std::to_string(anchor.fVersionEpoch)));
176 }
177 if (anchor.fVersionEpoch == 0) {
178 static std::once_flag once;
179 std::call_once(once, [&anchor]() {
181 << "Pre-release format version: RC " << anchor.fVersionMajor;
182 });
183 }
184
185 builder.SetOnDiskHeaderSize(anchor.fNBytesHeader);
186 buffer = std::make_unique<unsigned char[]>(anchor.fLenHeader);
187 zipBuffer = std::make_unique<unsigned char[]>(anchor.fNBytesHeader);
188 if ((err = cont.ReadSingleAkey(zipBuffer.get(), anchor.fNBytesHeader, oidMetadata, kDistributionKeyDefault,
189 kAttributeKeyHeader, kCidMetadata)))
190 return err;
191 decompressor.Unzip(zipBuffer.get(), anchor.fNBytesHeader, anchor.fLenHeader, buffer.get());
192 ROOT::Experimental::Internal::RNTupleSerializer::DeserializeHeader(buffer.get(), anchor.fLenHeader, builder);
193
194 builder.AddToOnDiskFooterSize(anchor.fNBytesFooter);
195 buffer = std::make_unique<unsigned char[]>(anchor.fLenFooter);
196 zipBuffer = std::make_unique<unsigned char[]>(anchor.fNBytesFooter);
197 if ((err = cont.ReadSingleAkey(zipBuffer.get(), anchor.fNBytesFooter, oidMetadata, kDistributionKeyDefault,
198 kAttributeKeyFooter, kCidMetadata)))
199 return err;
200 decompressor.Unzip(zipBuffer.get(), anchor.fNBytesFooter, anchor.fLenFooter, buffer.get());
201 ROOT::Experimental::Internal::RNTupleSerializer::DeserializeFooter(buffer.get(), anchor.fLenFooter, builder);
202
203 return 0;
204 }
205
206 static std::pair<RDaosContainerNTupleLocator, ROOT::Experimental::Internal::RNTupleDescriptorBuilder>
207 LocateNTuple(ROOT::Experimental::Internal::RDaosContainer &cont, const std::string &ntupleName,
209 {
210 auto result = std::make_pair(RDaosContainerNTupleLocator(ntupleName),
212
213 auto &loc = result.first;
214 auto &builder = result.second;
215
216 if (int err = loc.InitNTupleDescriptorBuilder(cont, decompressor, builder); !err) {
217 if (ntupleName.empty() || ntupleName != builder.GetDescriptor().GetName()) {
218 // Hash already taken by a differently-named ntuple.
220 R__FAIL("LocateNTuple: ntuple name '" + ntupleName + "' unavailable in this container."));
221 }
222 }
223 return result;
224 }
225};
226
227} // anonymous namespace
228
229////////////////////////////////////////////////////////////////////////////////
230
232{
234 if (buffer != nullptr) {
235 auto bytes = reinterpret_cast<unsigned char *>(buffer);
246 }
247 return RNTupleSerializer::SerializeString(fObjClass, nullptr) + 32;
248}
249
251ROOT::Experimental::Internal::RDaosNTupleAnchor::Deserialize(const void *buffer, std::uint32_t bufSize)
252{
253 if (bufSize < 32)
254 return R__FAIL("DAOS anchor too short");
255
257 auto bytes = reinterpret_cast<const unsigned char *>(buffer);
259 if (fVersionAnchor != RDaosNTupleAnchor().fVersionAnchor) {
260 return R__FAIL("unsupported DAOS anchor version: " + std::to_string(fVersionAnchor));
261 }
262
271 auto result = RNTupleSerializer::DeserializeString(bytes, bufSize - 32, fObjClass);
272 if (!result)
273 return R__FORWARD_ERROR(result);
274 return result.Unwrap() + 32;
275}
276
278{
280}
281
282////////////////////////////////////////////////////////////////////////////////
283
284ROOT::Experimental::Internal::RPageSinkDaos::RPageSinkDaos(std::string_view ntupleName, std::string_view uri,
285 const RNTupleWriteOptions &options)
286 : RPagePersistentSink(ntupleName, options),
287 fPageAllocator(std::make_unique<Internal::RPageAllocatorHeap>()),
288 fURI(uri)
289{
290 static std::once_flag once;
291 std::call_once(once, []() {
292 R__LOG_WARNING(NTupleLog()) << "The DAOS backend is experimental and still under development. "
293 << "Do not store real data with this version of RNTuple!";
294 });
295 fCompressor = std::make_unique<RNTupleCompressor>();
296 EnableDefaultMetrics("RPageSinkDaos");
297}
298
300
301void ROOT::Experimental::Internal::RPageSinkDaos::InitImpl(unsigned char *serializedHeader, std::uint32_t length)
302{
303 auto opts = dynamic_cast<RNTupleWriteOptionsDaos *>(fOptions.get());
304 fNTupleAnchor.fObjClass = opts ? opts->GetObjectClass() : RNTupleWriteOptionsDaos().GetObjectClass();
305 auto oclass = RDaosObject::ObjClassId(fNTupleAnchor.fObjClass);
306 if (oclass.IsUnknown())
307 throw ROOT::Experimental::RException(R__FAIL("Unknown object class " + fNTupleAnchor.fObjClass));
308
309 size_t cageSz = opts ? opts->GetMaxCageSize() : RNTupleWriteOptionsDaos().GetMaxCageSize();
310 size_t pageSz = opts ? opts->GetApproxUnzippedPageSize() : RNTupleWriteOptionsDaos().GetApproxUnzippedPageSize();
311 fCageSizeLimit = std::max(cageSz, pageSz);
312
313 auto args = ParseDaosURI(fURI);
314 auto pool = std::make_shared<RDaosPool>(args.fPoolLabel);
315
316 fDaosContainer = std::make_unique<RDaosContainer>(pool, args.fContainerLabel, /*create =*/true);
317 fDaosContainer->SetDefaultObjectClass(oclass);
318
319 RNTupleDecompressor decompressor;
320 auto [locator, _] = RDaosContainerNTupleLocator::LocateNTuple(*fDaosContainer, fNTupleName, decompressor);
321 fNTupleIndex = locator.GetIndex();
322
323 auto zipBuffer = std::make_unique<unsigned char[]>(length);
324 auto szZipHeader = fCompressor->Zip(serializedHeader, length, GetWriteOptions().GetCompression(),
325 RNTupleCompressor::MakeMemCopyWriter(zipBuffer.get()));
326 WriteNTupleHeader(zipBuffer.get(), szZipHeader, length);
327}
328
331{
332 auto element = columnHandle.fColumn->GetElement();
333 RPageStorage::RSealedPage sealedPage;
334 {
335 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
336 sealedPage = SealPage(page, *element, GetWriteOptions().GetCompression());
337 }
338
339 fCounters->fSzZip.Add(page.GetNBytes());
340 return CommitSealedPageImpl(columnHandle.fPhysicalId, sealedPage);
341}
342
345 const RPageStorage::RSealedPage &sealedPage)
346{
347 auto offsetData = fPageId.fetch_add(1);
348 DescriptorId_t clusterId = fDescriptorBuilder.GetDescriptor().GetNActiveClusters();
349
350 {
351 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
352 RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, clusterId, physicalColumnId, offsetData);
353 fDaosContainer->WriteSingleAkey(sealedPage.fBuffer, sealedPage.fSize, daosKey.fOid, daosKey.fDkey, daosKey.fAkey);
354 }
355
357 result.fPosition = EncodeDaosPagePosition(offsetData);
358 result.fBytesOnStorage = sealedPage.fSize;
360 fCounters->fNPageCommitted.Inc();
361 fCounters->fSzWritePayload.Add(sealedPage.fSize);
362 fNBytesCurrentCluster += sealedPage.fSize;
363 return result;
364}
365
366std::vector<ROOT::Experimental::RNTupleLocator>
367ROOT::Experimental::Internal::RPageSinkDaos::CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges)
368{
370 std::vector<ROOT::Experimental::RNTupleLocator> locators;
371 int64_t nPages =
372 std::accumulate(ranges.begin(), ranges.end(), 0, [](int64_t c, const RPageStorage::RSealedPageGroup &r) {
373 return c + std::distance(r.fFirst, r.fLast);
374 });
375 locators.reserve(nPages);
376
377 const uint32_t maxCageSz = fCageSizeLimit;
378 const bool useCaging = fCageSizeLimit > 0;
379 const std::uint8_t locatorFlags = useCaging ? EDaosLocatorFlags::kCagedPage : 0;
380
381 DescriptorId_t clusterId = fDescriptorBuilder.GetDescriptor().GetNActiveClusters();
382 int64_t payloadSz = 0;
383 std::size_t positionOffset;
384 uint32_t positionIndex;
385
386 /// Aggregate batch of requests by object ID and distribution key, determined by the ntuple-DAOS mapping
387 for (auto &range : ranges) {
388 positionOffset = 0;
389 /// Under caging, the atomic page counter is fetch-incremented for every column range to get the position of its
390 /// first cage and indicate the next one, also ensuring subsequent pages of different columns do not end up caged
391 /// together. This increment is not necessary in the absence of caging, as each page is trivially caged.
392 positionIndex = useCaging ? fPageId.fetch_add(1) : fPageId.load();
393
394 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
395
396 const RPageStorage::RSealedPage &s = *sealedPageIt;
397
398 if (positionOffset + s.fSize > maxCageSz) {
399 positionOffset = 0;
400 positionIndex = fPageId.fetch_add(1);
401 }
402
403 d_iov_t pageIov;
404 d_iov_set(&pageIov, const_cast<void *>(s.fBuffer), s.fSize);
405
406 RDaosKey daosKey =
407 GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, clusterId, range.fPhysicalColumnId, positionIndex);
408 auto odPair = RDaosContainer::ROidDkeyPair{daosKey.fOid, daosKey.fDkey};
409 auto [it, ret] = writeRequests.emplace(odPair, RDaosContainer::RWOperation(odPair));
410 it->second.Insert(daosKey.fAkey, pageIov);
411
412 RNTupleLocator locator;
413 locator.fPosition = EncodeDaosPagePosition(positionIndex, positionOffset);
414 locator.fBytesOnStorage = s.fSize;
416 locator.fReserved = locatorFlags;
417 locators.push_back(locator);
418
419 positionOffset += s.fSize;
420 payloadSz += s.fSize;
421 }
422 }
423 fNBytesCurrentCluster += payloadSz;
424
425 {
426 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
427 if (int err = fDaosContainer->WriteV(writeRequests))
428 throw ROOT::Experimental::RException(R__FAIL("WriteV: error" + std::string(d_errstr(err))));
429 }
430
431 fCounters->fNPageCommitted.Add(nPages);
432 fCounters->fSzWritePayload.Add(payloadSz);
433
434 return locators;
435}
436
438{
439 return std::exchange(fNBytesCurrentCluster, 0);
440}
441
444 std::uint32_t length)
445{
446 auto bufPageListZip = std::make_unique<unsigned char[]>(length);
447 auto szPageListZip = fCompressor->Zip(serializedPageList, length, GetWriteOptions().GetCompression(),
448 RNTupleCompressor::MakeMemCopyWriter(bufPageListZip.get()));
449
450 auto offsetData = fClusterGroupId.fetch_add(1);
451 fDaosContainer->WriteSingleAkey(
452 bufPageListZip.get(), szPageListZip,
453 daos_obj_id_t{kOidLowPageList, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)}, kDistributionKeyDefault,
454 offsetData, kCidMetadata);
456 result.fPosition = RNTupleLocatorObject64{offsetData};
457 result.fBytesOnStorage = szPageListZip;
459 fCounters->fSzWritePayload.Add(static_cast<int64_t>(szPageListZip));
460 return result;
461}
462
464 std::uint32_t length)
465{
466 auto bufFooterZip = std::make_unique<unsigned char[]>(length);
467 auto szFooterZip = fCompressor->Zip(serializedFooter, length, GetWriteOptions().GetCompression(),
468 RNTupleCompressor::MakeMemCopyWriter(bufFooterZip.get()));
469 WriteNTupleFooter(bufFooterZip.get(), szFooterZip, length);
470 WriteNTupleAnchor();
471}
472
473void ROOT::Experimental::Internal::RPageSinkDaos::WriteNTupleHeader(const void *data, size_t nbytes, size_t lenHeader)
474{
475 fDaosContainer->WriteSingleAkey(
476 data, nbytes, daos_obj_id_t{kOidLowMetadata, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)},
477 kDistributionKeyDefault, kAttributeKeyHeader, kCidMetadata);
478 fNTupleAnchor.fLenHeader = lenHeader;
479 fNTupleAnchor.fNBytesHeader = nbytes;
480}
481
482void ROOT::Experimental::Internal::RPageSinkDaos::WriteNTupleFooter(const void *data, size_t nbytes, size_t lenFooter)
483{
484 fDaosContainer->WriteSingleAkey(
485 data, nbytes, daos_obj_id_t{kOidLowMetadata, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)},
486 kDistributionKeyDefault, kAttributeKeyFooter, kCidMetadata);
487 fNTupleAnchor.fLenFooter = lenFooter;
488 fNTupleAnchor.fNBytesFooter = nbytes;
489}
490
492{
493 const auto ntplSize = RDaosNTupleAnchor::GetSize();
494 auto buffer = std::make_unique<unsigned char[]>(ntplSize);
495 fNTupleAnchor.Serialize(buffer.get());
496 fDaosContainer->WriteSingleAkey(
497 buffer.get(), ntplSize, daos_obj_id_t{kOidLowMetadata, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)},
498 kDistributionKeyDefault, kAttributeKeyAnchor, kCidMetadata);
499}
500
503{
504 if (nElements == 0)
505 throw RException(R__FAIL("invalid call: request empty page"));
506 auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
507 return fPageAllocator->NewPage(columnHandle.fPhysicalId, elementSize, nElements);
508}
509
511{
512 fPageAllocator->DeletePage(page);
513}
514
515////////////////////////////////////////////////////////////////////////////////
516
517ROOT::Experimental::Internal::RPageSourceDaos::RPageSourceDaos(std::string_view ntupleName, std::string_view uri,
518 const RNTupleReadOptions &options)
519 : RPageSource(ntupleName, options),
520 fPagePool(std::make_shared<RPagePool>()),
521 fURI(uri),
522 fClusterPool(std::make_unique<RClusterPool>(*this, options.GetClusterBunchSize()))
523{
524 fDecompressor = std::make_unique<RNTupleDecompressor>();
525 EnableDefaultMetrics("RPageSourceDaos");
526
527 auto args = ParseDaosURI(uri);
528 auto pool = std::make_shared<RDaosPool>(args.fPoolLabel);
529 fDaosContainer = std::make_unique<RDaosContainer>(pool, args.fContainerLabel);
530}
531
533
535{
537 std::unique_ptr<unsigned char[]> buffer, zipBuffer;
538
539 auto [locator, descBuilder] =
540 RDaosContainerNTupleLocator::LocateNTuple(*fDaosContainer, fNTupleName, *fDecompressor);
541 if (!locator.IsValid())
543 R__FAIL("Attach: requested ntuple '" + fNTupleName + "' is not present in DAOS container."));
544
545 auto oclass = RDaosObject::ObjClassId(locator.fAnchor->fObjClass);
546 if (oclass.IsUnknown())
547 throw ROOT::Experimental::RException(R__FAIL("Attach: unknown object class " + locator.fAnchor->fObjClass));
548
549 fDaosContainer->SetDefaultObjectClass(oclass);
550 fNTupleIndex = locator.GetIndex();
551 daos_obj_id_t oidPageList{kOidLowPageList, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)};
552
553 auto desc = descBuilder.MoveDescriptor();
554
555 for (const auto &cgDesc : desc.GetClusterGroupIterable()) {
556 buffer = std::make_unique<unsigned char[]>(cgDesc.GetPageListLength());
557 zipBuffer = std::make_unique<unsigned char[]>(cgDesc.GetPageListLocator().fBytesOnStorage);
558 fDaosContainer->ReadSingleAkey(
559 zipBuffer.get(), cgDesc.GetPageListLocator().fBytesOnStorage, oidPageList, kDistributionKeyDefault,
560 cgDesc.GetPageListLocator().GetPosition<RNTupleLocatorObject64>().fLocation, kCidMetadata);
561 fDecompressor->Unzip(zipBuffer.get(), cgDesc.GetPageListLocator().fBytesOnStorage, cgDesc.GetPageListLength(),
562 buffer.get());
563
564 RNTupleSerializer::DeserializePageList(buffer.get(), cgDesc.GetPageListLength(), cgDesc.GetId(), desc);
565 }
566
567 return desc;
568}
569
571{
572 return fDaosContainer->GetDefaultObjectClass().ToString();
573}
574
576 RClusterIndex clusterIndex, RSealedPage &sealedPage)
577{
578 const auto clusterId = clusterIndex.GetClusterId();
579
581 {
582 auto descriptorGuard = GetSharedDescriptorGuard();
583 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
584 pageInfo = clusterDescriptor.GetPageRange(physicalColumnId).Find(clusterIndex.GetIndex());
585 }
586
589 R__FAIL("accessing caged pages is only supported in conjunction with cluster cache"));
590 }
591
592 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
593 sealedPage.fSize = bytesOnStorage;
594 sealedPage.fNElements = pageInfo.fNElements;
595 if (!sealedPage.fBuffer)
596 return;
598 RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(
599 fNTupleIndex, clusterId, physicalColumnId, pageInfo.fLocator.GetPosition<RNTupleLocatorObject64>().fLocation);
600 fDaosContainer->ReadSingleAkey(const_cast<void *>(sealedPage.fBuffer), bytesOnStorage, daosKey.fOid,
601 daosKey.fDkey, daosKey.fAkey);
602 } else {
603 memcpy(const_cast<void *>(sealedPage.fBuffer), RPage::GetPageZeroBuffer(), bytesOnStorage);
604 }
605}
606
609 const RClusterInfo &clusterInfo,
610 ClusterSize_t::ValueType idxInCluster)
611{
612 const auto columnId = columnHandle.fPhysicalId;
613 const auto clusterId = clusterInfo.fClusterId;
614 const auto &pageInfo = clusterInfo.fPageInfo;
615
616 const auto element = columnHandle.fColumn->GetElement();
617 const auto elementSize = element->GetSize();
618 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
619
620 const void *sealedPageBuffer = nullptr; // points either to directReadBuffer or to a read-only page in the cluster
621 std::unique_ptr<unsigned char[]> directReadBuffer; // only used if cluster pool is turned off
622
623 if (pageInfo.fLocator.fType == RNTupleLocator::kTypePageZero) {
624 auto pageZero = RPage::MakePageZero(columnId, elementSize);
625 pageZero.GrowUnchecked(pageInfo.fNElements);
626 pageZero.SetWindow(clusterInfo.fColumnOffset + pageInfo.fFirstInPage,
627 RPage::RClusterInfo(clusterId, clusterInfo.fColumnOffset));
628 fPagePool->RegisterPage(pageZero, RPageDeleter([](const RPage &, void *) {}, nullptr));
629 return pageZero;
630 }
631
632 if (fOptions.GetClusterCache() == RNTupleReadOptions::EClusterCache::kOff) {
633 if (pageInfo.fLocator.fReserved & EDaosLocatorFlags::kCagedPage) {
635 R__FAIL("accessing caged pages is only supported in conjunction with cluster cache"));
636 }
637
638 directReadBuffer = std::unique_ptr<unsigned char[]>(new unsigned char[bytesOnStorage]);
639 RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(
640 fNTupleIndex, clusterId, columnId, pageInfo.fLocator.GetPosition<RNTupleLocatorObject64>().fLocation);
641 fDaosContainer->ReadSingleAkey(directReadBuffer.get(), bytesOnStorage, daosKey.fOid, daosKey.fDkey,
642 daosKey.fAkey);
643 fCounters->fNPageLoaded.Inc();
644 fCounters->fNRead.Inc();
645 fCounters->fSzReadPayload.Add(bytesOnStorage);
646 sealedPageBuffer = directReadBuffer.get();
647 } else {
648 if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
649 fCurrentCluster = fClusterPool->GetCluster(clusterId, fActivePhysicalColumns.ToColumnSet());
650 R__ASSERT(fCurrentCluster->ContainsColumn(columnId));
651
652 auto cachedPage = fPagePool->GetPage(columnId, RClusterIndex(clusterId, idxInCluster));
653 if (!cachedPage.IsNull())
654 return cachedPage;
655
656 ROnDiskPage::Key key(columnId, pageInfo.fPageNo);
657 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
658 R__ASSERT(onDiskPage && (bytesOnStorage == onDiskPage->GetSize()));
659 sealedPageBuffer = onDiskPage->GetAddress();
660 }
661
662 RPage newPage;
663 {
664 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
665 newPage = UnsealPage({sealedPageBuffer, bytesOnStorage, pageInfo.fNElements}, *element, columnId);
666 fCounters->fSzUnzip.Add(elementSize * pageInfo.fNElements);
667 }
668
669 newPage.SetWindow(clusterInfo.fColumnOffset + pageInfo.fFirstInPage,
670 RPage::RClusterInfo(clusterId, clusterInfo.fColumnOffset));
671 fPagePool->RegisterPage(
672 newPage, RPageDeleter([](const RPage &page, void *) { RPageAllocatorHeap::DeletePage(page); }, nullptr));
673 fCounters->fNPagePopulated.Inc();
674 return newPage;
675}
676
679{
680 const auto columnId = columnHandle.fPhysicalId;
681 auto cachedPage = fPagePool->GetPage(columnId, globalIndex);
682 if (!cachedPage.IsNull())
683 return cachedPage;
684
685 std::uint64_t idxInCluster;
686 RClusterInfo clusterInfo;
687 {
688 auto descriptorGuard = GetSharedDescriptorGuard();
689 clusterInfo.fClusterId = descriptorGuard->FindClusterId(columnId, globalIndex);
690
691 if (clusterInfo.fClusterId == kInvalidDescriptorId)
692 throw RException(R__FAIL("entry with index " + std::to_string(globalIndex) + " out of bounds"));
693
694 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterInfo.fClusterId);
695 clusterInfo.fColumnOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
696 R__ASSERT(clusterInfo.fColumnOffset <= globalIndex);
697 idxInCluster = globalIndex - clusterInfo.fColumnOffset;
698 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
699 }
700 return PopulatePageFromCluster(columnHandle, clusterInfo, idxInCluster);
701}
702
705{
706 const auto clusterId = clusterIndex.GetClusterId();
707 const auto idxInCluster = clusterIndex.GetIndex();
708 const auto columnId = columnHandle.fPhysicalId;
709 auto cachedPage = fPagePool->GetPage(columnId, clusterIndex);
710 if (!cachedPage.IsNull())
711 return cachedPage;
712
713 if (clusterId == kInvalidDescriptorId)
714 throw RException(R__FAIL("entry out of bounds"));
715
716 RClusterInfo clusterInfo;
717 {
718 auto descriptorGuard = GetSharedDescriptorGuard();
719 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
720 clusterInfo.fClusterId = clusterId;
721 clusterInfo.fColumnOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
722 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
723 }
724
725 return PopulatePageFromCluster(columnHandle, clusterInfo, idxInCluster);
726}
727
729{
730 fPagePool->ReturnPage(page);
731}
732
733std::unique_ptr<ROOT::Experimental::Internal::RPageSource> ROOT::Experimental::Internal::RPageSourceDaos::Clone() const
734{
735 auto clone = new RPageSourceDaos(fNTupleName, fURI, fOptions);
736 return std::unique_ptr<RPageSourceDaos>(clone);
737}
738
739std::vector<std::unique_ptr<ROOT::Experimental::Internal::RCluster>>
741{
742 struct RDaosSealedPageLocator {
743 DescriptorId_t fClusterId = 0;
744 DescriptorId_t fColumnId = 0;
745 NTupleSize_t fPageNo = 0;
746 std::uint64_t fPosition = 0;
747 std::uint64_t fCageOffset = 0;
748 std::uint64_t fSize = 0;
749 };
750
751 // Prepares read requests for a single cluster; `readRequests` is modified by this function. Requests are coalesced
752 // by OID and distribution key.
753 // TODO(jalopezg): this may be a private member function; that, however, requires additional changes given that
754 // `RDaosContainer::MultiObjectRWOperation_t` cannot be forward-declared
755 auto fnPrepareSingleCluster = [&](const RCluster::RKey &clusterKey,
757 auto clusterId = clusterKey.fClusterId;
758 // Group page locators by their position in the object store; with caging enabled, this facilitates the
759 // processing of cages' requests together into a single IOV to be populated.
760 std::unordered_map<std::uint32_t, std::vector<RDaosSealedPageLocator>> onDiskPages;
761
762 unsigned clusterBufSz = 0, nPages = 0;
763 auto pageZeroMap = std::make_unique<ROnDiskPageMap>();
764 PrepareLoadCluster(clusterKey, *pageZeroMap,
765 [&](DescriptorId_t physicalColumnId, NTupleSize_t pageNo,
767 const auto &pageLocator = pageInfo.fLocator;
768 uint32_t position, offset;
769 std::tie(position, offset) =
770 DecodeDaosPagePosition(pageLocator.GetPosition<RNTupleLocatorObject64>());
771 auto [itLoc, _] = onDiskPages.emplace(position, std::vector<RDaosSealedPageLocator>());
772
773 itLoc->second.push_back(
774 {clusterId, physicalColumnId, pageNo, position, offset, pageLocator.fBytesOnStorage});
775 ++nPages;
776 clusterBufSz += pageLocator.fBytesOnStorage;
777 });
778
779 auto clusterBuffer = new unsigned char[clusterBufSz];
780 auto pageMap = std::make_unique<ROnDiskPageMapHeap>(std::unique_ptr<unsigned char[]>(clusterBuffer));
781
782 auto cageBuffer = clusterBuffer;
783 // Fill the cluster page map and the read requests for the RDaosContainer::ReadV() call
784 for (auto &[cageIndex, pageVec] : onDiskPages) {
785 auto columnId = pageVec[0].fColumnId; // All pages in a cage belong to the same column
786 std::size_t cageSz = 0;
787
788 for (auto &s : pageVec) {
789 assert(columnId == s.fColumnId);
790 assert(cageIndex == s.fPosition);
791 // Register the on disk pages in a page map
792 ROnDiskPage::Key key(s.fColumnId, s.fPageNo);
793 pageMap->Register(key, ROnDiskPage(cageBuffer + s.fCageOffset, s.fSize));
794 cageSz += s.fSize;
795 }
796
797 // Prepare new read request batched up by object ID and distribution key
798 d_iov_t iov;
799 d_iov_set(&iov, cageBuffer, cageSz);
800
801 RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, clusterId, columnId, cageIndex);
802 auto odPair = RDaosContainer::ROidDkeyPair{daosKey.fOid, daosKey.fDkey};
803 auto [itReq, ret] = readRequests.emplace(odPair, RDaosContainer::RWOperation(odPair));
804 itReq->second.Insert(daosKey.fAkey, iov);
805
806 cageBuffer += cageSz;
807 }
808 fCounters->fNPageLoaded.Add(nPages);
809 fCounters->fSzReadPayload.Add(clusterBufSz);
810
811 auto cluster = std::make_unique<RCluster>(clusterId);
812 cluster->Adopt(std::move(pageMap));
813 cluster->Adopt(std::move(pageZeroMap));
814 for (auto colId : clusterKey.fPhysicalColumnSet)
815 cluster->SetColumnAvailable(colId);
816 return cluster;
817 };
818
819 fCounters->fNClusterLoaded.Add(clusterKeys.size());
820
821 std::vector<std::unique_ptr<ROOT::Experimental::Internal::RCluster>> clusters;
823 for (auto key : clusterKeys) {
824 clusters.emplace_back(fnPrepareSingleCluster(key, readRequests));
825 }
826
827 {
828 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
829 if (int err = fDaosContainer->ReadV(readRequests))
830 throw ROOT::Experimental::RException(R__FAIL("ReadV: error" + std::string(d_errstr(err))));
831 }
832 fCounters->fNReadV.Inc();
833 fCounters->fNRead.Add(readRequests.size());
834
835 return clusters;
836}
837
839{
840 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
841
842 const auto clusterId = cluster->GetId();
843 auto descriptorGuard = GetSharedDescriptorGuard();
844 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
845
846 std::vector<std::unique_ptr<RColumnElementBase>> allElements;
847
848 const auto &columnsInCluster = cluster->GetAvailPhysicalColumns();
849 for (const auto columnId : columnsInCluster) {
850 const auto &columnDesc = descriptorGuard->GetColumnDescriptor(columnId);
851
852 allElements.emplace_back(RColumnElementBase::Generate(columnDesc.GetModel().GetType()));
853
854 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
855 std::uint64_t pageNo = 0;
856 std::uint64_t firstInPage = 0;
857 for (const auto &pi : pageRange.fPageInfos) {
858 ROnDiskPage::Key key(columnId, pageNo);
859 auto onDiskPage = cluster->GetOnDiskPage(key);
860 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == pi.fLocator.fBytesOnStorage));
861
862 auto taskFunc = [this, columnId, clusterId, firstInPage, onDiskPage, element = allElements.back().get(),
863 nElements = pi.fNElements,
864 indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex]() {
865 auto newPage = UnsealPage({onDiskPage->GetAddress(), onDiskPage->GetSize(), nElements}, *element, columnId);
866 fCounters->fSzUnzip.Add(element->GetSize() * nElements);
867
868 newPage.SetWindow(indexOffset + firstInPage, RPage::RClusterInfo(clusterId, indexOffset));
869 fPagePool->PreloadPage(
870 newPage, RPageDeleter([](const RPage &page, void *) { RPageAllocatorHeap::DeletePage(page); }, nullptr));
871 };
872
873 fTaskScheduler->AddTask(taskFunc);
874
875 firstInPage += pi.fNElements;
876 pageNo++;
877 } // for all pages in column
878 } // for all columns in cluster
879
880 fCounters->fNPagePopulated.Add(cluster->GetNOnDiskPages());
881
882 fTaskScheduler->Wait();
883}
dim_t fSize
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking)
Definition RError.hxx:294
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:290
#define R__LOG_WARNING(...)
Definition RLogger.hxx:363
#define c(i)
Definition RSha256.hxx:101
#define h(i)
Definition RSha256.hxx:106
TObject * clone(const char *newname) const override
Definition RooChi2Var.h:9
#define R__ASSERT(e)
Definition TError.h:118
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void data
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t bytes
UInt_t Hash(const TString &s)
Definition TString.h:494
#define _(A, B)
Definition cfortran.h:108
Record wall time and CPU time between construction and destruction.
Manages a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:152
const ColumnSet_t & GetAvailPhysicalColumns() const
Definition RCluster.hxx:196
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
Definition RCluster.cxx:32
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
If CppT == void, use the default C++ type for the given column type.
RColumnElementBase * GetElement() const
Definition RColumn.hxx:325
A RDaosContainer provides read/write access to objects in a given container.
Definition RDaos.hxx:157
RDaosObject::DistributionKey_t DistributionKey_t
Definition RDaos.hxx:160
std::unordered_map< ROidDkeyPair, RWOperation, ROidDkeyPair::Hash > MultiObjectRWOperation_t
Definition RDaos.hxx:231
int ReadSingleAkey(void *buffer, std::size_t length, daos_obj_id_t oid, DistributionKey_t dkey, AttributeKey_t akey, ObjClassId_t cid)
Read data from a single object attribute key to the given buffer.
Definition RDaos.cxx:211
RDaosObject::AttributeKey_t AttributeKey_t
Definition RDaos.hxx:161
static Writer_t MakeMemCopyWriter(unsigned char *dest)
Helper class to uncompress data blocks in the ROOT compression frame format.
void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size of the `from` buffer.
A helper class for piece-wise construction of an RNTupleDescriptor.
void AddToOnDiskFooterSize(std::uint64_t size)
The real footer size also includes the page list envelopes.
A helper class for serializing and deserialization of the RNTuple binary format.
static std::uint32_t DeserializeUInt16(const void *buffer, std::uint16_t &val)
static RResult< void > DeserializeHeader(const void *buffer, std::uint64_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static RResult< void > DeserializeFooter(const void *buffer, std::uint64_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static std::uint32_t SerializeString(const std::string &val, void *buffer)
static std::uint32_t DeserializeUInt32(const void *buffer, std::uint32_t &val)
static std::uint32_t SerializeUInt64(std::uint64_t val, void *buffer)
static RResult< void > DeserializePageList(const void *buffer, std::uint64_t bufSize, DescriptorId_t clusterGroupId, RNTupleDescriptor &desc)
static std::uint32_t DeserializeUInt64(const void *buffer, std::uint64_t &val)
static RResult< std::uint32_t > DeserializeString(const void *buffer, std::uint64_t bufSize, std::string &val)
static std::uint32_t SerializeUInt16(std::uint16_t val, void *buffer)
static std::uint32_t SerializeUInt32(std::uint32_t val, void *buffer)
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:42
Uses standard C++ memory allocation for the column data pages.
static void DeletePage(const RPage &page)
Releases the memory pointed to by page and resets the page's information.
A closure that can free the memory associated with a mapped page.
Base class for a sink with a physical storage backend.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
A thread-safe cache of column pages.
Definition RPagePool.hxx:47
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
RPageSinkDaos(std::string_view ntupleName, std::string_view uri, const RNTupleWriteOptions &options)
std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges) final
Vector commit of preprocessed pages.
void WriteNTupleFooter(const void *data, size_t nbytes, size_t lenFooter)
void WriteNTupleHeader(const void *data, size_t nbytes, size_t lenHeader)
void InitImpl(unsigned char *serializedHeader, std::uint32_t length) final
void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) final
std::uint64_t CommitClusterImpl() final
Returns the number of bytes written to storage (excluding metadata)
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final
RNTupleLocator CommitSealedPageImpl(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
std::unique_ptr< RNTupleCompressor > fCompressor
Helper to zip pages and header/footer; includes a 16MB (kMAXZIPBUF) zip buffer.
Storage provider that reads ntuple pages from a DAOS container.
std::string GetObjectClass() const
Return the object class used for user data OIDs in this ntuple.
std::vector< std::unique_ptr< RCluster > > LoadClusters(std::span< RCluster::RKey > clusterKeys) final
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
RPageSourceDaos(std::string_view ntupleName, std::string_view uri, const RNTupleReadOptions &options)
RPage PopulatePageFromCluster(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo, ClusterSize_t::ValueType idxInCluster)
void LoadSealedPage(DescriptorId_t physicalColumnId, RClusterIndex clusterIndex, RSealedPage &sealedPage) final
Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage.
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) final
Allocates and fills a page that contains the index-th element.
std::unique_ptr< RPageSource > Clone() const final
The cloned page source creates a new connection to the pool/container.
std::unique_ptr< RDaosContainer > fDaosContainer
A container that stores object data (header/footer, pages, etc.)
Abstract interface to read data from an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
std::unique_ptr< RNTupleDecompressor > fDecompressor
Helper to unzip pages and header/footer; comprises a 16MB (kMAXZIPBUF) unzip buffer.
Stores information about the cluster in which this page resides.
Definition RPage.hxx:48
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:41
static RPage MakePageZero(ColumnId_t columnId, ClusterSize_t::ValueType elementSize)
Make a 'zero' page for column columnId (that is comprised of 0x00 bytes only).
Definition RPage.hxx:134
std::uint32_t GetNBytes() const
The space taken by column elements in the buffer.
Definition RPage.hxx:83
void SetWindow(const NTupleSize_t rangeFirst, const RClusterInfo &clusterInfo)
Seek the page to a certain position of the column.
Definition RPage.hxx:117
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Definition RPage.cxx:18
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
DescriptorId_t GetClusterId() const
ClusterSize_t::ValueType GetIndex() const
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
The on-storage meta-data of an ntuple.
Common user-tunable settings for reading ntuples.
DAOS-specific user-tunable settings for storing ntuples.
Common user-tunable settings for storing ntuples.
static constexpr std::uint16_t kVersionEpoch
Definition RNTuple.hxx:67
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition RError.hxx:194
const char * d_errstr(int rc)
static void d_iov_set(d_iov_t *iov, void *buf, size_t size)
Definition daos.h:50
@ OC_SX
Definition daos.h:129
uint16_t daos_oclass_id_t
Definition daos.h:135
RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr DescriptorId_t kInvalidDescriptorId
The identifiers that specify the content of a (partial) cluster.
Definition RCluster.hxx:156
A pair of <object ID, distribution key> that can be used to issue a fetch/update request for multiple...
Definition RDaos.hxx:166
Describes a read/write operation on multiple attribute keys under the same object ID and distribution...
Definition RDaos.hxx:190
Entry point for an RNTuple in a DAOS container.
std::uint32_t fNBytesFooter
The size of the compressed ntuple footer.
RResult< std::uint32_t > Deserialize(const void *buffer, std::uint32_t bufSize)
std::uint64_t fVersionAnchor
Allows for evolving the struct in future versions.
std::string fObjClass
The object class for user data OIDs, e.g. SX
std::uint16_t fVersionEpoch
Version of the binary format supported by the writer.
std::uint32_t fLenHeader
The size of the uncompressed ntuple header.
std::uint32_t fLenFooter
The size of the uncompressed ntuple footer.
std::uint32_t fNBytesHeader
The size of the compressed ntuple header.
static constexpr std::size_t kOCNameMaxLength
This limit is currently not defined in any header and any call to daos_oclass_id2name() within DAOS u...
Definition RDaos.hxx:108
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:52
Summarizes cluster-level information that is necessary to populate a certain page.
RClusterDescriptor::RPageRange::RPageInfoExtended fPageInfo
Location of the page on disk.
std::uint64_t fColumnOffset
The first element number of the page's column in the given cluster.
A range of sealed pages referring to the same column that can be used for vector commit.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
We do not need to store the element size / uncompressed page size because we know to which column the...
std::uint32_t fNElements
The sum of the elements of all the pages must match the corresponding fNElements field in fColumnRang...
RNTupleLocator fLocator
The meaning of fLocator depends on the storage backend.
RNTupleLocator payload that is common for object stores using 64bit location information.
Generic information about the physical location of data.
std::uint8_t fReserved
Reserved for use by concrete storage backends.
ELocatorType fType
For non-disk locators, the value for the Type field.
std::variant< std::uint64_t, std::string, RNTupleLocatorObject64 > fPosition
Simple on-disk locators consisting of a 64-bit offset use variant type uint64_t; extended locators ha...
iovec for memory buffer
Definition daos.h:37
uint64_t hi
Definition daos.h:147
uint64_t lo
Definition daos.h:146
TMarker m
Definition textangle.C:8