Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleFillContext.cxx
Go to the documentation of this file.
1/// \file RNTupleFillContext.cxx
2/// \ingroup NTuple
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2024-02-22
5
6/*************************************************************************
7 * Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. *
8 * All rights reserved. *
9 * *
10 * For the licensing terms see $ROOTSYS/LICENSE. *
11 * For the list of contributors see $ROOTSYS/README/CREDITS. *
12 *************************************************************************/
13
15
16#include <ROOT/RError.hxx>
17#include <ROOT/RFieldBase.hxx>
18#include <ROOT/RLogger.hxx>
20#include <ROOT/RNTupleModel.hxx>
21#include <ROOT/RNTupleUtils.hxx>
23#include <ROOT/RPageStorage.hxx>
24
25#include <algorithm>
26#include <utility>
27
28ROOT::RNTupleFillContext::RNTupleFillContext(std::unique_ptr<ROOT::RNTupleModel> model,
29 std::unique_ptr<ROOT::Internal::RPageSink> sink)
30 : fSink(std::move(sink)), fModel(std::move(model)), fMetrics("RNTupleFillContext")
31{
32 fModel->Freeze();
33 fSink->Init(*fModel);
34 fMetrics.ObserveMetrics(fSink->GetMetrics());
35
36 const auto &writeOpts = fSink->GetWriteOptions();
37 fMaxUnzippedClusterSize = writeOpts.GetMaxUnzippedClusterSize();
38 // First estimate is a factor 2 compression if compression is used at all
39 const int scale = writeOpts.GetCompression() ? 2 : 1;
40 fUnzippedClusterSizeEst = scale * writeOpts.GetApproxZippedClusterSize();
41}
42
44{
45 try {
47 } catch (const RException &err) {
48 R__LOG_ERROR(ROOT::Internal::NTupleLog()) << "failure flushing cluster: " << err.GetError().GetReport();
49 }
50
51 if (!fStagedClusters.empty()) {
53 << std::to_string(fStagedClusters.size()) << " staged clusters still pending, their data is lost";
54 }
55}
56
63
65{
66 if (fNEntries == fLastFlushed) {
67 return;
68 }
69 for (auto &field : ROOT::Internal::GetFieldZeroOfModel(*fModel)) {
71 }
72 auto nEntriesInCluster = fNEntries - fLastFlushed;
74 auto stagedCluster = fSink->StageCluster(nEntriesInCluster);
75 fNBytesFlushed += stagedCluster.fNBytesWritten;
76 fStagedClusters.push_back(std::move(stagedCluster));
77 } else {
78 fNBytesFlushed += fSink->CommitCluster(nEntriesInCluster);
79 }
81
82 // Cap the compression factor at 1000 to prevent overflow of fUnzippedClusterSizeEst
83 const float compressionFactor =
84 std::min(1000.f, static_cast<float>(fNBytesFilled) / static_cast<float>(fNBytesFlushed));
86 compressionFactor * static_cast<float>(fSink->GetWriteOptions().GetApproxZippedClusterSize());
87
90}
91
93{
94 if (fStagedClusters.empty()) {
95 return;
96 }
97 if (fModel->IsExpired()) {
98 throw RException(R__FAIL("invalid attempt to commit staged clusters after dataset was committed"));
99 }
100
101 fSink->CommitStagedClusters(fStagedClusters);
102 fStagedClusters.clear();
103}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:300
#define R__LOG_ERROR(...)
Definition RLogger.hxx:356
std::string GetReport() const
Format a dignostics report, e.g. for an exception message.
Definition RError.cxx:21
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
const RError & GetError() const
Definition RError.hxx:84
void FlushCluster()
Flush so far filled entries to storage.
ROOT::NTupleSize_t fLastFlushed
std::size_t fUnzippedClusterSizeEst
Estimator of uncompressed cluster size, taking into account the estimated compression ratio.
std::uint64_t fNBytesFilled
The total number of bytes filled into all the so far committed clusters, i.e.
Experimental::Detail::RNTupleMetrics fMetrics
std::uint64_t fNBytesFlushed
The total number of bytes written to storage (i.e., after compression)
std::vector< ROOT::Internal::RPageSink::RStagedCluster > fStagedClusters
Vector of currently staged clusters.
void FlushColumns()
Flush column data, preparing for CommitCluster or to reduce memory usage.
RNTupleFillContext(std::unique_ptr< ROOT::RNTupleModel > model, std::unique_ptr< ROOT::Internal::RPageSink > sink)
std::size_t fUnzippedClusterSize
Keeps track of the number of bytes written into the current cluster.
void CommitStagedClusters()
Logically append staged clusters to the RNTuple.
bool fStagedClusterCommitting
Whether to enable staged cluster committing, where only an explicit call to CommitStagedClusters() wi...
std::unique_ptr< ROOT::RNTupleModel > fModel
Needs to be destructed before fSink.
std::unique_ptr< ROOT::Internal::RPageSink > fSink
std::size_t fMaxUnzippedClusterSize
Limit for committing cluster no matter the other tunables.
ROOT::RFieldZero & GetFieldZeroOfModel(RNTupleModel &model)
ROOT::RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
void CallCommitClusterOnField(RFieldBase &)
void CallFlushColumnsOnField(RFieldBase &)