38 #include "XrdClient/XrdClientConst.hh"
39 #include "XrdClient/XrdClientEnv.hh"
44 #include <sys/socket.h>
93 fSocket->Ping(
"ping handler");
103 TString TXSocket::fgLoc =
"undef";
109 Long64_t TXSockBuf::fgMemMax = 10485760;
129 :
TSocket(), fMode(m), fLogLevel(loglevel),
130 fBuffer(logbuf), fConn(0), fASem(0), fAsynProc(1),
131 fDontTimeout(
kFALSE), fRDInterrupt(
kFALSE), fXrdProofdVersion(-1)
135 eDest.logger(&eLogger);
137 XrdProofdTrace =
new XrdOucTrace(&
eDest);
145 Error(
"TXSocket",
"problems initializing mutex for async queue");
152 Error(
"TXSocket",
"problems initializing mutex for interrupts");
176 Error(
"TXSocket",
"internal pipe is invalid");
198 Error(
"TXSocket",
"fatal error occurred while opening a connection"
213 Error(
"TXSocket",
"create or attach failed (%s)",
285 Info(
"DisconnectSession",
"not connected: nothing to do");
289 Bool_t shutdown = opt && (strchr(opt,
'S') || strchr(opt,
's'));
290 Bool_t all = opt && (strchr(opt,
'A') || strchr(opt,
'a'));
292 if (
id > -1 || all) {
295 memset(&Request, 0,
sizeof(Request) );
305 fConn->
SendReq(&Request, (
const void *)0, 0,
"DisconnectSession");
327 Warning(
"Close",
"could not hold semaphore for async messages after %d sec: closing anyhow (may give error messages)", to);
330 TXSocket::fgPipe.
Flush(
this);
335 Info(
"Close",
"no connection: nothing to do");
383 UnsolRespProcResult rc = kUNSOL_KEEP;
388 Error(
"ProcessUnsolicitedMsg",
"%p: async semaphore taken by Close()! Should not be here!",
this);
389 return kUNSOL_CONTINUE;
394 Info(
"ProcessUnsolicitedMsg",
"%p: got empty message: skipping",
this);
396 return kUNSOL_CONTINUE;
399 Info(
"ProcessUnsolicitedMsg",
"%p: got message with status: %d, len: %d bytes (ID: %d)",
407 Info(
"ProcessUnsolicitedMsg",
"%p: got error from underlying connection",
this);
411 Info(
"ProcessUnsolicitedMsg",
"%p: handler undefined or recovery failed",
this);
421 Info(
"ProcessUnsolicitedMsg",
"%p: underlying connection timed out",
this);
424 return kUNSOL_CONTINUE;
432 return kUNSOL_CONTINUE;
437 if ((len = m->
DataLen()) < (
int)
sizeof(kXR_int32)) {
438 Error(
"ProcessUnsolicitedMsg",
"empty or bad-formed message - disabling");
448 memcpy(&acod, m->
GetData(),
sizeof(kXR_int32));
450 Info(
"ProcessUnsolicitedMsg",
"%p: got acod %d (%x): message has status: %d, len: %d bytes (ID: %d)",
454 void *pdata = (
void *)((
char *)(m->
GetData()) +
sizeof(kXR_int32));
455 len -=
sizeof(kXR_int32);
457 Info(
"ProcessUnsolicitedMsg",
"%p: got action: %d (%d bytes) (ID: %d)",
476 lab = !lab ?
"kXPD_interrupt" : lab;
479 memcpy(&ilev, pdata,
sizeof(kXR_int32));
482 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
483 len -=
sizeof(kXR_int32);
488 memcpy(&ifw, pdata,
sizeof(kXR_int32));
491 Info(
"ProcessUnsolicitedMsg",
"%s: forwarding option: %d", lab, ifw);
504 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
515 memcpy(&opt, pdata,
sizeof(kXR_int32));
518 Info(
"ProcessUnsolicitedMsg",
"kXPD_timer: found opt: %d", opt);
520 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
521 len -=
sizeof(kXR_int32);
525 memcpy(&delay, pdata,
sizeof(kXR_int32));
528 Info(
"ProcessUnsolicitedMsg",
"kXPD_timer: found delay: %d", delay);
530 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
531 len -=
sizeof(kXR_int32);
540 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
547 kXR_int32 inflate = 1000;
549 memcpy(&inflate, pdata,
sizeof(kXR_int32));
552 Info(
"ProcessUnsolicitedMsg",
"kXPD_inflate: factor: %d", inflate);
554 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
555 len -=
sizeof(kXR_int32);
563 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
570 kXR_int32 priority = -1;
572 memcpy(&priority, pdata,
sizeof(kXR_int32));
575 Info(
"ProcessUnsolicitedMsg",
"kXPD_priority: priority: %d", priority);
577 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
578 len -=
sizeof(kXR_int32);
586 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
599 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
609 memcpy(&type, pdata,
sizeof(kXR_int32));
612 Info(
"ProcessUnsolicitedMsg",
"kXPD_urgent: found type: %d", type);
614 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
615 len -=
sizeof(kXR_int32);
620 memcpy(&int1, pdata,
sizeof(kXR_int32));
623 Info(
"ProcessUnsolicitedMsg",
"kXPD_urgent: found int1: %d", int1);
625 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
626 len -=
sizeof(kXR_int32);
631 memcpy(&int2, pdata,
sizeof(kXR_int32));
634 Info(
"ProcessUnsolicitedMsg",
"kXPD_urgent: found int2: %d", int2);
636 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
637 len -=
sizeof(kXR_int32);
646 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
657 Error(
"ProcessUnsolicitedMsg",
"could allocate spare buffer");
660 memcpy(b->
fBuf, pdata, len);
674 Info(
"ProcessUnsolicitedMsg",
"%p: %s: posting semaphore: %p (%d bytes)",
681 Info(
"ProcessUnsolicitedMsg",
682 "kXPD_feedback treatment not yet implemented");
690 memcpy(&opt, pdata,
sizeof(kXR_int32));
692 if (opt >= 0 && opt <= 4) {
694 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
695 len -=
sizeof(kXR_int32);
702 Printf(
"| %.*s", len, (
char *)pdata);
703 }
else if (opt == 2) {
705 Printf(
"%.*s", len, (
char *)pdata);
706 }
else if (opt == 3) {
708 fprintf(stderr,
"%.*s", len, (
char *)pdata);
709 }
else if (opt == 4) {
711 fprintf(stderr,
"%.*s\r", len, (
char *)pdata);
715 Printf(
"| Message from server:");
716 Printf(
"| %.*s", len, (
char *)pdata);
724 Printf(
"| Error condition occured: message from server:");
725 Printf(
"| %.*s", len, (
char *)pdata);
731 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
740 memcpy(&cid, pdata,
sizeof(kXR_int32));
744 Info(
"ProcessUnsolicitedMsg",
"found cid: %d", cid);
747 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
748 len -=
sizeof(kXR_int32);
753 Error(
"ProcessUnsolicitedMsg",
"could allocate spare buffer");
756 memcpy(b->
fBuf, pdata, len);
773 Info(
"ProcessUnsolicitedMsg",
"%p: cid: %d, posting semaphore: %p (%d bytes)",
774 this, cid, &
fASem, len);
793 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
812 kXR_int32 nsess = -1, nacti = -1, neffs = -1;
815 memcpy(&nsess, pdata,
sizeof(kXR_int32));
817 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
818 len -=
sizeof(kXR_int32);
820 memcpy(&nacti, pdata,
sizeof(kXR_int32));
822 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
823 len -=
sizeof(kXR_int32);
825 memcpy(&neffs, pdata,
sizeof(kXR_int32));
827 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
828 len -=
sizeof(kXR_int32);
831 Info(
"ProcessUnsolicitedMsg",
"kXPD_clusterinfo: # sessions: %d,"
832 " # active: %d, # effective: %f", nsess, nacti, neffs/1000.);
839 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
843 Error(
"ProcessUnsolicitedMsg",
"%p: unknown action code: %d received from '%s' - disabling",
866 if (msg && strlen(msg) > 0)
887 Error(
"PostMsg",
"could allocate spare buffer");
892 memcpy(b->
fBuf, mbuf, mlen);
906 Info(
"PostMsg",
"%p: posting type %d to semaphore: %p (%d bytes)",
907 this, type, &
fASem, mlen);
933 Info(
"GetInterrupt",
"%p: waiting to lock mutex %p",
this,
fIMtx);
943 Error(
"GetInterrupt",
"value is unset (%d) - protocol error",
fILev);
965 list<TXSockBuf *> splist;
966 list<TXSockBuf *>::iterator i;
971 if (
fAQue.size() > 0) {
978 splist.push_back(*i);
987 Printf(
"Warning in TXSocket::Flush: semaphore counter already 0 (sz: %d)", sz);
995 if (splist.size() > 0) {
996 for (i = splist.begin(); i != splist.end();) {
1016 Info(
"Create",
"not connected: nothing to do");
1022 while (retriesleft--) {
1027 memset( &reqhdr, 0,
sizeof(reqhdr));
1031 if (
fMode ==
'A' || attach) {
1045 Info(
"Create",
"sending %d bytes to server", reqhdr.
header.dlen);
1049 Info(
"Create",
"creating session of server %s",
fUrl.
Data());
1054 &answData,
"TXSocket::Create", 0);
1055 struct ServerResponseBody_Protocol *srvresp = (
struct ServerResponseBody_Protocol *)answData;
1063 void *pdata = (
void *)(xrsp->
GetData());
1066 if (len >= (
Int_t)
sizeof(kXR_int32)) {
1069 memcpy(&psid, pdata,
sizeof(kXR_int32));
1071 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
1072 len -=
sizeof(kXR_int32);
1074 Error(
"Create",
"session ID is undefined!");
1076 if (srvresp)
free(srvresp);
1080 if (len >= (
Int_t)
sizeof(kXR_int16)) {
1083 memcpy(&dver, pdata,
sizeof(kXR_int16));
1085 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int16));
1086 len -=
sizeof(kXR_int16);
1088 Warning(
"Create",
"protocol version of the remote PROOF undefined!");
1093 len +=
sizeof(kXR_int16);
1095 memcpy(&dver, pdata,
sizeof(kXR_int32));
1097 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
1098 len -=
sizeof(kXR_int32);
1100 if (len >= (
Int_t)
sizeof(kXR_int16)) {
1103 memcpy(&dver, pdata,
sizeof(kXR_int16));
1105 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int16));
1106 len -=
sizeof(kXR_int16);
1108 Warning(
"Create",
"version of the remote XrdProofdProtocol undefined!");
1114 char *url =
new char[len+1];
1115 memcpy(url, pdata, len);
1123 if (srvresp)
free(srvresp);
1130 if (retriesleft <= 0 && fConn->GetLastErr()) {
1138 if (srvresp)
free(srvresp);
1144 if ((ilog = emsg.Index(
"|log:")) !=
kNPOS) emsg.
Remove(ilog);
1151 Info(
"Create",
"creation/attachment attempt failed: %d attempts left", retriesleft);
1152 if (retriesleft <= 0)
1153 Error(
"Create",
"%d creation/attachment attempts failed: no attempts left",
1156 if (srvresp)
free(srvresp);
1165 "problems creating or attaching to a remote server (%s)",
1186 memset( &Request, 0,
sizeof(Request) );
1223 Error(
"SendRaw",
"%s: problems sending %d bytes to server",
1238 Info(
"Ping",
"%p: %s: sid: %d",
this, ord ? ord :
"int",
fSessionID);
1242 Error(
"Ping",
"not connected: nothing to do");
1251 memset( &Request, 0,
sizeof(Request) );
1263 fConn->
SendReq(&Request, (
const void *)0, &pans,
"Ping");
1264 kXR_int32 *pres = (kXR_int32 *) pans;
1280 if (pans)
free(pans);
1287 Error(
"Ping",
"%p: int: problems marshalling request",
this);
1293 Error(
"Ping",
"%p: %s: problems sending ping to server",
this, ord ? ord :
"int");
1295 Info(
"Ping",
"%p: %s: sid: %d OK",
this, ord ? ord :
"int",
fSessionID);
1310 Info(
"RemoteTouch",
"%p: sending touch request to %s",
this,
GetName());
1314 Error(
"RemoteTouch",
"not connected: nothing to do");
1320 memset( &Request, 0,
sizeof(Request) );
1329 Error(
"Touch",
"%p: problems marshalling request ",
this);
1333 Error(
"Touch",
"%p: problems sending touch request to server",
this);
1348 Info(
"CtrlC",
"%p: sending ctrl-c request to %s",
this,
GetName());
1352 Error(
"CtrlC",
"not connected: nothing to do");
1358 memset( &Request, 0,
sizeof(Request) );
1366 Error(
"CtrlC",
"%p: problems marshalling request ",
this);
1370 Error(
"CtrlC",
"%p: problems sending ctrl-c request to server",
this);
1385 Info(
"PickUpReady",
"%p: %s: going to sleep",
this,
GetTitle());
1390 static Int_t dt = 2000;
1398 Error(
"PickUpReady",
"error waiting at semaphore");
1402 Info(
"PickUpReady",
"%p: %s: got timeout: retring (%d secs)",
1412 Info(
"PickUpReady",
"interrupted");
1421 Error(
"PickUpReady",
"error waiting at semaphore");
1428 Info(
"PickUpReady",
"%p: %s: waken up",
this,
GetTitle());
1433 if (
fAQue.size() <= 0) {
1434 Error(
"PickUpReady",
"queue is empty - protocol error ?");
1438 Error(
"PickUpReady",
"got invalid buffer - protocol error ?");
1448 Info(
"PickUpReady",
"%p: %s: got message (%d bytes)",
1473 static Int_t nBuf = 0;
1481 list<TXSockBuf *>::iterator i;
1483 maxsz = ((*i)->fSiz > maxsz) ? (*i)->fSiz : maxsz;
1484 if ((*i) && (*i)->fSiz >= size) {
1487 Info(
"PopUpSpare",
"asked: %d, spare: %d/%d, REUSE buf %p, sz: %d",
1488 size, (
int)
fgSQue.size(), nBuf, buf, buf->
fSiz);
1498 Info(
"PopUpSpare",
"asked: %d, spare: %d/%d, maxsz: %d, RESIZE buf %p, sz: %d",
1499 size, (
int)
fgSQue.size(), nBuf, maxsz, buf, buf->
fSiz);
1510 Info(
"PopUpSpare",
"asked: %d, spare: %d/%d, maxsz: %d, NEW buf %p, sz: %d",
1511 size, (
int)
fgSQue.size(), nBuf, maxsz, buf, buf->
fSiz);
1525 Info(
"PushBackSpare",
"release buf %p, sz: %d (BuffMem: %lld)",
1544 if (!buffer || (length <= 0))
1567 while (tobecopied > 0) {
1572 Int_t ncpy = (fByteLeft > tobecopied) ? tobecopied : fByteLeft;
1575 if ((fByteLeft -= ncpy) <= 0)
1604 memset(&Request, 0,
sizeof(Request) );
1616 fConn->
SendReq(&Request, (
const void *)0, 0,
"SendInterrupt");
1631 Error(
"SendInterrupt",
"problems sending interrupt to server");
1644 Error(
"Send",
"cannot send a message used for reading");
1657 const_cast<TMessage&>(mess).SetCompressionSettings(
fCompress);
1660 const_cast<TMessage&>(mess).Compress();
1662 char *mbuf = mess.
Buffer();
1670 kXR_int32 fSendOptDefault =
fSendOpt;
1671 switch (mess.
What()) {
1704 Info(
"Send",
"sending type %d (%d bytes) to '%s'", mess.
What(), mlen,
GetTitle());
1715 return nsent -
sizeof(
UInt_t);
1742 char *buf =
new char[len+
sizeof(
UInt_t)];
1764 mess->SetWhat(mess->What() & ~
kMESS_ACK);
1782 const void *buf = 0;
1785 memset(&reqhdr, 0,
sizeof(reqhdr));
1797 vout = (
char **)&bout;
1804 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
1805 buf = (msg) ? (
const void *)msg : buf;
1812 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
1813 buf = (msg) ? (
const void *)msg : buf;
1814 vout = (
char **)&bout;
1817 vout = (
char **)&bout;
1825 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
1826 buf = (msg) ? (
const void *)msg : buf;
1829 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
1830 buf = (msg) ? (
const void *)msg : buf;
1834 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
1836 buf = (
const void *)msg;
1837 vout = (
char **)&bout;
1844 Info(
"SendCoordinator",
"kReadBuffer: old server (ver %d < 1003):"
1849 if (!msg || strlen(msg) <= 0) {
1850 Info(
"SendCoordinator",
"kReadBuffer: file path undefined");
1853 reqhdr.
header.dlen = strlen(msg);
1854 buf = (
const void *)msg;
1855 vout = (
char **)&bout;
1858 Info(
"SendCoordinator",
"unknown message kind: %d", kind);
1865 fConn->
SendReq(&reqhdr, buf, vout,
"TXSocket::SendCoordinator", noterr);
1870 if (bout && (xrsp->
DataLen() > 0))
1898 memset(&Request, 0,
sizeof(Request) );
1909 fConn->
SendReq(&Request, (
const void *)0, 0,
"SendUrgent");
1932 EnvPutInt(NAME_DEBUG, deb);
1941 const char *cenv = 0;
1945 if (allowCO.
Length() > 0)
1946 EnvPutString(NAME_CONNECTDOMAINALLOW_RE, allowCO.
Data());
1951 EnvPutString(NAME_CONNECTDOMAINDENY_RE, denyCO.
Data());
1956 EnvPutInt(NAME_FIRSTCONNECTMAXCNT, maxRetries);
1958 EnvPutInt(NAME_CONNECTTIMEOUT, connTO);
1962 DFLT_RECONNECTWAIT);
1963 if (recoTO == DFLT_RECONNECTWAIT) {
1966 DFLT_RECONNECTWAIT);
1968 EnvPutInt(NAME_RECONNECTWAIT, recoTO);
1972 EnvPutInt(NAME_REQUESTTIMEOUT, requTO);
1975 EnvPutInt(NAME_KEEPSOCKOPENIFNOTXRD, 0);
1980 if (socks4Port > 0) {
1983 socks4Host =
"127.0.0.1";
1984 EnvPutString(NAME_SOCKS4HOST, socks4Host.
Data());
1985 EnvPutInt(NAME_SOCKS4PORT, socks4Port);
1990 if (autolog.
Length() > 0 &&
1991 (!(cenv =
gSystem->
Getenv(
"XrdSecPWDAUTOLOG")) || strlen(cenv) <= 0))
2000 if (alogfile.
Length() > 0)
2004 if (verisrv.
Length() > 0 &&
2005 (!(cenv =
gSystem->
Getenv(
"XrdSecPWDVERIFYSRV")) || strlen(cenv) <= 0))
2042 if (deplen.
Length() > 0 &&
2043 (!(cenv =
gSystem->
Getenv(
"XrdSecGSIPROXYDEPLEN")) || strlen(cenv) <= 0))
2047 if (pxybits.
Length() > 0)
2051 if (crlcheck.
Length() > 0 &&
2052 (!(cenv =
gSystem->
Getenv(
"XrdSecGSICRLCHECK")) || strlen(cenv) <= 0))
2056 if (delegpxy.
Length() > 0 &&
2057 (!(cenv =
gSystem->
Getenv(
"XrdSecGSIDELEGPROXY")) || strlen(cenv) <= 0))
2061 if (signpxy.
Length() > 0 &&
2062 (!(cenv =
gSystem->
Getenv(
"XrdSecGSISIGNPROXY")) || strlen(cenv) <= 0))
2067 ::
Info(
"TXSocket",
"(C) 2005 CERN TXSocket (XPROOF client) %s",
2068 gROOT->GetVersion());
2080 Info(
"Reconnect",
"%p (c:%p, v:%d): trying to reconnect to %s (logid: %d)",
2087 if (tryreconnect == 0)
2088 Info(
"Reconnect",
"%p: reconnection attempts explicitly disabled!",
this);
2090 Info(
"Reconnect",
"%p: server does not support reconnections (protocol: %d < 1005)",
2105 Error(
"TXSocket",
"create or attach failed (%s)",
2116 Info(
"Reconnect",
"%p (c:%p): attempt %s (logid: %d)",
this,
fConn,
2120 Info(
"Reconnect",
"%p (c:0x0): attempt failed",
this);
2206 if (pipe(
fPipe) != 0) {
2207 Printf(
"TXSockPipe: problem initializing pipe for socket inputs");
2230 if (!
IsValid() || !s)
return -1;
2248 Printf(
"TXSockPipe::Post: %s: %p: pipe posted (pending %d) (descriptor: %d)",
2260 if (!
IsValid() || !s)
return -1;
2267 Printf(
"TXSockPipe::Clean: %s: can't read from pipe",
fLoc.
Data());
2277 Printf(
"TXSockPipe::Clean: %s: %p: pipe cleaned (pending %d) (descriptor: %d)",
2291 if (!
IsValid() || !s)
return -1;
2305 Printf(
"TXSockPipe::Flush: %s: can't read from pipe",
fLoc.
Data());
2313 Printf(
"TXSockPipe::Flush: %s: %p: pipe flushed",
fLoc.
Data(), s);
2330 buf +=
Form(
" %p",o);
const char * GetHost() const
virtual const char * GetTitle() const
Returns title of object.
double read(const std::string &file_name)
reading
TXSocket(const char *url, Char_t mode= 'M', Int_t psid=-1, Char_t ver=-1, const char *logbuf=0, Int_t loglevel=-1, TXHandler *handler=0)
Constructor. Open the connection to a remote XrdProofd instance and start a PROOF session.
static void SetLocation(const char *loc="")
Set location string.
virtual int GetPid()
Get process id.
Bool_t RecvStreamerInfos(TMessage *mess)
Receive a message containing streamer infos.
virtual void SetClientID(Int_t)
virtual Bool_t Notify()
Notify when event occurred on descriptor associated with this handler.
Int_t SendRaw(const void *buf, Int_t len, ESendRecvOptions opt=kDontBlock)
Send a raw buffer of specified length.
void SetLoc(const char *loc="")
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
double write(int n, const std::string &file_name, const std::string &vector_type, int compress=0)
writing
Collectable string class.
virtual void Close(Option_t *opt="")
Close connection.
Bool_t RecvProcessIDs(TMessage *mess)
Receive a message containing process ids.
static Long64_t BuffMem()
Return the currently allocated memory.
virtual Int_t GetClientID() const
struct XPClientReadbufRequest readbuf
This class represents a WWW compatible URL.
Int_t TryWait()
If semaphore value is > 0 then decrement it and return 0.
virtual UnsolRespProcResult ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *s, XrdClientMessage *msg)
We are here if an unsolicited response comes from a logical conn. The response comes in the form of an...
Int_t Wait(Int_t millisec=0)
If semaphore value is > 0 then decrement it and carry on.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
struct XPClientInterruptRequest interrupt
char * CompBuffer() const
static void SetRetryParam(int maxtry=5, int timewait=2)
Change values of the retry control parameters, number of retries and wait time between attempts (in se...
virtual const char * HomeDirectory(const char *userName=0)
Return the user's home directory.
XrdClientMessage * SendReq(XPClientRequest *req, const void *reqData, char **answData, const char *CmdName, bool notifyerr=1)
SendReq tries to send a single command for a number of times.
XrdOucTrace * XrdProofdTrace
virtual Bool_t HandleError(const void *in=0)
Handler of asynchronous error events.
void SendStreamerInfos(const TMessage &mess)
Check if TStreamerInfo must be sent.
virtual ~TXSocket()
Destructor.
Int_t GetCompressionLevel() const
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
void SetInterrupt(Bool_t i=kTRUE)
void SetSID(kXR_char *sid)
Set our stream id, to match against that one in the server's response.
void SetSessionID(Int_t id)
Set session ID to 'id'. If id < 0, disable also the asynchronous handler.
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
TXSocket * GetLastReady()
Return last ready socket.
void RemoteTouch()
Remote touch functionality: contact the server to prove our vitality.
Int_t Post(TSocket *s)
Write a byte to the global pipe to signal the availability of new messages.
static Long64_t fgBuffMem
Int_t Clean(TSocket *s)
Read a byte from the global pipe to synchronize message pickup.
const char * Data() const
struct ClientRequestHdr header
UShort_t net2host(UShort_t x)
Int_t RecvRaw(void *buf, Int_t len, ESendRecvOptions opt=kDefault)
Receive a raw buffer of specified length bytes.
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString...
static void InitEnvs()
Init environment variables for XrdClient.
Vc_ALWAYS_INLINE void free(T *p)
Frees memory that was allocated with Vc::malloc.
const char *const kPROOF_WorkerIdleTO
static std::list< TXSockBuf * > fgSQue
Int_t Send(const TMessage &mess)
Send a TMessage object.
Int_t GetInterrupt(Bool_t &forward)
Get latest interrupt level and reset it; if the interrupt has to be propagated to lower stages forwar...
const char * GetLastErr()
virtual const char * Getenv(const char *env)
Get environment variable.
if(pyself &&pyself!=Py_None)
virtual Int_t GetClientIDSize() const
virtual Bool_t HandleInput(const void *in=0)
Int_t Atoi() const
Return integer value of string.
TXSockBuf * PopUpSpare(Int_t sz)
Pop-up a buffer of at least size bytes from the spare list. If none is found, either one is reallocated...
Bool_t IsServProofd()
Return kTRUE if the remote server is a 'proofd'.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
virtual TInetAddress GetHostByName(const char *server)
Get Internet Protocol (IP) address of host.
struct XPClientSendRcvRequest sendrcv
Bool_t Create(Bool_t attach=kFALSE)
This method sends a request for creation of (or attachment to) a remote server application.
Int_t Flush()
Flush the asynchronous queue.
TXSockBuf(Char_t *bp=0, Int_t sz=0, Bool_t own=1)
constructor
virtual void Setenv(const char *name, const char *value)
Set environment variable.
Double_t length(const TVector2 &v)
void SetAWait(Bool_t w=kTRUE)
R__EXTERN TSystem * gSystem
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
virtual TObject * Remove(TObject *obj)
Remove object from the list.
struct XPClientProofRequest proof
void PostMsg(Int_t type, const char *msg=0)
Post a message of type 'type' into the read messages queue.
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Bool_t Ping(const char *ord=0)
Ping functionality: contact the server to check its vitality.
char * Form(const char *fmt,...)
int clientMarshall(XPClientRequest *str)
This function applies the network byte order on those parts of the 16-bytes buffer, only if it is composed by some binary part. Return 0 if OK, -1 in case the ID is unknown.
std::list< TXSockBuf * > fAQue
virtual const char * GetName() const
Returns name of object.
Int_t Post()
If any threads are blocked in Wait(), wake one of them up and increment the value of the semaphore...
Int_t Flush(TSocket *s)
Remove any reference to socket 's' from the global pipe and ready-socket queue.
Int_t GetCompressionLevel() const
virtual ~TXSockPipe()
Destructor.
virtual Int_t Reconnect()
Try reconnection after failure.
void DisconnectSession(Int_t id, Option_t *opt="")
Disconnect a session.
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
TString & Remove(Ssiz_t pos)
virtual Int_t GetSize() const
bool IsValid() const
Test validity of this connection.
unsigned long long ULong64_t
static XrdSysLogger eLogger
virtual void Close(const char *opt="")
Close connection.
XrdClientPhyConnection * fPhyConn
#define R__LOCKGUARD(mutex)
TXSockPipe(const char *loc="")
Constructor.
Int_t Recv(TMessage *&mess)
Receive a TMessage object.
virtual void SetAsync(XrdClientAbsUnsolMsgHandler *uh, XrdProofConnSender_t=0, void *=0)
Set handler of unsolicited responses.
#define kXPD_startprocess
void ReConnect()
Perform a reconnection attempt when a connection is not valid any more.
void DumpReadySock()
Dump content of the ready socket list.
Mother of all ROOT objects.
static ULong64_t fgBytesSent
Bool_t IsDigit() const
Returns true if all characters in string are digits (0-9) or white spaces, i.e.
void PushBackSpare()
Release read buffer giving back to the spare list.
void SendProcessIDs(const TMessage &mess)
Check if TProcessIDs must be sent.
void ErrorHandler(int level, const char *location, const char *fmt, va_list va)
General error handler function. It calls the user set error handler.
virtual void Add(TObject *obj)
Int_t PickUpReady()
Wait and pick-up next buffer from the asynchronous queue.
TObjString * SendCoordinator(Int_t kind, const char *msg=0, Int_t int2=0, Long64_t l64=0, Int_t int3=0, const char *opt=0)
Send message to intermediate coordinator.
void SendUrgent(Int_t type, Int_t int1, Int_t int2)
Send urgent message to counterpart; 'type' specifies the type of the message (see TXSocket::EUrgentMs...
static ULong64_t fgBytesRecv
XReqErrorType LowWrite(XPClientRequest *, const void *, int)
Send request to server (NB: req is marshalled at this point, so we need also the plain reqDataLen) ...
virtual Bool_t ReadNotify()
Notify when something can be read from the descriptor associated with this handler.
static void ResetErrno()
Static function resetting system error number.
static void SetMemMax(Long64_t memmax)
Set the max allocated memory allowed.
Int_t SendInterrupt(Int_t type)
Send urgent message (interrupt) to remote server. Returns 0, or -1 in case of error.
void DoError(int level, const char *location, const char *fmt, va_list va) const
Interface to ErrorHandler (protected).
bool MatchStreamid(short sid)
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
Vc_ALWAYS_INLINE_L T *Vc_ALWAYS_INLINE_R malloc(size_t n)
Allocates memory on the Heap with alignment and padding suitable for vectorized access.
void Resize(Int_t sz)
resize socket buffer
void CtrlC()
Interrupt the remote protocol instance.
static Long64_t GetMemMax()
Return the max allocated memory allowed.
void SetLength() const
Set the message length at the beginning of the message buffer.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
static XrdSysError eDest(0,"Proofx")