38 #include "XrdClient/XrdClientConst.hh"
39 #include "XrdClient/XrdClientEnv.hh"
44 #include <sys/socket.h>
93 fSocket->Ping(
"ping handler");
103 TString TXSocket::fgLoc =
"undef";
109 Long64_t TXSockBuf::fgMemMax = 10485760;
129 :
TSocket(), fMode(m), fLogLevel(loglevel),
130 fBuffer(logbuf), fConn(0), fASem(0), fAsynProc(1),
131 fDontTimeout(
kFALSE), fRDInterrupt(
kFALSE), fXrdProofdVersion(-1)
135 eDest.logger(&eLogger);
137 XrdProofdTrace =
new XrdOucTrace(&
eDest);
168 Error(
"TXSocket",
"internal pipe is invalid");
190 Error(
"TXSocket",
"fatal error occurred while opening a connection"
205 Error(
"TXSocket",
"create or attach failed (%s)",
273 Info(
"DisconnectSession",
"not connected: nothing to do");
277 Bool_t shutdown = opt && (strchr(opt,
'S') || strchr(opt,
's'));
278 Bool_t all = opt && (strchr(opt,
'A') || strchr(opt,
'a'));
280 if (
id > -1 || all) {
283 memset(&Request, 0,
sizeof(Request) );
293 fConn->
SendReq(&Request, (
const void *)0, 0,
"DisconnectSession");
315 Warning(
"Close",
"could not hold semaphore for async messages after %d sec: closing anyhow (may give error messages)", to);
318 TXSocket::fgPipe.
Flush(
this);
323 Info(
"Close",
"no connection: nothing to do");
371 UnsolRespProcResult rc = kUNSOL_KEEP;
376 Error(
"ProcessUnsolicitedMsg",
"%p: async semaphore taken by Close()! Should not be here!",
this);
377 return kUNSOL_CONTINUE;
382 Info(
"ProcessUnsolicitedMsg",
"%p: got empty message: skipping",
this);
384 return kUNSOL_CONTINUE;
387 Info(
"ProcessUnsolicitedMsg",
"%p: got message with status: %d, len: %d bytes (ID: %d)",
395 Info(
"ProcessUnsolicitedMsg",
"%p: got error from underlying connection",
this);
399 Info(
"ProcessUnsolicitedMsg",
"%p: handler undefined or recovery failed",
this);
409 Info(
"ProcessUnsolicitedMsg",
"%p: underlying connection timed out",
this);
412 return kUNSOL_CONTINUE;
420 return kUNSOL_CONTINUE;
425 if ((len = m->
DataLen()) < (
int)
sizeof(kXR_int32)) {
426 Error(
"ProcessUnsolicitedMsg",
"empty or bad-formed message - disabling");
436 memcpy(&acod, m->
GetData(),
sizeof(kXR_int32));
438 Info(
"ProcessUnsolicitedMsg",
"%p: got acod %d (%x): message has status: %d, len: %d bytes (ID: %d)",
442 void *pdata = (
void *)((
char *)(m->
GetData()) +
sizeof(kXR_int32));
443 len -=
sizeof(kXR_int32);
445 Info(
"ProcessUnsolicitedMsg",
"%p: got action: %d (%d bytes) (ID: %d)",
464 lab = !lab ?
"kXPD_interrupt" : lab;
465 { std::lock_guard<std::recursive_mutex> lock(
fIMtx);
467 memcpy(&ilev, pdata,
sizeof(kXR_int32));
470 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
471 len -=
sizeof(kXR_int32);
476 memcpy(&ifw, pdata,
sizeof(kXR_int32));
479 Info(
"ProcessUnsolicitedMsg",
"%s: forwarding option: %d", lab, ifw);
492 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
503 memcpy(&opt, pdata,
sizeof(kXR_int32));
506 Info(
"ProcessUnsolicitedMsg",
"kXPD_timer: found opt: %d", opt);
508 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
509 len -=
sizeof(kXR_int32);
513 memcpy(&delay, pdata,
sizeof(kXR_int32));
516 Info(
"ProcessUnsolicitedMsg",
"kXPD_timer: found delay: %d", delay);
518 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
519 len -=
sizeof(kXR_int32);
528 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
535 kXR_int32 inflate = 1000;
537 memcpy(&inflate, pdata,
sizeof(kXR_int32));
540 Info(
"ProcessUnsolicitedMsg",
"kXPD_inflate: factor: %d", inflate);
542 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
543 len -=
sizeof(kXR_int32);
551 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
558 kXR_int32 priority = -1;
560 memcpy(&priority, pdata,
sizeof(kXR_int32));
563 Info(
"ProcessUnsolicitedMsg",
"kXPD_priority: priority: %d", priority);
565 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
566 len -=
sizeof(kXR_int32);
574 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
587 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
597 memcpy(&type, pdata,
sizeof(kXR_int32));
600 Info(
"ProcessUnsolicitedMsg",
"kXPD_urgent: found type: %d", type);
602 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
603 len -=
sizeof(kXR_int32);
608 memcpy(&int1, pdata,
sizeof(kXR_int32));
611 Info(
"ProcessUnsolicitedMsg",
"kXPD_urgent: found int1: %d", int1);
613 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
614 len -=
sizeof(kXR_int32);
619 memcpy(&int2, pdata,
sizeof(kXR_int32));
622 Info(
"ProcessUnsolicitedMsg",
"kXPD_urgent: found int2: %d", int2);
624 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
625 len -=
sizeof(kXR_int32);
634 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
640 { std::lock_guard<std::recursive_mutex> lock(
fAMtx);
645 Error(
"ProcessUnsolicitedMsg",
"could allocate spare buffer");
648 memcpy(b->
fBuf, pdata, len);
662 Info(
"ProcessUnsolicitedMsg",
"%p: %s: posting semaphore: %p (%d bytes)",
669 Info(
"ProcessUnsolicitedMsg",
670 "kXPD_feedback treatment not yet implemented");
678 memcpy(&opt, pdata,
sizeof(kXR_int32));
680 if (opt >= 0 && opt <= 4) {
682 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
683 len -=
sizeof(kXR_int32);
690 Printf(
"| %.*s", len, (
char *)pdata);
691 }
else if (opt == 2) {
693 Printf(
"%.*s", len, (
char *)pdata);
694 }
else if (opt == 3) {
696 fprintf(stderr,
"%.*s", len, (
char *)pdata);
697 }
else if (opt == 4) {
699 fprintf(stderr,
"%.*s\r", len, (
char *)pdata);
703 Printf(
"| Message from server:");
704 Printf(
"| %.*s", len, (
char *)pdata);
712 Printf(
"| Error condition occured: message from server:");
713 Printf(
"| %.*s", len, (
char *)pdata);
719 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
724 { std::lock_guard<std::recursive_mutex> lock(
fAMtx);
728 memcpy(&cid, pdata,
sizeof(kXR_int32));
732 Info(
"ProcessUnsolicitedMsg",
"found cid: %d", cid);
735 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
736 len -=
sizeof(kXR_int32);
741 Error(
"ProcessUnsolicitedMsg",
"could allocate spare buffer");
744 memcpy(b->
fBuf, pdata, len);
761 Info(
"ProcessUnsolicitedMsg",
"%p: cid: %d, posting semaphore: %p (%d bytes)",
762 this, cid, &
fASem, len);
781 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
800 kXR_int32 nsess = -1, nacti = -1, neffs = -1;
803 memcpy(&nsess, pdata,
sizeof(kXR_int32));
805 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
806 len -=
sizeof(kXR_int32);
808 memcpy(&nacti, pdata,
sizeof(kXR_int32));
810 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
811 len -=
sizeof(kXR_int32);
813 memcpy(&neffs, pdata,
sizeof(kXR_int32));
815 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
816 len -=
sizeof(kXR_int32);
819 Info(
"ProcessUnsolicitedMsg",
"kXPD_clusterinfo: # sessions: %d,"
820 " # active: %d, # effective: %f", nsess, nacti, neffs/1000.);
827 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
831 Error(
"ProcessUnsolicitedMsg",
"%p: unknown action code: %d received from '%s' - disabling",
854 if (msg && strlen(msg) > 0)
870 std::lock_guard<std::recursive_mutex> lock(
fAMtx);
875 Error(
"PostMsg",
"could allocate spare buffer");
880 memcpy(b->
fBuf, mbuf, mlen);
894 Info(
"PostMsg",
"%p: posting type %d to semaphore: %p (%d bytes)",
895 this, type, &
fASem, mlen);
907 std::lock_guard<std::recursive_mutex> lock(
fAMtx);
936 Info(
"GetInterrupt",
"%p: waiting to lock mutex",
this);
938 std::lock_guard<std::recursive_mutex> lock(
fIMtx);
946 Error(
"GetInterrupt",
"value is unset (%d) - protocol error",
fILev);
968 list<TXSockBuf *> splist;
969 list<TXSockBuf *>::iterator i;
971 { std::lock_guard<std::recursive_mutex> lock(
fAMtx);
974 if (
fAQue.size() > 0) {
981 splist.push_back(*i);
990 Printf(
"Warning in TXSocket::Flush: semaphore counter already 0 (sz: %d)", sz);
997 { std::lock_guard<std::mutex> lock(
fgSMtx);
998 if (splist.size() > 0) {
999 for (i = splist.begin(); i != splist.end();) {
1001 i = splist.erase(i);
1019 Info(
"Create",
"not connected: nothing to do");
1025 while (retriesleft--) {
1030 memset( &reqhdr, 0,
sizeof(reqhdr));
1034 if (
fMode ==
'A' || attach) {
1048 Info(
"Create",
"sending %d bytes to server", reqhdr.
header.dlen);
1052 Info(
"Create",
"creating session of server %s",
fUrl.
Data());
1057 &answData,
"TXSocket::Create", 0);
1058 struct ServerResponseBody_Protocol *srvresp = (
struct ServerResponseBody_Protocol *)answData;
1066 void *pdata = (
void *)(xrsp->
GetData());
1069 if (len >= (
Int_t)
sizeof(kXR_int32)) {
1072 memcpy(&psid, pdata,
sizeof(kXR_int32));
1074 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
1075 len -=
sizeof(kXR_int32);
1077 Error(
"Create",
"session ID is undefined!");
1079 if (srvresp)
free(srvresp);
1083 if (len >= (
Int_t)
sizeof(kXR_int16)) {
1086 memcpy(&dver, pdata,
sizeof(kXR_int16));
1088 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int16));
1089 len -=
sizeof(kXR_int16);
1091 Warning(
"Create",
"protocol version of the remote PROOF undefined!");
1096 len +=
sizeof(kXR_int16);
1098 memcpy(&dver, pdata,
sizeof(kXR_int32));
1100 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
1101 len -=
sizeof(kXR_int32);
1103 if (len >= (
Int_t)
sizeof(kXR_int16)) {
1106 memcpy(&dver, pdata,
sizeof(kXR_int16));
1108 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int16));
1109 len -=
sizeof(kXR_int16);
1111 Warning(
"Create",
"version of the remote XrdProofdProtocol undefined!");
1117 char *url =
new char[len+1];
1118 memcpy(url, pdata, len);
1126 if (srvresp)
free(srvresp);
1133 if (retriesleft <= 0 && fConn->GetLastErr()) {
1141 if (srvresp)
free(srvresp);
1147 if ((ilog = emsg.Index(
"|log:")) !=
kNPOS) emsg.
Remove(ilog);
1154 Info(
"Create",
"creation/attachment attempt failed: %d attempts left", retriesleft);
1155 if (retriesleft <= 0)
1156 Error(
"Create",
"%d creation/attachment attempts failed: no attempts left",
1159 if (srvresp)
free(srvresp);
1168 "problems creating or attaching to a remote server (%s)",
1189 memset( &Request, 0,
sizeof(Request) );
1226 Error(
"SendRaw",
"%s: problems sending %d bytes to server",
1241 Info(
"Ping",
"%p: %s: sid: %d",
this, ord ? ord :
"int",
fSessionID);
1245 Error(
"Ping",
"not connected: nothing to do");
1254 memset( &Request, 0,
sizeof(Request) );
1266 fConn->
SendReq(&Request, (
const void *)0, &pans,
"Ping");
1267 kXR_int32 *pres = (kXR_int32 *) pans;
1283 if (pans)
free(pans);
1290 Error(
"Ping",
"%p: int: problems marshalling request",
this);
1296 Error(
"Ping",
"%p: %s: problems sending ping to server",
this, ord ? ord :
"int");
1298 Info(
"Ping",
"%p: %s: sid: %d OK",
this, ord ? ord :
"int",
fSessionID);
1313 Info(
"RemoteTouch",
"%p: sending touch request to %s",
this,
GetName());
1317 Error(
"RemoteTouch",
"not connected: nothing to do");
1323 memset( &Request, 0,
sizeof(Request) );
1332 Error(
"Touch",
"%p: problems marshalling request ",
this);
1336 Error(
"Touch",
"%p: problems sending touch request to server",
this);
1351 Info(
"CtrlC",
"%p: sending ctrl-c request to %s",
this,
GetName());
1355 Error(
"CtrlC",
"not connected: nothing to do");
1361 memset( &Request, 0,
sizeof(Request) );
1369 Error(
"CtrlC",
"%p: problems marshalling request ",
this);
1373 Error(
"CtrlC",
"%p: problems sending ctrl-c request to server",
this);
1388 Info(
"PickUpReady",
"%p: %s: going to sleep",
this,
GetTitle());
1393 static Int_t dt = 2000;
1401 Error(
"PickUpReady",
"error waiting at semaphore");
1405 Info(
"PickUpReady",
"%p: %s: got timeout: retring (%d secs)",
1415 Info(
"PickUpReady",
"interrupted");
1424 Error(
"PickUpReady",
"error waiting at semaphore");
1431 Info(
"PickUpReady",
"%p: %s: waken up",
this,
GetTitle());
1433 std::lock_guard<std::recursive_mutex> lock(
fAMtx);
1436 if (
fAQue.size() <= 0) {
1437 Error(
"PickUpReady",
"queue is empty - protocol error ?");
1441 Error(
"PickUpReady",
"got invalid buffer - protocol error ?");
1451 Info(
"PickUpReady",
"%p: %s: got message (%d bytes)",
1476 static Int_t nBuf = 0;
1478 std::lock_guard<std::mutex> lock(
fgSMtx);
1482 list<TXSockBuf *>::iterator i;
1484 maxsz = ((*i)->fSiz > maxsz) ? (*i)->fSiz : maxsz;
1485 if ((*i) && (*i)->fSiz >= size) {
1488 Info(
"PopUpSpare",
"asked: %d, spare: %d/%d, REUSE buf %p, sz: %d",
1489 size, (
int)
fgSQue.size(), nBuf, buf, buf->
fSiz);
1499 Info(
"PopUpSpare",
"asked: %d, spare: %d/%d, maxsz: %d, RESIZE buf %p, sz: %d",
1500 size, (
int)
fgSQue.size(), nBuf, maxsz, buf, buf->
fSiz);
1511 Info(
"PopUpSpare",
"asked: %d, spare: %d/%d, maxsz: %d, NEW buf %p, sz: %d",
1512 size, (
int)
fgSQue.size(), nBuf, maxsz, buf, buf->
fSiz);
1523 std::lock_guard<std::mutex> lock(
fgSMtx);
1526 Info(
"PushBackSpare",
"release buf %p, sz: %d (BuffMem: %lld)",
1545 if (!buffer || (length <= 0))
1568 while (tobecopied > 0) {
1573 Int_t ncpy = (fByteLeft > tobecopied) ? tobecopied : fByteLeft;
1576 if ((fByteLeft -= ncpy) <= 0)
1605 memset(&Request, 0,
sizeof(Request) );
1617 fConn->
SendReq(&Request, (
const void *)0, 0,
"SendInterrupt");
1632 Error(
"SendInterrupt",
"problems sending interrupt to server");
1645 Error(
"Send",
"cannot send a message used for reading");
1658 const_cast<TMessage&>(mess).SetCompressionSettings(
fCompress);
1661 const_cast<TMessage&>(mess).Compress();
1663 char *mbuf = mess.
Buffer();
1671 kXR_int32 fSendOptDefault =
fSendOpt;
1672 switch (mess.
What()) {
1705 Info(
"Send",
"sending type %d (%d bytes) to '%s'", mess.
What(), mlen,
GetTitle());
1716 return nsent -
sizeof(
UInt_t);
1743 char *buf =
new char[len+
sizeof(
UInt_t)];
1765 mess->SetWhat(mess->What() & ~
kMESS_ACK);
1783 const void *buf = 0;
1786 memset(&reqhdr, 0,
sizeof(reqhdr));
1798 vout = (
char **)&bout;
1805 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
1806 buf = (msg) ? (
const void *)msg : buf;
1813 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
1814 buf = (msg) ? (
const void *)msg : buf;
1815 vout = (
char **)&bout;
1818 vout = (
char **)&bout;
1826 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
1827 buf = (msg) ? (
const void *)msg : buf;
1830 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
1831 buf = (msg) ? (
const void *)msg : buf;
1835 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
1837 buf = (
const void *)msg;
1838 vout = (
char **)&bout;
1845 Info(
"SendCoordinator",
"kReadBuffer: old server (ver %d < 1003):"
1850 if (!msg || strlen(msg) <= 0) {
1851 Info(
"SendCoordinator",
"kReadBuffer: file path undefined");
1854 reqhdr.
header.dlen = strlen(msg);
1855 buf = (
const void *)msg;
1856 vout = (
char **)&bout;
1859 Info(
"SendCoordinator",
"unknown message kind: %d", kind);
1866 fConn->
SendReq(&reqhdr, buf, vout,
"TXSocket::SendCoordinator", noterr);
1871 if (bout && (xrsp->
DataLen() > 0))
1899 memset(&Request, 0,
sizeof(Request) );
1910 fConn->
SendReq(&Request, (
const void *)0, 0,
"SendUrgent");
1933 EnvPutInt(NAME_DEBUG, deb);
1942 const char *cenv = 0;
1946 if (allowCO.
Length() > 0)
1947 EnvPutString(NAME_CONNECTDOMAINALLOW_RE, allowCO.
Data());
1952 EnvPutString(NAME_CONNECTDOMAINDENY_RE, denyCO.
Data());
1957 EnvPutInt(NAME_FIRSTCONNECTMAXCNT, maxRetries);
1959 EnvPutInt(NAME_CONNECTTIMEOUT, connTO);
1963 DFLT_RECONNECTWAIT);
1964 if (recoTO == DFLT_RECONNECTWAIT) {
1967 DFLT_RECONNECTWAIT);
1969 EnvPutInt(NAME_RECONNECTWAIT, recoTO);
1973 EnvPutInt(NAME_REQUESTTIMEOUT, requTO);
1976 EnvPutInt(NAME_KEEPSOCKOPENIFNOTXRD, 0);
1981 if (socks4Port > 0) {
1984 socks4Host =
"127.0.0.1";
1985 EnvPutString(NAME_SOCKS4HOST, socks4Host.
Data());
1986 EnvPutInt(NAME_SOCKS4PORT, socks4Port);
1991 if (autolog.
Length() > 0 &&
1992 (!(cenv =
gSystem->
Getenv(
"XrdSecPWDAUTOLOG")) || strlen(cenv) <= 0))
2001 if (alogfile.
Length() > 0)
2005 if (verisrv.
Length() > 0 &&
2006 (!(cenv =
gSystem->
Getenv(
"XrdSecPWDVERIFYSRV")) || strlen(cenv) <= 0))
2043 if (deplen.
Length() > 0 &&
2044 (!(cenv =
gSystem->
Getenv(
"XrdSecGSIPROXYDEPLEN")) || strlen(cenv) <= 0))
2048 if (pxybits.
Length() > 0)
2052 if (crlcheck.
Length() > 0 &&
2053 (!(cenv =
gSystem->
Getenv(
"XrdSecGSICRLCHECK")) || strlen(cenv) <= 0))
2057 if (delegpxy.
Length() > 0 &&
2058 (!(cenv =
gSystem->
Getenv(
"XrdSecGSIDELEGPROXY")) || strlen(cenv) <= 0))
2062 if (signpxy.
Length() > 0 &&
2063 (!(cenv =
gSystem->
Getenv(
"XrdSecGSISIGNPROXY")) || strlen(cenv) <= 0))
2068 ::
Info(
"TXSocket",
"(C) 2005 CERN TXSocket (XPROOF client) %s",
2069 gROOT->GetVersion());
2081 Info(
"Reconnect",
"%p (c:%p, v:%d): trying to reconnect to %s (logid: %d)",
2088 if (tryreconnect == 0)
2089 Info(
"Reconnect",
"%p: reconnection attempts explicitly disabled!",
this);
2091 Info(
"Reconnect",
"%p: server does not support reconnections (protocol: %d < 1005)",
2106 Error(
"TXSocket",
"create or attach failed (%s)",
2117 Info(
"Reconnect",
"%p (c:%p): attempt %s (logid: %d)",
this,
fConn,
2121 Info(
"Reconnect",
"%p (c:0x0): attempt failed",
this);
2207 if (pipe(
fPipe) != 0) {
2208 Printf(
"TXSockPipe: problem initializing pipe for socket inputs");
2231 if (!
IsValid() || !s)
return -1;
2235 { std::lock_guard<std::recursive_mutex> lock(
fMutex);
2249 Printf(
"TXSockPipe::Post: %s: %p: pipe posted (pending %d) (descriptor: %d)",
2261 if (!
IsValid() || !s)
return -1;
2266 { std::lock_guard<std::recursive_mutex> lock(
fMutex);
2268 Printf(
"TXSockPipe::Clean: %s: can't read from pipe",
fLoc.
Data());
2278 Printf(
"TXSockPipe::Clean: %s: %p: pipe cleaned (pending %d) (descriptor: %d)",
2292 if (!
IsValid() || !s)
return -1;
2296 { std::lock_guard<std::recursive_mutex> lock(
fMutex);
2306 Printf(
"TXSockPipe::Flush: %s: can't read from pipe",
fLoc.
Data());
2314 Printf(
"TXSockPipe::Flush: %s: %p: pipe flushed",
fLoc.
Data(), s);
2325 std::lock_guard<std::recursive_mutex> lock(
fMutex);
2331 buf +=
Form(
" %p",o);
2340 std::lock_guard<std::recursive_mutex> lock(
fMutex);
const char * GetHost() const
virtual const char * GetTitle() const
Returns title of object.
double read(const std::string &file_name)
reading
TXSocket(const char *url, Char_t mode= 'M', Int_t psid=-1, Char_t ver=-1, const char *logbuf=0, Int_t loglevel=-1, TXHandler *handler=0)
Constructor. Open the connection to a remote XrdProofd instance and start a PROOF session.
static void SetLocation(const char *loc="")
Set location string.
virtual int GetPid()
Get process id.
Bool_t RecvStreamerInfos(TMessage *mess)
Receive a message containing streamer infos.
virtual void SetClientID(Int_t)
virtual Bool_t Notify()
Notify when event occurred on descriptor associated with this handler.
Int_t SendRaw(const void *buf, Int_t len, ESendRecvOptions opt=kDontBlock)
Send a raw buffer of specified length.
void SetLoc(const char *loc="")
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
double write(int n, const std::string &file_name, const std::string &vector_type, int compress=0)
writing
Collectable string class.
virtual void Close(Option_t *opt="")
Close connection.
Bool_t RecvProcessIDs(TMessage *mess)
Receive a message containing process ids.
std::recursive_mutex fMutex
static Long64_t BuffMem()
Return the currently allocated memory.
virtual Int_t GetClientID() const
struct XPClientReadbufRequest readbuf
This class represents a WWW compatible URL.
Int_t TryWait()
If the semaphore value is > 0 then decrement it and return 0.
virtual UnsolRespProcResult ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *s, XrdClientMessage *msg)
We are here if an unsolicited response comes from a logical conn. The response comes in the form of an...
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
struct XPClientInterruptRequest interrupt
char * CompBuffer() const
static void SetRetryParam(int maxtry=5, int timewait=2)
Change values of the retry control parameters, number of retries and wait time between attempts (in se...
virtual const char * HomeDirectory(const char *userName=0)
Return the user's home directory.
std::recursive_mutex fIMtx
XrdClientMessage * SendReq(XPClientRequest *req, const void *reqData, char **answData, const char *CmdName, bool notifyerr=1)
SendReq tries to send a single command for a number of times.
XrdOucTrace * XrdProofdTrace
virtual Bool_t HandleError(const void *in=0)
Handler of asynchronous error events.
void SendStreamerInfos(const TMessage &mess)
Check if TStreamerInfo must be sent.
virtual ~TXSocket()
Destructor.
Int_t GetCompressionLevel() const
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
void SetInterrupt(Bool_t i=kTRUE)
void SetSID(kXR_char *sid)
Set our stream id, to match against that one in the server's response.
void SetSessionID(Int_t id)
Set session ID to 'id'. If id < 0, disable also the asynchronous handler.
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
TXSocket * GetLastReady()
Return last ready socket.
void RemoteTouch()
Remote touch functionality: contact the server to prove our vitality.
Int_t Post(TSocket *s)
Write a byte to the global pipe to signal the availability of new messages.
static Long64_t fgBuffMem
Int_t Clean(TSocket *s)
Read a byte from the global pipe to synchronize message pickup.
const char * Data() const
struct ClientRequestHdr header
UShort_t net2host(UShort_t x)
Int_t RecvRaw(void *buf, Int_t len, ESendRecvOptions opt=kDefault)
Receive a raw buffer of specified length bytes.
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString...
static void InitEnvs()
Init environment variables for XrdClient.
const char *const kPROOF_WorkerIdleTO
static std::list< TXSockBuf * > fgSQue
Int_t Send(const TMessage &mess)
Send a TMessage object.
Int_t GetInterrupt(Bool_t &forward)
Get latest interrupt level and reset it; if the interrupt has to be propagated to lower stages forwar...
const char * GetLastErr()
virtual const char * Getenv(const char *env)
Get environment variable.
if(pyself &&pyself!=Py_None)
virtual Int_t GetClientIDSize() const
virtual Bool_t HandleInput(const void *in=0)
Int_t Atoi() const
Return integer value of string.
TXSockBuf * PopUpSpare(Int_t sz)
Pop up a buffer of at least size bytes from the spare list. If none is found either one is reallocated...
Bool_t IsServProofd()
Return kTRUE if the remote server is a 'proofd'.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
virtual TInetAddress GetHostByName(const char *server)
Get Internet Protocol (IP) address of host.
struct XPClientSendRcvRequest sendrcv
Bool_t Create(Bool_t attach=kFALSE)
This method sends a request for creation of (or attachment to) a remote server application.
Int_t Wait()
If the semaphore value is > 0 then decrement it and carry on, else block, waiting on the condition un...
Int_t Flush()
Flush the asynchronous queue.
TXSockBuf(Char_t *bp=0, Int_t sz=0, Bool_t own=1)
constructor
virtual void Setenv(const char *name, const char *value)
Set environment variable.
Double_t length(const TVector2 &v)
void SetAWait(Bool_t w=kTRUE)
R__EXTERN TSystem * gSystem
Handler of asynchronous events for XProofD sockets.
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
virtual TObject * Remove(TObject *obj)
Remove object from the list.
struct XPClientProofRequest proof
void PostMsg(Int_t type, const char *msg=0)
Post a message of type 'type' into the read messages queue.
High level handler of connections to XProofD.
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Bool_t Ping(const char *ord=0)
Ping functionality: contact the server to check its vitality.
char * Form(const char *fmt,...)
int clientMarshall(XPClientRequest *str)
This function applies the network byte order on those parts of the 16-byte buffer, only if it is composed by some binary part. Returns 0 if OK, -1 in case the ID is unknown.
std::recursive_mutex fAMtx
std::list< TXSockBuf * > fAQue
virtual const char * GetName() const
Returns name of object.
Int_t Post()
Increment the value of the semaphore.
Int_t Flush(TSocket *s)
Remove any reference to socket 's' from the global pipe and ready-socket queue.
Int_t GetCompressionLevel() const
virtual ~TXSockPipe()
Destructor.
virtual Int_t Reconnect()
Try reconnection after failure.
void DisconnectSession(Int_t id, Option_t *opt="")
Disconnect a session.
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
TString & Remove(Ssiz_t pos)
virtual Int_t GetSize() const
bool IsValid() const
Test validity of this connection.
unsigned long long ULong64_t
static XrdSysLogger eLogger
virtual void Close(const char *opt="")
Close connection.
XrdClientPhyConnection * fPhyConn
TXSockPipe(const char *loc="")
Constructor.
Int_t Recv(TMessage *&mess)
Receive a TMessage object.
virtual void SetAsync(XrdClientAbsUnsolMsgHandler *uh, XrdProofConnSender_t=0, void *=0)
Set handler of unsolicited responses.
#define kXPD_startprocess
void ReConnect()
Perform a reconnection attempt when a connection is not valid any more.
void DumpReadySock()
Dump content of the ready socket list.
Mother of all ROOT objects.
static ULong64_t fgBytesSent
Bool_t IsDigit() const
Returns true if all characters in string are digits (0-9) or white spaces, i.e.
void PushBackSpare()
Release read buffer giving back to the spare list.
void SendProcessIDs(const TMessage &mess)
Check if TProcessIDs must be sent.
void ErrorHandler(int level, const char *location, const char *fmt, va_list va)
General error handler function. It calls the user set error handler.
virtual void Add(TObject *obj)
Int_t PickUpReady()
Wait and pick-up next buffer from the asynchronous queue.
TObjString * SendCoordinator(Int_t kind, const char *msg=0, Int_t int2=0, Long64_t l64=0, Int_t int3=0, const char *opt=0)
Send message to intermediate coordinator.
void SendUrgent(Int_t type, Int_t int1, Int_t int2)
Send urgent message to counterpart; 'type' specifies the type of the message (see TXSocket::EUrgentMs...
static ULong64_t fgBytesRecv
XReqErrorType LowWrite(XPClientRequest *, const void *, int)
Send request to server (NB: req is marshalled at this point, so we need also the plain reqDataLen) ...
virtual Bool_t ReadNotify()
Notify when something can be read from the descriptor associated with this handler.
static void ResetErrno()
Static function resetting system error number.
static void SetMemMax(Long64_t memmax)
Set the max allocated memory allowed.
Int_t SendInterrupt(Int_t type)
Send urgent message (interrupt) to remote server. Returns 0, or -1 in case of error.
void DoError(int level, const char *location, const char *fmt, va_list va) const
Interface to ErrorHandler (protected).
bool MatchStreamid(short sid)
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
void PostSemAll()
Wake up all threads waiting at the semaphore (used by TXSlave)
Vc_ALWAYS_INLINE_L T *Vc_ALWAYS_INLINE_R malloc(size_t n)
Allocates memory on the Heap with alignment and padding suitable for vectorized access.
void Resize(Int_t sz)
resize socket buffer
void CtrlC()
Interrupt the remote protocol instance.
static Long64_t GetMemMax()
Return the max allocated memory allowed.
void SetLength() const
Set the message length at the beginning of the message buffer.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
static XrdSysError eDest(0,"Proofx")