#include "RConfigure.h"
#include "TApplication.h"
#include "TSlave.h"
#include "TProof.h"
#include "TSystem.h"
#include "TEnv.h"
#include "TROOT.h"
#include "TUrl.h"
#include "TMessage.h"
#include "TError.h"
#include "TVirtualMutex.h"
#include "TThread.h"
#include "TSocket.h"
#include "TObjString.h"
// ROOT class-implementation macro for TSlave.
ClassImp(TSlave)
// Optional factory hook consulted by TSlave::Create(); stays 0 until one is
// installed through TSlave::SetTXSlaveHook().
TSlave_t TSlave::fgTXSlaveHook = 0;
TSlave::TSlave(const char *url, const char *ord, Int_t perf,
               const char *image, TProof *proof, Int_t stype,
               const char *workdir, const char *msd)
  : fImage(image),
    fProofWorkDir(workdir),
    fWorkDir(workdir),
    fPort(-1),
    fOrdinal(ord),
    fPerfIdx(perf),
    fProtocol(0),
    fSocket(0),
    fProof(proof),
    fInput(0),
    fBytesRead(0),
    fRealTime(0),
    fCpuTime(0),
    fSlaveType((ESlaveType)stype),
    fStatus(TSlave::kInvalid),
    fParallel(0),
    fMsd(msd)
{
   // Create a slave object for the server at 'url' and immediately try to
   // connect to it via Init(). The host name (fully qualified) and the port
   // are taken from the parsed URL. If the connection attempt fails, Init()
   // leaves fSocket null.

   TUrl target(url);
   fName = target.GetHostFQDN();
   fPort = target.GetPort();

   // -1: do not override the port carried by the URL
   Init(url, -1, stype);
}
TSlave::TSlave()
  : fPort(-1),
    fOrdinal("-1"),
    fPerfIdx(-1),
    fProtocol(0),
    fSocket(0),
    fProof(0),
    fInput(0),
    fBytesRead(0),
    fRealTime(0),
    fCpuTime(0),
    fSlaveType(kMaster),
    fStatus(kInvalid),
    fParallel(0)
{
   // Default constructor: builds an unconnected, invalid slave (no socket,
   // no owning TProof, ordinal "-1"). No connection is attempted here.
}
void TSlave::Init(const char *host, Int_t port, Int_t stype)
{
   
   
   
   
   
   TString proto = fProof->fUrl.GetProtocol();
   proto.Insert(5, 'd');
   TUrl hurl(host);
   hurl.SetProtocol(proto);
   if (port > 0)
      hurl.SetPort(port);
   
   TString iam;
   if (fProof->IsMaster() && stype == kSlave) {
      iam = "Master";
      hurl.SetOptions("SM");
   } else if (fProof->IsMaster() && stype == kMaster) {
      iam = "Master";
      hurl.SetOptions("MM");
   } else if (!fProof->IsMaster() && stype == kMaster) {
      iam = "Local Client";
      hurl.SetOptions("MC");
   } else {
      Error("Init","Impossible PROOF <-> SlaveType Configuration Requested");
      R__ASSERT(0);
   }
   
   
   
   
   Int_t wsize = 65536;
   fSocket = TSocket::CreateAuthSocket(hurl.GetUrl(), 0, wsize, fSocket);
   if (!fSocket || !fSocket->IsAuthenticated()) {
      SafeDelete(fSocket);
      return;
   }
   
   
   
   
   {
      R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Remove(fSocket);
   }
   R__LOCKGUARD2(gProofMutex);
   
   fUser              = fSocket->GetSecContext()->GetUser();
   PDB(kGlobal,3) {
      Info("Init","%s: fUser is .... %s", iam.Data(), fUser.Data());
   }
   if (fSocket->GetRemoteProtocol() >= 14 ) {
      TMessage m(kPROOF_SETENV);
      const TList *envs = TProof::GetEnvVars();
      if (envs != 0 ) {
         TIter next(envs);
         for (TObject *o = next(); o != 0; o = next()) {
            TNamed *env = dynamic_cast<TNamed*>(o);
            if (env != 0) {
               TString def = Form("%s=%s", env->GetName(), env->GetTitle());
               const char *p = def.Data();
               m << p;
            }
         }
      }
      fSocket->Send(m);
   } else {
      Info("Init","** NOT ** Sending kPROOF_SETENV RemoteProtocol : %d",
         fSocket->GetRemoteProtocol());
   }
   char buf[512];
   fSocket->Recv(buf, sizeof(buf));
   if (strcmp(buf, "Okay")) {
      Printf("%s", buf);
      SafeDelete(fSocket);
      return;
   }
}
Int_t TSlave::SetupServ(Int_t stype, const char *conffile)
{
   // Complete the setup of the remote server over the connection opened by
   // Init(): receive the startup message, exchange protocol versions and
   // send the session information.
   //
   //   stype    - slave type; kMaster selects 'conffile' as the last item
   //              sent, anything else selects fProofWorkDir
   //   conffile - configuration file name forwarded to a master
   //
   // Returns 0 on success (fStatus becomes kActive) and -1 on failure. On
   // every failure after the initial check the socket is deleted.

   // Init() leaves fSocket null when the connection or the authentication
   // failed: refuse to go on instead of dereferencing a null pointer.
   if (!fSocket) {
      Error("SetupServ", "no valid connection to the remote server");
      return -1;
   }

   // Startup message: only its kind ('what') is inspected, the text is not
   Int_t what;
   char buf[512];
   if (fSocket->Recv(buf, sizeof(buf), what) <= 0) {
      Error("SetupServ", "failed to receive slave startup message");
      SafeDelete(fSocket);
      return -1;
   }
   if (what == kMESS_NOTOK) {
      SafeDelete(fSocket);
      return -1;
   }

   // Exchange protocol versions: send the local one, read the remote one.
   // Both transfers are expected to move exactly two Int_t.
   if (fSocket->Send(kPROOF_Protocol, kROOTD_PROTOCOL) != 2*sizeof(Int_t)) {
      Error("SetupServ", "failed to send local PROOF protocol");
      SafeDelete(fSocket);
      return -1;
   }
   if (fSocket->Recv(fProtocol, what) != 2*sizeof(Int_t)) {
      Error("SetupServ", "failed to receive remote PROOF protocol");
      SafeDelete(fSocket);
      return -1;
   }

   // Remote protocols older than 4 are not supported
   if (fProtocol < 4) {
      Error("SetupServ", "incompatible PROOF versions (remote version"
                      " must be >= 4, is %d)", fProtocol);
      SafeDelete(fSocket);
      return -1;
   }

   // Publish the negotiated protocol to the owning TProof
   fProof->fProtocol   = fProtocol;

   if (fProtocol < 5) {
      // Protocol 4: go through the old authentication setup
      Bool_t isMaster = (stype == kMaster);
      TString wconf = isMaster ? TString(conffile) : fProofWorkDir;
      if (OldAuthSetup(isMaster, wconf) != 0) {
         Error("SetupServ", "OldAuthSetup: failed to setup authentication");
         SafeDelete(fSocket);
         return -1;
      }
   } else {
      // Protocol >= 5: send user, ordinal and the configuration file (to a
      // master) or the work directory (otherwise) in a single message
      TMessage mess;
      if (stype == kMaster)
         mess << fUser << fOrdinal << TString(conffile);
      else
         mess << fUser << fOrdinal << fProofWorkDir;
      if (fSocket->Send(mess) < 0) {
         Error("SetupServ", "failed to send ordinal and config info");
         SafeDelete(fSocket);
         return -1;
      }
   }

   // Set the kNoDelay socket option
   fSocket->SetOption(kNoDelay, 1);

   // The server is now ready for use
   fStatus = kActive;

   return 0;
}
void TSlave::Init(TSocket *s, Int_t stype)
{
   // Initialise this slave from an existing socket 's': store it in fSocket
   // and run the regular Init() with the socket's host name and port. The
   // host/port Init() hands the stored fSocket on to
   // TSocket::CreateAuthSocket(). 's' must not be null.
   fSocket = s;
   // NOTE(review): kept as one expression on purpose - if GetInetAddress()
   // returns a temporary, the host name pointer is presumably valid only
   // until the end of this statement; confirm before splitting it up.
   TSlave::Init(s->GetInetAddress().GetHostName(), s->GetPort(), stype);
}
TSlave::~TSlave()
{
   // Destructor: releases the connection and the input handler via Close().
   Close();
}
void TSlave::Close(Option_t *opt)
{
   // Close the connection to the remote server and delete the input handler
   // and the socket.
   //
   // If a socket is open and we are not running on a master, an option
   // starting with 'S' or 's' first sends a shutdown interrupt to the server.
   // 'opt' must not be null.

   if (fSocket) {

      // Clients may ask the remote session to shut down
      const Bool_t onClient = !(fProof->IsMaster());
      if (onClient && !strncasecmp(opt,"S",1))
         Interrupt(TProof::kShutdownInterrupt);

      // Walk the cleanup entries of an active security context from the
      // last one backwards; the first kPROOFD entry with protocol < 9
      // causes the context to be deactivated.
      TSecContext *ctx = fSocket->GetSecContext();
      if (ctx && ctx->IsActive()) {
         TIter rev(ctx->GetSecContextCleanup(), kIterBackward);
         for (TObject *entry = rev(); entry; entry = rev()) {
            TSecContextCleanup *cleanup = (TSecContextCleanup *)entry;
            const Bool_t oldProofd = cleanup->GetType() == TSocket::kPROOFD &&
                                     cleanup->GetProtocol() < 9;
            if (oldProofd) {
               ctx->DeActivate("");
               break;
            }
         }
      }
   }

   // Done regardless of whether a socket was open
   SafeDelete(fInput);
   SafeDelete(fSocket);
}
Int_t TSlave::Compare(const TObject *obj) const
{
   // Compare this slave with 'obj', for sorting purposes.
   //
   // The performance index decides first: 1 if ours is larger, -1 if it is
   // smaller. On equal indices the dot-separated ordinals are compared
   // field by field as integers, in reversed sense (a smaller field value
   // returns 1); if all common fields are equal, the slave with more fields
   // returns -1. Returns 0 for fully equal slaves, and also - after
   // reporting an error - when 'obj' is null or not a TSlave.

   const TSlave *sl = dynamic_cast<const TSlave*>(obj);
   if (!sl) {
      // Nothing meaningful to compare with: do not dereference
      Error("Compare", "input is not a TSlave object");
      return 0;
   }

   if (fPerfIdx > sl->GetPerfIdx()) return 1;
   if (fPerfIdx < sl->GetPerfIdx()) return -1;

   const char *myord = GetOrdinal();
   const char *otherord = sl->GetOrdinal();
   while (myord && otherord) {
      // atoi() stops at the next '.', i.e. converts the current field only
      Int_t myval = atoi(myord);
      Int_t otherval = atoi(otherord);
      if (myval < otherval) return 1;
      if (myval > otherval) return -1;
      // Move both cursors to the start of the next field, if any
      myord = strchr(myord, '.');
      if (myord) myord++;
      otherord = strchr(otherord, '.');
      if (otherord) otherord++;
   }
   // One ordinal ran out of fields before the other
   if (myord) return -1;
   if (otherord) return 1;
   return 0;
}
void TSlave::Print(Option_t *) const
{
   // Print a summary of this worker. Identification (ordinal, state, host,
   // port, ROOT version, architecture) is always printed; user, security,
   // protocol, directory and usage details only while a socket is open.

   // State label: "invalid" without a socket, otherwise "inactive" or "valid"
   const char *state = "invalid";
   if (fSocket)
      state = (fStatus == kInactive) ? "inactive" : "valid";

   Printf("*** Worker %s  (%s)", fOrdinal.Data(), state);
   Printf("    Host name:               %s", GetName());
   Printf("    Port number:             %d", GetPort());
   Printf("    ROOT version|rev|tag:    %s", GetROOTVersion());
   Printf("    Architecture-Compiler:   %s", GetArchCompiler());

   if (!fSocket)
      return;

   // The group is shown only when it is not empty
   if (strlen(GetGroup()) == 0) {
      Printf("    User:                    %s", GetUser());
   } else {
      Printf("    User/Group:              %s/%s", GetUser(), GetGroup());
   }

   if (fSocket->GetSecContext()) {
      TString ctxText;
      Printf("    Security context:        %s", fSocket->GetSecContext()->AsString(ctxText));
   }
   Printf("    Proofd protocol version: %d", fSocket->GetRemoteProtocol());
   Printf("    Image name:              %s", GetImage());
   Printf("    Working directory:       %s", GetWorkDir());
   Printf("    Performance index:       %d", GetPerfIdx());
   Printf("    MB's processed:          %.2f", float(GetBytesRead())/(1024*1024));
   // NOTE(review): "sent" shows the bytes received on our socket and
   // "received" the bytes sent - presumably the worker's point of view; confirm.
   Printf("    MB's sent:               %.2f", float(fSocket->GetBytesRecv())/(1024*1024));
   Printf("    MB's received:           %.2f", float(fSocket->GetBytesSent())/(1024*1024));
   Printf("    Real time used (s):      %.3f", GetRealTime());
   Printf("    CPU time used (s):       %.3f", GetCpuTime());
}
void TSlave::SetInputHandler(TFileHandler *ih)
{
   // Store 'ih' as the input handler of this slave and call its Add()
   // method. The handler is deleted in Close(). 'ih' must not be null.

   fInput = ih;
   ih->Add();
}
Int_t TSlave::OldAuthSetup(Bool_t master, TString wconf)
{
   // Run the old-style authentication setup, used by SetupServ() for remote
   // protocols older than 5.
   //
   //   master - kTRUE if the remote server is a master
   //   wconf  - configuration file (master) or work directory (otherwise)
   //
   // The actual work is done by the function "OldSlaveAuthSetup" looked up
   // in libRootAuth; it is resolved on first use and cached for later calls.
   // Returns the value returned by that function, or -1 if the library or
   // the symbol cannot be located or loaded.

   static OldSlaveAuthSetup_t oldAuthSetupHook = 0;

   if (!oldAuthSetupHook) {
      // Locate and load the authentication library
      TString authlib = "libRootAuth";
      char *p = 0;
      if ((p = gSystem->DynamicPathName(authlib, kTRUE))) {
         // Only the existence matters: release the returned path right away
         delete[] p;
         if (gSystem->Load(authlib) == -1) {
            Error("OldAuthSetup", "can't load %s",authlib.Data());
            // Must be -1: callers test '!= 0', so the former kFALSE (0)
            // was taken as success
            return -1;
         }
      } else {
         Error("OldAuthSetup", "can't locate %s",authlib.Data());
         return -1;
      }

      // Resolve the setup function
      Func_t f = gSystem->DynFindSymbol(authlib,"OldSlaveAuthSetup");
      if (f)
         oldAuthSetupHook = (OldSlaveAuthSetup_t)(f);
      else {
         Error("OldAuthSetup", "can't find OldSlaveAuthSetup");
         return -1;
      }
   }

   // Every path reaching this point has a valid hook: the resolution above
   // either set it or returned
   return (*oldAuthSetupHook)(fSocket, master, fOrdinal, wconf);
}
TSlave *TSlave::Create(const char *url, const char *ord, Int_t perf,
                       const char *image, TProof *proof, Int_t stype,
                       const char *workdir, const char *msd)
{
   // Static factory: build the slave through the hook stored in
   // fgTXSlaveHook when one is available and allowed, otherwise build a
   // plain TSlave. All arguments are forwarded unchanged to whichever
   // constructor path is taken. 'proof' must not be null.

   // Decide whether the hook may be used:
   //  - on a master it is ruled out when an application exists and its
   //    third argument is missing or does not start with "xpd";
   //  - elsewhere it is ruled out when 'proof' reports IsProofd().
   Bool_t tryxpd = kTRUE;
   if (proof->IsMaster()) {
      if (gApplication &&
         (gApplication->Argc() < 3 || strncmp(gApplication->Argv(2),"xpd",3)))
         tryxpd = kFALSE;
   } else if (proof->IsProofd()) {
      tryxpd = kFALSE;
   }

   // Without a hook, try to load libProofx.
   // NOTE(review): loading it presumably registers the hook via
   // SetTXSlaveHook() - confirm. Failures are reported but not fatal.
   if (!fgTXSlaveHook) {
      TString proofxlib = "libProofx";
      char *found = gSystem->DynamicPathName(proofxlib, kTRUE);
      if (!found) {
         ::Error("TSlave::Create", "can't locate %s", proofxlib.Data());
      } else {
         delete[] found;
         if (gSystem->Load(proofxlib) == -1)
            ::Error("TSlave::Create", "can't load %s", proofxlib.Data());
      }
   }

   if (fgTXSlaveHook && tryxpd)
      return (*fgTXSlaveHook)(url, ord, perf, image, proof, stype, workdir, msd);

   return new TSlave(url, ord, perf, image, proof, stype, workdir, msd);
}
Int_t TSlave::Ping()
{
   // Ping the remote server.
   // Returns 0 if ok, -1 if this slave is not valid or the send fails.

   if (!IsValid())
      return -1;

   TMessage ping(kPROOF_PING | kMESS_ACK);

   // NOTE(review): the message is sent twice and only the second result is
   // checked - possibly to detect a broken connection; confirm intent.
   fSocket->Send(ping);
   const Int_t rc = fSocket->Send(ping);
   if (rc != -1)
      return 0;

   Warning("Ping","%s: acknowledgement not received", GetOrdinal());
   return -1;
}
void TSlave::Interrupt(Int_t type)
{
   // Send an interrupt to the remote server as a one-byte out-of-band (OOB)
   // message carrying 'type' (truncated to char). Does nothing if this slave
   // is not valid. For a hard interrupt the input stream is then flushed up
   // to the OOB mark. For every type except kShutdownInterrupt the owning
   // TProof is finally asked to Collect() from this slave.
   if (!IsValid()) return;
   char oobc = (char) type;
   const int kBufSize = 1024;
   char waste[kBufSize];   // scratch buffer for the data being discarded
   // Send the one-byte OOB message to the server
   if (fSocket->SendRaw(&oobc, 1, kOob) <= 0) {
      Error("Interrupt", "error sending oobc to slave %s", GetOrdinal());
      return;
   }
   if (type == TProof::kHardInterrupt) {
      char  oob_byte;
      int   n, nch, nbytes = 0, nloop = 0;
      // Try to receive the OOB byte back; loop as long as RecvRaw fails
      while ((n = fSocket->RecvRaw(&oob_byte, 1, kOob)) < 0) {
         if (n == -2) {   // NOTE(review): presumably "would block" - confirm
            // The OOB byte is not available yet: discard pending regular
            // data. The amount readable is checked first; when nothing is
            // pending we sleep and retry instead of issuing a read.
            // NOTE(review): presumably because a read on an empty queue
            // could block - confirm.
            fSocket->GetOption(kBytesToRead, nch);
            if (nch == 0) {
               gSystem->Sleep(1000);
               continue;
            }
            // Never read more than the scratch buffer can hold
            if (nch > kBufSize) nch = kBufSize;
            n = fSocket->RecvRaw(waste, nch);
            if (n <= 0) {
               Error("Interrupt", "error receiving waste from slave %s",
                     GetOrdinal());
               break;
            }
            nbytes += n;
         } else if (n == -3) {   // NOTE(review): presumably "OOB data not arrived yet" - confirm
            // Wait a little and retry; give up after more than 100 attempts
            // with a Sleep(100) each (about 10 s if the unit is ms).
            gSystem->Sleep(100);
            if (++nloop > 100) {  // retry budget exhausted
               Error("Interrupt", "server %s does not respond", GetOrdinal());
               break;
            }
         } else {
            // Any other negative code is treated as a hard error
            Error("Interrupt", "error receiving OOB from server %s",
                  GetOrdinal());
            break;
         }
      }
      // Keep discarding the input stream until the socket reports that the
      // OOB mark has been reached (kAtMark option non-zero)
      while (1) {
         int atmark;
         fSocket->GetOption(kAtMark, atmark);
         if (atmark)
            break;
         // Mark not reached yet: discard what is pending, or wait for more
         fSocket->GetOption(kBytesToRead, nch);
         if (nch == 0) {
            gSystem->Sleep(1000);
            continue;
         }
         if (nch > kBufSize) nch = kBufSize;
         n = fSocket->RecvRaw(waste, nch);
         if (n <= 0) {
            Error("Interrupt", "error receiving waste (2) from slave %s",
                  GetOrdinal());
            break;
         }
         nbytes += n;
      }
      // Report how much data was thrown away, if any
      if (nbytes > 0) {
         if (fProof->IsMaster())
            Info("Interrupt", "slave %s:%s synchronized: %d bytes discarded",
                 GetName(), GetOrdinal(), nbytes);
         else
            Info("Interrupt", "PROOF synchronized: %d bytes discarded", nbytes);
      }
      // Collect whatever the server sends after a hard interrupt
      fProof->Collect(this);
   } else if (type == TProof::kSoftInterrupt) {
      // Collect whatever the server sends after a soft interrupt
      fProof->Collect(this);
   } else if (type == TProof::kShutdownInterrupt) {
      ; // nothing is collected after a shutdown request
   } else {
      // Any other interrupt type: collect as well
      fProof->Collect(this);
   }
}
void TSlave::StopProcess(Bool_t abort, Int_t timeout)
{
   // Send a kPROOF_STOPPROCESS request to the remote server.
   //
   //   abort   - flag forwarded as first item of the message
   //   timeout - forwarded as well, but only if the protocol stored in the
   //             owning TProof is newer than 9
   //
   // Does nothing if this slave is not valid, consistently with Ping() and
   // Interrupt(); without this check a slave whose connection failed would
   // dereference a null socket.

   if (!IsValid()) return;

   TMessage msg(kPROOF_STOPPROCESS);
   msg << abort;
   if (fProof->fProtocol > 9)
      msg << timeout;
   fSocket->Send(msg);
}
TObjString *TSlave::SendCoordinator(Int_t, const char *, Int_t)
{
   // Not implemented for this communication layer: all arguments are
   // ignored and 0 is always returned. A notice is printed when gDebug > 0.

   if (gDebug > 0) {
      Info("SendCoordinator","method not implemented for this communication layer");
   }
   return 0;
}
void TSlave::SetAlias(const char *)
{
   // Not implemented for this communication layer: the argument is ignored
   // and nothing is changed. A notice is printed when gDebug > 0.

   if (gDebug > 0) {
      Info("SetAlias","method not implemented for this communication layer");
   }
}
void TSlave::SetTXSlaveHook(TSlave_t xslavehook)
{
   // Install the factory hook that Create() calls, when allowed, instead of
   // constructing a plain TSlave. Passing 0 removes the hook.
   fgTXSlaveHook = xslavehook;
}
// Last update: Thu Jan 17 09:03:35 2008
// This page has been automatically generated. If you have any comments or suggestions about the page layout, send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.