Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleMerger.cxx
Go to the documentation of this file.
1/// \file RNTupleMerger.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>, Max Orok <maxwellorok@gmail.com>, Alaettin Serhan Mete <amete@anl.gov>
4/// \date 2020-07-08
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RError.hxx>
17#include <ROOT/RNTuple.hxx>
20#include <ROOT/RNTupleModel.hxx>
21#include <ROOT/RNTupleUtil.hxx>
23#include <TError.h>
24#include <TFile.h>
25#include <TKey.h>
26
27#include <deque>
28
30{
// Implements the hadd MergeFile interface for RNTuple. `inputs` is consumed in order through `itr`:
//   entry 0: any object whose GetName() is the name of the RNTuple to merge,
//   entry 1: the output TFile,
//   entries 2..: one input TFile each, every one of which must contain an RNTuple with that name.
// Returns -1 (without merging) if `inputs` or `mergeInfo` is null, fewer than 3 entries are given, entry 1 is not a
// TFile, the output file already holds a key of that name that is not an RNTuple, or an input entry is not a TFile
// containing the RNTuple. Returns 0 on success, after which *this holds the merged anchor re-read from the output.
// If the output file already contains the RNTuple, the sink is initialized from its descriptor so the inputs are
// merged into it (incremental merging).
31 // Check the inputs
32 if (!inputs || inputs->GetEntries() < 3 || !mergeInfo)
33 return -1;
34
35 // Parse the input parameters
36 TIter itr(inputs);
37
38 // First entry is the RNTuple name
39 std::string ntupleName = std::string(itr()->GetName());
40
41 // Second entry is the output file
42 TFile *outFile = dynamic_cast<TFile *>(itr());
43 if (!outFile)
44 return -1;
45
46 // Check if the output file already has a key with that name
47 TKey *outKey = outFile->FindKey(ntupleName.c_str());
48 RNTuple *outNTuple = nullptr;
49 if (outKey) {
50 outNTuple = outKey->ReadObject<RNTuple>();
51 if (!outNTuple) {
52 Error("RNTuple::Merge", "Output file already has key, but not of type RNTuple!");
53 return -1;
54 }
55 // In principle, we should already be working on the RNTuple object from the output file, but just continue with
56 // the pointer we just got.
57 }
58
59 RNTupleWriteOptions writeOpts;
60 writeOpts.SetUseBufferedWrite(false);
// NOTE(review): buffered writes are disabled for the merge sink; the rationale is not recorded here.
61 auto destination = std::make_unique<Internal::RPageSinkFile>(ntupleName, *outFile, writeOpts);
62
63 // If we already have an existing RNTuple, copy over its descriptor to support incremental merging
64 if (outNTuple) {
65 auto source = Internal::RPageSourceFile::CreateFromAnchor(*outNTuple);
66 source->Attach();
67 auto desc = source->GetSharedDescriptorGuard();
68 destination->InitFromDescriptor(desc.GetRef());
69 }
70
71 // The remaining entries are the input files
72 std::vector<std::unique_ptr<Internal::RPageSourceFile>> sources;
73 std::vector<Internal::RPageSource *> sourcePtrs;
74
75 while (const auto &pitr = itr()) {
76 TFile *inFile = dynamic_cast<TFile *>(pitr);
77 RNTuple *anchor = inFile ? inFile->Get<RNTuple>(ntupleName.c_str()) : nullptr;
78 if (!anchor)
79 return -1;
80 sources.push_back(Internal::RPageSourceFile::CreateFromAnchor(*anchor));
81 }
82
83 // Interface conversion: `sources` keeps ownership; the merger only receives non-owning RPageSource pointers.
84 for (const auto &s : sources) {
85 sourcePtrs.push_back(s.get());
86 }
87
88 // Now merge
90 merger.Merge(sourcePtrs, *destination);
// NOTE(review): nothing here catches exceptions from merger.Merge(). If `merger` is the Internal::RNTupleMerger
// defined in this file, an RException thrown on a column mismatch propagates to the caller instead of becoming a
// -1 return.
91
92 // Provide the caller with a merged anchor object (even though we've already
93 // written it).
// NOTE(review): the result of Get<RNTuple>() is dereferenced without a null check. Neither it nor outNTuple nor the
// input anchors are deleted in this function; confirm the ownership rules of TDirectory::Get / TKey::ReadObject for
// non-TObject classes to rule out leaks.
94 *this = *outFile->Get<RNTuple>(ntupleName.c_str());
95
96 return 0;
97}
98
99////////////////////////////////////////////////////////////////////////////////
101 std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo> &columns)
102{
// Builds fOutputIdMap from the reference column set. Each column receives the next sequential output id
// (0, 1, 2, ... in the order of `columns`). It is registered under the key
// fColumnName + "." + fColumnTypeAndVersion, i.e. "<name>.<type>.<version>".
// NOTE(review): if two columns ever produced the same key, the second would overwrite the entry without growing
// the map, so the following column would reuse its output id. Keys are assumed to be unique.
103 for (auto &column : columns) {
104 column.fColumnOutputId = fOutputIdMap.size();
105 fOutputIdMap[column.fColumnName + "." + column.fColumnTypeAndVersion] = column.fColumnOutputId;
106 }
107}
108
109////////////////////////////////////////////////////////////////////////////////
111 std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo> &columns)
112{
// Checks the columns of a subsequent source against fOutputIdMap and copies the matching output id into each
// column's fColumnOutputId.
// Throws RException in two cases:
//   - the number of columns differs from the map size;
//   - a column's "<name>.<type>.<version>" key is not in the map.
// Columns visited before such a failure keep the output id already assigned to them.
113 // First ensure that we have the same number of columns
114 if (fOutputIdMap.size() != columns.size()) {
115 throw RException(R__FAIL("Columns between sources do NOT match"));
116 }
117 // Then ensure that every column (same name, type and version) exists in the map and assign its output id
118 for (auto &column : columns) {
119 try {
120 column.fColumnOutputId = fOutputIdMap.at(column.fColumnName + "." + column.fColumnTypeAndVersion);
121 } catch (const std::out_of_range &) {
122 throw RException(R__FAIL("Column NOT found in the first source w/ name " + column.fColumnName +
123 " type and version " + column.fColumnTypeAndVersion));
124 }
125 }
126}
127
128////////////////////////////////////////////////////////////////////////////////
129std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo>
131{
// Gathers every column reachable from field zero of `descriptor` and assigns output ids to them.
// The first call (empty fOutputIdMap) defines the reference column set. Every later call is validated against it
// and may throw RException. Returns the columns annotated with their output ids.
132 std::vector<RColumnInfo> columns;
133 // Here we recursively find the columns and fill the RColumnInfo vector
134 AddColumnsFromField(columns, descriptor, descriptor.GetFieldZero());
135 // Then we either build the internal map (first source) or validate the columns against it (remaining sources)
136 // In either case, we also assign the output ids here
137 if (fOutputIdMap.empty()) {
138 BuildColumnIdMap(columns);
139 } else {
140 ValidateColumns(columns);
141 }
142 return columns;
143}
144
145////////////////////////////////////////////////////////////////////////////////
147 std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo> &columns, const RNTupleDescriptor &desc,
148 const RFieldDescriptor &fieldDesc, const std::string &prefix)
149{
// Recursively appends one RColumnInfo per column for every field below `fieldDesc`. A field's own columns are
// added first, then those of its sub-fields.
// Columns are named "<prefix><fieldName>.<columnIndex>" and tagged with their field's "<typeName>.<typeVersion>".
// "<prefix><fieldName>." becomes the prefix of the recursion, so names encode the field path starting below
// `fieldDesc`.
// NOTE(review): assumes GetFieldIterable(fieldDesc) yields the direct sub-fields of `fieldDesc`. The physical
// column id passed to RColumnInfo is presumably what is later read back as fColumnInputId.
150 for (const auto &field : desc.GetFieldIterable(fieldDesc)) {
151 std::string name = prefix + field.GetFieldName() + ".";
152 const std::string typeAndVersion = field.GetTypeName() + "." + std::to_string(field.GetTypeVersion());
153 for (const auto &column : desc.GetColumnIterable(field)) {
154 columns.emplace_back(name + std::to_string(column.GetIndex()), typeAndVersion, column.GetPhysicalId(),
156 }
157 AddColumnsFromField(columns, desc, field, name);
158 }
159}
160
161////////////////////////////////////////////////////////////////////////////////
162void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *> sources, RPageSink &destination)
163{
// Appends the entries of all `sources` to `destination`, cluster by cluster. It copies sealed pages (the page
// bytes as stored, still packed/compressed) rather than reading and re-writing individual values.
// - If `destination` is already initialized (incremental merge), its columns define the reference column set.
//   Otherwise the sink is initialized from the first source's model, and that source defines it.
// - Every source must match the reference columns (name, type and version); CollectColumns throws RException
//   otherwise.
// - Extra type info records of every source are forwarded to the sink, even for sources without entries.
// - Each non-empty source is closed with CommitClusterGroup(); the data set is finalized once with CommitDataset().
164 if (destination.IsInitialized()) {
165 CollectColumns(destination.GetDescriptor());
166 }
167
168 std::unique_ptr<RNTupleModel> model; // used to initialize the schema of the output RNTuple
// NOTE(review): `model` lives until the end of the function, i.e. it outlives destination.Init(). Presumably the
// sink relies on this — confirm before narrowing its scope.
169
170 // Append the sources to the destination one-by-one
171 for (const auto &source : sources) {
172 source->Attach();
173
174 // Get a handle on the descriptor (metadata)
175 auto descriptor = source->GetSharedDescriptorGuard();
176
177 // Collect all the columns
178 // The column name : output column id map is only built once
179 auto columns = CollectColumns(descriptor.GetRef());
180
181 // Create sink from the input model if not initialized
182 if (!destination.IsInitialized()) {
183 model = descriptor->CreateModel();
184 destination.Init(*model.get());
185 }
186
187 for (const auto &extraTypeInfoDesc : descriptor->GetExtraTypeInfoIterable()) {
188 destination.UpdateExtraTypeInfo(extraTypeInfoDesc);
189 }
190
191 // Make sure the source contains events to be merged
// (empty sources have still gone through the column checks and extra type info forwarding above)
192 if (source->GetNEntries() == 0) {
193 continue;
194 }
195
196 // Now loop over all clusters in this file
197 // descriptor->GetClusterIterable() doesn't guarantee any specific order...
198 // Find the first cluster id and iterate from there...
199 auto clusterId = descriptor->FindClusterId(0, 0);
200
201 while (clusterId != ROOT::Experimental::kInvalidDescriptorId) {
202 auto &cluster = descriptor->GetClusterDescriptor(clusterId);
203
// Owns the page bytes loaded for this cluster. Each sealed page only points into one of these buffers (see
// SetBuffer below), so they are kept alive until after CommitSealedPageV().
204 std::vector<std::unique_ptr<unsigned char[]>> buffers;
205 // We use a std::deque so that references to the contained SealedPageSequence_t, and its iterators, are never
206 // invalidated.
207 std::deque<RPageStorage::SealedPageSequence_t> sealedPagesV;
208 std::vector<RPageStorage::RSealedPageGroup> sealedPageGroups;
209
210 for (const auto &column : columns) {
211
212 // See if this cluster contains this column
213 // if not, there is nothing to read/do...
214 auto columnId = column.fColumnInputId;
215 if (!cluster.ContainsColumn(columnId)) {
216 continue;
217 }
218
219 // Now get the pages for this column in this cluster
220 const auto &pages = cluster.GetPageRange(columnId);
221 size_t idx{0};
222
224
225 // Loop over the pages
226 for (const auto &pageInfo : pages.fPageInfos) {
227
228 // Each page contains N elements that we are going to read together
229 // LoadSealedPage reads packed/compressed bytes of a page into
230 // a memory buffer provided by a sealed page
231 RClusterIndex clusterIndex(clusterId, idx);
233 source->LoadSealedPage(columnId, clusterIndex, sealedPage);
234
235 // The way LoadSealedPage works might require a double call
236 // See the implementation. Here we do this in any case...
// Presumably the first call (made before a buffer is attached) only fills in the page size. GetSize() then sizes
// the buffer, and the second call loads the page bytes into it — confirm in RPageSource::LoadSealedPage.
237 auto buffer = std::make_unique<unsigned char[]>(sealedPage.GetSize());
238 sealedPage.SetBuffer(buffer.get());
239 source->LoadSealedPage(columnId, clusterIndex, sealedPage);
240
241 buffers.push_back(std::move(buffer));
242 sealedPages.push_back(std::move(sealedPage));
243
244 // Advance to the cluster-local index of the first element of the next page
245 idx += pageInfo.fNElements;
246
247 } // end of loop over pages
248
249 sealedPagesV.push_back(std::move(sealedPages));
// One group per column: its output column id and the range of its sealed pages in this cluster.
250 sealedPageGroups.emplace_back(column.fColumnOutputId, sealedPagesV.back().cbegin(),
251 sealedPagesV.back().cend());
252
253 } // end of loop over columns
254
255 // Now commit all pages to the output
256 destination.CommitSealedPageV(sealedPageGroups);
257
258 // Close the output cluster with the same number of entries as the input cluster (one output cluster per input)
259 destination.CommitCluster(cluster.GetNEntries());
260
261 // Go to the next cluster
262 clusterId = descriptor->FindNextClusterId(clusterId);
263
264 } // end of loop over clusters
265
266 // Commit all clusters for this input
267 destination.CommitClusterGroup();
268
269 } // end of loop over sources
270
271 // Commit the output
272 destination.CommitDataset();
273}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:290
long long Long64_t
Definition RtypesCore.h:80
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:185
char name[80]
Definition TGX11.cxx:110
Given a set of RPageSources, merge them into an RPageSink.
void AddColumnsFromField(std::vector< RColumnInfo > &columns, const RNTupleDescriptor &desc, const RFieldDescriptor &fieldDesc, const std::string &prefix="")
Recursively add columns from a given field.
void Merge(std::span< RPageSource * > sources, RPageSink &destination)
Merge a given set of sources into the destination.
std::vector< RColumnInfo > CollectColumns(const RNTupleDescriptor &descriptor)
Recursively collect all the columns for all the fields rooted at field zero.
void BuildColumnIdMap(std::vector< RColumnInfo > &columns)
Build the internal column id map from the first source. This is where we assign the output ids for the...
void ValidateColumns(std::vector< RColumnInfo > &columns)
Validate the columns against the internal map that is built from the first source. This is where we as...
Abstract interface to write data into an ntuple.
virtual void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo)=0
Adds an extra type information record to schema.
void CommitDataset()
Run the registered callbacks and finalize the current cluster and the entire data set.
virtual const RNTupleDescriptor & GetDescriptor() const =0
Return the RNTupleDescriptor being constructed.
void Init(RNTupleModel &model)
Physically creates the storage container to hold the ntuple (e.g., a key in a TFile or an S3 bucket). In...
virtual void CommitClusterGroup()=0
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
virtual std::uint64_t CommitCluster(NTupleSize_t nNewEntries)=0
Finalize the current cluster and create a new one for the following data.
virtual void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges)=0
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const RNTupleReadOptions &options=RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
std::deque< RSealedPage > SealedPageSequence_t
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
Meta-data stored for every field of an ntuple.
The on-storage meta-data of an ntuple.
RFieldDescriptorIterable GetFieldIterable(const RFieldDescriptor &fieldDesc) const
RColumnDescriptorIterable GetColumnIterable() const
const RFieldDescriptor & GetFieldZero() const
Common user-tunable settings for storing ntuples.
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:61
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface. Merge this NTuple with the input list entries.
Collection abstract base class.
Definition TCollection.h:65
virtual Int_t GetEntries() const
TKey * FindKey(const char *keyname) const override
Find key with name keyname in the current directory.
TObject * Get(const char *namecycle) override
Return pointer to object identified by namecycle.
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:53
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
T * ReadObject()
To read an object (non deriving from TObject) from the file.
Definition TKey.h:103
constexpr DescriptorId_t kInvalidDescriptorId
A sealed page contains the bytes of a page as written to storage (packed & compressed).