#include "TXSlave.h"
#include "TProof.h"
#include "TProofServ.h"
#include "TSystem.h"
#include "TEnv.h"
#include "TROOT.h"
#include "TUrl.h"
#include "TMessage.h"
#include "TMonitor.h"
#include "TError.h"
#include "TSysEvtHandler.h"
#include "TVirtualMutex.h"
#include "TThread.h"
#include "TXSocket.h"
#include "TXSocketHandler.h"
#include "Varargs.h"
// ROOT class-implementation macro for TXSlave.
ClassImp(TXSlave)
extern "C" {
   // Factory with C linkage: allocates a TXSlave from the given arguments and
   // hands it back through a base-class (TSlave) pointer. The arguments are
   // forwarded unchanged to the TXSlave constructor.
   TSlave *GetTXSlave(const char *url, const char *ord, Int_t perf,
                      const char *image, TProof *proof, Int_t stype,
                      const char *workdir, const char *msd)
   {
      TXSlave *xslave = new TXSlave(url, ord, perf, image,
                                    proof, stype, workdir, msd);
      return (TSlave *)xslave;
   }
}
// Helper whose constructor registers GetTXSlave as the TXSlave hook of TSlave.
class XSlaveInit {
 public:
   XSlaveInit()
   {
      TSlave::SetTXSlaveHook(&GetTXSlave);
   }
};

// File-scope instance: constructing it performs the hook registration.
static XSlaveInit xslave_init;
void TXSlave::DoError(int level, const char *location, const char *fmt, va_list va) const
{
   // Forward an error report to the global ::ErrorHandler, prefixing the
   // location with the class name ("TXSlave::<location>").

   const char *where = Form("TXSlave::%s", location);
   ::ErrorHandler(level, where, fmt, va);
}
// Signal handler for kSigInterrupt that keeps a (non-owning, as far as this
// file shows) pointer to a TXSocket; see Notify() for the action taken.
class TXSlaveInterruptHandler : public TSignalHandler {
public:
   // 's' is the socket to be flagged on interrupt; it may be 0.
   TXSlaveInterruptHandler(TXSocket *s = 0)
      : TSignalHandler(kSigInterrupt, kFALSE), fSocket(s) { }

