#include "TProofSuperMaster.h"
#include "TString.h"
#include "TObjString.h"
#include "TError.h"
#include "TList.h"
#include "TSortedList.h"
#include "TSlave.h"
#include "TMap.h"
#include "TProofServ.h"
#include "TSocket.h"
#include "TMonitor.h"
#include "TSemaphore.h"
#include "TDSet.h"
#include "TPluginManager.h"
#include "TVirtualProofPlayer.h"
#include "TMessage.h"
#include "TUrl.h"
#include "TProofResourcesStatic.h"
#include "TProofNodeInfo.h"
ClassImp(TProofSuperMaster)
TProofSuperMaster::TProofSuperMaster(const char *masterurl, const char *conffile,
                                     const char *confdir, Int_t loglevel,
                                     const char *alias, TProofMgr *mgr)
{
   // Construct a super master for the master at 'masterurl'.
   //
   // A null or empty 'conffile' is replaced by kPROOF_ConfFile; otherwise a
   // leading "sm:" prefix (matched case-insensitively) is skipped.
   // A null or empty 'confdir' is replaced by kPROOF_ConfDir.
   // The manager and URL members are recorded and the rest of the setup is
   // delegated to Init().

   fManager = mgr;
   fUrl = TUrl(masterurl);

   const Bool_t noConfFile = (conffile == 0) || (conffile[0] == '\0');
   if (noConfFile) {
      conffile = kPROOF_ConfFile;
   } else if (strncasecmp(conffile, "sm:", 3) == 0) {
      // Step over the "sm:" prefix
      conffile += 3;
   }

   const Bool_t noConfDir = (confdir == 0) || (confdir[0] == '\0');
   if (noConfDir) {
      confdir = kPROOF_ConfDir;
   }

   Init(masterurl, conffile, confdir, loglevel, alias);
}
Bool_t TProofSuperMaster::StartSlaves(Bool_t parallel, Bool_t)
{
   // Start up the submasters listed by gProofServ->GetWorkers().
   //
   // If 'parallel' is true, one TThread running SlaveStartupThread is created
   // per submaster and joined afterwards; if the handler vector cannot hold
   // all entries the startup falls back to the serial path. In the serial path
   // each submaster is created with CreateSubmaster() and then configured with
   // TSlave::SetupServ(). The second (unnamed) argument is not used.
   //
   // Progress is reported to the client through kPROOF_SERVERSTARTED messages
   // sent on gProofServ's socket. Valid submasters get their socket added to
   // fAllMonitor; invalid ones, and those rejected by the protocol check, are
   // added to fBadSlaves.
   //
   // Returns kFALSE only when GetWorkers() answers TProofServ::kQueryStop,
   // kTRUE otherwise.

   Int_t pc = 0;
   TList *submasterList = new TList;
   if (gProofServ->GetWorkers(submasterList, pc) == TProofServ::kQueryStop) {
      Error("StartSlaves", "getting list of submaster nodes");
      // Release the node list on the error path too (same cleanup as the
      // normal path below); it was previously leaked here.
      SafeDelete(submasterList);
      return kFALSE;
   }

   // Default image is "<host FQDN>:<work dir>" when the server provides none
   fImage = gProofServ->GetImage();
   if (fImage.IsNull())
      fImage = Form("%s:%s", TUrl(gSystem->HostName()).GetHostFQDN(),
                    gProofServ->GetWorkDir());

   UInt_t nSubmasters = submasterList->GetSize();
   UInt_t nSubmastersDone = 0;
   Int_t ord = 0;
   TList validSubmasters;
   // Pairs (TSlave, TObjString config file) for the serial setup step.
   // NOTE(review): the list owns the TPair objects; whether the TObjString
   // values are released together with them depends on TPair's destructor -
   // confirm.
   TList validPairs;
   validPairs.SetOwner();

   std::vector<TProofThread *> thrHandlers;
   if (parallel) {
      thrHandlers.reserve(nSubmasters);
      if (thrHandlers.max_size() < nSubmasters) {
         PDB(kGlobal,1)
            Info("StartSlaves","cannot reserve enough space thread"
                 " handlers - switch to serial startup");
         parallel = kFALSE;
      }
   }

   // Loop over the submaster nodes and open the connections
   TListIter next(submasterList);
   TObject *to;
   TProofNodeInfo *submaster;
   while ((to = next())) {
      submaster = (TProofNodeInfo *)to;
      const Char_t *conffile = submaster->GetConfig();
      const Char_t *image = submaster->GetImage();
      const Char_t *msd = submaster->GetMsd();
      // Fall back to the port of our own URL when the node does not set one
      Int_t sport = submaster->GetPort();
      if (sport == -1)
         sport = fUrl.GetPort();

      // Ordinal of the submaster: "<our ordinal>.<index>"
      TString fullord = TString(gProofServ->GetOrdinal()) + "." + ((Long_t) ord);
      if (parallel) {
         // Parallel startup: hand the node over to a startup thread
         TProofThreadArg *ta =
            new TProofThreadArg(submaster->GetNodeName().Data(), sport,
                                fullord, image, conffile, msd, fSlaves, this);
         if (ta) {
            ta->fType = TSlave::kMaster;
            TThread *th = new TThread(SlaveStartupThread,ta);
            if (!th) {
               Info("StartSlaves","Can't create startup thread:"
                    " out of system resources");
               SafeDelete(ta);
            } else {
               // Keep the handler so the thread can be joined and freed later
               thrHandlers.push_back(new TProofThread(th,ta));
               th->Run();

               // Notify the client
               nSubmastersDone++;
               TMessage m(kPROOF_SERVERSTARTED);
               m << TString("Opening connections to submasters") << nSubmasters
                 << nSubmastersDone << kTRUE;
               gProofServ->GetSocket()->Send(m);
            }
         } else {
            Info("StartSlaves","Can't create thread arguments object:"
                 " out of system resources");
         }
      }
      else {
         // Serial startup: create the submaster right away
         TUrl u(Form("%s:%d", submaster->GetNodeName().Data(), sport));
         // When a group is defined it is carried in the password field of the URL
         if (strlen(gProofServ->GetGroup()) > 0) {
            if (strlen(u.GetUser()) <= 0)
               u.SetUser(gProofServ->GetUser());
            u.SetPasswd(gProofServ->GetGroup());
         }
         TSlave *slave =
            CreateSubmaster(u.GetUrl(), fullord, image, msd);

         // Every submaster goes to the global list; only valid ones are
         // remembered for the setup step below
         Bool_t submasterOk = kTRUE;
         fSlaves->Add(slave);
         if (slave->IsValid()) {
            validPairs.Add(new TPair(slave, new TObjString(conffile)));
         } else {
            submasterOk = kFALSE;
            fBadSlaves->Add(slave);
         }

         PDB(kGlobal,3)
            Info("StartSlaves","submaster on host %s created and"
                 " added to list", submaster->GetNodeName().Data());

         // Notify the client
         nSubmastersDone++;
         TMessage m(kPROOF_SERVERSTARTED);
         m << TString("Opening connections to submasters") << nSubmasters
           << nSubmastersDone << submasterOk;
         gProofServ->GetSocket()->Send(m);
      }
      ord++;
   }

   // The node list is no longer needed
   SafeDelete(submasterList);

   nSubmastersDone = 0;
   if (parallel) {

      // Wait for the startup threads to finish
      std::vector<TProofThread *>::iterator i;
      for (i = thrHandlers.begin(); i != thrHandlers.end(); ++i) {
         TProofThread *pt = *i;
         if (pt && pt->fThread->GetState() == TThread::kRunningState) {
            PDB(kGlobal,3)
               Info("StartSlaves",
                    "parallel startup: waiting for submaster %s (%s:%d)",
                    pt->fArgs->fOrd.Data(), pt->fArgs->fUrl->GetHost(),
                    pt->fArgs->fUrl->GetPort());
            pt->fThread->Join();
         }

         // Notify the client
         nSubmastersDone++;
         TMessage m(kPROOF_SERVERSTARTED);
         m << TString("Setting up submasters") << nSubmasters
           << nSubmastersDone << kTRUE;
         gProofServ->GetSocket()->Send(m);
      }

      // Sort the submasters found in fSlaves into good and bad ones
      TIter nxw(fSlaves);
      TSlave *sl = 0;
      while ((sl = (TSlave *)nxw())) {
         if (sl->IsValid()) {
            if (fProtocol == 1) {
               Error("StartSlaves", "master and submaster protocols"
                     " not compatible (%d and %d)",
                     kPROOF_Protocol, fProtocol);
               fBadSlaves->Add(sl);
            } else {
               fAllMonitor->Add(sl->GetSocket());
               validSubmasters.Add(sl);
            }
         } else {
            fBadSlaves->Add(sl);
         }
      }

      // Release the thread handlers, last one first. The entry is always
      // removed, so the loop terminates even if a handler pointer is null
      // (previously a null entry was never erased and the loop would spin).
      while (!thrHandlers.empty()) {
         TProofThread *pt = thrHandlers.back();
         thrHandlers.pop_back();
         delete pt;
      }

   }
   else {

      // Serial setup: configure each valid submaster with its config file
      TIter nxsc(&validPairs);
      TPair *sc = 0;
      while ((sc = (TPair *) nxsc())) {
         TSlave *sl = (TSlave *) sc->Key();
         TObjString *cf = (TObjString *) sc->Value();
         sl->SetupServ(TSlave::kMaster, cf->GetName());

         // The submaster may have become invalid during the setup
         Bool_t submasterOk = kTRUE;
         if (sl->IsValid()) {
            if (fProtocol == 1) {
               Error("StartSlaves", "master and submaster protocols"
                     " not compatible (%d and %d)",
                     kPROOF_Protocol, fProtocol);
               submasterOk = kFALSE;
               fBadSlaves->Add(sl);
            } else {
               fAllMonitor->Add(sl->GetSocket());
               validSubmasters.Add(sl);
            }
         } else {
            submasterOk = kFALSE;
            fBadSlaves->Add(sl);
         }

         // Notify the client
         nSubmastersDone++;
         TMessage m(kPROOF_SERVERSTARTED);
         m << TString("Setting up submasters") << nSubmasters
           << nSubmastersDone << submasterOk;
         gProofServ->GetSocket()->Send(m);
      }
   }

   Collect(kAll);

   // After collecting, flag submasters that were refused (status -99) or
   // that are no longer valid
   TIter nextSubmaster(&validSubmasters);
   while (TSlave* sl = dynamic_cast<TSlave*>(nextSubmaster())) {
      if (sl->GetStatus() == -99) {
         Error("StartSlaves", "not allowed to connect to PROOF master server");
         fBadSlaves->Add(sl);
         continue;
      }

      if (!sl->IsValid()) {
         Error("StartSlaves", "failed to setup connection with PROOF master server");
         fBadSlaves->Add(sl);
         continue;
      }
   }

   return kTRUE;
}
Long64_t TProofSuperMaster::Process(TDSet *set, const char *selector, Option_t *option,
                                    Long64_t nentries, Long64_t first)
{
   // Process the data set 'set' with 'selector' through the current player.
   //
   // Returns -1 if this session is not valid; otherwise returns whatever the
   // player's Process() returns. When a progress dialog handler is available
   // it is executed first with this object, the selector, the number of
   // elements in the set, 'first' and 'nentries'.

   if (!IsValid()) {
      return -1;
   }

   // A player must exist at this point
   R__ASSERT(GetPlayer());

   if (GetProgressDialog()) {
      Int_t nElements = set->GetListOfElements()->GetSize();
      GetProgressDialog()->ExecPlugin(5, this, selector, nElements,
                                      first, nentries);
   }

   Long64_t result = GetPlayer()->Process(set, selector, option, nentries, first);
   return result;
}
void TProofSuperMaster::ValidateDSet(TDSet *dset)
{
   // Have the not-yet-valid elements of 'dset' validated by the active
   // submasters.
   //
   // Submasters and elements are grouped by the string returned by GetMsd().
   // Within each group the elements are split into contiguous slices, one per
   // submaster, and each non-empty slice is sent in a kPROOF_VALIDATE_DSET
   // message. Collect() is then called on the submasters that got work.
   // Nothing is done if the set reports its elements as valid. If an element
   // refers to an MSD for which no submaster is active, an error is printed
   // and the function returns without sending anything.

   if (dset->ElementsValid()) {
      return;
   }

   // One TPair (submaster list, element list) per MSD; the lists themselves
   // are owned by the two holder lists below
   TList msdPairs;
   msdPairs.SetOwner();

   TList submasterLists;
   submasterLists.SetOwner();
   TList elementLists;
   elementLists.SetOwner();

   // Group the active submasters by MSD
   TIter slaveIter(GetListOfActiveSlaves());
   for (;;) {
      TSlave *submaster = dynamic_cast<TSlave*>(slaveIter());
      if (!submaster) break;

      TList *group = 0;
      TPair *known = dynamic_cast<TPair*>(msdPairs.FindObject(submaster->GetMsd()));
      if (known) {
         group = dynamic_cast<TList*>(known->Key());
      } else {
         // First submaster seen for this MSD: create both lists and pair them
         group = new TList;
         group->SetName(submaster->GetMsd());
         submasterLists.Add(group);

         TList *sortedElems = new TSortedList(kSortDescending);
         sortedElems->SetName(TString(submaster->GetMsd())+"_elem");
         elementLists.Add(sortedElems);

         msdPairs.Add(new TPair(group, sortedElems));
      }
      group->Add(submaster);
   }

   // Assign every element still to be validated to the list of its MSD
   TIter elemIter(dset->GetListOfElements());
   for (;;) {
      TDSetElement *pending = dynamic_cast<TDSetElement*>(elemIter());
      if (!pending) break;

      if (pending->GetValid()) continue;

      TPair *domain = dynamic_cast<TPair*>(msdPairs.FindObject(pending->GetMsd()));
      if (!domain) {
         Error("ValidateDSet", "no mass storage domain '%s' associated"
                               " with available submasters",
                               pending->GetMsd());
         return;
      }
      dynamic_cast<TList*>(domain->Value())->Add(pending);
   }

   // Send the slices out
   TList usedSubmasters;
   TIter pairIter(&msdPairs);
   SetDSet(dset);
   for (;;) {
      TPair *domain = dynamic_cast<TPair*>(pairIter());
      if (!domain) break;

      TList *group = dynamic_cast<TList*>(domain->Key());
      TList *elems = dynamic_cast<TList*>(domain->Value());

      Int_t nGroup = group->GetSize();
      Int_t nElems = elems->GetSize();
      for (Int_t k = 0; k < nGroup; k++) {

         TDSet slice(dset->GetType(), dset->GetObjName(),
                     dset->GetDirectory());

         // Submaster k gets the elements in [lo, hi)
         const Int_t lo = (k*nElems)/nGroup;
         const Int_t hi = ((k+1)*nElems)/nGroup;
         for (Int_t idx = lo; idx < hi; idx++) {
            TDSetElement *src =
               dynamic_cast<TDSetElement*>(elems->At(idx));
            slice.Add(src->GetFileName(), src->GetObjName(),
                      src->GetDirectory(), src->GetFirst(),
                      src->GetNum(), src->GetMsd());
         }

         // Nothing to send to this submaster
         if (set_is_empty_guard: false) {}
         if (slice.GetListOfElements()->GetSize() <= 0) continue;

         TMessage mesg(kPROOF_VALIDATE_DSET);
         mesg << &slice;

         TSlave *target = dynamic_cast<TSlave*>(group->At(k));
         PDB(kGlobal,1)
            Info("ValidateDSet",
                 "Sending TDSet with %d elements to worker %s"
                 " to be validated", slice.GetListOfElements()->GetSize(),
                 target->GetOrdinal());
         target->GetSocket()->Send(mesg);
         usedSubmasters.Add(target);
      }
   }

   PDB(kGlobal,1)
      Info("ValidateDSet","Calling Collect");
   Collect(&usedSubmasters);
   SetDSet(0);
}
TVirtualProofPlayer *TProofSuperMaster::MakePlayer(const char *player, TSocket *s)
{
   // Create a player through TVirtualProofPlayer::Create(), install it with
   // SetPlayer() and return the current player.
   // A null 'player' name selects the default "sm".

   const char *kind = player ? player : "sm";
   SetPlayer(TVirtualProofPlayer::Create(kind, this, s));
   return GetPlayer();
}
Last change: Wed Jun 25 08:51:32 2008
Last generated: 2008-06-25 08:51
This page has been automatically generated. If you have any comments or suggestions about the page layout, send an email to ROOT support, or contact the developers with any questions or problems regarding ROOT.