#ifndef ROOT_TProof
#define ROOT_TProof
#ifndef ROOT_TVirtualProof
#include "TVirtualProof.h"
#endif
#ifndef ROOT_TProofDebug
#include "TProofDebug.h"
#endif
#ifndef ROOT_TString
#include "TString.h"
#endif
#ifndef ROOT_MessageTypes
#include "MessageTypes.h"
#endif
#ifndef ROOT_TMD5
#include "TMD5.h"
#endif
#ifndef ROOT_TSocket
#include "TSocket.h"
#endif
#ifndef ROOT_TSysEvtHandler
#include "TSysEvtHandler.h"
#endif
#ifndef ROOT_TThread
#include "TThread.h"
#endif
#ifndef ROOT_TUrl
#include "TUrl.h"
#endif
#include <map>
#ifdef R__GLOBALSTL
namespace std { using ::map; }
#endif
// CANNOTUSE(x) expands to a call of Info() with x as the first argument and a
// fixed "Not manager" text as the message. Info() must therefore be callable
// in the scope where the macro is used.
// NOTE(review): x is presumably the name of the calling method - confirm at
// the call sites, none of which are in this header.
#define CANNOTUSE(x) Info(x,"Not manager: cannot use this method")
// Forward declarations for types that are referenced by the declarations
// further down in this header.
class TMessage;
class TMonitor;
class TSignalHandler;
class TPluginHandler;
class TSlave;
class TProofServ;
class TProofInputHandler;
class TProofInterruptHandler;
class TProofPlayer;
class TProofPlayerRemote;
class TProofProgressDialog;
class TCondor;
class TTree;
class TDrawFeedback;
class TDSet;
class TSemaphore;
class TCondorSlave;
class TList;
class TProof;
class TVirtualMutex;
// Protocol version number; TProof::GetClientProtocol() returns this value.
const Int_t kPROOF_Protocol = 10;
// NOTE(review): presumably the default TCP port for PROOF connections; it is
// not referenced anywhere else in this header - confirm in the implementation.
const Int_t kPROOF_Port = 1093;
// Default values of the conffile and confdir arguments of the public TProof
// constructor.
const char* const kPROOF_ConfFile = "proof.conf";
const char* const kPROOF_ConfDir = "/usr/local/root";
// NOTE(review): the names suggest a working directory and the sub-directories
// used for the cache, packages, queries and data sets; how they are combined
// is not visible in this header - confirm in the implementation.
const char* const kPROOF_WorkDir = "~/proof";
const char* const kPROOF_CacheDir = "cache";
const char* const kPROOF_PackDir = "packages";
const char* const kPROOF_QueryDir = "queries";
const char* const kPROOF_DataSetDir = "datasets";
// Lock-file paths under /tmp. Each string ends with '-'.
// NOTE(review): presumably a per-user or per-session suffix is appended by the
// caller - confirm in the implementation.
const char* const kPROOF_CacheLockFile = "/tmp/proof-cache-lock-";
const char* const kPROOF_PackageLockFile = "/tmp/proof-package-lock-";
const char* const kPROOF_QueryLockFile = "/tmp/proof-query-lock-";
const char* const kPROOF_DataSetLockFile = "/tmp/proof-dataset-lock-";
// Global mutex pointer declared through R__EXTERN; its definition is not part
// of this header.
R__EXTERN TVirtualMutex *gProofMutex;
// Plain bundle of arguments with public fields. Three constructor overloads
// are declared here and defined elsewhere.
// NOTE(review): presumably this is the object handed to
// TProof::SlaveStartupThread(void *) - confirm in the implementation.
// NOTE(review): the destructor deletes fUrl, yet the class is implicitly
// copyable; copying an instance and destroying both copies would delete the
// same TUrl twice. Handle instances through pointers only.
class TProofThreadArg {
public:
   TUrl         *fUrl;      // deleted by the destructor
   TString       fOrd;
   Int_t         fPerf;
   TString       fImage;
   TString       fWorkdir;
   TString       fMsd;
   TList        *fSlaves;
   TProof       *fProof;
   TCondorSlave *fCslave;
   TList        *fClaims;
   Int_t         fType;

