#ifndef ROOT_TProofPlayer
#define ROOT_TProofPlayer
#ifndef ROOT_TVirtualProofPlayer
#include "TVirtualProofPlayer.h"
#endif
#ifndef ROOT_TArrayL64
#include "TArrayL64.h"
#endif
#ifndef ROOT_TArrayF
#include "TArrayF.h"
#endif
#ifndef ROOT_TArrayI
#include "TArrayI.h"
#endif
#ifndef ROOT_TList
#include "TList.h"
#endif
#ifndef ROOT_TSystem
#include "TSystem.h"
#endif
#ifndef ROOT_TQueryResult
#include "TQueryResult.h"
#endif
#ifndef ROOT_TProofProgressStatus
#include "TProofProgressStatus.h"
#endif
#ifndef ROOT_TError
#include "TError.h"
#endif
// Forward declarations. Every type listed here that this header uses is used
// through a pointer only, so a declaration is sufficient and the full class
// definitions are not needed at this point.
// NOTE(review): TProofStats is declared here but is not referenced anywhere
// else in this header; it may only be kept for files that include this one.
class TSelector;
class TSocket;
class TVirtualPacketizer;
class TSlave;
class TEventIter;
class TProofStats;
class TMutex;
class TStatus;
class TTimer;
class THashList;
class TH1;
class TFile;
class TStopwatch;
// TProofPlayer
//
// Player class deriving from TVirtualProofPlayer. Most member functions are
// only declared here and are defined out of line, so the comments below
// describe what the declarations and the inline bodies themselves show.
// Comments are deliberately placed above the members (not trailing them) so
// that they are not mistaken for dictionary member titles.
class TProofPlayer : public TVirtualProofPlayer {

private:
   TList        *fAutoBins;

protected:
   // Handed out by GetInputList().
   TList        *fInput;
   THashList    *fOutput;
   TSelector    *fSelector;
   Bool_t        fCreateSelObj;
   TClass       *fSelectorClass;
   TTimer       *fFeedbackTimer;
   Long_t        fFeedbackPeriod;
   TEventIter   *fEvIter;
   TStatus      *fSelStatus;
   // Read and written through GetExitStatus() / SetExitStatus().
   EExitStatus   fExitStatus;
   Long64_t      fTotalEvents;
   // Backs GetEventsProcessed(), AddEventsProcessed() and GetProgressStatus().
   // The first two dereference it without a null check.
   TProofProgressStatus *fProgressStatus;

   Long64_t      fReadBytesRun;
   Long64_t      fReadCallsRun;
   Long64_t      fProcessedRun;

   // Handed out by GetListOfResults().
   TList        *fQueryResults;
   // Handed out by GetCurrentQuery().
   TQueryResult *fQuery;
   // Copied back into fQuery by RestorePreviousQuery().
   TQueryResult *fPreviousQuery;
   Int_t         fDrawQueries;
   // Assigned by SetMaxDrawQueries().
   Int_t         fMaxDrawQueries;

   TTimer       *fStopTimer;
   TMutex       *fStopTimerMtx;

   TTimer       *fDispatchTimer;

   TTimer       *fProcTimeTimer;
   TStopwatch   *fProcTime;

   // Assigned by SetOutputFilePath().
   TString       fOutputFilePath;
   TFile        *fOutputFile;
   Long_t        fSaveMemThreshold;
   Bool_t        fSavePartialResults;
   Bool_t        fSaveResultsPerPacket;

   static THashList *fgDrawInputPars;

   // Returns this object as an untyped pointer.
   void         *GetSender() { return this; }

   virtual Int_t DrawCanvas(TObject *obj);

   virtual void  SetupFeedback();

   virtual void  MergeOutput(Bool_t savememvalues = kFALSE);

   // StopFeedback() sits in its own public section between two protected
   // ones; presumably so that the nested TCleanup helper below can call it
   // through a TProofPlayer pointer.
public:
   virtual void  StopFeedback();

protected:
   // Scope guard: when a TCleanup object is destroyed it calls StopFeedback()
   // on the player it was constructed with. The player is not owned (it is
   // never deleted here).
   class TCleanup {
   private:
      TProofPlayer *fPlayer;
   public:
      TCleanup(TProofPlayer *p) : fPlayer(p) { }
      // A guard built with a null player has nothing to stop; skip the call
      // instead of dereferencing a null pointer while leaving the scope.
      ~TCleanup() { if (fPlayer) fPlayer->StopFeedback(); }
   };

