44 #include <sys/socket.h> 91 Bool_t TXSocketPingHandler::Notify()
168 Error(
"TXSocket",
"internal pipe is invalid");
190 Error(
"TXSocket",
"fatal error occurred while opening a connection" 205 Error(
"TXSocket",
"create or attach failed (%s)",
273 Info(
"DisconnectSession",
"not connected: nothing to do");
277 Bool_t shutdown = opt && (strchr(opt,
'S') || strchr(opt,
's'));
278 Bool_t all = opt && (strchr(opt,
'A') || strchr(opt,
'a'));
280 if (
id > -1 || all) {
283 memset(&Request, 0,
sizeof(Request) );
293 fConn->
SendReq(&Request, (
const void *)0, 0,
"DisconnectSession");
315 Warning(
"Close",
"could not hold semaphore for async messages after %d sec: closing anyhow (may give error messages)", to);
318 TXSocket::fgPipe.
Flush(
this);
323 Info(
"Close",
"no connection: nothing to do");
337 if (o.Index(
"#") !=
kNPOS) {
338 o.Remove(0,o.Index(
"#")+1);
339 if (o.Index(
"#") !=
kNPOS) {
340 o.Remove(o.Index(
"#"));
341 sessID = o.IsDigit() ? o.Atoi() : sessID;
376 Error(
"ProcessUnsolicitedMsg",
"%p: async semaphore taken by Close()! Should not be here!",
this);
382 Info(
"ProcessUnsolicitedMsg",
"%p: got empty message: skipping",
this);
387 Info(
"ProcessUnsolicitedMsg",
"%p: got message with status: %d, len: %d bytes (ID: %d)",
395 Info(
"ProcessUnsolicitedMsg",
"%p: got error from underlying connection",
this);
399 Info(
"ProcessUnsolicitedMsg",
"%p: handler undefined or recovery failed",
this);
409 Info(
"ProcessUnsolicitedMsg",
"%p: underlying connection timed out",
this);
425 if ((len = m->
DataLen()) < (
int)
sizeof(kXR_int32)) {
426 Error(
"ProcessUnsolicitedMsg",
"empty or bad-formed message - disabling");
436 memcpy(&acod, m->
GetData(),
sizeof(kXR_int32));
438 Info(
"ProcessUnsolicitedMsg",
"%p: got acod %d (%x): message has status: %d, len: %d bytes (ID: %d)",
442 void *pdata = (
void *)((
char *)(m->
GetData()) +
sizeof(kXR_int32));
443 len -=
sizeof(kXR_int32);
445 Info(
"ProcessUnsolicitedMsg",
"%p: got action: %d (%d bytes) (ID: %d)",
464 lab = !lab ?
"kXPD_interrupt" : lab;
465 { std::lock_guard<std::recursive_mutex> lock(
fIMtx);
467 memcpy(&ilev, pdata,
sizeof(kXR_int32));
470 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
471 len -=
sizeof(kXR_int32);
476 memcpy(&ifw, pdata,
sizeof(kXR_int32));
479 Info(
"ProcessUnsolicitedMsg",
"%s: forwarding option: %d", lab, ifw);
492 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
503 memcpy(&opt, pdata,
sizeof(kXR_int32));
506 Info(
"ProcessUnsolicitedMsg",
"kXPD_timer: found opt: %d", opt);
508 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
509 len -=
sizeof(kXR_int32);
513 memcpy(&delay, pdata,
sizeof(kXR_int32));
516 Info(
"ProcessUnsolicitedMsg",
"kXPD_timer: found delay: %d", delay);
518 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
519 len -=
sizeof(kXR_int32);
528 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
535 kXR_int32 inflate = 1000;
537 memcpy(&inflate, pdata,
sizeof(kXR_int32));
540 Info(
"ProcessUnsolicitedMsg",
"kXPD_inflate: factor: %d", inflate);
542 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
543 len -=
sizeof(kXR_int32);
551 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
558 kXR_int32 priority = -1;
560 memcpy(&priority, pdata,
sizeof(kXR_int32));
563 Info(
"ProcessUnsolicitedMsg",
"kXPD_priority: priority: %d", priority);
565 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
566 len -=
sizeof(kXR_int32);
574 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
587 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
597 memcpy(&type, pdata,
sizeof(kXR_int32));
600 Info(
"ProcessUnsolicitedMsg",
"kXPD_urgent: found type: %d", type);
602 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
603 len -=
sizeof(kXR_int32);
608 memcpy(&int1, pdata,
sizeof(kXR_int32));
611 Info(
"ProcessUnsolicitedMsg",
"kXPD_urgent: found int1: %d", int1);
613 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
614 len -=
sizeof(kXR_int32);
619 memcpy(&int2, pdata,
sizeof(kXR_int32));
622 Info(
"ProcessUnsolicitedMsg",
"kXPD_urgent: found int2: %d", int2);
624 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
625 len -=
sizeof(kXR_int32);
634 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
640 { std::lock_guard<std::recursive_mutex> lock(
fAMtx);
645 Error(
"ProcessUnsolicitedMsg",
"could allocate spare buffer");
648 memcpy(b->
fBuf, pdata, len);
662 Info(
"ProcessUnsolicitedMsg",
"%p: %s: posting semaphore: %p (%d bytes)",
669 Info(
"ProcessUnsolicitedMsg",
670 "kXPD_feedback treatment not yet implemented");
678 memcpy(&opt, pdata,
sizeof(kXR_int32));
680 if (opt >= 0 && opt <= 4) {
682 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
683 len -=
sizeof(kXR_int32);
690 Printf(
"| %.*s", len, (
char *)pdata);
691 }
else if (opt == 2) {
693 Printf(
"%.*s", len, (
char *)pdata);
694 }
else if (opt == 3) {
696 fprintf(stderr,
"%.*s", len, (
char *)pdata);
697 }
else if (opt == 4) {
699 fprintf(stderr,
"%.*s\r", len, (
char *)pdata);
703 Printf(
"| Message from server:");
704 Printf(
"| %.*s", len, (
char *)pdata);
712 Printf(
"| Error condition occured: message from server:");
713 Printf(
"| %.*s", len, (
char *)pdata);
719 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
724 { std::lock_guard<std::recursive_mutex> lock(
fAMtx);
728 memcpy(&cid, pdata,
sizeof(kXR_int32));
732 Info(
"ProcessUnsolicitedMsg",
"found cid: %d", cid);
735 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
736 len -=
sizeof(kXR_int32);
741 Error(
"ProcessUnsolicitedMsg",
"could allocate spare buffer");
744 memcpy(b->
fBuf, pdata, len);
761 Info(
"ProcessUnsolicitedMsg",
"%p: cid: %d, posting semaphore: %p (%d bytes)",
762 this, cid, &
fASem, len);
771 if (what.BeginsWith(
"idle-timeout")) {
776 Printf(
"| %s", what.Data());
781 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
800 kXR_int32 nsess = -1, nacti = -1, neffs = -1;
803 memcpy(&nsess, pdata,
sizeof(kXR_int32));
805 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
806 len -=
sizeof(kXR_int32);
808 memcpy(&nacti, pdata,
sizeof(kXR_int32));
810 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
811 len -=
sizeof(kXR_int32);
813 memcpy(&neffs, pdata,
sizeof(kXR_int32));
815 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
816 len -=
sizeof(kXR_int32);
819 Info(
"ProcessUnsolicitedMsg",
"kXPD_clusterinfo: # sessions: %d," 820 " # active: %d, # effective: %f", nsess, nacti, neffs/1000.);
827 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
831 Error(
"ProcessUnsolicitedMsg",
"%p: unknown action code: %d received from '%s' - disabling",
854 if (msg && strlen(msg) > 0)
870 std::lock_guard<std::recursive_mutex> lock(
fAMtx);
875 Error(
"PostMsg",
"could allocate spare buffer");
880 memcpy(b->
fBuf, mbuf, mlen);
894 Info(
"PostMsg",
"%p: posting type %d to semaphore: %p (%d bytes)",
895 this, type, &
fASem, mlen);
907 std::lock_guard<std::recursive_mutex> lock(
fAMtx);
975 Info(
"GetInterrupt",
"%p: waiting to lock mutex",
this);
977 std::lock_guard<std::recursive_mutex> lock(
fIMtx);
985 Error(
"GetInterrupt",
"value is unset (%d) - protocol error",
fILev);
1007 list<TXSockBuf *> splist;
1008 list<TXSockBuf *>::iterator i;
1010 { std::lock_guard<std::recursive_mutex> lock(
fAMtx);
1013 if (
fAQue.size() > 0) {
1020 splist.push_back(*i);
1029 Printf(
"Warning in TXSocket::Flush: semaphore counter already 0 (sz: %d)", sz);
1036 { std::lock_guard<std::mutex> lock(
fgSMtx);
1037 if (splist.size() > 0) {
1038 for (i = splist.begin(); i != splist.end();) {
1040 i = splist.erase(i);
1058 Info(
"Create",
"not connected: nothing to do");
1064 while (retriesleft--) {
1069 memset( &reqhdr, 0,
sizeof(reqhdr));
1073 if (
fMode ==
'A' || attach) {
1084 const void *buf = (
const void *)(
fBuffer.Data());
1087 Info(
"Create",
"sending %d bytes to server", reqhdr.
header.dlen);
1091 Info(
"Create",
"creating session of server %s",
fUrl.Data());
1096 &answData,
"TXSocket::Create", 0);
1097 struct ServerResponseBody_Protocol *srvresp = (
struct ServerResponseBody_Protocol *)answData;
1105 void *pdata = (
void *)(xrsp->
GetData());
1108 if (len >= (
Int_t)
sizeof(kXR_int32)) {
1111 memcpy(&psid, pdata,
sizeof(kXR_int32));
1113 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
1114 len -=
sizeof(kXR_int32);
1116 Error(
"Create",
"session ID is undefined!");
1118 if (srvresp)
free(srvresp);
1122 if (len >= (
Int_t)
sizeof(kXR_int16)) {
1125 memcpy(&dver, pdata,
sizeof(kXR_int16));
1127 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int16));
1128 len -=
sizeof(kXR_int16);
1130 Warning(
"Create",
"protocol version of the remote PROOF undefined!");
1135 len +=
sizeof(kXR_int16);
1137 memcpy(&dver, pdata,
sizeof(kXR_int32));
1139 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
1140 len -=
sizeof(kXR_int32);
1142 if (len >= (
Int_t)
sizeof(kXR_int16)) {
1145 memcpy(&dver, pdata,
sizeof(kXR_int16));
1147 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int16));
1148 len -=
sizeof(kXR_int16);
1150 Warning(
"Create",
"version of the remote XrdProofdProtocol undefined!");
1156 char *url =
new char[len+1];
1157 memcpy(url, pdata, len);
1165 if (srvresp)
free(srvresp);
1172 if (retriesleft <= 0 && fConn->GetLastErr()) {
1180 if (srvresp)
free(srvresp);
1186 if ((ilog = emsg.Index(
"|log:")) !=
kNPOS) emsg.Remove(ilog);
1193 Info(
"Create",
"creation/attachment attempt failed: %d attempts left", retriesleft);
1194 if (retriesleft <= 0)
1195 Error(
"Create",
"%d creation/attachment attempts failed: no attempts left",
1198 if (srvresp)
free(srvresp);
1207 "problems creating or attaching to a remote server (%s)",
1228 memset( &Request, 0,
sizeof(Request) );
1243 Int_t nsent = length;
1261 Printf(
"%s: error occured but no message from server",
fHost.Data());
1265 Error(
"SendRaw",
"%s: problems sending %d bytes to server",
1266 fHost.Data(), length);
1280 Info(
"Ping",
"%p: %s: sid: %d",
this, ord ? ord :
"int",
fSessionID);
1284 Error(
"Ping",
"not connected: nothing to do");
1293 memset( &Request, 0,
sizeof(Request) );
1305 fConn->
SendReq(&Request, (
const void *)0, &pans,
"Ping");
1306 kXR_int32 *pres = (kXR_int32 *) pans;
1322 if (pans)
free(pans);
1329 Error(
"Ping",
"%p: int: problems marshalling request",
this);
1335 Error(
"Ping",
"%p: %s: problems sending ping to server",
this, ord ? ord :
"int");
1337 Info(
"Ping",
"%p: %s: sid: %d OK",
this, ord ? ord :
"int",
fSessionID);
1352 Info(
"RemoteTouch",
"%p: sending touch request to %s",
this,
GetName());
1356 Error(
"RemoteTouch",
"not connected: nothing to do");
1362 memset( &Request, 0,
sizeof(Request) );
1371 Error(
"Touch",
"%p: problems marshalling request ",
this);
1375 Error(
"Touch",
"%p: problems sending touch request to server",
this);
1390 Info(
"CtrlC",
"%p: sending ctrl-c request to %s",
this,
GetName());
1394 Error(
"CtrlC",
"not connected: nothing to do");
1400 memset( &Request, 0,
sizeof(Request) );
1408 Error(
"CtrlC",
"%p: problems marshalling request ",
this);
1412 Error(
"CtrlC",
"%p: problems sending ctrl-c request to server",
this);
1427 Info(
"PickUpReady",
"%p: %s: going to sleep",
this,
GetTitle());
1432 static Int_t dt = 2000;
1440 Error(
"PickUpReady",
"error waiting at semaphore");
1444 Info(
"PickUpReady",
"%p: %s: got timeout: retring (%d secs)",
1454 Info(
"PickUpReady",
"interrupted");
1463 Error(
"PickUpReady",
"error waiting at semaphore");
1470 Info(
"PickUpReady",
"%p: %s: waken up",
this,
GetTitle());
1472 std::lock_guard<std::recursive_mutex> lock(
fAMtx);
1475 if (
fAQue.size() <= 0) {
1476 Error(
"PickUpReady",
"queue is empty - protocol error ?");
1480 Error(
"PickUpReady",
"got invalid buffer - protocol error ?");
1490 Info(
"PickUpReady",
"%p: %s: got message (%d bytes)",
1515 static Int_t nBuf = 0;
1517 std::lock_guard<std::mutex> lock(
fgSMtx);
1521 list<TXSockBuf *>::iterator i;
1523 maxsz = ((*i)->fSiz > maxsz) ? (*i)->fSiz : maxsz;
1524 if ((*i) && (*i)->fSiz >= size) {
1527 Info(
"PopUpSpare",
"asked: %d, spare: %d/%d, REUSE buf %p, sz: %d",
1528 size, (
int)
fgSQue.size(), nBuf, buf, buf->
fSiz);
1538 Info(
"PopUpSpare",
"asked: %d, spare: %d/%d, maxsz: %d, RESIZE buf %p, sz: %d",
1539 size, (
int)
fgSQue.size(), nBuf, maxsz, buf, buf->
fSiz);
1550 Info(
"PopUpSpare",
"asked: %d, spare: %d/%d, maxsz: %d, NEW buf %p, sz: %d",
1551 size, (
int)
fgSQue.size(), nBuf, maxsz, buf, buf->
fSiz);
1562 std::lock_guard<std::mutex> lock(
fgSMtx);
1565 Info(
"PushBackSpare",
"release buf %p, sz: %d (BuffMem: %lld)",
1584 if (!buffer || (length <= 0))
1607 while (tobecopied > 0) {
1612 Int_t ncpy = (fByteLeft > tobecopied) ? tobecopied : fByteLeft;
1615 if ((fByteLeft -= ncpy) <= 0)
1644 memset(&Request, 0,
sizeof(Request) );
1656 fConn->
SendReq(&Request, (
const void *)0, 0,
"SendInterrupt");
1671 Error(
"SendInterrupt",
"problems sending interrupt to server");
1679 std::lock_guard<std::recursive_mutex> lock(
fAMtx);
1694 Error(
"Send",
"cannot send a message used for reading");
1707 const_cast<TMessage&>(mess).SetCompressionSettings(
fCompress);
1710 const_cast<TMessage&>(mess).Compress();
1712 char *mbuf = mess.
Buffer();
1720 kXR_int32 fSendOptDefault =
fSendOpt;
1721 switch (mess.
What()) {
1754 Info(
"Send",
"sending type %d (%d bytes) to '%s'", mess.
What(), mlen,
GetTitle());
1765 return nsent -
sizeof(
UInt_t);
1792 char *buf =
new char[len+
sizeof(
UInt_t)];
1814 mess->SetWhat(mess->What() & ~
kMESS_ACK);
1832 const void *buf = 0;
1835 memset(&reqhdr, 0,
sizeof(reqhdr));
1847 vout = (
char **)&bout;
1854 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
1855 buf = (msg) ? (
const void *)msg : buf;
1862 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
1863 buf = (msg) ? (
const void *)msg : buf;
1864 vout = (
char **)&bout;
1867 vout = (
char **)&bout;
1875 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
1876 buf = (msg) ? (
const void *)msg : buf;
1879 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
1880 buf = (msg) ? (
const void *)msg : buf;
1884 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
1886 buf = (
const void *)msg;
1887 vout = (
char **)&bout;
1894 Info(
"SendCoordinator",
"kReadBuffer: old server (ver %d < 1003):" 1899 if (!msg || strlen(msg) <= 0) {
1900 Info(
"SendCoordinator",
"kReadBuffer: file path undefined");
1903 reqhdr.
header.dlen = strlen(msg);
1904 buf = (
const void *)msg;
1905 vout = (
char **)&bout;
1908 Info(
"SendCoordinator",
"unknown message kind: %d", kind);
1915 fConn->
SendReq(&reqhdr, buf, vout,
"TXSocket::SendCoordinator", noterr);
1920 if (bout && (xrsp->
DataLen() > 0))
1948 memset(&Request, 0,
sizeof(Request) );
1959 fConn->
SendReq(&Request, (
const void *)0, 0,
"SendUrgent");
1997 const char *cenv = 0;
2000 TString allowCO =
gEnv->
GetValue(
"XProof.ConnectDomainAllowRE",
"");
2001 if (allowCO.Length() > 0)
2005 TString denyCO =
gEnv->
GetValue(
"XProof.ConnectDomainDenyRE",
"");
2006 if (denyCO.Length() > 0)
2034 TString socks4Host =
gEnv->
GetValue(
"XNet.SOCKS4Host",
"");
2036 if (socks4Port > 0) {
2037 if (socks4Host.IsNull())
2039 socks4Host =
"127.0.0.1";
2045 TString autolog =
gEnv->
GetValue(
"XSec.Pwd.AutoLogin",
"1");
2046 if (autolog.Length() > 0 &&
2047 (!(cenv =
gSystem->
Getenv(
"XrdSecPWDAUTOLOG")) || strlen(cenv) <= 0))
2055 TString alogfile =
gEnv->
GetValue(
"XSec.Pwd.ALogFile",
"");
2056 if (alogfile.Length() > 0)
2059 TString verisrv =
gEnv->
GetValue(
"XSec.Pwd.VerifySrv",
"1");
2060 if (verisrv.Length() > 0 &&
2061 (!(cenv =
gSystem->
Getenv(
"XrdSecPWDVERIFYSRV")) || strlen(cenv) <= 0))
2064 TString srvpuk =
gEnv->
GetValue(
"XSec.Pwd.ServerPuk",
"");
2065 if (srvpuk.Length() > 0)
2070 if (cadir.Length() > 0)
2074 if (crldir.Length() > 0)
2077 TString crlext =
gEnv->
GetValue(
"XSec.GSI.CRLextension",
"");
2078 if (crlext.Length() > 0)
2081 TString ucert =
gEnv->
GetValue(
"XSec.GSI.UserCert",
"");
2082 if (ucert.Length() > 0)
2086 if (ukey.Length() > 0)
2089 TString upxy =
gEnv->
GetValue(
"XSec.GSI.UserProxy",
"");
2090 if (upxy.Length() > 0)
2093 TString valid =
gEnv->
GetValue(
"XSec.GSI.ProxyValid",
"");
2094 if (valid.Length() > 0)
2097 TString deplen =
gEnv->
GetValue(
"XSec.GSI.ProxyForward",
"0");
2098 if (deplen.Length() > 0 &&
2099 (!(cenv =
gSystem->
Getenv(
"XrdSecGSIPROXYDEPLEN")) || strlen(cenv) <= 0))
2102 TString pxybits =
gEnv->
GetValue(
"XSec.GSI.ProxyKeyBits",
"");
2103 if (pxybits.Length() > 0)
2106 TString crlcheck =
gEnv->
GetValue(
"XSec.GSI.CheckCRL",
"1");
2107 if (crlcheck.Length() > 0 &&
2108 (!(cenv =
gSystem->
Getenv(
"XrdSecGSICRLCHECK")) || strlen(cenv) <= 0))
2111 TString delegpxy =
gEnv->
GetValue(
"XSec.GSI.DelegProxy",
"0");
2112 if (delegpxy.Length() > 0 &&
2113 (!(cenv =
gSystem->
Getenv(
"XrdSecGSIDELEGPROXY")) || strlen(cenv) <= 0))
2116 TString signpxy =
gEnv->
GetValue(
"XSec.GSI.SignProxy",
"1");
2117 if (signpxy.Length() > 0 &&
2118 (!(cenv =
gSystem->
Getenv(
"XrdSecGSISIGNPROXY")) || strlen(cenv) <= 0))
2123 ::
Info(
"TXSocket",
"(C) 2005 CERN TXSocket (XPROOF client) %s",
2124 gROOT->GetVersion());
2136 Info(
"Reconnect",
"%p (c:%p, v:%d): trying to reconnect to %s (logid: %d)",
2143 if (tryreconnect == 0)
2144 Info(
"Reconnect",
"%p: reconnection attempts explicitly disabled!",
this);
2146 Info(
"Reconnect",
"%p: server does not support reconnections (protocol: %d < 1005)",
2161 Error(
"TXSocket",
"create or attach failed (%s)",
2172 Info(
"Reconnect",
"%p (c:%p): attempt %s (logid: %d)",
this,
fConn,
2176 Info(
"Reconnect",
"%p (c:0x0): attempt failed",
this);
2214 fgBuffMem += (sz - fSiz);
2248 fgMemMax = memmax > 0 ? memmax : fgMemMax;
2262 if (pipe(
fPipe) != 0) {
2263 Printf(
"TXSockPipe: problem initializing pipe for socket inputs");
2286 if (!
IsValid() || !s)
return -1;
2290 { std::lock_guard<std::recursive_mutex> lock(
fMutex);
2296 if (write(
fPipe[1],(
const void *)&c,
sizeof(
Char_t)) < 1) {
2297 Printf(
"TXSockPipe::Post: %s: can't notify pipe",
fLoc.Data());
2304 Printf(
"TXSockPipe::Post: %s: %p: pipe posted (pending %d) (descriptor: %d)",
2316 if (!
IsValid() || !s)
return -1;
2321 { std::lock_guard<std::recursive_mutex> lock(
fMutex);
2322 if (read(
fPipe[0],(
void *)&c,
sizeof(
Char_t)) < 1) {
2323 Printf(
"TXSockPipe::Clean: %s: can't read from pipe",
fLoc.Data());
2333 Printf(
"TXSockPipe::Clean: %s: %p: pipe cleaned (pending %d) (descriptor: %d)",
2347 if (!
IsValid() || !s)
return -1;
2351 { std::lock_guard<std::recursive_mutex> lock(
fMutex);
2360 if (read(
fPipe[0],(
void *)&c,
sizeof(
Char_t)) < 1)
2361 Printf(
"TXSockPipe::Flush: %s: can't read from pipe",
fLoc.Data());
2369 Printf(
"TXSockPipe::Flush: %s: %p: pipe flushed",
fLoc.Data(),
s);
2380 std::lock_guard<std::recursive_mutex> lock(
fMutex);
2386 buf +=
Form(
" %p",o);
2387 Printf(
"TXSockPipe::DumpReadySock: %s: list content: %s",
fLoc.Data(), buf.Data());
2395 std::lock_guard<std::recursive_mutex> lock(
fMutex);
virtual const char * GetName() const
Returns name of object.
Int_t GetOpenError() const
Getter for last error.
static void SetLocation(const char *loc="")
Set location string.
virtual int GetPid()
Get process id.
Bool_t RecvStreamerInfos(TMessage *mess)
Receive a message containing streamer infos.
virtual void SetClientID(Int_t)
Int_t GetCompressionLevel() const
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
char * CompBuffer() const
Int_t SendRaw(const void *buf, Int_t len, ESendRecvOptions opt=kDontBlock)
Send a raw buffer of specified length.
TXSocket(const char *url, Char_t mode='M', Int_t psid=-1, Char_t ver=-1, const char *logbuf=0, Int_t loglevel=-1, TXHandler *handler=0)
Constructor Open the connection to a remote XrdProofd instance and start a PROOF session.
void SetLoc(const char *loc="")
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
Collectable string class.
virtual void Close(Option_t *opt="")
Close connection.
Bool_t RecvProcessIDs(TMessage *mess)
Receive a message containing process ids.
std::recursive_mutex fMutex
static Long64_t BuffMem()
Return the currently allocated memory.
struct XPClientReadbufRequest readbuf
This class represents a WWW compatible URL.
Int_t TryWait()
If the semaphore value is > 0 then decrement it and return 0.
virtual UnsolRespProcResult ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *s, XrdClientMessage *msg)
We are here if an unsolicited response comes from a logical conn The response comes in the form of an...
struct XPClientInterruptRequest interrupt
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
static void SetRetryParam(int maxtry=5, int timewait=2)
Change values of the retry control parameters, numer of retries and wait time between attempts (in se...
virtual const char * HomeDirectory(const char *userName=0)
Return the user's home directory.
std::recursive_mutex fIMtx
XrdClientMessage * SendReq(XPClientRequest *req, const void *reqData, char **answData, const char *CmdName, bool notifyerr=1)
SendReq tries to send a single command for a number of times.
#define NAME_KEEPSOCKOPENIFNOTXRD
XrdOucTrace * XrdProofdTrace
virtual Bool_t HandleError(const void *in=0)
Handler of asynchronous error events.
void SendStreamerInfos(const TMessage &mess)
Check if TStreamerInfo must be sent.
virtual ~TXSocket()
Destructor.
void SetInterrupt(Bool_t i=kTRUE)
void SetSID(kXR_char *sid)
Set our stream id, to match against that one in the server's response.
#define NAME_FIRSTCONNECTMAXCNT
void SetSessionID(Int_t id)
Set session ID to 'id'. If id < 0, disable also the asynchronous handler.
TXSocket * GetLastReady()
Return last ready socket.
void RemoteTouch()
Remote touch functionality: contact the server to proof our vitality.
Int_t GetServType() const
Getter for server type.
Int_t Post(TSocket *s)
Write a byte to the global pipe to signal new availibility of new messages.
virtual TObject * FindObject(const char *name) const
Delete a TObjLink object.
static Long64_t fgBuffMem
void SetInterrupt()
Interrupt the underlying socket.
Int_t Clean(TSocket *s)
Read a byte to the global pipe to synchronize message pickup.
struct ClientRequestHdr header
const char * GetHost() const
UShort_t net2host(UShort_t x)
Int_t RecvRaw(void *buf, Int_t len, ESendRecvOptions opt=kDefault)
Receive a raw buffer of specified length bytes.
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString...
static void InitEnvs()
Init environment variables for XrdClient.
virtual Bool_t Notify()
This method must be overridden to handle object notification.
const char *const kPROOF_WorkerIdleTO
static std::list< TXSockBuf * > fgSQue
Int_t Send(const TMessage &mess)
Send a TMessage object.
Int_t GetInterrupt(Bool_t &forward)
Get latest interrupt level and reset it; if the interrupt has to be propagated to lower stages forwar...
const char * GetLastErr()
virtual const char * Getenv(const char *env)
Get environment variable.
#define EnvPutInt(name, val)
#define DFLT_RECONNECTWAIT
TXSockBuf * PopUpSpare(Int_t sz)
Pop-up a buffer of at least size bytes from the spare list If none is found either one is reallocated...
#define NAME_RECONNECTWAIT
void DoError(int level, const char *location, const char *fmt, va_list va) const
Interface to ErrorHandler (protected).
Bool_t IsServProofd()
Return kTRUE if the remote server is a 'proofd'.
virtual TInetAddress GetHostByName(const char *server)
Get Internet Protocol (IP) address of host.
struct XPClientSendRcvRequest sendrcv
Bool_t Create(Bool_t attach=kFALSE)
This method sends a request for creation of (or attachment to) a remote server application.
Int_t Wait()
If the semaphore value is > 0 then decrement it and carry on, else block, waiting on the condition un...
Int_t Flush()
Flush the asynchronous queue.
virtual Int_t GetClientID() const
#define NAME_CONNECTDOMAINALLOW_RE
TXSockBuf(Char_t *bp=0, Int_t sz=0, Bool_t own=1)
constructor
virtual void Setenv(const char *name, const char *value)
Set environment variable.
virtual Bool_t HandleInput(const void *in=0)
Handler of asynchronous input events.
void SetAWait(Bool_t w=kTRUE)
R__EXTERN TSystem * gSystem
Handler of asynchronous events for XProofD sockets.
virtual TObject * Remove(TObject *obj)
Remove object from the list.
struct XPClientProofRequest proof
void PostMsg(Int_t type, const char *msg=0)
Post a message of type 'type' into the read messages queue.
High level handler of connections to XProofD.
Int_t GetLowSocket() const
#define EnvPutString(name, val)
Bool_t Ping(const char *ord=0)
Ping functionality: contact the server to check its vitality.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
char * Form(const char *fmt,...)
int clientMarshall(XPClientRequest *str)
This function applies the network byte order on those parts of the 16-bytes buffer, only if it is composed by some binary part Return 0 if OK, -1 in case the ID is unknown.
std::recursive_mutex fAMtx
std::list< TXSockBuf * > fAQue
Int_t Post()
Increment the value of the semaphore.
Int_t Flush(TSocket *s)
Remove any reference to socket 's' from the global pipe and ready-socket queue.
virtual ~TXSockPipe()
Destructor.
virtual Int_t Reconnect()
Try reconnection after failure.
void DisconnectSession(Int_t id, Option_t *opt="")
Disconnect a session.
static XrdSysError eDest(0, "Proofx")
#define NAME_CONNECTDOMAINDENY_RE
int GetLowSocket()
Return the socket descriptor of the underlying connection.
#define NAME_CONNECTTIMEOUT
unsigned long long ULong64_t
static XrdSysLogger eLogger
virtual void Close(const char *opt="")
Close connection.
XrdClientPhyConnection * fPhyConn
static constexpr double s
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
void forward(const LAYERDATA &prevLayerData, LAYERDATA &currLayerData)
apply the weights (and functions) in forward direction of the DNN
TXSockPipe(const char *loc="")
Constructor.
Int_t Recv(TMessage *&mess)
Receive a TMessage object.
virtual void SetAsync(XrdClientAbsUnsolMsgHandler *uh, XrdProofConnSender_t=0, void *=0)
Set handler of unsolicited responses.
Int_t GetCompressionLevel() const
#define kXPD_startprocess
void ReConnect()
Perform a reconnection attempt when a connection is not valid any more.
void DumpReadySock()
Dump content of the ready socket list.
Mother of all ROOT objects.
static ULong64_t fgBytesSent
#define NAME_REQUESTTIMEOUT
void SetLength() const
Set the message length at the beginning of the message buffer.
void PushBackSpare()
Release read buffer giving back to the spare list.
Bool_t IsValid() const
Getter for validity status.
void SendProcessIDs(const TMessage &mess)
Check if TProcessIDs must be sent.
void ErrorHandler(int level, const char *location, const char *fmt, va_list va)
General error handler function. It calls the user set error handler.
virtual void Add(TObject *obj)
Int_t PickUpReady()
Wait and pick-up next buffer from the asynchronous queue.
TObjString * SendCoordinator(Int_t kind, const char *msg=0, Int_t int2=0, Long64_t l64=0, Int_t int3=0, const char *opt=0)
Send message to intermediate coordinator.
void SendUrgent(Int_t type, Int_t int1, Int_t int2)
Send urgent message to counterpart; 'type' specifies the type of the message (see TXSocket::EUrgentMs...
you should not use this method at all Int_t Int_t Double_t Double_t Double_t Int_t Double_t Double_t Double_t Double_t b
static ULong64_t fgBytesRecv
XReqErrorType LowWrite(XPClientRequest *, const void *, int)
Send request to server (NB: req is marshalled at this point, so we need also the plain reqDataLen) ...
short GetSessionID() const
bool IsValid() const
Test validity of this connection.
static void ResetErrno()
Static function resetting system error number.
virtual Int_t GetClientIDSize() const
static void SetMemMax(Long64_t memmax)
Return the max allocated memory allowed.
Int_t SendInterrupt(Int_t type)
Send urgent message (interrupt) to remote server Returns 0 or -1 in case of error.
bool MatchStreamid(short sid)
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
void PostSemAll()
Wake up all threads waiting for at the semaphore (used by TXSlave)
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
void Resize(Int_t sz)
resize socket buffer
void CtrlC()
Interrupt the remote protocol instance.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Int_t GetSessionID() const
Getter for session ID.
virtual const char * GetTitle() const
Returns title of object.
Int_t GetLogConnID() const
Getter for logical connection ID.
static Long64_t GetMemMax()
Return the max allocated memory allowed.