Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RBatchLoader.hxx
Go to the documentation of this file.
1// Author: Dante Niewenhuis, VU Amsterdam 07/2023
2// Author: Kristupas Pranckietis, Vilnius University 05/2024
3// Author: Nopphakorn Subsa-Ard, King Mongkut's University of Technology Thonburi (KMUTT) (TH) 08/2024
4// Author: Vincenzo Eduardo Padulano, CERN 10/2024
5// Author: Martin Føll, University of Oslo (UiO) & CERN 05/2025
6// Author: Silia Taider, CERN 02/2026
7
8/*************************************************************************
9 * Copyright (C) 1995-2025, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#ifndef ROOT_INTERNAL_ML_RBATCHLOADER
17#define ROOT_INTERNAL_ML_RBATCHLOADER
18
19#include <vector>
20#include <memory>
21#include <numeric>
22
23// Imports for threading
24#include <queue>
25#include <mutex>
26#include <condition_variable>
27
29
31/**
32 \class ROOT::Experimental::Internal::ML::RBatchLoader
33
\brief Builds batches from the chunks loaded by RChunkLoader and loads them into a queue
35
In this class, the chunks that RChunkLoader has loaded into memory are split into the batches used for ML training,
and these batches are pushed onto a queue. This is done separately for the training and the validation chunks.
38*/
39
41private:
42 std::size_t fBatchSize;
43 // needed for calculating the total number of batch columns when vectors columns are present
44 std::vector<std::string> fCols;
45 std::mutex &fLock;
46 std::condition_variable &fCV;
47 std::vector<std::size_t> fVecSizes;
48 std::size_t fSumVecSizes;
49 std::size_t fNumColumns;
50 std::size_t fNumEntries;
52
53 std::size_t fNumFullBatches;
54 std::size_t fNumBatches;
55 std::size_t fLeftoverBatchSize;
56
57 bool fIsActive = false;
58 bool fProducerDone = true;
59
60 // queues of flattened tensors (rows * cols)
61 std::queue<std::unique_ptr<RFlat2DMatrix>> fBatchQueue;
62
63 // current batch that is loaded into memory
64 std::unique_ptr<RFlat2DMatrix> fCurrentBatch;
65
66 // primary and secondary leftover batches used to create batches from a chunk
67 std::unique_ptr<RFlat2DMatrix> fPrimaryLeftoverBatch;
68 std::unique_ptr<RFlat2DMatrix> fSecondaryLeftoverBatch;
69
70public:
71 RBatchLoader(std::size_t batchSize, const std::vector<std::string> &cols, std::mutex &sharedMutex,
72 std::condition_variable &sharedCV, const std::vector<std::size_t> &vecSizes = {},
73 std::size_t numEntries = 0, bool dropRemainder = false)
74 : fBatchSize(batchSize),
75 fCols(cols),
79 fNumEntries(numEntries),
81 {
82 fSumVecSizes = std::accumulate(fVecSizes.begin(), fVecSizes.end(), 0);
83 fNumColumns = fCols.size() + fSumVecSizes - fVecSizes.size();
84
85 if (fBatchSize == 0) {
87 }
88
91
92 std::size_t numLeftoverBatches = fLeftoverBatchSize == 0 ? 0 : 1;
93
94 if (fDropRemainder) {
96 } else {
98 }
99
100 fPrimaryLeftoverBatch = std::make_unique<RFlat2DMatrix>();
101 fSecondaryLeftoverBatch = std::make_unique<RFlat2DMatrix>();
102 }
103
104 /// \brief Activate the batchloader. This means that batches can be created and loaded.
105 void Activate()
106 {
107 {
108 std::lock_guard<std::mutex> lock(fLock);
109 if (fIsActive)
110 return;
111 fIsActive = true;
112 fProducerDone = false;
113 }
114
115 fCV.notify_all();
116 }
117
118 /// \brief DeActivate the batchloader. This means that no more batches are created.
119 /// Batches can still be returned if they are already loaded.
121 {
122 {
123 std::lock_guard<std::mutex> lock(fLock);
124 if (!fIsActive)
125 return;
126 fIsActive = false;
127 }
128
129 fCV.notify_all();
130 }
131
132 /// \brief Return a batch of data as a unique pointer.
133 /// After the batch has been processed, it should be destroyed.
134 /// \param[in] chunkTensor Tensor with the data from the chunk
135 /// \param[in] idxs Index of batch in the chunk
136 /// \return Batch
137 std::unique_ptr<RFlat2DMatrix> CreateBatch(RFlat2DMatrix &chunkTensor, std::size_t idxs)
138 {
139 auto batch = std::make_unique<RFlat2DMatrix>(fBatchSize, fNumColumns);
140 std::copy(chunkTensor.GetData() + (idxs * fBatchSize * fNumColumns),
141 chunkTensor.GetData() + ((idxs + 1) * fBatchSize * fNumColumns), batch->GetData());
142
143 return batch;
144 }
145
146 /// \brief Loading the batch from the queue.
147 /// \return Batch
149 {
150 std::unique_lock<std::mutex> lock(fLock);
151
152 // Wait until:
153 // - there is data in the queue
154 // - or producer declares "done"
155 // - or we are deactivated
156 fCV.wait(lock, [&] { return !fBatchQueue.empty() || fProducerDone || !fIsActive; });
157
158 if (fBatchQueue.empty()) {
159 // producer done and no queued data -> end-of-epoch signal
160 fCurrentBatch = std::make_unique<RFlat2DMatrix>();
161 return *fCurrentBatch;
162 }
163
164 fCurrentBatch = std::move(fBatchQueue.front());
165 fBatchQueue.pop();
166 // Notify the loading thread that the queue has drained
167 fCV.notify_all();
168
169 return *fCurrentBatch;
170 }
171
172 /// \brief Creating the batches from a chunk and add them to the queue.
173 /// \param[in] chunkTensor Tensor with the data from the chunk
174 /// \param[in] isLastBatch Check if the batch in the chunk is the last one
176 {
177 std::size_t chunkSize = chunkTensor.GetRows();
178 std::size_t numCols = chunkTensor.GetCols();
179 std::size_t numBatches = chunkSize / fBatchSize;
180 std::size_t leftoverBatchSize = chunkSize % fBatchSize;
181
182 // create a vector of batches
183 std::vector<std::unique_ptr<RFlat2DMatrix>> batches;
184
185 // fill the full batches from the chunk into a vector
186 for (std::size_t i = 0; i < numBatches; i++) {
187 batches.emplace_back(CreateBatch(chunkTensor, i));
188 }
189
190 // copy the remaining entries from the chunk into a leftover batch
192 std::copy(chunkTensor.GetData() + (numBatches * fBatchSize * numCols),
194 LeftoverBatch.GetData());
195
196 // calculate how many empty slots are left in fPrimaryLeftoverBatch
197 std::size_t PrimaryLeftoverSize = fPrimaryLeftoverBatch->GetRows();
199
200 // copy LeftoverBatch to end of fPrimaryLeftoverBatch
203 std::copy(LeftoverBatch.GetData(), LeftoverBatch.GetData() + (leftoverBatchSize * fNumColumns),
205
206 // copy LeftoverBatch to end of fPrimaryLeftoverBatch and add it to the batch
208 auto copy = std::make_unique<RFlat2DMatrix>(fBatchSize, fNumColumns);
209 std::copy(fPrimaryLeftoverBatch->GetData(), fPrimaryLeftoverBatch->GetData() + (fBatchSize * fNumColumns),
210 copy->GetData());
211 batches.emplace_back(std::move(copy));
212
213 // reset fPrimaryLeftoverBatch and fSecondaryLeftoverBatch
215 fSecondaryLeftoverBatch = std::make_unique<RFlat2DMatrix>();
216 }
217 }
218
219 // copy LeftoverBatch to both fPrimaryLeftoverBatch and fSecondaryLeftoverBatch
220 else if (emptySlots < leftoverBatchSize) {
221 // copy the first part of LeftoverBatch to end of fPrimaryLeftoverTrainingBatch
223 std::copy(LeftoverBatch.GetData(), LeftoverBatch.GetData() + (emptySlots * numCols),
225
226 // copy the last part of LeftoverBatch to the end of fSecondaryLeftoverBatch
228 std::copy(LeftoverBatch.GetData() + (emptySlots * numCols),
230
231 // add fPrimaryLeftoverBatch to the batch vector
232 auto copy = std::make_unique<RFlat2DMatrix>(fBatchSize, fNumColumns);
233 std::copy(fPrimaryLeftoverBatch->GetData(), fPrimaryLeftoverBatch->GetData() + (fBatchSize * fNumColumns),
234 copy->GetData());
235 batches.emplace_back(std::move(copy));
236
237 // exchange fPrimaryLeftoverBatch and fSecondaryLeftoverBatch
239 // reset fSecondaryLeftoverTrainingBatch
240 fSecondaryLeftoverBatch = std::make_unique<RFlat2DMatrix>();
241 }
242
243 // copy the content of fPrimaryLeftoverBatch to the leftover batch from the chunk
244 if (isLastBatch) {
246 auto copy = std::make_unique<RFlat2DMatrix>(fLeftoverBatchSize, fNumColumns);
247 std::copy(fPrimaryLeftoverBatch->GetData(),
248 fPrimaryLeftoverBatch->GetData() + (fLeftoverBatchSize * fNumColumns), copy->GetData());
249 batches.emplace_back(std::move(copy));
250 }
251
252 fPrimaryLeftoverBatch = std::make_unique<RFlat2DMatrix>();
253 fSecondaryLeftoverBatch = std::make_unique<RFlat2DMatrix>();
254 }
255
256 {
257 std::lock_guard<std::mutex> lock(fLock);
258 for (auto &batch : batches) {
259 fBatchQueue.push(std::move(batch));
260 }
261 }
262
263 fCV.notify_all();
264 }
265
266 /// \brief Reset the batchloader state.
267 void Reset()
268 {
269 {
270 std::lock_guard<std::mutex> lock(fLock);
271
272 while (!fBatchQueue.empty()) {
273 fBatchQueue.pop();
274 }
275
276 fCurrentBatch.reset();
277 fPrimaryLeftoverBatch = std::make_unique<RFlat2DMatrix>();
278 fSecondaryLeftoverBatch = std::make_unique<RFlat2DMatrix>();
279 }
280
281 fCV.notify_all();
282 }
283
284 /// \brief Signal that the producer has finished pushing all batches for this epoch.
286 {
287 fProducerDone = true;
288 fCV.notify_all();
289 }
290
291 bool isProducerDone() { return fProducerDone; }
292 std::size_t GetNumBatches() { return fNumBatches; }
293 std::size_t GetNumEntries() { return fNumEntries; }
294 std::size_t GetNumRemainderRows() { return fLeftoverBatchSize; }
295 std::size_t GetNumBatchQueue() { return fBatchQueue.size(); }
296};
297
298} // namespace ROOT::Experimental::Internal::ML
299
300#endif // ROOT_INTERNAL_ML_RBATCHLOADER
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
Building and loading the batches from loaded chunks in RChunkLoader.
RFlat2DMatrix GetBatch()
Loading the batch from the queue.
std::unique_ptr< RFlat2DMatrix > fSecondaryLeftoverBatch
std::queue< std::unique_ptr< RFlat2DMatrix > > fBatchQueue
void Reset()
Reset the batchloader state.
void Activate()
Activate the batchloader. This means that batches can be created and loaded.
void MarkProducerDone()
Signal that the producer has finished pushing all batches for this epoch.
void CreateBatches(RFlat2DMatrix &chunkTensor, bool isLastBatch)
Creating the batches from a chunk and add them to the queue.
void DeActivate()
DeActivate the batchloader.
std::unique_ptr< RFlat2DMatrix > fPrimaryLeftoverBatch
std::unique_ptr< RFlat2DMatrix > fCurrentBatch
RBatchLoader(std::size_t batchSize, const std::vector< std::string > &cols, std::mutex &sharedMutex, std::condition_variable &sharedCV, const std::vector< std::size_t > &vecSizes={}, std::size_t numEntries=0, bool dropRemainder=false)
std::unique_ptr< RFlat2DMatrix > CreateBatch(RFlat2DMatrix &chunkTensor, std::size_t idxs)
Return a batch of data as a unique pointer.
Wrapper around ROOT::RVec<float> representing a 2D matrix.