26 # include "snprintf.h"
28 #include "RConfigure.h"
83 fReInvalid =
new TPMERegexp(
"[^A-Za-z0-9._-]");
104 fUrl.SetProtocol(
"proof");
105 fUrl.SetHost(
"__lite__");
109 if (strlen(fUrl.GetUser()) <= 0) {
113 fUrl.SetUser(pw->
fUser);
120 ParseConfigField(conffile);
125 if ((fNWorkers = GetNumberOfWorkers(url)) > 0) {
132 Printf(
" +++ Starting PROOF-Lite %swith %d workers +++", stup.
Data(), fNWorkers);
134 Init(url, conffile, confdir, loglevel, alias);
138 if (!
gROOT->GetListOfProofs()->FindObject(
this))
139 gROOT->GetListOfProofs()->Add(
this);
154 const char *confdir,
Int_t loglevel,
const char *)
165 if (!conffile || !conffile[0])
167 if (!confdir || !confdir[0])
176 Error(
"Init",
"could not create/assert sandbox for this session");
183 if (sockpathdir(sockpathdir.
Length()-1) ==
'/') sockpathdir.
Remove(sockpathdir.
Length()-1);
187 Error(
"Init",
"Unix socket path '%s' is too long (%d bytes):",
189 Error(
"Init",
"use 'ProofLite.SockPathDir' to create it under a directory different"
190 " from '%s'", sockpathdir.
Data());
221 if (dynconf.
Length() > 0) {
264 Warning(
"Init",
"problems applying fMaxQueries");
267 Warning(
"Init",
"problems initializing the dataset manager");
322 Warning(
"Init",
"fork-based workers startup is not available on Windows - ignoring");
334 if (globpack.
Length() > 0) {
338 while (globpack.
Tokenize(ldir, from,
":")) {
341 Warning(
"Init",
"directory for global packages %s does not"
342 " exist or is not readable", ldir.
Data());
361 fEnabledPackagesOnClient->SetOwner();
366 Error(
"Init",
"problems setting up workers");
392 gROOT->GetListOfSockets()->Add(
this);
435 if (fgWrksMax == -2) {
437 TString sysname =
"system.rootrc";
451 fgWrksMax = sysenv.
GetValue(
"ProofLite.MaxWorkers", -1);
456 if (fgWrksMax == 0) {
457 ::Error(
"TProofLite::GetNumberOfWorkers",
458 "PROOF-Lite disabled by the system administrator: sorry!");
465 if (url && strlen(url)) {
469 nw.
Remove(0, in + strlen(
"workers="));
473 if ((nWorkers = nw.
Atoi()) <= 0) {
474 ::Warning(
"TProofLite::GetNumberOfWorkers",
475 "number of workers specified by 'workers='"
476 " is non-positive: using default");
489 if ((nWorkers = nw.
Atoi()) == 0) {
490 ::Warning(
"TProofLite::GetNumberOfWorkers",
491 "number of workers specified by 'workers='"
492 " is non-positive: using default");
507 if (notify) notify =
kFALSE;
511 if (fgWrksMax > 0 && fgWrksMax < nWorkers) {
513 ::Warning(
"TProofLite::GetNumberOfWorkers",
"number of PROOF-Lite workers limited by"
514 " the system administrator to %d", fgWrksMax);
536 Error(
"SetupWorkers",
537 "unable to create server socket for internal communications");
548 Int_t nWrksDone = 0, nWrksTot = -1;
556 for (; ord < nWrksTot; ord++) {
560 fullord.
Form(
"%s.%d", o, ord);
570 NotifyStartUp(
"Opening connections to workers", ++nWrksDone, nWrksTot);
575 Warning(
"SetupWorkers",
"standard startup: workers already started");
583 for (; ord < nWrksTot; ord++) {
587 fullord.
Form(
"%s.%d", o, ord + 1);
588 if (!clones.
IsNull()) clones +=
" ";
596 NotifyStartUp(
"Opening connections to workers", ++nWrksDone, nWrksTot);
611 while (started.
GetSize() > 0 && nSelects < nWrksTot) {
618 if (xs == (
TSocket *) -1)
continue;
625 if (s->
Recv(msg) < 0) {
626 Warning(
"SetupWorkers",
"problems receiving message from accepted socket!");
643 gROOT->GetListOfSockets()->Remove(s);
661 if (startedWorkers) startedWorkers->
Add(wrk);
663 NotifyStartUp(
"Setting up worker servers", ++nWrksDone, nWrksTot);
670 Warning(
"SetupWorkers",
"received empty message from accepted socket!");
683 gROOT->GetPluginManager()->FindHandler(
"TProofProgressDialog")))
708 snprintf(msg, 512,
"%s: OK (%d workers) \n",
711 snprintf(msg, 512,
"%s: %d out of %d (%d %%)\r",
712 action, done, tot, frac);
714 fprintf(stderr,
"%s", msg);
723 if (!ord || strlen(ord) <= 0) {
724 Error(
"SetProofServEnv",
"ordinal string undefined");
730 FILE *frc = fopen(rcfile.
Data(),
"w");
732 Error(
"SetProofServEnv",
"cannot open rc file %s", rcfile.
Data());
737 fprintf(frc,
"# The session working dir\n");
738 fprintf(frc,
"ProofServ.SessionDir: %s/worker-%s\n",
fWorkDir.
Data(),
ord);
741 fprintf(frc,
"# Session tag\n");
742 fprintf(frc,
"ProofServ.SessionTag: %s\n",
GetName());
745 fprintf(frc,
"# Proof Log/Debug level\n");
746 fprintf(frc,
"Proof.DebugLevel: %d\n",
gDebug);
749 fprintf(frc,
"# Ordinal number\n");
750 fprintf(frc,
"ProofServ.Ordinal: %s\n", ord);
753 fprintf(frc,
"# ROOT Version tag\n");
754 fprintf(frc,
"ProofServ.RootVersionTag: %s\n",
gROOT->GetVersion());
759 Warning(
"SetProofServEnv",
"problems getting sandbox string for worker");
760 fprintf(frc,
"# Users sandbox\n");
761 fprintf(frc,
"ProofServ.Sandbox: %s\n", sandbox.Data());
764 fprintf(frc,
"# Users cache\n");
768 fprintf(frc,
"# Users packages\n");
772 fprintf(frc,
"# Server image\n");
773 fprintf(frc,
"ProofServ.Image: %s\n",
fImage.
Data());
776 fprintf(frc,
"# Open socket\n");
780 fprintf(frc,
"# Client Protocol\n");
788 FILE *fenv = fopen(envfile.
Data(),
"w");
790 Error(
"SetProofServEnv",
"cannot open env file %s", envfile.
Data());
794 #ifdef R__HAVE_CONFIG
795 fprintf(fenv,
"export ROOTSYS=%s\n", ROOTPREFIX);
797 fprintf(fenv,
"export ROOTSYS=%s\n",
gSystem->
Getenv(
"ROOTSYS"));
800 #ifdef R__HAVE_CONFIG
801 fprintf(fenv,
"export ROOTCONFDIR=%s\n", ROOTETCDIR);
803 fprintf(fenv,
"export ROOTCONFDIR=%s\n",
gSystem->
Getenv(
"ROOTSYS"));
809 fprintf(fenv,
"export ROOTPROOFLOGFILE=%s\n", logfile.Data());
811 fprintf(fenv,
"export ROOTRCFILE=%s\n", rcfile.
Data());
813 fprintf(fenv,
"export ROOTVERSIONTAG=%s\n",
gROOT->GetVersion());
815 fprintf(fenv,
"export ROOTPROOFLITE=%d\n",
fNWorkers);
817 fprintf(fenv,
"export LOCALDATASERVER=\"file://\"\n");
823 while ((env = (
TNamed *)nxenv())) {
826 fprintf(fenv,
"export %s=%s\n", env->
GetName(), senv.Data());
827 if (namelist.
Length() > 0)
831 fprintf(fenv,
"export PROOF_ALLVARS=%s\n", namelist.
Data());
848 if (!logfile)
return;
851 if (s.
Contains(
"<logfilewrk>") && logfile) {
881 var =
dynamic_cast<TNamed *
>(envVars->
FindObject(
"PROOF_SLAVE_CPUPIN_ORDER"));
882 if (var) cpuPinList = var->
GetTitle();
894 if (cpuPinList.
IsNull() || (cpuPinList ==
"*")) {
904 for (
Int_t i=0; cpuPinList.
Tokenize(tok, from,
"\\+"); i++) {
906 n = (tok.
Atoi() % nCpus);
972 lastsess.
Form(
"%s/last-lite-session", sessdir.
Data());
999 Printf(
"*** PROOF-Lite cluster %s(sequential mode)", ord.
Data());
1006 Printf(
"URL: %s", url.Data());
1015 Printf(
"ROOT version|rev|tag: %s", ver.Data());
1031 Printf(
"List of workers:");
1033 while (
TSlave* sl = dynamic_cast<TSlave*>(nextslave())) {
1058 (dset ? dset->GetEntryList() : 0));
1076 Info(
"SetQueryRunning",
"starting query: %d", pq->
GetSeqNum());
1083 if (parlist.Length() <= 0)
1104 const char *selection,
Option_t *option,
1111 Info(
"DrawSelect",
"not idle, asynchronous Draw not supported");
1123 return Process(dset,
"draw:", opt, nentries, first);
1141 if (opt.Contains(
"fb=") || opt.Contains(
"feedback="))
SetFeedback(opt, optfb, 0);
1148 Info(
"Process",
"asynchronous mode not yet supported in PROOF-Lite");
1154 Info(
"Process",
"not idle: cannot accept queries");
1165 Error(
"Process",
"invalid sesion or query-result manager undefined!");
1182 Error(
"Process",
"from AssertDataSet: %s", emsg.
Data());
1186 Error(
"Process",
"no files to process!");
1189 }
else if (hasNoData) {
1194 if (!dsn.Contains(
":") || dsn.BeginsWith(
"dataset:")) {
1198 emsg.
Form(
"dataset manager not initialized!");
1203 emsg.
Form(
"requested dataset '%s' does not exists", dsn.Data());
1210 fcmap->
SetName(
"PROOF_FilesToProcess");
1216 Error(
"HandleProcess",
"%s", emsg.
Data());
1223 TString selec(selector), varexp, selection, objname;
1225 if (selec.BeginsWith(
"draw:")) {
1230 Error(
"Process",
"draw query: error parsing arguments '%s', '%s', '%s'",
1231 varexp.
Data(), selection.
Data(), opt.Data());
1276 if (!
gROOT->IsBatch()) {
1325 TList *startedWorkers = 0;
1327 startedWorkers =
new TList;
1337 if (selector && strlen(selector)) {
1376 Emit(
"StopProcess(Bool_t)", abort);
1401 Warning(
"ProcessNext",
"problems registering produced datasets: %s", err.
Data());
1423 if (!(pq->
IsDraw()) && memqueries >= 0) {
1433 msg.
Form(
"Lite-0: all output objects have been merged ");
1434 fprintf(stderr,
"%s\n", msg.
Data());
1450 if (sst) rv = sst->
GetVal();
1476 while ((wrk = (
TSlave *) nxw())) {
1482 Warning(
"CreateSymLinks",
"problems creating sym link: %s", lnk.
Data());
1485 Info(
"CreateSymLinks",
"created sym link: %s", lnk.
Data());
1490 Warning(
"CreateSymLinks",
"files list is undefined");
1517 if (
gROOT->GetPluginManager()) {
1519 h =
gROOT->GetPluginManager()->FindHandler(
"TDataSetManager", dsm);
1524 user.Data(), dsm.
Data()));
1529 Warning(
"InitDataSetManager",
"dataset manager plug-in initialization failed");
1544 h =
gROOT->GetPluginManager()->FindHandler(
"TDataSetManager",
"file");
1550 group.
Data(), user.Data(),
1554 Warning(
"InitDataSetManager",
"default dataset manager plug-in initialization failed");
1560 Info(
"InitDataSetManager",
"datasetmgr Cq: %d, Ar: %d, Av: %d, Ti: %d, Sb: %d",
1570 if (!dsReqCfg.
IsNull()) {
1571 TPMERegexp reReqDir(
"(^| )(dir:)?([^ ]+)( |$)");
1573 if (reReqDir.
Match(dsReqCfg) == 5) {
1575 dsDirFmt.
Form(
"dir:%s perms:open", reReqDir[3].
Data());
1578 Warning(
"InitDataSetManager",
"failed init of dataset staging requests repository");
1582 Warning(
"InitDataSetManager",
"specify, with [dir:]<path>, a valid path for staging requests");
1585 Warning(
"InitDataSetManager",
"no repository for staging requests available");
1612 if (!file || strlen(file) <= 0) {
1632 if (!macro || !macro[0]) {
1633 Error(
"Load",
"need to specify a macro name");
1639 while (macs.Tokenize(mac, from,
",")) {
1654 if (!strncmp(e, macn.
Data(), macn.
Length())) {
1666 return TProof::Load(macro, notOnClient, uniqueOnly, wrks);
1697 Info(
"CopyMacroToCache",
"enter: names: %s, %s", macro, name.
Data());
1701 Error(
"CopyMacroToCache",
"file %s not found or not readable", name.
Data());
1721 const char *hext[] = {
".h",
".hh",
"" };
1724 while (strlen(hext[i]) > 0) {
1725 hname =
name(0, dot);
1729 if (!checkedext.
IsNull()) checkedext +=
",";
1730 checkedext += hext[i];
1734 if (hname.
IsNull() && headerRequired == 1) {
1735 Error(
"CopyMacroToCache",
"header file for %s not found or not readable "
1736 "(checked extensions: %s)", name.
Data(), checkedext.
Data());
1739 if (headerRequired < 0)
1753 if (md5 && md5cache && (*md5 == *md5cache))
1754 useCacheBinaries =
kTRUE;
1759 if (md5h && md5hcache && (*md5h != *md5hcache))
1760 useCacheBinaries =
kFALSE;
1771 dot = vername.
Last(
'.');
1774 vername +=
".binversion";
1778 if (useCacheBinaries) {
1780 FILE *
f = fopen(
Form(
"%s/%s", cacheDir.
Data(), vername.
Data()),
"r");
1786 if (!f || v !=
gROOT->GetVersion() || r !=
gROOT->GetGitCommit())
1787 useCacheBinaries =
kFALSE;
1792 dot = binname.
Last(
'.');
1799 if (useCacheBinaries) {
1806 if (!strncmp(e, binname.
Data(), binname.
Length())) {
1817 Info(
"CopyMacroToCache",
1818 "retrieving %s from cache", fncache.
Data());
1832 Error(
"CopyMacroToCache",
"could not create a selector from %s", macro);
1845 if (!strncmp(e, binname.
Data(), binname.
Length())) {
1856 Info(
"CopyMacroToCache",
"caching %s ...", e);
1870 FILE *
f = fopen(
Form(
"%s/%s", cacheDir.
Data(), vername.
Data()),
"w");
1872 fputs(
gROOT->GetVersion(),
f);
1873 fputs(
Form(
"\n%s",
gROOT->GetGitCommit()), f);
1879 if (!useCacheBinaries) {
1882 Info(
"CopyMacroToCache",
"caching %s ...", name.
Data());
1887 Info(
"CopyMacroToCache",
"caching %s ...", hname.
Data());
1900 if (opt & (kCp |
kCpBin))
1916 if (maxold < 0)
return 0;
1926 if (!strncmp(e,
"session-", 8) && !strstr(e,
GetName())) {
1941 while (olddirs->
GetSize() > maxold) {
1942 if (notify &&
gDebug > 0)
1943 Printf(
"Cleaning sandbox at: %s", sandbox.
Data());
1969 Int_t ntot = 0, npre = 0, ndraw= 0;
2002 Warning(
"GetListOfQueries",
"unable to clone TProofQueryResult '%s:%s'",
2003 pqr->GetName(), pqr->GetTitle());
2037 Info(
"RegisterDataSet",
"dataset manager not available");
2041 if (!uri || strlen(uri) <= 0) {
2042 Info(
"RegisterDataSet",
"specifying a dataset name is mandatory");
2050 parallelverify =
kTRUE;
2060 Error(
"RegisterDataSet",
"can not save an empty list.");
2067 Info(
"RegisterDataSet",
"dataset registration not allowed");
2072 Error(
"RegisterDataSet",
"dataset was not saved");
2075 if (!parallelverify)
return result;
2080 Error(
"RegisterDataSet",
"problems verifying dataset '%s'", uri);
2096 Info(
"ExistsDataSet",
"dataset manager not available");
2100 if (!dataset || strlen(dataset) <= 0) {
2101 Info(
"SetDataSetTreeName",
"specifying a dataset name is mandatory");
2105 if (!treename || strlen(treename) <= 0) {
2106 Info(
"SetDataSetTreeName",
"specifying a tree name is mandatory");
2125 Info(
"ExistsDataSet",
"dataset manager not available");
2129 if (!uri || strlen(uri) <= 0) {
2130 Error(
"ExistsDataSet",
"dataset name missing");
2144 Info(
"GetDataSets",
"dataset manager not available");
2149 if (srvex && strlen(srvex) > 0) {
2164 Info(
"GetDataSet",
"dataset manager not available");
2178 Info(
"GetDataSet",
"dataset manager not available");
2182 if (!uri || strlen(uri) <= 0) {
2183 Info(
"GetDataSet",
"specifying a dataset name is mandatory");
2198 Info(
"RemoveDataSet",
"dataset manager not available");
2208 Info(
"RemoveDataSet",
"dataset creation / removal not allowed");
2225 Error(
"RequestStagingDataSet",
"invalid dataset specified");
2230 Error(
"RequestStagingDataSet",
"no dataset staging request repository available");
2234 TString dsUser, dsGroup, dsName, dsTree;
2242 Warning(
"RequestStagingDataSet",
"staging of %s already requested", dataset);
2249 Error(
"RequestStagingDataSet",
"empty dataset or no dataset returned");
2257 while ((fi = dynamic_cast<TFileInfo *>(it.Next()))) {
2259 Int_t nToErase = fi->GetNUrls() - 1;
2260 for (
Int_t i=0; i<nToErase; i++)
2270 Error(
"RequestStagingDataSet",
"can't register staging request for %s", dataset);
2275 Info(
"RequestStagingDataSet",
"Staging request registered for %s", dataset);
2289 Error(
"CancelStagingDataSet",
"invalid dataset specified");
2294 Error(
"CancelStagingDataSet",
"no dataset staging request repository available");
2317 Error(
"GetStagingStatusDataSet",
"invalid dataset specified");
2322 Error(
"GetStagingStatusDataSet",
"no dataset staging request repository available");
2334 Info(
"GetStagingStatusDataSet",
"no pending staging request for %s", dataset);
2349 Info(
"VerifyDataSet",
"dataset manager not available");
2360 Info(
"VerifyDataSet",
"dataset verification not allowed");
2408 if (dataFile.
Length() > 0) {
2419 Warning(
"SendInputDataFile",
"problems copying '%s' to '%s'",
2434 Info(
"Remove",
"Enter: %s, %d", ref, all);
2444 if (queryref ==
"cleanupdir") {
2450 Info(
"Remove",
"%d directories removed", nd);
2473 Warning(
"Remove",
"query result manager undefined!");
2478 "query %s could not be removed (unable to lock session)", queryref.
Data());
2492 Error(
"GetTreeHeader",
"undefined TDSet");
2501 PDB(kGlobal, 1)
Info(
"GetTreeHeader",
"empty TDSet");
2508 t->SetMaxVirtualSize(0);
2510 entries = t->GetEntries();
2513 while ((e = dset->
Next()) != 0) {
2524 t->SetMaxEntryLoop(entries);
2553 Error(
"FindUniqueSlaves",
"first object in fActiveSlaves not a TSlave: embarrasing!");
2588 if (!dirname)
return;
2596 const char *ent = 0;
2598 fn.
Form(
"%s/%s", dirname, ent);
2632 Info(
"PollForNewWorkers",
"max reached: %d workers started",
fNWorkers);
2646 Int_t nWrksDone = 0, nWrksTot = -1;
2653 for (; ord < nWrksTot; ord++) {
2656 fullord =
Form(
"0.%d", ord);
2666 Info(
"PollForNewWorkers",
"additional worker '%s' started", fullord.
Data());
2669 NotifyStartUp(
"Opening connections to workers", ++nWrksDone, nWrksTot);
2683 while (started.
GetSize() > 0 && nSelects < nWrksTot) {
2690 if (xs == (
TSocket *) -1)
continue;
2697 if (s->
Recv(msg) < 0) {
2698 Warning(
"PollForNewWorkers",
"problems receiving message from accepted socket!");
2714 gROOT->GetListOfSockets()->Remove(s);
2732 if (addedWorkers) addedWorkers->
Add(wrk);
2734 NotifyStartUp(
"Setting up added worker servers", ++nWrksDone, nWrksTot);
2741 Warning(
"PollForNewWorkers",
"received empty message from accepted socket!");
2769 Info(
"PollForNewWorkers",
"Will send the PROCESS message to selected workers");
2777 TIter naw(addedWorkers);
2778 while ((wrk = (
TSlave *)naw())) {
2782 delete addedWorkers;
Int_t SetProofServEnv(const char *ord)
Create environment files for worker 'ord'.
const char * GetName() const
Returns name of object.
void SetQueryRunning(TProofQueryResult *pq)
Set query in running state.
Int_t GetNumberOfUniqueSlaves() const
Return number of unique slaves, i.e.
void IncrementDrawQueries()
virtual const char * BaseName(const char *pathname)
Base name of a file name. Base name of /user/root is root.
virtual Int_t GetDrawArgs(const char *var, const char *sel, Option_t *opt, TString &selector, TString &objname)=0
virtual const char * GetTitle() const
Returns title of object.
virtual Bool_t AccessPathName(const char *path, EAccessMode mode=kFileExists)
Returns FALSE if one can access a file using the specified access mode.
Int_t VerifyDataSet(const char *uri, const char *=0)
Verify if all files in the specified dataset are available.
This class starts a PROOF session on the local machine: no daemons, client and master merged...
Ssiz_t Last(char c) const
Find last occurrence of a character c.
Bool_t RequestStagingDataSet(const char *dataset)
Allows users to request staging of a particular dataset.
Long64_t GetNFiles() const
virtual TList * GetInputList() const =0
virtual TString SplitAclicMode(const char *filename, TString &mode, TString &args, TString &io) const
This method splits a filename of the form: ~~~ {.cpp} [path/]macro.C[+|++[k|f|g|O|c|s|d|v|-]][(args)]...
TQueryResultManager * fQMgr
virtual int GetPid()
Get process id.
virtual void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
void ActivateAsyncInput()
Activate the a-sync input handler.
void AskParallel()
Ask for the number of parallel slaves.
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void ShowDataSetCache(const char *dataset=0)
Display the content of the dataset cache, if any (matching 'dataset', if defined).
TMonitor * fAllUniqueMonitor
virtual Int_t ClearCache(const char *uri)
Clear cached information matching uri.
virtual void AddInput(TObject *inp)=0
TFileCollection * GetDataSet(const char *uri, const char *=0)
Get a list of TFileInfo objects describing the files of the specified dataset.
const char * GetDataDir() const
TSocket * GetSocket() const
virtual TDSetElement * Next(Long64_t totalEntries=-1)
Returns next TDSetElement.
void PrepareInputDataFile(TString &dataFile)
Prepare the file with the input data objects to be sent to the master; the objects are taken from the de...
Int_t Load(const char *macro, Bool_t notOnClient=kFALSE, Bool_t uniqueOnly=kTRUE, TList *wrks=0)
Copy the specified macro in the cache directory.
void SetPerfTree(const char *pf="perftree.root", Bool_t withWrks=kFALSE)
Enable/Disable saving of the performance tree.
virtual const char * WorkingDirectory()
Return working directory.
static TMD5 * FileChecksum(const char *file)
Returns checksum of specified file.
Bool_t ExistsDataSet(const char *uri)
Returns kTRUE if 'dataset' described by 'uri' exists, kFALSE otherwise.
Collectable string class.
virtual TVirtualProofPlayer * MakePlayer(const char *player=0, TSocket *s=0)
Construct a TProofPlayer object.
virtual ~TProofLite()
Destructor.
virtual const char * GetBuildArch() const
Return the build architecture.
Bool_t RegisterDataSet(const char *dsName, TFileCollection *ds, const char *opt="")
Register the 'dataSet' on the cluster under the current user, group and the given 'dataSetName'...
virtual Bool_t IsValid() const
TFileCollection * GetStagingStatusDataSet(const char *dataset)
Obtains a TFileCollection showing the staging status of the specified dataset.
TString & ReplaceAll(const TString &s1, const TString &s2)
void SetupWorkersEnv(TList *wrks, Bool_t increasingpool=kFALSE)
Set up packages, loaded macros, include and lib paths ...
void SendInputDataFile()
Make sure that the input data objects are available to the workers in a dedicated file in the cache; ...
int GetPathInfo(const char *path, Long_t *id, Long_t *size, Long_t *flags, Long_t *modtime)
Get info about a file: id, size, flags, modification time.
virtual Long64_t DrawSelect(TDSet *set, const char *varexp, const char *selection, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)=0
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
This class implements a data set to be used for PROOF processing.
virtual void SetName(const char *name)
Change (i.e.
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
void SetParameter(const char *par, const char *value)
Set input list parameter.
virtual Bool_t RemoveDataSet(const char *uri)
Removes the indicated dataset.
The PROOF manager interacts with the PROOF server coordinator to create or destroy a PROOF session...
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
static const TList * GetEnvVars()
Get environment variables.
Int_t LockSession(const char *sessiontag, TProofLockPath **lck)
Try locking query area of session tagged sessiontag.
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
virtual EExitStatus GetExitStatus() const =0
virtual int MakeDirectory(const char *name)
Make a directory.
virtual void AddSignalHandler(TSignalHandler *sh)
Add a signal handler to list of system signal handlers.
virtual const char * HomeDirectory(const char *userName=0)
Return the user's home directory.
virtual TFileCollection * GetDataSet(const char *uri, const char *server=0)
Utility function used in various methods for user dataset upload.
Int_t PollForNewWorkers()
Simulate dynamic addition, for test purposes.
virtual TObject * Get(const char *namecycle)
Return pointer to object identified by namecycle.
void SetSocket(TSocket *s)
TProofLockPath * fCacheLock
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Int_t LoadPlugin()
Load the plugin library for this handler.
virtual void SetCurrentQuery(TQueryResult *q)=0
virtual const char * TempDirectory() const
Return a user configured or systemwide directory to create temporary files in.
The TEnv class reads config files, by default named .rootrc.
void ClearDataSetCache(const char *dataset=0)
Clear the content of the dataset cache, if any (matching 'dataset', if defined).
Int_t SetDataSetTreeName(const char *dataset, const char *treename)
Set/Change the name of the default tree.
virtual const char * DirName(const char *pathname)
Return the directory name in pathname.
void NotifyStartUp(const char *action, Int_t done, Int_t tot)
Notify setting-up operation message.
TQueryResult * CloneInfo()
Return an instance of TQueryResult containing only the local info fields, i.e.
virtual Bool_t JoinProcess(TList *workers)=0
R__EXTERN TVirtualMutex * gROOTMutex
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Int_t Broadcast(const TMessage &mess, TList *slaves)
Broadcast a message to all slaves in the specified list.
Int_t GetNumberOfBadSlaves() const
Return number of bad slaves.
virtual void RemoveAll()
Remove all sockets from the monitor.
virtual void ShowDataSets(const char *uri="*", const char *opt="")
Prints formatted information about the dataset 'uri'.
const char *const kPROOF_PackageLockFile
This class represents a RFC 3986 compatible URI.
Int_t DrawQueries() const
void Print(Option_t *option="") const
Print status of PROOF-Lite cluster.
R__EXTERN TApplication * gApplication
virtual void DeActivateAll()
De-activate all activated sockets.
void ShowData()
List contents of the data directory in the sandbox.
void ShowDataDir(const char *dirname)
List contents of the data directory 'dirname'.
Long_t ExecPlugin(int nargs, const T &...params)
void ScanPreviousQueries(const char *dir)
Scan the queries directory for the results of previous queries.
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
static void ResolveKeywords(TString &fname, const char *path=0)
Replace <ord>, <user>, <u>, <group>, <stag>, <qnum>, <file>, <rver> and <build> placeholders in fname...
Long64_t GetBytesRead() const
TString & Insert(Ssiz_t pos, const char *s)
Int_t SendCurrentState(ESlaves list=kActive)
Transfer the current state of the master to the active slave servers.
TSignalHandler * GetSignalHandler() const
TVirtualProofPlayer * fPlayer
Bool_t R_ISREG(Int_t mode)
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
void ResolveKeywords(TString &s, const char *ord, const char *logfile)
Resolve some keywords in 's' <logfilewrk>, <user>, <rootsys>, <cpupin>
TString & Replace(Ssiz_t pos, Ssiz_t n, const char *s)
static const char * GetMacroPath()
Get macro search path. Static utility function.
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
const char * GetObjName() const
virtual Long64_t Process(TDSet *set, const char *selector, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)=0
Int_t SavePerfTree(const char *pf=0, const char *qref=0)
Save performance information from TPerfStats to file 'pf'.
TDataSetManagerFile * fDataSetStgRepo
const char * Data() const
TSignalHandler * fIntHandler
Manages an element of a TDSet.
static struct mg_connection * fc(struct mg_context *ctx)
Int_t Update(Long64_t avgsize=-1)
Update accumulated information about the elements of the collection (e.g.
virtual const char * GetDirEntry(void *dirp)
Get a directory entry. Returns 0 if no more entries.
TPluginHandler * fProgressDialog
virtual TList * GetOutputList() const =0
virtual int Unlink(const char *name)
Unlink, i.e. remove, a file.
Int_t CreateSymLinks(TList *files, TList *wrks=0)
Create in each worker sandbox symlinks to the files in the list. Used to make the cache information av...
Double_t dot(const TVector2 &v1, const TVector2 &v2)
Bool_t fSendGroupView
list returned by kPROOF_GETSLAVEINFO
void Stop()
Stop the stopwatch.
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString...
void UpdateDialog()
Final update of the progress dialog.
TList * fEnabledPackagesOnClient
THashList * fGlobalPackageDirList
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
TDataSetManager * fDataSetManager
TSlave * CreateSlave(const char *url, const char *ord, Int_t perf, const char *image, const char *workdir)
Create a new TSlave of type TSlave::kSlave.
This code implements the MD5 message-digest algorithm.
The TNamed class is the base class for all named ROOT classes.
Long64_t Process(TDSet *dset, const char *sel, Option_t *o="", Long64_t nent=-1, Long64_t fst=0)
Process a data set (TDSet) using the specified selector (.C) file.
EQueryMode GetQueryMode(Option_t *mode=0) const
Find out the query mode based on the current setting and 'mode'.
const char *const kPROOF_QueryDir
UChar_t mod R__LOCKGUARD2(gSrvAuthenticateMutex)
Int_t Init(const char *masterurl, const char *conffile, const char *confdir, Int_t loglevel, const char *alias=0)
Start the PROOF environment.
virtual Int_t RegisterDataSet(const char *uri, TFileCollection *dataSet, const char *opt)
Register a dataset, performing quota checks if needed.
void Init(TClassEdit::TInterpreterLookupHelper *helper)
Bool_t CancelStagingDataSet(const char *dataset)
Cancels a dataset staging request.
virtual const char * Getenv(const char *env)
Get environment variable.
TProofLockPath * fPackageLock
Int_t Collect(const TSlave *sl, Long_t timeout=-1, Int_t endtype=-1, Bool_t deactonfail=kFALSE)
Collect responses from slave sl.
TList * GetListOfElements() const
static Int_t RegisterDataSets(TList *in, TList *out, TDataSetManager *dsm, TString &e)
Register TFileCollections in 'out' as datasets according to the rules in 'in'.
Int_t ApplyMaxQueries(Int_t mxq)
Scan the queries directory and remove the oldest ones (and relative dirs, if empty) in such a way onl...
A sorted doubly linked list.
std::vector< std::vector< double > > Data
virtual UserGroup_t * GetUserInfo(Int_t uid)
Returns all user info in the UserGroup_t structure.
const char * GetUser() const
Int_t Atoi() const
Return integer value of string.
TQueryResult * GetQueryResult(const char *ref=0)
Return pointer to the full TQueryResult instance owned by the player and referenced by 'ref'...
Bool_t IsParallel() const
virtual void SetOutputList(TList *out, Bool_t adopt=kTRUE)
Set / change the output list.
virtual TSocket * Accept(UChar_t Opt=0)
Accept a connection on a server socket.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
const char * GetWorkDir() const
const Bool_t kSortDescending
friend class TProofInputHandler
A container class for query results.
Long64_t GetEntries() const
TList * GetListOfSlaveInfos()
Returns list of TSlaveInfo's. In case of error return 0.
Int_t fDynamicStartupNMax
static TList * fgProofEnvList
TSocket * Select()
Return pointer to socket for which an event is waiting.
virtual void SetProcessInfo(Long64_t ent, Float_t cpu=0., Long64_t siz=-1, Float_t inittime=0., Float_t proctime=0.)
Set processing info.
void Emit(const char *signal)
Activate signal without args.
void SetTermTime(Float_t termtime)
TObject * GetEntryList() const
TMonitor * fUniqueMonitor
const char *const kPROOF_ConfFile
Bool_t RemoveDataSet(const char *group, const char *user, const char *dsName)
Removes the indicated dataset.
Int_t RemoveDataSet(const char *uri, const char *=0)
Remove the specified dataset from the PROOF cluster.
virtual Int_t ReadFile(const char *fname, EEnvLevel level)
Read and parse the resource file for a certain level.
const char *const kPROOF_QueryLockFile
Int_t GetNumberOfActiveSlaves() const
Return number of active slaves, i.e.
virtual void AddQueryResult(TQueryResult *q)=0
void ShowDataSets(const char *uri="", const char *=0)
Shows datasets in locations that match the uri. By default shows the user's datasets and global ones...
virtual TMap * GetSubDataSets(const char *uri, const char *excludeservers)
Partition dataset 'ds' according to the servers.
Long64_t DrawSelect(TDSet *dset, const char *varexp, const char *selection="", Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Execute the specified drawing action on a data set (TDSet).
void QueryResultReady(const char *ref)
Notify availability of a query result.
const char * GetFileName() const
virtual void Setenv(const char *name, const char *value)
Set environment variable.
Bool_t EndsWith(const char *pat, ECaseCompare cmp=kExact) const
Return true if string ends with the specified string.
Class managing the query-result area.
R__EXTERN TSystem * gSystem
const TString GetUri() const
Returns the whole URI - an implementation of chapter 5.3 component recomposition. ...
const char *const kPROOF_DataSetDir
TMonitor * fCurrentMonitor
Int_t SetupWorkers(Int_t opt=0, TList *wrks=0)
Start up PROOF workers.
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Int_t GetLogLevel() const
virtual TObject * Remove(TObject *obj)
Remove object from the list.
const char *const kPROOF_ConfDir
Bool_t Gets(FILE *fp, Bool_t chop=kTRUE)
Read one line from the stream, including the \n, or until EOF.
virtual Bool_t ExistsDataSet(const char *uri)
Checks if the indicated dataset exists.
Bool_t ParseUri(const char *uri, TString *dsGroup=0, TString *dsUser=0, TString *dsName=0, TString *dsTree=0, Bool_t onlyCurrent=kFALSE, Bool_t wildcards=kFALSE)
Parses a (relative) URI that describes a DataSet on the cluster.
void ClearCache(const char *file=0)
Remove files from all file caches.
Int_t fDynamicStartupStep
virtual Int_t ShowCache(const char *uri)
Show cached information matching uri.
void SetActive(Bool_t=kTRUE)
Int_t WriteDataSet(const char *group, const char *user, const char *dsName, TFileCollection *dataset, UInt_t option=0, TMD5 *checksum=0)
Writes indicated dataset.
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Bool_t TestBit(UInt_t f) const
const char *const kPROOF_CacheLockFile
Int_t ScanDataSet(const char *uri, const char *opt)
Scans the dataset indicated by 'uri' following the 'opts' directives.
char * Form(const char *fmt,...)
void SetRunning(Int_t startlog, const char *par, Int_t nwrks)
Call when running starts.
TServerSocket * fServSock
virtual const char * GetName() const
Returns name of object.
Bool_t SetFragment(const TString &fragment)
Set fragment component of URI: fragment = *( pchar / "/" / "?" ).
TList * GetListOfQueries(Option_t *opt="")
Get the list of queries.
Bool_t fProgressDialogStarted
virtual Int_t Exec(const char *shellcmd)
Execute a command.
const Int_t kPROOF_Protocol
Int_t SendGroupView()
Send to all active slave servers the current slave group size and their unique id.
Bool_t FinalizeQuery(TProofQueryResult *pq, TProof *proof, TVirtualProofPlayer *player)
Final steps after Process() to complete the TQueryResult instance.
Int_t CountChar(Int_t c) const
Return number of times character c occurs in the string.
Int_t RemoveWorkers(TList *wrks)
Used for shutting down the workers after a query is finished.
virtual const char * GetBuildCompilerVersion() const
Return the build compiler version.
void SetName(const char *name)
static Int_t GetNumberOfWorkers(const char *url=0)
Static method to determine the number of workers giving priority to users request.
virtual void FreeDirectory(void *dirp)
Free a directory.
void AddInput(TObject *obj)
Add objects that might be needed during the processing of the selector (see Process()).
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
TMap * GetDataSets(const char *uri="", const char *=0)
Lists all datasets that match the given uri.
Int_t VerifyDataSetParallel(const char *uri, const char *optStr)
Internal function for parallel dataset verification used by TProof::VerifyDataSet and TProofLite::Verify...
virtual void RemoveQueryResult(const char *ref)=0
static TSelector * GetSelector(const char *filename)
The code in filename is loaded (interpreted or compiled, see below), filename must contain a valid cl...
Int_t AssertPath(const char *path, Bool_t writable)
Make sure that 'path' exists; if 'writable' is kTRUE, make also sure that the path is writable...
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
Int_t CreateSandbox()
Create the sandbox for this session.
TString & Remove(Ssiz_t pos)
Float_t GetRealTime() const
Int_t HandleOutputOptions(TString &opt, TString &target, Int_t action)
Extract from opt information about output handling settings.
R__EXTERN TProof * gProof
virtual TSignalHandler * RemoveSignalHandler(TSignalHandler *sh)
Remove a signal handler from list of signal handlers.
virtual Int_t GetSize() const
void SetFeedback(TString &opt, TString &optfb, Int_t action)
Extract from opt in optfb information about wanted feedback settings.
Int_t GoParallel(Int_t nodes, Bool_t accept=kFALSE, Bool_t random=kFALSE)
Go in parallel mode with at most "nodes" slaves.
virtual const char * HostName()
Return the system's host name.
virtual int Symlink(const char *from, const char *to)
Create a symbolic link from file1 to file2.
Int_t Lock()
Locks the directory.
TList * fNonUniqueMasters
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Int_t CleanupSandbox()
Remove old session dirs, keeping at most 'Proof.MaxOldSessions' (default 10).
const char * GetOrdinal() const
TList * fEnabledPackagesOnCluster
virtual void Reset()
Reset or initialize access to the elements.
void SetRunStatus(ERunStatus rst)
void RemoveQuery(TQueryResult *qr, Bool_t soft=kFALSE)
Remove everything about query qr.
Int_t Unlock()
Unlock the directory.
virtual void StopProcess(Bool_t abort, Int_t timeout=-1)=0
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Int_t GetSandbox(TString &sb, Bool_t assert=kFALSE, const char *rc=0)
Set the sandbox path from 'Proof.Sandbox' or the alternative var 'rc'.
TQueryResult version adapted to PROOF needs.
static TMap * GetDataSetNodeMap(TFileCollection *fc, TString &emsg)
Get a map {server-name, list-of-files} for collection 'fc' to be used in TPacketizerFile.
void SaveQuery(TProofQueryResult *qr, const char *fout=0)
Save current status of query 'qr' to file name fout.
Int_t Match(const TString &s, UInt_t start=0)
Runs a match on s against the regex 'this' was created with.
Mother of all ROOT objects.
Float_t GetCpuTime() const
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Bool_t IsDigit() const
Returns true if all characters in string are digits (0-9) or white spaces, i.e.
Int_t GetNumberOfSlaves() const
Return number of slaves as described in the config file.
Bool_t ExistsDataSet(const char *group, const char *user, const char *dsName)
Checks if the indicated dataset exists.
Int_t InitDataSetManager()
Initialize the dataset manager from directives or from defaults. Return 0 on success, -1 on failure.
Bool_t R_ISDIR(Int_t mode)
static void SetMacroPath(const char *newpath)
Set or extend the macro search path.
R__EXTERN TProofServ * gProofServ
virtual void Add(TObject *obj)
Wrapper for PCRE library (Perl Compatible Regular Expressions).
Class that contains a list of TFileInfo's and accumulated meta data information about its entries...
TProofOutputList fOutputList
R__EXTERN const char * gRootDir
TProofLockPath * fQueryLock
TFileCollection * GetDataSet(const char *uri, const char *srv=0)
Utility function used in various methods for user dataset upload.
Int_t CleanupQueriesDir()
Remove all queries results referring to previous sessions.
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
void AskStatistics()
Ask for the statistics of the slaves.
Int_t GetNumberOfInactiveSlaves() const
Return number of inactive slaves, i.e.
TTree * GetTreeHeader(TDSet *tdset)
Creates a tree header (a tree with nonexisting files) object for the DataSet.
Int_t SetParallel(Int_t nodes=-1, Bool_t random=kFALSE)
Tell PROOF how many slaves to use in parallel.
virtual int CopyFile(const char *from, const char *to, Bool_t overwrite=kFALSE)
Copy a file.
virtual void * OpenDirectory(const char *name)
Open a directory. Returns 0 if directory does not exist.
ClassImp(TSlaveInfo) Int_t TSlaveInfo const TSlaveInfo * si
Used to sort slaveinfos by ordinal.
Int_t GetClientProtocol() const
virtual Long64_t GetEntries() const
TList * fAvailablePackages
A TTree object has a header with a name and a title.
Class describing a generic file including meta information.
Int_t SendInitialState()
Transfer the initial (i.e.
const AParamType & GetVal() const
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
TList * fTerminatedSlaveInfos
virtual Bool_t IsValid() const
TProofQueryResult * MakeQueryResult(Long64_t nent, const char *opt, Long64_t fst, TDSet *dset, const char *selec)
Create a TProofQueryResult instance for this query.
Int_t Remove(const char *ref, Bool_t all)
Handle remove request.
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
virtual Int_t Load(const char *macro, Bool_t notOnClient=kFALSE, Bool_t uniqueOnly=kTRUE, TList *wrks=0)
Load the specified macro on master, workers and, if notOnClient is kFALSE, on the client...
Int_t Substitute(TString &s, const TString &r, Bool_t doDollarSubst=kTRUE)
Substitute matching part of s with r, dollar back-ref substitution is performed if doDollarSubst is t...
Class describing a PROOF worker server.
void FindUniqueSlaves()
Add to the fUniqueSlave list the active slaves that have a unique (user) file system image...
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
void ResetProgressDialog(const char *sel, Int_t sz, Long64_t fst, Long64_t ent)
Reset progress dialog.
virtual void SetTitle(const char *title="")
Change (i.e. set) the title of the TNamed.
TList * PreviousQueries() const
virtual TMap * GetDataSets(const char *uri, UInt_t=TDataSetManager::kExport)
Returns all datasets for the <group> and <user> specified by <uri>.
void ShowCache(Bool_t all=kFALSE)
List contents of file cache.
virtual char * ConcatFileName(const char *dir, const char *name)
Concatenate a directory and a file name. User must delete returned string.
Int_t GetParallel() const
Returns number of slaves active in parallel mode.
virtual Int_t SetupServ(Int_t stype, const char *conffile)
Init a PROOF slave object.
TMonitor * fActiveMonitor
virtual TList * GetListOfResults() const =0
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs, RAM size, etc. into the SysInfo_t s...
Long64_t fLastPollWorkers_s
Int_t CopyMacroToCache(const char *macro, Int_t headerRequired=0, TSelector **selector=0, Int_t opt=0, TList *wrks=0)
Copy a macro, and its possible associated .h[h] file, to the cache directory, from where the workers ...
const char *const kPROOF_CacheDir
const char *const kPROOF_PackDir
void SetInputHandler(TFileHandler *ih)
Adopt and register input handler for this slave.
static Int_t AssertDataSet(TDSet *dset, TList *input, TDataSetManager *mgr, TString &emsg)
Make sure that dataset is in the form to be processed.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.