#include "TPacketizerAdaptive.h"
#include "Riostream.h"
#include "TDSet.h"
#include "TError.h"
#include "TEnv.h"
#include "TEntryList.h"
#include "TEventList.h"
#include "TMap.h"
#include "TMessage.h"
#include "TMonitor.h"
#include "TNtupleD.h"
#include "TObject.h"
#include "TParameter.h"
#include "TPerfStats.h"
#include "TProofDebug.h"
#include "TProof.h"
#include "TProofPlayer.h"
#include "TProofServ.h"
#include "TSlave.h"
#include "TSocket.h"
#include "TTimer.h"
#include "TUrl.h"
#include "TClass.h"
#include "TRandom.h"
#include "TMath.h"
#include "TObjString.h"
// Bookkeeping record for a single dataset element (file): remembers the node
// it was registered on, how far into the element entries have been handed out,
// and whether it has been flagged as done. The class never deletes the node or
// the element it points to.
class TPacketizerAdaptive::TFileStat : public TObject {

private:
   Bool_t        fIsDone;     // kTRUE once SetDone() has been called
   TFileNode    *fNode;       // node this file belongs to
   TDSetElement *fElement;    // dataset element described by this record
   Long64_t      fNextEntry;  // cursor: next entry to be handed out

public:
   TFileStat(TFileNode *node, TDSetElement *elem);

   // --- completion flag ---
   Bool_t IsDone() const
   {
      return fIsDone;
   }
   void SetDone()
   {
      fIsDone = kTRUE;
   }

   // --- plain accessors ---
   TFileNode *GetNode() const
   {
      return fNode;
   }
   TDSetElement *GetElement() const
   {
      return fElement;
   }
   Long64_t GetNextEntry() const
   {
      return fNextEntry;
   }

   // Advance the entry cursor by 'step' entries.
   void MoveNextEntry(Long64_t step)
   {
      fNextEntry += step;
   }
};
// Create the record for 'elem' on 'node'. The file starts out not done and the
// entry cursor is placed on the element's first entry ('elem' is dereferenced,
// so it must not be null).
TPacketizerAdaptive::TFileStat::TFileStat(TFileNode *node, TDSetElement *elem)
   : fIsDone(kFALSE),
     fNode(node),
     fElement(elem),
     fNextEntry(elem->GetFirst())
{
}
// Bookkeeping for one host that serves files: the files registered on it, the
// subset currently active, counters of the workers associated with it and the
// number of events registered / processed. Instances are sortable through
// Compare(), which is what TList::Sort() uses to rank nodes.
class TPacketizerAdaptive::TFileNode : public TObject {

private:
   TString   fNodeName;         // host name; also the TObject name (see GetName())
   TList    *fFiles;            // every TFileStat added to this node
   TObject  *fUnAllocFileNext;  // cursor in fFiles: next file not yet handed out
   TList    *fActFiles;         // files handed out by GetNextUnAlloc() and not yet removed
   TObject  *fActFileNext;      // cursor in fActFiles, advanced cyclically
   Int_t     fMySlaveCnt;       // incremented by IncMySlaveCnt()
   Int_t     fExtSlaveCnt;      // workers with a different name currently counted on this node
   Int_t     fRunSlaveCnt;      // incremented / decremented by Inc/DecRunSlaveCnt()
   Long64_t  fProcessed;        // events accounted through IncProcessed()
   Long64_t  fEvents;           // events accounted through IncEvents()

public:
   TFileNode(const char *name);
   ~TFileNode()
   {
      delete fFiles;
      delete fActFiles;
   }

   // ---- worker counters --------------------------------------------------
   void IncMySlaveCnt()
   {
      ++fMySlaveCnt;
   }
   Int_t GetMySlaveCnt() const
   {
      return fMySlaveCnt;
   }

   // The external counter only moves for workers whose name differs from the
   // node name.
   void IncExtSlaveCnt(const char *slave)
   {
      if (fNodeName != slave)
         ++fExtSlaveCnt;
   }
   void DecExtSlaveCnt(const char *slave)
   {
      if (fNodeName != slave)
         --fExtSlaveCnt;
      R__ASSERT(fExtSlaveCnt >= 0);
   }
   Int_t GetSlaveCnt() const
   {
      return fMySlaveCnt + fExtSlaveCnt;
   }

   void IncRunSlaveCnt()
   {
      ++fRunSlaveCnt;
   }
   void DecRunSlaveCnt()
   {
      --fRunSlaveCnt;
      R__ASSERT(fRunSlaveCnt >= 0);
   }
   Int_t GetRunSlaveCnt() const
   {
      return fRunSlaveCnt;
   }
   Int_t GetExtSlaveCnt() const
   {
      return fExtSlaveCnt;
   }

   // ---- simple queries ---------------------------------------------------
   Int_t GetNumberOfActiveFiles() const
   {
      return fActFiles->GetSize();
   }
   Bool_t IsSortable() const
   {
      return kTRUE;
   }
   Int_t GetNumberOfFiles()
   {
      return fFiles->GetSize();
   }

   // ---- event accounting -------------------------------------------------
   void IncProcessed(Long64_t nEvents)
   {
      fProcessed += nEvents;
   }
   Long64_t GetProcessed() const
   {
      return fProcessed;
   }
   // Events still to do, shared between the running workers plus one.
   Long64_t GetEventsLeftPerSlave() const
   {
      return ((fEvents - fProcessed)/(fRunSlaveCnt + 1));
   }
   void IncEvents(Long64_t nEvents)
   {
      fEvents += nEvents;
   }
   const char *GetName() const
   {
      return fNodeName.Data();
   }
   Long64_t GetNEvents() const
   {
      return fEvents;
   }

   // ---- file lists -------------------------------------------------------

   // Register a new file on this node; the first file added becomes the
   // initial "next unallocated" cursor.
   void Add(TDSetElement *elem)
   {
      fFiles->Add(new TFileStat(this, elem));
      if (fUnAllocFileNext == 0)
         fUnAllocFileNext = fFiles->First();
   }

   // Hand out the next never-allocated file (0 if none is left) and move it
   // to the active list.
   TFileStat *GetNextUnAlloc()
   {
      TObject *picked = fUnAllocFileNext;
      if (picked == 0)
         return 0;

      fActFiles->Add(picked);
      if (fActFileNext == 0)
         fActFileNext = fActFiles->First();
      fUnAllocFileNext = fFiles->After(picked);
      return (TFileStat *) picked;
   }

   // Return the file under the active cursor (0 if there is none) and advance
   // the cursor, wrapping to the head of the list at the end.
   TFileStat *GetNextActive()
   {
      TObject *current = fActFileNext;
      if (current != 0) {
         TObject *following = fActFiles->After(current);
         fActFileNext = following ? following : fActFiles->First();
      }
      return (TFileStat *) current;
   }

