#include "TAdaptivePacketizer.h"
#include "Riostream.h"
#include "TDSet.h"
#include "TError.h"
#include "TMap.h"
#include "TMessage.h"
#include "TMonitor.h"
#include "TNtupleD.h"
#include "TObject.h"
#include "TParameter.h"
#include "TPerfStats.h"
#include "TProofDebug.h"
#include "TProof.h"
#include "TProofPlayer.h"
#include "TProofServ.h"
#include "TSlave.h"
#include "TSocket.h"
#include "TTimer.h"
#include "TUrl.h"
#include "TClass.h"
#include "TRandom.h"
// Per-file bookkeeping of the adaptive packetizer: the node serving the file,
// the dataset element describing it, the next entry to hand out and whether
// no further packets are to be produced from it.
class TAdaptivePacketizer::TFileStat : public TObject {
public:
   TFileStat(TFileNode *node, TDSetElement *elem);

   // Completion flag.
   Bool_t         IsDone() const { return fIsDone; }
   void           SetDone() { fIsDone = kTRUE; }

   // Accessors for the serving node and the described element.
   TFileNode     *GetNode() const { return fNode; }
   TDSetElement  *GetElement() const { return fElement; }

   // Cursor on the first entry of the next packet.
   Long64_t       GetNextEntry() const { return fNextEntry; }
   void           MoveNextEntry(Long64_t nEntries) { fNextEntry += nEntries; }

private:
   Bool_t         fIsDone;      // kTRUE once no more packets come from this file
   TFileNode     *fNode;        // node serving this file (not deleted here)
   TDSetElement  *fElement;     // element describing the file (not deleted here)
   Long64_t       fNextEntry;   // first entry of the next packet
};

