#include "Bytes.h"
#include "TError.h"
#include "TEnv.h"
#include "TFile.h"
#include "TFileCollection.h"
#include "TFileInfo.h"
#include "TList.h"
#include "TParameter.h"
#include "TProof.h"
#include "TProofMgr.h"
#include "TProofMgrLite.h"
#include "TSocket.h"
#include "TROOT.h"
#include "TMath.h"
ClassImp(TProofMgr)
TList TProofMgr::fgListOfManagers;
TProofMgr_t TProofMgr::fgTXProofMgrHook = 0;
typedef struct {
int first;
int second;
int third;
int fourth;
int fifth;
} clnt_HS_t;
typedef struct {
int msglen;
int protover;
int msgval;
} srv_HS_t;
TProofMgr::TProofMgr(const char *url, Int_t, const char *alias)
: TNamed("",""), fRemoteProtocol(-1), fServType(kXProofd),
fSessions(0), fIntHandler(0)
{
fServType = kProofd;
fUrl = (!url || strlen(url) <= 0) ? TUrl("proof://localhost") : TUrl(url);
if (!strcmp(fUrl.GetProtocol(), TUrl("a").GetProtocol()))
fUrl.SetProtocol("proof");
if (fUrl.GetPort() == TUrl("a").GetPort()) {
Int_t port = gSystem->GetServiceByName("proofd");
if (port < 0) {
if (gDebug > 0)
Info("TProofMgr","service 'proofd' not found by GetServiceByName"
": using default IANA assigned tcp port 1093");
port = 1093;
} else {
if (gDebug > 1)
Info("TProofMgr","port from GetServiceByName: %d", port);
}
fUrl.SetPort(port);
}
if (strcmp(fUrl.GetHost(), "__lite__")) {
if (strcmp(fUrl.GetHost(), fUrl.GetHostFQDN()))
fUrl.SetHost(fUrl.GetHostFQDN());
}
SetName(fUrl.GetUrl(kTRUE));
if (alias)
SetAlias(alias);
else
SetAlias(fUrl.GetHost());
}
TProofMgr::~TProofMgr()
{
SafeDelete(fSessions);
SafeDelete(fIntHandler);
fgListOfManagers.Remove(this);
gROOT->GetListOfProofs()->Remove(this);
}
TProof *TProofMgr::AttachSession(Int_t id, Bool_t gui)
{
TProofDesc *d = GetProofDesc(id);
if (d)
return AttachSession(d, gui);
Info("AttachSession","invalid proofserv id (%d)", id);
return 0;
}
TProof *TProofMgr::AttachSession(TProofDesc *d, Bool_t)
{
if (!d) {
Warning("AttachSession","invalid description object - do nothing");
return 0;
}
if (d->GetProof())
return d->GetProof();
Warning("AttachSession","session not available - do nothing");
return 0;
}
void TProofMgr::DetachSession(Int_t id, Option_t *opt)
{
if (!IsValid()) {
Warning("DetachSession","invalid TProofMgr - do nothing");
return;
}
if (id > 0) {
TProofDesc *d = GetProofDesc(id);
if (d) {
if (d->GetProof())
d->GetProof()->Detach(opt);
TProof *p = d->GetProof();
fSessions->Remove(d);
SafeDelete(p);
delete d;
}
} else if (id == 0) {
if (fSessions) {
TIter nxd(fSessions);
TProofDesc *d = 0;
while ((d = (TProofDesc *)nxd())) {
if (d->GetProof())
d->GetProof()->Detach(opt);
TProof *p = d->GetProof();
fSessions->Remove(d);
SafeDelete(p);
}
fSessions->Delete();
}
}
return;
}
void TProofMgr::DetachSession(TProof *p, Option_t *opt)
{
if (!IsValid()) {
Warning("DetachSession","invalid TProofMgr - do nothing");
return;
}
if (p) {
TProofDesc *d = GetProofDesc(p);
if (d) {
if (d->GetProof())
d->GetProof()->Detach(opt);
fSessions->Remove(d);
delete d;
}
}
return;
}
TList *TProofMgr::QuerySessions(Option_t *opt)
{
if (opt && !strncasecmp(opt,"L",1))
return fSessions;
if (!fSessions) {
fSessions = new TList();
fSessions->SetOwner();
}
if (gROOT->GetListOfProofs()) {
TIter nxp(gROOT->GetListOfProofs());
TObject *o = 0;
TProof *p = 0;
Int_t ns = 0;
while ((o = nxp())) {
if (o->InheritsFrom(TProof::Class())) {
p = (TProof *)o;
if (MatchUrl(p->GetUrl())) {
if (!(fSessions->FindObject(p->GetSessionTag()))) {
Int_t st = (p->IsIdle()) ? TProofDesc::kIdle
: TProofDesc::kRunning;
TProofDesc *d =
new TProofDesc(p->GetName(), p->GetTitle(), p->GetUrl(),
++ns, p->GetSessionID(), st, p);
fSessions->Add(d);
}
}
}
}
}
if (fSessions->GetSize() > 0) {
TIter nxd(fSessions);
TProofDesc *d = 0;
while ((d = (TProofDesc *)nxd())) {
if (d->GetProof()) {
if (!(gROOT->GetListOfProofs()->FindObject(d->GetProof()))) {
fSessions->Remove(d);
SafeDelete(d);
} else {
if (opt && !strncasecmp(opt,"S",1))
d->Print("");
}
}
}
}
return fSessions;
}
Int_t TProofMgr::SendMsgToUsers(const char *, const char *)
{
Warning("SendMsgToUsers","functionality not supported");
return -1;
}
Int_t TProofMgr::Reset(Bool_t, const char *)
{
Warning("Reset","functionality not supported");
return -1;
}
void TProofMgr::ShowWorkers()
{
AbstractMethod("ShowWorkers");
}
TProofDesc *TProofMgr::GetProofDesc(Int_t id)
{
TProofDesc *d = 0;
if (id > 0) {
QuerySessions("");
if (fSessions) {
TIter nxd(fSessions);
while ((d = (TProofDesc *)nxd())) {
if (d->MatchId(id))
return d;
}
}
}
return d;
}
TProofDesc *TProofMgr::GetProofDesc(TProof *p)
{
TProofDesc *d = 0;
if (p) {
QuerySessions("");
if (fSessions) {
TIter nxd(fSessions);
while ((d = (TProofDesc *)nxd())) {
if (p == d->GetProof())
return d;
}
}
}
return d;
}
void TProofMgr::DiscardSession(TProof *p)
{
if (p) {
TProofDesc *d = 0;
if (fSessions) {
TIter nxd(fSessions);
while ((d = (TProofDesc *)nxd())) {
if (p == d->GetProof()) {
fSessions->Remove(d);
delete d;
break;
}
}
}
}
}
TProof *TProofMgr::CreateSession(const char *cfg,
const char *cfgdir, Int_t loglevel)
{
if (IsProofd())
fUrl.SetOptions("std");
TProof *p = new TProof(fUrl.GetUrl(), cfg, cfgdir, loglevel, 0, this);
if (p && p->IsValid()) {
Int_t ns = 1;
if (fSessions) {
if (fSessions->Last())
ns = ((TProofDesc *)(fSessions->Last()))->GetLocalId() + 1;
} else {
fSessions = new TList;
}
Int_t st = (p->IsIdle()) ? TProofDesc::kIdle : TProofDesc::kRunning ;
TProofDesc *d =
new TProofDesc(p->GetName(), p->GetTitle(), p->GetUrl(),
ns, p->GetSessionID(), st, p);
fSessions->Add(d);
} else {
if (gDebug > 0) Error("CreateSession", "PROOF session creation failed");
SafeDelete(p);
}
return p;
}
Bool_t TProofMgr::MatchUrl(const char *url)
{
TUrl u(url);
if (!strcmp(u.GetProtocol(), TUrl("a").GetProtocol()))
u.SetProtocol("proof");
if (u.GetPort() == TUrl("a").GetPort()) {
Int_t port = gSystem->GetServiceByName("proofd");
if (port < 0)
port = 1093;
u.SetPort(port);
}
if (!strcmp(u.GetHostFQDN(), fUrl.GetHostFQDN()))
if (u.GetPort() == fUrl.GetPort())
if (strlen(u.GetUser()) <= 0 || !strcmp(u.GetUser(),fUrl.GetUser()))
return kTRUE;
return kFALSE;
}
TList *TProofMgr::GetListOfManagers()
{
if (gROOT->GetListOfProofs()) {
TIter nxp(gROOT->GetListOfProofs());
TObject *o = 0;
while ((o = nxp())) {
if (o->InheritsFrom(TProofMgr::Class()) && !fgListOfManagers.FindObject(o))
fgListOfManagers.Add(o);
}
}
if (fgListOfManagers.GetSize() > 0) {
TIter nxp(&fgListOfManagers);
TObject *o = 0;
Int_t nm = 0;
while ((o = nxp())) {
if (!(gROOT->GetListOfProofs()->FindObject(o))) {
fgListOfManagers.Remove(o);
} else {
TProofMgr *p = (TProofMgr *)o;
if (gDebug > 0)
Printf("// #%d: \"%s\" (%s)", ++nm, p->GetName(), p->GetTitle());
}
}
} else {
if (gDebug > 0)
Printf("No managers found");
}
return &fgListOfManagers;
}
TProofMgr *TProofMgr::Create(const char *uin, Int_t loglevel,
const char *alias, Bool_t xpd)
{
TProofMgr *m= 0;
Bool_t isLite = kFALSE;
TUrl u(uin);
TString proto = u.GetProtocol();
if (proto.IsNull()) {
u.SetUrl(gEnv->GetValue("Proof.LocalDefault", "lite://"));
proto = u.GetProtocol();
}
TString host = u.GetHost();
if (proto == "lite" || host == "__lite__" ) {
#ifndef WIN32
isLite = kTRUE;
u.SetHost("__lite__");
u.SetProtocol("proof");
u.SetPort(1093);
#else
::Info("TProofMgr::Create","'lite' not yet supported on Windows");
return m;
#endif
}
if (!isLite) {
if (!strcmp(u.GetProtocol(), TUrl("a").GetProtocol()))
u.SetProtocol("proof");
if (u.GetPort() == TUrl("a").GetPort())
u.SetPort(1093);
}
const char *url = u.GetUrl();
TList *lm = TProofMgr::GetListOfManagers();
if (lm) {
TIter nxm(lm);
while ((m = (TProofMgr *)nxm())) {
if (m->IsValid()) {
if (m->MatchUrl(url)) return m;
} else {
fgListOfManagers.Remove(m);
SafeDelete(m);
break;
}
}
}
if (isLite) {
return new TProofMgrLite(url, loglevel, alias);
}
m = 0;
Bool_t trystd = kTRUE;
if (xpd) {
TProofMgr_t cm = TProofMgr::GetXProofMgrHook();
if (cm) {
m = (TProofMgr *) (*cm)(url, loglevel, alias);
trystd = (m && !(m->IsValid()) && m->IsProofd()) ? kTRUE : kFALSE;
}
}
if (trystd) {
SafeDelete(m);
m = new TProofMgr(url, loglevel, alias);
}
if (m) {
fgListOfManagers.Add(m);
if (m->IsValid() && !(m->IsProofd())) {
R__LOCKGUARD2(gROOTMutex);
gROOT->GetListOfProofs()->Add(m);
gROOT->GetListOfSockets()->Add(m);
}
}
return m;
}
TProofMgr_t TProofMgr::GetXProofMgrHook()
{
if (!fgTXProofMgrHook) {
TString prooflib = "libProofx";
char *p = 0;
if ((p = gSystem->DynamicPathName(prooflib, kTRUE))) {
delete[] p;
if (gSystem->Load(prooflib) == -1)
::Error("TProofMgr::GetXProofMgrCtor",
"can't load %s", prooflib.Data());
} else
::Error("TProofMgr::GetXProofMgrCtor",
"can't locate %s", prooflib.Data());
}
return fgTXProofMgrHook;
}
void TProofMgr::SetTXProofMgrHook(TProofMgr_t pmh)
{
fgTXProofMgrHook = pmh;
}
Int_t TProofMgr::Ping(const char *url, Bool_t checkxrd)
{
if (!url || (url && strlen(url) <= 0)) {
::Error("TProofMgr::Ping", "empty url - fail");
return -1;
}
TUrl u(url);
if (!strcmp(u.GetProtocol(), "http") && u.GetPort() == 80) {
if (!checkxrd) {
u.SetPort(1093);
} else {
u.SetPort(1094);
}
}
Int_t oldLevel = gErrorIgnoreLevel;
gErrorIgnoreLevel = kSysError+1;
TSocket s(u.GetHost(), u.GetPort());
if (!(s.IsValid())) {
if (gDebug > 0)
::Info("TProofMgr::Ping", "could not open connection to %s:%d", u.GetHost(), u.GetPort());
gErrorIgnoreLevel = oldLevel;
return -1;
}
int writeCount = -1;
clnt_HS_t initHS;
memset(&initHS, 0, sizeof(initHS));
int len = sizeof(initHS);
if (checkxrd) {
initHS.fourth = (int)host2net((int)4);
initHS.fifth = (int)host2net((int)2012);
if ((writeCount = s.SendRaw(&initHS, len)) != len) {
if (gDebug > 0)
::Info("TProofMgr::Ping", "1st: wrong number of bytes sent: %d (expected: %d)",
writeCount, len);
gErrorIgnoreLevel = oldLevel;
return 1;
}
} else {
initHS.third = (int)host2net((int)1);
if ((writeCount = s.SendRaw(&initHS, len)) != len) {
if (gDebug > 0)
::Info("TProofMgr::Ping", "1st: wrong number of bytes sent: %d (expected: %d)",
writeCount, len);
gErrorIgnoreLevel = oldLevel;
return 1;
}
int dum[2];
dum[0] = (int)host2net((int)4);
dum[1] = (int)host2net((int)2012);
if ((writeCount = s.SendRaw(&dum[0], sizeof(dum))) != sizeof(dum)) {
if (gDebug > 0)
::Info("TProofMgr::Ping", "2nd: wrong number of bytes sent: %d (expected: %d)",
writeCount, (int) sizeof(dum));
gErrorIgnoreLevel = oldLevel;
return 1;
}
}
int type;
len = sizeof(type);
int readCount = s.RecvRaw(&type, len);
if (readCount != len) {
if (gDebug > 0)
::Info("TProofMgr::Ping", "1st: wrong number of bytes read: %d (expected: %d)",
readCount, len);
gErrorIgnoreLevel = oldLevel;
return 1;
}
type = net2host(type);
if (type == 0) {
srv_HS_t xbody;
len = sizeof(xbody);
readCount = s.RecvRaw(&xbody, len);
if (readCount != len) {
if (gDebug > 0)
::Info("TProofMgr::Ping", "2nd: wrong number of bytes read: %d (expected: %d)",
readCount, len);
gErrorIgnoreLevel = oldLevel;
return 1;
}
xbody.protover = net2host(xbody.protover);
xbody.msgval = net2host(xbody.msglen);
xbody.msglen = net2host(xbody.msgval);
} else if (type == 8) {
if (gDebug > 0) ::Info("TProofMgr::Ping", "server is old %s", (checkxrd ? "ROOTD" : "PROOFD"));
gErrorIgnoreLevel = oldLevel;
return 1;
} else {
if (gDebug > 0) ::Info("TProofMgr::Ping", "unknown server type: %d", type);
gErrorIgnoreLevel = oldLevel;
return 1;
}
gErrorIgnoreLevel = oldLevel;
return 0;
}
void TProofMgr::ReplaceSubdirs(const char *fn, TString &fdst, TList &dirph)
{
if (!fn || (fn && strlen(fn) <= 0)) return;
if (dirph.GetSize() <= 0) return;
TList dirs;
TString dd(fn), d;
Ssiz_t from = 0;
while (dd.Tokenize(d, from, "/")) {
if (!d.IsNull()) dirs.Add(new TObjString(d));
}
if (dirs.GetSize() <= 0) return;
dirs.SetOwner(kTRUE);
TIter nxph(&dirph);
TParameter<Int_t> *pi = 0;
while ((pi = (TParameter<Int_t> *) nxph())) {
if (pi->GetVal() < dirs.GetSize()) {
TObjString *os = (TObjString *) dirs.At(pi->GetVal());
if (os) fdst.ReplaceAll(pi->GetName(), os->GetName());
} else {
::Warning("TProofMgr::ReplaceSubdirs",
"requested directory level '%s' is not available in the file path",
pi->GetName());
}
}
}
TFileCollection *TProofMgr::UploadFiles(TList *src,
const char *mss, const char *dest)
{
TFileCollection *ds = 0;
if (!src || (src && src->GetSize() <= 0)) {
::Warning("TProofMgr::UploadFiles", "list is empty!");
return ds;
}
if (!mss || (mss && strlen(mss) <= 0)) {
::Warning("TProofMgr::UploadFiles", "MSS is undefined!");
return ds;
}
TList dirph;
if (dest && strlen(dest) > 0) {
TString dst(dest), dt;
Ssiz_t from = 0;
TRegexp re("<d+[0-9]>");
while (dst.Tokenize(dt, from, "/")) {
if (dt.Contains(re)) {
TParameter<Int_t> *pi = new TParameter<Int_t>(dt, -1);
dt.ReplaceAll("<d", "");
dt.ReplaceAll(">", "");
if (dt.IsDigit()) {
pi->SetVal(dt.Atoi());
dirph.Add(pi);
} else {
SafeDelete(pi);
}
}
}
dirph.SetOwner(kTRUE);
}
TString sForm = TString::Format("%%0%dd",
Int_t(TMath::Log10(src->GetEntries()+1)));
ds = new TFileCollection();
TIter nxf(src);
TObject *o = 0;
TObjString *os = 0;
TFileInfo *fi = 0;
Int_t kn = 0;
while ((o = nxf())) {
TUrl *furl = 0;
if (!strcmp(o->ClassName(), "TFileInfo")) {
if (!(fi = dynamic_cast<TFileInfo *>(o))) {
::Warning("TProofMgr::UploadFiles",
"object of class name '%s' does not cast to %s - ignore",
o->ClassName(), o->ClassName());
continue;
}
furl = fi->GetFirstUrl();
} else if (!strcmp(o->ClassName(), "TObjString")) {
if (!(os = dynamic_cast<TObjString *>(o))) {
::Warning("TProofMgr::UploadFiles",
"object of class name '%s' does not cast to %s - ignore",
o->ClassName(), o->ClassName());
continue;
}
furl = new TUrl(os->GetName());
} else {
::Warning("TProofMgr::UploadFiles",
"object of unsupported class '%s' found in list - ignore", o->ClassName());
continue;
}
if (gSystem->AccessPathName(furl->GetUrl()) == kFALSE) {
TString fdst(mss);
if (dest && strlen(dest) > 0) {
fdst += dest;
} else {
fdst += TString::Format("/%s", furl->GetFile());
}
if (fdst.Contains("<bn>")) fdst.ReplaceAll("<bn>", gSystem->BaseName(furl->GetFile()));
if (fdst.Contains("<fn>")) fdst.ReplaceAll("<fn>", furl->GetFile());
if (fdst.Contains("<bs>")) {
TString bs(gSystem->BaseName(furl->GetFile()));
Int_t idx = bs.Last('.');
if (idx != kNPOS) bs.Remove(idx);
fdst.ReplaceAll("<bs>", bs.Data());
}
if (fdst.Contains("<ex>")) {
TString ex(furl->GetFile());
Int_t idx = ex.Last('.');
if (idx != kNPOS) ex.Remove(0, idx+1);
else ex = "";
fdst.ReplaceAll("<ex>", ex);
}
if (fdst.Contains("<pa>")) {
fdst.ReplaceAll("<pa>",
gSystem->BaseName(gSystem
->DirName(furl->GetFile())));
}
if (fdst.Contains("<gp>")) {
fdst.ReplaceAll("<gp>",
gSystem->BaseName(gSystem
->DirName(gSystem
->DirName(furl->GetFile()))));
}
if (fdst.Contains("<sn>")) {
TString skn = TString::Format("%d", kn);
fdst.ReplaceAll("<sn>", skn);
}
if (fdst.Contains("<s0>")) {
TString skn = TString::Format(sForm.Data(), kn);
fdst.ReplaceAll("<s0>", skn);
}
kn++;
UserGroup_t *pw = gSystem->GetUserInfo();
if (pw) {
if (fdst.Contains("<us>")) fdst.ReplaceAll("<us>", pw->fUser);
if (fdst.Contains("<gr>")) fdst.ReplaceAll("<gr>", pw->fGroup);
delete pw;
}
if (gProof && fdst.Contains("<pg>"))
fdst.ReplaceAll("<pg>", gProof->GetGroup());
if (dirph.GetSize() > 0)
TProofMgr::ReplaceSubdirs(gSystem->DirName(furl->GetFile()), fdst, dirph);
TUrl u(fdst);
fdst = u.GetUrl();
::Info("TProofMgr::UploadFiles", "uploading '%s' to '%s'", furl->GetUrl(), fdst.Data());
if (TFile::Cp(furl->GetUrl(), fdst.Data())) {
ds->Add(new TFileInfo(fdst.Data()));
} else {
::Error("TProofMgr::UploadFiles", "file %s was not copied", furl->GetUrl());
}
}
}
return ds;
}
TFileCollection *TProofMgr::UploadFiles(const char *srcfiles,
const char *mss, const char *dest)
{
TFileCollection *ds = 0;
if (!srcfiles || (srcfiles && strlen(srcfiles) <= 0)) {
::Error("TProofMgr::UploadFiles", "input text file or directory undefined!");
return ds;
}
if (!mss || (mss && strlen(mss) <= 0)) {
::Error("TProofMgr::UploadFiles", "MSS is undefined!");
return ds;
}
TString inpath(gSystem->ExpandPathName(srcfiles));
FileStat_t fst;
if (gSystem->GetPathInfo(inpath.Data(), fst)) {
::Error("TProofMgr::UploadFiles",
"could not get information about the input path '%s':"
" make sure that it exists and is readable", srcfiles);
return ds;
}
TList files;
files.SetOwner();
TString line;
if (R_ISREG(fst.fMode)) {
std::ifstream f;
f.open(inpath.Data(), std::ifstream::out);
if (f.is_open()) {
while (f.good()) {
line.ReadToDelim(f);
line.Strip(TString::kTrailing, '\n');
if (line.BeginsWith("#")) continue;
if (gSystem->AccessPathName(line, kReadPermission) == kFALSE)
files.Add(new TFileInfo(line));
}
f.close();
} else {
::Error("TProofMgr::UploadFiles", "unable to open file '%s'", srcfiles);
}
} else if (R_ISDIR(fst.fMode)) {
void *dirp = gSystem->OpenDirectory(inpath.Data());
if (dirp) {
const char *ent = 0;
while ((ent = gSystem->GetDirEntry(dirp))) {
if (!strcmp(ent, ".") || !strcmp(ent, "..")) continue;
line.Form("%s/%s", inpath.Data(), ent);
if (gSystem->AccessPathName(line, kReadPermission) == kFALSE)
files.Add(new TFileInfo(line));
}
gSystem->FreeDirectory(dirp);
} else {
::Error("TProofMgr::UploadFiles", "unable to open directory '%s'", inpath.Data());
}
} else {
::Error("TProofMgr::UploadFiles",
"input path '%s' is neither a regular file nor a directory!", inpath.Data());
return ds;
}
if (files.GetSize() <= 0) {
::Warning("TProofMgr::UploadFiles", "no files found in file or directory '%s'", inpath.Data());
} else {
ds = TProofMgr::UploadFiles(&files, mss, dest);
}
return ds;
}
Int_t TProofMgr::Rm(const char *what, const char *, const char *)
{
Int_t rc = -1;
if (!IsValid()) {
Error("Rm", "invalid TProofMgr - do nothing");
return rc;
}
if (!what || (what && strlen(what) <= 0)) {
Error("Rm", "path undefined!");
return rc;
}
TUrl u(what);
if (!strcmp(u.GetProtocol(), "file")) {
rc = gSystem->Unlink(u.GetFile());
} else {
rc = gSystem->Unlink(what);
}
return (rc == 0) ? 0 : -1;
}
ClassImp(TProofDesc)
void TProofDesc::Print(Option_t *) const
{
const char *st[] = { "unknown", "idle", "processing", "shutting down"};
Printf("// # %d", fLocalId);
Printf("// alias: %s, url: \"%s\"", GetTitle(), GetUrl());
Printf("// tag: %s", GetName());
Printf("// status: %s, attached: %s (remote ID: %d)",st[fStatus+1], (fProof ? "YES" : "NO"), fRemoteId);
}