ROOT logo
// @(#)root/proofx:$Id: TXSocket.h 27035 2008-12-19 15:36:44Z ganis $
// Author: G. Ganis Oct 2005

/*************************************************************************
 * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#ifndef ROOT_TXSocket
#define ROOT_TXSocket

//////////////////////////////////////////////////////////////////////////
//                                                                      //
// TXSocket                                                             //
//                                                                      //
// High level handler of connections to xproofd.                        //
//                                                                      //
//////////////////////////////////////////////////////////////////////////

// NOTE(review): presumably the default maximum number of connection attempts.
// The macro is not referenced anywhere in this header -- confirm against its users.
#define DFLT_CONNECTMAXTRY           10

#ifndef ROOT_TMutex
#include "TMutex.h"
#endif
#ifndef ROOT_TSemaphore
#include "TSemaphore.h"
#endif
#ifndef ROOT_TString
#include "TString.h"
#endif
#ifndef ROOT_TList
#include "TList.h"
#endif
#ifndef ROOT_TMessage
#include "TMessage.h"
#endif
#ifndef ROOT_TUrl
#include "TUrl.h"
#endif
#ifndef ROOT_TSocket
#include "TSocket.h"
#endif
#ifndef ROOT_XrdProofConn
#include "XrdProofConn.h"
#endif
#ifndef XRC_UNSOLMSG_H
#include "XrdClient/XrdClientUnsolMsg.hh"
#endif

#include <list>

class TObjString;
class TXSockBuf;
class TXSockPipe;
class TXHandler;
class TXSocketHandler;
class XrdClientMessage;

// To transmit info to Handlers.
// Plain carrier of four integer slots; this header does not define what each
// slot means. NOTE(review): the meaning is presumably fixed by the code that
// fills and reads the structure -- confirm there before relying on a slot.
typedef struct {
   Int_t   fInt1;
   Int_t   fInt2;
   Int_t   fInt3;
   Int_t   fInt4;
} XHandleIn_t;
// Record made of an integer option and a message string.
// NOTE(review): presumably the error counterpart of XHandleIn_t, also meant
// for handlers -- confirm. Ownership of the string pointed to by fMsg is not
// specified in this header.
typedef struct {
   Int_t   fOpt;
   const char *fMsg;
} XHandleErr_t;

// TXSocket: high level handler of a connection to xproofd.
// Derives from TSocket and from XrdClientAbsUnsolMsgHandler (for which it
// declares ProcessUnsolicitedMsg). The connection-level getters defined inline
// below forward to the underlying XrdProofConn (fConn) and fall back to
// -1 / kFALSE when fConn is null.
class TXSocket  : public TSocket, public XrdClientAbsUnsolMsgHandler {

// Classes granted access to the private section
friend class TXProofMgr;
friend class TXProofServ;
friend class TXSlave;
friend class TXSocketHandler;
friend class TXSockPipe;
friend class TXUnixSocket;

private:
   // NOTE(review): the comment on fMode lists 'e' as default, while the
   // constructor declared below defaults its 'mode' argument to 'M' -- confirm
   // which set of mode characters is current.
   char                fMode;          // 'e' (def) or 'i' (internal - proofsrv)
   kXR_int32           fSendOpt;       // Options for sending messages
   Short_t             fSessionID;     // proofsrv: remote ID of connected session
   TString             fUser;          // Username used for login
   TString             fHost;          // Remote host
   Int_t               fPort;          // Remote port

   Int_t               fLogLevel;      // Log level to be transmitted to servers

   TString             fBuffer;        // Container for exchanging information
   TObject            *fReference;     // Generic object reference of this socket
   TXHandler          *fHandler;       // Handler of asynchronous events (input, error)

   XrdProofConn       *fConn;          // Instance of the underlying connection module

   // Asynchronous messages
   TSemaphore          fASem;          // Control access to conn async msg queue
   TMutex             *fAMtx;          // To protect async msg queue
   std::list<TXSockBuf *> fAQue;          // List of asynchronous messages
   Int_t               fByteLeft;      // Bytes left in the first buffer
   Int_t               fByteCur;       // Current position in the first buffer
   TXSockBuf          *fBufCur;        // Current read buffer

   // Interrupts
   TMutex             *fIMtx;          // To protect interrupt queue
   kXR_int32           fILev;          // Highest received interrupt
   Bool_t              fIForward;      // Whether the interrupt should be propagated

   // Process ID of the instantiating process (to signal interrupts)
   Int_t               fPid;

   // Whether to timeout or not
   Bool_t              fDontTimeout;   // If true wait forever for incoming messages
   Bool_t              fRDInterrupt;   // To interrupt waiting for messages

   // Version of the remote XrdProofdProtocol
   Int_t               fXrdProofdVersion;

   // Static area for input handling (shared by all TXSocket instances)
   static TXSockPipe   fgPipe;         // Pipe for input monitoring
   static TString      fgLoc;          // Location string
   static Bool_t       fgInitDone;     // Avoid initializing more than once

   // List of spare buffers (shared by all TXSocket instances)
   static TMutex       fgSMtx;          // To protect spare list
   static std::list<TXSockBuf *> fgSQue; // List of spare buffers

   // Manage asynchronous messages
   Int_t               PickUpReady();
   TXSockBuf          *PopUpSpare(Int_t sz);
   void                PushBackSpare();

   // Post a message into the queue for asynchronous processing
   void                PostMsg(Int_t type);

   // Auxiliary: low-level socket descriptor of the connection, -1 if there is no connection
   Int_t               GetLowSocket() const { return (fConn ? fConn->GetLowSocket() : -1); }

   static void         SetLocation(const char *loc = ""); // Set location string

   static void         InitEnvs(); // Initialize environment variables

public:
   // Should be the same as in proofd/src/XrdProofdProtocol::Urgent
   enum EUrgentMsgType { kStopProcess = 2000 };

   // Construction, copy and destruction.
   // 'url' identifies the remote end; 'mode' defaults to 'M'; all the other
   // arguments have defaults (-1 for the integer-like ones, null pointers otherwise).
   TXSocket(const char *url, Char_t mode = 'M', Int_t psid = -1, Char_t ver = -1,
            const char *logbuf = 0, Int_t loglevel = -1, TXHandler *handler = 0);
   TXSocket(const TXSocket &xs);
   TXSocket& operator=(const TXSocket& xs);
   virtual ~TXSocket();

   virtual void        Close(Option_t *opt = "");
   Bool_t              Create(Bool_t attach = kFALSE);
   void                DisconnectSession(Int_t id, Option_t *opt = "");

   // Error reporting taking a format string and a va_list of arguments
   void                DoError(int level,
                               const char *location, const char *fmt, va_list va) const;

   // Entry point for unsolicited messages (XrdClientAbsUnsolMsgHandler interface)
   virtual UnsolRespProcResult ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *s,
                                                     XrdClientMessage *msg);

   // Client ID interface: fixed answers here (-1 and 1)
   virtual Int_t       GetClientID() const { return -1; }
   virtual Int_t       GetClientIDSize() const { return 1; }
   // Getters forwarding to the underlying connection; -1 when fConn is null
   Int_t               GetLogConnID() const { return (fConn ? fConn->GetLogConnID() : -1); }
   Int_t               GetOpenError() const { return (fConn ? fConn->GetOpenError() : -1); }
   Int_t               GetServType() const { return (fConn ? fConn->GetServType() : -1); }
   Int_t               GetSessionID() const { return (fConn ? fConn->GetSessionID() : -1); }
   Int_t               GetXrdProofdVersion() const { return fXrdProofdVersion; }

   // Valid only if a connection exists and reports itself valid
   Bool_t              IsValid() const { return (fConn ? (fConn->IsValid()) : kFALSE); }
   Bool_t              IsServProofd();
   // Client ID setters: no-ops in this class
   virtual void        RemoveClientID() { }
   virtual void        SetClientID(Int_t) { }
   void                SetSendOpt(ESendRecvOptions o) { fSendOpt = o; }
   void                SetSessionID(Int_t id);

   // Send interfaces.
   // The inline overloads simply forward to the TSocket implementations
   // (presumably re-declared so they stay visible next to Send(const TMessage &)).
   Int_t               Send(const TMessage &mess);
   Int_t               Send(Int_t kind) { return TSocket::Send(kind); }
   Int_t               Send(Int_t status, Int_t kind)
                                        { return TSocket::Send(status, kind); }
   Int_t               Send(const char *mess, Int_t kind = kMESS_STRING)
                                        { return TSocket::Send(mess, kind); }
   Int_t               SendRaw(const void *buf, Int_t len,
                               ESendRecvOptions opt = kDontBlock);

   TObjString         *SendCoordinator(Int_t kind, const char *msg = 0, Int_t int2 = 0,
                                       Long64_t l64 = 0, Int_t int3 = 0, const char *opt = 0);


   // Recv interfaces.
   // The inline overloads simply forward to the TSocket implementations.
   Int_t               Recv(TMessage *&mess);
   Int_t               Recv(Int_t &status, Int_t &kind)
                                        { return TSocket::Recv(status, kind); }
   Int_t               Recv(char *mess, Int_t max)
                                        { return TSocket::Recv(mess, max); }
   Int_t               Recv(char *mess, Int_t max, Int_t &kind)
                                        { return TSocket::Recv(mess, max, kind); }
   Int_t               RecvRaw(void *buf, Int_t len,
                               ESendRecvOptions opt = kDefault);

   // Interrupts
   Int_t               SendInterrupt(Int_t type);
   Int_t               GetInterrupt(Bool_t &forward);

   // Urgent message
   void                SendUrgent(Int_t type, Int_t int1, Int_t int2);

   // Interrupt the low level socket: raise the local flag first, then
   // forward the request to the connection if there is one
   void                SetInterrupt() { fRDInterrupt = kTRUE;
                                        if (fConn) fConn->SetInterrupt(); }

   // Flush the asynchronous queue
   Int_t               Flush();

   // Ping the counterpart
   Bool_t              Ping(const char *ord = 0);

   // Request remote touch of the admin file associated with this connection
   void                RemoteTouch();

   // Standard options cannot be set: arguments are ignored and 0 is returned
   Int_t               SetOption(ESockOptions, Int_t) { return 0; }

   // Disable / Enable read timeout
   void                DisableTimeout() { fDontTimeout = kTRUE; }
   void                EnableTimeout() { fDontTimeout = kFALSE; }

   // Try reconnection after error
   virtual Int_t       Reconnect();

   ClassDef(TXSocket, 0) //A high level connection class for PROOF
};


//
// The following structure is used to store buffers received asynchronously.
// All data members except fMem are public. The static members keep a global
// account of the buffer memory (total allocated and maximum allowed).
//
class TXSockBuf {
public:
   // NOTE(review): the meaning of the fields below is not stated in this
   // header; the hedged descriptions are inferred from the names and from the
   // constructor arguments -- confirm against the implementation.
   Int_t   fSiz;   // presumably the size of the buffer
   Int_t   fLen;   // presumably the length of the data currently held
   Char_t *fBuf;   // pointer to the buffer
   Bool_t  fOwn;   // presumably whether the buffer is owned by this object
   Int_t   fCid;   // presumably an ID of the client/connection -- TODO confirm

   // Build from an optional existing buffer 'bp' of size 'sz'; 'own' defaults to 1
   TXSockBuf(Char_t *bp=0, Int_t sz=0, Bool_t own=1);
  ~TXSockBuf();

   // Change the buffer size to 'sz'
   void Resize(Int_t sz);

   // Global memory accounting for buffers
   static Long64_t BuffMem();
   static Long64_t GetMemMax();
   static void     SetMemMax(Long64_t memmax);

private:
   Char_t *fMem;              // NOTE(review): presumably the underlying allocation -- confirm
   static Long64_t fgBuffMem; // Total allocated memory
   static Long64_t fgMemMax;  // Max allocated memory allowed
};

//
// Internal pipe helper: holds a pair of pipe descriptors used for input
// monitoring together with the list of sockets that are ready to be read.
//
class TXSockPipe {
public:

   TXSockPipe(const char *loc = "");
   virtual ~TXSockPipe();

   // The pipe is usable only when both descriptors are non-negative
   Bool_t       IsValid() const
                {
                   if (fPipe[0] < 0 || fPipe[1] < 0)
                      return kFALSE;
                   return kTRUE;
                }

   TXSocket    *GetLastReady();

   // Descriptor stored in the first slot of the pipe pair
   Int_t        GetRead() const
                {
                   return fPipe[0];
                }
   Int_t        Post(TSocket *s);  // Notify socket ready via global pipe
   Int_t        Clean(TSocket *s); // Clean previous pipe notification
   Int_t        Flush(TSocket *s); // Remove any instance of 's' from the pipe
   void         DumpReadySock();

   // Replace the location string
   void         SetLoc(const char *loc = "")
                {
                   fLoc = loc;
                }

private:
   TMutex       fMutex;        // Guards the list of ready sockets
   Int_t        fPipe[2];      // Descriptor pair of the monitoring pipe
   TString      fLoc;          // Location string
   TList        fReadySock;    // Sockets ready to be read
};

#endif
 TXSocket.h:1
 TXSocket.h:2
 TXSocket.h:3
 TXSocket.h:4
 TXSocket.h:5
 TXSocket.h:6
 TXSocket.h:7
 TXSocket.h:8
 TXSocket.h:9
 TXSocket.h:10
 TXSocket.h:11
 TXSocket.h:12
 TXSocket.h:13
 TXSocket.h:14
 TXSocket.h:15
 TXSocket.h:16
 TXSocket.h:17
 TXSocket.h:18
 TXSocket.h:19
 TXSocket.h:20
 TXSocket.h:21
 TXSocket.h:22
 TXSocket.h:23
 TXSocket.h:24
 TXSocket.h:25
 TXSocket.h:26
 TXSocket.h:27
 TXSocket.h:28
 TXSocket.h:29
 TXSocket.h:30
 TXSocket.h:31
 TXSocket.h:32
 TXSocket.h:33
 TXSocket.h:34
 TXSocket.h:35
 TXSocket.h:36
 TXSocket.h:37
 TXSocket.h:38
 TXSocket.h:39
 TXSocket.h:40
 TXSocket.h:41
 TXSocket.h:42
 TXSocket.h:43
 TXSocket.h:44
 TXSocket.h:45
 TXSocket.h:46
 TXSocket.h:47
 TXSocket.h:48
 TXSocket.h:49
 TXSocket.h:50
 TXSocket.h:51
 TXSocket.h:52
 TXSocket.h:53
 TXSocket.h:54
 TXSocket.h:55
 TXSocket.h:56
 TXSocket.h:57
 TXSocket.h:58
 TXSocket.h:59
 TXSocket.h:60
 TXSocket.h:61
 TXSocket.h:62
 TXSocket.h:63
 TXSocket.h:64
 TXSocket.h:65
 TXSocket.h:66
 TXSocket.h:67
 TXSocket.h:68
 TXSocket.h:69
 TXSocket.h:70
 TXSocket.h:71
 TXSocket.h:72
 TXSocket.h:73
 TXSocket.h:74
 TXSocket.h:75
 TXSocket.h:76
 TXSocket.h:77
 TXSocket.h:78
 TXSocket.h:79
 TXSocket.h:80
 TXSocket.h:81
 TXSocket.h:82
 TXSocket.h:83
 TXSocket.h:84
 TXSocket.h:85
 TXSocket.h:86
 TXSocket.h:87
 TXSocket.h:88
 TXSocket.h:89
 TXSocket.h:90
 TXSocket.h:91
 TXSocket.h:92
 TXSocket.h:93
 TXSocket.h:94
 TXSocket.h:95
 TXSocket.h:96
 TXSocket.h:97
 TXSocket.h:98
 TXSocket.h:99
 TXSocket.h:100
 TXSocket.h:101
 TXSocket.h:102
 TXSocket.h:103
 TXSocket.h:104
 TXSocket.h:105
 TXSocket.h:106
 TXSocket.h:107
 TXSocket.h:108
 TXSocket.h:109
 TXSocket.h:110
 TXSocket.h:111
 TXSocket.h:112
 TXSocket.h:113
 TXSocket.h:114
 TXSocket.h:115
 TXSocket.h:116
 TXSocket.h:117
 TXSocket.h:118
 TXSocket.h:119
 TXSocket.h:120
 TXSocket.h:121
 TXSocket.h:122
 TXSocket.h:123
 TXSocket.h:124
 TXSocket.h:125
 TXSocket.h:126
 TXSocket.h:127
 TXSocket.h:128
 TXSocket.h:129
 TXSocket.h:130
 TXSocket.h:131
 TXSocket.h:132
 TXSocket.h:133
 TXSocket.h:134
 TXSocket.h:135
 TXSocket.h:136
 TXSocket.h:137
 TXSocket.h:138
 TXSocket.h:139
 TXSocket.h:140
 TXSocket.h:141
 TXSocket.h:142
 TXSocket.h:143
 TXSocket.h:144
 TXSocket.h:145
 TXSocket.h:146
 TXSocket.h:147
 TXSocket.h:148
 TXSocket.h:149
 TXSocket.h:150
 TXSocket.h:151
 TXSocket.h:152
 TXSocket.h:153
 TXSocket.h:154
 TXSocket.h:155
 TXSocket.h:156
 TXSocket.h:157
 TXSocket.h:158
 TXSocket.h:159
 TXSocket.h:160
 TXSocket.h:161
 TXSocket.h:162
 TXSocket.h:163
 TXSocket.h:164
 TXSocket.h:165
 TXSocket.h:166
 TXSocket.h:167
 TXSocket.h:168
 TXSocket.h:169
 TXSocket.h:170
 TXSocket.h:171
 TXSocket.h:172
 TXSocket.h:173
 TXSocket.h:174
 TXSocket.h:175
 TXSocket.h:176
 TXSocket.h:177
 TXSocket.h:178
 TXSocket.h:179
 TXSocket.h:180
 TXSocket.h:181
 TXSocket.h:182
 TXSocket.h:183
 TXSocket.h:184
 TXSocket.h:185
 TXSocket.h:186
 TXSocket.h:187
 TXSocket.h:188
 TXSocket.h:189
 TXSocket.h:190
 TXSocket.h:191
 TXSocket.h:192
 TXSocket.h:193
 TXSocket.h:194
 TXSocket.h:195
 TXSocket.h:196
 TXSocket.h:197
 TXSocket.h:198
 TXSocket.h:199
 TXSocket.h:200
 TXSocket.h:201
 TXSocket.h:202
 TXSocket.h:203
 TXSocket.h:204
 TXSocket.h:205
 TXSocket.h:206
 TXSocket.h:207
 TXSocket.h:208
 TXSocket.h:209
 TXSocket.h:210
 TXSocket.h:211
 TXSocket.h:212
 TXSocket.h:213
 TXSocket.h:214
 TXSocket.h:215
 TXSocket.h:216
 TXSocket.h:217
 TXSocket.h:218
 TXSocket.h:219
 TXSocket.h:220
 TXSocket.h:221
 TXSocket.h:222
 TXSocket.h:223
 TXSocket.h:224
 TXSocket.h:225
 TXSocket.h:226
 TXSocket.h:227
 TXSocket.h:228
 TXSocket.h:229
 TXSocket.h:230
 TXSocket.h:231
 TXSocket.h:232
 TXSocket.h:233
 TXSocket.h:234
 TXSocket.h:235
 TXSocket.h:236
 TXSocket.h:237
 TXSocket.h:238
 TXSocket.h:239
 TXSocket.h:240
 TXSocket.h:241
 TXSocket.h:242
 TXSocket.h:243
 TXSocket.h:244
 TXSocket.h:245
 TXSocket.h:246
 TXSocket.h:247
 TXSocket.h:248
 TXSocket.h:249
 TXSocket.h:250
 TXSocket.h:251
 TXSocket.h:252
 TXSocket.h:253
 TXSocket.h:254
 TXSocket.h:255
 TXSocket.h:256
 TXSocket.h:257
 TXSocket.h:258
 TXSocket.h:259
 TXSocket.h:260
 TXSocket.h:261
 TXSocket.h:262
 TXSocket.h:263
 TXSocket.h:264
 TXSocket.h:265
 TXSocket.h:266
 TXSocket.h:267
 TXSocket.h:268
 TXSocket.h:269
 TXSocket.h:270
 TXSocket.h:271
 TXSocket.h:272
 TXSocket.h:273
 TXSocket.h:274
 TXSocket.h:275
 TXSocket.h:276
 TXSocket.h:277
 TXSocket.h:278
 TXSocket.h:279
 TXSocket.h:280
 TXSocket.h:281
 TXSocket.h:282
 TXSocket.h:283
 TXSocket.h:284
 TXSocket.h:285
 TXSocket.h:286
 TXSocket.h:287
 TXSocket.h:288
 TXSocket.h:289
 TXSocket.h:290
 TXSocket.h:291
 TXSocket.h:292
 TXSocket.h:293
 TXSocket.h:294