   // Overload taking host, port, ordinal, performance index, image and
   // working directory.
   TProofThreadArg(const char *h, Int_t po, const char *o, Int_t pe,
                   const char *i, const char *w,
                   TList *s, TProof *prf);

   // Overload taking a TCondorSlave and a list of claims.
   TProofThreadArg(TCondorSlave *csl, TList *clist,
                   TList *s, TProof *prf);

   // Overload taking host, port, ordinal, image, working directory and a
   // sixth string m instead of a performance index.
   TProofThreadArg(const char *h, Int_t po, const char *o,
                   const char *i, const char *w, const char *m,
                   TList *s, TProof *prf);

   // Deleting a null pointer is a no-op, so no explicit check is needed.
   virtual ~TProofThreadArg() { delete fUrl; }
};
// Pairs a TThread with the TProofThreadArg it was given. Both pointers are
// passed to SafeDelete() when the pair is destroyed, so the pair must be the
// only party deleting them.
class TProofThread {
public:
   TThread         *fThread;
   TProofThreadArg *fArgs;

   // Stores both pointers as given; no null checks are performed.
   TProofThread(TThread *t, TProofThreadArg *a) : fThread(t), fArgs(a) { }

   virtual ~TProofThread() {
      SafeDelete(fThread);
      SafeDelete(fArgs);
   }
};
// Signal handler constructed for kSigInterrupt (second base-constructor
// argument kFALSE) that remembers the TProof it was created for. Notify() is
// declared here and defined elsewhere.
class TProofInterruptHandler : public TSignalHandler {
public:
   TProofInterruptHandler(TProof *p)
      : TSignalHandler(kSigInterrupt, kFALSE),
        fProof(p)
   { }

   Bool_t Notify();

private:
   TProof *fProof;   // session given at construction; not deleted here
};
// File handler built on the descriptor of a TSocket. It keeps both the socket
// and the TProof it was created for. ReadNotify() simply forwards to
// Notify(), which is defined elsewhere.
class TProofInputHandler : public TFileHandler {
private:
   TSocket *fSocket;
   TProof  *fProof;

public:
   // s is dereferenced in the base-class initializer, so it must not be null.
   TProofInputHandler(TProof *p, TSocket *s)
      : TFileHandler(s->GetDescriptor(), 1),
        fSocket(s),
        fProof(p)
   { }

