Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TMessage.cxx
Go to the documentation of this file.
1// @(#)root/net:$Id$
2// Author: Fons Rademakers 19/12/96
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12//////////////////////////////////////////////////////////////////////////
13// //
14// TMessage //
15// //
16// Message buffer class used for serializing objects and sending them //
17// over a network. This class inherits from TBuffer the basic I/O //
18// serializer. //
19// //
20//////////////////////////////////////////////////////////////////////////
21
22#include "TMessage.h"
23#include "Compression.h"
25#include "TList.h"
26#include "Bytes.h"
27#include "TProcessID.h"
28#include "RZip.h"
29
31
32
34
35////////////////////////////////////////////////////////////////////////////////
36/// Create a TMessage object for storing objects. The "what" integer
37/// describes the type of message. Predefined ROOT system message types
38/// can be found in MessageTypes.h. Make sure your own message types are
39/// unique from the ROOT defined message types (i.e. 0 - 10000 are
40/// reserved by ROOT). In case you OR "what" with kMESS_ACK, the message
41/// will wait for an acknowledgment from the remote side. This makes
42/// the sending process synchronous. In case you OR "what" with kMESS_ZIP,
43/// the message will be compressed in TSocket using the zip algorithm
44/// (only if message is > 256 bytes).
45
47 TBufferFile(TBuffer::kWrite, bufsiz + 2*sizeof(UInt_t)),
48 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
49{
50 // space at the beginning of the message reserved for the message length
51 UInt_t reserved = 0;
52 *this << reserved;
53
54 fWhat = what;
55 *this << what;
56
57 fClass = nullptr;
58 fBufComp = nullptr;
59 fBufCompCur = nullptr;
60 fCompPos = nullptr;
61 fInfos = nullptr;
63
65}
66
67////////////////////////////////////////////////////////////////////////////////
68/// Create a TMessage object for reading objects. The objects will be
69/// read from buf. Use the What() method to get the message type.
70
72 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
73{
74 // skip space at the beginning of the message reserved for the message length
75 fBufCur += sizeof(UInt_t);
76
77 *this >> fWhat;
78
79 fBufComp = nullptr;
80 fBufCompCur = nullptr;
81 fCompPos = nullptr;
82 fInfos = nullptr;
84
85 if (fWhat & kMESS_ZIP) {
86 // if buffer has kMESS_ZIP set, move it to fBufComp and uncompress
89 fBuffer = nullptr;
90 Uncompress();
91 if (adopt) {
92 SetBit(kIsOwnerComp); // bufcomp points to the original buf
93 }
94 }
95
96 if (fWhat == kMESS_OBJECT) {
97 InitMap();
98 fClass = ReadClass(); // get first the class stored in message
99 SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
100 ResetMap();
101 } else {
102 fClass = nullptr;
103 }
104}
105
106////////////////////////////////////////////////////////////////////////////////
107/// Destructor
108
110{
111 // We only own fBufComp when we explictly created it or adopt and kMESS_ZIP were true
113 delete [] fBufComp;
114 delete fInfos;
115}
116
117////////////////////////////////////////////////////////////////////////////////
118/// Static function enabling or disabling the automatic schema evolution.
119/// By default schema evolution support is off.
120
125
126////////////////////////////////////////////////////////////////////////////////
127/// Static function returning status of global schema evolution.
128
133
134////////////////////////////////////////////////////////////////////////////////
135/// Force writing the TStreamerInfo to the message.
136
138{
139 if (fgEvolution || fEvolution) {
140 if (!fInfos) fInfos = new TList();
141 fInfos->Add(info);
142 }
143}
144
145////////////////////////////////////////////////////////////////////////////////
146/// Change a buffer that was received into one that can be send, i.e.
147/// forward a just received message.
148
150{
151 if (IsReading()) {
152 SetWriteMode();
155
156 if (fBufComp) {
158 }
159 }
160}
161
162////////////////////////////////////////////////////////////////////////////////
163/// Remember that the StreamerInfo is being used in writing.
164///
165/// When support for schema evolution is enabled the list of TStreamerInfo
166/// used to stream this object is kept in fInfos. This information is used
167/// by TSocket::Send that sends this list through the socket. This list is in
168/// turn used by TSocket::Recv to store the TStreamerInfo objects in the
169/// relevant TClass in case the TClass does not know yet about a particular
170/// class version. This feature is implemented to support clients and servers
171/// with either different ROOT versions or different user classes versions.
172
174{
175 if (fgEvolution || fEvolution) {
176 if (!fInfos) fInfos = new TList();
177 fInfos->Add(info);
178 }
179}
180
181////////////////////////////////////////////////////////////////////////////////
182/// Reset the message buffer so we can use (i.e. fill) it again.
183
185{
186 SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
187 ResetMap();
188
189 if (fBufComp) {
190 // We only own fBufComp when we explictly created it or adopt and kMESS_ZIP were true
192 delete [] fBufComp;
193 fBufComp = nullptr;
194 fBufCompCur = nullptr;
195 fCompPos = nullptr;
196 }
197
198 if (fgEvolution || fEvolution) {
199 if (fInfos)
200 fInfos->Clear();
201 }
203}
204
205////////////////////////////////////////////////////////////////////////////////
206/// Set the message length at the beginning of the message buffer.
207/// This method is only called by TSocket::Send().
208
210{
211 if (IsWriting()) {
212 char *buf = Buffer();
213 if (buf)
214 tobuf(buf, (UInt_t)(Length() - sizeof(UInt_t)));
215
216 if (fBufComp) {
217 buf = fBufComp;
218 tobuf(buf, (UInt_t)(CompLength() - sizeof(UInt_t)));
219 }
220 }
221}
222
223////////////////////////////////////////////////////////////////////////////////
224/// Using this method one can change the message type a-posteriori
225/// In case you OR "what" with kMESS_ACK, the message will wait for
226/// an acknowledgment from the remote side. This makes the sending
227/// process synchronous.
228
230{
231 fWhat = what;
232
233 char *buf = Buffer();
234 if (buf) {
235 buf += sizeof(UInt_t); // skip reserved length space
236 tobuf(buf, what);
237 }
238
239 if (fBufComp) {
240 buf = fBufComp;
241 buf += sizeof(UInt_t); // skip reserved length space
242 tobuf(buf, what | kMESS_ZIP);
243 }
244}
245
246////////////////////////////////////////////////////////////////////////////////
247/// Set compression algorithm
248
250{
253 if (fCompress < 0) {
255 } else {
256 int level = fCompress % 100;
257 newCompress = 100 * algorithm + level;
258 }
259 if (newCompress != fCompress && fBufComp) {
260 // We only own fBufComp when we explictly created it or adopt and kMESS_ZIP were true
262 delete [] fBufComp;
263 fBufComp = nullptr;
264 fBufCompCur = nullptr;
265 fCompPos = nullptr;
266 }
268}
269
270////////////////////////////////////////////////////////////////////////////////
271/// Set compression level
272
274{
275 if (level < 0) level = 0;
276 if (level > 99) level = 99;
278 if (fCompress < 0) {
279 newCompress = level;
280 } else {
281 int algorithm = fCompress / 100;
283 newCompress = 100 * algorithm + level;
284 }
285 if (newCompress != fCompress && fBufComp) {
286 // We only own fBufComp when we explictly created it or adopt and kMESS_ZIP were true
288 delete [] fBufComp;
289 fBufComp = nullptr;
290 fBufCompCur = nullptr;
291 fCompPos = nullptr;
292 }
294}
295
296////////////////////////////////////////////////////////////////////////////////
297/// Set compression settings
298
300{
301 if (settings != fCompress && fBufComp) {
302 // We only own fBufComp when we explictly created it or adopt and kMESS_ZIP were true
304 delete [] fBufComp;
305 fBufComp = nullptr;
306 fBufCompCur = nullptr;
307 fCompPos = nullptr;
308 }
310}
311
312////////////////////////////////////////////////////////////////////////////////
313/// Compress the message. The message will only be compressed if the
314/// compression level > 0 and the if the message is > 256 bytes.
315/// Returns -1 in case of error (when compression fails or
316/// when the message increases in size in some pathological cases),
317/// otherwise returns 0.
318
320{
323 if (compressionLevel <= 0) {
324 // no compression specified
325 if (fBufComp) {
326 // We only own fBufComp when we explictly created it or adopt and kMESS_ZIP were true
328 delete [] fBufComp;
329 fBufComp = nullptr;
330 fBufCompCur = nullptr;
331 fCompPos = nullptr;
332 }
333 return 0;
334 }
335
336 if (fBufComp && fCompPos == fBufCur) {
337 // the message was already compressed
338 return 0;
339 }
340
341 // remove any existing compressed buffer before compressing modified message
342 if (fBufComp) {
343 // We only own fBufComp when we explictly created it or adopt and kMESS_ZIP were true
345 delete [] fBufComp;
346 fBufComp = nullptr;
347 fBufCompCur = nullptr;
348 fCompPos = nullptr;
349 }
350
351 if (Length() <= (Int_t)(256 + 2*sizeof(UInt_t))) {
352 // this message is too small to be compressed
353 return 0;
354 }
355
356 if (!Buffer()) {
357 // error condition, should never happen
358 return -1;
359 }
360
361 Int_t hdrlen = 2*sizeof(UInt_t);
363 Int_t nbuffers = 1 + (messlen - 1) / kMAXZIPBUF;
364 Int_t chdrlen = 3*sizeof(UInt_t); // compressed buffer header length
365 Int_t buflen = std::max(512, chdrlen + messlen + 9*nbuffers);
366 fBufComp = new char[buflen];
367 char *messbuf = Buffer() + hdrlen;
368 char *bufcur = fBufComp + chdrlen;
370 Int_t nzip = 0;
371 Int_t nout, bufmax;
372 for (Int_t i = 0; i < nbuffers; ++i) {
373 if (i == nbuffers - 1)
374 bufmax = messlen - nzip;
375 else
376 bufmax = kMAXZIPBUF;
379 if (nout == 0 || nout >= messlen) {
380 //this happens when the buffer cannot be compressed
382 delete [] fBufComp;
383 fBufComp = nullptr;
384 fBufCompCur = nullptr;
385 fCompPos = nullptr;
386 return -1;
387 }
388 bufcur += nout;
390 nzip += kMAXZIPBUF;
391 }
394
396 tobuf(bufcur, (UInt_t)(CompLength() - sizeof(UInt_t)));
398 tobuf(bufcur, what);
399 tobuf(bufcur, Length()); // original uncompressed buffer length
400
401 return 0;
402}
403
404////////////////////////////////////////////////////////////////////////////////
405/// Uncompress the message. The message will only be uncompressed when
406/// kMESS_ZIP is set. Returns -1 in case of error, 0 otherwise.
407
409{
410 if (!fBufComp || !(fWhat & kMESS_ZIP))
411 return -1;
412
413 Int_t buflen;
414 Int_t hdrlen = 2*sizeof(UInt_t);
415 char *bufcur1 = fBufComp + hdrlen;
416 frombuf(bufcur1, &buflen);
418
419 /* early consistency check */
420 Int_t nin, nbuf;
421 if(R__unzip_header(&nin, bufcur, &nbuf)!=0) {
422 Error("Uncompress", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
423 return -1;
424 }
425
426 fBuffer = new char[buflen];
427 fBufSize = buflen;
428 fBufCur = fBuffer + sizeof(UInt_t) + sizeof(fWhat);
430 char *messbuf = fBuffer + hdrlen;
431
432 // Force being owner of the newly created buffer
434
435 Int_t nout;
436 Int_t noutot = 0;
437 while (1) {
439 if (hc!=0) break;
440 R__unzip(&nin, bufcur, &nbuf, (unsigned char*) messbuf, &nout);
441 if (!nout) break;
442 noutot += nout;
443 if (noutot >= buflen - hdrlen) break;
444 bufcur += nin;
445 messbuf += nout;
446 }
447
448 fWhat &= ~kMESS_ZIP;
449 fCompress = 1;
450
451 return 0;
452}
453
454////////////////////////////////////////////////////////////////////////////////
455/// Check if the ProcessID pid is already in the message.
456/// If not, then:
457/// - mark bit 0 of fBitsPIDs to indicate that a ProcessID has been found
458/// - mark bit uid+1 where uid id the uid of the ProcessID
459
461{
462 if (fBitsPIDs.TestBitNumber(0)) return 0;
463 if (!pid)
464 pid = TProcessID::GetPID();
465 if (!pid) return 0;
467 UInt_t uid = pid->GetUniqueID();
468 fBitsPIDs.SetBitNumber(uid+1);
469 return 1;
470}
void frombuf(char *&buf, Bool_t *x)
Definition Bytes.h:278
void tobuf(char *&buf, Bool_t x)
Definition Bytes.h:55
@ kMESS_OBJECT
@ kMESS_ZIP
bool Bool_t
Definition RtypesCore.h:63
unsigned int UInt_t
Definition RtypesCore.h:46
constexpr Bool_t kFALSE
Definition RtypesCore.h:94
#define ClassImp(name)
Definition Rtypes.h:374
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
void ResetAllBits(Bool_t value=kFALSE)
Reset all bits to 0 (false).
Definition TBits.cxx:481
Bool_t TestBitNumber(UInt_t bitnumber) const
Definition TBits.h:222
void SetBitNumber(UInt_t bitnumber, Bool_t value=kTRUE)
Definition TBits.h:206
The concrete implementation of TBuffer for writing/reading to/from a ROOT file or socket.
Definition TBufferFile.h:47
TClass * ReadClass(const TClass *cl=nullptr, UInt_t *objTag=nullptr) override
Read class definition from I/O buffer.
void InitMap() override
Create the fMap container and initialize them with the null object.
void ResetMap() override
Delete existing fMap and reset map counter.
Buffer base class used for serializing objects.
Definition TBuffer.h:43
void SetWriteMode()
Set buffer in write mode.
Definition TBuffer.cxx:315
Int_t fBufSize
Definition TBuffer.h:50
@ kCannotHandleMemberWiseStreaming
Definition TBuffer.h:76
@ kIsOwner
Definition TBuffer.h:75
char * fBufMax
Definition TBuffer.h:53
char * fBufCur
Definition TBuffer.h:52
Bool_t IsWriting() const
Definition TBuffer.h:87
Bool_t IsReading() const
Definition TBuffer.h:86
void SetBufferOffset(Int_t offset=0)
Definition TBuffer.h:93
char * fBuffer
Definition TBuffer.h:51
Int_t Length() const
Definition TBuffer.h:100
char * Buffer() const
Definition TBuffer.h:96
A doubly linked list.
Definition TList.h:38
void Clear(Option_t *option="") override
Remove all objects from the list.
Definition TList.cxx:400
void Add(TObject *obj) override
Definition TList.h:81
static void EnableSchemaEvolutionForAll(Bool_t enable=kTRUE)
Static function enabling or disabling the automatic schema evolution.
Definition TMessage.cxx:121
@ kIsOwnerComp
Definition TMessage.h:63
UInt_t fWhat
Definition TMessage.h:44
void ForceWriteInfo(TVirtualStreamerInfo *info, Bool_t force) override
Force writing the TStreamerInfo to the message.
Definition TMessage.cxx:137
char * fCompPos
Definition TMessage.h:49
void Forward()
Change a buffer that was received into one that can be send, i.e.
Definition TMessage.cxx:149
void SetLength() const
Set the message length at the beginning of the message buffer.
Definition TMessage.cxx:209
Int_t GetCompressionAlgorithm() const
Definition TMessage.h:103
TClass * fClass
Definition TMessage.h:45
void Reset() override
Reset the message buffer so we can use (i.e. fill) it again.
Definition TMessage.cxx:184
char * fBufComp
Definition TMessage.h:47
char * fBufCompCur
Definition TMessage.h:48
Bool_t fEvolution
Definition TMessage.h:50
void SetCompressionSettings(Int_t settings=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Set compression settings.
Definition TMessage.cxx:299
UShort_t WriteProcessID(TProcessID *pid) override
Check if the ProcessID pid is already in the message.
Definition TMessage.cxx:460
Int_t Compress()
Compress the message.
Definition TMessage.cxx:319
virtual ~TMessage()
Destructor.
Definition TMessage.cxx:109
Int_t fCompress
Definition TMessage.h:46
static Bool_t UsesSchemaEvolutionForAll()
Static function returning status of global schema evolution.
Definition TMessage.cxx:129
TMessage(const TMessage &)
void TagStreamerInfo(TVirtualStreamerInfo *info) override
Remember that the StreamerInfo is being used in writing.
Definition TMessage.cxx:173
void SetCompressionLevel(Int_t level=ROOT::RCompressionSetting::ELevel::kUseMin)
Set compression level.
Definition TMessage.cxx:273
Int_t Uncompress()
Uncompress the message.
Definition TMessage.cxx:408
Int_t GetCompressionLevel() const
Definition TMessage.h:109
TBits fBitsPIDs
Definition TMessage.h:43
TList * fInfos
Definition TMessage.h:42
Int_t CompLength() const
Definition TMessage.h:93
static Bool_t fgEvolution
Definition TMessage.h:52
void SetWhat(UInt_t what)
Using this method one can change the message type a-posteriori In case you OR "what" with kMESS_ACK,...
Definition TMessage.cxx:229
void SetCompressionAlgorithm(Int_t algorithm=ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
Set compression algorithm.
Definition TMessage.cxx:249
R__ALWAYS_INLINE Bool_t TestBit(UInt_t f) const
Definition TObject.h:201
virtual UInt_t GetUniqueID() const
Return the unique object id.
Definition TObject.cxx:474
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition TObject.cxx:813
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:1020
A TProcessID identifies a ROOT job in a unique way in time and space.
Definition TProcessID.h:74
static TProcessID * GetPID()
static: returns pointer to current TProcessID
Abstract Interface class describing Streamer information for one class.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
static const char * what
Definition stlLoader.cc:5
EValues
Note: this is only temporarily a struct and will become a enum class hence the name convention used.
Definition Compression.h:88
@ kUndefined
Undefined compression algorithm (must be kept the last of the list in case a new algorithm is added).
@ kUseMin
Compression level reserved when we are not sure what to use (1 is for the fastest compression)
Definition Compression.h:72