#include <errno.h>
#ifdef WIN32
#include <io.h>
#endif
#include "Getline.h"
#include "TList.h"
#include "TObjArray.h"
#include "TObjString.h"
#include "TProof.h"
#include "TProofLog.h"
#include "TXProofMgr.h"
#include "TXSocket.h"
#include "TXSocketHandler.h"
#include "TROOT.h"
#include "TStopwatch.h"
#include "TSysEvtHandler.h"
#include "XProofProtocol.h"
// ROOT ClassImp macro invocation for the TXProofMgr class.
ClassImp(TXProofMgr)
// Signal handler for kSigInterrupt that is bound to a TProofMgr.
// The reaction to the signal is implemented in Notify().
class TProofMgrInterruptHandler : public TSignalHandler {
public:
   // Bind the handler to 'mgr'; the pointer is stored, not owned.
   TProofMgrInterruptHandler(TProofMgr *mgr)
      : TSignalHandler(kSigInterrupt, kFALSE), fMgr(mgr) { }

   Bool_t Notify();

private:
   // Copy operations are declared private so that the handler cannot be
   // copied from outside the class.
   TProofMgrInterruptHandler(const TProofMgrInterruptHandler&);
   TProofMgrInterruptHandler& operator=(const TProofMgrInterruptHandler&);

   TProofMgr *fMgr;   // manager whose URL Notify() connects to
};
/// Interrupt notification: when both stdin and stdout are terminals, open a
/// fresh connection to the manager URL and call CtrlC() on it.
/// Always returns kTRUE.
Bool_t TProofMgrInterruptHandler::Notify()
{
   // Nothing to do unless we are attached to a terminal on both ends
   const Bool_t onTerminal = (isatty(0) != 0) && (isatty(1) != 0);
   if (!onTerminal)
      return kTRUE;

   TString url = fMgr->GetUrl();
   Printf("Opening new connection to %s", url.Data());

   // NOTE(review): this socket is never deleted in this function; whether it
   // is owned/released elsewhere cannot be told from here - confirm.
   TXSocket *conn = new TXSocket(url, 'C', kPROOF_Protocol,
                                 kXPROOF_Protocol, 0, -1, (TXHandler *)fMgr);
   if (conn && conn->IsValid())
      conn->CtrlC();

   return kTRUE;
}
/// Factory hook: create a TXProofMgr for 'url' (with debug level 'l' and
/// alias 'al') and hand it back through the TProofMgr base pointer.
TProofMgr *GetTXProofMgr(const char *url, Int_t l, const char *al)
{
   TXProofMgr *mgr = new TXProofMgr(url, l, al);
   return (TProofMgr *) mgr;
}
// Helper whose constructor registers GetTXProofMgr as the TXProofMgr hook of
// TProofMgr. The file-static instance below runs that registration when the
// object is constructed.
class TXProofMgrInit {
public:
   TXProofMgrInit()
   {
      TProofMgr::SetTXProofMgrHook(&GetTXProofMgr);
   }
};

