Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleMerger.cxx
Go to the documentation of this file.
1/// \file RNTupleMerger.cxx
2/// \ingroup NTuple
3/// \author Jakob Blomer <jblomer@cern.ch>, Max Orok <maxwellorok@gmail.com>, Alaettin Serhan Mete <amete@anl.gov>,
4/// Giacomo Parolini <giacomo.parolini@cern.ch>
5/// \date 2020-07-08
6/// \warning This is part of the ROOT 7 prototype! It will
7/// change without notice. It might trigger earthquakes. Feedback is welcome!
8
9/*************************************************************************
10 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
11 * All rights reserved. *
12 * *
13 * For the licensing terms see $ROOTSYS/LICENSE. *
14 * For the list of contributors see $ROOTSYS/README/CREDITS. *
15 *************************************************************************/
16
17#include <ROOT/RError.hxx>
18#include <ROOT/RNTuple.hxx>
21#include <ROOT/RNTupleModel.hxx>
22#include <ROOT/RNTupleUtil.hxx>
25#include <ROOT/RPageStorage.hxx>
26#include <ROOT/RClusterPool.hxx>
28#include <ROOT/RNTupleZip.hxx>
30#include <TROOT.h>
31#include <TFileMergeInfo.h>
32#include <TError.h>
33#include <TFile.h>
34#include <TKey.h>
35
36#include <algorithm>
37#include <deque>
38#include <inttypes.h> // for PRIu64
39#include <initializer_list>
40#include <unordered_map>
41#include <vector>
42
52
53using namespace ROOT::Experimental::Internal;
54
55// TFile options parsing
56// -------------------------------------------------------------------------------------
57static bool BeginsWithDelimitedWord(const TString &str, const char *word)
58{
59 const Ssiz_t wordLen = strlen(word);
60 if (str.Length() < wordLen)
61 return false;
62 if (!str.BeginsWith(word, TString::ECaseCompare::kIgnoreCase))
63 return false;
64 return str.Length() == wordLen || str(wordLen) == ' ';
65}
66
67template <typename T>
68static std::optional<T> ParseStringOption(const TString &opts, const char *pattern,
69 std::initializer_list<std::pair<const char *, T>> validValues)
70{
71 const Ssiz_t patternLen = strlen(pattern);
72 assert(pattern[patternLen - 1] == '='); // we want to parse options with the format `option=Value`
73 if (auto idx = opts.Index(pattern, 0, TString::ECaseCompare::kIgnoreCase);
74 idx >= 0 && opts.Length() > idx + patternLen) {
75 auto sub = TString(opts(idx + patternLen, opts.Length() - idx - patternLen));
76 for (const auto &[name, value] : validValues) {
77 if (BeginsWithDelimitedWord(sub, name)) {
78 return value;
79 }
80 }
81 }
82 return std::nullopt;
83}
84
85static std::optional<ENTupleMergingMode> ParseOptionMergingMode(const TString &opts)
86{
87 return ParseStringOption<ENTupleMergingMode>(opts, "rntuple.MergingMode=",
88 {
89 {"Filter", ENTupleMergingMode::kFilter},
90 {"Union", ENTupleMergingMode::kUnion},
91 {"Strict", ENTupleMergingMode::kStrict},
92 });
93}
94
95static std::optional<ENTupleMergeErrBehavior> ParseOptionErrBehavior(const TString &opts)
96{
97 return ParseStringOption<ENTupleMergeErrBehavior>(opts, "rntuple.ErrBehavior=",
98 {
99 {"Abort", ENTupleMergeErrBehavior::kAbort},
100 {"Skip", ENTupleMergeErrBehavior::kSkip},
101 });
102}
103// -------------------------------------------------------------------------------------
104
105// Entry point for TFileMerger. Internally calls RNTupleMerger::Merge().
107// IMPORTANT: this function must not throw, as it is used in exception-unsafe code (TFileMerger).
108try {
109 // Check the inputs
110 if (!inputs || inputs->GetEntries() < 3 || !mergeInfo) {
111 Error("RNTuple::Merge", "Invalid inputs.");
112 return -1;
113 }
114
115 // Parse the input parameters
117
118 // First entry is the RNTuple name
119 std::string ntupleName = std::string(itr()->GetName());
120
121 // Second entry is the output file
122 TObject *secondArg = itr();
123 TFile *outFile = dynamic_cast<TFile *>(secondArg);
124 if (!outFile) {
125 Error("RNTuple::Merge", "Second input parameter should be a TFile, but it's a %s.", secondArg->ClassName());
126 return -1;
127 }
128
129 // Check if the output file already has a key with that name
130 TKey *outKey = outFile->FindKey(ntupleName.c_str());
131 ROOT::RNTuple *outNTuple = nullptr;
132 if (outKey) {
133 outNTuple = outKey->ReadObject<ROOT::RNTuple>();
134 if (!outNTuple) {
135 Error("RNTuple::Merge", "Output file already has key, but not of type RNTuple!");
136 return -1;
137 }
138 // In principle, we should already be working on the RNTuple object from the output file, but just continue with
139 // pointer we just got.
140 }
141
142 const bool defaultComp = mergeInfo->fOptions.Contains("DefaultCompression");
143 const bool firstSrcComp = mergeInfo->fOptions.Contains("FirstSrcCompression");
144 const bool extraVerbose = mergeInfo->fOptions.Contains("rntuple.ExtraVerbose");
145 if (defaultComp && firstSrcComp) {
146 // this should never happen through hadd, but a user may call RNTuple::Merge() from custom code.
147 Warning("RNTuple::Merge", "Passed both options \"DefaultCompression\" and \"FirstSrcCompression\": "
148 "only the latter will apply.");
149 }
150 std::optional<std::uint32_t> compression;
151 if (firstSrcComp) {
152 // user passed -ff or -fk: use the same compression as the first RNTuple we find in the sources.
153 // (do nothing here, the compression will be fetched below)
154 } else if (!defaultComp) {
155 // compression was explicitly passed by the user: use it.
156 compression = outFile->GetCompressionSettings();
157 } else {
158 // user passed no compression-related options: use default
160 Info("RNTuple::Merge", "Using the default compression: %u", *compression);
161 }
162
163 // The remaining entries are the input files
164 std::vector<std::unique_ptr<RPageSourceFile>> sources;
165 std::vector<RPageSource *> sourcePtrs;
166
167 while (const auto &pitr = itr()) {
168 TFile *inFile = dynamic_cast<TFile *>(pitr);
169 ROOT::RNTuple *anchor = inFile ? inFile->Get<ROOT::RNTuple>(ntupleName.c_str()) : nullptr;
170 if (!anchor) {
171 Info("RNTuple::Merge", "No RNTuple anchor named '%s' from file '%s'", ntupleName.c_str(), inFile->GetName());
172 continue;
173 }
174
176 if (!compression) {
177 // Get the compression of this RNTuple and use it as the output compression.
178 // We currently assume all column ranges have the same compression, so we just peek at the first one.
179 source->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kRaw);
180 auto descriptor = source->GetSharedDescriptorGuard();
181 auto clusterIter = descriptor->GetClusterIterable();
183 if (firstCluster == clusterIter.end()) {
184 Error("RNTuple::Merge",
185 "Asked to use the first source's compression as the output compression, but the "
186 "first source (file '%s') has an empty RNTuple, therefore the output compression could not be "
187 "determined.",
188 inFile->GetName());
189 return -1;
190 }
191 auto colRangeIter = (*firstCluster).GetColumnRangeIterable();
193 if (firstColRange == colRangeIter.end()) {
194 Error("RNTuple::Merge",
195 "Asked to use the first source's compression as the output compression, but the "
196 "first source (file '%s') has an empty RNTuple, therefore the output compression could not be "
197 "determined.",
198 inFile->GetName());
199 return -1;
200 }
201 compression = (*firstColRange).GetCompressionSettings().value();
202 Info("RNTuple::Merge", "Using the first RNTuple's compression: %u", *compression);
203 }
204 sources.push_back(std::move(source));
205 }
206
209 writeOpts.SetCompression(*compression);
210 auto destination = std::make_unique<ROOT::Internal::RPageSinkFile>(ntupleName, *outFile, writeOpts);
211 std::unique_ptr<ROOT::RNTupleModel> model;
212 // If we already have an existing RNTuple, copy over its descriptor to support incremental merging
213 if (outNTuple) {
215 outSource->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kForWriting);
216 auto desc = outSource->GetSharedDescriptorGuard();
217 model = destination->InitFromDescriptor(desc.GetRef(), true /* copyClusters */);
218 }
219
220 // Interface conversion
221 sourcePtrs.reserve(sources.size());
222 for (const auto &s : sources) {
223 sourcePtrs.push_back(s.get());
224 }
225
226 // Now merge
227 RNTupleMerger merger{std::move(destination), std::move(model)};
229 mergerOpts.fCompressionSettings = *compression;
230 mergerOpts.fExtraVerbose = extraVerbose;
231 if (auto mergingMode = ParseOptionMergingMode(mergeInfo->fOptions)) {
232 mergerOpts.fMergingMode = *mergingMode;
233 }
234 if (auto errBehavior = ParseOptionErrBehavior(mergeInfo->fOptions)) {
235 mergerOpts.fErrBehavior = *errBehavior;
236 }
237 merger.Merge(sourcePtrs, mergerOpts).ThrowOnError();
238
239 // Provide the caller with a merged anchor object (even though we've already
240 // written it).
241 *this = *outFile->Get<ROOT::RNTuple>(ntupleName.c_str());
242
243 return 0;
244} catch (const std::exception &ex) {
245 Error("RNTuple::Merge", "Exception thrown while merging: %s", ex.what());
246 return -1;
247}
248
249namespace {
250// Functor that reseals a page using the compression `fMergeOptions.fCompressionSettings` and the on-disk encoding of `fDstColElement`.
251struct RChangeCompressionFunc {
252 const RColumnElementBase &fSrcColElement;
253 const RColumnElementBase &fDstColElement;
254 const RNTupleMergeOptions &fMergeOptions;
255
256 RPageStorage::RSealedPage &fSealedPage;
258 std::uint8_t *fBuffer;
259
260 void operator()() const
261 {
262 auto page = RPageSource::UnsealPage(fSealedPage, fSrcColElement, fPageAlloc).Unwrap();
264 sealConf.fElement = &fDstColElement;
265 sealConf.fPage = &page;
266 sealConf.fBuffer = fBuffer;
267 sealConf.fCompressionSettings = *fMergeOptions.fCompressionSettings;
268 sealConf.fWriteChecksum = fSealedPage.GetHasChecksum();
269 auto refSealedPage = RPageSink::SealPage(sealConf);
270 fSealedPage = refSealedPage;
271 }
272};
273
274struct RCommonField {
275 const ROOT::RFieldDescriptor *fSrc;
276 const ROOT::RFieldDescriptor *fDst;
277
278 RCommonField(const ROOT::RFieldDescriptor *src, const ROOT::RFieldDescriptor *dst) : fSrc(src), fDst(dst) {}
279};
280
281struct RDescriptorsComparison {
282 std::vector<const ROOT::RFieldDescriptor *> fExtraDstFields;
283 std::vector<const ROOT::RFieldDescriptor *> fExtraSrcFields;
284 std::vector<RCommonField> fCommonFields;
285};
286
287struct RColumnOutInfo {
288 ROOT::DescriptorId_t fColumnId;
289 ENTupleColumnType fColumnType;
290};
291
292// { fully.qualified.fieldName.colInputId => colOutputInfo }
293using ColumnIdMap_t = std::unordered_map<std::string, RColumnOutInfo>;
294
295struct RColumnInfoGroup {
296 std::vector<RColumnMergeInfo> fExtraDstColumns;
297 // These are sorted by InputId
298 std::vector<RColumnMergeInfo> fCommonColumns;
299};
300
301} // namespace
302
303// These structs cannot be in the anon namespace because they're used in RNTupleMerger's private interface.
306 // This column name is built as a dot-separated concatenation of the ancestry of
307 // the columns' parent fields' names plus the index of the column itself.
308 // e.g. "Muon.pt.x._0"
309 std::string fColumnName;
313 // If nullopt, use the default in-memory type
314 std::optional<std::type_index> fInMemoryType;
316};
317
318// Data related to a single call of RNTupleMerger::Merge()
336
338 // We use a std::deque so that references to the contained SealedPageSequence_t, and its iterators, are
339 // never invalidated.
340 std::deque<RPageStorage::SealedPageSequence_t> fPagesV;
341 std::vector<RPageStorage::RSealedPageGroup> fGroups;
342 std::vector<std::unique_ptr<std::uint8_t[]>> fBuffers;
343};
344
345std::ostream &operator<<(std::ostream &os, const std::optional<ROOT::RColumnDescriptor::RValueRange> &x)
346{
347 if (x) {
348 os << '(' << x->fMin << ", " << x->fMax << ')';
349 } else {
350 os << "(null)";
351 }
352 return os;
353}
354
355} // namespace ROOT::Experimental::Internal
356
358{
359 // clang-format off
380 // clang-format on
381 return false;
382}
383
384/// Compares the top level fields of `dst` and `src` and determines whether they can be merged or not.
385/// In addition, returns the differences between `dst` and `src`'s structures
388{
389 // Cases:
390 // 1. dst == src
391 // 2. dst has fields that src hasn't
392 // 3. src has fields that dst hasn't
393 // 4. dst and src have fields that differ (compatible or incompatible)
394
395 std::vector<std::string> errors;
396 RDescriptorsComparison res;
397
398 std::vector<RCommonField> commonFields;
399
400 for (const auto &dstField : dst.GetTopLevelFields()) {
401 const auto srcFieldId = src.FindFieldId(dstField.GetFieldName());
403 const auto &srcField = src.GetFieldDescriptor(srcFieldId);
404 commonFields.push_back({&srcField, &dstField});
405 } else {
406 res.fExtraDstFields.emplace_back(&dstField);
407 }
408 }
409 for (const auto &srcField : src.GetTopLevelFields()) {
410 const auto dstFieldId = dst.FindFieldId(srcField.GetFieldName());
412 res.fExtraSrcFields.push_back(&srcField);
413 }
414
415 // Check compatibility of common fields
416 for (const auto &field : commonFields) {
417 // NOTE: field.fSrc and field.fDst have the same name by construction
418 const auto &fieldName = field.fSrc->GetFieldName();
419
420 // Require that fields are both projected or both not projected
421 bool projCompatible = field.fSrc->IsProjectedField() == field.fDst->IsProjectedField();
422 if (!projCompatible) {
423 std::stringstream ss;
424 ss << "Field `" << fieldName << "` is incompatible with previously-seen field with that name because the "
425 << (field.fSrc->IsProjectedField() ? "new" : "old") << " one is projected and the other isn't";
426 errors.push_back(ss.str());
427 } else if (field.fSrc->IsProjectedField()) {
428 // if both fields are projected, verify that they point to the same real field
429 const auto srcName = src.GetQualifiedFieldName(field.fSrc->GetProjectionSourceId());
430 const auto dstName = dst.GetQualifiedFieldName(field.fDst->GetProjectionSourceId());
431 if (srcName != dstName) {
432 std::stringstream ss;
433 ss << "Field `" << fieldName
434 << "` is projected to a different field than a previously-seen field with the same name (old: "
435 << dstName << ", new: " << srcName << ")";
436 errors.push_back(ss.str());
437 }
438 }
439
440 // Require that fields types match
441 // TODO(gparolini): allow non-identical but compatible types
442 const auto &srcTyName = field.fSrc->GetTypeName();
443 const auto &dstTyName = field.fDst->GetTypeName();
444 if (srcTyName != dstTyName) {
445 std::stringstream ss;
446 ss << "Field `" << fieldName
447 << "` has a type incompatible with a previously-seen field with the same name: (old: " << dstTyName
448 << ", new: " << srcTyName << ")";
449 errors.push_back(ss.str());
450 }
451
452 // Require that type checksums match
453 const auto srcTyChk = field.fSrc->GetTypeChecksum();
454 const auto dstTyChk = field.fDst->GetTypeChecksum();
455 if (srcTyChk && dstTyChk && *srcTyChk != *dstTyChk) {
456 std::stringstream ss;
457 ss << "Field `" << field.fSrc->GetFieldName()
458 << "` has a different type checksum than previously-seen field with the same name";
459 errors.push_back(ss.str());
460 }
461
462 // Require that type versions match
463 const auto srcTyVer = field.fSrc->GetTypeVersion();
464 const auto dstTyVer = field.fDst->GetTypeVersion();
465 if (srcTyVer != dstTyVer) {
466 std::stringstream ss;
467 ss << "Field `" << field.fSrc->GetFieldName()
468 << "` has a different type version than previously-seen field with the same name (old: " << dstTyVer
469 << ", new: " << srcTyVer << ")";
470 errors.push_back(ss.str());
471 }
472
473 // Require that column representations match
474 const auto srcNCols = field.fSrc->GetLogicalColumnIds().size();
475 const auto dstNCols = field.fDst->GetLogicalColumnIds().size();
476 if (srcNCols != dstNCols) {
477 std::stringstream ss;
478 ss << "Field `" << field.fSrc->GetFieldName()
479 << "` has a different number of columns than previously-seen field with the same name (old: " << dstNCols
480 << ", new: " << srcNCols << ")";
481 errors.push_back(ss.str());
482 } else {
483 for (auto i = 0u; i < srcNCols; ++i) {
484 const auto srcColId = field.fSrc->GetLogicalColumnIds()[i];
485 const auto dstColId = field.fDst->GetLogicalColumnIds()[i];
486 const auto &srcCol = src.GetColumnDescriptor(srcColId);
487 const auto &dstCol = dst.GetColumnDescriptor(dstColId);
488 // TODO(gparolini): currently we refuse to merge columns of different types unless they are Split/non-Split
489 // version of the same type, because we know how to treat that specific case. We should also properly handle
490 // different but compatible types.
491 if (srcCol.GetType() != dstCol.GetType() &&
492 !IsSplitOrUnsplitVersionOf(srcCol.GetType(), dstCol.GetType())) {
493 std::stringstream ss;
494 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
495 << "` has a different column type of the same column on the previously-seen field with the same name "
496 "(old: "
498 << ", new: " << RColumnElementBase::GetColumnTypeName(dstCol.GetType()) << ")";
499 errors.push_back(ss.str());
500 }
501 if (srcCol.GetBitsOnStorage() != dstCol.GetBitsOnStorage()) {
502 std::stringstream ss;
503 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
504 << "` has a different number of bits of the same column on the previously-seen field with the same "
505 "name "
506 "(old: "
507 << srcCol.GetBitsOnStorage() << ", new: " << dstCol.GetBitsOnStorage() << ")";
508 errors.push_back(ss.str());
509 }
510 if (srcCol.GetValueRange() != dstCol.GetValueRange()) {
511 std::stringstream ss;
512 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
513 << "` has a different value range of the same column on the previously-seen field with the same name "
514 "(old: "
515 << srcCol.GetValueRange() << ", new: " << dstCol.GetValueRange() << ")";
516 errors.push_back(ss.str());
517 }
518 if (srcCol.GetRepresentationIndex() > 0) {
519 std::stringstream ss;
520 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
521 << "` has a representation index higher than 0. This is not supported yet by the merger.";
522 errors.push_back(ss.str());
523 }
524 }
525 }
526 }
527
528 std::string errMsg;
529 for (const auto &err : errors)
530 errMsg += std::string("\n * ") + err;
531
532 if (!errMsg.empty())
533 errMsg = errMsg.substr(1); // strip initial newline
534
535 if (errMsg.length())
536 return R__FAIL(errMsg);
537
538 res.fCommonFields.reserve(commonFields.size());
539 for (const auto &[srcField, dstField] : commonFields) {
540 res.fCommonFields.emplace_back(srcField, dstField);
541 }
542
543 // TODO(gparolini): we should exhaustively check the field tree rather than just the top level fields,
544 // in case the user forgets to change the version number on one field.
545
546 return ROOT::RResult(res);
547}
548
549// Applies late model extension to `destination`, adding all `newFields` to it.
550static void ExtendDestinationModel(std::span<const ROOT::RFieldDescriptor *> newFields, ROOT::RNTupleModel &dstModel,
551 RNTupleMergeData &mergeData, std::vector<RCommonField> &commonFields)
552{
553 assert(newFields.size() > 0); // no point in calling this with 0 new cols
554
555 dstModel.Unfreeze();
557
558 if (mergeData.fMergeOpts.fExtraVerbose) {
559 std::string msg = "destination doesn't contain field";
560 if (newFields.size() > 1)
561 msg += 's';
562 msg += ' ';
563 msg += std::accumulate(newFields.begin(), newFields.end(), std::string{}, [](const auto &acc, const auto *field) {
564 return acc + (acc.length() ? ", " : "") + '`' + field->GetFieldName() + '`';
565 });
566 Info("RNTuple::Merge", "%s: adding %s to the destination model (entry #%" PRIu64 ").", msg.c_str(),
567 (newFields.size() > 1 ? "them" : "it"), mergeData.fNumDstEntries);
568 }
569
570 changeset.fAddedFields.reserve(newFields.size());
571 // First add all non-projected fields...
572 for (const auto *fieldDesc : newFields) {
573 if (!fieldDesc->IsProjectedField()) {
574 auto field = fieldDesc->CreateField(*mergeData.fSrcDescriptor);
575 changeset.AddField(std::move(field));
576 }
577 }
578 // ...then add all projected fields.
579 for (const auto *fieldDesc : newFields) {
580 if (!fieldDesc->IsProjectedField())
581 continue;
582
584 auto field = fieldDesc->CreateField(*mergeData.fSrcDescriptor);
585 const auto sourceId = fieldDesc->GetProjectionSourceId();
586 const auto &sourceField = dstModel.GetConstField(mergeData.fSrcDescriptor->GetQualifiedFieldName(sourceId));
587 fieldMap[field.get()] = &sourceField;
588
589 for (const auto &subfield : *field) {
590 const auto &subFieldDesc = mergeData.fSrcDescriptor->GetFieldDescriptor(subfield.GetOnDiskId());
591 const auto subSourceId = subFieldDesc.GetProjectionSourceId();
592 const auto &subSourceField =
593 dstModel.GetConstField(mergeData.fSrcDescriptor->GetQualifiedFieldName(subSourceId));
595 }
596 changeset.fAddedProjectedFields.emplace_back(field.get());
598 }
599 dstModel.Freeze();
600 mergeData.fDestination.UpdateSchema(changeset, mergeData.fNumDstEntries);
601
602 commonFields.reserve(commonFields.size() + newFields.size());
603 for (const auto *field : newFields) {
604 const auto newFieldInDstId = mergeData.fDstDescriptor.FindFieldId(field->GetFieldName());
605 const auto &newFieldInDst = mergeData.fDstDescriptor.GetFieldDescriptor(newFieldInDstId);
606 commonFields.emplace_back(field, &newFieldInDst);
607 }
608}
609
610// Generates default (zero) values for the given columns
611static void GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span<const RColumnMergeInfo> columns,
615{
617 return;
618
619 for (const auto &column : columns) {
620 const auto &columnId = column.fInputId;
621 const auto &columnDesc = dstDescriptor.GetColumnDescriptor(columnId);
622 const ROOT::RFieldDescriptor *field = column.fParentField;
623
624 // Skip all auxiliary columns
625 assert(!field->GetLogicalColumnIds().empty());
626 if (field->GetLogicalColumnIds()[0] != columnId)
627 continue;
628
629 // Check if this column is a child of a Collection or a Variant. If so, it has no data
630 // and can be skipped.
631 bool skipColumn = false;
632 auto nRepetitions = std::max<std::uint64_t>(field->GetNRepetitions(), 1);
633 for (auto parentId = field->GetParentId(); parentId != ROOT::kInvalidDescriptorId;) {
634 const ROOT::RFieldDescriptor &parent = srcDescriptor.GetFieldDescriptor(parentId);
637 skipColumn = true;
638 break;
639 }
640 nRepetitions *= std::max<std::uint64_t>(parent.GetNRepetitions(), 1);
641 parentId = parent.GetParentId();
642 }
643 if (skipColumn)
644 continue;
645
646 const auto structure = field->GetStructure();
647
648 if (structure == ROOT::ENTupleStructure::kStreamer) {
649 Fatal(
650 "RNTuple::Merge",
651 "Destination RNTuple contains a streamer field (%s) that is not present in one of the sources. "
652 "Creating a default value for a streamer field is ill-defined, therefore the merging process will abort.",
653 field->GetFieldName().c_str());
654 continue;
655 }
656
657 // NOTE: we cannot have a Record here because it has no associated columns.
659 structure == ROOT::ENTupleStructure::kLeaf);
660
661 const auto colElement = RColumnElementBase::Generate(columnDesc.GetType());
663 const auto nBytesOnStorage = colElement->GetPackedSize(nElements);
664 constexpr auto kPageSizeLimit = 256 * 1024;
665 // TODO(gparolini): consider coalescing the last page if its size is less than some threshold
667 for (size_t i = 0; i < nPages; ++i) {
668 const auto pageSize = (i < nPages - 1) ? kPageSizeLimit : nBytesOnStorage - kPageSizeLimit * (nPages - 1);
670 const auto bufSize = pageSize + checksumSize;
671 assert(pageSize % colElement->GetSize() == 0);
672 const auto nElementsPerPage = pageSize / colElement->GetSize();
673 auto page = pageAlloc.NewPage(colElement->GetSize(), nElementsPerPage);
674 page.GrowUnchecked(nElements);
675 memset(page.GetBuffer(), 0, page.GetNBytes());
676
677 auto &buffer = sealedPageData.fBuffers.emplace_back(new unsigned char[bufSize]);
679 sealConf.fElement = colElement.get();
680 sealConf.fPage = &page;
681 sealConf.fBuffer = buffer.get();
682 sealConf.fCompressionSettings = mergeData.fMergeOpts.fCompressionSettings.value();
683 sealConf.fWriteChecksum = mergeData.fDestination.GetWriteOptions().GetEnablePageChecksums();
685
686 sealedPageData.fPagesV.push_back({sealedPage});
687 }
688
689 sealedPageData.fGroups.emplace_back(column.fOutputId, sealedPageData.fPagesV.back().cbegin(),
690 sealedPageData.fPagesV.back().cend());
691 }
692}
693
694// Merges all columns appearing both in the source and destination RNTuples, just copying them if their
695// compression matches ("fast merge") or by unsealing and resealing them with the proper compression.
697 std::span<const RColumnMergeInfo> commonColumns,
701{
705 return;
706
707 const RCluster *cluster = clusterPool.GetCluster(clusterDesc.GetId(), commonColumnSet);
708 // we expect the cluster pool to contain the requested set of columns, since they were
709 // validated by CompareDescriptorStructure().
711
712 for (size_t colIdx = 0; colIdx < nCommonColumnsInCluster; ++colIdx) {
713 const auto &column = commonColumns[colIdx];
714 const auto &columnId = column.fInputId;
715 R__ASSERT(clusterDesc.ContainsColumn(columnId));
716
717 const auto &columnDesc = mergeData.fSrcDescriptor->GetColumnDescriptor(columnId);
718 const auto srcColElement = column.fInMemoryType
719 ? ROOT::Internal::GenerateColumnElement(*column.fInMemoryType, columnDesc.GetType())
721 const auto dstColElement = column.fInMemoryType
722 ? ROOT::Internal::GenerateColumnElement(*column.fInMemoryType, column.fColumnType)
723 : RColumnElementBase::Generate(column.fColumnType);
724
725 // Now get the pages for this column in this cluster
726 const auto &pages = clusterDesc.GetPageRange(columnId);
727
729 sealedPages.resize(pages.GetPageInfos().size());
730
731 // Each column range potentially has a distinct compression settings
732 const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).GetCompressionSettings().value();
733 // If either the compression or the encoding of the source doesn't match that of the destination, we need
734 // to reseal the page. Otherwise, if both match, we can fast merge.
735 const bool needsResealing =
736 colRangeCompressionSettings != mergeData.fMergeOpts.fCompressionSettings.value() ||
737 srcColElement->GetIdentifier().fOnDiskType != dstColElement->GetIdentifier().fOnDiskType;
738
739 if (needsResealing && mergeData.fMergeOpts.fExtraVerbose) {
740 Info("RNTuple::Merge", "Resealing column %s: { compression: %d => %d, onDiskType: %s => %s }",
741 column.fColumnName.c_str(), colRangeCompressionSettings,
742 mergeData.fMergeOpts.fCompressionSettings.value(),
743 RColumnElementBase::GetColumnTypeName(srcColElement->GetIdentifier().fOnDiskType),
744 RColumnElementBase::GetColumnTypeName(dstColElement->GetIdentifier().fOnDiskType));
745 }
746
747 size_t pageBufferBaseIdx = sealedPageData.fBuffers.size();
748 // If the column range already has the right compression we don't need to allocate any new buffer, so we don't
749 // bother reserving memory for them.
750 if (needsResealing)
751 sealedPageData.fBuffers.resize(sealedPageData.fBuffers.size() + pages.GetPageInfos().size());
752
753 // If this column is deferred, we may need to fill "holes" until its real start. We fill any missing entry
754 // with zeroes, like we do for extraDstColumns.
755 // As an optimization, we don't do this for the first source (since we can rely on the FirstElementIndex and
756 // deferred column mechanism in that case).
757 // TODO: also avoid doing this if we added no real page of this column to the destination yet.
758 if (columnDesc.GetFirstElementIndex() > clusterDesc.GetFirstEntryIndex() && mergeData.fNumDstEntries > 0) {
759 const auto nMissingEntries = columnDesc.GetFirstElementIndex() - clusterDesc.GetFirstEntryIndex();
761 *mergeData.fSrcDescriptor, mergeData.fDstDescriptor, mergeData);
762 }
763
764 // Loop over the pages
765 std::uint64_t pageIdx = 0;
766 for (const auto &pageInfo : pages.GetPageInfos()) {
767 assert(pageIdx < sealedPages.size());
768 assert(sealedPageData.fBuffers.size() == 0 || pageIdx < sealedPageData.fBuffers.size());
769 assert(pageInfo.GetLocator().GetType() != RNTupleLocator::kTypePageZero);
770
772 auto onDiskPage = cluster->GetOnDiskPage(key);
773
774 const auto checksumSize = pageInfo.HasChecksum() * RPageStorage::kNBytesPageChecksum;
776 sealedPage.SetNElements(pageInfo.GetNElements());
777 sealedPage.SetHasChecksum(pageInfo.HasChecksum());
778 sealedPage.SetBufferSize(pageInfo.GetLocator().GetNBytesOnStorage() + checksumSize);
779 sealedPage.SetBuffer(onDiskPage->GetAddress());
780 // TODO(gparolini): more graceful error handling (skip the page?)
781 sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
782 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
783
784 if (needsResealing) {
785 const auto uncompressedSize = srcColElement->GetSize() * sealedPage.GetNElements();
786 auto &buffer = sealedPageData.fBuffers[pageBufferBaseIdx + pageIdx];
788 RChangeCompressionFunc compressTask{
789 *srcColElement, *dstColElement, mergeData.fMergeOpts, sealedPage, *fPageAlloc, buffer.get(),
790 };
791
792 if (fTaskGroup)
793 fTaskGroup->Run(compressTask);
794 else
795 compressTask();
796 }
797
798 ++pageIdx;
799
800 } // end of loop over pages
801
802 if (fTaskGroup)
803 fTaskGroup->Wait();
804
805 sealedPageData.fPagesV.push_back(std::move(sealedPages));
806 sealedPageData.fGroups.emplace_back(column.fOutputId, sealedPageData.fPagesV.back().cbegin(),
807 sealedPageData.fPagesV.back().cend());
808 } // end loop over common columns
809}
810
811// Iterates over all clusters of `source` and merges their pages into `destination`.
812// It is assumed that all columns in `commonColumns` are present (and compatible) in both the source and
813// the destination's schemas.
814// The pages may be "fast-merged" (i.e. simply copied with no decompression/recompression) if the target
815// compression is unspecified or matches the original compression settings.
817 std::span<const RColumnMergeInfo> extraDstColumns, RNTupleMergeData &mergeData)
818{
// Per-cluster pipeline. The source's clusters are visited starting from the one returned by FindClusterId(0, 0)
// and advancing with FindNextClusterId. For each cluster:
//   1. count how many of `commonColumns` actually have data in this cluster (`nCommonColumnsInCluster`);
//   2. merge the pages of those columns into `sealedPageData` (MergeCommonColumns);
//   3. synthesize zero pages for `extraDstColumns` plus the common columns absent from this cluster
//      (GenerateZeroPagesForColumns);
//   4. commit the sealed pages and the cluster to the destination, and add the cluster's entry count to
//      mergeData.fNumDstEntries.
820
// Reused for every cluster: it always starts with the extra destination columns, which have no data in this source.
821 std::vector<RColumnMergeInfo> missingColumns{extraDstColumns.begin(), extraDstColumns.end()};
822
823 // Loop over all clusters in this file.
824 // descriptor->GetClusterIterable() doesn't guarantee any specific order, so we explicitly
825 // request the first cluster.
826 ROOT::DescriptorId_t clusterId = mergeData.fSrcDescriptor->FindClusterId(0, 0);
828 const auto &clusterDesc = mergeData.fSrcDescriptor->GetClusterDescriptor(clusterId);
829 const auto nClusterEntries = clusterDesc.GetNEntries();
831
832 // NOTE: just because a column is in `commonColumns` it doesn't mean that each cluster in the source contains it,
833 // as it may be a deferred column that only has real data in a future cluster.
834 // We need to figure out which columns are actually present in this cluster so we only merge their pages (the
835 // missing columns are handled by synthesizing zero pages - see below).
837 while (nCommonColumnsInCluster > 0) {
838 // Since `commonColumns` is sorted by column input id, we can simply traverse it from the back and stop as
839 // soon as we find a common column that appears in this cluster: we know that in that case all previous
840 // columns must appear as well.
841 if (clusterDesc.ContainsColumn(commonColumns[nCommonColumnsInCluster - 1].fInputId))
842 break;
844 }
845
846 // Convert columns to a ColumnSet for the ClusterPool query
849 for (size_t i = 0; i < nCommonColumnsInCluster; ++i)
850 commonColumnSet.emplace(commonColumns[i].fInputId);
851
852 // For each cluster, the "missing columns" are the union of the extraDstColumns and the common columns
853 // that are not present in the cluster. We generate zero pages for all of them.
// Shrinking back to extraDstColumns.size() keeps the extra-dst prefix (copied in at construction) and drops the
// common columns appended for the previous cluster, while reusing the vector's existing storage.
854 missingColumns.resize(extraDstColumns.size());
855 for (size_t i = nCommonColumnsInCluster; i < commonColumns.size(); ++i)
856 missingColumns.push_back(commonColumns[i]);
857
860 sealedPageData, mergeData, *fPageAlloc);
862 *mergeData.fSrcDescriptor, mergeData.fDstDescriptor, mergeData);
863
864 // Commit the pages and the clusters
865 mergeData.fDestination.CommitSealedPageV(sealedPageData.fGroups);
866 mergeData.fDestination.CommitCluster(nClusterEntries);
867 mergeData.fNumDstEntries += nClusterEntries;
868
869 // Go to the next cluster
870 clusterId = mergeData.fSrcDescriptor->FindNextClusterId(clusterId);
871 }
872
873 // TODO(gparolini): when we get serious about huge file support (>~ 100GB) we might want to check here
874 // the size of the running page list and commit a cluster group when it exceeds some threshold,
875 // which would prevent the page list from getting too large.
876 // However, as of today, we aren't really handling such huge files, and even relatively big ones
877 // such as the CMS dataset have a page list size of about only 2 MB.
878 // So currently we simply merge all cluster groups into one.
879}
880
// Chooses the in-memory C++ type to use for a column while merging. The inputs are the type name of the field that
// owns the column and the column's on-disk type. The result is stored in RColumnMergeInfo::fInMemoryType by
// AddColumnsFromField.
//  - Index and switch columns map to RColumnIndex / RColumnSwitch. These checks come first, so they take precedence
//    over the field's type name.
//  - Fields of a fundamental type (bool, std::byte, char, fixed-width integers, float, double) map to that C++ type.
//  - Any other field type yields std::nullopt, meaning "use the default in-memory type for `onDiskType`".
881 static std::optional<std::type_index> ColumnInMemoryType(std::string_view fieldType, ENTupleColumnType onDiskType)
882{
885 return typeid(ROOT::Internal::RColumnIndex);
886
888 return typeid(ROOT::Internal::RColumnSwitch);
889
890 // clang-format off
// The field's type name is matched exactly; aliases (e.g. `int` vs `std::int32_t`) are not normalized here.
891 if (fieldType == "bool") return typeid(bool);
892 if (fieldType == "std::byte") return typeid(std::byte);
893 if (fieldType == "char") return typeid(char);
894 if (fieldType == "std::int8_t") return typeid(std::int8_t);
895 if (fieldType == "std::uint8_t") return typeid(std::uint8_t);
896 if (fieldType == "std::int16_t") return typeid(std::int16_t);
897 if (fieldType == "std::uint16_t") return typeid(std::uint16_t);
898 if (fieldType == "std::int32_t") return typeid(std::int32_t);
899 if (fieldType == "std::uint32_t") return typeid(std::uint32_t);
900 if (fieldType == "std::int64_t") return typeid(std::int64_t);
901 if (fieldType == "std::uint64_t") return typeid(std::uint64_t);
902 if (fieldType == "float") return typeid(float);
903 if (fieldType == "double") return typeid(double);
904 // clang-format on
905
906 // if the type is not one of those above, we use the default in-memory type.
907 return std::nullopt;
908}
909
910// Given a field, fill `columns` and `mergeData.fColumnIdMap` with information about all columns belonging to it and its
911// subfields. `mergeData.fColumnIdMap` is used to map matching columns from different sources to the same output column
912// in the destination. We match columns by their "fully qualified name", which is the concatenation of their ancestor
913// fields' names and the column index. By this point, since we called `CompareDescriptorStructure()` earlier, we should
914// be guaranteed that two matching columns will have at least compatible representations. NOTE: srcFieldDesc and
915// dstFieldDesc may alias.
916static void AddColumnsFromField(std::vector<RColumnMergeInfo> &columns, const ROOT::RNTupleDescriptor &srcDesc,
918 const ROOT::RFieldDescriptor &dstFieldDesc, const std::string &prefix = "")
919{
// Fully qualified field name: the '.'-joined chain of ancestor field names. The top-level call passes an empty
// prefix, so every name starts with '.'. Column names append '.<column index>' to it.
920 std::string name = prefix + '.' + srcFieldDesc.GetFieldName();
921
922 const auto &columnIds = srcFieldDesc.GetLogicalColumnIds();
// NOTE(review): reserving exactly `size() + n` on every (recursive) call can defeat std::vector's geometric growth.
// On common implementations that means one reallocation (copying all RColumnMergeInfo, including their strings)
// per field. Consider dropping this reserve or reserving once at the top-level caller.
923 columns.reserve(columns.size() + columnIds.size());
924 // NOTE: here we can match the src and dst columns by column index because we forbid merging fields with
925 // different column representations.
926 for (auto i = 0u; i < srcFieldDesc.GetLogicalColumnIds().size(); ++i) {
927 // We don't want to try and merge alias columns
// NOTE(review): this condition does not depend on `i` and could be hoisted out of the loop. The subfield loop
// below is not guarded by it.
928 if (srcFieldDesc.IsProjectedField())
929 continue;
930
931 auto srcColumnId = srcFieldDesc.GetLogicalColumnIds()[i];
932 const auto &srcColumn = srcDesc.GetColumnDescriptor(srcColumnId);
933
935 info.fColumnName = name + '.' + std::to_string(srcColumn.GetIndex());
// The input id is the source column's *physical* id (i.e. alias resolution already applied).
936 info.fInputId = srcColumn.GetPhysicalId();
937 // NOTE(gparolini): the parent field is used when synthesizing zero pages, which happens in 2 situations:
938 // 1. when adding extra dst columns (in which case we need to synthesize zero pages for the incoming src), and
939 // 2. when merging a deferred column into an existing column (in which case we need to fill the "hole" with
940 // zeroes). For the first case srcFieldDesc and dstFieldDesc are the same (see the calling site of this function),
941 // but for the second case they're not, and we need to pick the source field because we will then check the
942 // column's *input* id inside fParentField to see if it's a suppressed column (see GenerateZeroPagesForColumns()).
943 info.fParentField = &srcFieldDesc;
944
// A column name seen before (in an earlier source, or prefilled from the destination) keeps the output id and
// on-disk type chosen the first time; otherwise a new output id is allocated and recorded.
945 if (auto it = mergeData.fColumnIdMap.find(info.fColumnName); it != mergeData.fColumnIdMap.end()) {
946 info.fOutputId = it->second.fColumnId;
947 info.fColumnType = it->second.fColumnType;
948 } else {
// NOTE(review): new output ids are allocated densely as the current map size. This assumes the ids already in the
// map (including those prefilled from the destination's logical ids) are exactly 0..size()-1.
949 info.fOutputId = mergeData.fColumnIdMap.size();
950 // NOTE(gparolini): map the type of src column to the type of dst column.
951 // This mapping is only relevant for common columns and it's done to ensure we keep a consistent
952 // on-disk representation of the same column.
953 // This is also important to do for first source when it is used to generate the destination sink,
954 // because even in that case their column representations may differ.
955 // e.g. if the destination has a different compression than the source, an integer column might be
956 // zigzag-encoded in the source but not in the destination.
957 auto dstColumnId = dstFieldDesc.GetLogicalColumnIds()[i];
958 const auto &dstColumn = mergeData.fDstDescriptor.GetColumnDescriptor(dstColumnId);
959 info.fColumnType = dstColumn.GetType();
960 mergeData.fColumnIdMap[info.fColumnName] = {info.fOutputId, info.fColumnType};
961 }
962
963 if (mergeData.fMergeOpts.fExtraVerbose) {
964 Info("RNTuple::Merge",
965 "Adding column %s with log.id %" PRIu64 ", phys.id %" PRIu64 ", type %s "
966 " -> log.id %" PRIu64 ", type %s",
967 info.fColumnName.c_str(), srcColumnId, srcColumn.GetPhysicalId(),
970 }
971
972 // Since we disallow merging fields of different types, src and dstFieldDesc must have the same type name.
973 assert(srcFieldDesc.GetTypeName() == dstFieldDesc.GetTypeName());
974 info.fInMemoryType = ColumnInMemoryType(srcFieldDesc.GetTypeName(), info.fColumnType);
// NOTE(review): `info` is not used after this point in the iteration, so it could be moved instead of copied.
975 columns.emplace_back(info);
976 }
977
// Visit the subfields, pairing source and destination children by position. The structure comparison performed
// earlier is what makes positional pairing valid.
978 const auto &srcChildrenIds = srcFieldDesc.GetLinkIds();
979 const auto &dstChildrenIds = dstFieldDesc.GetLinkIds();
980 assert(srcChildrenIds.size() == dstChildrenIds.size());
981 for (auto i = 0u; i < srcChildrenIds.size(); ++i) {
982 const auto &srcChild = srcDesc.GetFieldDescriptor(srcChildrenIds[i]);
983 const auto &dstChild = mergeData.fDstDescriptor.GetFieldDescriptor(dstChildrenIds[i]);
985 }
986}
987
988// Converts the fields comparison data to the corresponding column information.
989// While doing so, it collects such information in `mergeData.fColumnIdMap`, which is used by later calls to this
990// function to map already-seen column names to their chosen outputId, type and so on.
991static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const ROOT::RNTupleDescriptor &srcDesc,
993{
994 RColumnInfoGroup res;
995 for (const ROOT::RFieldDescriptor *field : descCmp.fExtraDstFields) {
996 AddColumnsFromField(res.fExtraDstColumns, mergeData.fDstDescriptor, mergeData, *field, *field);
997 }
998 for (const auto &[srcField, dstField] : descCmp.fCommonFields) {
999 AddColumnsFromField(res.fCommonColumns, srcDesc, mergeData, *srcField, *dstField);
1000 }
1001
1002 // Sort the commonColumns by ID so we can more easily tell how many common columns each cluster has
1003 // (since each cluster must contain all columns of the previous cluster plus potentially some new ones)
1004 std::sort(res.fCommonColumns.begin(), res.fCommonColumns.end(),
1005 [](const auto &a, const auto &b) { return a.fInputId < b.fInputId; });
1006
1007 return res;
1008}
1009
1011 ColumnIdMap_t &colIdMap, const std::string &prefix = "")
1012{
// Records the columns of `fieldDesc` and, recursively, of its subfields in `colIdMap`. Each column is keyed by its
// fully qualified name, built with the same '.'-joined scheme as AddColumnsFromField, and maps to the column's
// logical id and on-disk type in `desc`. Merge() uses this for incremental merging, so that columns already in the
// destination keep their existing output ids.
1013 std::string name = prefix + '.' + fieldDesc.GetFieldName();
1014 for (const auto &colId : fieldDesc.GetLogicalColumnIds()) {
1015 const auto &colDesc = desc.GetColumnDescriptor(colId);
1016 RColumnOutInfo info{};
1017 const auto colName = name + '.' + std::to_string(colDesc.GetIndex());
1018 info.fColumnId = colDesc.GetLogicalId();
1019 info.fColumnType = colDesc.GetType();
// NOTE(review): AddColumnsFromField allocates new output ids as colIdMap.size(). That stays consistent only if the
// logical ids recorded here are dense (0..N-1). Confirm this holds for every destination descriptor.
1021 }
1022
1023 for (const auto &subId : fieldDesc.GetLinkIds()) {
1024 const auto &subfield = desc.GetFieldDescriptor(subId);
1026 }
1027}
1028
// Takes ownership of the destination sink and, optionally, of a model.
// A non-null `model` must be given if and only if `destination` is already initialized (incremental merging);
// Merge() returns an error otherwise. Pages are allocated through a heap page allocator (RPageAllocatorHeap).
1029RNTupleMerger::RNTupleMerger(std::unique_ptr<ROOT::Internal::RPagePersistentSink> destination,
1030 std::unique_ptr<ROOT::RNTupleModel> model)
1031 // TODO(gparolini): consider using an arena allocator instead, since we know the precise lifetime
1032 // of the RNTuples we are going to handle (e.g. we can reset the arena at every source)
1033 : fDestination(std::move(destination)),
1034 fPageAlloc(std::make_unique<ROOT::Internal::RPageAllocatorHeap>()),
1035 fModel(std::move(model))
1036{
1038
1039#ifdef R__USE_IMT
// NOTE(review): presumably sets up a task group for parallel work when implicit multi-threading is enabled; confirm.
1042#endif
1043}
1044
1045RNTupleMerger::RNTupleMerger(std::unique_ptr<ROOT::Internal::RPagePersistentSink> destination)
1046 : RNTupleMerger(std::move(destination), nullptr)
1047{
1048}
1049
1051{
// Merges every source, in order, into fDestination and then commits the output dataset. Preconditions checked
// up front (each failure returns an error):
//  - an explicit compression setting must match the sink's;
//  - a model must have been given if and only if the destination is already initialized.
// Per-source problems (incompatible schema, missing or extra fields depending on the merging mode) either skip
// that source or abort the merge, according to mergeOpts.fErrBehavior (see SKIP_OR_ABORT).
1053
1055
1056 // Set compression settings if unset and verify it's compatible with the sink
1057 {
1058 const auto dstCompSettings = fDestination->GetWriteOptions().GetCompression();
1059 if (!mergeOpts.fCompressionSettings) {
1060 mergeOpts.fCompressionSettings = dstCompSettings;
1061 } else if (*mergeOpts.fCompressionSettings != dstCompSettings) {
1062 return R__FAIL(std::string("The compression given to RNTupleMergeOptions is different from that of the "
1063 "sink! (opts: ") +
1064 std::to_string(*mergeOpts.fCompressionSettings) + ", sink: " + std::to_string(dstCompSettings) +
1065 ") This is currently unsupported.");
1066 }
1067 }
1068
1069 // we should have a model if and only if the destination is initialized.
1070 if (!!fModel != fDestination->IsInitialized()) {
1071 return R__FAIL(
1072 "passing an already-initialized destination to RNTupleMerger::Merge (i.e. trying to do incremental "
1073 "merging) can only be done by providing a valid ROOT::RNTupleModel when constructing the RNTupleMerger.");
1074 }
1075
// Start counting from the entries the destination already has, so incremental merges append after existing data.
1077 mergeData.fNumDstEntries = mergeData.fDestination.GetNEntries();
1078
1079 if (fModel) {
1080 // If this is an incremental merging, pre-fill the column id map with the existing destination ids.
1081 // Otherwise we would generate new output ids that may not match the ones in the destination!
1082 for (const auto &field : mergeData.fDstDescriptor.GetTopLevelFields()) {
1083 PrefillColumnMap(fDestination->GetDescriptor(), field, mergeData.fColumnIdMap);
1084 }
1085 }
1086
// Used only inside the source loop below. With ENTupleMergeErrBehavior::kSkip it warns and `continue`s with the
// next source; otherwise it returns the error from Merge(). Because it is a macro, `continue`/`return` act on the
// enclosing loop and function. NOTE(review): the macro is not #undef'd at the end of this function.
1087#define SKIP_OR_ABORT(errMsg) \
1088 do { \
1089 if (mergeOpts.fErrBehavior == ENTupleMergeErrBehavior::kSkip) { \
1090 Warning("RNTuple::Merge", "Skipping RNTuple due to: %s", (errMsg).c_str()); \
1091 continue; \
1092 } else { \
1093 return R__FAIL(errMsg); \
1094 } \
1095 } while (0)
1096
1097 // Merge main loop
1098 for (RPageSource *source : sources) {
1099 source->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kForWriting);
1100 auto srcDescriptor = source->GetSharedDescriptorGuard();
1101 mergeData.fSrcDescriptor = &srcDescriptor.GetRef();
1102
1103 // Create sink from the input model if not initialized
1104 if (!fModel) {
1105 fModel = fDestination->InitFromDescriptor(srcDescriptor.GetRef(), false /* copyClusters */);
1106 }
1107
// Forward every extra type info record of this source to the destination.
1108 for (const auto &extraTypeInfoDesc : srcDescriptor->GetExtraTypeInfoIterable())
1109 fDestination->UpdateExtraTypeInfo(extraTypeInfoDesc);
1110
1111 auto descCmpRes = CompareDescriptorStructure(mergeData.fDstDescriptor, srcDescriptor.GetRef());
1112 if (!descCmpRes) {
1113 SKIP_OR_ABORT(std::string("Source RNTuple has an incompatible schema with the destination:\n") +
1114 descCmpRes.GetError()->GetReport());
1115 }
1116 auto descCmp = descCmpRes.Unwrap();
1117
1118 // If the current source is missing some fields and we're not in Union mode, error
1119 // (if we are in Union mode, MergeSourceClusters will fill the missing fields with default values).
1120 if (mergeOpts.fMergingMode != ENTupleMergingMode::kUnion && !descCmp.fExtraDstFields.empty()) {
1121 std::string msg = "Source RNTuple is missing the following fields:";
1122 for (const auto *field : descCmp.fExtraDstFields) {
1123 msg += "\n " + field->GetFieldName() + " : " + field->GetTypeName();
1124 }
1126 }
1127
1128 // handle extra src fields
// (in merging modes other than Union and Strict, extra source fields are simply not added to the destination)
1129 if (descCmp.fExtraSrcFields.size()) {
1130 if (mergeOpts.fMergingMode == ENTupleMergingMode::kUnion) {
1131 // late model extension for all fExtraSrcFields in Union mode
1132 ExtendDestinationModel(descCmp.fExtraSrcFields, *fModel, mergeData, descCmp.fCommonFields);
1133 } else if (mergeOpts.fMergingMode == ENTupleMergingMode::kStrict) {
1134 // If the current source has extra fields and we're in Strict mode, error
1135 std::string msg = "Source RNTuple has extra fields that the destination RNTuple doesn't have:";
1136 for (const auto *field : descCmp.fExtraSrcFields) {
1137 msg += "\n " + field->GetFieldName() + " : " + field->GetTypeName();
1138 }
1140 }
1141 }
1142
1143 // handle extra dst fields & common fields
1145 MergeSourceClusters(*source, columnInfos.fCommonColumns, columnInfos.fExtraDstColumns, mergeData);
1146 } // end loop over sources
1147
// An empty output is not an error: it is still committed below, with only a warning.
1148 if (fDestination->GetNEntries() == 0)
1149 Warning("RNTuple::Merge", "Output RNTuple '%s' has no entries.", fDestination->GetNTupleName().c_str());
1150
1151 // Commit the output
1152 fDestination->CommitClusterGroup();
1153 fDestination->CommitDataset();
1154
1155 return RResult<void>::Success();
1156}
fBuffer
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:299
static std::optional< std::type_index > ColumnInMemoryType(std::string_view fieldType, ENTupleColumnType onDiskType)
static ROOT::RResult< RDescriptorsComparison > CompareDescriptorStructure(const ROOT::RNTupleDescriptor &dst, const ROOT::RNTupleDescriptor &src)
Compares the top level fields of dst and src and determines whether they can be merged or not.
static std::optional< ENTupleMergeErrBehavior > ParseOptionErrBehavior(const TString &opts)
static void ExtendDestinationModel(std::span< const ROOT::RFieldDescriptor * > newFields, ROOT::RNTupleModel &dstModel, RNTupleMergeData &mergeData, std::vector< RCommonField > &commonFields)
static void GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span< const RColumnMergeInfo > columns, RSealedPageMergeData &sealedPageData, ROOT::Internal::RPageAllocator &pageAlloc, const ROOT::RNTupleDescriptor &srcDescriptor, const ROOT::RNTupleDescriptor &dstDescriptor, const RNTupleMergeData &mergeData)
#define SKIP_OR_ABORT(errMsg)
static std::optional< T > ParseStringOption(const TString &opts, const char *pattern, std::initializer_list< std::pair< const char *, T > > validValues)
static bool IsSplitOrUnsplitVersionOf(ENTupleColumnType a, ENTupleColumnType b)
static void AddColumnsFromField(std::vector< RColumnMergeInfo > &columns, const ROOT::RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData, const ROOT::RFieldDescriptor &srcFieldDesc, const ROOT::RFieldDescriptor &dstFieldDesc, const std::string &prefix="")
static std::optional< ENTupleMergingMode > ParseOptionMergingMode(const TString &opts)
static void PrefillColumnMap(const ROOT::RNTupleDescriptor &desc, const ROOT::RFieldDescriptor &fieldDesc, ColumnIdMap_t &colIdMap, const std::string &prefix="")
static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const ROOT::RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData)
static bool BeginsWithDelimitedWord(const TString &str, const char *word)
#define b(i)
Definition RSha256.hxx:100
#define a(i)
Definition RSha256.hxx:99
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:185
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:229
void Fatal(const char *location, const char *msgfmt,...)
Use this function in case of a fatal error. It will abort the program.
Definition TError.cxx:244
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t src
char name[80]
Definition TGX11.cxx:110
TRObject operator()(const T1 &t1) const
The available trivial, native content types of a column.
Manages a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:152
std::unordered_set< ROOT::DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:154
Given a set of RPageSources merge them into an RPagePersistentSink, optionally changing their compres...
void MergeCommonColumns(RClusterPool &clusterPool, const ROOT::RClusterDescriptor &clusterDesc, std::span< const RColumnMergeInfo > commonColumns, const RCluster::ColumnSet_t &commonColumnSet, std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc)
std::unique_ptr< ROOT::RNTupleModel > fModel
RNTupleMerger(std::unique_ptr< ROOT::Internal::RPagePersistentSink > destination, std::unique_ptr< ROOT::RNTupleModel > model)
Creates an RNTupleMerger with the given destination.
void MergeSourceClusters(ROOT::Internal::RPageSource &source, std::span< const RColumnMergeInfo > commonColumns, std::span< const RColumnMergeInfo > extraDstColumns, RNTupleMergeData &mergeData)
std::unique_ptr< ROOT::Internal::RPagePersistentSink > fDestination
RResult< void > Merge(std::span< ROOT::Internal::RPageSource * > sources, const RNTupleMergeOptions &mergeOpts=RNTupleMergeOptions())
Merge a given set of sources into the destination.
A class to manage the asynchronous execution of work items.
A column element encapsulates the translation between basic C++ types and their column representation...
static const char * GetColumnTypeName(ROOT::ENTupleColumnType type)
static std::unique_ptr< RColumnElementBase > Generate(ROOT::ENTupleColumnType type)
If CppT == void, use the default C++ type for the given column type.
The in-memory representation of a 32bit or 64bit on-disk index column.
Holds the index and the tag of a kSwitch column.
A helper class for serializing and deserialization of the RNTuple binary format.
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
Abstract interface to write data into an ntuple.
RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element)
Helper for streaming a page.
Storage provider that reads ntuple pages from a file.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
Abstract interface to read data from an ntuple.
Common functionality of an ntuple storage for both reading and writing.
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
std::deque< RSealedPage > SealedPageSequence_t
std::unordered_map< const ROOT::RFieldBase *, const ROOT::RFieldBase * > FieldMap_t
The map keys are the projected target fields; the map values are the backing source fields. Note that ...
RResult< void > Add(std::unique_ptr< ROOT::RFieldBase > field, const FieldMap_t &fieldMap)
Adds a new projected field.
Metadata for RNTuple clusters.
Metadata stored for every field of an RNTuple.
ROOT::ENTupleStructure GetStructure() const
ROOT::DescriptorId_t GetParentId() const
std::uint64_t GetNRepetitions() const
The on-storage metadata of an RNTuple.
const RColumnDescriptor & GetColumnDescriptor(ROOT::DescriptorId_t columnId) const
const RFieldDescriptor & GetFieldDescriptor(ROOT::DescriptorId_t fieldId) const
The RNTupleModel encapsulates the schema of an RNTuple.
Common user-tunable settings for storing RNTuples.
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:65
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface Merge this NTuple with the input list entries.
const_iterator begin() const
const_iterator end() const
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition RError.hxx:197
Collection abstract base class.
Definition TCollection.h:65
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:131
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
Mother of all ROOT objects.
Definition TObject.h:41
Basic string class.
Definition TString.h:139
@ kIgnoreCase
Definition TString.h:277
std::ostream & Info()
Definition hadd.cxx:171
Double_t x[n]
Definition legend1.C:17
Double_t ex[n]
Definition legend1.C:17
@ kStrict
The merger will refuse to merge any 2 RNTuples whose schema doesn't match exactly.
@ kUnion
The merger will update the output model to include all columns from all sources.
std::ostream & operator<<(std::ostream &os, const std::optional< ROOT::RColumnDescriptor::RValueRange > &x)
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, ROOT::ENTupleColumnType onDiskType)
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:570
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr DescriptorId_t kInvalidDescriptorId
ENTupleColumnType
const ROOT::RFieldDescriptor * fParentField
std::optional< std::type_index > fInMemoryType
const ROOT::RNTupleDescriptor * fSrcDescriptor
RNTupleMergeData(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
const ROOT::RNTupleDescriptor & fDstDescriptor
Set of merging options to pass to RNTupleMerger.
std::optional< std::uint32_t > fCompressionSettings
If fCompressionSettings is empty (the default), the merger will not change the compression of any of ...
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:52
std::vector< RPageStorage::RSealedPageGroup > fGroups
std::deque< RPageStorage::SealedPageSequence_t > fPagesV
std::vector< std::unique_ptr< std::uint8_t[]> > fBuffers
The incremental changes to an RNTupleModel
Parameters for the SealPage() method.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
@ kUseGeneralPurpose
Use the new recommended general-purpose setting; it is a best trade-off between compression ratio/dec...
Definition Compression.h:58