#ifndef ROOT_TXSocket
#define ROOT_TXSocket
// NOTE(review): presumably the default maximum number of connection attempts.
// The macro is not referenced anywhere in this header - confirm its meaning
// against the implementation files that use it.
#define DFLT_CONNECTMAXTRY 10
#ifndef ROOT_TMutex
#include "TMutex.h"
#endif
#ifndef ROOT_TSemaphore
#include "TSemaphore.h"
#endif
#ifndef ROOT_TString
#include "TString.h"
#endif
#ifndef ROOT_TList
#include "TList.h"
#endif
#ifndef ROOT_TMessage
#include "TMessage.h"
#endif
#ifndef ROOT_TUrl
#include "TUrl.h"
#endif
#ifndef ROOT_TSocket
#include "TSocket.h"
#endif
#ifndef ROOT_XrdProofConn
#include "XrdProofConn.h"
#endif
#ifndef XRC_UNSOLMSG_H
#include "XrdClient/XrdClientUnsolMsg.hh"
#endif
#include <list>
// Forward declarations for types that this header refers to before (or
// without) seeing their full definitions: they appear below as pointer types,
// as friends, or as a static member declaration.
class TObjString;
class TXSockBuf;
class TXSockPipe;
class TXHandler;
class TXSocketHandler;
class XrdClientMessage;
// Plain aggregate carrying four integer values.
// NOTE(review): the meaning of each field is not visible in this header;
// check the code that fills and consumes XHandleIn_t.
struct XHandleIn_t {
   Int_t fInt1;   // first integer payload
   Int_t fInt2;   // second integer payload
   Int_t fInt3;   // third integer payload
   Int_t fInt4;   // fourth integer payload
};
// Plain aggregate pairing an integer option with a C-string message.
// NOTE(review): fMsg is a raw, non-owning-looking pointer; who owns the
// pointed-to text is not visible here - confirm at the call sites.
struct XHandleErr_t {
   Int_t       fOpt;   // integer option / code
   const char *fMsg;   // associated message text (may presumably be null)
};
// TXSocket
//
// A TSocket specialisation that is also an XrdClientAbsUnsolMsgHandler.
// The connection itself is held in fConn (an XrdProofConn); the inline
// accessors below delegate to it and fall back to -1 / kFALSE when fConn
// is null. The interrupt / wait flags are read and written with
// R__LOCKGUARD(fAMtx) in place.
class TXSocket : public TSocket, public XrdClientAbsUnsolMsgHandler {

   // Classes that are granted access to the private section.
   friend class TXProofMgr;
   friend class TXProofServ;
   friend class TXSlave;
   friend class TXSocketHandler;
   friend class TXSockPipe;
   friend class TXUnixSocket;

private:
   // --- Per-instance state (declaration order is significant: keep it) ---
   char                    fMode;              // mode character (constructor default is 'M')
   kXR_int32               fSendOpt;           // set through SetSendOpt()
   Short_t                 fSessionID;         // locally stored session identifier
   TString                 fUser;              // NOTE(review): presumably the remote user name
   TString                 fHost;              // NOTE(review): presumably the remote host
   Int_t                   fPort;              // NOTE(review): presumably the remote port
   Int_t                   fLogLevel;          // NOTE(review): presumably the log level
   TString                 fBuffer;            // NOTE(review): purpose not visible in this header
   TObject                *fReference;         // NOTE(review): purpose not visible in this header
   TXHandler              *fHandler;           // handler pointer (constructor argument of same type)
   XrdProofConn           *fConn;              // underlying connection; null-checked by the accessors

   TSemaphore              fASem;              // posted by SetInterrupt() when fAWait is set
   TMutex                 *fAMtx;              // used via R__LOCKGUARD around fRDInterrupt / fAWait
   Bool_t                  fAWait;             // flag accessed through SetAWait() / IsAWait()
   std::list<TXSockBuf *>  fAQue;              // list of buffer pointers
   Int_t                   fByteLeft;          // NOTE(review): presumably bytes left in fBufCur
   Int_t                   fByteCur;           // NOTE(review): presumably current offset in fBufCur
   TXSockBuf              *fBufCur;            // NOTE(review): presumably the buffer being read
   TSemaphore              fAsynProc;          // NOTE(review): purpose not visible in this header

   TMutex                 *fIMtx;              // NOTE(review): presumably protects fILev / fIForward
   kXR_int32               fILev;              // NOTE(review): presumably the pending interrupt level
   Bool_t                  fIForward;          // NOTE(review): presumably the interrupt forward flag

   Int_t                   fPid;               // NOTE(review): presumably a process id
   Bool_t                  fDontTimeout;       // toggled by DisableTimeout() / EnableTimeout()
   Bool_t                  fRDInterrupt;       // flag accessed through SetInterrupt() / IsInterrupt()
   Int_t                   fXrdProofdVersion;  // value returned by GetXrdProofdVersion()