static TXProofMgrInit gxproofmgr_init;
/// Build a manager for 'url'. The server type is preset to kXProofd and the
/// connection is opened through Init(); if Init() fails the socket is
/// released and reset to 0.
TXProofMgr::TXProofMgr(const char *url, Int_t dbg, const char *alias)
          : TProofMgr(url, dbg, alias)
{
   fServType = kXProofd;

   const Int_t initrc = Init(dbg);
   if (initrc != 0) {
      // Initialization failed: drop whatever socket Init() left behind
      SafeDelete(fSocket);
   }
}
/// Open the connection to the URL held in fUrl and complete the set-up.
/// The integer argument is not used.
/// Returns 0 on success and -1 if a valid socket could not be obtained; in
/// the failure case fServType is switched to kProofd when the socket reports
/// IsServProofd().
Int_t TXProofMgr::Init(Int_t)
{
   TString url = fUrl.GetUrl(kTRUE);

   fSocket = new TXSocket(url, 'C', kPROOF_Protocol,
                          kXPROOF_Protocol, 0, -1, this);

   const Bool_t connected = (fSocket && fSocket->IsValid());
   if (!connected) {
      const Bool_t isProofd = (fSocket && fSocket->IsServProofd());
      if (isProofd) {
         // The remote end identified itself as a proofd server
         fServType = TProofMgr::kProofd;
      } else if (gDebug > 0) {
         Error("Init", "while opening the connection to %s - exit (error: %d)",
                       url.Data(), (fSocket ? fSocket->GetOpenError() : -1));
      }
      return -1;
   }

   fRemoteProtocol = fSocket->GetRemoteProtocol();

   // Take the socket out of the global socket list, under the ROOT mutex
   {
      R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Remove(fSocket);
   }

   fIntHandler = new TProofMgrInterruptHandler(this);
   return 0;
}
/// Destructor: all the teardown visible in this file is done by SetInvalid().
TXProofMgr::~TXProofMgr()
{
   this->SetInvalid();
}
void TXProofMgr::SetInvalid()
{
if (fSocket)
fSocket->Close("P");
SafeDelete(fSocket);
{ R__LOCKGUARD2(gROOTMutex);
gROOT->GetListOfSockets()->Remove(this);
}
}
/// Attach to the session described by 'd'; with 'gui' set, "GUI" is appended
/// to the connection URL.
/// Returns 0 if the manager or the description is invalid, the already
/// attached TProof if the description has one, otherwise the newly created
/// TProof. Note that on attach failure the (invalid) new object is still
/// returned after the error message.
TProof *TXProofMgr::AttachSession(TProofDesc *d, Bool_t gui)
{
   if (!IsValid()) {
      Warning("AttachSession","invalid TXProofMgr - do nothing");
      return 0;
   }
   if (!d) {
      Warning("AttachSession","invalid description object - do nothing");
      return 0;
   }

   // Already attached? Reuse the existing instance
   TProof *attached = d->GetProof();
   if (attached)
      return attached;

   // Connection URL: manager URL plus the remote session id
   TString url(Form("%s/?%d", fUrl.GetUrl(kTRUE), d->GetRemoteId()));
   if (gui)
      url += "GUI";

   TProof *proof = new TProof(url, 0, 0, gDebug, 0, this);
   if (!proof || !proof->IsValid()) {
      Error("AttachSession", "attaching to PROOF session");
      return proof;
   }

   proof->SetManager(this);

   // Record the session state and the link to the new instance
   Int_t status = TProofDesc::kRunning;
   if (proof->IsIdle())
      status = TProofDesc::kIdle;
   d->SetStatus(status);
   d->SetProof(proof);

   proof->SetName(d->GetName());
   return proof;
}
/// Detach the session with local id 'id', or all known sessions when 'id'
/// is 0 (the option sent to the server is then 'opt' with "A" appended).
/// Negative ids are ignored. The attached TProof objects and the session
/// descriptions are deleted.
void TXProofMgr::DetachSession(Int_t id, Option_t *opt)
{
   if (!IsValid()) {
      Warning("DetachSession","invalid TXProofMgr - do nothing");
      return;
   }

   if (id == 0) {
      // Disconnect everything on the server side ...
      if (fSocket) {
         TString allopt = Form("%sA",opt);
         fSocket->DisconnectSession(-1, allopt);
      }
      // ... and drop all local proof instances and descriptions
      if (fSessions) {
         TIter next(fSessions);
         for (TProofDesc *desc = (TProofDesc *)next(); desc;
              desc = (TProofDesc *)next()) {
            TProof *proof = desc->GetProof();
            SafeDelete(proof);
         }
         fSessions->Delete();
      }
      return;
   }

   if (id < 0)
      return;

   // Single session
   TProofDesc *desc = GetProofDesc(id);
   if (!desc)
      return;

   if (fSocket)
      fSocket->DisconnectSession(desc->GetRemoteId(), opt);
   TProof *proof = desc->GetProof();
   fSessions->Remove(desc);
   SafeDelete(proof);
   delete desc;
}
/// Detach the session served by 'p': disconnect it on the server, remove and
/// delete its description, and close 'p' with option 'opt'. Nothing happens
/// if 'p' is null or has no known description.
void TXProofMgr::DetachSession(TProof *p, Option_t *opt)
{
   if (!IsValid()) {
      Warning("DetachSession","invalid TXProofMgr - do nothing");
      return;
   }
   if (!p)
      return;

   TProofDesc *desc = GetProofDesc(p);
   if (!desc)
      return;

   if (fSocket)
      fSocket->DisconnectSession(desc->GetRemoteId(), opt);
   fSessions->Remove(desc);
   // The proof instance is closed here, not deleted
   p->Close(opt);
   delete desc;
}
/// Tell whether 'url' designates this manager: same host, same port (either
/// the one in fUrl or the one of the socket) and, if a user is given in
/// 'url', the same user. Returns kFALSE when the manager is invalid.
Bool_t TXProofMgr::MatchUrl(const char *url)
{
   if (!IsValid()) {
      Warning("MatchUrl","invalid TXProofMgr - do nothing");
      return kFALSE;
   }

   TUrl u(url);

   // A URL built from "a" supplies the protocol/port TUrl uses when none is
   // given; if 'url' still carries those, replace them with PROOF's own
   if (!strcmp(u.GetProtocol(), TUrl("a").GetProtocol()))
      u.SetProtocol("proof");

   if (u.GetPort() == TUrl("a").GetPort()) {
      Int_t port = gSystem->GetServiceByName("proofd");
      if (port < 0)
         port = 1093;
      u.SetPort(port);
   }

   // Host must match ...
   if (strcmp(u.GetHostFQDN(), fUrl.GetHost()) != 0)
      return kFALSE;

   // ... as well as the port ...
   const Bool_t samePort = (u.GetPort() == fUrl.GetPort() ||
                            u.GetPort() == fSocket->GetPort());
   if (!samePort)
      return kFALSE;

   // ... and the user, when one is specified
   const Bool_t sameUser = (strlen(u.GetUser()) <= 0 ||
                            !strcmp(u.GetUser(),fUrl.GetUser()));
   return sameUser ? kTRUE : kFALSE;
}
void TXProofMgr::ShowWorkers()
{
if (!IsValid()) {
Warning("ShowWorkers","invalid TXProofMgr - do nothing");
return;
}
TObjString *os = fSocket->SendCoordinator(kQueryWorkers);
if (os) {
TObjArray *oa = TString(os->GetName()).Tokenize(TString("&"));
if (oa) {
TIter nxos(oa);
TObjString *to = 0;
while ((to = (TObjString *) nxos()))
Printf("+ %s", to->GetName());
}
}
}
/// Return the MSS URL. The value cached in fMssUrl is used unless it is
/// empty or 'retrieve' is set, in which case the coordinator is queried
/// (requires XrdProofd version >= 1007).
/// Returns 0 on any failure, including when the cached value exists but the
/// manager is no longer valid.
const char *TXProofMgr::GetMssUrl(Bool_t retrieve)
{
   const Bool_t mustQuery = (fMssUrl.IsNull() || retrieve);

   if (!mustQuery) {
      // Cached value available: still refuse to hand it out if invalid
      if (!IsValid()) {
         Warning("GetMssUrl", "TXProofMgr is now invalid: information may not be valid");
         return 0;
      }
      return fMssUrl.Data();
   }

   if (!IsValid()) {
      Error("GetMssUrl", "invalid TXProofMgr - do nothing");
      return 0;
   }
   if (fSocket->GetXrdProofdVersion() < 1007) {
      Error("GetMssUrl", "functionality not supported by server");
      return 0;
   }

   TObjString *reply = fSocket->SendCoordinator(kQueryMssUrl);
   if (!reply) {
      Error("GetMssUrl", "problems retrieving the required information");
      return 0;
   }

   Printf("os: '%s'", reply->GetName());
   fMssUrl = reply->GetName();
   delete reply;

   return fMssUrl.Data();
}
/// Get the list of sessions known to the coordinator.
///
/// If 'opt' starts with 'L' (case-insensitive) the locally cached list is
/// returned as is, without contacting the server. Otherwise the server is
/// queried, fSessions is updated (new entries added, existing ones refreshed,
/// entries no longer reported removed) and, if 'opt' starts with 'S', the
/// result is also printed.
/// Returns fSessions (owned by the manager), or 0 if the manager is invalid.
TList *TXProofMgr::QuerySessions(Option_t *opt)
{
   if (opt && !strncasecmp(opt,"L",1))
      // Just return the existing list
      return fSessions;

   if (!IsValid()) {
      Warning("QuerySessions","invalid TXProofMgr - do nothing");
      return 0;
   }

   if (!fSessions) {
      fSessions = new TList();
      fSessions->SetOwner();
   }

   // 'opt' may be null: evaluate the "show" flag once, null-safely
   const Bool_t show = (opt && !strncasecmp(opt,"S",1)) ? kTRUE : kFALSE;

   // Tags reported by the server in this round; the list owns its strings so
   // that everything is released when it goes out of scope
   TList ocl;
   ocl.SetOwner();

   TObjString *os = fSocket->SendCoordinator(kQuerySessions);
   if (os) {
      TObjArray *oa = TString(os->GetName()).Tokenize(TString("|"));
      if (oa) {
         TProofDesc *d = 0;
         TIter nxos(oa);
         // First token: number of sessions
         TObjString *to = (TObjString *) nxos();
         if (to && to->GetString().IsDigit() && show)
            Printf("// +++ %s session(s) currently active +++", to->GetName());
         // Remaining tokens: "<id> <tag> <alias> <status>"
         while ((to = (TObjString *) nxos())) {
            Int_t id = -1, st = -1;
            TString al, tg, tk;
            const TString entry(to->GetName());
            Ssiz_t from = 0;
            // Skip leading blanks without running past the end of the string
            while (from < entry.Length() && entry[from] == ' ') { from++; }
            if (!entry.Tokenize(tk, from, " ") || !tk.IsDigit()) continue;
            id = tk.Atoi();
            if (!entry.Tokenize(tg, from, " ")) continue;
            if (!entry.Tokenize(al, from, " ")) continue;
            if (!entry.Tokenize(tk, from, " ") || !tk.IsDigit()) continue;
            st = tk.Atoi();
            if (!(d = (TProofDesc *) fSessions->FindObject(tg))) {
               // New session: local id is the next free slot
               Int_t locid = fSessions->GetSize() + 1;
               d = new TProofDesc(tg, al, GetUrl(), locid, id, st, 0);
               fSessions->Add(d);
            } else {
               // Known session: refresh its attributes
               d->SetStatus(st);
               d->SetRemoteId(id);
               d->SetTitle(al);
            }
            ocl.Add(new TObjString(tg));
         }
         SafeDelete(oa);
      }
      SafeDelete(os);
   }

   // Drop the sessions the server did not report; print the others on request
   if (fSessions->GetSize() > 0) {
      TIter nxd(fSessions);
      TProofDesc *d = 0;
      while ((d = (TProofDesc *)nxd())) {
         if (ocl.FindObject(d->GetName())) {
            if (show)
               d->Print("");
         } else {
            fSessions->Remove(d);
            SafeDelete(d);
         }
      }
   }

   return fSessions;
}
/// Handle an asynchronous message arriving on the manager socket.
/// kPROOF_TOUCH triggers RemoteTouch() on the socket; any other type only
/// produces a warning. Always returns kTRUE.
Bool_t TXProofMgr::HandleInput(const void *)
{
   if (!fSocket || !fSocket->IsValid()) {
      Warning("HandleInput", "%p: got message but socket is invalid!", this);
      return kTRUE;
   }

   // Recv() may report success with no message attached: start from a null
   // pointer and check it before use
   TMessage *mess = 0;
   if (fSocket->Recv(mess) >= 0 && mess) {
      Int_t what = mess->What();
      if (gDebug > 0)
         Info("HandleInput", "%p: got message type: %d", this, what);
      switch (what) {
         case kPROOF_TOUCH:
            fSocket->RemoteTouch();
            break;
         default:
            Warning("HandleInput", "%p: got unknown message type: %d", this, what);
            break;
      }
      // The received message belongs to the caller
      delete mess;
   }

   return kTRUE;
}
/// Handle an error reported on the manager connection.
/// When the error descriptor carries fOpt == 1 a reconnection is attempted;
/// if that yields a valid socket kFALSE is returned. Otherwise the current
/// monitor of every attached PROOF session is interrupted and kTRUE is
/// returned.
Bool_t TXProofMgr::HandleError(const void *in)
{
   XHandleErr_t *herr = (XHandleErr_t *)in;

   const Bool_t tryReconnect = (fSocket && herr && (herr->fOpt == 1));
   if (tryReconnect) {
      fSocket->Reconnect();
      if (fSocket && fSocket->IsValid()) {
         if (gDebug > 0)
            Printf("ProofMgr: connection to coordinator at %s re-established",
                   fUrl.GetUrl());
         return kFALSE;
      }
   }

   Printf("TXProofMgr::HandleError: %p: got called ...", this);

   // Wake up whatever is waiting on the sessions we manage
   if (fSessions && fSessions->GetSize() > 0) {
      TIter next(fSessions);
      for (TProofDesc *desc = (TProofDesc *)next(); desc;
           desc = (TProofDesc *)next()) {
         TProof *proof = (TProof *) desc->GetProof();
         if (proof)
            proof->InterruptCurrentMonitor();
      }
   }

   if (gDebug > 0)
      Printf("TXProofMgr::HandleError: %p: DONE ... ", this);

   return kTRUE;
}
/// Send a session clean-up request for user 'usr' to the coordinator; 'hard'
/// is forwarded as 1/0. Returns -1 if the manager is invalid, 0 otherwise
/// (the outcome of the request itself is not checked).
Int_t TXProofMgr::Reset(Bool_t hard, const char *usr)
{
   if (!IsValid()) {
      Warning("Reset","invalid TXProofMgr - do nothing");
      return -1;
   }

   Int_t mode = 0;
   if (hard)
      mode = 1;
   fSocket->SendCoordinator(kCleanupSessions, usr, mode);

   return 0;
}
/// Build a TProofLog describing the log files of a session.
///
/// 'isess' selects the session (positive values are turned into negative
/// ones before being sent). 'stag' is the session tag; the special value "NR"
/// means "no tag, and do not retrieve the log contents". 'pattern' is the
/// grep pattern applied when retrieving (default: -v "| SvcMsg"); an empty
/// pattern retrieves everything. 'rescan' is forwarded to the server as 1/0.
///
/// The reply is expected as "<tag>|<pool url>|<ord> <url>|<ord> <url>|...".
/// Returns the new TProofLog (owned by the caller) or 0 on failure.
TProofLog *TXProofMgr::GetSessionLogs(Int_t isess, const char *stag,
                                      const char *pattern, Bool_t rescan)
{
   if (!IsValid()) {
      Warning("GetSessionLogs","invalid TXProofMgr - do nothing");
      return 0;
   }

   isess = (isess > 0) ? -isess : isess;

   Bool_t retrieve = kTRUE;
   TString sesstag(stag);
   if (sesstag == "NR") {
      retrieve = kFALSE;
      sesstag = "";
   }

   Int_t xrs = (rescan) ? 1 : 0;
   TObjString *os = fSocket->SendCoordinator(kQueryLogPaths, sesstag.Data(), isess, -1, xrs);
   if (!os)
      return (TProofLog *)0;

   // Keep a private copy of the reply and release the server object at once,
   // so that no exit path below can leak it
   TString rs(os->GetName());
   SafeDelete(os);

   Ssiz_t from = 0;
   TString tag;
   if (!rs.Tokenize(tag, from, "|")) {
      Warning("GetSessionLogs", "Session tag undefined: corruption?\n"
              " (received string: %s)", rs.Data());
      return (TProofLog *)0;
   }
   TString purl;
   if (!rs.Tokenize(purl, from, "|")) {
      Warning("GetSessionLogs", "Pool URL undefined: corruption?\n"
              " (received string: %s)", rs.Data());
      return (TProofLog *)0;
   }

   TProofLog *pl = new TProofLog(tag, GetUrl(), this);

   // One "<ordinal> <url>" pair per remaining token
   Int_t ii = 0;
   TString to;
   while (rs.Tokenize(to, from, "|")) {
      if (to.IsNull()) continue;
      TString ord(to);
      // TString::Strip() only returns a view and leaves the string untouched;
      // Remove() is what actually drops the leading blanks
      ord.Remove(TString::kLeading, ' ');
      TString url(ord);
      if ((ii = ord.Index(" ")) != kNPOS)
         ord.Remove(ii);
      if ((ii = url.Index(" ")) != kNPOS)
         url.Remove(0, ii + 1);
      // Tag valgrind logs so that they can be told apart
      if (url.Contains(".valgrind")) ord += "-valgrind";
      pl->Add(ord, url);
      if (gDebug > 1)
         Info("GetSessionLogs", "ord: %s, url: %s", ord.Data(), url.Data());
   }

   if (retrieve) {
      const char *pat = pattern ? pattern : "-v \"| SvcMsg\"";
      if (strlen(pat) > 0)
         pl->Retrieve("*", TProofLog::kGrep, 0, pat);
      else
         pl->Retrieve();
   }

   return pl;
}
/// Ask the coordinator for 'len' bytes of file 'fin' starting at offset
/// 'ofs'. Returns the server reply, or 0 if the manager is invalid.
TObjString *TXProofMgr::ReadBuffer(const char *fin, Long64_t ofs, Int_t len)
{
   if (IsValid())
      return fSocket->SendCoordinator(kReadBuffer, fin, len, ofs, 0);

   Warning("ReadBuffer","invalid TXProofMgr - do nothing");
   return (TObjString *)0;
}
/// Ask the coordinator for the lines of file 'fin' selected by 'pattern'.
/// A pattern starting with '|' is sent without that character and with
/// request type 3; any other pattern is sent as is with type 1. The message
/// is the file path immediately followed by the pattern, with the pattern
/// length passed separately.
/// Returns the server reply, or 0 if the manager is invalid or an argument
/// is null.
TObjString *TXProofMgr::ReadBuffer(const char *fin, const char *pattern)
{
   if (!IsValid()) {
      Warning("ReadBuffer", "invalid TXProofMgr - do nothing");
      return (TObjString *)0;
   }
   if (!fin || !pattern) {
      Error("ReadBuffer", "file path or pattern undefined");
      return (TObjString *)0;
   }

   const char *ptr = pattern;
   Int_t type = 1;
   if (*pattern == '|') {
      ptr = &pattern[1];
      type = 3;
   }

   // Let TString own the concatenated buffer so that it is always released
   const Int_t plen = strlen(ptr);
   TString buf(fin);
   buf += ptr;

   return fSocket->SendCoordinator(kReadBuffer, buf.Data(), plen, 0, type);
}
void TXProofMgr::ShowROOTVersions()
{
if (!IsValid()) {
Warning("ShowROOTVersions","invalid TXProofMgr - do nothing");
return;
}
TObjString *os = fSocket->SendCoordinator(kQueryROOTVersions);
if (os) {
Printf("----------------------------------------------------------\n");
Printf("Available versions (tag ROOT-vers remote-path PROOF-version):\n");
Printf("%s", os->GetName());
Printf("----------------------------------------------------------");
SafeDelete(os);
}
return;
}
/// Send 'tag' to the coordinator as the ROOT version to use.
/// Returns 0 if the socket reports no open error afterwards, -1 otherwise or
/// if the manager is invalid.
Int_t TXProofMgr::SetROOTVersion(const char *tag)
{
   if (!IsValid()) {
      Warning("SetROOTVersion","invalid TXProofMgr - do nothing");
      return -1;
   }

   // NOTE(review): the reply object, if any, is not looked at here
   fSocket->SendCoordinator(kROOTVersion, tag);

   if (fSocket->GetOpenError() != kXR_noErrorYet)
      return -1;
   return 0;
}
/// Send a message to the connected users.
///
/// 'msg' is either the text itself or the path of a readable file whose
/// content is the text. 'usr' optionally restricts the recipients: when it is
/// non-empty and not "*", the message is prefixed with "u:<usr> ".
/// The whole message (prefix included) is limited to 32767 bytes; longer
/// texts are truncated with a warning.
///
/// Returns 0 once the message has been handed to the socket, -1 if the
/// manager is invalid, the arguments are unusable, or the file cannot be
/// opened, sized or read.
Int_t TXProofMgr::SendMsgToUsers(const char *msg, const char *usr)
{
   if (!IsValid()) {
      Error("SendMsgToUsers","invalid TXProofMgr - do nothing");
      return -1;
   }
   if (!msg || strlen(msg) <= 0) {
      Error("SendMsgToUsers","no message to send - do nothing");
      return -1;
   }

   const Int_t kMAXBUF = 32768;
   char buf[kMAXBUF] = {0};
   size_t space = kMAXBUF - 1;   // payload room, terminator excluded
   size_t lusr = 0;              // length of the "u:<usr> " prefix

   // Optional recipient prefix
   if (usr && strlen(usr) > 0 && (strlen(usr) != 1 || usr[0] != '*')) {
      lusr = strlen(usr) + 3;
      if (lusr >= space) {
         // The prefix alone would fill (or overflow) the buffer
         Error("SendMsgToUsers", "user specification too long (%lld bytes)",
                                 (Long64_t)strlen(usr));
         return -1;
      }
      snprintf(buf, kMAXBUF, "u:%s ", usr);
      space -= lusr;
   }

   char *p = buf + lusr;   // start of the payload
   size_t len = 0;         // payload bytes actually stored

   if (!gSystem->AccessPathName(msg, kFileExists)) {
      // 'msg' is the path of an existing file: send its content
      if (gSystem->AccessPathName(msg, kReadPermission)) {
         Error("SendMsgToUsers","request to read message from unreadable file '%s'", msg);
         return -1;
      }
      FILE *f = fopen(msg, "r");
      if (!f) {
         Error("SendMsgToUsers", "file '%s' cannot be open", msg);
         return -1;
      }
      // Determine the file size, then rewind
      off_t rcsk = lseek(fileno(f), (off_t) 0, SEEK_END);
      if (rcsk == (off_t)(-1)) {
         Error("SendMsgToUsers", "cannot get size of open file (seek to END)");
         fclose(f);
         return -1;
      }
      if (lseek(fileno(f), (off_t) 0, SEEK_SET) == (off_t)(-1)) {
         Error("SendMsgToUsers", "cannot rewind open file (seek to 0)");
         fclose(f);
         return -1;
      }
      size_t wanted = (size_t) rcsk;
      if (wanted > space) {
         Warning("SendMsgToUsers",
                 "requested to send %lld bytes: max size is %lld bytes: truncating",
                 (Long64_t)wanted, (Long64_t)space);
         wanted = space;
      }
      // Read at most 'wanted' bytes in total: never write past the buffer
      while (len < wanted) {
         ssize_t nr = 0;
         while ((nr = read(fileno(f), p + len, wanted - len)) < 0 &&
                TSystem::GetErrno() == EINTR)
            TSystem::ResetErrno();
         if (nr < 0) {
            SysError("SendMsgToUsers", "error reading file");
            fclose(f);
            return -1;
         }
         if (nr == 0)
            break;   // end of file reached earlier than announced
         len += (size_t) nr;
      }
      fclose(f);
   } else {
      // 'msg' is the text itself
      len = strlen(msg);
      if (len > space) {
         Warning("SendMsgToUsers",
                 "requested to send %lld bytes: max size is %lld bytes: truncating",
                 (Long64_t)len, (Long64_t)space);
         len = space;
      }
      memcpy(p, msg, len);
   }

   // Terminate right after the payload actually stored
   p[len] = 0;

   fSocket->SendCoordinator(kSendMsgToUser, buf);

   return 0;
}
void TXProofMgr::Grep(const char *what, const char *how, const char *where)
{
if (!IsValid()) {
Error("Grep","invalid TXProofMgr - do nothing");
return;
}
if (fSocket->GetXrdProofdVersion() < 1006) {
Error("Grep", "functionality not supported by server");
return;
}
TObjString *os = Exec(kGrep, what, how, where);
if (os) Printf("%s", os->GetName());
SafeDelete(os);
}
void TXProofMgr::Find(const char *what, const char *how, const char *where)
{
if (!IsValid()) {
Error("Find","invalid TXProofMgr - do nothing");
return;
}
if (fSocket->GetXrdProofdVersion() < 1006) {
Error("Find", "functionality not supported by server (XrdProofd version: %d)",
fSocket->GetXrdProofdVersion());
return;
}
TObjString *os = Exec(kFind, what, how, where);
if (os) Printf("%s", os->GetName());
SafeDelete(os);
}
void TXProofMgr::Ls(const char *what, const char *how, const char *where)
{
if (!IsValid()) {
Error("Ls","invalid TXProofMgr - do nothing");
return;
}
if (fSocket->GetXrdProofdVersion() < 1006) {
Error("Ls", "functionality not supported by server");
return;
}
TObjString *os = Exec(kLs, what, how, where);
if (os) Printf("%s", os->GetName());
SafeDelete(os);
}
void TXProofMgr::More(const char *what, const char *how, const char *where)
{
if (!IsValid()) {
Error("More","invalid TXProofMgr - do nothing");
return;
}
if (fSocket->GetXrdProofdVersion() < 1006) {
Error("More", "functionality not supported by server");
return;
}
TObjString *os = Exec(kMore, what, how, where);
if (os) Printf("%s", os->GetName());
SafeDelete(os);
}
/// Run the kRm action on 'what' through Exec().
/// Unless 'how' contains "--force" or a single-dash option group containing
/// 'f', and when both stdin and stdout are terminals, the user is asked to
/// confirm first.
/// Returns 0 if the removal request got a reply or the user declined, -1 on
/// invalid manager, unsupported server or missing reply.
Int_t TXProofMgr::Rm(const char *what, const char *how, const char *where)
{
   if (!IsValid()) {
      Error("Rm","invalid TXProofMgr - do nothing");
      return -1;
   }
   if (fSocket->GetXrdProofdVersion() < 1006) {
      Error("Rm", "functionality not supported by server");
      return -1;
   }

   TString prompt, ans("Y"), opt(how);

   // Look for a "force" request among the options
   Bool_t force = kFALSE;
   if (!opt.IsNull()) {
      TString tok;
      Int_t pos = 0;
      while (!force && opt.Tokenize(tok, pos, " ")) {
         const Bool_t longForm = (tok == "--force");
         const Bool_t shortForm = (tok.BeginsWith("-") && !tok.BeginsWith("--") &&
                                   tok.Contains("f"));
         if (longForm || shortForm)
            force = kTRUE;
      }
   }

   // Interactive confirmation
   const Bool_t interactive = (isatty(0) != 0 && isatty(1) != 0);
   if (!force && interactive) {
      prompt.Form("Do you really want to remove '%s'? [N/y]", what);
      ans = "";
      while (ans != "N" && ans != "Y") {
         ans = Getline(prompt.Data());
         ans.Remove(TString::kTrailing, '\n');
         // An empty answer means "no"
         if (ans == "") ans = "N";
         ans.ToUpper();
         if (ans != "N" && ans != "Y")
            Printf("Please answer y, Y, n or N");
      }
   }

   if (ans != "Y")
      return 0;

   TObjString *reply = Exec(kRm, what, how, where);
   if (!reply)
      return -1;

   if (gDebug > 1) Printf("%s", reply->GetName());
   delete reply;
   return 0;
}
void TXProofMgr::Tail(const char *what, const char *how, const char *where)
{
if (!IsValid()) {
Error("Tail","invalid TXProofMgr - do nothing");
return;
}
if (fSocket->GetXrdProofdVersion() < 1006) {
Error("Tail", "functionality not supported by server");
return;
}
TObjString *os = Exec(kTail, what, how, where);
if (os) Printf("%s", os->GetName());
SafeDelete(os);
}
/// Run the kMd5sum action on 'what' through Exec() and store the reply in
/// 'sum'. 'where' must not be "all".
/// Returns 0 on success, -1 otherwise ('sum' is then left untouched).
Int_t TXProofMgr::Md5sum(const char *what, TString &sum, const char *where)
{
   if (!IsValid()) {
      Error("Md5sum","invalid TXProofMgr - do nothing");
      return -1;
   }
   if (fSocket->GetXrdProofdVersion() < 1006) {
      Error("Md5sum", "functionality not supported by server");
      return -1;
   }
   const Bool_t allNodes = (where && !strcmp(where, "all"));
   if (allNodes) {
      Error("Md5sum","cannot run on all nodes at once: please specify one");
      return -1;
   }

   TObjString *reply = Exec(kMd5sum, what, 0, where);
   if (!reply)
      return -1;

   if (gDebug > 1) Printf("%s", reply->GetName());
   sum = reply->GetName();
   delete reply;
   return 0;
}
/// Run the kStat action on 'what' through Exec() and fill 'st' from the
/// reply. 'where' must not be "all".
///
/// The reply is parsed as eight blank-separated, all-digit fields, in this
/// order: dev, ino, mode, uid, gid, size, mtime, islink. 'st' is written only
/// when all eight fields have been parsed.
/// Returns 0 on success, -1 on invalid manager, unsupported server, missing
/// reply or malformed reply.
Int_t TXProofMgr::Stat(const char *what, FileStat_t &st, const char *where)
{
   if (!IsValid()) {
      Error("Stat","invalid TXProofMgr - do nothing");
      return -1;
   }
   if (fSocket->GetXrdProofdVersion() < 1006) {
      Error("Stat", "functionality not supported by server");
      return -1;
   }
   if (where && !strcmp(where, "all")) {
      Error("Stat","cannot run on all nodes at once: please specify one");
      return -1;
   }

   TObjString *os = Exec(kStat, what, 0, where);
   if (!os)
      return -1;
   if (gDebug > 1) Printf("%s", os->GetName());

   // Work on a private copy and release the reply immediately, so that the
   // parse-failure exits below cannot leak it
   const TString reply(os->GetName());
   SafeDelete(os);

   // Tokenize everything first: a negative or otherwise non-numeric field
   // (e.g. a leading "-1") fails IsDigit() and aborts the parsing
   const Int_t kNFields = 8;
   TString fld[kNFields];
   Ssiz_t from = 0;
   for (Int_t i = 0; i < kNFields; i++) {
      if (!reply.Tokenize(fld[i], from, "[ ]+") || !fld[i].IsDigit())
         return -1;
   }

   st.fDev    = fld[0].Atoi();
   st.fIno    = fld[1].Atoi();
   st.fMode   = fld[2].Atoi();
   st.fUid    = fld[3].Atoi();
   st.fGid    = fld[4].Atoi();
   st.fSize   = fld[5].Atoll();
   st.fMtime  = fld[6].Atoi();
   st.fIsLink = (fld[7].Atoi() == 1) ? kTRUE : kFALSE;

   return 0;
}
/// Send an action request to the coordinator and return its reply.
///
/// The request string is "<where>|<what>|<options>", where <where> defaults
/// to "host:port" of this manager when 'where' is empty. For kTail the
/// options in 'how' are reduced to a single item by the loop below; for any
/// other action 'how' is passed through unchanged.
/// The interrupt handler, if any, is active while the request is pending.
/// Returns the reply (owned by the caller) or 0 on invalid manager,
/// unsupported server (XrdProofd version < 1006) or missing path.
TObjString *TXProofMgr::Exec(Int_t action,
                             const char *what, const char *how, const char *where)
{
   if (!IsValid()) {
      Error("Exec","invalid TXProofMgr - do nothing");
      return (TObjString *)0;
   }
   if (fSocket->GetXrdProofdVersion() < 1006) {
      Error("Exec", "functionality not supported by server");
      return (TObjString *)0;
   }
   if (!what || strlen(what) <= 0) {
      Error("Exec","specifying a path is mandatory");
      return (TObjString *)0;
   }

   TString opt(how);
   const Bool_t filterTailOpts = (action == kTail && !opt.IsNull());
   if (filterTailOpts) {
      TString all(how), tok;
      Int_t pos = 0;
      // Set when the previous token was "-c" / "-n" and its value is expected
      Bool_t afterC = kFALSE, afterN = kFALSE;
      while (all.Tokenize(tok, pos, " ")) {
         // NOTE(review): this skips a non-dash token exactly when a "-n" value
         // is pending, which looks inverted - confirm the intent
         if (!tok.BeginsWith("-") && !afterC && afterN) continue;
         if (afterC) {
            opt.Form("-c %s", tok.Data());
            afterC = kFALSE;
         }
         if (afterN) {
            opt.Form("-n %s", tok.Data());
            afterN = kFALSE;
         }
         if (tok == "-c") {
            afterC = kTRUE;
         } else if (tok == "-n") {
            afterN = kTRUE;
         } else if (tok == "--bytes=" || tok == "--lines=") {
            // NOTE(review): exact match only; "--bytes=N" falls to the next branch
            opt = tok;
         } else if (tok.BeginsWith("-")) {
            // "-<digits>" form
            tok.Remove(TString::kLeading,'-');
            if (tok.IsDigit()) opt.Form("-%s", tok.Data());
         }
      }
   }

   // Assemble "<where>|<what>|<options>"
   TString request(where);
   if (request.IsNull())
      request.Form("%s:%d", fUrl.GetHost(), fUrl.GetPort());
   request += "|";
   request += what;
   request += "|";
   request += opt;

   // Allow the user to interrupt while waiting for the reply
   if (fIntHandler) fIntHandler->Add();
   TObjString *reply = fSocket->SendCoordinator(kExec, request.Data(), action);
   if (fIntHandler) fIntHandler->Remove();

   return reply;
}
/// Copy the remote file 'remote' into the local file 'local'.
///
/// If 'local' is empty the base name of 'remote' in the current directory is
/// used; if it is a directory the base name is appended to it. 'opt' is
/// case-insensitive and may contain "force" (overwrite without checksum
/// comparison or prompt) and "silent" (suppress the [GetFile] notices).
/// Without "force", an existing local file with the same MD5 sum is left
/// alone and a differing one is overwritten only after confirmation.
/// After the transfer the MD5 sums of the two files are compared.
///
/// Returns 0 on success or when nothing had to be done, -1 on error.
Int_t TXProofMgr::GetFile(const char *remote, const char *local, const char *opt)
{
   Int_t rc = -1;

   if (!IsValid()) {
      Error("GetFile", "invalid TXProofMgr - do nothing");
      return rc;
   }
   if (fSocket->GetXrdProofdVersion() < 1006) {
      Error("GetFile", "functionality not supported by server");
      return rc;
   }

   TString filerem(remote);
   if (filerem.IsNull()) {
      Error("GetFile", "remote file path undefined");
      return rc;
   }

   // Options
   TString oo(opt);
   oo.ToUpper();
   Bool_t force = (oo.Contains("FORCE")) ? kTRUE : kFALSE;
   Bool_t silent = (oo.Contains("SILENT")) ? kTRUE : kFALSE;

   // Local path
   TString fileloc(local);
   if (fileloc.IsNull()) {
      fileloc = gSystem->BaseName(filerem);
   }
   gSystem->ExpandPathName(fileloc);

   // Default open flags and mode
#ifdef WIN32
   UInt_t openflags = O_WRONLY | O_BINARY;
#else
   UInt_t openflags = O_WRONLY;
#endif
   UInt_t openmode = 0600;

   // Inspect the local target, if it exists
   UserGroup_t *ugloc = 0;
   Int_t rcloc = 0;
   FileStat_t stloc;
   if ((rcloc = gSystem->GetPathInfo(fileloc, stloc)) == 0) {
      if (R_ISDIR(stloc.fMode)) {
         // Target is a directory: the file goes inside it
         if (!fileloc.EndsWith("/")) fileloc += "/";
         fileloc += gSystem->BaseName(filerem);
         rcloc = gSystem->GetPathInfo(fileloc, stloc);
      }
      if (rcloc == 0) {
         // The target file exists: it must be regular and writable by us
         if (!R_ISREG(stloc.fMode)) {
            if (!silent)
               Printf("[GetFile] local file '%s' exists and is not regular: cannot continue",
                      fileloc.Data());
            return rc;
         }
         if (!(ugloc = gSystem->GetUserInfo(gSystem->GetUid()))) {
            Error("GetFile", "cannot get user info for additional checks");
            return rc;
         }
         Bool_t owner = (ugloc->fUid == stloc.fUid && ugloc->fGid == stloc.fGid) ? kTRUE : kFALSE;
         Bool_t group = (!owner && ugloc->fGid == stloc.fGid) ? kTRUE : kFALSE;
         Bool_t other = (!owner && !group) ? kTRUE : kFALSE;
         delete ugloc;
         if ((owner && !(stloc.fMode & kS_IWUSR)) ||
             (group && !(stloc.fMode & kS_IWGRP)) || (other && !(stloc.fMode & kS_IWOTH))) {
            if (!silent) {
               Printf("[GetFile] file '%s' exists: no permission to delete or overwrite the file", fileloc.Data());
               Printf("[GetFile] ownership: owner: %d, group: %d, other: %d", owner, group, other);
               Printf("[GetFile] mode: %x", stloc.fMode);
            }
            return rc;
         }
         // Existing file: it will be truncated
         openflags |= O_CREAT | O_TRUNC;
      } else {
         openflags |= O_CREAT;
      }
   } else {
      openflags |= O_CREAT;
   }

   // The remote checksum doubles as an existence/readability check
   TString remsum;
   if (Md5sum(filerem, remsum) != 0) {
      if (!silent)
         Printf("[GetFile] remote file '%s' does not exists or cannot be read", filerem.Data());
      return rc;
   }

   // Existing local file and no 'force': compare checksums, possibly ask
   if (rcloc == 0 && !force) {
      Bool_t same = kFALSE;
      TMD5 *md5loc = TMD5::FileChecksum(fileloc);
      if (md5loc) {
         if (remsum == md5loc->AsString()) {
            if (!silent) {
               Printf("[GetFile] local file '%s' and remote file '%s' have the same MD5 check sum",
                      fileloc.Data(), filerem.Data());
               Printf("[GetFile] use option 'force' to override");
            }
            same = kTRUE;
         }
         delete md5loc;
      }
      if (same)
         return 0;
      const char *a = Getline("Local file exists already: would you like to overwrite it? [N/y]");
      if (a[0] == 'n' || a[0] == 'N' || a[0] == '\0') return 0;
   }

   // Open the local file for writing
   Int_t fdout = open(fileloc.Data(), openflags, openmode);
   if (fdout < 0) {
      // Report the resolved path: 'local' may be null or a directory
      Error("GetFile", "could not open local file '%s' for writing: errno: %d", fileloc.Data(), errno);
      return rc;
   }

   TString cmd(filerem);

   // The raw data are read directly from the socket: disable the handler
   // for the duration of the transfer
   gSystem->RemoveFileHandler(TXSocketHandler::GetSocketHandler());

   TStopwatch watch;
   watch.Start();
   TObjString *os = fSocket->SendCoordinator(kGetFile, cmd.Data());
   if (os) {
      // The reply carries the size of the file
      TString ssz(os->GetName());
      ssz.ReplaceAll(" ", "");
      if (!ssz.IsDigit()) {
         Error("GetFile", "received non-digit size string: '%s' ('%s')", os->GetName(), ssz.Data());
      } else if (ssz.Atoll() <= 0) {
         Error("GetFile", "received null or negative size: %lld", ssz.Atoll());
      } else {
         const Long64_t size = ssz.Atoll();
         const Int_t kMAXBUF = 16384;
         char buf[kMAXBUF];
         rc = 0;
         Long64_t filesize = 0;
         // NOTE(review): a RecvRaw() result of 0 makes no progress and is
         // retried; confirm that the socket cannot return 0 indefinitely
         while (rc == 0 && filesize < size) {
            Long64_t left = size - filesize;
            if (left > kMAXBUF) left = kMAXBUF;
            Int_t rec = fSocket->RecvRaw(buf, left);
            if (rec > 0) {
               filesize += rec;
               // Write out the chunk, coping with partial writes and EINTR
               char *p = buf;
               Int_t r = rec;
               while (r) {
                  Int_t w = 0;
                  while ((w = write(fdout, p, r)) < 0 && TSystem::GetErrno() == EINTR)
                     TSystem::ResetErrno();
                  if (w < 0) {
                     SysError("GetFile", "error writing to unit: %d", fdout);
                     rc = -1;
                     break;
                  }
                  r -= w;
                  p += w;
               }
               CpProgress("GetFile", filesize, size, &watch);
            } else if (rec < 0) {
               rc = -1;
               Error("GetFile", "error during receiving file");
               break;
            }
         }
         CpProgress("GetFile", filesize, size, &watch, kTRUE);
      }
      // The reply is ours to release
      SafeDelete(os);
   } else {
      Error("GetFile", "size not received");
      rc = -1;
   }

   // Common clean-up, executed on every path once the handler was removed
   gSystem->AddFileHandler(TXSocketHandler::GetSocketHandler());
   close(fdout);
   watch.Stop();
   watch.Reset();

   // Verify the copy
   if (rc == 0) {
      TMD5 *md5loc = TMD5::FileChecksum(fileloc);
      if (!md5loc) {
         Error("GetFile", "cannot get MD5 checksum of the new local file '%s'", fileloc.Data());
         rc = -1;
      } else {
         if (remsum != md5loc->AsString()) {
            Error("GetFile", "checksums for the local copy and the remote file differ: {rem:%s,loc:%s}",
                             remsum.Data(), md5loc->AsString());
            rc = -1;
         }
         delete md5loc;
      }
   }

   return rc;
}
/// Copy the local file 'local' to the remote path 'remote'.
///
/// If 'remote' is empty the file goes to "~/<basename>"; if it ends with '/'
/// the base name of the local file is appended. 'opt' equal to "force"
/// (case-insensitive) overwrites an existing remote file without checksum
/// comparison or prompt. Without it, an identical remote file is left alone
/// and a differing one is overwritten only after confirmation.
/// After the transfer the MD5 sums of the two files are compared.
///
/// Returns 0 on success or when nothing had to be done, -1 on error.
Int_t TXProofMgr::PutFile(const char *local, const char *remote, const char *opt)
{
   Int_t rc = -1;

   if (!IsValid()) {
      Error("PutFile", "invalid TXProofMgr - do nothing");
      return rc;
   }
   if (fSocket->GetXrdProofdVersion() < 1006) {
      Error("PutFile", "functionality not supported by server");
      return rc;
   }

   // Local path
   TString fileloc(local);
   if (fileloc.IsNull()) {
      Error("PutFile", "local file path undefined");
      return rc;
   }
   gSystem->ExpandPathName(fileloc);

   // Options
   TString oo(opt);
   oo.ToUpper();
   Bool_t force = (oo == "FORCE") ? kTRUE : kFALSE;

   // Remote path
   TString filerem(remote);
   if (filerem.IsNull()) {
      filerem.Form("~/%s", gSystem->BaseName(fileloc));
   } else if (filerem.EndsWith("/")) {
      filerem += gSystem->BaseName(fileloc);
   }

#ifdef WIN32
   UInt_t openflags = O_RDONLY | O_BINARY;
#else
   UInt_t openflags = O_RDONLY;
#endif

   // The local file must exist and be regular ...
   Int_t rcloc = 0;
   FileStat_t stloc;
   if ((rcloc = gSystem->GetPathInfo(fileloc, stloc)) != 0 || !R_ISREG(stloc.fMode)) {
      const char *why = (rcloc == 0) ? "is not regular" : "does not exists";
      Printf("[PutFile] local file '%s' %s: cannot continue", fileloc.Data(), why);
      return rc;
   }
   // ... and readable by us
   UserGroup_t *ugloc = 0;
   if (!(ugloc = gSystem->GetUserInfo(gSystem->GetUid()))) {
      Error("PutFile", "cannot get user info for additional checks");
      return rc;
   }
   Bool_t owner = (ugloc->fUid == stloc.fUid && ugloc->fGid == stloc.fGid) ? kTRUE : kFALSE;
   Bool_t group = (!owner && ugloc->fGid == stloc.fGid) ? kTRUE : kFALSE;
   Bool_t other = (!owner && !group) ? kTRUE : kFALSE;
   delete ugloc;
   if ((owner && !(stloc.fMode & kS_IRUSR)) ||
       (group && !(stloc.fMode & kS_IRGRP)) || (other && !(stloc.fMode & kS_IROTH))) {
      Printf("[PutFile] file '%s': no permission to read the file", fileloc.Data());
      Printf("[PutFile] ownership: owner: %d, group: %d, other: %d", owner, group, other);
      Printf("[PutFile] mode: %x", stloc.fMode);
      return rc;
   }

   // Local checksum
   TString locsum;
   TMD5 *md5loc = TMD5::FileChecksum(fileloc);
   if (!md5loc) {
      Error("PutFile", "cannot calculate the check sum for '%s'", fileloc.Data());
      return rc;
   } else {
      locsum = md5loc->AsString();
      delete md5loc;
   }

   // If the remote file exists, compare checksums and possibly ask
   Bool_t same = kFALSE;
   FileStat_t strem;
   TString remsum;
   if (Stat(filerem, strem) == 0) {
      if (Md5sum(filerem, remsum) != 0) {
         Printf("[PutFile] remote file exists but the check sum calculation failed");
         return rc;
      }
      if (remsum == locsum) {
         if (!force) {
            Printf("[PutFile] local file '%s' and remote file '%s' have the same MD5 check sum",
                   fileloc.Data(), filerem.Data());
            Printf("[PutFile] use option 'force' to override");
         }
         same = kTRUE;
      }
      if (!force) {
         if (!same) {
            const char *a = Getline("Remote file exists already: would you like to overwrite it? [N/y]");
            if (a[0] == 'n' || a[0] == 'N' || a[0] == '\0') return 0;
            // The user agreed: tell the server to overwrite
            force = kTRUE;
         } else {
            return 0;
         }
      }
   }

   // Open the local file
   int fd = open(fileloc.Data(), openflags);
   if (fd < 0) {
      Error("PutFile", "cannot open file '%s': %d", fileloc.Data(), errno);
      return -1;
   }

   // Request: "<remote path> <size>[ force]"
   TString cmd;
   cmd.Form("%s %lld", filerem.Data(), stloc.fSize);
   if (force) cmd += " force";

   // The raw data are written directly to the connection: disable the
   // handler for the duration of the transfer
   gSystem->RemoveFileHandler(TXSocketHandler::GetSocketHandler());

   TStopwatch watch;
   watch.Start();
   TObjString *os = fSocket->SendCoordinator(kPutFile, cmd.Data());
   if (os) {
      const Int_t kMAXBUF = 16384;
      char buf[kMAXBUF];

      Long64_t pos = 0;
      lseek(fd, pos, SEEK_SET);

      rc = 0;
      while (rc == 0 && pos < stloc.fSize) {
         Long64_t left = stloc.fSize - pos;
         if (left > kMAXBUF) left = kMAXBUF;
         Int_t siz;
         while ((siz = read(fd, &buf[0], left)) < 0 && TSystem::GetErrno() == EINTR)
            TSystem::ResetErrno();
         // A short read is treated as an error
         if (siz < 0 || siz != left) {
            Error("PutFile", "error reading from file: errno: %d", errno);
            rc = -1;
            break;
         }
         Int_t src = 0;
         if ((src = fSocket->fConn->WriteRaw((void *)&buf[0], left)) != left) {
            Error("PutFile", "error sending over: errno: %d (rc: %d)", TSystem::GetErrno(), src);
            rc = -1;
            break;
         }
         // Account for the chunk before reporting, as GetFile does
         pos += left;
         CpProgress("PutFile", pos, stloc.fSize, &watch);
      }
      CpProgress("PutFile", pos, stloc.fSize, &watch, kTRUE);
      // The reply is ours to release
      SafeDelete(os);
   } else {
      Error("PutFile", "command could not be executed");
      rc = -1;
   }

   // Common clean-up
   gSystem->AddFileHandler(TXSocketHandler::GetSocketHandler());
   close(fd);
   watch.Stop();
   watch.Reset();

   // Verify the copy
   if (rc == 0) {
      if (Md5sum(filerem, remsum) != 0) {
         Printf("[PutFile] cannot get MD5 checksum of the new remote file '%s'", filerem.Data());
         rc = -1;
      } else if (remsum != locsum) {
         Printf("[PutFile] checksums for the local copy and the remote file differ: {rem:%s, loc:%s}",
                remsum.Data(), locsum.Data());
         rc = -1;
      }
   }

   return rc;
}
void TXProofMgr::CpProgress(const char *pfx, Long64_t bytes,
Long64_t size, TStopwatch *watch, Bool_t cr)
{
if (!pfx || size == 0 || !watch) return;
fprintf(stderr, "[%s] Total %.02f MB\t|", pfx, (Double_t)size/1048576);
for (int l = 0; l < 20; l++) {
if (size > 0) {
if (l < 20*bytes/size)
fprintf(stderr, "=");
else if (l == 20*bytes/size)
fprintf(stderr, ">");
else if (l > 20*bytes/size)
fprintf(stderr, ".");
} else
fprintf(stderr, "=");
}
gSystem->ProcessEvents();
watch->Stop();
Double_t copytime = watch->RealTime();
fprintf(stderr, "| %.02f %% [%.01f MB/s]\r",
100.0*(size?(bytes/size):1), bytes/copytime/1048576.);
if (cr) fprintf(stderr, "\n");
watch->Continue();
}
/// Ask the coordinator to copy 'src' to 'dst'; 'fmt' is appended to the
/// request as is.
///
/// If 'dst' is empty the base name of the file part of 'src' is used; if it
/// ends with '/' the base name of 'src' is appended. Both paths are
/// normalized through TUrl, and "file" URLs are rewritten as
/// "file://host/<file and options>".
/// The interrupt handler, if any, is active while the request is pending.
///
/// Returns 0 if the server replied, -1 otherwise.
Int_t TXProofMgr::Cp(const char *src, const char *dst, const char *fmt)
{
   Int_t rc = -1;

   if (!IsValid()) {
      Error("Cp", "invalid TXProofMgr - do nothing");
      return rc;
   }
   if (fSocket->GetXrdProofdVersion() < 1006) {
      Error("Cp", "functionality not supported by server");
      return rc;
   }

   // Source
   TString filesrc(src);
   if (filesrc.IsNull()) {
      Error("Cp", "source file path undefined");
      return rc;
   }

   // Destination
   TString filedst(dst);
   if (filedst.IsNull()) {
      filedst = gSystem->BaseName(TUrl(filesrc.Data()).GetFile());
   } else if (filedst.EndsWith("/")) {
      filedst += gSystem->BaseName(filesrc);
   }

   // Normalize both paths
   TUrl usrc = TUrl(filesrc.Data(), kTRUE).GetUrl();
   filesrc = usrc.GetUrl();
   if (!strcmp(usrc.GetProtocol(), "file"))
      filesrc.Form("file://host/%s", usrc.GetFileAndOptions());
   TUrl udst = TUrl(filedst.Data(), kTRUE).GetUrl();
   filedst = udst.GetUrl();
   if (!strcmp(udst.GetProtocol(), "file"))
      filedst.Form("file://host/%s", udst.GetFileAndOptions());

   // Request: "<src> <dst> <fmt>"
   TString cmd;
   cmd.Form("%s %s %s", filesrc.Data(), filedst.Data(), (fmt ? fmt : ""));

   // Allow the user to interrupt while waiting for the reply
   if (fIntHandler) fIntHandler->Add();
   TObjString *os = fSocket->SendCoordinator(kCpFile, cmd.Data());
   if (fIntHandler) fIntHandler->Remove();

   if (os) {
      if (gDebug > 0) Printf("%s", os->GetName());
      rc = 0;
      // The reply is ours to release
      SafeDelete(os);
   }

   return rc;
}