Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleMerger.cxx
Go to the documentation of this file.
1/// \file RNTupleMerger.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>, Max Orok <maxwellorok@gmail.com>, Alaettin Serhan Mete <amete@anl.gov>,
4/// Giacomo Parolini <giacomo.parolini@cern.ch>
5/// \date 2020-07-08
6/// \warning This is part of the ROOT 7 prototype! It will
7/// change without notice. It might trigger earthquakes. Feedback is welcome!
8
9/*************************************************************************
10 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
11 * All rights reserved. *
12 * *
13 * For the licensing terms see $ROOTSYS/LICENSE. *
14 * For the list of contributors see $ROOTSYS/README/CREDITS. *
15 *************************************************************************/
16
17#include <ROOT/RError.hxx>
18#include <ROOT/RNTuple.hxx>
21#include <ROOT/RNTupleModel.hxx>
22#include <ROOT/RNTupleUtil.hxx>
24#include <ROOT/RPageStorage.hxx>
25#include <ROOT/RClusterPool.hxx>
27#include <ROOT/RNTupleZip.hxx>
29#include <TROOT.h>
30#include <TFileMergeInfo.h>
31#include <TError.h>
32#include <TFile.h>
33#include <TKey.h>
34
#include <algorithm>
#include <cstring>
#include <deque>
#include <exception>
#include <inttypes.h> // for PRIu64
#include <numeric>
#include <sstream>
#include <unordered_map>
#include <vector>
40
41using namespace ROOT::Experimental;
42using namespace ROOT::Experimental::Internal;
43
44// Entry point for TFileMerger. Internally calls RNTupleMerger::Merge().
46// IMPORTANT: this function must not throw, as it is used in exception-unsafe code (TFileMerger).
47try {
48 // Check the inputs
49 if (!inputs || inputs->GetEntries() < 3 || !mergeInfo) {
50 Error("RNTuple::Merge", "Invalid inputs.");
51 return -1;
52 }
53
54 // Parse the input parameters
55 TIter itr(inputs);
56
57 // First entry is the RNTuple name
58 std::string ntupleName = std::string(itr()->GetName());
59
60 // Second entry is the output file
61 TObject *secondArg = itr();
62 TFile *outFile = dynamic_cast<TFile *>(secondArg);
63 if (!outFile) {
64 Error("RNTuple::Merge", "Second input parameter should be a TFile, but it's a %s.", secondArg->ClassName());
65 return -1;
66 }
67
68 // Check if the output file already has a key with that name
69 TKey *outKey = outFile->FindKey(ntupleName.c_str());
70 ROOT::RNTuple *outNTuple = nullptr;
71 if (outKey) {
72 outNTuple = outKey->ReadObject<ROOT::RNTuple>();
73 if (!outNTuple) {
74 Error("RNTuple::Merge", "Output file already has key, but not of type RNTuple!");
75 return -1;
76 }
77 // In principle, we should already be working on the RNTuple object from the output file, but just continue with
78 // pointer we just got.
79 }
80
81 const bool defaultComp = mergeInfo->fOptions.Contains("default_compression");
82 const bool firstSrcComp = mergeInfo->fOptions.Contains("first_source_compression");
83 if (defaultComp && firstSrcComp) {
84 // this should never happen through hadd, but a user may call RNTuple::Merge() from custom code...
85 Warning(
86 "RNTuple::Merge",
87 "Passed both options \"default_compression\" and \"first_source_compression\": only the latter will apply.");
88 }
89 int compression = kUnknownCompressionSettings;
90 if (firstSrcComp) {
91 // user passed -ff or -fk: use the same compression as the first RNTuple we find in the sources.
92 // (do nothing here, the compression will be fetched below)
93 } else if (!defaultComp) {
94 // compression was explicitly passed by the user: use it.
95 compression = outFile->GetCompressionSettings();
96 } else {
97 // user passed no compression-related options: use default
99 Info("RNTuple::Merge", "Using the default compression: %d", compression);
100 }
101
102 // The remaining entries are the input files
103 std::vector<std::unique_ptr<RPageSourceFile>> sources;
104 std::vector<RPageSource *> sourcePtrs;
105
106 while (const auto &pitr = itr()) {
107 TFile *inFile = dynamic_cast<TFile *>(pitr);
108 ROOT::RNTuple *anchor = inFile ? inFile->Get<ROOT::RNTuple>(ntupleName.c_str()) : nullptr;
109 if (!anchor) {
110 Error("RNTuple::Merge", "Failed to retrieve RNTuple anchor named '%s' from file '%s'", ntupleName.c_str(),
111 inFile->GetName());
112 return -1;
113 }
114
115 auto source = RPageSourceFile::CreateFromAnchor(*anchor);
116 if (compression == kUnknownCompressionSettings) {
117 // Get the compression of this RNTuple and use it as the output compression.
118 // We currently assume all column ranges have the same compression, so we just peek at the first one.
119 source->Attach();
120 auto descriptor = source->GetSharedDescriptorGuard();
121 auto clusterIter = descriptor->GetClusterIterable();
122 auto firstCluster = clusterIter.begin();
123 if (firstCluster == clusterIter.end()) {
124 Error("RNTuple::Merge",
125 "Asked to use the first source's compression as the output compression, but the "
126 "first source (file '%s') has an empty RNTuple, therefore the output compression could not be "
127 "determined.",
128 inFile->GetName());
129 return -1;
130 }
131 auto colRangeIter = (*firstCluster).GetColumnRangeIterable();
132 auto firstColRange = colRangeIter.begin();
133 if (firstColRange == colRangeIter.end()) {
134 Error("RNTuple::Merge",
135 "Asked to use the first source's compression as the output compression, but the "
136 "first source (file '%s') has an empty RNTuple, therefore the output compression could not be "
137 "determined.",
138 inFile->GetName());
139 return -1;
140 }
141 compression = (*firstColRange).fCompressionSettings;
142 Info("RNTuple::Merge", "Using the first RNTuple's compression: %d", compression);
143 }
144 sources.push_back(std::move(source));
145 }
146
147 RNTupleWriteOptions writeOpts;
148 assert(compression != kUnknownCompressionSettings);
149 writeOpts.SetCompression(compression);
150 auto destination = std::make_unique<RPageSinkFile>(ntupleName, *outFile, writeOpts);
151
152 // If we already have an existing RNTuple, copy over its descriptor to support incremental merging
153 if (outNTuple) {
154 auto outSource = RPageSourceFile::CreateFromAnchor(*outNTuple);
155 outSource->Attach();
156 auto desc = outSource->GetSharedDescriptorGuard();
157 destination->InitFromDescriptor(desc.GetRef());
158 }
159
160 // Interface conversion
161 sourcePtrs.reserve(sources.size());
162 for (const auto &s : sources) {
163 sourcePtrs.push_back(s.get());
164 }
165
166 // Now merge
167 RNTupleMerger merger;
168 RNTupleMergeOptions mergerOpts;
169 mergerOpts.fCompressionSettings = compression;
170 merger.Merge(sourcePtrs, *destination, mergerOpts).ThrowOnError();
171
172 // Provide the caller with a merged anchor object (even though we've already
173 // written it).
174 *this = *outFile->Get<ROOT::RNTuple>(ntupleName.c_str());
175
176 return 0;
177} catch (const RException &ex) {
178 Error("RNTuple::Merge", "Exception thrown while merging: %s", ex.what());
179 return -1;
180}
181
182namespace {
183// Functor used to change the compression of a page to `options.fCompressionSettings`.
184struct RChangeCompressionFunc {
185 DescriptorId_t fOutputColumnId;
186
187 const RColumnElementBase &fSrcColElement;
188 const RColumnElementBase &fDstColElement;
189 const RNTupleMergeOptions &fMergeOptions;
190
191 RPageStorage::RSealedPage &fSealedPage;
192 RPageAllocator &fPageAlloc;
193 std::uint8_t *fBuffer;
194
195 void operator()() const
196 {
197 auto page = RPageSource::UnsealPage(fSealedPage, fSrcColElement, fOutputColumnId, fPageAlloc).Unwrap();
199 sealConf.fElement = &fDstColElement;
200 sealConf.fPage = &page;
201 sealConf.fBuffer = fBuffer;
202 sealConf.fCompressionSetting = fMergeOptions.fCompressionSettings;
203 sealConf.fWriteChecksum = fSealedPage.GetHasChecksum();
204 auto refSealedPage = RPageSink::SealPage(sealConf);
205 fSealedPage = refSealedPage;
206 }
207};
208
209struct RCommonField {
210 const RFieldDescriptor *fSrc;
211 const RFieldDescriptor *fDst;
212
213 RCommonField(const RFieldDescriptor *src, const RFieldDescriptor *dst) : fSrc(src), fDst(dst) {}
214};
215
216struct RDescriptorsComparison {
217 std::vector<const RFieldDescriptor *> fExtraDstFields;
218 std::vector<const RFieldDescriptor *> fExtraSrcFields;
219 std::vector<RCommonField> fCommonFields;
220};
221
222struct RColumnOutInfo {
223 DescriptorId_t fColumnId;
224 EColumnType fColumnType;
225};
226
227// { fully.qualified.fieldName.colInputId => colOutputInfo }
228using ColumnIdMap_t = std::unordered_map<std::string, RColumnOutInfo>;
229
230struct RColumnInfoGroup {
231 std::vector<RColumnMergeInfo> fExtraDstColumns;
232 std::vector<RColumnMergeInfo> fCommonColumns;
233};
234
235} // namespace
236
// These structs cannot be in the anonymous namespace because they're used in RNTupleMerger's private interface.
240 // This column name is built as a dot-separated concatenation of the ancestry of
241 // the columns' parent fields' names plus the index of the column itself.
242 // e.g. "Muon.pt.x._0"
243 std::string fColumnName;
247 // If nullopt, use the default in-memory type
248 std::optional<std::type_index> fInMemoryType;
250};
251
252// Data related to a single call of RNTupleMerger::Merge()
254 std::span<RPageSource *> fSources;
259
260 std::vector<RColumnMergeInfo> fColumns;
261 ColumnIdMap_t fColumnIdMap;
262
264
265 RNTupleMergeData(std::span<RPageSource *> sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
266 : fSources{sources}, fDestination{destination}, fMergeOpts{mergeOpts}, fDstDescriptor{destination.GetDescriptor()}
267 {
268 }
269};
270
272 // We use a std::deque so that references to the contained SealedPageSequence_t, and its iterators, are
273 // never invalidated.
274 std::deque<RPageStorage::SealedPageSequence_t> fPagesV;
275 std::vector<RPageStorage::RSealedPageGroup> fGroups;
276 std::vector<std::unique_ptr<std::uint8_t[]>> fBuffers;
277};
278
279std::ostream &operator<<(std::ostream &os, const std::optional<RColumnDescriptor::RValueRange> &x)
280{
281 if (x) {
282 os << '(' << x->fMin << ", " << x->fMax << ')';
283 } else {
284 os << "(null)";
285 }
286 return os;
287}
288
289} // namespace ROOT::Experimental::Internal
290
292{
293 // clang-format off
294 if (a == EColumnType::kInt16 && b == EColumnType::kSplitInt16) return true;
295 if (a == EColumnType::kSplitInt16 && b == EColumnType::kInt16) return true;
296 if (a == EColumnType::kInt32 && b == EColumnType::kSplitInt32) return true;
297 if (a == EColumnType::kSplitInt32 && b == EColumnType::kInt32) return true;
298 if (a == EColumnType::kInt64 && b == EColumnType::kSplitInt64) return true;
299 if (a == EColumnType::kSplitInt64 && b == EColumnType::kInt64) return true;
300 if (a == EColumnType::kUInt16 && b == EColumnType::kSplitUInt16) return true;
301 if (a == EColumnType::kSplitUInt16 && b == EColumnType::kUInt16) return true;
302 if (a == EColumnType::kUInt32 && b == EColumnType::kSplitUInt32) return true;
303 if (a == EColumnType::kSplitUInt32 && b == EColumnType::kUInt32) return true;
304 if (a == EColumnType::kUInt64 && b == EColumnType::kSplitUInt64) return true;
305 if (a == EColumnType::kSplitUInt64 && b == EColumnType::kUInt64) return true;
306 if (a == EColumnType::kIndex32 && b == EColumnType::kSplitIndex32) return true;
307 if (a == EColumnType::kSplitIndex32 && b == EColumnType::kIndex32) return true;
308 if (a == EColumnType::kIndex64 && b == EColumnType::kSplitIndex64) return true;
309 if (a == EColumnType::kSplitIndex64 && b == EColumnType::kIndex64) return true;
310 if (a == EColumnType::kReal32 && b == EColumnType::kSplitReal32) return true;
311 if (a == EColumnType::kSplitReal32 && b == EColumnType::kReal32) return true;
312 if (a == EColumnType::kReal64 && b == EColumnType::kSplitReal64) return true;
313 if (a == EColumnType::kSplitReal64 && b == EColumnType::kReal64) return true;
314 // clang-format on
315 return false;
316}
317
318/// Compares the top level fields of `dst` and `src` and determines whether they can be merged or not.
319/// In addition, returns the differences between `dst` and `src`'s structures
322{
323 // Cases:
324 // 1. dst == src
325 // 2. dst has fields that src hasn't
326 // 3. src has fields that dst hasn't
327 // 4. dst and src have fields that differ (compatible or incompatible)
328
329 std::vector<std::string> errors;
330 RDescriptorsComparison res;
331
332 std::vector<RCommonField> commonFields;
333
334 for (const auto &dstField : dst.GetTopLevelFields()) {
335 const auto srcFieldId = src.FindFieldId(dstField.GetFieldName());
336 if (srcFieldId != kInvalidDescriptorId) {
337 const auto &srcField = src.GetFieldDescriptor(srcFieldId);
338 commonFields.push_back({&srcField, &dstField});
339 } else {
340 res.fExtraDstFields.emplace_back(&dstField);
341 }
342 }
343 for (const auto &srcField : src.GetTopLevelFields()) {
344 const auto dstFieldId = dst.FindFieldId(srcField.GetFieldName());
345 if (dstFieldId == kInvalidDescriptorId)
346 res.fExtraSrcFields.push_back(&srcField);
347 }
348
349 // Check compatibility of common fields
350 for (const auto &field : commonFields) {
351 // NOTE: field.fSrc and field.fDst have the same name by construction
352 const auto &fieldName = field.fSrc->GetFieldName();
353
354 // Require that fields are both projected or both not projected
355 bool projCompatible = field.fSrc->IsProjectedField() == field.fDst->IsProjectedField();
356 if (!projCompatible) {
357 std::stringstream ss;
358 ss << "Field `" << fieldName << "` is incompatible with previously-seen field with that name because the "
359 << (field.fSrc->IsProjectedField() ? "new" : "old") << " one is projected and the other isn't";
360 errors.push_back(ss.str());
361 } else if (field.fSrc->IsProjectedField()) {
362 // if both fields are projected, verify that they point to the same real field
363 const auto srcName = src.GetQualifiedFieldName(field.fSrc->GetProjectionSourceId());
364 const auto dstName = dst.GetQualifiedFieldName(field.fDst->GetProjectionSourceId());
365 if (srcName != dstName) {
366 std::stringstream ss;
367 ss << "Field `" << fieldName
368 << "` is projected to a different field than a previously-seen field with the same name (old: "
369 << dstName << ", new: " << srcName << ")";
370 errors.push_back(ss.str());
371 }
372 }
373
374 // Require that fields types match
375 // TODO(gparolini): allow non-identical but compatible types
376 const auto &srcTyName = field.fSrc->GetTypeName();
377 const auto &dstTyName = field.fDst->GetTypeName();
378 if (srcTyName != dstTyName) {
379 std::stringstream ss;
380 ss << "Field `" << fieldName
381 << "` has a type incompatible with a previously-seen field with the same name: (old: " << dstTyName
382 << ", new: " << srcTyName << ")";
383 errors.push_back(ss.str());
384 }
385
386 // Require that type checksums match
387 const auto srcTyChk = field.fSrc->GetTypeChecksum();
388 const auto dstTyChk = field.fDst->GetTypeChecksum();
389 if (srcTyChk && dstTyChk && *srcTyChk != *dstTyChk) {
390 std::stringstream ss;
391 ss << "Field `" << field.fSrc->GetFieldName()
392 << "` has a different type checksum than previously-seen field with the same name";
393 errors.push_back(ss.str());
394 }
395
396 // Require that type versions match
397 const auto srcTyVer = field.fSrc->GetTypeVersion();
398 const auto dstTyVer = field.fDst->GetTypeVersion();
399 if (srcTyVer != dstTyVer) {
400 std::stringstream ss;
401 ss << "Field `" << field.fSrc->GetFieldName()
402 << "` has a different type version than previously-seen field with the same name (old: " << dstTyVer
403 << ", new: " << srcTyVer << ")";
404 errors.push_back(ss.str());
405 }
406
407 // Require that column representations match
408 const auto srcNCols = field.fSrc->GetLogicalColumnIds().size();
409 const auto dstNCols = field.fDst->GetLogicalColumnIds().size();
410 if (srcNCols != dstNCols) {
411 std::stringstream ss;
412 ss << "Field `" << field.fSrc->GetFieldName()
413 << "` has a different number of columns than previously-seen field with the same name (old: " << dstNCols
414 << ", new: " << srcNCols << ")";
415 errors.push_back(ss.str());
416 } else {
417 for (auto i = 0u; i < srcNCols; ++i) {
418 const auto srcColId = field.fSrc->GetLogicalColumnIds()[i];
419 const auto dstColId = field.fDst->GetLogicalColumnIds()[i];
420 const auto &srcCol = src.GetColumnDescriptor(srcColId);
421 const auto &dstCol = dst.GetColumnDescriptor(dstColId);
422 // TODO(gparolini): currently we refuse to merge columns of different types unless they are Split/non-Split
423 // version of the same type, because we know how to treat that specific case. We should also properly handle
424 // different but compatible types.
425 if (srcCol.GetType() != dstCol.GetType() &&
426 !IsSplitOrUnsplitVersionOf(srcCol.GetType(), dstCol.GetType())) {
427 std::stringstream ss;
428 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
429 << "` has a different column type of the same column on the previously-seen field with the same name "
430 "(old: "
431 << RColumnElementBase::GetColumnTypeName(srcCol.GetType())
432 << ", new: " << RColumnElementBase::GetColumnTypeName(dstCol.GetType()) << ")";
433 errors.push_back(ss.str());
434 }
435 if (srcCol.GetBitsOnStorage() != dstCol.GetBitsOnStorage()) {
436 std::stringstream ss;
437 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
438 << "` has a different number of bits of the same column on the previously-seen field with the same "
439 "name "
440 "(old: "
441 << srcCol.GetBitsOnStorage() << ", new: " << dstCol.GetBitsOnStorage() << ")";
442 errors.push_back(ss.str());
443 }
444 if (srcCol.GetValueRange() != dstCol.GetValueRange()) {
445 std::stringstream ss;
446 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
447 << "` has a different value range of the same column on the previously-seen field with the same name "
448 "(old: "
449 << srcCol.GetValueRange() << ", new: " << dstCol.GetValueRange() << ")";
450 errors.push_back(ss.str());
451 }
452 if (srcCol.GetRepresentationIndex() > 0) {
453 std::stringstream ss;
454 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
455 << "` has a representation index higher than 0. This is not supported yet by the merger.";
456 errors.push_back(ss.str());
457 }
458 }
459 }
460 }
461
462 std::string errMsg;
463 for (const auto &err : errors)
464 errMsg += std::string("\n * ") + err;
465
466 if (!errMsg.empty())
467 errMsg = errMsg.substr(1); // strip initial newline
468
469 if (errMsg.length())
470 return R__FAIL(errMsg);
471
472 res.fCommonFields.reserve(commonFields.size());
473 for (const auto &[srcField, dstField] : commonFields) {
474 res.fCommonFields.emplace_back(srcField, dstField);
475 }
476
477 // TODO(gparolini): we should exhaustively check the field tree rather than just the top level fields,
478 // in case the user forgets to change the version number on one field.
479
480 return RResult(res);
481}
482
483// Applies late model extension to `destination`, adding all `newFields` to it.
484static void ExtendDestinationModel(std::span<const RFieldDescriptor *> newFields, RNTupleModel &dstModel,
485 RNTupleMergeData &mergeData, std::vector<RCommonField> &commonFields)
486{
487 assert(newFields.size() > 0); // no point in calling this with 0 new cols
488
489 dstModel.Unfreeze();
490 RNTupleModelChangeset changeset{dstModel};
491
492 std::string msg = "destination doesn't contain field";
493 if (newFields.size() > 1)
494 msg += 's';
495 msg += ' ';
496 msg += std::accumulate(newFields.begin(), newFields.end(), std::string{}, [](const auto &acc, const auto *field) {
497 return acc + (acc.length() ? ", " : "") + '`' + field->GetFieldName() + '`';
498 });
499 Info("RNTuple::Merge", "%s: adding %s to the destination model (entry #%" PRIu64 ").", msg.c_str(),
500 (newFields.size() > 1 ? "them" : "it"), mergeData.fNumDstEntries);
501
502 changeset.fAddedFields.reserve(newFields.size());
503 for (const auto *fieldDesc : newFields) {
504 auto field = fieldDesc->CreateField(*mergeData.fSrcDescriptor);
505 if (fieldDesc->IsProjectedField())
506 changeset.fAddedProjectedFields.emplace_back(field.get());
507 else
508 changeset.fAddedFields.emplace_back(field.get());
509 changeset.fModel.AddField(std::move(field));
510 }
511 dstModel.Freeze();
512 mergeData.fDestination.UpdateSchema(changeset, mergeData.fNumDstEntries);
513
514 commonFields.reserve(commonFields.size() + newFields.size());
515 for (const auto *field : newFields) {
516 const auto newFieldInDstId = mergeData.fDstDescriptor.FindFieldId(field->GetFieldName());
517 const auto &newFieldInDst = mergeData.fDstDescriptor.GetFieldDescriptor(newFieldInDstId);
518 commonFields.emplace_back(field, &newFieldInDst);
519 }
520}
521
522// Merges all columns appearing both in the source and destination RNTuples, just copying them if their
523// compression matches ("fast merge") or by unsealing and resealing them with the proper compression.
525 std::span<RColumnMergeInfo> commonColumns,
526 const RCluster::ColumnSet_t &commonColumnSet,
527 RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData)
528{
529 assert(commonColumns.size() == commonColumnSet.size());
530 if (commonColumns.empty())
531 return;
532
533 const RCluster *cluster = clusterPool.GetCluster(clusterId, commonColumnSet);
534 // we expect the cluster pool to contain the requested set of columns, since they were
535 // validated by CompareDescriptorStructure().
536 assert(cluster);
537
538 const auto &clusterDesc = mergeData.fSrcDescriptor->GetClusterDescriptor(clusterId);
539
540 for (const auto &column : commonColumns) {
541 const auto &columnId = column.fInputId;
542 R__ASSERT(clusterDesc.ContainsColumn(columnId));
543
544 const auto &columnDesc = mergeData.fSrcDescriptor->GetColumnDescriptor(columnId);
545 const auto srcColElement = column.fInMemoryType
546 ? GenerateColumnElement(*column.fInMemoryType, columnDesc.GetType())
547 : RColumnElementBase::Generate(columnDesc.GetType());
548 const auto dstColElement = column.fInMemoryType ? GenerateColumnElement(*column.fInMemoryType, column.fColumnType)
549 : RColumnElementBase::Generate(column.fColumnType);
550
551 // Now get the pages for this column in this cluster
552 const auto &pages = clusterDesc.GetPageRange(columnId);
553
555 sealedPages.resize(pages.fPageInfos.size());
556
557 // Each column range potentially has a distinct compression settings
558 const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).fCompressionSettings;
559 const bool needsCompressionChange = colRangeCompressionSettings != mergeData.fMergeOpts.fCompressionSettings;
560 if (needsCompressionChange && mergeData.fMergeOpts.fExtraVerbose)
561 Info("RNTuple::Merge", "Column %s: changing source compression from %d to %d", column.fColumnName.c_str(),
562 colRangeCompressionSettings, mergeData.fMergeOpts.fCompressionSettings);
563
564 size_t pageBufferBaseIdx = sealedPageData.fBuffers.size();
565 // If the column range already has the right compression we don't need to allocate any new buffer, so we don't
566 // bother reserving memory for them.
567 if (needsCompressionChange)
568 sealedPageData.fBuffers.resize(sealedPageData.fBuffers.size() + pages.fPageInfos.size());
569
570 // Loop over the pages
571 std::uint64_t pageIdx = 0;
572 for (const auto &pageInfo : pages.fPageInfos) {
573 assert(pageIdx < sealedPages.size());
574 assert(sealedPageData.fBuffers.size() == 0 || pageIdx < sealedPageData.fBuffers.size());
575
576 ROnDiskPage::Key key{columnId, pageIdx};
577 auto onDiskPage = cluster->GetOnDiskPage(key);
578
579 const auto checksumSize = pageInfo.fHasChecksum * RPageStorage::kNBytesPageChecksum;
580 RPageStorage::RSealedPage &sealedPage = sealedPages[pageIdx];
581 sealedPage.SetNElements(pageInfo.fNElements);
582 sealedPage.SetHasChecksum(pageInfo.fHasChecksum);
583 sealedPage.SetBufferSize(pageInfo.fLocator.fBytesOnStorage + checksumSize);
584 sealedPage.SetBuffer(onDiskPage->GetAddress());
585 // TODO(gparolini): more graceful error handling (skip the page?)
587 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
588
589 if (needsCompressionChange) {
590 const auto uncompressedSize = srcColElement->GetSize() * sealedPage.GetNElements();
591 auto &buffer = sealedPageData.fBuffers[pageBufferBaseIdx + pageIdx];
592 buffer = std::make_unique<std::uint8_t[]>(uncompressedSize + checksumSize);
593 RChangeCompressionFunc compressTask{
594 column.fOutputId, *srcColElement, *dstColElement, mergeData.fMergeOpts,
595 sealedPage, *fPageAlloc, buffer.get(),
596 };
597
598 if (fTaskGroup)
599 fTaskGroup->Run(compressTask);
600 else
601 compressTask();
602 }
603
604 ++pageIdx;
605
606 } // end of loop over pages
607
608 if (fTaskGroup)
609 fTaskGroup->Wait();
610
611 sealedPageData.fPagesV.push_back(std::move(sealedPages));
612 sealedPageData.fGroups.emplace_back(column.fOutputId, sealedPageData.fPagesV.back().cbegin(),
613 sealedPageData.fPagesV.back().cend());
614 } // end loop over common columns
615}
616
617// Generates default values for columns that are not present in the current source RNTuple
618// but are present in the destination's schema.
619static void GenerateExtraDstColumns(size_t nClusterEntries, std::span<RColumnMergeInfo> extraDstColumns,
620 RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData)
621{
622 for (const auto &column : extraDstColumns) {
623 const auto &columnId = column.fInputId;
624 const auto &columnDesc = mergeData.fDstDescriptor.GetColumnDescriptor(columnId);
625 const RFieldDescriptor *field = column.fParentField;
626
627 // Skip all auxiliary columns
628 if (field->GetLogicalColumnIds()[0] != columnId)
629 continue;
630
631 // Check if this column is a child of a Collection or a Variant. If so, it has no data
632 // and can be skipped.
633 bool skipColumn = false;
634 auto nRepetitions = std::max<std::uint64_t>(field->GetNRepetitions(), 1);
635 for (auto parentId = field->GetParentId(); parentId != kInvalidDescriptorId;) {
636 const RFieldDescriptor &parent = mergeData.fSrcDescriptor->GetFieldDescriptor(parentId);
637 if (parent.GetStructure() == ENTupleStructure::kCollection ||
638 parent.GetStructure() == ENTupleStructure::kVariant) {
639 skipColumn = true;
640 break;
641 }
642 nRepetitions *= std::max<std::uint64_t>(parent.GetNRepetitions(), 1);
643 parentId = parent.GetParentId();
644 }
645 if (skipColumn)
646 continue;
647
648 const auto structure = field->GetStructure();
649
650 if (structure == ENTupleStructure::kStreamer) {
651 Fatal(
652 "RNTuple::Merge",
653 "Destination RNTuple contains a streamer field (%s) that is not present in one of the sources. "
654 "Creating a default value for a streamer field is ill-defined, therefore the merging process will abort.",
655 field->GetFieldName().c_str());
656 continue;
657 }
658
659 // NOTE: we cannot have a Record here because it has no associated columns.
660 R__ASSERT(structure == ENTupleStructure::kCollection || structure == ENTupleStructure::kVariant ||
661 structure == ENTupleStructure::kLeaf);
662
663 const auto colElement = RColumnElementBase::Generate(columnDesc.GetType());
664 const auto nElements = nClusterEntries * nRepetitions;
665 const auto bytesOnStorage = colElement->GetPackedSize(nElements);
666 constexpr auto kPageSizeLimit = 256 * 1024;
667 // TODO(gparolini): consider coalescing the last page if its size is less than some threshold
668 const size_t nPages = bytesOnStorage / kPageSizeLimit + !!(bytesOnStorage % kPageSizeLimit);
669 for (size_t i = 0; i < nPages; ++i) {
670 const auto pageSize = (i < nPages - 1) ? kPageSizeLimit : bytesOnStorage - kPageSizeLimit * (nPages - 1);
671 const auto checksumSize = RPageStorage::kNBytesPageChecksum;
672 const auto bufSize = pageSize + checksumSize;
673 auto &buffer = sealedPageData.fBuffers.emplace_back(new unsigned char[bufSize]);
674
675 RPageStorage::RSealedPage sealedPage{buffer.get(), bufSize, static_cast<std::uint32_t>(nElements), true};
676 memset(buffer.get(), 0, pageSize);
677 sealedPage.ChecksumIfEnabled();
678
679 sealedPageData.fPagesV.push_back({sealedPage});
680 }
681
682 sealedPageData.fGroups.emplace_back(column.fOutputId, sealedPageData.fPagesV.back().cbegin(),
683 sealedPageData.fPagesV.back().cend());
684 }
685}
686
687// Iterates over all clusters of `source` and merges their pages into `destination`.
688// It is assumed that all columns in `commonColumns` are present (and compatible) in both the source and
689// the destination's schemas.
690// The pages may be "fast-merged" (i.e. simply copied with no decompression/recompression) if the target
691// compression is unspecified or matches the original compression settings.
692void RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span<RColumnMergeInfo> commonColumns,
693 std::span<RColumnMergeInfo> extraDstColumns, RNTupleMergeData &mergeData)
694{
695 RClusterPool clusterPool{source};
696
697 // Convert columns to a ColumnSet for the ClusterPool query
698 RCluster::ColumnSet_t commonColumnSet;
699 commonColumnSet.reserve(commonColumns.size());
700 for (const auto &column : commonColumns)
701 commonColumnSet.emplace(column.fInputId);
702
703 RCluster::ColumnSet_t extraDstColumnSet;
704 extraDstColumnSet.reserve(extraDstColumns.size());
705 for (const auto &column : extraDstColumns)
706 extraDstColumnSet.emplace(column.fInputId);
707
708 // Loop over all clusters in this file.
709 // descriptor->GetClusterIterable() doesn't guarantee any specific order, so we explicitly
710 // request the first cluster.
711 DescriptorId_t clusterId = mergeData.fSrcDescriptor->FindClusterId(0, 0);
712 while (clusterId != kInvalidDescriptorId) {
713 const auto &clusterDesc = mergeData.fSrcDescriptor->GetClusterDescriptor(clusterId);
714 const auto nClusterEntries = clusterDesc.GetNEntries();
715 R__ASSERT(nClusterEntries > 0);
716
717 RSealedPageMergeData sealedPageData;
718
719 if (!commonColumnSet.empty()) {
720 MergeCommonColumns(clusterPool, clusterId, commonColumns, commonColumnSet, sealedPageData, mergeData);
721 }
722
723 if (!extraDstColumnSet.empty()) {
724 GenerateExtraDstColumns(nClusterEntries, extraDstColumns, sealedPageData, mergeData);
725 }
726
727 // Commit the pages and the clusters
728 mergeData.fDestination.CommitSealedPageV(sealedPageData.fGroups);
729 mergeData.fDestination.CommitCluster(nClusterEntries);
730 mergeData.fNumDstEntries += nClusterEntries;
731
732 // Go to the next cluster
733 clusterId = mergeData.fSrcDescriptor->FindNextClusterId(clusterId);
734 }
735
736 // TODO(gparolini): when we get serious about huge file support (>~ 100GB) we might want to check here
737 // the size of the running page list and commit a cluster group when it exceeds some threshold,
738 // which would prevent the page list from getting too large.
739 // However, as of today, we aren't really handling such huge files, and even relatively big ones
740 // such as the CMS dataset have a page list size of about only 2 MB.
741 // So currently we simply merge all cluster groups into one.
742}
743
744static std::optional<std::type_index> ColumnInMemoryType(std::string_view fieldType, EColumnType onDiskType)
745{
746 if (onDiskType == EColumnType::kIndex32 || onDiskType == EColumnType::kSplitIndex32 ||
747 onDiskType == EColumnType::kIndex64 || onDiskType == EColumnType::kSplitIndex64)
748 return typeid(ClusterSize_t);
749
750 if (onDiskType == EColumnType::kSwitch)
752
753 if (fieldType == "bool") {
754 return typeid(bool);
755 } else if (fieldType == "std::byte") {
756 return typeid(std::byte);
757 } else if (fieldType == "char") {
758 return typeid(char);
759 } else if (fieldType == "std::int8_t") {
760 return typeid(std::int8_t);
761 } else if (fieldType == "std::uint8_t") {
762 return typeid(std::uint8_t);
763 } else if (fieldType == "std::int16_t") {
764 return typeid(std::int16_t);
765 } else if (fieldType == "std::uint16_t") {
766 return typeid(std::uint16_t);
767 } else if (fieldType == "std::int32_t") {
768 return typeid(std::int32_t);
769 } else if (fieldType == "std::uint32_t") {
770 return typeid(std::uint32_t);
771 } else if (fieldType == "std::int64_t") {
772 return typeid(std::int64_t);
773 } else if (fieldType == "std::uint64_t") {
774 return typeid(std::uint64_t);
775 } else if (fieldType == "float") {
776 return typeid(float);
777 } else if (fieldType == "double") {
778 return typeid(double);
779 }
780
781 // if the type is not one of those above, we use the default in-memory type.
782 return std::nullopt;
783}
784
785// Given a field, fill `columns` and `colIdMap` with information about all columns belonging to it and its subfields.
786// `colIdMap` is used to map matching columns from different sources to the same output column in the destination.
787// We match columns by their "fully qualified name", which is the concatenation of their ancestor fields' names
788// and the column index.
789// By this point, since we called `CompareDescriptorStructure()` earlier, we should be guaranteed that two matching
790// columns will have at least compatible representations.
791// NOTE: srcFieldDesc and dstFieldDesc may alias.
792static void AddColumnsFromField(std::vector<RColumnMergeInfo> &columns, const RNTupleDescriptor &srcDesc,
793 RNTupleMergeData &mergeData, const RFieldDescriptor &srcFieldDesc,
794 const RFieldDescriptor &dstFieldDesc, const std::string &prefix = "")
795{
796 std::string name = prefix + '.' + srcFieldDesc.GetFieldName();
797
798 const auto &columnIds = srcFieldDesc.GetLogicalColumnIds();
799 columns.reserve(columns.size() + columnIds.size());
800 // NOTE: here we can match the src and dst columns by column index because we forbid merging fields with
801 // different column representations.
802 for (auto i = 0u; i < srcFieldDesc.GetLogicalColumnIds().size(); ++i) {
803 // We don't want to try and merge alias columns
804 if (srcFieldDesc.IsProjectedField())
805 continue;
806
807 auto srcColumnId = srcFieldDesc.GetLogicalColumnIds()[i];
808 const auto &srcColumn = srcDesc.GetColumnDescriptor(srcColumnId);
809 RColumnMergeInfo info{};
810 info.fColumnName = name + '.' + std::to_string(srcColumn.GetIndex());
811 info.fInputId = srcColumn.GetPhysicalId();
812 // Since the parent field is only relevant for extra dst columns, the choice of src or dstFieldDesc as a parent
813 // is arbitrary (they're the same field).
814 info.fParentField = &dstFieldDesc;
815
816 if (auto it = mergeData.fColumnIdMap.find(info.fColumnName); it != mergeData.fColumnIdMap.end()) {
817 info.fOutputId = it->second.fColumnId;
818 info.fColumnType = it->second.fColumnType;
819 } else {
820 info.fOutputId = mergeData.fColumnIdMap.size();
821 // NOTE(gparolini): map the type of src column to the type of dst column.
822 // This mapping is only relevant for common columns and it's done to ensure we keep a consistent
823 // on-disk representation of the same column.
824 // This is also important to do for first source when it is used to generate the destination sink,
825 // because even in that case their column representations may differ.
826 // e.g. if the destination has a different compression than the source, an integer column might be
827 // zigzag-encoded in the source but not in the destination.
828 auto dstColumnId = dstFieldDesc.GetLogicalColumnIds()[i];
829 const auto &dstColumn = mergeData.fDstDescriptor.GetColumnDescriptor(dstColumnId);
830 info.fColumnType = dstColumn.GetType();
831 mergeData.fColumnIdMap[info.fColumnName] = {info.fOutputId, info.fColumnType};
832 }
833
834 if (mergeData.fMergeOpts.fExtraVerbose) {
835 Info("RNTuple::Merge",
836 "Adding column %s with log.id %" PRIu64 ", phys.id %" PRIu64 ", type %s "
837 " -> log.id %" PRIu64 ", type %s",
838 info.fColumnName.c_str(), srcColumnId, srcColumn.GetPhysicalId(),
839 RColumnElementBase::GetColumnTypeName(srcColumn.GetType()), info.fOutputId,
840 RColumnElementBase::GetColumnTypeName(info.fColumnType));
841 }
842
843 // Since we disallow merging fields of different types, src and dstFieldDesc must have the same type name.
844 assert(srcFieldDesc.GetTypeName() == dstFieldDesc.GetTypeName());
845 info.fInMemoryType = ColumnInMemoryType(srcFieldDesc.GetTypeName(), info.fColumnType);
846 columns.emplace_back(info);
847 }
848
849 const auto &srcChildrenIds = srcFieldDesc.GetLinkIds();
850 const auto &dstChildrenIds = dstFieldDesc.GetLinkIds();
851 assert(srcChildrenIds.size() == dstChildrenIds.size());
852 for (auto i = 0u; i < srcChildrenIds.size(); ++i) {
853 const auto &srcChild = srcDesc.GetFieldDescriptor(srcChildrenIds[i]);
854 const auto &dstChild = mergeData.fDstDescriptor.GetFieldDescriptor(dstChildrenIds[i]);
855 AddColumnsFromField(columns, srcDesc, mergeData, srcChild, dstChild, name);
856 }
857}
858
859// Converts the fields comparison data to the corresponding column information.
860// While doing so, it collects such information in `colIdMap`, which is used by later calls to this function
861// to map already-seen column names to their chosen outputId, type and so on.
862static RColumnInfoGroup
863GatherColumnInfos(const RDescriptorsComparison &descCmp, const RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData)
864{
865 RColumnInfoGroup res;
866 for (const RFieldDescriptor *field : descCmp.fExtraDstFields) {
867 AddColumnsFromField(res.fExtraDstColumns, mergeData.fDstDescriptor, mergeData, *field, *field);
868 }
869 for (const auto &[srcField, dstField] : descCmp.fCommonFields) {
870 AddColumnsFromField(res.fCommonColumns, srcDesc, mergeData, *srcField, *dstField);
871 }
872 return res;
873}
874
876 // TODO(gparolini): consider using an arena allocator instead, since we know the precise lifetime
877 // of the RNTuples we are going to handle (e.g. we can reset the arena at every source)
878 : fPageAlloc(std::make_unique<RPageAllocatorHeap>())
879{
880#ifdef R__USE_IMT
883#endif
884}
885
887RNTupleMerger::Merge(std::span<RPageSource *> sources, RPageSink &destination, const RNTupleMergeOptions &mergeOptsIn)
888{
889 RNTupleMergeOptions mergeOpts = mergeOptsIn;
890 {
891 const auto dstCompSettings = destination.GetWriteOptions().GetCompression();
893 mergeOpts.fCompressionSettings = dstCompSettings;
894 } else if (mergeOpts.fCompressionSettings != dstCompSettings) {
895 return R__FAIL(std::string("The compression given to RNTupleMergeOptions is different from that of the "
896 "sink! (opts: ") +
897 std::to_string(mergeOpts.fCompressionSettings) + ", sink: " + std::to_string(dstCompSettings) +
898 ") This is currently unsupported.");
899 }
900 }
901
902 RNTupleMergeData mergeData{sources, destination, mergeOpts};
903
904 std::unique_ptr<RNTupleModel> model; // used to initialize the schema of the output RNTuple
905
906#define SKIP_OR_ABORT(errMsg) \
907 do { \
908 if (mergeOpts.fErrBehavior == ENTupleMergeErrBehavior::kSkip) { \
909 Warning("RNTuple::Merge", "Skipping RNTuple due to: %s", (errMsg).c_str()); \
910 continue; \
911 } else { \
912 return R__FAIL(errMsg); \
913 } \
914 } while (0)
915
916 // Merge main loop
917 for (RPageSource *source : sources) {
918 source->Attach();
919 auto srcDescriptor = source->GetSharedDescriptorGuard();
920 mergeData.fSrcDescriptor = &srcDescriptor.GetRef();
921
922 // Create sink from the input model if not initialized
923 if (!destination.IsInitialized()) {
925 opts.fReconstructProjections = true;
926 model = srcDescriptor->CreateModel(opts);
927 destination.Init(*model);
928 }
929
930 for (const auto &extraTypeInfoDesc : srcDescriptor->GetExtraTypeInfoIterable())
931 destination.UpdateExtraTypeInfo(extraTypeInfoDesc);
932
933 auto descCmpRes = CompareDescriptorStructure(mergeData.fDstDescriptor, srcDescriptor.GetRef());
934 if (!descCmpRes) {
936 std::string("Source RNTuple will be skipped due to incompatible schema with the destination:\n") +
937 descCmpRes.GetError()->GetReport());
938 }
939 auto descCmp = descCmpRes.Unwrap();
940
941 // If the current source is missing some fields and we're not in Union mode, error
942 // (if we are in Union mode, MergeSourceClusters will fill the missing fields with default values).
943 if (mergeOpts.fMergingMode != ENTupleMergingMode::kUnion && !descCmp.fExtraDstFields.empty()) {
944 std::string msg = "Source RNTuple is missing the following fields:";
945 for (const auto *field : descCmp.fExtraDstFields) {
946 msg += "\n " + field->GetFieldName() + " : " + field->GetTypeName();
947 }
948 SKIP_OR_ABORT(msg);
949 }
950
951 // handle extra src fields
952 if (descCmp.fExtraSrcFields.size()) {
953 if (mergeOpts.fMergingMode == ENTupleMergingMode::kUnion) {
954 // late model extension for all fExtraSrcFields in Union mode
955 ExtendDestinationModel(descCmp.fExtraSrcFields, *model, mergeData, descCmp.fCommonFields);
956 } else if (mergeOpts.fMergingMode == ENTupleMergingMode::kStrict) {
957 // If the current source has extra fields and we're in Strict mode, error
958 std::string msg = "Source RNTuple has extra fields that the destination RNTuple doesn't have:";
959 for (const auto *field : descCmp.fExtraSrcFields) {
960 msg += "\n " + field->GetFieldName() + " : " + field->GetTypeName();
961 }
962 SKIP_OR_ABORT(msg);
963 }
964 }
965
966 // handle extra dst fields & common fields
967 auto columnInfos = GatherColumnInfos(descCmp, srcDescriptor.GetRef(), mergeData);
968 MergeSourceClusters(*source, columnInfos.fCommonColumns, columnInfos.fExtraDstColumns, mergeData);
969 } // end loop over sources
970
971 // Commit the output
972 destination.CommitClusterGroup();
973 destination.CommitDataset();
974
975 return RResult<void>::Success();
976}
fBuffer
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:290
static void GenerateExtraDstColumns(size_t nClusterEntries, std::span< RColumnMergeInfo > extraDstColumns, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData)
static void ExtendDestinationModel(std::span< const RFieldDescriptor * > newFields, RNTupleModel &dstModel, RNTupleMergeData &mergeData, std::vector< RCommonField > &commonFields)
static RResult< RDescriptorsComparison > CompareDescriptorStructure(const RNTupleDescriptor &dst, const RNTupleDescriptor &src)
Compares the top level fields of dst and src and determines whether they can be merged or not.
#define SKIP_OR_ABORT(errMsg)
static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData)
static bool IsSplitOrUnsplitVersionOf(EColumnType a, EColumnType b)
static std::optional< std::type_index > ColumnInMemoryType(std::string_view fieldType, EColumnType onDiskType)
static void AddColumnsFromField(std::vector< RColumnMergeInfo > &columns, const RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData, const RFieldDescriptor &srcFieldDesc, const RFieldDescriptor &dstFieldDesc, const std::string &prefix="")
#define b(i)
Definition RSha256.hxx:100
#define a(i)
Definition RSha256.hxx:99
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
long long Long64_t
Definition RtypesCore.h:69
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
Definition TBuffer.h:397
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
Definition TError.cxx:218
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:185
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:229
void Fatal(const char *location, const char *msgfmt,...)
Use this function in case of a fatal error. It will abort the program.
Definition TError.cxx:244
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t src
char name[80]
Definition TGX11.cxx:110
TRObject operator()(const T1 &t1) const
The available trivial, native content types of a column.
Managed a set of clusters containing compressed and packed pages.
RCluster * GetCluster(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:152
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
Definition RCluster.cxx:32
std::unordered_set< DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:154
A column element encapsulates the translation between basic C++ types and their column representation...
static const char * GetColumnTypeName(EColumnType type)
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
If CppT == void, use the default C++ type for the given column type.
Given a set of RPageSources merge them into an RPageSink, optionally changing their compression.
RResult< void > Merge(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts=RNTupleMergeOptions())
Merge a given set of sources into the destination.
void MergeSourceClusters(RPageSource &source, std::span< RColumnMergeInfo > commonColumns, std::span< RColumnMergeInfo > extraDstColumns, RNTupleMergeData &mergeData)
void MergeCommonColumns(RClusterPool &clusterPool, DescriptorId_t clusterId, std::span< RColumnMergeInfo > commonColumns, const RCluster::ColumnSet_t &commonColumnSet, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData)
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
Abstract interface to write data into an ntuple.
virtual void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo)=0
Adds an extra type information record to schema.
void CommitDataset()
Run the registered callbacks and finalize the current cluster and the entire data set.
void Init(RNTupleModel &model)
Physically creates the storage container to hold the ntuple (e.g., a key in a TFile or an S3 bucket) In...
const RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
virtual void CommitClusterGroup()=0
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
virtual std::uint64_t CommitCluster(NTupleSize_t nNewEntries)
Finalize the current cluster and create a new one for the following data.
virtual void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry)=0
Incorporate incremental changes to the model into the ntuple descriptor.
virtual void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges)=0
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element)
Helper for streaming a page.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const RNTupleReadOptions &options=RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
Abstract interface to read data from an ntuple.
static RResult< RPage > UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element, DescriptorId_t physicalColumnId, RPageAllocator &pageAlloc)
Helper for unstreaming a page.
std::deque< RSealedPage > SealedPageSequence_t
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
Holds the index and the tag of a kSwitch column.
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
Meta-data stored for every field of an ntuple.
const std::string & GetFieldName() const
const std::string & GetTypeName() const
const std::vector< DescriptorId_t > & GetLogicalColumnIds() const
const std::vector< DescriptorId_t > & GetLinkIds() const
The on-storage meta-data of an ntuple.
DescriptorId_t FindNextClusterId(DescriptorId_t clusterId) const
DescriptorId_t FindClusterId(DescriptorId_t physicalColumnId, NTupleSize_t index) const
const RClusterDescriptor & GetClusterDescriptor(DescriptorId_t clusterId) const
std::string GetQualifiedFieldName(DescriptorId_t fieldId) const
Walks up the parents of the field ID and returns a field name of the form a.b.c.d In case of invalid ...
DescriptorId_t FindFieldId(std::string_view fieldName, DescriptorId_t parentId) const
const RColumnDescriptor & GetColumnDescriptor(DescriptorId_t columnId) const
const RFieldDescriptor & GetFieldDescriptor(DescriptorId_t fieldId) const
RFieldDescriptorIterable GetTopLevelFields() const
The RNTupleModel encapsulates the schema of an ntuple.
Common user-tunable settings for storing ntuples.
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
Definition RError.hxx:281
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition RError.hxx:194
A class to manage the asynchronous execution of work items.
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:69
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface Merge this NTuple with the input list entries.
Collection abstract base class.
Definition TCollection.h:65
virtual Int_t GetEntries() const
TKey * FindKey(const char *keyname) const override
Find key with name keyname in the current directory.
TObject * Get(const char *namecycle) override
Return pointer to object identified by namecycle.
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:53
Int_t GetCompressionSettings() const
Definition TFile.h:397
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
T * ReadObject()
To read an object (non deriving from TObject) from the file.
Definition TKey.h:103
const char * GetName() const override
Returns name of object.
Definition TNamed.h:47
Mother of all ROOT objects.
Definition TObject.h:41
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Definition TObject.cxx:213
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition TString.h:632
Double_t x[n]
Definition legend1.C:17
Double_t ex[n]
Definition legend1.C:17
@ kStrict
The merger will refuse to merge any 2 RNTuples whose schema doesn't match exactly.
@ kUnion
The merger will update the output model to include all columns from all sources.
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, EColumnType onDiskType)
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
RClusterSize ClusterSize_t
constexpr int kUnknownCompressionSettings
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr DescriptorId_t kInvalidDescriptorId
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:570
std::optional< std::type_index > fInMemoryType
RNTupleMergeData(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
int fCompressionSettings
If fCompressionSettings == kUnknownCompressionSettings (the default), the merger will not change the ...
ENTupleMergingMode fMergingMode
Determines how the merging treats sources with different models (see ENTupleMergingMode).
bool fExtraVerbose
If true, the merger will emit further diagnostics and information.
The incremental changes to a RNTupleModel
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:52
const RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
int fCompressionSetting
Compression algorithm and level to apply.
bool fWriteChecksum
Adds an 8-byte little-endian xxhash3 checksum to the page payload.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
std::vector< RPageStorage::RSealedPageGroup > fGroups
std::deque< RPageStorage::SealedPageSequence_t > fPagesV
std::vector< std::unique_ptr< std::uint8_t[]> > fBuffers
@ kUseGeneralPurpose
Use the new recommended general-purpose setting; it is a best trade-off between compression ratio/dec...
Definition Compression.h:58