32 if (!inputs || inputs->
GetEntries() < 3 || !mergeInfo)
39 std::string ntupleName = std::string(itr()->GetName());
47 TKey *outKey = outFile->
FindKey(ntupleName.c_str());
52 Error(
"RNTuple::Merge",
"Output file already has key, but not of type RNTuple!");
61 auto destination = std::make_unique<Internal::RPageSinkFile>(ntupleName, *outFile, writeOpts);
67 auto desc = source->GetSharedDescriptorGuard();
68 destination->InitFromDescriptor(desc.GetRef());
72 std::vector<std::unique_ptr<Internal::RPageSourceFile>> sources;
73 std::vector<Internal::RPageSource *> sourcePtrs;
75 while (
const auto &pitr = itr()) {
84 for (
const auto &s : sources) {
85 sourcePtrs.push_back(s.get());
90 merger.
Merge(sourcePtrs, *destination);
94 *
this = *outFile->
Get<
RNTuple>(ntupleName.c_str());
101 std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo> &columns)
103 for (
auto &column : columns) {
104 column.fColumnOutputId = fOutputIdMap.size();
105 fOutputIdMap[column.fColumnName +
"." + column.fColumnTypeAndVersion] = column.fColumnOutputId;
111 std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo> &columns)
114 if (fOutputIdMap.size() != columns.size()) {
118 for (
auto &column : columns) {
120 column.fColumnOutputId = fOutputIdMap.at(column.fColumnName +
"." + column.fColumnTypeAndVersion);
121 }
catch (
const std::out_of_range &) {
122 throw RException(
R__FAIL(
"Column NOT found in the first source w/ name " + column.fColumnName +
123 " type and version " + column.fColumnTypeAndVersion));
129std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo>
132 std::vector<RColumnInfo> columns;
134 AddColumnsFromField(columns, descriptor, descriptor.
GetFieldZero());
137 if (fOutputIdMap.empty()) {
138 BuildColumnIdMap(columns);
140 ValidateColumns(columns);
147 std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo> &columns,
const RNTupleDescriptor &desc,
151 std::string
name = prefix + field.GetFieldName() +
".";
152 const std::string typeAndVersion = field.GetTypeName() +
"." + std::to_string(field.GetTypeVersion());
154 columns.emplace_back(
name + std::to_string(column.GetIndex()), typeAndVersion, column.GetPhysicalId(),
157 AddColumnsFromField(columns, desc, field,
name);
168 std::unique_ptr<RNTupleModel> model;
171 for (
const auto &source : sources) {
175 auto descriptor = source->GetSharedDescriptorGuard();
179 auto columns = CollectColumns(descriptor.GetRef());
183 model = descriptor->CreateModel();
184 destination.
Init(*model.get());
187 for (
const auto &extraTypeInfoDesc : descriptor->GetExtraTypeInfoIterable()) {
192 if (source->GetNEntries() == 0) {
199 auto clusterId = descriptor->FindClusterId(0, 0);
202 auto &cluster = descriptor->GetClusterDescriptor(clusterId);
204 std::vector<std::unique_ptr<unsigned char[]>> buffers;
207 std::deque<RPageStorage::SealedPageSequence_t> sealedPagesV;
208 std::vector<RPageStorage::RSealedPageGroup> sealedPageGroups;
210 for (
const auto &column : columns) {
214 auto columnId = column.fColumnInputId;
215 if (!cluster.ContainsColumn(columnId)) {
220 const auto &pages = cluster.GetPageRange(columnId);
226 for (
const auto &pageInfo : pages.fPageInfos) {
233 source->LoadSealedPage(columnId, clusterIndex, sealedPage);
237 auto buffer = std::make_unique<unsigned char[]>(sealedPage.
GetSize());
239 source->LoadSealedPage(columnId, clusterIndex, sealedPage);
241 buffers.push_back(std::move(buffer));
242 sealedPages.push_back(std::move(sealedPage));
245 idx += pageInfo.fNElements;
249 sealedPagesV.push_back(std::move(sealedPages));
250 sealedPageGroups.emplace_back(column.fColumnOutputId, sealedPagesV.back().cbegin(),
251 sealedPagesV.back().cend());
262 clusterId = descriptor->FindNextClusterId(clusterId);
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Given a set of RPageSources merge them into an RPageSink.
void AddColumnsFromField(std::vector< RColumnInfo > &columns, const RNTupleDescriptor &desc, const RFieldDescriptor &fieldDesc, const std::string &prefix="")
Recursively add columns from a given field.
void Merge(std::span< RPageSource * > sources, RPageSink &destination)
Merge a given set of sources into the destination.
std::vector< RColumnInfo > CollectColumns(const RNTupleDescriptor &descriptor)
Recursively collect all the columns for all the fields rooted at field zero.
void BuildColumnIdMap(std::vector< RColumnInfo > &columns)
Build the internal column id map from the first source. This is where we assign the output ids for the...
void ValidateColumns(std::vector< RColumnInfo > &columns)
Validate the columns against the internal map that is built from the first source. This is where we as...
Abstract interface to write data into an ntuple.
bool IsInitialized() const
virtual void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo)=0
Adds an extra type information record to schema.
void CommitDataset()
Run the registered callbacks and finalize the current cluster and the entire data set.
virtual const RNTupleDescriptor & GetDescriptor() const =0
Return the RNTupleDescriptor being constructed.
void Init(RNTupleModel &model)
Physically creates the storage container to hold the ntuple (e.g., a key in a TFile or an S3 bucket). In...
virtual void CommitClusterGroup()=0
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
virtual std::uint64_t CommitCluster(NTupleSize_t nNewEntries)=0
Finalize the current cluster and create a new one for the following data.
virtual void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges)=0
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const RNTupleReadOptions &options=RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
std::deque< RSealedPage > SealedPageSequence_t
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Base class for all ROOT issued exceptions.
Meta-data stored for every field of an ntuple.
The on-storage meta-data of an ntuple.
RFieldDescriptorIterable GetFieldIterable(const RFieldDescriptor &fieldDesc) const
RColumnDescriptorIterable GetColumnIterable() const
const RFieldDescriptor & GetFieldZero() const
Common user-tunable settings for storing ntuples.
void SetUseBufferedWrite(bool val)
Representation of an RNTuple data set in a ROOT file.
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface. Merge this NTuple with the input list entries.
Collection abstract base class.
virtual Int_t GetEntries() const
TKey * FindKey(const char *keyname) const override
Find key with name keyname in the current directory.
TObject * Get(const char *namecycle) override
Return pointer to object identified by namecycle.
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Book space in a file, create I/O buffers, to fill them, (un)compress them.
T * ReadObject()
To read an object (non deriving from TObject) from the file.
constexpr DescriptorId_t kInvalidDescriptorId
A sealed page contains the bytes of a page as written to storage (packed & compressed).
void SetBuffer(const void *buffer)
std::uint32_t GetSize() const