#include "TPacketizerFile.h"
#include "Riostream.h"
#include "TDSet.h"
#include "TError.h"
#include "TEventList.h"
#include "TMap.h"
#include "TMessage.h"
#include "TMonitor.h"
#include "TNtupleD.h"
#include "TObject.h"
#include "TParameter.h"
#include "TPerfStats.h"
#include "TProofDebug.h"
#include "TProof.h"
#include "TProofPlayer.h"
#include "TProofServ.h"
#include "TSlave.h"
#include "TSocket.h"
#include "TStopwatch.h"
#include "TTimer.h"
#include "TUrl.h"
#include "TClass.h"
#include "TMath.h"
#include "TObjString.h"
#include "TFileInfo.h"
#include "TFileCollection.h"
#include "THashList.h"
// Per-worker bookkeeping used by TPacketizerFile. One instance is created for
// each worker in the TPacketizerFile constructor and stored in fSlaveStats,
// keyed by the TSlave object.
class TPacketizerFile::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
friend class TPacketizerFile;
private:
// Initialised to 0 by the constructor; not otherwise used in this file.
Long64_t fLastProcessed;
// Latest speed estimate (entries / time), refreshed by UpdatePerformance().
Double_t fSpeed;
// Initialised to 0 by the constructor; not otherwise used in this file.
Double_t fTimeInstant;
// Circular ntuple with columns "tm:ev" feeding the speed estimate.
TNtupleD *fCircNtp;
// Circularity setting applied to fCircNtp (defaults to 5).
Long_t fCircLvl;
public:
TSlaveStat(TSlave *sl, TList *input);
~TSlaveStat();
// NOTE(review): declared here, but no definition is visible in this file.
void GetCurrentTime();
// Record a new timing sample and recompute fSpeed.
void UpdatePerformance(Double_t time);
// Fold a cumulative status into this worker's status; returns the increment
// as a newly allocated object.
TProofProgressStatus *AddProcessed(TProofProgressStatus *st);
};
// Named wrapper around a heap-allocated TIter. Instances are stored in
// TPacketizerFile::fIters and looked up by name (a host FQDN, or "*" for the
// list of files not assigned to any worker node).
//
// The object owns the iterator handed to its constructor and deletes it on
// destruction; copying is therefore disabled to rule out a double delete.
class TPacketizerFile::TIterObj : public TObject {
private:
   TString  fName;   // Lookup key returned by GetName()
   TIter   *fIter;   // Owned iterator; may be null

   TIterObj(const TIterObj &);             // Not implemented: fIter has a single owner
   TIterObj &operator=(const TIterObj &);  // Not implemented: fIter has a single owner

public:
   // Takes ownership of 'iter'.
   TIterObj(const char *n, TIter *iter) : fName(n), fIter(iter) { }
   // Deleting a null pointer is a no-op, so no explicit check is needed.
   virtual ~TIterObj() { delete fIter; }

