#include "TPacketizerUnit.h"
#include "Riostream.h"
#include "TDSet.h"
#include "TError.h"
#include "TEventList.h"
#include "TMap.h"
#include "TMessage.h"
#include "TMonitor.h"
#include "TNtupleD.h"
#include "TObject.h"
#include "TParameter.h"
#include "TPerfStats.h"
#include "TProofDebug.h"
#include "TProof.h"
#include "TProofPlayer.h"
#include "TProofServ.h"
#include "TSlave.h"
#include "TSocket.h"
#include "TStopwatch.h"
#include "TTimer.h"
#include "TUrl.h"
#include "TClass.h"
#include "TMath.h"
#include "TObjString.h"
using namespace TMath;
class TPacketizerUnit::TSlaveStat : public TObject {
friend class TPacketizerUnit;
private:
TSlave *fSlave;
Long64_t fProcessed;
Long64_t fLastProcessed;
Double_t fSpeed;
Double_t fTimeInstant;
TNtupleD *fCircNtp;
Long_t fCircLvl;
public:
TSlaveStat(TSlave *sl, TList *input);
~TSlaveStat();
void GetCurrentTime();
const char *GetName() const { return fSlave->GetName(); }
Long64_t GetEntriesProcessed() const { return fProcessed; }
void UpdatePerformance(Double_t time);
};
TPacketizerUnit::TSlaveStat::TSlaveStat(TSlave *slave, TList *input)
: fSlave(slave), fProcessed(0), fLastProcessed(0),
fSpeed(0), fTimeInstant(0)
{
fCircNtp = new TNtupleD("Speed Circ Ntp", "Circular process info","tm:ev");
TProof::GetParameter(input, "PROOF_TPacketizerUnitCircularity", fCircLvl);
fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
fCircNtp->SetCircular(fCircLvl);
}
TPacketizerUnit::TSlaveStat::~TSlaveStat()
{
SafeDelete(fCircNtp);
}
void TPacketizerUnit::TSlaveStat::UpdatePerformance(Double_t time)
{
Double_t ttot = time;
Double_t *ar = fCircNtp->GetArgs();
Int_t ne = fCircNtp->GetEntries();
if (ne <= 0) {
fCircNtp->Fill(0., 0);
fSpeed = 0.;
return;
}
fCircNtp->GetEntry(ne-1);
ttot = ar[0] + time;
fCircNtp->Fill(ttot, fProcessed);
fCircNtp->GetEntry(0);
Double_t dtime = (ttot > ar[0]) ? ttot - ar[0] : ne+1 ;
Long64_t nevts = fProcessed - (Long64_t)ar[1];
fSpeed = nevts / dtime;
PDB(kPacketizer,2)
Info("UpdatePerformance", "time:%f, dtime:%f, nevts:%lld, speed: %f",
time, dtime, nevts, fSpeed);
}
ClassImp(TPacketizerUnit)
TPacketizerUnit::TPacketizerUnit(TList *slaves, Long64_t num, TList *input)
: TVirtualPacketizer(input)
{
PDB(kPacketizer,1) Info("TPacketizerUnit", "enter (num %lld)", num);
fSlaveStats = 0;
fPackets = 0;
fTimeLimit = 1;
TProof::GetParameter(input, "PROOF_PacketizerTimeLimit", fTimeLimit);
PDB(kPacketizer,1)
Info("TPacketizerUnit", "time limit is %lf", fTimeLimit);
fProcessing = 0;
fStopwatch = new TStopwatch();
fPackets = new TList;
fPackets->SetOwner();
fSlaveStats = new TMap;
fSlaveStats->SetOwner(kFALSE);
TSlave *slave;
TIter si(slaves);
while ((slave = (TSlave*) si.Next()))
fSlaveStats->Add(slave, new TSlaveStat(slave, input));
fTotalEntries = num;
fStopwatch->Start();
PDB(kPacketizer,1) Info("TPacketizerUnit", "return");
}
TPacketizerUnit::~TPacketizerUnit()
{
if (fSlaveStats)
fSlaveStats->DeleteValues();
SafeDelete(fSlaveStats);
SafeDelete(fPackets);
SafeDelete(fStopwatch);
}
Double_t TPacketizerUnit::GetCurrentTime()
{
Double_t retValue = fStopwatch->RealTime();
fStopwatch->Continue();
return retValue;
}
Long64_t TPacketizerUnit::GetEntriesProcessed(TSlave *slave) const
{
if (!fSlaveStats)
return 0;
TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(slave);
if (!slstat)
return 0;
return slstat->GetEntriesProcessed();
}
TDSetElement *TPacketizerUnit::GetNextPacket(TSlave *sl, TMessage *r)
{
if (!fValid)
return 0;
TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(sl);
R__ASSERT(slstat != 0);
Double_t latency, proctime, proccpu;
Long64_t bytesRead = -1;
Long64_t totalEntries = -1;
(*r) >> latency >> proctime >> proccpu;
if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
Long64_t totev = 0;
if (r->BufferSize() > r->Length()) (*r) >> totev;
Long64_t numev = totev - slstat->fProcessed;
PDB(kPacketizer,2)
Info("GetNextPacket","worker-%s: fProcessed %lld\t", sl->GetOrdinal(), fProcessed);
fProcessing = 0;
PDB(kPacketizer,2)
Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
sl->GetOrdinal(), sl->GetName(),
numev, latency, proctime, proccpu, bytesRead);
if (gPerfStats != 0) {
gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), "", numev,
latency, proctime, proccpu, bytesRead);
}
if (fProcessed == fTotalEntries) {
HandleTimer(0);
SafeDelete(fProgress);
}
if (fStop) {
HandleTimer(0);
return 0;
}
Long64_t num;
Double_t cTime = GetCurrentTime();
if (slstat->fCircNtp->GetEntries() <= 0) {
PDB(kPacketizer,2) Info("GetNextPacket"," calibration: "
"total entries %lld, workers %d",
fTotalEntries, fSlaveStats->GetSize());
Long64_t avg = fTotalEntries/(fSlaveStats->GetSize());
num = (avg > 5) ? 5 : avg;
slstat->UpdatePerformance(0.);
} else {
slstat->fProcessed += slstat->fLastProcessed;
slstat->UpdatePerformance(proctime);
TMapIter *iter = (TMapIter *)fSlaveStats->MakeIterator();
TSlave * tmpSlave;
TSlaveStat * tmpStat;
Double_t sumSpeed = 0;
Double_t sumBusy = 0;
while ((tmpSlave = (TSlave *)iter->Next())) {
tmpStat = (TSlaveStat *)fSlaveStats->GetValue(tmpSlave);
if ((cTime - tmpStat->fTimeInstant) > 4*fTimeLimit)
tmpStat->fSpeed = 0;
PDB(kPacketizer,2)
Info("GetNextPacket",
"worker-%s: speed %lf", tmpSlave->GetOrdinal(), tmpStat->fSpeed);
if (tmpStat->fSpeed > 0) {
sumSpeed += tmpStat->fSpeed;
if ((tmpStat->fTimeInstant) && (cTime - tmpStat->fTimeInstant > 0)) {
Double_t busyspeed =
((tmpStat->fTimeInstant - cTime)*tmpStat->fSpeed + tmpStat->fLastProcessed);
if (busyspeed > 0)
sumBusy += busyspeed;
}
}
}
PDB(kPacketizer,2)
Info("GetNextPacket", "worker-%s: sum speed: %lf, sum busy: %f",
sl->GetOrdinal(), sumSpeed, sumBusy);
if ((fTotalEntries - fProcessed)/(slstat->fSpeed) < fTimeLimit) {
num = (fTotalEntries - fProcessed);
} else {
Double_t optTime = (fTotalEntries - fProcessed + sumBusy)/sumSpeed;
num = (optTime > fTimeLimit) ? Nint(fTimeLimit*(slstat->fSpeed)) : Nint(optTime*(slstat->fSpeed));
PDB(kPacketizer,2)
Info("GetNextPacket", "opTime %lf", optTime);
}
}
fProcessing = (num < (fTotalEntries - fProcessed)) ? num : (fTotalEntries - fProcessed);
slstat->fLastProcessed = fProcessing;
slstat->fTimeInstant = cTime;
PDB(kPacketizer,2)
Info("GetNextPacket", "worker-%s: num %lld processing %lld remained %lld",sl->GetOrdinal(),
num, fProcessing, (fTotalEntries - fProcessed - fProcessing));
TDSetElement *elem = new TDSetElement("", "", "", 0, fProcessing);
elem->SetBit(TDSetElement::kEmpty);
fProcessed += slstat->fLastProcessed;
return elem;
}
Last update: Thu Jan 17 09:00:49 2008
This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.