Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleExporter.cxx
Go to the documentation of this file.
1/// \file RNTupleExporter.cxx
2/// \author Giacomo Parolini <giacomo.parolini@cern.ch>
3/// \date 2024-12-10
4/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
5/// is welcome!
6
7/*************************************************************************
8 * Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. *
9 * All rights reserved. *
10 * *
11 * For the licensing terms see $ROOTSYS/LICENSE. *
12 * For the list of contributors see $ROOTSYS/README/CREDITS. *
13 *************************************************************************/
14
15#include <ROOT/RError.hxx>
17#include <ROOT/RPageStorage.hxx>
19#include <ROOT/RNTupleZip.hxx>
20#include <ROOT/RClusterPool.hxx>
21#include <ROOT/RLogger.hxx>
22#include <fstream>
23#include <sstream>
24#include <vector>
25
27
28namespace {
29
30ROOT::RLogChannel &RNTupleExporterLog()
31{
32 static RLogChannel sLog("ROOT.RNTupleExporter");
33 return sLog;
34}
35
36struct RColumnExportInfo {
37 const ROOT::RColumnDescriptor *fColDesc;
38 const ROOT::RFieldDescriptor *fFieldDesc;
39 std::string fQualifiedName;
40
41 RColumnExportInfo(const ROOT::RNTupleDescriptor &desc, const ROOT::RColumnDescriptor &colDesc,
42 const ROOT::RFieldDescriptor &fieldDesc)
43 : fColDesc(&colDesc),
44 fFieldDesc(&fieldDesc),
45 // NOTE: we don't need to keep the column representation index into account because exactly 1 representation
46 // is active per page, so there is no risk of name collisions.
47 fQualifiedName(desc.GetQualifiedFieldName(fieldDesc.GetId()) + '-' + std::to_string(colDesc.GetIndex()))
48 {
49 }
50};
51
/// Running tally produced while collecting columns; fNColsTotal counts every column that was visited.
struct RAddColumnsResult {
   int fNColsTotal = 0;

   /// Accumulates the counter of `rhs` into this object and returns this object.
   RAddColumnsResult &operator+=(const RAddColumnsResult &rhs)
   {
      fNColsTotal = fNColsTotal + rhs.fNColsTotal;
      return *this;
   }
};
61
62template <typename T>
63bool ItemIsFilteredOut(const RNTupleExporter::RFilter<T> &filter, const T &item)
64{
65 bool filterHasType = filter.fSet.find(item) != filter.fSet.end();
66 bool isFiltered = (filter.fType == RNTupleExporter::EFilterType::kBlacklist) == filterHasType;
67 return isFiltered;
68}
69
70RAddColumnsResult AddColumnsFromField(std::vector<RColumnExportInfo> &vec, const ROOT::RNTupleDescriptor &desc,
71 const ROOT::RFieldDescriptor &fieldDesc,
72 const RNTupleExporter::RPagesOptions &options)
73{
74 R__LOG_DEBUG(1, RNTupleExporterLog()) << "processing field \"" << desc.GetQualifiedFieldName(fieldDesc.GetId())
75 << "\"";
76
77 RAddColumnsResult res{};
78
79 for (const auto &subfieldDesc : desc.GetFieldIterable(fieldDesc)) {
80 if (subfieldDesc.IsProjectedField())
81 continue;
82
83 for (const auto &colDesc : desc.GetColumnIterable(subfieldDesc)) {
84 // Filter columns by type
85 bool typeIsFiltered = ItemIsFilteredOut(options.fColumnTypeFilter, colDesc.GetType());
86 if (!typeIsFiltered)
87 vec.emplace_back(desc, colDesc, subfieldDesc);
88 res.fNColsTotal += 1;
89 }
90 res += AddColumnsFromField(vec, desc, subfieldDesc, options);
91 }
92
93 return res;
94}
95
96int CountPages(const ROOT::RNTupleDescriptor &desc, std::span<const RColumnExportInfo> columns)
97{
98 int nPages = 0;
99 auto clusterId = desc.FindClusterId(0, 0);
100 while (clusterId != kInvalidDescriptorId) {
101 const auto &clusterDesc = desc.GetClusterDescriptor(clusterId);
102 for (const auto &colInfo : columns) {
103 const auto &pages = clusterDesc.GetPageRange(colInfo.fColDesc->GetPhysicalId());
104 nPages += pages.GetPageInfos().size();
105 }
106 clusterId = desc.FindNextClusterId(clusterId);
107 }
108 return nPages;
109}
110
111} // namespace
112
115{
 // Exports every page of the selected columns of `source` to its own file below options.fOutputPath
 // and returns the names of the written files in res.fExportedFileNames.
 // NOTE(review): the function signature and the condition guarding the throw below are not visible in
 // this listing; the message indicates that requesting checksums together with decompression is rejected.
117 throw ROOT::RException(R__FAIL("exporting checksums is incompatible with decompressing the pages"));
118
119 RPagesResult res = {};
120
121 // make sure the source is attached
122 source.Attach();
123
124 auto desc = source.GetSharedDescriptorGuard();
125 ROOT::Internal::RClusterPool clusterPool{source};
126
127 // Collect column info
128 std::vector<RColumnExportInfo> columnInfos;
129 const RAddColumnsResult addColRes = AddColumnsFromField(columnInfos, desc.GetRef(), desc->GetFieldZero(), options);
130
131 // Collect ColumnSet for the cluster pool query
 // NOTE(review): the declaration of `columnSet` is not visible in this listing.
133 columnSet.reserve(columnInfos.size());
134 for (const auto &colInfo : columnInfos) {
135 columnSet.emplace(colInfo.fColDesc->GetPhysicalId());
136 }
137
138 const auto nPages = CountPages(desc.GetRef(), columnInfos);
139
140 const bool showProgress = (options.fFlags & RPagesOptions::kShowProgressBar) != 0;
141 res.fExportedFileNames.reserve(nPages);
142
143 // Iterate over the clusters in order and dump pages
 // When there are no pages to export, the cluster walk is skipped entirely.
144 auto clusterId = nPages > 0 ? desc->FindClusterId(0, 0) : ROOT::kInvalidDescriptorId;
145 int pagesExported = 0;
146 int prevIntPercent = 0;
147 std::vector<char> unzipBuf; // Only used when pages get decompressed
148 while (clusterId != ROOT::kInvalidDescriptorId) {
149 const auto &clusterDesc = desc->GetClusterDescriptor(clusterId);
150 const ROOT::Internal::RCluster *cluster = clusterPool.GetCluster(clusterId, columnSet);
151 for (const auto &colInfo : columnInfos) {
152 auto columnId = colInfo.fColDesc->GetPhysicalId();
153 const auto &pages = clusterDesc.GetPageRange(columnId);
154 const auto &colRange = clusterDesc.GetColumnRange(columnId);
155 auto colElement = ROOT::Internal::RColumnElementBase::Generate<void>(colInfo.fColDesc->GetType());
156 colElement->SetBitsOnStorage(colInfo.fColDesc->GetBitsOnStorage());
157
 // Index of the page within this column and cluster; used for the on-disk page key and the file name.
158 std::uint64_t pageIdx = 0;
159
160 R__LOG_DEBUG(0, RNTupleExporterLog())
161 << "exporting column \"" << colInfo.fQualifiedName << "\" (" << pages.GetPageInfos().size() << " pages)";
162
163 // We should never try to export a suppressed column range
164 assert(!colRange.IsSuppressed() || pages.GetPageInfos().empty());
165
166 for (const auto &pageInfo : pages.GetPageInfos()) {
167 ROOT::Internal::ROnDiskPage::Key key{columnId, pageIdx};
168 const ROOT::Internal::ROnDiskPage *onDiskPage = cluster->GetOnDiskPage(key);
169
170 // prepare the output file
 // std::ios_base::ate positions the stream at the end of the initial string, so the file name
 // is appended after options.fOutputPath.
171 std::ostringstream ss{options.fOutputPath, std::ios_base::ate};
 // The compression settings are dereferenced when building the file name below.
172 assert(colRange.GetCompressionSettings());
173 ss << "/cluster_" << clusterDesc.GetId() << "_" << colInfo.fQualifiedName << "_page_" << pageIdx
174 << "_elems_" << pageInfo.GetNElements() << "_comp_" << *colRange.GetCompressionSettings() << ".page";
175 const auto outFileName = ss.str();
176 std::ofstream outFile{outFileName, std::ios_base::binary};
177 if (!outFile) {
178 throw ROOT::RException(
179 R__FAIL(std::string("output path ") + options.fOutputPath + " does not exist or is not writable!"));
180 }
181
182 // dump the page
183 const auto *pageBuf = static_cast<const char *>(onDiskPage->GetAddress());
184 if (options.fFlags & RPagesOptions::kDecompress) {
 // Decompress into the buffer shared across pages; it only grows, it is never shrunk.
185 const auto nbytesPacked = colElement->GetPackedSize(pageInfo.GetNElements());
186 const auto nbytesData = pageInfo.GetLocator().GetNBytesOnStorage();
187 if (unzipBuf.size() < nbytesPacked)
188 unzipBuf.resize(nbytesPacked);
189 ROOT::Internal::RNTupleDecompressor::Unzip(pageBuf, nbytesData, nbytesPacked, &unzipBuf[0]);
190 outFile.write(unzipBuf.data(), nbytesPacked);
191 } else {
 // Write the on-storage bytes as they are, plus 8 extra bytes when a checksum is requested and
 // the page has one. NOTE(review): presumably the checksum directly follows the page data in
 // the on-disk buffer -- confirm.
192 const bool includeChecksum =
193 (options.fFlags & RPagesOptions::kIncludeChecksums) != 0 && pageInfo.HasChecksum();
194 const std::size_t maybeChecksumSize = includeChecksum * 8;
195 const auto nbytesData = pageInfo.GetLocator().GetNBytesOnStorage() + maybeChecksumSize;
196 outFile.write(pageBuf, nbytesData);
197 }
198
199 res.fExportedFileNames.push_back(outFileName);
200
201 ++pageIdx, ++pagesExported;
202
203 if (showProgress) {
 // NOTE(review): at this point res.fExportedFileNames.size() equals pagesExported (one name is
 // pushed per exported page), so this ratio is always 1 and the progress jumps straight to 100%.
 // nPages looks like the intended denominator -- confirm.
204 int intPercent = static_cast<int>(100.f * pagesExported / res.fExportedFileNames.size());
205 if (intPercent != prevIntPercent) {
206 fprintf(stderr, "\rExport progress: %02d%%", intPercent);
207 if (intPercent == 100)
208 fprintf(stderr, "\n");
209 prevIntPercent = intPercent;
210 }
211 }
212 }
213 }
214 clusterId = desc->FindNextClusterId(clusterId);
215 }
216
217 assert(res.fExportedFileNames.size() == static_cast<size_t>(pagesExported));
 // Log a summary: the page count, then either the total number of columns (empty filter set) or how
 // many of them were filtered out.
218 std::ostringstream ss;
219 ss << "exported " << res.fExportedFileNames.size() << " pages (";
220 if (options.fColumnTypeFilter.fSet.empty()) {
221 ss << addColRes.fNColsTotal << " columns)";
222 } else {
223 auto nColsFilteredOut = addColRes.fNColsTotal - columnInfos.size();
224 ss << nColsFilteredOut << "/" << addColRes.fNColsTotal << " columns filtered out)";
225 }
226 R__LOG_INFO(RNTupleExporterLog()) << ss.str();
227
228 return res;
229}
230
231} // namespace ROOT::Experimental::Internal
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:300
#define R__LOG_DEBUG(DEBUGLEVEL,...)
Definition RLogger.hxx:359
#define R__LOG_INFO(...)
Definition RLogger.hxx:358
static void AddColumnsFromField(std::vector< RColumnMergeInfo > &columns, const ROOT::RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData, const ROOT::RFieldDescriptor &srcFieldDesc, const ROOT::RFieldDescriptor &dstFieldDesc, const std::string &prefix="")
std::string & operator+=(std::string &left, const TString &right)
Definition TString.h:495
@ kBlacklist
Don't export items contained in the filter's set.
static RPagesResult ExportPages(ROOT::Internal::RPageSource &source, const RPagesOptions &options={})
Given a page source, writes all its pages to individual files (1 per page).
Manages a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:148
std::unordered_set< ROOT::DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:150
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
Definition RCluster.cxx:31
static std::unique_ptr< RColumnElementBase > Generate(ROOT::ENTupleColumnType type)
If CppT == void, use the default C++ type for the given column type.
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size of the from buffer.
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:41
const void * GetAddress() const
Definition RCluster.hxx:63
Abstract interface to read data from an ntuple.
void Attach(ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode mode=ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode::kForReading)
Open the physical storage container and deserialize header and footer.
const RSharedDescriptorGuard GetSharedDescriptorGuard() const
Takes the read lock for the descriptor. Multiple threads can take the lock concurrently....
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
ROOT::DescriptorId_t GetId() const
ROOT::DescriptorId_t FindNextClusterId(ROOT::DescriptorId_t clusterId) const
RFieldDescriptorIterable GetFieldIterable(const RFieldDescriptor &fieldDesc) const
RColumnDescriptorIterable GetColumnIterable() const
ROOT::DescriptorId_t FindClusterId(ROOT::NTupleSize_t entryIdx) const
const RClusterDescriptor & GetClusterDescriptor(ROOT::DescriptorId_t clusterId) const
std::string GetQualifiedFieldName(ROOT::DescriptorId_t fieldId) const
Walks up the parents of the field ID and returns a field name of the form a.b.c.d In case of invalid ...
const RFieldDescriptor & GetFieldZero() const
constexpr DescriptorId_t kInvalidDescriptorId
RFilter< ENTupleColumnType > fColumnTypeFilter
Optional filter that determines which columns are included or excluded from being exported.
@ kDecompress
If enabled, uncompress (but don't unpack) the page (mutually exclusive with kIncludeChecksums)
@ kShowProgressBar
If enabled, the exporter will report the current progress on the stderr.
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:51