#include "TProofDraw.h"
#include "TProofPlayer.h"
#include "THashList.h"
#include "TEventIter.h"
#include "TVirtualPacketizer.h"
#include "TPacketizer.h"
#include "TPacketizerProgressive.h"
#include "TAdaptivePacketizer.h"
#include "TSelector.h"
#include "TSocket.h"
#include "TProofServ.h"
#include "TProof.h"
#include "TProofSuperMaster.h"
#include "TSlave.h"
#include "TROOT.h"
#include "TError.h"
#include "TException.h"
#include "MessageTypes.h"
#include "TMessage.h"
#include "TDSetProxy.h"
#include "TString.h"
#include "TSystem.h"
#include "TFile.h"
#include "TProofDebug.h"
#include "TTimer.h"
#include "TMap.h"
#include "TPerfStats.h"
#include "TStatus.h"
#include "TEventList.h"
#include "TProofLimitsFinder.h"
#include "TSortedList.h"
#include "TTreeDrawArgsParser.h"
#include "TCanvas.h"
#include "TNamed.h"
#include "TObjString.h"
#include "TQueryResult.h"
#include "TMD5.h"
#include "TMethodCall.h"
#include "TObjArray.h"
#include "TMutex.h"
#ifndef R__TH1MERGEFIXED
#include "TH1.h"
#endif
#include "TVirtualMonitoring.h"
#include "TParameter.h"
#define kPEX_STOPPED 1001
#define kPEX_ABORTED 1002
static Bool_t gAbort = kFALSE;
class TAutoBinVal : public TNamed {
private:
Double_t fXmin, fXmax, fYmin, fYmax, fZmin, fZmax;
public:
TAutoBinVal(const char *name, Double_t xmin, Double_t xmax, Double_t ymin,
Double_t ymax, Double_t zmin, Double_t zmax) : TNamed(name,"")
{
fXmin = xmin; fXmax = xmax;
fYmin = ymin; fYmax = ymax;
fZmin = zmin; fZmax = zmax;
}
void GetAll(Double_t& xmin, Double_t& xmax, Double_t& ymin,
Double_t& ymax, Double_t& zmin, Double_t& zmax)
{
xmin = fXmin; xmax = fXmax;
ymin = fYmin; ymax = fYmax;
zmin = fZmin; zmax = fZmax;
}
};
ClassImp(TProofPlayer)
TProofPlayer::TProofPlayer()
: fAutoBins(0), fOutput(0), fSelector(0), fSelectorClass(0),
fFeedbackTimer(0), fEvIter(0), fSelStatus(0), fEventsProcessed(0),
fTotalEvents(0), fQueryResults(0), fQuery(0), fDrawQueries(0),
fMaxDrawQueries(1), fStopTimer(0), fStopTimerMtx(0)
{
fInput = new TList;
fExitStatus = kFinished;
}
TProofPlayer::~TProofPlayer()
{
SafeDelete(fInput);
SafeDelete(fSelector);
SafeDelete(fFeedbackTimer);
SafeDelete(fEvIter);
SafeDelete(fQueryResults);
}
void TProofPlayer::StopProcess(Bool_t abort, Int_t timeout)
{
if (gDebug > 0)
Info ("StopProcess","abort: %d, timeout: %d", abort, timeout);
if (fEvIter != 0)
fEvIter->StopProcess(abort);
Long_t to = 1;
if (abort == kTRUE) {
fExitStatus = kAborted;
} else {
fExitStatus = kStopped;
to = timeout;
}
if (to > 0)
SetStopTimer(kTRUE, abort, to);
}
void TProofPlayer::SetStopTimer(Bool_t on, Bool_t abort, Int_t timeout)
{
fStopTimerMtx = (fStopTimerMtx) ? fStopTimerMtx : new TMutex(kTRUE);
R__LOCKGUARD(fStopTimerMtx);
if (on) {
if (gDebug > 0)
Info ("SetStopTimer","START: %d, timeout: %d", abort, timeout);
if (timeout > 864000) {
Warning("SetStopTimer",
"Abnormous timeout value (%d): corruption? setting to 0", timeout);
timeout = 0;
}
timeout = (timeout <= 0) ? 1 : timeout;
fStopTimer = new TTimer((timeout * 1000), kFALSE);
if (abort)
fStopTimer->Connect("Timeout()", "TProofPlayer", this, "HandleAbortTimer()");
else
fStopTimer->Connect("Timeout()", "TProofPlayer", this, "HandleStopTimer()");
fStopTimer->Start(-1, kTRUE);
} else {
if (gDebug > 0)
Info ("SetStopTimer",
"STOP: %d, timeout: %d, timer: %p", abort, timeout, fStopTimer);
if (fStopTimer) {
fStopTimer->Stop();
if (abort)
fStopTimer->Disconnect("Timeout()", this, "HandleAbortTimer()");
else
fStopTimer->Disconnect("Timeout()", this, "HandleStopTimer()");
SafeDelete(fStopTimer);
}
}
}
void TProofPlayer::HandleStopTimer()
{
if (gDebug > 0)
Info ("HandleAbortTimer","called!");
Throw(kPEX_STOPPED);
}
void TProofPlayer::HandleAbortTimer()
{
if (gDebug > 0)
Info ("HandleAbortTimer","called!");
Throw(kPEX_ABORTED);
}
void TProofPlayer::AddQueryResult(TQueryResult *q)
{
if (!q) {
Warning("AddQueryResult","query undefined - do nothing");
return;
}
if (!(q->IsDraw())) {
if (!fQueryResults) {
fQueryResults = new TList;
fQueryResults->Add(q);
} else {
TIter nxr(fQueryResults);
TQueryResult *qr = 0;
TQueryResult *qp = 0;
while ((qr = (TQueryResult *) nxr())) {
if (*qr == *q) {
fQueryResults->Remove(qr);
delete qr;
break;
}
if (qr->GetEndTime().Convert() < q->GetEndTime().Convert())
qp = qr;
}
if (!qp) {
fQueryResults->AddFirst(q);
} else {
fQueryResults->AddAfter(qp, q);
}
}
} else if(IsClient()) {
if (fDrawQueries == fMaxDrawQueries && fMaxDrawQueries > 0) {
TIter nxr(fQueryResults);
TQueryResult *qr = 0;
while ((qr = (TQueryResult *) nxr())) {
if (qr->IsDraw()) {
fDrawQueries--;
fQueryResults->Remove(qr);
delete qr;
break;
}
}
}
if (fDrawQueries >= 0 && fDrawQueries < fMaxDrawQueries) {
fDrawQueries++;
if (!fQueryResults)
fQueryResults = new TList;
fQueryResults->Add(q);
}
}
}
void TProofPlayer::RemoveQueryResult(const char *ref)
{
if (fQueryResults) {
TIter nxq(fQueryResults);
TQueryResult *qr = 0;
while ((qr = (TQueryResult *) nxq())) {
if (qr->Matches(ref)) {
fQueryResults->Remove(qr);
delete qr;
}
}
}
}
TQueryResult *TProofPlayer::GetQueryResult(const char *ref)
{
if (fQueryResults) {
if (ref && strlen(ref) > 0) {
TIter nxq(fQueryResults);
TQueryResult *qr = 0;
while ((qr = (TQueryResult *) nxq())) {
if (qr->Matches(ref))
return qr;
}
} else {
return (TQueryResult *) fQueryResults->Last();
}
}
return (TQueryResult *)0;
}
void TProofPlayer::SetCurrentQuery(TQueryResult *q)
{
fPreviousQuery = fQuery;
fQuery = q;
}
void TProofPlayer::AddInput(TObject *inp)
{
fInput->Add(inp);
}
void TProofPlayer::ClearInput()
{
fInput->Clear();
}
TObject *TProofPlayer::GetOutput(const char *name) const
{
if (fOutput != 0) {
return fOutput->FindObject(name);
} else {
return 0;
}
}
TList *TProofPlayer::GetOutputList() const
{
TList *ol = fOutput;
if (!ol && fQuery)
ol = fQuery->GetOutputList();
return ol;
}
Int_t TProofPlayer::ReinitSelector(TQueryResult *qr)
{
Int_t rc = 0;
if (!qr) {
Info("ReinitSelector", "query undefined - do nothing");
return -1;
}
TString selec = qr->GetSelecImp()->GetName();
if (selec.Length() <= 0) {
Info("ReinitSelector", "selector name undefined - do nothing");
return -1;
}
Bool_t stdselec = TSelector::IsStandardDraw(selec);
Bool_t compselec = (selec.Contains(".") || stdselec) ? kFALSE : kTRUE;
TString ipathold;
if (!stdselec && !compselec) {
Bool_t expandselec = kTRUE;
TString dir, ipath;
char *selc = gSystem->Which(TROOT::GetMacroPath(), selec, kReadPermission);
if (selc) {
TMD5 *md5icur = 0, *md5iold = 0, *md5hcur = 0, *md5hold = 0;
md5icur = TMD5::FileChecksum(selc);
md5iold = qr->GetSelecImp()->Checksum();
char *selh = StrDup(selc);
char *p = (char *) strrchr(selh,'.');
if (p) strcpy(p+1,"h");
if (!gSystem->AccessPathName(selh, kReadPermission))
md5hcur = TMD5::FileChecksum(selh);
md5hold = qr->GetSelecHdr()->Checksum();
if (*md5hcur == *md5hold && *md5icur == *md5iold)
expandselec = kFALSE;
SafeDelete(md5icur);
SafeDelete(md5hcur);
SafeDelete(md5iold);
SafeDelete(md5hold);
if (selc) delete [] selc;
if (selh) delete [] selh;
}
Bool_t ok = kTRUE;
if (expandselec) {
ok = kFALSE;
TUUID u;
dir = Form("%s/%s",gSystem->TempDirectory(),u.AsString());
if (!(gSystem->MakeDirectory(dir))) {
selec = Form("%s/%s",dir.Data(),selec.Data());
qr->GetSelecImp()->SaveSource(selec);
TString seleh = Form("%s/%s",dir.Data(),qr->GetSelecHdr()->GetName());
qr->GetSelecHdr()->SaveSource(seleh);
ipathold = gSystem->GetIncludePath();
ipath = Form("-I%s %s", dir.Data(), gSystem->GetIncludePath());
gSystem->SetIncludePath(ipath.Data());
ok = kTRUE;
}
} else {
TString opts(qr->GetOptions());
Int_t idx = opts.Index("#");
if (idx != kNPOS) {
opts.Remove(0,idx+1);
selec += opts;
}
}
if (!ok) {
Info("ReinitSelector", "problems locating or exporting selector files");
return -1;
}
}
SafeDelete(fSelector);
fSelectorClass = 0;
Int_t iglevelsave = gErrorIgnoreLevel;
if (compselec)
gErrorIgnoreLevel = kBreak;
if ((fSelector = TSelector::GetSelector(selec))) {
if (compselec)
gErrorIgnoreLevel = iglevelsave;
fSelectorClass = fSelector->IsA();
fSelector->SetOption(qr->GetOptions());
if (stdselec) {
fSelector->SetInputList(qr->GetInputList());
((TProofDraw *)fSelector)->DefVar();
}
} else {
if (compselec) {
gErrorIgnoreLevel = iglevelsave;
if (strlen(qr->GetLibList()) > 0) {
TString sl(qr->GetLibList());
TObjArray *oa = sl.Tokenize(" ");
if (oa) {
Bool_t retry = kFALSE;
TIter nxl(oa);
TObjString *os = 0;
while ((os = (TObjString *) nxl())) {
TString lib = gSystem->BaseName(os->GetName());
if (lib != "lib") {
lib.ReplaceAll("-l", "lib");
if (gSystem->Load(lib) == 0)
retry = kTRUE;
}
}
if (retry && (fSelector = TSelector::GetSelector(selec))) {
fSelectorClass = fSelector->IsA();
fSelector->SetOption(qr->GetOptions());
}
}
}
}
if (!fSelector) {
if (compselec)
Info("ReinitSelector", "compiled selector re-init failed:"
" automatic reload unsuccessful:"
" please load manually the correct library");
rc = -1;
}
}
if (ipathold.Length() > 0)
gSystem->SetIncludePath(ipathold.Data());
return rc;
}
Int_t TProofPlayer::AddOutputObject(TObject *)
{
MayNotUse("AddOutputObject");
return -1;
}
void TProofPlayer::AddOutput(TList *)
{
MayNotUse("AddOutput");
}
void TProofPlayer::StoreOutput(TList *)
{
MayNotUse("StoreOutput");
}
void TProofPlayer::StoreFeedback(TObject *, TList *)
{
MayNotUse("StoreFeedback");
}
void TProofPlayer::Progress(Long64_t , Long64_t )
{
MayNotUse("Progress");
}
void TProofPlayer::Progress(Long64_t , Long64_t ,
Long64_t ,
Float_t , Float_t ,
Float_t , Float_t )
{
MayNotUse("Progress");
}
void TProofPlayer::Feedback(TList *)
{
MayNotUse("Feedback");
}
Long64_t TProofPlayer::Process(TDSet *dset, const char *selector_file,
Option_t *option, Long64_t nentries,
Long64_t first, TEventList * )
{
PDB(kGlobal,1) Info("Process","Enter");
fExitStatus = kFinished;
fOutput = 0;
SafeDelete(fSelector);
fSelectorClass = 0;
TRY {
if (!(fSelector = TSelector::GetSelector(selector_file))) {
Error("Process", "cannot load: %s", selector_file );
return -1;
}
} CATCH(excode) {
Error("Process","exception %d caught", excode);
Error("Process", "cannot load: %s", selector_file );
return -1;
} ENDTRY;
fSelectorClass = fSelector->IsA();
Int_t version = fSelector->Version();
fOutput = fSelector->GetOutputList();
TPerfStats::Start(fInput, fOutput);
fSelStatus = new TStatus;
fOutput->Add(fSelStatus);
TCleanup clean(this);
SetupFeedback();
fSelector->SetOption(option);
fSelector->SetInputList(fInput);
fTotalEvents = nentries;
if (fTotalEvents < 0 && gProofServ &&
gProofServ->IsMaster() && !gProofServ->IsParallel()) {
dset->Validate();
dset->Reset();
TDSetElement *e = 0;
while ((e = dset->Next())) {
fTotalEvents += e->GetNum();
}
}
dset->Reset();
fEvIter = TEventIter::Create(dset, fSelector, first, nentries);
if (version == 0) {
PDB(kLoop,1) Info("Process","Call Begin(0)");
fSelector->Begin(0);
} else {
if (IsClient()) {
PDB(kLoop,1) Info("Process","Call Begin(0)");
fSelector->Begin(0);
}
if (fSelStatus->IsOk()) {
PDB(kLoop,1) Info("Process","Call SlaveBegin(0)");
fSelector->SlaveBegin(0);
}
}
if (gMonitoringWriter)
gMonitoringWriter->SendProcessingStatus("STARTED",kTRUE);
PDB(kLoop,1) Info("Process","Looping over Process()");
Long64_t readbytesatstart = 0;
readbytesatstart = TFile::GetFileBytesRead();
if (gMonitoringWriter)
gMonitoringWriter->SendProcessingProgress(0,0,kTRUE);
gAbort = kFALSE;
Long64_t entry;
fEventsProcessed = 0;
while ((entry = fEvIter->GetNextEvent()) >= 0 && fSelStatus->IsOk()) {
Bool_t ok = kTRUE;
if (version == 0) {
PDB(kLoop,3)Info("Process","Call ProcessCut(%lld)", entry);
if (fSelector->ProcessCut(entry)) {
PDB(kLoop,3)Info("Process","Call ProcessFill(%lld)", entry);
fSelector->ProcessFill(entry);
}
} else {
PDB(kLoop,3)Info("Process","Call Process(%lld)", entry);
TRY {
fSelector->Process(entry);
} CATCH(excode) {
ok = kFALSE;
if (excode == kPEX_STOPPED) {
Info("Process","received stop-process signal");
} else if (excode == kPEX_ABORTED) {
gAbort = kTRUE;
Info("Process","received abort-process signal");
} else {
Error("Process","exception %d caught", excode);
fExitStatus = kAborted;
}
break;
} ENDTRY;
if (fSelector->GetAbort() == TSelector::kAbortProcess) break;
}
if (ok) {
fEventsProcessed++;
if (gMonitoringWriter)
gMonitoringWriter->SendProcessingProgress(fEventsProcessed,TFile::GetFileBytesRead()-readbytesatstart, kFALSE);
}
gSystem->DispatchOneEvent(kTRUE);
if (!ok || gROOT->IsInterrupted()) break;
}
PDB(kGlobal,2) Info("Process","%lld events processed",fEventsProcessed);
if (gMonitoringWriter) {
gMonitoringWriter->SendProcessingProgress(fEventsProcessed,TFile::GetFileBytesRead()-readbytesatstart, kFALSE);
gMonitoringWriter->SendProcessingStatus("DONE");
}
if (fStopTimer != 0)
SetStopTimer(kFALSE, gAbort);
if (fFeedbackTimer != 0)
HandleTimer(0);
StopFeedback();
SafeDelete(fEvIter);
if (fExitStatus != kAborted) {
if (fSelStatus->IsOk()) {
if (version == 0) {
PDB(kLoop,1) Info("Process","Call Terminate()");
fSelector->Terminate();
} else {
PDB(kLoop,1) Info("Process","Call SlaveTerminate()");
fSelector->SlaveTerminate();
if (IsClient() && fSelStatus->IsOk()) {
PDB(kLoop,1) Info("Process","Call Terminate()");
fSelector->Terminate();
}
}
}
if (gProofServ && !gProofServ->IsParallel()) {
TIter next(gROOT->GetListOfCanvases());
while (TCanvas* c = dynamic_cast<TCanvas*> (next()))
fOutput->Add(c);
}
}
TPerfStats::Stop();
return 0;
}
Long64_t TProofPlayer::Finalize(Bool_t, Bool_t)
{
MayNotUse("Finalize");
return -1;
}
Long64_t TProofPlayer::Finalize(TQueryResult *)
{
MayNotUse("Finalize");
return -1;
}
void TProofPlayer::UpdateAutoBin(const char *name,
Double_t& xmin, Double_t& xmax,
Double_t& ymin, Double_t& ymax,
Double_t& zmin, Double_t& zmax)
{
if ( fAutoBins == 0 ) {
fAutoBins = new THashList;
}
TAutoBinVal *val = (TAutoBinVal*) fAutoBins->FindObject(name);
if ( val == 0 ) {
if (gProofServ && !gProofServ->IsTopMaster()) {
TString key = name;
TProofLimitsFinder::AutoBinFunc(key,xmin,xmax,ymin,ymax,zmin,zmax);
}
val = new TAutoBinVal(name,xmin,xmax,ymin,ymax,zmin,zmax);
fAutoBins->Add(val);
} else {
val->GetAll(xmin,xmax,ymin,ymax,zmin,zmax);
}
}
TDSetElement *TProofPlayer::GetNextPacket(TSlave *, TMessage *)
{
MayNotUse("GetNextPacket");
return 0;
}
void TProofPlayer::SetupFeedback()
{
MayNotUse("SetupFeedback");
}
void TProofPlayer::StopFeedback()
{
MayNotUse("StopFeedback");
}
Long64_t TProofPlayer::DrawSelect(TDSet * , const char * ,
const char * , Option_t * ,
Long64_t , Long64_t )
{
MayNotUse("DrawSelect");
return 0;
}
ClassImp(TProofPlayerLocal)
ClassImp(TProofPlayerRemote)
TProofPlayerRemote::~TProofPlayerRemote()
{
SafeDelete(fOutput);
SafeDelete(fOutputLists);
if (fFeedbackLists != 0) {
TIter next(fFeedbackLists);
while (TMap *m = (TMap*) next()) {
m->DeleteValues();
}
}
SafeDelete(fFeedbackLists);
SafeDelete(fPacketizer);
}
Long64_t TProofPlayerRemote::Process(TDSet *dset, const char *selector_file,
Option_t *option, Long64_t nentries,
Long64_t first, TEventList * )
{
PDB(kGlobal,1) Info("Process","Enter");
fDSet = dset;
fExitStatus = kFinished;
fEventsProcessed = 0;
if (!fOutput)
fOutput = new TList;
else
fOutput->Clear();
if (fProof->IsMaster()){
TPerfStats::Start(fInput, fOutput);
} else {
TPerfStats::Setup(fInput);
}
if(!SendSelector(selector_file)) return -1;
TMessage mesg(kPROOF_PROCESS);
TString fn(gSystem->BaseName(selector_file));
Bool_t sync = (fProof->GetQueryMode(option) == TProof::kSync);
TDSet *set = dset;
if (fProof->IsMaster()) {
PDB(kPacketizer,1) Info("Process","Create Proxy TDSet");
set = new TDSetProxy( dset->GetType(), dset->GetObjName(),
dset->GetDirectory() );
delete fPacketizer;
TNamed *packetizer;
if ((packetizer = (TNamed*)fInput->FindObject("PROOF_Packetizer")) != 0) {
Info("Process","Using Alternate Packetizer: %s", packetizer->GetTitle());
TClass *cl = gROOT->GetClass(packetizer->GetTitle());
if (cl == 0) {
Error("Process","Class '%s' not found", packetizer->GetTitle());
fExitStatus = kAborted;
return -1;
}
TMethodCall callEnv;
callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*");
if (!callEnv.IsValid()) {
Error("Process","Cannot find correct constructor for '%s'", cl->GetName());
fExitStatus = kAborted;
return -1;
}
callEnv.ResetParam();
callEnv.SetParam((Long_t) dset);
callEnv.SetParam((Long_t) fProof->GetListOfActiveSlaves());
callEnv.SetParam((Long64_t) first);
callEnv.SetParam((Long64_t) nentries);
callEnv.SetParam((Long_t) fInput);
Long_t ret = 0;
callEnv.Execute(ret);
fPacketizer = (TVirtualPacketizer*) ret;
if (fPacketizer == 0) {
Error("Process","Cannot construct '%s'", cl->GetName());
fExitStatus = kAborted;
return -1;
}
} else {
PDB(kGlobal,1) Info("Process","Using Standard TPacketizer");
fPacketizer = new TPacketizer(dset, fProof->GetListOfActiveSlaves(),
first, nentries, fInput);
}
if ( !fPacketizer->IsValid() ) {
fExitStatus = kAborted;
return -1;
}
first = 0;
} else {
if (fOutputLists) {
fOutputLists->Delete();
delete fOutputLists;
fOutputLists = 0;
}
if (!sync) {
gSystem->RedirectOutput(fProof->fLogFileName);
Printf(" ");
Info("Process","starting new query");
}
SafeDelete(fSelector);
fSelectorClass = 0;
if (!(fSelector = TSelector::GetSelector(selector_file))) {
if (!sync)
gSystem->RedirectOutput(0);
return -1;
}
fSelectorClass = fSelector->IsA();
fSelector->SetInputList(fInput);
fSelector->SetOption(option);
PDB(kLoop,1) Info("Process","Call Begin(0)");
fSelector->Begin(0);
if (!sync)
gSystem->RedirectOutput(0);
}
TCleanup clean(this);
SetupFeedback();
TString opt = option;
TEventList* elist = 0;
if (!fProof->IsMaster() && set->GetEventList()) {
elist = set->GetEventList();
}
if (gProofServ && gProofServ->IsMaster() && gProofServ->IsParallel()) {
mesg << set << fn << fInput << opt << (Long64_t)-1 << (Long64_t)0 << elist << sync;
} else {
mesg << set << fn << fInput << opt << nentries << first << elist << sync;
}
PDB(kGlobal,1) Info("Process","Calling Broadcast");
fProof->Broadcast(mesg);
if (IsClient())
fProof->fRedirLog = kTRUE;
if (!sync) {
if (IsClient()) {
PDB(kGlobal,1) Info("Process","Asynchronous processing:"
" activating CollectInputFrom");
fProof->Activate();
fProof->Collect();
return fProof->fSeqNum;
} else {
PDB(kGlobal,1) Info("Process","Calling Collect");
fProof->Collect();
HandleTimer(0);
StopFeedback();
return Finalize(kFALSE,sync);
}
} else {
PDB(kGlobal,1) Info("Process","Synchronous processing: calling Collect");
fProof->Collect();
if (IsClient())
fProof->fRedirLog = kFALSE;
if (!IsClient()) {
HandleTimer(0);
if (fPacketizer && fQuery)
fQuery->SetProcessInfo(0, 0., fPacketizer->GetBytesRead(),
fPacketizer->GetInitTime(),
fPacketizer->GetProcTime());
}
StopFeedback();
if (!IsClient() || GetExitStatus() != TProofPlayer::kAborted)
return Finalize(kFALSE,sync);
else
return -1;
}
}
Long64_t TProofPlayerRemote::Finalize(Bool_t force, Bool_t sync)
{
if (IsClient()) {
if (fOutputLists == 0) {
if (force)
if (fQuery)
return fProof->Finalize(Form("%s:%s", fQuery->GetTitle(),
fQuery->GetName()), force);
} else {
if (fProof->fProtocol < 11) {
PDB(kGlobal,1) Info("Finalize","Calling Merge Output");
MergeOutput();
}
}
}
Long64_t rv = 0;
if (fProof->IsMaster()) {
TPerfStats::Stop();
fOutput->SetOwner();
SafeDelete(fSelector);
} else {
if (fExitStatus != kAborted) {
if (!sync) {
if (ReinitSelector(fQuery) == -1) {
Info("Finalize", "problems reinitializing selector \"%s\"",
fQuery->GetSelecImp()->GetName());
return -1;
}
}
fSelector->SetInputList(fInput);
TIter next(fOutput);
TList *output = fSelector->GetOutputList();
while(TObject* obj = next()) {
if (!fProof->IsParallel()) {
if (TCanvas* c = dynamic_cast<TCanvas *> (obj))
c->Draw();
else
output->Add(obj);
}
else
output->Add(obj);
}
PDB(kLoop,1) Info("Process","Call Terminate()");
fOutput->Clear("nodelete");
fSelector->Terminate();
rv = fSelector->GetStatus();
TIter it(output);
while(TObject* o = it()) {
fOutput->Add(o);
}
if (fQuery) {
fQuery->SetOutputList(fOutput);
fQuery->SetFinalized();
} else {
Warning("Finalize","current TQueryResult object is undefined!");
}
output->SetOwner(kFALSE);
SafeDelete(fSelector);
fOutput->SetOwner(kFALSE);
SafeDelete(fOutput);
}
}
PDB(kGlobal,1) Info("Process","exit");
return rv;
}
Long64_t TProofPlayerRemote::Finalize(TQueryResult *qr)
{
PDB(kGlobal,1) Info("Finalize(TQueryResult *)","Enter");
if (!IsClient()) {
Info("Finalize(TQueryResult *)","method to be executed only clients");
return -1;
}
if (!qr) {
Info("Finalize(TQueryResult *)", "query undefined");
return -1;
}
if (qr->IsFinalized()) {
Info("Finalize(TQueryResult *)", "query already finalized");
return -1;
}
if (!fOutput)
fOutput = new TList;
else
fOutput->Clear();
if (fOutputLists) {
fOutputLists->Delete();
delete fOutputLists;
fOutputLists = 0;
}
gSystem->RedirectOutput(fProof->fLogFileName);
TList *tmp = (TList *) qr->GetOutputList();
if (!tmp) {
gSystem->RedirectOutput(0);
Info("Finalize(TQueryResult *)", "ouputlist is empty");
return -1;
}
TList *out = fOutput;
if (fProof->fProtocol < 11)
out = new TList;
TIter nxo(tmp);
TObject *o = 0;
while ((o = nxo()))
out->Add(o->Clone());
if (fProof->fProtocol < 11) {
out->SetOwner();
StoreOutput(out);
}
gSystem->RedirectOutput(0);
SetCurrentQuery(qr);
Long64_t rc = Finalize();
RestorePreviousQuery();
return rc;
}
Bool_t TProofPlayerRemote::SendSelector(const char* selector_file)
{
if (!selector_file) {
Info("SendSelector", "Invalid input: selector (file) name undefined");
return kFALSE;
}
if (!strchr(gSystem->BaseName(selector_file), '.')) {
if (gDebug > 1)
Info("SendSelector", "selector name '%s' does not contain a '.':"
" nothing to send, it will be loaded from a library", selector_file);
return kTRUE;
}
const char *cext[3] = { ".C", ".cxx", ".cc" };
Int_t e = 0;
for ( ; e < 3; e++)
if (strstr(selector_file, cext[e]))
break;
if (e >= 3) {
Info("SendSelector",
"Invalid extension: %s (supportd extensions: .C, .cxx, .cc", selector_file);
return kFALSE;
}
Int_t l = strlen(cext[e]);
TString selec = selector_file;
TString aclicMode;
TString arguments;
TString io;
selec = gSystem->SplitAclicMode(selec, aclicMode, arguments, io);
TString header = selec;
header.Replace(header.Length()-l, l,".h");
if (gSystem->AccessPathName(header, kReadPermission)) {
TString h = header;
header = selec;
header.Replace(header.Length()-l, l,".hh");
if (gSystem->AccessPathName(header, kReadPermission)) {
Info("SendSelector",
"header file not found: tried: %s %s", h.Data(), header.Data());
return kFALSE;
}
}
if (fProof->SendFile(selec) == -1) {
Info("SendSelector", "problems sending implementation file %s", selec.Data());
return kFALSE;
}
if (fProof->SendFile(header) == -1) {
Info("SendSelector", "problems sending header file %s", header.Data());
return kFALSE;
}
return kTRUE;
}
void TProofPlayerRemote::MergeOutput()
{
PDB(kOutput,1) Info("MergeOutput","Enter");
if (fOutputLists == 0) {
PDB(kOutput,1) Info("MergeOutput","Leave (no output)");
return;
}
TIter next(fOutputLists);
TList *list;
while ( (list = (TList *) next()) ) {
TObject *obj = fOutput->FindObject(list->GetName());
if (obj == 0) {
obj = list->First();
list->Remove(obj);
fOutput->Add(obj);
}
if ( list->IsEmpty() ) continue;
TMethodCall callEnv;
if (obj->IsA())
callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
if (callEnv.IsValid()) {
callEnv.SetParam((Long_t) list);
callEnv.Execute(obj);
} else {
while ( (obj = list->First()) ) {
fOutput->Add(obj);
list->Remove(obj);
}
}
}
SafeDelete(fOutputLists);
PDB(kOutput,1) Info("MergeOutput","Leave (%d object(s))", fOutput->GetSize());
}
void TProofPlayerRemote::Progress(Long64_t total, Long64_t processed)
{
fProof->Progress(total, processed);
}
void TProofPlayerRemote::Progress(Long64_t total, Long64_t processed,
Long64_t bytesread,
Float_t initTime, Float_t procTime,
Float_t evtrti, Float_t mbrti)
{
PDB(kGlobal,1)
Info("Progress","%lld %lld %lld %f %f %f %f", total, processed, bytesread,
initTime, procTime, evtrti, mbrti);
fProof->Progress(total, processed, bytesread, initTime, procTime, evtrti, mbrti);
}
void TProofPlayerRemote::Feedback(TList *objs)
{
fProof->Feedback(objs);
}
void TProofPlayerRemote::StopProcess(Bool_t abort, Int_t)
{
if (fPacketizer != 0)
fPacketizer->StopProcess(abort);
if (abort == kTRUE)
fExitStatus = kAborted;
else
fExitStatus = kStopped;
}
Int_t TProofPlayerRemote::AddOutputObject(TObject *obj)
{
PDB(kOutput,1) Info("AddOutputObject","Enter");
if (!obj) {
PDB(kOutput,1) Info("AddOutputObject","Invalid input (obj == 0x0)");
return -1;
}
if (!fOutput)
fOutput = new TList;
Bool_t merged = kTRUE;
TList *elists = dynamic_cast<TList *> (obj);
if (elists && !strcmp(elists->GetName(), "PROOF_EventListsList")) {
TEventList *evlist = new TEventList("PROOF_EventList");
TIter nxevl(elists);
TEventList *evl = 0;
while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
TIter nxelem(fDSet->GetListOfElements());
TDSetElement *elem = 0;
while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
if (!strcmp(elem->GetFileName(), evl->GetName()))
break;
}
if (!elem) {
Error("AddOutputObject", "Found an event list for %s, but no object with"
" the same name in the TDSet", evl->GetName());
continue;
}
Long64_t offset = elem->GetTDSetOffset();
Long64_t *arr = evl->GetList();
Int_t num = evl->GetN();
if (arr && offset > 0)
for (Int_t i = 0; i < num; i++)
arr[i] += offset;
evlist->Add(evl);
}
Incorporate(evlist, fOutput, merged);
if (merged)
SafeDelete(evlist);
return 1;
}
Incorporate(obj, fOutput, merged);
return (merged ? 1 : 0);
}
void TProofPlayerRemote::AddOutput(TList *out)
{
PDB(kOutput,1) Info("AddOutput","Enter");
if (!out) {
PDB(kOutput,1) Info("AddOutput","Invalid input (out == 0x0)");
return;
}
if (!fOutput)
fOutput = new TList;
Bool_t merged = kTRUE;
TList *elists = dynamic_cast<TList *> (out->FindObject("PROOF_EventListsList"));
if (elists) {
TEventList *evlist = new TEventList("PROOF_EventList");
TIter nxevl(elists);
TEventList *evl = 0;
while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
TIter nxelem(fDSet->GetListOfElements());
TDSetElement *elem = 0;
while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
if (!strcmp(elem->GetFileName(), evl->GetName()))
break;
}
if (!elem) {
Error("AddOutput", "Found an event list for %s, but no object with"
" the same name in the TDSet", evl->GetName());
continue;
}
Long64_t offset = elem->GetTDSetOffset();
Long64_t *arr = evl->GetList();
Int_t num = evl->GetN();
if (arr && offset > 0)
for (Int_t i = 0; i < num; i++)
arr[i] += offset;
evlist->Add(evl);
}
out->Remove(elists);
delete elists;
Incorporate(evlist, fOutput, merged);
}
TIter nxo(out);
TObject *obj = 0;
while ((obj = nxo())) {
Incorporate(obj, fOutput, merged);
if (!merged)
out->Remove(obj);
}
return;
}
Int_t TProofPlayerRemote::Incorporate(TObject *newobj, TList *outlist, Bool_t &merged)
{
merged = kTRUE;
if (!newobj || !outlist) {
Error("Incorporate","Invalid inputs: obj: %p, list: %p", newobj, outlist);
return -1;
}
TObject *obj = outlist->FindObject(newobj->GetName());
if (!obj) {
outlist->Add(newobj);
merged = kFALSE;
return 0;
}
TMethodCall callEnv;
if (obj->IsA())
callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
if (callEnv.IsValid()) {
static TList *xlist = new TList;
xlist->Add(newobj);
callEnv.SetParam((Long_t) xlist);
callEnv.Execute(obj);
xlist->Clear();
} else {
outlist->Add(newobj);
merged = kFALSE;
}
return 0;
}
void TProofPlayerRemote::StoreOutput(TList *out)
{
PDB(kOutput,1) Info("StoreOutput","Enter");
if ( out == 0 ) {
PDB(kOutput,1) Info("StoreOutput","Leave (empty)");
return;
}
TIter next(out);
out->SetOwner(kFALSE);
if (fOutputLists == 0) {
PDB(kOutput,2) Info("StoreOutput","Create fOutputLists");
fOutputLists = new TList;
fOutputLists->SetOwner();
}
TList* lists = dynamic_cast<TList*> (out->FindObject("PROOF_EventListsList"));
if (lists) {
out->Remove(lists);
TEventList *mainList = new TEventList("PROOF_EventList");
out->Add(mainList);
TIter it(lists);
TEventList *aList;
while ( (aList = dynamic_cast<TEventList*> (it())) ) {
TIter next(fDSet->GetListOfElements());
TDSetElement *elem;
while ( (elem = dynamic_cast<TDSetElement*> (next())) ) {
if (strcmp(elem->GetFileName(), aList->GetName()) == 0)
break;
}
if (!elem) {
Error("StoreOutput",Form("Found the EventList for %s, but no object with that name "
"in the TDSet", aList->GetName()));
continue;
}
Long64_t offset = elem->GetTDSetOffset();
Long64_t *arr = aList->GetList();
Int_t num = aList->GetN();
if (arr && offset)
for (int i = 0; i < num; i++)
arr[i] += offset;
mainList->Add(aList);
}
delete lists;
}
TObject *obj;
while( (obj = next()) ) {
PDB(kOutput,2) Info("StoreOutput","Find '%s'", obj->GetName() );
TList *list = (TList *) fOutputLists->FindObject( obj->GetName() );
if ( list == 0 ) {
PDB(kOutput,2) Info("StoreOutput","List not Found (creating)", obj->GetName() );
list = new TList;
list->SetName( obj->GetName() );
list->SetOwner();
fOutputLists->Add( list );
}
list->Add( obj );
}
delete out;
PDB(kOutput,1) Info("StoreOutput","Leave");
}
TList *TProofPlayerRemote::MergeFeedback()
{
PDB(kFeedback,1)
Info("MergeFeedback","Enter");
if ( fFeedbackLists == 0 ) {
PDB(kFeedback,1)
Info("MergeFeedback","Leave (no output)");
return 0;
}
TList *fb = new TList;
fb->SetOwner();
TIter next(fFeedbackLists);
TMap *map;
while ( (map = (TMap*) next()) ) {
TList *list = new TList;
TIter keys(map);
#ifndef R__TH1MERGEFIXED
Int_t nbmx = -1;
TObject *oref = 0;
#endif
while ( TObject *key = keys() ) {
list->Add(map->GetValue(key));
#ifndef R__TH1MERGEFIXED
TObject *o = map->GetValue(key);
if (o->InheritsFrom("TH1") && !strncmp(o->GetName(),"PROOF_",6)) {
if (((TH1 *)o)->GetNbinsX() > nbmx) {
nbmx= ((TH1 *)o)->GetNbinsX();
oref = o;
}
}
#endif
}
#ifdef R__TH1MERGEFIXED
TObject *obj = list->First();
#else
TObject *obj = (oref) ? oref : list->First();
#endif
list->Remove(obj);
obj = obj->Clone();
fb->Add(obj);
if ( list->IsEmpty() ) {
delete list;
continue;
}
TMethodCall callEnv;
if (obj->IsA())
callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
if (callEnv.IsValid()) {
callEnv.SetParam((Long_t) list);
callEnv.Execute(obj);
} else {
while ( (obj = list->First()) ) {
fb->Add(obj->Clone());
list->Remove(obj);
}
}
delete list;
}
PDB(kFeedback,1)
Info("MergeFeedback","Leave (%d object(s))", fb->GetSize());
return fb;
}
void TProofPlayerRemote::StoreFeedback(TObject *slave, TList *out)
{
PDB(kFeedback,1)
Info("StoreFeedback","Enter");
if ( out == 0 ) {
PDB(kFeedback,1)
Info("StoreFeedback","Leave (empty)");
return;
}
if ( IsClient() ) {
Feedback(out);
delete out;
return;
}
if (fFeedbackLists == 0) {
PDB(kFeedback,2) Info("StoreFeedback","Create fFeedbackLists");
fFeedbackLists = new TList;
fFeedbackLists->SetOwner();
}
TIter next(out);
out->SetOwner(kFALSE);
TObject *obj;
while( (obj = next()) ) {
PDB(kFeedback,2)
Info("StoreFeedback","Find '%s'", obj->GetName() );
TMap *map = (TMap*) fFeedbackLists->FindObject(obj->GetName());
if ( map == 0 ) {
PDB(kFeedback,2)
Info("StoreFeedback","Map not Found (creating)", obj->GetName() );
map = new TMap;
map->SetName(obj->GetName());
fFeedbackLists->Add(map);
} else {
PDB(kFeedback,2)
Info("StoreFeedback","removing previous value");
if (map->GetValue(slave))
delete map->GetValue(slave);
map->Remove(slave);
}
map->Add(slave, obj);
}
delete out;
PDB(kFeedback,1)
Info("StoreFeedback","Leave");
}
void TProofPlayerRemote::SetupFeedback()
{
if ( IsClient() ) return;
fFeedback = (TList*) fInput->FindObject("FeedbackList");
PDB(kFeedback,1) Info("SetupFeedback","\"FeedbackList\" %sfound",
fFeedback == 0 ? "NOT ":"");
if (fFeedback == 0) return;
TParameter<Int_t> *par = 0;
TObject *obj = fInput->FindObject("PROOF_FeedbackPeriod");
Int_t period = 2000;
if (obj && (par = dynamic_cast<TParameter<Int_t>*>(obj)))
period = par->GetVal();
fFeedbackTimer = new TTimer;
fFeedbackTimer->SetObject(this);
fFeedbackTimer->Start(period, kTRUE);
}
void TProofPlayerRemote::StopFeedback()
{
if (fFeedbackTimer == 0) return;
PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
delete fFeedbackTimer; fFeedbackTimer = 0;
}
Bool_t TProofPlayerRemote::HandleTimer(TTimer *)
{
PDB(kFeedback,2) Info("HandleTimer","Entry");
R__ASSERT( !IsClient() );
if ( fFeedbackTimer == 0 ) return kFALSE;
TList *fb = new TList;
fb->SetOwner();
TIter next(fFeedback);
while( TObjString *name = (TObjString*) next() ) {
TObject *o = fOutput->FindObject(name->GetName());
if (o != 0) fb->Add(o->Clone());
}
if (fb->GetSize() > 0)
StoreFeedback(this, fb);
else
delete fb;
if ( fFeedbackLists == 0 ) {
fFeedbackTimer->Start(500,kTRUE);
return kFALSE;
}
fb = MergeFeedback();
PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
TMessage m(kPROOF_FEEDBACK);
m << fb;
gProofServ->GetSocket()->Send(m);
delete fb;
fFeedbackTimer->Start(500,kTRUE);
return kFALSE;
}
TDSetElement *TProofPlayerRemote::GetNextPacket(TSlave *slave, TMessage *r)
{
TDSetElement *e = fPacketizer->GetNextPacket( slave, r );
if (e == 0) {
PDB(kPacketizer,2) Info("GetNextPacket","Done");
} else if (e == (TDSetElement*) -1) {
PDB(kPacketizer,2) Info("GetNextPacket","Waiting");
} else {
PDB(kPacketizer,2)
Info("GetNextPacket","To slave-%d (%s): '%s' '%s' '%s' %lld %lld",
slave->GetOrdinal(), slave->GetName(), e->GetFileName(),
e->GetDirectory(), e->GetObjName(), e->GetFirst(), e->GetNum());
}
return e;
}
Bool_t TProofPlayerRemote::IsClient() const
{
return !fProof->IsMaster();
}
Long64_t TProofPlayerRemote::DrawSelect(TDSet *set, const char *varexp,
const char *selection, Option_t *option,
Long64_t nentries, Long64_t firstentry)
{
TTreeDrawArgsParser info;
info.Parse(varexp, selection, option);
TString selector = info.GetProofSelectorName();
TNamed *varexpobj = new TNamed("varexp", varexp);
TNamed *selectionobj = new TNamed("selection", selection);
TList *fb = (TList*) fInput->FindObject("FeedbackList");
if (fb)
fInput->Remove(fb);
fInput->Clear();
if (fb)
fInput->Add(fb);
fInput->Add(varexpobj);
fInput->Add(selectionobj);
if (info.GetObjectName() == "")
info.SetObjectName("htemp");
fProof->AddFeedback(info.GetObjectName());
Long64_t r = Process(set, selector, option, nentries, firstentry);
fProof->RemoveFeedback(info.GetObjectName());
fInput->Remove(varexpobj);
fInput->Remove(selectionobj);
if (TNamed *opt = dynamic_cast<TNamed*> (fInput->FindObject("PROOF_OPTIONS"))) {
fInput->Remove(opt);
delete opt;
}
delete varexpobj;
delete selectionobj;
return r;
}
ClassImp(TProofPlayerSlave)
void TProofPlayerSlave::SetupFeedback()
{
TList *fb = (TList*) fInput->FindObject("FeedbackList");
PDB(kFeedback,1) Info("SetupFeedback","\"FeedbackList\" %sfound",
fb == 0 ? "NOT ":"");
if (fb == 0) return;
fFeedbackTimer = new TTimer;
fFeedbackTimer->SetObject(this);
fFeedbackTimer->Start(500,kFALSE);
fFeedback = fb;
}
void TProofPlayerSlave::StopFeedback()
{
if (fFeedbackTimer == 0) return;
PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
fFeedbackTimer->Stop();
delete fFeedbackTimer;
fFeedbackTimer = 0;
}
Bool_t TProofPlayerSlave::HandleTimer(TTimer *)
{
PDB(kFeedback,2) Info("HandleTimer","Entry");
if (gProofServ && gProofServ->IsMaster() && !gProofServ->IsParallel()) {
TMessage m(kPROOF_PROGRESS);
m << fTotalEvents << fEventsProcessed;
gProofServ->GetSocket()->Send(m);
}
if ( fFeedback == 0 ) return kFALSE;
TList *fb = new TList;
fb->SetOwner(kFALSE);
if (fOutput == 0) {
fOutput = fSelector->GetOutputList();
}
if (fOutput) {
TIter next(fFeedback);
while( TObjString *name = (TObjString*) next() ) {
TObject *o = fOutput->FindObject(name->GetName());
if (o != 0) fb->Add(o);
}
}
PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
TMessage m(kPROOF_FEEDBACK);
m << fb;
gProofServ->GetSocket()->Send(m);
delete fb;
fFeedbackTimer->Start(500,kTRUE);
return kFALSE;
}
Long64_t TProofPlayerSlave::DrawSelect(TDSet * , const char * ,
const char * , Option_t * ,
Long64_t , Long64_t )
{
MayNotUse("DrawSelect");
return -1;
}
ClassImp(TProofPlayerSuperMaster)
Long64_t TProofPlayerSuperMaster::Process(TDSet *dset, const char *selector_file,
Option_t *option, Long64_t nentries,
Long64_t first, TEventList * )
{
fEventsProcessed = 0;
PDB(kGlobal,1) Info("Process","Enter");
TProofSuperMaster *proof = dynamic_cast<TProofSuperMaster*>(GetProof());
if (!proof) return -1;
delete fOutput;
fOutput = new TList;
TPerfStats::Start(fInput, fOutput);
if (!SendSelector(selector_file)) {
Error("Process", "sending selector %s", selector_file);
return -1;
}
TCleanup clean(this);
SetupFeedback();
if (proof->IsMaster()) {
if (!dset->ElementsValid()) {
proof->ValidateDSet(dset);
if (!dset->ElementsValid()) {
Error("Process", "could not validate TDSet");
return -1;
}
}
TList msds;
msds.SetOwner();
TList keyholder;
keyholder.SetOwner();
TList valueholder;
valueholder.SetOwner();
TIter nextslave(proof->GetListOfActiveSlaves());
while (TSlave *sl = dynamic_cast<TSlave*>(nextslave())) {
TList *submasters = 0;
TPair *msd = dynamic_cast<TPair*>(msds.FindObject(sl->GetMsd()));
if (!msd) {
submasters = new TList;
submasters->SetName(sl->GetMsd());
keyholder.Add(submasters);
TList *setelements = new TSortedList(kSortDescending);
setelements->SetName(TString(sl->GetMsd())+"_Elements");
valueholder.Add(setelements);
msds.Add(new TPair(submasters, setelements));
} else {
submasters = dynamic_cast<TList*>(msd->Key());
}
submasters->Add(sl);
}
Long64_t cur = 0;
TIter nextelement(dset->GetListOfElements());
while (TDSetElement *elem = dynamic_cast<TDSetElement*>(nextelement())) {
if (elem->GetNum()<1) continue;
if (nentries !=-1 && cur>=first+nentries) {
break;
}
if (cur+elem->GetNum()-1<first) {
cur+=elem->GetNum();
continue;
}
if (cur<first) {
elem->SetNum(elem->GetNum()-(first-cur));
elem->SetFirst(elem->GetFirst()+first-cur);
cur=first;
}
if (nentries==-1 || cur+elem->GetNum()<=first+nentries) {
cur+=elem->GetNum();
} else {
elem->SetNum(first+nentries-cur);
cur=first+nentries;
}
TPair *msd = dynamic_cast<TPair*>(msds.FindObject(elem->GetMsd()));
if (!msd) {
Error("Process", "data requires mass storage domain '%s'"
" which is not accessible in this proof session",
elem->GetMsd());
return -1;
} else {
TList *elements = dynamic_cast<TList*>(msd->Value());
elements->Add(elem);
}
}
TList usedmasters;
TIter nextmsd(msds.MakeIterator());
while (TPair *msd = dynamic_cast<TPair*>(nextmsd())) {
TList *submasters = dynamic_cast<TList*>(msd->Key());
TList *setelements = dynamic_cast<TList*>(msd->Value());
Int_t nmasters = submasters->GetSize();
Int_t nelements = setelements->GetSize();
for (Int_t i=0; i<nmasters; i++) {
Long64_t nentries = 0;
TDSet set(dset->GetType(), dset->GetObjName(),
dset->GetDirectory());
for (Int_t j = (i*nelements)/nmasters;
j < ((i+1)*nelements)/nmasters;
j++) {
TDSetElement *elem =
dynamic_cast<TDSetElement*>(setelements->At(j));
set.Add(elem->GetFileName(), elem->GetObjName(),
elem->GetDirectory(), elem->GetFirst(),
elem->GetNum(), elem->GetMsd());
nentries+=elem->GetNum();
}
if (set.GetListOfElements()->GetSize()>0) {
TMessage mesg(kPROOF_PROCESS);
TString fn(gSystem->BaseName(selector_file));
TString opt = option;
mesg << &set << fn << fInput << opt << Long64_t(-1) << Long64_t(0);
TSlave *sl = dynamic_cast<TSlave*>(submasters->At(i));
PDB(kGlobal,1) Info("Process",
"Sending TDSet with %d elements to submaster %s",
set.GetListOfElements()->GetSize(),
sl->GetOrdinal());
sl->GetSocket()->Send(mesg);
usedmasters.Add(sl);
fSlaves.AddLast(sl);
fSlaveProgress.Set(fSlaveProgress.GetSize()+1);
fSlaveProgress[fSlaveProgress.GetSize()-1]=0;
fSlaveTotals.Set(fSlaveTotals.GetSize()+1);
fSlaveTotals[fSlaveTotals.GetSize()-1]=nentries;
}
}
}
if ( !IsClient() ) HandleTimer(0);
PDB(kGlobal,1) Info("Process","Calling Collect");
proof->Collect(&usedmasters);
HandleTimer(0);
}
StopFeedback();
PDB(kGlobal,1) Info("Process","Calling Merge Output");
MergeOutput();
TPerfStats::Stop();
return 0;
}
void TProofPlayerSuperMaster::Progress(TSlave *sl, Long64_t total, Long64_t processed)
{
Int_t idx = fSlaves.IndexOf(sl);
fSlaveProgress[idx] = processed;
if (fSlaveTotals[idx] != total)
Warning("Progress", "total events has changed for slave %s", sl->GetName());
fSlaveTotals[idx] = total;
Long64_t tot = 0;
Int_t i;
for (i = 0; i < fSlaveTotals.GetSize(); i++) tot += fSlaveTotals[i];
Long64_t proc = 0;
for (i = 0; i < fSlaveProgress.GetSize(); i++) proc += fSlaveProgress[i];
Progress(tot, proc);
}
Bool_t TProofPlayerSuperMaster::HandleTimer(TTimer *)
{
if (fFeedbackTimer == 0) return kFALSE;
Long64_t tot = 0;
Int_t i;
for (i = 0; i < fSlaveTotals.GetSize(); i++) tot += fSlaveTotals[i];
Long64_t proc = 0;
for (i = 0; i < fSlaveProgress.GetSize(); i++) proc += fSlaveProgress[i];
TMessage m(kPROOF_PROGRESS);
m << tot << proc;
gProofServ->GetSocket()->Send(m);
if (fReturnFeedback)
return TProofPlayerRemote::HandleTimer(0);
else
return kFALSE;
}
void TProofPlayerSuperMaster::SetupFeedback()
{
if (IsClient()) return;
TProofPlayerRemote::SetupFeedback();
if (fFeedbackTimer) {
fReturnFeedback = kTRUE;
return;
} else {
fReturnFeedback = kFALSE;
}
fFeedbackTimer = new TTimer;
fFeedbackTimer->SetObject(this);
fFeedbackTimer->Start(500,kFALSE);
}
ROOT page - Class index - Class Hierarchy - Top of the page
This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.