#include #include #include #include #include #include #include #include #include #include // for std::size_t #include // for std::uint32_t #include // for std::ref #include #include #include #include #include #include #include // for std::pair #include using ModelTokensPair = std::pair, std::vector>; // A DataProduct associates an arbitrary address to an index in the model. struct DataProduct { std::size_t index; const void *address; DataProduct(std::size_t i, const void *a) : index(i), address(a) {} }; // The FileService opens a TFile and provides synchronization. class FileService { std::unique_ptr fFile; std::mutex fMutex; public: FileService(std::string_view url, std::string_view options = "") { fFile.reset(TFile::Open(std::string(url).c_str(), std::string(options).c_str())); // The file is automatically closed when destructing the std::unique_ptr. } TFile &GetFile() { return *fFile; } std::mutex &GetMutex() { return fMutex; } }; // An Outputter provides the interface to fill DataProducts into an RNTuple. class Outputter { public: virtual ~Outputter() = default; virtual void InitSlot(unsigned slot) = 0; virtual void Fill(unsigned slot, const std::vector &products) = 0; }; // A ParallelOutputter uses an RNTupleParallelWriter to append an RNTuple to a TFile. class ParallelOutputter final : public Outputter { FileService &fFileService; std::unique_ptr fParallelWriter; std::vector fTokens; struct SlotData { std::shared_ptr fillContext; std::unique_ptr entry; }; std::vector fSlots; public: ParallelOutputter(ModelTokensPair modelTokens, FileService &fileService, std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options) : fFileService(fileService), fTokens(std::move(modelTokens.second)) { auto &model = modelTokens.first; std::lock_guard g(fileService.GetMutex()); fParallelWriter = ROOT::RNTupleParallelWriter::Append(std::move(model), ntupleName, fFileService.GetFile(), options); } void InitSlot(unsigned slot) final { if (slot >= fSlots.size()) { fSlots.resize(slot + 1); } // Create an RNTupleFillContext and RRawPtrWriteEntry that are used for all fills from this slot. fSlots[slot].fillContext = fParallelWriter->CreateFillContext(); fSlots[slot].entry = fSlots[slot].fillContext->GetModel().CreateRawPtrWriteEntry(); } void Fill(unsigned slot, const std::vector &products) final { assert(slot < fSlots.size()); auto &fillContext = *fSlots[slot].fillContext; auto &entry = *fSlots[slot].entry; // Use the field tokens to bind the products' raw pointers. for (auto &&product : products) { entry.BindRawPtr(fTokens[product.index], product.address); } // Fill the entry without triggering an implicit flush. ROOT::RNTupleFillStatus status; fillContext.FillNoFlush(entry, status); if (status.ShouldFlushCluster()) { // If we are asked to flush, first try to do as much work as possible outside of the critical section: // FlushColumns() will flush column data and trigger compression, but not actually write to storage. // (A framework may of course also decide to flush more often.) fillContext.FlushColumns(); { // FlushCluster() will flush data to the underlying TFile, so it requires synchronization. std::lock_guard g(fFileService.GetMutex()); fillContext.FlushCluster(); } } } }; // A SerializingOutputter uses a sequential RNTupleWriter to append an RNTuple to a TFile and a std::mutex to // synchronize multiple threads. Note that ROOT's implicit multithreading would not be very efficient with this // implementation because a thread blocking to acquire a std::mutex cannot "help" the other thread that is currently // in the critical section by executing its tasks. See also the note at the top of the file. class SerializingOutputter final : public Outputter { FileService &fFileService; std::unique_ptr fWriter; std::mutex fWriterMutex; std::vector fTokens; struct SlotData { std::unique_ptr entry; }; std::vector fSlots; public: SerializingOutputter(ModelTokensPair modelTokens, FileService &fileService, std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options) : fFileService(fileService), fTokens(std::move(modelTokens.second)) { auto &model = modelTokens.first; std::lock_guard g(fileService.GetMutex()); fWriter = ROOT::RNTupleWriter::Append(std::move(model), ntupleName, fileService.GetFile(), options); } void InitSlot(unsigned slot) final { if (slot >= fSlots.size()) { fSlots.resize(slot + 1); } // Create an RRawPtrWriteEntry that is used for all fills from this slot. fSlots[slot].entry = fWriter->GetModel().CreateRawPtrWriteEntry(); } void Fill(unsigned slot, const std::vector &products) final { assert(slot < fSlots.size()); auto &entry = *fSlots[slot].entry; // Use the field tokens to bind the products' raw pointers. for (auto &&product : products) { entry.BindRawPtr(fTokens[product.index], product.address); } { // Fill the entry without triggering an implicit flush. This requires synchronization with other threads using // the same writer, but not (yet) with the underlying TFile. std::lock_guard g(fWriterMutex); ROOT::RNTupleFillStatus status; fWriter->FillNoFlush(entry, status); if (status.ShouldFlushCluster()) { // If we are asked to flush, first try to do as much work as possible outside of the critical section: // FlushColumns() will flush column data and trigger compression, but not actually write to storage. // (A framework may of course also decide to flush more often.) fWriter->FlushColumns(); { // FlushCluster() will flush data to the underlying TFile, so it requires synchronization. std::lock_guard g(fFileService.GetMutex()); fWriter->FlushCluster(); } } } } }; // === END OF TUTORIAL FRAMEWORK CODE === // Simple structs to store events struct Track { float eta; float mass; float pt; float phi; }; struct ChargedTrack : public Track { std::int8_t charge; }; struct Event { std::uint32_t eventId; std::uint32_t runId; std::vector electrons; std::vector photons; std::vector muons; }; // RNTupleModel for Events; in a real framework, this would likely be dynamic. ModelTokensPair CreateEventModel() { // We recommend creating a bare model if the default entry is not used. auto model = ROOT::RNTupleModel::CreateBare(); // For more efficient access, also create field tokens. std::vector tokens; model->MakeField("eventId"); tokens.push_back(model->GetToken("eventId")); model->MakeField("runId"); tokens.push_back(model->GetToken("runId")); model->MakeField("electrons"); tokens.push_back(model->GetToken("electrons")); model->MakeField("photons"); tokens.push_back(model->GetToken("photons")); model->MakeField("muons"); tokens.push_back(model->GetToken("muons")); return {std::move(model), std::move(tokens)}; } // DataProducts with addresses that point into the Event object. std::vector CreateEventDataProducts(Event &event) { std::vector products; // The indices have to match the order of std::vector above. products.emplace_back(0, &event.eventId); products.emplace_back(1, &event.runId); products.emplace_back(2, &event.electrons); products.emplace_back(3, &event.photons); products.emplace_back(4, &event.muons); return products; } // Simple struct to store runs struct Run { std::uint32_t runId; std::uint32_t nEvents; }; // RNTupleModel for Runs; in a real framework, this would likely be dynamic. ModelTokensPair CreateRunModel() { // We recommend creating a bare model if the default entry is not used. auto model = ROOT::RNTupleModel::CreateBare(); // For more efficient access, also create field tokens. std::vector tokens; model->MakeField("runId"); tokens.push_back(model->GetToken("runId")); model->MakeField("nEvents"); tokens.push_back(model->GetToken("nEvents")); return {std::move(model), std::move(tokens)}; } // DataProducts with addresses that point into the Run object. std::vector CreateRunDataProducts(Run &run) { std::vector products; // The indices have to match the order of std::vector above. products.emplace_back(0, &run.runId); products.emplace_back(1, &run.nEvents); return products; } constexpr unsigned kNRunsPerThread = 100; constexpr unsigned kMeanNEventsPerRun = 400; constexpr unsigned kStddevNEventsPerRun = 100; constexpr unsigned kMeanNTracks = 5; void ProcessRunsAndEvents(unsigned threadId, Outputter &eventOutputter, Outputter &runOutputter) { std::mt19937 gen(threadId); std::normal_distribution nEventsDist(kMeanNEventsPerRun, kStddevNEventsPerRun); std::poisson_distribution<> nTracksDist(kMeanNTracks); std::uniform_real_distribution floatDist; for (std::uint32_t runId = threadId * kNRunsPerThread; runId < (threadId + 1) * kNRunsPerThread; runId++) { double nEventsD = nEventsDist(gen); std::uint32_t nEvents = 0; if (nEventsD > 0) { nEvents = static_cast(nEventsD); } // Process events, reusing a single Event object. Event event; event.runId = runId; auto eventProducts = CreateEventDataProducts(event); for (std::uint32_t eventId = 0; eventId < nEvents; eventId++) { event.eventId = eventId; // Produce some data; eta, phi, and pt are just filled with uniformly distributed data. event.electrons.resize(nTracksDist(gen)); for (auto &electron : event.electrons) { electron.eta = floatDist(gen); electron.mass = 0.511 /* MeV */; electron.phi = floatDist(gen); electron.pt = floatDist(gen); electron.charge = (gen() % 2 ? 1 : -1); } event.photons.resize(nTracksDist(gen)); for (auto &photon : event.photons) { photon.eta = floatDist(gen); photon.mass = 0; photon.phi = floatDist(gen); photon.pt = floatDist(gen); } event.muons.resize(nTracksDist(gen)); for (auto &muon : event.muons) { muon.eta = floatDist(gen); muon.mass = 105.658 /* MeV */; muon.phi = floatDist(gen); muon.pt = floatDist(gen); muon.charge = (gen() % 2 ? 1 : -1); } eventOutputter.Fill(threadId, eventProducts); } // Fill the Run data. Run run; run.runId = runId; run.nEvents = nEvents; auto runProducts = CreateRunDataProducts(run); runOutputter.Fill(threadId, runProducts); } } constexpr unsigned kNThreads = 4; void ntpl014_framework() { FileService fileService("ntpl014_framework.root", "RECREATE"); ROOT::RNTupleWriteOptions options; // Parallel writing requires buffered writing; force it on (even if it is the default). options.SetUseBufferedWrite(true); // For demonstration purposes, reduce the cluster size to 2 MiB. options.SetApproxZippedClusterSize(2 * 1024 * 1024); ParallelOutputter eventOutputter(CreateEventModel(), fileService, "Events", options); // SerializingOutputter also relies on buffered writing; force it on (even if it is the default). options.SetUseBufferedWrite(true); // For demonstration purposes, reduce the cluster size for the very simple Run data to 1 KiB. options.SetApproxZippedClusterSize(1024); SerializingOutputter runOutputter(CreateRunModel(), fileService, "Runs", options); // Initialize slots in the two Outputters. for (unsigned i = 0; i < kNThreads; i++) { eventOutputter.InitSlot(i); runOutputter.InitSlot(i); } std::vector threads; for (unsigned i = 0; i < kNThreads; i++) { threads.emplace_back(ProcessRunsAndEvents, i, std::ref(eventOutputter), std::ref(runOutputter)); } for (unsigned i = 0; i < kNThreads; i++) { threads[i].join(); } }