Logo ROOT   6.12/07
Reference Guide
TBufferMerger.cxx
Go to the documentation of this file.
1 // @(#)root/io:$Id$
2 // Author: Philippe Canal, Witold Pokorski, and Guilherme Amadio
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2017, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #include "ROOT/TBufferMerger.hxx"
13 
14 #include "TBufferFile.h"
15 #include "TError.h"
16 #include "TFileMerger.h"
17 #include "TROOT.h"
18 #include "TVirtualMutex.h"
19 
20 namespace ROOT {
21 namespace Experimental {
22 
23 TBufferMerger::TBufferMerger(const char *name, Option_t *option, Int_t compress)
24 {
25  // NOTE: We cannot use ctor chaining or in-place initialization because we want this operation to have no effect on
26  // ROOT's gDirectory.
28  if (TFile *output = TFile::Open(name, option, /*title*/ name, compress))
29  Init(std::unique_ptr<TFile>(output));
30  else
31  Error("OutputFile", "cannot open the MERGER output file %s", name);
32 }
33 
34 TBufferMerger::TBufferMerger(std::unique_ptr<TFile> output)
35 {
36  Init(std::move(output));
37 }
38 
39 void TBufferMerger::Init(std::unique_ptr<TFile> output)
40 {
41  fFile = output.release();
42  fAutoSave = 0;
43  fMergingThread.reset(new std::thread([&]() { this->WriteOutputFile(); }));
44 }
45 
47 {
48  for (auto f : fAttachedFiles)
49  if (!f.expired()) Fatal("TBufferMerger", " TBufferMergerFiles must be destroyed before the server");
50 
51  this->Push(nullptr);
52  fMergingThread->join();
53 }
54 
55 std::shared_ptr<TBufferMergerFile> TBufferMerger::GetFile()
56 {
58  std::shared_ptr<TBufferMergerFile> f(new TBufferMergerFile(*this));
59  gROOT->GetListOfFiles()->Remove(f.get());
60  fAttachedFiles.push_back(f);
61  return f;
62 }
63 
65 {
66  return fQueue.size();
67 }
68 
70 {
71  fCallback = f;
72 }
73 
75 {
76  {
77  std::lock_guard<std::mutex> lock(fQueueMutex);
78  fQueue.push(buffer);
79  }
80  fDataAvailable.notify_one();
81 }
82 
84 {
85  return fAutoSave;
86 }
87 
88 void TBufferMerger::SetAutoSave(size_t size)
89 {
90  fAutoSave = size;
91 }
92 
94 {
95  size_t buffered = 0;
96  std::vector<std::unique_ptr<TMemFile>> memfiles;
97  std::unique_ptr<TBufferFile> buffer;
98  TFileMerger merger;
99 
100  merger.ResetBit(kMustCleanup);
101 
102  {
104  merger.OutputFile(std::unique_ptr<TFile>(fFile));
105  }
106 
107  while (true) {
108  std::unique_lock<std::mutex> lock(fQueueMutex);
109  fDataAvailable.wait(lock, [this]() { return !this->fQueue.empty(); });
110 
111  buffer.reset(fQueue.front());
112  fQueue.pop();
113  lock.unlock();
114 
115  if (!buffer)
116  break;
117 
118  Long64_t length;
119  buffer->SetReadMode();
120  buffer->SetBufferOffset();
121  buffer->ReadLong64(length);
122  buffered += length;
123 
124  {
126  memfiles.emplace_back(new TMemFile(fFile->GetName(), buffer->Buffer() + buffer->Length(), length, "read"));
127  buffer->SetBufferOffset(buffer->Length() + length);
128  merger.AddFile(memfiles.back().get(), false);
129 
130  if (buffered > fAutoSave) {
131  buffered = 0;
132  merger.PartialMerge();
133  merger.Reset();
134  memfiles.clear();
135  }
136  }
137 
138  if (fCallback)
139  fCallback();
140  }
141 
143  merger.PartialMerge();
144  merger.Reset();
145 }
146 
147 } // namespace Experimental
148 } // namespace ROOT
void SetBufferOffset(Int_t offset=0)
Definition: TBuffer.h:90
The concrete implementation of TBuffer for writing/reading to/from a ROOT file or socket...
Definition: TBufferFile.h:47
long long Long64_t
Definition: RtypesCore.h:69
std::vector< std::weak_ptr< TBufferMergerFile > > fAttachedFiles
Namespace for new ROOT classes and functions.
Definition: StringConv.hxx:21
void Fatal(const char *location, const char *msgfmt,...)
const char Option_t
Definition: RtypesCore.h:62
#define gROOT
Definition: TROOT.h:402
int Int_t
Definition: RtypesCore.h:41
R__EXTERN TVirtualMutex * gROOTMutex
Definition: TROOT.h:57
size_t GetAutoSave() const
Returns the current value of the auto save setting in bytes (default = 0).
void Push(TBufferFile *buffer)
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition: TMemFile.h:17
Int_t Length() const
Definition: TBuffer.h:96
void Init(std::unique_ptr< TFile >)
std::queue< TBufferFile * > fQueue
virtual void ReadLong64(Long64_t &l)
Definition: TBufferFile.h:490
char * Buffer() const
Definition: TBuffer.h:93
void SetAutoSave(size_t size)
By default, TBufferMerger will call TFileMerger::PartialMerge() for each buffer pushed onto its merge...
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
Definition: RExports.h:146
virtual Bool_t OutputFile(const char *url, Bool_t force)
Open merger output file.
virtual Bool_t AddFile(TFile *source, Bool_t own, Bool_t cpProgress)
Add the TFile to this file merger and give ownership of the TFile to this object (unless kFALSE is re...
size_t GetQueueSize() const
Returns the number of buffers currently in the queue.
void RegisterCallback(const std::function< void(void)> &f)
Register a user callback function to be called after a buffer has been removed from the merging queue...
virtual ~TBufferMerger()
Destructor.
std::condition_variable fDataAvailable
static TFilePtr Open(std::string_view name, const Options_t &opts=Options_t())
Open a file with name for reading.
Definition: TFile.cxx:153
This class provides file copy and merging services.
Definition: TFileMerger.h:30
void SetReadMode()
Set buffer in read mode.
Definition: TBuffer.cxx:281
std::unique_ptr< std::thread > fMergingThread
#define R__LOCKGUARD(mutex)
TBufferMerger()
TBufferMerger has no default constructor.
std::function< void(void)> fCallback
std::shared_ptr< TBufferMergerFile > GetFile()
Returns a TBufferMergerFile to which data can be written.
virtual Bool_t PartialMerge(Int_t type=kAll|kIncremental)
Merge the files.
void ResetBit(UInt_t f)
Definition: TObject.h:171
virtual void Reset()
Reset merger file list.
A ROOT file.
Definition: TFile.hxx:45
char name[80]
Definition: TGX11.cxx:109
void Error(ErrorHandler_t func, int code, const char *va_(fmt),...)
Write error message and call a handler, if required.