Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleMerger.cxx
Go to the documentation of this file.
1/// \file RNTupleMerger.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>, Max Orok <maxwellorok@gmail.com>, Alaettin Serhan Mete <amete@anl.gov>,
4/// Giacomo Parolini <giacomo.parolini@cern.ch>
5/// \date 2020-07-08
6/// \warning This is part of the ROOT 7 prototype! It will
7/// change without notice. It might trigger earthquakes. Feedback is welcome!
8
9/*************************************************************************
10 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
11 * All rights reserved. *
12 * *
13 * For the licensing terms see $ROOTSYS/LICENSE. *
14 * For the list of contributors see $ROOTSYS/README/CREDITS. *
15 *************************************************************************/
16
17#include <ROOT/RError.hxx>
18#include <ROOT/RNTuple.hxx>
21#include <ROOT/RNTupleModel.hxx>
22#include <ROOT/RNTupleUtil.hxx>
24#include <ROOT/RPageStorage.hxx>
25#include <ROOT/RClusterPool.hxx>
27#include <ROOT/RNTupleZip.hxx>
29#include <TROOT.h>
30#include <TFileMergeInfo.h>
31#include <TError.h>
32#include <TFile.h>
33#include <TKey.h>
34
35#include <algorithm>
36#include <deque>
37#include <inttypes.h> // for PRIu64
38#include <unordered_map>
39#include <vector>
40
41using namespace ROOT::Experimental;
42using namespace ROOT::Experimental::Internal;
43
44// Entry point for TFileMerger. Internally calls RNTupleMerger::Merge().
46// IMPORTANT: this function must not throw, as it is used in exception-unsafe code (TFileMerger).
47try {
48 // Check the inputs
49 if (!inputs || inputs->GetEntries() < 3 || !mergeInfo) {
50 Error("RNTuple::Merge", "Invalid inputs.");
51 return -1;
52 }
53
54 // Parse the input parameters
55 TIter itr(inputs);
56
57 // First entry is the RNTuple name
58 std::string ntupleName = std::string(itr()->GetName());
59
60 // Second entry is the output file
61 TObject *secondArg = itr();
62 TFile *outFile = dynamic_cast<TFile *>(secondArg);
63 if (!outFile) {
64 Error("RNTuple::Merge", "Second input parameter should be a TFile, but it's a %s.", secondArg->ClassName());
65 return -1;
66 }
67
68 // Check if the output file already has a key with that name
69 TKey *outKey = outFile->FindKey(ntupleName.c_str());
70 ROOT::RNTuple *outNTuple = nullptr;
71 if (outKey) {
72 outNTuple = outKey->ReadObject<ROOT::RNTuple>();
73 if (!outNTuple) {
74 Error("RNTuple::Merge", "Output file already has key, but not of type RNTuple!");
75 return -1;
76 }
77 // In principle, we should already be working on the RNTuple object from the output file, but just continue with
78 // pointer we just got.
79 }
80
81 const bool defaultComp = mergeInfo->fOptions.Contains("default_compression");
82 const bool firstSrcComp = mergeInfo->fOptions.Contains("first_source_compression");
83 if (defaultComp && firstSrcComp) {
84 // this should never happen through hadd, but a user may call RNTuple::Merge() from custom code...
85 Warning(
86 "RNTuple::Merge",
87 "Passed both options \"default_compression\" and \"first_source_compression\": only the latter will apply.");
88 }
89 int compression = kNTupleUnknownCompression;
90 if (firstSrcComp) {
91 // user passed -ff or -fk: use the same compression as the first RNTuple we find in the sources.
92 // (do nothing here, the compression will be fetched below)
93 } else if (!defaultComp) {
94 // compression was explicitly passed by the user: use it.
95 compression = outFile->GetCompressionSettings();
96 } else {
97 // user passed no compression-related options: use default
99 Info("RNTuple::Merge", "Using the default compression: %d", compression);
100 }
101
102 // The remaining entries are the input files
103 std::vector<std::unique_ptr<RPageSourceFile>> sources;
104 std::vector<RPageSource *> sourcePtrs;
105
106 while (const auto &pitr = itr()) {
107 TFile *inFile = dynamic_cast<TFile *>(pitr);
108 ROOT::RNTuple *anchor = inFile ? inFile->Get<ROOT::RNTuple>(ntupleName.c_str()) : nullptr;
109 if (!anchor) {
110 Error("RNTuple::Merge", "Failed to retrieve RNTuple anchor named '%s' from file '%s'", ntupleName.c_str(),
111 inFile->GetName());
112 return -1;
113 }
114
115 auto source = RPageSourceFile::CreateFromAnchor(*anchor);
116 if (compression == kNTupleUnknownCompression) {
117 // Get the compression of this RNTuple and use it as the output compression.
118 // We currently assume all column ranges have the same compression, so we just peek at the first one.
119 source->Attach();
120 auto descriptor = source->GetSharedDescriptorGuard();
121 auto clusterIter = descriptor->GetClusterIterable();
122 auto firstCluster = clusterIter.begin();
123 if (firstCluster == clusterIter.end()) {
124 Error("RNTuple::Merge",
125 "Asked to use the first source's compression as the output compression, but the "
126 "first source (file '%s') has an empty RNTuple, therefore the output compression could not be "
127 "determined.",
128 inFile->GetName());
129 return -1;
130 }
131 auto colRangeIter = (*firstCluster).GetColumnRangeIterable();
132 auto firstColRange = colRangeIter.begin();
133 if (firstColRange == colRangeIter.end()) {
134 Error("RNTuple::Merge",
135 "Asked to use the first source's compression as the output compression, but the "
136 "first source (file '%s') has an empty RNTuple, therefore the output compression could not be "
137 "determined.",
138 inFile->GetName());
139 return -1;
140 }
141 compression = (*firstColRange).fCompressionSettings;
142 Info("RNTuple::Merge", "Using the first RNTuple's compression: %d", compression);
143 }
144 sources.push_back(std::move(source));
145 }
146
147 RNTupleWriteOptions writeOpts;
148 assert(compression != kNTupleUnknownCompression);
149 writeOpts.SetCompression(compression);
150 auto destination = std::make_unique<RPageSinkFile>(ntupleName, *outFile, writeOpts);
151
152 // If we already have an existing RNTuple, copy over its descriptor to support incremental merging
153 if (outNTuple) {
154 auto outSource = RPageSourceFile::CreateFromAnchor(*outNTuple);
155 outSource->Attach();
156 auto desc = outSource->GetSharedDescriptorGuard();
157 destination->InitFromDescriptor(desc.GetRef());
158 }
159
160 // Interface conversion
161 sourcePtrs.reserve(sources.size());
162 for (const auto &s : sources) {
163 sourcePtrs.push_back(s.get());
164 }
165
166 // Now merge
167 RNTupleMerger merger;
168 RNTupleMergeOptions mergerOpts;
169 mergerOpts.fCompressionSettings = compression;
170 merger.Merge(sourcePtrs, *destination, mergerOpts).ThrowOnError();
171
172 // Provide the caller with a merged anchor object (even though we've already
173 // written it).
174 *this = *outFile->Get<ROOT::RNTuple>(ntupleName.c_str());
175
176 return 0;
177} catch (const RException &ex) {
178 Error("RNTuple::Merge", "Exception thrown while merging: %s", ex.what());
179 return -1;
180}
181
182namespace {
183// Functor used to change the compression of a page to `options.fCompressionSettings`.
184struct RChangeCompressionFunc {
185 const RColumnElementBase &fSrcColElement;
186 const RColumnElementBase &fDstColElement;
187 const RNTupleMergeOptions &fMergeOptions;
188
189 RPageStorage::RSealedPage &fSealedPage;
190 RPageAllocator &fPageAlloc;
191 std::uint8_t *fBuffer;
192
193 void operator()() const
194 {
195 auto page = RPageSource::UnsealPage(fSealedPage, fSrcColElement, fPageAlloc).Unwrap();
197 sealConf.fElement = &fDstColElement;
198 sealConf.fPage = &page;
199 sealConf.fBuffer = fBuffer;
200 sealConf.fCompressionSetting = fMergeOptions.fCompressionSettings;
201 sealConf.fWriteChecksum = fSealedPage.GetHasChecksum();
202 auto refSealedPage = RPageSink::SealPage(sealConf);
203 fSealedPage = refSealedPage;
204 }
205};
206
207struct RCommonField {
208 const RFieldDescriptor *fSrc;
209 const RFieldDescriptor *fDst;
210
211 RCommonField(const RFieldDescriptor *src, const RFieldDescriptor *dst) : fSrc(src), fDst(dst) {}
212};
213
214struct RDescriptorsComparison {
215 std::vector<const RFieldDescriptor *> fExtraDstFields;
216 std::vector<const RFieldDescriptor *> fExtraSrcFields;
217 std::vector<RCommonField> fCommonFields;
218};
219
220struct RColumnOutInfo {
221 DescriptorId_t fColumnId;
222 EColumnType fColumnType;
223};
224
225// { fully.qualified.fieldName.colInputId => colOutputInfo }
226using ColumnIdMap_t = std::unordered_map<std::string, RColumnOutInfo>;
227
228struct RColumnInfoGroup {
229 std::vector<RColumnMergeInfo> fExtraDstColumns;
230 std::vector<RColumnMergeInfo> fCommonColumns;
231};
232
233} // namespace
234
235// These structs cannot be in the anon namespace because they're used in RNTupleMerger's private interface.
// NOTE(review): this listing omits several declaration lines of the original source in this region (the embedded
// line numbering jumps, e.g. 235 -> 238, 241 -> 245, 250 -> 252, 268 -> 270), including the namespace opener and
// the struct headers; consult the complete source for the full declarations.
238 // This column name is built as a dot-separated concatenation of the ancestry of
239 // the columns' parent fields' names plus the index of the column itself.
240 // e.g. "Muon.pt.x._0"
241 std::string fColumnName;
245 // If nullopt, use the default in-memory type
246 std::optional<std::type_index> fInMemoryType;
248};
249
250// Data related to a single call of RNTupleMerger::Merge()
252 std::span<RPageSource *> fSources;
257
258 std::vector<RColumnMergeInfo> fColumns;
// Keyed by the fully-qualified column name, see ColumnIdMap_t.
259 ColumnIdMap_t fColumnIdMap;
260
262
// Binds the sources, the destination sink and the merge options; fDstDescriptor refers to the destination
// sink's own descriptor.
263 RNTupleMergeData(std::span<RPageSource *> sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
264 : fSources{sources}, fDestination{destination}, fMergeOpts{mergeOpts}, fDstDescriptor{destination.GetDescriptor()}
265 {
266 }
267};
268
// Sealed pages collected for one cluster before they are committed to the destination.
270 // We use a std::deque so that references to the contained SealedPageSequence_t, and its iterators, are
271 // never invalidated.
272 std::deque<RPageStorage::SealedPageSequence_t> fPagesV;
// Page groups (output column id + page range) handed to the destination's CommitSealedPageV().
273 std::vector<RPageStorage::RSealedPageGroup> fGroups;
// Owns the memory of pages created while merging (re-compressed pages and generated default pages).
274 std::vector<std::unique_ptr<std::uint8_t[]>> fBuffers;
275};
276
277std::ostream &operator<<(std::ostream &os, const std::optional<RColumnDescriptor::RValueRange> &x)
278{
279 if (x) {
280 os << '(' << x->fMin << ", " << x->fMax << ')';
281 } else {
282 os << "(null)";
283 }
284 return os;
285}
286
287} // namespace ROOT::Experimental::Internal
288
290{
291 // clang-format off
292 if (a == EColumnType::kInt16 && b == EColumnType::kSplitInt16) return true;
293 if (a == EColumnType::kSplitInt16 && b == EColumnType::kInt16) return true;
294 if (a == EColumnType::kInt32 && b == EColumnType::kSplitInt32) return true;
295 if (a == EColumnType::kSplitInt32 && b == EColumnType::kInt32) return true;
296 if (a == EColumnType::kInt64 && b == EColumnType::kSplitInt64) return true;
297 if (a == EColumnType::kSplitInt64 && b == EColumnType::kInt64) return true;
298 if (a == EColumnType::kUInt16 && b == EColumnType::kSplitUInt16) return true;
299 if (a == EColumnType::kSplitUInt16 && b == EColumnType::kUInt16) return true;
300 if (a == EColumnType::kUInt32 && b == EColumnType::kSplitUInt32) return true;
301 if (a == EColumnType::kSplitUInt32 && b == EColumnType::kUInt32) return true;
302 if (a == EColumnType::kUInt64 && b == EColumnType::kSplitUInt64) return true;
303 if (a == EColumnType::kSplitUInt64 && b == EColumnType::kUInt64) return true;
304 if (a == EColumnType::kIndex32 && b == EColumnType::kSplitIndex32) return true;
305 if (a == EColumnType::kSplitIndex32 && b == EColumnType::kIndex32) return true;
306 if (a == EColumnType::kIndex64 && b == EColumnType::kSplitIndex64) return true;
307 if (a == EColumnType::kSplitIndex64 && b == EColumnType::kIndex64) return true;
308 if (a == EColumnType::kReal32 && b == EColumnType::kSplitReal32) return true;
309 if (a == EColumnType::kSplitReal32 && b == EColumnType::kReal32) return true;
310 if (a == EColumnType::kReal64 && b == EColumnType::kSplitReal64) return true;
311 if (a == EColumnType::kSplitReal64 && b == EColumnType::kReal64) return true;
312 // clang-format on
313 return false;
314}
315
316/// Compares the top level fields of `dst` and `src` and determines whether they can be merged or not.
317/// In addition, returns the differences between `dst` and `src`'s structures
320{
321 // Cases:
322 // 1. dst == src
323 // 2. dst has fields that src hasn't
324 // 3. src has fields that dst hasn't
325 // 4. dst and src have fields that differ (compatible or incompatible)
326
327 std::vector<std::string> errors;
328 RDescriptorsComparison res;
329
330 std::vector<RCommonField> commonFields;
331
332 for (const auto &dstField : dst.GetTopLevelFields()) {
333 const auto srcFieldId = src.FindFieldId(dstField.GetFieldName());
334 if (srcFieldId != kInvalidDescriptorId) {
335 const auto &srcField = src.GetFieldDescriptor(srcFieldId);
336 commonFields.push_back({&srcField, &dstField});
337 } else {
338 res.fExtraDstFields.emplace_back(&dstField);
339 }
340 }
341 for (const auto &srcField : src.GetTopLevelFields()) {
342 const auto dstFieldId = dst.FindFieldId(srcField.GetFieldName());
343 if (dstFieldId == kInvalidDescriptorId)
344 res.fExtraSrcFields.push_back(&srcField);
345 }
346
347 // Check compatibility of common fields
348 for (const auto &field : commonFields) {
349 // NOTE: field.fSrc and field.fDst have the same name by construction
350 const auto &fieldName = field.fSrc->GetFieldName();
351
352 // Require that fields are both projected or both not projected
353 bool projCompatible = field.fSrc->IsProjectedField() == field.fDst->IsProjectedField();
354 if (!projCompatible) {
355 std::stringstream ss;
356 ss << "Field `" << fieldName << "` is incompatible with previously-seen field with that name because the "
357 << (field.fSrc->IsProjectedField() ? "new" : "old") << " one is projected and the other isn't";
358 errors.push_back(ss.str());
359 } else if (field.fSrc->IsProjectedField()) {
360 // if both fields are projected, verify that they point to the same real field
361 const auto srcName = src.GetQualifiedFieldName(field.fSrc->GetProjectionSourceId());
362 const auto dstName = dst.GetQualifiedFieldName(field.fDst->GetProjectionSourceId());
363 if (srcName != dstName) {
364 std::stringstream ss;
365 ss << "Field `" << fieldName
366 << "` is projected to a different field than a previously-seen field with the same name (old: "
367 << dstName << ", new: " << srcName << ")";
368 errors.push_back(ss.str());
369 }
370 }
371
372 // Require that fields types match
373 // TODO(gparolini): allow non-identical but compatible types
374 const auto &srcTyName = field.fSrc->GetTypeName();
375 const auto &dstTyName = field.fDst->GetTypeName();
376 if (srcTyName != dstTyName) {
377 std::stringstream ss;
378 ss << "Field `" << fieldName
379 << "` has a type incompatible with a previously-seen field with the same name: (old: " << dstTyName
380 << ", new: " << srcTyName << ")";
381 errors.push_back(ss.str());
382 }
383
384 // Require that type checksums match
385 const auto srcTyChk = field.fSrc->GetTypeChecksum();
386 const auto dstTyChk = field.fDst->GetTypeChecksum();
387 if (srcTyChk && dstTyChk && *srcTyChk != *dstTyChk) {
388 std::stringstream ss;
389 ss << "Field `" << field.fSrc->GetFieldName()
390 << "` has a different type checksum than previously-seen field with the same name";
391 errors.push_back(ss.str());
392 }
393
394 // Require that type versions match
395 const auto srcTyVer = field.fSrc->GetTypeVersion();
396 const auto dstTyVer = field.fDst->GetTypeVersion();
397 if (srcTyVer != dstTyVer) {
398 std::stringstream ss;
399 ss << "Field `" << field.fSrc->GetFieldName()
400 << "` has a different type version than previously-seen field with the same name (old: " << dstTyVer
401 << ", new: " << srcTyVer << ")";
402 errors.push_back(ss.str());
403 }
404
405 // Require that column representations match
406 const auto srcNCols = field.fSrc->GetLogicalColumnIds().size();
407 const auto dstNCols = field.fDst->GetLogicalColumnIds().size();
408 if (srcNCols != dstNCols) {
409 std::stringstream ss;
410 ss << "Field `" << field.fSrc->GetFieldName()
411 << "` has a different number of columns than previously-seen field with the same name (old: " << dstNCols
412 << ", new: " << srcNCols << ")";
413 errors.push_back(ss.str());
414 } else {
415 for (auto i = 0u; i < srcNCols; ++i) {
416 const auto srcColId = field.fSrc->GetLogicalColumnIds()[i];
417 const auto dstColId = field.fDst->GetLogicalColumnIds()[i];
418 const auto &srcCol = src.GetColumnDescriptor(srcColId);
419 const auto &dstCol = dst.GetColumnDescriptor(dstColId);
420 // TODO(gparolini): currently we refuse to merge columns of different types unless they are Split/non-Split
421 // version of the same type, because we know how to treat that specific case. We should also properly handle
422 // different but compatible types.
423 if (srcCol.GetType() != dstCol.GetType() &&
424 !IsSplitOrUnsplitVersionOf(srcCol.GetType(), dstCol.GetType())) {
425 std::stringstream ss;
426 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
427 << "` has a different column type of the same column on the previously-seen field with the same name "
428 "(old: "
429 << RColumnElementBase::GetColumnTypeName(srcCol.GetType())
430 << ", new: " << RColumnElementBase::GetColumnTypeName(dstCol.GetType()) << ")";
431 errors.push_back(ss.str());
432 }
433 if (srcCol.GetBitsOnStorage() != dstCol.GetBitsOnStorage()) {
434 std::stringstream ss;
435 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
436 << "` has a different number of bits of the same column on the previously-seen field with the same "
437 "name "
438 "(old: "
439 << srcCol.GetBitsOnStorage() << ", new: " << dstCol.GetBitsOnStorage() << ")";
440 errors.push_back(ss.str());
441 }
442 if (srcCol.GetValueRange() != dstCol.GetValueRange()) {
443 std::stringstream ss;
444 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
445 << "` has a different value range of the same column on the previously-seen field with the same name "
446 "(old: "
447 << srcCol.GetValueRange() << ", new: " << dstCol.GetValueRange() << ")";
448 errors.push_back(ss.str());
449 }
450 if (srcCol.GetRepresentationIndex() > 0) {
451 std::stringstream ss;
452 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
453 << "` has a representation index higher than 0. This is not supported yet by the merger.";
454 errors.push_back(ss.str());
455 }
456 }
457 }
458 }
459
460 std::string errMsg;
461 for (const auto &err : errors)
462 errMsg += std::string("\n * ") + err;
463
464 if (!errMsg.empty())
465 errMsg = errMsg.substr(1); // strip initial newline
466
467 if (errMsg.length())
468 return R__FAIL(errMsg);
469
470 res.fCommonFields.reserve(commonFields.size());
471 for (const auto &[srcField, dstField] : commonFields) {
472 res.fCommonFields.emplace_back(srcField, dstField);
473 }
474
475 // TODO(gparolini): we should exhaustively check the field tree rather than just the top level fields,
476 // in case the user forgets to change the version number on one field.
477
478 return ROOT::RResult(res);
479}
480
481// Applies late model extension to `destination`, adding all `newFields` to it.
482static void ExtendDestinationModel(std::span<const RFieldDescriptor *> newFields, RNTupleModel &dstModel,
483 RNTupleMergeData &mergeData, std::vector<RCommonField> &commonFields)
484{
485 assert(newFields.size() > 0); // no point in calling this with 0 new cols
486
487 dstModel.Unfreeze();
488 RNTupleModelChangeset changeset{dstModel};
489
490 std::string msg = "destination doesn't contain field";
491 if (newFields.size() > 1)
492 msg += 's';
493 msg += ' ';
494 msg += std::accumulate(newFields.begin(), newFields.end(), std::string{}, [](const auto &acc, const auto *field) {
495 return acc + (acc.length() ? ", " : "") + '`' + field->GetFieldName() + '`';
496 });
497 Info("RNTuple::Merge", "%s: adding %s to the destination model (entry #%" PRIu64 ").", msg.c_str(),
498 (newFields.size() > 1 ? "them" : "it"), mergeData.fNumDstEntries);
499
500 changeset.fAddedFields.reserve(newFields.size());
501 for (const auto *fieldDesc : newFields) {
502 auto field = fieldDesc->CreateField(*mergeData.fSrcDescriptor);
503 if (fieldDesc->IsProjectedField())
504 changeset.fAddedProjectedFields.emplace_back(field.get());
505 else
506 changeset.fAddedFields.emplace_back(field.get());
507 changeset.fModel.AddField(std::move(field));
508 }
509 dstModel.Freeze();
510 mergeData.fDestination.UpdateSchema(changeset, mergeData.fNumDstEntries);
511
512 commonFields.reserve(commonFields.size() + newFields.size());
513 for (const auto *field : newFields) {
514 const auto newFieldInDstId = mergeData.fDstDescriptor.FindFieldId(field->GetFieldName());
515 const auto &newFieldInDst = mergeData.fDstDescriptor.GetFieldDescriptor(newFieldInDstId);
516 commonFields.emplace_back(field, &newFieldInDst);
517 }
518}
519
520// Merges all columns appearing both in the source and destination RNTuples, just copying them if their
521// compression matches ("fast merge") or by unsealing and resealing them with the proper compression.
523 std::span<RColumnMergeInfo> commonColumns,
524 const RCluster::ColumnSet_t &commonColumnSet,
525 RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData)
526{
527 assert(commonColumns.size() == commonColumnSet.size());
528 if (commonColumns.empty())
529 return;
530
531 const RCluster *cluster = clusterPool.GetCluster(clusterId, commonColumnSet);
532 // we expect the cluster pool to contain the requested set of columns, since they were
533 // validated by CompareDescriptorStructure().
534 assert(cluster);
535
536 const auto &clusterDesc = mergeData.fSrcDescriptor->GetClusterDescriptor(clusterId);
537
538 for (const auto &column : commonColumns) {
539 const auto &columnId = column.fInputId;
540 R__ASSERT(clusterDesc.ContainsColumn(columnId));
541
542 const auto &columnDesc = mergeData.fSrcDescriptor->GetColumnDescriptor(columnId);
543 const auto srcColElement = column.fInMemoryType
544 ? GenerateColumnElement(*column.fInMemoryType, columnDesc.GetType())
545 : RColumnElementBase::Generate(columnDesc.GetType());
546 const auto dstColElement = column.fInMemoryType ? GenerateColumnElement(*column.fInMemoryType, column.fColumnType)
547 : RColumnElementBase::Generate(column.fColumnType);
548
549 // Now get the pages for this column in this cluster
550 const auto &pages = clusterDesc.GetPageRange(columnId);
551
553 sealedPages.resize(pages.fPageInfos.size());
554
555 // Each column range potentially has a distinct compression settings
556 const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).fCompressionSettings;
557 const bool needsCompressionChange = colRangeCompressionSettings != mergeData.fMergeOpts.fCompressionSettings;
558 if (needsCompressionChange && mergeData.fMergeOpts.fExtraVerbose)
559 Info("RNTuple::Merge", "Column %s: changing source compression from %d to %d", column.fColumnName.c_str(),
560 colRangeCompressionSettings, mergeData.fMergeOpts.fCompressionSettings);
561
562 size_t pageBufferBaseIdx = sealedPageData.fBuffers.size();
563 // If the column range already has the right compression we don't need to allocate any new buffer, so we don't
564 // bother reserving memory for them.
565 if (needsCompressionChange)
566 sealedPageData.fBuffers.resize(sealedPageData.fBuffers.size() + pages.fPageInfos.size());
567
568 // Loop over the pages
569 std::uint64_t pageIdx = 0;
570 for (const auto &pageInfo : pages.fPageInfos) {
571 assert(pageIdx < sealedPages.size());
572 assert(sealedPageData.fBuffers.size() == 0 || pageIdx < sealedPageData.fBuffers.size());
573
574 ROnDiskPage::Key key{columnId, pageIdx};
575 auto onDiskPage = cluster->GetOnDiskPage(key);
576
577 const auto checksumSize = pageInfo.fHasChecksum * RPageStorage::kNBytesPageChecksum;
578 RPageStorage::RSealedPage &sealedPage = sealedPages[pageIdx];
579 sealedPage.SetNElements(pageInfo.fNElements);
580 sealedPage.SetHasChecksum(pageInfo.fHasChecksum);
581 sealedPage.SetBufferSize(pageInfo.fLocator.GetNBytesOnStorage() + checksumSize);
582 sealedPage.SetBuffer(onDiskPage->GetAddress());
583 // TODO(gparolini): more graceful error handling (skip the page?)
585 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
586
587 if (needsCompressionChange) {
588 const auto uncompressedSize = srcColElement->GetSize() * sealedPage.GetNElements();
589 auto &buffer = sealedPageData.fBuffers[pageBufferBaseIdx + pageIdx];
590 buffer = MakeUninitArray<std::uint8_t>(uncompressedSize + checksumSize);
591 RChangeCompressionFunc compressTask{
592 *srcColElement, *dstColElement, mergeData.fMergeOpts, sealedPage, *fPageAlloc, buffer.get(),
593 };
594
595 if (fTaskGroup)
596 fTaskGroup->Run(compressTask);
597 else
598 compressTask();
599 }
600
601 ++pageIdx;
602
603 } // end of loop over pages
604
605 if (fTaskGroup)
606 fTaskGroup->Wait();
607
608 sealedPageData.fPagesV.push_back(std::move(sealedPages));
609 sealedPageData.fGroups.emplace_back(column.fOutputId, sealedPageData.fPagesV.back().cbegin(),
610 sealedPageData.fPagesV.back().cend());
611 } // end loop over common columns
612}
613
614// Generates default values for columns that are not present in the current source RNTuple
615// but are present in the destination's schema.
616static void GenerateExtraDstColumns(size_t nClusterEntries, std::span<RColumnMergeInfo> extraDstColumns,
617 RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData)
618{
619 for (const auto &column : extraDstColumns) {
620 const auto &columnId = column.fInputId;
621 const auto &columnDesc = mergeData.fDstDescriptor.GetColumnDescriptor(columnId);
622 const RFieldDescriptor *field = column.fParentField;
623
624 // Skip all auxiliary columns
625 if (field->GetLogicalColumnIds()[0] != columnId)
626 continue;
627
628 // Check if this column is a child of a Collection or a Variant. If so, it has no data
629 // and can be skipped.
630 bool skipColumn = false;
631 auto nRepetitions = std::max<std::uint64_t>(field->GetNRepetitions(), 1);
632 for (auto parentId = field->GetParentId(); parentId != kInvalidDescriptorId;) {
633 const RFieldDescriptor &parent = mergeData.fSrcDescriptor->GetFieldDescriptor(parentId);
634 if (parent.GetStructure() == ENTupleStructure::kCollection ||
635 parent.GetStructure() == ENTupleStructure::kVariant) {
636 skipColumn = true;
637 break;
638 }
639 nRepetitions *= std::max<std::uint64_t>(parent.GetNRepetitions(), 1);
640 parentId = parent.GetParentId();
641 }
642 if (skipColumn)
643 continue;
644
645 const auto structure = field->GetStructure();
646
647 if (structure == ENTupleStructure::kStreamer) {
648 Fatal(
649 "RNTuple::Merge",
650 "Destination RNTuple contains a streamer field (%s) that is not present in one of the sources. "
651 "Creating a default value for a streamer field is ill-defined, therefore the merging process will abort.",
652 field->GetFieldName().c_str());
653 continue;
654 }
655
656 // NOTE: we cannot have a Record here because it has no associated columns.
657 R__ASSERT(structure == ENTupleStructure::kCollection || structure == ENTupleStructure::kVariant ||
658 structure == ENTupleStructure::kLeaf);
659
660 const auto colElement = RColumnElementBase::Generate(columnDesc.GetType());
661 const auto nElements = nClusterEntries * nRepetitions;
662 const auto nBytesOnStorage = colElement->GetPackedSize(nElements);
663 constexpr auto kPageSizeLimit = 256 * 1024;
664 // TODO(gparolini): consider coalescing the last page if its size is less than some threshold
665 const size_t nPages = nBytesOnStorage / kPageSizeLimit + !!(nBytesOnStorage % kPageSizeLimit);
666 for (size_t i = 0; i < nPages; ++i) {
667 const auto pageSize = (i < nPages - 1) ? kPageSizeLimit : nBytesOnStorage - kPageSizeLimit * (nPages - 1);
668 const auto checksumSize = RPageStorage::kNBytesPageChecksum;
669 const auto bufSize = pageSize + checksumSize;
670 auto &buffer = sealedPageData.fBuffers.emplace_back(new unsigned char[bufSize]);
671
672 RPageStorage::RSealedPage sealedPage{buffer.get(), bufSize, static_cast<std::uint32_t>(nElements), true};
673 memset(buffer.get(), 0, pageSize);
674 sealedPage.ChecksumIfEnabled();
675
676 sealedPageData.fPagesV.push_back({sealedPage});
677 }
678
679 sealedPageData.fGroups.emplace_back(column.fOutputId, sealedPageData.fPagesV.back().cbegin(),
680 sealedPageData.fPagesV.back().cend());
681 }
682}
683
684// Iterates over all clusters of `source` and merges their pages into `destination`.
685// It is assumed that all columns in `commonColumns` are present (and compatible) in both the source and
686// the destination's schemas.
687// The pages may be "fast-merged" (i.e. simply copied with no decompression/recompression) if the target
688// compression is unspecified or matches the original compression settings.
689void RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span<RColumnMergeInfo> commonColumns,
690 std::span<RColumnMergeInfo> extraDstColumns, RNTupleMergeData &mergeData)
691{
692 RClusterPool clusterPool{source};
693
694 // Convert columns to a ColumnSet for the ClusterPool query
695 RCluster::ColumnSet_t commonColumnSet;
696 commonColumnSet.reserve(commonColumns.size());
697 for (const auto &column : commonColumns)
698 commonColumnSet.emplace(column.fInputId);
699
700 RCluster::ColumnSet_t extraDstColumnSet;
701 extraDstColumnSet.reserve(extraDstColumns.size());
702 for (const auto &column : extraDstColumns)
703 extraDstColumnSet.emplace(column.fInputId);
704
705 // Loop over all clusters in this file.
706 // descriptor->GetClusterIterable() doesn't guarantee any specific order, so we explicitly
707 // request the first cluster.
708 DescriptorId_t clusterId = mergeData.fSrcDescriptor->FindClusterId(0, 0);
709 while (clusterId != kInvalidDescriptorId) {
710 const auto &clusterDesc = mergeData.fSrcDescriptor->GetClusterDescriptor(clusterId);
711 const auto nClusterEntries = clusterDesc.GetNEntries();
712 R__ASSERT(nClusterEntries > 0);
713
714 RSealedPageMergeData sealedPageData;
715
716 if (!commonColumnSet.empty()) {
717 MergeCommonColumns(clusterPool, clusterId, commonColumns, commonColumnSet, sealedPageData, mergeData);
718 }
719
720 if (!extraDstColumnSet.empty()) {
721 GenerateExtraDstColumns(nClusterEntries, extraDstColumns, sealedPageData, mergeData);
722 }
723
724 // Commit the pages and the clusters
725 mergeData.fDestination.CommitSealedPageV(sealedPageData.fGroups);
726 mergeData.fDestination.CommitCluster(nClusterEntries);
727 mergeData.fNumDstEntries += nClusterEntries;
728
729 // Go to the next cluster
730 clusterId = mergeData.fSrcDescriptor->FindNextClusterId(clusterId);
731 }
732
733 // TODO(gparolini): when we get serious about huge file support (>~ 100GB) we might want to check here
734 // the size of the running page list and commit a cluster group when it exceeds some threshold,
735 // which would prevent the page list from getting too large.
736 // However, as of today, we aren't really handling such huge files, and even relatively big ones
737 // such as the CMS dataset have a page list size of about only 2 MB.
738 // So currently we simply merge all cluster groups into one.
739}
740
// Chooses the in-memory type used to build the column element for a column of a field whose type name is
// `fieldType` and whose on-disk column type is `onDiskType`.
// Index and switch columns are special-cased first; otherwise fundamental field type names are mapped to their
// C++ types. Returns std::nullopt to request the default in-memory type for the column type.
741static std::optional<std::type_index> ColumnInMemoryType(std::string_view fieldType, EColumnType onDiskType)
742{
// NOTE(review): in this listing the statements controlled by the index-column `if` and the switch-column `if`
// below are missing (the embedded numbering skips 745 and 748); presumably each returns a fixed in-memory type —
// confirm against the complete source.
743 if (onDiskType == EColumnType::kIndex32 || onDiskType == EColumnType::kSplitIndex32 ||
744 onDiskType == EColumnType::kIndex64 || onDiskType == EColumnType::kSplitIndex64)
746
747 if (onDiskType == EColumnType::kSwitch)
749
750 if (fieldType == "bool") {
751 return typeid(bool);
752 } else if (fieldType == "std::byte") {
753 return typeid(std::byte);
754 } else if (fieldType == "char") {
755 return typeid(char);
756 } else if (fieldType == "std::int8_t") {
757 return typeid(std::int8_t);
758 } else if (fieldType == "std::uint8_t") {
759 return typeid(std::uint8_t);
760 } else if (fieldType == "std::int16_t") {
761 return typeid(std::int16_t);
762 } else if (fieldType == "std::uint16_t") {
763 return typeid(std::uint16_t);
764 } else if (fieldType == "std::int32_t") {
765 return typeid(std::int32_t);
766 } else if (fieldType == "std::uint32_t") {
767 return typeid(std::uint32_t);
768 } else if (fieldType == "std::int64_t") {
769 return typeid(std::int64_t);
770 } else if (fieldType == "std::uint64_t") {
771 return typeid(std::uint64_t);
772 } else if (fieldType == "float") {
773 return typeid(float);
774 } else if (fieldType == "double") {
775 return typeid(double);
776 }
777
778 // if the type is not one of those above, we use the default in-memory type.
779 return std::nullopt;
780}
781
782// Given a field, fill `columns` and `colIdMap` with information about all columns belonging to it and its subfields.
783// `colIdMap` is used to map matching columns from different sources to the same output column in the destination.
784// We match columns by their "fully qualified name", which is the concatenation of their ancestor fields' names
785// and the column index.
786// By this point, since we called `CompareDescriptorStructure()` earlier, we should be guaranteed that two matching
787// columns will have at least compatible representations.
788// NOTE: srcFieldDesc and dstFieldDesc may alias.
789static void AddColumnsFromField(std::vector<RColumnMergeInfo> &columns, const RNTupleDescriptor &srcDesc,
790 RNTupleMergeData &mergeData, const RFieldDescriptor &srcFieldDesc,
791 const RFieldDescriptor &dstFieldDesc, const std::string &prefix = "")
792{
793 std::string name = prefix + '.' + srcFieldDesc.GetFieldName();
794
795 const auto &columnIds = srcFieldDesc.GetLogicalColumnIds();
796 columns.reserve(columns.size() + columnIds.size());
797 // NOTE: here we can match the src and dst columns by column index because we forbid merging fields with
798 // different column representations.
799 for (auto i = 0u; i < srcFieldDesc.GetLogicalColumnIds().size(); ++i) {
800 // We don't want to try and merge alias columns
801 if (srcFieldDesc.IsProjectedField())
802 continue;
803
804 auto srcColumnId = srcFieldDesc.GetLogicalColumnIds()[i];
805 const auto &srcColumn = srcDesc.GetColumnDescriptor(srcColumnId);
806 RColumnMergeInfo info{};
807 info.fColumnName = name + '.' + std::to_string(srcColumn.GetIndex());
808 info.fInputId = srcColumn.GetPhysicalId();
809 // Since the parent field is only relevant for extra dst columns, the choice of src or dstFieldDesc as a parent
810 // is arbitrary (they're the same field).
811 info.fParentField = &dstFieldDesc;
812
813 if (auto it = mergeData.fColumnIdMap.find(info.fColumnName); it != mergeData.fColumnIdMap.end()) {
814 info.fOutputId = it->second.fColumnId;
815 info.fColumnType = it->second.fColumnType;
816 } else {
817 info.fOutputId = mergeData.fColumnIdMap.size();
818 // NOTE(gparolini): map the type of src column to the type of dst column.
819 // This mapping is only relevant for common columns and it's done to ensure we keep a consistent
820 // on-disk representation of the same column.
821 // This is also important to do for first source when it is used to generate the destination sink,
822 // because even in that case their column representations may differ.
823 // e.g. if the destination has a different compression than the source, an integer column might be
824 // zigzag-encoded in the source but not in the destination.
825 auto dstColumnId = dstFieldDesc.GetLogicalColumnIds()[i];
826 const auto &dstColumn = mergeData.fDstDescriptor.GetColumnDescriptor(dstColumnId);
827 info.fColumnType = dstColumn.GetType();
828 mergeData.fColumnIdMap[info.fColumnName] = {info.fOutputId, info.fColumnType};
829 }
830
831 if (mergeData.fMergeOpts.fExtraVerbose) {
832 Info("RNTuple::Merge",
833 "Adding column %s with log.id %" PRIu64 ", phys.id %" PRIu64 ", type %s "
834 " -> log.id %" PRIu64 ", type %s",
835 info.fColumnName.c_str(), srcColumnId, srcColumn.GetPhysicalId(),
836 RColumnElementBase::GetColumnTypeName(srcColumn.GetType()), info.fOutputId,
837 RColumnElementBase::GetColumnTypeName(info.fColumnType));
838 }
839
840 // Since we disallow merging fields of different types, src and dstFieldDesc must have the same type name.
841 assert(srcFieldDesc.GetTypeName() == dstFieldDesc.GetTypeName());
842 info.fInMemoryType = ColumnInMemoryType(srcFieldDesc.GetTypeName(), info.fColumnType);
843 columns.emplace_back(info);
844 }
845
846 const auto &srcChildrenIds = srcFieldDesc.GetLinkIds();
847 const auto &dstChildrenIds = dstFieldDesc.GetLinkIds();
848 assert(srcChildrenIds.size() == dstChildrenIds.size());
849 for (auto i = 0u; i < srcChildrenIds.size(); ++i) {
850 const auto &srcChild = srcDesc.GetFieldDescriptor(srcChildrenIds[i]);
851 const auto &dstChild = mergeData.fDstDescriptor.GetFieldDescriptor(dstChildrenIds[i]);
852 AddColumnsFromField(columns, srcDesc, mergeData, srcChild, dstChild, name);
853 }
854}
855
856// Converts the fields comparison data to the corresponding column information.
857// While doing so, it collects such information in `colIdMap`, which is used by later calls to this function
858// to map already-seen column names to their chosen outputId, type and so on.
859static RColumnInfoGroup
860GatherColumnInfos(const RDescriptorsComparison &descCmp, const RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData)
861{
862 RColumnInfoGroup res;
863 for (const RFieldDescriptor *field : descCmp.fExtraDstFields) {
864 AddColumnsFromField(res.fExtraDstColumns, mergeData.fDstDescriptor, mergeData, *field, *field);
865 }
866 for (const auto &[srcField, dstField] : descCmp.fCommonFields) {
867 AddColumnsFromField(res.fCommonColumns, srcDesc, mergeData, *srcField, *dstField);
868 }
869 return res;
870}
871
873 // TODO(gparolini): consider using an arena allocator instead, since we know the precise lifetime
874 // of the RNTuples we are going to handle (e.g. we can reset the arena at every source)
875 : fPageAlloc(std::make_unique<RPageAllocatorHeap>())
876{
877#ifdef R__USE_IMT
880#endif
881}
882
884RNTupleMerger::Merge(std::span<RPageSource *> sources, RPageSink &destination, const RNTupleMergeOptions &mergeOptsIn)
885{
886 RNTupleMergeOptions mergeOpts = mergeOptsIn;
887 {
888 const auto dstCompSettings = destination.GetWriteOptions().GetCompression();
890 mergeOpts.fCompressionSettings = dstCompSettings;
891 } else if (mergeOpts.fCompressionSettings != dstCompSettings) {
892 return R__FAIL(std::string("The compression given to RNTupleMergeOptions is different from that of the "
893 "sink! (opts: ") +
894 std::to_string(mergeOpts.fCompressionSettings) + ", sink: " + std::to_string(dstCompSettings) +
895 ") This is currently unsupported.");
896 }
897 }
898
899 RNTupleMergeData mergeData{sources, destination, mergeOpts};
900
901 std::unique_ptr<RNTupleModel> model; // used to initialize the schema of the output RNTuple
902
903#define SKIP_OR_ABORT(errMsg) \
904 do { \
905 if (mergeOpts.fErrBehavior == ENTupleMergeErrBehavior::kSkip) { \
906 Warning("RNTuple::Merge", "Skipping RNTuple due to: %s", (errMsg).c_str()); \
907 continue; \
908 } else { \
909 return R__FAIL(errMsg); \
910 } \
911 } while (0)
912
913 // Merge main loop
914 for (RPageSource *source : sources) {
915 source->Attach();
916 auto srcDescriptor = source->GetSharedDescriptorGuard();
917 mergeData.fSrcDescriptor = &srcDescriptor.GetRef();
918
919 // Create sink from the input model if not initialized
920 if (!destination.IsInitialized()) {
922 opts.fReconstructProjections = true;
923 model = srcDescriptor->CreateModel(opts);
924 destination.Init(*model);
925 }
926
927 for (const auto &extraTypeInfoDesc : srcDescriptor->GetExtraTypeInfoIterable())
928 destination.UpdateExtraTypeInfo(extraTypeInfoDesc);
929
930 auto descCmpRes = CompareDescriptorStructure(mergeData.fDstDescriptor, srcDescriptor.GetRef());
931 if (!descCmpRes) {
933 std::string("Source RNTuple will be skipped due to incompatible schema with the destination:\n") +
934 descCmpRes.GetError()->GetReport());
935 }
936 auto descCmp = descCmpRes.Unwrap();
937
938 // If the current source is missing some fields and we're not in Union mode, error
939 // (if we are in Union mode, MergeSourceClusters will fill the missing fields with default values).
940 if (mergeOpts.fMergingMode != ENTupleMergingMode::kUnion && !descCmp.fExtraDstFields.empty()) {
941 std::string msg = "Source RNTuple is missing the following fields:";
942 for (const auto *field : descCmp.fExtraDstFields) {
943 msg += "\n " + field->GetFieldName() + " : " + field->GetTypeName();
944 }
945 SKIP_OR_ABORT(msg);
946 }
947
948 // handle extra src fields
949 if (descCmp.fExtraSrcFields.size()) {
950 if (mergeOpts.fMergingMode == ENTupleMergingMode::kUnion) {
951 // late model extension for all fExtraSrcFields in Union mode
952 ExtendDestinationModel(descCmp.fExtraSrcFields, *model, mergeData, descCmp.fCommonFields);
953 } else if (mergeOpts.fMergingMode == ENTupleMergingMode::kStrict) {
954 // If the current source has extra fields and we're in Strict mode, error
955 std::string msg = "Source RNTuple has extra fields that the destination RNTuple doesn't have:";
956 for (const auto *field : descCmp.fExtraSrcFields) {
957 msg += "\n " + field->GetFieldName() + " : " + field->GetTypeName();
958 }
959 SKIP_OR_ABORT(msg);
960 }
961 }
962
963 // handle extra dst fields & common fields
964 auto columnInfos = GatherColumnInfos(descCmp, srcDescriptor.GetRef(), mergeData);
965 MergeSourceClusters(*source, columnInfos.fCommonColumns, columnInfos.fExtraDstColumns, mergeData);
966 } // end loop over sources
967
968 // Commit the output
969 destination.CommitClusterGroup();
970 destination.CommitDataset();
971
972 return RResult<void>::Success();
973}
fBuffer
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:299
static void GenerateExtraDstColumns(size_t nClusterEntries, std::span< RColumnMergeInfo > extraDstColumns, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData)
static void ExtendDestinationModel(std::span< const RFieldDescriptor * > newFields, RNTupleModel &dstModel, RNTupleMergeData &mergeData, std::vector< RCommonField > &commonFields)
static ROOT::RResult< RDescriptorsComparison > CompareDescriptorStructure(const RNTupleDescriptor &dst, const RNTupleDescriptor &src)
Compares the top level fields of dst and src and determines whether they can be merged or not.
#define SKIP_OR_ABORT(errMsg)
static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData)
static bool IsSplitOrUnsplitVersionOf(EColumnType a, EColumnType b)
static std::optional< std::type_index > ColumnInMemoryType(std::string_view fieldType, EColumnType onDiskType)
static void AddColumnsFromField(std::vector< RColumnMergeInfo > &columns, const RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData, const RFieldDescriptor &srcFieldDesc, const RFieldDescriptor &dstFieldDesc, const std::string &prefix="")
#define b(i)
Definition RSha256.hxx:100
#define a(i)
Definition RSha256.hxx:99
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
long long Long64_t
Definition RtypesCore.h:69
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
Definition TBuffer.h:397
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:185
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:229
void Fatal(const char *location, const char *msgfmt,...)
Use this function in case of a fatal error. It will abort the program.
Definition TError.cxx:244
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t src
char name[80]
Definition TGX11.cxx:110
TRObject operator()(const T1 &t1) const
The available trivial, native content types of a column.
Managed a set of clusters containing compressed and packed pages.
RCluster * GetCluster(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:152
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
Definition RCluster.cxx:32
std::unordered_set< DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:154
A column element encapsulates the translation between basic C++ types and their column representation...
static const char * GetColumnTypeName(EColumnType type)
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
If CppT == void, use the default C++ type for the given column type.
The in-memory representation of a 32bit or 64bit on-disk index column.
Holds the index and the tag of a kSwitch column.
Given a set of RPageSources merge them into an RPageSink, optionally changing their compression.
RResult< void > Merge(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts=RNTupleMergeOptions())
Merge a given set of sources into the destination.
void MergeSourceClusters(RPageSource &source, std::span< RColumnMergeInfo > commonColumns, std::span< RColumnMergeInfo > extraDstColumns, RNTupleMergeData &mergeData)
void MergeCommonColumns(RClusterPool &clusterPool, DescriptorId_t clusterId, std::span< RColumnMergeInfo > commonColumns, const RCluster::ColumnSet_t &commonColumnSet, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData)
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
Abstract interface to write data into an ntuple.
virtual void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo)=0
Adds an extra type information record to schema.
void CommitDataset()
Run the registered callbacks and finalize the current cluster and the entire data set.
void Init(RNTupleModel &model)
Physically creates the storage container to hold the ntuple (e.g., a key in a TFile or an S3 bucket). In...
const RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
virtual void CommitClusterGroup()=0
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
virtual std::uint64_t CommitCluster(NTupleSize_t nNewEntries)
Finalize the current cluster and create a new one for the following data.
virtual void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry)=0
Incorporate incremental changes to the model into the ntuple descriptor.
virtual void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges)=0
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element)
Helper for streaming a page.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const RNTupleReadOptions &options=RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
Abstract interface to read data from an ntuple.
static RResult< RPage > UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element, RPageAllocator &pageAlloc)
Helper for unstreaming a page.
std::deque< RSealedPage > SealedPageSequence_t
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
Meta-data stored for every field of an ntuple.
const std::string & GetFieldName() const
const std::string & GetTypeName() const
const std::vector< DescriptorId_t > & GetLogicalColumnIds() const
const std::vector< DescriptorId_t > & GetLinkIds() const
The on-storage meta-data of an ntuple.
DescriptorId_t FindNextClusterId(DescriptorId_t clusterId) const
const RClusterDescriptor & GetClusterDescriptor(DescriptorId_t clusterId) const
std::string GetQualifiedFieldName(DescriptorId_t fieldId) const
Walks up the parents of the field ID and returns a field name of the form a.b.c.d. In case of invalid ...
DescriptorId_t FindFieldId(std::string_view fieldName, DescriptorId_t parentId) const
const RColumnDescriptor & GetColumnDescriptor(DescriptorId_t columnId) const
const RFieldDescriptor & GetFieldDescriptor(DescriptorId_t fieldId) const
RFieldDescriptorIterable GetTopLevelFields() const
DescriptorId_t FindClusterId(NTupleSize_t entryIdx) const
The RNTupleModel encapsulates the schema of an ntuple.
Common user-tunable settings for storing ntuples.
A class to manage the asynchronous execution of work items.
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:69
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface. Merge this NTuple with the input list entries.
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
Definition RError.hxx:289
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition RError.hxx:197
Collection abstract base class.
Definition TCollection.h:65
virtual Int_t GetEntries() const
TKey * FindKey(const char *keyname) const override
Find key with name keyname in the current directory.
TObject * Get(const char *namecycle) override
Return pointer to object identified by namecycle.
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:53
Int_t GetCompressionSettings() const
Definition TFile.h:397
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
T * ReadObject()
To read an object (not deriving from TObject) from the file.
Definition TKey.h:103
const char * GetName() const override
Returns name of object.
Definition TNamed.h:47
Mother of all ROOT objects.
Definition TObject.h:41
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Definition TObject.cxx:225
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition TString.h:632
std::ostream & Info()
Definition hadd.cxx:163
Double_t x[n]
Definition legend1.C:17
Double_t ex[n]
Definition legend1.C:17
@ kStrict
The merger will refuse to merge any 2 RNTuples whose schema doesn't match exactly.
@ kUnion
The merger will update the output model to include all columns from all sources.
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, EColumnType onDiskType)
constexpr int kNTupleUnknownCompression
Regular, known compression settings have the form algorithm * 100 + level, e.g. 101,...
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr DescriptorId_t kInvalidDescriptorId
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:570
std::optional< std::type_index > fInMemoryType
RNTupleMergeData(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
int fCompressionSettings
If fCompressionSettings == kNTupleUnknownCompression (the default), the merger will not change the co...
ENTupleMergingMode fMergingMode
Determines how the merging treats sources with different models.
bool fExtraVerbose
If true, the merger will emit further diagnostics and information.
The incremental changes to a RNTupleModel
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:52
const RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
int fCompressionSetting
Compression algorithm and level to apply.
bool fWriteChecksum
Adds a 8 byte little-endian xxhash3 checksum to the page payload.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
std::vector< RPageStorage::RSealedPageGroup > fGroups
std::deque< RPageStorage::SealedPageSequence_t > fPagesV
std::vector< std::unique_ptr< std::uint8_t[]> > fBuffers
@ kUseGeneralPurpose
Use the new recommended general-purpose setting; it is a best trade-off between compression ratio/dec...
Definition Compression.h:58