Logo ROOT   6.14/05
Reference Guide
TBufferMerger.hxx
Go to the documentation of this file.
1 // @(#)root/io:$Id$
2 // Author: Philippe Canal, Witold Pokorski, and Guilherme Amadio
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2017, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TBufferMerger
13 #define ROOT_TBufferMerger
14 
15 #include "TFileMerger.h"
16 #include "TMemFile.h"
17 
18 #include <condition_variable>
19 #include <functional>
20 #include <memory>
21 #include <mutex>
22 #include <queue>
23 #include <thread>
24 
25 namespace ROOT {
26 namespace Experimental {
27 
28 class TBufferMergerFile;
29 
30 /**
31  * \class TBufferMerger TBufferMerger.hxx
32  * \ingroup IO
33  *
34  * TBufferMerger is a class to facilitate writing data in
35  * parallel from multiple threads, while writing to a single
36  * output file. Its purpose is similar to TParallelMergingFile,
37  * but instead of using processes that connect to a network
38  * socket, TBufferMerger uses threads that each write to a
39  * TBufferMergerFile, which in turn push data into a queue
40  * managed by the TBufferMerger.
41  */
42 
44 public:
45  /** Constructor
46  * @param name Output file name
47  * @param option Output file creation options
48  * @param compress Output file compression level
49  */
50  TBufferMerger(const char *name, Option_t *option = "RECREATE", Int_t compress = 1);
51 
52  /** Constructor
53  * @param output Output \c TFile
54  */
55  TBufferMerger(std::unique_ptr<TFile> output);
56 
57  /** Destructor */
58  virtual ~TBufferMerger();
59 
60  /** Returns a TBufferMergerFile to which data can be written.
61  * At the end, all TBufferMergerFiles get merged into the output file.
62  * The user is responsible for doing a "cd" into the file to associate objects
63  * such as histograms or trees to it.
64  *
65  * After the creation of this file, the user must reset the kMustCleanup
66  * bit on any objects attached to it and take care of their deletion, as
67  * there is a possibility that a race condition will happen that causes
68  * a crash if ROOT manages these objects.
69  */
70  std::shared_ptr<TBufferMergerFile> GetFile();
71 
72  /** Returns the number of buffers currently in the queue. */
73  size_t GetQueueSize() const;
74 
75  /** Register a user callback function to be called after a buffer has been
76  * removed from the merging queue and finished being processed. This
77  * function can be useful to allow asynchronous launching of new tasks to
78  * push more data into the queue once its size satisfies user requirements.
79  */
80  void RegisterCallback(const std::function<void(void)> &f);
81 
82  /** Returns the current value of the auto save setting in bytes (default = 0). */
83  size_t GetAutoSave() const;
84 
85  /** By default, TBufferMerger will call TFileMerger::PartialMerge() for each
86  * buffer pushed onto its merge queue. This function lets the user change
87  * this behaviour by telling TBufferMerger to accumulate at least @p size
88  * bytes in memory before performing a partial merge and flushing to disk.
89  * This can be useful to avoid an excessive amount of work to happen in the
90  * output thread, as the number of TTree headers (which require compression)
91  * written to disk can be reduced.
92  */
93  void SetAutoSave(size_t size);
94 
95  friend class TBufferMergerFile;
96 
97 private:
98  /** TBufferMerger has no default constructor */
99  TBufferMerger();
100 
101  /** TBufferMerger has no copy constructor */
102  TBufferMerger(const TBufferMerger &);
103 
104  /** TBufferMerger has no copy assignment operator */
106 
107  void Init(std::unique_ptr<TFile>);
108 
109  void Merge();
110  void Push(TBufferFile *buffer);
111  void WriteOutputFile();
112 
113  size_t fAutoSave{0}; //< AutoSave only every fAutoSave bytes
114  size_t fBuffered{0}; //< Number of bytes currently buffered
115  TFileMerger fMerger{false, false}; //< TFileMerger used to merge all buffers
116  std::mutex fQueueMutex; //< Mutex used to lock fQueue
117  std::condition_variable fDataAvailable; //< Condition variable used to wait for data
118  std::queue<TBufferFile *> fQueue; //< Queue to which data is pushed and merged
119  std::unique_ptr<std::thread> fMergingThread; //< Worker thread that writes to disk
120  std::vector<std::weak_ptr<TBufferMergerFile>> fAttachedFiles; //< Attached files
121  std::function<void(void)> fCallback; //< Callback for when data is removed from queue
122 };
123 
124 /**
125  * \class TBufferMergerFile TBufferMerger.hxx
126  * \ingroup IO
127  *
128  * A TBufferMergerFile is similar to a TMemFile, but when data
129  * is written to it, it is appended to the TBufferMerger queue.
130  * The TBufferMerger merges all data into the output file on disk.
131  */
132 
133 class TBufferMergerFile : public TMemFile {
134 private:
135  TBufferMerger &fMerger; //< TBufferMerger this file is attached to
136 
137  /** Constructor. Can only be called by TBufferMerger.
138  * @param m Merger this file is attached to. */
140 
141  /** TBufferMergerFile has no default constructor. */
143 
144  /** TBufferMergerFile has no copy constructor. */
146 
147  /** TBufferMergerFile has no copy assignment operator */
149 
150  friend class TBufferMerger;
151 
152 public:
153  /** Destructor */
155 
156  using TMemFile::Write;
157 
158  /** Write data into a TBufferFile and append it to TBufferMerger.
159  * @param name Name
160  * @param opt Options
161  * @param bufsize Buffer size
162  * This function must be called before the TBufferMergerFile gets destroyed,
163  * or no data is appended to the TBufferMerger.
164  */
165  virtual Int_t Write(const char *name = nullptr, Int_t opt = 0, Int_t bufsize = 0) override;
166 
168 };
169 
170 } // namespace Experimental
171 } // namespace ROOT
172 
173 #endif
The concrete implementation of TBuffer for writing/reading to/from a ROOT file or socket...
Definition: TBufferFile.h:46
auto * m
Definition: textangle.C:8
std::vector< std::weak_ptr< TBufferMergerFile > > fAttachedFiles
Namespace for new ROOT classes and functions.
Definition: StringConv.hxx:21
const char Option_t
Definition: RtypesCore.h:62
#define f(i)
Definition: RSha256.hxx:104
int Int_t
Definition: RtypesCore.h:41
size_t GetAutoSave() const
Returns the current value of the auto save setting in bytes (default = 0).
void Push(TBufferFile *buffer)
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition: TMemFile.h:19
void Init(std::unique_ptr< TFile >)
std::queue< TBufferFile * > fQueue
TBufferMerger & operator=(const TBufferMerger &)
TBufferMerger has no copy operator.
void SetAutoSave(size_t size)
By default, TBufferMerger will call TFileMerger::PartialMerge() for each buffer pushed onto its merge...
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
Definition: RExports.h:146
size_t GetQueueSize() const
Returns the number of buffers currently in the queue.
void RegisterCallback(const std::function< void(void)> &f)
Register a user callback function to be called after a buffer has been removed from the merging queue...
virtual ~TBufferMerger()
Destructor.
std::condition_variable fDataAvailable
virtual Int_t Write(const char *name=0, Int_t opt=0, Int_t bufsiz=0)
Write memory objects to this file.
Definition: TFile.cxx:2336
This class provides file copy and merging services.
Definition: TFileMerger.h:30
TBufferMerger is a class to facilitate writing data in parallel from multiple threads, while writing to a single output file.
std::unique_ptr< std::thread > fMergingThread
TBufferMerger()
TBufferMerger has no default constructor.
std::function< void(void)> fCallback
std::shared_ptr< TBufferMergerFile > GetFile()
Returns a TBufferMergerFile to which data can be written.
#define ClassDefOverride(name, id)
Definition: Rtypes.h:324
char name[80]
Definition: TGX11.cxx:109