Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleMerger.cxx
Go to the documentation of this file.
1/// \file RNTupleMerger.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>, Max Orok <maxwellorok@gmail.com>, Alaettin Serhan Mete <amete@anl.gov>
4/// \date 2020-07-08
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RError.hxx>
17#include <ROOT/RNTuple.hxx>
20#include <ROOT/RNTupleModel.hxx>
21#include <ROOT/RNTupleUtil.hxx>
23#include <TError.h>
24#include <TFile.h>
25#include <TKey.h>
26
27#include <deque>
28
30{
31 // Check the inputs
32 if (!inputs || inputs->GetEntries() < 3 || !mergeInfo)
33 return -1;
34
35 // Parse the input parameters
36 TIter itr(inputs);
37
38 // First entry is the RNTuple name
39 std::string ntupleName = std::string(itr()->GetName());
40
41 // Second entry is the output file
42 TFile *outFile = dynamic_cast<TFile *>(itr());
43 if (!outFile)
44 return -1;
45
46 // Check if the output file already has a key with that name
47 TKey *outKey = outFile->FindKey(ntupleName.c_str());
48 RNTuple *outNTuple = nullptr;
49 if (outKey) {
50 outNTuple = outKey->ReadObject<RNTuple>();
51 if (!outNTuple) {
52 Error("RNTuple::Merge", "Output file already has key, but not of type RNTuple!");
53 return -1;
54 }
55 // In principle, we should already be working on the RNTuple object from the output file, but just continue with
56 // pointer we just got.
57 }
58
59 RNTupleWriteOptions writeOpts;
60 writeOpts.SetUseBufferedWrite(false);
61 auto destination = std::make_unique<Internal::RPageSinkFile>(ntupleName, *outFile, writeOpts);
62
63 // If we already have an existing RNTuple, copy over its descriptor to support incremental merging
64 if (outNTuple) {
65 auto source = Internal::RPageSourceFile::CreateFromAnchor(*outNTuple);
66 source->Attach();
67 auto desc = source->GetSharedDescriptorGuard();
68 destination->InitFromDescriptor(desc.GetRef());
69 }
70
71 // The remaining entries are the input files
72 std::vector<std::unique_ptr<Internal::RPageSourceFile>> sources;
73 std::vector<Internal::RPageSource *> sourcePtrs;
74
75 while (const auto &pitr = itr()) {
76 TFile *inFile = dynamic_cast<TFile *>(pitr);
77 RNTuple *anchor = inFile ? inFile->Get<RNTuple>(ntupleName.c_str()) : nullptr;
78 if (!anchor)
79 return -1;
80 sources.push_back(Internal::RPageSourceFile::CreateFromAnchor(*anchor));
81 }
82
83 // Interface conversion
84 for (const auto &s : sources) {
85 sourcePtrs.push_back(s.get());
86 }
87
88 // Now merge
90 merger.Merge(sourcePtrs, *destination);
91
92 // Provide the caller with a merged anchor object (even though we've already
93 // written it).
94 *this = *outFile->Get<RNTuple>(ntupleName.c_str());
95
96 return 0;
97}
98
99////////////////////////////////////////////////////////////////////////////////
101 std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo> &columns)
102{
103 for (auto &column : columns) {
104 column.fColumnOutputId = fOutputIdMap.size();
105 fOutputIdMap[column.fColumnName + "." + column.fColumnTypeAndVersion] = column.fColumnOutputId;
106 }
107}
108
109////////////////////////////////////////////////////////////////////////////////
111 std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo> &columns)
112{
113 // First ensure that we have the same number of columns
114 if (fOutputIdMap.size() != columns.size()) {
115 throw RException(R__FAIL("Columns between sources do NOT match"));
116 }
117 // Then ensure that we have the same names of columns and assign the ids
118 for (auto &column : columns) {
119 try {
120 column.fColumnOutputId = fOutputIdMap.at(column.fColumnName + "." + column.fColumnTypeAndVersion);
121 } catch (const std::out_of_range &) {
122 throw RException(R__FAIL("Column NOT found in the first source w/ name " + column.fColumnName +
123 " type and version " + column.fColumnTypeAndVersion));
124 }
125 }
126}
127
128////////////////////////////////////////////////////////////////////////////////
129std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo>
131{
132 std::vector<RColumnInfo> columns;
133 // Here we recursively find the columns and fill the RColumnInfo vector
134 AddColumnsFromField(columns, descriptor, descriptor.GetFieldZero());
135 // Then we either build the internal map (first source) or validate the columns against it (remaning sources)
136 // In either case, we also assign the output ids here
137 if (fOutputIdMap.empty()) {
138 BuildColumnIdMap(columns);
139 } else {
140 ValidateColumns(columns);
141 }
142 return columns;
143}
144
145////////////////////////////////////////////////////////////////////////////////
147 std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo> &columns, const RNTupleDescriptor &desc,
148 const RFieldDescriptor &fieldDesc, const std::string &prefix)
149{
150 for (const auto &field : desc.GetFieldIterable(fieldDesc)) {
151 std::string name = prefix + field.GetFieldName() + ".";
152 const std::string typeAndVersion = field.GetTypeName() + "." + std::to_string(field.GetTypeVersion());
153 for (const auto &column : desc.GetColumnIterable(field)) {
154 columns.emplace_back(name + std::to_string(column.GetIndex()), typeAndVersion, column.GetPhysicalId(),
156 }
157 AddColumnsFromField(columns, desc, field, name);
158 }
159}
160
161////////////////////////////////////////////////////////////////////////////////
162void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *> sources, RPageSink &destination)
163{
164 if (destination.IsInitialized()) {
165 CollectColumns(destination.GetDescriptor());
166 }
167
168 // Append the sources to the destination one-by-one
169 for (const auto &source : sources) {
170 source->Attach();
171
172 // Make sure the source contains events to be merged
173 if (source->GetNEntries() == 0) {
174 continue;
175 }
176
177 // Get a handle on the descriptor (metadata)
178 auto descriptor = source->GetSharedDescriptorGuard();
179
180 // Collect all the columns
181 // The column name : output column id map is only built once
182 auto columns = CollectColumns(descriptor.GetRef());
183
184 // Create sink from the input model if not initialized
185 if (!destination.IsInitialized()) {
186 auto model = descriptor->CreateModel();
187 destination.Init(*model.get());
188 }
189
190 // Now loop over all clusters in this file
191 // descriptor->GetClusterIterable() doesn't guarantee any specific order...
192 // Find the first cluster id and iterate from there...
193 auto clusterId = descriptor->FindClusterId(0, 0);
194
195 while (clusterId != ROOT::Experimental::kInvalidDescriptorId) {
196 auto &cluster = descriptor->GetClusterDescriptor(clusterId);
197
198 std::vector<std::unique_ptr<unsigned char[]>> buffers;
199 // We use a std::deque so that references to the contained SealedPageSequence_t, and its iterators, are never
200 // invalidated.
201 std::deque<RPageStorage::SealedPageSequence_t> sealedPagesV;
202 std::vector<RPageStorage::RSealedPageGroup> sealedPageGroups;
203
204 for (const auto &column : columns) {
205
206 // See if this cluster contains this column
207 // if not, there is nothing to read/do...
208 auto columnId = column.fColumnInputId;
209 if (!cluster.ContainsColumn(columnId)) {
210 continue;
211 }
212
213 // Now get the pages for this column in this cluster
214 const auto &pages = cluster.GetPageRange(columnId);
215 size_t idx{0};
216
218
219 // Loop over the pages
220 for (const auto &pageInfo : pages.fPageInfos) {
221
222 // Each page contains N elements that we are going to read together
223 // LoadSealedPage reads packed/compressed bytes of a page into
224 // a memory buffer provided by a sealed page
225 RClusterIndex clusterIndex(clusterId, idx);
227 source->LoadSealedPage(columnId, clusterIndex, sealedPage);
228
229 // The way LoadSealedPage works might require a double call
230 // See the implementation. Here we do this in any case...
231 auto buffer = std::make_unique<unsigned char[]>(sealedPage.fSize);
232 sealedPage.fBuffer = buffer.get();
233 source->LoadSealedPage(columnId, clusterIndex, sealedPage);
234
235 buffers.push_back(std::move(buffer));
236 sealedPages.push_back(std::move(sealedPage));
237
238 // Move on to the next index
239 idx += pageInfo.fNElements;
240
241 } // end of loop over pages
242
243 sealedPagesV.push_back(std::move(sealedPages));
244 sealedPageGroups.emplace_back(column.fColumnOutputId, sealedPagesV.back().cbegin(),
245 sealedPagesV.back().cend());
246
247 } // end of loop over columns
248
249 // Now commit all pages to the output
250 destination.CommitSealedPageV(sealedPageGroups);
251
252 // Commit the clusters
253 destination.CommitCluster(cluster.GetNEntries());
254
255 // Go to the next cluster
256 clusterId = descriptor->FindNextClusterId(clusterId);
257
258 } // end of loop over clusters
259
260 // Commit all clusters for this input
261 destination.CommitClusterGroup();
262
263 } // end of loop over sources
264
265 // Commit the output
266 destination.CommitDataset();
267}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:290
long long Long64_t
Definition RtypesCore.h:80
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:185
char name[80]
Definition TGX11.cxx:110
Given a set of RPageSources merge them into an RPageSink.
void AddColumnsFromField(std::vector< RColumnInfo > &columns, const RNTupleDescriptor &desc, const RFieldDescriptor &fieldDesc, const std::string &prefix="")
Recursively add columns from a given field.
void Merge(std::span< RPageSource * > sources, RPageSink &destination)
Merge a given set of sources into the destination.
std::vector< RColumnInfo > CollectColumns(const RNTupleDescriptor &descriptor)
Recursively collect all the columns for all the fields rooted at field zero.
void BuildColumnIdMap(std::vector< RColumnInfo > &columns)
Build the internal column id map from the first source This is where we assign the output ids for the...
void ValidateColumns(std::vector< RColumnInfo > &columns)
Validate the columns against the internal map that is built from the first source This is where we as...
Abstract interface to write data into an ntuple.
virtual const RNTupleDescriptor & GetDescriptor() const =0
Return the RNTupleDescriptor being constructed.
void Init(RNTupleModel &model)
Physically creates the storage container to hold the ntuple (e.g., a keys a TFile or an S3 bucket) In...
virtual void CommitClusterGroup()=0
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
virtual std::uint64_t CommitCluster(NTupleSize_t nNewEntries)=0
Finalize the current cluster and create a new one for the following data.
virtual void CommitDataset()=0
Finalize the current cluster and the entire data set.
virtual void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges)=0
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const RNTupleReadOptions &options=RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
std::deque< RSealedPage > SealedPageSequence_t
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
Meta-data stored for every field of an ntuple.
The on-storage meta-data of an ntuple.
RFieldDescriptorIterable GetFieldIterable(const RFieldDescriptor &fieldDesc) const
RColumnDescriptorIterable GetColumnIterable() const
const RFieldDescriptor & GetFieldZero() const
Common user-tunable settings for storing ntuples.
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:61
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface Merge this NTuple with the input list entries.
Collection abstract base class.
Definition TCollection.h:65
virtual Int_t GetEntries() const
TKey * FindKey(const char *keyname) const override
Find key with name keyname in the current directory.
TObject * Get(const char *namecycle) override
Return pointer to object identified by namecycle.
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:53
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
T * ReadObject()
To read an object (non deriving from TObject) from the file.
Definition TKey.h:103
constexpr DescriptorId_t kInvalidDescriptorId
A sealed page contains the bytes of a page as written to storage (packed & compressed).