// @(#)root/proofx:$Name: $:$Id: TXSlave.cxx,v 1.6 2006/04/19 10:57:44 rdm Exp $
// Author: Gerardo Ganis 12/12/2005
/*************************************************************************
* Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
* All rights reserved. *
* *
* For the licensing terms see $ROOTSYS/LICENSE. *
* For the list of contributors see $ROOTSYS/README/CREDITS. *
*************************************************************************/
//////////////////////////////////////////////////////////////////////////
// //
// TXSlave //
// //
// This is the version of TSlave for slave servers based on XRD. //
// See TSlave for details. //
// //
//////////////////////////////////////////////////////////////////////////
#include "TXSlave.h"
#include "TProof.h"
#include "TSystem.h"
#include "TEnv.h"
#include "TROOT.h"
#include "TUrl.h"
#include "TMessage.h"
#include "TMonitor.h"
#include "TError.h"
#include "TVirtualMutex.h"
#include "TThread.h"
#include "TXSocket.h"
#include "TXSocketHandler.h"
ClassImp(TXSlave)
//______________________________________________________________________________
//---- Hook to the constructor -------------------------------------------------
//---- This is needed to avoid using the plugin manager which may create -------
//---- problems in multi-threaded environments. --------------------------------
extern "C" {
TSlave *GetTXSlave(const char *url, const char *ord, Int_t perf,
const char *image, TProof *proof, Int_t stype,
const char *workdir, const char *msd)
{
return ((TSlave *)(new TXSlave(url, ord, perf, image,
proof, stype, workdir, msd)));
}
}
class XSlaveInit {
public:
XSlaveInit() {
TSlave::SetTXSlaveHook(&GetTXSlave);
}};
static XSlaveInit xslave_init;
//______________________________________________________________________________
//---- error handling ----------------------------------------------------------
//---- Needed to avoid blocking on the CINT mutex in printouts -----------------
//______________________________________________________________________________
void TXSlave::DoError(int level, const char *location, const char *fmt, va_list va) const
{
// Interface to ErrorHandler (protected).
::ErrorHandler(level, Form("TXSlave::%s", location), fmt, va);
}
//______________________________________________________________________________
TXSlave::TXSlave(const char *url, const char *ord, Int_t perf,
const char *image, TProof *proof, Int_t stype,
const char *workdir, const char *msd) : TSlave()
{
// Create a PROOF slave object. Called via the TProof ctor.
fImage = image;
fProofWorkDir = workdir;
fWorkDir = workdir;
fOrdinal = ord;
fPerfIdx = perf;
fProof = proof;
fSlaveType = (ESlaveType)stype;
fMsd = msd;
// Instance of the socket input handler to monitor all the XPD sockets
TXSocketHandler *sh = TXSocketHandler::GetSocketHandler();
gSystem->AddFileHandler(sh);
TXSocket::fgLoc = (fProof->IsMaster()) ? "master" : "client" ;
Init(url, stype);
}
//______________________________________________________________________________
void TXSlave::Init(const char *host, Int_t stype)
{
// Init a PROOF slave object. Called via the TXSlave ctor.
// The Init method is technology specific and is overwritten by derived
// classes.
// Url string with host, port information; 'host' may contain 'user' information
// in the usual form 'user@host'
// Auxilliary url
TUrl url(host);
url.SetProtocol(fProof->fUrl.GetProtocol());
// Check port
if (url.GetPort() == TUrl("a").GetPort()) {
// For the time being we use 'rootd' service as default.
// This will be changed to 'proofd' as soon as XRD will be able to
// accept on multiple ports
Int_t port = gSystem->GetServiceByName("proofd");
if (port < 0) {
if (gDebug > 0)
Info("Init","service 'proofd' not found by GetServiceByName"
": using default IANA assigned tcp port 1093");
port = 1094;
} else {
if (gDebug > 1)
Info("Init","port from GetServiceByName: %d", port);
}
url.SetPort(port);
}
// Fill members
fName = url.GetHost();
fPort = url.GetPort(); // We get the right default if the port is not specified
// If we are attaching to an existing process, the ID is passed in the
// options field of the url
Int_t psid = (strlen(url.GetOptions()) > 0) ? atoi(url.GetOptions()) : -1;
// Add information about our status (Client or Master)
TString iam;
Char_t mode = 's';
if (fProof->IsMaster() && stype == kSlave) {
iam = "Master";
mode = 's';
} else if (fProof->IsMaster() && stype == kMaster) {
iam = "Master";
mode = 'm';
} else if (!fProof->IsMaster() && stype == kMaster) {
iam = "Local Client";
mode = 'M';
} else {
Error("Init","Impossible PROOF <-> SlaveType Configuration Requested");
R__ASSERT(0);
}
// Open connection to a remote XrdPROOF slave server.
// Login and authentication are dealt with at this level, if required.
if (!(fSocket = new TXSocket(url.GetUrl(kTRUE),
mode, psid, -1, fProof->GetTitle()))) {
Error("Init", "while opening the connection to %s - exit", url.GetUrl(kTRUE));
return;
}
// The socket may not be valid
if (!(fSocket->IsValid())) {
Error("Init", "some severe error occurred while opening"
" the connection at %s - exit", url.GetUrl(kTRUE));
return;
}
// Set the reference to TProof
((TXSocket *)fSocket)->fReference = fProof;
// Set this has handler
((TXSocket *)fSocket)->fHandler = this;
// Set server type
fProof->fServType = TVirtualProofMgr::kXProofd;
// Set remote session ID
fProof->fSessionID = ((TXSocket *)fSocket)->GetSessionID();
// Remove socket from global TROOT socket list. Only the TProof object,
// representing all slave sockets, will be added to this list. This will
// ensure the correct termination of all proof servers in case the
// root session terminates.
{
R__LOCKGUARD2(gROOTMutex);
gROOT->GetListOfSockets()->Remove(fSocket);
}
R__LOCKGUARD2(gProofMutex);
// Fill some useful info
fUser = ((TXSocket *)fSocket)->fUser;
PDB(kGlobal,3) {
Info("Init","%s: fUser is .... %s", iam.Data(), fUser.Data());
}
}
//______________________________________________________________________________
void TXSlave::SetupServ(Int_t stype, const char *conffile)
{
// Init a PROOF slave object. Called via the TXSlave ctor.
// The Init method is technology specific and is overwritten by derived
// classes.
// get back startup message of proofserv (we are now talking with
// the real proofserver and not anymore with the proofd front-end)
Int_t what;
char buf[512];
if (fSocket->Recv(buf, sizeof(buf), what) <= 0) {
Error("SetupServ", "failed to receive slave startup message");
SafeDelete(fSocket);
return;
}
if (what == kMESS_NOTOK) {
SafeDelete(fSocket);
return;
}
// exchange protocol level between client and master and between
// master and slave
if (fSocket->Send(kPROOF_Protocol, kROOTD_PROTOCOL) != 2*sizeof(Int_t)) {
Error("SetupServ", "failed to send local PROOF protocol");
SafeDelete(fSocket);
return;
}
if (fSocket->Recv(fProtocol, what) != 2*sizeof(Int_t)) {
Error("SetupServ", "failed to receive remote PROOF protocol");
SafeDelete(fSocket);
return;
}
// protocols less than 4 are incompatible
if (fProtocol < 4) {
Error("SetupServ", "incompatible PROOF versions (remote version"
" must be >= 4, is %d)", fProtocol);
SafeDelete(fSocket);
return;
}
fProof->fProtocol = fProtocol; // protocol of last slave on master
if (fProtocol < 5) {
//
// Setup authentication related stuff for ald versions
Bool_t isMaster = (stype == kMaster);
TString wconf = isMaster ? TString(conffile) : fProofWorkDir;
if (OldAuthSetup(isMaster, wconf) != 0) {
Error("SetupServ", "OldAuthSetup: failed to setup authentication");
SafeDelete(fSocket);
return;
}
} else {
//
// Send ordinal (and config) info to slave (or master)
TMessage mess;
if (stype == kMaster)
mess << fUser << fOrdinal << TString(conffile);
else
mess << fUser << fOrdinal << fProofWorkDir;
if (fSocket->Send(mess) < 0) {
Error("SetupServ", "failed to send ordinal and config info");
SafeDelete(fSocket);
return;
}
}
// set some socket options
fSocket->SetOption(kNoDelay, 1);
}
//______________________________________________________________________________
TXSlave::~TXSlave()
{
// Destroy slave.
Close();
}
//______________________________________________________________________________
void TXSlave::Close(Option_t *opt)
{
// Close slave socket.
if (fSocket)
// Closing socket ...
fSocket->Close(opt);
SafeDelete(fInput);
SafeDelete(fSocket);
}
//______________________________________________________________________________
Int_t TXSlave::Ping()
{
// Ping the remote master or slave servers.
// Returns 0 if ok, -1 in case of error
if (!IsValid()) return -1;
return ((TXSocket *)fSocket)->Ping();
}
//______________________________________________________________________________
void TXSlave::Interrupt(Int_t type)
{
// Send interrupt to master or slave servers.
// Returns 0 if ok, -1 in case of error
if (!IsValid()) return;
if (type == TProof::kLocalInterrupt) {
// Equivalent to an error condition on the socket
HandleError();
return;
}
((TXSocket *)fSocket)->SendInterrupt(type);
Info("Interrupt","Interrupt of type %d sent", type);
}
//_____________________________________________________________________________
Int_t TXSlave::GetProofdProtocol(TSocket *s)
{
// Find out the remote proofd protocol version.
// Returns -1 in case of error
Int_t rproto = -1;
UInt_t cproto = 0;
Int_t len = sizeof(cproto);
memcpy((char *)&cproto,
Form(" %d", TSocket::GetClientProtocol()),len);
Int_t ns = s->SendRaw(&cproto, len);
if (ns != len) {
::Error("TXSlave::GetProofdProtocol",
"sending %d bytes to proofd server [%s:%d]",
len, (s->GetInetAddress()).GetHostName(), s->GetPort());
return -1;
}
// Get the remote protocol
Int_t ibuf[2] = {0};
len = sizeof(ibuf);
Int_t nr = s->RecvRaw(ibuf, len);
if (nr != len) {
::Error("TXSlave::GetProofdProtocol",
"reading %d bytes from proofd server [%s:%d]",
len, (s->GetInetAddress()).GetHostName(), s->GetPort());
return -1;
}
Int_t kind = net2host(ibuf[0]);
if (kind == kROOTD_PROTOCOL) {
rproto = net2host(ibuf[1]);
} else {
kind = net2host(ibuf[1]);
if (kind == kROOTD_PROTOCOL) {
len = sizeof(rproto);
nr = s->RecvRaw(&rproto, len);
if (nr != len) {
::Error("TXSlave::GetProofdProtocol",
"reading %d bytes from proofd server [%s:%d]",
len, (s->GetInetAddress()).GetHostName(), s->GetPort());
return -1;
}
rproto = net2host(rproto);
}
}
if (gDebug > 2)
::Info("TXSlave::GetProofdProtocol",
"remote proofd: buf1: %d, buf2: %d rproto: %d",
net2host(ibuf[0]),net2host(ibuf[1]),rproto);
// We are done
return rproto;
}
//______________________________________________________________________________
TObjString *TXSlave::SendCoordinator(Int_t kind, const char *msg)
{
// Send message to intermediate coordinator.
// If any output is due, this is returned as a generic message
return ((TXSocket *)fSocket)->SendCoordinator(kind, msg);
}
//______________________________________________________________________________
void TXSlave::SetAlias(const char *alias)
{
// Set an alias for this session. If reconnection is supported, the alias
// will be communicated to the remote coordinator so that it can be recovered
// when reconnecting
// Nothing to do if not in contact with coordinator
if (!IsValid()) return;
((TXSocket *)fSocket)->SendCoordinator(TXSocket::kSessionAlias, alias);
return;
}
//_____________________________________________________________________________
Bool_t TXSlave::HandleError()
{
// Handle error on the input socket
Printf("HandleError: %p: got called ...", this);
// Post semaphore to wake up anybody waiting; send as many posts as needed
TSemaphore *sem = &(((TXSocket *)fSocket)->fASem);
while (sem->TryWait() != 1)
sem->Post();
if (fProof) {
// Attach to the monitor instance, if any
TMonitor *mon = fProof->fCurrentMonitor;
if (gDebug > 2)
Info("HandleInput","%p: proof: %p, mon: %p", this, fProof, mon);
if (mon && mon->GetListOfActives()->FindObject(fSocket)) {
// Synchronous collection in TProof
if (gDebug > 2)
Info("HandleInput","%p: deactivating from monitor %p", this, mon);
mon->DeActivate(fSocket);
}
} else {
Warning("HandleInput", "%p: reference to PROOF missing", this);
}
Printf("HandleError: %p: DONE ... ", this);
// We are done
return kTRUE;
}
//_____________________________________________________________________________
Bool_t TXSlave::HandleInput()
{
// Handle asynchronous input on the socket
if (fProof) {
// Attach to the monitor instance, if any
TMonitor *mon = fProof->fCurrentMonitor;
if (gDebug > 2)
Info("HandleInput","%p: proof: %p, mon: %p", this, fProof, mon);
if (mon && mon->GetListOfActives()->FindObject(fSocket)) {
// Synchronous collection in TProof
if (gDebug > 2)
Info("HandleInput","%p: posting monitor %p", this, mon);
mon->SetReady(fSocket);
} else {
// Asynchronous collection in TProof
if (gDebug > 2)
Info("HandleInput","%p: calling TProof::CollectInputFrom", this);
fProof->CollectInputFrom(fSocket);
}
} else {
Warning("HandleInput", "%p: reference to PROOF missing", this);
return kFALSE;
}
// We are done
return kTRUE;
}
ROOT page - Class index - Class Hierarchy - Top of the page
This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.