ROOT  6.06/09
Reference Guide
TXSocket.h
Go to the documentation of this file.
1 // @(#)root/proofx:$Id$
2 // Author: G. Ganis Oct 2005
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TXSocket
13 #define ROOT_TXSocket
14 
15 //////////////////////////////////////////////////////////////////////////
16 // //
17 // TXSocket //
18 // //
19 // High level handler of connections to xproofd. //
20 // //
21 //////////////////////////////////////////////////////////////////////////
22 
23 #define DFLT_CONNECTMAXTRY 10
24 
25 #ifndef ROOT_TMutex
26 #include "TMutex.h"
27 #endif
28 #ifndef ROOT_TSemaphore
29 #include "TSemaphore.h"
30 #endif
31 #ifndef ROOT_TString
32 #include "TString.h"
33 #endif
34 #ifndef ROOT_TList
35 #include "TList.h"
36 #endif
37 #ifndef ROOT_TMessage
38 #include "TMessage.h"
39 #endif
40 #ifndef ROOT_TUrl
41 #include "TUrl.h"
42 #endif
43 #ifndef ROOT_TSocket
44 #include "TSocket.h"
45 #endif
46 #ifndef ROOT_XrdProofConn
47 #include "XrdProofConn.h"
48 #endif
49 #ifndef XRC_UNSOLMSG_H
50 #include "XrdClient/XrdClientUnsolMsg.hh"
51 #endif
52 
53 #include <list>
54 
55 class TObjString;
56 class TXSockBuf;
57 class TXSockPipe;
58 class TXHandler;
59 class TXSocketHandler;
60 class XrdClientMessage;
61 
62 // To transmit info to Handlers
63 typedef struct {
68 } XHandleIn_t;
69 typedef struct {
71  const char *fMsg;
72 } XHandleErr_t;
73 
74 class TXSocket : public TSocket, public XrdClientAbsUnsolMsgHandler {
75 
76 friend class TXProofMgr;
77 friend class TXProofServ;
78 friend class TXSlave;
79 friend class TXSocketHandler;
80 friend class TXSockPipe;
81 friend class TXUnixSocket;
82 
83 private:
84  char fMode; // 'e' (def) or 'i' (internal - proofsrv)
85  kXR_int32 fSendOpt; // Options for sending messages
86  Short_t fSessionID; // proofsrv: remote ID of connected session
87  TString fUser; // Username used for login
88  TString fHost; // Remote host
89  Int_t fPort; // Remote port
90 
91  Int_t fLogLevel; // Log level to be transmitted to servers
92 
93  TString fBuffer; // Container for exchanging information
94  TObject *fReference; // Generic object reference of this socket
95  TXHandler *fHandler; // Handler of asynchronous events (input, error)
96 
97  XrdProofConn *fConn; // instance of the underlying connection module
98 
99  // Asynchronous messages
100  TSemaphore fASem; // Control access to conn async msg queue
101  TMutex *fAMtx; // To protect async msg queue
102  Bool_t fAWait; // kTRUE if waiting at the async msg queue
103  std::list<TXSockBuf *> fAQue; // list of asynchronous messages
104  Int_t fByteLeft; // bytes left in the first buffer
105  Int_t fByteCur; // current position in the first buffer
106  TXSockBuf *fBufCur; // current read buffer
107 
108  TSemaphore fAsynProc; // Control actions while processing async messages
109 
110  // Interrupts
111  TMutex *fIMtx; // To protect interrupt queue
112  kXR_int32 fILev; // Highest received interrupt
113  Bool_t fIForward; // Whether the interrupt should be propagated
114 
115  // Process ID of the instatiating process (to signal interrupts)
117 
118  // Whether to timeout or not
119  Bool_t fDontTimeout; // If true wait forever for incoming messages
120  Bool_t fRDInterrupt; // To interrupt waiting for messages
121 
122  // Version of the remote XrdProofdProtocol
124 
125  // Static area for input handling
126  static TXSockPipe fgPipe; // Pipe for input monitoring
127  static TString fgLoc; // Location string
128  static Bool_t fgInitDone; // Avoid initializing more than once
129 
130  // List of spare buffers
131  static TMutex fgSMtx; // To protect spare list
132  static std::list<TXSockBuf *> fgSQue; // list of spare buffers
133 
134  // Manage asynchronous message
135  Int_t PickUpReady();
137  void PushBackSpare();
138 
139  // Post a message into the queue for asynchronous processing
140  void PostMsg(Int_t type, const char *msg = 0);
141 
142  // Auxilliary
143  Int_t GetLowSocket() const { return (fConn ? fConn->GetLowSocket() : -1); }
144 
145  static void SetLocation(const char *loc = ""); // Set location string
146 
147  static void InitEnvs(); // Initialize environment variables
148 
149 public:
150  // Should be the same as in proofd/src/XrdProofdProtocol::Urgent
151  enum EUrgentMsgType { kStopProcess = 2000 };
152 
153  TXSocket(const char *url, Char_t mode = 'M', Int_t psid = -1, Char_t ver = -1,
154  const char *logbuf = 0, Int_t loglevel = -1, TXHandler *handler = 0);
155 #if 0
156  TXSocket(const TXSocket &xs);
157  TXSocket& operator=(const TXSocket& xs);
158 #endif
159  virtual ~TXSocket();
160 
161  virtual void Close(Option_t *opt = "");
162  Bool_t Create(Bool_t attach = kFALSE);
163  void DisconnectSession(Int_t id, Option_t *opt = "");
164 
165  void DoError(int level,
166  const char *location, const char *fmt, va_list va) const;
167 
168  virtual UnsolRespProcResult ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *s,
169  XrdClientMessage *msg);
170 
171  virtual Int_t GetClientID() const { return -1; }
172  virtual Int_t GetClientIDSize() const { return 1; }
173  Int_t GetLogConnID() const { return (fConn ? fConn->GetLogConnID() : -1); }
174  Int_t GetOpenError() const { return (fConn ? fConn->GetOpenError() : -1); }
175  Int_t GetServType() const { return (fConn ? fConn->GetServType() : -1); }
176  Int_t GetSessionID() const { return (fConn ? fConn->GetSessionID() : -1); }
178 
179  Bool_t IsValid() const { return (fConn ? (fConn->IsValid()) : kFALSE); }
181  virtual void RemoveClientID() { }
182  virtual void SetClientID(Int_t) { }
183  void SetSendOpt(ESendRecvOptions o) { fSendOpt = o; }
184  void SetSessionID(Int_t id);
185 
186  // Send interfaces
187  Int_t Send(const TMessage &mess);
188  Int_t Send(Int_t kind) { return TSocket::Send(kind); }
190  { return TSocket::Send(status, kind); }
191  Int_t Send(const char *mess, Int_t kind = kMESS_STRING)
192  { return TSocket::Send(mess, kind); }
193  Int_t SendRaw(const void *buf, Int_t len,
195 
196  TObjString *SendCoordinator(Int_t kind, const char *msg = 0, Int_t int2 = 0,
197  Long64_t l64 = 0, Int_t int3 = 0, const char *opt = 0);
198 
199  // Recv interfaces
200  Int_t Recv(TMessage *&mess);
202  { return TSocket::Recv(status, kind); }
203  Int_t Recv(char *mess, Int_t max)
204  { return TSocket::Recv(mess, max); }
205  Int_t Recv(char *mess, Int_t max, Int_t &kind)
206  { return TSocket::Recv(mess, max, kind); }
207  Int_t RecvRaw(void *buf, Int_t len,
208  ESendRecvOptions opt = kDefault);
209 
210  // Interrupts
212  Int_t GetInterrupt(Bool_t &forward);
213 
214  // Urgent message
215  void SendUrgent(Int_t type, Int_t int1, Int_t int2);
216 
217  // Interrupt the low level socket
218  inline void SetInterrupt(Bool_t i = kTRUE) { R__LOCKGUARD(fAMtx);
219  fRDInterrupt = i;
220  if (i && fConn) fConn->SetInterrupt();
221  if (i && fAWait) fASem.Post(); }
222  inline Bool_t IsInterrupt() { R__LOCKGUARD(fAMtx); return fRDInterrupt; }
223  // Set / Check async msg queue waiting status
224  inline void SetAWait(Bool_t w = kTRUE) { R__LOCKGUARD(fAMtx); fAWait = w; }
225  inline Bool_t IsAWait() { R__LOCKGUARD(fAMtx); return fAWait; }
226 
227  // Flush the asynchronous queue
228  Int_t Flush();
229 
230  // Ping the counterpart
231  Bool_t Ping(const char *ord = 0);
232 
233  // Request remote touch of the admin file associated with this connection
234  void RemoteTouch();
235  // Propagate a Ctrl-C
236  void CtrlC();
237 
238  // Standard options cannot be set
240 
241  // Disable / Enable read timeout
242  void DisableTimeout() { fDontTimeout = kTRUE; }
243  void EnableTimeout() { fDontTimeout = kFALSE; }
244 
245  // Try reconnection after error
246  virtual Int_t Reconnect();
247 
248  ClassDef(TXSocket, 0) //A high level connection class for PROOF
249 };
250 
251 
252 //
253 // The following structure is used to store buffers received asynchronously
254 //
255 class TXSockBuf {
256 public:
262 
263  TXSockBuf(Char_t *bp=0, Int_t sz=0, Bool_t own=1);
264  ~TXSockBuf();
265 
266  void Resize(Int_t sz);
267 
268  static Long64_t BuffMem();
269  static Long64_t GetMemMax();
270  static void SetMemMax(Long64_t memmax);
271 
272 private:
274  static Long64_t fgBuffMem; // Total allocated memory
275  static Long64_t fgMemMax; // Max allocated memory allowed
276 };
277 
278 //
279 // The following class describes internal pipes
280 //
281 class TXSockPipe {
282 public:
283 
284  TXSockPipe(const char *loc = "");
285  virtual ~TXSockPipe();
286 
287  Bool_t IsValid() const { return ((fPipe[0] >= 0 && fPipe[1] >= 0) ? kTRUE : kFALSE); }
288 
290 
291  Int_t GetRead() const { return fPipe[0]; }
292  Int_t Post(TSocket *s); // Notify socket ready via global pipe
293  Int_t Clean(TSocket *s); // Clean previous pipe notification
294  Int_t Flush(TSocket *s); // Remove any instance of 's' from the pipe
295  void DumpReadySock();
296 
297  void SetLoc(const char *loc = "") { fLoc = loc; }
298 
299 private:
300  TMutex fMutex; // Protect access to the sockets-ready list
301  Int_t fPipe[2]; // Pipe for input monitoring
302  TString fLoc; // Location string
303  TList fReadySock; // List of sockets ready to be read
304 };
305 
306 //
307 // Guard for a semaphore
308 //
310 public:
311 
313  virtual ~TXSemaphoreGuard() { if (fValid && fSem) fSem->Post(); }
314 
315  Bool_t IsValid() const { return fValid; }
316 
317 private:
320 };
321 
322 #endif
void SetSendOpt(ESendRecvOptions o)
Definition: TXSocket.h:183
Definition: TMutex.h:37
TXSocket(const char *url, Char_t mode= 'M', Int_t psid=-1, Char_t ver=-1, const char *logbuf=0, Int_t loglevel=-1, TXHandler *handler=0)
Constructor Open the connection to a remote XrdProofd instance and start a PROOF session.
Definition: TXSocket.cxx:127
Int_t fCid
Definition: TXSocket.h:261
static void SetLocation(const char *loc="")
Set location string.
Definition: TXSocket.cxx:254
Bool_t IsValid() const
Definition: TXSocket.h:179
virtual void RemoveClientID()
Definition: TXSocket.h:181
virtual void SetClientID(Int_t)
Definition: TXSocket.h:182
Int_t Send(const char *mess, Int_t kind=kMESS_STRING)
Send a character string buffer.
Definition: TXSocket.h:191
TSemaphore fAsynProc
Definition: TXSocket.h:108
Int_t Recv(Int_t &status, Int_t &kind)
Receives a status and a message type.
Definition: TXSocket.h:201
long long Long64_t
Definition: RtypesCore.h:69
Int_t SendRaw(const void *buf, Int_t len, ESendRecvOptions opt=kDontBlock)
Send a raw buffer of specified length.
Definition: TXSocket.cxx:1176
TSemaphore fASem
Definition: TXSocket.h:100
void SetLoc(const char *loc="")
Definition: TXSocket.h:297
Int_t Recv(char *mess, Int_t max)
Receive a character string message of maximum max length.
Definition: TXSocket.h:203
Int_t GetXrdProofdVersion() const
Definition: TXSocket.h:177
Int_t fPipe[2]
Definition: TXSocket.h:301
Bool_t IsValid() const
Definition: TXSocket.h:287
Collectable string class.
Definition: TObjString.h:32
virtual void Close(Option_t *opt="")
Close connection.
Definition: TXSocket.cxx:323
Bool_t IsAWait()
Definition: TXSocket.h:225
const char Option_t
Definition: RtypesCore.h:62
ESockOptions
Definition: TSocket.h:52
Int_t fLogLevel
Definition: TXSocket.h:91
static Long64_t BuffMem()
Return the currently allocated memory.
Definition: TXSocket.cxx:2174
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:520
virtual Int_t GetClientID() const
Definition: TXSocket.h:171
int GetLogConnID() const
Definition: XrdProofConn.h:140
XrdProofConn * fConn
Definition: TXSocket.h:97
char fMode
Definition: TXSocket.h:84
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TSocket.cxx:818
EUrgentMsgType
Definition: TXSocket.h:151
Int_t TryWait()
If semaphore value is > 0 then decrement it and return 0.
Definition: TSemaphore.cxx:81
virtual UnsolRespProcResult ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *s, XrdClientMessage *msg)
We are here if an unsolicited response comes from a logical conn The response comes in the form of an...
Definition: TXSocket.cxx:380
~TXSockBuf()
destructor
Definition: TXSocket.cxx:2143
TMutex * fAMtx
Definition: TXSocket.h:101
Basic string class.
Definition: TString.h:137
TAlienJobStatus * status
Definition: TAlienJob.cxx:51
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
virtual ~TXSocket()
Destructor.
Definition: TXSocket.cxx:239
Int_t Recv(char *mess, Int_t max, Int_t &kind)
Receive a character string message of maximum max length.
Definition: TXSocket.h:205
const Bool_t kFALSE
Definition: Rtypes.h:92
TMutex * fIMtx
Definition: TXSocket.h:111
int GetServType() const
Definition: XrdProofConn.h:143
Int_t GetServType() const
Definition: TXSocket.h:175
void SetInterrupt(Bool_t i=kTRUE)
Definition: TXSocket.h:218
void SetSessionID(Int_t id)
Set session ID to 'id'. If id < 0, disable also the asynchronous handler.
Definition: TXSocket.cxx:268
Int_t fInt3
Definition: TXSocket.h:66
TXSocket * GetLastReady()
Return last ready socket.
Definition: TXSocket.cxx:2337
void RemoteTouch()
Remote touch functionality: contact the server to proof our vitality.
Definition: TXSocket.cxx:1305
Int_t fOpt
Definition: TXSocket.h:70
Int_t Post(TSocket *s)
Write a byte to the global pipe to signal new availibility of new messages.
Definition: TXSocket.cxx:2228
ESendRecvOptions
Definition: TSocket.h:65
Int_t fInt2
Definition: TXSocket.h:65
static Long64_t fgBuffMem
Definition: TXSocket.h:274
virtual ~TXSemaphoreGuard()
Definition: TXSocket.h:313
void SetInterrupt()
Interrupt the underlying socket.
Int_t Clean(TSocket *s)
Read a byte to the global pipe to synchronize message pickup.
Definition: TXSocket.cxx:2257
static Long64_t fgMemMax
Definition: TXSocket.h:275
Bool_t fAWait
Definition: TXSocket.h:102
Int_t fPort
Definition: TXSocket.h:89
Int_t RecvRaw(void *buf, Int_t len, ESendRecvOptions opt=kDefault)
Receive a raw buffer of specified length bytes.
Definition: TXSocket.cxx:1541
#define ClassDef(name, id)
Definition: Rtypes.h:254
static void InitEnvs()
Init environment variables for XrdClient.
Definition: TXSocket.cxx:1928
const char * ord
Definition: TXSlave.cxx:46
static std::list< TXSockBuf * > fgSQue
Definition: TXSocket.h:132
Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TXSocket.cxx:1639
Int_t GetInterrupt(Bool_t &forward)
Get latest interrupt level and reset it; if the interrupt has to be propagated to lower stages forwar...
Definition: TXSocket.cxx:930
Bool_t fIForward
Definition: TXSocket.h:113
TXHandler * fHandler
Definition: TXSocket.h:95
Int_t GetLowSocket() const
Definition: TXSocket.h:143
virtual Int_t GetClientIDSize() const
Definition: TXSocket.h:172
TXSockBuf * PopUpSpare(Int_t sz)
Pop-up a buffer of at least size bytes from the spare list If none is found either one is reallocated...
Definition: TXSocket.cxx:1470
Bool_t IsServProofd()
Return kTRUE if the remote server is a 'proofd'.
Definition: TXSocket.cxx:917
TString fUser
Definition: TXSocket.h:87
int GetOpenError() const
Definition: XrdProofConn.h:142
Int_t fInt4
Definition: TXSocket.h:67
A doubly linked list.
Definition: TList.h:47
Bool_t fOwn
Definition: TXSocket.h:260
Bool_t Create(Bool_t attach=kFALSE)
This method sends a request for creation of (or attachment to) a remote server application.
Definition: TXSocket.cxx:1011
Int_t fByteCur
Definition: TXSocket.h:105
Int_t Flush()
Flush the asynchronous queue.
Definition: TXSocket.cxx:962
Bool_t IsInterrupt()
Definition: TXSocket.h:222
TString fHost
Definition: TXSocket.h:88
TXSockBuf(Char_t *bp=0, Int_t sz=0, Bool_t own=1)
constructor
Definition: TXSocket.cxx:2131
void SetAWait(Bool_t w=kTRUE)
Definition: TXSocket.h:224
Int_t fXrdProofdVersion
Definition: TXSocket.h:123
void PostMsg(Int_t type, const char *msg=0)
Post a message of type 'type' into the read messages queue.
Definition: TXSocket.cxx:860
Bool_t Ping(const char *ord=0)
Ping functionality: contact the server to check its vitality.
Definition: TXSocket.cxx:1233
TString fBuffer
Definition: TXSocket.h:93
static TMutex fgSMtx
Definition: TXSocket.h:131
short Short_t
Definition: RtypesCore.h:35
std::list< TXSockBuf * > fAQue
Definition: TXSocket.h:103
Int_t Post()
If any threads are blocked in Wait(), wake one of them up and increment the value of the semaphore...
Definition: TSemaphore.cxx:105
Int_t Flush(TSocket *s)
Remove any reference to socket 's' from the global pipe and ready-socket queue.
Definition: TXSocket.cxx:2288
static TString fgLoc
Definition: TXSocket.h:127
Char_t * fBuf
Definition: TXSocket.h:259
TString fLoc
Definition: TXSocket.h:302
TList fReadySock
Definition: TXSocket.h:303
Int_t fSiz
Definition: TXSocket.h:257
TXSemaphoreGuard(TSemaphore *sem)
Definition: TXSocket.h:312
Int_t fLen
Definition: TXSocket.h:258
virtual ~TXSockPipe()
Destructor.
Definition: TXSocket.cxx:2217
virtual Int_t Reconnect()
Try reconnection after failure.
Definition: TXSocket.cxx:2077
Int_t Send(Int_t status, Int_t kind)
Send a status and a single message opcode.
Definition: TXSocket.h:189
void DisconnectSession(Int_t id, Option_t *opt="")
Disconnect a session.
Definition: TXSocket.cxx:280
TXSockBuf * fBufCur
Definition: TXSocket.h:106
int GetLowSocket()
Return the socket descriptor of the underlying connection.
Short_t fSessionID
Definition: TXSocket.h:86
Int_t Send(Int_t kind)
Send a single message opcode.
Definition: TXSocket.h:188
Bool_t IsValid() const
Definition: TXSocket.h:315
bool IsValid() const
Test validity of this connection.
TSocket & operator=(const TSocket &)
int type
Definition: TGX11.cxx:120
#define R__LOCKGUARD(mutex)
Bool_t fDontTimeout
Definition: TXSocket.h:119
TXSockPipe(const char *loc="")
Constructor.
Definition: TXSocket.cxx:2203
void DisableTimeout()
Definition: TXSocket.h:242
Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TXSocket.cxx:1724
Char_t * fMem
Definition: TXSocket.h:273
Int_t fInt1
Definition: TXSocket.h:64
kXR_int32 fSendOpt
Definition: TXSocket.h:85
Int_t GetLogConnID() const
Definition: TXSocket.h:173
static Vc_ALWAYS_INLINE int_v max(const int_v &x, const int_v &y)
Definition: vector.h:440
Bool_t fRDInterrupt
Definition: TXSocket.h:120
void DumpReadySock()
Dump content of the ready socket list.
Definition: TXSocket.cxx:2322
TObject * fReference
Definition: TXSocket.h:94
Mother of all ROOT objects.
Definition: TObject.h:58
Int_t SetOption(ESockOptions, Int_t)
Set socket options.
Definition: TXSocket.h:239
char Char_t
Definition: RtypesCore.h:29
void PushBackSpare()
Release read buffer giving back to the spare list.
Definition: TXSocket.cxx:1520
const char * fMsg
Definition: TXSocket.h:71
kXR_int32 fILev
Definition: TXSocket.h:112
Int_t PickUpReady()
Wait and pick-up next buffer from the asynchronous queue.
Definition: TXSocket.cxx:1379
static Bool_t fgInitDone
Definition: TXSocket.h:128
TObjString * SendCoordinator(Int_t kind, const char *msg=0, Int_t int2=0, Long64_t l64=0, Int_t int3=0, const char *opt=0)
Send message to intermediate coordinator.
Definition: TXSocket.cxx:1775
void SendUrgent(Int_t type, Int_t int1, Int_t int2)
Send urgent message to counterpart; 'type' specifies the type of the message (see TXSocket::EUrgentMs...
Definition: TXSocket.cxx:1892
static TXSockPipe fgPipe
Definition: TXSocket.h:126
Int_t GetSessionID() const
Definition: TXSocket.h:176
static void SetMemMax(Long64_t memmax)
Return the max allocated memory allowed.
Definition: TXSocket.cxx:2190
Int_t SendInterrupt(Int_t type)
Send urgent message (interrupt) to remote server Returns 0 or -1 in case of error.
Definition: TXSocket.cxx:1598
void DoError(int level, const char *location, const char *fmt, va_list va) const
Interface to ErrorHandler (protected).
Definition: TXSocket.cxx:71
Int_t fPid
Definition: TXSocket.h:116
short GetSessionID() const
Definition: XrdProofConn.h:144
TSemaphore * fSem
Definition: TXSocket.h:318
Int_t GetRead() const
Definition: TXSocket.h:291
const Bool_t kTRUE
Definition: Rtypes.h:91
Int_t fByteLeft
Definition: TXSocket.h:104
Int_t GetOpenError() const
Definition: TXSocket.h:174
void EnableTimeout()
Definition: TXSocket.h:243
void Resize(Int_t sz)
resize socket buffer
Definition: TXSocket.cxx:2154
void CtrlC()
Interrupt the remote protocol instance.
Definition: TXSocket.cxx:1343
TMutex fMutex
Definition: TXSocket.h:300
static Long64_t GetMemMax()
Return the max allocated memory allowed.
Definition: TXSocket.cxx:2182