#include "TProofNodes.h"
#include "TProof.h"
#include "TList.h"
#include "TMap.h"
#include "TObjString.h"
ClassImp(TProofNodes)
////////////////////////////////////////////////////////////////////////////////
/// Constructor.
///
/// Stores the PROOF session pointer, starts with no node maps and with the
/// counters at their initial values (-1 for the min/max workers per node,
/// 0 for the others), then calls Build() to fill them.
///
/// \param proof  PROOF session to describe; it is stored as given (Build()
///               only warns and returns if it is null or invalid).

TProofNodes::TProofNodes(TProof* proof)
   : fProof(proof),
     fNodes(0),
     fActiveNodes(0),
     fMaxWrksNode(-1),
     fMinWrksNode(-1),
     fNNodes(0),
     fNWrks(0),
     fNActiveWrks(0),
     fNCores(0)
{
   // Populate the maps and counters from the current session state
   Build();
}
////////////////////////////////////////////////////////////////////////////////
/// Destructor.
///
/// Releases both maps created by Build(): the map of all nodes and the map
/// of active nodes. Ownership of keys *and* values is enabled before the
/// deletion so that the per-node lists stored as values are destroyed
/// together with the key strings (TMap::SetOwner alone covers the keys only).

TProofNodes::~TProofNodes()
{
   if (fNodes) {
      fNodes->SetOwnerKeyValue(kTRUE, kTRUE);
      SafeDelete(fNodes);
   }
   // The active-node map is allocated in Build() as well and must be freed too
   if (fActiveNodes) {
      fActiveNodes->SetOwnerKeyValue(kTRUE, kTRUE);
      SafeDelete(fActiveNodes);
   }
}
void TProofNodes::Build()
{
if (!fProof || !fProof->IsValid()) {
Warning("Build", "the PROOF instance is undefined or invalid! Cannot continue");
return;
}
if (fNodes){
fNodes->SetOwner(kTRUE);
SafeDelete(fNodes);
}
fNodes = new TMap;
fNodes->SetOwner(kTRUE);
TList *slaves = fProof->GetListOfSlaveInfos();
TIter nxtslave(slaves);
TSlaveInfo *si = 0;
TList *node = 0;
TPair *pair = 0;
while ((si = (TSlaveInfo *)(nxtslave()))) {
TSlaveInfo *si_copy = (TSlaveInfo *)(si->Clone());
if (!(pair = (TPair *) fNodes->FindObject(si->GetName()))) {
node = new TList;
node->SetOwner(kTRUE);
node->SetName(si_copy->GetName());
node->Add(si_copy);
fNodes->Add(new TObjString(si->GetName()), node);
} else {
node = (TList *) pair->Value();
node->Add(si_copy);
}
}
if (fActiveNodes){
fActiveNodes->SetOwner(kTRUE);
SafeDelete(fActiveNodes);
}
fActiveNodes = new TMap;
fActiveNodes->SetOwner(kTRUE);
TList *actnode = 0;
fMaxWrksNode = -1;
fMinWrksNode = -1;
fNNodes = 0;
fNWrks = 0;
fNActiveWrks = 0;
TIter nxk(fNodes);
TObject *key = 0;
while ((key = nxk()) != 0) {
node = dynamic_cast<TList *>(fNodes->GetValue(key));
if (node) {
fNNodes++;
si = (TSlaveInfo *) node->First();
fNCores += si->fSysInfo.fCpus;
fNWrks += node->GetSize();
if (fMinWrksNode == -1 || (node->GetSize() < fMinWrksNode)) {
fMinWrksNode = node->GetSize();
}
if (fMaxWrksNode == -1 || (node->GetSize() > fMaxWrksNode)) {
fMaxWrksNode = node->GetSize();
}
TIter nxw(node);
while ((si = (TSlaveInfo *) nxw())) {
if (si->fStatus == TSlaveInfo::kActive) {
fNActiveWrks++;
TSlaveInfo *si_copy = (TSlaveInfo *)(si->Clone());
actnode = dynamic_cast<TList *>(fActiveNodes->GetValue(key));
if (actnode) {
actnode->Add(si_copy);
} else {
actnode = new TList;
actnode->SetOwner(kTRUE);
actnode->SetName(si_copy->GetName());
actnode->Add(si_copy);
fActiveNodes->Add(new TObjString(si->GetName()), actnode);
}
}
}
} else {
Warning("Build", "could not get list for node '%s'", key->GetName());
}
}
return;
}
////////////////////////////////////////////////////////////////////////////////
/// Ask the PROOF session to run with 'nwrks' workers.
///
/// The request is forwarded to TProof::SetParallel. When the returned value is
/// positive the node information is rebuilt, and a warning is printed if it
/// differs from the requested number.
///
/// \param nwrks  number of workers requested
/// \return the value returned by TProof::SetParallel, or -1 if the PROOF
///         instance is undefined or invalid.

