#include "TProofDraw.h"
#include "TProofPlayer.h"
#include "THashList.h"
#include "TEnv.h"
#include "TEventIter.h"
#include "TVirtualPacketizer.h"
#include "TSelector.h"
#include "TSocket.h"
#include "TProofServ.h"
#include "TProof.h"
#include "TProofOutputFile.h"
#include "TProofSuperMaster.h"
#include "TSlave.h"
#include "TClass.h"
#include "TROOT.h"
#include "TError.h"
#include "TException.h"
#include "MessageTypes.h"
#include "TMessage.h"
#include "TDSetProxy.h"
#include "TString.h"
#include "TSystem.h"
#include "TFile.h"
#include "TFileCollection.h"
#include "TFileInfo.h"
#include "TFileMerger.h"
#include "TProofDebug.h"
#include "TTimer.h"
#include "TMap.h"
#include "TPerfStats.h"
#include "TStatus.h"
#include "TEventList.h"
#include "TProofLimitsFinder.h"
#include "THashList.h"
#include "TSortedList.h"
#include "TTree.h"
#include "TEntryList.h"
#include "TDSet.h"
#include "TDrawFeedback.h"
#include "TNamed.h"
#include "TObjString.h"
#include "TQueryResult.h"
#include "TMD5.h"
#include "TMethodCall.h"
#include "TObjArray.h"
#include "TMutex.h"
#include "TH1.h"
#include "TVirtualMonitoring.h"
#include "TParameter.h"
#include "TOutputListSelectorDataMap.h"
#include "TStopwatch.h"
// Exception codes thrown by TStopTimer::Notify() and examined in the CATCH
// handler of TProofPlayer::Process() to tell a stop request from an abort.
#define kPEX_STOPPED 1001
#define kPEX_ABORTED 1002
// Set to kTRUE by TProofPlayer::Process() when the event loop ended on an
// abort (or unknown) exception; handed to SetStopTimer() when the stop timer
// is cleared at the end of processing.
static Bool_t gAbort = kFALSE;
class TAutoBinVal : public TNamed {
private:
Double_t fXmin, fXmax, fYmin, fYmax, fZmin, fZmax;
public:
TAutoBinVal(const char *name, Double_t xmin, Double_t xmax, Double_t ymin,
Double_t ymax, Double_t zmin, Double_t zmax) : TNamed(name,"")
{
fXmin = xmin; fXmax = xmax;
fYmin = ymin; fYmax = ymax;
fZmin = zmin; fZmax = zmax;
}
void GetAll(Double_t& xmin, Double_t& xmax, Double_t& ymin,
Double_t& ymax, Double_t& zmin, Double_t& zmax)
{
xmin = fXmin; xmax = fXmax;
ymin = fYmin; ymax = fYmax;
zmin = fZmin; zmax = fZmax;
}
};
////////////////////////////////////////////////////////////////////////////////
/// Timer (constructed as TTimer(1000, kFALSE)) whose Notify() raises the
/// kDispatchOneEvent bit on the associated player.
class TDispatchTimer : public TTimer {
private:
   TProofPlayer *fPlayer;   // player whose bit is raised; never deleted here

public:
   TDispatchTimer(TProofPlayer *p) : TTimer(1000, kFALSE), fPlayer(p) { }

   Bool_t Notify();
};
////////////////////////////////////////////////////////////////////////////////
/// Timer callback: raise kDispatchOneEvent on the player and re-arm the timer.
/// Always returns kTRUE.
Bool_t TDispatchTimer::Notify()
{
   if (gDebug > 0)
      printf("TDispatchTimer::Notify: called!\n");

   fPlayer->SetBit(TProofPlayer::kDispatchOneEvent);

   // Re-arm for the next period
   Reset();

   return kTRUE;
}
////////////////////////////////////////////////////////////////////////////////
/// Timer whose Notify() raises the kMaxProcTimeReached bit on the associated
/// player; the timeout 'to' is passed straight to the TTimer constructor.
class TProctimeTimer : public TTimer {
private:
   TProofPlayer *fPlayer;   // player whose bit is raised; never deleted here

public:
   TProctimeTimer(TProofPlayer *p, Long_t to) : TTimer(to, kFALSE), fPlayer(p) { }

   Bool_t Notify();
};
////////////////////////////////////////////////////////////////////////////////
/// Timer callback: raise kMaxProcTimeReached on the player. The timer is not
/// reset here. Always returns kTRUE.
Bool_t TProctimeTimer::Notify()
{
   if (gDebug > 0)
      printf("TProctimeTimer::Notify: called!\n");

   fPlayer->SetBit(TProofPlayer::kMaxProcTimeReached);

   return kTRUE;
}
////////////////////////////////////////////////////////////////////////////////
/// Timer that, when it fires, throws one of the kPEX_* exception codes
/// (abort or stop, depending on fAbort).
class TStopTimer : public TTimer {
private:
   Bool_t        fAbort;    // kTRUE: throw kPEX_ABORTED; kFALSE: throw kPEX_STOPPED
   TProofPlayer *fPlayer;   // associated player; never deleted here

public:
   TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to);

   Bool_t Notify();
};
////////////////////////////////////////////////////////////////////////////////
/// Build a stop timer for player 'p'.
/// 'to' is multiplied by 1000 before being handed to TTimer when it lies in
/// (0, 864000]; any other value results in a TTimer argument of 10.
/// 'abort' selects the exception code thrown by Notify().
TStopTimer::TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to)
   : TTimer(((to > 0 && to <= 864000) ? to * 1000 : 10), kFALSE),
     fAbort(abort), fPlayer(p)
{
   if (gDebug > 0)
      Info ("TStopTimer","enter: %d, timeout: %d", abort, to);

   if (gDebug > 1)
      Info ("TStopTimer","timeout set to %s ms", fTime.AsString());
}
////////////////////////////////////////////////////////////////////////////////
/// Timer callback: throw kPEX_ABORTED when the timer was created in abort
/// mode, kPEX_STOPPED otherwise.
Bool_t TStopTimer::Notify()
{
   if (gDebug > 0)
      printf("TStopTimer::Notify: called!\n");

   Throw(fAbort ? kPEX_ABORTED : kPEX_STOPPED);

   return kTRUE;
}
// ROOT class-implementation macro for TProofPlayer.
ClassImp(TProofPlayer)
// Static list pointer shared by all TProofPlayer instances; starts out null.
// NOTE(review): presumably filled by the draw-argument helpers, which are not
// visible in this part of the file - confirm.
THashList *TProofPlayer::fgDrawInputPars = 0;
////////////////////////////////////////////////////////////////////////////////
/// Default constructor. The TProof argument is not used.
/// Creates the input list and the progress-status object, clears the
/// processing-related status bits and, the first time a player is created in
/// a non-master TProofServ, installs a TProofLimitsFinder as limits finder.
TProofPlayer::TProofPlayer(TProof *)
   : fAutoBins(0), fOutput(0), fSelector(0), fCreateSelObj(kTRUE), fSelectorClass(0),
     fFeedbackTimer(0), fFeedbackPeriod(2000),
     fEvIter(0), fSelStatus(0),
     fTotalEvents(0), fReadBytesRun(0), fReadCallsRun(0), fProcessedRun(0),
     fQueryResults(0), fQuery(0), fPreviousQuery(0), fDrawQueries(0),
     fMaxDrawQueries(1), fStopTimer(0), fStopTimerMtx(0), fDispatchTimer(0),
     fProcTimeTimer(0), fProcTime(0),
     fOutputFile(0),
     fSaveMemThreshold(-1), fSavePartialResults(kFALSE), fSaveResultsPerPacket(kFALSE)
{
   fInput = new TList;
   fExitStatus = kFinished;
   fProgressStatus = new TProofProgressStatus();

   // Start with all processing-related bits cleared
   ResetBit(TProofPlayer::kDispatchOneEvent);
   ResetBit(TProofPlayer::kIsProcessing);
   ResetBit(TProofPlayer::kMaxProcTimeReached);
   ResetBit(TProofPlayer::kMaxProcTimeExtended);

   // One-time installation of the limits finder (non-master servers only)
   static Bool_t initLimitsFinder = kFALSE;
   if (!initLimitsFinder) {
      if (gProofServ && !gProofServ->IsMaster()) {
         THLimitsFinder::SetLimitsFinder(new TProofLimitsFinder);
         initLimitsFinder = kTRUE;
      }
   }
}
////////////////////////////////////////////////////////////////////////////////
/// Destructor.
/// The input list is emptied without deleting its content before being
/// destroyed; the selector, the timers, the event iterator, the list of query
/// results and the stop-timer mutex are deleted.
TProofPlayer::~TProofPlayer()
{
   // Objects in the input list are not deleted with the list
   fInput->Clear("nodelete");
   SafeDelete(fInput);

   SafeDelete(fSelector);
   SafeDelete(fFeedbackTimer);
   SafeDelete(fEvIter);
   SafeDelete(fQueryResults);
   SafeDelete(fDispatchTimer);
   SafeDelete(fProcTimeTimer);
   SafeDelete(fProcTime);
   SafeDelete(fStopTimer);

   // The mutex is allocated lazily in SetStopTimer() and owned by this
   // object: release it (it was previously leaked).
   SafeDelete(fStopTimerMtx);

   // NOTE(review): fProgressStatus and fAutoBins are also allocated by this
   // class and not released here; left untouched pending confirmation that
   // no other object keeps a pointer to them.
}
////////////////////////////////////////////////////////////////////////////////
/// Set (on = kTRUE) or clear (on = kFALSE) the kIsProcessing status bit.
void TProofPlayer::SetProcessing(Bool_t on)
{
   if (!on) {
      ResetBit(TProofPlayer::kIsProcessing);
      return;
   }
   SetBit(TProofPlayer::kIsProcessing);
}
////////////////////////////////////////////////////////////////////////////////
/// Request the end of processing.
/// The request is forwarded to the event iterator (if any) and the exit
/// status is set to kAborted or kStopped. A stop timer is then armed: with a
/// value of 1 for an abort, with 'timeout' for a stop (only when positive).
void TProofPlayer::StopProcess(Bool_t abort, Int_t timeout)
{
   if (gDebug > 0)
      Info ("StopProcess","abort: %d, timeout: %d", abort, timeout);

   if (fEvIter != 0)
      fEvIter->StopProcess(abort);

   const Bool_t isAbort = (abort == kTRUE);
   fExitStatus = isAbort ? kAborted : kStopped;

   // Aborts always use the shortest timer; stops honour the caller's timeout
   const Long_t to = isAbort ? 1 : timeout;
   if (to > 0)
      SetStopTimer(kTRUE, abort, to);
}
////////////////////////////////////////////////////////////////////////////////
/// Delete any existing dispatch timer and clear kDispatchOneEvent; when 'on'
/// is true, create and start a new TDispatchTimer.
void TProofPlayer::SetDispatchTimer(Bool_t on)
{
   SafeDelete(fDispatchTimer);
   ResetBit(TProofPlayer::kDispatchOneEvent);

   if (!on)
      return;

   fDispatchTimer = new TDispatchTimer(this);
   fDispatchTimer->Start();
}
////////////////////////////////////////////////////////////////////////////////
/// Replace the stop timer.
/// Any existing timer is deleted; when 'on' is true a new TStopTimer is
/// created with the given 'abort' mode and 'timeout' and started. The whole
/// operation runs under fStopTimerMtx, which is created on first use.
void TProofPlayer::SetStopTimer(Bool_t on, Bool_t abort, Int_t timeout)
{
   // Create the mutex on first use
   if (!fStopTimerMtx)
      fStopTimerMtx = new TMutex(kTRUE);
   R__LOCKGUARD(fStopTimerMtx);

   // Drop whatever timer was active
   SafeDelete(fStopTimer);

   if (!on) {
      if (gDebug > 0)
         Info ("SetStopTimer", "timer STOPPED");
      return;
   }

   fStopTimer = new TStopTimer(this, abort, timeout);
   fStopTimer->Start();
   if (gDebug > 0)
      Info ("SetStopTimer", "%s timer STARTED (timeout: %d)",
                            (abort ? "ABORT" : "STOP"), timeout);
}
////////////////////////////////////////////////////////////////////////////////
/// Add query result 'q' to the list of results.
/// Non-draw queries replace an equal entry already in the list (the old one
/// is deleted) and are inserted after the last scanned entry whose start time
/// is not later than that of 'q' (or first, if there is none).
/// Draw queries are kept only on the client and at most fMaxDrawQueries of
/// them: when the limit is reached the first draw query found in the list is
/// removed and deleted before the new one is appended.
void TProofPlayer::AddQueryResult(TQueryResult *q)
{
   if (!q) {
      Warning("AddQueryResult","query undefined - do nothing");
      return;
   }

   if (q->IsDraw()) {
      // Draw queries are recorded on the client only
      if (!IsClient())
         return;

      if (fDrawQueries == fMaxDrawQueries && fMaxDrawQueries > 0) {
         // Limit reached: make room by dropping the first draw query found
         TIter nextRes(fQueryResults);
         TQueryResult *cur = 0;
         while ((cur = (TQueryResult *) nextRes())) {
            if (cur->IsDraw()) {
               fDrawQueries--;
               fQueryResults->Remove(cur);
               delete cur;
               break;
            }
         }
      }
      if (fDrawQueries >= 0 && fDrawQueries < fMaxDrawQueries) {
         fDrawQueries++;
         if (!fQueryResults)
            fQueryResults = new TList;
         fQueryResults->Add(q);
      }
      return;
   }

   // Regular query: the first one just creates the list
   if (!fQueryResults) {
      fQueryResults = new TList;
      fQueryResults->Add(q);
      return;
   }

   // Scan the list: remove a duplicate if found, and remember the insertion point
   TIter nextRes(fQueryResults);
   TQueryResult *cur = 0;
   TQueryResult *before = 0;
   while ((cur = (TQueryResult *) nextRes())) {
      if (*cur == *q) {
         fQueryResults->Remove(cur);
         delete cur;
         break;
      }
      if (cur->GetStartTime().Convert() <= q->GetStartTime().Convert())
         before = cur;
   }

   if (before) {
      fQueryResults->AddAfter(before, q);
   } else {
      fQueryResults->AddFirst(q);
   }
}
void TProofPlayer::RemoveQueryResult(const char *ref)
{
if (fQueryResults) {
TIter nxq(fQueryResults);
TQueryResult *qr = 0;
while ((qr = (TQueryResult *) nxq())) {
if (qr->Matches(ref)) {
fQueryResults->Remove(qr);
delete qr;
}
}
}
}
////////////////////////////////////////////////////////////////////////////////
/// Return the first query result matching 'ref'. When 'ref' is null or empty
/// the last entry of the list is returned. Returns 0 when there is no list
/// or nothing matches.
TQueryResult *TProofPlayer::GetQueryResult(const char *ref)
{
   if (!fQueryResults)
      return (TQueryResult *)0;

   // No reference given: the last entry is the answer
   if (!ref || strlen(ref) <= 0)
      return (TQueryResult *) fQueryResults->Last();

   TIter nextRes(fQueryResults);
   TQueryResult *cur = 0;
   while ((cur = (TQueryResult *) nextRes())) {
      if (cur->Matches(ref))
         return cur;
   }

   return (TQueryResult *)0;
}
////////////////////////////////////////////////////////////////////////////////
/// Make 'q' the current query; the query that was current until now is
/// remembered in fPreviousQuery.
void TProofPlayer::SetCurrentQuery(TQueryResult *q)
{
   // Keep track of the outgoing query before switching
   fPreviousQuery = fQuery;
   fQuery         = q;
}
////////////////////////////////////////////////////////////////////////////////
/// Append object 'inp' to the input list.
void TProofPlayer::AddInput(TObject *inp)
{
   fInput->Add(inp);
}
////////////////////////////////////////////////////////////////////////////////
/// Clear the input list by calling its Clear() with default options.
void TProofPlayer::ClearInput()
{
   fInput->Clear();
}
////////////////////////////////////////////////////////////////////////////////
/// Look up object 'name' in the output list. Returns 0 when there is no
/// output list (or, per FindObject, when nothing is found).
TObject *TProofPlayer::GetOutput(const char *name) const
{
   if (!fOutput)
      return 0;

   return fOutput->FindObject(name);
}
////////////////////////////////////////////////////////////////////////////////
/// Return the output list: the player's own list when set, otherwise the
/// output list of the current query (0 when there is no current query).
TList *TProofPlayer::GetOutputList() const
{
   if (fOutput)
      return fOutput;

   if (fQuery)
      return fQuery->GetOutputList();

   return 0;
}
////////////////////////////////////////////////////////////////////////////////
/// Re-create and initialise the selector described by query result 'qr'.
/// The selector name is taken from the implementation macro stored in 'qr'.
/// For selectors given as source files, the sources stored in the query are
/// exported to a temporary directory unless the files found on the macro path
/// have the same checksums. For compiled selectors, if the first attempt
/// fails, the libraries listed in the query are loaded and the instantiation
/// is retried.
/// Returns 0 on success, -1 on failure.
Int_t TProofPlayer::ReinitSelector(TQueryResult *qr)
{
Int_t rc = 0;
// Nothing can be done without a query
if (!qr) {
Info("ReinitSelector", "query undefined - do nothing");
return -1;
}
// The selector name is the name of the stored implementation macro
TString selec = qr->GetSelecImp()->GetName();
if (selec.Length() <= 0) {
Info("ReinitSelector", "selector name undefined - do nothing");
return -1;
}
// Classify the selector: standard draw selector, compiled selector (no '.'
// in the name and not a standard draw one), or selector from source files
Bool_t stdselec = TSelector::IsStandardDraw(selec);
Bool_t compselec = (selec.Contains(".") || stdselec) ? kFALSE : kTRUE;
// Include path to be restored at the end (filled only if it gets modified)
TString ipathold;
if (!stdselec && !compselec) {
// Selector from source files: decide whether the stored sources must be
// exported, by comparing them with the files found on the macro path
Bool_t expandselec = kTRUE;
TString dir, ipath;
char *selc = gSystem->Which(TROOT::GetMacroPath(), selec, kReadPermission);
if (selc) {
// Checksums of the current files (cur) and of the stored macros (old)
TMD5 *md5icur = 0, *md5iold = 0, *md5hcur = 0, *md5hold = 0;
md5icur = TMD5::FileChecksum(selc);
md5iold = qr->GetSelecImp()->Checksum();
// The header path is derived from the implementation path by replacing
// the extension with ".h"
TString selh(selc);
Int_t dot = selh.Last('.');
if (dot != kNPOS) selh.Remove(dot);
selh += ".h";
if (!gSystem->AccessPathName(selh, kReadPermission))
md5hcur = TMD5::FileChecksum(selh);
md5hold = qr->GetSelecHdr()->Checksum();
// Export is skipped only if all four checksums exist and match pairwise
if (md5hcur && md5hold && md5icur && md5iold)
if (*md5hcur == *md5hold && *md5icur == *md5iold)
expandselec = kFALSE;
SafeDelete(md5icur);
SafeDelete(md5hcur);
SafeDelete(md5iold);
SafeDelete(md5hold);
if (selc) delete [] selc;
}
Bool_t ok = kTRUE;
if (expandselec) {
// Save the stored sources into a freshly created, uniquely named
// directory under the temporary directory
ok = kFALSE;
TUUID u;
dir = Form("%s/%s",gSystem->TempDirectory(),u.AsString());
if (!(gSystem->MakeDirectory(dir))) {
// From here on 'selec' is the full path of the exported implementation
selec = Form("%s/%s",dir.Data(),selec.Data());
qr->GetSelecImp()->SaveSource(selec);
TString seleh = Form("%s/%s",dir.Data(),qr->GetSelecHdr()->GetName());
qr->GetSelecHdr()->SaveSource(seleh);
// Put the export directory first in the include path, remembering the
// previous value so that it can be restored before returning
ipathold = gSystem->GetIncludePath();
ipath = Form("-I%s %s", dir.Data(), gSystem->GetIncludePath());
gSystem->SetIncludePath(ipath.Data());
ok = kTRUE;
}
}
// Append to the selector name whatever follows the last '#' in the options
// NOTE(review): presumably the compilation-mode suffix - confirm
TString opt(qr->GetOptions());
Ssiz_t id = opt.Last('#');
if (id != kNPOS && id < opt.Length() - 1)
selec += opt(id + 1, opt.Length());
if (!ok) {
Info("ReinitSelector", "problems locating or exporting selector files");
return -1;
}
}
// Drop the previous selector, if any
SafeDelete(fSelector);
fSelectorClass = 0;
// For compiled selectors the error-ignore level is raised to kBreak during
// the first instantiation attempt and restored right after it
Int_t iglevelsave = gErrorIgnoreLevel;
if (compselec)
gErrorIgnoreLevel = kBreak;
if ((fSelector = TSelector::GetSelector(selec))) {
if (compselec)
gErrorIgnoreLevel = iglevelsave;
fSelectorClass = fSelector->IsA();
fSelector->SetOption(qr->GetOptions());
} else {
if (compselec) {
gErrorIgnoreLevel = iglevelsave;
// Try to load the libraries recorded in the query, then retry once if
// at least one Load() call returned 0
if (strlen(qr->GetLibList()) > 0) {
TString sl(qr->GetLibList());
TObjArray *oa = sl.Tokenize(" ");
if (oa) {
Bool_t retry = kFALSE;
TIter nxl(oa);
TObjString *os = 0;
while ((os = (TObjString *) nxl())) {
TString lib = gSystem->BaseName(os->GetName());
if (lib != "lib") {
// Turn a "-lXxx" token into "libXxx"
lib.ReplaceAll("-l", "lib");
if (gSystem->Load(lib) == 0)
retry = kTRUE;
}
}
if (retry)
fSelector = TSelector::GetSelector(selec);
}
}
}
if (!fSelector) {
if (compselec)
Info("ReinitSelector", "compiled selector re-init failed:"
" automatic reload unsuccessful:"
" please load manually the correct library");
rc = -1;
}
}
if (fSelector) {
// Initialise the selector with the input list stored in the query
fSelector->SetInputList(qr->GetInputList());
if (stdselec) {
((TProofDraw *)fSelector)->DefVar();
} else {
fSelector->Begin(0);
}
}
// Restore the include path if it was changed above
if (ipathold.Length() > 0)
gSystem->SetIncludePath(ipathold.Data());
return rc;
}
////////////////////////////////////////////////////////////////////////////////
/// Not available in this class: the call is reported through MayNotUse()
/// and -1 is returned.
Int_t TProofPlayer::AddOutputObject(TObject *)
{
   MayNotUse("AddOutputObject");

   return -1;
}
////////////////////////////////////////////////////////////////////////////////
/// Not available in this class: the call is reported through MayNotUse().
void TProofPlayer::AddOutput(TList *)
{
   MayNotUse("AddOutput");
}
////////////////////////////////////////////////////////////////////////////////
/// Not available in this class: the call is reported through MayNotUse().
void TProofPlayer::StoreOutput(TList *)
{
   MayNotUse("StoreOutput");
}
////////////////////////////////////////////////////////////////////////////////
/// Not available in this class: the call is reported through MayNotUse().
void TProofPlayer::StoreFeedback(TObject *, TList *)
{
   MayNotUse("StoreFeedback");
}
////////////////////////////////////////////////////////////////////////////////
/// Progress notification (two-argument form).
/// Not available in this class: the call is reported through MayNotUse().
void TProofPlayer::Progress(Long64_t, Long64_t)
{
   MayNotUse("Progress");
}

