Logo ROOT   6.14/05
Reference Guide
TMessage.cxx
Go to the documentation of this file.
1 // @(#)root/net:$Id$
2 // Author: Fons Rademakers 19/12/96
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // TMessage //
15 // //
16 // Message buffer class used for serializing objects and sending them //
17 // over a network. This class inherits from TBuffer the basic I/O //
18 // serializer. //
19 // //
20 //////////////////////////////////////////////////////////////////////////
21 
22 #include "TMessage.h"
23 #include "Compression.h"
24 #include "TVirtualStreamerInfo.h"
25 #include "Bytes.h"
26 #include "TFile.h"
27 #include "TProcessID.h"
28 #include "RZip.h"
29 
31 
32 
34 
35 ////////////////////////////////////////////////////////////////////////////////
36 /// Create a TMessage object for storing objects. The "what" integer
37 /// describes the type of message. Predifined ROOT system message types
38 /// can be found in MessageTypes.h. Make sure your own message types are
39 /// unique from the ROOT defined message types (i.e. 0 - 10000 are
40 /// reserved by ROOT). In case you OR "what" with kMESS_ACK, the message
41 /// will wait for an acknowledgement from the remote side. This makes
42 /// the sending process synchronous. In case you OR "what" with kMESS_ZIP,
43 /// the message will be compressed in TSocket using the zip algorithm
44 /// (only if message is > 256 bytes).
45 
47  TBufferFile(TBuffer::kWrite, bufsiz + 2*sizeof(UInt_t))
48 {
49  // space at the beginning of the message reserved for the message length
50  UInt_t reserved = 0;
51  *this << reserved;
52 
53  fWhat = what;
54  *this << what;
55 
56  fClass = 0;
57  fCompress = 0;
58  fBufComp = 0;
59  fBufCompCur = 0;
60  fCompPos = 0;
61  fInfos = 0;
63 
65 }
66 
67 ////////////////////////////////////////////////////////////////////////////////
68 /// Create a TMessage object for reading objects. The objects will be
69 /// read from buf. Use the What() method to get the message type.
70 
71 TMessage::TMessage(void *buf, Int_t bufsize) : TBufferFile(TBuffer::kRead, bufsize, buf)
72 {
73  // skip space at the beginning of the message reserved for the message length
74  fBufCur += sizeof(UInt_t);
75 
76  *this >> fWhat;
77 
78  fCompress = 0;
79  fBufComp = 0;
80  fBufCompCur = 0;
81  fCompPos = 0;
82  fInfos = 0;
84 
85  if (fWhat & kMESS_ZIP) {
86  // if buffer has kMESS_ZIP set, move it to fBufComp and uncompress
87  fBufComp = fBuffer;
88  fBufCompCur = fBuffer + bufsize;
89  fBuffer = 0;
90  Uncompress();
91  }
92 
93  if (fWhat == kMESS_OBJECT) {
94  InitMap();
95  fClass = ReadClass(); // get first the class stored in message
96  SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
97  ResetMap();
98  } else {
99  fClass = 0;
100  }
101 }
102 
103 ////////////////////////////////////////////////////////////////////////////////
104 /// Clean up compression buffer.
105 
107 {
108  delete [] fBufComp;
109  delete fInfos;
110 }
111 
112 ////////////////////////////////////////////////////////////////////////////////
113 /// Static function enabling or disabling the automatic schema evolution.
114 /// By default schema evolution support is off.
115 
117 {
118  fgEvolution = enable;
119 }
120 
121 ////////////////////////////////////////////////////////////////////////////////
122 /// Static function returning status of global schema evolution.
123 
125 {
126  return fgEvolution;
127 }
128 
129 ////////////////////////////////////////////////////////////////////////////////
130 /// Force writing the TStreamerInfo to the message.
131 
133 {
134  if (fgEvolution || fEvolution) {
135  if (!fInfos) fInfos = new TList();
136  fInfos->Add(info);
137  }
138 }
139 
140 ////////////////////////////////////////////////////////////////////////////////
141 /// Change a buffer that was received into one that can be send, i.e.
142 /// forward a just received message.
143 
145 {
146  if (IsReading()) {
147  SetWriteMode();
150 
151  if (fBufComp) {
152  fCompPos = fBufCur;
153  }
154  }
155 }
156 
157 ////////////////////////////////////////////////////////////////////////////////
158 /// Remember that the StreamerInfo is being used in writing.
159 ///
160 /// When support for schema evolution is enabled the list of TStreamerInfo
161 /// used to stream this object is kept in fInfos. This information is used
162 /// by TSocket::Send that sends this list through the socket. This list is in
163 /// turn used by TSocket::Recv to store the TStreamerInfo objects in the
164 /// relevant TClass in case the TClass does not know yet about a particular
165 /// class version. This feature is implemented to support clients and servers
166 /// with either different ROOT versions or different user classes versions.
168 {
169  if (fgEvolution || fEvolution) {
170  if (!fInfos) fInfos = new TList();
171  fInfos->Add(info);
172  }
173 }
174 
175 ////////////////////////////////////////////////////////////////////////////////
176 /// Reset the message buffer so we can use (i.e. fill) it again.
177 
179 {
180  SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
181  ResetMap();
182 
183  if (fBufComp) {
184  delete [] fBufComp;
185  fBufComp = 0;
186  fBufCompCur = 0;
187  fCompPos = 0;
188  }
189 
190  if (fgEvolution || fEvolution) {
191  if (fInfos)
192  fInfos->Clear();
193  }
195 }
196 
197 ////////////////////////////////////////////////////////////////////////////////
198 /// Set the message length at the beginning of the message buffer.
199 /// This method is only called by TSocket::Send().
200 
202 {
203  if (IsWriting()) {
204  char *buf = Buffer();
205  tobuf(buf, (UInt_t)(Length() - sizeof(UInt_t)));
206 
207  if (fBufComp) {
208  buf = fBufComp;
209  tobuf(buf, (UInt_t)(CompLength() - sizeof(UInt_t)));
210  }
211  }
212 }
213 
214 ////////////////////////////////////////////////////////////////////////////////
215 /// Using this method one can change the message type a-posteriory.
216 /// In case you OR "what" with kMESS_ACK, the message will wait for
217 /// an acknowledgement from the remote side. This makes the sending
218 /// process synchronous.
219 
221 {
222  fWhat = what;
223 
224  char *buf = Buffer();
225  buf += sizeof(UInt_t); // skip reserved length space
226  tobuf(buf, what);
227 
228  if (fBufComp) {
229  buf = fBufComp;
230  buf += sizeof(UInt_t); // skip reserved length space
231  tobuf(buf, what | kMESS_ZIP);
232  }
233 }
234 
235 ////////////////////////////////////////////////////////////////////////////////
236 
238 {
239  if (algorithm < 0 || algorithm >= ROOT::kUndefinedCompressionAlgorithm) algorithm = 0;
240  Int_t newCompress;
241  if (fCompress < 0) {
242  newCompress = 100 * algorithm + 1;
243  } else {
244  int level = fCompress % 100;
245  newCompress = 100 * algorithm + level;
246  }
247  if (newCompress != fCompress && fBufComp) {
248  delete [] fBufComp;
249  fBufComp = 0;
250  fBufCompCur = 0;
251  fCompPos = 0;
252  }
253  fCompress = newCompress;
254 }
255 
256 ////////////////////////////////////////////////////////////////////////////////
257 
259 {
260  if (level < 0) level = 0;
261  if (level > 99) level = 99;
262  Int_t newCompress;
263  if (fCompress < 0) {
264  newCompress = level;
265  } else {
266  int algorithm = fCompress / 100;
267  if (algorithm >= ROOT::kUndefinedCompressionAlgorithm) algorithm = 0;
268  newCompress = 100 * algorithm + level;
269  }
270  if (newCompress != fCompress && fBufComp) {
271  delete [] fBufComp;
272  fBufComp = 0;
273  fBufCompCur = 0;
274  fCompPos = 0;
275  }
276  fCompress = newCompress;
277 }
278 
279 ////////////////////////////////////////////////////////////////////////////////
280 
282 {
283  if (settings != fCompress && fBufComp) {
284  delete [] fBufComp;
285  fBufComp = 0;
286  fBufCompCur = 0;
287  fCompPos = 0;
288  }
289  fCompress = settings;
290 }
291 
292 ////////////////////////////////////////////////////////////////////////////////
293 /// Compress the message. The message will only be compressed if the
294 /// compression level > 0 and the if the message is > 256 bytes.
295 /// Returns -1 in case of error (when compression fails or
296 /// when the message increases in size in some pathological cases),
297 /// otherwise returns 0.
298 
300 {
301  Int_t compressionLevel = GetCompressionLevel();
302  Int_t compressionAlgorithm = GetCompressionAlgorithm();
303  if (compressionLevel <= 0) {
304  // no compression specified
305  if (fBufComp) {
306  delete [] fBufComp;
307  fBufComp = 0;
308  fBufCompCur = 0;
309  fCompPos = 0;
310  }
311  return 0;
312  }
313 
314  if (fBufComp && fCompPos == fBufCur) {
315  // the message was already compressed
316  return 0;
317  }
318 
319  // remove any existing compressed buffer before compressing modified message
320  if (fBufComp) {
321  delete [] fBufComp;
322  fBufComp = 0;
323  fBufCompCur = 0;
324  fCompPos = 0;
325  }
326 
327  if (Length() <= (Int_t)(256 + 2*sizeof(UInt_t))) {
328  // this message is too small to be compressed
329  return 0;
330  }
331 
332  Int_t hdrlen = 2*sizeof(UInt_t);
333  Int_t messlen = Length() - hdrlen;
334  Int_t nbuffers = 1 + (messlen - 1) / kMAXZIPBUF;
335  Int_t chdrlen = 3*sizeof(UInt_t); // compressed buffer header length
336  Int_t buflen = std::max(512, chdrlen + messlen + 9*nbuffers);
337  fBufComp = new char[buflen];
338  char *messbuf = Buffer() + hdrlen;
339  char *bufcur = fBufComp + chdrlen;
340  Int_t noutot = 0;
341  Int_t nzip = 0;
342  Int_t nout, bufmax;
343  for (Int_t i = 0; i < nbuffers; ++i) {
344  if (i == nbuffers - 1)
345  bufmax = messlen - nzip;
346  else
347  bufmax = kMAXZIPBUF;
348  R__zipMultipleAlgorithm(compressionLevel, &bufmax, messbuf, &bufmax, bufcur, &nout,
349  static_cast<ROOT::ECompressionAlgorithm>(compressionAlgorithm));
350  if (nout == 0 || nout >= messlen) {
351  //this happens when the buffer cannot be compressed
352  delete [] fBufComp;
353  fBufComp = 0;
354  fBufCompCur = 0;
355  fCompPos = 0;
356  return -1;
357  }
358  bufcur += nout;
359  noutot += nout;
360  messbuf += kMAXZIPBUF;
361  nzip += kMAXZIPBUF;
362  }
363  fBufCompCur = bufcur;
364  fCompPos = fBufCur;
365 
366  bufcur = fBufComp;
367  tobuf(bufcur, (UInt_t)(CompLength() - sizeof(UInt_t)));
368  Int_t what = fWhat | kMESS_ZIP;
369  tobuf(bufcur, what);
370  tobuf(bufcur, Length()); // original uncompressed buffer length
371 
372  return 0;
373 }
374 
375 ////////////////////////////////////////////////////////////////////////////////
376 /// Uncompress the message. The message will only be uncompressed when
377 /// kMESS_ZIP is set. Returns -1 in case of error, 0 otherwise.
378 
380 {
381  if (!fBufComp || !(fWhat & kMESS_ZIP))
382  return -1;
383 
384  Int_t buflen;
385  Int_t hdrlen = 2*sizeof(UInt_t);
386  char *bufcur1 = fBufComp + hdrlen;
387  frombuf(bufcur1, &buflen);
388  UChar_t *bufcur = (UChar_t*)bufcur1;
389 
390  /* early consistency check */
391  Int_t nin, nbuf;
392  if(R__unzip_header(&nin, bufcur, &nbuf)!=0) {
393  Error("Uncompress", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
394  return -1;
395  }
396 
397  fBuffer = new char[buflen];
398  fBufSize = buflen;
399  fBufCur = fBuffer + sizeof(UInt_t) + sizeof(fWhat);
401  char *messbuf = fBuffer + hdrlen;
402 
403  Int_t nout;
404  Int_t noutot = 0;
405  while (1) {
406  Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
407  if (hc!=0) break;
408  R__unzip(&nin, bufcur, &nbuf, (unsigned char*) messbuf, &nout);
409  if (!nout) break;
410  noutot += nout;
411  if (noutot >= buflen - hdrlen) break;
412  bufcur += nin;
413  messbuf += nout;
414  }
415 
416  fWhat &= ~kMESS_ZIP;
417  fCompress = 1;
418 
419  return 0;
420 }
421 
422 ////////////////////////////////////////////////////////////////////////////////
423 /// Check if the ProcessID pid is already in the message.
424 /// If not, then:
425 /// - mark bit 0 of fBitsPIDs to indicate that a ProcessID has been found
426 /// - mark bit uid+1 where uid id the uid of the ProcessID
427 
429 {
430  if (fBitsPIDs.TestBitNumber(0)) return 0;
431  if (!pid)
432  pid = TProcessID::GetPID();
433  if (!pid) return 0;
435  UInt_t uid = pid->GetUniqueID();
436  fBitsPIDs.SetBitNumber(uid+1);
437  return 1;
438 }
void SetCompressionSettings(Int_t settings=1)
Definition: TMessage.cxx:281
virtual TClass * ReadClass(const TClass *cl=0, UInt_t *objTag=0)
Read class definition from I/O buffer.
static TProcessID * GetPID()
static: returns pointer to current TProcessID
Definition: TProcessID.cxx:341
virtual void ResetMap()
Delete existing fMap and reset map counter.
Definition: TBufferIO.cxx:288
void SetBufferOffset(Int_t offset=0)
Definition: TBuffer.h:90
virtual UInt_t GetUniqueID() const
Return the unique object id.
Definition: TObject.cxx:375
Int_t Compress()
Compress the message.
Definition: TMessage.cxx:299
Bool_t IsReading() const
Definition: TBuffer.h:83
void frombuf(char *&buf, Bool_t *x)
Definition: Bytes.h:280
Bool_t IsWriting() const
Definition: TBuffer.h:84
The concrete implementation of TBuffer for writing/reading to/from a ROOT file or socket...
Definition: TBufferFile.h:46
char * fBuffer
Definition: TBuffer.h:48
static Bool_t fgEvolution
Definition: TMessage.h:51
TMessage(const TMessage &)
unsigned short UShort_t
Definition: RtypesCore.h:36
UInt_t fWhat
Definition: TMessage.h:43
Buffer base class used for serializing objects.
Definition: TBuffer.h:40
Int_t GetCompressionAlgorithm() const
Definition: TMessage.h:99
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
virtual void InitMap()
Create the fMap container and initialize them with the null object.
Definition: TBufferIO.cxx:129
void SetCompressionAlgorithm(Int_t algorithm=0)
Definition: TMessage.cxx:237
void ResetAllBits(Bool_t value=kFALSE)
Reset all bits to 0 (false).
Definition: TBits.cxx:481
static Bool_t UsesSchemaEvolutionForAll()
Static function returning status of global schema evolution.
Definition: TMessage.cxx:124
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:694
Int_t Length() const
Definition: TBuffer.h:96
char * fBufCompCur
Definition: TMessage.h:47
void TagStreamerInfo(TVirtualStreamerInfo *info)
Remember that the StreamerInfo is being used in writing.
Definition: TMessage.cxx:167
TClass * fClass
Definition: TMessage.h:44
char * fBufComp
Definition: TMessage.h:46
char * Buffer() const
Definition: TBuffer.h:93
void SetWhat(UInt_t what)
Using this method one can change the message type a-posteriory.
Definition: TMessage.cxx:220
Int_t CompLength() const
Definition: TMessage.h:89
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
void tobuf(char *&buf, Bool_t x)
Definition: Bytes.h:57
A TProcessID identifies a ROOT job in a unique way in time and space.
Definition: TProcessID.h:69
Bool_t TestBitNumber(UInt_t bitnumber) const
Definition: TBits.h:226
A doubly linked list.
Definition: TList.h:44
void Reset()
Reset the message buffer so we can use (i.e. fill) it again.
Definition: TMessage.cxx:178
unsigned int UInt_t
Definition: RtypesCore.h:42
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:880
Int_t Uncompress()
Uncompress the message.
Definition: TMessage.cxx:379
TList * fInfos
Definition: TMessage.h:41
char * fBufCur
Definition: TBuffer.h:49
Bool_t fEvolution
Definition: TMessage.h:49
static void EnableSchemaEvolutionForAll(Bool_t enable=kTRUE)
Static function enabling or disabling the automatic schema evolution.
Definition: TMessage.cxx:116
const Bool_t kFALSE
Definition: RtypesCore.h:88
UShort_t WriteProcessID(TProcessID *pid)
Check if the ProcessID pid is already in the message.
Definition: TMessage.cxx:428
#define ClassImp(name)
Definition: Rtypes.h:359
void ForceWriteInfo(TVirtualStreamerInfo *info, Bool_t force)
Force writing the TStreamerInfo to the message.
Definition: TMessage.cxx:132
Int_t GetCompressionLevel() const
Definition: TMessage.h:105
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition: TList.cxx:399
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
char * fCompPos
Definition: TMessage.h:48
void SetLength() const
Set the message length at the beginning of the message buffer.
Definition: TMessage.cxx:201
virtual void Add(TObject *obj)
Definition: TList.h:87
TBits fBitsPIDs
Definition: TMessage.h:42
void SetCompressionLevel(Int_t level=1)
Definition: TMessage.cxx:258
virtual ~TMessage()
Clean up compression buffer.
Definition: TMessage.cxx:106
Int_t fBufSize
Definition: TBuffer.h:47
Undefined compression algorithm (must be kept the last of the list in case a new algorithm is added)...
Definition: Compression.h:55
unsigned char UChar_t
Definition: RtypesCore.h:34
Abstract Interface class describing Streamer information for one class.
void Forward()
Change a buffer that was received into one that can be send, i.e.
Definition: TMessage.cxx:144
void SetBitNumber(UInt_t bitnumber, Bool_t value=kTRUE)
Definition: TBits.h:210
char * fBufMax
Definition: TBuffer.h:50
void SetWriteMode()
Set buffer in write mode.
Definition: TBuffer.cxx:294
Int_t fCompress
Definition: TMessage.h:45