   Int_t  AssertSelector(const char *selector_file);
   Bool_t CheckMemUsage(Long64_t &mfreq, Bool_t &w80r, Bool_t &w80v, TString &wmsg);

   void   MapOutputListToDataMembers() const;

public:
   // Flags defined with BIT(15) through BIT(18).
   enum EStatusBits {
      kDispatchOneEvent    = BIT(15),
      kIsProcessing        = BIT(16),
      kMaxProcTimeReached  = BIT(17),
      kMaxProcTimeExtended = BIT(18)
   };

   TProofPlayer(TProof *proof = 0);
   virtual ~TProofPlayer();

   // Two Process() overloads: the selector is given either by name or as an
   // object. Both are defined out of line.
   Long64_t  Process(TDSet *set,
                     const char *selector, Option_t *option = "",
                     Long64_t nentries = -1, Long64_t firstentry = 0);
   Long64_t  Process(TDSet *set,
                     TSelector *selector, Option_t *option = "",
                     Long64_t nentries = -1, Long64_t firstentry = 0);
   virtual Bool_t JoinProcess(TList *workers);
   // This class has no packetizer: always returns a null pointer.
   TVirtualPacketizer *GetPacketizer() const { return 0; }
   Long64_t  Finalize(Bool_t force = kFALSE, Bool_t sync = kFALSE);
   Long64_t  Finalize(TQueryResult *qr);
   Long64_t  DrawSelect(TDSet *set, const char *varexp,
                        const char *selection, Option_t *option = "",
                        Long64_t nentries = -1, Long64_t firstentry = 0);
   Int_t     GetDrawArgs(const char *var, const char *sel, Option_t *opt,
                         TString &selector, TString &objname);
   void      HandleGetTreeHeader(TMessage *mess);
   void      HandleRecvHisto(TMessage *mess);
   void      FeedBackCanvas(const char *name, Bool_t create);

   void      StopProcess(Bool_t abort, Int_t timeout = -1);
   void      AddInput(TObject *inp);
   void      ClearInput();
   TObject  *GetOutput(const char *name) const;
   TList    *GetOutputList() const;
   TList    *GetInputList() const { return fInput; }
   TList    *GetListOfResults() const { return fQueryResults; }
   void      AddQueryResult(TQueryResult *q);
   TQueryResult *GetCurrentQuery() const { return fQuery; }
   TQueryResult *GetQueryResult(const char *ref);
   void      RemoveQueryResult(const char *ref);
   void      SetCurrentQuery(TQueryResult *q);
   void      SetMaxDrawQueries(Int_t max) { fMaxDrawQueries = max; }
   // Makes the previous query the current one; fPreviousQuery is left as is.
   void      RestorePreviousQuery() { fQuery = fPreviousQuery; }
   Int_t     AddOutputObject(TObject *obj);
   void      AddOutput(TList *out);
   void      StoreOutput(TList *out);
   void      StoreFeedback(TObject *slave, TList *out);

