64 class MonitoredTFileInfo:
public TObject {
70 fFileClassName =
"TXNetFile";
75 fLastBytesWritten = 0;
78 fTempWrittenBytes = 0;
80 fLastResetTime = timenow;
81 fCreationTime = timenow;
102 Double_t t = std::min(prectime, fLastResetTime);
105 mselapsed = std::max(mselapsed, 1);
107 readthr = fTempReadBytes / mselapsed * 1000;
108 writethr = fTempWrittenBytes / mselapsed * 1000;
111 void UpdateFileStatus(
TFile *file) {
121 fTempWrittenBytes = 0;
122 fLastResetTime = timenow;
125 void ResetFileStatus(
Double_t timenow) {
127 ResetFileStatus(fileinst, timenow);
136 class MonitoredTFileSummary:
public TNamed {
138 MonitoredTFileSummary(
TString &fileclassname):
TNamed(fileclassname, fileclassname) {
142 fWriteThroughput = 0;
152 mi->GetThroughputs(rth, wth, timenow, prectime);
154 fBytesRead += mi->fTempReadBytes;
155 fBytesWritten += mi->fTempWrittenBytes;
157 if (rth > 0) fReadThroughput += rth;
158 if (wth > 0) fWriteThroughput += wth;
169 const char *monid,
const char *monsubid,
172 fMonInfoRepo =
new std::map<UInt_t, MonitoredTFileInfo *>;
174 Init(monserver, montag, monid, monsubid, option);
271 const char *monsubid,
const char *option)
278 fFileStopwatch.Start(
kTRUE);
279 fLastRWSendTime = fFileStopwatch.RealTime();
280 fLastFCloseSendTime = fFileStopwatch.RealTime();
281 fLastProgressTime = time(0);
282 fFileStopwatch.Continue();
284 fReportInterval = 120;
287 if (fReportInterval < 1)
289 Info(
"TMonaLisaWriter",
"Setting APMON Report Interval to %d seconds",fReportInterval);
292 char *apmon_config[1] =
293 { ((monserver == 0) ? (
char *)
gSystem->
Getenv(
"APMON_CONFIG") : (
char *) monserver) };
294 if (apmon_config[0] == 0) {
295 Error(
"TMonaLisaWriter",
296 "Disabling apmon monitoring since env variable APMON_CONFIG was not found and the monitoring server is not specified in the constructor!");
302 fApmon =
new ApMon(1, apmon_config);
303 fApmon->setConfRecheck(
false);
304 fApmon->setJobMonitoring(
false);
307 }
catch (runtime_error &
e) {
308 Error(
"TMonaLisaWriter",
"Error initializing ApMon: %s", e.what());
309 Error(
"TMonaLisaWriter",
"Disabling apmon.");
326 clustername +=
TString(
"none");
329 SetTitle(clustername);
332 SetTitle(clustername+
TString(montag));
350 fJobId =
"-no-job-id";
364 fSubJobId = monsubid;
369 Info(
"Initialized for ML Server <%s> - Setting ClusterID <%s> JobID <%s> SubID <%s>\n",
370 apmon_config[0], fName.Data() ,fJobId.Data(),fSubJobId.Data());
372 fInitialized =
kTRUE;
386 std::map<UInt_t, MonitoredTFileInfo *>::iterator iter = fMonInfoRepo->begin();
387 while (iter != fMonInfoRepo->end()) {
392 fMonInfoRepo->clear();
410 Error(
"SendInfoStatus",
"Monitoring is not properly initialized!");
421 valuelist->
Add(valtext);
424 success = SendParameters(valuelist);
437 Error(
"TMonaLisaWriter",
438 "Monitoring initialization has failed - you can't send to MonaLisa!");
447 const char *localuser;
454 localuser =
"unknown";
460 valuelist->
Add(valtext);
463 success = SendParameters(valuelist);
476 Error(
"SendInfoDescription",
477 "Monitoring is not properly initialized!");
488 valuelist->
Add(valtext);
491 success = SendParameters(valuelist);
504 Error(
"SendInfoTime",
"Monitoring is not properly initialized!");
517 valuelist->
Add(valtext);
520 success = SendParameters(valuelist);
537 fStopwatch.Start(
kTRUE);
541 Error(
"TMonaLisaWriter",
542 "Monitoring initialization has failed - you can't send to MonaLisa!");
553 valuelist->
Add(valtext);
556 valuelist->
Add(valhost);
559 valuelist->
Add(valsid);
562 success = SendParameters(valuelist);
573 if (!force && (time(0)-fLastProgressTime) < fReportInterval) {
579 Error(
"SendProcessingProgress",
580 "Monitoring is not properly initialized!");
606 valuelist->
Add(valsid);
607 valuelist->
Add(valevent);
608 valuelist->
Add(valbyte);
609 valuelist->
Add(valrealtime);
610 valuelist->
Add(valcputime);
611 valuelist->
Add(valtotmem);
612 valuelist->
Add(valrssmem);
613 valuelist->
Add(valshdmem);
620 strcpu += fStopwatch.CpuTime();
622 strreal += fStopwatch.RealTime();
630 fStopwatch.Continue();
639 valuelist->
Add(textevent);
640 valuelist->
Add(textbyte);
641 valuelist->
Add(textcpu);
642 valuelist->
Add(textreal);
643 valuelist->
Add(texttotmem);
644 valuelist->
Add(textrssmem);
645 valuelist->
Add(textshdmem);
648 valuelist->
Add(valhost);
651 success = SendParameters(valuelist);
652 fLastProgressTime = time(0);
668 const char *openphasename,
672 Error(
"SendFileOpenProgress",
673 "Monitoring is not properly initialized!");
678 if (!fTmpOpenPhases && !openphases) {
679 fTmpOpenPhases =
new TList;
683 if (!openphasename) {
685 fTmpOpenPhases->Clear();
691 fFileStopwatch.Continue();
694 fTmpOpenPhases->Add(nfo);
697 TIter nxt(fTmpOpenPhases);
702 openphases->
Add(nfo);
704 if (fTmpOpenPhases) {
705 fTmpOpenPhases->SetOwner(0);
706 fTmpOpenPhases->Clear();
711 if (!forcesend)
return kTRUE;
712 if (!file)
return kTRUE;
714 TList *op = openphases ? openphases : fTmpOpenPhases;
725 valuelist->
Add(valhost);
727 valuelist->
Add(valsid);
729 valuelist->
Add(valdest);
732 valuelist->
Add(valfid);
735 valuelist->
Add(valstrfid);
755 valuelist->
Add(valtottime);
758 success = SendParameters(valuelist);
767 Error(
"SendFileCloseEvent",
768 "Monitoring is not properly initialized!");
773 Double_t timenow = fFileStopwatch.RealTime();
774 fFileStopwatch.Continue();
776 MonitoredTFileInfo *mi = 0;
777 std::map<UInt_t, MonitoredTFileInfo *>::iterator iter = fMonInfoRepo->find(file->
GetUniqueID());
778 if (iter != fMonInfoRepo->end()) mi = iter->second;
781 if (mi) timelapsed = timenow - mi->fCreationTime;
796 valuelist->
Add(valdest);
798 valuelist->
Add(valfid);
802 valuelist->
Add(valstrfid);
805 valname +=
"readbytes";
807 valuelist->
Add(valread);
815 valname +=
"writtenbytes";
817 valuelist->
Add(valwrite);
825 if (timelapsed > 0.001) {
830 valname +=
"filethrpt_rd";
832 valuelist->
Add(valreadthavg);
836 valname +=
"filethrpt_wr";
838 valuelist->
Add(valwritethavg);
843 mi->UpdateFileStatus(file);
848 success = SendParameters(valuelist);
857 return SendFileCheckpoint(file);
862 return SendFileCheckpoint(file);
869 Error(
"SendFileCheckpoint",
870 "Monitoring is not properly initialized!");
884 Double_t timenow = fFileStopwatch.RealTime();
885 fFileStopwatch.Continue();
891 MonitoredTFileInfo *mi = 0;
892 std::map<UInt_t, MonitoredTFileInfo *>::iterator iter = fMonInfoRepo->find(file->
GetUniqueID());
893 if (iter != fMonInfoRepo->end()) mi = iter->second;
896 mi =
new MonitoredTFileInfo(file, timenow);
897 if (mi) fMonInfoRepo->insert( make_pair( file->
GetUniqueID(), mi ) );
901 if (mi) mi->UpdateFileStatus(file);
904 if ( timenow - fLastRWSendTime < fReportInterval) {
920 valuelist->
Add(valhost);
922 valuelist->
Add(valsid);
928 iter = fMonInfoRepo->begin();
929 if (iter != fMonInfoRepo->end()) mi = iter->second;
933 MonitoredTFileSummary *
sum =
static_cast<MonitoredTFileSummary *
>(summary.
FindObject(mi->fFileClassName));
935 sum =
new MonitoredTFileSummary(mi->fFileClassName);
940 sum->Update(mi, timenow, fLastRWSendTime);
941 mi->ResetFileStatus(timenow);
950 if (iter != fMonInfoRepo->end())
956 for (iter = fMonInfoRepo->begin(); iter != fMonInfoRepo->end(); iter++)
957 if (!iter->second) fMonInfoRepo->erase(iter);
962 TIter nxt2(&summary);
963 MonitoredTFileSummary *
sum;
964 while ((sum = (MonitoredTFileSummary *)nxt2())) {
966 if (sum->fReadThroughput >= 0) {
967 valname = sum->GetName();
968 valname +=
"_avgthrpt_rd";
970 valuelist->
Add(valreadthr);
973 if ( sum->fWriteThroughput >= 0 ) {
974 valname = sum->GetName();
975 valname +=
"_avgthrpt_wr";
977 valuelist->
Add(valwritethr);
984 success = SendParameters(valuelist);
986 fLastRWSendTime = timenow;
998 Error(
"SendParameters",
"Monitoring is not properly initialized!");
1003 Error(
"SendParameters",
"No values in the value list!");
1007 if (identifier == 0)
1008 identifier = fJobId;
1010 TIter nextvalue(valuelist);
1017 char **apmon_params = 0;
1018 Int_t *apmon_types = 0;
1019 char **apmon_values = 0;
1022 if (apmon_nparams) {
1024 apmon_params = (
char **)
malloc(apmon_nparams *
sizeof(
char *));
1025 apmon_values = (
char **)
malloc(apmon_nparams *
sizeof(
char *));
1026 apmon_types = (
int *)
malloc(apmon_nparams *
sizeof(
int));
1027 bufDouble =
new Double_t[apmon_nparams];
1030 while ((monobj = nextvalue())) {
1031 if (!strcmp(monobj->
ClassName(),
"TMonaLisaValue")) {
1035 Info(
"SendParameters",
"adding tag %s with val %f",
1038 apmon_params[looper] = (
char *) objval->
GetName();
1039 apmon_types[looper] = XDR_REAL64;
1040 apmon_values[looper] = (
char *) (objval->
GetValuePtr());
1043 if (!strcmp(monobj->
ClassName(),
"TMonaLisaText")) {
1047 Info(
"SendParameters",
"adding tag %s with text %s",
1050 apmon_params[looper] = (
char *) objtext->
GetName();
1051 apmon_types[looper] = XDR_STRING;
1052 apmon_values[looper] = (
char *) (objtext->
GetText());
1055 if (!strcmp(monobj->
ClassName(),
"TNamed")) {
1059 Info(
"SendParameters",
"adding tag %s with text %s",
1062 apmon_params[looper] = (
char *) objNamed->
GetName();
1063 apmon_types[looper] = XDR_STRING;
1064 apmon_values[looper] = (
char *) (objNamed->
GetTitle());
1068 if (!strcmp(monobj->
ClassName(),
"TParameter<double>")) {
1072 Info(
"SendParameters",
"adding tag %s with val %f",
1075 apmon_params[looper] = (
char *) objParam->
GetName();
1076 apmon_types[looper] = XDR_REAL64;
1077 apmon_values[looper] = (
char *) &(objParam->
GetVal());
1080 if (!strcmp(monobj->
ClassName(),
"TParameter<Long64_t>")) {
1084 Info(
"SendParameters",
"adding tag %s with val %lld",
1087 apmon_params[looper] = (
char *) objParam->
GetName();
1088 apmon_types[looper] = XDR_REAL64;
1089 bufDouble[looper] = objParam->
GetVal();
1090 apmon_values[looper] = (
char *) (bufDouble + looper);
1093 if (!strcmp(monobj->
ClassName(),
"TParameter<long>")) {
1097 Info(
"SendParameters",
"adding tag %s with val %ld",
1100 apmon_params[looper] = (
char *) objParam->
GetName();
1101 apmon_types[looper] = XDR_REAL64;
1102 bufDouble[looper] = objParam->
GetVal();
1103 apmon_values[looper] = (
char *) (bufDouble + looper);
1106 if (!strcmp(monobj->
ClassName(),
"TParameter<float>")) {
1110 Info(
"SendParameters",
"adding tag %s with val %f",
1113 apmon_params[looper] = (
char *) objParam->
GetName();
1114 apmon_types[looper] = XDR_REAL64;
1115 bufDouble[looper] = objParam->
GetVal();
1116 apmon_values[looper] = (
char *) (bufDouble + looper);
1119 if (!strcmp(monobj->
ClassName(),
"TParameter<int>")) {
1123 Info(
"SendParameters",
"adding tag %s with val %d",
1126 apmon_params[looper] = (
char *) objParam->
GetName();
1127 apmon_types[looper] = XDR_REAL64;
1128 bufDouble[looper] = objParam->
GetVal();
1129 apmon_values[looper] = (
char *) (bufDouble + looper);
1135 apmon_nparams = looper;
1138 Info(
"SendParameters",
"n: %d name: %s identifier %s ...,",
1139 apmon_nparams,
GetName(), identifier);
1141 ((ApMon *) fApmon)->sendParameters((
char *)
GetName(), (
char*)identifier,
1142 apmon_nparams, apmon_params,
1143 apmon_types, apmon_values);
1158 ((ApMon *) fApmon)->setLogLevel((
char *) loglevel);
1166 std::cout <<
"Site (Farm) : " << fName << std::endl;
1167 std::cout <<
"JobId (Node) : " << fJobId << std::endl;
1168 std::cout <<
"SubJobId (Node) : " << fSubJobId << std::endl;
1169 std::cout <<
"HostName : " << fHostname << std::endl;
1170 std::cout <<
"Pid : " << fPid << std::endl;
1171 std::cout <<
"Inititialized : " << fInitialized << std::endl;
1172 std::cout <<
"Verbose : " << fVerbose << std::endl;
virtual Bool_t SendInfoStatus(const char *status)
Sends a <status> text to MonaLisa following the process scheme: <site> –> <jobid> –> 'status' = <st...
virtual const char * GetName() const
Returns name of object.
std::string GetName(const std::string &scope_name)
virtual UInt_t GetUniqueID() const
Return the unique object id.
virtual Bool_t SendFileWriteProgress(TFile *file)
static long int sum(long int i)
virtual int GetPid()
Get process id.
virtual Bool_t SendFileCloseEvent(TFile *file)
void AddLast(TObject *obj)
Add object at the end of the list.
Bool_t TestBit(UInt_t f) const
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
void Init(const char *monserver, const char *montag, const char *monid, const char *monsubid, const char *option)
Creates a TMonaLisaWriter object to send monitoring information to a MonaLisa server using the MonaLi...
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
const char * GetText() const
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
const char * GetName() const
Returns name of object.
virtual Long64_t GetBytesWritten() const
Return the total number of bytes written so far to the file.
virtual Bool_t SendProcessingProgress(Double_t nevent, Double_t nbytes, Bool_t force=kFALSE)
Send the processing progress to MonaLisa.
TObject * FindObject(const char *name) const
Find object using its name.
TMonaLisaWriter(const TMonaLisaWriter &)
virtual void SetLogLevel(const char *loglevel="WARNING")
Set MonaLisa log level.
virtual Bool_t SendFileOpenProgress(TFile *file, TList *openphases, const char *openphasename, Bool_t forcesend=kFALSE)
Send the fileopen progress to MonaLisa.
const char * GetHost() const
virtual const char * ClassName() const
Returns name of class to which the object belongs.
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
virtual Bool_t SendInfoTime()
Sends the current time to MonaLisa following the processing scheme <site> –> <jobid> –> 'time' = >u...
virtual ~TMonaLisaWriter()
Cleanup.
The TNamed class is the base class for all named ROOT classes.
virtual Bool_t SendInfoUser(const char *user=0)
Sends the <user> text to MonaLisa following the process scheme: <site> –> <jobid> –> 'user' = <user...
R__EXTERN TVirtualMonitoringWriter * gMonitoringWriter
virtual const char * Getenv(const char *env)
Get environment variable.
void Info(const char *location, const char *msgfmt,...)
virtual Bool_t SendParameters(TList *valuelist, const char *identifier=0)
Send the parameters to MonaLisa.
void Error(const char *location, const char *msgfmt,...)
Named parameter, streamable and storable.
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
R__EXTERN TSystem * gSystem
virtual Bool_t InheritsFrom(const char *classname) const
Returns kTRUE if object inherits from class "classname".
char * Form(const char *fmt,...)
virtual Long64_t GetBytesRead() const
virtual Bool_t IsOpen() const
Returns kTRUE in case file is open and kFALSE if file is not open.
virtual const char * HostName()
Return the system's host name.
virtual Bool_t SendProcessingStatus(const char *status, Bool_t restarttimer=kFALSE)
Send the processing status 'status' to MonaLisa following the processing scheme: <site> –> <jobid> –...
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
Mother of all ROOT objects.
virtual const TUrl * GetEndpointUrl() const
virtual void Add(TObject *obj)
virtual Bool_t SendInfoDescription(const char *jobtag)
Sends the description <jobtag> following the processing scheme: <site> –> <jobid> –> 'jobname' = <j...
virtual int GetProcInfo(ProcInfo_t *info) const
Returns cpu and memory used by this process into the ProcInfo_t structure.
void Print(Option_t *option="") const
Print info about MonaLisa object.
Double_t GetValue() const
const AParamType & GetVal() const
const char * GetUser() const
virtual Int_t GetSize() const
Bool_t SendFileCheckpoint(TFile *file)
static Long64_t GetFileCounter()
virtual Bool_t SendFileReadProgress(TFile *file)
virtual const char * GetTitle() const
Returns title of object.
const char * Data() const