38#include <unordered_map>
49 if (!inputs || inputs->
GetEntries() < 3 || !mergeInfo) {
50 Error(
"RNTuple::Merge",
"Invalid inputs.");
58 std::string ntupleName = std::string(itr()->GetName());
62 TFile *outFile =
dynamic_cast<TFile *
>(secondArg);
64 Error(
"RNTuple::Merge",
"Second input parameter should be a TFile, but it's a %s.", secondArg->
ClassName());
69 TKey *outKey = outFile->
FindKey(ntupleName.c_str());
74 Error(
"RNTuple::Merge",
"Output file already has key, but not of type RNTuple!");
81 const bool defaultComp = mergeInfo->
fOptions.
Contains(
"default_compression");
82 const bool firstSrcComp = mergeInfo->
fOptions.
Contains(
"first_source_compression");
83 if (defaultComp && firstSrcComp) {
87 "Passed both options \"default_compression\" and \"first_source_compression\": only the latter will apply.");
93 }
else if (!defaultComp) {
99 Info(
"RNTuple::Merge",
"Using the default compression: %d", compression);
103 std::vector<std::unique_ptr<RPageSourceFile>> sources;
104 std::vector<RPageSource *> sourcePtrs;
106 while (
const auto &pitr = itr()) {
110 Error(
"RNTuple::Merge",
"Failed to retrieve RNTuple anchor named '%s' from file '%s'", ntupleName.c_str(),
120 auto descriptor = source->GetSharedDescriptorGuard();
121 auto clusterIter = descriptor->GetClusterIterable();
122 auto firstCluster = clusterIter.begin();
123 if (firstCluster == clusterIter.end()) {
124 Error(
"RNTuple::Merge",
125 "Asked to use the first source's compression as the output compression, but the "
126 "first source (file '%s') has an empty RNTuple, therefore the output compression could not be "
131 auto colRangeIter = (*firstCluster).GetColumnRangeIterable();
132 auto firstColRange = colRangeIter.begin();
133 if (firstColRange == colRangeIter.end()) {
134 Error(
"RNTuple::Merge",
135 "Asked to use the first source's compression as the output compression, but the "
136 "first source (file '%s') has an empty RNTuple, therefore the output compression could not be "
141 compression = (*firstColRange).fCompressionSettings;
142 Info(
"RNTuple::Merge",
"Using the first RNTuple's compression: %d", compression);
144 sources.push_back(std::move(source));
150 auto destination = std::make_unique<RPageSinkFile>(ntupleName, *outFile, writeOpts);
156 auto desc = outSource->GetSharedDescriptorGuard();
157 destination->InitFromDescriptor(desc.GetRef());
161 sourcePtrs.reserve(sources.size());
162 for (
const auto &s : sources) {
163 sourcePtrs.push_back(s.get());
178 Error(
"RNTuple::Merge",
"Exception thrown while merging: %s",
ex.what());
184struct RChangeCompressionFunc {
199 sealConf.
fElement = &fDstColElement;
200 sealConf.
fPage = &page;
205 fSealedPage = refSealedPage;
216struct RDescriptorsComparison {
217 std::vector<const RFieldDescriptor *> fExtraDstFields;
218 std::vector<const RFieldDescriptor *> fExtraSrcFields;
219 std::vector<RCommonField> fCommonFields;
222struct RColumnOutInfo {
228using ColumnIdMap_t = std::unordered_map<std::string, RColumnOutInfo>;
230struct RColumnInfoGroup {
231 std::vector<RColumnMergeInfo> fExtraDstColumns;
232 std::vector<RColumnMergeInfo> fCommonColumns;
274 std::deque<RPageStorage::SealedPageSequence_t>
fPagesV;
275 std::vector<RPageStorage::RSealedPageGroup>
fGroups;
276 std::vector<std::unique_ptr<std::uint8_t[]>>
fBuffers;
279std::ostream &
operator<<(std::ostream &os,
const std::optional<RColumnDescriptor::RValueRange> &
x)
282 os <<
'(' <<
x->fMin <<
", " <<
x->fMax <<
')';
294 if (
a == EColumnType::kInt16 &&
b == EColumnType::kSplitInt16)
return true;
295 if (
a == EColumnType::kSplitInt16 &&
b == EColumnType::kInt16)
return true;
296 if (
a == EColumnType::kInt32 &&
b == EColumnType::kSplitInt32)
return true;
297 if (
a == EColumnType::kSplitInt32 &&
b == EColumnType::kInt32)
return true;
298 if (
a == EColumnType::kInt64 &&
b == EColumnType::kSplitInt64)
return true;
299 if (
a == EColumnType::kSplitInt64 &&
b == EColumnType::kInt64)
return true;
300 if (
a == EColumnType::kUInt16 &&
b == EColumnType::kSplitUInt16)
return true;
301 if (
a == EColumnType::kSplitUInt16 &&
b == EColumnType::kUInt16)
return true;
302 if (
a == EColumnType::kUInt32 &&
b == EColumnType::kSplitUInt32)
return true;
303 if (
a == EColumnType::kSplitUInt32 &&
b == EColumnType::kUInt32)
return true;
304 if (
a == EColumnType::kUInt64 &&
b == EColumnType::kSplitUInt64)
return true;
305 if (
a == EColumnType::kSplitUInt64 &&
b == EColumnType::kUInt64)
return true;
306 if (
a == EColumnType::kIndex32 &&
b == EColumnType::kSplitIndex32)
return true;
307 if (
a == EColumnType::kSplitIndex32 &&
b == EColumnType::kIndex32)
return true;
308 if (
a == EColumnType::kIndex64 &&
b == EColumnType::kSplitIndex64)
return true;
309 if (
a == EColumnType::kSplitIndex64 &&
b == EColumnType::kIndex64)
return true;
310 if (
a == EColumnType::kReal32 &&
b == EColumnType::kSplitReal32)
return true;
311 if (
a == EColumnType::kSplitReal32 &&
b == EColumnType::kReal32)
return true;
312 if (
a == EColumnType::kReal64 &&
b == EColumnType::kSplitReal64)
return true;
313 if (
a == EColumnType::kSplitReal64 &&
b == EColumnType::kReal64)
return true;
329 std::vector<std::string> errors;
330 RDescriptorsComparison res;
332 std::vector<RCommonField> commonFields;
335 const auto srcFieldId =
src.FindFieldId(dstField.GetFieldName());
337 const auto &srcField =
src.GetFieldDescriptor(srcFieldId);
338 commonFields.push_back({&srcField, &dstField});
340 res.fExtraDstFields.emplace_back(&dstField);
343 for (
const auto &srcField :
src.GetTopLevelFields()) {
344 const auto dstFieldId = dst.
FindFieldId(srcField.GetFieldName());
346 res.fExtraSrcFields.push_back(&srcField);
350 for (
const auto &field : commonFields) {
352 const auto &fieldName = field.fSrc->GetFieldName();
355 bool projCompatible = field.fSrc->IsProjectedField() == field.fDst->IsProjectedField();
356 if (!projCompatible) {
357 std::stringstream ss;
358 ss <<
"Field `" << fieldName <<
"` is incompatible with previously-seen field with that name because the "
359 << (field.fSrc->IsProjectedField() ?
"new" :
"old") <<
" one is projected and the other isn't";
360 errors.push_back(ss.str());
361 }
else if (field.fSrc->IsProjectedField()) {
363 const auto srcName =
src.GetQualifiedFieldName(field.fSrc->GetProjectionSourceId());
365 if (srcName != dstName) {
366 std::stringstream ss;
367 ss <<
"Field `" << fieldName
368 <<
"` is projected to a different field than a previously-seen field with the same name (old: "
369 << dstName <<
", new: " << srcName <<
")";
370 errors.push_back(ss.str());
376 const auto &srcTyName = field.fSrc->GetTypeName();
377 const auto &dstTyName = field.fDst->GetTypeName();
378 if (srcTyName != dstTyName) {
379 std::stringstream ss;
380 ss <<
"Field `" << fieldName
381 <<
"` has a type incompatible with a previously-seen field with the same name: (old: " << dstTyName
382 <<
", new: " << srcTyName <<
")";
383 errors.push_back(ss.str());
387 const auto srcTyChk = field.fSrc->GetTypeChecksum();
388 const auto dstTyChk = field.fDst->GetTypeChecksum();
389 if (srcTyChk && dstTyChk && *srcTyChk != *dstTyChk) {
390 std::stringstream ss;
391 ss <<
"Field `" << field.fSrc->GetFieldName()
392 <<
"` has a different type checksum than previously-seen field with the same name";
393 errors.push_back(ss.str());
397 const auto srcTyVer = field.fSrc->GetTypeVersion();
398 const auto dstTyVer = field.fDst->GetTypeVersion();
399 if (srcTyVer != dstTyVer) {
400 std::stringstream ss;
401 ss <<
"Field `" << field.fSrc->GetFieldName()
402 <<
"` has a different type version than previously-seen field with the same name (old: " << dstTyVer
403 <<
", new: " << srcTyVer <<
")";
404 errors.push_back(ss.str());
408 const auto srcNCols = field.fSrc->GetLogicalColumnIds().size();
409 const auto dstNCols = field.fDst->GetLogicalColumnIds().size();
410 if (srcNCols != dstNCols) {
411 std::stringstream ss;
412 ss <<
"Field `" << field.fSrc->GetFieldName()
413 <<
"` has a different number of columns than previously-seen field with the same name (old: " << dstNCols
414 <<
", new: " << srcNCols <<
")";
415 errors.push_back(ss.str());
417 for (
auto i = 0u; i < srcNCols; ++i) {
418 const auto srcColId = field.fSrc->GetLogicalColumnIds()[i];
419 const auto dstColId = field.fDst->GetLogicalColumnIds()[i];
420 const auto &srcCol =
src.GetColumnDescriptor(srcColId);
425 if (srcCol.GetType() != dstCol.GetType() &&
427 std::stringstream ss;
428 ss << i <<
"-th column of field `" << field.fSrc->GetFieldName()
429 <<
"` has a different column type of the same column on the previously-seen field with the same name "
433 errors.push_back(ss.str());
435 if (srcCol.GetBitsOnStorage() != dstCol.GetBitsOnStorage()) {
436 std::stringstream ss;
437 ss << i <<
"-th column of field `" << field.fSrc->GetFieldName()
438 <<
"` has a different number of bits of the same column on the previously-seen field with the same "
441 << srcCol.
GetBitsOnStorage() <<
", new: " << dstCol.GetBitsOnStorage() <<
")";
442 errors.push_back(ss.str());
444 if (srcCol.GetValueRange() != dstCol.GetValueRange()) {
445 std::stringstream ss;
446 ss << i <<
"-th column of field `" << field.fSrc->GetFieldName()
447 <<
"` has a different value range of the same column on the previously-seen field with the same name "
449 << srcCol.GetValueRange() <<
", new: " << dstCol.GetValueRange() <<
")";
450 errors.push_back(ss.str());
452 if (srcCol.GetRepresentationIndex() > 0) {
453 std::stringstream ss;
454 ss << i <<
"-th column of field `" << field.fSrc->GetFieldName()
455 <<
"` has a representation index higher than 0. This is not supported yet by the merger.";
456 errors.push_back(ss.str());
463 for (
const auto &err : errors)
464 errMsg += std::string(
"\n * ") + err;
467 errMsg = errMsg.substr(1);
472 res.fCommonFields.reserve(commonFields.size());
473 for (
const auto &[srcField, dstField] : commonFields) {
474 res.fCommonFields.emplace_back(srcField, dstField);
487 assert(newFields.size() > 0);
492 std::string msg =
"destination doesn't contain field";
493 if (newFields.size() > 1)
496 msg += std::accumulate(newFields.begin(), newFields.end(), std::string{}, [](
const auto &acc,
const auto *field) {
497 return acc + (acc.length() ?
", " :
"") +
'`' + field->GetFieldName() +
'`';
499 Info(
"RNTuple::Merge",
"%s: adding %s to the destination model (entry #%" PRIu64
").", msg.c_str(),
500 (newFields.size() > 1 ?
"them" :
"it"), mergeData.
fNumDstEntries);
502 changeset.fAddedFields.reserve(newFields.size());
503 for (
const auto *fieldDesc : newFields) {
505 if (fieldDesc->IsProjectedField())
506 changeset.fAddedProjectedFields.emplace_back(field.get());
508 changeset.fAddedFields.emplace_back(field.get());
509 changeset.fModel.AddField(std::move(field));
514 commonFields.reserve(commonFields.size() + newFields.size());
515 for (
const auto *field : newFields) {
518 commonFields.emplace_back(field, &newFieldInDst);
525 std::span<RColumnMergeInfo> commonColumns,
529 assert(commonColumns.size() == commonColumnSet.size());
530 if (commonColumns.empty())
540 for (
const auto &column : commonColumns) {
541 const auto &columnId = column.fInputId;
542 R__ASSERT(clusterDesc.ContainsColumn(columnId));
545 const auto srcColElement = column.fInMemoryType
548 const auto dstColElement = column.fInMemoryType ?
GenerateColumnElement(*column.fInMemoryType, column.fColumnType)
552 const auto &pages = clusterDesc.GetPageRange(columnId);
555 sealedPages.resize(pages.fPageInfos.size());
558 const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).fCompressionSettings;
561 Info(
"RNTuple::Merge",
"Column %s: changing source compression from %d to %d", column.fColumnName.c_str(),
564 size_t pageBufferBaseIdx = sealedPageData.
fBuffers.size();
567 if (needsCompressionChange)
568 sealedPageData.
fBuffers.resize(sealedPageData.
fBuffers.size() + pages.fPageInfos.size());
571 std::uint64_t pageIdx = 0;
572 for (
const auto &pageInfo : pages.fPageInfos) {
573 assert(pageIdx < sealedPages.size());
574 assert(sealedPageData.
fBuffers.size() == 0 || pageIdx < sealedPageData.
fBuffers.size());
583 sealedPage.
SetBufferSize(pageInfo.fLocator.fBytesOnStorage + checksumSize);
584 sealedPage.
SetBuffer(onDiskPage->GetAddress());
589 if (needsCompressionChange) {
590 const auto uncompressedSize = srcColElement->GetSize() * sealedPage.
GetNElements();
591 auto &buffer = sealedPageData.
fBuffers[pageBufferBaseIdx + pageIdx];
592 buffer = std::make_unique<std::uint8_t[]>(uncompressedSize + checksumSize);
593 RChangeCompressionFunc compressTask{
594 column.fOutputId, *srcColElement, *dstColElement, mergeData.
fMergeOpts,
595 sealedPage, *fPageAlloc, buffer.get(),
599 fTaskGroup->Run(compressTask);
611 sealedPageData.
fPagesV.push_back(std::move(sealedPages));
612 sealedPageData.
fGroups.emplace_back(column.fOutputId, sealedPageData.
fPagesV.back().cbegin(),
613 sealedPageData.
fPagesV.back().cend());
622 for (
const auto &column : extraDstColumns) {
623 const auto &columnId = column.fInputId;
633 bool skipColumn =
false;
634 auto nRepetitions = std::max<std::uint64_t>(field->
GetNRepetitions(), 1);
637 if (parent.
GetStructure() == ENTupleStructure::kCollection ||
650 if (structure == ENTupleStructure::kStreamer) {
653 "Destination RNTuple contains a streamer field (%s) that is not present in one of the sources. "
654 "Creating a default value for a streamer field is ill-defined, therefore the merging process will abort.",
660 R__ASSERT(structure == ENTupleStructure::kCollection || structure == ENTupleStructure::kVariant ||
661 structure == ENTupleStructure::kLeaf);
664 const auto nElements = nClusterEntries * nRepetitions;
665 const auto bytesOnStorage = colElement->GetPackedSize(nElements);
666 constexpr auto kPageSizeLimit = 256 * 1024;
668 const size_t nPages = bytesOnStorage / kPageSizeLimit + !!(bytesOnStorage % kPageSizeLimit);
669 for (
size_t i = 0; i < nPages; ++i) {
670 const auto pageSize = (i < nPages - 1) ? kPageSizeLimit : bytesOnStorage - kPageSizeLimit * (nPages - 1);
672 const auto bufSize = pageSize + checksumSize;
673 auto &buffer = sealedPageData.
fBuffers.emplace_back(
new unsigned char[bufSize]);
676 memset(buffer.get(), 0, pageSize);
677 sealedPage.ChecksumIfEnabled();
679 sealedPageData.
fPagesV.push_back({sealedPage});
682 sealedPageData.
fGroups.emplace_back(column.fOutputId, sealedPageData.
fPagesV.back().cbegin(),
683 sealedPageData.
fPagesV.back().cend());
699 commonColumnSet.reserve(commonColumns.size());
700 for (
const auto &column : commonColumns)
701 commonColumnSet.emplace(column.fInputId);
704 extraDstColumnSet.reserve(extraDstColumns.size());
705 for (
const auto &column : extraDstColumns)
706 extraDstColumnSet.emplace(column.fInputId);
714 const auto nClusterEntries = clusterDesc.
GetNEntries();
719 if (!commonColumnSet.empty()) {
720 MergeCommonColumns(clusterPool, clusterId, commonColumns, commonColumnSet, sealedPageData, mergeData);
723 if (!extraDstColumnSet.empty()) {
746 if (onDiskType == EColumnType::kIndex32 || onDiskType == EColumnType::kSplitIndex32 ||
747 onDiskType == EColumnType::kIndex64 || onDiskType == EColumnType::kSplitIndex64)
750 if (onDiskType == EColumnType::kSwitch)
753 if (fieldType ==
"bool") {
755 }
else if (fieldType ==
"std::byte") {
756 return typeid(std::byte);
757 }
else if (fieldType ==
"char") {
759 }
else if (fieldType ==
"std::int8_t") {
760 return typeid(std::int8_t);
761 }
else if (fieldType ==
"std::uint8_t") {
762 return typeid(std::uint8_t);
763 }
else if (fieldType ==
"std::int16_t") {
764 return typeid(std::int16_t);
765 }
else if (fieldType ==
"std::uint16_t") {
766 return typeid(std::uint16_t);
767 }
else if (fieldType ==
"std::int32_t") {
768 return typeid(std::int32_t);
769 }
else if (fieldType ==
"std::uint32_t") {
770 return typeid(std::uint32_t);
771 }
else if (fieldType ==
"std::int64_t") {
772 return typeid(std::int64_t);
773 }
else if (fieldType ==
"std::uint64_t") {
774 return typeid(std::uint64_t);
775 }
else if (fieldType ==
"float") {
776 return typeid(float);
777 }
else if (fieldType ==
"double") {
799 columns.reserve(columns.size() + columnIds.size());
811 info.fInputId = srcColumn.GetPhysicalId();
814 info.fParentField = &dstFieldDesc;
817 info.fOutputId = it->second.fColumnId;
818 info.fColumnType = it->second.fColumnType;
830 info.fColumnType = dstColumn.
GetType();
831 mergeData.
fColumnIdMap[info.fColumnName] = {info.fOutputId, info.fColumnType};
835 Info(
"RNTuple::Merge",
836 "Adding column %s with log.id %" PRIu64
", phys.id %" PRIu64
", type %s "
837 " -> log.id %" PRIu64
", type %s",
838 info.fColumnName.c_str(), srcColumnId, srcColumn.GetPhysicalId(),
846 columns.emplace_back(info);
849 const auto &srcChildrenIds = srcFieldDesc.
GetLinkIds();
850 const auto &dstChildrenIds = dstFieldDesc.
GetLinkIds();
851 assert(srcChildrenIds.size() == dstChildrenIds.size());
852 for (
auto i = 0u; i < srcChildrenIds.size(); ++i) {
862static RColumnInfoGroup
865 RColumnInfoGroup res;
869 for (
const auto &[srcField, dstField] : descCmp.fCommonFields) {
895 return R__FAIL(std::string(
"The compression given to RNTupleMergeOptions is different from that of the "
898 ") This is currently unsupported.");
904 std::unique_ptr<RNTupleModel> model;
906#define SKIP_OR_ABORT(errMsg) \
908 if (mergeOpts.fErrBehavior == ENTupleMergeErrBehavior::kSkip) { \
909 Warning("RNTuple::Merge", "Skipping RNTuple due to: %s", (errMsg).c_str()); \
912 return R__FAIL(errMsg); \
919 auto srcDescriptor = source->GetSharedDescriptorGuard();
920 mergeData.fSrcDescriptor = &srcDescriptor.GetRef();
925 opts.fReconstructProjections =
true;
926 model = srcDescriptor->CreateModel(opts);
927 destination.
Init(*model);
930 for (
const auto &extraTypeInfoDesc : srcDescriptor->GetExtraTypeInfoIterable())
936 std::string(
"Source RNTuple will be skipped due to incompatible schema with the destination:\n") +
937 descCmpRes.GetError()->GetReport());
939 auto descCmp = descCmpRes.Unwrap();
944 std::string msg =
"Source RNTuple is missing the following fields:";
945 for (
const auto *field : descCmp.fExtraDstFields) {
946 msg +=
"\n " + field->GetFieldName() +
" : " + field->GetTypeName();
952 if (descCmp.fExtraSrcFields.size()) {
958 std::string msg =
"Source RNTuple has extra fields that the destination RNTuple doesn't have:";
959 for (
const auto *field : descCmp.fExtraSrcFields) {
960 msg +=
"\n " + field->GetFieldName() +
" : " + field->GetTypeName();
967 auto columnInfos =
GatherColumnInfos(descCmp, srcDescriptor.GetRef(), mergeData);
968 MergeSourceClusters(*source, columnInfos.fCommonColumns, columnInfos.fExtraDstColumns, mergeData);
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
static void GenerateExtraDstColumns(size_t nClusterEntries, std::span< RColumnMergeInfo > extraDstColumns, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData)
static void ExtendDestinationModel(std::span< const RFieldDescriptor * > newFields, RNTupleModel &dstModel, RNTupleMergeData &mergeData, std::vector< RCommonField > &commonFields)
static RResult< RDescriptorsComparison > CompareDescriptorStructure(const RNTupleDescriptor &dst, const RNTupleDescriptor &src)
Compares the top level fields of dst and src and determines whether they can be merged or not.
#define SKIP_OR_ABORT(errMsg)
static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData)
static bool IsSplitOrUnsplitVersionOf(EColumnType a, EColumnType b)
static std::optional< std::type_index > ColumnInMemoryType(std::string_view fieldType, EColumnType onDiskType)
static void AddColumnsFromField(std::vector< RColumnMergeInfo > &columns, const RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData, const RFieldDescriptor &srcFieldDesc, const RFieldDescriptor &dstFieldDesc, const std::string &prefix="")
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
void Fatal(const char *location, const char *msgfmt,...)
Use this function in case of a fatal error. It will abort the program.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t src
TRObject operator()(const T1 &t1) const
The available trivial, native content types of a column.
Manages a set of clusters containing compressed and packed pages.
RCluster * GetCluster(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
An in-memory subset of the packed and compressed pages of a cluster.
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
std::unordered_set< DescriptorId_t > ColumnSet_t
A column element encapsulates the translation between basic C++ types and their column representation...
static const char * GetColumnTypeName(EColumnType type)
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
If CppT == void, use the default C++ type for the given column type.
Given a set of RPageSources merge them into an RPageSink, optionally changing their compression.
std::optional< TTaskGroup > fTaskGroup
RResult< void > Merge(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts=RNTupleMergeOptions())
Merge a given set of sources into the destination.
void MergeSourceClusters(RPageSource &source, std::span< RColumnMergeInfo > commonColumns, std::span< RColumnMergeInfo > extraDstColumns, RNTupleMergeData &mergeData)
void MergeCommonColumns(RClusterPool &clusterPool, DescriptorId_t clusterId, std::span< RColumnMergeInfo > commonColumns, const RCluster::ColumnSet_t &commonColumnSet, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData)
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
Abstract interface to write data into an ntuple.
bool IsInitialized() const
virtual void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo)=0
Adds an extra type information record to schema.
void CommitDataset()
Run the registered callbacks and finalize the current cluster and the entire data set.
void Init(RNTupleModel &model)
Physically creates the storage container to hold the ntuple (e.g., a key in a TFile or an S3 bucket). In...
const RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
virtual void CommitClusterGroup()=0
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
virtual std::uint64_t CommitCluster(NTupleSize_t nNewEntries)
Finalize the current cluster and create a new one for the following data.
virtual void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry)=0
Incorporate incremental changes to the model into the ntuple descriptor.
virtual void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges)=0
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element)
Helper for streaming a page.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const RNTupleReadOptions &options=RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
Abstract interface to read data from an ntuple.
static RResult< RPage > UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element, DescriptorId_t physicalColumnId, RPageAllocator &pageAlloc)
Helper for unstreaming a page.
std::deque< RSealedPage > SealedPageSequence_t
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
ClusterSize_t GetNEntries() const
EColumnType GetType() const
std::uint16_t GetBitsOnStorage() const
Holds the index and the tag of a kSwitch column.
Base class for all ROOT issued exceptions.
Meta-data stored for every field of an ntuple.
DescriptorId_t GetParentId() const
const std::string & GetFieldName() const
const std::string & GetTypeName() const
const std::vector< DescriptorId_t > & GetLogicalColumnIds() const
bool IsProjectedField() const
const std::vector< DescriptorId_t > & GetLinkIds() const
std::uint64_t GetNRepetitions() const
ENTupleStructure GetStructure() const
The on-storage meta-data of an ntuple.
DescriptorId_t FindNextClusterId(DescriptorId_t clusterId) const
DescriptorId_t FindClusterId(DescriptorId_t physicalColumnId, NTupleSize_t index) const
const RClusterDescriptor & GetClusterDescriptor(DescriptorId_t clusterId) const
std::string GetQualifiedFieldName(DescriptorId_t fieldId) const
Walks up the parents of the field ID and returns a field name of the form a.b.c.d In case of invalid ...
DescriptorId_t FindFieldId(std::string_view fieldName, DescriptorId_t parentId) const
const RColumnDescriptor & GetColumnDescriptor(DescriptorId_t columnId) const
const RFieldDescriptor & GetFieldDescriptor(DescriptorId_t fieldId) const
RFieldDescriptorIterable GetTopLevelFields() const
The RNTupleModel encapsulates the schema of an ntuple.
Common user-tunable settings for storing ntuples.
int GetCompression() const
void SetCompression(int val)
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
A class to manage the asynchronous execution of work items.
Representation of an RNTuple data set in a ROOT file.
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface. Merge this NTuple with the input list entries.
Collection abstract base class.
virtual Int_t GetEntries() const
TKey * FindKey(const char *keyname) const override
Find key with name keyname in the current directory.
TObject * Get(const char *namecycle) override
Return pointer to object identified by namecycle.
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Int_t GetCompressionSettings() const
Book space in a file, create I/O buffers, to fill them, (un)compress them.
T * ReadObject()
To read an object (non deriving from TObject) from the file.
const char * GetName() const override
Returns name of object.
Mother of all ROOT objects.
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
@ kStrict
The merger will refuse to merge any 2 RNTuples whose schema doesn't match exactly.
@ kUnion
The merger will update the output model to include all columns from all sources.
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, EColumnType onDiskType)
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
RClusterSize ClusterSize_t
constexpr int kUnknownCompressionSettings
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr DescriptorId_t kInvalidDescriptorId
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
const RFieldDescriptor * fParentField
std::optional< std::type_index > fInMemoryType
ColumnIdMap_t fColumnIdMap
RNTupleMergeData(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
std::span< RPageSource * > fSources
const RNTupleDescriptor & fDstDescriptor
const RNTupleDescriptor * fSrcDescriptor
const RNTupleMergeOptions & fMergeOpts
std::vector< RColumnMergeInfo > fColumns
NTupleSize_t fNumDstEntries
int fCompressionSettings
If fCompressionSettings == kUnknownCompressionSettings (the default), the merger will not change the ...
ENTupleMergingMode fMergingMode
Determines how the merging treats sources with different models (see ENTupleMergingMode).
bool fExtraVerbose
If true, the merger will emit further diagnostics and information.
The incremental changes to a RNTupleModel
On-disk pages within a page source are identified by the column and page number.
Parameters for the SealPage() method.
const RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
int fCompressionSetting
Compression algorithm and level to apply.
bool fWriteChecksum
Adds an 8-byte little-endian xxhash3 checksum to the page payload.
const RPage * fPage
Input page to be sealed.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
void SetHasChecksum(bool hasChecksum)
void SetNElements(std::uint32_t nElements)
void SetBuffer(const void *buffer)
void SetBufferSize(std::size_t bufferSize)
RResult< void > VerifyChecksumIfEnabled() const
std::uint32_t GetNElements() const
std::size_t GetBufferSize() const
bool GetHasChecksum() const
std::vector< RPageStorage::RSealedPageGroup > fGroups
std::deque< RPageStorage::SealedPageSequence_t > fPagesV
std::vector< std::unique_ptr< std::uint8_t[]> > fBuffers
Modifiers passed to CreateModel
@ kUseGeneralPurpose
Use the new recommended general-purpose setting; it is a best trade-off between compression ratio/dec...