////////////////////////////////////////////////////////////////////////////////
/// Progress notification (seven-argument form).
/// Not available in this class: the call is reported through MayNotUse().
void TProofPlayer::Progress(Long64_t, Long64_t, Long64_t,
                            Float_t, Float_t, Float_t, Float_t)
{
   MayNotUse("Progress");
}

////////////////////////////////////////////////////////////////////////////////
/// Progress notification (TProofProgressInfo form).
/// Not available in this class: the call is reported through MayNotUse().
void TProofPlayer::Progress(TProofProgressInfo *)
{
   MayNotUse("Progress");
}
////////////////////////////////////////////////////////////////////////////////
/// Not available in this class: the call is reported through MayNotUse().
void TProofPlayer::Feedback(TList *)
{
   MayNotUse("Feedback");
}
////////////////////////////////////////////////////////////////////////////////
/// Allocate and return a new TDrawFeedback built from session 'p'.
/// The object is created with 'new' and not stored by the player.
TDrawFeedback *TProofPlayer::CreateDrawFeedback(TProof *p)
{
   TDrawFeedback *fb = new TDrawFeedback(p);
   return fb;
}
////////////////////////////////////////////////////////////////////////////////
/// Forward option 'opt' to draw-feedback object 'f'; no-op when 'f' is null.
void TProofPlayer::SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt)
{
   if (!f)
      return;

   f->SetOption(opt);
}
////////////////////////////////////////////////////////////////////////////////
/// Delete draw-feedback object 'f'.
void TProofPlayer::DeleteDrawFeedback(TDrawFeedback *f)
{
   delete f;
}
////////////////////////////////////////////////////////////////////////////////
/// Save the current content of the output list to file.
/// Saving takes place when 'force' is true, or when partial-result saving is
/// enabled and either 'queryend' is true or per-packet saving is enabled.
/// On the first call the output file path is built from the data directory
/// and the file is (re)created; later calls open the same file in update mode.
/// At query end the objects written are removed from the output list and a
/// TProofOutputFile describing the file is added to it.
/// Returns 0 on success or when saving is disabled, -1 on error.
Int_t TProofPlayer::SavePartialResults(Bool_t queryend, Bool_t force)
{
// Decide whether anything has to be saved at this point
Bool_t save = (force || (fSavePartialResults &&
(queryend || fSaveResultsPerPacket))) ? kTRUE : kFALSE;
if (!save) {
PDB(kOutput, 2)
Info("SavePartialResults", "partial result saving disabled");
return 0;
}
// Both the server instance and the output list are required
if (!gProofServ) {
Error("SavePartialResults", "gProofServ undefined: something really wrong going on!!!");
return -1;
}
if (!fOutput) {
Error("SavePartialResults", "fOutput undefined: something really wrong going on!!!");
return -1;
}
PDB(kOutput, 1)
Info("SavePartialResults", "start saving partial results {%d,%d,%d,%d}",
queryend, force, fSavePartialResults, fSaveResultsPerPacket);
PDB(kOutput, 2) Info("SavePartialResults", "fEvIter: %p", fEvIter);
// List of packets known to the event iterator, if any
TList *packets = (fEvIter) ? fEvIter->GetPackets() : 0;
PDB(kOutput, 2) Info("SavePartialResults", "list of packets: %p, sz: %d",
packets, (packets ? packets->GetSize(): -1));
// Open mode: update an existing file, unless the path is defined just below
const char *oopt = "UPDATE";
// Note: when the path is already set, baseName holds the full path
TString baseName(fOutputFilePath);
if (fOutputFilePath.IsNull()) {
// First call: build the file name from session tag and query sequence
// number, place it in the data directory and create the file from scratch
baseName.Form("output-%s.q%d.root", gProofServ->GetTopSessionTag(), gProofServ->GetQuerySeqNum());
if (gProofServ->GetDataDirOpts() && strlen(gProofServ->GetDataDirOpts()) > 0) {
fOutputFilePath.Form("%s/%s?%s", gProofServ->GetDataDir(), baseName.Data(),
gProofServ->GetDataDirOpts());
} else {
fOutputFilePath.Form("%s/%s", gProofServ->GetDataDir(), baseName.Data());
}
Info("SavePartialResults", "file with (partial) output: '%s'", fOutputFilePath.Data());
oopt = "RECREATE";
}
if (!(fOutputFile = TFile::Open(fOutputFilePath, oopt)) ||
(fOutputFile && fOutputFile->IsZombie())) {
Error("SavePartialResults", "cannot open '%s' for writing", fOutputFilePath.Data());
SafeDelete(fOutputFile);
return -1;
}
// Remember the current directory: it is restored after writing
TDirectory *curdir = gDirectory;
fOutputFile->cd();
// Write the list of packets as a single key, after attempting to create a
// "packets" sub-directory and moving into it when that succeeds
if (packets) {
TDirectory *packetsDir = fOutputFile->mkdir("packets");
if (packetsDir) packetsDir->cd();
packets->Write(0, TObject::kSingleKey | TObject::kOverwrite);
fOutputFile->cd();
}
// Loop over the output objects; 'torm' collects those to be removed from
// the output list once written at query end
Bool_t notempty = kFALSE;
TList torm;
TIter nxo(fOutput);
TObject *o = 0;
while ((o = nxo())) {
// Objects never saved here: TProofOutputFile instances, objects whose name
// starts with "PROOF_", the selector data map and "MissingFiles"
if (o->InheritsFrom(TProofOutputFile::Class())) continue;
if (!strncmp(o->GetName(), "PROOF_", 6)) continue;
if (o->InheritsFrom(TOutputListSelectorDataMap::Class())) continue;
if (!strcmp(o->GetName(), "MissingFiles")) continue;
if (o->InheritsFrom("TTree")) {
// Trees not already attached to a file are attached to the output file
TTree *t = (TTree *) o;
TDirectory *d = t->GetDirectory();
if (!d || (d && !d->InheritsFrom("TFile"))) {
t->SetDirectory(fOutputFile);
}
if (t->GetDirectory() == fOutputFile) {
if (queryend) {
// Final write: afterwards the tree is detached from the file and
// scheduled for removal from the output list
o->Write(0, TObject::kOverwrite);
notempty = kTRUE;
torm.Add(o);
t->SetDirectory(0);
} else {
// Not at query end: only call SetAutoFlush() with default arguments
t->SetAutoFlush();
}
}
} else if (queryend || fSaveResultsPerPacket) {
// Any other object is written at query end or when saving per packet
o->Write(0, TObject::kOverwrite);
notempty = kTRUE;
if (queryend) torm.Add(o);
}
}
// Back to the directory that was current on entry
gDirectory = curdir;
if (notempty) {
// Something was written: make sure the output list carries a
// TProofOutputFile describing the file
if (!fOutput->FindObject(baseName)) {
TProofOutputFile *po = 0;
// The destination comes from the "PROOF_DefaultOutputOption" input
// object when present, otherwise from the output file path
TNamed *nm = (TNamed *) fInput->FindObject("PROOF_DefaultOutputOption");
TString oname = (nm) ? nm->GetTitle() : fOutputFilePath.Data();
if (nm && oname.BeginsWith("ds:")) {
// "ds:" destination: strip the prefix and expand the "<qtag>" placeholder
oname.Replace(0, 3, "");
TString qtag =
TString::Format("%s_q%d", gProofServ->GetTopSessionTag(), gProofServ->GetQuerySeqNum());
oname.ReplaceAll("<qtag>", qtag);
po = new TProofOutputFile(baseName, "DRO", oname.Data());
} else {
// Any other destination: output file created with option "M"
Bool_t hasddir = kFALSE;
po = new TProofOutputFile(baseName, "M");
if (oname.BeginsWith("of:")) oname.Replace(0, 3, "");
if (gProofServ->IsTopMaster()) {
// On the top master, "file" URLs are filtered through the local root
// and prefixed with the local server
if (!strcmp(TUrl(oname, kTRUE).GetProtocol(), "file")) {
TString dsrv;
TProofServ::GetLocalServer(dsrv);
TProofServ::FilterLocalroot(oname, dsrv);
oname.Insert(0, dsrv);
}
} else {
if (nm) {
oname.ReplaceAll("<file>", baseName);
} else {
// No explicit destination: use the "<datadir>" placeholder
oname.Form("<datadir>/%s", baseName.Data());
hasddir = kTRUE;
}
}
po->SetOutputFileName(oname.Data());
// With the "<datadir>" placeholder the kOutputFileNameSet bit is
// cleared again after setting the name
if (hasddir)
po->ResetBit(TProofOutputFile::kOutputFileNameSet);
po->SetName(gSystem->BaseName(oname.Data()));
}
po->AdoptFile(fOutputFile);
fOutput->Add(po);
po->SetBit(TProofOutputFile::kSwapFile);
}
}
// Done with the file
fOutputFile->Close();
SafeDelete(fOutputFile);
// At query end the objects written are taken out of the output list
if (queryend && torm.GetSize() > 0) {
TIter nxrm(&torm);
while ((o = nxrm())) { fOutput->Remove(o); }
}
// The temporary list must not delete its content when going out of scope
torm.SetOwner(kFALSE);
PDB(kOutput, 1)
Info("SavePartialResults", "partial results saved to file");
return 0;
}
////////////////////////////////////////////////////////////////////////////////
/// Make sure that a valid selector object is available.
/// When 'selector_file' is a non-empty string the selector is loaded from it
/// (replacing the current one, which is deleted only if it was created by the
/// player); when running inside a TProofServ the load is bracketed by the
/// cache lock and by copies from/to the cache. Otherwise the selector object
/// already set on the player is used.
/// Returns 0 on success, -1 if no selector could be obtained.
Int_t TProofPlayer::AssertSelector(const char *selector_file)
{
   if (selector_file && strlen(selector_file)) {
      // Delete the previous selector only if it was created here
      if (fCreateSelObj) SafeDelete(fSelector);

      // Get the selector files from the cache, under lock
      if (gProofServ) {
         gProofServ->GetCacheLock()->Lock();
         gProofServ->CopyFromCache(selector_file, 1);
      }

      if (!(fSelector = TSelector::GetSelector(selector_file))) {
         Error("AssertSelector", "cannot load: %s", selector_file );
         // The lock was taken only if gProofServ is defined: release it under
         // the same condition (an unguarded access here crashed whenever the
         // load failed outside a TProofServ)
         if (gProofServ)
            gProofServ->GetCacheLock()->Unlock();
         return -1;
      }

      // Save the (possibly rebuilt) files to the cache and release the lock
      if (gProofServ) {
         gProofServ->CopyToCache(selector_file, 1);
         gProofServ->GetCacheLock()->Unlock();
      }

      // The selector is now owned by the player
      fCreateSelObj = kTRUE;
      Info("AssertSelector", "Processing via filename");
   } else if (!fSelector) {
      Error("AssertSelector", "no TSelector object define : cannot continue!");
      return -1;
   } else {
      Info("AssertSelector", "Processing via TSelector object");
   }

   return 0;
}
////////////////////////////////////////////////////////////////////////////////
/// Transfer the counters of the current run to the progress-status object:
/// entries processed since the last update are added, bytes read and read
/// calls are set relative to the values recorded at the start of the run.
/// The per-run entry counter is then reset. No-op without a status object.
void TProofPlayer::UpdateProgressInfo()
{
   if (!fProgressStatus)
      return;

   fProgressStatus->IncEntries(fProcessedRun);
   fProgressStatus->SetBytesRead(TFile::GetFileBytesRead()-fReadBytesRun);
   fProgressStatus->SetReadCalls(TFile::GetFileReadCalls()-fReadCallsRun);
   fProgressStatus->SetLastUpdate();

   // Notify the monitoring system, if any
   if (gMonitoringWriter) {
      gMonitoringWriter->SendProcessingProgress(fProgressStatus->GetEntries(),
                                                fReadBytesRun, kFALSE);
   }

   fProcessedRun = 0;
}
////////////////////////////////////////////////////////////////////////////////
/// Process data set 'dset' with the selector loaded from 'selector_file' (or
/// with the selector object already set on the player when 'selector_file' is
/// null or empty).
///
/// The method: asserts the selector and calls its Begin()/SlaveBegin();
/// creates the event iterator for the range defined by 'first' and 'nentries';
/// loops over the packets delivered by the iterator, calling the selector for
/// each entry while monitoring memory usage and the per-packet maximum
/// processing time; finally saves partial results if required and, unless the
/// run was aborted, calls SlaveTerminate()/Terminate().
///
/// 'option' is passed to the selector via SetOption().
/// Returns 0 when the processing sequence completed (including stopped or
/// aborted runs), -1 when the selector could not be set up or an exception
/// was caught during the set-up phase.
///
/// Accesses to gProofServ are guarded throughout, since the method may run
/// where no TProofServ instance exists.
Long64_t TProofPlayer::Process(TDSet *dset, const char *selector_file,
                               Option_t *option, Long64_t nentries,
                               Long64_t first)
{
   PDB(kGlobal,1) Info("Process","Enter");

   fExitStatus = kFinished;
   fOutput = 0;

   TCleanup clean(this);

   fSelectorClass = 0;
   TString wmsg;
   TRY {
      if (AssertSelector(selector_file) != 0 || !fSelector) {
         Error("Process", "cannot assert the selector object");
         return -1;
      }

      fSelectorClass = fSelector->IsA();
      Int_t version = fSelector->Version();
      if (version == 0 && IsClient()) fSelector->GetOutputList()->Clear();

      // The output list is the one of the selector
      fOutput = (THashList *) fSelector->GetOutputList();

      if (gProofServ)
         TPerfStats::Start(fInput, fOutput);

      // Status object: travels with the output list
      fSelStatus = new TStatus;
      fOutput->Add(fSelStatus);

      fSelector->SetOption(option);
      fSelector->SetInputList(fInput);

      // If the number of entries is not given, on a non-parallel master it is
      // accumulated from the validated data-set elements
      fTotalEvents = nentries;
      if (fTotalEvents < 0 && gProofServ &&
          gProofServ->IsMaster() && !gProofServ->IsParallel()) {
         dset->Validate();
         dset->Reset();
         TDSetElement *e = 0;
         while ((e = dset->Next())) {
            fTotalEvents += e->GetNum();
         }
      }

      dset->Reset();

      // Propagate cache-related input parameters to the environment
      Int_t useTreeCache = 1;
      if (TProof::GetParameter(fInput, "PROOF_UseTreeCache", useTreeCache) == 0) {
         if (useTreeCache > -1 && useTreeCache < 2)
            gEnv->SetValue("ProofPlayer.UseTreeCache", useTreeCache);
      }
      Long64_t cacheSize = -1;
      if (TProof::GetParameter(fInput, "PROOF_CacheSize", cacheSize) == 0) {
         TString sz = TString::Format("%lld", cacheSize);
         gEnv->SetValue("ProofPlayer.CacheSize", sz.Data());
      }
      Int_t useParallelUnzip = 0;
      if (TProof::GetParameter(fInput, "PROOF_UseParallelUnzip", useParallelUnzip) == 0) {
         if (useParallelUnzip > -1 && useParallelUnzip < 2)
            gEnv->SetValue("ProofPlayer.UseParallelUnzip", useParallelUnzip);
      }
      Int_t dontCacheFiles = 0;
      if (TProof::GetParameter(fInput, "PROOF_DontCacheFiles", dontCacheFiles) == 0) {
         if (dontCacheFiles == 1)
            gEnv->SetValue("ProofPlayer.DontCacheFiles", 1);
      }

      fEvIter = TEventIter::Create(dset, fSelector, first, nentries);

      // Partial-results settings: input parameter first, environment second.
      // Units digit > 0 enables saving; a value >= 10 enables per-packet saving
      Int_t opt = 0;
      if (TProof::GetParameter(fInput, "PROOF_SavePartialResults", opt) != 0) {
         opt = gEnv->GetValue("ProofPlayer.SavePartialResults", 0);
      }
      fSaveResultsPerPacket = (opt >= 10) ? kTRUE : kFALSE;
      fSavePartialResults = (opt%10 > 0) ? kTRUE : kFALSE;
      Info("Process", "save partial results? %d per-packet? %d", fSavePartialResults, fSaveResultsPerPacket);

      // Resident-memory threshold above which partial saving is switched on
      // (see CheckMemUsage): fraction of the physical RAM per CPU
      Float_t memfrac = gEnv->GetValue("ProofPlayer.SaveMemThreshold", -1.);
      if (memfrac > 0.) {
         SysInfo_t si;
         if (gSystem->GetSysInfo(&si) == 0) {
            fSaveMemThreshold = (Long_t) ((memfrac * si.fPhysRam * 1024.) / si.fCpus);
            Info("Process", "memory threshold for saving objects to file set to %ld kB",
                            fSaveMemThreshold);
         } else {
            Error("Process", "cannot get SysInfo_t (!)");
         }
      }

      // Selector initialisation
      if (version == 0) {
         PDB(kLoop,1) Info("Process","Call Begin(0)");
         fSelector->Begin(0);
      } else {
         if (IsClient()) {
            PDB(kLoop,1) Info("Process","Call Begin(0)");
            fSelector->Begin(0);
         }
         if (!fSelStatus->TestBit(TStatus::kNotOk)) {
            PDB(kLoop,1) Info("Process","Call SlaveBegin(0)");
            fSelector->SlaveBegin(0);
         }
      }

   } CATCH(excode) {
      ResetBit(TProofPlayer::kIsProcessing);
      Error("Process","exception %d caught", excode);
      // The cache lock exists only within a TProofServ
      if (gProofServ)
         gProofServ->GetCacheLock()->Unlock();
      return -1;
   } ENDTRY;

   // Set up the output file, if partial saving is enabled
   if (SavePartialResults(kFALSE) < 0)
      Warning("Process", "problems seetting up file-object swapping");

   SetupFeedback();

   if (gMonitoringWriter)
      gMonitoringWriter->SendProcessingStatus("STARTED",kTRUE);

   PDB(kLoop,1)
      Info("Process","Looping over Process()");

   // Reference values for the I/O counters of this run
   fReadBytesRun = TFile::GetFileBytesRead();
   fReadCallsRun = TFile::GetFileReadCalls();
   fProcessedRun = 0;

   if (gMonitoringWriter)
      gMonitoringWriter->SendProcessingProgress(0,0,kTRUE);

   SetDispatchTimer(kTRUE);

   gAbort = kFALSE;
   Long64_t entry;
   fProgressStatus->Reset();
   if (gProofServ) gProofServ->ResetBit(TProofServ::kHighMemory);

   TRY {

      // Memory-check frequency: from the input list, disabled if missing
      Int_t mrc = -1;
      Long64_t memlogfreq = -1;
      if (((mrc = TProof::GetParameter(fInput, "PROOF_MemLogFreq", memlogfreq))) != 0) memlogfreq = -1;
      Long64_t singleshot = 1;
      Bool_t warnHWMres = kTRUE, warnHWMvir = kTRUE;
      TString lastMsg("(unfortunately no detailed info is available about current packet)");

      // Initial memory check
      if (!CheckMemUsage(singleshot, warnHWMres, warnHWMvir, wmsg)) {
         Error("Process", "%s", wmsg.Data());
         // Outside a TProofServ there is no ordinal to report
         wmsg.Insert(0, TString::Format("ERROR:%s, after SlaveBegin(), ",
                                        gProofServ ? gProofServ->GetOrdinal() : "n/a"));
         fSelStatus->Add(wmsg.Data());
         if (gProofServ) {
            gProofServ->SendAsynMessage(wmsg.Data());
            gProofServ->SetBit(TProofServ::kHighMemory);
         }
         fExitStatus = kStopped;
         ResetBit(TProofPlayer::kIsProcessing);
      } else if (!wmsg.IsNull()) {
         Warning("Process", "%s", wmsg.Data());
      }

      TPair *currentElem = 0;
      // Packet description, filled by the event iterator
      Long64_t fst = -1, num;
      TEntryList *enl = 0;
      TEventList *evl = 0;
      Long_t maxproctime = -1;
      Bool_t newrun = kFALSE;
      // Loop over the packets
      while ((fEvIter->GetNextPacket(fst, num, &enl, &evl) != -1) &&
              !fSelStatus->TestBit(TStatus::kNotOk) &&
              fSelector->GetAbort() == TSelector::kContinue) {
         SetBit(TProofPlayer::kIsProcessing);

         if (dset->Current()) {
            // Publish the element being processed in the input list, under
            // the name "PROOF_CurrentElement"
            if (!currentElem) {
               currentElem = new TPair(new TObjString("PROOF_CurrentElement"), dset->Current());
               fInput->Add(currentElem);
            } else {
               if (currentElem->Value() != dset->Current()) {
                  currentElem->SetValue(dset->Current());
               } else if (dset->Current()->TestBit(TDSetElement::kNewRun)) {
                  dset->Current()->ResetBit(TDSetElement::kNewRun);
               }
            }
            // Update the message registered with TProofServ::SetLastMsg()
            if (dset->Current()->TestBit(TDSetElement::kNewPacket)) {
               if (dset->TestBit(TDSet::kEmpty)) {
                  lastMsg = "check logs for possible stacktrace - last cycle:";
               } else {
                  TDSetElement *elem = dynamic_cast<TDSetElement *>(currentElem->Value());
                  TString fn = (elem) ? elem->GetFileName() : "<undef>";
                  lastMsg.Form("while processing dset:'%s', file:'%s'"
                               " - check logs for possible stacktrace - last event:", dset->GetName(), fn.Data());
               }
               TProofServ::SetLastMsg(lastMsg);
            }
            // Maximum processing time for this packet, converted by a factor 1000
            if (dset->Current()->GetMaxProcTime() >= 0.)
               maxproctime = (Long_t) (1000 * dset->Current()->GetMaxProcTime());
            newrun = (dset->Current()->TestBit(TDSetElement::kNewPacket)) ? kTRUE : kFALSE;
         }

         ResetBit(TProofPlayer::kMaxProcTimeReached);
         ResetBit(TProofPlayer::kMaxProcTimeExtended);
         // (Re)start the processing-time timer and stopwatch, if required
         if (maxproctime > 0) {
            if (!fProcTimeTimer) fProcTimeTimer = new TProctimeTimer(this, maxproctime);
            fProcTimeTimer->Start(maxproctime, kTRUE);
            if (!fProcTime) fProcTime = new TStopwatch();
            fProcTime->Reset();
         }
         Long64_t refnum = num;
         // With neither a number of entries nor a time limit the loop below
         // would have no termination condition
         if (refnum < 0 && maxproctime <= 0) {
            wmsg.Form("neither entries nor max proc time specified:"
                      " risk of infinite loop: processing aborted");
            Error("Process", "%s", wmsg.Data());
            if (gProofServ) {
               wmsg.Insert(0, TString::Format("ERROR:%s, entry:%lld, ",
                                              gProofServ->GetOrdinal(), fProcessedRun));
               gProofServ->SendAsynMessage(wmsg.Data());
            }
            fExitStatus = kAborted;
            ResetBit(TProofPlayer::kIsProcessing);
            break;
         }
         // Loop over the entries of the packet
         while (refnum < 0 || num--) {

            if (TestBit(TProofPlayer::kMaxProcTimeReached)) {
               fProcTime->Stop();
               if (!newrun && !TestBit(TProofPlayer::kMaxProcTimeExtended) && refnum > 0) {
                  // If less than 20% of the packet is left, grant once an
                  // extension of 1.5 times the time estimated from the
                  // average rate measured so far
                  Float_t xleft = (refnum > num) ? (Float_t) num / (Float_t) (refnum) : 1.;
                  if (xleft < 0.2) {
                     Long_t mpt = (Long_t) (1500 * num / ((Double_t)(refnum - num) / fProcTime->RealTime()));
                     SetBit(TProofPlayer::kMaxProcTimeExtended);
                     fProcTimeTimer->Start(mpt, kTRUE);
                     ResetBit(TProofPlayer::kMaxProcTimeReached);
                  }
               }
               if (TestBit(TProofPlayer::kMaxProcTimeReached)) {
                  Info("Process", "max proc time reached (%ld msecs): packet processing stopped:\n%s",
                                  maxproctime, lastMsg.Data());
                  break;
               }
            }

            if (!(!fSelStatus->TestBit(TStatus::kNotOk) &&
                   fSelector->GetAbort() == TSelector::kContinue)) break;

            // Entry number: via the entry/event list if any, else sequential
            if (fEvIter->TestBit(TEventIter::kData)) {
               if (enl){
                  entry = enl->GetEntry(fst);
               } else if (evl) {
                  entry = evl->GetEntry(fst);
               } else {
                  entry = fst;
               }
               fst++;
            } else {
               entry = fst++;
            }

            fEvIter->PreProcessEvent(entry);

            TProofServ::SetLastEntry(entry);

            if (fSelector->Version() == 0) {
               PDB(kLoop,3)
                  Info("Process","Call ProcessCut(%lld)", entry);
               if (fSelector->ProcessCut(entry)) {
                  PDB(kLoop,3)
                     Info("Process","Call ProcessFill(%lld)", entry);
                  fSelector->ProcessFill(entry);
               }
            } else {
               PDB(kLoop,3)
                  Info("Process","Call Process(%lld)", entry);
               fSelector->Process(entry);
               if (fSelector->GetAbort() == TSelector::kAbortProcess) {
                  ResetBit(TProofPlayer::kIsProcessing);
                  break;
               } else if (fSelector->GetAbort() == TSelector::kAbortFile) {
                  Info("Process", "packet processing aborted following the"
                                  " selector settings:\n%s", lastMsg.Data());
                  fEvIter->InvalidatePacket();
                  fProgressStatus->SetBit(TProofProgressStatus::kFileCorrupted);
               }
            }
            if (!fSelStatus->TestBit(TStatus::kNotOk)) fProcessedRun++;

            // Periodic memory check
            if (memlogfreq > 0 && (GetEventsProcessed() + fProcessedRun)%memlogfreq == 0) {
               if (!CheckMemUsage(memlogfreq, warnHWMres, warnHWMvir, wmsg)) {
                  Error("Process", "%s", wmsg.Data());
                  if (gProofServ) {
                     wmsg.Insert(0, TString::Format("ERROR:%s, entry:%lld, ",
                                                    gProofServ->GetOrdinal(), entry));
                     gProofServ->SendAsynMessage(wmsg.Data());
                  }
                  fExitStatus = kStopped;
                  ResetBit(TProofPlayer::kIsProcessing);
                  if (gProofServ) gProofServ->SetBit(TProofServ::kHighMemory);
                  break;
               } else {
                  if (!wmsg.IsNull()) {
                     Warning("Process", "%s", wmsg.Data());
                     if (gProofServ) {
                        wmsg.Insert(0, TString::Format("WARNING:%s, entry:%lld, ",
                                                       gProofServ->GetOrdinal(), entry));
                        gProofServ->SendAsynMessage(wmsg.Data());
                     }
                  }
               }
            }
            // Dispatch one system event when requested by the dispatch timer
            if (TestBit(TProofPlayer::kDispatchOneEvent)) {
               gSystem->DispatchOneEvent(kTRUE);
               ResetBit(TProofPlayer::kDispatchOneEvent);
            }
            ResetBit(TProofPlayer::kIsProcessing);
            if (fSelStatus->TestBit(TStatus::kNotOk) || gROOT->IsInterrupted()) break;

            // A file-level abort only affects the current packet
            if (fSelector->GetAbort() == TSelector::kAbortFile)
               fSelector->Abort("status reset", TSelector::kContinue);
         }
      }

   } CATCH(excode) {
      // Stop and abort requests arrive as exceptions thrown by the stop timer
      if (excode == kPEX_STOPPED) {
         Info("Process","received stop-process signal");
         fExitStatus = kStopped;
      } else if (excode == kPEX_ABORTED) {
         gAbort = kTRUE;
         Info("Process","received abort-process signal");
         fExitStatus = kAborted;
      } else {
         Error("Process","exception %d caught", excode);
         gAbort = kTRUE;
         fExitStatus = kAborted;
      }
      ResetBit(TProofPlayer::kIsProcessing);
   } ENDTRY;

   // Remove the current-element pair from the input list; only the key and
   // the pair itself are deleted here
   TPair *currentElem = 0;
   if ((currentElem = (TPair *) fInput->FindObject("PROOF_CurrentElement"))) {
      if ((currentElem = (TPair *) fInput->Remove(currentElem))) {
         delete currentElem->Key();
         delete currentElem;
      }
   }

   // Final memory check
   Long64_t singleshot = 1;
   Bool_t warnHWMres = kTRUE, warnHWMvir = kTRUE;
   Bool_t shrc = CheckMemUsage(singleshot, warnHWMres, warnHWMvir, wmsg);
   if (!wmsg.IsNull()) Warning("Process", "%s (%s)", wmsg.Data(), shrc ? "warn" : "hwm");

   PDB(kGlobal,2)
      Info("Process","%lld events processed", fProgressStatus->GetEntries());

   if (gMonitoringWriter) {
      gMonitoringWriter->SendProcessingProgress(fProgressStatus->GetEntries(),
                                                TFile::GetFileBytesRead()-fReadBytesRun, kFALSE);
      gMonitoringWriter->SendProcessingStatus("DONE");
   }

   // Stop the timers and the feedback machinery
   SetDispatchTimer(kFALSE);
   if (fStopTimer != 0)
      SetStopTimer(kFALSE, gAbort);
   if (fFeedbackTimer != 0)
      HandleTimer(0);

   StopFeedback();

   // Final save of the results, if enabled
   if (SavePartialResults(kTRUE) < 0)
      Warning("Process", "problems saving the results to file");

   SafeDelete(fEvIter);

   if (fExitStatus != kAborted) {

      // Complete the information carried by the output files
      TIter nxo(GetOutputList());
      TObject *o = 0;
      while ((o = nxo())) {
         if (o->IsA() == TProofOutputFile::Class()) {
            TProofOutputFile *of = (TProofOutputFile *)o;
            of->Print();
            // Ordinal and session directory exist only within a TProofServ
            if (gProofServ)
               of->SetWorkerOrdinal(gProofServ->GetOrdinal());
            const char *dir = of->GetDir();
            if (!dir || (dir && strlen(dir) <= 0)) {
               if (gProofServ)
                  of->SetDir(gProofServ->GetSessionDir());
            } else if (dir && strlen(dir) > 0) {
               // Replace local host names with the fully qualified one
               TUrl u(dir);
               if (!strcmp(u.GetHost(), "localhost") || !strcmp(u.GetHost(), "127.0.0.1") ||
                   !strcmp(u.GetHost(), "localhost.localdomain")) {
                  u.SetHost(TUrl(gSystem->HostName()).GetHostFQDN());
                  of->SetDir(u.GetUrl(kTRUE));
               }
               of->Print();
            }
         }
      }

      MapOutputListToDataMembers();

      // Selector finalisation
      if (!fSelStatus->TestBit(TStatus::kNotOk)) {
         if (fSelector->Version() == 0) {
            PDB(kLoop,1) Info("Process","Call Terminate()");
            fSelector->Terminate();
         } else {
            PDB(kLoop,1) Info("Process","Call SlaveTerminate()");
            fSelector->SlaveTerminate();
            if (IsClient() && !fSelStatus->TestBit(TStatus::kNotOk)) {
               PDB(kLoop,1) Info("Process","Call Terminate()");
               fSelector->Terminate();
            }
         }
      }

      // Export the selector status
      fOutput->Add(new TParameter<Long64_t>("PROOF_SelectorStatus", (Long64_t) fSelector->GetStatus()));

      // In non-parallel server mode the canvases are added to the output list
      if (gProofServ && !gProofServ->IsParallel()) {
         TIter nxc(gROOT->GetListOfCanvases());
         while (TObject *c = nxc())
            fOutput->Add(c);
      }
   }

   if (gProofServ)
      TPerfStats::Stop();

   return 0;
}
////////////////////////////////////////////////////////////////////////////////
/// Process data set 'dset' with the selector object 'selector'.
/// The object replaces the current selector (which is deleted only if it was
/// created by the player) and is flagged as not created by the player; the
/// work is then delegated to the file-name based Process() with a null name.
/// Returns -1 if 'selector' is null, otherwise the result of the delegate.
Long64_t TProofPlayer::Process(TDSet *dset, TSelector *selector,
                               Option_t *option, Long64_t nentries,
                               Long64_t first)
{
   if (!selector) {
      Error("Process", "selector object undefiend!");
      return -1;
   }

   // Get rid of a selector created internally, if any
   if (fCreateSelObj)
      SafeDelete(fSelector);

   fCreateSelObj = kFALSE;
   fSelector = selector;

   const char *noFile = 0;
   return Process(dset, noFile, option, nentries, first);
}
////////////////////////////////////////////////////////////////////////////////
/// Always returns kFALSE in this class; the argument is ignored.
Bool_t TProofPlayer::JoinProcess(TList *)
{
   return kFALSE;
}
////////////////////////////////////////////////////////////////////////////////
/// Check the memory usage of the process.
/// The check runs only when 'mfreq' is positive and the number of processed
/// events is a multiple of it. The virtual and resident sizes are recorded
/// in the status object and compared with the limits given by TProofServ:
///  - above the stop fraction: 'wmsg' is filled and kFALSE is returned;
///  - above the high-water-mark fraction (once per flag 'w80v' / 'w80r'):
///    'wmsg' is filled with a warning, 'mfreq' is set to 1 and the flag is
///    cleared.
/// When the resident size reaches fSaveMemThreshold, saving of partial
/// results is switched on. Returns kTRUE in all the other cases, including
/// when no check is performed or the process info is unavailable.
Bool_t TProofPlayer::CheckMemUsage(Long64_t &mfreq, Bool_t &w80r,
                                   Bool_t &w80v, TString &wmsg)
{
   const Long64_t processed = GetEventsProcessed() + fProcessedRun;

   // Not the time for a check
   if (mfreq <= 0 || processed%mfreq != 0)
      return kTRUE;

   // No information, no check
   ProcInfo_t pi;
   if (gSystem->GetProcInfo(&pi))
      return kTRUE;

   wmsg = "";
   if (gProofServ)
      Info("CheckMemUsage|Svc", "Memory %ld virtual %ld resident event %lld",
                                pi.fMemVirtual, pi.fMemResident, processed);

   // Record the values in the status object
   fSelStatus->SetMemValues(pi.fMemVirtual, pi.fMemResident);

   // Virtual memory
   if (TProofServ::GetVirtMemMax() > 0) {
      if (pi.fMemVirtual > TProofServ::GetMemStop() * TProofServ::GetVirtMemMax()) {
         wmsg.Form("using more than %d%% of allowed virtual memory (%ld kB)"
                   " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemVirtual);
         return kFALSE;
      }
      if (pi.fMemVirtual > TProofServ::GetMemHWM() * TProofServ::GetVirtMemMax() && w80v) {
         // Above the high-water mark: from now on check at every event
         mfreq = 1;
         wmsg.Form("using more than %d%% of allowed virtual memory (%ld kB)",
                   (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual);
         w80v = kFALSE;
      }
   }

   // Resident memory
   if (TProofServ::GetResMemMax() > 0) {
      if (pi.fMemResident > TProofServ::GetMemStop() * TProofServ::GetResMemMax()) {
         wmsg.Form("using more than %d%% of allowed resident memory (%ld kB)"
                   " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemResident);
         return kFALSE;
      }
      if (pi.fMemResident > TProofServ::GetMemHWM() * TProofServ::GetResMemMax() && w80r) {
         mfreq = 1;
         if (!wmsg.IsNull()) {
            // A virtual-memory warning is already pending: report both
            wmsg.Form("using more than %d%% of allowed both virtual and resident memory ({%ld,%ld} kB)",
                      (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual, pi.fMemResident);
         } else {
            wmsg.Form("using more than %d%% of allowed resident memory (%ld kB)",
                      (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemResident);
         }
         w80r = kFALSE;
      }
   }

   // Switch on the saving of partial results above the configured threshold
   if (fSaveMemThreshold > 0 && pi.fMemResident >= fSaveMemThreshold)
      fSavePartialResults = kTRUE;

   return kTRUE;
}
////////////////////////////////////////////////////////////////////////////////
/// Finalize (two-flag form).
/// Not available in this class: the call is reported through MayNotUse()
/// and -1 is returned.
Long64_t TProofPlayer::Finalize(Bool_t, Bool_t)
{
   MayNotUse("Finalize");

   return -1;
}

////////////////////////////////////////////////////////////////////////////////
/// Finalize (query-result form).
/// Not available in this class: the call is reported through MayNotUse()
/// and -1 is returned.
Long64_t TProofPlayer::Finalize(TQueryResult *)
{
   MayNotUse("Finalize");

   return -1;
}
////////////////////////////////////////////////////////////////////////////////
/// Not available in this class: the call is reported through MayNotUse().
void TProofPlayer::MergeOutput(Bool_t)
{
   MayNotUse("MergeOutput");
}
////////////////////////////////////////////////////////////////////////////////
/// Create a TOutputListSelectorDataMap for the current selector and add it
/// to the output list.
void TProofPlayer::MapOutputListToDataMembers() const
{
   fOutput->Add(new TOutputListSelectorDataMap(fSelector));
}
void TProofPlayer::UpdateAutoBin(const char *name,
Double_t& xmin, Double_t& xmax,
Double_t& ymin, Double_t& ymax,
Double_t& zmin, Double_t& zmax)
{
if ( fAutoBins == 0 ) {
fAutoBins = new THashList;
}
TAutoBinVal *val = (TAutoBinVal*) fAutoBins->FindObject(name);
if ( val == 0 ) {
if (gProofServ && !gProofServ->IsTopMaster()) {
TString key = name;
TProofLimitsFinder::AutoBinFunc(key,xmin,xmax,ymin,ymax,zmin,zmax);
}
val = new TAutoBinVal(name,xmin,xmax,ymin,ymax,zmin,zmax);
fAutoBins->Add(val);
} else {
val->GetAll(xmin,xmax,ymin,ymax,zmin,zmax);
}
}
/// Base-class stub for GetNextPacket(): both arguments are ignored, the call
/// is reported through MayNotUse() and a null pointer is returned.
TDSetElement *TProofPlayer::GetNextPacket(TSlave *, TMessage *)
{
MayNotUse("GetNextPacket");
return 0;
}
/// Base-class stub for SetupFeedback(): only reports the call through
/// MayNotUse().
void TProofPlayer::SetupFeedback()
{
MayNotUse("SetupFeedback");
}
/// Base-class stub for StopFeedback(): only reports the call through
/// MayNotUse().
void TProofPlayer::StopFeedback()
{
MayNotUse("StopFeedback");
}
/// Base-class stub for DrawSelect(): all arguments are ignored, the call is
/// reported through MayNotUse() and -1 is returned.
Long64_t TProofPlayer::DrawSelect(TDSet * , const char * ,
const char * , Option_t * ,
Long64_t , Long64_t )
{
MayNotUse("DrawSelect");
return -1;
}
/// Base-class stub for HandleGetTreeHeader(): the message is ignored and the
/// call is only reported through MayNotUse().
void TProofPlayer::HandleGetTreeHeader(TMessage *)
{
   // The reported method name must match this method (no stray characters)
   MayNotUse("HandleGetTreeHeader");
}
/// Read an object from 'mess' and, if it is a histogram, merge it into the
/// current directory.
///
/// When a histogram with the same name is already in gDirectory's list the
/// received one is added to it and then deleted; otherwise the received
/// histogram is attached to gDirectory. Nothing is done when 'mess' is null
/// or no object could be read from it.
void TProofPlayer::HandleRecvHisto(TMessage *mess)
{
   if (!mess) return;

   TObject *obj = mess->ReadObject(mess->GetClass());
   if (!obj) return;

   if (obj->InheritsFrom(TH1::Class())) {
      TH1 *h = (TH1*)obj;
      // Detach first, so that the lookup below cannot find 'h' itself
      h->SetDirectory(0);
      TH1 *horg = (TH1*)gDirectory->GetList()->FindObject(h->GetName());
      if (horg) {
         horg->Add(h);
         // The content has been accumulated in 'horg' and 'h' is attached to
         // no directory: release it here, nobody else will
         delete h;
      } else {
         h->SetDirectory(gDirectory);
      }
   }
}
/// Forward 'obj' to the "DrawCanvas" function exported by libProofDraw.
///
/// The library is located and loaded, and the symbol resolved, on the first
/// call; the resolved pointer is cached in a function-local static. A warning
/// is issued if any of those steps fails (and the lookup is retried on the
/// next call). Returns the hook's result, or 1 if the hook is unavailable or
/// 'obj' is null.
Int_t TProofPlayer::DrawCanvas(TObject *obj)
{
   typedef Int_t (*DrawCanvasHook_t)(TObject *);
   static DrawCanvasHook_t gDrawCanvasHook = 0;

   if (!gDrawCanvasHook) {
      TString drawlib = "libProofDraw";
      char *libpath = gSystem->DynamicPathName(drawlib, kTRUE);
      if (!libpath) {
         Warning("DrawCanvas", "can't locate %s", drawlib.Data());
      } else {
         // Only the existence of the path matters
         delete[] libpath;
         if (gSystem->Load(drawlib) == -1) {
            Warning("DrawCanvas", "can't load %s", drawlib.Data());
         } else {
            Func_t sym = gSystem->DynFindSymbol(drawlib,"DrawCanvas");
            if (sym)
               gDrawCanvasHook = (DrawCanvasHook_t)(sym);
            else
               Warning("DrawCanvas", "can't find DrawCanvas");
         }
      }
   }

   return (gDrawCanvasHook && obj) ? (*gDrawCanvasHook)(obj) : 1;
}
/// Forward the draw request to the "GetDrawArgs" function exported by
/// libProofDraw.
///
/// The library is located and loaded, and the symbol resolved, on the first
/// call; the resolved pointer is cached in a function-local static. A warning
/// is issued if any of those steps fails. Returns the hook's result, or 1 if
/// the hook is unavailable.
Int_t TProofPlayer::GetDrawArgs(const char *var, const char *sel, Option_t *opt,
                                TString &selector, TString &objname)
{
   typedef Int_t (*GetDrawArgsHook_t)(const char *, const char *, Option_t *,
                                      TString &, TString &);
   static GetDrawArgsHook_t gGetDrawArgsHook = 0;

   if (!gGetDrawArgsHook) {
      TString drawlib = "libProofDraw";
      char *libpath = gSystem->DynamicPathName(drawlib, kTRUE);
      if (!libpath) {
         Warning("GetDrawArgs", "can't locate %s", drawlib.Data());
      } else {
         // Only the existence of the path matters
         delete[] libpath;
         if (gSystem->Load(drawlib) == -1) {
            Warning("GetDrawArgs", "can't load %s", drawlib.Data());
         } else {
            Func_t sym = gSystem->DynFindSymbol(drawlib,"GetDrawArgs");
            if (sym)
               gGetDrawArgsHook = (GetDrawArgsHook_t)(sym);
            else
               Warning("GetDrawArgs", "can't find GetDrawArgs");
         }
      }
   }

   if (!gGetDrawArgsHook)
      return 1;
   return (*gGetDrawArgsHook)(var, sel, opt, selector, objname);
}
void TProofPlayer::FeedBackCanvas(const char *name, Bool_t create)
{
static void (*gFeedBackCanvasHook)(const char *, Bool_t) = 0;
if (!gFeedBackCanvasHook) {
TString drawlib = "libProofDraw";
char *p = 0;
if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
delete[] p;
if (gSystem->Load(drawlib) != -1) {
Func_t f = 0;
if ((f = gSystem->DynFindSymbol(drawlib,"FeedBackCanvas")))
gFeedBackCanvasHook = (void (*)(const char *, Bool_t))(f);
else
Warning("FeedBackCanvas", "can't find FeedBackCanvas");
} else
Warning("FeedBackCanvas", "can't load %s", drawlib.Data());
} else
Warning("FeedBackCanvas", "can't locate %s", drawlib.Data());
}
if (gFeedBackCanvasHook) (*gFeedBackCanvasHook)(name, create);
return;
}
/// Cache size as reported by the event iterator, or -1 when there is no
/// event iterator.
Long64_t TProofPlayer::GetCacheSize()
{
   if (!fEvIter)
      return -1;
   return fEvIter->GetCacheSize();
}
/// Number of learn entries as reported by the event iterator, or -1 when
/// there is no event iterator.
Int_t TProofPlayer::GetLearnEntries()
{
   if (!fEvIter)
      return -1;
   return fEvIter->GetLearnEntries();
}
/// Switch the merging stopwatch on or off.
///
/// on == kTRUE : create the stopwatch if missing and, if the number of
///               mergers is not yet set, take it from fProof->fMergersCount.
/// on == kFALSE: if a stopwatch exists, stop it and store the measured real
///               time in the current query: as merge time (with the number
///               of mergers) when not on a client or when running PROOF-Lite,
///               as receive time otherwise.
void TProofPlayerRemote::SetMerging(Bool_t on)
{
   if (on) {
      if (!fMergeSTW) fMergeSTW = new TStopwatch();
      PDB(kGlobal,1)
         Info("SetMerging", "ON: mergers: %d", fProof->fMergersCount);
      if (fNumMergers <= 0 && fProof->fMergersCount > 0)
         fNumMergers = fProof->fMergersCount;
      return;
   }

   // Switching off: nothing to record if the stopwatch was never created
   if (!fMergeSTW)
      return;

   fMergeSTW->Stop();
   Float_t rt = fMergeSTW->RealTime();
   PDB(kGlobal,1)
      Info("SetMerging", "OFF: rt: %f, mergers: %d", rt, fNumMergers);

   if (!fQuery)
      return;

   if (!fProof->TestBit(TProof::kIsClient) || fProof->IsLite()) {
      fQuery->SetMergeTime(rt);
      fQuery->SetNumMergers(fNumMergers);
   } else {
      fQuery->SetRecvTime(rt);
   }
   PDB(kGlobal,2) fQuery->Print("F");
}
ClassImp(TProofPlayerLocal)
/// Run 'selector' locally, without input data.
///
/// A temporary dataset proxy flagged as empty and local is created, handed to
/// the dataset-based Process() overload and destroyed afterwards.
/// Returns the result of that call, or -1 if 'selector' is null.
Long64_t TProofPlayerLocal::Process(TSelector *selector,
                                    Long64_t nentries, Option_t *option)
{
   if (!selector) {
      Error("Process", "selector object undefined!");
      return -1;
   }

   // Empty, local placeholder dataset
   TDSetProxy *set = new TDSetProxy("", "", "");
   set->SetBit(TDSet::kEmpty);
   set->SetBit(TDSet::kIsLocal);

   Long64_t rc = Process(set, selector, option, nentries);
   SafeDelete(set);
   return rc;
}
/// Run the selector identified by 'selector' locally, without input data.
///
/// A temporary dataset proxy flagged as empty and local is created, handed to
/// the dataset-based Process() overload and destroyed afterwards.
/// Returns the result of that call.
Long64_t TProofPlayerLocal::Process(const char *selector,
                                    Long64_t nentries, Option_t *option)
{
   // Empty, local placeholder dataset
   TDSetProxy *emptySet = new TDSetProxy("", "", "");
   emptySet->SetBit(TDSet::kEmpty);
   emptySet->SetBit(TDSet::kIsLocal);

   const Long64_t rc = Process(emptySet, selector, option, nentries);
   delete emptySet;
   return rc;
}
ClassImp(TProofPlayerRemote)
/// Destructor: releases the output list, the per-object output and feedback
/// lists, the packetizer and the cached process message.
TProofPlayerRemote::~TProofPlayerRemote()
{
   SafeDelete(fOutput);
   SafeDelete(fOutputLists);
   SafeDelete(fFeedbackLists);
   SafeDelete(fPacketizer);
   // SafeDelete already skips null pointers
   SafeDelete(fProcessMessage);
}
/// Instantiate the packetizer for 'dset' and store it in fPacketizer.
///
/// Three cases are handled:
///  - dataset flagged kEmpty: the class named by the "PROOF_Packetizer" input
///    parameter, or 'defpackunit' if that parameter is not set;
///  - dataset flagged kMultiDSet: always "TPacketizerMulti";
///  - otherwise: the dataset is looked up (unless "PROOF_LookupOpt" is
///    "none") and the class named by "PROOF_Packetizer", or 'defpackdata', is
///    used.
/// The packetizer is built through its constructor via TMethodCall. For
/// datasets with data, invalid elements are moved to a "MissingFiles" list
/// which is added to the output together with a summary in "PROOF_Status".
///
/// \param dset        dataset to be processed (must not be null)
/// \param nentries    number of entries passed to the packetizer
/// \param first       first entry passed to the packetizer (data case only)
/// \param defpackunit default packetizer class for data-less processing
/// \param defpackdata default packetizer class for data-driven processing
/// \return 0 on success; -1 on failure, with fExitStatus set to kAborted
Int_t TProofPlayerRemote::InitPacketizer(TDSet *dset, Long64_t nentries,
                                         Long64_t first, const char *defpackunit,
                                         const char *defpackdata)
{
   SafeDelete(fPacketizer);
   PDB(kGlobal,1) Info("InitPacketizer","Enter");
   fDSet = dset;
   fExitStatus = kFinished;

   // Everything below dereferences the dataset
   if (!dset) {
      Error("InitPacketizer", "dataset undefined");
      fExitStatus = kAborted;
      return -1;
   }

   // TH1 merging mode: input parameter first, then the environment
   Int_t honebyone = 1;
   if (TProof::GetParameter(fInput, "PROOF_MergeTH1OneByOne", honebyone) != 0)
      honebyone = gEnv->GetValue("ProofPlayer.MergeTH1OneByOne", 1);
   fMergeTH1OneByOne = (honebyone == 1) ? kTRUE : kFALSE;

   const Bool_t noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
   TString packetizer;
   TList *listOfMissingFiles = 0;
   TMethodCall callEnv;
   TClass *cl = 0;

   if (noData) {
      // Data-less processing
      if (TProof::GetParameter(fInput, "PROOF_Packetizer", packetizer) != 0)
         packetizer = defpackunit;
      else
         Info("InitPacketizer", "using alternate packetizer: %s", packetizer.Data());
      cl = TClass::GetClass(packetizer);
      if (cl == 0) {
         Error("InitPacketizer", "class '%s' not found", packetizer.Data());
         fExitStatus = kAborted;
         return -1;
      }
      callEnv.InitWithPrototype(cl, cl->GetName(),"TList*,Long64_t,TList*,TProofProgressStatus*");
      if (!callEnv.IsValid()) {
         Error("InitPacketizer",
               "cannot find correct constructor for '%s'", cl->GetName());
         fExitStatus = kAborted;
         return -1;
      }
      callEnv.ResetParam();
      callEnv.SetParam((Long_t) fProof->GetListOfActiveSlaves());
      callEnv.SetParam((Long64_t) nentries);
      callEnv.SetParam((Long_t) fInput);
      callEnv.SetParam((Long_t) fProgressStatus);

   } else if (dset->TestBit(TDSet::kMultiDSet)) {
      // Multi-dataset processing
      if (fProof->GetRunStatus() != TProof::kRunning) {
         Error("InitPacketizer", "received stop/abort request");
         fExitStatus = kAborted;
         return -1;
      }
      packetizer = "TPacketizerMulti";
      cl = TClass::GetClass(packetizer);
      if (cl == 0) {
         Error("InitPacketizer", "class '%s' not found", packetizer.Data());
         fExitStatus = kAborted;
         return -1;
      }
      callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
      if (!callEnv.IsValid()) {
         Error("InitPacketizer", "cannot find correct constructor for '%s'", cl->GetName());
         fExitStatus = kAborted;
         return -1;
      }
      callEnv.ResetParam();
      callEnv.SetParam((Long_t) dset);
      callEnv.SetParam((Long_t) fProof->GetListOfActiveSlaves());
      callEnv.SetParam((Long64_t) first);
      callEnv.SetParam((Long64_t) nentries);
      callEnv.SetParam((Long_t) fInput);
      callEnv.SetParam((Long_t) fProgressStatus);
      // Reset the validity bits before the packetizer runs
      dset->SetBit(TDSet::kValidityChecked);
      dset->ResetBit(TDSet::kSomeInvalid);

   } else {
      // Standard data-driven processing.
      // Take over a "MissingFiles" list found in the input, or start a new one
      if ((listOfMissingFiles = (TList *)fInput->FindObject("MissingFiles"))) {
         fInput->Remove(listOfMissingFiles);
      } else {
         listOfMissingFiles = new TList;
      }
      // Lookup the files unless explicitly disabled
      TString lkopt;
      if (TProof::GetParameter(fInput, "PROOF_LookupOpt", lkopt) != 0 || lkopt != "none")
         dset->Lookup(kTRUE, &listOfMissingFiles);
      if (fProof->GetRunStatus() != TProof::kRunning) {
         Error("InitPacketizer", "received stop/abort request");
         fExitStatus = kAborted;
         return -1;
      }
      if (!(dset->GetListOfElements()) ||
          !(dset->GetListOfElements()->GetSize())) {
         if (gProofServ)
            gProofServ->SendAsynMessage("InitPacketizer: No files from the data set were found - Aborting");
         Error("InitPacketizer", "No files from the data set were found - Aborting");
         fExitStatus = kAborted;
         if (listOfMissingFiles) {
            listOfMissingFiles->SetOwner();
            // The output list may not exist yet at this stage
            if (fOutput) fOutput->Remove(listOfMissingFiles);
            SafeDelete(listOfMissingFiles);
         }
         return -1;
      }
      if (TProof::GetParameter(fInput, "PROOF_Packetizer", packetizer) != 0)
         packetizer = defpackdata;
      else
         Info("InitPacketizer", "using alternate packetizer: %s", packetizer.Data());
      cl = TClass::GetClass(packetizer);
      if (cl == 0) {
         Error("InitPacketizer", "class '%s' not found", packetizer.Data());
         fExitStatus = kAborted;
         return -1;
      }
      callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
      if (!callEnv.IsValid()) {
         Error("InitPacketizer", "cannot find correct constructor for '%s'", cl->GetName());
         fExitStatus = kAborted;
         return -1;
      }
      callEnv.ResetParam();
      callEnv.SetParam((Long_t) dset);
      callEnv.SetParam((Long_t) fProof->GetListOfActiveSlaves());
      callEnv.SetParam((Long64_t) first);
      callEnv.SetParam((Long64_t) nentries);
      callEnv.SetParam((Long_t) fInput);
      callEnv.SetParam((Long_t) fProgressStatus);
      // Reset the validity bits before the packetizer runs
      dset->SetBit(TDSet::kValidityChecked);
      dset->ResetBit(TDSet::kSomeInvalid);
   }

   // Construct the packetizer and check it
   Long_t ret = 0;
   callEnv.Execute(ret);
   if ((fPacketizer = (TVirtualPacketizer *)ret) == 0) {
      Error("InitPacketizer", "cannot construct '%s'", cl->GetName());
      fExitStatus = kAborted;
      return -1;
   }
   if (!fPacketizer->IsValid()) {
      Error("InitPacketizer",
            "instantiated packetizer object '%s' is invalid", cl->GetName());
      fExitStatus = kAborted;
      SafeDelete(fPacketizer);
      return -1;
   }

   // In the multi-dataset case the list is picked up from the input only now
   if (!noData && dset->TestBit(TDSet::kMultiDSet)) {
      if ((listOfMissingFiles = (TList *) fInput->FindObject("MissingFiles"))) {
         fInput->Remove(listOfMissingFiles);
      }
   }

   if (!noData) {
      // Move the elements flagged invalid to the list of missing files
      TDSetElement *elem = 0;
      if (dset->TestBit(TDSet::kSomeInvalid)) {
         TIter nxe(dset->GetListOfElements());
         while ((elem = (TDSetElement *)nxe())) {
            if (!elem->GetValid()) {
               if (!listOfMissingFiles)
                  listOfMissingFiles = new TList;
               listOfMissingFiles->Add(elem->GetFileInfo(dset->GetType()));
               dset->Remove(elem, kFALSE);
            }
         }
         dset->ResetBit(TDSet::kSomeInvalid);
      }

      if (listOfMissingFiles && listOfMissingFiles->GetSize() > 0) {
         TIter missingFiles(listOfMissingFiles);
         TString msg;
         // Per-file notification only in debug mode
         if (gDebug > 0) {
            TFileInfo *fi = 0;
            while ((fi = (TFileInfo *) missingFiles.Next())) {
               if (fi->GetCurrentUrl()) {
                  msg = Form("File not found: %s - skipping!",
                             fi->GetCurrentUrl()->GetUrl());
               } else {
                  msg = Form("File not found: %s - skipping!", fi->GetName());
               }
               if (gProofServ) gProofServ->SendAsynMessage(msg.Data());
            }
         }
         // Publish the list in the output, unless one is already there
         if (!GetOutput("MissingFiles")) {
            listOfMissingFiles->SetName("MissingFiles");
            AddOutputObject(listOfMissingFiles);
         }
         TStatus *tmpStatus = (TStatus *)GetOutput("PROOF_Status");
         if (!tmpStatus) AddOutputObject((tmpStatus = new TStatus()));

         // Summary: nbad > 0 here, so the denominator cannot be zero
         Int_t ngood = dset->GetListOfElements()->GetSize();
         Int_t nbad = listOfMissingFiles->GetSize();
         Double_t xb = Double_t(nbad) / Double_t(ngood + nbad);
         msg = Form(" About %.2f %c of the requested files (%d out of %d) were missing or unusable; details in"
                    " the 'MissingFiles' list", xb * 100., '%', nbad, nbad + ngood);
         tmpStatus->Add(msg.Data());
         msg = Form(" +++\n"
                    " +++ About %.2f %c of the requested files (%d out of %d) are missing or unusable; details in"
                    " the 'MissingFiles' list\n"
                    " +++", xb * 100., '%', nbad, nbad + ngood);
         if (gProofServ) gProofServ->SendAsynMessage(msg.Data());
      } else {
         // Nothing missing: the (empty) list is not needed
         SafeDelete(listOfMissingFiles);
      }
   }

   return 0;
}
/// Process 'dset' with the selector identified by 'selector_file'.
///
/// On a master a proxy dataset is created, the packetizer is initialised and
/// the memory-logging frequency and input data are forwarded to the workers.
/// On a client the selector is (re)created if owned by the player, its
/// Begin() is called and the input data file is sent.
/// The process request is then broadcast and the results are collected,
/// either synchronously or asynchronously depending on the query mode
/// derived from 'option'.
///
/// \param dset          dataset to be processed
/// \param selector_file selector file (or class name when the selector
///                      object is provided by the caller)
/// \param option        processing options
/// \param nentries      number of entries to process
/// \param first         first entry to process
/// \return -1 on error; fProof->fSeqNum when the client runs, or switches
///         to, asynchronous mode; otherwise the result of Finalize()
Long64_t TProofPlayerRemote::Process(TDSet *dset, const char *selector_file,
                                     Option_t *option, Long64_t nentries,
                                     Long64_t first)
{
   PDB(kGlobal,1) Info("Process", "Enter");
   fDSet = dset;
   fExitStatus = kFinished;

   if (!fProgressStatus) {
      Error("Process", "No progress status");
      return -1;
   }
   fProgressStatus->Reset();

   // Start from an empty output list
   if (!fOutput)
      fOutput = new THashList;
   else
      fOutput->Clear();

   SafeDelete(fFeedbackLists);

   if (fProof->IsMaster()){
      TPerfStats::Start(fInput, fOutput);
   } else {
      TPerfStats::Setup(fInput);
   }

   TStopwatch elapsed;

   // Send the selector file when the selector object is created by the player
   TString fn;
   fSelectorFileName = selector_file;
   if (fCreateSelObj) {
      if(!SendSelector(selector_file)) return -1;
      fn = gSystem->BaseName(selector_file);
   } else {
      fn = selector_file;
   }

   TMessage mesg(kPROOF_PROCESS);

   Bool_t sync = (fProof->GetQueryMode(option) == TProof::kSync);

   TList *inputtmp = 0;  // selector input objects temporarily added to fInput
   TDSet *set = dset;
   if (fProof->IsMaster()) {

      PDB(kPacketizer,1) Info("Process","Create Proxy TDSet");
      set = new TDSetProxy( dset->GetType(), dset->GetObjName(),
                            dset->GetDirectory() );
      if (dset->TestBit(TDSet::kEmpty))
         set->SetBit(TDSet::kEmpty);

      if (InitPacketizer(dset, nentries, first, "TPacketizerUnit", "TPacketizer") != 0) {
         Error("Process", "cannot init the packetizer");
         fExitStatus = kAborted;
         // The proxy has not been handed to anybody yet
         delete set;
         return -1;
      }

      // The first entry has been taken into account by the packetizer
      first = 0;

      // Memory-logging frequency: the environment provides a default, the
      // input-list parameter takes precedence. 'mrc' is 0 as soon as either
      // source supplied a value, so that it gets forwarded below.
      Int_t mrc = -1;
      Long64_t memlogfreq = -1, mlf = -1;
      if (gSystem->Getenv("PROOF_MEMLOGFREQ")) {
         TString clf(gSystem->Getenv("PROOF_MEMLOGFREQ"));
         if (clf.IsDigit()) { memlogfreq = clf.Atoi(); mrc = 0; }
      }
      if (TProof::GetParameter(fProof->GetInputList(), "PROOF_MemLogFreq", mlf) == 0) {
         memlogfreq = mlf;
         mrc = 0;
      }
      if (memlogfreq == 0) {
         // Automatic setting; guard against a vanishing number of workers
         const Int_t npar = fProof->GetParallel();
         memlogfreq = (npar > 0) ? fPacketizer->GetTotalEntries()/(npar*100) : 0;
         if (memlogfreq <= 0) memlogfreq = 1;
      }
      if (mrc == 0) fProof->SetParameter("PROOF_MemLogFreq", memlogfreq);

      // Forward the input data, if any
      TString emsg;
      if (TProof::SendInputData(fQuery, fProof, emsg) != 0)
         Warning("Process", "could not forward input data: %s", emsg.Data());

      // Attach to the histogram recording the packets being processed
      if (fInput->FindObject("PROOF_StatsHist") != 0) {
         if (!(fProcPackets = (TH1I *) fOutput->FindObject("PROOF_ProcPcktHist"))) {
            Warning("Process", "could not attach to histogram 'PROOF_ProcPcktHist'");
         } else {
            PDB(kLoop,1)
               Info("Process", "attached to histogram 'PROOF_ProcPcktHist' to record"
                               " packets being processed");
         }
      }

   } else {

      // Mergers settings from the environment, unless already in the input
      if (gEnv->Lookup("Proof.UseMergers") && !fInput->FindObject("PROOF_UseMergers")) {
         Int_t smg = gEnv->GetValue("Proof.UseMergers",-1);
         if (smg >= 0) {
            fInput->Add(new TParameter<Int_t>("PROOF_UseMergers", smg));
            if (gEnv->Lookup("Proof.MergersByHost")) {
               Int_t mbh = gEnv->GetValue("Proof.MergersByHost",0);
               if (mbh != 0) {
                  // Replace any existing setting
                  TObject *o = 0;
                  if ((o = fInput->FindObject("PROOF_MergersByHost"))) { fInput->Remove(o); delete o; }
                  fInput->Add(new TParameter<Int_t>("PROOF_MergersByHost", mbh));
               }
            }
         }
      }

      // Drop the per-object output lists of a previous run
      if (fOutputLists) {
         fOutputLists->Delete();
         delete fOutputLists;
         fOutputLists = 0;
      }

      if (!sync) {
         gSystem->RedirectOutput(fProof->fLogFileName);
         Printf(" ");
         Info("Process","starting new query");
      }

      // (Re)create the selector if the player owns it
      if (fCreateSelObj) {
         SafeDelete(fSelector);
         if (!(fSelector = TSelector::GetSelector(selector_file))) {
            if (!sync)
               gSystem->RedirectOutput(0);
            return -1;
         }
      }

      fSelectorClass = fSelector->IsA();

      // For a caller-provided selector, make its input objects and the
      // selector itself available in fInput; the objects added here are
      // remembered in 'inputtmp' for later removal
      if (!fCreateSelObj) {
         if (fSelector->GetInputList() && fSelector->GetInputList()->GetSize() > 0) {
            TIter nxi(fSelector->GetInputList());
            TObject *o = 0;
            while ((o = nxi())) {
               if (!fInput->FindObject(o)) {
                  fInput->Add(o);
                  if (!inputtmp) {
                     inputtmp = new TList;
                     inputtmp->SetOwner(kFALSE);
                  }
                  inputtmp->Add(o);
               }
            }
         }
         fInput->Add(fSelector);
      }

      fSelector->SetInputList(fInput);
      fSelector->SetOption(option);
      if (fSelector->GetOutputList()) fSelector->GetOutputList()->Clear();

      PDB(kLoop,1) Info("Process","Call Begin(0)");
      fSelector->Begin(0);

      if (!fCreateSelObj) fSelector->SetInputList(0);

      fProof->SendInputDataFile();

      if (!sync)
         gSystem->RedirectOutput(0);
   }

   TCleanup clean(this);
   SetupFeedback();

   TString opt = option;

   // Old servers expect the version-3 dataset format
   if (fProof->fProtocol < 13)
      dset->SetWriteV3(kTRUE);

   // A parallel master leaves the range definition to its packetizer
   Long64_t num = (gProofServ && gProofServ->IsMaster() && gProofServ->IsParallel()) ? -1 : nentries;
   Long64_t fst = (gProofServ && gProofServ->IsMaster() && gProofServ->IsParallel()) ? -1 : first;

   // Entry or event list attached to the dataset (client side only)
   TEntryList *enl = (!fProof->IsMaster()) ? dynamic_cast<TEntryList *>(set->GetEntryList())
                                           : (TEntryList *)0;
   TEventList *evl = (!fProof->IsMaster() && !enl) ? dynamic_cast<TEventList *>(set->GetEntryList())
                                                   : (TEventList *)0;

   if (fProof->fProtocol > 14) {
      // Keep a copy of the message for workers joining later (JoinProcess)
      if (fProcessMessage) delete fProcessMessage;
      fProcessMessage = new TMessage(kPROOF_PROCESS);
      mesg << set << fn << fInput << opt << num << fst << evl << sync << enl;
      (*fProcessMessage) << set << fn << fInput << opt << num << fst << evl << sync << enl;
   } else {
      mesg << set << fn << fInput << opt << num << fst << evl << sync;
      if (enl)
         Warning("Process","entry lists not supported by the server");
   }

   fProof->ResetMergePrg();

   Int_t nb = fProof->Broadcast(mesg);
   PDB(kGlobal,1) Info("Process", "Broadcast called: %d workers notified", nb);
   if (fProof->IsLite()) fProof->fNotIdle += nb;

   if (fProof->fProtocol < 13)
      dset->SetWriteV3(kFALSE);

   if (IsClient())
      fProof->fRedirLog = kTRUE;

   if (!IsClient()){
      Info("Process|Svc", "Start merging Memory information");
   }

   if (!sync) {
      if (IsClient()) {
         // Asynchronous query on the client: return the sequence number
         PDB(kGlobal,1) Info("Process","Asynchronous processing:"
                                       " activating CollectInputFrom");
         fProof->Activate();
         fProof->Collect();
         return fProof->fSeqNum;

      } else {
         PDB(kGlobal,1) Info("Process","Calling Collect");
         fProof->Collect();

         HandleTimer(0);
         if (fPacketizer) {
            fPacketizer->StopProcess(kFALSE, kTRUE);
            fPacketizer->SetBit(TVirtualPacketizer::kIsDone);
            elapsed.Stop();
            if (fQuery)
               fQuery->SetProcessInfo(0, 0., fPacketizer->GetBytesRead(),
                                      fPacketizer->GetInitTime(),
                                      elapsed.RealTime());
         }
         StopFeedback();

         return Finalize(kFALSE,sync);
      }
   } else {

      PDB(kGlobal,1) Info("Process","Synchronous processing: calling Collect");
      fProof->Collect();
      if (!(fProof->IsSync())) {
         // The run was switched to asynchronous mode while collecting
         Info("Process", "switching to the asynchronous mode ...");
         return fProof->fSeqNum;
      }

      if (IsClient())
         fProof->fRedirLog = kFALSE;

      if (!IsClient()) {
         HandleTimer(0);
         if (fPacketizer) {
            fPacketizer->StopProcess(kFALSE, kTRUE);
            fPacketizer->SetBit(TVirtualPacketizer::kIsDone);
            if (fQuery)
               fQuery->SetProcessInfo(0, 0., fPacketizer->GetBytesRead(),
                                      fPacketizer->GetInitTime(),
                                      fPacketizer->GetProcTime());
         }
      } else {
         // Restore the input list of a caller-provided selector
         if (!fCreateSelObj) fSelector->SetInputList(fInput);
      }
      StopFeedback();

      // On the client an aborted query is not finalized
      Long64_t rc = -1;
      if (!IsClient() || GetExitStatus() != TProofPlayer::kAborted)
         rc = Finalize(kFALSE,sync);

      // Remove the selector input objects temporarily added to fInput
      if (inputtmp) {
         TIter nxi(inputtmp);
         TObject *o = 0;
         while ((o = nxi())) fInput->Remove(o);
         SafeDelete(inputtmp);
      }

      return rc;
   }
}
/// Process 'dset' with a selector object provided by the caller.
///
/// On a client the given selector replaces the current one (which is deleted
/// first if it was created by the player). The file-name based Process() is
/// then run with the selector class name and with fCreateSelObj switched off;
/// fCreateSelObj is set back to kTRUE afterwards.
/// Returns -1 if 'selector' is null, the result of the inner call otherwise.
Long64_t TProofPlayerRemote::Process(TDSet *dset, TSelector *selector,
                                     Option_t *option, Long64_t nentries,
                                     Long64_t first)
{
   if (!selector) {
      Error("Process", "selector object undefined");
      return -1;
   }

   const Bool_t adopt = IsClient() && (selector != fSelector);
   if (adopt) {
      if (fCreateSelObj) SafeDelete(fSelector);
      fSelector = selector;
   }

   // The selector object is external for the duration of the call
   fCreateSelObj = kFALSE;
   const Long64_t rc = Process(dset, selector->ClassName(), option, nentries, first);
   fCreateSelObj = kTRUE;

   return rc;
}
/// Make the workers in 'workers' join the query currently being processed.
///
/// Requires a cached process message, a valid fProof and a packetizer, and
/// may only be called on a master. The selector file is sent first when the
/// selector is created by the player; the workers are then added to the
/// packetizer and the cached process message is broadcast to them.
/// Returns kTRUE on success, kFALSE otherwise.
Bool_t TProofPlayerRemote::JoinProcess(TList *workers)
{
   const Bool_t ready = (fProcessMessage && fProof && fPacketizer);
   if (!ready) {
      Error("Process", "Should not happen: fProcessMessage=%p fProof=%p fPacketizer=%p",
            fProcessMessage, fProof, fPacketizer);
      return kFALSE;
   }

   if (!workers || !fProof->IsMaster()) {
      Error("Process", "Invalid call");
      return kFALSE;
   }

   PDB(kGlobal, 1)
      Info("Process", "Preparing %d new worker(s) to process", workers->GetEntries());

   // Step 1: selector file, if the player owns the selector
   if (fCreateSelObj) {
      PDB(kGlobal, 2)
         Info("Process", "Sending selector file %s", fSelectorFileName.Data());
      if(!SendSelector(fSelectorFileName.Data())) {
         Error("Process", "Problems in sending selector file %s", fSelectorFileName.Data());
         return kFALSE;
      }
   }

   if (fProof->IsLite())
      fProof->fNotIdle += workers->GetSize();

   // Step 2: register the workers with the packetizer
   PDB(kGlobal, 2)
      Info("Process", "Adding new workers to the packetizer");
   if (fPacketizer->AddWorkers(workers) == -1) {
      Error("Process", "Cannot add new workers to the packetizer!");
      return kFALSE;
   }

   // Step 3: start processing on the new workers
   PDB(kGlobal, 2)
      Info("Process", "Broadcasting process message to new workers");
   fProof->Broadcast(*fProcessMessage, workers);

   return kTRUE;
}
/// Process the TProofOutputFile objects found in the output list.
///
/// Nothing is done unless fMergeFiles is set. For each TProofOutputFile:
///  - in merge mode, the file is added to the associated TFileMerger (if not
///    already merged), the final output file name is resolved, the merge is
///    run and the merged input files are unlinked;
///  - otherwise, the file collection is moved to the output list, a dataset
///    registration request is recorded if required, and the
///    TProofOutputFile is removed from the output list and deleted.
/// Failures on a single object are reported and that object is skipped.
/// Always returns kTRUE.
Bool_t TProofPlayerRemote::MergeOutputFiles()
{
PDB(kOutput,1) Info("MergeOutputFiles", "enter: fOutput size: %d", fOutput->GetSize());
PDB(kOutput,2) fOutput->ls();
// Objects to be deleted once the scan of the output list is over
TList *rmList = 0;
if (fMergeFiles) {
TIter nxo(fOutput);
TObject *o = 0;
TProofOutputFile *pf = 0;
while ((o = nxo())) {
if ((pf = dynamic_cast<TProofOutputFile*>(o))) {
PDB(kOutput,2) pf->Print();
if (pf->IsMerge()) {
// --- Merge mode ---
Bool_t localMerge = (pf->GetTypeOpt() == TProofOutputFile::kLocal) ? kTRUE : kFALSE;
TFileMerger *filemerger = pf->GetFileMerger(localMerge);
if (!filemerger) {
Error("MergeOutputFiles", "file merger is null in TProofOutputFile! Protocol error?");
pf->Print();
continue;
}
// A file not merged yet is added to the merger as <dir>/<filename>
if (!pf->IsMerged()) {
PDB(kOutput,2) pf->Print();
TString fileLoc = TString::Format("%s/%s", pf->GetDir(), pf->GetFileName());
filemerger->AddFile(fileLoc);
}
// Data directory and related options, available only inside a server
TString ddir, ddopts;
if (gProofServ) {
ddir.Form("%s/", gProofServ->GetDataDir());
if (gProofServ->GetDataDirOpts()) ddopts= gProofServ->GetDataDirOpts();
}
// Resolve the "<datadir>/" placeholder in the output file name
TString outfile(pf->GetOutputFileName());
if (outfile.Contains("<datadir>/")) {
outfile.ReplaceAll("<datadir>/", ddir.Data());
if (!ddopts.IsNull())
outfile += TString::Format("?%s", ddopts.Data());
pf->SetOutputFileName(outfile);
}
// On the top master or in PROOF-Lite, finalize the output file name
if ((gProofServ && gProofServ->IsTopMaster()) || (fProof && fProof->IsLite())) {
TFile::EFileType ftyp = TFile::kLocal;
TString srv;
TProofServ::GetLocalServer(srv);
TUrl usrv(srv);
Bool_t localFile = kFALSE;
if (pf->IsRetrieve()) {
// File to be retrieved: write it under the data dir with its base name
if (outfile.BeginsWith("client:")) outfile.Replace(0, 7, "");
TString bn = gSystem->BaseName(TUrl(outfile.Data(), kTRUE).GetFile());
outfile.Form("%s%s", ddir.Data(), bn.Data());
if (strlen(pf->GetTitle()) <= 0) pf->SetTitle(bn);
localFile = kTRUE;
} else {
if (outfile.BeginsWith("master:")) outfile.Replace(0, 7, "");
TUrl uof(outfile.Data(), kTRUE);
TString lfn;
ftyp = TFile::GetType(uof.GetUrl(), "RECREATE", &lfn);
// A "local" URL served on a different port is treated as a network file
if (ftyp == TFile::kLocal && !srv.IsNull()) {
if (uof.GetPort() > 0 && usrv.GetPort() > 0 &&
usrv.GetPort() != uof.GetPort()) ftyp = TFile::kNet;
}
if (ftyp == TFile::kLocal) outfile = lfn;
if (ftyp == TFile::kLocal || ftyp == TFile::kFile) localFile = kTRUE;
}
// Name under which the file is advertised: for local files the local
// root is filtered out and the local server is prepended
TString outfilerem(outfile);
if (localFile) {
TProofServ::FilterLocalroot(outfilerem, srv);
outfilerem.Insert(0, srv);
}
pf->SetOutputFileName(outfilerem);
pf->SetFileName(gSystem->BaseName(outfilerem));
}
if (!filemerger->OutputFile(outfile)) {
Error("MergeOutputFiles", "cannot open the output file");
continue;
}
PDB(kSubmerger,2) filemerger->PrintFiles("");
if (!filemerger->Merge()) {
Error("MergeOutputFiles", "cannot merge the output files");
continue;
}
// Remove the files that have just been merged
TList *fileList = filemerger->GetMergeList();
if (fileList) {
TIter next(fileList);
TObjString *url = 0;
while((url = (TObjString*)next())) {
TUrl u(url->GetName());
if (!strcmp(u.GetProtocol(), "file")) {
gSystem->Unlink(u.GetFile());
} else {
gSystem->Unlink(url->GetName());
}
}
}
filemerger->Reset();
} else {
// --- Non-merge (file collection) mode ---
// If not merged yet, Merge() is called with a list holding one dummy
// object; NOTE(review): presumably to trigger creation of the file
// collection - confirm against TProofOutputFile::Merge
if (!pf->IsMerged()) {
TList dumlist;
dumlist.Add(new TNamed("dum", "dum"));
dumlist.SetOwner(kTRUE);
pf->Merge(&dumlist);
}
TFileCollection *fc = pf->GetFileCollection();
if (!fc) {
Error("MergeOutputFiles", "file collection is null in TProofOutputFile! Protocol error?");
pf->Print();
continue;
}
// The collection moves to the output list
fOutput->Add(fc);
pf->ResetFileCollection();
// Record the dataset registration request, with its options, as
// named objects in the output list
if (pf->IsRegister()) {
TString opt;
if ((pf->GetTypeOpt() & TProofOutputFile::kOverwrite)) opt += "O";
if ((pf->GetTypeOpt() & TProofOutputFile::kVerify)) opt += "V";
if (!fOutput->FindObject("PROOFSERV_RegisterDataSet"))
fOutput->Add(new TNamed("PROOFSERV_RegisterDataSet", ""));
TString tag = TString::Format("DATASET_%s", pf->GetTitle());
fOutput->Add(new TNamed(tag, opt));
}
// The TProofOutputFile is dropped from the output and scheduled for deletion
fOutput->Remove(pf);
if (!rmList) rmList = new TList;
rmList->Add(pf);
PDB(kOutput,2) fOutput->Print();
}
}
}
}
// Delete the scheduled objects (the list is made owner before deletion)
if (rmList && rmList->GetSize() > 0) {
TIter nxo(rmList);
TObject *o = 0;
while((o = nxo())) {
fOutput->Remove(o);
}
rmList->SetOwner(kTRUE);
delete rmList;
}
PDB(kOutput,1) Info("MergeOutputFiles", "done!");
return kTRUE;
}
void TProofPlayerRemote::SetSelectorDataMembersFromOutputList()
{
TOutputListSelectorDataMap* olsdm
= TOutputListSelectorDataMap::FindInList(fOutput);
if (!olsdm) {
PDB(kOutput,1) Warning("SetSelectorDataMembersFromOutputList",
"failed to find map object in output list!");
return;
}
olsdm->SetDataMembers(fSelector);
}
/// Finalize the current query.
///
/// On a master: make sure a "PROOF_Status" object exists, merge the output,
/// add the packetizer performance/configuration objects and the list of
/// files of failed packets to the output, record the memory values and
/// delete the selector.
/// On a client: unless the query was aborted, the output objects are handed
/// to the selector, Terminate() is called and the resulting objects are
/// stored in the current TQueryResult; the selector is deleted only if it
/// was created by the player.
///
/// \param force forward the request to TProof::Finalize() when, on a client,
///              there are no output lists to merge and a query is defined
/// \param sync  kFALSE if the query ran asynchronously, in which case the
///              selector is re-initialised from the query first
/// \return the selector status after Terminate() on the client (0 in the
///         other cases), -1 if the selector could not be re-initialised, or
///         the result of TProof::Finalize() when the request is forwarded
Long64_t TProofPlayerRemote::Finalize(Bool_t force, Bool_t sync)
{
   if (IsClient()) {
      if (fOutputLists == 0) {
         // Nothing to merge here: optionally forward the request
         if (force && fQuery)
            return fProof->Finalize(Form("%s:%s", fQuery->GetTitle(),
                                                  fQuery->GetName()), force);
      } else {
         PDB(kGlobal,1) Info("Finalize","Calling Merge Output to finalize the output list");
         MergeOutput();
      }
   }

   Long64_t rv = 0;
   if (fProof->IsMaster()) {

      // Make sure that a status object exists
      TStatus *status = (TStatus *) fOutput->FindObject("PROOF_Status");
      if (!status) {
         status = new TStatus();
         fOutput->Add(status);
         TString emsg = TString::Format("Query aborted after %lld entries", GetEventsProcessed());
         status->Add(emsg);
      }
      status->SetExitStatus((Int_t) GetExitStatus());

      PDB(kOutput,1) Info("Finalize","Calling Merge Output");
      MergeOutput();

      fOutput->SetOwner();

      if (fPacketizer) {
         // Performance and configuration information from the packetizer
         TObject *pperf = (TObject *) fPacketizer->GetProgressPerf(kTRUE);
         if (pperf) fOutput->Add(pperf);
         TList *parms = fPacketizer->GetConfigParams(kTRUE);
         if (parms) {
            TIter nxo(parms);
            TObject *o = 0;
            while ((o = nxo())) fOutput->Add(o);
         }

         // Files of the failed packets go to the "MissingFiles" list
         TDSetElement *elem = 0;
         if (fPacketizer->GetFailedPackets()) {
            TString type = (fPacketizer->TestBit(TVirtualPacketizer::kIsTree)) ? "TTree" : "";
            TList *listOfMissingFiles = (TList *) fOutput->FindObject("MissingFiles");
            if (!listOfMissingFiles) {
               listOfMissingFiles = new TList;
               listOfMissingFiles->SetName("MissingFiles");
            }
            TIter nxe(fPacketizer->GetFailedPackets());
            while ((elem = (TDSetElement *)nxe()))
               listOfMissingFiles->Add(elem->GetFileInfo(type));
            if (!fOutput->FindObject(listOfMissingFiles)) fOutput->Add(listOfMissingFiles);
         }
      }

      TPerfStats::Stop();
      // Record the memory values in the status object
      Long_t vmaxmst, rmaxmst;
      TPerfStats::GetMemValues(vmaxmst, rmaxmst);
      status->SetMemValues(vmaxmst, rmaxmst, kTRUE);

      SafeDelete(fSelector);

   } else {
      if (fExitStatus != kAborted) {

         if (!sync) {
            // Asynchronous query: the selector must be rebuilt from the query
            if (ReinitSelector(fQuery) == -1) {
               // The query, or its implementation macro, may be undefined
               Info("Finalize", "problems reinitializing selector \"%s\"",
                    (fQuery && fQuery->GetSelecImp()) ? fQuery->GetSelecImp()->GetName()
                                                      : "<undef>");
               return -1;
            }
         }

         // Publish the packets which could not be processed
         if (fPacketizer) {
            if (TList *failedPackets = fPacketizer->GetFailedPackets()) {
               fPacketizer->SetFailedPackets(0);
               failedPackets->SetName("FailedPackets");
               AddOutputObject(failedPackets);

               TStatus *status = (TStatus *)GetOutput("PROOF_Status");
               if (!status) AddOutputObject((status = new TStatus()));
               status->Add("Some packets were not processed! Check the"
                           " 'FailedPackets' list in the output list");
            }
         }

         // Hand the output objects to the selector
         fSelector->SetInputList(fInput);

         TList *output = fSelector->GetOutputList();
         if (output) {
            TIter next(fOutput);
            while(TObject* obj = next()) {
               if (fProof->IsParallel() || DrawCanvas(obj) == 1)
                  output->Add(obj);
            }
         } else {
            Warning("Finalize", "undefined output list in the selector! Protocol error?");
         }

         // The objects now belong to the selector output list
         fOutput->SetOwner(kFALSE);
         fOutput->Clear("nodelete");

         SetSelectorDataMembersFromOutputList();

         PDB(kLoop,1) Info("Finalize","Call Terminate()");
         SetMerging(kFALSE);
         fProof->fQuerySTW.Reset();
         fSelector->Terminate();

         rv = fSelector->GetStatus();

         // Collect what is in the selector output list after Terminate()
         // (a null 'output' yields an empty iteration)
         TIter it(output);
         while(TObject* o = it()) {
            fOutput->Add(o);
         }

         // Save the output list in the current query
         if (fQuery) {
            fQuery->SetOutputList(fOutput);
            fQuery->SetFinalized();
         } else {
            Warning("Finalize","current TQueryResult object is undefined!");
         }

         // A caller-provided selector is detached from all lists, not deleted
         if (!fCreateSelObj) {
            fInput->Remove(fSelector);
            fOutput->Remove(fSelector);
            if (output) output->Remove(fSelector);
            fSelector = 0;
         }

         // Clean up without touching the output objects
         if (output) { output->SetOwner(kFALSE); output->Clear("nodelete"); }
         if (fCreateSelObj) SafeDelete(fSelector);

         fOutput->SetOwner(kFALSE);
         fOutput->Clear("nodelete");
         SafeDelete(fOutput);

      } else {

         // Aborted query: cleanup only. A selector provided by the caller is
         // not owned by the player and must not be deleted here
         fOutput->SetOwner();
         if (fCreateSelObj)
            SafeDelete(fSelector);
         else
            fSelector = 0;
      }
   }
   PDB(kGlobal,1) Info("Finalize","exit");

   if (!IsClient()) {
      Info("Finalize", "finalization on %s finished",
           gProofServ ? gProofServ->GetPrefix() : "<unknown>");
   }
   fProof->FinalizationDone();

   return rv;
}
/// Finalize the query described by 'qr' (clients only).
///
/// Clones of the objects in the query output list are placed in fOutput
/// (or, for protocol versions below 11, in a temporary list passed to
/// StoreOutput()). The query is then made current, the standard Finalize()
/// is run and the previous query is restored.
/// Returns -1 if not on a client, if 'qr' is null or already finalized, or
/// if it has no output list; the result of Finalize() otherwise.
Long64_t TProofPlayerRemote::Finalize(TQueryResult *qr)
{
   PDB(kGlobal,1) Info("Finalize(TQueryResult *)","Enter");

   // Sanity checks
   if (!IsClient()) {
      Info("Finalize(TQueryResult *)",
           "method to be executed only on the clients");
      return -1;
   }
   if (!qr) {
      Info("Finalize(TQueryResult *)", "query undefined");
      return -1;
   }
   if (qr->IsFinalized()) {
      Info("Finalize(TQueryResult *)", "query already finalized");
      return -1;
   }

   // Start from an empty output list
   if (fOutput)
      fOutput->Clear();
   else
      fOutput = new THashList;

   // Drop the per-object output lists of a previous run
   if (fOutputLists) {
      fOutputLists->Delete();
      SafeDelete(fOutputLists);
   }

   // Messages produced while copying go to the log file
   gSystem->RedirectOutput(fProof->fLogFileName);

   TList *stored = (TList *) qr->GetOutputList();
   if (!stored) {
      gSystem->RedirectOutput(0);
      Info("Finalize(TQueryResult *)", "outputlist is empty");
      return -1;
   }

   // Old protocols go through a temporary list
   TList *target = fOutput;
   if (fProof->fProtocol < 11)
      target = new TList;

   TIter nextStored(stored);
   while (TObject *item = nextStored())
      target->Add(item->Clone());

   if (fProof->fProtocol < 11) {
      target->SetOwner();
      StoreOutput(target);
   }
   gSystem->RedirectOutput(0);

   SetSelectorDataMembersFromOutputList();

   // Finalize with 'qr' as the current query
   SetCurrentQuery(qr);
   const Long64_t rc = Finalize();
   RestorePreviousQuery();

   return rc;
}
/// Send the selector implementation and header files.
///
/// A name whose base name contains no '.' is taken to be a class loaded from
/// a library and nothing is sent. Otherwise any ACLiC mode suffix is split
/// off, the path is expanded and its directory is added to the macro path if
/// not yet there. The header is searched as "<stem>.h" and then "<stem>.hh".
/// Returns kTRUE on success (or if there is nothing to send), kFALSE if the
/// name is undefined, the header is not readable or a transfer fails.
Bool_t TProofPlayerRemote::SendSelector(const char* selector_file)
{
   if (!selector_file) {
      Info("SendSelector", "Invalid input: selector (file) name undefined");
      return kFALSE;
   }

   // No extension: nothing to send
   const char *dot = strchr(gSystem->BaseName(selector_file), '.');
   if (!dot) {
      if (gDebug > 1)
         Info("SendSelector", "selector name '%s' does not contain a '.':"
              " nothing to send, it will be loaded from a library", selector_file);
      return kTRUE;
   }

   // Strip the ACLiC mode and expand the path
   TString selec = selector_file;
   TString aclicMode, arguments, io;
   selec = gSystem->SplitAclicMode(selec, aclicMode, arguments, io);
   gSystem->ExpandPathName(selec);

   // Make sure the directory of the selector is in the macro path
   TString mp(TROOT::GetMacroPath());
   TString np(gSystem->DirName(selec));
   if (!np.IsNull()) {
      np += ":";
      const Bool_t known = mp.BeginsWith(np) || mp.Contains(":"+np);
      if (!known) {
         // Insert after a leading ".:" if present, at the beginning otherwise
         Int_t ip = (mp.BeginsWith(".:")) ? 2 : 0;
         mp.Insert(ip, np);
         TROOT::SetMacroPath(mp);
         if (gDebug > 0)
            Info("SendSelector", "macro path set to '%s'", TROOT::GetMacroPath());
      }
   }

   // Locate the header: first ".h", then ".hh"
   TString header = selec;
   header.Remove(header.Last('.'));
   header += ".h";
   if (gSystem->AccessPathName(header, kReadPermission)) {
      TString firstTry = header;
      header.Remove(header.Last('.'));
      header += ".hh";
      if (gSystem->AccessPathName(header, kReadPermission)) {
         Info("SendSelector",
              "header file not found: tried: %s %s", firstTry.Data(), header.Data());
         return kFALSE;
      }
   }

   // Implementation file
   const Int_t implOpts = (TProof::kBinary | TProof::kForward | TProof::kCp | TProof::kCpBin);
   if (fProof->SendFile(selec, implOpts) == -1) {
      Info("SendSelector", "problems sending implementation file %s", selec.Data());
      return kFALSE;
   }

   // Header file
   const Int_t hdrOpts = (TProof::kBinary | TProof::kForward | TProof::kCp);
   if (fProof->SendFile(header, hdrOpts) == -1) {
      Info("SendSelector", "problems sending header file %s", header.Data());
      return kFALSE;
   }

   return kTRUE;
}
/// Merge the objects collected in fOutputLists into the output list.
///
/// For each per-name list the first object becomes the output object (unless
/// one with that name is already in fOutput); the rest of the list is merged
/// into it through its "Merge(TCollection*)" method if it has one, or simply
/// appended to fOutput otherwise. Output files are then merged (outside
/// clients, or in PROOF-Lite) and the directory, file name and output file
/// name of each TProofOutputFile in the output are updated.
/// If 'saveMemValues' is set, the memory values from TPerfStats are stored in
/// the "PROOF_Status" object, when present.
void TProofPlayerRemote::MergeOutput(Bool_t saveMemValues)
{
PDB(kOutput,1) Info("MergeOutput","Enter");
TObject *obj = 0;
if (fOutputLists) {
TIter next(fOutputLists);
TList *list;
while ( (list = (TList *) next()) ) {
// No object with this name in the output yet: promote the first of the list
if (!(obj = fOutput->FindObject(list->GetName()))) {
obj = list->First();
list->Remove(obj);
fOutput->Add(obj);
}
if ( list->IsEmpty() ) continue;
// Merge the remaining objects via obj->Merge(TCollection*), if available
TMethodCall callEnv;
if (obj->IsA())
callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
if (callEnv.IsValid()) {
callEnv.SetParam((Long_t) list);
callEnv.Execute(obj);
} else {
// No Merge() method: move the objects individually to the output
while ( (obj = list->First()) ) {
fOutput->Add(obj);
list->Remove(obj);
}
}
}
SafeDelete(fOutputLists);
} else {
PDB(kOutput,1) Info("MergeOutput","fOutputLists empty");
}
// Output files are merged everywhere except on a non-Lite client
if (!IsClient() || fProof->IsLite()) {
MergeOutputFiles();
}
// Update the information held by the TProofOutputFile objects
TString key;
TNamed *nm = 0;
// Objects to be removed from the output once the scan is over
TList rmlist;
TIter nxo(fOutput);
while ((obj = nxo())) {
TProofOutputFile *pf = dynamic_cast<TProofOutputFile *>(obj);
if (pf) {
if (gProofServ) {
PDB(kOutput,2) Info("MergeOutput","found TProofOutputFile '%s'", obj->GetName());
// Directory part of the output file name (up to and including the last '/')
TString dir(pf->GetOutputFileName());
PDB(kOutput,2) Info("MergeOutput","outputfilename: '%s'", dir.Data());
if (dir.Last('/') != kNPOS) dir.Remove(dir.Last('/')+1);
PDB(kOutput,2) Info("MergeOutput","dir: '%s'", dir.Data());
pf->SetDir(dir);
// Raw directory: file part of the URL, prefixed by "Path.Localroot"
// for the "root" and "xrd" protocols
TUrl u(dir);
dir = u.GetFile();
TString pfx = gEnv->GetValue("Path.Localroot","");
if (!pfx.IsNull() &&
(!strcmp(u.GetProtocol(), "root") || !strcmp(u.GetProtocol(), "xrd")))
dir.Insert(0, pfx);
PDB(kOutput,2) Info("MergeOutput","rawdir: '%s'", dir.Data());
pf->SetDir(dir, kTRUE);
pf->SetWorkerOrdinal(gProofServ ? gProofServ->GetOrdinal() : "0");
// An explicit output file name may have been sent as a named object,
// which is then scheduled for removal from the output
key.Form("PROOF_OutputFileName_%s", pf->GetFileName());
if ((nm = (TNamed *) fOutput->FindObject(key.Data()))) {
pf->SetOutputFileName(nm->GetTitle());
rmlist.Add(nm);
} else if (TestBit(TVirtualProofPlayer::kIsSubmerger)) {
pf->SetOutputFileName(0);
pf->ResetBit(TProofOutputFile::kOutputFileNameSet);
}
// Sub-mergers tag the file name with ".merger"; otherwise the tag is stripped
dir = pf->GetFileName();
if (TestBit(TVirtualProofPlayer::kIsSubmerger)) {
dir += ".merger";
pf->SetMerged(kFALSE);
} else {
if (dir.EndsWith(".merger")) dir.Remove(dir.Last('.'));
}
pf->SetFileName(dir);
} else if (fProof->IsLite()) {
// PROOF-Lite: directory and file name are derived from the output file name
pf->SetWorkerOrdinal("0");
pf->SetDir(gSystem->DirName(pf->GetOutputFileName()));
TUrl u(pf->GetOutputFileName(), kTRUE);
pf->SetFileName(gSystem->BaseName(u.GetFile()));
pf->SetDir(gSystem->DirName(u.GetFile()), kTRUE);
Printf("\nOutput file: %s", pf->GetOutputFileName());
}
} else {
PDB(kOutput,2) Info("MergeOutput","output object '%s' is not a TProofOutputFile", obj->GetName());
}
}
// Remove the scheduled objects; 'rmlist' is made owner of them
if (rmlist.GetSize() > 0) {
TIter nxrm(&rmlist);
while ((obj = nxrm()))
fOutput->Remove(obj);
rmlist.SetOwner(kTRUE);
}
// Save the memory values, if requested
if (saveMemValues) {
TPerfStats::Stop();
Long_t vmaxmst, rmaxmst;
TPerfStats::GetMemValues(vmaxmst, rmaxmst);
TStatus *status = (TStatus *) fOutput->FindObject("PROOF_Status");
if (status) status->SetMemValues(vmaxmst, rmaxmst, kFALSE);
}
PDB(kOutput,1) fOutput->Print();
PDB(kOutput,1) Info("MergeOutput","leave (%d object(s))", fOutput->GetSize());
}
/// Report basic progress (total and processed entries).
/// On the client the values are handed to the TProof session; otherwise
/// they are packed into a kPROOF_PROGRESS message and sent over the socket
/// returned by gProofServ.
void TProofPlayerRemote::Progress(Long64_t total, Long64_t processed)
{
   if (!IsClient()) {
      // Not the client: forward the counters upstream.
      TMessage progressMsg(kPROOF_PROGRESS);
      progressMsg << total;
      progressMsg << processed;
      gProofServ->GetSocket()->Send(progressMsg);
      return;
   }

   fProof->Progress(total, processed);
}
/// Report extended progress (entries, bytes read, init/processing times and
/// instantaneous rates).
/// On the client the values are handed to the TProof session; otherwise
/// they are streamed, in argument order, into a kPROOF_PROGRESS message and
/// sent over the socket returned by gProofServ.
void TProofPlayerRemote::Progress(Long64_t total, Long64_t processed,
                                  Long64_t bytesread,
                                  Float_t initTime, Float_t procTime,
                                  Float_t evtrti, Float_t mbrti)
{
   PDB(kGlobal,1) {
      Info("Progress","%lld %lld %lld %f %f %f %f", total, processed, bytesread,
           initTime, procTime, evtrti, mbrti);
   }

   if (!IsClient()) {
      // Not the client: forward everything upstream in one message.
      TMessage progressMsg(kPROOF_PROGRESS);
      progressMsg << total << processed << bytesread;
      progressMsg << initTime << procTime;
      progressMsg << evtrti << mbrti;
      gProofServ->GetSocket()->Send(progressMsg);
      return;
   }

   fProof->Progress(total, processed, bytesread, initTime, procTime, evtrti, mbrti);
}
/// Report progress described by a TProofProgressInfo object.
/// A null pointer only triggers a warning. On the client the individual
/// fields are handed to the TProof session; otherwise the object itself is
/// streamed into a kPROOF_PROGRESS message and sent upstream.
void TProofPlayerRemote::Progress(TProofProgressInfo *pi)
{
   if (!pi) {
      Warning("Progress","TProofProgressInfo object undefined!");
      return;
   }

   PDB(kGlobal,1) {
      Info("Progress","%lld %lld %lld %f %f %f %f %d %f", pi->fTotal, pi->fProcessed, pi->fBytesRead,
           pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
           pi->fActWorkers, pi->fEffSessions);
   }

   if (!IsClient()) {
      // Not the client: ship the whole info object upstream.
      TMessage progressMsg(kPROOF_PROGRESS);
      progressMsg << pi;
      gProofServ->GetSocket()->Send(progressMsg);
      return;
   }

   fProof->Progress(pi->fTotal, pi->fProcessed, pi->fBytesRead,
                    pi->fInitTime, pi->fProcTime,
                    pi->fEvtRateI, pi->fMBRateI,
                    pi->fActWorkers, pi->fTotSessions, pi->fEffSessions);
}
/// Hand the list of feedback objects over to the owning TProof session.
void TProofPlayerRemote::Feedback(TList *objs)
{
   TProof *session = fProof;
   session->Feedback(objs);
}
/// Stop the current query.
/// The request is forwarded to the packetizer, if there is one, and the
/// exit status is set to kAborted or kStopped depending on 'abort'.
/// The second (unnamed) argument is not used here.
void TProofPlayerRemote::StopProcess(Bool_t abort, Int_t)
{
   if (fPacketizer)
      fPacketizer->StopProcess(abort, kFALSE);

   fExitStatus = abort ? kAborted : kStopped;
}
/// Incorporate a single output object into fOutput (created on demand).
///
/// Three kinds of input are distinguished:
///  - a TList named "PROOF_EventListsList": its per-file event lists are
///    shifted by the offset of the matching TDSet element and accumulated in
///    one "PROOF_EventList", which is then incorporated;
///  - a TProofOutputFile: the output file name is prepared (see below) before
///    the object is incorporated like any other;
///  - anything else: incorporated directly.
///
/// Returns -1 if 'obj' is null, 1 after handling an event-lists list,
/// otherwise 1 if Incorporate() reported the object as merged and 0 if not.
Int_t TProofPlayerRemote::AddOutputObject(TObject *obj)
{
PDB(kOutput,1)
Info("AddOutputObject","Enter: %p (%s)", obj, obj ? obj->ClassName() : "undef");
// Nothing to do without an object
if (!obj) {
PDB(kOutput,1) Info("AddOutputObject","Invalid input (obj == 0x0)");
return -1;
}
// Create the output list if not yet done
if (!fOutput)
fOutput = new THashList;
// Flag filled by Incorporate()
Bool_t merged = kTRUE;
// Special treatment for the list of per-file event lists
TList *elists = dynamic_cast<TList *> (obj);
if (elists && !strcmp(elists->GetName(), "PROOF_EventListsList")) {
// Global event list accumulating all the per-file lists
TEventList *evlist = new TEventList("PROOF_EventList");
TIter nxevl(elists);
TEventList *evl = 0;
while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
// Find the TDSet element whose file name matches the event list name
TIter nxelem(fDSet->GetListOfElements());
TDSetElement *elem = 0;
while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
if (!strcmp(elem->GetFileName(), evl->GetName()))
break;
}
if (!elem) {
Error("AddOutputObject", "Found an event list for %s, but no object with"
" the same name in the TDSet", evl->GetName());
continue;
}
// Shift the entry numbers in place by the element offset
Long64_t offset = elem->GetTDSetOffset();
Long64_t *arr = evl->GetList();
Int_t num = evl->GetN();
if (arr && offset > 0)
for (Int_t i = 0; i < num; i++)
arr[i] += offset;
evlist->Add(evl);
}
// Incorporate the global list; delete it if Incorporate() flagged it as merged
SetLastMergingMsg(evlist);
Incorporate(evlist, fOutput, merged);
NotifyMemory(evlist);
if (merged)
SafeDelete(evlist);
return 1;
}
// Special treatment for output-file descriptors
TProofOutputFile *pf = dynamic_cast<TProofOutputFile*>(obj);
if (pf) {
fMergeFiles = kTRUE;
if (!IsClient() || fProof->IsLite()) {
if (pf->IsMerge()) {
// hasfout: a non-empty output file name was explicitly set on the object
Bool_t hasfout = (pf->GetOutputFileName() &&
strlen(pf->GetOutputFileName()) > 0 &&
pf->TestBit(TProofOutputFile::kOutputFileNameSet)) ? kTRUE : kFALSE;
// The name is (re)defined if none was set or if this player is a submerger
Bool_t setfout = (!hasfout || TestBit(TVirtualProofPlayer::kIsSubmerger)) ? kTRUE : kFALSE;
if (setfout) {
// Resolve the "<datadir>/" placeholder and append the data-dir options, if any
TString ddir, ddopts;
if (gProofServ) {
ddir.Form("%s/", gProofServ->GetDataDir());
if (gProofServ->GetDataDirOpts()) ddopts = gProofServ->GetDataDirOpts();
}
TString outfile(pf->GetOutputFileName());
outfile.ReplaceAll("<datadir>/", ddir.Data());
if (!ddopts.IsNull()) outfile += TString::Format("?%s", ddopts.Data());
pf->SetOutputFileName(outfile);
if (gProofServ) {
// On a submerger, remember the user-defined name in fOutput under
// "PROOF_OutputFileName_<file>" unless already recorded
if (TestBit(TVirtualProofPlayer::kIsSubmerger) && hasfout) {
TString key = TString::Format("PROOF_OutputFileName_%s", pf->GetFileName());
if (!fOutput->FindObject(key.Data()))
fOutput->Add(new TNamed(key.Data(), pf->GetOutputFileName()));
}
// Build the URL prefix: local server if known, else root://<host>[:XRDPORT]/
TString of;
TProofServ::GetLocalServer(of);
if (of.IsNull()) {
of.Form("root://%s/", gSystem->HostName());
if (gSystem->Getenv("XRDPORT")) {
TString sp(gSystem->Getenv("XRDPORT"));
if (sp.IsDigit())
of.Form("root://%s:%s/", gSystem->HostName(), sp.Data());
}
}
// Append "<session dir>/<file name>" (session dir filtered via FilterLocalroot)
TString sessionPath(gProofServ->GetSessionDir());
TProofServ::FilterLocalroot(sessionPath, of);
of += TString::Format("%s/%s", sessionPath.Data(), pf->GetFileName());
// Submergers use a ".merger" suffix; otherwise the suffix is stripped
if (TestBit(TVirtualProofPlayer::kIsSubmerger)) {
if (!of.EndsWith(".merger")) of += ".merger";
} else {
if (of.EndsWith(".merger")) of.Remove(of.Last('.'));
}
pf->SetOutputFileName(of);
}
}
PDB(kOutput, 1) pf->Print();
}
} else {
// Plain client: just notify the output file name
Printf("Output file: %s", pf->GetOutputFileName());
}
}
// Generic case: incorporate the object as it is
SetLastMergingMsg(obj);
Incorporate(obj, fOutput, merged);
NotifyMemory(obj);
return (merged ? 1 : 0);
}
/// Switch redirection of error-handler output to the session log file.
/// With on == kTRUE (and a TProof with an open fLogFileW) the TProofServ
/// error handler is installed and the previous one remembered; with
/// on == kFALSE the remembered handler, if any, is restored.
void TProofPlayerRemote::RedirectOutput(Bool_t on)
{
   if (!on) {
      // Restore whatever handler was active before the redirection.
      if (fErrorHandler) {
         TProofServ::SetErrorHandlerFile(0);
         SetErrorHandler(fErrorHandler);
      }
      return;
   }

   if (fProof && fProof->fLogFileW) {
      TProofServ::SetErrorHandlerFile(fProof->fLogFileW);
      fErrorHandler = SetErrorHandler(TProofServ::ErrorHandler);
   }
}
/// Incorporate the content of the list 'out' into fOutput (created on
/// demand).
///
/// If 'out' contains a list named "PROOF_EventListsList", its per-file event
/// lists are shifted by the offset of the matching TDSet element and
/// accumulated in one "PROOF_EventList"; the original list is removed from
/// 'out' and deleted. Every remaining object is then passed to
/// Incorporate(); objects that were added to fOutput rather than merged are
/// removed from 'out' so that they are not referenced by both lists.
void TProofPlayerRemote::AddOutput(TList *out)
{
   PDB(kOutput,1) Info("AddOutput","Enter");

   // We must get a non-null list
   if (!out) {
      PDB(kOutput,1) Info("AddOutput","Invalid input (out == 0x0)");
      return;
   }

   // Create the output list, if not yet done
   if (!fOutput)
      fOutput = new THashList;

   // Flag filled by Incorporate()
   Bool_t merged = kTRUE;

   // Process the event lists first
   TList *elists = dynamic_cast<TList *> (out->FindObject("PROOF_EventListsList"));
   if (elists) {

      // Global event list accumulating all the per-file lists
      TEventList *evlist = new TEventList("PROOF_EventList");

      TIter nxevl(elists);
      TEventList *evl = 0;
      while ((evl = dynamic_cast<TEventList *> (nxevl()))) {

         // Find the TDSet element whose file name matches the event list name
         TIter nxelem(fDSet->GetListOfElements());
         TDSetElement *elem = 0;
         while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
            if (!strcmp(elem->GetFileName(), evl->GetName()))
               break;
         }
         if (!elem) {
            Error("AddOutput", "Found an event list for %s, but no object with"
                               " the same name in the TDSet", evl->GetName());
            continue;
         }

         // Shift the entry numbers in place by the element offset
         Long64_t offset = elem->GetTDSetOffset();
         Long64_t *arr = evl->GetList();
         Int_t num = evl->GetN();
         if (arr && offset > 0)
            for (Int_t i = 0; i < num; i++)
               arr[i] += offset;

         evlist->Add(evl);
      }

      // The per-file lists are not needed any longer
      out->Remove(elists);
      delete elists;

      // Incorporate the resulting global list in fOutput
      SetLastMergingMsg(evlist);
      Incorporate(evlist, fOutput, merged);
      NotifyMemory(evlist);

      // If the content was merged into an existing object, nothing references
      // the temporary list any more: release it (as AddOutputObject does)
      if (merged)
         SafeDelete(evlist);
   }

   // Now the remaining objects
   TIter nxo(out);
   TObject *obj = 0;
   while ((obj = nxo())) {
      SetLastMergingMsg(obj);
      Incorporate(obj, fOutput, merged);
      // Objects added to fOutput must not stay in 'out' as well
      if (!merged)
         out->Remove(obj);
      NotifyMemory(obj);
   }

   return;
}
/// Log the current virtual/resident memory after merging 'obj' and update
/// the memory values kept by TPerfStats.
/// Nothing is done without a TProof, or on a client that is not PROOF-Lite.
void TProofPlayerRemote::NotifyMemory(TObject *obj)
{
   if (!fProof) return;
   if (IsClient() && !fProof->IsLite()) return;

   ProcInfo_t procInfo;
   const Int_t rc = gSystem->GetProcInfo(&procInfo);
   if (rc == 0) {
      // In PROOF-Lite the message goes to the session log file.
      RedirectOutput(fProof->IsLite());
      Info("NotifyMemory|Svc", "Memory %ld virtual %ld resident after merging object %s",
           procInfo.fMemVirtual, procInfo.fMemResident, obj->GetName());
      RedirectOutput(kFALSE);
   }

   // Record also the values for monitoring.
   TPerfStats::SetMemValues();
}
/// Record, via TProofServ::SetLastMsg, that the object named as 'obj' is
/// the one currently being merged.
void TProofPlayerRemote::SetLastMergingMsg(TObject *obj)
{
   TString mergingNote;
   mergingNote.Form("while merging object '%s'", obj->GetName());
   TProofServ::SetLastMsg(mergingNote);
}
/// Incorporate 'newobj' into 'outlist'.
///
/// Histograms are first offered to HandleHistogram() (unless running on a
/// plain, non-Lite client); if that returns null the job is done. Otherwise,
/// if no object with the same name exists in 'outlist' the new object is
/// added; if one exists and its class has a Merge(TCollection*) method the
/// new object is merged into it; failing that the object is just added.
///
/// 'merged' is set to kFALSE whenever 'newobj' itself was added to a list.
/// Returns -1 on invalid input, 0 otherwise.
Int_t TProofPlayerRemote::Incorporate(TObject *newobj, TList *outlist, Bool_t &merged)
{
   merged = kTRUE;

   PDB(kOutput,1) {
      Info("Incorporate", "enter: obj: %p (%s), list: %p",
           newobj, newobj ? newobj->ClassName() : "undef", outlist);
   }

   // Both the object and the list are required
   if (!newobj || !outlist) {
      Error("Incorporate","Invalid inputs: obj: %p, list: %p", newobj, outlist);
      return -1;
   }

   // Dedicated histogram handling applies everywhere but on a plain client
   const Bool_t specialH =
      !fProof || !fProof->TestBit(TProof::kIsClient) || fProof->IsLite();
   if (specialH && newobj->InheritsFrom(TH1::Class())) {
      if (!HandleHistogram(newobj, merged)) {
         PDB(kOutput,1) {
            if (merged)
               Info("Incorporate", "histogram object '%s' merged", newobj->GetName());
            else
               Info("Incorporate", "histogram object '%s' added to the"
                                   " appropriate list for delayed merging", newobj->GetName());
         }
         return 0;
      }
   }

   // First object with this name: just add it
   TObject *existing = outlist->FindObject(newobj->GetName());
   if (!existing) {
      outlist->Add(newobj);
      merged = kFALSE;
      return 0;
   }

   // Look for a Merge(TCollection*) method on the existing object
   TMethodCall mergeCall;
   if (existing->IsA())
      mergeCall.InitWithPrototype(existing->IsA(), "Merge", "TCollection*");

   if (!mergeCall.IsValid()) {
      // Not mergeable: keep the objects side by side
      outlist->Add(newobj);
      merged = kFALSE;
      return 0;
   }

   // Merge via a reusable one-element list
   static TList *xlist = new TList;
   xlist->Add(newobj);
   mergeCall.SetParam((Long_t) xlist);
   mergeCall.Execute(existing);
   xlist->Clear();

   return 0;
}
/// Dedicated treatment of histograms to be incorporated in the output.
///
/// Returns 'obj' itself if it is not a TH1, or the histogram when it has
/// just absorbed the copy previously held in fOutput (one-by-one merging);
/// in both cases the caller still has to place the returned object.
/// Returns null when the histogram has been stored, either directly in
/// fOutput or in a per-name list kept in fOutputLists.
/// 'merged' is set to kFALSE as soon as 'obj' is known to be a TH1.
TObject *TProofPlayerRemote::HandleHistogram(TObject *obj, Bool_t &merged)
{
   TH1 *h = dynamic_cast<TH1 *>(obj);
   if (!h) {
      // Not a histogram: nothing to do here
      return obj;
   }

   merged = kFALSE;

   // A histogram still holding a buffer has not been binned yet
   const Bool_t tobebinned = (h->GetBuffer() != 0);
   const Int_t nent = h->GetBufferLength();
   PDB(kOutput,2) Info("HandleHistogram", "h:%s ent:%d, buffer size: %d",
                       h->GetName(), nent, h->GetBufferSize());

   // Container of the per-name lists
   if (!fOutputLists) {
      PDB(kOutput,2) Info("HandleHistogram", "create fOutputLists");
      fOutputLists = new TList;
      fOutputLists->SetOwner();
   }
   TList *list = (TList *) fOutputLists->FindObject(h->GetName());

   TH1 *href = 0;

   if (tobebinned) {
      // Buffered histogram: it always goes into the per-name list
      if (!list) {
         list = new TList;
         list->SetName(h->GetName());
         list->SetOwner();
         fOutputLists->Add(list);
         // Move in the instance already in fOutput, if any
         if (fOutput && (href = (TH1 *) fOutput->FindObject(h->GetName()))) {
            fOutput->Remove(href);
            list->Add(href);
         }
      }
      // Insert before the first buffered histogram with a shorter buffer
      TIter nxbuf(list);
      while ((href = (TH1 *) nxbuf())) {
         if (href->GetBuffer() && href->GetBufferLength() < nent) break;
      }
      if (href)
         list->AddBefore(href, h);
      else
         list->Add(h);
      return (TObject *)0;
   }

   if (list) {
      // Binned histogram with an existing list: insert before the first
      // buffered histogram or the first one with fewer entries than 'nent'
      TIter nxbin(list);
      while ((href = (TH1 *) nxbin())) {
         if (href->GetBuffer() || href->GetEntries() < nent) break;
      }
      if (href)
         list->AddBefore(href, h);
      else
         list->Add(h);
      return (TObject *)0;
   }

   // Binned histogram, no list yet
   TH1 *hout = (TH1*) fOutput->FindObject(h->GetName());
   if (!hout) {
      // First instance: it goes straight to the output list
      fOutput->Add(h);
      return (TObject *)0;
   }

   fOutput->Remove(hout);
   const Int_t hsz = h->GetNbinsX() * h->GetNbinsY() * h->GetNbinsZ();
   if (fMergeTH1OneByOne || (gProofServ && hsz > gProofServ->GetMsgSizeHWM())) {
      // Merge right away; the owning temporary list deletes 'hout'
      list = new TList;
      list->Add(hout);
      h->Merge(list);
      list->SetOwner();
      delete list;
      return h;
   }

   // Delayed merging: start a per-name list with both instances
   list = new TList;
   list->SetName(h->GetName());
   list->SetOwner();
   fOutputLists->Add(list);
   list->Add(hout);
   list->Add(h);
   return (TObject *)0;
}
/// Check whether two histograms have the same axis definitions.
///
/// For every axis relevant to the dimension of 'h0' (X always, Y for
/// dimension > 1, Z for dimension > 2) the number of bins must be equal and
/// the lower and upper limits must agree within 1.e-9.
/// Returns kTRUE only if ALL the relevant axes match; kFALSE otherwise or if
/// either pointer is null.
///
/// Note: a mismatch on one axis is no longer overridden by a match on a
/// following axis.
Bool_t TProofPlayerRemote::HistoSameAxis(TH1 *h0, TH1 *h1)
{
   if (!h0 || !h1) return kFALSE;

   // X is always checked; Y and Z only when the dimension requires it
   Int_t naxes = h0->GetDimension();
   if (naxes < 1) naxes = 1;
   if (naxes > 3) naxes = 3;

   for (Int_t i = 0; i < naxes; i++) {
      TAxis *a0 = (i == 0) ? h0->GetXaxis() : ((i == 1) ? h0->GetYaxis() : h0->GetZaxis());
      TAxis *a1 = (i == 0) ? h1->GetXaxis() : ((i == 1) ? h1->GetYaxis() : h1->GetZaxis());

      if (a0->GetNbins() != a1->GetNbins()) return kFALSE;
      // Written as !(x < eps) so that a NaN difference counts as a mismatch
      if (!(TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)) return kFALSE;
      if (!(TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9)) return kFALSE;
   }

   // All the relevant axes agree
   return kTRUE;
}
/// Store the objects of the list 'out' in fOutputLists, grouped in one
/// owning TList per object name, for later merging.
///
/// A list named "PROOF_EventListsList" found in 'out' is replaced by one
/// "PROOF_EventList" accumulating its per-file event lists, each shifted by
/// the offset of the matching TDSet element. 'out' is made non-owning and is
/// deleted before returning.
void TProofPlayerRemote::StoreOutput(TList *out)
{
   PDB(kOutput,1) Info("StoreOutput","Enter");

   if (!out) {
      PDB(kOutput,1) Info("StoreOutput","Leave (empty)");
      return;
   }

   // The objects are going to be owned by the per-name lists
   TIter next(out);
   out->SetOwner(kFALSE);

   if (!fOutputLists) {
      PDB(kOutput,2) Info("StoreOutput","Create fOutputLists");
      fOutputLists = new TList;
      fOutputLists->SetOwner();
   }

   // Process the event lists first
   TList *perFileLists = dynamic_cast<TList*> (out->FindObject("PROOF_EventListsList"));
   if (perFileLists) {
      out->Remove(perFileLists);
      TEventList *globalList = new TEventList("PROOF_EventList");
      out->Add(globalList);

      TIter nxl(perFileLists);
      TEventList *fileList = 0;
      while ((fileList = dynamic_cast<TEventList*> (nxl()))) {
         // Look for the TDSet element with the same file name
         TDSetElement *match = 0;
         TIter nxe(fDSet->GetListOfElements());
         while ((match = dynamic_cast<TDSetElement*> (nxe()))) {
            if (!strcmp(match->GetFileName(), fileList->GetName()))
               break;
         }
         if (!match) {
            Error("StoreOutput", "found the EventList for %s, but no object with that name "
                                 "in the TDSet", fileList->GetName());
            continue;
         }

         // Shift the entry numbers in place by the element offset
         const Long64_t offset = match->GetTDSetOffset();
         Long64_t *entries = fileList->GetList();
         const Int_t nEntries = fileList->GetN();
         if (entries && offset != 0) {
            for (Int_t k = 0; k < nEntries; ++k)
               entries[k] += offset;
         }
         globalList->Add(fileList);
      }
      delete perFileLists;
   }

   // Dispatch every object to the list carrying its name
   TObject *item = 0;
   while ((item = next())) {
      PDB(kOutput,2) Info("StoreOutput","find list for '%s'", item->GetName() );

      TList *bucket = (TList *) fOutputLists->FindObject(item->GetName());
      if (!bucket) {
         PDB(kOutput,2) Info("StoreOutput", "list for '%s' not found (creating)", item->GetName());
         bucket = new TList;
         bucket->SetName(item->GetName());
         bucket->SetOwner();
         fOutputLists->Add(bucket);
      }
      bucket->Add(item);
   }

   delete out;
   PDB(kOutput,1) Info("StoreOutput", "leave");
}
/// Merge the feedback objects stored in fFeedbackLists.
///
/// fFeedbackLists holds one TMap per feedback object name; the values of
/// each map are the instances to be merged. For every map a clone of one
/// instance is added to the returned list and the remaining instances are
/// merged into it through its Merge(TCollection*) method, when available;
/// otherwise clones of the remaining instances are added individually.
///
/// Returns a new owning TList, or 0 if fFeedbackLists is not defined.
TList *TProofPlayerRemote::MergeFeedback()
{
PDB(kFeedback,1)
Info("MergeFeedback","Enter");
// Nothing stored, nothing to merge
if ( fFeedbackLists == 0 ) {
PDB(kFeedback,1)
Info("MergeFeedback","Leave (no output)");
return 0;
}
// The list to be returned owns the (cloned) objects
TList *fb = new TList;
fb->SetOwner();
TIter next(fFeedbackLists);
TMap *map;
while ( (map = (TMap*) next()) ) {
PDB(kFeedback,2)
Info("MergeFeedback", "map %s size: %d", map->GetName(), map->GetSize());
// Temporary, non-owning list of the instances found in this map
TList *list = new TList;
TIter keys(map);
#ifndef R__TH1MERGEFIXED
// Track the "PROOF_"-named histogram with the largest number of X bins:
// it is used below as the reference instance
Int_t nbmx = -1;
TObject *oref = 0;
#endif
while ( TObject *key = keys() ) {
TObject *o = map->GetValue(key);
TH1 *h = dynamic_cast<TH1 *>(o);
#ifndef R__TH1MERGEFIXED
if (h && !strncmp(o->GetName(),"PROOF_",6)) {
if (h->GetNbinsX() > nbmx) {
nbmx= h->GetNbinsX();
oref = o;
}
}
#endif
if (h) {
// Histograms are inserted in order: a buffered histogram goes before the
// first buffered one with a shorter buffer; a binned histogram goes before
// the first buffered one or the first one with fewer entries
TIter nxh(list);
TH1 *href= 0;
while ((href = (TH1 *)nxh())) {
if (h->GetBuffer()) {
if (href->GetBuffer() && href->GetBufferLength() < h->GetBufferLength()) break;
} else {
if (href->GetBuffer() || href->GetEntries() < h->GetEntries()) break;
}
}
if (href) {
list->AddBefore(href, h);
} else {
list->Add(h);
}
} else {
// Other objects are just appended
list->Add(o);
}
}
// Pick the reference instance: the first of the list or, when
// R__TH1MERGEFIXED is not defined, the tracked histogram if any
#ifdef R__TH1MERGEFIXED
TObject *obj = list->First();
#else
TObject *obj = (oref) ? oref : list->First();
#endif
// A clone of the reference goes to the output; the stored instance is untouched
list->Remove(obj);
obj = obj->Clone();
fb->Add(obj);
// Single instance: nothing else to merge
if ( list->IsEmpty() ) {
delete list;
continue;
}
// Merge the remaining instances via Merge(TCollection*), if available
TMethodCall callEnv;
if (obj->IsA())
callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
if (callEnv.IsValid()) {
callEnv.SetParam((Long_t) list);
callEnv.Execute(obj);
} else {
// No Merge method: return clones of all the remaining instances
while ( (obj = list->First()) ) {
fb->Add(obj->Clone());
list->Remove(obj);
}
}
delete list;
}
PDB(kFeedback,1)
Info("MergeFeedback","Leave (%d object(s))", fb->GetSize());
return fb;
}
/// Store the feedback objects received from 'slave'.
///
/// On the client the list is passed to Feedback() and deleted. Otherwise
/// every object is stored in the TMap of fFeedbackLists carrying its name
/// (created on demand), keyed by 'slave'; a value previously stored for the
/// same key is deleted first. 'out' is made non-owning and deleted before
/// returning.
///
/// \param slave key identifying the sender; usually a TSlave, but
///              HandleTimer() passes the player itself, so the object is not
///              assumed to be a TSlave
/// \param out   list of feedback objects (may be null)
void TProofPlayerRemote::StoreFeedback(TObject *slave, TList *out)
{
   PDB(kFeedback,1)
      Info("StoreFeedback","Enter");

   if ( out == 0 ) {
      PDB(kFeedback,1)
         Info("StoreFeedback","Leave (empty)");
      return;
   }

   if ( IsClient() ) {
      // In the client: hand over the objects and we are done
      Feedback(out);
      delete out;
      return;
   }

   if (fFeedbackLists == 0) {
      PDB(kFeedback,2) Info("StoreFeedback","Create fFeedbackLists");
      fFeedbackLists = new TList;
      fFeedbackLists->SetOwner();
   }

   TIter next(out);
   out->SetOwner(kFALSE);

   // Label for the debug messages: the ordinal if the key really is a
   // TSlave, the object name otherwise (a blind cast would be undefined
   // behaviour when the key is, e.g., the player itself)
   TSlave *wrk = dynamic_cast<TSlave *>(slave);
   const TString ordLabel(wrk ? wrk->GetOrdinal() : (slave ? slave->GetName() : "undef"));
   const char *ord = ordLabel.Data();

   TObject *obj;
   while( (obj = next()) ) {
      PDB(kFeedback,2)
         Info("StoreFeedback","%s: Find '%s'", ord, obj->GetName() );
      TMap *map = (TMap*) fFeedbackLists->FindObject(obj->GetName());
      if ( map == 0 ) {
         PDB(kFeedback,2)
            Info("StoreFeedback", "%s: map for '%s' not found (creating)", ord, obj->GetName());
         // Map slave -> object
         map = new TMap;
         map->SetName(obj->GetName());
         fFeedbackLists->Add(map);
      } else {
         PDB(kFeedback,2)
            Info("StoreFeedback","%s: removing previous value", ord);
         // Drop the value previously stored for this key
         if (map->GetValue(slave))
            delete map->GetValue(slave);
         map->Remove(slave);
      }
      map->Add(slave, obj);
      PDB(kFeedback,2)
         Info("StoreFeedback","%s: %s, size: %d", ord, obj->GetName(), map->GetSize());
   }

   delete out;
   PDB(kFeedback,1)
      Info("StoreFeedback","Leave");
}
void TProofPlayerRemote::SetupFeedback()
{
if (IsClient()) return;
fFeedback = (TList*) fInput->FindObject("FeedbackList");
PDB(kFeedback,1) Info("SetupFeedback","\"FeedbackList\" %sfound",
fFeedback == 0 ? "NOT ":"");
if (fFeedback == 0 || fFeedback->GetSize() == 0) return;
SafeDelete(fFeedbackTimer);
fFeedbackPeriod = 2000;
TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
fFeedbackTimer = new TTimer;
fFeedbackTimer->SetObject(this);
fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
}
/// Stop the periodic feedback by deleting the feedback timer, if any.
void TProofPlayerRemote::StopFeedback()
{
   if (fFeedbackTimer != 0) {
      PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
      SafeDelete(fFeedbackTimer);
   }
}
/// Periodic feedback action.
///
/// Clones of the requested objects already present in fOutput are stored
/// through StoreFeedback() (replacing any map previously kept for the same
/// name); the content of fFeedbackLists is then merged and sent upstream in
/// a kPROOF_FEEDBACK message. The timer is restarted before returning.
/// Always returns kFALSE.
Bool_t TProofPlayerRemote::HandleTimer(TTimer *)
{
   PDB(kFeedback,2) Info("HandleTimer","Entry");

   // Feedback not active
   if (!fFeedbackTimer) return kFALSE;

   // Collect clones of the requested objects available locally
   TList *local = new TList;
   local->SetOwner();

   TIter nxname(fFeedback);
   TObjString *name = 0;
   while ((name = (TObjString*) nxname())) {
      TObject *o = fOutput->FindObject(name->GetName());
      if (!o) continue;

      local->Add(o->Clone());

      // The local instance supersedes what was stored for this name
      if (!fFeedbackLists) continue;
      TMap *stale = (TMap *) fFeedbackLists->FindObject(name->GetName());
      if (stale) {
         fFeedbackLists->Remove(stale);
         stale->DeleteValues();
         delete stale;
      }
   }

   if (local->GetSize() > 0)
      StoreFeedback(this, local);   // takes care of deleting the list
   else
      delete local;

   // Nothing stored: just re-arm the timer
   if (!fFeedbackLists) {
      fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
      return kFALSE;
   }

   // Merge and send
   TList *mergedFb = MergeFeedback();

   PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", mergedFb->GetSize());

   TMessage fbMsg(kPROOF_FEEDBACK);
   fbMsg << mergedFb;
   gProofServ->GetSocket()->Send(fbMsg);

   delete mergedFb;

   fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);

   return kFALSE;
}
/// Ask the packetizer for the next packet for worker 'slave'.
///
/// When the fProcPackets histogram is defined, the bin labelled with the
/// worker ordinal is decreased by one (if currently positive) before the
/// request and increased by one when a real packet is handed out.
/// Returns whatever the packetizer returns: null, (TDSetElement*)-1 or a
/// packet.
TDSetElement *TProofPlayerRemote::GetNextPacket(TSlave *slave, TMessage *r)
{
   // The first call to this determines the end of initialization
   SetInitTime();

   if (fProcPackets) {
      const Int_t bin = fProcPackets->GetXaxis()->FindBin(slave->GetOrdinal());
      if (bin >= 0 && fProcPackets->GetBinContent(bin) > 0)
         fProcPackets->Fill(slave->GetOrdinal(), -1);
   }

   TDSetElement *packet = fPacketizer->GetNextPacket( slave, r );

   if (!packet) {
      PDB(kPacketizer,2)
         Info("GetNextPacket","%s: done!", slave->GetOrdinal());
      return packet;
   }

   if (packet == (TDSetElement*) -1) {
      PDB(kPacketizer,2)
         Info("GetNextPacket","%s: waiting ...", slave->GetOrdinal());
      return packet;
   }

   // A real packet
   PDB(kPacketizer,2)
      Info("GetNextPacket","%s (%s): '%s' '%s' '%s' %lld %lld",
           slave->GetOrdinal(), slave->GetName(), packet->GetFileName(),
           packet->GetDirectory(), packet->GetObjName(), packet->GetFirst(), packet->GetNum());
   if (fProcPackets) fProcPackets->Fill(slave->GetOrdinal(), 1);

   return packet;
}
/// Tell whether the associated TProof has the TProof::kIsClient bit set;
/// kFALSE when no TProof is attached.
Bool_t TProofPlayerRemote::IsClient() const
{
   if (!fProof) return kFALSE;
   return fProof->TestBit(TProof::kIsClient);
}
/// Draw (and/or select) via a PROOF query.
///
/// The input list is temporarily restricted to the draw-related parameters
/// (those listed in fgDrawInputPars plus the ones named "alias:..."),
/// 'varexp' and 'selection' are added as named objects, feedback is enabled
/// on the target object ("htemp" if none was given) and Process() is run.
/// Afterwards the original content of the input list is restored.
/// Returns -1 if the arguments cannot be parsed, otherwise the value
/// returned by Process().
Long64_t TProofPlayerRemote::DrawSelect(TDSet *set, const char *varexp,
                                        const char *selection, Option_t *option,
                                        Long64_t nentries, Long64_t firstentry)
{
   // Names of the input parameters relevant for drawing (filled once)
   if (!fgDrawInputPars) {
      static const char *const kDrawParNames[] = {
         "FeedbackList",
         "PROOF_ChainWeight",
         "PROOF_LineColor",
         "PROOF_LineStyle",
         "PROOF_LineWidth",
         "PROOF_MarkerColor",
         "PROOF_MarkerStyle",
         "PROOF_MarkerSize",
         "PROOF_FillColor",
         "PROOF_FillStyle",
         "PROOF_ListOfAliases"
      };
      const Int_t nPars = sizeof(kDrawParNames) / sizeof(kDrawParNames[0]);
      fgDrawInputPars = new THashList;
      for (Int_t ip = 0; ip < nPars; ++ip)
         fgDrawInputPars->Add(new TObjString(kDrawParNames[ip]));
   }

   TString selector, objname;
   if (GetDrawArgs(varexp, selection, option, selector, objname) != 0) {
      Error("DrawSelect", "parsing arguments");
      return -1;
   }

   TNamed *varexpobj = new TNamed("varexp", varexp);
   TNamed *selectionobj = new TNamed("selection", selection);

   // Save the current input list and keep only the draw-related entries
   TList *savedInput = new TList;
   TIter nxi(fInput);
   TObject *o = 0;
   while ((o = nxi())) {
      savedInput->Add(o);
      const TString oname(o->GetName());
      const Bool_t keep = !fgDrawInputPars ||
                          fgDrawInputPars->FindObject(o->GetName()) ||
                          oname.BeginsWith("alias:");
      if (!keep) fInput->Remove(o);
   }

   fInput->Add(varexpobj);
   fInput->Add(selectionobj);

   // Make sure there is an object to draw to and enable feedback on it
   if (objname.IsNull()) objname = "htemp";

   fProof->AddFeedback(objname);
   const Long64_t r = Process(set, selector, option, nentries, firstentry);
   fProof->RemoveFeedback(objname);

   // Remove what was added for this query
   fInput->Remove(varexpobj);
   fInput->Remove(selectionobj);
   TNamed *opt = dynamic_cast<TNamed*> (fInput->FindObject("PROOF_OPTIONS"));
   if (opt) {
      fInput->Remove(opt);
      delete opt;
   }
   delete varexpobj;
   delete selectionobj;

   // Restore the original input list
   fInput->Clear();
   TIter nxsi(savedInput);
   while ((o = nxsi()))
      fInput->Add(o);
   savedInput->SetOwner(kFALSE);
   delete savedInput;

   return r;
}
/// Forward the SetInitTime() request to the packetizer, if there is one.
void TProofPlayerRemote::SetInitTime()
{
   if (!fPacketizer) return;
   fPacketizer->SetInitTime();
}
ClassImp(TProofPlayerSlave)
void TProofPlayerSlave::SetupFeedback()
{
TList *fb = (TList*) fInput->FindObject("FeedbackList");
if (fb) {
PDB(kFeedback,1)
Info("SetupFeedback","\"FeedbackList\" found: %d objects", fb->GetSize());
} else {
PDB(kFeedback,1)
Info("SetupFeedback","\"FeedbackList\" NOT found");
}
if (fb == 0 || fb->GetSize() == 0) return;
SafeDelete(fFeedbackTimer);
fFeedbackPeriod = 2000;
TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
fFeedbackTimer = new TTimer;
fFeedbackTimer->SetObject(this);
fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
fFeedback = fb;
}
/// Stop the periodic feedback by deleting the feedback timer, if any.
void TProofPlayerSlave::StopFeedback()
{
   if (fFeedbackTimer != 0) {
      PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
      SafeDelete(fFeedbackTimer);
   }
}
/// Periodic action on the worker side.
///
/// On a master running sequentially (IsMaster() && !IsParallel()) a
/// kPROOF_PROGRESS message is sent first, in the format matching the
/// protocol version reported by gProofServ. Then, if feedback is active,
/// the requested objects found in the output list are sent in a
/// kPROOF_FEEDBACK message and the timer is restarted.
/// Always returns kFALSE.
///
/// The feedback part does not assume that gProofServ, fSelector or the
/// feedback timer are defined: the timer is deleted by StopFeedback() while
/// fFeedback stays set, so this method may run without a timer.
Bool_t TProofPlayerSlave::HandleTimer(TTimer *)
{
   PDB(kFeedback,2) Info("HandleTimer","Entry");

   // If in sequential (0-slave-PROOF) mode we do not have a packetizer
   // so we also send the info to update the progress bar
   if (gProofServ) {
      Bool_t sendm = kFALSE;
      TMessage m(kPROOF_PROGRESS);
      if (gProofServ->IsMaster() && !gProofServ->IsParallel()) {
         sendm = kTRUE;
         if (gProofServ->GetProtocol() > 25) {
            m << GetProgressStatus();
         } else if (gProofServ->GetProtocol() > 11) {
            TProofProgressStatus *ps = GetProgressStatus();
            m << fTotalEvents << ps->GetEntries() << ps->GetBytesRead()
              << (Float_t) -1. << (Float_t) ps->GetProcTime()
              << (Float_t) ps->GetRate() << (Float_t) -1.;
         } else {
            m << fTotalEvents << GetEventsProcessed();
         }
      }
      if (sendm) gProofServ->GetSocket()->Send(m);
   }

   // Feedback not requested
   if (fFeedback == 0) return kFALSE;

   // The list only references objects owned by the output list
   TList *fb = new TList;
   fb->SetOwner(kFALSE);

   if (fOutput == 0 && fSelector) {
      fOutput = (THashList *) fSelector->GetOutputList();
   }

   if (fOutput) {
      TIter next(fFeedback);
      while( TObjString *name = (TObjString*) next() ) {
         // TODO: find object in memory ... maybe allow only in fOutput?
         TObject *o = fOutput->FindObject(name->GetName());
         if (o != 0) fb->Add(o);
      }
   }

   PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());

   // Send only if there is a server to send through (consistent with the
   // check done for the progress message above)
   if (gProofServ) {
      TMessage m(kPROOF_FEEDBACK);
      m << fb;
      gProofServ->GetSocket()->Send(m);
   }

   delete fb;

   // Re-arm the timer, if still there
   if (fFeedbackTimer)
      fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);

   return kFALSE;
}
/// Handle a tree-header request.
///
/// A TDSet is read from 'mess'; the tree of its first element is opened and
/// its entry-loop limit is set to the total number of entries summed over
/// all the elements whose file and tree can be opened. The answer, a
/// kPROOF_GETTREEHEADER message with "Success" or "Failed" followed by the
/// tree pointer, is sent through fSocket.
///
/// A missing message or data set results in a "Failed" answer. The data set
/// read from the message is released before returning.
void TProofPlayerSlave::HandleGetTreeHeader(TMessage *mess)
{
   TMessage answ(kPROOF_GETTREEHEADER);

   // The data set is created by the streamer: we own it
   TDSet *dset = 0;
   if (mess) (*mess) >> dset;

   TDSetElement *e = 0;
   if (dset) {
      dset->Reset();
      e = dset->Next();
   }

   Long64_t entries = 0;
   TFile *f = 0;
   TTree *t = 0;
   if (!e) {
      PDB(kGlobal, 1) Info("HandleGetTreeHeader", "empty TDSet");
   } else {
      f = TFile::Open(e->GetFileName());
      t = 0;
      if (f) {
         t = (TTree*) f->Get(e->GetObjName());
         if (t) {
            t->SetMaxVirtualSize(0);
            t->DropBaskets();
            entries = t->GetEntries();

            // Add up the entries of the remaining elements
            while ((e = dset->Next()) != 0) {
               TFile *f1 = TFile::Open(e->GetFileName());
               if (f1) {
                  TTree *t1 = (TTree*) f1->Get(e->GetObjName());
                  if (t1) {
                     entries += t1->GetEntries();
                     delete t1;
                  }
                  delete f1;
               }
            }
            t->SetMaxEntryLoop(entries);   // this field will hold the total number of entries ;)
         }
      }
   }

   if (t)
      answ << TString("Success") << t;
   else
      answ << TString("Failed") << t;

   fSocket->Send(answ);

   SafeDelete(t);
   SafeDelete(f);
   // Release the data set read from the message
   SafeDelete(dset);
}
ClassImp(TProofPlayerSuperMaster)
/// Process the data set 'dset' with the selector 'selector_file' by
/// distributing it over the sub-masters.
///
/// The elements of the data set are grouped by mass storage domain (MSD),
/// restricted to the range [first, first+nentries) (nentries == -1 meaning
/// no upper limit), split among the sub-masters serving the same MSD and
/// sent to them in kPROOF_PROCESS messages. The results are then collected
/// and merged.
///
/// Returns 0 on success and -1 if the TProof is not a TProofSuperMaster, the
/// selector cannot be sent, the data set cannot be validated or an element
/// requires an MSD that no active sub-master serves.
Long64_t TProofPlayerSuperMaster::Process(TDSet *dset, const char *selector_file,
Option_t *option, Long64_t nentries,
Long64_t first)
{
fProgressStatus->Reset();
PDB(kGlobal,1) Info("Process","Enter");
// This player only works with a TProofSuperMaster
TProofSuperMaster *proof = dynamic_cast<TProofSuperMaster*>(GetProof());
if (!proof) return -1;
// Start from a fresh output list
delete fOutput;
fOutput = new THashList;
TPerfStats::Start(fInput, fOutput);
if (!SendSelector(selector_file)) {
Error("Process", "sending selector %s", selector_file);
return -1;
}
TCleanup clean(this);
SetupFeedback();
if (proof->IsMaster()) {
// Make sure the data set elements are validated
if (!dset->ElementsValid()) {
proof->ValidateDSet(dset);
if (!dset->ElementsValid()) {
Error("Process", "could not validate TDSet");
return -1;
}
}
// msds holds one TPair per MSD: key = list of sub-masters, value = sorted
// list of data set elements. keyholder/valueholder own those lists.
TList msds;
msds.SetOwner();
TList keyholder;
keyholder.SetOwner();
TList valueholder;
valueholder.SetOwner();
// Group the active sub-masters by MSD
TIter nextslave(proof->GetListOfActiveSlaves());
while (TSlave *sl = dynamic_cast<TSlave*>(nextslave())) {
TList *submasters = 0;
TPair *msd = dynamic_cast<TPair*>(msds.FindObject(sl->GetMsd()));
if (!msd) {
// First sub-master for this MSD: create its lists
submasters = new TList;
submasters->SetName(sl->GetMsd());
keyholder.Add(submasters);
TList *setelements = new TSortedList(kSortDescending);
setelements->SetName(TString(sl->GetMsd())+"_Elements");
valueholder.Add(setelements);
msds.Add(new TPair(submasters, setelements));
} else {
submasters = dynamic_cast<TList*>(msd->Key());
}
if (submasters) submasters->Add(sl);
}
// Assign the elements to their MSD, applying the [first, first+nentries)
// window; 'cur' is the running global entry number
Long64_t cur = 0;
TIter nextelement(dset->GetListOfElements());
while (TDSetElement *elem = dynamic_cast<TDSetElement*>(nextelement())) {
// Skip empty elements
if (elem->GetNum()<1) continue;
// Stop once past the end of the requested range
if (nentries !=-1 && cur>=first+nentries) {
break;
}
// Element entirely before the requested range: skip it
if (cur+elem->GetNum()-1<first) {
cur+=elem->GetNum();
continue;
}
// Element straddling the start of the range: drop its leading entries
if (cur<first) {
elem->SetNum(elem->GetNum()-(first-cur));
elem->SetFirst(elem->GetFirst()+first-cur);
cur=first;
}
// Element straddling the end of the range: drop its trailing entries
if (nentries==-1 || cur+elem->GetNum()<=first+nentries) {
cur+=elem->GetNum();
} else {
elem->SetNum(first+nentries-cur);
cur=first+nentries;
}
TPair *msd = dynamic_cast<TPair*>(msds.FindObject(elem->GetMsd()));
if (!msd) {
Error("Process", "data requires mass storage domain '%s'"
" which is not accessible in this proof session",
elem->GetMsd());
return -1;
} else {
TList *elements = dynamic_cast<TList*>(msd->Value());
if (elements) elements->Add(elem);
}
}
// For every MSD, split its elements evenly among its sub-masters and send
// each share in a kPROOF_PROCESS message
TList usedmasters;
TIter nextmsd(msds.MakeIterator());
while (TPair *msd = dynamic_cast<TPair*>(nextmsd())) {
TList *submasters = dynamic_cast<TList*>(msd->Key());
TList *setelements = dynamic_cast<TList*>(msd->Value());
Int_t nmasters = submasters ? submasters->GetSize() : -1;
Int_t nelements = setelements ? setelements->GetSize() : -1;
for (Int_t i=0; i<nmasters; i++) {
// Entries assigned to sub-master i
Long64_t nent = 0;
TDSet set(dset->GetType(), dset->GetObjName(),
dset->GetDirectory());
// Sub-master i gets the elements with index in [i*n/m, (i+1)*n/m)
for (Int_t j = (i*nelements)/nmasters;
j < ((i+1)*nelements)/nmasters;
j++) {
TDSetElement *elem = setelements ?
dynamic_cast<TDSetElement*>(setelements->At(j)) : (TDSetElement *)0;
if (elem) {
set.Add(elem->GetFileName(), elem->GetObjName(),
elem->GetDirectory(), elem->GetFirst(),
elem->GetNum(), elem->GetMsd());
nent += elem->GetNum();
} else {
Warning("Process", "not a TDSetElement object");
}
}
if (set.GetListOfElements()->GetSize()>0) {
// The range was already applied above, so the message carries -1 and 0
// as its last two values
TMessage mesg(kPROOF_PROCESS);
TString fn(gSystem->BaseName(selector_file));
TString opt = option;
mesg << &set << fn << fInput << opt << Long64_t(-1) << Long64_t(0);
TSlave *sl = dynamic_cast<TSlave*>(submasters->At(i));
if (sl) {
PDB(kGlobal,1) Info("Process",
"Sending TDSet with %d elements to submaster %s",
set.GetListOfElements()->GetSize(),
sl->GetOrdinal());
sl->GetSocket()->Send(mesg);
usedmasters.Add(sl);
// Register the sub-master and add one slot to each per-slave progress
// array; the index of the slot matches the position in fSlaves
fSlaves.AddLast(sl);
fSlaveProgress.Set(fSlaveProgress.GetSize()+1);
fSlaveProgress[fSlaveProgress.GetSize()-1] = 0;
fSlaveTotals.Set(fSlaveTotals.GetSize()+1);
fSlaveTotals[fSlaveTotals.GetSize()-1] = nent;
fSlaveBytesRead.Set(fSlaveBytesRead.GetSize()+1);
fSlaveBytesRead[fSlaveBytesRead.GetSize()-1] = 0;
fSlaveInitTime.Set(fSlaveInitTime.GetSize()+1);
fSlaveInitTime[fSlaveInitTime.GetSize()-1] = -1.;
fSlaveProcTime.Set(fSlaveProcTime.GetSize()+1);
fSlaveProcTime[fSlaveProcTime.GetSize()-1] = -1.;
fSlaveEvtRti.Set(fSlaveEvtRti.GetSize()+1);
fSlaveEvtRti[fSlaveEvtRti.GetSize()-1] = -1.;
fSlaveMBRti.Set(fSlaveMBRti.GetSize()+1);
fSlaveMBRti[fSlaveMBRti.GetSize()-1] = -1.;
fSlaveActW.Set(fSlaveActW.GetSize()+1);
fSlaveActW[fSlaveActW.GetSize()-1] = 0;
fSlaveTotS.Set(fSlaveTotS.GetSize()+1);
fSlaveTotS[fSlaveTotS.GetSize()-1] = 0;
fSlaveEffS.Set(fSlaveEffS.GetSize()+1);
fSlaveEffS[fSlaveEffS.GetSize()-1] = 0.;
} else {
Warning("Process", "not a TSlave object");
}
}
}
}
// Trigger the timer action once before collecting (not on the client) ...
if ( !IsClient() ) HandleTimer(0);
PDB(kGlobal,1) Info("Process","Calling Collect");
proof->Collect(&usedmasters);
// ... and once after
HandleTimer(0);
}
StopFeedback();
PDB(kGlobal,1) Info("Process","Calling Merge Output");
MergeOutput();
TPerfStats::Stop();
return 0;
}
/// Record the progress reported by sub-master 'sl' and report the totals
/// summed over all the registered sub-masters.
///
/// A report from a sub-master that is not registered in fSlaves is ignored
/// with a warning: the index returned by IndexOf() would be negative and
/// must not be used to address the per-slave arrays.
void TProofPlayerSuperMaster::Progress(TSlave *sl, Long64_t total, Long64_t processed)
{
   Int_t idx = fSlaves.IndexOf(sl);
   if (idx < 0 || idx >= fSlaveTotals.GetSize() || idx >= fSlaveProgress.GetSize()) {
      Warning("Progress", "slave %s is not registered: progress report ignored",
              sl ? sl->GetName() : "undef");
      return;
   }

   fSlaveProgress[idx] = processed;
   if (fSlaveTotals[idx] != total)
      Warning("Progress", "total events has changed for slave %s", sl->GetName());
   fSlaveTotals[idx] = total;

   // Sum over all the sub-masters
   Long64_t tot = 0;
   Int_t i;
   for (i = 0; i < fSlaveTotals.GetSize(); i++) tot += fSlaveTotals[i];
   Long64_t proc = 0;
   for (i = 0; i < fSlaveProgress.GetSize(); i++) proc += fSlaveProgress[i];

   Progress(tot, proc);
}
/// Record the extended progress reported by sub-master 'sl' and report the
/// combined values over all the registered sub-masters.
///
/// Time and rate values not larger than -1 are taken as "not available" and
/// leave the stored value unchanged. The combined report carries: the sums
/// of totals, processed entries and bytes read; the smallest available init
/// time; the largest available processing time; the sum of the available
/// event rates; and the average of the available MB rates (computed over the
/// number of sub-masters that provided an MB rate).
///
/// A report from a sub-master that is not registered in fSlaves is ignored
/// with a warning: the index returned by IndexOf() would be negative and
/// must not be used to address the per-slave arrays.
void TProofPlayerSuperMaster::Progress(TSlave *sl, Long64_t total,
                                       Long64_t processed, Long64_t bytesread,
                                       Float_t initTime, Float_t procTime,
                                       Float_t evtrti, Float_t mbrti)
{
   PDB(kGlobal,2)
      Info("Progress","%s: %lld %lld %f %f %f %f", sl->GetName(),
           processed, bytesread, initTime, procTime, evtrti, mbrti);

   Int_t idx = fSlaves.IndexOf(sl);
   if (idx < 0 || idx >= fSlaveTotals.GetSize()) {
      Warning("Progress", "slave %s is not registered: progress report ignored",
              sl ? sl->GetName() : "undef");
      return;
   }

   if (fSlaveTotals[idx] != total)
      Warning("Progress", "total events has changed for slave %s", sl->GetName());
   fSlaveTotals[idx] = total;
   fSlaveProgress[idx] = processed;
   fSlaveBytesRead[idx] = bytesread;
   // Keep the previous value when the new one is not available
   fSlaveInitTime[idx] = (initTime > -1.) ? initTime : fSlaveInitTime[idx];
   fSlaveProcTime[idx] = (procTime > -1.) ? procTime : fSlaveProcTime[idx];
   fSlaveEvtRti[idx] = (evtrti > -1.) ? evtrti : fSlaveEvtRti[idx];
   fSlaveMBRti[idx] = (mbrti > -1.) ? mbrti : fSlaveMBRti[idx];

   Int_t i;
   Long64_t tot = 0;
   Long64_t proc = 0;
   Long64_t bytes = 0;
   Float_t init = -1.;
   Float_t ptime = -1.;
   Float_t erti = 0.;
   Float_t srti = 0.;
   Int_t nsrti = 0;
   for (i = 0; i < fSlaveTotals.GetSize(); i++) {
      tot += fSlaveTotals[i];
      if (i < fSlaveProgress.GetSize())
         proc += fSlaveProgress[i];
      if (i < fSlaveBytesRead.GetSize())
         bytes += fSlaveBytesRead[i];
      // Smallest available init time
      if (i < fSlaveInitTime.GetSize())
         if (fSlaveInitTime[i] > -1. && (init < 0. || fSlaveInitTime[i] < init))
            init = fSlaveInitTime[i];
      // Largest available processing time
      if (i < fSlaveProcTime.GetSize())
         if (fSlaveProcTime[i] > -1. && (ptime < 0. || fSlaveProcTime[i] > ptime))
            ptime = fSlaveProcTime[i];
      // Sum of the available event rates
      if (i < fSlaveEvtRti.GetSize())
         if (fSlaveEvtRti[i] > -1.)
            erti += fSlaveEvtRti[i];
      // Sum and count of the available MB rates
      if (i < fSlaveMBRti.GetSize())
         if (fSlaveMBRti[i] > -1.) {
            srti += fSlaveMBRti[i];
            nsrti++;
         }
   }
   // Average over the sub-masters that actually provided an MB rate
   srti = (nsrti > 0) ? srti / nsrti : 0.;

   Progress(tot, proc, bytes, init, ptime, erti, srti);
}
////////////////////////////////////////////////////////////////////////////////
/// Store the progress information reported for one worker and forward the
/// aggregate over all workers to the Progress overload taking a
/// TProofProgressInfo. Nothing is done when pi is null.
///
/// \param wrk  worker the report refers to; its index in fSlaves selects the
///             per-worker slot that is updated
/// \param pi   progress information for that worker; rate, time, worker and
///             session fields not greater than -1 leave the stored value
///             untouched
void TProofPlayerSuperMaster::Progress(TSlave *wrk, TProofProgressInfo *pi)
{
   if (pi) {
      PDB(kGlobal,2)
         Info("Progress","%s: %lld %lld %lld %f %f %f %f %d %f", wrk->GetOrdinal(),
                         pi->fTotal, pi->fProcessed, pi->fBytesRead,
                         pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
                         pi->fActWorkers, pi->fEffSessions);

      // Update the slot belonging to the reporting worker
      Int_t idx = fSlaves.IndexOf(wrk);
      if (fSlaveTotals[idx] != pi->fTotal)
         Warning("Progress", "total events has changed for worker %s", wrk->GetName());
      fSlaveTotals[idx] = pi->fTotal;
      fSlaveProgress[idx] = pi->fProcessed;
      fSlaveBytesRead[idx] = pi->fBytesRead;
      fSlaveInitTime[idx] = (pi->fInitTime > -1.) ? pi->fInitTime : fSlaveInitTime[idx];
      fSlaveProcTime[idx] = (pi->fProcTime > -1.) ? pi->fProcTime : fSlaveProcTime[idx];
      fSlaveEvtRti[idx] = (pi->fEvtRateI > -1.) ? pi->fEvtRateI : fSlaveEvtRti[idx];
      fSlaveMBRti[idx] = (pi->fMBRateI > -1.) ? pi->fMBRateI : fSlaveMBRti[idx];
      fSlaveActW[idx] = (pi->fActWorkers > -1) ? pi->fActWorkers : fSlaveActW[idx];
      fSlaveTotS[idx] = (pi->fTotSessions > -1) ? pi->fTotSessions : fSlaveTotS[idx];
      fSlaveEffS[idx] = (pi->fEffSessions > -1.) ? pi->fEffSessions : fSlaveEffS[idx];

      // Aggregate over all workers. The loop is driven by fSlaveTotals; every
      // other array is only read where the index is within its own size.
      Int_t nsrti = 0;   // number of valid MB rates
      TProofProgressInfo pisum(0, 0, 0, -1., -1., 0., 0., 0, 0, 0.);
      for (Int_t i = 0; i < fSlaveTotals.GetSize(); i++) {
         pisum.fTotal += fSlaveTotals[i];
         if (i < fSlaveProgress.GetSize())
            pisum.fProcessed += fSlaveProgress[i];
         if (i < fSlaveBytesRead.GetSize())
            pisum.fBytesRead += fSlaveBytesRead[i];
         // Init time: keep the smallest valid value
         if (i < fSlaveInitTime.GetSize() && fSlaveInitTime[i] > -1. &&
             (pisum.fInitTime < 0. || fSlaveInitTime[i] < pisum.fInitTime))
            pisum.fInitTime = fSlaveInitTime[i];
         // Processing time: keep the largest valid value
         if (i < fSlaveProcTime.GetSize() && fSlaveProcTime[i] > -1. &&
             (pisum.fProcTime < 0. || fSlaveProcTime[i] > pisum.fProcTime))
            pisum.fProcTime = fSlaveProcTime[i];
         if (i < fSlaveEvtRti.GetSize() && fSlaveEvtRti[i] > -1.)
            pisum.fEvtRateI += fSlaveEvtRti[i];
         if (i < fSlaveMBRti.GetSize() && fSlaveMBRti[i] > -1.) {
            pisum.fMBRateI += fSlaveMBRti[i];
            nsrti++;
         }
         if (i < fSlaveActW.GetSize())
            pisum.fActWorkers += fSlaveActW[i];
         // Sessions: keep the largest valid value
         if (i < fSlaveTotS.GetSize() && fSlaveTotS[i] > -1 &&
             (pisum.fTotSessions < 0. || fSlaveTotS[i] > pisum.fTotSessions))
            pisum.fTotSessions = fSlaveTotS[i];
         if (i < fSlaveEffS.GetSize() && fSlaveEffS[i] > -1. &&
             (pisum.fEffSessions < 0. || fSlaveEffS[i] > pisum.fEffSessions))
            pisum.fEffSessions = fSlaveEffS[i];
      }

      // Average the MB rate over the entries that actually contributed to it.
      // The divisor must be the MB-rate count: using the event-rate count
      // gives a wrong mean and divides by zero when no event rate is valid.
      pisum.fMBRateI = (nsrti > 0) ? pisum.fMBRateI / nsrti : 0.;

      // NOTE(review): the event rate is forwarded as a sum here, whereas
      // HandleTimer averages it - confirm which of the two is intended.
      Progress(&pisum);
   }
}
////////////////////////////////////////////////////////////////////////////////
/// Timer callback: aggregate the per-slave progress figures and send them as
/// a kPROOF_PROGRESS message over the socket returned by gProofServ.
///
/// For protocol versions above 25 the figures are packed into a
/// TProofProgressInfo object; otherwise they are streamed as separate values.
///
/// \return kFALSE when no feedback timer is set or when fReturnFeedback is
///         not set; otherwise the result of TProofPlayerRemote::HandleTimer(0).
Bool_t TProofPlayerSuperMaster::HandleTimer(TTimer *)
{
   if (fFeedbackTimer == 0) return kFALSE; // timer already switched off

   // Aggregate over all slaves. The loop is driven by fSlaveTotals; every
   // other array is only read where the index is within its own size.
   Long64_t tot = 0;
   Long64_t proc = 0;
   Long64_t bytes = 0;
   Float_t init = -1.;   // smallest valid init time seen, -1 if none
   Float_t ptime = -1.;  // largest valid processing time seen, -1 if none
   Float_t erti = 0.;    // sum, then average, of the valid event rates
   Float_t srti = 0.;    // sum, then average, of the valid MB rates
   Int_t nerti = 0;      // number of valid event rates
   Int_t nsrti = 0;      // number of valid MB rates
   for (Int_t i = 0; i < fSlaveTotals.GetSize(); i++) {
      tot += fSlaveTotals[i];
      if (i < fSlaveProgress.GetSize())
         proc += fSlaveProgress[i];
      if (i < fSlaveBytesRead.GetSize())
         bytes += fSlaveBytesRead[i];
      if (i < fSlaveInitTime.GetSize() && fSlaveInitTime[i] > -1. &&
          (init < 0. || fSlaveInitTime[i] < init))
         init = fSlaveInitTime[i];
      if (i < fSlaveProcTime.GetSize() && fSlaveProcTime[i] > -1. &&
          (ptime < 0. || fSlaveProcTime[i] > ptime))
         ptime = fSlaveProcTime[i];
      if (i < fSlaveEvtRti.GetSize() && fSlaveEvtRti[i] > -1.) {
         erti += fSlaveEvtRti[i];
         nerti++;
      }
      if (i < fSlaveMBRti.GetSize() && fSlaveMBRti[i] > -1.) {
         srti += fSlaveMBRti[i];
         nsrti++;
      }
   }

   // Each rate is averaged over the entries that contributed to it. The MB
   // rate must use its own count: dividing by the event-rate count gives a
   // wrong mean and divides by zero when no event rate is valid.
   erti = (nerti > 0) ? erti / nerti : 0.;
   srti = (nsrti > 0) ? srti / nsrti : 0.;

   TMessage m(kPROOF_PROGRESS);
   if (gProofServ->GetProtocol() > 25) {
      // Newer protocol: send the figures as one progress-info object
      TProofProgressInfo pi(tot, proc, bytes, init, ptime,
                            erti, srti, -1,
                            gProofServ->GetTotSessions(), gProofServ->GetEffSessions());
      m << &pi;
   } else {
      // Older protocol: send the figures as separate values
      m << tot << proc << bytes << init << ptime << erti << srti;
   }

   gProofServ->GetSocket()->Send(m);

   if (fReturnFeedback)
      return TProofPlayerRemote::HandleTimer(0);
   else
      return kFALSE;
}
////////////////////////////////////////////////////////////////////////////////
/// Set up the feedback timer on the super master.
///
/// Does nothing on a client. Otherwise the base-class setup runs first; if a
/// feedback timer exists afterwards, fReturnFeedback is set and that timer is
/// kept. If not, fReturnFeedback is cleared and a timer owned by this object
/// is created and started with the period held in fFeedbackPeriod.
void TProofPlayerSuperMaster::SetupFeedback()
{
   // Clients have nothing to arrange here
   if (IsClient())
      return;

   TProofPlayerRemote::SetupFeedback();

   // Remember whether a feedback timer is in place after the base-class setup
   fReturnFeedback = fFeedbackTimer ? kTRUE : kFALSE;
   if (fReturnFeedback)
      return;

   // No timer yet: create our own
   SafeDelete(fFeedbackTimer);

   // Start from 2000 and let GetParameter update it from the input list
   // (presumably only when "PROOF_FeedbackPeriod" is present - TODO confirm)
   fFeedbackPeriod = 2000;
   TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);

   fFeedbackTimer = new TTimer;
   fFeedbackTimer->SetObject(this);
   fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
}