Logo ROOT  
Reference Guide
XrdProofdProtocol.cxx
Go to the documentation of this file.
1// @(#)root/proofd:$Id$
2// Author: Gerardo Ganis 12/12/2005
3
4/*************************************************************************
5 * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12//////////////////////////////////////////////////////////////////////////
13// //
14// XrdProofdProtocol //
15// //
16// Authors: G. Ganis, CERN, 2005 //
17// //
18// XrdProtocol implementation to coordinate 'proofserv' applications. //
19// //
20//////////////////////////////////////////////////////////////////////////
21
22#include "XrdProofdPlatform.h"
23
24#include "XpdSysError.h"
25#include "XpdSysLogger.h"
26
27#include "XrdSys/XrdSysPriv.hh"
28#include "XrdOuc/XrdOucStream.hh"
29
30#include "XrdVersion.hh"
31#include "Xrd/XrdBuffer.hh"
32#include "Xrd/XrdScheduler.hh"
33
34#include "XrdProofdClient.h"
35#include "XrdProofdClientMgr.h"
36#include "XrdProofdConfig.h"
37#include "XrdProofdManager.h"
38#include "XrdProofdNetMgr.h"
41#include "XrdProofdProtocol.h"
42#include "XrdProofdResponse.h"
43#include "XrdProofdProofServ.h"
44#include "XrdProofSched.h"
45#include "XrdROOT.h"
46#include "rpdconn.h"
47
48// Tracing utils
49#include "XrdProofdTrace.h"
50XrdOucTrace *XrdProofdTrace = 0;
51
52// Loggers: we need two to avoid deadlocks
54
55//
56// Static area: general protocol managing section
59 "xproofd protocol anchor");
60XrdSysRecMutex XrdProofdProtocol::fgBMutex; // Buffer management mutex
61XrdBuffManager *XrdProofdProtocol::fgBPool = 0;
65//
66// Static area: protocol configuration section
68//
70// Cluster manager
72
73// Effective uid
75
76// Local definitions
77#define MAX_ARGS 128
78
79// Macros used to set conditional options
80#ifndef XPDCOND
81#define XPDCOND(n,ns) ((n == -1 && ns == -1) || (n > 0 && n >= ns))
82#endif
83#ifndef XPDSETSTRING
84#define XPDSETSTRING(n,ns,c,s) \
85 { if (XPDCOND(n,ns)) { \
86 SafeFree(c); c = strdup(s.c_str()); ns = n; }}
87#endif
88
89#ifndef XPDADOPTSTRING
90#define XPDADOPTSTRING(n,ns,c,s) \
91 { char *t = 0; \
92 XPDSETSTRING(n, ns, t, s); \
93 if (t && strlen(t)) { \
94 SafeFree(c); c = t; \
95 } else \
96 SafeFree(t); }
97#endif
98
99#ifndef XPDSETINT
100#define XPDSETINT(n,ns,i,s) \
101 { if (XPDCOND(n,ns)) { \
102 i = strtol(s.c_str(),0,10); ns = n; }}
103#endif
104
105typedef struct {
106 kXR_int32 ptyp; // must be always 0 !
107 kXR_int32 rlen;
108 kXR_int32 pval;
109 kXR_int32 styp;
110} hs_response_t;
111
112typedef struct ResetCtrlcGuard {
114 int type;
115 ResetCtrlcGuard(XrdProofdProtocol *p, int t) : xpd(p), type(t) { }
116 ~ResetCtrlcGuard() { if (xpd && type != kXP_ctrlc) xpd->ResetCtrlC(); }
118
119//
120// Derivation of XrdProofdConfig to read the port from the config file
121class XrdProofdProtCfg : public XrdProofdConfig {
122public:
123 int fPort; // The port on which we listen
124 XrdProofdProtCfg(const char *cfg, XrdSysError *edest = 0);
125 int DoDirective(XrdProofdDirective *, char *, XrdOucStream *, bool);
126 void RegisterDirectives();
127};
128
129////////////////////////////////////////////////////////////////////////////////
130/// Constructor
131
132XrdProofdProtCfg::XrdProofdProtCfg(const char *cfg, XrdSysError *edest)
133 : XrdProofdConfig(cfg, edest)
134{
135 fPort = -1;
136 RegisterDirectives();
137}
138
139////////////////////////////////////////////////////////////////////////////////
140/// Register directives for configuration
141
142void XrdProofdProtCfg::RegisterDirectives()
143{
144 Register("port", new XrdProofdDirective("port", this, &DoDirectiveClass));
145 Register("xrd.protocol", new XrdProofdDirective("xrd.protocol", this, &DoDirectiveClass));
146}
147
148////////////////////////////////////////////////////////////////////////////////
149/// Parse directives
150
151int XrdProofdProtCfg::DoDirective(XrdProofdDirective *d,
152 char *val, XrdOucStream *cfg, bool)
153{
154 if (!d) return -1;
155
156 XrdOucString port(val);
157 if (d->fName == "xrd.protocol") {
158 port = cfg->GetWord();
159 port.replace("xproofd:", "");
160 } else if (d->fName != "port") {
161 return -1;
162 }
163 if (port.length() > 0) {
164 fPort = strtol(port.c_str(), 0, 10);
165 }
166 fPort = (fPort < 0) ? XPD_DEF_PORT : fPort;
167 return 0;
168}
169
170#if (ROOTXRDVERS >= 300030000)
171XrdVERSIONINFO(XrdgetProtocol,xproofd);
172XrdVERSIONINFO(XrdgetProtocolPort,xproofd);
173#endif
174
175extern "C" {
176////////////////////////////////////////////////////////////////////////////////
177/// This protocol is meant to live in a shared library. The interface below is
178/// used by the server to obtain a copy of the protocol object that can be used
179/// to decide whether or not a link is talking a particular protocol.
180
181XrdProtocol *XrdgetProtocol(const char *, char *parms, XrdProtocol_Config *pi)
182{
183 // Return the protocol object to be used if static init succeeds
184 if (XrdProofdProtocol::Configure(parms, pi)) {
185
186 return (XrdProtocol *) new XrdProofdProtocol(pi);
187 }
188 return (XrdProtocol *)0;
189}
190
191////////////////////////////////////////////////////////////////////////////////
192/// This function is called early on to determine the port we need to use. The
193/// The default is ostensibly 1093 but can be overidden; which we allow.
194
195int XrdgetProtocolPort(const char * /*pname*/, char * /*parms*/, XrdProtocol_Config *pi)
196{
197 // Default XPD_DEF_PORT (1093)
198 int port = XPD_DEF_PORT;
199
200 if (pi) {
201 XrdProofdProtCfg pcfg(pi->ConfigFN, pi->eDest);
202 // Init some relevant quantities for tracing
203 XrdProofdTrace = new XrdOucTrace(pi->eDest);
204 pcfg.Config(0);
205
206 if (pcfg.fPort > 0) {
207 port = pcfg.fPort;
208 } else {
209 port = (pi && pi->Port > 0) ? pi->Port : XPD_DEF_PORT;
210 }
211 }
212 return port;
213}}
214
215////////////////////////////////////////////////////////////////////////////////
216/// Protocol constructor
217
219 : XrdProtocol("xproofd protocol handler"), fProtLink(this)
220{
221 fLink = 0;
222 fArgp = 0;
223 fPClient = 0;
224 fSecClient = 0;
225 fAuthProt = 0;
226 fResponses.reserve(10);
227
228 fStdErrFD = (pi && pi->eDest) ? pi->eDest->baseFD() : fileno(stderr);
229
230 // Instantiate a Proofd protocol object
231 Reset();
232}
233
234////////////////////////////////////////////////////////////////////////////////
235/// Get response instance corresponding to stream ID 'sid'
236
238{
239 XPDLOC(ALL, "Protocol::Response")
240
241 TRACE(HDBG, "sid: "<<sid<<", size: "<<fResponses.size());
242
243 if (sid > 0)
244 if (sid <= fResponses.size())
245 return fResponses[sid-1];
246
247 return (XrdProofdResponse *)0;
248}
249
250////////////////////////////////////////////////////////////////////////////////
251/// Create new response instance for stream ID 'sid'
252
254{
255 XPDLOC(ALL, "Protocol::GetNewResponse")
256
257 XrdOucString msg;
258 XPDFORM(msg, "sid: %d", sid);
259 if (sid > 0) {
260 if (sid > fResponses.size()) {
261 if (sid > fResponses.capacity()) {
262 int newsz = (sid < 2 * fResponses.capacity()) ? 2 * fResponses.capacity() : sid+1 ;
263 fResponses.reserve(newsz);
264 if (TRACING(DBG)) {
265 msg += " new capacity: ";
266 msg += (int) fResponses.capacity();
267 }
268 }
269 int nnew = sid - fResponses.size();
270 while (nnew--)
271 fResponses.push_back(new XrdProofdResponse());
272 if (TRACING(DBG)) {
273 msg += "; new size: ";
274 msg += (int) fResponses.size();
275 }
276 }
277 } else {
278 TRACE(XERR,"wrong sid: "<<sid);
279 return (XrdProofdResponse *)0;
280 }
281
282 TRACE(DBG, msg);
283
284 // Done
285 return fResponses[sid-1];
286}
287
288////////////////////////////////////////////////////////////////////////////////
289/// Check whether the request matches this protocol
290
292{
293 XPDLOC(ALL, "Protocol::Match")
294
295 struct ClientInitHandShake hsdata;
296 char *hsbuff = (char *)&hsdata;
297
298 static hs_response_t hsresp = {0, 0, kXR_int32(htonl(XPROOFD_VERSBIN)), 0};
299
300 XrdProtocol *xp = nullptr;
301 int dlen;
302 TRACE(HDBG, "enter");
303
304 XrdOucString emsg;
305 // Peek at the first 20 bytes of data
306 if ((dlen = lp->Peek(hsbuff,sizeof(hsdata),fgReadWait)) != sizeof(hsdata)) {
307 if (dlen <= 0) lp->setEtext("Match: handshake not received");
308 if (dlen == 12) {
309 // Check if it is a request to open a file via 'rootd', unsupported
310 hsdata.first = ntohl(hsdata.first);
311 if (hsdata.first == 8) {
312 emsg = "rootd-file serving not supported any-longer";
313 }
314 if (emsg.length() > 0) {
315 lp->setEtext(emsg.c_str());
316 } else {
317 lp->setEtext("link transfered");
318 }
319 return xp;
320 }
321 TRACE(XERR, "peeked incomplete or empty information! (dlen: "<<dlen<<" bytes)");
322 return xp;
323 }
324
325 // If this is is not our protocol, we check if it a data serving request via xrootd
326 hsdata.third = ntohl(hsdata.third);
327 if (dlen != sizeof(hsdata) || hsdata.first || hsdata.second
328 || !(hsdata.third == 1) || hsdata.fourth || hsdata.fifth) {
329
330 // Check if it is a request to open a file via 'xrootd'
331 if (fgMgr->Xrootd() && (xp = fgMgr->Xrootd()->Match(lp))) {
332 TRACE(ALL, "matched xrootd protocol on link: serving a file");
333 } else {
334 TRACE(XERR, "failed to match any known or enabled protocol");
335 }
336 return xp;
337 }
338
339 // Respond to this request with the handshake response
340 if (!lp->Send((char *)&hsresp, sizeof(hsresp))) {
341 lp->setEtext("Match: handshake failed");
342 TRACE(XERR, "handshake failed");
343 return xp;
344 }
345
346 // We can now read all 20 bytes and discard them (no need to wait for it)
347 int len = sizeof(hsdata);
348 if (lp->Recv(hsbuff, len) != len) {
349 lp->setEtext("Match: reread failed");
350 TRACE(XERR, "reread failed");
351 return xp;
352 }
353
354 // Get a protocol object off the stack (if none, allocate a new one)
355 XrdProofdProtocol *xpp = nullptr;
356 if (!(xpp = fgProtStack.Pop()))
357 xpp = new XrdProofdProtocol();
358
359 // Bind the protocol to the link and return the protocol
360 xpp->fLink = lp;
361 snprintf(xpp->fSecEntity.prot, XrdSecPROTOIDSIZE, "host");
362 xpp->fSecEntity.host = strdup((char *)lp->Host());
363
364 // Dummy data used by 'proofd'
365 kXR_int32 dum[2];
366 if (xpp->GetData("dummy",(char *)&dum[0],sizeof(dum)) != 0) {
367 xpp->Recycle(0,0,0);
368 }
369
370 xp = (XrdProtocol *) xpp;
371
372 // We are done
373 return xp;
374}
375
376////////////////////////////////////////////////////////////////////////////////
377/// Return statistics info about the protocol.
378/// Not really implemented yet: this is a reduced XrdXrootd version.
379
380int XrdProofdProtocol::Stats(char *buff, int blen, int)
381{
382 static char statfmt[] = "<stats id=\"xproofd\"><num>%ld</num></stats>";
383
384 // If caller wants only size, give it to them
385 if (!buff)
386 return sizeof(statfmt)+16;
387
388 // We have only one statistic -- number of successful matches
389 return snprintf(buff, blen, statfmt, fgCount);
390}
391
392////////////////////////////////////////////////////////////////////////////////
393/// Reset static and local vars
394
396{
397 // Init local vars
398 fLink = 0;
399 fPid = -1;
400 fArgp = 0;
401 fStatus = 0;
402 fClntCapVer = 0;
404 fSuperUser = 0;
405 fPClient = 0;
406 fUserIn = "";
407 fGroupIn = "";
408 fCID = -1;
409 fTraceID = "";
410 fAdminPath = "";
411 if (fAuthProt) {
412 fAuthProt->Delete();
413 fAuthProt = 0;
414 }
415 fSecEntity = XrdSecEntity();
416 // Cleanup existing XrdProofdResponse objects
417 std::vector<XrdProofdResponse *>::iterator ii = fResponses.begin(); // One per each logical connection
418 while (ii != fResponses.end()) {
419 (*ii)->Reset();
420 ++ii;
421 }
422}
423
424////////////////////////////////////////////////////////////////////////////////
425/// Protocol configuration tool
426/// Function: Establish configuration at load time.
427/// Output: 1 upon success or 0 otherwise.
428
429int XrdProofdProtocol::Configure(char *parms, XrdProtocol_Config *pi)
430{
431 XPDLOC(ALL, "Protocol::Configure")
432
433 XrdOucString mp;
434
435 // Only once
436 if (fgConfigDone)
437 return 1;
438 fgConfigDone = 1;
439
440 // Copy out the special info we want to use at top level
441 fgLogger = pi->eDest->logger();
442 fgEDest.logger(fgLogger);
443 if (XrdProofdTrace) delete XrdProofdTrace; // It could have been initialized in XrdgetProtocolPort
444 XrdProofdTrace = new XrdOucTrace(&fgEDest);
445 fgBPool = pi->BPool;
446 fgReadWait = pi->readWait;
447
448 // Pre-initialize some i/o values
449 fgMaxBuffsz = fgBPool->MaxSize();
450
451 // Schedule protocol object cleanup; the maximum number of objects
452 // and the max age are taken from XrdXrootdProtocol: this may need
453 // some optimization in the future.
454#if 1
456 fgProtStack.Set((pi->ConnMax/3 ? pi->ConnMax/3 : 30), 60*60);
457#else
458 fgProtStack.Set(pi->Sched, 3600);
459#endif
460
461 // Default tracing options: always trace logins and errors for all
462 // domains; if the '-d' option was specified on the command line then
463 // trace also REQ and FORM.
464 // NB: these are superseeded by settings in the config file (xpd.trace)
466 TRACESET(XERR, 1);
467 TRACESET(LOGIN, 1);
468 TRACESET(RSP, 0);
469 if (pi->DebugON)
471
472 // Work as root to avoid contineous changes of the effective user
473 // (users are logged in their box after forking)
474 fgEUidAtStartup = geteuid();
475 if (!getuid()) XrdSysPriv::ChangePerm((uid_t)0, (gid_t)0);
476
477 // Process the config file for directives meaningful to us
478 // Create and Configure the manager
479 fgMgr = new XrdProofdManager(parms, pi, &fgEDest);
480 if (fgMgr->Config(0)) return 0;
481 mp = "global manager created";
482 TRACE(ALL, mp);
483
484 // Issue herald indicating we configured successfully
485 TRACE(ALL, "xproofd protocol version "<<XPROOFD_VERSION<<
486 " build "<<XrdVERSION<<" successfully loaded");
487
488 // Return success
489 return 1;
490}
491
492////////////////////////////////////////////////////////////////////////////////
493/// Process the information received on the active link.
494/// (We ignore the argument here)
495
497{
498 XPDLOC(ALL, "Protocol::Process")
499
500 int rc = 0;
501 TRACET(TraceID(), DBG, "instance: " << this);
502
503 // Read the next request header
504 if ((rc = GetData("request", (char *)&fRequest, sizeof(fRequest))) != 0)
505 return rc;
506 TRACET(TraceID(), HDBG, "after GetData: rc: " << rc);
507
508 // Deserialize the data
509 fRequest.header.requestid = ntohs(fRequest.header.requestid);
510 fRequest.header.dlen = ntohl(fRequest.header.dlen);
511
512 // Get response object
513 kXR_unt16 sid;
514 memcpy((void *)&sid, (const void *)&(fRequest.header.streamid[0]), 2);
515 XrdProofdResponse *response = 0;
516 if (!(response = Response(sid))) {
517 if (!(response = GetNewResponse(sid))) {
518 TRACET(TraceID(), XERR, "could not get Response instance for rid: "<< sid);
519 return rc;
520 }
521 }
522 // Set the stream ID for the reply
523 response->Set(fRequest.header.streamid);
524 response->Set(fLink);
525
526 TRACET(TraceID(), REQ, "sid: " << sid << ", req id: " << fRequest.header.requestid <<
528 ")" << ", dlen: " <<fRequest.header.dlen);
529
530 // Every request has an associated data length. It better be >= 0 or we won't
531 // be able to know how much data to read.
532 if (fRequest.header.dlen < 0) {
533 response->Send(kXR_ArgInvalid, "Process: Invalid request data length");
534 return fLink->setEtext("Process: protocol data length error");
535 }
536
537 // Read any argument data at this point, except when the request is to forward
538 // a buffer: the argument may have to be segmented and we're not prepared to do
539 // that here.
540 if (fRequest.header.requestid != kXP_sendmsg && fRequest.header.dlen) {
541 if ((fArgp = GetBuff(fRequest.header.dlen+1, fArgp)) == 0) {
542 response->Send(kXR_ArgTooLong, "fRequest.argument is too long");
543 return rc;
544 }
545 if ((rc = GetData("arg", fArgp->buff, fRequest.header.dlen)))
546 return rc;
547 fArgp->buff[fRequest.header.dlen] = '\0';
548 }
549
550 // Continue with request processing at the resume point
551 return Process2();
552}
553
554////////////////////////////////////////////////////////////////////////////////
555/// Local processing method: here the request is dispatched to the appropriate
556/// method
557
559{
560 XPDLOC(ALL, "Protocol::Process2")
561
562 int rc = 0;
563 XPD_SETRESP(this, "Process2");
564
565 TRACET(TraceID(), REQ, "req id: " << fRequest.header.requestid << " (" <<
567
568 ResetCtrlcGuard_t ctrlcguard(this, fRequest.header.requestid);
569
570 // If the user is logged in check if the wanted action is to be done by us
571 if (fStatus && (fStatus & XPD_LOGGEDIN)) {
572 // Record time of the last action
574 // We must have a client instance if here
575 if (!fPClient) {
576 TRACET(TraceID(), XERR, "client undefined!!! ");
577 response->Send(kXR_InvalidRequest,"client undefined!!! ");
578 return 0;
579 }
580 bool formgr = 0;
581 switch(fRequest.header.requestid) {
582 case kXP_ctrlc:
583 rc = CtrlC();
584 break;
585 case kXP_touch:
586 // Reset the asked-to-touch flag, if it was never set
587 fPClient->Touch(1);
588 break;
589 case kXP_interrupt:
590 rc = Interrupt();
591 break;
592 case kXP_ping:
593 rc = Ping();
594 break;
595 case kXP_sendmsg:
596 rc = SendMsg();
597 break;
598 case kXP_urgent:
599 rc = Urgent();
600 break;
601 default:
602 formgr = 1;
603 }
604 if (!formgr) {
605 // Check the link
606 if (!fLink || (fLink->FDnum() <= 0)) {
607 TRACE(XERR, "link is undefined! ");
608 return -1;
609 }
610 return rc;
611 }
612 }
613
614 // The request is for the manager
615 rc = fgMgr->Process(this);
616 // Check the link
617 if (!fLink || (fLink->FDnum() <= 0)) {
618 TRACE(XERR, "link is undefined! ");
619 return -1;
620 }
621 return rc;
622}
623
624////////////////////////////////////////////////////////////////////////////////
625/// Recycle call. Release the instance and give it back to the stack.
626
627void XrdProofdProtocol::Recycle(XrdLink *, int, const char *)
628{
629 XPDLOC(ALL, "Protocol::Recycle")
630
631 const char *srvtype[6] = {"ANY", "MasterWorker", "MasterMaster",
632 "ClientMaster", "Internal", "Admin"};
633 XrdOucString buf;
634
635 // Document the disconnect
636 if (fPClient)
637 XPDFORM(buf, "user %s disconnected; type: %s", fPClient->User(),
638 srvtype[fConnType+1]);
639 else
640 XPDFORM(buf, "user disconnected; type: %s", srvtype[fConnType+1]);
641 TRACET(TraceID(), LOGIN, buf);
642
643 // If we have a buffer, release it
644 if (fArgp) {
645 fgBPool->Release(fArgp);
646 fArgp = 0;
647 }
648
649 // Locate the client instance
651
652 if (pmgr) {
653 if (!Internal()) {
654
655 TRACE(REQ,"External disconnection of protocol associated with pid "<<fPid);
656
657 // Write disconnection file
658 XrdOucString discpath(fAdminPath);
659 discpath.replace("/cid", "/disconnected");
660 FILE *fd = fopen(discpath.c_str(), "w");
661 if (!fd && errno != ENOENT) {
662 TRACE(XERR, "unable to create path: " <<discpath<<" (errno: "<<errno<<")");
663 } else if (fd) {
664 fclose(fd);
665 }
666
667 // Remove protocol and response from attached client/proofserv instances
668 // Set reconnect flag if proofserv instances attached to this client are still running
669 pmgr->ResetClientSlot(fCID);
670 if(fgMgr && fgMgr->SessionMgr()) {
672
674 if((fConnType == 0) && fgMgr->SessionMgr()->Alive(this)) {
675 TRACE(REQ, "Non-destroyed proofserv processes attached to this protocol ("<<this<<
676 "), setting reconnect time");
678 }
680 } else {
681 TRACE(XERR, "No XrdProofdMgr ("<<fgMgr<<") or SessionMgr ("
682 <<(fgMgr ? fgMgr->SessionMgr() : (void *) -1)<<")")
683 }
684
685 } else {
686
687 // Internal connection: we need to remove this instance from the list
688 // of proxy servers and to notify the attached clients.
689 // Tell the session manager that this session has gone
690 if (fgMgr && fgMgr->SessionMgr()) {
692 TRACE(HDBG, "fAdminPath: "<<fAdminPath);
693 buf.assign(fAdminPath, fAdminPath.rfind('/') + 1, -1);
694 fgMgr->SessionMgr()->DeleteFromSessions(buf.c_str());
695 // Move the entry to the terminated sessions area
696 fgMgr->SessionMgr()->MvSession(buf.c_str());
697 }
698 else {
699 TRACE(XERR,"No XrdProofdMgr ("<<fgMgr<<") or SessionMgr ("<<fgMgr->SessionMgr()<<")")
700 }
701 }
702 }
703 // Set fields to starting point (debugging mostly)
704 Reset();
705
706 // Push ourselves on the stack
708#if 0
709 if(fgProtStack.Push(&fProtLink) != 0) {
712 delete xp;
713 }
714#endif
715}
716
717////////////////////////////////////////////////////////////////////////////////
718/// Allocate a buffer to handle quantum bytes; if argp points to an existing
719/// buffer, its size is checked and re-allocated if needed
720
721XrdBuffer *XrdProofdProtocol::GetBuff(int quantum, XrdBuffer *argp)
722{
723 XPDLOC(ALL, "Protocol::GetBuff")
724
725 TRACE(HDBG, "len: "<<quantum);
726
727 // If we are given an existing buffer, we keep it if we use at least half
728 // of it; otherwise we take a smaller one
729 if (argp) {
730 if (quantum >= argp->bsize / 2 && quantum <= argp->bsize)
731 return argp;
732 }
733
734 // Release the buffer if too small
736 if (argp)
737 fgBPool->Release(argp);
738
739 // Obtain a new one
740 if ((argp = fgBPool->Obtain(quantum)) == 0) {
741 TRACE(XERR, "could not get requested buffer (size: "<<quantum<<
742 ") = insufficient memory");
743 } else {
744 TRACE(HDBG, "quantum: "<<quantum<<
745 ", buff: "<<(void *)(argp->buff)<<", bsize:"<<argp->bsize);
746 }
747
748 // Done
749 return argp;
750}
751
752////////////////////////////////////////////////////////////////////////////////
753/// Release a buffer previously allocated via GetBuff
754
756{
758 fgBPool->Release(argp);
759}
760
761////////////////////////////////////////////////////////////////////////////////
762/// Get data from the open link
763
764int XrdProofdProtocol::GetData(const char *dtype, char *buff, int blen)
765{
766 XPDLOC(ALL, "Protocol::GetData")
767
768 int rlen;
769
770 // Read the data but reschedule the link if we have not received all of the
771 // data within the timeout interval.
772 TRACET(TraceID(), HDBG, "dtype: "<<(dtype ? dtype : " - ")<<", blen: "<<blen);
773
774 // No need to lock:the link is disable while we are here
775 rlen = fLink->Recv(buff, blen, fgReadWait);
776 if (rlen < 0) {
777 if (rlen != -ENOMSG && rlen != -ECONNRESET) {
778 XrdOucString emsg = "link read error: errno: ";
779 emsg += -rlen;
780 TRACET(TraceID(), XERR, emsg.c_str());
781 return (fLink ? fLink->setEtext(emsg.c_str()) : -1);
782 } else {
783 TRACET(TraceID(), HDBG, "connection closed by peer (errno: "<<-rlen<<")");
784 return -1;
785 }
786 }
787 if (rlen < blen) {
788 TRACET(TraceID(), DBG, dtype << " timeout; read " <<rlen <<" of " <<blen <<" bytes - rescheduling");
789 return 1;
790 }
791 TRACET(TraceID(), HDBG, "rlen: "<<rlen);
792
793 return 0;
794}
795
796////////////////////////////////////////////////////////////////////////////////
797/// Send data over the open link. Segmentation is done here, if required.
798
800 kXR_int32 sid, XrdSrvBuffer **buf, bool savebuf)
801{
802 XPDLOC(ALL, "Protocol::SendData")
803
804 int rc = 0;
805
806 TRACET(TraceID(), HDBG, "length: "<<fRequest.header.dlen<<" bytes ");
807
808 // Buffer length
809 int len = fRequest.header.dlen;
810
811 // Quantum size
812 int quantum = (len > fgMaxBuffsz ? fgMaxBuffsz : len);
813
814 // Get a buffer
815 XrdBuffer *argp = XrdProofdProtocol::GetBuff(quantum);
816 if (!argp) return -1;
817
818 // Now send over all of the data as unsolicited messages
819 XrdOucString msg;
820 while (len > 0) {
821
822 XrdProofdResponse *response = (sid > -1) ? xps->Response() : 0;
823
824 if ((rc = GetData("data", argp->buff, quantum))) {
825 { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
826 return -1;
827 }
828 if (buf && !(*buf) && savebuf)
829 *buf = new XrdSrvBuffer(argp->buff, quantum, 1);
830 // Send
831 if (sid > -1) {
832 if (TRACING(HDBG))
833 XPDFORM(msg, "EXT: server ID: %d, sending: %d bytes", sid, quantum);
834 if (!response || response->Send(kXR_attn, kXPD_msgsid, sid,
835 argp->buff, quantum) != 0) {
836 { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
837 XPDFORM(msg, "EXT: server ID: %d, problems sending: %d bytes to server",
838 sid, quantum);
839 TRACET(TraceID(), XERR, msg);
840 return -1;
841 }
842 } else {
843
844 // Get ID of the client
845 int cid = ntohl(fRequest.sendrcv.cid);
846 if (TRACING(HDBG))
847 XPDFORM(msg, "INT: client ID: %d, sending: %d bytes", cid, quantum);
848 if (xps->SendData(cid, argp->buff, quantum) != 0) {
849 { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
850 XPDFORM(msg, "INT: client ID: %d, problems sending: %d bytes to client",
851 cid, quantum);
852 TRACET(TraceID(), XERR, msg);
853 return -1;
854 }
855 }
856 TRACET(TraceID(), HDBG, msg);
857 // Next segment
858 len -= quantum;
859 if (len < quantum)
860 quantum = len;
861 }
862
863 // Release the buffer
864 { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
865
866 // Done
867 return 0;
868}
869
870////////////////////////////////////////////////////////////////////////////////
871/// Send data over the open client links of session 'xps'.
872/// Used when all the connected clients are eligible to receive the message.
873/// Segmentation is done here, if required.
874
876 XrdSrvBuffer **buf, bool savebuf)
877{
878 XPDLOC(ALL, "Protocol::SendDataN")
879
880 int rc = 0;
881
882 TRACET(TraceID(), HDBG, "length: "<<fRequest.header.dlen<<" bytes ");
883
884 // Buffer length
885 int len = fRequest.header.dlen;
886
887 // Quantum size
888 int quantum = (len > fgMaxBuffsz ? fgMaxBuffsz : len);
889
890 // Get a buffer
891 XrdBuffer *argp = XrdProofdProtocol::GetBuff(quantum);
892 if (!argp) return -1;
893
894 // Now send over all of the data as unsolicited messages
895 while (len > 0) {
896 if ((rc = GetData("data", argp->buff, quantum))) {
898 return -1;
899 }
900 if (buf && !(*buf) && savebuf)
901 *buf = new XrdSrvBuffer(argp->buff, quantum, 1);
902
903 // Send to connected clients
904 if (xps->SendDataN(argp->buff, quantum) != 0) {
906 return -1;
907 }
908
909 // Next segment
910 len -= quantum;
911 if (len < quantum)
912 quantum = len;
913 }
914
915 // Release the buffer
917
918 // Done
919 return 0;
920}
921
922////////////////////////////////////////////////////////////////////////////////
923/// Handle a request to forward a message to another process
924
926{
927 XPDLOC(ALL, "Protocol::SendMsg")
928
929 static const char *crecv[5] = {"master proofserv", "top master",
930 "client", "undefined", "any"};
931 int rc = 0;
932
933 XPD_SETRESP(this, "SendMsg");
934
935 // Unmarshall the data
936 int psid = ntohl(fRequest.sendrcv.sid);
937 int opt = ntohl(fRequest.sendrcv.opt);
938
939 XrdOucString msg;
940 // Find server session
941 XrdProofdProofServ *xps = 0;
942 if (!fPClient || !(xps = fPClient->GetServer(psid))) {
943 XPDFORM(msg, "%s: session ID not found: %d", (Internal() ? "INT" : "EXT"), psid);
944 TRACET(TraceID(), XERR, msg.c_str());
945 response->Send(kXR_InvalidRequest, msg.c_str());
946 return 0;
947 }
948
949 // Message length
950 int len = fRequest.header.dlen;
951
952 if (!Internal()) {
953
954 if (TRACING(HDBG)) {
955 // Notify
956 XPDFORM(msg, "EXT: sending %d bytes to proofserv (psid: %d, xps: %p, status: %d,"
957 " cid: %d)", len, psid, xps, xps->Status(), fCID);
958 TRACET(TraceID(), HDBG, msg.c_str());
959 }
960
961 // Send to proofsrv our client ID
962 if (fCID == -1) {
963 TRACET(TraceID(), REQ, "EXT: error getting clientSID");
964 response->Send(kXP_ServerError,"EXT: getting clientSID");
965 return 0;
966 }
967 if (SendData(xps, fCID)) {
968 TRACET(TraceID(), REQ, "EXT: error sending message to proofserv");
969 response->Send(kXP_reconnecting,"EXT: sending message to proofserv");
970 return 0;
971 }
972
973 // Notify to user
974 response->Send();
975
976 } else {
977
978 if (TRACING(HDBG)) {
979 // Notify
980 XPDFORM(msg, "INT: sending %d bytes to client/master (psid: %d, xps: %p, status: %d)",
981 len, psid, xps, xps->Status());
982 TRACET(TraceID(), HDBG, msg.c_str());
983 }
984 bool saveStartMsg = 0;
985 XrdSrvBuffer *savedBuf = 0;
986 // Additional info about the message
987 if (opt & kXPD_setidle) {
988 TRACET(TraceID(), DBG, "INT: setting proofserv in 'idle' state");
989 xps->SetStatus(kXPD_idle);
990 PostSession(-1, fPClient->UI().fUser.c_str(),
991 fPClient->UI().fGroup.c_str(), xps);
992 } else if (opt & kXPD_querynum) {
993 TRACET(TraceID(), DBG, "INT: got message with query number");
994 } else if (opt & kXPD_startprocess) {
995 TRACET(TraceID(), DBG, "INT: setting proofserv in 'running' state");
997 PostSession(1, fPClient->UI().fUser.c_str(),
998 fPClient->UI().fGroup.c_str(), xps);
999 // Save start processing message for later clients
1000 xps->DeleteStartMsg();
1001 saveStartMsg = 1;
1002 } else if (opt & kXPD_logmsg) {
1003 // We broadcast log messages only not idle to catch the
1004 // result from processing
1005 if (xps->Status() == kXPD_running) {
1006 TRACET(TraceID(), DBG, "INT: broadcasting log message");
1007 opt |= kXPD_fb_prog;
1008 }
1009 }
1010 bool fbprog = (opt & kXPD_fb_prog);
1011
1012 if (!fbprog) {
1013 //
1014 // The message is strictly for the client requiring it
1015 if (SendData(xps, -1, &savedBuf, saveStartMsg) != 0) {
1016 response->Send(kXP_reconnecting,
1017 "SendMsg: INT: session is reconnecting: retry later");
1018 return 0;
1019 }
1020 } else {
1021 // Send to all connected clients
1022 if (SendDataN(xps, &savedBuf, saveStartMsg) != 0) {
1023 response->Send(kXP_reconnecting,
1024 "SendMsg: INT: session is reconnecting: retry later");
1025 return 0;
1026 }
1027 }
1028 // Save start processing messages, if required
1029 if (saveStartMsg)
1030 xps->SetStartMsg(savedBuf);
1031
1032 if (TRACING(DBG)) {
1033 int ii = xps->SrvType();
1034 if (ii > 3) ii = 3;
1035 if (ii < 0) ii = 4;
1036 XPDFORM(msg, "INT: message sent to %s (%d bytes)", crecv[ii], len);
1037 TRACET(TraceID(), DBG, msg);
1038 }
1039 // Notify to proofsrv
1040 response->Send();
1041 }
1042
1043 // Over
1044 return 0;
1045}
1046
1047////////////////////////////////////////////////////////////////////////////////
1048/// Handle generic request of a urgent message to be forwarded to the server
1049
1051{
1052 XPDLOC(ALL, "Protocol::Urgent")
1053
1054 unsigned int rc = 0;
1055
1056 XPD_SETRESP(this, "Urgent");
1057
1058 // Unmarshall the data
1059 int psid = ntohl(fRequest.proof.sid);
1060 int type = ntohl(fRequest.proof.int1);
1061 int int1 = ntohl(fRequest.proof.int2);
1062 int int2 = ntohl(fRequest.proof.int3);
1063
1064 TRACET(TraceID(), REQ, "psid: "<<psid<<", type: "<< type);
1065
1066 // Find server session
1067 XrdProofdProofServ *xps = 0;
1068 if (!fPClient || !(xps = fPClient->GetServer(psid))) {
1069 TRACET(TraceID(), XERR, "session ID not found: "<<psid);
1070 response->Send(kXR_InvalidRequest,"Urgent: session ID not found");
1071 return 0;
1072 }
1073
1074 TRACET(TraceID(), DBG, "xps: "<<xps<<", status: "<<xps->Status());
1075
1076 // Check ID matching
1077 if (!xps->Match(psid)) {
1078 response->Send(kXP_InvalidRequest,"Urgent: IDs do not match - do nothing");
1079 return 0;
1080 }
1081
1082 // Check the link to the session
1083 if (!xps->Response()) {
1084 response->Send(kXP_InvalidRequest,"Urgent: session response object undefined - do nothing");
1085 return 0;
1086 }
1087
1088 // Prepare buffer
1089 int len = 3 *sizeof(kXR_int32);
1090 char *buf = new char[len];
1091 // Type
1092 kXR_int32 itmp = static_cast<kXR_int32>(htonl(type));
1093 memcpy(buf, &itmp, sizeof(kXR_int32));
1094 // First info container
1095 itmp = static_cast<kXR_int32>(htonl(int1));
1096 memcpy(buf + sizeof(kXR_int32), &itmp, sizeof(kXR_int32));
1097 // Second info container
1098 itmp = static_cast<kXR_int32>(htonl(int2));
1099 memcpy(buf + 2 * sizeof(kXR_int32), &itmp, sizeof(kXR_int32));
1100 // Send over
1101 if (xps->Response()->Send(kXR_attn, kXPD_urgent, buf, len) != 0) {
1102 response->Send(kXP_ServerError,
1103 "Urgent: could not propagate request to proofsrv");
1104 return 0;
1105 }
1106
1107 // Notify to user
1108 response->Send();
1109 TRACET(TraceID(), DBG, "request propagated to proofsrv");
1110
1111 // Over
1112 return 0;
1113}
1114
1115////////////////////////////////////////////////////////////////////////////////
1116/// Handle an interrupt request
1117
1119{
1120 XPDLOC(ALL, "Protocol::Interrupt")
1121
1122 int rc = 0;
1123
1124 XPD_SETRESP(this, "Interrupt");
1125
1126 // Unmarshall the data
1127 int psid = ntohl(fRequest.interrupt.sid);
1128 int type = ntohl(fRequest.interrupt.type);
1129 TRACET(TraceID(), REQ, "psid: "<<psid<<", type:"<<type);
1130
1131 // Find server session
1132 XrdProofdProofServ *xps = 0;
1133 if (!fPClient || !(xps = fPClient->GetServer(psid))) {
1134 TRACET(TraceID(), XERR, "session ID not found: "<<psid);
1135 response->Send(kXR_InvalidRequest,"Interrupt: session ID not found");
1136 return 0;
1137 }
1138
1139 if (xps) {
1140
1141 // Check ID matching
1142 if (!xps->Match(psid)) {
1143 response->Send(kXP_InvalidRequest,"Interrupt: IDs do not match - do nothing");
1144 return 0;
1145 }
1146
1147 XrdOucString msg;
1148 XPDFORM(msg, "xps: %p, link ID: %s, proofsrv PID: %d",
1149 xps, xps->Response()->TraceID(), xps->SrvPID());
1150 TRACET(TraceID(), DBG, msg.c_str());
1151
1152 // Propagate the type as unsolicited
1153 if (xps->Response()->Send(kXR_attn, kXPD_interrupt, type) != 0) {
1154 response->Send(kXP_ServerError,
1155 "Interrupt: could not propagate interrupt code to proofsrv");
1156 return 0;
1157 }
1158
1159 // Notify to user
1160 response->Send();
1161 TRACET(TraceID(), DBG, "interrupt propagated to proofsrv");
1162 }
1163
1164 // Over
1165 return 0;
1166}
1167
1168////////////////////////////////////////////////////////////////////////////////
1169/// Handle a ping request.
1170/// For internal connections, ping is done asynchronously to avoid locking
1171/// problems; the session checker verifies that the admin file has been touched
1172/// recently enough; touching is done in Process2, so we have nothing to do here
1173
1175{
1176 XPDLOC(ALL, "Protocol::Ping")
1177
1178 int rc = 0;
1179 if (Internal()) {
1180 if (TRACING(HDBG)) {
1181 XPD_SETRESP(this, "Ping");
1182 TRACET(TraceID(), HDBG, "INT: nothing to do ");
1183 }
1184 return 0;
1185 }
1186 XPD_SETRESP(this, "Ping");
1187
1188 // Unmarshall the data
1189 int psid = ntohl(fRequest.sendrcv.sid);
1190 int asyncopt = ntohl(fRequest.sendrcv.opt);
1191
1192 TRACET(TraceID(), REQ, "psid: "<<psid<<", async: "<<asyncopt);
1193
1194 // For connections to servers find the server session; manager connections
1195 // (psid == -1) do not have any session attached
1196 XrdProofdProofServ *xps = 0;
1197 if (!fPClient || (psid > -1 && !(xps = fPClient->GetServer(psid)))) {
1198 TRACET(TraceID(), XERR, "session ID not found: "<<psid);
1199 response->Send(kXR_InvalidRequest,"session ID not found");
1200 return 0;
1201 }
1202
1203 // For manager connections we are done
1204 kXR_int32 pingres = (psid > -1) ? 0 : 1;
1205 if (psid > -1 && xps && xps->IsValid()) {
1206
1207 TRACET(TraceID(), DBG, "EXT: psid: "<<psid);
1208
1209 // This is the max time we will privide an answer
1210 kXR_int32 checkfq = fgMgr->SessionMgr()->CheckFrequency();
1211
1212 // If asynchronous return the timeout for an answer
1213 if (asyncopt == 1) {
1214 TRACET(TraceID(), DBG, "EXT: async: notifying timeout to client: "<<checkfq<<" secs");
1215 response->Send(kXR_ok, checkfq);
1216 }
1217
1218 // Admin path
1219 XrdOucString path(xps->AdminPath());
1220 if (path.length() <= 0) {
1221 TRACET(TraceID(), XERR, "EXT: admin path is empty! - protocol error");
1222 if (asyncopt == 0)
1223 response->Send(kXP_ServerError, "EXT: admin path is empty! - protocol error");
1224 return 0;
1225 }
1226 path += ".status";
1227
1228 // Current time
1229 int now = time(0);
1230
1231 // Stat the admin file
1232 struct stat st0;
1233 if (stat(path.c_str(), &st0) != 0) {
1234 TRACET(TraceID(), XERR, "EXT: cannot stat admin path: "<<path);
1235 if (asyncopt == 0)
1236 response->Send(kXP_ServerError, "EXT: cannot stat admin path");
1237 return 0;
1238 }
1239
1240 // Take the pid
1241 int pid = xps->SrvPID();
1242 // If the session is alive ...
1243 if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
1244 // If it as not touched during the last ~checkfq secs we ask for a refresh
1245 if ((now - st0.st_mtime) > checkfq - 5) {
1246 // Send the request (asking for further propagation)
1247 if (xps->VerifyProofServ(1) != 0) {
1248 TRACET(TraceID(), XERR, "EXT: could not send verify request to proofsrv");
1249 if (asyncopt == 0)
1250 response->Send(kXP_ServerError, "EXT: could not verify reuqest to proofsrv");
1251 return 0;
1252 }
1253 // Wait for the action for checkfq secs, checking every 1 sec
1254 struct stat st1;
1255 int ns = checkfq;
1256 while (ns--) {
1257 if (stat(path.c_str(), &st1) == 0) {
1258 if (st1.st_mtime > st0.st_mtime) {
1259 pingres = 1;
1260 break;
1261 }
1262 }
1263 // Wait 1 sec
1264 TRACET(TraceID(), DBG, "EXT: waiting "<<ns<<" secs for session "<<pid<<
1265 " to touch the admin path");
1266 sleep(1);
1267 }
1268
1269 } else {
1270 // Session is alive
1271 pingres = 1;
1272 }
1273 } else {
1274 // Session is dead
1275 pingres = 0;
1276 }
1277
1278 // Notify the client
1279 TRACET(TraceID(), DBG, "EXT: notified the result to client: "<<pingres);
1280 if (asyncopt == 0) {
1281 response->Send(kXR_ok, pingres);
1282 } else {
1283 // Prepare buffer for asynchronous notification
1284 int len = sizeof(kXR_int32);
1285 char *buf = new char[len];
1286 // Option
1287 kXR_int32 ifw = (kXR_int32)0;
1288 ifw = static_cast<kXR_int32>(htonl(ifw));
1289 memcpy(buf, &ifw, sizeof(kXR_int32));
1290 response->Send(kXR_attn, kXPD_ping, buf, len);
1291 }
1292 return 0;
1293 } else if (psid > -1) {
1294 // This is a failure for connections to sessions
1295 TRACET(TraceID(), XERR, "session ID not found: "<<psid);
1296 }
1297
1298 // Send the result
1299 response->Send(kXR_ok, pingres);
1300
1301 // Done
1302 return 0;
1303}
1304
1305////////////////////////////////////////////////////////////////////////////////
1306/// Post change of session status
1307
1308void XrdProofdProtocol::PostSession(int on, const char *u, const char *g,
1309 XrdProofdProofServ *xps)
1310{
1311 XPDLOC(ALL, "Protocol::PostSession")
1312
1313 // Tell the priority manager
1314 if (fgMgr && fgMgr->PriorityMgr()) {
1315 int pid = (xps) ? xps->SrvPID() : -1;
1316 if (pid < 0) {
1317 TRACE(XERR, "undefined session or process id");
1318 return;
1319 }
1320 XrdOucString buf;
1321 XPDFORM(buf, "%d %s %s %d", on, u, g, pid);
1322
1324 buf.c_str()) != 0) {
1325 TRACE(XERR, "problem posting the prority manager pipe");
1326 }
1327 }
1328 // Tell the scheduler
1329 if (fgMgr && fgMgr->ProofSched()) {
1330 if (on == -1 && xps && xps->SrvType() == kXPD_TopMaster) {
1331 TRACE(DBG, "posting the scheduler pipe");
1332 if (fgMgr->ProofSched()->Pipe()->Post(XrdProofSched::kReschedule, 0) != 0) {
1333 TRACE(XERR, "problem posting the scheduler pipe");
1334 }
1335 }
1336 }
1337 // Tell the session manager
1338 if (fgMgr && fgMgr->SessionMgr()) {
1340 TRACE(XERR, "problem posting the session manager pipe");
1341 }
1342 }
1343 // Done
1344 return;
1345}
1346
1347////////////////////////////////////////////////////////////////////////////////
1348/// Recording time of the last request on this instance
1349
1351{
1352 XPDLOC(ALL, "Protocol::TouchAdminPath")
1353
1354 XPD_SETRESPV(this, "TouchAdminPath");
1355 TRACET(TraceID(), HDBG, fAdminPath);
1356
1357 if (fAdminPath.length() > 0) {
1358 int rc = 0;
1359 if ((rc = XrdProofdAux::Touch(fAdminPath.c_str())) != 0) {
1360 // In the case the file was not found and the connetion is internal
1361 // try also the terminated sessions, as the file could have been moved
1362 // in the meanwhile
1363 XrdOucString apath = fAdminPath;
1364 if (rc == -ENOENT && Internal()) {
1365 apath.replace("/activesessions/", "/terminatedsessions/");
1366 apath.replace(".status", "");
1367 rc = XrdProofdAux::Touch(apath.c_str());
1368 }
1369 if (rc != 0 && rc != -ENOENT) {
1370 const char *type = Internal() ? "internal" : "external";
1371 TRACET(TraceID(), XERR, type<<": problems touching "<<apath<<"; errno: "<<-rc);
1372 }
1373 }
1374 }
1375 // Done
1376 return;
1377}
1378
1379////////////////////////////////////////////////////////////////////////////////
1380/// Set and propagate a Ctrl-C request
1381
1383{
1384 XPDLOC(ALL, "Protocol::CtrlC")
1385
1386 TRACET(TraceID(), ALL, "handling request");
1387
1389 fIsCtrlC = 1;
1390 }
1391
1392 // Propagate now
1393 if (fgMgr) {
1394 if (fgMgr->SrvType() != kXPD_Worker) {
1395 if (fgMgr->NetMgr()) {
1396 fgMgr->NetMgr()->BroadcastCtrlC(Client()->User());
1397 }
1398 }
1399 }
1400
1401 // Over
1402 return 0;
1403}
#define d(i)
Definition: RSha256.hxx:102
#define g(i)
Definition: RSha256.hxx:105
#define TRACE(Flag, Args)
Definition: TGHtml.h:120
int type
Definition: TGX11.cxx:120
R__EXTERN C unsigned int sleep(unsigned int seconds)
#define kXPD_logmsg
@ kXP_sendmsg
@ kXP_ping
@ kXP_urgent
@ kXP_touch
@ kXP_interrupt
@ kXP_ctrlc
#define kXPD_ClientMaster
@ kXPD_running
@ kXPD_idle
#define kXPD_TopMaster
#define XPD_DEF_PORT
#define kXPD_startprocess
@ kXP_reconnecting
@ kXP_InvalidRequest
@ kXP_ServerError
#define kXPD_querynum
#define kXPD_fb_prog
#define kXPD_Worker
#define kXPD_setidle
@ kXPD_msgsid
@ kXPD_urgent
@ kXPD_interrupt
@ kXPD_ping
#define XrdSysError
Definition: XpdSysError.h:8
#define XrdSysLogger
Definition: XpdSysLogger.h:8
#define XPDFORM
Definition: XrdProofdAux.h:378
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
#define XPD_LOGGEDIN
static XrdSysLogger gMainLogger
XrdProtocol * XrdgetProtocol(const char *, char *parms, XrdProtocol_Config *pi)
This protocol is meant to live in a shared library.
int XrdgetProtocolPort(const char *, char *, XrdProtocol_Config *pi)
This function is called early on to determine the port we need to use.
struct ResetCtrlcGuard ResetCtrlcGuard_t
XrdOucTrace * XrdProofdTrace
#define XPROOFD_VERSBIN
#define XPROOFD_VERSION
#define XPD_SETRESPV(p, x)
#define XPD_SETRESP(p, x)
#define TRACE_DOMAINS
#define XPDLOC(d, x)
#define TRACESET(act, on)
#define TRACE_FORK
#define TRACE_REQ
#define TRACE_MEM
#define TRACET(tid, act, x)
#define TRACING(x)
#define XrdSysMutexHelper
Definition: XrdSysToOuc.h:17
#define XrdSysRecMutex
Definition: XrdSysToOuc.h:18
#define snprintf
Definition: civetweb.c:1540
void Set(int inQMax, time_t agemax=1800)
Lock the data area and set the values.
Definition: XpdObject.cxx:64
void Push(XpdObject *Node)
Push back a protocol.
Definition: XpdObject.cxx:47
XrdProofdProtocol * Pop()
Pop up a protocol object.
Definition: XpdObject.cxx:31
XrdProofdProtocol * objectItem()
Definition: XpdObject.h:43
void setItem(XrdProofdProtocol *ival)
Definition: XpdObject.h:49
XrdProofdPipe * Pipe()
XrdOucString fUser
Definition: XrdProofdAux.h:40
XrdOucString fGroup
Definition: XrdProofdAux.h:41
static int Touch(const char *path, int opt=0)
Set access (opt == 1), modify (opt =2 ) or access&modify (opt = 0, default) times of path to current ...
static int VerifyProcessByID(int pid, const char *pname="proofserv")
Check if a process named 'pname' and process 'pid' is still in the process table.
static const char * ProofRequestTypes(int type)
Translates the proof request type in a human readable string.
XrdProofUI UI() const
const char * User() const
int ResetClientSlot(int ic)
Reset slot at 'ic'.
XrdProofdProofServ * GetServer(int psid)
Get from the vector server instance with ID psid.
int Touch(bool reset=0)
Send a touch the connected clients: this will remotely touch the associated TSocket instance and sche...
virtual int DoDirective(XrdProofdDirective *, char *, XrdOucStream *, bool)
virtual void RegisterDirectives()
XrdProofdPriorityMgr * PriorityMgr() const
XrdProofSched * ProofSched() const
XrdProofdNetMgr * NetMgr() const
int Process(XrdProofdProtocol *p)
Process manager request.
XrdProtocol * Xrootd() const
int SrvType() const
XrdProofdProofServMgr * SessionMgr() const
int Config(bool rcf=0)
Run configuration and parse the entered config directives.
int BroadcastCtrlC(const char *usr)
Broadcast a ctrlc interrupt Return 0 on success, -1 on error.
int Post(int type, const char *msg)
Post message on the pipe.
XrdProofdPipe * Pipe()
int MvSession(const char *fpid)
Move session file from the active to the terminated areas.
void DisconnectFromProofServ(int pid)
Change reconnecting status.
int CheckActiveSessions(bool verify=1)
Go through the active sessions admin path and make sure sessions are alive.
bool Alive(XrdProofdProtocol *p)
Check destroyed status.
int DeleteFromSessions(const char *pid)
Delete from the hash list the session with ID pid.
void SetReconnectTime(bool on=1)
Change reconnecting status.
XrdProofdResponse * Response() const
const char * AdminPath() const
int SendData(int cid, void *buff, int len)
Send data to client cid.
int VerifyProofServ(bool fw)
Check if the associated proofserv process is alive.
int SendDataN(void *buff, int len)
Send data over the open client links of this session.
bool Match(short int id) const
void SetStartMsg(XrdSrvBuffer *sm)
static int Configure(char *parms, XrdProtocol_Config *pi)
Protocol configuration tool Function: Establish configuration at load time.
void Recycle(XrdLink *lp, int x, const char *y)
Recycle call. Release the instance and give it back to the stack.
int SendMsg()
Handle a request to forward a message to another process.
int SendData(XrdProofdProofServ *xps, kXR_int32 sid=-1, XrdSrvBuffer **buf=0, bool sb=0)
Send data over the open link. Segmentation is done here, if required.
XrdProofdResponse * Response(kXR_unt16 rid)
Get response instance corresponding to stream ID 'sid'.
XrdSecEntity fSecEntity
XrdProofdResponse * GetNewResponse(kXR_unt16 rid)
Create new response instance for stream ID 'sid'.
static bool fgConfigDone
static XrdSysRecMutex fgBMutex
static void PostSession(int on, const char *u, const char *g, XrdProofdProofServ *xps)
Post change of session status.
static XrdSysError fgEDest
XrdProofdClient * fPClient
int SendDataN(XrdProofdProofServ *xps, XrdSrvBuffer **buf=0, bool sb=0)
Send data over the open client links of session 'xps'.
static XpdObjectQ fgProtStack
XrdProtocol * Match(XrdLink *lp)
Check whether the request matches this protocol.
XrdProofdProtocol(XrdProtocol_Config *pi=0)
Protocol constructor.
static XrdProofdManager * fgMgr
XrdProofdClient * Client() const
int Process2()
Local processing method: here the request is dispatched to the appropriate method.
int CtrlC()
Set and propagate a Ctrl-C request.
int Stats(char *buff, int blen, int do_sync)
Return statistics info about the protocol.
static XrdBuffManager * fgBPool
std::vector< XrdProofdResponse * > fResponses
XrdSysRecMutex fCtrlcMutex
XrdSecProtocol * fAuthProt
XrdOucString fAdminPath
void Reset()
Reset static and local vars.
int Process(XrdLink *lp)
Process the information received on the active link.
static XrdSysLogger * fgLogger
int Interrupt()
Handle an interrupt request.
static XrdBuffer * GetBuff(int quantum, XrdBuffer *argp=0)
Allocate a buffer to handle quantum bytes; if argp points to an existing buffer, its size is checked ...
void TouchAdminPath()
Recording time of the last request on this instance.
int Urgent()
Handle generic request of a urgent message to be forwarded to the server.
XPClientRequest fRequest
static int fgEUidAtStartup
const char * TraceID() const
XrdSecEntity * fSecClient
unsigned char fClntCapVer
int GetData(const char *dtype, char *buff, int blen)
Get data from the open link.
static void ReleaseBuff(XrdBuffer *argp)
Release a buffer previously allocated via GetBuff.
int Ping()
Handle a ping request.
void Set(XrdLink *l)
Set the link to be used by this response.
int Send(void)
Auxilliary Send method.
const char * TraceID() const
static int ChangePerm(uid_t uid, gid_t gid)
static constexpr double pi
static constexpr double ns
struct ClientRequestHdr header
struct XPClientProofRequest proof
struct XPClientInterruptRequest interrupt
struct XPClientSendRcvRequest sendrcv