28#include "XrdOuc/XrdOucStream.hh" 
   30#include "XrdVersion.hh" 
   31#include "Xrd/XrdBuffer.hh" 
   32#include "Xrd/XrdScheduler.hh" 
   59                                                     "xproofd protocol anchor");
 
   /// Evaluates to true when both `n` and `ns` are -1, or when `n` is positive
   /// and not smaller than `ns`.
   /// Every argument is parenthesized so that expression arguments (e.g.
   /// `a & 3`) are not regrouped by operator precedence inside the comparisons.
   /// NOTE: `n` and `ns` are each evaluated more than once; do not pass
   /// expressions with side effects.
   #define XPDCOND(n,ns) (((n) == -1 && (ns) == -1) || ((n) > 0 && (n) >= (ns)))
   /// When XPDCOND(n, ns) holds: release `c` through SafeFree, replace it with a
   /// strdup'd copy of `s.c_str()`, and store `n` into `ns`. Otherwise nothing
   /// is modified.
   /// Arguments are parenthesized where they are used in expressions, and are
   /// forwarded parenthesized to XPDCOND, so that expression arguments keep
   /// their intended grouping.
   /// NOTE: `n` and `ns` are evaluated more than once.
   #define XPDSETSTRING(n,ns,c,s) \
    { if (XPDCOND((n),(ns))) { \
        SafeFree(c); (c) = strdup((s).c_str()); (ns) = (n); }}
   90#define XPDADOPTSTRING(n,ns,c,s) \ 
   92    XPDSETSTRING(n, ns, t, s); \ 
   93    if (t && strlen(t)) { \ 
  /// When XPDCOND(n, ns) holds: parse `s.c_str()` as a base-10 integer with
  /// strtol, assign the result to `i`, and store `n` into `ns`. Otherwise
  /// nothing is modified.
  /// Arguments are parenthesized where they are used in expressions, and are
  /// forwarded parenthesized to XPDCOND, so that expression arguments keep
  /// their intended grouping.
  /// NOTE: `n` and `ns` are evaluated more than once; strtol's end pointer is
  /// not inspected, so a non-numeric string yields 0.
  #define XPDSETINT(n,ns,i,s) \
   { if (XPDCOND((n),(ns))) { \
       (i) = strtol((s).c_str(),0,10); (ns) = (n); }}
  112typedef struct ResetCtrlcGuard {
 
  116   ~ResetCtrlcGuard() { 
if (xpd && 
type != 
kXP_ctrlc) xpd->ResetCtrlC(); }
 
  124   XrdProofdProtCfg(
const char *cfg, 
XrdSysError *edest = 0);
 
  132XrdProofdProtCfg::XrdProofdProtCfg(
const char *cfg, 
XrdSysError *edest)
 
  136   RegisterDirectives();
 
  142void XrdProofdProtCfg::RegisterDirectives()
 
  152                                  char *val, XrdOucStream *cfg, 
bool)
 
  156   XrdOucString port(val);
 
  157   if (
d->fName == 
"xrd.protocol") {
 
  158      port = cfg->GetWord();
 
  159      port.replace(
"xproofd:", 
"");
 
  160   } 
else if (
d->fName != 
"port") {
 
  163   if (port.length() > 0) {
 
  164      fPort = strtol(port.c_str(), 0, 10);
 
  170#if (ROOTXRDVERS >= 300030000) 
  188   return (XrdProtocol *)0;
 
  201         XrdProofdProtCfg pcfg(
pi->ConfigFN, 
pi->eDest);
 
  206         if (pcfg.fPort > 0) {
 
  219   : XrdProtocol(
"xproofd protocol handler"), fProtLink(this)
 
  239   XPDLOC(ALL, 
"Protocol::Response")
 
  255   XPDLOC(ALL, 
"Protocol::GetNewResponse")
 
  265               msg += 
" new capacity: ";
 
  273            msg += 
"; new size: ";
 
  278      TRACE(XERR,
"wrong sid: "<<sid);
 
  293   XPDLOC(ALL, 
"Protocol::Match")
 
  295   struct ClientInitHandShake hsdata;
 
  296   char  *hsbuff = (
char *)&hsdata;
 
  298   static hs_response_t hsresp = {0, 0, kXR_int32(htonl(
XPROOFD_VERSBIN)), 0};
 
  300   XrdProtocol *xp = 
nullptr;
 
  302   TRACE(HDBG, 
"enter");
 
  306   if ((dlen = lp->Peek(hsbuff,
sizeof(hsdata),
fgReadWait)) != 
sizeof(hsdata)) {
 
  307      if (dlen <= 0) lp->setEtext(
"Match: handshake not received");
 
  310         hsdata.first = ntohl(hsdata.first);
 
  311         if (hsdata.first == 8) {
 
  312            emsg = 
"rootd-file serving not supported any-longer";
 
  314         if (emsg.length() > 0) {
 
  315            lp->setEtext(emsg.c_str());
 
  317            lp->setEtext(
"link transfered");
 
  321      TRACE(XERR, 
"peeked incomplete or empty information! (dlen: "<<dlen<<
" bytes)");
 
  326   hsdata.third  = ntohl(hsdata.third);
 
  327   if (dlen != 
sizeof(hsdata) ||  hsdata.first || hsdata.second
 
  328       || !(hsdata.third == 1) || hsdata.fourth || hsdata.fifth) {
 
  332         TRACE(ALL, 
"matched xrootd protocol on link: serving a file");
 
  334         TRACE(XERR, 
"failed to match any known or enabled protocol");
 
  340   if (!lp->Send((
char *)&hsresp, 
sizeof(hsresp))) {
 
  341      lp->setEtext(
"Match: handshake failed");
 
  342      TRACE(XERR, 
"handshake failed");
 
  347   int len = 
sizeof(hsdata);
 
  348   if (lp->Recv(hsbuff, len) != len) {
 
  349      lp->setEtext(
"Match: reread failed");
 
  350      TRACE(XERR, 
"reread failed");
 
  362   xpp->
fSecEntity.host = strdup((
char *)lp->Host());
 
  366   if (xpp->
GetData(
"dummy",(
char *)&dum[0],
sizeof(dum)) != 0) {
 
  370   xp = (XrdProtocol *) xpp;
 
  382   static char statfmt[] = 
"<stats id=\"xproofd\"><num>%ld</num></stats>";
 
  386      return sizeof(statfmt)+16;
 
  417   std::vector<XrdProofdResponse *>::iterator ii = 
fResponses.begin(); 
 
  431   XPDLOC(ALL, 
"Protocol::Configure")
 
  481   mp = 
"global manager created";
 
  486              " build "<<XrdVERSION<<
" successfully loaded");
 
  498   XPDLOC(ALL, 
"Protocol::Process")
 
  514   memcpy((
void *)&sid, (
const void *)&(
fRequest.
header.streamid[0]), 2);
 
  518         TRACET(
TraceID(), XERR, 
"could not get Response instance for rid: "<< sid);
 
  533      response->
Send(kXR_ArgInvalid, 
"Process: Invalid request data length");
 
  534      return fLink->setEtext(
"Process: protocol data length error");
 
  542         response->
Send(kXR_ArgTooLong, 
"fRequest.argument is too long");
 
  560   XPDLOC(ALL, 
"Protocol::Process2")
 
  577         response->Send(kXR_InvalidRequest,
"client undefined!!! ");
 
  607            TRACE(XERR, 
"link is undefined! ");
 
  618      TRACE(XERR, 
"link is undefined! ");
 
  629   XPDLOC(ALL, 
"Protocol::Recycle")
 
  631   const char *srvtype[6] = {
"ANY", 
"MasterWorker", 
"MasterMaster",
 
  632                             "ClientMaster", 
"Internal", 
"Admin"};
 
  655         TRACE(REQ,
"External disconnection of protocol associated with pid "<<
fPid);
 
  659         discpath.replace(
"/cid", 
"/disconnected");
 
  660         FILE *fd = fopen(discpath.c_str(), 
"w");
 
  661         if (!fd && errno != ENOENT) {
 
  662            TRACE(XERR, 
"unable to create path: " <<discpath<<
" (errno: "<<errno<<
")");
 
  675               TRACE(REQ, 
"Non-destroyed proofserv processes attached to this protocol ("<<
this<<
 
  676                          "), setting reconnect time");
 
  681            TRACE(XERR, 
"No XrdProofdMgr ("<<
fgMgr<<
") or SessionMgr (" 
  723   XPDLOC(ALL, 
"Protocol::GetBuff")
 
  725   TRACE(HDBG, 
"len: "<<quantum);
 
  730      if (quantum >= argp->bsize / 2 && quantum <= argp->bsize)
 
  740   if ((argp = 
fgBPool->Obtain(quantum)) == 0) {
 
  741      TRACE(XERR, 
"could not get requested buffer (size: "<<quantum<<
 
  742                  ") = insufficient memory");
 
  744      TRACE(HDBG, 
"quantum: "<<quantum<<
 
  745                  ", buff: "<<(
void *)(argp->buff)<<
", bsize:"<<argp->bsize);
 
  766   XPDLOC(ALL, 
"Protocol::GetData")
 
  772   TRACET(
TraceID(), HDBG, 
"dtype: "<<(dtype ? dtype : 
" - ")<<
", blen: "<<blen);
 
  777      if (rlen != -ENOMSG && rlen != -ECONNRESET) {
 
  778         XrdOucString emsg = 
"link read error: errno: ";
 
  781         return (
fLink ? 
fLink->setEtext(emsg.c_str()) : -1);
 
  783         TRACET(
TraceID(), HDBG, 
"connection closed by peer (errno: "<<-rlen<<
")");
 
  788      TRACET(
TraceID(), DBG, dtype << 
" timeout; read " <<rlen <<
" of " <<blen <<
" bytes - rescheduling");
 
  802   XPDLOC(ALL, 
"Protocol::SendData")
 
  816   if (!argp) 
return -1;
 
  824      if ((rc = 
GetData(
"data", argp->buff, quantum))) {
 
  828      if (buf && !(*buf) && savebuf)
 
  833            XPDFORM(msg, 
"EXT: server ID: %d, sending: %d bytes", sid, quantum);
 
  835                                         argp->buff, quantum) != 0) {
 
  837            XPDFORM(msg, 
"EXT: server ID: %d, problems sending: %d bytes to server",
 
  847            XPDFORM(msg, 
"INT: client ID: %d, sending: %d bytes", cid, quantum);
 
  848         if (xps->
SendData(cid, argp->buff, quantum) != 0) {
 
  850            XPDFORM(msg, 
"INT: client ID: %d, problems sending: %d bytes to client",
 
  878   XPDLOC(ALL, 
"Protocol::SendDataN")
 
  892   if (!argp) 
return -1;
 
  896      if ((rc = 
GetData(
"data", argp->buff, quantum))) {
 
  900      if (buf && !(*buf) && savebuf)
 
  904      if (xps->
SendDataN(argp->buff, quantum) != 0) {
 
  927   XPDLOC(ALL, 
"Protocol::SendMsg")
 
  929   static const char *crecv[5] = {
"master proofserv", 
"top master",
 
  930                                  "client", 
"undefined", 
"any"};
 
  943      XPDFORM(msg, 
"%s: session ID not found: %d", (
Internal() ? 
"INT" : 
"EXT"), psid);
 
  945      response->Send(kXR_InvalidRequest, msg.c_str());
 
  956         XPDFORM(msg, 
"EXT: sending %d bytes to proofserv (psid: %d, xps: %p, status: %d," 
  957                     " cid: %d)", len, psid, xps, xps->
Status(), 
fCID);
 
  968         TRACET(
TraceID(), REQ, 
"EXT: error sending message to proofserv");
 
  980          XPDFORM(msg, 
"INT: sending %d bytes to client/master (psid: %d, xps: %p, status: %d)",
 
  981                       len, psid, xps, xps->
Status());
 
  984      bool saveStartMsg = 0;
 
  988         TRACET(
TraceID(), DBG, 
"INT: setting proofserv in 'idle' state");
 
  993         TRACET(
TraceID(), DBG, 
"INT: got message with query number");
 
  995         TRACET(
TraceID(), DBG, 
"INT: setting proofserv in 'running' state");
 
 1015         if (
SendData(xps, -1, &savedBuf, saveStartMsg) != 0) {
 
 1017                           "SendMsg: INT: session is reconnecting: retry later");
 
 1022         if (
SendDataN(xps, &savedBuf, saveStartMsg) != 0) {
 
 1024                           "SendMsg: INT: session is reconnecting: retry later");
 
 1036         XPDFORM(msg, 
"INT: message sent to %s (%d bytes)", crecv[ii], len);
 
 1052   XPDLOC(ALL, 
"Protocol::Urgent")
 
 1054   unsigned int rc = 0;
 
 1070      response->Send(kXR_InvalidRequest,
"Urgent: session ID not found");
 
 1077   if (!xps->
Match(psid)) {
 
 1084      response->Send(
kXP_InvalidRequest,
"Urgent: session response object undefined - do nothing");
 
 1089   int len = 3 *
sizeof(kXR_int32);
 
 1090   char *buf = 
new char[len];
 
 1092   kXR_int32 itmp = 
static_cast<kXR_int32
>(htonl(
type));
 
 1093   memcpy(buf, &itmp, 
sizeof(kXR_int32));
 
 1095   itmp = 
static_cast<kXR_int32
>(htonl(int1));
 
 1096   memcpy(buf + 
sizeof(kXR_int32), &itmp, 
sizeof(kXR_int32));
 
 1098   itmp = 
static_cast<kXR_int32
>(htonl(int2));
 
 1099   memcpy(buf + 2 * 
sizeof(kXR_int32), &itmp, 
sizeof(kXR_int32));
 
 1103                     "Urgent: could not propagate request to proofsrv");
 
 1120   XPDLOC(ALL, 
"Protocol::Interrupt")
 
 1135      response->Send(kXR_InvalidRequest,
"Interrupt: session ID not found");
 
 1142      if (!xps->
Match(psid)) {
 
 1148      XPDFORM(msg, 
"xps: %p, link ID: %s, proofsrv PID: %d",
 
 1155                        "Interrupt: could not propagate interrupt code to proofsrv");
 
 1176   XPDLOC(ALL, 
"Protocol::Ping")
 
 1192   TRACET(
TraceID(), REQ, 
"psid: "<<psid<<
", async: "<<asyncopt);
 
 1199      response->Send(kXR_InvalidRequest,
"session ID not found");
 
 1204   kXR_int32 pingres = (psid > -1) ? 0 : 1;
 
 1205   if (psid > -1 && xps && xps->
IsValid()) {
 
 1213      if (asyncopt == 1) {
 
 1214         TRACET(
TraceID(), DBG, 
"EXT: async: notifying timeout to client: "<<checkfq<<
" secs");
 
 1215         response->Send(kXR_ok, checkfq);
 
 1220      if (path.length() <= 0) {
 
 1221         TRACET(
TraceID(),  XERR, 
"EXT: admin path is empty! - protocol error");
 
 1223            response->Send(
kXP_ServerError, 
"EXT: admin path is empty! - protocol error");
 
 1233      if (stat(path.c_str(), &st0) != 0) {
 
 1234         TRACET(
TraceID(),  XERR, 
"EXT: cannot stat admin path: "<<path);
 
 1245         if ((now - st0.st_mtime) > checkfq - 5) {
 
 1248               TRACET(
TraceID(),  XERR, 
"EXT: could not send verify request to proofsrv");
 
 1250                  response->Send(
kXP_ServerError, 
"EXT: could not verify reuqest to proofsrv");
 
 1257               if (stat(path.c_str(), &st1) == 0) {
 
 1258                  if (st1.st_mtime > st0.st_mtime) {
 
 1264               TRACET(
TraceID(), DBG, 
"EXT: waiting "<<
ns<<
" secs for session "<<pid<<
 
 1265                                 " to touch the admin path");
 
 1279      TRACET(
TraceID(), DBG, 
"EXT: notified the result to client: "<<pingres);
 
 1280      if (asyncopt == 0) {
 
 1281         response->Send(kXR_ok, pingres);
 
 1284         int len = 
sizeof(kXR_int32);
 
 1285         char *buf = 
new char[len];
 
 1287         kXR_int32 ifw = (kXR_int32)0;
 
 1288         ifw = 
static_cast<kXR_int32
>(htonl(ifw));
 
 1289         memcpy(buf, &ifw, 
sizeof(kXR_int32));
 
 1290         response->Send(kXR_attn, 
kXPD_ping, buf, len);
 
 1293   } 
else if (psid > -1)  {
 
 1299   response->Send(kXR_ok, pingres);
 
 1311   XPDLOC(ALL, 
"Protocol::PostSession")
 
 1315      int pid = (xps) ? xps->
SrvPID() : -1;
 
 1317         TRACE(XERR, 
"undefined session or process id");
 
 1321      XPDFORM(buf, 
"%d %s %s %d", on, u, 
g, pid);
 
 1324                                             buf.c_str()) != 0) {
 
 1325         TRACE(XERR, 
"problem posting the prority manager pipe");
 
 1331         TRACE(DBG, 
"posting the scheduler pipe");
 
 1333            TRACE(XERR, 
"problem posting the scheduler pipe");
 
 1340         TRACE(XERR, 
"problem posting the session manager pipe");
 
 1352   XPDLOC(ALL, 
"Protocol::TouchAdminPath")
 
 1365            apath.replace(
"/activesessions/", 
"/terminatedsessions/");
 
 1366            apath.replace(
".status", 
"");
 
 1369         if (rc != 0 && rc != -ENOENT) {
 
 1370            const char *
type = 
Internal() ? 
"internal" : 
"external";
 
 1384   XPDLOC(ALL, 
"Protocol::CtrlC")
 
#define TRACE(Flag, Args)
R__EXTERN C unsigned int sleep(unsigned int seconds)
#define kXPD_ClientMaster
#define kXPD_startprocess
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
static XrdSysLogger gMainLogger
XrdProtocol * XrdgetProtocol(const char *, char *parms, XrdProtocol_Config *pi)
This protocol is meant to live in a shared library.
int XrdgetProtocolPort(const char *, char *, XrdProtocol_Config *pi)
This function is called early on to determine the port we need to use.
struct ResetCtrlcGuard ResetCtrlcGuard_t
XrdOucTrace * XrdProofdTrace
#define XPD_SETRESPV(p, x)
#define XPD_SETRESP(p, x)
#define TRACESET(act, on)
#define TRACET(tid, act, x)
#define XrdSysMutexHelper
void Set(int inQMax, time_t agemax=1800)
Lock the data area and set the values.
void Push(XpdObject *Node)
Push back a protocol.
XrdProofdProtocol * Pop()
Pop up a protocol object.
XrdProofdProtocol * objectItem()
void setItem(XrdProofdProtocol *ival)
static int Touch(const char *path, int opt=0)
Set access (opt == 1), modify (opt == 2) or access&modify (opt == 0, default) times of path to current ...
static int VerifyProcessByID(int pid, const char *pname="proofserv")
Check if a process named 'pname' and process 'pid' is still in the process table.
static const char * ProofRequestTypes(int type)
Translates the proof request type in a human readable string.
const char * User() const
int ResetClientSlot(int ic)
Reset slot at 'ic'.
XrdProofdProofServ * GetServer(int psid)
Get from the vector server instance with ID psid.
int Touch(bool reset=0)
Send a touch to the connected clients: this will remotely touch the associated TSocket instance and sche...
virtual int DoDirective(XrdProofdDirective *, char *, XrdOucStream *, bool)
virtual void RegisterDirectives()
XrdProofdPriorityMgr * PriorityMgr() const
XrdProofSched * ProofSched() const
XrdProofdNetMgr * NetMgr() const
int Process(XrdProofdProtocol *p)
Process manager request.
XrdProtocol * Xrootd() const
XrdProofdProofServMgr * SessionMgr() const
int Config(bool rcf=0)
Run configuration and parse the entered config directives.
int BroadcastCtrlC(const char *usr)
Broadcast a ctrlc interrupt. Return 0 on success, -1 on error.
int Post(int type, const char *msg)
Post message on the pipe.
int MvSession(const char *fpid)
Move session file from the active to the terminated areas.
void DisconnectFromProofServ(int pid)
Change reconnecting status.
int CheckActiveSessions(bool verify=1)
Go through the active sessions admin path and make sure sessions are alive.
bool Alive(XrdProofdProtocol *p)
Check destroyed status.
int DeleteFromSessions(const char *pid)
Delete from the hash list the session with ID pid.
void SetReconnectTime(bool on=1)
Change reconnecting status.
int CheckFrequency() const
XrdProofdResponse * Response() const
const char * AdminPath() const
int SendData(int cid, void *buff, int len)
Send data to client cid.
int VerifyProofServ(bool fw)
Check if the associated proofserv process is alive.
int SendDataN(void *buff, int len)
Send data over the open client links of this session.
bool Match(short int id) const
void SetStartMsg(XrdSrvBuffer *sm)
static int Configure(char *parms, XrdProtocol_Config *pi)
Protocol configuration tool. Function: establish configuration at load time.
void Recycle(XrdLink *lp, int x, const char *y)
Recycle call. Release the instance and give it back to the stack.
int SendMsg()
Handle a request to forward a message to another process.
int SendData(XrdProofdProofServ *xps, kXR_int32 sid=-1, XrdSrvBuffer **buf=0, bool sb=0)
Send data over the open link. Segmentation is done here, if required.
XrdProofdResponse * Response(kXR_unt16 rid)
Get response instance corresponding to stream ID 'sid'.
XrdProofdResponse * GetNewResponse(kXR_unt16 rid)
Create new response instance for stream ID 'sid'.
static XrdSysRecMutex fgBMutex
static void PostSession(int on, const char *u, const char *g, XrdProofdProofServ *xps)
Post change of session status.
static XrdSysError fgEDest
XrdProofdClient * fPClient
int SendDataN(XrdProofdProofServ *xps, XrdSrvBuffer **buf=0, bool sb=0)
Send data over the open client links of session 'xps'.
static XpdObjectQ fgProtStack
XrdProtocol * Match(XrdLink *lp)
Check whether the request matches this protocol.
XrdProofdProtocol(XrdProtocol_Config *pi=0)
Protocol constructor.
static XrdProofdManager * fgMgr
XrdProofdClient * Client() const
int Process2()
Local processing method: here the request is dispatched to the appropriate method.
int CtrlC()
Set and propagate a Ctrl-C request.
int Stats(char *buff, int blen, int do_sync)
Return statistics info about the protocol.
static XrdBuffManager * fgBPool
std::vector< XrdProofdResponse * > fResponses
XrdSysRecMutex fCtrlcMutex
XrdSecProtocol * fAuthProt
void Reset()
Reset static and local vars.
int Process(XrdLink *lp)
Process the information received on the active link.
static XrdSysLogger * fgLogger
int Interrupt()
Handle an interrupt request.
static XrdBuffer * GetBuff(int quantum, XrdBuffer *argp=0)
Allocate a buffer to handle quantum bytes; if argp points to an existing buffer, its size is checked ...
void TouchAdminPath()
Recording time of the last request on this instance.
int Urgent()
Handle generic request of an urgent message to be forwarded to the server.
static int fgEUidAtStartup
const char * TraceID() const
XrdSecEntity * fSecClient
unsigned char fClntCapVer
int GetData(const char *dtype, char *buff, int blen)
Get data from the open link.
static void ReleaseBuff(XrdBuffer *argp)
Release a buffer previously allocated via GetBuff.
int Ping()
Handle a ping request.
void Set(XrdLink *l)
Set the link to be used by this response.
int Send(void)
Auxiliary Send method.
const char * TraceID() const
static int ChangePerm(uid_t uid, gid_t gid)
static constexpr double pi
static constexpr double ns
struct ClientRequestHdr header
struct XPClientProofRequest proof
struct XPClientInterruptRequest interrupt
struct XPClientSendRcvRequest sendrcv