   Bool_t Notify();

private:
   TXSocket *fSocket;   // socket on which SetInterrupt() is called
};
Bool_t TXSlaveInterruptHandler::Notify()
{
   // Called when the interrupt signal is caught: log it and, if a socket was
   // attached at construction, flag the interrupt on it. Always returns kTRUE.

   Info("Notify","Processing interrupt signal ...");

   if (fSocket != 0) {
      // Propagate the interrupt to the low-level socket
      fSocket->SetInterrupt();
   }

   return kTRUE;
}
TXSlave::TXSlave(const char *url, const char *ord, Int_t perf,
               const char *image, TProof *proof, Int_t stype,
               const char *workdir, const char *msd) : TSlave()
{
   // Create a slave object: store the given attributes, register the
   // TXSocket input handler with gSystem and open the connection via Init().
   // The object starts out invalid; Init() sets fValid on success.
   // 'proof' is dereferenced here and in Init(), so it must not be null.

   // Not usable until Init() succeeds
   fValid = kFALSE;
   fIntHandler = 0;

   // Owner session and slave attributes
   fProof = proof;
   fSlaveType = (ESlaveType)stype;
   fOrdinal = ord;
   fPerfIdx = perf;
   fImage = image;
   fMsd = msd;

   // Both work-directory members start from the same value
   fProofWorkDir = workdir;
   fWorkDir = workdir;

   // Register the socket handler instance with the system event loop
   gSystem->AddFileHandler(TXSocketHandler::GetSocketHandler());

   // Tell TXSocket on which side we are running
   if (fProof->IsMaster())
      TXSocket::fgLoc = "master";
   else
      TXSocket::fgLoc = "client";

   // Open the connection
   Init(url, stype);
}
void TXSlave::Init(const char *host, Int_t stype)
{
   
   
   
   
   
   
   TUrl url(host);
   url.SetProtocol(fProof->fUrl.GetProtocol());
   
   if (url.GetPort() == TUrl("a").GetPort()) {
      
      
      
      Int_t port = gSystem->GetServiceByName("proofd");
      if (port < 0) {
         if (gDebug > 0)
            Info("Init","service 'proofd' not found by GetServiceByName"
                        ": using default IANA assigned tcp port 1093");
         port = 1094;
      } else {
         if (gDebug > 1)
            Info("Init","port from GetServiceByName: %d", port);
      }
      url.SetPort(port);
   }
   
   fName = url.GetHost();
   fPort = url.GetPort(); 
   
   fGroup = url.GetPasswd();
   
   
   
   TString opts(url.GetOptions());
   Bool_t attach = (opts.Length() > 0 && opts.IsDigit()) ? kTRUE : kFALSE;
   Int_t psid = (attach) ? opts.Atoi() : kPROOF_Protocol;
   
   TString iam;
   Char_t mode = 's';
   TString alias = fProof->GetTitle();
   if (fProof->IsMaster() && stype == kSlave) {
      iam = "Master";
      mode = 's';
      
      alias = Form("session-%s|ord:%s", fProof->GetName(), fOrdinal.Data());
   } else if (fProof->IsMaster() && stype == kMaster) {
      iam = "Master";
      mode = 'm';
      
      alias = Form("session-%s|ord:%s", fProof->GetName(), fOrdinal.Data());
   } else if (!fProof->IsMaster() && stype == kMaster) {
      iam = "Local Client";
      mode = (attach) ? 'A' : 'M';
   } else {
      Error("Init","Impossible PROOF <-> SlaveType Configuration Requested");
      R__ASSERT(0);
   }
   
   if (fProof->fConfFile.Length() > 0)
      alias += Form("|cf:%s",fProof->fConfFile.Data());
   
   TString envlist;
   if (!fProof->GetManager() ||
        fProof->GetManager()->GetRemoteProtocol() > 1001) {
         const TList *envs = TProof::GetEnvVars();
         if (envs != 0 ) {
            TIter next(envs);
            for (TObject *o = next(); o != 0; o = next()) {
               TNamed *env = dynamic_cast<TNamed*>(o);
               if (env != 0) {
                  if (!envlist.IsNull())
                     envlist += ",";
                  envlist += Form("%s=%s", env->GetName(), env->GetTitle());
               }
            }
         }
   } else {
      if (fProof->GetManager() && TProof::GetEnvVars())
         Info("Init", "** NOT ** sending user envs - RemoteProtocol : %d",
                      fProof->GetManager()->GetRemoteProtocol());
   }
   
   if (!envlist.IsNull())
      alias += Form("|envs:%s", envlist.Data());
   
   
   if (!(fSocket = new TXSocket(url.GetUrl(kTRUE), mode, psid,
                                -1, alias, fProof->GetLogLevel(), this))) {
      Error("Init", "while opening the connection to %s - exit", url.GetUrl(kTRUE));
      return;
   }
   
   if (!(fSocket->IsValid())) {
      Error("Init", "some severe error occurred while opening "
                    "the connection at %s - exit", url.GetUrl(kTRUE));
      SafeDelete(fSocket);
      return;
   }
   
   if (!fProof->GetManager() && !envlist.IsNull() &&
      ((TXSocket *)fSocket)->GetXrdProofdVersion() <= 1001) {
      Info("Init","user envs setting sent but unsupported remotely - RemoteProtocol : %d",
                     ((TXSocket *)fSocket)->GetXrdProofdVersion()); 
   }
   
   ((TXSocket *)fSocket)->fReference = fProof;
   
   fProtocol = fSocket->GetRemoteProtocol();
   
   fProof->fServType = TProofMgr::kXProofd;
   
   fProof->fSessionID = ((TXSocket *)fSocket)->GetSessionID();
   
   TString dpu(((TXSocket *)fSocket)->fBuffer);
   if (dpu.Length() > 0)
      fProof->SetDataPoolUrl(dpu);
   
   
   
   
   {
      R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Remove(fSocket);
   }
   R__LOCKGUARD2(gProofMutex);
   
   fUser = ((TXSocket *)fSocket)->fUser;
   PDB(kGlobal,3) {
      Info("Init","%s: fUser is .... %s", iam.Data(), fUser.Data());
   }
   
   fValid = kTRUE;
}
Int_t TXSlave::SetupServ(Int_t, const char *)
{
   // Finalize the setup of the connection: receive the startup message,
   // check the remote protocol and set the socket options.
   // Both arguments are unused.
   // Returns 0 on success; on failure returns -1 after deleting the socket
   // and marking this object invalid.

   // Wait for the startup message
   char startupMsg[512];
   Int_t msgKind;
   const Int_t nrecv = fSocket->Recv(startupMsg, sizeof(startupMsg), msgKind);
   if (nrecv <= 0) {
      Error("SetupServ", "failed to receive slave startup message");
      Close("S");
      SafeDelete(fSocket);
      fValid = kFALSE;
      return -1;
   }

   // Either the remote side answered kMESS_NOTOK (no message printed here)
   // or it runs a protocol older than 4 (reported as an error)
   const Bool_t refused = (msgKind == kMESS_NOTOK);
   if (refused || fProtocol < 4) {
      if (!refused) {
         Error("SetupServ", "incompatible PROOF versions (remote version "
                            "must be >= 4, is %d)", fProtocol);
      }
      SafeDelete(fSocket);
      fValid = kFALSE;
      return -1;
   }

   // Propagate the protocol to the session
   fProof->fProtocol   = fProtocol;

   // Set the kNoDelay option on the socket
   fSocket->SetOption(kNoDelay, 1);

   // We are done
   return 0;
}
TXSlave::~TXSlave()
{
   // Destructor: closes the connection and releases the socket and input
   // objects through Close() with its default option.

   this->Close();
}
void TXSlave::Close(Option_t *opt)
{
   // Close the connection and delete the owned objects.
   // 'opt' is passed as-is to the socket's Close(). After the call both
   // fInput and fSocket have been released through SafeDelete.

   if (fSocket) {
      // Close the socket first, then delete it below
      fSocket->Close(opt);
   }

   SafeDelete(fInput);
   SafeDelete(fSocket);
}
Int_t TXSlave::Ping()
{
   // Ping the remote side through the socket.
   // Returns -1 if this object is not valid, otherwise whatever
   // TXSocket::Ping() returns.

   if (IsValid())
      return ((TXSocket *)fSocket)->Ping();

   return -1;
}
void TXSlave::Interrupt(Int_t type)
{
   // Process an interrupt request of the given type.
   // TProof::kLocalInterrupt is handled locally (monitor deactivation and
   // semaphore posting, nothing is sent); any other type is sent to the
   // remote side through the socket. Nothing is done if the object is not
   // valid.

   if (!IsValid()) return;

   if (type != TProof::kLocalInterrupt) {
      // Remote interrupt: forward it and report
      ((TXSocket *)fSocket)->SendInterrupt(type);
      Info("Interrupt","Interrupt of type %d sent", type);
      return;
   }

   // Local interrupt: take the socket out of the current monitor, if it is
   // among its active sockets
   if (!fProof) {
      Warning("Interrupt", "%p: reference to PROOF missing", this);
   } else {
      TMonitor *monitor = fProof->fCurrentMonitor;
      if (monitor && fSocket &&
          monitor->GetListOfActives()->FindObject(fSocket)) {
         if (gDebug > 2)
            Info("Interrupt", "%p: deactivating from monitor %p", this, monitor);
         monitor->DeActivate(fSocket);
      }
   }

   // Post the socket semaphore, under the socket mutex, until TryWait()
   // returns 1
   if (fSocket) {
      R__LOCKGUARD(((TXSocket *)fSocket)->fAMtx);
      TSemaphore *asem = &(((TXSocket *)fSocket)->fASem);
      while (asem->TryWait() != 1)
         asem->Post();
   }
}
void TXSlave::StopProcess(Bool_t abort, Int_t timeout)
{
   // Send a TXSocket::kStopProcess urgent message carrying the 'abort' flag
   // (as integer) and 'timeout'. Nothing is done if the object is not valid.

   if (!IsValid())
      return;

   TXSocket *xsock = (TXSocket *)fSocket;
   xsock->SendUrgent(TXSocket::kStopProcess, (Int_t)abort, timeout);

   if (gDebug > 0)
      Info("StopProcess", "Request of type %d sent over", abort);
}
Int_t TXSlave::GetProofdProtocol(TSocket *s)
{
   // Exchange protocol information with the server on socket 's'.
   // Sends the client protocol and reads back the server reply.
   // Returns the remote protocol, or -1 if it could not be determined or a
   // send/receive transferred fewer bytes than expected.

   // Send the first sizeof(UInt_t) bytes of the string " <client protocol>"
   UInt_t clientWord = 0;
   const Int_t sendLen = sizeof(clientWord);
   memcpy((char *)&clientWord,
          Form(" %d", TSocket::GetClientProtocol()), sendLen);
   if (s->SendRaw(&clientWord, sendLen) != sendLen) {
      ::Error("TXSlave::GetProofdProtocol",
              "sending %d bytes to proofd server [%s:%d]",
              sendLen, (s->GetInetAddress()).GetHostName(), s->GetPort());
      return -1;
   }

   // Read the two-word reply
   Int_t reply[2] = {0};
   const Int_t replyLen = sizeof(reply);
   if (s->RecvRaw(reply, replyLen) != replyLen) {
      ::Error("TXSlave::GetProofdProtocol",
              "reading %d bytes from proofd server [%s:%d]",
              replyLen, (s->GetInetAddress()).GetHostName(), s->GetPort());
      return -1;
   }

   // The kROOTD_PROTOCOL tag is either in the first word (protocol in the
   // second one) or in the second word (protocol to be read separately);
   // if it is in neither, the result stays at -1
   Int_t remote = -1;
   if (net2host(reply[0]) == kROOTD_PROTOCOL) {
      remote = net2host(reply[1]);
   } else if (net2host(reply[1]) == kROOTD_PROTOCOL) {
      const Int_t extraLen = sizeof(remote);
      if (s->RecvRaw(&remote, extraLen) != extraLen) {
         ::Error("TXSlave::GetProofdProtocol",
                 "reading %d bytes from proofd server [%s:%d]",
                 extraLen, (s->GetInetAddress()).GetHostName(), s->GetPort());
         return -1;
      }
      remote = net2host(remote);
   }

   if (gDebug > 2)
      ::Info("TXSlave::GetProofdProtocol",
             "remote proofd: buf1: %d, buf2: %d rproto: %d",
             net2host(reply[0]),net2host(reply[1]),remote);

   // We are done
   return remote;
}
TObjString *TXSlave::SendCoordinator(Int_t kind, const char *msg, Int_t int2)
{
   // Forward a coordinator request to the underlying TXSocket and return
   // its answer unchanged. No validity check is done here: fSocket is
   // dereferenced directly.

   TXSocket *xsock = (TXSocket *)fSocket;
   return xsock->SendCoordinator(kind, msg, int2);
}
void TXSlave::SetAlias(const char *alias)
{
   // Send 'alias' to the coordinator as a TXSocket::kSessionAlias request.
   // Nothing is done if the object is not valid. The value returned by
   // SendCoordinator is not used.

   if (IsValid())
      ((TXSocket *)fSocket)->SendCoordinator(TXSocket::kSessionAlias, alias);
}
Bool_t TXSlave::HandleError(const void *)
{
   // Handle an error on the socket. The argument is unused.
   // The socket is interrupted, the interrupt handlers are removed and the
   // socket is deactivated in the current monitor. On a master a message is
   // sent to the client and this worker is marked bad; otherwise this
   // worker is removed from the session lists, the session is shut down
   // through the manager (if any) and the connection is closed.
   // Always returns kTRUE.

   Info("HandleError", "%p: got called ... fProof: %p", this, fProof);

   // Interrupt underlying socket operations
   if (fSocket)
      ((TXSocket *)fSocket)->SetInterrupt();

   // Remove our interrupt handler
   SetInterruptHandler(kFALSE);

   if (fProof) {

      // Remove the PROOF signal handler
      if (fProof->fIntHandler)
         fProof->fIntHandler->Remove();

      // Attach to the monitor instance, if any
      TMonitor *mon = fProof->fCurrentMonitor;
      if (gDebug > 2)
         Info("HandleError", "%p: proof: %p, mon: %p", this, fProof, mon);
      if (mon && fSocket && mon->GetListOfActives()->FindObject(fSocket)) {
         // The socket is among the active ones: deactivate it
         if (gDebug > 2)
            Info("HandleError", "%p: deactivating from monitor %p", this, mon);
         mon->DeActivate(fSocket);
      }

      if (fProof->IsMaster()) {
         // On masters: notify the client of the problem
         TString msg(Form("Worker '%s-%s' has been removed from the active list",
                          fName.Data(), fOrdinal.Data()));
         TMessage m(kPROOF_MESSAGE);
         m << msg;
         if (gProofServ)
            gProofServ->GetSocket()->Send(m);
         else
            // The format has a %p: pass 'this' like the other messages do
            Warning("HandleError", "%p: global reference to TProofServ missing", this);
         // Reset the session ID on the socket
         if (fSocket)
            ((TXSocket *)fSocket)->SetSessionID(-1);
         // Flag this worker as bad in the session
         fProof->MarkBad(this);
      } else {
         // Not a master: remove this worker from the session list, shut the
         // session down and close the connection
         // NOTE(review): members are still used after ShutdownSession();
         // confirm that call cannot delete this object
         fProof->GetListOfSlaves()->Remove(this);
         TProofMgr *mgr= fProof->GetManager();
         if (mgr)
            mgr->ShutdownSession(fProof);
         Close("P");
         SafeDelete(fSocket);
         fValid = kFALSE;
      }
   } else {
      Warning("HandleError", "%p: reference to PROOF missing", this);
   }

   // Post the socket semaphore, under the socket mutex, until TryWait()
   // returns 1 (skipped if the socket was deleted above)
   if (fSocket) {
      R__LOCKGUARD(((TXSocket *)fSocket)->fAMtx);
      TSemaphore *sem = &(((TXSocket *)fSocket)->fASem);
      while (sem->TryWait() != 1)
         sem->Post();
   }
   if (gDebug > 0)
      Info("HandleError", "%p: DONE ... ", this);

   // We are done
   return kTRUE;
}
Bool_t TXSlave::HandleInput(const void *)
{
   // Handle input available on the socket. The argument is unused.
   // With a current monitor, the socket is flagged as ready in it if active
   // there; without a monitor, TProof::CollectInputFrom is called.
   // Returns kFALSE if the reference to TProof is missing, kTRUE otherwise.

   if (!fProof) {
      Warning("HandleInput", "%p: %s: reference to PROOF missing", this, GetOrdinal());
      return kFALSE;
   }

   // Attach to the monitor instance, if any
   TMonitor *monitor = fProof->fCurrentMonitor;
   if (gDebug > 2)
      Info("HandleInput", "%p: %s: proof: %p, mon: %p",
                          this, GetOrdinal(), fProof, monitor);

   if (!monitor) {
      // No monitor: let TProof collect from this socket directly
      if (gDebug > 2)
         Info("HandleInput","%p: %s: calling TProof::CollectInputFrom", this, GetOrdinal());
      fProof->CollectInputFrom(fSocket);
   } else if (monitor->IsActive(fSocket)) {
      // Monitor present and socket active in it: flag the socket as ready
      if (gDebug > 2)
         Info("HandleInput","%p: %s: posting monitor %p", this, GetOrdinal(), monitor);
      monitor->SetReady(fSocket);
   }
   // NOTE(review): with a monitor in which the socket is not active nothing
   // is done - confirm this is intended

   // We are done
   return kTRUE;
}
void TXSlave::SetInterruptHandler(Bool_t on)
{
   // Switch the interrupt handler on or off.
   // When switching on, the handler is created on first use (bound to the
   // current socket) and added; when switching off, it is removed if it
   // exists. The handler object is not deleted here.

   if (gDebug > 1)
      Info("SetInterruptHandler", "enter: %d", on);

   if (!on) {
      if (fIntHandler)
         fIntHandler->Remove();
      return;
   }

   if (!fIntHandler)
      fIntHandler = new TXSlaveInterruptHandler((TXSocket *)fSocket);
   fIntHandler->Add();
}
void TXSlave::FlushSocket()
{
   // Flush the TXSocket pipe for our socket via TXSocket::FlushPipe.
   // Nothing is flushed if there is no socket.

   if (gDebug > 1)
      Info("FlushSocket", "enter: %p", fSocket);

   if (!fSocket)
      return;

   TXSocket::FlushPipe(fSocket);
}
// This page has been automatically generated. If you have any comments or suggestions about the page layout, send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.