22 #include "RConfigure.h" 35 #include <sys/types.h> 48 #if (defined(__FreeBSD__) && (__FreeBSD__ < 4)) || \ 49 (defined(__APPLE__) && (!defined(MAC_OS_X_VERSION_10_3) || \ 50 (MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_3))) 52 #define lockf(fd, op, sz) flock((fd), (op)) 54 #define F_LOCK (LOCK_EX | LOCK_NB) 57 #define F_ULOCK LOCK_UN 88 #include "compiledata.h" 131 static const char *__crashreporter_info__ = 0;
132 asm(
".desc ___crashreporter_info__, 0x10");
168 Bool_t TProofServTerminationHandler::Notify()
170 Printf(
"Received SIGTERM: terminating");
171 fServ->HandleTermination();
189 Bool_t TProofServInterruptHandler::Notify()
191 fServ->HandleUrgentData();
212 Bool_t TProofServSigPipeHandler::Notify()
214 fServ->HandleSigPipe();
227 Bool_t ReadNotify() {
return Notify(); }
233 Bool_t TProofServInputHandler::Notify()
235 fServ->HandleSocketInput();
262 Error(
"TProofServLogHandler",
"executing command in pipe");
266 Error(
"TProofServLogHandler",
267 "undefined command (%p) or socket (%p)", (
int *)cmd, s);
285 Error(
"TProofServLogHandler",
"undefined file (%p) or socket (%p)", f, s);
298 fgCmdRtn = WIFEXITED(rc) ? WEXITSTATUS(rc) : -1;
315 while (fgets(line,
sizeof(line),
fFile)) {
316 if ((plf = strchr(line,
'\n')))
320 if (
fPfx.Length() > 0) {
322 log.Form(
"%s: %s",
fPfx.Data(),
line);
323 }
else if (
fgPfx.Length() > 0) {
358 const char *pfx,
Bool_t on)
363 if (fExecHandler->IsValid()) {
366 Error(
"TProofServLogHandlerGuard",
"invalid handler");
370 Error(
"TProofServLogHandlerGuard",
"undefined command");
378 const char *pfx,
Bool_t on)
383 if (fExecHandler->IsValid()) {
386 Error(
"TProofServLogHandlerGuard",
"invalid handler");
390 Error(
"TProofServLogHandlerGuard",
"undefined file");
399 if (fExecHandler && fExecHandler->IsValid()) {
424 printf(
"TShutdownTimer::Notify: checking activity on the input socket\n");
434 printf(
"TShutdownTimer::Notify: input socket: %p: did not show any activity" 435 " during the last %d mins: aborting\n", xs,
fTimeout);
441 printf(
"TShutdownTimer::Notify: input socket: %p: show activity" 442 " %ld secs ago\n", xs, dt / 60000);
457 fChildren->SetOwner(
kTRUE);
470 fChildren =
new TList;
472 spid.Form(
"%d", pid);
485 TIter nxp(fChildren);
492 pid = waitpid(p->
GetVal(), &status, WNOHANG);
493 }
while (pid < 0 && errno == EINTR);
496 pid = _cwait(&status, (intptr_t)p->
GetVal(), 0);
498 if (pid > 0 && pid == p->
GetVal()) {
500 fChildren->Remove(p);
507 if (!fChildren || fChildren->GetSize() <= 0) {
522 Info (
"Notify",
"session idle for more then %lld secs: terminating",
Long64_t(
fTime)/1000);
528 Warning(
"Notify",
"problems updating session status (errno: %d)", -uss_rc);
532 msg.Form(
"\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n" 533 "// Please IGNORE any error message possibly displayed below\n//",
536 msg.Form(
"\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n//",
544 Warning(
"Notify",
"fProofServ undefined!");
571 Printf(
"proofserv: command line testing: OK");
583 if (fgVirtMemMax < 0 && gSystem->Getenv(
"PROOF_VIRTMEMMAX")) {
585 if (mmx < kMaxLong && mmx > 0)
589 if (fgVirtMemMax < 0 && gSystem->Getenv(
"ROOTPROOFASHARD")) {
591 if (mmx < kMaxLong && mmx > 0)
596 if (fgResMemMax < 0 && gSystem->Getenv(
"PROOF_RESMEMMAX")) {
598 if (mmx < kMaxLong && mmx > 0)
606 Warning(
"TProofServ",
"requested memory fraction threshold to stop processing" 607 " (MemStop) out of range [0,1] - ignoring");
611 Warning(
"TProofServ",
"requested memory fraction threshold for warning and finer monitoring" 612 " (MemHWM) out of range [0,MemStop] - ignoring");
619 if ((
gEnv->
GetValue(
"Proof.GdbHook",0) == 3 && !test) ||
626 if (argc && *argc >= 4)
627 if (!strcmp(argv[3],
"test"))
631 if (argc && *argc < 2) {
632 Error(
"TProofServ",
"Must have at least 1 arguments (see proofd).");
719 TString logmx =
gEnv->
GetValue(
"ProofServ.LogFileMaxSize",
"");
720 if (!logmx.IsNull()) {
722 if (!logmx.IsDigit()) {
723 if (logmx.EndsWith(
"K")) {
726 }
else if (logmx.EndsWith(
"M")) {
729 }
if (logmx.EndsWith(
"G")) {
734 if (logmx.IsDigit()) {
740 Warning(
"TProofServ",
"bad formatted log file size limit ignored: '%s'", logmx.Data());
755 TString slog =
gEnv->
GetValue(
"ProofServ.LogToSysLog",
"");
756 if (!(slog.IsNull())) {
757 if (slog.IsDigit()) {
760 char c = (slog[0] ==
'M' || slog[0] ==
'm') ?
'm' :
'a';
761 c = (slog[0] ==
'W' || slog[0] ==
'w') ?
'w' : c;
768 Warning(
"TProofServ",
"request for syslog logging ineffective!");
783 if (enableSchemaEvolution) {
786 Info(
"TProofServ",
"automatic schema evolution in TMessage explicitly disabled");
799 if (opensock.Length() <= 0)
801 Int_t sock = opensock.Atoi();
803 Fatal(
"CreateServer",
"Invalid socket descriptor number (%d)", sock);
827 Info(
"CreateServer",
"Service %s ConfDir %s IsMaster %d\n",
841 TString pfx = (
IsMaster() ?
"Mst-" :
"Wrk-");
914 TString master =
"proof://__master__";
924 Error(
"CreateServer",
"no plugin manager found");
933 Error(
"CreateServer",
"no plugin found for TProof with a" 934 " config file of '%s'",
fConfFile.Data());
942 Error(
"CreateServer",
"plugin for TProof could not be loaded");
954 Error(
"CreateServer",
"plugin for TProof could not be executed");
977 msg.Form(
"Warning: client version is too old: automatic schema evolution is ineffective.\n" 978 " This may generate compatibility problems between streamed objects.\n" 979 " The advise is to move to ROOT >= 5.21/02 .");
991 Info(
"CreateServer",
" idle timer started (%d secs)", idle_to);
993 Info(
"CreateServer",
" idle timer not started (no idle timeout requested)");
1037 motdname +=
"/etc/proof/noproof";
1039 if ((motd = fopen(motdname,
"r"))) {
1042 while ((c = getc(motd)) != EOF)
1051 lastname = TString(
GetWorkDir()) +
"/.prooflast";
1054 Long_t id, flags, modtime, lasttime = 0;
1059 if (time(0) - lasttime > (time_t)86400)
1068 motdname +=
"/etc/proof/motd";
1071 if (modtime > lasttime || show) {
1072 if ((motd = fopen(motdname,
"r"))) {
1075 while ((c = getc(motd)) != EOF)
1085 Int_t fd = creat(last, 0600);
1086 if (fd >= 0)
close(fd);
1100 Error(
"Get",
"problems sending request");
1118 Error(
"Get",
"command %d cannot be executed while processing", what);
1119 }
else if (xrc == -2) {
1120 Error(
"Get",
"unknown command %d ! Protocol error?", what);
1138 Info(
"RestartComputeTime",
"compute time restarted after %f secs (%d entries)",
1167 Error(
"GetNextPacket",
"no progress status object");
1184 req << cacheSize << learnent;
1189 req << totalEntries;
1195 PDB(kLoop, 2) status->Print();
1196 Info(
"GetNextPacket",
"cacheSize: %lld, learnent: %d", cacheSize, learnent);
1204 << bytesRead << totalEntries;
1212 Error(
"GetNextPacket",
"Send() failed, returned %d", rc);
1220 Warning(
"GetNextPacket",
"problems saving partial results");
1231 Error(
"GetNextPacket",
"Recv() failed, returned %d", rc);
1236 TString
file, dir, obj;
1247 PDB(kLoop, 2)
Info(
"GetNextPacket",
"'%s' '%s' '%s' %lld %lld",
1248 e->GetFileName(), e->GetDirectory(),
1249 e->GetObjName(), e->GetFirst(),e->GetNum());
1251 PDB(kLoop, 2)
Info(
"GetNextPacket",
"Done");
1261 PDB(kLoop, 2)
Info(
"GetNextPacket:kPROOF_STOPPROCESS",
"received");
1267 Error(
"GetNextPacket",
"command %d cannot be executed while processing", what);
1268 }
else if (xrc == -2) {
1269 Error(
"GetNextPacket",
"unknown command %d ! Protocol error?", what);
1288 Bool_t xtest = (argc && *argc > 3 && !strcmp(argv[3],
"test")) ?
kTRUE :
kFALSE;
1291 if (xtest && !(isatty(0) == 0 || isatty(1) == 0)) {
1292 Printf(
"proofserv: command line testing: OK");
1296 if (!argc || (argc && *argc <= 1)) {
1297 Fatal(
"GetOptions",
"Must be started from proofd with arguments");
1301 if (!strcmp(argv[1],
"proofserv")) {
1304 }
else if (!strcmp(argv[1],
"proofslave")) {
1308 Fatal(
"GetOptions",
"Must be started as 'proofserv' or 'proofslave'");
1316 Fatal(
"GetOptions",
"ROOTCONFDIR shell variable not set");
1347 Error(
"HandleSocketInput",
"retrieving message from input socket");
1368 emsg.Form(
"HandleSocketInput: command %d cannot be executed while processing", what);
1369 }
else if (rc == -3) {
1370 emsg.Form(
"HandleSocketInput: message %d undefined! Protocol error?", what);
1372 emsg.Form(
"HandleSocketInput: unknown command %d! Protocol error?", what);
1375 }
else if (rc == 2) {
1379 Info(
"HandleSocketInput",
"message of type %d enqueued; sz: %d",
1389 Info(
"HandleSocketInput",
"processing enqueued message of type %d; left: %d",
1399 }
catch (std::bad_alloc &) {
1401 exmsg.Form(
"caught exception 'bad_alloc' (memory leak?) %s %lld",
1403 }
catch (std::exception &exc) {
1405 exmsg.Form(
"caught standard exception '%s' %s %lld",
1409 exmsg.Form(
"caught exception throwing %d %s %lld",
1411 }
catch (
const char *str) {
1413 exmsg.Form(
"caught exception throwing '%s' %s %lld",
1417 exmsg.Form(
"caught exception <unknown> %s %lld",
1422 if (!exmsg.IsNull()) {
1424 Error(
"HandleSocketInput",
"%s", exmsg.Data());
1435 exmsg.Form(
"high-memory footprint detected during Process(...) - terminating");
1436 Error(
"HandleSocketInput",
"%s", exmsg.Data());
1451 if (rc == 0 && ngwrks == 0 && !masterOnly && !dynamicStartup) {
1452 SendAsynMessage(
" *** No workers left: cannot continue! Terminating ... *** ");
1479 if (!mess)
return -3;
1483 Info(
"HandleSocketInput",
"processing message type %d from '%s'",
1488 Int_t rc = 0, lirc = 0;
1506 Info(
"HandleSocketInput:kMESS_CINT",
"processing: %s...", str);
1525 if (pslb) slb = str;
1557 sscanf(str,
"%d %u", &
fLogLevel, &mask);
1562 Info(
"HandleSocketInput:kPROOF_LOGLEVEL",
"debug level set to %d (mask: 0x%x)",
1593 Warning(
"HandleSocketInput:kPROOF_STATUS",
1594 "kPROOF_STATUS message is obsolete");
1596 Warning(
"HandleSocketInput:kPROOF_STATUS",
"problem sending of request");
1613 Info(
"HandleSocketInput:kPROOF_STOP",
"request for worker %s", ord.Data());
1617 Info(
"HandleSocketInput:kPROOF_STOP",
"got request to terminate");
1630 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_STOPPROCESS",
"enter");
1637 Info(
"HandleSocketInput:kPROOF_STOPPROCESS",
1638 "recursive mode: enter %d, %ld", aborted, timeout);
1652 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_PROCESS",
"enter");
1662 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_SENDOUTPUT",
1663 "worker was asked to send output to master");
1666 Error(
"HandleSocketInput:kPROOF_SENDOUTPUT",
"problems sending output list");
1711 Info(
"HandleSocketInput:kPROOF_MAXQUERIES",
"Enter");
1723 Info(
"HandleSocketInput:kPROOF_CLEANUPSESSION",
"Enter");
1727 Printf(
"Session %s cleaned up", stag.Data());
1729 Printf(
"Could not cleanup session %s", stag.Data());
1739 {
PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETENTRIES",
"Enter");
1743 TString objname(
"undef");
1747 (*mess) >> isTree >> filename >> dir >> objname;
1748 PDB(kGlobal, 2)
Info(
"HandleSocketInput:kPROOF_GETENTRIES",
1749 "Report size of object %s (%s) in dir %s in file %s",
1750 objname.Data(), isTree ?
"T" :
"O",
1751 dir.Data(), filename.Data());
1753 PDB(kGlobal, 2)
Info(
"HandleSocketInput:kPROOF_GETENTRIES",
1754 "Found %lld %s", entries, isTree ?
"entries" :
"objects");
1759 answ << entries << objname;
1762 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETENTRIES",
"Done");
1787 sscanf(str,
"%1023s %d %ld %d", name, &bin, &size, &fw);
1789 sscanf(str,
"%1023s %d %ld", name, &bin, &size);
1793 if (fnam.BeginsWith(
"cache:")) {
1803 if (!fnam.BeginsWith(
fCacheDir.Data())) {
1816 Info(
"HandleSocketInput",
"forwarding file: %s", fnam.Data());
1817 if (
fProof->
SendFile(fnam, opt, (copytocache ?
"cache" :
"")) < 0) {
1818 Error(
"HandleSocketInput",
"forwarding file: %s", fnam.Data());
1832 (*mess) >> start >> end;
1834 Info(
"HandleSocketInput:kPROOF_LOGFILE",
1835 "Logfile request - byte range: %d - %d", start, end);
1866 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_CACHE",
"enter");
1879 Warning(
"HandleSocketInput:kPROOF_WORKERLISTS",
1880 "Action meaning-less on worker nodes: protocol error?");
1891 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETSLAVEINFO",
"Enter");
1904 Error(
"HandleSocketInput:kPROOF_GETSLAVEINFO",
1905 "adding a list of worker nodes returned: %d", ret);
1908 Error(
"HandleSocketInput:kPROOF_GETSLAVEINFO",
1909 "getting list of worker nodes returned: %d", retVal);
1929 answ << (
TList *)info;
1935 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETSLAVEINFO",
"Done");
1946 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETTREEHEADER",
"Enter");
1953 Error(
"HandleSocketInput:kPROOF_GETTREEHEADER",
"could not create TProofPlayer instance!");
1956 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETTREEHEADER",
"Done");
1959 answ << TString(
"Failed") << (
TObject *)0;
1966 {
PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETOUTPUTLIST",
"Enter");
1967 TList* outputList = 0;
1971 outputList =
new TList();
1973 outputList =
new TList();
1978 while ( (o = next()) ) {
1988 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETOUTPUTLIST",
"Done");
1995 Info(
"HandleSocketInput:kPROOF_VALIDATE_DSET",
"Enter");
2001 else dset->Validate();
2008 Info(
"HandleSocketInput:kPROOF_VALIDATE_DSET",
"Done");
2018 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_DATA_READY",
"Enter");
2021 Long64_t totalbytes = 0, bytesready = 0;
2023 answ << dataready << totalbytes << bytesready;
2025 Error(
"HandleSocketInput:kPROOF_DATA_READY",
2026 "This message should not be sent to slaves");
2030 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_DATA_READY",
"Done");
2046 Error(
"HandleSocketInput",
"old client: no or incompatible dataset support");
2071 Info(
"HandleSocketInput:kPROOF_REALTIMELOG",
2072 "setting real-time logging %s", (on ?
"ON" :
"OFF"));
2095 Error(
"HandleSocketInput",
"no queries enqueued");
2108 Error(
"HandleSocketInput",
"adding a list of worker nodes returned: %d", ret);
2121 Error(
"HandleSocketInput",
"error getting list of worker nodes");
2123 Warning(
"HandleSocketInput",
"query was re-queued!");
2125 Error(
"HandleSocketInput",
"unexpected answer: %d", retVal);
2146 " idle or undefined player - ignoring");
2172 smsg.Form(
"Echo response from %s:%s: %s",
2180 TString tmpfn =
"echo-out-";
2183 Error(
"HandleSocketInput",
"Can't redirect output");
2198 smsg.Form(
"*** Echo response from %s:%s ***\n",
2204 while (( line = (
TObjString *)nextLine() )) {
2205 smsg.Append( line->String() );
2221 Error(
"HandleSocketInput",
"unknown command %d", what);
2246 Int_t mergedWorkers = 0;
2248 PDB(kSubmerger, 1)
Info(
"AcceptResults",
"enter");
2256 Int_t numworkers = 0;
2261 Info(
"AcceptResults",
"interrupt!");
2269 if (sw && sw != (
TSocket *)(-1)) {
2273 Info(
"AcceptResults",
"connection from a worker accepted on merger %s ",
2276 if (++numworkers >= connections)
2280 Info(
"AcceptResults",
"spurious signal found of merging socket");
2283 if (s->
Recv(mess) < 0) {
2284 Error(
"AcceptResults",
"problems receiving message");
2288 Info(
"AcceptResults",
"message received: %d ", (mess ? mess->
What() : 0));
2290 Error(
"AcceptResults",
"message received: %p ", mess);
2299 PDB(kSubmerger, 2)
Info(
"AcceptResults",
" type %d ", type);
2303 Info(
"AcceptResults",
2304 "a new worker has been mergerd. Total merged workers: %d",
2310 PDB(kSubmerger, 2)
Info(
"AcceptResults",
"removing %p (has been merged)", o);
2313 PDB(kSubmerger, 2)
Info(
"AcceptResults",
"%p not merged yet", o);
2321 for (
Int_t i =0; i< size; ++i){
2323 PDB(kSubmerger, 2)
Info(
"AcceptResults",
"closing socket");
2330 PDB(kSubmerger, 2)
Info(
"AcceptResults",
"exit: %d", result);
2340 Int_t n, nch, wasted = 0;
2342 const Int_t kBufSize = 1024;
2343 char waste[kBufSize];
2349 Info(
"HandleUrgentData",
"handling oob...");
2370 if (nch > kBufSize) nch = kBufSize;
2373 Error(
"HandleUrgentData",
"error receiving waste");
2378 Error(
"HandleUrgentData",
"error receiving OOB");
2384 Info(
"HandleUrgentData",
"got OOB byte: %d\n", oob_byte);
2391 Info(
"HandleUrgentData",
"*** Hard Interrupt");
2408 Error(
"HandleUrgentData",
"error sending OOB");
2419 if (nch > kBufSize) nch = kBufSize;
2422 Error(
"HandleUrgentData",
"error receiving waste (2)");
2432 Info(
"HandleUrgentData",
"Soft Interrupt");
2439 Error(
"HandleUrgentData",
"soft interrupt flushed stream");
2450 Info(
"HandleUrgentData",
"Shutdown Interrupt");
2461 Error(
"HandleUrgentData",
"unexpected OOB byte");
2481 Info(
"HandleSigPipe",
"keepAlive probe failed");
2490 Info(
"HandleSigPipe",
"keepAlive probe failed");
2526 TString sdir = (dir && strlen(dir) > 0) ? dir :
fSessionDir.Data();
2533 if ((freopen(logfile, mode, stdout)) == 0)
2534 SysError(
"RedirectOutput",
"could not freopen stdout (%s)", logfile);
2536 if ((dup2(fileno(stdout), fileno(stderr))) < 0)
2537 SysError(
"RedirectOutput",
"could not redirect stderr");
2539 if ((
fLogFile = fopen(logfile,
"r")) == 0)
2540 SysError(
"RedirectOutput",
"could not open logfile '%s'", logfile);
2544 Warning(
"RedirectOutput",
"no way to tell master (or client) where" 2545 " to upload packages");
2557 if (!dd.BeginsWith(
"proofserv")) {
2558 Int_t ic = dd.Index(
":");
2560 dd.Replace(0, ic,
"proofserv");
2584 if (size <= 0)
return 0;
2587 Int_t fd =
open(file, O_CREAT | O_TRUNC | O_WRONLY, 0600);
2589 SysError(
"ReceiveFile",
"error opening file %s", file);
2593 const Int_t kMAXBUF = 16384;
2594 char buf[kMAXBUF], cpy[kMAXBUF];
2599 while (filesize < size) {
2600 left =
Int_t(size - filesize);
2612 Int_t k = 0, i = 0, j = 0;
2619 cpy[j++] = buf[i++];
2623 w =
write(fd, q, r);
2625 w =
write(fd, p, r);
2629 SysError(
"ReceiveFile",
"error writing to file %s", file);
2637 Error(
"ReceiveFile",
"error during receiving file %s", file);
2645 if (chmod(file, 0644) != 0)
2646 Warning(
"ReceiveFile",
"error setting mode 0644 on file %s", file);
2685 off_t ltot=0, lnow=0;
2690 ltot = lseek(fileno(stdout), (off_t) 0, SEEK_END);
2693 if (ltot >= 0 && lnow >= 0) {
2696 if (end <= start || end > ltot)
2698 left = (
Int_t)(end - start);
2703 left = (
Int_t)(ltot - lnow);
2710 SysError(
"SendLogFile",
"error sending kPROOF_LOGFILE");
2714 const Int_t kMAXBUF = 32768;
2716 Int_t wanted = (left > kMAXBUF) ? kMAXBUF : left;
2724 SysError(
"SendLogFile",
"error reading log file");
2728 if (end == ltot && len == wanted)
2732 SysError(
"SendLogFile",
"error sending log file");
2738 wanted = (left > kMAXBUF) ? kMAXBUF : left;
2740 }
while (len > 0 && left > 0);
2744 if (adhoc && lnow >=0 )
2751 mess << status << (
Int_t) 1;
2754 SysError(
"SendLogFile",
"error sending kPROOF_LOGDONE");
2758 PDB(kGlobal, 1)
Info(
"SendLogFile",
"kPROOF_LOGDONE sent");
2775 mess << bytesread << realtime << cputime << workdir;
2777 mess << TString(gProofServ->
GetImage());
2786 Int_t nparallel = 0;
2789 Info(
"SendParallel",
"Will invoke AskParallel()");
2792 Info(
"SendParallel",
"Will invoke GetParallel()");
2799 mess << nparallel << async;
2818 Error(
"Setup",
"failed to send proof server startup message");
2826 Error(
"Setup",
"failed to receive remote proof protocol");
2830 Error(
"Setup",
"failed to send local proof protocol");
2838 Error(
"Setup",
"OldAuthSetup: failed to setup authentication");
2857 Error(
"Setup",
"failed to receive ordinal and config info");
2865 if (fWorkDir.IsNull())
2879 conffile.Remove(0, 1 + conffile.Index(
":"));
2886 if (tmpWorkDir !=
"")
2890 Info(
"Setup",
"invalid config file %s (missing or unreadable",
2916 Info(
"Setup",
"working directory set to %s",
fWorkDir.Data());
2920 if (host.Index(
".") !=
kNPOS)
2921 host.Remove(host.Index(
"."));
2938 Error(
"Setup",
"common setup failed");
2969 bindir = ROOTBINDIR;
2972 if (!bindir.IsNull()) bindir +=
"/bin";
2976 TString paths =
gEnv->
GetValue(
"ProofServ.BinPaths",
"");
2977 if (paths.Length() > 0) {
2979 if (paths.Contains(
"^<compiler>"))
2981 else if (paths.Contains(
"<compiler>"))
2985 TString compiler = COMPILER;
2986 if (compiler.Index(
"is ") !=
kNPOS)
2987 compiler.Remove(0, compiler.Index(
"is ") + 3);
2990 if (!bindir.IsNull()) bindir +=
":";
2992 }
else if (icomp == -1) {
2993 if (!path.IsNull()) path +=
":";
2999 if (paths.Contains(
"^<sysbin>"))
3001 else if (paths.Contains(
"<sysbin>"))
3005 if (!bindir.IsNull()) bindir +=
":";
3006 bindir +=
"/bin:/usr/bin:/usr/local/bin";
3007 }
else if (isysb == -1) {
3008 if (!path.IsNull()) path +=
":";
3009 path +=
"/bin:/usr/bin:/usr/local/bin";
3014 if (!bindir.IsNull()) bindir +=
":";
3015 path.Insert(0, bindir);
3022 Error(
"SetupCommon",
"can not change to PROOF directory %s",
3031 Error(
"SetupCommon",
"can not change to PROOF directory %s",
3048 Info(
"SetupCommon",
"cache directory set to %s",
fCacheDir.Data());
3052 TString(
fCacheDir).ReplaceAll(
"/",
"%").Data()));
3057 TString packdir =
gEnv->
GetValue(
"ProofServ.PackageDir",
3066 const char *k = (
IsMaster()) ?
"Mst" :
"Wrk";
3067 noth.Form(
"%s-%s", k,
fOrdinal.Data());
3070 Info(
"SetupCommon",
"package directory set to %s", packdir.Data());
3075 if (fDataDir.IsNull()) {
3078 }
else if ((isep = fDataDir.Last(
' ')) !=
kNPOS) {
3080 fDataDir.Remove(isep);
3085 Warning(
"SetupCommon",
"problems creating path '%s' (errno: %d)",
3089 Info(
"SetupCommon",
"data directory set to %s", fDataDir.Data());
3093 TString dataDirOpts =
gEnv->
GetValue(
"ProofServ.DataDirOpts",
"");
3094 if (!dataDirOpts.IsNull()) {
3097 if ((
IsMaster() && !dataDirOpts.Contains(
"M")) ||
3102 if (dataDirOpts.Contains(
"g")) m = 0775;
3103 if (dataDirOpts.Contains(
"a") || dataDirOpts.Contains(
"o")) m = 0777;
3105 Info(
"SetupCommon",
"requested mode for data directories is '%o'", m);
3110 if (fDataDir.BeginsWith(
"/")) p =
"/";
3111 while (fDataDir.Tokenize(subp, from,
"/")) {
3112 if (subp.IsNull())
continue;
3117 Warning(
"SetupCommon",
"problems setting mode '%o' on path '%s' (errno: %d)",
3124 Warning(
"SetupCommon",
"problems stat-ing path '%s' (errno: %d; datadir: %s)",
3133 TString globpack =
gEnv->
GetValue(
"Proof.GlobalPackageDirs",
"");
3137 Info(
"SetupCommon",
" %d global package directories registered", nglb);
3146 Error(
"SetupCommon",
"can not change to working directory '%s'",
3175 TString(
fQueryDir).ReplaceAll(
"/",
"%").Data()));
3195 TString dsms =
gEnv->
GetValue(
"Proof.DataSetManager",
"");
3196 if (!dsms.IsNull()) {
3199 while (dsms.Tokenize(dsm, from,
",")) {
3201 Warning(
"SetupCommon",
"a valid dataset manager already initialized");
3202 Warning(
"SetupCommon",
"support for multiple managers not yet available");
3206 if (
gROOT->GetPluginManager()) {
3208 h =
gROOT->GetPluginManager()->FindHandler(
"TDataSetManager", dsm);
3213 fUser.Data(), dsm.Data()));
3219 Warning(
"SetupCommon",
"dataset manager plug-in initialization failed");
3220 SendAsynMessage(
"TXProofServ::SetupCommon: dataset manager plug-in initialization failed");
3225 TString opts(
"Av:");
3226 TString dsetdir =
gEnv->
GetValue(
"ProofServ.DataSetDir",
"");
3227 if (dsetdir.IsNull()) {
3236 h =
gROOT->GetPluginManager()->FindHandler(
"TDataSetManager",
"file");
3241 TString oo =
TString::Format(
"dir:%s opt:%s", dsetdir.Data(), opts.Data());
3243 fGroup.Data(), fUser.Data(), oo.Data()));
3246 Warning(
"SetupCommon",
"default dataset manager plug-in initialization failed");
3251 TString dsReqCfg =
gEnv->
GetValue(
"Proof.DataSetStagingRequests",
"");
3252 if (!dsReqCfg.IsNull()) {
3253 TPMERegexp reReqDir(
"(^| )(dir:)?([^ ]+)( |$)");
3255 if (reReqDir.
Match(dsReqCfg) == 5) {
3257 dsDirFmt.Form(
"dir:%s perms:open", reReqDir[3].
Data());
3263 "failed init of dataset staging requests repository");
3268 "specify, with [dir:]<path>, a valid path for staging requests");
3271 Warning(
"SetupCommon",
"no repository for staging requests available");
3277 if (quotas.IsNull())
3279 if (quotas.IsNull())
3281 if (!quotas.IsNull()) {
3285 while (quotas.Tokenize(tok, from,
" ")) {
3287 if (tok.BeginsWith(
"maxquerykept=")) {
3288 tok.ReplaceAll(
"maxquerykept=",
"");
3293 "parsing 'maxquerykept' :ignoring token %s : not a digit", tok.Data());
3296 const char *ksz[2] = {
"hwmsz=",
"maxsz="};
3297 for (
Int_t j = 0; j < 2; j++) {
3298 if (tok.BeginsWith(ksz[j])) {
3299 tok.ReplaceAll(ksz[j],
"");
3301 if (!tok.IsDigit()) {
3304 const char *s[3] = {
"k",
"m",
"g"};
3305 Int_t i = 0, ki = 1024;
3307 if (tok.EndsWith(s[i++]))
3312 tok.Remove(tok.Length()-1);
3314 if (tok.IsDigit()) {
3316 fHWMBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
3318 fMaxBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
3320 TString ssz(ksz[j], strlen(ksz[j])-1);
3321 Info(
"SetupCommon",
"parsing '%s' : ignoring token %s", ssz.Data(), tok.Data());
3331 Warning(
"SetupCommon",
"problems applying fMaxQueries");
3335 TString vac =
gROOT->GetVersion();
3337 TString rtag =
gEnv->
GetValue(
"ProofServ.RootVersionTag",
"");
3338 if (rtag.Length() > 0)
3350 while (all_vars.Tokenize(name, from,
",")) {
3351 if (!name.IsNull()) {
3361 }
else if (!(
fUser.IsNull()) &&
fGroup.IsNull()) {
3363 }
else if (
fUser.IsNull() && !(
fGroup.IsNull())) {
3373 Info(
"SetupCommon",
"successfully completed");
3393 Info(
"Terminate",
"process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
3427 Info(
"Terminate",
"data directory '%s' has been removed",
fDataDir.Data());
3434 while ((fh = next())) {
3435 TProofServInputHandler *ih =
dynamic_cast<TProofServInputHandler *
>(fh);
3452 if (!path || strlen(path) <= 0)
return kFALSE;
3458 const char *ent = 0;
3460 if (!strcmp(ent,
".") || !strcmp(ent,
".."))
continue;
3461 fpath.Form(
"%s/%s", path, ent);
3478 Warning(
"UnlinkDataDir",
"data directory '%s' is empty but could not be removed", path);
3509 if (!oldAuthSetupHook) {
3511 TString authlib =
"libRootAuth";
3517 Error(
"OldAuthSetup",
"can't load %s",authlib.Data());
3521 Error(
"OldAuthSetup",
"can't locate %s",authlib.Data());
3530 Error(
"OldAuthSetup",
"can't find OldProofServAuthSetup");
3546 TDSet *dset,
const char *selec,
3563 fst, dset, selec, elist);
3581 Int_t startlog = lseek(fileno(stdout), (off_t) 0, SEEK_END);
3585 Info(
"SetQueryRunning",
"starting query: %d", pq->
GetSeqNum());
3588 TString parlist =
"";
3613 Info(
"HandleArchive",
"Enter");
3617 (*mess) >> queryref >> path;
3619 if (slb) slb->Form(
"%s %s", queryref.Data(), path.Data());
3622 if (queryref ==
"Default") {
3624 Info(
"HandleArchive",
3634 if (path.Length() <= 0) {
3636 Info(
"HandleArchive",
3637 "archive paths are not defined - do nothing");
3641 path.Form(
"%s/session-%s-%d.root",
3645 path.ReplaceAll(
":q",
"-");
3652 if (!pqr || qry < 0) {
3653 TString fout = qdir;
3654 fout +=
"/query-result.root";
3660 TIter nxk(f->GetListOfKeys());
3662 while ((k = (
TKey *)nxk())) {
3663 if (!strcmp(k->GetClassName(),
"TProofQueryResult")) {
3672 Info(
"HandleArchive",
3673 "file cannot be open (%s)",fout.Data());
3680 PDB(kGlobal, 1)
Info(
"HandleArchive",
3681 "archive path for query #%d: %s",
3688 if (!farc || !(farc->IsOpen())) {
3689 Info(
"HandleArchive",
3690 "archive file cannot be open (%s)",path.Data());
3704 if (qry > -1 &&
fQMgr)
3708 Info(
"HandleArchive",
3709 "results of query %s archived to file %s",
3710 queryref.Data(), path.Data());
3729 emsg.Form(
"file collection undefined!");
3740 TUrl *xurl = fiind->GetCurrentUrl();
3772 Info(
"HandleProcess",
"Enter");
3779 TString filename, opt;
3786 (*mess) >> dset >> filename >> input >> opt >> nentries >> first >> evl >> sync;
3797 if ((!hasNoData) && elist)
3807 fPrefix.Data(), emsg.Data()));
3808 Error(
"HandleProcess",
"AssertDataSet: %s", emsg.Data());
3813 }
else if (hasNoData) {
3818 if (!dsn.Contains(
":") || dsn.BeginsWith(
"dataset:")) {
3819 dsn.ReplaceAll(
"dataset:",
"");
3823 emsg.Form(
"dataset manager not initialized!");
3828 emsg.Form(
"requested dataset '%s' does not exists", dsn.Data());
3835 fcmap->
SetName(
"PROOF_FilesToProcess");
3840 if (!emsg.IsNull()) {
3842 fPrefix.Data(), emsg.Data()));
3843 Error(
"HandleProcess",
"%s", emsg.Data());
3859 if (dset) input->
Add(dset);
3860 if (elist) input->
Add(elist);
3864 input->
Clear(
"nodelete");
3869 Warning(
"HandleProcess",
"could not save input data: %s", emsg.Data());
3895 Error(
"HandleProcess",
"error getting list of worker nodes");
3902 Info(
"HandleProcess",
"query %d enqueued", pq->
GetSeqNum());
3904 Error(
"HandleProcess",
"Adding a list of worker nodes returned: %d",
3913 Error(
"HandleProcess",
"error getting list of worker nodes");
3920 Info(
"HandleProcess",
"query %d enqueued", pq->
GetSeqNum());
3922 Error(
"HandleProcess",
"unknown return value: %d", retVal);
3933 if (!sync || enqueued) {
3941 Info(
"HandleProcess",
4008 Warning(
"HandleProcess",
"could not get input data: %s", emsg.Data());
4012 Warning(
"HandleProcess",
"could not get query sequential number!");
4016 while ((nord = input->
FindObject(
"PROOF_Ordinal")))
4023 while ((o = next())) {
4024 PDB(kGlobal, 2)
Info(
"HandleProcess",
"adding: %s", o->
GetName());
4032 while ((obj = nxt())){
4036 Info(
"HandleProcess",
"selector obj for '%s' found", selector_obj->
ClassName());
4052 Info(
"HandleProcess",
"calling fPlayer->Process() with selector object: %s", selector_obj->
ClassName());
4056 Info(
"HandleProcess",
"calling fPlayer->Process() with selector name: %s", filename.Data());
4068 m << status << abort;
4081 Info(
"TProofServ::Handleprocess",
4082 "worker %s has finished processing with %d objects in output list",
4102 if (!isSubMerging) {
4122 Info(
"HandleProcess",
"controlled mode: worker %s has finished," 4123 " sizes sent to master",
fOrdinal.Data());
4129 Info(
"HandleProcess",
"submerging disabled because of high-memory case");
4132 PDB(kGlobal, 2)
Info(
"HandleProcess",
"merging mode check: %d", isSubMerging);
4148 Int_t merge_port = 0;
4151 Info(
"HandleProcess",
"possible port for merging connections: %d",
4155 msg_osize << merge_port;
4164 PDB(kSubmerger, 2)
Info(
"HandleProcess",
"worker %s has finished",
fOrdinal.Data());
4169 PDB(kGlobal, 2)
Info(
"HandleProcess",
"sending result directly to master");
4171 Warning(
"HandleProcess",
"problems sending output list");
4193 Warning(
"HandleProcess",
"the output list is empty!");
4195 Warning(
"HandleProcess",
"problems sending output list");
4212 while ((obj = nex())) {
4224 TFile *
f =
dynamic_cast<TFile *
>(added->
Last());
4228 while ((o = nxo())) { input->
Remove(o); }
4245 PDB(kGlobal, 1)
Info(
"HandleProcess",
"done");
4256 PDB(kOutput, 2)
Info(
"SendResults",
"enter");
4267 msg.Form(
"%s: merging output objects ... done ",
4271 msg.Form(
"%s: objects merged; sending output: %d objs",
fPrefix.Data(), olsz);
4276 if (sock->
Send(mbuf) < 0)
return -1;
4279 Int_t ns = 0, np = 0;
4282 Int_t totsz = 0, objsz = 0;
4284 while ((o = nxo())) {
4288 "message has %d bytes: limit of %lld bytes reached - sending ...",
4301 msg.Form(
"%s: objects merged; sending obj %d/%d (%d bytes) ",
4302 fPrefix.Data(), ns, olsz, objsz);
4305 if (sock->
Send(mbuf) < 0)
return -1;
4312 mbuf << (Int_t) ((ns >= olsz) ? 2 : 1);
4327 msg.Form(
"%s: objects merged; sending obj %d/%d (%d bytes) ",
4328 fPrefix.Data(), ns, olsz, objsz);
4331 if (sock->
Send(mbuf) < 0)
return -1;
4335 msg.Form(
"%s: grand total: sent %d objects, size: %d bytes ",
4347 msg.Form(
"%s: merging output objects ... done ",
4351 msg.Form(
"%s: objects merged; sending output: %d objs",
fPrefix.Data(), olsz);
4356 if (sock->
Send(mbuf) < 0)
return -1;
4360 Int_t totsz = 0, objsz = 0;
4363 while ((o = nxo())) {
4380 msg.Form(
"%s: objects merged; sending obj %d/%d (%d bytes) ",
4381 fPrefix.Data(), ns, olsz, objsz);
4384 if (sock->
Send(mbuf) < 0)
return -1;
4389 msg.Form(
"%s: grand total: sent %d objects, size: %d bytes ",
4403 msg.Form(
"%s: sending output: %d objs, %d bytes",
fPrefix.Data(), olsz, blen);
4405 if (sock->
Send(mbuf) < 0)
return -1;
4409 PDB(kGlobal, 2)
Info(
"SendResults",
"sending output list");
4411 PDB(kGlobal, 2)
Info(
"SendResults",
"notifying failure or abort");
4416 PDB(kOutput,2)
Info(
"SendResults",
"done");
4429 TString filename, opt;
4456 Ssiz_t id = opt.Last(
'#');
4457 if (
id !=
kNPOS &&
id < opt.Length() - 1) {
4458 filename += opt(
id + 1, opt.Length());
4468 Error(
"ProcessNext",
"no TDset object: cannot continue");
4490 while ((obj = nxt())){
4494 Info(
"ProcessNext",
"found object for selector '%s'", obj->
ClassName());
4501 Error(
"ProcessNext",
"empty waiting queries list!");
4540 input->
Add(
new TNamed(
"PROOF_QueryTag", qid.Data()));
4551 PDB(kSubmerger, 2)
Info(
"ProcessNext",
"PROOF_UseMergers set to %d", smg);
4557 if ((o = input->
FindObject(
"PROOF_MergersByHost"))) { input->
Remove(o);
delete o; }
4559 PDB(kSubmerger, 2)
Info(
"ProcessNext",
"submergers setup by host/node");
4568 while ((o = next())) {
4579 Info(
"ProcessNext",
"calling fPlayer->Process() with selector object: %s", selector_obj->
ClassName());
4583 Info(
"ProcessNext",
"calling fPlayer->Process() with selector name: %s", filename.Data());
4598 m << status << abort;
4614 Warning(
"ProcessNext",
"problems registering produced datasets: %s", emsg.Data());
4635 while ((xo = nxo())) {
4645 if (oopt.BeginsWith(
"of:")) {
4646 oopt.Replace(0, 3,
"");
4657 Info(
"ProcessNext",
"adding info about dataset '%s' in the light query result", dset->
GetName());
4664 Info(
"ProcessNext",
"sending results");
4667 Warning(
"ProcessNext",
"problems sending output list");
4672 Warning(
"ProcessNext",
"the output list is empty!");
4674 Warning(
"ProcessNext",
"problems sending output list");
4684 if (!(pq->
IsDraw()) && pqr) {
4711 ::
Info(
"TProofServ::RegisterDataSets",
4712 "enter: %d objs in the output list", (out ? out->
GetSize() : -1));
4714 if (!in || !out || !dsm) {
4715 ::Error(
"TProofServ::RegisterDataSets",
"invalid inputs: %p, %p, %p", in, out, dsm);
4723 while ((o = nxo())) {
4741 if (regopt.Contains(
":sortidx:")) {
4743 regopt.ReplaceAll(
":sortidx:",
"");
4750 const char *vfmsg = regopt.Contains(
"V") ?
" and verifying" :
"";
4751 msg.Form(
"Registering%s dataset '%s' ... ", vfmsg, ds->
GetName());
4760 ::Warning(
"TProofServ::RegisterDataSets",
4761 "failure registering or verifying dataset '%s'", ds->
GetName());
4762 msg.Form(
"Registering%s dataset '%s' ... failed! See log for more details", vfmsg, ds->
GetName());
4764 ::Info(
"TProofServ::RegisterDataSets",
"dataset '%s' successfully registered%s",
4765 ds->
GetName(), (strlen(vfmsg) > 0) ?
" and verified" :
"");
4766 msg.Form(
"Registering%s dataset '%s' ... OK", vfmsg, ds->
GetName());
4773 ::Info(
"TProofServ::RegisterDataSets",
"printing collection");
4777 ::Warning(
"TProofServ::RegisterDataSets",
"collection '%s' is empty", o->
GetName());
4780 ::Info(
"TProofServ::RegisterDataSets",
"dataset registration not allowed");
4787 while ((o = nxrm())) out->
Remove(o);
4791 while((o = nxtg())) {
4797 PDB(kDataset, 1) ::
Info(
"TProofServ::RegisterDataSets",
"exit");
4808 Info(
"HandleQueryList",
"Enter");
4814 Int_t ntot = 0, npre = 0, ndraw= 0;
4819 Int_t idx = qdir.Index(
"session-");
4847 Warning(
"HandleQueryList",
"unable to clone TProofQueryResult '%s:%s'",
4848 pqr->GetName(), pqr->GetTitle());
4857 m << npre << ndraw << ql;
4871 Info(
"HandleRemove",
"Enter");
4874 (*mess) >> queryref;
4876 if (slb) *slb = queryref;
4878 if (queryref ==
"cleanupqueue") {
4882 Info(
"HandleRemove",
"%d queries removed from the waiting list", pend);
4887 if (queryref ==
"cleanupdir") {
4893 Info(
"HandleRemove",
"%d directories removed", nd);
4918 Warning(
"HandleRemove",
"query result manager undefined!");
4922 Info(
"HandleRemove",
4923 "query %s could not be removed (unable to lock session)", queryref.Data());
4935 Info(
"HandleRetrieve",
"Enter");
4938 (*mess) >> queryref;
4940 if (slb) *slb = queryref;
4947 TString fout = qdir;
4948 fout +=
"/query-result.root";
4954 TIter nxk(f->GetListOfKeys());
4956 while ((k = (
TKey *)nxk())) {
4957 if (!strcmp(k->GetClassName(),
"TProofQueryResult")) {
4965 if ((d = dynamic_cast<TDSet *>(o)))
4974 static const char *clb[4] = {
"bytes",
"KB",
"MB",
"GB" };
4975 while (qsz > 1000. && ilb < 3) {
4984 Info(
"HandleRetrieve",
4985 "query not found in file %s",fout.Data());
4995 Info(
"HandleRetrieve",
4996 "file cannot be open (%s)",fout.Data());
5015 (*mess) >> type >> add >> path;
5019 if ((type !=
"lib") && (type !=
"inc")) {
5020 Error(
"HandleLibIncPath",
"unknown action type: %s", type.Data());
5025 path.ReplaceAll(
",",
" ");
5029 if (path.Length() > 0 && path !=
"-") {
5030 if (!(op = path.Tokenize(
" "))) {
5031 Error(
"HandleLibIncPath",
"decomposing path %s", path.Data());
5038 if (type ==
"lib") {
5045 TString xlib = lib->
GetName();
5052 if (newlibpath.BeginsWith(
".:"))
5054 if (newlibpath.Index(xlib) ==
kNPOS) {
5059 Info(
"HandleLibIncPath",
5060 "libpath %s does not exist or cannot be read - not added", xlib.Data());
5075 TString xinc = inc->
GetName();
5080 if (curincpath.Index(xinc) ==
kNPOS)
5083 Info(
"HandleLibIncPath",
5084 "incpath %s does not exist or cannot be read - not added", xinc.Data());
5095 if (type ==
"lib") {
5102 TString xlib = lib->
GetName();
5123 newincpath.ReplaceAll(
gInterpreter->GetIncludePath(),
"");
5148 (*mess) >> filenam >> md5;
5152 if (slb) *slb = filenam;
5154 if (filenam.BeginsWith(
"-")) {
5160 TString packnam = filenam;
5161 packnam.Remove(packnam.Length() - 4);
5164 if (md5local && md5 == (*md5local)) {
5167 Error(
"HandleCheckFile",
"failure cleaning %s", packnam.Data());
5175 Info(
"HandleCheckFile",
5176 "package %s installed on node", filenam.Data());
5179 Error(
"HandleCheckFile",
"gunzip not found");
5181 Error(
"HandleCheckFile",
"package %s did not unpack into %s",
5182 filenam.Data(), packnam.Data());
5193 Error(
"HandleCheckFile",
5194 "package %s not yet on node", filenam.Data());
5212 Info(
"HandleCheckFile",
5213 "problems uploading package %s", parpath.Data());
5218 }
else if (filenam.BeginsWith(
"+") || filenam.BeginsWith(
"=")) {
5219 filenam.Remove(0,1);
5221 TString parname = filenam;
5228 "problems installing package %s", filenam.Data());
5236 if (md5local && md5 == (*md5local)) {
5240 Info(
"HandleCheckFile",
5241 "package %s already on node", parname.Data());
5244 TString
par = filenam;
5251 "problems uploading package %s", par.Data());
5259 Info(
"HandleCheckFile",
5260 "package %s not yet on node", filenam.Data());
5267 TString cachef =
fCacheDir +
"/" + filenam;
5271 if (md5local && md5 == (*md5local)) {
5274 Info(
"HandleCheckFile",
"file %s already on node", filenam.Data());
5279 Info(
"HandleCheckFile",
"file %s not yet on node", filenam.Data());
5293 Info(
"HandleCache",
"Enter");
5305 const char *k = (
IsMaster()) ?
"Mst" :
"Wrk";
5306 noth.Form(
"%s-%s", k,
fOrdinal.Data());
5309 TString packagedir, package, pdir, ocwd,
file;
5325 if (slb) slb->Form(
"%d %d", type, all);
5331 if (file.IsNull() || file ==
"*") {
5339 if (slb) slb->Form(
"%d %s", type, file.Data());
5347 if (slb) slb->Form(
"%d %d", type, all);
5355 if (slb) slb->Form(
"%d %d", type, status);
5364 if (slb) slb->Form(
"%d %s %d", type, package.Data(), status);
5373 noth.Data(), package.Data()));
5386 "kBuildPackage: problems forwarding package %s to workers", package.Data());
5388 noth.Data(), package.Data()));
5396 "kBuildPackage: package %s exists and has PROOF-INF directory", package.Data());
5403 status = packmgr->
Build(package.Data(), chkveropt);
5414 Info(
"HandleCache",
"package %s successfully built", package.Data());
5416 if (slb) slb->Form(
"%d %s %d %d", type, package.Data(), status, chkveropt);
5425 if ((status =
fPackMgr->
Load(package.Data(), optls)) < 0) {
5429 noth.Data(), package.Data(), optls,
5430 (optls && optls->GetSize() > 0) ? optls->GetSize() : 0));
5435 if (optls && optls->GetSize() > 0) {
5445 Info(
"HandleCache",
"package %s successfully loaded", package.Data());
5448 if (slb) slb->Form(
"%d %s %d", type, package.Data(), status);
5453 { TString title(
"*** Enabled packages ***");
5455 title.Form(
"*** Enabled packages on %s %s on %s",
5456 (
IsMaster()) ?
"master" :
"worker",
5464 if (slb) slb->Form(
"%d %d", type, all);
5471 if (slb) slb->Form(
"%d %d", type, all);
5478 if (slb) slb->Form(
"%d %s", type, file.Data());
5485 if (slb) slb->Form(
"%d %d", type, all);
5490 if (slb) slb->Form(
"%d", type);
5496 if (slb) slb->Form(
"%d %s", type, package.Data());
5503 if (slb) slb->Form(
"%d %s %d", type, package.Data(), chkveropt);
5510 if (slb) slb->Form(
"%d %s %d", type, package.Data(), status);
5517 if (slb) slb->Form(
"%d %s", type, package.Data());
5523 if (slb) slb->Form(
"%d %s %d", type, package.Data(), status);
5529 if (slb) slb->Form(
"%d %s", type, package.Data());
5539 if (slb) slb->Form(
"%d", type);
5545 msg << type << pack;
5547 pack->SetOwner(
kTRUE);
5550 if (slb) slb->Form(
"%d", type);
5568 TString pack(package);
5570 if ((from = pack.Index(
",")) !=
kNPOS) pack.Remove(from);
5571 Info(
"HandleCache",
"loading macro %s ...", pack.Data());
5586 if (slb) slb->Form(
"%d %s", type, package.Data());
5590 Error(
"HandleCache",
"unknown type %d", type);
5604 Info(
"HandleWorkerLists",
"Enter");
5614 if (ord !=
"*" && !ord.BeginsWith(
GetOrdinal()) && ord !=
"restore")
break;
5623 if (nactnew == nactmax) {
5624 PDB(kGlobal, 1)
Info(
"HandleWorkerList",
"all workers (re-)activated");
5627 PDB(kGlobal, 1)
Info(
"HandleWorkerList",
"%d workers could not be (re-)activated", nactmax - nactnew);
5629 }
else if (ord ==
"restore") {
5631 PDB(kGlobal, 1)
Info(
"HandleWorkerList",
"active worker(s) restored");
5633 Error(
"HandleWorkerList",
"some active worker(s) could not be restored; check logs");
5636 if (nactnew == (nact + nwc)) {
5638 PDB(kGlobal, 1)
Info(
"HandleWorkerList",
"worker(s) %s (re-)activated", ord.Data());
5641 Error(
"HandleWorkerList",
"some worker(s) could not be (re-)activated;" 5642 " # of actives: %d --> %d (nwc: %d)",
5643 nact, nactnew, nwc);
5645 rc = (nwc < 0) ? nwc : -1;
5649 PDB(kGlobal, 1)
Info(
"HandleWorkerList",
"all workers are already active");
5652 Warning(
"HandleWorkerList",
"undefined PROOF session: protocol error?");
5657 if (ord !=
"*" && !ord.BeginsWith(
GetOrdinal()) && ord !=
"restore")
break;
5665 PDB(kGlobal, 1)
Info(
"HandleWorkerList",
"all workers deactivated");
5668 PDB(kGlobal, 1)
Info(
"HandleWorkerList",
"%d workers could not be deactivated", nactnew);
5671 if (nactnew == (nact - nwc)) {
5673 PDB(kGlobal, 1)
Info(
"HandleWorkerList",
"worker(s) %s deactivated", ord.Data());
5676 Error(
"HandleWorkerList",
"some worker(s) could not be deactivated:" 5677 " # of actives: %d --> %d (nwc: %d)",
5678 nact, nactnew, nwc);
5680 rc = (nwc < 0) ? nwc : -1;
5684 PDB(kGlobal, 1)
Info(
"HandleWorkerList",
"all workers are already inactive");
5687 Warning(
"HandleWorkerList",
"undefined PROOF session: protocol error?");
5691 Warning(
"HandleWorkerList",
"unknown action type (%d)", type);
5711 Info(
"GetWorkers",
"using PROOF config file: %s",
fConfFile.Data());
5718 "no appropriate master line found in %s",
fConfFile.Data());
5722 if (
fImage.IsNull() && strlen(master->GetImage()) > 0)
5723 fImage = master->GetImage();
5771 TString lvl =
gEnv->
GetValue(
"Root.ErrorIgnoreLevel",
"Print");
5793 if (level >=
kError && gProofServ)
5798 const char *
type = 0;
5801 Int_t ipos = (location) ? strlen(location) : 0;
5807 if (level >=
kInfo) {
5809 char *ps = location ? (
char *) strrchr(location,
'|') : (
char *)0;
5811 ipos = (int)(ps - (
char *)location);
5827 type =
"*** Break ***";
5845 if (!location || ipos == 0 ||
5850 (gProofServ ? gProofServ->
GetPrefix() :
"proof"),
5857 (gProofServ ? gProofServ->
GetPrefix() :
"proof"),
5858 type, ipos, location, msg);
5868 if (__crashreporter_info__)
5869 delete [] __crashreporter_info__;
5870 __crashreporter_info__ =
StrDup(buf);
5877 if (gProofServ != 0 && !recursive) {
5922 Printf(
" +++ Latest processing times: %f s (CPU: %f s)",
5951 TString sqlserv =
gEnv->
GetValue(
"ProofServ.QueryLogDB",
"");
5952 TString sqluser =
gEnv->
GetValue(
"ProofServ.QueryLogUser",
"");
5953 TString sqlpass =
gEnv->
GetValue(
"ProofServ.QueryLogPasswd",
"");
5955 Int_t priority = 100;
5961 sql.Form(
"SELECT priority WHERE group='%s' FROM proofpriority",
fGroup.Data());
5967 Error(
"GetPriority",
"failed to connect to SQL server %s as %s %s",
5968 sqlserv.Data(), sqluser.Data(), sqlpass.Data());
5969 printf(
"%s\n", sql.Data());
5974 Error(
"GetPriority",
"query into proofpriority failed");
5975 Printf(
"%s", sql.Data());
5982 Error(
"GetPriority",
"first row is header is NULL");
6006 Info(
"SendAsynMessage",
"%s", (msg ? msg :
"(null)"));
6010 m << TString(msg) << lf;
6013 "could not send message '%s'", (msg ? msg :
"(null)"));
6026 off_t lend = lseek(fileno(stdout), (off_t)0, SEEK_END);
6027 if (lend >= 0) lseek(
fLogFileDes, lend, SEEK_SET);
6044 if (truncsz < 100) {
6045 emsg.Form(
"+++ WARNING +++: %s: requested truncate size too small" 6051 while (ftruncate(fileno(stdout), truncsz) != 0 &&
6056 Error(
"TruncateLogFile",
"truncating to %lld bytes; file size is %lld bytes (errno: %d)",
6058 emsg.Form(
"+++ WARNING +++: %s: problems truncating log file to %lld bytes; file size is %lld bytes" 6062 Info(
"TruncateLogFile",
"file truncated to %lld bytes (80%% of %lld); file size was %lld bytes ",
6064 emsg.Form(
"+++ WARNING +++: %s: log file truncated to %lld bytes (80%% of %lld)",
6070 emsg.Form(
"+++ WARNING +++: %s: could not stat log file descriptor" 6083 Error(
"HandleException",
"caugth exception triggered by signal '%d' %s %lld",
6087 emsg.Form(
"%s: caught exception triggered by signal '%d' %s %lld",
6101 Info(
"HandleDataSets",
"enter");
6105 Warning(
"HandleDataSets",
"no data manager is available to fullfil the request");
6110 TString dsUser, dsGroup, dsName, dsTree, uri, opt;
6126 if (slb) slb->Form(
"%d %s", type, uri.Data());
6138 if (slb) slb->Form(
"%d %s %s", type, uri.Data(), opt.Data());
6143 Error(
"HandleDataSets",
"can not save an empty list.");
6151 Info(
"HandleDataSets",
"dataset registration not allowed");
6152 if (slb) slb->Form(
"%d notallowed", type);
6163 Error(
"HandleDataSets",
6164 "no dataset staging request repository available");
6169 TString validUri = uri;
6170 while (reInvalid.
Substitute(validUri,
"_")) {}
6175 Warning(
"HandleDataSets",
"staging of %s already requested",
6183 Error(
"HandleDataSets",
"empty dataset or no dataset returned");
6191 while ((fi = dynamic_cast<TFileInfo *>(it.Next()))) {
6193 Int_t nToErase = fi->GetNUrls() - 1;
6194 for (
Int_t i=0; i<nToErase; i++)
6205 Error(
"HandleDataSets",
6206 "can't register staging request for %s", uri.Data());
6211 Info(
"HandleDataSets",
6212 "Staging request registered for %s", uri.Data());
6222 Error(
"HandleDataSets",
6223 "no dataset staging request repository available");
6241 Info(
"HandleDataSets",
"no pending staging request for %s",
6251 Error(
"HandleDataSets",
6252 "no dataset staging request repository available");
6270 (*mess) >> uri >> opt;
6271 if (slb) slb->Form(
"%d %s %s", type, uri.Data(), opt.Data());
6279 (*mess) >> uri >> opt;
6280 if (slb) slb->Form(
"%d %s %s", type, uri.Data(), opt.Data());
6284 if (kLite !=
kNPOS) {
6286 opt.Remove(kLite, strlen(
":lite:"));
6290 if (returnMap && !opt.IsNull()) {
6295 TIter nxd(returnMap);
6306 Info(
"HandleDataSets",
"no dataset found on server '%s'", opt.Data());
6323 (*mess) >> uri >> opt;
6324 if (slb) slb->Form(
"%d %s %s", type, uri.Data(), opt.Data());
6340 if (slb) slb->Form(
"%d %s", type, uri.Data());
6346 Info(
"HandleDataSets",
"dataset creation / removal not allowed");
6347 if (slb) slb->Form(
"%d notallowed", type);
6355 (*mess) >> uri >> opt;
6356 if (slb) slb->Form(
"%d %s %s", type, uri.Data(), opt.Data());
6366 Info(
"HandleDataSets",
"dataset verification not allowed");
6374 if (slb) slb->Form(
"%d", type);
6376 if (groupQuotaMap) {
6383 Info(
"HandleDataSets",
"quota control disabled");
6384 if (slb) slb->Form(
"%d disabled", type);
6392 if (slb) slb->Form(
"%d", type);
6397 Info(
"HandleDataSets",
"quota control disabled");
6398 if (slb) slb->Form(
"%d disabled", type);
6406 if (slb) slb->Form(
"%d %s", type, uri.Data());
6409 Info(
"HandleDataSets",
"kSetDefaultTreeName: modification of dataset info not allowed");
6410 if (slb) slb->Form(
"%d notallowed", type);
6417 (*mess) >> uri >> opt;
6418 if (slb) slb->Form(
"%d %s %s", type, uri.Data(), opt.Data());
6419 if (opt ==
"show") {
6422 }
else if (opt ==
"clear") {
6426 Error(
"HandleDataSets",
"kCache: unknown action: %s", opt.Data());
6432 Error(
"HandleDataSets",
"unknown type %d", type);
6459 Info(
"HandleSubmerger",
"kSendOutput: interrupting ...");
6470 Int_t merger_id = -1;
6471 (*mess) >> merger_id >> name >> port;
6473 Info(
"HandleSubmerger",
"worker %s redirected to merger #%d %s:%d",
fOrdinal.Data(), merger_id, name.Data(), port);
6476 if (name.Length() > 0 && port > 0 && (t =
new TSocket(name, port)) && t->IsValid()) {
6478 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
6479 "kSendOutput: worker asked for sending output to merger #%d %s:%d",
6480 merger_id, name.Data(), port);
6483 msg.Form(
"worker %s cannot send results to merger #%d at %s:%d",
GetPrefix(), merger_id, name.Data(), port);
6484 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
6485 "kSendOutput: %s - inform the master", msg.Data());
6499 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
"kSendOutput: worker sent its output");
6506 if (name ==
"master") {
6507 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
6508 "kSendOutput: worker was asked for sending output to master");
6510 Warning(
"HandleSubmerger",
"problems sending output list");
6516 }
else if (!t || !(t->IsValid())) {
6517 msg.Form(
"worker %s could not open a valid socket to merger #%d at %s:%d",
6518 GetPrefix(), merger_id, name.Data(), port);
6519 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
6520 "kSendOutput: %s - inform the master", msg.Data());
6535 Error(
"HandleSubmerger",
"kSendOutput: received not on worker");
6546 Int_t merger_id = -1;
6548 Int_t connections = 0;
6549 (*mess) >> merger_id >> connections;
6551 Info(
"HandleSubmerger",
"worker %s established as merger",
fOrdinal.Data());
6554 Info(
"HandleSubmerger",
6555 "kBeMerger: worker asked for being merger #%d for %d connections",
6556 merger_id, connections);
6561 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
6562 "kBeMerger: mergerPlayer created (%p) ", mergerPlayer);
6570 Info(
"HandleSubmerger",
"kBeMerger: all outputs from workers accepted");
6573 Info(
"",
"adding own output to the list on %s",
fOrdinal.Data());
6582 while ((o = nxo())) {
6588 Info(
"HandleSocketInput",
"removing merged object (%p)", o);
6593 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
"kBeMerger: own outputs added");
6594 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
"starting delayed merging on %s",
fOrdinal.Data());
6601 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
"delayed merging on %s finished ",
fOrdinal.Data());
6602 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
"%s sending results to master ",
fOrdinal.Data());
6605 Warning(
"HandleSubmerger",
"kBeMerger: problems sending output list");
6609 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
"kBeMerger: results sent to master");
6626 Warning(
"HandleSubmerger",
"kBeMerger: problems craeting the merger player!");
6635 Error(
"HandleSubmerger",
"kSendOutput: received not on worker");
6649 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
"kStopMerging");
6651 Info(
"HandleSubmerger",
"kStopMerging: interrupting ...");
6667 Info(
"HandleFork",
"fork cloning not implemented");
6681 if ((pid = fork()) < 0) {
6682 Error(
"Fork",
"failed to fork");
6687 if (!pid)
return pid;
6701 Warning(
"Fork",
"Functionality not provided under windows");
6716 if (fname.Contains(
"<user>")) {
6717 if (gProofServ && gProofServ->
GetUser() && strlen(gProofServ->
GetUser())) {
6718 fname.ReplaceAll(
"<user>", gProofServ->
GetUser());
6722 fname.ReplaceAll(
"<user>",
"nouser");
6726 if (fname.Contains(
"<u>")) {
6727 if (gProofServ && gProofServ->
GetUser() && strlen(gProofServ->
GetUser())) {
6728 TString u(gProofServ->
GetUser()[0]);
6729 fname.ReplaceAll(
"<u>", u);
6732 fname.ReplaceAll(
"<u>", u);
6734 fname.ReplaceAll(
"<u>",
"n");
6738 if (fname.Contains(
"<group>")) {
6739 if (gProofServ && gProofServ->
GetGroup() && strlen(gProofServ->
GetGroup())) {
6740 fname.ReplaceAll(
"<group>", gProofServ->
GetGroup());
6744 fname.ReplaceAll(
"<group>",
"default");
6748 if (fname.Contains(
"<stag>")) {
6754 ::Warning(
"TProofServ::ResolveKeywords",
"session tag undefined: ignoring");
6758 if (fname.Contains(
"<ord>")) {
6760 fname.ReplaceAll(
"<ord>", gProofServ->
GetOrdinal());
6762 ::Warning(
"TProofServ::ResolveKeywords",
"ordinal number undefined: ignoring");
6765 if (fname.Contains(
"<qnum>")) {
6769 ::Warning(
"TProofServ::ResolveKeywords",
"query seqeuntial number undefined: ignoring");
6772 if (fname.Contains(
"<file>") && path && strlen(path) > 0) {
6773 fname.ReplaceAll(
"<file>", path);
6776 if (fname.Contains(
"<rver>")) {
6778 fname.ReplaceAll(
"<rver>", v);
6781 if (fname.Contains(
"<build>")) {
6784 fname.ReplaceAll(
"<build>", b);
6799 std::lock_guard<std::recursive_mutex> lock(
fQMtx);
6815 fprintf(fs,
"%d", st);
6818 Info(
"UpdateSessionStatus",
"status (=%d) update in path: %s", st,
fAdminPath.Data());
6831 std::lock_guard<std::recursive_mutex> lock(
fQMtx);
6840 std::lock_guard<std::recursive_mutex> lock(
fQMtx);
6849 std::lock_guard<std::recursive_mutex> lock(
fQMtx);
6859 std::lock_guard<std::recursive_mutex> lock(
fQMtx);
6869 std::lock_guard<std::recursive_mutex> lock(
fQMtx);
6880 std::lock_guard<std::recursive_mutex> lock(
fQMtx);
6893 std::lock_guard<std::recursive_mutex> lock(
fQMtx);
6898 while ((o = nxq())) {
6965 if (!dsrv.EndsWith(
"/")) dsrv +=
"/";
6983 if (!pfx.IsNull() && !strncmp(u.
GetFile(), pfx.Data(), pfx.Length())) {
6985 if (srvp ==
"root" || srvp ==
"xrd") path.Remove(0, pfx.Length());
6999 const char *pname =
GetName();
7002 fLockId =
open(pname, O_CREAT|O_RDWR, 0644);
7004 fLockId =
open(pname, O_RDWR);
7006 if (fLockId == -1) {
7007 SysError(
"Lock",
"cannot open lock file %s", pname);
7014 #if !defined(R__WIN32) && !defined(R__WINGCC) 7015 if (lockf(fLockId, F_LOCK, (off_t) 1) == -1) {
7016 SysError(
"Lock",
"error locking %s", pname);
7041 lseek(fLockId, 0, SEEK_SET);
7042 #if !defined(R__WIN32) && !defined(R__WINGCC) 7043 if (lockf(fLockId, F_ULOCK, (off_t)1) == -1) {
virtual void HandleException(Int_t sig)
Exception handler: we do not try to recover here, just exit.
void SetCompressionSettings(Int_t settings=1)
void IncrementDrawQueries()
TClass * GetClass() const
virtual const char * BaseName(const char *pathname)
Base name of a file name. Base name of /user/root is root.
static void AssertMacroPath(const char *macro)
Make sure that the directory path contained by macro is in the macro path.
Bool_t AcceptResults(Int_t connections, TVirtualProofPlayer *mergerPlayer)
Accept and merge results from a set of workers.
virtual const char * GetName() const
Returns name of object.
virtual void SysError(const char *method, const char *msgfmt,...) const
Issue system error message.
virtual Int_t Write(const char *name=0, Int_t option=0, Int_t bufsize=0)
Write this object to the current directory.
Long64_t GetEntries() const
virtual Bool_t AccessPathName(const char *path, EAccessMode mode=kFileExists)
Returns FALSE if one can access a file using the specified access mode.
static Int_t GetCmdRtn()
Static method to get the return code from the execution of a command via the pipe.
TList * GetListOfLines() const
static FILE * SetErrorHandlerFile(FILE *ferr)
Set the file stream where to log (default stderr).
void Interrupt(EUrgent type, ESlaves list=kActive)
Send interrupt to master or slave servers.
Int_t Compress()
Compress the message.
double read(const std::string &file_name)
reading
ErrorHandlerFunc_t SetErrorHandler(ErrorHandlerFunc_t newhandler)
Set an errorhandler function. Returns the old handler.
TList * GetListOfBadSlaves() const
TMD5 * ReadMD5(const char *pack)
Read MD5 checksum of the PAR file from the PROOF-INF/md5.txt file.
static FILE * fgErrorHandlerFile
Int_t CatMotd()
Print message of the day (in the file pointed by the env PROOFMOTD or from fConfDir/etc/proof/motd).
Bool_t IsIdle()
Return the idle status.
virtual Long_t ProcessLine(const char *line, Bool_t sync=kFALSE, Int_t *error=0)
Process a single command line, either a C++ statement or an interpreter command starting with a "...
virtual TList * GetInputList() const =0
void IncCPUTime(Double_t procTime)
The PROOF package manager contains tools to manage packages.
void GetEnabledPackages(TString &packlist)
Method to get a semi-colon separated list with the names of the enabled packages. ...
virtual void HandleRemove(TMessage *mess, TString *slb=0)
Handle remove request.
virtual int GetPid()
Get process id.
virtual Int_t GetLearnEntries()=0
Int_t UnloadPackages()
Unload all packages.
void GetOptions(Int_t *argc, char **argv)
Get and handle command line options.
TList * GetSubmasters()
Get the list of submaster nodes.
virtual void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
virtual void Syslog(ELogLevel level, const char *mess)
Send mess to syslog daemon.
static Long64_t GetEntries(Bool_t isTree, const char *filename, const char *path, TString &objname)
Returns number of entries in tree or objects in file.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
void AskParallel()
Ask the for the number of parallel slaves.
void LogToMaster(Bool_t on=kTRUE)
Long64_t GetBytesRead() const
static Bool_t GetFileInCmd(const char *cmd, TString &fn)
Static method to extract the filename (if any) form a CINT command.
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
Int_t ClearPackages()
Remove all packages.
TFileCollection * GetFilesOnServer(const char *server)
Return the subset of files served by 'server'.
virtual const char * GetBuildCompilerVersion() const
Return the build compiler version.
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
virtual Int_t ClearCache(const char *uri)
Clear cached information matching uri.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
virtual void AddInput(TObject *inp)=0
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
R__EXTERN Int_t gErrorIgnoreLevel
Bool_t IsWaiting()
Return kTRUE if the session is waiting for the OK to start processing.
R__EXTERN TProofDebug::EProofDebugMask gProofDebugMask
virtual const char * WorkingDirectory()
Return working directory.
virtual void StackTrace()
Print a stack trace.
static TSQLServer * Connect(const char *db, const char *uid, const char *pw)
The db should be of the form: <dbms>://<host>[:<port>][/<database>], e.g.
static TMD5 * FileChecksum(const char *file)
Returns checksum of specified file.
void SetRealTimeLog(Bool_t on=kTRUE)
Switch ON/OFF the real-time logging facility.
void Reset()
Reset the timer.
virtual EQueryAction GetWorkers(TList *workers, Int_t &prioritychange, Bool_t resume=kFALSE)
Get list of workers to be used from now on.
double write(int n, const std::string &file_name, const std::string &vector_type, int compress=0)
writing
static Long_t fgResMemMax
void SetCompressionSettings(Int_t settings=1)
Used to specify the compression level and algorithm: settings = 100 * algorithm + level...
Int_t SetupCommon()
Common part (between TProofServ and TXProofServ) of the setup phase.
virtual TSeqCollection * GetListOfFileHandlers() const
Collectable string class.
virtual TVirtualProofPlayer * MakePlayer(const char *player=0, TSocket *s=0)
Construct a TProofPlayer object.
virtual TSQLResult * Query(const char *sql)=0
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
virtual Int_t SetOption(ESockOptions opt, Int_t val)
Set socket options.
This class represents a WWW compatible URL.
void SetWriteV3(Bool_t on=kTRUE)
Set/Reset the 'OldStreamer' bit in this instance and its elements.
static void FilterLocalroot(TString &path, const char *url="root://dum/")
If 'path' is local and 'dsrv' is Xrootd, apply 'path.Localroot' settings, if any. ...
int GetPathInfo(const char *path, Long_t *id, Long_t *size, Long_t *flags, Long_t *modtime)
Get info about a file: id, size, flags, modification time.
void SetPlayer(TVirtualProofPlayer *player)
Set a new PROOF player.
TObject * GetParameter(const char *par) const
Get specified parameter.
Bool_t TestBit(UInt_t f) const
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
const char * GetProtocol() const
This class implements a data set to be used for PROOF processing.
const char * GetGroup() const
void TerminateWorker(TSlave *wrk)
Ask an active worker 'wrk' to terminate, i.e. to shutdown.
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Int_t HandleDataSets(TMessage *mess, TString *slb=0)
Handle here requests about datasets.
Int_t Unload(const char *pack)
Method to unload a package.
virtual void MakePlayer()
Make player instance.
virtual void HandleProcess(TMessage *mess, TString *slb=0)
Handle processing request.
TVirtualProofPlayer * fPlayer
Bool_t UnlinkDataDir(const char *path)
Scan recursively the datadir and unlink it if empty Return kTRUE if it can be unlinked, kFALSE otherwise.
static void SendAsynMsg(const char *msg)
Int_t GetParPath(const char *pack, TString &path)
Method to get the path of the PAR file for package 'pack'.
virtual Bool_t RemoveDataSet(const char *uri)
Removes the indicated dataset.
Int_t GetNumOfFiles()
Return the number of files in the dataset.
TSocket * GetSocket() const
const char *const kPROOF_WorkDir
Int_t LockSession(const char *sessiontag, TProofLockPath **lck)
Try locking query area of session tagged sessiontag.
virtual EExitStatus GetExitStatus() const =0
virtual Int_t GetEntries() const
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
TList * GetOutputList()
Get list with all object created during processing (see Process()).
static Long_t GetVirtMemMax()
VirtMemMax getter.
TDataSetManager * fDataSetManager
TObject * GetInputObject(const char *classname) const
Return first instance of class 'classname' in the input list.
virtual Int_t GetLocalPort()
Get port # to which server socket is bound. In case of error returns -1.
This class represents an Internet Protocol (IP) address.
virtual int MakeDirectory(const char *name)
Make a directory.
virtual void AddSignalHandler(TSignalHandler *sh)
Add a signal handler to list of system signal handlers.
Int_t DisablePackage(const char *package)
Remove a specific package.
virtual const char * HomeDirectory(const char *userName=0)
Return the user's home directory.
virtual TFileCollection * GetDataSet(const char *uri, const char *server=0)
Utility function used in various methods for user dataset upload.
Int_t WaitingQueries()
Return the number of waiting queries.
Int_t UpdateSessionStatus(Int_t xst=-1)
Update the session status in the relevant file.
virtual Bool_t ChangeDirectory(const char *path)
Change directory.
static TPackMgr * GetPackMgr(const char *pack, TPackMgr *packmgr=nullptr)
Get the package manager having 'pack'; priority is given to packmgr, if defined.
TPluginHandler * FindHandler(const char *base, const char *uri=0)
Returns the handler if there exists a handler for the specified URI.
void IncProcTime(Double_t procTime)
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection)...
R__EXTERN Int_t gErrorAbortLevel
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
virtual Int_t HandleLibIncPath(TMessage *mess)
Handle lib, inc search paths modification request.
Class supporting a collection of lines with C++ code.
TList * GetListOfInactiveSlaves() const
Int_t LoadPlugin()
Load the plugin library for this handler.
TList * GetListOfSlaves() const
virtual int Load(const char *module, const char *entry="", Bool_t system=kFALSE)
Load a shared library.
virtual void SetCurrentQuery(TQueryResult *q)=0
TDataSetManagerFile * fDataSetStgRepo
Int_t Clean(const char *pack)
Clean dir for package 'pack' Return -1 in case of error, 0 otherwise.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void SetQueryRunning(TProofQueryResult *pq)
Set query in running state.
Int_t Load(const char *pack, TList *optls=0)
Method to load a package taking an option list Return -1 on error, 0 otherwise.
virtual const char * DirName(const char *pathname)
Return the directory name in pathname.
TQueryResult * CloneInfo()
Return an instance of TQueryResult containing only the local info fields, i.e.
virtual Int_t SendObject(const TObject *obj, Int_t kind=kMESS_OBJECT)
Send an object.
TReaperTimer * fReaperTimer
virtual char * Which(const char *search, const char *file, EAccessMode mode=kFileExists)
Find location of file in a search path.
Int_t GetProtocol() const
virtual void RemoveAll()
Remove all sockets from the monitor.
virtual void ShowDataSets(const char *uri="*", const char *opt="")
Prints formatted information about the dataset 'uri'.
Option_t * GetOption() const
TProofQueryResult * LocateQuery(TString queryref, Int_t &qry, TString &qdir)
Locate query referenced by queryref.
virtual TMap * GetGroupQuotaMap()
virtual FILE * OpenPipe(const char *command, const char *mode)
Open a pipe.
virtual void Print(Option_t *option="") const
This method must be overridden when a class wants to print itself.
TProofServLogHandlerGuard(const char *cmd, TSocket *s, const char *pfx="", Bool_t on=kTRUE)
Init a guard for executing a command in a pipe.
void Show(const char *title=0)
Show available packages.
virtual void DeActivateAll()
De-activate all activated sockets.
Bool_t IsTopMaster() const
Bool_t UseDynamicStartup() const
static volatile Int_t gProofServDebug
TObject * FindObject(const char *name) const
Find object using its name.
TMonitor * fMergingMonitor
Long64_t GetEntries() const
void ScanPreviousQueries(const char *dir)
Scan the queries directory for the results of previous queries.
Int_t Unpack(const char *pack, TMD5 *sum=0)
Read MD5 checksum of the PAR file from the PROOF-INF/md5.txt file.
Int_t AddIncludePath(const char *incpath, Bool_t onClient=kFALSE, TList *wrks=0, Bool_t doCollect=kTRUE)
Add 'incpath' to the inc path search.
static void ResolveKeywords(TString &fname, const char *path=0)
Replace <ord>, <user>, <u>, <group>, <stag>, <qnum>, <file>, <rver> and <build> placeholders in fname...
virtual void ClearCache(const char *file=0)
Remove file from all file caches.
void Print(Option_t *option="") const
Dump the content.
const char * GetOptions() const
static Float_t GetMemHWM()
MemHWM getter.
Int_t SendCurrentState(ESlaves list=kActive)
Transfer the current state of the master to the active slave servers.
virtual TFileHandler * RemoveFileHandler(TFileHandler *fh)
Remove a file handler from the list of file handlers.
TList * GetListOfElements() const
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
TProofNodeInfo * GetMaster()
Get the master node.
virtual int mkdir(const char *name, Bool_t recursive=kFALSE)
Make a file system directory.
static Long_t fgVirtMemMax
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
virtual void HandleCheckFile(TMessage *mess, TString *slb=0)
Handle file checking request.
void SendParallel(Bool_t async=kFALSE)
Send number of parallel nodes to master or client.
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
virtual void HandleSocketInput()
Handle input coming from the client or from the master server.
std::recursive_mutex fQMtx
static const char * GetMacroPath()
Get macro search path. Static utility function.
static Int_t GetErrno()
Static function returning system error number.
virtual void HandleArchive(TMessage *mess, TString *slb=0)
Handle archive request.
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
virtual void UpdateProgressInfo()=0
Int_t RemoveIncludePath(const char *incpath, Bool_t onClient=kFALSE)
Remove 'incpath' from the inc path search.
virtual Long64_t Process(TDSet *set, const char *selector, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)=0
TTimeStamp GetLastUsage()
const char * GetFile() const
Int_t Install(const char *par, Bool_t rmold=kFALSE)
Install package from par (unpack the file in the directory); par can be an URL for remote retrieval...
Manages an element of a TDSet.
virtual TObject * ReadObject(const TClass *cl)
Read object from I/O buffer.
virtual Int_t SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt=kDefault)
Send a raw buffer of specified length.
static void SetLastEntry(Long64_t lastentry)
Set the last entry before exception.
TList * GetListOfEnabled() const
Get list of enabled packages Returns a pointer to a TList object, transferring ownership to the calle...
static struct mg_connection * fc(struct mg_context *ctx)
Int_t Update(Long64_t avgsize=-1)
Update accumulated information about the elements of the collection (e.g.
virtual const char * GetDirEntry(void *dirp)
Get a directory entry. Returns 0 if no more entries.
virtual TList * GetOutputList() const =0
virtual int Unlink(const char *name)
Unlink, i.e. remove, a file.
Int_t GetQuerySeqNum() const
virtual const char * ClassName() const
Returns name of class to which the object belongs.
void Stop()
Stop the stopwatch.
TList * GetWorkers()
Get the list of worker nodes.
Int_t GetCompressionLevel() const
virtual void Print(Option_t *option="") const
Print status of PROOF cluster.
void StopProcess(Bool_t abort, Int_t timeout=-1)
Send STOPPROCESS message to master and workers.
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString...
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs RAM size, etc into the SysInfo_t s...
virtual FILE * TempFileName(TString &base, const char *dir=0)
Create a secure temporary file by appending a unique 6 letter string to base.
virtual void HandleFork(TMessage *mess)
Cloning itself via fork. Not implemented.
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
virtual void Sleep(UInt_t milliSec)
Sleep milliSec milli seconds.
virtual void MergeOutput(Bool_t=kFALSE)=0
This code implements the MD5 message-digest algorithm.
The TNamed class is the base class for all named ROOT classes.
virtual void ShowQuota(const char *opt)
Display quota information.
void SetLogLevel(Int_t level, UInt_t mask=TProofDebug::kAll)
Set server logging level.
virtual void Run(Bool_t retrn=kFALSE)
Main application eventloop. Calls system dependent eventloop via gSystem.
virtual char * ReadString(char *s, Int_t max)
Read string from I/O buffer.
TList * PreviousQueries() const
const char *const kPROOF_QueryDir
TMacro * GetSelecHdr() const
static Bool_t Initialized()
Return kTRUE if the TROOT object has been initialized.
static Long64_t GetFileBytesRead()
Static function returning the total number of bytes read from all files.
virtual Int_t RegisterDataSet(const char *uri, TFileCollection *dataSet, const char *opt)
Register a dataset, perfoming quota checkings, if needed.
virtual void ShowCache(Bool_t all=kFALSE)
List contents of file cache.
TDSetElement * GetNextPacket(Long64_t totalEntries=-1)
Get next range of entries to be processed on this server.
static EFileType GetType(const char *name, Option_t *option="", TString *prefix=0)
Resolve the file type as a function of the protocol field in 'name'.
Long64_t GetBytes() const
virtual void Start(Long_t milliSec=-1, Bool_t singleShot=kFALSE)
Starts the timer with a milliSec timeout.
Bool_t IsEndMaster() const
void ShowEnabled(const char *title=0)
Show enabled packages.
void DeleteAll()
Remove all (key,value) pairs from the map AND delete the keys AND values when they are allocated on t...
virtual const char * GetDynamicPath()
Return the dynamic path (used to find shared libraries).
const char * GetGroup() const
virtual void HandleUrgentData()
Handle Out-Of-Band data sent by the master or client.
TFileHandler * fInputHandler
Int_t CleanupSession(const char *sessiontag)
Cleanup query dir qdir.
const char * AsString(const Option_t *option="") const
Return the date & time as a string.
virtual const char * Getenv(const char *env)
Get environment variable.
static Int_t RegisterDataSets(TList *in, TList *out, TDataSetManager *dsm, TString &e)
Register TFileCollections in 'out' as datasets according to the rules in 'in'.
Int_t ApplyMaxQueries(Int_t mxq)
Scan the queries directory and remove the oldest ones (and relative dirs, if empty) in such a way onl...
std::vector< std::vector< double > > Data
virtual void ExitLoop()
Exit from event loop.
virtual Bool_t IsValid() const
const char * GetSessionTag() const
virtual UserGroup_t * GetUserInfo(Int_t uid)
Returns all user info in the UserGroup_t structure.
TProofQueryResult * NextQuery()
Get the next query from the waiting list.
void FlushLogFile()
Reposition the read pointer in the log file to the very end.
virtual ~TProofServ()
Cleanup.
static Long_t GetResMemMax()
ResMemMax getter.
virtual TSocket * Accept(UChar_t Opt=0)
Accept a connection on a server socket.
virtual const char * TempDirectory() const
Return a user configured or systemwide directory to create temporary files in.
Book space in a file, create I/O buffers, to fill them, (un)compress them.
virtual Int_t HandleCache(TMessage *mess, TString *slb=0)
Handle here all cache and package requests.
virtual void HandleQueryList(TMessage *mess)
Handle request for list of queries.
A container class for query results.
Int_t ActivateWorker(const char *ord, Bool_t save=kTRUE)
Make sure that the worker identified by the ordinal number 'ord' is in the active list...
const char * GetTitle() const
Returns title of object.
TList * GetListOfSlaveInfos()
Returns list of TSlaveInfo's. In case of error return 0.
const char *const kPROOF_DataDir
TSocket * Select()
Return pointer to socket for which an event is waiting.
virtual void SetProcessInfo(Long64_t ent, Float_t cpu=0., Long64_t siz=-1, Float_t inittime=0., Float_t proctime=0.)
Set processing info.
static TVirtualProofPlayer * Create(const char *player, TProof *p, TSocket *s=0)
Create a PROOF player.
virtual ~TProofServLogHandler()
Handle available message in the open file.
virtual ~TReaperTimer()
Destructor.
const char * GetSessionTag() const
virtual Int_t Fork()
Fork a child.
TList * GetList() const
Get list of available packages Returns a pointer to a TList object, transferring ownership to the cal...
Bool_t Notify()
Check if any of the registered children has changed its state.
TShutdownTimer(TProofServ *p, Int_t delay)
Construtor.
static TProofServ * This()
Static function returning pointer to global object gProofServ.
Bool_t RemoveDataSet(const char *group, const char *user, const char *dsName)
Removes the indicated dataset.
const char * GetName() const
Returns name of object.
virtual Int_t ReadFile(const char *fname, EEnvLevel level)
Read and parse the resource file for a certain level.
const char *const kPROOF_QueryLockFile
static Float_t GetMemStop()
MemStop getter.
void Sort(Bool_t useindex=kFALSE)
Sort the collection.
void Reset()
Reset the message buffer so we can use (i.e. fill) it again.
Int_t UploadPackage(const char *par, EUploadPackageOpt opt=kUntar, TList *workers=0)
Upload a PROOF archive (PAR file).
virtual void AddQueryResult(TQueryResult *q)=0
void SendAsynMessage(const char *msg, Bool_t lf=kTRUE)
Send an asychronous message to the master / client .
Named parameter, streamable and storable.
Int_t GetPriority()
Get the processing priority for the group the user belongs too.
Bool_t IsEndMaster() const
Int_t Build(const char *pack, Int_t opt=TPackMgr::kCheckROOT)
Method to build a package.
Bool_t IsInDir(const char *path)
Method to check if 'path' is in the managed directory Return kTRUE or kFALSE.
virtual const char * GetBuildArch() const
Return the build architecture.
virtual void Close(Option_t *opt="")
Close the socket.
static Int_t fgLogToSysLog
Int_t ReceiveFile(const char *file, Bool_t bin, Long64_t size)
Receive a file, either sent by a client or a master server.
virtual void Setenv(const char *name, const char *value)
Set environment variable.
The purpose of this class is to provide a complete node description for masters, submasters and worke...
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Int_t DisablePackages()
Remove all packages.
Class managing the query-result area.
R__EXTERN TSystem * gSystem
Class providing the PROOF server.
if object ctor succeeded but object should not be used
static void GetLocalServer(TString &dsrv)
Extract LOCALDATASERVER info in 'dsrv'.
const char *const kPROOF_DataSetDir
TApplication * GetTProofServ(Int_t *argc, char **argv, FILE *flog)
TString GetFileName() const
void SetLearnTime(Double_t learnTime)
Long_t ExecPlugin(int nargs, const T &... params)
Long64_t GetNFiles() const
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
void Print(Option_t *option="") const
Prints the contents of the TFileCollection.
virtual TObject * Remove(TObject *obj)
Remove object from the list.
void ShowPackages(Bool_t all=kFALSE, Bool_t redirlog=kFALSE)
List contents of package directory.
void SetPrefix(const char *pfx)
virtual Bool_t ExistsDataSet(const char *uri)
Checks if the indicated dataset exits.
virtual Bool_t InheritsFrom(const char *classname) const
Returns kTRUE if object inherits from class "classname".
Bool_t ParseUri(const char *uri, TString *dsGroup=0, TString *dsUser=0, TString *dsName=0, TString *dsTree=0, Bool_t onlyCurrent=kFALSE, Bool_t wildcards=kFALSE)
Parses a (relative) URI that describes a DataSet on the cluster.
Bool_t Notify()
Handle expiration of the idle timer. The session will just be terminated.
virtual TEnvRec * Lookup(const char *n)
Loop over all resource records and return the one with name.
virtual void SetDynamicPath(const char *pathname)
Set the dynamic path to a new value.
virtual Int_t ShowCache(const char *uri)
Show cached information matching uri.
void SetActive(Bool_t=kTRUE)
Int_t BuildPackage(const char *package, EBuildPackageOpt opt=kBuildAll, Int_t chkveropt=TPackMgr::kCheckROOT, TList *workers=0)
Build specified package.
Int_t WriteDataSet(const char *group, const char *user, const char *dsName, TFileCollection *dataset, UInt_t option=0, TMD5 *checksum=0)
Writes indicated dataset.
Int_t QueueQuery(TProofQueryResult *pq)
Add a query to the waiting list Returns the number of queries in the list.
const char *const kPROOF_CacheLockFile
TIdleTOTimer * fIdleTOTimer
Int_t ScanDataSet(const char *uri, const char *opt)
Scans the dataset indicated by 'uri' following the 'opts' directives.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
virtual int Umask(Int_t mask)
Set the process file creation mode mask.
void SetRunning(Int_t startlog, const char *par, Int_t nwrks)
Call when running starts.
const char * GetConfDir() const
Bool_t IsDataReady(Long64_t &totalbytes, Long64_t &bytesready)
See if the data is ready to be analyzed.
This class implements a plugin library manager.
virtual void SetMerging(Bool_t on=kTRUE)=0
void SaveSource(FILE *fp)
Save macro source in file pointer fp.
virtual Int_t Echo(const TObject *obj)
Sends an object to master and workers and expect them to send back a message with the output of its T...
virtual Int_t ReadFile(const char *filename)
Read lines in filename in this macro.
A TEventList object is a list of selected events (entries) in a TTree.
Int_t fSeqNum
query unique sequential number
virtual void HandleSigPipe()
Called when the client is not alive anymore (i.e.
Handles synchronous and a-synchronous timer events.
virtual const char * GetField(Int_t field)=0
static void AddEnvVar(const char *name, const char *value)
Add an variable to the list of environment variables passed to proofserv on the master and slaves...
TServerSocket * fMergingSocket
Int_t Ping(ESlaves list)
Ping PROOF slaves. Returns the number of slaves that responded.
const Int_t kPROOF_Protocol
virtual Int_t Exec(const char *shellcmd)
Execute a command.
static Bool_t IsActive()
Static function that returns kTRUE in case we are a PROOF server.
virtual TObject * At(Int_t idx) const
Returns the object at position idx. Returns 0 if idx is out of range.
virtual void Abort(int code=0)
Abort the application.
Bool_t FinalizeQuery(TProofQueryResult *pq, TProof *proof, TVirtualProofPlayer *player)
Final steps after Process() to complete the TQueryResult instance.
virtual void ProcessNext(TString *slb=0)
process the next query from the queue of submitted jobs.
Int_t RemoveWorkers(TList *wrks)
Used for shuting down the workres after a query is finished.
void SetName(const char *name)
Int_t DrawQueries() const
Int_t SendFile(const char *file, Int_t opt=(kBinary|kForward|kCp|kCpBin), const char *rfile=0, TSlave *sl=0)
Send a file to master or slave servers.
static void EnableSchemaEvolutionForAll(Bool_t enable=kTRUE)
Static function enabling or disabling the automatic schema evolution.
void Warning(const char *location, const char *msgfmt,...)
TShutdownTimer * fShutdownTimer
Float_t GetUsedCPU() const
virtual void FreeDirectory(void *dirp)
Free a directory.
Int_t SendResults(TSocket *sock, TList *outlist=0, TQueryResult *pq=0)
Sends all objects from the given list to the specified socket.
virtual int ClosePipe(FILE *pipe)
Close the pipe.
virtual void ValidateDSet(TDSet *dset)
Validate a TDSet.
Long64_t GetFirst() const
void AddPid(Int_t pid)
Add an entry for 'pid' in the internal list.
virtual void HandleSubmerger(TMessage *mess)
Handle a message of type kPROOF_SUBMERGER.
char * StrDup(const char *str)
Duplicate the string str.
Int_t RemoveDynamicPath(const char *libpath, Bool_t onClient=kFALSE)
Remove 'libpath' from the lib path search.
virtual void RemoveQueryResult(const char *ref)=0
virtual void HandleRetrieve(TMessage *mess, TString *slb=0)
Handle retrieve request.
Int_t SendCommand(const char *cmd, ESlaves list=kActive)
Send command to be executed on the PROOF master and/or slaves.
virtual Int_t HandleWorkerLists(TMessage *mess)
Handle here all requests to modify worker lists.
virtual Int_t RedirectOutput(const char *name, const char *mode="a", RedirectHandle_t *h=0)
Redirect standard output (stdout, stderr) to the specified file.
Int_t LoadPackage(const char *package, Bool_t notOnClient=kFALSE, TList *loadopts=0, TList *workers=0)
Load specified package.
TObject * Get(const char *namecycle)
Get object with name "name;cycle" (e.g.
TMacro * GetSelecImp() const
virtual int Chmod(const char *file, UInt_t mode)
Set the file permission bits. Returns -1 in case or error, 0 otherwise.
R__EXTERN TProof * gProof
Class used by TMap to store (key,value) pairs.
Int_t OldAuthSetup(TString &wconf)
Setup authentication related stuff for old versions.
virtual const char * GetIncludePath()
Get the list of include path.
Bool_t Notify()
Handle available message in the open file.
virtual void SendLogFile(Int_t status=0, Int_t start=-1, Int_t end=-1)
Send log file to master.
virtual Long64_t GetEventsProcessed() const =0
const char * GetService() const
void SetSysInfo(SysInfo_t si)
Setter for fSysInfo.
TVirtualProofPlayer * GetPlayer() const
virtual Func_t DynFindSymbol(const char *module, const char *entry)
Find specific entry point in specified library.
virtual const char * HostName()
Return the system's host name.
void Reset(const char *dir)
Reset PROOF environment to be ready for execution of next command.
char * DynamicPathName(const char *lib, Bool_t quiet=kFALSE)
Find a dynamic library called lib using the system search paths.
TList * GetListOfDeActives() const
Returns a list with all de-active sockets.
Int_t(* OldProofServAuthSetup_t)(TSocket *, Bool_t, Int_t, TString &, TString &, TString &)
Int_t Lock()
Locks the directory.
static Int_t GetInputData(TList *input, const char *cachedir, TString &emsg)
Get the input data from the file defined in the input list.
void Run(Bool_t retrn=kFALSE)
Main server eventloop.
virtual void SetEntryList(TObject *aList)
Set entry (or event) list for this data set.
Int_t GetParallel() const
Returns number of slaves active in parallel mode.
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
The TTimeStamp encapsulates seconds and ns since EPOCH.
This class controls a Parallel ROOT Facility, PROOF, cluster.
virtual void SetInputList(TList *in, Bool_t adopt=kTRUE)
Set / change the input list.
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
virtual Int_t GetGid(const char *group=0)
Returns the group's id. If group = 0, returns current user's group.
virtual TQueryResult * GetCurrentQuery() const =0
void SetRunStatus(ERunStatus rst)
static Int_t RegisterGlobalPath(const char *paths)
Parse one or more paths as possible sources of packages Returns number of paths added; or -1 in case ...
void RemoveQuery(TQueryResult *qr, Bool_t soft=kFALSE)
Remove everything about query qr.
virtual TInetAddress GetSockName(int sock)
Get Internet Protocol (IP) address of host and port #.
Int_t Unlock()
Unlock the directory.
virtual void StopProcess(Bool_t abort, Int_t timeout=-1)=0
virtual void Clear(Option_t *option="")
Remove all objects from the list.
virtual Int_t GetUid(const char *user=0)
Returns the user's id. If user = 0, returns current user's id.
void SetIdle(Bool_t st=kTRUE)
Change the idle status.
Int_t AddWorkers(TList *wrks)
Works on the master node only.
void SetArchived(const char *archfile)
Set (or update) query in archived state.
Bool_t Notify()
Handle expiration of the shutdown timer.
TQueryResult version adapted to PROOF neeeds.
static TMap * GetDataSetNodeMap(TFileCollection *fc, TString &emsg)
Get a map {server-name, list-of-files} for collection 'fc' to be used in TPacketizerFile.
void SetLogger(TPackMgrLog_t logger)
void SaveQuery(TProofQueryResult *qr, const char *fout=0)
Save current status of query 'qr' to file name fout.
Int_t Match(const TString &s, UInt_t start=0)
Runs a match on s against the regex 'this' was created with.
static TString fgSysLogEntity
Int_t UnloadPackage(const char *package)
Unload specified package.
void Throw(int code)
If an exception context has been set (using the TRY and RETRY macros) jump back to where it was set...
Mother of all ROOT objects.
The purpose of this class is to provide a standard interface to static config files.
void TruncateLogFile()
Truncate the log file to the 80% of the required max size if this is set.
const char * GetImage() const
virtual Bool_t Add(const char *file, const char *objname=0, const char *dir=0, Long64_t first=0, Long64_t num=-1, const char *msd=0)
Add file to list of files to be analyzed.
Bool_t ExistsDataSet(const char *group, const char *user, const char *dsName)
Checks if the indicated dataset exits.
TObject * FindObject(const char *keyname) const
Check if a (key,value) pair exists with keyname as name of the key.
Float_t GetCpuTime() const
Long64_t GetBytesRead() const
virtual void HandleGetTreeHeader(TMessage *mess)=0
virtual void Openlog(const char *name, Int_t options, ELogFacility facility)
Open connection to system log daemon.
void ShowEnabledPackages(Bool_t all=kFALSE)
List which packages are enabled.
Bool_t R_ISDIR(Int_t mode)
const TString & GetWorkDir() const
virtual Int_t CreateServer()
Finalize the server setup.
virtual Long64_t GetCacheSize()=0
virtual void Add(TObject *obj)
Wrapper for PCRE library (Perl Compatible Regular Expressions).
Int_t GetSessionStatus()
Return the status of this session: 0 idle 1 running 2 being terminated (currently unused) 3 queued 4 ...
Class that contains a list of TFileInfo's and accumulated meta data information about its entries...
const char * GetWorkDir() const
virtual void SetOutputFilePath(const char *fp)=0
TFileCollection * GetDataSet(const char *uri, const char *srv=0)
Utility function used in various methods for user dataset upload.
TProofQueryResult * MakeQueryResult(Long64_t nentries, const char *opt, TList *inl, Long64_t first, TDSet *dset, const char *selec, TObject *elist)
Create a TProofQueryResult instance for this query.
virtual void Exit(int code, Bool_t mode=kTRUE)
Exit the application.
Int_t CleanupQueriesDir()
Remove all queries results referring to previous sessions.
R__EXTERN Int_t gProofDebugLevel
void WriteObject(const TObject *obj)
Write object to message buffer.
virtual Int_t Setup()
Print the ProofServ logo on standard output.
TProofServLogHandler(const char *cmd, TSocket *s, const char *pfx="")
Execute 'cmd' in a pipe and handle output messages from the related file.
void AskStatistics()
Ask the for the statistics of the slaves.
virtual int GetProcInfo(ProcInfo_t *info) const
Returns cpu and memory used by this process into the ProcInfo_t structure.
you should not use this method at all Int_t Int_t Double_t Double_t Double_t Int_t Double_t Double_t Double_t Double_t b
static Int_t SaveInputData(TQueryResult *qr, const char *cachedir, TString &emsg)
Save input data file from 'cachedir' into the sandbox or create a the file with input data objects...
Int_t SetParallel(Int_t nodes=-1, Bool_t random=kFALSE)
Tell PROOF how many slaves to use in parallel.
virtual void * OpenDirectory(const char *name)
Open a directory. Returns 0 if directory does not exist.
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Int_t Remove(const char *pack=0, Bool_t dolock=kTRUE)
Remove package 'pack' If 'pack' is null or empty all packages are cleared.
virtual void DeletePlayer()
Delete player instance.
Int_t AddDynamicPath(const char *libpath, Bool_t onClient=kFALSE, TList *wrks=0, Bool_t doCollect=kTRUE)
Add 'libpath' to the lib path search.
virtual TProofProgressStatus * GetProgressStatus() const =0
Bool_t IsParallel() const
void RedirectOutput(const char *dir=0, const char *mode="w")
Redirect stdout to a log file.
virtual void AddIncludePath(const char *includePath)
Add includePath to the already set include path.
const char * GetUser() const
virtual Int_t AddOutputObject(TObject *obj)=0
const AParamType & GetVal() const
virtual void AddFileHandler(TFileHandler *fh)
Add a file handler to the list of system file handlers.
TMD5 * GetMD5(const char *pack)
Get MD5 checksum of the PAR file corresponding to given package Returns a pointer to a TMD5 object...
static void ResetErrno()
Static function resetting system error number.
static void SetLastMsg(const char *lastmsg)
Set the message to be sent back in case of exceptions.
Class describing a generic file including meta information.
static void SetDefaultPrefix(const char *pfx)
Static method to set the default prefix.
This class creates the ROOT Application Environment that interfaces to the windowing system eventloop...
void RestartComputeTime()
Reset the compute time.
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
Abstract interface for the PROOF player.
virtual void Terminate(Int_t status)
Terminate the proof server.
virtual ~TProofServLogHandlerGuard()
Close a guard for executing a command in a pipe.
virtual void Fatal(const char *method, const char *msgfmt,...) const
Issue fatal error message.
const Bool_t kIterBackward
virtual Int_t SavePartialResults(Bool_t queryend=kFALSE, Bool_t force=kFALSE)=0
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
void Print(Option_t *option="") const
Print status of slave server.
void SendStatistics()
Send statistics of slave server to master or client.
virtual Int_t Load(const char *macro, Bool_t notOnClient=kFALSE, Bool_t uniqueOnly=kTRUE, TList *wrks=0)
Load the specified macro on master, workers and, if notOnClient is kFALSE, on the client...
virtual const char * GetName() const
Returns name of object.
virtual Int_t GetSize() const
Int_t Substitute(TString &s, const TString &r, Bool_t doDollarSubst=kTRUE)
Substitute matching part of s with r, dollar back-ref substitution is performed if doDollarSubst is t...
const char * GetDataDir() const
const char * GetUser() const
Container class for processing statistics.
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
static void ErrorHandler(Int_t level, Bool_t abort, const char *location, const char *msg)
The PROOF error handler function.
const char * GetObjName() const
virtual Int_t RecvRaw(void *buffer, Int_t length, ESendRecvOptions opt=kDefault)
Receive a raw buffer of specified length bytes.
Int_t ClearPackage(const char *package)
Remove a specific package.
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
Bool_t IsParallel() const
True if in parallel mode.
Int_t CleanupWaitingQueries(Bool_t del=kTRUE, TList *qls=0)
Cleanup the waiting queries list.
virtual void TurnOn()
Add the timer to the system timer list.
A List of entry numbers in a TTree or TChain.
Int_t DeactivateWorker(const char *ord, Bool_t save=kTRUE)
Remove the worker identified by the ordinal number 'ord' from the the active list.
TList * GetListOfActiveSlaves() const
virtual TMap * GetDataSets(const char *uri, UInt_t=TDataSetManager::kExport)
Returns all datasets for the <group> and <user> specified by <uri>.
static TString fgSysLogService
const char * GetOrdinal() const
TProofServ(Int_t *argc, char **argv, FILE *flog=0)
Main constructor.
TProofLockPath * fCacheLock
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
virtual void SetIncludePath(const char *includePath)
IncludePath should contain the list of compiler flags to indicate where to find user defined header f...
virtual const char * GetTitle() const
Returns title of object.
TQueryResultManager * fQMgr
const char * GetPrefix() const
virtual Long_t ProcessFile(const char *file, Int_t *error=0, Bool_t keep=kFALSE)
Process a file containing a C++ macro.
TProofLockPath * fQueryLock
static Long64_t fgLastEntry
const char *const kPROOF_CacheDir
virtual TSQLRow * Next()=0
const char *const kPROOF_PackDir
static Int_t AssertDataSet(TDSet *dset, TList *input, TDataSetManager *mgr, TString &emsg)
Make sure that dataset is in the form to be processed.