#include "MessageTypes.h"
#include "TEnv.h"
#include "TError.h"
#include "TException.h"
#include "TMonitor.h"
#include "TObjString.h"
#include "TProof.h"
#include "TSlave.h"
#include "TRegexp.h"
#include "TROOT.h"
#include "TUrl.h"
#include "TXHandler.h"
#include "TXSocket.h"
#include "XProofProtocol.h"
#include "XrdProofConn.h"
#include "XrdClient/XrdClientConnMgr.hh"
#include "XrdClient/XrdClientConst.hh"
#include "XrdClient/XrdClientEnv.hh"
#include "XrdClient/XrdClientLogConnection.hh"
#include "XrdClient/XrdClientMessage.hh"
#ifndef WIN32
#include <sys/socket.h>
#else
#include <Winsock2.h>
#endif
#ifdef OLDXRDOUC
# include "XrdSysToOuc.h"
# include "XrdOuc/XrdOucError.hh"
# include "XrdOuc/XrdOucLogger.hh"
#else
# include "XrdSys/XrdSysError.hh"
# include "XrdSys/XrdSysLogger.hh"
#endif
#include "XrdProofdTrace.h"
// Trace object shared by the XPROOF client code; it starts out null and is
// created by the TXSocket constructor the first time a socket is built.
XrdOucTrace *XrdProofdTrace = 0;
// Logger and error-destination pair; the TXSocket constructor attaches
// eLogger to eDest and builds the trace object on top of eDest.
static XrdSysLogger eLogger;
static XrdSysError eDest(0, "Proofx");
#ifdef WIN32
// On WIN32 builds the TSocket global byte counters are defined in this file.
ULong64_t TSocket::fgBytesSent;
ULong64_t TSocket::fgBytesRecv;
#endif
// Forward a diagnostic to the global error handler, reporting the location
// as "TXSocket::<location>".
void TXSocket::DoError(int level, const char *location, const char *fmt, va_list va) const
{
   const char *where = Form("TXSocket::%s", location);
   ::ErrorHandler(level, where, fmt, va);
}
// File handler that pings the associated TXSocket each time it is notified.
class TXSocketPingHandler : public TFileHandler {
   TXSocket *fSocket;   // socket to ping; this class never deletes it
public:
   TXSocketPingHandler(TXSocket *s, Int_t fd)
      : TFileHandler(fd, 1), fSocket(s) { }
   Bool_t Notify();
   // Read notifications are handled exactly like generic ones
   Bool_t ReadNotify() { return Notify(); }
};
// Ping the socket. The result of the ping is deliberately ignored and
// kTRUE is always returned.
Bool_t TXSocketPingHandler::Notify()
{
   (void) fSocket->Ping(kTRUE);
   return kTRUE;
}
// Set to kTRUE at the end of InitEnvs(); the constructor calls InitEnvs()
// only while this is still kFALSE.
Bool_t TXSocket::fgInitDone = kFALSE;
// Sockets for which a notification has been posted on the global pipe
// (filled by PostPipe(), emptied by CleanPipe()/FlushPipe()).
TList TXSocket::fgReadySock;
// Mutex guarding fgReadySock.
TMutex TXSocket::fgReadyMtx(kTRUE);
// Global pipe descriptors: [0] is read by CleanPipe()/FlushPipe(), [1] is
// written by PostPipe(); -1 means "not created yet".
Int_t TXSocket::fgPipe[2] = {-1,-1};
// Label prepended to the pipe-related diagnostic messages.
TString TXSocket::fgLoc = "undef";
// Mutex guarding the spare-buffer queue fgSQue.
TMutex TXSocket::fgSMtx;
// Queue of spare buffers available for reuse by PopUpSpare().
std::list<TXSockBuf *> TXSocket::fgSQue;
// Running total of the bytes accounted to TXSockBuf instances.
Long64_t TXSockBuf::fgBuffMem = 0;
// Threshold (10485760 bytes = 10 MiB) below which PushBackSpare() keeps
// released buffers instead of deleting them.
Long64_t TXSockBuf::fgMemMax = 10485760;
// Build a socket on top of an XrdProofConn connection.
//   url      server URL; if null no connection is attempted
//   m        mode character ('A' and 'C' are mapped to 'M' when opening the
//            connection; 'm', 's', 'M' and 'A' trigger Create(); 'i' selects
//            internal send options)
//   psid     session id (ignored, i.e. set to -1, in mode 'C')
//   capver   passed unchanged to XrdProofConn
//   logbuf   initial content of the internal buffer sent by Create()
//   loglevel log level sent by Create()
//   handler  object notified about inputs and errors (may be null)
// On any failure the constructor returns early; all pointer members are put
// in a defined state first so that IsValid(), Close() and the destructor are
// safe to run on a partially built object.
TXSocket::TXSocket(const char *url, Char_t m, Int_t psid, Char_t capver,
                   const char *logbuf, Int_t loglevel, TXHandler *handler)
         : TSocket(), fMode(m), fLogLevel(loglevel),
           fBuffer(logbuf), fASem(0), fDontTimeout(kFALSE), fRDInterrupt(kFALSE),
           fXrdProofdVersion(-1)
{
   // Defined state for everything the destructor or Close() may touch
   fConn = 0;
   fAMtx = 0;
   fIMtx = 0;
   fBufCur = 0;
   fByteLeft = 0;
   fByteCur = 0;
   fILev = -1;
   fServType = kPROOFD;
   fTcpWindowSize = -1;
   fRemoteProtocol = -1;
   fSendOpt = (fMode == 'i') ? (kXPD_internal | kXPD_async) : kXPD_async;
   fSessionID = (fMode == 'C') ? -1 : psid;
   fSocket = -1;
   fReference = 0;
   fHandler = handler;

   // Tracing / logging infrastructure
   eDest.logger(&eLogger);
   if (!XrdProofdTrace)
      XrdProofdTrace = new XrdOucTrace(&eDest);

   // Environment is initialised once per process
   if (!fgInitDone)
      InitEnvs();

   // Asynchronous queue mutex
   if (!(fAMtx = new TMutex(kTRUE))) {
      Error("TXSocket", "problems initializing mutex for async queue");
      return;
   }
   fAQue.clear();

   // Interrupt mutex
   if (!(fIMtx = new TMutex(kTRUE))) {
      Error("TXSocket", "problems initializing mutex for interrupts");
      return;
   }

   // Global pipe used to signal pending inputs
   if (fgPipe[0] == -1) {
      if (pipe(fgPipe) != 0) {
         Error("TXSocket", "problem initializing global pipe for socket inputs");
         return;
      }
   }

   if (url) {
      // Modes 'A' and 'C' open the connection as 'M'
      char md = (m != 'A' && m != 'C') ? m : 'M';
      fConn = new XrdProofConn(url, md, psid, capver, this, fBuffer.Data());
      if (!fConn || !(fConn->IsValid())) {
         // The failure is reported only in debug mode and only when the
         // remote server is not a plain proofd
         if (fConn && fConn->GetServType() != XrdProofConn::kSTProofd) {
            if (gDebug > 0)
               Error("TXSocket", "fatal error occurred while opening a connection"
                                 " to server [%s]: %s", url, fConn->GetLastErr());
         }
         return;
      }

      // Create or attach to the remote session
      if (m == 'm' || m == 's' || m == 'M' || m == 'A') {
         if (!Create()) {
            Error("TXSocket", "create or attach failed (%s)",
                  ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
            Close();
            return;
         }
      }

      // Cache connection details
      fUser = fConn->fUser.c_str();
      fHost = fConn->fHost.c_str();
      fPort = fConn->fPort;

      if (m == 'C') {
         fXrdProofdVersion = fConn->fRemoteProtocol;
         fRemoteProtocol = fConn->fRemoteProtocol;
      }

      fUrl = fConn->fUrl.GetUrl().c_str();
      fAddress.fPort = fPort;
      fPid = gSystem->GetPid();
   }
}
// Copy constructor: only the two base-class sub-objects are copied.
// NOTE(review): none of the TXSocket data members is initialised here;
// presumably copying is not meant to be used - confirm against the header.
TXSocket::TXSocket(const TXSocket &s)
         : TSocket(s), XrdClientAbsUnsolMsgHandler(s)
{
}
// Assignment operator: nothing is copied, the object is returned unchanged.
TXSocket &TXSocket::operator=(const TXSocket &)
{
   TXSocket &self = *this;
   return self;
}
// Destructor: close the connection, then release the async-queue and the
// interrupt mutexes.
TXSocket::~TXSocket()
{
   // Disconnect first ...
   Close();

   // ... then drop the synchronisation objects
   SafeDelete(fAMtx);
   SafeDelete(fIMtx);
}
// Record the session id. When the id is negative and a connection exists,
// asynchronous message handling on the connection is switched off first.
void TXSocket::SetSessionID(Int_t id)
{
   const Bool_t switchOffAsync = (fConn != 0) && (id < 0);
   if (switchOffAsync)
      fConn->SetAsync(0);

   fSessionID = id;
}
// Ask the server to detach from, or destroy, session 'id'.
// 'opt' is scanned for single characters: 'S'/'s' requests a destroy instead
// of a detach, 'A'/'a' sends the request even when 'id' is negative.
// Nothing is sent when the socket is invalid, or when 'id' is negative and
// the 'A'/'a' flag is absent.
void TXSocket::DisconnectSession(Int_t id, Option_t *opt)
{
   if (!IsValid()) {
      if (gDebug > 0)
         Info("DisconnectSession","not connected: nothing to do");
      return;
   }

   // Decode the option flags
   Bool_t wantShutdown = kFALSE;
   Bool_t wantAll = kFALSE;
   if (opt) {
      wantShutdown = (strchr(opt,'S') || strchr(opt,'s'));
      wantAll = (strchr(opt,'A') || strchr(opt,'a'));
   }

   // Nothing to do for an undefined session unless 'all' was requested
   if (id <= -1 && !wantAll)
      return;

   // Prepare the request
   XPClientRequest req;
   memset(&req, 0, sizeof(req));
   fConn->SetSID(req.header.streamid);
   req.proof.requestid = wantShutdown ? kXP_destroy : kXP_detach;
   req.proof.sid = id;

   // Send it over
   XrdClientMessage *rsp =
      fConn->SendReq(&req, (const void *)0, 0, "DisconnectSession");

   // Report the failure reason, if any
   if (!rsp && fConn->GetLastErr())
      Printf("%s: %s", fHost.Data(), fConn->GetLastErr());

   SafeDelete(rsp);
}
// Close the connection.
// Pending notifications for this socket are flushed from the global pipe
// first. If 'opt' contains a "#<digits>#" section, the number is used as the
// id of the session to disconnect instead of the current one. With a valid
// session id the session is disconnected via DisconnectSession(); otherwise
// the underlying connection is closed. The connection object is deleted in
// both cases.
void TXSocket::Close(Option_t *opt)
{
   // Remove any pending notification for this socket
   TXSocket::FlushPipe(this);

   if (!IsValid()) {
      if (gDebug > 0)
         Info("Close","not connected: nothing to do");
      return;
   }

   // Look for an explicit session id between two '#'
   Int_t sessID = fSessionID;
   TString o(opt);
   Ssiz_t first = o.Index("#");
   if (first != kNPOS) {
      o.Remove(0, first + 1);
      Ssiz_t second = o.Index("#");
      if (second != kNPOS) {
         o.Remove(second);
         if (o.IsDigit())
            sessID = o.Atoi();
      }
   }

   // No more asynchronous messages from now on
   fConn->SetAsync(0);

   if (sessID > -1)
      DisconnectSession(sessID, opt);
   else
      fConn->Close(opt);

   SafeDelete(fConn);
}
UnsolRespProcResult TXSocket::ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *,
XrdClientMessage *m)
{
UnsolRespProcResult rc = kUNSOL_KEEP;
if (gDebug > 2)
Info("ProcessUnsolicitedMsg", "Processing unsolicited msg: %p", m);
if (!m) {
return kUNSOL_CONTINUE;
} else {
if (gDebug > 2)
Info("ProcessUnsolicitedMsg", "status: %d, len: %d bytes",
m->GetStatusCode(), m->DataLen());
}
if (m->IsError()) {
if (m->GetStatusCode() != XrdClientMessage::kXrdMSC_timeout) {
if (gDebug > 0)
Info("ProcessUnsolicitedMsg","got error from underlying connection");
if (fHandler)
fHandler->HandleError();
else
Error("ProcessUnsolicitedMsg","handler undefined");
fSessionID = -1;
} else {
if (gDebug > 2)
Info("ProcessUnsolicitedMsg", "underlying connection timed out");
}
return kUNSOL_CONTINUE;
}
if (!fConn || !m->MatchStreamid(fConn->fStreamid))
return kUNSOL_CONTINUE;
if (!m) {
Error("ProcessUnsolicitedMsg","undefined message");
return rc;
}
Int_t len = 0;
if ((len = m->DataLen()) < (int)sizeof(kXR_int32)) {
Error("ProcessUnsolicitedMsg","empty or bad-formed message");
return rc;
}
kXR_int32 acod = 0;
memcpy(&acod, m->GetData(), sizeof(kXR_int32));
void *pdata = (void *)((char *)(m->GetData()) + sizeof(kXR_int32));
len -= sizeof(kXR_int32);
if (gDebug > 1)
Info("ProcessUnsolicitedMsg", "%p: got action: %d (%d bytes) (ID: %d)",
this, acod, len, m->HeaderSID());
if (gDebug > 3)
DumpReadySock();
kXR_int32 ilev = -1;
switch (acod) {
case kXPD_ping:
ilev = TProof::kPing;
case kXPD_interrupt:
{ R__LOCKGUARD(fIMtx);
if (acod == kXPD_interrupt) {
memcpy(&ilev, pdata, sizeof(kXR_int32));
ilev = net2host(ilev);
}
fILev = ilev;
XHandleIn_t hin = {acod, 0, 0, 0};
if (fHandler)
fHandler->HandleInput((const void *)&hin);
else
Error("ProcessUnsolicitedMsg","handler undefined");
}
break;
case kXPD_timer:
{
kXR_int32 opt = 1;
kXR_int32 delay = 0;
if (len > 0) {
memcpy(&opt, pdata, sizeof(kXR_int32));
opt = net2host(opt);
if (gDebug > 1)
Info("ProcessUnsolicitedMsg","kXPD_timer: found opt: %d", opt);
pdata = (void *)((char *)pdata + sizeof(kXR_int32));
len -= sizeof(kXR_int32);
}
if (len > 0) {
memcpy(&delay, pdata, sizeof(kXR_int32));
delay = net2host(delay);
if (gDebug > 1)
Info("ProcessUnsolicitedMsg","kXPD_timer: found delay: %d", delay);
pdata = (void *)((char *)pdata + sizeof(kXR_int32));
len -= sizeof(kXR_int32);
}
XHandleIn_t hin = {acod, opt, delay, 0};
if (fHandler)
fHandler->HandleInput((const void *)&hin);
else
Error("ProcessUnsolicitedMsg","handler undefined");
}
break;
case kXPD_inflate:
{
kXR_int32 inflate = 1000;
if (len > 0) {
memcpy(&inflate, pdata, sizeof(kXR_int32));
inflate = net2host(inflate);
if (gDebug > 1)
Info("ProcessUnsolicitedMsg","kXPD_inflate: factor: %d", inflate);
pdata = (void *)((char *)pdata + sizeof(kXR_int32));
len -= sizeof(kXR_int32);
}
XHandleIn_t hin = {acod, inflate, 0, 0};
if (fHandler)
fHandler->HandleInput((const void *)&hin);
else
Error("ProcessUnsolicitedMsg","handler undefined");
}
break;
case kXPD_priority:
{
kXR_int32 priority = -1;
if (len > 0) {
memcpy(&priority, pdata, sizeof(kXR_int32));
priority = net2host(priority);
if (gDebug > 1)
Info("ProcessUnsolicitedMsg","kXPD_priority: priority: %d", priority);
pdata = (void *)((char *)pdata + sizeof(kXR_int32));
len -= sizeof(kXR_int32);
}
XHandleIn_t hin = {acod, priority, 0, 0};
if (fHandler)
fHandler->HandleInput((const void *)&hin);
else
Error("ProcessUnsolicitedMsg","handler undefined");
}
break;
case kXPD_flush:
{
XHandleIn_t hin = {acod, 0, 0, 0};
if (fHandler)
fHandler->HandleInput((const void *)&hin);
else
Error("ProcessUnsolicitedMsg","handler undefined");
}
break;
case kXPD_urgent:
{
kXR_int32 type = -1;
if (len > 0) {
memcpy(&type, pdata, sizeof(kXR_int32));
type = net2host(type);
if (gDebug > 1)
Info("ProcessUnsolicitedMsg","kXPD_urgent: found type: %d", type);
pdata = (void *)((char *)pdata + sizeof(kXR_int32));
len -= sizeof(kXR_int32);
}
kXR_int32 int1 = -1;
if (len > 0) {
memcpy(&int1, pdata, sizeof(kXR_int32));
int1 = net2host(int1);
if (gDebug > 1)
Info("ProcessUnsolicitedMsg","kXPD_urgent: found int1: %d", int1);
pdata = (void *)((char *)pdata + sizeof(kXR_int32));
len -= sizeof(kXR_int32);
}
kXR_int32 int2 = -1;
if (len > 0) {
memcpy(&int2, pdata, sizeof(kXR_int32));
int2 = net2host(int2);
if (gDebug > 1)
Info("ProcessUnsolicitedMsg","kXPD_urgent: found int2: %d", int2);
pdata = (void *)((char *)pdata + sizeof(kXR_int32));
len -= sizeof(kXR_int32);
}
XHandleIn_t hin = {acod, type, int1, int2};
if (fHandler)
fHandler->HandleInput((const void *)&hin);
else
Error("ProcessUnsolicitedMsg","handler undefined");
}
break;
case kXPD_msg:
{ R__LOCKGUARD(fAMtx);
TXSockBuf *b = PopUpSpare(len);
if (!b) {
Error("ProcessUnsolicitedMsg","could allocate spare buffer");
return rc;
}
memcpy(b->fBuf, pdata, len);
b->fLen = len;
fBytesRecv += len;
fAQue.push_back(b);
PostPipe(this);
if (gDebug > 2)
Info("ProcessUnsolicitedMsg","%p: posting semaphore: %p (%d bytes)",
this,&fASem,len);
fASem.Post();
}
break;
case kXPD_feedback:
Info("ProcessUnsolicitedMsg",
"kXPD_feedback treatment not yet implemented");
break;
case kXPD_srvmsg:
{
kXR_int32 opt = 0;
memcpy(&opt, pdata, sizeof(kXR_int32));
opt = net2host(opt);
if (opt == 0 || opt == 1) {
pdata = (void *)((char *)pdata + sizeof(kXR_int32));
len -= sizeof(kXR_int32);
} else {
opt = 1;
}
if (opt == 0) {
Printf("| %.*s", len, (char *)pdata);
} else {
Printf(" ");
Printf("| Message from server:");
Printf("| %.*s", len, (char *)pdata);
}
}
break;
case kXPD_errmsg:
Printf(" ");
Printf("| Error condition occured: message from server:");
Printf("| %.*s", len, (char *)pdata);
if (fHandler)
fHandler->HandleError();
else
Error("ProcessUnsolicitedMsg","handler undefined");
break;
case kXPD_msgsid:
{ R__LOCKGUARD(fAMtx);
kXR_int32 cid = 0;
memcpy(&cid, pdata, sizeof(kXR_int32));
cid = net2host(cid);
if (gDebug > 1)
Info("ProcessUnsolicitedMsg","found cid: %d", cid);
pdata = (void *)((char *)pdata + sizeof(kXR_int32));
len -= sizeof(kXR_int32);
TXSockBuf *b = PopUpSpare(len);
if (!b) {
Error("ProcessUnsolicitedMsg","could allocate spare buffer");
return rc;
}
memcpy(b->fBuf, pdata, len);
b->fLen = len;
b->fCid = cid;
fBytesRecv += len;
fAQue.push_back(b);
PostPipe(this);
if (gDebug > 2)
Info("ProcessUnsolicitedMsg","%p: cid: %d, posting semaphore: %p (%d bytes)",
this, cid, &fASem, len);
fASem.Post();
}
break;
default:
Error("ProcessUnsolicitedMsg","unknown action code: %d", acod);
}
return rc;
}
// Return the read descriptor of the global pipe, creating the pipe if it
// does not exist yet. -1 is returned (and an error printed) if the pipe
// cannot be created.
Int_t TXSocket::GetPipeRead()
{
   const Bool_t needPipe = (fgPipe[0] == -1);
   if (needPipe && pipe(fgPipe) != 0) {
      fgPipe[0] = -1;
      ::Error("TXSocket::GetPipeRead", "error: errno: %d", errno);
   }
   return fgPipe[0];
}
// Register socket 's' as ready and write one byte to the global pipe.
// The socket is added to the ready list even when the pipe is not available.
// Returns 0 on success, -1 if the pipe is missing or cannot be written.
Int_t TXSocket::PostPipe(TSocket *s)
{
   // The ready list is updated under its own lock only
   {  R__LOCKGUARD(&TXSocket::fgReadyMtx);
      TXSocket::fgReadySock.Add(s);
   }

   // The write end must be there
   if (fgPipe[1] < 0)
      return -1;

   // One byte per notification
   Char_t token = 1;
   if (write(fgPipe[1], (const void *)&token, sizeof(Char_t)) < 1) {
      ::Error("TXSocket::PostPipe", "can't notify pipe");
      return -1;
   }

   if (gDebug > 2)
      ::Info("TXSocket::PostPipe", "%s: %p: pipe posted (pending %d)",
             fgLoc.Data(), s, TXSocket::fgReadySock.GetSize());

   return 0;
}
// Consume one byte from the global pipe and remove socket 's' from the
// ready list. Returns 0 on success, -1 if the pipe is missing or cannot be
// read (in which case the ready list is left untouched).
Int_t TXSocket::CleanPipe(TSocket *s)
{
   // The read end must be there
   if (fgPipe[0] < 0)
      return -1;

   // One byte per notification
   Char_t token = 0;
   if (read(fgPipe[0], (void *)&token, sizeof(Char_t)) < 1) {
      ::Error("TXSocket::CleanPipe", "%s: can't read from pipe", fgLoc.Data());
      return -1;
   }

   // Update the ready list under lock
   R__LOCKGUARD(&TXSocket::fgReadyMtx);
   TXSocket::fgReadySock.Remove(s);

   if (gDebug > 2)
      ::Info("TXSocket::CleanPipe", "%s: %p: pipe cleaned (pending %d)",
             fgLoc.Data(), s, TXSocket::fgReadySock.GetSize());

   return 0;
}
// Remove every pending notification for socket 's': each occurrence of 's'
// in the ready list is removed and one byte is consumed from the global pipe
// for it. Returns -1 if the pipe does not exist, 0 otherwise.
Int_t TXSocket::FlushPipe(TSocket *s)
{
   // The read end must be there
   if (fgPipe[0] < 0)
      return -1;

   // The ready list is scanned and the pipe drained under lock
   R__LOCKGUARD(&TXSocket::fgReadyMtx);
   while (TXSocket::fgReadySock.FindObject(s)) {
      TXSocket::fgReadySock.Remove(s);
      // One byte was posted for each entry
      Char_t c = 0;
      if (read(fgPipe[0], (void *)&c, sizeof(Char_t)) < 1)
         ::Warning("TXSocket::FlushPipe", "%s: can't read from pipe", fgLoc.Data());
   }

   // Report under the real name of this method
   if (gDebug > 0)
      ::Info("TXSocket::FlushPipe", "%s: %p: pipe flushed", fgLoc.Data(), s);

   return 0;
}
// Tell whether the connection exists and reports the remote server to be of
// type XrdProofConn::kSTProofd.
Bool_t TXSocket::IsServProofd()
{
   const Bool_t isProofd =
      (fConn != 0) && (fConn->GetServType() == XrdProofConn::kSTProofd);
   return isProofd ? kTRUE : kFALSE;
}
// Return the last interrupt level stored by ProcessUnsolicitedMsg().
// An error is printed (and -1 returned) when no level has been set.
Int_t TXSocket::GetInterrupt()
{
   // Both pointers announced by the format string must be passed
   if (gDebug > 2)
      Info("GetInterrupt","%p: waiting to lock mutex %p", this, fIMtx);

   R__LOCKGUARD(fIMtx);

   if (fILev == -1)
      Error("GetInterrupt","value is unset (%d) - protocol error",fILev);

   return fILev;
}
// Discard all the messages waiting in the asynchronous queue.
// The buffers are handed back to the spare queue, the semaphore is decreased
// once per queued entry and the queue is emptied.
// Returns the total number of bytes that were discarded.
Int_t TXSocket::Flush()
{
   R__LOCKGUARD(fAMtx);

   Int_t nf = 0;
   if (fAQue.size() > 0) {
      Int_t sz = fAQue.size();

      // Hand the buffers over without erasing while iterating: the queue is
      // cleared in one go below, so no iterator is ever invalidated
      std::list<TXSockBuf *>::iterator i;
      for (i = fAQue.begin(); i != fAQue.end(); ++i) {
         TXSockBuf *b = *i;
         if (b) {
            nf += b->fLen;
            R__LOCKGUARD(&fgSMtx);
            fgSQue.push_back(b);
         }
      }

      // Re-align the semaphore with the (now empty) queue
      while (sz--)
         fASem.TryWait();

      fAQue.clear();
   }

   return nf;
}
// Create a remote session or, in mode 'A', attach to an existing one.
// The request carries the log level and the current content of fBuffer; up
// to "XProof.CreationRetries" (default 4) attempts are made.
// From a successful answer the session id, the remote PROOF protocol
// version and the XrdProofdProtocol version are extracted; any remaining
// bytes are stored as a string in fBuffer.
// Returns kTRUE on success, kFALSE if not connected or if all the attempts
// failed.
Bool_t TXSocket::Create()
{
   if (!IsValid()) {
      if (gDebug > 0)
         Info("Create","not connected: nothing to do");
      return kFALSE;
   }

   Int_t retriesleft = gEnv->GetValue("XProof.CreationRetries", 4);

   while (retriesleft--) {

      // Fill the request header
      XPClientRequest reqhdr;
      memset(&reqhdr, 0, sizeof(reqhdr));
      fConn->SetSID(reqhdr.header.streamid);
      if (fMode == 'A') {
         reqhdr.header.requestid = kXP_attach;
         reqhdr.proof.sid = fSessionID;
      } else {
         reqhdr.header.requestid = kXP_create;
      }
      reqhdr.proof.int1 = fLogLevel;

      // The buffer content goes along with the request.
      // NOTE(review): fBuffer is reset below after every attempt, so retries
      // are sent with an empty buffer - confirm this is intended.
      const void *buf = (const void *)(fBuffer.Data());
      reqhdr.header.dlen = fBuffer.Length();
      if (gDebug >= 2) {
         Info("Create", "sending %d bytes to server", reqhdr.header.dlen);
         Info("Create", "creating session of server %s", fUrl.Data());
      }

      // Send the request; 'answData' may receive additional answer data
      // which is owned by us
      char *answData = 0;
      XrdClientMessage *xrsp = fConn->SendReq(&reqhdr, buf,
                                              &answData, "TXSocket::Create");

      // From now on the buffer is used for what comes after the versions
      fBuffer = "";

      if (xrsp) {
         void *pdata = (void *)(xrsp->GetData());
         Int_t len = xrsp->DataLen();

         // Session id
         if (len >= (Int_t)sizeof(kXR_int32)) {
            kXR_int32 psid = 0;
            memcpy(&psid, pdata, sizeof(kXR_int32));
            fSessionID = net2host(psid);
            pdata = (void *)((char *)pdata + sizeof(kXR_int32));
            len -= sizeof(kXR_int32);
         } else {
            Error("Create","session ID is undefined!");
         }

         // Remote PROOF protocol version (2 bytes)
         if (len >= (Int_t)sizeof(kXR_int16)) {
            kXR_int16 dver = 0;
            memcpy(&dver, pdata, sizeof(kXR_int16));
            fRemoteProtocol = net2host(dver);
            pdata = (void *)((char *)pdata + sizeof(kXR_int16));
            len -= sizeof(kXR_int16);
         } else {
            Warning("Create","protocol version of the remote PROOF undefined!");
         }

         if (fRemoteProtocol == 0) {
            // A zero version triggers a second read on 4 bytes; make sure
            // that the bytes are really there before copying them.
            // NOTE(review): the length is rewound by 2 bytes but 'pdata' is
            // not; this looks inconsistent - confirm against the server-side
            // layout before changing it.
            if (len >= (Int_t)sizeof(kXR_int32)) {
               len += sizeof(kXR_int16);
               kXR_int32 dver = 0;
               memcpy(&dver, pdata, sizeof(kXR_int32));
               fRemoteProtocol = net2host(dver);
               pdata = (void *)((char *)pdata + sizeof(kXR_int32));
               len -= sizeof(kXR_int32);
            } else {
               Warning("Create","protocol version of the remote PROOF undefined!");
            }
         } else {
            // XrdProofdProtocol version (2 bytes)
            if (len >= (Int_t)sizeof(kXR_int16)) {
               kXR_int16 dver = 0;
               memcpy(&dver, pdata, sizeof(kXR_int16));
               fXrdProofdVersion = net2host(dver);
               pdata = (void *)((char *)pdata + sizeof(kXR_int16));
               len -= sizeof(kXR_int16);
            } else {
               Warning("Create","version of the remote XrdProofdProtocol undefined!");
            }
         }

         // Whatever is left is saved as a string in the buffer
         if (len > 0) {
            char *url = new char[len+1];
            memcpy(url, pdata, len);
            url[len] = 0;
            fBuffer = url;
            delete[] url;
         }

         // Cleanup
         SafeDelete(xrsp);
         if (answData)
            free(answData);

         return kTRUE;
      }

      // Failure: release any answer data and report the reason, if known
      if (answData)
         free(answData);
      if (fConn->GetLastErr())
         Printf("%s: %s", fHost.Data(), fConn->GetLastErr());

      Info("Create",
           "creation/attachment attempt failed: %d attempts left", retriesleft);
   }

   // All the attempts failed
   Error("Create",
         "problems creating or attaching to a remote server (%s)",
         ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
   return kFALSE;
}
// Send 'length' bytes from 'buffer' to the server.
// With opt == kDontBlock the asynchronous bit is set in the send options,
// otherwise it is cleared; the modified options are left in fSendOpt.
// Returns 'length' on success, -1 if not connected or if the request failed.
Int_t TXSocket::SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt)
{
   TSystem::ResetErrno();

   if (!IsValid()) {
      Error("SendRaw","not connected: nothing to do");
      return -1;
   }

   // Asynchronous sending only on explicit request
   if (opt == kDontBlock)
      fSendOpt = (kXPD_async | fSendOpt);
   else
      fSendOpt = (~kXPD_async & fSendOpt);

   // Prepare the request
   XPClientRequest req;
   memset(&req, 0, sizeof(req));
   fConn->SetSID(req.header.streamid);
   req.sendrcv.requestid = kXP_sendmsg;
   req.sendrcv.sid = fSessionID;
   req.sendrcv.opt = fSendOpt;
   req.sendrcv.cid = GetClientID();
   req.sendrcv.dlen = length;
   if (gDebug >= 2)
      Info("SendRaw", "sending %d bytes to server", req.sendrcv.dlen);

   // Send it over
   XrdClientMessage *rsp = fConn->SendReq(&req, buffer, 0, "SendRaw");
   if (rsp) {
      // Success: update the counter and release the answer
      fBytesSent += length;
      SafeDelete(rsp);
      return length;
   }

   // Failure: report what we know
   if (fConn->GetLastErr())
      Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
   else
      Printf("%s: error occured but no message from server", fHost.Data());

   Error("SendRaw", "%s: problems sending data to server", fHost.Data());
   return -1;
}
// Send a ping request for the current session.
// Returns kTRUE only if the request was answered with status kXR_ok and the
// 4-byte answer decodes to 1; kFALSE in all other cases (including when the
// socket is not connected). An error is printed only when the request did
// not get a successful answer.
Bool_t TXSocket::Ping(Bool_t)
{
   TSystem::ResetErrno();

   if (!IsValid()) {
      Error("Ping","not connected: nothing to do");
      return kFALSE;
   }

   // Internal sockets flag their requests as such
   kXR_int32 options = (fMode == 'i') ? kXPD_internal : 0;

   // Prepare the request
   XPClientRequest Request;
   memset(&Request, 0, sizeof(Request));
   fConn->SetSID(Request.header.streamid);
   Request.sendrcv.requestid = kXP_ping;
   Request.sendrcv.sid = fSessionID;
   Request.sendrcv.opt = options;
   Request.sendrcv.dlen = 0;

   // Send it over; the answer buffer is owned by us
   char *pans = 0;
   XrdClientMessage *xrsp =
      fConn->SendReq(&Request, (const void *)0, &pans, "Ping");

   Bool_t res = kFALSE;
   if (xrsp && xrsp->HeaderStatus() == kXR_ok) {
      // Decode the answer only if it is really there
      if (pans && xrsp->DataLen() >= (Int_t)sizeof(kXR_int32)) {
         kXR_int32 pres = 0;
         memcpy(&pres, pans, sizeof(kXR_int32));
         res = (net2host(pres) == 1) ? kTRUE : kFALSE;
      }
   } else {
      // Failure: report the reason, if known
      if (fConn->GetLastErr())
         Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
      Error("Ping", "problems sending ping to server");
   }

   // Cleanup
   SafeDelete(xrsp);
   if (pans)
      free(pans);

   return res;
}
// Wait for a message in the asynchronous queue and make it the current
// buffer.
// Unless fDontTimeout is set, the wait is done in slices of 2 seconds up to
// "XProof.ReadTimeout" seconds (default 300, read once per process) and can
// be interrupted via fRDInterrupt.
// On success the current-buffer bookkeeping is set, the client id is updated
// if the buffer carries a different one, one notification is removed from
// the global pipe and 0 is returned; -1 is returned on timeout, interrupt,
// semaphore error or inconsistent queue.
Int_t TXSocket::PickUpReady()
{
   // Reset the current-buffer bookkeeping
   fBufCur = 0;
   fByteLeft = 0;
   fByteCur = 0;

   if (gDebug > 2)
      Info("PickUpReady","%p: going to sleep", this);

   if (!fDontTimeout) {
      static Int_t timeout = gEnv->GetValue("XProof.ReadTimeout", 300) * 1000;
      static Int_t dt = 2000;
      Int_t to = timeout;
      while (to && !fRDInterrupt) {
         if (fASem.Wait(dt) != 0) {
            // Nothing arrived during this slice
            to -= dt;
            if (to <= 0) {
               Error("PickUpReady","error waiting at semaphore");
               return -1;
            } else {
               if (gDebug > 0)
                  Info("PickUpReady","%p: got timeout: retring (%d secs)", this, to/1000);
            }
         } else
            break;
      }
      // The interrupt flag is consumed here
      if (fRDInterrupt) {
         Error("PickUpReady","interrupted");
         fRDInterrupt = kFALSE;
         return -1;
      }
   } else {
      // Wait without limits
      if (fASem.Wait() != 0) {
         Error("PickUpReady","error waiting at semaphore");
         return -1;
      }
   }

   if (gDebug > 2)
      Info("PickUpReady","%p: waken up", this);

   R__LOCKGUARD(fAMtx);

   // The semaphore was posted: something must be there
   if (fAQue.size() <= 0) {
      Error("PickUpReady","queue is empty - protocol error ?");
      return -1;
   }
   fBufCur = fAQue.front();
   fAQue.pop_front();

   // An undefined entry cannot be used, but its notification must still be
   // removed from the pipe
   if (!fBufCur) {
      CleanPipe(this);
      Error("PickUpReady","undefined buffer in the queue - protocol error ?");
      return -1;
   }

   fByteLeft = fBufCur->fLen;
   if (gDebug > 2)
      Info("PickUpReady","%p: got message (%d bytes)", this, (Int_t)(fBufCur->fLen));

   // Update the counter
   fBytesRecv += fBufCur->fLen;

   // Follow the client id carried by the buffer, if any
   if (fBufCur->fCid > -1 && fBufCur->fCid != GetClientID())
      SetClientID(fBufCur->fCid);

   // One notification less on the pipe
   CleanPipe(this);

   return 0;
}
// Get a buffer able to hold 'size' bytes.
// The spare queue is searched first for a buffer that is large enough; if
// none is found the first spare buffer is enlarged; if the queue is empty a
// new buffer is allocated. The returned buffer is removed from the spare
// queue and is owned by the caller. A reused buffer has its client id reset
// to -1.
// Returns 0 if 'size' is negative or if the memory cannot be obtained.
TXSockBuf *TXSocket::PopUpSpare(Int_t size)
{
   TXSockBuf *buf = 0;
   static Int_t nBuf = 0;

   if (size < 0)
      return buf;

   R__LOCKGUARD(&fgSMtx);

   Int_t maxsz = 0;
   if (fgSQue.size() > 0) {
      // Look for a spare buffer which is already large enough
      std::list<TXSockBuf *>::iterator i;
      for (i = fgSQue.begin(); i != fgSQue.end(); ++i) {
         if (!(*i))
            continue;
         if ((*i)->fSiz > maxsz)
            maxsz = (*i)->fSiz;
         if ((*i)->fSiz >= size) {
            buf = *i;
            if (gDebug > 2)
               Info("PopUpSpare","asked: %d, spare: %d/%d, REUSE buf %p, sz: %d",
                    size, (Int_t) fgSQue.size(), nBuf, buf, buf->fSiz);
            fgSQue.erase(i);
            // Do not inherit the client id of the previous user
            buf->fCid = -1;
            return buf;
         }
      }

      // None is large enough: enlarge the first one
      buf = fgSQue.front();
      if (buf) {
         buf->Resize(size);
         if (buf->fSiz < size) {
            // The resize failed: the buffer stays in the queue
            return 0;
         }
         if (gDebug > 2)
            Info("PopUpSpare","asked: %d, spare: %d/%d, maxsz: %d, RESIZE buf %p, sz: %d",
                 size, (Int_t) fgSQue.size(), nBuf, maxsz, buf, buf->fSiz);
         fgSQue.pop_front();
         buf->fCid = -1;
         return buf;
      }
      // Undefined entry at the front: drop it and allocate a new buffer
      fgSQue.pop_front();
   }

   // Allocate a new buffer
   char *b = (char *)malloc(size);
   if (b) {
      buf = new TXSockBuf(b, size);
      nBuf++;
   }

   if (gDebug > 2)
      Info("PopUpSpare","asked: %d, spare: %d/%d, maxsz: %d, NEW buf %p, sz: %d",
           size, (Int_t) fgSQue.size(), nBuf, maxsz, buf, (buf ? buf->fSiz : 0));

   return buf;
}
void TXSocket::PushBackSpare()
{
R__LOCKGUARD(&fgSMtx);
if (gDebug > 2)
Info("PushBackSpare","release buf %p, sz: %d (BuffMem: %lld)",
fBufCur, fBufCur->fSiz, TXSockBuf::BuffMem());
if (TXSockBuf::BuffMem() < TXSockBuf::GetMemMax()) {
fgSQue.push_back(fBufCur);
} else {
delete fBufCur;
}
fBufCur = 0;
fByteCur = 0;
fByteLeft = 0;
}
// Copy exactly 'length' bytes of received data into 'buffer', waiting for
// more messages via PickUpReady() when the current buffer does not hold
// enough. Used-up buffers are released with PushBackSpare().
// Returns 'length' on success, -1 on invalid arguments or if no further
// message could be picked up.
// NOTE(review): the byte counters are updated only when more than one
// buffer was needed; presumably unintended - confirm before relying on them.
Int_t TXSocket::RecvRaw(void *buffer, Int_t length, ESendRecvOptions)
{
   // Arguments must make sense
   if (!buffer || (length <= 0))
      return -1;

   // Make sure there is a current buffer
   if (!fBufCur && (PickUpReady() != 0))
      return -1;

   // Simple case: the current buffer holds everything that is needed
   if (fByteLeft >= length) {
      memcpy(buffer, fBufCur->fBuf + fByteCur, length);
      fByteCur += length;
      fByteLeft -= length;
      if (fByteLeft <= 0)
         PushBackSpare();
      return length;
   }

   // Take what is left in the current buffer ...
   Char_t *out = (Char_t *)buffer;
   memcpy(out, fBufCur->fBuf + fByteCur, fByteLeft);
   Int_t done = fByteLeft;
   PushBackSpare();

   // ... and get the rest from the next ones
   while (done < length) {
      if (PickUpReady() != 0)
         return -1;
      Int_t missing = length - done;
      Int_t chunk = (fByteLeft > missing) ? missing : fByteLeft;
      memcpy((void *)(out + done), fBufCur->fBuf, chunk);
      fByteCur = chunk;
      fByteLeft -= chunk;
      if (fByteLeft <= 0)
         PushBackSpare();
      done += chunk;
   }

   // Update the counters
   fBytesRecv += length;
   fgBytesRecv += length;

   return length;
}
// Send an interrupt of the given type to the server; a shutdown interrupt
// is sent as a destroy request.
// Returns 0 on success, -1 if not connected or if the request failed.
Int_t TXSocket::SendInterrupt(Int_t type)
{
   TSystem::ResetErrno();

   if (!IsValid()) {
      Error("SendInterrupt","not connected: nothing to do");
      return -1;
   }

   // Prepare the request
   XPClientRequest req;
   memset(&req, 0, sizeof(req));
   fConn->SetSID(req.header.streamid);
   const Bool_t isShutdown = (type == (Int_t) TProof::kShutdownInterrupt);
   if (isShutdown)
      req.interrupt.requestid = kXP_destroy;
   else
      req.interrupt.requestid = kXP_interrupt;
   req.interrupt.sid = fSessionID;
   req.interrupt.type = type;
   req.interrupt.dlen = 0;

   // Send it over
   XrdClientMessage *rsp =
      fConn->SendReq(&req, (const void *)0, 0, "SendInterrupt");
   if (rsp) {
      SafeDelete(rsp);
      return 0;
   }

   // Failure: report the reason, if known
   if (fConn->GetLastErr())
      Printf("%s: %s", fHost.Data(), fConn->GetLastErr());

   Error("SendInterrupt", "problems sending interrupt to server");
   return -1;
}
// Send a TMessage to the server.
// Streamer infos and process ids are sent first, the message is compressed
// if requested, and the send options are temporarily extended according to
// the message type; the original options are restored after sending.
// Returns the number of bytes sent minus sizeof(UInt_t) on success,
// the (non-positive) result of SendRaw() on failure, -1 if not connected or
// if the message is in reading mode.
Int_t TXSocket::Send(const TMessage &mess)
{
   TSystem::ResetErrno();

   if (!IsValid()) {
      Error("Send","not connected: nothing to do");
      return -1;
   }

   if (mess.IsReading()) {
      Error("Send", "cannot send a message used for reading");
      return -1;
   }

   // Auxiliary information goes first
   SendStreamerInfos(mess);
   SendProcessIDs(mess);

   // Finalize the message
   mess.SetLength();
   TMessage &wmess = const_cast<TMessage&>(mess);
   if (fCompress > 0 && mess.GetCompressionLevel() == 0)
      wmess.SetCompressionLevel(fCompress);
   if (mess.GetCompressionLevel() > 0)
      wmess.Compress();

   // Pick the compressed buffer when available
   char *mbuf = 0;
   Int_t mlen = 0;
   if (mess.CompBuffer()) {
      mbuf = mess.CompBuffer();
      mlen = mess.CompLength();
   } else {
      mbuf = mess.Buffer();
      mlen = mess.Length();
   }

   // Message-dependent options; the current ones are restored afterwards
   const kXR_int32 savedSendOpt = fSendOpt;
   switch (mess.What()) {
      case kPROOF_PROCESS:
         fSendOpt |= kXPD_process;
         break;
      case kPROOF_PROGRESS:
      case kPROOF_FEEDBACK:
      case kPROOF_STOPPROCESS:
         fSendOpt |= kXPD_fb_prog;
         break;
      case kPROOF_QUERYSUBMITTED:
         fSendOpt |= (kXPD_querynum | kXPD_fb_prog);
         break;
      case kPROOF_STARTPROCESS:
         fSendOpt |= (kXPD_startprocess | kXPD_fb_prog);
         break;
      case kPROOF_SETIDLE:
         fSendOpt |= (kXPD_setidle | kXPD_fb_prog);
         break;
      case kPROOF_LOGFILE:
      case kPROOF_LOGDONE:
         if (GetClientIDSize() <= 1)
            fSendOpt |= kXPD_logmsg;
         break;
      default:
         break;
   }

   // Send and restore the options
   Int_t nsent = SendRaw(mbuf, mlen);
   fSendOpt = savedSendOpt;

   if (nsent <= 0)
      return nsent;

   // Update the counters
   fBytesSent += nsent;
   fgBytesSent += nsent;

   // The length word is not counted
   return nsent - sizeof(UInt_t);
}
// Receive a TMessage from the server.
// A length word is read first, then the message body. Messages consumed by
// RecvStreamerInfos() or RecvProcessIDs() are not returned: the next message
// is read instead.
// On success 'mess' points to the new message and the number of body bytes
// is returned; on failure 'mess' is set to 0 and the result of the failing
// RecvRaw() call is returned (-1 if not connected).
Int_t TXSocket::Recv(TMessage *&mess)
{
   TSystem::ResetErrno();

   if (!IsValid()) {
      mess = 0;
      return -1;
   }

   for (;;) {
      // Length of the incoming message
      UInt_t len;
      Int_t n = RecvRaw(&len, sizeof(UInt_t));
      if (n <= 0) {
         mess = 0;
         return n;
      }
      len = net2host(len);

      // Body, stored after room for the length word
      char *buf = new char[len+sizeof(UInt_t)];
      n = RecvRaw(buf+sizeof(UInt_t), len);
      if (n <= 0) {
         delete [] buf;
         mess = 0;
         return n;
      }

      // Update the counters
      fBytesRecv += n + sizeof(UInt_t);
      fgBytesRecv += n + sizeof(UInt_t);

      mess = new TMessage(buf, len+sizeof(UInt_t));

      // Auxiliary messages are handled internally: read the next one
      if (RecvStreamerInfos(mess))
         continue;
      if (RecvProcessIDs(mess))
         continue;

      return n;
   }
}
// Send an administrative request of the given 'kind' to the coordinator.
//   msg   optional string sent as request data (mandatory file path for
//         kReadBuffer)
//   int2  generic integer (length to read for kReadBuffer)
//   l64   offset for kReadBuffer
//   int3  additional integer for kReadBuffer (requires server version
//         >= 1003 when positive)
// For the kinds that expect data back, the answer is returned in a new
// TObjString owned by the caller. 0 is returned when there is no answer
// data, on failure, for unknown kinds, or when the socket is not connected.
TObjString *TXSocket::SendCoordinator(Int_t kind, const char *msg, Int_t int2,
                                      Long64_t l64, Int_t int3, const char *)
{
   TObjString *sout = 0;

   // A valid connection is needed, as for all the other requests
   if (!IsValid()) {
      Error("SendCoordinator","not connected: nothing to do");
      return sout;
   }

   // Request header and data pointers
   XPClientRequest reqhdr;
   const void *buf = 0;
   char *bout = 0;
   char **vout = 0;
   memset(&reqhdr, 0, sizeof(reqhdr));
   fConn->SetSID(reqhdr.header.streamid);
   reqhdr.header.requestid = kXP_admin;
   reqhdr.proof.int1 = kind;
   reqhdr.proof.int2 = int2;

   switch (kind) {
      case kQueryROOTVersions:
      case kQuerySessions:
      case kQueryWorkers:
         reqhdr.proof.sid = 0;
         reqhdr.header.dlen = 0;
         vout = (char **)&bout;
         break;
      case kCleanupSessions:
         reqhdr.proof.int2 = (kXR_int32) kXPD_TopMaster;
         reqhdr.proof.sid = fSessionID;
         reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
         buf = (msg) ? (const void *)msg : buf;
         break;
      case kQueryLogPaths:
         // An answer is expected; the rest is shared with the kinds below
         vout = (char **)&bout;
         // fall through
      case kSendMsgToUser:
      case kGroupProperties:
      case kSessionTag:
      case kSessionAlias:
         reqhdr.proof.sid = fSessionID;
         reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
         buf = (msg) ? (const void *)msg : buf;
         break;
      case kROOTVersion:
         reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
         buf = (msg) ? (const void *)msg : buf;
         break;
      case kGetWorkers:
         reqhdr.proof.sid = fSessionID;
         reqhdr.header.dlen = 0;
         vout = (char **)&bout;
         break;
      case kReadBuffer:
         reqhdr.header.requestid = kXP_readbuf;
         reqhdr.readbuf.ofs = l64;
         reqhdr.readbuf.len = int2;
         if (int3 > 0 && fXrdProofdVersion < 1003) {
            Info("SendCoordinator", "kReadBuffer: old server (ver %d < 1003):"
                 " grep functionality not supported", fXrdProofdVersion);
            return sout;
         }
         reqhdr.readbuf.int1 = int3;
         if (!msg || strlen(msg) <= 0) {
            Info("SendCoordinator", "kReadBuffer: file path undefined");
            return sout;
         }
         reqhdr.header.dlen = strlen(msg);
         buf = (const void *)msg;
         vout = (char **)&bout;
         break;
      default:
         Info("SendCoordinator", "unknown message kind: %d", kind);
         return sout;
   }

   // Send the request over
   XrdClientMessage *xrsp =
      fConn->SendReq(&reqhdr, buf, vout, "TXSocket::SendCoordinator");

   if (xrsp) {
      // Export the answer, if any
      if (bout && (xrsp->DataLen() > 0))
         sout = new TObjString(TString(bout,xrsp->DataLen()));
      SafeDelete(xrsp);
   } else {
      // Failure: report the reason, if known
      if (fConn->GetLastErr())
         Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
   }

   // The answer buffer is owned by us in any case
   if (bout)
      free(bout);

   return sout;
}
// Send an urgent message made of a type and two integers to the server.
// Nothing is sent if the socket is not connected; on failure the last
// connection error, if any, is printed.
void TXSocket::SendUrgent(Int_t type, Int_t int1, Int_t int2)
{
   TSystem::ResetErrno();

   if (!IsValid()) {
      Error("SendUrgent","not connected: nothing to do");
      return;
   }

   // Prepare the request
   XPClientRequest req;
   memset(&req, 0, sizeof(req));
   fConn->SetSID(req.header.streamid);
   req.proof.requestid = kXP_urgent;
   req.proof.sid = fSessionID;
   req.proof.int1 = type;
   req.proof.int2 = int1;
   req.proof.int3 = int2;
   req.proof.dlen = 0;

   // Send it over
   XrdClientMessage *rsp =
      fConn->SendReq(&req, (const void *)0, 0, "SendUrgent");

   if (!rsp) {
      // Failure: report the reason, if known
      if (fConn->GetLastErr())
         Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
      return;
   }

   SafeDelete(rsp);
}
void TXSocket::DumpReadySock()
{
R__LOCKGUARD(&fgReadyMtx);
TString buf = Form("%d |", fgReadySock.GetSize());
TIter nxs(&fgReadySock);
TObject *o = 0;
while ((o = nxs()))
buf += Form(" %p",o);
::Info("TXSocket::DumpReadySock", "%s: list content: %s", fgLoc.Data(), buf.Data());
}
void TXSocket::InitEnvs()
{
EnvPutInt(NAME_DEBUG, gEnv->GetValue("XProof.Debug", 0));
if (gEnv->GetValue("XProof.Debug", 0) > 0)
XrdProofdTrace->What = TRACE_REQ;
if (gEnv->GetValue("XProof.Debug", 0) > 1)
XrdProofdTrace->What = TRACE_ALL;
TString allowCO = gEnv->GetValue("XProof.ConnectDomainAllowRE", "");
if (allowCO.Length() > 0)
EnvPutString(NAME_CONNECTDOMAINALLOW_RE, allowCO.Data());
TString denyCO = gEnv->GetValue("XProof.ConnectDomainDenyRE", "");
if (denyCO.Length() > 0)
EnvPutString(NAME_CONNECTDOMAINDENY_RE, denyCO.Data());
XrdProofConn::SetRetryParam(-1, -1);
Int_t maxRetries = gEnv->GetValue("XProof.FirstConnectMaxCnt",5);
EnvPutInt(NAME_FIRSTCONNECTMAXCNT, maxRetries);
Int_t connTO = gEnv->GetValue("XProof.ConnectTimeout", 2);
EnvPutInt(NAME_CONNECTTIMEOUT, connTO);
Int_t recoTO = gEnv->GetValue("XProof.ReconnectTimeout",
DFLT_RECONNECTTIMEOUT);
EnvPutInt(NAME_RECONNECTTIMEOUT, recoTO);
Int_t requTO = gEnv->GetValue("XProof.RequestTimeout", DFLT_REQUESTTIMEOUT);
EnvPutInt(NAME_REQUESTTIMEOUT, requTO);
Int_t garbCollTh = gEnv->GetValue("XProof.StartGarbageCollectorThread",
DFLT_STARTGARBAGECOLLECTORTHREAD);
EnvPutInt(NAME_STARTGARBAGECOLLECTORTHREAD, garbCollTh);
EnvPutInt(NAME_KEEPSOCKOPENIFNOTXRD, 0);
TString socks4Host = gEnv->GetValue("XNet.SOCKS4Host","");
Int_t socks4Port = gEnv->GetValue("XNet.SOCKS4Port",-1);
if (socks4Port > 0) {
if (socks4Host.IsNull())
socks4Host = "127.0.0.1";
EnvPutString(NAME_SOCKS4HOST, socks4Host.Data());
EnvPutInt(NAME_SOCKS4PORT, socks4Port);
}
TString autolog = gEnv->GetValue("XSec.Pwd.AutoLogin","1");
if (autolog.Length() > 0)
gSystem->Setenv("XrdSecPWDAUTOLOG",autolog.Data());
TString netrc;
netrc.Form("%s/.rootnetrc",gSystem->HomeDirectory());
gSystem->Setenv("XrdSecNETRC", netrc.Data());
TString alogfile = gEnv->GetValue("XSec.Pwd.ALogFile","");
if (alogfile.Length() > 0)
gSystem->Setenv("XrdSecPWDALOGFILE",alogfile.Data());
TString verisrv = gEnv->GetValue("XSec.Pwd.VerifySrv","1");
if (verisrv.Length() > 0)
gSystem->Setenv("XrdSecPWDVERIFYSRV",verisrv.Data());
TString srvpuk = gEnv->GetValue("XSec.Pwd.ServerPuk","");
if (srvpuk.Length() > 0)
gSystem->Setenv("XrdSecPWDSRVPUK",srvpuk.Data());
TString cadir = gEnv->GetValue("XSec.GSI.CAdir","");
if (cadir.Length() > 0)
gSystem->Setenv("XrdSecGSICADIR",cadir.Data());
TString crldir = gEnv->GetValue("XSec.GSI.CRLdir","");
if (crldir.Length() > 0)
gSystem->Setenv("XrdSecGSICRLDIR",crldir.Data());
TString crlext = gEnv->GetValue("XSec.GSI.CRLextension","");
if (crlext.Length() > 0)
gSystem->Setenv("XrdSecGSICRLEXT",crlext.Data());
TString ucert = gEnv->GetValue("XSec.GSI.UserCert","");
if (ucert.Length() > 0)
gSystem->Setenv("XrdSecGSIUSERCERT",ucert.Data());
TString ukey = gEnv->GetValue("XSec.GSI.UserKey","");
if (ukey.Length() > 0)
gSystem->Setenv("XrdSecGSIUSERKEY",ukey.Data());
TString upxy = gEnv->GetValue("XSec.GSI.UserProxy","");
if (upxy.Length() > 0)
gSystem->Setenv("XrdSecGSIUSERPROXY",upxy.Data());
TString valid = gEnv->GetValue("XSec.GSI.ProxyValid","");
if (valid.Length() > 0)
gSystem->Setenv("XrdSecGSIPROXYVALID",valid.Data());
TString deplen = gEnv->GetValue("XSec.GSI.ProxyForward","0");
if (deplen.Length() > 0)
gSystem->Setenv("XrdSecGSIPROXYDEPLEN",deplen.Data());
TString pxybits = gEnv->GetValue("XSec.GSI.ProxyKeyBits","");
if (pxybits.Length() > 0)
gSystem->Setenv("XrdSecGSIPROXYKEYBITS",pxybits.Data());
TString crlcheck = gEnv->GetValue("XSec.GSI.CheckCRL","1");
if (crlcheck.Length() > 0)
gSystem->Setenv("XrdSecGSICRLCHECK",crlcheck.Data());
TString delegpxy = gEnv->GetValue("XSec.GSI.DelegProxy","0");
if (delegpxy.Length() > 0)
gSystem->Setenv("XrdSecGSIDELEGPROXY",delegpxy.Data());
TString signpxy = gEnv->GetValue("XSec.GSI.SignProxy","1");
if (signpxy.Length() > 0)
gSystem->Setenv("XrdSecGSISIGNPROXY",signpxy.Data());
if (gEnv->GetValue("XNet.PrintTAG",0) == 1)
::Info("TXSocket","(C) 2005 CERN TXSocket (XPROOF client) %s",
gROOT->GetVersion());
fgInitDone = kTRUE;
}
// Wrap the memory block 'bp' of 'sz' bytes; 'own' tells whether the block
// is freed by the destructor. The client id starts as -1 and the size is
// added to the global memory counter.
TXSockBuf::TXSockBuf(Char_t *bp, Int_t sz, Bool_t own)
{
   // Memory block and its usable start
   fMem = bp;
   fBuf = fMem;

   // Capacity and current length
   fLen = sz;
   fSiz = fLen;

   fOwn = own;
   fCid = -1;

   // Global accounting
   fgBuffMem += sz;
}
// Free the memory block and update the global memory counter, but only when
// the block is owned and defined.
TXSockBuf::~TXSockBuf()
{
   if (!fOwn || !fMem)
      return;

   free(fMem);
   fgBuffMem -= fSiz;
}
// Enlarge the buffer to 'sz' bytes; nothing is done if the buffer is already
// at least that large. On success the length is reset to 0 and the global
// memory counter is updated. If the reallocation fails the buffer is left
// exactly as it was (callers can detect this from the unchanged size).
void TXSockBuf::Resize(Int_t sz)
{
   if (sz > fSiz) {
      // Keep the old block until the new one is known to be valid: a failed
      // realloc leaves the original block allocated
      Char_t *newmem = (Char_t *)realloc(fMem, sz);
      if (newmem) {
         fgBuffMem += (sz - fSiz);
         fMem = newmem;
         fBuf = fMem;
         fSiz = sz;
         fLen = 0;
      }
   }
}
// Return the current value of the global buffer-memory counter.
Long64_t TXSockBuf::BuffMem()
{
   const Long64_t inUse = fgBuffMem;
   return inUse;
}
// Return the maximum buffer memory used as threshold for keeping spare
// buffers.
Long64_t TXSockBuf::GetMemMax()
{
   const Long64_t limit = fgMemMax;
   return limit;
}
// Set the maximum buffer memory; non-positive values are ignored.
void TXSockBuf::SetMemMax(Long64_t memmax)
{
   if (memmax > 0)
      fgMemMax = memmax;
}
// Last change: Wed Jun 25 08:55:21 2008
// Last generated: 2008-06-25 08:55
// This page has been automatically generated. If you have any comments or suggestions about the page layout, send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.