Logo ROOT   6.14/05
Reference Guide
TProofMonSenderML.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: G.Ganis July 2011
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2004, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 
13 /** \class TProofMonSenderML
14 \ingroup proofkernel
15 
16 TProofMonSender implementation for the ML writer
17 
18 */
19 
20 #include "TProofMonSenderML.h"
21 
22 #include "TDSet.h"
23 #include "TFileInfo.h"
24 #include "THashList.h"
25 #include "TList.h"
26 #include "TParameter.h"
27 #include "TPluginManager.h"
28 #include "TProofDebug.h"
29 #include "TROOT.h"
30 #include "TSystem.h"
31 #include "TVirtualMonitoring.h"
32 
33 ////////////////////////////////////////////////////////////////////////////////
34 /// Main constructor
35 
36 TProofMonSenderML::TProofMonSenderML(const char *serv, const char *tag,
37  const char *id, const char *subid,
38  const char *opt)
39  : TProofMonSender(serv, "ProofMonSenderML")
40 {
41  fWriter = 0;
42  // Init the sender instance using the plugin manager
43  TPluginHandler *h = 0;
44  if ((h = gROOT->GetPluginManager()->FindHandler("TVirtualMonitoringWriter", "MonaLisa"))) {
45  if (h->LoadPlugin() != -1) {
46  fWriter = (TVirtualMonitoringWriter *) h->ExecPlugin(5, serv, tag, id, subid, opt);
48  }
49  }
50  // Flag this instance as valid if the writer initialization succeeded
52 
53  // Set default send control options
57  fSummaryVrs = 1;
58  fDataSetInfoVrs = 1;
59  fFileInfoVrs = 1;
60 
61  // Transfer verbosity requirements
62  PDB(kMonitoring,1) if (fWriter) fWriter->Verbose(kTRUE);
63 }
64 
65 ////////////////////////////////////////////////////////////////////////////////
66 /// Destructor
67 
69 {
71 }
72 
73 ////////////////////////////////////////////////////////////////////////////////
74 /// Send summary record
75 ///
76 /// There are three versions of this record, corresponding the evolution
77 /// in time of the monitoring requirements.
78 ///
79 /// The default version 2 contains the following information
80 ///
81 /// user XRD_STRING
82 /// proofgroup XRD_STRING
83 /// begin XRD_STRING
84 /// end XRD_STRING
85 /// walltime XRD_REAL64
86 /// cputime XRD_REAL64
87 /// bytesread XRD_REAL64
88 /// events XRD_REAL64
89 /// totevents XRD_REAL64
90 /// workers XRD_REAL64
91 /// vmemmxw XRD_REAL64
92 /// rmemmxw XRD_REAL64
93 /// vmemmxm XRD_REAL64
94 /// rmemmxm XRD_REAL64
95 /// numfiles XRD_REAL64
96 /// missfiles XRD_REAL64
97 /// status XRD_REAL64
98 /// rootver XRD_STRING
99 ///
100 /// Version 1 contains the following information
101 /// (no 'status', 'missfiles', 'rootver'; 'dataset' field with name(s) of
102 /// processed dataset(s))
103 ///
104 /// user XRD_STRING
105 /// proofgroup XRD_STRING
106 /// begin XRD_STRING
107 /// end XRD_STRING
108 /// walltime XRD_REAL64
109 /// cputime XRD_REAL64
110 /// bytesread XRD_REAL64
111 /// events XRD_REAL64
112 /// totevents XRD_REAL64
113 /// workers XRD_REAL64
114 /// vmemmxw XRD_REAL64
115 /// rmemmxw XRD_REAL64
116 /// vmemmxm XRD_REAL64
117 /// rmemmxm XRD_REAL64
118 /// numfiles XRD_REAL64
119 /// dataset XRD_STRING
120 ///
121 /// Version 0 contains the following information
122 /// ('group' instead of 'proofgroup'; no 'vmemmxw',
123 /// 'rmemmxw', 'vmemmxm', 'rmemmxm', 'numfiles', 'dataset')
124 ///
125 /// user XRD_STRING
126 /// group XRD_STRING
127 /// begin XRD_STRING
128 /// end XRD_STRING
129 /// walltime XRD_REAL64
130 /// cputime XRD_REAL64
131 /// bytesread XRD_REAL64
132 /// events XRD_REAL64
133 /// totevents XRD_REAL64
134 /// workers XRD_REAL64
135 ///
136 /// Return 0 on success, -1 on any failure.
137 
139 {
140  if (!IsValid()) {
141  Error("SendSummary", "invalid instance: do nothing!");
142  return -1;
143  }
144 
145  // Are we requested to send this info?
146  if (!TestBit(TProofMonSender::kSendSummary)) return 0;
147 
148  // Make sure we have something to send
149  if (!recs || (recs && recs->GetSize() <= 0)) {
150  Error("SendSummary", "records list undefined or empty!");
151  return -1;
152  }
153  TList *xrecs = recs;
154 
155  PDB(kMonitoring,1) Info("SendSummary", "preparing (qid: '%s')", id);
156 
157  // Do not send duplicated information
158  TObject *qtag = recs->FindObject("querytag");
159  if (qtag) recs->Remove(qtag);
160 
161  TObject *dsn = 0;
162  // We may need to correct some variable names first
163  if (fSummaryVrs == 0) {
164  if ((dsn = recs->FindObject("dataset"))) recs->Remove(dsn);
165  } else if (fSummaryVrs == 0) {
166  // Only the first records
167  xrecs = new TList;
168  xrecs->SetOwner(kFALSE);
169  TIter nxr(recs);
170  TObject *o = 0;
171  while ((o = nxr())) {
172  if (!strcmp(o->GetName(), "vmemmxw")) break;
173  xrecs->Add(o);
174  }
175  }
176 
177  PDB(kMonitoring,1)
178  Info("SendSummary", "sending (%d entries)", xrecs->GetSize());
179 
180  // Now we are ready to send
181  Bool_t rc = fWriter->SendParameters(xrecs, id);
182 
183  // Restore the "dataset" entry in the list
184  if (fSummaryVrs > 1 && dsn && xrecs == recs) {
185  TObject *num = recs->FindObject("numfiles");
186  if (num)
187  recs->AddBefore(num, dsn);
188  else
189  recs->Add(dsn);
190  }
191  // Restore the "querytag" entry in the list
192  if (qtag) {
193  TObject *wrks = recs->FindObject("workers");
194  if (wrks)
195  recs->AddAfter(wrks, qtag);
196  else
197  recs->Add(qtag);
198  }
199  if (xrecs != recs) SafeDelete(xrecs);
200 
201  // Done
202  return (rc ? 0 : -1);
203 }
204 
205 ////////////////////////////////////////////////////////////////////////////////
206 /// Post information about the processed dataset(s). The information is taken
207 /// from the TDSet object 'dset' and integrated with the missing files
208 /// information in the list 'missing'. The string 'qid' is the uninque
209 /// ID of the query; 'begin' the starting time.
210 ///
211 /// The records sent by this call will appear with ids 'dataset_<dataset_name_hash>'
212 ///
213 /// There are two versions of this record, with or without the starting time.
214 /// The starting time could be looked up from the summary record, if available.
215 ///
216 /// The default version 1 contains the following information:
217 ///
218 /// dsn XRD_STRING
219 /// querytag XRD_STRING
220 /// querybegin XRD_STRING
221 /// numfiles XRD_REAL64
222 /// missfiles XRD_REAL64
223 ///
224 /// Version 0 contains the following information:
225 /// (no 'querybegin')
226 ///
227 /// dsn XRD_STRING
228 /// querytag XRD_STRING
229 /// numfiles XRD_REAL64
230 /// missfiles XRD_REAL64
231 ///
232 /// The information is posted with a bulk insert.
233 ///
234 /// Returns 0 on success, -1 on failure.
235 
237  const char *begin, const char *qid)
238 {
239  if (!IsValid()) {
240  Error("SendDataSetInfo", "invalid instance: do nothing!");
241  return -1;
242  }
243 
244  // Are we requested to send this info?
246 
247  // The query id (tag) must be given
248  if (!qid || (qid && strlen(qid) <= 0)) {
249  Error("SendDataSetInfo", "query id (tag) undefined!");
250  return -1;
251  }
252  // The dataset must be given
253  if (!dset) {
254  Error("SendDataSetInfo", "TDSet object undefined! (qid: '%s')", qid);
255  return -1;
256  }
257 
258  PDB(kMonitoring,1)
259  Info("SendDataSetInfo", "preparing (qid: '%s')", qid);
260 
261  TList plets;
262  // Extract the information and save it into the relevant multiplets
263  TString dss(dset->GetName()), ds;
264  Ssiz_t from = 0;
265  while ((dss.Tokenize(ds, from , "[,| ]"))) {
266  // Create a new TDSetPlet and add it to the list
267  plets.Add(new TDSetPlet(ds.Data()));
268  }
269  // Now try to count the files
270  TDSetPlet *plet = 0;
271  TIter nxpl(&plets);
272  TObject *o = 0;
273  TDSetElement *e = 0, *ee = 0;
274  TDSet *dsete = 0;
275  TIter nxe(dset->GetListOfElements());
276  TString dse;
277  while ((o = nxe())) {
278  if ((e = dynamic_cast<TDSetElement *>(o))) {
279  dse = e->GetDataSet();
280  if (!dse.IsNull()) {
281  nxpl.Reset();
282  while ((plet = (TDSetPlet *) nxpl())) {
283  if (dse == plet->GetName()) {
284  plet->fFiles += 1;
285  break;
286  }
287  }
288  }
289  } else if ((dsete = dynamic_cast<TDSet *>(o))) {
290  PDB(kMonitoring,1)
291  Info("SendDataSetInfo", "dset '%s' (%d files)",
292  o->GetName(), dsete->GetListOfElements()->GetSize());
293  TIter nxee(dsete->GetListOfElements());
294  while ((ee = (TDSetElement *) nxee())) {
295  dse = ee->GetDataSet();
296  if (!dse.IsNull()) {
297  nxpl.Reset();
298  while ((plet = (TDSetPlet *) nxpl())) {
299  if (dse == plet->GetName()) {
300  plet->fFiles += 1;
301  plet->fDSet = dsete;
302  break;
303  }
304  }
305  }
306  }
307  } else {
308  Warning("SendDataSetInfo", "ignoring unknown element type: '%s'", o->ClassName());
309  }
310  }
311 
312  // Now try to include the missing files info
313  if (missing) {
314  TFileInfo *fi = 0;
315  TIter nxm(missing);
316  TString dsfi, fn;
317  while ((fi = (TFileInfo *) nxm())) {
318  dsfi = fi->GetTitle();
319  if (!dsfi.IsNull() && dsfi != "TFileInfo") {
320  nxpl.Reset();
321  while ((plet = (TDSetPlet *) nxpl())) {
322  if (dsfi == plet->GetName()) {
323  fn = fi->GetCurrentUrl()->GetUrl();
324  if (plet->fDSet && plet->fDSet->GetListOfElements() &&
325  !(plet->fDSet->GetListOfElements()->FindObject(fn))) plet->fFiles += 1;
326  plet->fMissing += 1;
327  break;
328  }
329  }
330  }
331  }
332  }
333 
334  // Prepare objects to be sent
335  TList values;
336  TNamed *nm_dsn = new TNamed("dsn", "");
337  values.Add(nm_dsn);
338  TNamed *nm_querytag = new TNamed("querytag", qid);
339  values.Add(nm_querytag);
340  TNamed *nm_begin = 0;
341  if (fDataSetInfoVrs > 0) {
342  nm_begin = new TNamed("begin", begin);
343  values.Add(nm_begin);
344  }
345  TParameter<Int_t> *pi_numfiles = new TParameter<Int_t>("numfiles", -1);
346  values.Add(pi_numfiles);
347  TParameter<Int_t> *pi_missfiles = new TParameter<Int_t>("missfiles", -1);
348  values.Add(pi_missfiles);
349 
350  PDB(kMonitoring,1)
351  Info("SendDataSetInfo", "sending (%d entries)", plets.GetSize());
352 
353  Bool_t rc = kTRUE;
354  TString dsnh;
355  nxpl.Reset();
356  while ((plet = (TDSetPlet *) nxpl())) {
357  nm_dsn->SetTitle(plet->GetName());
358  pi_numfiles->SetVal(plet->fFiles);
359  pi_missfiles->SetVal(plet->fMissing);
360  dsnh.Form("dataset_%x", TString(plet->GetName()).Hash());
361  if (!(rc = fWriter->SendParameters(&values, dsnh.Data()))) break;
362  }
363 
364  // Done
365  return (rc ? 0 : -1);
366 }
367 
368 ////////////////////////////////////////////////////////////////////////////////
369 /// Post information about the requested files. The information is taken
370 /// from the TDSet object 'dset' and integrated with the missing files
371 /// information in the list 'missing'. The string 'qid' is the unique
372 /// ID of the query; 'begin' the starting time.
373 ///
374 /// The records sent by this call will appear with ids 'file_<file_name_hash>'
375 ///
376 /// There are two versions of this record, with or without the starting time.
377 /// The starting time could be looked up from the summary record, if available.
378 ///
379 /// The default version 1 contains the following information:
380 ///
381 /// lfn XRD_STRING
382 /// path XRD_STRING
383 /// querytag XRD_STRING
384 /// querybegin XRD_STRING
385 /// status XRD_REAL64
386 ///
387 /// Version 0 contains the following information:
388 /// (no 'querybegin')
389 ///
390 /// lfn XRD_STRING
391 /// path XRD_STRING
392 /// querytag XRD_STRING
393 /// status XRD_REAL64
394 ///
395 /// The information is posted with a bulk insert.
396 ///
397 /// Returns 0 on success, -1 on failure.
398 
400  const char *begin, const char *qid)
401 {
402  if (!IsValid()) {
403  Error("SendFileInfo", "invalid instance: do nothing!");
404  return -1;
405  }
406 
407  // Are we requested to send this info?
408  if (!TestBit(TProofMonSender::kSendFileInfo)) return 0;
409 
410  // The query id (tag) must be given
411  if (!qid || (qid && strlen(qid) <= 0)) {
412  Error("SendFileInfo", "query id (tag) undefined!");
413  return -1;
414  }
415  // The dataset must be given
416  if (!dset) {
417  Error("SendFileInfo", "TDSet object undefined! (qid: '%s')", qid);
418  return -1;
419  }
420 
421  PDB(kMonitoring,1) Info("SendFileInfo", "preparing (qid: '%s')", qid);
422 
423  THashList hmiss;
424  if (missing) {
425  TIter nxfm(missing);
426  TFileInfo *fi = 0;
427  while ((fi = (TFileInfo *)nxfm())) {
428  hmiss.Add(new TObjString(fi->GetCurrentUrl()->GetUrl()));
429  }
430  hmiss.Print();
431  }
432 
433  // Prepare objects to be sent
434  TList values;
435  TNamed *nm_lnf = new TNamed("lnf", "");
436  values.Add(nm_lnf);
437  TNamed *nm_path = new TNamed("path", "");
438  values.Add(nm_path);
439  TNamed *nm_querytag = new TNamed("querytag", qid);
440  values.Add(nm_querytag);
441  TNamed *nm_begin = 0;
442  if (fFileInfoVrs > 0) {
443  nm_begin = new TNamed("begin", begin);
444  values.Add(nm_begin);
445  }
446  TParameter<Int_t> *pi_status = new TParameter<Int_t>("status", -1);
447  values.Add(pi_status);
448 
449  PDB(kMonitoring,1)
450  Info("SendFileInfo", "sending (%d entries)",
451  dset->GetListOfElements()->GetSize());
452 
453  // Loop over files
454  Bool_t rc = kTRUE;
455  TObject *o = 0;
456  TDSetElement *e = 0, *ee = 0;
457  TDSet *dsete = 0;
458  TIter nxe(dset->GetListOfElements());
459  TString fne, fneh;
460  Int_t status = -1;
461  while ((o = nxe())) {
462  if ((e = dynamic_cast<TDSetElement *>(o))) {
463  fne = e->GetName();
464  // Try to determine the status
465  status = 1;
466  if (hmiss.FindObject(fne)) status = 0;
467  // Prepare the parameters list
468  nm_lnf->SetTitle(gSystem->BaseName(fne));
469  nm_path->SetTitle(gSystem->DirName(fne));
470  pi_status->SetVal(status);
471  fneh.Form("file_%x", TString(TUrl(fne.Data()).GetFile()).Hash());
472  if (!(rc = fWriter->SendParameters(&values, fneh.Data()))) break;
473  } else if ((dsete = dynamic_cast<TDSet *>(o))) {
474  PDB(kMonitoring,1)
475  Info("SendFileInfo", "dset '%s' (%d files)",
476  o->GetName(), dsete->GetListOfElements()->GetSize());
477  TIter nxee(dsete->GetListOfElements());
478  while ((ee = (TDSetElement *) nxee())) {
479  fne = ee->GetName();
480  // Try to determine the status
481  status = 1;
482  if (hmiss.FindObject(fne)) status = 0;
483  // Prepare the parameters list
484  nm_lnf->SetTitle(gSystem->BaseName(fne));
485  nm_path->SetTitle(gSystem->DirName(fne));
486  pi_status->SetVal(status);
487  fneh.Form("file_%x", TString(TUrl(fne.Data()).GetFile()).Hash());
488  if (!(rc = fWriter->SendParameters(&values, fneh.Data()))) break;
489  }
490  } else {
491  Warning("SendFileInfo", "ignoring unknown element type: '%s'", o->ClassName());
492  }
493  }
494 
495  // Done
496  return (rc ? 0 : -1);
497 }
virtual const char * BaseName(const char *pathname)
Base name of a file name. Base name of /user/root is root.
Definition: TSystem.cxx:932
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:854
virtual void Verbose(Bool_t)
Collectable string class.
Definition: TObjString.h:28
TVirtualMonitoringWriter * fWriter
Int_t SendFileInfo(TDSet *, TList *, const char *, const char *)
Post information about the requested files.
This class represents a WWW compatible URL.
Definition: TUrl.h:35
TUrl * GetCurrentUrl() const
Return the current url.
Definition: TFileInfo.cxx:248
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:153
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
Bool_t IsValid() const
#define gROOT
Definition: TROOT.h:410
R__ALWAYS_INLINE Bool_t TestBit(UInt_t f) const
Definition: TObject.h:172
void SetVal(const AParamType &val)
Definition: TParameter.h:71
Int_t LoadPlugin()
Load the plugin library for this handler.
Basic string class.
Definition: TString.h:131
Int_t SendDataSetInfo(TDSet *, TList *, const char *, const char *)
Post information about the processed dataset(s).
int Int_t
Definition: RtypesCore.h:41
virtual const char * DirName(const char *pathname)
Return the directory name in pathname.
Definition: TSystem.cxx:1004
bool Bool_t
Definition: RtypesCore.h:59
TObject * FindObject(const char *name) const
Find object using its name.
Definition: THashList.cxx:262
TList * GetListOfElements() const
Definition: TDSet.h:231
void Reset()
Definition: TCollection.h:252
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:694
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
Definition: TUrl.cxx:387
virtual TObject * FindObject(const char *name) const
Delete a TObjLink object.
Definition: TList.cxx:574
Manages an element of a TDSet.
Definition: TDSet.h:66
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Definition: TObject.cxx:128
#define PDB(mask, level)
Definition: TProofDebug.h:56
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
Definition: THashList.h:34
The TNamed class is the base class for all named ROOT classes.
Definition: TNamed.h:29
R__ALWAYS_INLINE Bool_t IsZombie() const
Definition: TObject.h:134
A doubly linked list.
Definition: TList.h:44
Named parameter, streamable and storable.
Definition: TParameter.h:37
R__EXTERN TSystem * gSystem
Definition: TSystem.h:540
if object ctor succeeded but object should not be used
Definition: TObject.h:68
Long_t ExecPlugin(int nargs, const T &... params)
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:818
Int_t SendSummary(TList *, const char *)
Send summary record.
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2264
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:880
virtual Bool_t SendParameters(TList *, const char *=0)
virtual void AddBefore(const TObject *before, TObject *obj)
Insert object before object before in the list.
Definition: TList.cxx:193
#define h(i)
Definition: RSha256.hxx:106
virtual ULong_t Hash() const
Return hash value for this object.
Definition: TNamed.h:49
const Bool_t kFALSE
Definition: RtypesCore.h:88
#define SafeDelete(p)
Definition: RConfig.h:529
int Ssiz_t
Definition: RtypesCore.h:63
TNamed()
Definition: TNamed.h:36
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
Definition: TRolke.cxx:630
Bool_t IsNull() const
Definition: TString.h:402
virtual void AddAfter(const TObject *after, TObject *obj)
Insert object after object after in the list.
Definition: TList.cxx:247
Mother of all ROOT objects.
Definition: TObject.h:37
const char * GetDataSet() const
Definition: TDSet.h:122
virtual void Add(TObject *obj)
Definition: TList.h:87
Provides the interface for PROOF monitoring to different writers.
TProofMonSenderML(const char *serv, const char *tag, const char *id=0, const char *subid=0, const char *opt="")
Main constructor.
virtual ~TProofMonSenderML()
Destructor.
Class describing a generic file including meta information.
Definition: TFileInfo.h:38
void ResetBit(UInt_t f)
Definition: TObject.h:171
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:357
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Definition: TCollection.h:182
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
Definition: TNamed.cxx:164
const Bool_t kTRUE
Definition: RtypesCore.h:87
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:866
virtual const char * GetTitle() const
Returns title of object.
Definition: TNamed.h:48
const char * Data() const
Definition: TString.h:364