   Bool_t Notify();
   Bool_t ReadNotify() { return Notify(); }
};
class TSlaveInfo : public TObject {
public:
enum ESlaveStatus { kActive, kNotActive, kBad };
TString fOrdinal;
TString fHostName;
TString fMsd;
Int_t fPerfIndex;
ESlaveStatus fStatus;
TSlaveInfo(const char *ordinal = "", const char *host = "", Int_t perfidx = 0)
: fOrdinal(ordinal), fHostName(host), fPerfIndex(perfidx),
fStatus(kNotActive) { }
const char *GetName() const { return fHostName; }
const char *GetOrdinal() const { return fOrdinal; }
void SetStatus(ESlaveStatus stat) { fStatus = stat; }
Int_t Compare(const TObject *obj) const;
Bool_t IsSortable() const { return kTRUE; }
void Print(Option_t *option="") const;
ClassDef(TSlaveInfo,2)
};
// TProof: session class derived from TVirtualProof.
// The class holds TList pointers used for slave bookkeeping (fSlaves,
// fActiveSlaves, fInactiveSlaves, fUniqueSlaves, fBadSlaves, ...), TMonitor
// pointers, query counters and log-file handles, and it declares the API for
// processing data sets, handling packages and data sets, queries, feedback
// and logging. Only trivial accessors are defined inline in this header;
// every other member function is defined elsewhere.
class TProof : public TVirtualProof {
// Classes that are granted access to the private and protected members.
friend class TPacketizer;
friend class TPacketizerDev;
friend class TProofServ;
friend class TProofInputHandler;
friend class TProofInterruptHandler;
friend class TProofPlayer;
friend class TProofPlayerRemote;
friend class TProofProgressDialog;
friend class TSlave;
friend class TXSlave;
friend class TXSocket;
friend class TXSocketHandler;
friend class TXProofMgr;
friend class TXProofServ;
private:
// Kinds of urgent message; this is the type of the first argument of
// Interrupt(). kSoftInterrupt and kShutdownInterrupt take the values 2 and 3.
enum EUrgent {
kLocalInterrupt = -1,
kPing = 0,
kHardInterrupt = 1,
kSoftInterrupt,
kShutdownInterrupt
};
// Numeric codes for cache and package operations.
// NOTE(review): presumably sent to the remote side as sub-command identifiers,
// in which case the values must not be renumbered - confirm in the
// implementation.
enum EProofCacheCommands {
kShowCache = 1,
kClearCache = 2,
kShowPackages = 3,
kClearPackages = 4,
kClearPackage = 5,
kBuildPackage = 6,
kLoadPackage = 7,
kShowEnabledPackages = 8,
kShowSubCache = 9,
kClearSubCache = 10,
kShowSubPackages = 11,
kDisableSubPackages = 12,
kDisableSubPackage = 13,
kBuildSubPackage = 14,
kUnloadPackage = 15,
kDisablePackage = 16,
kUnloadPackages = 17,
kDisablePackages = 18,
kListPackages = 19,
kListEnabledPackages = 20
};
// Numeric codes for data set operations; values run consecutively from 1
// (kUploadDataSet) to 8 (kAppendDataSet).
// NOTE(review): presumably also sub-command identifiers - confirm.
enum EProofDataSetCommands {
kUploadDataSet = 1,
kCheckDataSetName,
kGetDataSets,
kCreateDataSet,
kGetDataSet,
kVerifyDataSet,
kRemoveDataSet,
kAppendDataSet
};
// Bit flags for the opt argument of SendFile(); its default value is
// (kBinary | kForward). kAscii is the zero value, i.e. no bit set.
enum ESendFileOpt {
kAscii = 0x0,
kBinary = 0x1,
kForce = 0x2,
kForward = 0x4
};
// NOTE(review): presumably the action codes behind ActivateWorker() and
// DeactivateWorker() - confirm in the implementation.
enum EProofWrkListAction {
kActivateWorker = 1,
kDeactivateWorker = 2
};
// Private state. Several of these members are exposed read-only through the
// inline getters declared further down: fValid (IsValid), fMaster (GetMaster),
// fWorkDir (GetWorkDir), fLogLevel (GetLogLevel), fStatus (GetStatus),
// fMasterServ (IsMaster), fBytesRead, fRealTime, fCpuTime, fIdle (IsIdle),
// fSync (IsSync), fDrawQueries (GetNumberOfDrawQueries), fSessionID
// (GetSessionID) and fEndMaster (IsEndMaster).
Bool_t fValid;
TString fMaster;
TString fWorkDir;
Int_t fLogLevel;
Int_t fStatus;
TList *fSlaveInfo;
Bool_t fMasterServ;
Bool_t fSendGroupView;
// Slave lists; fActiveSlaves, fInactiveSlaves and fUniqueSlaves are returned
// by the GetListOf...Slaves() accessors.
TList *fActiveSlaves;
TList *fInactiveSlaves;
TList *fUniqueSlaves;
TList *fNonUniqueMasters;
TMonitor *fActiveMonitor;
TMonitor *fUniqueMonitor;
TMonitor *fCurrentMonitor;
Long64_t fBytesRead;
Float_t fRealTime;
Float_t fCpuTime;
TSignalHandler *fIntHandler;
// fProgressDialog is returned by GetProgressDialog(); the started flag is
// cleared by ResetProgressDialogStatus().
TPluginHandler *fProgressDialog;
Bool_t fProgressDialogStarted;
// Accessed through SetPlayer() / GetPlayer().
TProofPlayer *fPlayer;
TList *fFeedback;
TList *fChains;
// An MD5 checksum together with a modification time. FileMap_t maps a TString
// key to one such pair.
// NOTE(review): the key is presumably a file name - confirm in the
// implementation.
struct MD5Mod_t {
TMD5 fMD5;
Long_t fModtime;
};
typedef std::map<TString, MD5Mod_t> FileMap_t;
FileMap_t fFileMap;
// Set through SetDSet().
TDSet *fDSet;
Bool_t fIdle;
Bool_t fSync;
// Log handling: a file name plus separate FILE handles whose names end in W
// and R.
// NOTE(review): presumably one handle for writing and one for reading - confirm.
Bool_t fRedirLog;
TString fLogFileName;
FILE *fLogFileW;
FILE *fLogFileR;
// Read by SendingLogToWindow() and written by SendLogToWindow().
Bool_t fLogToWindowOnly;
TList *fWaitingSlaves;
// Query bookkeeping.
TList *fQueries;
Int_t fOtherQueries;
Int_t fDrawQueries;
Int_t fMaxDrawQueries;
Int_t fSeqNum;
Int_t fSessionID;
Bool_t fEndMaster;
protected:
// Selector used as the 'list' argument of the Broadcast*/Collect/Send*
// overloads; most of them default to kActive.
enum ESlaves { kAll, kActive, kUnique };
// Connection and configuration data visible to derived classes. fUrl backs
// GetUser(), GetUrl() and GetPort(); fConfFile, fConfDir, fImage and fProtocol
// back GetConfFile(), GetConfDir(), GetImage() and GetRemoteProtocol().
TUrl fUrl;
TString fConfFile;
TString fConfDir;
TString fImage;
Int_t fProtocol;
// fSlaves and fBadSlaves are returned by GetListOfSlaves() and
// GetListOfBadSlaves().
TList *fSlaves;
TList *fBadSlaves;
TMonitor *fAllMonitor;
Bool_t fDataReady;
Long64_t fBytesReady;
Long64_t fTotalBytes;
TList *fAvailablePackages;
TList *fEnabledPackages;
// Static semaphore pointer shared by all TProof instances.
static TSemaphore *fgSemaphore;
private:
// Copy construction and assignment are declared private, so code outside
// TProof and its friends cannot copy a session.
TProof(const TProof &);
void operator=(const TProof &);
void CleanGDirectory(TList *ol);
// Internal command, state and file transfer helpers; 'list' selects the set
// of slaves addressed.
Int_t Exec(const char *cmd, ESlaves list);
Int_t SendCommand(const char *cmd, ESlaves list = kActive);
Int_t SendCurrentState(ESlaves list = kActive);
Bool_t CheckFile(const char *file, TSlave *sl, Long_t modtime);
Int_t SendFile(const char *file, Int_t opt = (kBinary | kForward),
const char *rfile = 0, TSlave *sl = 0);
Int_t SendObject(const TObject *obj, ESlaves list = kActive);
Int_t SendGroupView();
Int_t SendInitialState();
Int_t SendPrint(Option_t *option="");
Int_t Ping(ESlaves list);
void Interrupt(EUrgent type, ESlaves list = kActive);
void AskStatistics();
void AskParallel();
Int_t GoParallel(Int_t nodes, Bool_t accept = kFALSE);
void RecvLogFile(TSocket *s, Int_t size);
// Package helpers used internally; the public package API is declared in the
// public section.
Int_t BuildPackage(const char *package);
Int_t LoadPackage(const char *package);
Int_t UnloadPackage(const char *package);
Int_t UnloadPackages();
Int_t DisablePackage(const char *package);
Int_t DisablePackages();
void Activate(TList *slaves = 0);
// Broadcast family: every operation comes in two flavours, one taking an
// explicit TList of slaves and one taking an ESlaves selector.
Int_t Broadcast(const TMessage &mess, TList *slaves);
Int_t Broadcast(const TMessage &mess, ESlaves list = kActive);
Int_t Broadcast(const char *mess, Int_t kind, TList *slaves);
Int_t Broadcast(const char *mess, Int_t kind = kMESS_STRING, ESlaves list = kActive);
// Kind-only overloads: forward to the string overloads with a null message
// pointer.
Int_t Broadcast(Int_t kind, TList *slaves) { return Broadcast(0, kind, slaves); }
Int_t Broadcast(Int_t kind, ESlaves list = kActive) { return Broadcast(0, kind, list); }
Int_t BroadcastObject(const TObject *obj, Int_t kind, TList *slaves);
Int_t BroadcastObject(const TObject *obj, Int_t kind = kMESS_OBJECT, ESlaves list = kActive);
Int_t BroadcastRaw(const void *buffer, Int_t length, TList *slaves);
Int_t BroadcastRaw(const void *buffer, Int_t length, ESlaves list = kActive);
// Collect overloads for a single slave or a monitor; the timeout defaults
// to -1.
Int_t Collect(const TSlave *sl, Long_t timeout = -1);
Int_t Collect(TMonitor *mon, Long_t timeout = -1);
Int_t CollectInputFrom(TSocket *s);
void FindUniqueSlaves();
TSlave *FindSlave(TSocket *s) const;
// Plain accessors returning the stored list pointers; the caller does not
// receive a copy.
TList *GetListOfSlaves() const { return fSlaves; }
TList *GetListOfInactiveSlaves() const { return fInactiveSlaves; }
TList *GetListOfUniqueSlaves() const { return fUniqueSlaves; }
TList *GetListOfBadSlaves() const { return fBadSlaves; }
Int_t GetNumberOfSlaves() const;
Int_t GetNumberOfActiveSlaves() const;
Int_t GetNumberOfInactiveSlaves() const;
Int_t GetNumberOfUniqueSlaves() const;
Int_t GetNumberOfBadSlaves() const;
Bool_t IsEndMaster() const { return fEndMaster; }
void ModifyWorkerLists(const char *ord, Bool_t add);
Bool_t IsSync() const { return fSync; }
void InterruptCurrentMonitor();
void MarkBad(TSlave *sl);
void MarkBad(TSocket *s);
void ActivateAsyncInput();
void DeActivateAsyncInput();
void HandleAsyncInput(TSocket *s);
Int_t GetQueryReference(Int_t qry, TString &ref);
protected:
// Default constructor is reachable only from derived classes and friends.
TProof();
// Init() takes the same arguments as the public constructor.
// NOTE(review): presumably called by that constructor - confirm in the
// implementation.
Int_t Init(const char *masterurl, const char *conffile,
const char *confdir, Int_t loglevel, const char *alias = 0);
virtual Bool_t StartSlaves(Bool_t parallel, Bool_t attach = kFALSE);
void SetPlayer(TProofPlayer *player) { fPlayer = player; };
TProofPlayer *GetPlayer() const { return fPlayer; };
virtual TProofPlayer *MakePlayer();
TList *GetListOfActiveSlaves() const { return fActiveSlaves; }
TSlave *CreateSlave(const char *url, const char *ord,
Int_t perf, const char *image, const char *workdir);
TSlave *CreateSubmaster(const char *url, const char *ord,
const char *image, const char *msd);
// Collect overloads for an ESlaves selector or an explicit list of slaves.
Int_t Collect(ESlaves list = kActive, Long_t timeout = -1);
Int_t Collect(TList *slaves, Long_t timeout = -1);
// Stores the pointer only; no ownership handling is visible here.
void SetDSet(TDSet *dset) { fDSet = dset; }
virtual void ValidateDSet(TDSet *dset);
TPluginHandler *GetProgressDialog() const { return fProgressDialog; };
// Static function with the void *(void *) signature of a thread entry point.
// NOTE(review): arg is presumably a TProofThreadArg - confirm in the
// implementation.
static void *SlaveStartupThread(void *arg);
public:
// Public constructor; conffile and confdir default to kPROOF_ConfFile and
// kPROOF_ConfDir.
TProof(const char *masterurl, const char *conffile = kPROOF_ConfFile,
const char *confdir = kPROOF_ConfDir, Int_t loglevel = 0, const char *alias = 0);
virtual ~TProof();
void cd(Int_t id = -1);
// Public overloads of Ping() and Exec() without a slave selector.
Int_t Ping();
Int_t Exec(const char *cmd);
// Query execution on a TDSet.
Int_t Process(TDSet *set, const char *selector,
Option_t *option = "", Long64_t nentries = -1,
Long64_t firstentry = 0, TEventList *evl = 0);
Int_t DrawSelect(TDSet *set, const char *varexp,
const char *selection = "",
Option_t *option = "", Long64_t nentries = -1,
Long64_t firstentry = 0);
// Query management: each operation accepts either a query number or a query
// reference string.
Int_t Archive(Int_t query, const char *url);
Int_t Archive(const char *queryref, const char *url = 0);
Int_t CleanupSession(const char *sessiontag);
Int_t Finalize(Int_t query = -1, Bool_t force = kFALSE);
Int_t Finalize(const char *queryref, Bool_t force = kFALSE);
Int_t Remove(Int_t query);
Int_t Remove(const char *queryref, Bool_t all = kFALSE);
Int_t Retrieve(Int_t query, const char *path = 0);
Int_t Retrieve(const char *queryref, const char *path = 0);
void StopProcess(Bool_t abort, Int_t timeout = -1);
// Input and output objects.
void AddInput(TObject *obj);
void Browse(TBrowser *b);
void ClearInput();
TObject *GetOutput(const char *name);
TList *GetOutputList();
Int_t SetParallel(Int_t nodes = 9999);
void SetLogLevel(Int_t level, UInt_t mask = TProofDebug::kAll);
void Close(Option_t *option="");
void Print(Option_t *option="") const;
// Cache and package management.
void ShowCache(Bool_t all = kFALSE);
void ClearCache();
TList *GetListOfPackages();
TList *GetListOfEnabledPackages();
void ShowPackages(Bool_t all = kFALSE);
void ShowEnabledPackages(Bool_t all = kFALSE);
Int_t ClearPackages();
Int_t ClearPackage(const char *package);
Int_t EnablePackage(const char *package);
Int_t UploadPackage(const char *par, EUploadPackageOpt opt = kUntar);
// Library and include path management.
Int_t AddDynamicPath(const char *libpath);
Int_t AddIncludePath(const char *incpath);
Int_t RemoveDynamicPath(const char *libpath);
Int_t RemoveIncludePath(const char *incpath);
// Data set management. The upload and create calls default opt to kAskUser.
Int_t UploadDataSet(const char *dataset,
TList *files,
const char *dest = 0,
Int_t opt = kAskUser,
TList *skippedFiles = 0);
Int_t UploadDataSet(const char *dataset,
const char *files,
const char *dest = 0,
Int_t opt = kAskUser,
TList *skippedFiles = 0);
Int_t UploadDataSetFromFile(const char *dataset,
const char *file,
const char *dest = 0,
Int_t opt = kAskUser);
Int_t CreateDataSet(const char *dataset,
TList *files,
Int_t opt = kAskUser);
TList *GetDataSets(const char *dir = 0);
void ShowDataSets(const char *dir = 0);
void ShowDataSet(const char *dataset);
Int_t RemoveDataSet(const char *dateset);
Int_t VerifyDataSet(const char *dataset);
TList *GetDataSet(const char *dataset);
// Inline getters for connection and configuration data. The returned
// const char * values point into members of this object.
const char *GetMaster() const { return fMaster; }
const char *GetConfDir() const { return fConfDir; }
const char *GetConfFile() const { return fConfFile; }
const char *GetUser() const { return fUrl.GetUser(); }
const char *GetWorkDir() const { return fWorkDir; }
const char *GetImage() const { return fImage; }
// Unlike the neighbouring getters this one is not declared const.
const char *GetUrl() { return fUrl.GetUrl(); }
Int_t GetPort() const { return fUrl.GetPort(); }
// Remote protocol is the stored fProtocol; client protocol is the
// compile-time constant kPROOF_Protocol.
Int_t GetRemoteProtocol() const { return fProtocol; }
Int_t GetClientProtocol() const { return kPROOF_Protocol; }
Int_t GetStatus() const { return fStatus; }
Int_t GetLogLevel() const { return fLogLevel; }
Int_t GetParallel() const;
Int_t GetSessionID() const { return fSessionID; }
TList *GetSlaveInfo();
EQueryMode GetQueryMode() const;
EQueryMode GetQueryMode(Option_t *mode) const;
void SetQueryMode(EQueryMode mode);
Long64_t GetBytesRead() const { return fBytesRead; }
Float_t GetRealTime() const { return fRealTime; }
Float_t GetCpuTime() const { return fCpuTime; }
// Always reports kTRUE.
Bool_t IsFolder() const { return kTRUE; }
Bool_t IsMaster() const { return fMasterServ; }
Bool_t IsValid() const { return fValid; }
// kTRUE exactly when GetParallel() returns a positive value.
Bool_t IsParallel() const { return GetParallel() > 0 ? kTRUE : kFALSE; }
Bool_t IsIdle() const { return fIdle; }
// Feedback handling.
void AddFeedback(const char *name);
void RemoveFeedback(const char *name);
void ClearFeedback();
void ShowFeedback() const;
TList *GetFeedbackList() const;
// Query listing and results.
TList *GetListOfQueries(Option_t *opt = "");
Int_t GetNumberOfQueries();
Int_t GetNumberOfDrawQueries() { return fDrawQueries; }
TList *GetQueryResults();
TQueryResult *GetQueryResult(const char *ref);
void GetMaxQueries();
void SetMaxDrawQueries(Int_t max);
void ShowQueries(Option_t *opt = "");
Bool_t IsDataReady(Long64_t &totalbytes, Long64_t &bytesready);
// Intentionally empty in this class: the argument is ignored.
void SetActive(Bool_t = kTRUE) { }
// NOTE(review): the following group looks like notification entry points
// (presumably emitted as signals to a GUI) - confirm in the implementation.
void LogMessage(const char *msg, Bool_t all);
void Progress(Long64_t total, Long64_t processed);
void Feedback(TList *objs);
void QueryResultReady(const char *ref);
void CloseProgressDialog();
void ResetProgressDialog(const char *sel, Int_t sz,
Long64_t fst, Long64_t ent);
void StartupMessage(const char *msg, Bool_t status, Int_t done,
Int_t total);
// Log retrieval and display.
void GetLog(Int_t start = -1, Int_t end = -1);
void PutLog(TQueryResult *qr);
void ShowLog(Int_t qry = -1);
void ShowLog(const char *queryref);
Bool_t SendingLogToWindow() const { return fLogToWindowOnly; }
void SendLogToWindow(Bool_t mode) { fLogToWindowOnly = mode; }
// Clears the flag only; the dialog object itself is not touched here.
void ResetProgressDialogStatus() { fProgressDialogStarted = kFALSE; }
TTree *GetTreeHeader(TDSet *tdset);
TList *GetOutputNames();
void AddChain(TChain *chain);
void RemoveChain(TChain *chain);
TDrawFeedback *CreateDrawFeedback();
void SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt);
void DeleteDrawFeedback(TDrawFeedback *f);
void Detach(Option_t *opt = "");
void SetAlias(const char *alias="");
// Worker activation by ordinal string.
void ActivateWorker(const char *ord);
void DeactivateWorker(const char *ord);
// Static entry points. Open() returns the session as a TVirtualProof pointer.
static TVirtualProof *Open(const char *url = 0, const char *conffile = 0,
const char *confdir = 0, Int_t loglevel = 0);
static Int_t Reset(const char *url, const char *usr = 0);
// Class version 0.
ClassDef(TProof,0)
};
#endif
// ROOT page - Class index - Class Hierarchy - Top of the page
// This page has been automatically generated. If you have any comments or suggestions about the page layout, send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.