#include <errno.h>
#ifdef WIN32
#include <io.h>
#endif
#include "TList.h"
#include "TObjArray.h"
#include "TObjString.h"
#include "TProof.h"
#include "TProofLog.h"
#include "TXProofMgr.h"
#include "TXSocket.h"
#include "TROOT.h"
// ROOT ClassImp macro invocation for the TXProofMgr class.
ClassImp(TXProofMgr)
TProofMgr *GetTXProofMgr(const char *url, Int_t l, const char *al)
{
   // Factory function: allocate a TXProofMgr from the given url, integer
   // argument 'l' and alias 'al', and hand it back as a TProofMgr pointer.
   // The caller receives a heap-allocated object.

   TXProofMgr *mgr = new TXProofMgr(url, l, al);
   return (TProofMgr *) mgr;
}
// Helper whose constructor registers GetTXProofMgr with
// TProofMgr::SetTXProofMgrHook.
class TXProofMgrInit {
public:
   TXProofMgrInit()
   {
      TProofMgr::SetTXProofMgrHook(&GetTXProofMgr);
   }
};

// File-scope instance: constructing it performs the registration above.
static TXProofMgrInit gxproofmgr_init;
TXProofMgr::TXProofMgr(const char *url, Int_t dbg, const char *alias)
          : TProofMgr(url)
{
   // Create a PROOF manager for the xproofd-type server at 'url'.
   // 'dbg' is forwarded to Init(); 'alias', when non-null, becomes the
   // alias of this manager, otherwise the host name of the URL is used.
   // If Init() fails the socket is deleted.

   fServType = kXProofd;

   // Reference URL built from a bare string: it tells us which protocol
   // and port TUrl reports when the user did not specify any
   TUrl refUrl("a");

   // Replace the unspecified protocol by "proof"
   if (strcmp(fUrl.GetProtocol(), refUrl.GetProtocol()) == 0)
      fUrl.SetProtocol("proof");

   // Replace the unspecified port by the one of the 'proofd' service,
   // falling back to 1093 when the service lookup fails
   if (fUrl.GetPort() == refUrl.GetPort()) {
      Int_t port = gSystem->GetServiceByName("proofd");
      if (port >= 0) {
         if (gDebug > 1)
            Info("TXProofMgr","port from GetServiceByName: %d", port);
      } else {
         if (gDebug > 0)
            Info("TXProofMgr","service 'proofd' not found by GetServiceByName"
                              ": using default IANA assigned tcp port 1093");
         port = 1093;
      }
      fUrl.SetPort(port);
   }

   // Make sure the host is stored in its fully qualified form
   if (strcmp(fUrl.GetHost(), fUrl.GetHostFQDN()) != 0)
      fUrl.SetHost(fUrl.GetHostFQDN());

   // The object name is the full URL
   SetName(fUrl.GetUrl(kTRUE));

   // Alias: the one requested or, by default, the host name
   SetAlias(alias ? alias : fUrl.GetHost());

   // Open the connection; on failure drop the socket
   const Int_t rc = Init(dbg);
   if (rc != 0) {
      SafeDelete(fSocket);
   }
}
Int_t TXProofMgr::Init(Int_t)
{
   // Open the connection to the coordinator at fUrl.
   // Returns 0 on success, -1 on failure. On failure, if the socket
   // reports that the remote server is a proofd, fServType is switched to
   // TProofMgr::kProofd; otherwise an error is printed when gDebug > 0.
   // The integer argument is not used.

   TString u = fUrl.GetUrl(kTRUE);

   fSocket = new TXSocket(u, 'C', kPROOF_Protocol,
                          kXPROOF_Protocol, 0, -1, this);

   const Bool_t connected = (fSocket && fSocket->IsValid());
   if (!connected) {
      const Bool_t isProofd = (fSocket && fSocket->IsServProofd());
      if (isProofd) {
         // The remote end answered as a proofd: remember it
         fServType = TProofMgr::kProofd;
      } else if (gDebug > 0) {
         Error("Init", "while opening the connection to %s - exit (error: %d)",
                       u.Data(), (fSocket ? fSocket->GetOpenError() : -1));
      }
      return -1;
   }

   // Protocol version run by the remote server
   fRemoteProtocol = fSocket->GetRemoteProtocol();

   // Take the socket out of the global list of sockets
   {  R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Remove(fSocket);
   }

   // Done
   return 0;
}
TXProofMgr::~TXProofMgr()
{
   // Destructor: close (option "P") and delete the socket, then remove
   // this object from the global list of sockets.

   if (fSocket) {
      fSocket->Close("P");
      delete fSocket;
      fSocket = 0;
   }

   // Removal from the global list is done under the ROOT mutex
   {  R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Remove(this);
   }
}
TProof *TXProofMgr::AttachSession(Int_t id, Bool_t gui)
{
   // Attach to the session with local identifier 'id'.
   // If the session descriptor already holds a TProof, that one is
   // returned. Otherwise a new TProof is created from the manager URL and
   // the remote session id; when 'gui' is true "GUI" is appended to the URL.
   // Returns 0 if the manager is invalid or 'id' is unknown; otherwise the
   // TProof object (which may be invalid if the attach failed).

   if (!IsValid()) {
      Warning("AttachSession","invalid TXProofMgr - do nothing");
      return 0;
   }

   TProofDesc *desc = GetProofDesc(id);
   if (!desc) {
      Info("AttachSession","invalid proofserv id (%d)", id);
      return 0;
   }

   // Nothing to do if we are attached already
   TProof *attached = desc->GetProof();
   if (attached)
      return attached;

   // Build the URL addressing the remote session
   TString u(Form("%s/?%d", fUrl.GetUrl(kTRUE), desc->GetRemoteId()));
   if (gui)
      u += "GUI";

   // Start the attach
   TProof *proof = new TProof(u);
   if (!proof || !proof->IsValid()) {
      Error("AttachSession", "attaching to PROOF session");
      return proof;
   }

   // Bind the session to this manager
   proof->SetManager(this);

   // Record status and TProof instance in the descriptor
   Int_t status = TProofDesc::kRunning;
   if (proof->IsIdle())
      status = TProofDesc::kIdle;
   desc->SetStatus(status);
   desc->SetProof(proof);

   // The session takes the name of its descriptor
   proof->SetName(desc->GetName());

   return proof;
}
void TXProofMgr::DetachSession(Int_t id, Option_t *opt)
{
   // Detach from one or all sessions.
   //   id > 0  : detach from the session with local identifier 'id';
   //             'opt' is forwarded as is to the socket.
   //   id == 0 : detach from all sessions; the option sent to the socket
   //             is 'opt' followed by "A".
   //   id < 0  : nothing is done.
   // The TProof objects attached to the affected descriptors are deleted
   // and the descriptors are removed from the list of sessions.

   if (!IsValid()) {
      Warning("DetachSession","invalid TXProofMgr - do nothing");
      return;
   }

   if (id > 0) {
      // Single session request
      TProofDesc *d = GetProofDesc(id);
      if (d) {
         if (fSocket)
            fSocket->DisconnectSession(d->GetRemoteId(), opt);
         TProof *p = d->GetProof();
         SafeDelete(p);
         fSessions->Remove(d);
         delete d;
      }
   } else if (id == 0) {
      // Requesto to destroy all sessions
      if (fSocket) {
         // A null 'opt' must not reach the "%s" conversion (undefined
         // behavior): treat it as an empty option string
         TString o = Form("%sA", (opt ? opt : ""));
         fSocket->DisconnectSession(-1, o);
      }
      if (fSessions) {
         // Delete the attached TProof objects before the descriptors
         TIter nxd(fSessions);
         TProofDesc *d = 0;
         while ((d = (TProofDesc *)nxd())) {
            TProof *p = d->GetProof();
            SafeDelete(p);
         }
         fSessions->Delete();
      }
   }

   return;
}
Bool_t TXProofMgr::MatchUrl(const char *url)
{
   // Check whether 'url' addresses the server this manager is connected to.
   // Unspecified protocol and port in 'url' are replaced the same way the
   // constructor does it ("proof", 'proofd' service port or 1093).
   // A match requires: same fully qualified host, port equal to the one of
   // fUrl or of the socket, and user either unspecified or equal.
   // Returns kFALSE also when the manager is invalid.

   if (!IsValid()) {
      Warning("MatchUrl","invalid TXProofMgr - do nothing");
      return kFALSE;
   }

   TUrl u(url);
   TUrl refUrl("a");

   // Fill in protocol and port when the caller left them out
   if (strcmp(u.GetProtocol(), refUrl.GetProtocol()) == 0)
      u.SetProtocol("proof");
   if (u.GetPort() == refUrl.GetPort()) {
      Int_t port = gSystem->GetServiceByName("proofd");
      u.SetPort((port < 0) ? 1093 : port);
   }

   // Host must be the same
   if (strcmp(u.GetHostFQDN(), fUrl.GetHost()) != 0)
      return kFALSE;

   // Port must be the one requested or the one in use by the socket
   if (u.GetPort() != fUrl.GetPort() && u.GetPort() != fSocket->GetPort())
      return kFALSE;

   // A user, if specified, must be the same
   if (strlen(u.GetUser()) > 0 && strcmp(u.GetUser(), fUrl.GetUser()) != 0)
      return kFALSE;

   // Match
   return kTRUE;
}
void TXProofMgr::ShowWorkers()
{
   
   if (!IsValid()) {
      Warning("ShowWorkers","invalid TXProofMgr - do nothing");
      return;
   }
   
   TObjString *os = fSocket->SendCoordinator(TXSocket::kQueryWorkers);
   if (os) {
      TObjArray *oa = TString(os->GetName()).Tokenize(TString("&"));
      if (oa) {
         TIter nxos(oa);
         TObjString *to = 0;
         while ((to = (TObjString *) nxos()))
            
            Printf("+  %s", to->GetName());
      }
   }
}
TList *TXProofMgr::QuerySessions(Option_t *opt)
{
   // Get the list of sessions known to the coordinator.
   //   opt starting with 'L' or 'l': return the list as currently cached,
   //                                 without contacting the server.
   //   opt starting with 'S' or 's': additionally print each session.
   // The cached list fSessions is updated: new sessions are added, known
   // ones are refreshed, and sessions no longer reported are removed.
   // Returns fSessions (a member of this object), or 0 if the manager is
   // invalid.

   if (opt && !strncasecmp(opt,"L",1))
      // Just return the existing list
      return fSessions;

   if (!IsValid()) {
      Warning("QuerySessions","invalid TXProofMgr - do nothing");
      return 0;
   }

   // Create the list, if not yet done
   if (!fSessions) {
      fSessions = new TList();
      fSessions->SetOwner();
   }

   // Tags of the sessions reported by this query; used below to spot
   // cached sessions that are gone
   TList *ocl = new TList;
   TObjString *os = fSocket->SendCoordinator(TXSocket::kQuerySessions);
   if (os) {
      TObjArray *oa = TString(os->GetName()).Tokenize(TString("|"));
      if (oa) {
         TProofDesc *d = 0;
         TIter nxos(oa);
         // The first token is not a session entry: skip it
         TObjString *to = (TObjString *) nxos();
         while ((to = (TObjString *) nxos())) {
            // Each entry reads: <id> <tag> <alias> <status> <int>.
            // The string fields are bounded to the buffer size to avoid
            // overflowing on oversized input
            char al[256] = {0};
            char tg[256] = {0};
            Int_t id = -1, st = -1, nc = 0;
            Int_t nf = sscanf(to->GetName(),"%d %255s %255s %d %d",
                                            &id, tg, al, &st, &nc);
            if (nf < 2) {
               // Without a tag the entry cannot be identified
               Warning("QuerySessions",
                       "malformed session entry '%s': ignored", to->GetName());
               continue;
            }
            // Add to the list, if not already there
            if (!(d = (TProofDesc *) fSessions->FindObject(tg))) {
               Int_t locid = fSessions->GetSize() + 1;
               d = new TProofDesc(tg, al, GetUrl(), locid, id, st, 0);
               fSessions->Add(d);
            } else {
               // Set missing / update info
               d->SetStatus(st);
               d->SetRemoteId(id);
               d->SetTitle(al);
            }
            // Record the tag as seen
            ocl->Add(new TObjString(tg));
         }
         SafeDelete(oa);
      }
      SafeDelete(os);
   }

   // Drop the cached sessions that were not reported this time
   if (fSessions->GetSize() > 0) {
      TIter nxd(fSessions);
      TProofDesc *d = 0;
      while ((d = (TProofDesc *)nxd())) {
         if (ocl->FindObject(d->GetName())) {
            if (opt && !strncasecmp(opt,"S",1))
               d->Print("");
         } else {
            fSessions->Remove(d);
            SafeDelete(d);
         }
      }
   }

   // Release the scratch list together with the tag strings it holds
   ocl->Delete();
   SafeDelete(ocl);

   // We are done
   return fSessions;
}
Bool_t TXProofMgr::HandleError(const void *)
{
   // Error handler: interrupt the current monitor of every attached
   // session. The argument is not used. Always returns kTRUE.

   Printf("TXProofMgr::HandleError: %p: got called ...", this);

   // Walk the sessions and interrupt those with a TProof attached
   if (fSessions && fSessions->GetSize() > 0) {
      TIter next(fSessions);
      for (TProofDesc *desc = (TProofDesc *)next(); desc;
           desc = (TProofDesc *)next()) {
         TProof *proof = (TProof *) desc->GetProof();
         if (!proof)
            continue;
         proof->InterruptCurrentMonitor();
      }
   }

   if (gDebug > 0)
      Printf("TXProofMgr::HandleError: %p: DONE ... ", this);

   // We are done
   return kTRUE;
}
Int_t TXProofMgr::Reset(const char *usr)
{
   // Send a session cleanup request (TXSocket::kCleanupSessions) to the
   // coordinator, passing 'usr' along.
   // Returns 0 once the request has been sent, -1 if the manager is
   // invalid.

   if (IsValid()) {
      fSocket->SendCoordinator(TXSocket::kCleanupSessions, usr);
      return 0;
   }

   Warning("Reset","invalid TXProofMgr - do nothing");
   return -1;
}
TProofLog *TXProofMgr::GetSessionLogs(Int_t isess,
                                      const char *stag, const char *pattern)
{
   // Get the logs of a session.
   // 'isess' is sent to the coordinator as a non-positive number (a
   // positive value is negated), together with the tag 'stag'.
   // The reply is a '|'-separated string: session tag, pool URL, then one
   // "<ordinal> <url>" entry per node. Each entry is added to a new
   // TProofLog, which is then retrieved: with TProofLog::kGrep and
   // 'pattern' when 'pattern' is non-empty, with the defaults otherwise.
   // Returns the TProofLog, or 0 if the manager is invalid, the
   // coordinator gave no reply or the reply could not be parsed.

   if (!IsValid()) {
      Warning("GetSessionLogs","invalid TXProofMgr - do nothing");
      return 0;
   }

   TProofLog *pl = 0;

   // The coordinator wants a non-positive session index
   isess = (isess > 0) ? -isess : isess;

   // Get the list of paths
   TObjString *os = fSocket->SendCoordinator(TXSocket::kQueryLogPaths, stag, isess);

   if (os) {
      TString rs(os->GetName());
      Ssiz_t from = 0;

      // The session tag
      TString tag;
      if (!rs.Tokenize(tag, from, "|")) {
         Warning("GetSessionLogs", "Session tag undefined: corruption?\n"
                                   " (received string: %s)", os->GetName());
         // The reply is ours: do not leak it on the error path
         SafeDelete(os);
         return (TProofLog *)0;
      }

      // The pool URL
      TString purl;
      if (!rs.Tokenize(purl, from, "|")) {
         Warning("GetSessionLogs", "Pool URL undefined: corruption?\n"
                                   " (received string: %s)", os->GetName());
         SafeDelete(os);
         return (TProofLog *)0;
      }

      // Create the instance now
      pl = new TProofLog(tag, purl, this);

      // Per-node entries
      TString to;
      while (rs.Tokenize(to, from, "|")) {
         if (!to.IsNull()) {
            // Strip() does not modify the string it is called on: it
            // returns the stripped sub-string, which we must keep
            TString ord(to.Strip(TString::kLeading, ' '));
            TString url(ord);
            // Ordinal: everything before the first blank;
            // URL: everything after it
            Int_t ii = 0;
            if ((ii = ord.Index(" ")) != kNPOS)
               ord.Remove(ii);
            if ((ii = url.Index(" ")) != kNPOS)
               url.Remove(0, ii + 1);
            // Add to the list
            pl->Add(ord, url);
            // Notify
            if (gDebug > 1)
               Info("GetSessionLogs", "ord: %s, url: %s", ord.Data(), url.Data());
         }
      }

      // Cleanup
      SafeDelete(os);

      // Retrieve the log contents
      if (pattern && strlen(pattern) > 0)
         pl->Retrieve("*", TProofLog::kGrep, 0, pattern);
      else
         pl->Retrieve();
   }

   // Done
   return pl;
}
TObjString *TXProofMgr::ReadBuffer(const char *fin, Long64_t ofs, Int_t len)
{
   // Ask the coordinator (TXSocket::kReadBuffer request) for 'len' bytes
   // of file 'fin' at offset 'ofs'.
   // Returns what the coordinator call returns, or 0 if the manager is
   // invalid.

   if (IsValid()) {
      // Send the request
      return fSocket->SendCoordinator(TXSocket::kReadBuffer, fin, len, ofs, 0);
   }

   Warning("ReadBuffer","invalid TXProofMgr - do nothing");
   return (TObjString *)0;
}
TObjString *TXProofMgr::ReadBuffer(const char *fin, const char *pattern)
{
   // Ask the coordinator (TXSocket::kReadBuffer request) for the content
   // of file 'fin' selected by 'pattern'.
   // If 'pattern' starts with "-v " that prefix is dropped and the request
   // is sent with option 2 instead of 1.
   // The message sent is 'fin' immediately followed by the pattern; the
   // length of the pattern is passed as third argument.
   // Returns what the coordinator call returns, or 0 if the manager is
   // invalid or one of the arguments is null.

   if (!IsValid()) {
      Warning("ReadBuffer","invalid TXProofMgr - do nothing");
      return (TObjString *)0;
   }

   // Both strings are dereferenced below
   if (!fin || !pattern) {
      Error("ReadBuffer","file path or pattern undefined - do nothing");
      return (TObjString *)0;
   }

   // Check the options
   Int_t k = 0;
   Int_t opt = 1;
   if (!strncmp(pattern,"-v ",3)) {
      opt = 2;
      k = 3;
   }

   // Build the message in a TString, so that the storage is released on
   // return instead of being leaked
   Int_t plen = strlen(pattern);
   TString buf(fin);
   buf += (pattern + k);

   // Send the request
   return fSocket->SendCoordinator(TXSocket::kReadBuffer, buf.Data(), plen - k, 0, opt);
}
void TXProofMgr::ShowROOTVersions()
{
   
   
   if (!IsValid()) {
      Warning("ShowROOTVersions","invalid TXProofMgr - do nothing");
      return;
   }
   
   TObjString *os = fSocket->SendCoordinator(TXSocket::kQueryROOTVersions);
   if (os) {
      
      Printf("----------------------------------------------------------\n");
      Printf("Available versions (tag ROOT-vers remote-path PROOF-version):\n");
      Printf("%s", os->GetName());
      Printf("----------------------------------------------------------");
      SafeDelete(os);
   }
   
   return;
}
void TXProofMgr::SetROOTVersion(const char *tag)
{
   // Send a TXSocket::kROOTVersion request carrying 'tag' to the
   // coordinator. Nothing is done if the manager is invalid.

   if (IsValid()) {
      // Send the request
      fSocket->SendCoordinator(TXSocket::kROOTVersion, tag);
      return;
   }

   Warning("SetROOTVersion","invalid TXProofMgr - do nothing");
}
Int_t TXProofMgr::SendMsgToUsers(const char *msg, const char *usr)
{
   // Send a message to the connected users via the coordinator
   // (TXSocket::kSendMsgToUser request).
   // 'msg' is either the path of an existing file, in which case the file
   // content is sent, or the text of the message itself.
   // If 'usr' is defined, non-empty and different from "*", the message is
   // prefixed by "u:<usr> ".
   // The whole message, prefix included, is limited to 32767 bytes: longer
   // texts or files are truncated with a warning.
   // Returns 0 once the request has been sent, -1 if the manager is
   // invalid, the message is missing, the user name does not fit in the
   // buffer or the file cannot be opened or sized.

   Int_t rc = 0;

   // The socket is used below: check it like all other methods do
   if (!IsValid()) {
      Warning("SendMsgToUsers","invalid TXProofMgr - do nothing");
      return -1;
   }

   // Check input
   if (!msg || strlen(msg) <= 0) {
      Error("SendMsgToUsers","no message to send - do nothing");
      return -1;
   }

   // Buffer (the last byte is reserved for the terminator)
   const Int_t kMAXBUF = 32768;
   char buf[kMAXBUF] = {0};
   char *p = &buf[0];
   Int_t space = kMAXBUF - 1;
   Int_t len = 0;    // Number of payload bytes written after the prefix
   Int_t lusr = 0;   // Length of the "u:<usr> " prefix

   // A specific user?
   if (usr && strlen(usr) > 0 && (strlen(usr) != 1 || usr[0] != '*')) {
      size_t ulen = strlen(usr);
      // The prefix is written by hand, after checking that it fits
      if (ulen + 3 > (size_t) space) {
         Error("SendMsgToUsers",
               "user name too long: max prefix size is %d bytes", space);
         return -1;
      }
      lusr = (Int_t) ulen + 3;
      memcpy(p, "u:", 2);
      memcpy(p + 2, usr, ulen);
      p[ulen + 2] = ' ';
      p += lusr;
      space -= lusr;
   }

   // Is 'msg' a file?
   if (!gSystem->AccessPathName(msg, kFileExists)) {
      // It must be readable
      if (gSystem->AccessPathName(msg, kReadPermission)) {
         Error("SendMsgToUsers","request to read message from unreadable file '%s'", msg);
         return -1;
      }
      // Open the file
      FILE *f = 0;
      if (!(f = fopen(msg, "r"))) {
         Error("SendMsgToUsers", "file '%s' cannot be open", msg);
         return -1;
      }
      Int_t fd = fileno(f);
      // Determine the number of bytes to be read from the file
      Int_t left = (Int_t) lseek(fd, (off_t) 0, SEEK_END);
      lseek(fd, (off_t) 0, SEEK_SET);
      if (left < 0) {
         SysError("SendMsgToUsers", "cannot determine the size of the file");
         fclose(f);
         return -1;
      }
      // Never read more than what fits in the buffer
      if (left > space) {
         Warning("SendMsgToUsers",
                 "requested to send %d bytes: max size is %d bytes: truncating", left, space);
         left = space;
      }
      // Read until done, end-of-file or error; interrupted reads are retried
      while (left > 0) {
         Int_t nr = 0;
         while ((nr = read(fd, p, left)) < 0 &&
                  TSystem::GetErrno() == EINTR)
            TSystem::ResetErrno();
         if (nr < 0) {
            SysError("SendMsgToUsers", "error reading file");
            break;
         }
         if (nr == 0)
            break;
         // Update counters
         left -= nr;
         p += nr;
         len += nr;
      }
      // The file is not needed any longer
      fclose(f);
   } else {
      // Add the message to the buffer
      len = strlen(msg);
      if (len > space) {
         Warning("SendMsgToUsers",
                 "requested to send %d bytes: max size is %d bytes: truncating", len, space);
         len = space;
      }
      memcpy(p, msg, len);
   }

   // Null-terminate right after the payload
   buf[len + lusr] = 0;

   // Send the request
   fSocket->SendCoordinator(TXSocket::kSendMsgToUser, buf);

   return rc;
}
// Last update: Thu Jan 17 09:05:33 2008
// This page has been automatically generated. If you have any comments or suggestions about the page layout, send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.