Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
ntpl014_framework.C
Go to the documentation of this file.
1/// \file
2/// \ingroup tutorial_ntuple
3/// \notebook
4///
5/// Example of framework usage for writing RNTuples:
6/// 1. Creation of (bare) RNTupleModels and RFieldTokens.
7/// 2. Creation of RNTupleWriter and RNTupleParallelWriter when appending to a single TFile.
8/// 3. Creation of RNTupleFillContext and RRawPtrWriteEntry per thread, and usage of BindRawPtr.
9/// 4. Usage of FillNoFlush(), RNTupleFillStatus::ShouldFlushCluster(), FlushColumns(), and FlushCluster().
10///
11/// Please note that this tutorial has very simplified versions of classes that could be found in a framework, such as
12/// DataProduct, FileService, ParallelOutputter, and SerializingOutputter. They try to mimic the usage in a framework
13/// (for example, Outputters are agnostic of the data written, which is encapsulated in std::vector<DataProduct>), but
14/// are not meant for production usage!
15///
16/// Also note that this tutorial uses std::thread and std::mutex directly instead of a task scheduling library such as
17/// Threading Building Blocks (TBB). For that reason, turning on ROOT's implicit multithreading (IMT) would not be very
18/// efficient with the simplified code in this tutorial because a thread blocking to acquire a std::mutex cannot "help"
19/// the other thread that is currently in the critical section by executing its tasks. If that is wanted, the framework
20/// should use synchronization methods provided by TBB directly (which goes beyond the scope of this tutorial).
21///
22/// \macro_code
23///
24/// \date September 2024
25/// \author The ROOT Team
26
27// NOTE: The RRawPtrWriteEntry is experimental at this point.
28// Functionality and interface are still subject to changes.
29
31#include <ROOT/RFieldToken.hxx>
34#include <ROOT/RNTupleModel.hxx>
38
39#include <cassert>
40#include <cstddef> // for std::size_t
41#include <cstdint> // for std::uint32_t
42#include <functional> // for std::ref
43#include <memory>
44#include <mutex>
45#include <random>
46#include <string>
47#include <string_view>
48#include <thread>
49#include <utility> // for std::pair
50#include <vector>
51
52// Import classes from Experimental namespace for the time being
54
// A ModelTokensPair bundles an owned RNTupleModel (first) with a vector of RFieldTokens (second).
55using ModelTokensPair = std::pair<std::unique_ptr<ROOT::RNTupleModel>, std::vector<ROOT::RFieldToken>>;
56
// A DataProduct associates an arbitrary address to an index in the model.
//
// - index:   position used by the Outputters to look up the matching field token (fTokens[index]).
// - address: raw pointer to the object to write. The struct only stores the pointer; it defines no
//            destructor and never frees the pointee, so the pointee must stay alive while the product is used.
struct DataProduct {
   std::size_t index;
   const void *address;

