#include "RConfigure.h"
#include "RConfig.h"
#include "Riostream.h"
#ifdef WIN32
#include <io.h>
typedef long off_t;
#endif
#include <sys/types.h>
#include <netinet/in.h>
#include "TXProofServ.h"
#include "TObjString.h"
#include "TEnv.h"
#include "TError.h"
#include "TException.h"
#include "THashList.h"
#include "TInterpreter.h"
#include "TProofDebug.h"
#include "TProof.h"
#include "TProofPlayer.h"
#include "TProofQueryResult.h"
#include "TRegexp.h"
#include "TClass.h"
#include "TROOT.h"
#include "TSystem.h"
#include "TPluginManager.h"
#include "TXSocketHandler.h"
#include "TXUnixSocket.h"
#include "compiledata.h"
#include "TProofResourcesStatic.h"
#include "TProofNodeInfo.h"
#include "XProofProtocol.h"
#include <XrdClient/XrdClientConst.hh>
#include <XrdClient/XrdClientEnv.hh>
// Flag polled by the Proof.GdbHook busy-wait loops in TXProofServ::CreateServer():
// the server spins there for as long as this stays non-zero. Presumably it is
// meant to be cleared from an attached debugger to let the server proceed.
static volatile Int_t gProofServDebug = 1;
// Signal handler registered for kSigUrgent; its Notify() forwards the event
// to the TXProofServ instance given at construction.
class TXProofServInterruptHandler : public TSignalHandler {
private:
   TXProofServ *fServ;   // server to be notified (never deleted by this class)
public:
   TXProofServInterruptHandler(TXProofServ *s)
      : TSignalHandler(kSigUrgent, kFALSE), fServ(s) { }
   Bool_t Notify();
};
Bool_t TXProofServInterruptHandler::Notify()
{
   // Let the server process the urgent data first; then, only if ROOT is
   // initialized, call Throw() with the number of the handled signal.
   // Always returns kTRUE.

   fServ->HandleUrgentData();

   if (!TROOT::Initialized())
      return kTRUE;

   Throw(GetSignal());
   return kTRUE;
}
// Signal handler registered for kSigPipe; its Notify() forwards the event
// to the TXProofServ instance given at construction.
class TXProofServSigPipeHandler : public TSignalHandler {
private:
   TXProofServ *fServ;   // server to be notified (never deleted by this class)
public:
   TXProofServSigPipeHandler(TXProofServ *s)
      : TSignalHandler(kSigPipe, kFALSE), fServ(s) { }
   Bool_t Notify();
};
Bool_t TXProofServSigPipeHandler::Notify()
{
// Forward the broken-pipe signal to the server by calling
// TXProofServ::HandleSigPipe(). Always returns kTRUE.
fServ->HandleSigPipe();
return kTRUE;
}
// Signal handler registered for kSigTermination; its Notify() forwards the
// event to the TXProofServ instance given at construction.
class TXProofServTerminationHandler : public TSignalHandler {
private:
   TXProofServ *fServ;   // server to be notified (never deleted by this class)
public:
   TXProofServTerminationHandler(TXProofServ *s)
      : TSignalHandler(kSigTermination, kFALSE), fServ(s) { }
   Bool_t Notify();
};
Bool_t TXProofServTerminationHandler::Notify()
{
// Print a wake-up notice and start the server termination sequence via
// TXProofServ::HandleTermination(). Always returns kTRUE.
Printf("TXProofServTerminationHandler::Notify: wake up!");
fServ->HandleTermination();
return kTRUE;
}
// Signal handler registered for kSigSegmentationViolation; its Notify()
// forwards the event to the TXProofServ instance given at construction.
class TXProofServSegViolationHandler : public TSignalHandler {
private:
   TXProofServ *fServ;   // server to be notified (never deleted by this class)
public:
   TXProofServSegViolationHandler(TXProofServ *s)
      : TSignalHandler(kSigSegmentationViolation, kFALSE), fServ(s) { }
   Bool_t Notify();
};
Bool_t TXProofServSegViolationHandler::Notify()
{
// Print a banner announcing the segmentation violation and then run the same
// termination sequence used for a termination signal
// (TXProofServ::HandleTermination()). Always returns kTRUE.
Printf("**** ");
Printf("**** Segmentation violation: terminating ****");
Printf("**** ");
fServ->HandleTermination();
return kTRUE;
}
// File handler attached to the descriptor 'fd' (created with mask 1); both
// Notify() and ReadNotify() hand the event over to the TXProofServ instance
// given at construction.
class TXProofServInputHandler : public TFileHandler {
private:
   TXProofServ *fServ;   // server to be notified (never deleted by this class)
public:
   TXProofServInputHandler(TXProofServ *s, Int_t fd)
      : TFileHandler(fd, 1), fServ(s) { }
   Bool_t Notify();
   Bool_t ReadNotify() { return Notify(); }
};
Bool_t TXProofServInputHandler::Notify()
{
   // Have the server process the pending socket input and, once done, remove
   // the client ID from the server socket. Always returns kTRUE.

   fServ->HandleSocketInput();

   // The socket is looked up only after the input has been handled
   TXUnixSocket *xsock = (TXUnixSocket *) fServ->GetSocket();
   xsock->RemoveClientID();

   return kTRUE;
}
// ROOT ClassImp macro invocation for TXProofServ (presumably registers the
// class implementation with the ROOT type system -- confirm in Rtypes.h).
ClassImp(TXProofServ)
// Factory with C linkage: builds a TXProofServ from the given arguments and
// log file and returns it through the TApplication base pointer.
// The caller receives the heap-allocated object.
extern "C" {
   TApplication *GetTXProofServ(Int_t *argc, char **argv, FILE *flog)
   {
      TXProofServ *xps = new TXProofServ(argc, argv, flog);
      return xps;
   }
}
TXProofServ::TXProofServ(Int_t *argc, char **argv, FILE *flog)
            : TProofServ(argc, argv, flog)
{
   // Constructor: forwards the arguments to TProofServ and resets the local
   // state. The handlers are created later, in CreateServer().

   // Not terminated yet
   fTerminated = kFALSE;

   // No handlers installed so far
   fInputHandler = 0;
   fInterruptHandler = 0;

   // Mutex guarding the operations on the shutdown timer
   // (see SetShutdownTimer())
   fShutdownTimerMtx = new TMutex(kTRUE);
}
Int_t TXProofServ::CreateServer()
{
   // Finalize the server setup: open the connection to the XrdProofd
   // coordinator, install the interrupt and input handlers, run Setup(),
   // preload the interpreter and, on a master, start the TProof instance
   // through the plugin manager.
   // When the 4th command-line argument is "test" the function only writes
   // the protocol number to the descriptor given in ROOTOPENSOCK and exits
   // the process.
   // Returns 0 on success, -1 on failure.

   Bool_t xtest = (Argc() > 3 && !strcmp(Argv(3), "test")) ? kTRUE : kFALSE;

   if (gProofDebugLevel > 0)
      Info("CreateServer", "starting%s server creation", (xtest ? " test" : ""));

   // Get the descriptor of the log file, if already open
   if (fLogFile) {
      if ((fLogFileDes = fileno(fLogFile)) < 0) {
         Error("CreateServer", "resolving the log file description number");
         return -1;
      }
   }

   // Location string used by TXSocket
   TXSocket::fgLoc = (IsMaster()) ? "master" : "slave" ;

   // Propagate the XNet.Debug setting to the client environment
   EnvPutInt(NAME_DEBUG, gEnv->GetValue("XNet.Debug", 0));

   if (xtest) {
      // Test mode: send the protocol number over the descriptor passed in
      // the environment and exit.
      // The raw Getenv() result is checked here: testing the TString after
      // the assignment can never detect a missing variable, because the
      // string always converts to a non-null character pointer.
      const char *opensock = gSystem->Getenv("ROOTOPENSOCK");
      if (!opensock || opensock[0] == '\0') {
         Error("CreateServer", "Socket setup by xpd undefined");
         return -1;
      }
      Int_t fpw = (Int_t) strtol(opensock, 0, 10);
      int proto = htonl(kPROOF_Protocol);
      fSockPath = "";
      if (write(fpw, &proto, sizeof(proto)) != sizeof(proto)) {
         Error("CreateServer", "test: sending protocol number");
         return -1;
      }
      exit(0);
   } else {
      // Path of the socket to be opened, optionally prefixed by the entity
      fSockPath = gEnv->GetValue("ProofServ.OpenSock", "");
      if (fSockPath.Length() <= 0) {
         Error("CreateServer", "Socket setup by xpd undefined");
         return -1;
      }
      TString entity = gEnv->GetValue("ProofServ.Entity", "");
      if (entity.Length() > 0)
         fSockPath.Insert(0,Form("%s/", entity.Data()));
   }

   // Session ID: mandatory
   Int_t psid = gEnv->GetValue("ProofServ.SessionID", -1);
   if (psid < 0) {
      Error("CreateServer", "Session ID undefined");
      return -1;
   }

   // Open the connection to the coordinator
   fSocket = new TXUnixSocket(fSockPath, psid, -1, this);
   if (!fSocket || !(fSocket->IsValid())) {
      Error("CreateServer", "Failed to open connection to XrdProofd coordinator");
      return -1;
   }

   // This object is the reference of the socket
   ((TXSocket *)fSocket)->fReference = this;

   // Descriptor to be monitored for input
   Int_t sock = fSocket->GetDescriptor();

   // Install the interrupt handler
   fInterruptHandler = new TXProofServInterruptHandler(this);
   gSystem->AddSignalHandler(fInterruptHandler);

   // Install the input handler
   fInputHandler =
      TXSocketHandler::GetSocketHandler(new TXProofServInputHandler(this, sock), fSocket);
   gSystem->AddFileHandler(fInputHandler);

   // Client ID: mandatory
   Int_t cid = gEnv->GetValue("ProofServ.ClientID", -1);
   if (cid < 0) {
      Error("CreateServer", "Client ID undefined");
      SendLogFile();
      return -1;
   }
   ((TXSocket *)fSocket)->SetClientID(cid);

   // Debug hook: spin for as long as gProofServDebug is non-zero when
   // Proof.GdbHook is 1 on a master or 2 on a non-master
   const Int_t gdbhook = IsMaster() ? 1 : 2;
   if (gEnv->GetValue("Proof.GdbHook",0) == gdbhook) {
      while (gProofServDebug)
         ;
   }

   if (gProofDebugLevel > 0)
      Info("CreateServer", "Service: %s, ConfDir: %s, IsMaster: %d",
           fService.Data(), fConfDir.Data(), (Int_t)fMasterServ);

   if (Setup() == -1) {
      // Setup failure
      Terminate(0);
      SendLogFile();
      return -1;
   }

   // If no log file was given, redirect the output and resolve its descriptor
   if (!fLogFile) {
      RedirectOutput();
      if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
         Terminate(0);
         SendLogFile(-98);
         return -1;
      }
   }

   // Message of the day, master only
   if (IsMaster()) {
      if (CatMotd() == -1) {
         Terminate(0);
         SendLogFile(-99);
         return -1;
      }
   }

   // Preload a few headers and definitions in the interpreter
   ProcessLine("#include <iostream>", kTRUE);
   ProcessLine("#include <_string>",kTRUE);
   ProcessLine("#include <RtypesCint.h>", kTRUE);
   ProcessLine("#define ROOT_Rtypes 0", kTRUE);
   ProcessLine("#define ROOT_TError 0", kTRUE);
   ProcessLine("#define ROOT_TGenericClassInfo 0", kTRUE);

   // Load the macro named by Proof.Load, if it is found in the macro path
   const char *logon;
   logon = gEnv->GetValue("Proof.Load", (char *)0);
   if (logon) {
      char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
      if (mac)
         ProcessLine(Form(".L %s", logon), kTRUE);
      delete [] mac;
   }

   // Execute the macro named by Proof.Logon, unless disabled
   logon = gEnv->GetValue("Proof.Logon", (char *)0);
   if (logon && !NoLogOpt()) {
      char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
      if (mac)
         ProcessFile(logon);
      delete [] mac;
   }

   // Save the current interpreter context
   gInterpreter->SaveContext();
   gInterpreter->SaveGlobalsContext();

   // On a master, start the PROOF session driving the workers
   if (IsMaster()) {
      TString master = Form("proof://%s@__master__", fUser.Data());

      // Add the port, if defined
      Int_t port = gEnv->GetValue("ProofServ.XpdPort", -1);
      if (port > -1) {
         master += ":";
         master += port;
      }

      // Parallel startup is switched off for this session
      gEnv->SetValue("Proof.ParallelStartup", 0);

      // Get the plugin manager
      TPluginManager *pm = gROOT->GetPluginManager();
      if (!pm) {
         Error("CreateServer", "no plugin manager found");
         SendLogFile(-99);
         Terminate(0);
         return -1;
      }

      // Find the TProof handler matching the configuration file
      TPluginHandler *h = pm->FindHandler("TProof", fConfFile);
      if (!h) {
         Error("CreateServer", "no plugin found for TProof with a"
                               " config file of '%s'", fConfFile.Data());
         SendLogFile(-99);
         Terminate(0);
         return -1;
      }

      // Load the plugin
      if (h->LoadPlugin() == -1) {
         Error("CreateServer", "plugin for TProof could not be loaded");
         SendLogFile(-99);
         Terminate(0);
         return -1;
      }

      // Create the TProof instance
      fProof = reinterpret_cast<TProof*>(h->ExecPlugin(5, master.Data(),
                                                          fConfFile.Data(),
                                                          fConfDir.Data(),
                                                          fLogLevel,
                                                          fSessionTag.Data()));
      if (!fProof || !fProof->IsValid()) {
         Error("CreateServer", "plugin for TProof could not be executed");
         delete fProof;
         fProof = 0;
         SendLogFile(-99);
         Terminate(0);
         return -1;
      }

      // Record whether this is an end master and save the worker info
      fEndMaster = fProof->IsEndMaster();
      fProof->SaveWorkerInfo();

      SendLogFile();
   }

   // Done
   return 0;
}
TXProofServ::~TXProofServ()
{
// Destructor: deletes the socket created in CreateServer().
// NOTE(review): fShutdownTimerMtx, allocated in the constructor, is not
// released here -- confirm whether the base class takes care of it.
delete fSocket;
}
void TXProofServ::HandleUrgentData()
{
   // Handle an urgent request: read the interrupt level from the socket and
   // act on it (ping, hard, soft or shutdown interrupt).
   // The interrupt handler is installed in CreateServer() before the TProof
   // instance is created, so every access to fProof is guarded here.

   // Log messages issued while handling the request go through this guard
   TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);

   // Get the interrupt level
   Int_t iLev = ((TXSocket *)fSocket)->GetInterrupt();
   if (iLev < 0) {
      Error("HandleUrgentData", "error receiving interrupt");
      return;
   }

   PDB(kGlobal, 5)
      Info("HandleUrgentData", "got interrupt: %d\n", iLev);

   if (fProof)
      fProof->SetActive();

   switch (iLev) {

      case TProof::kPing:
         PDB(kGlobal, 5)
            Info("HandleUrgentData", "*** Ping");

         // On a master, ping the active slaves and report the silent ones
         if (IsMaster() && fProof) {
            Int_t nbad = fProof->fActiveSlaves->GetSize()-fProof->Ping();
            if (nbad > 0) {
               Info("HandleUrgentData","%d slaves did not reply to ping",nbad);
            }
         }

         // Reply to the ping
         ((TXSocket *)fSocket)->Ping();

         // Send the log with the result of the ping
         if (IsMaster())
            SendLogFile();

         break;

      case TProof::kHardInterrupt:
         Info("HandleUrgentData", "*** Hard Interrupt");

         // On a master, propagate the interrupt
         if (IsMaster() && fProof)
            fProof->Interrupt(TProof::kHardInterrupt);

         // Flush the socket
         ((TXSocket *)fSocket)->Flush();

         if (IsMaster())
            SendLogFile();

         break;

      case TProof::kSoftInterrupt:
         Info("HandleUrgentData", "Soft Interrupt");

         // On a master, propagate the interrupt
         if (IsMaster() && fProof)
            fProof->Interrupt(TProof::kSoftInterrupt);

         Interrupt();

         if (IsMaster())
            SendLogFile();

         break;

      case TProof::kShutdownInterrupt:
         Info("HandleUrgentData", "Shutdown Interrupt");

         // Run the termination sequence
         HandleTermination();

         break;

      default:
         Error("HandleUrgentData", "unexpected type");
         break;
   }

   if (fProof) fProof->SetActive(kFALSE);
}
void TXProofServ::HandleSigPipe()
{
   // Called when the connection is broken (SIGPIPE): on a master close the
   // PROOF session, then terminate this server.

   // Log messages issued while handling the signal go through this guard
   TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);

   // The signal handler is installed in Setup(), which CreateServer() calls
   // before the TProof instance is created: fProof may still be undefined
   if (IsMaster() && fProof)
      fProof->Close("S");

   Terminate(0);
}
void TXProofServ::HandleTermination()
{
   // Termination sequence: on a master, if processing is under way, drop the
   // waiting queries and try to stop the current one within a timeout
   // (Proof.ShutdownTimeout, at least 20; default 60); then close the PROOF
   // session and terminate this server.

   if (IsMaster()) {

      // If not idle, try first to stop processing
      if (!fIdle) {
         // Remove the pending requests
         fWaitingQueries->Delete();

         // fProof is checked before closing the session below: apply the
         // same protection to the stop sequence
         if (fProof) {
            // Interrupt the current monitor
            fProof->InterruptCurrentMonitor();

            // Do not wait for ever, but allow for a reasonable time
            Long_t timeout = gEnv->GetValue("Proof.ShutdownTimeout", 60);
            timeout = (timeout > 20) ? timeout : 20;

            // Ask for processing to stop, using half of the timeout
            fProof->StopProcess(kTRUE, (Long_t) (timeout / 2));

            // Collect the replies of the active workers
            fProof->Collect(TProof::kActive, timeout);
         }

         // Still not idle
         if (!fIdle)
            Warning("HandleTermination","processing could not be stopped");
      }

      // Close the session
      if (fProof)
         fProof->Close("S");
   }

   Terminate(0);
}
Int_t TXProofServ::Setup()
{
   // Session setup: send the startup message, read the protocol version,
   // user, configuration file, sandbox, session tag and session directory
   // from the environment, run the common setup and install the SIGPIPE,
   // termination and segmentation-violation handlers.
   // Returns 0 on success, -1 on failure.

   // Startup message; bounded write so that a long host name cannot
   // overflow the buffer
   char str[512];
   if (IsMaster()) {
      snprintf(str, sizeof(str),
               "**** Welcome to the PROOF server @ %s ****", gSystem->HostName());
   } else {
      snprintf(str, sizeof(str),
               "**** PROOF worker server @ %s started ****", gSystem->HostName());
   }

   // The full string, including the terminating null, must have been sent
   if (fSocket->Send(str) != 1+static_cast<Int_t>(strlen(str))) {
      Error("Setup", "failed to send proof server startup message");
      return -1;
   }

   // Protocol version run by the client
   if ((fProtocol = gEnv->GetValue("ProofServ.ClientVersion", -1)) < 0) {
      Error("Setup", "remote proof protocol missing");
      return -1;
   }

   // The user name is taken from the entity (dropping anything from the
   // first ':' or '@' on); if the entity is not defined, fall back to the
   // system user information. The test must be strict, otherwise the
   // fallback can never be reached.
   fUser = gEnv->GetValue("ProofServ.Entity", "");
   if (fUser.Length() > 0) {
      if (fUser.Contains(":"))
         fUser.Remove(fUser.Index(":"));
      if (fUser.Contains("@"))
         fUser.Remove(fUser.Index("@"));
   } else {
      UserGroup_t *pw = gSystem->GetUserInfo();
      if (pw) {
         fUser = pw->fUser;
         delete pw;
      }
   }

   // Configuration file override, master only
   if (IsMaster()) {
      TString cf = gEnv->GetValue("ProofServ.ProofConfFile", "");
      if (cf.Length() > 0)
         fConfFile = cf;
   }

   // Working directory (sandbox)
   fWorkDir = gEnv->GetValue("ProofServ.Sandbox", kPROOF_WorkDir);

   // Session tag: mandatory
   if ((fSessionTag = gEnv->GetValue("ProofServ.SessionTag", "-1")) == "-1") {
      Error("Setup", "Session tag missing");
      return -1;
   }
   if (gProofDebugLevel > 0)
      Info("Setup", "session tag is %s", fSessionTag.Data());

   // Session directory: mandatory
   if ((fSessionDir = gEnv->GetValue("ProofServ.SessionDir", "-1")) == "-1") {
      Error("Setup", "Session dir missing");
      return -1;
   }

   // Expand the working directory path
   char *workdir = gSystem->ExpandPathName(fWorkDir.Data());
   fWorkDir = workdir;
   delete [] workdir;
   if (gProofDebugLevel > 0)
      Info("Setup", "working directory set to %s", fWorkDir.Data());

   // Setup shared with the base class
   if (SetupCommon() != 0) {
      Error("Setup", "common setup failed");
      return -1;
   }

   // Install the signal handlers
   gSystem->AddSignalHandler(new TXProofServSigPipeHandler(this));
   gSystem->AddSignalHandler(new TXProofServTerminationHandler(this));
   gSystem->AddSignalHandler(new TXProofServSegViolationHandler(this));

   // Done
   return 0;
}
void TXProofServ::SendLogFile(Int_t status, Int_t start, Int_t end)
{
   // Send the log file to the peer, followed by a kPROOF_LOGDONE message
   // carrying 'status' and the number of parallel workers (1 on a non-master).
   // If 'start' > -1 the range [start, end) is sent ('end' is clamped to the
   // current size) and the read position is restored afterwards; otherwise
   // what was written since the current read position is sent.
   // If the size or the position of the log cannot be determined nothing is
   // read from the log, but the kPROOF_LOGDONE message is sent all the same.

   // Make sure that everything written so far is accounted for
   fflush(stdout);

   // Total size and current read position; lseek returns -1 on failure
   off_t ltot = lseek(fileno(stdout), (off_t) 0, SEEK_END);
   off_t lnow = lseek(fLogFileDes, (off_t) 0, SEEK_CUR);

   // Number of bytes to send: stays negative if the offsets are unusable
   Int_t left = -1;
   Bool_t adhoc = kFALSE;

   if (ltot >= 0 && lnow >= 0) {
      if (start > -1) {
         // Ad hoc range requested
         lseek(fLogFileDes, (off_t) start, SEEK_SET);
         if (end <= start || end > ltot)
            end = ltot;
         left = (Int_t)(end - start);
         if (end < ltot)
            left++;
         adhoc = kTRUE;
      } else {
         left = (Int_t)(ltot - lnow);
      }
   }

   if (left > 0) {
      // Announce the size
      fSocket->Send(left, kPROOF_LOGFILE);

      const Int_t kMAXBUF = 32768;  //16384  //65536;
      char buf[kMAXBUF];
      Int_t wanted = (left > kMAXBUF) ? kMAXBUF : left;
      Int_t len;
      do {
         // Read a chunk, retrying if interrupted
         while ((len = read(fLogFileDes, buf, wanted)) < 0 &&
                TSystem::GetErrno() == EINTR)
            TSystem::ResetErrno();

         if (len < 0) {
            SysError("SendLogFile", "error reading log file");
            break;
         }

         // When sending up to the end of the log, a completely filled chunk
         // gets a newline as last character
         if (end == ltot && len == wanted)
            buf[len-1] = '\n';

         if (fSocket->SendRaw(buf, len, kDontBlock) < 0) {
            SysError("SendLogFile", "error sending log file");
            break;
         }

         // Update the counters
         left -= len;
         wanted = (left > kMAXBUF) ? kMAXBUF : left;

      } while (len > 0 && left > 0);
   }

   // Restore the read position (adhoc is set only if lnow is valid)
   if (adhoc)
      lseek(fLogFileDes, lnow, SEEK_SET);

   // Final message
   TMessage mess(kPROOF_LOGDONE);
   if (IsMaster())
      mess << status << (fProof ? fProof->GetParallel() : 0);
   else
      mess << status << (Int_t) 1;
   fSocket->Send(mess);
}
TProofServ::EQueryAction TXProofServ::GetWorkers(TList *workers,
                                                 Int_t & /* prioritychange */)
{
   // Fill 'workers' with one TProofNodeInfo per worker to be started.
   // If ProofServ.UseUserCfg is set, the base-class implementation is tried
   // first and its result returned when it succeeds; otherwise the list is
   // requested from the coordinator. The reply is a '&'-separated list of
   // node descriptions, the first one describing the master.
   // Returns kQueryStop if 'workers' is undefined, kQueryOK otherwise.

   // Needs a list where to store the info
   if (!workers) {
      Error("GetWorkers", "output list undefined");
      return kQueryStop;
   }

   // If the user configuration is enabled, try it first
   if (gEnv->GetValue("ProofServ.UseUserCfg", 0) != 0) {
      Int_t pc = 1;
      TProofServ::EQueryAction rc = TProofServ::GetWorkers(workers, pc);
      if (rc == kQueryOK)
         return rc;
   }

   // Send the request to the coordinator
   TObjString *os = ((TXSocket *)fSocket)->SendCoordinator(TXSocket::kGetWorkers);
   if (os) {
      TString fl(os->GetName());

      // The reply has been copied: release it.
      // NOTE(review): assumes that the object returned by SendCoordinator is
      // owned by the caller -- confirm against the TXSocket documentation.
      delete os;
      os = 0;

      TString tok;
      Ssiz_t from = 0;
      if (fl.Tokenize(tok, from, "&")) {
         if (!tok.IsNull()) {
            {
               // First token: the master; only its image is of interest
               TProofNodeInfo master(tok);
               if (fImage.IsNull() && strlen(master.GetImage()) > 0)
                  fImage = master.GetImage();
            }

            // Remaining tokens: the workers
            while (fl.Tokenize(tok, from, "&")) {
               if (!tok.IsNull())
                  workers->Add(new TProofNodeInfo(tok));
            }
         }
      }
   }

   // Done
   return kQueryOK;
}
Bool_t TXProofServ::HandleError(const void *)
{
   // Error handler: on a master close the PROOF session, then invalidate the
   // session ID on the socket and terminate this server.
   // Always returns kTRUE.

   Printf("TXProofServ::HandleError: %p: got called ...", this);

   // The PROOF session may not have been started yet: protect the access
   if (IsMaster() && fProof)
      fProof->Close("S");

   // Invalidate the session ID before terminating
   ((TXSocket *)fSocket)->SetSessionID(-1);

   Terminate(0);

   Printf("TXProofServ::HandleError: %p: DONE ... ", this);

   return kTRUE;
}
Bool_t TXProofServ::HandleInput(const void *in)
{
   // Dispatch an asynchronous input. 'in', when defined, points to a
   // XHandleIn_t whose first integer is the action code; an undefined 'in'
   // is treated as a generic message (kXPD_msg). Any action code not listed
   // below is handled as regular socket input. Always returns kTRUE.

   if (gDebug > 2)
      Printf("TXProofServ::HandleInput %p, in: %p", this, in);

   XHandleIn_t *req = (XHandleIn_t *) in;
   const Int_t action = (req) ? req->fInt1 : kXPD_msg;

   switch (action) {

      case kXPD_ping:
      case kXPD_interrupt:
         // Interrupt or ping
         HandleUrgentData();
         break;

      case kXPD_timer:
         // Start or stop the shutdown countdown
         fShutdownWhenIdle = (req->fInt2 == 2) ? kFALSE : kTRUE;
         if (req->fInt2 > 0)
            SetShutdownTimer(kTRUE, req->fInt3);
         else
            SetShutdownTimer(kFALSE);
         break;

      case kXPD_flush:
         // Flush stdout, so that the log is up to date
         Info("HandleInput","kXPD_flush: flushing log file (stdout)");
         fflush(stdout);
         break;

      case kXPD_urgent:
         {
            // Urgent message: the second integer tells the type
            Int_t type = req->fInt2;
            if (type == TXSocket::kStopProcess) {
               Bool_t abort = (req->fInt3 != 0) ? kTRUE : kFALSE;
               Int_t timeout = req->fInt4;
               // Stop via the PROOF session if there is one, else via the player
               if (fProof) {
                  fProof->StopProcess(abort, timeout);
               } else if (fPlayer) {
                  fPlayer->StopProcess(abort, timeout);
               }
            } else {
               Info("HandleInput","kXPD_urgent: unknown type: %d", type);
            }
         }
         break;

      case kXPD_inflate:
         // Values below 1000 leave the current factor untouched
         if (req->fInt2 >= 1000)
            fInflateFactor = req->fInt2;
         Info("HandleInput", "kXPD_inflate: inflate factor set to %f",
              (Float_t) fInflateFactor / 1000.);
         break;

      case kXPD_priority:
         // New group priority, broadcast through the session if there is one
         fGroupPriority = req->fInt2;
         if (fProof)
            fProof->BroadcastGroupPriority(fGroup, fGroupPriority);
         Info("HandleInput", "kXPD_priority: group %s priority set to %f",
              fGroup.Data(), (Float_t) fGroupPriority / 100.);
         break;

      default:
         // Standard socket input; the client ID is removed afterwards
         HandleSocketInput();
         ((TXSocket *)fSocket)->RemoveClientID();
         break;
   }

   // Done
   return kTRUE;
}
void TXProofServ::DisableTimeout()
{
if (fSocket)
((TXSocket *)fSocket)->DisableTimeout();
}
void TXProofServ::EnableTimeout()
{
if (fSocket)
((TXSocket *)fSocket)->EnableTimeout();
}
void TXProofServ::Terminate(Int_t status)
{
   // Terminate the session: detach the monitor, remove the session directory
   // (only if 'status' is 0) and, on a master with no queries, the query
   // directory and its lock file; then remove the handlers, leave the event
   // loop and notify the socket.
   // A second call exits the process immediately with code 1.

   // Avoid running the sequence twice
   if (fTerminated)
      exit(1);
   fTerminated = kTRUE;

   Info("Terminate", "starting session termination operations ...");

   // Deactivate the current monitor, if any
   if (fProof)
      fProof->SetMonitor(0, kFALSE);

   // Clean up the session directory
   if (status == 0) {
      // Move away from the directory being removed
      gSystem->ChangeDirectory("/");
      // Create the removal marker, then remove the directory
      TString sessionMark = fSessionDir + "/.delete";
      gSystem->MakeDirectory(sessionMark);
      gSystem->Exec(Form("%s %s", kRM, fSessionDir.Data()));
   }

   // Clean up the query area, master only
   if (IsMaster()) {
      if (fQueries->GetSize() == 0) {
         // No queries: the query directory can go as well
         gSystem->ChangeDirectory("/");
         TString queryMark = fQueryDir + "/.delete";
         gSystem->MakeDirectory(queryMark);
         gSystem->Exec(Form("%s %s", kRM, fQueryDir.Data()));
         // Remove the lock file
         if (fQueryLock)
            gSystem->Unlink(fQueryLock->GetName());
      }

      // Release the lock
      if (fQueryLock)
         fQueryLock->Unlock();
   }

   // Remove the input and interrupt handlers and leave the event loop
   gSystem->RemoveFileHandler(fInputHandler);
   gSystem->RemoveSignalHandler(fInterruptHandler);
   gSystem->ExitLoop();

   // Notify the socket and invalidate the session ID
   TXSocket *xs = (TXSocket *)fSocket;
   TXSocket::PostPipe(xs);
   xs->SetSessionID(-1);

   Printf("Terminate: termination operations ended: quitting!");
}
Int_t TXProofServ::LockSession(const char *sessiontag, TProofLockPath **lck)
{
   // Try to lock the query area of the session tagged 'sessiontag'.
   // Nothing is done (and 0 is returned) if the tag refers to the current
   // session. Otherwise the tag must match "session-.*-.*-.*"; if the query
   // lock file of that session exists, it is locked and the locker returned
   // in '*lck' (to be released and deleted by the caller).
   // Returns 0 on success; -1 on undefined or badly formatted input, if the
   // parent session looks still running or if locking fails.

   // Needs a tag to work with
   if (!sessiontag) {
      Info("LockSession","session tag undefined");
      return -1;
   }

   // We do nothing if the tag refers to this session
   if (strstr(sessiontag, fSessionTag))
      return 0;

   // We need a place where to store the locker
   if (!lck) {
      Info("LockSession","locker space undefined");
      return -1;
   }
   *lck = 0;

   // Check the format
   TString stag = sessiontag;
   TRegexp re("session-.*-.*-.*");
   Int_t i1 = stag.Index(re);
   if (i1 == kNPOS) {
      Info("LockSession","bad format: %s", sessiontag);
      return -1;
   }
   stag.ReplaceAll("session-","");

   // Drop the query number, if any
   Int_t i2 = stag.Index(":q");
   if (i2 != kNPOS)
      stag.Remove(i2);

   // Make sure that the parent process is gone: its path is obtained from
   // the session directory by replacing what follows "master-" with the tag.
   // If the session directory does not contain "master-" the path cannot be
   // built (the unchecked index used to truncate the path at an arbitrary
   // offset) and the check is skipped.
   TString parlog = fSessionDir;
   Ssiz_t imst = parlog.Index("master-");
   if (imst != kNPOS) {
      parlog.Remove(imst + strlen("master-"));
      parlog += stag;
      if (!gSystem->AccessPathName(parlog)) {
         Info("LockSession","parent still running: do nothing");
         return -1;
      }
   }

   // The path of the lock file is derived from the one of this session
   if (!fQueryLock) {
      Info("LockSession","query lock undefined");
      return -1;
   }
   TString qlock = fQueryLock->GetName();
   qlock.ReplaceAll(fSessionTag, stag);

   // Lock the query lock file, if it exists
   if (!gSystem->AccessPathName(qlock)) {
      *lck = new TProofLockPath(qlock);
      if (((*lck)->Lock()) < 0) {
         Info("LockSession","problems locking query lock file");
         SafeDelete(*lck);
         return -1;
      }
   }

   // Done
   return 0;
}
void TXProofServ::SetShutdownTimer(Bool_t on, Int_t delay)
{
   // Start ('on' true) or stop ('on' false) the shutdown countdown.
   // 'delay' is in seconds; values above 864000 (10 days) are considered
   // corrupted and replaced by 1, values <= 0 give a countdown of 10 ms.
   // A newly created timer is started at once only if the session is idle
   // or is not requested to shut down when idle; an existing timer is
   // simply restarted.

   // Actions on the timer must be serialized
   R__LOCKGUARD(fShutdownTimerMtx);

   // Nothing to do for a negative delay when no timer is defined
   if (delay < 0 && !fShutdownTimer)
      return;

   // Make sure that 'delay' makes sense; the message reports the value
   // which is actually applied
   if (delay > 864000) {
      Warning("SetShutdownTimer",
              "abnormous delay value (%d): corruption? setting to 1", delay);
      delay = 1;
   }

   // Countdown in milliseconds, with a minimum of 10
   Int_t del = (delay <= 0) ? 10 : delay * 1000;

   if (on) {
      if (!fShutdownTimer) {
         // First setup: create the timer
         fShutdownTimer = new TShutdownTimer(this, del);
         // Start the countdown right away if requested or if idle
         if (!fShutdownWhenIdle || fIdle)
            fShutdownTimer->Start(-1, kTRUE);
      } else {
         // Restart the existing timer
         fShutdownTimer->Start(-1, kTRUE);
      }
      Info("SetShutdownTimer",
              "session will be shutdown in %d seconds (%d millisec)", delay, del);
   } else {
      if (fShutdownTimer) {
         // Stop the countdown and remove the timer
         SafeDelete(fShutdownTimer);
         Info("SetShutdownTimer", "shutdown countdown timer stopped: resuming session");
      } else {
         Info("SetShutdownTimer", "shutdown countdown timer never started - do nothing");
      }
   }

   // Make the messages above visible
   FlushLogFile();
}
// Last update: Thu Jan 17 09:05:34 2008
// This page has been automatically generated. If you have any comments or suggestions about the page layout, send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.