#include "TPSocket.h"

#include <cstring>
#include <string>

#include "TUrl.h"
#include "TServerSocket.h"
#include "TMonitor.h"
#include "TSystem.h"
#include "TMessage.h"
#include "Bytes.h"
#include "TROOT.h"
#include "TError.h"
#include "TVirtualMutex.h"
ClassImp(TPSocket)
// Construct a parallel socket towards the named service at address `addr`.
//
// The primary connection is made by the TSocket(addr, service) base
// constructor. `size` is recorded as the requested number of parallel
// sockets and `tcpwindowsize` is handed to Init(), which builds the set
// (Init() does nothing if the primary connection is not valid).
TPSocket::TPSocket(TInetAddress addr, const char *service, Int_t size,
                   Int_t tcpwindowsize) : TSocket(addr, service)
{
   // Requested parallelism; Init() clamps values <= 1 to a single socket.
   fSize = size;

   // Negotiate and open the parallel connections.
   Init(tcpwindowsize);
}
// Construct a parallel socket towards port `port` at address `addr`.
//
// The primary connection is made by the TSocket(addr, port) base
// constructor. `size` is recorded as the requested number of parallel
// sockets and `tcpwindowsize` is handed to Init(), which builds the set
// (Init() does nothing if the primary connection is not valid).
TPSocket::TPSocket(TInetAddress addr, Int_t port, Int_t size,
                   Int_t tcpwindowsize) : TSocket(addr, port)
{
   // Requested parallelism; Init() clamps values <= 1 to a single socket.
   fSize = size;

   // Negotiate and open the parallel connections.
   Init(tcpwindowsize);
}
// Construct a parallel socket towards the named service on `host`.
//
// The primary connection is made by the TSocket(host, service) base
// constructor. `size` is recorded as the requested number of parallel
// sockets and `tcpwindowsize` is handed to Init(), which builds the set
// (Init() does nothing if the primary connection is not valid).
TPSocket::TPSocket(const char *host, const char *service, Int_t size,
                   Int_t tcpwindowsize) : TSocket(host, service)
{
   // Requested parallelism; Init() clamps values <= 1 to a single socket.
   fSize = size;

   // Negotiate and open the parallel connections.
   Init(tcpwindowsize);
}
// Construct a parallel socket towards `host`:`port` made of `size` connections.
//
// The primary connection is opened by the TSocket base constructor; when
// size > 1 it is opened with a tcp window size of -1, otherwise with the
// caller's `tcpwindowsize`.
//
// If `host` contains the marker "?A", authentication is attempted on the
// primary connection before the parallel set is created:
//   - if it fails and `host` contains "rootd" with fRemoteProtocol < 10, one
//     retry is made on a freshly opened TSocket (Init() first, then
//     Authenticate() again);
//   - otherwise a failed authentication closes the base socket.
// The URL stored through SetUrl() has the "?A" marker removed. The `host`
// string itself is never modified.
//
// Unless the rootd/old-protocol case applies, Init() is finally called to
// create the parallel set when the connection is still valid.
TPSocket::TPSocket(const char *host, Int_t port, Int_t size,
                   Int_t tcpwindowsize)
                  : TSocket(host, port, (Int_t)(size > 1 ? -1 : tcpwindowsize))
{
   // No parallel set exists yet.
   fSockets        = 0;
   fWriteMonitor   = 0;
   fReadMonitor    = 0;
   fWriteBytesLeft = 0;
   fReadBytesLeft  = 0;
   fWritePtr       = 0;
   fReadPtr        = 0;

   // Behave as a single socket until Init() has been run with the real size.
   fSize           = 1;

   // Outcome of the connection attempt made by the base-class constructor.
   Bool_t valid = TSocket::IsValid();

   // "?A" in the host string requests authentication on the primary socket.
   const char *pauth = strstr(host, "?A");
   const Bool_t authreq = pauth ? kTRUE : kFALSE;

   // Host strings containing "rootd" get the protocol-dependent handling below.
   const Bool_t rootdSrv = strstr(host, "rootd") ? kTRUE : kFALSE;

   if (authreq) {
      if (valid) {
         if (!Authenticate(TUrl(host).GetUser())) {
            if (rootdSrv && fRemoteProtocol < 10) {
               // Retry on a new connection: adopt its descriptor, build the
               // parallel set, then authenticate again.
               // NOTE(review): `ns` is never deleted; only its descriptor is
               // adopted. Deleting it here would presumably close that
               // descriptor -- confirm before changing.
               Int_t tcpw = (size > 1 ? -1 : tcpwindowsize);
               TSocket *ns = new TSocket(host, port, tcpw);
               if (ns->IsValid()) {
                  R__LOCKGUARD2(gROOTMutex);
                  gROOT->GetListOfSockets()->Remove(ns);
                  fSocket = ns->GetDescriptor();
                  fSize = size;
                  Init(tcpwindowsize);
               }
               if ((valid = IsValid())) {
                  if (!Authenticate(TUrl(host).GetUser())) {
                     TSocket::Close();
                     valid = kFALSE;
                  }
               }
            } else {
               TSocket::Close();
               valid = kFALSE;
            }
         }
      }

      // Store the URL without the "?A" marker. `host` is const and may point
      // to read-only storage (e.g. a string literal), so pass a truncated
      // copy instead of writing a terminator through the pointer.
      const std::string cleanUrl(host, static_cast<std::string::size_type>(pauth - host));
      SetUrl(cleanUrl.c_str());
   }

   // Create the parallel set now, except for the rootd/old-protocol case
   // (where Init() was already attempted in the retry branch above).
   if (!rootdSrv || fRemoteProtocol > 9) {
      if (valid) {
         fSize = size;
         Init(tcpwindowsize);
      }
   }
}
// Construct a parallel socket towards `host`:`port` on top of an already
// open socket `sock`.
//
// If `sock` is null the object is left in its initial state (no parallel set,
// fSize == 1). Otherwise the descriptor, addresses, byte counters,
// compression level, security context, remote protocol, service type and tcp
// window size are copied from `sock`.
//
// If `host` contains the marker "?A", authentication is attempted before the
// parallel set is created:
//   - if it fails and `host` contains "rootd" with fRemoteProtocol < 10, one
//     retry is made on a freshly opened TSocket (Init() first, then
//     Authenticate() again);
//   - otherwise a failed authentication closes the base socket.
// The URL stored through SetUrl() has the "?A" marker removed. The `host`
// string itself is never modified.
//
// Unless the rootd/old-protocol case applies, Init() is finally called with
// `sock` to create the parallel set. A valid object registers itself in the
// global list of sockets.
TPSocket::TPSocket(const char *host, Int_t port, Int_t size, TSocket *sock)
{
   // No parallel set exists yet.
   fSockets        = 0;
   fWriteMonitor   = 0;
   fReadMonitor    = 0;
   fWriteBytesLeft = 0;
   fReadBytesLeft  = 0;
   fWritePtr       = 0;
   fReadPtr        = 0;

   // Behave as a single socket until Init() has been run with the real size.
   fSize           = 1;

   // Nothing to build on without an open socket.
   if (!sock) return;

   // Take over the connection state of the open socket.
   fSocket         = sock->GetDescriptor();
   fService        = sock->GetService();
   fAddress        = sock->GetInetAddress();
   fLocalAddress   = sock->GetLocalInetAddress();
   fBytesSent      = sock->GetBytesSent();
   fBytesRecv      = sock->GetBytesRecv();
   fCompress       = sock->GetCompressionLevel();
   fSecContext     = sock->GetSecContext();
   fRemoteProtocol = sock->GetRemoteProtocol();
   fServType       = (TSocket::EServiceType)sock->GetServType();
   fTcpWindowSize  = sock->GetTcpWindowSize();

   // Validity of the socket we were given.
   Bool_t valid = sock->IsValid();

   // "?A" in the host string requests authentication on the primary socket.
   const char *pauth = strstr(host, "?A");
   const Bool_t authreq = pauth ? kTRUE : kFALSE;

   // Host strings containing "rootd" get the protocol-dependent handling below.
   const Bool_t rootdSrv = strstr(host, "rootd") ? kTRUE : kFALSE;

   if (authreq) {
      if (valid) {
         if (!Authenticate(TUrl(host).GetUser())) {
            if (rootdSrv && fRemoteProtocol < 10) {
               // Retry on a new connection: adopt its descriptor, build the
               // parallel set, then authenticate again.
               // NOTE(review): `ns` is never deleted; only its descriptor is
               // adopted. Deleting it here would presumably close that
               // descriptor -- confirm before changing.
               Int_t tcpw = (size > 1 ? -1 : fTcpWindowSize);
               TSocket *ns = new TSocket(host, port, tcpw);
               if (ns->IsValid()) {
                  R__LOCKGUARD2(gROOTMutex);
                  gROOT->GetListOfSockets()->Remove(ns);
                  fSocket = ns->GetDescriptor();
                  fSize = size;
                  Init(fTcpWindowSize);
               }
               if ((valid = IsValid())) {
                  if (!Authenticate(TUrl(host).GetUser())) {
                     TSocket::Close();
                     valid = kFALSE;
                  }
               }
            } else {
               TSocket::Close();
               valid = kFALSE;
            }
         }
      }

      // Store the URL without the "?A" marker. `host` is const and may point
      // to read-only storage (e.g. a string literal), so pass a truncated
      // copy instead of writing a terminator through the pointer.
      const std::string cleanUrl(host, static_cast<std::string::size_type>(pauth - host));
      SetUrl(cleanUrl.c_str());
   }

   // Create the parallel set now, except for the rootd/old-protocol case
   // (where Init() was already attempted in the retry branch above).
   if (!rootdSrv || fRemoteProtocol > 9) {
      if (valid) {
         fSize = size;
         Init(fTcpWindowSize, sock);
      }
   }

   // Register the finished object in the global socket list.
   if (IsValid()) {
      R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Add(this);
   }
}
// Wrap an existing array of `size` sockets as one parallel socket.
//
// The `pSockets` pointer is stored as-is in fSockets (the array is not
// copied). Name, title and remote address are taken from the first socket,
// and the new object is added to the global list of sockets.
TPSocket::TPSocket(TSocket *pSockets[], Int_t size)
{
   fSockets = pSockets;
   fSize    = size;

   const Bool_t parallel = (fSize > 1);

   // With a single socket, operate directly on its descriptor through the
   // TSocket base class.
   if (!parallel)
      fSocket = fSockets[0]->GetDescriptor();

   // No-delay is always set; non-blocking mode only for a real parallel set.
   SetOption(kNoDelay, 1);
   if (parallel)
      SetOption(kNoBlock, 1);

   // Per-socket bookkeeping used by SendRaw()/RecvRaw().
   fWriteMonitor   = new TMonitor;
   fReadMonitor    = new TMonitor;
   fWriteBytesLeft = new Int_t[fSize];
   fReadBytesLeft  = new Int_t[fSize];
   fWritePtr       = new char*[fSize];
   fReadPtr        = new char*[fSize];

   // Register every socket with both monitors, then leave them all inactive
   // until a transfer activates the ones it needs.
   for (Int_t idx = 0; idx < fSize; ++idx) {
      TSocket *member = fSockets[idx];
      fWriteMonitor->Add(member, TMonitor::kWrite);
      fReadMonitor->Add(member, TMonitor::kRead);
   }
   fWriteMonitor->DeActivateAll();
   fReadMonitor->DeActivateAll();

   // Identity of the parallel socket mirrors that of its first member.
   TSocket *first = fSockets[0];
   SetName(first->GetName());
   SetTitle(first->GetTitle());
   fAddress = first->GetInetAddress();

   {
      R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Add(this);
   }
}
// Destructor: close the connection(s) and release the monitors and the
// per-socket bookkeeping arrays.
TPSocket::~TPSocket()
{
   // Close() also frees the fSockets array.
   Close();

   // Monitors.
   delete fWriteMonitor;
   delete fReadMonitor;

   // Per-socket transfer state.
   delete [] fWriteBytesLeft;
   delete [] fReadBytesLeft;
   delete [] fWritePtr;
   delete [] fReadPtr;
}
// Close the parallel socket. `option` is forwarded unchanged to the Close()
// of the underlying socket(s).
//
// For a real parallel set (fSize > 1) every member socket is closed and
// deleted; otherwise the base-class socket is closed. The fSockets array is
// freed and the object is removed from the global list of sockets.
void TPSocket::Close(Option_t *option)
{
   // Not valid as a parallel socket: still give the base class the chance to
   // close whatever it may hold, then stop.
   if (!IsValid()) {
      TSocket::Close(option);
      return;
   }

   if (fSize > 1) {
      for (Int_t idx = 0; idx < fSize; ++idx) {
         TSocket *member = fSockets[idx];
         member->Close(option);
         delete member;
      }
   } else {
      TSocket::Close(option);
   }

   delete [] fSockets;
   fSockets = 0;

   {
      R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Remove(this);
   }
}
// Set up the set of parallel sockets.
//
// tcpwindowsize: passed to the temporary TServerSocket that accepts the
//                parallel connections (only used when fSize > 1).
// sock:          optional already-open socket; when given, options are set
//                and the negotiation message is sent through it instead of
//                through the TSocket base of this object. Callers in this
//                file also invoke Init() with tcpwindowsize only.
//
// All parallel-set members are reset first. Nothing else happens if `sock`
// (when given) or the base socket is not valid.
void TPSocket::Init(Int_t tcpwindowsize, TSocket *sock)
{
   // Reset the parallel-set state.
   fSockets        = 0;
   fWriteMonitor   = 0;
   fReadMonitor    = 0;
   fWriteBytesLeft = 0;
   fReadBytesLeft  = 0;
   fWritePtr       = 0;
   fReadPtr        = 0;
   if ((sock && !sock->IsValid()) || !TSocket::IsValid())
      return;
   Int_t i = 0;
   if (fSize <= 1) {
      // Normalize the size: anything <= 1 means a single socket.
      fSize = 1;
      // Set the no-delay option on the socket in use.
      if (sock)
         sock->SetOption(kNoDelay, 1);
      else
         TSocket::SetOption(kNoDelay, 1);
      // Send the pair (0, 0) to the remote side; in the parallel branch the
      // same message carries (local port, number of sockets).
      if (sock)
         sock->Send((Int_t)0, (Int_t)0);
      else
         TSocket::Send((Int_t)0, (Int_t)0);
      // The set consists of this object itself.
      fSockets = new TSocket*[1];
      fSockets[0]= (TSocket *)this;
   } else {
      // Open a local server socket (port 0, no reuse, backlog fSize) on
      // which the parallel connections will be accepted.
      TServerSocket ss(0, kFALSE, fSize, tcpwindowsize);
      // Send the local port of that server socket and the number of
      // parallel sockets wanted to the remote side.
      if (sock)
         sock->Send(ss.GetLocalPort(), fSize);
      else
         TSocket::Send(ss.GetLocalPort(), fSize);
      fSockets = new TSocket*[fSize];
      // Accept fSize connections; each accepted socket is taken out of the
      // global socket list.
      // NOTE(review): the result of Accept() is not checked before use --
      // confirm what it returns on failure.
      for (i = 0; i < fSize; i++) {
         fSockets[i] = ss.Accept();
         R__LOCKGUARD2(gROOTMutex);
         gROOT->GetListOfSockets()->Remove(fSockets[i]);
      }
      // Set no-delay and non-blocking on all parallel sockets.
      SetOption(kNoDelay, 1);
      SetOption(kNoBlock, 1);
      // The original connection is no longer needed: close it and mark the
      // base-class descriptor as unused.
      if (sock)
         sock->Close();
      else
         gSystem->CloseConnection(fSocket, kFALSE);
      fSocket = -1;
   }
   // Allocate monitors and per-socket transfer bookkeeping.
   fWriteMonitor   = new TMonitor;
   fReadMonitor    = new TMonitor;
   fWriteBytesLeft = new Int_t[fSize];
   fReadBytesLeft  = new Int_t[fSize];
   fWritePtr       = new char*[fSize];
   fReadPtr        = new char*[fSize];
   // Register every socket for writing and reading, then deactivate them all;
   // SendRaw()/RecvRaw() activate the ones they use.
   for (i = 0; i < fSize; i++) {
      fWriteMonitor->Add(fSockets[i], TMonitor::kWrite);
      fReadMonitor->Add(fSockets[i], TMonitor::kRead);
   }
   fWriteMonitor->DeActivateAll();
   fReadMonitor->DeActivateAll();
}
// Return the local internet address of this parallel socket.
//
// A single socket defers to TSocket. For a parallel set the address is
// obtained from the first member socket the first time it is needed (while
// the cached port is still -1) and cached in fLocalAddress. An invalid
// socket yields a default-constructed TInetAddress.
TInetAddress TPSocket::GetLocalInetAddress()
{
   if (fSize <= 1)
      return TSocket::GetLocalInetAddress();

   if (!IsValid())
      return TInetAddress();

   const Bool_t notCachedYet = (fLocalAddress.GetPort() == -1);
   if (notCachedYet)
      fLocalAddress = gSystem->GetSockName(fSockets[0]->GetDescriptor());

   return fLocalAddress;
}
// Return the socket descriptor: the TSocket one for a single socket, the
// descriptor of the first member for a parallel set, or -1 when there is no
// set.
Int_t TPSocket::GetDescriptor() const
{
   if (fSize <= 1)
      return TSocket::GetDescriptor();

   if (!fSockets)
      return -1;

   return fSockets[0]->GetDescriptor();
}
// Send a TMessage over the parallel socket.
//
// Without a parallel set (no fSockets or fSize <= 1) the call is delegated to
// TSocket::Send(). Otherwise the first sizeof(UInt_t) bytes of the message
// buffer are sent on their own, followed by the remainder of the buffer. If
// the message type has the kMESS_ACK bit set, a two-byte "ok" reply is
// awaited.
//
// Returns the result of the second SendRaw() (bytes sent, header excluded),
// the failing SendRaw() result if it was <= 0, or -1 for an invalid socket,
// a message in reading mode, or a missing/bad acknowledgement.
Int_t TPSocket::Send(const TMessage &mess)
{
   if (!fSockets || fSize <= 1)
      return TSocket::Send(mess);

   if (!IsValid())
      return -1;

   if (mess.IsReading()) {
      Error("Send", "cannot send a message used for reading");
      return -1;
   }

   // Write the final length into the message buffer.
   mess.SetLength();

   // Apply the socket's compression level if the message has none of its
   // own, then compress when a level is set.
   TMessage &writable = const_cast<TMessage&>(mess);
   if (fCompress > 0 && mess.GetCompressionLevel() == 0)
      writable.SetCompressionLevel(fCompress);
   if (mess.GetCompressionLevel() > 0)
      writable.Compress();

   // Prefer the compressed buffer when one exists.
   char *payload    = mess.Buffer();
   Int_t payloadLen = mess.Length();
   if (mess.CompBuffer()) {
      payload    = mess.CompBuffer();
      payloadLen = mess.CompLength();
   }

   const Int_t headerLen = (Int_t) sizeof(UInt_t);

   // Leading sizeof(UInt_t) bytes first ...
   Int_t nsent = SendRaw(payload, headerLen, kDefault);
   if (nsent <= 0)
      return nsent;

   // ... then the rest of the buffer.
   nsent = SendRaw(payload + headerLen, payloadLen - headerLen, kDefault);
   if (nsent <= 0)
      return nsent;

   // Wait for the acknowledgement if the message asks for one.
   if (mess.What() & kMESS_ACK) {
      char reply[2];
      if (RecvRaw(reply, sizeof(reply), kDefault) < 0)
         return -1;
      if (strncmp(reply, "ok", 2) != 0) {
         Error("Send", "bad acknowledgement");
         return -1;
      }
   }

   return nsent;
}
// Send `length` bytes from `buffer`, spreading them over the parallel
// sockets.
//
// With fSize <= 1 the call is delegated to TSocket::SendRaw(). Otherwise:
//   - transfers shorter than 4096 bytes, or any `opt` other than kDefault,
//     use only the first socket (with `opt` as send option when not kDefault);
//   - larger kDefault transfers are split into length/fSize bytes per socket,
//     the last socket also taking the remainder, and sent with kDontBlock.
//
// Returns `length` once everything is sent, the byte count of the first
// successful send when opt == kDontBlock, or -1 on error / missing set.
// A send result of -5 additionally closes the parallel socket.
Int_t TPSocket::SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt)
{
   // Same guard as the other methods of this class (fSize <= 1, not == 1):
   // with fSize <= 0 the splitting code below would divide by zero and index
   // the bookkeeping arrays at -1.
   if (fSize <= 1)
      return TSocket::SendRaw(buffer,length,opt);
   if (!fSockets) return -1;

   // Decide how many sockets take part and with which send option.
   Int_t i, nsocks = fSize, len = length;
   if (len < 4096)
      nsocks = 1;
   ESendRecvOptions sendopt = kDontBlock;
   if (nsocks == 1)
      sendopt = kDefault;
   if (opt != kDefault) {
      nsocks  = 1;
      sendopt = opt;
   }

   // The first socket is blocking for single-socket transfers and
   // non-blocking for parallel ones.
   if (nsocks == 1)
      fSockets[0]->SetOption(kNoBlock, 0);
   else
      fSockets[0]->SetOption(kNoBlock, 1);

   // Give every participating socket its slice of the buffer and activate it
   // in the write monitor; the last one also gets the remainder.
   for (i = 0; i < nsocks; i++) {
      fWriteBytesLeft[i] = len/nsocks;
      fWritePtr[i] = (char *)buffer + (i*fWriteBytesLeft[i]);
      fWriteMonitor->Activate(fSockets[i]);
   }
   fWriteBytesLeft[nsocks-1] += len%nsocks;

   // Keep sending on whichever socket the monitor reports until all bytes
   // are out.
   while (len > 0) {
      TSocket *s = fWriteMonitor->Select();
      for (int is = 0; is < nsocks; is++) {
         if (s == fSockets[is]) {
            if (fWriteBytesLeft[is] > 0) {
               // Retry immediately while the send reports -4
               // (presumably "would block" -- TODO confirm against TSocket).
               Int_t nsent;
               do {
                  nsent = fSockets[is]->SendRaw(fWritePtr[is],
                                                fWriteBytesLeft[is],
                                                sendopt);
               } while (nsent == -4);

               if (nsent <= 0) {
                  fWriteMonitor->DeActivateAll();
                  if (nsent == -5) {
                     // Presumably the connection was reset/broken
                     // (TODO confirm): shut the whole set down.
                     Close();
                  }
                  return -1;
               }
               // Explicit non-blocking request: report the first send only.
               if (opt == kDontBlock) {
                  fWriteMonitor->DeActivateAll();
                  return nsent;
               }
               fWriteBytesLeft[is] -= nsent;
               fWritePtr[is] += nsent;
               len -= nsent;
            }
         }
      }
   }
   fWriteMonitor->DeActivateAll();
   return length;
}
// Receive a TMessage from the parallel socket.
//
// With fSize <= 1 the call is delegated to TSocket::Recv(). Otherwise a
// UInt_t length (converted with net2host) is read first, then that many
// bytes of message body. If the received message type has the kMESS_ACK bit
// set, "ok" is sent back and the bit is cleared from the message type.
//
// On success `mess` points to a newly allocated TMessage and the result of
// the body RecvRaw() is returned. On failure `mess` is set to 0 and the
// failing RecvRaw() result (<= 0) or -1 is returned.
Int_t TPSocket::Recv(TMessage *&mess)
{
   if (fSize <= 1)
      return TSocket::Recv(mess);

   if (!IsValid()) {
      mess = 0;
      return -1;
   }

   // Length header.
   UInt_t bodyLen;
   Int_t  nread = RecvRaw(&bodyLen, sizeof(UInt_t), kDefault);
   if (nread <= 0) {
      mess = 0;
      return nread;
   }
   bodyLen = net2host(bodyLen);

   // Message body, stored after room reserved for the length header.
   char *storage = new char[bodyLen+sizeof(UInt_t)];
   nread = RecvRaw(storage+sizeof(UInt_t), bodyLen, kDefault);
   if (nread <= 0) {
      delete [] storage;
      mess = 0;
      return nread;
   }

   mess = new TMessage(storage, bodyLen+sizeof(UInt_t));

   // Acknowledge if requested, then hide the ACK bit from the caller.
   if (mess->What() & kMESS_ACK) {
      char ack[2] = { 'o', 'k' };
      if (SendRaw(ack, sizeof(ack), kDefault) < 0) {
         delete mess;
         mess = 0;
         return -1;
      }
      mess->SetWhat(mess->What() & ~kMESS_ACK);
   }

   return nread;
}
// Receive `length` bytes into `buffer`, collecting them from the parallel
// sockets.
//
// With fSize <= 1 the call is delegated to TSocket::RecvRaw(). Otherwise:
//   - transfers shorter than 4096 bytes, or any `opt` other than kDefault,
//     use only the first socket (with `opt` as receive option when not
//     kDefault);
//   - larger kDefault transfers are split into length/fSize bytes per socket,
//     the last socket also taking the remainder, and read with kDontBlock.
//
// Returns `length` once everything has arrived, the byte count of the first
// successful read when opt == kDontBlock, or -1 on error / missing set.
// A receive result of -5 additionally closes the parallel socket.
Int_t TPSocket::RecvRaw(void *buffer, Int_t length, ESendRecvOptions opt)
{
   if (fSize <= 1)
      return TSocket::RecvRaw(buffer,length,opt);
   if (!fSockets) return -1;

   // Only the first socket is used for short transfers and for any explicit
   // (non-default) option.
   const Bool_t explicitOpt = (opt != kDefault);
   const Bool_t single      = explicitOpt || (length < 4096);
   const Int_t  nstreams    = single ? 1 : fSize;

   ESendRecvOptions mode;
   if (explicitOpt)
      mode = opt;
   else if (single)
      mode = kDefault;
   else
      mode = kDontBlock;

   // The first socket is blocking for single-socket transfers and
   // non-blocking for parallel ones.
   fSockets[0]->SetOption(kNoBlock, single ? 0 : 1);

   // Hand every participating socket its slice of the buffer and activate it
   // in the read monitor; the last one also gets the remainder.
   const Int_t share = length / nstreams;
   for (Int_t k = 0; k < nstreams; ++k) {
      fReadBytesLeft[k] = share;
      fReadPtr[k] = (char *)buffer + (k*share);
      fReadMonitor->Activate(fSockets[k]);
   }
   fReadBytesLeft[nstreams-1] += length % nstreams;

   // Keep reading from whichever socket the monitor reports until all bytes
   // have arrived.
   Int_t pending = length;
   while (pending > 0) {
      TSocket *ready = fReadMonitor->Select();
      for (Int_t k = 0; k < nstreams; ++k) {
         if (ready != fSockets[k])
            continue;
         if (fReadBytesLeft[k] <= 0)
            continue;

         const Int_t got = fSockets[k]->RecvRaw(fReadPtr[k],
                                                fReadBytesLeft[k],
                                                mode);
         if (got <= 0) {
            fReadMonitor->DeActivateAll();
            if (got == -5) {
               // Presumably the connection was reset/broken (TODO confirm):
               // shut the whole set down.
               Close();
            }
            return -1;
         }

         // Explicit non-blocking request: report the first read only.
         if (opt == kDontBlock) {
            fReadMonitor->DeActivateAll();
            return got;
         }

         fReadBytesLeft[k] -= got;
         fReadPtr[k] += got;
         pending -= got;
      }
   }

   fReadMonitor->DeActivateAll();
   return length;
}
// Set socket option `opt` to `val`.
//
// A single socket defers to TSocket::SetOption(). For a parallel set the
// option is applied to every member socket and the result of the last call
// is returned. Returns -1 when there is no set to apply it to (for example
// after Close()), instead of dereferencing a null array.
Int_t TPSocket::SetOption(ESockOptions opt, Int_t val)
{
   if (fSize <= 1)
      return TSocket::SetOption(opt,val);

   // No parallel sockets available: report an error.
   if (!fSockets)
      return -1;

   Int_t ret = 0;
   for (int i = 0; i < fSize; i++)
      ret = fSockets[i]->SetOption(opt, val);
   return ret;
}
// Read socket option `opt` into `val`.
//
// A single socket defers to TSocket::GetOption(). For a parallel set every
// member socket is queried in turn, so `val` and the return value are those
// of the last socket. Returns -1 (leaving `val` untouched) when there is no
// set to query (for example after Close()), instead of dereferencing a null
// array.
Int_t TPSocket::GetOption(ESockOptions opt, Int_t &val)
{
   if (fSize <= 1)
      return TSocket::GetOption(opt,val);

   // No parallel sockets available: report an error.
   if (!fSockets)
      return -1;

   Int_t ret = 0;
   for (int i = 0; i < fSize; i++)
      ret = fSockets[i]->GetOption(opt, val);
   return ret;
}
// Return the error code of the socket.
//
// A single socket defers to TSocket::GetErrorCode(). For a parallel set the
// error code of the first member socket is returned; 0 is returned when the
// set or its first member is missing (the array pointer itself is checked
// before it is indexed).
Int_t TPSocket::GetErrorCode() const
{
   if (fSize <= 1)
      return TSocket::GetErrorCode();

   return (fSockets && fSockets[0]) ? fSockets[0]->GetErrorCode() : 0;
}
// This page has been automatically generated. If you have any comments or suggestions about the page layout, send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.