The RDataSource implementation for RNTuple.
It lets RDataFrame read RNTuple data.
An RDataFrame that reads RNTuple data can be constructed using FromRNTuple().
For each column containing an array or a collection, a corresponding column #colname
is available to access colname.size()
without reading and deserializing the collection values.
Definition at line 46 of file RNTupleDS.hxx.
Classes | |
struct | REntryRangeDS |
The PrepareNextRanges() method populates the fNextRanges list with REntryRangeDS records. More... | |
struct | RFieldInfo |
Holds useful information about fields added to the RNTupleDS. More... | |
Public Member Functions | |
RNTupleDS (ROOT::RNTuple *ntuple) | |
RNTupleDS (std::string_view ntupleName, const std::vector< std::string > &fileNames) | |
RNTupleDS (std::string_view ntupleName, std::string_view fileName) | |
~RNTupleDS () | |
void | Finalize () final |
Convenience method called after concluding an event-loop. | |
void | FinalizeSlot (unsigned int slot) final |
Convenience method called at the end of the data processing associated to a slot. | |
const std::vector< std::string > & | GetColumnNames () const final |
Returns a reference to the collection of the dataset's column names. | |
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > | GetColumnReaders (unsigned int, std::string_view, const std::type_info &) final |
If the other GetColumnReaders overload returns an empty vector, this overload will be called instead. | |
std::vector< std::pair< ULong64_t, ULong64_t > > | GetEntryRanges () final |
Return ranges of entries to distribute to tasks. | |
std::string | GetLabel () final |
Return a string representation of the datasource type. | |
std::size_t | GetNFiles () const final |
Returns the number of files from which the dataset is constructed. | |
std::string | GetTypeName (std::string_view colName) const final |
Type of a column as a string, e.g. | |
bool | HasColumn (std::string_view colName) const final |
Checks if the dataset has a certain column. | |
void | Initialize () final |
Convenience method called before starting an event-loop. | |
void | InitSlot (unsigned int slot, ULong64_t firstEntry) final |
Convenience method called at the start of the data processing associated to a slot. | |
bool | SetEntry (unsigned int, ULong64_t) final |
Advance the "cursors" returned by GetColumnReaders to the selected entry for a particular slot. | |
void | SetNSlots (unsigned int nSlots) final |
Inform RDataSource of the number of processing slots (i.e. | |
Public Member Functions inherited from ROOT::RDF::RDataSource | |
virtual | ~RDataSource ()=default |
template<typename T > | |
std::vector< T ** > | GetColumnReaders (std::string_view columnName) |
Called at most once per column by RDF. | |
Protected Member Functions | |
Record_t | GetColumnReadersImpl (std::string_view name, const std::type_info &) final |
type-erased vector of pointers to pointers to column values - one per slot | |
Protected Member Functions inherited from ROOT::RDF::RDataSource | |
virtual std::string | AsString () |
Private Member Functions | |
RNTupleDS (std::unique_ptr< ROOT::Experimental::Internal::RPageSource > pageSource) | |
void | AddField (const RNTupleDescriptor &desc, std::string_view colName, DescriptorId_t fieldId, std::vector< RFieldInfo > fieldInfos) |
Provides the RDF column "colName" given the field identified by fieldID. | |
void | ExecStaging () |
The main function of the fThreadStaging background thread. | |
void | PrepareNextRanges () |
Populates fNextRanges with the next set of entry ranges. | |
void | StageNextSources () |
Starting from fNextFileIndex , opens the next fNSlots files. | |
Private Attributes | |
std::vector< std::vector< Internal::RNTupleColumnReader * > > | fActiveColumnReaders |
List of column readers returned by GetColumnReaders() organized by slot. | |
std::vector< std::string > | fColumnNames |
std::vector< std::string > | fColumnTypes |
std::vector< REntryRangeDS > | fCurrentRanges |
Basis for the ranges returned by the last GetEntryRanges() call. | |
std::condition_variable | fCvStaging |
Signal for the state information of fIsReadyForStaging and fHasNextSources. | |
std::unordered_map< ROOT::Experimental::DescriptorId_t, std::string > | fFieldId2QualifiedName |
Connects the IDs of active proto fields and their subfields to their fully qualified name (a.b.c.d). | |
std::vector< std::string > | fFileNames |
std::unordered_map< ULong64_t, std::size_t > | fFirstEntry2RangeIdx |
Maps the first entries from the ranges of the last GetEntryRanges() call to their corresponding index in the fCurrentRanges vectors. | |
bool | fHasNextSources = false |
Is true when the staging thread has populated the next batch of files to fStagingArea. | |
bool | fIsReadyForStaging = false |
Is true when the staging thread should start working. | |
std::mutex | fMutexStaging |
Protects the shared state between the main thread and the I/O thread. | |
std::size_t | fNextFileIndex = 0 |
Index into fFileNames to the next file to process. | |
std::vector< REntryRangeDS > | fNextRanges |
Basis for the ranges populated by the PrepareNextRanges() call. | |
unsigned int | fNSlots = 0 |
std::string | fNTupleName |
The data source may be constructed with an ntuple name and a list of files. | |
std::unique_ptr< RNTupleDescriptor > | fPrincipalDescriptor |
A clone of the first pages source's descriptor. | |
std::vector< std::unique_ptr< ROOT::Experimental::RFieldBase > > | fProtoFields |
We prepare a prototype field for every column. | |
ULong64_t | fSeenEntries = 0 |
The number of entries so far returned by GetEntryRanges() | |
std::vector< std::unique_ptr< ROOT::Experimental::Internal::RPageSource > > | fStagingArea |
The staging area is relevant for chains of files, i.e. | |
bool | fStagingThreadShouldTerminate = false |
Is true when the I/O thread should quit. | |
std::thread | fThreadStaging |
The background thread that runs StageNextSources() | |
Friends | |
class | Internal::RNTupleColumnReader |
Additional Inherited Members | |
Protected Types inherited from ROOT::RDF::RDataSource | |
using | Record_t = std::vector< void * > |
#include <ROOT/RNTupleDS.hxx>
|
explicitprivate |
Definition at line 372 of file RNTupleDS.cxx.
ROOT::Experimental::RNTupleDS::RNTupleDS | ( | std::string_view | ntupleName, |
std::string_view | fileName | ||
) |
Definition at line 409 of file RNTupleDS.cxx.
ROOT::Experimental::RNTupleDS::RNTupleDS | ( | ROOT::RNTuple * | ntuple | ) |
Definition at line 414 of file RNTupleDS.cxx.
ROOT::Experimental::RNTupleDS::RNTupleDS | ( | std::string_view | ntupleName, |
const std::vector< std::string > & | fileNames | ||
) |
Definition at line 419 of file RNTupleDS.cxx.
|
default |
|
private |
Provides the RDF column "colName" given the field identified by fieldID.
For records and collections, AddField recurses into the sub fields. The fieldInfos argument is a list of objects holding info about the fields of the outer collection(s) (w.r.t. fieldId). For instance, if fieldId refers to an std::vector<Jet>
, with struct Jet { float pt; float eta; }; AddField will recurse into Jet.pt and Jet.eta and provide the two inner fields as std::vector<float> each.
Definition at line 228 of file RNTupleDS.cxx.
|
private |
The main function of the fThreadStaging background thread.
Definition at line 454 of file RNTupleDS.cxx.
|
finalvirtual |
Convenience method called after concluding an event-loop.
See Initialize for more details.
Reimplemented from ROOT::RDF::RDataSource.
Definition at line 716 of file RNTupleDS.cxx.
|
finalvirtual |
Convenience method called at the end of the data processing associated to a slot.
[in] | slot | The data processing slot wihch needs to be finalized This method might be called multiple times per thread per event-loop. |
Reimplemented from ROOT::RDF::RDataSource.
Definition at line 673 of file RNTupleDS.cxx.
|
inlinefinalvirtual |
Returns a reference to the collection of the dataset's column names.
Implements ROOT::RDF::RDataSource.
Definition at line 162 of file RNTupleDS.hxx.
|
finalvirtual |
If the other GetColumnReaders overload returns an empty vector, this overload will be called instead.
[in] | slot | The data processing slot that needs to be considered |
[in] | name | The name of the column for which a column reader needs to be returned |
[in] | tid | A type_info At least one of the two must return a non-empty/non-null value. |
Reimplemented from ROOT::RDF::RDataSource.
Definition at line 434 of file RNTupleDS.cxx.
|
finalprotectedvirtual |
type-erased vector of pointers to pointers to column values - one per slot
Implements ROOT::RDF::RDataSource.
Definition at line 427 of file RNTupleDS.cxx.
|
finalvirtual |
Return ranges of entries to distribute to tasks.
They are required to be contiguous intervals with no entries skipped. Supposing a dataset with nEntries, the intervals must start at 0 and end at nEntries, e.g. [0-5],[5-10] for 10 entries. This function will be invoked repeatedly by RDataFrame as it needs additional entries to process. The same entry range should not be returned more than once. Returning an empty collection of ranges signals to RDataFrame that the processing can stop.
Implements ROOT::RDF::RDataSource.
Definition at line 584 of file RNTupleDS.cxx.
|
inlinefinalvirtual |
Return a string representation of the datasource type.
The returned string will be used by ROOT::RDF::SaveGraph() to represent the datasource in the visualization of the computation graph. Concrete datasources can override the default implementation.
Reimplemented from ROOT::RDF::RDataSource.
Definition at line 166 of file RNTupleDS.hxx.
|
inlinefinalvirtual |
Returns the number of files from which the dataset is constructed.
Reimplemented from ROOT::RDF::RDataSource.
Definition at line 161 of file RNTupleDS.hxx.
|
finalvirtual |
Type of a column as a string, e.g.
GetTypeName("x") == "double"
. Required for jitting e.g. df.Filter("x>0")
.
[in] | colName | The name of the column |
Implements ROOT::RDF::RDataSource.
Definition at line 683 of file RNTupleDS.cxx.
|
finalvirtual |
Checks if the dataset has a certain column.
[in] | colName | The name of the column |
Implements ROOT::RDF::RDataSource.
Definition at line 689 of file RNTupleDS.cxx.
|
finalvirtual |
Convenience method called before starting an event-loop.
This method might be called multiple times over the lifetime of a RDataSource, since users can run multiple event-loops with the same RDataFrame. Ideally, Initialize
should set the state of the RDataSource so that multiple identical event-loops will produce identical results.
Reimplemented from ROOT::RDF::RDataSource.
Definition at line 694 of file RNTupleDS.cxx.
Convenience method called at the start of the data processing associated to a slot.
[in] | slot | The data processing slot wihch needs to be initialized |
[in] | firstEntry | The first entry of the range that the task will process. This method might be called multiple times per thread per event-loop. |
Reimplemented from ROOT::RDF::RDataSource.
Definition at line 662 of file RNTupleDS.cxx.
|
private |
Populates fNextRanges with the next set of entry ranges.
Moves files from the staging area as necessary and aligns ranges with cluster boundaries for scheduling the tail of files. Upon return, the fNextRanges list is ordered. It has usually fNSlots elements; fewer if there is not enough work to give at least one cluster to every slot.
Definition at line 489 of file RNTupleDS.cxx.
|
inlinefinalvirtual |
Advance the "cursors" returned by GetColumnReaders to the selected entry for a particular slot.
[in] | slot | The data processing slot that needs to be considered |
[in] | entry | The entry which needs to be pointed to by the reader pointers Slots are adopted to accommodate parallel data processing. Different workers will loop over different ranges and will be labelled by different "slot" values. Returns true if the entry has to be processed, false otherwise. |
Implements ROOT::RDF::RDataSource.
Definition at line 177 of file RNTupleDS.hxx.
|
finalvirtual |
Inform RDataSource of the number of processing slots (i.e.
worker threads) used by the associated RDataFrame. Slots numbers are used to simplify parallel execution: RDataFrame guarantees that different threads will always pass different slot values when calling methods concurrently.
Implements ROOT::RDF::RDataSource.
Definition at line 739 of file RNTupleDS.cxx.
|
private |
Starting from fNextFileIndex
, opens the next fNSlots
files.
Calls LoadStructure()
on the opened files. The very first file is already available from the constructor.
Definition at line 472 of file RNTupleDS.cxx.
|
friend |
Definition at line 47 of file RNTupleDS.hxx.
|
private |
List of column readers returned by GetColumnReaders() organized by slot.
Used to reconnect readers to new page sources when the files in the chain change.
Definition at line 97 of file RNTupleDS.hxx.
|
private |
Definition at line 93 of file RNTupleDS.hxx.
|
private |
Definition at line 94 of file RNTupleDS.hxx.
|
private |
Basis for the ranges returned by the last GetEntryRanges() call.
Definition at line 101 of file RNTupleDS.hxx.
|
private |
Signal for the state information of fIsReadyForStaging and fHasNextSources.
Definition at line 113 of file RNTupleDS.hxx.
|
private |
Connects the IDs of active proto fields and their subfields to their fully qualified name (a.b.c.d).
This enables the column reader to rewire the field IDs when the file changes (chain), using the fully qualified name as a search key in the descriptor of the other page sources.
Definition at line 92 of file RNTupleDS.hxx.
|
private |
Definition at line 64 of file RNTupleDS.hxx.
|
private |
Maps the first entries from the ranges of the last GetEntryRanges() call to their corresponding index in the fCurrentRanges vectors.
This is necessary because the returned ranges get distributed arbitrarily onto slots. In the InitSlot method, the column readers use this map to find the correct range to connect to.
Definition at line 106 of file RNTupleDS.hxx.
|
private |
Is true when the staging thread has populated the next batch of files to fStagingArea.
Definition at line 117 of file RNTupleDS.hxx.
|
private |
Is true when the staging thread should start working.
Definition at line 115 of file RNTupleDS.hxx.
|
private |
Protects the shared state between the main thread and the I/O thread.
Definition at line 111 of file RNTupleDS.hxx.
|
private |
Index into fFileNames to the next file to process.
Definition at line 82 of file RNTupleDS.hxx.
|
private |
Basis for the ranges populated by the PrepareNextRanges() call.
Definition at line 102 of file RNTupleDS.hxx.
|
private |
Definition at line 99 of file RNTupleDS.hxx.
|
private |
The data source may be constructed with an ntuple name and a list of files.
Definition at line 63 of file RNTupleDS.hxx.
|
private |
A clone of the first pages source's descriptor.
Definition at line 60 of file RNTupleDS.hxx.
|
private |
We prepare a prototype field for every column.
If a column reader is actually requested in GetColumnReaders(), we move a clone of the field into a new column reader for RDataFrame. Only the clone connects to the backing page store and acquires I/O resources. The field IDs are set in the context of the first source and used as keys in fFieldId2QualifiedName.
Definition at line 88 of file RNTupleDS.hxx.
|
private |
The number of entries so far returned by GetEntryRanges()
Definition at line 100 of file RNTupleDS.hxx.
|
private |
The staging area is relevant for chains of files, i.e.
when fFileNames is not empty. In this case, files are opened in the background in batches of size fNSlots
and kept in the staging area. The first file (chains or no chains) is always opened on construction in order to process the schema. For all subsequent files, the corresponding page sources in the staging area only executed LoadStructure()
, i.e. they should have a compressed buffer of the meta-data available. Concretely:
Initialize()
, we start the I/O background thread, which in turn opens the first batch of files.GetEntryRanges()
, we a) wait for the I/O thread to finish, b) call PrepareNextRanges()
in the main thread to move the page sources from the staging area into fNextRanges
; this will also call Attach()
on the page sources (i.e., deserialize the meta-data), and c) trigger staging of the next batch of files in the I/O background thread.Finalize()
, the I/O background thread is stopped. Definition at line 81 of file RNTupleDS.hxx.
|
private |
Is true when the I/O thread should quit.
Definition at line 119 of file RNTupleDS.hxx.
|
private |
The background thread that runs StageNextSources()
Definition at line 109 of file RNTupleDS.hxx.