Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RBatchLoader.hxx
Go to the documentation of this file.
1// Author: Dante Niewenhuis, VU Amsterdam 07/2023
2// Author: Kristupas Pranckietis, Vilnius University 05/2024
3// Author: Nopphakorn Subsa-Ard, King Mongkut's University of Technology Thonburi (KMUTT) (TH) 08/2024
4// Author: Vincenzo Eduardo Padulano, CERN 10/2024
5// Author: Martin Føll, University of Oslo (UiO) & CERN 05/2025
6
7/*************************************************************************
8 * Copyright (C) 1995-2025, Rene Brun and Fons Rademakers. *
9 * All rights reserved. *
10 * *
11 * For the licensing terms see $ROOTSYS/LICENSE. *
12 * For the list of contributors see $ROOTSYS/README/CREDITS. *
13 *************************************************************************/
14
15#ifndef TMVA_RBATCHLOADER
16#define TMVA_RBATCHLOADER
17
18#include <vector>
19#include <memory>
20#include <numeric>
21
22// Imports for threading
23#include <queue>
24#include <mutex>
25#include <condition_variable>
26
28#include "TMVA/Tools.h"
29
31
32/**
33\class ROOT::TMVA::Experimental::Internal::RBatchLoader
34\ingroup tmva
35\brief Building and loading the batches from loaded chunks in RChunkLoader
36
37In this class, the chunks that are loaded into memory (see RChunkLoader) are split into batches for ML training,
38which are then pushed onto a queue. This is done separately for the training and the validation chunks.
39*/
40
42private:
43 std::size_t fBatchSize;
44 // needed for calculating the total number of batch columns when vector columns are present
45 std::vector<std::string> fCols;
46 std::vector<std::size_t> fVecSizes;
47 std::size_t fSumVecSizes;
48 std::size_t fNumColumns;
49 std::size_t fNumEntries;
51
52 std::size_t fNumFullBatches;
54 std::size_t fNumBatches;
55 std::size_t fLeftoverBatchSize;
56
57 bool fIsActive = false;
58
59 std::mutex fBatchLock;
60 std::condition_variable fBatchCondition;
61
62 // queues of flattened tensors (rows * cols)
63 std::queue<std::unique_ptr<RFlat2DMatrix>> fBatchQueue;
64
65 // current batch that is loaded into memory
66 std::unique_ptr<RFlat2DMatrix> fCurrentBatch;
67
68 // primary and secondary leftover batches used to create batches from a chunk
69 std::unique_ptr<RFlat2DMatrix> fPrimaryLeftoverBatch;
70 std::unique_ptr<RFlat2DMatrix> fSecondaryLeftoverBatch;
71
72public:
73 RBatchLoader(std::size_t batchSize, const std::vector<std::string> &cols,
74 const std::vector<std::size_t> &vecSizes = {}, std::size_t numEntries = 0, bool dropRemainder = false)
76 {
77
78 fSumVecSizes = std::accumulate(fVecSizes.begin(), fVecSizes.end(), 0);
79 fNumColumns = fCols.size() + fSumVecSizes - fVecSizes.size();
80
81 if (fBatchSize == 0) {
83 }
84
87
89
90 if (fDropRemainder) {
92 }
93
94 else {
96 }
97
98 fPrimaryLeftoverBatch = std::make_unique<RFlat2DMatrix>();
99 fSecondaryLeftoverBatch = std::make_unique<RFlat2DMatrix>();
100 }
101
102public:
103 void Activate()
104 {
105 {
106 std::lock_guard<std::mutex> lock(fBatchLock);
107 fIsActive = true;
108 }
109 fBatchCondition.notify_all();
110 }
111
112 /// \brief DeActivate the batchloader. This means that no more batches are created.
113 /// Batches can still be returned if they are already loaded
115 {
116 {
117 std::lock_guard<std::mutex> lock(fBatchLock);
118 fIsActive = false;
119 }
120 fBatchCondition.notify_all();
121 }
122
123 /// \brief Return a batch of data as a unique pointer.
124 /// After the batch has been processed, it should be destroyed.
125 /// \param[in] chunkTensor Tensor with the data from the chunk
126 /// \param[in] idxs Index of batch in the chunk
127 /// \return Batch
128 std::unique_ptr<RFlat2DMatrix> CreateBatch(RFlat2DMatrix &chunTensor, std::size_t idxs)
129 {
130 auto batch = std::make_unique<RFlat2DMatrix>(fBatchSize, fNumColumns);
131 std::copy(chunTensor.GetData() + (idxs * fBatchSize * fNumColumns),
132 chunTensor.GetData() + ((idxs + 1) * fBatchSize * fNumColumns), batch->GetData());
133
134 return batch;
135 }
136
137 /// \brief Loading the batch from the queue
138 /// \return Batch
140 {
141
142 if (fBatchQueue.empty()) {
143 fCurrentBatch = std::make_unique<RFlat2DMatrix>();
144 return *fCurrentBatch;
145 }
146
147 fCurrentBatch = std::move(fBatchQueue.front());
148 fBatchQueue.pop();
149
150 return *fCurrentBatch;
151 }
152
153 /// \brief Creating the batches from a chunk and add them to the queue.
154 /// \param[in] chunkTensor Tensor with the data from the chunk
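155 /// \param[in] lastbatch Flag: when equal to 1, the rows remaining in the leftover batch are added as a final batch (unless fDropRemainder is set) and the leftover batches are reset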
157 {
158 std::size_t ChunkSize = chunkTensor.GetRows();
159 std::size_t NumCols = chunkTensor.GetCols();
160 std::size_t Batches = ChunkSize / fBatchSize;
161 std::size_t LeftoverBatchSize = ChunkSize % fBatchSize;
162
163 // create a vector of batches
164 std::vector<std::unique_ptr<RFlat2DMatrix>> batches;
165
166 // fill the full batches from the chunk into a vector
167 for (std::size_t i = 0; i < Batches; i++) {
168 // Fill a batch
169 batches.emplace_back(CreateBatch(chunkTensor, i));
170 }
171
172 // copy the remaining entries from the chunk into a leftover batch
174 std::copy(chunkTensor.GetData() + (Batches * fBatchSize * NumCols),
176 LeftoverBatch.GetData());
177
178 // calculate how many empty slots are left in fPrimaryLeftoverBatch
179 std::size_t PrimaryLeftoverSize = fPrimaryLeftoverBatch->GetRows();
181
182 // copy LeftoverBatch to end of fPrimaryLeftoverBatch
185 std::copy(LeftoverBatch.GetData(), LeftoverBatch.GetData() + (LeftoverBatchSize * fNumColumns),
187
188 // copy fPrimaryLeftoverBatch into a new batch and add it to the batch vector
190 auto copy = std::make_unique<RFlat2DMatrix>(fBatchSize, fNumColumns);
191 std::copy(fPrimaryLeftoverBatch->GetData(), fPrimaryLeftoverBatch->GetData() + (fBatchSize * fNumColumns),
192 copy->GetData());
193 batches.emplace_back(std::move(copy));
194
195 // reset fPrimaryLeftoverBatch and fSecondaryLeftoverBatch
197 fSecondaryLeftoverBatch = std::make_unique<RFlat2DMatrix>();
198 }
199 }
200
201 // copy LeftoverBatch to both fPrimaryLeftoverBatch and fSecondaryLeftoverBatch
202 else if (emptySlots < LeftoverBatchSize) {
203 // copy the first part of LeftoverBatch to the end of fPrimaryLeftoverBatch
205 std::copy(LeftoverBatch.GetData(), LeftoverBatch.GetData() + (emptySlots * NumCols),
207
208 // copy the last part of LeftoverBatch to the end of fSecondaryLeftoverBatch
210 std::copy(LeftoverBatch.GetData() + (emptySlots * NumCols),
212
213 // add fPrimaryLeftoverBatch to the batch vector
214 auto copy = std::make_unique<RFlat2DMatrix>(fBatchSize, fNumColumns);
215 std::copy(fPrimaryLeftoverBatch->GetData(), fPrimaryLeftoverBatch->GetData() + (fBatchSize * fNumColumns),
216 copy->GetData());
217 batches.emplace_back(std::move(copy));
218
219 // exchange fPrimaryLeftoverBatch and fSecondaryLeftoverBatch
221
222 // reset fSecondaryLeftoverBatch
223 fSecondaryLeftoverBatch = std::make_unique<RFlat2DMatrix>();
224 }
225
226 // when lastbatch is set, add the rows remaining in fPrimaryLeftoverBatch as a final batch and reset the leftover batches
227 if (lastbatch == 1) {
228
229 if (fDropRemainder == false && fLeftoverBatchSize > 0) {
230 auto copy = std::make_unique<RFlat2DMatrix>(fLeftoverBatchSize, fNumColumns);
231 std::copy(fPrimaryLeftoverBatch->GetData(),
232 fPrimaryLeftoverBatch->GetData() + (fLeftoverBatchSize * fNumColumns), copy->GetData());
233 batches.emplace_back(std::move(copy));
234 }
235
236 fPrimaryLeftoverBatch = std::make_unique<RFlat2DMatrix>();
237 fSecondaryLeftoverBatch = std::make_unique<RFlat2DMatrix>();
238 }
239
240 // append the batches created from this chunk to the batch queue
241 for (std::size_t i = 0; i < batches.size(); i++) {
242 fBatchQueue.push(std::move(batches[i]));
243 }
244 }
245
246 std::size_t GetNumBatches() { return fNumBatches; }
247 std::size_t GetNumEntries() { return fNumEntries; }
248 std::size_t GetNumRemainderRows() { return fLeftoverBatchSize; }
249 std::size_t GetNumBatchQueue() { return fBatchQueue.size(); }
250};
251
252} // namespace TMVA::Experimental::Internal
253
254#endif // TMVA_RBATCHLOADER
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
These classes encapsulate the necessary data for the computations.
std::unique_ptr< RFlat2DMatrix > fSecondaryLeftoverBatch
std::unique_ptr< RFlat2DMatrix > CreateBatch(RFlat2DMatrix &chunTensor, std::size_t idxs)
Return a batch of data as a unique pointer.
void CreateBatches(RFlat2DMatrix &chunkTensor, std::size_t lastbatch)
Creating the batches from a chunk and add them to the queue.
std::unique_ptr< RFlat2DMatrix > fCurrentBatch
std::queue< std::unique_ptr< RFlat2DMatrix > > fBatchQueue
RFlat2DMatrix GetBatch()
Loading the batch from the queue.
std::unique_ptr< RFlat2DMatrix > fPrimaryLeftoverBatch
void DeActivate()
DeActivate the batchloader.
RBatchLoader(std::size_t batchSize, const std::vector< std::string > &cols, const std::vector< std::size_t > &vecSizes={}, std::size_t numEntries=0, bool dropRemainder=false)
Wrapper around ROOT::RVec<float> representing a 2D matrix.