   // Progress() overloads. Each variant taking a TSlave pointer ignores that
   // argument and forwards to the matching variant without it.
   void      Progress(Long64_t total, Long64_t processed);
   void      Progress(TSlave *, Long64_t total, Long64_t processed)
                { Progress(total, processed); }
   void      Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
                      Float_t initTime, Float_t procTime,
                      Float_t evtrti, Float_t mbrti);
   void      Progress(TSlave *, Long64_t total, Long64_t processed, Long64_t bytesread,
                      Float_t initTime, Float_t procTime,
                      Float_t evtrti, Float_t mbrti)
                { Progress(total, processed, bytesread, initTime, procTime,
                           evtrti, mbrti); }
   void      Progress(TProofProgressInfo *pi);
   void      Progress(TSlave *, TProofProgressInfo *pi) { Progress(pi); }
   void      Feedback(TList *objs);

   TDrawFeedback *CreateDrawFeedback(TProof *p);
   void      SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt);
   void      DeleteDrawFeedback(TDrawFeedback *f);

   TDSetElement *GetNextPacket(TSlave *slave, TMessage *r);

   Int_t     ReinitSelector(TQueryResult *qr);

   void      UpdateAutoBin(const char *name,
                           Double_t& xmin, Double_t& xmax,
                           Double_t& ymin, Double_t& ymax,
                           Double_t& zmin, Double_t& zmax);

   // Always kFALSE in this class.
   Bool_t    IsClient() const { return kFALSE; }

   void        SetExitStatus(EExitStatus st) { fExitStatus = st; }
   EExitStatus GetExitStatus() const { return fExitStatus; }
   // Both accessors go straight through fProgressStatus, which must be set.
   Long64_t    GetEventsProcessed() const { return fProgressStatus->GetEntries(); }
   void        AddEventsProcessed(Long64_t ev) { fProgressStatus->IncEntries(ev); }

   void      SetDispatchTimer(Bool_t on = kTRUE);
   void      SetStopTimer(Bool_t on = kTRUE,
                          Bool_t abort = kFALSE, Int_t timeout = 0);

   // Empty virtual hooks; derived classes may override them.
   virtual void SetInitTime() { }
   virtual void SetMerging(Bool_t = kTRUE) { }

   Long64_t  GetCacheSize();
   Int_t     GetLearnEntries();

   void      SetOutputFilePath(const char *fp) { fOutputFilePath = fp; }
   Int_t     SavePartialResults(Bool_t queryend = kFALSE, Bool_t force = kFALSE);

   void      SetProcessing(Bool_t on = kTRUE);
   TProofProgressStatus *GetProgressStatus() const { return fProgressStatus; }

   void      UpdateProgressInfo();

   ClassDef(TProofPlayer,0)
};
// TProofPlayerLocal
//
// TProofPlayer specialisation whose IsClient() answer is fixed at
// construction time and whose feedback hooks have empty bodies. The base
// class is constructed with its default argument.
class TProofPlayerLocal : public TProofPlayer {

private:
   // Value reported by IsClient(); taken from the constructor argument.
   Bool_t   fIsClient;

protected:
   // Feedback hooks: both are empty in this class.
   void     SetupFeedback() { }
   void     StopFeedback() { }

public:
   TProofPlayerLocal(Bool_t client = kTRUE) : fIsClient(client) { }
   virtual ~TProofPlayerLocal() { }

   Bool_t   IsClient() const { return fIsClient; }

   // Overloads that take no data set; defined out of line.
   Long64_t Process(const char *selector, Long64_t nentries = -1, Option_t *option = "");
   Long64_t Process(TSelector *selector, Long64_t nentries = -1, Option_t *option = "");

   // Data-set overloads: pass every argument through unchanged to the
   // TProofPlayer implementation and return its result.
   Long64_t Process(TDSet *set,
                    const char *selector, Option_t *option = "",
                    Long64_t nentries = -1, Long64_t firstentry = 0)
   {
      return TProofPlayer::Process(set, selector, option, nentries, firstentry);
   }
   Long64_t Process(TDSet *set,
                    TSelector *selector, Option_t *option = "",
                    Long64_t nentries = -1, Long64_t firstentry = 0)
   {
      return TProofPlayer::Process(set, selector, option, nentries, firstentry);
   }

   ClassDef(TProofPlayerLocal,0)
};
// TProofPlayerRemote
//
// TProofPlayer specialisation that keeps a pointer to a TProof object and a
// packetizer, and overrides the processing, output and progress entry
// points of the base class. Apart from the constructor and a few one-line
// accessors, everything is defined out of line.
class TProofPlayerRemote : public TProofPlayer {

protected:
   // Returned by GetProof(); set from the constructor argument.
   TProof             *fProof;
   TList              *fOutputLists;
   TList              *fFeedback;
   TList              *fFeedbackLists;
   // Returned by GetPacketizer().
   TVirtualPacketizer *fPacketizer;
   Bool_t              fMergeFiles;
   TDSet              *fDSet;
   ErrorHandlerFunc_t  fErrorHandler;
   Bool_t              fMergeTH1OneByOne;
   TH1                *fProcPackets;
   TMessage           *fProcessMessage;
   TString             fSelectorFileName;

