#include "RConfigure.h"
#include "RConfig.h"
#include "Riostream.h"
#ifdef WIN32
#include <io.h>
typedef long off_t;
#endif
#include <sys/types.h>
#include <netinet/in.h>
#include <utime.h>
#include "TXProofServ.h"
#include "TObjString.h"
#include "TEnv.h"
#include "TError.h"
#include "TException.h"
#include "THashList.h"
#include "TInterpreter.h"
#include "TProofDebug.h"
#include "TProof.h"
#include "TProofPlayer.h"
#include "TQueryResultManager.h"
#include "TRegexp.h"
#include "TClass.h"
#include "TROOT.h"
#include "TSystem.h"
#include "TPluginManager.h"
#include "TXSocketHandler.h"
#include "TXUnixSocket.h"
#include "compiledata.h"
#include "TProofNodeInfo.h"
#include "XProofProtocol.h"
#include <XrdClient/XrdClientConst.hh>
#include <XrdClient/XrdClientEnv.hh>
// Debugger hook flag: when "Proof.GdbHook" is set (1 for masters, 2 for
// workers) CreateServer() spins on this variable until it becomes 0.
// Nothing in this file clears it, so presumably it is reset from an attached
// debugger. 'volatile' keeps the compiler from optimizing the spin loops away.
static volatile Int_t gProofServDebug = 1;
// Signal handler forwarding to TXProofServ::HandleSigPipe().
// NOTE(review): despite its name this handler is registered for
// kSigInterrupt, not kSigPipe; confirm which signal is really intended.
class TXProofServSigPipeHandler : public TSignalHandler {
   TXProofServ *fServ;   // server to notify (not deleted by the handler)
public:
   TXProofServSigPipeHandler(TXProofServ *s)
      : TSignalHandler(kSigInterrupt, kFALSE), fServ(s) { }
   Bool_t Notify();
};

// Forward the signal to the server; always reports the signal as handled.
Bool_t TXProofServSigPipeHandler::Notify()
{
   TXProofServ *srv = fServ;
   srv->HandleSigPipe();
   return kTRUE;
}
// Signal handler for kSigTermination: terminates the server session.
class TXProofServTerminationHandler : public TSignalHandler {
   TXProofServ *fServ;   // server to notify (not deleted by the handler)
public:
   TXProofServTerminationHandler(TXProofServ *s)
      : TSignalHandler(kSigTermination, kFALSE), fServ(s) { }
   Bool_t Notify();
};

// Log the event and ask the server to shut down; reports the signal as handled.
Bool_t TXProofServTerminationHandler::Notify()
{
   TXProofServ *srv = fServ;
   Printf("Received SIGTERM: terminating");
   srv->HandleTermination();
   return kTRUE;
}
// Signal handler for kSigSegmentationViolation: prints a banner and
// terminates the server session.
class TXProofServSegViolationHandler : public TSignalHandler {
   TXProofServ *fServ;   // server to notify (not deleted by the handler)
public:
   TXProofServSegViolationHandler(TXProofServ *s)
      : TSignalHandler(kSigSegmentationViolation, kFALSE), fServ(s) { }
   Bool_t Notify();
};

// Emit the three-line banner, then terminate; reports the signal as handled.
Bool_t TXProofServSegViolationHandler::Notify()
{
   const char *banner = "**** ";
   Printf(banner);
   Printf("**** Segmentation violation: terminating ****");
   Printf(banner);
   fServ->HandleTermination();
   return kTRUE;
}
// File handler on the server socket descriptor: any read notification is
// dispatched to TXProofServ::HandleSocketInput().
class TXProofServInputHandler : public TFileHandler {
   TXProofServ *fServ;   // server to notify (not deleted by the handler)
public:
   TXProofServInputHandler(TXProofServ *s, Int_t fd)
      : TFileHandler(fd, 1), fServ(s) { }
   Bool_t Notify();
   Bool_t ReadNotify() { return Notify(); }
};