   DataProduct(std::size_t i, const void *a) : index(i), address(a) {}
};
64
65// The FileService opens a TFile and provides synchronization.
66class FileService {
67 std::unique_ptr<TFile> fFile;
68 std::mutex fMutex;
69
70public:
71 FileService(std::string_view url, std::string_view options = "")
72 {
73 fFile.reset(TFile::Open(std::string(url).c_str(), std::string(options).c_str()));
74 // The file is automatically closed when destructing the std::unique_ptr.
75 }
76
77 TFile &GetFile() { return *fFile; }
78 std::mutex &GetMutex() { return fMutex; }
79};
80
81// An Outputter provides the interface to fill DataProducts into an RNTuple.
82class Outputter {
83public:
84 virtual ~Outputter() = default;
85
86 virtual void InitSlot(unsigned slot) = 0;
87 virtual void Fill(unsigned slot, const std::vector<DataProduct> &products) = 0;
88};
89
90// A ParallelOutputter uses an RNTupleParallelWriter to append an RNTuple to a TFile.
// NOTE(review): the embedded listing numbers skip 92, 103, 105, 110 and 136, so this class is shown with lines
// missing. The visible code uses fFileService and status, whose declarations do not appear here; presumably
// they sit on the skipped lines -- confirm against the original file.
91class ParallelOutputter final : public Outputter {
93 std::unique_ptr<ROOT::RNTupleParallelWriter> fParallelWriter;
// Field tokens; Fill() selects one per product via DataProduct::index.
94 std::vector<ROOT::RFieldToken> fTokens;
95
// Per-slot state: created in InitSlot() and reused by every Fill() call on that slot.
96 struct SlotData {
97 std::shared_ptr<ROOT::RNTupleFillContext> fillContext;
98 std::unique_ptr<RRawPtrWriteEntry> entry;
99 };
100 std::vector<SlotData> fSlots;
101
102public:
// NOTE(review): only the last parameter line and the body of the constructor are visible. The body refers to
// modelTokens, fileService and ntupleName, which are presumably parameters declared on the missing line.
104 const ROOT::RNTupleWriteOptions &options)
106 {
107 auto &model = modelTokens.first;
108
// Hold the file mutex while the writer is appended to the TFile.
109 std::lock_guard g(fileService.GetMutex());
// NOTE(review): the result of Append() is presumably assigned to fParallelWriter on the missing line 110.
111 ROOT::RNTupleParallelWriter::Append(std::move(model), ntupleName, fFileService.GetFile(), options);
112 }
113
// Grows fSlots when needed and (re)creates the fill context and entry of the given slot.
// No lock is taken in this method.
114 void InitSlot(unsigned slot) final
115 {
116 if (slot >= fSlots.size()) {
117 fSlots.resize(slot + 1);
118 }
119 // Create an RNTupleFillContext and RRawPtrWriteEntry that are used for all fills from this slot.
120 fSlots[slot].fillContext = fParallelWriter->CreateFillContext();
121 fSlots[slot].entry = fSlots[slot].fillContext->GetModel().CreateRawPtrWriteEntry();
122 }
123
// Binds the products' addresses to the slot's entry, fills it, and flushes a cluster when the fill status
// asks for it. Only the FlushCluster() call is made under the file mutex.
124 void Fill(unsigned slot, const std::vector<DataProduct> &products) final
125 {
// The slot must have been set up by InitSlot() before; this is only checked by the assert.
126 assert(slot < fSlots.size());
127 auto &fillContext = *fSlots[slot].fillContext;
128 auto &entry = *fSlots[slot].entry;
129
130 // Use the field tokens to bind the products' raw pointers.
131 for (auto &&product : products) {
132 entry.BindRawPtr(fTokens[product.index], product.address);
133 }
134
135 // Fill the entry without triggering an implicit flush.
// NOTE(review): status is presumably declared on the missing line 136.
137 fillContext.FillNoFlush(entry, status);
138 if (status.ShouldFlushCluster()) {
139 // If we are asked to flush, first try to do as much work as possible outside of the critical section:
140 // FlushColumns() will flush column data and trigger compression, but not actually write to storage.
141 // (A framework may of course also decide to flush more often.)
142 fillContext.FlushColumns();
143
144 {
145 // FlushCluster() will flush data to the underlying TFile, so it requires synchronization.
146 std::lock_guard g(fFileService.GetMutex());
147 fillContext.FlushCluster();
148 }
149 }
150 }
151};
152
153// A SerializingOutputter uses a sequential RNTupleWriter to append an RNTuple to a TFile and a std::mutex to
154// synchronize multiple threads. Note that ROOT's implicit multithreading would not be very efficient with this
155// implementation because a thread blocking to acquire a std::mutex cannot "help" the other thread that is currently
156// in the critical section by executing its tasks. See also the note at the top of the file.
// NOTE(review): the embedded listing numbers skip 158, 169, 171 and 202, so this class is shown with lines
// missing. The visible code uses fFileService and status, whose declarations do not appear here; presumably
// they sit on the skipped lines -- confirm against the original file.
157class SerializingOutputter final : public Outputter {
159 std::unique_ptr<ROOT::RNTupleWriter> fWriter;
// Serializes all access to fWriter from Fill().
160 std::mutex fWriterMutex;
// Field tokens; Fill() selects one per product via DataProduct::index.
161 std::vector<ROOT::RFieldToken> fTokens;
162
// Per-slot state: one entry per slot, created in InitSlot() and reused by every Fill() call on that slot.
163 struct SlotData {
164 std::unique_ptr<RRawPtrWriteEntry> entry;
165 };
166 std::vector<SlotData> fSlots;
167
168public:
// NOTE(review): only the last parameter line and the body of the constructor are visible. The body refers to
// modelTokens, fileService and ntupleName, which are presumably parameters declared on the missing line.
170 const ROOT::RNTupleWriteOptions &options)
172 {
173 auto &model = modelTokens.first;
174
// Hold the file mutex while the writer is appended to the TFile.
175 std::lock_guard g(fileService.GetMutex());
176 fWriter = ROOT::RNTupleWriter::Append(std::move(model), ntupleName, fileService.GetFile(), options);
177 }
178
// Grows fSlots when needed and (re)creates the entry of the given slot. No lock is taken in this method.
179 void InitSlot(unsigned slot) final
180 {
181 if (slot >= fSlots.size()) {
182 fSlots.resize(slot + 1);
183 }
184 // Create an RRawPtrWriteEntry that is used for all fills from this slot.
185 fSlots[slot].entry = fWriter->GetModel().CreateRawPtrWriteEntry();
186 }
187
// Binds the products' addresses to the slot's entry without a lock, then fills (and, if requested, flushes)
// under fWriterMutex.
188 void Fill(unsigned slot, const std::vector<DataProduct> &products) final
189 {
// The slot must have been set up by InitSlot() before; this is only checked by the assert.
190 assert(slot < fSlots.size());
191 auto &entry = *fSlots[slot].entry;
192
193 // Use the field tokens to bind the products' raw pointers.
194 for (auto &&product : products) {
195 entry.BindRawPtr(fTokens[product.index], product.address);
196 }
197
198 {
199 // Fill the entry without triggering an implicit flush. This requires synchronization with other threads using
200 // the same writer, but not (yet) with the underlying TFile.
201 std::lock_guard g(fWriterMutex);
// NOTE(review): status is presumably declared on the missing line 202.
203 fWriter->FillNoFlush(entry, status);
204 if (status.ShouldFlushCluster()) {
205 // If we are asked to flush, first try to do as much work as possible outside of the critical section:
206 // FlushColumns() will flush column data and trigger compression, but not actually write to storage.
207 // (A framework may of course also decide to flush more often.)
// The "critical section" avoided here is the one on the file mutex; fWriterMutex is still held.
208 fWriter->FlushColumns();
209
210 {
211 // FlushCluster() will flush data to the underlying TFile, so it requires synchronization.
// This inner guard shadows the outer g; both fWriterMutex and the file mutex are held during FlushCluster().
212 std::lock_guard g(fFileService.GetMutex());
213 fWriter->FlushCluster();
214 }
215 }
216 }
217 }
218};
219
220// === END OF TUTORIAL FRAMEWORK CODE ===
221
// Simple structs to store events

