Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleMerger.cxx
Go to the documentation of this file.
1/// \file RNTupleMerger.cxx
2/// \ingroup NTuple
3/// \author Jakob Blomer <jblomer@cern.ch>, Max Orok <maxwellorok@gmail.com>, Alaettin Serhan Mete <amete@anl.gov>,
4/// Giacomo Parolini <giacomo.parolini@cern.ch>
5/// \date 2020-07-08
6/// \warning This is part of the ROOT 7 prototype! It will
7/// change without notice. It might trigger earthquakes. Feedback is welcome!
8
9/*************************************************************************
10 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
11 * All rights reserved. *
12 * *
13 * For the licensing terms see $ROOTSYS/LICENSE. *
14 * For the list of contributors see $ROOTSYS/README/CREDITS. *
15 *************************************************************************/
16
17#include <ROOT/RError.hxx>
18#include <ROOT/RNTuple.hxx>
21#include <ROOT/RNTupleModel.hxx>
22#include <ROOT/RNTupleTypes.hxx>
23#include <ROOT/RNTupleUtils.hxx>
26#include <ROOT/RPageStorage.hxx>
27#include <ROOT/RClusterPool.hxx>
29#include <ROOT/RNTupleZip.hxx>
31#include <TROOT.h>
32#include <TFileMergeInfo.h>
33#include <TFile.h>
34#include <TKey.h>
35
36#include <algorithm>
37#include <deque>
38#include <initializer_list>
39#include <unordered_map>
40#include <vector>
41
52
53using namespace ROOT::Experimental::Internal;
54
56{
57 static ROOT::RLogChannel sLog("ROOT.NTuple.Merge");
58 return sLog;
59}
60
61// TFile options parsing
62// -------------------------------------------------------------------------------------
63static bool BeginsWithDelimitedWord(const TString &str, const char *word)
64{
65 const Ssiz_t wordLen = strlen(word);
66 if (str.Length() < wordLen)
67 return false;
68 if (!str.BeginsWith(word, TString::ECaseCompare::kIgnoreCase))
69 return false;
70 return str.Length() == wordLen || str(wordLen) == ' ';
71}
72
73template <typename T>
74static std::optional<T> ParseStringOption(const TString &opts, const char *pattern,
75 std::initializer_list<std::pair<const char *, T>> validValues)
76{
77 const Ssiz_t patternLen = strlen(pattern);
78 assert(pattern[patternLen - 1] == '='); // we want to parse options with the format `option=Value`
79 if (auto idx = opts.Index(pattern, 0, TString::ECaseCompare::kIgnoreCase);
80 idx >= 0 && opts.Length() > idx + patternLen) {
81 auto sub = TString(opts(idx + patternLen, opts.Length() - idx - patternLen));
82 for (const auto &[name, value] : validValues) {
83 if (BeginsWithDelimitedWord(sub, name)) {
84 return value;
85 }
86 }
87 }
88 return std::nullopt;
89}
90
// Extracts the value of the `rntuple.MergingMode=` option from `opts` by delegating to ParseStringOption().
// Returns std::nullopt when ParseStringOption() finds no recognized value.
// NOTE(review): the table of accepted (name, value) pairs is not visible in this listing.
91static std::optional<ENTupleMergingMode> ParseOptionMergingMode(const TString &opts)
92{
93 return ParseStringOption<ENTupleMergingMode>(opts, "rntuple.MergingMode=",
94 {
98 });
99}
100
// Extracts the value of the `rntuple.ErrBehavior=` option from `opts` by delegating to ParseStringOption().
// Returns std::nullopt when ParseStringOption() finds no recognized value.
// NOTE(review): the table of accepted (name, value) pairs is not visible in this listing.
101static std::optional<ENTupleMergeErrBehavior> ParseOptionErrBehavior(const TString &opts)
102{
103 return ParseStringOption<ENTupleMergeErrBehavior>(opts, "rntuple.ErrBehavior=",
104 {
107 });
108}
109
// Extracts the value of the `rntuple.VersionBehavior=` option from `opts`.
// NOTE(review): the line containing the `return` / callee and the table of accepted values are not visible in
// this listing; presumably this delegates to ParseStringOption() like its siblings -- confirm.
110static std::optional<ENTupleMergeVersionBehavior> ParseOptionVersionBehavior(const TString &opts)
111{
113 opts, "rntuple.VersionBehavior=",
114 {
117 });
118}
119// -------------------------------------------------------------------------------------
120
121// Entry point for TFileMerger. Internally calls RNTupleMerger::Merge().
123// IMPORTANT: this function must not throw, as it is used in exception-unsafe code (TFileMerger).
// NOTE(review): the declarator line of this function is not visible in this listing. From the body: it reads
// `inputs` (iterated with TIter) and `mergeInfo` (whose fOptions string is parsed), assigns the merged anchor to
// `*this`, and returns 0 on success or -1 on failure.
// The body is wrapped in try/catch so that any std::exception is logged and turned into a -1 return.
124try {
125 // Check the inputs
// At least three entries are required: the RNTuple name, the output file and one input file.
126 if (!inputs || inputs->GetEntries() < 3 || !mergeInfo) {
127 R__LOG_ERROR(NTupleMergeLog()) << "Invalid inputs.";
128 return -1;
129 }
130
131 // Parse the input parameters
132 TIter itr(inputs);
133
134 // First entry is the RNTuple name
135 std::string ntupleName = std::string(itr()->GetName());
136
137 // Second entry is the output file
138 TObject *secondArg = itr();
139 TFile *outFile = dynamic_cast<TFile *>(secondArg);
140 if (!outFile) {
141 R__LOG_ERROR(NTupleMergeLog()) << "Second input parameter should be a TFile, but it's a "
142 << secondArg->ClassName() << ".";
143 return -1;
144 }
145
146 // Check if the output file already has a key with that name
147 TKey *outKey = outFile->FindKey(ntupleName.c_str());
148 ROOT::RNTuple *outNTuple = nullptr;
149 if (outKey) {
150 outNTuple = outKey->ReadObject<ROOT::RNTuple>();
151 if (!outNTuple) {
152 R__LOG_ERROR(NTupleMergeLog()) << "Output file already has key, but not of type RNTuple!";
153 return -1;
154 }
155 // In principle, we should already be working on the RNTuple object from the output file, but just continue with
156 // pointer we just got.
157 }
158
// Compression-related and verbosity flags are looked up as plain substrings of the option string.
159 const bool defaultComp = mergeInfo->fOptions.Contains("DefaultCompression");
160 const bool firstSrcComp = mergeInfo->fOptions.Contains("FirstSrcCompression");
161 const bool extraVerbose = mergeInfo->fOptions.Contains("rntuple.ExtraVerbose");
162 if (defaultComp && firstSrcComp) {
163 // this should never happen through hadd, but a user may call RNTuple::Merge() from custom code.
164 R__LOG_WARNING(NTupleMergeLog()) << "Passed both options \"DefaultCompression\" and \"FirstSrcCompression\": "
165 "only the latter will apply.";
166 }
// `compression` stays empty in the FirstSrcCompression case and is filled in from the first usable source below.
167 std::optional<std::uint32_t> compression;
168 if (firstSrcComp) {
169 // user passed -ff or -fk: use the same compression as the first RNTuple we find in the sources.
170 // (do nothing here, the compression will be fetched below)
171 } else if (!defaultComp) {
172 // compression was explicitly passed by the user: use it.
173 compression = outFile->GetCompressionSettings();
174 } else {
175 // user passed no compression-related options: use default
// NOTE(review): the statement assigning the default value to `compression` is not visible in this listing.
177 R__LOG_INFO(NTupleMergeLog()) << "Using the default compression: " << *compression;
178 }
179
180 // The remaining entries are the input files
// `sources` owns the page sources; `sourcePtrs` is the non-owning view handed to the merger further down.
181 std::vector<std::unique_ptr<RPageSourceFile>> sources;
182 std::vector<RPageSource *> sourcePtrs;
183
184 while (const auto &pitr = itr()) {
185 TFile *inFile = dynamic_cast<TFile *>(pitr);
186 ROOT::RNTuple *anchor = inFile ? inFile->Get<ROOT::RNTuple>(ntupleName.c_str()) : nullptr;
// Inputs without an RNTuple of the requested name are skipped, not treated as errors.
// NOTE(review): `anchor` is also null when `inFile` itself is null (entry is not a TFile); in that case the
// `inFile->GetName()` call in the log statement below dereferences a null pointer -- confirm and guard.
187 if (!anchor) {
188 R__LOG_INFO(NTupleMergeLog()) << "No RNTuple anchor named '" << ntupleName << "' from file '"
189 << inFile->GetName() << "'";
190 continue;
191 }
192
193 auto source = RPageSourceFile::CreateFromAnchor(*anchor);
194 if (!compression) {
195 // Get the compression of this RNTuple and use it as the output compression.
196 // We currently assume all column ranges have the same compression, so we just peek at the first one.
197 source->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kRaw);
198 auto descriptor = source->GetSharedDescriptorGuard();
199 auto clusterIter = descriptor->GetClusterIterable();
200 auto firstCluster = clusterIter.begin();
// A source without clusters cannot provide a compression setting: abort the merge.
201 if (firstCluster == clusterIter.end()) {
203 << "Asked to use the first source's compression as the output compression, but the "
204 "first source (file '"
205 << inFile->GetName()
206 << "') has an empty RNTuple, therefore the output compression could not be "
207 "determined.";
208 return -1;
209 }
210 auto colRangeIter = (*firstCluster).GetColumnRangeIterable();
211 auto firstColRange = colRangeIter.begin();
// Same failure mode if the first cluster has no column ranges.
212 if (firstColRange == colRangeIter.end()) {
214 << "Asked to use the first source's compression as the output compression, but the "
215 "first source (file '"
216 << inFile->GetName()
217 << "') has an empty RNTuple, therefore the output compression could not be "
218 "determined.";
219 return -1;
220 }
221 compression = (*firstColRange).GetCompressionSettings().value();
222 R__LOG_INFO(NTupleMergeLog()) << "Using the first RNTuple's compression: " << *compression;
223 }
224 sources.push_back(std::move(source));
225 }
226
227 RNTupleWriteOptions writeOpts;
// NOTE(review): with "FirstSrcCompression" and no input containing the RNTuple, `compression` is still empty
// here; the assert is the only guard before `*compression` is read -- confirm this cannot happen in builds
// where assert() is compiled out.
228 assert(compression);
229 writeOpts.SetCompression(*compression);
230 auto destination = std::make_unique<ROOT::Internal::RPageSinkFile>(ntupleName, *outFile, writeOpts);
231 std::unique_ptr<ROOT::RNTupleModel> model;
232 // If we already have an existing RNTuple, copy over its descriptor to support incremental merging
233 if (outNTuple) {
234 auto outSource = RPageSourceFile::CreateFromAnchor(*outNTuple);
235 outSource->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kForWriting);
236 auto desc = outSource->GetSharedDescriptorGuard();
237 model = destination->InitFromDescriptor(desc.GetRef(), true /* copyClusters */);
238 }
239
240 // Interface conversion
241 sourcePtrs.reserve(sources.size());
242 for (const auto &s : sources) {
243 sourcePtrs.push_back(s.get());
244 }
245
246 // Now merge
247 RNTupleMerger merger{std::move(destination), std::move(model)};
248 RNTupleMergeOptions mergerOpts;
249 mergerOpts.fCompressionSettings = *compression;
250 mergerOpts.fExtraVerbose = extraVerbose;
// Each optional `rntuple.*=` setting overrides the merger default only when it parses successfully.
251 if (auto mergingMode = ParseOptionMergingMode(mergeInfo->fOptions)) {
252 mergerOpts.fMergingMode = *mergingMode;
253 }
254 if (auto errBehavior = ParseOptionErrBehavior(mergeInfo->fOptions)) {
255 mergerOpts.fErrBehavior = *errBehavior;
256 }
257 if (auto versionBehavior = ParseOptionVersionBehavior(mergeInfo->fOptions)) {
258 mergerOpts.fVersionBehavior = *versionBehavior;
259 }
// A failed merge is turned into an exception here and handled by the catch clause at the end.
260 merger.Merge(sourcePtrs, mergerOpts).ThrowOnError();
261
262 // Provide the caller with a merged anchor object (even though we've already
263 // written it).
// NOTE(review): the pointer returned by Get<>() is dereferenced without a null check -- confirm it cannot be null.
264 *this = *outFile->Get<ROOT::RNTuple>(ntupleName.c_str());
265
266 return 0;
267} catch (const std::exception &ex) {
268 R__LOG_ERROR(NTupleMergeLog()) << "Exception thrown while merging: " << ex.what();
269 return -1;
270}
271
272namespace {
273// Functor used to change the compression of a page to `options.fCompressionSettings`.
// operator() decompresses `fSealedPage`, recompresses it with `fMergeOptions.fCompressionSettings`
// (0 if unset) and makes `fSealedPage` point at the result stored in `fBuffer`.
274struct RChangeCompressionFunc {
// Column elements of the source and destination columns; operator() asserts that their identifiers are equal.
275 const RColumnElementBase &fSrcColElement;
276 const RColumnElementBase &fDstColElement;
277 const RNTupleMergeOptions &fMergeOptions;
278
// The page to recompress; overwritten in place by operator().
279 RPageStorage::RSealedPage &fSealedPage;
// NOTE(review): one member declaration between fSealedPage and fBuffer is not visible in this listing.
// Output buffer (and its size in bytes) that receives the recompressed page.
281 std::byte *fBuffer;
282 std::size_t fBufSize;
283 const ROOT::RNTupleWriteOptions &fWriteOpts;
284
285 void operator()() const
286 {
287 assert(fSrcColElement.GetIdentifier() == fDstColElement.GetIdentifier());
288
289 fSealedPage.VerifyChecksumIfEnabled().ThrowOnError();
290
291 const auto bytesPacked = fSrcColElement.GetPackedSize(fSealedPage.GetNElements());
292 const auto compression = fMergeOptions.fCompressionSettings.value_or(0);
293 // TODO: this buffer could be kept and reused across pages
// When the target is uncompressed (0), the page is unzipped straight into fBuffer; otherwise it is
// unzipped into a temporary buffer and then zipped into fBuffer.
294 std::unique_ptr<std::byte[]> unzipBufOwned;
295 std::byte *unzipBuf;
296 if (compression != 0) {
297 unzipBufOwned = MakeUninitArray<std::byte>(bytesPacked);
298 unzipBuf = unzipBufOwned.get();
299 } else {
300 unzipBuf = fBuffer;
301 }
302 ROOT::Internal::RNTupleDecompressor::Unzip(fSealedPage.GetBuffer(), fSealedPage.GetDataSize(), bytesPacked,
303 unzipBuf);
304
// checksumSize is 8 bytes if page checksums are enabled in the write options, 0 otherwise.
305 const auto checksumSize = fWriteOpts.GetEnablePageChecksums() * sizeof(std::uint64_t);
306 std::size_t nBytesZipped;
307 if (compression != 0) {
308 assert(fBuffer != unzipBuf);
309 assert(fBufSize >= bytesPacked + checksumSize);
310 nBytesZipped = ROOT::Internal::RNTupleCompressor::Zip(unzipBuf, bytesPacked, compression, fBuffer);
311 } else {
// NOTE(review): fBufSize is not checked on this path although Unzip() wrote bytesPacked bytes into fBuffer.
312 nBytesZipped = bytesPacked;
313 }
314
// NOTE(review): the buffer size adds checksumSize derived from fWriteOpts, while the has-checksum flag is
// copied from the source page -- confirm that the two always agree.
315 fSealedPage = {fBuffer, nBytesZipped + checksumSize, fSealedPage.GetNElements(), fSealedPage.GetHasChecksum()};
316 fSealedPage.ChecksumIfEnabled();
317 }
318};
319
320struct RResealFunc {
321 const RColumnElementBase &fSrcColElement;
322 const RColumnElementBase &fDstColElement;
323 const RNTupleMergeOptions &fMergeOptions;
324
325 RPageStorage::RSealedPage &fSealedPage;
326 ROOT::Internal::RPageAllocator &fPageAlloc;
327 std::byte *fBuffer;
328 std::size_t fBufSize;
329 const ROOT::RNTupleWriteOptions &fWriteOpts;
330
331 void operator()() const
332 {
333 auto page = RPageSource::UnsealPage(fSealedPage, fSrcColElement, fPageAlloc).Unwrap();
334 RPageSink::RSealPageConfig sealConf;
335 sealConf.fElement = &fDstColElement;
336 sealConf.fPage = &page;
337 sealConf.fBuffer = fBuffer;
338 sealConf.fCompressionSettings = *fMergeOptions.fCompressionSettings;
339 sealConf.fWriteChecksum = fWriteOpts.GetEnablePageChecksums();
340 assert(fBufSize >= fSealedPage.GetDataSize() + fSealedPage.GetHasChecksum() * sizeof(std::uint64_t));
341 auto refSealedPage = RPageSink::SealPage(sealConf);
342 fSealedPage = refSealedPage;
343 }
344};
345
346struct RTaskVisitor {
347 std::optional<ROOT::Experimental::TTaskGroup> &fGroup;
348
349 template <typename T>
350 void operator()(T &&f)
351 {
352 if (fGroup)
353 fGroup->Run(f);
354 else
355 f();
356 }
357};
358
359struct RCommonField {
360 const ROOT::RFieldDescriptor *fSrc;
361 const ROOT::RFieldDescriptor *fDst;
362
363 RCommonField(const ROOT::RFieldDescriptor &src, const ROOT::RFieldDescriptor &dst) : fSrc(&src), fDst(&dst) {}
364};
365
366struct RDescriptorsComparison {
367 std::vector<const ROOT::RFieldDescriptor *> fExtraDstFields;
368 std::vector<const ROOT::RFieldDescriptor *> fExtraSrcFields;
369 std::vector<RCommonField> fCommonFields;
370};
371
372struct RColumnOutInfo {
373 ROOT::DescriptorId_t fColumnId;
374 ENTupleColumnType fColumnType;
375};
376
377// { fully.qualified.fieldName.colInputId => colOutputInfo }
378using ColumnIdMap_t = std::unordered_map<std::string, RColumnOutInfo>;
379
380struct RColumnInfoGroup {
381 std::vector<RColumnMergeInfo> fExtraDstColumns;
382 // These are sorted by InputId
383 std::vector<RColumnMergeInfo> fCommonColumns;
384};
385
386} // namespace
387
388// These structs cannot be in the anon namespace because they're used in RNTupleMerger's private interface.
389namespace ROOT::Experimental::Internal {
391 // This column name is built as a dot-separated concatenation of the ancestry of
392 // the columns' parent fields' names plus the index of the column itself.
393 // e.g. "Muon.pt.x._0"
394 std::string fColumnName;
395 // The column id in the source RNTuple
397 // The corresponding column id in the destination RNTuple (the mapping happens in AddColumnsFromField())
400 // If nullopt, use the default in-memory type
401 std::optional<std::type_index> fInMemoryType;
404};
405
406// Data related to a single call of RNTupleMerger::Merge()
408 std::span<RPageSource *> fSources;
413
414 std::vector<RColumnMergeInfo> fColumns;
415 ColumnIdMap_t fColumnIdMap;
416
418
419 RNTupleMergeData(std::span<RPageSource *> sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
420 : fSources{sources}, fDestination{destination}, fMergeOpts{mergeOpts}, fDstDescriptor{destination.GetDescriptor()}
421 {
422 }
423};
424
426 // We use a std::deque so that references to the contained SealedPageSequence_t, and its iterators, are
427 // never invalidated.
428 std::deque<RPageStorage::SealedPageSequence_t> fPagesV;
429 std::vector<RPageStorage::RSealedPageGroup> fGroups;
430 std::vector<std::unique_ptr<std::byte[]>> fBuffers;
431};
432
433std::ostream &operator<<(std::ostream &os, const std::optional<ROOT::RColumnDescriptor::RValueRange> &x)
434{
435 if (x) {
436 os << '(' << x->fMin << ", " << x->fMax << ')';
437 } else {
438 os << "(null)";
439 }
440 return os;
441}
442
443} // namespace ROOT::Experimental::Internal
444
446{
447 // clang-format off
468 // clang-format on
469 return false;
470}
471
472/// Compares the top level fields of `dst` and `src` and determines whether they can be merged or not.
473/// In addition, returns the differences between `dst` and `src`'s structures
474static ROOT::RResult<RDescriptorsComparison>
476{
477 // Cases:
478 // 1. dst == src
479 // 2. dst has fields that src hasn't
480 // 3. src has fields that dst hasn't
481 // 4. dst and src have fields that differ (compatible or incompatible)
482
483 std::vector<std::string> errors;
484 RDescriptorsComparison res;
485
486 std::vector<RCommonField> commonFields;
487
488 for (const auto &dstField : dst.GetTopLevelFields()) {
489 const auto srcFieldId = src.FindFieldId(dstField.GetFieldName());
490 if (srcFieldId != ROOT::kInvalidDescriptorId) {
491 const auto &srcField = src.GetFieldDescriptor(srcFieldId);
492 commonFields.push_back({srcField, dstField});
493 } else {
494 res.fExtraDstFields.emplace_back(&dstField);
495 }
496 }
497 for (const auto &srcField : src.GetTopLevelFields()) {
498 const auto dstFieldId = dst.FindFieldId(srcField.GetFieldName());
499 if (dstFieldId == ROOT::kInvalidDescriptorId)
500 res.fExtraSrcFields.push_back(&srcField);
501 }
502
503 // Check compatibility of common fields
504 auto fieldsToCheck = commonFields;
505 // NOTE: using index-based for loop because the collection may get extended by the iteration
506 for (std::size_t fieldIdx = 0; fieldIdx < fieldsToCheck.size(); ++fieldIdx) {
507 const auto &field = fieldsToCheck[fieldIdx];
508
509 // NOTE: field.fSrc and field.fDst have the same name by construction
510 const auto &fieldName = field.fSrc->GetFieldName();
511
512 // Require that fields are both projected or both not projected
513 bool projCompatible = field.fSrc->IsProjectedField() == field.fDst->IsProjectedField();
514 if (!projCompatible) {
515 std::stringstream ss;
516 ss << "Field `" << fieldName << "` is incompatible with previously-seen field with that name because the "
517 << (field.fSrc->IsProjectedField() ? "new" : "old") << " one is projected and the other isn't";
518 errors.push_back(ss.str());
519 } else if (field.fSrc->IsProjectedField()) {
520 // if both fields are projected, verify that they point to the same real field
521 const auto srcName = src.GetQualifiedFieldName(field.fSrc->GetProjectionSourceId());
522 const auto dstName = dst.GetQualifiedFieldName(field.fDst->GetProjectionSourceId());
523 if (srcName != dstName) {
524 std::stringstream ss;
525 ss << "Field `" << fieldName
526 << "` is projected to a different field than a previously-seen field with the same name (old: "
527 << dstName << ", new: " << srcName << ")";
528 errors.push_back(ss.str());
529 }
530 }
531
532 // Require that fields types match
533 // TODO(gparolini): allow non-identical but compatible types
534 const auto &srcTyName = field.fSrc->GetTypeName();
535 const auto &dstTyName = field.fDst->GetTypeName();
536 if (srcTyName != dstTyName) {
537 std::stringstream ss;
538 ss << "Field `" << fieldName
539 << "` has a type incompatible with a previously-seen field with the same name: (old: " << dstTyName
540 << ", new: " << srcTyName << ")";
541 errors.push_back(ss.str());
542 }
543
544 // Require that type checksums match
545 const auto srcTyChk = field.fSrc->GetTypeChecksum();
546 const auto dstTyChk = field.fDst->GetTypeChecksum();
547 if (srcTyChk && dstTyChk && *srcTyChk != *dstTyChk) {
548 std::stringstream ss;
549 ss << "Field `" << field.fSrc->GetFieldName()
550 << "` has a different type checksum than previously-seen field with the same name";
551 errors.push_back(ss.str());
552 }
553
554 // Require that type versions match
555 const auto srcTyVer = field.fSrc->GetTypeVersion();
556 const auto dstTyVer = field.fDst->GetTypeVersion();
557 if (srcTyVer != dstTyVer) {
558 std::stringstream ss;
559 ss << "Field `" << field.fSrc->GetFieldName()
560 << "` has a different type version than previously-seen field with the same name (old: " << dstTyVer
561 << ", new: " << srcTyVer << ")";
562 errors.push_back(ss.str());
563 }
564
565 // Require that field versions match
566 const auto srcFldVer = field.fSrc->GetFieldVersion();
567 const auto dstFldVer = field.fDst->GetFieldVersion();
568 if (srcFldVer != dstFldVer) {
569 std::stringstream ss;
570 ss << "Field `" << field.fSrc->GetFieldName()
571 << "` has a different field version than previously-seen field with the same name (old: " << dstFldVer
572 << ", new: " << srcFldVer << ")";
573 errors.push_back(ss.str());
574 }
575
576 const auto srcRole = field.fSrc->GetStructure();
577 const auto dstRole = field.fDst->GetStructure();
578 if (srcRole != dstRole) {
579 std::stringstream ss;
580 ss << "Field `" << field.fSrc->GetFieldName()
581 << "` has a different structural role than previously-seen field with the same name (old: " << dstRole
582 << ", new: " << srcRole << ")";
583 errors.push_back(ss.str());
584 }
585
586 // Require that column representations match
587 const auto srcNCols = field.fSrc->GetLogicalColumnIds().size();
588 const auto dstNCols = field.fDst->GetLogicalColumnIds().size();
589 if (srcNCols != dstNCols) {
590 std::stringstream ss;
591 ss << "Field `" << field.fSrc->GetFieldName()
592 << "` has a different number of columns than previously-seen field with the same name (old: " << dstNCols
593 << ", new: " << srcNCols << ")";
594 errors.push_back(ss.str());
595 } else {
596 for (auto i = 0u; i < srcNCols; ++i) {
597 const auto srcColId = field.fSrc->GetLogicalColumnIds()[i];
598 const auto dstColId = field.fDst->GetLogicalColumnIds()[i];
599 const auto &srcCol = src.GetColumnDescriptor(srcColId);
600 const auto &dstCol = dst.GetColumnDescriptor(dstColId);
601 // TODO(gparolini): currently we refuse to merge columns of different types unless they are Split/non-Split
602 // version of the same type, because we know how to treat that specific case. We should also properly handle
603 // different but compatible types.
604 if (srcCol.GetType() != dstCol.GetType() &&
605 !IsSplitOrUnsplitVersionOf(srcCol.GetType(), dstCol.GetType())) {
606 std::stringstream ss;
607 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
608 << "` has a different column type of the same column on the previously-seen field with the same name "
609 "(old: "
610 << RColumnElementBase::GetColumnTypeName(srcCol.GetType())
611 << ", new: " << RColumnElementBase::GetColumnTypeName(dstCol.GetType()) << ")";
612 errors.push_back(ss.str());
613 }
614 if (srcCol.GetBitsOnStorage() != dstCol.GetBitsOnStorage()) {
615 std::stringstream ss;
616 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
617 << "` has a different number of bits of the same column on the previously-seen field with the same "
618 "name "
619 "(old: "
620 << srcCol.GetBitsOnStorage() << ", new: " << dstCol.GetBitsOnStorage() << ")";
621 errors.push_back(ss.str());
622 }
623 if (srcCol.GetValueRange() != dstCol.GetValueRange()) {
624 std::stringstream ss;
625 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
626 << "` has a different value range of the same column on the previously-seen field with the same name "
627 "(old: "
628 << srcCol.GetValueRange() << ", new: " << dstCol.GetValueRange() << ")";
629 errors.push_back(ss.str());
630 }
631 if (srcCol.GetRepresentationIndex() > 0) {
632 std::stringstream ss;
633 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
634 << "` has a representation index higher than 0. This is not supported yet by the merger.";
635 errors.push_back(ss.str());
636 }
637 }
638 }
639
640 // Require that subfields are compatible
641 const auto &srcLinks = field.fSrc->GetLinkIds();
642 const auto &dstLinks = field.fDst->GetLinkIds();
643 if (srcLinks.size() != dstLinks.size()) {
644 std::stringstream ss;
645 ss << "Field `" << field.fSrc->GetFieldName()
646 << "` has a different number of children than previously-seen field with the same name (old: "
647 << dstLinks.size() << ", new: " << srcLinks.size() << ")";
648 errors.push_back(ss.str());
649 } else {
650 for (std::size_t linkIdx = 0, linkNum = srcLinks.size(); linkIdx < linkNum; ++linkIdx) {
651 const auto &srcSubfield = src.GetFieldDescriptor(srcLinks[linkIdx]);
652 const auto &dstSubfield = dst.GetFieldDescriptor(dstLinks[linkIdx]);
653 fieldsToCheck.push_back(RCommonField{srcSubfield, dstSubfield});
654 }
655 }
656 }
657
658 std::string errMsg;
659 for (const auto &err : errors)
660 errMsg += std::string("\n * ") + err;
661
662 if (!errMsg.empty())
663 errMsg = errMsg.substr(1); // strip initial newline
664
665 if (errMsg.length())
666 return R__FAIL(errMsg);
667
668 res.fCommonFields = std::move(commonFields);
669
670 return ROOT::RResult(res);
671}
672
673// Applies late model extension to `destination`, adding all `newFields` to it.
// `newFields` must not be empty. The fields are created from `mergeData.fSrcDescriptor`, added to `dstModel`,
// and the resulting changeset is passed to `mergeData.fDestination.UpdateSchema()`. On success, one entry per
// new field is appended to `commonFields`, pairing the source field with its counterpart looked up in
// `mergeData.fDstDescriptor`. A ROOT::RException thrown by UpdateSchema() is converted into a failed result.
674[[nodiscard]]
675static ROOT::RResult<void>
676ExtendDestinationModel(std::span<const ROOT::RFieldDescriptor *> newFields, ROOT::RNTupleModel &dstModel,
677 RNTupleMergeData &mergeData, std::vector<RCommonField> &commonFields)
678{
679 assert(newFields.size() > 0); // no point in calling this with 0 new cols
680
// The model is unfrozen for the duration of the modification and frozen again before UpdateSchema().
681 dstModel.Unfreeze();
682 ROOT::Internal::RNTupleModelChangeset changeset{dstModel};
683
// Optional log line listing the names of the fields about to be added.
684 if (mergeData.fMergeOpts.fExtraVerbose) {
685 std::string msg = "destination doesn't contain field";
686 if (newFields.size() > 1)
687 msg += 's';
688 msg += ' ';
689 msg += std::accumulate(newFields.begin(), newFields.end(), std::string{}, [](const auto &acc, const auto *field) {
690 return acc + (acc.length() ? ", " : "") + '`' + field->GetFieldName() + '`';
691 });
692 R__LOG_INFO(NTupleMergeLog()) << msg << ": adding " << (newFields.size() > 1 ? "them" : "it")
693 << " to the destination model (entry #" << mergeData.fNumDstEntries << ").";
694 }
695
696 changeset.fAddedFields.reserve(newFields.size());
697 // First add all non-projected fields...
698 for (const auto *fieldDesc : newFields) {
699 if (!fieldDesc->IsProjectedField()) {
700 auto field = fieldDesc->CreateField(*mergeData.fSrcDescriptor);
701 changeset.AddField(std::move(field));
702 }
703 }
704 // ...then add all projected fields.
// The projection sources are looked up by qualified name in `dstModel`, so they must already be part of it.
705 for (const auto *fieldDesc : newFields) {
706 if (!fieldDesc->IsProjectedField())
707 continue;
708
// NOTE(review): the declaration of `fieldMap` is not visible in this listing.
710 auto field = fieldDesc->CreateField(*mergeData.fSrcDescriptor);
711 const auto sourceId = fieldDesc->GetProjectionSourceId();
712 const auto &sourceField = dstModel.GetConstField(mergeData.fSrcDescriptor->GetQualifiedFieldName(sourceId));
713 fieldMap[field.get()] = &sourceField;
714
// Map every subfield of the projected field to its own projection source as well.
715 for (const auto &subfield : *field) {
716 const auto &subFieldDesc = mergeData.fSrcDescriptor->GetFieldDescriptor(subfield.GetOnDiskId());
717 const auto subSourceId = subFieldDesc.GetProjectionSourceId();
718 const auto &subSourceField =
719 dstModel.GetConstField(mergeData.fSrcDescriptor->GetQualifiedFieldName(subSourceId));
720 fieldMap[&subfield] = &subSourceField;
721 }
722 changeset.fAddedProjectedFields.emplace_back(field.get());
723 ROOT::Internal::GetProjectedFieldsOfModel(dstModel).Add(std::move(field), fieldMap);
724 }
725 dstModel.Freeze();
726 try {
727 mergeData.fDestination.UpdateSchema(changeset, mergeData.fNumDstEntries);
728 } catch (const ROOT::RException &ex) {
729 return R__FAIL(ex.GetError().GetReport());
730 }
731
// The new fields now exist on both sides: record them as common fields.
// NOTE(review): the id returned by FindFieldId() is used without checking for kInvalidDescriptorId.
732 commonFields.reserve(commonFields.size() + newFields.size());
733 for (const auto *field : newFields) {
734 const auto newFieldInDstId = mergeData.fDstDescriptor.FindFieldId(field->GetFieldName());
735 const auto &newFieldInDst = mergeData.fDstDescriptor.GetFieldDescriptor(newFieldInDstId);
736 commonFields.emplace_back(*field, newFieldInDst);
737 }
738
// NOTE(review): the final return statement is not visible in this listing.
740}
741
742// Generates default (zero) values for the given columns
// For every eligible column in `columns`, zero-filled pages covering `nEntriesToGenerate` entries are sealed
// and appended to `sealedPageData` (buffers, page sequences and page groups). Returns a failure if one of the
// columns belongs to a streamer field.
// NOTE(review): several lines of this function are not visible in this listing; comments only describe what
// is shown.
743[[nodiscard]]
744static ROOT::RResult<void>
745GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span<const RColumnMergeInfo> columns,
746 RSealedPageMergeData &sealedPageData, ROOT::Internal::RPageAllocator &pageAlloc,
747 const ROOT::RNTupleDescriptor &dstDescriptor, const RNTupleMergeData &mergeData)
748{
// NOTE(review): the statement executed when there is nothing to generate is not visible in this listing.
749 if (!nEntriesToGenerate)
751
752 for (const auto &column : columns) {
753 const ROOT::RFieldDescriptor *field = column.fParentFieldDescriptor;
754
755 // Skip all auxiliary columns
// i.e. only the first logical column of the parent field is processed.
756 assert(!field->GetLogicalColumnIds().empty());
757 if (field->GetLogicalColumnIds()[0] != column.fInputId)
758 continue;
759
760 // Check if this column is a child of a Collection or a Variant. If so, it has no data
761 // and can be skipped.
// While walking up the ancestry, the repetition counts of the field and its parents are multiplied together
// (a count of 0 is treated as 1).
762 bool skipColumn = false;
763 auto nRepetitions = std::max<std::uint64_t>(field->GetNRepetitions(), 1);
764 for (auto parentId = field->GetParentId(); parentId != ROOT::kInvalidDescriptorId;) {
765 const ROOT::RFieldDescriptor &parent = column.fParentNTupleDescriptor->GetFieldDescriptor(parentId);
// NOTE(review): the condition guarding the following two statements is not visible in this listing.
768 skipColumn = true;
769 break;
770 }
771 nRepetitions *= std::max<std::uint64_t>(parent.GetNRepetitions(), 1);
772 parentId = parent.GetParentId();
773 }
774 if (skipColumn)
775 continue;
776
777 const auto structure = field->GetStructure();
778
779 if (structure == ROOT::ENTupleStructure::kStreamer) {
780 return R__FAIL(
781 "Destination RNTuple contains a streamer field (" + field->GetFieldName() +
782 ") that is not present in one of the sources. "
783 "Creating a default value for a streamer field is ill-defined, therefore the merging process will abort.");
784 }
785
786 // NOTE: we cannot have a Record here because it has no associated columns.
788 structure == ROOT::ENTupleStructure::kPlain);
789
// Total packed size of the zero data for this column, split into pages of at most kPageSizeLimit bytes.
790 const auto &columnDesc = dstDescriptor.GetColumnDescriptor(column.fOutputId);
791 const auto colElement = RColumnElementBase::Generate(columnDesc.GetType());
792 const auto nElements = nEntriesToGenerate * nRepetitions;
793 const auto nBytesOnStorage = colElement->GetPackedSize(nElements);
794 // TODO(gparolini): make this configurable
795 constexpr auto kPageSizeLimit = 256 * 1024;
796 // TODO(gparolini): consider coalescing the last page if its size is less than some threshold
// Ceiling division: one extra page if the size is not a multiple of the limit.
797 const size_t nPages = nBytesOnStorage / kPageSizeLimit + !!(nBytesOnStorage % kPageSizeLimit);
798 for (size_t i = 0; i < nPages; ++i) {
// Every page but the last is full-sized; the last one takes the remainder.
799 const auto pageSize = (i < nPages - 1) ? kPageSizeLimit : nBytesOnStorage - kPageSizeLimit * (nPages - 1);
800 const auto checksumSize = RPageStorage::kNBytesPageChecksum;
801 const auto bufSize = pageSize + checksumSize;
802 assert(pageSize % colElement->GetSize() == 0);
803 const auto nElementsPerPage = pageSize / colElement->GetSize();
804 auto page = pageAlloc.NewPage(colElement->GetSize(), nElementsPerPage);
805 page.GrowUnchecked(nElementsPerPage);
806 memset(page.GetBuffer(), 0, page.GetNBytes());
807
// The output buffer is owned by sealedPageData.fBuffers.
808 auto &buffer = sealedPageData.fBuffers.emplace_back(new std::byte[bufSize]);
// NOTE(review): the declaration of `sealConf` (and one more assignment further down) is not visible in this listing.
810 sealConf.fElement = colElement.get();
811 sealConf.fPage = &page;
812 sealConf.fBuffer = buffer.get();
813 sealConf.fCompressionSettings = mergeData.fMergeOpts.fCompressionSettings.value();
815 auto sealedPage = RPageSink::SealPage(sealConf);
816
// Each page gets its own one-element page sequence and its own page group for the output column.
817 sealedPageData.fPagesV.push_back({sealedPage});
818 sealedPageData.fGroups.emplace_back(column.fOutputId, sealedPageData.fPagesV.back().cbegin(),
819 sealedPageData.fPagesV.back().cend());
820 }
821 }
// NOTE(review): the final return statement is not visible in this listing.
823}
824
825// Merges all columns appearing both in the source and destination RNTuples, just copying them if their
826// compression matches ("fast merge") or by unsealing and resealing them with the proper compression.
//
// Only the first `nCommonColumnsInCluster` entries of `commonColumns` are processed, and `commonColumnSet` (asserted
// to have that same size) is the column set requested from `clusterPool` for this cluster. For each processed
// column one page sequence is appended to `sealedPageData.fPagesV` and one group, keyed by the column's output id,
// to `sealedPageData.fGroups`. An error returned by GenerateZeroPagesForColumns() is forwarded to the caller.
//
// NOTE(review): the embedded line numbering of this listing skips 828, 833, 838, 861, 880, 924 and 975, i.e. those
// source lines are missing here (presumably the qualified function name with its first parameter, the last
// parameter, the two `return` statements, the declaration of `sealedPages`, the head of the log statement and one
// statement of the page loop). Restore them from the original file before building.
827ROOT::RResult<void>
829 const ROOT::RClusterDescriptor &clusterDesc,
830 std::span<const RColumnMergeInfo> commonColumns,
831 const RCluster::ColumnSet_t &commonColumnSet, std::size_t nCommonColumnsInCluster,
832 RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData,
834{
835 assert(nCommonColumnsInCluster == commonColumnSet.size());
836 assert(nCommonColumnsInCluster <= commonColumns.size());
837 if (nCommonColumnsInCluster == 0)
839
840 const RCluster *cluster = clusterPool.GetCluster(clusterDesc.GetId(), commonColumnSet);
841 // we expect the cluster pool to contain the requested set of columns, since they were
842 // validated by CompareDescriptorStructure().
843 assert(cluster);
844
845 for (size_t colIdx = 0; colIdx < nCommonColumnsInCluster; ++colIdx) {
846 const auto &column = commonColumns[colIdx];
847 const auto &columnId = column.fInputId;
848 R__ASSERT(clusterDesc.ContainsColumn(columnId));
849
 // Build one column element for the source's on-disk type and one for the destination's; when the column has
 // no explicit in-memory type, the default element for the on-disk type is generated instead.
850 const auto &columnDesc = mergeData.fSrcDescriptor->GetColumnDescriptor(columnId);
851 const auto srcColElement = column.fInMemoryType
852 ? ROOT::Internal::GenerateColumnElement(*column.fInMemoryType, columnDesc.GetType())
853 : RColumnElementBase::Generate(columnDesc.GetType());
854 const auto dstColElement = column.fInMemoryType
855 ? ROOT::Internal::GenerateColumnElement(*column.fInMemoryType, column.fColumnType)
856 : RColumnElementBase::Generate(column.fColumnType);
857
858 // Now get the pages for this column in this cluster
859 const auto &pages = clusterDesc.GetPageRange(columnId);
860
862 sealedPages.resize(pages.GetPageInfos().size());
863
864 // Each column range potentially has distinct compression settings
865 const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).GetCompressionSettings().value();
866
867 // Select "merging level". There are 3 levels, from fastest to slowest, depending on the case:
868 // L1: compression and encoding of src and dest both match: we can simply copy the page
869 // L2: compression of dest doesn't match the src but encoding does: we must recompress the page but can avoid
870 // resealing it.
871 // L3: on-disk encoding doesn't match: we need to reseal the page, which implies decompressing and recompressing
872 // it.
873 const bool compressionIsDifferent =
874 colRangeCompressionSettings != mergeData.fMergeOpts.fCompressionSettings.value();
875 const bool needsResealing =
876 srcColElement->GetIdentifier().fOnDiskType != dstColElement->GetIdentifier().fOnDiskType;
877 const bool needsRecompressing = compressionIsDifferent || needsResealing;
878
879 if (needsRecompressing && mergeData.fMergeOpts.fExtraVerbose) {
881 << (needsResealing ? "Resealing" : "Recompressing") << " column " << column.fColumnName
882 << ": { compression: " << colRangeCompressionSettings << " => "
883 << mergeData.fMergeOpts.fCompressionSettings.value()
884 << ", onDiskType: " << RColumnElementBase::GetColumnTypeName(srcColElement->GetIdentifier().fOnDiskType)
885 << " => " << RColumnElementBase::GetColumnTypeName(dstColElement->GetIdentifier().fOnDiskType);
886 }
887
 // Index of the first buffer slot belonging to this column: the slots added by the resize() below are addressed
 // as `pageBufferBaseIdx + pageIdx` in the page loop.
888 size_t pageBufferBaseIdx = sealedPageData.fBuffers.size();
889 // If the column range already has the right compression we don't need to allocate any new buffer, so we don't
890 // bother reserving memory for them.
891 if (needsRecompressing)
892 sealedPageData.fBuffers.resize(sealedPageData.fBuffers.size() + pages.GetPageInfos().size());
893
894 // If this column is deferred, we may need to fill "holes" until its real start. We fill any missing entry
895 // with zeroes, like we do for extraDstColumns.
896 // As an optimization, we don't do this for the first source (since we can rely on the FirstElementIndex and
897 // deferred column mechanism in that case).
898 // TODO: also avoid doing this if we added no real page of this column to the destination yet.
899 if (columnDesc.GetFirstElementIndex() > clusterDesc.GetFirstEntryIndex() && mergeData.fNumDstEntries > 0) {
900 const auto nMissingEntries = columnDesc.GetFirstElementIndex() - clusterDesc.GetFirstEntryIndex();
901 auto res = GenerateZeroPagesForColumns(nMissingEntries, {&column, 1}, sealedPageData, pageAlloc,
902 mergeData.fDstDescriptor, mergeData);
903 if (!res)
904 return R__FORWARD_ERROR(res);
905 }
906
907 // Loop over the pages
908 std::uint64_t pageIdx = 0;
909 for (const auto &pageInfo : pages.GetPageInfos()) {
910 assert(pageIdx < sealedPages.size());
911 assert(sealedPageData.fBuffers.size() == 0 || pageIdx < sealedPageData.fBuffers.size());
912 assert(pageInfo.GetLocator().GetType() != RNTupleLocator::kTypePageZero);
913
914 ROOT::Internal::ROnDiskPage::Key key{columnId, pageIdx};
915 auto onDiskPage = cluster->GetOnDiskPage(key);
916
 // Point the sealed page at the on-disk bytes; the buffer size covers the stored payload plus the checksum
 // (if the page has one).
917 const auto checksumSize = pageInfo.HasChecksum() * RPageStorage::kNBytesPageChecksum;
918 RPageStorage::RSealedPage &sealedPage = sealedPages[pageIdx];
919 sealedPage.SetNElements(pageInfo.GetNElements());
920 sealedPage.SetHasChecksum(pageInfo.HasChecksum());
921 sealedPage.SetBufferSize(pageInfo.GetLocator().GetNBytesOnStorage() + checksumSize);
 // NOTE(review): `onDiskPage` is dereferenced by the SetBuffer() call below before the R__ASSERT further down
 // checks it for null; the null check should come first.
922 sealedPage.SetBuffer(onDiskPage->GetAddress());
923 // TODO(gparolini): more graceful error handling (skip the page?)
925 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
926
927 if (needsRecompressing) {
928 const auto uncompressedSize = srcColElement->GetSize() * sealedPage.GetNElements();
929 auto &buffer = sealedPageData.fBuffers[pageBufferBaseIdx + pageIdx];
930 const auto bufSize = uncompressedSize + checksumSize;
931 // NOTE: we currently allocate the max possible size for this buffer and don't shrink it afterward.
932 // We might want to introduce an option that trades speed for memory usage and shrink the buffer to fit
933 // the actual data size after recompressing.
934 buffer = MakeUninitArray<std::byte>(bufSize);
935
 // NOTE(review): the functors below are given `*fPageAlloc` rather than the `pageAlloc` parameter that is
 // used for the zero pages above - confirm both are meant to be the same allocator.
936 // clang-format off
937 if (needsResealing) {
938 RTaskVisitor{fTaskGroup}(RResealFunc{
939 *srcColElement,
940 *dstColElement,
941 mergeData.fMergeOpts,
942 sealedPage,
943 *fPageAlloc,
944 buffer.get(),
945 bufSize,
946 mergeData.fDestination.GetWriteOptions()
947 });
948 } else {
949 RTaskVisitor{fTaskGroup}(RChangeCompressionFunc{
950 *srcColElement,
951 *dstColElement,
952 mergeData.fMergeOpts,
953 sealedPage,
954 *fPageAlloc,
955 buffer.get(),
956 bufSize,
957 mergeData.fDestination.GetWriteOptions()
958 });
959 }
960 // clang-format on
961 }
962
963 ++pageIdx;
964
965 } // end of loop over pages
966
 // If a task group is in use, wait on it before this column's pages are handed over below (presumably the
 // functors above run as tasks of that group - confirm against RTaskVisitor).
967 if (fTaskGroup)
968 fTaskGroup->Wait();
969
970 sealedPageData.fPagesV.push_back(std::move(sealedPages));
971 sealedPageData.fGroups.emplace_back(column.fOutputId, sealedPageData.fPagesV.back().cbegin(),
972 sealedPageData.fPagesV.back().cend());
973 } // end loop over common columns
974
976}
977
978// Iterates over all clusters of `source` and merges their pages into `destination`.
979// It is assumed that all columns in `commonColumns` are present (and compatible) in both the source and
980// the destination's schemas.
981// The pages may be "fast-merged" (i.e. simply copied with no decompression/recompression) if the target
982// compression is unspecified or matches the original compression settings.
//
// For every cluster this commits the sealed pages gathered by MergeCommonColumns() and GenerateZeroPagesForColumns()
// to `mergeData.fDestination`, commits the cluster and adds its entry count to `mergeData.fNumDstEntries`.
// An error from either helper is forwarded to the caller.
//
// NOTE(review): the embedded line numbering of this listing skips 983 and 1052, so the return type line and
// (presumably) the final `return` statement are missing here. Restore them from the original file before building.
984RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span<const RColumnMergeInfo> commonColumns,
985 std::span<const RColumnMergeInfo> extraDstColumns, RNTupleMergeData &mergeData)
986{
987 ROOT::Internal::RClusterPool clusterPool{source};
988
 // Starts out as a copy of `extraDstColumns`; the common columns absent from a given cluster are appended per
 // cluster inside the loop below.
989 std::vector<RColumnMergeInfo> missingColumns{extraDstColumns.begin(), extraDstColumns.end()};
990
991 // Loop over all clusters in this file.
992 // descriptor->GetClusterIterable() doesn't guarantee any specific order, so we explicitly
993 // request the first cluster.
994 ROOT::DescriptorId_t clusterId = mergeData.fSrcDescriptor->FindClusterId(0, 0);
995 while (clusterId != ROOT::kInvalidDescriptorId) {
996 const auto &clusterDesc = mergeData.fSrcDescriptor->GetClusterDescriptor(clusterId);
997 const auto nClusterEntries = clusterDesc.GetNEntries();
998 R__ASSERT(nClusterEntries > 0);
999
1000 // NOTE: just because a column is in `commonColumns` it doesn't mean that each cluster in the source contains it,
1001 // as it may be a deferred column that only has real data in a future cluster.
1002 // We need to figure out which columns are actually present in this cluster so we only merge their pages (the
1003 // missing columns are handled by synthesizing zero pages - see below).
1004 size_t nCommonColumnsInCluster = commonColumns.size();
1005 while (nCommonColumnsInCluster > 0) {
1006 // Since `commonColumns` is sorted by column input id, we can simply traverse it from the back and stop as
1007 // soon as we find a common column that appears in this cluster: we know that in that case all previous
1008 // columns must appear as well.
1009 if (clusterDesc.ContainsColumn(commonColumns[nCommonColumnsInCluster - 1].fInputId))
1010 break;
1011 --nCommonColumnsInCluster;
1012 }
1013
1014 // Convert columns to a ColumnSet for the ClusterPool query
1015 RCluster::ColumnSet_t commonColumnSet;
1016 commonColumnSet.reserve(nCommonColumnsInCluster);
1017 for (size_t i = 0; i < nCommonColumnsInCluster; ++i)
1018 commonColumnSet.emplace(commonColumns[i].fInputId);
1019
1020 // For each cluster, the "missing columns" are the union of the extraDstColumns and the common columns
1021 // that are not present in the cluster. We generate zero pages for all of them.
 // The resize() drops whatever was appended for the previous cluster, keeping only the leading extraDstColumns.
1022 missingColumns.resize(extraDstColumns.size());
1023 for (size_t i = nCommonColumnsInCluster; i < commonColumns.size(); ++i)
1024 missingColumns.push_back(commonColumns[i]);
1025
 // A fresh RSealedPageMergeData is used per cluster; both helpers below append to it.
1026 RSealedPageMergeData sealedPageData;
1027 auto res = MergeCommonColumns(clusterPool, clusterDesc, commonColumns, commonColumnSet, nCommonColumnsInCluster,
1028 sealedPageData, mergeData, *fPageAlloc);
1029 if (!res)
1030 return R__FORWARD_ERROR(res);
1031
1032 res = GenerateZeroPagesForColumns(nClusterEntries, missingColumns, sealedPageData, *fPageAlloc,
1033 mergeData.fDstDescriptor, mergeData);
1034 if (!res)
1035 return R__FORWARD_ERROR(res);
1036
1037 // Commit the pages and the clusters
1038 mergeData.fDestination.CommitSealedPageV(sealedPageData.fGroups);
1039 mergeData.fDestination.CommitCluster(nClusterEntries);
1040 mergeData.fNumDstEntries += nClusterEntries;
1041
1042 // Go to the next cluster
1043 clusterId = mergeData.fSrcDescriptor->FindNextClusterId(clusterId);
1044 }
1045
1046 // TODO(gparolini): when we get serious about huge file support (>~ 100GB) we might want to check here
1047 // the size of the running page list and commit a cluster group when it exceeds some threshold,
1048 // which would prevent the page list from getting too large.
1049 // However, as of today, we aren't really handling such huge files, and even relatively big ones
1050 // such as the CMS dataset have a page list size of about only 2 MB.
1051 // So currently we simply merge all cluster groups into one.
1053}
1054
// Picks the in-memory C++ type for a column from the type name of its field and the column's on-disk type.
// The index column types tested first map to RColumnIndex and kSwitch maps to RColumnSwitch, regardless of the
// field type; otherwise the field type name is compared against a fixed list of fundamental types.
// Returns std::nullopt when nothing matches.
// NOTE(review): the embedded line numbering of this listing skips 1058, so the end of the first `if` condition is
// missing here. Restore it from the original file before building.
1055static std::optional<std::type_index> ColumnInMemoryType(std::string_view fieldType, ENTupleColumnType onDiskType)
1056{
1057 if (onDiskType == ENTupleColumnType::kIndex32 || onDiskType == ENTupleColumnType::kSplitIndex32 ||
1059 return typeid(ROOT::Internal::RColumnIndex);
1060
1061 if (onDiskType == ENTupleColumnType::kSwitch)
1062 return typeid(ROOT::Internal::RColumnSwitch);
1063
 // The comparisons below are exact string matches on the field's type name.
1064 // clang-format off
1065 if (fieldType == "bool") return typeid(bool);
1066 if (fieldType == "std::byte") return typeid(std::byte);
1067 if (fieldType == "char") return typeid(char);
1068 if (fieldType == "std::int8_t") return typeid(std::int8_t);
1069 if (fieldType == "std::uint8_t") return typeid(std::uint8_t);
1070 if (fieldType == "std::int16_t") return typeid(std::int16_t);
1071 if (fieldType == "std::uint16_t") return typeid(std::uint16_t);
1072 if (fieldType == "std::int32_t") return typeid(std::int32_t);
1073 if (fieldType == "std::uint32_t") return typeid(std::uint32_t);
1074 if (fieldType == "std::int64_t") return typeid(std::int64_t);
1075 if (fieldType == "std::uint64_t") return typeid(std::uint64_t);
1076 if (fieldType == "float") return typeid(float);
1077 if (fieldType == "double") return typeid(double);
1078 // clang-format on
1079
1080 // if the type is not one of those above, we use the default in-memory type.
1081 return std::nullopt;
1082}
1083
1084// Given a field, fill `columns` and `mergeData.fColumnIdMap` with information about all columns belonging to it and its
1085// subfields. `mergeData.fColumnIdMap` is used to map matching columns from different sources to the same output column
1086// in the destination. We match columns by their "fully qualified name", which is the concatenation of their ancestor
1087// fields' names and the column index. By this point, since we called `CompareDescriptorStructure()` earlier, we should
1088// be guaranteed that two matching columns will have at least compatible representations. NOTE: srcFieldDesc and
1089// dstFieldDesc may alias.
1090static void AddColumnsFromField(std::vector<RColumnMergeInfo> &columns, const ROOT::RNTupleDescriptor &srcDesc,
1091 RNTupleMergeData &mergeData, const ROOT::RFieldDescriptor &srcFieldDesc,
1092 const ROOT::RFieldDescriptor &dstFieldDesc, const std::string &prefix = "")
1093{
1094 std::string name = prefix + '.' + srcFieldDesc.GetFieldName();
1095
1096 const auto &columnIds = srcFieldDesc.GetLogicalColumnIds();
1097 columns.reserve(columns.size() + columnIds.size());
1098 // NOTE: here we can match the src and dst columns by column index because we forbid merging fields with
1099 // different column representations.
1100 for (auto i = 0u; i < srcFieldDesc.GetLogicalColumnIds().size(); ++i) {
1101 // We don't want to try and merge alias columns
1102 if (srcFieldDesc.IsProjectedField())
1103 continue;
1104
1105 auto srcColumnId = srcFieldDesc.GetLogicalColumnIds()[i];
1106 const auto &srcColumn = srcDesc.GetColumnDescriptor(srcColumnId);
1107
1108 RColumnMergeInfo info{};
1109 info.fColumnName = name + '.' + std::to_string(srcColumn.GetIndex());
1110 info.fInputId = srcColumn.GetPhysicalId();
1111 // NOTE(gparolini): the parent field is used when synthesizing zero pages, which happens in 2 situations:
1112 // 1. when adding extra dst columns (in which case we need to synthesize zero pages for the incoming src), and
1113 // 2. when merging a deferred column into an existing column (in which case we need to fill the "hole" with
1114 // zeroes). For the first case srcFieldDesc and dstFieldDesc are the same (see the calling site of this function),
1115 // but for the second case they're not, and we need to pick the source field because we will then check the
1116 // column's *input* id inside fParentFieldDescriptor to see if it's a suppressed column (see
1117 // GenerateZeroPagesForColumns()).
1118 info.fParentFieldDescriptor = &srcFieldDesc;
1119 // Save the parent field descriptor since this may be either the source or destination descriptor depending on
1120 // whether this is an extraDstField or a commonField. We will need this in GenerateZeroPagesForColumns() to
1121 // properly walk up the field hierarchy.
1122 info.fParentNTupleDescriptor = &srcDesc;
1123
1124 if (auto it = mergeData.fColumnIdMap.find(info.fColumnName); it != mergeData.fColumnIdMap.end()) {
1125 info.fOutputId = it->second.fColumnId;
1126 info.fColumnType = it->second.fColumnType;
1127 } else {
1128 info.fOutputId = mergeData.fColumnIdMap.size();
1129 // NOTE(gparolini): map the type of src column to the type of dst column.
1130 // This mapping is only relevant for common columns and it's done to ensure we keep a consistent
1131 // on-disk representation of the same column.
1132 // This is also important to do for first source when it is used to generate the destination sink,
1133 // because even in that case their column representations may differ.
1134 // e.g. if the destination has a different compression than the source, an integer column might be
1135 // zigzag-encoded in the source but not in the destination.
1136 auto dstColumnId = dstFieldDesc.GetLogicalColumnIds()[i];
1137 const auto &dstColumn = mergeData.fDstDescriptor.GetColumnDescriptor(dstColumnId);
1138 info.fColumnType = dstColumn.GetType();
1139 mergeData.fColumnIdMap[info.fColumnName] = {info.fOutputId, info.fColumnType};
1140 }
1141
1142 if (mergeData.fMergeOpts.fExtraVerbose) {
1143 R__LOG_INFO(NTupleMergeLog()) << "Adding column " << info.fColumnName << " with log.id " << srcColumnId
1144 << ", phys.id " << srcColumn.GetPhysicalId() << ", type "
1145 << RColumnElementBase::GetColumnTypeName(srcColumn.GetType()) << " -> log.id "
1146 << info.fOutputId << ", type "
1147 << RColumnElementBase::GetColumnTypeName(info.fColumnType);
1148 }
1149
1150 // Since we disallow merging fields of different types, src and dstFieldDesc must have the same type name.
1151 assert(srcFieldDesc.GetTypeName() == dstFieldDesc.GetTypeName());
1152 info.fInMemoryType = ColumnInMemoryType(srcFieldDesc.GetTypeName(), info.fColumnType);
1153 columns.emplace_back(info);
1154 }
1155
1156 const auto &srcChildrenIds = srcFieldDesc.GetLinkIds();
1157 const auto &dstChildrenIds = dstFieldDesc.GetLinkIds();
1158 assert(srcChildrenIds.size() == dstChildrenIds.size());
1159 for (auto i = 0u; i < srcChildrenIds.size(); ++i) {
1160 const auto &srcChild = srcDesc.GetFieldDescriptor(srcChildrenIds[i]);
1161 const auto &dstChild = mergeData.fDstDescriptor.GetFieldDescriptor(dstChildrenIds[i]);
1162 AddColumnsFromField(columns, srcDesc, mergeData, srcChild, dstChild, name);
1163 }
1164}
1165
1166// Converts the fields comparison data to the corresponding column information.
1167// While doing so, it collects such information in `mergeData.fColumnIdMap`, which is used by later calls to this
1168// function to map already-seen column names to their chosen outputId, type and so on.
1169static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const ROOT::RNTupleDescriptor &srcDesc,
1170 RNTupleMergeData &mergeData)
1171{
1172 RColumnInfoGroup res;
1173 for (const ROOT::RFieldDescriptor *field : descCmp.fExtraDstFields) {
1174 AddColumnsFromField(res.fExtraDstColumns, mergeData.fDstDescriptor, mergeData, *field, *field);
1175 }
1176 for (const auto &[srcField, dstField] : descCmp.fCommonFields) {
1177 AddColumnsFromField(res.fCommonColumns, srcDesc, mergeData, *srcField, *dstField);
1178 }
1179
1180 // Sort the commonColumns by ID so we can more easily tell how many common columns each cluster has
1181 // (since each cluster must contain all columns of the previous cluster plus potentially some new ones)
1182 std::sort(res.fCommonColumns.begin(), res.fCommonColumns.end(),
1183 [](const auto &a, const auto &b) { return a.fInputId < b.fInputId; });
1184
1185 return res;
1186}
1187
1188static void PrefillColumnMap(const ROOT::RNTupleDescriptor &desc, const ROOT::RFieldDescriptor &fieldDesc,
1189 ColumnIdMap_t &colIdMap, const std::string &prefix = "")
1190{
1191 std::string name = prefix + '.' + fieldDesc.GetFieldName();
1192 for (const auto &colId : fieldDesc.GetLogicalColumnIds()) {
1193 const auto &colDesc = desc.GetColumnDescriptor(colId);
1194 RColumnOutInfo info{};
1195 const auto colName = name + '.' + std::to_string(colDesc.GetIndex());
1196 info.fColumnId = colDesc.GetLogicalId();
1197 info.fColumnType = colDesc.GetType();
1198 colIdMap[colName] = info;
1199 }
1200
1201 for (const auto &subId : fieldDesc.GetLinkIds()) {
1202 const auto &subfield = desc.GetFieldDescriptor(subId);
1203 PrefillColumnMap(desc, subfield, colIdMap, name);
1204 }
1205}
1206
// Creates a merger that takes ownership of `destination` and `model` and allocates its pages through a freshly
// created RPageAllocatorHeap.
// NOTE(review): the embedded line numbering of this listing skips 1215, 1218 and 1219, so the statements of the
// constructor body are missing here (the ones under R__USE_IMT presumably set up `fTaskGroup` - confirm against the
// original file).
1207RNTupleMerger::RNTupleMerger(std::unique_ptr<ROOT::Internal::RPagePersistentSink> destination,
1208 std::unique_ptr<ROOT::RNTupleModel> model)
1209 // TODO(gparolini): consider using an arena allocator instead, since we know the precise lifetime
1210 // of the RNTuples we are going to handle (e.g. we can reset the arena at every source)
1211 : fDestination(std::move(destination)),
1212 fPageAlloc(std::make_unique<ROOT::Internal::RPageAllocatorHeap>()),
1213 fModel(std::move(model))
1214{
1216
1217#ifdef R__USE_IMT
1220#endif
1221}
1222
// Convenience overload: delegates to the two-argument constructor, passing a null model.
1223RNTupleMerger::RNTupleMerger(std::unique_ptr<ROOT::Internal::RPagePersistentSink> destination)
1224 : RNTupleMerger(std::move(destination), nullptr)
1225{
1226}
1227
1228ROOT::RResult<void> RNTupleMerger::Merge(std::span<RPageSource *> sources, const RNTupleMergeOptions &mergeOptsIn)
1229{
1230 RNTupleMergeOptions mergeOpts = mergeOptsIn;
1231
1232 assert(fDestination);
1233
1234 // Set compression settings if unset and verify it's compatible with the sink
1235 {
1236 const auto dstCompSettings = fDestination->GetWriteOptions().GetCompression();
1237 if (!mergeOpts.fCompressionSettings) {
1238 mergeOpts.fCompressionSettings = dstCompSettings;
1239 } else if (*mergeOpts.fCompressionSettings != dstCompSettings) {
1240 return R__FAIL(std::string("The compression given to RNTupleMergeOptions is different from that of the "
1241 "sink! (opts: ") +
1242 std::to_string(*mergeOpts.fCompressionSettings) + ", sink: " + std::to_string(dstCompSettings) +
1243 ") This is currently unsupported.");
1244 }
1245 }
1246
1247 // we should have a model if and only if the destination is initialized.
1248 if (!!fModel != fDestination->IsInitialized()) {
1249 return R__FAIL(
1250 "passing an already-initialized destination to RNTupleMerger::Merge (i.e. trying to do incremental "
1251 "merging) can only be done by providing a valid ROOT::RNTupleModel when constructing the RNTupleMerger.");
1252 }
1253
1254 RNTupleMergeData mergeData{sources, *fDestination, mergeOpts};
1255 mergeData.fNumDstEntries = mergeData.fDestination.GetNEntries();
1256
1257 if (fModel) {
1258 // If this is an incremental merging, pre-fill the column id map with the existing destination ids.
1259 // Otherwise we would generate new output ids that may not match the ones in the destination!
1260 for (const auto &field : mergeData.fDstDescriptor.GetTopLevelFields()) {
1261 PrefillColumnMap(fDestination->GetDescriptor(), field, mergeData.fColumnIdMap);
1262 }
1263 }
1264
// Handles a source that cannot be merged, depending on `mergeOpts.fErrBehavior`:
//  - kSkip: log a warning and move on to the next iteration of the enclosing loop over the sources;
//  - otherwise: return `errMsg` as a failure from the enclosing function.
// The macro is deliberately NOT wrapped in `do { ... } while (0)`: inside such a wrapper `continue` would only
// leave the wrapper's own loop, and execution would fall through into the code that was supposed to be skipped.
// The if/else form still behaves as a single statement (terminated by the caller's `;`) and cannot capture an
// `else` that follows it.
#define SKIP_OR_ABORT(errMsg)                                                      \
   if (mergeOpts.fErrBehavior == ENTupleMergeErrBehavior::kSkip) {                 \
      R__LOG_WARNING(NTupleMergeLog()) << "Skipping RNTuple due to: " << (errMsg); \
      continue;                                                                    \
   } else                                                                          \
      return R__FAIL(errMsg)
1274
1275 // Merge main loop
1276 for (RPageSource *source : sources) {
1277 // We need to make sure the streamer info from the source files is loaded otherwise we may not be able
1278 // to build the streamer info of user-defined types unless we have their dictionaries available.
1279 source->LoadStreamerInfo();
1280
1281 source->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kForWriting);
1282 auto srcDescriptor = source->GetSharedDescriptorGuard();
1283 mergeData.fSrcDescriptor = &srcDescriptor.GetRef();
1284
1285 if (mergeData.fSrcDescriptor->GetVersion() > ROOT::RNTuple::GetCurrentVersion()) {
1288 << "RNTuple '" << mergeData.fSrcDescriptor->GetName()
1289 << "' has a higher format version than the latest supported by this version "
1290 "of ROOT. Merging will work but some features may be dropped.";
1291 } else {
1292 return R__FAIL("RNTuple '" + mergeData.fSrcDescriptor->GetName() +
1293 "' has a higher format version than the latest supported by this version. Refusing to "
1294 "merge, since RNTupleMergeOptions::fVersionBehavior is set to AbortOnHigherVersion.");
1295 }
1296 }
1297
1298 // Create sink from the input model if not initialized
1299 if (!fModel) {
1300 fModel = fDestination->InitFromDescriptor(srcDescriptor.GetRef(), false /* copyClusters */);
1301 }
1302
1303 for (const auto &extraTypeInfoDesc : srcDescriptor->GetExtraTypeInfoIterable())
1304 fDestination->UpdateExtraTypeInfo(extraTypeInfoDesc);
1305
1306 auto descCmpRes = CompareDescriptorStructure(mergeData.fDstDescriptor, srcDescriptor.GetRef());
1307 if (!descCmpRes) {
1308 SKIP_OR_ABORT(std::string("Source RNTuple has an incompatible schema with the destination:\n") +
1309 descCmpRes.GetError()->GetReport());
1310 }
1311 auto descCmp = descCmpRes.Unwrap();
1312
1313 // If the current source is missing some fields and we're not in Union mode, error
1314 // (if we are in Union mode, MergeSourceClusters will fill the missing fields with default values).
1315 if (mergeOpts.fMergingMode != ENTupleMergingMode::kUnion && !descCmp.fExtraDstFields.empty()) {
1316 std::string msg = "Source RNTuple is missing the following fields:";
1317 for (const auto *field : descCmp.fExtraDstFields) {
1318 msg += "\n " + field->GetFieldName() + " : " + field->GetTypeName();
1319 }
1320 SKIP_OR_ABORT(msg);
1321 }
1322
1323 // handle extra src fields
1324 if (descCmp.fExtraSrcFields.size()) {
1325 if (mergeOpts.fMergingMode == ENTupleMergingMode::kUnion) {
1326 // late model extension for all fExtraSrcFields in Union mode
1327 auto res = ExtendDestinationModel(descCmp.fExtraSrcFields, *fModel, mergeData, descCmp.fCommonFields);
1328 if (!res)
1329 return R__FORWARD_ERROR(res);
1330 } else if (mergeOpts.fMergingMode == ENTupleMergingMode::kStrict) {
1331 // If the current source has extra fields and we're in Strict mode, error
1332 std::string msg = "Source RNTuple has extra fields that the destination RNTuple doesn't have:";
1333 for (const auto *field : descCmp.fExtraSrcFields) {
1334 msg += "\n " + field->GetFieldName() + " : " + field->GetTypeName();
1335 }
1336 SKIP_OR_ABORT(msg);
1337 }
1338 }
1339
1340 // handle extra dst fields & common fields
1341 auto columnInfos = GatherColumnInfos(descCmp, srcDescriptor.GetRef(), mergeData);
1342 auto res = MergeSourceClusters(*source, columnInfos.fCommonColumns, columnInfos.fExtraDstColumns, mergeData);
1343 if (!res)
1344 return R__FORWARD_ERROR(res);
1345 } // end loop over sources
1346
1347 if (fDestination->GetNEntries() == 0)
1348 R__LOG_WARNING(NTupleMergeLog()) << "Output RNTuple '" << fDestination->GetNTupleName() << "' has no entries.";
1349
1350 // Commit the output
1351 fDestination->CommitClusterGroup();
1352 fDestination->CommitDataset();
1353
1354 return RResult<void>::Success();
1355}
fBuffer
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking)
Definition RError.hxx:304
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:300
#define R__LOG_WARNING(...)
Definition RLogger.hxx:357
#define R__LOG_ERROR(...)
Definition RLogger.hxx:356
#define R__LOG_INFO(...)
Definition RLogger.hxx:358
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
static std::optional< std::type_index > ColumnInMemoryType(std::string_view fieldType, ENTupleColumnType onDiskType)
static ROOT::RResult< RDescriptorsComparison > CompareDescriptorStructure(const ROOT::RNTupleDescriptor &dst, const ROOT::RNTupleDescriptor &src)
Compares the top level fields of dst and src and determines whether they can be merged or not.
static ROOT::RResult< void > GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span< const RColumnMergeInfo > columns, RSealedPageMergeData &sealedPageData, ROOT::Internal::RPageAllocator &pageAlloc, const ROOT::RNTupleDescriptor &dstDescriptor, const RNTupleMergeData &mergeData)
static std::optional< ENTupleMergeErrBehavior > ParseOptionErrBehavior(const TString &opts)
static ROOT::RLogChannel & NTupleMergeLog()
#define SKIP_OR_ABORT(errMsg)
static std::optional< T > ParseStringOption(const TString &opts, const char *pattern, std::initializer_list< std::pair< const char *, T > > validValues)
static bool IsSplitOrUnsplitVersionOf(ENTupleColumnType a, ENTupleColumnType b)
static void AddColumnsFromField(std::vector< RColumnMergeInfo > &columns, const ROOT::RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData, const ROOT::RFieldDescriptor &srcFieldDesc, const ROOT::RFieldDescriptor &dstFieldDesc, const std::string &prefix="")
static std::optional< ENTupleMergingMode > ParseOptionMergingMode(const TString &opts)
static void PrefillColumnMap(const ROOT::RNTupleDescriptor &desc, const ROOT::RFieldDescriptor &fieldDesc, ColumnIdMap_t &colIdMap, const std::string &prefix="")
static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const ROOT::RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData)
static ROOT::RResult< void > ExtendDestinationModel(std::span< const ROOT::RFieldDescriptor * > newFields, ROOT::RNTupleModel &dstModel, RNTupleMergeData &mergeData, std::vector< RCommonField > &commonFields)
static std::optional< ENTupleMergeVersionBehavior > ParseOptionVersionBehavior(const TString &opts)
static bool BeginsWithDelimitedWord(const TString &str, const char *word)
#define b(i)
Definition RSha256.hxx:100
#define f(i)
Definition RSha256.hxx:104
#define a(i)
Definition RSha256.hxx:99
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
int Ssiz_t
String size (currently int)
Definition RtypesCore.h:81
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
Definition TBuffer.h:397
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t src
char name[80]
Definition TGX11.cxx:148
TRObject operator()(const T1 &t1) const
The available trivial, native content types of a column.
Given a set of RPageSources merge them into an RPagePersistentSink, optionally changing their compres...
ROOT::RResult< void > MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, const ROOT::RClusterDescriptor &clusterDesc, std::span< const RColumnMergeInfo > commonColumns, const ROOT::Internal::RCluster::ColumnSet_t &commonColumnSet, std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc)
std::unique_ptr< ROOT::RNTupleModel > fModel
RNTupleMerger(std::unique_ptr< ROOT::Internal::RPagePersistentSink > destination, std::unique_ptr< ROOT::RNTupleModel > model)
Creates a RNTupleMerger with the given destination.
ROOT::RResult< void > MergeSourceClusters(ROOT::Internal::RPageSource &source, std::span< const RColumnMergeInfo > commonColumns, std::span< const RColumnMergeInfo > extraDstColumns, RNTupleMergeData &mergeData)
std::unique_ptr< ROOT::Internal::RPagePersistentSink > fDestination
std::unique_ptr< ROOT::Internal::RPageAllocator > fPageAlloc
RResult< void > Merge(std::span< ROOT::Internal::RPageSource * > sources, const RNTupleMergeOptions &mergeOpts=RNTupleMergeOptions())
Merge a given set of sources into the destination.
A class to manage the asynchronous execution of work items.
Managed a set of clusters containing compressed and packed pages.
RCluster * GetCluster(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:148
std::unordered_set< ROOT::DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:150
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
Definition RCluster.cxx:31
A column element encapsulates the translation between basic C++ types and their column representation...
static const char * GetColumnTypeName(ROOT::ENTupleColumnType type)
static std::unique_ptr< RColumnElementBase > Generate(ROOT::ENTupleColumnType type)
If CppT == void, use the default C++ type for the given column type.
virtual RIdentifier GetIdentifier() const =0
std::size_t GetPackedSize(std::size_t nElements=1U) const
The in-memory representation of a 32bit or 64bit on-disk index column.
Holds the index and the tag of a kSwitch column.
static std::size_t Zip(const void *from, std::size_t nbytes, int compression, void *to)
Returns the size of the compressed data, written into the provided output buffer.
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size ls of the from buffer.
A helper class for serializing and deserialization of the RNTuple binary format.
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
virtual RPage NewPage(std::size_t elementSize, std::size_t nElements)=0
Reserves memory large enough to hold nElements of the given size.
Abstract interface to write data into an ntuple.
const ROOT::RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
virtual std::uint64_t CommitCluster(ROOT::NTupleSize_t nNewEntries)
Finalize the current cluster and create a new one for the following data.
virtual void UpdateSchema(const ROOT::Internal::RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry)=0
Incorporate incremental changes to the model into the ntuple descriptor.
RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element)
Helper for streaming a page.
virtual void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges)=0
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
Storage provider that reads ntuple pages from a file.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
Abstract interface to read data from an ntuple.
Common functionality of an ntuple storage for both reading and writing.
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
std::deque< RSealedPage > SealedPageSequence_t
void * GrowUnchecked(std::uint32_t nElements)
Increases the number of elements in the page.
Definition RPage.hxx:150
std::unordered_map< const ROOT::RFieldBase *, const ROOT::RFieldBase * > FieldMap_t
The map keys are the projected target fields, the map values are the backing source fields. Note that ...
RResult< void > Add(std::unique_ptr< ROOT::RFieldBase > field, const FieldMap_t &fieldMap)
Adds a new projected field.
std::optional< std::uint32_t > GetCompressionSettings() const
Metadata for RNTuple clusters.
ROOT::NTupleSize_t GetNEntries() const
ROOT::DescriptorId_t GetId() const
const RPageRange & GetPageRange(ROOT::DescriptorId_t physicalId) const
bool ContainsColumn(ROOT::DescriptorId_t physicalId) const
const RColumnRange & GetColumnRange(ROOT::DescriptorId_t physicalId) const
ROOT::NTupleSize_t GetFirstEntryIndex() const
ROOT::ENTupleColumnType GetType() const
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
Metadata stored for every field of an RNTuple.
const std::vector< ROOT::DescriptorId_t > & GetLogicalColumnIds() const
ROOT::ENTupleStructure GetStructure() const
const std::vector< ROOT::DescriptorId_t > & GetLinkIds() const
ROOT::DescriptorId_t GetParentId() const
std::uint64_t GetNRepetitions() const
ROOT::DescriptorId_t GetProjectionSourceId() const
const std::string & GetFieldName() const
const std::string & GetTypeName() const
A log configuration for a channel, e.g.
Definition RLogger.hxx:97
The on-storage metadata of an RNTuple.
const RColumnDescriptor & GetColumnDescriptor(ROOT::DescriptorId_t columnId) const
ROOT::DescriptorId_t FindNextClusterId(ROOT::DescriptorId_t clusterId) const
const RFieldDescriptor & GetFieldDescriptor(ROOT::DescriptorId_t fieldId) const
ROOT::DescriptorId_t FindClusterId(ROOT::NTupleSize_t entryIdx) const
const RClusterDescriptor & GetClusterDescriptor(ROOT::DescriptorId_t clusterId) const
ROOT::DescriptorId_t FindFieldId(std::string_view fieldName, ROOT::DescriptorId_t parentId) const
RFieldDescriptorIterable GetTopLevelFields() const
std::string GetQualifiedFieldName(ROOT::DescriptorId_t fieldId) const
Walks up the parents of the field ID and returns a field name of the form a.b.c.d. In case of invalid ...
The RNTupleModel encapsulates the schema of an RNTuple.
void Unfreeze()
Transitions an RNTupleModel from the frozen state back to the building state, invalidating all previo...
void Freeze()
Transitions an RNTupleModel from the building state to the frozen state, disabling adding additional ...
const ROOT::RFieldBase & GetConstField(std::string_view fieldName) const
Common user-tunable settings for storing RNTuples.
void SetCompression(std::uint32_t val)
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:68
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface. Merge this NTuple with the input list entries.
static constexpr std::uint64_t GetCurrentVersion()
Returns the RNTuple version in the following form: Epoch: 2 most significant bytes; Major: next 2 byte...
Definition RNTuple.hxx:90
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
Definition RError.hxx:290
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition RError.hxx:198
RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element)
Helper for streaming a page.
static RResult< ROOT::Internal::RPage > UnsealPage(const RSealedPage &sealedPage, const ROOT::Internal::RColumnElementBase &element, ROOT::Internal::RPageAllocator &pageAlloc)
Helper for unstreaming a page. This is commonly used in derived, concrete page sources....
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
Collection abstract base class.
Definition TCollection.h:65
virtual Int_t GetEntries() const
TKey * FindKey(const char *keyname) const override
Find key with name keyname in the current directory.
TObject * Get(const char *namecycle) override
Return pointer to object identified by namecycle.
A class to pass information from the TFileMerger to the objects being merged.
TString fOptions
Additional text based option being passed down to customize the merge.
A file, usually with extension .root, that stores data and code in the form of serialized objects in ...
Definition TFile.h:130
Int_t GetCompressionSettings() const
Definition TFile.h:489
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
T * ReadObject()
To read an object (non deriving from TObject) from the file.
Definition TKey.h:105
const char * GetName() const override
Returns name of object.
Definition TNamed.h:49
Mother of all ROOT objects.
Definition TObject.h:42
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Definition TObject.cxx:227
Basic string class.
Definition TString.h:138
Ssiz_t Length() const
Definition TString.h:425
@ kIgnoreCase
Definition TString.h:285
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition TString.h:641
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
Definition TString.h:660
Double_t x[n]
Definition legend1.C:17
Double_t ex[n]
Definition legend1.C:17
@ kStrict
The merger will refuse to merge any 2 RNTuples whose schema doesn't match exactly.
@ kUnion
The merger will update the output model to include all columns from all sources.
@ kFilter
The merger will discard all columns that aren't present in the prototype model (i....
@ kAbortOnHigherVersion
The merger will refuse to merge RNTuples with higher versions than the latest supported by this ROOT ...
@ kWarnOnHigherVersion
The merger will emit a warning when merging RNTuples with higher version than the latest supported by...
@ kAbort
The merger will abort merging as soon as an error is encountered.
@ kSkip
Upon errors, the merger will skip the current source and continue.
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, ROOT::ENTupleColumnType onDiskType)
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:669
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr DescriptorId_t kInvalidDescriptorId
ENTupleColumnType
const ROOT::RFieldDescriptor * fParentFieldDescriptor
std::optional< std::type_index > fInMemoryType
const ROOT::RNTupleDescriptor * fParentNTupleDescriptor
const ROOT::RNTupleDescriptor * fSrcDescriptor
RNTupleMergeData(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
const ROOT::RNTupleDescriptor & fDstDescriptor
Set of merging options to pass to RNTupleMerger.
ENTupleMergingMode fMergingMode
Determines how the merging treats sources with different models (see ENTupleMergingMode).
ENTupleMergeVersionBehavior fVersionBehavior
Determines how the Merge function behaves depending on the RNTuple sources' version.
ENTupleMergeErrBehavior fErrBehavior
Determines how the Merge function behaves upon merging errors.
std::optional< std::uint32_t > fCompressionSettings
If fCompressionSettings is empty (the default), the merger will not change the compression of any of ...
bool fExtraVerbose
If true, the merger will emit further diagnostics and information.
std::vector< RPageStorage::RSealedPageGroup > fGroups
std::vector< std::unique_ptr< std::byte[]> > fBuffers
std::deque< RPageStorage::SealedPageSequence_t > fPagesV
The incremental changes to a RNTupleModel
std::vector< ROOT::RFieldBase * > fAddedFields
Points to the fields in fModel that were added as part of an updater transaction.
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:51
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RResult< void > VerifyChecksumIfEnabled() const
void SetNElements(std::uint32_t nElements)
void SetBufferSize(std::size_t bufferSize)
@ kUseGeneralPurpose
Use the new recommended general-purpose setting; it is a best trade-off between compression ratio/dec...
Definition Compression.h:58
Parameters for the SealPage() method.
bool fWriteChecksum
Adds an 8-byte little-endian xxhash3 checksum to the page payload.
std::uint32_t fCompressionSettings
Compression algorithm and level to apply.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
const ROOT::Internal::RPage * fPage
Input page to be sealed.
const ROOT::Internal::RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RResult< void > VerifyChecksumIfEnabled() const
std::uint32_t GetNElements() const
const void * GetBuffer() const
std::size_t GetDataSize() const