#include <stdlib.h>
#include <fcntl.h>
#include <errno.h>
#ifdef WIN32
# include <io.h>
# include <sys/stat.h>
# include <sys/types.h>
#else
# include <unistd.h>
#endif
#include <vector>
#include "RConfigure.h"
#include "Riostream.h"
#include "Getline.h"
#include "TBrowser.h"
#include "TChain.h"
#include "TCondor.h"
#include "TDSet.h"
#include "TError.h"
#include "TEnv.h"
#include "TEventList.h"
#include "TFile.h"
#include "TFileInfo.h"
#include "TFTP.h"
#include "THashList.h"
#include "TInterpreter.h"
#include "TKey.h"
#include "TMap.h"
#include "TMessage.h"
#include "TMonitor.h"
#include "TMutex.h"
#include "TObjArray.h"
#include "TObjString.h"
#include "TParameter.h"
#include "TProof.h"
#include "TProofNodeInfo.h"
#include "TVirtualProofPlayer.h"
#include "TVirtualPacketizer.h"
#include "TProofServ.h"
#include "TPluginManager.h"
#include "TQueryResult.h"
#include "TRandom.h"
#include "TRegexp.h"
#include "TROOT.h"
#include "TSemaphore.h"
#include "TSlave.h"
#include "TSocket.h"
#include "TSortedList.h"
#include "TSystem.h"
#include "TThread.h"
#include "TTree.h"
#include "TUrl.h"
#include "TFileCollection.h"
#include "TDataSetManager.h"
// Pointer to the current PROOF session; both TProof constructors assign
// `this` to it and the destructor/Close() re-point it to another session.
TProof *gProof = 0;
// Global PROOF mutex pointer, initially null.
TVirtualMutex *gProofMutex = 0;
// Four-character table; presumably the frames of a rotating progress
// indicator -- confirm against TProofMergePrg.
char TProofMergePrg::fgCr[4] = {'-', '\\', '|', '/'};
// List searched by ParseConfigField() for the PROOF_*WRAPPERCMD entries.
TList *TProof::fgProofEnvList = 0;
// Plugin handler pointer, initially null (presumably for the log viewer -- confirm).
TPluginHandler *TProof::fgLogViewer = 0;
ClassImp(TProof)
Bool_t TProofInterruptHandler::Notify()
{
   // React to an interrupt signal for the associated PROOF session.
   // When stdin or stdout is not a terminal, or the remote protocol is
   // older than 22, processing is aborted without asking. Otherwise the
   // user is prompted and may stop, quit, or switch to asynchronous mode.
   // Always returns kTRUE.

   const Bool_t canPrompt = isatty(0) != 0 && isatty(1) != 0 &&
                            fProof->GetRemoteProtocol() >= 22;
   if (!canPrompt) {
      // No way to ask the user: abort remote processing
      fProof->StopProcess(kTRUE);
      return kTRUE;
   }

   // Pick the prompt according to the remote protocol
   char *answer = 0;
   if (fProof->GetRemoteProtocol() >= 22) {
      answer = Getline("\nEnter A/a to switch asynchronous, S/s to stop, Q/q to quit,"
                       " any other key to continue: ");
   } else {
      answer = Getline("\nSwith to asynchronous mode not supported remotely:"
                       "\nEnter S/s to stop, Q/q to quit, any other key to continue: ");
   }

   switch (answer[0]) {
      case 'Q':
      case 'q':
      case 'S':
      case 's': {
         Info("Notify","Processing interrupt signal ... %c", answer[0]);
         // Q/q aborts, S/s stops gracefully
         Bool_t abort = kFALSE;
         if (answer[0] == 'Q' || answer[0] == 'q')
            abort = kTRUE;
         fProof->StopProcess(abort);
         break;
      }
      case 'A':
      case 'a':
         if (fProof->GetRemoteProtocol() >= 22)
            fProof->GoAsynchronous();
         break;
      default:
         // Any other key: continue processing
         break;
   }
   return kTRUE;
}
// Build an input handler on the descriptor of socket `s`, remembering the
// socket and the owning TProof `p` so Notify() can dispatch to it.
// `s` must be non-null: its descriptor is read in the initializer list.
TProofInputHandler::TProofInputHandler(TProof *p, TSocket *s)
: TFileHandler(s->GetDescriptor(),1),
fSocket(s), fProof(p)
{
}
// Called when input is available on the handled descriptor: asks the owning
// TProof to collect input from the stored socket. Always returns kTRUE.
Bool_t TProofInputHandler::Notify()
{
fProof->CollectInputFrom(fSocket);
return kTRUE;
}
// ROOT class-implementation macro for TSlaveInfo.
ClassImp(TSlaveInfo)
Int_t TSlaveInfo::Compare(const TObject *obj) const
{
   // Compare this slave info with 'obj'.
   // A null 'obj' yields 1. If 'obj' is not a TSlaveInfo the ordinal string
   // is compared with the object's name. Otherwise the dot-separated
   // ordinals are compared field by field as integers: a smaller field on
   // this side yields 1, a larger one -1. When all common fields match,
   // the side with extra fields left yields -1 (this) or 1 (other);
   // identical ordinals yield 0.

   if (obj == 0)
      return 1;

   const TSlaveInfo *other = dynamic_cast<const TSlaveInfo*>(obj);
   if (other == 0)
      return fOrdinal.CompareTo(obj->GetName());

   const char *mine = GetOrdinal();
   const char *theirs = other->GetOrdinal();
   for (; mine && theirs; ) {
      const Int_t lhs = atoi(mine);
      const Int_t rhs = atoi(theirs);
      if (lhs != rhs)
         return (lhs < rhs) ? 1 : -1;

      // Advance both cursors past the next '.' (or to null if none is left)
      const char *dot = strchr(mine, '.');
      mine = dot ? dot + 1 : 0;
      dot = strchr(theirs, '.');
      theirs = dot ? dot + 1 : 0;
   }

   if (mine)
      return -1;
   return theirs ? 1 : 0;
}
void TSlaveInfo::Print(Option_t *opt) const
{
   // Print one line describing this slave on standard output.
   // 'opt' may be "active", "notactive" or "bad": in that case nothing is
   // printed unless the slave has the corresponding status. A null 'opt'
   // is treated as the empty string.

   const char *filter = opt ? opt : "";
   const Bool_t skip =
      (strcmp(filter, "active") == 0 && fStatus != kActive) ||
      (strcmp(filter, "notactive") == 0 && fStatus != kNotActive) ||
      (strcmp(filter, "bad") == 0 && fStatus != kBad);
   if (skip)
      return;

   // Human readable status label
   TString label;
   if (fStatus == kActive)
      label = "active";
   else if (fStatus == kBad)
      label = "bad";
   else
      label = "not active";

   // Mass storage domain, or a placeholder when not set
   TString domain;
   if (fMsd.IsNull())
      domain = "<null>";
   else
      domain = fMsd.Data();

   cout << "Slave: " << fOrdinal;
   cout << " hostname: " << fHostName;
   cout << " msd: " << domain;
   cout << " perf index: " << fPerfIndex;
   cout << " " << label;
   cout << endl;
}
static char *CollapseSlashesInPath(const char *path)
{
   // Return a newly allocated copy of 'path' in which every run of
   // consecutive '/' is collapsed into a single '/' and a trailing '/' is
   // dropped (so "/" alone becomes the empty string).
   // Returns 0 when 'path' is null. The caller owns the returned buffer and
   // must release it with delete [].

   if (!path)
      return 0;

   char *newPath = new char [strlen(path) + 1];

   // An empty input has no second character to look at: scanning from
   // index 1 would read past the terminator and write past the 1-byte buffer.
   if (!path[0]) {
      newPath[0] = 0;
      return newPath;
   }

   size_t j = 0;          // index of the last character written
   newPath[0] = path[0];
   for (size_t i = 1; path[i]; i++) {
      // Copy unless this would put a '/' right after a '/'
      if (path[i] != '/' || newPath[j] != '/')
         newPath[++j] = path[i];
   }
   // Keep the last character unless it is a '/', which gets overwritten
   // by the terminator.
   if (newPath[j] != '/')
      j++;
   newPath[j] = 0;
   return newPath;
}
// ROOT class-implementation macro for TProof.
ClassImp(TProof)
// Class-wide semaphore pointer, initially null; Init() deletes it if set.
TSemaphore *TProof::fgSemaphore = 0;
// Create a PROOF session object for the master described by 'masterurl'.
//  masterurl  URL of the master. If null or empty, protocol "proof" and host
//             "__master__" are used; if it lacks "://", the protocol is
//             forced to "proof".
//  conffile, confdir, loglevel, alias
//             passed unchanged to Init().
//  mgr        manager stored in fManager; when non-null it is moved to the
//             end of gROOT's list of sockets.
// On return gProof points to this object.
TProof::TProof(const char *masterurl, const char *conffile, const char *confdir,
Int_t loglevel, const char *alias, TProofMgr *mgr)
: fUrl(masterurl)
{
// Defaults that may be refined below from the URL
fCloseMutex = 0;
fManager = mgr;
fServType = TProofMgr::kXProofd;
fQueryMode = kSync;
ResetBit(TProof::kIsClient);
ResetBit(TProof::kIsMaster);
// Fill in missing protocol / host
if (!masterurl || strlen(masterurl) <= 0) {
fUrl.SetProtocol("proof");
fUrl.SetHost("__master__");
} else if (!(strstr(masterurl, "://"))) {
fUrl.SetProtocol("proof");
}
// If the port equals what TUrl reports for a blank URL, replace it with
// the port TUrl reports for the "proof" protocol
if (fUrl.GetPort() == TUrl(" ").GetPort())
fUrl.SetPort(TUrl("proof:// ").GetPort());
// No user in the URL: use the one reported by the system
if (strlen(fUrl.GetUser()) <= 0) {
UserGroup_t *pw = gSystem->GetUserInfo();
if (pw) {
fUrl.SetUser(pw->fUser);
delete pw;
}
}
// Resolve the master host name ("__master__" is kept verbatim; an empty
// host falls back to the local host)
if (!strcmp(fUrl.GetHost(), "__master__"))
fMaster = fUrl.GetHost();
else if (!strlen(fUrl.GetHost()))
fMaster = gSystem->GetHostByName(gSystem->HostName()).GetHostName();
else
fMaster = gSystem->GetHostByName(fUrl.GetHost()).GetHostName();
// URL options starting with "std" or "lite" select the server type; the
// prefix is stripped from the options stored in fUrl
if (strlen(fUrl.GetOptions()) > 0) {
TString opts(fUrl.GetOptions());
if (!(strncmp(fUrl.GetOptions(),"std",3))) {
fServType = TProofMgr::kProofd;
opts.Remove(0,3);
fUrl.SetOptions(opts.Data());
} else if (!(strncmp(fUrl.GetOptions(),"lite",4))) {
fServType = TProofMgr::kProofLite;
opts.Remove(0,4);
fUrl.SetOptions(opts.Data());
}
}
// Client by default; "__master__" means master only, "prooflite" means
// both client and master bits are set
fMasterServ = kFALSE;
SetBit(TProof::kIsClient);
ResetBit(TProof::kIsMaster);
if (fMaster == "__master__") {
fMasterServ = kTRUE;
ResetBit(TProof::kIsClient);
SetBit(TProof::kIsMaster);
} else if (fMaster == "prooflite") {
fMasterServ = kTRUE;
SetBit(TProof::kIsMaster);
}
// Initialize the session; the return value is not checked here
Init(masterurl, conffile, confdir, loglevel, alias);
// Remove-then-add leaves the manager as the last entry of the sockets list
if (mgr) {
R__LOCKGUARD2(gROOTMutex);
gROOT->GetListOfSockets()->Remove(mgr);
gROOT->GetListOfSockets()->Add(mgr);
}
// Register in the global list of PROOF sessions (proofd or master only)
if (IsProofd() || TestBit(TProof::kIsMaster))
if (!gROOT->GetListOfProofs()->FindObject(this))
gROOT->GetListOfProofs()->Add(this);
gProof = this;
}
// Default constructor: sets the members listed below to null / zero /
// default values without calling Init(), adds the object to gROOT's list of
// PROOF sessions if not already there, and makes gProof point to it.
// NOTE(review): list members such as fChains and fSlaves stay null here, so
// code that dereferences them unconditionally is unsafe on such an object.
TProof::TProof() : fUrl(""), fServType(TProofMgr::kXProofd)
{
fValid = kFALSE;
fRecvMessages = 0;
fSlaveInfo = 0;
fMasterServ = kFALSE;
fSendGroupView = kFALSE;
fActiveSlaves = 0;
fInactiveSlaves = 0;
fUniqueSlaves = 0;
fAllUniqueSlaves = 0;
fNonUniqueMasters = 0;
fActiveMonitor = 0;
fUniqueMonitor = 0;
fAllUniqueMonitor = 0;
fCurrentMonitor = 0;
fBytesRead = 0;
fRealTime = 0;
fCpuTime = 0;
fIntHandler = 0;
fProgressDialog = 0;
fProgressDialogStarted = kFALSE;
fPlayer = 0;
fFeedback = 0;
fChains = 0;
fDSet = 0;
fNotIdle = 0;
fSync = kTRUE;
fRunStatus = kRunning;
fIsWaiting = kFALSE;
fRedirLog = kFALSE;
fLogFileW = 0;
fLogFileR = 0;
fLogToWindowOnly = kFALSE;
fWaitingSlaves = 0;
fQueries = 0;
fOtherQueries = 0;
fDrawQueries = 0;
fMaxDrawQueries = 1;
fSeqNum = 0;
fSessionID = -1;
fEndMaster = kFALSE;
fGlobalPackageDirList = 0;
fPackageLock = 0;
fEnabledPackagesOnClient = 0;
fInputData = 0;
fPrintProgress = 0;
fLoadedMacros = 0;
fProtocol = -1;
fSlaves = 0;
fBadSlaves = 0;
fAllMonitor = 0;
fDataReady = kFALSE;
fBytesReady = 0;
fTotalBytes = 0;
fAvailablePackages = 0;
fEnabledPackages = 0;
fRunningDSets = 0;
fCollectTimeout = -1;
fManager = 0;
fQueryMode = kSync;
fDynamicStartup = kFALSE;
fCloseMutex = 0;
// Register globally and become the current session
if (!gROOT->GetListOfProofs()->FindObject(this))
gROOT->GetListOfProofs()->Add(this);
gProof = this;
}
TProof::~TProof()
{
   // Destructor: detach registered chains, remove package symlinks on the
   // client, close the session, release all owned helpers, clean up the
   // client log files, unregister from gROOT and from the manager, and
   // re-point gProof to another session if it was pointing to this one.

   // Detach all chains from this session. fChains is null when the object
   // was built with the default constructor, so it must be checked first.
   if (fChains) {
      while (TChain *chain = dynamic_cast<TChain*> (fChains->First()) ) {
         // Make sure the chain no longer refers to this session
         chain->SetProof(0);
         RemoveChain(chain);
      }
   }

   // On the client, remove the symbolic links of the enabled packages
   if (TestBit(TProof::kIsClient)) {
      TIter nextpackage(fEnabledPackagesOnClient);
      while (TObjString *package = dynamic_cast<TObjString*>(nextpackage())) {
         FileStat_t stat;
         gSystem->GetPathInfo(package->String(), stat);
         // Only unlink entries that are links
         if (stat.fIsLink)
            gSystem->Unlink(package->String());
      }
   }

   Close();
   SafeDelete(fIntHandler);
   SafeDelete(fSlaves);
   SafeDelete(fActiveSlaves);
   SafeDelete(fInactiveSlaves);
   SafeDelete(fUniqueSlaves);
   SafeDelete(fAllUniqueSlaves);
   SafeDelete(fNonUniqueMasters);
   SafeDelete(fBadSlaves);
   SafeDelete(fAllMonitor);
   SafeDelete(fActiveMonitor);
   SafeDelete(fUniqueMonitor);
   SafeDelete(fAllUniqueMonitor);
   SafeDelete(fSlaveInfo);
   SafeDelete(fChains);
   SafeDelete(fPlayer);
   SafeDelete(fFeedback);
   SafeDelete(fWaitingSlaves);
   SafeDelete(fAvailablePackages);
   SafeDelete(fEnabledPackages);
   SafeDelete(fEnabledPackagesOnClient);
   SafeDelete(fLoadedMacros);
   SafeDelete(fPackageLock);
   SafeDelete(fGlobalPackageDirList);
   SafeDelete(fRecvMessages);
   SafeDelete(fInputData);
   SafeDelete(fRunningDSets);
   SafeDelete(fCloseMutex);

   // Close and remove the temporary log file on the client
   if (TestBit(TProof::kIsClient)) {
      if (fLogFileR)
         fclose(fLogFileR);
      if (fLogFileW)
         fclose(fLogFileW);
      if (fLogFileName.Length() > 0)
         gSystem->Unlink(fLogFileName);
   }

   // Unregister from the global list and from the manager
   gROOT->GetListOfProofs()->Remove(this);
   if (fManager && fManager->IsValid())
      fManager->DiscardSession(this);

   // If this was the current session, fall back to the most recently
   // registered remaining TProof (or null if none is left)
   if (gProof && gProof == this) {
      TIter pvp(gROOT->GetListOfProofs(), kIterBackward);
      while ((gProof = (TProof *)pvp())) {
         if (gProof->InheritsFrom("TProof"))
            break;
      }
   }

   // Notify listeners of the destruction
   Emit("~TProof()");
}
// Initialize the session: reset state, create the internal lists and
// monitors, prepare sandbox / package directories (client side), start the
// workers or connect to the master, and go parallel.
//  (1st arg)  unnamed and unused here.
//  conffile   configuration file; on a master only used to decide whether
//             the default kPROOF_ConfFile is needed.
//  confdir    configuration directory (same remark as conffile).
//  loglevel   stored in fLogLevel.
//  alias      session alias; defaults to the master name when null.
// Returns the number of active slaves on success, 0 if a required directory
// cannot be asserted or StartSlaves() fails.
Int_t TProof::Init(const char *, const char *conffile,
const char *confdir, Int_t loglevel, const char *alias)
{
R__ASSERT(gSystem);
fValid = kFALSE;
// Any non-empty URL options put the session in "attach" mode; a "GUI"
// marker additionally flags usage from the session GUI and is removed
// (together with everything after it) from the options
Bool_t attach = kFALSE;
if (strlen(fUrl.GetOptions()) > 0) {
attach = kTRUE;
TString opts = fUrl.GetOptions();
if (opts.Contains("GUI")) {
SetBit(TProof::kUsingSessionGui);
opts.Remove(opts.Index("GUI"));
fUrl.SetOptions(opts);
}
}
// On a master, defaults are applied only when the arguments are empty
// NOTE(review): a non-empty conffile/confdir is not stored in this branch
// -- confirm this is intended.
if (TestBit(TProof::kIsMaster)) {
if (!conffile || strlen(conffile) == 0)
fConfFile = kPROOF_ConfFile;
if (!confdir || strlen(confdir) == 0)
fConfDir = kPROOF_ConfDir;
} else {
fConfDir = confdir;
fConfFile = conffile;
}
// Handle special directives (valgrind) embedded in the config field
ParseConfigField(fConfFile);
fWorkDir = gSystem->WorkingDirectory();
fLogLevel = loglevel;
fProtocol = kPROOF_Protocol;
fSendGroupView = kTRUE;
fImage = fMasterServ ? "" : "<local>";
fIntHandler = 0;
fStatus = 0;
// List of received messages (owning)
fRecvMessages = new TList;
fRecvMessages->SetOwner(kTRUE);
fSlaveInfo = 0;
fChains = new TList;
fAvailablePackages = 0;
fEnabledPackages = 0;
fRunningDSets = 0;
fEndMaster = TestBit(TProof::kIsMaster) ? kTRUE : kFALSE;
fInputData = 0;
ResetBit(TProof::kNewInputData);
fPrintProgress = 0;
// Settings read from the environment
fCollectTimeout = gEnv->GetValue("Proof.CollectTimeout", -1);
fDynamicStartup = gEnv->GetValue("Proof.DynamicStartup", kFALSE);
// Default data pool URL: only set on clients
if (TestBit(TProof::kIsClient))
fDataPoolUrl.Form("root://%s", fMaster.Data());
else
fDataPoolUrl = "";
fProgressDialog = 0;
fProgressDialogStarted = kFALSE;
// Alias defaults to the master name
TString al = (alias) ? alias : fMaster.Data();
SetAlias(al);
// Client-side temporary log file, opened once for writing and once for
// reading; failures are reported but do not abort the initialization
fRedirLog = kFALSE;
if (TestBit(TProof::kIsClient)) {
fLogFileName.Form("%s/ProofLog_%d", gSystem->TempDirectory(), gSystem->GetPid());
if ((fLogFileW = fopen(fLogFileName, "w")) == 0)
Error("Init", "could not create temporary logfile");
if ((fLogFileR = fopen(fLogFileName, "r")) == 0)
Error("Init", "could not open temp logfile for reading");
}
fLogToWindowOnly = kFALSE;
// Status and statistics; attaching implies asynchronous mode
fNotIdle = 0;
fSync = (attach) ? kFALSE : kTRUE;
fIsWaiting = kFALSE;
fBytesRead = 0;
fRealTime = 0;
fCpuTime = 0;
// Query bookkeeping
fQueries = 0;
fOtherQueries = 0;
fDrawQueries = 0;
fMaxDrawQueries = 1;
fSeqNum = 0;
fSessionID = -1;
fWaitingSlaves = 0;
// Create the player
fPlayer = 0;
MakePlayer();
// Feedback list (owning), registered as input
fFeedback = new TList;
fFeedback->SetOwner();
fFeedback->SetName("FeedbackList");
AddInput(fFeedback);
// Slave lists and socket monitors
fSlaves = new TSortedList(kSortDescending);
fActiveSlaves = new TList;
fInactiveSlaves = new TList;
fUniqueSlaves = new TList;
fAllUniqueSlaves = new TList;
fNonUniqueMasters = new TList;
fBadSlaves = new TList;
fAllMonitor = new TMonitor;
fActiveMonitor = new TMonitor;
fUniqueMonitor = new TMonitor;
fAllUniqueMonitor = new TMonitor;
fCurrentMonitor = 0;
fPackageLock = 0;
fEnabledPackagesOnClient = 0;
fLoadedMacros = 0;
fGlobalPackageDirList = 0;
// Schema evolution in TMessage is enabled unless switched off in the env
Bool_t enableSchemaEvolution = gEnv->GetValue("Proof.SchemaEvolution",1);
if (enableSchemaEvolution) {
TMessage::EnableSchemaEvolutionForAll();
} else {
Info("TProof", "automatic schema evolution in TMessage explicitely disabled");
}
// Package directory: taken from gProofServ on a master, otherwise derived
// from the sandbox, which is created if needed
if (IsMaster()) {
fPackageDir = gProofServ->GetPackageDir();
} else {
TString sandbox = gEnv->GetValue("Proof.Sandbox", "");
if (sandbox.IsNull()) {
sandbox.Form("~/%s", kPROOF_WorkDir);
}
gSystem->ExpandPathName(sandbox);
if (AssertPath(sandbox, kTRUE) != 0) {
Error("Init", "failure asserting directory %s", sandbox.Data());
return 0;
}
fPackageDir = gEnv->GetValue("Proof.PackageDir", "");
if (fPackageDir.IsNull())
fPackageDir.Form("%s/%s", sandbox.Data(), kPROOF_PackDir);
if (AssertPath(fPackageDir, kTRUE) != 0) {
Error("Init", "failure asserting directory %s", fPackageDir.Data());
return 0;
}
}
if (!IsMaster()) {
// Colon-separated list of global package directories; readable ones are
// stored as TNamed(G<n>, dir) in an owning hash list
TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
if (globpack.Length() > 0) {
Int_t ng = 0;
Int_t from = 0;
TString ldir;
while (globpack.Tokenize(ldir, from, ":")) {
if (gSystem->AccessPathName(ldir, kReadPermission)) {
Warning("Init", "directory for global packages %s does not"
" exist or is not readable", ldir.Data());
} else {
TString key = Form("G%d", ng++);
if (!fGlobalPackageDirList) {
fGlobalPackageDirList = new THashList();
fGlobalPackageDirList->SetOwner();
}
fGlobalPackageDirList->Add(new TNamed(key,ldir));
}
}
}
// Lock file path in the temp directory, built from the package directory
// with '/' replaced by '%'
TString lockpath(fPackageDir);
lockpath.ReplaceAll("/", "%");
lockpath.Insert(0, Form("%s/%s", gSystem->TempDirectory(), kPROOF_PackageLockFile));
fPackageLock = new TProofLockPath(lockpath.Data());
fEnabledPackagesOnClient = new TList;
fEnabledPackagesOnClient->SetOwner();
}
// Start slaves: with dynamic startup only clients do it here; otherwise a
// master also does it unless "Proof.MasterOnly" is set
if (fDynamicStartup) {
if (!IsMaster()) {
if (!StartSlaves(attach))
return 0;
}
} else {
Bool_t masterOnly = gEnv->GetValue("Proof.MasterOnly", kFALSE);
if (!IsMaster() || !masterOnly) {
if (!StartSlaves(attach))
return 0;
}
}
if (fgSemaphore)
SafeDelete(fgSemaphore);
// From here on the session is considered valid
fValid = kTRUE;
fAllMonitor->DeActivateAll();
// Request as many parallel workers as possible
GoParallel(9999, attach);
// Fresh sessions send the initial state; attached busy sessions redirect logs
if (!attach)
SendInitialState();
else if (!IsIdle())
fRedirLog = kTRUE;
if (TestBit(TProof::kIsClient))
SetAlias(al);
SetActive(kFALSE);
if (IsValid()) {
ActivateAsyncInput();
R__LOCKGUARD2(gROOTMutex);
gROOT->GetListOfSockets()->Add(this);
}
return fActiveSlaves->GetSize();
}
// Inspect the configuration string for a "valgrind" directive and, if
// present, set up the environment variables for a valgrind debug run.
// The directive extends from "valgrind" up to the next ',' (or the end of
// the string). Nothing is done when "valgrind" does not appear in 'config'.
void TProof::ParseConfigField(const char *config)
{
TString sconf(config);
Int_t ivg = kNPOS;
if ((ivg = sconf.Index("valgrind")) != kNPOS) {
// Isolate the valgrind directive
Int_t jvg = sconf.Index(',', ivg);
Int_t lvg = (jvg != kNPOS) ? (jvg-ivg) : sconf.Length();
TString vgconf = sconf(ivg, lvg);
// Wrapper commands already defined in the env list: generic, master, worker
TString mst, wrk, all;
TList *envs = fgProofEnvList;
TNamed *n = 0;
if (envs) {
if ((n = (TNamed *) envs->FindObject("PROOF_WRAPPERCMD")))
all = n->GetTitle();
if ((n = (TNamed *) envs->FindObject("PROOF_MASTER_WRAPPERCMD")))
mst = n->GetTitle();
if ((n = (TNamed *) envs->FindObject("PROOF_SLAVE_WRAPPERCMD")))
wrk = n->GetTitle();
}
// The generic wrapper fills in for missing specific ones
if (all != "" && mst == "") mst = all;
if (all != "" && wrk == "") wrk = all;
// A generic wrapper that only carries valgrind options is consumed here
if (all != "" && all.BeginsWith("valgrind_opts:")) {
Info("ParseConfigField","valgrind run: resetting 'PROOF_WRAPPERCMD':"
" must be set again for next run , if any");
TProof::DelEnvVar("PROOF_WRAPPERCMD");
}
TString var, cmd("valgrind -v --suppressions=<rootsys>/etc/valgrind-root.supp");
TString mstlab("NO"), wrklab("NO");
// Master under valgrind: plain "valgrind" or any directive naming "master"
if (vgconf == "valgrind" || vgconf.Contains("master")) {
if (mst == "" || mst.BeginsWith("valgrind_opts:")) {
mst.ReplaceAll("valgrind_opts:","");
var.Form("%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), mst.Data());
TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", var);
mstlab = "YES";
} else if (mst != "") {
// A user-defined wrapper is left untouched
mstlab = "YES";
}
}
// Workers under valgrind: "=workers" or "+workers", optionally followed
// by "#<n>" giving the number of workers (default 2)
if (vgconf.Contains("=workers") || vgconf.Contains("+workers")) {
if (wrk == "" || wrk.BeginsWith("valgrind_opts:")) {
wrk.ReplaceAll("valgrind_opts:","");
var.Form("%s --log-file=<logfilewrk>.valgrind.log %s", cmd.Data(), wrk.Data());
TProof::AddEnvVar("PROOF_SLAVE_WRAPPERCMD", var);
TString nwrks("2");
Int_t inw = vgconf.Index('#');
if (inw != kNPOS) {
nwrks = vgconf(inw+1, vgconf.Length());
if (!nwrks.IsDigit()) nwrks = "2";
}
TProof::AddEnvVar("PROOF_NWORKERS", nwrks);
wrklab = nwrks;
TProof::AddEnvVar("PROOF_ADDITIONALLOG", "valgrind.log*");
} else if (wrk != "") {
wrklab = "ALL";
}
}
// Settings applied to every valgrind run, then a notice for the user
TProof::AddEnvVar("PROOF_INTWAIT", "5000");
gEnv->SetValue("Proof.SocketActivityTimeout", 6000);
Printf(" ");
Printf(" ---> Starting a debug run with valgrind (master:%s, workers:%s)", mstlab.Data(), wrklab.Data());
Printf(" ---> Please be patient: startup may be VERY slow ...");
Printf(" ---> Logs will be available as special tags in the log window (from the progress dialog or TProof::LogViewer()) ");
Printf(" ---> (Reminder: this debug run makes sense only if you are running a debug version of ROOT)");
Printf(" ");
}
}
Int_t TProof::AssertPath(const char *inpath, Bool_t writable)
{
   // Make sure that 'inpath' exists, creating it (recursively) if needed,
   // and, when 'writable' is true, that it is writable.
   // Returns 0 on success, -1 if the path is undefined or cannot be created
   // or made writable.

   const Bool_t undefined = (inpath == 0) || (strlen(inpath) <= 0);
   if (undefined) {
      Error("AssertPath", "undefined input path");
      return -1;
   }

   TString fullPath(inpath);
   gSystem->ExpandPathName(fullPath);

   // AccessPathName() returns true when the path is NOT accessible
   if (gSystem->AccessPathName(fullPath, kFileExists) &&
       gSystem->mkdir(fullPath, kTRUE) != 0) {
      Error("AssertPath", "could not create path %s", fullPath.Data());
      return -1;
   }

   // Try to fix the permissions if write access is required but missing
   const Bool_t notWritable = gSystem->AccessPathName(fullPath, kWritePermission);
   if (notWritable && writable) {
      const Int_t rc = gSystem->Chmod(fullPath, 0666);
      if (rc != 0) {
         Error("AssertPath", "could not make path %s writable", fullPath.Data());
         return -1;
      }
   }

   return 0;
}
void TProof::SetManager(TProofMgr *mgr)
{
   // Record the manager of this session. A non-null manager is removed from
   // and re-added to gROOT's list of sockets, which leaves it as the last
   // entry of that list.

   fManager = mgr;
   if (mgr == 0)
      return;

   R__LOCKGUARD2(gROOTMutex);
   gROOT->GetListOfSockets()->Remove(mgr);
   gROOT->GetListOfSockets()->Add(mgr);
}
// Create and set up the workers described in 'workerList' (a list of
// TProofNodeInfo) and bring them to the state of the current session:
// enabled packages, loaded macros, dynamic and include paths.
// Progress is reported to the client with kPROOF_SERVERSTARTED messages.
// 'workerList' is deleted here once the workers have been created.
// Returns 0 on completion, -1 when not called on a master, -2 when the list
// is null or empty.
// NOTE(review): on the -2 return a non-null empty list is not deleted.
Int_t TProof::AddWorkers(TList *workerList)
{
if (!IsMaster()) {
Error("AddWorkers", "AddWorkers can only be called on the master!");
return -1;
}
if (!workerList || !(workerList->GetSize())) {
Error("AddWorkers", "The list of workers should not be empty; NULL: %d",
workerList == 0);
return -2;
}
// Image of this node: from gProofServ, or "<host FQDN>:<workdir>" if unset
fImage = gProofServ->GetImage();
if (fImage.IsNull())
fImage = Form("%s:%s", TUrl(gSystem->HostName()).GetHostFQDN(),
gProofServ->GetWorkDir());
UInt_t nSlaves = workerList->GetSize();
UInt_t nSlavesDone = 0;
Int_t ord = 0;
// Non-owning list of the workers successfully created in this call
TList *addedWorkers = new TList();
addedWorkers->SetOwner(kFALSE);
TListIter next(workerList);
TObject *to;
TProofNodeInfo *worker;
// First pass: open the connections
while ((to = next())) {
worker = (TProofNodeInfo *)to;
const Char_t *image = worker->GetImage().Data();
const Char_t *workdir = worker->GetWorkDir().Data();
Int_t perfidx = worker->GetPerfIndex();
Int_t sport = worker->GetPort();
// A port of -1 means "use the port of this session's URL"
if (sport == -1)
sport = fUrl.GetPort();
// Full ordinal: "<master ordinal>.<worker ordinal or running index>"
TString fullord;
if (worker->GetOrdinal().Length() > 0) {
fullord.Form("%s.%s", gProofServ->GetOrdinal(), worker->GetOrdinal().Data());
} else {
fullord.Form("%s.%d", gProofServ->GetOrdinal(), ord);
}
TUrl u(Form("%s:%d",worker->GetNodeName().Data(), sport));
// When a group is defined it is carried in the password field of the URL
if (strlen(gProofServ->GetGroup()) > 0) {
if (strlen(u.GetUser()) <= 0)
u.SetUser(gProofServ->GetUser());
u.SetPasswd(gProofServ->GetGroup());
}
TSlave *slave = CreateSlave(u.GetUrl(), fullord, perfidx,
image, workdir);
// Valid workers are kept for the setup pass; invalid ones go to the bad list
Bool_t slaveOk = kTRUE;
if (slave->IsValid()) {
fSlaves->Add(slave);
addedWorkers->Add(slave);
} else {
slaveOk = kFALSE;
fBadSlaves->Add(slave);
}
PDB(kGlobal,3)
Info("StartSlaves", "worker on host %s created"
" and added to list", worker->GetNodeName().Data());
// Notify the client about the progress
nSlavesDone++;
TMessage m(kPROOF_SERVERSTARTED);
m << TString("Opening connections to workers") << nSlaves
<< nSlavesDone << slaveOk;
gProofServ->GetSocket()->Send(m);
ord++;
}
// The input list is no longer needed
SafeDelete(workerList);
// Second pass: set up the remote servers on the workers just created
nSlavesDone = 0;
TIter nxsl(addedWorkers);
TSlave *sl = 0;
while ((sl = (TSlave *) nxsl())) {
if (sl->IsValid())
sl->SetupServ(TSlave::kSlave, 0);
// SetupServ may have invalidated the worker
Bool_t slaveOk = kTRUE;
if (sl->IsValid()) {
fAllMonitor->Add(sl->GetSocket());
} else {
slaveOk = kFALSE;
fBadSlaves->Add(sl);
}
nSlavesDone++;
TMessage m(kPROOF_SERVERSTARTED);
m << TString("Setting up worker servers") << nSlaves
<< nSlavesDone << slaveOk;
gProofServ->GetSocket()->Send(m);
}
// Activate the workers
SetParallel(99999, 0);
// Upload and enable the packages already enabled in this session
TList *tmpEnabledPackages = gProofServ->GetEnabledPackages();
if (tmpEnabledPackages && tmpEnabledPackages->GetSize() > 0) {
TIter nxp(tmpEnabledPackages);
TObjString *os = 0;
while ((os = (TObjString *) nxp())) {
UploadPackage(os->GetName());
EnablePackage(os->GetName(), kTRUE);
}
}
// Load the already loaded macros on the new workers
if (fLoadedMacros) {
TIter nxp(fLoadedMacros);
TObjString *os = 0;
while ((os = (TObjString *) nxp())) {
Printf("Loading a macro : %s", os->GetName());
Load(os->GetName(), kTRUE, kTRUE, addedWorkers);
}
}
// Propagate the dynamic library path (':' and '"' turned into blanks)
TString dyn = gSystem->GetDynamicPath();
dyn.ReplaceAll(":", " ");
dyn.ReplaceAll("\"", " ");
AddDynamicPath(dyn, addedWorkers);
// Propagate the include path ("-I" and '"' turned into blanks)
TString inc = gSystem->GetIncludePath();
inc.ReplaceAll("-I", " ");
inc.ReplaceAll("\"", " ");
AddIncludePath(inc, addedWorkers);
delete addedWorkers;
// With dynamic startup, tell the client about the new parallelism
if (fDynamicStartup && gProofServ)
gProofServ->SendParallel(kTRUE);
return 0;
}
// Terminate workers. With a null 'workerList' every worker in fSlaves is
// terminated; otherwise each entry of the list is either a TProofNodeInfo
// (matched against the worker names) or an object inheriting from TSlave.
// Returns 0 on completion, -1 when not called on a master, -2 when a
// non-null list is empty.
Int_t TProof::RemoveWorkers(TList *workerList)
{
if (!IsMaster()) {
Error("RemoveWorkers", "RemoveWorkers can only be called on the master!");
return -1;
}
// The file map is reset whenever the set of workers changes
fFileMap.clear();
if (!workerList) {
// NOTE(review): if TerminateWorker() removes 'sl' from fSlaves, the list is
// modified while being iterated -- confirm this is safe.
TIter nxsl(fSlaves);
TSlave *sl = 0;
while ((sl = (TSlave *) nxsl())) {
TerminateWorker(sl);
}
} else {
if (!(workerList->GetSize())) {
Error("RemoveWorkers", "The list of workers should not be empty!");
return -2;
}
TListIter next(workerList);
TObject *to;
TProofNodeInfo *worker;
while ((to = next())) {
TSlave *sl = 0;
if (!strcmp(to->ClassName(), "TProofNodeInfo")) {
// Look up the worker by node name; 'sl' ends up null when none matches
worker = (TProofNodeInfo *)to;
TIter nxsl(fSlaves);
while ((sl = (TSlave *) nxsl())) {
if (sl->GetName() == worker->GetNodeName())
break;
}
} else if (to->InheritsFrom("TSlave")) {
sl = (TSlave *) to;
} else {
Warning("RemoveWorkers","unknown object type: %s - it should be"
" TProofNodeInfo or inheriting from TSlave", to->ClassName());
}
if (sl) {
if (gDebug > 0)
Info("RemoveWorkers","terminating worker %s", sl->GetOrdinal());
TerminateWorker(sl);
}
}
}
// Once no worker is left, release the "master" entry as well
if (gProofServ && fSlaves->GetSize() <= 0) gProofServ->ReleaseWorker("master");
return 0;
}
Bool_t TProof::StartSlaves(Bool_t attach)
{
   // Start the remote side of the session.
   // On a master: get the list of workers from gProofServ and add them.
   // On a client: open the connection to the master and, unless 'attach'
   // is true, set up the remote server and collect its startup messages.
   // Returns kTRUE on success, kFALSE otherwise.

   if (TestBit(TProof::kIsMaster)) {

      Int_t pc = 0;
      TList *workerList = new TList;
      // Get the list of workers
      if (gProofServ->GetWorkers(workerList, pc) == TProofServ::kQueryStop) {
         TString emsg("no resource currently available for this session: please retry later");
         // The message is data, not a format string
         if (gDebug > 0) Info("StartSlaves", "%s", emsg.Data());
         gProofServ->SendAsynMessage(emsg.Data());
         // AddWorkers() is not reached on this path, so release the list here
         delete workerList;
         return kFALSE;
      }
      // Set up the workers; on success AddWorkers() deletes the list
      if (AddWorkers(workerList) < 0)
         return kFALSE;

   } else {

      // Client side: create the master server
      Printf("Starting master: opening connection ...");
      TSlave *slave = CreateSubmaster(fUrl.GetUrl(), "0", "master", 0);

      if (slave->IsValid()) {

         // Notify
         fprintf(stderr,"Starting master:"
                        " connection open: setting up server ...             \r");
         StartupMessage("Connection to master opened", kTRUE, 1, 1);

         if (!attach) {

            // Let the slave handle interrupts while the server is set up
            slave->SetInterruptHandler(kTRUE);

            // Finalize the server setup
            slave->SetupServ(TSlave::kMaster, fConfFile);

            if (slave->IsValid()) {

               // Notify
               Printf("Starting master: OK                                     ");
               StartupMessage("Master started", kTRUE, 1, 1);

               // Check the protocol compatibility
               if (fProtocol == 1) {
                  Error("StartSlaves",
                        "client and remote protocols not compatible (%d and %d)",
                        kPROOF_Protocol, fProtocol);
                  slave->Close("S");
                  delete slave;
                  return kFALSE;
               }

               fSlaves->Add(slave);
               fAllMonitor->Add(slave->GetSocket());

               // From now on interrupts are handled by this session
               slave->SetInterruptHandler(kFALSE);
               fIntHandler = new TProofInterruptHandler(this);

               // Collect the startup messages (timeout 300)
               Int_t rc = Collect(slave, 300);
               Int_t slStatus = slave->GetStatus();
               if (slStatus == -99 || slStatus == -98 || rc == 0) {
                  fSlaves->Remove(slave);
                  fAllMonitor->Remove(slave->GetSocket());
                  if (slStatus == -99)
                     Error("StartSlaves", "no resources available or problems setting up workers (check logs)");
                  else if (slStatus == -98)
                     Error("StartSlaves", "could not setup output redirection on master");
                  else
                     Error("StartSlaves", "setting up master");
                  slave->Close("S");
                  delete slave;
                  return kFALSE;
               }

               if (!slave->IsValid()) {
                  fSlaves->Remove(slave);
                  fAllMonitor->Remove(slave->GetSocket());
                  slave->Close("S");
                  delete slave;
                  Error("StartSlaves",
                        "failed to setup connection with PROOF master server");
                  return kFALSE;
               }

               // Load the progress dialog plugin in interactive mode
               if (!gROOT->IsBatch()) {
                  if ((fProgressDialog =
                     gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
                     if (fProgressDialog->LoadPlugin() == -1)
                        fProgressDialog = 0;
               }
            } else {
               // Notify
               Printf("Starting master: failure");
            }
         } else {

            // Attaching to an existing session: nothing to set up remotely
            Printf("Starting master: OK                                     ");
            StartupMessage("Master attached", kTRUE, 1, 1);

            if (!gROOT->IsBatch()) {
               if ((fProgressDialog =
                  gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
                  if (fProgressDialog->LoadPlugin() == -1)
                     fProgressDialog = 0;
            }

            fSlaves->Add(slave);
            fIntHandler = new TProofInterruptHandler(this);
         }

      } else {
         delete slave;
         // Notify only if verbosity is on: most likely the failure has
         // already been reported
         if (gDebug > 0)
            Error("StartSlaves", "failed to create (or connect to) the PROOF master server");
         return kFALSE;
      }
   }

   return kTRUE;
}
void TProof::Close(Option_t *opt)
{
   // Close all open slave connections and invalidate the session.
   // 'opt' is forwarded to TSlave::Close() for every slave.
   // The session is removed from gROOT's list of sockets and, for proofd
   // sessions, from the list of PROOF sessions (gProof is re-pointed to
   // another proofd session, or to null if none is left).

   {  R__LOCKGUARD2(fCloseMutex);

      fValid = kFALSE;
      if (fSlaves) {
         if (fIntHandler)
            fIntHandler->Remove();

         TIter nxs(fSlaves);
         TSlave *sl = 0;
         while ((sl = (TSlave *)nxs()))
            sl->Close(opt);

         // The secondary lists only reference slaves owned by fSlaves:
         // empty them without touching the objects before those are deleted
         fActiveSlaves->Clear("nodelete");
         fUniqueSlaves->Clear("nodelete");
         fAllUniqueSlaves->Clear("nodelete");
         fNonUniqueMasters->Clear("nodelete");
         fBadSlaves->Clear("nodelete");
         // The inactive list references the same slaves: leaving it filled
         // would keep dangling pointers after fSlaves->Delete()
         if (fInactiveSlaves)
            fInactiveSlaves->Clear("nodelete");
         fSlaves->Delete();
      }
   }

   {
      R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Remove(this);

      if (IsProofd()) {

         gROOT->GetListOfProofs()->Remove(this);
         if (gProof && gProof == this) {
            // Set the previous proofd-like session as default
            TIter pvp(gROOT->GetListOfProofs(), kIterBackward);
            while ((gProof = (TProof *)pvp())) {
               if (gProof->IsProofd())
                  break;
            }
         }
      }
   }
}
TSlave *TProof::CreateSlave(const char *url, const char *ord,
                            Int_t perf, const char *image, const char *workdir)
{
   // Create a worker (type TSlave::kSlave) for this session.
   // A valid worker gets an input handler bound to its socket and its
   // parallel level set to 1. The object is returned whether valid or not.

   TSlave *worker = TSlave::Create(url, ord, perf, image,
                                   this, TSlave::kSlave, workdir, 0);
   if (!worker->IsValid())
      return worker;

   TProofInputHandler *handler = new TProofInputHandler(this, worker->GetSocket());
   worker->SetInputHandler(handler);
   worker->fParallel = 1;
   return worker;
}
TSlave *TProof::CreateSubmaster(const char *url, const char *ord,
                                const char *image, const char *msd)
{
   // Create a submaster (type TSlave::kMaster, performance index 100) for
   // this session. A valid submaster gets an input handler bound to its
   // socket. The object is returned whether valid or not.

   TSlave *master = TSlave::Create(url, ord, 100, image, this,
                                   TSlave::kMaster, 0, msd);
   if (!master->IsValid())
      return master;

   TProofInputHandler *handler = new TProofInputHandler(this, master->GetSocket());
   master->SetInputHandler(handler);
   return master;
}
TSlave *TProof::FindSlave(TSocket *s) const
{
   // Return the valid slave whose socket is 's', or 0 if there is none.

   TIter it(fSlaves);
   for (TSlave *cand = (TSlave *)it(); cand; cand = (TSlave *)it()) {
      if (!cand->IsValid())
         continue;
      if (cand->GetSocket() == s)
         return cand;
   }
   return 0;
}
// Rebuild, from the active slaves, the lists and monitors of "unique" nodes,
// i.e. nodes with distinct file system images:
//  - fUniqueSlaves / fUniqueMonitor: one slave per image different from
//    the local one, a master being preferred over a worker;
//  - fNonUniqueMasters: masters sharing their image with this node or with
//    another master already in the unique list;
//  - fAllUniqueSlaves / fAllUniqueMonitor: the union of the two.
// Both monitors are left deactivated.
void TProof::FindUniqueSlaves()
{
fUniqueSlaves->Clear();
fUniqueMonitor->RemoveAll();
fAllUniqueSlaves->Clear();
fAllUniqueMonitor->RemoveAll();
fNonUniqueMasters->Clear();
TIter next(fActiveSlaves);
while (TSlave *sl = dynamic_cast<TSlave*>(next())) {
// Same image as this node: never unique, but masters are still tracked
if (fImage == sl->fImage) {
if (sl->GetSlaveType() == TSlave::kMaster) {
fNonUniqueMasters->Add(sl);
fAllUniqueSlaves->Add(sl);
fAllUniqueMonitor->Add(sl->GetSocket());
}
continue;
}
// Look for an already registered slave with the same image
TIter next2(fUniqueSlaves);
TSlave *replace_slave = 0;
Bool_t add = kTRUE;
while (TSlave *sl2 = dynamic_cast<TSlave*>(next2())) {
if (sl->fImage == sl2->fImage) {
add = kFALSE;
if (sl->GetSlaveType() == TSlave::kMaster) {
if (sl2->GetSlaveType() == TSlave::kSlave) {
// A master takes the place of a worker with the same image
replace_slave = sl2;
add = kTRUE;
} else if (sl2->GetSlaveType() == TSlave::kMaster) {
// Two masters on the same image: the new one is non-unique
fNonUniqueMasters->Add(sl);
fAllUniqueSlaves->Add(sl);
fAllUniqueMonitor->Add(sl->GetSocket());
} else {
Error("FindUniqueSlaves", "TSlave is neither Master nor Slave");
R__ASSERT(0);
}
}
break;
}
}
if (add) {
fUniqueSlaves->Add(sl);
fAllUniqueSlaves->Add(sl);
fUniqueMonitor->Add(sl->GetSocket());
fAllUniqueMonitor->Add(sl->GetSocket());
// Drop the worker superseded by the master just added
if (replace_slave) {
fUniqueSlaves->Remove(replace_slave);
fAllUniqueSlaves->Remove(replace_slave);
fUniqueMonitor->Remove(replace_slave->GetSocket());
fAllUniqueMonitor->Remove(replace_slave->GetSocket());
}
}
}
// Will be activated in Collect()
fUniqueMonitor->DeActivateAll();
fAllUniqueMonitor->DeActivateAll();
}
Int_t TProof::GetNumberOfSlaves() const
{
   // Return the size of the list of all slaves (fSlaves).

   const Int_t count = fSlaves->GetSize();
   return count;
}
Int_t TProof::GetNumberOfActiveSlaves() const
{
   // Return the size of the list of active slaves (fActiveSlaves).

   const Int_t count = fActiveSlaves->GetSize();
   return count;
}
Int_t TProof::GetNumberOfInactiveSlaves() const
{
   // Return the size of the list of inactive slaves (fInactiveSlaves).

   const Int_t count = fInactiveSlaves->GetSize();
   return count;
}
Int_t TProof::GetNumberOfUniqueSlaves() const
{
   // Return the size of the list of unique slaves (fUniqueSlaves).

   const Int_t count = fUniqueSlaves->GetSize();
   return count;
}
Int_t TProof::GetNumberOfBadSlaves() const
{
   // Return the size of the list of bad slaves (fBadSlaves).

   const Int_t count = fBadSlaves->GetSize();
   return count;
}
void TProof::AskStatistics()
{
   // Send a kPROOF_GETSTATS request to the active slaves and collect the
   // replies. Does nothing on an invalid session.

   if (IsValid()) {
      Broadcast(kPROOF_GETSTATS, kActive);
      Collect(kActive, fCollectTimeout);
   }
}
void TProof::AskParallel()
{
   // Send a kPROOF_GETPARALLEL request to the active slaves and collect the
   // replies. Does nothing on an invalid session.

   if (IsValid()) {
      Broadcast(kPROOF_GETPARALLEL, kActive);
      Collect(kActive, fCollectTimeout);
   }
}
TList *TProof::GetListOfQueries(Option_t *opt)
{
   // Ask the master for the list of queries and return it (fQueries).
   // If 'opt' contains 'A' or 'a', all the queries known to the server are
   // requested. A null 'opt' is treated as the empty string.
   // Returns 0 on an invalid session or when called on a master.

   if (!IsValid() || TestBit(TProof::kIsMaster)) return (TList *)0;

   // strchr() must not be handed a null pointer
   if (!opt) opt = "";

   Bool_t all = ((strchr(opt,'A') || strchr(opt,'a'))) ? kTRUE : kFALSE;
   TMessage m(kPROOF_QUERYLIST);
   m << all;
   Broadcast(m, kActive);
   Collect(kActive, fCollectTimeout);

   // This should have been filled by now
   return fQueries;
}
Int_t TProof::GetNumberOfQueries()
{
   // Return the number of queries in fQueries not counted in fOtherQueries;
   // 0 when the list of queries is not available.

   if (!fQueries)
      return 0;
   return fQueries->GetSize() - fOtherQueries;
}
void TProof::SetMaxDrawQueries(Int_t max)
{
   // Set the maximum number of draw queries, here and in the player if one
   // exists. Non-positive values are ignored.

   if (max <= 0)
      return;

   if (fPlayer)
      fPlayer->SetMaxDrawQueries(max);
   fMaxDrawQueries = max;
}
void TProof::GetMaxQueries()
{
TMessage m(kPROOF_MAXQUERIES);
m << kFALSE;
Broadcast(m, kActive);
Collect(kActive, fCollectTimeout);
}
TList *TProof::GetQueryResults()
{
   // Return the player's list of query results, or 0 if there is no player.

   if (!fPlayer)
      return (TList *)0;
   return fPlayer->GetListOfResults();
}
TQueryResult *TProof::GetQueryResult(const char *ref)
{
   // Return the player's query result matching 'ref', or 0 if there is no
   // player.

   if (!fPlayer)
      return (TQueryResult *)0;
   return fPlayer->GetQueryResult(ref);
}
void TProof::ShowQueries(Option_t *opt)
{
   // Print the list of queries.
   // Options (case insensitive):
   //   "A"  show all queries known to the server
   //   "L"  show the queries available locally
   //   "H"  print the help menu and return
   // 'opt' is also forwarded to the Print() of each query.
   // A null 'opt' is treated as the empty string.

   // strchr() must not be handed a null pointer
   if (!opt) opt = "";

   Bool_t help = ((strchr(opt,'H') || strchr(opt,'h'))) ? kTRUE : kFALSE;
   if (help) {

      // Help
      Printf("+++");
      Printf("+++ Options: \"A\" show all queries known to server");
      Printf("+++          \"L\" show retrieved queries");
      Printf("+++          \"F\" full listing of query info");
      Printf("+++          \"H\" print this menu");
      Printf("+++");
      Printf("+++ (case insensitive)");
      Printf("+++");
      Printf("+++ Use Retrieve(<#>) to retrieve the full"
             " query results from the master");
      Printf("+++     e.g. Retrieve(8)");

      Printf("+++");

      return;
   }

   if (!IsValid()) return;

   Bool_t local = ((strchr(opt,'L') || strchr(opt,'l'))) ? kTRUE : kFALSE;

   TObject *pq = 0;
   if (!local) {
      // Refresh the list from the master
      GetListOfQueries(opt);

      if (!fQueries) return;

      TIter nxq(fQueries);

      // Queries processed by other sessions come first in the list
      if (fOtherQueries > 0) {
         Printf("+++");
         Printf("+++ Queries processed during other sessions: %d", fOtherQueries);
         Int_t nq = 0;
         while (nq++ < fOtherQueries && (pq = nxq()))
            pq->Print(opt);
      }

      // Queries processed by this session
      Printf("+++");
      Printf("+++ Queries processed during this session: selector: %d, draw: %d",
              GetNumberOfQueries(), fDrawQueries);
      while ((pq = nxq()))
         pq->Print(opt);

   } else {

      // Queries processed by this session
      Printf("+++");
      Printf("+++ Queries processed during this session: selector: %d, draw: %d",
              GetNumberOfQueries(), fDrawQueries);

      // Queries available locally
      TList *listlocal = fPlayer ? fPlayer->GetListOfResults() : (TList *)0;
      if (listlocal) {
         Printf("+++");
         Printf("+++ Queries available locally: %d", listlocal->GetSize());
         TIter nxlq(listlocal);
         while ((pq = nxlq()))
            pq->Print(opt);
      }
   }
   Printf("+++");
}
Bool_t TProof::IsDataReady(Long64_t &totalbytes, Long64_t &bytesready)
{
   // Ask the active submasters whether the data is ready.
   // On return 'totalbytes' and 'bytesready' hold fTotalBytes and
   // fBytesReady as left by the collection of the replies. The
   // "IsDataReady(Long64_t,Long64_t)" signal is emitted with these values.
   // Returns fDataReady, or kFALSE on an invalid session.

   if (!IsValid())
      return kFALSE;

   // Pick the submasters among the active slaves
   TList subMasters;
   TIter it(GetListOfActiveSlaves());
   for (TSlave *node = dynamic_cast<TSlave*>(it()); node;
        node = dynamic_cast<TSlave*>(it())) {
      if (node->GetSlaveType() == TSlave::kMaster)
         subMasters.Add(node);
   }

   // Reset the accumulators before asking
   fDataReady = kTRUE;
   fBytesReady = 0;
   fTotalBytes = 0;
   // With no submaster the data is considered ready
   if (subMasters.GetSize() > 0) {
      Broadcast(kPROOF_DATA_READY, &subMasters);
      Collect(&subMasters);
   }

   bytesready = fBytesReady;
   totalbytes = fTotalBytes;

   EmitVA("IsDataReady(Long64_t,Long64_t)", 2, totalbytes, bytesready);

   const char *state = fDataReady ? "READY" : "NOT READY";
   Info("IsDataReady", "%lld / %lld (%s)",
        bytesready, totalbytes, state);

   return fDataReady;
}
void TProof::Interrupt(EUrgent type, ESlaves list)
{
   // Send an interrupt of the given 'type' to every valid slave of the
   // selected 'list' (kAll, kActive, kUnique or kAllUnique).
   // Does nothing on an invalid session, on an empty list, or when 'list'
   // does not select any of the known lists.

   if (!IsValid()) return;

   TList *slaves = 0;
   if (list == kAll)       slaves = fSlaves;
   if (list == kActive)    slaves = fActiveSlaves;
   if (list == kUnique)    slaves = fUniqueSlaves;
   if (list == kAllUnique) slaves = fAllUniqueSlaves;

   // 'slaves' stays null for an unknown selector: guard as Broadcast() does
   if (!slaves || slaves->GetSize() == 0) return;

   TSlave *sl;
   TIter   next(slaves);

   while ((sl = (TSlave *)next())) {
      if (sl->IsValid()) {
         // Ask the slave to process the interrupt
         sl->Interrupt((Int_t)type);
      }
   }
}
Int_t TProof::GetParallel() const
{
   // Return the sum of the non-negative parallel levels reported by the
   // active slaves, or -1 on an invalid session.

   if (!IsValid())
      return -1;

   Int_t total = 0;
   TIter it(GetListOfActiveSlaves());
   for (TSlave *node = dynamic_cast<TSlave*>(it()); node;
        node = dynamic_cast<TSlave*>(it())) {
      // Negative levels do not contribute
      if (node->GetParallel() < 0)
         continue;
      total += node->GetParallel();
   }
   return total;
}
// Rebuild and return the list of TSlaveInfo objects (fSlaveInfo, an owning
// list sorted in descending order). Direct workers are described from the
// local lists; submasters are sent a kPROOF_GETSLAVEINFO request and their
// replies are collected. Returns 0 on an invalid session.
TList *TProof::GetListOfSlaveInfos()
{
if (!IsValid()) return 0;
// Create the list on first use, otherwise empty it
if (fSlaveInfo == 0) {
fSlaveInfo = new TSortedList(kSortDescending);
fSlaveInfo->SetOwner();
} else {
fSlaveInfo->Delete();
}
// Submasters that were successfully asked for their slave infos
TList masters;
TIter next(GetListOfSlaves());
TSlave *slave;
while ((slave = (TSlave *) next()) != 0) {
if (slave->GetSlaveType() == TSlave::kSlave) {
TSlaveInfo *slaveinfo = new TSlaveInfo(slave->GetOrdinal(),
slave->GetName(),
slave->GetPerfIdx());
fSlaveInfo->Add(slaveinfo);
// Flag as active if the ordinal is found among the active slaves
TIter nextactive(GetListOfActiveSlaves());
TSlave *activeslave;
while ((activeslave = (TSlave *) nextactive())) {
if (TString(slaveinfo->GetOrdinal()) == activeslave->GetOrdinal()) {
slaveinfo->SetStatus(TSlaveInfo::kActive);
break;
}
}
// Flag as bad if found among the bad slaves (checked last, so it wins)
TIter nextbad(GetListOfBadSlaves());
TSlave *badslave;
while ((badslave = (TSlave *) nextbad())) {
if (TString(slaveinfo->GetOrdinal()) == badslave->GetOrdinal()) {
slaveinfo->SetStatus(TSlaveInfo::kBad);
break;
}
}
} else if (slave->GetSlaveType() == TSlave::kMaster) {
// Ask valid submasters; a failed send marks the submaster as bad
if (slave->IsValid()) {
if (slave->GetSocket()->Send(kPROOF_GETSLAVEINFO) == -1)
MarkBad(slave, "could not send kPROOF_GETSLAVEINFO message");
else
masters.Add(slave);
}
} else {
Error("GetSlaveInfo", "TSlave is neither Master nor Slave");
R__ASSERT(0);
}
}
// Wait for the replies of the submasters
if (masters.GetSize() > 0) Collect(&masters);
return fSlaveInfo;
}
void TProof::Activate(TList *slaves)
{
   // De-activate every socket in the all-workers monitor, then activate the
   // sockets of the valid workers in 'slaves'. When 'slaves' is null the
   // active-workers list is used.

   TMonitor *allMon = fAllMonitor;
   allMon->DeActivateAll();

   TList *targets = slaves ? slaves : fActiveSlaves;

   TIter nxt(targets);
   for (TSlave *wrk = (TSlave *) nxt(); wrk; wrk = (TSlave *) nxt()) {
      if (!wrk->IsValid())
         continue;
      allMon->Activate(wrk->GetSocket());
   }
}
void TProof::SetMonitor(TMonitor *mon, Bool_t on)
{
   // Activate (on == kTRUE) or de-activate (on == kFALSE) all the sockets of
   // 'mon'. When 'mon' is null the current monitor is used; nothing is done
   // if that is null too.

   TMonitor *target = mon ? mon : fCurrentMonitor;
   if (!target)
      return;

   if (!on) {
      target->DeActivateAll();
   } else {
      target->ActivateAll();
   }
}
Int_t TProof::BroadcastGroupPriority(const char *grp, Int_t priority, TList *workers)
{
   // Send the priority 'priority' for group 'grp' to every valid worker in
   // 'workers'. Workers for which the send fails are marked as bad.
   // Returns the number of workers the information was sent to, 0 when the
   // list is undefined or empty, -1 if the session is not valid.

   if (!IsValid()) return -1;

   // A null list is possible (e.g. a selector matching no internal list):
   // treat it as empty, like Broadcast(const TMessage &, TList *) does
   if (!workers || workers->GetSize() == 0) return 0;

   int nsent = 0;
   TIter next(workers);

   TSlave *wrk;
   while ((wrk = (TSlave *)next())) {
      if (wrk->IsValid()) {
         if (wrk->SendGroupPriority(grp, priority) == -1)
            MarkBad(wrk, "could not send group priority");
         else
            nsent++;
      }
   }

   return nsent;
}
Int_t TProof::BroadcastGroupPriority(const char *grp, Int_t priority, ESlaves list)
{
   // Translate the selector 'list' into the matching internal worker list
   // and forward to the TList-based overload, whose result is returned.
   // The forwarded list is null when 'list' matches no selector.

   TList *target = 0;
   switch (list) {
      case kAll:
         target = fSlaves;
         break;
      case kActive:
         target = fActiveSlaves;
         break;
      case kUnique:
         target = fUniqueSlaves;
         break;
      case kAllUnique:
         target = fAllUniqueSlaves;
         break;
      default:
         break;
   }

   return BroadcastGroupPriority(grp, priority, target);
}
void TProof::ResetMergePrg()
{
fMergePrg.Reset(fActiveSlaves->GetSize());
}
Int_t TProof::Broadcast(const TMessage &mess, TList *slaves)
{
   // Send 'mess' to every valid worker in 'slaves'. Workers whose socket
   // reports a send failure (-1) are marked as bad.
   // Returns the number of workers the message was sent to, 0 for a null or
   // empty list, -1 if the session is not valid.

   if (!IsValid())
      return -1;

   if (!slaves)
      return 0;
   if (slaves->GetSize() == 0)
      return 0;

   Int_t nOk = 0;
   TIter nxt(slaves);
   for (TSlave *wrk = (TSlave *) nxt(); wrk; wrk = (TSlave *) nxt()) {
      if (!wrk->IsValid())
         continue;
      if (wrk->GetSocket()->Send(mess) != -1)
         nOk++;
      else
         MarkBad(wrk, "could not broadcast request");
   }

   return nOk;
}
Int_t TProof::Broadcast(const TMessage &mess, ESlaves list)
{
   // Translate the selector 'list' into the matching internal worker list
   // and broadcast 'mess' to it. Returns what the TList-based overload
   // returns; the forwarded list is null when 'list' matches no selector.

   TList *target = 0;
   switch (list) {
      case kAll:
         target = fSlaves;
         break;
      case kActive:
         target = fActiveSlaves;
         break;
      case kUnique:
         target = fUniqueSlaves;
         break;
      case kAllUnique:
         target = fAllUniqueSlaves;
         break;
      default:
         break;
   }

   return Broadcast(mess, target);
}
Int_t TProof::Broadcast(const char *str, Int_t kind, TList *slaves)
{
   // Build a message of type 'kind', carrying the string 'str' when it is
   // not null, and broadcast it to the workers in 'slaves'.
   // Returns the result of the message-based Broadcast.

   TMessage msg(kind);
   if (str != 0) {
      msg.WriteString(str);
   }

   const Int_t nsent = Broadcast(msg, slaves);
   return nsent;
}
Int_t TProof::Broadcast(const char *str, Int_t kind, ESlaves list)
{
   // Build a message of type 'kind', carrying the string 'str' when it is
   // not null, and broadcast it to the worker set selected by 'list'.
   // Returns the result of the message-based Broadcast.

   TMessage msg(kind);
   if (str != 0) {
      msg.WriteString(str);
   }

   const Int_t nsent = Broadcast(msg, list);
   return nsent;
}
Int_t TProof::BroadcastObject(const TObject *obj, Int_t kind, TList *slaves)
{
   // Serialize 'obj' into a message of type 'kind' and broadcast it to the
   // workers in 'slaves'. Returns the result of the message-based Broadcast.

   TMessage msg(kind);
   msg.WriteObject(obj);

   const Int_t nsent = Broadcast(msg, slaves);
   return nsent;
}
Int_t TProof::BroadcastObject(const TObject *obj, Int_t kind, ESlaves list)
{
   // Serialize 'obj' into a message of type 'kind' and broadcast it to the
   // worker set selected by 'list'. Returns the result of the message-based
   // Broadcast.

   TMessage msg(kind);
   msg.WriteObject(obj);

   const Int_t nsent = Broadcast(msg, list);
   return nsent;
}
Int_t TProof::BroadcastRaw(const void *buffer, Int_t length, TList *slaves)
{
   // Send 'length' bytes starting at 'buffer', without any message framing,
   // to every valid worker in 'slaves'. Workers whose socket reports a send
   // failure (-1) are marked as bad.
   // Returns the number of workers the buffer was sent to, 0 when the list
   // is undefined or empty, -1 if the session is not valid.

   if (!IsValid()) return -1;

   // A null list is possible (e.g. a selector matching no internal list):
   // treat it as empty, like Broadcast(const TMessage &, TList *) does
   if (!slaves || slaves->GetSize() == 0) return 0;

   int nsent = 0;
   TIter next(slaves);

   TSlave *sl;
   while ((sl = (TSlave *)next())) {
      if (sl->IsValid()) {
         if (sl->GetSocket()->SendRaw(buffer, length) == -1)
            MarkBad(sl, "could not send broadcast-raw request");
         else
            nsent++;
      }
   }

   return nsent;
}
Int_t TProof::BroadcastRaw(const void *buffer, Int_t length, ESlaves list)
{
   // Translate the selector 'list' into the matching internal worker list
   // and forward the raw buffer to the TList-based overload, whose result is
   // returned. The forwarded list is null when 'list' matches no selector.

   TList *target = 0;
   switch (list) {
      case kAll:
         target = fSlaves;
         break;
      case kActive:
         target = fActiveSlaves;
         break;
      case kUnique:
         target = fUniqueSlaves;
         break;
      case kAllUnique:
         target = fAllUniqueSlaves;
         break;
      default:
         break;
   }

   return BroadcastRaw(buffer, length, target);
}
Int_t TProof::BroadcastFile(const char *file, Int_t opt, const char *rfile, TList *wrks)
{
   // Send 'file' to every valid worker in 'wrks' via SendFile(), passing
   // 'opt' and 'rfile' through unchanged. A failure for one worker is
   // reported with Error() and does not stop the broadcast.
   // Returns the number of workers the file was successfully sent to, 0 when
   // the list is undefined or empty, -1 if the session is not valid.

   if (!IsValid()) return -1;

   // A null list is possible (e.g. a selector matching no internal list):
   // treat it as empty, like Broadcast(const TMessage &, TList *) does
   if (!wrks || wrks->GetSize() == 0) return 0;

   int nsent = 0;
   TIter next(wrks);

   TSlave *wrk;
   while ((wrk = (TSlave *)next())) {
      if (wrk->IsValid()) {
         if (SendFile(file, opt, rfile, wrk) < 0)
            Error("BroadcastFile",
                  "problems sending file to worker %s (%s)",
                  wrk->GetOrdinal(), wrk->GetName());
         else
            nsent++;
      }
   }

   return nsent;
}
Int_t TProof::BroadcastFile(const char *file, Int_t opt, const char *rfile, ESlaves list)
{
   // Translate the selector 'list' into the matching internal worker list
   // and forward to the TList-based overload, whose result is returned.
   // The forwarded list is null when 'list' matches no selector.

   TList *target = 0;
   switch (list) {
      case kAll:
         target = fSlaves;
         break;
      case kActive:
         target = fActiveSlaves;
         break;
      case kUnique:
         target = fUniqueSlaves;
         break;
      case kAllUnique:
         target = fAllUniqueSlaves;
         break;
      default:
         break;
   }

   return BroadcastFile(file, opt, rfile, target);
}
void TProof::ReleaseMonitor(TMonitor *mon)
{
   // Delete 'mon' unless it is null or one of the four monitors held as
   // members (all, active, unique, all-unique).

   if (!mon)
      return;

   const Bool_t isMember = (mon == fAllMonitor) || (mon == fActiveMonitor) ||
                           (mon == fUniqueMonitor) || (mon == fAllUniqueMonitor);
   if (!isMember)
      delete mon;
}
Int_t TProof::Collect(const TSlave *sl, Long_t timeout, Int_t endtype)
{
   // Collect responses from the single worker 'sl'.
   // Returns 0 if the worker is not valid, otherwise the result of the
   // monitor-based Collect.

   if (!sl->IsValid())
      return 0;

   // When the all-workers monitor is the one currently being collected on,
   // work on a temporary monitor; otherwise reuse it after clearing it
   const Bool_t allInUse = (fCurrentMonitor == fAllMonitor);
   TMonitor *mon = allInUse ? new TMonitor : fAllMonitor;
   if (!allInUse)
      mon->DeActivateAll();

   mon->Activate(sl->GetSocket());

   const Int_t ncollected = Collect(mon, timeout, endtype);

   // Frees the monitor only if it is the temporary one
   ReleaseMonitor(mon);
   return ncollected;
}
Int_t TProof::Collect(TList *slaves, Long_t timeout, Int_t endtype)
{
   // Collect responses from the valid workers in 'slaves'.
   // Returns the result of the monitor-based Collect.

   // When the all-workers monitor is the one currently being collected on,
   // work on a temporary monitor; otherwise reuse it after clearing it
   const Bool_t allInUse = (fCurrentMonitor == fAllMonitor);
   TMonitor *mon = allInUse ? new TMonitor : fAllMonitor;
   if (!allInUse)
      mon->DeActivateAll();

   TIter nxt(slaves);
   for (TSlave *wrk = (TSlave *) nxt(); wrk; wrk = (TSlave *) nxt()) {
      if (!wrk->IsValid())
         continue;
      mon->Activate(wrk->GetSocket());
   }

   const Int_t ncollected = Collect(mon, timeout, endtype);

   // Frees the monitor only if it is the temporary one
   ReleaseMonitor(mon);
   return ncollected;
}
Int_t TProof::Collect(ESlaves list, Long_t timeout, Int_t endtype)
{
   // Collect responses from the worker set selected by 'list', using the
   // member monitor associated with that set.
   // Returns the result of the monitor-based Collect, or 0 when no monitor
   // is available for the requested set.

   Int_t rc = 0;
   TMonitor *mon = 0;

   if (list == kAll)       mon = fAllMonitor;
   if (list == kActive)    mon = fActiveMonitor;
   if (list == kUnique)    mon = fUniqueMonitor;
   if (list == kAllUnique) mon = fAllUniqueMonitor;

   // 'mon' stays null when 'list' matches none of the selectors above (or
   // the member monitor is not set): nothing can be collected, and both the
   // copy and the activation below would dereference a null pointer
   if (!mon) return 0;

   if (fCurrentMonitor == mon) {
      // The requested monitor is already being collected on: use a copy
      mon = new TMonitor(*mon);
   }
   mon->ActivateAll();

   rc = Collect(mon, timeout, endtype);

   // Frees the monitor only if it is the temporary copy
   ReleaseMonitor(mon);
   return rc;
}
Int_t TProof::Collect(TMonitor *mon, Long_t timeout, Int_t endtype)
{
// Collect responses from the sockets active in 'mon' until none is left
// active or the timeout counter 'timeout' is exhausted.
// A negative 'timeout' disables the timeout; a positive one is decremented
// each time Select() returns (TSocket *)(-1).
// 'endtype' is passed through to CollectInputFrom().
// Returns the number of messages handled with a non-negative return code;
// returns 0 immediately if 'mon' has no active socket.

// Reset the status and drop the messages kept from a previous collection
fStatus = 0;
fRecvMessages->Clear();
// Activity timeout from the environment, default 600, scaled by 1000
// (presumably seconds converted to milliseconds - TODO confirm)
Long_t actto = (Long_t)(gEnv->GetValue("Proof.SocketActivityTimeout", 600) * 1000);
if (!mon->GetActive(actto)) return 0;
// Asynchronous input handlers are removed for the duration of the collection
DeActivateAsyncInput();
// Remember the monitor of an enclosing collection, if any, so that it can
// be restored on exit; the counters are reset only for an outermost call
TMonitor *savedMonitor = 0;
if (fCurrentMonitor) {
savedMonitor = fCurrentMonitor;
fCurrentMonitor = mon;
} else {
fCurrentMonitor = mon;
fBytesRead = 0;
fRealTime = 0.0;
fCpuTime = 0.0;
}
// Log redirection is switched off while a non-synchronous session is busy
Bool_t saveRedirLog = fRedirLog;
if (!IsIdle() && !IsSync())
fRedirLog = kFALSE;
int cnt = 0, rc = 0;
// Remaining timeout count; the loop below runs while it is non-zero
Long_t nto = timeout;
if (gDebug > 2)
Info("Collect","active: %d", mon->GetActive());
if (fIntHandler)
fIntHandler->Add();
// The activity timeout is passed to GetActive() only once every 60
// iterations; -1 is passed otherwise
Long_t sto = -1;
Int_t nsto = 60;
while (mon->GetActive(sto) && (nto < 0 || nto > 0)) {
TSocket *s = mon->Select(1000);
if (s && s != (TSocket *)(-1)) {
// A real socket is ready: read and process one message from it
rc = CollectInputFrom(s, endtype);
// rc == 1, or rc == 2 with no enclosing collection: stop listening to
// this socket in the monitor being collected on
if (rc == 1 || (rc == 2 && !savedMonitor)) {
mon->DeActivate(s);
PDB(kGlobal, 2)
Info("Collect","deactivating %p (active: %d, %p)",
s, mon->GetActive(),
mon->GetListOfActives()->First());
} else if (rc == 2) {
// rc == 2 inside a nested collection: the socket is de-activated in the
// monitor of the enclosing collection instead
if (savedMonitor) {
savedMonitor->DeActivate(s);
PDB(kGlobal, 2)
Info("Collect","save monitor: deactivating %p (active: %d, %p)",
s, savedMonitor->GetActive(),
savedMonitor->GetListOfActives()->First());
}
}
// Only messages handled without error are counted
if (rc >= 0)
cnt++;
} else {
// Select() returned null: stop collecting if the player has finished
if (!s)
if (fPlayer && (fPlayer->GetExitStatus() == TVirtualProofPlayer::kFinished))
mon->DeActivateAll();
// Select() returned (TSocket *)(-1): consume one unit of a positive timeout
if (s == (TSocket *)(-1) && nto > 0)
nto--;
}
sto = -1;
if (--nsto <= 0) {
sto = (Long_t) actto;
nsto = 60;
}
}
// Timeout exhausted: report the sockets still active and de-activate them
if (nto == 0) {
TList *al = mon->GetListOfActives();
if (al && al->GetSize() > 0) {
Info("Collect"," %d node(s) went in timeout:", al->GetSize());
TIter nxs(al);
TSocket *xs = 0;
while ((xs = (TSocket *)nxs())) {
TSlave *wrk = FindSlave(xs);
if (wrk)
Info("Collect"," %s", wrk->GetName());
else
Info("Collect"," %p: %s:%d", xs, xs->GetInetAddress().GetHostName(),
xs->GetInetAddress().GetPort());
}
}
mon->DeActivateAll();
}
if (fIntHandler)
fIntHandler->Remove();
SendGroupView();
// Restore the state saved on entry
fRedirLog = saveRedirLog;
fCurrentMonitor = savedMonitor;
ActivateAsyncInput();
return cnt;
}
void TProof::CleanGDirectory(TList *ol)
{
   // Call gDirectory->RecursiveRemove() for every object in 'ol'.
   // A null list is ignored.

   if (!ol)
      return;

   TIter it(ol);
   for (TObject *obj = it(); obj; obj = it()) {
      gDirectory->RecursiveRemove(obj);
   }
}
Int_t TProof::CollectInputFrom(TSocket *s, Int_t endtype)
{
   // Receive one message from socket 's' and hand it to HandleInputMessage().
   // Returns -1 if the receive fails or yields no message; otherwise the
   // code returned by HandleInputMessage(), except that a return of 1 is
   // turned into 2 when 'endtype' is non-negative and differs from the type
   // of the message received.

   TMessage *msg;
   const Int_t nrecv = s->Recv(msg);

   if (nrecv < 0) {
      PDB(kGlobal,2)
         Info("CollectInputFrom","%p: got %d from Recv()", s, nrecv);

      // Code -5 triggers a reconnection attempt (presumably it flags a
      // recoverable connection loss - TODO confirm); the socket is taken out
      // of the current monitor while reconnecting and put back on success
      Bool_t recovered = kFALSE;
      if (nrecv == -5) {
         if (fCurrentMonitor) fCurrentMonitor->Remove(s);
         if (s->Reconnect() == 0) {
            if (fCurrentMonitor) fCurrentMonitor->Add(s);
            recovered = kTRUE;
         }
      }
      if (!recovered)
         MarkBad(s, "problems receiving a message in TProof::CollectInputFrom(...)");

      // The failed receive is reported even when the reconnection succeeded
      return -1;
   }

   if (!msg) {
      MarkBad(s, "undefined message in TProof::CollectInputFrom(...)");
      return -1;
   }

   // The type must be read before the message is handed over
   const Int_t what = msg->What();
   TSlave *wrk = FindSlave(s);
   Int_t rc = HandleInputMessage(wrk, msg);

   if (rc == 1) {
      if (endtype >= 0 && what != endtype)
         rc = 2;
   }

   return rc;
}
Int_t TProof::HandleInputMessage(TSlave *sl, TMessage *mess)
{
   // Analyze the message 'mess' received from worker 'sl' and act on it
   // according to its type (mess->What()).
   // The message is deleted before returning unless a handler keeps it
   // (stored in fRecvMessages or queued in fWaitingSlaves).
   // Returns -1 if the message, the worker or the worker socket is undefined
   // (the message is not deleted in that case), 1 for the message types that
   // end an exchange (TProof::Collect then de-activates the socket),
   // 0 otherwise.

   char str[512];
   TObject *obj;
   Int_t rc = 0;

   if (!mess || !sl) {
      Warning("HandleInputMessage", "given an empty message or undefined worker");
      return -1;
   }
   Bool_t delete_mess = kTRUE;
   TSocket *s = sl->GetSocket();
   if (!s) {
      Warning("HandleInputMessage", "worker socket is undefined");
      return -1;
   }

   // The message type
   Int_t what = mess->What();

   PDB(kGlobal,3)
      Info("HandleInputMessage", "got type %d from '%s'", what, (sl ? sl->GetOrdinal() : "undef"));

   switch (what) {

      case kMESS_OK:
         // Keep the message: it is retrieved later from fRecvMessages
         fRecvMessages->Add(mess);
         delete_mess = kFALSE;
         break;

      case kMESS_OBJECT:
         if (fPlayer) fPlayer->HandleRecvHisto(mess);
         break;

      case kPROOF_FATAL:
         MarkBad(s, "received kPROOF_FATAL");
         if (fProgressDialogStarted) {
            // Finalize the progress dialog
            Emit("StopProcess(Bool_t)", kTRUE);
         }
         break;

      case kPROOF_STOP:
         // Stop collection from this worker
         Info("HandleInputMessage", "received kPROOF_STOP from %s: disabling any further collection this worker",
                                    (sl ? sl->GetOrdinal() : "undef"));
         rc = 1;
         break;

      case kPROOF_GETTREEHEADER:
         // Keep the message: it is retrieved later from fRecvMessages
         fRecvMessages->Add(mess);
         delete_mess = kFALSE;
         rc = 1;
         break;

      case kPROOF_TOUCH:
         {
            sl->Touch();
         }
         break;

      case kPROOF_GETOBJECT:
         // Send back the object with the requested name, if found in the
         // current directory
         mess->ReadString(str, sizeof(str));
         obj = gDirectory->Get(str);
         if (obj)
            s->SendObject(obj);
         else
            s->Send(kMESS_NOTOK);
         break;

      case kPROOF_GETPACKET:
         {
            TDSetElement *elem = 0;
            elem = fPlayer ? fPlayer->GetNextPacket(sl, mess) : 0;

            if (elem != (TDSetElement*) -1) {
               TMessage answ(kPROOF_GETPACKET);
               answ << elem;
               s->Send(answ);

               // Serve the requests that were queued while no packet was
               // available
               while (fWaitingSlaves != 0 && fWaitingSlaves->GetSize()) {
                  TPair *p = (TPair*) fWaitingSlaves->First();
                  s = (TSocket*) p->Key();
                  TMessage *m = (TMessage*) p->Value();

                  // Guard the player here as for the first request above:
                  // it was dereferenced unconditionally
                  elem = fPlayer ? fPlayer->GetNextPacket(sl, m) : 0;
                  if (elem != (TDSetElement*) -1) {
                     TMessage a(kPROOF_GETPACKET);
                     a << elem;
                     s->Send(a);
                     // Remove the pair from the queue and free it
                     fWaitingSlaves->Remove(fWaitingSlaves->FirstLink());
                     delete p;
                     delete m;
                  } else {
                     break;
                  }
               }
            } else {
               // No packet available now: queue the request; the message is
               // owned by the queue from now on
               if (fWaitingSlaves == 0) fWaitingSlaves = new TList;
               fWaitingSlaves->Add(new TPair(s, mess));
               delete_mess = kFALSE;
            }
         }
         break;

      case kPROOF_LOGFILE:
         {
            Int_t size;
            (*mess) >> size;
            PDB(kGlobal,2)
               Info("HandleInputMessage","kPROOF_LOGFILE: size: %d", size);
            RecvLogFile(s, size);
         }
         break;

      case kPROOF_LOGDONE:
         (*mess) >> sl->fStatus >> sl->fParallel;
         PDB(kGlobal,2)
            Info("HandleInputMessage","kPROOF_LOGDONE:%s: status %d  parallel %d",
                 sl->GetOrdinal(), sl->fStatus, sl->fParallel);
         // Only a non-zero worker status overrides the session status
         if (sl->fStatus != 0) fStatus = sl->fStatus;
         rc = 1;
         break;

      case kPROOF_GETSTATS:
         {
            (*mess) >> sl->fBytesRead >> sl->fRealTime >> sl->fCpuTime
                  >> sl->fWorkDir >> sl->fProofWorkDir;
            // The image string is optional in the message
            TString img;
            if ((mess->BufferSize() > mess->Length()))
               (*mess) >> img;
            if (img.IsNull()) {
               // Build a default image from host name and work directory,
               // unless one is already set
               if (sl->fImage.IsNull())
                  sl->fImage = Form("%s:%s", TUrl(sl->fName).GetHostFQDN(),
                                             sl->fProofWorkDir.Data());
            } else {
               sl->fImage = img;
            }
            PDB(kGlobal,2)
               Info("HandleInputMessage",
                        "kPROOF_GETSTATS:%s image: %s", sl->GetOrdinal(), sl->GetImage());

            // Accumulate the session totals
            fBytesRead += sl->fBytesRead;
            fRealTime  += sl->fRealTime;
            fCpuTime   += sl->fCpuTime;
            rc = 1;
         }
         break;

      case kPROOF_GETPARALLEL:
         {
            Bool_t async = kFALSE;
            (*mess) >> sl->fParallel;
            // The asynchronous flag is optional in the message
            if ((mess->BufferSize() > mess->Length()))
               (*mess) >> async;
            rc = (async) ? 0 : 1;
         }
         break;

      case kPROOF_CHECKFILE:
         {
            // The status is optional in the message; 1 is assumed if missing
            if ((mess->BufferSize() > mess->Length())) {
               (*mess) >> fCheckFileStatus;
            } else {
               fCheckFileStatus = 1;
            }
            rc = 1;
         }
         break;

      case kPROOF_SENDFILE:
         {
            rc = 1;
         }
         break;

      case kPROOF_PACKAGE_LIST:
         {
            PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_PACKAGE_LIST: enter");
            Int_t type = 0;
            (*mess) >> type;
            switch (type) {
               case TProof::kListEnabledPackages:
                  SafeDelete(fEnabledPackages);
                  fEnabledPackages = (TList *) mess->ReadObject(TList::Class());
                  if (fEnabledPackages) {
                     fEnabledPackages->SetOwner();
                  } else {
                     Error("HandleInputMessage",
                           "kPROOF_PACKAGE_LIST: kListEnabledPackages: TList not found in message!");
                  }
                  break;
               case TProof::kListPackages:
                  SafeDelete(fAvailablePackages);
                  fAvailablePackages = (TList *) mess->ReadObject(TList::Class());
                  if (fAvailablePackages) {
                     fAvailablePackages->SetOwner();
                  } else {
                     Error("HandleInputMessage",
                           "kPROOF_PACKAGE_LIST: kListPackages: TList not found in message!");
                  }
                  break;
               default:
                  Error("HandleInputMessage", "kPROOF_PACKAGE_LIST: unknown type: %d", type);
            }
         }
         break;

      case kPROOF_OUTPUTOBJECT:
         {
            PDB(kGlobal,2)
               Info("HandleInputMessage","kPROOF_OUTPUTOBJECT: enter");
            Int_t type = 0;

            // The message may carry several (type, object) pairs
            while ((mess->BufferSize() > mess->Length())) {
               (*mess) >> type;
               if (fPlayer) {
                  if (type == 0) {
                     // Type 0: the query result header
                     TQueryResult *pq =
                        (TQueryResult *) mess->ReadObject(TQueryResult::Class());
                     if (pq) {
                        // Register the query and make it the current one
                        fPlayer->AddQueryResult(pq);
                        fPlayer->SetCurrentQuery(pq);
                        // Start from an empty output list
                        if (fPlayer->GetOutputList())
                           fPlayer->GetOutputList()->Clear();
                        // Tag the input with the query identifier
                        fPlayer->AddInput(new TNamed("PROOF_QueryTag",
                                          Form("%s:%s",pq->GetTitle(),pq->GetName())));
                     } else {
                        Warning("HandleInputMessage","kPROOF_OUTPUTOBJECT: query result missing");
                     }
                  } else if (type > 0) {
                     // Type > 0: an output object
                     TObject *o = mess->ReadObject(TObject::Class());
                     // Report the merging progress upstream, if on a server
                     if (gProofServ) {
                        fMergePrg.IncreaseIdx();
                        TString msg;
                        msg.Form("%s: merging output objects ... %s", gProofServ->GetPrefix(), fMergePrg.Export());
                        gProofServ->SendAsynMessage(msg.Data(), kFALSE);
                     }
                     // A return code of 1 means the object was not adopted
                     // by the player and must be deleted here
                     if ((fPlayer->AddOutputObject(o) == 1)) {
                        SafeDelete(o);
                     }
                     if (type > 1) {
                        // Type > 1: last object from this worker
                        fMergePrg.DecreaseNWrks();
                        if (TestBit(TProof::kIsClient) && !IsLite()) {
                           // Attach the lists to the current query, if any:
                           // the query pointer was dereferenced unchecked
                           TQueryResult *pq = fPlayer->GetCurrentQuery();
                           if (pq) {
                              pq->SetOutputList(fPlayer->GetOutputList(), kFALSE);
                              pq->SetInputList(fPlayer->GetInputList(), kFALSE);
                              QueryResultReady(Form("%s:%s", pq->GetTitle(), pq->GetName()));
                           }
                           UpdateDialog();
                        }
                     }
                  }
               } else {
                  Warning("HandleInputMessage", "kPROOF_OUTPUTOBJECT: player undefined!");
               }
            }
         }
         break;

      case kPROOF_OUTPUTLIST:
         {
            PDB(kGlobal,2)
               Info("HandleInputMessage","kPROOF_OUTPUTLIST: enter");
            TList *out = 0;
            if (fPlayer) {
               if (TestBit(TProof::kIsMaster) || fProtocol < 7) {
                  // The message carries the bare output list
                  out = (TList *) mess->ReadObject(TList::Class());
               } else {
                  // The message carries a query result holding the list
                  TQueryResult *pq =
                     (TQueryResult *) mess->ReadObject(TQueryResult::Class());
                  if (pq) {
                     fPlayer->AddQueryResult(pq);
                     fPlayer->SetCurrentQuery(pq);
                     out = pq->GetOutputList();
                     CleanGDirectory(out);
                     // Work on a copy; the query result may carry no output
                     // list, in which case there is nothing to clone
                     if (out) out = (TList *) out->Clone();
                     QueryResultReady(Form("%s:%s", pq->GetTitle(), pq->GetName()));
                  } else {
                     PDB(kGlobal,2)
                        Info("HandleInputMessage","kPROOF_OUTPUTLIST: query result missing");
                  }
               }
               if (out) {
                  out->SetOwner();
                  fPlayer->AddOutput(out);
                  SafeDelete(out);
               } else {
                  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_OUTPUTLIST: ouputlist is empty");
               }
            } else {
               Warning("HandleInputMessage", "kPROOF_OUTPUTLIST: player undefined!");
            }
            if (TestBit(TProof::kIsClient) && !IsLite())
               UpdateDialog();
         }
         break;

      case kPROOF_QUERYLIST:
         {
            PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_QUERYLIST: enter");
            (*mess) >> fOtherQueries >> fDrawQueries;
            // Replace the current list of queries
            if (fQueries) {
               fQueries->Delete();
               delete fQueries;
               fQueries = 0;
            }
            fQueries = (TList *) mess->ReadObject(TList::Class());
         }
         break;

      case kPROOF_RETRIEVE:
         {
            PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_RETRIEVE: enter");
            TQueryResult *pq =
               (TQueryResult *) mess->ReadObject(TQueryResult::Class());
            if (pq && fPlayer) {
               fPlayer->AddQueryResult(pq);
               QueryResultReady(Form("%s:%s", pq->GetTitle(), pq->GetName()));
            } else {
               PDB(kGlobal,2)
                  Info("HandleInputMessage","kPROOF_RETRIEVE: query result missing or player undefined");
            }
         }
         break;

      case kPROOF_MAXQUERIES:
         {
            PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_MAXQUERIES: enter");
            Int_t max = 0;

            (*mess) >> max;
            Printf("Number of queries fully kept remotely: %d", max);
         }
         break;

      case kPROOF_SERVERSTARTED:
         {
            PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_SERVERSTARTED: enter");

            UInt_t tot = 0, done = 0;
            TString action;
            Bool_t st = kTRUE;

            (*mess) >> action >> tot >> done >> st;

            if (TestBit(TProof::kIsClient)) {
               if (tot) {
                  TString type = (action.Contains("submas")) ? "submasters"
                                                             : "workers";
                  Int_t frac = (Int_t) (done*100.)/tot;
                  char msg[512] = {0};
                  // 'action' comes from the remote side and has no length
                  // limit: bound the write to the buffer size
                  if (frac >= 100) {
                     snprintf(msg, sizeof(msg), "%s: OK (%d %s) \n",
                              action.Data(),tot, type.Data());
                  } else {
                     snprintf(msg, sizeof(msg), "%s: %d out of %d (%d %%)\r",
                              action.Data(), done, tot, frac);
                  }
                  if (fSync)
                     fprintf(stderr,"%s", msg);
                  else
                     NotifyLogMsg(msg, 0);
               }
               // Notify GUIs
               StartupMessage(action.Data(), st, (Int_t)done, (Int_t)tot);
            } else {
               // Not the client: just forward the message upstream
               TMessage m(kPROOF_SERVERSTARTED);
               m << action << tot << done << st;
               gProofServ->GetSocket()->Send(m);
            }
         }
         break;

      case kPROOF_DATASET_STATUS:
         {
            PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_DATASET_STATUS: enter");

            UInt_t tot = 0, done = 0;
            TString action;
            Bool_t st = kTRUE;

            (*mess) >> action >> tot >> done >> st;

            if (TestBit(TProof::kIsClient)) {
               if (tot) {
                  TString type = "files";
                  Int_t frac = (Int_t) (done*100.)/tot;
                  char msg[512] = {0};
                  // 'action' comes from the remote side and has no length
                  // limit: bound the write to the buffer size
                  if (frac >= 100) {
                     snprintf(msg, sizeof(msg), "%s: OK (%d %s) \n",
                              action.Data(),tot, type.Data());
                  } else {
                     snprintf(msg, sizeof(msg), "%s: %d out of %d (%d %%)\r",
                              action.Data(), done, tot, frac);
                  }
                  if (fSync)
                     fprintf(stderr,"%s", msg);
                  else
                     NotifyLogMsg(msg, 0);
               }
               // Notify GUIs
               DataSetStatus(action.Data(), st, (Int_t)done, (Int_t)tot);
            } else {
               // Not the client: just forward the message upstream
               TMessage m(kPROOF_DATASET_STATUS);
               m << action << tot << done << st;
               gProofServ->GetSocket()->Send(m);
            }
         }
         break;

      case kPROOF_STARTPROCESS:
         {
            PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_STARTPROCESS: enter");

            // For PROOF-Lite the idle counter is managed elsewhere
            if (!IsLite()) {
               fNotIdle = 1;
               fIsWaiting = kFALSE;
            }

            // Redirect the output when not in synchronous mode
            fRedirLog = (fSync) ? fRedirLog : kTRUE;

            // The progress dialog is handled only outside masters
            if (!TestBit(TProof::kIsMaster)) {
               TString selec;
               Int_t dsz = -1;
               Long64_t first = -1, nent = -1;
               (*mess) >> selec >> dsz >> first >> nent;
               if (!gROOT->IsBatch()) {
                  if (fProgressDialog && !TestBit(kUsingSessionGui)) {
                     if (!fProgressDialogStarted) {
                        fProgressDialog->ExecPlugin(5, this,
                                                   selec.Data(), dsz, first, nent);
                        fProgressDialogStarted = kTRUE;
                     } else {
                        ResetProgressDialog(selec, dsz, first, nent);
                     }
                  }
                  ResetBit(kUsingSessionGui);
               }
            }
         }
         break;

      case kPROOF_ENDINIT:
         {
            PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_ENDINIT: enter");

            if (TestBit(TProof::kIsMaster)) {
               if (fPlayer)
                  fPlayer->SetInitTime();
            }
         }
         break;

      case kPROOF_SETIDLE:
         {
            PDB(kGlobal,2)
               Info("HandleInputMessage","kPROOF_SETIDLE: enter");

            // With PROOF-Lite the counter is decremented once per message;
            // otherwise it is simply reset
            if (IsLite()) {
               if (fNotIdle > 0) {
                  fNotIdle--;
               } else {
                  Warning("HandleInputMessage", "got kPROOF_SETIDLE but no running workers ! protocol error?");
               }
            } else {
               fNotIdle = 0;
               // The waiting flag is optional in the message
               if ((mess->BufferSize() > mess->Length()))
                  (*mess) >> fIsWaiting;
            }
         }
         break;

      case kPROOF_QUERYSUBMITTED:
         {
            PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_QUERYSUBMITTED: enter");

            // The query sequence number, followed by an optional sync flag
            (*mess) >> fSeqNum;
            Bool_t sync = fSync;
            if ((mess->BufferSize() > mess->Length()))
               (*mess) >> sync;
            if (sync != fSync && fSync) {
               // The remote side switched to asynchronous mode
               Activate();
               fSync = kFALSE;
            }
            DisableGoAsyn();
            fIsWaiting = kTRUE;
            // For PROOF-Lite the idle counter is managed elsewhere
            if (!IsLite())
               fNotIdle = 1;

            rc = 1;
         }
         break;

      case kPROOF_SESSIONTAG:
         {
            PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_SESSIONTAG: enter");

            // The session tag becomes the name of this object
            TString stag;
            (*mess) >> stag;
            SetName(stag);
         }
         break;

      case kPROOF_FEEDBACK:
         {
            PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_FEEDBACK: enter");
            TList *out = (TList *) mess->ReadObject(TList::Class());
            // The list may be missing from the message: it was dereferenced
            // unchecked
            if (out)
               out->SetOwner();
            else
               Warning("HandleInputMessage", "kPROOF_FEEDBACK: feedback list missing in message");
            if (fPlayer) {
               if (out) fPlayer->StoreFeedback(sl, out);
            } else {
               // No player to store the feedback
               rc = 1;
            }
         }
         break;

      case kPROOF_AUTOBIN:
         {
            PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_AUTOBIN: enter");

            TString name;
            Double_t xmin, xmax, ymin, ymax, zmin, zmax;

            (*mess) >> name >> xmin >> xmax >> ymin >> ymax >> zmin >> zmax;

            if (fPlayer) fPlayer->UpdateAutoBin(name,xmin,xmax,ymin,ymax,zmin,zmax);

            // Send the (possibly updated) limits back to the worker
            TMessage answ(kPROOF_AUTOBIN);

            answ << name << xmin << xmax << ymin << ymax << zmin << zmax;

            s->Send(answ);
         }
         break;

      case kPROOF_PROGRESS:
         {
            PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_PROGRESS: enter");

            // The content of the message depends on the remote protocol
            if (GetRemoteProtocol() > 11) {
               Long64_t total, processed, bytesread;
               Float_t initTime, procTime, evtrti, mbrti;
               (*mess) >> total >> processed >> bytesread
                       >> initTime >> procTime
                       >> evtrti >> mbrti;
               if (fPlayer)
                  fPlayer->Progress(sl, total, processed, bytesread,
                                    initTime, procTime, evtrti, mbrti);

            } else {
               Long64_t total, processed;
               (*mess) >> total >> processed;
               if (fPlayer)
                  fPlayer->Progress(sl, total, processed);
            }
         }
         break;

      case kPROOF_STOPPROCESS:
         {
            PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_STOPPROCESS: enter");

            Long64_t events = 0;
            Bool_t abort = kFALSE;
            TProofProgressStatus *status = 0;

            // The content of the message depends on the protocol version
            if ((mess->BufferSize() > mess->Length()) && (fProtocol > 18)) {
               (*mess) >> status >> abort;
            } else if ((mess->BufferSize() > mess->Length()) && (fProtocol > 8)) {
               (*mess) >> events >> abort;
            } else {
               (*mess) >> events;
            }
            if (!abort && fPlayer) {
               if (fProtocol > 18) {
                  // Make sure the "MissingFiles" output list exists
                  TList *listOfMissingFiles = 0;
                  if (!(listOfMissingFiles = (TList *)GetOutput("MissingFiles"))) {
                     listOfMissingFiles = new TList();
                     listOfMissingFiles->SetName("MissingFiles");
                     if (fPlayer)
                        fPlayer->AddOutputObject(listOfMissingFiles);
                  }
                  if (fPlayer->GetPacketizer()) {
                     Int_t ret =
                        fPlayer->GetPacketizer()->AddProcessed(sl, status, 0, &listOfMissingFiles);
                     if (ret > 0)
                        fPlayer->GetPacketizer()->MarkBad(sl, status, &listOfMissingFiles);
                     // The status is not deleted below once it has been
                     // passed to the packetizer
                     status = 0;
                  }
               } else {
                  fPlayer->AddEventsProcessed(events);
               }
            }
            SafeDelete(status);
            if (!TestBit(TProof::kIsMaster))
               Emit("StopProcess(Bool_t)", abort);
            break;
         }

      case kPROOF_GETSLAVEINFO:
         {
            PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_GETSLAVEINFO: enter");

            // Status of the sub-master the list is coming from
            Bool_t active = (GetListOfActiveSlaves()->FindObject(sl) != 0);
            Bool_t bad = (GetListOfBadSlaves()->FindObject(sl) != 0);
            TList* tmpinfo = 0;
            (*mess) >> tmpinfo;
            if (tmpinfo == 0) {
               Error("HandleInputMessage","kPROOF_GETSLAVEINFO: no list received!");
            } else {
               // The entries are moved to fSlaveInfo: the temporary list
               // must not delete them
               tmpinfo->SetOwner(kFALSE);
               Int_t nentries = tmpinfo->GetSize();
               for (Int_t i=0; i<nentries; i++) {
                  TSlaveInfo* slinfo =
                     dynamic_cast<TSlaveInfo*>(tmpinfo->At(i));
                  if (slinfo) {
                     fSlaveInfo->Add(slinfo);
                     // Propagate the status of the sub-master, unless the
                     // node is already flagged as bad
                     if (slinfo->fStatus != TSlaveInfo::kBad) {
                        if (!active) slinfo->SetStatus(TSlaveInfo::kNotActive);
                        if (bad) slinfo->SetStatus(TSlaveInfo::kBad);
                     }
                     if (sl->GetMsd() && (strlen(sl->GetMsd()) > 0))
                        slinfo->fMsd = sl->GetMsd();
                  }
               }
               delete tmpinfo;
               rc = 1;
            }
         }
         break;

      case kPROOF_VALIDATE_DSET:
         {
            PDB(kGlobal,2)
               Info("HandleInputMessage","kPROOF_VALIDATE_DSET: enter");
            TDSet* dset = 0;
            (*mess) >> dset;
            if (!fDSet)
               Error("HandleInputMessage","kPROOF_VALIDATE_DSET: fDSet not set");
            else
               fDSet->Validate(dset);
            delete dset;
         }
         break;

      case kPROOF_DATA_READY:
         {
            PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_DATA_READY: enter");
            Bool_t dataready = kFALSE;
            Long64_t totalbytes, bytesready;
            (*mess) >> dataready >> totalbytes >> bytesready;
            fTotalBytes += totalbytes;
            fBytesReady += bytesready;
            // One negative answer is enough to flag the data as not ready
            if (dataready == kFALSE) fDataReady = dataready;
         }
         break;

      case kPROOF_PING:
         // Nothing to do
         break;

      case kPROOF_MESSAGE:
         {
            PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_MESSAGE: enter");

            // The message text, followed by an optional line-feed flag
            TString msg;
            (*mess) >> msg;
            Bool_t lfeed = kTRUE;
            if ((mess->BufferSize() > mess->Length()))
               (*mess) >> lfeed;

            if (TestBit(TProof::kIsClient)) {
               if (fSync) {
                  // Notify locally
                  fprintf(stderr,"%s%c", msg.Data(), (lfeed ? '\n' : '\r'));
               } else {
                  // Notify the log handlers
                  NotifyLogMsg(msg, (lfeed ? "\n" : "\r"));
               }
            } else {
               // Not the client: print locally and forward upstream
               fprintf(stderr,"%s%c", msg.Data(), (lfeed ? '\n' : '\r'));
               if (gProofServ) {
                  gProofServ->FlushLogFile();
                  gProofServ->SendAsynMessage(msg, lfeed);
               }
            }
         }
         break;

      case kPROOF_VERSARCHCOMP:
         {
            // Expected format, from the parsing below:
            // "<version>|<arch-compiler>"
            TString vac;
            (*mess) >> vac;
            PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_VERSARCHCOMP: %s", vac.Data());
            Int_t from = 0;
            TString vers, archcomp;
            if (vac.Tokenize(vers, from, "|"))
               vac.Tokenize(archcomp, from, "|");
            sl->SetArchCompiler(archcomp);
            vers.ReplaceAll(":","|");
            sl->SetROOTVersion(vers);
         }
         break;

      default:
         {
            Error("HandleInputMessage", "unknown command received from '%s' (what = %d)",
                                        (sl ? sl->GetOrdinal() : "undef"), what);
         }
         break;
   }

   // Cleanup, unless a handler took over the message
   if (delete_mess)
      delete mess;

   return rc;
}
void TProof::UpdateDialog()
{
   // Finalize the progress notification after processing has ended.
   // When the player reports an aborted or stopped exit status, the number
   // of events processed is notified through Progress() and the
   // "StopProcess(Bool_t)" signal is emitted (kTRUE for aborted, kFALSE for
   // stopped). In all cases a final "Progress" signal with all arguments set
   // to -1 is emitted. Nothing is done if there is no player.

   if (!fPlayer)
      return;

   // Processing was aborted
   if (fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted) {
      if (fSync)
         Info("UpdateDialog",
              "processing was aborted - %lld events processed",
              fPlayer->GetEventsProcessed());

      // The extended signature is used only with remote protocol above 11
      if (GetRemoteProtocol() <= 11) {
         Progress(-1, fPlayer->GetEventsProcessed());
      } else {
         Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1.);
      }
      Emit("StopProcess(Bool_t)", kTRUE);
   }

   // Processing was stopped
   if (fPlayer->GetExitStatus() == TVirtualProofPlayer::kStopped) {
      if (fSync)
         Info("UpdateDialog",
              "processing was stopped - %lld events processed",
              fPlayer->GetEventsProcessed());

      if (GetRemoteProtocol() <= 11) {
         Progress(-1, fPlayer->GetEventsProcessed());
      } else {
         Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1.);
      }
      Emit("StopProcess(Bool_t)", kFALSE);
   }

   // Final notification with all the values set to -1
   if (GetRemoteProtocol() <= 11) {
      EmitVA("Progress(Long64_t,Long64_t)", 2, (Long64_t)(-1), (Long64_t)(-1));
   } else {
      EmitVA("Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t)",
             7, (Long64_t)(-1), (Long64_t)(-1), (Long64_t)(-1),
             (Float_t)(-1.),(Float_t)(-1.),(Float_t)(-1.),(Float_t)(-1.));
   }
}
void TProof::ActivateAsyncInput()
{
TIter next(fSlaves);
TSlave *sl;
while ((sl = (TSlave*) next()))
if (sl->GetInputHandler())
sl->GetInputHandler()->Add();
}
void TProof::DeActivateAsyncInput()
{
TIter next(fSlaves);
TSlave *sl;
while ((sl = (TSlave*) next()))
if (sl->GetInputHandler())
sl->GetInputHandler()->Remove();
}
void TProof::MarkBad(TSlave *wrk, const char *reason)
{
// Take worker 'wrk' out of the set of usable workers.
// 'reason' is a free-text explanation used in the notification; the special
// value kPROOF_TerminateWorker indicates a requested termination, in which
// case no error notification is produced and, on a master, the worker is
// removed from all the lists and deleted.
// Nothing is done if the session is not valid or 'wrk' is null.

// Serialise on fCloseMutex (R__LOCKGUARD2 is presumably a scoped lock -
// TODO confirm)
R__LOCKGUARD2(fCloseMutex);
if (!IsValid()) return;
if (!wrk) {
Error("MarkBad", "worker instance undefined: protocol error? ");
return;
}
// URL identifying this session in the notifications; it is built at the
// first call and cached in a function-level static
static TString thisurl;
if (thisurl.IsNull()) {
if (IsMaster()) {
Int_t port = gEnv->GetValue("ProofServ.XpdPort",-1);
thisurl = (port > 0) ? Form("%s:%d", TUrl(gSystem->HostName()).GetHostFQDN(), port)
: TUrl(gSystem->HostName()).GetHostFQDN();
} else {
thisurl = Form("%s@%s:%d", fUrl.GetUser(), fUrl.GetHost(), fUrl.GetPort());
}
}
// Any reason other than a requested termination (including none at all)
// produces a diagnostic message
if (!reason || strcmp(reason, kPROOF_TerminateWorker)) {
const char *mastertype = (gProofServ && gProofServ->IsTopMaster()) ? "top master" : "master";
TString src = IsMaster() ? Form("%s at %s", mastertype, thisurl.Data()) : "local session";
TString msg(Form("\n +++ Message from %s : ", src.Data()));
msg += Form("marking %s:%d (%s) as bad\n +++ Reason: %s",
wrk->GetName(), wrk->GetPort(), wrk->GetOrdinal(),
(reason && strlen(reason)) ? reason : "unknown");
Info("MarkBad", "%s", msg.Data());
// Add hints for the user. Without gProofServ the text built so far is
// replaced rather than extended; it has already been printed by Info() above
if (gProofServ) {
msg += Form("\n\n +++ Most likely your code crashed on worker %s at %s:%d.\n",
wrk->GetOrdinal(), wrk->GetName(), wrk->GetPort());
} else {
msg = Form("\n\n +++ Most likely your code crashed\n");
}
msg += Form(" +++ Please check the session logs for error messages either using\n");
msg += Form(" +++ the 'Show logs' button or executing\n");
msg += Form(" +++\n");
// With gProofServ the message is sent asynchronously through it; otherwise
// it is printed locally
if (gProofServ) {
msg += Form(" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->Display(\"%s\",0)\n\n",
thisurl.Data(), wrk->GetOrdinal());
gProofServ->SendAsynMessage(msg, kTRUE);
} else {
msg += Form(" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->Display(\"*\")\n\n",
thisurl.Data());
Printf("%s", msg.Data());
}
} else if (reason) {
// Requested termination: only a debug notification
if (gDebug > 0) {
Info("MarkBad", "worker %s at %s:%d asked to terminate",
wrk->GetOrdinal(), wrk->GetName(), wrk->GetPort());
}
}
if (IsMaster() && reason) {
if (strcmp(reason, kPROOF_TerminateWorker)) {
// Real failure: make sure the "MissingFiles" output list exists and let
// the packetizer, if any, account for the lost worker
TList *listOfMissingFiles = 0;
if (!(listOfMissingFiles = (TList *)GetOutput("MissingFiles"))) {
listOfMissingFiles = new TList();
listOfMissingFiles->SetName("MissingFiles");
if (fPlayer)
fPlayer->AddOutputObject(listOfMissingFiles);
}
TVirtualPacketizer *packetizer = fPlayer ? fPlayer->GetPacketizer() : 0;
if (packetizer) {
packetizer->MarkBad(wrk, 0, &listOfMissingFiles);
}
} else {
// Requested termination: release the worker through gProofServ, using only
// the part of the ordinal after the last '.'
if (gProofServ) {
TString ord(wrk->GetOrdinal());
Int_t id = ord.Last('.');
if (id != kNPOS) ord.Remove(0, id+1);
gProofServ->ReleaseWorker(ord.Data());
}
}
}
// The worker is no longer active: update the list of unique workers, stop
// monitoring its socket and flag that the group view must be sent again
fActiveSlaves->Remove(wrk);
FindUniqueSlaves();
fAllMonitor->Remove(wrk->GetSocket());
fActiveMonitor->Remove(wrk->GetSocket());
fSendGroupView = kTRUE;
if (IsMaster()) {
if (reason && !strcmp(reason, kPROOF_TerminateWorker)) {
// Requested termination: remove the worker from every list and delete it;
// 'wrk' must not be used after this point
fSlaves->Remove(wrk);
fBadSlaves->Remove(wrk);
fActiveSlaves->Remove(wrk);
fInactiveSlaves->Remove(wrk);
fUniqueSlaves->Remove(wrk);
fAllUniqueSlaves->Remove(wrk);
fNonUniqueMasters->Remove(wrk);
delete wrk;
} else {
// Failure: keep the worker in the bad list and close its connection
fBadSlaves->Add(wrk);
wrk->Close();
}
SaveWorkerInfo();
} else {
// Not a master: drop the entry from fSlaves and, if a manager is set, ask
// it to discard this session
fSlaves->Remove(wrk);
if (fManager)
fManager->DiscardSession(this);
}
}
void TProof::MarkBad(TSocket *s, const char *reason)
{
   // Look up the worker attached to socket 's' and mark it as bad, passing
   // 'reason' through. Nothing is done if the session is not valid.

   R__LOCKGUARD2(fCloseMutex);

   if (IsValid()) {
      MarkBad(FindSlave(s), reason);
   }
}
void TProof::TerminateWorker(TSlave *wrk)
{
   // Ask worker 'wrk' to terminate: a kPROOF_STOP message is sent if its
   // socket is defined and valid, then the worker is handed to MarkBad()
   // with reason kPROOF_TerminateWorker. A null 'wrk' only produces a
   // warning.

   if (!wrk) {
      Warning("TerminateWorker", "worker instance undefined: protocol error? ");
      return;
   }

   TSocket *sock = wrk->GetSocket();
   const Bool_t connected = (sock && sock->IsValid());

   if (!connected) {
      // Nothing can be sent: just notify when debugging
      if (gDebug > 0)
         Info("TerminateWorker", "connection to worker is already down: cannot"
                                 " send termination message");
   } else {
      TMessage stopmsg(kPROOF_STOP);
      sock->Send(stopmsg);
   }

   // Take the worker out of the session
   MarkBad(wrk, kPROOF_TerminateWorker);
}
void TProof::TerminateWorker(const char *ord)
{
if (ord && strlen(ord) > 0) {
Bool_t all = (ord[0] == '*') ? kTRUE : kFALSE;
if (IsMaster()) {
TIter nxw(fSlaves);
TSlave *wrk = 0;
while ((wrk = (TSlave *)nxw())) {
if (all || !strcmp(wrk->GetOrdinal(), ord)) {
TerminateWorker(wrk);
if (!all) break;
}
}
} else {
TMessage mess(kPROOF_STOP);
mess << TString(ord);
Broadcast(mess);
}
}
}
Int_t TProof::Ping()
{
   // Ping the active workers; returns the result of Ping(kActive).

   const Int_t nresp = Ping(kActive);
   return nresp;
}
Int_t TProof::Ping(ESlaves list)
{
   // Ping the valid workers of the set selected by 'list'. Workers whose
   // Ping() returns -1 are marked as bad.
   // Returns the number of workers for which the ping did not fail, 0 when
   // the selected set is undefined or empty.

   TList *slaves = 0;
   if (list == kAll)       slaves = fSlaves;
   if (list == kActive)    slaves = fActiveSlaves;
   if (list == kUnique)    slaves = fUniqueSlaves;
   if (list == kAllUnique) slaves = fAllUniqueSlaves;

   // 'slaves' stays null when 'list' matches none of the selectors above;
   // guard against it like Broadcast(const TMessage &, TList *) does
   if (!slaves || slaves->GetSize() == 0) return 0;

   int nsent = 0;
   TIter next(slaves);

   TSlave *sl;
   while ((sl = (TSlave *)next())) {
      if (sl->IsValid()) {
         if (sl->Ping() == -1) {
            MarkBad(sl, "ping unsuccessful");
         } else {
            nsent++;
         }
      }
   }

   return nsent;
}
void TProof::Touch()
{
TList *slaves = fSlaves;
if (slaves->GetSize() == 0) return;
TIter next(slaves);
TSlave *sl;
while ((sl = (TSlave *)next())) {
if (sl->IsValid()) {
sl->Touch();
}
}
return;
}
void TProof::Print(Option_t *option) const
{
// Print status of the PROOF session.
// On a client (kIsClient bit set) the connection parameters are printed and,
// if the session is valid, the print request is forwarded via SendPrint().
// Otherwise the master-side summary is printed after refreshing the
// statistics with AskStatistics(); if 'option' contains 'a' (any case) and
// there are workers, the list of workers is printed as well.
TString secCont;
if (TestBit(TProof::kIsClient)) {
Printf("Connected to: %s (%s)", GetMaster(),
IsValid() ? "valid" : "invalid");
Printf("Port number: %d", GetPort());
Printf("User: %s", GetUser());
// The revision is shown only when it is a positive number
if (gROOT->GetSvnRevision() > 0)
Printf("ROOT version|rev: %s|r%d", gROOT->GetVersion(), gROOT->GetSvnRevision());
else
Printf("ROOT version: %s", gROOT->GetVersion());
Printf("Architecture-Compiler: %s-%s", gSystem->GetBuildArch(),
gSystem->GetBuildCompilerVersion());
// Security context and proofd protocol are taken from the first active worker
TSlave *sl = (TSlave *)fActiveSlaves->First();
if (sl) {
TString sc;
if (sl->GetSocket()->GetSecContext())
Printf("Security context: %s",
sl->GetSocket()->GetSecContext()->AsString(sc));
Printf("Proofd protocol version: %d", sl->GetSocket()->GetRemoteProtocol());
} else {
Printf("Security context: Error - No connection");
Printf("Proofd protocol version: Error - No connection");
}
Printf("Client protocol version: %d", GetClientProtocol());
Printf("Remote protocol version: %d", GetRemoteProtocol());
Printf("Log level: %d", GetLogLevel());
Printf("Session unique tag: %s", IsValid() ? GetSessionTag() : "");
Printf("Default data pool: %s", IsValid() ? GetDataPoolUrl() : "");
// Forward the request to the remote side (SendPrint is non-const)
if (IsValid())
const_cast<TProof*>(this)->SendPrint(option);
} else {
// Not a client: refresh the statistics before printing them
const_cast<TProof*>(this)->AskStatistics();
if (IsParallel())
Printf("*** Master server %s (parallel mode, %d workers):",
gProofServ->GetOrdinal(), GetParallel());
else
Printf("*** Master server %s (sequential mode):",
gProofServ->GetOrdinal());
Printf("Master host name: %s", gSystem->HostName());
Printf("Port number: %d", GetPort());
if (strlen(gProofServ->GetGroup()) > 0) {
Printf("User/Group: %s/%s", GetUser(), gProofServ->GetGroup());
} else {
Printf("User: %s", GetUser());
}
// Version string: version, optionally "|r<rev>" and "|<ROOTVERSIONTAG>"
TString ver(gROOT->GetVersion());
if (gROOT->GetSvnRevision() > 0)
ver += Form("|r%d", gROOT->GetSvnRevision());
if (gSystem->Getenv("ROOTVERSIONTAG"))
ver += Form("|%s", gSystem->Getenv("ROOTVERSIONTAG"));
Printf("ROOT version|rev|tag: %s", ver.Data());
Printf("Architecture-Compiler: %s-%s", gSystem->GetBuildArch(),
gSystem->GetBuildCompilerVersion());
Printf("Protocol version: %d", GetClientProtocol());
Printf("Image name: %s", GetImage());
Printf("Working directory: %s", gSystem->WorkingDirectory());
Printf("Config directory: %s", GetConfDir());
Printf("Config file: %s", GetConfFile());
Printf("Log level: %d", GetLogLevel());
Printf("Number of workers: %d", GetNumberOfSlaves());
Printf("Number of active workers: %d", GetNumberOfActiveSlaves());
Printf("Number of unique workers: %d", GetNumberOfUniqueSlaves());
Printf("Number of inactive workers: %d", GetNumberOfInactiveSlaves());
Printf("Number of bad workers: %d", GetNumberOfBadSlaves());
Printf("Total MB's processed: %.2f", float(GetBytesRead())/(1024*1024));
Printf("Total real time used (s): %.3f", GetRealTime());
Printf("Total CPU time used (s): %.3f", GetCpuTime());
if (TString(option).Contains("a", TString::kIgnoreCase) && GetNumberOfSlaves()) {
Printf("List of workers:");
// Sub-masters that accepted the print request; their output is collected below
TList masters;
TIter nextslave(fSlaves);
while (TSlave* sl = dynamic_cast<TSlave*>(nextslave())) {
if (!sl->IsValid()) continue;
if (sl->GetSlaveType() == TSlave::kSlave) {
// Plain worker: print locally
sl->Print(option);
} else if (sl->GetSlaveType() == TSlave::kMaster) {
// Sub-master: send it a kPROOF_PRINT request
TMessage mess(kPROOF_PRINT);
mess.WriteString(option);
if (sl->GetSocket()->Send(mess) == -1)
const_cast<TProof*>(this)->MarkBad(sl, "could not send kPROOF_PRINT request");
else
masters.Add(sl);
} else {
Error("Print", "TSlave is neither Master nor Worker");
R__ASSERT(0);
}
}
// Collect the answers from the sub-masters contacted above
const_cast<TProof*>(this)->Collect(&masters, fCollectTimeout);
}
}
}
Long64_t TProof::Process(TDSet *dset, const char *selector, Option_t *option,
                         Long64_t nentries, Long64_t first)
{
   // Process data set 'dset' with 'selector' through the player.
   // The query mode is derived from 'option' via GetQueryMode(); a
   // synchronous request issued while the session is busy or waiting is
   // turned into an asynchronous one. Returns -1 if the session is invalid
   // or there is no player, otherwise the value returned by the player.

   if (!IsValid() || !fPlayer) return -1;

   // Set the run status
   SetRunStatus(TProof::kRunning);

   // Resolve the query mode
   fSync = (GetQueryMode(option) == kSync);

   TString opt(option);
   if (fSync && (!IsIdle() || IsWaiting())) {
      // Already queued or processing: cannot run synchronously
      Info("Process", "session is in waiting or processing status: switch to asynchronous mode");
      fSync = kFALSE;
      opt.ReplaceAll("SYNC","");
      opt += "ASYN";
   }

   // Drop the data sets kept from previous asynchronous runs once idle
   if ((IsIdle() && !IsWaiting()) && fRunningDSets && fRunningDSets->GetSize() > 0) {
      fRunningDSets->SetOwner(kTRUE);
      fRunningDSets->Delete();
   }

   // In synchronous mode the application signal handler is removed for
   // the duration of the processing and re-installed afterwards
   TSignalHandler *savedHandler = 0;
   if (fSync && gApplication)
      savedHandler = gSystem->RemoveSignalHandler(gApplication->GetSignalHandler());

   const Long64_t rv = fPlayer->Process(dset, selector, opt.Data(), nentries, first);

   if (fSync && savedHandler)
      gSystem->AddSignalHandler(savedHandler);

   return rv;
}
Long64_t TProof::Process(TFileCollection *fc, const char *selector,
                         Option_t *option, Long64_t nentries, Long64_t first)
{
   // Process the file collection 'fc' with 'selector'.
   // The collection is added to the player input list for the duration of
   // the call and a placeholder TDSet named "TFileCollection:<name>" is
   // processed. Returns -1 if the session is invalid, there is no player,
   // the protocol is older than 17 or 'fc' is null; otherwise the value
   // returned by Process(TDSet *, ...).

   if (!IsValid() || !fPlayer) return -1;

   if (fProtocol < 17) {
      Info("Process", "server version < 5.18/00:"
                      " processing of TFileCollection not supported");
      return -1;
   }

   // 'fc' is dereferenced below: refuse a null collection
   if (!fc) {
      Error("Process", "file collection undefined");
      return -1;
   }

   // Placeholder data set carrying the collection name
   TDSet *dset = new TDSet(Form("TFileCollection:%s", fc->GetName()), 0, 0, "");
   fPlayer->AddInput(fc);
   Long64_t retval = Process(dset, selector, option, nentries, first);
   fPlayer->GetInputList()->Remove(fc);

   if (IsLite() && !fSync) {
      // Asynchronous PROOF-Lite run: the data set is kept in
      // fRunningDSets, which a later Process() call deletes
      if (!fRunningDSets) fRunningDSets = new TList;
      fRunningDSets->Add(dset);
   } else {
      delete dset;
   }
   return retval;
}
Long64_t TProof::Process(const char *dsetname, const char *selector,
                         Option_t *option, Long64_t nentries,
                         Long64_t first, TObject *enl)
{
   // Process a data set given by name.
   // 'dsetname' has the form "name[#[dir/]objname]": the part after '#'
   // gives the object name and, when it contains a '/', the directory.
   // 'enl' is set as entry list on the TDSet built from the parsed fields.
   // Returns -1 if the protocol is older than 13 or the name contains a
   // ':' that is not part of "://" without any '#'; otherwise the value
   // returned by Process(TDSet *, ...).

   if (fProtocol < 13) {
      Info("Process", "processing 'by name' not supported by the server");
      return -1;
   }

   TString name(dsetname);
   TString obj;
   TString dir("/");

   const Int_t hashPos = name.Index("#");
   if (hashPos == kNPOS) {
      if (name.Index(":") != kNPOS && name.Index("://") == kNPOS) {
         // Name with attributes but without the '#' separator
         Error("Process", "bad name syntax (%s): please use"
                          " a '#' after the dataset name", dsetname);
         return -1;
      }
   } else {
      const Int_t slashPos = name.Index("/", 1, hashPos, TString::kExact);
      if (slashPos == kNPOS) {
         // Only the object name follows the '#'
         obj = name(hashPos+1, name.Length());
      } else {
         // Directory and object name follow the '#'
         obj = name(slashPos+1, name.Length());
         dir = name(hashPos+1, name.Length());
         dir.Remove(dir.Index("/") + 1);
      }
      name.Remove(hashPos);
   }

   TDSet *dset = new TDSet(name, obj, dir);
   dset->SetEntryList(enl);
   const Long64_t retval = Process(dset, selector, option, nentries, first);

   if (!IsLite() || fSync) {
      delete dset;
   } else {
      // Asynchronous PROOF-Lite run: park the data set in fRunningDSets
      if (!fRunningDSets) fRunningDSets = new TList;
      fRunningDSets->Add(dset);
   }
   return retval;
}
Long64_t TProof::Process(const char *selector, Long64_t n, Option_t *option)
{
   // Generic (non-data based) processing: 'selector' is run 'n' times on
   // an empty data set (TDSet::kEmpty bit set). Returns -1 if the session
   // is invalid or the protocol is older than 16; otherwise the value
   // returned by Process(TDSet *, ...).

   if (!IsValid()) return -1;

   if (fProtocol < 16) {
      Info("Process", "server version < 5.17/04: generic processing not supported");
      return -1;
   }

   // Fake data set flagged as empty
   TDSet *dset = new TDSet;
   dset->SetBit(TDSet::kEmpty);

   const Long64_t retval = Process(dset, selector, option, n);

   if (!IsLite() || fSync) {
      delete dset;
   } else {
      // Asynchronous PROOF-Lite run: park the data set in fRunningDSets
      if (!fRunningDSets) fRunningDSets = new TList;
      fRunningDSets->Add(dset);
   }
   return retval;
}
Int_t TProof::GetQueryReference(Int_t qry, TString &ref)
{
   // Fill 'ref' with the "<title>:<name>" reference of the query whose
   // sequential number is 'qry'. 'ref' is always reset first. The list of
   // queries is fetched with GetListOfQueries() if not yet available.
   // Returns 0 on success, -1 if 'qry' is not positive or no match exists.

   ref = "";
   if (qry <= 0) return -1;

   if (!fQueries)
      GetListOfQueries();
   if (!fQueries) return -1;

   TIter nxq(fQueries);
   while (TQueryResult *qr = (TQueryResult *) nxq()) {
      if (qr->GetSeqNum() != qry) continue;
      ref = Form("%s:%s", qr->GetTitle(), qr->GetName());
      return 0;
   }
   return -1;
}
Long64_t TProof::Finalize(Int_t qry, Bool_t force)
{
   // Finalize the query with sequential number 'qry'. A non-positive
   // 'qry' finalizes with an empty reference. Returns -1 if there is no
   // player or the query is not found; otherwise the value returned by
   // Finalize(const char *, Bool_t).

   if (!fPlayer) return -1;

   if (qry <= 0)
      return Finalize("", force);

   TString ref;
   if (GetQueryReference(qry, ref) == 0)
      return Finalize(ref, force);

   Info("Finalize", "query #%d not found", qry);
   return -1;
}
Long64_t TProof::Finalize(const char *ref, Bool_t force)
{
   // Finalize the query identified by 'ref'; with a null or empty 'ref'
   // the result of GetQueryResult() is used. The query is retrieved first
   // when it is not known locally (and 'ref' is given), when it is not
   // yet finalized, or when 'force' is set. Returns -1 if there is no
   // player or nothing is left to finalize; otherwise the value returned
   // by the player.

   if (!fPlayer) return -1;

   const Bool_t hasRef = (ref && strlen(ref) > 0);
   TQueryResult *qr = hasRef ? fPlayer->GetQueryResult(ref) : GetQueryResult();

   TString xref(ref);
   Bool_t retrieve = kFALSE;

   if (!qr) {
      // Unknown locally: worth asking only if a reference was given
      retrieve = xref.IsNull() ? kFALSE : kTRUE;
   } else if (!qr->IsFinalized()) {
      retrieve = kTRUE;
      xref.Form("%s:%s", qr->GetTitle(), qr->GetName());
   } else if (force) {
      retrieve = kTRUE;
   } else {
      Info("Finalize","query already finalized:"
           " use Finalize(<qry>,kTRUE) to force new retrieval");
      qr = 0;
   }

   if (retrieve) {
      Retrieve(xref.Data());
      qr = fPlayer->GetQueryResult(xref.Data());
   }

   return qr ? fPlayer->Finalize(qr) : -1;
}
Int_t TProof::Retrieve(Int_t qry, const char *path)
{
   // Retrieve the query with sequential number 'qry', forwarding 'path'
   // to Retrieve(const char *, const char *). Returns -1 if 'qry' is not
   // positive or the query is not found.

   if (qry <= 0) {
      Info("Retrieve","positive argument required - do nothing");
      return -1;
   }

   TString ref;
   if (GetQueryReference(qry, ref) != 0) {
      Info("Retrieve", "query #%d not found", qry);
      return -1;
   }
   return Retrieve(ref, path);
}
Int_t TProof::Retrieve(const char *ref, const char *path)
{
   // Send a kPROOF_RETRIEVE request for query 'ref' to the active workers
   // and collect the answer. If 'path' is given, the retrieved query
   // result is also written to that file, opened in "UPDATE" mode.
   // Returns 0 on success (also when the archive file cannot be opened),
   // -1 if 'ref' is null or the query is not found after the retrieve.

   if (ref) {
      TMessage m(kPROOF_RETRIEVE);
      m << TString(ref);
      Broadcast(m, kActive);
      Collect(kActive, fCollectTimeout);

      // Archive it locally, if required
      if (path) {

         // Get the query result for this reference
         TQueryResult *qr = fPlayer ? fPlayer->GetQueryResult(ref) : 0;

         if (qr) {

            // TFile::Open may return a null pointer: check it before use
            // and release the object when the file is not usable
            TFile *farc = TFile::Open(path,"UPDATE");
            if (!farc || !(farc->IsOpen())) {
               Info("Retrieve", "archive file cannot be open (%s)", path);
               SafeDelete(farc);
               return 0;
            }
            farc->cd();

            // Update query status
            qr->SetArchived(path);

            // Write to file
            qr->Write();

            farc->Close();
            SafeDelete(farc);

         } else {
            Info("Retrieve", "query not found after retrieve");
            return -1;
         }
      }

      return 0;
   }
   return -1;
}
Int_t TProof::Remove(Int_t qry, Bool_t all)
{
   // Remove the query with sequential number 'qry', forwarding 'all' to
   // Remove(const char *, Bool_t). Returns -1 if 'qry' is not positive or
   // the query is not found.

   if (qry <= 0) {
      Info("Remove","positive argument required - do nothing");
      return -1;
   }

   TString ref;
   if (GetQueryReference(qry, ref) != 0) {
      Info("Remove", "query #%d not found", qry);
      return -1;
   }
   return Remove(ref, all);
}
Int_t TProof::Remove(const char *ref, Bool_t all)
{
   // Remove the query identified by 'ref'. With 'all' set the query is
   // also removed from the local player. Unless running PROOF-Lite, a
   // kPROOF_REMOVE request is sent to the active workers.
   // Returns 0 on success or in PROOF-Lite, -1 if 'ref' is null.

   if (all && fPlayer)
      fPlayer->RemoveQueryResult(ref);

   if (IsLite()) return 0;

   if (!ref) return -1;

   TMessage m(kPROOF_REMOVE);
   m << TString(ref);
   Broadcast(m, kActive);
   Collect(kActive, fCollectTimeout);
   return 0;
}
Int_t TProof::Archive(Int_t qry, const char *path)
{
   // Archive the query with sequential number 'qry', forwarding 'path' to
   // Archive(const char *, const char *). Returns -1 if 'qry' is not
   // positive or the query is not found.

   if (qry <= 0) {
      Info("Archive","positive argument required - do nothing");
      return -1;
   }

   TString ref;
   if (GetQueryReference(qry, ref) != 0) {
      Info("Archive", "query #%d not found", qry);
      return -1;
   }
   return Archive(ref, path);
}
Int_t TProof::Archive(const char *ref, const char *path)
{
   // Send a kPROOF_ARCHIVE request carrying 'ref' and 'path' to the
   // active workers and collect the answer.
   // Returns 0 once the request has been sent, -1 if 'ref' is null.

   if (!ref) return -1;

   TMessage m(kPROOF_ARCHIVE);
   m << TString(ref) << TString(path);
   Broadcast(m, kActive);
   Collect(kActive, fCollectTimeout);
   return 0;
}
Int_t TProof::CleanupSession(const char *sessiontag)
{
   // Send a kPROOF_CLEANUPSESSION request carrying 'sessiontag' to the
   // active workers and collect the answer.
   // Returns 0 once the request has been sent, -1 if 'sessiontag' is null.

   if (!sessiontag) return -1;

   TMessage m(kPROOF_CLEANUPSESSION);
   m << TString(sessiontag);
   Broadcast(m, kActive);
   Collect(kActive, fCollectTimeout);
   return 0;
}
void TProof::SetQueryMode(EQueryMode mode)
{
   // Set the default query mode to 'mode'. The new mode is reported when
   // gDebug is positive.

   fQueryMode = mode;

   if (gDebug > 0) {
      const char *label = (fQueryMode == kSync) ? "Sync" : "Async";
      Info("SetQueryMode","query mode is set to: %s", label);
   }
}
TProof::EQueryMode TProof::GetQueryMode(Option_t *mode) const
{
   // Return the query mode implied by 'mode': kAsync if the upper-cased
   // string contains "ASYN", else kSync if it contains "SYNC", else the
   // default fQueryMode (also used for a null or empty 'mode').

   EQueryMode qmode = fQueryMode;

   if (mode && (strlen(mode) > 0)) {
      TString m(mode);
      m.ToUpper();
      // "ASYN" is tested first and takes precedence
      if (m.Contains("ASYN"))
         qmode = kAsync;
      else if (m.Contains("SYNC"))
         qmode = kSync;
   }

   if (gDebug > 0) {
      const char *label = (qmode == kSync) ? "Sync" : "Async";
      Info("GetQueryMode","query mode is set to: %s", label);
   }
   return qmode;
}
Long64_t TProof::DrawSelect(TDSet *dset, const char *varexp,
                            const char *selection, Option_t *option,
                            Long64_t nentries, Long64_t first)
{
   // Run a draw query on 'dset' through the player. The first occurrence
   // of "ASYN" (any case) is stripped from the options. Returns -1 if the
   // session is invalid, there is no player or the session is not idle;
   // otherwise the value returned by the player.

   if (!IsValid() || !fPlayer) return -1;

   if (!IsIdle()) {
      Info("DrawSelect","not idle, asynchronous Draw not supported");
      return -1;
   }

   // Remove the asynchronous request from the options, if any
   TString opt(option);
   const Int_t pos = opt.Index("ASYN", 0, TString::kIgnoreCase);
   if (pos != kNPOS)
      opt.Replace(pos,4,"");

   return fPlayer->DrawSelect(dset, varexp, selection, opt, nentries, first);
}
Long64_t TProof::DrawSelect(const char *dsetname, const char *varexp,
                            const char *selection, Option_t *option,
                            Long64_t nentries, Long64_t first, TObject *enl)
{
   // Run a draw query on a data set given by name.
   // 'dsetname' has the form "name[#[dir/]objname]": the part after '#'
   // gives the object name and, when it contains a '/', the directory.
   // 'enl' is set as entry list on the TDSet built from the parsed fields.
   // Returns -1 if the protocol is older than 13 or the name contains a
   // ':' that is not part of "://" without any '#'; otherwise the value
   // returned by DrawSelect(TDSet *, ...).

   if (fProtocol < 13) {
      // Report under this routine's name
      Info("DrawSelect", "processing 'by name' not supported by the server");
      return -1;
   }

   TString name(dsetname);
   TString obj;
   TString dir = "/";
   Int_t idxc = name.Index("#");
   if (idxc != kNPOS) {
      Int_t idxs = name.Index("/", 1, idxc, TString::kExact);
      if (idxs != kNPOS) {
         // Directory and object name follow the '#'
         obj = name(idxs+1, name.Length());
         dir = name(idxc+1, name.Length());
         dir.Remove(dir.Index("/") + 1);
         name.Remove(idxc);
      } else {
         // Only the object name follows the '#'
         obj = name(idxc+1, name.Length());
         name.Remove(idxc);
      }
   } else if (name.Index(":") != kNPOS && name.Index("://") == kNPOS) {
      // Name with attributes but without the '#' separator
      Error("DrawSelect", "bad name syntax (%s): please use"
                          " a '#' after the dataset name", dsetname);
      return -1;
   }

   TDSet *dset = new TDSet(name, obj, dir);
   // Set the entry list
   dset->SetEntryList(enl);
   Long64_t retval = DrawSelect(dset, varexp, selection, option, nentries, first);
   delete dset;
   return retval;
}
void TProof::StopProcess(Bool_t abort, Int_t timeout)
{
   // Stop (or abort, if 'abort' is set) the current processing.
   // The run status is set to kAborted or kStopped, the request is passed
   // to the player and then to every valid worker together with
   // 'timeout'. On a client, or when aborting, the current monitor is
   // interrupted. Nothing is done if the session is invalid.

   PDB(kGlobal,2)
      Info("StopProcess","enter %d", abort);

   if (!IsValid())
      return;

   // Flag that we have been stopped
   SetRunStatus(abort ? TProof::kAborted : TProof::kStopped);

   if (fPlayer)
      fPlayer->StopProcess(abort, timeout);

   // Stop any blocking 'Collect' request
   if (TestBit(TProof::kIsClient) || abort)
      InterruptCurrentMonitor();

   if (fSlaves->GetSize() == 0)
      return;

   // Notify the remote counterparts
   TIter next(fSlaves);
   for (TSlave *sl = (TSlave *)next(); sl; sl = (TSlave *)next()) {
      if (sl->IsValid())
         sl->StopProcess(abort, timeout);
   }
}
void TProof::DisableGoAsyn()
{
   // Emit the "DisableGoAsyn()" signal.

   const char *signal = "DisableGoAsyn()";
   Emit(signal);
}
void TProof::GoAsynchronous()
{
if (!IsValid()) return;
if (GetRemoteProtocol() < 22) {
Info("GoAsynchronous", "functionality not supported by the server - ignoring");
return;
}
if (fSync && !IsIdle()) {
TMessage m(kPROOF_GOASYNC);
Broadcast(m);
} else {
Info("GoAsynchronous", "either idle or already in asynchronous mode - ignoring");
}
}
void TProof::RecvLogFile(TSocket *s, Int_t size)
{
   // Receive 'size' bytes of log output from socket 's'.
   // Unless fLogToWindowOnly is set, the data are appended to the log file
   // (if fRedirLog is set) or to stdout. Each received chunk is also
   // emitted through the "LogMessage(const char*,Bool_t)" signal.
   // The loop stops on a receive error or when nothing more is received.

   const Int_t kMAXBUF = 16384;  //32768  //16384  //65536;
   char buf[kMAXBUF];

   // Append messages to active logging unit
   Int_t fdout = -1;
   if (!fLogToWindowOnly) {
      fdout = (fRedirLog) ? fileno(fLogFileW) : fileno(stdout);
      if (fdout < 0) {
         Warning("RecvLogFile", "file descriptor for outputs undefined (%d):"
                                " will not log msgs", fdout);
         return;
      }
      lseek(fdout, (off_t) 0, SEEK_END);
   }

   Int_t left, rec, r;
   Long_t filesize = 0;

   while (filesize < size) {
      // Leave room for the string terminator appended below
      left = Int_t(size - filesize);
      if (left >= kMAXBUF)
         left = kMAXBUF - 1;

      rec = s->RecvRaw(buf, left);
      if (rec < 0) {
         // Stop in all logging modes: retrying would loop forever
         Error("RecvLogFile", "error during receiving log file");
         break;
      }
      if (rec == 0) {
         // Nothing received although data were expected: no progress is
         // possible, so give up instead of spinning
         break;
      }
      filesize += rec;

      if (!fLogToWindowOnly) {
         char *p = buf;
         r = rec;
         while (r) {
            Int_t w;

            w = write(fdout, p, r);

            if (w < 0) {
               SysError("RecvLogFile", "error writing to unit: %d", fdout);
               break;
            }
            r -= w;
            p += w;
         }
      }

      // Notify the listeners; rec <= kMAXBUF-1 so the index is in range
      buf[rec] = 0;
      EmitVA("LogMessage(const char*,Bool_t)", 2, buf, kFALSE);
   }

   // If idle restore logs to main session window
   if (fRedirLog && IsIdle() && !TestBit(TProof::kIsMaster))
      fRedirLog = kFALSE;
}
void TProof::NotifyLogMsg(const char *msg, const char *sfx)
{
Int_t len = 0;
if (!msg || (len = strlen(msg)) <= 0)
return;
Int_t lsfx = (sfx) ? strlen(sfx) : 0;
Int_t fdout = -1;
if (!fLogToWindowOnly) {
fdout = (fRedirLog) ? fileno(fLogFileW) : fileno(stdout);
if (fdout < 0) {
Warning("NotifyLogMsg", "file descriptor for outputs undefined (%d):"
" will not notify msgs", fdout);
return;
}
lseek(fdout, (off_t) 0, SEEK_END);
}
if (!fLogToWindowOnly) {
if (len > 0) {
char *p = (char *)msg;
Int_t r = len;
while (r) {
Int_t w = write(fdout, p, r);
if (w < 0) {
SysError("NotifyLogMsg", "error writing to unit: %d", fdout);
break;
}
r -= w;
p += w;
}
if (lsfx > 0)
if (write(fdout, sfx, lsfx) != lsfx)
SysError("NotifyLogMsg", "error writing to unit: %d", fdout);
}
}
if (len > 0) {
EmitVA("LogMessage(const char*,Bool_t)", 2, msg, kFALSE);
}
if (fRedirLog && IsIdle())
fRedirLog = kFALSE;
}
void TProof::LogMessage(const char *msg, Bool_t all)
{
   // Emit 'msg' (if not null) through the
   // "LogMessage(const char*,Bool_t)" signal, then read the log file and
   // emit its content chunk by chunk. With 'all' set the file is read
   // from its beginning. Nothing is emitted in batch mode.

   PDB(kGlobal,1)
      Info("LogMessage","Enter ... %s, 'all: %s", msg ? msg : "",
           all ? "true" : "false");

   if (gROOT->IsBatch()) {
      PDB(kGlobal,1) Info("LogMessage","GUI not started - use TProof::ShowLog()");
      return;
   }

   if (msg)
      EmitVA("LogMessage(const char*,Bool_t)", 2, msg, all);

   // Rewind to the beginning of the file if everything is requested
   if (all)
      lseek(fileno(fLogFileR), (off_t) 0, SEEK_SET);

   const Int_t kMAXBUF = 32768;
   char buf[kMAXBUF];
   Int_t len = 0;
   for (;;) {
      len = read(fileno(fLogFileR), buf, kMAXBUF-1);

      // Interrupted reads are simply retried
      if (len < 0 && TSystem::GetErrno() == EINTR) {
         TSystem::ResetErrno();
         continue;
      }
      if (len < 0) {
         Error("LogMessage", "error reading log file");
         break;
      }
      if (len == 0)
         break;

      buf[len] = 0;
      EmitVA("LogMessage(const char*,Bool_t)", 2, buf, kFALSE);
   }
}
Int_t TProof::SendGroupView()
{
   // Send each active worker a kPROOF_GROUPVIEW message "<rank> <size>",
   // where rank counts the workers successfully notified so far and size
   // is the number of active workers at entry. Workers that cannot be
   // reached are marked bad. Returns -1 if the session is invalid, 0 on a
   // client or when fSendGroupView is not set, otherwise the current
   // number of active workers.

   if (!IsValid()) return -1;
   if (TestBit(TProof::kIsClient)) return 0;
   if (!fSendGroupView) return 0;
   fSendGroupView = kFALSE;

   TIter next(fActiveSlaves);
   const int total = GetNumberOfActiveSlaves();
   int nbad = 0, rank = 0;
   char view[32];

   while (TSlave *sl = (TSlave *)next()) {
      sprintf(view, "%d %d", rank, total);
      if (sl->GetSocket()->Send(view, kPROOF_GROUPVIEW) != -1) {
         rank++;
         continue;
      }
      MarkBad(sl, "could not send kPROOF_GROUPVIEW message");
      nbad++;
   }

   // Try again if some worker was lost; this does something only if
   // fSendGroupView has been set again in the meantime
   if (nbad) SendGroupView();

   return GetNumberOfActiveSlaves();
}
Bool_t TProof::GetFileInCmd(const char *cmd, TString &fn)
{
   // If 'cmd' (blanks stripped on both sides) starts with ".L", ".x" or
   // ".X", extract the file name that follows with
   // TSystem::SplitAclicMode and store it in 'fn'.
   // Returns kTRUE when a non-empty file name was found, kFALSE otherwise.

   TString s = cmd;
   s = s.Strip(TString::kBoth);

   const Bool_t isMacroCmd =
      s.Length() > 0 &&
      (s.BeginsWith(".L") || s.BeginsWith(".x") || s.BeginsWith(".X"));
   if (!isMacroCmd)
      return kFALSE;

   // Everything after the two-character command
   TString file = s(2, s.Length());
   TString acm, arg, io;
   fn = gSystem->SplitAclicMode(file, acm, arg, io);

   return fn.IsNull() ? kFALSE : kTRUE;
}
Int_t TProof::Exec(const char *cmd, Bool_t plusMaster)
{
   // Execute 'cmd' on the active workers; see Exec(cmd, list, plusMaster).

   const Int_t rc = Exec(cmd, kActive, plusMaster);
   return rc;
}
Int_t TProof::Exec(const char *cmd, ESlaves list, Bool_t plusMaster)
{
   // Send command 'cmd' to the workers in 'list'.
   // If the command loads or executes a macro (see GetFileInCmd), the
   // macro is located on the macro path and sent to the unique workers
   // first; when there are no unique workers the command is sent with the
   // resolved path instead. With 'plusMaster' the command is also run on
   // the master (or locally in PROOF-Lite).
   // Returns -1 on error, 0 for an empty command, otherwise the value
   // returned by SendCommand.

   if (!IsValid()) return -1;

   TString s = cmd;
   s = s.Strip(TString::kBoth);

   if (s.IsNull()) return 0;

   // Check for macro file and make sure the file is available on all workers
   TString filename;
   if (TProof::GetFileInCmd(s.Data(), filename)) {
      char *fn = gSystem->Which(TROOT::GetMacroPath(), filename, kReadPermission);
      if (!fn) {
         Error("Exec", "macro %s not found", filename.Data());
         return -1;
      }

      if (GetNumberOfUniqueSlaves() <= 0) {
         // No unique workers: send the command with the resolved path
         TString scmd = s(0,3) + fn;
         Int_t n = SendCommand(scmd, list);
         delete [] fn;
         return n;
      }

      const Bool_t sent = (SendFile(fn, kAscii | kForward | kCpBin) >= 0);
      if (!sent)
         Error("Exec", "file %s could not be transfered", fn);
      delete [] fn;
      if (!sent)
         return -1;
   }

   if (plusMaster) {
      if (IsLite()) {
         gROOT->ProcessLine(cmd);
      } else {
         // Temporarily switch to sequential mode so that the command
         // sent now is handled at master level
         Int_t n = GetParallel();
         SetParallelSilent(0);
         Int_t res = SendCommand(cmd, list);
         SetParallelSilent(n);
         if (res < 0)
            return res;
      }
   }
   return SendCommand(cmd, list);
}
Int_t TProof::SendCommand(const char *cmd, ESlaves list)
{
   // Broadcast 'cmd' as a kMESS_CINT message to the workers in 'list' and
   // collect the answers. Returns -1 if the session is invalid, otherwise
   // the value of fStatus after the collection.

   if (IsValid()) {
      Broadcast(cmd, kMESS_CINT, list);
      Collect(list);
      return fStatus;
   }
   return -1;
}
Int_t TProof::SendCurrentState(ESlaves list)
{
   // Broadcast the path of the current directory as a kPROOF_RESET
   // message to the workers in 'list'. Returns -1 if the session is
   // invalid, otherwise the value of GetParallel().

   if (IsValid()) {
      Broadcast(gDirectory->GetPath(), kPROOF_RESET, list);
      return GetParallel();
   }
   return -1;
}
Int_t TProof::SendInitialState()
{
   // Transfer the initial state: currently the log level and debug mask,
   // via SetLogLevel(). Returns -1 if the session is invalid, otherwise
   // the number of active workers.

   if (IsValid()) {
      SetLogLevel(fLogLevel, gProofDebugMask);
      return GetNumberOfActiveSlaves();
   }
   return -1;
}
Bool_t TProof::CheckFile(const char *file, TSlave *slave, Long_t modtime, Int_t cpopt)
{
// Decide whether 'file' has to be sent to 'slave'.
// A cache of MD5 / modification time pairs is kept in fFileMap, keyed by
// "<slave name>:<ordinal>:<file base name>". Returns kTRUE if the file
// should be sent, kFALSE if not or if the local MD5 cannot be computed.
Bool_t sendto = kFALSE;
// Build the key for the file map
TString sn = slave->GetName();
sn += ":";
sn += slave->GetOrdinal();
sn += ":";
sn += gSystem->BaseName(file);
FileMap_t::const_iterator it;
if ((it = fFileMap.find(sn)) != fFileMap.end()) {
// File already known for this worker: look at it again only if the
// modification time differs from the cached one
MD5Mod_t md = (*it).second;
if (md.fModtime != modtime) {
TMD5 *md5 = TMD5::FileChecksum(file);
if (md5) {
if ((*md5) != md.fMD5) {
// Content changed: update the cache entry
sendto = kTRUE;
md.fMD5 = *md5;
md.fModtime = modtime;
fFileMap[sn] = md;
if (TestBit(TProof::kIsMaster)) {
// On a master the remote side is asked via kPROOF_CHECKFILE; the file
// is sent only if fCheckFileStatus is still 0 after the collection
sendto = kFALSE;
TMessage mess(kPROOF_CHECKFILE);
mess << TString(gSystem->BaseName(file)) << md.fMD5 << cpopt;
slave->GetSocket()->Send(mess);
fCheckFileStatus = 0;
Collect(slave, fCollectTimeout, kPROOF_CHECKFILE);
sendto = (fCheckFileStatus == 0) ? kTRUE : kFALSE;
}
}
delete md5;
} else {
Error("CheckFile", "could not calculate local MD5 check sum - dont send");
return kFALSE;
}
}
} else {
// File not yet in the map: register it and ask the remote side
TMD5 *md5 = TMD5::FileChecksum(file);
MD5Mod_t md;
if (md5) {
md.fMD5 = *md5;
md.fModtime = modtime;
fFileMap[sn] = md;
delete md5;
} else {
Error("CheckFile", "could not calculate local MD5 check sum - dont send");
return kFALSE;
}
TMessage mess(kPROOF_CHECKFILE);
mess << TString(gSystem->BaseName(file)) << md.fMD5 << cpopt;
slave->GetSocket()->Send(mess);
fCheckFileStatus = 0;
Collect(slave, fCollectTimeout, kPROOF_CHECKFILE);
// NOTE(review): fCheckFileStatus is presumably updated while collecting
// the kPROOF_CHECKFILE answer - confirm in the message handler
sendto = (fCheckFileStatus == 0) ? kTRUE : kFALSE;
}
return sendto;
}
Int_t TProof::SendFile(const char *file, Int_t opt, const char *rfile, TSlave *wrk)
{
   // Send 'file' to the workers.
   // The target is 'wrk' alone if given, else the unique workers when
   // 'rfile' is "cache", else the active workers. 'opt' is a bit mask:
   // kBinary (binary transfer), kForce (skip the CheckFile test),
   // kForward (flag passed on in the request), kCp / kCpBin (copy options
   // passed to CheckFile). 'rfile' is the remote name; when it is "cache"
   // the name sent is "cache:<base name>", when empty the base name.
   // Returns the number of workers the file content was sent to, 0 if
   // there is no target, -1 on error.

   if (!IsValid()) return -1;

   // Use the stack for the single-worker list: it is non-owning and is
   // released automatically on every return path
   TList oneWorker;
   TList *slaves = (rfile && !strcmp(rfile, "cache")) ? fUniqueSlaves : fActiveSlaves;
   if (wrk) {
      oneWorker.Add(wrk);
      slaves = &oneWorker;
   }

   if (slaves->GetSize() == 0) return 0;

#ifndef R__WIN32
   Int_t fd = open(file, O_RDONLY);
#else
   Int_t fd = open(file, O_RDONLY | O_BINARY);
#endif
   if (fd < 0) {
      SysError("SendFile", "cannot open file %s", file);
      return -1;
   }

   // Get info about the file; the descriptor must be closed on failure
   Long64_t size;
   Long_t id, flags, modtime;
   if (gSystem->GetPathInfo(file, &id, &size, &flags, &modtime) == 1) {
      Error("SendFile", "cannot stat file %s", file);
      close(fd);
      return -1;
   }
   if (size == 0) {
      Error("SendFile", "empty file %s", file);
      close(fd);
      return -1;
   }

   // Decode options
   Bool_t bin   = (opt & kBinary)  ? kTRUE : kFALSE;
   Bool_t force = (opt & kForce)   ? kTRUE : kFALSE;
   Bool_t fw    = (opt & kForward) ? kTRUE : kFALSE;

   // Copy options
   Int_t cpopt  = 0;
   if ((opt & kCp)) cpopt |= kCp;
   if ((opt & kCpBin)) cpopt |= (kCp | kCpBin);

   const Int_t kMAXBUF = 32768;  //16384  //65536;
   char buf[kMAXBUF];
   Int_t nsl = 0;

   TIter next(slaves);
   TSlave *sl;
   // Name of the file on the remote side
   TString fnam(rfile);
   if (fnam == "cache") {
      fnam += Form(":%s", gSystem->BaseName(file));
   } else if (fnam.IsNull()) {
      fnam = gSystem->BaseName(file);
   }
   // List of workers the request was sent to
   TList wsent;
   while ((sl = (TSlave *)next())) {
      if (!sl->IsValid())
         continue;

      Bool_t sendto = force ? kTRUE : CheckFile(file, sl, modtime, cpopt);
      // Don't send the kPROOF_SENDFILE command to real workers when
      // sendto is false. Masters might still need to send the file to
      // newly added workers.
      PDB(kPackage,2) {
         const char *snd = (sl->fSlaveType == TSlave::kSlave && sendto) ? "" : "not";
         Info("SendFile", "%s sending file %s to: %s:%s (%d)", snd,
              file, sl->GetName(), sl->GetOrdinal(), sendto);
      }
      if (sl->fSlaveType == TSlave::kSlave && !sendto)
         continue;
      // The announced size is 0 if the content is not going to be sent
      Long64_t siz = sendto ? size : 0;
      sprintf(buf, "%s %d %lld %d", fnam.Data(), bin, siz, fw);
      if (sl->GetSocket()->Send(buf, kPROOF_SENDFILE) == -1) {
         MarkBad(sl, "could not send kPROOF_SENDFILE request");
         continue;
      }
      // Record
      wsent.Add(sl);

      if (sendto) {

         lseek(fd, 0, SEEK_SET);

         Int_t len;
         do {
            // Interrupted reads are retried
            while ((len = read(fd, buf, kMAXBUF)) < 0 && TSystem::GetErrno() == EINTR)
               TSystem::ResetErrno();

            if (len < 0) {
               SysError("SendFile", "error reading from file %s", file);
               Interrupt(kSoftInterrupt, kActive);
               close(fd);
               return -1;
            }

            if (len > 0 && sl->GetSocket()->SendRaw(buf, len) == -1) {
               SysError("SendFile", "error writing to slave %s:%s (now offline)",
                        sl->GetName(), sl->GetOrdinal());
               MarkBad(sl, "sendraw failure");
               break;
            }

         } while (len > 0);

         nsl++;
      }
      // Wait for the operation to be done
      Collect(sl, fCollectTimeout, kPROOF_SENDFILE);
   }

   close(fd);

   return nsl;
}
Int_t TProof::SendObject(const TObject *obj, ESlaves list)
{
   // Broadcast 'obj' in a kMESS_OBJECT message to the workers in 'list'.
   // Returns -1 if the session is invalid or 'obj' is null, otherwise the
   // value returned by Broadcast.

   if (IsValid() && obj) {
      TMessage mess(kMESS_OBJECT);
      mess.WriteObject(obj);
      return Broadcast(mess, list);
   }
   return -1;
}
Int_t TProof::SendPrint(Option_t *option)
{
   // Broadcast 'option' in a kPROOF_PRINT message to the active workers
   // and collect the answers. Returns -1 if the session is invalid,
   // otherwise the value returned by Collect.

   if (IsValid()) {
      Broadcast(option, kPROOF_PRINT, kActive);
      return Collect(kActive, fCollectTimeout);
   }
   return -1;
}
void TProof::SetLogLevel(Int_t level, UInt_t mask)
{
   // Set the log level and debug mask locally (fLogLevel,
   // gProofDebugLevel, gProofDebugMask) and broadcast them as
   // "<level> <mask>" in a kPROOF_LOGLEVEL message to all workers.

   fLogLevel        = level;
   gProofDebugLevel = level;
   gProofDebugMask  = (TProofDebug::EProofDebugMask) mask;

   char payload[32];
   sprintf(payload, "%d %u", level, mask);
   Broadcast(payload, kPROOF_LOGLEVEL, kAll);
}
void TProof::SetRealTimeLog(Bool_t on)
{
   // Broadcast a kPROOF_REALTIMELOG message carrying the flag 'on'.
   // Only a warning is issued if the session is invalid.

   if (!IsValid()) {
      Warning("SetRealTimeLog","session is invalid - do nothing");
      return;
   }

   TMessage mess(kPROOF_REALTIMELOG);
   mess << on;
   Broadcast(mess);
}
Int_t TProof::SetParallelSilent(Int_t nodes, Bool_t random)
{
   // Request 'nodes' workers for parallel processing, without printing.
   // On a master the workers are activated with GoParallel() and the
   // current state is sent; otherwise a kPROOF_PARALLEL request carrying
   // 'nodes' and 'random' is broadcast and the answer collected.
   // Returns -1 if the session is invalid, otherwise the resulting
   // number of parallel workers.

   if (!IsValid()) return -1;

   if (TestBit(TProof::kIsMaster)) {
      GoParallel(nodes, kFALSE, random);
      return SendCurrentState();
   }

   PDB(kGlobal,1) Info("SetParallelSilent", "request %d node%s", nodes,
                       nodes == 1 ? "" : "s");

   TMessage mess(kPROOF_PARALLEL);
   mess << nodes << random;
   Broadcast(mess);
   Collect(kActive, fCollectTimeout);

   const Int_t n = GetParallel();
   PDB(kGlobal,1) Info("SetParallelSilent", "got %d node%s", n, n == 1 ? "" : "s");
   return n;
}
Int_t TProof::SetParallel(Int_t nodes, Bool_t random)
{
   // Request 'nodes' workers via SetParallelSilent() and, on a client,
   // print the resulting mode. Returns the value returned by
   // SetParallelSilent().

   const Int_t n = SetParallelSilent(nodes, random);

   if (!TestBit(TProof::kIsClient))
      return n;

   if (n < 1) {
      Printf("PROOF set to sequential mode");
      return n;
   }

   // Plural and, if relevant, the random-selection note
   TString subfix = (n == 1) ? "" : "s";
   if (random)
      subfix += ", randomly selected";
   Printf("PROOF set to parallel mode (%d worker%s)", n, subfix.Data());

   return n;
}
Int_t TProof::GoParallel(Int_t nodes, Bool_t attach, Bool_t random)
{
// Activate up to 'nodes' workers for parallel processing.
// The active and inactive lists are rebuilt from fSlaves; candidates are
// taken in list order or, with 'random', picked at random. With 'attach'
// sub-masters receive a kPROOF_LOGFILE message instead of kPROOF_PARALLEL
// and the group view is not sent. Returns -1 if the session is invalid,
// otherwise the value of GetParallel() at the end.
if (!IsValid()) return -1;
if (nodes < 0) nodes = 0;
// Start from scratch: nobody is active
fActiveSlaves->Clear();
fActiveMonitor->RemoveAll();
TSlave *sl = 0;
// Temporary list of candidates
TList *wlst = new TList;
TIter nxt(fSlaves);
fInactiveSlaves->Clear();
while ((sl = (TSlave *)nxt())) {
if (sl->IsValid() && !fBadSlaves->FindObject(sl)) {
// Workers whose image is "IGNORE" are not candidates
if (strcmp("IGNORE", sl->GetImage()) == 0) continue;
if ((sl->GetSlaveType() != TSlave::kSlave) &&
(sl->GetSlaveType() != TSlave::kMaster)) {
Error("GoParallel", "TSlave is neither Master nor Slave");
R__ASSERT(0);
}
// Every candidate starts as inactive
wlst->Add(sl);
fInactiveSlaves->Add(sl);
sl->SetStatus(TSlave::kInactive);
}
}
// Cannot activate more workers than there are candidates
Int_t nwrks = (nodes > wlst->GetSize()) ? wlst->GetSize() : nodes;
int cnt = 0;
// Reset to kFALSE below as soon as a sub-master is processed
fEndMaster = TestBit(TProof::kIsMaster) ? kTRUE : kFALSE;
while (cnt < nwrks) {
if (random) {
Int_t iwrk = (Int_t) (gRandom->Rndm() * wlst->GetSize());
sl = (TSlave *) wlst->At(iwrk);
} else {
sl = (TSlave *) wlst->First();
}
if (!sl) {
Error("GoParallel", "attaching to candidate!");
break;
}
// NOTE(review): slavenodes is assigned below but never read; 'cnt' is
// always advanced by 1 - confirm this is intended for sub-masters
Int_t slavenodes = 0;
if (sl->GetSlaveType() == TSlave::kSlave) {
// Plain worker: activate it and monitor its socket
sl->SetStatus(TSlave::kActive);
fActiveSlaves->Add(sl);
fInactiveSlaves->Remove(sl);
fActiveMonitor->Add(sl->GetSocket());
slavenodes = 1;
} else if (sl->GetSlaveType() == TSlave::kMaster) {
fEndMaster = kFALSE;
TMessage mess(kPROOF_PARALLEL);
if (!attach) {
// Ask the sub-master for the workers still missing
mess << nodes-cnt;
} else {
// Attaching: turn the message into a kPROOF_LOGFILE one
mess.SetWhat(kPROOF_LOGFILE);
mess << -1 << -1;
}
if (sl->GetSocket()->Send(mess) == -1) {
MarkBad(sl, "could not send kPROOF_PARALLEL or kPROOF_LOGFILE request");
slavenodes = 0;
} else {
Collect(sl, fCollectTimeout);
if (sl->IsValid()) {
sl->SetStatus(TSlave::kActive);
fActiveSlaves->Add(sl);
fInactiveSlaves->Remove(sl);
fActiveMonitor->Add(sl->GetSocket());
if (sl->GetParallel() > 0) {
slavenodes = sl->GetParallel();
} else {
slavenodes = 0;
}
} else {
MarkBad(sl, "collect failed after kPROOF_PARALLEL or kPROOF_LOGFILE request");
slavenodes = 0;
}
}
}
// The candidate has been processed: remove it from the temporary list
wlst->Remove(sl);
cnt += 1;
}
// The temporary list does not own the workers
wlst->SetOwner(0);
SafeDelete(wlst);
// Refresh statistics and the list of unique workers
AskStatistics();
FindUniqueSlaves();
if (!attach)
SendGroupView();
Int_t n = GetParallel();
if (TestBit(TProof::kIsClient)) {
if (n < 1)
printf("PROOF set to sequential mode\n");
else
printf("PROOF set to parallel mode (%d worker%s)\n",
n, n == 1 ? "" : "s");
}
PDB(kGlobal,1) Info("GoParallel", "got %d node%s", n, n == 1 ? "" : "s");
return n;
}
void TProof::ShowCache(Bool_t all)
{
   // Ask the unique workers to show their file cache (kShowCache). With
   // 'all' set the non-unique masters are asked as well (kShowSubCache)
   // and the answers are collected from all unique nodes.

   if (!IsValid()) return;

   TMessage mess(kPROOF_CACHE);
   mess << Int_t(kShowCache) << all;
   Broadcast(mess, kUnique);

   if (!all) {
      Collect(kUnique, fCollectTimeout);
      return;
   }

   TMessage mess2(kPROOF_CACHE);
   mess2 << Int_t(kShowSubCache) << all;
   Broadcast(mess2, fNonUniqueMasters);

   Collect(kAllUnique, fCollectTimeout);
}
void TProof::ClearCache(const char *file)
{
if (!IsValid()) return;
TMessage mess(kPROOF_CACHE);
mess << Int_t(kClearCache) << TString(file);
Broadcast(mess, kUnique);
TMessage mess2(kPROOF_CACHE);
mess2 << Int_t(kClearSubCache) << TString(file);
Broadcast(mess2, fNonUniqueMasters);
Collect(kAllUnique);
fFileMap.clear();
}
void TProof::ShowPackages(Bool_t all)
{
   // List the package directories. On a client the global package
   // directories and the local package directory are listed with the kLS
   // command. Unless running PROOF-Lite, the unique workers are then
   // asked to do the same (kShowPackages) and, with 'all' set, the
   // non-unique masters too (kShowSubPackages).

   if (!IsValid()) return;

   if (TestBit(TProof::kIsClient)) {
      const Bool_t haveGlobal =
         (fGlobalPackageDirList && fGlobalPackageDirList->GetSize() > 0);
      if (haveGlobal) {
         // Scan the list of global package directories
         TIter nxd(fGlobalPackageDirList);
         while (TNamed *nm = (TNamed *)nxd()) {
            printf("*** Global Package cache %s client:%s ***\n",
                   nm->GetName(), nm->GetTitle());
            fflush(stdout);
            gSystem->Exec(Form("%s %s", kLS, nm->GetTitle()));
            printf("\n");
            fflush(stdout);
         }
      }
      printf("*** Package cache client:%s ***\n", fPackageDir.Data());
      fflush(stdout);
      gSystem->Exec(Form("%s %s", kLS, fPackageDir.Data()));
   }

   // Nothing more to do in PROOF-Lite
   if (IsLite()) return;

   TMessage mess(kPROOF_CACHE);
   mess << Int_t(kShowPackages) << all;
   Broadcast(mess, kUnique);

   if (!all) {
      Collect(kUnique, fCollectTimeout);
      return;
   }

   TMessage mess2(kPROOF_CACHE);
   mess2 << Int_t(kShowSubPackages) << all;
   Broadcast(mess2, fNonUniqueMasters);

   Collect(kAllUnique, fCollectTimeout);
}
void TProof::ShowEnabledPackages(Bool_t all)
{
   // List the enabled packages. On a client the names in
   // fEnabledPackagesOnClient are printed; unless running PROOF-Lite a
   // kShowEnabledPackages request carrying 'all' is then broadcast and
   // the answers collected from the active workers.

   if (!IsValid()) return;

   if (TestBit(TProof::kIsClient)) {
      printf("*** Enabled packages on client on %s\n", gSystem->HostName());
      TIter next(fEnabledPackagesOnClient);
      for (TObjString *str = (TObjString*) next(); str; str = (TObjString*) next())
         printf("%s\n", str->GetName());
   }

   // Nothing more to do in PROOF-Lite
   if (IsLite()) return;

   TMessage mess(kPROOF_CACHE);
   mess << Int_t(kShowEnabledPackages) << all;
   Broadcast(mess);
   Collect(kActive, fCollectTimeout);
}
Int_t TProof::ClearPackages()
{
   // Unload and then disable all packages. Returns -1 if the session is
   // invalid or either step returns -1, otherwise fStatus.

   if (!IsValid()) return -1;

   // Unloading comes first; disabling is attempted only if it succeeded
   if (UnloadPackages() == -1 || DisablePackages() == -1)
      return -1;

   return fStatus;
}
Int_t TProof::ClearPackage(const char *package)
{
   // Unload and then disable 'package'. A trailing ".par" and any leading
   // path are stripped from the name first. Returns -1 if the session is
   // invalid, the name is null or empty, or either step returns -1;
   // otherwise fStatus.

   if (!IsValid()) return -1;

   if (!package || !strlen(package)) {
      Error("ClearPackage", "need to specify a package name");
      return -1;
   }

   // If the name ends with .par, strip it off
   TString pac = package;
   if (pac.EndsWith(".par"))
      pac.Remove(pac.Length()-4);
   pac = gSystem->BaseName(pac);

   if (UnloadPackage(pac) == -1 || DisablePackage(pac) == -1)
      return -1;

   return fStatus;
}
Int_t TProof::DisablePackage(const char *package)
{
   // Remove 'package' on the client and, unless running PROOF-Lite, ask
   // the unique workers (kDisablePackage) and the non-unique masters
   // (kDisableSubPackage) to do the same. A trailing ".par" and any
   // leading path are stripped from the name first.
   // Returns -1 on error, 0 in PROOF-Lite, otherwise fStatus.

   if (!IsValid()) return -1;

   if (!package || !strlen(package)) {
      Error("DisablePackage", "need to specify a package name");
      return -1;
   }

   // If the name ends with .par, strip it off
   TString pac = package;
   if (pac.EndsWith(".par"))
      pac.Remove(pac.Length()-4);
   pac = gSystem->BaseName(pac);

   if (DisablePackageOnClient(pac) == -1)
      return -1;

   // Nothing more to do in PROOF-Lite
   if (IsLite()) return 0;

   TMessage mess(kPROOF_CACHE);
   mess << Int_t(kDisablePackage) << pac;
   Broadcast(mess, kUnique);

   TMessage mess2(kPROOF_CACHE);
   mess2 << Int_t(kDisableSubPackage) << pac;
   Broadcast(mess2, fNonUniqueMasters);

   Collect(kAllUnique);

   return fStatus;
}
Int_t TProof::DisablePackageOnClient(const char *package)
{
   // On a client, remove the directory and the PAR file of 'package' from
   // the package directory with the kRM command, holding the package
   // lock, and warn if either still exists afterwards. Always returns 0.

   if (!TestBit(TProof::kIsClient))
      return 0;

   // Remove the package directory and the par file
   fPackageLock->Lock();
   gSystem->Exec(Form("%s %s/%s", kRM, fPackageDir.Data(), package));
   gSystem->Exec(Form("%s %s/%s.par", kRM, fPackageDir.Data(), package));
   fPackageLock->Unlock();

   // AccessPathName returns kFALSE when the path can be accessed
   const Bool_t parLeft = !gSystem->AccessPathName(Form("%s/%s.par", fPackageDir.Data(), package));
   if (parLeft)
      Warning("DisablePackageOnClient", "unable to remove package PAR file for %s", package);

   const Bool_t dirLeft = !gSystem->AccessPathName(Form("%s/%s", fPackageDir.Data(), package));
   if (dirLeft)
      Warning("DisablePackageOnClient", "unable to remove package directory for %s", package);

   return 0;
}
Int_t TProof::DisablePackages()
{
   // Remove all packages. On a client the content of the package
   // directory is removed with the kRM command, holding the package lock.
   // Unless running PROOF-Lite, the unique workers (kDisablePackages) and
   // the non-unique masters (kDisableSubPackages) are asked to do the
   // same. Returns -1 if the session is invalid, 0 in PROOF-Lite,
   // otherwise fStatus.

   if (!IsValid()) return -1;

   // Remove all packages on the client
   if (TestBit(TProof::kIsClient)) {
      fPackageLock->Lock();
      gSystem->Exec(Form("%s %s/*", kRM, fPackageDir.Data()));
      fPackageLock->Unlock();
   }

   // Nothing more to do in PROOF-Lite
   if (IsLite()) return 0;

   TMessage mess(kPROOF_CACHE);
   mess << Int_t(kDisablePackages);
   Broadcast(mess, kUnique);

   TMessage mess2(kPROOF_CACHE);
   mess2 << Int_t(kDisableSubPackages);
   Broadcast(mess2, fNonUniqueMasters);

   Collect(kAllUnique);

   return fStatus;
}
Int_t TProof::BuildPackage(const char *package, EBuildPackageOpt opt)
{
   // Build 'package' on the workers and, depending on 'opt', on the
   // client. kDontBuildOnClient is treated as kBuildAll without the
   // client build. For 'opt' <= kBuildAll (and not PROOF-Lite) the build
   // requests are broadcast; for 'opt' >= kBuildAll the client build is
   // run and the results are collected. A trailing ".par" and any leading
   // path are stripped from the name first.
   // Returns 0 on success, -1 on error.

   if (!IsValid()) return -1;

   if (!package || !strlen(package)) {
      Error("BuildPackage", "need to specify a package name");
      return -1;
   }

   // If the name ends with .par, strip it off
   TString pac = package;
   if (pac.EndsWith(".par"))
      pac.Remove(pac.Length()-4);
   pac = gSystem->BaseName(pac);

   const Bool_t buildOnClient = (opt != kDontBuildOnClient);
   if (!buildOnClient)
      opt = kBuildAll;

   if (opt <= kBuildAll && !IsLite()) {
      TMessage mess(kPROOF_CACHE);
      mess << Int_t(kBuildPackage) << pac;
      Broadcast(mess, kUnique);

      TMessage mess2(kPROOF_CACHE);
      mess2 << Int_t(kBuildSubPackage) << pac;
      Broadcast(mess2, fNonUniqueMasters);
   }

   // Results are not waited for here
   if (opt < kBuildAll)
      return 0;

   // By first forwarding the build commands to the master and workers
   // and only then building locally, the builds run in parallel
   const Int_t st = buildOnClient ? BuildPackageOnClient(pac) : 0;

   fStatus = 0;
   if (!IsLite())
      Collect(kAllUnique);

   return (fStatus < 0 || st < 0) ? -1 : 0;
}
Int_t TProof::BuildPackageOnClient(const TString &package)
{
   // Build 'package' on the client by running its PROOF-INF/BUILD.sh
   // script, if present, from the package directory.
   // If the package is not in the local package directory the global
   // package directories are searched; a package found there is used as
   // is, without building. When the ROOT version or revision recorded in
   // PROOF-INF/proofvers.txt differs from the running one, the package
   // directory is removed and unpacked again from the PAR file before
   // building, and the new version is recorded after a successful build.
   // Returns 0 on success or when not on a client, -1 or the failing
   // command status on error.

   if (TestBit(TProof::kIsClient)) {
      Int_t status = 0;
      TString pdir, ocwd;

      // Package path
      pdir = fPackageDir + "/" + package;
      if (gSystem->AccessPathName(pdir, kReadPermission) ||
          gSystem->AccessPathName(pdir + "/PROOF-INF", kReadPermission)) {
         // Not available locally: is it a global package?
         if (fGlobalPackageDirList && fGlobalPackageDirList->GetSize() > 0) {
            // Scan the list of global package directories
            TIter nxd(fGlobalPackageDirList);
            TNamed *nm = 0;
            while ((nm = (TNamed *)nxd())) {
               pdir = Form("%s/%s", nm->GetTitle(), package.Data());
               if (!gSystem->AccessPathName(pdir, kReadPermission) &&
                   !gSystem->AccessPathName(pdir + "/PROOF-INF", kReadPermission)) {
                  // Package found, stop searching
                  break;
               }
               pdir = "";
            }
            if (pdir.Length() <= 0) {
               // Package not found
               Error("BuildPackageOnClient", "failure locating %s ...", package.Data());
               return -1;
            } else {
               // Package is in a global directory: nothing to build
               if (gDebug > 0)
                  Info("BuildPackageOnClient", "found global package: %s", pdir.Data());
               return 0;
            }
         }
      }
      PDB(kPackage, 1)
         Info("BuildPackageOnClient",
              "package %s exists and has PROOF-INF directory", package.Data());

      fPackageLock->Lock();

      ocwd = gSystem->WorkingDirectory();
      gSystem->ChangeDirectory(pdir);

      // Check for BUILD.sh and execute
      if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {

         // Read the version recorded by the previous build, if any
         Bool_t savever = kFALSE;
         Int_t rev = -1;
         TString v;
         FILE *f = fopen("PROOF-INF/proofvers.txt", "r");
         if (f) {
            TString r;
            v.Gets(f);
            r.Gets(f);
            rev = (!r.IsNull() && r.IsDigit()) ? r.Atoi() : -1;
            fclose(f);
         }
         if (!f || v != gROOT->GetVersion() ||
            (gROOT->GetSvnRevision() > 0 && rev != gROOT->GetSvnRevision())) {
            savever = kTRUE;
            Info("BuildPackageOnClient",
                 "%s: version change (current: %s:%d, build: %s:%d): cleaning ... ",
                 package.Data(), gROOT->GetVersion(), gROOT->GetSvnRevision(), v.Data(), rev);
            // Hard cleanup: go up the dir tree
            gSystem->ChangeDirectory(fPackageDir);
            // Remove the package directory
            gSystem->Exec(Form("%s %s", kRM, pdir.Data()));
            // Find gunzip...
            char *gunzip = gSystem->Which(gSystem->Getenv("PATH"), kGUNZIP, kExecutePermission);
            if (gunzip) {
               TString par = Form("%s.par", pdir.Data());
               // ... and untar the PAR file, exactly once
               TString cmd(Form(kUNTAR3, gunzip, par.Data()));
               if ((status = gSystem->Exec(cmd))) {
                  Error("BuildPackageOnClient", "failure executing: %s", cmd.Data());
               } else {
                  // Go down to the package directory
                  gSystem->ChangeDirectory(pdir);
               }
               delete [] gunzip;
            } else {
               Error("BuildPackageOnClient", "%s not found", kGUNZIP);
               status = -1;
            }
         }

         if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
            Error("BuildPackageOnClient", "building package %s on the client failed", package.Data());
            status = -1;
         }

         // Record the version used for this build
         if (savever && !status) {
            f = fopen("PROOF-INF/proofvers.txt", "w");
            if (f) {
               fputs(gROOT->GetVersion(), f);
               fputs(Form("\n%d",gROOT->GetSvnRevision()), f);
               fclose(f);
            }
         }
      } else {
         PDB(kPackage, 1)
            Info("BuildPackageOnClient",
                 "package %s exists but has no PROOF-INF/BUILD.sh script", package.Data());
      }

      // Restore the working directory before releasing the lock
      gSystem->ChangeDirectory(ocwd);

      fPackageLock->Unlock();

      return status;
   }
   return 0;
}
// Load the package 'package': on the client first (unless 'notOnClient' is
// set), then on the nodes via a kLoadPackage broadcast.
// A trailing ".par" and any directory part are stripped from the name.
// Returns -1 on invalid session, missing name or client-side failure,
// otherwise the value of fStatus after the collect.
Int_t TProof::LoadPackage(const char *package, Bool_t notOnClient)
{
   if (!IsValid())
      return -1;

   const Bool_t haveName = (package && strlen(package));
   if (!haveName) {
      Error("LoadPackage", "need to specify a package name");
      return -1;
   }

   // Reduce the argument to the bare package name
   TString pkg = package;
   if (pkg.EndsWith(".par")) {
      pkg.Remove(pkg.Length()-4);
   }
   pkg = gSystem->BaseName(pkg);

   // Client side first; give up on failure
   if (!notOnClient && LoadPackageOnClient(pkg) == -1)
      return -1;

   TMessage request(kPROOF_CACHE);
   request << Int_t(kLoadPackage) << pkg;
   Broadcast(request);
   Collect();

   return fStatus;
}
// Load the package 'package' on the client.
//
// Returns 0 immediately when this is not a client or when the package is
// already in fEnabledPackagesOnClient. Otherwise the package directory is
// looked up (fPackageDir, then fGlobalPackageDirList), PROOF-INF/SETUP.C is
// executed from inside it if present, and on success a symlink named after
// the package is created in the original working directory and added to the
// include paths. Returns 0 on success, -1 on failure.
Int_t TProof::LoadPackageOnClient(const TString &package)
{
   if (!TestBit(TProof::kIsClient))
      return 0;

   Int_t rc = 0;
   TString pkgDir, prevDir;

   if (fEnabledPackagesOnClient->FindObject(package)) {
      Info("LoadPackageOnClient",
           "package %s already loaded", package.Data());
      return 0;
   }

   // Locate the package directory
   pkgDir = fPackageDir + "/" + package;
   if (gSystem->AccessPathName(pkgDir, kReadPermission)) {
      if (fGlobalPackageDirList && fGlobalPackageDirList->GetSize() > 0) {
         TIter nextDir(fGlobalPackageDirList);
         TNamed *gdir = 0;
         while ((gdir = (TNamed *)nextDir())) {
            pkgDir = Form("%s/%s", gdir->GetTitle(), package.Data());
            if (!gSystem->AccessPathName(pkgDir, kReadPermission)) {
               // Found it
               break;
            }
            pkgDir = "";
         }
         if (pkgDir.Length() <= 0) {
            Error("LoadPackageOnClient", "failure locating %s ...", package.Data());
            return -1;
         }
      }
   }

   // Run the setup macro from inside the package directory
   prevDir = gSystem->WorkingDirectory();
   gSystem->ChangeDirectory(pkgDir);

   if (gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
      PDB(kPackage, 1)
         Info("LoadPackageOnCLient",
              "package %s exists but has no PROOF-INF/SETUP.C script", package.Data());
   } else {
      Int_t interpErr = 0;
      Int_t macroRc = gROOT->Macro("PROOF-INF/SETUP.C", &interpErr);
      const Bool_t interpFailed =
         (interpErr > TInterpreter::kNoError && interpErr <= TInterpreter::kFatal);
      if (macroRc < 0 || interpFailed)
         rc = -1;
   }

   gSystem->ChangeDirectory(prevDir);

   if (rc) {
      Error("LoadPackageOnClient", "loading package %s on client failed", package.Data());
      return rc;
   }

   // Create (or refresh) the symlink to the package in the working directory
   fPackageLock->Lock();

   FileStat_t info;
   Int_t exists = gSystem->GetPathInfo(package, info);
   if (info.fIsLink) {
      gSystem->Unlink(package);
   } else if (exists == 0) {
      // Something which is not a link is in the way
      Error("LoadPackageOnClient", "cannot create symlink %s in %s on client, "
            "another item with same name already exists", package.Data(), prevDir.Data());
      fPackageLock->Unlock();
      return -1;
   }
   gSystem->Symlink(pkgDir, package);

   fPackageLock->Unlock();

   // Make the package visible to ACLiC and to the interpreter
   gSystem->AddIncludePath(TString("-I") + package);
   gROOT->ProcessLine(TString(".include ") + package);

   fEnabledPackagesOnClient->Add(new TObjString(package));
   PDB(kPackage, 1)
      Info("LoadPackageOnClient",
           "package %s successfully loaded", package.Data());

   return rc;
}
// Unload the package 'package': on the client first, then (unless running
// in Lite mode) on the nodes via a kUnloadPackage broadcast.
// A trailing ".par" and any directory part are stripped from the name.
// Returns -1 on invalid session, missing name or client-side failure,
// 0 in Lite mode, otherwise the value of fStatus after the collect.
Int_t TProof::UnloadPackage(const char *package)
{
   if (!IsValid())
      return -1;

   const Bool_t haveName = (package && strlen(package));
   if (!haveName) {
      Error("UnloadPackage", "need to specify a package name");
      return -1;
   }

   // Reduce the argument to the bare package name
   TString pkg = package;
   if (pkg.EndsWith(".par")) {
      pkg.Remove(pkg.Length()-4);
   }
   pkg = gSystem->BaseName(pkg);

   if (UnloadPackageOnClient(pkg) == -1)
      return -1;

   // Nothing to notify in Lite mode
   if (IsLite())
      return 0;

   TMessage request(kPROOF_CACHE);
   request << Int_t(kUnloadPackage) << pkg;
   Broadcast(request);
   Collect();

   return fStatus;
}
// Unload the package 'package' on the client: drop its "-I" entry from the
// include path, remove it from fEnabledPackagesOnClient and delete the
// symlink with the package name, if any. Always returns 0.
Int_t TProof::UnloadPackageOnClient(const char *package)
{
   if (!TestBit(TProof::kIsClient))
      return 0;

   TObjString *entry = (TObjString *) fEnabledPackagesOnClient->FindObject(package);
   if (entry) {
      // Strip the interpreter part from the end of the ACLiC include path,
      // then remove this package's entry
      TString aclicPath = gSystem->GetIncludePath();
      TString cintPath = gInterpreter->GetIncludePath();
      aclicPath.Remove(aclicPath.Length() - cintPath.Length() - 1);
      aclicPath.ReplaceAll(TString(" -I") + package, "");
      gSystem->SetIncludePath(aclicPath);

      fEnabledPackagesOnClient->Remove(entry);
   }

   // Remove the symlink, if there
   if (!gSystem->AccessPathName(package) && gSystem->Unlink(package) != 0) {
      Warning("UnloadPackageOnClient", "unable to remove symlink to %s", package);
   }

   // Deleted last: UnloadPackages() passes the entry's own string as 'package'
   delete entry;

   return 0;
}
// Unload all packages: every entry of fEnabledPackagesOnClient on the
// client, then (unless in Lite mode) a kUnloadPackages broadcast.
// Returns -1 on invalid session or client-side failure, 0 in Lite mode,
// otherwise the value of fStatus after the collect.
Int_t TProof::UnloadPackages()
{
   if (!IsValid())
      return -1;

   if (TestBit(TProof::kIsClient)) {
      // The loop stops at the first entry which is not a TObjString
      TIter nextPkg(fEnabledPackagesOnClient);
      TObjString *entry = 0;
      while ((entry = dynamic_cast<TObjString*>(nextPkg()))) {
         if (UnloadPackageOnClient(entry->String()) == -1 )
            return -1;
      }
   }

   // Nothing to notify in Lite mode
   if (IsLite())
      return 0;

   TMessage request(kPROOF_CACHE);
   request << Int_t(kUnloadPackages);
   Broadcast(request);
   Collect();

   return fStatus;
}
// Enable the package 'package': build it, then load it.
// With 'notOnClient' set the client is skipped in both steps.
// A trailing ".par" and any directory part are stripped from the name.
// Returns 0 on success, -1 on error.
Int_t TProof::EnablePackage(const char *package, Bool_t notOnClient)
{
   if (!IsValid())
      return -1;

   const Bool_t haveName = (package && strlen(package));
   if (!haveName) {
      Error("EnablePackage", "need to specify a package name");
      return -1;
   }

   // Reduce the argument to the bare package name
   TString pkg = package;
   if (pkg.EndsWith(".par")) {
      pkg.Remove(pkg.Length()-4);
   }
   pkg = gSystem->BaseName(pkg);

   const EBuildPackageOpt buildOpt = notOnClient ? kDontBuildOnClient : kBuildAll;

   if (BuildPackage(pkg, buildOpt) == -1)
      return -1;

   return (LoadPackage(pkg, notOnClient) == -1) ? -1 : 0;
}
// Upload the PAR file 'pack' to the nodes.
//
// ".par" is appended when missing. The file is searched at the given path,
// then under fPackageDir; if it is found in neither place but a directory
// named 'pack' exists in one of the fGlobalPackageDirList entries, nothing is
// uploaded and 0 is returned.
// The package is first installed on the client (UploadPackageOnClient), then
// each valid unique worker is asked whether it has the same MD5 ("+" message);
// if not, the file is sent and the worker is asked to unpack it ("-" message).
// Finally the non-unique sub-masters are asked to check the package ("=").
//
// Returns 0 on success (or in Lite mode, after the client step), -1 on error.
Int_t TProof::UploadPackage(const char *pack, EUploadPackageOpt opt)
{
   if (!IsValid()) return -1;

   TString par = pack;
   if (!par.EndsWith(".par"))
      par += ".par";

   gSystem->ExpandPathName(par);

   if (gSystem->AccessPathName(par, kReadPermission)) {
      TString tried = par;
      // Try the package directory
      par = Form("%s/%s", fPackageDir.Data(), gSystem->BaseName(par));
      if (gSystem->AccessPathName(par, kReadPermission)) {
         // Is the package available from a global directory?
         if (fGlobalPackageDirList && fGlobalPackageDirList->GetSize() > 0) {
            TIter nxd(fGlobalPackageDirList);
            TNamed *nm = 0;
            TString pdir;
            while ((nm = (TNamed *)nxd())) {
               pdir = Form("%s/%s", nm->GetTitle(), pack);
               if (!gSystem->AccessPathName(pdir, kReadPermission)) {
                  // Found: keep 'pdir' as it is
                  break;
               }
               pdir = "";
            }
            if (pdir.Length() > 0) {
               if (gDebug > 0)
                  Info("UploadPackage", "global package found (%s): no upload needed",
                                        pdir.Data());
               return 0;
            }
         }
         Error("UploadPackage", "PAR file '%s' not found; paths tried: %s, %s",
                                gSystem->BaseName(par), tried.Data(), par.Data());
         return -1;
      }
   }

   // The checksum is dereferenced below: bail out if it cannot be computed
   TMD5 *md5 = TMD5::FileChecksum(par);
   if (!md5) {
      Error("UploadPackage", "could not compute the MD5 checksum of %s", par.Data());
      return -1;
   }

   // Client first
   if (UploadPackageOnClient(par, opt, md5) == -1) {
      delete md5;
      return -1;
   }

   // Nothing more to do in Lite mode
   if (IsLite()) {
      delete md5;
      return 0;
   }

   // Same payload with three different leading markers:
   //   '+' sent before a possible upload, '-' sent after it (unpack request),
   //   '=' sent to the sub-masters
   TString smsg;
   smsg.Form("+%s", gSystem->BaseName(par));

   TMessage mess(kPROOF_CHECKFILE);
   mess << smsg << (*md5);
   TMessage mess2(kPROOF_CHECKFILE);
   smsg.Replace(0, 1, "-");
   mess2 << smsg << (*md5);
   TMessage mess3(kPROOF_CHECKFILE);
   smsg.Replace(0, 1, "=");
   mess3 << smsg << (*md5);

   delete md5;

   if (fProtocol > 8) {
      // Send also the option
      mess << (UInt_t) opt;
      mess2 << (UInt_t) opt;
      mess3 << (UInt_t) opt;
   }

   // Loop over the unique workers
   TIter next(fUniqueSlaves);
   TSlave *sl = 0;
   while ((sl = (TSlave *) next())) {
      if (!sl->IsValid())
         continue;

      sl->GetSocket()->Send(mess);

      fCheckFileStatus = 0;
      Collect(sl, fCollectTimeout, kPROOF_CHECKFILE);
      if (fCheckFileStatus == 0) {

         // Status still 0 after the check: send the file
         if (fProtocol > 5) {
            smsg.Form("%s/%s/%s", sl->GetProofWorkDir(), kPROOF_PackDir,
                                  gSystem->BaseName(par));
            if (SendFile(par, (kBinary | kForce | kCpBin | kForward), smsg.Data(), sl) < 0) {
               Error("UploadPackage", "%s: problems uploading file %s",
                                      sl->GetOrdinal(), par.Data());
               return -1;
            }
         } else {
            // Old servers: use TFTP
            TFTP ftp(TString("root://")+sl->GetName(), 1);
            if (!ftp.IsZombie()) {
               smsg.Form("%s/%s", sl->GetProofWorkDir(), kPROOF_PackDir);
               ftp.cd(smsg.Data());
               ftp.put(par, gSystem->BaseName(par));
            }
         }

         // Ask the worker to unpack and check again
         sl->GetSocket()->Send(mess2);
         fCheckFileStatus = 0;
         Collect(sl, fCollectTimeout, kPROOF_CHECKFILE);
         if (fCheckFileStatus == 0) {
            Error("UploadPackage", "%s: unpacking of package %s failed",
                                   sl->GetOrdinal(), gSystem->BaseName(par));
            return -1;
         }
      }
   }

   // Loop over the non-unique sub-masters
   TIter nextmaster(fNonUniqueMasters);
   TSlave *ma;
   while ((ma = (TSlave *) nextmaster())) {
      if (!ma->IsValid())
         continue;

      ma->GetSocket()->Send(mess3);

      fCheckFileStatus = 0;
      // Collect from the sub-master just contacted; 'sl' is null at this point
      Collect(ma, fCollectTimeout, kPROOF_CHECKFILE);
      if (fCheckFileStatus == 0) {
         Error("UploadPackage", "package %s did not exist on submaster %s",
               par.Data(), ma->GetOrdinal());
         return -1;
      }
   }

   return 0;
}
// Install the PAR file 'par' on the client.
//
// A symlink to the PAR file is (re)created under fPackageDir; if the checksum
// stored in <package>/PROOF-INF/md5.txt is missing or differs from 'md5', the
// file is unpacked into fPackageDir (after removing the old directory when
// 'opt' has kRemoveOld) and the new checksum is saved.
// Returns 0 on success or when not a client, -1 on error.
Int_t TProof::UploadPackageOnClient(const TString &par, EUploadPackageOpt opt, TMD5 *md5)
{
   Int_t rc = 0;

   if (!TestBit(TProof::kIsClient))
      return rc;

   fPackageLock->Lock();

   // (Re)create the link to the PAR file in the package directory
   TString link = fPackageDir + "/" + gSystem->BaseName(par);
   FileStat_t info;
   Int_t exists = gSystem->GetPathInfo(link, info);
   if (info.fIsLink) {
      gSystem->Unlink(link);
   } else if (exists == 0) {
      // Something which is not a link is in the way
      Error("UploadPackageOnClient", "cannot create symlink %s on client, "
            "another item with same name already exists",
            link.Data());
      fPackageLock->Unlock();
      return -1;
   }
   if (gSystem->IsAbsoluteFileName(par)) {
      gSystem->Symlink(par, link);
   } else {
      // Make the target absolute with respect to the working directory
      TString absPar = par;
      gSystem->Symlink(gSystem->PrependPathName(gSystem->WorkingDirectory(), absPar), link);
   }

   // Package name: file name without directory and without the last 4 chars
   TString pkg = par(0, par.Length() - 4);
   pkg = gSystem->BaseName(pkg);

   // Compare the checksums
   TString md5File = fPackageDir + "/" + pkg + "/PROOF-INF/md5.txt";
   TMD5 *md5Stored = TMD5::ReadChecksum(md5File);
   const Bool_t upToDate = (md5Stored && (*md5) == (*md5Stored));
   if (!upToDate) {
      // Remove the old version, if requested
      if ((opt & TProof::kRemoveOld)) {
         if (gSystem->Exec(Form("%s %s/%s", kRM, fPackageDir.Data(),
                                pkg.Data())))
            Error("UploadPackageOnClient", "failure executing: %s %s/%s",
                  kRM, fPackageDir.Data(), pkg.Data());
      }
      // Unpack
      char *gunzip = gSystem->Which(gSystem->Getenv("PATH"), kGUNZIP,
                                    kExecutePermission);
      if (!gunzip) {
         Error("UploadPackageOnClient", "%s not found", kGUNZIP);
      } else {
         if (gSystem->Exec(Form(kUNTAR2, gunzip, par.Data(), fPackageDir.Data())))
            Error("Uploadpackage", "failure executing: %s",
                  Form(kUNTAR2, gunzip, par.Data(), fPackageDir.Data()));
         delete [] gunzip;
      }
      // Check the result and record the checksum
      if (gSystem->AccessPathName(fPackageDir + "/" + pkg, kWritePermission)) {
         Error("UploadPackageOnClient",
               "package %s did not unpack into %s/%s", par.Data(), fPackageDir.Data(),
               pkg.Data());
         rc = -1;
      } else {
         TMD5::WriteChecksum(md5File, md5);
      }
   }

   fPackageLock->Unlock();
   delete md5Stored;

   return rc;
}
// Load the macro 'macro' on the nodes.
//
// Client side: the implementation file and, if found next to it, the
// associated ".h" or ".hh" header are sent to the "cache", a kLoadMacro
// request is broadcast to the active nodes and, unless 'notOnClient' is set,
// the macro is also loaded locally. 'wrks' must be null on the client.
// Master side: the request is forwarded either to the unique workers (or to
// 'wrks' when given), or - when 'uniqueWorkers' is false - to the active
// workers which are not in fUniqueSlaves.
//
// Not available in Lite mode. Returns 0 on success, -1 on error.
Int_t TProof::Load(const char *macro, Bool_t notOnClient, Bool_t uniqueWorkers,
TList *wrks)
{
if (!IsValid()) return -1;
if (IsLite()) {
Warning("Load", "functionality not yet implemented; please use Exec(...)"
" or a dedicated PAR package");
return -1;
}
if (!macro || !strlen(macro)) {
Error("Load", "need to specify a macro name");
return -1;
}
if (TestBit(TProof::kIsClient)) {
if (wrks) {
Error("Load", "the 'wrks' arg can be used only on the master");
return -1;
}
// Separate the file name from the ACLiC mode, arguments and redirection
TString implname = macro;
TString acmode, args, io;
implname = gSystem->SplitAclicMode(implname, acmode, args, io);
Int_t dot = implname.Last('.');
if (dot == kNPOS) {
Info("Load", "macro '%s' does not contain a '.': do nothing", macro);
return -1;
}
// Look for an associated header: try ".h" first, then ".hh"
Bool_t hasHeader = kTRUE;
TString headname = implname;
headname.Remove(dot);
headname += ".h";
if (gSystem->AccessPathName(headname, kReadPermission)) {
TString h = headname;
headname.Remove(dot);
headname += ".hh";
if (gSystem->AccessPathName(headname, kReadPermission)) {
hasHeader = kFALSE;
if (gDebug > 0)
Info("Load", "no associated header file found: tried: %s %s",
h.Data(), headname.Data());
}
}
// Send the files to the "cache"
if (SendFile(implname, kAscii | kForward , "cache") == -1) {
Info("Load", "problems sending implementation file %s", implname.Data());
return -1;
}
if (hasHeader)
if (SendFile(headname, kAscii | kForward , "cache") == -1) {
Info("Load", "problems sending header file %s", headname.Data());
return -1;
}
// The request carries the base name only (any ACLiC suffix is kept)
TString basemacro = gSystem->BaseName(macro);
TMessage mess(kPROOF_CACHE);
mess << Int_t(kLoadMacro) << basemacro;
Broadcast(mess, kActive);
// Load locally while the nodes are working; the replies are collected below
if (!notOnClient) {
gROOT->ProcessLine(Form(".L %s", macro));
// Add the directory of the macro to the macro path, after a leading ".:"
TString mp(TROOT::GetMacroPath());
TString np(gSystem->DirName(macro));
if (!np.IsNull()) {
np += ":";
Int_t ip = (mp.BeginsWith(".:")) ? 2 : 0;
mp.Insert(ip, np);
}
TROOT::SetMacroPath(mp);
if (gDebug > 0)
Info("Load", "macro path set to '%s'", TROOT::GetMacroPath());
}
Collect(kActive);
} else {
// Not a client: forward the request to the workers
TString basemacro = gSystem->BaseName(macro);
TMessage mess(kPROOF_CACHE);
if (uniqueWorkers) {
mess << Int_t(kLoadMacro) << basemacro;
if (wrks)
Broadcast(mess, wrks);
else
Broadcast(mess, kUnique);
} else {
// Wait for the unique workers before addressing the others
Collect(kUnique);
// Active workers which are not in the unique list
TList others;
TSlave *wrk = 0;
TIter nxw(fActiveSlaves);
while ((wrk = (TSlave *)nxw())) {
if (!fUniqueSlaves->FindObject(wrk)) {
others.Add(wrk);
}
}
// Turn a "++" found after the last '.' into "+" for these workers
Int_t ld = basemacro.Last('.');
if (ld != kNPOS) {
Int_t lpp = basemacro.Index("++", ld);
if (lpp != kNPOS) basemacro.Replace(lpp, 2, "+");
}
mess << Int_t(kLoadMacro) << basemacro;
Broadcast(mess, &others);
Collect(&others);
}
PDB(kGlobal, 1) Info("Load", "adding loaded macro: %s", macro);
// Keep track of the loaded macros (the list owns its entries)
if (!fLoadedMacros) {
fLoadedMacros = new TList();
fLoadedMacros->SetOwner();
}
// Not recorded when addressed to an explicit list of workers
if (!wrks)
fLoadedMacros->Add(new TObjString(macro));
}
return 0;
}
// Add 'libpath' to the dynamic library search path of the nodes (those in
// 'wrks' when given) and, if 'onClient' is set, of the client as well.
// Returns 0 in all cases.
Int_t TProof::AddDynamicPath(const char *libpath, Bool_t onClient, TList *wrks)
{
   const Bool_t havePath = (libpath && strlen(libpath));
   if (!havePath) {
      if (gDebug > 0)
         Info("AddDynamicPath", "list is empty - nothing to do");
      return 0;
   }

   // Do it also on the client, if required
   if (onClient) {
      HandleLibIncPath("lib", kTRUE, libpath);
   }

   TMessage m(kPROOF_LIB_INC_PATH);
   m << TString("lib") << (Bool_t)kTRUE;
   // "-" stands for "no path"
   m << TString(havePath ? libpath : "-");

   if (wrks) {
      Broadcast(m, wrks);
   } else {
      Broadcast(m);
   }
   Collect(kActive, fCollectTimeout);

   return 0;
}
// Add 'incpath' to the include path of the nodes (those in 'wrks' when
// given) and, if 'onClient' is set, of the client as well.
// Returns 0 in all cases.
Int_t TProof::AddIncludePath(const char *incpath, Bool_t onClient, TList *wrks)
{
   const Bool_t havePath = (incpath && strlen(incpath));
   if (!havePath) {
      if (gDebug > 0)
         Info("AddIncludePath", "list is empty - nothing to do");
      return 0;
   }

   // Do it also on the client, if required
   if (onClient) {
      HandleLibIncPath("inc", kTRUE, incpath);
   }

   TMessage m(kPROOF_LIB_INC_PATH);
   m << TString("inc") << (Bool_t)kTRUE;
   // "-" stands for "no path"
   m << TString(havePath ? incpath : "-");

   if (wrks) {
      Broadcast(m, wrks);
   } else {
      Broadcast(m);
   }
   Collect(kActive, fCollectTimeout);

   return 0;
}
// Remove 'libpath' from the dynamic library search path of the nodes and,
// if 'onClient' is set, of the client as well. Returns 0 in all cases.
Int_t TProof::RemoveDynamicPath(const char *libpath, Bool_t onClient)
{
   const Bool_t havePath = (libpath && strlen(libpath));
   if (!havePath) {
      if (gDebug > 0)
         Info("RemoveDynamicPath", "list is empty - nothing to do");
      return 0;
   }

   // Do it also on the client, if required
   if (onClient) {
      HandleLibIncPath("lib", kFALSE, libpath);
   }

   TMessage m(kPROOF_LIB_INC_PATH);
   m << TString("lib") <<(Bool_t)kFALSE;
   // "-" stands for "no path"
   m << TString(havePath ? libpath : "-");

   Broadcast(m);
   Collect(kActive, fCollectTimeout);

   return 0;
}
// Remove 'incpath' from the include path of the nodes and, if 'onClient' is
// set, of the client as well. Returns 0 in all cases.
Int_t TProof::RemoveIncludePath(const char *incpath, Bool_t onClient)
{
   if ((!incpath || !strlen(incpath))) {
      if (gDebug > 0)
         Info("RemoveIncludePath", "list is empty - nothing to do");
      return 0;
   }

   // Do it also on the client, if required. The type must be "inc":
   // HandleLibIncPath() only accepts "lib" and "inc" and the former "in"
   // made the client-side removal fail with "unknown action type".
   if (onClient)
      HandleLibIncPath("inc", kFALSE, incpath);

   TMessage m(kPROOF_LIB_INC_PATH);
   m << TString("inc") << (Bool_t)kFALSE;

   // 'incpath' is known to be non-empty here
   m << TString(incpath);

   Broadcast(m);
   Collect(kActive, fCollectTimeout);

   return 0;
}
void TProof::HandleLibIncPath(const char *what, Bool_t add, const char *dirs)
{
TString type(what);
TString path(dirs);
if ((type != "lib") && (type != "inc")) {
Error("HandleLibIncPath","unknown action type: %s", type.Data());
return;
}
path.ReplaceAll(","," ");
TObjArray *op = 0;
if (path.Length() > 0 && path != "-") {
if (!(op = path.Tokenize(" "))) {
Error("HandleLibIncPath","decomposing path %s", path.Data());
return;
}
}
if (add) {
if (type == "lib") {
TIter nxl(op, kIterBackward);
TObjString *lib = 0;
while ((lib = (TObjString *) nxl())) {
TString xlib = lib->GetName();
gSystem->ExpandPathName(xlib);
if (!gSystem->AccessPathName(xlib, kReadPermission)) {
TString newlibpath = gSystem->GetDynamicPath();
Int_t pos = 0;
if (newlibpath.BeginsWith(".:"))
pos = 2;
if (newlibpath.Index(xlib) == kNPOS) {
newlibpath.Insert(pos,Form("%s:", xlib.Data()));
gSystem->SetDynamicPath(newlibpath);
}
} else {
Info("HandleLibIncPath",
"libpath %s does not exist or cannot be read - not added", xlib.Data());
}
}
} else {
TIter nxi(op);
TObjString *inc = 0;
while ((inc = (TObjString *) nxi())) {
TString xinc = inc->GetName();
gSystem->ExpandPathName(xinc);
if (!gSystem->AccessPathName(xinc, kReadPermission)) {
TString curincpath = gSystem->GetIncludePath();
if (curincpath.Index(xinc) == kNPOS)
gSystem->AddIncludePath(Form("-I%s", xinc.Data()));
} else
Info("HandleLibIncPath",
"incpath %s does not exist or cannot be read - not added", xinc.Data());
}
}
} else {
if (type == "lib") {
TIter nxl(op);
TObjString *lib = 0;
while ((lib = (TObjString *) nxl())) {
TString xlib = lib->GetName();
gSystem->ExpandPathName(xlib);
TString newlibpath = gSystem->GetDynamicPath();
newlibpath.ReplaceAll(Form("%s:", xlib.Data()),"");
gSystem->SetDynamicPath(newlibpath);
}
} else {
TIter nxi(op);
TObjString *inc = 0;
while ((inc = (TObjString *) nxi())) {
TString newincpath = gSystem->GetIncludePath();
newincpath.ReplaceAll(Form("-I%s", inc->GetName()),"");
newincpath.ReplaceAll(gInterpreter->GetIncludePath(),"");
gSystem->SetIncludePath(newincpath);
}
}
}
}
// Broadcast a kListPackages request, collect the replies and return
// fAvailablePackages. Returns 0 if the session is not valid.
TList *TProof::GetListOfPackages()
{
   if (!IsValid()) {
      return (TList *)0;
   }

   TMessage request(kPROOF_CACHE);
   request << Int_t(kListPackages);
   Broadcast(request);
   Collect(kActive, fCollectTimeout);

   return fAvailablePackages;
}
// Broadcast a kListEnabledPackages request, collect the replies and return
// fEnabledPackages. Returns 0 if the session is not valid.
TList *TProof::GetListOfEnabledPackages()
{
   if (!IsValid()) {
      return (TList *)0;
   }

   TMessage request(kPROOF_CACHE);
   request << Int_t(kListEnabledPackages);
   Broadcast(request);
   Collect(kActive, fCollectTimeout);

   return fEnabledPackages;
}
// Print a progress line on stderr: a 20-slot bar, the percentage done and,
// when it can be computed, the event rate. If a user function is registered
// in fPrintProgress it is called instead, with log redirection switched off
// for the duration of the call.
void TProof::PrintProgress(Long64_t total, Long64_t processed, Float_t procTime)
{
   if (fPrintProgress) {
      // Delegate to the user-defined printer
      const Bool_t savedRedir = fRedirLog;
      fRedirLog = kFALSE;
      (*fPrintProgress)(total, processed, procTime);
      fRedirLog = savedRedir;
      return;
   }

   fprintf(stderr, "[TProof::Progress] Total %lld events\t|", total);

   // The bar: '=' for what is done, '>' at the head, '.' for what is left;
   // all '=' when the total is not known
   for (int slot = 0; slot < 20; slot++) {
      if (total <= 0) {
         fprintf(stderr, "=");
         continue;
      }
      const Long64_t head = 20*processed/total;
      if (slot < head) {
         fprintf(stderr, "=");
      } else if (slot == head) {
         fprintf(stderr, ">");
      } else {
         fprintf(stderr, ".");
      }
   }

   Float_t evtrti = (procTime > 0. && processed > 0) ? processed / procTime : -1.;
   const Double_t percent = (total ? ((100.0*processed)/total) : 100.0);
   if (evtrti > 0.) {
      fprintf(stderr, "| %.02f %% [%.1f evts/s]\r", percent, evtrti);
   } else {
      fprintf(stderr, "| %.02f %%\r", percent);
   }

   // Terminate the line when done
   if (processed >= total) {
      fprintf(stderr, "\n");
   }
}
// Report progress. A user function registered in fPrintProgress takes
// precedence; otherwise the progress is printed in batch mode (only when
// 'total' is positive) or emitted as a signal.
void TProof::Progress(Long64_t total, Long64_t processed)
{
   if (fPrintProgress) {
      // Call the user-defined printer; processing time is not known here
      (*fPrintProgress)(total, processed, -1.);
      return;
   }

   PDB(kGlobal,1)
      Info("Progress","%2f (%lld/%lld)", 100.*processed/total, processed, total);

   if (!gROOT->IsBatch()) {
      EmitVA("Progress(Long64_t,Long64_t)", 2, total, processed);
   } else if (total > 0) {
      PrintProgress(total, processed);
   }
}
// Report progress, extended version: printed in batch mode (only when
// 'total' is positive), emitted as a signal with all the arguments otherwise.
void TProof::Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
                      Float_t initTime, Float_t procTime,
                      Float_t evtrti, Float_t mbrti)
{
   PDB(kGlobal,1)
      Info("Progress","%lld %lld %lld %f %f %f %f", total, processed, bytesread,
                                initTime, procTime, evtrti, mbrti);

   if (!gROOT->IsBatch()) {
      EmitVA("Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t)",
             7, total, processed, bytesread, initTime, procTime, evtrti, mbrti);
   } else if (total > 0) {
      PrintProgress(total, processed, procTime);
   }
}
// Emit the feedback signal for the list 'objs', after optional debug output.
void TProof::Feedback(TList *objs)
{
   PDB(kGlobal,1) {
      Info("Feedback","%d objects", objs->GetSize());
   }
   PDB(kFeedback,1) {
      Info("Feedback","%d objects", objs->GetSize());
      objs->ls();
   }

   Emit("Feedback(TList *objs)", (Long_t) objs);
}
// Emit the signal closing the progress dialog, if one was started.
void TProof::CloseProgressDialog()
{
   PDB(kGlobal,1) {
      Info("CloseProgressDialog",
           "called: have progress dialog: %d", fProgressDialogStarted);
   }

   // Signal only if a dialog was started
   if (fProgressDialogStarted) {
      Emit("CloseProgressDialog()");
   }
}
// Emit the signal resetting the progress dialog, forwarding all arguments.
void TProof::ResetProgressDialog(const char *sel, Int_t sz, Long64_t fst,
                                 Long64_t ent)
{
   PDB(kGlobal,1) {
      Info("ResetProgressDialog","(%s,%d,%lld,%lld)", sel, sz, fst, ent);
   }

   EmitVA("ResetProgressDialog(const char*,Int_t,Long64_t,Long64_t)",
          4, sel, sz, fst, ent);
}
// Emit the startup message signal, forwarding all arguments.
void TProof::StartupMessage(const char *msg, Bool_t st, Int_t done, Int_t total)
{
   PDB(kGlobal,1) {
      Info("StartupMessage","(%s,%d,%d,%d)", msg, st, done, total);
   }

   EmitVA("StartupMessage(const char*,Bool_t,Int_t,Int_t)",
          4, msg, st, done, total);
}
// Emit the data set status signal, forwarding all arguments.
void TProof::DataSetStatus(const char *msg, Bool_t st, Int_t done, Int_t total)
{
   PDB(kGlobal,1) {
      Info("DataSetStatus","(%s,%d,%d,%d)", msg, st, done, total);
   }

   EmitVA("DataSetStatus(const char*,Bool_t,Int_t,Int_t)",
          4, msg, st, done, total);
}
// Report the status of the data set operation 'action': 'done' items out
// of 'tot'.
//
// In Lite mode a one-line message is printed on stderr (synchronous mode) or
// passed to NotifyLogMsg(); nothing is reported when 'tot' is 0.
// On a master the information, together with 'st', is sent to the client in
// a kPROOF_DATASET_STATUS message.
void TProof::SendDataSetStatus(const char *action, UInt_t done,
                               UInt_t tot, Bool_t st)
{
   if (IsLite()) {
      if (tot) {
         TString type = "files";
         Int_t frac = (Int_t) (done*100.)/tot;
         // 'action' has arbitrary length: format with snprintf so that the
         // message is truncated instead of overflowing the buffer
         char msg[512] = {0};
         if (frac >= 100) {
            snprintf(msg, sizeof(msg), "%s: OK (%u %s) \n",
                     action, tot, type.Data());
         } else {
            // '\r' so that the next update overwrites this line
            snprintf(msg, sizeof(msg), "%s: %u out of %u (%d %%)\r",
                     action, done, tot, frac);
         }
         if (fSync)
            fprintf(stderr,"%s", msg);
         else
            NotifyLogMsg(msg, 0);
      }
      return;
   }

   if (TestBit(TProof::kIsMaster)) {
      // Note the order: total first, then done
      TMessage mess(kPROOF_DATASET_STATUS);
      mess << TString(action) << tot << done << st;
      gProofServ->GetSocket()->Send(mess);
   }
}
// Emit the signal telling that the result of query 'ref' is ready.
void TProof::QueryResultReady(const char *ref)
{
   PDB(kGlobal,1) {
      Info("QueryResultReady","ref: %s", ref);
   }

   Emit("QueryResultReady(const char*)",ref);
}
// Validate the elements of 'dset' using the active workers.
//
// Nothing is done if all the elements are already valid. Otherwise the
// active workers are grouped by name into "nodes"; each not-yet-valid element
// is assigned to the node whose name matches the host of its file URL, or -
// in a second pass - to the first node of the list when there is no match.
// The elements of each node are then split among its workers, each of which
// receives a TDSet copy in a kPROOF_VALIDATE_DSET message; the replies are
// collected before returning.
void TProof::ValidateDSet(TDSet *dset)
{
   if (dset->ElementsValid()) return;

   // 'nodes' holds TPair(list of workers, list of elements), one per name
   TList nodes;
   nodes.SetOwner();

   // Owners of the worker and element lists referenced by the pairs
   TList slholder;
   slholder.SetOwner();
   TList elemholder;
   elemholder.SetOwner();

   // Build the node list from the active workers
   TIter nextSlave(GetListOfActiveSlaves());
   while (TSlave *sl = dynamic_cast<TSlave*>(nextSlave())) {
      TList *sllist = 0;
      TPair *p = dynamic_cast<TPair*>(nodes.FindObject(sl->GetName()));
      if (!p) {
         // First worker with this name: create the node
         sllist = new TList;
         sllist->SetName(sl->GetName());
         slholder.Add(sllist);
         TList *elemlist = new TList;
         elemlist->SetName(TString(sl->GetName())+"_elem");
         elemholder.Add(elemlist);
         nodes.Add(new TPair(sllist, elemlist));
      } else {
         sllist = dynamic_cast<TList*>(p->Key());
      }
      sllist->Add(sl);
   }

   // Assign the elements: pass 0 handles the elements with a matching node,
   // pass 1 the ones left over in 'nonLocal'
   TList nonLocal;
   for (Int_t i = 0; i < 2; i++) {
      Bool_t local = i>0?kFALSE:kTRUE;
      TIter nextElem(local ? dset->GetListOfElements() : &nonLocal);
      while (TDSetElement *elem = dynamic_cast<TDSetElement*>(nextElem())) {
         if (elem->GetValid()) continue;
         TPair *p = dynamic_cast<TPair*>(local?nodes.FindObject(TUrl(elem->GetFileName()).GetHost()):nodes.At(0));
         if (p) {
            TList *eli = dynamic_cast<TList*>(p->Value());
            TList *sli = dynamic_cast<TList*>(p->Key());
            eli->Add(elem);

            // Move the node down the list past the nodes which now have fewer
            // elements per worker (ratios compared by cross-multiplication)
            TPair *p2 = p;
            Bool_t stop = kFALSE;
            while (!stop) {
               TPair *p3 = dynamic_cast<TPair*>(nodes.After(p2->Key()));
               if (p3) {
                  Int_t nelem = dynamic_cast<TList*>(p3->Value())->GetSize();
                  Int_t nsl = dynamic_cast<TList*>(p3->Key())->GetSize();
                  if (nelem*sli->GetSize() < eli->GetSize()*nsl) p2 = p3;
                  else stop = kTRUE;
               } else {
                  stop = kTRUE;
               }
            }

            if (p2!=p) {
               nodes.Remove(p->Key());
               nodes.AddAfter(p2->Key(), p);
            }
         } else {
            if (local) {
               // No node for this host: retry in the second pass
               nonLocal.Add(elem);
            } else {
               // Second pass and still no node: there are no nodes at all
               Error("ValidateDSet", "No Node to allocate TDSetElement to");
               R__ASSERT(0);
            }
         }
      }
   }

   // Send the sub-sets to the workers
   TList usedslaves;
   TIter nextNode(&nodes);
   SetDSet(dset);
   while (TPair *node = dynamic_cast<TPair*>(nextNode())) {
      TList *slaves = dynamic_cast<TList*>(node->Key());
      TList *setelements = dynamic_cast<TList*>(node->Value());

      // Split the elements of the node evenly among its workers
      Int_t nslaves = slaves->GetSize();
      Int_t nelements = setelements->GetSize();
      for (Int_t i=0; i<nslaves; i++) {

         TDSet copyset(dset->GetType(), dset->GetObjName(),
                       dset->GetDirectory());
         for (Int_t j = (i*nelements)/nslaves;
                    j < ((i+1)*nelements)/nslaves;
                    j++) {
            TDSetElement *elem =
               dynamic_cast<TDSetElement*>(setelements->At(j));
            copyset.Add(elem->GetFileName(), elem->GetObjName(),
                        elem->GetDirectory(), elem->GetFirst(),
                        elem->GetNum(), elem->GetMsd());
         }

         if (copyset.GetListOfElements()->GetSize()>0) {
            TMessage mesg(kPROOF_VALIDATE_DSET);
            // Stream the sub-set by address
            mesg << &copyset;

            TSlave *sl = dynamic_cast<TSlave*>(slaves->At(i));
            PDB(kGlobal,1) Info("ValidateDSet",
                                "Sending TDSet with %d elements to slave %s"
                                " to be validated",
                                copyset.GetListOfElements()->GetSize(),
                                sl->GetOrdinal());
            sl->GetSocket()->Send(mesg);
            usedslaves.Add(sl);
         }
      }
   }

   PDB(kGlobal,1)
      Info("ValidateDSet","Calling Collect");
   Collect(&usedslaves);
   SetDSet(0);
}
// Add 'obj' to the input data list, creating the list if needed, and flag
// the input data as new. The flag is also raised when 'push' is set, whether
// or not anything was added.
void TProof::AddInputData(TObject *obj, Bool_t push)
{
   if (obj) {
      if (!fInputData) {
         fInputData = new TList;
      }
      // Add only once
      const Bool_t known = (fInputData->FindObject(obj) != 0);
      if (!known) {
         fInputData->Add(obj);
         SetBit(TProof::kNewInputData);
      }
   }

   if (push) {
      SetBit(TProof::kNewInputData);
   }
}
// Remove 'obj' from the input data list; with a null 'obj' clear everything
// related to the input data: the list and its objects, the
// "PROOF_InputDataFile" and "PROOF_InputData" entries of the input list, the
// input data file name and the file kPROOF_InputDataFile.
void TProof::ClearInputData(TObject *obj)
{
   if (!obj) {
      if (fInputData) {
         // Take ownership so that the objects are deleted with the list
         fInputData->SetOwner(kTRUE);
         SafeDelete(fInputData);
      }
      ResetBit(TProof::kNewInputData);

      // Also remove any info about the input data from the input list.
      // GetInputList() returns 0 when no player is defined: check it before
      // use (it used to be dereferenced unconditionally).
      TList *in = GetInputList();
      if (in) {
         TObject *o = 0;
         while ((o = in->FindObject("PROOF_InputDataFile")))
            in->Remove(o);
         while ((o = in->FindObject("PROOF_InputData")))
            in->Remove(o);
      }

      // Reset the file name and remove the file
      fInputDataFile = "";
      gSystem->Unlink(kPROOF_InputDataFile);

   } else if (fInputData) {
      // Remove all the occurrences of 'obj'; flag a change if any was found
      Int_t sz = fInputData->GetSize();
      while (fInputData->FindObject(obj))
         fInputData->Remove(obj);
      if (sz != fInputData->GetSize())
         SetBit(TProof::kNewInputData);
   }
}
void TProof::ClearInputData(const char *name)
{
TObject *obj = (fInputData && name) ? fInputData->FindObject(name) : 0;
if (obj) ClearInputData(obj);
}
void TProof::SetInputDataFile(const char *datafile)
{
if (datafile && strlen(datafile) > 0) {
if (fInputDataFile != datafile && strcmp(datafile, kPROOF_InputDataFile))
SetBit(TProof::kNewInputData);
fInputDataFile = datafile;
} else {
if (!fInputDataFile.IsNull())
SetBit(TProof::kNewInputData);
fInputDataFile = "";
}
if (fInputDataFile != kPROOF_InputDataFile && !fInputDataFile.IsNull() &&
gSystem->AccessPathName(fInputDataFile, kReadPermission)) {
fInputDataFile = "";
}
}
// Prepare the input data file and, if there is one, broadcast it to the
// "cache" of the active nodes and record its name in the input list.
void TProof::SendInputDataFile()
{
   TString dataFile;
   PrepareInputDataFile(dataFile);

   // Nothing to send
   if (dataFile.Length() <= 0)
      return;

   Info("SendInputDataFile", "broadcasting %s", dataFile.Data());
   BroadcastFile(dataFile.Data(), kBinary, "cache", kActive);

   // The nodes get the name of the file relative to their cache
   AddInput(new TNamed("PROOF_InputDataFile", Form("cache:%s", gSystem->BaseName(dataFile))));
}
// Prepare the file with the input data objects to be sent to the nodes.
//
// The objects come from the list fInputData and/or from the user file
// fInputDataFile. On return 'dataFile' holds the path of the file to be sent,
// or is empty if there is nothing to send or something went wrong:
//   - user file only : the user file itself;
//   - list only      : kPROOF_InputDataFile, (re)created with the objects of
//                      the list unless it exists and nothing has changed;
//   - both           : kPROOF_InputDataFile, made as a copy of the user file
//                      updated with the objects of the list.
// The kNewInputData bit is reset and any previous "PROOF_InputDataFile" /
// "PROOF_InputData" entry is removed from the input list.
void TProof::PrepareInputDataFile(TString &dataFile)
{
   // Save the status of the 'new data' flag and reset it
   Bool_t newdata = TestBit(TProof::kNewInputData) ? kTRUE : kFALSE;
   ResetBit(TProof::kNewInputData);

   // Check what we have
   Bool_t list_ok = (fInputData && fInputData->GetSize() > 0) ? kTRUE : kFALSE;
   Bool_t file_ok = kFALSE;
   if (fInputDataFile != kPROOF_InputDataFile && !fInputDataFile.IsNull() &&
       !gSystem->AccessPathName(fInputDataFile, kReadPermission)) {
      // The user file counts only if it contains something
      TFile *f = TFile::Open(fInputDataFile);
      if (f && f->GetListOfKeys() && f->GetListOfKeys()->GetSize() > 0)
         file_ok = kTRUE;
      // Release the file: it was opened for the check only (it used to be
      // left open, also while being copied below)
      if (f) {
         f->Close();
         SafeDelete(f);
      }
   }

   // Remove any info about the input data from the input list.
   // GetInputList() returns 0 when no player is defined: check it before use.
   TList *in = GetInputList();
   if (in) {
      TObject *o = 0;
      while ((o = in->FindObject("PROOF_InputDataFile")))
         in->Remove(o);
      while ((o = in->FindObject("PROOF_InputData")))
         in->Remove(o);
   }

   // Nothing to send by default
   dataFile = "";
   if (!list_ok && !file_ok) return;

   if (file_ok && !list_ok) {
      // Just send the user file
      dataFile = fInputDataFile;
   } else if (!file_ok && list_ok) {
      fInputDataFile = kPROOF_InputDataFile;
      // Nothing to do if nothing has changed and the file is there
      // (note that 'dataFile' is left empty in this case)
      if (!newdata && !gSystem->AccessPathName(fInputDataFile)) return;
      // Create the file from scratch
      TFile *f = TFile::Open(fInputDataFile, "RECREATE");
      if (f) {
         f->cd();
         TIter next(fInputData);
         TObject *obj;
         while ((obj = next())) {
            obj->Write(0, TObject::kSingleKey, 0);
         }
         f->Close();
         SafeDelete(f);
      } else {
         Error("PrepareInputDataFile", "could not (re-)create %s", fInputDataFile.Data());
         return;
      }
      dataFile = fInputDataFile;
   } else if (file_ok && list_ok) {
      dataFile = kPROOF_InputDataFile;
      // Rebuild the file only if something has changed or it is missing
      if (newdata || gSystem->AccessPathName(dataFile)) {
         // Start from a fresh copy of the user file
         if (!gSystem->AccessPathName(dataFile))
            gSystem->Unlink(dataFile);
         if (dataFile != fInputDataFile) {
            if (gSystem->CopyFile(fInputDataFile, dataFile, kTRUE) != 0) {
               Error("PrepareInputDataFile", "could not make local copy of %s", fInputDataFile.Data());
               return;
            }
         }
         // Add the objects of the list
         TFile *f = TFile::Open(dataFile, "UPDATE");
         if (f) {
            f->cd();
            TIter next(fInputData);
            TObject *obj = 0;
            while ((obj = next())) {
               obj->Write(0, TObject::kSingleKey, 0);
            }
            f->Close();
            SafeDelete(f);
         } else {
            Error("PrepareInputDataFile", "could not open %s for updating", dataFile.Data());
            return;
         }
      }
   }

   return;
}
// Hand 'obj' over to the player's input list; no-op without a player.
void TProof::AddInput(TObject *obj)
{
   if (!fPlayer)
      return;

   fPlayer->AddInput(obj);
}
// Clear the player's input list and put the feedback list back into it.
void TProof::ClearInput()
{
   if (fPlayer) {
      fPlayer->ClearInput();
   }

   // The feedback list is always part of the input
   AddInput(fFeedback);
}
// Return the player's input list, or 0 when no player is defined.
TList *TProof::GetInputList()
{
   if (!fPlayer)
      return (TList *)0;

   return fPlayer->GetInputList();
}
// Ask the player for the output object called 'name'; 0 when no player is
// defined.
TObject *TProof::GetOutput(const char *name)
{
   if (!fPlayer)
      return (TObject *)0;

   return fPlayer->GetOutput(name);
}
// Return the player's output list, or 0 when no player is defined.
TList *TProof::GetOutputList()
{
   if (!fPlayer)
      return (TList *)0;

   return fPlayer->GetOutputList();
}
void TProof::SetParameter(const char *par, const char *value)
{
if (!fPlayer) {
Warning("SetParameter", "player undefined! Ignoring");
return;
}
TList *il = fPlayer->GetInputList();
TObject *item = il->FindObject(par);
if (item) {
il->Remove(item);
delete item;
}
il->Add(new TNamed(par, value));
}
void TProof::SetParameter(const char *par, Int_t value)
{
if (!fPlayer) {
Warning("SetParameter", "player undefined! Ignoring");
return;
}
TList *il = fPlayer->GetInputList();
TObject *item = il->FindObject(par);
if (item) {
il->Remove(item);
delete item;
}
il->Add(new TParameter<Int_t>(par, value));
}
void TProof::SetParameter(const char *par, Long_t value)
{
if (!fPlayer) {
Warning("SetParameter", "player undefined! Ignoring");
return;
}
TList *il = fPlayer->GetInputList();
TObject *item = il->FindObject(par);
if (item) {
il->Remove(item);
delete item;
}
il->Add(new TParameter<Long_t>(par, value));
}
void TProof::SetParameter(const char *par, Long64_t value)
{
if (!fPlayer) {
Warning("SetParameter", "player undefined! Ignoring");
return;
}
TList *il = fPlayer->GetInputList();
TObject *item = il->FindObject(par);
if (item) {
il->Remove(item);
delete item;
}
il->Add(new TParameter<Long64_t>(par, value));
}
void TProof::SetParameter(const char *par, Double_t value)
{
if (!fPlayer) {
Warning("SetParameter", "player undefined! Ignoring");
return;
}
TList *il = fPlayer->GetInputList();
TObject *item = il->FindObject(par);
if (item) {
il->Remove(item);
delete item;
}
il->Add(new TParameter<Double_t>(par, value));
}
// Return the object called 'par' from the player's input list; 0 if it is
// not there or if no player is defined.
TObject *TProof::GetParameter(const char *par) const
{
   if (fPlayer) {
      TList *inputs = fPlayer->GetInputList();
      return inputs->FindObject(par);
   }

   Warning("GetParameter", "player undefined! Ignoring");
   return (TObject *)0;
}
void TProof::DeleteParameters(const char *wildcard)
{
if (!fPlayer) return;
if (!wildcard) wildcard = "";
TRegexp re(wildcard, kTRUE);
Int_t nch = strlen(wildcard);
TList *il = fPlayer->GetInputList();
TObject *p;
TIter next(il);
while ((p = next())) {
TString s = p->GetName();
if (nch && s != wildcard && s.Index(re) == kNPOS) continue;
il->Remove(p);
delete p;
}
}
// Print the objects of the player's input list whose name equals 'wildcard'
// or matches it as a wildcard expression (all of them when 'wildcard' is
// null or empty). For TNamed the title is shown, for TParameter<Int_t>,
// <Long_t>, <Long64_t> and <Double_t> the value, for anything else the title.
void TProof::ShowParameters(const char *wildcard) const
{
   if (!fPlayer) return;

   if (!wildcard) wildcard = "";
   TRegexp re(wildcard, kTRUE);
   Int_t nch = strlen(wildcard);

   TList *il = fPlayer->GetInputList();
   TObject *p;
   TIter next(il);
   while ((p = next())) {
      TString s = p->GetName();
      // Skip what is not selected
      if (nch && s != wildcard && s.Index(re) == kNPOS) continue;
      if (p->IsA() == TNamed::Class()) {
         Printf("%s\t\t\t%s", s.Data(), p->GetTitle());
      } else if (p->IsA() == TParameter<Int_t>::Class()) {
         // Created by SetParameter(const char *, Int_t): show the value
         // (it used to fall in the generic case, which prints the title)
         Printf("%s\t\t\t%d", s.Data(), dynamic_cast<TParameter<Int_t>*>(p)->GetVal());
      } else if (p->IsA() == TParameter<Long_t>::Class()) {
         Printf("%s\t\t\t%ld", s.Data(), dynamic_cast<TParameter<Long_t>*>(p)->GetVal());
      } else if (p->IsA() == TParameter<Long64_t>::Class()) {
         Printf("%s\t\t\t%lld", s.Data(), dynamic_cast<TParameter<Long64_t>*>(p)->GetVal());
      } else if (p->IsA() == TParameter<Double_t>::Class()) {
         Printf("%s\t\t\t%f", s.Data(), dynamic_cast<TParameter<Double_t>*>(p)->GetVal());
      } else {
         Printf("%s\t\t\t%s", s.Data(), p->GetTitle());
      }
   }
}
void TProof::AddFeedback(const char *name)
{
PDB(kFeedback, 3)
Info("AddFeedback", "Adding object \"%s\" to feedback", name);
if (fFeedback->FindObject(name) == 0)
fFeedback->Add(new TObjString(name));
}
void TProof::RemoveFeedback(const char *name)
{
TObject *obj = fFeedback->FindObject(name);
if (obj != 0) {
fFeedback->Remove(obj);
delete obj;
}
}
void TProof::ClearFeedback()
{
fFeedback->Delete();
}
void TProof::ShowFeedback() const
{
   // Print the content of the feedback list, or an informational message
   // when the list is empty.

   if (fFeedback->GetSize() != 0) {
      fFeedback->Print();
   } else {
      Info("","no feedback requested");
   }
}
TList *TProof::GetFeedbackList() const
{
   // Give access to the feedback list (the pointer held by this object).

   return fFeedback;
}
TTree *TProof::GetTreeHeader(TDSet *dset)
{
   // Send a kPROOF_GETTREEHEADER request for 'dset' through the socket of
   // the first active worker and read the answer.
   // The answer starts with a string; when that string is "Success" a TTree
   // is read from the message and returned. Returns 0 when there is no
   // active worker or no reply; in the other failure cases the returned
   // pointer is 0 as well because no tree is read.

   TList *l = GetListOfActiveSlaves();
   TSlave *sl = (TSlave*) l->First();
   if (sl == 0) {
      Error("GetTreeHeader", "No connection");
      return 0;
   }

   TSocket *soc = sl->GetSocket();
   TMessage msg(kPROOF_GETTREEHEADER);
   msg << dset;
   soc->Send(msg);

   // With protocol >= 20 the reply is picked up from fRecvMessages after
   // Collect(): the list keeps referencing the message, so it must not be
   // deleted here. With older protocols the message comes straight from
   // the socket and this function is its only holder.
   const Bool_t ownReply = (fProtocol < 20) ? kTRUE : kFALSE;

   TMessage *reply = 0;
   Int_t d = -1;
   if (ownReply) {
      d = soc->Recv(reply);
   } else {
      Collect(sl, fCollectTimeout, kPROOF_GETTREEHEADER);
      reply = (TMessage *) fRecvMessages->First();
   }
   if (!reply) {
      Error("GetTreeHeader", "Error getting a replay from the master.Result %d", (int) d);
      return 0;
   }

   TString s1;
   TTree *t = 0;
   (*reply) >> s1;
   if (s1 == "Success")
      (*reply) >> t;

   PDB(kGlobal, 1) {
      if (t) {
         Info("GetTreeHeader", "%s, message size: %d, entries: %d",
                               s1.Data(), reply->BufferSize(), (int) t->GetMaxEntryLoop());
      } else {
         Info("GetTreeHeader", "tree header retrieval failed");
      }
   }

   // Release the message only when it is not referenced by fRecvMessages
   if (ownReply)
      delete reply;

   return t;
}
TDrawFeedback *TProof::CreateDrawFeedback()
{
   // Forward the request to the player, passing this session.
   // Returns 0 when no player is defined.

   if (!fPlayer)
      return (TDrawFeedback *)0;
   return fPlayer->CreateDrawFeedback(this);
}
void TProof::SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt)
{
   // Forward the draw-feedback option 'opt' for 'f' to the player.
   // No-op when no player is defined.

   if (!fPlayer) return;
   fPlayer->SetDrawFeedbackOption(f, opt);
}
void TProof::DeleteDrawFeedback(TDrawFeedback *f)
{
   // Forward the deletion request for 'f' to the player.
   // No-op when no player is defined.

   if (!fPlayer) return;
   fPlayer->DeleteDrawFeedback(f);
}
TList *TProof::GetOutputNames()
{
   // Placeholder: always returns a null list.

   return 0;
}
void TProof::Browse(TBrowser *b)
{
   // Register the session's browsable members with the browser 'b': the
   // active workers, the master URL, the feedback list and the chains,
   // plus - when a player is defined - its input list and, if available,
   // its output list and list of results.

   b->Add(fActiveSlaves, fActiveSlaves->Class(), "fActiveSlaves");
   b->Add(&fMaster, fMaster.Class(), "fMaster");
   b->Add(fFeedback, fFeedback->Class(), "fFeedback");
   b->Add(fChains, fChains->Class(), "fChains");

   if (!fPlayer) return;

   b->Add(fPlayer->GetInputList(), fPlayer->GetInputList()->Class(), "InputList");

   if (fPlayer->GetOutputList()) {
      b->Add(fPlayer->GetOutputList(), fPlayer->GetOutputList()->Class(), "OutputList");
   }

   if (fPlayer->GetListOfResults()) {
      b->Add(fPlayer->GetListOfResults(),
             fPlayer->GetListOfResults()->Class(), "ListOfResults");
   }
}
void TProof::SetPlayer(TVirtualProofPlayer *player)
{
   // Install 'player' as the current player; the previous one, if any,
   // is deleted first.

   delete fPlayer;   // deleting a null pointer is a no-op
   fPlayer = player;
}
TVirtualProofPlayer *TProof::MakePlayer(const char *player, TSocket *s)
{
   // Create a player of the requested kind ("remote" when 'player' is null)
   // via TVirtualProofPlayer::Create, install it with SetPlayer and return
   // what GetPlayer() reports afterwards.

   const char *kind = player ? player : "remote";
   SetPlayer(TVirtualProofPlayer::Create(kind, this, s));
   return GetPlayer();
}
void TProof::AddChain(TChain *chain)
{
   // Append 'chain' to the list of chains kept by this session.

   fChains->Add(chain);
}
void TProof::RemoveChain(TChain *chain)
{
   // Take 'chain' out of the list of chains kept by this session.

   fChains->Remove(chain);
}
void TProof::GetLog(Int_t start, Int_t end)
{
   // Broadcast a kPROOF_LOGFILE request carrying 'start' and 'end' to the
   // active workers and collect the answers.
   // Nothing is done for an invalid session or when running as master.

   if (IsValid() && !TestBit(TProof::kIsMaster)) {
      TMessage msg(kPROOF_LOGFILE);
      msg << start << end;

      Broadcast(msg, kActive);
      Collect(kActive, fCollectTimeout);
   }
}
void TProof::PutLog(TQueryResult *pq)
{
   // Emit a "LogMessage(const char*,Bool_t)" signal for each line of the
   // log stored in the query result 'pq'. Nothing is done when 'pq' is
   // null or its log has no list of lines.

   if (!pq) return;

   TList *lines = pq->GetLogFile()->GetListOfLines();
   if (!lines) return;

   TIter nextLine(lines);
   for (TObjString *line = (TObjString *)nextLine(); line;
        line = (TObjString *)nextLine()) {
      EmitVA("LogMessage(const char*,Bool_t)", 2, line->GetName(), kFALSE);
   }
}
void TProof::ShowLog(const char *queryref)
{
Retrieve(queryref);
if (fPlayer) {
if (queryref) {
if (fPlayer->GetListOfResults()) {
TIter nxq(fPlayer->GetListOfResults());
TQueryResult *qr = 0;
while ((qr = (TQueryResult *) nxq()))
if (strstr(queryref, qr->GetTitle()) &&
strstr(queryref, qr->GetName()))
break;
if (qr) {
PutLog(qr);
return;
}
}
}
}
}
void TProof::ShowLog(Int_t qry)
{
   // Display session or query logs.
   //   qry ==  0 : the log file read through fLogFileR, from its beginning
   //   qry == -1 : the same file, from the current read offset
   //   qry == -2 : the log of the last query known locally or, failing
   //               that, of the last entry in the list of queries
   //   qry  >  0 : the log of the query with sequential number 'qry'
   // Query logs are shown via PutLog. If the requested query is not found
   // the call falls back to the qry == -1 behaviour.
   // When the log is not sent to a window, lines are written to stdout and
   // the user is asked every 10 lines whether to continue.

   // Remember the present offset and find out where the file ends
   Int_t nowlog = lseek(fileno(fLogFileR), (off_t) 0, SEEK_CUR);
   Int_t startlog = nowlog;
   Int_t endlog = lseek(fileno(fLogFileR), (off_t) 0, SEEK_END);
   lseek(fileno(fLogFileR), (off_t) nowlog, SEEK_SET);

   if (qry == 0) {
      startlog = 0;
      lseek(fileno(fLogFileR), (off_t) 0, SEEK_SET);
   } else if (qry != -1) {

      TQueryResult *pq = 0;
      if (qry == -2) {
         // Last query: local results first, then the list of queries
         pq = (GetQueryResults()) ? ((TQueryResult *)(GetQueryResults()->Last())) : 0;
         if (!pq) {
            GetListOfQueries();
            if (fQueries)
               pq = (TQueryResult *)(fQueries->Last());
         }
      } else if (qry > 0) {
         // Search by sequential number: local results first ...
         TList *queries = GetQueryResults();
         if (queries) {
            TIter nxq(queries);
            while ((pq = (TQueryResult *)nxq()))
               if (qry == pq->GetSeqNum())
                  break;
         }
         // ... then the list of queries
         if (!pq) {
            queries = GetListOfQueries();
            TIter nxq(queries);
            while ((pq = (TQueryResult *)nxq()))
               if (qry == pq->GetSeqNum())
                  break;
         }
      }
      if (pq) {
         PutLog(pq);
         return;
      } else {
         if (gDebug > 0)
            Info("ShowLog","query %d not found in list", qry);
         qry = -1;
      }
   }

   // Nothing to show (also covers failed lseek calls)
   if (endlog <= startlog)
      return;

   // Number of bytes to log
   UInt_t tolog = (UInt_t)(endlog - startlog);

   // Set the starting point
   lseek(fileno(fLogFileR), (off_t) startlog, SEEK_SET);

   Int_t np = 0;
   char line[2048];
   // fgets stores at most (size - 1) characters: ask for one more than the
   // bytes left so that the last byte of the range can be read as well
   Int_t wanted = (tolog >= sizeof(line)) ? sizeof(line) : tolog + 1;
   while (fgets(line, wanted, fLogFileR)) {

      Int_t r = strlen(line);
      if (!SendingLogToWindow()) {
         if (r > 0) {
            // Make sure that what is written ends with a newline
            if (line[r-1] != '\n') line[r-1] = '\n';
            char *p = line;
            while (r) {
               Int_t w = write(fileno(stdout), p, r);
               if (w < 0) {
                  SysError("ShowLogFile", "error writing to stdout");
                  break;
               }
               r -= w;
               p += w;
            }
         }
         tolog -= strlen(line);
         np++;

         // Ask if more is wanted
         if (!(np%10)) {
            char *opt = Getline("More (y/n)? [y]");
            if (opt[0] == 'n')
               break;
         }

         // We may be over
         if (tolog == 0)
            break;

         // Update the wanted bytes
         wanted = (tolog >= sizeof(line)) ? sizeof(line) : tolog + 1;
      } else {
         // Log to window: the trailing newline is dropped
         if (r > 0 && line[r-1] == '\n') line[r-1] = 0;
         LogMessage(line, kFALSE);
      }
   }
   if (!SendingLogToWindow()) {
      if (write(fileno(stdout), "\n", 1) != 1)
         SysError("ShowLogFile", "error writing to stdout");
   }

   // Restore the original read offset
   if (qry > -1)
      lseek(fileno(fLogFileR), (off_t) nowlog, SEEK_SET);
}
void TProof::cd(Int_t id)
{
   // Make the session described by 'id' in the manager the global gProof.
   // If the manager has no description for 'id', or the description carries
   // no session, gProof is set to this object. Without a manager gProof is
   // left untouched.

   if (!GetManager()) return;

   TProofDesc *d = GetManager()->GetProofDesc(id);
   if (d && d->GetProof()) {
      gProof = d->GetProof();
      return;
   }

   gProof = this;
}
void TProof::Detach(Option_t *opt)
{
   // Detach this instance from its session.
   // If 'opt' contains 's' or 'S' and the session is not idle, the query
   // queue is cleaned up and the running process stopped before closing.
   // The instance is then removed from the manager's local session list.
   // At the end the object deletes itself, unless a progress dialog was
   // started or the session GUI is in use: in that case it is only
   // flagged as invalid.

   if (!IsValid()) return;

   // The first active worker and its socket must be usable
   TSlave *wrk = (TSlave *) fActiveSlaves->First();
   TSocket *sock = 0;
   if (!wrk || !(wrk->IsValid()) || !(sock = wrk->GetSocket())) {
      Error("Detach","corrupted worker instance: wrk:%p, sock:%p", wrk, sock);
      return;
   }

   const Bool_t shutdown = (strchr(opt,'s') != 0 || strchr(opt,'S') != 0);

   if (shutdown && !IsIdle()) {
      // Drop the queued queries
      Remove("cleanupqueue");
      // Stop what is running; the timeout is at least 20
      Long_t timeout = gEnv->GetValue("Proof.ShutdownTimeout", 60);
      if (timeout < 20)
         timeout = 20;
      StopProcess(kFALSE, (Long_t) (timeout / 2));
      Collect(kActive, timeout);
   }

   DeActivateAsyncInput();

   sl_flush:
   wrk->FlushSocket();

   Close(opt);

   if (fProgressDialogStarted)
      CloseProgressDialog();

   // Unregister from the manager's local list of sessions
   if (GetManager() && GetManager()->QuerySessions("L")) {
      TIter nxd(GetManager()->QuerySessions("L"));
      TProofDesc *d = 0;
      while ((d = (TProofDesc *)nxd())) {
         if (d->GetProof() != this)
            continue;
         d->SetProof(0);
         GetManager()->QuerySessions("L")->Remove(d);
         break;
      }
   }

   if (fProgressDialogStarted || TestBit(kUsingSessionGui))
      fValid = kFALSE;
   else
      delete this;

   return;
}
void TProof::SetAlias(const char *alias)
{
TNamed::SetTitle(alias);
if (TestBit(TProof::kIsMaster))
TNamed::SetName(alias);
if (!IsValid()) return;
if (!IsProofd() && TestBit(TProof::kIsClient)) {
TSlave *sl = (TSlave *) fActiveSlaves->First();
if (sl)
sl->SetAlias(alias);
}
return;
}
Int_t TProof::UploadDataSet(const char *dataSetName,
TList *files,
const char *desiredDest,
Int_t opt,
TList *skippedFiles)
{
// Copy the files listed in 'files' (TFileInfo objects) to the data pool
// and register the resulting list as dataset 'dataSetName'.
// 'desiredDest' is an optional sub-directory appended to the user name to
// build the destination; 'opt' is a bit mask of the kAppend, kOverwrite*,
// kNoOverwrite* and kAskUser flags; 'skippedFiles', when given, receives a
// TFileInfo for each file that was not copied because it already existed.
// Returns the number of files in the registered list, kDataSetExists when
// the dataset exists and kNoOverwriteDataSet was given, kError on invalid
// arguments or registration failure, -1 for unsupported protocol or Lite
// sessions, and 0 if the user declined to overwrite or nothing was copied.
// Without explicit overwrite options the user is asked on standard input.
if (fProtocol < 15) {
Info("UploadDataSet", "functionality not available: the server has an"
" incompatible version of TFileInfo");
return -1;
}
if (IsLite()) {
Info("UploadDataSet", "Lite-session: functionality not needed - do nothing");
return -1;
}
// A name containing '/' is accepted only if it starts with "public"
if (strchr(dataSetName, '/')) {
if (strstr(dataSetName, "public") != dataSetName) {
Error("UploadDataSet",
"Name of public dataset should start with public/");
return kError;
}
}
// Reject mutually exclusive option combinations
if ((opt & kOverwriteAllFiles && opt & kOverwriteNoFiles) ||
(opt & kNoOverwriteDataSet && opt & kAppend) ||
(opt & kOverwriteDataSet && opt & kAppend) ||
(opt & kNoOverwriteDataSet && opt & kOverwriteDataSet) ||
(opt & kAskUser && opt & (kOverwriteDataSet | kNoOverwriteDataSet | kAppend |
kOverwriteAllFiles | kOverwriteNoFiles))) {
Error("UploadDataSet", "you specified contradicting options.");
return kError;
}
// Decode the options. goodName: 1 = go ahead, 0 = user said no,
// -1 = the name still has to be checked on the master
Int_t overwriteAll = (opt & kOverwriteAllFiles) ? kTRUE : kFALSE;
Int_t overwriteNone = (opt & kOverwriteNoFiles) ? kTRUE : kFALSE;
Int_t goodName = (opt & (kOverwriteDataSet | kAppend)) ? 1 : -1;
Int_t appendToDataSet = (opt & kAppend) ? kTRUE : kFALSE;
Int_t overwriteNoDataSet = (opt & kNoOverwriteDataSet) ? kTRUE : kFALSE;
if (!skippedFiles && overwriteNone) {
Error("UploadDataSet",
"Provide pointer to TList object as skippedFiles argument when using kOverwriteNoFiles option.");
return kError;
}
if (skippedFiles) {
// NOTE(review): Class() is called through a TList pointer; if it is the
// usual static Class() this compares TList::Class() with itself and can
// never fail - IsA() may have been intended. Confirm.
if (skippedFiles->Class() != TList::Class()) {
Error("UploadDataSet",
"Provided skippedFiles argument does not point to a TList object.");
return kError;
}
}
// At least one active worker is required; 'master' itself is not used
// further down in this function
TSocket *master;
if (fActiveSlaves->GetSize())
master = ((TSlave*)(fActiveSlaves->First()))->GetSocket();
else {
Error("UploadDataSet", "No connection to the master!");
return kError;
}
Int_t fileCount = 0;
if (goodName == -1) {
// Ask the master to check the dataset name
TMessage nameMess(kPROOF_DATASETS);
nameMess << Int_t(kCheckDataSetName);
nameMess << TString(dataSetName);
Broadcast(nameMess);
Collect(kActive, fCollectTimeout);
// A status of -1 after the check is treated as "dataset already exists"
if (fStatus == -1) {
// Ask the user until a recognised answer (y/n/a) is given.
// NOTE(review): there is no handling of end-of-input on cin here.
while (goodName == -1 && !overwriteNoDataSet) {
Info("UploadDataSet", "dataset %s already exist. ",
dataSetName);
Info("UploadDataSet", "do you want to overwrite it[Yes/No/Append]?");
TString answer;
answer.ReadToken(cin);
if (!strncasecmp(answer.Data(), "y", 1)) {
goodName = 1;
} else if (!strncasecmp(answer.Data(), "n", 1)) {
goodName = 0;
} else if (!strncasecmp(answer.Data(), "a", 1)) {
goodName = 1;
appendToDataSet = kTRUE;
}
}
} else {
goodName = 1;
}
}
if (goodName == 1) {
// Destination: <data pool url>/<user>/<desiredDest>/, with repeated
// slashes collapsed in the relative part
char *relativeDestDir = Form("%s/%s/",
gSystem->GetUserInfo()->fUser.Data(),
desiredDest?desiredDest:"");
relativeDestDir = CollapseSlashesInPath(relativeDestDir);
TString dest = Form("%s/%s", GetDataPoolUrl(), relativeDestDir);
// The collapsed path is released with delete[] once copied into 'dest'
delete[] relativeDestDir;
TFileCollection *fileList = new TFileCollection();
TIter next(files);
while (TFileInfo *fileInfo = ((TFileInfo*)next())) {
TUrl *fileUrl = fileInfo->GetFirstUrl();
// Only files for which AccessPathName returns kFALSE are considered
if (gSystem->AccessPathName(fileUrl->GetUrl()) == kFALSE) {
const char *ent = gSystem->BaseName(fileUrl->GetFile());
// goodFileName: 1 = copy, 0 = user said no, -1 = undecided
Int_t goodFileName = 1;
if (!overwriteAll &&
gSystem->AccessPathName(Form("%s/%s", dest.Data(), ent), kFileExists)
== kFALSE) {
goodFileName = -1;
// Destination exists: ask until an answer settles this file or
// switches on one of the global all/none policies
while (goodFileName == -1 && !overwriteAll && !overwriteNone) {
Info("UploadDataSet", "file %s already exists. ", Form("%s/%s", dest.Data(), ent));
Info("UploadDataSet", "do you want to overwrite it [Yes/No/all/none]?");
TString answer;
answer.ReadToken(cin);
if (!strncasecmp(answer.Data(), "y", 1))
goodFileName = 1;
else if (!strncasecmp(answer.Data(), "all", 3))
overwriteAll = kTRUE;
else if (!strncasecmp(answer.Data(), "none", 4))
overwriteNone = kTRUE;
// "none" is tested before "n" so that it is not taken as a plain "no"
else if (!strncasecmp(answer.Data(), "n", 1))
goodFileName = 0;
}
}
if (goodFileName == 1 || overwriteAll) {
Info("UploadDataSet", "Uploading %s to %s/%s",
fileUrl->GetUrl(), dest.Data(), ent);
// The file enters the dataset only if the copy succeeds
if (TFile::Cp(fileUrl->GetUrl(), Form("%s/%s", dest.Data(), ent))) {
fileList->GetList()->Add(new TFileInfo(Form("%s/%s", dest.Data(), ent)));
} else
Error("UploadDataSet", "file %s was not copied", fileUrl->GetUrl());
} else {
// Not copied: the existing destination file is still listed in the
// dataset and the source is reported through 'skippedFiles'
fileList->GetList()->Add(new TFileInfo(Form("%s/%s", dest.Data(), ent)));
if (skippedFiles) {
skippedFiles->Add(new TFileInfo(fileUrl->GetUrl()));
}
}
}
}
if ((fileCount = fileList->GetList()->GetSize()) == 0) {
Info("UploadDataSet", "no files were copied. The dataset will not be saved");
} else {
// Option "O" is passed to RegisterDataSet unless appending
TString o = (appendToDataSet) ? "" : "O";
if (!RegisterDataSet(dataSetName, fileList, o)) {
Error("UploadDataSet", "Error while saving dataset: %s", dataSetName);
fileCount = kError;
}
}
delete fileList;
} else if (overwriteNoDataSet) {
Info("UploadDataSet", "dataset %s already exists", dataSetName);
return kDataSetExists;
}
return fileCount;
}
Int_t TProof::UploadDataSet(const char *dataSetName,
                            const char *files,
                            const char *desiredDest,
                            Int_t opt,
                            TList *skippedFiles)
{
   // Upload the local files selected by the path expression 'files' as
   // dataset 'dataSetName'. The directory part of 'files' is scanned and
   // the entries whose names match the base-name part (every '*' is turned
   // into the regular expression ".*") and that are readable are passed,
   // as "file://" URLs, to the TList-based UploadDataSet together with
   // 'desiredDest', 'opt' and 'skippedFiles'.
   // Returns -1 if the protocol is too old, 0 if no file was selected,
   // otherwise whatever the TList-based UploadDataSet returns.

   if (fProtocol < 15) {
      Info("UploadDataSet", "functionality not available: the server has an"
                            " incompatible version of TFileInfo");
      return -1;
   }

   TList fileList;
   fileList.SetOwner();

   // Keep private copies of the directory and of the file-name expression,
   // so that they do not depend on the storage behind the returned pointers
   const TString dirName(gSystem->DirName(files));
   TString filesExp(gSystem->BaseName(files));
   filesExp.ReplaceAll("*",".*");
   TRegexp rg(filesExp);

   void *dataSetDir = gSystem->OpenDirectory(dirName.Data());
   if (dataSetDir) {
      const char *ent = 0;
      while ((ent = gSystem->GetDirEntry(dataSetDir))) {
         TString entryString(ent);
         if (entryString.Index(rg) != kNPOS) {
            // Matching entry: keep it only if it is readable
            TString u(Form("file://%s/%s", dirName.Data(), ent));
            if (gSystem->AccessPathName(u, kReadPermission) == kFALSE)
               fileList.Add(new TFileInfo(u));
         }
      }
      // Release the directory handle obtained from OpenDirectory
      gSystem->FreeDirectory(dataSetDir);
   }

   Int_t fileCount;
   if ((fileCount = fileList.GetSize()) == 0)
      Printf("No files match your selection. The dataset will not be saved");
   else
      fileCount = UploadDataSet(dataSetName, &fileList, desiredDest,
                                opt, skippedFiles);
   return fileCount;
}
Int_t TProof::UploadDataSetFromFile(const char *dataset, const char *file,
                                    const char *dest, Int_t opt,
                                    TList *skippedFiles)
{
   // Upload as dataset 'dataset' the files whose paths are listed, one per
   // line, in the text file 'file' (shell-like constructs in the path of
   // 'file' are expanded). Lines that are not readable paths are ignored.
   // The selected files are passed to the TList-based UploadDataSet
   // together with 'dest', 'opt' and 'skippedFiles'.
   // Returns -1 if the protocol is too old or the list file cannot be
   // opened, 0 if no listed file is readable, otherwise the result of
   // UploadDataSet.

   if (fProtocol < 15) {
      Info("UploadDataSetFromFile", "functionality not available: the server has an"
                                    " incompatible version of TFileInfo");
      return -1;
   }

   Int_t fileCount = -1;
   TList fileList;
   fileList.SetOwner();

   // Expand in place: no separately allocated string has to be released
   TString listPath(file);
   gSystem->ExpandPathName(listPath);

   // The list is only read: open it for input, so that a read-only file
   // can be used as well
   ifstream f;
   f.open(listPath.Data(), ifstream::in);
   if (f.is_open()) {
      while (f.good()) {
         // ReadToDelim reads up to the default delimiter (newline);
         // the original follow-up Strip() call discarded its result
         TString line;
         line.ReadToDelim(f);
         if (gSystem->AccessPathName(line, kReadPermission) == kFALSE)
            fileList.Add(new TFileInfo(line));
      }
      f.close();
      if ((fileCount = fileList.GetSize()) == 0)
         Info("UploadDataSetFromFile",
              "no files match your selection. The dataset will not be saved");
      else
         fileCount = UploadDataSet(dataset, &fileList, dest,
                                   opt, skippedFiles);
   } else {
      Error("UploadDataSetFromFile", "unable to open the specified file");
   }
   return fileCount;
}
Bool_t TProof::RegisterDataSet(const char *dataSetName,
                               TFileCollection *dataSet, const char* optStr)
{
   // Send the file collection 'dataSet' to be registered under the name
   // 'dataSetName', with options 'optStr'.
   // Returns kTRUE if the status after collecting the answer is 0, kFALSE
   // otherwise or if the request cannot be sent (old protocol, missing
   // name, no active worker).

   if (fProtocol < 17) {
      Info("RegisterDataSet",
           "functionality not available: the server does not have dataset support");
      return kFALSE;
   }

   if (!dataSetName || strlen(dataSetName) <= 0) {
      Info("RegisterDataSet", "specifying a dataset name is mandatory");
      return kFALSE;
   }

   if (!fActiveSlaves->GetSize()) {
      Error("RegisterDataSet", "No connection to the master!");
      return kFALSE;
   }
   TSocket *master = ((TSlave*)(fActiveSlaves->First()))->GetSocket();

   // Request: action code, name, options and the collection itself
   TMessage mess(kPROOF_DATASETS);
   mess << Int_t(kRegisterDataSet);
   mess << TString(dataSetName);
   mess << TString(optStr);
   mess.WriteObject(dataSet);
   Broadcast(mess);

   Collect();
   if (fStatus != 0) {
      Error("RegisterDataSet", "dataset was not saved");
      return kFALSE;
   }
   return kTRUE;
}
Int_t TProof::SetDataSetTreeName(const char *dataset, const char *treename)
{
   // Request that 'treename' becomes the default tree name of 'dataset'.
   // The tree name, prefixed with "/" if needed, is sent as the fragment
   // of the dataset URI.
   // Returns 0 on success, -1 if the server protocol is older than 23, an
   // argument is missing or the status after the request is not 0.

   if (fProtocol < 23) {
      Info("SetDataSetTreeName", "functionality not supported by the server");
      return -1;
   }

   if (!dataset || strlen(dataset) <= 0) {
      Info("SetDataSetTreeName", "specifying a dataset name is mandatory");
      return -1;
   }

   if (!treename || strlen(treename) <= 0) {
      Info("SetDataSetTreeName", "specifying a tree name is mandatory");
      return -1;
   }

   // Build the URI with the tree name as fragment
   TString fragment(treename);
   if (!fragment.BeginsWith("/"))
      fragment.Insert(0, "/");
   TUri uri(dataset);
   uri.SetFragment(fragment);

   TMessage mess(kPROOF_DATASETS);
   mess << Int_t(kSetDefaultTreeName);
   mess << uri.GetUri();
   Broadcast(mess);

   Collect();
   if (fStatus == 0)
      return 0;

   Error("SetDataSetTreeName", "some error occured: default tree name not changed");
   return -1;
}
TMap *TProof::GetDataSets(const char *uri, const char* optStr)
{
   // Send a kGetDataSets request with 'uri' and 'optStr' (null pointers are
   // sent as empty strings) and return the TMap read from the first
   // received message.
   // Returns 0 if the protocol is older than 15, there is no active
   // worker, the status after the request is not 0, or the reply is
   // missing, of the wrong type or does not contain a TMap.

   if (fProtocol < 15) {
      Info("GetDataSets",
           "functionality not available: the server does not have dataset support");
      return 0;
   }

   if (!fActiveSlaves->GetSize()) {
      Error("GetDataSets", "no connection to the master!");
      return 0;
   }
   TSocket *master = ((TSlave*)(fActiveSlaves->First()))->GetSocket();

   TMessage mess(kPROOF_DATASETS);
   mess << Int_t(kGetDataSets);
   mess << TString(uri?uri:"");
   mess << TString(optStr?optStr:"");
   Broadcast(mess);
   Collect(kActive, fCollectTimeout);

   if (fStatus != 0) {
      Error("GetDataSets", "error receiving datasets information");
      return 0;
   }

   TMessage *retMess = (TMessage *) fRecvMessages->First();
   if (!retMess || retMess->What() != kMESS_OK) {
      Error("GetDataSets", "message not found or wrong type (%p)", retMess);
      return 0;
   }

   TMap *dataSetMap = (TMap *)(retMess->ReadObject(TMap::Class()));
   if (!dataSetMap)
      Error("GetDataSets", "error receiving datasets");

   return dataSetMap;
}
void TProof::ShowDataSets(const char *uri, const char* optStr)
{
   // Send a kShowDataSets request with 'uri' and 'optStr' (null pointers
   // are sent as empty strings) and collect the answer. An error is
   // reported if the status afterwards is not 0.
   // Requires protocol >= 15 and at least one active worker.

   if (fProtocol < 15) {
      Info("ShowDataSets",
           "functionality not available: the server does not have dataset support");
      return;
   }

   if (!fActiveSlaves->GetSize()) {
      Error("ShowDataSets",
            "no connection to the master!");
      return;
   }
   TSocket *master = ((TSlave*)(fActiveSlaves->First()))->GetSocket();

   TMessage mess(kPROOF_DATASETS);
   mess << Int_t(kShowDataSets);
   mess << TString(uri ? uri : "");
   mess << TString(optStr ? optStr : "");
   Broadcast(mess);

   Collect(kActive, fCollectTimeout);
   if (fStatus != 0) {
      Error("ShowDataSets", "error receiving datasets information");
   }
}
Bool_t TProof::ExistsDataSet(const char *dataset)
{
   // Send a kCheckDataSetName request for 'dataset'.
   // Returns kTRUE when the status after the request is -1, kFALSE in any
   // other case, including an old protocol (< 15) or a missing name.

   if (fProtocol < 15) {
      Info("ExistsDataSet", "functionality not available: the server has an"
                            " incompatible version of TFileInfo");
      return kFALSE;
   }

   if (!dataset || strlen(dataset) <= 0) {
      Error("ExistsDataSet", "dataset name missing");
      return kFALSE;
   }

   TMessage msg(kPROOF_DATASETS);
   msg << Int_t(kCheckDataSetName) << TString(dataset);
   Broadcast(msg);
   Collect(kActive, fCollectTimeout);

   return (fStatus == -1) ? kTRUE : kFALSE;
}
TFileCollection *TProof::GetDataSet(const char *uri, const char* optStr)
{
   // Send a kGetDataSet request for 'uri' with options 'optStr' and return
   // the TFileCollection read from the first received message.
   // Returns 0 if the protocol is older than 15, 'uri' is null or empty,
   // there is no active worker, the status after the request is not 0, or
   // the reply is missing, of the wrong type or has no file collection.

   if (fProtocol < 15) {
      Info("GetDataSet", "functionality not available: the server has an"
                         " incompatible version of TFileInfo");
      return 0;
   }

   if (!uri || strlen(uri) <= 0) {
      Info("GetDataSet", "specifying a dataset name is mandatory");
      return 0;
   }

   if (!fActiveSlaves->GetSize()) {
      Error("GetDataSet", "no connection to the master!");
      return 0;
   }
   TSocket *master = ((TSlave*)(fActiveSlaves->First()))->GetSocket();

   TMessage nameMess(kPROOF_DATASETS);
   nameMess << Int_t(kGetDataSet);
   nameMess << TString(uri?uri:"");
   nameMess << TString(optStr?optStr:"");
   // A failed broadcast is reported, but the answer is collected anyway
   if (Broadcast(nameMess) < 0)
      Error("GetDataSet", "sending request failed");
   Collect(kActive, fCollectTimeout);

   if (fStatus != 0) {
      Error("GetDataSet", "error receiving datasets information");
      return 0;
   }

   TMessage *retMess = (TMessage *) fRecvMessages->First();
   if (!retMess || retMess->What() != kMESS_OK) {
      Error("GetDataSet", "message not found or wrong type (%p)", retMess);
      return 0;
   }

   TFileCollection *fileList =
      (TFileCollection*)(retMess->ReadObject(TFileCollection::Class()));
   if (!fileList)
      Error("GetDataSet", "error reading list of files");

   return fileList;
}
void TProof::ShowDataSet(const char *uri, const char* opt)
{
   // Fetch the dataset 'uri' with GetDataSet, print it with option 'opt'
   // and delete the fetched collection. A warning is issued when the
   // dataset cannot be obtained.

   TFileCollection *fileList = GetDataSet(uri);
   if (!fileList) {
      Warning("ShowDataSet","no such dataset: %s", uri);
      return;
   }

   fileList->Print(opt);
   delete fileList;
}
Int_t TProof::RemoveDataSet(const char *uri, const char* optStr)
{
   // Send a kRemoveDataSet request for 'uri' with options 'optStr' (null
   // pointers are sent as empty strings).
   // Returns 0 if the status after the request is 0, -1 otherwise, and
   // kError when there is no active worker.

   if (!fActiveSlaves->GetSize()) {
      Error("RemoveDataSet", "no connection to the master!");
      return kError;
   }
   TSocket *master = ((TSlave*)(fActiveSlaves->First()))->GetSocket();

   TMessage nameMess(kPROOF_DATASETS);
   nameMess << Int_t(kRemoveDataSet);
   nameMess << TString(uri?uri:"");
   nameMess << TString(optStr?optStr:"");
   // A failed broadcast is reported, but the answer is collected anyway
   if (Broadcast(nameMess) < 0)
      Error("RemoveDataSet", "sending request failed");
   Collect(kActive, fCollectTimeout);

   return (fStatus != 0) ? -1 : 0;
}
TList* TProof::FindDataSets(const char* /*searchString*/, const char* /*optStr*/)
{
   // Not implemented: reports an error and returns a null list.
   // Both arguments are ignored.

   Error ("FindDataSets", "not yet implemented");
   return (TList *) 0;
}
Int_t TProof::VerifyDataSet(const char *uri, const char* optStr)
{
   // Send a kVerifyDataSet request for 'uri' with options 'optStr' (null
   // pointers are sent as empty strings).
   // Returns the status found after the request when it is not negative
   // (the original code stores it in a variable named nMissingFiles),
   // -1 when the status is negative, and kError if the protocol is older
   // than 15 or there is no active worker.

   if (fProtocol < 15) {
      Info("VerifyDataSet", "functionality not available: the server has an"
                            " incompatible version of TFileInfo");
      return kError;
   }

   if (!fActiveSlaves->GetSize()) {
      Error("VerifyDataSet", "no connection to the master!");
      return kError;
   }
   TSocket *master = ((TSlave*)(fActiveSlaves->First()))->GetSocket();

   TMessage nameMess(kPROOF_DATASETS);
   nameMess << Int_t(kVerifyDataSet);
   nameMess << TString(uri ? uri : "");
   nameMess << TString(optStr ? optStr : "");
   Broadcast(nameMess);
   Collect(kActive, fCollectTimeout);

   if (fStatus < 0) {
      Info("VerifyDataSet", "no such dataset %s", uri);
      return -1;
   }

   const Int_t nMissingFiles = fStatus;
   return nMissingFiles;
}
TMap *TProof::GetDataSetQuota(const char* optStr)
{
   // Send a kGetQuota request with options 'optStr' (a null pointer is
   // sent as an empty string) and return the TMap read from the first
   // received message.
   // Returns 0 for Lite sessions, when there is no active worker, when
   // the status after the request is negative, or when the reply is
   // missing, of the wrong type or does not contain a TMap.

   if (IsLite()) {
      // Report under this method's name (it used to say "UploadDataSet")
      Info("GetDataSetQuota", "Lite-session: functionality not implemented");
      return (TMap *)0;
   }

   TSocket *master = 0;
   if (fActiveSlaves->GetSize())
      master = ((TSlave*)(fActiveSlaves->First()))->GetSocket();
   else {
      Error("GetDataSetQuota", "no connection to the master!");
      return 0;
   }

   TMessage mess(kPROOF_DATASETS);
   mess << Int_t(kGetQuota);
   mess << TString(optStr?optStr:"");
   Broadcast(mess);

   Collect(kActive, fCollectTimeout);
   TMap *groupQuotaMap = 0;
   if (fStatus < 0) {
      Info("GetDataSetQuota", "could not receive quota");
   } else {
      // Look in the list of received messages
      TMessage *retMess = (TMessage *) fRecvMessages->First();
      if (retMess && retMess->What() == kMESS_OK) {
         if (!(groupQuotaMap = (TMap*)(retMess->ReadObject(TMap::Class()))))
            Error("GetDataSetQuota", "error getting quotas");
      } else
         Error("GetDataSetQuota", "message not found or wrong type (%p)", retMess);
   }

   return groupQuotaMap;
}
void TProof::ShowDataSetQuota(Option_t* opt)
{
   // Send a kShowQuota request with options 'opt' (a null pointer is sent
   // as an empty string) and collect the answer. An error is reported if
   // the status afterwards is not 0.
   // Not available with protocol < 15, for Lite sessions or without an
   // active worker.

   if (fProtocol < 15) {
      Info("ShowDataSetQuota",
           "functionality not available: the server does not have dataset support");
      return;
   }

   if (IsLite()) {
      // Report under this method's name (it used to say "UploadDataSet")
      Info("ShowDataSetQuota", "Lite-session: functionality not implemented");
      return;
   }

   TSocket *master = 0;
   if (fActiveSlaves->GetSize())
      master = ((TSlave*)(fActiveSlaves->First()))->GetSocket();
   else {
      Error("ShowDataSetQuota", "no connection to the master!");
      return;
   }

   TMessage mess(kPROOF_DATASETS);
   mess << Int_t(kShowQuota);
   mess << TString(opt?opt:"");
   Broadcast(mess);

   Collect();
   if (fStatus != 0)
      Error("ShowDataSetQuota", "error receiving quota information");
}
void TProof::InterruptCurrentMonitor()
{
   // Call Interrupt() on the current monitor, when there is one.

   if (!fCurrentMonitor) return;
   fCurrentMonitor->Interrupt();
}
void TProof::ActivateWorker(const char *ord)
{
ModifyWorkerLists(ord, kTRUE);
}
void TProof::DeactivateWorker(const char *ord)
{
ModifyWorkerLists(ord, kFALSE);
}
void TProof::ModifyWorkerLists(const char *ord, Bool_t add)
{
if (!ord || strlen(ord) <= 0) {
Info("ModifyWorkerLists",
"An ordinal number - e.g. \"0.4\" or \"*\" for all - is required as input");
return;
}
Bool_t fw = kTRUE;
Bool_t rs = kFALSE;
TList *in = (add) ? fInactiveSlaves : fActiveSlaves;
TList *out = (add) ? fActiveSlaves : fInactiveSlaves;
if (TestBit(TProof::kIsMaster)) {
fw = IsEndMaster() ? kFALSE : kTRUE;
if (in->GetSize() > 0) {
TIter nxw(in);
TSlave *wrk = 0;
while ((wrk = (TSlave *) nxw())) {
if (ord[0] == '*' || !strncmp(wrk->GetOrdinal(), ord, strlen(ord))) {
if (!out->FindObject(wrk)) {
out->Add(wrk);
if (add)
fActiveMonitor->Add(wrk->GetSocket());
}
in->Remove(wrk);
if (!add) {
fActiveMonitor->Remove(wrk->GetSocket());
wrk->SetStatus(TSlave::kInactive);
} else
wrk->SetStatus(TSlave::kActive);
fw = kFALSE;
rs = kTRUE;
if (ord[0] != '*')
break;
}
}
}
}
if (rs)
FindUniqueSlaves();
Int_t action = (add) ? (Int_t) kActivateWorker : (Int_t) kDeactivateWorker;
if (fw) {
TMessage mess(kPROOF_WORKERLISTS);
mess << action << TString(ord);
Broadcast(mess);
Collect(kActive, fCollectTimeout);
}
}
TProof *TProof::Open(const char *cluster, const char *conffile,
const char *confdir, Int_t loglevel)
{
// Start or attach to a PROOF session on 'cluster'.
// With a null 'cluster' the TSessionViewer plug-in is launched (not in
// batch mode) and 0 is returned. Otherwise a manager is created for the
// URL; unless the URL options start with 'N'/'n', or the manager reports
// proofd or Lite mode, an attach to an existing session is attempted (the
// one with the numeric id given as URL options, or the first reported by
// the manager). If no session could be attached, a new one is created with
// 'conffile', 'confdir' and 'loglevel'.
// A "tunnel=[host:]port" URL option sets XNet.SOCKS4Host/XNet.SOCKS4Port.
// Returns the session, or 0 on failure.
const char *pn = "TProof::Open";
if (!cluster) {
// No cluster given: bring up the session viewer GUI via the plug-in manager
TPluginManager *pm = gROOT->GetPluginManager();
if (!pm) {
::Error(pn, "plugin manager not found");
return 0;
}
if (gROOT->IsBatch()) {
::Error(pn, "we are in batch mode, cannot show PROOF Session Viewer");
return 0;
}
TPluginHandler *sv = pm->FindHandler("TSessionViewer", "");
if (!sv) {
::Error(pn, "no plugin found for TSessionViewer");
return 0;
}
if (sv->LoadPlugin() == -1) {
::Error(pn, "plugin for TSessionViewer could not be loaded");
return 0;
}
sv->ExecPlugin(0);
return 0;
} else {
TString clst(cluster);
// A string made of options only gets the "/?" prefix so that TUrl
// receives them as URL options
if (clst.BeginsWith("workers=") || clst.BeginsWith("tunnel="))
clst.Insert(0, "/?");
TUrl u(clst);
TString opts(u.GetOptions());
if (!opts.IsNull()) {
Int_t it = opts.Index("tunnel=");
if (it != kNPOS) {
// Parse "tunnel=[host:]port"; the host defaults to 127.0.0.1
TString sport = opts(it + strlen("tunnel="), opts.Length());
TString host("127.0.0.1");
Int_t port = -1;
Int_t ic = sport.Index(":");
if (ic != kNPOS) {
host = sport(0, ic);
sport.Remove(0, ic + 1);
}
// Cut the port string at the first non-digit character
if (!sport.IsDigit()) {
TRegexp re("[^0-9]");
Int_t ind = sport.Index(re);
if (ind != kNPOS)
sport.Remove(ind);
}
if (sport.IsDigit())
port = sport.Atoi();
if (port > 0) {
::Info("TProof::Open","using tunnel at %s:%d", host.Data(), port);
gEnv->SetValue("XNet.SOCKS4Host", host);
gEnv->SetValue("XNet.SOCKS4Port", port);
} else {
::Warning("TProof::Open",
"problems parsing tunnelling info from options: %s", opts.Data());
}
}
}
// Options starting with 'N' (any case) force the creation of a new
// session; purely numeric options give the id of the session to attach
Int_t locid = -1;
Bool_t create = kFALSE;
if (opts.Length() > 0) {
if (opts.BeginsWith("N",TString::kIgnoreCase)) {
create = kTRUE;
} else if (opts.IsDigit()) {
locid = opts.Atoi();
}
}
TProofMgr *mgr = TProofMgr::Create(u.GetUrl());
TProof *proof = 0;
if (mgr && mgr->IsValid()) {
// Attaching is tried only if creation was not requested and the
// manager reports neither proofd nor Lite mode
Bool_t attach = (create || mgr->IsProofd() || mgr->IsLite()) ? kFALSE : kTRUE;
if (attach) {
TProofDesc *d = 0;
// NOTE(review): the result of QuerySessions("") is dereferenced
// without a null check - confirm it cannot be null here
if (locid < 0)
d = (TProofDesc *) mgr->QuerySessions("")->First();
else
d = (TProofDesc *) mgr->GetProofDesc(locid);
if (d) {
proof = (TProof*) mgr->AttachSession(d);
if (!proof || !proof->IsValid()) {
// NOTE(review): locid is -1 when no id was given, so this test is
// true in that case as well; 'locid >= 0' may have been intended
if (locid)
::Error(pn, "new session could not be attached");
SafeDelete(proof);
}
}
}
// Nothing attached: start a new session
if (!proof) {
proof = (TProof*) mgr->CreateSession(conffile, confdir, loglevel);
if (!proof || !proof->IsValid()) {
::Error(pn, "new session could not be created");
SafeDelete(proof);
}
}
}
return proof;
}
}
TProofMgr *TProof::Mgr(const char *url)
{
   // Return the manager obtained from TProofMgr::Create for 'url', or 0
   // when 'url' is null.

   return url ? TProofMgr::Create(url) : (TProofMgr *)0;
}
void TProof::Reset(const char *url, Bool_t hard)
{
   // Call Reset('hard') on the manager for 'url'. An error is reported if
   // no valid manager can be obtained; nothing is done for a null 'url'.

   if (!url) return;

   TProofMgr *mgr = TProof::Mgr(url);
   if (!mgr || !mgr->IsValid()) {
      ::Error("TProof::Reset",
              "unable to initialize a valid manager instance");
      return;
   }

   mgr->Reset(hard);
}
const TList *TProof::GetEnvVars()
{
   // Give read access to the static list of environment settings filled
   // by AddEnvVar; the pointer is null if the list was never created or
   // was reset.

   return fgProofEnvList;
}
void TProof::AddEnvVar(const char *name, const char *value)
{
if (gDebug > 0) ::Info("TProof::AddEnvVar","%s=%s", name, value);
if (fgProofEnvList == 0) {
fgProofEnvList = new TList;
fgProofEnvList->SetOwner();
} else {
TObject *o = fgProofEnvList->FindObject(name);
if (o != 0) {
fgProofEnvList->Remove(o);
}
}
fgProofEnvList->Add(new TNamed(name, value));
}
void TProof::DelEnvVar(const char *name)
{
if (fgProofEnvList == 0) return;
TObject *o = fgProofEnvList->FindObject(name);
if (o != 0) {
fgProofEnvList->Remove(o);
}
}
void TProof::ResetEnvVars()
{
   // Delete the static list of environment settings, if it exists, and
   // reset its pointer.

   if (fgProofEnvList != 0) {
      SafeDelete(fgProofEnvList);
   }
}
void TProof::SaveWorkerInfo()
{
if (TestBit(TProof::kIsClient))
return;
if (!gProofServ) {
Error("SaveWorkerInfo","gProofServ undefined");
return;
}
if (!fSlaves && !fBadSlaves) {
Warning("SaveWorkerInfo","all relevant worker lists is undefined");
return;
}
TString fnwrk = Form("%s/.workers",
gSystem->DirName(gProofServ->GetSessionDir()));
FILE *fwrk = fopen(fnwrk.Data(),"w");
if (!fwrk) {
Error("SaveWorkerInfo",
"cannot open %s for writing (errno: %d)", fnwrk.Data(), errno);
return;
}
TString addlogext;
if (gSystem->Getenv("PROOF_ADDITIONALLOG")) {
addlogext = gSystem->Getenv("PROOF_ADDITIONALLOG");
if (gDebug > 0)
Info("SaveWorkerInfo", "request for additional line with ext: '%s'", addlogext.Data());
}
TIter nxa(fSlaves);
TSlave *wrk = 0;
while ((wrk = (TSlave *) nxa())) {
Int_t status = (fBadSlaves && fBadSlaves->FindObject(wrk)) ? 0 : 1;
fprintf(fwrk,"%s@%s:%d %d %s %s.log\n",
wrk->GetUser(), wrk->GetName(), wrk->GetPort(), status,
wrk->GetOrdinal(), wrk->GetWorkDir());
if (addlogext.Length() > 0) {
fprintf(fwrk,"%s@%s:%d %d %s %s.%s\n",
wrk->GetUser(), wrk->GetName(), wrk->GetPort(), status,
wrk->GetOrdinal(), wrk->GetWorkDir(), addlogext.Data());
}
}
fclose(fwrk);
return;
}
Int_t TProof::GetParameter(TCollection *c, const char *par, TString &value)
{
   // Look up the object named 'par' in collection 'c' and, if it is a
   // TNamed, copy its title into 'value'.
   // Returns 0 on success, -1 if 'c' is null, the object is not found or
   // it is not a TNamed; 'value' is left untouched in that case.

   TObject *obj = c ? c->FindObject(par) : (TObject *)0;
   if (obj) {
      TNamed *p = dynamic_cast<TNamed*>(obj);
      if (p) {
         value = p->GetTitle();
         return 0;
      }
   }
   return -1;
}
Int_t TProof::GetParameter(TCollection *c, const char *par, Int_t &value)
{
   // Look up the object named 'par' in collection 'c' and, if it is a
   // TParameter<Int_t>, copy its value into 'value'.
   // Returns 0 on success, -1 if 'c' is null, the object is not found or
   // it has a different type; 'value' is left untouched in that case.

   TObject *obj = c ? c->FindObject(par) : (TObject *)0;
   if (obj) {
      TParameter<Int_t> *p = dynamic_cast<TParameter<Int_t>*>(obj);
      if (p) {
         value = p->GetVal();
         return 0;
      }
   }
   return -1;
}
Int_t TProof::GetParameter(TCollection *c, const char *par, Long_t &value)
{
   // Look up the object named 'par' in collection 'c' and, if it is a
   // TParameter<Long_t>, copy its value into 'value'.
   // Returns 0 on success, -1 if 'c' is null, the object is not found or
   // it has a different type; 'value' is left untouched in that case.

   TObject *obj = c ? c->FindObject(par) : (TObject *)0;
   if (obj) {
      TParameter<Long_t> *p = dynamic_cast<TParameter<Long_t>*>(obj);
      if (p) {
         value = p->GetVal();
         return 0;
      }
   }
   return -1;
}
Int_t TProof::GetParameter(TCollection *c, const char *par, Long64_t &value)
{
   // Look up the object named 'par' in collection 'c' and, if it is a
   // TParameter<Long64_t>, copy its value into 'value'.
   // Returns 0 on success, -1 if 'c' is null, the object is not found or
   // it has a different type; 'value' is left untouched in that case.

   TObject *obj = c ? c->FindObject(par) : (TObject *)0;
   if (obj) {
      TParameter<Long64_t> *p = dynamic_cast<TParameter<Long64_t>*>(obj);
      if (p) {
         value = p->GetVal();
         return 0;
      }
   }
   return -1;
}
Int_t TProof::GetParameter(TCollection *c, const char *par, Double_t &value)
{
   // Look up the object named 'par' in collection 'c' and, if it is a
   // TParameter<Double_t>, copy its value into 'value'.
   // Returns 0 on success, -1 if 'c' is null, the object is not found or
   // it has a different type; 'value' is left untouched in that case.

   TObject *obj = c ? c->FindObject(par) : (TObject *)0;
   if (obj) {
      TParameter<Double_t> *p = dynamic_cast<TParameter<Double_t>*>(obj);
      if (p) {
         value = p->GetVal();
         return 0;
      }
   }
   return -1;
}
Int_t TProof::AssertDataSet(TDSet *dset, TList *input,
                            TDataSetManager *mgr, TString &emsg)
{
   // Fill 'dset' with the files of the dataset it is named after.
   // If the name of 'dset' starts with "TFileCollection:" the collection
   // is taken (and removed) from the 'input' list, otherwise it is asked
   // to the dataset manager 'mgr'. When 'input' has no "PROOF_LookupOpt"
   // parameter, one is added with the value of Proof.LookupOpt (default
   // "all" in the first case, "stagedOnly" in the second).
   // Files reported as missing while filling 'dset' are added to 'input'
   // in a list called "MissingFiles".
   // Returns 0 on success; on failure returns -1 with the reason in 'emsg'.

   emsg = "";

   // We must have something to process
   if (!dset || !input || !mgr) {
      emsg.Form("invalid inputs (%p, %p, %p)", dset, input, mgr);
      return -1;
   }

   TFileCollection* dataset = 0;
   TString lookupopt;
   TString dsname(dset->GetName());

   // The collection may have been sent along in the input list
   if (dsname.BeginsWith("TFileCollection:")) {
      dsname.ReplaceAll("TFileCollection:", "");
      dataset = (TFileCollection *) input->FindObject(dsname);
      if (!dataset) {
         emsg.Form("TFileCollection %s not found in input list", dset->GetName());
         return -1;
      }
      // It is deleted below, so it must not stay in the input list
      input->RecursiveRemove(dataset);
      if (TProof::GetParameter(input, "PROOF_LookupOpt", lookupopt) != 0) {
         lookupopt = gEnv->GetValue("Proof.LookupOpt", "all");
         input->Add(new TNamed("PROOF_LookupOpt", lookupopt.Data()));
      }
   }

   // Otherwise ask the dataset manager
   if (!dataset) {
      dataset = mgr->GetDataSet(dsname.Data());
      if (!dataset) {
         emsg.Form("no such dataset on the master: %s", dsname.Data());
         return -1;
      }
      if (TProof::GetParameter(input, "PROOF_LookupOpt", lookupopt) != 0) {
         lookupopt = gEnv->GetValue("Proof.LookupOpt", "stagedOnly");
         input->Add(new TNamed("PROOF_LookupOpt", lookupopt.Data()));
      }
   }

   // Tree name: from the URI, else from the TDSet directory and object
   // name, else the default of the dataset
   TString dsTree;
   mgr->ParseUri(dsname.Data(), 0, 0, 0, &dsTree);
   if (dsTree.IsNull()) {
      dsTree += dset->GetDirectory();
      dsTree += dset->GetObjName();
   }
   if (!dsTree.IsNull() && dsTree != "/") {
      TString tree(dsTree);
      Int_t idx = tree.Index("/");
      if (idx != kNPOS) {
         TString dir = tree(0, idx+1);
         tree.Remove(0, idx);
         dset->SetDirectory(dir);
      }
      dset->SetObjName(tree);
   } else {
      dsTree = dataset->GetDefaultTreeName();
   }

   // Transfer the files; 'dataset' is never null at this point because
   // both lookups above return on failure
   TList *missingFiles = new TList;
   TSeqCollection* files = dataset->GetList();
   if (gDebug > 0) files->Print();
   Bool_t availableOnly = (lookupopt != "all") ? kTRUE : kFALSE;
   if (!dset->Add(files, dsTree, availableOnly, missingFiles)) {
      emsg.Form("error retrieving dataset %s", dset->GetName());
      // Release the helper list first: its entries, if any, are still
      // held by the dataset that is deleted right after
      delete missingFiles;
      delete dataset;
      return -1;
   }

   // Objects listed as missing are taken out of the dataset before it is
   // deleted
   TIter next(missingFiles);
   TObject *file;
   while ((file = next()))
      dataset->GetList()->Remove(file);
   delete dataset;

   // A non-empty list of missing files is handed over to the input list;
   // an empty one is not needed any longer
   if (missingFiles->GetSize() > 0) {
      missingFiles->SetName("MissingFiles");
      input->Add(missingFiles);
   } else {
      delete missingFiles;
   }

   return 0;
}
Int_t TProof::SaveInputData(TQueryResult *qr, const char *cachedir, TString &emsg)
{
   // Make the input data of query 'qr' available in a file.
   // The file name is the title of the "PROOF_InputDataFile" entry of the
   // input list (created with the default name when missing). If the name
   // starts with "cache:" the file is copied from 'cachedir'; otherwise
   // the content of the "PROOF_InputData" list is written to a new file.
   // Afterwards the entry's title is set to the file name and the
   // "PROOF_InputData" list is removed from the input list and deleted
   // together with its content.
   // Returns 0 on success or when there is nothing to do, -1 on failure
   // with the reason in 'emsg'.

   if (!qr) return 0;
   TList *input = qr->GetInputList();
   if (!input || !cachedir || strlen(cachedir) <= 0) return 0;

   // Anything to do?
   TNamed *data = (TNamed *) input->FindObject("PROOF_InputDataFile");
   TList *inputdata = (TList *) input->FindObject("PROOF_InputData");
   if (!data && !inputdata) return 0;

   // Default file name
   if (!data) {
      data = new TNamed("PROOF_InputDataFile", kPROOF_InputDataFile);
      input->Add(data);
   }

   TString dstname(data->GetTitle()), srcname;
   if (dstname.BeginsWith("cache:")) {
      // The file is expected in the cache: copy it from there
      dstname.ReplaceAll("cache:", "");
      srcname.Form("%s/%s", cachedir, dstname.Data());
      if (gSystem->AccessPathName(srcname)) {
         emsg.Form("input data file not found in cache (%s)", srcname.Data());
         return -1;
      }
      if (gSystem->CopyFile(srcname, dstname, kTRUE) != 0) {
         emsg.Form("problems copying %s to %s", srcname.Data(), dstname.Data());
         return -1;
      }
   } else if (inputdata && inputdata->GetSize() > 0) {
      // Write the objects to a new file
      TFile *f = TFile::Open(dstname.Data(), "RECREATE");
      if (!f) {
         emsg.Form("could not create %s", dstname.Data());
         return -1;
      }
      f->cd();
      inputdata->Write();
      f->Close();
      delete f;
   } else {
      emsg.Form("no input data!");
      return -1;
   }
   ::Info("TProof::SaveInputData", "input data saved to %s", dstname.Data());

   // Record the final file name
   data->SetTitle(dstname);

   // The in-memory list is not needed any longer
   if (inputdata) {
      input->Remove(inputdata);
      inputdata->SetOwner();
      delete inputdata;
   }

   return 0;
}
Int_t TProof::SendInputData(TQueryResult *qr, TProof *p, TString &emsg)
{
   // Broadcast through session 'p' the file named by the title of the
   // "PROOF_InputDataFile" entry in the input list of query 'qr', as a
   // binary file with the "cache" option.
   // Returns 0 on success or when there is nothing to send, -1 with the
   // reason in 'emsg' if the file is not accessible or 'p' is null or
   // invalid.

   if (!qr) return 0;
   TList *input = qr->GetInputList();
   if (!input) return 0;

   // Anything to send?
   TNamed *inputdata = (TNamed *) input->FindObject("PROOF_InputDataFile");
   if (!inputdata) return 0;

   TString fname(inputdata->GetTitle());
   if (gSystem->AccessPathName(fname)) {
      emsg.Form("input data file not found in sandbox (%s)", fname.Data());
      return -1;
   }

   const Bool_t usable = (p && p->IsValid());
   if (!usable) {
      emsg.Form("TProof object undefined or invalid: protocol error!");
      return -1;
   }

   p->BroadcastFile(fname, TProof::kBinary, "cache");

   return 0;
}
Int_t TProof::GetInputData(TList *input, const char *cachedir, TString &emsg)
{
   // Read the objects stored in the input data file and add them to the
   // list 'input'. The file is looked for in 'cachedir' under the name
   // given by the title of the "PROOF_InputDataFile" entry of 'input'.
   // Returns 0 on success or when there is nothing to do (null arguments,
   // empty 'cachedir', no such entry), -1 with the reason in 'emsg' if the
   // file is not accessible, cannot be opened or has no list of keys.

   if (!input || !cachedir || strlen(cachedir) <= 0) return 0;

   // Anything to read?
   TNamed *inputdata = (TNamed *) input->FindObject("PROOF_InputDataFile");
   if (!inputdata) return 0;

   TString fname;
   fname.Form("%s/%s", cachedir, inputdata->GetTitle());
   if (gSystem->AccessPathName(fname)) {
      emsg.Form("input data file not found in cache (%s)", fname.Data());
      return -1;
   }

   TFile *f = TFile::Open(fname.Data());
   if (!f) {
      emsg.Form("could not open %s", fname.Data());
      return -1;
   }

   TList *keys = (TList *) f->GetListOfKeys();
   if (!keys) {
      emsg.Form("could not get list of object keys from file");
      // Do not leave the file open behind us
      f->Close();
      delete f;
      return -1;
   }

   // Add every readable object to the input list
   TIter nxk(keys);
   TKey *k = 0;
   while ((k = (TKey *)nxk())) {
      TObject *o = f->Get(k->GetName());
      if (o) input->Add(o);
   }

   f->Close();
   delete f;

   return 0;
}
void TProof::LogViewer(const char *url, Int_t idx)
{
   // Start the log viewer plug-in ("TProofProgressLog") for the session
   // 'idx' at 'url'. The plug-in handler is looked up and loaded the first
   // time and then kept in a static member.
   // In batch mode only a hint on how to retrieve the logs is printed.

   if (gROOT->IsBatch()) {
      if (url && strlen(url) > 0) {
         ::Info("TProof::LogViewer",
                "batch mode: use TProofLog *pl = TProof::Mgr(\"%s\")->GetSessionLogs(%d)", url, idx);
      } else {
         ::Info("TProof::LogViewer",
                "batch mode: use TProofLog *pl = TProof::Mgr(\"<master>\")->GetSessionLogs(%d)", idx);
      }
      return;
   }

   // Locate and load the plug-in, if not done yet
   if (!fgLogViewer) {
      fgLogViewer = gROOT->GetPluginManager()->FindHandler("TProofProgressLog");
      if (fgLogViewer && fgLogViewer->LoadPlugin() == -1) {
         fgLogViewer = 0;
         ::Error("TProof::LogViewer", "cannot load the relevant plug-in");
         return;
      }
   }

   if (fgLogViewer) {
      // Execute the plug-in
      fgLogViewer->ExecPlugin(2, url, idx);
   }
}