Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RClusterLoader.hxx
Go to the documentation of this file.
1// Author: Dante Niewenhuis, VU Amsterdam 07/2023
2// Author: Kristupas Pranckietis, Vilnius University 05/2024
3// Author: Nopphakorn Subsa-Ard, King Mongkut's University of Technology Thonburi (KMUTT) (TH) 08/2024
4// Author: Vincenzo Eduardo Padulano, CERN 10/2024
5// Author: Silia Taider, CERN 03/2026
6
7/*************************************************************************
8 * Copyright (C) 1995-2025, Rene Brun and Fons Rademakers. *
9 * All rights reserved. *
10 * *
11 * For the licensing terms see $ROOTSYS/LICENSE. *
12 * For the list of contributors see $ROOTSYS/README/CREDITS. *
13 *************************************************************************/
14
15#ifndef ROOT_INTERNAL_ML_RCLUSTERLOADER
16#define ROOT_INTERNAL_ML_RCLUSTERLOADER
17
18#include <algorithm>
19#include <numeric>
20#include <random>
21#include <string>
22#include <utility>
23#include <vector>
24
27#include "ROOT/RDataFrame.hxx"
28#include "ROOT/RDFHelpers.hxx"
29#include "ROOT/RDF/Utils.hxx"
30
32
33/**
34 * \struct RClusterRange
35 * \brief Describes a contiguous range of entries within a single RDataFrame,
36 * corresponding to one TTree/RNTuple cluster boundary.
37 *
38 * For filtered RDataFrames the \p numEntries field may be smaller than `end - start`
39 * because it tracks the number of entries that actually pass the filter,
40 * discovered and set lazily during the first epoch.
41 */
43 std::size_t rdfIdx; // which rdf this cluster belongs to
44 std::uint64_t start; // first raw entry (incl)
45 std::uint64_t end; // one-past-last entry (excl)
46 std::size_t numEntries{
47 static_cast<std::size_t>(end - start)}; // number of entries in the cluster (that pass filters, if any)
48
49 std::size_t GetNumEntries() const { return numEntries; }
50 void SetNumEntries(std::size_t num) { numEntries = num; }
51};
52
53/**
54 * \class ROOT::Experimental::Internal::ML::RClusterLoaderFunctor
55 * \brief Functor invoked by RDataFrame::Foreach to fill one row of an RFlat2DMatrix.
56 *
57 */
58
59template <typename... ColTypes>
61 std::size_t fOffset{};
62 std::size_t fVecSizeIdx{};
63 float fVecPadding{};
64 std::vector<std::size_t> fMaxVecSizes{};
66
67 std::size_t fNumChunkCols;
68
69 int fI;
71
72 //////////////////////////////////////////////////////////////////////////
73 /// \brief Copy the content of a column into the current tensor when the column consists of vectors
/// Overload selected (via enable_if) when `IsDataContainer<T>::value` is true.
/// Exactly `max_vec_size` elements are written for this column, where `max_vec_size` is the
/// next entry of fMaxVecSizes: shorter vectors are padded with fVecPadding, longer ones are truncated.
/// \param[in] vec Column value for the current entry.
/// \param[in] i Row index; `numColumns * i` elements are skipped before writing.
/// \param[in] numColumns Per-row stride, in elements, applied to \p i.
74 template <typename T, std::enable_if_t<ROOT::Internal::RDF::IsDataContainer<T>::value, int> = 0>
75 void AssignToTensor(const T &vec, int i, int numColumns)
76 {
// Take the configured width of this vector column and move the cursor to the next vector column.
77 std::size_t max_vec_size = fMaxVecSizes[fVecSizeIdx++];
78 std::size_t vec_size = vec.size();
79
// Destination = start of the buffer + running offset + row stride.
80 float *dst = fChunkTensor.GetData() + fOffset + numColumns * i;
81 if (vec_size < max_vec_size) // Padding vector column to max_vec_size with fVecPadding
82 {
83 std::copy(vec.begin(), vec.end(), dst);
84 std::fill(dst + vec_size, dst + max_vec_size, fVecPadding);
85 } else // Copy only max_vec_size length from vector column
86 {
87 std::copy(vec.begin(), vec.begin() + max_vec_size, dst);
88 }
// The running offset always advances by the configured width, not by the actual vector size.
89 fOffset += max_vec_size;
90 }
91
92 //////////////////////////////////////////////////////////////////////////
93 /// \brief Copy the content of a column into the current tensor when the column consists of scalar values
/// Overload selected (via enable_if) when `IsDataContainer<T>::value` is false.
/// \param[in] val Scalar column value for the current entry; assigned into the buffer element
///                (converted implicitly if T differs from the element type).
/// \param[in] i Row index; `numColumns * i` elements are skipped before writing.
/// \param[in] numColumns Per-row stride, in elements, applied to \p i.
94 template <typename T, std::enable_if_t<!ROOT::Internal::RDF::IsDataContainer<T>::value, int> = 0>
95 void AssignToTensor(const T &val, int i, int numColumns)
96 {
97 fChunkTensor.GetData()[fOffset + numColumns * i] = val;
// A scalar column occupies exactly one element.
98 fOffset++;
99 }
100
101public:
/// \brief Bind the functor to a destination matrix and a starting position.
/// \param[in] chunkTensor Matrix that receives the rows.
/// \param[in] numColumns Number of elements per row; also used as the row stride.
/// \param[in] maxVecSizes Width reserved for each vector column, in column order (copied).
/// \param[in] vecPadding Value used to pad vector columns shorter than their reserved width.
/// \param[in] i Row index passed to every AssignToTensor call.
/// \param[in] rowOffset Row at which writing starts; the running element offset is
///                      initialised to `rowOffset * numColumns`.
// NOTE(review): the initialiser list below is not in the order of the member declarations
// visible in this listing (fOffset is declared first). Members are initialised in declaration
// order regardless; this is harmless here because every initialiser reads only constructor
// parameters, but it can trigger -Wreorder.
// NOTE(review): fNumChunkCols is declared as a member but is not initialised by this constructor
// in the visible code - confirm it is unused or remove it.
102 RClusterLoaderFunctor(RFlat2DMatrix &chunkTensor, std::size_t numColumns,
103 const std::vector<std::size_t> &maxVecSizes, float vecPadding, int i,
104 std::size_t rowOffset = 0)
105 : fChunkTensor(chunkTensor),
106 fMaxVecSizes(maxVecSizes),
107 fVecPadding(vecPadding),
108 fI(i),
109 fNumColumns(numColumns),
110 fOffset(rowOffset * numColumns)
111 {
112 }
113
/// \brief Write one entry: every column value is appended at the current running offset.
/// \param[in] cols One value per requested column, in the order of the ColTypes pack.
// fOffset is NOT reset here: each AssignToTensor call advances it, so the next invocation
// continues right after the elements written by this one.
114 void operator()(const ColTypes &...cols)
115 {
// Restart the walk over fMaxVecSizes for this entry.
116 fVecSizeIdx = 0;
// Comma fold: the columns are processed left to right.
117 (AssignToTensor(cols, fI, fNumColumns), ...);
118 }
119};
120
121/**
122 * \class ROOT::Experimental::Internal::ML::RClusterLoader
123 * \brief Loads TTree/RNTuple clusters from one or more RDataFrames into RFlat2DMatrix
124 * buffers for ML training and validation.
125 *
126 * ### Overview
127 * At construction the loader scans the cluster boundaries of every
128 * provided RDataFrame and stores them as a flat list of \ref RClusterRange objects.
129 * SplitDataset() then partitions those ranges into training and validation sets according to \p validationSplit.
130 *
131 * ### The split strategy depends on whether shuffling is enabled or not
132 * - **Unshuffled**: one cut is made so that the first `(1 - validationSplit)`
133 * fraction of entries goes to training. At most one cluster is split at the boundary.
134 * - **Shuffled**: each cluster is split proportionally (according to `validationSplit`)
135 * so both sets draw entries from every part of the dataset. ShuffleTrainingClusters()
136 * and ShuffleValidationClusters() re-order the cluster lists at the start of each epoch.
137 * A second shuffling step, at the entries level, happens inside LoadTrainingClusterInto()
138 * and LoadValidationClusterInto() when loading the data into the tensors.
139 *
140 * ### Filtered RDataFrames
141 * When any RDataFrame carries a filter, the true entry count is not known
142 * until the computation graph is executed. In this case SplitDataset() is a
143 * no-op and the split is discovered lazily inside LoadTrainingClusterInto()
144 * during the first epoch.
145 * After the first epoch FinaliseSplitDiscovery() marks the split as stable and
146 * all subsequent epochs use the same pre-computed ranges.
147 */
148template <typename... Args>
150private:
151 std::vector<ROOT::RDF::RNode> &fRdfs;
152 std::vector<std::size_t> fRdfSizes;
153 std::vector<std::string> fCols;
154 std::vector<std::size_t> fVecSizes;
158 std::size_t fSetSeed;
159
160 std::size_t fNumCols;
161 std::size_t fSumVecSizes;
162 std::size_t fNumChunkCols;
163
164 std::vector<RClusterRange> fAllClusters;
165 std::vector<RClusterRange> fTrainingClusters;
166 std::vector<RClusterRange> fValidationClusters;
167
168 std::size_t fTotalEntries{0};
169 std::size_t fNumTrainingEntries{0};
170 std::size_t fNumValidationEntries{0};
171
172 bool fIsFiltered{false};
173 bool fSplitDiscovered{false};
175
176public:
/// \brief Record the configuration, detect filters and scan cluster boundaries of every RDataFrame.
/// \param[in] rdfs RDataFrame nodes to read from. Stored as a reference (fRdfs is a reference
///                 member), so the vector must outlive this loader.
/// \param[in] cols Names of the columns to load (copied).
/// \param[in] vecSizes Width reserved for each vector column (copied).
/// \param[in] vecPadding Padding value for short vector columns.
/// \param[in] validationSplit Fraction of entries assigned to validation.
/// \param[in] shuffle Whether cluster shuffling / per-cluster splitting is enabled.
/// \param[in] setSeed Seed used by the shuffling code; 0 is treated specially by the Shuffle* methods.
// NOTE(review): this listing skips original lines 190 and 205 (the embedded numbering jumps).
// Line 205 presumably opens the inner loop that declares `r`, one cluster boundary pair per
// iteration; line 190 presumably sets fNumChunkCols - confirm against the real header.
177 RClusterLoader(std::vector<ROOT::RDF::RNode> &rdfs, const std::vector<std::string> &cols,
178 const std::vector<std::size_t> &vecSizes, float vecPadding, float validationSplit, bool shuffle,
179 std::size_t setSeed)
180 : fRdfs(rdfs),
181 fCols(cols),
182 fVecSizes(vecSizes),
183 fVecPadding(vecPadding),
184 fValidationSplit(validationSplit),
185 fShuffle(shuffle),
186 fSetSeed(setSeed)
187 {
188 fNumCols = fCols.size();
// NOTE(review): the accumulator type is that of the literal `0UL` (unsigned long), which is
// narrower than std::size_t on LLP64 platforms - consider std::size_t{0}.
189 fSumVecSizes = std::accumulate(fVecSizes.begin(), fVecSizes.end(), 0UL);
191
// A single RDataFrame reporting at least one filter name switches the whole loader to the filtered mode.
192 for (auto &rdf : fRdfs) {
193 // TODO(staider) We need a better API in RDF to detect generically whether there's a filter or not
194 if (!rdf.GetFilterNames().empty()) {
195 fIsFiltered = true;
196 break;
197 }
198 }
199
200 fRdfSizes.resize(fRdfs.size(), 0);
201
202 // scan cluster boundaries across files
203 // TODO(staider) Add progress bar to inform the user about this potentially long operation
204 for (std::size_t rdfIdx = 0; rdfIdx < fRdfs.size(); ++rdfIdx) {
// Each boundary pair becomes one RClusterRange; sizes are accumulated per RDataFrame and globally.
206 fAllClusters.push_back({rdfIdx, r.first, r.second});
207 auto numEntries = r.second - r.first;
208 fRdfSizes[rdfIdx] += numEntries;
209 fTotalEntries += numEntries;
210 }
211 }
212 }
213
214 //////////////////////////////////////////////////////////////////////////
215 /// \brief Distribute the clusters into training and validation datasets.
216 /// No-op for filtered RDataFrames: the split is discovered lazily during the first epoch.
218 {
219 if (fAllClusters.empty())
220 throw std::runtime_error("RClusterLoader::SplitDataset: no clusters found.");
221
222 if (fIsFiltered) {
223 return;
224 }
225
226 if (fShuffle) {
227 // --- Shuffled path
228 // Every cluster contributes a prefix to training and a suffix to validation.
229 // Cost: Each cluster is read twice per epoch, only when validation split is more than 0.
230 // We generate a random boolean value to decide whether the training set gets the prefix
231 // or suffix of each cluster to ensure better shuffling across runs when splitting.
232 std::mt19937 g(fSetSeed);
233 std::uniform_int_distribution<int> coin(0, 1);
234
235 for (const RClusterRange &c : fAllClusters) {
236 const std::size_t sz = c.GetNumEntries();
237 const std::size_t trainSz = static_cast<std::size_t>((1.0f - fValidationSplit) * sz);
238 const std::size_t valSz = sz - trainSz;
239
240 // Randomly assign prefix or suffix to training
241 bool trainIsPrefix = coin(g);
242 const uint64_t trainStart = trainIsPrefix ? c.start : c.start + static_cast<std::uint64_t>(valSz);
243 const uint64_t valStart = trainIsPrefix ? c.start + static_cast<std::uint64_t>(trainSz) : c.start;
244
245 if (trainSz > 0) {
246 fTrainingClusters.push_back({c.rdfIdx, trainStart, trainStart + static_cast<std::uint64_t>(trainSz)});
247 fNumTrainingEntries += trainSz;
248 }
249 if (valSz > 0) {
250 fValidationClusters.push_back({c.rdfIdx, valStart, valStart + static_cast<std::uint64_t>(valSz)});
251 fNumValidationEntries += valSz;
252 }
253 }
254 } else {
255 // --- Unshuffled path
256 // Contiguous split: first (1 - validationSplit) fraction of entries go to
257 // training, the remainder to validation. At most one cluster is split at
258 // the boundary.
259 const std::size_t targetTraining = fTotalEntries - static_cast<std::size_t>(fValidationSplit * fTotalEntries);
260
261 std::size_t accumulated = 0;
262 std::size_t splitIdx = 0;
263 for (; splitIdx < fAllClusters.size(); ++splitIdx) {
264 const std::size_t sz = fAllClusters[splitIdx].GetNumEntries();
265 if (accumulated + sz > targetTraining) {
266 break;
267 }
268 accumulated += sz;
269 }
270
271 // Assign whole train/val clusters
272 fTrainingClusters.assign(fAllClusters.begin(), fAllClusters.begin() + splitIdx);
273 fNumTrainingEntries = accumulated;
274
275 if (splitIdx < fAllClusters.size() && accumulated < targetTraining) {
276 // Split the boundary cluster
277 const RClusterRange &boundary = fAllClusters[splitIdx];
278 const std::uint64_t splitPoint = boundary.start + static_cast<std::uint64_t>(targetTraining - accumulated);
279
280 fTrainingClusters.push_back({boundary.rdfIdx, boundary.start, splitPoint});
281 fValidationClusters.push_back({boundary.rdfIdx, splitPoint, boundary.end});
282 fValidationClusters.insert(fValidationClusters.end(), fAllClusters.begin() + splitIdx + 1,
283 fAllClusters.end());
284
285 fNumTrainingEntries += splitPoint - boundary.start;
286 } else {
287 fValidationClusters.assign(fAllClusters.begin() + splitIdx, fAllClusters.end());
288 }
289
291 }
292
293 if (fTrainingClusters.empty())
294 throw std::runtime_error("RClusterLoader::SplitDataset: no entries for training after split. "
295 "Reduce validation_split.");
296
297 if (fValidationSplit > 0.0f && fValidationClusters.empty())
298 throw std::runtime_error("RClusterLoader::SplitDataset: no entries for validation after split. "
299 "Increase validation_split.");
300 }
301
302 //////////////////////////////////////////////////////////////////////////
303 /// \brief Re-order training clusters for the upcoming epoch
/// \param[in] epochIdx Index of the upcoming epoch, mixed into the seed so that every epoch
///                     gets a different but reproducible cluster order.
304 void ShuffleTrainingClusters(std::size_t epochIdx)
305 {
// Nothing to do when shuffling is disabled: the cluster order stays as produced by the split.
306 if (!fShuffle) {
307 return;
308 }
309
// Seed 0 means "not reproducible": a fresh seed is drawn from std::random_device.
// Any other seed is XOR-ed with the epoch index.
310 std::mt19937 g(fSetSeed == 0 ? std::random_device{}() : fSetSeed ^ epochIdx);
311 std::shuffle(fTrainingClusters.begin(), fTrainingClusters.end(), g);
312 }
313
314 //////////////////////////////////////////////////////////////////////////
315 /// \brief Re-order validation clusters for the upcoming epoch
/// \param[in] epochIdx Index of the upcoming epoch, mixed into the seed so that every epoch
///                     gets a different but reproducible cluster order.
316 void ShuffleValidationClusters(std::size_t epochIdx)
317 {
// Nothing to do when shuffling is disabled.
318 if (!fShuffle) {
319 return;
320 }
// Same seed expression as ShuffleTrainingClusters: seed 0 draws from std::random_device,
// any other seed is XOR-ed with the epoch index.
321 std::mt19937 g(fSetSeed == 0 ? std::random_device{}() : fSetSeed ^ epochIdx);
322 std::shuffle(fValidationClusters.begin(), fValidationClusters.end(), g);
323 }
324
/// \brief Run the row-filling functor over one RDataFrame and write the rows into \p dest.
/// \param[out] dest Matrix that receives the rows.
/// \param[in] rdfIdx Index into fRdfs of the RDataFrame to read.
/// \param[in] startRow First raw entry of the range.
/// \param[in] endRow One-past-last raw entry of the range.
/// \param[in] rowOffset Row of \p dest at which writing starts.
// NOTE(review): this listing skips original lines 329 and 332. In the visible lines startRow and
// endRow are never used; presumably the missing lines restrict the RDataFrame to
// [startRow, endRow) before the loop and undo that afterwards - confirm against the real header.
325 void LoadClusterInto(RFlat2DMatrix &dest, std::size_t rdfIdx, std::uint64_t startRow, std::uint64_t endRow,
326 std::size_t rowOffset = 0)
327 {
328 ROOT::RDF::RNode &rdf = fRdfs[rdfIdx];
// The functor is built with row index 0: placement in dest comes from rowOffset and the
// functor's running offset only.
330 RClusterLoaderFunctor<Args...> func(dest, fNumChunkCols, fVecSizes, fVecPadding, 0, rowOffset);
331 rdf.Foreach(func, fCols);
333 }
334
335 //////////////////////////////////////////////////////////////////////////
336 /// \brief Load one training cluster and return the number of rows written.
337 ///
338 /// **Unfiltered**: delegates directly to `LoadClusterInto()`
339 /// **Filtered**, epoch 1 (!fSplitDiscovered):
340 /// - On the first call, Count() is called across all RDFs to obtain
341 /// the total filtered entry count, fNumTrainingEntries and
342 /// fNumValidationEntries are set as targets.
343 /// - A single Foreach on the full raw cluster range loads data and captures
344 /// rdfentry_ simultaneously. The real train/val boundary is computed from
345 /// the accumulated filtered count vs the target, then the train sub-range
346 /// is pushed to fTrainingClusters and the val sub-range to fValidationClusters.
347 /// - Only the train rows are written into \p dest.
348 /// **All subsequent epochs**: delegates directly to `LoadClusterInto()`
/// \param[out] dest Matrix that receives the rows.
/// \param[in] rdfIdx Index into fRdfs of the RDataFrame to read.
/// \param[in] startRow First raw entry of the cluster.
/// \param[in] endRow One-past-last raw entry of the cluster.
/// \param[in] rowOffset Row of \p dest at which writing starts.
/// \return In the lazy-discovery path, the number of entries assigned to training (0 when no entry
///         of the cluster is collected); otherwise `endRow - startRow`.
349 std::size_t LoadTrainingClusterInto(RFlat2DMatrix &dest, std::size_t rdfIdx, std::uint64_t startRow,
350 std::uint64_t endRow, std::size_t rowOffset = 0)
351 {
// NOTE(review): this listing skips original lines 352, 354, 367, 377 and 391 (the embedded
// numbering jumps). The conditions guarding the two nested blocks below, and whatever lines
// 367/377/391 do, are not visible here; the comments added in this function describe only
// what the visible lines establish.
353 // First call: discover total filtered count and set split targets.
355 std::vector<ROOT::RDF::RResultPtr<ULong64_t>> counts;
356 counts.reserve(fRdfs.size());
// Book one Count() per RDataFrame, then trigger all of them together with RunGraphs.
357 for (auto &rdf : fRdfs) {
358 counts.push_back(rdf.Count());
359 }
360 ROOT::RDF::RunGraphs({counts.begin(), counts.end()});
361
362 std::size_t totalFiltered = 0;
363 for (auto &c : counts) {
364 totalFiltered += c.GetValue();
365 }
// The product is evaluated in single-precision float before truncation to an integer.
// NOTE(review): float cannot represent every count above 2^24 exactly - consider double.
366 fNumTrainingEntries = static_cast<std::size_t>(totalFiltered * (1.0f - fValidationSplit));
368 }
369
370 ROOT::RDF::RNode &rdf = fRdfs[rdfIdx];
371
372 // Fill data and collect raw entry indices that pass the filter
373 std::vector<ULong64_t> rdfEntries;
374 rdfEntries.reserve(endRow - startRow);
375
376 RClusterLoaderFunctor<Args...> loader(dest, fNumChunkCols, fVecSizes, fVecPadding, 0, rowOffset);
378
// "rdfentry_" is requested as the first column so the lambda receives the raw entry number
// followed by the user columns.
379 std::vector<std::string> colsWithEntry;
380 colsWithEntry.reserve(fCols.size() + 1);
381 colsWithEntry.push_back("rdfentry_");
382 colsWithEntry.insert(colsWithEntry.end(), fCols.begin(), fCols.end());
383
// Every entry visited by Foreach is both recorded and written into dest, in visiting order.
384 rdf.Foreach(
385 [&](ULong64_t entry, const Args &...cols) {
386 rdfEntries.push_back(entry);
387 loader(cols...);
388 },
389 colsWithEntry);
390
392
// Shadows nothing at this scope: this is the per-cluster count, distinct from the global
// count computed in the first-call block above.
393 const std::size_t totalFiltered = rdfEntries.size();
394 if (totalFiltered == 0) {
395 return 0;
396 }
397 std::sort(rdfEntries.begin(), rdfEntries.end());
398
// The per-cluster training share is capped by what is still missing to reach the global target.
399 const std::size_t trainRemaining = fNumTrainingEntries - fAccumulatedFilteredForTrain;
400 const std::size_t trainCount =
401 std::min(static_cast<std::size_t>(totalFiltered * (1.0f - fValidationSplit)), trainRemaining);
402 const std::size_t valCount = totalFiltered - trainCount;
403
404 bool trainIsPrefix = true;
405 if (fShuffle) {
406 // If shuffling is enabled, we generate a random boolean value to decide whether the training set
407 // gets the prefix or suffix of each cluster to ensure better shuffling across runs when splitting.
408 std::mt19937 g(fSetSeed + fAccumulatedFilteredForTrain); // vary per cluster
409 std::uniform_int_distribution<int> coin(0, 1);
410 trainIsPrefix = coin(g);
411 }
412
413 // The boundary is the raw entry index of the first entry assigned to validation.
414 // Stable across epochs since the same filter always produces the same ordered entries.
// (In the suffix case the boundary is instead the first entry assigned to training.)
// NOTE(review): the indexing below is evaluated before the `valCount > 0` check.
// trainCount + valCount == rdfEntries.size(), so rdfEntries[trainCount] is one past the end
// when valCount == 0 (prefix case, e.g. validation split 0), and rdfEntries[valCount] is one
// past the end when trainCount == 0 (suffix case). In the latter case the out-of-range value
// is also the one actually used as `boundary`. Select endRow before indexing.
415 const std::uint64_t trainBoundaryEntry = trainIsPrefix ? rdfEntries[trainCount] : rdfEntries[valCount];
416 const std::uint64_t boundary = (valCount > 0) ? trainBoundaryEntry : endRow;
417
418 const std::uint64_t trainStart = trainIsPrefix ? startRow : boundary;
419 const std::uint64_t trainEnd = trainIsPrefix ? boundary : endRow;
420 const std::uint64_t valStart = trainIsPrefix ? boundary : startRow;
421 const std::uint64_t valEnd = trainIsPrefix ? endRow : boundary;
422
// The discovered sub-ranges carry their filtered entry counts explicitly (fourth field).
423 if (trainCount > 0)
424 fTrainingClusters.push_back({rdfIdx, trainStart, trainEnd, trainCount});
425 if (valCount > 0)
426 fValidationClusters.push_back({rdfIdx, valStart, valEnd, valCount});
427
428 fAccumulatedFilteredForTrain += trainCount;
// NOTE(review): the Foreach above wrote every collected row starting at rowOffset. If the caller
// keeps the first `trainCount` rows, then in the suffix case those rows would be the validation
// part of the cluster, unless a line not shown in this listing compensates - confirm.
429 return trainCount;
430 }
431
// Split already known (or unfiltered input): load the given range as-is.
432 LoadClusterInto(dest, rdfIdx, startRow, endRow, rowOffset);
433 return endRow - startRow;
434 }
435
436 //////////////////////////////////////////////////////////////////////////
437 /// \brief Load one validation cluster into \p dest starting at \p rowOffset
/// \param[out] dest Matrix that receives the rows.
/// \param[in] rdfIdx Index into fRdfs of the RDataFrame to read.
/// \param[in] startRow First raw entry of the range.
/// \param[in] endRow One-past-last raw entry of the range.
/// \param[in] rowOffset Row of \p dest at which writing starts.
438 void LoadValidationClusterInto(RFlat2DMatrix &dest, std::size_t rdfIdx, std::uint64_t startRow, std::uint64_t endRow,
439 std::size_t rowOffset = 0)
440 {
// Pure forwarding: validation never takes part in split discovery and returns no row count.
441 LoadClusterInto(dest, rdfIdx, startRow, endRow, rowOffset);
442 }
443
444 //////////////////////////////////////////////////////////////////////////
445 /// \brief Mark the train/val split as finalised after the first epoch
447 {
448 if (fIsFiltered)
449 fSplitDiscovered = true;
450 }
451
/// \brief Tell whether the train/validation ranges are final.
/// Always true when no RDataFrame is filtered; otherwise true once fSplitDiscovered has been set.
452 bool IsSplitDiscovered() const { return !fIsFiltered || fSplitDiscovered; }
453
454 //////////////////////////////////////////////////////////////////////////
455 // Accessors
/// Number of entries assigned to training (for filtered input: the target set during the first epoch).
456 std::size_t GetNumTrainingEntries() const { return fNumTrainingEntries; }
/// Number of entries assigned to validation.
457 std::size_t GetNumValidationEntries() const { return fNumValidationEntries; }
/// Value of fNumChunkCols, the row width handed to the row-filling functor.
458 std::size_t GetNumChunkCols() const { return fNumChunkCols; }
459
/// \brief Read-only access to a list of cluster ranges used for training.
// NOTE(review): this listing skips original line 462, i.e. the return statement; which member
// is returned (and under which condition) cannot be told from here - confirm against the real header.
460 const std::vector<RClusterRange> &GetTrainingClusters() const
461 {
463 }
/// \brief Read-only access to the validation cluster ranges (fValidationClusters).
464 const std::vector<RClusterRange> &GetValidationClusters() const { return fValidationClusters; }
465
/// \brief Number of clusters to iterate over for training.
/// While the input is filtered and the split has not been discovered yet, this is the number of
/// all scanned clusters; otherwise it is the size of the training cluster list.
466 std::size_t GetNumTrainingClusters() const
467 {
468 return (fIsFiltered && !fSplitDiscovered) ? fAllClusters.size() : fTrainingClusters.size();
469 }
/// \brief Current size of the validation cluster list.
470 std::size_t GetNumValidationClusters() const { return fValidationClusters.size(); }
/// \brief Number of clusters found when scanning all RDataFrames at construction.
// NOTE(review): "GetNm" looks like a typo for "GetNum"; the name is public API, so it is left
// unchanged here - consider adding a correctly spelled alias.
471 std::size_t GetNmTotalClusters() const { return fAllClusters.size(); }
472};
473
474} // namespace ROOT::Experimental::Internal::ML
475#endif // ROOT_INTERNAL_ML_RCLUSTERLOADER
#define c(i)
Definition RSha256.hxx:101
#define g(i)
Definition RSha256.hxx:105
unsigned long long ULong64_t
Portable unsigned long integer 8 bytes.
Definition RtypesCore.h:84
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t dest
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Functor invoked by RDataFrame::Foreach to fill one row of an RFlat2DMatrix.
void AssignToTensor(const T &vec, int i, int numColumns)
Copy the content of a column into the current tensor when the column consists of vectors.
RClusterLoaderFunctor(RFlat2DMatrix &chunkTensor, std::size_t numColumns, const std::vector< std::size_t > &maxVecSizes, float vecPadding, int i, std::size_t rowOffset=0)
void AssignToTensor(const T &val, int i, int numColumns)
Copy the content of a column into the current tensor when the column consists of scalar values.
void ShuffleTrainingClusters(std::size_t epochIdx)
Re-order training clusters for the upcoming epoch.
void FinaliseSplitDiscovery()
Mark the train/val split as finalised after the first epoch.
void LoadClusterInto(RFlat2DMatrix &dest, std::size_t rdfIdx, std::uint64_t startRow, std::uint64_t endRow, std::size_t rowOffset=0)
void ShuffleValidationClusters(std::size_t epochIdx)
Re-order validation clusters for the upcoming epoch.
std::size_t LoadTrainingClusterInto(RFlat2DMatrix &dest, std::size_t rdfIdx, std::uint64_t startRow, std::uint64_t endRow, std::size_t rowOffset=0)
Load one training cluster and return the number of rows written.
void SplitDataset()
Distribute the clusters into training and validation datasets No-op for filtered RDataFrames,...
void LoadValidationClusterInto(RFlat2DMatrix &dest, std::size_t rdfIdx, std::uint64_t startRow, std::uint64_t endRow, std::size_t rowOffset=0)
Load one validation cluster into dest starting at rowOffset.
const std::vector< RClusterRange > & GetTrainingClusters() const
RClusterLoader(std::vector< ROOT::RDF::RNode > &rdfs, const std::vector< std::string > &cols, const std::vector< std::size_t > &vecSizes, float vecPadding, float validationSplit, bool shuffle, std::size_t setSeed)
const std::vector< RClusterRange > & GetValidationClusters() const
std::vector< std::pair< std::uint64_t, std::uint64_t > > GetDatasetGlobalClusterBoundaries(const RNode &node)
Retrieve the cluster boundaries for each cluster in the dataset, across files, with a global offset.
void ChangeBeginAndEndEntries(const RNode &node, Long64_t begin, Long64_t end)
unsigned int RunGraphs(std::vector< RResultHandle > handles)
Run the event loops of multiple RDataFrames concurrently.
RInterface<::ROOT::Detail::RDF::RNodeBase > RNode
Describes a contiguous range of entries within a single RDataFrame, corresponding to one TTree/RNTupl...
Wrapper around ROOT::RVec<float> representing a 2D matrix.