//______________________________________________________________________________
TAdaptivePacketizer::TFileStat::TFileStat(TFileNode *node, TDSetElement *elem)
   : fIsDone(kFALSE),
     fNode(node),
     fElement(elem),
     fNextEntry(elem->GetFirst())   // start at the element's first entry
{
   // All the state is set in the initializer list.
}
// Bookkeeping for one storage host ("node"): the files located there, which of
// them are still unallocated or currently active, how many workers read from
// it, and how many entries it holds / has already delivered.
class TAdaptivePacketizer::TFileNode : public TObject {
private:
TString fNodeName;          // host name of the node (or "no-host")
TList *fFiles;              // all TFileStat of this node (owned, see constructor)
TObject *fUnAllocFileNext;  // next file of fFiles not yet handed out (0 = none)
TList *fActFiles;           // files handed out and not yet finished (not owned)
TObject *fActFileNext;      // round-robin cursor in fActFiles
Int_t fMySlaveCnt;          // workers whose own host is this node
Int_t fSlaveCnt;            // workers from other hosts reading from this node
Int_t fRunSlaveCnt;         // workers currently processing a file of this node
Long64_t fProcessed;        // entries of this node processed so far
Long64_t fEvents;           // entries of this node to be processed
public:
TFileNode(const char *name);
~TFileNode() { delete fFiles; delete fActFiles; }
void IncMySlaveCnt() { fMySlaveCnt++; }
Int_t GetMySlaveCnt() const {return fMySlaveCnt;}
// Only workers running on another host count as "external" (fSlaveCnt).
void IncSlaveCnt(const char *slave) { if (fNodeName != slave) fSlaveCnt++; }
void DecSlaveCnt(const char *slave) { if (fNodeName != slave) fSlaveCnt--; R__ASSERT(fSlaveCnt >= 0); }
// Local plus external workers.
Int_t GetSlaveCnt() const {return fMySlaveCnt + fSlaveCnt;}
void IncRunSlaveCnt() { fRunSlaveCnt++; }
void DecRunSlaveCnt() { fRunSlaveCnt--; R__ASSERT(fRunSlaveCnt >= 0); }
Int_t GetRunSlaveCnt() const {return fRunSlaveCnt;}
Int_t GetExtSlaveCnt() const {return fSlaveCnt;}
Int_t GetNumberOfActiveFiles() const { return fActFiles->GetSize(); }
// Nodes are ordered with Compare() when the packetizer sorts its node lists.
Bool_t IsSortable() const { return kTRUE; }
Int_t GetNumberOfFiles() { return fFiles->GetSize(); }
void IncProcessed(Long64_t nEvents)
{ fProcessed += nEvents; }
Long64_t GetProcessed() { return fProcessed; }
// Entries still to process divided by (running workers + 1); the "+ 1" also
// keeps the divisor non-zero when no worker is running on this node.
Long64_t GetEventsLeftPerSlave() const
{ return ((fEvents - fProcessed)/(fRunSlaveCnt + 1)); }
void IncEvents(Long64_t nEvents) { fEvents += nEvents; }
const char *GetName() const { return fNodeName.Data(); }
Long64_t GetNEvents() { return fEvents; }
// Register a new file (element) of this node; it becomes the next
// unallocated file if all previous ones were already handed out.
void Add(TDSetElement *elem)
{
TFileStat *f = new TFileStat(this,elem);
fFiles->Add(f);
if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->First();
}
// Return the next unallocated file (0 if none), moving it to the active
// files and advancing the unallocated cursor.
TFileStat *GetNextUnAlloc()
{
TObject *next = fUnAllocFileNext;
if (next != 0) {
fActFiles->Add(next);
if (fActFileNext == 0) fActFileNext = fActFiles->First();
fUnAllocFileNext = fFiles->After(fUnAllocFileNext);
}
return (TFileStat *) next;
}
// Return the active file under the cursor (0 if none) and advance the
// cursor, wrapping around to the first active file.
TFileStat *GetNextActive()
{
TObject *next = fActFileNext;
if (fActFileNext != 0) {
fActFileNext = fActFiles->After(fActFileNext);
if (fActFileNext == 0) fActFileNext = fActFiles->First();
}
return (TFileStat *) next;
}
// Drop 'file' from the active files, keeping the round-robin cursor valid.
void RemoveActive(TFileStat *file)
{
if (fActFileNext == file) fActFileNext = fActFiles->After(file);
fActFiles->Remove(file);
if (fActFileNext == 0) fActFileNext = fActFiles->First();
}
// Ordering used when the node lists are sorted; a negative result puts this
// node before 'other'. Nodes with fewer external workers come first, unless
// the other node has more entries left per worker by more than a third of
// the average and is still below fgMaxSlaveCnt external workers. Ties on
// external workers are resolved the same way on local workers, then by
// entries left per worker (more entries left first).
Int_t Compare(const TObject *other) const
{
const TFileNode *obj = dynamic_cast<const TFileNode*>(other);
R__ASSERT(obj != 0);
Long64_t diffEvents =
GetEventsLeftPerSlave() - obj->GetEventsLeftPerSlave();
Int_t myExtSlaves = GetExtSlaveCnt();
Int_t otherExtSlaves = obj->GetExtSlaveCnt();
Long64_t avEventsLeft = (GetEventsLeftPerSlave() + obj->GetEventsLeftPerSlave())/2;
if (myExtSlaves < otherExtSlaves) {
if (diffEvents < -(avEventsLeft / 3)
&& obj->GetExtSlaveCnt() < TAdaptivePacketizer::fgMaxSlaveCnt)
return 1;
else
return -1;
} else if (myExtSlaves > otherExtSlaves) {
if (diffEvents > (avEventsLeft / 3)
&& GetExtSlaveCnt() < TAdaptivePacketizer::fgMaxSlaveCnt)
return -1;
else
return 1;
} else {
Int_t myOwnSlaves = GetMySlaveCnt();
Int_t otherOwnSlaves = obj->GetMySlaveCnt();
if (myOwnSlaves < otherOwnSlaves) {
if (diffEvents < -(avEventsLeft / 3)
&& obj->GetExtSlaveCnt() < TAdaptivePacketizer::fgMaxSlaveCnt)
return 1;
else
return -1;
} else if (myOwnSlaves > otherOwnSlaves) {
if (diffEvents > (avEventsLeft / 3)
&& GetExtSlaveCnt() < TAdaptivePacketizer::fgMaxSlaveCnt)
return -1;
else
return 1;
} else {
if (diffEvents > 0) {
return -1;
} else if (diffEvents < 0) {
return 1;
} else
return 0;
}
}
}
// Debug printout of the node name and worker counters.
void Print(Option_t *) const
{
cout << "OBJ: " << IsA()->GetName() << "\t" << fNodeName
<< "\tMySlaveCount " << fMySlaveCnt
<< "\tSlaveCount " << fSlaveCnt << endl;
}
// Rewind the file cursors and zero the worker counters. The entry counters
// (fProcessed, fEvents) are intentionally left untouched.
void Reset()
{
fUnAllocFileNext = fFiles->First();
fActFiles->Clear();
fActFileNext = 0;
fSlaveCnt = 0;
fMySlaveCnt = 0;
fRunSlaveCnt = 0;
}
};
//______________________________________________________________________________
TAdaptivePacketizer::TFileNode::TFileNode(const char *name)
   : fNodeName(name), fFiles(new TList), fUnAllocFileNext(0), fActFiles(new TList),
     fActFileNext(0), fMySlaveCnt(0), fSlaveCnt(0), fRunSlaveCnt(0), fProcessed(0),
     fEvents(0)
{
   // Create an empty node for host 'name'. Every counter starts at zero,
   // including fRunSlaveCnt, which GetEventsLeftPerSlave() uses in its divisor.
   // fFiles owns the TFileStat objects created by Add(). fActFiles only
   // references a subset of them and therefore must not delete them.
   fFiles->SetOwner();
   fActFiles->SetOwner(kFALSE);
}
// Per-worker bookkeeping: the node on the worker's own host, its current file
// and packet, and the counters used to estimate its processing rate.
class TAdaptivePacketizer::TSlaveStat : public TObject {
friend class TAdaptivePacketizer;
private:
   TSlave       *fSlave;         // the worker described by this object
   TFileNode    *fFileNode;      // node on the worker's own host (0 if none)
   TFileStat    *fCurFile;       // file currently assigned to the worker
   TDSetElement *fCurElem;       // packet currently being processed
   Long64_t      fProcessed;     // total entries processed
   Float_t       fProcTime;      // total processing time
   Long64_t      fCurProcessed;  // entries counted for the current-file rate
   Float_t       fCurProcTime;   // time counted for the current-file rate
public:
   TSlaveStat(TSlave *slave);
   TFileNode    *GetFileNode() const { return fFileNode; }
   const char   *GetName() const { return fSlave->GetName(); }
   Long64_t      GetEntriesProcessed() const { return fProcessed; }
   TFileStat    *GetCurFile() { return fCurFile; }
   void          SetFileNode(TFileNode *node) { fFileNode = node; }
   void          UpdateRates(Long64_t nEvents, Float_t time);
   // Average rate over all packets (0 while no time was accounted).
   Float_t       GetAvgRate() const { return (fProcTime ? fProcessed/fProcTime : 0); }
   // Rate on the current file (0 while no time was accounted).
   Float_t       GetCurRate() const {
      return (fCurProcTime ? fCurProcessed/fCurProcTime : 0); }
   // Entries left per running worker on the local node (0 without a local
   // node). Returned as Long64_t: GetEventsLeftPerSlave() is 64-bit, and an
   // Int_t result silently truncated large values.
   Long64_t      GetLocalEventsLeft() const {
      return fFileNode ? fFileNode->GetEventsLeftPerSlave() : 0; }
};

//______________________________________________________________________________
TAdaptivePacketizer::TSlaveStat::TSlaveStat(TSlave *slave)
   : fSlave(slave), fFileNode(0), fCurFile(0), fCurElem(0), fProcessed(0),
     fProcTime(0), fCurProcessed(0), fCurProcTime(0)
{
   // Create the statistics for 'slave' with all counters at zero.
}

