Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleFillContext.cxx
Go to the documentation of this file.
1/// \file RNTupleFillContext.cxx
2/// \author Jakob Blomer <jblomer@cern.ch>
3/// \date 2024-02-22
4
5/*************************************************************************
6 * Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
14
15#include <ROOT/RError.hxx>
16#include <ROOT/RFieldBase.hxx>
17#include <ROOT/RLogger.hxx>
19#include <ROOT/RNTupleModel.hxx>
20#include <ROOT/RNTupleUtils.hxx>
22#include <ROOT/RPageStorage.hxx>
23
24#include <algorithm>
25#include <utility>
26
27ROOT::RNTupleFillContext::RNTupleFillContext(std::unique_ptr<ROOT::RNTupleModel> model,
28 std::unique_ptr<ROOT::Internal::RPageSink> sink)
29 : fSink(std::move(sink)), fModel(std::move(model)), fMetrics("RNTupleFillContext")
30{
31 fModel->Freeze();
32 fSink->Init(*fModel);
33 fMetrics.ObserveMetrics(fSink->GetMetrics());
34
35 const auto &writeOpts = fSink->GetWriteOptions();
36 fMaxUnzippedClusterSize = writeOpts.GetMaxUnzippedClusterSize();
37 // First estimate is a factor 2 compression if compression is used at all
38 const int scale = writeOpts.GetCompression() ? 2 : 1;
39 fUnzippedClusterSizeEst = scale * writeOpts.GetApproxZippedClusterSize();
40}
41
43{
44 try {
45 FlushCluster();
46 } catch (const RException &err) {
47 R__LOG_ERROR(ROOT::Internal::NTupleLog()) << "failure flushing cluster: " << err.GetError().GetReport();
48 }
49
50 if (!fStagedClusters.empty()) {
52 << std::to_string(fStagedClusters.size()) << " staged clusters still pending, their data is lost";
53 }
54}
55
62
64{
65 if (fNEntries == fLastFlushed) {
66 return;
67 }
68 for (auto &field : ROOT::Internal::GetFieldZeroOfModel(*fModel)) {
70 }
71 auto nEntriesInCluster = fNEntries - fLastFlushed;
72 if (fStagedClusterCommitting) {
73 auto stagedCluster = fSink->StageCluster(nEntriesInCluster);
74 fNBytesFlushed += stagedCluster.fNBytesWritten;
75 fStagedClusters.push_back(std::move(stagedCluster));
76 } else {
77 fNBytesFlushed += fSink->CommitCluster(nEntriesInCluster);
78 }
79 fNBytesFilled += fUnzippedClusterSize;
80
81 // Cap the compression factor at 1000 to prevent overflow of fUnzippedClusterSizeEst
82 const float compressionFactor =
83 std::min(1000.f, static_cast<float>(fNBytesFilled) / static_cast<float>(fNBytesFlushed));
84 fUnzippedClusterSizeEst =
85 compressionFactor * static_cast<float>(fSink->GetWriteOptions().GetApproxZippedClusterSize());
86
87 fLastFlushed = fNEntries;
88 fUnzippedClusterSize = 0;
89}
90
92{
93 if (fStagedClusters.empty()) {
94 return;
95 }
96 if (fModel->IsExpired()) {
97 throw RException(R__FAIL("invalid attempt to commit staged clusters after dataset was committed"));
98 }
99
100 fSink->CommitStagedClusters(fStagedClusters);
101 fStagedClusters.clear();
102}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:299
#define R__LOG_ERROR(...)
Definition RLogger.hxx:356
void ObserveMetrics(RNTupleMetrics &observee)
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
const RError & GetError() const
Definition RError.hxx:83
void FlushCluster()
Flush so far filled entries to storage.
std::size_t fUnzippedClusterSizeEst
Estimator of uncompressed cluster size, taking into account the estimated compression ratio.
Experimental::Detail::RNTupleMetrics fMetrics
void FlushColumns()
Flush column data, preparing for CommitCluster or to reduce memory usage.
RNTupleFillContext(std::unique_ptr< ROOT::RNTupleModel > model, std::unique_ptr< ROOT::Internal::RPageSink > sink)
void CommitStagedClusters()
Logically append staged clusters to the RNTuple.
std::unique_ptr< ROOT::RNTupleModel > fModel
Needs to be destructed before fSink.
std::unique_ptr< ROOT::Internal::RPageSink > fSink
std::size_t fMaxUnzippedClusterSize
Limit for committing cluster no matter the other tunables.
ROOT::RFieldZero & GetFieldZeroOfModel(RNTupleModel &model)
ROOT::RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
void CallCommitClusterOnField(RFieldBase &)
void CallFlushColumnsOnField(RFieldBase &)