58class RPageSynchronizingSink :
public RPageSink {
61 RPageSink *fInnerSink;
65 explicit RPageSynchronizingSink(RPageSink &inner, std::mutex &mutex)
66 : RPageSink(inner.GetNTupleName(), inner.GetWriteOptions()), fInnerSink(&inner), fMutex(&mutex)
72 RPageSynchronizingSink(
const RPageSynchronizingSink &) =
delete;
73 RPageSynchronizingSink &
operator=(
const RPageSynchronizingSink &) =
delete;
75 const RNTupleDescriptor &GetDescriptor() const final {
return fInnerSink->GetDescriptor(); }
77 NTupleSize_t GetNEntries() const final {
return fInnerSink->GetNEntries(); }
79 ColumnHandle_t AddColumn(DescriptorId_t, RColumn &)
final {
return {}; }
80 void InitImpl(ROOT::RNTupleModel &)
final {}
81 void UpdateSchema(
const RNTupleModelChangeset &, NTupleSize_t)
final
83 throw ROOT::RException(
R__FAIL(
"UpdateSchema not supported via RPageSynchronizingSink"));
85 void UpdateExtraTypeInfo(
const RExtraTypeInfoDescriptor &)
final
87 throw ROOT::RException(
R__FAIL(
"UpdateExtraTypeInfo not supported via RPageSynchronizingSink"));
90 void CommitSuppressedColumn(ColumnHandle_t handle)
final { fInnerSink->CommitSuppressedColumn(handle); }
91 void CommitPage(ColumnHandle_t,
const RPage &)
final
93 throw ROOT::RException(
R__FAIL(
"should never commit single pages via RPageSynchronizingSink"));
95 void CommitSealedPage(DescriptorId_t,
const RSealedPage &)
final
97 throw ROOT::RException(
R__FAIL(
"should never commit sealed pages via RPageSynchronizingSink"));
99 void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges)
final
101 fInnerSink->CommitSealedPageV(ranges);
103 std::uint64_t CommitCluster(NTupleSize_t nNewEntries)
final {
return fInnerSink->CommitCluster(nNewEntries); }
104 RStagedCluster StageCluster(NTupleSize_t nNewEntries)
final {
return fInnerSink->StageCluster(nNewEntries); }
105 void CommitStagedClusters(std::span<RStagedCluster> clusters)
final { fInnerSink->CommitStagedClusters(clusters); }
106 void CommitClusterGroup() final
108 throw ROOT::RException(
R__FAIL(
"should never commit cluster group via RPageSynchronizingSink"));
111 ROOT::Internal::RNTupleLink CommitDatasetImpl() final
113 throw ROOT::RException(
R__FAIL(
"should never commit dataset via RPageSynchronizingSink"));
116 RSinkGuard GetSinkGuard() final {
return RSinkGuard(fMutex); }
118 std::unique_ptr<RPageSink> CloneAsHidden(std::string_view,
const ROOT::RNTupleWriteOptions &)
const final
120 throw ROOT::RException(
R__FAIL(
"cloning a RPageSynchronizingSink is not implemented yet"));
123 void CommitAttributeSet(std::string_view,
const ROOT::Internal::RNTupleLink &)
final
125 throw ROOT::RException(
R__FAIL(
"committing attribute sets is not implemented yet for parallel writing"));
// Fragment, presumably of the RNTupleParallelWriter(model, sink) constructor. The signature start, the
// initializer list and the rest of the body are not part of this listing.
132 std::unique_ptr<RPageSink> sink)
// Reject models that have registered subfields: the writer refuses to be built from such a model.
// NOTE(review): `!...empty()` would express this check more idiomatically than `size() > 0`.
135 if (
fModel->GetRegisteredSubfieldNames().size() > 0) {
136 throw RException(R__FAIL(
"cannot create an RNTupleParallelWriter from a model with registered subfields"));
// Fragment, presumably of CommitDataset(). A fill context that is still alive (its weak_ptr not expired) is
// checked first. What happens in that case is not part of this listing. Afterwards the final sink commits the
// cluster group and then the dataset.
// NOTE(review): `context` is presumably the loop variable over fFillContexts; confirm.
158 if (!context.expired()) {
164 fSink->CommitClusterGroup();
165 fSink->CommitDataset();
// Fragment of a static factory, presumably Recreate(). Its parameters and sink creation are not part of this listing.
169std::unique_ptr<ROOT::RNTupleParallelWriter>
// NOTE(review): a raw `new` wrapped in unique_ptr is used instead of std::make_unique, presumably because the
// constructor is not publicly accessible; confirm.
179 return std::unique_ptr<RNTupleParallelWriter>(
new RNTupleParallelWriter(std::move(model), std::move(sink)));
// Fragment, presumably of Append(model, ntupleName, fileOrDirectory, options). It validates that the target
// directory is backed by a binary, writable ROOT file, then builds an RPageSinkFile and wraps it in a writer.
182std::unique_ptr<ROOT::RNTupleParallelWriter>
186 auto file = fileOrDirectory.
GetFile();
// NOTE(review): the condition guarding this error (presumably a null `file`) is not part of this listing.
189 R__FAIL(
"RNTupleParallelWriter only supports writing to a ROOT file. Cannot write into a directory "
190 "that is not backed by a file"));
// Only binary ROOT files are accepted.
192 if (!file->IsBinary()) {
193 throw RException(
R__FAIL(
"RNTupleParallelWriter only supports writing to a ROOT file. Cannot write into " +
194 std::string(file->GetName())));
// The file must be opened for writing.
// NOTE(review): the beginning of the throw expression for this error is not part of this listing.
196 if (!file->IsWritable()) {
198 "' given to RNTupleParallelWriter is not writable. Open it with 'UPDATE' or 'RECREATE' "
199 "if you want to write into it."));
// The sink writes into the directory passed in (not necessarily the file's top level).
205 auto sink = std::make_unique<ROOT::Internal::RPageSinkFile>(ntupleName, fileOrDirectory, options);
207 return std::unique_ptr<RNTupleParallelWriter>(
new RNTupleParallelWriter(std::move(model), std::move(sink)));
// Fragment, presumably of CreateFillContext(). Each fill context gets its own clone of the model and its own
// RPageSinkBuf. The RPageSinkBuf writes through an RPageSynchronizingSink that wraps the shared final sink fSink
// and the shared fSinkMutex.
214 auto model =
fModel->Clone();
216 std::make_unique<ROOT::Internal::RPageSinkBuf>(std::make_unique<RPageSynchronizingSink>(*
fSink,
fSinkMutex));
// NOTE(review): constructed with raw `new`, presumably because RNTupleFillContext's constructor is not publicly
// accessible to std::make_shared; confirm.
220 std::shared_ptr<RNTupleFillContext> context(
new RNTupleFillContext(std::move(model), std::move(sink)));
// The context owns an IMT task scheduler, and its sink is pointed at it.
// NOTE(review): the condition under which this happens (presumably IMT being enabled) is not part of this listing.
226 context->fZipTasks = std::make_unique<ROOT::Experimental::Internal::RNTupleImtTaskScheduler>();
227 context->fSink->SetTaskScheduler(context->fZipTasks.get());
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
RInterface & operator=(const RInterface &)=default
Copy-assignment operator for RInterface.
#define R__LOG_ERROR(...)
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Guess the concrete derived page sink from the location.
Abstract interface to write data into an ntuple.
A page is a slice of a column that is mapped into memory.
std::string GetReport() const
Format a diagnostics report, e.g. for an exception message.
Base class for all ROOT issued exceptions.
const RError & GetError() const
The on-storage metadata of an RNTuple.
A context for filling entries (data) into clusters of an RNTuple.
static std::unique_ptr< RNTupleParallelWriter > Recreate(std::unique_ptr< ROOT::RNTupleModel > model, std::string_view ntupleName, std::string_view storage, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Recreate a new file and return a writer to write an RNTuple.
std::vector< std::weak_ptr< RNTupleFillContext > > fFillContexts
List of all created helpers. They must be destroyed before this RNTupleParallelWriter is destructed.
static std::unique_ptr< RNTupleParallelWriter > Append(std::unique_ptr< ROOT::RNTupleModel > model, std::string_view ntupleName, TDirectory &fileOrDirectory, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Append an RNTuple to the existing file.
Experimental::Detail::RNTupleMetrics fMetrics
void CommitDataset()
Automatically called by the destructor.
std::mutex fMutex
A global mutex to protect the internal data structures of this object.
std::unique_ptr< ROOT::Internal::RPageSink > fSink
The final RPageSink that represents the synchronization point.
RNTupleParallelWriter(std::unique_ptr< ROOT::RNTupleModel > model, std::unique_ptr< ROOT::Internal::RPageSink > sink)
std::shared_ptr< RNTupleFillContext > CreateFillContext()
Create a new RNTupleFillContext that can be used to fill entries and prepare clusters in parallel.
std::unique_ptr< ROOT::RNTupleModel > fModel
The original RNTupleModel connected to fSink; needs to be destructed before it.
std::mutex fSinkMutex
A mutex to synchronize the final page sink.
Common user-tunable settings for storing RNTuples.
bool GetUseBufferedWrite() const
Abstract interface to write data into an ntuple.
Describe directory structure in memory.
virtual TFile * GetFile() const
ROOT::RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
The incremental changes to a RNTupleModel