Int_t TProofNodes::ActivateWorkers(Int_t nwrks)
{
   // Same precondition as Build(): without a valid session nothing can be done
   if (!fProof || !fProof->IsValid()) {
      Error("ActivateWorkers", "the PROOF instance is undefined or invalid! Cannot continue");
      return -1;
   }

   Int_t nw = fProof->SetParallel(nwrks);
   if (nw > 0) {
      if (nw != nwrks)
         Warning("ActivateWorkers", "requested %d got %d", nwrks, nw);
      // Refresh maps and counters to reflect the new set of active workers
      Build();
   }
   return nw;
}
////////////////////////////////////////////////////////////////////////////////
/// Activate the same number of workers on every node.
///
/// 'workers' is a non-negative integer optionally followed by 'x' (e.g. "2x"
/// or "2"): on each node the first N workers of the node list are made
/// active and the remaining active ones are deactivated. If both the remote
/// and the client protocol are at least 33 the changes are sent as two
/// comma-separated ordinal lists (one deactivation, one activation);
/// otherwise all workers are first deactivated with SetParallel(0) and then
/// activated one by one.
///
/// After the requests the node information is rebuilt and checked: a warning
/// is printed for every node whose number of active workers differs from N.
///
/// \param workers  requested number of workers per node, "<N>" or "<N>x"
/// \return N on success; -1 if the PROOF instance is undefined or invalid,
///         if the argument is malformed, if an (de)activation request failed
///         or if at least one node did not end up with exactly N active
///         workers.

Int_t TProofNodes::ActivateWorkers(const char *workers)
{
   // Same precondition as Build(): without a valid session nothing can be done
   if (!fProof || !fProof->IsValid()) {
      Error("ActivateWorkers", "the PROOF instance is undefined or invalid! Cannot continue");
      return -1;
   }

   // Validate the request before touching the session, so that a malformed
   // argument cannot leave the workers deactivated on the legacy path below.
   TString sworkers = TString(workers).Strip(TString::kTrailing, 'x');
   if (!sworkers.IsDigit()) {
      Error("ActivateWorkers", "wrongly formatted argument: %s - cannot continue",
            workers ? workers : "(null)");
      return -1;
   }
   const Int_t nworkersnode = sworkers.Atoi();
   Int_t ret = nworkersnode;

   TString toactivate;
   TString todeactivate;

   // Protocol 33 and later accept lists of ordinals; older ones need a clean
   // start (no active worker) followed by one request per worker.
   Bool_t protocol33 = kTRUE;
   if (fProof->GetRemoteProtocol() < 33 || fProof->GetClientProtocol() < 33) {
      protocol33 = kFALSE;
      fProof->SetParallel(0);
   }

   // Make sure the worker statuses used below are up to date
   Build();

   TSlaveInfo *si = 0;
   TList *node = 0;
   TObject *key = 0;
   TIter nxk(fNodes);
   while ((key = nxk()) != 0) {
      if ((node = dynamic_cast<TList *>(fNodes->GetValue(key)))) {
         TIter nxtworker(node);
         Int_t nactiveworkers = 0;
         while ((si = static_cast<TSlaveInfo *>(nxtworker()))) {
            if (nactiveworkers < nworkersnode) {
               // Among the first N workers of the node: must be active
               if (si->fStatus == TSlaveInfo::kNotActive) {
                  if (protocol33) {
                     toactivate += TString::Format("%s,", si->GetOrdinal());
                  } else {
                     fProof->ActivateWorker(si->GetOrdinal());
                  }
               }
               nactiveworkers++;
            } else {
               // Beyond the first N workers of the node: must not be active
               if (si->fStatus == TSlaveInfo::kActive) {
                  if (protocol33) {
                     todeactivate += TString::Format("%s,", si->GetOrdinal());
                  } else {
                     fProof->DeactivateWorker(si->GetOrdinal());
                  }
               }
            }
         }
      } else {
         Warning("ActivateWorkers", "could not get list for node '%s'", key->GetName());
      }
   }

   // Send the collected lists (filled only when protocol33 is true), without
   // the trailing comma
   if (!todeactivate.IsNull()) {
      todeactivate.Remove(TString::kTrailing, ',');
      if (fProof->DeactivateWorker(todeactivate) < 0) ret = -1;
   }
   if (!toactivate.IsNull()) {
      toactivate.Remove(TString::kTrailing, ',');
      if (fProof->ActivateWorker(toactivate) < 0) ret = -1;
   }
   if (ret < 0) {
      Warning("ActivateWorkers", "could not get the requested number of workers per node (%d)",
              nworkersnode);
      return ret;
   }

   // Rebuild and verify that every node has exactly the requested number of
   // active workers
   Build();
   TIter nxkn(fNodes);
   while ((key = nxkn()) != 0) {
      if ((node = dynamic_cast<TList *>(fNodes->GetValue(key)))) {
         TIter nxtworker(node);
         Int_t nactiveworkers = 0;
         while ((si = static_cast<TSlaveInfo *>(nxtworker()))) {
            if (si->fStatus == TSlaveInfo::kActive) nactiveworkers++;
         }
         if (nactiveworkers != nworkersnode) {
            Warning("ActivateWorkers", "only %d (out of %d requested) workers "
                                       "were activated on node %s",
                                       nactiveworkers, nworkersnode, node->GetName());
            ret = -1;
         }
      } else {
         Warning("ActivateWorkers", "could not get list for node '%s'", key->GetName());
      }
   }

   return ret;
}
////////////////////////////////////////////////////////////////////////////////
/// Print the worker list of every node.
///
/// Walks the keys of the map of all nodes and calls Print(option) on the
/// list stored for each key; a warning is issued for any key whose value is
/// not a TList.
///
/// \param option  forwarded unchanged to TList::Print

void TProofNodes::Print(Option_t* option) const
{
   TIter nextKey(fNodes);
   for (TObject *nodeKey = nextKey(); nodeKey != 0; nodeKey = nextKey()) {
      TList *workerList = dynamic_cast<TList *>(fNodes->GetValue(nodeKey));
      if (!workerList) {
         Warning("Print", "could not get list for node '%s'", nodeKey->GetName());
         continue;
      }
      workerList->Print(option);
   }
}