1// @(#)root/proof:$Id$
2// Author: Fons Rademakers 13/02/97
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
12#ifndef ROOT_TProof
13#define ROOT_TProof
17// //
18// TProof //
19// //
20// This class controls a Parallel ROOT Facility, PROOF, cluster. //
21// It fires the worker servers, it keeps track of how many workers are //
22// running, it keeps track of the workers running status, it broadcasts //
23// messages to all workers, it collects results, etc. //
24// //
27#include "TProofMgr.h"
28#include "TProofDebug.h"
29#include "TString.h"
30#include "TMacro.h"
31#include "MessageTypes.h"
32#include "TMD5.h"
33#include "TRegexp.h"
34#include "TSysEvtHandler.h"
35#include "TUrl.h"
36#include "TProofOutputList.h"
37#include "TStopwatch.h"
38#include "TVirtualMutex.h"
39#include "TPackMgr.h"
41#include <map>
42#include <mutex>
// CANNOTUSE(x): report, via Info(), that the calling method is not usable
// because this object is not a manager. 'x' is passed as Info()'s first
// argument (presumably the calling method's name - confirm at call sites).
// The expansion is a plain call to Info(x, "..."), so an Info() accepting
// (const char*, const char*) must be visible wherever the macro is used.
#define CANNOTUSE(x) Info(x, "Not manager: cannot use this method")
46class TChain;
47class TCondor;
48class TCondorSlave;
49class TDrawFeedback;
50class TDSet;
51class TEventList;
52class THashList;
53class TList;
54class TCollection;
55class TMessage;
56class TMonitor;
57class TPluginHandler;
58class TProof;
61class TProofLockPath;
63class TProofPlayer;
66class TProofServ;
67class TQueryResult;
68class TSignalHandler;
69class TSlave;
70class TSocket;
71class TTree;
72class TFileCollection;
73class TMap;
74class TDataSetManager;
76class TMacro;
77class TSelector;
// protocol changes:
// 1 -> 2: new arguments for Process() command, option added
// 2 -> 3: package manager enabling protocol changed
// 3 -> 4: introduction of multi-level-master support
// 4 -> 5: added friends support
// 5 -> 6: drop TFTP, support for asynchronous queries
// 6 -> 7: support for multisessions, archive, retrieve, ...
// 7 -> 8: return number of entries in GetNextPacket
// 8 -> 9: support for stateless connection via xproofd
// 9 -> 10: new features requested, tested at CAF
// 10 -> 11: new merging strategy
// 11 -> 12: new progress message
// 12 -> 13: exchange version/architecture/compiler info
// 13 -> 14: new proofserv environment setting
// 14 -> 15: add support for entry lists; new version of TFileInfo
// 15 -> 16: add support for generic non-data based processing
// 16 -> 17: new dataset handling system; support for TFileCollection processing
// 17 -> 18: support for reconnection on daemon restarts
// 18 -> 19: TProofProgressStatus used in kPROOF_PROGRESS, kPROOF_STOPPROCESS
//           and kPROOF_GETNEXTPACKET messages in Master - worker communication
// 19 -> 20: Fix the asynchronous mode (required changes in some messages)
// 20 -> 21: Add support for session queuing
// 21 -> 22: Add support for switching from sync to async while running ('Ctrl-Z' functionality)
// 22 -> 23: New dataset features (default tree name; classification per fileserver)
// 23 -> 24: Merging optimization
// 24 -> 25: Handling of 'data' dir; group information
// 25 -> 26: Use new TProofProgressInfo class
// 26 -> 27: Use new file for updating the session status
// 27 -> 28: Support for multi-datasets, fix global pack dirs, fix AskStatistics,
//           package download, dataset caching
// 28 -> 29: Support for config parameters in EnablePackage, idle-timeout
// 29 -> 30: Add information about data dir in TSlaveInfo
// 30 -> 31: Development cycle 5.29
// 31 -> 32: New log path transmission
// 32 -> 33: Development cycle 5.29/04 (fixed worker activation, new startup technology, ...)
// 33 -> 34: Development cycle 5.33/02 (fix load issue, ...)
// 34 -> 35: Development cycle 5.99/01 (PLite on workers, staging requests in separate dsmgr...)
// 35 -> 36: SetParallel in dynamic mode (changes default in GoParallel), cancel staging requests
// 36 -> 37: Support for remote (web) PAR packages
119// PROOF magic constants
120const Int_t kPROOF_Protocol = 37; // protocol version number
121const Int_t kPROOF_Port = 1093; // IANA registered PROOF port
122const char* const kPROOF_ConfFile = "proof.conf"; // default config file
123const char* const kPROOF_ConfDir = "/usr/local/root"; // default config dir
124const char* const kPROOF_WorkDir = ".proof"; // default working directory
125const char* const kPROOF_CacheDir = "cache"; // file cache dir, under WorkDir
126const char* const kPROOF_PackDir = "packages"; // package dir, under WorkDir
127const char* const kPROOF_PackDownloadDir = "downloaded"; // subdir with downloaded PARs, under PackDir
128const char* const kPROOF_QueryDir = "queries"; // query dir, under WorkDir
129const char* const kPROOF_DataSetDir = "datasets"; // dataset dir, under WorkDir
130const char* const kPROOF_DataDir = "data"; // dir for produced data, under WorkDir
131const char* const kPROOF_CacheLockFile = "proof-cache-lock-"; // cache lock file
132const char* const kPROOF_PackageLockFile = "proof-package-lock-"; // package lock file
133const char* const kPROOF_QueryLockFile = "proof-query-lock-"; // query lock file
134const char* const kPROOF_TerminateWorker = "+++ terminating +++"; // signal worker termination in MarkBad
135const char* const kPROOF_WorkerIdleTO = "+++ idle-timeout +++"; // signal worker idle timeout in MarkBad
136const char* const kPROOF_InputDataFile = "inputdata.root"; // Default input data file name
137const char* const kPROOF_MissingFiles = "MissingFiles"; // Missingfile list name
138const Long64_t kPROOF_DynWrkPollInt_s = 10; // minimum number of seconds between two polls for dyn wrks
// Shell commands used for file copy, removal, listing and package unpacking.
// The kUNTAR* strings are printf-style templates: the first %s is presumably
// the decompressor (e.g. kGUNZIP), followed by file/dir components.
// NOTE(review): confirm the argument order against the callers.
#ifndef R__WIN32
const char* const kCP     = "/bin/cp -fp";
const char* const kRM     = "/bin/rm -rf";
const char* const kLS     = "/bin/ls -l";
const char* const kUNTAR  = "%s -c %s/%s | (cd %s; tar xf -)";
const char* const kUNTAR2 = "%s -c %s | (cd %s; tar xf -)";
const char* const kUNTAR3 = "%s -c %s | (tar xf -)";
const char* const kGUNZIP = "gunzip";
#else
const char* const kCP     = "copy";
const char* const kRM     = "delete";
const char* const kLS     = "dir";
// NOTE(review): the untar templates are literal "..." placeholders on Windows,
// i.e. not runnable commands.
const char* const kUNTAR  = "...";
const char* const kUNTAR2 = "...";
const char* const kUNTAR3 = "...";
const char* const kGUNZIP = "gunzip";
#endif
158typedef void (*PrintProgress_t)(Long64_t tot, Long64_t proc, Float_t proctime, Long64_t bytes);
160// Structure for the progress information
163 Long64_t fTotal; // Total number of events to process
164 Long64_t fProcessed; // Number of events processed
165 Long64_t fBytesRead; // Number of bytes read
166 Float_t fInitTime; // Time for initialization
167 Float_t fProcTime; // Time for processing
168 Float_t fEvtRateI; // Instantaneous event rate
169 Float_t fMBRateI; // Instantaneous byte read rate
170 Int_t fActWorkers; // Numebr of workers still active
171 Int_t fTotSessions; // Numebr of PROOF sessions running currently on the clusters
172 Float_t fEffSessions; // Number of effective sessions running on the machines allocated to this session
173 TProofProgressInfo(Long64_t tot = 0, Long64_t proc = 0, Long64_t bytes = 0,
174 Float_t initt = -1., Float_t proct = -1.,
175 Float_t evts = -1., Float_t mbs = -1.,
176 Int_t actw = 0, Int_t tsess = 0, Float_t esess = 0.) :
177 fTotal(tot), fProcessed(proc), fBytesRead(bytes),
178 fInitTime(initt), fProcTime(proct), fEvtRateI(evts), fMBRateI(mbs),
179 fActWorkers(actw), fTotSessions(tsess), fEffSessions(esess) { }
181 ClassDef(TProofProgressInfo, 1); // Progress information
184// PROOF Interrupt signal handler
194 Bool_t Notify();
197// Input handler for messages from TProofServ
203 TProofInputHandler(const TProofInputHandler&); // Not implemented
207 Bool_t Notify();
208 Bool_t ReadNotify() { return Notify(); }
211// Slaves info class
212class TSlaveInfo : public TObject {
216 TString fOrdinal; //slave ordinal
217 TString fHostName; //hostname this slave is running on
218 TString fMsd; //mass storage domain slave is in
219 TString fDataDir; //directory for user data
220 Int_t fPerfIndex; //relative performance of this slave
221 SysInfo_t fSysInfo; //Infomation about its hardware
222 ESlaveStatus fStatus; //slave status
224 TSlaveInfo(const char *ordinal = "", const char *host = "", Int_t perfidx = 0,
225 const char *msd = "", const char *datadir = "") :
226 fOrdinal(ordinal), fHostName(host), fMsd(msd), fDataDir(datadir),
227 fPerfIndex(perfidx), fSysInfo(), fStatus(kNotActive) { }
229 const char *GetDataDir() const { return fDataDir; }
230 const char *GetMsd() const { return fMsd; }
231 const char *GetName() const { return fHostName; }
232 const char *GetOrdinal() const { return fOrdinal; }
233 SysInfo_t GetSysInfo() const { return fSysInfo; }
234 void SetStatus(ESlaveStatus stat) { fStatus = stat; }
235 void SetSysInfo(SysInfo_t si);
236 void SetOrdinal(const char *ord) { fOrdinal = ord; }
238 Int_t Compare(const TObject *obj) const;
239 Bool_t IsSortable() const { return kTRUE; }
240 void Print(Option_t *option="") const;
241 Bool_t IsEqual(const TObject* obj) const;
243 ClassDef(TSlaveInfo,4) //basic info on workers
246// Merger info class
247class TMergerInfo : public TObject {
250 TSlave *fMerger; // Slave that acts as merger
251 Int_t fPort; // Port number, on which it accepts outputs from other workers
252 Int_t fMergedObjects; // Total number of objects it must accept from other workers
253 // (-1 == not set yet)
254 Int_t fWorkersToMerge; // Number of workers that are merged on this merger
255 // (does not change during time)
256 Int_t fMergedWorkers; // Current number of already merged workers
257 // (does change during time as workers are being merged)
259 TList *fWorkers; // List of already assigned workers
260 Bool_t fIsActive; // Merger state
262 TMergerInfo(const TMergerInfo&); // Not implemented
263 TMergerInfo& operator=(const TMergerInfo&); // Not implemented
266 TMergerInfo(TSlave *t, Int_t port, Int_t forHowManyWorkers) :
267 fMerger(t), fPort(port), fMergedObjects(0), fWorkersToMerge(forHowManyWorkers),
269 virtual ~TMergerInfo();
271 void AddWorker(TSlave *sl);
272 TList *GetWorkers() { return fWorkers; }
274 TSlave *GetMerger() { return fMerger; }
275 Int_t GetPort() { return fPort; }
281 void SetMergedWorker();
282 void AddMergedObjects(Int_t objects) { fMergedObjects += objects; }
290 ClassDef(TMergerInfo,0) // Basic info on merger, i.e. worker serving as merger
293// Small auxiliary class for merging progress notification
300 static char fgCr[4];
302 TProofMergePrg() : fExp(), fIdx(-1), fNWrks(-1), fLastNWrks(-1) { }
304 const char *Export(Bool_t &changed) {
305 fExp.Form("%c (%d workers still sending) ", fgCr[fIdx], fNWrks);
306 changed = (fLastNWrks != fNWrks || fLastNWrks == -1) ? kTRUE : kFALSE;
308 return fExp.Data(); }
309 void DecreaseNWrks() { fNWrks--; }
310 void IncreaseNWrks() { fNWrks++; }
311 void IncreaseIdx() { fIdx++; if (fIdx == 4) fIdx = 0; }
312 void Reset(Int_t n = -1) { fIdx = -1; SetNWrks(n); }
313 void SetNWrks(Int_t n) { fNWrks = n; }
316class TProof : public TNamed, public TQObject {
318friend class TPacketizer;
319friend class TPacketizerDev;
321friend class TProofLite;
322friend class TDataSetManager;
323friend class TProofServ;
326friend class TProofPlayer;
327friend class TProofPlayerLite;
330friend class TSlave;
331friend class TSlaveLite;
333friend class TXSlave;
334friend class TXSocket; // to access kPing
335friend class TXSocketHandler; // to access fCurrentMonitor and CollectInputFrom
336friend class TXProofMgr; // to access EUrgent
337friend class TXProofServ; // to access EUrgent
340 // PROOF status bits
348 };
350 kSync = 0,
351 kAsync = 1
352 };
354 kAppend = 0x1,
359 kAskUser = 0x0
360 };
365 };
367 kUntar = 0x0, //Untar over existing dir [default]
368 kRemoveOld = 0x1 //Remove existing dir with same name
369 };
371 kRunning = 0, // Normal status
372 kStopped = 1, // After the stop button has been pressed
373 kAborted = 2 // After the abort button has been pressed
374 };
377 kOutputSize = 1, //Number of objects in worker's output list
378 kSendOutput = 2, //Naster asks worker for its output list
379 kBeMerger = 3, //Master tells worker to be a merger
380 kMergerDown = 4, //Merger cannot serve
381 kStopMerging = 5, //Master tells worker to stop merging (and return output)
382 kOutputSent = 6 //Worker reports sending its output to given worker
383 };
386 kPurge = 0x1,
388 kDataset = 0x4,
389 kForceClear = 0x8
390 };
393 enum EUrgent {
395 kPing = 0,
399 };
421 kLoadMacro = 21
422 };
424 kUploadDataSet = 1, //Upload a dataset
425 kCheckDataSetName = 2, //Check wheter dataset of this name exists
426 kGetDataSets = 3, //List datasets saved on the master node
427 kRegisterDataSet = 4, //Save a TList object as a dataset
428 kGetDataSet = 5, //Get a TFileCollection of TFileInfo objects
429 kVerifyDataSet = 6, //Try open all files from a dataset and report results
430 kRemoveDataSet = 7, //Remove a dataset but leave files belonging to it
431 kMergeDataSet = 8, //Add new files to an existing dataset
432 kShowDataSets = 9, //Shows datasets, returns formatted output
433 kGetQuota = 10, //Get quota info per group
434 kShowQuota = 11, //Show quotas
435 kSetDefaultTreeName = 12, //Set the default tree name
436 kCache = 13, //Show/clear cache
437 kRequestStaging = 14, //Request staging of a dataset
438 kStagingStatus = 15, //Obtain staging status for the given dataset
439 kCancelStaging = 16 //Cancels dataset staging request
440 };
442 kAscii = 0x0,
443 kBinary = 0x1,
444 kForce = 0x2,
445 kForward = 0x4,
446 kCpBin = 0x8,
447 kCp = 0x10
448 };
452 };
458 };
461 kPerUser = 0x2
462 };
464 Bool_t fValid; //is this a valid proof object
465 Bool_t fTty; //TRUE if connected to a terminal
466 TString fMaster; //master server ("" if a master); used in the browser
467 TString fWorkDir; //current work directory on remote servers
468 TString fGroup; //PROOF group of this user
469 Int_t fLogLevel; //server debug logging level
470 Int_t fStatus; //remote return status (part of kPROOF_LOGDONE)
471 Int_t fCheckFileStatus; //remote return status after kPROOF_CHECKFILE
472 TList *fRecvMessages; //Messages received during collect not yet processed
473 TList *fSlaveInfo; //!list returned by kPROOF_GETSLAVEINFO
474 Bool_t fSendGroupView; //if true send new group view
475 Bool_t fIsPollingWorkers; //will be set to kFALSE to prevent recursive dyn workers check in dyn mode
476 Long64_t fLastPollWorkers_s; //timestamp (in seconds) of last poll for workers, -1 if never checked
477 TList *fActiveSlaves; //list of active slaves (subset of all slaves)
478 TString fActiveSlavesSaved;// comma-separated list of active slaves (before last call to
479 // SetParallel or Activate/DeactivateWorkers)
480 TList *fInactiveSlaves; //list of inactive slaves (good but not used for processing)
481 TList *fUniqueSlaves; //list of all active slaves with unique file systems
482 TList *fAllUniqueSlaves; //list of all active slaves with unique file systems, including all submasters
483 TList *fNonUniqueMasters; //list of all active masters with a nonunique file system
484 TMonitor *fActiveMonitor; //monitor activity on all active slave sockets
485 TMonitor *fUniqueMonitor; //monitor activity on all unique slave sockets
486 TMonitor *fAllUniqueMonitor; //monitor activity on all unique slave sockets, including all submasters
487 TMonitor *fCurrentMonitor; //currently active monitor
488 Long64_t fBytesRead; //bytes read by all slaves during the session
489 Float_t fRealTime; //realtime spent by all slaves during the session
490 Float_t fCpuTime; //CPU time spent by all slaves during the session
491 TSignalHandler *fIntHandler; //interrupt signal handler (ctrl-c)
492 TPluginHandler *fProgressDialog; //progress dialog plugin
493 Bool_t fProgressDialogStarted; //indicates if the progress dialog is up
494 TVirtualProofPlayer *fPlayer; //current player
495 TList *fFeedback; //list of names to be returned as feedback
496 TList *fChains; //chains with this proof set
497 struct MD5Mod_t {
498 TMD5 fMD5; //file's md5
499 Long_t fModtime; //file's modification time
500 };
501 typedef std::map<TString, MD5Mod_t> FileMap_t;
502 FileMap_t fFileMap; //map keeping track of a file's md5 and mod time
503 TDSet *fDSet; //current TDSet being validated
505 Int_t fNotIdle; //Number of non-idle sub-nodes
506 Bool_t fSync; //true if type of currently processed query is sync
507 ERunStatus fRunStatus; //run status
508 Bool_t fIsWaiting; //true if queries have been enqueued
510 Bool_t fRedirLog; //redirect received log info
511 TString fLogFileName; //name of the temp file for redirected logs
512 FILE *fLogFileW; //temp file to redirect logs
513 FILE *fLogFileR; //temp file to read redirected logs
514 Bool_t fLogToWindowOnly; //send log to window only
516 Bool_t fSaveLogToMacro; // Whether to save received logs to TMacro fMacroLog (use with care)
517 TMacro fMacroLog; // Macro with the saved (last) log
519 TProofMergePrg fMergePrg; //Merging progress
521 TList *fWaitingSlaves; //stores a TPair of the slaves's TSocket and TMessage
522 TList *fQueries; //list of TProofQuery objects
523 Int_t fOtherQueries; //number of queries in list from previous sessions
524 Int_t fDrawQueries; //number of draw queries during this sessions
525 Int_t fMaxDrawQueries; //max number of draw queries kept
526 Int_t fSeqNum; //Remote sequential # of the last query submitted
528 Int_t fSessionID; //remote ID of the session
530 Bool_t fEndMaster; //true for a master in direct contact only with workers
532 TPackMgr *fPackMgr; // Default package manager
533 TList *fEnabledPackagesOnCluster; //list of enabled packages
535 TList *fInputData; //Input data objects sent over via file
536 TString fInputDataFile; //File with input data objects
538 TProofOutputList fOutputList; // TList implementation filtering ls(...) and Print(...)
540 PrintProgress_t fPrintProgress; //Function function to display progress info in batch mode
542 std::recursive_mutex fCloseMutex; // Avoid crashes in MarkBad or alike while closing
544 TList *fLoadedMacros; // List of loaded macros (just file names)
545 static TList *fgProofEnvList; // List of TNameds defining environment
546 // variables to pass to proofserv
548 Bool_t fMergersSet; // Indicates, if the following variables have been initialized properly
549 Bool_t fMergersByHost; // Mergers assigned by host name
551 Int_t fWorkersToMerge; // Current total number of workers, which have not been yet assigned to any merger
557 TString fPerfTree; // If non-null triggers saving of the performance info into fPerfTree
559 TList *fWrksOutputReady; // List of workers ready to send output (in control output sending mode)
561 static TPluginHandler *fgLogViewer; // Log dialog box plugin
566 Bool_t fMasterServ; //true if we are a master server
567 TUrl fUrl; //Url of the master
568 TString fConfFile; //file containing config information
569 TString fConfDir; //directory containing cluster config information
570 TString fImage; //master's image name
571 Int_t fProtocol; //remote PROOF server protocol version number
572 TList *fSlaves; //list of all slave servers as in config file
573 TList *fTerminatedSlaveInfos; //list of unique infos of terminated slaves
574 TList *fBadSlaves; //dead slaves (subset of all slaves)
575 TMonitor *fAllMonitor; //monitor activity on all valid slave sockets
576 Bool_t fDataReady; //true if data is ready to be analyzed
577 Long64_t fBytesReady; //number of bytes staged
578 Long64_t fTotalBytes; //number of bytes to be analyzed
579 TList *fAvailablePackages; //list of available packages
580 TList *fEnabledPackages; //list of enabled packages
581 TList *fRunningDSets; // Temporary datasets used for async running
583 Int_t fCollectTimeout; // Timeout for (some) collect actions
585 TString fDataPoolUrl; // default data pool entry point URL
586 TProofMgr::EServType fServType; // type of server: proofd, XrdProofd
587 TProofMgr *fManager; // manager to which this session belongs (if any)
588 EQueryMode fQueryMode; // default query mode
589 Bool_t fDynamicStartup; // are the workers started dynamically?
591 TSelector *fSelector; // Selector to be processed, if any
593 TStopwatch fQuerySTW; // Stopwatch to measure query times
594 Float_t fPrepTime; // Preparation time
597 TProof(const TProof &); // not implemented
598 void operator=(const TProof &); // idem
600 void CleanGDirectory(TList *ol);
602 Int_t Exec(const char *cmd, ESlaves list, Bool_t plusMaster);
603 Int_t SendCommand(const char *cmd, ESlaves list = kActive);
606 Bool_t CheckFile(const char *file, TSlave *sl, Long_t modtime, Int_t cpopt = (kCp | kCpBin));
607 Int_t SendObject(const TObject *obj, ESlaves list = kActive);
610 Int_t SendPrint(Option_t *option="");
611 Int_t Ping(ESlaves list);
612 void Interrupt(EUrgent type, ESlaves list = kActive);
613 void AskStatistics();
614 void AskParallel();
615 Int_t GoParallel(Int_t nodes, Bool_t accept = kFALSE, Bool_t random = kFALSE);
616 Int_t GoMoreParallel(Int_t nWorkersToAdd);
617 Int_t SetParallelSilent(Int_t nodes, Bool_t random = kFALSE);
618 void RecvLogFile(TSocket *s, Int_t size);
619 void NotifyLogMsg(const char *msg, const char *sfx = "\n");
621 Int_t BuildPackage(const char *package, EBuildPackageOpt opt = kBuildAll, Int_t chkveropt = TPackMgr::kCheckROOT, TList *workers = 0);
622 Int_t LoadPackage(const char *package, Bool_t notOnClient = kFALSE, TList *loadopts = 0, TList *workers = 0);
623 Int_t UnloadPackage(const char *package);
625 Int_t DisablePackage(const char *package);
628 void Activate(TList *slaves = 0);
629 Int_t Broadcast(const TMessage &mess, TList *slaves);
630 Int_t Broadcast(const TMessage &mess, ESlaves list = kActive);
631 Int_t Broadcast(const char *mess, Int_t kind, TList *slaves);
632 Int_t Broadcast(const char *mess, Int_t kind = kMESS_STRING, ESlaves list = kActive);
633 Int_t Broadcast(Int_t kind, TList *slaves) { return Broadcast(0, kind, slaves); }
634 Int_t Broadcast(Int_t kind, ESlaves list = kActive) { return Broadcast(0, kind, list); }
635 Int_t BroadcastFile(const char *file, Int_t opt, const char *rfile, TList *wrks);
636 Int_t BroadcastFile(const char *file, Int_t opt, const char *rfile = 0, ESlaves list = kAllUnique);
637 Int_t BroadcastGroupPriority(const char *grp, Int_t priority, ESlaves list = kAllUnique);
638 Int_t BroadcastGroupPriority(const char *grp, Int_t priority, TList *workers);
639 Int_t BroadcastObject(const TObject *obj, Int_t kind, TList *slaves);
640 Int_t BroadcastObject(const TObject *obj, Int_t kind = kMESS_OBJECT, ESlaves list = kActive);
641 Int_t BroadcastRaw(const void *buffer, Int_t length, TList *slaves);
642 Int_t BroadcastRaw(const void *buffer, Int_t length, ESlaves list = kActive);
643 Int_t Collect(const TSlave *sl, Long_t timeout = -1, Int_t endtype = -1, Bool_t deactonfail = kFALSE);
644 Int_t Collect(TMonitor *mon, Long_t timeout = -1, Int_t endtype = -1, Bool_t deactonfail = kFALSE);
645 Int_t CollectInputFrom(TSocket *s, Int_t endtype = -1, Bool_t deactonfail = kFALSE);
646 Int_t HandleInputMessage(TSlave *wrk, TMessage *m, Bool_t deactonfail = kFALSE);
647 void HandleSubmerger(TMessage *mess, TSlave *sl);
648 void SetMonitor(TMonitor *mon = 0, Bool_t on = kTRUE);
650 void ReleaseMonitor(TMonitor *mon);
652 virtual void FindUniqueSlaves();
653 TSlave *FindSlave(TSocket *s) const;
654 TList *GetListOfSlaves() const { return fSlaves; }
658 Int_t GetNumberOfSlaves() const;
664 Bool_t IsEndMaster() const { return fEndMaster; }
665 Int_t ModifyWorkerLists(const char *ord, Bool_t add, Bool_t save);
667 void SaveActiveList();
669 Bool_t IsSync() const { return fSync; }
672 void SetRunStatus(ERunStatus rst) { fRunStatus = rst; }
674 void MarkBad(TSlave *wrk, const char *reason = 0);
675 void MarkBad(TSocket *s, const char *reason = 0);
676 void TerminateWorker(TSlave *wrk);
677 void TerminateWorker(const char *ord);
679 void ActivateAsyncInput();
683 void PrintProgress(Long64_t total, Long64_t processed,
684 Float_t procTime = -1., Long64_t bytesread = -1);
686 // Managing mergers
687 Bool_t CreateMerger(TSlave *sl, Int_t port);
688 void RedirectWorker(TSocket *s, TSlave * sl, Int_t output_size);
692 void AskForOutput(TSlave *sl);
696 void ResetMergePrg();
697 void ParseConfigField(const char *config);
699 Bool_t Prompt(const char *p);
702 static TList *GetDataSetSrvMaps(const TString &srvmaps);
705 TProof(); // For derived classes to use
706 void InitMembers();
707 Int_t Init(const char *masterurl, const char *conffile,
708 const char *confdir, Int_t loglevel,
709 const char *alias = 0);
710 virtual Bool_t StartSlaves(Bool_t attach = kFALSE);
711 Int_t AddWorkers(TList *wrks);
713 void SetupWorkersEnv(TList *wrks, Bool_t increasingpool = kFALSE);
715 void SetPlayer(TVirtualProofPlayer *player);
717 virtual TVirtualProofPlayer *MakePlayer(const char *player = 0, TSocket *s = 0);
719 void UpdateDialog();
721 void HandleLibIncPath(const char *what, Bool_t add, const char *dirs);
724 TSlave *CreateSlave(const char *url, const char *ord,
725 Int_t perf, const char *image, const char *workdir);
726 TSlave *CreateSubmaster(const char *url, const char *ord,
727 const char *image, const char *msd, Int_t nwk = 1);
729 virtual Int_t PollForNewWorkers();
730 virtual void SaveWorkerInfo();
732 Int_t Collect(ESlaves list = kActive, Long_t timeout = -1, Int_t endtype = -1, Bool_t deactonfail = kFALSE);
733 Int_t Collect(TList *slaves, Long_t timeout = -1, Int_t endtype = -1, Bool_t deactonfail = kFALSE);
737 void SetDSet(TDSet *dset) { fDSet = dset; }
738 virtual void ValidateDSet(TDSet *dset);
740 Int_t VerifyDataSetParallel(const char *uri, const char *optStr);
744 Int_t AssertPath(const char *path, Bool_t writable);
745 Int_t GetSandbox(TString &sb, Bool_t assert = kFALSE, const char *rc = 0);
747 void PrepareInputDataFile(TString &dataFile);
748 virtual void SendInputDataFile();
749 Int_t SendFile(const char *file, Int_t opt = (kBinary | kForward | kCp | kCpBin),
750 const char *rfile = 0, TSlave *sl = 0);
752 // Fast enable/disable feedback from Process
753 void SetFeedback(TString &opt, TString &optfb, Int_t action);
754 // Output file handling during Process
755 Int_t HandleOutputOptions(TString &opt, TString &target, Int_t action);
757 static void *SlaveStartupThread(void *arg);
759 static Int_t AssertDataSet(TDSet *dset, TList *input,
760 TDataSetManager *mgr, TString &emsg);
761 static void AssertMacroPath(const char *macro);
763 // Input data handling
764 static Int_t GetInputData(TList *input, const char *cachedir, TString &emsg);
765 static Int_t SaveInputData(TQueryResult *qr, const char *cachedir, TString &emsg);
766 static Int_t SendInputData(TQueryResult *qr, TProof *p, TString &emsg);
768 // Parse CINT commands
769 static Bool_t GetFileInCmd(const char *cmd, TString &fn);
771 // Pipe execution of commands
772 static void SystemCmd(const char *cmd, Int_t fdout);
775 TProof(const char *masterurl, const char *conffile = kPROOF_ConfFile,
776 const char *confdir = kPROOF_ConfDir, Int_t loglevel = 0,
777 const char *alias = 0, TProofMgr *mgr = 0);
778 virtual ~TProof();
780 void cd(Int_t id = -1);
782 Int_t Ping();
783 void Touch();
784 Int_t Exec(const char *cmd, Bool_t plusMaster = kFALSE);
785 Int_t Exec(const char *cmd, const char *ord, Bool_t logtomacro = kFALSE);
787 TString Getenv(const char *env, const char *ord = "0");
788 Int_t GetRC(const char *RCenv, Int_t &env, const char *ord = "0");
789 Int_t GetRC(const char *RCenv, Double_t &env, const char *ord = "0");
790 Int_t GetRC(const char *RCenv, TString &env, const char *ord = "0");
792 virtual Long64_t Process(TDSet *dset, const char *selector,
793 Option_t *option = "", Long64_t nentries = -1,
794 Long64_t firstentry = 0);
795 virtual Long64_t Process(TFileCollection *fc, const char *selector,
796 Option_t *option = "", Long64_t nentries = -1,
797 Long64_t firstentry = 0);
798 virtual Long64_t Process(const char *dsetname, const char *selector,
799 Option_t *option = "", Long64_t nentries = -1,
800 Long64_t firstentry = 0, TObject *enl = 0);
801 virtual Long64_t Process(const char *selector, Long64_t nentries,
802 Option_t *option = "");
803 // Process via TSelector
804 virtual Long64_t Process(TDSet *dset, TSelector *selector,
805 Option_t *option = "", Long64_t nentries = -1,
806 Long64_t firstentry = 0);
807 virtual Long64_t Process(TFileCollection *fc, TSelector *selector,
808 Option_t *option = "", Long64_t nentries = -1,
809 Long64_t firstentry = 0);
810 virtual Long64_t Process(const char *dsetname, TSelector *selector,
811 Option_t *option = "", Long64_t nentries = -1,
812 Long64_t firstentry = 0, TObject *enl = 0);
813 virtual Long64_t Process(TSelector *selector, Long64_t nentries,
814 Option_t *option = "");
816 virtual Long64_t DrawSelect(TDSet *dset, const char *varexp,
817 const char *selection = "",
818 Option_t *option = "", Long64_t nentries = -1,
819 Long64_t firstentry = 0);
820 Long64_t DrawSelect(const char *dsetname, const char *varexp,
821 const char *selection = "",
822 Option_t *option = "", Long64_t nentries = -1,
823 Long64_t firstentry = 0, TObject *enl = 0);
824 Int_t Archive(Int_t query, const char *url);
825 Int_t Archive(const char *queryref, const char *url = 0);
826 Int_t CleanupSession(const char *sessiontag);
827 Long64_t Finalize(Int_t query = -1, Bool_t force = kFALSE);
828 Long64_t Finalize(const char *queryref, Bool_t force = kFALSE);
829 Int_t Remove(Int_t query, Bool_t all = kFALSE);
830 Int_t Remove(const char *queryref, Bool_t all = kFALSE);
831 Int_t Retrieve(Int_t query, const char *path = 0);
832 Int_t Retrieve(const char *queryref, const char *path = 0);
834 void DisableGoAsyn();
835 void GoAsynchronous();
836 void StopProcess(Bool_t abort, Int_t timeout = -1);
837 void Browse(TBrowser *b);
839 virtual Int_t Echo(const TObject *obj);
840 virtual Int_t Echo(const char *str);
842 Int_t SetParallel(Int_t nodes = -1, Bool_t random = kFALSE);
843 void SetLogLevel(Int_t level, UInt_t mask = TProofDebug::kAll);
845 void Close(Option_t *option="");
846 virtual void Print(Option_t *option="") const;
848 //-- cache and package management
849 virtual void ShowCache(Bool_t all = kFALSE);
850 virtual void ClearCache(const char *file = 0);
853 void ShowPackages(Bool_t all = kFALSE, Bool_t redirlog = kFALSE);
856 Int_t ClearPackage(const char *package);
857 Int_t DownloadPackage(const char *par, const char *dstdir = 0);
858 Int_t EnablePackage(const char *package, Bool_t notOnClient = kFALSE, TList *workers = 0);
859 Int_t EnablePackage(const char *package, const char *loadopts,
860 Bool_t notOnClient = kFALSE, TList *workers = 0);
861 Int_t EnablePackage(const char *package, TList *loadopts,
862 Bool_t notOnClient = kFALSE, TList *workers = 0);
863 Int_t UploadPackage(const char *par, EUploadPackageOpt opt = kUntar, TList *workers = 0);
864 virtual Int_t Load(const char *macro, Bool_t notOnClient = kFALSE, Bool_t uniqueOnly = kTRUE,
865 TList *wrks = 0);
867 Int_t AddDynamicPath(const char *libpath, Bool_t onClient = kFALSE, TList *wrks = 0, Bool_t doCollect = kTRUE);
868 Int_t AddIncludePath(const char *incpath, Bool_t onClient = kFALSE, TList *wrks = 0, Bool_t doCollect = kTRUE);
869 Int_t RemoveDynamicPath(const char *libpath, Bool_t onClient = kFALSE);
870 Int_t RemoveIncludePath(const char *incpath, Bool_t onClient = kFALSE);
872 //-- dataset management
// UploadDataSet overloads take the file list either as a TList or as a string;
// UploadDataSetFromFile reads it from a file. Parameters are unnamed in these
// declarations, so their roles must be taken from the implementation — TODO name them.
873 Int_t UploadDataSet(const char *, TList *, const char * = 0, Int_t = 0, TList * = 0);
874 Int_t UploadDataSet(const char *, const char *, const char * = 0, Int_t = 0, TList * = 0);
875 Int_t UploadDataSetFromFile(const char *, const char *, const char * = 0, Int_t = 0, TList * = 0);
// Most dataset operations are virtual so derived session types can redirect them;
// `optStr` defaults to the empty string throughout.
876 virtual Bool_t RegisterDataSet(const char *name,
877 TFileCollection *dataset, const char* optStr = "");
878 virtual TMap *GetDataSets(const char *uri = "", const char* optStr = "");
879 virtual void ShowDataSets(const char *uri = "", const char* optStr = "");
// Quota accessors. Note ShowDataSetQuota defaults its option to a null pointer,
// unlike the "" default used by GetDataSetQuota.
881 TMap *GetDataSetQuota(const char* optStr = "");
882 void ShowDataSetQuota(Option_t* opt = 0);
884 virtual Bool_t ExistsDataSet(const char *dataset);
// ShowDataSet: `opt` defaults to "filter:SsCc".
885 void ShowDataSet(const char *dataset = "", const char* opt = "filter:SsCc");
886 virtual Int_t RemoveDataSet(const char *dataset, const char* optStr = "");
887 virtual Int_t VerifyDataSet(const char *dataset, const char* optStr = "");
// GetDataSet returns a TFileCollection pointer; ownership of the returned object
// is not visible here — NOTE(review): document whether the caller must delete it.
888 virtual TFileCollection *GetDataSet(const char *dataset, const char* optStr = "");
889 TList *FindDataSets(const char *searchString, const char* optStr = "");
// Staging requests: request / query status / show status / cancel.
890 virtual Bool_t RequestStagingDataSet(const char *dataset);
891 virtual TFileCollection *GetStagingStatusDataSet(const char *dataset);
892 virtual void ShowStagingStatusDataSet(const char *dataset, const char *optStr = "filter:SsCc");
893 virtual Bool_t CancelStagingDataSet(const char *dataset);
895 virtual Int_t SetDataSetTreeName( const char *dataset, const char *treename);
// Dataset cache inspection/clearing; `dataset` defaults to a null pointer
// (presumably "all datasets" — TODO confirm).
897 virtual void ShowDataSetCache(const char *dataset = 0);
898 virtual void ClearDataSetCache(const char *dataset = 0);
900 virtual void ShowData();
// ClearData: `what` defaults to kUnregistered (a flag declared elsewhere in the
// class); `dsname` defaults to a null pointer.
901 void ClearData(UInt_t what = kUnregistered, const char *dsname = 0);
// Read-only accessors for session configuration and identity. All are const so
// they can be called through a const TProof pointer or reference.
903 const char *GetMaster() const { return fMaster; }
904 const char *GetConfDir() const { return fConfDir; }
905 const char *GetConfFile() const { return fConfFile; }
// User name, URL and port are taken from the session's fUrl member.
906 const char *GetUser() const { return fUrl.GetUser(); }
907 const char *GetGroup() const { return fGroup; }
908 const char *GetWorkDir() const { return fWorkDir; }
// The session tag is the object's name (GetName()).
909 const char *GetSessionTag() const { return GetName(); }
910 const char *GetImage() const { return fImage; }
// Made const for consistency with the sibling accessors: TUrl::GetUrl is itself a
// const member, so there is no reason to forbid calls on a const TProof.
911 const char *GetUrl() const { return fUrl.GetUrl(); }
912 Int_t GetPort() const { return fUrl.GetPort(); }
915 Int_t GetStatus() const { return fStatus; }
916 Int_t GetLogLevel() const { return fLogLevel; }
// GetParallel is defined out of line; IsParallel() is built on top of it.
917 Int_t GetParallel() const;
918 Int_t GetSeqNum() const { return fSeqNum; }
919 Int_t GetSessionID() const { return fSessionID; }
// Query mode: GetQueryMode takes an optional `mode` option string (default null);
// presumably a non-null value is parsed in preference to the stored mode — TODO confirm.
923 EQueryMode GetQueryMode(Option_t *mode = 0) const;
924 void SetQueryMode(EQueryMode mode);
926 void SetRealTimeLog(Bool_t on = kTRUE);
// Statistics: GetStatistics (void, `verbose` default kFALSE) is not a getter despite
// its name; the inline accessors below return the stored fBytesRead, fRealTime and
// fCpuTime members.
928 void GetStatistics(Bool_t verbose = kFALSE);
929 Long64_t GetBytesRead() const { return fBytesRead; }
930 Float_t GetRealTime() const { return fRealTime; }
931 Float_t GetCpuTime() const { return fCpuTime; }
// Session state predicates: inline, const, and side-effect free.
935 Bool_t IsFolder() const { return kTRUE; } // unconditionally browsable as a folder
936 Bool_t IsMaster() const { return fMasterServ; }
937 Bool_t IsValid() const { return fValid; }
938 Bool_t IsTty() const { return fTty; }
// Parallel means at least one worker is reported by GetParallel().
939 Bool_t IsParallel() const { return (GetParallel() >= 1) ? kTRUE : kFALSE; }
// Idle unless the busy counter fNotIdle is strictly positive.
940 Bool_t IsIdle() const { return (fNotIdle > 0) ? kFALSE : kTRUE; }
941 Bool_t IsWaiting() const { return fIsWaiting; }
946 //-- input list parameter handling
// SetParameter is overloaded on the value type (string, Int_t, Long_t, Long64_t,
// Double_t) so the stored parameter keeps its type.
947 void SetParameter(const char *par, const char *value);
948 void SetParameter(const char *par, Int_t value);
949 void SetParameter(const char *par, Long_t value);
950 void SetParameter(const char *par, Long64_t value);
951 void SetParameter(const char *par, Double_t value);
// GetParameter returns a TObject pointer (const member); the caller must know or
// check its concrete type. See also the static GetParameter(TCollection*, ...)
// overloads that extract typed values.
952 TObject *GetParameter(const char *par) const;
// DeleteParameters/ShowParameters take a wildcard pattern; ShowParameters defaults
// to "PROOF_*".
953 void DeleteParameters(const char *wildcard);
954 void ShowParameters(const char *wildcard = "PROOF_*") const;
// Input list management. Whether AddInput takes ownership of `obj` is not visible
// here — NOTE(review): document the ownership rule.
956 void AddInput(TObject *obj);
957 void ClearInput();
// Output lookup by name: the member form uses this session's output; the static
// form presumably searches the supplied `out` list — TODO confirm.
959 TObject *GetOutput(const char *name);
961 static TObject *GetOutput(const char *name, TList *out);
// ShowMissingFiles: `qr` defaults to a null pointer (presumably "the last query"
// — TODO confirm).
963 void ShowMissingFiles(TQueryResult *qr = 0);
// Input data: objects can be added (`push` default kFALSE), loaded from a file,
// or cleared either by object pointer (default null) or by name.
966 void AddInputData(TObject *obj, Bool_t push = kFALSE);
967 void SetInputDataFile(const char *datafile);
968 void ClearInputData(TObject *obj = 0);
969 void ClearInputData(const char *name);
// Feedback object names: add/remove/clear/show the list, or fetch it.
971 void AddFeedback(const char *name);
972 void RemoveFeedback(const char *name);
973 void ClearFeedback();
974 void ShowFeedback() const;
975 TList *GetFeedbackList() const;
// Query bookkeeping. GetListOfQueries is virtual; GetQueryResult takes an optional
// query reference string (default null).
977 virtual TList *GetListOfQueries(Option_t *opt = "");
981 TQueryResult *GetQueryResult(const char *ref = 0);
// GetMaxQueries returns void despite its name.
982 void GetMaxQueries();
983 void SetMaxDrawQueries(Int_t max);
984 void ShowQueries(Option_t *opt = "");
// IsDataReady reports sizes through its two reference out-parameters.
986 Bool_t IsDataReady(Long64_t &totalbytes, Long64_t &bytesready);
// SetActive is intentionally a no-op here: its parameter is unnamed and the body
// is empty.
988 void SetActive(Bool_t /*active*/ = kTRUE) { }
// Signals. Methods whose trailing comment contains *SIGNAL* are treated as signals
// by ROOT's signal/slot tooling, so those comments must be kept intact.
990 void LogMessage(const char *msg, Bool_t all); //*SIGNAL*
// Progress comes in three overloads carrying progressively more detail: counts
// only; plus bytes read, timing and rates; plus active workers / sessions info.
991 void Progress(Long64_t total, Long64_t processed); //*SIGNAL*
992 void Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
993 Float_t initTime, Float_t procTime,
994 Float_t evtrti, Float_t mbrti); // *SIGNAL*
995 void Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
996 Float_t initTime, Float_t procTime,
997 Float_t evtrti, Float_t mbrti,
998 Int_t actw, Int_t tses, Float_t eses); // *SIGNAL*
999 void Feedback(TList *objs); //*SIGNAL*
1000 void QueryResultReady(const char *ref); //*SIGNAL*
1001 void CloseProgressDialog(); //*SIGNAL*
1002 void ResetProgressDialog(const char *sel, Int_t sz,
1003 Long64_t fst, Long64_t ent); //*SIGNAL*
1004 void StartupMessage(const char *msg, Bool_t status, Int_t done,
1005 Int_t total); //*SIGNAL*
1006 void DataSetStatus(const char *msg, Bool_t status,
1007 Int_t done, Int_t total); //*SIGNAL*
// SendDataSetStatus is not marked as a signal; its argument types (UInt_t counts)
// differ from the DataSetStatus signal above.
1009 void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st);
// Session log access. GetLog takes a start/end range, both defaulting to -1
// (meaning of -1 not visible here — presumably "unbounded"; TODO confirm).
1011 void GetLog(Int_t start = -1, Int_t end = -1);
// GetLastLog returns a TMacro pointer; ownership not visible — NOTE(review): document.
1012 TMacro *GetLastLog();
1013 void PutLog(TQueryResult *qr);
// ShowLog selects a query either by index (default -1) or by reference string.
1014 void ShowLog(Int_t qry = -1);
1015 void ShowLog(const char *queryref);
1023 virtual TTree *GetTreeHeader(TDSet *tdset);
// Chains registered with / removed from this session.
1026 void AddChain(TChain *chain);
1027 void RemoveChain(TChain *chain);
1033 void Detach(Option_t *opt = "");
1035 virtual void SetAlias(const char *alias="");
1038 void SetManager(TProofMgr *mgr);
// Worker (de)activation by ordinal string; `save` defaults to kTRUE.
1040 Int_t ActivateWorker(const char *ord, Bool_t save = kTRUE);
1041 Int_t DeactivateWorker(const char *ord, Bool_t save = kTRUE);
// Data-pool URL is delegated to the manager: the getter returns a null pointer and
// the setter does nothing when no manager (fManager) is set.
1043 const char *GetDataPoolUrl() const { return fManager ? fManager->GetMssUrl() : 0; }
1044 void SetDataPoolUrl(const char *url) { if (fManager) fManager->SetMssUrl(url); }
1048 void SetProgressDialog(Bool_t on = kTRUE);
1050 // Performance tree: save it (file and query reference both default to null) or
// enable/configure it (file default "perftree.root"; `withWrks` default kFALSE).
1051 Int_t SavePerfTree(const char *pf = 0, const char *qref = 0);
1052 void SetPerfTree(const char *pf = "perftree.root", Bool_t withWrks = kFALSE);
1054 // Opening and managing PROOF connections
// Open is a static factory returning a TProof pointer; every argument has a
// default (null URL/config/confdir, loglevel 0). Whether the result can be null on
// failure is not visible here — callers should check (assumed; verify).
1055 static TProof *Open(const char *url = 0, const char *conffile = 0,
1056 const char *confdir = 0, Int_t loglevel = 0);
1057 static void LogViewer(const char *url = 0, Int_t sessionidx = 0);
1058 static TProofMgr *Mgr(const char *url);
// Reset: `hard` defaults to kFALSE.
1059 static void Reset(const char *url, Bool_t hard = kFALSE);
// Static, process-wide list of environment variables: add, delete, read, reset.
1061 static void AddEnvVar(const char *name, const char *value);
1062 static void DelEnvVar(const char *name);
1063 static const TList *GetEnvVars();
1064 static void ResetEnvVars();
1066 // Input/output list utilities
// Static typed lookups of parameter `par` in collection `c`; the value is written to
// the reference argument and an Int_t status is returned (convention not visible
// here — presumably 0 on success; TODO confirm).
1067 static Int_t GetParameter(TCollection *c, const char *par, TString &value);
1068 static Int_t GetParameter(TCollection *c, const char *par, Int_t &value);
1069 static Int_t GetParameter(TCollection *c, const char *par, Long_t &value);
1070 static Int_t GetParameter(TCollection *c, const char *par, Long64_t &value);
1071 static Int_t GetParameter(TCollection *c, const char *par, Double_t &value);
// ClassDef with class version 0: ROOT treats the class as non-persistent (no I/O
// streamer is generated for it).
1073 ClassDef(TProof,0) //PROOF control class
