Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TProofMonSenderML.cxx
Go to the documentation of this file.
1// @(#)root/proofplayer:$Id$
2// Author: G.Ganis July 2011
3
4/*************************************************************************
5 * Copyright (C) 1995-2004, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12
13/** \class TProofMonSenderML
14\ingroup proofkernel
15
16TProofMonSender implementation for the MonaLisa (ML) monitoring writer
17
18*/
19
20#include "TProofMonSenderML.h"
21
22#include "TDSet.h"
23#include "TFileInfo.h"
24#include "THashList.h"
25#include "TList.h"
26#include "TParameter.h"
27#include "TPluginManager.h"
28#include "TProofDebug.h"
29#include "TROOT.h"
30#include "TUrl.h"
31#include "TSystem.h"
32#include "TObjString.h"
33#include "TVirtualMonitoring.h"
34
35////////////////////////////////////////////////////////////////////////////////
36/// Main constructor
37
38TProofMonSenderML::TProofMonSenderML(const char *serv, const char *tag,
39 const char *id, const char *subid,
40 const char *opt)
41 : TProofMonSender(serv, "ProofMonSenderML")
42{
43 fWriter = 0;
44 // Init the sender instance using the plugin manager
45 TPluginHandler *h = 0;
46 if ((h = gROOT->GetPluginManager()->FindHandler("TVirtualMonitoringWriter", "MonaLisa"))) {
47 if (h->LoadPlugin() != -1) {
48 fWriter = (TVirtualMonitoringWriter *) h->ExecPlugin(5, serv, tag, id, subid, opt);
50 }
51 }
52 // Flag this instance as valid if the writer initialization succeeded
54
55 // Set default send control options
59 fSummaryVrs = 1;
61 fFileInfoVrs = 1;
62
63 // Transfer verbosity requirements
64 PDB(kMonitoring,1) if (fWriter) fWriter->Verbose(kTRUE);
65}
66
67////////////////////////////////////////////////////////////////////////////////
68/// Destructor
69
71{
73}
74
75////////////////////////////////////////////////////////////////////////////////
76/// Send summary record
77///
78/// There are three versions of this record, corresponding to the evolution
79/// in time of the monitoring requirements.
80///
81/// Version 2 contains the following information (NOTE: the constructor sets fSummaryVrs to 1, so version 1 is the default for this sender)
82///
83/// user XRD_STRING
84/// proofgroup XRD_STRING
85/// begin XRD_STRING
86/// end XRD_STRING
87/// walltime XRD_REAL64
88/// cputime XRD_REAL64
89/// bytesread XRD_REAL64
90/// events XRD_REAL64
91/// totevents XRD_REAL64
92/// workers XRD_REAL64
93/// vmemmxw XRD_REAL64
94/// rmemmxw XRD_REAL64
95/// vmemmxm XRD_REAL64
96/// rmemmxm XRD_REAL64
97/// numfiles XRD_REAL64
98/// missfiles XRD_REAL64
99/// status XRD_REAL64
100/// rootver XRD_STRING
101///
102/// Version 1 contains the following information
103/// (no 'status', 'missfiles', 'rootver'; 'dataset' field with name(s) of
104/// processed dataset(s))
105///
106/// user XRD_STRING
107/// proofgroup XRD_STRING
108/// begin XRD_STRING
109/// end XRD_STRING
110/// walltime XRD_REAL64
111/// cputime XRD_REAL64
112/// bytesread XRD_REAL64
113/// events XRD_REAL64
114/// totevents XRD_REAL64
115/// workers XRD_REAL64
116/// vmemmxw XRD_REAL64
117/// rmemmxw XRD_REAL64
118/// vmemmxm XRD_REAL64
119/// rmemmxm XRD_REAL64
120/// numfiles XRD_REAL64
121/// dataset XRD_STRING
122///
123/// Version 0 contains the following information
124/// ('group' instead of 'proofgroup'; no 'vmemmxw',
125/// 'rmemmxw', 'vmemmxm', 'rmemmxm', 'numfiles', 'dataset')
126///
127/// user XRD_STRING
128/// group XRD_STRING
129/// begin XRD_STRING
130/// end XRD_STRING
131/// walltime XRD_REAL64
132/// cputime XRD_REAL64
133/// bytesread XRD_REAL64
134/// events XRD_REAL64
135/// totevents XRD_REAL64
136/// workers XRD_REAL64
137///
138/// Return 0 on success, -1 on any failure.
139
141{
142 if (!IsValid()) {
143 Error("SendSummary", "invalid instance: do nothing!");
144 return -1;
145 }
146
147 // Are we requested to send this info?
149
150 // Make sure we have something to send
151 if (!recs || (recs && recs->GetSize() <= 0)) {
152 Error("SendSummary", "records list undefined or empty!");
153 return -1;
154 }
155 TList *xrecs = recs;
156
157 PDB(kMonitoring,1) Info("SendSummary", "preparing (qid: '%s')", id);
158
159 // Do not send duplicated information
160 TObject *qtag = recs->FindObject("querytag");
161 if (qtag) recs->Remove(qtag);
162
163 TObject *dsn = 0;
164 // We may need to correct some variable names first
165 if (fSummaryVrs > 1) {
166 if ((dsn = recs->FindObject("dataset"))) recs->Remove(dsn);
167 } else if (fSummaryVrs == 0) {
168 // Only the first records
169 xrecs = new TList;
170 xrecs->SetOwner(kFALSE);
171 TIter nxr(recs);
172 TObject *o = 0;
173 while ((o = nxr())) {
174 if (!strcmp(o->GetName(), "vmemmxw")) break;
175 xrecs->Add(o);
176 }
177 }
178
179 PDB(kMonitoring,1)
180 Info("SendSummary", "sending (%d entries)", xrecs->GetSize());
181
182 // Now we are ready to send
183 Bool_t rc = fWriter->SendParameters(xrecs, id);
184
185 // Restore the "dataset" entry in the list
186 if (fSummaryVrs > 1 && dsn && xrecs == recs) {
187 TObject *num = recs->FindObject("numfiles");
188 if (num)
189 recs->AddBefore(num, dsn);
190 else
191 recs->Add(dsn);
192 }
193 // Restore the "querytag" entry in the list
194 if (qtag) {
195 TObject *wrks = recs->FindObject("workers");
196 if (wrks)
197 recs->AddAfter(wrks, qtag);
198 else
199 recs->Add(qtag);
200 }
201 if (xrecs != recs) SafeDelete(xrecs);
202
203 // Done
204 return (rc ? 0 : -1);
205}
206
207////////////////////////////////////////////////////////////////////////////////
208/// Post information about the processed dataset(s). The information is taken
209/// from the TDSet object 'dset' and integrated with the missing files
210/// information in the list 'missing'. The string 'qid' is the unique
211/// ID of the query; 'begin' the starting time.
212///
213/// The records sent by this call will appear with ids 'dataset_<dataset_name_hash>'
214///
215/// There are two versions of this record, with or without the starting time.
216/// The starting time could be looked up from the summary record, if available.
217///
218/// The default version 1 contains the following information:
219///
220/// dsn XRD_STRING
221/// querytag XRD_STRING
222/// querybegin XRD_STRING (NOTE: the implementation below adds this field under the name 'begin')
223/// numfiles XRD_REAL64
224/// missfiles XRD_REAL64
225///
226/// Version 0 contains the following information:
227/// (no 'querybegin')
228///
229/// dsn XRD_STRING
230/// querytag XRD_STRING
231/// numfiles XRD_REAL64
232/// missfiles XRD_REAL64
233///
234/// The information is posted with a bulk insert.
235///
236/// Returns 0 on success, -1 on failure.
237
239 const char *begin, const char *qid)
240{
241 if (!IsValid()) {
242 Error("SendDataSetInfo", "invalid instance: do nothing!");
243 return -1;
244 }
245
246 // Are we requested to send this info?
248
249 // The query id (tag) must be given
250 if (!qid || (qid && strlen(qid) <= 0)) {
251 Error("SendDataSetInfo", "query id (tag) undefined!");
252 return -1;
253 }
254 // The dataset must be given
255 if (!dset) {
256 Error("SendDataSetInfo", "TDSet object undefined! (qid: '%s')", qid);
257 return -1;
258 }
259
260 PDB(kMonitoring,1)
261 Info("SendDataSetInfo", "preparing (qid: '%s')", qid);
262
263 TList plets;
264 // Extract the information and save it into the relevant multiplets
265 TString dss(dset->GetName()), ds;
266 Ssiz_t from = 0;
267 while ((dss.Tokenize(ds, from , "[,| ]"))) {
268 // Create a new TDSetPlet and add it to the list
269 plets.Add(new TDSetPlet(ds.Data()));
270 }
271 // Now try to count the files
272 TDSetPlet *plet = 0;
273 TIter nxpl(&plets);
274 TObject *o = 0;
275 TDSetElement *e = 0, *ee = 0;
276 TDSet *dsete = 0;
277 TIter nxe(dset->GetListOfElements());
278 TString dse;
279 while ((o = nxe())) {
280 if ((e = dynamic_cast<TDSetElement *>(o))) {
281 dse = e->GetDataSet();
282 if (!dse.IsNull()) {
283 nxpl.Reset();
284 while ((plet = (TDSetPlet *) nxpl())) {
285 if (dse == plet->GetName()) {
286 plet->fFiles += 1;
287 break;
288 }
289 }
290 }
291 } else if ((dsete = dynamic_cast<TDSet *>(o))) {
292 PDB(kMonitoring,1)
293 Info("SendDataSetInfo", "dset '%s' (%d files)",
294 o->GetName(), dsete->GetListOfElements()->GetSize());
295 TIter nxee(dsete->GetListOfElements());
296 while ((ee = (TDSetElement *) nxee())) {
297 dse = ee->GetDataSet();
298 if (!dse.IsNull()) {
299 nxpl.Reset();
300 while ((plet = (TDSetPlet *) nxpl())) {
301 if (dse == plet->GetName()) {
302 plet->fFiles += 1;
303 plet->fDSet = dsete;
304 break;
305 }
306 }
307 }
308 }
309 } else {
310 Warning("SendDataSetInfo", "ignoring unknown element type: '%s'", o->ClassName());
311 }
312 }
313
314 // Now try to include the missing files info
315 if (missing) {
316 TFileInfo *fi = 0;
317 TIter nxm(missing);
318 TString dsfi, fn;
319 while ((fi = (TFileInfo *) nxm())) {
320 dsfi = fi->GetTitle();
321 if (!dsfi.IsNull() && dsfi != "TFileInfo") {
322 nxpl.Reset();
323 while ((plet = (TDSetPlet *) nxpl())) {
324 if (dsfi == plet->GetName()) {
325 fn = fi->GetCurrentUrl()->GetUrl();
326 if (plet->fDSet && plet->fDSet->GetListOfElements() &&
327 !(plet->fDSet->GetListOfElements()->FindObject(fn))) plet->fFiles += 1;
328 plet->fMissing += 1;
329 break;
330 }
331 }
332 }
333 }
334 }
335
336 // Prepare objects to be sent
337 TList values;
338 TNamed *nm_dsn = new TNamed("dsn", "");
339 values.Add(nm_dsn);
340 TNamed *nm_querytag = new TNamed("querytag", qid);
341 values.Add(nm_querytag);
342 TNamed *nm_begin = 0;
343 if (fDataSetInfoVrs > 0) {
344 nm_begin = new TNamed("begin", begin);
345 values.Add(nm_begin);
346 }
347 TParameter<Int_t> *pi_numfiles = new TParameter<Int_t>("numfiles", -1);
348 values.Add(pi_numfiles);
349 TParameter<Int_t> *pi_missfiles = new TParameter<Int_t>("missfiles", -1);
350 values.Add(pi_missfiles);
351
352 PDB(kMonitoring,1)
353 Info("SendDataSetInfo", "sending (%d entries)", plets.GetSize());
354
355 Bool_t rc = kTRUE;
356 TString dsnh;
357 nxpl.Reset();
358 while ((plet = (TDSetPlet *) nxpl())) {
359 nm_dsn->SetTitle(plet->GetName());
360 pi_numfiles->SetVal(plet->fFiles);
361 pi_missfiles->SetVal(plet->fMissing);
362 dsnh.Form("dataset_%x", TString(plet->GetName()).Hash());
363 if (!(rc = fWriter->SendParameters(&values, dsnh.Data()))) break;
364 }
365
366 // Done
367 return (rc ? 0 : -1);
368}
369
370////////////////////////////////////////////////////////////////////////////////
371/// Post information about the requested files. The information is taken
372/// from the TDSet object 'dset' and integrated with the missing files
373/// information in the list 'missing'. The string 'qid' is the unique
374/// ID of the query; 'begin' the starting time.
375///
376/// The records sent by this call will appear with ids 'file_<file_name_hash>'
377///
378/// There are two versions of this record, with or without the starting time.
379/// The starting time could be looked up from the summary record, if available.
380///
381/// The default version 1 contains the following information:
382///
383/// lfn XRD_STRING (NOTE: the implementation below adds this field under the name 'lnf')
384/// path XRD_STRING
385/// querytag XRD_STRING
386/// querybegin XRD_STRING (NOTE: the implementation below adds this field under the name 'begin')
387/// status XRD_REAL64
388///
389/// Version 0 contains the following information:
390/// (no 'querybegin')
391///
392/// lfn XRD_STRING
393/// path XRD_STRING
394/// querytag XRD_STRING
395/// status XRD_REAL64
396///
397/// The information is posted with a bulk insert.
398///
399/// Returns 0 on success, -1 on failure.
400
402 const char *begin, const char *qid)
403{
404 if (!IsValid()) {
405 Error("SendFileInfo", "invalid instance: do nothing!");
406 return -1;
407 }
408
409 // Are we requested to send this info?
411
412 // The query id (tag) must be given
413 if (!qid || (qid && strlen(qid) <= 0)) {
414 Error("SendFileInfo", "query id (tag) undefined!");
415 return -1;
416 }
417 // The dataset must be given
418 if (!dset) {
419 Error("SendFileInfo", "TDSet object undefined! (qid: '%s')", qid);
420 return -1;
421 }
422
423 PDB(kMonitoring,1) Info("SendFileInfo", "preparing (qid: '%s')", qid);
424
425 THashList hmiss;
426 if (missing) {
427 TIter nxfm(missing);
428 TFileInfo *fi = 0;
429 while ((fi = (TFileInfo *)nxfm())) {
430 hmiss.Add(new TObjString(fi->GetCurrentUrl()->GetUrl()));
431 }
432 hmiss.Print();
433 }
434
435 // Prepare objects to be sent
436 TList values;
437 TNamed *nm_lnf = new TNamed("lnf", "");
438 values.Add(nm_lnf);
439 TNamed *nm_path = new TNamed("path", "");
440 values.Add(nm_path);
441 TNamed *nm_querytag = new TNamed("querytag", qid);
442 values.Add(nm_querytag);
443 TNamed *nm_begin = 0;
444 if (fFileInfoVrs > 0) {
445 nm_begin = new TNamed("begin", begin);
446 values.Add(nm_begin);
447 }
448 TParameter<Int_t> *pi_status = new TParameter<Int_t>("status", -1);
449 values.Add(pi_status);
450
451 PDB(kMonitoring,1)
452 Info("SendFileInfo", "sending (%d entries)",
453 dset->GetListOfElements()->GetSize());
454
455 // Loop over files
456 Bool_t rc = kTRUE;
457 TObject *o = 0;
458 TDSetElement *e = 0, *ee = 0;
459 TDSet *dsete = 0;
460 TIter nxe(dset->GetListOfElements());
461 TString fne, fneh;
462 Int_t status = -1;
463 while ((o = nxe())) {
464 if ((e = dynamic_cast<TDSetElement *>(o))) {
465 fne = e->GetName();
466 // Try to determine the status
467 status = 1;
468 if (hmiss.FindObject(fne)) status = 0;
469 // Prepare the parameters list
470 nm_lnf->SetTitle(gSystem->BaseName(fne));
471 nm_path->SetTitle(gSystem->GetDirName(fne));
472 pi_status->SetVal(status);
473 fneh.Form("file_%x", TString(TUrl(fne.Data()).GetFile()).Hash());
474 if (!(rc = fWriter->SendParameters(&values, fneh.Data()))) break;
475 } else if ((dsete = dynamic_cast<TDSet *>(o))) {
476 PDB(kMonitoring,1)
477 Info("SendFileInfo", "dset '%s' (%d files)",
478 o->GetName(), dsete->GetListOfElements()->GetSize());
479 TIter nxee(dsete->GetListOfElements());
480 while ((ee = (TDSetElement *) nxee())) {
481 fne = ee->GetName();
482 // Try to determine the status
483 status = 1;
484 if (hmiss.FindObject(fne)) status = 0;
485 // Prepare the parameters list
486 nm_lnf->SetTitle(gSystem->BaseName(fne));
487 nm_path->SetTitle(gSystem->GetDirName(fne));
488 pi_status->SetVal(status);
489 fneh.Form("file_%x", TString(TUrl(fne.Data()).GetFile()).Hash());
490 if (!(rc = fWriter->SendParameters(&values, fneh.Data()))) break;
491 }
492 } else {
493 Warning("SendFileInfo", "ignoring unknown element type: '%s'", o->ClassName());
494 }
495 }
496
497 // Done
498 return (rc ? 0 : -1);
499}
#define SafeDelete(p)
Definition RConfig.hxx:547
#define h(i)
Definition RSha256.hxx:106
#define e(i)
Definition RSha256.hxx:103
const Bool_t kFALSE
Definition RtypesCore.h:92
const Bool_t kTRUE
Definition RtypesCore.h:91
#define PDB(mask, level)
Definition TProofDebug.h:56
#define gROOT
Definition TROOT.h:406
R__EXTERN TSystem * gSystem
Definition TSystem.h:559
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Manages an element of a TDSet.
Definition TDSet.h:66
This class implements a data set to be used for PROOF processing.
Definition TDSet.h:153
TList * GetListOfElements() const
Definition TDSet.h:231
Class describing a generic file including meta information.
Definition TFileInfo.h:39
TUrl * GetCurrentUrl() const
Return the current url.
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
Definition THashList.h:34
TObject * FindObject(const char *name) const
Find object using its name.
void Reset()
A doubly linked list.
Definition TList.h:44
virtual void Add(TObject *obj)
Definition TList.h:87
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition TList.cxx:822
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition TList.cxx:578
virtual void AddAfter(const TObject *after, TObject *obj)
Insert object after object after in the list.
Definition TList.cxx:250
virtual void AddBefore(const TObject *before, TObject *obj)
Insert object before object before in the list.
Definition TList.cxx:196
The TNamed class is the base class for all named ROOT classes.
Definition TNamed.h:29
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
Definition TNamed.cxx:164
TNamed()
Definition TNamed.h:36
virtual const char * GetTitle() const
Returns title of object.
Definition TNamed.h:48
virtual const char * GetName() const
Returns name of object.
Definition TNamed.h:47
Collectable string class.
Definition TObjString.h:28
Mother of all ROOT objects.
Definition TObject.h:37
virtual const char * GetName() const
Returns name of object.
Definition TObject.cxx:359
R__ALWAYS_INLINE Bool_t TestBit(UInt_t f) const
Definition TObject.h:187
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Definition TObject.cxx:130
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition TObject.cxx:879
R__ALWAYS_INLINE Bool_t IsZombie() const
Definition TObject.h:149
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition TObject.cxx:696
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:893
void ResetBit(UInt_t f)
Definition TObject.h:186
@ kInvalidObject
if object ctor succeeded but object should not be used
Definition TObject.h:68
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition TObject.cxx:867
Named parameter, streamable and storable.
Definition TParameter.h:35
void SetVal(const AParamType &val)
Definition TParameter.h:69
Int_t SendFileInfo(TDSet *, TList *, const char *, const char *)
Post information about the requested files.
TProofMonSenderML(const char *serv, const char *tag, const char *id=0, const char *subid=0, const char *opt="")
Main constructor.
virtual ~TProofMonSenderML()
Destructor.
Int_t SendSummary(TList *, const char *)
Send summary record.
TVirtualMonitoringWriter * fWriter
Int_t SendDataSetInfo(TDSet *, TList *, const char *, const char *)
Post information about the processed dataset(s).
Provides the interface for PROOF monitoring to different writers.
Bool_t IsValid() const
Basic string class.
Definition TString.h:136
const char * Data() const
Definition TString.h:369
Bool_t IsNull() const
Definition TString.h:407
UInt_t Hash(ECaseCompare cmp=kExact) const
Return hash value.
Definition TString.cxx:658
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition TString.cxx:2309
virtual const char * BaseName(const char *pathname)
Base name of a file name. Base name of /user/root is root.
Definition TSystem.cxx:933
virtual TString GetDirName(const char *pathname)
Return the directory name in pathname.
Definition TSystem.cxx:1030
This class represents a WWW compatible URL.
Definition TUrl.h:33
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
Definition TUrl.cxx:389
const char * GetFile() const
Definition TUrl.h:69
virtual Bool_t SendParameters(TList *, const char *=0)
virtual void Verbose(Bool_t)