#include "TProofSuperMaster.h"
#include "TString.h"
#include "TObjString.h"
#include "TError.h"
#include "TList.h"
#include "TSortedList.h"
#include "TSlave.h"
#include "TMap.h"
#include "TProofServ.h"
#include "TSocket.h"
#include "TMonitor.h"
#include "TSemaphore.h"
#include "TDSet.h"
#include "TPluginManager.h"
#include "TVirtualProofPlayer.h"
#include "TMessage.h"
#include "TUrl.h"
#include "TProofResourcesStatic.h"
#include "TProofNodeInfo.h"
ClassImp(TProofSuperMaster)
TProofSuperMaster::TProofSuperMaster(const char *masterurl, const char *conffile,
                                     const char *confdir, Int_t loglevel)
{
   
   fUrl = TUrl(masterurl);
   if (!conffile || strlen(conffile) == 0)
      conffile = kPROOF_ConfFile;
   else if (!strncasecmp(conffile, "sm:", 3))
      conffile+=3;
   if (!confdir  || strlen(confdir) == 0)
      confdir = kPROOF_ConfDir;
   Init(masterurl, conffile, confdir, loglevel);
}
Bool_t TProofSuperMaster::StartSlaves(Bool_t parallel, Bool_t)
{
   
   
   
   
   
   
   
   
   TProofResourcesStatic *resources = new TProofResourcesStatic(fConfDir, fConfFile);
   fConfFile = resources->GetFileName(); 
   PDB(kGlobal,1) Info("StartSlaves", "using PROOF config file: %s", fConfFile.Data());
   
   TProofNodeInfo *master = resources->GetMaster();
   if (master)
      fImage = master->GetImage();
   if (!master || (fImage.Length() == 0)) {
      Error("StartSlaves",
            "no appropriate master line found in %s", fConfFile.Data());
      return kFALSE;
   }
   TList *submasterList = resources->GetSubmasters();
   UInt_t nSubmasters = submasterList->GetSize();
   UInt_t nSubmastersDone = 0;
   Int_t ord = 0;
   TList validSubmasters;
   TList validPairs;
   validPairs.SetOwner();
   
   std::vector<TProofThread *> thrHandlers;
   if (parallel) {
      thrHandlers.reserve(nSubmasters);
      if (thrHandlers.max_size() < nSubmasters) {
         PDB(kGlobal,1)
            Info("StartSlaves","cannot reserve enough space thread"
                 " handlers - switch to serial startup");
         parallel = kFALSE;
      }
   }
   
   TListIter next(submasterList);
   TObject *to;
   TProofNodeInfo *submaster;
   while ((to = next())) {
      
      submaster = (TProofNodeInfo *)to;
      const Char_t *conffile = submaster->GetConfig();
      const Char_t *image = submaster->GetImage();
      const Char_t *msd = submaster->GetMsd();
      Int_t sport = submaster->GetPort();
      if (sport == -1)
         sport = fUrl.GetPort();
      TString fullord = TString(gProofServ->GetOrdinal()) + "." + ((Long_t) ord);
      if (parallel) {
         
         TProofThreadArg *ta =
             new TProofThreadArg(submaster->GetNodeName().Data(), sport,
                                 fullord, image, conffile, msd, fSlaves, this);
         if (ta) {
            
            ta->fType = TSlave::kMaster;
            
            TThread *th = new TThread(SlaveStartupThread,ta);
            if (!th) {
               Info("StartSlaves","Can't create startup thread:"
                    " out of system resources");
               SafeDelete(ta);
            } else {
               
               thrHandlers.push_back(new TProofThread(th,ta));
               
               th->Run();
               
               nSubmastersDone++;
               TMessage m(kPROOF_SERVERSTARTED);
               m << TString("Opening connections to submasters") << nSubmasters
                 << nSubmastersDone << kTRUE;
               gProofServ->GetSocket()->Send(m);
            }
         } else {
            Info("StartSlaves","Can't create thread arguments object:"
                 " out of system resources");
         }
      } 
      else {
         
         TSlave *slave =
            CreateSubmaster(Form("%s:%d", submaster->GetNodeName().Data(), sport),
                            fullord, image, msd);
         
         
         Bool_t submasterOk = kTRUE;
         fSlaves->Add(slave);
         if (slave->IsValid()) {
            validPairs.Add(new TPair(slave, new TObjString(conffile)));
         } else {
            submasterOk = kFALSE;
            fBadSlaves->Add(slave);
         }
         PDB(kGlobal,3)
            Info("StartSlaves","submaster on host %s created and"
                 " added to list", submaster->GetNodeName().Data());
         
         nSubmastersDone++;
         TMessage m(kPROOF_SERVERSTARTED);
         m << TString("Opening connections to submasters") << nSubmasters
           << nSubmastersDone << submasterOk;
         gProofServ->GetSocket()->Send(m);
      } 
      ord++;
   } 
   
   delete resources;
   resources = 0;
   nSubmastersDone = 0;
   if (parallel) {
      
      std::vector<TProofThread *>::iterator i;
      for (i = thrHandlers.begin(); i != thrHandlers.end(); ++i) {
         TProofThread *pt = *i;
         
         if (pt && pt->fThread->GetState() == TThread::kRunningState) {
            PDB(kGlobal,3)
               Info("StartSlaves",
                    "parallel startup: waiting for submaster %s (%s:%d)",
                    pt->fArgs->fOrd.Data(), pt->fArgs->fUrl->GetHost(),
                    pt->fArgs->fUrl->GetPort());
            pt->fThread->Join();
         }
         
         nSubmastersDone++;
         TMessage m(kPROOF_SERVERSTARTED);
         m << TString("Setting up submasters") << nSubmasters
           << nSubmastersDone << kTRUE;
         gProofServ->GetSocket()->Send(m);
      }
      TIter next(fSlaves);
      TSlave *sl = 0;
      while ((sl = (TSlave *)next())) {
         if (sl->IsValid()) {
            if (fProtocol == 1) {
               Error("StartSlaves", "master and submaster protocols"
                     " not compatible (%d and %d)",
                     kPROOF_Protocol, fProtocol);
               fBadSlaves->Add(sl);
            } else {
               fAllMonitor->Add(sl->GetSocket());
               validSubmasters.Add(sl);
            }
         } else {
            fBadSlaves->Add(sl);
         }
      } 
      
      while (!thrHandlers.empty()) {
         i = thrHandlers.end()-1;
         if (*i) {
            SafeDelete(*i);
            thrHandlers.erase(i);
         }
      }
   } 
   else {
      
      
      TIter nxsc(&validPairs);
      TPair *sc = 0;
      while ((sc = (TPair *) nxsc())) {
         
         TSlave *sl = (TSlave *) sc->Key();
         TObjString *cf = (TObjString *) sc->Value();
         sl->SetupServ(TSlave::kMaster, cf->GetName());
         
         Bool_t submasterOk = kTRUE;
         if (sl->IsValid()) {
            
            
            if (fProtocol == 1) {
               Error("StartSlaves", "master and submaster protocols"
                     " not compatible (%d and %d)",
                     kPROOF_Protocol, fProtocol);
               submasterOk = kFALSE;
               fBadSlaves->Add(sl);
            } else {
               fAllMonitor->Add(sl->GetSocket());
               validSubmasters.Add(sl);
            }
         } else {
            submasterOk = kFALSE;
            fBadSlaves->Add(sl);
         }
         
         nSubmastersDone++;
         TMessage m(kPROOF_SERVERSTARTED);
         m << TString("Setting up submasters") << nSubmasters
           << nSubmastersDone << submasterOk;
         gProofServ->GetSocket()->Send(m);
      }
   }
   Collect(kAll); 
   TIter nextSubmaster(&validSubmasters);
   while (TSlave* sl = dynamic_cast<TSlave*>(nextSubmaster())) {
      if (sl->GetStatus() == -99) {
         Error("StartSlaves", "not allowed to connect to PROOF master server");
         fBadSlaves->Add(sl);
         continue;
      }
      if (!sl->IsValid()) {
         Error("StartSlaves", "failed to setup connection with PROOF master server");
         fBadSlaves->Add(sl);
         continue;
      }
   }
   return kTRUE;
}
Long64_t TProofSuperMaster::Process(TDSet *set, const char *selector, Option_t *option,
                                    Long64_t nentries, Long64_t first, TEventList *evl)
{
   
   
   
   if (!IsValid()) return -1;
   R__ASSERT(GetPlayer());
   if (GetProgressDialog())
      GetProgressDialog()->ExecPlugin(5, this, selector, set->GetListOfElements()->GetSize(),
                                  first, nentries);
   return GetPlayer()->Process(set, selector, option, nentries, first, evl);
}
void TProofSuperMaster::ValidateDSet(TDSet *dset)
{
   
   if (dset->ElementsValid()) return;
   TList msds;
   msds.SetOwner();
   TList smholder;
   smholder.SetOwner();
   TList elemholder;
   elemholder.SetOwner();
   
   TIter nextSubmaster(GetListOfActiveSlaves());
   while (TSlave *sl = dynamic_cast<TSlave*>(nextSubmaster())) {
      TList *smlist = 0;
      TPair *p = dynamic_cast<TPair*>(msds.FindObject(sl->GetMsd()));
      if (!p) {
         smlist = new TList;
         smlist->SetName(sl->GetMsd());
         smholder.Add(smlist);
         TList *elemlist = new TSortedList(kSortDescending);
         elemlist->SetName(TString(sl->GetMsd())+"_elem");
         elemholder.Add(elemlist);
         msds.Add(new TPair(smlist, elemlist));
      } else {
         smlist = dynamic_cast<TList*>(p->Key());
      }
      smlist->Add(sl);
   }
   TIter nextElem(dset->GetListOfElements());
   while (TDSetElement *elem = dynamic_cast<TDSetElement*>(nextElem())) {
      if (elem->GetValid()) continue;
      TPair *p = dynamic_cast<TPair*>(msds.FindObject(elem->GetMsd()));
      if (p) {
         dynamic_cast<TList*>(p->Value())->Add(elem);
      } else {
         Error("ValidateDSet", "no mass storage domain '%s' associated"
                               " with available submasters",
                               elem->GetMsd());
         return;
      }
   }
   
   TList usedsms;
   TIter nextSM(&msds);
   SetDSet(dset); 
   while (TPair *msd = dynamic_cast<TPair*>(nextSM())) {
      TList *sms = dynamic_cast<TList*>(msd->Key());
      TList *setelements = dynamic_cast<TList*>(msd->Value());
      
      Int_t nsms = sms->GetSize();
      Int_t nelements = setelements->GetSize();
      for (Int_t i=0; i<nsms; i++) {
         TDSet set(dset->GetType(), dset->GetObjName(),
                   dset->GetDirectory());
         for (Int_t j = (i*nelements)/nsms;
                    j < ((i+1)*nelements)/nsms;
                    j++) {
            TDSetElement *elem =
               dynamic_cast<TDSetElement*>(setelements->At(j));
            set.Add(elem->GetFileName(), elem->GetObjName(),
                    elem->GetDirectory(), elem->GetFirst(),
                    elem->GetNum(), elem->GetMsd());
         }
         if (set.GetListOfElements()->GetSize()>0) {
            TMessage mesg(kPROOF_VALIDATE_DSET);
            mesg << &set;
            TSlave *sl = dynamic_cast<TSlave*>(sms->At(i));
            PDB(kGlobal,1) Info("ValidateDSet",
                                "Sending TDSet with %d elements to slave %s"
                                " to be validated",
                                set.GetListOfElements()->GetSize(),
                                sl->GetOrdinal());
            sl->GetSocket()->Send(mesg);
            usedsms.Add(sl);
         }
      }
   }
   PDB(kGlobal,1) Info("ValidateDSet","Calling Collect");
   Collect(&usedsms);
   SetDSet(0);
}
TVirtualProofPlayer *TProofSuperMaster::MakePlayer(const char *player, TSocket *s)
{
   
   
   
   if (!player)
      player = "sm";
   SetPlayer(TVirtualProofPlayer::Create(player, this, s));
   return GetPlayer();
}
This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.