#include <vector>
#include <fcntl.h>
#include <errno.h>
#ifdef WIN32
# include <io.h>
# include <sys/stat.h>
# include <sys/types.h>
#else
# include <unistd.h>
#endif
#include <vector>
#ifdef R__HAVE_CONFIG
#include "RConfigure.h"
#endif
#include "TProof.h"
#include "TSortedList.h"
#include "TSlave.h"
#include "TMonitor.h"
#include "TMessage.h"
#include "TSystem.h"
#include "TError.h"
#include "TUrl.h"
#include "TFTP.h"
#include "TROOT.h"
#include "TFile.h"
#include "TH1.h"
#include "TProofPlayer.h"
#include "TQueryResult.h"
#include "TDSet.h"
#include "TEnv.h"
#include "TPluginManager.h"
#include "TCondor.h"
#include "Riostream.h"
#include "TTree.h"
#include "TDrawFeedback.h"
#include "TEventList.h"
#include "TMonitor.h"
#include "TBrowser.h"
#include "TChain.h"
#include "TProofServ.h"
#include "TMap.h"
#include "TThread.h"
#include "TSemaphore.h"
#include "TMutex.h"
#include "TObjString.h"
#include "TObjArray.h"
#include "Getline.h"
#include "TProofNodeInfo.h"
#include "TProofResourcesStatic.h"
#include "TInterpreter.h"
#include "TParameter.h"
#include "TRandom.h"
#include "TRegexp.h"
#include "TFileInfo.h"
#include "TFileMerger.h"
TProof *gProof = 0;
TVirtualMutex *gProofMutex = 0;
TList *TProof::fgProofEnvList = 0;
ClassImp(TProof)
TProofThreadArg::TProofThreadArg(const char *h, Int_t po, const char *o,
Int_t pe, const char *i, const char *w,
TList *s, TProof *prf)
: fOrd(o), fPerf(pe), fImage(i), fWorkdir(w),
fSlaves(s), fProof(prf), fCslave(0), fClaims(0),
fType(TSlave::kSlave)
{
fUrl = new TUrl(Form("%s:%d",h,po));
}
TProofThreadArg::TProofThreadArg(TCondorSlave *csl, TList *clist,
TList *s, TProof *prf)
: fUrl(0), fOrd(0), fPerf(-1), fImage(0), fWorkdir(0),
fSlaves(s), fProof(prf), fCslave(csl), fClaims(clist),
fType(TSlave::kSlave)
{
if (csl) {
fUrl = new TUrl(Form("%s:%d",csl->fHostname.Data(),csl->fPort));
fImage = csl->fImage;
fOrd = csl->fOrdinal;
fWorkdir = csl->fWorkDir;
fPerf = csl->fPerfIdx;
}
}
TProofThreadArg::TProofThreadArg(const char *h, Int_t po, const char *o,
const char *i, const char *w, const char *m,
TList *s, TProof *prf)
: fOrd(o), fPerf(-1), fImage(i), fWorkdir(w),
fMsd(m), fSlaves(s), fProof(prf), fCslave(0), fClaims(0),
fType(TSlave::kSlave)
{
fUrl = new TUrl(Form("%s:%d",h,po));
}
Bool_t TProofInterruptHandler::Notify()
{
Info("Notify","Processing interrupt signal ...");
fProof->StopProcess(kTRUE);
fProof->Interrupt(TProof::kLocalInterrupt);
return kTRUE;
}
Bool_t TProofInputHandler::Notify()
{
fProof->CollectInputFrom(fSocket);
return kTRUE;
}
ClassImp(TSlaveInfo)
Int_t TSlaveInfo::Compare(const TObject *obj) const
{
if (!obj) return 1;
const TSlaveInfo *si = dynamic_cast<const TSlaveInfo*>(obj);
if (!si) return fOrdinal.CompareTo(obj->GetName());
const char *myord = GetOrdinal();
const char *otherord = si->GetOrdinal();
while (myord && otherord) {
Int_t myval = atoi(myord);
Int_t otherval = atoi(otherord);
if (myval < otherval) return 1;
if (myval > otherval) return -1;
myord = strchr(myord, '.');
if (myord) myord++;
otherord = strchr(otherord, '.');
if (otherord) otherord++;
}
if (myord) return -1;
if (otherord) return 1;
return 0;
}
void TSlaveInfo::Print(Option_t *opt) const
{
TString stat = fStatus == kActive ? "active" :
fStatus == kBad ? "bad" :
"not active";
TString msd = fMsd.IsNull() ? "<null>" : fMsd.Data();
if (!opt) opt = "";
if (!strcmp(opt, "active") && fStatus != kActive)
return;
if (!strcmp(opt, "notactive") && fStatus != kNotActive)
return;
if (!strcmp(opt, "bad") && fStatus != kBad)
return;
cout << "Slave: " << fOrdinal
<< " hostname: " << fHostName
<< " msd: " << msd
<< " perf index: " << fPerfIndex
<< " " << stat
<< endl;
}
static char *CollapseSlashesInPath(const char *path)
{
if (path) {
Int_t i = 1;
Int_t j = 0;
char *newPath = new char [strlen(path) + 1];
newPath[0] = path[0];
while (path[i]) {
if (path[i] != '/' || newPath[j] != '/') {
j++;
newPath[j] = path[i];
}
i++;
}
if (newPath[j] != '/')
j++;
newPath[j] = 0;
return newPath;
}
return 0;
}
ClassImp(TProof)
TSemaphore *TProof::fgSemaphore = 0;
TProof::TProof(const char *masterurl, const char *conffile, const char *confdir,
Int_t loglevel, const char *alias, TProofMgr *mgr)
: fUrl(masterurl)
{
fManager = mgr;
fServType = TProofMgr::kXProofd;
if (!conffile || strlen(conffile) == 0)
conffile = kPROOF_ConfFile;
if (!confdir || strlen(confdir) == 0)
confdir = kPROOF_ConfDir;
Init(masterurl, conffile, confdir, loglevel, alias);
if (mgr) {
R__LOCKGUARD2(gROOTMutex);
gROOT->GetListOfSockets()->Remove(mgr);
gROOT->GetListOfSockets()->Add(mgr);
}
if (IsProofd() || IsMaster())
gROOT->GetListOfProofs()->Add(this);
gProof = this;
}
TProof::TProof() : fUrl(""), fServType(TProofMgr::kXProofd)
{
gROOT->GetListOfProofs()->Add(this);
gProof = this;
}
TProof::~TProof()
{
while (TChain *chain = dynamic_cast<TChain*> (fChains->First()) ) {
chain->SetProof(0);
}
Close();
SafeDelete(fIntHandler);
SafeDelete(fSlaves);
SafeDelete(fActiveSlaves);
SafeDelete(fInactiveSlaves);
SafeDelete(fUniqueSlaves);
SafeDelete(fAllUniqueSlaves);
SafeDelete(fNonUniqueMasters);
SafeDelete(fBadSlaves);
SafeDelete(fAllMonitor);
SafeDelete(fActiveMonitor);
SafeDelete(fUniqueMonitor);
SafeDelete(fAllUniqueMonitor);
SafeDelete(fSlaveInfo);
SafeDelete(fChains);
SafeDelete(fPlayer);
SafeDelete(fFeedback);
SafeDelete(fWaitingSlaves);
SafeDelete(fAvailablePackages);
SafeDelete(fEnabledPackages);
SafeDelete(fEnabledPackagesOnClient);
SafeDelete(fPackageLock);
if (!IsMaster()) {
if (fLogFileR)
fclose(fLogFileR);
if (fLogFileW)
fclose(fLogFileW);
if (fLogFileName.Length())
gSystem->Unlink(fLogFileName);
}
Emit("~TProof()");
}
Int_t TProof::Init(const char *masterurl, const char *conffile,
const char *confdir, Int_t loglevel, const char *alias)
{
R__ASSERT(gSystem);
fValid = kFALSE;
if (strlen(fUrl.GetOptions()) > 0 && !(strncmp(fUrl.GetOptions(),"std",3))) {
fServType = TProofMgr::kProofd;
fUrl.SetOptions("");
}
if (!masterurl || !*masterurl) {
fUrl.SetProtocol("proof");
fUrl.SetHost("__master__");
} else if (!(strstr(masterurl, "://"))) {
fUrl.SetProtocol("proof");
}
if (fUrl.GetPort() == TUrl(" ").GetPort())
fUrl.SetPort(TUrl("proof:// ").GetPort());
Bool_t attach = kFALSE;
if (strlen(fUrl.GetOptions()) > 0) {
attach = kTRUE;
TString opts = fUrl.GetOptions();
if (opts.Contains("GUI")) {
SetBit(TProof::kUsingSessionGui);
opts.Remove(opts.Index("GUI"));
fUrl.SetOptions(opts);
}
}
if (strlen(fUrl.GetUser()) <= 0) {
UserGroup_t *pw = gSystem->GetUserInfo();
if (pw) {
fUrl.SetUser(pw->fUser);
delete pw;
}
}
if (!strlen(fUrl.GetHost()))
fMaster = gSystem->GetHostByName(gSystem->HostName()).GetHostName();
else
fMaster = gSystem->GetHostByName(fUrl.GetHost()).GetHostName();
fConfDir = confdir;
fConfFile = conffile;
fWorkDir = gSystem->WorkingDirectory();
fLogLevel = loglevel;
fProtocol = kPROOF_Protocol;
fMasterServ = (fMaster == "__master__") ? kTRUE : kFALSE;
fSendGroupView = kTRUE;
fImage = fMasterServ ? "" : "<local>";
fIntHandler = 0;
fStatus = 0;
fSlaveInfo = 0;
fChains = new TList;
fAvailablePackages = 0;
fEnabledPackages = 0;
fEndMaster = IsMaster() ? kTRUE : kFALSE;
if (!IsMaster())
fDataPoolUrl.Form("root://%s", fMaster.Data());
else
fDataPoolUrl = "";
fProgressDialog = 0;
fProgressDialogStarted = kFALSE;
TString al = (alias) ? alias : fMaster.Data();
SetAlias(al);
fRedirLog = kFALSE;
if (!IsMaster()) {
fLogFileName = "ProofLog_";
if ((fLogFileW = gSystem->TempFileName(fLogFileName)) == 0)
Error("Init", "could not create temporary logfile");
if ((fLogFileR = fopen(fLogFileName, "r")) == 0)
Error("Init", "could not open temp logfile for reading");
}
fLogToWindowOnly = kFALSE;
fIdle = kTRUE;
fSync = kTRUE;
fQueries = 0;
fOtherQueries = 0;
fDrawQueries = 0;
fMaxDrawQueries = 1;
fSeqNum = 0;
fSessionID = -1;
fWaitingSlaves = 0;
fPlayer = MakePlayer();
fFeedback = new TList;
fFeedback->SetOwner();
fFeedback->SetName("FeedbackList");
AddInput(fFeedback);
fSlaves = new TSortedList(kSortDescending);
fActiveSlaves = new TList;
fInactiveSlaves = new TList;
fUniqueSlaves = new TList;
fAllUniqueSlaves = new TList;
fNonUniqueMasters = new TList;
fBadSlaves = new TList;
fAllMonitor = new TMonitor;
fActiveMonitor = new TMonitor;
fUniqueMonitor = new TMonitor;
fAllUniqueMonitor = new TMonitor;
fCurrentMonitor = 0;
fPackageLock = 0;
fEnabledPackagesOnClient = 0;
if (!IsMaster()) {
fPackageDir = kPROOF_WorkDir;
gSystem->ExpandPathName(fPackageDir);
if (gSystem->AccessPathName(fPackageDir)) {
if (gSystem->MakeDirectory(fPackageDir) == -1) {
Error("Init", "failure creating directory %s", fPackageDir.Data());
return 0;
}
}
fPackageDir += TString("/") + kPROOF_PackDir;
if (gSystem->AccessPathName(fPackageDir)) {
if (gSystem->MakeDirectory(fPackageDir) == -1) {
Error("Init", "failure creating directory %s", fPackageDir.Data());
return 0;
}
}
UserGroup_t *ug = gSystem->GetUserInfo();
fPackageLock = new TProofLockPath(Form("%s%s", kPROOF_PackageLockFile, ug->fUser.Data()));
delete ug;
fEnabledPackagesOnClient = new TList;
fEnabledPackagesOnClient->SetOwner();
}
Bool_t parallelStartup = kFALSE;
if (!attach && IsMaster()) {
parallelStartup = gEnv->GetValue("Proof.ParallelStartup", kFALSE);
PDB(kGlobal,1) Info("Init", "Parallel Startup: %s",
parallelStartup ? "kTRUE" : "kFALSE");
if (parallelStartup) {
#ifdef ROOTLIBDIR
TString threadLib = TString(ROOTLIBDIR) + "/libThread";
#else
TString threadLib = TString(gRootDir) + "/lib/libThread";
#endif
char *p;
if ((p = gSystem->DynamicPathName(threadLib, kTRUE))) {
delete[]p;
if (gSystem->Load(threadLib) == -1) {
Warning("Init",
"Cannot load libThread: switch to serial startup (%s)",
threadLib.Data());
parallelStartup = kFALSE;
}
} else {
Warning("Init",
"Cannot find libThread: switch to serial startup (%s)",
threadLib.Data());
parallelStartup = kFALSE;
}
Int_t parallelRequests = gEnv->GetValue("Proof.ParallelStartupRequests", 0);
if (parallelRequests > 0) {
PDB(kGlobal,1)
Info("Init", "Parallel Startup Requests: %d", parallelRequests);
fgSemaphore = new TSemaphore((UInt_t)(parallelRequests));
}
}
}
if (!StartSlaves(parallelStartup, attach))
return 0;
if (fgSemaphore)
SafeDelete(fgSemaphore);
fValid = kTRUE;
fAllMonitor->DeActivateAll();
GoParallel(9999, attach);
if (!attach)
SendInitialState();
else if (!IsIdle())
fRedirLog = kTRUE;
if (!IsMaster())
SetAlias(al);
SetActive(kFALSE);
if (IsValid()) {
ActivateAsyncInput();
R__LOCKGUARD2(gROOTMutex);
gROOT->GetListOfSockets()->Add(this);
}
return fActiveSlaves->GetSize();
}
void TProof::SetManager(TProofMgr *mgr)
{
fManager = mgr;
if (mgr) {
R__LOCKGUARD2(gROOTMutex);
gROOT->GetListOfSockets()->Remove(mgr);
gROOT->GetListOfSockets()->Add(mgr);
}
}
Bool_t TProof::StartSlaves(Bool_t parallel, Bool_t attach)
{
if (IsMaster()) {
Int_t pc = 0;
TList *workerList = new TList;
if (gProofServ->GetWorkers(workerList, pc) == TProofServ::kQueryStop) {
Error("StartSlaves", "getting list of worker nodes");
return kFALSE;
}
fImage = gProofServ->GetImage();
UInt_t nSlaves = workerList->GetSize();
UInt_t nSlavesDone = 0;
Int_t ord = 0;
std::vector<TProofThread *> thrHandlers;
if (parallel) {
thrHandlers.reserve(nSlaves);
if (thrHandlers.max_size() < nSlaves) {
PDB(kGlobal,1)
Info("StartSlaves","cannot reserve enough space for thread"
" handlers - switch to serial startup");
parallel = kFALSE;
}
}
TListIter next(workerList);
TObject *to;
TProofNodeInfo *worker;
while ((to = next())) {
worker = (TProofNodeInfo *)to;
const Char_t *image = worker->GetImage().Data();
const Char_t *workdir = worker->GetWorkDir().Data();
Int_t perfidx = worker->GetPerfIndex();
Int_t sport = worker->GetPort();
if (sport == -1)
sport = fUrl.GetPort();
TString fullord = TString(gProofServ->GetOrdinal()) + "." + ((Long_t) ord);
if (parallel) {
TProofThreadArg *ta =
new TProofThreadArg(worker->GetNodeName().Data(), sport,
fullord, perfidx, image, workdir,
fSlaves, this);
if (ta) {
TThread *th = new TThread(SlaveStartupThread, ta);
if (!th) {
Info("StartSlaves","Can't create startup thread:"
" out of system resources");
SafeDelete(ta);
} else {
thrHandlers.push_back(new TProofThread(th, ta));
th->Run();
nSlavesDone++;
TMessage m(kPROOF_SERVERSTARTED);
m << TString("Opening connections to workers") << nSlaves
<< nSlavesDone << kTRUE;
gProofServ->GetSocket()->Send(m);
}
}
else {
Info("StartSlaves","Can't create thread arguments object:"
" out of system resources");
}
}
else {
TUrl u(Form("%s:%d",worker->GetNodeName().Data(), sport));
TSlave *slave = CreateSlave(u.GetUrl(), fullord, perfidx,
image, workdir);
Bool_t slaveOk = kTRUE;
if (slave->IsValid()) {
fSlaves->Add(slave);
} else {
slaveOk = kFALSE;
fBadSlaves->Add(slave);
}
PDB(kGlobal,3)
Info("StartSlaves", "worker on host %s created"
" and added to list", worker->GetNodeName().Data());
nSlavesDone++;
TMessage m(kPROOF_SERVERSTARTED);
m << TString("Opening connections to workers") << nSlaves
<< nSlavesDone << slaveOk;
gProofServ->GetSocket()->Send(m);
}
ord++;
}
SafeDelete(workerList);
nSlavesDone = 0;
if (parallel) {
std::vector<TProofThread *>::iterator i;
for (i = thrHandlers.begin(); i != thrHandlers.end(); ++i) {
TProofThread *pt = *i;
if (pt && pt->fThread->GetState() == TThread::kRunningState) {
PDB(kGlobal,3)
Info("Init",
"parallel startup: waiting for worker %s (%s:%d)",
pt->fArgs->fOrd.Data(), pt->fArgs->fUrl->GetHost(),
pt->fArgs->fUrl->GetPort());
pt->fThread->Join();
}
nSlavesDone++;
TMessage m(kPROOF_SERVERSTARTED);
m << TString("Setting up worker servers") << nSlaves
<< nSlavesDone << kTRUE;
gProofServ->GetSocket()->Send(m);
}
TIter next(fSlaves);
TSlave *sl = 0;
while ((sl = (TSlave *)next())) {
if (sl->IsValid())
fAllMonitor->Add(sl->GetSocket());
else
fBadSlaves->Add(sl);
}
while (!thrHandlers.empty()) {
i = thrHandlers.end()-1;
if (*i) {
SafeDelete(*i);
thrHandlers.erase(i);
}
}
} else {
TIter nxsl(fSlaves);
TSlave *sl = 0;
while ((sl = (TSlave *) nxsl())) {
if (sl->IsValid())
sl->SetupServ(TSlave::kSlave, 0);
Bool_t slaveOk = kTRUE;
if (sl->IsValid()) {
fAllMonitor->Add(sl->GetSocket());
} else {
slaveOk = kFALSE;
fBadSlaves->Add(sl);
}
nSlavesDone++;
TMessage m(kPROOF_SERVERSTARTED);
m << TString("Setting up worker servers") << nSlaves
<< nSlavesDone << slaveOk;
gProofServ->GetSocket()->Send(m);
}
}
} else {
fprintf(stderr,"Starting master: opening connection ... \n");
TSlave *slave = CreateSubmaster(fUrl.GetUrl(), "0", "master", 0);
if (slave->IsValid()) {
fprintf(stderr,"Starting master:"
" connection open: setting up server ... \r");
StartupMessage("Connection to master opened", kTRUE, 1, 1);
if (!attach) {
slave->SetInterruptHandler(kTRUE);
slave->SetupServ(TSlave::kMaster, fConfFile);
if (slave->IsValid()) {
fprintf(stderr,"Starting master: OK \n");
StartupMessage("Master started", kTRUE, 1, 1);
if (fProtocol == 1) {
Error("StartSlaves",
"client and remote protocols not compatible (%d and %d)",
kPROOF_Protocol, fProtocol);
slave->Close("S");
delete slave;
return kFALSE;
}
fSlaves->Add(slave);
fAllMonitor->Add(slave->GetSocket());
slave->SetInterruptHandler(kFALSE);
fIntHandler = new TProofInterruptHandler(this);
fIntHandler->Add();
Collect(slave);
Int_t slStatus = slave->GetStatus();
if (slStatus == -99 || slStatus == -98) {
fSlaves->Remove(slave);
fAllMonitor->Remove(slave->GetSocket());
if (slStatus == -99)
Error("StartSlaves", "not allowed to connect to PROOF master server");
else if (slStatus == -98)
Error("StartSlaves", "could not setup output redirection on master");
else
Error("StartSlaves", "setting up master");
slave->Close("S");
delete slave;
return 0;
}
if (!slave->IsValid()) {
fSlaves->Remove(slave);
fAllMonitor->Remove(slave->GetSocket());
slave->Close("S");
delete slave;
Error("StartSlaves",
"failed to setup connection with PROOF master server");
return kFALSE;
}
if (!gROOT->IsBatch()) {
if ((fProgressDialog =
gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
if (fProgressDialog->LoadPlugin() == -1)
fProgressDialog = 0;
}
} else {
fprintf(stderr,"Starting master: failure\n");
}
} else {
if (attach) {
fprintf(stderr,"Starting master: OK \n");
StartupMessage("Master attached", kTRUE, 1, 1);
if (!gROOT->IsBatch()) {
if ((fProgressDialog =
gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
if (fProgressDialog->LoadPlugin() == -1)
fProgressDialog = 0;
}
} else {
fprintf(stderr,"Starting manager: OK \n");
StartupMessage("Manager started", kTRUE, 1, 1);
}
fSlaves->Add(slave);
fAllMonitor->Add(slave->GetSocket());
fIntHandler = new TProofInterruptHandler(this);
fIntHandler->Add();
}
} else {
delete slave;
Error("StartSlaves", "failed to connect to a PROOF master server");
return kFALSE;
}
}
return kTRUE;
}
void TProof::Close(Option_t *opt)
{
if (fSlaves) {
if (fIntHandler) fIntHandler->Remove();
TIter nxs(fSlaves);
TSlave *sl = 0;
while ((sl = (TSlave *)nxs()))
sl->Close(opt);
fActiveSlaves->Clear("nodelete");
fUniqueSlaves->Clear("nodelete");
fAllUniqueSlaves->Clear("nodelete");
fNonUniqueMasters->Clear("nodelete");
fBadSlaves->Clear("nodelete");
fSlaves->Delete();
}
{
R__LOCKGUARD2(gROOTMutex);
gROOT->GetListOfSockets()->Remove(this);
if (IsProofd()) {
gROOT->GetListOfProofs()->Remove(this);
if (gProof && gProof == this) {
TIter pvp(gROOT->GetListOfProofs(), kIterBackward);
while ((gProof = (TProof *)pvp())) {
if (gProof->IsProofd())
break;
}
}
}
}
}
TSlave *TProof::CreateSlave(const char *url, const char *ord,
Int_t perf, const char *image, const char *workdir)
{
TSlave* sl = TSlave::Create(url, ord, perf, image,
this, TSlave::kSlave, workdir, 0);
if (sl->IsValid()) {
sl->SetInputHandler(new TProofInputHandler(this, sl->GetSocket()));
sl->fParallel = 1;
}
return sl;
}
TSlave *TProof::CreateSubmaster(const char *url, const char *ord,
const char *image, const char *msd)
{
TSlave *sl = TSlave::Create(url, ord, 100, image, this,
TSlave::kMaster, 0, msd);
if (sl->IsValid()) {
sl->SetInputHandler(new TProofInputHandler(this, sl->GetSocket()));
}
return sl;
}
TSlave *TProof::FindSlave(TSocket *s) const
{
TSlave *sl;
TIter next(fSlaves);
while ((sl = (TSlave *)next())) {
if (sl->IsValid() && sl->GetSocket() == s)
return sl;
}
return 0;
}
void TProof::FindUniqueSlaves()
{
fUniqueSlaves->Clear();
fUniqueMonitor->RemoveAll();
fAllUniqueSlaves->Clear();
fAllUniqueMonitor->RemoveAll();
fNonUniqueMasters->Clear();
TIter next(fActiveSlaves);
while (TSlave *sl = dynamic_cast<TSlave*>(next())) {
if (fImage == sl->fImage) {
if (sl->GetSlaveType() == TSlave::kMaster) {
fNonUniqueMasters->Add(sl);
fAllUniqueSlaves->Add(sl);
fAllUniqueMonitor->Add(sl->GetSocket());
}
continue;
}
TIter next2(fUniqueSlaves);
TSlave *replace_slave = 0;
Bool_t add = kTRUE;
while (TSlave *sl2 = dynamic_cast<TSlave*>(next2())) {
if (sl->fImage == sl2->fImage) {
add = kFALSE;
if (sl->GetSlaveType() == TSlave::kMaster) {
if (sl2->GetSlaveType() == TSlave::kSlave) {
replace_slave = sl2;
add = kTRUE;
} else if (sl2->GetSlaveType() == TSlave::kMaster) {
fNonUniqueMasters->Add(sl);
fAllUniqueSlaves->Add(sl);
fAllUniqueMonitor->Add(sl->GetSocket());
} else {
Error("FindUniqueSlaves", "TSlave is neither Master nor Slave");
R__ASSERT(0);
}
}
break;
}
}
if (add) {
fUniqueSlaves->Add(sl);
fAllUniqueSlaves->Add(sl);
fUniqueMonitor->Add(sl->GetSocket());
fAllUniqueMonitor->Add(sl->GetSocket());
if (replace_slave) {
fUniqueSlaves->Remove(replace_slave);
fAllUniqueSlaves->Remove(replace_slave);
fUniqueMonitor->Remove(replace_slave->GetSocket());
fAllUniqueMonitor->Remove(replace_slave->GetSocket());
}
}
}
fUniqueMonitor->DeActivateAll();
fAllUniqueMonitor->DeActivateAll();
}
Int_t TProof::GetNumberOfSlaves() const
{
return fSlaves->GetSize();
}
Int_t TProof::GetNumberOfActiveSlaves() const
{
return fActiveSlaves->GetSize();
}
Int_t TProof::GetNumberOfInactiveSlaves() const
{
return fInactiveSlaves->GetSize();
}
Int_t TProof::GetNumberOfUniqueSlaves() const
{
return fUniqueSlaves->GetSize();
}
Int_t TProof::GetNumberOfBadSlaves() const
{
return fBadSlaves->GetSize();
}
void TProof::AskStatistics()
{
if (!IsValid()) return;
Broadcast(kPROOF_GETSTATS, kActive);
Collect(kActive);
}
void TProof::AskParallel()
{
if (!IsValid()) return;
Broadcast(kPROOF_GETPARALLEL, kActive);
Collect(kActive);
}
TList *TProof::GetListOfQueries(Option_t *opt)
{
if (!IsValid() || IsMaster()) return (TList *)0;
Bool_t all = ((strchr(opt,'A') || strchr(opt,'a'))) ? kTRUE : kFALSE;
TMessage m(kPROOF_QUERYLIST);
m << all;
Broadcast(m, kActive);
Collect(kActive);
return fQueries;
}
Int_t TProof::GetNumberOfQueries()
{
if (fQueries)
return fQueries->GetSize() - fOtherQueries;
return 0;
}
void TProof::SetMaxDrawQueries(Int_t max)
{
if (max > 0) {
if (fPlayer)
fPlayer->SetMaxDrawQueries(max);
fMaxDrawQueries = max;
}
}
void TProof::GetMaxQueries()
{
TMessage m(kPROOF_MAXQUERIES);
m << kFALSE;
Broadcast(m, kActive);
Collect(kActive);
}
TList *TProof::GetQueryResults()
{
return fPlayer->GetListOfResults();
}
TQueryResult *TProof::GetQueryResult(const char *ref)
{
return fPlayer->GetQueryResult(ref);
}
void TProof::ShowQueries(Option_t *opt)
{
Bool_t help = ((strchr(opt,'H') || strchr(opt,'h'))) ? kTRUE : kFALSE;
if (help) {
Printf("+++");
Printf("+++ Options: \"A\" show all queries known to server");
Printf("+++ \"L\" show retrieved queries");
Printf("+++ \"F\" full listing of query info");
Printf("+++ \"H\" print this menu");
Printf("+++");
Printf("+++ (case insensitive)");
Printf("+++");
Printf("+++ Use Retrieve(<#>) to retrieve the full"
" query results from the master");
Printf("+++ e.g. Retrieve(8)");
Printf("+++");
return;
}
if (!IsValid()) return;
Bool_t local = ((strchr(opt,'L') || strchr(opt,'l'))) ? kTRUE : kFALSE;
TObject *pq = 0;
if (!local) {
GetListOfQueries(opt);
if (!fQueries) return;
TIter nxq(fQueries);
if (fOtherQueries > 0) {
Printf("+++");
Printf("+++ Queries processed during other sessions: %d", fOtherQueries);
Int_t nq = 0;
while (nq++ < fOtherQueries && (pq = nxq()))
pq->Print(opt);
}
Printf("+++");
Printf("+++ Queries processed during this session: selector: %d, draw: %d",
GetNumberOfQueries(), fDrawQueries);
while ((pq = nxq()))
pq->Print(opt);
} else {
Printf("+++");
Printf("+++ Queries processed during this session: selector: %d, draw: %d",
GetNumberOfQueries(), fDrawQueries);
TList *listlocal = fPlayer->GetListOfResults();
if (listlocal) {
Printf("+++");
Printf("+++ Queries available locally: %d", listlocal->GetSize());
TIter nxlq(listlocal);
while ((pq = nxlq()))
pq->Print(opt);
}
}
Printf("+++");
}
Bool_t TProof::IsDataReady(Long64_t &totalbytes, Long64_t &bytesready)
{
if (!IsValid()) return kFALSE;
TList submasters;
TIter nextSlave(GetListOfActiveSlaves());
while (TSlave *sl = dynamic_cast<TSlave*>(nextSlave())) {
if (sl->GetSlaveType() == TSlave::kMaster) {
submasters.Add(sl);
}
}
fDataReady = kTRUE;
fBytesReady = 0;
fTotalBytes = 0;
if (submasters.GetSize() > 0) {
Broadcast(kPROOF_DATA_READY, &submasters);
Collect(&submasters);
}
bytesready = fBytesReady;
totalbytes = fTotalBytes;
EmitVA("IsDataReady(Long64_t,Long64_t)", 2, totalbytes, bytesready);
Info("IsDataReady", "%lld / %lld (%s)",
bytesready, totalbytes, fDataReady?"READY":"NOT READY");
return fDataReady;
}
void TProof::Interrupt(EUrgent type, ESlaves list)
{
if (!IsValid()) return;
TList *slaves = 0;
if (list == kAll) slaves = fSlaves;
if (list == kActive) slaves = fActiveSlaves;
if (list == kUnique) slaves = fUniqueSlaves;
if (list == kAllUnique) slaves = fAllUniqueSlaves;
if (slaves->GetSize() == 0) return;
TSlave *sl;
TIter next(slaves);
while ((sl = (TSlave *)next())) {
if (sl->IsValid()) {
sl->Interrupt((Int_t)type);
}
}
}
Int_t TProof::GetParallel() const
{
if (!IsValid()) return -1;
TIter nextSlave(GetListOfActiveSlaves());
Int_t nparallel = 0;
while (TSlave* sl = dynamic_cast<TSlave*>(nextSlave()))
if (sl->GetParallel() >= 0)
nparallel += sl->GetParallel();
return nparallel;
}
TList *TProof::GetSlaveInfo()
{
if (!IsValid()) return 0;
if (fSlaveInfo == 0) {
fSlaveInfo = new TSortedList(kSortDescending);
fSlaveInfo->SetOwner();
} else {
fSlaveInfo->Delete();
}
TList masters;
TIter next(GetListOfSlaves());
TSlave *slave;
while((slave = (TSlave *) next()) != 0) {
if (slave->GetSlaveType() == TSlave::kSlave) {
TSlaveInfo *slaveinfo = new TSlaveInfo(slave->GetOrdinal(),
slave->GetName(),
slave->GetPerfIdx());
fSlaveInfo->Add(slaveinfo);
TIter nextactive(GetListOfActiveSlaves());
TSlave *activeslave;
while ((activeslave = (TSlave *) nextactive())) {
if (TString(slaveinfo->GetOrdinal()) == activeslave->GetOrdinal()) {
slaveinfo->SetStatus(TSlaveInfo::kActive);
break;
}
}
TIter nextbad(GetListOfBadSlaves());
TSlave *badslave;
while ((badslave = (TSlave *) nextbad())) {
if (TString(slaveinfo->GetOrdinal()) == badslave->GetOrdinal()) {
slaveinfo->SetStatus(TSlaveInfo::kBad);
break;
}
}
} else if (slave->GetSlaveType() == TSlave::kMaster) {
if (slave->IsValid()) {
if (slave->GetSocket()->Send(kPROOF_GETSLAVEINFO) == -1)
MarkBad(slave);
else
masters.Add(slave);
}
} else {
Error("GetSlaveInfo", "TSlave is neither Master nor Slave");
R__ASSERT(0);
}
}
if (masters.GetSize() > 0) Collect(&masters);
return fSlaveInfo;
}
void TProof::Activate(TList *slaves)
{
TMonitor *mon = fAllMonitor;
mon->DeActivateAll();
slaves = !slaves ? fActiveSlaves : slaves;
TIter next(slaves);
TSlave *sl;
while ((sl = (TSlave*) next())) {
if (sl->IsValid())
mon->Activate(sl->GetSocket());
}
}
Int_t TProof::Broadcast(const TMessage &mess, TList *slaves)
{
if (!IsValid()) return -1;
if (slaves->GetSize() == 0) return 0;
int nsent = 0;
TIter next(slaves);
TSlave *sl;
while ((sl = (TSlave *)next())) {
if (sl->IsValid()) {
if (sl->GetSocket()->Send(mess) == -1)
MarkBad(sl);
else
nsent++;
}
}
return nsent;
}
Int_t TProof::Broadcast(const TMessage &mess, ESlaves list)
{
TList *slaves = 0;
if (list == kAll) slaves = fSlaves;
if (list == kActive) slaves = fActiveSlaves;
if (list == kUnique) slaves = fUniqueSlaves;
if (list == kAllUnique) slaves = fAllUniqueSlaves;
return Broadcast(mess, slaves);
}
Int_t TProof::Broadcast(const char *str, Int_t kind, TList *slaves)
{
TMessage mess(kind);
if (str) mess.WriteString(str);
return Broadcast(mess, slaves);
}
Int_t TProof::Broadcast(const char *str, Int_t kind, ESlaves list)
{
TMessage mess(kind);
if (str) mess.WriteString(str);
return Broadcast(mess, list);
}
Int_t TProof::BroadcastObject(const TObject *obj, Int_t kind, TList *slaves)
{
TMessage mess(kind);
mess.WriteObject(obj);
return Broadcast(mess, slaves);
}
Int_t TProof::BroadcastObject(const TObject *obj, Int_t kind, ESlaves list)
{
TMessage mess(kind);
mess.WriteObject(obj);
return Broadcast(mess, list);
}
Int_t TProof::BroadcastRaw(const void *buffer, Int_t length, TList *slaves)
{
if (!IsValid()) return -1;
if (slaves->GetSize() == 0) return 0;
int nsent = 0;
TIter next(slaves);
TSlave *sl;
while ((sl = (TSlave *)next())) {
if (sl->IsValid()) {
if (sl->GetSocket()->SendRaw(buffer, length) == -1)
MarkBad(sl);
else
nsent++;
}
}
return nsent;
}
Int_t TProof::BroadcastRaw(const void *buffer, Int_t length, ESlaves list)
{
TList *slaves = 0;
if (list == kAll) slaves = fSlaves;
if (list == kActive) slaves = fActiveSlaves;
if (list == kUnique) slaves = fUniqueSlaves;
if (list == kAllUnique) slaves = fAllUniqueSlaves;
return BroadcastRaw(buffer, length, slaves);
}
Int_t TProof::Collect(const TSlave *sl, Long_t timeout)
{
if (!sl->IsValid()) return 0;
TMonitor *mon = fAllMonitor;
mon->DeActivateAll();
mon->Activate(sl->GetSocket());
return Collect(mon, timeout);
}
Int_t TProof::Collect(TList *slaves, Long_t timeout)
{
TMonitor *mon = fAllMonitor;
mon->DeActivateAll();
TIter next(slaves);
TSlave *sl;
while ((sl = (TSlave*) next())) {
if (sl->IsValid())
mon->Activate(sl->GetSocket());
}
return Collect(mon, timeout);
}
Int_t TProof::Collect(ESlaves list, Long_t timeout)
{
TMonitor *mon = 0;
if (list == kAll) mon = fAllMonitor;
if (list == kActive) mon = fActiveMonitor;
if (list == kUnique) mon = fUniqueMonitor;
if (list == kAllUnique) mon = fAllUniqueMonitor;
mon->ActivateAll();
return Collect(mon, timeout);
}
Int_t TProof::Collect(TMonitor *mon, Long_t timeout)
{
fStatus = 0;
if (!mon->GetActive()) return 0;
DeActivateAsyncInput();
fCurrentMonitor = mon;
Bool_t saveRedirLog = fRedirLog;
if (!IsIdle() && !IsSync())
fRedirLog = kFALSE;
int cnt = 0, rc = 0;
fBytesRead = 0;
fRealTime = 0.0;
fCpuTime = 0.0;
Long_t nto = timeout;
if (gDebug > 2)
Info("Collect","active: %d", mon->GetActive());
while (mon->GetActive() && (nto < 0 || nto > 0)) {
TSocket *s = mon->Select(1000);
if (s && s != (TSocket *)(-1)) {
if ((rc = CollectInputFrom(s)) == 1) {
mon->DeActivate(s);
if (gDebug > 2)
Info("Collect","deactivating %p (active: %d, %p)",
s, mon->GetActive(),
mon->GetListOfActives()->First());
}
if (rc >= 0)
cnt++;
} else {
if (!s)
if (fPlayer && (fPlayer->GetExitStatus() == TProofPlayer::kFinished))
mon->DeActivateAll();
if (s == (TSocket *)(-1) && nto > 0)
nto--;
}
}
if (nto == 0)
mon->DeActivateAll();
SendGroupView();
fRedirLog = saveRedirLog;
fCurrentMonitor = 0;
ActivateAsyncInput();
return cnt;
}
void TProof::CleanGDirectory(TList *ol)
{
if (ol) {
TIter nxo(ol);
TObject *o = 0;
while ((o = nxo()))
gDirectory->RecursiveRemove(o);
}
}
Int_t TProof::CollectInputFrom(TSocket *s)
{
TMessage *mess;
Int_t rc = 0;
char str[512];
TSlave *sl;
TObject *obj;
Int_t what;
Bool_t delete_mess = kTRUE;
if (s->Recv(mess) < 0) {
MarkBad(s);
return -1;
}
if (!mess) {
MarkBad(s);
return -1;
}
what = mess->What();
PDB(kGlobal,3) {
sl = FindSlave(s);
Info("CollectInputFrom","got %d from %s", what, sl->GetOrdinal());
}
switch (what) {
case kMESS_OBJECT:
obj = mess->ReadObject(mess->GetClass());
if (obj->InheritsFrom(TH1::Class())) {
TH1 *h = (TH1*)obj;
h->SetDirectory(0);
TH1 *horg = (TH1*)gDirectory->GetList()->FindObject(h->GetName());
if (horg)
horg->Add(h);
else
h->SetDirectory(gDirectory);
}
break;
case kPROOF_FATAL:
MarkBad(s);
break;
case kPROOF_GETOBJECT:
mess->ReadString(str, sizeof(str));
obj = gDirectory->Get(str);
if (obj)
s->SendObject(obj);
else
s->Send(kMESS_NOTOK);
break;
case kPROOF_GETPACKET:
{
TDSetElement *elem = 0;
sl = FindSlave(s);
elem = fPlayer->GetNextPacket(sl, mess);
if (elem != (TDSetElement*) -1) {
TMessage answ(kPROOF_GETPACKET);
answ << elem;
s->Send(answ);
while (fWaitingSlaves != 0 && fWaitingSlaves->GetSize()) {
TPair *p = (TPair*) fWaitingSlaves->First();
s = (TSocket*) p->Key();
sl = FindSlave(s);
TMessage *m = (TMessage*) p->Value();
elem = fPlayer->GetNextPacket(sl, m);
if (elem != (TDSetElement*) -1) {
TMessage a(kPROOF_GETPACKET);
a << elem;
s->Send(a);
fWaitingSlaves->Remove(fWaitingSlaves->FirstLink());
delete p;
delete m;
} else {
break;
}
}
} else {
if (fWaitingSlaves == 0) fWaitingSlaves = new TList;
fWaitingSlaves->Add(new TPair(s, mess));
delete_mess = kFALSE;
}
}
break;
case kPROOF_LOGFILE:
{
Int_t size;
(*mess) >> size;
RecvLogFile(s, size);
}
break;
case kPROOF_LOGDONE:
sl = FindSlave(s);
(*mess) >> sl->fStatus >> sl->fParallel;
PDB(kGlobal,2)
Info("CollectInputFrom","kPROOF_LOGDONE:%s: status %d parallel %d",
sl->GetOrdinal(), sl->fStatus, sl->fParallel);
if (sl->fStatus != 0) fStatus = sl->fStatus;
rc = 1;
break;
case kPROOF_GETSTATS:
sl = FindSlave(s);
(*mess) >> sl->fBytesRead >> sl->fRealTime >> sl->fCpuTime
>> sl->fWorkDir >> sl->fProofWorkDir;
fBytesRead += sl->fBytesRead;
fRealTime += sl->fRealTime;
fCpuTime += sl->fCpuTime;
rc = 1;
break;
case kPROOF_GETPARALLEL:
sl = FindSlave(s);
(*mess) >> sl->fParallel;
rc = 1;
break;
case kPROOF_PACKAGE_LIST:
{
PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_PACKAGE_LIST: enter");
Int_t type = 0;
(*mess) >> type;
switch (type) {
case TProof::kListEnabledPackages:
SafeDelete(fEnabledPackages);
fEnabledPackages = (TList *) mess->ReadObject(TList::Class());
fEnabledPackages->SetOwner();
break;
case TProof::kListPackages:
SafeDelete(fAvailablePackages);
fAvailablePackages = (TList *) mess->ReadObject(TList::Class());
fAvailablePackages->SetOwner();
break;
default:
Info("CollectInputFrom","kPROOF_PACKAGE_LIST: unknown type: %d", type);
}
}
break;
case kPROOF_OUTPUTOBJECT:
{
PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_OUTPUTOBJECT: enter");
Int_t type = 0;
(*mess) >> type;
if (type == 0) {
TQueryResult *pq =
(TQueryResult *) mess->ReadObject(TQueryResult::Class());
if (pq) {
fPlayer->AddQueryResult(pq);
fPlayer->SetCurrentQuery(pq);
fPlayer->AddInput(new TNamed("PROOF_QueryTag",
Form("%s:%s",pq->GetTitle(),pq->GetName())));
} else {
Warning("CollectInputFrom","kPROOF_OUTPUTOBJECT: query result missing");
}
} else if (type > 0) {
TObject *obj = mess->ReadObject(TObject::Class());
if ((fPlayer->AddOutputObject(obj) == 1))
SafeDelete(obj);
if (type > 1 && !IsMaster()) {
TQueryResult *pq = fPlayer->GetCurrentQuery();
pq->SetOutputList(fPlayer->GetOutputList(), kFALSE);
pq->SetInputList(fPlayer->GetInputList(), kFALSE);
QueryResultReady(Form("%s:%s", pq->GetTitle(), pq->GetName()));
}
}
}
break;
case kPROOF_OUTPUTLIST:
{
PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_OUTPUTLIST: enter");
TList *out = 0;
if (IsMaster() || fProtocol < 7) {
out = (TList *) mess->ReadObject(TList::Class());
} else {
TQueryResult *pq =
(TQueryResult *) mess->ReadObject(TQueryResult::Class());
if (pq) {
fPlayer->AddQueryResult(pq);
fPlayer->SetCurrentQuery(pq);
out = pq->GetOutputList();
CleanGDirectory(out);
out = (TList *) out->Clone();
QueryResultReady(Form("%s:%s", pq->GetTitle(), pq->GetName()));
} else {
PDB(kGlobal,2)
Info("CollectInputFrom","kPROOF_OUTPUTLIST: query result missing");
}
}
if (out) {
out->SetOwner();
fPlayer->AddOutput(out);
SafeDelete(out);
} else {
PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_OUTPUTLIST: ouputlist is empty");
}
if (!IsMaster()) {
if (fPlayer->GetExitStatus() == TProofPlayer::kAborted) {
if (fSync)
Info("CollectInputFrom",
"the processing was aborted - %lld events processed",
fPlayer->GetEventsProcessed());
if (GetRemoteProtocol() > 11) {
Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1.);
} else {
Progress(-1, fPlayer->GetEventsProcessed());
}
Emit("StopProcess(Bool_t)", kTRUE);
}
if (fPlayer->GetExitStatus() == TProofPlayer::kStopped) {
if (fSync)
Info("CollectInputFrom",
"the processing was stopped - %lld events processed",
fPlayer->GetEventsProcessed());
if (GetRemoteProtocol() > 11) {
Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1.);
} else {
Progress(-1, fPlayer->GetEventsProcessed());
}
Emit("StopProcess(Bool_t)", kFALSE);
}
if (GetRemoteProtocol() > 11) {
EmitVA("Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t,)",
7, (Long64_t)(-1), (Long64_t)(-1), (Long64_t)(-1),
(Float_t)(-1.),(Float_t)(-1.),(Float_t)(-1.),(Float_t)(-1.));
} else {
EmitVA("Progress(Long64_t,Long64_t)", 2, (Long64_t)(-1), (Long64_t)(-1));
}
}
}
break;
case kPROOF_QUERYLIST:
{
PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_QUERYLIST: enter");
(*mess) >> fOtherQueries >> fDrawQueries;
if (fQueries) {
fQueries->Delete();
delete fQueries;
fQueries = 0;
}
fQueries = (TList *) mess->ReadObject(TList::Class());
}
break;
case kPROOF_RETRIEVE:
{
PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_RETRIEVE: enter");
TQueryResult *pq =
(TQueryResult *) mess->ReadObject(TQueryResult::Class());
if (pq) {
fPlayer->AddQueryResult(pq);
QueryResultReady(Form("%s:%s", pq->GetTitle(), pq->GetName()));
} else {
PDB(kGlobal,2)
Info("CollectInputFrom","kPROOF_RETRIEVE: query result missing");
}
}
break;
case kPROOF_MAXQUERIES:
{
PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_MAXQUERIES: enter");
Int_t max = 0;
(*mess) >> max;
Printf("Number of queries fully kept remotely: %d", max);
}
break;
case kPROOF_SERVERSTARTED:
{
PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_SERVERSTARTED: enter");
UInt_t tot = 0, done = 0;
TString action;
Bool_t st = kTRUE;
(*mess) >> action >> tot >> done >> st;
if (!IsMaster()) {
if (tot) {
TString type = (action.Contains("submas")) ? "submasters"
: "workers";
Int_t frac = (Int_t) (done*100.)/tot;
if (frac >= 100) {
fprintf(stderr,"%s: OK (%d %s) \n",
action.Data(),tot, type.Data());
} else {
fprintf(stderr,"%s: %d out of %d (%d %%)\r",
action.Data(), done, tot, frac);
}
}
StartupMessage(action.Data(), st, (Int_t)done, (Int_t)tot);
} else {
TMessage m(kPROOF_SERVERSTARTED);
m << action << tot << done << st;
gProofServ->GetSocket()->Send(m);
}
}
break;
case kPROOF_DATASET_STATUS:
{
PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_DATASET_STATUS: enter");
UInt_t tot = 0, done = 0;
TString action;
Bool_t st = kTRUE;
(*mess) >> action >> tot >> done >> st;
if (!IsMaster()) {
if (tot) {
TString type = "files";
Int_t frac = (Int_t) (done*100.)/tot;
if (frac >= 100) {
fprintf(stderr,"%s: OK (%d %s) \n",
action.Data(),tot, type.Data());
} else {
fprintf(stderr,"%s: %d out of %d (%d %%)\r",
action.Data(), done, tot, frac);
}
}
DataSetStatus(action.Data(), st, (Int_t)done, (Int_t)tot);
} else {
TMessage m(kPROOF_DATASET_STATUS);
m << action << tot << done << st;
gProofServ->GetSocket()->Send(m);
}
}
break;
case kPROOF_STARTPROCESS:
{
PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_STARTPROCESS: enter");
fIdle = kFALSE;
TString selec;
Int_t dsz = -1;
Long64_t first = -1, nent = -1;
(*mess) >> selec >> dsz >> first >> nent;
if (fProgressDialog && !TestBit(kUsingSessionGui)) {
if (!fProgressDialogStarted) {
fProgressDialog->ExecPlugin(5, this,
selec.Data(), dsz, first, nent);
fProgressDialogStarted = kTRUE;
} else {
ResetProgressDialog(selec, dsz, first, nent);
}
}
ResetBit(kUsingSessionGui);
}
break;
case kPROOF_SETIDLE:
{
PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_SETIDLE: enter");
fIdle = kTRUE;
}
break;
case kPROOF_QUERYSUBMITTED:
{
PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_QUERYSUBMITTED: enter");
(*mess) >> fSeqNum;
rc = 1;
}
break;
case kPROOF_SESSIONTAG:
{
PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_SESSIONTAG: enter");
TString stag;
(*mess) >> stag;
SetName(stag);
}
break;
case kPROOF_FEEDBACK:
{
PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_FEEDBACK: enter");
TList *out = (TList *) mess->ReadObject(TList::Class());
out->SetOwner();
sl = FindSlave(s);
if (fPlayer)
fPlayer->StoreFeedback(sl, out);
else
rc = 1;
}
break;
case kPROOF_AUTOBIN:
{
TString name;
Double_t xmin, xmax, ymin, ymax, zmin, zmax;
(*mess) >> name >> xmin >> xmax >> ymin >> ymax >> zmin >> zmax;
fPlayer->UpdateAutoBin(name,xmin,xmax,ymin,ymax,zmin,zmax);
TMessage answ(kPROOF_AUTOBIN);
answ << name << xmin << xmax << ymin << ymax << zmin << zmax;
s->Send(answ);
}
break;
case kPROOF_PROGRESS:
{
PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_PROGRESS: enter");
sl = FindSlave(s);
if (GetRemoteProtocol() > 11) {
Long64_t total, processed, bytesread;
Float_t initTime, procTime, evtrti, mbrti;
(*mess) >> total >> processed >> bytesread
>> initTime >> procTime
>> evtrti >> mbrti;
fPlayer->Progress(total, processed, bytesread,
initTime, procTime, evtrti, mbrti);
} else {
Long64_t total, processed;
(*mess) >> total >> processed;
fPlayer->Progress(sl, total, processed);
}
}
break;
case kPROOF_STOPPROCESS:
{
Long64_t events;
Bool_t abort = kFALSE;
if ((mess->BufferSize() > mess->Length()) && (fProtocol > 8))
(*mess) >> events >> abort;
else
(*mess) >> events;
fPlayer->AddEventsProcessed(events);
if (!IsMaster())
Emit("StopProcess(Bool_t)", abort);
break;
}
case kPROOF_GETSLAVEINFO:
{
PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_GETSLAVEINFO: enter");
sl = FindSlave(s);
Bool_t active = (GetListOfActiveSlaves()->FindObject(sl) != 0);
Bool_t bad = (GetListOfBadSlaves()->FindObject(sl) != 0);
TList* tmpinfo = 0;
(*mess) >> tmpinfo;
tmpinfo->SetOwner(kFALSE);
Int_t nentries = tmpinfo->GetSize();
for (Int_t i=0; i<nentries; i++) {
TSlaveInfo* slinfo =
dynamic_cast<TSlaveInfo*>(tmpinfo->At(i));
if (slinfo) {
fSlaveInfo->Add(slinfo);
if (slinfo->fStatus != TSlaveInfo::kBad) {
if (!active) slinfo->SetStatus(TSlaveInfo::kNotActive);
if (bad) slinfo->SetStatus(TSlaveInfo::kBad);
}
if (!sl->GetMsd().IsNull()) slinfo->fMsd = sl->GetMsd();
}
}
delete tmpinfo;
rc = 1;
}
break;
case kPROOF_VALIDATE_DSET:
{
PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_VALIDATE_DSET: enter");
TDSet* dset = 0;
(*mess) >> dset;
if (!fDSet)
Error("CollectInputFrom","kPROOF_VALIDATE_DSET: fDSet not set");
else
fDSet->Validate(dset);
delete dset;
}
break;
case kPROOF_DATA_READY:
{
PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_DATA_READY: enter");
Bool_t dataready = kFALSE;
Long64_t totalbytes, bytesready;
(*mess) >> dataready >> totalbytes >> bytesready;
fTotalBytes += totalbytes;
fBytesReady += bytesready;
if (dataready == kFALSE) fDataReady = dataready;
}
break;
case kPROOF_PING:
break;
case kPROOF_MESSAGE:
{
PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_MESSAGE: enter");
TString msg;
(*mess) >> msg;
Bool_t lfeed = kTRUE;
if ((mess->BufferSize() > mess->Length()))
(*mess) >> lfeed;
if (!IsMaster()) {
if (lfeed) {
fprintf(stderr, "%s\n", msg.Data());
} else {
fprintf(stderr, "%s\r", msg.Data());
}
} else {
TMessage m(kPROOF_MESSAGE);
m << msg;
gProofServ->GetSocket()->Send(m);
}
}
break;
default:
Error("Collect", "unknown command received from slave (what = %d)", what);
break;
}
if (delete_mess)
delete mess;
return rc;
}
void TProof::ActivateAsyncInput()
{
TIter next(fSlaves);
TSlave *sl;
while ((sl = (TSlave*) next()))
if (sl->GetInputHandler())
sl->GetInputHandler()->Add();
}
void TProof::DeActivateAsyncInput()
{
TIter next(fSlaves);
TSlave *sl;
while ((sl = (TSlave*) next()))
if (sl->GetInputHandler())
sl->GetInputHandler()->Remove();
}
void TProof::HandleAsyncInput(TSocket *sl)
{
TMessage *mess;
Int_t what;
if (sl->Recv(mess) <= 0)
return;
what = mess->What();
switch (what) {
case kPROOF_PING:
break;
default:
Error("HandleAsyncInput", "unknown command (what = %d)", what);
break;
}
delete mess;
}
void TProof::MarkBad(TSlave *sl)
{
fActiveSlaves->Remove(sl);
FindUniqueSlaves();
fBadSlaves->Add(sl);
fAllMonitor->Remove(sl->GetSocket());
fActiveMonitor->Remove(sl->GetSocket());
sl->Close();
fSendGroupView = kTRUE;
SaveWorkerInfo();
}
void TProof::MarkBad(TSocket *s)
{
TSlave *sl = FindSlave(s);
MarkBad(sl);
}
Int_t TProof::Ping()
{
return Ping(kActive);
}
Int_t TProof::Ping(ESlaves list)
{
TList *slaves = 0;
if (list == kAll) slaves = fSlaves;
if (list == kActive) slaves = fActiveSlaves;
if (list == kUnique) slaves = fUniqueSlaves;
if (list == kAllUnique) slaves = fAllUniqueSlaves;
if (slaves->GetSize() == 0) return 0;
int nsent = 0;
TIter next(slaves);
TSlave *sl;
while ((sl = (TSlave *)next())) {
if (sl->IsValid()) {
if (sl->Ping() == -1)
MarkBad(sl);
else
nsent++;
}
}
return nsent;
}
void TProof::Print(Option_t *option) const
{
TString secCont;
if (!IsMaster()) {
Printf("Connected to: %s (%s)", GetMaster(),
IsValid() ? "valid" : "invalid");
Printf("Port number: %d", GetPort());
Printf("User: %s", GetUser());
TSlave *sl = (TSlave *)fActiveSlaves->First();
if (sl) {
TString sc;
if (sl->GetSocket()->GetSecContext())
Printf("Security context: %s",
sl->GetSocket()->GetSecContext()->AsString(sc));
Printf("Proofd protocol version: %d", sl->GetSocket()->GetRemoteProtocol());
} else {
Printf("Security context: Error - No connection");
Printf("Proofd protocol version: Error - No connection");
}
Printf("Client protocol version: %d", GetClientProtocol());
Printf("Remote protocol version: %d", GetRemoteProtocol());
Printf("Log level: %d", GetLogLevel());
Printf("Session unique tag: %s", IsValid() ? GetSessionTag() : "");
Printf("Default data pool: %s", IsValid() ? GetDataPoolUrl() : "");
if (IsValid())
const_cast<TProof*>(this)->SendPrint(option);
} else {
const_cast<TProof*>(this)->AskStatistics();
if (IsParallel())
Printf("*** Master server %s (parallel mode, %d slaves):",
gProofServ->GetOrdinal(), GetParallel());
else
Printf("*** Master server %s (sequential mode):",
gProofServ->GetOrdinal());
Printf("Master host name: %s", gSystem->HostName());
Printf("Port number: %d", GetPort());
Printf("User: %s", GetUser());
Printf("Protocol version: %d", GetClientProtocol());
Printf("Image name: %s", GetImage());
Printf("Working directory: %s", gSystem->WorkingDirectory());
Printf("Config directory: %s", GetConfDir());
Printf("Config file: %s", GetConfFile());
Printf("Log level: %d", GetLogLevel());
Printf("Number of workers: %d", GetNumberOfSlaves());
Printf("Number of active workers: %d", GetNumberOfActiveSlaves());
Printf("Number of unique workers: %d", GetNumberOfUniqueSlaves());
Printf("Number of inactive workers: %d", GetNumberOfInactiveSlaves());
Printf("Number of bad workers: %d", GetNumberOfBadSlaves());
Printf("Total MB's processed: %.2f", float(GetBytesRead())/(1024*1024));
Printf("Total real time used (s): %.3f", GetRealTime());
Printf("Total CPU time used (s): %.3f", GetCpuTime());
if (TString(option).Contains("a", TString::kIgnoreCase) && GetNumberOfSlaves()) {
Printf("List of workers:");
TList masters;
TIter nextslave(fSlaves);
while (TSlave* sl = dynamic_cast<TSlave*>(nextslave())) {
if (!sl->IsValid()) continue;
if (sl->GetSlaveType() == TSlave::kSlave) {
sl->Print(option);
} else if (sl->GetSlaveType() == TSlave::kMaster) {
TMessage mess(kPROOF_PRINT);
mess.WriteString(option);
if (sl->GetSocket()->Send(mess) == -1)
const_cast<TProof*>(this)->MarkBad(sl);
else
masters.Add(sl);
} else {
Error("Print", "TSlave is neither Master nor Worker");
R__ASSERT(0);
}
}
const_cast<TProof*>(this)->Collect(&masters);
}
}
}
Long64_t TProof::Process(TDSet *dset, const char *selector, Option_t *option,
Long64_t nentries, Long64_t first, TEventList *evl)
{
if (!IsValid()) return -1;
fSync = (GetQueryMode(option) == kSync);
if (fSync && !IsIdle()) {
Info("Process","not idle, cannot submit synchronous query");
return -1;
}
TSignalHandler *sh = 0;
if (fSync) {
if (gApplication)
sh = gSystem->RemoveSignalHandler(gApplication->GetSignalHandler());
}
Long64_t rv = fPlayer->Process(dset, selector, option, nentries, first, evl);
if (fSync) {
if (sh)
gSystem->AddSignalHandler(sh);
}
return rv;
}
Int_t TProof::GetQueryReference(Int_t qry, TString &ref)
{
ref = "";
if (qry > 0) {
if (!fQueries)
GetListOfQueries();
if (fQueries) {
TIter nxq(fQueries);
TQueryResult *qr = 0;
while ((qr = (TQueryResult *) nxq()))
if (qr->GetSeqNum() == qry) {
ref = Form("%s:%s", qr->GetTitle(), qr->GetName());
return 0;
}
}
}
return -1;
}
Long64_t TProof::Finalize(Int_t qry, Bool_t force)
{
if (fPlayer) {
if (qry > 0) {
TString ref;
if (GetQueryReference(qry, ref) == 0) {
return Finalize(ref, force);
} else {
Info("Finalize", "query #%d not found", qry);
}
} else {
return fPlayer->Finalize(force);
}
}
return -1;
}
Long64_t TProof::Finalize(const char *ref, Bool_t force)
{
if (fPlayer) {
if (ref) {
TQueryResult *qr = fPlayer->GetQueryResult(ref);
Bool_t retrieve = kFALSE;
if (!qr) {
retrieve = kTRUE;
} else {
if (qr->IsFinalized()) {
if (force) {
retrieve = kTRUE;
} else {
Info("Finalize","query already finalized:"
" use Finalize(<qry>,kTRUE) to force new retrieval");
qr = 0;
}
}
}
if (retrieve) {
Retrieve(ref);
qr = fPlayer->GetQueryResult(ref);
}
if (qr)
return fPlayer->Finalize(qr);
}
}
return -1;
}
Int_t TProof::Retrieve(Int_t qry, const char *path)
{
if (qry > 0) {
TString ref;
if (GetQueryReference(qry, ref) == 0)
return Retrieve(ref, path);
else
Info("Retrieve", "query #%d not found", qry);
} else {
Info("Retrieve","positive argument required - do nothing");
}
return -1;
}
Int_t TProof::Retrieve(const char *ref, const char *path)
{
if (ref) {
TMessage m(kPROOF_RETRIEVE);
m << TString(ref);
Broadcast(m, kActive);
Collect(kActive);
if (path) {
TQueryResult *qr = fPlayer ? fPlayer->GetQueryResult(ref) : 0;
if (qr) {
TFile *farc = TFile::Open(path,"UPDATE");
if (!(farc->IsOpen())) {
Info("Retrieve", "archive file cannot be open (%s)", path);
return 0;
}
farc->cd();
qr->SetArchived(path);
qr->Write();
farc->Close();
SafeDelete(farc);
} else {
Info("Retrieve", "query not found after retrieve");
return -1;
}
}
return 0;
}
return -1;
}
Int_t TProof::Remove(Int_t qry, Bool_t all)
{
if (qry > 0) {
TString ref;
if (GetQueryReference(qry, ref) == 0)
return Remove(ref, all);
else
Info("Remove", "query #%d not found", qry);
} else {
Info("Remove","positive argument required - do nothing");
}
return -1;
}
Int_t TProof::Remove(const char *ref, Bool_t all)
{
if (all) {
if (fPlayer)
fPlayer->RemoveQueryResult(ref);
}
if (ref) {
TMessage m(kPROOF_REMOVE);
m << TString(ref);
Broadcast(m, kActive);
Collect(kActive);
return 0;
}
return -1;
}
Int_t TProof::Archive(Int_t qry, const char *path)
{
if (qry > 0) {
TString ref;
if (GetQueryReference(qry, ref) == 0)
return Archive(ref, path);
else
Info("Archive", "query #%d not found", qry);
} else {
Info("Archive","positive argument required - do nothing");
}
return -1;
}
Int_t TProof::Archive(const char *ref, const char *path)
{
if (ref) {
TMessage m(kPROOF_ARCHIVE);
m << TString(ref) << TString(path);
Broadcast(m, kActive);
Collect(kActive);
return 0;
}
return -1;
}
Int_t TProof::CleanupSession(const char *sessiontag)
{
if (sessiontag) {
TMessage m(kPROOF_CLEANUPSESSION);
m << TString(sessiontag);
Broadcast(m, kActive);
Collect(kActive);
return 0;
}
return -1;
}
void TProof::SetQueryMode(EQueryMode mode)
{
fQueryMode = mode;
if (gDebug > 0)
Info("SetQueryMode","query mode is set to: %s", fQueryMode == kSync ?
"Sync" : "Async");
}
TProof::EQueryMode TProof::GetQueryMode() const
{
if (gDebug > 0)
Info("GetQueryMode","query mode is set to: %s", fQueryMode == kSync ?
"Sync" : "Async");
return fQueryMode;
}
TProof::EQueryMode TProof::GetQueryMode(Option_t *mode) const
{
EQueryMode qmode = fQueryMode;
if (mode) {
TString m(mode);
m.ToUpper();
if (m.Contains("ASYN")) {
qmode = kAsync;
} else if (m.Contains("SYNC")) {
qmode = kSync;
}
}
return qmode;
}
Long64_t TProof::DrawSelect(TDSet *dset, const char *varexp, const char *selection, Option_t *option,
Long64_t nentries, Long64_t first)
{
if (!IsValid()) return -1;
if (!IsIdle()) {
Info("DrawSelect","not idle, asynchronous Draw not supported");
return -1;
}
TString opt(option);
Int_t idx = opt.Index("ASYN", 0, TString::kIgnoreCase);
if (idx != kNPOS)
opt.Replace(idx,4,"");
return fPlayer->DrawSelect(dset, varexp, selection, opt, nentries, first);
}
void TProof::StopProcess(Bool_t abort, Int_t timeout)
{
PDB(kGlobal,2)
Info("StopProcess","enter %d", abort);
if (!IsValid())
return;
fPlayer->StopProcess(abort, timeout);
if (!IsMaster())
InterruptCurrentMonitor();
if (fSlaves->GetSize() == 0)
return;
TSlave *sl;
TIter next(fSlaves);
while ((sl = (TSlave *)next()))
if (sl->IsValid())
sl->StopProcess(abort, timeout);
}
void TProof::RecvLogFile(TSocket *s, Int_t size)
{
const Int_t kMAXBUF = 16384;
char buf[kMAXBUF];
Int_t fdout = -1;
if (!fLogToWindowOnly) {
fdout = (fRedirLog) ? fileno(fLogFileW) : fileno(stdout);
if (fdout < 0) {
Warning("RecvLogFile", "file descriptor for outputs undefined (%d):"
" will not log msgs", fdout);
return;
}
lseek(fdout, (off_t) 0, SEEK_END);
}
Int_t left, rec, r;
Long_t filesize = 0;
while (filesize < size) {
left = Int_t(size - filesize);
if (left > kMAXBUF)
left = kMAXBUF;
rec = s->RecvRaw(&buf, left);
filesize = (rec > 0) ? (filesize + rec) : filesize;
if (!fLogToWindowOnly) {
if (rec > 0) {
char *p = buf;
r = rec;
while (r) {
Int_t w;
w = write(fdout, p, r);
if (w < 0) {
SysError("RecvLogFile", "error writing to stdout");
break;
}
r -= w;
p += w;
}
} else if (rec < 0) {
Error("RecvLogFile", "error during receiving log file");
break;
}
}
if (rec > 0) {
buf[rec] = 0;
EmitVA("LogMessage(const char*,Bool_t)", 2, buf, kFALSE);
}
}
if (fRedirLog && IsIdle())
fRedirLog = kFALSE;
}
void TProof::LogMessage(const char *msg, Bool_t all)
{
PDB(kGlobal,1)
Info("LogMessage","Enter ... %s, 'all: %s", msg ? msg : "",
all ? "true" : "false");
if (gROOT->IsBatch()) {
PDB(kGlobal,1) Info("LogMessage","GUI not started - use TProof::ShowLog()");
return;
}
if (msg)
EmitVA("LogMessage(const char*,Bool_t)", 2, msg, all);
if (all)
lseek(fileno(fLogFileR), (off_t) 0, SEEK_SET);
const Int_t kMAXBUF = 32768;
char buf[kMAXBUF];
Int_t len;
do {
while ((len = read(fileno(fLogFileR), buf, kMAXBUF-1)) < 0 &&
TSystem::GetErrno() == EINTR)
TSystem::ResetErrno();
if (len < 0) {
Error("LogMessage", "error reading log file");
break;
}
if (len > 0) {
buf[len] = 0;
EmitVA("LogMessage(const char*,Bool_t)", 2, buf, kFALSE);
}
} while (len > 0);
}
Int_t TProof::SendGroupView()
{
if (!IsValid()) return -1;
if (!IsMaster()) return 0;
if (!fSendGroupView) return 0;
fSendGroupView = kFALSE;
TIter next(fActiveSlaves);
TSlave *sl;
int bad = 0, cnt = 0, size = GetNumberOfActiveSlaves();
char str[32];
while ((sl = (TSlave *)next())) {
sprintf(str, "%d %d", cnt, size);
if (sl->GetSocket()->Send(str, kPROOF_GROUPVIEW) == -1) {
MarkBad(sl);
bad++;
} else
cnt++;
}
if (bad) SendGroupView();
return GetNumberOfActiveSlaves();
}
Int_t TProof::Exec(const char *cmd, Bool_t plusMaster)
{
return Exec(cmd, kActive, plusMaster);
}
Int_t TProof::Exec(const char *cmd, ESlaves list, Bool_t plusMaster)
{
if (!IsValid()) return -1;
TString s = cmd;
s = s.Strip(TString::kBoth);
if (!s.Length()) return 0;
if (s.BeginsWith(".L") || s.BeginsWith(".x") || s.BeginsWith(".X")) {
TString file = s(2, s.Length());
TString acm, arg, io;
TString filename = gSystem->SplitAclicMode(file, acm, arg, io);
char *fn = gSystem->Which(TROOT::GetMacroPath(), filename, kReadPermission);
if (fn) {
if (GetNumberOfUniqueSlaves() > 0) {
if (SendFile(fn, kAscii | kForward) < 0) {
Error("Exec", "file %s could not be transfered", fn);
delete [] fn;
return -1;
}
} else {
TString scmd = s(0,3) + fn;
Int_t n = SendCommand(scmd, list);
delete [] fn;
return n;
}
} else {
Error("Exec", "macro %s not found", file.Data());
return -1;
}
delete [] fn;
}
if (plusMaster) {
Int_t n = GetParallel();
SetParallelSilent(0);
Int_t res = SendCommand(cmd, list);
SetParallelSilent(n);
if (res < 0)
return res;
}
return SendCommand(cmd, list);
}
Int_t TProof::SendCommand(const char *cmd, ESlaves list)
{
if (!IsValid()) return -1;
Broadcast(cmd, kMESS_CINT, list);
Collect(list);
return fStatus;
}
Int_t TProof::SendCurrentState(ESlaves list)
{
if (!IsValid()) return -1;
Broadcast(gDirectory->GetPath(), kPROOF_RESET, list);
return GetParallel();
}
Int_t TProof::SendInitialState()
{
if (!IsValid()) return -1;
SetLogLevel(fLogLevel, gProofDebugMask);
return GetNumberOfActiveSlaves();
}
Bool_t TProof::CheckFile(const char *file, TSlave *slave, Long_t modtime)
{
Bool_t sendto = kFALSE;
TString sn = slave->GetName();
sn += ":";
sn += slave->GetOrdinal();
sn += ":";
sn += gSystem->BaseName(file);
FileMap_t::const_iterator it;
if ((it = fFileMap.find(sn)) != fFileMap.end()) {
MD5Mod_t md = (*it).second;
if (md.fModtime != modtime) {
TMD5 *md5 = TMD5::FileChecksum(file);
if (md5) {
if ((*md5) != md.fMD5) {
sendto = kTRUE;
md.fMD5 = *md5;
md.fModtime = modtime;
fFileMap[sn] = md;
if (IsMaster()) {
sendto = kFALSE;
TMessage mess(kPROOF_CHECKFILE);
mess << TString(gSystem->BaseName(file)) << md.fMD5;
slave->GetSocket()->Send(mess);
TMessage *reply;
slave->GetSocket()->Recv(reply);
if (reply->What() != kPROOF_CHECKFILE)
sendto = kTRUE;
delete reply;
}
}
delete md5;
} else {
Error("CheckFile", "could not calculate local MD5 check sum - dont send");
return kFALSE;
}
}
} else {
TMD5 *md5 = TMD5::FileChecksum(file);
MD5Mod_t md;
if (md5) {
md.fMD5 = *md5;
md.fModtime = modtime;
fFileMap[sn] = md;
delete md5;
} else {
Error("CheckFile", "could not calculate local MD5 check sum - dont send");
return kFALSE;
}
TMessage mess(kPROOF_CHECKFILE);
mess << TString(gSystem->BaseName(file)) << md.fMD5;
slave->GetSocket()->Send(mess);
TMessage *reply;
slave->GetSocket()->Recv(reply);
if (reply->What() != kPROOF_CHECKFILE)
sendto = kTRUE;
delete reply;
}
return sendto;
}
Int_t TProof::SendFile(const char *file, Int_t opt, const char *rfile, TSlave *wrk)
{
if (!IsValid()) return -1;
TList *slaves = fActiveSlaves;
if (wrk) {
slaves = new TList();
slaves->Add(wrk);
}
if (slaves->GetSize() == 0) return 0;
#ifndef R__WIN32
Int_t fd = open(file, O_RDONLY);
#else
Int_t fd = open(file, O_RDONLY | O_BINARY);
#endif
if (fd < 0) {
SysError("SendFile", "cannot open file %s", file);
return -1;
}
Long64_t size;
Long_t id, flags, modtime;
if (gSystem->GetPathInfo(file, &id, &size, &flags, &modtime) == 1) {
Error("SendFile", "cannot stat file %s", file);
return -1;
}
if (size == 0) {
Error("SendFile", "empty file %s", file);
return -1;
}
Bool_t bin = (opt & kBinary) ? kTRUE : kFALSE;
Bool_t force = (opt & kForce) ? kTRUE : kFALSE;
Bool_t fw = (opt & kForward) ? kTRUE : kFALSE;
const Int_t kMAXBUF = 32768;
char buf[kMAXBUF];
Int_t nsl = 0;
TIter next(slaves);
TSlave *sl;
const char *fnam = (rfile) ? rfile : gSystem->BaseName(file);
while ((sl = (TSlave *)next())) {
if (!sl->IsValid())
continue;
Bool_t sendto = force ? kTRUE : CheckFile(file, sl, modtime);
if (sl->fSlaveType == TSlave::kSlave && !sendto)
continue;
size = sendto ? size : 0;
PDB(kPackage,2)
if (size > 0) {
if (!nsl)
Info("SendFile", "sending file %s to:", file);
printf(" slave = %s:%s\n", sl->GetName(), sl->GetOrdinal());
}
sprintf(buf, "%s %d %lld %d", fnam, bin, size, fw);
if (sl->GetSocket()->Send(buf, kPROOF_SENDFILE) == -1) {
MarkBad(sl);
continue;
}
if (!sendto)
continue;
lseek(fd, 0, SEEK_SET);
Int_t len;
do {
while ((len = read(fd, buf, kMAXBUF)) < 0 && TSystem::GetErrno() == EINTR)
TSystem::ResetErrno();
if (len < 0) {
SysError("SendFile", "error reading from file %s", file);
Interrupt(kSoftInterrupt, kActive);
close(fd);
return -1;
}
if (len > 0 && sl->GetSocket()->SendRaw(buf, len) == -1) {
SysError("SendFile", "error writing to slave %s:%s (now offline)",
sl->GetName(), sl->GetOrdinal());
MarkBad(sl);
break;
}
} while (len > 0);
nsl++;
}
close(fd);
if (slaves != fActiveSlaves)
SafeDelete(slaves);
return nsl;
}
Int_t TProof::SendObject(const TObject *obj, ESlaves list)
{
if (!IsValid() || !obj) return -1;
TMessage mess(kMESS_OBJECT);
mess.WriteObject(obj);
return Broadcast(mess, list);
}
Int_t TProof::SendPrint(Option_t *option)
{
if (!IsValid()) return -1;
Broadcast(option, kPROOF_PRINT, kActive);
return Collect(kActive);
}
void TProof::SetLogLevel(Int_t level, UInt_t mask)
{
char str[32];
fLogLevel = level;
gProofDebugLevel = level;
gProofDebugMask = (TProofDebug::EProofDebugMask) mask;
sprintf(str, "%d %u", level, mask);
Broadcast(str, kPROOF_LOGLEVEL, kAll);
}
Int_t TProof::SetParallelSilent(Int_t nodes)
{
if (!IsValid()) return -1;
if (IsMaster()) {
GoParallel(nodes);
return SendCurrentState();
} else {
PDB(kGlobal,1) Info("SetParallelSilent", "request %d node%s", nodes,
nodes == 1 ? "" : "s");
TMessage mess(kPROOF_PARALLEL);
mess << nodes;
Broadcast(mess);
Collect();
Int_t n = GetParallel();
PDB(kGlobal,1) Info("SetParallelSilent", "got %d node%s", n, n == 1 ? "" : "s");
return n;
}
}
Int_t TProof::SetParallel(Int_t nodes)
{
Int_t n = SetParallelSilent(nodes);
if (!IsMaster()) {
if (n < 1)
printf("PROOF set to sequential mode\n");
else
printf("PROOF set to parallel mode (%d worker%s)\n",
n, n == 1 ? "" : "s");
}
return n;
}
Int_t TProof::GoParallel(Int_t nodes, Bool_t attach)
{
if (!IsValid()) return -1;
if (nodes < 0) nodes = 0;
fActiveSlaves->Clear();
fActiveMonitor->RemoveAll();
TIter next(fSlaves);
int cnt = 0;
TSlave *sl;
fEndMaster = IsMaster() ? kTRUE : kFALSE;
while (cnt < nodes && (sl = (TSlave *)next())) {
if (sl->IsValid()) {
if (strcmp("IGNORE", sl->GetImage()) == 0) continue;
Int_t slavenodes = 0;
if (sl->GetSlaveType() == TSlave::kSlave) {
fActiveSlaves->Add(sl);
fActiveMonitor->Add(sl->GetSocket());
slavenodes = 1;
} else if (sl->GetSlaveType() == TSlave::kMaster) {
fEndMaster = kFALSE;
TMessage mess(kPROOF_PARALLEL);
if (!attach) {
mess << nodes-cnt;
} else {
mess.SetWhat(kPROOF_LOGFILE);
mess << -1 << -1;
}
if (sl->GetSocket()->Send(mess) == -1) {
MarkBad(sl);
slavenodes = 0;
} else {
Collect(sl);
fActiveSlaves->Add(sl);
fActiveMonitor->Add(sl->GetSocket());
if (sl->GetParallel() > 0) {
slavenodes = sl->GetParallel();
} else {
slavenodes = 0;
}
}
} else {
Error("GoParallel", "TSlave is neither Master nor Slave");
R__ASSERT(0);
}
cnt += slavenodes;
}
}
AskStatistics();
FindUniqueSlaves();
if (!attach)
SendGroupView();
Int_t n = GetParallel();
if (!IsMaster()) {
if (n < 1)
printf("PROOF set to sequential mode\n");
else
printf("PROOF set to parallel mode (%d worker%s)\n",
n, n == 1 ? "" : "s");
}
PDB(kGlobal,1) Info("GoParallel", "got %d node%s", n, n == 1 ? "" : "s");
return n;
}
void TProof::ShowCache(Bool_t all)
{
if (!IsValid()) return;
TMessage mess(kPROOF_CACHE);
mess << Int_t(kShowCache) << all;
Broadcast(mess, kUnique);
if (all) {
TMessage mess2(kPROOF_CACHE);
mess2 << Int_t(kShowSubCache) << all;
Broadcast(mess2, fNonUniqueMasters);
Collect(kAllUnique);
} else {
Collect(kUnique);
}
}
void TProof::ClearCache()
{
if (!IsValid()) return;
TMessage mess(kPROOF_CACHE);
mess << Int_t(kClearCache);
Broadcast(mess, kUnique);
TMessage mess2(kPROOF_CACHE);
mess2 << Int_t(kClearSubCache);
Broadcast(mess2, fNonUniqueMasters);
Collect(kAllUnique);
fFileMap.clear();
}
void TProof::ShowPackages(Bool_t all)
{
if (!IsValid()) return;
if (!IsMaster()) {
printf("*** Package cache client:%s ***\n", fPackageDir.Data());
fflush(stdout);
gSystem->Exec(Form("%s %s", kLS, fPackageDir.Data()));
}
TMessage mess(kPROOF_CACHE);
mess << Int_t(kShowPackages) << all;
Broadcast(mess, kUnique);
if (all) {
TMessage mess2(kPROOF_CACHE);
mess2 << Int_t(kShowSubPackages) << all;
Broadcast(mess2, fNonUniqueMasters);
Collect(kAllUnique);
} else {
Collect(kUnique);
}
}
void TProof::ShowEnabledPackages(Bool_t all)
{
if (!IsValid()) return;
if (!IsMaster()) {
printf("*** Enabled packages on client on %s\n", gSystem->HostName());
TIter next(fEnabledPackagesOnClient);
while (TObjString *str = (TObjString*) next())
printf("%s\n", str->GetName());
}
TMessage mess(kPROOF_CACHE);
mess << Int_t(kShowEnabledPackages) << all;
Broadcast(mess);
Collect();
}
Int_t TProof::ClearPackages()
{
if (!IsValid()) return -1;
if (UnloadPackages() == -1)
return -1;
if (DisablePackages() == -1)
return -1;
return fStatus;
}
Int_t TProof::ClearPackage(const char *package)
{
if (!IsValid()) return -1;
if (!package || !strlen(package)) {
Error("ClearPackage", "need to specify a package name");
return -1;
}
TString pac = package;
if (pac.EndsWith(".par"))
pac.Remove(pac.Length()-4);
pac = gSystem->BaseName(pac);
if (UnloadPackage(pac) == -1)
return -1;
if (DisablePackage(pac) == -1)
return -1;
return fStatus;
}
Int_t TProof::DisablePackage(const char *package)
{
if (!IsValid()) return -1;
if (!package || !strlen(package)) {
Error("DisablePackage", "need to specify a package name");
return -1;
}
TString pac = package;
if (pac.EndsWith(".par"))
pac.Remove(pac.Length()-4);
pac = gSystem->BaseName(pac);
TMessage mess(kPROOF_CACHE);
mess << Int_t(kDisablePackage) << pac;
Broadcast(mess, kUnique);
TMessage mess2(kPROOF_CACHE);
mess2 << Int_t(kDisableSubPackage) << pac;
Broadcast(mess2, fNonUniqueMasters);
Collect(kAllUnique);
return fStatus;
}
Int_t TProof::DisablePackages()
{
if (!IsValid()) return -1;
TMessage mess(kPROOF_CACHE);
mess << Int_t(kDisablePackages);
Broadcast(mess, kUnique);
TMessage mess2(kPROOF_CACHE);
mess2 << Int_t(kDisableSubPackages);
Broadcast(mess2, fNonUniqueMasters);
Collect(kAllUnique);
return fStatus;
}
Int_t TProof::BuildPackage(const char *package, EBuildPackageOpt opt)
{
if (!IsValid()) return -1;
if (!package || !strlen(package)) {
Error("BuildPackage", "need to specify a package name");
return -1;
}
TString pac = package;
if (pac.EndsWith(".par"))
pac.Remove(pac.Length()-4);
pac = gSystem->BaseName(pac);
Bool_t buildOnClient = kTRUE;
if (opt == kDontBuildOnClient) {
buildOnClient = kFALSE;
opt = kBuildAll;
}
if (opt <= 0) {
TMessage mess(kPROOF_CACHE);
mess << Int_t(kBuildPackage) << pac;
Broadcast(mess, kUnique);
TMessage mess2(kPROOF_CACHE);
mess2 << Int_t(kBuildSubPackage) << pac;
Broadcast(mess2, fNonUniqueMasters);
}
if (opt >= 0) {
Int_t st = 0;
if (buildOnClient)
st = BuildPackageOnClient(pac);
Collect(kAllUnique);
if (fStatus < 0 || st < 0)
return -1;
}
return 0;
}
Int_t TProof::BuildPackageOnClient(const TString &package)
{
if (!IsMaster()) {
Int_t status = 0;
TString pdir, ocwd;
fPackageLock->Lock();
pdir = fPackageDir + "/" + package;
if (gSystem->AccessPathName(pdir)) {
Error("BuildPackageOnClient", "package %s does not exist",
package.Data());
fPackageLock->Unlock();
return -1;
} else if (gSystem->AccessPathName(pdir + "/PROOF-INF")) {
Error("BuildPackageOnClient", "package %s does not have a PROOF-INF directory",
package.Data());
fPackageLock->Unlock();
return -1;
}
PDB(kPackage, 1)
Info("BuildPackageOnCLient",
"package %s exists and has PROOF-INF directory", package.Data());
ocwd = gSystem->WorkingDirectory();
gSystem->ChangeDirectory(pdir);
if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
FILE *f = fopen("PROOF-INF/proofvers.txt", "r");
if (f) {
TString v;
v.Gets(f);
fclose(f);
if (v != gROOT->GetVersion()) {
if (gSystem->Exec("PROOF-INF/BUILD.sh clean")) {
Error("BuildPackageOnClient", "cleaning package %s on the client failed", package.Data());
status = -1;
}
}
}
if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
Error("BuildPackageOnClient", "building package %s on the client failed", package.Data());
status = -1;
}
f = fopen("PROOF-INF/proofvers.txt", "w");
if (f) {
fputs(gROOT->GetVersion(), f);
fclose(f);
}
} else {
PDB(kPackage, 1)
Info("BuildPackageOnCLient",
"package %s exists but has no PROOF-INF/BUILD.sh script", package.Data());
}
gSystem->ChangeDirectory(ocwd);
fPackageLock->Unlock();
return status;
}
return 0;
}
Int_t TProof::LoadPackage(const char *package, Bool_t notOnClient)
{
if (!IsValid()) return -1;
if (!package || !strlen(package)) {
Error("LoadPackage", "need to specify a package name");
return -1;
}
TString pac = package;
if (pac.EndsWith(".par"))
pac.Remove(pac.Length()-4);
pac = gSystem->BaseName(pac);
if (!notOnClient)
if (LoadPackageOnClient(pac) == -1)
return -1;
TMessage mess(kPROOF_CACHE);
mess << Int_t(kLoadPackage) << pac;
Broadcast(mess);
Collect();
return fStatus;
}
Int_t TProof::LoadPackageOnClient(const TString &package)
{
if (!IsMaster()) {
Int_t status = 0;
TString pdir, ocwd;
if (fEnabledPackagesOnClient->FindObject(package)) {
Info("LoadPackageOnClient",
"package %s already loaded", package.Data());
return 0;
}
pdir = fPackageDir + "/" + package;
ocwd = gSystem->WorkingDirectory();
gSystem->ChangeDirectory(pdir);
if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
Int_t err = 0;
Int_t errm = gROOT->Macro("PROOF-INF/SETUP.C", &err);
if (errm < 0)
status = -1;
if (err > TInterpreter::kNoError && err <= TInterpreter::kFatal)
status = -1;
} else {
PDB(kPackage, 1)
Info("LoadPackageOnCLient",
"package %s exists but has no PROOF-INF/SETUP.C script", package.Data());
}
gSystem->ChangeDirectory(ocwd);
if (!status) {
fPackageLock->Lock();
FileStat_t stat;
Int_t st = gSystem->GetPathInfo(package, stat);
if (stat.fIsLink)
gSystem->Unlink(package);
else if (st == 0) {
Error("LoadPackageOnClient", "cannot create symlink %s in %s on client, "
"another item with same name already exists", package.Data(), ocwd.Data());
fPackageLock->Unlock();
return -1;
}
gSystem->Symlink(pdir, package);
fPackageLock->Unlock();
gSystem->AddIncludePath(TString("-I") + package);
gROOT->ProcessLine(TString(".include ") + package);
fEnabledPackagesOnClient->Add(new TObjString(package));
PDB(kPackage, 1)
Info("LoadPackageOnClient",
"package %s successfully loaded", package.Data());
} else
Error("LoadPackageOnClient", "loading package %s on client failed", package.Data());
return status;
}
return 0;
}
Int_t TProof::UnloadPackage(const char *package)
{
if (!IsValid()) return -1;
if (!package || !strlen(package)) {
Error("UnloadPackage", "need to specify a package name");
return -1;
}
TString pac = package;
if (pac.EndsWith(".par"))
pac.Remove(pac.Length()-4);
pac = gSystem->BaseName(pac);
TMessage mess(kPROOF_CACHE);
mess << Int_t(kUnloadPackage) << pac;
Broadcast(mess);
Collect();
return fStatus;
}
Int_t TProof::UnloadPackages()
{
if (!IsValid()) return -1;
TMessage mess(kPROOF_CACHE);
mess << Int_t(kUnloadPackages);
Broadcast(mess);
Collect();
return fStatus;
}
Int_t TProof::EnablePackage(const char *package, Bool_t notOnClient)
{
if (!IsValid()) return -1;
if (!package || !strlen(package)) {
Error("EnablePackage", "need to specify a package name");
return -1;
}
TString pac = package;
if (pac.EndsWith(".par"))
pac.Remove(pac.Length()-4);
pac = gSystem->BaseName(pac);
EBuildPackageOpt opt = kBuildAll;
if (notOnClient)
opt = kDontBuildOnClient;
if (BuildPackage(pac, opt) == -1)
return -1;
if (LoadPackage(pac, notOnClient) == -1)
return -1;
return 0;
}
Int_t TProof::UploadPackage(const char *tpar, EUploadPackageOpt opt)
{
if (!IsValid()) return -1;
TString par = tpar;
if (!par.EndsWith(".par")) {
Error("UploadPackage", "package %s must have extension .par", tpar);
return -1;
}
gSystem->ExpandPathName(par);
if (gSystem->AccessPathName(par, kReadPermission)) {
Error("UploadPackage", "package %s does not exist", par.Data());
return -1;
}
TMD5 *md5 = TMD5::FileChecksum(par);
if (UploadPackageOnClient(par, opt, md5) == -1) {
delete md5;
return -1;
}
TMessage mess(kPROOF_CHECKFILE);
mess << TString("+")+TString(gSystem->BaseName(par)) << (*md5);
TMessage mess2(kPROOF_CHECKFILE);
mess2 << TString("-")+TString(gSystem->BaseName(par)) << (*md5);
TMessage mess3(kPROOF_CHECKFILE);
mess3 << TString("=")+TString(gSystem->BaseName(par)) << (*md5);
delete md5;
if (fProtocol > 8) {
mess << (UInt_t) opt;
mess2 << (UInt_t) opt;
mess3 << (UInt_t) opt;
}
TIter next(fUniqueSlaves);
TSlave *sl = 0;
while ((sl = (TSlave *) next())) {
if (!sl->IsValid())
continue;
sl->GetSocket()->Send(mess);
TMessage *reply;
sl->GetSocket()->Recv(reply);
if (reply->What() != kPROOF_CHECKFILE) {
if (fProtocol > 5) {
if (SendFile(par, (kBinary | kForce), Form("%s/%s/%s",
sl->GetProofWorkDir(), kPROOF_PackDir,
gSystem->BaseName(par)), sl) < 0) {
Error("UploadPackage", "problems uploading file %s", par.Data());
SafeDelete(reply);
return -1;
}
} else {
TFTP ftp(TString("root://")+sl->GetName(), 1);
if (!ftp.IsZombie()) {
ftp.cd(Form("%s/%s", sl->GetProofWorkDir(), kPROOF_PackDir));
ftp.put(par, gSystem->BaseName(par));
}
}
sl->GetSocket()->Send(mess2);
SafeDelete(reply);
sl->GetSocket()->Recv(reply);
if (!reply || reply->What() != kPROOF_CHECKFILE) {
Error("UploadPackage", "unpacking of package %s failed", par.Data());
SafeDelete(reply);
return -1;
}
}
SafeDelete(reply);
}
TIter nextmaster(fNonUniqueMasters);
TSlave *ma;
while ((ma = (TSlave *) nextmaster())) {
if (!ma->IsValid())
continue;
ma->GetSocket()->Send(mess3);
TMessage *reply = 0;
ma->GetSocket()->Recv(reply);
if (!reply || reply->What() != kPROOF_CHECKFILE) {
Error("UploadPackage", "package %s did not exist on submaster %s",
par.Data(), ma->GetOrdinal());
SafeDelete(reply);
return -1;
}
SafeDelete(reply);
}
return 0;
}
Int_t TProof::UploadPackageOnClient(const TString &par, EUploadPackageOpt opt, TMD5 *md5)
{
Int_t status = 0;
if (!IsMaster()) {
fPackageLock->Lock();
TString lpar = fPackageDir + "/" + gSystem->BaseName(par);
FileStat_t stat;
Int_t st = gSystem->GetPathInfo(lpar, stat);
if (stat.fIsLink)
gSystem->Unlink(lpar);
else if (st == 0) {
Error("UploadPackageOnClient", "cannot create symlink %s on client, "
"another item with same name already exists",
lpar.Data());
fPackageLock->Unlock();
return -1;
}
if (!gSystem->IsAbsoluteFileName(par)) {
TString fpar = par;
gSystem->Symlink(gSystem->PrependPathName(gSystem->WorkingDirectory(), fpar), lpar);
} else
gSystem->Symlink(par, lpar);
TString packnam = par(0, par.Length() - 4);
packnam = gSystem->BaseName(packnam);
TString md5f = fPackageDir + "/" + packnam + "/PROOF-INF/md5.txt";
TMD5 *md5local = TMD5::ReadChecksum(md5f);
if (!md5local || (*md5) != (*md5local)) {
Int_t st = 0;
if ((opt & TProof::kRemoveOld)) {
st = gSystem->Exec(Form("%s %s/%s", kRM, fPackageDir.Data(),
packnam.Data()));
if (st)
Error("UploadPackageOnClient", "failure executing: %s %s/%s",
kRM, fPackageDir.Data(), packnam.Data());
}
char *gunzip = gSystem->Which(gSystem->Getenv("PATH"), kGUNZIP,
kExecutePermission);
if (gunzip) {
st = gSystem->Exec(Form(kUNTAR2, gunzip, par.Data(), fPackageDir.Data()));
if (st)
Error("Uploadpackage", "failure executing: %s",
Form(kUNTAR2, gunzip, par.Data(), fPackageDir.Data()));
delete [] gunzip;
} else
Error("UploadPackageOnClient", "%s not found", kGUNZIP);
if (gSystem->AccessPathName(fPackageDir + "/" + packnam, kWritePermission)) {
Error("UploadPackageOnClient",
"package %s did not unpack into %s/%s", par.Data(), fPackageDir.Data(),
packnam.Data());
status = -1;
} else {
TMD5::WriteChecksum(md5f, md5);
}
}
fPackageLock->Unlock();
delete md5local;
}
return status;
}
Int_t TProof::AddDynamicPath(const char *libpath)
{
if ((!libpath || !strlen(libpath))) {
if (gDebug > 0)
Info("AddDynamicPath", "list is empty - nothing to do");
return 0;
}
TMessage m(kPROOF_LIB_INC_PATH);
m << TString("lib") << (Bool_t)kTRUE;
if (libpath && strlen(libpath))
m << TString(libpath);
else
m << TString("-");
Broadcast(m);
Collect();
return 0;
}
Int_t TProof::AddIncludePath(const char *incpath)
{
if ((!incpath || !strlen(incpath))) {
if (gDebug > 0)
Info("AddIncludePath", "list is empty - nothing to do");
return 0;
}
TMessage m(kPROOF_LIB_INC_PATH);
m << TString("inc") << (Bool_t)kTRUE;
if (incpath && strlen(incpath))
m << TString(incpath);
else
m << TString("-");
Broadcast(m);
Collect();
return 0;
}
Int_t TProof::RemoveDynamicPath(const char *libpath)
{
if ((!libpath || !strlen(libpath))) {
if (gDebug > 0)
Info("AddDynamicPath", "list is empty - nothing to do");
return 0;
}
TMessage m(kPROOF_LIB_INC_PATH);
m << TString("lib") <<(Bool_t)kFALSE;
if (libpath && strlen(libpath))
m << TString(libpath);
else
m << TString("-");
Broadcast(m);
Collect();
return 0;
}
Int_t TProof::RemoveIncludePath(const char *incpath)
{
if ((!incpath || !strlen(incpath))) {
if (gDebug > 0)
Info("RemoveIncludePath", "list is empty - nothing to do");
return 0;
}
TMessage m(kPROOF_LIB_INC_PATH);
m << TString("inc") << (Bool_t)kFALSE;
if (incpath && strlen(incpath))
m << TString(incpath);
else
m << TString("-");
Broadcast(m);
Collect();
return 0;
}
TList *TProof::GetListOfPackages()
{
if (!IsValid())
return (TList *)0;
TMessage mess(kPROOF_CACHE);
mess << Int_t(kListPackages);
Broadcast(mess);
Collect();
return fAvailablePackages;
}
TList *TProof::GetListOfEnabledPackages()
{
if (!IsValid())
return (TList *)0;
TMessage mess(kPROOF_CACHE);
mess << Int_t(kListEnabledPackages);
Broadcast(mess);
Collect();
return fEnabledPackages;
}
void TProof::Progress(Long64_t total, Long64_t processed)
{
PDB(kGlobal,1)
Info("Progress","%2f (%lld/%lld)", 100.*processed/total, processed, total);
EmitVA("Progress(Long64_t,Long64_t)", 2, total, processed);
}
void TProof::Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
Float_t initTime, Float_t procTime,
Float_t evtrti, Float_t mbrti)
{
PDB(kGlobal,1)
Info("Progress","%lld %lld %lld %f %f %f %f", total, processed, bytesread,
initTime, procTime, evtrti, mbrti);
EmitVA("Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t)",
7, total, processed, bytesread, initTime, procTime, evtrti, mbrti);
}
void TProof::Feedback(TList *objs)
{
PDB(kGlobal,1)
Info("Feedback","%d objects", objs->GetSize());
PDB(kFeedback,1) {
Info("Feedback","%d objects", objs->GetSize());
objs->ls();
}
Emit("Feedback(TList *objs)", (Long_t) objs);
}
void TProof::CloseProgressDialog()
{
PDB(kGlobal,1)
Info("CloseProgressDialog",
"called: have progress dialog: %d", fProgressDialogStarted);
if (!fProgressDialogStarted)
return;
Emit("CloseProgressDialog()");
}
void TProof::ResetProgressDialog(const char *sel, Int_t sz, Long64_t fst,
Long64_t ent)
{
PDB(kGlobal,1)
Info("ResetProgressDialog","(%s,%d,%lld,%lld)", sel, sz, fst, ent);
EmitVA("ResetProgressDialog(const char*,Int_t,Long64_t,Long64_t)",
4, sel, sz, fst, ent);
}
void TProof::StartupMessage(const char *msg, Bool_t st, Int_t done, Int_t total)
{
PDB(kGlobal,1)
Info("StartupMessage","(%s,%d,%d,%d)", msg, st, done, total);
EmitVA("StartupMessage(const char*,Bool_t,Int_t,Int_t)",
4, msg, st, done, total);
}
void TProof::DataSetStatus(const char *msg, Bool_t st, Int_t done, Int_t total)
{
PDB(kGlobal,1)
Info("DataSetStatus","(%s,%d,%d,%d)", msg, st, done, total);
EmitVA("DataSetStatus(const char*,Bool_t,Int_t,Int_t)",
4, msg, st, done, total);
}
void TProof::SendDataSetStatus(const char *msg, UInt_t n,
UInt_t tot, Bool_t st)
{
if (IsMaster()) {
TMessage mess(kPROOF_DATASET_STATUS);
mess << TString(msg) << tot << n << st;
gProofServ->GetSocket()->Send(mess);
}
}
void TProof::QueryResultReady(const char *ref)
{
PDB(kGlobal,1)
Info("QueryResultReady","ref: %s", ref);
Emit("QueryResultReady(const char*)",ref);
}
void TProof::ValidateDSet(TDSet *dset)
{
if (dset->ElementsValid()) return;
TList nodes;
nodes.SetOwner();
TList slholder;
slholder.SetOwner();
TList elemholder;
elemholder.SetOwner();
TIter nextSlave(GetListOfActiveSlaves());
while (TSlave *sl = dynamic_cast<TSlave*>(nextSlave())) {
TList *sllist = 0;
TPair *p = dynamic_cast<TPair*>(nodes.FindObject(sl->GetName()));
if (!p) {
sllist = new TList;
sllist->SetName(sl->GetName());
slholder.Add(sllist);
TList *elemlist = new TList;
elemlist->SetName(TString(sl->GetName())+"_elem");
elemholder.Add(elemlist);
nodes.Add(new TPair(sllist, elemlist));
} else {
sllist = dynamic_cast<TList*>(p->Key());
}
sllist->Add(sl);
}
TList nonLocal;
for (Int_t i = 0; i < 2; i++) {
Bool_t local = i>0?kFALSE:kTRUE;
TIter nextElem(local ? dset->GetListOfElements() : &nonLocal);
while (TDSetElement *elem = dynamic_cast<TDSetElement*>(nextElem())) {
if (elem->GetValid()) continue;
TPair *p = dynamic_cast<TPair*>(local?nodes.FindObject(TUrl(elem->GetFileName()).GetHost()):nodes.At(0));
if (p) {
TList *eli = dynamic_cast<TList*>(p->Value());
TList *sli = dynamic_cast<TList*>(p->Key());
eli->Add(elem);
TPair *p2 = p;
Bool_t stop = kFALSE;
while (!stop) {
TPair *p3 = dynamic_cast<TPair*>(nodes.After(p2->Key()));
if (p3) {
Int_t nelem = dynamic_cast<TList*>(p3->Value())->GetSize();
Int_t nsl = dynamic_cast<TList*>(p3->Key())->GetSize();
if (nelem*sli->GetSize() < eli->GetSize()*nsl) p2 = p3;
else stop = kTRUE;
} else {
stop = kTRUE;
}
}
if (p2!=p) {
nodes.Remove(p->Key());
nodes.AddAfter(p2->Key(), p);
}
} else {
if (local) {
nonLocal.Add(elem);
} else {
Error("ValidateDSet", "No Node to allocate TDSetElement to");
R__ASSERT(0);
}
}
}
}
TList usedslaves;
TIter nextNode(&nodes);
SetDSet(dset);
while (TPair *node = dynamic_cast<TPair*>(nextNode())) {
TList *slaves = dynamic_cast<TList*>(node->Key());
TList *setelements = dynamic_cast<TList*>(node->Value());
Int_t nslaves = slaves->GetSize();
Int_t nelements = setelements->GetSize();
for (Int_t i=0; i<nslaves; i++) {
TDSet copyset(dset->GetType(), dset->GetObjName(),
dset->GetDirectory());
for (Int_t j = (i*nelements)/nslaves;
j < ((i+1)*nelements)/nslaves;
j++) {
TDSetElement *elem =
dynamic_cast<TDSetElement*>(setelements->At(j));
copyset.Add(elem->GetFileName(), elem->GetObjName(),
elem->GetDirectory(), elem->GetFirst(),
elem->GetNum(), elem->GetMsd());
}
if (copyset.GetListOfElements()->GetSize()>0) {
TMessage mesg(kPROOF_VALIDATE_DSET);
mesg << ©set;
TSlave *sl = dynamic_cast<TSlave*>(slaves->At(i));
PDB(kGlobal,1) Info("ValidateDSet",
"Sending TDSet with %d elements to slave %s"
" to be validated",
copyset.GetListOfElements()->GetSize(),
sl->GetOrdinal());
sl->GetSocket()->Send(mesg);
usedslaves.Add(sl);
}
}
}
PDB(kGlobal,1) Info("ValidateDSet","Calling Collect");
Collect(&usedslaves);
SetDSet(0);
}
void TProof::AddInput(TObject *obj)
{
fPlayer->AddInput(obj);
}
void TProof::ClearInput()
{
fPlayer->ClearInput();
AddInput(fFeedback);
}
TObject *TProof::GetOutput(const char *name)
{
return fPlayer->GetOutput(name);
}
TList *TProof::GetOutputList()
{
return fPlayer->GetOutputList();
}
void TProof::SetParameter(const char *par, const char *value)
{
TList *il = fPlayer->GetInputList();
TObject *item = il->FindObject(par);
if (item) {
il->Remove(item);
delete item;
}
il->Add(new TNamed(par, value));
}
void TProof::SetParameter(const char *par, Long_t value)
{
TList *il = fPlayer->GetInputList();
TObject *item = il->FindObject(par);
if (item) {
il->Remove(item);
delete item;
}
il->Add(new TParameter<Long_t>(par, value));
}
void TProof::SetParameter(const char *par, Long64_t value)
{
TList *il = fPlayer->GetInputList();
TObject *item = il->FindObject(par);
if (item) {
il->Remove(item);
delete item;
}
il->Add(new TParameter<Long64_t>(par, value));
}
void TProof::SetParameter(const char *par, Double_t value)
{
TList *il = fPlayer->GetInputList();
TObject *item = il->FindObject(par);
if (item) {
il->Remove(item);
delete item;
}
il->Add(new TParameter<Double_t>(par, value));
}
TObject *TProof::GetParameter(const char *par) const
{
TList *il = fPlayer->GetInputList();
return il->FindObject(par);
}
void TProof::DeleteParameters(const char *wildcard)
{
if (!wildcard) wildcard = "";
TRegexp re(wildcard, kTRUE);
Int_t nch = strlen(wildcard);
TList *il = fPlayer->GetInputList();
TObject *p;
TIter next(il);
while ((p = next())) {
TString s = p->GetName();
if (nch && s != wildcard && s.Index(re) == kNPOS) continue;
il->Remove(p);
delete p;
}
}
void TProof::ShowParameters(const char *wildcard) const
{
if (!wildcard) wildcard = "";
TRegexp re(wildcard, kTRUE);
Int_t nch = strlen(wildcard);
TList *il = fPlayer->GetInputList();
TObject *p;
TIter next(il);
while ((p = next())) {
TString s = p->GetName();
if (nch && s != wildcard && s.Index(re) == kNPOS) continue;
if (p->IsA() == TNamed::Class()) {
Printf("%s\t\t\t%s", s.Data(), p->GetTitle());
} else if (p->IsA() == TParameter<Long_t>::Class()) {
Printf("%s\t\t\t%ld", s.Data(), dynamic_cast<TParameter<Long_t>*>(p)->GetVal());
} else if (p->IsA() == TParameter<Long64_t>::Class()) {
Printf("%s\t\t\t%lld", s.Data(), dynamic_cast<TParameter<Long64_t>*>(p)->GetVal());
} else if (p->IsA() == TParameter<Double_t>::Class()) {
Printf("%s\t\t\t%f", s.Data(), dynamic_cast<TParameter<Double_t>*>(p)->GetVal());
} else {
Printf("%s\t\t\t%s", s.Data(), p->GetTitle());
}
}
}
void TProof::AddFeedback(const char *name)
{
PDB(kFeedback, 3)
Info("AddFeedback", "Adding object \"%s\" to feedback", name);
if (fFeedback->FindObject(name) == 0)
fFeedback->Add(new TObjString(name));
}
void TProof::RemoveFeedback(const char *name)
{
TObject *obj = fFeedback->FindObject(name);
if (obj != 0) {
fFeedback->Remove(obj);
delete obj;
}
}
void TProof::ClearFeedback()
{
fFeedback->Delete();
}
void TProof::ShowFeedback() const
{
if (fFeedback->GetSize() == 0) {
Info("","no feedback requested");
return;
}
fFeedback->Print();
}
TList *TProof::GetFeedbackList() const
{
return fFeedback;
}
TTree *TProof::GetTreeHeader(TDSet *dset)
{
TList *l = GetListOfActiveSlaves();
TSlave *sl = (TSlave*) l->First();
if (sl == 0) {
Error("GetTreeHeader", "No connection");
return 0;
}
TSocket *soc = sl->GetSocket();
TMessage msg(kPROOF_GETTREEHEADER);
msg << dset;
soc->Send(msg);
TMessage *reply;
Int_t d = soc->Recv(reply);
if (reply <= 0) {
Error("GetTreeHeader", "Error getting a replay from the master.Result %d", (int) d);
return 0;
}
TString s1;
TTree * t;
(*reply) >> s1;
(*reply) >> t;
PDB(kGlobal, 1)
if (t)
Info("GetTreeHeader", Form("%s, message size: %d, entries: %d\n",
s1.Data(), reply->BufferSize(), (int) t->GetMaxEntryLoop()));
else
Info("GetTreeHeader", Form("%s, message size: %d\n", s1.Data(), reply->BufferSize()));
delete reply;
return t;
}
TDrawFeedback *TProof::CreateDrawFeedback()
{
return new TDrawFeedback(this);
}
void TProof::SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt)
{
if (f)
f->SetOption(opt);
}
void TProof::DeleteDrawFeedback(TDrawFeedback *f)
{
if (f)
delete f;
}
TList *TProof::GetOutputNames()
{
return 0;
}
void TProof::Browse(TBrowser *b)
{
b->Add(fActiveSlaves, fActiveSlaves->Class(), "fActiveSlaves");
b->Add(&fMaster, fMaster.Class(), "fMaster");
b->Add(fFeedback, fFeedback->Class(), "fFeedback");
b->Add(fChains, fChains->Class(), "fChains");
b->Add(fPlayer->GetInputList(), fPlayer->GetInputList()->Class(), "InputList");
if (fPlayer->GetOutputList())
b->Add(fPlayer->GetOutputList(), fPlayer->GetOutputList()->Class(), "OutputList");
if (fPlayer->GetListOfResults())
b->Add(fPlayer->GetListOfResults(),
fPlayer->GetListOfResults()->Class(), "ListOfResults");
}
TProofPlayer *TProof::MakePlayer()
{
SetPlayer(new TProofPlayerRemote(this));
return GetPlayer();
}
void TProof::AddChain(TChain *chain)
{
fChains->Add(chain);
}
void TProof::RemoveChain(TChain *chain)
{
fChains->Remove(chain);
}
void *TProof::SlaveStartupThread(void *arg)
{
if (fgSemaphore) fgSemaphore->Wait();
TProofThreadArg *ta = (TProofThreadArg *)arg;
PDB(kGlobal,1)
::Info("TProof::SlaveStartupThread",
"Starting slave %s on host %s", ta->fOrd.Data(), ta->fUrl->GetHost());
TSlave *sl = 0;
if (ta->fType == TSlave::kSlave) {
sl = ta->fProof->CreateSlave(ta->fUrl->GetUrl(), ta->fOrd,
ta->fPerf, ta->fImage, ta->fWorkdir);
if (sl && sl->IsValid())
sl->SetupServ(TSlave::kSlave, 0);
} else {
sl = ta->fProof->CreateSubmaster(ta->fUrl->GetUrl(), ta->fOrd,
ta->fImage, ta->fMsd);
if (sl && sl->IsValid())
sl->SetupServ(TSlave::kMaster, ta->fWorkdir);
}
if (sl && sl->IsValid()) {
{ R__LOCKGUARD2(gProofMutex);
ta->fSlaves->Add(sl);
if (ta->fClaims) {
TCondorSlave *c = ta->fCslave;
ta->fClaims->Remove(c);
}
}
PDB(kGlobal,1)
::Info("TProof::SlaveStartupThread",
"slave %s on host %s created and added to list",
ta->fOrd.Data(), ta->fUrl->GetHost());
} else {
SafeDelete(sl);
::Error("TProof::SlaveStartupThread",
"slave %s on host %s could not be created",
ta->fOrd.Data(), ta->fUrl->GetHost());
}
if (fgSemaphore) fgSemaphore->Post();
return 0;
}
void TProof::GetLog(Int_t start, Int_t end)
{
if (!IsValid() || IsMaster()) return;
TMessage msg(kPROOF_LOGFILE);
msg << start << end;
Broadcast(msg, kActive);
Collect(kActive);
}
void TProof::PutLog(TQueryResult *pq)
{
if (!pq) return;
TList *lines = pq->GetLogFile()->GetListOfLines();
if (lines) {
TIter nxl(lines);
TObjString *l = 0;
while ((l = (TObjString *)nxl()))
EmitVA("LogMessage(const char*,Bool_t)", 2, l->GetName(), kFALSE);
}
}
void TProof::ShowLog(const char *queryref)
{
Retrieve(queryref);
if (fPlayer) {
if (queryref) {
if (fPlayer->GetListOfResults()) {
TIter nxq(fPlayer->GetListOfResults());
TQueryResult *qr = 0;
while ((qr = (TQueryResult *) nxq()))
if (strstr(queryref, qr->GetTitle()) &&
strstr(queryref, qr->GetName()))
break;
if (qr) {
PutLog(qr);
return;
}
}
}
}
}
void TProof::ShowLog(Int_t qry)
{
Int_t nowlog = lseek(fileno(fLogFileR), (off_t) 0, SEEK_CUR);
Int_t startlog = nowlog;
Int_t endlog = lseek(fileno(fLogFileR), (off_t) 0, SEEK_END);
lseek(fileno(fLogFileR), (off_t) nowlog, SEEK_SET);
if (qry == 0) {
startlog = 0;
lseek(fileno(fLogFileR), (off_t) 0, SEEK_SET);
} else if (qry != -1) {
TQueryResult *pq = 0;
if (qry == -2) {
pq = (GetQueryResults()) ? ((TQueryResult *)(GetQueryResults()->Last())) : 0;
if (!pq) {
GetListOfQueries();
if (fQueries)
pq = (TQueryResult *)(fQueries->Last());
}
} else if (qry > 0) {
TList *queries = GetQueryResults();
if (queries) {
TIter nxq(queries);
while ((pq = (TQueryResult *)nxq()))
if (qry == pq->GetSeqNum())
break;
}
if (!pq) {
queries = GetListOfQueries();
TIter nxq(queries);
while ((pq = (TQueryResult *)nxq()))
if (qry == pq->GetSeqNum())
break;
}
}
if (pq) {
PutLog(pq);
return;
} else {
if (gDebug > 0)
Info("ShowLog","query %d not found in list", qry);
qry = -1;
}
}
UInt_t tolog = (UInt_t)(endlog - startlog);
if (tolog <= 0)
lseek(fileno(fLogFileR), (off_t) startlog, SEEK_SET);
Int_t np = 0;
char line[2048];
Int_t wanted = (tolog > sizeof(line)) ? sizeof(line) : tolog;
while (fgets(line, wanted, fLogFileR)) {
Int_t r = strlen(line);
if (!SendingLogToWindow()) {
if (line[r-1] != '\n') line[r-1] = '\n';
if (r > 0) {
char *p = line;
while (r) {
Int_t w = write(fileno(stdout), p, r);
if (w < 0) {
SysError("ShowLogFile", "error writing to stdout");
break;
}
r -= w;
p += w;
}
}
tolog -= strlen(line);
np++;
if (!(np%10)) {
char *opt = Getline("More (y/n)? [y]");
if (opt[0] == 'n')
break;
}
if (tolog <= 0)
break;
wanted = (tolog > sizeof(line)) ? sizeof(line) : tolog;
} else {
if (line[r-1] == '\n') line[r-1] = 0;
LogMessage(line, kFALSE);
}
}
if (!SendingLogToWindow()) {
write(fileno(stdout), "\n", 1);
}
if (qry > -1)
lseek(fileno(fLogFileR), (off_t) nowlog, SEEK_SET);
}
void TProof::cd(Int_t id)
{
if (GetManager()) {
TProofDesc *d = GetManager()->GetProofDesc(id);
if (d) {
if (d->GetProof()) {
gProof = d->GetProof();
return;
}
}
gProof = this;
}
return;
}
void TProof::Detach(Option_t *opt)
{
if (!IsValid()) return;
TSlave *sl = (TSlave *) fActiveSlaves->First();
TSocket *s = sl->GetSocket();
if (!sl || !(sl->IsValid()) || !s) {
Error("Detach","corrupted worker instance: wrk:%p, sock:%p", sl, s);
return;
}
Bool_t shutdown = (strchr(opt,'s') || strchr(opt,'S')) ? kTRUE : kFALSE;
if (shutdown && !IsIdle()) {
Remove("cleanupqueue");
Long_t timeout = gEnv->GetValue("Proof.ShutdownTimeout", 60);
timeout = (timeout > 20) ? timeout : 20;
StopProcess(kFALSE, (Long_t) (timeout / 2));
Collect(kActive, timeout);
}
DeActivateAsyncInput();
sl->FlushSocket();
Close(opt);
if (fProgressDialogStarted)
CloseProgressDialog();
if (GetManager() && GetManager()->QuerySessions("L")) {
TIter nxd(GetManager()->QuerySessions("L"));
TProofDesc *d = 0;
while ((d = (TProofDesc *)nxd())) {
if (d->GetProof() == this) {
d->SetProof(0);
GetManager()->QuerySessions("L")->Remove(d);
break;
}
}
}
if (!fProgressDialogStarted)
delete this;
else
fValid = kFALSE;
return;
}
void TProof::SetAlias(const char *alias)
{
TNamed::SetTitle(alias);
if (IsMaster())
TNamed::SetName(alias);
if (!IsValid()) return;
if (!IsProofd() && !IsMaster()) {
TSlave *sl = (TSlave *) fActiveSlaves->First();
if (sl)
sl->SetAlias(alias);
}
return;
}
Int_t TProof::UploadDataSet(const char *dataSetName,
TList *files,
const char *desiredDest,
Int_t opt,
TList *skippedFiles)
{
if (strchr(dataSetName, '/')) {
if (strstr(dataSetName, "public") != dataSetName) {
Error("UploadDataSet",
"Name of public dataset should start with public/");
return kError;
}
}
if (opt & kOverwriteAllFiles && opt & kOverwriteNoFiles
|| opt & kNoOverwriteDataSet && opt & kAppend
|| opt & kOverwriteDataSet && opt & kAppend
|| opt & kNoOverwriteDataSet && opt & kOverwriteDataSet
|| opt & kAskUser && opt & (kOverwriteDataSet |
kNoOverwriteDataSet |
kAppend |
kOverwriteAllFiles |
kOverwriteNoFiles)) {
Error("UploadDataSet", "you specified contradicting options.");
return kError;
}
Int_t overwriteAll = (opt & kOverwriteAllFiles) ? kTRUE : kFALSE;
Int_t overwriteNone = (opt & kOverwriteNoFiles) ? kTRUE : kFALSE;
Int_t goodName = (opt & (kOverwriteDataSet | kAppend)) ? 1 : -1;
Int_t appendToDataSet = (opt & kAppend) ? kTRUE : kFALSE;
Int_t overwriteNoDataSet = (opt & kNoOverwriteDataSet) ? kTRUE : kFALSE;
if ((!skippedFiles || !&skippedFiles) && overwriteNone) {
Error("UploadDataSet",
"Provide pointer to TList object as skippedFiles argument when using kOverwriteNoFiles option.");
return kError;
}
if (skippedFiles && &skippedFiles)
if (skippedFiles->Class() != TList::Class()) {
Error("UploadDataSet",
"Provided skippedFiles argument does not point to a TList object.");
return kError;
}
TSocket *master;
if (fActiveSlaves->GetSize())
master = ((TSlave*)(fActiveSlaves->First()))->GetSocket();
else {
Error("UploadDataSet", "No connection to the master!");
return kError;
}
Int_t fileCount = 0;
TMessage *retMess;
if (goodName == -1) {
TMessage nameMess(kPROOF_DATASETS);
nameMess << Int_t(kCheckDataSetName);
nameMess << TString(dataSetName);
Broadcast(nameMess);
master->Recv(retMess);
Collect();
if (retMess->What() == kMESS_NOTOK) {
while (goodName == -1 && !overwriteNoDataSet) {
Printf("Dataset %s already exist. ",
dataSetName);
Printf("Do you want to overwrite it[Yes/No/Append]?");
TString answer;
answer.ReadToken(cin);
if (!strncasecmp(answer.Data(), "y", 1)) {
goodName = 1;
} else if (!strncasecmp(answer.Data(), "n", 1)) {
goodName = 0;
} else if (!strncasecmp(answer.Data(), "a", 1)) {
goodName = 1;
appendToDataSet = kTRUE;
}
}
}
else if (retMess->What() == kMESS_OK)
goodName = 1;
else
Error("UploadDataSet", "unrecongnized message type: %d!",
retMess->What());
delete retMess;
}
if (goodName == 1) {
char *relativeDestDir = Form("%s/%s/",
gSystem->GetUserInfo()->fUser.Data(),
desiredDest?desiredDest:"");
relativeDestDir = CollapseSlashesInPath(relativeDestDir);
TString dest = Form("%s/%s", GetDataPoolUrl(), relativeDestDir);
delete[] relativeDestDir;
TList *fileList = new TList();
TFileMerger fileCopier;
TIter next(files);
while (TFileInfo *fileInfo = ((TFileInfo*)next())) {
TUrl *fileUrl = fileInfo->GetFirstUrl();
if (gSystem->AccessPathName(fileUrl->GetUrl()) == kFALSE) {
const char *ent = gSystem->BaseName(fileUrl->GetFile());
Int_t goodFileName = 1;
if (!overwriteAll &&
gSystem->AccessPathName(Form("%s/%s", dest.Data(), ent), kFileExists)
== kFALSE) {
goodFileName = -1;
while (goodFileName == -1 && !overwriteAll && !overwriteNone) {
Printf("File %s already exists. ", Form("%s/%s", dest.Data(), ent));
Printf("Do you want to overwrite it [Yes/No/all/none]?");
TString answer;
answer.ReadToken(cin);
if (!strncasecmp(answer.Data(), "y", 1))
goodFileName = 1;
else if (!strncasecmp(answer.Data(), "all", 3))
overwriteAll = kTRUE;
else if (!strncasecmp(answer.Data(), "none", 4))
overwriteNone = kTRUE;
else if (!strncasecmp(answer.Data(), "n", 1))
goodFileName = 0;
}
}
if (goodFileName == 1 || overwriteAll) {
Printf("Uploading %s to %s/%s",
fileUrl->GetUrl(), dest.Data(), ent);
if (fileCopier.Cp(fileUrl->GetUrl(),
Form("%s/%s", dest.Data(), ent))) {
fileList->Add(new TFileInfo(Form("%s/%s", dest.Data(), ent)));
} else
Error("UploadDataSet", "file %s was not copied", fileUrl->GetUrl());
} else {
fileList->Add(new TFileInfo(Form("%s/%s", dest.Data(), ent)));
if (skippedFiles && &skippedFiles) {
skippedFiles->Add(new TFileInfo(fileUrl->GetUrl()));
}
}
}
}
if ((fileCount = fileList->GetSize()) == 0) {
Printf("No files were copied. The dataset will not be saved");
} else {
if (CreateDataSet(dataSetName, fileList,
appendToDataSet?kAppend:kOverwriteDataSet) <= 0) {
Error("UploadDataSet", "Error while saving dataset!");
fileCount = kError;
}
}
fileList->SetOwner();
delete fileList;
} else if (overwriteNoDataSet) {
Printf("Dataset %s already exists", dataSetName);
return kDataSetExists;
}
return fileCount;
}
Int_t TProof::UploadDataSet(const char *dataSetName,
const char *files,
const char *desiredDest,
Int_t opt,
TList *skippedFiles)
{
TList *fileList = new TList();
void *dataSetDir = gSystem->OpenDirectory(gSystem->DirName(files));
const char* ent;
TString filesExp(gSystem->BaseName(files));
filesExp.ReplaceAll("*",".*");
TRegexp rg(filesExp);
while ((ent = gSystem->GetDirEntry(dataSetDir))) {
TString entryString(ent);
if (entryString.Index(rg) != kNPOS) {
TUrl *url = new TUrl(Form("%s/%s",
gSystem->DirName(files), ent), kTRUE);
if (gSystem->AccessPathName(url->GetUrl(), kReadPermission) == kFALSE)
fileList->Add(new TFileInfo(url->GetUrl()));
delete url;
}
}
Int_t fileCount;
if ((fileCount = fileList->GetSize()) == 0)
Printf("No files match your selection. The dataset will not be saved");
else
fileCount = UploadDataSet(dataSetName, fileList, desiredDest,
opt, skippedFiles);
fileList->SetOwner();
delete fileList;
return fileCount;
}
Int_t TProof::UploadDataSetFromFile(const char *dataset, const char *file,
const char *dest, Int_t opt)
{
Int_t fileCount = 0;
ifstream f;
f.open(gSystem->ExpandPathName(file), ifstream::out);
if (f.is_open()) {
while (f.good()) {
TString line;
line.ReadToDelim(f);
if (fileCount == 0) {
fileCount += UploadDataSet(dataset, line.Data(), dest, opt);
} else
fileCount += UploadDataSet(dataset, line.Data(), dest,
opt | kAppend);
}
f.close();
} else {
Error("UploadDataSetFromFile", "unable to open the specified file");
return -1;
}
return fileCount;
}
Int_t TProof::CreateDataSet(const char *dataSetName,
TList *files,
Int_t opt)
{
if (strchr(dataSetName, '/')) {
if (strstr(dataSetName, "public") != dataSetName) {
Error("CreateDataSet",
"Name of public dataset should start with public/");
return kError;
}
}
if (opt & kOverwriteDataSet && opt & kAppend
|| opt & kNoOverwriteDataSet && opt & kAppend
|| opt & kNoOverwriteDataSet && opt & kOverwriteDataSet
|| opt & kAskUser && opt & (kOverwriteDataSet |
kNoOverwriteDataSet |
kAppend)) {
Error("CreateDataSet", "you specified contradicting options.");
return kError;
}
if (opt & kOverwriteAllFiles || opt & kOverwriteNoFiles) {
Error("CreateDataSet", "you specified unsupported options.");
return kError;
}
Int_t goodName = (opt & (kOverwriteDataSet | kAppend)) ? 1 : -1;
Int_t appendToDataSet = (opt & kAppend) ? kTRUE : kFALSE;
Int_t overwriteNoDataSet = (opt & kNoOverwriteDataSet) ? kTRUE : kFALSE;
TSocket *master;
if (fActiveSlaves->GetSize())
master = ((TSlave*)(fActiveSlaves->First()))->GetSocket();
else {
Error("CreateDataSet", "No connection to the master!");
return kError;
}
Int_t fileCount = 0;
TMessage *retMess;
if (goodName == -1) {
TMessage nameMess(kPROOF_DATASETS);
nameMess << Int_t(kCheckDataSetName);
nameMess << TString(dataSetName);
Broadcast(nameMess);
master->Recv(retMess);
Collect();
if (retMess->What() == kMESS_NOTOK) {
while (goodName == -1 && !overwriteNoDataSet) {
Printf("Dataset %s already exists. ",
dataSetName);
Printf("Do you want to overwrite it[Yes/No/Append]?");
TString answer;
answer.ReadToken(cin);
if (!strncasecmp(answer.Data(), "y", 1)) {
goodName = 1;
} else if (!strncasecmp(answer.Data(), "n", 1)) {
goodName = 0;
} else if (!strncasecmp(answer.Data(), "a", 1)) {
goodName = 1;
appendToDataSet = kTRUE;
}
}
}
else if (retMess->What() == kMESS_OK)
goodName = 1;
else
Error("CreateDataSet", "unrecongnized message type: %d!",
retMess->What());
delete retMess;
}
if (goodName == 1) {
if ((fileCount = files->GetSize()) == 0) {
Printf("No files specified!");
} else {
TMessage mess(kPROOF_DATASETS);
if (appendToDataSet)
mess << Int_t(kAppendDataSet);
else
mess << Int_t(kCreateDataSet);
mess << TString(dataSetName);
mess.WriteObject(files);
Broadcast(mess);
if (master->Recv(retMess) <= 0) {
Error("CreateDataSet", "No response form the master");
fileCount = -1;
} else {
if (retMess->What() == kMESS_NOTOK) {
Printf("Dataset was not saved.");
fileCount = -1;
} else if (retMess->What() != kMESS_OK)
Error("CreateDataSet",
"Unexpected message type: %d", retMess->What());
delete retMess;
}
}
} else if (overwriteNoDataSet) {
Printf("Dataset %s already exists", dataSetName);
Collect();
return kDataSetExists;
}
Collect();
return fileCount;
}
TList *TProof::GetDataSets(const char *dir)
{
TSocket *master;
if (fActiveSlaves->GetSize())
master = ((TSlave*)(fActiveSlaves->First()))->GetSocket();
else {
Error("GetDataSets", "No connection to the master!");
return 0;
}
if (dir) {
if (strstr(dir, "public") != dir && strchr(dir, '~') != dir) {
Error("GetDataSets",
"directory should be of form '[~userName/]public'");
return 0;
}
}
TMessage mess(kPROOF_DATASETS);
mess << Int_t(kGetDataSets);
mess << TString(dir?dir:"");
Broadcast(mess);
TMessage *retMess;
master->Recv(retMess);
TList *dataSetList = 0;
if (retMess->What() == kMESS_OBJECT) {
dataSetList = (TList*)(retMess->ReadObject(TList::Class()));
if (!dataSetList)
Error("GetDataSets", "Error receiving list of datasets");
} else
Printf("The dataset directory could not be open");
Collect();
delete retMess;
return dataSetList;
}
void TProof::ShowDataSets(const char *dir)
{
TList *dataSetList;
if ((dataSetList = GetDataSets(dir))) {
if (dir)
Printf("DataSets in %s :", dir);
else
Printf("Existing DataSets:");
TIter next(dataSetList);
while (TObjString *obj = (TObjString*)next())
Printf("%s", obj->GetString().Data());
dataSetList->SetOwner();
delete dataSetList;
} else
Printf("Error getting a list of datasets");
}
TList *TProof::GetDataSet(const char *dataset)
{
TSocket *master;
if (fActiveSlaves->GetSize())
master = ((TSlave*)(fActiveSlaves->First()))->GetSocket();
else {
Error("GetDataSet", "No connection to the master!");
return 0;
}
TMessage nameMess(kPROOF_DATASETS);
nameMess << Int_t(kGetDataSet);
nameMess << TString(dataset);
if (Broadcast(nameMess) < 0)
Error("GetDataSet", "Sending request failed");
TMessage *retMess;
master->Recv(retMess);
TList *fileList = 0;
if (retMess->What() == kMESS_OK) {
if (!(fileList = (TList*)(retMess->ReadObject(TList::Class()))))
Error("GetDataSet", "Error reading list of files");
} else if (retMess->What() != kMESS_NOTOK)
Error("GetDataSet", "Wrong message type %d", retMess->What());
Collect();
delete retMess;
return fileList;
}
void TProof::ShowDataSet(const char *dataset)
{
TList *fileList;
if ((fileList = GetDataSet(dataset))) {
if (fileList->GetSize()) {
Printf("Files in %s:", dataset);
TIter next(fileList);
while (TFileInfo *obj = (TFileInfo*)next())
Printf("%s", obj->GetFirstUrl()->GetUrl());
} else
Printf("There are no files in %s", dataset);
delete fileList;
}
else
Printf("No such dataset: %s", dataset);
}
Int_t TProof::RemoveDataSet(const char *dataSet)
{
TSocket *master;
if (fActiveSlaves->GetSize())
master = ((TSlave*)(fActiveSlaves->First()))->GetSocket();
else {
Error("RemoveDataSet", "No connection to the master!");
return kError;
}
TMessage nameMess(kPROOF_DATASETS);
nameMess << Int_t(kRemoveDataSet);
nameMess << TString(dataSet);
if (Broadcast(nameMess) < 0)
Error("RemoveDataSet", "Sending request failed");
TMessage *mess;
TString errorMess;
master->Recv(mess);
Collect();
if (mess->What() != kMESS_OK) {
if (mess->What() != kMESS_NOTOK)
Error("RemoveDataSet", "unrecongnized message type: %d!",
mess->What());
delete mess;
return -1;
} else {
delete mess;
return 0;
}
}
Int_t TProof::VerifyDataSet(const char *dataSet)
{
Int_t nMissingFiles = 0;
TSocket *master;
if (fActiveSlaves->GetSize())
master = ((TSlave*)(fActiveSlaves->First()))->GetSocket();
else {
Error("VerifyDataSet", "No connection to the master!");
return kError;
}
TMessage nameMess(kPROOF_DATASETS);
nameMess << Int_t(kVerifyDataSet);
nameMess << TString(dataSet);
if (Broadcast(nameMess) < 0)
Error("VerifyDataSet", "Sending request failed");
TMessage *mess;
master->Recv(mess);
Collect();
if (mess->What() == kMESS_OK) {
TList *missingFiles;
missingFiles = (TList*)(mess->ReadObject(TList::Class()));
nMissingFiles = missingFiles->GetSize();
if (nMissingFiles == 0)
Printf("The files from %s dataset are all present on the cluster",
dataSet);
else {
Printf("The following files are missing from dataset %s ", dataSet);
Printf("at the moment:");
TIter next(missingFiles);
TFileInfo* fileInfo;
while ((fileInfo = (TFileInfo*)next())) {
Printf("\t%s", fileInfo->GetFirstUrl()->GetUrl());
}
}
missingFiles->SetOwner();
delete missingFiles;
} else if (mess->What() == kMESS_NOTOK) {
Printf("ValidateDataSet: no such dataset %s", dataSet);
delete mess;
return -1;
} else
Fatal("ValidateDataSet", "unknown message type %d", mess->What());
delete mess;
return nMissingFiles;
}
void TProof::InterruptCurrentMonitor()
{
if (fCurrentMonitor)
fCurrentMonitor->Interrupt();
}
void TProof::ActivateWorker(const char *ord)
{
ModifyWorkerLists(ord, kTRUE);
}
void TProof::DeactivateWorker(const char *ord)
{
ModifyWorkerLists(ord, kFALSE);
}
void TProof::ModifyWorkerLists(const char *ord, Bool_t add)
{
if (!ord || strlen(ord) <= 0) {
Info("ModifyWorkerLists",
"An ordinal number - e.g. \"0.4\" or \"*\" for all - is required as input");
return;
}
Bool_t fw = kTRUE;
Bool_t rs = kFALSE;
TList *in = (add) ? fInactiveSlaves : fActiveSlaves;
TList *out = (add) ? fActiveSlaves : fInactiveSlaves;
if (IsMaster()) {
fw = IsEndMaster() ? kFALSE : kTRUE;
if (in->GetSize() > 0) {
TIter nxw(in);
TSlave *wrk = 0;
while ((wrk = (TSlave *) nxw())) {
if (ord[0] == '*' || !strncmp(wrk->GetOrdinal(), ord, strlen(ord))) {
if (!out->FindObject(wrk)) {
out->Add(wrk);
if (add)
fActiveMonitor->Add(wrk->GetSocket());
}
in->Remove(wrk);
if (!add) {
fActiveMonitor->Remove(wrk->GetSocket());
wrk->SetStatus(TSlave::kInactive);
} else
wrk->SetStatus(TSlave::kActive);
fw = kFALSE;
rs = kTRUE;
if (ord[0] != '*')
break;
}
}
}
}
if (rs)
FindUniqueSlaves();
Int_t action = (add) ? (Int_t) kActivateWorker : (Int_t) kDeactivateWorker;
if (fw) {
TMessage mess(kPROOF_WORKERLISTS);
mess << action << TString(ord);
Broadcast(mess);
Collect();
}
}
TProof *TProof::Open(const char *cluster, const char *conffile,
const char *confdir, Int_t loglevel)
{
const char *pn = "TProof::Open";
if (!cluster) {
TPluginManager *pm = gROOT->GetPluginManager();
if (!pm) {
::Error(pn, "plugin manager not found");
return 0;
}
if (gROOT->IsBatch()) {
::Error(pn, "we are in batch mode, cannot show PROOF Session Viewer");
return 0;
}
TPluginHandler *sv = pm->FindHandler("TSessionViewer", "");
if (!sv) {
::Error(pn, "no plugin found for TSessionViewer");
return 0;
}
if (sv->LoadPlugin() == -1) {
::Error(pn, "plugin for TSessionViewer could not be loaded");
return 0;
}
sv->ExecPlugin(0);
return 0;
} else {
TUrl u(cluster);
TString o(u.GetOptions());
Int_t locid = -1;
Bool_t create = kFALSE;
if (o.Length() > 0) {
if (o.BeginsWith("N",TString::kIgnoreCase)) {
create = kTRUE;
} else if (o.IsDigit()) {
locid = o.Atoi();
}
u.SetOptions("");
}
TProofMgr *mgr = TProofMgr::Create(u.GetUrl());
TProof *proof = 0;
if (mgr && mgr->IsValid()) {
Bool_t attach = (create || mgr->IsProofd()) ? kFALSE : kTRUE;
if (attach) {
TProofDesc *d = 0;
if (locid < 0)
d = (TProofDesc *) mgr->QuerySessions("")->First();
else
d = (TProofDesc *) mgr->GetProofDesc(locid);
if (d) {
proof = (TProof*) mgr->AttachSession(d->GetLocalId());
if (!proof || !proof->IsValid()) {
if (locid)
::Error(pn, "new session could not be attached");
SafeDelete(proof);
}
}
}
if (!proof) {
proof = (TProof*) mgr->CreateSession(conffile, confdir, loglevel);
if (!proof || !proof->IsValid()) {
::Error(pn, "new session could not be created");
SafeDelete(proof);
}
}
}
return proof;
}
}
TProofMgr *TProof::Mgr(const char *url)
{
if (!url)
return (TProofMgr *)0;
return TProofMgr::Create(url);
}
void TProof::Reset(const char *url)
{
if (url) {
TProofMgr *mgr = TProof::Mgr(url);
if (mgr && mgr->IsValid())
mgr->Reset();
else
::Error("TProof::Reset",
"unable to initialize a valid manager instance");
}
}
const TList *TProof::GetEnvVars()
{
return fgProofEnvList;
}
void TProof::AddEnvVar(const char *name, const char *value)
{
if (gDebug > 0) ::Info("TProof::AddEnvVar","%s=%s", name, value);
if (fgProofEnvList == 0) {
fgProofEnvList = new TList;
fgProofEnvList->SetOwner();
} else {
TObject *o = fgProofEnvList->FindObject(name);
if (o != 0) {
fgProofEnvList->Remove(o);
}
}
fgProofEnvList->Add(new TNamed(name, value));
}
void TProof::DelEnvVar(const char *name)
{
if (fgProofEnvList == 0) return;
TObject *o = fgProofEnvList->FindObject(name);
if (o != 0) {
fgProofEnvList->Remove(o);
}
}
void TProof::ResetEnvVars()
{
if (fgProofEnvList == 0) return;
SafeDelete(fgProofEnvList);
}
void TProof::SaveWorkerInfo()
{
if (!IsMaster())
return;
if (!gProofServ) {
Error("SaveWorkerInfo","gProofServ undefined");
return;
}
const_cast<TProof*>(this)->AskStatistics();
if (!fSlaves && !fBadSlaves) {
Warning("SaveWorkerInfo","all relevant worker lists is undefined");
return;
}
TString fnwrk = Form("%s/.workers",
gSystem->DirName(gProofServ->GetSessionDir()));
FILE *fwrk = fopen(fnwrk.Data(),"w");
if (!fwrk) {
Error("SaveWorkerInfo",
"cannot open %s for writing (errno: %d)", fnwrk.Data(), errno);
return;
}
TIter nxa(fSlaves);
TSlave *wrk = 0;
while ((wrk = (TSlave *) nxa())) {
Int_t status = (fBadSlaves && fBadSlaves->FindObject(wrk)) ? 0 : 1;
fprintf(fwrk,"%s@%s:%d %d %s %s.log\n",
wrk->GetUser(), wrk->GetName(), wrk->GetPort(), status,
wrk->GetOrdinal(), wrk->GetWorkDir());
}
fclose(fwrk);
return;
}
ROOT page - Class index - Class Hierarchy - Top of the page
This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.