   const char *GetName() const { return fName; }
   TIter      *GetIter() const { return fIter; }
   void        Print(Option_t* option = "") const;
};
// ROOT ClassImp macro invocation for TPacketizerFile.
ClassImp(TPacketizerFile)
// Constructor.
//
// Builds the per-worker statistics and the per-node file iterators from the
// map found in 'input' under the name "PROOF_FilesToProcess". Each map key is
// a host name; each value is either a THashList or a TFileCollection of files.
// Files whose host does not match any worker node are collected in
// fNotAssigned and served through the "*" iterator.
//
//   workers  list of TSlave objects taking part in the processing
//   input    input list; must contain the files map and may contain the
//            integer parameters "PROOF_ProcessNotAssigned" and
//            "PROOF_IncludeFileInfoInPacket"
//   st       progress status object passed to the base class
//
// On failure the kInvalidObject bit is set and fValid stays kFALSE.
TPacketizerFile::TPacketizerFile(TList *workers, Long64_t, TList *input,
                                 TProofProgressStatus *st)
                : TVirtualPacketizer(input, st)
{
   PDB(kPacketizer,1) Info("TPacketizerFile", "enter");
   ResetBit(TObject::kInvalidObject);
   fValid = kFALSE;
   fAssigned = 0;
   fProcNotAssigned = kTRUE;
   fAddFileInfo = kFALSE;

   // Null every pointer the destructor touches *before* the first early
   // return, so that a failed construction never leads to SafeDelete being
   // called on an indeterminate pointer.
   fFiles = 0;
   fNotAssigned = 0;
   fIters = 0;
   fStopwatch = 0;

   if (!input || input->GetSize() <= 0) {
      Error("TPacketizerFile", "input file is undefined or empty!");
      SetBit(TObject::kInvalidObject);
      return;
   }

   // Should files not assigned to a worker node be processed? (default: yes)
   Int_t procnotass = 1;
   if (TProof::GetParameter(input, "PROOF_ProcessNotAssigned", procnotass) == 0) {
      if (procnotass == 0) {
         Info("TPacketizerFile", "files not assigned to workers will not be processed");
         fProcNotAssigned = kFALSE;
      }
   }

   // Should the TFileInfo object be attached to the packet? (default: no)
   Int_t addfileinfo = 0;
   if (TProof::GetParameter(input, "PROOF_IncludeFileInfoInPacket", addfileinfo) == 0) {
      if (addfileinfo == 1) {
         Info("TPacketizerFile",
              "TFileInfo object will be included in the packet as associated object");
         fAddFileInfo = kTRUE;
      }
   }

   // The map of files is mandatory
   if (!(fFiles = dynamic_cast<TMap *>(input->FindObject("PROOF_FilesToProcess")))) {
      Error("TPacketizerFile", "map of files to be processed/created not found");
      SetBit(TObject::kInvalidObject);
      return;
   }

   // One stat object per worker, plus the list of distinct worker nodes
   fSlaveStats = new TMap;
   fSlaveStats->SetOwner(kFALSE);
   TList nodes;
   nodes.SetOwner(kTRUE);
   TSlave *wrk;
   TIter si(workers);
   while ((wrk = (TSlave *) si.Next())) {
      fSlaveStats->Add(wrk, new TSlaveStat(wrk, input));
      TString wrkname = TUrl(wrk->GetName()).GetHostFQDN();
      Info("TPacketizerFile", "worker: %s", wrkname.Data());
      if (!nodes.FindObject(wrkname)) nodes.Add(new TObjString(wrkname));
   }

   // One iterator per node which has both files and at least one worker
   fIters = new TList;
   fIters->SetOwner(kTRUE);

   fTotalEntries = 0;
   fNotAssigned = new TList;
   fNotAssigned->SetName("*");
   TIter nxl(fFiles);
   TObject *key, *o = 0;
   while ((key = nxl()) != 0) {
      // The value may be a plain THashList or a TFileCollection wrapping one
      THashList *wrklist = dynamic_cast<THashList *>(fFiles->GetValue(key));
      if (!wrklist) {
         TFileCollection *fc = dynamic_cast<TFileCollection *>(fFiles->GetValue(key));
         if (fc) wrklist = fc->GetList();
      }
      if (wrklist) {
         TString hname = TUrl(key->GetName()).GetHostFQDN();
         if ((o = nodes.FindObject(hname))) {
            fTotalEntries += wrklist->GetSize();
            fIters->Add(new TIterObj(hname, new TIter(wrklist)));
            PDB(kPacketizer,2)
               Info("TPacketizerFile", "%d files of '%s' (fqdn: '%s') assigned to '%s'",
                    wrklist->GetSize(), key->GetName(), hname.Data(), o->GetName());
         } else {
            // No worker on this host: park the files in the "*" list
            TIter nxf(wrklist);
            while ((o = nxf()))
               fNotAssigned->Add(o);
            PDB(kPacketizer,2)
               Info("TPacketizerFile", "%d files of '%s' (fqdn: '%s') not assigned",
                    wrklist->GetSize(), key->GetName(), hname.Data());
         }
      }
   }

   if (fNotAssigned && fNotAssigned->GetSize() > 0) {
      fTotalEntries += fNotAssigned->GetSize();
      fIters->Add(new TIterObj("*", new TIter(fNotAssigned)));
      Info("TPacketizerFile", "non-assigned files: %d", fNotAssigned->GetSize());
      fNotAssigned->Print();
   }

   if (fTotalEntries <= 0) {
      Error("TPacketizerFile", "no file path in the map!");
      SetBit(TObject::kInvalidObject);
      SafeDelete(fIters);
      return;
   } else {
      Info("TPacketizerFile", "processing %lld files", fTotalEntries);
      fIters->Print();
   }

   fStopwatch = new TStopwatch();
   fStopwatch->Start();
   fValid = kTRUE;
   PDB(kPacketizer,1) Info("TPacketizerFile", "return");
   return;
}
// Destructor. Releases the helper containers and the stopwatch.
TPacketizerFile::~TPacketizerFile()
{
   // Ownership is switched off before deletion, so the objects referenced
   // by fNotAssigned are left alone.
   if (fNotAssigned) {
      fNotAssigned->SetOwner(kFALSE);
      delete fNotAssigned;
      fNotAssigned = 0;
   }

   // Ownership is switched on, so the TIterObj wrappers are deleted with
   // the list.
   if (fIters) {
      fIters->SetOwner(kTRUE);
      delete fIters;
      fIters = 0;
   }

   if (fStopwatch) {
      delete fStopwatch;
      fStopwatch = 0;
   }
}
// Return the real time reported by fStopwatch.
// RealTime() is immediately followed by Continue() so that the stopwatch
// keeps running after the reading (see the TStopwatch reference).
Double_t TPacketizerFile::GetCurrentTime()
{
   const Double_t elapsed = fStopwatch->RealTime();
   fStopwatch->Continue();
   return elapsed;
}
// Sum the current processing rates of the workers.
//
// Only workers that have a progress status and have processed at least one
// entry contribute to the sum. On return 'all' is kTRUE if every worker
// contributed (or if there are no workers), kFALSE otherwise.
Float_t TPacketizerFile::GetCurrentRate(Bool_t &all)
{
   all = kTRUE;
   Float_t total = 0.;

   // Nothing to sum without worker statistics
   if (!fSlaveStats || fSlaveStats->GetSize() <= 0) return total;

   TIter nextWorker(fSlaveStats);
   for (TObject *wkey = nextWorker(); wkey != 0; wkey = nextWorker()) {
      TSlaveStat *ws = (TSlaveStat *) fSlaveStats->GetValue(wkey);
      const Bool_t contributes =
         ws && ws->GetProgressStatus() && ws->GetEntriesProcessed() > 0;
      if (!contributes) {
         all = kFALSE;
         continue;
      }
      total += ws->GetProgressStatus()->GetCurrentRate();
   }
   return total;
}
// Get the next packet for worker 'wrk'.
//
// The request message 'r' carries the outcome of the previous packet; it is
// decoded first to update the per-worker and global progress. Then the next
// file is taken from the iterator of the worker's node or, if that one is
// exhausted and fProcNotAssigned is set, from the "*" iterator.
//
// Returns a new TDSetElement describing the file (the caller takes it over),
// or 0 when:
//   - the packetizer is not valid or has no stat object for the worker;
//   - all files have been assigned, or a stop was requested;
//   - no further file is available for this worker;
//   - the next object is neither a TObjString nor a TFileInfo with a URL.
TDSetElement *TPacketizerFile::GetNextPacket(TSlave *wrk, TMessage *r)
{
   TDSetElement *elem = 0;
   if (!fValid) return elem;

   // Find the stat object of this worker
   TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(wrk);
   if (!wrkstat) {
      Error("GetNextPacket", "could not find stat object for worker '%s'!", wrk->GetName());
      return elem;
   }

   PDB(kPacketizer,2)
      Info("GetNextPacket","worker-%s: fAssigned %lld / %lld", wrk->GetOrdinal(), fAssigned, fTotalEntries);

   // Decode the feedback about the previous packet
   Double_t latency = 0., proctime = 0., proccpu = 0.;
   Long64_t bytesRead = -1;
   Long64_t totalEntries = -1;
   Long64_t totev = 0;
   Long64_t numev = -1;
   // numev keeps its -1 sentinel when no status is received; that value must
   // not be accumulated into the global progress counter.
   Bool_t numevValid = kTRUE;

   TProofProgressStatus *status = 0;
   if (wrk->GetProtocol() > 18) {
      // Newer protocol: the worker sends a cumulative status object
      (*r) >> latency;
      (*r) >> status;

      TProofProgressStatus *progress = 0;
      if (status) {
         numev = status->GetEntries() - wrkstat->GetEntriesProcessed();
         progress = wrkstat->AddProcessed(status);
         if (progress) {
            proctime = progress->GetProcTime();
            proccpu = progress->GetCPUTime();
            totev = status->GetEntries();
            bytesRead = progress->GetBytesRead();
            delete progress;
         }
         delete status;
      } else {
         numevValid = kFALSE;
         Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
      }
   } else {
      // Older protocol: plain numbers, the trailing ones being optional
      (*r) >> latency >> proctime >> proccpu;

      if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
      if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
      if (r->BufferSize() > r->Length()) (*r) >> totev;

      numev = totev - wrkstat->GetEntriesProcessed();
      wrkstat->GetProgressStatus()->IncEntries(numev);
      wrkstat->GetProgressStatus()->SetLastUpdate();
   }

   // Update the global progress
   if (fProgressStatus) {
      if (numevValid) fProgressStatus->IncEntries(numev);
      fProgressStatus->SetLastUpdate();
   } else {
      Error("GetNextPacket", "[%p]: fProgressStatus is null!", this);
   }

   PDB(kPacketizer,2)
      Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
                           wrk->GetOrdinal(), wrk->GetName(),
                           numev, latency, proctime, proccpu, bytesRead);

   if (gPerfStats != 0) {
      gPerfStats->PacketEvent(wrk->GetOrdinal(), wrk->GetName(), "", numev,
                              latency, proctime, proccpu, bytesRead);
   }

   // Everything has been handed out already
   if (fAssigned == fTotalEntries) {
      HandleTimer(0);
      return 0;
   }

   // A stop was requested
   if (fStop) {
      HandleTimer(0);
      return 0;
   }

   PDB(kPacketizer,2)
      Info("GetNextPacket", "worker-%s (%s): getting next files ... ", wrk->GetOrdinal(),
                            wrk->GetName());

   // First try the files of the node the worker runs on
   TObject *nextfile = 0;
   TString wrkname = TUrl(wrk->GetName()).GetHostFQDN();
   TIterObj *io = dynamic_cast<TIterObj *>(fIters->FindObject(wrkname));
   if (io) {
      if (io->GetIter())
         nextfile = io->GetIter()->Next();
   }

   // Then, if allowed, the files not assigned to any node
   if (!nextfile && fProcNotAssigned) {
      if ((io = dynamic_cast<TIterObj *>(fIters->FindObject("*")))) {
         if (io->GetIter())
            nextfile = io->GetIter()->Next();
      }
   }

   // Nothing left for this worker
   if (!nextfile) return elem;

   // Extract the file name / URL
   TString filename;
   TObjString *os = 0;
   TFileInfo *fi = 0;
   if ((os = dynamic_cast<TObjString *>(nextfile))) {
      filename = os->GetName();
   } else if ((fi = dynamic_cast<TFileInfo *>(nextfile))) {
      // The current URL may be missing; do not dereference a null pointer
      TUrl *url = fi->GetCurrentUrl();
      if (!url) {
         Warning("GetNextPacket", "found a 'TFileInfo' object without a current URL in list");
         return elem;
      }
      filename = url->GetUrl();
   }
   if (filename.IsNull()) {
      Warning("GetNextPacket", "found unsupported object of type '%s' in list: it must"
                               " be 'TObjString' or 'TFileInfo'", nextfile->ClassName());
      return elem;
   }

   PDB(kPacketizer,2)
      Info("GetNextPacket", "worker-%s: assigning: '%s' (remaining %lld files)",
                            wrk->GetOrdinal(), filename.Data(), (fTotalEntries - fAssigned));

   // Build the packet: one entry, flagged as empty
   elem = new TDSetElement(filename, "", "", 0, 1);
   elem->SetBit(TDSetElement::kEmpty);

   // Attach the TFileInfo object, if requested
   if (fAddFileInfo && fi) {
      elem->AddAssocObj(fi);
      PDB(kPacketizer,2) fi->Print("L");
   }

   fAssigned += 1;

   return elem;
}
// Main constructor.
//
//   slave  worker this stat object refers to
//   input  input list; the optional parameter
//          "PROOF_TPacketizerFileCircularity" overrides the default
//          circularity (5) of the ntuple used for the speed estimate
TPacketizerFile::TSlaveStat::TSlaveStat(TSlave *slave, TList *input)
                            : fLastProcessed(0),
                              fSpeed(0), fTimeInstant(0), fCircLvl(5)
{
   // Circular ntuple holding the most recent (time, events) samples
   fCircNtp = new TNtupleD("Speed Circ Ntp", "Circular process info","tm:ev");
   // Detach the ntuple from the current directory: this object deletes it in
   // its destructor, so no directory may be allowed to delete it as well.
   fCircNtp->SetDirectory(0);
   TProof::GetParameter(input, "PROOF_TPacketizerFileCircularity", fCircLvl);
   // Fall back to the default on non-positive values
   fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
   fCircNtp->SetCircular(fCircLvl);
   fSlave = slave;
   fStatus = new TProofProgressStatus();
}
// Destructor: releases the circular ntuple created by the constructor.
TPacketizerFile::TSlaveStat::~TSlaveStat()
{
   if (fCircNtp) {
      delete fCircNtp;
      fCircNtp = 0;
   }
}
// Record a new (accumulated time, processed entries) sample in the circular
// ntuple and refresh fSpeed.
//
//   time  time increment to add to the accumulated time of the last sample
//
// The very first call only seeds the ntuple with a (0, 0) sample and leaves
// the speed at 0. Afterwards the speed is the number of entries processed
// since the oldest stored sample divided by the time elapsed since it.
void TPacketizerFile::TSlaveStat::UpdatePerformance(Double_t time)
{
   // Buffer which GetEntry() fills with the column values
   Double_t *args = fCircNtp->GetArgs();
   Int_t nstored = fCircNtp->GetEntries();

   if (nstored <= 0) {
      // First call: seed the ntuple
      fCircNtp->Fill(0., 0);
      fSpeed = 0.;
      return;
   }

   // Accumulated time = time of the newest sample + increment
   fCircNtp->GetEntry(nstored - 1);
   Double_t elapsed = args[0] + time;
   fCircNtp->Fill(elapsed, GetEntriesProcessed());

   // Compare against the oldest sample still stored
   fCircNtp->GetEntry(0);
   Double_t dtime;
   if (elapsed > args[0]) {
      dtime = elapsed - args[0];
   } else {
      // No positive time difference: use a non-zero fallback
      dtime = nstored + 1;
   }
   Long64_t nevts = GetEntriesProcessed() - (Long64_t)args[1];
   fSpeed = nevts / dtime;

   PDB(kPacketizer,2)
      Info("UpdatePerformance", "time:%f, dtime:%f, nevts:%lld, speed: %f",
                                time, dtime, nevts, fSpeed);
}
// Bring the worker status up to the cumulative status 'st'.
//
// Returns a newly allocated object holding the difference between 'st' and
// the status stored so far (the caller must delete it), or 0 if 'st' is
// null. The stored status is advanced by that difference, and its "last
// entries" value is set to the increase in entries.
TProofProgressStatus *TPacketizerFile::TSlaveStat::AddProcessed(TProofProgressStatus *st)
{
   if (!st) {
      Error("AddProcessed", "status arg undefined");
      return 0;
   }

   // Entries gained with respect to what was known so far
   Long64_t gained = st->GetEntries() - fStatus->GetEntries();

   // Reset the last processing time before taking the difference
   fStatus->SetLastProcTime(0.);

   TProofProgressStatus *delta = new TProofProgressStatus(*st - *fStatus);
   *fStatus += *delta;
   fStatus->SetLastEntries(gained);

   return delta;
}
// Print the iterator name and the size of the collection it iterates over;
// the size is reported as -1 when there is no iterator or no collection.
void TPacketizerFile::TIterObj::Print(Option_t *) const
{
   Int_t nunits = -1;
   if (GetIter() && GetIter()->GetCollection())
      nunits = GetIter()->GetCollection()->GetSize();
   Printf("Iterator '%s' controls %d units", GetName(), nunits);
}