Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleMerger.cxx
Go to the documentation of this file.
/// \file RNTupleMerger.cxx
/// \ingroup NTuple
/// \author Jakob Blomer <jblomer@cern.ch>, Max Orok <maxwellorok@gmail.com>, Alaettin Serhan Mete <amete@anl.gov>,
/// Giacomo Parolini <giacomo.parolini@cern.ch>
/// \date 2020-07-08
/// \warning This is part of the ROOT 7 prototype! It will
/// change without notice. It might trigger earthquakes. Feedback is welcome!

/*************************************************************************
 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/
16
17#include <ROOT/RError.hxx>
18#include <ROOT/RNTuple.hxx>
21#include <ROOT/RNTupleModel.hxx>
22#include <ROOT/RNTupleTypes.hxx>
23#include <ROOT/RNTupleUtils.hxx>
26#include <ROOT/RPageStorage.hxx>
27#include <ROOT/RClusterPool.hxx>
29#include <ROOT/RNTupleZip.hxx>
31#include <TROOT.h>
32#include <TFileMergeInfo.h>
33#include <TFile.h>
34#include <TKey.h>
35
36#include <algorithm>
37#include <deque>
38#include <initializer_list>
39#include <unordered_map>
40#include <vector>
41
52
53using namespace ROOT::Experimental::Internal;
54
56{
57 static ROOT::RLogChannel sLog("ROOT.NTuple.Merge");
58 return sLog;
59}
60
61// TFile options parsing
62// -------------------------------------------------------------------------------------
63static bool BeginsWithDelimitedWord(const TString &str, const char *word)
64{
65 const Ssiz_t wordLen = strlen(word);
66 if (str.Length() < wordLen)
67 return false;
68 if (!str.BeginsWith(word, TString::ECaseCompare::kIgnoreCase))
69 return false;
70 return str.Length() == wordLen || str(wordLen) == ' ';
71}
72
73template <typename T>
74static std::optional<T> ParseStringOption(const TString &opts, const char *pattern,
75 std::initializer_list<std::pair<const char *, T>> validValues)
76{
77 const Ssiz_t patternLen = strlen(pattern);
78 assert(pattern[patternLen - 1] == '='); // we want to parse options with the format `option=Value`
79 if (auto idx = opts.Index(pattern, 0, TString::ECaseCompare::kIgnoreCase);
80 idx >= 0 && opts.Length() > idx + patternLen) {
81 auto sub = TString(opts(idx + patternLen, opts.Length() - idx - patternLen));
82 for (const auto &[name, value] : validValues) {
83 if (BeginsWithDelimitedWord(sub, name)) {
84 return value;
85 }
86 }
87 }
88 return std::nullopt;
89}
90
91static std::optional<ENTupleMergingMode> ParseOptionMergingMode(const TString &opts)
92{
93 return ParseStringOption<ENTupleMergingMode>(opts, "rntuple.MergingMode=",
94 {
95 {"Filter", ENTupleMergingMode::kFilter},
96 {"Union", ENTupleMergingMode::kUnion},
97 {"Strict", ENTupleMergingMode::kStrict},
98 });
99}
100
101static std::optional<ENTupleMergeErrBehavior> ParseOptionErrBehavior(const TString &opts)
102{
103 return ParseStringOption<ENTupleMergeErrBehavior>(opts, "rntuple.ErrBehavior=",
104 {
105 {"Abort", ENTupleMergeErrBehavior::kAbort},
106 {"Skip", ENTupleMergeErrBehavior::kSkip},
107 });
108}
109
110static std::optional<ENTupleMergeVersionBehavior> ParseOptionVersionBehavior(const TString &opts)
111{
113 opts, "rntuple.VersionBehavior=",
114 {
115 {"WarnOnHigherVersion", ENTupleMergeVersionBehavior::kWarnOnHigherVersion},
116 {"AbortOnHigherVersion", ENTupleMergeVersionBehavior::kAbortOnHigherVersion},
117 });
118}
119// -------------------------------------------------------------------------------------
120
121// Entry point for TFileMerger. Internally calls RNTupleMerger::Merge().
123// IMPORTANT: this function must not throw, as it is used in exception-unsafe code (TFileMerger).
124try {
125 // Check the inputs
126 if (!inputs || inputs->GetEntries() < 3 || !mergeInfo) {
127 R__LOG_ERROR(NTupleMergeLog()) << "Invalid inputs.";
128 return -1;
129 }
130
131 // Parse the input parameters
133
134 // First entry is the RNTuple name
135 std::string ntupleName = std::string(itr()->GetName());
136
137 // Second entry is the output file
138 TObject *secondArg = itr();
139 TFile *outFile = dynamic_cast<TFile *>(secondArg);
140 if (!outFile) {
141 R__LOG_ERROR(NTupleMergeLog()) << "Second input parameter should be a TFile, but it's a "
142 << secondArg->ClassName() << ".";
143 return -1;
144 }
145
146 // Check if the output file already has a key with that name
147 TKey *outKey = outFile->FindKey(ntupleName.c_str());
148 ROOT::RNTuple *outNTuple = nullptr;
149 if (outKey) {
150 outNTuple = outKey->ReadObject<ROOT::RNTuple>();
151 if (!outNTuple) {
152 R__LOG_ERROR(NTupleMergeLog()) << "Output file already has key, but not of type RNTuple!";
153 return -1;
154 }
155 // In principle, we should already be working on the RNTuple object from the output file, but just continue with
156 // pointer we just got.
157 }
158
159 const bool defaultComp = mergeInfo->fOptions.Contains("DefaultCompression");
160 const bool firstSrcComp = mergeInfo->fOptions.Contains("FirstSrcCompression");
161 const bool extraVerbose = mergeInfo->fOptions.Contains("rntuple.ExtraVerbose");
162 if (defaultComp && firstSrcComp) {
163 // this should never happen through hadd, but a user may call RNTuple::Merge() from custom code.
164 R__LOG_WARNING(NTupleMergeLog()) << "Passed both options \"DefaultCompression\" and \"FirstSrcCompression\": "
165 "only the latter will apply.";
166 }
167 std::optional<std::uint32_t> compression;
168 if (firstSrcComp) {
169 // user passed -ff or -fk: use the same compression as the first RNTuple we find in the sources.
170 // (do nothing here, the compression will be fetched below)
171 } else if (!defaultComp) {
172 // compression was explicitly passed by the user: use it.
173 compression = outFile->GetCompressionSettings();
174 } else {
175 // user passed no compression-related options: use default
177 R__LOG_INFO(NTupleMergeLog()) << "Using the default compression: " << *compression;
178 }
179
180 // The remaining entries are the input files
181 std::vector<std::unique_ptr<RPageSourceFile>> sources;
182 std::vector<RPageSource *> sourcePtrs;
183
184 while (const auto &pitr = itr()) {
185 TFile *inFile = dynamic_cast<TFile *>(pitr);
186 ROOT::RNTuple *anchor = inFile ? inFile->Get<ROOT::RNTuple>(ntupleName.c_str()) : nullptr;
187 if (!anchor) {
188 R__LOG_INFO(NTupleMergeLog()) << "No RNTuple anchor named '" << ntupleName << "' from file '"
189 << inFile->GetName() << "'";
190 continue;
191 }
192
194 if (!compression) {
195 // Get the compression of this RNTuple and use it as the output compression.
196 // We currently assume all column ranges have the same compression, so we just peek at the first one.
197 source->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kRaw);
198 auto descriptor = source->GetSharedDescriptorGuard();
199 auto clusterIter = descriptor->GetClusterIterable();
201 if (firstCluster == clusterIter.end()) {
203 << "Asked to use the first source's compression as the output compression, but the "
204 "first source (file '"
205 << inFile->GetName()
206 << "') has an empty RNTuple, therefore the output compression could not be "
207 "determined.";
208 return -1;
209 }
210 auto colRangeIter = (*firstCluster).GetColumnRangeIterable();
212 if (firstColRange == colRangeIter.end()) {
214 << "Asked to use the first source's compression as the output compression, but the "
215 "first source (file '"
216 << inFile->GetName()
217 << "') has an empty RNTuple, therefore the output compression could not be "
218 "determined.";
219 return -1;
220 }
221 compression = (*firstColRange).GetCompressionSettings().value();
222 R__LOG_INFO(NTupleMergeLog()) << "Using the first RNTuple's compression: " << *compression;
223 }
224 sources.push_back(std::move(source));
225 }
226
229 writeOpts.SetCompression(*compression);
230 auto destination = std::make_unique<ROOT::Internal::RPageSinkFile>(ntupleName, *outFile, writeOpts);
231 std::unique_ptr<ROOT::RNTupleModel> model;
232 // If we already have an existing RNTuple, copy over its descriptor to support incremental merging
233 if (outNTuple) {
235 outSource->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kForWriting);
236 auto desc = outSource->GetSharedDescriptorGuard();
237 model = destination->InitFromDescriptor(desc.GetRef(), true /* copyClusters */);
238 }
239
240 // Interface conversion
241 sourcePtrs.reserve(sources.size());
242 for (const auto &s : sources) {
243 sourcePtrs.push_back(s.get());
244 }
245
246 // Now merge
247 RNTupleMerger merger{std::move(destination), std::move(model)};
249 mergerOpts.fCompressionSettings = *compression;
250 mergerOpts.fExtraVerbose = extraVerbose;
251 if (auto mergingMode = ParseOptionMergingMode(mergeInfo->fOptions)) {
252 mergerOpts.fMergingMode = *mergingMode;
253 }
254 if (auto errBehavior = ParseOptionErrBehavior(mergeInfo->fOptions)) {
255 mergerOpts.fErrBehavior = *errBehavior;
256 }
258 mergerOpts.fVersionBehavior = *versionBehavior;
259 }
260 merger.Merge(sourcePtrs, mergerOpts).ThrowOnError();
261
262 // Provide the caller with a merged anchor object (even though we've already
263 // written it).
264 *this = *outFile->Get<ROOT::RNTuple>(ntupleName.c_str());
265
266 return 0;
267} catch (const std::exception &ex) {
268 R__LOG_ERROR(NTupleMergeLog()) << "Exception thrown while merging: " << ex.what();
269 return -1;
270}
271
272namespace {
273// Functor used to change the compression of a page to `options.fCompressionSettings`.
274struct RChangeCompressionFunc {
275 const RColumnElementBase &fSrcColElement;
276 const RColumnElementBase &fDstColElement;
277 const RNTupleMergeOptions &fMergeOptions;
278
279 RPageStorage::RSealedPage &fSealedPage;
281 std::byte *fBuffer;
282 std::size_t fBufSize;
283 const ROOT::RNTupleWriteOptions &fWriteOpts;
284
285 void operator()() const
286 {
287 assert(fSrcColElement.GetIdentifier() == fDstColElement.GetIdentifier());
288
290
291 const auto bytesPacked = fSrcColElement.GetPackedSize(fSealedPage.GetNElements());
292 const auto compression = fMergeOptions.fCompressionSettings.value_or(0);
293 // TODO: this buffer could be kept and reused across pages
294 std::unique_ptr<std::byte[]> unzipBufOwned;
295 std::byte *unzipBuf;
296 if (compression != 0) {
298 unzipBuf = unzipBufOwned.get();
299 } else {
301 }
303 unzipBuf);
304
305 const auto checksumSize = fWriteOpts.GetEnablePageChecksums() * sizeof(std::uint64_t);
306 std::size_t nBytesZipped;
307 if (compression != 0) {
309 assert(fBufSize >= bytesPacked + checksumSize);
311 } else {
313 }
314
315 fSealedPage = {fBuffer, nBytesZipped + checksumSize, fSealedPage.GetNElements(), fSealedPage.GetHasChecksum()};
316 fSealedPage.ChecksumIfEnabled();
317 }
318};
319
320struct RResealFunc {
321 const RColumnElementBase &fSrcColElement;
322 const RColumnElementBase &fDstColElement;
323 const RNTupleMergeOptions &fMergeOptions;
324
325 RPageStorage::RSealedPage &fSealedPage;
327 std::byte *fBuffer;
328 std::size_t fBufSize;
329 const ROOT::RNTupleWriteOptions &fWriteOpts;
330
331 void operator()() const
332 {
333 auto page = RPageSource::UnsealPage(fSealedPage, fSrcColElement, fPageAlloc).Unwrap();
335 sealConf.fElement = &fDstColElement;
336 sealConf.fPage = &page;
337 sealConf.fBuffer = fBuffer;
338 sealConf.fCompressionSettings = *fMergeOptions.fCompressionSettings;
339 sealConf.fWriteChecksum = fWriteOpts.GetEnablePageChecksums();
340 assert(fBufSize >= fSealedPage.GetDataSize() + fSealedPage.GetHasChecksum() * sizeof(std::uint64_t));
341 auto refSealedPage = RPageSink::SealPage(sealConf);
342 fSealedPage = refSealedPage;
343 }
344};
345
346struct RTaskVisitor {
347 std::optional<ROOT::Experimental::TTaskGroup> &fGroup;
348
349 template <typename T>
350 void operator()(T &&f)
351 {
352 if (fGroup)
353 fGroup->Run(f);
354 else
355 f();
356 }
357};
358
359struct RCommonField {
360 const ROOT::RFieldDescriptor *fSrc;
361 const ROOT::RFieldDescriptor *fDst;
362
363 RCommonField(const ROOT::RFieldDescriptor &src, const ROOT::RFieldDescriptor &dst) : fSrc(&src), fDst(&dst) {}
364};
365
366struct RDescriptorsComparison {
367 std::vector<const ROOT::RFieldDescriptor *> fExtraDstFields;
368 std::vector<const ROOT::RFieldDescriptor *> fExtraSrcFields;
369 std::vector<RCommonField> fCommonFields;
370};
371
372struct RColumnOutInfo {
373 ROOT::DescriptorId_t fColumnId;
374 ENTupleColumnType fColumnType;
375};
376
377// { fully.qualified.fieldName.colInputId => colOutputInfo }
378using ColumnIdMap_t = std::unordered_map<std::string, RColumnOutInfo>;
379
380struct RColumnInfoGroup {
381 std::vector<RColumnMergeInfo> fExtraDstColumns;
382 // These are sorted by InputId
383 std::vector<RColumnMergeInfo> fCommonColumns;
384};
385
386} // namespace
387
// These structs cannot be in the anonymous namespace because they're used in RNTupleMerger's private interface.
391 // This column name is built as a dot-separated concatenation of the ancestry of
392 // the columns' parent fields' names plus the index of the column itself.
393 // e.g. "Muon.pt.x._0"
394 std::string fColumnName;
395 // The column id in the source RNTuple
397 // The corresponding column id in the destination RNTuple (the mapping happens in AddColumnsFromField())
400 // If nullopt, use the default in-memory type
401 std::optional<std::type_index> fInMemoryType;
404};
405
406// Data related to a single call of RNTupleMerger::Merge()
424
426 // We use a std::deque so that references to the contained SealedPageSequence_t, and its iterators, are
427 // never invalidated.
428 std::deque<RPageStorage::SealedPageSequence_t> fPagesV;
429 std::vector<RPageStorage::RSealedPageGroup> fGroups;
430 std::vector<std::unique_ptr<std::byte[]>> fBuffers;
431};
432
433std::ostream &operator<<(std::ostream &os, const std::optional<ROOT::RColumnDescriptor::RValueRange> &x)
434{
435 if (x) {
436 os << '(' << x->fMin << ", " << x->fMax << ')';
437 } else {
438 os << "(null)";
439 }
440 return os;
441}
442
443} // namespace ROOT::Experimental::Internal
444
446{
447 // clang-format off
468 // clang-format on
469 return false;
470}
471
472/// Compares the top level fields of `dst` and `src` and determines whether they can be merged or not.
473/// In addition, returns the differences between `dst` and `src`'s structures
476{
477 // Cases:
478 // 1. dst == src
479 // 2. dst has fields that src hasn't
480 // 3. src has fields that dst hasn't
481 // 4. dst and src have fields that differ (compatible or incompatible)
482
483 std::vector<std::string> errors;
484 RDescriptorsComparison res;
485
486 std::vector<RCommonField> commonFields;
487
488 for (const auto &dstField : dst.GetTopLevelFields()) {
489 const auto srcFieldId = src.FindFieldId(dstField.GetFieldName());
491 const auto &srcField = src.GetFieldDescriptor(srcFieldId);
492 commonFields.push_back({srcField, dstField});
493 } else {
494 res.fExtraDstFields.emplace_back(&dstField);
495 }
496 }
497 for (const auto &srcField : src.GetTopLevelFields()) {
498 const auto dstFieldId = dst.FindFieldId(srcField.GetFieldName());
500 res.fExtraSrcFields.push_back(&srcField);
501 }
502
503 // Check compatibility of common fields
505 // NOTE: using index-based for loop because the collection may get extended by the iteration
506 for (std::size_t fieldIdx = 0; fieldIdx < fieldsToCheck.size(); ++fieldIdx) {
507 const auto &field = fieldsToCheck[fieldIdx];
508
509 // NOTE: field.fSrc and field.fDst have the same name by construction
510 const auto &fieldName = field.fSrc->GetFieldName();
511
512 // Require that fields are both projected or both not projected
513 bool projCompatible = field.fSrc->IsProjectedField() == field.fDst->IsProjectedField();
514 if (!projCompatible) {
515 std::stringstream ss;
516 ss << "Field `" << fieldName << "` is incompatible with previously-seen field with that name because the "
517 << (field.fSrc->IsProjectedField() ? "new" : "old") << " one is projected and the other isn't";
518 errors.push_back(ss.str());
519 } else if (field.fSrc->IsProjectedField()) {
520 // if both fields are projected, verify that they point to the same real field
521 const auto srcName = src.GetQualifiedFieldName(field.fSrc->GetProjectionSourceId());
522 const auto dstName = dst.GetQualifiedFieldName(field.fDst->GetProjectionSourceId());
523 if (srcName != dstName) {
524 std::stringstream ss;
525 ss << "Field `" << fieldName
526 << "` is projected to a different field than a previously-seen field with the same name (old: "
527 << dstName << ", new: " << srcName << ")";
528 errors.push_back(ss.str());
529 }
530 }
531
532 // Require that fields types match
533 // TODO(gparolini): allow non-identical but compatible types
534 const auto &srcTyName = field.fSrc->GetTypeName();
535 const auto &dstTyName = field.fDst->GetTypeName();
536 if (srcTyName != dstTyName) {
537 std::stringstream ss;
538 ss << "Field `" << fieldName
539 << "` has a type incompatible with a previously-seen field with the same name: (old: " << dstTyName
540 << ", new: " << srcTyName << ")";
541 errors.push_back(ss.str());
542 }
543
544 // Require that type checksums match
545 const auto srcTyChk = field.fSrc->GetTypeChecksum();
546 const auto dstTyChk = field.fDst->GetTypeChecksum();
547 if (srcTyChk && dstTyChk && *srcTyChk != *dstTyChk) {
548 std::stringstream ss;
549 ss << "Field `" << field.fSrc->GetFieldName()
550 << "` has a different type checksum than previously-seen field with the same name";
551 errors.push_back(ss.str());
552 }
553
554 // Require that type versions match
555 const auto srcTyVer = field.fSrc->GetTypeVersion();
556 const auto dstTyVer = field.fDst->GetTypeVersion();
557 if (srcTyVer != dstTyVer) {
558 std::stringstream ss;
559 ss << "Field `" << field.fSrc->GetFieldName()
560 << "` has a different type version than previously-seen field with the same name (old: " << dstTyVer
561 << ", new: " << srcTyVer << ")";
562 errors.push_back(ss.str());
563 }
564
565 // Require that field versions match
566 const auto srcFldVer = field.fSrc->GetFieldVersion();
567 const auto dstFldVer = field.fDst->GetFieldVersion();
568 if (srcFldVer != dstFldVer) {
569 std::stringstream ss;
570 ss << "Field `" << field.fSrc->GetFieldName()
571 << "` has a different field version than previously-seen field with the same name (old: " << dstFldVer
572 << ", new: " << srcFldVer << ")";
573 errors.push_back(ss.str());
574 }
575
576 const auto srcRole = field.fSrc->GetStructure();
577 const auto dstRole = field.fDst->GetStructure();
578 if (srcRole != dstRole) {
579 std::stringstream ss;
580 ss << "Field `" << field.fSrc->GetFieldName()
581 << "` has a different structural role than previously-seen field with the same name (old: " << dstRole
582 << ", new: " << srcRole << ")";
583 errors.push_back(ss.str());
584 }
585
586 // Require that column representations match
587 const auto srcNCols = field.fSrc->GetLogicalColumnIds().size();
588 const auto dstNCols = field.fDst->GetLogicalColumnIds().size();
589 if (srcNCols != dstNCols) {
590 std::stringstream ss;
591 ss << "Field `" << field.fSrc->GetFieldName()
592 << "` has a different number of columns than previously-seen field with the same name (old: " << dstNCols
593 << ", new: " << srcNCols << ")";
594 errors.push_back(ss.str());
595 } else {
596 for (auto i = 0u; i < srcNCols; ++i) {
597 const auto srcColId = field.fSrc->GetLogicalColumnIds()[i];
598 const auto dstColId = field.fDst->GetLogicalColumnIds()[i];
599 const auto &srcCol = src.GetColumnDescriptor(srcColId);
600 const auto &dstCol = dst.GetColumnDescriptor(dstColId);
601 // TODO(gparolini): currently we refuse to merge columns of different types unless they are Split/non-Split
602 // version of the same type, because we know how to treat that specific case. We should also properly handle
603 // different but compatible types.
604 if (srcCol.GetType() != dstCol.GetType() &&
605 !IsSplitOrUnsplitVersionOf(srcCol.GetType(), dstCol.GetType())) {
606 std::stringstream ss;
607 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
608 << "` has a different column type of the same column on the previously-seen field with the same name "
609 "(old: "
611 << ", new: " << RColumnElementBase::GetColumnTypeName(dstCol.GetType()) << ")";
612 errors.push_back(ss.str());
613 }
614 if (srcCol.GetBitsOnStorage() != dstCol.GetBitsOnStorage()) {
615 std::stringstream ss;
616 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
617 << "` has a different number of bits of the same column on the previously-seen field with the same "
618 "name "
619 "(old: "
620 << srcCol.GetBitsOnStorage() << ", new: " << dstCol.GetBitsOnStorage() << ")";
621 errors.push_back(ss.str());
622 }
623 if (srcCol.GetValueRange() != dstCol.GetValueRange()) {
624 std::stringstream ss;
625 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
626 << "` has a different value range of the same column on the previously-seen field with the same name "
627 "(old: "
628 << srcCol.GetValueRange() << ", new: " << dstCol.GetValueRange() << ")";
629 errors.push_back(ss.str());
630 }
631 if (srcCol.GetRepresentationIndex() > 0) {
632 std::stringstream ss;
633 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
634 << "` has a representation index higher than 0. This is not supported yet by the merger.";
635 errors.push_back(ss.str());
636 }
637 }
638 }
639
640 // Require that subfields are compatible
641 const auto &srcLinks = field.fSrc->GetLinkIds();
642 const auto &dstLinks = field.fDst->GetLinkIds();
643 if (srcLinks.size() != dstLinks.size()) {
644 std::stringstream ss;
645 ss << "Field `" << field.fSrc->GetFieldName()
646 << "` has a different number of children than previously-seen field with the same name (old: "
647 << dstLinks.size() << ", new: " << srcLinks.size() << ")";
648 errors.push_back(ss.str());
649 } else {
650 for (std::size_t linkIdx = 0, linkNum = srcLinks.size(); linkIdx < linkNum; ++linkIdx) {
651 const auto &srcSubfield = src.GetFieldDescriptor(srcLinks[linkIdx]);
652 const auto &dstSubfield = dst.GetFieldDescriptor(dstLinks[linkIdx]);
653 fieldsToCheck.push_back(RCommonField{srcSubfield, dstSubfield});
654 }
655 }
656 }
657
658 std::string errMsg;
659 for (const auto &err : errors)
660 errMsg += std::string("\n * ") + err;
661
662 if (!errMsg.empty())
663 errMsg = errMsg.substr(1); // strip initial newline
664
665 if (errMsg.length())
666 return R__FAIL(errMsg);
667
668 res.fCommonFields = std::move(commonFields);
669
670 return ROOT::RResult(res);
671}
672
673// Applies late model extension to `destination`, adding all `newFields` to it.
674[[nodiscard]]
676ExtendDestinationModel(std::span<const ROOT::RFieldDescriptor *> newFields, ROOT::RNTupleModel &dstModel,
677 RNTupleMergeData &mergeData, std::vector<RCommonField> &commonFields)
678{
679 assert(newFields.size() > 0); // no point in calling this with 0 new cols
680
681 dstModel.Unfreeze();
683
684 if (mergeData.fMergeOpts.fExtraVerbose) {
685 std::string msg = "destination doesn't contain field";
686 if (newFields.size() > 1)
687 msg += 's';
688 msg += ' ';
689 msg += std::accumulate(newFields.begin(), newFields.end(), std::string{}, [](const auto &acc, const auto *field) {
690 return acc + (acc.length() ? ", " : "") + '`' + field->GetFieldName() + '`';
691 });
692 R__LOG_INFO(NTupleMergeLog()) << msg << ": adding " << (newFields.size() > 1 ? "them" : "it")
693 << " to the destination model (entry #" << mergeData.fNumDstEntries << ").";
694 }
695
696 changeset.fAddedFields.reserve(newFields.size());
697 // First add all non-projected fields...
698 for (const auto *fieldDesc : newFields) {
699 if (!fieldDesc->IsProjectedField()) {
700 auto field = fieldDesc->CreateField(*mergeData.fSrcDescriptor);
701 changeset.AddField(std::move(field));
702 }
703 }
704 // ...then add all projected fields.
705 for (const auto *fieldDesc : newFields) {
706 if (!fieldDesc->IsProjectedField())
707 continue;
708
710 auto field = fieldDesc->CreateField(*mergeData.fSrcDescriptor);
711 const auto sourceId = fieldDesc->GetProjectionSourceId();
712 const auto &sourceField = dstModel.GetConstField(mergeData.fSrcDescriptor->GetQualifiedFieldName(sourceId));
713 fieldMap[field.get()] = &sourceField;
714
715 for (const auto &subfield : *field) {
716 const auto &subFieldDesc = mergeData.fSrcDescriptor->GetFieldDescriptor(subfield.GetOnDiskId());
717 const auto subSourceId = subFieldDesc.GetProjectionSourceId();
718 const auto &subSourceField =
719 dstModel.GetConstField(mergeData.fSrcDescriptor->GetQualifiedFieldName(subSourceId));
721 }
722 changeset.fAddedProjectedFields.emplace_back(field.get());
724 }
725 dstModel.Freeze();
726 try {
727 mergeData.fDestination.UpdateSchema(changeset, mergeData.fNumDstEntries);
728 } catch (const ROOT::RException &ex) {
729 return R__FAIL(ex.GetError().GetReport());
730 }
731
732 commonFields.reserve(commonFields.size() + newFields.size());
733 for (const auto *field : newFields) {
734 const auto newFieldInDstId = mergeData.fDstDescriptor.FindFieldId(field->GetFieldName());
735 const auto &newFieldInDst = mergeData.fDstDescriptor.GetFieldDescriptor(newFieldInDstId);
736 commonFields.emplace_back(*field, newFieldInDst);
737 }
738
740}
741
742// Generates default (zero) values for the given columns
743[[nodiscard]]
745GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span<const RColumnMergeInfo> columns,
748{
751
752 for (const auto &column : columns) {
753 const ROOT::RFieldDescriptor *field = column.fParentFieldDescriptor;
754
755 // Skip all auxiliary columns
756 assert(!field->GetLogicalColumnIds().empty());
757 if (field->GetLogicalColumnIds()[0] != column.fInputId)
758 continue;
759
760 // Check if this column is a child of a Collection or a Variant. If so, it has no data
761 // and can be skipped.
762 bool skipColumn = false;
763 auto nRepetitions = std::max<std::uint64_t>(field->GetNRepetitions(), 1);
764 for (auto parentId = field->GetParentId(); parentId != ROOT::kInvalidDescriptorId;) {
765 const ROOT::RFieldDescriptor &parent = column.fParentNTupleDescriptor->GetFieldDescriptor(parentId);
768 skipColumn = true;
769 break;
770 }
771 nRepetitions *= std::max<std::uint64_t>(parent.GetNRepetitions(), 1);
772 parentId = parent.GetParentId();
773 }
774 if (skipColumn)
775 continue;
776
777 const auto structure = field->GetStructure();
778
779 if (structure == ROOT::ENTupleStructure::kStreamer) {
780 return R__FAIL(
781 "Destination RNTuple contains a streamer field (" + field->GetFieldName() +
782 ") that is not present in one of the sources. "
783 "Creating a default value for a streamer field is ill-defined, therefore the merging process will abort.");
784 }
785
786 // NOTE: we cannot have a Record here because it has no associated columns.
788 structure == ROOT::ENTupleStructure::kPlain);
789
790 const auto &columnDesc = dstDescriptor.GetColumnDescriptor(column.fOutputId);
791 const auto colElement = RColumnElementBase::Generate(columnDesc.GetType());
793 const auto nBytesOnStorage = colElement->GetPackedSize(nElements);
794 // TODO(gparolini): make this configurable
795 constexpr auto kPageSizeLimit = 256 * 1024;
796 // TODO(gparolini): consider coalescing the last page if its size is less than some threshold
798 for (size_t i = 0; i < nPages; ++i) {
799 const auto pageSize = (i < nPages - 1) ? kPageSizeLimit : nBytesOnStorage - kPageSizeLimit * (nPages - 1);
801 const auto bufSize = pageSize + checksumSize;
802 assert(pageSize % colElement->GetSize() == 0);
803 const auto nElementsPerPage = pageSize / colElement->GetSize();
804 auto page = pageAlloc.NewPage(colElement->GetSize(), nElementsPerPage);
805 page.GrowUnchecked(nElementsPerPage);
806 memset(page.GetBuffer(), 0, page.GetNBytes());
807
808 auto &buffer = sealedPageData.fBuffers.emplace_back(new std::byte[bufSize]);
810 sealConf.fElement = colElement.get();
811 sealConf.fPage = &page;
812 sealConf.fBuffer = buffer.get();
813 sealConf.fCompressionSettings = mergeData.fMergeOpts.fCompressionSettings.value();
814 sealConf.fWriteChecksum = mergeData.fDestination.GetWriteOptions().GetEnablePageChecksums();
816
817 sealedPageData.fPagesV.push_back({sealedPage});
818 sealedPageData.fGroups.emplace_back(column.fOutputId, sealedPageData.fPagesV.back().cbegin(),
819 sealedPageData.fPagesV.back().cend());
820 }
821 }
823}
824
825// Merges all columns appearing both in the source and destination RNTuples, just copying them if their
826// compression matches ("fast merge") or by unsealing and resealing them with the proper compression.
830 std::span<const RColumnMergeInfo> commonColumns,
834{
839
840 const RCluster *cluster = clusterPool.GetCluster(clusterDesc.GetId(), commonColumnSet);
841 // we expect the cluster pool to contain the requested set of columns, since they were
842 // validated by CompareDescriptorStructure().
844
845 for (size_t colIdx = 0; colIdx < nCommonColumnsInCluster; ++colIdx) {
846 const auto &column = commonColumns[colIdx];
847 const auto &columnId = column.fInputId;
848 R__ASSERT(clusterDesc.ContainsColumn(columnId));
849
850 const auto &columnDesc = mergeData.fSrcDescriptor->GetColumnDescriptor(columnId);
851 const auto srcColElement = column.fInMemoryType
852 ? ROOT::Internal::GenerateColumnElement(*column.fInMemoryType, columnDesc.GetType())
854 const auto dstColElement = column.fInMemoryType
855 ? ROOT::Internal::GenerateColumnElement(*column.fInMemoryType, column.fColumnType)
856 : RColumnElementBase::Generate(column.fColumnType);
857
858 // Now get the pages for this column in this cluster
859 const auto &pages = clusterDesc.GetPageRange(columnId);
860
862 sealedPages.resize(pages.GetPageInfos().size());
863
864 // Each column range potentially has a distinct compression settings
865 const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).GetCompressionSettings().value();
866
867 // Select "merging level". There are 3 levels, from fastest to slowest, depending on the case:
868 // L1: compression and encoding of src and dest both match: we can simply copy the page
869 // L2: compression of dest doesn't match the src but encoding does: we must recompress the page but can avoid
870 // resealing it.
871 // L3: on-disk encoding doesn't match: we need to reseal the page, which implies decompressing and recompressing
872 // it.
873 const bool compressionIsDifferent =
874 colRangeCompressionSettings != mergeData.fMergeOpts.fCompressionSettings.value();
875 const bool needsResealing =
876 srcColElement->GetIdentifier().fOnDiskType != dstColElement->GetIdentifier().fOnDiskType;
878
879 if (needsRecompressing && mergeData.fMergeOpts.fExtraVerbose) {
881 << (needsResealing ? "Resealing" : "Recompressing") << " column " << column.fColumnName
882 << ": { compression: " << colRangeCompressionSettings << " => "
883 << mergeData.fMergeOpts.fCompressionSettings.value()
884 << ", onDiskType: " << RColumnElementBase::GetColumnTypeName(srcColElement->GetIdentifier().fOnDiskType)
885 << " => " << RColumnElementBase::GetColumnTypeName(dstColElement->GetIdentifier().fOnDiskType);
886 }
887
888 size_t pageBufferBaseIdx = sealedPageData.fBuffers.size();
889 // If the column range already has the right compression we don't need to allocate any new buffer, so we don't
890 // bother reserving memory for them.
892 sealedPageData.fBuffers.resize(sealedPageData.fBuffers.size() + pages.GetPageInfos().size());
893
894 // If this column is deferred, we may need to fill "holes" until its real start. We fill any missing entry
895 // with zeroes, like we do for extraDstColumns.
896 // As an optimization, we don't do this for the first source (since we can rely on the FirstElementIndex and
897 // deferred column mechanism in that case).
898 // TODO: also avoid doing this if we added no real page of this column to the destination yet.
899 if (columnDesc.GetFirstElementIndex() > clusterDesc.GetFirstEntryIndex() && mergeData.fNumDstEntries > 0) {
900 const auto nMissingEntries = columnDesc.GetFirstElementIndex() - clusterDesc.GetFirstEntryIndex();
902 mergeData.fDstDescriptor, mergeData);
903 if (!res)
904 return R__FORWARD_ERROR(res);
905 }
906
907 // Loop over the pages
908 std::uint64_t pageIdx = 0;
909 for (const auto &pageInfo : pages.GetPageInfos()) {
910 assert(pageIdx < sealedPages.size());
911 assert(sealedPageData.fBuffers.size() == 0 || pageIdx < sealedPageData.fBuffers.size());
912 assert(pageInfo.GetLocator().GetType() != RNTupleLocator::kTypePageZero);
913
915 auto onDiskPage = cluster->GetOnDiskPage(key);
916
917 const auto checksumSize = pageInfo.HasChecksum() * RPageStorage::kNBytesPageChecksum;
919 sealedPage.SetNElements(pageInfo.GetNElements());
920 sealedPage.SetHasChecksum(pageInfo.HasChecksum());
921 sealedPage.SetBufferSize(pageInfo.GetLocator().GetNBytesOnStorage() + checksumSize);
922 sealedPage.SetBuffer(onDiskPage->GetAddress());
923 // TODO(gparolini): more graceful error handling (skip the page?)
924 sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
925 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
926
927 if (needsRecompressing) {
928 const auto uncompressedSize = srcColElement->GetSize() * sealedPage.GetNElements();
929 auto &buffer = sealedPageData.fBuffers[pageBufferBaseIdx + pageIdx];
931 // NOTE: we currently allocate the max possible size for this buffer and don't shrink it afterward.
932 // We might want to introduce an option that trades speed for memory usage and shrink the buffer to fit
933 // the actual data size after recompressing.
935
936 // clang-format off
937 if (needsResealing) {
938 RTaskVisitor{fTaskGroup}(RResealFunc{
941 mergeData.fMergeOpts,
943 *fPageAlloc,
944 buffer.get(),
945 bufSize,
946 mergeData.fDestination.GetWriteOptions()
947 });
948 } else {
949 RTaskVisitor{fTaskGroup}(RChangeCompressionFunc{
952 mergeData.fMergeOpts,
954 *fPageAlloc,
955 buffer.get(),
956 bufSize,
957 mergeData.fDestination.GetWriteOptions()
958 });
959 }
960 // clang-format on
961 }
962
963 ++pageIdx;
964
965 } // end of loop over pages
966
967 if (fTaskGroup)
968 fTaskGroup->Wait();
969
970 sealedPageData.fPagesV.push_back(std::move(sealedPages));
971 sealedPageData.fGroups.emplace_back(column.fOutputId, sealedPageData.fPagesV.back().cbegin(),
972 sealedPageData.fPagesV.back().cend());
973 } // end loop over common columns
974
976}
977
978// Iterates over all clusters of `source` and merges their pages into `destination`.
979// It is assumed that all columns in `commonColumns` are present (and compatible) in both the source and
980// the destination's schemas.
981// The pages may be "fast-merged" (i.e. simply copied with no decompression/recompression) if the target
982// compression is unspecified or matches the original compression settings.
985 std::span<const RColumnMergeInfo> extraDstColumns, RNTupleMergeData &mergeData)
986{
988
989 std::vector<RColumnMergeInfo> missingColumns{extraDstColumns.begin(), extraDstColumns.end()};
990
991 // Loop over all clusters in this file.
992 // descriptor->GetClusterIterable() doesn't guarantee any specific order, so we explicitly
993 // request the first cluster.
994 ROOT::DescriptorId_t clusterId = mergeData.fSrcDescriptor->FindClusterId(0, 0);
996 const auto &clusterDesc = mergeData.fSrcDescriptor->GetClusterDescriptor(clusterId);
997 const auto nClusterEntries = clusterDesc.GetNEntries();
999
1000 // NOTE: just because a column is in `commonColumns` it doesn't mean that each cluster in the source contains it,
1001 // as it may be a deferred column that only has real data in a future cluster.
1002 // We need to figure out which columns are actually present in this cluster so we only merge their pages (the
1003 // missing columns are handled by synthesizing zero pages - see below).
1004 size_t nCommonColumnsInCluster = commonColumns.size();
1005 while (nCommonColumnsInCluster > 0) {
1006 // Since `commonColumns` is sorted by column input id, we can simply traverse it from the back and stop as
1007 // soon as we find a common column that appears in this cluster: we know that in that case all previous
1008 // columns must appear as well.
1009 if (clusterDesc.ContainsColumn(commonColumns[nCommonColumnsInCluster - 1].fInputId))
1010 break;
1012 }
1013
1014 // Convert columns to a ColumnSet for the ClusterPool query
1017 for (size_t i = 0; i < nCommonColumnsInCluster; ++i)
1018 commonColumnSet.emplace(commonColumns[i].fInputId);
1019
1020 // For each cluster, the "missing columns" are the union of the extraDstColumns and the common columns
1021 // that are not present in the cluster. We generate zero pages for all of them.
1022 missingColumns.resize(extraDstColumns.size());
1023 for (size_t i = nCommonColumnsInCluster; i < commonColumns.size(); ++i)
1024 missingColumns.push_back(commonColumns[i]);
1025
1028 sealedPageData, mergeData, *fPageAlloc);
1029 if (!res)
1030 return R__FORWARD_ERROR(res);
1031
1033 mergeData.fDstDescriptor, mergeData);
1034 if (!res)
1035 return R__FORWARD_ERROR(res);
1036
1037 // Commit the pages and the clusters
1038 mergeData.fDestination.CommitSealedPageV(sealedPageData.fGroups);
1039 mergeData.fDestination.CommitCluster(nClusterEntries);
1040 mergeData.fNumDstEntries += nClusterEntries;
1041
1042 // Go to the next cluster
1043 clusterId = mergeData.fSrcDescriptor->FindNextClusterId(clusterId);
1044 }
1045
1046 // TODO(gparolini): when we get serious about huge file support (>~ 100GB) we might want to check here
1047 // the size of the running page list and commit a cluster group when it exceeds some threshold,
1048 // which would prevent the page list from getting too large.
1049 // However, as of today, we aren't really handling such huge files, and even relatively big ones
1050 // such as the CMS dataset have a page list size of about only 2 MB.
1051 // So currently we simply merge all cluster groups into one.
1053}
1054
1055static std::optional<std::type_index> ColumnInMemoryType(std::string_view fieldType, ENTupleColumnType onDiskType)
1056{
1059 return typeid(ROOT::Internal::RColumnIndex);
1060
1062 return typeid(ROOT::Internal::RColumnSwitch);
1063
1064 // clang-format off
1065 if (fieldType == "bool") return typeid(bool);
1066 if (fieldType == "std::byte") return typeid(std::byte);
1067 if (fieldType == "char") return typeid(char);
1068 if (fieldType == "std::int8_t") return typeid(std::int8_t);
1069 if (fieldType == "std::uint8_t") return typeid(std::uint8_t);
1070 if (fieldType == "std::int16_t") return typeid(std::int16_t);
1071 if (fieldType == "std::uint16_t") return typeid(std::uint16_t);
1072 if (fieldType == "std::int32_t") return typeid(std::int32_t);
1073 if (fieldType == "std::uint32_t") return typeid(std::uint32_t);
1074 if (fieldType == "std::int64_t") return typeid(std::int64_t);
1075 if (fieldType == "std::uint64_t") return typeid(std::uint64_t);
1076 if (fieldType == "float") return typeid(float);
1077 if (fieldType == "double") return typeid(double);
1078 // clang-format on
1079
1080 // if the type is not one of those above, we use the default in-memory type.
1081 return std::nullopt;
1082}
1083
1084// Given a field, fill `columns` and `mergeData.fColumnIdMap` with information about all columns belonging to it and its
1085// subfields. `mergeData.fColumnIdMap` is used to map matching columns from different sources to the same output column
1086// in the destination. We match columns by their "fully qualified name", which is the concatenation of their ancestor
1087// fields' names and the column index. By this point, since we called `CompareDescriptorStructure()` earlier, we should
1088// be guaranteed that two matching columns will have at least compatible representations. NOTE: srcFieldDesc and
1089// dstFieldDesc may alias.
1090static void AddColumnsFromField(std::vector<RColumnMergeInfo> &columns, const ROOT::RNTupleDescriptor &srcDesc,
1092 const ROOT::RFieldDescriptor &dstFieldDesc, const std::string &prefix = "")
1093{
1094 std::string name = prefix + '.' + srcFieldDesc.GetFieldName();
1095
1096 const auto &columnIds = srcFieldDesc.GetLogicalColumnIds();
1097 columns.reserve(columns.size() + columnIds.size());
1098 // NOTE: here we can match the src and dst columns by column index because we forbid merging fields with
1099 // different column representations.
1100 for (auto i = 0u; i < srcFieldDesc.GetLogicalColumnIds().size(); ++i) {
1101 // We don't want to try and merge alias columns
1102 if (srcFieldDesc.IsProjectedField())
1103 continue;
1104
1105 auto srcColumnId = srcFieldDesc.GetLogicalColumnIds()[i];
1106 const auto &srcColumn = srcDesc.GetColumnDescriptor(srcColumnId);
1107
1109 info.fColumnName = name + '.' + std::to_string(srcColumn.GetIndex());
1110 info.fInputId = srcColumn.GetPhysicalId();
1111 // NOTE(gparolini): the parent field is used when synthesizing zero pages, which happens in 2 situations:
1112 // 1. when adding extra dst columns (in which case we need to synthesize zero pages for the incoming src), and
1113 // 2. when merging a deferred column into an existing column (in which case we need to fill the "hole" with
1114 // zeroes). For the first case srcFieldDesc and dstFieldDesc are the same (see the calling site of this function),
1115 // but for the second case they're not, and we need to pick the source field because we will then check the
1116 // column's *input* id inside fParentFieldDescriptor to see if it's a suppressed column (see
1117 // GenerateZeroPagesForColumns()).
1118 info.fParentFieldDescriptor = &srcFieldDesc;
1119 // Save the parent field descriptor since this may be either the source or destination descriptor depending on
1120 // whether this is an extraDstField or a commonField. We will need this in GenerateZeroPagesForColumns() to
1121 // properly walk up the field hierarchy.
1122 info.fParentNTupleDescriptor = &srcDesc;
1123
1124 if (auto it = mergeData.fColumnIdMap.find(info.fColumnName); it != mergeData.fColumnIdMap.end()) {
1125 info.fOutputId = it->second.fColumnId;
1126 info.fColumnType = it->second.fColumnType;
1127 } else {
1128 info.fOutputId = mergeData.fColumnIdMap.size();
1129 // NOTE(gparolini): map the type of src column to the type of dst column.
1130 // This mapping is only relevant for common columns and it's done to ensure we keep a consistent
1131 // on-disk representation of the same column.
1132 // This is also important to do for first source when it is used to generate the destination sink,
1133 // because even in that case their column representations may differ.
1134 // e.g. if the destination has a different compression than the source, an integer column might be
1135 // zigzag-encoded in the source but not in the destination.
1136 auto dstColumnId = dstFieldDesc.GetLogicalColumnIds()[i];
1137 const auto &dstColumn = mergeData.fDstDescriptor.GetColumnDescriptor(dstColumnId);
1138 info.fColumnType = dstColumn.GetType();
1139 mergeData.fColumnIdMap[info.fColumnName] = {info.fOutputId, info.fColumnType};
1140 }
1141
1142 if (mergeData.fMergeOpts.fExtraVerbose) {
1143 R__LOG_INFO(NTupleMergeLog()) << "Adding column " << info.fColumnName << " with log.id " << srcColumnId
1144 << ", phys.id " << srcColumn.GetPhysicalId() << ", type "
1145 << RColumnElementBase::GetColumnTypeName(srcColumn.GetType()) << " -> log.id "
1146 << info.fOutputId << ", type "
1148 }
1149
1150 // Since we disallow merging fields of different types, src and dstFieldDesc must have the same type name.
1151 assert(srcFieldDesc.GetTypeName() == dstFieldDesc.GetTypeName());
1152 info.fInMemoryType = ColumnInMemoryType(srcFieldDesc.GetTypeName(), info.fColumnType);
1153 columns.emplace_back(info);
1154 }
1155
1156 const auto &srcChildrenIds = srcFieldDesc.GetLinkIds();
1157 const auto &dstChildrenIds = dstFieldDesc.GetLinkIds();
1158 assert(srcChildrenIds.size() == dstChildrenIds.size());
1159 for (auto i = 0u; i < srcChildrenIds.size(); ++i) {
1160 const auto &srcChild = srcDesc.GetFieldDescriptor(srcChildrenIds[i]);
1161 const auto &dstChild = mergeData.fDstDescriptor.GetFieldDescriptor(dstChildrenIds[i]);
1163 }
1164}
1165
1166// Converts the fields comparison data to the corresponding column information.
1167// While doing so, it collects such information in `mergeData.fColumnIdMap`, which is used by later calls to this
1168// function to map already-seen column names to their chosen outputId, type and so on.
1169static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const ROOT::RNTupleDescriptor &srcDesc,
1171{
1172 RColumnInfoGroup res;
1173 for (const ROOT::RFieldDescriptor *field : descCmp.fExtraDstFields) {
1174 AddColumnsFromField(res.fExtraDstColumns, mergeData.fDstDescriptor, mergeData, *field, *field);
1175 }
1176 for (const auto &[srcField, dstField] : descCmp.fCommonFields) {
1177 AddColumnsFromField(res.fCommonColumns, srcDesc, mergeData, *srcField, *dstField);
1178 }
1179
1180 // Sort the commonColumns by ID so we can more easily tell how many common columns each cluster has
1181 // (since each cluster must contain all columns of the previous cluster plus potentially some new ones)
1182 std::sort(res.fCommonColumns.begin(), res.fCommonColumns.end(),
1183 [](const auto &a, const auto &b) { return a.fInputId < b.fInputId; });
1184
1185 return res;
1186}
1187
1189 ColumnIdMap_t &colIdMap, const std::string &prefix = "")
1190{
1191 std::string name = prefix + '.' + fieldDesc.GetFieldName();
1192 for (const auto &colId : fieldDesc.GetLogicalColumnIds()) {
1193 const auto &colDesc = desc.GetColumnDescriptor(colId);
1194 RColumnOutInfo info{};
1195 const auto colName = name + '.' + std::to_string(colDesc.GetIndex());
1196 info.fColumnId = colDesc.GetLogicalId();
1197 info.fColumnType = colDesc.GetType();
1199 }
1200
1201 for (const auto &subId : fieldDesc.GetLinkIds()) {
1202 const auto &subfield = desc.GetFieldDescriptor(subId);
1204 }
1205}
1206
1207RNTupleMerger::RNTupleMerger(std::unique_ptr<ROOT::Internal::RPagePersistentSink> destination,
1208 std::unique_ptr<ROOT::RNTupleModel> model)
1209 // TODO(gparolini): consider using an arena allocator instead, since we know the precise lifetime
1210 // of the RNTuples we are going to handle (e.g. we can reset the arena at every source)
1211 : fDestination(std::move(destination)),
1212 fPageAlloc(std::make_unique<ROOT::Internal::RPageAllocatorHeap>()),
1213 fModel(std::move(model))
1214{
1216
1217#ifdef R__USE_IMT
1220#endif
1221}
1222
// Constructs a merger without a model (the non-incremental case) by delegating to the two-argument constructor with
// a null model. With no model, Merge() requires `destination` to be not yet initialized (it returns an error
// otherwise). It then initializes the destination from the descriptor of the first source it processes.
1223RNTupleMerger::RNTupleMerger(std::unique_ptr<ROOT::Internal::RPagePersistentSink> destination)
1224 : RNTupleMerger(std::move(destination), nullptr)
1225{
1226}
1227
1229{
1231
1233
1234 // Set compression settings if unset and verify it's compatible with the sink
1235 {
1236 const auto dstCompSettings = fDestination->GetWriteOptions().GetCompression();
1237 if (!mergeOpts.fCompressionSettings) {
1238 mergeOpts.fCompressionSettings = dstCompSettings;
1239 } else if (*mergeOpts.fCompressionSettings != dstCompSettings) {
1240 return R__FAIL(std::string("The compression given to RNTupleMergeOptions is different from that of the "
1241 "sink! (opts: ") +
1242 std::to_string(*mergeOpts.fCompressionSettings) + ", sink: " + std::to_string(dstCompSettings) +
1243 ") This is currently unsupported.");
1244 }
1245 }
1246
1247 // we should have a model if and only if the destination is initialized.
1248 if (!!fModel != fDestination->IsInitialized()) {
1249 return R__FAIL(
1250 "passing an already-initialized destination to RNTupleMerger::Merge (i.e. trying to do incremental "
1251 "merging) can only be done by providing a valid ROOT::RNTupleModel when constructing the RNTupleMerger.");
1252 }
1253
1255 mergeData.fNumDstEntries = mergeData.fDestination.GetNEntries();
1256
1257 if (fModel) {
1258 // If this is an incremental merging, pre-fill the column id map with the existing destination ids.
1259 // Otherwise we would generate new output ids that may not match the ones in the destination!
1260 for (const auto &field : mergeData.fDstDescriptor.GetTopLevelFields()) {
1261 PrefillColumnMap(fDestination->GetDescriptor(), field, mergeData.fColumnIdMap);
1262 }
1263 }
1264
1265#define SKIP_OR_ABORT(errMsg) \
1266 do { \
1267 if (mergeOpts.fErrBehavior == ENTupleMergeErrBehavior::kSkip) { \
1268 R__LOG_WARNING(NTupleMergeLog()) << "Skipping RNTuple due to: " << (errMsg); \
1269 continue; \
1270 } else { \
1271 return R__FAIL(errMsg); \
1272 } \
1273 } while (0)
1274
1275 // Merge main loop
1276 for (RPageSource *source : sources) {
1277 // We need to make sure the streamer info from the source files is loaded otherwise we may not be able
1278 // to build the streamer info of user-defined types unless we have their dictionaries available.
1279 source->LoadStreamerInfo();
1280
1281 source->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kForWriting);
1282 auto srcDescriptor = source->GetSharedDescriptorGuard();
1283 mergeData.fSrcDescriptor = &srcDescriptor.GetRef();
1284
1285 if (mergeData.fSrcDescriptor->GetVersion() > ROOT::RNTuple::GetCurrentVersion()) {
1288 << "RNTuple '" << mergeData.fSrcDescriptor->GetName()
1289 << "' has a higher format version than the latest supported by this version "
1290 "of ROOT. Merging will work but some features may be dropped.";
1291 } else {
1292 return R__FAIL("RNTuple '" + mergeData.fSrcDescriptor->GetName() +
1293 "' has a higher format version than the latest supported by this version. Refusing to "
1294 "merge, since RNTupleMergeOptions::fVersionBehavior is set to AbortOnHigherVersion.");
1295 }
1296 }
1297
1298 // Create sink from the input model if not initialized
1299 if (!fModel) {
1300 fModel = fDestination->InitFromDescriptor(srcDescriptor.GetRef(), false /* copyClusters */);
1301 }
1302
1303 for (const auto &extraTypeInfoDesc : srcDescriptor->GetExtraTypeInfoIterable())
1304 fDestination->UpdateExtraTypeInfo(extraTypeInfoDesc);
1305
1306 auto descCmpRes = CompareDescriptorStructure(mergeData.fDstDescriptor, srcDescriptor.GetRef());
1307 if (!descCmpRes) {
1308 SKIP_OR_ABORT(std::string("Source RNTuple has an incompatible schema with the destination:\n") +
1309 descCmpRes.GetError()->GetReport());
1310 }
1311 auto descCmp = descCmpRes.Unwrap();
1312
1313 // If the current source is missing some fields and we're not in Union mode, error
1314 // (if we are in Union mode, MergeSourceClusters will fill the missing fields with default values).
1315 if (mergeOpts.fMergingMode != ENTupleMergingMode::kUnion && !descCmp.fExtraDstFields.empty()) {
1316 std::string msg = "Source RNTuple is missing the following fields:";
1317 for (const auto *field : descCmp.fExtraDstFields) {
1318 msg += "\n " + field->GetFieldName() + " : " + field->GetTypeName();
1319 }
1321 }
1322
1323 // handle extra src fields
1324 if (descCmp.fExtraSrcFields.size()) {
1325 if (mergeOpts.fMergingMode == ENTupleMergingMode::kUnion) {
1326 // late model extension for all fExtraSrcFields in Union mode
1327 auto res = ExtendDestinationModel(descCmp.fExtraSrcFields, *fModel, mergeData, descCmp.fCommonFields);
1328 if (!res)
1329 return R__FORWARD_ERROR(res);
1330 } else if (mergeOpts.fMergingMode == ENTupleMergingMode::kStrict) {
1331 // If the current source has extra fields and we're in Strict mode, error
1332 std::string msg = "Source RNTuple has extra fields that the destination RNTuple doesn't have:";
1333 for (const auto *field : descCmp.fExtraSrcFields) {
1334 msg += "\n " + field->GetFieldName() + " : " + field->GetTypeName();
1335 }
1337 }
1338 }
1339
1340 // handle extra dst fields & common fields
1342 auto res = MergeSourceClusters(*source, columnInfos.fCommonColumns, columnInfos.fExtraDstColumns, mergeData);
1343 if (!res)
1344 return R__FORWARD_ERROR(res);
1345 } // end loop over sources
1346
1347 if (fDestination->GetNEntries() == 0)
1348 R__LOG_WARNING(NTupleMergeLog()) << "Output RNTuple '" << fDestination->GetNTupleName() << "' has no entries.";
1349
1350 // Commit the output
1351 fDestination->CommitClusterGroup();
1352 fDestination->CommitDataset();
1353
1354 return RResult<void>::Success();
1355}
fBuffer
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking)
Definition RError.hxx:304
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:300
#define R__LOG_WARNING(...)
Definition RLogger.hxx:357
#define R__LOG_ERROR(...)
Definition RLogger.hxx:356
#define R__LOG_INFO(...)
Definition RLogger.hxx:358
static std::optional< std::type_index > ColumnInMemoryType(std::string_view fieldType, ENTupleColumnType onDiskType)
static ROOT::RResult< RDescriptorsComparison > CompareDescriptorStructure(const ROOT::RNTupleDescriptor &dst, const ROOT::RNTupleDescriptor &src)
Compares the top level fields of dst and src and determines whether they can be merged or not.
static ROOT::RResult< void > GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span< const RColumnMergeInfo > columns, RSealedPageMergeData &sealedPageData, ROOT::Internal::RPageAllocator &pageAlloc, const ROOT::RNTupleDescriptor &dstDescriptor, const RNTupleMergeData &mergeData)
static std::optional< ENTupleMergeErrBehavior > ParseOptionErrBehavior(const TString &opts)
static ROOT::RLogChannel & NTupleMergeLog()
#define SKIP_OR_ABORT(errMsg)
static std::optional< T > ParseStringOption(const TString &opts, const char *pattern, std::initializer_list< std::pair< const char *, T > > validValues)
static bool IsSplitOrUnsplitVersionOf(ENTupleColumnType a, ENTupleColumnType b)
static void AddColumnsFromField(std::vector< RColumnMergeInfo > &columns, const ROOT::RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData, const ROOT::RFieldDescriptor &srcFieldDesc, const ROOT::RFieldDescriptor &dstFieldDesc, const std::string &prefix="")
static std::optional< ENTupleMergingMode > ParseOptionMergingMode(const TString &opts)
static void PrefillColumnMap(const ROOT::RNTupleDescriptor &desc, const ROOT::RFieldDescriptor &fieldDesc, ColumnIdMap_t &colIdMap, const std::string &prefix="")
static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const ROOT::RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData)
static ROOT::RResult< void > ExtendDestinationModel(std::span< const ROOT::RFieldDescriptor * > newFields, ROOT::RNTupleModel &dstModel, RNTupleMergeData &mergeData, std::vector< RCommonField > &commonFields)
static std::optional< ENTupleMergeVersionBehavior > ParseOptionVersionBehavior(const TString &opts)
static bool BeginsWithDelimitedWord(const TString &str, const char *word)
#define b(i)
Definition RSha256.hxx:100
#define f(i)
Definition RSha256.hxx:104
#define a(i)
Definition RSha256.hxx:99
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t src
char name[80]
Definition TGX11.cxx:148
TRObject operator()(const T1 &t1) const
The available trivial, native content types of a column.
Given a set of RPageSources merge them into an RPagePersistentSink, optionally changing their compres...
ROOT::RResult< void > MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, const ROOT::RClusterDescriptor &clusterDesc, std::span< const RColumnMergeInfo > commonColumns, const ROOT::Internal::RCluster::ColumnSet_t &commonColumnSet, std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc)
std::unique_ptr< ROOT::RNTupleModel > fModel
RNTupleMerger(std::unique_ptr< ROOT::Internal::RPagePersistentSink > destination, std::unique_ptr< ROOT::RNTupleModel > model)
Creates a RNTupleMerger with the given destination.
ROOT::RResult< void > MergeSourceClusters(ROOT::Internal::RPageSource &source, std::span< const RColumnMergeInfo > commonColumns, std::span< const RColumnMergeInfo > extraDstColumns, RNTupleMergeData &mergeData)
std::unique_ptr< ROOT::Internal::RPagePersistentSink > fDestination
RResult< void > Merge(std::span< ROOT::Internal::RPageSource * > sources, const RNTupleMergeOptions &mergeOpts=RNTupleMergeOptions())
Merge a given set of sources into the destination.
A class to manage the asynchronous execution of work items.
Manages a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:148
std::unordered_set< ROOT::DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:150
A column element encapsulates the translation between basic C++ types and their column representation...
static const char * GetColumnTypeName(ROOT::ENTupleColumnType type)
static std::unique_ptr< RColumnElementBase > Generate(ROOT::ENTupleColumnType type)
If CppT == void, use the default C++ type for the given column type.
virtual RIdentifier GetIdentifier() const =0
std::size_t GetPackedSize(std::size_t nElements=1U) const
The in-memory representation of a 32bit or 64bit on-disk index column.
Holds the index and the tag of a kSwitch column.
static std::size_t Zip(const void *from, std::size_t nbytes, int compression, void *to)
Returns the size of the compressed data, written into the provided output buffer.
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size ls of the from buffer.
A helper class for serializing and deserialization of the RNTuple binary format.
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
Abstract interface to write data into an ntuple.
RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element)
Helper for streaming a page.
Storage provider that reads ntuple pages from a file.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
Abstract interface to read data from an ntuple.
Common functionality of an ntuple storage for both reading and writing.
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
std::deque< RSealedPage > SealedPageSequence_t
std::unordered_map< const ROOT::RFieldBase *, const ROOT::RFieldBase * > FieldMap_t
The map keys are the projected target fields, the map values are the backing source fields Note that ...
RResult< void > Add(std::unique_ptr< ROOT::RFieldBase > field, const FieldMap_t &fieldMap)
Adds a new projected field.
Metadata for RNTuple clusters.
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
Metadata stored for every field of an RNTuple.
ROOT::ENTupleStructure GetStructure() const
ROOT::DescriptorId_t GetParentId() const
std::uint64_t GetNRepetitions() const
A log configuration for a channel, e.g.
Definition RLogger.hxx:97
The on-storage metadata of an RNTuple.
const RColumnDescriptor & GetColumnDescriptor(ROOT::DescriptorId_t columnId) const
const RFieldDescriptor & GetFieldDescriptor(ROOT::DescriptorId_t fieldId) const
The RNTupleModel encapsulates the schema of an RNTuple.
Common user-tunable settings for storing RNTuples.
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:68
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface Merge this NTuple with the input list entries.
static constexpr std::uint64_t GetCurrentVersion()
Returns the RNTuple version in the following form: Epoch: 2 most significant bytes Major: next 2 byte...
Definition RNTuple.hxx:90
const_iterator begin() const
const_iterator end() const
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
Definition RError.hxx:290
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition RError.hxx:198
Collection abstract base class.
Definition TCollection.h:65
A class to pass information from the TFileMerger to the objects being merged.
A file, usually with extension .root, that stores data and code in the form of serialized objects in ...
Definition TFile.h:130
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
Mother of all ROOT objects.
Definition TObject.h:42
Basic string class.
Definition TString.h:138
@ kIgnoreCase
Definition TString.h:285
Double_t x[n]
Definition legend1.C:17
Double_t ex[n]
Definition legend1.C:17
@ kStrict
The merger will refuse to merge any 2 RNTuples whose schema doesn't match exactly.
@ kUnion
The merger will update the output model to include all columns from all sources.
std::ostream & operator<<(std::ostream &os, const std::optional< ROOT::RColumnDescriptor::RValueRange > &x)
@ kWarnOnHigherVersion
The merger will emit a warning when merging RNTuples with higher version than the latest supported by...
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, ROOT::ENTupleColumnType onDiskType)
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:669
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr DescriptorId_t kInvalidDescriptorId
ENTupleColumnType
const ROOT::RFieldDescriptor * fParentFieldDescriptor
std::optional< std::type_index > fInMemoryType
const ROOT::RNTupleDescriptor * fParentNTupleDescriptor
const ROOT::RNTupleDescriptor * fSrcDescriptor
RNTupleMergeData(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
const ROOT::RNTupleDescriptor & fDstDescriptor
Set of merging options to pass to RNTupleMerger.
std::optional< std::uint32_t > fCompressionSettings
If fCompressionSettings is empty (the default), the merger will not change the compression of any of ...
std::vector< RPageStorage::RSealedPageGroup > fGroups
std::vector< std::unique_ptr< std::byte[]> > fBuffers
std::deque< RPageStorage::SealedPageSequence_t > fPagesV
The incremental changes to a RNTupleModel
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:51
Parameters for the SealPage() method.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RResult< void > VerifyChecksumIfEnabled() const
@ kUseGeneralPurpose
Use the new recommended general-purpose setting; it is a best trade-off between compression ratio/dec...
Definition Compression.h:58