29 if (!bufPage.fPage.IsNull()) {
46 "wall clock time spent in critical sections"),
48 "timeCpuCriticalSection",
"ns",
"CPU time spent in critical section")});
72 f.SetOnDiskId(fNFields);
75 for (
auto *
f : fields) {
77 for (
auto &descendant : *
f) {
78 connectField(descendant);
81 fBufferedColumns.resize(fNColumns);
86 return fInnerSink->GetDescriptor();
93 fInnerModel = model.
Clone();
94 fInnerSink->Init(*fInnerModel);
104 auto cloneAddField = [&](
const RFieldBase *field) {
105 auto cloned = field->Clone(field->GetFieldName());
107 fInnerModel->AddField(std::move(cloned));
110 auto cloneAddProjectedField = [&](
RFieldBase *field) {
111 auto cloned = field->
Clone(field->GetFieldName());
116 auto targetIt = cloned->
begin();
117 for (
auto &
f : *field)
118 fieldMap[&(*targetIt++)] = projectedFields.GetSourceField(&
f);
123 fInnerModel->Unfreeze();
125 std::back_inserter(innerChangeset.fAddedFields), cloneAddField);
127 std::back_inserter(innerChangeset.fAddedProjectedFields), cloneAddProjectedField);
128 fInnerModel->Freeze();
129 fInnerSink->UpdateSchema(innerChangeset, firstEntry);
136 fInnerSink->UpdateExtraTypeInfo(extraTypeInfo);
148 auto &zipItem = fBufferedColumns.at(colId).BufferPage(columnHandle);
149 zipItem.AllocateSealedPageBuf(page.
GetNBytes());
151 auto &sealedPage = fBufferedColumns.at(colId).RegisterSealedPage();
153 if (!fTaskScheduler) {
156 SealPage(page, element, GetWriteOptions().GetCompression(), zipItem.fBuf.get(),
false);
157 zipItem.fSealedPage = &sealedPage;
162 zipItem.fPage = ReservePage(columnHandle, page.
GetNElements());
167 fCounters->fParallelZip.SetValue(1);
170 fTaskScheduler->AddTask([
this, &zipItem, &sealedPage, &element] {
171 sealedPage = SealPage(zipItem.fPage, element, GetWriteOptions().GetCompression(), zipItem.fBuf.get());
172 zipItem.fSealedPage = &sealedPage;
191 std::vector<RSealedPageGroup> toCommit;
192 toCommit.reserve(fBufferedColumns.size());
193 for (
auto &bufColumn : fBufferedColumns) {
194 R__ASSERT(bufColumn.HasSealedPagesOnly());
195 const auto &sealedPages = bufColumn.GetSealedPages();
196 toCommit.emplace_back(bufColumn.GetHandle().fPhysicalId, sealedPages.cbegin(), sealedPages.cend());
199 std::uint64_t nbytes;
203 fInnerSink->CommitSealedPageV(toCommit);
205 nbytes = fInnerSink->CommitCluster(nNewEntries);
208 for (
auto &bufColumn : fBufferedColumns)
209 bufColumn.DropBufferedPages();
217 fInnerSink->CommitClusterGroup();
224 fInnerSink->CommitDataset();
230 return fInnerSink->ReservePage(columnHandle, nElements);
235 fInnerSink->ReleasePage(page);
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
winID h TVirtualViewer3D TVirtualGLPainter p
A collection of Counter objects with a name, a unit, and a description.
void ObserveMetrics(RNTupleMetrics &observee)
CounterPtrT MakeCounter(const std::string &name, Args &&... args)
A non thread-safe integral performance counter.
A counter for CPU ticks that is either thread-safe or non-thread-safe.
Record wall time and CPU time between construction and destruction.
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
RColumnElementBase * GetElement() const
RPageSink * GetPageSink() const
RPageStorage::SealedPageSequence_t fSealedPages
Pages that have been already sealed by a concurrent task.
std::deque< RPageZipItem > fBufferedPages
Using a deque guarantees that element iterators are never invalidated by appends to the end of the it...
RPageStorage::ColumnHandle_t fCol
void CommitDatasetImpl() final
std::uint64_t CommitCluster(NTupleSize_t nNewEntries) final
Finalize the current cluster and create a new one for the following data.
std::unique_ptr< RPageSink > fInnerSink
The inner sink, responsible for actually performing I/O.
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
RPageSinkBuf(std::unique_ptr< RPageSink > inner)
const RNTupleDescriptor & GetDescriptor() const final
Return the RNTupleDescriptor being constructed.
std::unique_ptr< RCounters > fCounters
ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) final
Register a new column.
void InitImpl(RNTupleModel &model) final
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final
Write a page to the storage. The column must have been added before.
void ConnectFields(const std::vector< RFieldBase * > &fields, NTupleSize_t firstEntry)
void CommitSealedPage(DescriptorId_t physicalColumnId, const RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo) final
Adds an extra type information record to schema.
An RAII wrapper used to synchronize a page sink. See GetSinkGuard().
Abstract interface to write data into an ntuple.
const RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
const std::string & GetNTupleName() const
Returns the NTuple name.
virtual void ReleasePage(RPage &page)=0
Every page store needs to be able to free pages it handed out.
Detail::RNTupleMetrics fMetrics
A page is a slice of a column that is mapped into memory.
std::uint32_t GetNBytes() const
The space taken by column elements in the buffer.
std::uint32_t GetNElements() const
Base class for all ROOT issued exceptions.
A field translates read and write calls from/to underlying columns to/from tree values.
std::vector< RFieldBase * > GetSubFields()
The on-storage meta-data of an ntuple.
Projected fields are fields whose columns are reused from existing fields.
std::unordered_map< const RFieldBase *, const RFieldBase * > FieldMap_t
The map keys are the projected target fields, the map values are the backing source fields. Note that ...
const RFieldBase * GetSourceField(const RFieldBase *target) const
The RNTupleModel encapsulates the schema of an ntuple.
std::unique_ptr< RNTupleModel > Clone() const
const RProjectedFields & GetProjectedFields() const
RFieldZero & GetFieldZero()
Non-const access to the root field is used to commit clusters during writing and to set the on-disk f...
virtual TObject * Clone(const char *newname="") const
Make a clone of an object using the Streamer facility.
void CallConnectPageSinkOnField(RFieldBase &, RPageSink &, NTupleSize_t firstEntry=0)
void Add(RHist< DIMENSIONS, PRECISION, STAT_TO... > &to, const RHist< DIMENSIONS, PRECISION, STAT_FROM... > &from)
Add two histograms.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
The incremental changes to an RNTupleModel.
std::vector< RFieldBase * > fAddedProjectedFields
Points to the projected fields in fModel that were added as part of an updater transaction.
std::vector< RFieldBase * > fAddedFields
Points to the fields in fModel that were added as part of an updater transaction.
I/O performance counters that get registered in fMetrics.
DescriptorId_t fPhysicalId
A sealed page contains the bytes of a page as written to storage (packed & compressed).