   // Drop 'file' from the active list, keeping the cursor on a valid entry.
   void RemoveActive(TFileStat *file)
   {
      if (fActFileNext == file) {
         fActFileNext = fActFiles->After(file);
      }
      fActFiles->Remove(file);
      if (fActFileNext == 0) {
         fActFileNext = fActFiles->First();
      }
   }

   // Ordering used when a list of nodes is sorted. A negative result ranks
   // this node before 'other'. Two strategies exist, selected by
   // fgNetworkFasterThanHD.
   Int_t Compare(const TObject *other) const
   {
      const TFileNode *rhs = dynamic_cast<const TFileNode*>(other);
      R__ASSERT(rhs != 0);

      if (fgNetworkFasterThanHD) {
         // Fewer running workers first; on a tie the node with strictly more
         // events left first (this branch never reports equality).
         const Int_t myRunning    = GetRunSlaveCnt();
         const Int_t otherRunning = rhs->GetRunSlaveCnt();
         if (myRunning < otherRunning)
            return -1;
         if (myRunning > otherRunning)
            return 1;
         const Long64_t myLeft    = fEvents - fProcessed;
         const Long64_t otherLeft = rhs->GetNEvents() - rhs->GetProcessed();
         return (myLeft > otherLeft) ? -1 : 1;
      }

      // Worker-count driven ordering, overridden by a sufficiently large
      // imbalance in events left per worker as long as the node that would be
      // favoured is still below the external-worker limit.
      const Long64_t perSlaveDiff =
         GetEventsLeftPerSlave() - rhs->GetEventsLeftPerSlave();
      const Long64_t perSlaveMean =
         (GetEventsLeftPerSlave() + rhs->GetEventsLeftPerSlave())/2;

      const Int_t myExt    = GetExtSlaveCnt();
      const Int_t otherExt = rhs->GetExtSlaveCnt();
      const Bool_t iHaveRoom    = myExt < TPacketizerAdaptive::fgMaxSlaveCnt;
      const Bool_t otherHasRoom = otherExt < TPacketizerAdaptive::fgMaxSlaveCnt;

      const Int_t myRemote    = GetSlaveCnt() - GetRunSlaveCnt();
      const Int_t otherRemote = rhs->GetSlaveCnt() - rhs->GetRunSlaveCnt();

      if (myRemote < otherRemote)
         return (perSlaveDiff < -(perSlaveMean / 2) && otherHasRoom) ? 1 : -1;
      if (myRemote > otherRemote)
         return (perSlaveDiff > (perSlaveMean / 2) && iHaveRoom) ? -1 : 1;

      if (myExt < otherExt)
         return (perSlaveDiff < -(perSlaveMean / 3) && otherHasRoom) ? 1 : -1;
      if (myExt > otherExt)
         return (perSlaveDiff > (perSlaveMean / 3) && iHaveRoom) ? -1 : 1;

      const Int_t myOwn    = GetMySlaveCnt();
      const Int_t otherOwn = rhs->GetMySlaveCnt();
      if (myOwn < otherOwn)
         return (perSlaveDiff < -(perSlaveMean / 3) && otherHasRoom) ? 1 : -1;
      if (myOwn > otherOwn)
         return (perSlaveDiff > (perSlaveMean / 3) && iHaveRoom) ? -1 : 1;

      // All counters equal: more events left per worker goes first.
      if (perSlaveDiff > 0)
         return -1;
      if (perSlaveDiff < 0)
         return 1;
      return 0;
   }

   // Dump the node name and two of the worker counters on stdout.
   void Print(Option_t *) const
   {
      cout << "OBJ: " << IsA()->GetName() << "\t" << fNodeName;
      cout << "\tMySlaveCount " << fMySlaveCnt;
      cout << "\tSlaveCount " << fExtSlaveCnt << endl;
   }

   // Rewind the file cursors, empty the active list and zero the three
   // worker counters. fProcessed and fEvents are left untouched.
   void Reset()
   {
      fUnAllocFileNext = fFiles->First();
      fActFiles->Clear();
      fActFileNext = 0;

      fMySlaveCnt  = 0;
      fExtSlaveCnt = 0;
      fRunSlaveCnt = 0;
   }
};
// Create an empty node called 'name'. fFiles owns the TFileStat objects added
// to it; fActFiles only references objects that also live in fFiles, so it
// must not own them.
// Every counter starts at zero. fRunSlaveCnt was previously left out of the
// initializer list and therefore held an indeterminate value until Reset()
// was called, while Compare() and GetEventsLeftPerSlave() read it.
TPacketizerAdaptive::TFileNode::TFileNode(const char *name)
   : fNodeName(name), fFiles(new TList), fUnAllocFileNext(0),
     fActFiles(new TList), fActFileNext(0), fMySlaveCnt(0),
     fExtSlaveCnt(0), fRunSlaveCnt(0), fProcessed(0), fEvents(0)
{
   fFiles->SetOwner();
   fActFiles->SetOwner(kFALSE);
}
// Per-worker bookkeeping: the node and file the worker is attached to, the
// packet it is currently working on, overall and per-file processing totals,
// and the list of packets it has already processed.
class TPacketizerAdaptive::TSlaveStat : public TObject {

friend class TPacketizerAdaptive;

private:
   TSlave       *fSlave;         // worker described by this record
   TFileNode    *fFileNode;      // node associated with the worker, 0 if none
   TFileStat    *fCurFile;       // file currently assigned to the worker
   TDSetElement *fCurElem;       // packet currently assigned to the worker
   Long64_t      fProcessed;     // total events accumulated by UpdateRates()
   Float_t       fProcTime;      // total time accumulated by UpdateRates()
   Long64_t      fCurProcessed;  // events accumulated on the current file
   Float_t       fCurProcTime;   // time accumulated on the current file
   TList        *fDSubSet;       // packets recorded by AddProcessed()

public:
   TSlaveStat(TSlave *slave);
   ~TSlaveStat();

   TFileNode *GetFileNode() const
   {
      return fFileNode;
   }
   const char *GetName() const
   {
      return fSlave->GetName();
   }
   Long64_t GetEntriesProcessed() const
   {
      return fProcessed;
   }
   TFileStat *GetCurFile()
   {
      return fCurFile;
   }
   void SetFileNode(TFileNode *node)
   {
      fFileNode = node;
   }

   void UpdateRates(Long64_t nEvents, Float_t time);

   // Events per unit of time over everything processed so far; 0 while no
   // time has been accumulated.
   Float_t GetAvgRate()
   {
      if (fProcTime)
         return fProcessed/fProcTime;
      return 0;
   }

