Logo ROOT   6.08/07
Reference Guide
XrdProofdProtocol.cxx
Go to the documentation of this file.
1 // @(#)root/proofd:$Id$
2 // Author: Gerardo Ganis 12/12/2005
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // XrdProofdProtocol //
15 // //
16 // Authors: G. Ganis, CERN, 2005 //
17 // //
18 // XrdProtocol implementation to coordinate 'proofserv' applications. //
19 // //
20 //////////////////////////////////////////////////////////////////////////
21 
22 #include "XrdProofdPlatform.h"
23 
24 #include "XpdSysError.h"
25 #include "XpdSysLogger.h"
26 
27 #include "XrdSys/XrdSysPriv.hh"
28 #include "XrdOuc/XrdOucStream.hh"
29 
30 #include "XrdVersion.hh"
31 #include "Xrd/XrdBuffer.hh"
32 #include "Xrd/XrdScheduler.hh"
33 
34 #include "XrdProofdClient.h"
35 #include "XrdProofdClientMgr.h"
36 #include "XrdProofdConfig.h"
37 #include "XrdProofdManager.h"
38 #include "XrdProofdNetMgr.h"
39 #include "XrdProofdPriorityMgr.h"
40 #include "XrdProofdProofServMgr.h"
41 #include "XrdProofdProtocol.h"
42 #include "XrdProofdResponse.h"
43 #include "XrdProofdProofServ.h"
44 #include "XrdProofSched.h"
45 #include "XrdROOT.h"
46 #include "rpdconn.h"
47 
48 // Tracing utils
49 #include "XrdProofdTrace.h"
50 XrdOucTrace *XrdProofdTrace = 0;
51 
52 // Loggers: we need two to avoid deadlocks
54 
55 //
56 // Static area: general protocol managing section
59  "xproofd protocol anchor");
60 XrdSysRecMutex XrdProofdProtocol::fgBMutex; // Buffer management mutex
61 XrdBuffManager *XrdProofdProtocol::fgBPool = 0;
65 //
66 // Static area: protocol configuration section
68 //
70 // Cluster manager
72 
73 // Effective uid
75 
76 // Local definitions
77 #define MAX_ARGS 128
78 
79 // Macros used to set conditional options
#ifndef XPDCOND
// XPDCOND(n,ns) is true when both values are -1, or when 'n' is positive and
// not smaller than 'ns'. Every argument is parenthesized so that compound
// expressions (e.g. 'a & b', 'c ? x : y') expand with the intended precedence.
#define XPDCOND(n,ns) (((n) == -1 && (ns) == -1) || ((n) > 0 && (n) >= (ns)))
#endif
#ifndef XPDSETSTRING
// If XPDCOND(n,ns) holds: release 'c' via SafeFree, set it to a strdup'ed copy
// of 's' (anything providing c_str()) and store 'n' into 'ns'.
#define XPDSETSTRING(n,ns,c,s) \
 { if (XPDCOND(n,ns)) { \
     SafeFree(c); c = strdup((s).c_str()); ns = (n); }}
#endif

#ifndef XPDADOPTSTRING
// Same as XPDSETSTRING, but 'c' is replaced only when the new value is a
// non-empty string; otherwise the temporary copy is released and 'c' is left
// untouched. The scratch pointer has a deliberately unusual name so that it
// cannot shadow a variable appearing in the caller's arguments.
#define XPDADOPTSTRING(n,ns,c,s) \
 { char *xpdAdoptTmp_ = 0; \
   XPDSETSTRING(n, ns, xpdAdoptTmp_, s); \
   if (xpdAdoptTmp_ && strlen(xpdAdoptTmp_)) { \
      SafeFree(c); c = xpdAdoptTmp_; \
   } else \
      SafeFree(xpdAdoptTmp_); }
#endif

#ifndef XPDSETINT
// If XPDCOND(n,ns) holds: parse 's' as a base-10 integer into 'i' and store
// 'n' into 'ns'.
#define XPDSETINT(n,ns,i,s) \
 { if (XPDCOND(n,ns)) { \
     i = strtol((s).c_str(),0,10); ns = (n); }}
