63 class MonitoredTFileInfo:
public TObject {
69 fFileClassName =
"TXNetFile";
74 fLastBytesWritten = 0;
77 fTempWrittenBytes = 0;
79 fLastResetTime = timenow;
80 fCreationTime = timenow;
101 Double_t t = std::min(prectime, fLastResetTime);
104 mselapsed = std::max(mselapsed, 1);
106 readthr = fTempReadBytes / mselapsed * 1000;
107 writethr = fTempWrittenBytes / mselapsed * 1000;
110 void UpdateFileStatus(
TFile *file) {
120 fTempWrittenBytes = 0;
121 fLastResetTime = timenow;
124 void ResetFileStatus(
Double_t timenow) {
126 ResetFileStatus(fileinst, timenow);
135 class MonitoredTFileSummary:
public TNamed {
137 MonitoredTFileSummary(
TString &fileclassname):
TNamed(fileclassname, fileclassname) {
141 fWriteThroughput = 0;
151 mi->GetThroughputs(rth, wth, timenow, prectime);
153 fBytesRead += mi->fTempReadBytes;
154 fBytesWritten += mi->fTempWrittenBytes;
156 if (rth > 0) fReadThroughput += rth;
157 if (wth > 0) fWriteThroughput += wth;
168 const char *monid,
const char *monsubid,
171 fMonInfoRepo =
new std::map<UInt_t, MonitoredTFileInfo *>;
173 Init(monserver, montag, monid, monsubid, option);
270 const char *monsubid,
const char *option)
277 fFileStopwatch.Start(
kTRUE);
278 fLastRWSendTime = fFileStopwatch.RealTime();
279 fLastFCloseSendTime = fFileStopwatch.RealTime();
280 fLastProgressTime = time(0);
281 fFileStopwatch.Continue();
283 fReportInterval = 120;
286 if (fReportInterval < 1)
288 Info(
"TMonaLisaWriter",
"Setting APMON Report Interval to %d seconds",fReportInterval);
291 char *apmon_config[1] =
292 { ((monserver == 0) ? (
char *)
gSystem->
Getenv(
"APMON_CONFIG") : (
char *) monserver) };
293 if (apmon_config[0] == 0) {
294 Error(
"TMonaLisaWriter",
295 "Disabling apmon monitoring since env variable APMON_CONFIG was not found and the monitoring server is not specified in the constructor!");
301 fApmon =
new ApMon(1, apmon_config);
302 fApmon->setConfRecheck(
false);
303 fApmon->setJobMonitoring(
false);
306 }
catch (runtime_error &
e) {
307 Error(
"TMonaLisaWriter",
"Error initializing ApMon: %s", e.what());
308 Error(
"TMonaLisaWriter",
"Disabling apmon.");
325 clustername +=
TString(
"none");
328 SetTitle(clustername);
331 SetTitle(clustername+
TString(montag));
349 fJobId =
"-no-job-id";
363 fSubJobId = monsubid;
368 Info(
"Initialized for ML Server <%s> - Setting ClusterID <%s> JobID <%s> SubID <%s>\n",
369 apmon_config[0], fName.Data() ,fJobId.Data(),fSubJobId.Data());
371 fInitialized =
kTRUE;
385 std::map<UInt_t, MonitoredTFileInfo *>::iterator iter = fMonInfoRepo->begin();
386 while (iter != fMonInfoRepo->end()) {
391 fMonInfoRepo->clear();
409 Error(
"SendInfoStatus",
"Monitoring is not properly initialized!");
420 valuelist->
Add(valtext);
423 success = SendParameters(valuelist);
436 Error(
"TMonaLisaWriter",
437 "Monitoring initialization has failed - you can't send to MonaLisa!");
446 const char *localuser;
453 localuser =
"unknown";
459 valuelist->
Add(valtext);
462 success = SendParameters(valuelist);
475 Error(
"SendInfoDescription",
476 "Monitoring is not properly initialized!");
487 valuelist->
Add(valtext);
490 success = SendParameters(valuelist);
503 Error(
"SendInfoTime",
"Monitoring is not properly initialized!");
516 valuelist->
Add(valtext);
519 success = SendParameters(valuelist);
536 fStopwatch.Start(
kTRUE);
540 Error(
"TMonaLisaWriter",
541 "Monitoring initialization has failed - you can't send to MonaLisa!");
552 valuelist->
Add(valtext);
555 valuelist->
Add(valhost);
558 valuelist->
Add(valsid);
561 success = SendParameters(valuelist);
572 if (!force && (time(0)-fLastProgressTime) < fReportInterval) {
578 Error(
"SendProcessingProgress",
579 "Monitoring is not properly initialized!");
605 valuelist->
Add(valsid);
606 valuelist->
Add(valevent);
607 valuelist->
Add(valbyte);
608 valuelist->
Add(valrealtime);
609 valuelist->
Add(valcputime);
610 valuelist->
Add(valtotmem);
611 valuelist->
Add(valrssmem);
612 valuelist->
Add(valshdmem);
619 strcpu += fStopwatch.CpuTime();
621 strreal += fStopwatch.RealTime();
629 fStopwatch.Continue();
638 valuelist->
Add(textevent);
639 valuelist->
Add(textbyte);
640 valuelist->
Add(textcpu);
641 valuelist->
Add(textreal);
642 valuelist->
Add(texttotmem);
643 valuelist->
Add(textrssmem);
644 valuelist->
Add(textshdmem);
647 valuelist->
Add(valhost);
650 success = SendParameters(valuelist);
651 fLastProgressTime = time(0);
667 const char *openphasename,
671 Error(
"SendFileOpenProgress",
672 "Monitoring is not properly initialized!");
677 if (!fTmpOpenPhases && !openphases) {
678 fTmpOpenPhases =
new TList;
682 if (!openphasename) {
684 fTmpOpenPhases->Clear();
690 fFileStopwatch.Continue();
693 fTmpOpenPhases->Add(nfo);
696 TIter nxt(fTmpOpenPhases);
701 openphases->
Add(nfo);
703 if (fTmpOpenPhases) {
704 fTmpOpenPhases->SetOwner(0);
705 fTmpOpenPhases->Clear();
710 if (!forcesend)
return kTRUE;
711 if (!file)
return kTRUE;
713 TList *op = openphases ? openphases : fTmpOpenPhases;
724 valuelist->
Add(valhost);
726 valuelist->
Add(valsid);
728 valuelist->
Add(valdest);
731 valuelist->
Add(valfid);
734 valuelist->
Add(valstrfid);
754 valuelist->
Add(valtottime);
757 success = SendParameters(valuelist);
766 Error(
"SendFileCloseEvent",
767 "Monitoring is not properly initialized!");
772 Double_t timenow = fFileStopwatch.RealTime();
773 fFileStopwatch.Continue();
775 MonitoredTFileInfo *mi = 0;
776 std::map<UInt_t, MonitoredTFileInfo *>::iterator iter = fMonInfoRepo->find(file->
GetUniqueID());
777 if (iter != fMonInfoRepo->end()) mi = iter->second;
780 if (mi) timelapsed = timenow - mi->fCreationTime;
795 valuelist->
Add(valdest);
797 valuelist->
Add(valfid);
801 valuelist->
Add(valstrfid);
804 valname +=
"readbytes";
806 valuelist->
Add(valread);
814 valname +=
"writtenbytes";
816 valuelist->
Add(valwrite);
824 if (timelapsed > 0.001) {
829 valname +=
"filethrpt_rd";
831 valuelist->
Add(valreadthavg);
835 valname +=
"filethrpt_wr";
837 valuelist->
Add(valwritethavg);
842 mi->UpdateFileStatus(file);
847 success = SendParameters(valuelist);
856 return SendFileCheckpoint(file);
861 return SendFileCheckpoint(file);
868 Error(
"SendFileCheckpoint",
869 "Monitoring is not properly initialized!");
883 Double_t timenow = fFileStopwatch.RealTime();
884 fFileStopwatch.Continue();
890 MonitoredTFileInfo *mi = 0;
891 std::map<UInt_t, MonitoredTFileInfo *>::iterator iter = fMonInfoRepo->find(file->
GetUniqueID());
892 if (iter != fMonInfoRepo->end()) mi = iter->second;
895 mi =
new MonitoredTFileInfo(file, timenow);
896 if (mi) fMonInfoRepo->insert( make_pair( file->
GetUniqueID(), mi ) );
900 if (mi) mi->UpdateFileStatus(file);
903 if ( timenow - fLastRWSendTime < fReportInterval) {
919 valuelist->
Add(valhost);
921 valuelist->
Add(valsid);
927 iter = fMonInfoRepo->begin();
928 if (iter != fMonInfoRepo->end()) mi = iter->second;
932 MonitoredTFileSummary *
sum =
static_cast<MonitoredTFileSummary *
>(summary.
FindObject(mi->fFileClassName));
934 sum =
new MonitoredTFileSummary(mi->fFileClassName);
939 sum->Update(mi, timenow, fLastRWSendTime);
940 mi->ResetFileStatus(timenow);
949 if (iter != fMonInfoRepo->end())
955 for (iter = fMonInfoRepo->begin(); iter != fMonInfoRepo->end(); iter++)
956 if (!iter->second) fMonInfoRepo->erase(iter);
961 TIter nxt2(&summary);
962 MonitoredTFileSummary *
sum;
963 while ((sum = (MonitoredTFileSummary *)nxt2())) {
965 if (sum->fReadThroughput >= 0) {
966 valname = sum->GetName();
967 valname +=
"_avgthrpt_rd";
969 valuelist->
Add(valreadthr);
972 if ( sum->fWriteThroughput >= 0 ) {
973 valname = sum->GetName();
974 valname +=
"_avgthrpt_wr";
976 valuelist->
Add(valwritethr);
983 success = SendParameters(valuelist);
985 fLastRWSendTime = timenow;
997 Error(
"SendParameters",
"Monitoring is not properly initialized!");
1002 Error(
"SendParameters",
"No values in the value list!");
1006 if (identifier == 0)
1007 identifier = fJobId;
1009 TIter nextvalue(valuelist);
1016 char **apmon_params = 0;
1017 Int_t *apmon_types = 0;
1018 char **apmon_values = 0;
1021 if (apmon_nparams) {
1023 apmon_params = (
char **)
malloc(apmon_nparams *
sizeof(
char *));
1024 apmon_values = (
char **)
malloc(apmon_nparams *
sizeof(
char *));
1025 apmon_types = (
int *)
malloc(apmon_nparams *
sizeof(
int));
1026 bufDouble =
new Double_t[apmon_nparams];
1029 while ((monobj = nextvalue())) {
1030 if (!strcmp(monobj->
ClassName(),
"TMonaLisaValue")) {
1034 Info(
"SendParameters",
"adding tag %s with val %f",
1037 apmon_params[looper] = (
char *) objval->
GetName();
1038 apmon_types[looper] = XDR_REAL64;
1039 apmon_values[looper] = (
char *) (objval->
GetValuePtr());
1042 if (!strcmp(monobj->
ClassName(),
"TMonaLisaText")) {
1046 Info(
"SendParameters",
"adding tag %s with text %s",
1049 apmon_params[looper] = (
char *) objtext->
GetName();
1050 apmon_types[looper] = XDR_STRING;
1051 apmon_values[looper] = (
char *) (objtext->
GetText());
1054 if (!strcmp(monobj->
ClassName(),
"TNamed")) {
1058 Info(
"SendParameters",
"adding tag %s with text %s",
1061 apmon_params[looper] = (
char *) objNamed->
GetName();
1062 apmon_types[looper] = XDR_STRING;
1063 apmon_values[looper] = (
char *) (objNamed->
GetTitle());
1067 if (!strcmp(monobj->
ClassName(),
"TParameter<double>")) {
1071 Info(
"SendParameters",
"adding tag %s with val %f",
1074 apmon_params[looper] = (
char *) objParam->
GetName();
1075 apmon_types[looper] = XDR_REAL64;
1076 apmon_values[looper] = (
char *) &(objParam->
GetVal());
1079 if (!strcmp(monobj->
ClassName(),
"TParameter<Long64_t>")) {
1083 Info(
"SendParameters",
"adding tag %s with val %lld",
1086 apmon_params[looper] = (
char *) objParam->
GetName();
1087 apmon_types[looper] = XDR_REAL64;
1088 bufDouble[looper] = objParam->
GetVal();
1089 apmon_values[looper] = (
char *) (bufDouble + looper);
1092 if (!strcmp(monobj->
ClassName(),
"TParameter<long>")) {
1096 Info(
"SendParameters",
"adding tag %s with val %ld",
1099 apmon_params[looper] = (
char *) objParam->
GetName();
1100 apmon_types[looper] = XDR_REAL64;
1101 bufDouble[looper] = objParam->
GetVal();
1102 apmon_values[looper] = (
char *) (bufDouble + looper);
1105 if (!strcmp(monobj->
ClassName(),
"TParameter<float>")) {
1109 Info(
"SendParameters",
"adding tag %s with val %f",
1112 apmon_params[looper] = (
char *) objParam->
GetName();
1113 apmon_types[looper] = XDR_REAL64;
1114 bufDouble[looper] = objParam->
GetVal();
1115 apmon_values[looper] = (
char *) (bufDouble + looper);
1118 if (!strcmp(monobj->
ClassName(),
"TParameter<int>")) {
1122 Info(
"SendParameters",
"adding tag %s with val %d",
1125 apmon_params[looper] = (
char *) objParam->
GetName();
1126 apmon_types[looper] = XDR_REAL64;
1127 bufDouble[looper] = objParam->
GetVal();
1128 apmon_values[looper] = (
char *) (bufDouble + looper);
1134 apmon_nparams = looper;
1137 Info(
"SendParameters",
"n: %d name: %s identifier %s ...,",
1138 apmon_nparams,
GetName(), identifier);
1140 ((ApMon *) fApmon)->sendParameters((
char *)
GetName(), (
char*)identifier,
1141 apmon_nparams, apmon_params,
1142 apmon_types, apmon_values);
1157 ((ApMon *) fApmon)->setLogLevel((
char *) loglevel);
1165 std::cout <<
"Site (Farm) : " << fName << std::endl;
1166 std::cout <<
"JobId (Node) : " << fJobId << std::endl;
1167 std::cout <<
"SubJobId (Node) : " << fSubJobId << std::endl;
1168 std::cout <<
"HostName : " << fHostname << std::endl;
1169 std::cout <<
"Pid : " << fPid << std::endl;
1170 std::cout <<
"Inititialized : " << fInitialized << std::endl;
1171 std::cout <<
"Verbose : " << fVerbose << std::endl;
virtual Bool_t SendInfoStatus(const char *status)
Sends a <status> text to MonaLisa following the process scheme: <site> –> <jobid> –> 'status' = <st...
virtual const char * GetName() const
Returns name of object.
std::string GetName(const std::string &scope_name)
virtual UInt_t GetUniqueID() const
Return the unique object id.
virtual Bool_t SendFileWriteProgress(TFile *file)
static long int sum(long int i)
virtual int GetPid()
Get process id.
virtual Bool_t SendFileCloseEvent(TFile *file)
void AddLast(TObject *obj)
Add object at the end of the list.
Bool_t TestBit(UInt_t f) const
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
void Init(const char *monserver, const char *montag, const char *monid, const char *monsubid, const char *option)
Creates a TMonaLisaWriter object to send monitoring information to a MonaLisa server using the MonaLi...
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
const char * GetText() const
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
const char * GetName() const
Returns name of object.
virtual Long64_t GetBytesWritten() const
Return the total number of bytes written so far to the file.
virtual Bool_t SendProcessingProgress(Double_t nevent, Double_t nbytes, Bool_t force=kFALSE)
Send the procesing progress to MonaLisa.
TObject * FindObject(const char *name) const
Find object using its name.
TMonaLisaWriter(const TMonaLisaWriter &)
virtual void SetLogLevel(const char *loglevel="WARNING")
Set MonaLisa log level.
virtual Bool_t SendFileOpenProgress(TFile *file, TList *openphases, const char *openphasename, Bool_t forcesend=kFALSE)
Send the fileopen progress to MonaLisa.
const char * GetHost() const
virtual const char * ClassName() const
Returns name of class to which the object belongs.
while((line_o=(*lineIt)()))
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
virtual Bool_t SendInfoTime()
Sends the current time to MonaLisa following the processing scheme <site> –> <jobid> –> 'time' = >u...
virtual ~TMonaLisaWriter()
Cleanup.
The TNamed class is the base class for all named ROOT classes.
virtual Bool_t SendInfoUser(const char *user=0)
Sends the <user> text to MonaLisa following the process scheme: <site> –> <jobid> –> 'user' = <user...
void Init(TClassEdit::TInterpreterLookupHelper *helper)
R__EXTERN TVirtualMonitoringWriter * gMonitoringWriter
virtual const char * Getenv(const char *env)
Get environment variable.
void Info(const char *location, const char *msgfmt,...)
virtual Bool_t SendParameters(TList *valuelist, const char *identifier=0)
Send the parameters to MonaLisa.
void Error(const char *location, const char *msgfmt,...)
Named parameter, streamable and storable.
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
R__EXTERN TSystem * gSystem
virtual Bool_t InheritsFrom(const char *classname) const
Returns kTRUE if object inherits from class "classname".
char * Form(const char *fmt,...)
virtual Long64_t GetBytesRead() const
virtual Bool_t IsOpen() const
Returns kTRUE in case file is open and kFALSE if file is not open.
virtual const char * HostName()
Return the system's host name.
virtual Bool_t SendProcessingStatus(const char *status, Bool_t restarttimer=kFALSE)
Send the procesing status 'status' to MonaLisa following the processing scheme: <site> –> <jobid> –...
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
Mother of all ROOT objects.
virtual const TUrl * GetEndpointUrl() const
virtual void Add(TObject *obj)
virtual Bool_t SendInfoDescription(const char *jobtag)
Sends the description <jobtag> following the processing scheme: <site> –> <jobid> –> 'jobname' = <j...
virtual int GetProcInfo(ProcInfo_t *info) const
Returns cpu and memory used by this process into the ProcInfo_t structure.
void Print(Option_t *option="") const
Print info about MonaLisa object.
Double_t GetValue() const
const AParamType & GetVal() const
const char * GetUser() const
virtual Int_t GetSize() const
Bool_t SendFileCheckpoint(TFile *file)
static Long64_t GetFileCounter()
if(line.BeginsWith("/*"))
virtual Bool_t SendFileReadProgress(TFile *file)
virtual const char * GetTitle() const
Returns title of object.
const char * Data() const