44# include <sys/types.h>
51#include "RConfigure.h"
127 a = Getline(
"\nSwitch to asynchronous mode not supported remotely:"
128 "\nEnter S/s to stop, Q/q to quit, any other key to continue: ");
130 a = Getline(
"\nEnter A/a to switch asynchronous, S/s to stop, Q/q to quit,"
131 " any other key to continue: ");
133 if (
a[0] ==
'Q' ||
a[0] ==
'S' ||
a[0] ==
'q' ||
a[0] ==
's') {
135 Info(
"Notify",
"Processing interrupt signal ... %c",
a[0]);
156 fSocket(s), fProof(p)
187 while (myord && otherord) {
188 Int_t myval = atoi(myord);
189 Int_t otherval = atoi(otherord);
190 if (myval < otherval)
return 1;
191 if (myval > otherval)
return -1;
192 myord = strchr(myord,
'.');
194 otherord = strchr(otherord,
'.');
195 if (otherord) otherord++;
197 if (myord)
return -1;
198 if (otherord)
return 1;
295 Error(
"SetMergedWorker",
"all workers have been already merged before!");
308 Error(
"AddWorker",
"all workers have been already assigned to this merger");
362 if( 0 == _cluster->
Length() ) {
363 Error(
"PoDCheckUrl",
"PoD server is not running");
403 if (!masterurl || strlen(masterurl) <= 0) {
406 }
else if (!(strstr(masterurl,
"://"))) {
443 }
else if (
fMaster ==
"prooflite") {
452 Init(masterurl, conffile, confdir, loglevel, alias);
457 if (
Exec(
"gProofServ->GetUser()",
"0",
kTRUE) == 0) {
464 emsg =
"could not find 'const char *' string in macro log";
467 emsg =
"could not retrieve user info";
476 Warning(
"TProof",
"%s: using local default %s", emsg.
Data(), usr.
Data());
486 gROOT->GetListOfSockets()->Remove(mgr);
487 gROOT->GetListOfSockets()->Add(mgr);
492 if (!
gROOT->GetListOfProofs()->FindObject(
this))
493 gROOT->GetListOfProofs()->Add(
this);
512 if (!
gROOT->GetListOfProofs()->FindObject(
this))
513 gROOT->GetListOfProofs()->Add(
this);
620 while (envs.Tokenize(env, from,
",")) {
623 Warning(
"Init",
"request for sending over undefined environemnt variable '%s' - ignoring", env.
Data());
625 if (!envsfound.
IsNull()) envsfound +=
",";
633 Warning(
"Init",
"none of the requested env variables were found: '%s'", envs.Data());
635 Info(
"Init",
"the following environment variables have been added to the list to be sent to the nodes: '%s'", envsfound.
Data());
724 gROOT->GetListOfProofs()->Remove(
this);
740 Emit(
"CloseWindow()");
752 const char *confdir,
Int_t loglevel,
const char *alias)
776 if (!conffile || !conffile[0])
778 if (!confdir || !confdir[0])
834 Error(
"Init",
"could not create temporary logfile");
836 Error(
"Init",
"could not open temp logfile for reading");
898 if (enableSchemaEvolution) {
901 Info(
"TProof",
"automatic schema evolution in TMessage explicitly disabled");
911 Error(
"Init",
"failure asserting sandbox directory %s", sandbox.
Data());
920 Error(
"Init",
"failure asserting directory %s", packdir.
Data());
925 Info(
"Init",
"package directory set to %s", packdir.
Data());
934 Info(
"Init",
" %d global package directories registered", nglb);
956 GetRC(
"Proof.DynamicStartup", dyn);
996 gROOT->GetListOfSockets()->Add(
this);
1021 }
else if (sb ==
"..") {
1044 const char *cq = (
IsLite()) ?
"\"" :
"";
1045 while (sconf.Tokenize(opt, from,
",")) {
1046 if (opt.
IsNull())
continue;
1053 TString mst, top, sub, wrk, all;
1058 all =
n->GetTitle();
1060 mst =
n->GetTitle();
1062 top =
n->GetTitle();
1064 sub =
n->GetTitle();
1066 wrk =
n->GetTitle();
1068 if (all !=
"" && mst ==
"") mst = all;
1069 if (all !=
"" && top ==
"") top = all;
1070 if (all !=
"" && sub ==
"") sub = all;
1071 if (all !=
"" && wrk ==
"") wrk = all;
1072 if (all !=
"" && all.
BeginsWith(
"valgrind_opts:")) {
1074 Info(
"ParseConfigField",
"valgrind run: resetting 'PROOF_WRAPPERCMD':"
1075 " must be set again for next run , if any");
1079 cmd.
Form(
"%svalgrind -v --suppressions=<rootsys>/etc/valgrind-root.supp", cq);
1080 TString mstlab(
"NO"), wrklab(
"NO");
1081 Bool_t doMaster = (opt ==
"valgrind" || (opt.
Contains(
"master") &&
1087 if (mst ==
"" || mst.
BeginsWith(
"valgrind_opts:")) {
1089 var.
Form(
"%s --log-file=<logfilemst>.valgrind.log %s", cmd.
Data(), mst.
Data());
1092 }
else if (mst !=
"") {
1098 "master valgrinding does not make sense for PROOF-Lite: ignoring");
1100 if (!opt.
Contains(
"workers"))
return;
1102 if (opt ==
"valgrind" || opt ==
"valgrind=") opt =
"valgrind=workers";
1107 if (top ==
"" || top.
BeginsWith(
"valgrind_opts:")) {
1109 var.
Form(
"%s --log-file=<logfilemst>.valgrind.log %s", cmd.
Data(), top.
Data());
1112 }
else if (top !=
"") {
1118 if (sub ==
"" || sub.
BeginsWith(
"valgrind_opts:")) {
1120 var.
Form(
"%s --log-file=<logfilemst>.valgrind.log %s", cmd.
Data(), sub.
Data());
1123 }
else if (sub !=
"") {
1129 if (wrk ==
"" || wrk.
BeginsWith(
"valgrind_opts:")) {
1131 var.
Form(
"%s --log-file=<logfilewrk>.__valgrind__.log %s%s", cmd.
Data(), wrk.
Data(), cq);
1136 nwrks = opt(inw+1, opt.
Length());
1137 if (!nwrks.
IsDigit()) nwrks =
"2";
1149 }
else if (wrk !=
"") {
1163 Printf(
" ---> Starting a debug run with valgrind (master:%s, workers:%s)", mstlab.Data(), wrklab.
Data());
1165 Printf(
" ---> Starting a debug run with valgrind (workers:%s)", wrklab.
Data());
1167 Printf(
" ---> Please be patient: startup may be VERY slow ...");
1168 Printf(
" ---> Logs will be available as special tags in the log window (from the progress dialog or TProof::LogViewer()) ");
1169 Printf(
" ---> (Reminder: this debug run makes sense only if you are running a debug version of ROOT)");
1178 Printf(
"*** Requested IgProf performance profiling ***");
1179 TString addLogExt =
"__igprof.pp__.log";
1180 TString addLogFmt =
"igprof -pk -pp -t proofserv.exe -o %s.%s";
1188 tmp.
Form(addLogFmt.
Data(),
"<logfilemst>", addLogExt.
Data());
1191 tmp.
Form(addLogFmt.
Data(),
"<logfilewrk>", addLogExt.
Data());
1217 if ((
c !=
'+') && ((
c <
'0') || (
c >
'9')))
1236 if (
IsLite() && cpuPin) {
1237 Printf(
"*** Requested CPU pinning ***");
1239 const char *pinCmd =
"taskset -c <cpupin>";
1242 if (ev && (p =
dynamic_cast<TNamed *
>(ev->
FindObject(
"PROOF_SLAVE_WRAPPERCMD")))) {
1248 val.
Form(
"\"%s\"", pinCmd);
1261 if (!inpath || strlen(inpath) <= 0) {
1262 Error(
"AssertPath",
"undefined input path");
1271 Error(
"AssertPath",
"could not create path %s", path.
Data());
1278 Error(
"AssertPath",
"could not make path %s writable", path.
Data());
1297 gROOT->GetListOfSockets()->Remove(mgr);
1298 gROOT->GetListOfSockets()->Add(mgr);
1313 Error(
"AddWorkers",
"AddWorkers can only be called on the master!");
1317 if (!workerList || !(workerList->
GetSize())) {
1318 Error(
"AddWorkers",
"empty list of workers!");
1339 if (!addedWorkers) {
1341 Error(
"AddWorkers",
"cannot create new list for the workers to be added");
1349 while ((to = next())) {
1398 addedWorkers->
Add(slave);
1406 Info(
"AddWorkers",
"worker on host %s created"
1412 m <<
TString(
"Opening connections to workers") << nSlaves
1413 << nSlavesDone << slaveOk;
1427 TIter nxsl(addedWorkers);
1429 while ((sl = (
TSlave *) nxsl())) {
1440 Info(
"AddWorkers",
"worker on host %s finalized"
1450 m <<
TString(
"Setting up worker servers") << nSlaves
1451 << nSlavesDone << slaveOk;
1470 Info(
"AddWorkers",
"will invoke GoMoreParallel()");
1473 Info(
"AddWorkers",
"GoMoreParallel()=%d", nw);
1479 Info(
"AddWorkers",
"will invoke GoParallel()");
1488 Info(
"AddWorkers",
"will invoke SaveWorkerInfo()");
1494 Info(
"AddWorkers",
"will invoke SendParallel()");
1497 if (goMoreParallel &&
fPlayer) {
1502 Info(
"AddWorkers",
"will send the PROCESS message to selected workers");
1510 delete addedWorkers;
1523 if (packs && packs->
GetSize() > 0) {
1526 while ((pck = (
TPair *) nxp())) {
1532 Info(
"SetupWorkersEnv",
"will invoke UploadPackage() and EnablePackage() on added workers");
1537 Info(
"SetupWorkersEnv",
"will invoke UploadPackage() and EnablePackage() on all workers");
1546 delete server_packs;
1555 Info(
"SetupWorkersEnv",
"will invoke Load() on selected workers");
1567 Info(
"SetupWorkersEnv",
"will invoke AddDynamicPath() on selected workers");
1575 Info(
"SetupWorkersEnv",
"will invoke AddIncludePath() on selected workers");
1590 Error(
"RemoveWorkers",
"RemoveWorkers can only be called on the master!");
1600 while ((sl = (
TSlave *) nxsl())) {
1606 if (!(workerList->
GetSize())) {
1607 Error(
"RemoveWorkers",
"The list of workers should not be empty!");
1615 while ((to = next())) {
1617 if (!strcmp(to->
ClassName(),
"TProofNodeInfo")) {
1621 while ((sl = (
TSlave *) nxsl())) {
1629 Warning(
"RemoveWorkers",
"unknown object type: %s - it should be"
1630 " TProofNodeInfo or inheriting from TSlave", to->
ClassName());
1660 TString emsg(
"no resource currently available for this session: please retry later");
1672 Printf(
"Starting master: opening connection ...");
1678 fprintf(stderr,
"Starting master:"
1679 " connection open: setting up server ... \r");
1693 Printf(
"Starting master: OK ");
1699 Error(
"StartSlaves",
1700 "client and remote protocols not compatible (%d and %d)",
1719 if (slStatus == -99 || slStatus == -98 || rc == 0) {
1722 if (slStatus == -99)
1723 Error(
"StartSlaves",
"no resources available or problems setting up workers (check logs)");
1724 else if (slStatus == -98)
1725 Error(
"StartSlaves",
"could not setup output redirection on master");
1727 Error(
"StartSlaves",
"setting up master");
1738 Error(
"StartSlaves",
1739 "failed to setup connection with PROOF master server");
1745 gROOT->GetPluginManager()->FindHandler(
"TProofProgressDialog")))
1751 Printf(
"Starting master: failure");
1756 Printf(
"Starting master: OK ");
1761 gROOT->GetPluginManager()->FindHandler(
"TProofProgressDialog")))
1774 Error(
"StartSlaves",
"failed to create (or connect to) the PROOF master server");
1790 { std::lock_guard<std::recursive_mutex> lock(
fCloseMutex);
1799 while ((sl = (
TSlave *)nxs()))
1813 gROOT->GetListOfSockets()->Remove(
this);
1825 gROOT->GetListOfProofs()->Remove(
this);
1844 Int_t perf,
const char *image,
const char *workdir)
1866 const char *image,
const char *msd,
Int_t nwk)
1886 while ((sl = (
TSlave *)next())) {
1913 if (
fImage == sl->fImage) {
1923 TSlave *replace_slave = 0;
1926 if (sl->fImage == sl2->fImage) {
1931 replace_slave = sl2;
1938 Error(
"FindUniqueSlaves",
"TSlave is neither Master nor Slave");
1951 if (replace_slave) {
2043 if (s.
Contains(
"Total MB's processed:")) {
2046 }
else if (s.
Contains(
"Total real time used (s):")) {
2047 s.
ReplaceAll(
"Total real time used (s):",
"");
2049 }
else if (s.
Contains(
"Total CPU time used (s):")) {
2050 s.
ReplaceAll(
"Total CPU time used (s):",
"");
2059 Printf(
" Real/CPU time (s): %.3f / %.3f; workers: %d; processed: %.2f MBs",
2163 Printf(
"+++ Options: \"A\" show all queries known to server");
2164 Printf(
"+++ \"L\" show retrieved queries");
2165 Printf(
"+++ \"F\" full listing of query info");
2166 Printf(
"+++ \"H\" print this menu");
2168 Printf(
"+++ (case insensitive)");
2170 Printf(
"+++ Use Retrieve(<#>) to retrieve the full"
2171 " query results from the master");
2172 Printf(
"+++ e.g. Retrieve(8)");
2202 Printf(
"+++ Queries processed during this session: selector: %d, draw: %d",
2204 while ((pq = nxq()))
2211 Printf(
"+++ Queries processed during this session: selector: %d, draw: %d",
2218 Printf(
"+++ Queries available locally: %d", listlocal->
GetSize());
2219 TIter nxlq(listlocal);
2220 while ((pq = nxlq()))
2236 while (
TSlave *sl =
dynamic_cast<TSlave*
>(nextSlave())) {
2246 if (submasters.
GetSize() > 0) {
2254 EmitVA(
"IsDataReady(Long64_t,Long64_t)", 2, totalbytes, bytesready);
2257 Info(
"IsDataReady",
"%lld / %lld (%s)",
2258 bytesready, totalbytes,
fDataReady?
"READY":
"NOT READY");
2276 if (slaves->
GetSize() == 0)
return;
2281 while ((sl = (
TSlave *)next())) {
2300 Int_t nparallel = 0;
2301 while (
TSlave* sl =
dynamic_cast<TSlave*
>(nextSlave()))
2302 if (sl->GetParallel() >= 0)
2303 nparallel += sl->GetParallel();
2326 while ((slave = (
TSlave *) next()) != 0) {
2336 while ((activeslave = (
TSlave *) nextactive())) {
2345 while ((badslave = (
TSlave *) nextbad())) {
2354 MarkBad(slave,
"could not send kPROOF_GETSLAVEINFO message");
2362 MarkBad(slave,
"could not send kPROOF_GETSLAVEINFO message");
2367 Error(
"GetSlaveInfo",
"TSlave is neither Master nor Slave");
2388 while ((sl = (
TSlave*) next())) {
2418 if (workers->
GetSize() == 0)
return 0;
2421 TIter next(workers);
2424 while ((wrk = (
TSlave *)next())) {
2427 MarkBad(wrk,
"could not send group priority");
2469 if (!slaves || slaves->
GetSize() == 0)
return 0;
2475 while ((sl = (
TSlave *)next())) {
2478 MarkBad(sl,
"could not broadcast request");
2561 if (slaves->
GetSize() == 0)
return 0;
2567 while ((sl = (
TSlave *)next())) {
2570 MarkBad(sl,
"could not send broadcast-raw request");
2604 if (wrks->
GetSize() == 0)
return 0;
2610 while ((wrk = (
TSlave *)next())) {
2613 Error(
"BroadcastFile",
2614 "problems sending file to worker %s (%s)",
2674 rc =
Collect(mon, timeout, endtype, deactonfail);
2700 while ((sl = (
TSlave*) next())) {
2705 rc =
Collect(mon, timeout, endtype, deactonfail);
2732 rc =
Collect(mon, timeout, endtype, deactonfail);
2751 Info(
"Collect",
">>>>>> Entering collect responses #%04d", collectId);
2781 int cnt = 0, rc = 0;
2786 Info(
"Collect",
"#%04d: active: %d", collectId, mon->
GetActive());
2798 while ((nact = mon->
GetActive(sto)) && (nto < 0 || nto > 0)) {
2804 if (al && al->
GetSize() > 0) {
2805 Info(
"Collect",
" %d node(s) still active:", al->
GetSize());
2808 while ((xs = (
TSocket *)nxs())) {
2830 Info(
"Collect",
"#%04d: now active: %d", collectId, mon->
GetActive());
2835 Info(
"Collect",
"Will invoke Select() #%04d", collectId);
2838 if (s && s != (
TSocket *)(-1)) {
2841 if (rc == 1 || (rc == 2 && !savedMonitor)) {
2846 Info(
"Collect",
"#%04d: deactivating %p (active: %d, %p)", collectId,
2850 }
else if (rc == 2) {
2857 Info(
"Collect",
"save monitor: deactivating %p (active: %d, %p)",
2875 if (s == (
TSocket *)(-1) && nto > 0)
2887 while (mxws && (wrk = (
TSlave *) nxwr())) {
2892 Info(
"Collect",
"worker %s was asked to send its output to master",
2917 if (al && al->
GetSize() > 0) {
2919 Info(
"Collect",
" %d node(s) went in timeout:", al->
GetSize());
2922 while ((xs = (
TSocket *)nxs())) {
2951 Info(
"Collect",
"<<<<<< Exiting collect responses #%04d", collectId);
2968 Error(
"PollForNewWorkers",
"Can't invoke: not on a master -- should not happen!");
2972 Error(
"PollForNewWorkers",
"No ProofServ available -- should not happen!");
2982 TIter next(reqWorkers);
2993 while (( sl =
dynamic_cast<TSlave *
>(nextInner()) )) {
3000 if (found)
delete ni;
3002 newWorkers->
Add(ni);
3004 Info(
"PollForNewWorkers",
"New worker found: %s:%s",
3014 if (nNewWorkers > 0) {
3016 Info(
"PollForNewWorkers",
"Requesting to add %d new worker(s)", newWorkers->
GetEntries());
3019 Error(
"PollForNewWorkers",
"Call to AddWorkers() failed (got %d < 0)", rv);
3026 Info(
"PollForNewWorkers",
"No new worker found");
3055 if ((recvrc = s->
Recv(mess)) < 0) {
3057 Info(
"CollectInputFrom",
"%p: got %d from Recv()", s, recvrc);
3068 MarkBad(s,
"problems receiving a message in TProof::CollectInputFrom(...)");
3074 MarkBad(s,
"undefined message in TProof::CollectInputFrom(...)");
3082 if (rc == 1 && (endtype >= 0) && (
what != endtype))
3102 Warning(
"HandleInputMessage",
"given an empty message or undefined worker");
3108 Warning(
"HandleInputMessage",
"worker socket is undefined");
3135 MarkBad(s,
"received kPROOF_FATAL");
3148 Info(
"HandleInputMessage",
"received kPROOF_STOP from %s: disabling any further collection this worker",
3180 Info(
"HandleInputMessage",
"%s: kPROOF_GETPACKET", sl->
GetOrdinal());
3222 Info(
"HandleInputMessage",
"%s: kPROOF_LOGFILE: size: %d", sl->
GetOrdinal(), size);
3230 Info(
"HandleInputMessage",
"%s: kPROOF_LOGDONE: status %d parallel %d",
3264 Info(
"HandleInputMessage",
3280 rc = (async) ? 0 : 1;
3305 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_PACKAGE_LIST: enter");
3315 Error(
"HandleInputMessage",
3316 "kPROOF_PACKAGE_LIST: kListEnabledPackages: TList not found in message!");
3325 Error(
"HandleInputMessage",
3326 "kPROOF_PACKAGE_LIST: kListPackages: TList not found in message!");
3330 Error(
"HandleInputMessage",
"kPROOF_PACKAGE_LIST: unknown type: %d",
type);
3343 Info(
"HandleInputMessage",
"kPROOF_SENDOUTPUT: enter (%s)", sl->
GetOrdinal());
3359 Info(
"HandleInputMessage",
"kPROOF_OUTPUTOBJECT: enter");
3363 Info(
"HandleInputMessage",
"finalization on %s started ...", prefix);
3389 Warning(
"HandleInputMessage",
"kPROOF_OUTPUTOBJECT: query result missing");
3391 }
else if (
type > 0) {
3401 }
else if (
IsTty() || changed) {
3402 fprintf(stderr,
"%s\r", msg.
Data());
3422 while ((xo = nxin()))
3434 Warning(
"HandleInputMessage",
"kPROOF_OUTPUTOBJECT: player undefined!");
3445 Info(
"HandleInputMessage",
"%s: kPROOF_OUTPUTLIST: enter", sl->
GetOrdinal());
3467 Info(
"HandleInputMessage",
3468 "%s: kPROOF_OUTPUTLIST: query result missing", sl->
GetOrdinal());
3477 Info(
"HandleInputMessage",
3478 "%s: kPROOF_OUTPUTLIST: outputlist is empty", sl->
GetOrdinal());
3482 "%s: kPROOF_OUTPUTLIST: player undefined!", sl->
GetOrdinal());
3492 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_QUERYLIST: enter");
3505 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_RETRIEVE: enter");
3514 Info(
"HandleInputMessage",
3515 "kPROOF_RETRIEVE: query result missing or player undefined");
3522 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_MAXQUERIES: enter");
3526 Printf(
"Number of queries fully kept remotely: %d", max);
3532 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_SERVERSTARTED: enter");
3534 UInt_t tot = 0, done = 0;
3538 (*mess) >> action >> tot >> done >> st;
3545 char msg[512] = {0};
3547 snprintf(msg, 512,
"%s: OK (%d %s) \n",
3550 snprintf(msg, 512,
"%s: %d out of %d (%d %%)\r",
3551 action.
Data(), done, tot, frac);
3554 fprintf(stderr,
"%s", msg);
3564 m << action << tot << done << st;
3572 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_DATASET_STATUS: enter");
3574 UInt_t tot = 0, done = 0;
3578 (*mess) >> action >> tot >> done >> st;
3584 char msg[512] = {0};
3586 snprintf(msg, 512,
"%s: OK (%d %s) \n",
3589 snprintf(msg, 512,
"%s: %d out of %d (%d %%)\r",
3590 action.
Data(), done, tot, frac);
3593 fprintf(stderr,
"%s", msg);
3603 m << action << tot << done << st;
3611 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_STARTPROCESS: enter");
3631 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"Preparation time: %f s",
fPrepTime);
3636 (*mess) >> selec >> dsz >>
first >> nent;
3638 if (!
gROOT->IsBatch()) {
3657 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_ENDINIT: enter");
3676 Info(
"HandleInputMessage",
"%s: got kPROOF_SETIDLE", sl->
GetOrdinal());
3679 "%s: got kPROOF_SETIDLE but no running workers ! protocol error?",
3693 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_QUERYSUBMITTED: enter");
3718 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_SESSIONTAG: enter");
3741 Info(
"HandleInputMessage",
"kPROOF_FEEDBACK: enter");
3754 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_AUTOBIN: enter");
3773 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_PROGRESS: enter");
3782 Float_t initTime, procTime, evtrti, mbrti;
3783 (*mess) >>
total >> processed >> bytesread
3784 >> initTime >> procTime
3788 initTime, procTime, evtrti, mbrti);
3793 (*mess) >>
total >> processed;
3807 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_STOPPROCESS: enter");
3814 (*mess) >> status >> abort;
3816 (*mess) >> events >> abort;
3822 TList *listOfMissingFiles = 0;
3823 if (!(listOfMissingFiles = (
TList *)
GetOutput(
"MissingFiles"))) {
3824 listOfMissingFiles =
new TList();
3825 listOfMissingFiles->
SetName(
"MissingFiles");
3844 Emit(
"StopProcess(Bool_t)", abort);
3850 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_SUBMERGER: enter");
3857 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_GETSLAVEINFO: enter");
3864 Error(
"HandleInputMessage",
"kPROOF_GETSLAVEINFO: no list received!");
3908 Info(
"HandleInputMessage",
"kPROOF_VALIDATE_DSET: enter");
3912 Error(
"HandleInputMessage",
"kPROOF_VALIDATE_DSET: fDSet not set");
3921 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_DATA_READY: enter");
3924 (*mess) >> dataready >> totalbytes >> bytesready;
3937 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_MESSAGE: enter");
3950 fprintf(stderr,
"%s%c", msg.
Data(), (lfeed ?
'\n' :
'\r'));
3958 fprintf(stderr,
"%s%c", msg.
Data(), (lfeed ?
'\n' :
'\r'));
3974 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_VERSARCHCOMP: %s", vac.
Data());
3987 Error(
"HandleInputMessage",
"unknown command received from '%s' (what = %d)",
4015 Int_t merger_id = -1;
4016 (*mess) >> merger_id;
4019 Info(
"HandleSubmerger",
"kOutputSent: Worker %s:%d:%s had sent its output to merger #%d",
4023 Error(
"HandleSubmerger",
"kOutputSize: #%d not in list ", merger_id);
4036 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
"all mergers removed ... ");
4040 PDB(kSubmerger, 2)
Error(
"HandleSubmerger",
"kOutputSent: received not on endmaster!");
4047 Int_t merger_id = -1;
4048 (*mess) >> merger_id;
4050 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
"kMergerDown: #%d ", merger_id);
4053 Error(
"HandleSubmerger",
"kMergerDown: #%d not in list ", merger_id);
4076 while ((o = nxo())) {
4079 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
"kMergerDown:%d: exit", merger_id);
4087 Info(
"HandleSubmerger",
"worker %s reported as finished ", sl->
GetOrdinal());
4091 Info(
"HandleSubmerger",
"finalization on %s started ...", prefix);
4095 Int_t output_size = 0;
4096 Int_t merging_port = 0;
4097 (*mess) >> output_size >> merging_port;
4099 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
4100 "kOutputSize: Worker %s:%d:%s reports %d output objects (+ available port %d)",
4116 msg.
Form(
"%s: Invalid request: cannot start %d mergers for %d workers",
4126 if (activeWorkers > 1) {
4132 msg.
Form(
"%s: Number of mergers set dynamically to %d (for %d workers)",
4135 msg.
Form(
"%s: No mergers will be used for %d workers",
4136 prefix, activeWorkers);
4145 if (activeWorkers > 1) {
4150 while ((wrk = nxwk())) {
4158 msg.
Form(
"%s: Number of mergers set to %d (for %d workers), one for each slave host",
4161 msg.
Form(
"%s: No mergers will be used for %d workers",
4162 prefix, activeWorkers);
4170 msg.
Form(
"%s: Number of mergers set by user to %d (for %d workers)",
4238 Error(
"HandleSubMerger",
"kOutputSize received not on endmaster!");
4250 Int_t merger_id = -1;
4264 if (merger_id == -1) {
4271 Info(
"RedirectWorker",
"redirecting worker %s to merger %d", sl->
GetOrdinal(), merger_id);
4273 PDB(kSubmerger, 2)
Info(
"RedirectWorker",
"redirecting output to merger #%d", merger_id);
4275 Error(
"RedirectWorker",
"#%d not in list ", merger_id);
4281 sendoutput << merger_id;
4282 sendoutput << hname;
4284 s->
Send(sendoutput);
4296 while (fLastAssignedMerger < fMergers->GetSize() &&
4308 while (fLastAssignedMerger < fMergers->GetSize() &&
4329 PDB(kSubmerger, 2)
Info(
"AskForOutput",
4330 "worker %s was asked to send its output to master",
4334 sendoutput <<
TString(
"master");
4350 Info(
"UpdateDialog",
4351 "processing was aborted - %lld events processed",
4366 Info(
"UpdateDialog",
4367 "processing was stopped - %lld events processed",
4384 EmitVA(
"Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t,Int_t,Int_t,Float_t)",
4389 EmitVA(
"Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t)",
4405 while ((sl = (
TSlave*) next()))
4418 while ((sl = (
TSlave*) next()))
4430 Int_t active_mergers = 0;
4435 if (mi->
IsActive()) active_mergers++;
4438 return active_mergers;
4447 Info(
"CreateMerger",
"worker %s will be merger ", sl->
GetOrdinal());
4449 PDB(kSubmerger, 2)
Info(
"CreateMerger",
"Begin");
4453 Info(
"CreateMerger",
"cannot create merger on port %d - exit", port);
4470 Int_t workersOnHost = 0;
4474 workers = workersOnHost - 1;
4478 msg.
Form(
"worker %s on host %s will be merger for %d additional workers", sl->
GetOrdinal(), sl->
GetName(), workers);
4490 bemerger << workers;
4493 PDB(kSubmerger,2)
Info(
"CreateMerger",
4494 "merger #%d (port: %d) for %d workers started",
4502 PDB(kSubmerger, 2)
Info(
"CreateMerger",
"exit");
4513 std::lock_guard<std::recursive_mutex> lock(
fCloseMutex);
4519 Error(
"MarkBad",
"worker instance undefined: protocol error? ");
4540 msg.
Form(
"\n +++ Message from %s : marking %s:%d (%s) as bad\n +++ Reason: %s",
4542 (reason && strlen(reason)) ? reason :
"unknown");
4547 msg +=
TString::Format(
"\n\n +++ Most likely your code crashed on worker %s at %s:%d.\n",
4552 msg +=
TString::Format(
" +++ Please check the session logs for error messages either using\n");
4556 msg +=
TString::Format(
" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->"
4560 msg +=
TString::Format(
" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->"
4561 "Display(\"*\")\n\n", thisurl.
Data());
4564 }
else if (reason) {
4566 Info(
"MarkBad",
"worker %s at %s:%d asked to terminate",
4574 TList *listOfMissingFiles = 0;
4575 if (!(listOfMissingFiles = (
TList *)
GetOutput(
"MissingFiles"))) {
4576 listOfMissingFiles =
new TList();
4577 listOfMissingFiles->
SetName(
"MissingFiles");
4586 packetizer->
MarkBad(wrk, 0, &listOfMissingFiles);
4642 Int_t mergersCount = -1;
4644 if (mc) mergersCount = mc->
GetVal();
4646 if (mergersCount == 0) {
4648 if (activeWorkers > 1) {
4674 std::lock_guard<std::recursive_mutex> lock(
fCloseMutex);
4689 Warning(
"TerminateWorker",
"worker instance undefined: protocol error? ");
4699 Info(
"TerminateWorker",
"connection to worker is already down: cannot"
4700 " send termination message");
4712 if (ord && strlen(ord) > 0) {
4717 while ((wrk = (
TSlave *)nxw())) {
4718 if (all || !strcmp(wrk->
GetOrdinal(), ord)) {
4750 if (slaves->
GetSize() == 0)
return 0;
4756 while ((sl = (
TSlave *)next())) {
4758 if (sl->
Ping() == -1) {
4759 MarkBad(sl,
"ping unsuccessful");