32 if (!inputs || inputs->
GetEntries() < 3 || !mergeInfo)
39 std::string ntupleName = std::string(itr()->GetName());
47 TKey *outKey = outFile->
FindKey(ntupleName.c_str());
52 Error(
"RNTuple::Merge",
"Output file already has key, but not of type RNTuple!");
61 auto destination = std::make_unique<Internal::RPageSinkFile>(ntupleName, *outFile, writeOpts);
67 auto desc = source->GetSharedDescriptorGuard();
68 destination->InitFromDescriptor(desc.GetRef());
72 std::vector<std::unique_ptr<Internal::RPageSourceFile>> sources;
73 std::vector<Internal::RPageSource *> sourcePtrs;
75 while (
const auto &pitr = itr()) {
84 for (
const auto &s : sources) {
85 sourcePtrs.push_back(s.get());
90 merger.
Merge(sourcePtrs, *destination);
94 *
this = *outFile->
Get<
RNTuple>(ntupleName.c_str());
101 std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo> &columns)
103 for (
auto &column : columns) {
105 fOutputIdMap[column.fColumnName +
"." + column.fColumnTypeAndVersion] = column.fColumnOutputId;
111 std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo> &columns)
118 for (
auto &column : columns) {
120 column.fColumnOutputId =
fOutputIdMap.at(column.fColumnName +
"." + column.fColumnTypeAndVersion);
121 }
catch (
const std::out_of_range &) {
122 throw RException(
R__FAIL(
"Column NOT found in the first source w/ name " + column.fColumnName +
123 " type and version " + column.fColumnTypeAndVersion));
129std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo>
132 std::vector<RColumnInfo> columns;
147 std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo> &columns,
const RNTupleDescriptor &desc,
151 std::string
name = prefix + field.GetFieldName() +
".";
152 const std::string typeAndVersion = field.GetTypeName() +
"." + std::to_string(field.GetTypeVersion());
154 columns.emplace_back(
name + std::to_string(column.GetIndex()), typeAndVersion, column.GetPhysicalId(),
169 for (
const auto &source : sources) {
173 if (source->GetNEntries() == 0) {
178 auto descriptor = source->GetSharedDescriptorGuard();
186 auto model = descriptor->CreateModel();
187 destination.
Init(*model.get());
193 auto clusterId = descriptor->FindClusterId(0, 0);
196 auto &cluster = descriptor->GetClusterDescriptor(clusterId);
198 std::vector<std::unique_ptr<unsigned char[]>> buffers;
201 std::deque<RPageStorage::SealedPageSequence_t> sealedPagesV;
202 std::vector<RPageStorage::RSealedPageGroup> sealedPageGroups;
204 for (
const auto &column : columns) {
208 auto columnId = column.fColumnInputId;
209 if (!cluster.ContainsColumn(columnId)) {
214 const auto &pages = cluster.GetPageRange(columnId);
220 for (
const auto &pageInfo : pages.fPageInfos) {
227 source->LoadSealedPage(columnId, clusterIndex, sealedPage);
231 auto buffer = std::make_unique<unsigned char[]>(sealedPage.
fSize);
232 sealedPage.
fBuffer = buffer.get();
233 source->LoadSealedPage(columnId, clusterIndex, sealedPage);
235 buffers.push_back(std::move(buffer));
236 sealedPages.push_back(std::move(sealedPage));
239 idx += pageInfo.fNElements;
243 sealedPagesV.push_back(std::move(sealedPages));
244 sealedPageGroups.emplace_back(column.fColumnOutputId, sealedPagesV.back().cbegin(),
245 sealedPagesV.back().cend());
256 clusterId = descriptor->FindNextClusterId(clusterId);
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Given a set of RPageSources, merge them into an RPageSink.
std::unordered_map< std::string, DescriptorId_t > fOutputIdMap
void AddColumnsFromField(std::vector< RColumnInfo > &columns, const RNTupleDescriptor &desc, const RFieldDescriptor &fieldDesc, const std::string &prefix="")
Recursively add columns from a given field.
void Merge(std::span< RPageSource * > sources, RPageSink &destination)
Merge a given set of sources into the destination.
std::vector< RColumnInfo > CollectColumns(const RNTupleDescriptor &descriptor)
Recursively collect all the columns for all the fields rooted at field zero.
void BuildColumnIdMap(std::vector< RColumnInfo > &columns)
Build the internal column id map from the first source. This is where we assign the output ids for the...
void ValidateColumns(std::vector< RColumnInfo > &columns)
Validate the columns against the internal map that is built from the first source. This is where we as...
Abstract interface to write data into an ntuple.
bool IsInitialized() const
virtual const RNTupleDescriptor & GetDescriptor() const =0
Return the RNTupleDescriptor being constructed.
void Init(RNTupleModel &model)
Physically creates the storage container to hold the ntuple (e.g., a key in a TFile or an S3 bucket). In...
virtual void CommitClusterGroup()=0
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
virtual std::uint64_t CommitCluster(NTupleSize_t nNewEntries)=0
Finalize the current cluster and create a new one for the following data.
virtual void CommitDataset()=0
Finalize the current cluster and the entire data set.
virtual void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges)=0
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const RNTupleReadOptions &options=RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
std::deque< RSealedPage > SealedPageSequence_t
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Base class for all ROOT issued exceptions.
Meta-data stored for every field of an ntuple.
The on-storage meta-data of an ntuple.
RFieldDescriptorIterable GetFieldIterable(const RFieldDescriptor &fieldDesc) const
RColumnDescriptorIterable GetColumnIterable() const
const RFieldDescriptor & GetFieldZero() const
Common user-tunable settings for storing ntuples.
void SetUseBufferedWrite(bool val)
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface. Merge this NTuple with the input list entries.
Collection abstract base class.
virtual Int_t GetEntries() const
TKey * FindKey(const char *keyname) const override
Find key with name keyname in the current directory.
TObject * Get(const char *namecycle) override
Return pointer to object identified by namecycle.
A file, usually with extension .root, that stores data and code in the form of serialized objects in ...
Book space in a file, create I/O buffers, fill them, and (un)compress them.
T * ReadObject()
To read an object (non deriving from TObject) from the file.
constexpr DescriptorId_t kInvalidDescriptorId
A sealed page contains the bytes of a page as written to storage (packed & compressed).