43 # include <sys/stat.h> 44 # include <sys/types.h> 45 # include "snprintf.h" 51 #include "RConfigure.h" 121 if (!fProof->IsTty() || fProof->GetRemoteProtocol() < 22) {
124 fProof->StopProcess(
kTRUE);
129 if (fProof->GetRemoteProtocol() < 22) {
130 a = Getline(
"\nSwitch to asynchronous mode not supported remotely:" 131 "\nEnter S/s to stop, Q/q to quit, any other key to continue: ");
133 a = Getline(
"\nEnter A/a to switch asynchronous, S/s to stop, Q/q to quit," 134 " any other key to continue: ");
136 if (a[0] ==
'Q' || a[0] ==
'S' || a[0] ==
'q' || a[0] ==
's') {
138 Info(
"Notify",
"Processing interrupt signal ... %c", a[0]);
142 fProof->StopProcess(abort);
144 }
else if ((a[0] ==
'A' || a[0] ==
'a') && fProof->GetRemoteProtocol() >= 22) {
146 fProof->GoAsynchronous();
159 fSocket(s), fProof(p)
186 if (!si)
return fOrdinal.CompareTo(obj->GetName());
188 const char *myord = GetOrdinal();
190 while (myord && otherord) {
191 Int_t myval = atoi(myord);
192 Int_t otherval = atoi(otherord);
193 if (myval < otherval)
return 1;
194 if (myval > otherval)
return -1;
195 myord = strchr(myord,
'.');
197 otherord = strchr(otherord,
'.');
198 if (otherord) otherord++;
200 if (myord)
return -1;
201 if (otherord)
return 1;
213 return (strcmp(GetOrdinal(), si->
GetOrdinal()) == 0);
224 TString stat = fStatus == kActive ?
"active" :
225 fStatus == kBad ?
"bad" :
234 if (oo ==
"active" && fStatus != kActive)
return;
235 if (oo ==
"notactive" && fStatus != kNotActive)
return;
236 if (oo ==
"bad" && fStatus != kBad)
return;
240 if (!(fMsd.IsNull())) msd.
Form(
"| msd: %s ", fMsd.Data());
241 if (!(fDataDir.IsNull())) datadir.
Form(
"| datadir: %s ", fDataDir.Data());
242 if (fSysInfo.fCpus > 0) {
243 si.
Form(
"| %s, %d cores, %d MB ram", fHostName.Data(),
244 fSysInfo.fCpus, fSysInfo.fPhysRam);
246 si.
Form(
"| %s", fHostName.Data());
253 std::cout <<
"Slave: " << fOrdinal
254 <<
" hostname: " << fHostName
256 <<
" perf index: " << fPerfIndex
267 fSysInfo.fOS = si.
fOS;
268 fSysInfo.fModel = si.
fModel;
270 fSysInfo.fCpus = si.
fCpus;
288 fWorkers->SetOwner(
kFALSE);
297 if (AreAllWorkersMerged())
298 Error(
"SetMergedWorker",
"all workers have been already merged before!");
309 fWorkers =
new TList();
310 if (fWorkersToMerge == fWorkers->GetSize()) {
311 Error(
"AddWorker",
"all workers have been already assigned to this merger");
322 return (fWorkersToMerge == fMergedWorkers);
333 return (fWorkers->GetSize() == fWorkersToMerge);
365 if( 0 == _cluster->
Length() ) {
366 Error(
"PoDCheckUrl",
"PoD server is not running");
406 if (!masterurl || strlen(masterurl) <= 0) {
409 }
else if (!(strstr(masterurl,
"://"))) {
446 }
else if (
fMaster ==
"prooflite") {
455 Init(masterurl, conffile, confdir, loglevel, alias);
460 if (
Exec(
"gProofServ->GetUser()",
"0",
kTRUE) == 0) {
467 emsg =
"could not find 'const char *' string in macro log";
470 emsg =
"could not retrieve user info";
479 Warning(
"TProof",
"%s: using local default %s", emsg.
Data(), usr.
Data());
489 gROOT->GetListOfSockets()->Remove(mgr);
490 gROOT->GetListOfSockets()->Add(mgr);
495 if (!
gROOT->GetListOfProofs()->FindObject(
this))
496 gROOT->GetListOfProofs()->Add(
this);
515 if (!
gROOT->GetListOfProofs()->FindObject(
this))
516 gROOT->GetListOfProofs()->Add(
this);
623 while (envs.Tokenize(env, from,
",")) {
626 Warning(
"Init",
"request for sending over undefined environemnt variable '%s' - ignoring", env.Data());
628 if (!envsfound.IsNull()) envsfound +=
",";
635 if (envsfound.IsNull()) {
636 Warning(
"Init",
"none of the requested env variables were found: '%s'", envs.Data());
638 Info(
"Init",
"the following environment variables have been added to the list to be sent to the nodes: '%s'", envsfound.Data());
718 gROOT->GetListOfProofs()->Remove(
this);
723 if (gProof && gProof ==
this) {
726 while ((gProof = (
TProof *)pvp())) {
734 Emit(
"CloseWindow()");
746 const char *confdir,
Int_t loglevel,
const char *alias)
770 if (!conffile || !conffile[0])
772 if (!confdir || !confdir[0])
828 Error(
"Init",
"could not create temporary logfile");
830 Error(
"Init",
"could not open temp logfile for reading");
892 if (enableSchemaEvolution) {
895 Info(
"TProof",
"automatic schema evolution in TMessage explicitly disabled");
905 Error(
"Init",
"failure asserting sandbox directory %s", sandbox.
Data());
914 Error(
"Init",
"failure asserting directory %s", packdir.
Data());
919 Info(
"Init",
"package directory set to %s", packdir.
Data());
928 Info(
"Init",
" %d global package directories registered", nglb);
950 GetRC(
"Proof.DynamicStartup", dyn);
967 if (s.IsDigit()) nwrk = s.
Atoi();
990 gROOT->GetListOfSockets()->Add(
this);
1015 }
else if (sb ==
"..") {
1038 const char *cq = (
IsLite()) ?
"\"" :
"";
1039 while (sconf.Tokenize(opt, from,
",")) {
1040 if (opt.
IsNull())
continue;
1047 TString mst, top, sub, wrk, all;
1062 if (all !=
"" && mst ==
"") mst = all;
1063 if (all !=
"" && top ==
"") top = all;
1064 if (all !=
"" && sub ==
"") sub = all;
1065 if (all !=
"" && wrk ==
"") wrk = all;
1066 if (all !=
"" && all.
BeginsWith(
"valgrind_opts:")) {
1068 Info(
"ParseConfigField",
"valgrind run: resetting 'PROOF_WRAPPERCMD':" 1069 " must be set again for next run , if any");
1073 cmd.
Form(
"%svalgrind -v --suppressions=<rootsys>/etc/valgrind-root.supp", cq);
1074 TString mstlab(
"NO"), wrklab(
"NO");
1075 Bool_t doMaster = (opt ==
"valgrind" || (opt.
Contains(
"master") &&
1081 if (mst ==
"" || mst.
BeginsWith(
"valgrind_opts:")) {
1083 var.
Form(
"%s --log-file=<logfilemst>.valgrind.log %s", cmd.
Data(), mst.
Data());
1086 }
else if (mst !=
"") {
1092 "master valgrinding does not make sense for PROOF-Lite: ignoring");
1094 if (!opt.
Contains(
"workers"))
return;
1096 if (opt ==
"valgrind" || opt ==
"valgrind=") opt =
"valgrind=workers";
1101 if (top ==
"" || top.
BeginsWith(
"valgrind_opts:")) {
1103 var.
Form(
"%s --log-file=<logfilemst>.valgrind.log %s", cmd.
Data(), top.
Data());
1106 }
else if (top !=
"") {
1112 if (sub ==
"" || sub.
BeginsWith(
"valgrind_opts:")) {
1114 var.
Form(
"%s --log-file=<logfilemst>.valgrind.log %s", cmd.
Data(), sub.
Data());
1117 }
else if (sub !=
"") {
1123 if (wrk ==
"" || wrk.
BeginsWith(
"valgrind_opts:")) {
1125 var.
Form(
"%s --log-file=<logfilewrk>.__valgrind__.log %s%s", cmd.
Data(), wrk.
Data(), cq);
1130 nwrks = opt(inw+1, opt.
Length());
1131 if (!nwrks.
IsDigit()) nwrks =
"2";
1143 }
else if (wrk !=
"") {
1157 Printf(
" ---> Starting a debug run with valgrind (master:%s, workers:%s)", mstlab.Data(), wrklab.
Data());
1159 Printf(
" ---> Starting a debug run with valgrind (workers:%s)", wrklab.
Data());
1161 Printf(
" ---> Please be patient: startup may be VERY slow ...");
1162 Printf(
" ---> Logs will be available as special tags in the log window (from the progress dialog or TProof::LogViewer()) ");
1163 Printf(
" ---> (Reminder: this debug run makes sense only if you are running a debug version of ROOT)");
1172 Printf(
"*** Requested IgProf performance profiling ***");
1173 TString addLogExt =
"__igprof.pp__.log";
1174 TString addLogFmt =
"igprof -pk -pp -t proofserv.exe -o %s.%s";
1182 tmp.
Form(addLogFmt.
Data(),
"<logfilemst>", addLogExt.
Data());
1185 tmp.
Form(addLogFmt.
Data(),
"<logfilewrk>", addLogExt.
Data());
1211 if ((c !=
'+') && ((c < '0') || (c >
'9')))
1230 if (
IsLite() && cpuPin) {
1231 Printf(
"*** Requested CPU pinning ***");
1233 const char *pinCmd =
"taskset -c <cpupin>";
1236 if (ev && (p = dynamic_cast<TNamed *>(ev->
FindObject(
"PROOF_SLAVE_WRAPPERCMD")))) {
1242 val.
Form(
"\"%s\"", pinCmd);
1255 if (!inpath || strlen(inpath) <= 0) {
1256 Error(
"AssertPath",
"undefined input path");
1265 Error(
"AssertPath",
"could not create path %s", path.
Data());
1272 Error(
"AssertPath",
"could not make path %s writable", path.
Data());
1291 gROOT->GetListOfSockets()->Remove(mgr);
1292 gROOT->GetListOfSockets()->Add(mgr);
1307 Error(
"AddWorkers",
"AddWorkers can only be called on the master!");
1311 if (!workerList || !(workerList->
GetSize())) {
1312 Error(
"AddWorkers",
"empty list of workers!");
1333 if (!addedWorkers) {
1335 Error(
"AddWorkers",
"cannot create new list for the workers to be added");
1343 while ((to = next())) {
1370 if (wn ==
"localhost" || wn.BeginsWith(
"localhost.")) wn =
gSystem->
HostName();
1392 addedWorkers->
Add(slave);
1400 Info(
"AddWorkers",
"worker on host %s created" 1406 m <<
TString(
"Opening connections to workers") << nSlaves
1407 << nSlavesDone << slaveOk;
1421 TIter nxsl(addedWorkers);
1423 while ((sl = (
TSlave *) nxsl())) {
1434 Info(
"AddWorkers",
"worker on host %s finalized" 1444 m <<
TString(
"Setting up worker servers") << nSlaves
1445 << nSlavesDone << slaveOk;
1458 if (s.IsDigit()) nwrk = s.
Atoi();
1464 Info(
"AddWorkers",
"will invoke GoMoreParallel()");
1467 Info(
"AddWorkers",
"GoMoreParallel()=%d", nw);
1473 Info(
"AddWorkers",
"will invoke GoParallel()");
1482 Info(
"AddWorkers",
"will invoke SaveWorkerInfo()");
1488 Info(
"AddWorkers",
"will invoke SendParallel()");
1491 if (goMoreParallel &&
fPlayer) {
1496 Info(
"AddWorkers",
"will send the PROCESS message to selected workers");
1504 delete addedWorkers;
1516 if (packs && packs->
GetSize() > 0) {
1519 while ((pck = (
TPair *) nxp())) {
1525 Info(
"SetupWorkersEnv",
"will invoke UploadPackage() and EnablePackage() on added workers");
1530 Info(
"SetupWorkersEnv",
"will invoke UploadPackage() and EnablePackage() on all workers");
1543 Info(
"SetupWorkersEnv",
"will invoke Load() on selected workers");
1552 dyn.ReplaceAll(
":",
" ");
1553 dyn.ReplaceAll(
"\"",
" ");
1555 Info(
"SetupWorkersEnv",
"will invoke AddDynamicPath() on selected workers");
1563 Info(
"SetupWorkersEnv",
"will invoke AddIncludePath() on selected workers");
1578 Error(
"RemoveWorkers",
"RemoveWorkers can only be called on the master!");
1588 while ((sl = (
TSlave *) nxsl())) {
1594 if (!(workerList->
GetSize())) {
1595 Error(
"RemoveWorkers",
"The list of workers should not be empty!");
1603 while ((to = next())) {
1605 if (!strcmp(to->
ClassName(),
"TProofNodeInfo")) {
1609 while ((sl = (
TSlave *) nxsl())) {
1617 Warning(
"RemoveWorkers",
"unknown object type: %s - it should be" 1618 " TProofNodeInfo or inheriting from TSlave", to->
ClassName());
1648 TString emsg(
"no resource currently available for this session: please retry later");
1660 Printf(
"Starting master: opening connection ...");
1666 fprintf(stderr,
"Starting master:" 1667 " connection open: setting up server ... \r");
1681 Printf(
"Starting master: OK ");
1687 Error(
"StartSlaves",
1688 "client and remote protocols not compatible (%d and %d)",
1707 if (slStatus == -99 || slStatus == -98 || rc == 0) {
1710 if (slStatus == -99)
1711 Error(
"StartSlaves",
"no resources available or problems setting up workers (check logs)");
1712 else if (slStatus == -98)
1713 Error(
"StartSlaves",
"could not setup output redirection on master");
1715 Error(
"StartSlaves",
"setting up master");
1726 Error(
"StartSlaves",
1727 "failed to setup connection with PROOF master server");
1733 gROOT->GetPluginManager()->FindHandler(
"TProofProgressDialog")))
1739 Printf(
"Starting master: failure");
1744 Printf(
"Starting master: OK ");
1749 gROOT->GetPluginManager()->FindHandler(
"TProofProgressDialog")))
1762 Error(
"StartSlaves",
"failed to create (or connect to) the PROOF master server");
1778 { std::lock_guard<std::recursive_mutex> lock(
fCloseMutex);
1787 while ((sl = (
TSlave *)nxs()))
1801 gROOT->GetListOfSockets()->Remove(
this);
1813 gROOT->GetListOfProofs()->Remove(
this);
1814 if (gProof && gProof ==
this) {
1817 while ((gProof = (
TProof *)pvp())) {
1832 Int_t perf,
const char *image,
const char *workdir)
1854 const char *image,
const char *msd,
Int_t nwk)
1874 while ((sl = (
TSlave *)next())) {
1900 while (
TSlave *sl = dynamic_cast<TSlave*>(next())) {
1901 if (
fImage == sl->fImage) {
1911 TSlave *replace_slave = 0;
1913 while (
TSlave *sl2 = dynamic_cast<TSlave*>(next2())) {
1914 if (sl->fImage == sl2->fImage) {
1919 replace_slave = sl2;
1926 Error(
"FindUniqueSlaves",
"TSlave is neither Master nor Slave");
1939 if (replace_slave) {
2031 if (s.Contains(
"Total MB's processed:")) {
2034 }
else if (s.Contains(
"Total real time used (s):")) {
2035 s.ReplaceAll(
"Total real time used (s):",
"");
2037 }
else if (s.Contains(
"Total CPU time used (s):")) {
2038 s.ReplaceAll(
"Total CPU time used (s):",
"");
2039 if (s.IsFloat())
fCpuTime = s.Atof();
2047 Printf(
" Real/CPU time (s): %.3f / %.3f; workers: %d; processed: %.2f MBs",
2151 Printf(
"+++ Options: \"A\" show all queries known to server");
2152 Printf(
"+++ \"L\" show retrieved queries");
2153 Printf(
"+++ \"F\" full listing of query info");
2154 Printf(
"+++ \"H\" print this menu");
2156 Printf(
"+++ (case insensitive)");
2158 Printf(
"+++ Use Retrieve(<#>) to retrieve the full" 2159 " query results from the master");
2160 Printf(
"+++ e.g. Retrieve(8)");
2190 Printf(
"+++ Queries processed during this session: selector: %d, draw: %d",
2192 while ((pq = nxq()))
2199 Printf(
"+++ Queries processed during this session: selector: %d, draw: %d",
2206 Printf(
"+++ Queries available locally: %d", listlocal->
GetSize());
2207 TIter nxlq(listlocal);
2208 while ((pq = nxlq()))
2224 while (
TSlave *sl = dynamic_cast<TSlave*>(nextSlave())) {
2234 if (submasters.
GetSize() > 0) {
2242 EmitVA(
"IsDataReady(Long64_t,Long64_t)", 2, totalbytes, bytesready);
2245 Info(
"IsDataReady",
"%lld / %lld (%s)",
2246 bytesready, totalbytes,
fDataReady?
"READY":
"NOT READY");
2264 if (slaves->
GetSize() == 0)
return;
2269 while ((sl = (
TSlave *)next())) {
2288 Int_t nparallel = 0;
2289 while (
TSlave* sl = dynamic_cast<TSlave*>(nextSlave()))
2290 if (sl->GetParallel() >= 0)
2291 nparallel += sl->GetParallel();
2314 while ((slave = (
TSlave *) next()) != 0) {
2324 while ((activeslave = (
TSlave *) nextactive())) {
2333 while ((badslave = (
TSlave *) nextbad())) {
2342 MarkBad(slave,
"could not send kPROOF_GETSLAVEINFO message");
2350 MarkBad(slave,
"could not send kPROOF_GETSLAVEINFO message");
2355 Error(
"GetSlaveInfo",
"TSlave is neither Master nor Slave");
2376 while ((sl = (
TSlave*) next())) {
2406 if (workers->
GetSize() == 0)
return 0;
2409 TIter next(workers);
2412 while ((wrk = (
TSlave *)next())) {
2415 MarkBad(wrk,
"could not send group priority");
2457 if (!slaves || slaves->
GetSize() == 0)
return 0;
2463 while ((sl = (
TSlave *)next())) {
2466 MarkBad(sl,
"could not broadcast request");
2549 if (slaves->
GetSize() == 0)
return 0;
2555 while ((sl = (
TSlave *)next())) {
2558 MarkBad(sl,
"could not send broadcast-raw request");
2592 if (wrks->
GetSize() == 0)
return 0;
2598 while ((wrk = (
TSlave *)next())) {
2600 if (
SendFile(file, opt, rfile, wrk) < 0)
2601 Error(
"BroadcastFile",
2602 "problems sending file to worker %s (%s)",
2662 rc =
Collect(mon, timeout, endtype, deactonfail);
2688 while ((sl = (
TSlave*) next())) {
2693 rc =
Collect(mon, timeout, endtype, deactonfail);
2720 rc =
Collect(mon, timeout, endtype, deactonfail);
2739 Info(
"Collect",
">>>>>> Entering collect responses #%04d", collectId);
2769 int cnt = 0, rc = 0;
2774 Info(
"Collect",
"#%04d: active: %d", collectId, mon->
GetActive());
2786 while ((nact = mon->
GetActive(sto)) && (nto < 0 || nto > 0)) {
2792 if (al && al->
GetSize() > 0) {
2793 Info(
"Collect",
" %d node(s) still active:", al->
GetSize());
2796 while ((xs = (
TSocket *)nxs())) {
2814 fLastPollWorkers_s = time(0);
2817 Info(
"Collect",
"#%04d: now active: %d", collectId, mon->
GetActive());
2822 Info(
"Collect",
"Will invoke Select() #%04d", collectId);
2825 if (s && s != (
TSocket *)(-1)) {
2828 if (rc == 1 || (rc == 2 && !savedMonitor)) {
2832 Info(
"Collect",
"#%04d: deactivating %p (active: %d, %p)", collectId,
2835 }
else if (rc == 2) {
2841 Info(
"Collect",
"save monitor: deactivating %p (active: %d, %p)",
2858 if (s == (
TSocket *)(-1) && nto > 0)
2870 while (mxws && (wrk = (
TSlave *) nxwr())) {
2875 Info(
"Collect",
"worker %s was asked to send its output to master",
2900 if (al && al->
GetSize() > 0) {
2902 Info(
"Collect",
" %d node(s) went in timeout:", al->
GetSize());
2905 while ((xs = (
TSocket *)nxs())) {
2933 Info(
"Collect",
"<<<<<< Exiting collect responses #%04d", collectId);
2950 Error(
"PollForNewWorkers",
"Can't invoke: not on a master -- should not happen!");
2954 Error(
"PollForNewWorkers",
"No ProofServ available -- should not happen!");
2964 TIter next(reqWorkers);
2967 while (( ni = dynamic_cast<TProofNodeInfo *>(next()) )) {
2975 while (( sl = dynamic_cast<TSlave *>(nextInner()) )) {
2982 if (found)
delete ni;
2984 newWorkers->
Add(ni);
2986 Info(
"PollForNewWorkers",
"New worker found: %s:%s",
2996 if (nNewWorkers > 0) {
2998 Info(
"PollForNewWorkers",
"Requesting to add %d new worker(s)", newWorkers->
GetEntries());
3001 Error(
"PollForNewWorkers",
"Call to AddWorkers() failed (got %d < 0)", rv);
3008 Info(
"PollForNewWorkers",
"No new worker found");
3037 if ((recvrc = s->
Recv(mess)) < 0) {
3039 Info(
"CollectInputFrom",
"%p: got %d from Recv()", s, recvrc);
3050 MarkBad(s,
"problems receiving a message in TProof::CollectInputFrom(...)");
3056 MarkBad(s,
"undefined message in TProof::CollectInputFrom(...)");
3064 if (rc == 1 && (endtype >= 0) && (what != endtype))
3084 Warning(
"HandleInputMessage",
"given an empty message or undefined worker");
3090 Warning(
"HandleInputMessage",
"worker socket is undefined");
3098 Info(
"HandleInputMessage",
"got type %d from '%s'", what, sl->
GetOrdinal());
3117 MarkBad(s,
"received kPROOF_FATAL");
3130 Info(
"HandleInputMessage",
"received kPROOF_STOP from %s: disabling any further collection this worker",
3162 Info(
"HandleInputMessage",
"%s: kPROOF_GETPACKET", sl->
GetOrdinal());
3204 Info(
"HandleInputMessage",
"%s: kPROOF_LOGFILE: size: %d", sl->
GetOrdinal(), size);
3212 Info(
"HandleInputMessage",
"%s: kPROOF_LOGDONE: status %d parallel %d",
3246 Info(
"HandleInputMessage",
3262 rc = (async) ? 0 : 1;
3287 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_PACKAGE_LIST: enter");
3297 Error(
"HandleInputMessage",
3298 "kPROOF_PACKAGE_LIST: kListEnabledPackages: TList not found in message!");
3307 Error(
"HandleInputMessage",
3308 "kPROOF_PACKAGE_LIST: kListPackages: TList not found in message!");
3312 Error(
"HandleInputMessage",
"kPROOF_PACKAGE_LIST: unknown type: %d", type);
3325 Info(
"HandleInputMessage",
"kPROOF_SENDOUTPUT: enter (%s)", sl->
GetOrdinal());
3341 Info(
"HandleInputMessage",
"kPROOF_OUTPUTOBJECT: enter");
3345 Info(
"HandleInputMessage",
"finalization on %s started ...", prefix);
3371 Warning(
"HandleInputMessage",
"kPROOF_OUTPUTOBJECT: query result missing");
3373 }
else if (type > 0) {
3383 }
else if (
IsTty() || changed) {
3384 fprintf(stderr,
"%s\r", msg.
Data());
3404 while ((xo = nxin()))
3416 Warning(
"HandleInputMessage",
"kPROOF_OUTPUTOBJECT: player undefined!");
3428 Info(
"HandleInputMessage",
"%s: kPROOF_OUTPUTLIST: enter", sl->
GetOrdinal());
3444 out = (
TList *) out->Clone();
3449 Info(
"HandleInputMessage",
3450 "%s: kPROOF_OUTPUTLIST: query result missing", sl->
GetOrdinal());
3459 Info(
"HandleInputMessage",
3460 "%s: kPROOF_OUTPUTLIST: outputlist is empty", sl->
GetOrdinal());
3464 "%s: kPROOF_OUTPUTLIST: player undefined!", sl->
GetOrdinal());
3474 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_QUERYLIST: enter");
3487 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_RETRIEVE: enter");
3496 Info(
"HandleInputMessage",
3497 "kPROOF_RETRIEVE: query result missing or player undefined");
3504 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_MAXQUERIES: enter");
3508 Printf(
"Number of queries fully kept remotely: %d", max);
3514 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_SERVERSTARTED: enter");
3516 UInt_t tot = 0, done = 0;
3520 (*mess) >> action >> tot >> done >> st;
3527 char msg[512] = {0};
3529 snprintf(msg, 512,
"%s: OK (%d %s) \n",
3532 snprintf(msg, 512,
"%s: %d out of %d (%d %%)\r",
3533 action.
Data(), done, tot, frac);
3536 fprintf(stderr,
"%s", msg);
3546 m << action << tot << done << st;
3554 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_DATASET_STATUS: enter");
3556 UInt_t tot = 0, done = 0;
3560 (*mess) >> action >> tot >> done >> st;
3566 char msg[512] = {0};
3568 snprintf(msg, 512,
"%s: OK (%d %s) \n",
3571 snprintf(msg, 512,
"%s: %d out of %d (%d %%)\r",
3572 action.
Data(), done, tot, frac);
3575 fprintf(stderr,
"%s", msg);
3585 m << action << tot << done << st;
3593 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_STARTPROCESS: enter");
3613 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"Preparation time: %f s",
fPrepTime);
3618 (*mess) >> selec >> dsz >> first >> nent;
3620 if (!
gROOT->IsBatch()) {
3625 selec.
Data(), dsz, first, nent);
3639 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_ENDINIT: enter");
3658 Info(
"HandleInputMessage",
"%s: got kPROOF_SETIDLE", sl->
GetOrdinal());
3661 "%s: got kPROOF_SETIDLE but no running workers ! protocol error?",
3675 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_QUERYSUBMITTED: enter");
3700 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_SESSIONTAG: enter");
3723 Info(
"HandleInputMessage",
"kPROOF_FEEDBACK: enter");
3736 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_AUTOBIN: enter");
3741 (*mess) >> name >> xmin >> xmax >> ymin >> ymax >> zmin >> zmax;
3747 answ << name << xmin << xmax << ymin << ymax << zmin << zmax;
3755 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_PROGRESS: enter");
3764 Float_t initTime, procTime, evtrti, mbrti;
3765 (*mess) >> total >> processed >> bytesread
3766 >> initTime >> procTime
3770 initTime, procTime, evtrti, mbrti);
3775 (*mess) >> total >> processed;
3789 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_STOPPROCESS: enter");
3796 (*mess) >> status >> abort;
3798 (*mess) >> events >> abort;
3804 TList *listOfMissingFiles = 0;
3805 if (!(listOfMissingFiles = (
TList *)
GetOutput(
"MissingFiles"))) {
3806 listOfMissingFiles =
new TList();
3807 listOfMissingFiles->
SetName(
"MissingFiles");
3826 Emit(
"StopProcess(Bool_t)", abort);
3832 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_SUBMERGER: enter");
3839 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_GETSLAVEINFO: enter");
3846 Error(
"HandleInputMessage",
"kPROOF_GETSLAVEINFO: no list received!");
3848 tmpinfo->SetOwner(
kFALSE);
3890 Info(
"HandleInputMessage",
"kPROOF_VALIDATE_DSET: enter");
3894 Error(
"HandleInputMessage",
"kPROOF_VALIDATE_DSET: fDSet not set");
3903 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_DATA_READY: enter");
3906 (*mess) >> dataready >> totalbytes >> bytesready;
3919 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_MESSAGE: enter");
3932 fprintf(stderr,
"%s%c", msg.Data(), (lfeed ?
'\n' :
'\r'));
3940 fprintf(stderr,
"%s%c", msg.Data(), (lfeed ?
'\n' :
'\r'));
3956 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_VERSARCHCOMP: %s", vac.Data());
3959 if (vac.Tokenize(vers, from,
"|"))
3969 Error(
"HandleInputMessage",
"unknown command received from '%s' (what = %d)",
3997 Int_t merger_id = -1;
3998 (*mess) >> merger_id;
4001 Info(
"HandleSubmerger",
"kOutputSent: Worker %s:%d:%s had sent its output to merger #%d",
4005 Error(
"HandleSubmerger",
"kOutputSize: #%d not in list ", merger_id);
4008 TMergerInfo * mi = (TMergerInfo *)
fMergers->
At(merger_id);
4018 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
"all mergers removed ... ");
4022 PDB(kSubmerger, 2)
Error(
"HandleSubmerger",
"kOutputSent: received not on endmaster!");
4029 Int_t merger_id = -1;
4030 (*mess) >> merger_id;
4032 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
"kMergerDown: #%d ", merger_id);
4035 Error(
"HandleSubmerger",
"kMergerDown: #%d not in list ", merger_id);
4039 TMergerInfo * mi = (TMergerInfo *)
fMergers->
At(merger_id);
4058 while ((o = nxo())) {
4061 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
"kMergerDown:%d: exit", merger_id);
4069 Info(
"HandleSubmerger",
"worker %s reported as finished ", sl->
GetOrdinal());
4073 Info(
"HandleSubmerger",
"finalization on %s started ...", prefix);
4077 Int_t output_size = 0;
4078 Int_t merging_port = 0;
4079 (*mess) >> output_size >> merging_port;
4081 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
4082 "kOutputSize: Worker %s:%d:%s reports %d output objects (+ available port %d)",
4098 msg.Form(
"%s: Invalid request: cannot start %d mergers for %d workers",
4108 if (activeWorkers > 1) {
4114 msg.Form(
"%s: Number of mergers set dynamically to %d (for %d workers)",
4117 msg.Form(
"%s: No mergers will be used for %d workers",
4118 prefix, activeWorkers);
4127 if (activeWorkers > 1) {
4132 while ((wrk = nxwk())) {
4140 msg.Form(
"%s: Number of mergers set to %d (for %d workers), one for each slave host",
4143 msg.Form(
"%s: No mergers will be used for %d workers",
4144 prefix, activeWorkers);
4152 msg.Form(
"%s: Number of mergers set by user to %d (for %d workers)",
4198 TMergerInfo *mgi = 0;
4199 while ((mgi = (TMergerInfo *) nxmg())) {
4220 Error(
"HandleSubMerger",
"kOutputSize received not on endmaster!");
4232 Int_t merger_id = -1;
4236 TMergerInfo *mgi = (TMergerInfo *)
fMergers->
At(i);
4246 if (merger_id == -1) {
4253 Info(
"RedirectWorker",
"redirecting worker %s to merger %d", sl->
GetOrdinal(), merger_id);
4255 PDB(kSubmerger, 2)
Info(
"RedirectWorker",
"redirecting output to merger #%d", merger_id);
4257 Error(
"RedirectWorker",
"#%d not in list ", merger_id);
4260 TMergerInfo * mi = (TMergerInfo *)
fMergers->
At(merger_id);
4263 sendoutput << merger_id;
4264 sendoutput << hname;
4266 s->
Send(sendoutput);
4278 while (fLastAssignedMerger < fMergers->GetSize() &&
4290 while (fLastAssignedMerger < fMergers->GetSize() &&
4311 PDB(kSubmerger, 2)
Info(
"AskForOutput",
4312 "worker %s was asked to send its output to master",
4316 sendoutput <<
TString(
"master");
4332 Info(
"UpdateDialog",
4333 "processing was aborted - %lld events processed",
4348 Info(
"UpdateDialog",
4349 "processing was stopped - %lld events processed",
4366 EmitVA(
"Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t,Int_t,Int_t,Float_t)",
4371 EmitVA(
"Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t)",
4387 while ((sl = (
TSlave*) next()))
4400 while ((sl = (
TSlave*) next()))
4412 Int_t active_mergers = 0;
4415 TMergerInfo *mi = 0;
4416 while ((mi = (TMergerInfo *)mergers())) {
4417 if (mi->
IsActive()) active_mergers++;
4420 return active_mergers;
4429 Info(
"CreateMerger",
"worker %s will be merger ", sl->
GetOrdinal());
4431 PDB(kSubmerger, 2)
Info(
"CreateMerger",
"Begin");
4435 Info(
"CreateMerger",
"cannot create merger on port %d - exit", port);
4452 Int_t workersOnHost = 0;
4456 workers = workersOnHost - 1;
4460 msg.
Form(
"worker %s on host %s will be merger for %d additional workers", sl->
GetOrdinal(), sl->
GetName(), workers);
4467 TMergerInfo * merger =
new TMergerInfo(sl, port, workers);
4472 bemerger << workers;
4475 PDB(kSubmerger,2)
Info(
"CreateMerger",
4476 "merger #%d (port: %d) for %d workers started",
4484 PDB(kSubmerger, 2)
Info(
"CreateMerger",
"exit");
4495 std::lock_guard<std::recursive_mutex> lock(
fCloseMutex);
4501 Error(
"MarkBad",
"worker instance undefined: protocol error? ");
4522 msg.
Form(
"\n +++ Message from %s : marking %s:%d (%s) as bad\n +++ Reason: %s",
4524 (reason && strlen(reason)) ? reason :
"unknown");
4525 Info(
"MarkBad",
"%s", msg.Data());
4529 msg +=
TString::Format(
"\n\n +++ Most likely your code crashed on worker %s at %s:%d.\n",
4534 msg +=
TString::Format(
" +++ Please check the session logs for error messages either using\n");
4538 msg +=
TString::Format(
" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->" 4542 msg +=
TString::Format(
" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->" 4543 "Display(\"*\")\n\n", thisurl.
Data());
4544 Printf(
"%s", msg.Data());
4546 }
else if (reason) {
4548 Info(
"MarkBad",
"worker %s at %s:%d asked to terminate",
4556 TList *listOfMissingFiles = 0;
4557 if (!(listOfMissingFiles = (
TList *)
GetOutput(
"MissingFiles"))) {
4558 listOfMissingFiles =
new TList();
4559 listOfMissingFiles->
SetName(
"MissingFiles");
4568 packetizer->
MarkBad(wrk, 0, &listOfMissingFiles);
4575 if (
id !=
kNPOS) ord.Remove(0,
id+1);
4624 Int_t mergersCount = -1;
4626 if (mc) mergersCount = mc->
GetVal();
4628 if (mergersCount == 0) {
4630 if (activeWorkers > 1) {
4656 std::lock_guard<std::recursive_mutex> lock(
fCloseMutex);
4671 Warning(
"TerminateWorker",
"worker instance undefined: protocol error? ");
4681 Info(
"TerminateWorker",
"connection to worker is already down: cannot" 4682 " send termination message");
4694 if (ord && strlen(ord) > 0) {
4699 while ((wrk = (
TSlave *)nxw())) {
4700 if (all || !strcmp(wrk->
GetOrdinal(), ord)) {
4732 if (slaves->
GetSize() == 0)
return 0;
4738 while ((sl = (
TSlave *)next())) {
4740 if (sl->
Ping() == -1) {
4741 MarkBad(sl,
"ping unsuccessful");
4758 if (slaves->
GetSize() == 0)
return;
4763 while ((sl = (
TSlave *)next())) {
4781 IsValid() ?
"valid" :
"invalid");
4784 Printf(
"ROOT version|rev: %s|%s",
gROOT->GetVersion(),
gROOT->GetGitCommit());
4790 if (sl->GetSocket()->GetSecContext())
4791 Printf(
"Security context: %s",
4792 sl->GetSocket()->GetSecContext()->AsString(sc));
4793 Printf(
"Proofd protocol version: %d", sl->GetSocket()->GetRemoteProtocol());
4795 Printf(
"Security context: Error - No connection");
4796 Printf(
"Proofd protocol version: Error - No connection");
4808 Printf(
"*** Master server %s (parallel mode, %d workers):",
4811 Printf(
"*** Master server %s (sequential mode):",
4825 Printf(
"ROOT version|rev|tag: %s", ver.
Data());
4843 Printf(
"List of workers:");
4846 while (
TSlave* sl = dynamic_cast<TSlave*>(nextslave())) {
4847 if (!sl->IsValid())
continue;
4854 if (sl->GetSocket()->Send(mess) == -1)
4855 const_cast<TProof*>(
this)->MarkBad(sl,
"could not send kPROOF_PRINT request");
4859 Error(
"Print",
"TSlave is neither Master nor Worker");
4911 TString outfile, dsname, stfopt;
4915 while (opt.
Tokenize(oo, from,
"[; ]")) {
4918 iof = opt.
Index(tagf);
4921 iof = opt.
Index(tagf);
4924 iod = opt.
Index(tagd);
4927 iod = opt.
Index(tagd);
4930 ios = opt.
Index(tags);
4932 tags =
"savetofile";
4933 ios = opt.
Index(tags);
4938 Error(
"HandleOutputOptions",
"options 'of'/'outfile' and 'ds'/'dataset' are incompatible!");
4944 from = iof + tagf.
Length();
4946 Error(
"HandleOutputOptions",
"could not extract output file settings string! (%s)", opt.
Data());
4954 from = iod + tagd.
Length();
4955 if (!opt.
Tokenize(dsname, from,
"[; ]"))
4956 if (
gDebug > 0)
Info(
"HandleOutputOptions",
"no dataset name found: use default");
4965 if (dsname.
IsNull()) dsname =
"dataset_<qtag>";
4969 from = ios + tags.
Length();
4970 if (!opt.
Tokenize(stfopt, from,
"[; ]"))
4971 if (
gDebug > 0)
Info(
"HandleOutputOptions",
"save-to-file not found: use default");
4979 Error(
"HandleOutputOptions",
"save-to-file option must be a digit! (%s)", stfopt.
Data());
5003 Warning(
"HandleOutputOptions",
5004 "directory '%s' for the output file does not exists or is not writable:" 5025 if (
Exec(
"gProofServ->GetDataDir()",
"0",
kTRUE) == 0) {
5030 ddir = os->
GetString()(fst+1, lst-fst-1);
5032 emsg =
"could not find 'const char *' string in macro log! cannot continue";
5035 emsg =
"could not retrieve master data directory info! cannot continue";
5038 Error(
"HandleOutputOptions",
"%s", emsg.
Data());
5042 if (!ddir.
IsNull()) ddir +=
"/";
5044 outfile.
Form(
"%s<file>", ddir.
Data());
5065 Warning(
"HandleOutputOptions",
"Dataset required bu Save-To-File disabled: enabling!");
5066 stfopt.
Form(
"%d", ostf+1);
5081 if (target ==
"ds|V") {
5086 while ((o = nxo())) {
5098 Warning(
"HandleOutputOptions",
"could not retrieve TFileCollection for dataset '%s'", dsname.
Data());
5101 Warning(
"HandleOutputOptions",
"dataset not found!");
5113 Printf(
" Output successfully copied to %s", target.
Data());
5114 targetcopied =
kTRUE;
5116 Warning(
"HandleOutputOptions",
"problems copying output to %s", target.
Data());
5124 while ((o = nxo())) {
5128 if (pof == pf && targetcopied)
continue;
5133 Printf(
" Output successfully copied to %s", target.
Data());
5136 Warning(
"HandleOutputOptions",
"problems copying output to %s", target.
Data());
5146 Warning(
"HandleOutputOptions",
5153 if (!target.
IsNull() && !swapcopied) {
5156 if (!fout || (fout && fout->IsZombie())) {
5158 Warning(
"HandleOutputOptions",
"problems opening output file %s", target.
Data());
5163 while ((o = nxo())) {