#include <iostream>
#include <memory>
#include <mutex>
#include <random>
#include <thread>
#include <vector>
constexpr char const *kNTupleFileName = "ntpl013_staged.root";
constexpr int kNWriterThreads = 4;
constexpr int kNEventsPerThread = 25000;
constexpr int kNEventsPerBlock = 10000;
void FillData(
int id, RNTupleParallelWriter *
writer)
{
static std::mutex g_Mutex;
static NTupleSize_t g_WrittenEntries = 0;
using generator = std::mt19937;
generator gen;
auto fillContext =
writer->CreateFillContext();
fillContext->EnableStagedClusterCommitting();
auto entry = fillContext->CreateEntry();
auto eventId = entry->GetPtr<std::uint32_t>("eventId");
auto eventIdStart = id * kNEventsPerThread;
auto rndm = entry->GetPtr<float>("rndm");
for (int i = 0; i < kNEventsPerThread; i++) {
*eventId = eventIdStart + i;
auto d =
static_cast<double>(gen()) / generator::max();
*rndm =
static_cast<float>(
d);
fillContext->Fill(*entry);
}
fillContext->FlushCluster();
{
std::lock_guard
g(g_Mutex);
fillContext->CommitStagedClusters();
std::cout << "Thread #" << id << " wrote events #" << eventIdStart << " - #"
<< (eventIdStart + kNEventsPerThread - 1) << " as entries #" << g_WrittenEntries << " - #"
<< (g_WrittenEntries + kNEventsPerThread - 1) << std::endl;
g_WrittenEntries += kNEventsPerThread;
}
}
void Write()
{
std::cout << " === Writing with staged cluster committing ===" << std::endl;
auto model = RNTupleModel::CreateBare();
model->MakeField<std::uint32_t>("eventId");
model->MakeField<float>("rndm");
RNTupleWriteOptions options;
options.SetApproxZippedClusterSize(32'000);
auto writer = RNTupleParallelWriter::Recreate(std::move(model),
"NTuple", kNTupleFileName, options);
std::vector<std::thread> threads;
for (int i = 0; i < kNWriterThreads; ++i)
threads.emplace_back(FillData, i,
writer.get());
for (int i = 0; i < kNWriterThreads; ++i)
threads[i].join();
}
/// Thread body for the second pass: generates this thread's share of events and commits it in blocks of
/// kNEventsPerBlock events, followed by a final (possibly shorter) block.
///
/// After each block, the thread stages the pending entries with FlushCluster() and then commits the staged
/// clusters while holding a mutex shared by all threads running this function. Each block is therefore committed
/// as one unit. The printed line reports which entry numbers the block's events received.
///
/// @param id      Index of this thread; its event IDs are [id * kNEventsPerThread, (id + 1) * kNEventsPerThread).
/// @param writer  Non-owning pointer to the writer shared by all threads.
void FillDataInBlocks(int id, RNTupleParallelWriter *writer)
{
   // Shared by every thread executing this function (kept as function-local statics for tutorial simplicity).
   // This counter is separate from FillData's, so entry numbering starts at 0 for this pass's output file.
   // std::uint64_t is the underlying type of NTupleSize_t.
   static std::mutex g_Mutex;
   static std::uint64_t g_WrittenEntries = 0;

   using generator = std::mt19937;
   generator gen;

   auto fillContext = writer->CreateFillContext();
   fillContext->EnableStagedClusterCommitting();
   auto entry = fillContext->CreateEntry();

   auto eventId = entry->GetPtr<std::uint32_t>("eventId");
   auto eventIdStart = id * kNEventsPerThread;
   auto rndm = entry->GetPtr<float>("rndm");

   // Index (relative to eventIdStart) of the first event that has not been committed yet.
   int startOfBlock = 0;

   // Stages all pending entries and commits them as one block ending at relative index `lastOfBlock`, then
   // reports the event -> entry mapping. Only the commit and the shared counter are serialized.
   auto commitBlock = [&](int lastOfBlock) {
      fillContext->FlushCluster();
      std::lock_guard g(g_Mutex);
      fillContext->CommitStagedClusters();
      const auto numEvents = lastOfBlock + 1 - startOfBlock;
      const auto firstEvent = eventIdStart + startOfBlock;
      const auto lastEvent = eventIdStart + lastOfBlock;
      std::cout << "Thread #" << id << " wrote events #" << firstEvent << " - #" << lastEvent << " as entries #"
                << g_WrittenEntries << " - #" << (g_WrittenEntries + numEvents - 1) << std::endl;
      g_WrittenEntries += numEvents;
      startOfBlock += numEvents;
   };

   for (int i = 0; i < kNEventsPerThread; i++) {
      *eventId = eventIdStart + i;
      auto d = static_cast<double>(gen()) / generator::max();
      *rndm = static_cast<float>(d);
      fillContext->Fill(*entry);

      if ((i + 1) % kNEventsPerBlock == 0)
         commitBlock(i);
   }

   // Commit the remaining tail. If the last iteration already closed a full block, nothing is pending, and
   // committing again would report an empty, inverted range.
   if (startOfBlock < kNEventsPerThread)
      commitBlock(kNEventsPerThread - 1);
}
void WriteInBlocks()
{
std::cout << "\n === ... with sequencing in blocks of " << kNEventsPerBlock << " events ===" << std::endl;
auto model = RNTupleModel::CreateBare();
model->MakeField<std::uint32_t>("eventId");
model->MakeField<float>("rndm");
RNTupleWriteOptions options;
options.SetApproxZippedClusterSize(32'000);
auto writer = RNTupleParallelWriter::Recreate(std::move(model),
"NTuple", kNTupleFileName, options);
std::vector<std::thread> threads;
for (int i = 0; i < kNWriterThreads; ++i)
threads.emplace_back(FillDataInBlocks, i,
writer.get());
for (int i = 0; i < kNWriterThreads; ++i)
threads[i].join();
}
/// Macro entry point: runs the two writing passes in order. Both recreate the same output file, so the file
/// left on disk is the one from the block-wise pass.
void ntpl013_staged()
{
   void (*const passes[])() = {Write, WriteInBlocks};
   for (auto pass : passes)
      pass();
}
// RNTupleModel: encapsulates the schema of an ntuple.
// RNTupleParallelWriter: a writer to fill an RNTuple from multiple contexts.
// RNTupleWriteOptions: common user-tunable settings for storing ntuples.
// NTupleSize_t (std::uint64_t): integer type long enough to hold the maximum number of entries in a column.