22#include "RConfigure.h"
46#if (defined(__FreeBSD__) && (__FreeBSD__ < 4)) || \
47 (defined(__APPLE__) && (!defined(MAC_OS_X_VERSION_10_3) || \
48 (MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_3)))
50#define lockf(fd, op, sz) flock((fd), (op))
52#define F_LOCK (LOCK_EX | LOCK_NB)
55#define F_ULOCK LOCK_UN
87#include "compiledata.h"
126static const char *__crashreporter_info__ = 0;
127asm(
".desc ___crashreporter_info__, 0x10");
165 Printf(
"Received SIGTERM: terminating");
257 Error(
"TProofServLogHandler",
"executing command in pipe");
261 Error(
"TProofServLogHandler",
262 "undefined command (%p) or socket (%p)", (
int *)cmd, s);
280 Error(
"TProofServLogHandler",
"undefined file (%p) or socket (%p)",
f, s);
293 fgCmdRtn = WIFEXITED(rc) ? WEXITSTATUS(rc) : -1;
311 if ((plf = strchr(
line,
'\n')))
361 Error(
"TProofServLogHandlerGuard",
"invalid handler");
365 Error(
"TProofServLogHandlerGuard",
"undefined command");
381 Error(
"TProofServLogHandlerGuard",
"invalid handler");
385 Error(
"TProofServLogHandlerGuard",
"undefined file");
419 printf(
"TShutdownTimer::Notify: checking activity on the input socket\n");
429 printf(
"TShutdownTimer::Notify: input socket: %p: did not show any activity"
430 " during the last %d mins: aborting\n", xs,
fTimeout);
436 printf(
"TShutdownTimer::Notify: input socket: %p: show activity"
437 " %ld secs ago\n", xs, dt / 60000);
467 spid.
Form(
"%d", pid);
487 pid = waitpid(
p->GetVal(), &status, WNOHANG);
488 }
while (pid < 0 && errno == EINTR);
491 pid = _cwait(&status, (intptr_t)
p->GetVal(), 0);
493 if (pid > 0 && pid ==
p->GetVal()) {
517 Info (
"Notify",
"session idle for more then %lld secs: terminating",
Long64_t(
fTime)/1000);
523 Warning(
"Notify",
"problems updating session status (errno: %d)", -uss_rc);
527 msg.
Form(
"\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n"
528 "// Please IGNORE any error message possibly displayed below\n//",
531 msg.
Form(
"\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n//",
539 Warning(
"Notify",
"fProofServ undefined!");
566 Printf(
"proofserv: command line testing: OK");
578 if (fgVirtMemMax < 0 && gSystem->Getenv(
"PROOF_VIRTMEMMAX")) {
580 if (mmx < kMaxLong && mmx > 0)
584 if (fgVirtMemMax < 0 && gSystem->Getenv(
"ROOTPROOFASHARD")) {
586 if (mmx < kMaxLong && mmx > 0)
591 if (fgResMemMax < 0 && gSystem->Getenv(
"PROOF_RESMEMMAX")) {
593 if (mmx < kMaxLong && mmx > 0)
601 Warning(
"TProofServ",
"requested memory fraction threshold to stop processing"
602 " (MemStop) out of range [0,1] - ignoring");
606 Warning(
"TProofServ",
"requested memory fraction threshold for warning and finer monitoring"
607 " (MemHWM) out of range [0,MemStop] - ignoring");
614 if ((
gEnv->
GetValue(
"Proof.GdbHook",0) == 3 && !test) ||
621 if (argc && *argc >= 4)
622 if (!strcmp(argv[3],
"test"))
626 if (argc && *argc < 2) {
627 Error(
"TProofServ",
"Must have at least 1 arguments (see proofd).");
735 Warning(
"TProofServ",
"bad formatted log file size limit ignored: '%s'", logmx.
Data());
755 char c = (slog[0] ==
'M' || slog[0] ==
'm') ?
'm' :
'a';
756 c = (slog[0] ==
'W' || slog[0] ==
'w') ?
'w' :
c;
763 Warning(
"TProofServ",
"request for syslog logging ineffective!");
778 if (enableSchemaEvolution) {
781 Info(
"TProofServ",
"automatic schema evolution in TMessage explicitly disabled");
794 if (opensock.
Length() <= 0)
798 Fatal(
"CreateServer",
"Invalid socket descriptor number (%d)", sock);
822 Info(
"CreateServer",
"Service %s ConfDir %s IsMaster %d\n",
909 TString master =
"proof://__master__";
913 master +=
a.GetPort();
919 Error(
"CreateServer",
"no plugin manager found");
928 Error(
"CreateServer",
"no plugin found for TProof with a"
936 if (
h->LoadPlugin() == -1) {
937 Error(
"CreateServer",
"plugin for TProof could not be loaded");
949 Error(
"CreateServer",
"plugin for TProof could not be executed");
972 msg.
Form(
"Warning: client version is too old: automatic schema evolution is ineffective.\n"
973 " This may generate compatibility problems between streamed objects.\n"
974 " The advise is to move to ROOT >= 5.21/02 .");
986 Info(
"CreateServer",
" idle timer started (%d secs)", idle_to);
988 Info(
"CreateServer",
" idle timer not started (no idle timeout requested)");
1032 motdname +=
"/etc/proof/noproof";
1034 if ((motd = fopen(motdname,
"r"))) {
1037 while ((
c = getc(motd)) != EOF)
1049 Long_t id, flags, modtime, lasttime = 0;
1054 if (time(0) - lasttime > (time_t)86400)
1063 motdname +=
"/etc/proof/motd";
1066 if (modtime > lasttime || show) {
1067 if ((motd = fopen(motdname,
"r"))) {
1070 while ((
c = getc(motd)) != EOF)
1080 Int_t fd = creat(lastname.
Data(), 0600);
1081 if (fd >= 0) close(fd);
1094 Error(
"Get",
"problems sending request");
1112 Error(
"Get",
"command %d cannot be executed while processing",
what);
1113 }
else if (xrc == -2) {
1114 Error(
"Get",
"unknown command %d ! Protocol error?",
what);
1132 Info(
"RestartComputeTime",
"compute time restarted after %f secs (%d entries)",
1161 Error(
"GetNextPacket",
"no progress status object");
1178 req << cacheSize << learnent;
1183 req << totalEntries;
1190 Info(
"GetNextPacket",
"cacheSize: %lld, learnent: %d", cacheSize, learnent);
1198 << bytesRead << totalEntries;
1206 Error(
"GetNextPacket",
"Send() failed, returned %d", rc);
1214 Warning(
"GetNextPacket",
"problems saving partial results");
1225 Error(
"GetNextPacket",
"Recv() failed, returned %d", rc);
1241 PDB(kLoop, 2)
Info(
"GetNextPacket",
"'%s' '%s' '%s' %lld %lld",
1242 e->GetFileName(),
e->GetDirectory(),
1243 e->GetObjName(),
e->GetFirst(),
e->GetNum());
1245 PDB(kLoop, 2)
Info(
"GetNextPacket",
"Done");
1255 PDB(kLoop, 2)
Info(
"GetNextPacket:kPROOF_STOPPROCESS",
"received");
1261 Error(
"GetNextPacket",
"command %d cannot be executed while processing",
what);
1262 }
else if (xrc == -2) {
1263 Error(
"GetNextPacket",
"unknown command %d ! Protocol error?",
what);
1282 Bool_t xtest = (argc && *argc > 3 && !strcmp(argv[3],
"test")) ?
kTRUE :
kFALSE;
1285 if (xtest && !(isatty(0) == 0 || isatty(1) == 0)) {
1286 Printf(
"proofserv: command line testing: OK");
1290 if (!argc || (argc && *argc <= 1)) {
1291 Fatal(
"GetOptions",
"Must be started from proofd with arguments");
1295 if (!strcmp(argv[1],
"proofserv")) {
1298 }
else if (!strcmp(argv[1],
"proofslave")) {
1302 Fatal(
"GetOptions",
"Must be started as 'proofserv' or 'proofslave'");
1310 Fatal(
"GetOptions",
"ROOTCONFDIR shell variable not set");
1341 Error(
"HandleSocketInput",
"retrieving message from input socket");
1362 emsg.
Form(
"HandleSocketInput: command %d cannot be executed while processing",
what);
1363 }
else if (rc == -3) {
1364 emsg.
Form(
"HandleSocketInput: message %d undefined! Protocol error?",
what);
1366 emsg.
Form(
"HandleSocketInput: unknown command %d! Protocol error?",
what);
1369 }
else if (rc == 2) {
1373 Info(
"HandleSocketInput",
"message of type %d enqueued; sz: %d",
1383 Info(
"HandleSocketInput",
"processing enqueued message of type %d; left: %d",
1393 }
catch (std::bad_alloc &) {
1395 exmsg.
Form(
"caught exception 'bad_alloc' (memory leak?) %s %lld",
1397 }
catch (std::exception &exc) {
1399 exmsg.
Form(
"caught standard exception '%s' %s %lld",
1403 exmsg.
Form(
"caught exception throwing %d %s %lld",
1405 }
catch (
const char *str) {
1407 exmsg.
Form(
"caught exception throwing '%s' %s %lld",
1411 exmsg.
Form(
"caught exception <unknown> %s %lld",
1418 Error(
"HandleSocketInput",
"%s", exmsg.
Data());
1429 exmsg.
Form(
"high-memory footprint detected during Process(...) - terminating");
1430 Error(
"HandleSocketInput",
"%s", exmsg.
Data());
1445 if (rc == 0 && ngwrks == 0 && !masterOnly && !dynamicStartup) {
1446 SendAsynMessage(
" *** No workers left: cannot continue! Terminating ... *** ");
1473 if (!mess)
return -3;
1477 Info(
"HandleSocketInput",
"processing message type %d from '%s'",
1482 Int_t rc = 0, lirc = 0;
1500 Info(
"HandleSocketInput:kMESS_CINT",
"processing: %s...", str);
1519 if (pslb) slb = str;
1556 Info(
"HandleSocketInput:kPROOF_LOGLEVEL",
"debug level set to %d (mask: 0x%x)",
1587 Warning(
"HandleSocketInput:kPROOF_STATUS",
1588 "kPROOF_STATUS message is obsolete");
1590 Warning(
"HandleSocketInput:kPROOF_STATUS",
"problem sending of request");
1607 Info(
"HandleSocketInput:kPROOF_STOP",
"request for worker %s", ord.
Data());
1611 Info(
"HandleSocketInput:kPROOF_STOP",
"got request to terminate");
1624 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_STOPPROCESS",
"enter");
1631 Info(
"HandleSocketInput:kPROOF_STOPPROCESS",
1632 "recursive mode: enter %d, %ld", aborted, timeout);
1646 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_PROCESS",
"enter");
1656 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_SENDOUTPUT",
1657 "worker was asked to send output to master");
1660 Error(
"HandleSocketInput:kPROOF_SENDOUTPUT",
"problems sending output list");
1705 Info(
"HandleSocketInput:kPROOF_MAXQUERIES",
"Enter");
1717 Info(
"HandleSocketInput:kPROOF_CLEANUPSESSION",
"Enter");
1721 Printf(
"Session %s cleaned up", stag.
Data());
1723 Printf(
"Could not cleanup session %s", stag.
Data());
1733 {
PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETENTRIES",
"Enter");
1741 (*mess) >> isTree >>
filename >> dir >> objname;
1742 PDB(kGlobal, 2)
Info(
"HandleSocketInput:kPROOF_GETENTRIES",
1743 "Report size of object %s (%s) in dir %s in file %s",
1744 objname.Data(), isTree ?
"T" :
"O",
1747 PDB(kGlobal, 2)
Info(
"HandleSocketInput:kPROOF_GETENTRIES",
1748 "Found %lld %s", entries, isTree ?
"entries" :
"objects");
1753 answ << entries << objname;
1756 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETENTRIES",
"Done");
1781 sscanf(str,
"%1023s %d %ld %d",
name, &bin, &
size, &fw);
1783 sscanf(str,
"%1023s %d %ld",
name, &bin, &
size);
1810 Info(
"HandleSocketInput",
"forwarding file: %s", fnam.
Data());
1811 if (
fProof->
SendFile(fnam, opt, (copytocache ?
"cache" :
"")) < 0) {
1812 Error(
"HandleSocketInput",
"forwarding file: %s", fnam.
Data());
1826 (*mess) >> start >> end;
1828 Info(
"HandleSocketInput:kPROOF_LOGFILE",
1829 "Logfile request - byte range: %d - %d", start, end);
1860 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_CACHE",
"enter");
1873 Warning(
"HandleSocketInput:kPROOF_WORKERLISTS",
1874 "Action meaning-less on worker nodes: protocol error?");
1885 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETSLAVEINFO",
"Enter");
1899 Error(
"HandleSocketInput:kPROOF_GETSLAVEINFO",
1900 "adding a list of worker nodes returned: %d", ret);
1903 Error(
"HandleSocketInput:kPROOF_GETSLAVEINFO",
1904 "getting list of worker nodes returned: %d", retVal);
1924 answ << (
TList *)info;
1930 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETSLAVEINFO",
"Done");
1941 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETTREEHEADER",
"Enter");
1945 p->HandleGetTreeHeader(mess);
1948 Error(
"HandleSocketInput:kPROOF_GETTREEHEADER",
"could not create TProofPlayer instance!");
1951 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETTREEHEADER",
"Done");
1961 {
PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETOUTPUTLIST",
"Enter");
1962 TList* outputList = 0;
1966 outputList =
new TList();
1968 outputList =
new TList();
1973 while ( (o = next()) ) {
1983 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETOUTPUTLIST",
"Done");
1990 Info(
"HandleSocketInput:kPROOF_VALIDATE_DSET",
"Enter");
2003 Info(
"HandleSocketInput:kPROOF_VALIDATE_DSET",
"Done");
2013 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_DATA_READY",
"Enter");
2016 Long64_t totalbytes = 0, bytesready = 0;
2018 answ << dataready << totalbytes << bytesready;
2020 Error(
"HandleSocketInput:kPROOF_DATA_READY",
2021 "This message should not be sent to slaves");
2025 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_DATA_READY",
"Done");
2041 Error(
"HandleSocketInput",
"old client: no or incompatible dataset support");
2066 Info(
"HandleSocketInput:kPROOF_REALTIMELOG",
2067 "setting real-time logging %s", (
on ?
"ON" :
"OFF"));
2090 Error(
"HandleSocketInput",
"no queries enqueued");
2103 Error(
"HandleSocketInput",
"adding a list of worker nodes returned: %d", ret);
2116 Error(
"HandleSocketInput",
"error getting list of worker nodes");
2118 Warning(
"HandleSocketInput",
"query was re-queued!");
2120 Error(
"HandleSocketInput",
"unexpected answer: %d", retVal);
2141 " idle or undefined player - ignoring");
2167 smsg.
Form(
"Echo response from %s:%s: %s",
2178 Error(
"HandleSocketInput",
"Can't redirect output");
2193 smsg.
Form(
"*** Echo response from %s:%s ***\n",
2216 Error(
"HandleSocketInput",
"unknown command %d",
what);
2241 Int_t mergedWorkers = 0;
2243 PDB(kSubmerger, 1)
Info(
"AcceptResults",
"enter");
2251 Int_t numworkers = 0;
2256 Info(
"AcceptResults",
"interrupt!");
2264 if (sw && sw != (
TSocket *)(-1)) {
2268 Info(
"AcceptResults",
"connection from a worker accepted on merger %s ",
2271 if (++numworkers >= connections)
2275 Info(
"AcceptResults",
"spurious signal found of merging socket");
2278 if (s->
Recv(mess) < 0) {
2279 Error(
"AcceptResults",
"problems receiving message");
2283 Info(
"AcceptResults",
"message received: %d ", (mess ? mess->
What() : 0));
2285 Error(
"AcceptResults",
"message received: %p ", mess);
2294 PDB(kSubmerger, 2)
Info(
"AcceptResults",
" type %d ",
type);
2298 Info(
"AcceptResults",
2299 "a new worker has been mergerd. Total merged workers: %d",
2305 PDB(kSubmerger, 2)
Info(
"AcceptResults",
"removing %p (has been merged)", o);
2308 PDB(kSubmerger, 2)
Info(
"AcceptResults",
"%p not merged yet", o);
2318 PDB(kSubmerger, 2)
Info(
"AcceptResults",
"closing socket");
2326 PDB(kSubmerger, 2)
Info(
"AcceptResults",
"exit: %d",
result);
2336 Int_t n, nch, wasted = 0;
2338 const Int_t kBufSize = 1024;
2339 char waste[kBufSize];
2345 Info(
"HandleUrgentData",
"handling oob...");
2366 if (nch > kBufSize) nch = kBufSize;
2369 Error(
"HandleUrgentData",
"error receiving waste");
2374 Error(
"HandleUrgentData",
"error receiving OOB");
2380 Info(
"HandleUrgentData",
"got OOB byte: %d\n", oob_byte);
2387 Info(
"HandleUrgentData",
"*** Hard Interrupt");
2404 Error(
"HandleUrgentData",
"error sending OOB");
2415 if (nch > kBufSize) nch = kBufSize;
2418 Error(
"HandleUrgentData",
"error receiving waste (2)");
2428 Info(
"HandleUrgentData",
"Soft Interrupt");
2435 Error(
"HandleUrgentData",
"soft interrupt flushed stream");
2446 Info(
"HandleUrgentData",
"Shutdown Interrupt");
2457 Error(
"HandleUrgentData",
"unexpected OOB byte");
2477 Info(
"HandleSigPipe",
"keepAlive probe failed");
2486 Info(
"HandleSigPipe",
"keepAlive probe failed");
2529 if ((freopen(logfile,
mode, stdout)) == 0)
2530 SysError(
"RedirectOutput",
"could not freopen stdout (%s)", logfile);
2532 if ((dup2(fileno(stdout), fileno(stderr))) < 0)
2533 SysError(
"RedirectOutput",
"could not redirect stderr");
2535 if ((
fLogFile = fopen(logfile,
"r")) == 0)
2536 SysError(
"RedirectOutput",
"could not open logfile '%s'", logfile);
2540 Warning(
"RedirectOutput",
"no way to tell master (or client) where"
2541 " to upload packages");
2556 dd.
Replace(0, ic,
"proofserv");
2580 if (
size <= 0)
return 0;
2583 Int_t fd = open(file, O_CREAT | O_TRUNC | O_WRONLY, 0600);
2585 SysError(
"ReceiveFile",
"error opening file %s", file);
2589 const Int_t kMAXBUF = 16384;
2590 char buf[kMAXBUF], cpy[kMAXBUF];
2595 while (filesize <
size) {
2608 Int_t k = 0, i = 0, j = 0;
2615 cpy[j++] = buf[i++];
2619 w = write(fd,
q,
r);
2621 w = write(fd,
p,
r);
2625 SysError(
"ReceiveFile",
"error writing to file %s", file);
2633 Error(
"ReceiveFile",
"error during receiving file %s", file);
2641 if (chmod(file, 0644) != 0)
2642 Warning(
"ReceiveFile",
"error setting mode 0644 on file %s", file);
2681 off_t ltot=0, lnow=0;
2686 ltot = lseek(fileno(stdout), (off_t) 0, SEEK_END);
2689 if (ltot >= 0 && lnow >= 0) {
2692 if (end <= start || end > ltot)
2694 left = (
Int_t)(end - start);
2699 left = (
Int_t)(ltot - lnow);
2706 SysError(
"SendLogFile",
"error sending kPROOF_LOGFILE");
2710 const Int_t kMAXBUF = 32768;
2712 Int_t wanted = (left > kMAXBUF) ? kMAXBUF : left;
2720 SysError(
"SendLogFile",
"error reading log file");
2724 if (end == ltot &&
len == wanted)
2728 SysError(
"SendLogFile",
"error sending log file");
2734 wanted = (left > kMAXBUF) ? kMAXBUF : left;
2736 }
while (
len > 0 && left > 0);
2740 if (adhoc && lnow >=0 )
2747 mess << status << (
Int_t) 1;
2750 SysError(
"SendLogFile",
"error sending kPROOF_LOGDONE");
2754 PDB(kGlobal, 1)
Info(
"SendLogFile",
"kPROOF_LOGDONE sent");
2771 mess << bytesread << realtime << cputime << workdir;
2782 Int_t nparallel = 0;
2785 Info(
"SendParallel",
"Will invoke AskParallel()");
2788 Info(
"SendParallel",
"Will invoke GetParallel()");
2795 mess << nparallel << async;
2814 Error(
"Setup",
"failed to send proof server startup message");
2822 Error(
"Setup",
"failed to receive remote proof protocol");
2826 Error(
"Setup",
"failed to send local proof protocol");
2834 Error(
"Setup",
"OldAuthSetup: failed to setup authentication");
2853 Error(
"Setup",
"failed to receive ordinal and config info");
2882 if (tmpWorkDir !=
"")
2886 Info(
"Setup",
"invalid config file %s (missing or unreadable",
2932 Error(
"Setup",
"common setup failed");
2965 if (paths.
Length() > 0) {
2969 else if (paths.
Contains(
"<compiler>"))
2978 if (!bindir.
IsNull()) bindir +=
":";
2980 }
else if (icomp == -1) {
2981 if (!path.
IsNull()) path +=
":";
2989 else if (paths.
Contains(
"<sysbin>"))
2993 if (!bindir.
IsNull()) bindir +=
":";
2994 bindir +=
"/bin:/usr/bin:/usr/local/bin";
2995 }
else if (isysb == -1) {
2996 if (!path.
IsNull()) path +=
":";
2997 path +=
"/bin:/usr/bin:/usr/local/bin";
3002 if (!bindir.
IsNull()) bindir +=
":";
3010 Error(
"SetupCommon",
"can not change to PROOF directory %s",
3019 Error(
"SetupCommon",
"can not change to PROOF directory %s",
3054 const char *k = (
IsMaster()) ?
"Mst" :
"Wrk";
3058 Info(
"SetupCommon",
"package directory set to %s", packdir.
Data());
3073 Warning(
"SetupCommon",
"problems creating path '%s' (errno: %d)",
3081 if (!dataDirOpts.
IsNull()) {
3089 if (dataDirOpts.
Contains(
"g"))
m = 0775;
3092 Info(
"SetupCommon",
"requested mode for data directories is '%o'",
m);
3099 if (subp.
IsNull())
continue;
3104 Warning(
"SetupCommon",
"problems setting mode '%o' on path '%s' (errno: %d)",
3111 Warning(
"SetupCommon",
"problems stat-ing path '%s' (errno: %d; datadir: %s)",
3124 Info(
"SetupCommon",
" %d global package directories registered", nglb);
3133 Error(
"SetupCommon",
"can not change to working directory '%s'",
3186 while (dsms.
Tokenize(dsm, from,
",")) {
3188 Warning(
"SetupCommon",
"a valid dataset manager already initialized");
3189 Warning(
"SetupCommon",
"support for multiple managers not yet available");
3193 if (
gROOT->GetPluginManager()) {
3195 h =
gROOT->GetPluginManager()->FindHandler(
"TDataSetManager", dsm);
3196 if (
h &&
h->LoadPlugin() != -1) {
3206 Warning(
"SetupCommon",
"dataset manager plug-in initialization failed");
3207 SendAsynMessage(
"TXProofServ::SetupCommon: dataset manager plug-in initialization failed");
3223 h =
gROOT->GetPluginManager()->FindHandler(
"TDataSetManager",
"file");
3224 if (
h &&
h->LoadPlugin() == -1)
h = 0;
3233 Warning(
"SetupCommon",
"default dataset manager plug-in initialization failed");
3239 if (!dsReqCfg.
IsNull()) {
3240 TPMERegexp reReqDir(
"(^| )(dir:)?([^ ]+)( |$)");
3242 if (reReqDir.
Match(dsReqCfg) == 5) {
3244 dsDirFmt.
Form(
"dir:%s perms:open", reReqDir[3].Data());
3250 "failed init of dataset staging requests repository");
3255 "specify, with [dir:]<path>, a valid path for staging requests");
3258 Warning(
"SetupCommon",
"no repository for staging requests available");
3272 while (quotas.
Tokenize(tok, from,
" ")) {
3280 "parsing 'maxquerykept' :ignoring token %s : not a digit", tok.
Data());
3283 const char *ksz[2] = {
"hwmsz=",
"maxsz="};
3284 for (
Int_t j = 0; j < 2; j++) {
3291 const char *s[3] = {
"k",
"m",
"g"};
3292 Int_t i = 0, ki = 1024;
3307 TString ssz(ksz[j], strlen(ksz[j])-1);
3308 Info(
"SetupCommon",
"parsing '%s' : ignoring token %s", ssz.
Data(), tok.
Data());
3318 Warning(
"SetupCommon",
"problems applying fMaxQueries");
3338 if (!
name.IsNull()) {
3360 Info(
"SetupCommon",
"successfully completed");
3380 Info(
"Terminate",
"process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
3414 Info(
"Terminate",
"data directory '%s' has been removed",
fDataDir.
Data());
3421 while ((fh = next())) {
3439 if (!path || strlen(path) <= 0)
return kFALSE;
3445 const char *ent = 0;
3447 if (!strcmp(ent,
".") || !strcmp(ent,
".."))
continue;
3448 fpath.
Form(
"%s/%s", path, ent);
3465 Warning(
"UnlinkDataDir",
"data directory '%s' is empty but could not be removed", path);
3496 if (!oldAuthSetupHook) {
3498 TString authlib =
"libRootAuth";
3504 Error(
"OldAuthSetup",
"can't load %s",authlib.
Data());
3508 Error(
"OldAuthSetup",
"can't locate %s",authlib.
Data());
3517 Error(
"OldAuthSetup",
"can't find OldProofServAuthSetup");
3533 TDSet *dset,
const char *selec,
3550 fst, dset, selec, elist);
3568 Int_t startlog = lseek(fileno(stdout), (off_t) 0, SEEK_END);
3572 Info(
"SetQueryRunning",
"starting query: %d", pq->
GetSeqNum());
3600 Info(
"HandleArchive",
"Enter");
3604 (*mess) >> queryref >> path;
3606 if (slb) slb->
Form(
"%s %s", queryref.
Data(), path.
Data());
3609 if (queryref ==
"Default") {
3611 Info(
"HandleArchive",
3621 if (path.
Length() <= 0) {
3623 Info(
"HandleArchive",
3624 "archive paths are not defined - do nothing");
3628 path.
Form(
"%s/session-%s-%d.root",
3639 if (!pqr || qry < 0) {
3641 fout +=
"/query-result.root";
3647 TIter nxk(
f->GetListOfKeys());
3649 while ((k = (
TKey *)nxk())) {
3659 Info(
"HandleArchive",
3660 "file cannot be open (%s)",fout.
Data());
3667 PDB(kGlobal, 1)
Info(
"HandleArchive",
3668 "archive path for query #%d: %s",
3675 if (!farc || !(farc->
IsOpen())) {
3676 Info(
"HandleArchive",
3677 "archive file cannot be open (%s)",path.
Data());
3691 if (qry > -1 &&
fQMgr)
3695 Info(
"HandleArchive",
3696 "results of query %s archived to file %s",
3716 emsg.
Form(
"file collection undefined!");
3759 Info(
"HandleProcess",
"Enter");
3784 if ((!hasNoData) && elist)
3795 Error(
"HandleProcess",
"AssertDataSet: %s", emsg.
Data());
3800 }
else if (hasNoData) {
3802 TNamed *ftp =
dynamic_cast<TNamed *
>(
input->FindObject(
"PROOF_FilesToProcess"));
3810 emsg.
Form(
"dataset manager not initialized!");
3815 emsg.
Form(
"requested dataset '%s' does not exists", dsn.
Data());
3822 fcmap->
SetName(
"PROOF_FilesToProcess");
3830 Error(
"HandleProcess",
"%s", emsg.
Data());
3846 if (dset)
input->Add(dset);
3847 if (elist)
input->Add(elist);
3851 input->Clear(
"nodelete");
3856 Warning(
"HandleProcess",
"could not save input data: %s", emsg.
Data());
3882 Error(
"HandleProcess",
"error getting list of worker nodes");
3889 Info(
"HandleProcess",
"query %d enqueued", pq->
GetSeqNum());
3893 Error(
"HandleProcess",
"Adding a list of worker nodes returned: %d",
3903 Error(
"HandleProcess",
"error getting list of worker nodes");
3910 Info(
"HandleProcess",
"query %d enqueued", pq->
GetSeqNum());
3912 Error(
"HandleProcess",
"unknown return value: %d", retVal);
3923 if (!sync || enqueued) {
3931 Info(
"HandleProcess",
3998 Warning(
"HandleProcess",
"could not get input data: %s", emsg.
Data());
4002 Warning(
"HandleProcess",
"could not get query sequential number!");
4006 while ((nord =
input->FindObject(
"PROOF_Ordinal")))
4007 input->Remove(nord);
4013 while ((o = next())) {
4014 PDB(kGlobal, 2)
Info(
"HandleProcess",
"adding: %s", o->
GetName());
4022 while ((obj = nxt())){
4026 Info(
"HandleProcess",
"selector obj for '%s' found", selector_obj->
ClassName());
4042 Info(
"HandleProcess",
"calling fPlayer->Process() with selector object: %s", selector_obj->
ClassName());
4046 Info(
"HandleProcess",
"calling fPlayer->Process() with selector name: %s",
filename.Data());
4058 m << status << abort;
4071 Info(
"TProofServ::Handleprocess",
4072 "worker %s has finished processing with %d objects in output list",
4092 if (!isSubMerging) {
4112 Info(
"HandleProcess",
"controlled mode: worker %s has finished,"
4119 Info(
"HandleProcess",
"submerging disabled because of high-memory case");
4122 PDB(kGlobal, 2)
Info(
"HandleProcess",
"merging mode check: %d", isSubMerging);
4138 Int_t merge_port = 0;
4141 Info(
"HandleProcess",
"possible port for merging connections: %d",
4145 msg_osize << merge_port;
4159 PDB(kGlobal, 2)
Info(
"HandleProcess",
"sending result directly to master");
4161 Warning(
"HandleProcess",
"problems sending output list");
4183 Warning(
"HandleProcess",
"the output list is empty!");
4185 Warning(
"HandleProcess",
"problems sending output list");
4202 while ((obj = nex())) {
4210 TList *added =
dynamic_cast<TList *
>(
input->FindObject(
"PROOF_InputObjsFromFile"));
4218 while ((o = nxo())) {
input->Remove(o); }
4219 input->Remove(added);
4235 PDB(kGlobal, 1)
Info(
"HandleProcess",
"done");
4246 PDB(kOutput, 2)
Info(
"SendResults",
"enter");
4257 msg.
Form(
"%s: merging output objects ... done ",
4261 msg.
Form(
"%s: objects merged; sending output: %d objs",
fPrefix.
Data(), olsz);
4266 if (sock->
Send(mbuf) < 0)
return -1;
4272 Int_t totsz = 0, objsz = 0;
4274 while ((o = nxo())) {
4278 "message has %d bytes: limit of %lld bytes reached - sending ...",
4291 msg.
Form(
"%s: objects merged; sending obj %d/%d (%d bytes) ",
4295 if (sock->
Send(mbuf) < 0)
return -1;
4302 mbuf << (
Int_t) ((ns >= olsz) ? 2 : 1);
4317 msg.
Form(
"%s: objects merged; sending obj %d/%d (%d bytes) ",
4321 if (sock->
Send(mbuf) < 0)
return -1;
4325 msg.
Form(
"%s: grand total: sent %d objects, size: %d bytes ",
4337 msg.
Form(
"%s: merging output objects ... done ",
4341 msg.
Form(
"%s: objects merged; sending output: %d objs",
fPrefix.
Data(), olsz);
4346 if (sock->
Send(mbuf) < 0)
return -1;
4350 Int_t totsz = 0, objsz = 0;
4353 while ((o = nxo())) {
4370 msg.
Form(
"%s: objects merged; sending obj %d/%d (%d bytes) ",
4374 if (sock->
Send(mbuf) < 0)
return -1;
4379 msg.
Form(
"%s: grand total: sent %d objects, size: %d bytes ",
4393 msg.
Form(
"%s: sending output: %d objs, %d bytes",
fPrefix.
Data(), olsz, blen);
4395 if (sock->
Send(mbuf) < 0)
return -1;
4399 PDB(kGlobal, 2)
Info(
"SendResults",
"sending output list");
4401 PDB(kGlobal, 2)
Info(
"SendResults",
"notifying failure or abort");
4406 PDB(kOutput,2)
Info(
"SendResults",
"done");
4458 Error(
"ProcessNext",
"no TDset object: cannot continue");
4480 while ((obj = nxt())){
4484 Info(
"ProcessNext",
"found object for selector '%s'", obj->
ClassName());
4491 Error(
"ProcessNext",
"empty waiting queries list!");
4537 if (
gEnv->
Lookup(
"Proof.UseMergers") && !
input->FindObject(
"PROOF_UseMergers")) {
4541 PDB(kSubmerger, 2)
Info(
"ProcessNext",
"PROOF_UseMergers set to %d", smg);
4547 if ((o =
input->FindObject(
"PROOF_MergersByHost"))) {
input->Remove(o);
delete o; }
4549 PDB(kSubmerger, 2)
Info(
"ProcessNext",
"submergers setup by host/node");
4558 while ((o = next())) {
4564 if ((o =
input->FindObject(
"MissingFiles")))
input->Remove(o);
4569 Info(
"ProcessNext",
"calling fPlayer->Process() with selector object: %s", selector_obj->
ClassName());
4573 Info(
"ProcessNext",
"calling fPlayer->Process() with selector name: %s",
filename.Data());
4588 m << status << abort;
4604 Warning(
"ProcessNext",
"problems registering produced datasets: %s", emsg.
Data());
4625 while ((xo = nxo())) {