   TStopwatch         *fMergeSTW;
   Int_t               fNumMergers;

   virtual Bool_t  HandleTimer(TTimer *timer);
   Int_t           InitPacketizer(TDSet *dset, Long64_t nentries,
                                  Long64_t first, const char *defpackunit,
                                  const char *defpackdata);
   TList          *MergeFeedback();
   Bool_t          MergeOutputFiles();
   void            NotifyMemory(TObject *obj);
   void            SetLastMergingMsg(TObject *obj);
   virtual Bool_t  SendSelector(const char *selector_file);
   TProof         *GetProof() const { return fProof; }
   void            SetupFeedback();
   void            StopFeedback();
   void            SetSelectorDataMembersFromOutputList();

public:
   // Stores 'proof', nulls every other pointer member, and starts with
   // fMergeFiles = kFALSE, fMergeTH1OneByOne = kTRUE, fNumMergers = 0.
   // fSelectorFileName is default-constructed and the base class is
   // constructed with its default argument.
   TProofPlayerRemote(TProof *proof = 0) : fProof(proof), fOutputLists(0), fFeedback(0),
                                           fFeedbackLists(0), fPacketizer(0),
                                           fMergeFiles(kFALSE), fDSet(0), fErrorHandler(0),
                                           fMergeTH1OneByOne(kTRUE), fProcPackets(0),
                                           fProcessMessage(0), fMergeSTW(0), fNumMergers(0)
   {
      // fProgressStatus is a TProofPlayer member. The sibling players in this
      // header (TProofPlayerLocal, TProofPlayerSlave) never allocate it, yet
      // the inherited GetEventsProcessed() dereferences it, so the base
      // constructor is expected to have set it already. Allocate only when it
      // is still null, so that an existing object is not overwritten and
      // leaked.
      // NOTE(review): confirm against the TProofPlayer constructor.
      if (!fProgressStatus) fProgressStatus = new TProofProgressStatus();
   }
   virtual ~TProofPlayerRemote();

   virtual Long64_t Process(TDSet *set, const char *selector,
                            Option_t *option = "", Long64_t nentries = -1,
                            Long64_t firstentry = 0);
   virtual Long64_t Process(TDSet *set, TSelector *selector,
                            Option_t *option = "", Long64_t nentries = -1,
                            Long64_t firstentry = 0);
   virtual Bool_t   JoinProcess(TList *workers);
   virtual Long64_t Finalize(Bool_t force = kFALSE, Bool_t sync = kFALSE);
   virtual Long64_t Finalize(TQueryResult *qr);
   Long64_t       DrawSelect(TDSet *set, const char *varexp,
                             const char *selection, Option_t *option = "",
                             Long64_t nentries = -1, Long64_t firstentry = 0);

   void           RedirectOutput(Bool_t on = kTRUE);
   void           StopProcess(Bool_t abort, Int_t timeout = -1);
   void           StoreOutput(TList *out);
   virtual void   StoreFeedback(TObject *slave, TList *out);
   Int_t          Incorporate(TObject *obj, TList *out, Bool_t &merged);
   TObject       *HandleHistogram(TObject *obj, Bool_t &merged);
   Bool_t         HistoSameAxis(TH1 *h0, TH1 *h1);
   Int_t          AddOutputObject(TObject *obj);
   void           AddOutput(TList *out);
   virtual void   MergeOutput(Bool_t savememvalues = kFALSE);