#endif
104 
105 typedef struct {
106  kXR_int32 ptyp; // must be always 0 !
107  kXR_int32 rlen;
108  kXR_int32 pval;
109  kXR_int32 styp;
110 } hs_response_t;
111 
112 typedef struct ResetCtrlcGuard {
113  XrdProofdProtocol *xpd;
114  int type;
115  ResetCtrlcGuard(XrdProofdProtocol *p, int t) : xpd(p), type(t) { }
116  ~ResetCtrlcGuard() { if (xpd && type != kXP_ctrlc) xpd->ResetCtrlC(); }
118 
119 //
120 // Derivation of XrdProofdConfig to read the port from the config file
121 class XrdProofdProtCfg : public XrdProofdConfig {
122 public:
123  int fPort; // The port on which we listen
124  XrdProofdProtCfg(const char *cfg, XrdSysError *edest = 0);
125  int DoDirective(XrdProofdDirective *, char *, XrdOucStream *, bool);
126  void RegisterDirectives();
127 };
128 
129 ////////////////////////////////////////////////////////////////////////////////
130 /// Constructor
131 
132 XrdProofdProtCfg::XrdProofdProtCfg(const char *cfg, XrdSysError *edest)
133  : XrdProofdConfig(cfg, edest)
134 {
135  fPort = -1;
136  RegisterDirectives();
137 }
138 
139 ////////////////////////////////////////////////////////////////////////////////
140 /// Register directives for configuration
141 
142 void XrdProofdProtCfg::RegisterDirectives()
143 {
144  Register("port", new XrdProofdDirective("port", this, &DoDirectiveClass));
145  Register("xrd.protocol", new XrdProofdDirective("xrd.protocol", this, &DoDirectiveClass));
146 }
147 
148 ////////////////////////////////////////////////////////////////////////////////
149 /// Parse directives
150 
151 int XrdProofdProtCfg::DoDirective(XrdProofdDirective *d,
152  char *val, XrdOucStream *cfg, bool)
153 {
154  if (!d) return -1;
155 
156  XrdOucString port(val);
157  if (d->fName == "xrd.protocol") {
158  port = cfg->GetWord();
159  port.replace("xproofd:", "");
160  } else if (d->fName != "port") {
161  return -1;
162  }
163  if (port.length() > 0) {
164  fPort = strtol(port.c_str(), 0, 10);
165  }
166  fPort = (fPort < 0) ? XPD_DEF_PORT : fPort;
167  return 0;
168 }
169 
170 #if (ROOTXRDVERS >= 300030000)
171 XrdVERSIONINFO(XrdgetProtocol,xproofd);
172 XrdVERSIONINFO(XrdgetProtocolPort,xproofd);
173 #endif
174 
175 extern "C" {
176 ////////////////////////////////////////////////////////////////////////////////
177 /// This protocol is meant to live in a shared library. The interface below is
178 /// used by the server to obtain a copy of the protocol object that can be used
179 /// to decide whether or not a link is talking a particular protocol.
180 
181 XrdProtocol *XrdgetProtocol(const char *, char *parms, XrdProtocol_Config *pi)
182 {
183  // Return the protocol object to be used if static init succeeds
184  if (XrdProofdProtocol::Configure(parms, pi)) {
185 
186  return (XrdProtocol *) new XrdProofdProtocol(pi);
187  }
188  return (XrdProtocol *)0;
189 }
190 
191 ////////////////////////////////////////////////////////////////////////////////
192 /// This function is called early on to determine the port we need to use. The
193 /// The default is ostensibly 1093 but can be overidden; which we allow.
194 
195 int XrdgetProtocolPort(const char * /*pname*/, char * /*parms*/, XrdProtocol_Config *pi)
196 {
197  // Default XPD_DEF_PORT (1093)
198  int port = XPD_DEF_PORT;
199 
200  if (pi) {
201  XrdProofdProtCfg pcfg(pi->ConfigFN, pi->eDest);
202  // Init some relevant quantities for tracing
203  XrdProofdTrace = new XrdOucTrace(pi->eDest);
204  pcfg.Config(0);
205 
206  if (pcfg.fPort > 0) {
207  port = pcfg.fPort;
208  } else {
209  port = (pi && pi->Port > 0) ? pi->Port : XPD_DEF_PORT;
210  }
211  }
212  return port;
213 }}
214 
215 ////////////////////////////////////////////////////////////////////////////////
216 /// Protocol constructor
217 
219  : XrdProtocol("xproofd protocol handler"), fProtLink(this)
220 {
221  fLink = 0;
222  fArgp = 0;
223  fPClient = 0;
224  fSecClient = 0;
225  fAuthProt = 0;
226  fResponses.reserve(10);
227 
228  fStdErrFD = (pi && pi->eDest) ? pi->eDest->baseFD() : fileno(stderr);
229 
230  // Instantiate a Proofd protocol object
231  Reset();
232 }
233 
234 ////////////////////////////////////////////////////////////////////////////////
235 /// Get response instance corresponding to stream ID 'sid'
236 
238 {
239  XPDLOC(ALL, "Protocol::Response")
240 
241  TRACE(HDBG, "sid: "<<sid<<", size: "<<fResponses.size());
242 
243  if (sid > 0)
244  if (sid <= fResponses.size())
245  return fResponses[sid-1];
246 
247  return (XrdProofdResponse *)0;
248 }
249 
250 ////////////////////////////////////////////////////////////////////////////////
251 /// Create new response instance for stream ID 'sid'
252 
254 {
255  XPDLOC(ALL, "Protocol::GetNewResponse")
256 
257  XrdOucString msg;
258  XPDFORM(msg, "sid: %d", sid);
259  if (sid > 0) {
260  if (sid > fResponses.size()) {
261  if (sid > fResponses.capacity()) {
262  int newsz = (sid < 2 * fResponses.capacity()) ? 2 * fResponses.capacity() : sid+1 ;
263  fResponses.reserve(newsz);
264  if (TRACING(DBG)) {
265  msg += " new capacity: ";
266  msg += (int) fResponses.capacity();
267  }
268  }
269  int nnew = sid - fResponses.size();
270  while (nnew--)
271  fResponses.push_back(new XrdProofdResponse());
272  if (TRACING(DBG)) {
273  msg += "; new size: ";
274  msg += (int) fResponses.size();
275  }
276  }
277  } else {
278  TRACE(XERR,"wrong sid: "<<sid);
279  return (XrdProofdResponse *)0;
280  }
281 
282  TRACE(DBG, msg);
283 
284  // Done
285  return fResponses[sid-1];
286 }
287 
288 ////////////////////////////////////////////////////////////////////////////////
289 /// Check whether the request matches this protocol
290 
291 XrdProtocol *XrdProofdProtocol::Match(XrdLink *lp)
292 {
293  XPDLOC(ALL, "Protocol::Match")
294 
295  struct ClientInitHandShake hsdata;
296  char *hsbuff = (char *)&hsdata;
297 
298  static hs_response_t hsresp = {0, 0, kXR_int32(htonl(XPROOFD_VERSBIN)), 0};
299 
300  XrdProtocol *xp = nullptr;
301  int dlen;
302  TRACE(HDBG, "enter");
303 
304  XrdOucString emsg;
305  // Peek at the first 20 bytes of data
306  if ((dlen = lp->Peek(hsbuff,sizeof(hsdata),fgReadWait)) != sizeof(hsdata)) {
307  if (dlen <= 0) lp->setEtext("Match: handshake not received");
308  if (dlen == 12) {
309  // Check if it is a request to open a file via 'rootd'
310  hsdata.first = ntohl(hsdata.first);
311  if (hsdata.first == 8) {
312  if (strlen(fgMgr->RootdExe()) > 0) {
313  if (fgMgr->IsRootdAllowed((const char *)lp->Host())) {
314  TRACE(ALL, "matched rootd protocol on link: executing "<<fgMgr->RootdExe());
315  XrdOucString em;
316  if (StartRootd(lp, em) != 0) {
317  emsg = "rootd: failed to start daemon: ";
318  emsg += em;
319  }
320  } else {
321  XPDFORM(emsg, "rootd-file serving not authorized for host '%s'", lp->Host());
322  }
323  } else {
324  emsg = "rootd-file serving not enabled";
325  }
326  }
327  if (emsg.length() > 0) {
328  lp->setEtext(emsg.c_str());
329  } else {
330  lp->setEtext("link transfered");
331  }
332  return xp;
333  }
334  TRACE(XERR, "peeked incomplete or empty information! (dlen: "<<dlen<<" bytes)");
335  return xp;
336  }
337 
338  // If this is is not our protocol, we check if it a data serving request via xrootd
339  hsdata.third = ntohl(hsdata.third);
340  if (dlen != sizeof(hsdata) || hsdata.first || hsdata.second
341  || !(hsdata.third == 1) || hsdata.fourth || hsdata.fifth) {
342 
343  // Check if it is a request to open a file via 'xrootd'
344  if (fgMgr->Xrootd() && (xp = fgMgr->Xrootd()->Match(lp))) {
345  TRACE(ALL, "matched xrootd protocol on link: serving a file");
346  } else {
347  TRACE(XERR, "failed to match any known or enabled protocol");
348  }
349  return xp;
350  }
351 
352  // Respond to this request with the handshake response
353  if (!lp->Send((char *)&hsresp, sizeof(hsresp))) {
354  lp->setEtext("Match: handshake failed");
355  TRACE(XERR, "handshake failed");
356  return xp;
357  }
358 
359  // We can now read all 20 bytes and discard them (no need to wait for it)
360  int len = sizeof(hsdata);
361  if (lp->Recv(hsbuff, len) != len) {
362  lp->setEtext("Match: reread failed");
363  TRACE(XERR, "reread failed");
364  return xp;
365  }
366 
367  // Get a protocol object off the stack (if none, allocate a new one)
368  XrdProofdProtocol *xpp = nullptr;
369  if (!(xpp = fgProtStack.Pop()))
370  xpp = new XrdProofdProtocol();
371 
372  // Bind the protocol to the link and return the protocol
373  xpp->fLink = lp;
374  snprintf(xpp->fSecEntity.prot, XrdSecPROTOIDSIZE, "host");
375  xpp->fSecEntity.host = strdup((char *)lp->Host());
376 
377  // Dummy data used by 'proofd'
378  kXR_int32 dum[2];
379  if (xpp->GetData("dummy",(char *)&dum[0],sizeof(dum)) != 0) {
380  xpp->Recycle(0,0,0);
381  }
382 
383  xp = (XrdProtocol *) xpp;
384 
385  // We are done
386  return xp;
387 }
388 
389 ////////////////////////////////////////////////////////////////////////////////
390 /// Transfer the connection to a rootd daemon to serve a file access request
391 /// Return 0 on success, -1 on failure
392 
393 int XrdProofdProtocol::StartRootd(XrdLink *lp, XrdOucString &emsg)
394 {
395  XPDLOC(ALL, "Protocol::StartRootd")
396 
397  const char *prog = fgMgr->RootdExe();
398  const char **progArg = fgMgr->RootdArgs();
399 
400  if (fgMgr->RootdFork()) {
401 
402  // Start rootd using fork()
403 
404  pid_t pid;
405  if ((pid = fgMgr->Sched()->Fork(lp->Name()))) {
406  if (pid < 0) {
407  emsg = "rootd fork failed";
408  return -1;
409  }
410  return 0;
411  }
412  // In the child ...
413 
414  // Restablish standard error for the program we will exec
415  dup2(fStdErrFD, STDERR_FILENO);
416  close(fStdErrFD);
417 
418  // Force stdin/out to point to the socket FD (this will also bypass the
419  // close on exec setting for the socket)
420  dup2(lp->FDnum(), STDIN_FILENO);
421  dup2(lp->FDnum(), STDOUT_FILENO);
422 
423  // Do the exec
424  execv((const char *)prog, (char * const *)progArg);
425  TRACE(XERR, "rootd: Oops! Exec(" <<prog <<") failed; errno: " <<errno);
426  _exit(17);
427 
428  } else {
429 
430  // Start rootd using system + proofexecv
431 
432  // ROOT version
433  XrdROOT *roo = fgMgr->ROOTMgr()->DefaultVersion();
434  if (!roo) {
435  emsg = "ROOT version undefined!";
436  return -1;
437  }
438  // The path to the executable
439  XrdOucString pexe;
440  XPDFORM(pexe, "%s/proofexecv", roo->BinDir());
441  if (access(pexe.c_str(), X_OK) != 0) {
442  XPDFORM(emsg, "path '%s' does not exist or is not executable (errno: %d)",
443  pexe.c_str(), (int)errno);
444  return -1;
445  }
446 
447  // Start the proofexecv
448  XrdOucString cmd, exp;
449  XPDFORM(cmd, "export ROOTBINDIR=\"%s\"; %s 20 0 %s %s", roo->BinDir(),
450  pexe.c_str(), fgMgr->RootdUnixSrv()->path(), prog);
451  int n = 1;
452  while (progArg[n] != 0) {
453  cmd += " "; cmd += progArg[n]; n++;
454  }
455  cmd += " &";
456  TRACE(HDBG, cmd);
457  if (system(cmd.c_str()) == -1) {
458  XPDFORM(emsg, "failure from 'system' (errno: %d)", (int)errno);
459  return -1;
460  }
461 
462  // Accept a connection from the second server
463  int err = 0;
464  rpdunix *uconn = fgMgr->RootdUnixSrv()->accept(-1, &err);
465  if (!uconn || !uconn->isvalid(0)) {
466  XPDFORM(emsg, "failure accepting callback (errno: %d)", -err);
467  if (uconn) delete uconn;
468  return -1;
469  }
470  TRACE(HDBG, "proofexecv connected!");
471 
472  int rcc = 0;
473  // Transfer the open descriptor to be used in rootd
474  int fd = dup(lp->FDnum());
475  if (fd < 0 || (rcc = uconn->senddesc(fd)) != 0) {
476  XPDFORM(emsg, "failure sending descriptor '%d' (original: %d); (errno: %d)", fd, lp->FDnum(), -rcc);
477  if (uconn) delete uconn;
478  return -1;
479  }
480  // Close the connection to the parent
481  delete uconn;
482  }
483 
484  // Done
485  return 0;
486 }
487 
488 ////////////////////////////////////////////////////////////////////////////////
489 /// Return statistics info about the protocol.
490 /// Not really implemented yet: this is a reduced XrdXrootd version.
491 
492 int XrdProofdProtocol::Stats(char *buff, int blen, int)
493 {
494  static char statfmt[] = "<stats id=\"xproofd\"><num>%ld</num></stats>";
495 
496  // If caller wants only size, give it to them
497  if (!buff)
498  return sizeof(statfmt)+16;
499 
500  // We have only one statistic -- number of successful matches
501  return snprintf(buff, blen, statfmt, fgCount);
502 }
503 
504 ////////////////////////////////////////////////////////////////////////////////
505 /// Reset static and local vars
506 
508 {
509  // Init local vars
510  fLink = 0;
511  fPid = -1;
512  fArgp = 0;
513  fStatus = 0;
514  fClntCapVer = 0;
516  fSuperUser = 0;
517  fPClient = 0;
518  fUserIn = "";
519  fGroupIn = "";
520  fCID = -1;
521  fTraceID = "";
522  fAdminPath = "";
523  if (fAuthProt) {
524  fAuthProt->Delete();
525  fAuthProt = 0;
526  }
527  memset(&fSecEntity, 0, sizeof(fSecEntity));
528  // Cleanup existing XrdProofdResponse objects
529  std::vector<XrdProofdResponse *>::iterator ii = fResponses.begin(); // One per each logical connection
530  while (ii != fResponses.end()) {
531  (*ii)->Reset();
532  ii++;
533  }
534 }
535 
536 ////////////////////////////////////////////////////////////////////////////////
537 /// Protocol configuration tool
538 /// Function: Establish configuration at load time.
539 /// Output: 1 upon success or 0 otherwise.
540 
541 int XrdProofdProtocol::Configure(char *parms, XrdProtocol_Config *pi)
542 {
543  XPDLOC(ALL, "Protocol::Configure")
544 
545  XrdOucString mp;
546 
547  // Only once
548  if (fgConfigDone)
549  return 1;
550  fgConfigDone = 1;
551 
552  // Copy out the special info we want to use at top level
553  fgLogger = pi->eDest->logger();
554  fgEDest.logger(fgLogger);
555  if (XrdProofdTrace) delete XrdProofdTrace; // It could have been initialized in XrdgetProtocolPort
556  XrdProofdTrace = new XrdOucTrace(&fgEDest);
557  fgBPool = pi->BPool;
558  fgReadWait = pi->readWait;
559 
560  // Pre-initialize some i/o values
561  fgMaxBuffsz = fgBPool->MaxSize();
562 
563  // Schedule protocol object cleanup; the maximum number of objects
564  // and the max age are taken from XrdXrootdProtocol: this may need
565  // some optimization in the future.
566 #if 1
568  fgProtStack.Set((pi->ConnMax/3 ? pi->ConnMax/3 : 30), 60*60);
569 #else
570  fgProtStack.Set(pi->Sched, 3600);
571 #endif
572 
573  // Default tracing options: always trace logins and errors for all
574  // domains; if the '-d' option was specified on the command line then
575  // trace also REQ and FORM.
576  // NB: these are superseded by settings in the config file (xpd.trace)
578  TRACESET(XERR, 1);
579  TRACESET(LOGIN, 1);
580  TRACESET(RSP, 0);
581  if (pi->DebugON)
582  XrdProofdTrace->What |= (TRACE_REQ | TRACE_FORK);
583 
584  // Work as root to avoid continuous changes of the effective user
585  // (users are logged in their box after forking)
586  fgEUidAtStartup = geteuid();
587  if (!getuid()) XrdSysPriv::ChangePerm((uid_t)0, (gid_t)0);
588 
589  // Process the config file for directives meaningful to us
590  // Create and Configure the manager
591  fgMgr = new XrdProofdManager(parms, pi, &fgEDest);
592  if (fgMgr->Config(0)) return 0;
593  mp = "global manager created";
594  TRACE(ALL, mp);
595 
596  // Issue herald indicating we configured successfully
597  TRACE(ALL, "xproofd protocol version "<<XPROOFD_VERSION<<
598  " build "<<XrdVERSION<<" successfully loaded");
599 
600  // Return success
601  return 1;
602 }
603 
604 ////////////////////////////////////////////////////////////////////////////////
605 /// Process the information received on the active link.
606 /// (We ignore the argument here)
607 
609 {
610  XPDLOC(ALL, "Protocol::Process")
611 
612  int rc = 0;
613  TRACET(TraceID(), DBG, "instance: " << this);
614 
615  // Read the next request header
616  if ((rc = GetData("request", (char *)&fRequest, sizeof(fRequest))) != 0)
617  return rc;
618  TRACET(TraceID(), HDBG, "after GetData: rc: " << rc);
619 
620  // Deserialize the data
621  fRequest.header.requestid = ntohs(fRequest.header.requestid);
622  fRequest.header.dlen = ntohl(fRequest.header.dlen);
623 
624  // Get response object
625  kXR_unt16 sid;
626  memcpy((void *)&sid, (const void *)&(fRequest.header.streamid[0]), 2);
627  XrdProofdResponse *response = 0;
628  if (!(response = Response(sid))) {
629  if (!(response = GetNewResponse(sid))) {
630  TRACET(TraceID(), XERR, "could not get Response instance for rid: "<< sid);
631  return rc;
632  }
633  }
634  // Set the stream ID for the reply
635  response->Set(fRequest.header.streamid);
636  response->Set(fLink);
637 
638  TRACET(TraceID(), REQ, "sid: " << sid << ", req id: " << fRequest.header.requestid <<
639  " (" << XrdProofdAux::ProofRequestTypes(fRequest.header.requestid)<<
640  ")" << ", dlen: " <<fRequest.header.dlen);
641 
642  // Every request has an associated data length. It better be >= 0 or we won't
643  // be able to know how much data to read.
644  if (fRequest.header.dlen < 0) {
645  response->Send(kXR_ArgInvalid, "Process: Invalid request data length");
646  return fLink->setEtext("Process: protocol data length error");
647  }
648 
649  // Read any argument data at this point, except when the request is to forward
650  // a buffer: the argument may have to be segmented and we're not prepared to do
651  // that here.
652  if (fRequest.header.requestid != kXP_sendmsg && fRequest.header.dlen) {
653  if ((fArgp = GetBuff(fRequest.header.dlen+1, fArgp)) == 0) {
654  response->Send(kXR_ArgTooLong, "fRequest.argument is too long");
655  return rc;
656  }
657  if ((rc = GetData("arg", fArgp->buff, fRequest.header.dlen)))
658  return rc;
659  fArgp->buff[fRequest.header.dlen] = '\0';
660  }
661 
662  // Continue with request processing at the resume point
663  return Process2();
664 }
665 
666 ////////////////////////////////////////////////////////////////////////////////
667 /// Local processing method: here the request is dispatched to the appropriate
668 /// method
669 
671 {
672  XPDLOC(ALL, "Protocol::Process2")
673 
674  int rc = 0;
675  XPD_SETRESP(this, "Process2");
676 
677  TRACET(TraceID(), REQ, "req id: " << fRequest.header.requestid << " (" <<
679 
680  ResetCtrlcGuard_t ctrlcguard(this, fRequest.header.requestid);
681 
682  // If the user is logged in check if the wanted action is to be done by us
683  if (fStatus && (fStatus & XPD_LOGGEDIN)) {
684  // Record time of the last action
685  TouchAdminPath();
686  // We must have a client instance if here
687  if (!fPClient) {
688  TRACET(TraceID(), XERR, "client undefined!!! ");
689  response->Send(kXR_InvalidRequest,"client undefined!!! ");
690  return 0;
691  }
692  bool formgr = 0;
693  switch(fRequest.header.requestid) {
694  case kXP_ctrlc:
695  rc = CtrlC();
696  break;
697  case kXP_touch:
698  // Reset the asked-to-touch flag, if it was never set
699  fPClient->Touch(1);
700  break;
701  case kXP_interrupt:
702  rc = Interrupt();
703  break;
704  case kXP_ping:
705  rc = Ping();
706  break;
707  case kXP_sendmsg:
708  rc = SendMsg();
709  break;
710  case kXP_urgent:
711  rc = Urgent();
712  break;
713  default:
714  formgr = 1;
715  }
716  if (!formgr) {
717  // Check the link
718  if (!fLink || (fLink->FDnum() <= 0)) {
719  TRACE(XERR, "link is undefined! ");
720  return -1;
721  }
722  return rc;
723  }
724  }
725 
726  // The request is for the manager
727  rc = fgMgr->Process(this);
728  // Check the link
729  if (!fLink || (fLink->FDnum() <= 0)) {
730  TRACE(XERR, "link is undefined! ");
731  return -1;
732  }
733  return rc;
734 }
735 
736 ////////////////////////////////////////////////////////////////////////////////
737 /// Recycle call. Release the instance and give it back to the stack.
738 
739 void XrdProofdProtocol::Recycle(XrdLink *, int, const char *)
740 {
741  XPDLOC(ALL, "Protocol::Recycle")
742 
743  const char *srvtype[6] = {"ANY", "MasterWorker", "MasterMaster",
744  "ClientMaster", "Internal", "Admin"};
745  XrdOucString buf;
746 
747  // Document the disconnect
748  if (fPClient)
749  XPDFORM(buf, "user %s disconnected; type: %s", fPClient->User(),
750  srvtype[fConnType+1]);
751  else
752  XPDFORM(buf, "user disconnected; type: %s", srvtype[fConnType+1]);
753  TRACET(TraceID(), LOGIN, buf);
754 
755  // If we have a buffer, release it
756  if (fArgp) {
757  fgBPool->Release(fArgp);
758  fArgp = 0;
759  }
760 
761  // Locate the client instance
762  XrdProofdClient *pmgr = fPClient;
763 
764  if (pmgr) {
765  if (!Internal()) {
766 
767  TRACE(REQ,"External disconnection of protocol associated with pid "<<fPid);
768 
769  // Write disconnection file
770  XrdOucString discpath(fAdminPath);
771  discpath.replace("/cid", "/disconnected");
772  FILE *fd = fopen(discpath.c_str(), "w");
773  if (!fd && errno != ENOENT) {
774  TRACE(XERR, "unable to create path: " <<discpath<<" (errno: "<<errno<<")");
775  } else if (fd) {
776  fclose(fd);
777  }
778 
779  // Remove protocol and response from attached client/proofserv instances
780  // Set reconnect flag if proofserv instances attached to this client are still running
781  pmgr->ResetClientSlot(fCID);
782  if(fgMgr && fgMgr->SessionMgr()) {
784 
786  if((fConnType == 0) && fgMgr->SessionMgr()->Alive(this)) {
787  TRACE(REQ, "Non-destroyed proofserv processes attached to this protocol ("<<this<<
788  "), setting reconnect time");
790  }
792  } else {
793  TRACE(XERR, "No XrdProofdMgr ("<<fgMgr<<") or SessionMgr ("
794  <<(fgMgr ? fgMgr->SessionMgr() : (void *) -1)<<")")
795  }
796 
797  } else {
798 
799  // Internal connection: we need to remove this instance from the list
800  // of proxy servers and to notify the attached clients.
801  // Tell the session manager that this session has gone
802  if (fgMgr && fgMgr->SessionMgr()) {
804  TRACE(HDBG, "fAdminPath: "<<fAdminPath);
805  buf.assign(fAdminPath, fAdminPath.rfind('/') + 1, -1);
806  fgMgr->SessionMgr()->DeleteFromSessions(buf.c_str());
807  // Move the entry to the terminated sessions area
808  fgMgr->SessionMgr()->MvSession(buf.c_str());
809  }
810  else {
811  TRACE(XERR,"No XrdProofdMgr ("<<fgMgr<<") or SessionMgr ("<<fgMgr->SessionMgr()<<")")
812  }
813  }
814  }
815  // Set fields to starting point (debugging mostly)
816  Reset();
817 
818  // Push ourselves on the stack
820 #if 0
821  if(fgProtStack.Push(&fProtLink) != 0) {
823  fProtLink.setItem(0);
824  delete xp;
825  }
826 #endif
827 }
828 
829 ////////////////////////////////////////////////////////////////////////////////
830 /// Allocate a buffer to handle quantum bytes; if argp points to an existing
831 /// buffer, its size is checked and re-allocated if needed
832 
833 XrdBuffer *XrdProofdProtocol::GetBuff(int quantum, XrdBuffer *argp)
834 {
835  XPDLOC(ALL, "Protocol::GetBuff")
836 
837  TRACE(HDBG, "len: "<<quantum);
838 
839  // If we are given an existing buffer, we keep it if we use at least half
840  // of it; otherwise we take a smaller one
841  if (argp) {
842  if (quantum >= argp->bsize / 2 && quantum <= argp->bsize)
843  return argp;
844  }
845 
846  // Release the buffer if too small
848  if (argp)
849  fgBPool->Release(argp);
850 
851  // Obtain a new one
852  if ((argp = fgBPool->Obtain(quantum)) == 0) {
853  TRACE(XERR, "could not get requested buffer (size: "<<quantum<<
854  ") = insufficient memory");
855  } else {
856  TRACE(HDBG, "quantum: "<<quantum<<
857  ", buff: "<<(void *)(argp->buff)<<", bsize:"<<argp->bsize);
858  }
859 
860  // Done
861  return argp;
862 }
863 
864 ////////////////////////////////////////////////////////////////////////////////
865 /// Release a buffer previously allocated via GetBuff
866 
867 void XrdProofdProtocol::ReleaseBuff(XrdBuffer *argp)
868 {
870  fgBPool->Release(argp);
871 }
872 
873 ////////////////////////////////////////////////////////////////////////////////
874 /// Get data from the open link
875 
876 int XrdProofdProtocol::GetData(const char *dtype, char *buff, int blen)
877 {
878  XPDLOC(ALL, "Protocol::GetData")
879 
880  int rlen;
881 
882  // Read the data but reschedule the link if we have not received all of the
883  // data within the timeout interval.
884  TRACET(TraceID(), HDBG, "dtype: "<<(dtype ? dtype : " - ")<<", blen: "<<blen);
885 
886  // No need to lock:the link is disable while we are here
887  rlen = fLink->Recv(buff, blen, fgReadWait);
888  if (rlen < 0) {
889  if (rlen != -ENOMSG && rlen != -ECONNRESET) {
890  XrdOucString emsg = "link read error: errno: ";
891  emsg += -rlen;
892  TRACET(TraceID(), XERR, emsg.c_str());
893  return (fLink ? fLink->setEtext(emsg.c_str()) : -1);
894  } else {
895  TRACET(TraceID(), HDBG, "connection closed by peer (errno: "<<-rlen<<")");
896  return -1;
897  }
898  }
899  if (rlen < blen) {
900  TRACET(TraceID(), DBG, dtype << " timeout; read " <<rlen <<" of " <<blen <<" bytes - rescheduling");
901  return 1;
902  }
903  TRACET(TraceID(), HDBG, "rlen: "<<rlen);
904 
905  return 0;
906 }
907 
908 ////////////////////////////////////////////////////////////////////////////////
909 /// Send data over the open link. Segmentation is done here, if required.
910 
912  kXR_int32 sid, XrdSrvBuffer **buf, bool savebuf)
913 {
914  XPDLOC(ALL, "Protocol::SendData")
915 
916  int rc = 0;
917 
918  TRACET(TraceID(), HDBG, "length: "<<fRequest.header.dlen<<" bytes ");
919 
920  // Buffer length
921  int len = fRequest.header.dlen;
922 
923  // Quantum size
924  int quantum = (len > fgMaxBuffsz ? fgMaxBuffsz : len);
925 
926  // Get a buffer
927  XrdBuffer *argp = XrdProofdProtocol::GetBuff(quantum);
928  if (!argp) return -1;
929 
930  // Now send over all of the data as unsolicited messages
931  XrdOucString msg;
932  while (len > 0) {
933 
934  XrdProofdResponse *response = (sid > -1) ? xps->Response() : 0;
935 
936  if ((rc = GetData("data", argp->buff, quantum))) {
937  { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
938  return -1;
939  }
940  if (buf && !(*buf) && savebuf)
941  *buf = new XrdSrvBuffer(argp->buff, quantum, 1);
942  // Send
943  if (sid > -1) {
944  if (TRACING(HDBG))
945  XPDFORM(msg, "EXT: server ID: %d, sending: %d bytes", sid, quantum);
946  if (!response || response->Send(kXR_attn, kXPD_msgsid, sid,
947  argp->buff, quantum) != 0) {
948  { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
949  XPDFORM(msg, "EXT: server ID: %d, problems sending: %d bytes to server",
950  sid, quantum);
951  TRACET(TraceID(), XERR, msg);
952  return -1;
953  }
954  } else {
955 
956  // Get ID of the client
957  int cid = ntohl(fRequest.sendrcv.cid);
958  if (TRACING(HDBG))
959  XPDFORM(msg, "INT: client ID: %d, sending: %d bytes", cid, quantum);
960  if (xps->SendData(cid, argp->buff, quantum) != 0) {
961  { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
962  XPDFORM(msg, "INT: client ID: %d, problems sending: %d bytes to client",
963  cid, quantum);
964  TRACET(TraceID(), XERR, msg);
965  return -1;
966  }
967  }
968  TRACET(TraceID(), HDBG, msg);
969  // Next segment
970  len -= quantum;
971  if (len < quantum)
972  quantum = len;
973  }
974 
975  // Release the buffer
976  { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
977 
978  // Done
979  return 0;
980 }
981 
982 ////////////////////////////////////////////////////////////////////////////////
983 /// Send data over the open client links of session 'xps'.
984 /// Used when all the connected clients are eligible to receive the message.
985 /// Segmentation is done here, if required.
986 
988  XrdSrvBuffer **buf, bool savebuf)
989 {
990  XPDLOC(ALL, "Protocol::SendDataN")
991 
992  int rc = 0;
993 
994  TRACET(TraceID(), HDBG, "length: "<<fRequest.header.dlen<<" bytes ");
995 
996  // Buffer length
997  int len = fRequest.header.dlen;
998 
999  // Quantum size
1000  int quantum = (len > fgMaxBuffsz ? fgMaxBuffsz : len);
1001 
1002  // Get a buffer
1003  XrdBuffer *argp = XrdProofdProtocol::GetBuff(quantum);
1004  if (!argp) return -1;
1005 
1006  // Now send over all of the data as unsolicited messages
1007  while (len > 0) {
1008  if ((rc = GetData("data", argp->buff, quantum))) {
1010  return -1;
1011  }
1012  if (buf && !(*buf) && savebuf)
1013  *buf = new XrdSrvBuffer(argp->buff, quantum, 1);
1014 
1015  // Send to connected clients
1016  if (xps->SendDataN(argp->buff, quantum) != 0) {
1018  return -1;
1019  }
1020 
1021  // Next segment
1022  len -= quantum;
1023  if (len < quantum)
1024  quantum = len;
1025  }
1026 
1027  // Release the buffer
1029 
1030  // Done
1031  return 0;
1032 }
1033 
1034 ////////////////////////////////////////////////////////////////////////////////
1035 /// Handle a request to forward a message to another process
1036 
1038 {
1039  XPDLOC(ALL, "Protocol::SendMsg")
1040 
1041  static const char *crecv[5] = {"master proofserv", "top master",
1042  "client", "undefined", "any"};
1043  int rc = 0;
1044 
1045  XPD_SETRESP(this, "SendMsg");
1046 
1047  // Unmarshall the data
1048  int psid = ntohl(fRequest.sendrcv.sid);
1049  int opt = ntohl(fRequest.sendrcv.opt);
1050 
1051  XrdOucString msg;
1052  // Find server session
1053  XrdProofdProofServ *xps = 0;
1054  if (!fPClient || !(xps = fPClient->GetServer(psid))) {
1055  XPDFORM(msg, "%s: session ID not found: %d", (Internal() ? "INT" : "EXT"), psid);
1056  TRACET(TraceID(), XERR, msg.c_str());
1057  response->Send(kXR_InvalidRequest, msg.c_str());
1058  return 0;
1059  }
1060 
1061  // Message length
1062  int len = fRequest.header.dlen;
1063 
1064  if (!Internal()) {
1065 
1066  if (TRACING(HDBG)) {
1067  // Notify
1068  XPDFORM(msg, "EXT: sending %d bytes to proofserv (psid: %d, xps: %p, status: %d,"
1069  " cid: %d)", len, psid, xps, xps->Status(), fCID);
1070  TRACET(TraceID(), HDBG, msg.c_str());
1071  }
1072 
1073  // Send to proofsrv our client ID
1074  if (fCID == -1) {
1075  TRACET(TraceID(), REQ, "EXT: error getting clientSID");
1076  response->Send(kXP_ServerError,"EXT: getting clientSID");
1077  return 0;
1078  }
1079  if (SendData(xps, fCID)) {
1080  TRACET(TraceID(), REQ, "EXT: error sending message to proofserv");
1081  response->Send(kXP_reconnecting,"EXT: sending message to proofserv");
1082  return 0;
1083  }
1084 
1085  // Notify to user
1086  response->Send();
1087 
1088  } else {
1089 
1090  if (TRACING(HDBG)) {
1091  // Notify
1092  XPDFORM(msg, "INT: sending %d bytes to client/master (psid: %d, xps: %p, status: %d)",
1093  len, psid, xps, xps->Status());
1094  TRACET(TraceID(), HDBG, msg.c_str());
1095  }
1096  bool saveStartMsg = 0;
1097  XrdSrvBuffer *savedBuf = 0;
1098  // Additional info about the message
1099  if (opt & kXPD_setidle) {
1100  TRACET(TraceID(), DBG, "INT: setting proofserv in 'idle' state");
1101  xps->SetStatus(kXPD_idle);
1102  PostSession(-1, fPClient->UI().fUser.c_str(),
1103  fPClient->UI().fGroup.c_str(), xps);
1104  } else if (opt & kXPD_querynum) {
1105  TRACET(TraceID(), DBG, "INT: got message with query number");
1106  } else if (opt & kXPD_startprocess) {
1107  TRACET(TraceID(), DBG, "INT: setting proofserv in 'running' state");
1108  xps->SetStatus(kXPD_running);
1109  PostSession(1, fPClient->UI().fUser.c_str(),
1110  fPClient->UI().fGroup.c_str(), xps);
1111  // Save start processing message for later clients
1112  xps->DeleteStartMsg();
1113  saveStartMsg = 1;
1114  } else if (opt & kXPD_logmsg) {
1115  // We broadcast log messages only not idle to catch the
1116  // result from processing
1117  if (xps->Status() == kXPD_running) {
1118  TRACET(TraceID(), DBG, "INT: broadcasting log message");
1119  opt |= kXPD_fb_prog;
1120  }
1121  }
1122  bool fbprog = (opt & kXPD_fb_prog);
1123 
1124  if (!fbprog) {
1125  //
1126  // The message is strictly for the client requiring it
1127  if (SendData(xps, -1, &savedBuf, saveStartMsg) != 0) {
1128  response->Send(kXP_reconnecting,
1129  "SendMsg: INT: session is reconnecting: retry later");
1130  return 0;
1131  }
1132  } else {
1133  // Send to all connected clients
1134  if (SendDataN(xps, &savedBuf, saveStartMsg) != 0) {
1135  response->Send(kXP_reconnecting,
1136  "SendMsg: INT: session is reconnecting: retry later");
1137  return 0;
1138  }
1139  }
1140  // Save start processing messages, if required
1141  if (saveStartMsg)
1142  xps->SetStartMsg(savedBuf);
1143 
1144  if (TRACING(DBG)) {
1145  int ii = xps->SrvType();
1146  if (ii > 3) ii = 3;
1147  if (ii < 0) ii = 4;
1148  XPDFORM(msg, "INT: message sent to %s (%d bytes)", crecv[ii], len);
1149  TRACET(TraceID(), DBG, msg);
1150  }
1151  // Notify to proofsrv
1152  response->Send();
1153  }
1154 
1155  // Over
1156  return 0;
1157 }
1158 
1159 ////////////////////////////////////////////////////////////////////////////////
1160 /// Handle generic request of a urgent message to be forwarded to the server
1161 
1163 {
1164  XPDLOC(ALL, "Protocol::Urgent")
1165 
1166  unsigned int rc = 0;
1167 
1168  XPD_SETRESP(this, "Urgent");
1169 
1170  // Unmarshall the data
1171  int psid = ntohl(fRequest.proof.sid);
1172  int type = ntohl(fRequest.proof.int1);
1173  int int1 = ntohl(fRequest.proof.int2);
1174  int int2 = ntohl(fRequest.proof.int3);
1175 
1176  TRACET(TraceID(), REQ, "psid: "<<psid<<", type: "<< type);
1177 
1178  // Find server session
1179  XrdProofdProofServ *xps = 0;
1180  if (!fPClient || !(xps = fPClient->GetServer(psid))) {
1181  TRACET(TraceID(), XERR, "session ID not found: "<<psid);
1182  response->Send(kXR_InvalidRequest,"Urgent: session ID not found");
1183  return 0;
1184  }
1185 
1186  TRACET(TraceID(), DBG, "xps: "<<xps<<", status: "<<xps->Status());
1187 
1188  // Check ID matching
1189  if (!xps->Match(psid)) {
1190  response->Send(kXP_InvalidRequest,"Urgent: IDs do not match - do nothing");
1191  return 0;
1192  }
1193 
1194  // Check the link to the session
1195  if (!xps->Response()) {
1196  response->Send(kXP_InvalidRequest,"Urgent: session response object undefined - do nothing");
1197  return 0;
1198  }
1199 
1200  // Prepare buffer
1201  int len = 3 *sizeof(kXR_int32);
1202  char *buf = new char[len];
1203  // Type
1204  kXR_int32 itmp = static_cast<kXR_int32>(htonl(type));
1205  memcpy(buf, &itmp, sizeof(kXR_int32));
1206  // First info container
1207  itmp = static_cast<kXR_int32>(htonl(int1));
1208  memcpy(buf + sizeof(kXR_int32), &itmp, sizeof(kXR_int32));
1209  // Second info container
1210  itmp = static_cast<kXR_int32>(htonl(int2));
1211  memcpy(buf + 2 * sizeof(kXR_int32), &itmp, sizeof(kXR_int32));
1212  // Send over
1213  if (xps->Response()->Send(kXR_attn, kXPD_urgent, buf, len) != 0) {
1214  response->Send(kXP_ServerError,
1215  "Urgent: could not propagate request to proofsrv");
1216  return 0;
1217  }
1218 
1219  // Notify to user
1220  response->Send();
1221  TRACET(TraceID(), DBG, "request propagated to proofsrv");
1222 
1223  // Over
1224  return 0;
1225 }
1226 
1227 ////////////////////////////////////////////////////////////////////////////////
1228 /// Handle an interrupt request
1229 
1231 {
1232  XPDLOC(ALL, "Protocol::Interrupt")
1233 
1234  int rc = 0;
1235 
1236  XPD_SETRESP(this, "Interrupt");
1237 
1238  // Unmarshall the data
1239  int psid = ntohl(fRequest.interrupt.sid);
1240  int type = ntohl(fRequest.interrupt.type);
1241  TRACET(TraceID(), REQ, "psid: "<<psid<<", type:"<<type);
1242 
1243  // Find server session
1244  XrdProofdProofServ *xps = 0;
1245  if (!fPClient || !(xps = fPClient->GetServer(psid))) {
1246  TRACET(TraceID(), XERR, "session ID not found: "<<psid);
1247  response->Send(kXR_InvalidRequest,"Interrupt: session ID not found");
1248  return 0;
1249  }
1250 
1251  if (xps) {
1252 
1253  // Check ID matching
1254  if (!xps->Match(psid)) {
1255  response->Send(kXP_InvalidRequest,"Interrupt: IDs do not match - do nothing");
1256  return 0;
1257  }
1258 
1259  XrdOucString msg;
1260  XPDFORM(msg, "xps: %p, link ID: %s, proofsrv PID: %d",
1261  xps, xps->Response()->TraceID(), xps->SrvPID());
1262  TRACET(TraceID(), DBG, msg.c_str());
1263 
1264  // Propagate the type as unsolicited
1265  if (xps->Response()->Send(kXR_attn, kXPD_interrupt, type) != 0) {
1266  response->Send(kXP_ServerError,
1267  "Interrupt: could not propagate interrupt code to proofsrv");
1268  return 0;
1269  }
1270 
1271  // Notify to user
1272  response->Send();
1273  TRACET(TraceID(), DBG, "interrupt propagated to proofsrv");
1274  }
1275 
1276  // Over
1277  return 0;
1278 }
1279 
1280 ////////////////////////////////////////////////////////////////////////////////
1281 /// Handle a ping request.
1282 /// For internal connections, ping is done asynchronously to avoid locking
1283 /// problems; the session checker verifies that the admin file has been touched
1284 /// recently enough; touching is done in Process2, so we have nothing to do here
1285 
1287 {
1288  XPDLOC(ALL, "Protocol::Ping")
1289 
1290  int rc = 0;
1291  if (Internal()) {
1292  if (TRACING(HDBG)) {
1293  XPD_SETRESP(this, "Ping");
1294  TRACET(TraceID(), HDBG, "INT: nothing to do ");
1295  }
1296  return 0;
1297  }
1298  XPD_SETRESP(this, "Ping");
1299 
1300  // Unmarshall the data
1301  int psid = ntohl(fRequest.sendrcv.sid);
1302  int asyncopt = ntohl(fRequest.sendrcv.opt);
1303 
1304  TRACET(TraceID(), REQ, "psid: "<<psid<<", async: "<<asyncopt);
1305 
1306  // For connections to servers find the server session; manager connections
1307  // (psid == -1) do not have any session attached
1308  XrdProofdProofServ *xps = 0;
1309  if (!fPClient || (psid > -1 && !(xps = fPClient->GetServer(psid)))) {
1310  TRACET(TraceID(), XERR, "session ID not found: "<<psid);
1311  response->Send(kXR_InvalidRequest,"session ID not found");
1312  return 0;
1313  }
1314 
1315  // For manager connections we are done
1316  kXR_int32 pingres = (psid > -1) ? 0 : 1;
1317  if (psid > -1 && xps && xps->IsValid()) {
1318 
1319  TRACET(TraceID(), DBG, "EXT: psid: "<<psid);
1320 
1321  // This is the max time we will privide an answer
1322  kXR_int32 checkfq = fgMgr->SessionMgr()->CheckFrequency();
1323 
1324  // If asynchronous return the timeout for an answer
1325  if (asyncopt == 1) {
1326  TRACET(TraceID(), DBG, "EXT: async: notifying timeout to client: "<<checkfq<<" secs");
1327  response->Send(kXR_ok, checkfq);
1328  }
1329 
1330  // Admin path
1331  XrdOucString path(xps->AdminPath());
1332  if (path.length() <= 0) {
1333  TRACET(TraceID(), XERR, "EXT: admin path is empty! - protocol error");
1334  if (asyncopt == 0)
1335  response->Send(kXP_ServerError, "EXT: admin path is empty! - protocol error");
1336  return 0;
1337  }
1338  path += ".status";
1339 
1340  // Current time
1341  int now = time(0);
1342 
1343  // Stat the admin file
1344  struct stat st0;
1345  if (stat(path.c_str(), &st0) != 0) {
1346  TRACET(TraceID(), XERR, "EXT: cannot stat admin path: "<<path);
1347  if (asyncopt == 0)
1348  response->Send(kXP_ServerError, "EXT: cannot stat admin path");
1349  return 0;
1350  }
1351 
1352  // Take the pid
1353  int pid = xps->SrvPID();
1354  // If the session is alive ...
1355  if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
1356  // If it as not touched during the last ~checkfq secs we ask for a refresh
1357  if ((now - st0.st_mtime) > checkfq - 5) {
1358  // Send the request (asking for further propagation)
1359  if (xps->VerifyProofServ(1) != 0) {
1360  TRACET(TraceID(), XERR, "EXT: could not send verify request to proofsrv");
1361  if (asyncopt == 0)
1362  response->Send(kXP_ServerError, "EXT: could not verify reuqest to proofsrv");
1363  return 0;
1364  }
1365  // Wait for the action for checkfq secs, checking every 1 sec
1366  struct stat st1;
1367  int ns = checkfq;
1368  while (ns--) {
1369  if (stat(path.c_str(), &st1) == 0) {
1370  if (st1.st_mtime > st0.st_mtime) {
1371  pingres = 1;
1372  break;
1373  }
1374  }
1375  // Wait 1 sec
1376  TRACET(TraceID(), DBG, "EXT: waiting "<<ns<<" secs for session "<<pid<<
1377  " to touch the admin path");
1378  sleep(1);
1379  }
1380 
1381  } else {
1382  // Session is alive
1383  pingres = 1;
1384  }
1385  } else {
1386  // Session is dead
1387  pingres = 0;
1388  }
1389 
1390  // Notify the client
1391  TRACET(TraceID(), DBG, "EXT: notified the result to client: "<<pingres);
1392  if (asyncopt == 0) {
1393  response->Send(kXR_ok, pingres);
1394  } else {
1395  // Prepare buffer for asynchronous notification
1396  int len = sizeof(kXR_int32);
1397  char *buf = new char[len];
1398  // Option
1399  kXR_int32 ifw = (kXR_int32)0;
1400  ifw = static_cast<kXR_int32>(htonl(ifw));
1401  memcpy(buf, &ifw, sizeof(kXR_int32));
1402  response->Send(kXR_attn, kXPD_ping, buf, len);
1403  }
1404  return 0;
1405  } else if (psid > -1) {
1406  // This is a failure for connections to sessions
1407  TRACET(TraceID(), XERR, "session ID not found: "<<psid);
1408  }
1409 
1410  // Send the result
1411  response->Send(kXR_ok, pingres);
1412 
1413  // Done
1414  return 0;
1415 }
1416 
1417 ////////////////////////////////////////////////////////////////////////////////
1418 /// Post change of session status
1419 
1420 void XrdProofdProtocol::PostSession(int on, const char *u, const char *g,
1421  XrdProofdProofServ *xps)
1422 {
1423  XPDLOC(ALL, "Protocol::PostSession")
1424 
1425  // Tell the priority manager
1426  if (fgMgr && fgMgr->PriorityMgr()) {
1427  int pid = (xps) ? xps->SrvPID() : -1;
1428  if (pid < 0) {
1429  TRACE(XERR, "undefined session or process id");
1430  return;
1431  }
1432  XrdOucString buf;
1433  XPDFORM(buf, "%d %s %s %d", on, u, g, pid);
1434 
1436  buf.c_str()) != 0) {
1437  TRACE(XERR, "problem posting the prority manager pipe");
1438  }
1439  }
1440  // Tell the scheduler
1441  if (fgMgr && fgMgr->ProofSched()) {
1442  if (on == -1 && xps && xps->SrvType() == kXPD_TopMaster) {
1443  TRACE(DBG, "posting the scheduler pipe");
1444  if (fgMgr->ProofSched()->Pipe()->Post(XrdProofSched::kReschedule, 0) != 0) {
1445  TRACE(XERR, "problem posting the scheduler pipe");
1446  }
1447  }
1448  }
1449  // Tell the session manager
1450  if (fgMgr && fgMgr->SessionMgr()) {
1452  TRACE(XERR, "problem posting the session manager pipe");
1453  }
1454  }
1455  // Done
1456  return;
1457 }
1458 
1459 ////////////////////////////////////////////////////////////////////////////////
1460 /// Recording time of the last request on this instance
1461 
1463 {
1464  XPDLOC(ALL, "Protocol::TouchAdminPath")
1465 
1466  XPD_SETRESPV(this, "TouchAdminPath");
1467  TRACET(TraceID(), HDBG, fAdminPath);
1468 
1469  if (fAdminPath.length() > 0) {
1470  int rc = 0;
1471  if ((rc = XrdProofdAux::Touch(fAdminPath.c_str())) != 0) {
1472  // In the case the file was not found and the connetion is internal
1473  // try also the terminated sessions, as the file could have been moved
1474  // in the meanwhile
1475  XrdOucString apath = fAdminPath;
1476  if (rc == -ENOENT && Internal()) {
1477  apath.replace("/activesessions/", "/terminatedsessions/");
1478  apath.replace(".status", "");
1479  rc = XrdProofdAux::Touch(apath.c_str());
1480  }
1481  if (rc != 0 && rc != -ENOENT) {
1482  const char *type = Internal() ? "internal" : "external";
1483  TRACET(TraceID(), XERR, type<<": problems touching "<<apath<<"; errno: "<<-rc);
1484  }
1485  }
1486  }
1487  // Done
1488  return;
1489 }
1490 
1491 ////////////////////////////////////////////////////////////////////////////////
1492 /// Set and propagate a Ctrl-C request
1493 
1495 {
1496  XPDLOC(ALL, "Protocol::CtrlC")
1497 
1498  TRACET(TraceID(), ALL, "handling request");
1499 
1501  fIsCtrlC = 1;
1502  }
1503 
1504  // Propagate now
1505  if (fgMgr) {
1506  if (fgMgr->SrvType() != kXPD_Worker) {
1507  if (fgMgr->NetMgr()) {
1508  fgMgr->NetMgr()->BroadcastCtrlC(Client()->User());
1509  }
1510  }
1511  }
1512 
1513  // Over
1514  return 0;
1515 }
static void PostSession(int on, const char *u, const char *g, XrdProofdProofServ *xps)
Post change of session status.
#define XrdSysLogger
Definition: XpdSysLogger.h:8
#define kXPD_querynum
XrdProofdProtocol * objectItem()
Definition: XpdObject.h:43
static XpdObjectQ fgProtStack
#define XPD_LOGGEDIN
void SetReconnectTime(bool on=1)
Change reconnecting status.
#define TRACE_FORK
#define kXPD_TopMaster
const char * RootdExe() const
static XrdSysLogger * fgLogger
int ResetClientSlot(int ic)
Reset slot at 'ic'.
#define TRACING(x)
XrdProofdProofServMgr * SessionMgr() const
XrdSecEntity * fSecClient
#define XPROOFD_VERSBIN
const double pi
XrdProtocol * Match(XrdLink *lp)
Check whether the request matches this protocol.
XrdROOT * DefaultVersion() const
Definition: XrdROOT.h:118
XrdProofdNetMgr * NetMgr() const
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
#define XrdSysRecMutex
Definition: XrdSysToOuc.h:18
struct XPClientInterruptRequest interrupt
int Config(bool rcf=0)
Run configuration and parse the entered config directives.
#define TRACE(Flag, Args)
Definition: TGHtml.h:124
int Process(XrdProofdProtocol *p)
Process manager request.
int DeleteFromSessions(const char *pid)
Delete from the hash list the session with ID pid.
bool Match(short int id) const
XrdScheduler * Sched() const
void setItem(XrdProofdProtocol *ival)
Definition: XpdObject.h:49
int SendDataN(void *buff, int len)
Send data over the open client links of this session.
static XrdSysError fgEDest
bool Alive(XrdProofdProtocol *p)
Check destroyed status.
XPClientRequest fRequest
static XrdSysRecMutex fgBMutex
#define kXPD_ClientMaster
static int ChangePerm(uid_t uid, gid_t gid)
static XrdBuffer * GetBuff(int quantum, XrdBuffer *argp=0)
Allocate a buffer to handle quantum bytes; if argp points to an existing buffer, its size is checked ...
rpdunixsrv * RootdUnixSrv() const
bool RootdFork() const
struct ClientRequestHdr header
#define kXPD_setidle
Int_t bsize[]
Definition: SparseFit4.cxx:31
you should not use this method at all Int_t Int_t Double_t Double_t em
Definition: TRolke.cxx:630
static int Configure(char *parms, XrdProtocol_Config *pi)
Protocol configuration tool Function: Establish configuration at load time.
XrdProofSched * ProofSched() const
#define XPROOFD_VERSION
int BroadcastCtrlC(const char *usr)
Broadcast a ctrlc interrupt Return 0 on success, -1 on error.
XrdSecEntity fSecEntity
const char * User() const
static XrdBuffManager * fgBPool
static XrdProofdManager * fgMgr
int SrvType() const
std::vector< XrdProofdResponse * > fResponses
int CheckActiveSessions(bool verify=1)
Go through the active sessions admin path and make sure sessions are alive.
void Reset()
Reset static and local vars.
XrdProofdClient * fPClient
XrdOucString fUser
Definition: XrdProofdAux.h:40
void DisconnectFromProofServ(int pid)
Change reconnecting status.
XrdSecProtocol * fAuthProt
struct XPClientSendRcvRequest sendrcv
#define XPDLOC(d, x)
XrdProofdPipe * Pipe()
unsigned char fClntCapVer
#define TRACESET(act, on)
void Push(XpdObject *Node)
Push back a protocol.
Definition: XpdObject.cxx:47
XrdProofdResponse * Response(kXR_unt16 rid)
Get response instance corresponding to stream ID 'sid'.
XrdProofdPipe * Pipe()
static const char * ProofRequestTypes(int type)
Translates the proof request type in a human readable string.
XrdProofdProtocol * Pop()
Pop up a protocol object.
Definition: XpdObject.cxx:31
XrdOucString fGroup
Definition: XrdProofdAux.h:41
#define XrdSysMutexHelper
Definition: XrdSysToOuc.h:17
const char ** RootdArgs() const
struct XPClientProofRequest proof
void Set(XrdLink *l)
Set the link to be used by this response.
void Recycle(XrdLink *lp, int x, const char *y)
Recycle call. Release the instance and give it back to the stack.
bool IsRootdAllowed(const char *host)
Check if 'host' is allowed to access files via rootd.
#define XrdSysError
Definition: XpdSysError.h:8
static int Touch(const char *path, int opt=0)
Set access (opt == 1), modify (opt =2 ) or access&modify (opt = 0, default) times of path to current ...
int MvSession(const char *fpid)
Move session file from the active to the terminated areas.
#define TRACE_REQ
#define TRACE_DOMAINS
const char * TraceID() const
int GetData(const char *dtype, char *buff, int blen)
Get data from the open link.
const char * TraceID() const
#define kXPD_logmsg
#define XPD_SETRESPV(p, x)
const char * AdminPath() const
int Touch(bool reset=0)
Send a touch the connected clients: this will remotely touch the associated TSocket instance and sche...
XrdProofdProtocol(XrdProtocol_Config *pi=0)
Protocol constructor.
XrdROOTMgr * ROOTMgr() const
#define XPDFORM
Definition: XrdProofdAux.h:381
#define kXPD_Worker
void TouchAdminPath()
Recording time of the last request on this instance.
int Process2()
Local processing method: here the request is dispatched to the appropriate method.
XrdProtocol * Xrootd() const
#define TRACE_MEM
int type
Definition: TGX11.cxx:120
int SendData(int cid, void *buff, int len)
Send data to client cid.
#define kXPD_fb_prog
XrdSysRecMutex fCtrlcMutex
XrdProofdResponse * GetNewResponse(kXR_unt16 rid)
Create new response instance for stream ID 'sid'.
int Ping()
Handle a ping request.
#define kXPD_startprocess
static int fgEUidAtStartup
int Process(XrdLink *lp)
Process the information received on the active link.
#define XPD_SETRESP(p, x)
R__EXTERN C unsigned int sleep(unsigned int seconds)
static XrdSysLogger gMainLogger
XrdProofdResponse * Response() const
int StartRootd(XrdLink *lp, XrdOucString &emsg)
Transfer the connection to a rootd daemon to serve a file access request Return 0 on success...
XrdProofdPriorityMgr * PriorityMgr() const
int Urgent()
Handle generic request of a urgent message to be forwarded to the server.
#define TRACET(tid, act, x)
int XrdgetProtocolPort(const char *, char *, XrdProtocol_Config *pi)
This function is called early on to determine the port we need to use.
XrdProtocol * XrdgetProtocol(const char *, char *parms, XrdProtocol_Config *pi)
This protocol is meant to live in a shared library.
static void ReleaseBuff(XrdBuffer *argp)
Release a buffer previously allocated via GetBuff.
XrdOucString fName
Definition: XrdProofdAux.h:111
int Stats(char *buff, int blen, int do_sync)
Return statistics info about the protocol.
#define XPD_DEF_PORT
struct ResetCtrlcGuard ResetCtrlcGuard_t
XrdProofUI UI() const
#define snprintf
Definition: civetweb.c:822
int Interrupt()
Handle an interrupt request.
int SendDataN(XrdProofdProofServ *xps, XrdSrvBuffer **buf=0, bool sb=0)
Send data over the open client links of session 'xps'.
XrdProofdClient * Client() const
int Post(int type, const char *msg)
Post message on the pipe.
void Set(int inQMax, time_t agemax=1800)
Lock the data area and set the values.
Definition: XpdObject.cxx:64
int SendMsg()
Handle a request to forward a message to another process.
static int VerifyProcessByID(int pid, const char *pname="proofserv")
Check if a process named 'pname' and process 'pid' is still in the process table. ...
XrdOucTrace * XrdProofdTrace
XrdProofdProofServ * GetServer(int psid)
Get from the vector server instance with ID psid.
double exp(double)
int CtrlC()
Set and propagate a Ctrl-C request.
const char * BinDir() const
Definition: XrdROOT.h:66
XrdOucString fAdminPath
int VerifyProofServ(bool fw)
Check if the associated proofserv process is alive.
int Send(void)
Auxilliary Send method.
const Int_t n
Definition: legend1.C:16
void SetStartMsg(XrdSrvBuffer *sm)
int SendData(XrdProofdProofServ *xps, kXR_int32 sid=-1, XrdSrvBuffer **buf=0, bool sb=0)
Send data over the open link. Segmentation is done here, if required.
static bool fgConfigDone