// Kinematic quantities of a single track. All members are plain floats and the struct has no constructors,
// so it can be brace-initialised in declaration order: eta, mass, pt, phi.
struct Track {
   float eta;
   float mass;
   float pt;
   float phi;
};
229
230struct ChargedTrack : public Track {
231 std::int8_t charge;
232};
233
234struct Event {
235 std::uint32_t eventId;
236 std::uint32_t runId;
237 std::vector<ChargedTrack> electrons;
238 std::vector<Track> photons;
239 std::vector<ChargedTrack> muons;
240};
241
242// RNTupleModel for Events; in a real framework, this would likely be dynamic.
// NOTE(review): listing line 243 is missing from this extract; it presumably holds the function signature.
// The body builds a bare model with one field per Event member and returns it together with the field tokens.
244{
245 // We recommend creating a bare model if the default entry is not used.
246 auto model = ROOT::RNTupleModel::CreateBare();
247 // For more efficient access, also create field tokens.
248 std::vector<ROOT::RFieldToken> tokens;
249
// The push_back order fixes each token's position in the vector:
// eventId = 0, runId = 1, electrons = 2, photons = 3, muons = 4.
250 model->MakeField<decltype(Event::eventId)>("eventId");
251 tokens.push_back(model->GetToken("eventId"));
252
253 model->MakeField<decltype(Event::runId)>("runId");
254 tokens.push_back(model->GetToken("runId"));
255
256 model->MakeField<decltype(Event::electrons)>("electrons");
257 tokens.push_back(model->GetToken("electrons"));
258
259 model->MakeField<decltype(Event::photons)>("photons");
260 tokens.push_back(model->GetToken("photons"));
261
262 model->MakeField<decltype(Event::muons)>("muons");
263 tokens.push_back(model->GetToken("muons"));
264
// Both objects are moved into the returned brace-initialised value.
265 return {std::move(model), std::move(tokens)};
266}
267
268// DataProducts with addresses that point into the Event object.
269std::vector<DataProduct> CreateEventDataProducts(Event &event)
270{
271 std::vector<DataProduct> products;
272 // The indices have to match the order of std::vector<ROOT::RFieldToken> above.
273 products.emplace_back(0, &event.eventId);
274 products.emplace_back(1, &event.runId);
275 products.emplace_back(2, &event.electrons);
276 products.emplace_back(3, &event.photons);
277 products.emplace_back(4, &event.muons);
278 return products;
279}
280
// Simple struct to store runs: the run identifier and the number of events in that run.
struct Run {
   std::uint32_t runId;
   std::uint32_t nEvents;
};
286
287// RNTupleModel for Runs; in a real framework, this would likely be dynamic.
// NOTE(review): listing line 288 is missing from this extract; it presumably holds the function signature.
// The body builds a bare model with one field per Run member and returns it together with the field tokens.
289{
290 // We recommend creating a bare model if the default entry is not used.
291 auto model = ROOT::RNTupleModel::CreateBare();
292 // For more efficient access, also create field tokens.
293 std::vector<ROOT::RFieldToken> tokens;
294
// The push_back order fixes each token's position in the vector: runId = 0, nEvents = 1.
295 model->MakeField<decltype(Run::runId)>("runId");
296 tokens.push_back(model->GetToken("runId"));
297
298 model->MakeField<decltype(Run::nEvents)>("nEvents");
299 tokens.push_back(model->GetToken("nEvents"));
300
// Both objects are moved into the returned brace-initialised value.
301 return {std::move(model), std::move(tokens)};
302}
303
304// DataProducts with addresses that point into the Run object.
305std::vector<DataProduct> CreateRunDataProducts(Run &run)
306{
307 std::vector<DataProduct> products;
308 // The indices have to match the order of std::vector<ROOT::RFieldToken> above.
309 products.emplace_back(0, &run.runId);
310 products.emplace_back(1, &run.nEvents);
311 return products;
312}
313
// Parameters of the generated toy data.
// Number of consecutive run ids handled by each thread.
constexpr unsigned kNRunsPerThread = 100;
// Mean and standard deviation of the normal distribution from which the number of events per run is drawn.
constexpr unsigned kMeanNEventsPerRun = 400;
constexpr unsigned kStddevNEventsPerRun = 100;
// Mean of the Poisson distribution from which the size of each track collection is drawn.
constexpr unsigned kMeanNTracks = 5;
318
// NOTE(review): listing lines 319, 336, 365, 373 and 374 are missing from this extract. Line 319 presumably
// holds the function signature (the body uses a threadId value); the other skipped lines sit where the event
// data products and the run data would be created and handed to the outputters -- confirm against the
// original file.
// The visible body generates kNRunsPerThread runs of random events for one thread.
320{
// Seeding with threadId makes the generated sequence reproducible per thread.
321 std::mt19937 gen(threadId);
322 std::normal_distribution<double> nEventsDist(kMeanNEventsPerRun, kStddevNEventsPerRun);
323 std::poisson_distribution<> nTracksDist(kMeanNTracks);
324 std::uniform_real_distribution<float> floatDist;
325
// Each thread covers its own range of run ids: [threadId * kNRunsPerThread, (threadId + 1) * kNRunsPerThread).
326 for (std::uint32_t runId = threadId * kNRunsPerThread; runId < (threadId + 1) * kNRunsPerThread; runId++) {
// A non-positive draw yields zero events; positive draws are truncated by the cast.
327 double nEventsD = nEventsDist(gen);
328 std::uint32_t nEvents = 0;
329 if (nEventsD > 0) {
330 nEvents = static_cast<std::uint32_t>(nEventsD);
331 }
332
333 // Process events, reusing a single Event object.
334 Event event;
335 event.runId = runId;
337 for (std::uint32_t eventId = 0; eventId < nEvents; eventId++) {
338 event.eventId = eventId;
339
340 // Produce some data; eta, phi, and pt are just filled with uniformly distributed data.
341 event.electrons.resize(nTracksDist(gen));
342 for (auto &electron : event.electrons) {
343 electron.eta = floatDist(gen);
344 electron.mass = 0.511 /* MeV */;
345 electron.phi = floatDist(gen);
346 electron.pt = floatDist(gen);
// The charge is +1 or -1, chosen by the parity of the next raw generator output.
347 electron.charge = (gen() % 2 ? 1 : -1);
348 }
349 event.photons.resize(nTracksDist(gen));
350 for (auto &photon : event.photons) {
351 photon.eta = floatDist(gen);
352 photon.mass = 0;
353 photon.phi = floatDist(gen);
354 photon.pt = floatDist(gen);
355 }
356 event.muons.resize(nTracksDist(gen));
357 for (auto &muon : event.muons) {
358 muon.eta = floatDist(gen);
359 muon.mass = 105.658 /* MeV */;
360 muon.phi = floatDist(gen);
361 muon.pt = floatDist(gen);
362 muon.charge = (gen() % 2 ? 1 : -1);
363 }
364
366 }
367
368 // Fill the Run data.
369 Run run;
370 run.runId = runId;
371 run.nEvents = nEvents;
372
375 }
376}
377
// Number of worker threads started by the macro; also the number of slots initialized in each Outputter.
constexpr unsigned kNThreads = 4;
379
// NOTE(review): listing lines 380, 384, 389 and 395 are missing from this extract. Line 380 presumably holds
// the signature of the macro entry point; the declarations of options, eventOutputter and runOutputter used
// below are not visible and presumably sit on the other skipped lines -- confirm against the original file.
// The visible body opens the output file, configures write options, initializes one slot per thread in both
// outputters, and runs kNThreads worker threads to completion.
381{
// "RECREATE" is forwarded by FileService to TFile::Open as the open option.
382 FileService fileService("ntpl014_framework.root", "RECREATE");
383
385 // Parallel writing requires buffered writing; force it on (even if it is the default).
386 options.SetUseBufferedWrite(true);
387 // For demonstration purposes, reduce the cluster size to 2 MiB.
388 options.SetApproxZippedClusterSize(2 * 1024 * 1024);
390
// The same options object is adjusted a second time below, with a different cluster size.
391 // SerializingOutputter also relies on buffered writing; force it on (even if it is the default).
392 options.SetUseBufferedWrite(true);
393 // For demonstration purposes, reduce the cluster size for the very simple Run data to 1 KiB.
394 options.SetApproxZippedClusterSize(1024);
396
397 // Initialize slots in the two Outputters.
// All slots are initialized here, before any worker thread is started.
398 for (unsigned i = 0; i < kNThreads; i++) {
399 eventOutputter.InitSlot(i);
400 runOutputter.InitSlot(i);
401 }
402
// Each thread receives its index and references to both outputters (std::ref avoids copying them).
403 std::vector<std::thread> threads;
404 for (unsigned i = 0; i < kNThreads; i++) {
405 threads.emplace_back(ProcessRunsAndEvents, i, std::ref(eventOutputter), std::ref(runOutputter));
406 }
// Wait for all workers before the local objects go out of scope.
407 for (unsigned i = 0; i < kNThreads; i++) {
408 threads[i].join();
409 }
410}
#define g(i)
Definition RSha256.hxx:105
#define a(i)
Definition RSha256.hxx:99
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t index
Int_t Fill(Double_t x) override
A container of const raw pointers, corresponding to a row in the data set.
A status object after filling an entry.
bool ShouldFlushCluster() const
Return true if the caller should call FlushCluster.
static std::unique_ptr< RNTupleModel > CreateBare()
Creates a "bare model", i.e. an RNTupleModel with no default entry.
static std::unique_ptr< RNTupleParallelWriter > Append(std::unique_ptr< ROOT::RNTupleModel > model, std::string_view ntupleName, TDirectory &fileOrDirectory, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Append an RNTuple to the existing file.
Common user-tunable settings for storing RNTuples.
void SetApproxZippedClusterSize(std::size_t val)
static std::unique_ptr< RNTupleWriter > Append(std::unique_ptr< ROOT::RNTupleModel > model, std::string_view ntupleName, TDirectory &fileOrDirectory, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Creates an RNTupleWriter that writes into an existing TFile or TDirectory, without overwriting its co...
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:131
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:3764
TPaveText * pt
double product(double const *factors, std::size_t nFactors)
Definition MathFuncs.h:92