   // Progress() overloads. Each variant taking a TSlave pointer ignores that
   // argument and forwards to the matching variant without it.
   void           Progress(Long64_t total, Long64_t processed);
   void           Progress(TSlave*, Long64_t total, Long64_t processed)
                     { Progress(total, processed); }
   void           Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
                           Float_t initTime, Float_t procTime,
                           Float_t evtrti, Float_t mbrti);
   void           Progress(TSlave *, Long64_t total, Long64_t processed, Long64_t bytesread,
                           Float_t initTime, Float_t procTime,
                           Float_t evtrti, Float_t mbrti)
                     { Progress(total, processed, bytesread, initTime, procTime,
                                evtrti, mbrti); }
   void           Progress(TProofProgressInfo *pi);
   void           Progress(TSlave *, TProofProgressInfo *pi) { Progress(pi); }
   void           Feedback(TList *objs);
   TDSetElement  *GetNextPacket(TSlave *slave, TMessage *r);
   // Unlike the base class, which returns null, this returns fPacketizer.
   TVirtualPacketizer *GetPacketizer() const { return fPacketizer; }

   Bool_t         IsClient() const;

   void           SetInitTime();

   void           SetMerging(Bool_t on = kTRUE);

   ClassDef(TProofPlayerRemote,0)
};
// TProofPlayerSlave
//
// TProofPlayer specialisation that keeps a socket pointer and a feedback
// list pointer. The base class is constructed with its default argument.
class TProofPlayerSlave : public TProofPlayer {

private:
   // Socket handed to the constructor; null by default.
   TSocket *fSocket;
   // Starts out null.
   TList   *fFeedback;

   Bool_t   HandleTimer(TTimer *timer);

protected:
   // Feedback hooks, defined out of line.
   void     SetupFeedback();
   void     StopFeedback();

public:
   TProofPlayerSlave(TSocket *socket = 0) : fSocket(socket), fFeedback(0) { }

   void     HandleGetTreeHeader(TMessage *mess);

   ClassDef(TProofPlayerSlave,0)
};
// TProofPlayerSuperMaster
//
// TProofPlayerRemote specialisation holding a set of per-slave arrays and a
// list of slaves. The Progress() overloads without a TSlave argument, and
// the TSelector-object Process() overload, simply forward to
// TProofPlayerRemote; the remaining overloads are defined out of line.
class TProofPlayerSuperMaster : public TProofPlayerRemote {

private:
   TArrayL64 fSlaveProgress;
   TArrayL64 fSlaveTotals;
   TArrayL64 fSlaveBytesRead;
   TArrayF   fSlaveInitTime;
   TArrayF   fSlaveProcTime;
   TArrayF   fSlaveEvtRti;
   TArrayF   fSlaveMBRti;
   TArrayI   fSlaveActW;
   TArrayI   fSlaveTotS;
   TArrayF   fSlaveEffS;
   TList     fSlaves;
   // Initialised to kFALSE by the constructor.
   Bool_t    fReturnFeedback;

protected:
   Bool_t    HandleTimer(TTimer *timer);
   void      SetupFeedback();

public:
   // Passes 'proof' on to TProofPlayerRemote; the array and list members are
   // default-constructed.
   TProofPlayerSuperMaster(TProof *proof = 0) :
      TProofPlayerRemote(proof), fReturnFeedback(kFALSE) { }
   virtual ~TProofPlayerSuperMaster() { }

   // Selector-by-name overload; defined out of line.
   Long64_t  Process(TDSet *set, const char *selector,
                     Option_t *option = "", Long64_t nentries = -1,
                     Long64_t firstentry = 0);
   // Selector-object overload; delegates unchanged to TProofPlayerRemote.
   Long64_t  Process(TDSet *set, TSelector *selector,
                     Option_t *option = "", Long64_t nentries = -1,
                     Long64_t firstentry = 0)
   {
      return TProofPlayerRemote::Process(set, selector, option,
                                         nentries, firstentry);
   }

   // Progress() without a slave: delegate unchanged to TProofPlayerRemote.
   void      Progress(Long64_t total, Long64_t processed)
   {
      TProofPlayerRemote::Progress(total, processed);
   }
   void      Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
                      Float_t initTime, Float_t procTime,
                      Float_t evtrti, Float_t mbrti)
   {
      TProofPlayerRemote::Progress(total, processed, bytesread,
                                   initTime, procTime, evtrti, mbrti);
   }
   void      Progress(TProofProgressInfo *pi) { TProofPlayerRemote::Progress(pi); }

   // Progress() with a slave: defined out of line.
   void      Progress(TSlave *sl, Long64_t total, Long64_t processed);
   void      Progress(TSlave *sl, Long64_t total, Long64_t processed, Long64_t bytesread,
                      Float_t initTime, Float_t procTime,
                      Float_t evtrti, Float_t mbrti);
   void      Progress(TSlave *sl, TProofProgressInfo *pi);

   ClassDef(TProofPlayerSuperMaster,0)
};
#endif