39#include <initializer_list>
40#include <unordered_map>
69 std::initializer_list<std::pair<const char *, T>>
validValues)
89 {
"Filter", ENTupleMergingMode::kFilter},
90 {
"Union", ENTupleMergingMode::kUnion},
91 {
"Strict", ENTupleMergingMode::kStrict},
99 {
"Abort", ENTupleMergeErrBehavior::kAbort},
100 {
"Skip", ENTupleMergeErrBehavior::kSkip},
111 Error(
"RNTuple::Merge",
"Invalid inputs.");
125 Error(
"RNTuple::Merge",
"Second input parameter should be a TFile, but it's a %s.",
secondArg->ClassName());
135 Error(
"RNTuple::Merge",
"Output file already has key, but not of type RNTuple!");
147 Warning(
"RNTuple::Merge",
"Passed both options \"DefaultCompression\" and \"FirstSrcCompression\": "
148 "only the latter will apply.");
160 Info(
"RNTuple::Merge",
"Using the default compression: %u", *
compression);
164 std::vector<std::unique_ptr<RPageSourceFile>>
sources;
167 while (
const auto &
pitr =
itr()) {
171 Info(
"RNTuple::Merge",
"No RNTuple anchor named '%s' from file '%s'",
ntupleName.c_str(),
inFile->GetName());
179 source->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kRaw);
184 Error(
"RNTuple::Merge",
185 "Asked to use the first source's compression as the output compression, but the "
186 "first source (file '%s') has an empty RNTuple, therefore the output compression could not be "
191 auto colRangeIter = (*firstCluster).GetColumnRangeIterable();
194 Error(
"RNTuple::Merge",
195 "Asked to use the first source's compression as the output compression, but the "
196 "first source (file '%s') has an empty RNTuple, therefore the output compression could not be "
201 compression = (*firstColRange).GetCompressionSettings().value();
202 Info(
"RNTuple::Merge",
"Using the first RNTuple's compression: %u", *
compression);
211 std::unique_ptr<ROOT::RNTupleModel> model;
215 outSource->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kForWriting);
216 auto desc =
outSource->GetSharedDescriptorGuard();
217 model =
destination->InitFromDescriptor(desc.GetRef(),
true );
222 for (
const auto &s :
sources) {
244}
catch (
const std::exception &
ex) {
245 Error(
"RNTuple::Merge",
"Exception thrown while merging: %s",
ex.what());
251struct RChangeCompressionFunc {
262 auto page = RPageSource::UnsealPage(fSealedPage, fSrcColElement, fPageAlloc).Unwrap();
264 sealConf.fElement = &fDstColElement;
281struct RDescriptorsComparison {
282 std::vector<const ROOT::RFieldDescriptor *> fExtraDstFields;
283 std::vector<const ROOT::RFieldDescriptor *> fExtraSrcFields;
284 std::vector<RCommonField> fCommonFields;
287struct RColumnOutInfo {
293using ColumnIdMap_t = std::unordered_map<std::string, RColumnOutInfo>;
295struct RColumnInfoGroup {
296 std::vector<RColumnMergeInfo> fExtraDstColumns;
298 std::vector<RColumnMergeInfo> fCommonColumns;
340 std::deque<RPageStorage::SealedPageSequence_t>
fPagesV;
341 std::vector<RPageStorage::RSealedPageGroup>
fGroups;
342 std::vector<std::unique_ptr<std::uint8_t[]>>
fBuffers;
345std::ostream &
operator<<(std::ostream &os,
const std::optional<ROOT::RColumnDescriptor::RValueRange> &
x)
348 os <<
'(' <<
x->fMin <<
", " <<
x->fMax <<
')';
395 std::vector<std::string>
errors;
396 RDescriptorsComparison res;
400 for (
const auto &
dstField :
dst.GetTopLevelFields()) {
406 res.fExtraDstFields.emplace_back(&
dstField);
409 for (
const auto &
srcField :
src.GetTopLevelFields()) {
412 res.fExtraSrcFields.push_back(&
srcField);
423 std::stringstream
ss;
424 ss <<
"Field `" <<
fieldName <<
"` is incompatible with previously-seen field with that name because the "
425 << (
field.fSrc->IsProjectedField() ?
"new" :
"old") <<
" one is projected and the other isn't";
427 }
else if (
field.fSrc->IsProjectedField()) {
429 const auto srcName =
src.GetQualifiedFieldName(
field.fSrc->GetProjectionSourceId());
430 const auto dstName =
dst.GetQualifiedFieldName(
field.fDst->GetProjectionSourceId());
432 std::stringstream
ss;
434 <<
"` is projected to a different field than a previously-seen field with the same name (old: "
445 std::stringstream
ss;
447 <<
"` has a type incompatible with a previously-seen field with the same name: (old: " <<
dstTyName
456 std::stringstream
ss;
457 ss <<
"Field `" <<
field.fSrc->GetFieldName()
458 <<
"` has a different type checksum than previously-seen field with the same name";
466 std::stringstream
ss;
467 ss <<
"Field `" <<
field.fSrc->GetFieldName()
468 <<
"` has a different type version than previously-seen field with the same name (old: " <<
dstTyVer
474 const auto srcNCols =
field.fSrc->GetLogicalColumnIds().size();
475 const auto dstNCols =
field.fDst->GetLogicalColumnIds().size();
477 std::stringstream
ss;
478 ss <<
"Field `" <<
field.fSrc->GetFieldName()
479 <<
"` has a different number of columns than previously-seen field with the same name (old: " <<
dstNCols
493 std::stringstream
ss;
494 ss << i <<
"-th column of field `" <<
field.fSrc->GetFieldName()
495 <<
"` has a different column type of the same column on the previously-seen field with the same name "
501 if (
srcCol.GetBitsOnStorage() !=
dstCol.GetBitsOnStorage()) {
502 std::stringstream
ss;
503 ss << i <<
"-th column of field `" <<
field.fSrc->GetFieldName()
504 <<
"` has a different number of bits of the same column on the previously-seen field with the same "
507 <<
srcCol.GetBitsOnStorage() <<
", new: " <<
dstCol.GetBitsOnStorage() <<
")";
511 std::stringstream
ss;
512 ss << i <<
"-th column of field `" <<
field.fSrc->GetFieldName()
513 <<
"` has a different value range of the same column on the previously-seen field with the same name "
515 <<
srcCol.GetValueRange() <<
", new: " <<
dstCol.GetValueRange() <<
")";
518 if (
srcCol.GetRepresentationIndex() > 0) {
519 std::stringstream
ss;
520 ss << i <<
"-th column of field `" <<
field.fSrc->GetFieldName()
521 <<
"` has a representation index higher than 0. This is not supported yet by the merger.";
529 for (
const auto &err :
errors)
530 errMsg += std::string(
"\n * ") + err;
558 if (
mergeData.fMergeOpts.fExtraVerbose) {
559 std::string
msg =
"destination doesn't contain field";
564 return acc + (acc.length() ?
", " :
"") +
'`' + field->GetFieldName() +
'`';
566 Info(
"RNTuple::Merge",
"%s: adding %s to the destination model (entry #%" PRIu64 ").",
msg.c_str(),
619 for (
const auto &column :
columns) {
620 const auto &
columnId = column.fInputId;
646 const auto structure =
field->GetStructure();
651 "Destination RNTuple contains a streamer field (%s) that is not present in one of the sources. "
652 "Creating a default value for a streamer field is ill-defined, therefore the merging process will abort.",
653 field->GetFieldName().c_str());
667 for (
size_t i = 0; i <
nPages; ++i) {
682 sealConf.fCompressionSettings =
mergeData.fMergeOpts.fCompressionSettings.value();
683 sealConf.fWriteChecksum =
mergeData.fDestination.GetWriteOptions().GetEnablePageChecksums();
714 const auto &
columnId = column.fInputId;
740 Info(
"RNTuple::Merge",
"Resealing column %s: { compression: %d => %d, onDiskType: %s => %s }",
742 mergeData.fMergeOpts.fCompressionSettings.value(),
781 sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
892 if (
fieldType ==
"std::byte")
return typeid(std::byte);
893 if (
fieldType ==
"char")
return typeid(char);
894 if (
fieldType ==
"std::int8_t")
return typeid(std::int8_t);
895 if (
fieldType ==
"std::uint8_t")
return typeid(std::uint8_t);
896 if (
fieldType ==
"std::int16_t")
return typeid(std::int16_t);
897 if (
fieldType ==
"std::uint16_t")
return typeid(std::uint16_t);
898 if (
fieldType ==
"std::int32_t")
return typeid(std::int32_t);
899 if (
fieldType ==
"std::uint32_t")
return typeid(std::uint32_t);
900 if (
fieldType ==
"std::int64_t")
return typeid(std::int64_t);
901 if (
fieldType ==
"std::uint64_t")
return typeid(std::uint64_t);
902 if (
fieldType ==
"float")
return typeid(float);
946 info.fOutputId = it->second.fColumnId;
947 info.fColumnType = it->second.fColumnType;
963 if (
mergeData.fMergeOpts.fExtraVerbose) {
964 Info(
"RNTuple::Merge",
965 "Adding column %s with log.id %" PRIu64 ", phys.id %" PRIu64 ", type %s "
966 " -> log.id %" PRIu64 ", type %s",
994 RColumnInfoGroup res;
1004 std::sort(res.fCommonColumns.begin(), res.fCommonColumns.end(),
1005 [](
const auto &
a,
const auto &
b) { return a.fInputId < b.fInputId; });
1011 ColumnIdMap_t &
colIdMap,
const std::string &prefix =
"")
1016 RColumnOutInfo
info{};
1030 std::unique_ptr<ROOT::RNTupleModel> model)
1035 fModel(std::
move(model))
1062 return R__FAIL(std::string(
"The compression given to RNTupleMergeOptions is different from that of the "
1065 ") This is currently unsupported.");
1072 "passing an already-initialized destination to RNTupleMerger::Merge (i.e. trying to do incremental "
1073 "merging) can only be done by providing a valid ROOT::RNTupleModel when constructing the RNTupleMerger.");
1082 for (
const auto &
field :
mergeData.fDstDescriptor.GetTopLevelFields()) {
1087#define SKIP_OR_ABORT(errMsg) \
1089 if (mergeOpts.fErrBehavior == ENTupleMergeErrBehavior::kSkip) { \
1090 Warning("RNTuple::Merge", "Skipping RNTuple due to: %s", (errMsg).c_str()); \
1093 return R__FAIL(errMsg); \
1099 source->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kForWriting);
1113 SKIP_OR_ABORT(std::string(
"Source RNTuple has an incompatible schema with the destination:\n") +
1121 std::string
msg =
"Source RNTuple is missing the following fields:";
1123 msg +=
"\n " +
field->GetFieldName() +
" : " +
field->GetTypeName();
1129 if (
descCmp.fExtraSrcFields.size()) {
1135 std::string
msg =
"Source RNTuple has extra fields that the destination RNTuple doesn't have:";
1137 msg +=
"\n " +
field->GetFieldName() +
" : " +
field->GetTypeName();
1149 Warning(
"RNTuple::Merge",
"Output RNTuple '%s' has no entries.",
fDestination->GetNTupleName().c_str());
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
static std::optional< std::type_index > ColumnInMemoryType(std::string_view fieldType, ENTupleColumnType onDiskType)
static ROOT::RResult< RDescriptorsComparison > CompareDescriptorStructure(const ROOT::RNTupleDescriptor &dst, const ROOT::RNTupleDescriptor &src)
Compares the top level fields of dst and src and determines whether they can be merged or not.
static std::optional< ENTupleMergeErrBehavior > ParseOptionErrBehavior(const TString &opts)
static void ExtendDestinationModel(std::span< const ROOT::RFieldDescriptor * > newFields, ROOT::RNTupleModel &dstModel, RNTupleMergeData &mergeData, std::vector< RCommonField > &commonFields)
static void GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span< const RColumnMergeInfo > columns, RSealedPageMergeData &sealedPageData, ROOT::Internal::RPageAllocator &pageAlloc, const ROOT::RNTupleDescriptor &srcDescriptor, const ROOT::RNTupleDescriptor &dstDescriptor, const RNTupleMergeData &mergeData)
#define SKIP_OR_ABORT(errMsg)
static std::optional< T > ParseStringOption(const TString &opts, const char *pattern, std::initializer_list< std::pair< const char *, T > > validValues)
static bool IsSplitOrUnsplitVersionOf(ENTupleColumnType a, ENTupleColumnType b)
static void AddColumnsFromField(std::vector< RColumnMergeInfo > &columns, const ROOT::RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData, const ROOT::RFieldDescriptor &srcFieldDesc, const ROOT::RFieldDescriptor &dstFieldDesc, const std::string &prefix="")
static std::optional< ENTupleMergingMode > ParseOptionMergingMode(const TString &opts)
static void PrefillColumnMap(const ROOT::RNTupleDescriptor &desc, const ROOT::RFieldDescriptor &fieldDesc, ColumnIdMap_t &colIdMap, const std::string &prefix="")
static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const ROOT::RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData)
static bool BeginsWithDelimitedWord(const TString &str, const char *word)
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
void Fatal(const char *location, const char *msgfmt,...)
Use this function in case of a fatal error. It will abort the program.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t src
TRObject operator()(const T1 &t1) const
The available trivial, native content types of a column.
Manages a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
std::unordered_set< ROOT::DescriptorId_t > ColumnSet_t
Given a set of RPageSources merge them into an RPagePersistentSink, optionally changing their compres...
std::optional< TTaskGroup > fTaskGroup
void MergeCommonColumns(RClusterPool &clusterPool, const ROOT::RClusterDescriptor &clusterDesc, std::span< const RColumnMergeInfo > commonColumns, const RCluster::ColumnSet_t &commonColumnSet, std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc)
std::unique_ptr< ROOT::RNTupleModel > fModel
RNTupleMerger(std::unique_ptr< ROOT::Internal::RPagePersistentSink > destination, std::unique_ptr< ROOT::RNTupleModel > model)
Creates a RNTupleMerger with the given destination.
void MergeSourceClusters(ROOT::Internal::RPageSource &source, std::span< const RColumnMergeInfo > commonColumns, std::span< const RColumnMergeInfo > extraDstColumns, RNTupleMergeData &mergeData)
std::unique_ptr< ROOT::Internal::RPagePersistentSink > fDestination
RResult< void > Merge(std::span< ROOT::Internal::RPageSource * > sources, const RNTupleMergeOptions &mergeOpts=RNTupleMergeOptions())
Merge a given set of sources into the destination.
A class to manage the asynchronous execution of work items.
A column element encapsulates the translation between basic C++ types and their column representation...
static const char * GetColumnTypeName(ROOT::ENTupleColumnType type)
static std::unique_ptr< RColumnElementBase > Generate(ROOT::ENTupleColumnType type)
If CppT == void, use the default C++ type for the given column type.
The in-memory representation of a 32bit or 64bit on-disk index column.
Holds the index and the tag of a kSwitch column.
A helper class for serialization and deserialization of the RNTuple binary format.
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
Abstract interface to write data into an ntuple.
RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element)
Helper for streaming a page.
Storage provider that reads ntuple pages from a file.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
Abstract interface to read data from an ntuple.
Common functionality of an ntuple storage for both reading and writing.
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
std::deque< RSealedPage > SealedPageSequence_t
std::unordered_map< const ROOT::RFieldBase *, const ROOT::RFieldBase * > FieldMap_t
The map keys are the projected target fields, the map values are the backing source fields. Note that ...
RResult< void > Add(std::unique_ptr< ROOT::RFieldBase > field, const FieldMap_t &fieldMap)
Adds a new projected field.
Metadata for RNTuple clusters.
Metadata stored for every field of an RNTuple.
ROOT::ENTupleStructure GetStructure() const
ROOT::DescriptorId_t GetParentId() const
std::uint64_t GetNRepetitions() const
The on-storage metadata of an RNTuple.
const RColumnDescriptor & GetColumnDescriptor(ROOT::DescriptorId_t columnId) const
const RFieldDescriptor & GetFieldDescriptor(ROOT::DescriptorId_t fieldId) const
The RNTupleModel encapsulates the schema of an RNTuple.
Common user-tunable settings for storing RNTuples.
Representation of an RNTuple data set in a ROOT file.
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface. Merge this NTuple with the input list entries.
const_iterator begin() const
const_iterator end() const
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Collection abstract base class.
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Mother of all ROOT objects.
@ kStrict
The merger will refuse to merge any 2 RNTuples whose schema doesn't match exactly.
@ kUnion
The merger will update the output model to include all columns from all sources.
std::ostream & operator<<(std::ostream &os, const std::optional< ROOT::RColumnDescriptor::RValueRange > &x)
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, ROOT::ENTupleColumnType onDiskType)
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr DescriptorId_t kInvalidDescriptorId
ROOT::DescriptorId_t fOutputId
const ROOT::RFieldDescriptor * fParentField
ROOT::DescriptorId_t fInputId
std::optional< std::type_index > fInMemoryType
ENTupleColumnType fColumnType
const ROOT::RNTupleDescriptor * fSrcDescriptor
ColumnIdMap_t fColumnIdMap
RNTupleMergeData(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
std::span< RPageSource * > fSources
ROOT::NTupleSize_t fNumDstEntries
const RNTupleMergeOptions & fMergeOpts
std::vector< RColumnMergeInfo > fColumns
const ROOT::RNTupleDescriptor & fDstDescriptor
Set of merging options to pass to RNTupleMerger.
std::optional< std::uint32_t > fCompressionSettings
If fCompressionSettings is empty (the default), the merger will not change the compression of any of ...
On-disk pages within a page source are identified by the column and page number.
std::vector< RPageStorage::RSealedPageGroup > fGroups
std::deque< RPageStorage::SealedPageSequence_t > fPagesV
std::vector< std::unique_ptr< std::uint8_t[]> > fBuffers
The incremental changes to a RNTupleModel
Parameters for the SealPage() method.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
bool GetHasChecksum() const
@ kUseGeneralPurpose
Use the new recommended general-purpose setting; it is a best trade-off between compression ratio/dec...