Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleDS.cxx
Go to the documentation of this file.
1/// \file RNTupleDS.cxx
2/// \author Jakob Blomer <jblomer@cern.ch>
3/// \author Enrico Guiraud <enrico.guiraud@cern.ch>
4/// \date 2018-10-04
5
6/*************************************************************************
7 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
8 * All rights reserved. *
9 * *
10 * For the licensing terms see $ROOTSYS/LICENSE. *
11 * For the list of contributors see $ROOTSYS/README/CREDITS. *
12 *************************************************************************/
13
15#include <ROOT/RDataFrame.hxx>
16#include <ROOT/RField.hxx>
17#include <ROOT/RFieldUtils.hxx>
20#include <ROOT/RNTupleDS.hxx>
21#include <ROOT/RNTupleUtil.hxx>
22#include <ROOT/RPageStorage.hxx>
23#include <string_view>
24
25#include <TError.h>
26#include <TSystem.h>
27
28#include <cassert>
29#include <memory>
30#include <mutex>
31#include <string>
32#include <vector>
33#include <typeinfo>
34#include <utility>
35
36// clang-format off
37/**
38* \class ROOT::RDF::RNTupleDS
39* \ingroup dataframe
40* \brief The RDataSource implementation for RNTuple. It lets RDataFrame read RNTuple data.
41*
42* An RDataFrame that reads RNTuple data can be constructed using FromRNTuple().
43*
* For each column containing an array or a collection, a corresponding column `R_rdf_sizeof_colname` is available to access
* `colname.size()` without reading and deserializing the collection values.
46*
47**/
48// clang-format on
49
50namespace ROOT::Internal::RDF {
51/// An artificial field that transforms an RNTuple column that contains the offset of collections into
52/// collection sizes. It is used to provide the "number of" RDF columns for collections, e.g.
53/// `R_rdf_sizeof_jets` for a collection named `jets`.
54///
55/// This field owns the collection offset field but instead of exposing the collection offsets it exposes
56/// the collection sizes (offset(N+1) - offset(N)). For the time being, we offer this functionality only in RDataFrame.
57/// TODO(jblomer): consider providing a general set of useful virtual fields as part of RNTuple.
59protected:
60 std::unique_ptr<ROOT::RFieldBase> CloneImpl(std::string_view /* newName */) const final
61 {
62 return std::make_unique<RRDFCardinalityField>();
63 }
64 void ConstructValue(void *where) const final { *static_cast<std::size_t *>(where) = 0; }
65
66public:
67 RRDFCardinalityField() : ROOT::RFieldBase("", "std::size_t", ROOT::ENTupleStructure::kLeaf, false /* isSimple */) {}
70 ~RRDFCardinalityField() override = default;
71
81 // Field is only used for reading
82 void GenerateColumns() final { throw RException(R__FAIL("Cardinality fields must only be used for reading")); }
87
88 size_t GetValueSize() const final { return sizeof(std::size_t); }
89 size_t GetAlignment() const final { return alignof(std::size_t); }
90
91 /// Get the number of elements of the collection identified by globalIndex
99
100 /// Get the number of elements of the collection identified by clusterIndex
108};
109
110/**
111 * @brief An artificial field that provides the size of a fixed-size array
112 *
113 * This is the implementation of `R_rdf_sizeof_column` in case `column` contains
114 * fixed-size arrays on disk.
115 */
117private:
118 std::size_t fArrayLength;
119
120 std::unique_ptr<ROOT::RFieldBase> CloneImpl(std::string_view) const final
121 {
122 return std::make_unique<RArraySizeField>(fArrayLength);
123 }
124 void GenerateColumns() final { throw RException(R__FAIL("RArraySizeField fields must only be used for reading")); }
126 void ReadGlobalImpl(ROOT::NTupleSize_t /*globalIndex*/, void *to) final
127 {
128 *static_cast<std::size_t *>(to) = fArrayLength;
129 }
130 void ReadInClusterImpl(RNTupleLocalIndex /*localIndex*/, void *to) final
131 {
132 *static_cast<std::size_t *>(to) = fArrayLength;
133 }
134
135public:
137 : ROOT::RFieldBase("", "std::size_t", ROOT::ENTupleStructure::kLeaf, false /* isSimple */),
139 {
140 }
146
147 void ConstructValue(void *where) const final { *static_cast<std::size_t *>(where) = 0; }
148 std::size_t GetValueSize() const final { return sizeof(std::size_t); }
149 std::size_t GetAlignment() const final { return alignof(std::size_t); }
150};
151
152/// Every RDF column is represented by exactly one RNTuple field
156
157 RNTupleDS *fDataSource; ///< The data source that owns this column reader
158 RFieldBase *fProtoField; ///< The prototype field from which fField is cloned
159 std::unique_ptr<RFieldBase> fField; ///< The field backing the RDF column
160 std::unique_ptr<RFieldBase::RValue> fValue; ///< The memory location used to read from fField
161 std::shared_ptr<void> fValuePtr; ///< Used to reuse the object created by fValue when reconnecting sources
162 Long64_t fLastEntry = -1; ///< Last entry number that was read
163 /// For chains, the logical entry and the physical entry in any particular file can be different.
164 /// The entry offset stores the logical entry number (sum of all previous physical entries) when a file of the corresponding
165 /// data source was opened.
167
168public:
170 ~RNTupleColumnReader() override = default;
171
172 /// Connect the field and its subfields to the page source
174 {
175 assert(fLastEntry == -1);
177
178 // Create a new, real field from the prototype and set its field ID in the context of the given page source
180 {
181 auto descGuard = source.GetSharedDescriptorGuard();
182 // Set the on-disk field IDs for the field and the subfield
183 fField->SetOnDiskId(
185 auto iProto = fProtoField->cbegin();
186 auto iReal = fField->begin();
187 for (; iReal != fField->end(); ++iProto, ++iReal) {
188 iReal->SetOnDiskId(descGuard->FindFieldId(fDataSource->fFieldId2QualifiedName.at(iProto->GetOnDiskId())));
189 }
190 }
191
192 try {
194 } catch (const ROOT::RException &err) {
195 auto onDiskType = source.GetSharedDescriptorGuard()->GetFieldDescriptor(fField->GetOnDiskId()).GetTypeName();
196 std::string msg = "RNTupleDS: invalid type \"" + fField->GetTypeName() + "\" for column \"" +
197 fDataSource->fFieldId2QualifiedName[fField->GetOnDiskId()] + "\" with on-disk type \"" +
198 onDiskType + "\"";
199 throw std::runtime_error(msg);
200 }
201
202 if (fValuePtr) {
203 // When the reader reconnects to a new file, the fValuePtr is already set
204 fValue = std::make_unique<RFieldBase::RValue>(fField->BindValue(fValuePtr));
205 fValuePtr = nullptr;
206 } else {
207 // For the first file, create a new object for this field (reader)
208 fValue = std::make_unique<RFieldBase::RValue>(fField->CreateValue());
209 }
210 }
211
213 {
214 if (fValue && keepValue) {
215 fValuePtr = fValue->GetPtr<void>();
216 }
217 fValue = nullptr;
218 fField = nullptr;
219 fLastEntry = -1;
220 }
221
222 void *GetImpl(Long64_t entry) final
223 {
224 if (entry != fLastEntry) {
225 fValue->Read(entry - fEntryOffset);
227 }
228 return fValue->GetPtr<void>().get();
229 }
230};
231} // namespace ROOT::Internal::RDF
232
234
236 ROOT::DescriptorId_t fieldId, std::vector<RNTupleDS::RFieldInfo> fieldInfos,
237 bool convertToRVec)
238{
239 // As an example for the mapping of RNTuple fields to RDF columns, let's consider an RNTuple
240 // using the following types and with a top-level field named "event" of type Event:
241 //
242 // struct Event {
243 // int id;
244 // std::vector<Track> tracks;
245 // };
246 // struct Track {
247 // std::vector<Hit> hits;
248 // };
249 // struct Hit {
250 // float x;
251 // float y;
252 // };
253 //
254 // AddField() will be called from the constructor with the RNTuple root field (ENTupleStructure::kRecord).
255 // From there, we recurse into the "event" sub field (also ENTupleStructure::kRecord) and further down the
256 // tree of sub fields and expose the following RDF columns:
257 //
258 // "event" [Event]
259 // "event.id" [int]
260 // "event.tracks" [RVec<Track>]
261 // "R_rdf_sizeof_event.tracks" [unsigned int]
262 // "event.tracks.hits" [RVec<RVec<Hit>>]
263 // "R_rdf_sizeof_event.tracks.hits" [RVec<unsigned int>]
264 // "event.tracks.hits.x" [RVec<RVec<float>>]
265 // "R_rdf_sizeof_event.tracks.hits.x" [RVec<unsigned int>]
266 // "event.tracks.hits.y" [RVec<RVec<float>>]
267 // "R_rdf_sizeof_event.tracks.hits.y" [RVec<unsigned int>]
268
269 const auto &fieldDesc = desc.GetFieldDescriptor(fieldId);
270 const auto &nRepetitions = fieldDesc.GetNRepetitions();
271 if ((fieldDesc.GetStructure() == ROOT::ENTupleStructure::kCollection) || (nRepetitions > 0)) {
272 // The field is a collection or a fixed-size array.
273 // We open a new collection scope with fieldID being the inner most collection. E.g. for "event.tracks.hits",
274 // fieldInfos would already contain the fieldID of "event.tracks"
275 fieldInfos.emplace_back(fieldId, nRepetitions);
276 }
277
278 if (fieldDesc.GetStructure() == ROOT::ENTupleStructure::kCollection) {
279 // Inner fields of collections are provided as projected collections of only that inner field,
280 // E.g. we provide a projected collection RVec<RVec<float>> for "event.tracks.hits.x" in the example
281 // above.
283 convertToRVec && (fieldDesc.GetTypeName().substr(0, 19) == "ROOT::VecOps::RVec<" ||
284 fieldDesc.GetTypeName().substr(0, 12) == "std::vector<" || fieldDesc.GetTypeName() == "");
285 const auto &f = *desc.GetFieldIterable(fieldDesc.GetId()).begin();
287
288 // Note that at the end of the recursion, we handled the inner sub collections as well as the
289 // collection as whole, so we are done.
290 return;
291
292 } else if (nRepetitions > 0) {
293 // Fixed-size array, same logic as ROOT::RVec.
294 const auto &f = *desc.GetFieldIterable(fieldDesc.GetId()).begin();
295 AddField(desc, colName, f.GetId(), fieldInfos);
296 return;
297 } else if (fieldDesc.GetStructure() == ROOT::ENTupleStructure::kRecord) {
298 // Inner fields of records are provided as individual RDF columns, e.g. "event.id"
299 for (const auto &f : desc.GetFieldIterable(fieldDesc.GetId())) {
300 auto innerName = colName.empty() ? f.GetFieldName() : (std::string(colName) + "." + f.GetFieldName());
301 // Inner fields of collections of records are always exposed as ROOT::RVec
302 AddField(desc, innerName, f.GetId(), fieldInfos);
303 }
304 }
305
306 // The fieldID could be the root field or the class of fieldId might not be loaded.
307 // In these cases, only the inner fields are exposed as RDF columns.
308 auto fieldOrException = ROOT::RFieldBase::Create(fieldDesc.GetFieldName(), fieldDesc.GetTypeName());
309 if (!fieldOrException)
310 return;
311 auto valueField = fieldOrException.Unwrap();
312 valueField->SetOnDiskId(fieldId);
313 for (auto &f : *valueField) {
314 f.SetOnDiskId(desc.FindFieldId(f.GetFieldName(), f.GetParent()->GetOnDiskId()));
315 }
316 std::unique_ptr<ROOT::RFieldBase> cardinalityField;
317 // Collections get the additional "number of" RDF column (e.g. "R_rdf_sizeof_tracks")
318 if (!fieldInfos.empty()) {
319 const auto &info = fieldInfos.back();
320 if (info.fNRepetitions > 0) {
321 cardinalityField = std::make_unique<ROOT::Internal::RDF::RArraySizeField>(info.fNRepetitions);
322 } else {
323 cardinalityField = std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField>();
324 }
325 cardinalityField->SetOnDiskId(info.fFieldId);
326 }
327
328 for (auto i = fieldInfos.rbegin(); i != fieldInfos.rend(); ++i) {
329 const auto &fieldInfo = *i;
330
331 if (fieldInfo.fNRepetitions > 0) {
332 // Fixed-size array, read it as ROOT::RVec in memory
333 valueField = std::make_unique<ROOT::RArrayAsRVecField>("", std::move(valueField), fieldInfo.fNRepetitions);
334 } else {
335 // Actual collection. A std::vector or ROOT::RVec gets added as a ROOT::RVec. All other collection types keep
336 // their original type.
337 if (convertToRVec) {
338 valueField = std::make_unique<ROOT::RRVecField>("", std::move(valueField));
339 } else {
340 auto outerFieldType = desc.GetFieldDescriptor(fieldInfo.fFieldId).GetTypeName();
342 }
343 }
344
345 valueField->SetOnDiskId(fieldInfo.fFieldId);
346
347 // Skip the inner-most collection level to construct the cardinality column
348 // It's taken care of by the `if (!fieldInfos.empty())` scope above
349 if (i != fieldInfos.rbegin()) {
350 if (fieldInfo.fNRepetitions > 0) {
351 // This collection level refers to a fixed-size array
353 std::make_unique<ROOT::RArrayAsRVecField>("", std::move(cardinalityField), fieldInfo.fNRepetitions);
354 } else {
355 // This collection level refers to an RVec
356 cardinalityField = std::make_unique<ROOT::RRVecField>("", std::move(cardinalityField));
357 }
358
359 cardinalityField->SetOnDiskId(fieldInfo.fFieldId);
360 }
361 }
362
363 if (cardinalityField) {
364 fColumnNames.emplace_back("R_rdf_sizeof_" + std::string(colName));
365 fColumnTypes.emplace_back(cardinalityField->GetTypeName());
366 fProtoFields.emplace_back(std::move(cardinalityField));
367 }
368
369 fieldInfos.emplace_back(fieldId, nRepetitions);
370 fColumnNames.emplace_back(colName);
371 fColumnTypes.emplace_back(valueField->GetTypeName());
372 fProtoFields.emplace_back(std::move(valueField));
373}
374
375ROOT::RDF::RNTupleDS::RNTupleDS(std::unique_ptr<ROOT::Internal::RPageSource> pageSource)
376{
377 pageSource->Attach();
378 fPrincipalDescriptor = pageSource->GetSharedDescriptorGuard()->Clone();
379 fStagingArea.emplace_back(std::move(pageSource));
380
381 AddField(fPrincipalDescriptor, "", fPrincipalDescriptor.GetFieldZeroId(),
382 std::vector<ROOT::RDF::RNTupleDS::RFieldInfo>());
383}
384
385namespace {
386
388{
389 // The setting is for now a global one, must be decided before running the
390 // program by setting the appropriate environment variable. Make sure that
391 // option configuration is thread-safe and happens only once.
393 static std::once_flag flag;
394 std::call_once(flag, []() {
395 if (auto env = gSystem->Getenv("ROOT_RNTUPLE_CLUSTERBUNCHSIZE"); env != nullptr && strlen(env) > 0) {
396 std::string envStr{env};
397 auto envNum{std::stoul(envStr)};
398 envNum = envNum == 0 ? 1 : envNum;
400 }
401 });
402 return opts;
403}
404
405std::unique_ptr<ROOT::Internal::RPageSource> CreatePageSource(std::string_view ntupleName, std::string_view fileName)
406{
408}
409} // namespace
410
411ROOT::RDF::RNTupleDS::RNTupleDS(std::string_view ntupleName, std::string_view fileName)
412 : RNTupleDS(CreatePageSource(ntupleName, fileName))
413{
414}
415
419
420ROOT::RDF::RNTupleDS::RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames)
421 : RNTupleDS(CreatePageSource(ntupleName, fileNames[0]))
422{
425 fStagingArea.resize(fFileNames.size());
426}
427
429ROOT::RDF::RNTupleDS::GetColumnReadersImpl(std::string_view /* name */, const std::type_info & /* ti */)
430{
431 // This datasource uses the newer GetColumnReaders() API
432 return {};
433}
434
435std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase>
436ROOT::RDF::RNTupleDS::GetColumnReaders(unsigned int slot, std::string_view name, const std::type_info &tid)
437{
438 // At this point we can assume that `name` will be found in fColumnNames
439 const auto index = std::distance(fColumnNames.begin(), std::find(fColumnNames.begin(), fColumnNames.end(), name));
441
443 // If the field corresponding to the provided name is not a cardinality column and the requested type is different
444 // from the proto field that was created when the data source was constructed, we first have to create an
445 // alternative proto field for the column reader. Otherwise, we can directly use the existing proto field.
446 if (name.substr(0, 13) != "R_rdf_sizeof_" && requestedType != fColumnTypes[index]) {
447 auto &altProtoFields = fAlternativeProtoFields[index];
448 auto altProtoField = std::find_if(altProtoFields.begin(), altProtoFields.end(),
449 [&requestedType](const std::unique_ptr<ROOT::RFieldBase> &fld) {
450 return fld->GetTypeName() == requestedType;
451 });
453 field = altProtoField->get();
454 } else {
457 throw std::runtime_error("RNTupleDS: Could not create field with type \"" + requestedType +
458 "\" for column \"" + std::string(name));
459 }
461 newAltProtoField->SetOnDiskId(fProtoFields[index]->GetOnDiskId());
462 field = newAltProtoField.get();
463 altProtoFields.emplace_back(std::move(newAltProtoField));
464 }
465 } else {
466 field = fProtoFields[index].get();
467 }
468
469 // Map the field's and subfields' IDs to qualified names so that we can later connect the fields to
470 // other page sources from the chain
471 fFieldId2QualifiedName[field->GetOnDiskId()] = fPrincipalDescriptor.GetQualifiedFieldName(field->GetOnDiskId());
472 for (const auto &s : *field) {
473 fFieldId2QualifiedName[s.GetOnDiskId()] = fPrincipalDescriptor.GetQualifiedFieldName(s.GetOnDiskId());
474 }
475
476 auto reader = std::make_unique<ROOT::Internal::RDF::RNTupleColumnReader>(this, field);
477 fActiveColumnReaders[slot].emplace_back(reader.get());
478
479 return reader;
480}
481
483{
484 while (true) {
485 std::unique_lock lock(fMutexStaging);
486 fCvStaging.wait(lock, [this] { return fIsReadyForStaging || fStagingThreadShouldTerminate; });
487 if (fStagingThreadShouldTerminate)
488 return;
489
490 assert(!fHasNextSources);
491 StageNextSources();
492 fHasNextSources = true;
493 fIsReadyForStaging = false;
494
495 lock.unlock();
496 fCvStaging.notify_one();
497 }
498}
499
501{
502 const auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
503 for (auto i = fNextFileIndex; (i < nFiles) && ((i - fNextFileIndex) < fNSlots); ++i) {
504 if (fStagingThreadShouldTerminate)
505 return;
506
507 if (fStagingArea[i]) {
508 // The first file is already open and was used to read the schema
509 assert(i == 0);
510 } else {
511 fStagingArea[i] = CreatePageSource(fNTupleName, fFileNames[i]);
512 fStagingArea[i]->LoadStructure();
513 }
514 }
515}
516
518{
519 assert(fNextRanges.empty());
520 auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
521 auto nRemainingFiles = nFiles - fNextFileIndex;
522 if (nRemainingFiles == 0)
523 return;
524
525 // Easy work scheduling: one file per slot. We skip empty files (files without entries).
526 if (nRemainingFiles >= fNSlots) {
527 while ((fNextRanges.size() < fNSlots) && (fNextFileIndex < nFiles)) {
529
530 std::swap(fStagingArea[fNextFileIndex], range.fSource);
531
532 if (!range.fSource) {
533 // Typically, the prestaged source should have been present. Only if some of the files are empty, we need
534 // to open and attach files here.
535 range.fSource = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
536 }
537 range.fSource->Attach();
538 fNextFileIndex++;
539
540 auto nEntries = range.fSource->GetNEntries();
541 if (nEntries == 0)
542 continue;
543
544 range.fLastEntry = nEntries; // whole file per slot, i.e. entry range [0..nEntries - 1]
545 fNextRanges.emplace_back(std::move(range));
546 }
547 return;
548 }
549
550 // Work scheduling of the tail: multiple slots work on the same file.
551 // Every slot still has its own page source but these page sources may open the same file.
552 // Again, we need to skip empty files.
553 unsigned int nSlotsPerFile = fNSlots / nRemainingFiles;
554 for (std::size_t i = 0; (fNextRanges.size() < fNSlots) && (fNextFileIndex < nFiles); ++i) {
555 std::unique_ptr<ROOT::Internal::RPageSource> source;
556 std::swap(fStagingArea[fNextFileIndex], source);
557 if (!source) {
558 // Empty files trigger this condition
559 source = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
560 }
561 source->Attach();
562 fNextFileIndex++;
563
564 auto nEntries = source->GetNEntries();
565 if (nEntries == 0)
566 continue;
567
568 // If last file: use all remaining slots
569 if (i == (nRemainingFiles - 1))
570 nSlotsPerFile = fNSlots - fNextRanges.size();
571
572 std::vector<std::pair<ULong64_t, ULong64_t>> rangesByCluster;
573 {
574 auto descriptorGuard = source->GetSharedDescriptorGuard();
575 auto clusterId = descriptorGuard->FindClusterId(0, 0);
577 const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterId);
578 rangesByCluster.emplace_back(std::make_pair<ULong64_t, ULong64_t>(
579 clusterDesc.GetFirstEntryIndex(), clusterDesc.GetFirstEntryIndex() + clusterDesc.GetNEntries()));
580 clusterId = descriptorGuard->FindNextClusterId(clusterId);
581 }
582 }
583 const unsigned int nRangesByCluster = rangesByCluster.size();
584
585 // Distribute slots equidistantly over the entry range, aligned on cluster boundaries
587 const auto remainder = nRangesByCluster % nSlotsPerFile;
588 std::size_t iRange = 0;
589 unsigned int iSlot = 0;
590 const unsigned int N = std::min(nSlotsPerFile, nRangesByCluster);
591 for (; iSlot < N; ++iSlot) {
592 auto start = rangesByCluster[iRange].first;
593 iRange += nClustersPerSlot + static_cast<int>(iSlot < remainder);
594 assert(iRange > 0);
595 auto end = rangesByCluster[iRange - 1].second;
596
598 // The last range for this file just takes the already opened page source. All previous ranges clone.
599 if (iSlot == N - 1) {
600 range.fSource = std::move(source);
601 } else {
602 range.fSource = source->Clone();
603 }
604 range.fSource->SetEntryRange({start, end - start});
605 range.fFirstEntry = start;
606 range.fLastEntry = end;
607 fNextRanges.emplace_back(std::move(range));
608 }
609 } // loop over tail of remaining files
610}
611
612std::vector<std::pair<ULong64_t, ULong64_t>> ROOT::RDF::RNTupleDS::GetEntryRanges()
613{
614 std::vector<std::pair<ULong64_t, ULong64_t>> ranges;
615
616 // We need to distinguish between single threaded and multi-threaded runs.
617 // In single threaded mode, InitSlot is only called once and column readers have to be rewired
618 // to new page sources of the chain in GetEntryRanges. In multi-threaded mode, on the other hand,
619 // InitSlot is called for every returned range, thus rewiring the column readers takes place in
620 // InitSlot and FinalizeSlot.
621
622 if (fNSlots == 1) {
623 for (auto r : fActiveColumnReaders[0]) {
624 r->Disconnect(true /* keepValue */);
625 }
626 }
627
628 // If we have fewer files than slots and we run multiple event loops, we can reuse fCurrentRanges and don't need
629 // to worry about loading the fNextRanges. I.e., in this case we don't enter the if block.
630 if (fCurrentRanges.empty() || (fSeenEntries > 0)) {
631 // Otherwise, i.e. start of the first event loop or in the middle of the event loop, prepare the next ranges
632 // and swap with the current ones.
633 {
634 std::unique_lock lock(fMutexStaging);
635 fCvStaging.wait(lock, [this] { return fHasNextSources; });
636 }
637 PrepareNextRanges();
638 if (fNextRanges.empty()) {
639 // No more data
640 return ranges;
641 }
642
643 assert(fNextRanges.size() <= fNSlots);
644
645 fCurrentRanges.clear();
646 std::swap(fCurrentRanges, fNextRanges);
647 }
648
649 // Stage next batch of files for the next call to GetEntryRanges()
650 {
651 std::lock_guard _(fMutexStaging);
652 fIsReadyForStaging = true;
653 fHasNextSources = false;
654 }
655 fCvStaging.notify_one();
656
657 // Create ranges for the RDF loop manager from the list of REntryRangeDS records.
658 // The entry ranges that are relative to the page source in REntryRangeDS are translated into absolute
659 // entry ranges, given the current state of the entry cursor.
660 // We remember the connection from first absolute entry index of a range to its REntryRangeDS record
661 // so that we can properly rewire the column reader in InitSlot
662 fFirstEntry2RangeIdx.clear();
664 for (std::size_t i = 0; i < fCurrentRanges.size(); ++i) {
665 // Several consecutive ranges may operate on the same file (each with their own page source clone).
666 // We can detect a change of file when the first entry number jumps back to 0.
667 if (fCurrentRanges[i].fFirstEntry == 0) {
668 // New source
669 fSeenEntries += nEntriesPerSource;
671 }
672 auto start = fCurrentRanges[i].fFirstEntry + fSeenEntries;
673 auto end = fCurrentRanges[i].fLastEntry + fSeenEntries;
674 nEntriesPerSource += end - start;
675
676 fFirstEntry2RangeIdx[start] = i;
677 ranges.emplace_back(start, end);
678 }
679 fSeenEntries += nEntriesPerSource;
680
681 if ((fNSlots == 1) && (fCurrentRanges[0].fSource)) {
682 for (auto r : fActiveColumnReaders[0]) {
683 r->Connect(*fCurrentRanges[0].fSource, ranges[0].first);
684 }
685 }
686
687 return ranges;
688}
689
691{
692 if (fNSlots == 1)
693 return;
694
695 auto idxRange = fFirstEntry2RangeIdx.at(firstEntry);
696 for (auto r : fActiveColumnReaders[slot]) {
697 r->Connect(*fCurrentRanges[idxRange].fSource, firstEntry - fCurrentRanges[idxRange].fFirstEntry);
698 }
699}
700
702{
703 if (fNSlots == 1)
704 return;
705
706 for (auto r : fActiveColumnReaders[slot]) {
707 r->Disconnect(true /* keepValue */);
708 }
709}
710
711std::string ROOT::RDF::RNTupleDS::GetTypeName(std::string_view colName) const
712{
713 auto colNamePos = std::find(fColumnNames.begin(), fColumnNames.end(), colName);
714
715 if (colNamePos == fColumnNames.end()) {
716 auto msg = std::string("RNTupleDS: There is no column with name \"") + std::string(colName) + "\"";
717 throw std::runtime_error(msg);
718 }
719
720 const auto index = std::distance(fColumnNames.begin(), colNamePos);
721 return fColumnTypes[index];
722}
723
724bool ROOT::RDF::RNTupleDS::HasColumn(std::string_view colName) const
725{
726 return std::find(fColumnNames.begin(), fColumnNames.end(), colName) != fColumnNames.end();
727}
728
730{
731 fSeenEntries = 0;
732 fNextFileIndex = 0;
733 fIsReadyForStaging = fHasNextSources = fStagingThreadShouldTerminate = false;
734 fThreadStaging = std::thread(&RNTupleDS::ExecStaging, this);
735 assert(fNextRanges.empty());
736
737 if (fCurrentRanges.empty() || (fFileNames.size() > fNSlots)) {
738 // First event loop or large number of files: start the staging process.
739 {
740 std::lock_guard _(fMutexStaging);
741 fIsReadyForStaging = true;
742 }
743 fCvStaging.notify_one();
744 } else {
745 // Otherwise, we will reuse fCurrentRanges. Make sure that staging and preparing next ranges will be a noop
746 // (already at the end of the list of files).
747 fNextFileIndex = std::max(fFileNames.size(), std::size_t(1));
748 }
749}
750
752{
753 for (unsigned int i = 0; i < fNSlots; ++i) {
754 for (auto r : fActiveColumnReaders[i]) {
755 r->Disconnect(false /* keepValue */);
756 }
757 }
758 {
759 std::lock_guard _(fMutexStaging);
760 fStagingThreadShouldTerminate = true;
761 }
762 fCvStaging.notify_one();
763 fThreadStaging.join();
764 // If we have a chain with more files than the number of slots, the files opened at the end of the
765 // event loop won't be reused when the event loop restarts, so we can close them.
766 if (fFileNames.size() > fNSlots) {
767 fCurrentRanges.clear();
768 fNextRanges.clear();
769 fStagingArea.clear();
770 fStagingArea.resize(fFileNames.size());
771 }
772}
773
775{
776 assert(fNSlots == 0);
777 assert(nSlots > 0);
778 fNSlots = nSlots;
779 fActiveColumnReaders.resize(fNSlots);
780}
781
782ROOT::RDataFrame ROOT::RDF::FromRNTuple(std::string_view ntupleName, std::string_view fileName)
783{
784 return ROOT::RDataFrame(std::make_unique<ROOT::RDF::RNTupleDS>(ntupleName, fileName));
785}
786
787ROOT::RDataFrame ROOT::RDF::FromRNTuple(std::string_view ntupleName, const std::vector<std::string> &fileNames)
788{
789 return ROOT::RDataFrame(std::make_unique<ROOT::RDF::RNTupleDS>(ntupleName, fileNames));
790}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:299
#define f(i)
Definition RSha256.hxx:104
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
long long Long64_t
Definition RtypesCore.h:69
unsigned long long ULong64_t
Definition RtypesCore.h:70
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define N
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t index
char name[80]
Definition TGX11.cxx:110
R__EXTERN TSystem * gSystem
Definition TSystem.h:572
#define _(A, B)
Definition cfortran.h:108
void GetCollectionInfo(const ROOT::NTupleSize_t globalIndex, RNTupleLocalIndex *collectionStart, ROOT::NTupleSize_t *collectionSize)
For offset columns only, look at the two adjacent values that define a collection's coordinates.
Definition RColumn.hxx:283
An artificial field that provides the size of a fixed-size array.
void GenerateColumns(const ROOT::RNTupleDescriptor &) final
Implementations in derived classes should create the backing columns corresponding to the field type ...
std::unique_ptr< ROOT::RFieldBase > CloneImpl(std::string_view) const final
Called by Clone(), which additionally copies the on-disk ID.
void ReadGlobalImpl(ROOT::NTupleSize_t, void *to) final
RArraySizeField(const RArraySizeField &other)=delete
RArraySizeField(RArraySizeField &&other)=default
RArraySizeField & operator=(RArraySizeField &&other)=default
void ReadInClusterImpl(RNTupleLocalIndex, void *to) final
std::size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
RArraySizeField(std::size_t arrayLength)
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
RArraySizeField & operator=(const RArraySizeField &other)=delete
std::size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
void GenerateColumns() final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Every RDF column is represented by exactly one RNTuple field.
void * GetImpl(Long64_t entry) final
void Connect(RPageSource &source, Long64_t entryOffset)
Connect the field and its subfields to the page source.
RNTupleColumnReader(RNTupleDS *ds, RFieldBase *protoField)
std::unique_ptr< RFieldBase::RValue > fValue
The memory location used to read from fField.
std::unique_ptr< RFieldBase > fField
The field backing the RDF column.
Long64_t fEntryOffset
For chains, the logical entry and the physical entry in any particular file can be different.
std::shared_ptr< void > fValuePtr
Used to reuse the object created by fValue when reconnecting sources.
RNTupleDS * fDataSource
The data source that owns this column reader.
RFieldBase * fProtoField
The prototype field from which fField is cloned.
Long64_t fLastEntry
Last entry number that was read.
An artificial field that transforms an RNTuple column that contains the offset of collections into co...
Definition RNTupleDS.cxx:58
RRDFCardinalityField(RRDFCardinalityField &&other)=default
size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
Definition RNTupleDS.cxx:89
void GenerateColumns() final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Definition RNTupleDS.cxx:82
void ReadGlobalImpl(ROOT::NTupleSize_t globalIndex, void *to) final
Get the number of elements of the collection identified by globalIndex.
Definition RNTupleDS.cxx:92
RRDFCardinalityField & operator=(RRDFCardinalityField &&other)=default
void GenerateColumns(const ROOT::RNTupleDescriptor &desc) final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Definition RNTupleDS.cxx:83
const RColumnRepresentations & GetColumnRepresentations() const final
Implementations in derived classes should return a static RColumnRepresentations object.
Definition RNTupleDS.cxx:72
size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
Definition RNTupleDS.cxx:88
void ReadInClusterImpl(ROOT::RNTupleLocalIndex localIndex, void *to) final
Get the number of elements of the collection identified by localIndex.
std::unique_ptr< ROOT::RFieldBase > CloneImpl(std::string_view) const final
Called by Clone(), which additionally copies the on-disk ID.
Definition RNTupleDS.cxx:60
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
Definition RNTupleDS.cxx:64
static void SetClusterBunchSize(RNTupleReadOptions &options, unsigned int val)
Storage provider that reads ntuple pages from a file.
Abstract interface to read data from an ntuple.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
std::vector< void * > Record_t
The RDataSource implementation for RNTuple.
Definition RNTupleDS.hxx:47
void AddField(const ROOT::RNTupleDescriptor &desc, std::string_view colName, ROOT::DescriptorId_t fieldId, std::vector< RFieldInfo > fieldInfos, bool convertToRVec=true)
Provides the RDF column "colName" given the field identified by fieldID.
std::vector< std::pair< ULong64_t, ULong64_t > > GetEntryRanges() final
Return ranges of entries to distribute to tasks.
void ExecStaging()
The main function of the fThreadStaging background thread.
std::vector< std::unique_ptr< ROOT::Internal::RPageSource > > fStagingArea
The staging area is relevant for chains of files, i.e.
Definition RNTupleDS.hxx:82
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > GetColumnReaders(unsigned int, std::string_view, const std::type_info &) final
If the other GetColumnReaders overload returns an empty vector, this overload will be called instead.
std::vector< std::unique_ptr< ROOT::RFieldBase > > fProtoFields
We prepare a prototype field for every column.
Definition RNTupleDS.hxx:89
void SetNSlots(unsigned int nSlots) final
Inform RDataSource of the number of processing slots (i.e.
std::vector< std::string > fFileNames
Definition RNTupleDS.hxx:65
void InitSlot(unsigned int slot, ULong64_t firstEntry) final
Convenience method called at the start of the data processing associated to a slot.
RNTupleDS(std::unique_ptr< ROOT::Internal::RPageSource > pageSource)
std::string GetTypeName(std::string_view colName) const final
Type of a column as a string, e.g.
std::unordered_map< ROOT::DescriptorId_t, std::string > fFieldId2QualifiedName
Connects the IDs of active proto fields and their subfields to their fully qualified name (a....
Definition RNTupleDS.hxx:98
std::string fNTupleName
The data source may be constructed with an ntuple name and a list of files.
Definition RNTupleDS.hxx:64
void PrepareNextRanges()
Populates fNextRanges with the next set of entry ranges.
void StageNextSources()
Starting from fNextFileIndex, opens the next fNSlots files.
void Finalize() final
Convenience method called after concluding an event-loop.
std::vector< std::string > fColumnTypes
void Initialize() final
Convenience method called before starting an event-loop.
std::vector< std::string > fColumnNames
Definition RNTupleDS.hxx:99
bool HasColumn(std::string_view colName) const final
Checks if the dataset has a certain column.
Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) final
type-erased vector of pointers to pointers to column values - one per slot
void FinalizeSlot(unsigned int slot) final
Convenience method called at the end of the data processing associated to a slot.
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree ,...
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
The list of column representations a field can have.
A field translates read and write calls from/to underlying columns to/from tree values.
ROOT::Internal::RColumn * fPrincipalColumn
All fields that have columns have a distinct main column.
RConstSchemaIterator cbegin() const
const std::string & GetFieldName() const
static RResult< std::unique_ptr< RFieldBase > > Create(const std::string &fieldName, const std::string &typeName, const ROOT::RCreateFieldOptions &options, const ROOT::RNTupleDescriptor *desc, ROOT::DescriptorId_t fieldId)
Factory method to resurrect a field from the stored on-disk type information.
ROOT::DescriptorId_t GetOnDiskId() const
std::unique_ptr< RFieldBase > Clone(std::string_view newName) const
Copies the field and its subfields using a possibly new name and a new, unconnected set of columns.
The on-storage metadata of an RNTuple.
RFieldDescriptorIterable GetFieldIterable(const RFieldDescriptor &fieldDesc) const
const RFieldDescriptor & GetFieldDescriptor(ROOT::DescriptorId_t fieldId) const
ROOT::DescriptorId_t FindFieldId(std::string_view fieldName, ROOT::DescriptorId_t parentId) const
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Common user-tunable settings for reading RNTuples.
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:65
const_iterator begin() const
const_iterator end() const
virtual const char * Getenv(const char *env)
Get environment variable.
Definition TSystem.cxx:1677
std::string TypeID2TypeName(const std::type_info &id)
Returns the name of a type starting from its type_info. An empty string is returned in case of failure...
Definition RDFUtils.cxx:123
void CallConnectPageSourceOnField(RFieldBase &, ROOT::Internal::RPageSource &)
std::string GetRenormalizedTypeName(const std::string &metaNormalizedName)
Given a type name normalized by ROOT meta, renormalize it for RNTuple. E.g., insert std::prefix.
RDataFrame FromRNTuple(std::string_view ntupleName, std::string_view fileName)
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr DescriptorId_t kInvalidDescriptorId
ENTupleStructure
The fields in the ntuple model tree can carry different structural information about the type system.
The PrepareNextRanges() method populates the fNextRanges list with REntryRangeDS records.
Definition RNTupleDS.hxx:53