Logo ROOT   6.14/05
Reference Guide
TBufferMerger.cxx
Go to the documentation of this file.
1 // @(#)root/io:$Id$
2 // Author: Philippe Canal, Witold Pokorski, and Guilherme Amadio
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2017, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #include "ROOT/TBufferMerger.hxx"
13 
14 #include "TBufferFile.h"
15 #include "TError.h"
16 #include "TROOT.h"
17 #include "TVirtualMutex.h"
18 
19 #include <utility>
20 
21 namespace ROOT {
22 namespace Experimental {
23 
24 TBufferMerger::TBufferMerger(const char *name, Option_t *option, Int_t compress)
25 {
26  // We cannot chain constructors or use in-place initialization here because
27  // instantiating a TBufferMerger should not alter gDirectory's state.
29  Init(std::unique_ptr<TFile>(TFile::Open(name, option, /* title */ name, compress)));
30 }
31 
32 TBufferMerger::TBufferMerger(std::unique_ptr<TFile> output)
33 {
34  Init(std::move(output));
35 }
36 
37 void TBufferMerger::Init(std::unique_ptr<TFile> output)
38 {
39  if (!output || !output->IsWritable() || output->IsZombie())
40  Error("TBufferMerger", "cannot write to output file");
41 
42  fMerger.OutputFile(std::move(output));
43  fMergingThread.reset(new std::thread([&]() { this->WriteOutputFile(); }));
44 }
45 
// NOTE(review): the signature line is absent from this listing (the embedded
// numbering jumps from 45 to 47). The page's member index lists
// `virtual ~TBufferMerger()`, which matches this body - confirm upstream.
//
// Shuts the merger down: checks that no attached file is still alive, asks the
// merging thread to stop, and waits for it to finish.
47 {
// fAttachedFiles holds weak references; one that has not expired means a file
// handed out earlier still exists, which is reported through Fatal().
48  for (const auto &f : fAttachedFiles)
49  if (!f.expired()) Fatal("TBufferMerger", " TBufferMergerFiles must be destroyed before the server");
50 
// A null buffer is the stop signal: the loop that drains fQueue breaks out as
// soon as it pops a null pointer.
51  this->Push(nullptr);
// Block until the merging thread has returned.
52  fMergingThread->join();
53 }
54 
55 std::shared_ptr<TBufferMergerFile> TBufferMerger::GetFile()
56 {
58  std::shared_ptr<TBufferMergerFile> f(new TBufferMergerFile(*this));
59  gROOT->GetListOfFiles()->Remove(f.get());
60  fAttachedFiles.push_back(f);
61  return f;
62 }
63 
// NOTE(review): the signature line is absent from this listing (embedded
// numbering jumps from 63 to 65). The page's member index lists
// `size_t GetQueueSize() const` - confirm upstream.
//
// Returns the number of buffers currently in the queue.
// NOTE(review): fQueue is read here without holding fQueueMutex, unlike the
// code that pushes to and pops from the queue - confirm that an approximate
// value is acceptable to callers.
65 {
66  return fQueue.size();
67 }
68 
// NOTE(review): the signature line is absent from this listing (embedded
// numbering jumps from 68 to 70). The page's member index lists
// `void RegisterCallback(const std::function<void(void)> &f)` - confirm upstream.
//
// Stores a copy of `f` in fCallback, replacing any previously stored callback.
// The stored callable is invoked, when non-empty, at the end of each merge step.
70 {
71  fCallback = f;
72 }
73 
// NOTE(review): the signature line is absent from this listing (embedded
// numbering jumps from 73 to 75). The page's member index lists
// `void Push(TBufferFile *buffer)` - confirm upstream.
//
// Appends `buffer` to the queue and wakes one thread waiting on fDataAvailable.
// The consumer loop wraps each popped pointer in a std::unique_ptr, so a pushed
// buffer ends up owned (and eventually deleted) by the consumer. A null pointer
// is accepted and makes the consumer loop stop.
75 {
// Inner scope: fQueueMutex is released before the notification is sent.
76  {
77  std::lock_guard<std::mutex> lock(fQueueMutex);
78  fQueue.push(buffer);
79  }
80  fDataAvailable.notify_one();
81 }
82 
// NOTE(review): the signature line is absent from this listing (embedded
// numbering jumps from 82 to 84). The page's member index lists
// `size_t GetAutoSave() const` - confirm upstream.
//
// Returns the current value of the auto save setting, i.e. the threshold that
// the running total of buffered bytes is compared against before merging.
84 {
85  return fAutoSave;
86 }
87 
88 void TBufferMerger::SetAutoSave(size_t size)
89 {
90  fAutoSave = size;
91 }
92 
// NOTE(review): the signature line is absent from this listing (embedded
// numbering jumps from 92 to 94). The consumer loop calls this as `Merge()`,
// so this is presumably `void TBufferMerger::Merge()` - confirm upstream.
//
// One merge step: clears the buffered-bytes counter, resets the file merger's
// input list, then runs the user callback if one has been registered.
94 {
95  fBuffered = 0;
// NOTE(review): listing line 96 is missing here (numbering jumps 95 -> 97).
// Nothing visible performs the actual merge before the reset, and the page's
// index references TFileMerger::PartialMerge - verify against upstream that
// a `fMerger.PartialMerge();` call was not dropped from this listing.
97  fMerger.Reset();
98 
// An empty std::function is skipped rather than invoked.
99  if (fCallback)
100  fCallback();
101 }
102 
// NOTE(review): the signature line is absent from this listing (embedded
// numbering jumps from 102 to 104). Init() starts a thread that calls
// `this->WriteOutputFile()`, so this is presumably that function's body -
// confirm upstream.
//
// Consumer loop of the merging thread: pops buffers from fQueue one at a time,
// feeds each to the file merger as an in-memory file, and merges whenever the
// running byte count exceeds fAutoSave. A null buffer ends the loop, after
// which one final merge is performed.
104 {
// Owns the buffer currently being processed; the previous one is deleted when
// this is reset on the next iteration or when the function returns.
105  std::unique_ptr<TBufferFile> buffer;
106 
107  while (true) {
// Sleep until the queue is non-empty; the predicate guards against wake-ups
// that arrive while the queue is still empty.
108  std::unique_lock<std::mutex> lock(fQueueMutex);
109  fDataAvailable.wait(lock, [this]() { return !this->fQueue.empty(); });
110 
// Take ownership of the front element, then release the mutex so producers are
// not blocked while the buffer is processed.
111  buffer.reset(fQueue.front());
112  fQueue.pop();
113  lock.unlock();
114 
// A null pointer is the stop signal.
115  if (!buffer)
116  break;
117 
118  fBuffered += buffer->BufferSize();
// The TMemFile is adopted by fMerger. NOTE(review): `buffer` is destroyed on the
// next iteration, so this presumably relies on TMemFile copying the bytes
// rather than referencing them - confirm against the TMemFile constructor.
119  fMerger.AddAdoptFile(new TMemFile(fMerger.GetOutputFileName(), buffer->Buffer(), buffer->BufferSize(), "read"));
120 
// Strictly greater-than: with fAutoSave at 0 every non-empty buffer triggers a
// merge right away.
121  if (fBuffered > fAutoSave)
122  Merge();
123  }
124 
// Final merge step for whatever was added since the last one.
125  Merge();
126 }
127 
128 } // namespace Experimental
129 } // namespace ROOT
The concrete implementation of TBuffer for writing/reading to/from a ROOT file or socket...
Definition: TBufferFile.h:46
std::vector< std::weak_ptr< TBufferMergerFile > > fAttachedFiles
Namespace for new ROOT classes and functions.
Definition: StringConv.hxx:21
void Fatal(const char *location, const char *msgfmt,...)
const char Option_t
Definition: RtypesCore.h:62
#define gROOT
Definition: TROOT.h:410
#define f(i)
Definition: RSha256.hxx:104
int Int_t
Definition: RtypesCore.h:41
R__EXTERN TVirtualMutex * gROOTMutex
Definition: TROOT.h:57
size_t GetAutoSave() const
Returns the current value of the auto save setting in bytes (default = 0).
void Push(TBufferFile *buffer)
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition: TMemFile.h:19
void Init(std::unique_ptr< TFile >)
const char * GetOutputFileName() const
Definition: TFileMerger.h:84
std::queue< TBufferFile * > fQueue
char * Buffer() const
Definition: TBuffer.h:93
void SetAutoSave(size_t size)
By default, TBufferMerger will call TFileMerger::PartialMerge() for each buffer pushed onto its merge...
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
Definition: RExports.h:146
virtual Bool_t AddAdoptFile(TFile *source, Bool_t cpProgress=kTRUE)
Add the TFile to this file merger and give ownership of the TFile to this object (unless kFALSE is re...
virtual Bool_t OutputFile(const char *url, Bool_t force)
Open merger output file.
R__ALWAYS_INLINE Bool_t IsZombie() const
Definition: TObject.h:134
size_t GetQueueSize() const
Returns the number of buffers currently in the queue.
void RegisterCallback(const std::function< void(void)> &f)
Register a user callback function to be called after a buffer has been removed from the merging queue...
virtual ~TBufferMerger()
Destructor.
std::condition_variable fDataAvailable
static TFilePtr Open(std::string_view name, const Options_t &opts=Options_t())
Open a file with name for reading.
Definition: TFile.cxx:153
std::unique_ptr< std::thread > fMergingThread
#define R__LOCKGUARD(mutex)
Int_t BufferSize() const
Definition: TBuffer.h:94
TBufferMerger()
TBufferMerger has no default constructor.
std::function< void(void)> fCallback
std::shared_ptr< TBufferMergerFile > GetFile()
Returns a TBufferMergerFile to which data can be written.
virtual Bool_t PartialMerge(Int_t type=kAll|kIncremental)
Merge the files.
Bool_t IsWritable() const
virtual void Reset()
Reset merger file list.
char name[80]
Definition: TGX11.cxx:109
void Error(ErrorHandler_t func, int code, const char *va_(fmt),...)
Write error message and call a handler, if required.