   // --- State shared by all instances ---
   static TXSockPipe              fgPipe;
   static TString                 fgLoc;
   static Bool_t                  fgInitDone;
   static TMutex                  fgSMtx;
   static std::list<TXSockBuf *>  fgSQue;

   // --- Private helpers (defined out of line) ---
   Int_t       PickUpReady();
   TXSockBuf  *PopUpSpare(Int_t sz);
   void        PushBackSpare();
   void        PostMsg(Int_t type, const char *msg = 0);

   // Low-level socket as reported by the connection, or -1 without one.
   Int_t GetLowSocket() const
   {
      if (fConn) return fConn->GetLowSocket();
      return -1;
   }

   static void SetLocation(const char *loc = "");
   static void InitEnvs();

public:
   // Urgent message types
   enum EUrgentMsgType { kStopProcess = 2000 };

   // --- Construction, copy, destruction ---
   TXSocket(const char *url, Char_t mode = 'M', Int_t psid = -1, Char_t ver = -1,
            const char *logbuf = 0, Int_t loglevel = -1, TXHandler *handler = 0);
   TXSocket(const TXSocket &xs);
   TXSocket& operator=(const TXSocket& xs);
   virtual ~TXSocket();

   // --- Session handling ---
   virtual void Close(Option_t *opt = "");
   Bool_t       Create(Bool_t attach = kFALSE);
   void         DisconnectSession(Int_t id, Option_t *opt = "");

   void DoError(int level,
                const char *location, const char *fmt, va_list va) const;

   // Entry point declared for the XrdClientAbsUnsolMsgHandler base.
   virtual UnsolRespProcResult ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *s,
                                                     XrdClientMessage *msg);

   // --- Client-ID hooks: the versions in this class are fixed / no-ops ---
   virtual Int_t GetClientID() const { return -1; }
   virtual Int_t GetClientIDSize() const { return 1; }
   virtual void  RemoveClientID() { }
   virtual void  SetClientID(Int_t) { }

   // --- Accessors delegating to fConn; each yields -1 when fConn is null ---
   Int_t GetLogConnID() const
   {
      if (fConn) return fConn->GetLogConnID();
      return -1;
   }
   Int_t GetOpenError() const
   {
      if (fConn) return fConn->GetOpenError();
      return -1;
   }
   Int_t GetServType() const
   {
      if (fConn) return fConn->GetServType();
      return -1;
   }
   Int_t GetSessionID() const
   {
      if (fConn) return fConn->GetSessionID();
      return -1;
   }
   Int_t GetXrdProofdVersion() const { return fXrdProofdVersion; }

   // Valid only when a connection exists and reports itself valid.
   Bool_t IsValid() const
   {
      if (!fConn) return kFALSE;
      return fConn->IsValid();
   }
   Bool_t IsServProofd();

   void SetSendOpt(ESendRecvOptions o) { fSendOpt = o; }
   void SetSessionID(Int_t id);

   // --- Sending ---
   // The overloads with inline bodies simply forward to TSocket; they are
   // spelled out because declaring Send(const TMessage &) in this class
   // hides the base-class overloads of the same name.
   Int_t Send(const TMessage &mess);
   Int_t Send(Int_t kind) { return TSocket::Send(kind); }
   Int_t Send(Int_t status, Int_t kind) { return TSocket::Send(status, kind); }
   Int_t Send(const char *mess, Int_t kind = kMESS_STRING) { return TSocket::Send(mess, kind); }
   Int_t SendRaw(const void *buf, Int_t len,
                 ESendRecvOptions opt = kDontBlock);

   TObjString *SendCoordinator(Int_t kind, const char *msg = 0, Int_t int2 = 0,
                               Long64_t l64 = 0, Int_t int3 = 0, const char *opt = 0);

   // --- Receiving ---
   // Same pattern as for Send(): the inline overloads forward to TSocket.
   Int_t Recv(TMessage *&mess);
   Int_t Recv(Int_t &status, Int_t &kind) { return TSocket::Recv(status, kind); }
   Int_t Recv(char *mess, Int_t max) { return TSocket::Recv(mess, max); }
   Int_t Recv(char *mess, Int_t max, Int_t &kind) { return TSocket::Recv(mess, max, kind); }
   Int_t RecvRaw(void *buf, Int_t len,
                 ESendRecvOptions opt = kDefault);

   // --- Interrupts ---
   Int_t SendInterrupt(Int_t type);
   Int_t GetInterrupt(Bool_t &forward);

   // --- Urgent messages ---
   void SendUrgent(Int_t type, Int_t int1, Int_t int2);

   // Record the interrupt flag; when raising it, also notify the connection
   // (if any) and post fASem if a waiter has been flagged through fAWait.
   void SetInterrupt(Bool_t i = kTRUE)
   {
      R__LOCKGUARD(fAMtx);
      fRDInterrupt = i;
      if (!i) return;                     // clearing the flag: nothing to notify
      if (fConn) fConn->SetInterrupt();
      if (fAWait) fASem.Post();
   }
   Bool_t IsInterrupt()
   {
      R__LOCKGUARD(fAMtx);
      return fRDInterrupt;
   }
   void SetAWait(Bool_t w = kTRUE)
   {
      R__LOCKGUARD(fAMtx);
      fAWait = w;
   }
   Bool_t IsAWait()
   {
      R__LOCKGUARD(fAMtx);
      return fAWait;
   }

   // --- Miscellaneous ---
   Int_t  Flush();
   Bool_t Ping(const char *ord = 0);
   void   RemoteTouch();
   void   CtrlC();

   // Socket options are not acted upon here: arguments ignored, returns 0.
   Int_t SetOption(ESockOptions, Int_t) { return 0; }

   // Switch the fDontTimeout flag on / off.
   void DisableTimeout() { fDontTimeout = kTRUE; }
   void EnableTimeout() { fDontTimeout = kFALSE; }

   virtual Int_t Reconnect();

   ClassDef(TXSocket, 0)
};
// TXSockBuf
//
// Buffer descriptor handled by pointer in the TXSocket queues. All
// bookkeeping fields are public; member functions are defined out of line.
// NOTE(review): the class has a destructor plus an fOwn flag, which suggests
// it may free the buffer it holds - confirm in the implementation before
// copying instances by value.
class TXSockBuf {
public:
   Int_t    fSiz;   // NOTE(review): presumably the allocated size
   Int_t    fLen;   // NOTE(review): presumably the used length
   Char_t  *fBuf;   // pointer to the data
   Bool_t   fOwn;   // ownership flag (constructor default: 1)
   Int_t    fCid;   // NOTE(review): presumably a client / connection id

