{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "6949c302",
   "metadata": {},
   "source": [
    "# ntpl013_staged\n",
    "Example of staged cluster committing in multi-threaded writing using RNTupleParallelWriter.\n",
    "\n",
    "\n",
    "\n",
    "\n",
    "**Author:** The ROOT Team  \n",
    "This notebook tutorial was automatically generated with ROOTBOOK-izer from the macro found in the ROOT repository on Tuesday, May 19, 2026 at 08:15 PM."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "6fc0d823",
   "metadata": {
    "collapsed": false,
    "execution": {
     "iopub.execute_input": "2026-05-19T20:15:20.626238Z",
     "iopub.status.busy": "2026-05-19T20:15:20.626103Z",
     "iopub.status.idle": "2026-05-19T20:15:20.634235Z",
     "shell.execute_reply": "2026-05-19T20:15:20.633737Z"
    }
   },
   "outputs": [],
   "source": [
    "%%cpp -d\n",
    "#include <ROOT/RNTupleFillContext.hxx>\n",
    "#include <ROOT/RNTupleModel.hxx>\n",
    "#include <ROOT/RNTupleParallelWriter.hxx>\n",
    "#include <ROOT/RNTupleWriteOptions.hxx>\n",
    "\n",
    "#include <cstdint>\n",
    "#include <iostream>\n",
    "#include <mutex>\n",
    "#include <random>\n",
    "#include <thread>\n",
    "#include <utility>\n",
    "#include <vector>\n",
    "\n",
    "// Where to store the ntuple of this example\n",
    "constexpr char const *kNTupleFileName = \"ntpl013_staged.root\";\n",
    "\n",
    "// Number of parallel threads to fill the ntuple\n",
    "constexpr int kNWriterThreads = 4;\n",
    "\n",
    "// Number of events to generate is kNEventsPerThread * kNWriterThreads\n",
    "constexpr int kNEventsPerThread = 25000;\n",
    "\n",
    "// Number of events per block\n",
    "constexpr int kNEventsPerBlock = 10000;"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "268c88ac",
   "metadata": {},
   "source": [
    "The constants above configure the example:\n",
    "- `kNTupleFileName`: where to store the ntuple of this example\n",
    "- `kNWriterThreads`: number of parallel threads to fill the ntuple\n",
    "- `kNEventsPerThread`: the number of events to generate is `kNEventsPerThread * kNWriterThreads`\n",
    "- `kNEventsPerBlock`: number of events per block\n",
    "\n",
    "Thread function to generate and write events:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "cda08cef",
   "metadata": {
    "collapsed": false,
    "execution": {
     "iopub.execute_input": "2026-05-19T20:15:20.635813Z",
     "iopub.status.busy": "2026-05-19T20:15:20.635688Z",
     "iopub.status.idle": "2026-05-19T20:15:20.724525Z",
     "shell.execute_reply": "2026-05-19T20:15:20.723897Z"
    }
   },
   "outputs": [],
   "source": [
    "%%cpp -d\n",
    "// Thread function: fills kNEventsPerThread entries through its own fill context with staged cluster\n",
    "// committing enabled, then commits all staged clusters at once while holding the shared mutex.\n",
    "//   id     - index of the calling thread; determines the range of event ids it generates\n",
    "//   writer - the parallel writer shared by all threads (not owned)\n",
    "void FillData(int id, ROOT::RNTupleParallelWriter *writer)\n",
    "{\n",
    "   // static variables that are shared between threads; this is done for simplicity in this tutorial, use proper data\n",
    "   // structures in real code!\n",
    "   // Note that, being static, the counter is never reset: calling Write() again in the same session continues\n",
    "   // counting from the previous total.\n",
    "   static std::mutex g_Mutex;\n",
    "   static ROOT::NTupleSize_t g_WrittenEntries = 0;\n",
    "\n",
    "   // Default-constructed engine: every thread uses the same default seed and produces the same sequence.\n",
    "   using generator = std::mt19937;\n",
    "   generator gen;\n",
    "\n",
    "   // Create a fill context and turn on staged cluster committing.\n",
    "   auto fillContext = writer->CreateFillContext();\n",
    "   fillContext->EnableStagedClusterCommitting();\n",
    "   auto entry = fillContext->CreateEntry();\n",
    "\n",
    "   auto eventId = entry->GetPtr<std::uint32_t>(\"eventId\");\n",
    "   auto eventIdStart = id * kNEventsPerThread;\n",
    "   auto rndm = entry->GetPtr<float>(\"rndm\");\n",
    "\n",
    "   for (int i = 0; i < kNEventsPerThread; i++) {\n",
    "      // Prepare the entry with an id and a random number.\n",
    "      *eventId = eventIdStart + i;\n",
    "      auto d = static_cast<double>(gen()) / generator::max();\n",
    "      *rndm = static_cast<float>(d);\n",
    "\n",
    "      // Fill might auto-flush a cluster, which will be staged.\n",
    "      fillContext->Fill(*entry);\n",
    "   }\n",
    "\n",
    "   // It is important to first FlushCluster() so that a cluster with the remaining entries is staged.\n",
    "   fillContext->FlushCluster();\n",
    "   {\n",
    "      // The mutex serializes the commit together with the bookkeeping and the printout.\n",
    "      std::lock_guard g(g_Mutex);\n",
    "      fillContext->CommitStagedClusters();\n",
    "      std::cout << \"Thread #\" << id << \" wrote events #\" << eventIdStart << \" - #\"\n",
    "                << (eventIdStart + kNEventsPerThread - 1) << \" as entries #\" << g_WrittenEntries << \" - #\"\n",
    "                << (g_WrittenEntries + kNEventsPerThread - 1) << std::endl;\n",
    "      g_WrittenEntries += kNEventsPerThread;\n",
    "   }\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "1209a745",
   "metadata": {},
   "source": [
    "Generate `kNEventsPerThread * kNWriterThreads` events with multiple threads and store them in `kNTupleFileName`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "a5c2dd03",
   "metadata": {
    "collapsed": false,
    "execution": {
     "iopub.execute_input": "2026-05-19T20:15:20.726264Z",
     "iopub.status.busy": "2026-05-19T20:15:20.726133Z",
     "iopub.status.idle": "2026-05-19T20:15:20.951097Z",
     "shell.execute_reply": "2026-05-19T20:15:20.950342Z"
    }
   },
   "outputs": [],
   "source": [
    "%%cpp -d\n",
    "// Writes the ntuple with kNWriterThreads threads running FillData; each thread commits its staged\n",
    "// clusters once, after it has filled all of its entries.\n",
    "void Write()\n",
    "{\n",
    "   std::cout << \" === Writing with staged cluster committing ===\" << std::endl;\n",
    "\n",
    "   // Create the data model\n",
    "   auto model = ROOT::RNTupleModel::CreateBare();\n",
    "   model->MakeField<std::uint32_t>(\"eventId\");\n",
    "   model->MakeField<float>(\"rndm\");\n",
    "\n",
    "   // Create RNTupleWriteOptions to make the writing commit multiple clusters.\n",
    "   // This is for demonstration purposes only to have multiple clusters per\n",
    "   // thread that are implicitly flushed, and should not be copied into real\n",
    "   // code!\n",
    "   ROOT::RNTupleWriteOptions options;\n",
    "   options.SetApproxZippedClusterSize(32'000);\n",
    "\n",
    "   // We hand over the data model to a newly created ntuple of name \"NTuple\", stored in kNTupleFileName\n",
    "   auto writer = ROOT::RNTupleParallelWriter::Recreate(std::move(model), \"NTuple\", kNTupleFileName, options);\n",
    "\n",
    "   // Start all writer threads, then wait for every one of them before the writer is destructed.\n",
    "   std::vector<std::thread> threads;\n",
    "   threads.reserve(kNWriterThreads);\n",
    "   for (int i = 0; i < kNWriterThreads; ++i)\n",
    "      threads.emplace_back(FillData, i, writer.get());\n",
    "   for (auto &thread : threads)\n",
    "      thread.join();\n",
    "\n",
    "   // The writer unique pointer goes out of scope here. On destruction, the writer flushes unwritten data to disk\n",
    "   // and closes the attached ROOT file.\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d40afca7",
   "metadata": {},
   "source": [
    "Thread function that generates events and commits the staged clusters in blocks of `kNEventsPerBlock` entries:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "6619e8dd",
   "metadata": {
    "collapsed": false,
    "execution": {
     "iopub.execute_input": "2026-05-19T20:15:20.952655Z",
     "iopub.status.busy": "2026-05-19T20:15:20.952513Z",
     "iopub.status.idle": "2026-05-19T20:15:21.001225Z",
     "shell.execute_reply": "2026-05-19T20:15:21.000494Z"
    }
   },
   "outputs": [],
   "source": [
    "%%cpp -d\n",
    "// Thread function: fills kNEventsPerThread entries like FillData, but flushes and commits the staged\n",
    "// clusters after every kNEventsPerBlock entries, and once more at the end for the remaining entries.\n",
    "//   id     - index of the calling thread; determines the range of event ids it generates\n",
    "//   writer - the parallel writer shared by all threads (not owned)\n",
    "void FillDataInBlocks(int id, ROOT::RNTupleParallelWriter *writer)\n",
    "{\n",
    "   // static variables that are shared between threads; this is done for simplicity in this tutorial, use proper data\n",
    "   // structures in real code!\n",
    "   // Note that, being static, the counter is never reset: calling WriteInBlocks() again in the same session\n",
    "   // continues counting from the previous total.\n",
    "   static std::mutex g_Mutex;\n",
    "   static ROOT::NTupleSize_t g_WrittenEntries = 0;\n",
    "\n",
    "   // Default-constructed engine: every thread uses the same default seed and produces the same sequence.\n",
    "   using generator = std::mt19937;\n",
    "   generator gen;\n",
    "\n",
    "   // Create a fill context and turn on staged cluster committing.\n",
    "   auto fillContext = writer->CreateFillContext();\n",
    "   fillContext->EnableStagedClusterCommitting();\n",
    "   auto entry = fillContext->CreateEntry();\n",
    "\n",
    "   auto eventId = entry->GetPtr<std::uint32_t>(\"eventId\");\n",
    "   auto eventIdStart = id * kNEventsPerThread;\n",
    "   // Offset (relative to eventIdStart) of the first event that has not been committed yet.\n",
    "   int startOfBlock = 0;\n",
    "   auto rndm = entry->GetPtr<float>(\"rndm\");\n",
    "\n",
    "   for (int i = 0; i < kNEventsPerThread; i++) {\n",
    "      // Prepare the entry with an id and a random number.\n",
    "      *eventId = eventIdStart + i;\n",
    "      auto d = static_cast<double>(gen()) / generator::max();\n",
    "      *rndm = static_cast<float>(d);\n",
    "\n",
    "      // Fill might auto-flush a cluster, which will be staged.\n",
    "      fillContext->Fill(*entry);\n",
    "\n",
    "      if ((i + 1) % kNEventsPerBlock == 0) {\n",
    "         // Decide to flush this cluster and logically append all staged clusters to the ntuple.\n",
    "         fillContext->FlushCluster();\n",
    "         {\n",
    "            std::lock_guard g(g_Mutex);\n",
    "            fillContext->CommitStagedClusters();\n",
    "            auto firstEvent = eventIdStart + startOfBlock;\n",
    "            auto lastEvent = eventIdStart + i;\n",
    "            std::cout << \"Thread #\" << id << \" wrote events #\" << firstEvent << \" - #\" << lastEvent << \" as entries #\"\n",
    "                      << g_WrittenEntries << \" - #\" << (g_WrittenEntries + kNEventsPerBlock - 1) << std::endl;\n",
    "            g_WrittenEntries += kNEventsPerBlock;\n",
    "            startOfBlock += kNEventsPerBlock;\n",
    "         }\n",
    "      }\n",
    "   }\n",
    "\n",
    "   // Flush the remaining data and commit staged clusters.\n",
    "   fillContext->FlushCluster();\n",
    "   {\n",
    "      std::lock_guard g(g_Mutex);\n",
    "      fillContext->CommitStagedClusters();\n",
    "      auto numEvents = kNEventsPerThread - startOfBlock;\n",
    "      // If kNEventsPerThread is a multiple of kNEventsPerBlock, the last block was already committed and\n",
    "      // reported inside the loop; skip the report instead of printing an empty range.\n",
    "      if (numEvents > 0) {\n",
    "         auto firstEvent = eventIdStart + startOfBlock;\n",
    "         auto lastEvent = eventIdStart + kNEventsPerThread - 1;\n",
    "         std::cout << \"Thread #\" << id << \" wrote events #\" << firstEvent << \" - #\" << lastEvent << \" as entries #\"\n",
    "                   << g_WrittenEntries << \" - #\" << (g_WrittenEntries + numEvents - 1) << std::endl;\n",
    "         g_WrittenEntries += numEvents;\n",
    "      }\n",
    "   }\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "1c2cb611",
   "metadata": {},
   "source": [
    "Generate `kNEventsPerThread * kNWriterThreads` events with multiple threads in `kNTupleFileName`, and sequence them in blocks of `kNEventsPerBlock` entries:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "7600099e",
   "metadata": {
    "collapsed": false,
    "execution": {
     "iopub.execute_input": "2026-05-19T20:15:21.002680Z",
     "iopub.status.busy": "2026-05-19T20:15:21.002531Z",
     "iopub.status.idle": "2026-05-19T20:15:21.114196Z",
     "shell.execute_reply": "2026-05-19T20:15:21.113392Z"
    }
   },
   "outputs": [],
   "source": [
    "%%cpp -d\n",
    "// Writes the ntuple with kNWriterThreads threads running FillDataInBlocks; each thread commits its\n",
    "// staged clusters every kNEventsPerBlock entries.\n",
    "void WriteInBlocks()\n",
    "{\n",
    "   std::cout << \"\\n === ... with sequencing in blocks of \" << kNEventsPerBlock << \" events ===\" << std::endl;\n",
    "\n",
    "   // Create the data model\n",
    "   auto model = ROOT::RNTupleModel::CreateBare();\n",
    "   model->MakeField<std::uint32_t>(\"eventId\");\n",
    "   model->MakeField<float>(\"rndm\");\n",
    "\n",
    "   // Create RNTupleWriteOptions to make the writing commit multiple clusters.\n",
    "   // This is for demonstration purposes only to have multiple clusters per\n",
    "   // thread and also per block that are implicitly flushed, and can be mixed\n",
    "   // with explicit calls to FlushCluster(). This should not be copied into real\n",
    "   // code!\n",
    "   ROOT::RNTupleWriteOptions options;\n",
    "   options.SetApproxZippedClusterSize(32'000);\n",
    "\n",
    "   // We hand over the data model to a newly created ntuple of name \"NTuple\", stored in kNTupleFileName\n",
    "   auto writer = ROOT::RNTupleParallelWriter::Recreate(std::move(model), \"NTuple\", kNTupleFileName, options);\n",
    "\n",
    "   // Start all writer threads, then wait for every one of them before the writer is destructed.\n",
    "   std::vector<std::thread> threads;\n",
    "   threads.reserve(kNWriterThreads);\n",
    "   for (int i = 0; i < kNWriterThreads; ++i)\n",
    "      threads.emplace_back(FillDataInBlocks, i, writer.get());\n",
    "   for (auto &thread : threads)\n",
    "      thread.join();\n",
    "\n",
    "   // The writer unique pointer goes out of scope here. On destruction, the writer flushes unwritten data to disk\n",
    "   // and closes the attached ROOT file.\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "032bd2b2",
   "metadata": {
    "collapsed": false,
    "execution": {
     "iopub.execute_input": "2026-05-19T20:15:21.115725Z",
     "iopub.status.busy": "2026-05-19T20:15:21.115565Z",
     "iopub.status.idle": "2026-05-19T20:15:21.905055Z",
     "shell.execute_reply": "2026-05-19T20:15:21.904325Z"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      " === Writing with staged cluster committing ===\n",
      "Thread #1 wrote events #25000 - #49999 as entries #0 - #24999\n",
      "Thread #2 wrote events #50000 - #74999 as entries #25000 - #49999\n",
      "Thread #0 wrote events #0 - #24999 as entries #50000 - #74999\n",
      "Thread #3 wrote events #75000 - #99999 as entries #75000 - #99999\n",
      "\n",
      " === ... with sequencing in blocks of 10000 events ===\n",
      "Thread #0 wrote events #0 - #9999 as entries #0 - #9999\n",
      "Thread #2 wrote events #50000 - #59999 as entries #10000 - #19999\n",
      "Thread #1 wrote events #25000 - #34999 as entries #20000 - #29999\n",
      "Thread #3 wrote events #75000 - #84999 as entries #30000 - #39999\n",
      "Thread #1 wrote events #35000 - #44999 as entries #40000 - #49999\n",
      "Thread #0 wrote events #10000 - #19999 as entries #50000 - #59999\n",
      "Thread #2 wrote events #60000 - #69999 as entries #60000 - #69999\n",
      "Thread #3 wrote events #85000 - #94999 as entries #70000 - #79999\n",
      "Thread #1 wrote events #45000 - #49999 as entries #80000 - #84999\n",
      "Thread #0 wrote events #20000 - #24999 as entries #85000 - #89999\n",
      "Thread #2 wrote events #70000 - #74999 as entries #90000 - #94999\n",
      "Thread #3 wrote events #95000 - #99999 as entries #95000 - #99999\n"
     ]
    }
   ],
   "source": [
    "Write();\n",
    "WriteInBlocks();"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5068ec07",
   "metadata": {},
   "source": [
    "Draw all canvases"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "1cc6a8a4",
   "metadata": {
    "collapsed": false,
    "execution": {
     "iopub.execute_input": "2026-05-19T20:15:21.906552Z",
     "iopub.status.busy": "2026-05-19T20:15:21.906424Z",
     "iopub.status.idle": "2026-05-19T20:15:22.111693Z",
     "shell.execute_reply": "2026-05-19T20:15:22.110726Z"
    }
   },
   "outputs": [],
   "source": [
    "gROOT->GetListOfCanvases()->Draw()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "ROOT C++",
   "language": "c++",
   "name": "root"
  },
  "language_info": {
   "codemirror_mode": "text/x-c++src",
   "file_extension": ".C",
   "mimetype": "text/x-c++src",
   "name": "c++"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}