97 static_assert(std::is_integral_v<T>,
"T must be an integral type");
101 if constexpr (std::is_same_v<T, bool> || std::is_same_v<T, std::uint64_t>)
105 ". Please read the column with a larger-sized integral type."));
112 return std::make_unique<RRDFCardinalityField>(
newName);
137 *
static_cast<T *
>(to) =
size;
147 *
static_cast<T *
>(to) =
size;
205 std::unique_ptr<RFieldBase::RValue>
fValue;
245 auto onDiskType =
source.GetSharedDescriptorGuard()->GetFieldDescriptor(
fField->GetOnDiskId()).GetTypeName();
246 std::string
msg =
"RNTupleDS: invalid type \"" +
fField->GetTypeName() +
"\" for column \"" +
249 throw std::runtime_error(
msg);
259 fValue = std::make_unique<RFieldBase::RValue>(
fField->CreateValue());
279 return fValue->GetPtr<
void>().get();
335 fieldDesc.GetTypeName().substr(0, 12) ==
"std::vector<" ||
fieldDesc.GetTypeName() ==
"");
367 std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField<std::uint32_t>>(
fieldDesc.GetFieldName());
370 std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField<std::uint64_t>>(
fieldDesc.GetFieldName());
372 R__ASSERT(
false &&
"cardinality field stored with an unexpected integer type");
377 f.SetOnDiskId(desc.
FindFieldId(
f.GetFieldName(),
f.GetParent()->GetOnDiskId()));
384 if (
info.fNRepetitions > 0) {
387 cardinalityField = std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField<std::size_t>>(
name);
446 fPrincipalDescriptor =
pageSource->GetSharedDescriptorGuard()->Clone();
447 fStagingArea.emplace_back(std::move(
pageSource));
449 AddField(fPrincipalDescriptor,
"", fPrincipalDescriptor.GetFieldZeroId(),
450 std::vector<ROOT::RDF::RNTupleDS::RFieldInfo>());
456 fTopLevelFieldNames.push_back(
field.GetFieldName());
467 static std::once_flag
flag;
468 std::call_once(
flag, []() {
479std::unique_ptr<ROOT::Internal::RPageSource> CreatePageSource(std::string_view
ntupleName, std::string_view fileName)
489 fFileNames = std::vector<std::string>{std::string{fileName}};
501 const std::pair<ULong64_t, ULong64_t> &
range)
518 std::distance(fColumnNames.begin(), std::find(fColumnNames.begin(), fColumnNames.end(),
fieldName));
523 return fProtoFields[
index].get();
538 return fld->GetTypeName() == requestedType;
569 throw std::runtime_error(
"RNTupleDS: Could not create field with type \"" +
requestedType +
570 "\" for column \"" + std::string(
fieldName) +
"\"");
575 throw std::runtime_error(
"RNTupleDS: Could not create field with type \"" +
requestedType +
576 "\" for column \"" + std::string(
fieldName) +
"\"");
587 return fProtoFields[
index].get();
590std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase>
598 fFieldId2QualifiedName[
field->GetOnDiskId()] = fPrincipalDescriptor.GetQualifiedFieldName(
field->GetOnDiskId());
599 for (
const auto &s : *
field) {
600 fFieldId2QualifiedName[s.GetOnDiskId()] = fPrincipalDescriptor.GetQualifiedFieldName(s.GetOnDiskId());
603 auto reader = std::make_unique<ROOT::Internal::RDF::RNTupleColumnReader>(
this,
field);
604 fActiveColumnReaders[
slot].emplace_back(
reader.get());
612 std::unique_lock lock(fMutexStaging);
613 fCvStaging.wait(lock, [
this] {
return fIsReadyForStaging || fStagingThreadShouldTerminate; });
614 if (fStagingThreadShouldTerminate)
619 fHasNextSources =
true;
620 fIsReadyForStaging =
false;
623 fCvStaging.notify_one();
629 const auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
631 for (
auto i = fNextFileIndex; (i <
nFiles) && ((i - fNextFileIndex) < fNSlots); ++i) {
633 if (fStagingThreadShouldTerminate)
636 if (fStagingArea[i]) {
640 fStagingArea[i] = CreatePageSource(fNTupleName, fFileNames[i]);
641 fStagingArea[i]->LoadStructure();
648 assert(fNextRanges.empty());
650 auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
659 while ((fNextRanges.size() < fNSlots) && (fNextFileIndex <
nFiles)) {
662 std::swap(fStagingArea[fNextFileIndex],
range.fSource);
664 if (!
range.fSource) {
667 range.fSource = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
669 range.fFileName = fFileNames[fNextFileIndex];
670 range.fSource->Attach();
677 fNextRanges.emplace_back(std::move(
range));
686 for (std::size_t i = 0; (fNextRanges.size() < fNSlots) && (fNextFileIndex <
nFiles); ++i) {
687 std::unique_ptr<ROOT::Internal::RPageSource>
source;
690 std::swap(fStagingArea[fNextFileIndex],
source);
693 source = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
718 unsigned int iSlot = 0;
734 range.fSource->SetEntryRange({start, end - start});
735 range.fFirstEntry = start;
736 range.fLastEntry = end;
737 fNextRanges.emplace_back(std::move(
range));
744 std::vector<std::pair<ULong64_t, ULong64_t>> ranges;
753 for (
auto r : fActiveColumnReaders[0]) {
754 r->Disconnect(
true );
760 if (fCurrentRanges.empty() || fSeenEntriesNoGlobalRange > 0) {
764 std::unique_lock lock(fMutexStaging);
765 fCvStaging.wait(lock, [
this] {
return fHasNextSources; });
768 if (fNextRanges.empty()) {
773 assert(fNextRanges.size() <= fNSlots);
775 fCurrentRanges.clear();
776 std::swap(fCurrentRanges, fNextRanges);
781 std::lock_guard
_(fMutexStaging);
782 fIsReadyForStaging =
true;
783 fHasNextSources =
false;
785 fCvStaging.notify_one();
792 fFirstEntry2RangeIdx.clear();
793 fOriginalRanges.clear();
797 for (std::size_t i = 0; i < fCurrentRanges.size(); ++i) {
801 if (fCurrentRanges[i].fFirstEntry == 0) {
807 auto start = fCurrentRanges[i].fFirstEntry + fSeenEntriesNoGlobalRange;
808 auto end = fCurrentRanges[i].fLastEntry + fSeenEntriesNoGlobalRange;
812 if (fGlobalEntryRange.has_value()) {
830 if (fGlobalEntryRange->first >= start && fGlobalEntryRange->second <= end) {
831 fOriginalRanges.emplace_back(start, end);
832 fFirstEntry2RangeIdx[fGlobalEntryRange->first] = i;
833 ranges.emplace_back(fGlobalEntryRange->first, fGlobalEntryRange->second);
838 else if (fGlobalEntryRange->second > end && fGlobalEntryRange->first < end) {
839 fOriginalRanges.emplace_back(start, end);
840 fFirstEntry2RangeIdx[fGlobalEntryRange->first] = i;
841 ranges.emplace_back(fGlobalEntryRange->first, end);
842 std::optional<std::pair<ULong64_t, ULong64_t>>
newvalues({end, fGlobalEntryRange->second});
846 else if (fGlobalEntryRange->second < start) {
851 else if (fGlobalEntryRange->first >= end) {
852 fOriginalRanges.emplace_back(start, end);
853 fFirstEntry2RangeIdx[start] = i;
854 ranges.emplace_back(start, start);
859 fFirstEntry2RangeIdx[start] = i;
860 fOriginalRanges.emplace_back(start, end);
861 ranges.emplace_back(start, end);
867 if ((fNSlots == 1) && (fCurrentRanges[0].fSource)) {
868 for (
auto r : fActiveColumnReaders[0]) {
869 r->Connect(*fCurrentRanges[0].fSource, fOriginalRanges[0].first);
880 fSlotsToRangeIdxs[0] = 0;
891 fSlotsToRangeIdxs[
slot * ROOT::Internal::RDF::CacheLineStep<std::size_t>()] =
idxRange;
893 for (
auto r : fActiveColumnReaders[
slot]) {
894 r->Connect(*fCurrentRanges[
idxRange].fSource,
904 for (
auto r : fActiveColumnReaders[
slot]) {
905 r->Disconnect(
true );
914 auto msg = std::string(
"RNTupleDS: There is no column with name \"") + std::string(
colName) +
"\"";
915 throw std::runtime_error(
msg);
919 return fColumnTypes[
index];
924 return std::find(fColumnNames.begin(), fColumnNames.end(),
colName) != fColumnNames.end();
929 fSeenEntriesNoGlobalRange = 0;
931 fIsReadyForStaging = fHasNextSources = fStagingThreadShouldTerminate =
false;
932 fThreadStaging = std::thread(&RNTupleDS::ExecStaging,
this);
933 assert(fNextRanges.empty());
935 if (fCurrentRanges.empty() || (fFileNames.size() > fNSlots)) {
938 std::lock_guard
_(fMutexStaging);
939 fIsReadyForStaging =
true;
941 fCvStaging.notify_one();
945 fNextFileIndex = std::max(fFileNames.size(), std::size_t(1));
951 for (
unsigned int i = 0; i < fNSlots; ++i) {
952 for (
auto r : fActiveColumnReaders[i]) {
953 r->Disconnect(
false );
957 std::lock_guard
_(fMutexStaging);
958 fStagingThreadShouldTerminate =
true;
960 fCvStaging.notify_one();
961 fThreadStaging.join();
964 if (fFileNames.size() > fNSlots) {
965 fCurrentRanges.clear();
967 fStagingArea.clear();
968 fStagingArea.resize(fFileNames.size());
977 fActiveColumnReaders.resize(fNSlots);
978 fSlotsToRangeIdxs.resize(fNSlots * ROOT::Internal::RDF::CacheLineStep<std::size_t>());
992 unsigned int slot,
const std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> &
sampleMap)
const
999 const auto &
rangeIdx = fSlotsToRangeIdxs.at(
slot * ROOT::Internal::RDF::CacheLineStep<std::size_t>());
1002 if (!fCurrentRanges[
rangeIdx].fSource)
1012 fPrincipalDescriptor.GetNEntries());
1015 throw std::runtime_error(
"Full sample identifier '" +
ntupleID +
"' cannot be found in the available samples.");
1024 const std::pair<ULong64_t, ULong64_t> &
range)
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
long long Long64_t
Portable signed long integer 8 bytes.
unsigned long long ULong64_t
Portable unsigned long integer 8 bytes.
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t index
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t type
R__EXTERN TSystem * gSystem
void GetCollectionInfo(const ROOT::NTupleSize_t globalIndex, RNTupleLocalIndex *collectionStart, ROOT::NTupleSize_t *collectionSize)
For offset columns only, look at the two adjacent values that define a collection's coordinates.
An artificial field that provides the size of a fixed-size array.
void GenerateColumns(const ROOT::RNTupleDescriptor &) final
Implementations in derived classes should create the backing columns corresponding to the field type ...
void ReadGlobalImpl(ROOT::NTupleSize_t, void *to) final
RArraySizeField(const RArraySizeField &other)=delete
RArraySizeField(RArraySizeField &&other)=default
RArraySizeField & operator=(RArraySizeField &&other)=default
RArraySizeField(std::string_view name, std::size_t arrayLength)
void ReadInClusterImpl(RNTupleLocalIndex, void *to) final
void ReconcileOnDiskField(const RNTupleDescriptor &) final
For non-artificial fields, check compatibility of the in-memory field and the on-disk field.
std::size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
~RArraySizeField() final=default
RArraySizeField & operator=(const RArraySizeField &other)=delete
std::size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
void GenerateColumns() final
Implementations in derived classes should create the backing columns corresponding to the field type ...
std::unique_ptr< ROOT::RFieldBase > CloneImpl(std::string_view newName) const final
Called by Clone(), which additionally copies the on-disk ID.
Every RDF column is represented by exactly one RNTuple field.
void * GetImpl(Long64_t entry) final
void Connect(RPageSource &source, Long64_t entryOffset)
Connect the field and its subfields to the page source.
RNTupleColumnReader(RNTupleDS *ds, RFieldBase *protoField)
std::unique_ptr< RFieldBase::RValue > fValue
The memory location used to read from fField.
std::unique_ptr< RFieldBase > fField
The field backing the RDF column.
Long64_t fEntryOffset
For chains, the logical entry and the physical entry in any particular file can be different.
std::shared_ptr< void > fValuePtr
Used to reuse the object created by fValue when reconnecting sources.
RNTupleDS * fDataSource
The data source that owns this column reader.
~RNTupleColumnReader() override=default
RFieldBase * fProtoField
The prototype field from which fField is cloned.
Long64_t fLastEntry
Last entry number that was read.
void Disconnect(bool keepValue)
void ReconcileOnDiskField(const RNTupleDescriptor &) final
For non-artificial fields, check compatibility of the in-memory field and the on-disk field.
void GenerateColumns() final
Implementations in derived classes should create the backing columns corresponding to the field type ...
RRDFCardinalityFieldBase(std::string_view name, std::string_view type)
const RColumnRepresentations & GetColumnRepresentations() const final
Implementations in derived classes should return a static RColumnRepresentations object.
void GenerateColumns(const ROOT::RNTupleDescriptor &desc) final
Implementations in derived classes should create the backing columns corresponding to the field type ...
~RRDFCardinalityFieldBase() override=default
RRDFCardinalityFieldBase(const RRDFCardinalityFieldBase &other)=delete
RRDFCardinalityFieldBase(RRDFCardinalityFieldBase &&other)=default
RRDFCardinalityFieldBase & operator=(const RRDFCardinalityFieldBase &other)=delete
RRDFCardinalityFieldBase & operator=(RRDFCardinalityFieldBase &&other)=default
An artificial field that transforms an RNTuple column that contains the offset of collections into co...
RRDFCardinalityField(RRDFCardinalityField &&other)=default
void CheckSize(ROOT::NTupleSize_t size) const
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
RRDFCardinalityField & operator=(const RRDFCardinalityField &other)=delete
void ReadInClusterImpl(ROOT::RNTupleLocalIndex localIndex, void *to) final
Get the number of elements of the collection identified by clusterIndex.
std::size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
RRDFCardinalityField(const RRDFCardinalityField &other)=delete
RRDFCardinalityField & operator=(RRDFCardinalityField &&other)=default
void ReadGlobalImpl(ROOT::NTupleSize_t globalIndex, void *to) final
Get the number of elements of the collection identified by globalIndex.
std::unique_ptr< ROOT::RFieldBase > CloneImpl(std::string_view newName) const final
Called by Clone(), which additionally copies the on-disk ID.
~RRDFCardinalityField() override=default
RRDFCardinalityField(std::string_view name)
std::size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
static void SetClusterBunchSize(RNTupleReadOptions &options, unsigned int val)
Abstract interface to read data from an ntuple.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
An artificial field that transforms an RNTuple column that contains the offset of collections into co...
std::vector< void * > Record_t
std::optional< std::pair< ULong64_t, ULong64_t > > fGlobalEntryRange
The RDataSource implementation for RNTuple.
void AddField(const ROOT::RNTupleDescriptor &desc, std::string_view colName, ROOT::DescriptorId_t fieldId, std::vector< RFieldInfo > fieldInfos, bool convertToRVec=true)
Provides the RDF column "colName" given the field identified by fieldID.
std::vector< std::pair< ULong64_t, ULong64_t > > GetEntryRanges() final
Return ranges of entries to distribute to tasks.
void ExecStaging()
The main function of the fThreadStaging background thread.
std::vector< std::unique_ptr< ROOT::Internal::RPageSource > > fStagingArea
The staging area is relevant for chains of files, i.e.
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > GetColumnReaders(unsigned int, std::string_view, const std::type_info &) final
If the other GetColumnReaders overload returns an empty vector, this overload will be called instead.
std::vector< std::unique_ptr< ROOT::RFieldBase > > fProtoFields
We prepare a prototype field for every column.
void SetNSlots(unsigned int nSlots) final
Inform RDataSource of the number of processing slots (i.e.
ROOT::RFieldBase * GetFieldWithTypeChecks(std::string_view fieldName, const std::type_info &tid)
std::vector< std::string > fFileNames
void InitSlot(unsigned int slot, ULong64_t firstEntry) final
Convenience method called at the start of the data processing associated to a slot.
RNTupleDS(std::unique_ptr< ROOT::Internal::RPageSource > pageSource)
std::string GetTypeName(std::string_view colName) const final
Type of a column as a string, e.g.
std::unordered_map< ROOT::DescriptorId_t, std::string > fFieldId2QualifiedName
Connects the IDs of active proto fields and their subfields to their fully qualified name (a....
std::string fNTupleName
The data source may be constructed with an ntuple name and a list of files.
void PrepareNextRanges()
Populates fNextRanges with the next set of entry ranges.
void StageNextSources()
Starting from fNextFileIndex, opens the next fNSlots files.
void Finalize() final
Convenience method called after concluding an event-loop.
std::vector< std::string > fColumnTypes
void Initialize() final
Convenience method called before starting an event-loop.
std::vector< std::string > fColumnNames
bool HasColumn(std::string_view colName) const final
Checks if the dataset has a certain column.
Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) final
type-erased vector of pointers to pointers to column values - one per slot
void FinalizeSlot(unsigned int slot) final
Convenience method called at the end of the data processing associated to a slot.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree ,...
Base class for all ROOT issued exceptions.
The list of column representations a field can have.
A field translates read and write calls from/to underlying columns to/from tree values.
ROOT::Internal::RColumn * fPrincipalColumn
All fields that have columns have a distinct main column.
RConstSchemaIterator cbegin() const
const std::string & GetFieldName() const
static RResult< std::unique_ptr< RFieldBase > > Create(const std::string &fieldName, const std::string &typeName, const ROOT::RCreateFieldOptions &options, const ROOT::RNTupleDescriptor *desc, ROOT::DescriptorId_t fieldId)
Factory method to resurrect a field from the stored on-disk type information.
ROOT::DescriptorId_t GetOnDiskId() const
std::unique_ptr< RFieldBase > Clone(std::string_view newName) const
Copies the field and its subfields using a possibly new name and a new, unconnected set of columns.
The container field for an ntuple model, which itself has no physical representation.
The on-storage metadata of an RNTuple.
RFieldDescriptorIterable GetFieldIterable(const RFieldDescriptor &fieldDesc) const
const RFieldDescriptor & GetFieldDescriptor(ROOT::DescriptorId_t fieldId) const
ROOT::DescriptorId_t FindFieldId(std::string_view fieldName, ROOT::DescriptorId_t parentId) const
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Common user-tunable settings for reading RNTuples.
const_iterator begin() const
const_iterator end() const
virtual const char * Getenv(const char *env)
Get environment variable.
std::string TypeID2TypeName(const std::type_info &id)
Returns the name of a type starting from its type_info. An empty string is returned in case of failure...
ROOT::RDataFrame FromRNTuple(std::string_view ntupleName, const std::vector< std::string > &fileNames, const std::pair< ULong64_t, ULong64_t > &range)
Internal overload of the function that allows passing a range of entries.
std::pair< std::vector< ROOT::Internal::RNTupleClusterBoundaries >, ROOT::NTupleSize_t > GetClustersAndEntries(std::string_view ntupleName, std::string_view location)
Retrieves the cluster boundaries and the number of entries for the input RNTuple.
void SetAllowFieldSubstitutions(RFieldZero &fieldZero, bool val)
void CallConnectPageSourceOnField(RFieldBase &, ROOT::Internal::RPageSource &)
std::vector< ROOT::Internal::RNTupleClusterBoundaries > GetClusterBoundaries(const RNTupleDescriptor &desc)
Return the cluster boundaries for each cluster in this RNTuple.
std::string GetRenormalizedTypeName(const std::string &metaNormalizedName)
Given a type name normalized by ROOT meta, renormalize it for RNTuple. E.g., insert the std:: prefix.
RDataFrame FromRNTuple(std::string_view ntupleName, std::string_view fileName)
std::vector< std::string > ColumnNames_t
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
ENTupleStructure
The fields in the RNTuple data model tree can carry different structural information about the type s...
Tag to let data sources use the native data type when creating a column reader.
The PrepareNextRanges() method populates the fNextRanges list with REntryRangeDS records.