   TXSockBuf(Char_t *bp=0, Int_t sz=0, Bool_t own=1);
   ~TXSockBuf();

   void Resize(Int_t sz);

   // Class-wide memory accounting
   static Long64_t BuffMem();
   static Long64_t GetMemMax();
   static void     SetMemMax(Long64_t memmax);

private:
   Char_t          *fMem;        // NOTE(review): presumably the underlying allocation
   static Long64_t  fgBuffMem;   // NOTE(review): presumably what BuffMem() reports
   static Long64_t  fgMemMax;    // NOTE(review): presumably the limit behind Get/SetMemMax()
};
// TXSockPipe
//
// Holds a pair of descriptors (fPipe) together with a TList of sockets
// (fReadySock) and a TMutex. Apart from the trivial inline accessors, the
// behaviour lives in the out-of-line member functions.
class TXSockPipe {
public:
   TXSockPipe(const char *loc = "");
   virtual ~TXSockPipe();

   // True only when both descriptors are non-negative.
   Bool_t    IsValid() const { return (fPipe[0] >= 0) && (fPipe[1] >= 0); }

   TXSocket *GetLastReady();

   // First descriptor of the pair.
   Int_t     GetRead() const { return fPipe[0]; }

   Int_t     Post(TSocket *s);
   Int_t     Clean(TSocket *s);
   Int_t     Flush(TSocket *s);
   void      DumpReadySock();

   // Replace the stored location string.
   void      SetLoc(const char *loc = "") { fLoc = loc; }

private:
   TMutex    fMutex;       // NOTE(review): presumably serialises access to the members below
   Int_t     fPipe[2];     // descriptor pair; fPipe[0] is what GetRead() returns
   TString   fLoc;         // location string, set through SetLoc()
   TList     fReadySock;   // NOTE(review): presumably the sockets posted as ready
};
// TXSemaphoreGuard
//
// Scope guard around a TSemaphore. The constructor attempts a non-blocking
// TryWait(); the guard is valid only if a semaphore was given and TryWait()
// returned 0. A valid guard calls Post() on the semaphore when destroyed.
// Callers are expected to check IsValid() before relying on the guard.
//
// The guard is deliberately non-copyable: an implicit copy would share fSem
// and fValid, so both objects would Post() on destruction and the semaphore
// would be released twice for a single successful TryWait().
class TXSemaphoreGuard {
public:
   // sem: semaphore to acquire; may be null, in which case the guard is
   //      simply invalid and its destructor does nothing.
   TXSemaphoreGuard(TSemaphore *sem) : fSem(sem), fValid(kTRUE)
   {
      // A missing semaphore or a non-zero TryWait() result means "not acquired".
      if (!fSem || fSem->TryWait()) fValid = kFALSE;
   }

   // Release the semaphore only if this guard actually acquired it.
   virtual ~TXSemaphoreGuard() { if (fValid && fSem) fSem->Post(); }

   // kTRUE when the constructor acquired the semaphore.
   Bool_t IsValid() const { return fValid; }

private:
   // Copying is disabled: declared private and intentionally left undefined.
   TXSemaphoreGuard(const TXSemaphoreGuard &);
   TXSemaphoreGuard &operator=(const TXSemaphoreGuard &);

   TSemaphore *fSem;     // guarded semaphore (not owned)
   Bool_t      fValid;   // whether the destructor must Post()
};
#endif