38#include <initializer_list>
39#include <unordered_map>
65 const Ssiz_t wordLen = strlen(word);
66 if (str.Length() < wordLen)
70 return str.Length() == wordLen || str(wordLen) ==
' ';
75 std::initializer_list<std::pair<const char *, T>> validValues)
77 const Ssiz_t patternLen = strlen(pattern);
78 assert(pattern[patternLen - 1] ==
'=');
80 idx >= 0 && opts.
Length() > idx + patternLen) {
81 auto sub =
TString(opts(idx + patternLen, opts.
Length() - idx - patternLen));
82 for (
const auto &[
name,
value] : validValues) {
113 opts,
"rntuple.VersionBehavior=",
126 if (!inputs || inputs->
GetEntries() < 3 || !mergeInfo) {
135 std::string ntupleName = std::string(itr()->GetName());
139 TFile *outFile =
dynamic_cast<TFile *
>(secondArg);
147 TKey *outKey = outFile->
FindKey(ntupleName.c_str());
159 const bool defaultComp = mergeInfo->
fOptions.
Contains(
"DefaultCompression");
160 const bool firstSrcComp = mergeInfo->
fOptions.
Contains(
"FirstSrcCompression");
161 const bool extraVerbose = mergeInfo->
fOptions.
Contains(
"rntuple.ExtraVerbose");
162 if (defaultComp && firstSrcComp) {
165 "only the latter will apply.";
167 std::optional<std::uint32_t> compression;
171 }
else if (!defaultComp) {
181 std::vector<std::unique_ptr<RPageSourceFile>> sources;
182 std::vector<RPageSource *> sourcePtrs;
184 while (
const auto &pitr = itr()) {
197 source->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kRaw);
198 auto descriptor = source->GetSharedDescriptorGuard();
199 auto clusterIter = descriptor->GetClusterIterable();
200 auto firstCluster = clusterIter.begin();
201 if (firstCluster == clusterIter.end()) {
203 <<
"Asked to use the first source's compression as the output compression, but the "
204 "first source (file '"
206 <<
"') has an empty RNTuple, therefore the output compression could not be "
210 auto colRangeIter = (*firstCluster).GetColumnRangeIterable();
211 auto firstColRange = colRangeIter.begin();
212 if (firstColRange == colRangeIter.end()) {
214 <<
"Asked to use the first source's compression as the output compression, but the "
215 "first source (file '"
217 <<
"') has an empty RNTuple, therefore the output compression could not be "
221 compression = (*firstColRange).GetCompressionSettings().value();
224 sources.push_back(std::move(source));
230 auto destination = std::make_unique<ROOT::Internal::RPageSinkFile>(ntupleName, *outFile, writeOpts);
231 std::unique_ptr<ROOT::RNTupleModel> model;
235 outSource->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kForWriting);
236 auto desc = outSource->GetSharedDescriptorGuard();
237 model = destination->InitFromDescriptor(desc.GetRef(),
true );
241 sourcePtrs.reserve(sources.size());
242 for (
const auto &s : sources) {
243 sourcePtrs.push_back(s.get());
247 RNTupleMerger merger{std::move(destination), std::move(model)};
260 merger.Merge(sourcePtrs, mergerOpts).ThrowOnError();
267}
catch (
const std::exception &
ex) {
274struct RChangeCompressionFunc {
282 std::size_t fBufSize;
294 std::unique_ptr<std::byte[]> unzipBufOwned;
296 if (compression != 0) {
298 unzipBuf = unzipBufOwned.get();
306 std::size_t nBytesZipped;
307 if (compression != 0) {
309 assert(fBufSize >= bytesPacked + checksumSize);
312 nBytesZipped = bytesPacked;
321 const RColumnElementBase &fSrcColElement;
322 const RColumnElementBase &fDstColElement;
323 const RNTupleMergeOptions &fMergeOptions;
325 RPageStorage::RSealedPage &fSealedPage;
326 ROOT::Internal::RPageAllocator &fPageAlloc;
328 std::size_t fBufSize;
329 const ROOT::RNTupleWriteOptions &fWriteOpts;
334 RPageSink::RSealPageConfig sealConf;
335 sealConf.
fElement = &fDstColElement;
336 sealConf.
fPage = &page;
342 fSealedPage = refSealedPage;
347 std::optional<ROOT::Experimental::TTaskGroup> &fGroup;
349 template <
typename T>
360 const ROOT::RFieldDescriptor *fSrc;
361 const ROOT::RFieldDescriptor *fDst;
363 RCommonField(
const ROOT::RFieldDescriptor &
src,
const ROOT::RFieldDescriptor &dst) : fSrc(&
src), fDst(&dst) {}
366struct RDescriptorsComparison {
367 std::vector<const ROOT::RFieldDescriptor *> fExtraDstFields;
368 std::vector<const ROOT::RFieldDescriptor *> fExtraSrcFields;
369 std::vector<RCommonField> fCommonFields;
372struct RColumnOutInfo {
378using ColumnIdMap_t = std::unordered_map<std::string, RColumnOutInfo>;
380struct RColumnInfoGroup {
381 std::vector<RColumnMergeInfo> fExtraDstColumns;
383 std::vector<RColumnMergeInfo> fCommonColumns;
389namespace ROOT::Experimental::Internal {
428 std::deque<RPageStorage::SealedPageSequence_t>
fPagesV;
429 std::vector<RPageStorage::RSealedPageGroup>
fGroups;
430 std::vector<std::unique_ptr<std::byte[]>>
fBuffers;
433std::ostream &
operator<<(std::ostream &os,
const std::optional<ROOT::RColumnDescriptor::RValueRange> &
x)
436 os <<
'(' <<
x->fMin <<
", " <<
x->fMax <<
')';
474static ROOT::RResult<RDescriptorsComparison>
483 std::vector<std::string> errors;
484 RDescriptorsComparison res;
486 std::vector<RCommonField> commonFields;
489 const auto srcFieldId =
src.FindFieldId(dstField.GetFieldName());
491 const auto &srcField =
src.GetFieldDescriptor(srcFieldId);
492 commonFields.push_back({srcField, dstField});
494 res.fExtraDstFields.emplace_back(&dstField);
497 for (
const auto &srcField :
src.GetTopLevelFields()) {
498 const auto dstFieldId = dst.
FindFieldId(srcField.GetFieldName());
500 res.fExtraSrcFields.push_back(&srcField);
504 auto fieldsToCheck = commonFields;
506 for (std::size_t fieldIdx = 0; fieldIdx < fieldsToCheck.size(); ++fieldIdx) {
507 const auto &field = fieldsToCheck[fieldIdx];
510 const auto &fieldName = field.fSrc->GetFieldName();
513 bool projCompatible = field.fSrc->IsProjectedField() == field.fDst->IsProjectedField();
514 if (!projCompatible) {
515 std::stringstream ss;
516 ss <<
"Field `" << fieldName <<
"` is incompatible with previously-seen field with that name because the "
517 << (field.fSrc->IsProjectedField() ?
"new" :
"old") <<
" one is projected and the other isn't";
518 errors.push_back(ss.str());
519 }
else if (field.fSrc->IsProjectedField()) {
521 const auto srcName =
src.GetQualifiedFieldName(field.fSrc->GetProjectionSourceId());
523 if (srcName != dstName) {
524 std::stringstream ss;
525 ss <<
"Field `" << fieldName
526 <<
"` is projected to a different field than a previously-seen field with the same name (old: "
527 << dstName <<
", new: " << srcName <<
")";
528 errors.push_back(ss.str());
534 const auto &srcTyName = field.fSrc->GetTypeName();
535 const auto &dstTyName = field.fDst->GetTypeName();
536 if (srcTyName != dstTyName) {
537 std::stringstream ss;
538 ss <<
"Field `" << fieldName
539 <<
"` has a type incompatible with a previously-seen field with the same name: (old: " << dstTyName
540 <<
", new: " << srcTyName <<
")";
541 errors.push_back(ss.str());
545 const auto srcTyChk = field.fSrc->GetTypeChecksum();
546 const auto dstTyChk = field.fDst->GetTypeChecksum();
547 if (srcTyChk && dstTyChk && *srcTyChk != *dstTyChk) {
548 std::stringstream ss;
549 ss <<
"Field `" << field.fSrc->GetFieldName()
550 <<
"` has a different type checksum than previously-seen field with the same name";
551 errors.push_back(ss.str());
555 const auto srcTyVer = field.fSrc->GetTypeVersion();
556 const auto dstTyVer = field.fDst->GetTypeVersion();
557 if (srcTyVer != dstTyVer) {
558 std::stringstream ss;
559 ss <<
"Field `" << field.fSrc->GetFieldName()
560 <<
"` has a different type version than previously-seen field with the same name (old: " << dstTyVer
561 <<
", new: " << srcTyVer <<
")";
562 errors.push_back(ss.str());
566 const auto srcFldVer = field.fSrc->GetFieldVersion();
567 const auto dstFldVer = field.fDst->GetFieldVersion();
568 if (srcFldVer != dstFldVer) {
569 std::stringstream ss;
570 ss <<
"Field `" << field.fSrc->GetFieldName()
571 <<
"` has a different field version than previously-seen field with the same name (old: " << dstFldVer
572 <<
", new: " << srcFldVer <<
")";
573 errors.push_back(ss.str());
576 const auto srcRole = field.fSrc->GetStructure();
577 const auto dstRole = field.fDst->GetStructure();
578 if (srcRole != dstRole) {
579 std::stringstream ss;
580 ss <<
"Field `" << field.fSrc->GetFieldName()
581 <<
"` has a different structural role than previously-seen field with the same name (old: " << dstRole
582 <<
", new: " << srcRole <<
")";
583 errors.push_back(ss.str());
587 const auto srcNCols = field.fSrc->GetLogicalColumnIds().size();
588 const auto dstNCols = field.fDst->GetLogicalColumnIds().size();
589 if (srcNCols != dstNCols) {
590 std::stringstream ss;
591 ss <<
"Field `" << field.fSrc->GetFieldName()
592 <<
"` has a different number of columns than previously-seen field with the same name (old: " << dstNCols
593 <<
", new: " << srcNCols <<
")";
594 errors.push_back(ss.str());
596 for (
auto i = 0u; i < srcNCols; ++i) {
597 const auto srcColId = field.fSrc->GetLogicalColumnIds()[i];
598 const auto dstColId = field.fDst->GetLogicalColumnIds()[i];
599 const auto &srcCol =
src.GetColumnDescriptor(srcColId);
604 if (srcCol.GetType() != dstCol.GetType() &&
606 std::stringstream ss;
607 ss << i <<
"-th column of field `" << field.fSrc->GetFieldName()
608 <<
"` has a different column type of the same column on the previously-seen field with the same name "
612 errors.push_back(ss.str());
614 if (srcCol.GetBitsOnStorage() != dstCol.GetBitsOnStorage()) {
615 std::stringstream ss;
616 ss << i <<
"-th column of field `" << field.fSrc->GetFieldName()
617 <<
"` has a different number of bits of the same column on the previously-seen field with the same "
620 << srcCol.GetBitsOnStorage() <<
", new: " << dstCol.GetBitsOnStorage() <<
")";
621 errors.push_back(ss.str());
623 if (srcCol.GetValueRange() != dstCol.GetValueRange()) {
624 std::stringstream ss;
625 ss << i <<
"-th column of field `" << field.fSrc->GetFieldName()
626 <<
"` has a different value range of the same column on the previously-seen field with the same name "
628 << srcCol.GetValueRange() <<
", new: " << dstCol.GetValueRange() <<
")";
629 errors.push_back(ss.str());
631 if (srcCol.GetRepresentationIndex() > 0) {
632 std::stringstream ss;
633 ss << i <<
"-th column of field `" << field.fSrc->GetFieldName()
634 <<
"` has a representation index higher than 0. This is not supported yet by the merger.";
635 errors.push_back(ss.str());
641 const auto &srcLinks = field.fSrc->GetLinkIds();
642 const auto &dstLinks = field.fDst->GetLinkIds();
643 if (srcLinks.size() != dstLinks.size()) {
644 std::stringstream ss;
645 ss <<
"Field `" << field.fSrc->GetFieldName()
646 <<
"` has a different number of children than previously-seen field with the same name (old: "
647 << dstLinks.size() <<
", new: " << srcLinks.size() <<
")";
648 errors.push_back(ss.str());
650 for (std::size_t linkIdx = 0, linkNum = srcLinks.size(); linkIdx < linkNum; ++linkIdx) {
651 const auto &srcSubfield =
src.GetFieldDescriptor(srcLinks[linkIdx]);
653 fieldsToCheck.push_back(RCommonField{srcSubfield, dstSubfield});
659 for (
const auto &err : errors)
660 errMsg += std::string(
"\n * ") + err;
663 errMsg = errMsg.substr(1);
668 res.fCommonFields = std::move(commonFields);
675static ROOT::RResult<void>
679 assert(newFields.size() > 0);
685 std::string msg =
"destination doesn't contain field";
686 if (newFields.size() > 1)
689 msg += std::accumulate(newFields.begin(), newFields.end(), std::string{}, [](
const auto &acc,
const auto *field) {
690 return acc + (acc.length() ?
", " :
"") +
'`' + field->GetFieldName() +
'`';
693 <<
" to the destination model (entry #" << mergeData.
fNumDstEntries <<
").";
698 for (
const auto *fieldDesc : newFields) {
699 if (!fieldDesc->IsProjectedField()) {
701 changeset.AddField(std::move(field));
705 for (
const auto *fieldDesc : newFields) {
706 if (!fieldDesc->IsProjectedField())
711 const auto sourceId = fieldDesc->GetProjectionSourceId();
713 fieldMap[field.get()] = &sourceField;
715 for (
const auto &subfield : *field) {
718 const auto &subSourceField =
720 fieldMap[&subfield] = &subSourceField;
722 changeset.fAddedProjectedFields.emplace_back(field.get());
729 return R__FAIL(
ex.GetError().GetReport());
732 commonFields.reserve(commonFields.size() + newFields.size());
733 for (
const auto *field : newFields) {
736 commonFields.emplace_back(*field, newFieldInDst);
744static ROOT::RResult<void>
749 if (!nEntriesToGenerate)
752 for (
const auto &column : columns) {
762 bool skipColumn =
false;
763 auto nRepetitions = std::max<std::uint64_t>(field->
GetNRepetitions(), 1);
781 "Destination RNTuple contains a streamer field (" + field->
GetFieldName() +
782 ") that is not present in one of the sources. "
783 "Creating a default value for a streamer field is ill-defined, therefore the merging process will abort.");
792 const auto nElements = nEntriesToGenerate * nRepetitions;
793 const auto nBytesOnStorage = colElement->GetPackedSize(nElements);
795 constexpr auto kPageSizeLimit = 256 * 1024;
797 const size_t nPages = nBytesOnStorage / kPageSizeLimit + !!(nBytesOnStorage % kPageSizeLimit);
798 for (
size_t i = 0; i < nPages; ++i) {
799 const auto pageSize = (i < nPages - 1) ? kPageSizeLimit : nBytesOnStorage - kPageSizeLimit * (nPages - 1);
801 const auto bufSize = pageSize + checksumSize;
802 assert(pageSize % colElement->GetSize() == 0);
803 const auto nElementsPerPage = pageSize / colElement->GetSize();
804 auto page = pageAlloc.
NewPage(colElement->GetSize(), nElementsPerPage);
806 memset(page.GetBuffer(), 0, page.GetNBytes());
808 auto &buffer = sealedPageData.
fBuffers.emplace_back(
new std::byte[bufSize]);
810 sealConf.
fElement = colElement.get();
811 sealConf.
fPage = &page;
812 sealConf.
fBuffer = buffer.get();
817 sealedPageData.
fPagesV.push_back({sealedPage});
818 sealedPageData.
fGroups.emplace_back(column.fOutputId, sealedPageData.
fPagesV.back().cbegin(),
819 sealedPageData.
fPagesV.back().cend());
830 std::span<const RColumnMergeInfo> commonColumns,
835 assert(nCommonColumnsInCluster == commonColumnSet.size());
836 assert(nCommonColumnsInCluster <= commonColumns.size());
837 if (nCommonColumnsInCluster == 0)
845 for (
size_t colIdx = 0; colIdx < nCommonColumnsInCluster; ++colIdx) {
846 const auto &column = commonColumns[colIdx];
847 const auto &columnId = column.fInputId;
851 const auto srcColElement = column.fInMemoryType
854 const auto dstColElement = column.fInMemoryType
862 sealedPages.resize(pages.GetPageInfos().size());
873 const bool compressionIsDifferent =
875 const bool needsResealing =
876 srcColElement->GetIdentifier().fOnDiskType != dstColElement->GetIdentifier().fOnDiskType;
877 const bool needsRecompressing = compressionIsDifferent || needsResealing;
881 << (needsResealing ?
"Resealing" :
"Recompressing") <<
" column " << column.fColumnName
882 <<
": { compression: " << colRangeCompressionSettings <<
" => "
888 size_t pageBufferBaseIdx = sealedPageData.
fBuffers.size();
891 if (needsRecompressing)
892 sealedPageData.
fBuffers.resize(sealedPageData.
fBuffers.size() + pages.GetPageInfos().size());
900 const auto nMissingEntries = columnDesc.GetFirstElementIndex() - clusterDesc.
GetFirstEntryIndex();
908 std::uint64_t pageIdx = 0;
909 for (
const auto &pageInfo : pages.GetPageInfos()) {
910 assert(pageIdx < sealedPages.size());
911 assert(sealedPageData.
fBuffers.size() == 0 || pageIdx < sealedPageData.
fBuffers.size());
921 sealedPage.
SetBufferSize(pageInfo.GetLocator().GetNBytesOnStorage() + checksumSize);
922 sealedPage.
SetBuffer(onDiskPage->GetAddress());
927 if (needsRecompressing) {
928 const auto uncompressedSize = srcColElement->GetSize() * sealedPage.
GetNElements();
929 auto &buffer = sealedPageData.
fBuffers[pageBufferBaseIdx + pageIdx];
930 const auto bufSize = uncompressedSize + checksumSize;
937 if (needsResealing) {
949 RTaskVisitor{
fTaskGroup}(RChangeCompressionFunc{
970 sealedPageData.
fPagesV.push_back(std::move(sealedPages));
971 sealedPageData.
fGroups.emplace_back(column.fOutputId, sealedPageData.
fPagesV.back().cbegin(),
972 sealedPageData.
fPagesV.back().cend());
985 std::span<const RColumnMergeInfo> extraDstColumns,
RNTupleMergeData &mergeData)
989 std::vector<RColumnMergeInfo> missingColumns{extraDstColumns.begin(), extraDstColumns.end()};
997 const auto nClusterEntries = clusterDesc.
GetNEntries();
1004 size_t nCommonColumnsInCluster = commonColumns.size();
1005 while (nCommonColumnsInCluster > 0) {
1009 if (clusterDesc.ContainsColumn(commonColumns[nCommonColumnsInCluster - 1].fInputId))
1011 --nCommonColumnsInCluster;
1016 commonColumnSet.reserve(nCommonColumnsInCluster);
1017 for (
size_t i = 0; i < nCommonColumnsInCluster; ++i)
1018 commonColumnSet.emplace(commonColumns[i].fInputId);
1022 missingColumns.resize(extraDstColumns.size());
1023 for (
size_t i = nCommonColumnsInCluster; i < commonColumns.size(); ++i)
1024 missingColumns.push_back(commonColumns[i]);
1027 auto res =
MergeCommonColumns(clusterPool, clusterDesc, commonColumns, commonColumnSet, nCommonColumnsInCluster,
1065 if (fieldType ==
"bool")
return typeid(
bool);
1066 if (fieldType ==
"std::byte")
return typeid(std::byte);
1067 if (fieldType ==
"char")
return typeid(char);
1068 if (fieldType ==
"std::int8_t")
return typeid(std::int8_t);
1069 if (fieldType ==
"std::uint8_t")
return typeid(std::uint8_t);
1070 if (fieldType ==
"std::int16_t")
return typeid(std::int16_t);
1071 if (fieldType ==
"std::uint16_t")
return typeid(std::uint16_t);
1072 if (fieldType ==
"std::int32_t")
return typeid(std::int32_t);
1073 if (fieldType ==
"std::uint32_t")
return typeid(std::uint32_t);
1074 if (fieldType ==
"std::int64_t")
return typeid(std::int64_t);
1075 if (fieldType ==
"std::uint64_t")
return typeid(std::uint64_t);
1076 if (fieldType ==
"float")
return typeid(float);
1077 if (fieldType ==
"double")
return typeid(
double);
1081 return std::nullopt;
1097 columns.reserve(columns.size() + columnIds.size());
1110 info.fInputId = srcColumn.GetPhysicalId();
1118 info.fParentFieldDescriptor = &srcFieldDesc;
1122 info.fParentNTupleDescriptor = &srcDesc;
1125 info.fOutputId = it->second.fColumnId;
1126 info.fColumnType = it->second.fColumnType;
1138 info.fColumnType = dstColumn.
GetType();
1139 mergeData.
fColumnIdMap[info.fColumnName] = {info.fOutputId, info.fColumnType};
1144 <<
", phys.id " << srcColumn.GetPhysicalId() <<
", type "
1146 << info.fOutputId <<
", type "
1153 columns.emplace_back(info);
1156 const auto &srcChildrenIds = srcFieldDesc.
GetLinkIds();
1157 const auto &dstChildrenIds = dstFieldDesc.
GetLinkIds();
1158 assert(srcChildrenIds.size() == dstChildrenIds.size());
1159 for (
auto i = 0u; i < srcChildrenIds.size(); ++i) {
1172 RColumnInfoGroup res;
1176 for (
const auto &[srcField, dstField] : descCmp.fCommonFields) {
1182 std::sort(res.fCommonColumns.begin(), res.fCommonColumns.end(),
1183 [](
const auto &
a,
const auto &
b) { return a.fInputId < b.fInputId; });
1189 ColumnIdMap_t &colIdMap,
const std::string &prefix =
"")
1194 RColumnOutInfo info{};
1195 const auto colName =
name +
'.' + std::to_string(colDesc.GetIndex());
1196 info.fColumnId = colDesc.GetLogicalId();
1197 info.fColumnType = colDesc.GetType();
1198 colIdMap[colName] = info;
1201 for (
const auto &subId : fieldDesc.
GetLinkIds()) {
1208 std::unique_ptr<ROOT::RNTupleModel> model)
1236 const auto dstCompSettings =
fDestination->GetWriteOptions().GetCompression();
1240 return R__FAIL(std::string(
"The compression given to RNTupleMergeOptions is different from that of the "
1242 std::to_string(*mergeOpts.
fCompressionSettings) +
", sink: " + std::to_string(dstCompSettings) +
1243 ") This is currently unsupported.");
1250 "passing an already-initialized destination to RNTupleMerger::Merge (i.e. trying to do incremental "
1251 "merging) can only be done by providing a valid ROOT::RNTupleModel when constructing the RNTupleMerger.");
1260 for (
const auto &field : mergeData.fDstDescriptor.GetTopLevelFields()) {
1265#define SKIP_OR_ABORT(errMsg) \
1267 if (mergeOpts.fErrBehavior == ENTupleMergeErrBehavior::kSkip) { \
1268 R__LOG_WARNING(NTupleMergeLog()) << "Skipping RNTuple due to: " << (errMsg); \
1271 return R__FAIL(errMsg); \
1279 source->LoadStreamerInfo();
1281 source->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kForWriting);
1282 auto srcDescriptor = source->GetSharedDescriptorGuard();
1283 mergeData.fSrcDescriptor = &srcDescriptor.GetRef();
1288 <<
"RNTuple '" << mergeData.fSrcDescriptor->GetName()
1289 <<
"' has a higher format version than the latest supported by this version "
1290 "of ROOT. Merging will work but some features may be dropped.";
1292 return R__FAIL(
"RNTuple '" + mergeData.fSrcDescriptor->GetName() +
1293 "' has a higher format version than the latest supported by this version. Refusing to "
1294 "merge, since RNTupleMergeOptions::fVersionBehavior is set to AbortOnHigherVersion.");
1303 for (
const auto &extraTypeInfoDesc : srcDescriptor->GetExtraTypeInfoIterable())
1308 SKIP_OR_ABORT(std::string(
"Source RNTuple has an incompatible schema with the destination:\n") +
1309 descCmpRes.GetError()->GetReport());
1311 auto descCmp = descCmpRes.Unwrap();
1316 std::string msg =
"Source RNTuple is missing the following fields:";
1317 for (
const auto *field : descCmp.fExtraDstFields) {
1318 msg +=
"\n " + field->GetFieldName() +
" : " + field->GetTypeName();
1324 if (descCmp.fExtraSrcFields.size()) {
1332 std::string msg =
"Source RNTuple has extra fields that the destination RNTuple doesn't have:";
1333 for (
const auto *field : descCmp.fExtraSrcFields) {
1334 msg +=
"\n " + field->GetFieldName() +
" : " + field->GetTypeName();
1341 auto columnInfos =
GatherColumnInfos(descCmp, srcDescriptor.GetRef(), mergeData);
1342 auto res =
MergeSourceClusters(*source, columnInfos.fCommonColumns, columnInfos.fExtraDstColumns, mergeData);
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking)
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
#define R__LOG_WARNING(...)
#define R__LOG_ERROR(...)
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
static std::optional< std::type_index > ColumnInMemoryType(std::string_view fieldType, ENTupleColumnType onDiskType)
static ROOT::RResult< RDescriptorsComparison > CompareDescriptorStructure(const ROOT::RNTupleDescriptor &dst, const ROOT::RNTupleDescriptor &src)
Compares the top level fields of dst and src and determines whether they can be merged or not.
static ROOT::RResult< void > GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span< const RColumnMergeInfo > columns, RSealedPageMergeData &sealedPageData, ROOT::Internal::RPageAllocator &pageAlloc, const ROOT::RNTupleDescriptor &dstDescriptor, const RNTupleMergeData &mergeData)
static std::optional< ENTupleMergeErrBehavior > ParseOptionErrBehavior(const TString &opts)
static ROOT::RLogChannel & NTupleMergeLog()
#define SKIP_OR_ABORT(errMsg)
static std::optional< T > ParseStringOption(const TString &opts, const char *pattern, std::initializer_list< std::pair< const char *, T > > validValues)
static bool IsSplitOrUnsplitVersionOf(ENTupleColumnType a, ENTupleColumnType b)
static void AddColumnsFromField(std::vector< RColumnMergeInfo > &columns, const ROOT::RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData, const ROOT::RFieldDescriptor &srcFieldDesc, const ROOT::RFieldDescriptor &dstFieldDesc, const std::string &prefix="")
static std::optional< ENTupleMergingMode > ParseOptionMergingMode(const TString &opts)
static void PrefillColumnMap(const ROOT::RNTupleDescriptor &desc, const ROOT::RFieldDescriptor &fieldDesc, ColumnIdMap_t &colIdMap, const std::string &prefix="")
static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const ROOT::RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData)
static ROOT::RResult< void > ExtendDestinationModel(std::span< const ROOT::RFieldDescriptor * > newFields, ROOT::RNTupleModel &dstModel, RNTupleMergeData &mergeData, std::vector< RCommonField > &commonFields)
static std::optional< ENTupleMergeVersionBehavior > ParseOptionVersionBehavior(const TString &opts)
static bool BeginsWithDelimitedWord(const TString &str, const char *word)
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
int Ssiz_t
String size (currently int)
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t src
TRObject operator()(const T1 &t1) const
The available trivial, native content types of a column.
Given a set of RPageSources merge them into an RPagePersistentSink, optionally changing their compres...
ROOT::RResult< void > MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, const ROOT::RClusterDescriptor &clusterDesc, std::span< const RColumnMergeInfo > commonColumns, const ROOT::Internal::RCluster::ColumnSet_t &commonColumnSet, std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc)
std::optional< TTaskGroup > fTaskGroup
std::unique_ptr< ROOT::RNTupleModel > fModel
RNTupleMerger(std::unique_ptr< ROOT::Internal::RPagePersistentSink > destination, std::unique_ptr< ROOT::RNTupleModel > model)
Creates a RNTupleMerger with the given destination.
ROOT::RResult< void > MergeSourceClusters(ROOT::Internal::RPageSource &source, std::span< const RColumnMergeInfo > commonColumns, std::span< const RColumnMergeInfo > extraDstColumns, RNTupleMergeData &mergeData)
std::unique_ptr< ROOT::Internal::RPagePersistentSink > fDestination
std::unique_ptr< ROOT::Internal::RPageAllocator > fPageAlloc
RResult< void > Merge(std::span< ROOT::Internal::RPageSource * > sources, const RNTupleMergeOptions &mergeOpts=RNTupleMergeOptions())
Merge a given set of sources into the destination.
A class to manage the asynchronous execution of work items.
Managed a set of clusters containing compressed and packed pages.
RCluster * GetCluster(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
An in-memory subset of the packed and compressed pages of a cluster.
std::unordered_set< ROOT::DescriptorId_t > ColumnSet_t
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
A column element encapsulates the translation between basic C++ types and their column representation...
static const char * GetColumnTypeName(ROOT::ENTupleColumnType type)
static std::unique_ptr< RColumnElementBase > Generate(ROOT::ENTupleColumnType type)
If CppT == void, use the default C++ type for the given column type.
virtual RIdentifier GetIdentifier() const =0
std::size_t GetPackedSize(std::size_t nElements=1U) const
The in-memory representation of a 32bit or 64bit on-disk index column.
Holds the index and the tag of a kSwitch column.
static std::size_t Zip(const void *from, std::size_t nbytes, int compression, void *to)
Returns the size of the compressed data, written into the provided output buffer.
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size ls of the from buffer.
A helper class for serializing and deserialization of the RNTuple binary format.
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
virtual RPage NewPage(std::size_t elementSize, std::size_t nElements)=0
Reserves memory large enough to hold nElements of the given size.
Abstract interface to write data into an ntuple.
const ROOT::RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
virtual std::uint64_t CommitCluster(ROOT::NTupleSize_t nNewEntries)
Finalize the current cluster and create a new one for the following data.
virtual void UpdateSchema(const ROOT::Internal::RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry)=0
Incorporate incremental changes to the model into the ntuple descriptor.
RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element)
Helper for streaming a page.
virtual void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges)=0
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
Storage provider that reads ntuple pages from a file.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
Abstract interface to read data from an ntuple.
Common functionality of an ntuple storage for both reading and writing.
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
std::deque< RSealedPage > SealedPageSequence_t
void * GrowUnchecked(std::uint32_t nElements)
Increases the number elements in the page.
std::unordered_map< const ROOT::RFieldBase *, const ROOT::RFieldBase * > FieldMap_t
The map keys are the projected target fields, the map values are the backing source fields Note that ...
RResult< void > Add(std::unique_ptr< ROOT::RFieldBase > field, const FieldMap_t &fieldMap)
Adds a new projected field.
std::optional< std::uint32_t > GetCompressionSettings() const
Metadata for RNTuple clusters.
ROOT::NTupleSize_t GetNEntries() const
ROOT::DescriptorId_t GetId() const
const RPageRange & GetPageRange(ROOT::DescriptorId_t physicalId) const
bool ContainsColumn(ROOT::DescriptorId_t physicalId) const
const RColumnRange & GetColumnRange(ROOT::DescriptorId_t physicalId) const
ROOT::NTupleSize_t GetFirstEntryIndex() const
ROOT::ENTupleColumnType GetType() const
Base class for all ROOT issued exceptions.
Metadata stored for every field of an RNTuple.
const std::vector< ROOT::DescriptorId_t > & GetLogicalColumnIds() const
ROOT::ENTupleStructure GetStructure() const
const std::vector< ROOT::DescriptorId_t > & GetLinkIds() const
ROOT::DescriptorId_t GetParentId() const
std::uint64_t GetNRepetitions() const
bool IsProjectedField() const
ROOT::DescriptorId_t GetProjectionSourceId() const
const std::string & GetFieldName() const
const std::string & GetTypeName() const
A log configuration for a channel, e.g.
The on-storage metadata of an RNTuple.
const RColumnDescriptor & GetColumnDescriptor(ROOT::DescriptorId_t columnId) const
ROOT::DescriptorId_t FindNextClusterId(ROOT::DescriptorId_t clusterId) const
const RFieldDescriptor & GetFieldDescriptor(ROOT::DescriptorId_t fieldId) const
ROOT::DescriptorId_t FindClusterId(ROOT::NTupleSize_t entryIdx) const
const RClusterDescriptor & GetClusterDescriptor(ROOT::DescriptorId_t clusterId) const
ROOT::DescriptorId_t FindFieldId(std::string_view fieldName, ROOT::DescriptorId_t parentId) const
RFieldDescriptorIterable GetTopLevelFields() const
std::string GetQualifiedFieldName(ROOT::DescriptorId_t fieldId) const
Walks up the parents of the field ID and returns a field name of the form a.b.c.d In case of invalid ...
The RNTupleModel encapulates the schema of an RNTuple.
void Unfreeze()
Transitions an RNTupleModel from the frozen state back to the building state, invalidating all previo...
void Freeze()
Transitions an RNTupleModel from the building state to the frozen state, disabling adding additional ...
const ROOT::RFieldBase & GetConstField(std::string_view fieldName) const
Common user-tunable settings for storing RNTuples.
bool GetEnablePageChecksums() const
void SetCompression(std::uint32_t val)
Representation of an RNTuple data set in a ROOT file.
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface Merge this NTuple with the input list entries.
static constexpr std::uint64_t GetCurrentVersion()
Returns the RNTuple version in the following form: Epoch: 2 most significant bytes Major: next 2 byte...
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element)
Helper for streaming a page.
static RResult< ROOT::Internal::RPage > UnsealPage(const RSealedPage &sealedPage, const ROOT::Internal::RColumnElementBase &element, ROOT::Internal::RPageAllocator &pageAlloc)
Helper for unstreaming a page. This is commonly used in derived, concrete page sources....
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
Collection abstract base class.
virtual Int_t GetEntries() const
TKey * FindKey(const char *keyname) const override
Find key with name keyname in the current directory.
TObject * Get(const char *namecycle) override
Return pointer to object identified by namecycle.
A class to pass information from the TFileMerger to the objects being merged.
TString fOptions
Additional text based option being passed down to customize the merge.
A file, usually with extension .root, that stores data and code in the form of serialized objects in ...
Int_t GetCompressionSettings() const
Book space in a file, create I/O buffers, to fill them, (un)compress them.
T * ReadObject()
To read an object (non deriving from TObject) from the file.
const char * GetName() const override
Returns name of object.
Mother of all ROOT objects.
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
@ kStrict
The merger will refuse to merge any 2 RNTuples whose schema doesn't match exactly.
@ kUnion
The merger will update the output model to include all columns from all sources.
@ kFilter
The merger will discard all columns that aren't present in the prototype model (i....
@ kAbortOnHigherVersion
The merger will refuse to merge RNTuples with higher versions than the latest supported by this ROOT ...
@ kWarnOnHigherVersion
The merger will emit a warning when merging RNTuples with higher version than the latest supported by...
@ kAbort
The merger will abort merging as soon as an error is encountered.
@ kSkip
Upon errors, the merger will skip the current source and continue.
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, ROOT::ENTupleColumnType onDiskType)
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr DescriptorId_t kInvalidDescriptorId
ROOT::DescriptorId_t fOutputId
const ROOT::RFieldDescriptor * fParentFieldDescriptor
ROOT::DescriptorId_t fInputId
std::optional< std::type_index > fInMemoryType
const ROOT::RNTupleDescriptor * fParentNTupleDescriptor
ENTupleColumnType fColumnType
const ROOT::RNTupleDescriptor * fSrcDescriptor
ColumnIdMap_t fColumnIdMap
RNTupleMergeData(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
std::span< RPageSource * > fSources
ROOT::NTupleSize_t fNumDstEntries
const RNTupleMergeOptions & fMergeOpts
std::vector< RColumnMergeInfo > fColumns
const ROOT::RNTupleDescriptor & fDstDescriptor
Set of merging options to pass to RNTupleMerger.
ENTupleMergingMode fMergingMode
Determines how the merging treats sources with different models (.
ENTupleMergeVersionBehavior fVersionBehavior
Determines how the Merge function behaves depending on the RNTuple sources' version.
ENTupleMergeErrBehavior fErrBehavior
Determines how the Merge function behaves upon merging errors.
std::optional< std::uint32_t > fCompressionSettings
If fCompressionSettings is empty (the default), the merger will not change the compression of any of ...
bool fExtraVerbose
If true, the merger will emit further diagnostics and information.
std::vector< RPageStorage::RSealedPageGroup > fGroups
std::vector< std::unique_ptr< std::byte[]> > fBuffers
std::deque< RPageStorage::SealedPageSequence_t > fPagesV
The incremental changes to a RNTupleModel
std::vector< ROOT::RFieldBase * > fAddedFields
Points to the fields in fModel that were added as part of an updater transaction.
On-disk pages within a page source are identified by the column and page number.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RResult< void > VerifyChecksumIfEnabled() const
void SetBuffer(const void *buffer)
std::uint32_t GetNElements() const
void SetHasChecksum(bool hasChecksum)
void SetNElements(std::uint32_t nElements)
std::size_t GetBufferSize() const
void SetBufferSize(std::size_t bufferSize)
@ kUseGeneralPurpose
Use the new recommended general-purpose setting; it is a best trade-off between compression ratio/dec...
Parameters for the SealPage() method.
bool fWriteChecksum
Adds an 8-byte little-endian xxhash3 checksum to the page payload.
std::uint32_t fCompressionSettings
Compression algorithm and level to apply.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
const ROOT::Internal::RPage * fPage
Input page to be sealed.
const ROOT::Internal::RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RResult< void > VerifyChecksumIfEnabled() const
std::uint32_t GetNElements() const
const void * GetBuffer() const
bool GetHasChecksum() const
std::size_t GetDataSize() const