//______________________________________________________________________________
void TAdaptivePacketizer::TSlaveStat::UpdateRates(Long64_t nEvents, Float_t time)
{
   // Account for a finished packet: 'nEvents' entries processed in 'time'.
   // The totals of this worker, and the processed count of the node serving
   // the current file, always grow. The current-file counters restart from
   // zero once the current file is done. Requires fCurFile to be set.
   if (fCurFile->IsDone()) {
      fCurProcTime = 0;
      fCurProcessed = 0;
   } else {
      fCurProcTime += time;
      fCurProcessed += nEvents;
   }
   fProcTime += time;
   fProcessed += nEvents;
   fCurFile->GetNode()->IncProcessed(nEvents);
}
ClassImp(TAdaptivePacketizer)
// Maximum number of workers from other hosts that may read from one file node
// (compared against TFileNode::GetExtSlaveCnt()). The default is 4. Each
// constructor overwrites it from the "PROOF_MaxSlavesPerNode" input parameter
// (falling back to 4). Because it is static, the most recent value applies to
// all instances.
Int_t TAdaptivePacketizer::fgMaxSlaveCnt = 4;
TAdaptivePacketizer::TAdaptivePacketizer(TDSet *dset, TList *slaves,
Long64_t first, Long64_t num, TList *input)
{
PDB(kPacketizer,1) Info("TDynPacketizer", "Enter (first %lld, num %lld)", first, num);
fSlaveStats = 0;
fPackets = 0;
fSlaveStats = 0;
fUnAllocated = 0;
fActive = 0;
fFileNodes = 0;
fProgress = 0;
fProcessed = 0;
fBytesRead = 0;
fProcTime = 0;
fMaxPerfIdx = 1;
TTime tnow = gSystem->Now();
fStartTime = Long_t(tnow);
fInitTime = 0;
fProcTime = 0;
fTimeUpdt = -1.;
fCircProg = new TNtupleD("CircNtuple","Circular progress info","tm:ev:mb");
TObject *obj = input->FindObject("PROOF_ProgressCircularity");
TParameter<Long_t> *par = (obj == 0) ? 0 :
dynamic_cast<TParameter<Long_t>*>(obj);
fCircN = (par == 0) ? 10 : par->GetVal();
fCircProg->SetCircular(fCircN);
obj = input->FindObject("PROOF_MaxSlavesPerNode");
par = (obj == 0) ? 0 : dynamic_cast<TParameter<Long_t>*>(obj);
fgMaxSlaveCnt = (par == 0) ? 4 : par->GetVal();
fPackets = new TList;
fPackets->SetOwner();
fFileNodes = new TList;
fFileNodes->SetOwner();
fUnAllocated = new TList;
fUnAllocated->SetOwner(kFALSE);
fActive = new TList;
fActive->SetOwner(kFALSE);
fValid = kTRUE;
dset->Lookup();
dset->Reset();
TDSetElement *e;
while ((e = (TDSetElement*)dset->Next())) {
if (e->GetValid()) continue;
TUrl url = e->GetFileName();
TString host;
if ( !url.IsValid() ||
(strncmp(url.GetProtocol(),"root", 4) &&
strncmp(url.GetProtocol(),"rfio", 4)) ) {
host = "no-host";
} else {
host = url.GetHost();
}
TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
if (node == 0) {
node = new TFileNode(host);
fFileNodes->Add(node);
}
node->Add( e );
}
fSlaveStats = new TMap;
fSlaveStats->SetOwner(kFALSE);
TSlave *slave;
TIter si(slaves);
while ((slave = (TSlave*) si.Next())) {
fSlaveStats->Add( slave, new TSlaveStat(slave) );
fMaxPerfIdx = slave->GetPerfIdx() > fMaxPerfIdx ?
slave->GetPerfIdx() : fMaxPerfIdx;
}
Reset();
ValidateFiles(dset, slaves);
if (!fValid) return;
Int_t files = 0;
fTotalEntries = 0;
fUnAllocated->Clear();
fActive->Clear();
fFileNodes->Clear();
PDB(kPacketizer,2) Info("","Processing Range: First %lld, Num %lld", first, num);
dset->Reset();
Long64_t cur = 0;
while (( e = (TDSetElement*)dset->Next())) {
TUrl url = e->GetFileName();
Long64_t eFirst = e->GetFirst();
Long64_t eNum = e->GetNum();
PDB(kPacketizer,2) Info("","Processing element: First %lld, Num %lld (cur %lld)", eFirst, eNum, cur);
if (!e->GetEventList()) {
if (cur + eNum < first) {
cur += eNum;
PDB(kPacketizer,2) Info("","Processing element: skip element cur %lld", cur);
continue;
}
if (num != -1 && (first+num <= cur)) {
cur += eNum;
PDB(kPacketizer,2) Info("","Processing element: drop element cur %lld", cur);
continue;
}
if (num != -1 && (first+num < cur+eNum)) {
e->SetNum( first + num - cur );
PDB(kPacketizer,2) Info("","Processing element: Adjust end %lld", first + num - cur);
}
if (cur < first) {
e->SetFirst( eFirst + (first - cur) );
e->SetNum( e->GetNum() - (first - cur) );
PDB(kPacketizer,2) Info("","Processing element: Adjust start %lld and end %lld",
eFirst + (first - cur), first + num - cur);
}
cur += eNum;
} else {
if (e->GetEventList()->GetN() == 0)
continue;
}
PDB(kPacketizer,2) Info("","Processing element: next cur %lld", cur);
TString host;
if ( !url.IsValid() ||
(strncmp(url.GetProtocol(),"root", 4) &&
strncmp(url.GetProtocol(),"rfio", 4)) ) {
host = "no-host";
} else {
host = url.GetHost();
}
TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
if ( node == 0 ) {
node = new TFileNode( host );
fFileNodes->Add( node );
}
++files;
fTotalEntries += e->GetNum();
node->Add( e );
node->IncEvents(e->GetNum());
PDB(kPacketizer,2) e->Print("a");
}
if (dset->GetEventList())
fTotalEntries = dset->GetEventList()->GetN();
PDB(kGlobal,1) Info("TDynPacketizer","Processing %lld entries in %d files on %d hosts",
fTotalEntries, files, fFileNodes->GetSize());
Reset();
Int_t noRemoteFiles = 0;
fNEventsOnRemLoc = 0;
Int_t totalNumberOfFiles = 0;
TIter next(fFileNodes);
while (TFileNode *fn = (TFileNode*)next()) {
totalNumberOfFiles += fn->GetNumberOfFiles();
if (fn->GetSlaveCnt() == 0) {
noRemoteFiles += fn->GetNumberOfFiles();
fNEventsOnRemLoc += (fn->GetNEvents() - fn->GetProcessed());
}
}
fFractionOfRemoteFiles = noRemoteFiles / totalNumberOfFiles;
Printf("fraction of remote files %f", fFractionOfRemoteFiles);
if ( fValid ) {
TParameter<Int_t> *par = 0;
TObject *obj = input->FindObject("PROOF_ProgressPeriod");
Int_t period = 500;
if (obj && (par = dynamic_cast<TParameter<Int_t>*>(obj)))
period = par->GetVal();
fProgress = new TTimer;
fProgress->SetObject(this);
fProgress->Start(period, kFALSE);
}
PDB(kPacketizer,1) Info("TDynPacketizer", "Return");
}
//______________________________________________________________________________
TAdaptivePacketizer::~TAdaptivePacketizer()
{
   // Release the worker statistics, the bookkeeping lists, the progress
   // timer and the circular progress ntuple.

   // The TSlaveStat values were created by the constructor; the map is
   // not their owner, so delete them explicitly.
   if (fSlaveStats) {
      fSlaveStats->DeleteValues();
   }

   SafeDelete(fPackets);
   SafeDelete(fSlaveStats);
   SafeDelete(fUnAllocated);
   SafeDelete(fActive);
   SafeDelete(fFileNodes);
   SafeDelete(fProgress);
   // Allocated with new in the constructor and otherwise never released.
   // SafeDelete also zeroes the pointer, so a later SafeDelete/delete of the
   // same member (e.g. in a base class) is harmless.
   // NOTE(review): assumes the ntuple's directory has not already deleted it.
   SafeDelete(fCircProg);
}
//______________________________________________________________________________
TAdaptivePacketizer::TFileStat *TAdaptivePacketizer::GetNextUnAlloc(TFileNode *node)
{
   // Take the next unallocated file. If 'node' is given it comes from that
   // node only; otherwise it comes from the best unallocated node (see
   // NextUnAllocNode()). Nodes found empty are dropped from the unallocated
   // list. The node of the returned file is added to the active list if it
   // is not there yet. Returns 0 if no file is available.

   TFileStat *stat = 0;

   if (node) {
      stat = node->GetNextUnAlloc();
      if (!stat) RemoveUnAllocNode(node);
   } else {
      for (node = NextUnAllocNode(); node; node = NextUnAllocNode()) {
         stat = node->GetNextUnAlloc();
         if (stat) break;
         RemoveUnAllocNode(node);
      }
   }

   if (stat && !fActive->FindObject(node))
      fActive->Add(node);

   return stat;
}
//______________________________________________________________________________
TAdaptivePacketizer::TFileNode *TAdaptivePacketizer::NextUnAllocNode()
{
   // Sort the unallocated nodes (TFileNode::Compare) and return the first.
   // Returns 0 if the list is empty or if that node already serves
   // fgMaxSlaveCnt external workers.

   fUnAllocated->Sort();
   PDB(kPacketizer,2) {
      cout << "TAdaptivePacketizer::NextUnAllocNode()" << endl;
      fUnAllocated->Print();
   }

   TFileNode *best = (TFileNode*) fUnAllocated->First();
   if (best == 0)
      return 0;
   if (best->GetExtSlaveCnt() < fgMaxSlaveCnt)
      return best;

   PDB(kPacketizer,1) Info("NextUnAllocNode",
                           "Reached Slaves per Node Limit (%d)", fgMaxSlaveCnt);
   return 0;
}
void TAdaptivePacketizer::RemoveUnAllocNode(TFileNode * node)
{
fUnAllocated->Remove(node);
}
//______________________________________________________________________________
TAdaptivePacketizer::TFileStat *TAdaptivePacketizer::GetNextActive()
{
   // Return the next active file of the best active node (see
   // NextActiveNode()). Nodes without active files are dropped from the
   // active list. Returns 0 when no node can provide a file.

   for (TFileNode *cand = NextActiveNode(); cand != 0; cand = NextActiveNode()) {
      TFileStat *stat = cand->GetNextActive();
      if (stat != 0)
         return stat;
      RemoveActiveNode(cand);
   }
   return 0;
}
//______________________________________________________________________________
TAdaptivePacketizer::TFileNode *TAdaptivePacketizer::NextActiveNode()
{
   // Sort the active nodes (TFileNode::Compare) and return the first.
   // Returns 0 if the list is empty or if that node already serves
   // fgMaxSlaveCnt external workers.

   fActive->Sort();
   PDB(kPacketizer,2) {
      cout << "TAdaptivePacketizer::NextActiveNode()" << endl;
      fActive->Print();
   }

   TFileNode *best = (TFileNode*) fActive->First();
   if (best == 0)
      return 0;
   if (best->GetExtSlaveCnt() < fgMaxSlaveCnt)
      return best;

   PDB(kPacketizer,1) Info("NextActiveNode","Reached Slaves per Node Limit (%d)", fgMaxSlaveCnt);
   return 0;
}
//______________________________________________________________________________
void TAdaptivePacketizer::RemoveActive(TFileStat *file)
{
   // Remove 'file' from its node's active files. If that leaves the node
   // with no active file, drop the node from the active list as well.
   TFileNode *owner = file->GetNode();
   owner->RemoveActive(file);
   if (owner->GetNumberOfActiveFiles() != 0)
      return;
   RemoveActiveNode(owner);
}
void TAdaptivePacketizer::RemoveActiveNode(TFileNode *node)
{
fActive->Remove(node);
}
void TAdaptivePacketizer::Reset()
{
fUnAllocated->Clear();
fUnAllocated->AddAll(fFileNodes);
fActive->Clear();
TIter files(fFileNodes);
TFileNode *fn;
while ((fn = (TFileNode*) files.Next()) != 0) {
fn->Reset();
}
TIter slaves(fSlaveStats);
TObject *key;
while ((key = slaves.Next()) != 0) {
TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
TFileNode *fn = (TFileNode*) fFileNodes->FindObject(slstat->GetName());
if (fn != 0 ) {
slstat->SetFileNode(fn);
fn->IncMySlaveCnt();
}
slstat->fCurFile = 0;
}
}
//______________________________________________________________________________
void TAdaptivePacketizer::ValidateFiles(TDSet *dset, TList *slaves)
{
   // Ask the workers for the number of entries of each element of 'dset'
   // (kPROOF_GETENTRIES). Each worker is preferably given files on its own
   // host. Elements whose first/num exceed the entries found are adjusted;
   // elements without entries are removed from the dataset. fValid is set to
   // kFALSE on any communication error. On success the element offsets within
   // the dataset are computed and, if the dataset has an event list, that list
   // is split among the elements.

   TMap     slaves_by_sock;
   TMonitor mon;
   TList    workers;

   // Monitor all worker sockets and map each socket back to its worker.
   workers.AddAll(slaves);
   TIter si(slaves);
   TSlave *slave;
   while ((slave = (TSlave*)si.Next()) != 0) {
      PDB(kPacketizer,3) Info("ValidateFiles","socket added to monitor: %p (%s)",
                              slave->GetSocket(), slave->GetName());
      mon.Add(slave->GetSocket());
      slaves_by_sock.Add(slave->GetSocket(), slave);
   }

   mon.DeActivateAll();

   ((TProof*)gProof)->DeActivateAsyncInput();
   ((TProof*)gProof)->fCurrentMonitor = &mon;

   TString msg("Validating files");
   UInt_t n = 0;
   UInt_t tot = dset->GetListOfElements()->GetSize();
   Bool_t st = kTRUE;

   while (kTRUE) {

      // Send a request to every idle worker that can still get a file.
      while (TSlave *s = (TSlave*)workers.First()) {

         workers.Remove(s);

         TSlaveStat *slstat = (TSlaveStat*)fSlaveStats->GetValue(s);
         TFileNode *node = 0;
         TFileStat *file = 0;

         // Prefer a file on the worker's own host.
         if ((node = slstat->GetFileNode()) != 0) {
            file = GetNextUnAlloc(node);
            if (file == 0) {
               slstat->SetFileNode(0);
            }
         }

         // Otherwise take one from the best other node.
         if (file == 0) {
            file = GetNextUnAlloc();
         }

         if (file != 0) {
            // Undo the activation done by GetNextUnAlloc(): here the file is
            // only queried for its entries, not processed.
            RemoveActive(file);

            slstat->fCurFile = file;
            file->GetNode()->IncSlaveCnt(slstat->GetName());
            TMessage m(kPROOF_GETENTRIES);
            TDSetElement *elem = file->GetElement();
            m << dset->IsTree()
              << TString(elem->GetFileName())
              << TString(elem->GetDirectory())
              << TString(elem->GetObjName());

            s->GetSocket()->Send(m);
            mon.Activate(s->GetSocket());
            PDB(kPacketizer,2) Info("ValidateFiles","sent to slave-%s (%s) via %p GETENTRIES on %s %s %s %s",
                                    s->GetOrdinal(), s->GetName(), s->GetSocket(),
                                    dset->IsTree() ? "tree" : "objects",
                                    elem->GetFileName(), elem->GetDirectory(), elem->GetObjName());
         }
      }

      // No outstanding request: every file has been validated.
      if (mon.GetActive() == 0) break;

      PDB(kPacketizer,3) {
         Info("ValidateFiles", "waiting for %d slaves:", mon.GetActive());
         TList *act = mon.GetListOfActives();
         TIter next(act);
         while (TSocket *s = (TSocket*) next()) {
            TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s);
            if (sl)
               Info("ValidateFiles", "   slave-%s (%s)", sl->GetOrdinal(), sl->GetName());
         }
         delete act;
      }

      TSocket *sock = mon.Select();
      mon.DeActivate(sock);

      PDB(kPacketizer,3) Info("ValidateFiles", "select returned: %p", sock);

      slave = (TSlave *) slaves_by_sock.GetValue(sock);

      // The received message is owned by us: delete it on every path.
      TMessage *reply = 0;

      if (sock->Recv(reply) <= 0) {
         delete reply;   // may be 0; deleting 0 is a no-op
         ((TProof*)gProof)->MarkBad(slave);
         fValid = kFALSE;
         Error("ValidateFiles", "Recv failed! for slave-%s (%s)",
               slave->GetOrdinal(), slave->GetName());
         continue;
      }

      if (reply->What() == kPROOF_FATAL) {
         Error("ValidateFiles", "kPROOF_FATAL from slave-%s (%s)",
               slave->GetOrdinal(), slave->GetName());
         delete reply;
         ((TProof*)gProof)->MarkBad(slave);
         fValid = kFALSE;
         continue;
      } else if (reply->What() == kPROOF_LOGFILE) {
         PDB(kPacketizer,3) Info("ValidateFiles", "got logfile");
         Int_t size;
         (*reply) >> size;
         delete reply;
         ((TProof*)gProof)->RecvLogFile(sock, size);
         mon.Activate(sock);
         continue;
      } else if (reply->What() == kPROOF_LOGDONE) {
         PDB(kPacketizer,3) Info("ValidateFiles", "got logdone");
         delete reply;
         mon.Activate(sock);
         continue;
      } else if (reply->What() != kPROOF_GETENTRIES) {
         // Not what we expected: treat it as a fatal error for this worker.
         Error("ValidateFiles", "unexpected message type (%d) from slave-%s (%s)",
               reply->What(), slave->GetOrdinal(), slave->GetName());
         delete reply;
         ((TProof*)gProof)->MarkBad(slave);
         fValid = kFALSE;
         continue;
      }

      TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue(slave);
      TDSetElement *e = slavestat->fCurFile->GetElement();
      slavestat->fCurFile->GetNode()->DecSlaveCnt(slavestat->GetName());
      Long64_t entries;

      (*reply) >> entries;
      delete reply;   // fully consumed

      // Remember the number of entries; converted to an offset below.
      e->SetTDSetOffset(entries);
      if (entries > 0) {

         if (!e->GetEventList()) {
            if (e->GetFirst() > entries) {
               Error("ValidateFiles", "first (%lld) higher than number of entries (%lld) in %s",
                     e->GetFirst(), entries, e->GetFileName());

               // Disable the element.
               slavestat->fCurFile->SetDone();
               fValid = kFALSE;   // all element must be readable!
            }

            if (e->GetNum() == -1) {
               e->SetNum(entries - e->GetFirst());
            } else if (e->GetFirst() + e->GetNum() > entries) {
               Error("ValidateFiles",
                     "Num (%lld) + First (%lld) larger than number of keys/entries (%lld) in %s",
                     e->GetNum(), e->GetFirst(), entries, e->GetFileName());
               e->SetNum(entries - e->GetFirst());
            }
         }

         // Count
         n++;
         gProof->SendDataSetStatus(msg, n, tot, st);

      } else {

         Error("ValidateFiles", "cannot get entries for %s", e->GetFileName());
         // Tell the client about the skipped file.
         if (gProofServ) {
            TMessage m(kPROOF_MESSAGE);
            m << TString(Form("Cannot get entries for file: %s - skipping", e->GetFileName()));
            gProofServ->GetSocket()->Send(m);
         }

         // Remove the element from the dataset.
         if (dset->Remove(e) == -1)
            Error("ValidateFiles", "removing of not-registered element %p failed", e);
      }

      workers.Add(slave); // Ready for the next job
   }

   // Restore the normal TProof input handling.
   ((TProof*)gProof)->ActivateAsyncInput();
   ((TProof*)gProof)->fCurrentMonitor = 0;

   if (!fValid)
      return;

   // Turn the per-element entry counts into cumulative offsets in the dataset.
   Long64_t offset = 0;
   Long64_t newOffset = 0;
   TIter next(dset->GetListOfElements());
   TDSetElement *el;
   while ((el = dynamic_cast<TDSetElement*> (next()))) {
      newOffset = offset + el->GetTDSetOffset();
      el->SetTDSetOffset(offset);
      offset = newOffset;
   }

   // Split the main event list (if any) among the elements.
   if (dset->GetEventList()) {
      SplitEventList(dset);
   }
}
void TAdaptivePacketizer::SplitEventList(TDSet *dset)
{
// Split the dataset-wide event list of 'dset' among its elements. Element i
// covers the global entries [offset_i, offset_{i+1}), where offset_i is its
// TDSetOffset; the last element extends up to kMaxLong64. Each element gets a
// new TEventList with its share of the entries, re-based to the element
// (entry - offset_i), and its Num is set to the size of that list.
// NOTE(review): the entries are consumed with a single forward cursor, so this
// assumes the entries of mainList are in increasing order — confirm.
TEventList *mainList = dset->GetEventList();
R__ASSERT(mainList);
TIter next(dset->GetListOfElements());
TDSetElement *el, *prev;
prev = dynamic_cast<TDSetElement*> (next());
if (!prev)
return;
Long64_t low = prev->GetTDSetOffset();
Long64_t high = low;
Long64_t currPos = 0;
do {
// 'el' is the element following 'prev'; its offset is prev's upper bound.
el = dynamic_cast<TDSetElement*> (next());
if (el == 0)
high = kMaxLong64;
else
high = el->GetTDSetOffset();
#ifdef DEBUG
// Debug builds report and skip entries below the current element's range.
while (currPos < mainList->GetN() && mainList->GetEntry(currPos) < low) {
Error("SplitEventList", "event outside of the range of any of the TDSetElements");
currPos++;
}
#endif
// Collect the entries falling in [low, high), relative to 'low'.
TEventList* newEventList = new TEventList();
while (currPos < mainList->GetN() && mainList->GetEntry((Int_t)currPos) < high) {
newEventList->Enter(mainList->GetEntry((Int_t)currPos) - low);
currPos++;
}
// Any list previously attached to 'prev' is replaced; whether SetEventList
// releases it is not visible here.
prev->SetEventList(newEventList);
prev->SetNum(newEventList->GetN());
low = high;
prev = el;
} while (el);
}
//______________________________________________________________________________
Long64_t TAdaptivePacketizer::GetEntriesProcessed(TSlave *slave) const
{
   // Return the number of entries processed so far by 'slave'. Returns 0 if
   // no statistics exist yet or the worker is unknown.
   TSlaveStat *stat = fSlaveStats ? (TSlaveStat*) fSlaveStats->GetValue(slave) : 0;
   return stat ? stat->GetEntriesProcessed() : 0;
}
//______________________________________________________________________________
TDSetElement* TAdaptivePacketizer::CreateNewPacket(TDSetElement* base, Long64_t first, Long64_t num)
{
   // Create a packet covering 'num' entries starting at 'first' of the file
   // described by 'base'. A matching element with the same range is added for
   // every friend of 'base'. The caller takes ownership of the returned element.

   TDSetElement* elem = new TDSetElement(base->GetFileName(), base->GetObjName(),
                                         base->GetDirectory(), first, num);

   // GetListOfFriends() may return 0 for an element without friends: guard it.
   TDSetElement::FriendsList_t *friends = base->GetListOfFriends();
   if (friends) {
      for (TDSetElement::FriendsList_t::iterator i = friends->begin(); i != friends->end(); ++i) {
         TDSetElement* friendElem = i->first;
         elem->AddFriend(new TDSetElement(friendElem->GetFileName(), friendElem->GetObjName(),
                                          friendElem->GetDirectory(), first, num), i->second);
      }
   }

   return elem;
}
//______________________________________________________________________________
Int_t TAdaptivePacketizer::CalculatePacketSize(TObject *slStatPtr)
{
   // Return the number of entries for the next packet of the worker described
   // by 'slStatPtr' (a TSlaveStat). The result is always at least 1.
   //
   // If the worker's rate (current-file rate, else average rate) and the
   // average per-worker rate are known, the packet is sized for a packet time of
   //   (entries left / average per-worker rate) / 10           below 80% done,
   //   (total entries / average per-worker rate) / 50          afterwards,
   // with a minimum packet time of 1. Otherwise the entries left are split into
   // 10 packets per worker, scaled by the worker's relative performance index.
   //
   // NOTE(review): HandleTimer() also assigns fProcTime (wall-clock time since
   // init), while GetNextPacket() accumulates worker processing times into it;
   // the average rate below therefore mixes the two — confirm intent.

   TSlaveStat *slstat = (TSlaveStat*)slStatPtr;
   const Int_t packetSizeAsFraction = 10;
   Long64_t num = 0;

   Float_t rate = slstat->GetCurRate();
   if (!rate)
      rate = slstat->GetAvgRate();

   // Average rate of a worker. Stays 0 while no entries or time were
   // accounted, in which case the static estimate is used (it used to
   // divide by zero).
   Float_t avgProcRate = 0;
   if (fProcTime > 0 && fSlaveStats->GetSize() > 0)
      avgProcRate = fProcessed/(fProcTime / fSlaveStats->GetSize());

   if (rate && avgProcRate > 0) {
      Float_t packetTime;
      if (fProcessed < 0.8 * fTotalEntries)
         packetTime = ((fTotalEntries - fProcessed)/avgProcRate)/packetSizeAsFraction;
      else
         packetTime = (fTotalEntries/avgProcRate)/(packetSizeAsFraction * 5);
      if (packetTime < 1)
         packetTime = 1;
      num = (Long64_t)(rate * packetTime);
   } else {
      // 64-bit: the entries left per worker may exceed the Int_t range.
      Long64_t packetSize = (fTotalEntries - fProcessed)/(packetSizeAsFraction * fSlaveStats->GetSize());
      num = Long64_t(packetSize*(Float_t)slstat->fSlave->GetPerfIdx()/fMaxPerfIdx);
   }

   if (num < 1) num = 1;
   // The return type is Int_t: clamp instead of silently truncating.
   if (num > kMaxInt) num = kMaxInt;
   return (Int_t) num;
}
//______________________________________________________________________________
TDSetElement *TAdaptivePacketizer::GetNextPacket(TSlave *sl, TMessage *r)
{
   // Account for the packet just finished by worker 'sl', using the
   // statistics read from 'r'. Return the next packet for that worker, or 0
   // when there is no more work or the packetizer is invalid or stopped. The
   // returned element is remembered in the worker's TSlaveStat and moved to
   // fPackets when the worker asks for its next packet.

   if (!fValid) {
      return 0;
   }

   TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(sl);
   R__ASSERT(slstat != 0);

   // Update the statistics with the results of the previous packet.
   if (slstat->fCurElem != 0) {
      Double_t latency, proctime, proccpu;
      Long64_t bytesRead = -1;
      Long64_t totalEntries = -1;
      Long64_t numev = slstat->fCurElem->GetNum();

      fPackets->Add(slstat->fCurElem);
      (*r) >> latency >> proctime >> proccpu;
      // Optional trailing fields: read them only if the message has more data.
      if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
      if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
      Long64_t totev = 0;
      if (r->BufferSize() > r->Length()) (*r) >> totev;

      // If the worker sent its cumulative count, derive this packet's count from it.
      if (totev > 0)
         numev = totev - slstat->fProcessed;

      fProcessed += ((numev > 0) ? numev : 0);
      fBytesRead += ((bytesRead > 0) ? bytesRead : 0);
      slstat->UpdateRates(numev, proctime);
      fProcTime += proctime;

      PDB(kPacketizer,2)
         Info("GetNextPacket","slave-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
              sl->GetOrdinal(), sl->GetName(),
              numev, latency, proctime, proccpu, bytesRead);

      if (gPerfStats != 0) {
         gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), slstat->fCurElem->GetFileName(),
                                 numev, latency, proctime, proccpu, bytesRead);
      }

      slstat->fCurElem = 0;
      if (fProcessed == fTotalEntries) {
         HandleTimer(0);   // send the last progress update
         delete fProgress; fProgress = 0;
      }
   }

   if (fStop) {
      HandleTimer(0);
      return 0;
   }

   // Release the worker's current file once all its entries were handed out.
   TFileStat *file = slstat->fCurFile;

   if (file != 0 && file->IsDone()) {
      file->GetNode()->DecSlaveCnt(slstat->GetName());
      file->GetNode()->DecRunSlaveCnt();
      if (gPerfStats != 0) {
         gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(),
                               file->GetElement()->GetFileName(), kFALSE);
      }
      file = 0;
   }

   Long64_t avgEventsLeftPerSlave =
      (fTotalEntries - fProcessed) / fSlaveStats->GetSize();
   if (fTotalEntries == fProcessed)
      return 0;

   // Choose a new file: either stay on the local node or open a file elsewhere.
   if (file == 0) {

      Bool_t openLocal;
      // Preference for local files: 1.5 minus the fraction of the entries left
      // that sit on nodes without a local worker. Floating-point division:
      // with integer division the preference could only be 1.5 or 0.5.
      Float_t localPreference = 1.5 - ((Float_t) fNEventsOnRemLoc / (fTotalEntries - fProcessed));

      if (slstat->GetFileNode() != 0) {
         // The worker has a local node: compare it with the best other node.
         fUnAllocated->Sort();
         TFileNode* firstNonLocalNode = (TFileNode*)fUnAllocated->First();
         Bool_t nonLocalNodePossible =
            firstNonLocalNode ? (firstNonLocalNode->GetSlaveCnt() < fgMaxSlaveCnt) : kFALSE;
         openLocal = !nonLocalNodePossible;
         Float_t slaveRate = slstat->GetAvgRate();
         if (nonLocalNodePossible) {
            if (slaveRate == 0) {
               // No rate yet: decide on the entries left only.
               if (slstat->GetLocalEventsLeft() * localPreference > (avgEventsLeftPerSlave))
                  openLocal = kTRUE;
               else if ((firstNonLocalNode->GetEventsLeftPerSlave())
                        < slstat->GetLocalEventsLeft() * localPreference)
                  openLocal = kTRUE;
               else if (firstNonLocalNode->GetExtSlaveCnt() > 1)
                  openLocal = kTRUE;
            } else {
               // Compare the time needed for the local entries with the
               // average time per worker. The latter is computed only when
               // the average rate is defined and non-zero; otherwise (as with
               // an infinite average time) this test does not trigger.
               Float_t slaveTime = slstat->GetLocalEventsLeft()/slaveRate;
               Bool_t avgRateKnown = (fProcTime > 0 && fProcessed > 0);
               if (avgRateKnown &&
                   slaveTime * localPreference > avgEventsLeftPerSlave/(fProcessed/fProcTime))
                  openLocal = kTRUE;
               else if ((firstNonLocalNode->GetEventsLeftPerSlave())
                        < slstat->GetLocalEventsLeft() * localPreference)
                  openLocal = kTRUE;
            }
         }
         if (openLocal) {
            file = slstat->GetFileNode()->GetNextUnAlloc();
            if (!file)
               file = slstat->GetFileNode()->GetNextActive();
            if (file == 0) {
               // The local node is exhausted: forget it.
               slstat->SetFileNode(0);
            }
         }
      }

      // Fall back to any unallocated file, then to any active one.
      if (file == 0) {
         file = GetNextUnAlloc();
      }

      if (file == 0) {
         file = GetNextActive();
      }

      if (file == 0) return 0;

      slstat->fCurFile = file;
      // First packet of a file on a node without local workers: its entries
      // no longer count as remote entries left.
      if (file->GetNode()->GetMySlaveCnt() == 0 &&
          file->GetElement()->GetFirst() == file->GetNextEntry()) {
         fNEventsOnRemLoc -= file->GetElement()->GetNum();
         R__ASSERT(fNEventsOnRemLoc >= 0);
      }

      file->GetNode()->IncSlaveCnt(slstat->GetName());
      file->GetNode()->IncRunSlaveCnt();
      if (gPerfStats != 0) {
         gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(),
                               file->GetNode()->GetName(),
                               file->GetElement()->GetFileName(), kTRUE);
      }
   }

   // Cut the next packet out of the current file.
   Long64_t num = CalculatePacketSize(slstat);

   TDSetElement *base = file->GetElement();
   Long64_t first = file->GetNextEntry();
   Long64_t last = base->GetFirst() + base->GetNum();

   if (first + num >= last) {
      // Last packet of this file.
      num = last - first;
      file->SetDone();
      RemoveActive(file);
   } else {
      file->MoveNextEntry(num);
   }

   slstat->fCurElem = CreateNewPacket(base, first, num);
   if (base->GetEventList()) {
      // Copy the corresponding part of the element's event list.
      TEventList *evl = new TEventList();
      for (; num > 0; num--, first++)
         evl->Enter(base->GetEventList()->GetEntry((Int_t)first));
      slstat->fCurElem->SetEventList(evl);
   }

   return slstat->fCurElem;
}
//______________________________________________________________________________
Bool_t TAdaptivePacketizer::HandleTimer(TTimer *)
{
   // Send a kPROOF_PROGRESS message through gProofServ. It always carries the
   // total and processed entries. For protocol > 11 it also carries the bytes
   // read, the init and processing times, and the event and MB rates computed
   // over the samples kept in the circular ntuple fCircProg (-1 while
   // undefined). Always returns kFALSE.
   //
   // NOTE(review): fProcTime is overwritten here with wall-clock time, while
   // GetNextPacket() accumulates worker processing times into the same member
   // — confirm that the two uses are meant to share it.

   if (fProgress == 0) return kFALSE;   // timer already stopped
   if (gProofServ == 0) return kFALSE;  // no server object to send through

   TMessage m(kPROOF_PROGRESS);

   if (gProofServ->GetProtocol() > 11) {

      // Seconds since the packetizer was created.
      TTime tnow = gSystem->Now();
      Float_t now = (Float_t) (Long_t(tnow) - fStartTime) / (Double_t)1000.;
      Double_t evts = (Double_t) fProcessed;
      Double_t mbs = (fBytesRead > 0) ? fBytesRead / TMath::Power(2.,20.) : 0.;

      Float_t evtrti = -1., mbrti = -1.;
      if (evts <= 0) {
         // Nothing processed yet: keep moving the init time.
         fInitTime = now;
      } else {
         if (fCircProg->GetEntries() <= 0) {
            fCircProg->Fill((Double_t)0., 0., 0.);
            fInitTime = (now + fInitTime) / 2.;
         }
         fTimeUpdt = now - fProcTime;
         fProcTime = now - fInitTime;
         fCircProg->Fill((Double_t)fProcTime, evts, mbs);
         if (fCircProg->GetEntries() > 4) {
            // Rates relative to entry 0 of the circular ntuple (presumably
            // the oldest sample kept).
            Double_t *ar = fCircProg->GetArgs();
            fCircProg->GetEntry(0);
            Double_t dt = (Double_t)fProcTime - ar[0];
            evtrti = (dt > 0) ? (Float_t) (evts - ar[1]) / dt : -1. ;
            mbrti = (dt > 0) ? (Float_t) (mbs - ar[2]) / dt : -1. ;
            if (gPerfStats != 0)
               gPerfStats->RateEvent((Double_t)fProcTime, dt,
                                     (Long64_t) (evts - ar[1]),
                                     (Long64_t) ((mbs - ar[2])*TMath::Power(2.,20.)));
         }
      }

      m << fTotalEntries << fProcessed << fBytesRead << fInitTime << fProcTime
        << evtrti << mbrti;

   } else {
      // Old protocol: totals only.
      m << fTotalEntries << fProcessed;
   }

   gProofServ->GetSocket()->Send(m);

   return kFALSE;
}
// ROOT page - Class index - Class Hierarchy - Top of the page
// This page has been automatically generated. If you have any comments or suggestions about the page layout, send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.