#include "TProofCondor.h"
#include "TCondor.h"
#include "TList.h"
#include "TMap.h"
#include "TMessage.h"
#include "TMonitor.h"
#include "TProofNodeInfo.h"
#include "TProofResourcesStatic.h"
#include "TProofServ.h"
#include "TSlave.h"
#include "TSocket.h"
#include "TString.h"
#include "TTimer.h"
ClassImp(TProofCondor)
TProofCondor::TProofCondor(const char *masterurl, const char *conffile,
                           const char *confdir, Int_t loglevel)
  : fCondor(0), fTimer(0)
{
   // Set up a Condor-backed PROOF session.
   //
   // masterurl : URL of the master; stored in fUrl and forwarded to Init().
   // conffile  : configuration file name. A null or empty value selects
   //             kPROOF_ConfFile; a leading "condor:" (matched without
   //             regard to case) is skipped.
   // confdir   : configuration directory. A null or empty value selects
   //             kPROOF_ConfDir.
   // loglevel  : forwarded unchanged to Init().

   fUrl = TUrl(masterurl);

   const Bool_t noConfFile = (conffile == 0) || (conffile[0] == '\0');
   if (noConfFile)
      conffile = kPROOF_ConfFile;
   else if (strncasecmp(conffile, "condor:", 7) == 0)
      conffile += 7;   // step over the "condor:" prefix

   const Bool_t noConfDir = (confdir == 0) || (confdir[0] == '\0');
   if (noConfDir)
      confdir = kPROOF_ConfDir;

   Init(masterurl, conffile, confdir, loglevel);
}
TProofCondor::~TProofCondor()
{
   // Release the Condor handle first, then the timer, resetting both
   // member pointers to null.

   if (fCondor) {
      delete fCondor;
      fCondor = 0;
   }
   if (fTimer) {
      delete fTimer;
      fTimer = 0;
   }
}
Bool_t TProofCondor::StartSlaves(Bool_t parallel, Bool_t)
{
   // Claim Condor resources and start one PROOF slave per claim.
   //
   // parallel : when kTRUE the slaves are started from helper threads
   //            (SlaveStartupThread); otherwise they are started one by one.
   //            The second (unnamed) argument is ignored.
   //
   // Claims are obtained either directly from Condor (empty fConfFile) or
   // for every worker listed in the static PROOF resources file. Each claim
   // is then tried up to `ntries` times. Returns kFALSE if no Condor image
   // is found for this host or the config file lists no workers; kTRUE
   // otherwise, even if some slaves ended up in fBadSlaves.

   fCondor = new TCondor;
   TString jobad = GetJobAd();

   fImage = fCondor->GetImage(gSystem->HostName());
   if (fImage.Length() == 0) {
      Error("StartSlaves", "Empty Condor image found for system %s",
            gSystem->HostName());
      return kFALSE;
   }

   TList claims;
   if (fConfFile.IsNull()) {
      // No config file: ask Condor for as many claims as it will give us.
      // NOTE(review): ownership of the returned TList is not visible here;
      // confirm against TCondor::Claim whether the caller should delete it.
      TList *condorclaims = fCondor->Claim(9999, jobad);
      TIter nextclaim(condorclaims);
      while (TObject *o = nextclaim()) claims.Add(o);
   } else {
      // Config file given: claim the workers it lists.
      TProofResourcesStatic *resources = new TProofResourcesStatic(fConfDir, fConfFile);
      fConfFile = resources->GetFileName();
      PDB(kGlobal,1) Info("StartSlaves", "using PROOF config file: %s", fConfFile.Data());

      TList *workerList = resources->GetWorkers();
      if (workerList->GetSize() == 0) {
         Error("StartSlaves", "Found no condorworkers in %s", fConfFile.Data());
         delete resources;   // do not leak the resources object on failure
         return kFALSE;
      }

      // Ordinal suffix; only advanced for successful claims.
      Int_t ord = 0;

      TListIter next(workerList);
      TObject *to;
      TProofNodeInfo *worker;
      int nSlavesDone = 0;
      while ((to = next())) {
         worker = (TProofNodeInfo *)to;

         const Char_t *image = worker->GetImage().Data();
         const Char_t *workdir = worker->GetWorkDir().Data();
         Int_t perfidx = worker->GetPerfIndex();

         gSystem->Sleep(10 );
         TCondorSlave* csl = fCondor->Claim(worker->GetNodeName().Data(), jobad);
         if (csl) {
            csl->fPerfIdx = perfidx;
            csl->fImage = image;
            // ExpandPathName(const char *) returns a heap buffer that the
            // caller must free. fWorkDir is assumed to copy the characters
            // (fOrdinal below is assigned from a local TString the same way).
            char *expandedDir = gSystem->ExpandPathName(workdir);
            csl->fWorkDir = expandedDir;
            delete [] expandedDir;
            TString fullord = TString(gProofServ->GetOrdinal()) + "." + ((Long_t) ord);
            csl->fOrdinal = fullord.Data();
            claims.Add(csl);
            ord++;
         }

         // Report progress, including whether this claim succeeded.
         nSlavesDone++;
         TMessage m(kPROOF_SERVERSTARTED);
         m << TString("Creating COD Claim") << workerList->GetSize()
         << nSlavesDone << (csl != 0);
         gProofServ->GetSocket()->Send(m);
      }

      delete resources;
      resources = 0;
   }

   Long_t delay = 500;   // timer interval passed to TTimer (msec)
   Int_t ntries = 20;    // maximum number of attempts per claim
   Int_t trial = 1;
   Int_t idx = 0;

   // Bookkeeping used only for the parallel startup.
   std::vector<TProofThread *> thrHandlers;
   TIter *nextsl = 0, *nextclaim = 0;
   TList *startedsl = 0;
   UInt_t nSlaves = 0;
   TTimer *ptimer = 0;
   if (parallel) {
      nSlaves = claims.GetSize();
      thrHandlers.reserve(nSlaves);
      if (thrHandlers.max_size() >= nSlaves) {
         startedsl = new TList();
         nextsl = new TIter(startedsl);
         nextclaim = new TIter(&claims);
      } else {
         PDB(kGlobal,1)
            Info("StartSlaves","cannot reserve enough space thread"
                 " handlers - switch to serial startup");
         parallel = kFALSE;
      }
   }

   int nClaims = claims.GetSize();
   int nClaimsDone = 0;
   while (claims.GetSize() > 0) {
      TCondorSlave* c = 0;
      if (parallel) {

         // Start one thread per outstanding claim.
         // NOTE(review): the loop relies on `claims` shrinking elsewhere,
         // presumably in SlaveStartupThread on success - confirm.
         startedsl->RemoveAll();
         nextclaim->Reset();
         while ((c = (TCondorSlave *)(*nextclaim)())) {

            TProofThreadArg *ta = new TProofThreadArg(c, &claims, startedsl, this);
            if (ta) {
               TThread *th = new TThread(SlaveStartupThread,ta);
               if (!th) {
                  Info("StartSlaves","can't create startup thread:"
                       " out of system resources");
                  SafeDelete(ta);
               } else {
                  // Keep the handler so the thread can be joined and freed.
                  thrHandlers.push_back(new TProofThread(th, ta));
                  th->Run();
               }
            } else {
               Info("StartSlaves","can't create thread arguments object:"
                    " out of system resources");
            }
         }

         // Create the timer on the first trial, reset it afterwards.
         if (trial == 1) {
            ptimer = new TTimer(delay);
         } else {
            ptimer->Reset();
         }

         // Wait for every thread that is still running.
         std::vector<TProofThread *>::iterator i;
         for (i = thrHandlers.begin(); i != thrHandlers.end(); ++i) {
            TProofThread *pt = *i;
            if (pt && pt->fThread->GetState() == TThread::kRunningState) {
               Info("StartSlaves",
                    "parallel startup: waiting for slave %s (%s:%d)",
                    pt->fArgs->fOrd.Data(), pt->fArgs->fUrl->GetHost(),
                    pt->fArgs->fUrl->GetPort());
               pt->fThread->Join();
            }
         }

         // Move the valid slaves into the active lists.
         nextsl->Reset();
         TSlave *sl = 0;
         while ((sl = (TSlave *)(*nextsl)())) {
            if (sl->IsValid()) {
               fSlaves->Add(sl);
               fAllMonitor->Add(sl->GetSocket());
               startedsl->Remove(sl);
            }
         }

         // Whatever is left in startedsl failed: drop it and retry, or on
         // the last trial mark it bad and give up on the remaining claims.
         if (claims.GetSize() > 0) {
            if (trial < ntries) {
               nextsl->Reset();
               while ((sl = (TSlave *)(*nextsl)())) {
                  SafeDelete(sl);
               }
            } else {
               nextsl->Reset();
               while ((sl = (TSlave *)(*nextsl)())) {
                  fBadSlaves->Add(sl);
               }
               claims.RemoveAll();
            }
         }

         trial++;

         // Free the thread handlers. Always pop, so that a null entry
         // cannot make this loop spin forever.
         while (!thrHandlers.empty()) {
            TProofThread *done = thrHandlers.back();
            thrHandlers.pop_back();
            delete done;
         }

      } else {

         // Serial startup. On the first trial the list holds plain claims;
         // failed claims are replaced by a TPair(claim, timer), so later
         // trials unpack the pair and wait until its timer has expired.
         if (trial == 1) {
            c = dynamic_cast<TCondorSlave*>(claims.At(idx));
         } else {
            TPair *p = dynamic_cast<TPair*>(claims.At(idx));
            TTimer *t = dynamic_cast<TTimer*>(p->Value());
            Long_t wait = (Long_t) (t->GetAbsTime()-gSystem->Now());
            if (wait>0) gSystem->Sleep(wait);
            c = dynamic_cast<TCondorSlave*>(p->Key());
         }

         // The slave URL is "host:port"; the format must be "%s:%d" so the
         // port is actually substituted.
         TSlave *slave = CreateSlave(Form("%s:%d",c->fHostname.Data(), c->fPort), c->fOrdinal,
                                     c->fPerfIdx, c->fImage, c->fWorkDir);

         if (trial<ntries) {
            if (slave->IsValid()) {
               fSlaves->Add(slave);
               if (trial == 1) {
                  claims.Remove(c);
               } else {
                  TPair *p = dynamic_cast<TPair*>(claims.Remove(c));
                  delete dynamic_cast<TTimer*>(p->Value());
                  delete p;
               }
               nClaimsDone++;
               TMessage m(kPROOF_SERVERSTARTED);
               m << TString("Opening connections to workers") << nClaims
                 << nClaimsDone << kTRUE;
               gProofServ->GetSocket()->Send(m);
            } else {
               // Failed: leave the claim in the list with a retry timer.
               if (trial == 1) {
                  TTimer* timer = new TTimer(delay);
                  TPair *p = new TPair(c, timer);
                  claims.RemoveAt(idx);
                  claims.AddAt(p, idx);
               } else {
                  TPair *p = dynamic_cast<TPair*>(claims.At(idx));
                  dynamic_cast<TTimer*>(p->Value())->Reset();
               }
               delete slave;
               idx++;
            }
         } else {
            // Last trial: keep the slave whatever its state; invalid ones
            // are moved to fBadSlaves by the setup loop below.
            fSlaves->Add(slave);
            TPair *p = dynamic_cast<TPair*>(claims.Remove(c));
            delete dynamic_cast<TTimer*>(p->Value());
            delete p;

            nClaimsDone++;
            TMessage m(kPROOF_SERVERSTARTED);
            m << TString("Opening connections to workers") << nClaims
              << nClaimsDone << slave->IsValid();
            gProofServ->GetSocket()->Send(m);
         }

         // End of a pass over the outstanding claims: start the next trial.
         if (idx>=claims.GetSize()) {
            trial++;
            idx = 0;
         }
      }
   }

   if (!parallel) {
      // Serial mode: set up the server on each slave and sort the slaves
      // into the monitor or the bad-slave list.
      TIter nxsl(fSlaves);
      TSlave *sl = 0;
      int nSetupDone = 0, nTotal = fSlaves->GetSize();
      while ((sl = (TSlave *) nxsl())) {

         if (sl->IsValid()) {
            sl->SetupServ(TSlave::kSlave, 0);
         }

         // SetupServ may have invalidated the slave, so check again.
         if (sl->IsValid()) {
            fAllMonitor->Add(sl->GetSocket());
         } else {
            fBadSlaves->Add(sl);
         }

         nSetupDone++;
         TMessage m(kPROOF_SERVERSTARTED);
         m << TString("Setting up worker servers") << nTotal
           << nSetupDone << sl->IsValid();
         gProofServ->GetSocket()->Send(m);
      }
   }

   // Release the helpers of the parallel startup.
   if (parallel) {
      SafeDelete(startedsl);
      SafeDelete(nextsl);
      SafeDelete(nextclaim);
   }
   SafeDelete(ptimer);   // null unless the parallel path allocated it

   return kTRUE;
}
void TProofCondor::SetActive(Bool_t active)
{
   // Switch the Condor claims between active and idle.
   //
   // active == kTRUE  : stop the suspend timer and resume Condor if it is
   //                    currently suspended.
   // active == kFALSE : currently a no-op; the delayed suspend below is
   //                    switched off by the unconditional return.
   //
   // The timer is created on first use in either case.

   if (fTimer == 0) {
      fTimer = new TTimer();
   }
   if (active) {
      PDB(kCondor,1) Info("SetActive","-- Condor Resume --");
      fTimer->Stop();
      // fCondor is only created in StartSlaves(); do not dereference it
      // if that has not happened.
      if (fCondor && fCondor->GetState() == TCondor::kSuspended)
         fCondor->Resume();
   } else {
return; 
      // Unreachable while the return above is in place; kept so that the
      // delayed suspend can be re-enabled.
      // NOTE(review): the message reports `delay` (60000 msec) but the
      // timer is started with 10000 msec - confirm which one is intended.
      Int_t delay = 60000; 
      PDB(kCondor,1) Info("SetActive","-- Delayed Condor Suspend (%d msec / to %ld) --",
                          delay, delay + long(gSystem->Now()));
      fTimer->Connect("Timeout()", "TCondor", fCondor, "Suspend()");
      fTimer->Start(10000, kTRUE); 
   }
}
TString TProofCondor::GetJobAd()
{
   // Assemble the job ad text that StartSlaves() passes to TCondor::Claim().
   // The ad runs <confdir>/bin/proofd from /tmp, with stdin from /dev/null
   // and stdout/stderr written to /tmp/proofd.{out,err}.$(Port). The log
   // level and the configuration directory are passed as arguments.

   TString ad("JobUniverse = 5\n");

   ad.Append(Form("Cmd = \"%s/bin/proofd\"\n", GetConfDir()));
   ad.Append("Iwd = \"/tmp\"\n");
   ad.Append("In = \"/dev/null\"\n");
   ad.Append("Out = \"/tmp/proofd.out.$(Port)\"\n");
   ad.Append("Err = \"/tmp/proofd.err.$(Port)\"\n");
   ad.Append(Form("Args = \"-f -p $(Port) -d %d %s\"\n", GetLogLevel(), GetConfDir()));

   return ad;
}
// This page has been automatically generated. If you have any comments or suggestions about the page layout, send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.