38#include <unordered_map>
49 if (!inputs || inputs->
GetEntries() < 3 || !mergeInfo) {
50 Error(
"RNTuple::Merge",
"Invalid inputs.");
58 std::string ntupleName = std::string(itr()->GetName());
62 TFile *outFile =
dynamic_cast<TFile *
>(secondArg);
64 Error(
"RNTuple::Merge",
"Second input parameter should be a TFile, but it's a %s.", secondArg->
ClassName());
69 TKey *outKey = outFile->
FindKey(ntupleName.c_str());
74 Error(
"RNTuple::Merge",
"Output file already has key, but not of type RNTuple!");
81 const bool defaultComp = mergeInfo->
fOptions.
Contains(
"default_compression");
82 const bool firstSrcComp = mergeInfo->
fOptions.
Contains(
"first_source_compression");
83 if (defaultComp && firstSrcComp) {
87 "Passed both options \"default_compression\" and \"first_source_compression\": only the latter will apply.");
93 }
else if (!defaultComp) {
99 Info(
"RNTuple::Merge",
"Using the default compression: %d", compression);
103 std::vector<std::unique_ptr<RPageSourceFile>> sources;
104 std::vector<RPageSource *> sourcePtrs;
106 while (
const auto &pitr = itr()) {
110 Error(
"RNTuple::Merge",
"Failed to retrieve RNTuple anchor named '%s' from file '%s'", ntupleName.c_str(),
120 auto descriptor = source->GetSharedDescriptorGuard();
121 auto clusterIter = descriptor->GetClusterIterable();
122 auto firstCluster = clusterIter.begin();
123 if (firstCluster == clusterIter.end()) {
124 Error(
"RNTuple::Merge",
125 "Asked to use the first source's compression as the output compression, but the "
126 "first source (file '%s') has an empty RNTuple, therefore the output compression could not be "
131 auto colRangeIter = (*firstCluster).GetColumnRangeIterable();
132 auto firstColRange = colRangeIter.begin();
133 if (firstColRange == colRangeIter.end()) {
134 Error(
"RNTuple::Merge",
135 "Asked to use the first source's compression as the output compression, but the "
136 "first source (file '%s') has an empty RNTuple, therefore the output compression could not be "
141 compression = (*firstColRange).fCompressionSettings;
142 Info(
"RNTuple::Merge",
"Using the first RNTuple's compression: %d", compression);
144 sources.push_back(std::move(source));
150 auto destination = std::make_unique<RPageSinkFile>(ntupleName, *outFile, writeOpts);
156 auto desc = outSource->GetSharedDescriptorGuard();
157 destination->InitFromDescriptor(desc.GetRef());
161 sourcePtrs.reserve(sources.size());
162 for (
const auto &s : sources) {
163 sourcePtrs.push_back(s.get());
178 Error(
"RNTuple::Merge",
"Exception thrown while merging: %s",
ex.what());
184struct RChangeCompressionFunc {
197 sealConf.
fElement = &fDstColElement;
198 sealConf.
fPage = &page;
203 fSealedPage = refSealedPage;
214struct RDescriptorsComparison {
215 std::vector<const RFieldDescriptor *> fExtraDstFields;
216 std::vector<const RFieldDescriptor *> fExtraSrcFields;
217 std::vector<RCommonField> fCommonFields;
220struct RColumnOutInfo {
226using ColumnIdMap_t = std::unordered_map<std::string, RColumnOutInfo>;
228struct RColumnInfoGroup {
229 std::vector<RColumnMergeInfo> fExtraDstColumns;
230 std::vector<RColumnMergeInfo> fCommonColumns;
272 std::deque<RPageStorage::SealedPageSequence_t>
fPagesV;
273 std::vector<RPageStorage::RSealedPageGroup>
fGroups;
274 std::vector<std::unique_ptr<std::uint8_t[]>>
fBuffers;
277std::ostream &
operator<<(std::ostream &os,
const std::optional<RColumnDescriptor::RValueRange> &
x)
280 os <<
'(' <<
x->fMin <<
", " <<
x->fMax <<
')';
292 if (
a == EColumnType::kInt16 &&
b == EColumnType::kSplitInt16)
return true;
293 if (
a == EColumnType::kSplitInt16 &&
b == EColumnType::kInt16)
return true;
294 if (
a == EColumnType::kInt32 &&
b == EColumnType::kSplitInt32)
return true;
295 if (
a == EColumnType::kSplitInt32 &&
b == EColumnType::kInt32)
return true;
296 if (
a == EColumnType::kInt64 &&
b == EColumnType::kSplitInt64)
return true;
297 if (
a == EColumnType::kSplitInt64 &&
b == EColumnType::kInt64)
return true;
298 if (
a == EColumnType::kUInt16 &&
b == EColumnType::kSplitUInt16)
return true;
299 if (
a == EColumnType::kSplitUInt16 &&
b == EColumnType::kUInt16)
return true;
300 if (
a == EColumnType::kUInt32 &&
b == EColumnType::kSplitUInt32)
return true;
301 if (
a == EColumnType::kSplitUInt32 &&
b == EColumnType::kUInt32)
return true;
302 if (
a == EColumnType::kUInt64 &&
b == EColumnType::kSplitUInt64)
return true;
303 if (
a == EColumnType::kSplitUInt64 &&
b == EColumnType::kUInt64)
return true;
304 if (
a == EColumnType::kIndex32 &&
b == EColumnType::kSplitIndex32)
return true;
305 if (
a == EColumnType::kSplitIndex32 &&
b == EColumnType::kIndex32)
return true;
306 if (
a == EColumnType::kIndex64 &&
b == EColumnType::kSplitIndex64)
return true;
307 if (
a == EColumnType::kSplitIndex64 &&
b == EColumnType::kIndex64)
return true;
308 if (
a == EColumnType::kReal32 &&
b == EColumnType::kSplitReal32)
return true;
309 if (
a == EColumnType::kSplitReal32 &&
b == EColumnType::kReal32)
return true;
310 if (
a == EColumnType::kReal64 &&
b == EColumnType::kSplitReal64)
return true;
311 if (
a == EColumnType::kSplitReal64 &&
b == EColumnType::kReal64)
return true;
327 std::vector<std::string> errors;
328 RDescriptorsComparison res;
330 std::vector<RCommonField> commonFields;
333 const auto srcFieldId =
src.FindFieldId(dstField.GetFieldName());
335 const auto &srcField =
src.GetFieldDescriptor(srcFieldId);
336 commonFields.push_back({&srcField, &dstField});
338 res.fExtraDstFields.emplace_back(&dstField);
341 for (
const auto &srcField :
src.GetTopLevelFields()) {
342 const auto dstFieldId = dst.
FindFieldId(srcField.GetFieldName());
344 res.fExtraSrcFields.push_back(&srcField);
348 for (
const auto &field : commonFields) {
350 const auto &fieldName = field.fSrc->GetFieldName();
353 bool projCompatible = field.fSrc->IsProjectedField() == field.fDst->IsProjectedField();
354 if (!projCompatible) {
355 std::stringstream ss;
356 ss <<
"Field `" << fieldName <<
"` is incompatible with previously-seen field with that name because the "
357 << (field.fSrc->IsProjectedField() ?
"new" :
"old") <<
" one is projected and the other isn't";
358 errors.push_back(ss.str());
359 }
else if (field.fSrc->IsProjectedField()) {
361 const auto srcName =
src.GetQualifiedFieldName(field.fSrc->GetProjectionSourceId());
363 if (srcName != dstName) {
364 std::stringstream ss;
365 ss <<
"Field `" << fieldName
366 <<
"` is projected to a different field than a previously-seen field with the same name (old: "
367 << dstName <<
", new: " << srcName <<
")";
368 errors.push_back(ss.str());
374 const auto &srcTyName = field.fSrc->GetTypeName();
375 const auto &dstTyName = field.fDst->GetTypeName();
376 if (srcTyName != dstTyName) {
377 std::stringstream ss;
378 ss <<
"Field `" << fieldName
379 <<
"` has a type incompatible with a previously-seen field with the same name: (old: " << dstTyName
380 <<
", new: " << srcTyName <<
")";
381 errors.push_back(ss.str());
385 const auto srcTyChk = field.fSrc->GetTypeChecksum();
386 const auto dstTyChk = field.fDst->GetTypeChecksum();
387 if (srcTyChk && dstTyChk && *srcTyChk != *dstTyChk) {
388 std::stringstream ss;
389 ss <<
"Field `" << field.fSrc->GetFieldName()
390 <<
"` has a different type checksum than previously-seen field with the same name";
391 errors.push_back(ss.str());
395 const auto srcTyVer = field.fSrc->GetTypeVersion();
396 const auto dstTyVer = field.fDst->GetTypeVersion();
397 if (srcTyVer != dstTyVer) {
398 std::stringstream ss;
399 ss <<
"Field `" << field.fSrc->GetFieldName()
400 <<
"` has a different type version than previously-seen field with the same name (old: " << dstTyVer
401 <<
", new: " << srcTyVer <<
")";
402 errors.push_back(ss.str());
406 const auto srcNCols = field.fSrc->GetLogicalColumnIds().size();
407 const auto dstNCols = field.fDst->GetLogicalColumnIds().size();
408 if (srcNCols != dstNCols) {
409 std::stringstream ss;
410 ss <<
"Field `" << field.fSrc->GetFieldName()
411 <<
"` has a different number of columns than previously-seen field with the same name (old: " << dstNCols
412 <<
", new: " << srcNCols <<
")";
413 errors.push_back(ss.str());
415 for (
auto i = 0u; i < srcNCols; ++i) {
416 const auto srcColId = field.fSrc->GetLogicalColumnIds()[i];
417 const auto dstColId = field.fDst->GetLogicalColumnIds()[i];
418 const auto &srcCol =
src.GetColumnDescriptor(srcColId);
423 if (srcCol.GetType() != dstCol.GetType() &&
425 std::stringstream ss;
426 ss << i <<
"-th column of field `" << field.fSrc->GetFieldName()
427 <<
"` has a different column type of the same column on the previously-seen field with the same name "
431 errors.push_back(ss.str());
433 if (srcCol.GetBitsOnStorage() != dstCol.GetBitsOnStorage()) {
434 std::stringstream ss;
435 ss << i <<
"-th column of field `" << field.fSrc->GetFieldName()
436 <<
"` has a different number of bits of the same column on the previously-seen field with the same "
439 << srcCol.
GetBitsOnStorage() <<
", new: " << dstCol.GetBitsOnStorage() <<
")";
440 errors.push_back(ss.str());
442 if (srcCol.GetValueRange() != dstCol.GetValueRange()) {
443 std::stringstream ss;
444 ss << i <<
"-th column of field `" << field.fSrc->GetFieldName()
445 <<
"` has a different value range of the same column on the previously-seen field with the same name "
447 << srcCol.GetValueRange() <<
", new: " << dstCol.GetValueRange() <<
")";
448 errors.push_back(ss.str());
450 if (srcCol.GetRepresentationIndex() > 0) {
451 std::stringstream ss;
452 ss << i <<
"-th column of field `" << field.fSrc->GetFieldName()
453 <<
"` has a representation index higher than 0. This is not supported yet by the merger.";
454 errors.push_back(ss.str());
461 for (
const auto &err : errors)
462 errMsg += std::string(
"\n * ") + err;
465 errMsg = errMsg.substr(1);
470 res.fCommonFields.reserve(commonFields.size());
471 for (
const auto &[srcField, dstField] : commonFields) {
472 res.fCommonFields.emplace_back(srcField, dstField);
485 assert(newFields.size() > 0);
490 std::string msg =
"destination doesn't contain field";
491 if (newFields.size() > 1)
494 msg += std::accumulate(newFields.begin(), newFields.end(), std::string{}, [](
const auto &acc,
const auto *field) {
495 return acc + (acc.length() ?
", " :
"") +
'`' + field->GetFieldName() +
'`';
497 Info(
"RNTuple::Merge",
"%s: adding %s to the destination model (entry #%" PRIu64
").", msg.c_str(),
498 (newFields.size() > 1 ?
"them" :
"it"), mergeData.
fNumDstEntries);
500 changeset.fAddedFields.reserve(newFields.size());
501 for (
const auto *fieldDesc : newFields) {
503 if (fieldDesc->IsProjectedField())
504 changeset.fAddedProjectedFields.emplace_back(field.get());
506 changeset.fAddedFields.emplace_back(field.get());
507 changeset.fModel.AddField(std::move(field));
512 commonFields.reserve(commonFields.size() + newFields.size());
513 for (
const auto *field : newFields) {
516 commonFields.emplace_back(field, &newFieldInDst);
523 std::span<RColumnMergeInfo> commonColumns,
527 assert(commonColumns.size() == commonColumnSet.size());
528 if (commonColumns.empty())
538 for (
const auto &column : commonColumns) {
539 const auto &columnId = column.fInputId;
540 R__ASSERT(clusterDesc.ContainsColumn(columnId));
543 const auto srcColElement = column.fInMemoryType
546 const auto dstColElement = column.fInMemoryType ?
GenerateColumnElement(*column.fInMemoryType, column.fColumnType)
550 const auto &pages = clusterDesc.GetPageRange(columnId);
553 sealedPages.resize(pages.fPageInfos.size());
556 const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).fCompressionSettings;
559 Info(
"RNTuple::Merge",
"Column %s: changing source compression from %d to %d", column.fColumnName.c_str(),
562 size_t pageBufferBaseIdx = sealedPageData.
fBuffers.size();
565 if (needsCompressionChange)
566 sealedPageData.
fBuffers.resize(sealedPageData.
fBuffers.size() + pages.fPageInfos.size());
569 std::uint64_t pageIdx = 0;
570 for (
const auto &pageInfo : pages.fPageInfos) {
571 assert(pageIdx < sealedPages.size());
572 assert(sealedPageData.
fBuffers.size() == 0 || pageIdx < sealedPageData.
fBuffers.size());
581 sealedPage.
SetBufferSize(pageInfo.fLocator.GetNBytesOnStorage() + checksumSize);
582 sealedPage.
SetBuffer(onDiskPage->GetAddress());
587 if (needsCompressionChange) {
588 const auto uncompressedSize = srcColElement->GetSize() * sealedPage.
GetNElements();
589 auto &buffer = sealedPageData.
fBuffers[pageBufferBaseIdx + pageIdx];
590 buffer = MakeUninitArray<std::uint8_t>(uncompressedSize + checksumSize);
591 RChangeCompressionFunc compressTask{
592 *srcColElement, *dstColElement, mergeData.
fMergeOpts, sealedPage, *fPageAlloc, buffer.get(),
596 fTaskGroup->Run(compressTask);
608 sealedPageData.
fPagesV.push_back(std::move(sealedPages));
609 sealedPageData.
fGroups.emplace_back(column.fOutputId, sealedPageData.
fPagesV.back().cbegin(),
610 sealedPageData.
fPagesV.back().cend());
619 for (
const auto &column : extraDstColumns) {
620 const auto &columnId = column.fInputId;
630 bool skipColumn =
false;
631 auto nRepetitions = std::max<std::uint64_t>(field->
GetNRepetitions(), 1);
634 if (parent.
GetStructure() == ENTupleStructure::kCollection ||
647 if (structure == ENTupleStructure::kStreamer) {
650 "Destination RNTuple contains a streamer field (%s) that is not present in one of the sources. "
651 "Creating a default value for a streamer field is ill-defined, therefore the merging process will abort.",
657 R__ASSERT(structure == ENTupleStructure::kCollection || structure == ENTupleStructure::kVariant ||
658 structure == ENTupleStructure::kLeaf);
661 const auto nElements = nClusterEntries * nRepetitions;
662 const auto nBytesOnStorage = colElement->GetPackedSize(nElements);
663 constexpr auto kPageSizeLimit = 256 * 1024;
665 const size_t nPages = nBytesOnStorage / kPageSizeLimit + !!(nBytesOnStorage % kPageSizeLimit);
666 for (
size_t i = 0; i < nPages; ++i) {
667 const auto pageSize = (i < nPages - 1) ? kPageSizeLimit : nBytesOnStorage - kPageSizeLimit * (nPages - 1);
669 const auto bufSize = pageSize + checksumSize;
670 auto &buffer = sealedPageData.
fBuffers.emplace_back(
new unsigned char[bufSize]);
673 memset(buffer.get(), 0, pageSize);
674 sealedPage.ChecksumIfEnabled();
676 sealedPageData.
fPagesV.push_back({sealedPage});
679 sealedPageData.
fGroups.emplace_back(column.fOutputId, sealedPageData.
fPagesV.back().cbegin(),
680 sealedPageData.
fPagesV.back().cend());
696 commonColumnSet.reserve(commonColumns.size());
697 for (
const auto &column : commonColumns)
698 commonColumnSet.emplace(column.fInputId);
701 extraDstColumnSet.reserve(extraDstColumns.size());
702 for (
const auto &column : extraDstColumns)
703 extraDstColumnSet.emplace(column.fInputId);
711 const auto nClusterEntries = clusterDesc.
GetNEntries();
716 if (!commonColumnSet.empty()) {
717 MergeCommonColumns(clusterPool, clusterId, commonColumns, commonColumnSet, sealedPageData, mergeData);
720 if (!extraDstColumnSet.empty()) {
743 if (onDiskType == EColumnType::kIndex32 || onDiskType == EColumnType::kSplitIndex32 ||
744 onDiskType == EColumnType::kIndex64 || onDiskType == EColumnType::kSplitIndex64)
747 if (onDiskType == EColumnType::kSwitch)
750 if (fieldType ==
"bool") {
752 }
else if (fieldType ==
"std::byte") {
753 return typeid(std::byte);
754 }
else if (fieldType ==
"char") {
756 }
else if (fieldType ==
"std::int8_t") {
757 return typeid(std::int8_t);
758 }
else if (fieldType ==
"std::uint8_t") {
759 return typeid(std::uint8_t);
760 }
else if (fieldType ==
"std::int16_t") {
761 return typeid(std::int16_t);
762 }
else if (fieldType ==
"std::uint16_t") {
763 return typeid(std::uint16_t);
764 }
else if (fieldType ==
"std::int32_t") {
765 return typeid(std::int32_t);
766 }
else if (fieldType ==
"std::uint32_t") {
767 return typeid(std::uint32_t);
768 }
else if (fieldType ==
"std::int64_t") {
769 return typeid(std::int64_t);
770 }
else if (fieldType ==
"std::uint64_t") {
771 return typeid(std::uint64_t);
772 }
else if (fieldType ==
"float") {
773 return typeid(float);
774 }
else if (fieldType ==
"double") {
796 columns.reserve(columns.size() + columnIds.size());
808 info.fInputId = srcColumn.GetPhysicalId();
811 info.fParentField = &dstFieldDesc;
814 info.fOutputId = it->second.fColumnId;
815 info.fColumnType = it->second.fColumnType;
827 info.fColumnType = dstColumn.
GetType();
828 mergeData.
fColumnIdMap[info.fColumnName] = {info.fOutputId, info.fColumnType};
832 Info(
"RNTuple::Merge",
833 "Adding column %s with log.id %" PRIu64
", phys.id %" PRIu64
", type %s "
834 " -> log.id %" PRIu64
", type %s",
835 info.fColumnName.c_str(), srcColumnId, srcColumn.GetPhysicalId(),
843 columns.emplace_back(info);
846 const auto &srcChildrenIds = srcFieldDesc.
GetLinkIds();
847 const auto &dstChildrenIds = dstFieldDesc.
GetLinkIds();
848 assert(srcChildrenIds.size() == dstChildrenIds.size());
849 for (
auto i = 0u; i < srcChildrenIds.size(); ++i) {
859static RColumnInfoGroup
862 RColumnInfoGroup res;
866 for (
const auto &[srcField, dstField] : descCmp.fCommonFields) {
892 return R__FAIL(std::string(
"The compression given to RNTupleMergeOptions is different from that of the "
895 ") This is currently unsupported.");
901 std::unique_ptr<RNTupleModel> model;
903#define SKIP_OR_ABORT(errMsg) \
905 if (mergeOpts.fErrBehavior == ENTupleMergeErrBehavior::kSkip) { \
906 Warning("RNTuple::Merge", "Skipping RNTuple due to: %s", (errMsg).c_str()); \
909 return R__FAIL(errMsg); \
916 auto srcDescriptor = source->GetSharedDescriptorGuard();
917 mergeData.fSrcDescriptor = &srcDescriptor.GetRef();
922 opts.fReconstructProjections =
true;
923 model = srcDescriptor->CreateModel(opts);
924 destination.
Init(*model);
927 for (
const auto &extraTypeInfoDesc : srcDescriptor->GetExtraTypeInfoIterable())
933 std::string(
"Source RNTuple will be skipped due to incompatible schema with the destination:\n") +
934 descCmpRes.GetError()->GetReport());
936 auto descCmp = descCmpRes.Unwrap();
941 std::string msg =
"Source RNTuple is missing the following fields:";
942 for (
const auto *field : descCmp.fExtraDstFields) {
943 msg +=
"\n " + field->GetFieldName() +
" : " + field->GetTypeName();
949 if (descCmp.fExtraSrcFields.size()) {
955 std::string msg =
"Source RNTuple has extra fields that the destination RNTuple doesn't have:";
956 for (
const auto *field : descCmp.fExtraSrcFields) {
957 msg +=
"\n " + field->GetFieldName() +
" : " + field->GetTypeName();
964 auto columnInfos =
GatherColumnInfos(descCmp, srcDescriptor.GetRef(), mergeData);
965 MergeSourceClusters(*source, columnInfos.fCommonColumns, columnInfos.fExtraDstColumns, mergeData);
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
static void GenerateExtraDstColumns(size_t nClusterEntries, std::span< RColumnMergeInfo > extraDstColumns, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData)
static void ExtendDestinationModel(std::span< const RFieldDescriptor * > newFields, RNTupleModel &dstModel, RNTupleMergeData &mergeData, std::vector< RCommonField > &commonFields)
static ROOT::RResult< RDescriptorsComparison > CompareDescriptorStructure(const RNTupleDescriptor &dst, const RNTupleDescriptor &src)
Compares the top level fields of dst and src and determines whether they can be merged or not.
#define SKIP_OR_ABORT(errMsg)
static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData)
static bool IsSplitOrUnsplitVersionOf(EColumnType a, EColumnType b)
static std::optional< std::type_index > ColumnInMemoryType(std::string_view fieldType, EColumnType onDiskType)
static void AddColumnsFromField(std::vector< RColumnMergeInfo > &columns, const RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData, const RFieldDescriptor &srcFieldDesc, const RFieldDescriptor &dstFieldDesc, const std::string &prefix="")
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
void Fatal(const char *location, const char *msgfmt,...)
Use this function in case of a fatal error. It will abort the program.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t src
TRObject operator()(const T1 &t1) const
The available trivial, native content types of a column.
Manages a set of clusters containing compressed and packed pages.
RCluster * GetCluster(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
An in-memory subset of the packed and compressed pages of a cluster.
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
std::unordered_set< DescriptorId_t > ColumnSet_t
A column element encapsulates the translation between basic C++ types and their column representation...
static const char * GetColumnTypeName(EColumnType type)
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
If CppT == void, use the default C++ type for the given column type.
The in-memory representation of a 32bit or 64bit on-disk index column.
Holds the index and the tag of a kSwitch column.
Given a set of RPageSources, merge them into an RPageSink, optionally changing their compression.
std::optional< TTaskGroup > fTaskGroup
RResult< void > Merge(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts=RNTupleMergeOptions())
Merge a given set of sources into the destination.
void MergeSourceClusters(RPageSource &source, std::span< RColumnMergeInfo > commonColumns, std::span< RColumnMergeInfo > extraDstColumns, RNTupleMergeData &mergeData)
void MergeCommonColumns(RClusterPool &clusterPool, DescriptorId_t clusterId, std::span< RColumnMergeInfo > commonColumns, const RCluster::ColumnSet_t &commonColumnSet, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData)
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
Abstract interface to write data into an ntuple.
bool IsInitialized() const
virtual void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo)=0
Adds an extra type information record to schema.
void CommitDataset()
Run the registered callbacks and finalize the current cluster and the entire data set.
void Init(RNTupleModel &model)
Physically creates the storage container to hold the ntuple (e.g., a key in a TFile or an S3 bucket). In...
const RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
virtual void CommitClusterGroup()=0
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
virtual std::uint64_t CommitCluster(NTupleSize_t nNewEntries)
Finalize the current cluster and create a new one for the following data.
virtual void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry)=0
Incorporate incremental changes to the model into the ntuple descriptor.
virtual void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges)=0
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element)
Helper for streaming a page.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const RNTupleReadOptions &options=RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
Abstract interface to read data from an ntuple.
static RResult< RPage > UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element, RPageAllocator &pageAlloc)
Helper for unstreaming a page.
std::deque< RSealedPage > SealedPageSequence_t
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
NTupleSize_t GetNEntries() const
EColumnType GetType() const
std::uint16_t GetBitsOnStorage() const
Meta-data stored for every field of an ntuple.
DescriptorId_t GetParentId() const
const std::string & GetFieldName() const
const std::string & GetTypeName() const
const std::vector< DescriptorId_t > & GetLogicalColumnIds() const
bool IsProjectedField() const
const std::vector< DescriptorId_t > & GetLinkIds() const
std::uint64_t GetNRepetitions() const
ENTupleStructure GetStructure() const
The on-storage meta-data of an ntuple.
DescriptorId_t FindNextClusterId(DescriptorId_t clusterId) const
const RClusterDescriptor & GetClusterDescriptor(DescriptorId_t clusterId) const
std::string GetQualifiedFieldName(DescriptorId_t fieldId) const
Walks up the parents of the field ID and returns a field name of the form a.b.c.d. In case of invalid ...
DescriptorId_t FindFieldId(std::string_view fieldName, DescriptorId_t parentId) const
const RColumnDescriptor & GetColumnDescriptor(DescriptorId_t columnId) const
const RFieldDescriptor & GetFieldDescriptor(DescriptorId_t fieldId) const
RFieldDescriptorIterable GetTopLevelFields() const
DescriptorId_t FindClusterId(NTupleSize_t entryIdx) const
The RNTupleModel encapsulates the schema of an ntuple.
Common user-tunable settings for storing ntuples.
int GetCompression() const
void SetCompression(int val)
A class to manage the asynchronous execution of work items.
Base class for all ROOT issued exceptions.
Representation of an RNTuple data set in a ROOT file.
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface. Merge this NTuple with the input list entries.
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Collection abstract base class.
virtual Int_t GetEntries() const
TKey * FindKey(const char *keyname) const override
Find key with name keyname in the current directory.
TObject * Get(const char *namecycle) override
Return pointer to object identified by namecycle.
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Int_t GetCompressionSettings() const
Book space in a file, create I/O buffers, to fill them, (un)compress them.
T * ReadObject()
To read an object (non deriving from TObject) from the file.
const char * GetName() const override
Returns name of object.
Mother of all ROOT objects.
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
@ kStrict
The merger will refuse to merge any 2 RNTuples whose schema doesn't match exactly.
@ kUnion
The merger will update the output model to include all columns from all sources.
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, EColumnType onDiskType)
constexpr int kNTupleUnknownCompression
Regular, known compression settings have the form algorithm * 100 + level, e.g. 101,...
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr DescriptorId_t kInvalidDescriptorId
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
const RFieldDescriptor * fParentField
std::optional< std::type_index > fInMemoryType
ColumnIdMap_t fColumnIdMap
RNTupleMergeData(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
std::span< RPageSource * > fSources
const RNTupleDescriptor & fDstDescriptor
const RNTupleDescriptor * fSrcDescriptor
const RNTupleMergeOptions & fMergeOpts
std::vector< RColumnMergeInfo > fColumns
NTupleSize_t fNumDstEntries
int fCompressionSettings
If fCompressionSettings == kNTupleUnknownCompression (the default), the merger will not change the co...
ENTupleMergingMode fMergingMode
Determines how the merging treats sources with different models (see ENTupleMergingMode).
bool fExtraVerbose
If true, the merger will emit further diagnostics and information.
The incremental changes to an RNTupleModel
On-disk pages within a page source are identified by the column and page number.
Parameters for the SealPage() method.
const RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
int fCompressionSetting
Compression algorithm and level to apply.
bool fWriteChecksum
Adds an 8-byte little-endian xxhash3 checksum to the page payload.
const RPage * fPage
Input page to be sealed.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
void SetHasChecksum(bool hasChecksum)
void SetNElements(std::uint32_t nElements)
void SetBuffer(const void *buffer)
void SetBufferSize(std::size_t bufferSize)
std::uint32_t GetNElements() const
std::size_t GetBufferSize() const
bool GetHasChecksum() const
RResult< void > VerifyChecksumIfEnabled() const
std::vector< RPageStorage::RSealedPageGroup > fGroups
std::deque< RPageStorage::SealedPageSequence_t > fPagesV
std::vector< std::unique_ptr< std::uint8_t[]> > fBuffers
Modifiers passed to CreateModel
@ kUseGeneralPurpose
Use the new recommended general-purpose setting; it is a best trade-off between compression ratio/dec...