Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleDS.cxx
Go to the documentation of this file.
1/// \file RNTupleDS.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \author Enrico Guiraud <enrico.guiraud@cern.ch>
5/// \date 2018-10-04
6/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
7/// is welcome!
8
9/*************************************************************************
10 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
11 * All rights reserved. *
12 * *
13 * For the licensing terms see $ROOTSYS/LICENSE. *
14 * For the list of contributors see $ROOTSYS/README/CREDITS. *
15 *************************************************************************/
16
18#include <ROOT/RField.hxx>
21#include <ROOT/RNTupleDS.hxx>
22#include <ROOT/RNTupleUtil.hxx>
23#include <ROOT/RPageStorage.hxx>
24#include <string_view>
25
26#include <TError.h>
27#include <TSystem.h>
28
29#include <cassert>
30#include <memory>
31#include <mutex>
32#include <string>
33#include <vector>
34#include <typeinfo>
35#include <utility>
36
37// clang-format off
38/**
39* \class ROOT::Experimental::RNTupleDS
40* \ingroup dataframe
41* \brief The RDataSource implementation for RNTuple. It lets RDataFrame read RNTuple data.
42*
43* An RDataFrame that reads RNTuple data can be constructed using FromRNTuple().
44*
45* For each column containing an array or a collection, a corresponding column `#colname` is available to access
46* `colname.size()` without reading and deserializing the collection values.
47*
48**/
49// clang-format on
50
51namespace ROOT {
52namespace Experimental {
53namespace Internal {
54
55/// An artificial field that transforms an RNTuple column that contains the offset of collections into
56/// collection sizes. It is used to provide the "number of" RDF columns for collections, e.g.
57/// `R_rdf_sizeof_jets` for a collection named `jets`.
58///
59/// This field owns the collection offset field but instead of exposing the collection offsets it exposes
60/// the collection sizes (offset(N+1) - offset(N)). For the time being, we offer this functionality only in RDataFrame.
61/// TODO(jblomer): consider providing a general set of useful virtual fields as part of RNTuple.
63protected:
64 std::unique_ptr<ROOT::Experimental::RFieldBase> CloneImpl(std::string_view /* newName */) const final
65 {
66 return std::make_unique<RRDFCardinalityField>();
67 }
68 void ConstructValue(void *where) const final { *static_cast<std::size_t *>(where) = 0; }
69
70public:
71 static std::string TypeName() { return "std::size_t"; }
73 : ROOT::Experimental::RFieldBase("", TypeName(), ENTupleStructure::kLeaf, false /* isSimple */)
74 {
75 }
79
81 {
82 static RColumnRepresentations representations(
84 {});
85 return representations;
86 }
87 // Field is only used for reading
88 void GenerateColumnsImpl() final { assert(false && "Cardinality fields must only be used for reading"); }
89 void GenerateColumnsImpl(const RNTupleDescriptor &desc) final
90 {
91 auto onDiskTypes = EnsureCompatibleColumnTypes(desc);
92 fColumns.emplace_back(
93 ROOT::Experimental::Internal::RColumn::Create<ClusterSize_t>(RColumnModel(onDiskTypes[0]), 0));
94 }
95
96 size_t GetValueSize() const final { return sizeof(std::size_t); }
97 size_t GetAlignment() const final { return alignof(std::size_t); }
98
99 /// Get the number of elements of the collection identified by globalIndex
100 void ReadGlobalImpl(ROOT::Experimental::NTupleSize_t globalIndex, void *to) final
101 {
102 RClusterIndex collectionStart;
104 fPrincipalColumn->GetCollectionInfo(globalIndex, &collectionStart, &size);
105 *static_cast<std::size_t *>(to) = size;
106 }
107
108 /// Get the number of elements of the collection identified by clusterIndex
109 void ReadInClusterImpl(ROOT::Experimental::RClusterIndex clusterIndex, void *to) final
110 {
111 RClusterIndex collectionStart;
113 fPrincipalColumn->GetCollectionInfo(clusterIndex, &collectionStart, &size);
114 *static_cast<std::size_t *>(to) = size;
115 }
116};
117
118/**
119 * @brief An artificial field that provides the size of a fixed-size array
120 *
121 * This is the implementation of `R_rdf_sizeof_column` in case `column` contains
122 * fixed-size arrays on disk.
123 */
125private:
126 std::size_t fArrayLength;
127
128 std::unique_ptr<ROOT::Experimental::RFieldBase> CloneImpl(std::string_view) const final
129 {
130 return std::make_unique<RArraySizeField>(fArrayLength);
131 }
132 void GenerateColumnsImpl() final { assert(false && "RArraySizeField fields must only be used for reading"); }
134 void ReadGlobalImpl(NTupleSize_t /*globalIndex*/, void *to) final { *static_cast<std::size_t *>(to) = fArrayLength; }
135 void ReadInClusterImpl(RClusterIndex /*clusterIndex*/, void *to) final
136 {
137 *static_cast<std::size_t *>(to) = fArrayLength;
138 }
139
140public:
141 RArraySizeField(std::size_t arrayLength)
142 : ROOT::Experimental::RFieldBase("", "std::size_t", ENTupleStructure::kLeaf, false /* isSimple */),
143 fArrayLength(arrayLength)
144 {
145 }
146 RArraySizeField(const RArraySizeField &other) = delete;
150 ~RArraySizeField() final = default;
151
152 void ConstructValue(void *where) const final { *static_cast<std::size_t *>(where) = 0; }
153 std::size_t GetValueSize() const final { return sizeof(std::size_t); }
154 std::size_t GetAlignment() const final { return alignof(std::size_t); }
155};
156
157/// Every RDF column is represented by exactly one RNTuple field
161
162 RNTupleDS *fDataSource; ///< The data source that owns this column reader
163 RFieldBase *fProtoField; ///< The prototype field from which fField is cloned
164 std::unique_ptr<RFieldBase> fField; ///< The field backing the RDF column
165 std::unique_ptr<RFieldBase::RValue> fValue; ///< The memory location used to read from fField
166 std::shared_ptr<void> fValuePtr; ///< Used to reuse the object created by fValue when reconnecting sources
167 Long64_t fLastEntry = -1; ///< Last entry number that was read
168 /// For chains, the logical entry and the physical entry in any particular file can be different.
169 /// The entry offset stores the logical entry number (sum of all previous physical entries) when a file of the corresponding
170 /// data source was opened.
172
173public:
174 RNTupleColumnReader(RNTupleDS *ds, RFieldBase *protoField) : fDataSource(ds), fProtoField(protoField) {}
176
177 /// Connect the field and its subfields to the page source
178 void Connect(RPageSource &source, Long64_t entryOffset)
179 {
180 assert(fLastEntry == -1);
181 fEntryOffset = entryOffset;
182
183 // Create a new, real field from the prototype and set its field ID in the context of the given page source
185 {
186 auto descGuard = source.GetSharedDescriptorGuard();
187 // Set the on-disk field IDs for the field and the subfield
188 fField->SetOnDiskId(
189 descGuard->FindFieldId(fDataSource->fFieldId2QualifiedName.at(fProtoField->GetOnDiskId())));
190 auto iProto = fProtoField->cbegin();
191 auto iReal = fField->begin();
192 for (; iReal != fField->end(); ++iProto, ++iReal) {
193 iReal->SetOnDiskId(descGuard->FindFieldId(fDataSource->fFieldId2QualifiedName.at(iProto->GetOnDiskId())));
194 }
195 }
196
198
199 if (fValuePtr) {
200 // When the reader reconnects to a new file, the fValuePtr is already set
201 fValue = std::make_unique<RFieldBase::RValue>(fField->BindValue(fValuePtr));
202 fValuePtr = nullptr;
203 } else {
204 // For the first file, create a new object for this field (reader)
205 fValue = std::make_unique<RFieldBase::RValue>(fField->CreateValue());
206 }
207 }
208
209 void Disconnect(bool keepValue)
210 {
211 if (fValue && keepValue) {
212 fValuePtr = fValue->GetPtr<void>();
213 }
214 fValue = nullptr;
215 fField = nullptr;
216 fLastEntry = -1;
217 }
218
219 void *GetImpl(Long64_t entry) final
220 {
221 if (entry != fLastEntry) {
222 fValue->Read(entry - fEntryOffset);
223 fLastEntry = entry;
224 }
225 return fValue->GetPtr<void>().get();
226 }
227};
228
229} // namespace Internal
230
231RNTupleDS::~RNTupleDS() = default;
232
233void RNTupleDS::AddField(const RNTupleDescriptor &desc, std::string_view colName, DescriptorId_t fieldId,
234 std::vector<RNTupleDS::RFieldInfo> fieldInfos)
235{
236 // As an example for the mapping of RNTuple fields to RDF columns, let's consider an RNTuple
237 // using the following types and with a top-level field named "event" of type Event:
238 //
239 // struct Event {
240 // int id;
241 // std::vector<Track> tracks;
242 // };
243 // struct Track {
244 // std::vector<Hit> hits;
245 // };
246 // struct Hit {
247 // float x;
248 // float y;
249 // };
250 //
251 // AddField() will be called from the constructor with the RNTuple root field (ENTupleStructure::kRecord).
252 // From there, we recurse into the "event" sub field (also ENTupleStructure::kRecord) and further down the
253 // tree of sub fields and expose the following RDF columns:
254 //
255 // "event" [Event]
256 // "event.id" [int]
257 // "event.tracks" [RVec<Track>]
258 // "R_rdf_sizeof_event.tracks" [unsigned int]
259 // "event.tracks.hits" [RVec<RVec<Hit>>]
260 // "R_rdf_sizeof_event.tracks.hits" [RVec<unsigned int>]
261 // "event.tracks.hits.x" [RVec<RVec<float>>]
262 // "R_rdf_sizeof_event.tracks.hits.x" [RVec<unsigned int>]
263 // "event.tracks.hits.y" [RVec<RVec<float>>]
264 // "R_rdf_sizeof_event.tracks.hits.y" [RVec<unsigned int>]
265
266 const auto &fieldDesc = desc.GetFieldDescriptor(fieldId);
267 const auto &nRepetitions = fieldDesc.GetNRepetitions();
268 if ((fieldDesc.GetStructure() == ENTupleStructure::kCollection) || (nRepetitions > 0)) {
269 // The field is a collection or a fixed-size array.
270 // We open a new collection scope with fieldID being the inner most collection. E.g. for "event.tracks.hits",
271 // fieldInfos would already contain the fieldID of "event.tracks"
272 fieldInfos.emplace_back(fieldId, nRepetitions);
273 }
274
275 if (fieldDesc.GetStructure() == ENTupleStructure::kCollection) {
276 // Inner fields of collections are provided as projected collections of only that inner field,
277 // E.g. we provide a projected collection RVec<RVec<float>> for "event.tracks.hits.x" in the example
278 // above.
279
280 if (fieldDesc.GetTypeName().empty()) {
281 // Anonymous collection with one or several sub fields
282 auto cardinalityField = std::make_unique<ROOT::Experimental::Internal::RRDFCardinalityField>();
283 cardinalityField->SetOnDiskId(fieldId);
284 fColumnNames.emplace_back("R_rdf_sizeof_" + std::string(colName));
285 fColumnTypes.emplace_back(cardinalityField->GetTypeName());
286 fProtoFields.emplace_back(std::move(cardinalityField));
287
288 for (const auto &f : desc.GetFieldIterable(fieldDesc.GetId())) {
289 AddField(desc, std::string(colName) + "." + f.GetFieldName(), f.GetId(), fieldInfos);
290 }
291 } else {
292 // ROOT::RVec with exactly one sub field
293 const auto &f = *desc.GetFieldIterable(fieldDesc.GetId()).begin();
294 AddField(desc, colName, f.GetId(), fieldInfos);
295 }
296 // Note that at the end of the recursion, we handled the inner sub collections as well as the
297 // collection as whole, so we are done.
298 return;
299
300 } else if (nRepetitions > 0) {
301 // Fixed-size array, same logic as ROOT::RVec.
302 const auto &f = *desc.GetFieldIterable(fieldDesc.GetId()).begin();
303 AddField(desc, colName, f.GetId(), fieldInfos);
304 return;
305 } else if (fieldDesc.GetStructure() == ENTupleStructure::kRecord) {
306 // Inner fields of records are provided as individual RDF columns, e.g. "event.id"
307 for (const auto &f : desc.GetFieldIterable(fieldDesc.GetId())) {
308 auto innerName = colName.empty() ? f.GetFieldName() : (std::string(colName) + "." + f.GetFieldName());
309 AddField(desc, innerName, f.GetId(), fieldInfos);
310 }
311 }
312
313 // The fieldID could be the root field or the class of fieldId might not be loaded.
314 // In these cases, only the inner fields are exposed as RDF columns.
315 auto fieldOrException = RFieldBase::Create(fieldDesc.GetFieldName(), fieldDesc.GetTypeName());
316 if (!fieldOrException)
317 return;
318 auto valueField = fieldOrException.Unwrap();
319 valueField->SetOnDiskId(fieldId);
320 for (auto &f : *valueField) {
321 f.SetOnDiskId(desc.FindFieldId(f.GetFieldName(), f.GetParent()->GetOnDiskId()));
322 }
323 std::unique_ptr<RFieldBase> cardinalityField;
324 // Collections get the additional "number of" RDF column (e.g. "R_rdf_sizeof_tracks")
325 if (!fieldInfos.empty()) {
326 const auto &info = fieldInfos.back();
327 if (info.fNRepetitions > 0) {
328 cardinalityField = std::make_unique<ROOT::Experimental::Internal::RArraySizeField>(info.fNRepetitions);
329 } else {
330 cardinalityField = std::make_unique<ROOT::Experimental::Internal::RRDFCardinalityField>();
331 }
332 cardinalityField->SetOnDiskId(info.fFieldId);
333 }
334
335 for (auto i = fieldInfos.rbegin(); i != fieldInfos.rend(); ++i) {
336 const auto &fieldInfo = *i;
337
338 if (fieldInfo.fNRepetitions > 0) {
339 // Fixed-size array, read it as ROOT::RVec in memory
340 valueField =
341 std::make_unique<ROOT::Experimental::RArrayAsRVecField>("", std::move(valueField), fieldInfo.fNRepetitions);
342 } else {
343 // Actual ROOT::RVec
344 valueField = std::make_unique<ROOT::Experimental::RRVecField>("", std::move(valueField));
345 }
346
347 valueField->SetOnDiskId(fieldInfo.fFieldId);
348
349 // Skip the inner-most collection level to construct the cardinality column
350 // It's taken care of by the `if (!fieldInfos.empty())` scope above
351 if (i != fieldInfos.rbegin()) {
352 if (fieldInfo.fNRepetitions > 0) {
353 // This collection level refers to a fixed-size array
354 cardinalityField = std::make_unique<ROOT::Experimental::RArrayAsRVecField>("", std::move(cardinalityField),
355 fieldInfo.fNRepetitions);
356 } else {
357 // This collection level refers to an RVec
358 cardinalityField = std::make_unique<ROOT::Experimental::RRVecField>("", std::move(cardinalityField));
359 }
360
361 cardinalityField->SetOnDiskId(fieldInfo.fFieldId);
362 }
363 }
364
365 if (cardinalityField) {
366 fColumnNames.emplace_back("R_rdf_sizeof_" + std::string(colName));
367 fColumnTypes.emplace_back(cardinalityField->GetTypeName());
368 fProtoFields.emplace_back(std::move(cardinalityField));
369 }
370
371 fieldInfos.emplace_back(fieldId, nRepetitions);
372 fColumnNames.emplace_back(colName);
373 fColumnTypes.emplace_back(valueField->GetTypeName());
374 fProtoFields.emplace_back(std::move(valueField));
375}
376
377RNTupleDS::RNTupleDS(std::unique_ptr<Internal::RPageSource> pageSource) : fPrincipalSource(std::move(pageSource))
378{
379 fPrincipalSource->Attach();
380 fPrincipalDescriptor = fPrincipalSource->GetSharedDescriptorGuard()->Clone();
381
382 AddField(*fPrincipalDescriptor, "", fPrincipalDescriptor->GetFieldZeroId(),
383 std::vector<ROOT::Experimental::RNTupleDS::RFieldInfo>());
384}
385
386namespace {
387
389{
390 // The setting is for now a global one, must be decided before running the
391 // program by setting the appropriate environment variable. Make sure that
392 // option configuration is thread-safe and happens only once.
394 static std::once_flag flag;
395 std::call_once(flag, []() {
396 if (auto env = gSystem->Getenv("ROOT_RNTUPLE_CLUSTERBUNCHSIZE"); env != nullptr && strlen(env) > 0) {
397 std::string envStr{env};
398 auto envNum{std::stoul(envStr)};
399 envNum = envNum == 0 ? 1 : envNum;
400 opts.SetClusterBunchSize(envNum);
401 }
402 });
403 return opts;
404}
405
406std::unique_ptr<ROOT::Experimental::Internal::RPageSource>
407CreatePageSource(std::string_view ntupleName, std::string_view fileName)
408{
409 return ROOT::Experimental::Internal::RPageSource::Create(ntupleName, fileName, GetOpts());
410}
411} // namespace
412
413RNTupleDS::RNTupleDS(std::string_view ntupleName, std::string_view fileName)
414 : RNTupleDS(CreatePageSource(ntupleName, fileName))
415{
416}
417
419 : RNTupleDS(ROOT::Experimental::Internal::RPageSourceFile::CreateFromAnchor(*ntuple))
420{
421}
422
423RNTupleDS::RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames)
424 : RNTupleDS(CreatePageSource(ntupleName, fileNames[0]))
425{
426 fNTupleName = ntupleName;
427 fFileNames = fileNames;
428}
429
430RDF::RDataSource::Record_t RNTupleDS::GetColumnReadersImpl(std::string_view /* name */, const std::type_info & /* ti */)
431{
432 // This datasource uses the newer GetColumnReaders() API
433 return {};
434}
435
436std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase>
437RNTupleDS::GetColumnReaders(unsigned int slot, std::string_view name, const std::type_info & /*tid*/)
438{
439 // At this point we can assume that `name` will be found in fColumnNames
440 // TODO(jblomer): check incoming type
441 const auto index = std::distance(fColumnNames.begin(), std::find(fColumnNames.begin(), fColumnNames.end(), name));
442 auto field = fProtoFields[index].get();
443
444 // Map the field's and subfields' IDs to qualified names so that we can later connect the fields to
445 // other page sources from the chain
446 fFieldId2QualifiedName[field->GetOnDiskId()] = fPrincipalDescriptor->GetQualifiedFieldName(field->GetOnDiskId());
447 for (const auto &s : *field) {
448 fFieldId2QualifiedName[s.GetOnDiskId()] = fPrincipalDescriptor->GetQualifiedFieldName(s.GetOnDiskId());
449 }
450
451 auto reader = std::make_unique<Internal::RNTupleColumnReader>(this, field);
452 fActiveColumnReaders[slot].emplace_back(reader.get());
453
454 return reader;
455}
456
458{
459 // Old API, unused
460 return true;
461}
462
464{
465 assert(fNextRanges.empty());
466 auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
467 auto nRemainingFiles = nFiles - fNextFileIndex;
468 if (nRemainingFiles == 0)
469 return;
470
471 // Easy work scheduling: one file per slot. We skip empty files (files without entries).
472 if (nRemainingFiles >= fNSlots) {
473 while ((fNextRanges.size() < fNSlots) && (fNextFileIndex < nFiles)) {
474 REntryRangeDS range;
475
476 if (fPrincipalSource) {
477 // Avoid reopening the first file, which has been opened already to read the schema
478 assert(fNextFileIndex == 0);
479 std::swap(fPrincipalSource, range.fSource);
480 } else {
481 range.fSource = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
482 range.fSource->Attach();
483 }
485
486 auto nEntries = range.fSource->GetNEntries();
487 if (nEntries == 0)
488 continue;
489
490 range.fLastEntry = nEntries; // whole file per slot, i.e. entry range [0..nEntries - 1]
491 fNextRanges.emplace_back(std::move(range));
492 }
493 return;
494 }
495
496 // Work scheduling of the tail: multiple slots work on the same file.
497 // Every slot still has its own page source but these page sources may open the same file.
498 // Again, we need to skip empty files.
499 unsigned int nSlotsPerFile = fNSlots / nRemainingFiles;
500 for (std::size_t i = 0; (fNextRanges.size() < fNSlots) && (fNextFileIndex < nFiles); ++i) {
501 std::unique_ptr<Internal::RPageSource> source;
502 if (fPrincipalSource) {
503 // Avoid reopening the first file, which has been opened already to read the schema
504 assert(fNextFileIndex == 0);
505 std::swap(source, fPrincipalSource);
506 } else {
507 source = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
508 source->Attach();
509 }
511
512 auto nEntries = source->GetNEntries();
513 if (nEntries == 0)
514 continue;
515
516 // If last file: use all remaining slots
517 if (i == (nRemainingFiles - 1))
518 nSlotsPerFile = fNSlots - fNextRanges.size();
519
520 std::vector<std::pair<ULong64_t, ULong64_t>> rangesByCluster;
521 {
522 auto descriptorGuard = source->GetSharedDescriptorGuard();
523 auto clusterId = descriptorGuard->FindClusterId(0, 0);
524 while (clusterId != kInvalidDescriptorId) {
525 const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterId);
526 rangesByCluster.emplace_back(std::make_pair<ULong64_t, ULong64_t>(
527 clusterDesc.GetFirstEntryIndex(), clusterDesc.GetFirstEntryIndex() + clusterDesc.GetNEntries()));
528 clusterId = descriptorGuard->FindNextClusterId(clusterId);
529 }
530 }
531 const unsigned int nRangesByCluster = rangesByCluster.size();
532
533 // Distribute slots equidistantly over the entry range, aligned on cluster boundaries
534 const auto nClustersPerSlot = nRangesByCluster / nSlotsPerFile;
535 const auto remainder = nRangesByCluster % nSlotsPerFile;
536 std::size_t iRange = 0;
537 unsigned int iSlot = 0;
538 const unsigned int N = std::min(nSlotsPerFile, nRangesByCluster);
539 for (; iSlot < N; ++iSlot) {
540 auto start = rangesByCluster[iRange].first;
541 iRange += nClustersPerSlot + static_cast<int>(iSlot < remainder);
542 assert(iRange > 0);
543 auto end = rangesByCluster[iRange - 1].second;
544
545 REntryRangeDS range;
546 // The last range for this file just takes the already opened page source. All previous ranges clone.
547 if (iSlot == N - 1) {
548 range.fSource = std::move(source);
549 } else {
550 range.fSource = source->Clone();
551 range.fSource->Attach();
552 }
553 range.fSource->SetEntryRange({start, end - start});
554 range.fFirstEntry = start;
555 range.fLastEntry = end;
556 fNextRanges.emplace_back(std::move(range));
557 }
558 } // loop over tail of remaining files
559}
560
561std::vector<std::pair<ULong64_t, ULong64_t>> RNTupleDS::GetEntryRanges()
562{
563 std::vector<std::pair<ULong64_t, ULong64_t>> ranges;
564 if (fNextRanges.empty())
565 return ranges;
566 assert(fNextRanges.size() <= fNSlots);
567
568 // We need to distinguish between single threaded and multi-threaded runs.
569 // In single threaded mode, InitSlot is only called once and column readers have to be rewired
570 // to new page sources of the chain in GetEntryRanges. In multi-threaded mode, on the other hand,
571 // InitSlot is called for every returned range, thus rewiring the column readers takes place in
572 // InitSlot and FinalizeSlot.
573
574 if (fNSlots == 1) {
575 for (auto r : fActiveColumnReaders[0]) {
576 r->Disconnect(true /* keepValue */);
577 }
578 }
579
580 fCurrentRanges.clear();
581 std::swap(fCurrentRanges, fNextRanges);
583
584 // Create ranges for the RDF loop manager from the list of REntryRangeDS records.
585 // The entry ranges that are relative to the page source in REntryRangeDS are translated into absolute
586 // entry ranges, given the current state of the entry cursor.
587 // We remember the connection from first absolute entry index of a range to its REntryRangeDS record
588 // so that we can properly rewire the column reader in InitSlot
589 fFirstEntry2RangeIdx.clear();
590 ULong64_t nEntriesPerSource = 0;
591 for (std::size_t i = 0; i < fCurrentRanges.size(); ++i) {
592 // Several consecutive ranges may operate on the same file (each with their own page source clone).
593 // We can detect a change of file when the first entry number jumps back to 0.
594 if (fCurrentRanges[i].fFirstEntry == 0) {
595 // New source
596 fSeenEntries += nEntriesPerSource;
597 nEntriesPerSource = 0;
598 }
599 auto start = fCurrentRanges[i].fFirstEntry + fSeenEntries;
600 auto end = fCurrentRanges[i].fLastEntry + fSeenEntries;
601 nEntriesPerSource += end - start;
602
603 fFirstEntry2RangeIdx[start] = i;
604 ranges.emplace_back(start, end);
605 }
606 fSeenEntries += nEntriesPerSource;
607
608 if ((fNSlots == 1) && (fCurrentRanges[0].fSource)) {
609 for (auto r : fActiveColumnReaders[0]) {
610 r->Connect(*fCurrentRanges[0].fSource, ranges[0].first);
611 }
612 }
613
614 return ranges;
615}
616
617void RNTupleDS::InitSlot(unsigned int slot, ULong64_t firstEntry)
618{
619 if (fNSlots == 1)
620 return;
621
622 auto idxRange = fFirstEntry2RangeIdx.at(firstEntry);
623 for (auto r : fActiveColumnReaders[slot]) {
624 r->Connect(*fCurrentRanges[idxRange].fSource, firstEntry - fCurrentRanges[idxRange].fFirstEntry);
625 }
626}
627
628void RNTupleDS::FinalizeSlot(unsigned int slot)
629{
630 if (fNSlots == 1)
631 return;
632
633 for (auto r : fActiveColumnReaders[slot]) {
634 r->Disconnect(true /* keepValue */);
635 }
636}
637
638std::string RNTupleDS::GetTypeName(std::string_view colName) const
639{
640 const auto index = std::distance(fColumnNames.begin(), std::find(fColumnNames.begin(), fColumnNames.end(), colName));
641 return fColumnTypes[index];
642}
643
644bool RNTupleDS::HasColumn(std::string_view colName) const
645{
646 return std::find(fColumnNames.begin(), fColumnNames.end(), colName) != fColumnNames.end();
647}
648
650{
651 fSeenEntries = 0;
652 fNextFileIndex = 0;
653 if (!fCurrentRanges.empty() && (fFileNames.size() <= fNSlots)) {
654 assert(fNextRanges.empty());
655 std::swap(fCurrentRanges, fNextRanges);
656 fNextFileIndex = std::max(fFileNames.size(), std::size_t(1));
657 } else {
659 }
660}
661
663{
664 for (unsigned int i = 0; i < fNSlots; ++i) {
665 for (auto r : fActiveColumnReaders[i]) {
666 r->Disconnect(false /* keepValue */);
667 }
668 }
669 // If we have a chain with more files than the number of slots, the files opened at the end of the
670 // event loop won't be reused when the event loop restarts, so we can close them.
671 if (fFileNames.size() > fNSlots) {
672 fCurrentRanges.clear();
673 fNextRanges.clear();
674 }
675}
676
677void RNTupleDS::SetNSlots(unsigned int nSlots)
678{
679 assert(fNSlots == 0);
680 assert(nSlots > 0);
681 fNSlots = nSlots;
683}
684} // namespace Experimental
685} // namespace ROOT
686
687ROOT::RDataFrame ROOT::RDF::Experimental::FromRNTuple(std::string_view ntupleName, std::string_view fileName)
688{
689 return ROOT::RDataFrame(std::make_unique<ROOT::Experimental::RNTupleDS>(ntupleName, fileName));
690}
691
693ROOT::RDF::Experimental::FromRNTuple(std::string_view ntupleName, const std::vector<std::string> &fileNames)
694{
695 return ROOT::RDataFrame(std::make_unique<ROOT::Experimental::RNTupleDS>(ntupleName, fileNames));
696}
697
699{
700 return ROOT::RDataFrame(std::make_unique<ROOT::Experimental::RNTupleDS>(ntuple));
701}
#define f(i)
Definition RSha256.hxx:104
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
long long Long64_t
Definition RtypesCore.h:80
unsigned long long ULong64_t
Definition RtypesCore.h:81
#define N
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t index
char name[80]
Definition TGX11.cxx:110
R__EXTERN TSystem * gSystem
Definition TSystem.h:555
An artificial field that provides the size of a fixed-size array.
RArraySizeField & operator=(RArraySizeField &&other)=default
std::unique_ptr< ROOT::Experimental::RFieldBase > CloneImpl(std::string_view) const final
Called by Clone(), which additionally copies the on-disk ID.
void ReadGlobalImpl(NTupleSize_t, void *to) final
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
RArraySizeField(RArraySizeField &&other)=default
std::size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
RArraySizeField(const RArraySizeField &other)=delete
void ReadInClusterImpl(RClusterIndex, void *to) final
std::size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
void GenerateColumnsImpl(const ROOT::Experimental::RNTupleDescriptor &) final
Implementations in derived classes should create the backing columns corresponsing to the field type ...
RArraySizeField & operator=(const RArraySizeField &other)=delete
void GenerateColumnsImpl() final
Implementations in derived classes should create the backing columns corresponsing to the field type ...
void GetCollectionInfo(const NTupleSize_t globalIndex, RClusterIndex *collectionStart, ClusterSize_t *collectionSize)
For offset columns only, look at the two adjacent values that define a collection's coordinates.
Definition RColumn.hxx:293
Every RDF column is represented by exactly one RNTuple field.
std::unique_ptr< RFieldBase > fField
The field backing the RDF column.
Long64_t fEntryOffset
For chains, the logical entry and the physical entry in any particular file can be different.
std::unique_ptr< RFieldBase::RValue > fValue
The memory location used to read from fField.
RNTupleColumnReader(RNTupleDS *ds, RFieldBase *protoField)
Long64_t fLastEntry
Last entry number that was read.
RNTupleDS * fDataSource
The data source that owns this column reader.
void Connect(RPageSource &source, Long64_t entryOffset)
Connect the field and its subfields to the page source.
std::shared_ptr< void > fValuePtr
Used to reuse the object created by fValue when reconnecting sources.
RFieldBase * fProtoField
The prototype field from which fField is cloned.
Abstract interface to read data from an ntuple.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const RNTupleReadOptions &options=RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
const RSharedDescriptorGuard GetSharedDescriptorGuard() const
Takes the read lock for the descriptor.
An artificial field that transforms an RNTuple column that contains the offset of collections into co...
Definition RNTupleDS.cxx:62
std::unique_ptr< ROOT::Experimental::RFieldBase > CloneImpl(std::string_view) const final
Called by Clone(), which additionally copies the on-disk ID.
Definition RNTupleDS.cxx:64
void ReadInClusterImpl(ROOT::Experimental::RClusterIndex clusterIndex, void *to) final
Get the number of elements of the collection identified by clusterIndex.
RRDFCardinalityField(RRDFCardinalityField &&other)=default
void GenerateColumnsImpl() final
Implementations in derived classes should create the backing columns corresponsing to the field type ...
Definition RNTupleDS.cxx:88
const RColumnRepresentations & GetColumnRepresentations() const final
Implementations in derived classes should return a static RColumnRepresentations object.
Definition RNTupleDS.cxx:80
RRDFCardinalityField & operator=(RRDFCardinalityField &&other)=default
size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
Definition RNTupleDS.cxx:96
size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
Definition RNTupleDS.cxx:97
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
Definition RNTupleDS.cxx:68
void GenerateColumnsImpl(const RNTupleDescriptor &desc) final
Implementations in derived classes should create the backing columns corresponsing to the field type ...
Definition RNTupleDS.cxx:89
void ReadGlobalImpl(ROOT::Experimental::NTupleSize_t globalIndex, void *to) final
Get the number of elements of the collection identified by globalIndex.
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Holds the static meta-data of an RNTuple column.
Some fields have multiple possible column representations, e.g.
Definition RField.hxx:171
A field translates read and write calls from/to underlying columns to/from tree values.
Definition RField.hxx:99
std::string GetFieldName() const
Definition RField.hxx:677
const ColumnRepresentation_t & EnsureCompatibleColumnTypes(const RNTupleDescriptor &desc) const
Returns the on-disk column types found in the provided descriptor for fOnDiskId.
Definition RField.cxx:1010
std::vector< std::unique_ptr< Internal::RColumn > > fColumns
The columns are connected either to a sink or to a source (not to both); they are owned by the field.
Definition RField.hxx:404
RConstSchemaIterator cbegin() const
Definition RField.hxx:718
std::unique_ptr< RFieldBase > Clone(std::string_view newName) const
Copies the field and its sub fields using a possibly new name and a new, unconnected set of columns.
Definition RField.cxx:863
static RResult< std::unique_ptr< RFieldBase > > Create(const std::string &fieldName, const std::string &canonicalType, const std::string &typeAlias, bool fContinueOnError=false)
Factory method to resurrect a field from the stored on-disk type information.
Definition RField.cxx:626
Internal::RColumn * fPrincipalColumn
Points into fColumns.
Definition RField.hxx:402
DescriptorId_t GetOnDiskId() const
Definition RField.hxx:694
The RDataSource implementation for RNTuple.
Definition RNTupleDS.hxx:43
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > GetColumnReaders(unsigned int, std::string_view, const std::type_info &) final
If the other GetColumnReaders overload returns an empty vector, this overload will be called instead.
void SetNSlots(unsigned int nSlots) final
Inform RDataSource of the number of processing slots (i.e.
bool SetEntry(unsigned int slot, ULong64_t entry) final
Advance the "cursors" returned by GetColumnReaders to the selected entry for a particular slot.
Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) final
type-erased vector of pointers to pointers to column values - one per slot
void Initialize() final
Convenience method called before starting an event-loop.
void AddField(const RNTupleDescriptor &desc, std::string_view colName, DescriptorId_t fieldId, std::vector< RFieldInfo > fieldInfos)
Provides the RDF column "colName" given the field identified by fieldID.
std::size_t fNextFileIndex
Index into fFileNames to the next file to process.
Definition RNTupleDS.hxx:66
std::vector< std::string > fColumnNames
Definition RNTupleDS.hxx:77
std::unordered_map< ULong64_t, std::size_t > fFirstEntry2RangeIdx
Maps the first entries from the ranges of the last GetEntryRanges() call to their corresponding index...
Definition RNTupleDS.hxx:90
void Finalize() final
Convenience method called after concluding an event-loop.
std::string GetTypeName(std::string_view colName) const final
Type of a column as a string, e.g.
void InitSlot(unsigned int slot, ULong64_t firstEntry) final
Convenience method called at the start of the data processing associated to a slot.
std::unordered_map< ROOT::Experimental::DescriptorId_t, std::string > fFieldId2QualifiedName
Connects the IDs of active proto fields and their subfields to their fully qualified name (a....
Definition RNTupleDS.hxx:76
void FinalizeSlot(unsigned int slot) final
Convenience method called at the end of the data processing associated to a slot.
std::vector< std::vector< Internal::RNTupleColumnReader * > > fActiveColumnReaders
List of column readers returned by GetColumnReaders() organized by slot.
Definition RNTupleDS.hxx:81
std::vector< std::pair< ULong64_t, ULong64_t > > GetEntryRanges() final
Return ranges of entries to distribute to tasks.
std::unique_ptr< Internal::RPageSource > fPrincipalSource
The first source is used to extract the schema and build the prototype fields.
Definition RNTupleDS.hxx:59
std::vector< REntryRangeDS > fCurrentRanges
Basis for the ranges returned by the last GetEntryRanges() call.
Definition RNTupleDS.hxx:85
std::unique_ptr< RNTupleDescriptor > fPrincipalDescriptor
A clone of the first pages source's descriptor.
Definition RNTupleDS.hxx:61
std::vector< REntryRangeDS > fNextRanges
Basis for the ranges populated by the PrepareNextRanges() call.
Definition RNTupleDS.hxx:86
std::vector< std::unique_ptr< ROOT::Experimental::RFieldBase > > fProtoFields
We prepare a prototype field for every column.
Definition RNTupleDS.hxx:72
bool HasColumn(std::string_view colName) const final
Checks if the dataset has a certain column.
void PrepareNextRanges()
Populates fNextRanges with the next set of entry ranges.
std::vector< std::string > fFileNames
Definition RNTupleDS.hxx:65
std::string fNTupleName
The data source may be constructed with an ntuple name and a list of files.
Definition RNTupleDS.hxx:64
std::vector< std::string > fColumnTypes
Definition RNTupleDS.hxx:78
RNTupleDS(std::unique_ptr< ROOT::Experimental::Internal::RPageSource > pageSource)
ULong64_t fSeenEntries
The number of entries so far returned by GetEntryRanges()
Definition RNTupleDS.hxx:84
The on-storage meta-data of an ntuple.
RFieldDescriptorIterable GetFieldIterable(const RFieldDescriptor &fieldDesc) const
DescriptorId_t FindFieldId(std::string_view fieldName, DescriptorId_t parentId) const
const RFieldDescriptor & GetFieldDescriptor(DescriptorId_t fieldId) const
Common user-tunable settings for reading ntuples.
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:61
std::vector< void * > Record_t
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree, ...
virtual const char * Getenv(const char *env)
Get environment variable.
Definition TSystem.cxx:1665
void CallConnectPageSourceOnField(RFieldBase &, RPageSource &)
Definition RField.cxx:424
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
ENTupleStructure
The fields in the ntuple model tree can carry different structural information about the type system.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr DescriptorId_t kInvalidDescriptorId
RDataFrame FromRNTuple(std::string_view ntupleName, std::string_view fileName)
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow forward declaring tb...
Wrap the integer in a struct in order to avoid template specialization clash with std::uint64_t.
The PrepareNextRanges() method populates the fNextRanges list with REntryRangeDS records.
Definition RNTupleDS.hxx:49
std::unique_ptr< ROOT::Experimental::Internal::RPageSource > fSource
Definition RNTupleDS.hxx:50
ULong64_t fLastEntry
End entry index in fSource, i.e. the number of entries in the range is fLastEntry - fFirstEntry.
Definition RNTupleDS.hxx:53
ULong64_t fFirstEntry
First entry index in fSource.
Definition RNTupleDS.hxx:51