Attaches to a PROOF session, possibly at the indicated URL.
If no existing PROOF session is found and no URL is given, try to start a local PROOF session.
Arguments: 'url' URL of the master where to start/attach the PROOF session; this is also the place where to force creation of a new session, if needed (use option 'N', e.g. "proof://mymaster:myport/?N")
The following arguments apply to xrootd responding at 'refloc' only: 'nwrks' Number of workers to be started. [] 'dir' Directory to be used for the files and working areas []. When starting a new instance of the daemon this directory is cleaned with 'rm -fr'. If 'dir' is null, the default is used: '/tmp/<user>/.getproof' 'opt' Defines what to do if an existing xrootd uses the same ports; possible options are: "ask", ask the user; "force", kill the xrootd and start a new one; if any other string is specified the existing xrootd will be used ["ask"]. NB: for a change in 'nwrks' to be effective you need to specify opt = "force" 'dyn' This flag can be used to switch on dynamic, per-job worker setup scheduling [kFALSE]. 'tutords' This flag can be used to force a dataset dir under the tutorial dir [kFALSE]
It is possible to trigger the automatic valgrind setup by defining the env GETPROOF_VALGRIND. E.g. to run the master in valgrind do
(or $ export GETPROOF_VALGRIND="valgrind=master valgrind_opts:--leak-check=full"
to set some options) before running getProof. Note that 'getProof' is also called by 'stressProof', so this holds for 'stressProof' runs too.
#include "Getline.h"
int getDebugEnum(const char *what);
Int_t getXrootdPid(
Int_t port,
const char *subdir =
"xpdtut");
Int_t checkXrootdAt(
Int_t port,
const char *host =
"localhost");
Int_t checkXproofdAt(
Int_t port,
const char *host =
"localhost");
Int_t killXrootdAt(
Int_t port,
const char *
id = 0);
typedef struct {
int third;
int fourth;
int fifth;
} clnt_HS_t;
typedef struct {
int msglen;
int protover;
int msgval;
} srv_HS_t;
const char *refloc = "proof://localhost:40000";
TProof *getProof(
const char *url =
"proof://localhost:40000",
Int_t nwrks = -1,
const char *dir = 0,
{
#ifndef WIN32
while (
s.Tokenize(t, from ,
" ")) {
vopts = t;
else
vopt = t;
}
if (vopts.
IsNull()) vopts =
"valgrind_opts:--leak-check=full --track-origins=yes";
Printf(
"getProof: valgrind run: '%s' (opts: '%s')", vopt.
Data(), vopts.
Data());
}
#endif
TUrl uu(url), uref(refloc);
Bool_t ext = (strcmp(uu.GetHost(), uref.GetHost()) ||
if (ext && url) {
if (!strcmp(url, "lite://") || !url[0]) {
if (!url[0]) uu.SetUrl("lite://");
if (dir && strlen(dir) > 0)
gEnv->
SetValue(
"Proof.Sandbox", dir);
if (nwrks > 0) {
uu.SetOptions(
Form(
"workers=%d", nwrks));
swrk.Form("%d workers", nwrks);
}
Printf(
"getProof: trying to open a PROOF-Lite session with %s", swrk.Data());
} else {
Printf(
"getProof: trying to open a session on the external cluster at '%s'", url);
}
if (ext && !lite && nwrks > 0) {
Printf(
"getProof: WARNING: started/attached a session on external cluster (%s):"
" 'nwrks=%d' ignored", url, nwrks);
}
if (ext && !lite && dir && strlen(dir) > 0) {
Printf(
"getProof: WARNING: started/attached a session on external cluster (%s):"
" 'dir=\"%s\"' ignored", url, dir);
}
if (ext && !strcmp(opt,"force")) {
Printf(
"getProof: WARNING: started/attached a session on external cluster (%s):"
" 'opt=\"force\"' ignored", url);
}
if (ext && dyn) {
Printf(
"getProof: WARNING: started/attached a session on external cluster (%s):"
" 'dyn=kTRUE' ignored", url);
}
} else {
Printf(
"getProof: could not get/start a valid session at %s", url);
if (p) delete p;
p = 0;
}
return p;
}
#ifdef WIN32
Printf(
"getProof: local PROOF not yet supported on Windows, sorry!");
return p;
#else
Printf(
"getProof: unable to create the working area at the requested path: '%s'"
" - cannot continue", tutdir.
Data());
} else {
Printf(
"getProof: working area at the requested path '%s'"
" created but it is not writable - cannot continue", tutdir.
Data());
}
return p;
}
} else {
Printf(
"getProof: working area at the requested path '%s'"
" exists but is not writable - cannot continue", tutdir.
Data());
return p;
}
}
} else {
Printf(
"getProof: working area not specified temp ");
tutdir="/tmp";
if (!ug) {
Printf(
"getProof: could not get user info");
return p;
}
tutdir += "/.getproof";
Printf(
"getProof: unable to get a writable working area (tried: %s)"
" - cannot continue", tutdir.
Data());
return p;
}
}
}
Printf(
"getProof: working area (tutorial dir): %s", tutdir.
Data());
if (tutords) {
datasetdir =
Form(
"%s/dataset", tutdir.
Data());
Printf(
"getProof: unable to get a writable dataset directory (tried: %s)"
" - cannot continue", datasetdir.
Data());
return p;
}
Printf(
"getProof: dataset dir: %s", datasetdir.
Data());
}
}
u.SetProtocol("proof");
if (!strcmp(uu.GetHost(), uref.GetHost()) && (uu.GetPort() != uref.GetPort()))
u.SetPort(uu.GetPort());
Int_t lportp = u.GetPort();
Int_t lportx = lportp + 1;
if ((rc = checkXproofdAt(lportp)) == 1) {
Printf(
"getProof: something else the a XProofd service is running on"
" port %d - cannot continue", lportp);
return p;
} else if (rc == 0) {
pid = getXrootdPid(lportx);
Printf(
"getProof: daemon found listening on dedicated ports {%d,%d} (pid: %d)",
lportx, lportp, pid);
if (isatty(0) == 0 || isatty(1) == 0) {
} else {
if (!strcmp(opt,"ask")) {
char *answer = (char *) Getline("getProof: would you like to restart it (N,Y)? [N] ");
if (answer && (answer[0] == 'Y' || answer[0] == 'y'))
}
}
if (!strcmp(opt,"force"))
if (restart) {
Printf(
"getProof: cleaning existing instance ...");
cmd =
Form(
"kill -9 %d", pid);
Printf(
"getProof: problems stopping xrootd process %d (%d)", pid, rc);
Printf(
"getProof: wait 5 secs so that previous connections are cleaned ...");
}
}
if (restart) {
if (!xpd) {
Printf(
"getProof: xproofd not found: please check the environment!");
return p;
}
cmd =
Form(
"rm -fr %s/xpdtut %s %s %s %s", tutdir.
Data(), workarea.
Data(),
xpdcf.Data(), xpdpid.Data(), proofsessions.Data());
FILE *fcf = fopen(xpdcf.Data(), "w");
if (!fcf) {
Printf(
"getProof: could not create config file for XPD (%s)", xpdcf.Data());
return p;
}
fprintf(fcf,
"### Use admin path at %s/admin to avoid interferences with other users\n", tutdir.
Data());
fprintf(fcf,
"xrd.adminpath %s/admin\n", tutdir.
Data());
#if defined(R__MACOSX)
fprintf(fcf,"### Use dedicated socket path under /tmp to avoid length problems\n");
fprintf(fcf,"xpd.sockpathdir /tmp/xpd-sock\n");
#endif
fprintf(fcf,"### Load the XrdProofd protocol on port %d\n", lportp);
fprintf(fcf,"xrd.protocol xproofd libXrdProofd.so\n");
fprintf(fcf,"xpd.port %d\n", lportp);
if (nwrks > 0) {
fprintf(fcf,"### Force number of local workers\n");
fprintf(fcf,"xpd.localwrks %d\n", nwrks);
}
fprintf(fcf,"### Root path for working dir\n");
fprintf(fcf,
"xpd.workdir %s\n", workarea.
Data());
fprintf(fcf,"### Allow different users to connect\n");
fprintf(fcf,"xpd.multiuser 1\n");
fprintf(fcf,"### Limit the number of query results kept in the master sandbox\n");
fprintf(fcf,"xpd.putrc ProofServ.UserQuotas: maxquerykept=2\n");
fprintf(fcf,"### Limit the number of sessions kept in the sandbox\n");
fprintf(fcf,"xpd.putrc Proof.MaxOldSessions: 1\n");
if (tutords) {
fprintf(fcf,"### Use dataset directory under the tutorial dir\n");
fprintf(fcf,
"xpd.datasetsrc file url:%s opt:-Cq:Av:As:\n", datasetdir.
Data());
}
if (dyn) {
fprintf(fcf,"### Use dynamic, per-job scheduling\n");
fprintf(fcf,"xpd.putrc Proof.DynamicStartup 1\n");
}
fprintf(fcf,"### For internal file serving use the xrootd protocol on the same port\n");
fprintf(fcf,"xpd.xrootd libXrdXrootd-4.so\n");
fprintf(fcf,"### Set the local data server for the temporary output files accordingly\n");
fprintf(fcf,
"xpd.putenv LOCALDATASERVER=root://%s:%d\n",
gSystem->
HostName(), lportp);
fclose(fcf);
Printf(
"getProof: xproofd config file at %s", xpdcf.Data());
Printf(
"getProof: xproofd log file at %s", xpdlogprt.Data());
cmd =
Form(
"%s -c %s -b -l %s -n xpdtut -p %d",
xpd, xpdcf.Data(), xpdlog.Data(), lportp);
Printf(
"(NB: any error line from XrdClientSock::RecvRaw and XrdClientMessage::ReadRaw should be ignored)");
Printf(
"getProof: problems starting xproofd (%d)", rc);
return p;
}
delete[] xpd;
Printf(
"getProof: waiting for xproofd to start ...");
pid = getXrootdPid(lportp);
Printf(
"getProof: xproofd pid: %d", pid);
FILE *fpid = fopen(xpdpid.Data(), "w");
if (!fpid) {
Printf(
"getProof: could not create pid file for XPD");
} else {
fprintf(fpid,"%d\n", pid);
fclose(fpid);
}
}
Printf(
"getProof: start / attach the PROOF session ...");
Printf(
"getProof: starting local session failed");
if (p) delete p;
p = 0;
return p;
}
return p;
#endif
}
Int_t getXrootdPid(
Int_t port,
const char *subdir)
{
#ifdef WIN32
Printf(
"getXrootdPid: Xrootd/Proof not supported on Windows, sorry!");
return -1;
#else
#if defined(__sun)
const char *com = "-eo pid,comm";
#elif defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__APPLE__)
const char *com = "ax -w -w";
#else
const char *com = "-w -w -eo pid,command";
#endif
if (subdir && strlen(subdir) > 0) {
cmd.
Form(
"ps %s | grep xrootd | grep \"\\-p %d\" | grep %s", com, port, subdir);
} else {
cmd.
Form(
"ps %s | grep xrootd | grep \"\\-p %d\"", com, port);
}
if (fp) {
char line[2048], rest[2048];
sscanf(
line,
"%d %s", &pid, rest);
break;
}
}
return pid;
#endif
}
{
Printf(
"checkXrootdAt: could not open connection to %s:%d", host, port);
return -1;
}
clnt_HS_t initHS;
memset(&initHS, 0, sizeof(initHS));
int len = sizeof(initHS);
int readCount =
s.RecvRaw(&
type, len);
if (readCount != len) {
Printf(
"checkXrootdAt: 1st: wrong number of bytes read: %d (expected: %d)",
readCount, len);
return 1;
}
srv_HS_t xbody;
len = sizeof(xbody);
readCount =
s.RecvRaw(&xbody, len);
if (readCount != len) {
Printf(
"checkXrootdAt: 2nd: wrong number of bytes read: %d (expected: %d)",
readCount, len);
return 1;
}
Printf(
"checkXrootdAt: server is ROOTD");
return 1;
} else {
Printf(
"checkXrootdAt: unknown server type: %d",
type);
return 1;
}
return 0;
}
Int_t checkXproofdAt(
Int_t port,
const char *host)
{
Printf(
"checkXproofdAt: could not open connection to %s:%d", host, port);
return -1;
}
clnt_HS_t initHS;
memset(&initHS, 0, sizeof(initHS));
int len = sizeof(initHS);
int dum[2];
s.SendRaw(&dum[0],
sizeof(dum));
int readCount =
s.RecvRaw(&
type, len);
if (readCount != len) {
Printf(
"checkXproofdAt: 1st: wrong number of bytes read: %d (expected: %d)",
readCount, len);
return 1;
}
srv_HS_t xbody;
len = sizeof(xbody);
readCount =
s.RecvRaw(&xbody, len);
if (readCount != len) {
Printf(
"checkXproofdAt: 2nd: wrong number of bytes read: %d (expected: %d)",
readCount, len);
return 1;
}
xbody.protover =
net2host(xbody.protover);
Printf(
"checkXproofdAt: server is PROOFD");
return 1;
} else {
Printf(
"checkXproofdAt: unknown server type: %d",
type);
return 1;
}
return 0;
}
{
#ifdef WIN32
Printf(
"startXrootdAt: Xrootd not supported on Windows, sorry!");
return -1;
#else
if ((rc = checkXrootdAt(port)) == 1) {
Printf(
"startXrootdAt: some other service running on port %d - cannot proceed ", port);
return -1;
} else if (rc == 0) {
if (force) {
} else {
Printf(
"startXrootdAt: xrootd service already available on port %d: ", port);
char *answer = (char *) Getline("startXrootdAt: would you like to restart it (N,Y)? [N] ");
if (answer && (answer[0] == 'Y' || answer[0] == 'y')) {
}
}
if (restart) {
Printf(
"startXrootdAt: cleaning existing instance ...");
Int_t pid = getXrootdPid(port,
"xrd-basic");
Printf(
"startXrootdAt: problems stopping xrootd process %d (%d)", pid, rc);
}
}
if (restart) {
Printf(
"startXrootdAt: could not assert dir for log file");
return -1;
}
}
cmd.
Form(
"xrootd -d -p %d -b -l /tmp/xrd-basic/xrootd.log", port);
if (exportdirs && strlen(exportdirs) > 0) {
while (dirs.Tokenize(
d, from,
" ")) {
cmd += " ";
}
}
}
Printf(
"startXrootdAt: problems executing starting command (%d)", rc);
return -1;
}
Printf(
"startXrootdAt: waiting for xrootd to start ...");
if ((rc = checkXrootdAt(port)) != 0) {
Printf(
"startXrootdAt: xrootd service not available at %d (rc = %d) - startup failed",
port, rc);
return -1;
}
Printf(
"startXrootdAt: basic xrootd started!");
}
return 0;
#endif
}
{
#ifdef WIN32
Printf(
"killXrootdAt: Xrootd not supported on Windows, sorry!");
return -1;
#else
if ((pid = getXrootdPid(port, id)) > 0) {
Printf(
"killXrootdAt: problems stopping xrootd process %d (%d)", pid, rc);
}
return rc;
#endif
}
int getDebugEnum(const char *what)
{
int rcmask = 0;
int from = 0;
while (sws.Tokenize(sw, from , "|")) {
if (sw == "None") {
} else if (sw == "Packetizer") {
} else if (sw == "Loop") {
} else if (sw == "Selector") {
} else if (sw == "Output") {
} else if (sw == "Input") {
} else if (sw == "Global") {
} else if (sw == "Package") {
} else if (sw == "Feedback") {
} else if (sw == "Condor") {
} else if (sw == "Draw") {
} else if (sw == "Asyn") {
} else if (sw == "Cache") {
} else if (sw == "Collect") {
} else if (sw == "Dataset") {
} else if (sw == "Submerger") {
} else if (sw == "Monitoring") {
} else if (sw == "All") {
Printf(
"WARNING: requested debug enum name '%s' does not exist: assuming 'All'", sw.
Data());
}
}
return rcmask;
}
UShort_t host2net(UShort_t x)
UShort_t net2host(UShort_t x)
char * Form(const char *fmt,...)
void Printf(const char *fmt,...)
R__EXTERN TSystem * gSystem
virtual void SetValue(const char *name, const char *value, EEnvLevel level=kEnvChange, const char *type=0)
Set the value of a resource or create a new resource.
This class controls a Parallel ROOT Facility, PROOF, cluster.
static TProof * Open(const char *url=0, const char *conffile=0, const char *confdir=0, Int_t loglevel=0)
Start a PROOF session on a specific cluster.
static void AddEnvVar(const char *name, const char *value)
Add an variable to the list of environment variables passed to proofserv on the master and slaves.
Bool_t EndsWith(const char *pat, ECaseCompare cmp=kExact) const
Return true if string ends with the specified string.
const char * Data() const
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
TString & Remove(Ssiz_t pos)
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
virtual const char * Getenv(const char *env)
Get environment variable.
virtual int mkdir(const char *name, Bool_t recursive=kFALSE)
Make a file system directory.
virtual Int_t Exec(const char *shellcmd)
Execute a command.
virtual FILE * OpenPipe(const char *command, const char *mode)
Open a pipe.
virtual Bool_t AccessPathName(const char *path, EAccessMode mode=kFileExists)
Returns FALSE if one can access a file using the specified access mode.
virtual int ClosePipe(FILE *pipe)
Close the pipe.
virtual const char * HostName()
Return the system's host name.
virtual void Sleep(UInt_t milliSec)
Sleep milliSec milli seconds.
virtual char * Which(const char *search, const char *file, EAccessMode mode=kFileExists)
Find location of file in a search path.
virtual Int_t GetUid(const char *user=0)
Returns the user's id. If user = 0, returns current user's id.
virtual UserGroup_t * GetUserInfo(Int_t uid)
Returns all user info in the UserGroup_t structure.
virtual const char * TempDirectory() const
Return a user configured or systemwide directory to create temporary files in.
This class represents a WWW compatible URL.
static constexpr double us
static constexpr double s
static constexpr double second