// Process the pending input, then drop the client ID that was attached to
// the message on the socket.
Bool_t TXProofServInputHandler::Notify()
{
   fServ->HandleSocketInput();
   TXUnixSocket *xsock = (TXUnixSocket *) fServ->GetSocket();
   xsock->RemoveClientID();
   return kTRUE;
}
ClassImp(TXProofServ)

extern "C" {
// C-linkage factory returning a new TXProofServ as a TApplication.
// Presumably looked up by symbol name by the launcher; confirm with the caller.
TApplication *GetTXProofServ(Int_t *argc, char **argv, FILE *flog)
{
   TXProofServ *srv = new TXProofServ(argc, argv, flog);
   return srv;
}
}
TXProofServ::TXProofServ(Int_t *argc, char **argv, FILE *flog)
   : TProofServ(argc, argv, flog)
{
   // Construct the XrdProofd-based server. Handlers start out unset; the
   // input handler is installed later by CreateServer().
   fTerminated       = kFALSE;
   fInputHandler     = 0;
   fInterruptHandler = 0;
}
Int_t TXProofServ::CreateServer()
{
   // Finalize the server setup: connect to the local XrdProofd coordinator
   // through a Unix socket, run Setup(), make sure the output is redirected,
   // load the configured macros and, on masters, create the TProof object.
   // Returns 0 on success, -1 on failure. In test mode ("test" as 4th arg)
   // the protocol number is written to the descriptor given in ROOTOPENSOCK
   // and the process exits.

   Bool_t xtest = (Argc() > 3 && !strcmp(Argv(3), "test")) ? kTRUE : kFALSE;

   if (gProofDebugLevel > 0)
      Info("CreateServer", "starting%s server creation", (xtest ? " test" : ""));

   // Resolve the descriptor of an already open log file
   if (fLogFile) {
      if ((fLogFileDes = fileno(fLogFile)) < 0) {
         Error("CreateServer", "resolving the log file description number");
         return -1;
      }
      // Unless debugging, position the log descriptor at the end of the file
      if (gProofDebugLevel <= 0)
         lseek(fLogFileDes, (off_t) 0, SEEK_END);
   }

   TXSocket::SetLocation((IsMaster()) ? "master" : "slave");

   EnvPutInt(NAME_DEBUG, gEnv->GetValue("XNet.Debug", 0));

   if (xtest) {
      // Check the raw environment value: the previous code tested the TString
      // after assigning Getenv() to it, which can never detect a missing
      // variable (strtol("") then silently selected descriptor 0).
      const char *opensock = gSystem->Getenv("ROOTOPENSOCK");
      if (!opensock || !opensock[0]) {
         Error("CreateServer", "Socket setup by xpd undefined");
         return -1;
      }
      Int_t fpw = (Int_t) strtol(opensock, 0, 10);
      int proto = htonl(kPROOF_Protocol);
      fSockPath = "";
      if (write(fpw, &proto, sizeof(proto)) != sizeof(proto)) {
         Error("CreateServer", "test: sending protocol number");
         return -1;
      }
      exit(0);
   } else {
      fSockPath = gEnv->GetValue("ProofServ.OpenSock", "");
      if (fSockPath.Length() <= 0) {
         Error("CreateServer", "Socket setup by xpd undefined");
         return -1;
      }
      TString entity = gEnv->GetValue("ProofServ.Entity", "");
      if (entity.Length() > 0)
         fSockPath.Insert(0,Form("%s/", entity.Data()));
   }

   Int_t psid = gEnv->GetValue("ProofServ.SessionID", -1);
   if (psid < 0) {
      Error("CreateServer", "Session ID undefined");
      return -1;
   }

   // Connect to the coordinator
   fSocket = new TXUnixSocket(fSockPath, psid, -1, this);
   if (!fSocket || !(fSocket->IsValid())) {
      Error("CreateServer", "Failed to open connection to XrdProofd coordinator");
      return -1;
   }
   fSocket->SetCompressionLevel(fCompressMsg);

   // The socket title is "client" for the top master, otherwise the parent's
   // ordinal (our ordinal stripped of its last ".N" component)
   TString tgt("client");
   if (fOrdinal != "0") {
      tgt = fOrdinal;
      if (tgt.Last('.') != kNPOS) tgt.Remove(tgt.Last('.'));
   }
   fSocket->SetTitle(tgt);

   ((TXSocket *)fSocket)->fReference = this;

   // Install the input handler on the socket descriptor
   Int_t sock = fSocket->GetDescriptor();
   fInputHandler =
      TXSocketHandler::GetSocketHandler(new TXProofServInputHandler(this, sock), fSocket);
   gSystem->AddFileHandler(fInputHandler);

   Int_t cid = gEnv->GetValue("ProofServ.ClientID", -1);
   if (cid < 0) {
      Error("CreateServer", "Client ID undefined");
      SendLogFile();
      return -1;
   }
   ((TXSocket *)fSocket)->SetClientID(cid);

   // Optional busy-wait so that a debugger can be attached (see gProofServDebug)
   if (IsMaster()) {
      if (gEnv->GetValue("Proof.GdbHook",0) == 1) {
         while (gProofServDebug)
            ;
      }
   } else {
      if (gEnv->GetValue("Proof.GdbHook",0) == 2) {
         while (gProofServDebug)
            ;
      }
   }

   if (gProofDebugLevel > 0)
      Info("CreateServer", "Service: %s, ConfDir: %s, IsMaster: %d",
           fService.Data(), fConfDir.Data(), (Int_t)fMasterServ);

   if (Setup() == -1) {
      LogToMaster();
      SendLogFile();
      Terminate(0);
      return -1;
   }

   if (!fLogFile) {
      RedirectOutput();
      if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
         LogToMaster();
         SendLogFile(-98);
         Terminate(0);
         return -1;
      }
   }

   if (IsMaster()) {
      if (CatMotd() == -1) {
         LogToMaster();
         SendLogFile(-99);
         Terminate(0);
         return -1;
      }
   }

   // Interpreter preamble
   ProcessLine("#include <iostream>", kTRUE);
   ProcessLine("#include <_string>",kTRUE);
   ProcessLine("#include <RtypesCint.h>", kTRUE);
   ProcessLine("#define ROOT_Rtypes 0", kTRUE);
   ProcessLine("#define ROOT_TError 0", kTRUE);
   ProcessLine("#define ROOT_TGenericClassInfo 0", kTRUE);

   // Load (but do not execute) the macro given by "Proof.Load", if found
   const char *logon;
   logon = gEnv->GetValue("Proof.Load", (char *)0);
   if (logon) {
      char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
      if (mac)
         ProcessLine(Form(".L %s", logon), kTRUE);
      delete [] mac;
   }

   // Execute the logon macro given by "Proof.Logon", unless disabled
   logon = gEnv->GetValue("Proof.Logon", (char *)0);
   if (logon && !NoLogOpt()) {
      char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
      if (mac)
         ProcessFile(logon);
      delete [] mac;
   }

   gInterpreter->SaveContext();
   gInterpreter->SaveGlobalsContext();

   if (IsMaster()) {
      TString master = Form("proof://%s@__master__", fUser.Data());
      Int_t port = gEnv->GetValue("ProofServ.XpdPort", -1);
      if (port > -1) {
         master += ":";
         master += port;
      }

      gEnv->SetValue("Proof.ParallelStartup", 0);

      TPluginManager *pm = gROOT->GetPluginManager();
      if (!pm) {
         Error("CreateServer", "no plugin manager found");
         SendLogFile(-99);
         Terminate(0);
         return -1;
      }

      TPluginHandler *h = pm->FindHandler("TProof", fConfFile);
      if (!h) {
         Error("CreateServer", "no plugin found for TProof with a"
                               " config file of '%s'", fConfFile.Data());
         SendLogFile(-99);
         Terminate(0);
         return -1;
      }

      if (h->LoadPlugin() == -1) {
         Error("CreateServer", "plugin for TProof could not be loaded");
         SendLogFile(-99);
         Terminate(0);
         return -1;
      }

      fProof = reinterpret_cast<TProof*>(h->ExecPlugin(5, master.Data(),
                                                       fConfFile.Data(),
                                                       fConfDir.Data(),
                                                       fLogLevel,
                                                       fTopSessionTag.Data()));
      if (!fProof || !fProof->IsValid()) {
         Error("CreateServer", "plugin for TProof could not be executed");
         FlushLogFile();
         delete fProof;
         fProof = 0;
         SendLogFile(-99);
         Terminate(0);
         return -1;
      }

      fEndMaster = fProof->IsEndMaster();
      fProof->SaveWorkerInfo();

      SendLogFile();
   }

   // Shutdown timer (300000 ms) started on first call only
   if (!fShutdownTimer) {
      fShutdownTimer = new TShutdownTimer(this, 300000);
      fShutdownTimer->Start(-1, kFALSE);
   }

   // Warn old clients about schema-evolution limitations
   if (fProtocol <= 17) {
      TString msg;
      msg.Form("Warning: client version is too old: automatic schema evolution is ineffective.\n"
               "         This may generate compatibility problems between streamed objects.\n"
               "         The advise is to move to ROOT >= 5.21/02 .");
      SendAsynMessage(msg.Data());
   }

   return 0;
}
TXProofServ::~TXProofServ()
{
   // Release the coordinator socket. SafeDelete also resets the pointer, so
   // any later cleanup of fSocket (e.g. in the base class) sees a null
   // pointer instead of a dangling one.
   SafeDelete(fSocket);
}
void TXProofServ::HandleUrgentData()
{
TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
Bool_t fw = kFALSE;
Int_t iLev = ((TXSocket *)fSocket)->GetInterrupt(fw);
if (iLev < 0) {
Error("HandleUrgentData", "error receiving interrupt");
return;
}
PDB(kGlobal, 5)
Info("HandleUrgentData", "got interrupt: %d\n", iLev);
if (fProof)
fProof->SetActive();
switch (iLev) {
case TProof::kPing:
PDB(kGlobal, 5)
Info("HandleUrgentData", "*** Ping");
if (fw && IsMaster()) {
Int_t nbad = fProof->fActiveSlaves->GetSize() - fProof->Ping();
if (nbad > 0) {
Info("HandleUrgentData","%d slaves did not reply to ping",nbad);
}
}
if (fAdminPath.IsNull()) {
fAdminPath = gEnv->GetValue("ProofServ.AdminPath", "");
TString spid = Form(".%d", getpid());
if (!fAdminPath.IsNull() && !fAdminPath.EndsWith(spid))
fAdminPath += spid;
}
if (!fAdminPath.IsNull()) {
if (utime(fAdminPath.Data(), 0) != 0)
Info("HandleUrgentData", "problems touching path: %s", fAdminPath.Data());
else
if (gDebug > 0)
Info("HandleUrgentData", "touching path: %s", fAdminPath.Data());
} else {
Info("HandleUrgentData", "admin path undefined");
}
break;
case TProof::kHardInterrupt:
Info("HandleUrgentData", "*** Hard Interrupt");
if (fw && IsMaster())
fProof->Interrupt(TProof::kHardInterrupt);
((TXSocket *)fSocket)->Flush();
if (IsMaster())
SendLogFile();
break;
case TProof::kSoftInterrupt:
Info("HandleUrgentData", "Soft Interrupt");
if (fw && IsMaster())
fProof->Interrupt(TProof::kSoftInterrupt);
Interrupt();
if (IsMaster())
SendLogFile();
break;
case TProof::kShutdownInterrupt:
Info("HandleUrgentData", "Shutdown Interrupt");
HandleTermination();
break;
default:
Error("HandleUrgentData", "unexpected type: %d", iLev);
break;
}
if (fProof) fProof->SetActive(kFALSE);
}
void TXProofServ::HandleSigPipe()
{
   // Callback of TXProofServSigPipeHandler::Notify(): it only logs the
   // event and deliberately takes no further action.
   const char *where = "HandleSigPipe";
   Info(where, "got sigpipe ... do nothing");
}
void TXProofServ::HandleTermination()
{
   // Shut the session down. On a busy master, drop the waiting queries and
   // try to stop the running one (timeout from "Proof.ShutdownTimeout",
   // at least 20 s), then close the workers; finally call Terminate(0).

   if (IsMaster()) {
      if (!fIdle) {
         fWaitingQueries->Delete();
         // fProof may be missing if the master failed during start-up
         if (fProof) {
            fProof->InterruptCurrentMonitor();
            Long_t timeout = gEnv->GetValue("Proof.ShutdownTimeout", 60);
            timeout = (timeout > 20) ? timeout : 20;
            fProof->StopProcess(kTRUE, (Long_t) (timeout / 2));
            fProof->Collect(TProof::kActive, timeout);
         }
         if (!fIdle)
            Warning("HandleTermination","processing could not be stopped");
      }
      if (fProof)
         fProof->Close("S");
   }

   Terminate(0);
}
Int_t TXProofServ::Setup()
{
   // Configure the server from the environment set by the coordinator:
   // send the welcome message, read protocol, user, config file, sandbox,
   // session tag and session dir, run SetupCommon() and install the signal
   // handlers. Returns 0 on success, -1 on failure.

   char str[512];

   // Bounded formatting: the host name length is not under our control
   if (IsMaster()) {
      snprintf(str, sizeof(str), "**** Welcome to the PROOF server @ %s ****", gSystem->HostName());
   } else {
      snprintf(str, sizeof(str), "**** PROOF worker server @ %s started ****", gSystem->HostName());
   }

   if (fSocket->Send(str) != 1+static_cast<Int_t>(strlen(str))) {
      Error("Setup", "failed to send proof server startup message");
      return -1;
   }

   if ((fProtocol = gEnv->GetValue("ProofServ.ClientVersion", -1)) < 0) {
      Error("Setup", "remote proof protocol missing");
      return -1;
   }

   // User name: from the entity string (stripped of ":..." and "@..."), or,
   // if no entity is given, from the local user info. The previous test
   // 'Length() >= 0' was always true, which made this fallback dead code.
   fUser = gEnv->GetValue("ProofServ.Entity", "");
   if (fUser.Length() > 0) {
      if (fUser.Contains(":"))
         fUser.Remove(fUser.Index(":"));
      if (fUser.Contains("@"))
         fUser.Remove(fUser.Index("@"));
   } else {
      UserGroup_t *pw = gSystem->GetUserInfo();
      if (pw) {
         fUser = pw->fUser;
         delete pw;
      }
   }

   if (IsMaster()) {
      TString cf = gEnv->GetValue("ProofServ.ProofConfFile", "");
      if (cf.Length() > 0)
         fConfFile = cf;
   }

   fWorkDir = gEnv->GetValue("ProofServ.Sandbox", Form("~/%s", kPROOF_WorkDir));

   if ((fTopSessionTag = gEnv->GetValue("ProofServ.SessionTag", "-1")) == "-1") {
      Error("Setup", "Session tag missing");
      return -1;
   }

   // Our session tag is the top tag truncated after its second '-' field
   // and suffixed with our pid
   fSessionTag = fTopSessionTag;
   TString spid = Form("-%d", gSystem->GetPid());
   if (!fSessionTag.EndsWith(spid)) {
      Int_t nd = 0;
      if ((nd = fSessionTag.CountChar('-')) >= 2) {
         Int_t id = fSessionTag.Index("-", fSessionTag.Index("-") + 1);
         if (id != kNPOS) fSessionTag.Remove(id);
      } else if (nd != 1) {
         Warning("Setup", "Wrong number of '-' in session tag: protocol error? %s", fSessionTag.Data());
      }
      fSessionTag += spid;
   }

   if (gProofDebugLevel > 0)
      Info("Setup", "session tags: %s, %s", fTopSessionTag.Data(), fSessionTag.Data());

   if ((fSessionDir = gEnv->GetValue("ProofServ.SessionDir", "-1")) == "-1") {
      Error("Setup", "Session dir missing");
      return -1;
   }

   // ExpandPathName() returns 0 on failure: do not silently end up with an
   // empty working directory
   char *workdir = gSystem->ExpandPathName(fWorkDir.Data());
   if (!workdir) {
      Error("Setup", "failed to expand working directory path: %s", fWorkDir.Data());
      return -1;
   }
   fWorkDir = workdir;
   delete [] workdir;
   if (gProofDebugLevel > 0)
      Info("Setup", "working directory set to %s", fWorkDir.Data());

   if (SetupCommon() != 0) {
      Error("Setup", "common setup failed");
      return -1;
   }

   fSocket->SetOption(kNoDelay, 1);
   fSocket->SetOption(kKeepAlive, 1);

   gSystem->AddSignalHandler(new TXProofServSigPipeHandler(this));
   gSystem->AddSignalHandler(new TXProofServTerminationHandler(this));
   gSystem->AddSignalHandler(new TXProofServSegViolationHandler(this));

   if (gProofDebugLevel > 0)
      Info("Setup", "successfully completed");

   return 0;
}
TProofServ::EQueryAction TXProofServ::GetWorkers(TList *workers,
                                                 Int_t & /* pc (unused) */,
                                                 Bool_t resume)
{
   // Obtain the list of workers from the coordinator (or, if
   // "ProofServ.UseUserCfg" is set, first try the user configuration).
   // The reply is an '&'-separated list: the first token describes the
   // master, the remaining ones the workers. The number of workers can be
   // capped with the PROOF_NWORKERS environment variable.
   // Returns kQueryOK if at least one worker was obtained, kQueryEnqueued if
   // the coordinator enqueued the query, kQueryStop otherwise.

   TProofServ::EQueryAction rc = kQueryStop;

   if (gEnv->GetValue("ProofServ.UseUserCfg", 0) != 0) {
      Int_t pc = 1;
      if ((rc = TProofServ::GetWorkers(workers, pc)) == kQueryOK)
         return rc;
   }

   Bool_t dynamicStartup = gEnv->GetValue("Proof.DynamicStartup", kFALSE);
   TString seqnum = (dynamicStartup) ? "" : XPD_GW_Static;
   if (!fWaitingQueries->IsEmpty()) {
      if (resume) {
         seqnum += ((TProofQueryResult *)(fWaitingQueries->First()))->GetSeqNum();
      } else {
         seqnum += ((TProofQueryResult *)(fWaitingQueries->Last()))->GetSeqNum();
      }
   }

   TObjString *os =
      ((TXSocket *)fSocket)->SendCoordinator(kGetWorkers, seqnum.Data());
   if (!os)
      return rc;

   // Copy the reply and release the returned object, which the caller owns
   // (it was previously leaked on every path)
   TString fl(os->GetName());
   SafeDelete(os);

   if (fl.BeginsWith(XPD_GW_QueryEnqueued)) {
      SendAsynMessage("+++ Query cannot be processed now: enqueued");
      return kQueryEnqueued;
   }

   Int_t nwrks = -1;
   if (gSystem->Getenv("PROOF_NWORKERS")) {
      TString s(gSystem->Getenv("PROOF_NWORKERS"));
      if (s.IsDigit()) {
         nwrks = s.Atoi();
         TString msg;
         msg.Form("+++ Starting max %d workers following the setting of PROOF_NWORKERS", nwrks);
         SendAsynMessage(msg);
      }
   }

   TString tok;
   Ssiz_t from = 0;
   if (fl.Tokenize(tok, from, "&")) {
      if (!tok.IsNull()) {
         // First token: the master; only its image is used here
         TProofNodeInfo *master = new TProofNodeInfo(tok);
         if (!master) {
            Error("GetWorkers", "no appropriate master line got from coordinator");
            return kQueryStop;
         } else {
            if (fImage.IsNull() && strlen(master->GetImage()) > 0)
               fImage = master->GetImage();
            SafeDelete(master);
         }
         // Remaining tokens: the workers, up to the optional cap
         while (fl.Tokenize(tok, from, "&") && (nwrks == -1 || nwrks > 0)) {
            if (!tok.IsNull()) {
               if (workers)
                  workers->Add(new TProofNodeInfo(tok));
               rc = kQueryOK;
               if (nwrks != -1) nwrks--;
            }
         }
      }
   }

   return rc;
}
Bool_t TXProofServ::HandleError(const void *)
{
   // Handle a socket error. If the connection to the local coordinator can
   // be re-established, return kFALSE and continue; otherwise close the
   // workers (masters), invalidate the session ID, terminate and return kTRUE.

   if (fSocket && !fSocket->IsValid()) {
      fSocket->Reconnect();
      if (fSocket->IsValid()) {
         if (gDebug > 0)
            Info("HandleError",
                 "%p: connection to local coordinator re-established", this);
         FlushLogFile();
         return kFALSE;
      }
   }
   Printf("TXProofServ::HandleError: %p: got called ...", this);

   // fProof / fSocket may be missing if the error hits during start-up
   if (IsMaster() && fProof)
      fProof->Close("S");

   if (fSocket)
      ((TXSocket *)fSocket)->SetSessionID(-1);

   Terminate(0);

   Printf("TXProofServ::HandleError: %p: DONE ... ", this);

   return kTRUE;
}
Bool_t TXProofServ::HandleInput(const void *in)
{
   // Dispatch a notification from the socket layer. 'in' points to an
   // XHandleIn_t whose fInt1 is the action code; a null 'in' is treated as a
   // plain message (kXPD_msg). Always returns kTRUE.

   if (gDebug > 2)
      Printf("TXProofServ::HandleInput %p, in: %p", this, in);

   const XHandleIn_t *req = (const XHandleIn_t *) in;
   Int_t action = kXPD_msg;
   if (req)
      action = req->fInt1;

   switch (action) {

      case kXPD_ping:
      case kXPD_interrupt:
         HandleUrgentData();
         break;

      case kXPD_flush:
         Info("HandleInput","kXPD_flush: flushing log file (stdout)");
         fflush(stdout);
         break;

      case kXPD_urgent:
      {
         Int_t kind = req->fInt2;
         if (kind == TXSocket::kStopProcess) {
            Bool_t abort = (req->fInt3 != 0) ? kTRUE : kFALSE;
            Int_t timeout = req->fInt4;
            if (fProof) {
               fProof->StopProcess(abort, timeout);
            } else if (fPlayer) {
               fPlayer->StopProcess(abort, timeout);
            }
         } else {
            Info("HandleInput","kXPD_urgent: unknown type: %d", kind);
         }
         break;
      }

      case kXPD_inflate:
         // Factors below 1000 are ignored
         if (req->fInt2 >= 1000)
            fInflateFactor = req->fInt2;
         Info("HandleInput", "kXPD_inflate: inflate factor set to %f",
              (Float_t) fInflateFactor / 1000.);
         break;

      case kXPD_priority:
         fGroupPriority = req->fInt2;
         if (fProof)
            fProof->BroadcastGroupPriority(fGroup, fGroupPriority);
         Info("HandleInput", "kXPD_priority: group %s priority set to %f",
              fGroup.Data(), (Float_t) fGroupPriority / 100.);
         break;

      default:
         // Regular message
         HandleSocketInput();
         ((TXSocket *)fSocket)->RemoveClientID();
         break;
   }

   return kTRUE;
}
void TXProofServ::DisableTimeout()
{
if (fSocket)
((TXSocket *)fSocket)->DisableTimeout();
}
void TXProofServ::EnableTimeout()
{
if (fSocket)
((TXSocket *)fSocket)->EnableTimeout();
}
void TXProofServ::Terminate(Int_t status)
{
   // Terminate the session: log the memory footprint, clean up the session
   // directory (status == 0) and, on masters, the query directory and lock;
   // on workers stop any running processing. Finally remove the input
   // handler and leave the event loop. A second call exits immediately with
   // status 1.

   if (fTerminated) {
      // Avoid double termination
      exit(1);
   }
   fTerminated = kTRUE;

   Info("Terminate", "starting session termination operations ...");

   ProcInfo_t pi;
   if (!gSystem->GetProcInfo(&pi)){
      Info("Terminate", "process memory footprint: %ld kB virtual, %ld kB resident ",
                        pi.fMemVirtual, pi.fMemResident);
      if (fVirtMemHWM > 0 || fVirtMemMax > 0) {
         Info("Terminate", "process virtual memory limits: %ld kB HWM, %ld kB max ",
                           fVirtMemHWM, fVirtMemMax);
      }
   }

   if (fProof)
      fProof->SetMonitor(0, kFALSE);

   // Remove the session directory. Skip it if the directory is not known yet
   // (e.g. Setup() failed early): otherwise "/.delete" would be created and
   // the remove command would run without a target.
   if (status == 0 && !fSessionDir.IsNull()) {
      gSystem->ChangeDirectory("/");
      gSystem->MakeDirectory(fSessionDir+"/.delete");
      gSystem->Exec(Form("%s %s", kRM, fSessionDir.Data()));
   }

   if (IsMaster()) {
      // Clean up the query area only if no queries are left
      if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
         if (!fQueryDir.IsNull()) {
            gSystem->ChangeDirectory("/");
            gSystem->MakeDirectory(fQueryDir+"/.delete");
            gSystem->Exec(Form("%s %s", kRM, fQueryDir.Data()));
         }
         if (fQueryLock)
            gSystem->Unlink(fQueryLock->GetName());
      }
      if (fQueryLock)
         fQueryLock->Unlock();
   } else {
      Bool_t abort = (status == 0) ? kFALSE : kTRUE;
      if (!fIdle && fPlayer)
         fPlayer->StopProcess(abort,1);
      gSystem->Sleep(2000);
   }

   gSystem->RemoveFileHandler(fInputHandler);

   gSystem->ExitLoop();

   // Wake up whoever is waiting on the socket pipe
   TXSocket::fgPipe.Post((TXSocket *)fSocket);

   Printf("Terminate: termination operations ended: quitting!");
}
Int_t TXProofServ::LockSession(const char *sessiontag, TProofLockPath **lck)
{
   // Lock the query area of session 'sessiontag' ("session-...-...-..." with
   // an optional ":q..." suffix). Nothing to do (returns 0) for our own
   // session. Fails (-1) if the parent session is still running or the lock
   // cannot be taken. On success *lck holds the new lock, or 0 if the other
   // session has no query lock file.

   if (!sessiontag) {
      Info("LockSession","session tag undefined");
      return -1;
   }

   // We do not need to lock our own session
   if (strstr(sessiontag, fTopSessionTag))
      return 0;

   if (!lck) {
      Info("LockSession","locker space undefined");
      return -1;
   }
   *lck = 0;

   TString stag = sessiontag;
   TRegexp re("session-.*-.*-.*");
   Int_t i1 = stag.Index(re);
   if (i1 == kNPOS) {
      Info("LockSession","bad format: %s", sessiontag);
      return -1;
   }
   stag.ReplaceAll("session-","");

   Int_t i2 = stag.Index(":q");
   if (i2 != kNPOS)
      stag.Remove(i2);

   // Derive the parent log path from our session dir: an unexpected format
   // used to truncate the path at a bogus position
   TString parlog = fSessionDir;
   Int_t im = parlog.Index("master-");
   if (im == kNPOS) {
      Info("LockSession","unexpected session dir format: %s", fSessionDir.Data());
      return -1;
   }
   parlog.Remove(im + (Ssiz_t) strlen("master-"));
   parlog += stag;
   if (!gSystem->AccessPathName(parlog)) {
      Info("LockSession","parent still running: do nothing");
      return -1;
   }

   // The lock path of the other session is derived from ours
   if (!fQueryLock) {
      Info("LockSession","query lock undefined");
      return -1;
   }
   TString qlock = fQueryLock->GetName();
   qlock.ReplaceAll(fTopSessionTag, stag);

   if (!gSystem->AccessPathName(qlock)) {
      *lck = new TProofLockPath(qlock);
      if (((*lck)->Lock()) < 0) {
         Info("LockSession","problems locking query lock file");
         SafeDelete(*lck);
         return -1;
      }
   }

   return 0;
}
void TXProofServ::ReleaseWorker(const char *ord)
{
Info("ReleaseWorker","releasing: %s", ord);
((TXSocket *)fSocket)->SendCoordinator(kReleaseWorker, ord);
}