22 #include "RConfigure.h" 35 #include <sys/types.h> 48 #if (defined(__FreeBSD__) && (__FreeBSD__ < 4)) || \ 49 (defined(__APPLE__) && (!defined(MAC_OS_X_VERSION_10_3) || \ 50 (MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_3))) 52 #define lockf(fd, op, sz) flock((fd), (op)) 54 #define F_LOCK (LOCK_EX | LOCK_NB) 57 #define F_ULOCK LOCK_UN 88 #include "compiledata.h" 131 static const char *__crashreporter_info__ = 0;
132 asm(
".desc ___crashreporter_info__, 0x10");
168 Bool_t TProofServTerminationHandler::Notify()
170 Printf(
"Received SIGTERM: terminating");
171 fServ->HandleTermination();
189 Bool_t TProofServInterruptHandler::Notify()
191 fServ->HandleUrgentData();
212 Bool_t TProofServSigPipeHandler::Notify()
214 fServ->HandleSigPipe();
227 Bool_t ReadNotify() {
return Notify(); }
233 Bool_t TProofServInputHandler::Notify()
235 fServ->HandleSocketInput();
262 Error(
"TProofServLogHandler",
"executing command in pipe");
266 Error(
"TProofServLogHandler",
267 "undefined command (%p) or socket (%p)", (
int *)cmd, s);
285 Error(
"TProofServLogHandler",
"undefined file (%p) or socket (%p)", f, s);
298 fgCmdRtn = WIFEXITED(rc) ? WEXITSTATUS(rc) : -1;
315 while (fgets(line,
sizeof(line),
fFile)) {
316 if ((plf = strchr(line,
'\n')))
320 if (
fPfx.Length() > 0) {
322 log.Form(
"%s: %s",
fPfx.Data(),
line);
323 }
else if (
fgPfx.Length() > 0) {
358 const char *pfx,
Bool_t on)
363 if (fExecHandler->IsValid()) {
366 Error(
"TProofServLogHandlerGuard",
"invalid handler");
370 Error(
"TProofServLogHandlerGuard",
"undefined command");
378 const char *pfx,
Bool_t on)
383 if (fExecHandler->IsValid()) {
386 Error(
"TProofServLogHandlerGuard",
"invalid handler");
390 Error(
"TProofServLogHandlerGuard",
"undefined file");
399 if (fExecHandler && fExecHandler->IsValid()) {
424 printf(
"TShutdownTimer::Notify: checking activity on the input socket\n");
434 printf(
"TShutdownTimer::Notify: input socket: %p: did not show any activity" 435 " during the last %d mins: aborting\n", xs,
fTimeout);
441 printf(
"TShutdownTimer::Notify: input socket: %p: show activity" 442 " %ld secs ago\n", xs, dt / 60000);
457 fChildren->SetOwner(
kTRUE);
470 fChildren =
new TList;
472 spid.Form(
"%d", pid);
485 TIter nxp(fChildren);
492 pid = waitpid(p->
GetVal(), &status, WNOHANG);
493 }
while (pid < 0 && errno == EINTR);
496 pid = _cwait(&status, (intptr_t)p->
GetVal(), 0);
498 if (pid > 0 && pid == p->
GetVal()) {
500 fChildren->Remove(p);
507 if (!fChildren || fChildren->GetSize() <= 0) {
522 Info (
"Notify",
"session idle for more then %lld secs: terminating",
Long64_t(
fTime)/1000);
528 Warning(
"Notify",
"problems updating session status (errno: %d)", -uss_rc);
532 msg.Form(
"\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n" 533 "// Please IGNORE any error message possibly displayed below\n//",
536 msg.Form(
"\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n//",
544 Warning(
"Notify",
"fProofServ undefined!");
571 Printf(
"proofserv: command line testing: OK");
583 if (fgVirtMemMax < 0 && gSystem->Getenv(
"PROOF_VIRTMEMMAX")) {
585 if (mmx < kMaxLong && mmx > 0)
589 if (fgVirtMemMax < 0 && gSystem->Getenv(
"ROOTPROOFASHARD")) {
591 if (mmx < kMaxLong && mmx > 0)
596 if (fgResMemMax < 0 && gSystem->Getenv(
"PROOF_RESMEMMAX")) {
598 if (mmx < kMaxLong && mmx > 0)
606 Warning(
"TProofServ",
"requested memory fraction threshold to stop processing" 607 " (MemStop) out of range [0,1] - ignoring");
611 Warning(
"TProofServ",
"requested memory fraction threshold for warning and finer monitoring" 612 " (MemHWM) out of range [0,MemStop] - ignoring");
619 if ((
gEnv->
GetValue(
"Proof.GdbHook",0) == 3 && !test) ||
626 if (argc && *argc >= 4)
627 if (!strcmp(argv[3],
"test"))
631 if (argc && *argc < 2) {
632 Error(
"TProofServ",
"Must have at least 1 arguments (see proofd).");
719 TString logmx =
gEnv->
GetValue(
"ProofServ.LogFileMaxSize",
"");
720 if (!logmx.IsNull()) {
722 if (!logmx.IsDigit()) {
723 if (logmx.EndsWith(
"K")) {
726 }
else if (logmx.EndsWith(
"M")) {
729 }
if (logmx.EndsWith(
"G")) {
734 if (logmx.IsDigit()) {
740 Warning(
"TProofServ",
"bad formatted log file size limit ignored: '%s'", logmx.Data());
755 TString slog =
gEnv->
GetValue(
"ProofServ.LogToSysLog",
"");
756 if (!(slog.IsNull())) {
757 if (slog.IsDigit()) {
760 char c = (slog[0] ==
'M' || slog[0] ==
'm') ?
'm' :
'a';
761 c = (slog[0] ==
'W' || slog[0] ==
'w') ?
'w' : c;
768 Warning(
"TProofServ",
"request for syslog logging ineffective!");
783 if (enableSchemaEvolution) {
786 Info(
"TProofServ",
"automatic schema evolution in TMessage explicitly disabled");
799 if (opensock.Length() <= 0)
801 Int_t sock = opensock.Atoi();
803 Fatal(
"CreateServer",
"Invalid socket descriptor number (%d)", sock);
827 Info(
"CreateServer",
"Service %s ConfDir %s IsMaster %d\n",
841 TString pfx = (
IsMaster() ?
"Mst-" :
"Wrk-");
914 TString master =
"proof://__master__";
924 Error(
"CreateServer",
"no plugin manager found");
933 Error(
"CreateServer",
"no plugin found for TProof with a" 934 " config file of '%s'",
fConfFile.Data());
942 Error(
"CreateServer",
"plugin for TProof could not be loaded");
954 Error(
"CreateServer",
"plugin for TProof could not be executed");
977 msg.Form(
"Warning: client version is too old: automatic schema evolution is ineffective.\n" 978 " This may generate compatibility problems between streamed objects.\n" 979 " The advise is to move to ROOT >= 5.21/02 .");
991 Info(
"CreateServer",
" idle timer started (%d secs)", idle_to);
993 Info(
"CreateServer",
" idle timer not started (no idle timeout requested)");
1037 motdname +=
"/etc/proof/noproof";
1039 if ((motd = fopen(motdname,
"r"))) {
1042 while ((c = getc(motd)) != EOF)
1051 lastname = TString(
GetWorkDir()) +
"/.prooflast";
1054 Long_t id, flags, modtime, lasttime = 0;
1059 if (time(0) - lasttime > (time_t)86400)
1068 motdname +=
"/etc/proof/motd";
1071 if (modtime > lasttime || show) {
1072 if ((motd = fopen(motdname,
"r"))) {
1075 while ((c = getc(motd)) != EOF)
1085 Int_t fd = creat(last, 0600);
1086 if (fd >= 0)
close(fd);
1100 Error(
"Get",
"problems sending request");
1118 Error(
"Get",
"command %d cannot be executed while processing", what);
1119 }
else if (xrc == -2) {
1120 Error(
"Get",
"unknown command %d ! Protocol error?", what);
1138 Info(
"RestartComputeTime",
"compute time restarted after %f secs (%d entries)",
1167 Error(
"GetNextPacket",
"no progress status object");
1184 req << cacheSize << learnent;
1189 req << totalEntries;
1195 PDB(kLoop, 2) status->Print();
1196 Info(
"GetNextPacket",
"cacheSize: %lld, learnent: %d", cacheSize, learnent);
1204 << bytesRead << totalEntries;
1212 Error(
"GetNextPacket",
"Send() failed, returned %d", rc);
1220 Warning(
"GetNextPacket",
"problems saving partial results");
1231 Error(
"GetNextPacket",
"Recv() failed, returned %d", rc);
1236 TString
file, dir, obj;
1247 PDB(kLoop, 2)
Info(
"GetNextPacket",
"'%s' '%s' '%s' %lld %lld",
1248 e->GetFileName(), e->GetDirectory(),
1249 e->GetObjName(), e->GetFirst(),e->GetNum());
1251 PDB(kLoop, 2)
Info(
"GetNextPacket",
"Done");
1261 PDB(kLoop, 2)
Info(
"GetNextPacket:kPROOF_STOPPROCESS",
"received");
1267 Error(
"GetNextPacket",
"command %d cannot be executed while processing", what);
1268 }
else if (xrc == -2) {
1269 Error(
"GetNextPacket",
"unknown command %d ! Protocol error?", what);
1288 Bool_t xtest = (argc && *argc > 3 && !strcmp(argv[3],
"test")) ?
kTRUE :
kFALSE;
1291 if (xtest && !(isatty(0) == 0 || isatty(1) == 0)) {
1292 Printf(
"proofserv: command line testing: OK");
1296 if (!argc || (argc && *argc <= 1)) {
1297 Fatal(
"GetOptions",
"Must be started from proofd with arguments");
1301 if (!strcmp(argv[1],
"proofserv")) {
1304 }
else if (!strcmp(argv[1],
"proofslave")) {
1308 Fatal(
"GetOptions",
"Must be started as 'proofserv' or 'proofslave'");
1316 Fatal(
"GetOptions",
"ROOTCONFDIR shell variable not set");
1347 Error(
"HandleSocketInput",
"retrieving message from input socket");
1368 emsg.Form(
"HandleSocketInput: command %d cannot be executed while processing", what);
1369 }
else if (rc == -3) {
1370 emsg.Form(
"HandleSocketInput: message %d undefined! Protocol error?", what);
1372 emsg.Form(
"HandleSocketInput: unknown command %d! Protocol error?", what);
1375 }
else if (rc == 2) {
1379 Info(
"HandleSocketInput",
"message of type %d enqueued; sz: %d",
1389 Info(
"HandleSocketInput",
"processing enqueued message of type %d; left: %d",
1399 }
catch (std::bad_alloc &) {
1401 exmsg.Form(
"caught exception 'bad_alloc' (memory leak?) %s %lld",
1403 }
catch (std::exception &exc) {
1405 exmsg.Form(
"caught standard exception '%s' %s %lld",
1409 exmsg.Form(
"caught exception throwing %d %s %lld",
1411 }
catch (
const char *str) {
1413 exmsg.Form(
"caught exception throwing '%s' %s %lld",
1417 exmsg.Form(
"caught exception <unknown> %s %lld",
1422 if (!exmsg.IsNull()) {
1424 Error(
"HandleSocketInput",
"%s", exmsg.Data());
1435 exmsg.Form(
"high-memory footprint detected during Process(...) - terminating");
1436 Error(
"HandleSocketInput",
"%s", exmsg.Data());
1451 if (rc == 0 && ngwrks == 0 && !masterOnly && !dynamicStartup) {
1452 SendAsynMessage(
" *** No workers left: cannot continue! Terminating ... *** ");
1479 if (!mess)
return -3;
1483 Info(
"HandleSocketInput",
"processing message type %d from '%s'",
1488 Int_t rc = 0, lirc = 0;
1506 Info(
"HandleSocketInput:kMESS_CINT",
"processing: %s...", str);
1525 if (pslb) slb = str;
1557 sscanf(str,
"%d %u", &
fLogLevel, &mask);
1562 Info(
"HandleSocketInput:kPROOF_LOGLEVEL",
"debug level set to %d (mask: 0x%x)",
1593 Warning(
"HandleSocketInput:kPROOF_STATUS",
1594 "kPROOF_STATUS message is obsolete");
1596 Warning(
"HandleSocketInput:kPROOF_STATUS",
"problem sending of request");
1613 Info(
"HandleSocketInput:kPROOF_STOP",
"request for worker %s", ord.Data());
1617 Info(
"HandleSocketInput:kPROOF_STOP",
"got request to terminate");
1630 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_STOPPROCESS",
"enter");
1637 Info(
"HandleSocketInput:kPROOF_STOPPROCESS",
1638 "recursive mode: enter %d, %ld", aborted, timeout);
1652 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_PROCESS",
"enter");
1662 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_SENDOUTPUT",
1663 "worker was asked to send output to master");
1666 Error(
"HandleSocketInput:kPROOF_SENDOUTPUT",
"problems sending output list");
1711 Info(
"HandleSocketInput:kPROOF_MAXQUERIES",
"Enter");
1723 Info(
"HandleSocketInput:kPROOF_CLEANUPSESSION",
"Enter");
1727 Printf(
"Session %s cleaned up", stag.Data());
1729 Printf(
"Could not cleanup session %s", stag.Data());
1739 {
PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETENTRIES",
"Enter");
1743 TString objname(
"undef");
1747 (*mess) >> isTree >> filename >> dir >> objname;
1748 PDB(kGlobal, 2)
Info(
"HandleSocketInput:kPROOF_GETENTRIES",
1749 "Report size of object %s (%s) in dir %s in file %s",
1750 objname.Data(), isTree ?
"T" :
"O",
1751 dir.Data(), filename.Data());
1753 PDB(kGlobal, 2)
Info(
"HandleSocketInput:kPROOF_GETENTRIES",
1754 "Found %lld %s", entries, isTree ?
"entries" :
"objects");
1759 answ << entries << objname;
1762 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETENTRIES",
"Done");
1787 sscanf(str,
"%1023s %d %ld %d", name, &bin, &size, &fw);
1789 sscanf(str,
"%1023s %d %ld", name, &bin, &size);
1793 if (fnam.BeginsWith(
"cache:")) {
1803 if (!fnam.BeginsWith(
fCacheDir.Data())) {
1816 Info(
"HandleSocketInput",
"forwarding file: %s", fnam.Data());
1817 if (
fProof->
SendFile(fnam, opt, (copytocache ?
"cache" :
"")) < 0) {
1818 Error(
"HandleSocketInput",
"forwarding file: %s", fnam.Data());
1832 (*mess) >> start >> end;
1834 Info(
"HandleSocketInput:kPROOF_LOGFILE",
1835 "Logfile request - byte range: %d - %d", start, end);
1866 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_CACHE",
"enter");
1879 Warning(
"HandleSocketInput:kPROOF_WORKERLISTS",
1880 "Action meaning-less on worker nodes: protocol error?");
1891 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETSLAVEINFO",
"Enter");
1904 Error(
"HandleSocketInput:kPROOF_GETSLAVEINFO",
1905 "adding a list of worker nodes returned: %d", ret);
1908 Error(
"HandleSocketInput:kPROOF_GETSLAVEINFO",
1909 "getting list of worker nodes returned: %d", retVal);
1929 answ << (
TList *)info;
1935 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETSLAVEINFO",
"Done");
1946 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETTREEHEADER",
"Enter");
1953 Error(
"HandleSocketInput:kPROOF_GETTREEHEADER",
"could not create TProofPlayer instance!");
1956 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETTREEHEADER",
"Done");
1959 answ << TString(
"Failed") << (
TObject *)0;
1966 {
PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETOUTPUTLIST",
"Enter");
1967 TList* outputList = 0;
1971 outputList =
new TList();
1973 outputList =
new TList();
1978 while ( (o = next()) ) {
1988 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETOUTPUTLIST",
"Done");
1995 Info(
"HandleSocketInput:kPROOF_VALIDATE_DSET",
"Enter");
2001 else dset->Validate();
2008 Info(
"HandleSocketInput:kPROOF_VALIDATE_DSET",
"Done");
2018 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_DATA_READY",
"Enter");
2021 Long64_t totalbytes = 0, bytesready = 0;
2023 answ << dataready << totalbytes << bytesready;
2025 Error(
"HandleSocketInput:kPROOF_DATA_READY",
2026 "This message should not be sent to slaves");
2030 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_DATA_READY",
"Done");
2046 Error(
"HandleSocketInput",
"old client: no or incompatible dataset support");
2071 Info(
"HandleSocketInput:kPROOF_REALTIMELOG",
2072 "setting real-time logging %s", (on ?
"ON" :
"OFF"));
2095 Error(
"HandleSocketInput",
"no queries enqueued");
2108 Error(
"HandleSocketInput",
"adding a list of worker nodes returned: %d", ret);
2121 Error(
"HandleSocketInput",
"error getting list of worker nodes");
2123 Warning(
"HandleSocketInput",
"query was re-queued!");
2125 Error(
"HandleSocketInput",
"unexpected answer: %d", retVal);
2146 " idle or undefined player - ignoring");
2172 smsg.Form(
"Echo response from %s:%s: %s",
2180 TString tmpfn =
"echo-out-";
2183 Error(
"HandleSocketInput",
"Can't redirect output");
2198 smsg.Form(
"*** Echo response from %s:%s ***\n",
2204 while (( line = (
TObjString *)nextLine() )) {
2205 smsg.Append( line->String() );
2221 Error(
"HandleSocketInput",
"unknown command %d", what);
2246 Int_t mergedWorkers = 0;
2248 PDB(kSubmerger, 1)
Info(
"AcceptResults",
"enter");
2256 Int_t numworkers = 0;
2261 Info(
"AcceptResults",
"interrupt!");
2269 if (sw && sw != (
TSocket *)(-1)) {
2273 Info(
"AcceptResults",
"connection from a worker accepted on merger %s ",
2276 if (++numworkers >= connections)
2280 Info(
"AcceptResults",
"spurious signal found of merging socket");
2283 if (s->
Recv(mess) < 0) {
2284 Error(
"AcceptResults",
"problems receiving message");
2288 Info(
"AcceptResults",
"message received: %d ", (mess ? mess->
What() : 0));
2290 Error(
"AcceptResults",
"message received: %p ", mess);
2299 PDB(kSubmerger, 2)
Info(
"AcceptResults",
" type %d ", type);
2303 Info(
"AcceptResults",
2304 "a new worker has been mergerd. Total merged workers: %d",
2310 PDB(kSubmerger, 2)
Info(
"AcceptResults",
"removing %p (has been merged)", o);
2313 PDB(kSubmerger, 2)
Info(
"AcceptResults",
"%p not merged yet", o);
2321 for (
Int_t i =0; i< size; ++i){
2323 PDB(kSubmerger, 2)
Info(
"AcceptResults",
"closing socket");
2330 PDB(kSubmerger, 2)
Info(
"AcceptResults",
"exit: %d", result);
2340 Int_t n, nch, wasted = 0;
2342 const Int_t kBufSize = 1024;
2343 char waste[kBufSize];
2349 Info(
"HandleUrgentData",
"handling oob...");
2370 if (nch > kBufSize) nch = kBufSize;
2373 Error(
"HandleUrgentData",
"error receiving waste");
2378 Error(
"HandleUrgentData",
"error receiving OOB");
2384 Info(
"HandleUrgentData",
"got OOB byte: %d\n", oob_byte);
2391 Info(
"HandleUrgentData",
"*** Hard Interrupt");
2408 Error(
"HandleUrgentData",
"error sending OOB");
2419 if (nch > kBufSize) nch = kBufSize;
2422 Error(
"HandleUrgentData",
"error receiving waste (2)");
2432 Info(
"HandleUrgentData",
"Soft Interrupt");
2439 Error(
"HandleUrgentData",
"soft interrupt flushed stream");
2450 Info(
"HandleUrgentData",
"Shutdown Interrupt");
2461 Error(
"HandleUrgentData",
"unexpected OOB byte");
2481 Info(
"HandleSigPipe",
"keepAlive probe failed");
2490 Info(
"HandleSigPipe",
"keepAlive probe failed");
2526 TString sdir = (dir && strlen(dir) > 0) ? dir :
fSessionDir.Data();
2533 if ((freopen(logfile, mode, stdout)) == 0)
2534 SysError(
"RedirectOutput",
"could not freopen stdout (%s)", logfile);
2536 if ((dup2(fileno(stdout), fileno(stderr))) < 0)
2537 SysError(
"RedirectOutput",
"could not redirect stderr");
2539 if ((
fLogFile = fopen(logfile,
"r")) == 0)
2540 SysError(
"RedirectOutput",
"could not open logfile '%s'", logfile);
2544 Warning(
"RedirectOutput",
"no way to tell master (or client) where" 2545 " to upload packages");
2557 if (!dd.BeginsWith(
"proofserv")) {
2558 Int_t ic = dd.Index(
":");
2560 dd.Replace(0, ic,
"proofserv");
2584 if (size <= 0)
return 0;
2587 Int_t fd =
open(file, O_CREAT | O_TRUNC | O_WRONLY, 0600);
2589 SysError(
"ReceiveFile",
"error opening file %s", file);
2593 const Int_t kMAXBUF = 16384;
2594 char buf[kMAXBUF], cpy[kMAXBUF];
2599 while (filesize < size) {
2600 left =
Int_t(size - filesize);
2612 Int_t k = 0, i = 0, j = 0;
2619 cpy[j++] = buf[i++];
2623 w = write(fd, q, r);
2625 w = write(fd, p, r);
2629 SysError(
"ReceiveFile",
"error writing to file %s", file);
2637 Error(
"ReceiveFile",
"error during receiving file %s", file);
2645 if (chmod(file, 0644) != 0)
2646 Warning(
"ReceiveFile",
"error setting mode 0644 on file %s", file);
2685 off_t ltot=0, lnow=0;
2690 ltot = lseek(fileno(stdout), (off_t) 0, SEEK_END);
2693 if (ltot >= 0 && lnow >= 0) {
2696 if (end <= start || end > ltot)
2698 left = (
Int_t)(end - start);
2703 left = (
Int_t)(ltot - lnow);
2710 SysError(
"SendLogFile",
"error sending kPROOF_LOGFILE");
2714 const Int_t kMAXBUF = 32768;
2716 Int_t wanted = (left > kMAXBUF) ? kMAXBUF : left;
2719 while ((len = read(
fLogFileDes, buf, wanted)) < 0 &&
2724 SysError(
"SendLogFile",
"error reading log file");
2728 if (end == ltot && len == wanted)
2732 SysError(
"SendLogFile",
"error sending log file");
2738 wanted = (left > kMAXBUF) ? kMAXBUF : left;
2740 }
while (len > 0 && left > 0);
2744 if (adhoc && lnow >=0 )
2751 mess << status << (
Int_t) 1;
2754 SysError(
"SendLogFile",
"error sending kPROOF_LOGDONE");
2758 PDB(kGlobal, 1)
Info(
"SendLogFile",
"kPROOF_LOGDONE sent");
2775 mess << bytesread << realtime << cputime << workdir;
2777 mess << TString(gProofServ->
GetImage());
2786 Int_t nparallel = 0;
2789 Info(
"SendParallel",
"Will invoke AskParallel()");
2792 Info(
"SendParallel",
"Will invoke GetParallel()");
2799 mess << nparallel << async;
2818 Error(
"Setup",
"failed to send proof server startup message");
2826 Error(
"Setup",
"failed to receive remote proof protocol");
2830 Error(
"Setup",
"failed to send local proof protocol");
2838 Error(
"Setup",
"OldAuthSetup: failed to setup authentication");
2857 Error(
"Setup",
"failed to receive ordinal and config info");
2865 if (fWorkDir.IsNull())
2879 conffile.Remove(0, 1 + conffile.Index(
":"));
2886 if (tmpWorkDir !=
"")
2890 Info(
"Setup",
"invalid config file %s (missing or unreadable",
2916 Info(
"Setup",
"working directory set to %s",
fWorkDir.Data());
2920 if (host.Index(
".") !=
kNPOS)
2921 host.Remove(host.Index(
"."));
2938 Error(
"Setup",
"common setup failed");
2970 TString paths =
gEnv->
GetValue(
"ProofServ.BinPaths",
"");
2971 if (paths.Length() > 0) {
2973 if (paths.Contains(
"^<compiler>"))
2975 else if (paths.Contains(
"<compiler>"))
2979 TString compiler = COMPILER;
2980 if (compiler.Index(
"is ") !=
kNPOS)
2981 compiler.Remove(0, compiler.Index(
"is ") + 3);
2984 if (!bindir.IsNull()) bindir +=
":";
2986 }
else if (icomp == -1) {
2987 if (!path.IsNull()) path +=
":";
2993 if (paths.Contains(
"^<sysbin>"))
2995 else if (paths.Contains(
"<sysbin>"))
2999 if (!bindir.IsNull()) bindir +=
":";
3000 bindir +=
"/bin:/usr/bin:/usr/local/bin";
3001 }
else if (isysb == -1) {
3002 if (!path.IsNull()) path +=
":";
3003 path +=
"/bin:/usr/bin:/usr/local/bin";
3008 if (!bindir.IsNull()) bindir +=
":";
3009 path.Insert(0, bindir);
3016 Error(
"SetupCommon",
"can not change to PROOF directory %s",
3025 Error(
"SetupCommon",
"can not change to PROOF directory %s",
3042 Info(
"SetupCommon",
"cache directory set to %s",
fCacheDir.Data());
3046 TString(
fCacheDir).ReplaceAll(
"/",
"%").Data()));
3051 TString packdir =
gEnv->
GetValue(
"ProofServ.PackageDir",
3060 const char *k = (
IsMaster()) ?
"Mst" :
"Wrk";
3061 noth.Form(
"%s-%s", k,
fOrdinal.Data());
3064 Info(
"SetupCommon",
"package directory set to %s", packdir.Data());
3069 if (fDataDir.IsNull()) {
3072 }
else if ((isep = fDataDir.Last(
' ')) !=
kNPOS) {
3074 fDataDir.Remove(isep);
3079 Warning(
"SetupCommon",
"problems creating path '%s' (errno: %d)",
3083 Info(
"SetupCommon",
"data directory set to %s", fDataDir.Data());
3087 TString dataDirOpts =
gEnv->
GetValue(
"ProofServ.DataDirOpts",
"");
3088 if (!dataDirOpts.IsNull()) {
3091 if ((
IsMaster() && !dataDirOpts.Contains(
"M")) ||
3096 if (dataDirOpts.Contains(
"g")) m = 0775;
3097 if (dataDirOpts.Contains(
"a") || dataDirOpts.Contains(
"o")) m = 0777;
3099 Info(
"SetupCommon",
"requested mode for data directories is '%o'", m);
3104 if (fDataDir.BeginsWith(
"/")) p =
"/";
3105 while (fDataDir.Tokenize(subp, from,
"/")) {
3106 if (subp.IsNull())
continue;
3111 Warning(
"SetupCommon",
"problems setting mode '%o' on path '%s' (errno: %d)",
3118 Warning(
"SetupCommon",
"problems stat-ing path '%s' (errno: %d; datadir: %s)",
3127 TString globpack =
gEnv->
GetValue(
"Proof.GlobalPackageDirs",
"");
3131 Info(
"SetupCommon",
" %d global package directories registered", nglb);
3140 Error(
"SetupCommon",
"can not change to working directory '%s'",
3169 TString(
fQueryDir).ReplaceAll(
"/",
"%").Data()));
3189 TString dsms =
gEnv->
GetValue(
"Proof.DataSetManager",
"");
3190 if (!dsms.IsNull()) {
3193 while (dsms.Tokenize(dsm, from,
",")) {
3195 Warning(
"SetupCommon",
"a valid dataset manager already initialized");
3196 Warning(
"SetupCommon",
"support for multiple managers not yet available");
3200 if (
gROOT->GetPluginManager()) {
3202 h =
gROOT->GetPluginManager()->FindHandler(
"TDataSetManager", dsm);
3207 fUser.Data(), dsm.Data()));
3213 Warning(
"SetupCommon",
"dataset manager plug-in initialization failed");
3214 SendAsynMessage(
"TXProofServ::SetupCommon: dataset manager plug-in initialization failed");
3219 TString opts(
"Av:");
3220 TString dsetdir =
gEnv->
GetValue(
"ProofServ.DataSetDir",
"");
3221 if (dsetdir.IsNull()) {
3230 h =
gROOT->GetPluginManager()->FindHandler(
"TDataSetManager",
"file");
3235 TString oo =
TString::Format(
"dir:%s opt:%s", dsetdir.Data(), opts.Data());
3237 fGroup.Data(), fUser.Data(), oo.Data()));
3240 Warning(
"SetupCommon",
"default dataset manager plug-in initialization failed");
3245 TString dsReqCfg =
gEnv->
GetValue(
"Proof.DataSetStagingRequests",
"");
3246 if (!dsReqCfg.IsNull()) {
3247 TPMERegexp reReqDir(
"(^| )(dir:)?([^ ]+)( |$)");
3249 if (reReqDir.
Match(dsReqCfg) == 5) {
3251 dsDirFmt.Form(
"dir:%s perms:open", reReqDir[3].Data());
3257 "failed init of dataset staging requests repository");
3262 "specify, with [dir:]<path>, a valid path for staging requests");
3265 Warning(
"SetupCommon",
"no repository for staging requests available");
3271 if (quotas.IsNull())
3273 if (quotas.IsNull())
3275 if (!quotas.IsNull()) {
3279 while (quotas.Tokenize(tok, from,
" ")) {
3281 if (tok.BeginsWith(
"maxquerykept=")) {
3282 tok.ReplaceAll(
"maxquerykept=",
"");
3287 "parsing 'maxquerykept' :ignoring token %s : not a digit", tok.Data());
3290 const char *ksz[2] = {
"hwmsz=",
"maxsz="};
3291 for (
Int_t j = 0; j < 2; j++) {
3292 if (tok.BeginsWith(ksz[j])) {
3293 tok.ReplaceAll(ksz[j],
"");
3295 if (!tok.IsDigit()) {
3298 const char *
s[3] = {
"k",
"m",
"g"};
3299 Int_t i = 0, ki = 1024;
3301 if (tok.EndsWith(s[i++]))
3306 tok.Remove(tok.Length()-1);
3308 if (tok.IsDigit()) {
3310 fHWMBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
3312 fMaxBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
3314 TString ssz(ksz[j], strlen(ksz[j])-1);
3315 Info(
"SetupCommon",
"parsing '%s' : ignoring token %s", ssz.Data(), tok.Data());
3325 Warning(
"SetupCommon",
"problems applying fMaxQueries");
3329 TString vac =
gROOT->GetVersion();
3331 TString rtag =
gEnv->
GetValue(
"ProofServ.RootVersionTag",
"");
3332 if (rtag.Length() > 0)
3344 while (all_vars.Tokenize(name, from,
",")) {
3345 if (!name.IsNull()) {
3355 }
else if (!(
fUser.IsNull()) &&
fGroup.IsNull()) {
3357 }
else if (
fUser.IsNull() && !(
fGroup.IsNull())) {
3367 Info(
"SetupCommon",
"successfully completed");
3387 Info(
"Terminate",
"process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
3421 Info(
"Terminate",
"data directory '%s' has been removed",
fDataDir.Data());
3428 while ((fh = next())) {
3429 TProofServInputHandler *ih =
dynamic_cast<TProofServInputHandler *
>(fh);
3446 if (!path || strlen(path) <= 0)
return kFALSE;
3452 const char *ent = 0;
3454 if (!strcmp(ent,
".") || !strcmp(ent,
".."))
continue;
3455 fpath.Form(
"%s/%s", path, ent);
3472 Warning(
"UnlinkDataDir",
"data directory '%s' is empty but could not be removed", path);
3503 if (!oldAuthSetupHook) {
3505 TString authlib =
"libRootAuth";
3511 Error(
"OldAuthSetup",
"can't load %s",authlib.Data());
3515 Error(
"OldAuthSetup",
"can't locate %s",authlib.Data());
3524 Error(
"OldAuthSetup",
"can't find OldProofServAuthSetup");
3540 TDSet *dset,
const char *selec,
3557 fst, dset, selec, elist);
3575 Int_t startlog = lseek(fileno(stdout), (off_t) 0, SEEK_END);
3579 Info(
"SetQueryRunning",
"starting query: %d", pq->
GetSeqNum());
3582 TString parlist =
"";
3607 Info(
"HandleArchive",
"Enter");
3611 (*mess) >> queryref >> path;
3613 if (slb) slb->Form(
"%s %s", queryref.Data(), path.Data());
3616 if (queryref ==
"Default") {
3618 Info(
"HandleArchive",
3628 if (path.Length() <= 0) {
3630 Info(
"HandleArchive",
3631 "archive paths are not defined - do nothing");
3635 path.Form(
"%s/session-%s-%d.root",
3639 path.ReplaceAll(
":q",
"-");
3646 if (!pqr || qry < 0) {
3647 TString fout = qdir;
3648 fout +=
"/query-result.root";
3654 TIter nxk(f->GetListOfKeys());
3656 while ((k = (
TKey *)nxk())) {
3657 if (!strcmp(k->GetClassName(),
"TProofQueryResult")) {
3666 Info(
"HandleArchive",
3667 "file cannot be open (%s)",fout.Data());
3674 PDB(kGlobal, 1)
Info(
"HandleArchive",
3675 "archive path for query #%d: %s",
3682 if (!farc || !(farc->IsOpen())) {
3683 Info(
"HandleArchive",
3684 "archive file cannot be open (%s)",path.Data());
3698 if (qry > -1 &&
fQMgr)
3702 Info(
"HandleArchive",
3703 "results of query %s archived to file %s",
3704 queryref.Data(), path.Data());
3723 emsg.Form(
"file collection undefined!");
3734 TUrl *xurl = fiind->GetCurrentUrl();
3766 Info(
"HandleProcess",
"Enter");
3773 TString filename, opt;
3780 (*mess) >> dset >> filename >> input >> opt >> nentries >> first >> evl >> sync;
3791 if ((!hasNoData) && elist)
3801 fPrefix.Data(), emsg.Data()));
3802 Error(
"HandleProcess",
"AssertDataSet: %s", emsg.Data());
3807 }
else if (hasNoData) {
3812 if (!dsn.Contains(
":") || dsn.BeginsWith(
"dataset:")) {
3813 dsn.ReplaceAll(
"dataset:",
"");
3817 emsg.Form(
"dataset manager not initialized!");
3822 emsg.Form(
"requested dataset '%s' does not exists", dsn.Data());
3829 fcmap->
SetName(
"PROOF_FilesToProcess");
3834 if (!emsg.IsNull()) {
3836 fPrefix.Data(), emsg.Data()));
3837 Error(
"HandleProcess",
"%s", emsg.Data());
3853 if (dset) input->
Add(dset);
3854 if (elist) input->
Add(elist);
3858 input->
Clear(
"nodelete");
3863 Warning(
"HandleProcess",
"could not save input data: %s", emsg.Data());
3889 Error(
"HandleProcess",
"error getting list of worker nodes");
3896 Info(
"HandleProcess",
"query %d enqueued", pq->
GetSeqNum());
3898 Error(
"HandleProcess",
"Adding a list of worker nodes returned: %d",
3907 Error(
"HandleProcess",
"error getting list of worker nodes");
3914 Info(
"HandleProcess",
"query %d enqueued", pq->
GetSeqNum());
3916 Error(
"HandleProcess",
"unknown return value: %d", retVal);
3927 if (!sync || enqueued) {
3935 Info(
"HandleProcess",
4002 Warning(
"HandleProcess",
"could not get input data: %s", emsg.Data());
4006 Warning(
"HandleProcess",
"could not get query sequential number!");
4010 while ((nord = input->
FindObject(
"PROOF_Ordinal")))
4017 while ((o = next())) {
4018 PDB(kGlobal, 2)
Info(
"HandleProcess",
"adding: %s", o->
GetName());
4026 while ((obj = nxt())){
4030 Info(
"HandleProcess",
"selector obj for '%s' found", selector_obj->
ClassName());
4046 Info(
"HandleProcess",
"calling fPlayer->Process() with selector object: %s", selector_obj->
ClassName());
4050 Info(
"HandleProcess",
"calling fPlayer->Process() with selector name: %s", filename.Data());
4062 m << status << abort;
4075 Info(
"TProofServ::Handleprocess",
4076 "worker %s has finished processing with %d objects in output list",
4096 if (!isSubMerging) {
4116 Info(
"HandleProcess",
"controlled mode: worker %s has finished," 4117 " sizes sent to master",
fOrdinal.Data());
4123 Info(
"HandleProcess",
"submerging disabled because of high-memory case");
4126 PDB(kGlobal, 2)
Info(
"HandleProcess",
"merging mode check: %d", isSubMerging);
4142 Int_t merge_port = 0;
4145 Info(
"HandleProcess",
"possible port for merging connections: %d",
4149 msg_osize << merge_port;
4158 PDB(kSubmerger, 2)
Info(
"HandleProcess",
"worker %s has finished",
fOrdinal.Data());
4163 PDB(kGlobal, 2)
Info(
"HandleProcess",
"sending result directly to master");
4165 Warning(
"HandleProcess",
"problems sending output list");
4187 Warning(
"HandleProcess",
"the output list is empty!");
4189 Warning(
"HandleProcess",
"problems sending output list");
4206 while ((obj = nex())) {
4218 TFile *
f =
dynamic_cast<TFile *
>(added->
Last());
4222 while ((o = nxo())) { input->
Remove(o); }
4239 PDB(kGlobal, 1)
Info(
"HandleProcess",
"done");
4250 PDB(kOutput, 2)
Info(
"SendResults",
"enter");
4261 msg.Form(
"%s: merging output objects ... done ",
4265 msg.Form(
"%s: objects merged; sending output: %d objs",
fPrefix.Data(), olsz);
4270 if (sock->
Send(mbuf) < 0)
return -1;
4276 Int_t totsz = 0, objsz = 0;
4278 while ((o = nxo())) {
4282 "message has %d bytes: limit of %lld bytes reached - sending ...",
4295 msg.Form(
"%s: objects merged; sending obj %d/%d (%d bytes) ",
4299 if (sock->
Send(mbuf) < 0)
return -1;
4306 mbuf << (Int_t) ((ns >= olsz) ? 2 : 1);
4321 msg.Form(
"%s: objects merged; sending obj %d/%d (%d bytes) ",
4325 if (sock->
Send(mbuf) < 0)
return -1;
4329 msg.Form(
"%s: grand total: sent %d objects, size: %d bytes ",
4341 msg.Form(
"%s: merging output objects ... done ",
4345 msg.Form(
"%s: objects merged; sending output: %d objs",
fPrefix.Data(), olsz);
4350 if (sock->
Send(mbuf) < 0)
return -1;
4354 Int_t totsz = 0, objsz = 0;
4357 while ((o = nxo())) {
4374 msg.Form(
"%s: objects merged; sending obj %d/%d (%d bytes) ",
4378 if (sock->
Send(mbuf) < 0)
return -1;
4383 msg.Form(
"%s: grand total: sent %d objects, size: %d bytes ",
4397 msg.Form(
"%s: sending output: %d objs, %d bytes",
fPrefix.Data(), olsz, blen);
4399 if (sock->
Send(mbuf) < 0)
return -1;
4403 PDB(kGlobal, 2)
Info(
"SendResults",
"sending output list");
4405 PDB(kGlobal, 2)
Info(
"SendResults",
"notifying failure or abort");
4410 PDB(kOutput,2)
Info(
"SendResults",
"done");
4423 TString filename, opt;
4450 Ssiz_t id = opt.Last(
'#');
4451 if (
id !=
kNPOS &&
id < opt.Length() - 1) {
4452 filename += opt(
id + 1, opt.Length());
4462 Error(
"ProcessNext",
"no TDset object: cannot continue");
4484 while ((obj = nxt())){
4488 Info(
"ProcessNext",
"found object for selector '%s'", obj->
ClassName());
4495 Error(
"ProcessNext",
"empty waiting queries list!");
4534 input->
Add(
new TNamed(
"PROOF_QueryTag", qid.Data()));
4545 PDB(kSubmerger, 2)
Info(
"ProcessNext",
"PROOF_UseMergers set to %d", smg);
4551 if ((o = input->
FindObject(
"PROOF_MergersByHost"))) { input->
Remove(o);
delete o; }
4553 PDB(kSubmerger, 2)
Info(
"ProcessNext",
"submergers setup by host/node");
4562 while ((o = next())) {
4573 Info(
"ProcessNext",
"calling fPlayer->Process() with selector object: %s", selector_obj->
ClassName());
4577 Info(
"ProcessNext",
"calling fPlayer->Process() with selector name: %s", filename.Data());
4592 m << status << abort;
4608 Warning(
"ProcessNext",
"problems registering produced datasets: %s", emsg.Data());
4629 while ((xo = nxo())) {
4639 if (oopt.BeginsWith(
"of:")) {
4640 oopt.Replace(0, 3,
"");
4651 Info(
"ProcessNext",
"adding info about dataset '%s' in the light query result", dset->
GetName());
4658 Info(
"ProcessNext",
"sending results");
4661 Warning(
"ProcessNext",
"problems sending output list");
4666 Warning(
"ProcessNext",
"the output list is empty!");
4668 Warning(
"ProcessNext",
"problems sending output list");
4678 if (!(pq->
IsDraw()) && pqr) {
4705 ::
Info(
"TProofServ::RegisterDataSets",
4706 "enter: %d objs in the output list", (out ? out->
GetSize() : -1));
4708 if (!in || !out || !dsm) {
4709 ::Error(
"TProofServ::RegisterDataSets",
"invalid inputs: %p, %p, %p", in, out, dsm);
4717 while ((o = nxo())) {
4735 if (regopt.Contains(
":sortidx:")) {
4737 regopt.ReplaceAll(
":sortidx:",
"");
4744 const char *vfmsg = regopt.Contains(
"V") ?
" and verifying" :
"";
4745 msg.Form(
"Registering%s dataset '%s' ... ", vfmsg, ds->
GetName());
4754 ::Warning(
"TProofServ::RegisterDataSets",
4755 "failure registering or verifying dataset '%s'", ds->
GetName());
4756 msg.Form(
"Registering%s dataset '%s' ... failed! See log for more details", vfmsg, ds->
GetName());
4758 ::Info(
"TProofServ::RegisterDataSets",
"dataset '%s' successfully registered%s",
4759 ds->
GetName(), (strlen(vfmsg) > 0) ?
" and verified" :
"");
4760 msg.Form(
"Registering%s dataset '%s' ... OK", vfmsg, ds->
GetName());
4767 ::Info(
"TProofServ::RegisterDataSets",
"printing collection");
4771 ::Warning(
"TProofServ::RegisterDataSets",
"collection '%s' is empty", o->
GetName());
4774 ::Info(
"TProofServ::RegisterDataSets",
"dataset registration not allowed");
4781 while ((o = nxrm())) out->
Remove(o);
4785 while((o = nxtg())) {
4791 PDB(kDataset, 1) ::
Info(
"TProofServ::RegisterDataSets",
"exit");
4802 Info(
"HandleQueryList",
"Enter");
4808 Int_t ntot = 0, npre = 0, ndraw= 0;
4813 Int_t idx = qdir.Index(
"session-");
4841 Warning(
"HandleQueryList",
"unable to clone TProofQueryResult '%s:%s'",
4842 pqr->GetName(), pqr->GetTitle());
4851 m << npre << ndraw << ql;
4865 Info(
"HandleRemove",
"Enter");
4868 (*mess) >> queryref;
4870 if (slb) *slb = queryref;
4872 if (queryref ==
"cleanupqueue") {
4876 Info(
"HandleRemove",
"%d queries removed from the waiting list", pend);
4881 if (queryref ==
"cleanupdir") {
4887 Info(
"HandleRemove",
"%d directories removed", nd);
4912 Warning(
"HandleRemove",
"query result manager undefined!");
4916 Info(
"HandleRemove",
4917 "query %s could not be removed (unable to lock session)", queryref.Data());
4929 Info(
"HandleRetrieve",
"Enter");
4932 (*mess) >> queryref;
4934 if (slb) *slb = queryref;
4941 TString fout = qdir;
4942 fout +=
"/query-result.root";
4948 TIter nxk(f->GetListOfKeys());
4950 while ((k = (
TKey *)nxk())) {
4951 if (!strcmp(k->GetClassName(),
"TProofQueryResult")) {
4959 if ((d = dynamic_cast<TDSet *>(o)))
4968 static const char *clb[4] = {
"bytes",
"KB",
"MB",
"GB" };
4969 while (qsz > 1000. && ilb < 3) {
4978 Info(
"HandleRetrieve",
4979 "query not found in file %s",fout.Data());
4989 Info(
"HandleRetrieve",
4990 "file cannot be open (%s)",fout.Data());
5009 (*mess) >> type >> add >> path;
5013 if ((type !=
"lib") && (type !=
"inc")) {
5014 Error(
"HandleLibIncPath",
"unknown action type: %s", type.Data());
5019 path.ReplaceAll(
",",
" ");
5023 if (path.Length() > 0 && path !=
"-") {
5024 if (!(op = path.Tokenize(
" "))) {
5025 Error(
"HandleLibIncPath",
"decomposing path %s", path.Data());
5032 if (type ==
"lib") {
5039 TString xlib = lib->
GetName();
5046 if (newlibpath.BeginsWith(
".:"))
5048 if (newlibpath.Index(xlib) ==
kNPOS) {
5053 Info(
"HandleLibIncPath",
5054 "libpath %s does not exist or cannot be read - not added", xlib.Data());
5069 TString xinc = inc->
GetName();
5074 if (curincpath.Index(xinc) ==
kNPOS)
5077 Info(
"HandleLibIncPath",
5078 "incpath %s does not exist or cannot be read - not added", xinc.Data());
5089 if (type ==
"lib") {
5096 TString xlib = lib->
GetName();
5117 newincpath.ReplaceAll(
gInterpreter->GetIncludePath(),
"");
5142 (*mess) >> filenam >> md5;
5146 if (slb) *slb = filenam;
5148 if (filenam.BeginsWith(
"-")) {
5154 TString packnam = filenam;
5155 packnam.Remove(packnam.Length() - 4);
5158 if (md5local && md5 == (*md5local)) {
5161 Error(
"HandleCheckFile",
"failure cleaning %s", packnam.Data());
5169 Info(
"HandleCheckFile",
5170 "package %s installed on node", filenam.Data());
5173 Error(
"HandleCheckFile",
"gunzip not found");
5175 Error(
"HandleCheckFile",
"package %s did not unpack into %s",
5176 filenam.Data(), packnam.Data());
5187 Error(
"HandleCheckFile",
5188 "package %s not yet on node", filenam.Data());
5206 Info(
"HandleCheckFile",
5207 "problems uploading package %s", parpath.Data());
5212 }
else if (filenam.BeginsWith(
"+") || filenam.BeginsWith(
"=")) {
5213 filenam.Remove(0,1);
5215 TString parname = filenam;
5222 "problems installing package %s", filenam.Data());
5230 if (md5local && md5 == (*md5local)) {
5234 Info(
"HandleCheckFile",
5235 "package %s already on node", parname.Data());
5238 TString par = filenam;
5245 "problems uploading package %s", par.Data());
5253 Info(
"HandleCheckFile",
5254 "package %s not yet on node", filenam.Data());
5261 TString cachef =
fCacheDir +
"/" + filenam;
5265 if (md5local && md5 == (*md5local)) {
5268 Info(
"HandleCheckFile",
"file %s already on node", filenam.Data());
5273 Info(
"HandleCheckFile",
"file %s not yet on node", filenam.Data());
5287 Info(
"HandleCache",
"Enter");
5299 const char *k = (
IsMaster()) ?
"Mst" :
"Wrk";
5300 noth.Form(
"%s-%s", k,
fOrdinal.Data());
5303 TString packagedir, package, pdir, ocwd,
file;
5308 printf(
"*** File cache %s:%s ***\n",