   // Same as GetAvgRate() but restricted to the current file.
   Float_t GetCurRate()
   {
      if (fCurProcTime)
         return fCurProcessed/fCurProcTime;
      return 0;
   }

   // Events left per worker on the associated node; 0 without a node.
   Int_t GetLocalEventsLeft()
   {
      if (fFileNode)
         return fFileNode->GetEventsLeftPerSlave();
      return 0;
   }

   TList *GetProcessedSubSet()
   {
      return fDSubSet;
   }
   Int_t AddProcessed();
};
// Create the record for 'slave' with no node, file or packet attached and all
// totals at zero. The list of processed packets is created here and owns its
// content.
TPacketizerAdaptive::TSlaveStat::TSlaveStat(TSlave *slave)
   : fSlave(slave),
     fFileNode(0),
     fCurFile(0),
     fCurElem(0),
     fProcessed(0),
     fProcTime(0),
     fCurProcessed(0),
     fCurProcTime(0),
     fDSubSet(new TList())
{
   fDSubSet->SetOwner();
}
// Release the list of processed packets (if any) and clear the pointer.
TPacketizerAdaptive::TSlaveStat::~TSlaveStat()
{
   if (fDSubSet) {
      delete fDSubSet;
      fDSubSet = 0;
   }
}
// Account 'nEvents' events processed in 'time' for this worker.
// The per-file counters accumulate while the current file is still open and
// are cleared once it is flagged done; the overall counters and the node's
// processed total always grow. fCurFile must be set when this is called.
void TPacketizerAdaptive::TSlaveStat::UpdateRates(Long64_t nEvents,
                                                  Float_t time)
{
   if (!fCurFile->IsDone()) {
      fCurProcTime  += time;
      fCurProcessed += nEvents;
   } else {
      fCurProcTime  = 0;
      fCurProcessed = 0;
   }

   fProcTime  += time;
   fProcessed += nEvents;

   TFileNode *hostNode = fCurFile->GetNode();
   hostNode->IncProcessed(nEvents);
}
// Append the current packet to the list of processed packets.
// Returns 0 on success, -1 if either the list or the current packet is missing.
Int_t TPacketizerAdaptive::TSlaveStat::AddProcessed()
{
   if (!fDSubSet || !fCurElem)
      return -1;

   fDSubSet->Add(fCurElem);
   return 0;
}
// ROOT class-implementation macro for TPacketizerAdaptive.
ClassImp(TPacketizerAdaptive)
// Limit compared against a node's external-worker count in NextNode(),
// NextActiveNode(), GetNextPacket() and TFileNode::Compare(). The constructor
// overrides it from the "PROOF_MaxSlavesPerNode" input parameter, or from the
// CPU count reported by the system when that exceeds 2.
Int_t TPacketizerAdaptive::fgMaxSlaveCnt = 2;
// Divisor used by CalculatePacketSize() when sizing packets; overridden from
// the "PROOF_PacketAsAFraction" input parameter when it is positive.
Int_t TPacketizerAdaptive::fgPacketAsAFraction = 4;
// Lower bound applied to the packet time in CalculatePacketSize(); overridden
// from the "PROOF_MinPacketTime" input parameter.
Double_t TPacketizerAdaptive::fgMinPacketTime = 3;
// Selects which ordering TFileNode::Compare() uses; the constructor reloads it
// from the "ProofServ.NetworkFasterThanHD" environment setting (default 1).
Int_t TPacketizerAdaptive::fgNetworkFasterThanHD = 1;
// Build the packetizer for the entries [first, first+num) of 'dset' (num == -1
// means "up to the end"), to be processed by the workers in 'slaves'.
//
// Steps:
//   1. read the tuning parameters from 'input' / gEnv;
//   2. group the not-yet-valid elements of 'dset' by serving host and let the
//      workers validate them (ValidateFiles);
//   3. rebuild the per-host lists, trimming each element to the requested
//      entry range (elements carrying an entry/event list are not trimmed);
//   4. compute the totals and the fraction of files living on hosts that have
//      no worker.
//
// On failure fValid is kFALSE on return.
//
// Fixes with respect to the previous revision:
//   - the fraction of remote files was computed with integer division and was
//     therefore always 0 or 1;
//   - the "PROOF_MinPacketTime" log message printed the packet-fraction
//     parameter instead of the value actually read;
//   - a duplicated 'fSlaveStats = 0' assignment was removed.
TPacketizerAdaptive::TPacketizerAdaptive(TDSet *dset, TList *slaves,
                                         Long64_t first, Long64_t num, TList *input)
   : TVirtualPacketizer(input)
{
   PDB(kPacketizer,1) Info("TPacketizerAdaptive",
                           "enter (first %lld, num %lld)", first, num);

   // Null everything first so that the destructor is safe on early return.
   fSlaveStats = 0;
   fUnAllocated = 0;
   fActive = 0;
   fFileNodes = 0;
   fCumProcTime = 0;
   fMaxPerfIdx = 1;

   // Limit on external workers per node: explicit parameter, otherwise the
   // number of CPUs when the system reports more than 2.
   Long_t maxSlaveCnt = 0;
   if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", maxSlaveCnt) == 0) {
      fgMaxSlaveCnt = (Int_t) maxSlaveCnt;
   } else {
      SysInfo_t si;
      gSystem->GetSysInfo(&si);
      if (si.fCpus > 2)
         fgMaxSlaveCnt = si.fCpus;
   }

   // Restrict workers to files on their own node when requested.
   fForceLocal = kFALSE;
   Long_t forceLocal = 0;
   if (TProof::GetParameter(input, "PROOF_ForceLocal", forceLocal) == 0) {
      if (forceLocal == 1)
         fForceLocal = kTRUE;
      else
         Info("Process",
              "The only accepted value of PROOF_ForceLocal parameter is 1 !");
   }

   Long_t packetAsAFraction = 0;
   if (TProof::GetParameter(input, "PROOF_PacketAsAFraction",
                            packetAsAFraction) == 0) {
      if (packetAsAFraction > 0) {
         fgPacketAsAFraction = (Int_t)packetAsAFraction;
         Info("Process",
              "using alternate fraction of query time as a packet size: %ld",
              packetAsAFraction);
      } else
         Info("Process", "packetAsAFraction parameter must be higher than 0");
   }

   Double_t minPacketTime = 0;
   if (TProof::GetParameter(input, "PROOF_MinPacketTime",
                            minPacketTime) == 0) {
      // Report the value that was actually read for this parameter.
      Info("Process", "using alternate minimum time of a packet: %f",
           minPacketTime);
      // NOTE(review): the value is truncated to an integer although
      // fgMinPacketTime is a Double_t; kept as is to preserve behaviour.
      fgMinPacketTime = (Int_t) minPacketTime;
   }

   fgNetworkFasterThanHD = gEnv->GetValue("ProofServ.NetworkFasterThanHD", 1);
   if (fgNetworkFasterThanHD != 1)
      Info("TPacketizerAdaptive","fgNetworkFasterThanHD set to %d",
           fgNetworkFasterThanHD);

   // Stays at the default when the parameter is absent.
   Double_t baseLocalPreference = 1.2;
   TProof::GetParameter(input, "PROOF_BaseLocalPreference", baseLocalPreference);
   fBaseLocalPreference = (Float_t)baseLocalPreference;

   // fFileNodes owns the nodes; the two other lists only reference them.
   fFileNodes = new TList;
   fFileNodes->SetOwner();
   fUnAllocated = new TList;
   fUnAllocated->SetOwner(kFALSE);
   fActive = new TList;
   fActive->SetOwner(kFALSE);

   fValid = kTRUE;

   // ---- Pass 1: group the elements still to be validated by host ----------
   dset->Reset();
   TDSetElement *e;
   while ((e = (TDSetElement*)dset->Next())) {
      if (e->GetValid()) continue;   // already validated, nothing to check

      TUrl url = e->GetFileName();

      // Files not served through root:// or rfio:// go to a common pseudo host.
      TString host;
      if ( !url.IsValid() ||
          (strncmp(url.GetProtocol(),"root", 4) &&
           strncmp(url.GetProtocol(),"rfio", 4)) ) {
         host = "no-host";
      } else {
         host = url.GetHost();
      }

      TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
      if (node == 0) {
         node = new TFileNode(host);
         fFileNodes->Add(node);
      }
      node->Add( e );
   }

   // One statistics record per worker; also track the best performance index.
   // The values are deleted explicitly in the destructor.
   fSlaveStats = new TMap;
   fSlaveStats->SetOwner(kFALSE);

   TSlave *slave;
   TIter si(slaves);
   while ((slave = (TSlave*) si.Next())) {
      fSlaveStats->Add( slave, new TSlaveStat(slave) );
      fMaxPerfIdx = slave->GetPerfIdx() > fMaxPerfIdx ?
         slave->GetPerfIdx() : fMaxPerfIdx;
   }

   // Set up the node / worker association and validate the files.
   Reset();
   ValidateFiles(dset, slaves);

   if (!fValid) return;

   // ---- Pass 2: rebuild the node lists for the requested entry range ------
   Int_t files = 0;
   fTotalEntries = 0;
   fUnAllocated->Clear();
   fActive->Clear();
   fFileNodes->Clear();   // owning list: the pass-1 nodes are deleted here

   PDB(kPacketizer,2)
      Info("TPacketizerAdaptive",
           "processing Range: First %lld, Num %lld", first, num);

   dset->Reset();
   Long64_t cur = 0;      // global index of the first entry of the current element
   while (( e = (TDSetElement*)dset->Next())) {
      TUrl url = e->GetFileName();
      Long64_t eFirst = e->GetFirst();
      Long64_t eNum = e->GetNum();
      PDB(kPacketizer,2)
         Info("TPacketizerAdaptive",
              "processing element: First %lld, Num %lld (cur %lld)", eFirst, eNum, cur);

      if (!e->GetEntryList()) {
         // Element entirely before the requested range.
         if (cur + eNum < first) {
            cur += eNum;
            PDB(kPacketizer,2)
               Info("TPacketizerAdaptive",
                    "processing element: skip element cur %lld", cur);
            continue;
         }

         // Element entirely after the requested range.
         if (num != -1 && (first+num <= cur)) {
            cur += eNum;
            PDB(kPacketizer,2)
               Info("TPacketizerAdaptive",
                    "processing element: drop element cur %lld", cur);
            continue;
         }

         // Requested range ends inside this element: cut the tail.
         if (num != -1 && (first+num < cur+eNum)) {
            e->SetNum( first + num - cur );
            PDB(kPacketizer,2)
               Info("TPacketizerAdaptive",
                    "processing element: Adjust end %lld", first + num - cur);
         }

         // Requested range starts inside this element: cut the head.
         if (cur < first) {
            e->SetFirst( eFirst + (first - cur) );
            e->SetNum( e->GetNum() - (first - cur) );
            PDB(kPacketizer,2)
               Info("TPacketizerAdaptive",
                    "processing element: Adjust start %lld and end %lld",
                    eFirst + (first - cur), first + num - cur);
         }
         cur += eNum;
      } else {
         // Element driven by an entry list or event list: skip it when the
         // list is empty (or of neither type).
         Long64_t n = 0;
         TEntryList *enl = dynamic_cast<TEntryList *>(e->GetEntryList());
         if (enl) {
            n = enl->GetN();
         } else {
            TEventList *evl = dynamic_cast<TEventList *>(e->GetEntryList());
            n = evl ? evl->GetN() : n;
         }
         if (!n)
            continue;
      }
      PDB(kPacketizer,2)
         Info("TPacketizerAdaptive",
              "processing element: next cur %lld", cur);

      // Same host classification as in pass 1.
      TString host;
      if ( !url.IsValid() ||
          (strncmp(url.GetProtocol(),"root", 4) &&
           strncmp(url.GetProtocol(),"rfio", 4)) ) {
         host = "no-host";
      } else {
         host = url.GetHost();
      }

      TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
      if ( node == 0 ) {
         node = new TFileNode( host );
         fFileNodes->Add( node );
      }

      ++files;
      fTotalEntries += e->GetNum();
      node->Add( e );
      node->IncEvents(e->GetNum());
      PDB(kPacketizer,2) e->Print("a");
   }

   // A dataset-level entry/event list overrides the summed total.
   TEntryList *enl = dynamic_cast<TEntryList *>(dset->GetEntryList());
   if (enl) {
      fTotalEntries = enl->GetN();
   } else {
      TEventList *evl = dynamic_cast<TEventList *>(dset->GetEntryList());
      fTotalEntries = evl ? evl->GetN() : fTotalEntries;
   }

   PDB(kGlobal,1)
      Info("TPacketizerAdaptive", "processing %lld entries in %d files on %d hosts",
           fTotalEntries, files, fFileNodes->GetSize());

   // Re-associate the workers with the freshly built nodes.
   Reset();

   // Count the files (and events) on nodes without any associated worker.
   Int_t noRemoteFiles = 0;
   fNEventsOnRemLoc = 0;
   Int_t totalNumberOfFiles = 0;
   TIter next(fFileNodes);
   while (TFileNode *fn = (TFileNode*)next()) {
      totalNumberOfFiles += fn->GetNumberOfFiles();
      if (fn->GetSlaveCnt() == 0) {
         noRemoteFiles += fn->GetNumberOfFiles();
         fNEventsOnRemLoc += (fn->GetNEvents() - fn->GetProcessed());
      }
   }

   if (totalNumberOfFiles == 0) {
      Info("TPacketizerAdaptive", "no valid or non-empty file found: setting invalid");
      fValid = kFALSE;
      return;
   }

   // Floating-point division: both operands are integers, so plain '/' would
   // truncate the fraction to 0 or 1.
   fFractionOfRemoteFiles = (1.0 * noRemoteFiles) / totalNumberOfFiles;
   Info("TPacketizerAdaptive",
        "fraction of remote files %f", fFractionOfRemoteFiles);

   if (!fValid)
      SafeDelete(fProgress);

   PDB(kPacketizer,1) Info("TPacketizerAdaptive", "return");
}
// Destroy the per-worker statistics (the map does not own its values, so they
// are deleted explicitly) and then the bookkeeping containers themselves.
TPacketizerAdaptive::~TPacketizerAdaptive()
{
   if (fSlaveStats != 0)
      fSlaveStats->DeleteValues();

   SafeDelete(fSlaveStats);
   SafeDelete(fUnAllocated);
   SafeDelete(fActive);
   SafeDelete(fFileNodes);
}
// Return the next never-allocated file, or 0 if none is available.
// With a non-null 'node' only that node is consulted; otherwise nodes are
// tried in the order given by NextNode(). A node that has run out of
// unallocated files is dropped from the unallocated list, and the node that
// supplied the file is added to the active list if not already there.
TPacketizerAdaptive::TFileStat *TPacketizerAdaptive::GetNextUnAlloc(TFileNode *node)
{
   TFileStat *picked = 0;

   if (node == 0) {
      // No preferred node: walk the candidates until one yields a file.
      while (picked == 0 && ((node = NextNode()) != 0)) {
         picked = node->GetNextUnAlloc();
         if (picked == 0)
            RemoveUnAllocNode(node);
      }
   } else {
      picked = node->GetNextUnAlloc();
      if (picked == 0)
         RemoveUnAllocNode(node);
   }

   if (picked != 0 && fActive->FindObject(node) == 0)
      fActive->Add(node);

   return picked;
}
// Sort the nodes that still have unallocated files and return the first one,
// or 0 when the list is empty or that node already reached the external
// worker limit.
TPacketizerAdaptive::TFileNode *TPacketizerAdaptive::NextNode()
{
   fUnAllocated->Sort();
   PDB(kPacketizer,2) {
      fUnAllocated->Print();
   }

   TFileNode *best = (TFileNode*) fUnAllocated->First();
   if (best == 0)
      return 0;
   if (best->GetExtSlaveCnt() < fgMaxSlaveCnt)
      return best;

   PDB(kPacketizer,1) Info("NextNode",
                           "Reached Slaves per Node Limit (%d)", fgMaxSlaveCnt);
   return 0;
}
// Take 'node' out of the list of nodes that still have unallocated files.
void TPacketizerAdaptive::RemoveUnAllocNode(TFileNode * node)
{
   TList *pending = fUnAllocated;
   pending->Remove(node);
}
// Return the next file from the active nodes, or 0 if none is available.
// Active nodes that turn out to have no active file are removed on the way.
TPacketizerAdaptive::TFileStat *TPacketizerAdaptive::GetNextActive()
{
   TFileStat *picked = 0;

   while (picked == 0) {
      TFileNode *candidate = NextActiveNode();
      if (candidate == 0)
         break;
      picked = candidate->GetNextActive();
      if (picked == 0)
         RemoveActiveNode(candidate);
   }

   return picked;
}
// Sort the active nodes and return the first one, or 0 when the list is empty
// or that node already reached the external worker limit.
TPacketizerAdaptive::TFileNode *TPacketizerAdaptive::NextActiveNode()
{
   fActive->Sort();
   PDB(kPacketizer,2) {
      Info("NextActiveNode", "enter");
      fActive->Print();
   }

   TFileNode *best = (TFileNode*) fActive->First();
   if (best == 0)
      return 0;
   if (best->GetExtSlaveCnt() < fgMaxSlaveCnt)
      return best;

   PDB(kPacketizer,1)
      Info("NextActiveNode","reached Workers-per-Node limit (%d)", fgMaxSlaveCnt);
   return 0;
}
// Remove 'file' from its node's active files; when that leaves the node
// without active files the node itself is removed from the active list.
void TPacketizerAdaptive::RemoveActive(TFileStat *file)
{
   TFileNode *owner = file->GetNode();
   owner->RemoveActive(file);

   if (owner->GetNumberOfActiveFiles() != 0)
      return;
   RemoveActiveNode(owner);
}
// Take 'node' out of the list of active nodes.
void TPacketizerAdaptive::RemoveActiveNode(TFileNode *node)
{
   TList *active = fActive;
   active->Remove(node);
}
void TPacketizerAdaptive::Reset()
{
fUnAllocated->Clear();
fUnAllocated->AddAll(fFileNodes);
fActive->Clear();
TIter files(fFileNodes);
TFileNode *fn;
while ((fn = (TFileNode*) files.Next()) != 0) {
fn->Reset();
}
TIter slaves(fSlaveStats);
TObject *key;
while ((key = slaves.Next()) != 0) {
TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
TFileNode *fn = (TFileNode*) fFileNodes->FindObject(slstat->GetName());
if (fn != 0 ) {
slstat->SetFileNode(fn);
fn->IncMySlaveCnt();
}
slstat->fCurFile = 0;
}
}
// Ask the workers for the number of entries of every file registered on the
// file nodes and use the answers to complete / check the dataset elements.
//
// Each idle worker is sent a kPROOF_GETENTRIES request for one file (a file of
// its own node first, any other node otherwise); the replies are collected
// through a local TMonitor until no request is outstanding. Elements for which
// no entries can be obtained are removed from 'dset'. On a communication error
// or an unexpected reply the worker is marked bad and fValid is set to kFALSE.
// Finally, when still valid, the per-element entry counts are converted into
// cumulative offsets.
//
// Fixes with respect to the previous revision:
//   - the Error() format strings passed Long64_t values to "%d" and a
//     C string to "%d"; they now use "%lld" / "%s";
//   - the TMessage handed back by TSocket::Recv() was never released; it is
//     now deleted on every path once it has been consumed.
void TPacketizerAdaptive::ValidateFiles(TDSet *dset, TList *slaves)
{
   TMap     slaves_by_sock;   // socket -> worker lookup for incoming replies
   TMonitor mon;
   TList    workers;          // workers currently without a pending request

   // Set up the communication infrastructure.
   workers.AddAll(slaves);
   TIter    si(slaves);
   TSlave  *slave;
   while ((slave = (TSlave*)si.Next()) != 0) {
      PDB(kPacketizer,3) Info("ValidateFiles","socket added to monitor: %p (%s)",
          slave->GetSocket(), slave->GetName());
      mon.Add(slave->GetSocket());
      slaves_by_sock.Add(slave->GetSocket(),slave);
   }

   mon.DeActivateAll();

   ((TProof*)gProof)->DeActivateAsyncInput();

   // Expose the local monitor through gProof while validating.
   ((TProof*)gProof)->fCurrentMonitor = &mon;

   // Progress-message bookkeeping.
   TString msg("Validating files");
   UInt_t n = 0;
   UInt_t tot = dset->GetListOfElements()->GetSize();
   Bool_t st = kTRUE;

   while (kTRUE) {

      // Send a request to every idle worker.
      while( TSlave *s = (TSlave*)workers.First() ) {

         workers.Remove(s);

         // Find a file: own node first, then any node.
         TSlaveStat *slstat = (TSlaveStat*)fSlaveStats->GetValue(s);
         TFileNode *node = 0;
         TFileStat *file = 0;

         if ( (node = slstat->GetFileNode()) != 0 ) {
            file = GetNextUnAlloc(node);
            if ( file == 0 ) {
               // Nothing left on the worker's own node.
               slstat->SetFileNode(0);
            }
         }

         if (file == 0) {
            file = GetNextUnAlloc();
         }

         if ( file != 0 ) {
            // Files are only validated here, not processed: do not keep them
            // in the active list.
            RemoveActive(file);

            slstat->fCurFile = file;
            file->GetNode()->IncExtSlaveCnt(slstat->GetName());
            TMessage m(kPROOF_GETENTRIES);
            TDSetElement *elem = file->GetElement();
            m << dset->IsTree()
              << TString(elem->GetFileName())
              << TString(elem->GetDirectory())
              << TString(elem->GetObjName());

            s->GetSocket()->Send( m );
            mon.Activate(s->GetSocket());
            PDB(kPacketizer,2)
               Info("ValidateFiles",
                    "sent to slave-%s (%s) via %p GETENTRIES on %s %s %s %s",
                    s->GetOrdinal(), s->GetName(), s->GetSocket(),
                    dset->IsTree() ? "tree" : "objects", elem->GetFileName(),
                    elem->GetDirectory(), elem->GetObjName());
         }
      }

      // No request outstanding: done.
      if ( mon.GetActive() == 0 ) break;

      PDB(kPacketizer,3) {
         Info("ValidateFiles", "waiting for %d slaves:", mon.GetActive());
         TList *act = mon.GetListOfActives();
         TIter next(act);
         while (TSocket *s = (TSocket*) next()) {
            TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s);
            if (sl)
               Info("ValidateFiles", "   slave-%s (%s)",
                    sl->GetOrdinal(), sl->GetName());
         }
         delete act;
      }

      // Wait for the next reply.
      TSocket *sock = mon.Select();
      mon.DeActivate(sock);

      PDB(kPacketizer,3) Info("ValidateFiles", "select returned: %p", sock);

      TSlave *slave = (TSlave *) slaves_by_sock.GetValue( sock );

      // Initialised so that 'delete reply' is safe whatever Recv() does.
      TMessage *reply = 0;

      if ( sock->Recv(reply) <= 0 ) {
         // Communication failure: give up on this worker.
         ((TProof*)gProof)->MarkBad(slave);
         fValid = kFALSE;
         Error("ValidateFiles", "Recv failed! for slave-%s (%s)",
               slave->GetOrdinal(), slave->GetName());
         delete reply;
         continue;
      }

      if ( reply->What() == kPROOF_FATAL ) {
         Error("ValidateFiles", "kPROOF_FATAL from slave-%s (%s)",
               slave->GetOrdinal(), slave->GetName());
         ((TProof*)gProof)->MarkBad(slave);
         fValid = kFALSE;
         delete reply;
         continue;
      } else if ( reply->What() == kPROOF_LOGFILE ) {
         PDB(kPacketizer,3) Info("ValidateFiles", "got logfile");
         Int_t size;
         (*reply) >> size;
         delete reply;
         ((TProof*)gProof)->RecvLogFile(sock, size);
         // The actual answer is still to come: keep listening on this socket.
         mon.Activate(sock);
         continue;
      } else if ( reply->What() == kPROOF_LOGDONE ) {
         PDB(kPacketizer,3) Info("ValidateFiles", "got logdone");
         delete reply;
         mon.Activate(sock);
         continue;
      } else if ( reply->What() != kPROOF_GETENTRIES ) {
         // Anything else is a protocol error.
         Error("ValidateFiles",
               "unexpected message type (%d) from slave-%s (%s)",
               reply->What(), slave->GetOrdinal(), slave->GetName());
         ((TProof*)gProof)->MarkBad(slave);
         fValid = kFALSE;
         delete reply;
         continue;
      }

      // Got the answer to a GETENTRIES request.
      TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue( slave );
      TDSetElement *e = slavestat->fCurFile->GetElement();
      slavestat->fCurFile->GetNode()->DecExtSlaveCnt(slavestat->GetName());
      Long64_t entries;

      (*reply) >> entries;

      // The reply may additionally carry an object name, stored as the
      // element title.
      if ((reply->BufferSize() > reply->Length())) {
         TString objname;
         (*reply) >> objname;
         e->SetTitle(objname);
      }

      // The message has been fully consumed.
      delete reply;
      reply = 0;

      // Temporarily holds the entry count; turned into an offset at the end.
      e->SetTDSetOffset(entries);
      if ( entries > 0 ) {

         if (!e->GetEntryList()) {
            if ( e->GetFirst() > entries ) {
               Error("ValidateFiles",
                     "first (%lld) higher then number of entries (%lld) in %s",
                     e->GetFirst(), entries, e->GetFileName() );

               // Disable the element.
               slavestat->fCurFile->SetDone();
               fValid = kFALSE;
            }

            if ( e->GetNum() == -1 ) {
               // "All entries" requested: fill in the real count.
               e->SetNum( entries - e->GetFirst() );
            } else if ( e->GetFirst() + e->GetNum() > entries ) {
               Error("ValidateFiles",
                     "Num (%lld) + First (%lld) larger then number of keys/entries (%lld) in %s",
                     e->GetNum(), e->GetFirst(), entries, e->GetFileName() );
               e->SetNum( entries - e->GetFirst() );
            }
         }

         // Notify the client about the progress.
         n++;
         gProof->SendDataSetStatus(msg, n, tot, st);

      } else {

         Error("ValidateFiles", "cannot get entries for %s (", e->GetFileName() );

         // Tell the other side, when running inside a server.
         if (gProofServ) {
            TMessage m(kPROOF_MESSAGE);
            m << TString(Form("Cannot get entries for file: %s - skipping",
                              e->GetFileName()));
            gProofServ->GetSocket()->Send(m);
         }

         // Drop the element from the dataset.
         if (dset->Remove(e) == -1)
            Error("ValidateFiles", "removing of not-registered element %p failed", e);
      }

      // The worker is idle again.
      workers.Add(slave);
   }

   // Restore the normal input handling.
   ((TProof*)gProof)->ActivateAsyncInput();

   // The local monitor is about to go out of scope.
   ((TProof*)gProof)->fCurrentMonitor = 0;

   // No reason to continue if invalid.
   if (!fValid)
      return;

   // Convert the per-element entry counts into cumulative offsets.
   Long64_t offset = 0;
   Long64_t newOffset = 0;
   TIter next(dset->GetListOfElements());
   TDSetElement *el;
   while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
      newOffset = offset + el->GetTDSetOffset();
      el->SetTDSetOffset(offset);
      offset = newOffset;
   }
}
// Number of entries processed so far by 'slave'; 0 when no statistics exist
// yet or the worker is unknown.
Long64_t TPacketizerAdaptive::GetEntriesProcessed(TSlave *slave) const
{
   if (!fSlaveStats)
      return 0;

   TSlaveStat *stat = (TSlaveStat*) fSlaveStats->GetValue( slave );
   return stat ? stat->GetEntriesProcessed() : 0;
}
// Compute the number of entries to put in the next packet for the worker
// whose TSlaveStat is passed as 'slStatPtr'. The result is at least 1.
//
// Without any measured rate the remaining entries are split evenly and scaled
// by the worker's performance index relative to the best one. With a rate, the
// packet is sized so that it takes a target time derived from the estimated
// remaining query time, bounded below by fgMinPacketTime.
Int_t TPacketizerAdaptive::CalculatePacketSize(TObject *slStatPtr)
{
   TSlaveStat *worker = (TSlaveStat*)slStatPtr;

   // Prefer the rate on the current file, fall back to the overall average.
   Float_t rate = worker->GetCurRate();
   if (!rate)
      rate = worker->GetAvgRate();

   Long64_t nEntries;
   if (!rate) {
      // First packet for this worker: no timing information yet.
      Int_t evenShare = (fTotalEntries - fProcessed)
                        / (6 * fgPacketAsAFraction * fSlaveStats->GetSize());
      nEntries = Long64_t(evenShare *
                          ((Float_t)worker->fSlave->GetPerfIdx() / fMaxPerfIdx));
   } else {
      // Global processing rate, then the target duration of a packet.
      Float_t avgProcRate = (fProcessed/(fCumProcTime / fSlaveStats->GetSize()));
      Float_t packetTime;
      packetTime = ((fTotalEntries - fProcessed)/avgProcRate)/fgPacketAsAFraction;
      if (packetTime < fgMinPacketTime)
         packetTime = fgMinPacketTime;

      // Smooth a current rate that dropped far below the worker's average.
      if (rate < 0.25 * worker->GetAvgRate())
         rate = (rate + worker->GetAvgRate()) / 2;
      if (rate < 0.20 * (fTotalEntries - fProcessed))
         packetTime *= 2;
      nEntries = (Long64_t)(rate * packetTime);
   }

   if (nEntries < 1)
      nEntries = 1;
   return nEntries;
}
// Return the next packet (a TDSetElement) to be processed by worker 'sl', or 0
// when the packetizer is invalid, a stop was requested, all entries have been
// processed, or no file can be assigned to the worker.
// 'r' is only read when the worker has a packet outstanding; it then carries
// the report on that packet: latency, processing time and CPU time, optionally
// followed by bytes read, total entries and the worker's cumulative number of
// processed events.
TDSetElement *TPacketizerAdaptive::GetNextPacket(TSlave *sl, TMessage *r)
{
if ( !fValid ) {
return 0;
}
// Statistics record of the requesting worker; it must be known.
TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
R__ASSERT( slstat != 0 );
// The worker had a packet outstanding: account for it first.
if ( slstat->fCurElem != 0 ) {
Double_t latency, proctime, proccpu;
Long64_t bytesRead = -1;
Long64_t totalEntries = -1;
// Default: assume the whole packet was processed.
Long64_t numev = slstat->fCurElem->GetNum();
// Record the packet in the worker's list of processed packets.
slstat->AddProcessed();
(*r) >> latency >> proctime >> proccpu;
// The remaining fields are optional: read them only if the buffer has more data.
if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
Long64_t totev = 0;
if (r->BufferSize() > r->Length()) (*r) >> totev;
// When the worker reports its cumulative total, derive this packet's count
// from it instead of trusting the packet size.
if (totev > 0)
numev = totev - slstat->fProcessed;
// Negative values are not added to the global totals.
fProcessed += ((numev > 0) ? numev : 0);
fBytesRead += ((bytesRead > 0) ? bytesRead : 0);
slstat->UpdateRates(numev, proctime);
fCumProcTime += proctime;
PDB(kPacketizer,2)
Info("GetNextPacket","slave-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
sl->GetOrdinal(), sl->GetName(),
numev, latency, proctime, proccpu, bytesRead);
if (gPerfStats != 0) {
gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), slstat->fCurElem->GetFileName(),
numev, latency, proctime, proccpu, bytesRead);
}
// The worker has no outstanding packet any more.
slstat->fCurElem = 0;
// Everything processed: call HandleTimer one last time and drop fProgress.
// NOTE(review): presumably this sends the final progress update - confirm
// against HandleTimer in the base class.
if ( fProcessed == fTotalEntries ) {
HandleTimer(0);
delete fProgress; fProgress = 0;
}
}
// Stop requested: hand out nothing more.
if ( fStop ) {
HandleTimer(0);
return 0;
}
TFileStat *file = slstat->fCurFile;
// The worker's current file is finished: release it on its node and force the
// selection of a new file below.
if ( file != 0 && file->IsDone() ) {
file->GetNode()->DecExtSlaveCnt(slstat->GetName());
file->GetNode()->DecRunSlaveCnt();
if (gPerfStats != 0) {
gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(),
file->GetElement()->GetFileName(), kFALSE);
}
file = 0;
}
Long64_t avgEventsLeftPerSlave =
(fTotalEntries - fProcessed) / fSlaveStats->GetSize();
// Nothing left to process. This also guards the divisions by
// (fTotalEntries - fProcessed) below.
if (fTotalEntries == fProcessed)
return 0;
// The worker needs a new file.
if ( file == 0) {
Bool_t openLocal;
// Preference for the worker's own node: the base value lowered by the share
// of remaining events that sit on nodes without workers.
Float_t localPreference = fBaseLocalPreference - (fNEventsOnRemLoc /
(0.4 *(fTotalEntries - fProcessed)));
// The worker has a node of its own: decide between a local file and a file
// from the best-ranked other node.
if ( slstat->GetFileNode() != 0 ) {
fUnAllocated->Sort();
TFileNode* firstNonLocalNode = (TFileNode*)fUnAllocated->First();
Bool_t nonLocalNodePossible;
if (fForceLocal)
nonLocalNodePossible = 0;
else
// Possible only if a candidate exists and is below the external-worker limit.
nonLocalNodePossible = firstNonLocalNode?
(firstNonLocalNode->GetExtSlaveCnt() < fgMaxSlaveCnt):0;
// Without a usable non-local node the choice is always local.
openLocal = !nonLocalNodePossible;
Float_t slaveRate = slstat->GetAvgRate();
if ( nonLocalNodePossible ) {
// Go local when the number of workers running on the own node exceeds the
// number of its own workers minus one.
if ( slstat->GetFileNode()->GetRunSlaveCnt() >
slstat->GetFileNode()->GetMySlaveCnt() - 1 )
openLocal = kTRUE;
else if ( slaveRate == 0 ) {
// No rate measured yet for this worker: compare event counts only.
if ( slstat->GetLocalEventsLeft() * localPreference > (avgEventsLeftPerSlave))
openLocal = kTRUE;
else if ( (firstNonLocalNode->GetEventsLeftPerSlave())
< slstat->GetLocalEventsLeft() * localPreference )
openLocal = kTRUE;
else if ( firstNonLocalNode->GetExtSlaveCnt() > 1 )
openLocal = kTRUE;
else if ( firstNonLocalNode->GetRunSlaveCnt() == 0 )
openLocal = kTRUE;
} else {
// Rate known: compare the estimated time to finish the local events with
// the estimated average time left per worker.
Float_t slaveTime = slstat->GetLocalEventsLeft()/slaveRate;
Float_t avgTime = avgEventsLeftPerSlave/(fProcessed/fCumProcTime);
if (slaveTime * localPreference > avgTime)
openLocal = kTRUE;
else if ((firstNonLocalNode->GetEventsLeftPerSlave())
< slstat->GetLocalEventsLeft() * localPreference)
openLocal = kTRUE;
}
}
if (openLocal) {
// Local file: a never-allocated one first, otherwise one already active.
file = slstat->GetFileNode()->GetNextUnAlloc();
if (!file)
file = slstat->GetFileNode()->GetNextActive();
if ( file == 0 ) {
// The own node is exhausted: forget it.
slstat->SetFileNode(0);
}
}
}
// Fall back to any node, unless restricted to local files.
if(file == 0 && !fForceLocal) {
file = GetNextUnAlloc();
}
if(file == 0 && !fForceLocal) {
file = GetNextActive();
}
// Nothing available for this worker.
if ( file == 0 ) return 0;
slstat->fCurFile = file;
// First access to a file on a node without own workers: its events no longer
// count as "remote only".
if (file->GetNode()->GetMySlaveCnt() == 0 &&
file->GetElement()->GetFirst() == file->GetNextEntry()) {
fNEventsOnRemLoc -= file->GetElement()->GetNum();
R__ASSERT(fNEventsOnRemLoc >= 0);
}
file->GetNode()->IncExtSlaveCnt(slstat->GetName());
file->GetNode()->IncRunSlaveCnt();
if (gPerfStats != 0) {
gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(),
file->GetNode()->GetName(),
file->GetElement()->GetFileName(), kTRUE);
}
}
// Carve the packet out of the file, starting at the file's entry cursor.
Long64_t num = CalculatePacketSize(slstat);
TDSetElement *base = file->GetElement();
Long64_t first = file->GetNextEntry();
Long64_t last = base->GetFirst() + base->GetNum();
// The packet reaches the end of the file: shrink it to what is left and
// retire the file; otherwise just advance the cursor.
if ( first + num >= last ) {
num = last - first;
file->SetDone();
RemoveActive(file);
} else {
file->MoveNextEntry(num);
}
slstat->fCurElem = CreateNewPacket(base, first, num);
// Propagate the element's entry list, restricted to the packet range.
if (base->GetEntryList())
slstat->fCurElem->SetEntryList(base->GetEntryList(), first, num);
return slstat->fCurElem;
}
// Estimate the number of entries processed at time 't'.
// 'ent' and 'bytes' are first set to the exact counters. Unless estimation is
// switched off, 'ent' is then replaced by the sum over the workers of their
// processed entries extrapolated with their rate (current rate when requested
// and positive, average rate otherwise) over the time elapsed since their last
// report. The estimate is clamped to (0, fTotalEntries]. Always returns 0.
Int_t TPacketizerAdaptive::GetEstEntriesProcessed(Float_t t,
                                                  Long64_t &ent, Long64_t &bytes)
{
   // Exact values, used as is when estimation is off.
   ent = fProcessed;
   bytes = fBytesRead;

   if (fUseEstOpt == kEstOff)
      return 0;

   const Bool_t useCurrent = (fUseEstOpt == kEstCurrent);
   Float_t rateSum = 0.;

   if (fSlaveStats && fSlaveStats->GetSize() > 0) {
      ent = 0;
      TIter workerIt(fSlaveStats);
      for (TObject *key = workerIt(); key != 0; key = workerIt()) {
         TSlaveStat *stat = (TSlaveStat *) fSlaveStats->GetValue(key);
         if (!stat)
            continue;

         // Entries known for this worker plus the extrapolated ones.
         Long64_t estimate = stat->fProcessed;
         Float_t elapsed = (t > stat->fProcTime) ? t - stat->fProcTime : 0;
         Float_t rate = (useCurrent && stat->GetCurRate() > 0) ? stat->GetCurRate()
                                                               : stat->GetAvgRate();
         rateSum += rate;
         estimate += (Long64_t) (elapsed * rate);
         ent += estimate;

         PDB(kPacketizer,3)
            Info("GetEstEntriesProcessed","%s: e:%lld rate:%f dt:%f e:%lld",
                 stat->fSlave->GetOrdinal(),
                 stat->fProcessed, rate, elapsed, estimate);
      }
   }

   PDB(kPacketizer,2)
      Info("GetEstEntriesProcessed",
           "estimated entries: %lld, bytes read: %lld rate: %f", ent, bytes, rateSum);

   // Sanity: fall back to the exact values and never exceed the total.
   if (ent <= 0)
      ent = fProcessed;
   if (ent > fTotalEntries)
      ent = fTotalEntries;
   if (bytes <= 0)
      bytes = fBytesRead;

   return 0;
}
// Last update: Thu Jan 17 09:00:47 2008
// This page has been automatically generated. If you have any comments or suggestions about the page layout, send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.