Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleProcessor.cxx
Go to the documentation of this file.
1/// \file RNTupleProcessor.cxx
2/// \ingroup NTuple ROOT7
3/// \author Florine de Geus <florine.de.geus@cern.ch>
4/// \date 2024-03-26
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
17
18#include <ROOT/RFieldBase.hxx>
19
20namespace {
22void EnsureUniqueNTupleNames(const std::vector<RNTupleOpenSpec> &ntuples)
23{
24 std::unordered_set<std::string> uniqueNTupleNames;
25 for (const auto &ntuple : ntuples) {
26 auto res = uniqueNTupleNames.emplace(ntuple.fNTupleName);
27 if (!res.second) {
28 throw ROOT::RException(R__FAIL("horizontal joining of RNTuples with the same name is not allowed"));
29 }
30 }
31}
32} // anonymous namespace
33
/// Create a processor for a single RNTuple when no model is supplied: the model is generated from the
/// descriptor of the ntuple itself and then handed to the overload that takes an explicit model.
/// NOTE(review): the line holding the function name and parameter list is absent from this listing; the
/// reference entries further down give it as `Create(const RNTupleOpenSpec &ntuple)`.
34std::unique_ptr<ROOT::Experimental::RNTupleProcessor>
36{
// A page source is opened and attached here only to obtain a model from the on-disk descriptor.
37 auto pageSource = Internal::RPageSource::Create(ntuple.fNTupleName, ntuple.fStorage);
38 pageSource->Attach();
39 auto model = pageSource->GetSharedDescriptorGuard()->CreateModel();
// NOTE(review): `model` and `pageSource` are locals and are destroyed when this function returns, while the
// model is passed on by reference -- confirm the created processor keeps no reference into either of them.
40 return RNTupleProcessor::Create(ntuple, *model);
41}
42
/// Create a processor for a single RNTuple using the given model; returns an RNTupleSingleProcessor
/// through a pointer to the RNTupleProcessor base.
/// NOTE(review): the line holding the function name and parameter list is absent from this listing; the
/// body refers to the parameters as `ntuple` and `model`.
43std::unique_ptr<ROOT::Experimental::RNTupleProcessor>
45{
// The constructor is called through `new` instead of std::make_unique -- presumably because it is not
// publicly accessible; TODO confirm against the class declaration.
46 return std::unique_ptr<RNTupleSingleProcessor>(new RNTupleSingleProcessor(ntuple, model));
47}
48
/// Create a processor for a chain (vertical concatenation) of RNTuples; returns an RNTupleChainProcessor
/// through a pointer to the RNTupleProcessor base. Ownership of `model` is transferred to the constructor.
/// NOTE(review): the first line of the parameter list is absent from this listing; the reference entries
/// further down give the signature as
/// `CreateChain(const std::vector<RNTupleOpenSpec> &ntuples, std::unique_ptr<RNTupleModel> model = nullptr)`.
49std::unique_ptr<ROOT::Experimental::RNTupleProcessor>
51 std::unique_ptr<RNTupleModel> model)
52{
// The constructor is called through `new` instead of std::make_unique -- presumably because it is not
// publicly accessible; TODO confirm against the class declaration.
53 return std::unique_ptr<RNTupleChainProcessor>(new RNTupleChainProcessor(ntuples, std::move(model)));
54}
55
/// Create a processor for a join (horizontal concatenation) of RNTuples. The first element of `ntuples`
/// becomes the main ntuple of the RNTupleJoinProcessor; every further element is added as an auxiliary ntuple.
///
/// `models` may be empty; otherwise it must hold exactly one model per ntuple, in the same order.
///
/// \throws RException if `ntuples` is empty, if the number of models does not match the number of ntuples,
/// if more than four join fields are given, or if a join field is listed more than once.
///
/// NOTE(review): the first line of the parameter list (carrying `ntuples`) is absent from this listing.
56std::unique_ptr<ROOT::Experimental::RNTupleProcessor>
58 const std::vector<std::string> &joinFields,
59 std::vector<std::unique_ptr<RNTupleModel>> models)
60{
61 if (ntuples.size() < 1)
62 throw RException(R__FAIL("at least one RNTuple must be provided"));
63
64 if (models.size() > 0 && models.size() != ntuples.size()) {
65 throw RException(R__FAIL("number of provided models must match number of specified ntuples"));
66 }
67
68 if (joinFields.size() > 4) {
69 throw RException(R__FAIL("a maximum of four join fields is allowed"));
70 }
71
// Copying the names into a std::set drops duplicates, so a smaller set means a name was repeated.
72 if (std::set(joinFields.begin(), joinFields.end()).size() < joinFields.size()) {
73 throw RException(R__FAIL("join fields must be unique"));
74 }
75
76 // TODO(fdegeus) allow for the provision of aliases for ntuples with the same name, removing the constraint of
77 // uniquely-named ntuples.
// NOTE(review): the statement that followed this TODO is absent from the listing -- presumably the call to
// EnsureUniqueNTupleNames(ntuples); TODO confirm.
79
// The main ntuple is handled by the constructor, with its own model if models were provided.
80 std::unique_ptr<RNTupleJoinProcessor> processor;
81 if (models.size() > 0) {
82 processor = std::unique_ptr<RNTupleJoinProcessor>(new RNTupleJoinProcessor(ntuples[0], std::move(models[0])));
83 } else {
84 processor = std::unique_ptr<RNTupleJoinProcessor>(new RNTupleJoinProcessor(ntuples[0]));
85 }
86
// All remaining ntuples are auxiliary and share the same list of join fields.
87 for (unsigned i = 1; i < ntuples.size(); ++i) {
88 if (models.size() > 0)
89 processor->AddAuxiliary(ntuples[i], joinFields, std::move(models[i]));
90 else
91 processor->AddAuxiliary(ntuples[i], joinFields);
92 }
93
// Only once every auxiliary ntuple has been added are the join field tokens set and the fields connected.
94 processor->SetJoinFieldTokens(joinFields);
95 processor->ConnectFields();
96
97 return processor;
98}
99
101 REntry &entry)
102{
// Purpose (per the reference entries of this listing): creates and connects a concrete field to the given
// page source, based on the proto field held by `fieldContext`, and rebinds the matching value in `entry`.
// NOTE(review): the first signature line and two interior lines (listing lines 106 and 113) are absent.
103 auto desc = pageSource.GetSharedDescriptorGuard();
104
// The on-disk field is looked up by the name of the proto field.
105 const auto fieldId = desc->FindFieldId(fieldContext.GetProtoField().GetFieldName());
// NOTE(review): the condition guarding this throw is absent from the listing -- presumably a comparison of
// `fieldId` with kInvalidDescriptorId; TODO confirm.
107 throw RException(
108 R__FAIL("field \"" + fieldContext.GetProtoField().GetFieldName() + "\" not found in current RNTuple"));
109 }
110
111 fieldContext.SetConcreteField();
112 fieldContext.fConcreteField->SetOnDiskId(fieldId);
// NOTE(review): one statement is absent from the listing here -- presumably the call that connects the
// concrete field to `pageSource` (CallConnectPageSourceOnField appears in the reference entries); TODO confirm.
114
// Keep the pointer the entry currently holds for this token, but have the value be produced by the concrete field.
115 auto valuePtr = entry.GetPtr<void>(fieldContext.fToken);
116 auto value = fieldContext.fConcreteField->BindValue(valuePtr);
117 entry.UpdateValue(fieldContext.fToken, value);
118}
119
120//------------------------------------------------------------------------------
121
124{
// Constructor body of RNTupleSingleProcessor (per the reference entries: takes `const RNTupleOpenSpec &ntuple`
// and `RNTupleModel &model`). It opens and attaches the page source of the ntuple, creates the processor's
// entry from the model and sets up one connected field context per value of that entry.
// NOTE(review): the constructor header lines are absent from this listing.
125 fPageSource = Internal::RPageSource::Create(ntuple.fNTupleName, ntuple.fStorage);
126 fPageSource->Attach();
127
// Note that this freezes the model object passed in by the caller.
128 model.Freeze();
129 fEntry = model.CreateEntry();
130
131 for (const auto &value : *fEntry) {
132 auto &field = value.GetField();
133 auto token = fEntry->GetToken(field.GetFieldName());
134
135 // If the model has a default entry, use the value pointers from the entry in the entry managed by the
136 // processor. This way, the pointers returned by RNTupleModel::MakeField can be used in the processor loop to
137 // access the corresponding field values.
138 if (!model.IsBare()) {
139 auto valuePtr = model.GetDefaultEntry().GetPtr<void>(token);
140 fEntry->BindValue(token, valuePtr);
141 }
142
// The field context receives a clone of the model's field; it is connected first and then stored under the field name.
143 auto fieldContext = RFieldContext(field.Clone(field.GetFieldName()), token);
144 ConnectField(fieldContext, *fPageSource, *fEntry);
145 fFieldContexts.try_emplace(field.GetFieldName(), std::move(fieldContext));
146 }
147}
148
150{
// Body of RNTupleSingleProcessor::Advance() (header line absent from this listing). Loads the entry at
// fLocalEntryNumber and returns that number, or returns kInvalidNTupleIndex without loading anything when
// the number is invalid or not smaller than the number of entries in the page source.
// NOTE(review): fLocalEntryNumber is not modified here -- presumably it is advanced by the caller; TODO confirm.
151 if (fLocalEntryNumber == kInvalidNTupleIndex || fLocalEntryNumber >= fPageSource->GetNEntries()) {
152 return kInvalidNTupleIndex;
153 }
154
155 LoadEntry();
156
157 fNEntriesProcessed++;
158 return fLocalEntryNumber;
159}
160
161//------------------------------------------------------------------------------
162
164 std::unique_ptr<RNTupleModel> model)
166{
// Constructor body of RNTupleChainProcessor. It attaches the page source of the first ntuple in the chain,
// creates the processor's entry from the model (generated from that first ntuple when `model` is null) and
// sets up one field context per value of the entry.
// Throws RException if fNTuples is empty or if the first ntuple has no entries.
// NOTE(review): the first header line and listing line 165 (presumably the member initializer that fills
// fNTuples) are absent from this listing; TODO confirm.
167 if (fNTuples.empty())
168 throw RException(R__FAIL("at least one RNTuple must be provided"));
169
170 fPageSource = Internal::RPageSource::Create(fNTuples[0].fNTupleName, fNTuples[0].fStorage);
171 fPageSource->Attach();
172
173 if (fPageSource->GetNEntries() == 0) {
174 throw RException(R__FAIL("first RNTuple does not contain any entries"));
175 }
176
177 if (!model)
178 model = fPageSource->GetSharedDescriptorGuard()->CreateModel();
179
180 model->Freeze();
181 fEntry = model->CreateEntry();
182
183 for (const auto &value : *fEntry) {
184 auto &field = value.GetField();
185 auto token = fEntry->GetToken(field.GetFieldName());
186
187 // If the model has a default entry, use the value pointers from the entry in the entry managed by the
188 // processor. This way, the pointers returned by RNTupleModel::MakeField can be used in the processor loop to
189 // access the corresponding field values.
190 if (!model->IsBare()) {
191 auto valuePtr = model->GetDefaultEntry().GetPtr<void>(token);
192 fEntry->BindValue(token, valuePtr);
193 }
194
// The field context is constructed in place inside fFieldContexts, keyed by the field name.
195 const auto &[fieldContext, _] =
196 fFieldContexts.try_emplace(field.GetFieldName(), field.Clone(field.GetFieldName()), token);
// NOTE(review): the statement using `fieldContext` (listing line 197) is absent -- presumably the
// ConnectField call, as in the join processor constructor; TODO confirm.
198 }
199}
200
202{
// Body of RNTupleChainProcessor::ConnectNTuple(const RNTupleOpenSpec &ntuple) (header line absent from this
// listing). Switches the processor over to `ntuple` and returns the number of entries that ntuple contains.
203 // Before destroying the current page source and replacing it by the new one, we need to reset the concrete fields
204 // belonging to the current page source. Otherwise, these concrete fields become invalid.
205 for (auto &[_, fieldContext] : fFieldContexts) {
206 fieldContext.ResetConcreteField();
207 }
208 // Replace the current page source with a new one, belonging to the provided ntuple.
209 fPageSource = Internal::RPageSource::Create(ntuple.fNTupleName, ntuple.fStorage);
210 fPageSource->Attach();
211
212 // Now that the new page source has been created and attached, we can create and connect the concrete fields again.
213 for (auto &[_, fieldContext] : fFieldContexts) {
214 ConnectField(fieldContext, *fPageSource, *fEntry);
215 }
216
217 return fPageSource->GetNEntries();
218}
219
221{
// Body of RNTupleChainProcessor::Advance() (header line absent from this listing). Loads the entry at
// fLocalEntryNumber, moving on to the next ntuple of the chain first when the current one is exhausted.
// The returned number is fLocalEntryNumber, i.e. relative to the ntuple that is currently connected;
// kInvalidNTupleIndex is returned when fLocalEntryNumber is invalid or the chain has no further ntuple.
222 if (fLocalEntryNumber == kInvalidNTupleIndex)
223 return kInvalidNTupleIndex;
224
225 if (fLocalEntryNumber >= fPageSource->GetNEntries()) {
226 do {
227 if (++fCurrentNTupleNumber >= fNTuples.size()) {
228 return kInvalidNTupleIndex;
229 }
230 // Skip over empty ntuples we might encounter.
231 } while (ConnectNTuple(fNTuples.at(fCurrentNTupleNumber)) == 0);
232
// Processing of the newly connected ntuple starts at its first entry.
233 fLocalEntryNumber = 0;
234 }
235
236 LoadEntry();
237
238 fNEntriesProcessed++;
239 return fLocalEntryNumber;
240}
241
242//------------------------------------------------------------------------------
243
245 std::unique_ptr<RNTupleModel> model)
247{
// Constructor body of RNTupleJoinProcessor. It attaches the page source of the main ntuple, clones the model
// (generated from the main ntuple when `model` is null) into fJoinModel, creates the processor's entry from
// that clone and sets up one connected field context per value of the entry.
// Throws RException if the main ntuple has no entries.
// NOTE(review): the first header line and listing line 246 are absent from this listing.
248 fPageSource = Internal::RPageSource::Create(mainNTuple.fNTupleName, mainNTuple.fStorage);
249 fPageSource->Attach();
250
251 if (fPageSource->GetNEntries() == 0) {
252 throw RException(R__FAIL("provided RNTuple is empty"));
253 }
254
255 if (!model)
256 model = fPageSource->GetSharedDescriptorGuard()->CreateModel();
257
// The processor works on its own clone of the model; AddAuxiliary later unfreezes and extends fJoinModel.
258 fJoinModel = model->Clone();
259 fJoinModel->Freeze();
260 fEntry = fJoinModel->CreateEntry();
261
262 for (const auto &value : *fEntry) {
263 auto &field = value.GetField();
264 const auto &fieldName = field.GetQualifiedFieldName();
265
266 // If the model has a default entry, use the value pointers from the default entry of the model that was passed to
267 // this constructor. This way, the pointers returned by RNTupleModel::MakeField can be used in the processor loop
268 // to access the corresponding field values.
269 if (!fJoinModel->IsBare()) {
270 auto valuePtr = model->GetDefaultEntry().GetPtr<void>(fieldName);
271 fEntry->BindValue(fieldName, valuePtr);
272 }
273
// The field context is constructed in place inside fFieldContexts (keyed by the qualified field name) and
// then connected to the main page source.
274 const auto &[fieldContext, _] =
275 fFieldContexts.try_emplace(fieldName, field.Clone(fieldName), fEntry->GetToken(fieldName));
276 ConnectField(fieldContext->second, *fPageSource, *fEntry);
277 }
278}
279
281 const std::vector<std::string> &joinFields,
282 std::unique_ptr<RNTupleModel> model)
283{
// Body of RNTupleJoinProcessor::AddAuxiliary (the first signature line, carrying `auxNTuple`, is absent from
// this listing). Adds an auxiliary ntuple to the join:
//  1. opens and attaches a page source for it,
//  2. appends its fields to fJoinModel as subfields of a record field named after the ntuple,
//  3. replaces fEntry by a new entry of the extended model, carrying over the existing value pointers,
//  4. creates a join index for it if join fields were given, and stores the page source.
// Throws RException if the auxiliary ntuple has no entries. The newly created field contexts are not
// connected to a page source in this function.
284 assert(fNEntriesProcessed == 0 && "cannot add auxiliary ntuples after processing has started");
285
// NOTE(review): the ntuple is appended to fNTuples before the emptiness check below can throw, so a failed
// call leaves the extra element behind -- confirm whether that is acceptable.
286 fNTuples.emplace_back(auxNTuple);
287
288 auto pageSource = Internal::RPageSource::Create(auxNTuple.fNTupleName, auxNTuple.fStorage);
289 pageSource->Attach();
290
291 if (pageSource->GetNEntries() == 0) {
292 throw RException(R__FAIL("provided RNTuple is empty"));
293 }
294
295 if (!model)
296 model = pageSource->GetSharedDescriptorGuard()->CreateModel();
297
298 model->Freeze();
// This bare entry is only used to enumerate the fields of the auxiliary model.
299 auto entry = model->CreateBareEntry();
300
301 // Append the auxiliary fields to the join model
302 fJoinModel->Unfreeze();
303
304 // The fields of the auxiliary ntuple are contained in an anonymous record field and subsequently registered as
305 // subfields to the join model. This way they can be accessed through the processor as `auxNTupleName.fieldName`,
306 // which is necessary in case there are duplicate field names between the main ntuple and (any of the) auxiliary
307 // ntuples.
308 std::vector<std::unique_ptr<RFieldBase>> auxFields;
309 auxFields.reserve(entry->fValues.size());
310 for (const auto &val : *entry) {
311 auto &field = val.GetField();
312
313 auxFields.emplace_back(field.Clone(field.GetQualifiedFieldName()));
314 }
315 std::unique_ptr<RFieldBase> auxParentField =
316 std::make_unique<RRecordField>(auxNTuple.fNTupleName, std::move(auxFields));
317
// NOTE(review): std::make_unique reports failure by throwing and never yields a null pointer, so this
// check cannot trigger.
318 if (!auxParentField) {
319 throw RException(R__FAIL("could not create auxiliary RNTuple parent field"));
320 }
321
// The subfield list is obtained before ownership of the parent field is moved into the join model.
322 const auto &subFields = auxParentField->GetSubFields();
323 fJoinModel->AddField(std::move(auxParentField));
324 for (const auto &field : subFields) {
325 fJoinModel->RegisterSubfield(field->GetQualifiedFieldName());
326 }
327
328 fJoinModel->Freeze();
329 // After modifying the join model, we need to create a new entry since the old one is invalidated. However, we do
330 // want to carry over the value pointers, so the pointers returned by `MakeField` during the creation of the original
331 // model by the user can be used in the processor loop.
332 auto newEntry = fJoinModel->CreateEntry();
333
334 for (const auto &value : *newEntry) {
335 const auto &field = value.GetField();
336
337 // Skip if the field is the untyped record that holds the fields of auxiliary ntuples.
// NOTE(review): the lambda takes its RNTupleOpenSpec argument by value, so each comparison copies the
// element; a const reference would avoid that.
338 const auto fnIsNTuple = [&field](RNTupleOpenSpec n) { return n.fNTupleName == field.GetFieldName(); };
339 if (std::find_if(fNTuples.cbegin(), fNTuples.cend(), fnIsNTuple) != fNTuples.end()) {
340 continue;
341 }
342
343 auto fieldContext = fFieldContexts.find(field.GetQualifiedFieldName());
344 // If the field belongs to the auxiliary ntuple currently being added, apart from assigning its entry value the
345 // correct pointer, we also have to create a field context for it.
346 if (fieldContext == fFieldContexts.end()) {
347 // If the model has a default entry, use the value pointers from the entry in the entry managed by the
348 // processor. This way, the pointers returned by RNTupleModel::MakeField can be used in the processor loop to
349 // access the corresponding field values.
350 if (!model->IsBare()) {
351 auto valuePtr = model->GetDefaultEntry().GetPtr<void>(field.GetFieldName());
352 newEntry->BindValue(field.GetQualifiedFieldName(), valuePtr);
353 }
354
// The context is keyed by the qualified name, its proto field is cloned under the unqualified name, and
// the last argument is the position of this ntuple in fNTuples.
355 auto token = newEntry->GetToken(field.GetQualifiedFieldName());
356 fFieldContexts.try_emplace(field.GetQualifiedFieldName(), field.Clone(field.GetFieldName()), token,
357 fNTuples.size() - 1);
358 } else {
// The field was already known: keep the pointer from the old entry and switch the context to the token
// of the new entry.
359 auto valuePtr = fEntry->GetPtr<void>(fieldContext->second.fToken);
360 auto newToken = newEntry->GetToken(field.GetQualifiedFieldName());
361 newEntry->BindValue(newToken, valuePtr);
362 fieldContext->second.fToken = std::move(newToken);
363 }
364 }
365
366 fEntry.swap(newEntry);
367
368 // If no join fields have been specified, an aligned join is assumed and an index won't be necessary.
// NOTE(review): ConnectFields and LoadEntry address fAuxiliaryPageSources and the per-index entry numbers
// with `fNTupleIdx - 1`; for the index lookup to line up, presumably either every auxiliary ntuple or none
// must be added with join fields -- TODO confirm.
369 if (joinFields.size() > 0)
370 fJoinIndices.emplace_back(Internal::RNTupleIndex::Create(joinFields, *pageSource, true /* deferBuild */));
371
372 fAuxiliaryPageSources.emplace_back(std::move(pageSource));
373}
374
376{
// Body of a member that calls ConnectField for every field context -- presumably
// RNTupleJoinProcessor::ConnectFields(), which CreateJoin invokes; the header line is absent from this listing.
377 for (auto &[_, fieldContext] : fFieldContexts) {
// NOTE(review): the start of this declaration (listing line 378) is absent -- presumably it declares the
// reference `pageSource`; TODO confirm. Auxiliary fields use the page source stored at `fNTupleIdx - 1` in
// fAuxiliaryPageSources, all other fields use the main page source.
379 fieldContext.IsAuxiliary() ? *fAuxiliaryPageSources.at(fieldContext.fNTupleIdx - 1) : *fPageSource;
380 ConnectField(fieldContext, pageSource, *fEntry);
381 }
382}
383
385{
// Body of RNTupleJoinProcessor::Advance() (header line absent from this listing). Loads the entry at
// fLocalEntryNumber and returns that number, or returns kInvalidNTupleIndex without loading anything when
// the number is invalid or not smaller than the number of entries of the main page source.
// NOTE(review): fLocalEntryNumber is not modified here -- presumably it is advanced by the caller; TODO confirm.
386 if (fLocalEntryNumber == kInvalidNTupleIndex || fLocalEntryNumber >= fPageSource->GetNEntries()) {
387 return kInvalidNTupleIndex;
388 }
389
390 LoadEntry();
391
392 fNEntriesProcessed++;
393 return fLocalEntryNumber;
394}
395
397{
// Body of RNTupleJoinProcessor::LoadEntry() (header line absent from this listing). Fills fEntry for the
// current fLocalEntryNumber: fields of the main ntuple are read at that entry number; auxiliary fields are
// read at the same number when no index is used, and otherwise at the entry number found through the join
// index of their ntuple.
398 // Read the values of the primary ntuple. If no index is used (i.e., the join is aligned), also read the values of
399 // auxiliary ntuples.
400 for (const auto &[_, fieldContext] : fFieldContexts) {
401 if (!fieldContext.IsAuxiliary() || !IsUsingIndex()) {
402 auto &value = fEntry->GetValue(fieldContext.fToken);
403 value.Read(fLocalEntryNumber);
404 }
405 }
406
407 // If no index is used (i.e., the join is aligned), there's nothing left to do.
408 if (!IsUsingIndex())
409 return;
410
411 // Collect the values of the join fields for this entry.
// The join fields were already read by the loop above, so these pointers refer to the current entry's values.
412 std::vector<void *> valPtrs;
413 valPtrs.reserve(fJoinFieldTokens.size());
414 for (const auto &token : fJoinFieldTokens) {
415 auto ptr = fEntry->GetPtr<void>(token);
416 valPtrs.push_back(ptr.get());
417 }
418
419 // Find the index entry number corresponding to the join field values for each auxiliary ntuple.
420 std::vector<NTupleSize_t> indexEntryNumbers;
421 indexEntryNumbers.reserve(fJoinIndices.size());
422 for (unsigned i = 0; i < fJoinIndices.size(); ++i) {
423 auto &joinIndex = fJoinIndices[i];
// Indices are created with deferred building in AddAuxiliary, so each one is built on first use here.
424 if (!joinIndex->IsBuilt())
425 joinIndex->Build();
426
427 indexEntryNumbers.push_back(joinIndex->GetFirstEntryNumber(valPtrs));
428 }
429
430 // For each auxiliary field, load its value according to the entry number we just found of the ntuple it belongs to.
431 for (const auto &[_, fieldContext] : fFieldContexts) {
432 if (!fieldContext.IsAuxiliary())
433 continue;
434
// `fNTupleIdx - 1` selects the result of the join index belonging to the field's auxiliary ntuple.
435 auto &value = fEntry->GetValue(fieldContext.fToken);
436 if (indexEntryNumbers[fieldContext.fNTupleIdx - 1] == kInvalidNTupleIndex) {
437 // No matching entry exists, so we reset the field's value to a default value.
438 // TODO(fdegeus): further consolidate how non-existing join matches should be handled. N.B.: in case
439 // ConstructValue is not used anymore in the future, remove friend in RFieldBase.
440 fieldContext.fProtoField->ConstructValue(value.GetPtr<void>().get());
441 } else {
442 value.Read(indexEntryNumbers[fieldContext.fNTupleIdx - 1]);
443 }
444 }
445}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:299
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
#define _(A, B)
Definition cfortran.h:108
static std::unique_ptr< RNTupleIndex > Create(const std::vector< std::string > &fieldNames, const RPageSource &pageSource, bool deferBuild=false)
Create an RNTupleIndex from an existing RNTuple.
Abstract interface to read data from an ntuple.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const RNTupleReadOptions &options=RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
The REntry is a collection of values in an ntuple corresponding to a complete row in the data set.
Definition REntry.hxx:51
Processor specialization for vertically concatenated RNTuples (chains).
NTupleSize_t Advance() final
Advance the processor to the next available entry.
RNTupleChainProcessor(const std::vector< RNTupleOpenSpec > &ntuples, std::unique_ptr< RNTupleModel > model=nullptr)
Constructs a new RNTupleChainProcessor.
NTupleSize_t ConnectNTuple(const RNTupleOpenSpec &ntuple)
Connect an RNTuple for processing.
Processor specialization for horizontally concatenated RNTuples (joins).
void AddAuxiliary(const RNTupleOpenSpec &auxNTuple, const std::vector< std::string > &joinFields, std::unique_ptr< RNTupleModel > model=nullptr)
Add an auxiliary RNTuple to the processor.
NTupleSize_t Advance() final
Advance the processor to the next available entry.
void LoadEntry() final
Fill the entry with values belonging to the current entry number of the primary RNTuple.
RNTupleJoinProcessor(const RNTupleOpenSpec &mainNTuple, std::unique_ptr< RNTupleModel > model=nullptr)
Constructs a new RNTupleJoinProcessor.
The RNTupleModel encapsulates the schema of an ntuple.
Manager for a field as part of the RNTupleProcessor.
Interface for iterating over entries of RNTuples and vertically concatenated RNTuples (chains).
static std::unique_ptr< RNTupleProcessor > CreateChain(const std::vector< RNTupleOpenSpec > &ntuples, std::unique_ptr< RNTupleModel > model=nullptr)
Create a new RNTuple processor chain for vertical concatenation of RNTuples.
std::unordered_map< std::string, RFieldContext > fFieldContexts
static std::unique_ptr< RNTupleProcessor > CreateJoin(const std::vector< RNTupleOpenSpec > &ntuples, const std::vector< std::string > &joinFields, std::vector< std::unique_ptr< RNTupleModel > > models={})
Create a new RNTuple processor for horizontally concatenated RNTuples.
static std::unique_ptr< RNTupleProcessor > Create(const RNTupleOpenSpec &ntuple)
void ConnectField(RFieldContext &fieldContext, Internal::RPageSource &pageSource, REntry &entry)
Creates and connects a concrete field to the current page source, based on its proto field.
std::unique_ptr< Internal::RPageSource > fPageSource
std::vector< RNTupleOpenSpec > fNTuples
Processor specialization for processing a single RNTuple.
RNTupleSingleProcessor(const RNTupleOpenSpec &ntuple, RNTupleModel &model)
Constructs a new RNTupleProcessor for processing a single RNTuple.
NTupleSize_t Advance() final
Advance the processor to the next available entry.
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
const_iterator begin() const
const_iterator end() const
const Int_t n
Definition legend1.C:16
void CallConnectPageSourceOnField(RFieldBase &, RPageSource &)
constexpr NTupleSize_t kInvalidNTupleIndex
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr DescriptorId_t kInvalidDescriptorId
Used to specify the underlying RNTuples in RNTupleProcessor.