#include "TProofCondor.h"
#include "TCondor.h"
#include "TList.h"
#include "TMap.h"
#include "TMessage.h"
#include "TMonitor.h"
#include "TProofNodeInfo.h"
#include "TProofResourcesStatic.h"
#include "TProofServ.h"
#include "TSlave.h"
#include "TSocket.h"
#include "TString.h"
#include "TTimer.h"
ClassImp(TProofCondor)
TProofCondor::TProofCondor(const char *masterurl, const char *conffile,
const char *confdir, Int_t loglevel,
const char *, TProofMgr *mgr)
: fCondor(0), fTimer(0)
{
InitMembers();
fManager = mgr;
fUrl = TUrl(masterurl);
if (!conffile || !conffile[0]) {
conffile = kPROOF_ConfFile;
} else if (!strncasecmp(conffile, "condor:", 7)) {
conffile+=7;
}
if (!confdir || !confdir[0]) {
confdir = kPROOF_ConfDir;
}
Init(masterurl, conffile, confdir, loglevel);
}
TProofCondor::~TProofCondor()
{
SafeDelete(fCondor);
SafeDelete(fTimer);
}
Bool_t TProofCondor::StartSlaves(Bool_t)
{
fCondor = new TCondor;
TString jobad = GetJobAd();
fImage = fCondor->GetImage(gSystem->HostName());
if (fImage.Length() == 0) {
Error("StartSlaves", "Empty Condor image found for system %s",
gSystem->HostName());
return kFALSE;
}
TList claims;
if (fConfFile.IsNull()) {
TList *condorclaims = fCondor->Claim(9999, jobad);
TIter nextclaim(condorclaims);
while (TObject *o = nextclaim()) claims.Add(o);
} else {
TProofResourcesStatic *resources = new TProofResourcesStatic(fConfDir, fConfFile);
fConfFile = resources->GetFileName();
PDB(kGlobal,1) Info("StartSlaves", "using PROOF config file: %s", fConfFile.Data());
TList *workerList = resources->GetWorkers();
if (workerList->GetSize() == 0) {
Error("StartSlaves", "Found no condorworkers in %s", fConfFile.Data());
return kFALSE;
}
Int_t ord = 0;
TListIter next(workerList);
TObject *to;
TProofNodeInfo *worker;
int nSlavesDone = 0;
while ((to = next())) {
worker = (TProofNodeInfo *)to;
const Char_t *image = worker->GetImage().Data();
const Char_t *workdir = worker->GetWorkDir().Data();
Int_t perfidx = worker->GetPerfIndex();
gSystem->Sleep(10 );
TCondorSlave* csl = fCondor->Claim(worker->GetNodeName().Data(), jobad);
if (csl) {
csl->fPerfIdx = perfidx;
csl->fImage = image;
csl->fWorkDir = gSystem->ExpandPathName(workdir);
TString fullord = TString(gProofServ->GetOrdinal()) + "." + ((Long_t) ord);
csl->fOrdinal = fullord.Data();
claims.Add(csl);
ord++;
}
nSlavesDone++;
TMessage m(kPROOF_SERVERSTARTED);
m << TString("Creating COD Claim") << workerList->GetSize()
<< nSlavesDone << (csl != 0);
gProofServ->GetSocket()->Send(m);
}
delete resources;
resources = 0;
}
Long_t delay = 500;
Int_t ntries = 20;
Int_t trial = 1;
Int_t idx = 0;
int nClaims = claims.GetSize();
int nClaimsDone = 0;
while (claims.GetSize() > 0) {
TCondorSlave* c = 0;
if (trial == 1) {
c = dynamic_cast<TCondorSlave*>(claims.At(idx));
} else {
TPair *p = dynamic_cast<TPair*>(claims.At(idx));
if (p) {
TTimer *t = dynamic_cast<TTimer*>(p->Value());
if (t) {
Long64_t wait = t->GetAbsTime()-gSystem->Now();
if (wait > 0) gSystem->Sleep((UInt_t)wait);
c = dynamic_cast<TCondorSlave*>(p->Key());
}
}
}
TSlave *slave = 0;
if (c) slave = CreateSlave(Form("%s:%d", c->fHostname.Data(), c->fPort), c->fOrdinal,
c->fPerfIdx, c->fImage, c->fWorkDir);
if (trial < ntries) {
if (slave && slave->IsValid()) {
fSlaves->Add(slave);
if (trial == 1) {
claims.Remove(c);
} else {
TPair *p = dynamic_cast<TPair*>(claims.Remove(c));
if (p) {
TTimer *xt = dynamic_cast<TTimer*>(p->Value());
if (xt) delete xt;
delete p;
}
}
nClaimsDone++;
TMessage m(kPROOF_SERVERSTARTED);
m << TString("Opening connections to workers") << nClaims
<< nClaimsDone << kTRUE;
gProofServ->GetSocket()->Send(m);
} else if (slave) {
if (trial == 1) {
TTimer* timer = new TTimer(delay);
TPair *p = new TPair(c, timer);
claims.RemoveAt(idx);
claims.AddAt(p, idx);
} else {
TPair *p = dynamic_cast<TPair*>(claims.At(idx));
if (p && p->Value()) {
TTimer *xt = dynamic_cast<TTimer*>(p->Value());
if (xt) xt->Reset();
}
}
delete slave;
idx++;
} else {
Warning("StartSlaves", "could not create TSlave object!");
}
} else {
if (slave) {
fSlaves->Add(slave);
TPair *p = dynamic_cast<TPair*>(claims.Remove(c));
if (p && p->Value()) {
TTimer *xt = dynamic_cast<TTimer*>(p->Value());
delete xt;
}
if (p) delete p;
nClaimsDone++;
TMessage m(kPROOF_SERVERSTARTED);
m << TString("Opening connections to workers") << nClaims
<< nClaimsDone << slave->IsValid();
gProofServ->GetSocket()->Send(m);
} else {
Warning("StartSlaves", "could not create TSlave object!");
}
}
if (idx>=claims.GetSize()) {
trial++;
idx = 0;
}
}
TIter nxsl(fSlaves);
TSlave *sl = 0;
int nSlavesDone = 0, nSlavesTotal = fSlaves->GetSize();
while ((sl = (TSlave *) nxsl())) {
if (sl->IsValid()) {
sl->SetupServ(TSlave::kSlave, 0);
}
if (sl->IsValid()) {
fAllMonitor->Add(sl->GetSocket());
} else {
fBadSlaves->Add(sl);
}
nSlavesDone++;
TMessage m(kPROOF_SERVERSTARTED);
Bool_t wrkvalid = sl->IsValid() ? kTRUE : kFALSE;
m << TString("Setting up worker servers") << nSlavesTotal
<< nSlavesDone << wrkvalid;
gProofServ->GetSocket()->Send(m);
}
return kTRUE;
}
void TProofCondor::SetActive(Bool_t active)
{
if (fTimer == 0) {
fTimer = new TTimer();
}
if (active) {
PDB(kCondor,1) Info("SetActive","-- Condor Resume --");
fTimer->Stop();
if (fCondor->GetState() == TCondor::kSuspended)
fCondor->Resume();
} else {
#if 1
return;
#else
Int_t delay = 60000;
PDB(kCondor,1) Info("SetActive","-- Delayed Condor Suspend (%d msec / to %lld) --",
delay, delay + Long64_t(gSystem->Now()));
fTimer->Connect("Timeout()", "TCondor", fCondor, "Suspend()");
fTimer->Start(10000, kTRUE);
#endif
}
}
TString TProofCondor::GetJobAd()
{
TString ad;
ad = "JobUniverse = 5\n";
ad += Form("Cmd = \"%s/bin/proofd\"\n", GetConfDir());
ad += Form("Iwd = \"%s\"\n", gSystem->TempDirectory());
ad += "In = \"/dev/null\"\n";
ad += Form("Out = \"%s/proofd.out.$(Port)\"\n", gSystem->TempDirectory());
ad += Form("Err = \"%s/proofd.err.$(Port)\"\n", gSystem->TempDirectory());
ad += Form("Args = \"-f -p $(Port) -d %d %s\"\n", GetLogLevel(), GetConfDir());
return ad;
}