Logo ROOT  
Reference Guide
TProof.cxx
Go to the documentation of this file.
1 // @(#)root/proof:$Id: a2a50e759072c37ccbc65ecbcce735a76de86e95 $
2 // Author: Fons Rademakers 13/02/97
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /**
13 \defgroup proof PROOF
14 
15 Classes defining the Parallel ROOT Facility, PROOF, a framework for parallel analysis of ROOT TTrees.
16 
17 \deprecated
18 We keep PROOF for those who still need it for legacy use cases.
19 PROOF is no longer developed and receives only limited support.
20 %ROOT moved several years ago to RDataFrame and related products as its multi-core/multi-processing engines.
21 
22 */
23 
24 /**
25 \defgroup proofkernel PROOF kernel Libraries
26 \ingroup proof
27 
28 The PROOF kernel libraries (libProof, libProofPlayer, libProofDraw) contain the classes defining
29 the kernel of the PROOF facility, i.e. the protocol and the utilities to steer data processing
30 and handling of results.
31 
32 */
33 
34 /** \class TProof
35 \ingroup proofkernel
36 
37 This class controls a Parallel ROOT Facility, PROOF, cluster.
38 It fires the worker servers, it keeps track of how many workers are
39 running, it keeps track of the workers running status, it broadcasts
40 messages to all workers, it collects results, etc.
41 
42 */
43 
44 #include <stdlib.h>
45 #include <fcntl.h>
46 #include <errno.h>
47 #ifdef WIN32
48 # include <io.h>
49 # include <sys/stat.h>
50 # include <sys/types.h>
51 # include "snprintf.h"
52 #else
53 # include <unistd.h>
54 #endif
55 #include <vector>
56 
57 #include "RConfigure.h"
58 #include "Riostream.h"
59 #include "Getline.h"
60 #include "TBrowser.h"
61 #include "TChain.h"
62 #include "TCondor.h"
63 #include "TDSet.h"
64 #include "TError.h"
65 #include "TEnv.h"
66 #include "TEntryList.h"
67 #include "TEventList.h"
68 #include "TFile.h"
69 #include "TFileInfo.h"
70 #include "TFTP.h"
71 #include "THashList.h"
72 #include "TInterpreter.h"
73 #include "TKey.h"
74 #include "TMap.h"
75 #include "TMath.h"
76 #include "TMessage.h"
77 #include "TMonitor.h"
78 #include "TObjArray.h"
79 #include "TObjString.h"
80 #include "TParameter.h"
81 #include "TProof.h"
82 #include "TProofNodeInfo.h"
83 #include "TProofOutputFile.h"
84 #include "TVirtualProofPlayer.h"
85 #include "TVirtualPacketizer.h"
86 #include "TProofServ.h"
87 #include "TPluginManager.h"
88 #include "TQueryResult.h"
89 #include "TRandom.h"
90 #include "TRegexp.h"
91 #include "TROOT.h"
92 #include "TSlave.h"
93 #include "TSocket.h"
94 #include "TSortedList.h"
95 #include "TSystem.h"
96 #include "TTree.h"
97 #include "TUrl.h"
98 #include "TFileCollection.h"
99 #include "TDataSetManager.h"
100 #include "TDataSetManagerFile.h"
101 #include "TMacro.h"
102 #include "TSelector.h"
103 #include "TPRegexp.h"
104 #include "TPackMgr.h"
105 
106 #include <mutex>
107 
109 
110 // Rotating indicator
111 char TProofMergePrg::fgCr[4] = {'-', '\\', '|', '/'};
112 
113 TList *TProof::fgProofEnvList = 0; // List of env vars for proofserv
114 TPluginHandler *TProof::fgLogViewer = 0; // Log viewer handler
115 
117 
118 //----- PROOF Interrupt signal handler -----------------------------------------
119 ////////////////////////////////////////////////////////////////////////////////
120 /// TProof interrupt handler.
121 
123 {
124  if (!fProof->IsTty() || fProof->GetRemoteProtocol() < 22) {
125 
126  // Cannot ask the user : abort any remote processing
128 
129  } else {
130  // Real stop or request to switch to asynchronous?
131  const char *a = 0;
132  if (fProof->GetRemoteProtocol() < 22) {
133  a = Getline("\nSwitch to asynchronous mode not supported remotely:"
134  "\nEnter S/s to stop, Q/q to quit, any other key to continue: ");
135  } else {
136  a = Getline("\nEnter A/a to switch asynchronous, S/s to stop, Q/q to quit,"
137  " any other key to continue: ");
138  }
139  if (a[0] == 'Q' || a[0] == 'S' || a[0] == 'q' || a[0] == 's') {
140 
141  Info("Notify","Processing interrupt signal ... %c", a[0]);
142 
143  // Stop or abort any remote processing
144  Bool_t abort = (a[0] == 'Q' || a[0] == 'q') ? kTRUE : kFALSE;
145  fProof->StopProcess(abort);
146 
147  } else if ((a[0] == 'A' || a[0] == 'a') && fProof->GetRemoteProtocol() >= 22) {
148  // Stop any remote processing
150  }
151  }
152 
153  return kTRUE;
154 }
155 
156 //----- Input handler for messages from TProofServ -----------------------------
157 ////////////////////////////////////////////////////////////////////////////////
158 /// Constructor
159 
161  : TFileHandler(s->GetDescriptor(),1),
162  fSocket(s), fProof(p)
163 {
 // Nothing else to do: the TFileHandler base is attached to the descriptor
 // of socket 's' (mask 1, presumably the read mask - confirm), and the socket
 // and the owning PROOF instance are stored for use when input arrives.
164 }
165 
166 ////////////////////////////////////////////////////////////////////////////////
167 /// Handle input
168 
170 {
 // Input-ready callback; whatever processing happens, it always returns kTRUE.
172  return kTRUE;
173 }
174 
175 
176 //------------------------------------------------------------------------------
177 
179 
180 ////////////////////////////////////////////////////////////////////////////////
181 /// Used to sort slaveinfos by ordinal.
182 
184 {
 // Order two worker infos by ordinal.
 // A null 'obj' compares as 1. If 'obj' is not a TSlaveInfo, this ordinal is
 // compared lexicographically with obj->GetName(). Otherwise the ordinals
 // (dot-separated numbers, e.g. "0.3.1") are compared component by component
 // using atoi() on each piece.
 // NOTE(review): the numeric path returns 1 when this component is SMALLER and
 // -1 when this ordinal has more components. Under the usual TObject::Compare
 // convention (-1 = "this sorts first") that gives a descending order, while
 // the CompareTo() fallback is ascending - confirm which order is intended.
185  if (!obj) return 1;
186 
187  const TSlaveInfo *si = dynamic_cast<const TSlaveInfo*>(obj);
188 
189  if (!si) return fOrdinal.CompareTo(obj->GetName());
190 
191  const char *myord = GetOrdinal();
192  const char *otherord = si->GetOrdinal();
193  while (myord && otherord) {
194  Int_t myval = atoi(myord);
195  Int_t otherval = atoi(otherord);
196  if (myval < otherval) return 1;
197  if (myval > otherval) return -1;
 // Advance both cursors past the next '.', or to null when no component is left
198  myord = strchr(myord, '.');
199  if (myord) myord++;
200  otherord = strchr(otherord, '.');
201  if (otherord) otherord++;
202  }
 // All shared components are equal: the ordinal with extra components decides
203  if (myord) return -1;
204  if (otherord) return 1;
205  return 0;
206 }
207 
208 ////////////////////////////////////////////////////////////////////////////////
209 /// Used to compare slaveinfos by ordinal.
210 
212 {
 // Two worker infos are equal iff 'obj' is a TSlaveInfo whose ordinal string
 // matches this one exactly; a null or non-TSlaveInfo 'obj' gives kFALSE.
 // NOTE(review): strcmp() assumes GetOrdinal() never returns null - confirm.
213  if (!obj) return kFALSE;
214  const TSlaveInfo *si = dynamic_cast<const TSlaveInfo*>(obj);
215  if (!si) return kFALSE;
216  return (strcmp(GetOrdinal(), si->GetOrdinal()) == 0);
217 }
218 
219 ////////////////////////////////////////////////////////////////////////////////
220 /// Print slave info. If opt = "active" print only the active
221 /// slaves, if opt = "notactive" print only the not active slaves,
222 /// if opt = "bad" print only the bad slaves, else
223 /// print all slaves. An "N" in opt selects the compact one-line format.
224 
225 void TSlaveInfo::Print(Option_t *opt) const
226 {
 // Human-readable status label derived from fStatus
227  TString stat = fStatus == kActive ? "active" :
228  fStatus == kBad ? "bad" :
229  "not active";
230 
 // An "N" anywhere in opt selects the new format; it is stripped before the
 // status filter below compares the remaining option text.
231  Bool_t newfmt = kFALSE;
232  TString oo(opt);
233  if (oo.Contains("N")) {
234  newfmt = kTRUE;
235  oo.ReplaceAll("N","");
236  }
 // Status filters: print nothing when the requested category does not match
237  if (oo == "active" && fStatus != kActive) return;
238  if (oo == "notactive" && fStatus != kNotActive) return;
239  if (oo == "bad" && fStatus != kBad) return;
240 
241  if (newfmt) {
 // Optional sections are left empty when the corresponding field is unset
242  TString msd, si, datadir;
243  if (!(fMsd.IsNull())) msd.Form("| msd: %s ", fMsd.Data());
244  if (!(fDataDir.IsNull())) datadir.Form("| datadir: %s ", fDataDir.Data());
245  if (fSysInfo.fCpus > 0) {
246  si.Form("| %s, %d cores, %d MB ram", fHostName.Data(),
248  } else {
249  si.Form("| %s", fHostName.Data());
250  }
251  Printf("Worker: %9s %s %s%s| %s", fOrdinal.Data(), si.Data(), msd.Data(), datadir.Data(), stat.Data());
252 
253  } else {
 // Legacy multi-field format on std::cout
254  TString msd = fMsd.IsNull() ? "<null>" : fMsd.Data();
255 
256  std::cout << "Slave: " << fOrdinal
257  << " hostname: " << fHostName
258  << " msd: " << msd
259  << " perf index: " << fPerfIndex
260  << " " << stat
261  << std::endl;
262  }
263 }
264 
265 ////////////////////////////////////////////////////////////////////////////////
266 /// Setter for fSysInfo
267 
269 {
 // Copy the listed system-information fields from 'si' into fSysInfo.
270  fSysInfo.fOS = si.fOS; // OS
271  fSysInfo.fModel = si.fModel; // computer model
272  fSysInfo.fCpuType = si.fCpuType; // type of cpu
273  fSysInfo.fCpus = si.fCpus; // number of cpus
274  fSysInfo.fCpuSpeed = si.fCpuSpeed; // cpu speed in MHz
275  fSysInfo.fBusSpeed = si.fBusSpeed; // bus speed in MHz
276  fSysInfo.fL2Cache = si.fL2Cache; // level 2 cache size in KB
277  fSysInfo.fPhysRam = si.fPhysRam; // Physical RAM
278 }
279 
281 
282 //------------------------------------------------------------------------------
283 
284 ////////////////////////////////////////////////////////////////////////////////
285 /// Destructor
286 
288 {
 // Only the fWorkers container belongs to this object; per the comment below,
 // the worker objects it references are owned elsewhere and are not deleted.
289  // Just delete the list, the objects are owned by other list
290  if (fWorkers) {
293  }
294 }
295 ////////////////////////////////////////////////////////////////////////////////
296 /// Increase number of already merged workers by 1
297 
299 {
 // Count one more merged worker; refuses (with an error) once the counter
 // already equals the number of workers this merger is expected to merge.
300  if (AreAllWorkersMerged())
301  Error("SetMergedWorker", "all workers have been already merged before!");
302  else
303  fMergedWorkers++;
304 }
305 
306 ////////////////////////////////////////////////////////////////////////////////
307 /// Add new worker to the list of workers to be merged by this merger
308 
310 {
 // The worker list is created lazily on the first call
311  if (!fWorkers)
312  fWorkers = new TList();
 // Refuse extra workers once fWorkersToMerge of them have been assigned
313  if (fWorkersToMerge == fWorkers->GetSize()) {
314  Error("AddWorker", "all workers have been already assigned to this merger");
315  return;
316  }
317  fWorkers->Add(sl);
318 }
319 
320 ////////////////////////////////////////////////////////////////////////////////
322 /// Return whether the merger has already merged all its workers, i.e. whether it has finished its merging job
322 
324 {
 // Done when the merged counter has reached the expected number of workers
325  return (fWorkersToMerge == fMergedWorkers);
326 }
327 
328 ////////////////////////////////////////////////////////////////////////////////
329 /// Return whether the expected number of workers has already been assigned to this merger
330 
332 {
 // No worker list yet means nothing has been assigned
333  if (!fWorkers)
334  return kFALSE;
335 
336  return (fWorkers->GetSize() == fWorkersToMerge);
337 }
338 
339 ////////////////////////////////////////////////////////////////////////////////
340 /// This is a private API function.
341 /// It checks whether the connection string contains a PoD cluster protocol.
342 /// If it does, then the connection string will be changed to reflect
343 /// a real PROOF connection string for a PROOF cluster managed by PoD.
344 /// PoD: http://pod.gsi.de .
345 /// Return -1 if the PoD request failed; return 0 otherwise.
346 
347 static Int_t PoDCheckUrl(TString *_cluster)
348 {
349  if ( !_cluster )
350  return 0;
351 
352  // trim spaces from both sides of the string
353  *_cluster = _cluster->Strip( TString::kBoth );
354  // PoD protocol string
355  const TString pod_prot("pod");
356 
357  // URL test
358  // TODO: The URL test is to support remote PoD servers (not managed by pod-remote)
359  TUrl url( _cluster->Data() );
360  if( pod_prot.CompareTo(url.GetProtocol(), TString::kIgnoreCase) )
361  return 0;
362 
363  // PoD cluster is used
364  // call pod-info in a batch mode (-b).
365  // pod-info will find either a local PoD cluster or
366  // a remote one, manged by pod-remote.
367  *_cluster = gSystem->GetFromPipe("pod-info -c -b");
368  if( 0 == _cluster->Length() ) {
369  Error("PoDCheckUrl", "PoD server is not running");
370  return -1;
371  }
372  return 0;
373 }
374 
375 ////////////////////////////////////////////////////////////////////////////////
376 /// Create a PROOF environment. Starting PROOF involves either connecting
377 /// to a master server, which in turn will start a set of slave servers, or
378 /// directly starting as master server (if master = ""). Masterurl is of
379 /// the form: [proof[s]://]host[:port]. Conffile is the name of the config
380 /// file describing the remote PROOF cluster (this argument allows you to
381 /// describe different cluster configurations).
382 /// The default is proof.conf. Confdir is the directory where the config
383 /// file and other PROOF related files are (like motd and noproof files).
384 /// Loglevel is the log level (default = 1). User specified custom config
385 /// files will be first looked for in $HOME/.conffile.
386 
387 TProof::TProof(const char *masterurl, const char *conffile, const char *confdir,
388  Int_t loglevel, const char *alias, TProofMgr *mgr)
389  : fUrl(masterurl)
390 {
391  // Default initializations
392  InitMembers();
393 
394  // This may be needed during init
395  fManager = mgr;
396 
397  // Default server type
399 
400  // Default query mode
401  fQueryMode = kSync;
402 
403  // Parse the main URL, adjusting the missing fields and setting the relevant
404  // bits
407 
408  // Protocol and Host
 // An empty master URL means "start as master": placeholder host __master__
409  if (!masterurl || strlen(masterurl) <= 0) {
410  fUrl.SetProtocol("proof");
411  fUrl.SetHost("__master__");
412  } else if (!(strstr(masterurl, "://"))) {
413  fUrl.SetProtocol("proof");
414  }
415  // Port
 // A TUrl built from a blank string carries the default port value; when the
 // parsed URL still has it (presumably: no explicit port was given), use the
 // port TUrl associates with the "proof" protocol instead.
416  if (fUrl.GetPort() == TUrl(" ").GetPort())
417  fUrl.SetPort(TUrl("proof:// ").GetPort());
418 
419  // Make sure to store the FQDN, so to get a solid reference for subsequent checks
420  if (!strcmp(fUrl.GetHost(), "__master__"))
421  fMaster = fUrl.GetHost();
422  else if (!strlen(fUrl.GetHost()))
424  else
426 
427  // Server type
 // A leading "std" or "lite" in the URL options is consumed here and removed
 // from the options string.
428  if (strlen(fUrl.GetOptions()) > 0) {
429  TString opts(fUrl.GetOptions());
430  if (!(strncmp(fUrl.GetOptions(),"std",3))) {
432  opts.Remove(0,3);
433  fUrl.SetOptions(opts.Data());
434  } else if (!(strncmp(fUrl.GetOptions(),"lite",4))) {
436  opts.Remove(0,4);
437  fUrl.SetOptions(opts.Data());
438  }
439  }
440 
441  // Instance type
445  if (fMaster == "__master__") {
446  fMasterServ = kTRUE;
449  } else if (fMaster == "prooflite") {
450  // Client and master are merged
451  fMasterServ = kTRUE;
453  }
454  // Flag that we are a client
456  if (!gSystem->Getenv("ROOTPROOFCLIENT")) gSystem->Setenv("ROOTPROOFCLIENT","");
457 
458  Init(masterurl, conffile, confdir, loglevel, alias);
459 
460  // If the user was not set, get it from the master
 // The master runs gProofServ->GetUser(); the name is taken from fMacroLog as
 // the text between the first and the last double quote of the line that
 // contains "const char". On failure the local logon name is used instead.
461  if (strlen(fUrl.GetUser()) <= 0) {
462  TString usr, emsg;
463  if (Exec("gProofServ->GetUser()", "0", kTRUE) == 0) {
464  TObjString *os = fMacroLog.GetLineWith("const char");
465  if (os) {
 // NOTE(review): if the line holds no '"', First()/Last() return kNPOS and
 // the substring bounds below are meaningless - confirm the format is guaranteed.
466  Ssiz_t fst = os->GetString().First('\"');
467  Ssiz_t lst = os->GetString().Last('\"');
468  usr = os->GetString()(fst+1, lst-fst-1);
469  } else {
470  emsg = "could not find 'const char *' string in macro log";
471  }
472  } else {
473  emsg = "could not retrieve user info";
474  }
475  if (!emsg.IsNull()) {
476  // Get user logon name
478  if (pw) {
479  usr = pw->fUser;
480  delete pw;
481  }
482  Warning("TProof", "%s: using local default %s", emsg.Data(), usr.Data());
483  }
484  // Set the user name in the main URL
485  fUrl.SetUser(usr.Data());
486  }
487 
488  // If called by a manager, make sure it stays in last position
489  // for cleaning
 // (removing and re-adding moves 'mgr' to the end of the list of sockets)
490  if (mgr) {
492  gROOT->GetListOfSockets()->Remove(mgr);
493  gROOT->GetListOfSockets()->Add(mgr);
494  }
495 
496  // Old-style server type: we add this to the list and set the global pointer
498  if (!gROOT->GetListOfProofs()->FindObject(this))
499  gROOT->GetListOfProofs()->Add(this);
500 
501  // Still needed by the packetizers: needs to be changed
502  gProof = this;
503 }
504 
505 ////////////////////////////////////////////////////////////////////////////////
506 /// Protected constructor to be used by classes deriving from TProof
507 /// (they have to call Init themselves and override StartSlaves
508 /// appropriately).
509 ///
510 /// This constructor registers this instance in the global list of PROOF
511 /// sessions (if not already there) and sets gProof to this instance.
512 
513 TProof::TProof() : fUrl(""), fServType(TProofMgr::kXProofd)
514 {
515  // Default initializations
516  InitMembers();
517 
518  if (!gROOT->GetListOfProofs()->FindObject(this))
519  gROOT->GetListOfProofs()->Add(this);
520 
521  gProof = this;
522 }
523 
524 ////////////////////////////////////////////////////////////////////////////////
525 /// Default initializations
526 
528 {
 // Reset data members to safe defaults (null pointers, zero counters,
 // kFALSE flags). Both constructors call this first, before anything else.
529  fValid = kFALSE;
530  fTty = kFALSE;
531  fRecvMessages = 0;
532  fSlaveInfo = 0;
536  fLastPollWorkers_s = -1;
537  fActiveSlaves = 0;
538  fInactiveSlaves = 0;
539  fUniqueSlaves = 0;
540  fAllUniqueSlaves = 0;
541  fNonUniqueMasters = 0;
542  fActiveMonitor = 0;
543  fUniqueMonitor = 0;
544  fAllUniqueMonitor = 0;
545  fCurrentMonitor = 0;
546  fBytesRead = 0;
547  fRealTime = 0;
548  fCpuTime = 0;
549  fIntHandler = 0;
550  fProgressDialog = 0;
553  fPlayer = 0;
554  fFeedback = 0;
555  fChains = 0;
556  fDSet = 0;
557  fNotIdle = 0;
558  fSync = kTRUE;
560  fIsWaiting = kFALSE;
561  fRedirLog = kFALSE;
562  fLogFileW = 0;
563  fLogFileR = 0;
566  fMacroLog.SetName("ProofLogMacro");
567 
568  fWaitingSlaves = 0;
569  fQueries = 0;
570  fOtherQueries = 0;
571  fDrawQueries = 0;
572  fMaxDrawQueries = 1;
573  fSeqNum = 0;
574 
575  fSessionID = -1;
576  fEndMaster = kFALSE;
577 
578  fPackMgr = 0;
580 
581  fInputData = 0;
582 
583  fPrintProgress = 0;
584 
585  fLoadedMacros = 0;
586 
587  fProtocol = -1;
588  fSlaves = 0;
590  fBadSlaves = 0;
591  fAllMonitor = 0;
592  fDataReady = kFALSE;
593  fBytesReady = 0;
594  fTotalBytes = 0;
595  fAvailablePackages = 0;
596  fEnabledPackages = 0;
597  fRunningDSets = 0;
598 
599  fCollectTimeout = -1;
600 
601  fManager = 0;
602  fQueryMode = kSync;
604 
607  fMergers = 0;
608  fMergersCount = -1;
610  fWorkersToMerge = 0;
612 
613  fPerfTree = "";
614 
615  fWrksOutputReady = 0;
616 
617  fSelector = 0;
618 
619  fPrepTime = 0.;
620 
621  // Check if the user defined a list of environment variables to send over:
622  // include them into the dedicated list
 // PROOF_ENVVARS is a comma-separated list of names. Each name defined locally
 // is (re)registered with its current local value via DelEnvVar()+AddEnvVar().
 // Undefined names only produce a warning.
 // NOTE(review): the messages below use "Init" as location although this is
 // InitMembers(), and "environemnt" is misspelled; both are runtime strings
 // and are left unchanged here.
623  if (gSystem->Getenv("PROOF_ENVVARS")) {
624  TString envs(gSystem->Getenv("PROOF_ENVVARS")), env, envsfound;
625  Int_t from = 0;
626  while (envs.Tokenize(env, from, ",")) {
627  if (!env.IsNull()) {
628  if (!gSystem->Getenv(env)) {
629  Warning("Init", "request for sending over undefined environemnt variable '%s' - ignoring", env.Data());
630  } else {
631  if (!envsfound.IsNull()) envsfound += ",";
632  envsfound += env;
633  TProof::DelEnvVar(env);
634  TProof::AddEnvVar(env, gSystem->Getenv(env));
635  }
636  }
637  }
638  if (envsfound.IsNull()) {
639  Warning("Init", "none of the requested env variables were found: '%s'", envs.Data());
640  } else {
641  Info("Init", "the following environment variables have been added to the list to be sent to the nodes: '%s'", envsfound.Data());
642  }
643  }
644 
645  // Done
646  return;
647 }
648 
649 ////////////////////////////////////////////////////////////////////////////////
650 /// Clean up PROOF environment.
651 
653 {
 // Tear down the session. In order: detach and remove the chains, remove
 // package symlinks on the client, Close() the session, free the owned lists,
 // close the redirected-log files, unregister from the global list and from
 // the manager, re-point gProof if it was this instance, and finally emit the
 // "~TProof()" and "CloseWindow()" signals.
654  if (fChains) {
 // NOTE(review): this loop terminates only if RemoveChain() removes the chain
 // from fChains - confirm.
655  while (TChain *chain = dynamic_cast<TChain*> (fChains->First()) ) {
656  // remove "chain" from list
657  chain->SetProof(0);
658  RemoveChain(chain);
659  }
660  }
661 
662  // remove links to packages enabled on the client
663  if (TestBit(TProof::kIsClient)) {
664  // iterate over all packages
 // NOTE(review): fPackMgr is dereferenced without a null check. InitMembers()
 // sets it to 0, and Init() can return early (sandbox/package-dir failures)
 // before creating it, so a client session whose Init() failed would crash
 // here - a guard on fPackMgr looks needed.
665  TList *epl = fPackMgr->GetListOfEnabled();
666  if (epl) {
667  TIter nxp(epl);
668  while (TObjString *pck = (TObjString *)(nxp())) {
669  FileStat_t stat;
670  if (gSystem->GetPathInfo(pck->String(), stat) == 0) {
671  // check if symlink, if so unlink
672  // NOTE: GetPathInfo() returns 1 in case of symlink that does not point to
673  // existing file or to a directory, but if fIsLink is true the symlink exists
674  if (stat.fIsLink)
675  gSystem->Unlink(pck->String());
676  }
677  }
 // The returned list and its elements are deleted here, i.e. they are
 // presumably owned by the caller of GetListOfEnabled() - confirm.
678  epl->Delete();
679  delete epl;
680  }
681  }
682 
683  Close();
710  if (fWrksOutputReady) {
712  delete fWrksOutputReady;
713  }
714  if (fQueries) {
715  fQueries->Delete();
716  delete fQueries;
717  }
718 
719  // remove file with redirected logs
720  if (TestBit(TProof::kIsClient)) {
721  if (fLogFileR)
722  fclose(fLogFileR);
723  if (fLogFileW)
724  fclose(fLogFileW);
725  if (fLogFileName.Length() > 0)
727  }
728 
729  // Remove for the global list
730  gROOT->GetListOfProofs()->Remove(this);
731  // ... and from the manager list
732  if (fManager && fManager->IsValid())
733  fManager->DiscardSession(this);
734 
 // Walk the remaining sessions from the end; if the loop finishes without a
 // break, gProof ends up null.
735  if (gProof && gProof == this) {
736  // Set previous as default
737  TIter pvp(gROOT->GetListOfProofs(), kIterBackward);
738  while ((gProof = (TProof *)pvp())) {
740  break;
741  }
742  }
743 
744  // For those interested in our destruction ...
745  Emit("~TProof()");
746  Emit("CloseWindow()");
747 }
748 
749 ////////////////////////////////////////////////////////////////////////////////
750 /// Start the PROOF environment. Starting PROOF involves either connecting
751 /// to a master server, which in turn will start a set of slave servers, or
752 /// directly starting as master server (if master = ""). For a description
753 /// of the arguments see the TProof ctor. Returns the number of started
754 /// master or slave servers, returns 0 in case of error, in which case
755 /// fValid remains false.
756 
757 Int_t TProof::Init(const char *, const char *conffile,
758  const char *confdir, Int_t loglevel, const char *alias)
759 {
 // Note: the first argument (master URL) is unnamed and unused here; the URL
 // has already been parsed into fUrl by the caller.
761 
762  fValid = kFALSE;
763 
764  // Connected to terminal?
 // Interactive only if both stdin (fd 0) and stdout (fd 1) are terminals
765  fTty = (isatty(0) == 0 || isatty(1) == 0) ? kFALSE : kTRUE;
766 
767  // If in attach mode, options is filled with additional info
768  Bool_t attach = kFALSE;
769  if (strlen(fUrl.GetOptions()) > 0) {
770  attach = kTRUE;
771  // A flag from the GUI
772  TString opts = fUrl.GetOptions();
773  if (opts.Contains("GUI")) {
 // Remove(pos) with a single argument drops everything from "GUI" to the end
775  opts.Remove(opts.Index("GUI"));
776  fUrl.SetOptions(opts);
777  }
778  }
779 
780  if (TestBit(TProof::kIsMaster)) {
781  // Fill default conf file and conf dir
782  if (!conffile || !conffile[0])
784  if (!confdir || !confdir[0])
786  // The group; the client receives it in the kPROOF_SESSIONTAG message
788  } else {
789  fConfDir = confdir;
790  fConfFile = conffile;
791  }
792 
793  // Analysise the conffile field
794  if (fConfFile.Contains("workers=0")) fConfFile.ReplaceAll("workers=0", "masteronly");
796 
798  fLogLevel = loglevel;
801  fImage = fMasterServ ? "" : "<local>";
802  fIntHandler = 0;
803  fStatus = 0;
804  fRecvMessages = new TList;
806  fSlaveInfo = 0;
807  fChains = new TList;
808  fAvailablePackages = 0;
809  fEnabledPackages = 0;
810  fRunningDSets = 0;
812  fInputData = 0;
814  fPrintProgress = 0;
815 
816  // Timeout for some collect actions
817  fCollectTimeout = gEnv->GetValue("Proof.CollectTimeout", -1);
818 
819  // Should the workers be started dynamically; default: no
820  fDynamicStartup = gEnv->GetValue("Proof.DynamicStartup", kFALSE);
821 
822  // Default entry point for the data pool is the master
824  fDataPoolUrl.Form("root://%s", fMaster.Data());
825  else
826  fDataPoolUrl = "";
827 
828  fProgressDialog = 0;
830 
831  // Default alias is the master name
832  TString al = (alias) ? alias : fMaster.Data();
833  SetAlias(al);
834 
835  // Client logging of messages from the master and slaves
 // The same temporary file (named after the process id) is opened twice:
 // one stream for writing and a separate one for reading back.
836  fRedirLog = kFALSE;
837  if (TestBit(TProof::kIsClient)) {
838  fLogFileName.Form("%s/ProofLog_%d", gSystem->TempDirectory(), gSystem->GetPid());
839  if ((fLogFileW = fopen(fLogFileName, "w")) == 0)
840  Error("Init", "could not create temporary logfile");
841  if ((fLogFileR = fopen(fLogFileName, "r")) == 0)
842  Error("Init", "could not open temp logfile for reading");
843  }
845 
846  // Status of cluster
847  fNotIdle = 0;
848  // Query type
849  fSync = (attach) ? kFALSE : kTRUE;
850  // Not enqueued
851  fIsWaiting = kFALSE;
852 
853  // Counters
854  fBytesRead = 0;
855  fRealTime = 0;
856  fCpuTime = 0;
857 
858  // List of queries
859  fQueries = 0;
860  fOtherQueries = 0;
861  fDrawQueries = 0;
862  fMaxDrawQueries = 1;
863  fSeqNum = 0;
864 
865  // Remote ID of the session
866  fSessionID = -1;
867 
868  // Part of active query
869  fWaitingSlaves = 0;
870 
871  // Make remote PROOF player
872  fPlayer = 0;
873  MakePlayer();
874 
875  fFeedback = new TList;
876  fFeedback->SetOwner();
877  fFeedback->SetName("FeedbackList");
879 
880  // sort slaves by descending performance index
882  fActiveSlaves = new TList;
883  fInactiveSlaves = new TList;
884  fUniqueSlaves = new TList;
885  fAllUniqueSlaves = new TList;
886  fNonUniqueMasters = new TList;
887  fBadSlaves = new TList;
888  fAllMonitor = new TMonitor;
889  fActiveMonitor = new TMonitor;
890  fUniqueMonitor = new TMonitor;
892  fCurrentMonitor = 0;
893 
896 
897  fLoadedMacros = 0;
898  fPackMgr = 0;
899 
900  // Enable optimized sending of streamer infos to use embedded backward/forward
901  // compatibility support between different ROOT versions and different versions of
902  // users classes
903  Bool_t enableSchemaEvolution = gEnv->GetValue("Proof.SchemaEvolution",1);
904  if (enableSchemaEvolution) {
906  } else {
907  Info("TProof", "automatic schema evolution in TMessage explicitly disabled");
908  }
909 
910  if (IsMaster()) {
911  // to make UploadPackage() method work on the master as well.
913  } else {
914 
 // Client: make sure the sandbox and the package directory exist, then
 // create the package manager on the package directory.
 // NOTE(review): both early 'return 0' paths below leave fPackMgr at 0 (set
 // above), and ~TProof() dereferences fPackMgr for clients without a null
 // check - confirm this failure path is safe.
915  TString sandbox;
916  if (GetSandbox(sandbox, kTRUE) != 0) {
917  Error("Init", "failure asserting sandbox directory %s", sandbox.Data());
918  return 0;
919  }
920 
921  // Package Dir
922  TString packdir = gEnv->GetValue("Proof.PackageDir", "");
923  if (packdir.IsNull())
924  packdir.Form("%s/%s", sandbox.Data(), kPROOF_PackDir);
925  if (AssertPath(packdir, kTRUE) != 0) {
926  Error("Init", "failure asserting directory %s", packdir.Data());
927  return 0;
928  }
929  fPackMgr = new TPackMgr(packdir);
930  if (gDebug > 0)
931  Info("Init", "package directory set to %s", packdir.Data());
932  }
933 
934  if (!IsMaster()) {
935  // List of directories where to look for global packages
936  TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
937  TProofServ::ResolveKeywords(globpack);
938  Int_t nglb = TPackMgr::RegisterGlobalPath(globpack);
939  if (gDebug > 0)
940  Info("Init", " %d global package directories registered", nglb);
941  }
942 
943  // Master may want dynamic startup
944  if (fDynamicStartup) {
945  if (!IsMaster()) {
946  // If on client - start the master
947  if (!StartSlaves(attach))
948  return 0;
949  }
950  } else {
951 
952  // Master Only mode (for operations requiring only the master, e.g. dataset browsing,
953  // result retrieving, ...)
954  Bool_t masterOnly = gEnv->GetValue("Proof.MasterOnly", kFALSE);
955  if (!IsMaster() || !masterOnly) {
956  // Start slaves (the old, static, per-session way)
957  if (!StartSlaves(attach))
958  return 0;
959  // Client: Is Master in dynamic startup mode?
960  if (!IsMaster()) {
961  Int_t dyn = 0;
962  GetRC("Proof.DynamicStartup", dyn);
963  if (dyn != 0) fDynamicStartup = kTRUE;
964  }
965  }
966  }
967  // we are now properly initialized
968  fValid = kTRUE;
969 
970  // De-activate monitor (will be activated in Collect)
972 
973  // By default go into parallel mode
 // Protocols > 35 get -1, older ones the explicit large count 9999 (presumably
 // both mean "all workers" - confirm). A purely numeric PROOF_NWORKERS entry
 // in the env-var list overrides this value.
974  Int_t nwrk = GetRemoteProtocol() > 35 ? -1 : 9999;
975  TNamed *n = 0;
976  if (TProof::GetEnvVars() &&
977  (n = (TNamed *) TProof::GetEnvVars()->FindObject("PROOF_NWORKERS"))) {
978  TString s(n->GetTitle());
979  if (s.IsDigit()) nwrk = s.Atoi();
980  }
981  GoParallel(nwrk, attach);
982 
983  // Send relevant initial state to slaves
984  if (!attach)
986  else if (!IsIdle())
987  // redirect log
988  fRedirLog = kTRUE;
989 
990  // Done at this point, the alias will be communicated to the coordinator, if any
992  SetAlias(al);
993 
994  SetActive(kFALSE);
995 
996  if (IsValid()) {
997 
998  // Activate input handler
1000 
1002  gROOT->GetListOfSockets()->Add(this);
1003  }
1004 
1005  AskParallel();
1006 
 // Number of active workers (or masters) after initialization
1007  return fActiveSlaves->GetSize();
1008 }
1009 
1010 ////////////////////////////////////////////////////////////////////////////////
1011 /// Set the sandbox path from the alternative resource 'rc' (if given), otherwise from 'Proof.Sandbox'.
1012 /// Use the existing setting or the default if nothing is found.
1013 /// If 'assert' is kTRUE, make also sure that the path exists.
1014 /// Return 0 on success, -1 on failure
1015 
1016 Int_t TProof::GetSandbox(TString &sb, Bool_t assert, const char *rc)
1017 {
1018  // Get it from 'rc', if defined
1019  if (rc && strlen(rc)) sb = gEnv->GetValue(rc, sb);
1020  // Or use the default 'rc'
1021  if (sb.IsNull()) sb = gEnv->GetValue("Proof.Sandbox", "");
1022  // If nothing found , use the default
1023  if (sb.IsNull()) sb.Form("~/%s", kPROOF_WorkDir);
1024  // Expand special settings
1025  if (sb == ".") {
1026  sb = gSystem->pwd();
1027  } else if (sb == "..") {
1028  sb = gSystem->GetDirName(gSystem->pwd());
1029  }
1030  gSystem->ExpandPathName(sb);
1031 
1032  // Assert the path, if required
1033  if (assert && AssertPath(sb, kTRUE) != 0) return -1;
1034  // Done
1035  return 0;
1036 }
1037 
1038 ////////////////////////////////////////////////////////////////////////////////
1039 /// The config file field may contain special instructions which need to be
1040 /// parsed at the beginning, e.g. for debug runs with valgrind.
1041 /// Several options can be given separated by a ','
1042 
1043 void TProof::ParseConfigField(const char *config)
1044 {
1045  TString sconf(config), opt;
1046  Ssiz_t from = 0;
1047  Bool_t cpuPin = kFALSE;
1048 
1049  // Analysise the field
1050  const char *cq = (IsLite()) ? "\"" : "";
1051  while (sconf.Tokenize(opt, from, ",")) {
1052  if (opt.IsNull()) continue;
1053 
1054  if (opt.BeginsWith("valgrind")) {
1055  // Any existing valgrind setting? User can give full settings, which we fully respect,
1056  // or pass additional options for valgrind by prefixing 'valgrind_opts:'. For example,
1057  // TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", "valgrind_opts:--time-stamp --leak-check=full"
1058  // will add option "--time-stamp --leak-check=full" to our default options
1059  TString mst, top, sub, wrk, all;
1060  TList *envs = fgProofEnvList;
1061  TNamed *n = 0;
1062  if (envs) {
1063  if ((n = (TNamed *) envs->FindObject("PROOF_WRAPPERCMD")))
1064  all = n->GetTitle();
1065  if ((n = (TNamed *) envs->FindObject("PROOF_MASTER_WRAPPERCMD")))
1066  mst = n->GetTitle();
1067  if ((n = (TNamed *) envs->FindObject("PROOF_TOPMASTER_WRAPPERCMD")))
1068  top = n->GetTitle();
1069  if ((n = (TNamed *) envs->FindObject("PROOF_SUBMASTER_WRAPPERCMD")))
1070  sub = n->GetTitle();
1071  if ((n = (TNamed *) envs->FindObject("PROOF_SLAVE_WRAPPERCMD")))
1072  wrk = n->GetTitle();
1073  }
1074  if (all != "" && mst == "") mst = all;
1075  if (all != "" && top == "") top = all;
1076  if (all != "" && sub == "") sub = all;
1077  if (all != "" && wrk == "") wrk = all;
1078  if (all != "" && all.BeginsWith("valgrind_opts:")) {
1079  // The field is used to add an option Reset the setting
1080  Info("ParseConfigField","valgrind run: resetting 'PROOF_WRAPPERCMD':"
1081  " must be set again for next run , if any");
1082  TProof::DelEnvVar("PROOF_WRAPPERCMD");
1083  }
1084  TString var, cmd;
1085  cmd.Form("%svalgrind -v --suppressions=<rootsys>/etc/valgrind-root.supp", cq);
1086  TString mstlab("NO"), wrklab("NO");
1087  Bool_t doMaster = (opt == "valgrind" || (opt.Contains("master") &&
1088  !opt.Contains("topmaster") && !opt.Contains("submaster")))
1089  ? kTRUE : kFALSE;
1090  if (doMaster) {
1091  if (!IsLite()) {
1092  // Check if we have to add a var
1093  if (mst == "" || mst.BeginsWith("valgrind_opts:")) {
1094  mst.ReplaceAll("valgrind_opts:","");
1095  var.Form("%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), mst.Data());
1096  TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", var);
1097  mstlab = "YES";
1098  } else if (mst != "") {
1099  mstlab = "YES";
1100  }
1101  } else {
1102  if (opt.Contains("master")) {
1103  Warning("ParseConfigField",
1104  "master valgrinding does not make sense for PROOF-Lite: ignoring");
1105  opt.ReplaceAll("master", "");
1106  if (!opt.Contains("workers")) return;
1107  }
1108  if (opt == "valgrind" || opt == "valgrind=") opt = "valgrind=workers";
1109  }
1110  }
1111  if (opt.Contains("topmaster")) {
1112  // Check if we have to add a var
1113  if (top == "" || top.BeginsWith("valgrind_opts:")) {
1114  top.ReplaceAll("valgrind_opts:","");
1115  var.Form("%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), top.Data());
1116  TProof::AddEnvVar("PROOF_TOPMASTER_WRAPPERCMD", var);
1117  mstlab = "YES";
1118  } else if (top != "") {
1119  mstlab = "YES";
1120  }
1121  }
1122  if (opt.Contains("submaster")) {
1123  // Check if we have to add a var
1124  if (sub == "" || sub.BeginsWith("valgrind_opts:")) {
1125  sub.ReplaceAll("valgrind_opts:","");
1126  var.Form("%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), sub.Data());
1127  TProof::AddEnvVar("PROOF_SUBMASTER_WRAPPERCMD", var);
1128  mstlab = "YES";
1129  } else if (sub != "") {
1130  mstlab = "YES";
1131  }
1132  }
1133  if (opt.Contains("=workers") || opt.Contains("+workers")) {
1134  // Check if we have to add a var
1135  if (wrk == "" || wrk.BeginsWith("valgrind_opts:")) {
1136  wrk.ReplaceAll("valgrind_opts:","");
1137  var.Form("%s --log-file=<logfilewrk>.__valgrind__.log %s%s", cmd.Data(), wrk.Data(), cq);
1138  TProof::AddEnvVar("PROOF_SLAVE_WRAPPERCMD", var);
1139  TString nwrks("2");
1140  Int_t inw = opt.Index('#');
1141  if (inw != kNPOS) {
1142  nwrks = opt(inw+1, opt.Length());
1143  if (!nwrks.IsDigit()) nwrks = "2";
1144  }
1145  // Set the relevant variables
1146  if (!IsLite()) {
1147  TProof::AddEnvVar("PROOF_NWORKERS", nwrks);
1148  } else {
1149  gEnv->SetValue("ProofLite.Workers", nwrks.Atoi());
1150  }
1151  wrklab = nwrks;
1152  // Register the additional worker log in the session file
1153  // (for the master this is done automatically)
1154  TProof::AddEnvVar("PROOF_ADDITIONALLOG", "__valgrind__.log*");
1155  } else if (wrk != "") {
1156  wrklab = "ALL";
1157  }
1158  }
1159  // Increase the relevant timeouts
1160  if (!IsLite()) {
1161  TProof::AddEnvVar("PROOF_INTWAIT", "5000");
1162  gEnv->SetValue("Proof.SocketActivityTimeout", 6000);
1163  } else {
1164  gEnv->SetValue("ProofLite.StartupTimeOut", 5000);
1165  }
1166  // Warn for slowness
1167  Printf(" ");
1168  if (!IsLite()) {
1169  Printf(" ---> Starting a debug run with valgrind (master:%s, workers:%s)", mstlab.Data(), wrklab.Data());
1170  } else {
1171  Printf(" ---> Starting a debug run with valgrind (workers:%s)", wrklab.Data());
1172  }
1173  Printf(" ---> Please be patient: startup may be VERY slow ...");
1174  Printf(" ---> Logs will be available as special tags in the log window (from the progress dialog or TProof::LogViewer()) ");
1175  Printf(" ---> (Reminder: this debug run makes sense only if you are running a debug version of ROOT)");
1176  Printf(" ");
1177 
1178  } else if (opt.BeginsWith("igprof-pp")) {
1179 
1180  // IgProf profiling on master and worker. PROOF does not set the
1181  // environment for you: proper environment variables (like PATH and
1182  // LD_LIBRARY_PATH) should be set externally
1183 
1184  Printf("*** Requested IgProf performance profiling ***");
1185  TString addLogExt = "__igprof.pp__.log";
1186  TString addLogFmt = "igprof -pk -pp -t proofserv.exe -o %s.%s";
1187  TString tmp;
1188 
1189  if (IsLite()) {
1190  addLogFmt.Append("\"");
1191  addLogFmt.Prepend("\"");
1192  }
1193 
1194  tmp.Form(addLogFmt.Data(), "<logfilemst>", addLogExt.Data());
1195  TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", tmp.Data());
1196 
1197  tmp.Form(addLogFmt.Data(), "<logfilewrk>", addLogExt.Data());
1198  TProof::AddEnvVar("PROOF_SLAVE_WRAPPERCMD", tmp.Data() );
1199 
1200  TProof::AddEnvVar("PROOF_ADDITIONALLOG", addLogExt.Data());
1201 
1202  } else if (opt.BeginsWith("cpupin=")) {
1203  // Enable CPU pinning. Takes as argument the list of processor IDs
1204  // that will be used in order. Processor IDs are numbered from 0,
1205  // use likwid to see how they are organized. A possible parameter
1206  // format would be:
1207  //
1208  // cpupin=3+4+0+9+10+22+7
1209  //
1210  // Only the specified processor IDs will be used in a round-robin
1211  // fashion, dealing with the fact that you can request more workers
1212  // than the number of processor IDs you have specified.
1213  //
1214  // To use all available processors in their order:
1215  //
1216  // cpupin=*
1217 
1218  opt.Remove(0, 7);
1219 
1220  // Remove any char which is neither a number nor a plus '+'
1221  for (Ssiz_t i=0; i<opt.Length(); i++) {
1222  Char_t c = opt[i];
1223  if ((c != '+') && ((c < '0') || (c > '9')))
1224  opt[i] = '_';
1225  }
1226  opt.ReplaceAll("_", "");
1227  TProof::AddEnvVar("PROOF_SLAVE_CPUPIN_ORDER", opt);
1228  cpuPin = kTRUE;
1229  } else if (opt.BeginsWith("workers=")) {
1230 
1231  // Request for a given number of workers (within the max) or worker
1232  // startup combination:
1233  // workers=5 start max 5 workers (or less, if less are assigned)
1234  // workers=2x start max 2 workers per node (or less, if less are assigned)
1235  opt.ReplaceAll("workers=","");
1236  TProof::AddEnvVar("PROOF_NWORKERS", opt);
1237  }
1238  }
1239 
1240  // In case of PROOF-Lite, enable CPU pinning when requested (Linux only)
1241  #ifdef R__LINUX
1242  if (IsLite() && cpuPin) {
1243  Printf("*** Requested CPU pinning ***");
1244  const TList *ev = GetEnvVars();
1245  const char *pinCmd = "taskset -c <cpupin>";
1246  TString val;
1247  TNamed *p;
1248  if (ev && (p = dynamic_cast<TNamed *>(ev->FindObject("PROOF_SLAVE_WRAPPERCMD")))) {
1249  val = p->GetTitle();
1250  val.Insert(val.Length()-1, " ");
1251  val.Insert(val.Length()-1, pinCmd);
1252  }
1253  else {
1254  val.Form("\"%s\"", pinCmd);
1255  }
1256  TProof::AddEnvVar("PROOF_SLAVE_WRAPPERCMD", val.Data());
1257  }
1258  #endif
1259 }
1260 
1261 ////////////////////////////////////////////////////////////////////////////////
1262 /// Make sure that 'path' exists; if 'writable' is kTRUE, make also sure
1263 /// that the path is writable
1264 
1265 Int_t TProof::AssertPath(const char *inpath, Bool_t writable)
1266 {
1267  if (!inpath || strlen(inpath) <= 0) {
1268  Error("AssertPath", "undefined input path");
1269  return -1;
1270  }
1271 
1272  TString path(inpath);
1273  gSystem->ExpandPathName(path);
1274 
1275  if (gSystem->AccessPathName(path, kFileExists)) {
1276  if (gSystem->mkdir(path, kTRUE) != 0) {
1277  Error("AssertPath", "could not create path %s", path.Data());
1278  return -1;
1279  }
1280  }
1281  // It must be writable
1282  if (gSystem->AccessPathName(path, kWritePermission) && writable) {
1283  if (gSystem->Chmod(path, 0666) != 0) {
1284  Error("AssertPath", "could not make path %s writable", path.Data());
1285  return -1;
1286  }
1287  }
1288 
1289  // Done
1290  return 0;
1291 }
1292 
1293 ////////////////////////////////////////////////////////////////////////////////
1294 /// Set manager and schedule its destruction after this for clean
1295 /// operations.
/// Stores 'mgr' in fManager. A non-null manager is also moved to the end of
/// gROOT's list of sockets; presumably this ordering is what schedules its
/// destruction after this session (per the comment above). A null 'mgr'
/// only resets fManager.
1296 
1298 {
1299  fManager = mgr;
1300 
1301  if (mgr) {
      // NOTE(review): the embedded numbering skips 1302 here; a line of the
      // original file is not visible in this view.
      // Remove-then-Add moves 'mgr' to the end of the list (adding it if absent)
1303  gROOT->GetListOfSockets()->Remove(mgr);
1304  gROOT->GetListOfSockets()->Add(mgr);
1305  }
1306 }
1307 
1308 ////////////////////////////////////////////////////////////////////////////////
1309 /// Works on the master node only.
1310 /// It starts workers on the machines in workerList and sets the paths,
1311 /// packages and macros as on the master.
1312 /// It is a substitute for StartSlaves(...)
1313 /// The code is mostly the master part of StartSlaves,
1314 /// with the parallel startup removed.
///
/// Every entry of workerList is treated as a TProofNodeInfo. Each new TSlave
/// is appended to fSlaves; invalid ones also go to fBadSlaves. Progress
/// messages are sent on gProofServ's socket.
/// Ownership: on the normal path workerList is deleted here (SafeDelete);
/// it is NOT deleted on the early error returns.
/// Returns 0 on success, -1 if not called on a master, -2 if workerList is
/// null or empty (or if the internal list of added workers is null).
1315 
1317 {
1318  if (!IsMaster()) {
1319  Error("AddWorkers", "AddWorkers can only be called on the master!");
1320  return -1;
1321  }
1322 
1323  if (!workerList || !(workerList->GetSize())) {
1324  Error("AddWorkers", "empty list of workers!");
1325  return -2;
1326  }
1327 
1328  // Code taken from master part of StartSlaves with the parallel part removed
1329 
1330  fImage = gProofServ->GetImage();
1331  if (fImage.IsNull())
      // NOTE(review): the embedded numbering skips 1332: the statement guarded
      // by this 'if' is not visible in this view.
1333 
1334  // Get all workers
1335  UInt_t nSlaves = workerList->GetSize();
1336  UInt_t nSlavesDone = 0;
      // Ordinal suffix used for node entries that carry no ordinal of their own
1337  Int_t ord = 0;
1338 
1339  // Loop over all new workers and start them (if we had already workers it means we are
1340  // increasing parallelism or that is not the first time we are called)
1341  Bool_t goMoreParallel = (fSlaves->GetEntries() > 0) ? kTRUE : kFALSE;
1342 
1343  // A list of TSlave objects for workers that are being added
1344  TList *addedWorkers = new TList();
1345  if (!addedWorkers) {
1346  // This is needed to silence Coverity ...
1347  Error("AddWorkers", "cannot create new list for the workers to be added");
1348  return -2;
1349  }
1350  addedWorkers->SetOwner(kFALSE);
1351  TListIter next(workerList);
1352  TObject *to;
1353  TProofNodeInfo *worker;
      // Lookup key only: its ordinal is set per worker to remove the matching
      // entry (presumably matched by ordinal) from fTerminatedSlaveInfos
1354  TSlaveInfo *dummysi = new TSlaveInfo();
1355  while ((to = next())) {
1356  // Get the next worker from the list
1357  worker = (TProofNodeInfo *)to;
1358 
1359  // Read back worker node info
1360  const Char_t *image = worker->GetImage().Data();
1361  const Char_t *workdir = worker->GetWorkDir().Data();
1362  Int_t perfidx = worker->GetPerfIndex();
1363  Int_t sport = worker->GetPort();
1364  if (sport == -1)
1365  sport = fUrl.GetPort();
1366 
1367  // Create worker server
1368  TString fullord;
1369  if (worker->GetOrdinal().Length() > 0) {
1370  fullord.Form("%s.%s", gProofServ->GetOrdinal(), worker->GetOrdinal().Data());
1371  } else {
1372  fullord.Form("%s.%d", gProofServ->GetOrdinal(), ord);
1373  }
1374 
1375  // Remove worker from the list of workers terminated gracefully
1376  dummysi->SetOrdinal(fullord);
1377  TSlaveInfo *rmsi = (TSlaveInfo *)fTerminatedSlaveInfos->Remove(dummysi);
1378  SafeDelete(rmsi);
1379 
1380  // Create worker server
      // 'localhost' (or 'localhost.<domain>') is replaced by the real host name
1381  TString wn(worker->GetNodeName());
1382  if (wn == "localhost" || wn.BeginsWith("localhost.")) wn = gSystem->HostName();
1383  TUrl u(TString::Format("%s:%d", wn.Data(), sport));
1384  // Add group info in the password field, if any
1385  if (strlen(gProofServ->GetGroup()) > 0) {
1386  // Set also the user, otherwise the password is not exported
1387  if (strlen(u.GetUser()) <= 0)
1388  u.SetUser(gProofServ->GetUser());
      // NOTE(review): the embedded numbering skips 1389 here; that statement
      // is not visible in this view.
1390  }
1391  TSlave *slave = 0;
1392  if (worker->IsWorker()) {
1393  slave = CreateSlave(u.GetUrl(), fullord, perfidx, image, workdir);
1394  } else {
1395  slave = CreateSubmaster(u.GetUrl(), fullord,
1396  image, worker->GetMsd(), worker->GetNWrks());
1397  }
1398 
1399  // Add to global list (we will add to the monitor list after
1400  // finalizing the server startup)
1401  Bool_t slaveOk = kTRUE;
      // NOTE(review): 'slave' is dereferenced without a null check, i.e. this
      // relies on CreateSlave()/CreateSubmaster() never returning null - confirm.
1402  fSlaves->Add(slave);
1403  if (slave->IsValid()) {
1404  addedWorkers->Add(slave);
1405  } else {
1406  slaveOk = kFALSE;
1407  fBadSlaves->Add(slave);
1408  Warning("AddWorkers", "worker '%s' is invalid", slave->GetOrdinal());
1409  }
1410 
1411  PDB(kGlobal,3)
1412  Info("AddWorkers", "worker on host %s created"
1413  " and added to list (ord: %s)", worker->GetName(), slave->GetOrdinal());
1414 
1415  // Notify opening of connection
1416  nSlavesDone++;
      // NOTE(review): the embedded numbering skips 1417: the declaration of the
      // message 'm' used below is not visible in this view.
1418  m << TString("Opening connections to workers") << nSlaves
1419  << nSlavesDone << slaveOk;
1420  gProofServ->GetSocket()->Send(m);
1421 
1422  ord++;
1423  } //end of the worker loop
1424  SafeDelete(dummysi);
1425 
1426  // Cleanup
      // The caller's list is deleted here (normal path only)
1427  SafeDelete(workerList);
1428 
      // Restart the progress counter for the finalization phase
1429  nSlavesDone = 0;
1430 
1431  // Here we finalize the server startup: in this way the bulk
1432  // of remote operations are almost parallelized
1433  TIter nxsl(addedWorkers);
1434  TSlave *sl = 0;
1435  while ((sl = (TSlave *) nxsl())) {
1436 
1437  // Finalize setup of the server
1438  if (sl->IsValid())
1439  sl->SetupServ(TSlave::kSlave, 0);
1440 
1441  // Monitor good slaves
1442  Bool_t slaveOk = kTRUE;
1443  if (sl->IsValid()) {
1444  fAllMonitor->Add(sl->GetSocket());
1445  PDB(kGlobal,3)
1446  Info("AddWorkers", "worker on host %s finalized"
1447  " and added to list", sl->GetOrdinal());
1448  } else {
1449  slaveOk = kFALSE;
1450  fBadSlaves->Add(sl);
1451  }
1452 
1453  // Notify end of startup operations
1454  nSlavesDone++;
      // NOTE(review): the embedded numbering skips 1455: the declaration of the
      // message 'm' used below is not visible in this view.
1456  m << TString("Setting up worker servers") << nSlaves
1457  << nSlavesDone << slaveOk;
1458  gProofServ->GetSocket()->Send(m);
1459  }
1460 
1461  // Now set new state on the added workers (on all workers for simplicity)
1462  // use fEnabledPackages, fLoadedMacros,
1463  // gSystem->GetDynamicPath() and gSystem->GetIncludePath()
1464  // no need to load packages that are only loaded and not enabled (dyn mode)
      // Requested number of workers: -1 when GetRemoteProtocol() > 35, 9999
      // otherwise; overridden by the PROOF_NWORKERS env setting when its value
      // is purely numeric (IsDigit)
1465  Int_t nwrk = GetRemoteProtocol() > 35 ? -1 : 9999;
1466  TNamed *n = 0;
1467  if (TProof::GetEnvVars() &&
1468  (n = (TNamed *) TProof::GetEnvVars()->FindObject("PROOF_NWORKERS"))) {
1469  TString s(n->GetTitle());
1470  if (s.IsDigit()) nwrk = s.Atoi();
1471  }
1472 
1473  if (fDynamicStartup && goMoreParallel) {
1474 
1475  PDB(kGlobal, 3)
1476  Info("AddWorkers", "will invoke GoMoreParallel()");
1477  Int_t nw = GoMoreParallel(nwrk);
1478  PDB(kGlobal, 3)
1479  Info("AddWorkers", "GoMoreParallel()=%d", nw);
1480 
1481  }
1482  else {
1483  // Not in Dynamic Workers mode
1484  PDB(kGlobal, 3)
1485  Info("AddWorkers", "will invoke GoParallel()");
1486  GoParallel(nwrk, kFALSE, 0);
1487  }
1488 
1489  // Set worker processing environment
1490  SetupWorkersEnv(addedWorkers, goMoreParallel);
1491 
1492  // Update list of current workers
1493  PDB(kGlobal, 3)
1494  Info("AddWorkers", "will invoke SaveWorkerInfo()");
1495  SaveWorkerInfo();
1496 
1497  // Inform the client that the number of workers has changed
1498  if (fDynamicStartup && gProofServ) {
1499  PDB(kGlobal, 3)
1500  Info("AddWorkers", "will invoke SendParallel()");
      // NOTE(review): the embedded numbering skips 1501; per the message above
      // it presumably holds the SendParallel() call, not visible in this view.
1502 
1503  if (goMoreParallel && fPlayer) {
1504  // In case we are adding workers dynamically to an existing process, we
1505  // should invoke a special player's Process() to set only added workers
1506  // to the proper state
1507  PDB(kGlobal, 3)
1508  Info("AddWorkers", "will send the PROCESS message to selected workers");
1509  fPlayer->JoinProcess(addedWorkers);
1510  // Update merger counters (new workers are not yet active)
1511  fMergePrg.SetNWrks(fActiveSlaves->GetSize() + addedWorkers->GetSize());
1512  }
1513  }
1514 
1515  // Cleanup
      // Only the list is deleted: it was created with SetOwner(kFALSE)
1516  delete addedWorkers;
1517 
1518  return 0;
1519 }
1520 
1521 ////////////////////////////////////////////////////////////////////////////////
1522 /// Set up packages, loaded macros, include and lib paths ...
1523 
1524 void TProof::SetupWorkersEnv(TList *addedWorkers, Bool_t increasingWorkers)
1525 {
1526  TList *server_packs = gProofServ ? gProofServ->GetEnabledPackages() : nullptr;
1527  // Packages
1528  TList *packs = server_packs ? server_packs : GetEnabledPackages();
1529  if (packs && packs->GetSize() > 0) {
1530  TIter nxp(packs);
1531  TPair *pck = 0;
1532  while ((pck = (TPair *) nxp())) {
1533  // Upload and Enable methods are intelligent and avoid
1534  // re-uploading or re-enabling of a package to a node that has it.
1535  if (fDynamicStartup && increasingWorkers) {
1536  // Upload only on added workers
1537  PDB(kGlobal, 3)
1538  Info("SetupWorkersEnv", "will invoke UploadPackage() and EnablePackage() on added workers");
1539  if (UploadPackage(pck->GetName(), kUntar, addedWorkers) >= 0)
1540  EnablePackage(pck->GetName(), (TList *) pck->Value(), kTRUE, addedWorkers);
1541  } else {
1542  PDB(kGlobal, 3)
1543  Info("SetupWorkersEnv", "will invoke UploadPackage() and EnablePackage() on all workers");
1544  if (UploadPackage(pck->GetName()) >= 0)
1545  EnablePackage(pck->GetName(), (TList *) pck->Value(), kTRUE);
1546  }
1547  }
1548  }
1549 
1550  if (server_packs) {
1551  server_packs->Delete();
1552  delete server_packs;
1553  }
1554 
1555  // Loaded macros
1556  if (fLoadedMacros) {
1557  TIter nxp(fLoadedMacros);
1558  TObjString *os = 0;
1559  while ((os = (TObjString *) nxp())) {
1560  PDB(kGlobal, 3) {
1561  Info("SetupWorkersEnv", "will invoke Load() on selected workers");
1562  Printf("Loading a macro : %s", os->GetName());
1563  }
1564  Load(os->GetName(), kTRUE, kTRUE, addedWorkers);
1565  }
1566  }
1567 
1568  // Dynamic path
1569  TString dyn = gSystem->GetDynamicPath();
1570  dyn.ReplaceAll(":", " ");
1571  dyn.ReplaceAll("\"", " ");
1572  PDB(kGlobal, 3)
1573  Info("SetupWorkersEnv", "will invoke AddDynamicPath() on selected workers");
1574  AddDynamicPath(dyn, kFALSE, addedWorkers, kFALSE); // Do not Collect
1575 
1576  // Include path
1577  TString inc = gSystem->GetIncludePath();
1578  inc.ReplaceAll("-I", " ");
1579  inc.ReplaceAll("\"", " ");
1580  PDB(kGlobal, 3)
1581  Info("SetupWorkersEnv", "will invoke AddIncludePath() on selected workers");
1582  AddIncludePath(inc, kFALSE, addedWorkers, kFALSE); // Do not Collect
1583 
1584  // Done
1585  return;
1586 }
1587 
1588 ////////////////////////////////////////////////////////////////////////////////
1589 /// Used for shutting down the workers after a query is finished.
1590 /// Sends each of the workers from the workerList, a kPROOF_STOP message.
1591 /// If the workerList == 0, shutdown all the workers.
///
/// Entries of workerList may be TProofNodeInfo objects (exact class name;
/// matched to the first worker in fSlaves whose name equals the node name)
/// or objects inheriting from TSlave (used directly); any other type is
/// skipped with a warning. Each selected worker goes through
/// TerminateWorker(). fFileMap is cleared, and the "master" worker is
/// released on gProofServ once fSlaves is empty.
/// Returns 0 on success, -1 if not called on a master, -2 if workerList is
/// non-null but empty.
1592 
1594 {
1595  if (!IsMaster()) {
1596  Error("RemoveWorkers", "RemoveWorkers can only be called on the master!");
1597  return -1;
1598  }
1599 
1600  fFileMap.clear(); // This could be avoided if CopyFromCache was used in SendFile
1601 
1602  if (!workerList) {
1603  // shutdown all the workers
      // NOTE(review): fSlaves is iterated while TerminateWorker() runs; confirm
      // TerminateWorker() does not invalidate this iterator (the size check at
      // the end suggests workers may be removed from fSlaves).
1604  TIter nxsl(fSlaves);
1605  TSlave *sl = 0;
1606  while ((sl = (TSlave *) nxsl())) {
1607  // Shut down the worker assuming that it is not processing
1608  TerminateWorker(sl);
1609  }
1610 
1611  } else {
1612  if (!(workerList->GetSize())) {
1613  Error("RemoveWorkers", "The list of workers should not be empty!");
1614  return -2;
1615  }
1616 
1617  // Loop over all the workers and stop them
1618  TListIter next(workerList);
1619  TObject *to;
1620  TProofNodeInfo *worker;
1621  while ((to = next())) {
1622  TSlave *sl = 0;
1623  if (!strcmp(to->ClassName(), "TProofNodeInfo")) {
1624  // Get the next worker from the list
1625  worker = (TProofNodeInfo *)to;
1626  TIter nxsl(fSlaves);
1627  while ((sl = (TSlave *) nxsl())) {
1628  // Find the first worker running on this node
      // NOTE(review): this is a string comparison only if GetNodeName()
      // returns a TString (a const char* would compare pointers) - confirm.
1629  if (sl->GetName() == worker->GetNodeName())
1630  break;
1631  }
1632  } else if (to->InheritsFrom(TSlave::Class())) {
1633  sl = (TSlave *) to;
1634  } else {
1635  Warning("RemoveWorkers","unknown object type: %s - it should be"
1636  " TProofNodeInfo or inheriting from TSlave", to->ClassName());
1637  }
1638  // Shut down the worker assuming that it is not processing
1639  if (sl) {
1640  if (gDebug > 0)
1641  Info("RemoveWorkers","terminating worker %s", sl->GetOrdinal());
1642  TerminateWorker(sl);
1643  }
1644  }
1645  }
1646 
1647  // Update also the master counter
1648  if (gProofServ && fSlaves->GetSize() <= 0) gProofServ->ReleaseWorker("master");
1649 
1650  return 0;
1651 }
1652 
1653 ////////////////////////////////////////////////////////////////////////////////
1654 /// Start up PROOF slaves.
///
/// On a master (kIsMaster bit set) the list of workers is obtained from
/// gProofServ->GetWorkers() and passed to AddWorkers(). On a client, a
/// connection to the master is created with CreateSubmaster(); unless
/// 'attach' is true, the master setup is completed and its reply is
/// collected with a 300 s timeout.
/// Returns kTRUE on success, kFALSE (or 0) on the failure paths.
1655 
1657 {
1658  // If this is a master server, find the config file and start slave
1659  // servers as specified in the config file
1660  if (TestBit(TProof::kIsMaster)) {
1661 
1662  Int_t pc = 0;
1663  TList *workerList = new TList;
1664  // Get list of workers
1665  if (gProofServ->GetWorkers(workerList, pc) == TProofServ::kQueryStop) {
1666  TString emsg("no resource currently available for this session: please retry later");
1667  if (gDebug > 0) Info("StartSlaves", "%s", emsg.Data());
1668  gProofServ->SendAsynMessage(emsg.Data());
      // NOTE(review): 'workerList' is not deleted on this path (leak).
1669  return kFALSE;
1670  }
1671  // Setup the workers
      // AddWorkers() deletes 'workerList' on its normal path only.
      // NOTE(review): on its early error returns the list is leaked.
1672  if (AddWorkers(workerList) < 0)
1673  return kFALSE;
1674 
1675  } else {
1676 
1677  // create master server
1678  Printf("Starting master: opening connection ...");
1679  TSlave *slave = CreateSubmaster(fUrl.GetUrl(), "0", "master", 0);
1680 
1681  if (slave->IsValid()) {
1682 
1683  // Notify
1684  fprintf(stderr,"Starting master:"
1685  " connection open: setting up server ... \r");
1686  StartupMessage("Connection to master opened", kTRUE, 1, 1);
1687 
1688  if (!attach) {
1689 
1690  // Set worker interrupt handler
1691  slave->SetInterruptHandler(kTRUE);
1692 
1693  // Finalize setup of the server
      // NOTE(review): the embedded numbering skips 1694; the setup call is not
      // visible in this view.
1695 
1696  if (slave->IsValid()) {
1697 
1698  // Notify
1699  Printf("Starting master: OK ");
1700  StartupMessage("Master started", kTRUE, 1, 1);
1701 
1702  // check protocol compatibility
1703  // protocol 1 is not supported anymore
1704  if (fProtocol == 1) {
1705  Error("StartSlaves",
1706  "client and remote protocols not compatible (%d and %d)",
      // NOTE(review): the embedded numbering skips 1707: the arguments for the
      // two %d conversions and the end of this call are not visible here.
1708  slave->Close("S");
1709  delete slave;
1710  return kFALSE;
1711  }
1712 
1713  fSlaves->Add(slave);
1714  fAllMonitor->Add(slave->GetSocket());
1715 
1716  // Unset worker interrupt handler
1717  slave->SetInterruptHandler(kFALSE);
1718 
1719  // Set interrupt PROOF handler from now on
1720  fIntHandler = new TProofInterruptHandler(this);
1721 
1722  // Give-up after 5 minutes
      // Failure if Collect() returns 0 or the master reports status -99 (no
      // resources / worker setup problems) or -98 (output redirection failed),
      // as spelled out by the error messages below
1723  Int_t rc = Collect(slave, 300);
1724  Int_t slStatus = slave->GetStatus();
1725  if (slStatus == -99 || slStatus == -98 || rc == 0) {
1726  fSlaves->Remove(slave);
1727  fAllMonitor->Remove(slave->GetSocket());
1728  if (slStatus == -99)
1729  Error("StartSlaves", "no resources available or problems setting up workers (check logs)");
1730  else if (slStatus == -98)
1731  Error("StartSlaves", "could not setup output redirection on master");
1732  else
1733  Error("StartSlaves", "setting up master");
1734  slave->Close("S");
1735  delete slave;
1736  return 0;
1737  }
1738 
1739  if (!slave->IsValid()) {
1740  fSlaves->Remove(slave);
1741  fAllMonitor->Remove(slave->GetSocket());
1742  slave->Close("S");
1743  delete slave;
1744  Error("StartSlaves",
1745  "failed to setup connection with PROOF master server");
1746  return kFALSE;
1747  }
1748 
1749  if (!gROOT->IsBatch() && TestBit(kUseProgressDialog)) {
1750  if ((fProgressDialog =
1751  gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
1752  if (fProgressDialog->LoadPlugin() == -1)
1753  fProgressDialog = 0;
1754  }
1755  } else {
1756  // Notify
1757  Printf("Starting master: failure");
      // NOTE(review): on this path 'slave' is neither deleted nor added to
      // fSlaves, and execution falls through to 'return kTRUE' below -
      // confirm callers detect the failure by other means.
1758  }
1759  } else {
1760 
      // Attach mode: the master is registered in fSlaves only; unlike the
      // branch above, its socket is not added to fAllMonitor here - confirm
      // this is intended.
1761  // Notify
1762  Printf("Starting master: OK ");
1763  StartupMessage("Master attached", kTRUE, 1, 1);
1764 
1765  if (!gROOT->IsBatch() && TestBit(kUseProgressDialog)) {
1766  if ((fProgressDialog =
1767  gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
1768  if (fProgressDialog->LoadPlugin() == -1)
1769  fProgressDialog = 0;
1770  }
1771 
1772  fSlaves->Add(slave);
1773  fIntHandler = new TProofInterruptHandler(this);
1774  }
1775 
1776  } else {
1777  delete slave;
1778  // Notify only if verbosity is on: most likely the failure has already been notified
1779  if (gDebug > 0)
1780  Error("StartSlaves", "failed to create (or connect to) the PROOF master server");
1781  return kFALSE;
1782  }
1783  }
1784 
1785  return kTRUE;
1786 }
1787 
1788 ////////////////////////////////////////////////////////////////////////////////
1789 /// Close all open slave servers.
1790 /// Client can decide to shutdown the remote session by passing option 'S'
1791 /// or 's'. Default for clients is detach, if supported. Masters always
1792 /// shutdown the remote counterpart.
///
/// The option is forwarded unchanged to TSlave::Close() for every worker.
/// The session is marked invalid; all worker lists are emptied (the TSlave
/// objects are deleted once, through fSlaves->Delete(); the other lists are
/// cleared with "nodelete"). The session is then removed from gROOT's list
/// of sockets, attached chains are detached, and for proofd-type sessions
/// gProof is moved to the last remaining proofd-type session (or null).
1793 
1795 {
      // fCloseMutex is held only for this inner scope
1796  { std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
1797 
1798  fValid = kFALSE;
1799  if (fSlaves) {
      // The interrupt handler is deactivated (Remove()), not deleted, here
1800  if (fIntHandler)
1801  fIntHandler->Remove();
1802 
1803  TIter nxs(fSlaves);
1804  TSlave *sl = 0;
1805  while ((sl = (TSlave *)nxs()))
1806  sl->Close(opt);
1807 
1808  fActiveSlaves->Clear("nodelete");
1809  fUniqueSlaves->Clear("nodelete");
1810  fAllUniqueSlaves->Clear("nodelete");
1811  fNonUniqueMasters->Clear("nodelete");
1812  fBadSlaves->Clear("nodelete");
1813  fInactiveSlaves->Clear("nodelete");
1814  fSlaves->Delete();
1815  }
1816  }
1817 
      // NOTE(review): the embedded numbering skips 1818; by brace count the
      // '{' closed on line 1841 must be opened on that missing line.
1819  gROOT->GetListOfSockets()->Remove(this);
1820 
1821  if (fChains) {
      // Loop ends when fChains is empty (or its first entry is not a TChain);
      // presumably RemoveChain() takes 'chain' out of fChains, otherwise this
      // would not terminate
1822  while (TChain *chain = dynamic_cast<TChain*> (fChains->First()) ) {
1823  // remove "chain" from list
1824  chain->SetProof(0);
1825  RemoveChain(chain);
1826  }
1827  }
1828 
1829  if (IsProofd()) {
1830 
1831  gROOT->GetListOfProofs()->Remove(this);
1832  if (gProof && gProof == this) {
1833  // Set previous proofd-related as default
      // gProof ends up null if no proofd-type session is left
1834  TIter pvp(gROOT->GetListOfProofs(), kIterBackward);
1835  while ((gProof = (TProof *)pvp())) {
1836  if (gProof->IsProofd())
1837  break;
1838  }
1839  }
1840  }
1841  }
1842 }
1843 
1844 ////////////////////////////////////////////////////////////////////////////////
1845 /// Create a new TSlave of type TSlave::kSlave.
1846 /// Note: creation of TSlave is private with TProof as a friend.
1847 /// Derived classes must use this function to create slaves.
1848 
1849 TSlave *TProof::CreateSlave(const char *url, const char *ord,
1850  Int_t perf, const char *image, const char *workdir)
1851 {
1852  TSlave* sl = TSlave::Create(url, ord, perf, image,
1853  this, TSlave::kSlave, workdir, 0);
1854 
1855  if (sl->IsValid()) {
1856  sl->SetInputHandler(new TProofInputHandler(this, sl->GetSocket()));
1857  // must set fParallel to 1 for slaves since they do not
1858  // report their fParallel with a LOG_DONE message
1859  sl->fParallel = 1;
1860  }
1861 
1862  return sl;
1863 }
1864 
1865 
1866 ////////////////////////////////////////////////////////////////////////////////
1867 /// Create a new TSlave of type TSlave::kMaster.
1868 /// Note: creation of TSlave is private with TProof as a friend.
1869 /// Derived classes must use this function to create slaves.
1870 
1871 TSlave *TProof::CreateSubmaster(const char *url, const char *ord,
1872  const char *image, const char *msd, Int_t nwk)
1873 {
1874  TSlave *sl = TSlave::Create(url, ord, 100, image, this,
1875  TSlave::kMaster, 0, msd, nwk);
1876 
1877  if (sl->IsValid()) {
1878  sl->SetInputHandler(new TProofInputHandler(this, sl->GetSocket()));
1879  }
1880 
1881  return sl;
1882 }
1883 
1884 ////////////////////////////////////////////////////////////////////////////////
1885 /// Find slave that has TSocket s. Returns 0 in case slave is not found.
/// Only valid workers in fSlaves are considered; sockets are compared by
/// pointer identity and the first match is returned.
1886 
1888 {
1889  TSlave *sl;
1890  TIter next(fSlaves);
1891 
1892  while ((sl = (TSlave *)next())) {
1893  if (sl->IsValid() && sl->GetSocket() == s)
1894  return sl;
1895  }
1896  return 0;
1897 }
1898 
1899 ////////////////////////////////////////////////////////////////////////////////
1900 /// Add to the fUniqueSlaves list the active slaves that have a unique
1901 /// (user) file system image. This information is used to transfer files
1902 /// only once to nodes that share a file system (an image). Submasters
1903 /// which are not in fUniqueSlaves are put in the fNonUniqueMasters
1904 /// list. That list is used to trigger the transferring of files to
1905 /// the submaster's unique slaves without the need to transfer the file
1906 /// to the submaster.
///
/// Rules applied to each worker of fActiveSlaves, in list order:
///  - same image as this session (fImage): skipped, except submasters,
///    which are recorded as non-unique masters;
///  - first worker seen for an image: becomes the unique one for it;
///  - a submaster found after a plain worker with the same image replaces
///    it; a submaster found after another submaster with the same image is
///    recorded as a non-unique master; later plain workers are ignored.
/// Iteration stops at the first entry that is not a TSlave (dynamic_cast).
1907 
1909 {
1910  fUniqueSlaves->Clear();
      // NOTE(review): the embedded numbering skips 1911-1914; statements of the
      // original file are not visible in this view.
1915 
1916  TIter next(fActiveSlaves);
1917 
1918  while (TSlave *sl = dynamic_cast<TSlave*>(next())) {
1919  if (fImage == sl->fImage) {
1920  if (sl->GetSlaveType() == TSlave::kMaster) {
1921  fNonUniqueMasters->Add(sl);
1922  fAllUniqueSlaves->Add(sl);
1923  fAllUniqueMonitor->Add(sl->GetSocket());
1924  }
1925  continue;
1926  }
1927 
      // Look for an already selected worker sharing the image of 'sl'
1928  TIter next2(fUniqueSlaves);
1929  TSlave *replace_slave = 0;
1930  Bool_t add = kTRUE;
1931  while (TSlave *sl2 = dynamic_cast<TSlave*>(next2())) {
1932  if (sl->fImage == sl2->fImage) {
1933  add = kFALSE;
1934  if (sl->GetSlaveType() == TSlave::kMaster) {
1935  if (sl2->GetSlaveType() == TSlave::kSlave) {
1936  // give preference to master
1937  replace_slave = sl2;
1938  add = kTRUE;
1939  } else if (sl2->GetSlaveType() == TSlave::kMaster) {
1940  fNonUniqueMasters->Add(sl);
1941  fAllUniqueSlaves->Add(sl);
1942  fAllUniqueMonitor->Add(sl->GetSocket());
1943  } else {
1944  Error("FindUniqueSlaves", "TSlave is neither Master nor Slave");
1945  R__ASSERT(0);
1946  }
1947  }
1948  break;
1949  }
1950  }
1951 
1952  if (add) {
1953  fUniqueSlaves->Add(sl);
1954  fAllUniqueSlaves->Add(sl);
1955  fUniqueMonitor->Add(sl->GetSocket());
1956  fAllUniqueMonitor->Add(sl->GetSocket());
1957  if (replace_slave) {
1958  fUniqueSlaves->Remove(replace_slave);
1959  fAllUniqueSlaves->Remove(replace_slave);
1960  fUniqueMonitor->Remove(replace_slave->GetSocket());
1961  fAllUniqueMonitor->Remove(replace_slave->GetSocket());
1962  }
1963  }
1964  }
1965 
1966  // will be activated in Collect()
      // NOTE(review): the embedded numbering skips 1967-1968: the statements
      // this comment refers to are not visible in this view.
1969 }
1970 
1971 ////////////////////////////////////////////////////////////////////////////////
1972 /// Return number of slaves as described in the config file.
/// This is the current size of fSlaves, which also holds workers that were
/// found invalid when they were added.
1973 
1975 {
1976  return fSlaves->GetSize();
1977 }
1978 
1979 ////////////////////////////////////////////////////////////////////////////////
1980 /// Return number of active slaves, i.e. slaves that are valid and in
1981 /// the current computing group.
/// This is the current size of fActiveSlaves.
1982 
1984 {
1985  return fActiveSlaves->GetSize();
1986 }
1987 
1988 ////////////////////////////////////////////////////////////////////////////////
1989 /// Return number of inactive slaves, i.e. slaves that are valid but not in
1990 /// the current computing group.
/// This is the current size of fInactiveSlaves.
1991 
1993 {
1994  return fInactiveSlaves->GetSize();
1995 }
1996 
1997 ////////////////////////////////////////////////////////////////////////////////
1998 /// Return number of unique slaves, i.e. active slaves that each have a
1999 /// unique (different) user file system.
/// This is the current size of fUniqueSlaves.
2000 
2002 {
2003  return fUniqueSlaves->GetSize();
2004 }
2005 
2006 ////////////////////////////////////////////////////////////////////////////////
2007 /// Return number of bad slaves. These are slaves that were in the config
2008 /// file, but refused to start up or that died during the PROOF session.
/// This is the current size of fBadSlaves.
2009 
2011 {
2012  return fBadSlaves->GetSize();
2013 }
2014 
2015 ////////////////////////////////////////////////////////////////////////////////
2016 /// Ask for the statistics of the slaves.
/// Does nothing on an invalid session.
2017 
2019 {
2020  if (!IsValid()) return;
2021 
      // NOTE(review): the embedded numbering skips 2022-2023; the statements
      // presumably performing the request are not visible in this view.
2024 }
2025 
2026 ////////////////////////////////////////////////////////////////////////////////
2027 /// Get statistics about CPU time, real time and bytes read.
2028 /// If verbose, print the results (always available via GetCpuTime(), GetRealTime()
2029 /// and GetBytesRead()).
2030 
2032 {
2033  if (fProtocol > 27) {
2034  // This returns the correct result
2035  AskStatistics();
2036  } else {
2037  // AskStatistics is buggy: parse the output of Print()
2038  RedirectHandle_t rh;
2039  gSystem->RedirectOutput(fLogFileName, "a", &rh);
2040  Print();
2041  gSystem->RedirectOutput(0, 0, &rh);
2042  TMacro *mp = GetLastLog();
2043  if (mp) {
2044  // Look for global directories
2045  TIter nxl(mp->GetListOfLines());
2046  TObjString *os = 0;
2047  while ((os = (TObjString *) nxl())) {
2048  TString s(os->GetName());
2049  if (s.Contains("Total MB's processed:")) {
2050  s.ReplaceAll("Total MB's processed:", "");
2051  if (s.IsFloat()) fBytesRead = (Long64_t) s.Atof() * (1024*1024);
2052  } else if (s.Contains("Total real time used (s):")) {
2053  s.ReplaceAll("Total real time used (s):", "");
2054  if (s.IsFloat()) fRealTime = s.Atof();
2055  } else if (s.Contains("Total CPU time used (s):")) {
2056  s.ReplaceAll("Total CPU time used (s):", "");
2057  if (s.IsFloat()) fCpuTime = s.Atof();
2058  }
2059  }
2060  delete mp;
2061  }
2062  }
2063 
2064  if (verbose) {
2065  Printf(" Real/CPU time (s): %.3f / %.3f; workers: %d; processed: %.2f MBs",
2066  GetRealTime(), GetCpuTime(), GetParallel(), float(GetBytesRead())/(1024*1024));
2067  }
2068 }
2069 
2070 ////////////////////////////////////////////////////////////////////////////////
2071 /// Ask for the number of parallel slaves.
/// Does nothing on an invalid session.
2072 
2074 {
2075  if (!IsValid()) return;
2076 
      // NOTE(review): the embedded numbering skips 2077-2078; the statements
      // presumably performing the request are not visible in this view.
2079 }
2080 
2081 ////////////////////////////////////////////////////////////////////////////////
2082 /// Ask the master for the list of queries.
/// Returns null on an invalid session or when running on a master.
/// If 'opt' contains 'A' or 'a', the 'all' flag streamed into the request
/// is kTRUE (kFALSE otherwise); 'opt' must not be null (it goes to strchr).
/// Returns fQueries, expected to be filled while the reply is handled.
2083 
2085 {
2086  if (!IsValid() || TestBit(TProof::kIsMaster)) return (TList *)0;
2087 
2088  Bool_t all = ((strchr(opt,'A') || strchr(opt,'a'))) ? kTRUE : kFALSE;
      // NOTE(review): the embedded numbering skips 2089: the declaration of the
      // message 'm' is not visible in this view.
2090  m << all;
2091  Broadcast(m, kActive);
      // NOTE(review): the embedded numbering skips 2092; that statement is not
      // visible in this view.
2093 
2094  // This should have been filled by now
2095  return fQueries;
2096 }
2097 
2098 ////////////////////////////////////////////////////////////////////////////////
2099 /// Number of queries processed by this session
/// Computed as the size of fQueries minus fOtherQueries (queries not from
/// this session); 0 when fQueries is null.
2100 
2102 {
2103  if (fQueries)
2104  return fQueries->GetSize() - fOtherQueries;
2105  return 0;
2106 }
2107 
2108 ////////////////////////////////////////////////////////////////////////////////
2109 /// Set max number of draw queries whose results are saved.
/// Non-positive values are ignored. The value is forwarded to the player
/// (if any) and stored in fMaxDrawQueries.
/// NOTE(review): the signature line (source line 2111) is missing from this listing.
2110
2112 {
2113  if (max > 0) {
2114  if (fPlayer)
2115  fPlayer->SetMaxDrawQueries(max);
2116  fMaxDrawQueries = max;
2117  }
2118 }
2119 
2120 ////////////////////////////////////////////////////////////////////////////////
2121 /// Get max number of queries whose full results are kept in the
2122 /// remote sandbox.
/// The visible code streams kFALSE into the message 'm' and broadcasts it to
/// the active workers.
/// NOTE(review): source lines 2124, 2126 and 2129 (signature, message
/// construction and presumably the collection of the reply) are missing
/// from this listing — confirm.
2123
2125 {
2127  m << kFALSE;
2128  Broadcast(m, kActive);
2130 }
2131 
2132 ////////////////////////////////////////////////////////////////////////////////
2133 /// Return pointer to the list of query results in the player.
/// Returns 0 if there is no player.
/// NOTE(review): the signature line (source line 2135) is missing from this listing.
2134
2136 {
2137  return (fPlayer ? fPlayer->GetListOfResults() : (TList *)0);
2138 }
2139 
2140 ////////////////////////////////////////////////////////////////////////////////
2141 /// Return pointer to the full TQueryResult instance owned by the player
2142 /// and referenced by 'ref'. If ref = 0 or "", return the last query result.
/// Returns 0 if there is no player.
/// NOTE(review): the signature line (source line 2144) is missing from this listing.
2143
2145 {
2146  return (fPlayer ? fPlayer->GetQueryResult(ref) : (TQueryResult *)0);
2147 }
2148 
2149 ////////////////////////////////////////////////////////////////////////////////
2150 /// Print the list of queries, asking the master for it unless "L" is given.
2151 /// Options:
2152 /// "A" show information about all the queries known to the
2153 /// server, i.e. even those processed by other sessions
2154 /// "L" show only information about queries locally available
2155 /// i.e. already retrieved. If "L" is specified, "A" is
2156 /// ignored.
2157 /// "F" show all details available about queries
2158 /// "H" print help menu
2159 /// Default ""
/// Options are case insensitive. The help menu is printed even when the
/// session is not valid; any other output requires a valid session.
/// NOTE(review): source lines 2161 (signature), 2209 and 2218 (the arguments
/// of the two "Queries processed during this session" Printf calls) are
/// missing from this listing.
2160
2162 {
2163  Bool_t help = ((strchr(opt,'H') || strchr(opt,'h'))) ? kTRUE : kFALSE;
2164  if (help) {
2165
2166  // Help
2167
2168  Printf("+++");
2169  Printf("+++ Options: \"A\" show all queries known to server");
2170  Printf("+++ \"L\" show retrieved queries");
2171  Printf("+++ \"F\" full listing of query info");
2172  Printf("+++ \"H\" print this menu");
2173  Printf("+++");
2174  Printf("+++ (case insensitive)");
2175  Printf("+++");
2176  Printf("+++ Use Retrieve(<#>) to retrieve the full"
2177  " query results from the master");
2178  Printf("+++ e.g. Retrieve(8)");
2179
2180  Printf("+++");
2181
2182  return;
2183  }
2184
2185  if (!IsValid()) return;
2186
2187  Bool_t local = ((strchr(opt,'L') || strchr(opt,'l'))) ? kTRUE : kFALSE;
2188
2189  TObject *pq = 0;
2190  if (!local) {
2191  GetListOfQueries(opt);
2192
2193  if (!fQueries) return;
2194
2195  TIter nxq(fQueries);
2196
2197  // Queries processed by other sessions (the first fOtherQueries entries of fQueries)
2198  if (fOtherQueries > 0) {
2199  Printf("+++");
2200  Printf("+++ Queries processed during other sessions: %d", fOtherQueries);
2201  Int_t nq = 0;
2202  while (nq++ < fOtherQueries && (pq = nxq()))
2203  pq->Print(opt);
2204  }
2205
2206  // Queries processed by this session (the remaining entries; nxq continues where the loop above stopped)
2207  Printf("+++");
2208  Printf("+++ Queries processed during this session: selector: %d, draw: %d",
2210  while ((pq = nxq()))
2211  pq->Print(opt);
2212
2213  } else {
2214
2215  // Queries processed by this session
2216  Printf("+++");
2217  Printf("+++ Queries processed during this session: selector: %d, draw: %d",
2219
2220  // Queries available locally, i.e. the results held by the player
2221  TList *listlocal = fPlayer ? fPlayer->GetListOfResults() : (TList *)0;
2222  if (listlocal) {
2223  Printf("+++");
2224  Printf("+++ Queries available locally: %d", listlocal->GetSize());
2225  TIter nxlq(listlocal);
2226  while ((pq = nxlq()))
2227  pq->Print(opt);
2228  }
2229  }
2230  Printf("+++");
2231 }
2232 
2233 ////////////////////////////////////////////////////////////////////////////////
2234 /// See if the data is ready to be analyzed.
2235 
2236 Bool_t TProof::IsDataReady(Long64_t &totalbytes, Long64_t &bytesready)
2237 {
2238  if (!IsValid()) return kFALSE;
2239 
2240  TList submasters;
2241  TIter nextSlave(GetListOfActiveSlaves());
2242  while (TSlave *sl = dynamic_cast<TSlave*>(nextSlave())) {
2243  if (sl->GetSlaveType() == TSlave::kMaster) {
2244  submasters.Add(sl);
2245  }
2246  }
2247 
2248  fDataReady = kTRUE; //see if any submasters set it to false
2249  fBytesReady = 0;
2250  fTotalBytes = 0;
2251  //loop over submasters and see if data is ready
2252  if (submasters.GetSize() > 0) {
2253  Broadcast(kPROOF_DATA_READY, &submasters);
2254  Collect(&submasters);
2255  }
2256 
2257  bytesready = fBytesReady;
2258  totalbytes = fTotalBytes;
2259 
2260  EmitVA("IsDataReady(Long64_t,Long64_t)", 2, totalbytes, bytesready);
2261 
2262  PDB(kGlobal,2)
2263  Info("IsDataReady", "%lld / %lld (%s)",
2264  bytesready, totalbytes, fDataReady?"READY":"NOT READY");
2265 
2266  return fDataReady;
2267 }
2268 
2269 ////////////////////////////////////////////////////////////////////////////////
2270 /// Send interrupt to master or slave servers.
/// The interrupt 'type' is forwarded to every valid worker of the list
/// selected by 'list' (kAll, kActive, kUnique or kAllUnique). Does nothing if
/// the session is invalid or the selected list is empty.
/// NOTE(review): 'slaves' is dereferenced without a null check, unlike in
/// Broadcast(const TMessage&, TList*); a 'list' value matching none of the
/// four cases (or an unset list) would crash here.
/// NOTE(review): the signature line (source line 2272) is missing from this listing.
2271
2273 {
2274  if (!IsValid()) return;
2275
2276  TList *slaves = 0;
2277  if (list == kAll) slaves = fSlaves;
2278  if (list == kActive) slaves = fActiveSlaves;
2279  if (list == kUnique) slaves = fUniqueSlaves;
2280  if (list == kAllUnique) slaves = fAllUniqueSlaves;
2281
2282  if (slaves->GetSize() == 0) return;
2283
2284  TSlave *sl;
2285  TIter next(slaves);
2286
2287  while ((sl = (TSlave *)next())) {
2288  if (sl->IsValid()) {
2289
2290  // Ask slave to propagate the interrupt request
2291  sl->Interrupt((Int_t)type);
2292  }
2293  }
2294 }
2295 
2296 ////////////////////////////////////////////////////////////////////////////////
2297 /// Returns number of slaves active in parallel mode. Returns 0 in case
2298 /// there are no active slaves. Returns -1 in case of error.
/// The result is the sum of GetParallel() over the active workers, ignoring
/// negative values. The only error visible here is an invalid session.
/// NOTE(review): the signature line (source line 2300) is missing from this listing.
2299
2301 {
2302  if (!IsValid()) return -1;
2303
2304  // iterate over active slaves and return total number of slaves
// (the loop stops at the first entry that is not a TSlave)
2305  TIter nextSlave(GetListOfActiveSlaves());
2306  Int_t nparallel = 0;
2307  while (TSlave* sl = dynamic_cast<TSlave*>(nextSlave()))
2308  if (sl->GetParallel() >= 0)
2309  nparallel += sl->GetParallel();
2310
2311  return nparallel;
2312 }
2313 
2314 ////////////////////////////////////////////////////////////////////////////////
2315 /// Returns list of TSlaveInfo's. In case of error return 0.
/// The list is fSlaveInfo, owned by this object: it is set to own its
/// entries when first created and emptied (Delete()) on later calls.
/// Each worker of type kSlave gets a TSlaveInfo, flagged kActive or kBad when
/// its ordinal appears in the active or bad worker lists. Every valid worker
/// or sub-master is then sent kPROOF_GETSLAVEINFO, and the replies are
/// collected before returning.
/// NOTE(review): source lines 2317 (signature) and 2322 (presumably the
/// allocation of fSlaveInfo) are missing from this listing — confirm.
2316
2318 {
2319  if (!IsValid()) return 0;
2320
2321  if (fSlaveInfo == 0) {
2323  fSlaveInfo->SetOwner();
2324  } else {
2325  fSlaveInfo->Delete();
2326  }
2327
// Despite its name, this collects every node (worker or sub-master) that
// was successfully sent kPROOF_GETSLAVEINFO and must therefore be collected
2328  TList masters;
2329  TIter next(GetListOfSlaves());
2330  TSlave *slave;
2331
2332  while ((slave = (TSlave *) next()) != 0) {
2333  if (slave->GetSlaveType() == TSlave::kSlave) {
2334  const char *name = IsLite() ? gSystem->HostName() : slave->GetName();
2335  TSlaveInfo *slaveinfo = new TSlaveInfo(slave->GetOrdinal(),
2336  name,
2337  slave->GetPerfIdx());
2338  fSlaveInfo->Add(slaveinfo);
2339
2340  TIter nextactive(GetListOfActiveSlaves());
2341  TSlave *activeslave;
2342  while ((activeslave = (TSlave *) nextactive())) {
2343  if (TString(slaveinfo->GetOrdinal()) == activeslave->GetOrdinal()) {
2344  slaveinfo->SetStatus(TSlaveInfo::kActive);
2345  break;
2346  }
2347  }
2348
2349  TIter nextbad(GetListOfBadSlaves());
2350  TSlave *badslave;
2351  while ((badslave = (TSlave *) nextbad())) {
2352  if (TString(slaveinfo->GetOrdinal()) == badslave->GetOrdinal()) {
2353  slaveinfo->SetStatus(TSlaveInfo::kBad);
2354  break;
2355  }
2356  }
2357  // Get system info if supported
2358  if (slave->IsValid()) {
2359  if (slave->GetSocket()->Send(kPROOF_GETSLAVEINFO) == -1)
2360  MarkBad(slave, "could not send kPROOF_GETSLAVEINFO message");
2361  else
2362  masters.Add(slave);
2363  }
2364
2365  } else if (slave->GetSlaveType() == TSlave::kMaster) {
2366  if (slave->IsValid()) {
2367  if (slave->GetSocket()->Send(kPROOF_GETSLAVEINFO) == -1)
2368  MarkBad(slave, "could not send kPROOF_GETSLAVEINFO message");
2369  else
2370  masters.Add(slave);
2371  }
2372  } else {
2373  Error("GetSlaveInfo", "TSlave is neither Master nor Slave");
2374  R__ASSERT(0);
2375  }
2376  }
2377  if (masters.GetSize() > 0) Collect(&masters);
2378
2379  return fSlaveInfo;
2380 }
2381 
2382 ////////////////////////////////////////////////////////////////////////////////
2383 /// Activate slave server list.
/// fAllMonitor is first fully deactivated. Then the sockets of the valid
/// workers in 'slaves' are activated on it; fActiveSlaves is used when
/// 'slaves' is null.
/// NOTE(review): the signature line (source line 2385) is missing from this listing.
2384
2386 {
2387  TMonitor *mon = fAllMonitor;
2388  mon->DeActivateAll();
2389
2390  slaves = !slaves ? fActiveSlaves : slaves;
2391
2392  TIter next(slaves);
2393  TSlave *sl;
2394  while ((sl = (TSlave*) next())) {
2395  if (sl->IsValid())
2396  mon->Activate(sl->GetSocket());
2397  }
2398 }
2399 
2400 ////////////////////////////////////////////////////////////////////////////////
2401 /// Activate (on == TRUE) or deactivate (on == FALSE) all sockets
2402 /// monitored by 'mon'.
/// If 'mon' is null, fCurrentMonitor is used instead; nothing happens if
/// both are null.
/// NOTE(review): the signature line (source line 2404) is missing from this
/// listing, so the function name is not visible here.
2403
2405 {
2406  TMonitor *m = (mon) ? mon : fCurrentMonitor;
2407  if (m) {
2408  if (on)
2409  m->ActivateAll();
2410  else
2411  m->DeActivateAll();
2412  }
2413 }
2414 
2415 ////////////////////////////////////////////////////////////////////////////////
2416 /// Broadcast the group priority to all workers in the specified list. Returns
2417 /// the number of workers the message was successfully sent to.
2418 /// Returns -1 in case of error.
2419 
2420 Int_t TProof::BroadcastGroupPriority(const char *grp, Int_t priority, TList *workers)
2421 {
2422  if (!IsValid()) return -1;
2423 
2424  if (workers->GetSize() == 0) return 0;
2425 
2426  int nsent = 0;
2427  TIter next(workers);
2428 
2429  TSlave *wrk;
2430  while ((wrk = (TSlave *)next())) {
2431  if (wrk->IsValid()) {
2432  if (wrk->SendGroupPriority(grp, priority) == -1)
2433  MarkBad(wrk, "could not send group priority");
2434  else
2435  nsent++;
2436  }
2437  }
2438 
2439  return nsent;
2440 }
2441 
2442 ////////////////////////////////////////////////////////////////////////////////
2443 /// Broadcast the group priority to all workers in the specified list. Returns
2444 /// the number of workers the message was successfully sent to.
2445 /// Returns -1 in case of error.
2446 
2447 Int_t TProof::BroadcastGroupPriority(const char *grp, Int_t priority, ESlaves list)
2448 {
2449  TList *workers = 0;
2450  if (list == kAll) workers = fSlaves;
2451  if (list == kActive) workers = fActiveSlaves;
2452  if (list == kUnique) workers = fUniqueSlaves;
2453  if (list == kAllUnique) workers = fAllUniqueSlaves;
2454 
2455  return BroadcastGroupPriority(grp, priority, workers);
2456 }
2457 
2458 ////////////////////////////////////////////////////////////////////////////////
2459 /// Reset the merge progress notifier.
/// NOTE(review): the signature (source line 2461) and the whole body (source
/// line 2463) are missing from this listing.
2460
2462 {
2464 }
2465 
2466 ////////////////////////////////////////////////////////////////////////////////
2467 /// Broadcast a message to all slaves in the specified list. Returns
2468 /// the number of slaves the message was successfully sent to.
2469 /// Returns -1 in case of error.
2470 
2471 Int_t TProof::Broadcast(const TMessage &mess, TList *slaves)
2472 {
2473  if (!IsValid()) return -1;
2474 
2475  if (!slaves || slaves->GetSize() == 0) return 0;
2476 
2477  int nsent = 0;
2478  TIter next(slaves);
2479 
2480  TSlave *sl;
2481  while ((sl = (TSlave *)next())) {
2482  if (sl->IsValid()) {
2483  if (sl->GetSocket()->Send(mess) == -1)
2484  MarkBad(sl, "could not broadcast request");
2485  else
2486  nsent++;
2487  }
2488  }
2489 
2490  return nsent;
2491 }
2492 
2493 ////////////////////////////////////////////////////////////////////////////////
2494 /// Broadcast a message to all slaves in the specified list (kAll, kActive,
2495 /// kUnique or kAllUnique). Returns the number of slaves
2496 /// the message was successfully sent to. Returns -1 in case of error.
/// A 'list' value matching none of these cases results in a null list being
/// forwarded to Broadcast(const TMessage&, TList*), which returns 0 for it.
/// NOTE(review): the signature line (source line 2498) is missing from this listing.
2497
2499 {
2500  TList *slaves = 0;
2501  if (list == kAll) slaves = fSlaves;
2502  if (list == kActive) slaves = fActiveSlaves;
2503  if (list == kUnique) slaves = fUniqueSlaves;
2504  if (list == kAllUnique) slaves = fAllUniqueSlaves;
2505
2506  return Broadcast(mess, slaves);
2507 }
2508 
2509 ////////////////////////////////////////////////////////////////////////////////
2510 /// Broadcast a character string buffer to all slaves in the specified
2511 /// list. Use kind to set the TMessage what field. Returns the number of
2512 /// slaves the message was sent to. Returns -1 in case of error.
2513 
2514 Int_t TProof::Broadcast(const char *str, Int_t kind, TList *slaves)
2515 {
2516  TMessage mess(kind);
2517  if (str) mess.WriteString(str);
2518  return Broadcast(mess, slaves);
2519 }
2520 
2521 ////////////////////////////////////////////////////////////////////////////////
2522 /// Broadcast a character string buffer to all slaves in the specified
2523 /// list (either all slaves or only the active slaves). Use kind to
2524 /// set the TMessage what field. Returns the number of slaves the message
2525 /// was sent to. Returns -1 in case of error.
2526 
2527 Int_t TProof::Broadcast(const char *str, Int_t kind, ESlaves list)
2528 {
2529  TMessage mess(kind);
2530  if (str) mess.WriteString(str);
2531  return Broadcast(mess, list);
2532 }
2533 
2534 ////////////////////////////////////////////////////////////////////////////////
2535 /// Broadcast an object to all slaves in the specified list. Use kind to
2536 /// set the TMessage what field. Returns the number of slaves the message
2537 /// was sent to. Returns -1 in case of error.
/// The object is serialized with TMessage::WriteObject() and sent through
/// Broadcast(const TMessage&, TList*).
/// NOTE(review): the signature line (source line 2539) is missing from this listing.
2538
2540 {
2541  TMessage mess(kind);
2542  mess.WriteObject(obj);
2543  return Broadcast(mess, slaves);
2544 }
2545 
2546 ////////////////////////////////////////////////////////////////////////////////
2547 /// Broadcast an object to all slaves in the list selected by 'list'. Use kind to
2548 /// set the TMessage what field. Returns the number of slaves the message
2549 /// was sent to. Returns -1 in case of error.
/// The object is serialized with TMessage::WriteObject() and sent through
/// the ESlaves overload of Broadcast(const TMessage&, ...).
/// NOTE(review): the signature line (source line 2551) is missing from this listing.
2550
2552 {
2553  TMessage mess(kind);
2554  mess.WriteObject(obj);
2555  return Broadcast(mess, list);
2556 }
2557 
2558 ////////////////////////////////////////////////////////////////////////////////
2559 /// Broadcast a raw buffer of specified length to all slaves in the
2560 /// specified list. Returns the number of slaves the buffer was sent to.
2561 /// Returns -1 in case of error.
2562 
2563 Int_t TProof::BroadcastRaw(const void *buffer, Int_t length, TList *slaves)
2564 {
2565  if (!IsValid()) return -1;
2566 
2567  if (slaves->GetSize() == 0) return 0;
2568 
2569  int nsent = 0;
2570  TIter next(slaves);
2571 
2572  TSlave *sl;
2573  while ((sl = (TSlave *)next())) {
2574  if (sl->IsValid()) {
2575  if (sl->GetSocket()->SendRaw(buffer, length) == -1)
2576  MarkBad(sl, "could not send broadcast-raw request");
2577  else
2578  nsent++;
2579  }
2580  }
2581 
2582  return nsent;
2583 }
2584 
2585 ////////////////////////////////////////////////////////////////////////////////
2586 /// Broadcast a raw buffer of specified length to all slaves in the
2587 /// specified list. Returns the number of slaves the buffer was sent to.
2588 /// Returns -1 in case of error.
2589 
2590 Int_t TProof::BroadcastRaw(const void *buffer, Int_t length, ESlaves list)
2591 {
2592  TList *slaves = 0;
2593  if (list == kAll) slaves = fSlaves;
2594  if (list == kActive) slaves = fActiveSlaves;
2595  if (list == kUnique) slaves = fUniqueSlaves;
2596  if (list == kAllUnique) slaves = fAllUniqueSlaves;
2597 
2598  return BroadcastRaw(buffer, length, slaves);
2599 }
2600 
2601 ////////////////////////////////////////////////////////////////////////////////
2602 /// Broadcast file to all workers in the specified list. Returns the number of workers
2603 /// the buffer was sent to.
2604 /// Returns -1 in case of error.
2605 
2606 Int_t TProof::BroadcastFile(const char *file, Int_t opt, const char *rfile, TList *wrks)
2607 {
2608  if (!IsValid()) return -1;
2609 
2610  if (wrks->GetSize() == 0) return 0;
2611 
2612  int nsent = 0;
2613  TIter next(wrks);
2614 
2615  TSlave *wrk;
2616  while ((wrk = (TSlave *)next())) {
2617  if (wrk->IsValid()) {
2618  if (SendFile(file, opt, rfile, wrk) < 0)
2619  Error("BroadcastFile",
2620  "problems sending file to worker %s (%s)",
2621  wrk->GetOrdinal(), wrk->GetName());
2622  else
2623  nsent++;
2624  }
2625  }
2626 
2627  return nsent;
2628 }
2629 
2630 ////////////////////////////////////////////////////////////////////////////////
2631 /// Broadcast file to all workers in the specified list. Returns the number of workers
2632 /// the buffer was sent to.
2633 /// Returns -1 in case of error.
2634 
2635 Int_t TProof::BroadcastFile(const char *file, Int_t opt, const char *rfile, ESlaves list)
2636 {
2637  TList *wrks = 0;
2638  if (list == kAll) wrks = fSlaves;
2639  if (list == kActive) wrks = fActiveSlaves;
2640  if (list == kUnique) wrks = fUniqueSlaves;
2641  if (list == kAllUnique) wrks = fAllUniqueSlaves;
2642 
2643  return BroadcastFile(file, opt, rfile, wrks);
2644 }
2645 
2646 ////////////////////////////////////////////////////////////////////////////////
2647 /// Release a monitor used for a collection, deleting it if it was newly
2648 /// created.
/// 'mon' is deleted unless it is null or one of the persistent monitors
/// fAllMonitor, fActiveMonitor, fUniqueMonitor or fAllUniqueMonitor.
/// NOTE(review): the signature line (source line 2650) is missing from this listing.
2649
2651 {
2652  if (mon && (mon != fAllMonitor) && (mon != fActiveMonitor)
2653  && (mon != fUniqueMonitor) && (mon != fAllUniqueMonitor)) {
2654  delete mon;
2655  }
2656 }
2657 
2658 ////////////////////////////////////////////////////////////////////////////////
2659 /// Collect responses from slave sl. Returns the number of slaves that
2660 /// responded (=1).
2661 /// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2662 /// which means wait forever).
2663 /// If defined (>= 0) endtype is the message that stops this collection.
2664 
2665 Int_t TProof::Collect(const TSlave *sl, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2666 {
2667  Int_t rc = 0;
2668 
2669  TMonitor *mon = 0;
2670  if (!sl->IsValid()) return 0;
2671 
2672  if (fCurrentMonitor == fAllMonitor) {
2673  mon = new TMonitor;
2674  } else {
2675  mon = fAllMonitor;
2676  mon->DeActivateAll();
2677  }
2678  mon->Activate(sl->GetSocket());
2679 
2680  rc = Collect(mon, timeout, endtype, deactonfail);
2681  ReleaseMonitor(mon);
2682  return rc;
2683 }
2684 
2685 ////////////////////////////////////////////////////////////////////////////////
2686 /// Collect responses from the slave servers. Returns the number of slaves
2687 /// that responded.
2688 /// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2689 /// which means wait forever).
2690 /// If defined (>= 0) endtype is the message that stops this collection.
2691 
2692 Int_t TProof::Collect(TList *slaves, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2693 {
2694  Int_t rc = 0;
2695 
2696  TMonitor *mon = 0;
2697 
2698  if (fCurrentMonitor == fAllMonitor) {
2699  mon = new TMonitor;
2700  } else {
2701  mon = fAllMonitor;
2702  mon->DeActivateAll();
2703  }
2704  TIter next(slaves);
2705  TSlave *sl;
2706  while ((sl = (TSlave*) next())) {
2707  if (sl->IsValid())
2708  mon->Activate(sl->GetSocket());
2709  }
2710 
2711  rc = Collect(mon, timeout, endtype, deactonfail);
2712  ReleaseMonitor(mon);
2713  return rc;
2714 }
2715 
2716 ////////////////////////////////////////////////////////////////////////////////
2717 /// Collect responses from the slave servers. Returns the number of slaves
2718 /// that responded.
2719 /// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2720 /// which means wait forever).
2721 /// If defined (>= 0) endtype is the message that stops this collection.
2722 
2723 Int_t TProof::Collect(ESlaves list, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2724 {
2725  Int_t rc = 0;
2726  TMonitor *mon = 0;
2727 
2728  if (list == kAll) mon = fAllMonitor;
2729  if (list == kActive) mon = fActiveMonitor;
2730  if (list == kUnique) mon = fUniqueMonitor;
2731  if (list == kAllUnique) mon = fAllUniqueMonitor;
2732  if (fCurrentMonitor == mon) {
2733  // Get a copy
2734  mon = new TMonitor(*mon);
2735  }
2736  mon->ActivateAll();
2737 
2738  rc = Collect(mon, timeout, endtype, deactonfail);
2739  ReleaseMonitor(mon);
2740  return rc;
2741 }
2742 
2743 ////////////////////////////////////////////////////////////////////////////////
2744 /// Collect responses from the slave servers. Returns the number of messages
2745 /// received. Can be 0 if there are no active slaves.
2746 /// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2747 /// which means wait forever).
2748 /// If defined (>= 0) endtype is the message that stops this collection.
2749 /// Collect also stops its execution from time to time to check for new
2750 /// workers in Dynamic Startup mode.
/// 'timeout' is decremented each time Select(1000) returns (TSocket*)-1
/// (presumably a 1 s wait with no ready socket), so it counts idle rounds
/// rather than wall-clock seconds.
/// When called while another collection is running (fCurrentMonitor set), the
/// outer monitor is saved and restored at the end. Messages whose end type
/// belongs to it (CollectInputFrom() returns 2) deactivate the socket there.
/// NOTE(review): source lines 2767, 2829, 2831-2832, 2834, 2878, 2901 and 2954
/// are missing from this listing; the comments below do not cover them.
2751
2752 Int_t TProof::Collect(TMonitor *mon, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2753 {
2754  Int_t collectId = gRandom->Integer(9999);
2755
2756  PDB(kCollect, 3)
2757  Info("Collect", ">>>>>> Entering collect responses #%04d", collectId);
2758
2759  // Reset the status flag and clear the messages in the list, if any
2760  fStatus = 0;
2761  fRecvMessages->Clear();
2762
// Proof.SocketActivityTimeout is read in seconds and converted to milliseconds
2763  Long_t actto = (Long_t)(gEnv->GetValue("Proof.SocketActivityTimeout", -1) * 1000);
2764
2765  if (!mon->GetActive(actto)) return 0;
2766
2768
2769  // Used by external code to know what we are monitoring
2770  TMonitor *savedMonitor = 0;
2771  if (fCurrentMonitor) {
2772  savedMonitor = fCurrentMonitor;
2773  fCurrentMonitor = mon;
2774  } else {
2775  fCurrentMonitor = mon;
2776  fBytesRead = 0;
2777  fRealTime = 0.0;
2778  fCpuTime = 0.0;
2779  }
2780
2781  // We want messages on the main window during synchronous collection,
2782  // but we save the present status to restore it at the end
2783  Bool_t saveRedirLog = fRedirLog;
2784  if (!IsIdle() && !IsSync())
2785  fRedirLog = kFALSE;
2786
2787  int cnt = 0, rc = 0;
2788
2789  // Timeout counter
2790  Long_t nto = timeout;
2791  PDB(kCollect, 2)
2792  Info("Collect","#%04d: active: %d", collectId, mon->GetActive());
2793
2794  // On clients, handle Ctrl-C during collection
2795  if (fIntHandler)
2796  fIntHandler->Add();
2797
2798  // Sockets w/o activity during the last 'sto' millisecs are deactivated
2799  Int_t nact = 0;
2800  Long_t sto = -1;
2801  Int_t nsto = 60;
2802  Int_t pollint = gEnv->GetValue("Proof.DynamicStartupPollInt", (Int_t) kPROOF_DynWrkPollInt_s);
2803  mon->ResetInterrupt();
2804  while ((nact = mon->GetActive(sto)) && (nto < 0 || nto > 0)) {
2805
2806  // Dump last waiting sockets, if in debug mode
2807  PDB(kCollect, 2) {
2808  if (nact < 4) {
2809  TList *al = mon->GetListOfActives();
2810  if (al && al->GetSize() > 0) {
2811  Info("Collect"," %d node(s) still active:", al->GetSize());
2812  TIter nxs(al);
2813  TSocket *xs = 0;
2814  while ((xs = (TSocket *)nxs())) {
2815  TSlave *wrk = FindSlave(xs);
2816  if (wrk)
2817  Info("Collect"," %s (%s)", wrk->GetName(), wrk->GetOrdinal());
2818  else
2819  Info("Collect"," %p: %s:%d", xs, xs->GetInetAddress().GetHostName(),
2820  xs->GetInetAddress().GetPort());
2821  }
2822  }
2823  delete al;
2824  }
2825  }
2826
2827  // Preemptive poll for new workers on the master only in Dynamic Mode and only
2828  // during processing (TODO: should work on Top Master only)
2830  ((fLastPollWorkers_s == -1) || (time(0)-fLastPollWorkers_s >= pollint))) {
2833  fLastPollWorkers_s = time(0);
2835  PDB(kCollect, 1)
2836  Info("Collect","#%04d: now active: %d", collectId, mon->GetActive());
2837  }
2838
2839  // Wait for a ready socket
2840  PDB(kCollect, 3)
2841  Info("Collect", "Will invoke Select() #%04d", collectId);
2842  TSocket *s = mon->Select(1000);
2843
2844  if (s && s != (TSocket *)(-1)) {
2845  // Get and analyse the info it did receive
2846  rc = CollectInputFrom(s, endtype, deactonfail);
2847  if (rc == 1 || (rc == 2 && !savedMonitor)) {
2848  // Deactivate it if we are done with it
2849  mon->DeActivate(s);
2850  TList *al = mon->GetListOfActives();
2851  PDB(kCollect, 2)
2852  Info("Collect","#%04d: deactivating %p (active: %d, %p)", collectId,
2853  s, mon->GetActive(),
2854  al->First());
2855  delete al;
2856  } else if (rc == 2) {
2857  // This end message was for the saved monitor
2858  // Deactivate it if we are done with it
2859  if (savedMonitor) {
2860  savedMonitor->DeActivate(s);
 // NOTE(review): the active list below is taken from 'mon' while the
 // message reports savedMonitor's count; savedMonitor may have been
 // intended — confirm.
2861  TList *al = mon->GetListOfActives();
2862  PDB(kCollect, 2)
2863  Info("Collect","save monitor: deactivating %p (active: %d, %p)",
2864  s, savedMonitor->GetActive(),
2865  al->First());
2866  delete al;
2867  }
2868  }
2869
2870  // Update counter (if no error occurred)
2871  if (rc >= 0)
2872  cnt++;
2873  } else {
2874  // If not timed-out, exit if not stopped or not aborted
2875  // (player exits status is finished in such a case); otherwise,
2876  // we still need to collect the partial output info
2877  if (!s)
2879  mon->DeActivateAll();
2880  // Decrease the timeout counter if requested
2881  if (s == (TSocket *)(-1) && nto > 0)
2882  nto--;
2883  }
2884
2885  // Check if there are workers with ready output to be sent and ask up to 'mxws' of them to send it
2886  if (IsMaster() && fWrksOutputReady && fWrksOutputReady->GetSize() > 0) {
2887  // Maximum number of concurrent sendings
 // (the input-list parameter PROOF_ControlSendOutput, when found, overrides
 // the Proof.ControlSendOutput environment setting)
2888  Int_t mxws = gEnv->GetValue("Proof.ControlSendOutput", 1);
2889  if (TProof::GetParameter(fPlayer->GetInputList(), "PROOF_ControlSendOutput", mxws) != 0)
2890  mxws = gEnv->GetValue("Proof.ControlSendOutput", 1);
2891  TIter nxwr(fWrksOutputReady);
2892  TSlave *wrk = 0;
2893  while (mxws && (wrk = (TSlave *) nxwr())) {
2894  if (!wrk->TestBit(TSlave::kOutputRequested)) {
2895  // Ask worker for output
2896  TMessage sendoutput(kPROOF_SENDOUTPUT);
2897  PDB(kCollect, 2)
2898  Info("Collect", "worker %s was asked to send its output to master",
2899  wrk->GetOrdinal());
 // NOTE(review): elsewhere in this file Send() failures are detected with
 // '== -1'; the '!= 1' test below is true for almost any send and may have
 // been meant as '!= -1' — confirm.
2900  if (wrk->GetSocket()->Send(sendoutput) != 1) {
2902  mxws--;
2903  }
2904  } else {
2905  // Output already requested from this worker: count it against the limit
2906  mxws--;
2907  }
2908  }
2909  }
2910
2911  // Check if we need to check the socket activity (every 60 loop iterations; each Select() waits at most 1000 ms)
2912  sto = -1;
2913  if (--nsto <= 0) {
2914  sto = (Long_t) actto;
2915  nsto = 60;
2916  }
2917
2918  } // end loop over active monitors
2919
2920  // If timed-out, deactivate the remaining sockets
2921  if (nto == 0) {
2922  TList *al = mon->GetListOfActives();
2923  if (al && al->GetSize() > 0) {
2924  // Notify the name of those which did timeout
2925  Info("Collect"," %d node(s) went in timeout:", al->GetSize());
2926  TIter nxs(al);
2927  TSocket *xs = 0;
2928  while ((xs = (TSocket *)nxs())) {
2929  TSlave *wrk = FindSlave(xs);
2930  if (wrk)
2931  Info("Collect"," %s", wrk->GetName());
2932  else
2933  Info("Collect"," %p: %s:%d", xs, xs->GetInetAddress().GetHostName(),
2934  xs->GetInetAddress().GetPort());
2935  }
2936  }
2937  delete al;
2938  mon->DeActivateAll();
2939  }
2940
2941  // Deactivate Ctrl-C special handler
2942  if (fIntHandler)
2943  fIntHandler->Remove();
2944
2945  // make sure group view is up to date
2946  SendGroupView();
2947
2948  // Restore redirection setting
2949  fRedirLog = saveRedirLog;
2950
2951  // Restore the monitor
2952  fCurrentMonitor = savedMonitor;
2953
2955
2956  PDB(kCollect, 3)
2957  Info("Collect", "<<<<<< Exiting collect responses #%04d", collectId);
2958
2959  return cnt;
2960 }
2961 
2962 ////////////////////////////////////////////////////////////////////////////////
2963 /// Asks the PROOF Serv for new workers in Dynamic Startup mode and activates
2964 /// them. Returns the number of new workers found, or <0 on errors.
/// Must run on a master with gProofServ available. The workers reported by
/// gProofServ whose full ordinal ("<master ordinal>.<node ordinal>") is not
/// yet in fSlaves are passed to AddWorkers(); entries already known are deleted.
/// NOTE(review): 'reqWorkers' is allocated before the two early 'return -1'
/// checks and is leaked on those paths.
/// NOTE(review): the loop stops at the first entry that is not a
/// TProofNodeInfo. Any later entries are then neither adopted nor deleted.
/// NOTE(review): the signature line (source line 2966) is missing from this listing.
2965
2967 {
2968  // Requests for worker updates
2969  Int_t dummy = 0;
2970  TList *reqWorkers = new TList();
2971  reqWorkers->SetOwner(kFALSE);
2972
2973  if (!TestBit(TProof::kIsMaster)) {
2974  Error("PollForNewWorkers", "Can't invoke: not on a master -- should not happen!");
2975  return -1;
2976  }
2977  if (!gProofServ) {
2978  Error("PollForNewWorkers", "No ProofServ available -- should not happen!");
2979  return -1;
2980  }
2981
2982  gProofServ->GetWorkers(reqWorkers, dummy, kTRUE); // last 2 are dummy
2983
2984  // List of new workers only (TProofNodeInfo); it owns its entries
2985  TList *newWorkers = new TList();
2986  newWorkers->SetOwner(kTRUE);
2987
2988  TIter next(reqWorkers);
2989  TProofNodeInfo *ni;
2990  TString fullOrd;
2991  while (( ni = dynamic_cast<TProofNodeInfo *>(next()) )) {
2992
2993  // Form the full ordinal
2994  fullOrd.Form("%s.%s", gProofServ->GetOrdinal(), ni->GetOrdinal().Data());
2995
2996  TIter nextInner(fSlaves);
2997  TSlave *sl;
2998  Bool_t found = kFALSE;
2999  while (( sl = dynamic_cast<TSlave *>(nextInner()) )) {
3000  if ( strcmp(sl->GetOrdinal(), fullOrd.Data()) == 0 ) {
3001  found = kTRUE;
3002  break;
3003  }
3004  }
3005
// Known worker: reqWorkers does not own its entries, so delete it here;
// new worker: ownership moves to newWorkers
3006  if (found) delete ni;
3007  else {
3008  newWorkers->Add(ni);
3009  PDB(kGlobal, 1)
3010  Info("PollForNewWorkers", "New worker found: %s:%s",
3011  ni->GetNodeName().Data(), fullOrd.Data());
3012  }
3013  }
3014
3015  delete reqWorkers; // not owner
3016
3017  Int_t nNewWorkers = newWorkers->GetEntries();
3018
3019  // Add the new workers
3020  if (nNewWorkers > 0) {
3021  PDB(kGlobal, 1)
3022  Info("PollForNewWorkers", "Requesting to add %d new worker(s)", newWorkers->GetEntries());
3023  Int_t rv = AddWorkers(newWorkers);
3024  if (rv < 0) {
3025  Error("PollForNewWorkers", "Call to AddWorkers() failed (got %d < 0)", rv);
3026  return -1;
3027  }
3028  // Don't delete newWorkers: AddWorkers() will do that
3029  }
3030  else {
3031  PDB(kGlobal, 2)
3032  Info("PollForNewWorkers", "No new worker found");
3033  delete newWorkers;
3034  }
3035
3036  return nNewWorkers;
3037 }
3038 
3039 ////////////////////////////////////////////////////////////////////////////////
3040 /// Remove links to objects in list 'ol' from gDirectory.
/// Each object of 'ol' is passed to gDirectory->RecursiveRemove(); the
/// objects themselves are not deleted. A null 'ol' is ignored.
/// NOTE(review): the signature line (source line 3042) is missing from this listing.
3041
3043 {
3044  if (ol) {
3045  TIter nxo(ol);
3046  TObject *o = 0;
3047  while ((o = nxo()))
3048  gDirectory->RecursiveRemove(o);
3049  }
3050 }
3051 
3052 ////////////////////////////////////////////////////////////////////////////////
3053 /// Collect and analyze available input from socket s.
3054 /// Returns -1 if any failure occurs. Otherwise returns the code from HandleInputMessage()
/// (0 on success, 1 if this was the last message expected from this socket).
/// A 1 is turned into 2 when 'endtype' >= 0 and the message type differs from
/// 'endtype', i.e. the end message belongs to the enclosing (saved) monitor of
/// a nested Collect().
/// A receive error marks the socket bad, unless it is a broken connection
/// (Recv() == -5) that Reconnect() manages to restore.
/// NOTE(review): source lines 3056 (signature), 3067 and 3069 are missing
/// from this listing.
3055
3057 {
3058  TMessage *mess;
3059
3060  Int_t recvrc = 0;
3061  if ((recvrc = s->Recv(mess)) < 0) {
3062  PDB(kCollect,2)
3063  Info("CollectInputFrom","%p: got %d from Recv()", s, recvrc);
3064  Bool_t bad = kTRUE;
3065  if (recvrc == -5) {
3066  // Broken connection: try reconnection
3068  if (s->Reconnect() == 0) {
3070  bad = kFALSE;
3071  }
3072  }
3073  if (bad)
3074  MarkBad(s, "problems receiving a message in TProof::CollectInputFrom(...)");
3075  // Ignore this wake up
3076  return -1;
3077  }
3078  if (!mess) {
3079  // we get here in case the remote server died
3080  MarkBad(s, "undefined message in TProof::CollectInputFrom(...)");
3081  return -1;
3082  }
3083  Int_t rc = 0;
3084
// Read the type before handling: HandleInputMessage() tracks a delete_mess
// flag and presumably may delete 'mess' — confirm
3085  Int_t what = mess->What();
3086  TSlave *sl = FindSlave(s);
3087  rc = HandleInputMessage(sl, mess, deactonfail);
3088  if (rc == 1 && (endtype >= 0) && (what != endtype))
3089  // This message was for the base monitor in recursive case
3090  rc = 2;
3091
3092  // We are done successfully
3093  return rc;
3094 }
3095 
3096 ////////////////////////////////////////////////////////////////////////////////
3097 /// Analyze the received message.
3098 /// Returns 0 on success (1 if this the last message from this socket), -1 if
3099 /// any failure occurs.
3100 
3102 {
3103  char str[512];
3104  TObject *obj;
3105  Int_t rc = 0;
3106 
3107  if (!mess || !sl) {
3108  Warning("HandleInputMessage", "given an empty message or undefined worker");
3109  return -1;
3110  }
3111  Bool_t delete_mess = kTRUE;
3112  TSocket *s = sl->GetSocket();
3113  if (!s) {
3114  Warning("HandleInputMessage", "worker socket is undefined");
3115  return -1;
3116  }
3117 
3118  // The message type
3119  Int_t what = mess->What();
3120 
3121  PDB(kCollect,3)
3122  Info("HandleInputMessage", "got type %d from '%s'", what, sl->GetOrdinal());
3123 
3124  switch (what) {
3125 
3126  case kMESS_OK:
3127  // Add the message to the list
3128  fRecvMessages->Add(mess);
3129  delete_mess = kFALSE;
3130  break;
3131 
3132  case kMESS_OBJECT:
3133  if (fPlayer) fPlayer->HandleRecvHisto(mess);
3134  break;
3135 
3136  case kPROOF_FATAL:
3137  { TString msg;
3138  if ((mess->BufferSize() > mess->Length()))
3139  (*mess) >> msg;
3140  if (msg.IsNull()) {
3141  MarkBad(s, "received kPROOF_FATAL");
3142  } else {
3143  MarkBad(s, msg);
3144  }
3145  }
3146  if (fProgressDialogStarted) {
3147  // Finalize the progress dialog
3148  Emit("StopProcess(Bool_t)", kTRUE);
3149  }
3150  break;
3151 
3152  case kPROOF_STOP:
3153  // Stop collection from this worker
3154  Info("HandleInputMessage", "received kPROOF_STOP from %s: disabling any further collection this worker",
3155  sl->GetOrdinal());
3156  rc = 1;
3157  break;
3158 
3159  case kPROOF_GETTREEHEADER:
3160  // Add the message to the list
3161  fRecvMessages->Add(mess);
3162  delete_mess = kFALSE;
3163  rc = 1;
3164  break;
3165 
3166  case kPROOF_TOUCH:
3167  // send a request for touching the remote admin file
3168  {
3169  sl->Touch();
3170  }
3171  break;
3172 
3173  case kPROOF_GETOBJECT:
3174  // send slave object it asks for
3175  mess->ReadString(str, sizeof(str));
3176  obj = gDirectory->Get(str);
3177  if (obj)
3178  s->SendObject(obj);
3179  else
3180  s->Send(kMESS_NOTOK);
3181  break;
3182 
3183  case kPROOF_GETPACKET:
3184  {
3185  PDB(kGlobal,2)
3186  Info("HandleInputMessage","%s: kPROOF_GETPACKET", sl->GetOrdinal());
3187  TDSetElement *elem = 0;
3188  elem = fPlayer ? fPlayer->GetNextPacket(sl, mess) : 0;
3189 
3190  if (elem != (TDSetElement*) -1) {
3191  TMessage answ(kPROOF_GETPACKET);
3192  answ << elem;
3193  s->Send(answ);
3194 
3195  while (fWaitingSlaves != 0 && fWaitingSlaves->GetSize()) {
3196  TPair *p = (TPair*) fWaitingSlaves->First();
3197  s = (TSocket*) p->Key();
3198  TMessage *m = (TMessage*) p->Value();
3199 
3200  elem = fPlayer ? fPlayer->GetNextPacket(sl, m) : 0;
3201  if (elem != (TDSetElement*) -1) {
3203  a << elem;
3204  s->Send(a);
3205  // remove has to happen via Links because TPair does not have
3206  // a Compare() function and therefore RemoveFirst() and
3207  // Remove(TObject*) do not work
3209  delete p;
3210  delete m;
3211  } else {
3212  break;
3213  }
3214  }
3215  } else {
3216  if (fWaitingSlaves == 0) fWaitingSlaves = new TList;
3217  fWaitingSlaves->Add(new TPair(s, mess));
3218  delete_mess = kFALSE;
3219  }
3220  }
3221  break;
3222 
3223  case kPROOF_LOGFILE:
3224  {
3225  Int_t size;
3226  (*mess) >> size;
3227  PDB(kGlobal,2)
3228  Info("HandleInputMessage","%s: kPROOF_LOGFILE: size: %d", sl->GetOrdinal(), size);
3229  RecvLogFile(s, size);
3230  }
3231  break;
3232 
3233  case kPROOF_LOGDONE:
3234  (*mess) >> sl->fStatus >> sl->fParallel;
3235  PDB(kCollect,2)
3236  Info("HandleInputMessage","%s: kPROOF_LOGDONE: status %d parallel %d",
3237  sl->GetOrdinal(), sl->fStatus, sl->fParallel);
3238  if (sl->fStatus != 0) {
3239  // Return last nonzero status
3240  fStatus = sl->fStatus;
3241  // Deactivate the worker, if required
3242  if (deactonfail) DeactivateWorker(sl->fOrdinal);
3243  }
3244  // Remove from the workers-ready list
3247  fWrksOutputReady->Remove(sl);
3248  }
3249  rc = 1;
3250  break;
3251 
3252  case kPROOF_GETSTATS:
3253  {
3254  (*mess) >> sl->fBytesRead >> sl->fRealTime >> sl->fCpuTime
3255  >> sl->fWorkDir >> sl->fProofWorkDir;
3256  PDB(kCollect,2)
3257  Info("HandleInputMessage", "kPROOF_GETSTATS: %s", sl->fWorkDir.Data());
3258  TString img;
3259  if ((mess->BufferSize() > mess->Length()))
3260  (*mess) >> img;
3261  // Set image
3262  if (img.IsNull()) {
3263  if (sl->fImage.IsNull())
3264  sl->fImage.Form("%s:%s", TUrl(sl->fName).GetHostFQDN(),
3265  sl->fProofWorkDir.Data());
3266  } else {
3267  sl->fImage = img;
3268  }
3269  PDB(kGlobal,2)
3270  Info("HandleInputMessage",
3271  "kPROOF_GETSTATS:%s image: %s", sl->GetOrdinal(), sl->GetImage());
3272 
3273  fBytesRead += sl->fBytesRead;
3274  fRealTime += sl->fRealTime;
3275  fCpuTime += sl->fCpuTime;
3276  rc = 1;
3277  }
3278  break;
3279 
3280  case kPROOF_GETPARALLEL:
3281  {
3282  Bool_t async = kFALSE;
3283  (*mess) >> sl->fParallel;
3284  if ((mess->BufferSize() > mess->Length()))
3285  (*mess) >> async;
3286  rc = (async) ? 0 : 1;
3287  }
3288  break;
3289 
3290  case kPROOF_CHECKFILE:
3291  { // New servers (>= 5.22) send the status
3292  if ((mess->BufferSize() > mess->Length())) {
3293  (*mess) >> fCheckFileStatus;
3294  } else {
3295  // For old servers this meant success (failure was signaled with the
3296  // dangerous kPROOF_FATAL)
3297  fCheckFileStatus = 1;
3298  }
3299  rc = 1;
3300  }
3301  break;
3302 
3303  case kPROOF_SENDFILE:
3304  { // New server: signals ending of sendfile operation
3305  rc = 1;
3306  }
3307  break;
3308 
3309  case kPROOF_PACKAGE_LIST:
3310  {
3311  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_PACKAGE_LIST: enter");
3312  Int_t type = 0;
3313  (*mess) >> type;
3314  switch (type) {
3318  if (fEnabledPackages) {
3320  } else {
3321  Error("HandleInputMessage",
3322  "kPROOF_PACKAGE_LIST: kListEnabledPackages: TList not found in message!");
3323  }
3324  break;
3325  case TProof::kListPackages:
3328  if (fAvailablePackages) {
3330  } else {
3331  Error("HandleInputMessage",
3332  "kPROOF_PACKAGE_LIST: kListPackages: TList not found in message!");
3333  }
3334  break;
3335  default:
3336  Error("HandleInputMessage", "kPROOF_PACKAGE_LIST: unknown type: %d", type);
3337  }
3338  }
3339  break;
3340 
3341  case kPROOF_SENDOUTPUT:
3342  {
3343  // We start measuring the merging time
3344  fPlayer->SetMerging();
3345 
3346  // Worker is ready to send output: make sure the relevant bit is reset
3348  PDB(kGlobal,2)
3349  Info("HandleInputMessage","kPROOF_SENDOUTPUT: enter (%s)", sl->GetOrdinal());
3350  // Create the list if not yet done
3351  if (!fWrksOutputReady) {
3352  fWrksOutputReady = new TList;
3354  }
3355  fWrksOutputReady->Add(sl);
3356  }
3357  break;
3358 
3359  case kPROOF_OUTPUTOBJECT:
3360  {
3361  // We start measuring the merging time
3362  fPlayer->SetMerging();
3363 
3364  PDB(kGlobal,2)
3365  Info("HandleInputMessage","kPROOF_OUTPUTOBJECT: enter");
3366  Int_t type = 0;
3367  const char *prefix = gProofServ ? gProofServ->GetPrefix() : "Lite-0";
3369  Info("HandleInputMessage", "finalization on %s started ...", prefix);
3371  }
3372 
3373  while ((mess->BufferSize() > mess->Length())) {
3374  (*mess) >> type;
3375  // If a query result header, add it to the player list
3376  if (fPlayer) {
3377  if (type == 0) {
3378  // Retrieve query result instance (output list not filled)
3379  TQueryResult *pq =
3381  if (pq) {
3382  // Add query to the result list in TProofPlayer
3383  fPlayer->AddQueryResult(pq);
3384  fPlayer->SetCurrentQuery(pq);
3385  // And clear the output list, as we start merging a new set of results
3386  if (fPlayer->GetOutputList())
3387  fPlayer->GetOutputList()->Clear();
3388  // Add the unique query tag as TNamed object to the input list
3389  // so that it is available in TSelectors for monitoring
3390  TString qid = TString::Format("%s:%s",pq->GetTitle(),pq->GetName());
3391  if (fPlayer->GetInputList()->FindObject("PROOF_QueryTag"))
3392  fPlayer->GetInputList()->Remove(fPlayer->GetInputList()->FindObject("PROOF_QueryTag"));
3393  fPlayer->AddInput(new TNamed("PROOF_QueryTag", qid.Data()));
3394  } else {
3395  Warning("HandleInputMessage","kPROOF_OUTPUTOBJECT: query result missing");
3396  }
3397  } else if (type > 0) {
3398  // Read object
3399  TObject *o = mess->ReadObject(TObject::Class());
3400  // Increment counter on the client side
3402  TString msg;
3403  Bool_t changed = kFALSE;
3404  msg.Form("%s: merging output objects ... %s", prefix, fMergePrg.Export(changed));
3405  if (gProofServ) {
3407  } else if (IsTty() || changed) {
3408  fprintf(stderr, "%s\r", msg.Data());
3409  }
3410  // Add or merge it
3411  if ((fPlayer->AddOutputObject(o) == 1)) {
3412  // Remove the object if it has been merged
3413  SafeDelete(o);
3414  }
3415  if (type > 1) {
3416  // Update the merger progress info
3418  if (TestBit(TProof::kIsClient) && !IsLite()) {
3419  // In PROOFLite this has to be done once only in TProofLite::Process
3421  if (pq) {
3423  // Add input objects (do not override remote settings, if any)
3424  TObject *xo = 0;
3425  TIter nxin(fPlayer->GetInputList());
3426  // Servers prior to 5.28/00 do not create the input list in the TQueryResult
3427  if (!pq->GetInputList()) pq->SetInputList(new TList());
3428  while ((xo = nxin()))
3429  if (!pq->GetInputList()->FindObject(xo->GetName()))
3430  pq->AddInput(xo->Clone());
3431  // If the last object, notify the GUI that the result arrived
3432  QueryResultReady(TString::Format("%s:%s", pq->GetTitle(), pq->GetName()));
3433  }
3434  // Processing is over
3435  UpdateDialog();
3436  }
3437  }
3438  }
3439  } else {
3440  Warning("HandleInputMessage", "kPROOF_OUTPUTOBJECT: player undefined!");
3441  }
3442  }
3443  }
3444  break;
3445 
3446  case kPROOF_OUTPUTLIST:
3447  {
3448  // We start measuring the merging time
3449 
3450  PDB(kGlobal,2)
3451  Info("HandleInputMessage","%s: kPROOF_OUTPUTLIST: enter", sl->GetOrdinal());
3452  TList *out = 0;
3453  if (fPlayer) {
3454  fPlayer->SetMerging();
3455  if (TestBit(TProof::kIsMaster) || fProtocol < 7) {
3456  out = (TList *) mess->ReadObject(TList::Class());
3457  } else {
3458  TQueryResult *pq =
3460  if (pq) {
3461  // Add query to the result list in TProofPlayer
3462  fPlayer->AddQueryResult(pq);
3463  fPlayer->SetCurrentQuery(pq);
3464  // To avoid accidental cleanups from anywhere else
3465  // remove objects from gDirectory and clone the list
3466  out = pq->GetOutputList();
3467  CleanGDirectory(out);
3468  out = (TList *) out->Clone();
3469  // Notify the GUI that the result arrived
3470  QueryResultReady(TString::Format("%s:%s", pq->GetTitle(), pq->GetName()));
3471  } else {
3472  PDB(kGlobal,2)
3473  Info("HandleInputMessage",
3474  "%s: kPROOF_OUTPUTLIST: query result missing", sl->GetOrdinal());
3475  }
3476  }
3477  if (out) {
3478  out->SetOwner();
3479  fPlayer->AddOutput(out); // Incorporate the list
3480  SafeDelete(out);
3481  } else {
3482  PDB(kGlobal,2)
3483  Info("HandleInputMessage",
3484  "%s: kPROOF_OUTPUTLIST: outputlist is empty", sl->GetOrdinal());
3485  }
3486  } else {
3487  Warning("HandleInputMessage",
3488  "%s: kPROOF_OUTPUTLIST: player undefined!", sl->GetOrdinal());
3489  }
3490  // On clients at this point processing is over
3491  if (TestBit(TProof::kIsClient) && !IsLite())
3492  UpdateDialog();
3493  }
3494  break;
3495 
3496  case kPROOF_QUERYLIST:
3497  {
3498  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_QUERYLIST: enter");
3499  (*mess) >> fOtherQueries >> fDrawQueries;
3500  if (fQueries) {
3501  fQueries->Delete();
3502  delete fQueries;
3503  fQueries = 0;
3504  }
3505  fQueries = (TList *) mess->ReadObject(TList::Class());
3506  }
3507  break;
3508 
3509  case kPROOF_RETRIEVE:
3510  {
3511  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_RETRIEVE: enter");
3512  TQueryResult *pq =
3514  if (pq && fPlayer) {
3515  fPlayer->AddQueryResult(pq);
3516  // Notify the GUI that the result arrived
3517  QueryResultReady(TString::Format("%s:%s", pq->GetTitle(), pq->GetName()));
3518  } else {
3519  PDB(kGlobal,2)
3520  Info("HandleInputMessage",
3521  "kPROOF_RETRIEVE: query result missing or player undefined");
3522  }
3523  }
3524  break;
3525 
3526  case kPROOF_MAXQUERIES:
3527  {
3528  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_MAXQUERIES: enter");
3529  Int_t max = 0;
3530 
3531  (*mess) >> max;
3532  Printf("Number of queries fully kept remotely: %d", max);
3533  }
3534  break;
3535 
3536  case kPROOF_SERVERSTARTED:
3537  {
3538  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_SERVERSTARTED: enter");
3539 
3540  UInt_t tot = 0, done = 0;
3541  TString action;
3542  Bool_t st = kTRUE;
3543 
3544  (*mess) >> action >> tot >> done >> st;
3545 
3546  if (TestBit(TProof::kIsClient)) {
3547  if (tot) {
3548  TString type = (action.Contains("submas")) ? "submasters"
3549  : "workers";
3550  Int_t frac = (Int_t) (done*100.)/tot;
3551  char msg[512] = {0};
3552  if (frac >= 100) {
3553  snprintf(msg, 512, "%s: OK (%d %s) \n",
3554  action.Data(),tot, type.Data());
3555  } else {
3556  snprintf(msg, 512, "%s: %d out of %d (%d %%)\r",
3557  action.Data(), done, tot, frac);
3558  }
3559  if (fSync)
3560  fprintf(stderr,"%s", msg);
3561  else
3562  NotifyLogMsg(msg, 0);
3563  }
3564  // Notify GUIs
3565  StartupMessage(action.Data(), st, (Int_t)done, (Int_t)tot);
3566  } else {
3567 
3568  // Just send the message one level up
3570  m << action << tot << done << st;
3571  gProofServ->GetSocket()->Send(m);
3572  }
3573  }
3574  break;
3575 
3576  case kPROOF_DATASET_STATUS:
3577  {
3578  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_DATASET_STATUS: enter");
3579 
3580  UInt_t tot = 0, done = 0;
3581  TString action;
3582  Bool_t st = kTRUE;
3583 
3584  (*mess) >> action >> tot >> done >> st;
3585 
3586  if (TestBit(TProof::kIsClient)) {
3587  if (tot) {
3588  TString type = "files";
3589  Int_t frac = (Int_t) (done*100.)/tot;
3590  char msg[512] = {0};
3591  if (frac >= 100) {
3592  snprintf(msg, 512, "%s: OK (%d %s) \n",
3593  action.Data(),tot, type.Data());
3594  } else {
3595  snprintf(msg, 512, "%s: %d out of %d (%d %%)\r",
3596  action.Data(), done, tot, frac);
3597  }
3598  if (fSync)
3599  fprintf(stderr,"%s", msg);
3600  else
3601  NotifyLogMsg(msg, 0);
3602  }
3603  // Notify GUIs
3604  DataSetStatus(action.Data(), st, (Int_t)done, (Int_t)tot);
3605  } else {
3606 
3607  // Just send the message one level up
3609  m << action << tot << done << st;
3610  gProofServ->GetSocket()->Send(m);
3611  }
3612  }
3613  break;
3614 
3615  case kPROOF_STARTPROCESS:
3616  {
3617  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_STARTPROCESS: enter");
3618 
3619  // For Proof-Lite this variable is the number of workers and is set
3620  // by the player
3621  if (!IsLite()) {
3622  fNotIdle = 1;
3623  fIsWaiting = kFALSE;
3624  }
3625 
3626  // Redirect the output, if needed
3627  fRedirLog = (fSync) ? fRedirLog : kTRUE;
3628 
3629  // The signal is used on masters by XrdProofdProtocol to catch
3630  // the start of processing; on clients it allows to update the
3631  // progress dialog
3632  if (!TestBit(TProof::kIsMaster)) {
3633 
3634  // This is the end of preparation
3635  fQuerySTW.Stop();
3637  PDB(kGlobal,2) Info("HandleInputMessage","Preparation time: %f s", fPrepTime);
3638 
3639  TString selec;
3640  Int_t dsz = -1;
3641  Long64_t first = -1, nent = -1;
3642  (*mess) >> selec >> dsz >> first >> nent;
3643  // Start or reset the progress dialog
3644  if (!gROOT->IsBatch()) {
3645  if (fProgressDialog &&
3647  if (!fProgressDialogStarted) {
3648  fProgressDialog->ExecPlugin(5, this,
3649  selec.Data(), dsz, first, nent);
3651  } else {
3652  ResetProgressDialog(selec, dsz, first, nent);
3653  }
3654  }
3656  }
3657  }
3658  }
3659  break;
3660 
3661  case kPROOF_ENDINIT:
3662  {
3663  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_ENDINIT: enter");
3664 
3665  if (TestBit(TProof::kIsMaster)) {
3666  if (fPlayer)
3667  fPlayer->SetInitTime();
3668  }
3669  }
3670  break;
3671 
3672  case kPROOF_SETIDLE:
3673  {
3674  PDB(kGlobal,2)
3675  Info("HandleInputMessage","kPROOF_SETIDLE from '%s': enter (%d)", sl->GetOrdinal(), fNotIdle);
3676 
3677  // The session is idle
3678  if (IsLite()) {
3679  if (fNotIdle > 0) {
3680  fNotIdle--;
3681  PDB(kGlobal,2)
3682  Info("HandleInputMessage", "%s: got kPROOF_SETIDLE", sl->GetOrdinal());
3683  } else {
3684  Warning("HandleInputMessage",
3685  "%s: got kPROOF_SETIDLE but no running workers ! protocol error?",
3686  sl->GetOrdinal());
3687  }
3688  } else {
3689  fNotIdle = 0;
3690  // Check if the query has been enqueued
3691  if ((mess->BufferSize() > mess->Length()))
3692  (*mess) >> fIsWaiting;
3693  }
3694  }
3695  break;
3696 
3697  case kPROOF_QUERYSUBMITTED:
3698  {
3699  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_QUERYSUBMITTED: enter");
3700 
3701  // We have received the sequential number
3702  (*mess) >> fSeqNum;
3703  Bool_t sync = fSync;
3704  if ((mess->BufferSize() > mess->Length()))
3705  (*mess) >> sync;
3706  if (sync != fSync && fSync) {
3707  // The server required to switch to asynchronous mode
3708  Activate();
3709  fSync = kFALSE;
3710  }
3711  DisableGoAsyn();
3712  // Check if the query has been enqueued
3713  fIsWaiting = kTRUE;
3714  // For Proof-Lite this variable is the number of workers and is set by the player
3715  if (!IsLite())
3716  fNotIdle = 1;
3717 
3718  rc = 1;
3719  }
3720  break;
3721 
3722  case kPROOF_SESSIONTAG:
3723  {
3724  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_SESSIONTAG: enter");
3725 
3726  // We have received the unique tag and save it as name of this object
3727  TString stag;
3728  (*mess) >> stag;
3729  SetName(stag);
3730  // In the TSlave object
3731  sl->SetSessionTag(stag);
3732  // Server may have also sent the group
3733  if ((mess->BufferSize() > mess->Length()))
3734  (*mess) >> fGroup;
3735  // Server may have also sent the user
3736  if ((mess->BufferSize() > mess->Length())) {
3737  TString usr;
3738  (*mess) >> usr;
3739  if (!usr.IsNull()) fUrl.SetUser(usr.Data());
3740  }
3741  }
3742  break;
3743 
3744  case kPROOF_FEEDBACK:
3745  {
3746  PDB(kGlobal,2)
3747  Info("HandleInputMessage","kPROOF_FEEDBACK: enter");
3748  TList *out = (TList *) mess->ReadObject(TList::Class());
3749  out->SetOwner();
3750  if (fPlayer)
3751  fPlayer->StoreFeedback(sl, out); // Adopts the list
3752  else
3753  // Not yet ready: stop collect asap
3754  rc = 1;
3755  }
3756  break;
3757 
3758  case kPROOF_AUTOBIN:
3759  {
3760  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_AUTOBIN: enter");
3761 
3762  TString name;
3763  Double_t xmin, xmax, ymin, ymax, zmin, zmax;
3764 
3765  (*mess) >> name >> xmin >> xmax >> ymin >> ymax >> zmin >> zmax;
3766 
3767  if (fPlayer) fPlayer->UpdateAutoBin(name,xmin,xmax,ymin,ymax,zmin,zmax);
3768 
3769  TMessage answ(kPROOF_AUTOBIN);
3770 
3771  answ << name << xmin << xmax << ymin << ymax << zmin << zmax;
3772 
3773  s->Send(answ);
3774  }
3775  break;
3776 
3777  case kPROOF_PROGRESS:
3778  {
3779  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_PROGRESS: enter");
3780 
3781  if (GetRemoteProtocol() > 25) {
3782  // New format
3783  TProofProgressInfo *pi = 0;
3784  (*mess) >> pi;
3785  fPlayer->Progress(sl,pi);
3786  } else if (GetRemoteProtocol() > 11) {
3787  Long64_t total, processed, bytesread;
3788  Float_t initTime, procTime, evtrti, mbrti;
3789  (*mess) >> total >> processed >> bytesread
3790  >> initTime >> procTime
3791  >> evtrti >> mbrti;
3792  if (fPlayer)
3793  fPlayer->Progress(sl, total, processed, bytesread,
3794  initTime, procTime, evtrti, mbrti);
3795 
3796  } else {
3797  // Old format
3798  Long64_t total, processed;
3799  (*mess) >> total >> processed;
3800  if (fPlayer)
3801  fPlayer->Progress(sl, total, processed);
3802  }
3803  }
3804  break;
3805 
3806  case kPROOF_STOPPROCESS:
3807  {
3808  // This message is sent from a worker that finished processing.
3809  // We determine whether it was asked to finish by the
3810  // packetizer or stopped during processing a packet
3811  // (by TProof::RemoveWorkers() or by an external signal).
3812  // In the latter case call packetizer->MarkBad.
3813  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_STOPPROCESS: enter");
3814 
3815  Long64_t events = 0;
3816  Bool_t abort = kFALSE;
3817  TProofProgressStatus *status = 0;
3818 
3819  if ((mess->BufferSize() > mess->Length()) && (fProtocol > 18)) {
3820  (*mess) >> status >> abort;
3821  } else if ((mess->BufferSize() > mess->Length()) && (fProtocol > 8)) {
3822  (*mess) >> events >> abort;
3823  } else {
3824  (*mess) >> events;
3825  }
3826  if (fPlayer) {
3827  if (fProtocol > 18) {
3828  TList *listOfMissingFiles = 0;
3829  if (!(listOfMissingFiles = (TList *)GetOutput("MissingFiles"))) {
3830  listOfMissingFiles = new TList();
3831  listOfMissingFiles->SetName("MissingFiles");
3832  if (fPlayer)
3833  fPlayer->AddOutputObject(listOfMissingFiles);
3834  }
3835  if (fPlayer->GetPacketizer()) {
3836  Int_t ret =
3837  fPlayer->GetPacketizer()->AddProcessed(sl, status, 0, &listOfMissingFiles);
3838  if (ret > 0)
3839  fPlayer->GetPacketizer()->MarkBad(sl, status, &listOfMissingFiles);
3840  // This object is now owned by the packetizer
3841  status = 0;
3842  }
3843  if (status) fPlayer->AddEventsProcessed(status->GetEntries());
3844  } else {
3845  fPlayer->AddEventsProcessed(events);
3846  }
3847  }
3848  SafeDelete(status);
3849  if (!TestBit(TProof::kIsMaster))
3850  Emit("StopProcess(Bool_t)", abort);
3851  break;
3852  }
3853 
3854  case kPROOF_SUBMERGER:
3855  {
3856  PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_SUBMERGER: enter");
3857  HandleSubmerger(mess, sl);
3858  }
3859  break;
3860 
3861  case kPROOF_GETSLAVEINFO:
3862  {
3863  PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_GETSLAVEINFO: enter");
3864 
3865  Bool_t active = (GetListOfActiveSlaves()->FindObject(sl) != 0);
3866  Bool_t bad = (GetListOfBadSlaves()->FindObject(sl) != 0);
3867  TList* tmpinfo = 0;
3868  (*mess) >> tmpinfo;
3869  if (tmpinfo == 0) {
3870  Error("HandleInputMessage", "kPROOF_GETSLAVEINFO: no list received!");
3871  } else {
3872  tmpinfo->SetOwner(kFALSE);
3873  Int_t nentries = tmpinfo->GetSize();
3874  for (Int_t i=0; i<nentries; i++) {
3875  TSlaveInfo* slinfo =
3876  dynamic_cast<TSlaveInfo*>(tmpinfo->At(i));
3877  if (slinfo) {
3878  // If PROOF-Lite
3879  if (IsLite()) slinfo->fHostName = gSystem->HostName();
3880  // Check if we have already a instance for this worker
3881  TIter nxw(fSlaveInfo);
3882  TSlaveInfo *ourwi = 0;
3883  while ((ourwi = (TSlaveInfo *)nxw())) {
3884  if (!strcmp(ourwi->GetOrdinal(), slinfo->GetOrdinal())) {
3885  ourwi->SetSysInfo(slinfo->GetSysInfo());
3886  ourwi->fHostName = slinfo->GetName();
3887  if (slinfo->GetDataDir() && (strlen(slinfo->GetDataDir()) > 0))
3888  ourwi->fDataDir = slinfo->GetDataDir();
3889  break;
3890  }
3891  }
3892  if (!ourwi) {
3893  fSlaveInfo->Add(slinfo);
3894  } else {
3895  slinfo = ourwi;
3896  }
3897  if (slinfo->fStatus != TSlaveInfo::kBad) {
3898  if (!active) slinfo->SetStatus(TSlaveInfo::kNotActive);
3899  if (bad) slinfo->SetStatus(TSlaveInfo::kBad);
3900  }
3901  if (sl->GetMsd() && (strlen(sl->GetMsd()) > 0))
3902  slinfo->fMsd = sl->GetMsd();
3903  }
3904  }
3905  delete tmpinfo;
3906  rc = 1;
3907  }
3908  }
3909  break;
3910 
3911  case kPROOF_VALIDATE_DSET:
3912  {
3913  PDB(kGlobal,2)
3914  Info("HandleInputMessage", "kPROOF_VALIDATE_DSET: enter");
3915  TDSet* dset = 0;
3916  (*mess) >> dset;
3917  if (!fDSet)
3918  Error("HandleInputMessage", "kPROOF_VALIDATE_DSET: fDSet not set");
3919  else
3920  fDSet->Validate(dset);
3921  delete dset;
3922  }
3923  break;
3924 
3925  case kPROOF_DATA_READY:
3926  {
3927  PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_DATA_READY: enter");
3928  Bool_t dataready = kFALSE;
3929  Long64_t totalbytes, bytesready;
3930  (*mess) >> dataready >> totalbytes >> bytesready;
3931  fTotalBytes += totalbytes;
3932  fBytesReady += bytesready;
3933  if (dataready == kFALSE) fDataReady = dataready;
3934  }
3935  break;
3936 
3937  case kPROOF_PING:
3938  // do nothing (ping is already acknowledged)
3939  break;
3940 
3941  case kPROOF_MESSAGE:
3942  {
3943  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_MESSAGE: enter");
3944 
3945  // We have received a text message to be shown locally or forwarded one level up
3946  TString msg;
3947  (*mess) >> msg;
3948  Bool_t lfeed = kTRUE;
3949  if ((mess->BufferSize() > mess->Length()))
3950  (*mess) >> lfeed;
3951 
3952  if (TestBit(TProof::kIsClient)) {
3953 
3954  if (fSync) {
3955  // Notify locally
3956  fprintf(stderr,"%s%c", msg.Data(), (lfeed ? '\n' : '\r'));
3957  } else {
3958  // Notify locally taking care of redirection, windows logs, ...
3959  NotifyLogMsg(msg, (lfeed ? "\n" : "\r"));
3960  }
3961  } else {
3962 
3963  // The message is logged for debugging purposes.
3964  fprintf(stderr,"%s%c", msg.Data(), (lfeed ? '\n' : '\r'));
3965  if (gProofServ) {
3966  // We hide it during normal operations
3968 
3969  // And send the message one level up
3970  gProofServ->SendAsynMessage(msg, lfeed);
3971  }
3972  }
3973  }
3974  break;
3975 
3976  case kPROOF_VERSARCHCOMP:
3977  {
3978  TString vac;
3979  (*mess) >> vac;
3980  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_VERSARCHCOMP: %s", vac.Data());
3981  Int_t from = 0;
3982  TString vers, archcomp;
3983  if (vac.Tokenize(vers, from, "|"))
3984  vac.Tokenize(archcomp, from, "|");
3985  sl->SetArchCompiler(archcomp);
3986  vers.ReplaceAll(":","|");
3987  sl->SetROOTVersion(vers);
3988  }
3989  break;
3990 
3991  default:
3992  {
3993  Error("HandleInputMessage", "unknown command received from '%s' (what = %d)",
3994  sl->GetOrdinal(), what);
3995  }
3996  break;
3997  }
3998 
3999  // Cleanup
4000  if (delete_mess)
4001  delete mess;
4002 
4003  // We are done successfully
4004  return rc;
4005 }
4006 
4007 ////////////////////////////////////////////////////////////////////////////////
4008 /// Process a message of type kPROOF_SUBMERGER
4009 
4011 {
4012  // Message sub-type
4013  Int_t type = 0;
4014  (*mess) >> type;
4015  TSocket *s = sl->GetSocket();
4016 
4017  switch (type) {
4018  case kOutputSent:
4019  {
4020  if (IsEndMaster()) {
4021  Int_t merger_id = -1;
4022  (*mess) >> merger_id;
4023 
4024  PDB(kSubmerger, 2)
4025  Info("HandleSubmerger", "kOutputSent: Worker %s:%d:%s had sent its output to merger #%d",
4026  sl->GetName(), sl->GetPort(), sl->GetOrdinal(), merger_id);
4027 
4028  if (!fMergers || fMergers->GetSize() <= merger_id) {
4029  Error("HandleSubmerger", "kOutputSize: #%d not in list ", merger_id);
4030  break;
4031  }
4032  TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
4033  mi->SetMergedWorker();
4034  if (mi->AreAllWorkersMerged()) {
4035  mi->Deactivate();
4036  if (GetActiveMergersCount() == 0) {
4037  fMergers->Clear();
4038  delete fMergers;
4039  fMergersSet = kFALSE;
4040  fMergersCount = -1;
4041  fLastAssignedMerger = 0;
4042  PDB(kSubmerger, 2) Info("HandleSubmerger", "all mergers removed ... ");
4043  }
4044  }
4045  } else {
4046  PDB(kSubmerger, 2) Error("HandleSubmerger","kOutputSent: received not on endmaster!");
4047  }
4048  }
4049  break;
4050 
4051  case kMergerDown:
4052  {
4053  Int_t merger_id = -1;
4054  (*mess) >> merger_id;
4055 
4056  PDB(kSubmerger, 2) Info("HandleSubmerger", "kMergerDown: #%d ", merger_id);
4057 
4058  if (!fMergers || fMergers->GetSize() <= merger_id) {
4059  Error("HandleSubmerger", "kMergerDown: #%d not in list ", merger_id);
4060  break;
4061  }
4062 
4063  TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
4064  if (!mi->IsActive()) {
4065  break;
4066  } else {
4067  mi->Deactivate();
4068  }
4069 
4070  // Stop the invalid merger in the case it is still listening
4071  TMessage stop(kPROOF_SUBMERGER);
4072  stop << Int_t(kStopMerging);
4073  stop << 0;
4074  s->Send(stop);
4075 
4076  // Ask for results from merger (only original results from this node as worker are returned)
4077  AskForOutput(mi->GetMerger());
4078 
4079  // Ask for results from all workers assigned to this merger
4080  TIter nxo(mi->GetWorkers());
4081  TObject * o = 0;
4082  while ((o = nxo())) {
4083  AskForOutput((TSlave *)o);
4084  }
4085  PDB(kSubmerger, 2) Info("HandleSubmerger", "kMergerDown:%d: exit", merger_id);
4086  }
4087  break;
4088 
4089  case kOutputSize:
4090  {
4091  if (IsEndMaster()) {
4092  PDB(kSubmerger, 2)
4093  Info("HandleSubmerger", "worker %s reported as finished ", sl->GetOrdinal());
4094 
4095  const char *prefix = gProofServ ? gProofServ->GetPrefix() : "Lite-0";
4096  if (!fFinalizationRunning) {
4097  Info("HandleSubmerger", "finalization on %s started ...", prefix);
4099  }
4100 
4101  Int_t output_size = 0;
4102  Int_t merging_port = 0;
4103  (*mess) >> output_size >> merging_port;
4104 
4105  PDB(kSubmerger, 2) Info("HandleSubmerger",
4106  "kOutputSize: Worker %s:%d:%s reports %d output objects (+ available port %d)",
4107  sl->GetName(), sl->GetPort(), sl->GetOrdinal(), output_size, merging_port);
4108  TString msg;
4109  if (!fMergersSet) {
4110 
4112 
4113  // First pass - setting number of mergers according to user or dynamically
4114  fMergersCount = -1; // No mergers used if not set by user
4115  TParameter<Int_t> *mc = dynamic_cast<TParameter<Int_t> *>(GetParameter("PROOF_UseMergers"));
4116  if (mc) fMergersCount = mc->GetVal(); // Value set by user
4117  TParameter<Int_t> *mh = dynamic_cast<TParameter<Int_t> *>(GetParameter("PROOF_MergersByHost"));
4118  if (mh) fMergersByHost = (mh->GetVal() != 0) ? kTRUE : kFALSE; // Assign submergers by hostname
4119 
4120  // Mergers count specified by user but not valid
4121  if (fMergersCount < 0 || (fMergersCount > (activeWorkers/2) )) {
4122  msg.Form("%s: Invalid request: cannot start %d mergers for %d workers",
4123  prefix, fMergersCount, activeWorkers);
4124  if (gProofServ)
4126  else
4127  Printf("%s",msg.Data());
4128  fMergersCount = 0;
4129  }
4130  // Mergers count will be set dynamically
4131  if ((fMergersCount == 0) && (!fMergersByHost)) {
4132  if (activeWorkers > 1) {
4133  fMergersCount = TMath::Nint(TMath::Sqrt(activeWorkers));
4134  if (activeWorkers / fMergersCount < 2)
4135  fMergersCount = (Int_t) TMath::Sqrt(activeWorkers);
4136  }
4137  if (fMergersCount > 1)
4138  msg.Form("%s: Number of mergers set dynamically to %d (for %d workers)",
4139  prefix, fMergersCount, activeWorkers);
4140  else {
4141  msg.Form("%s: No mergers will be used for %d workers",
4142  prefix, activeWorkers);
4143  fMergersCount = -1;
4144  }
4145  if (gProofServ)
4147  else
4148  Printf("%s",msg.Data());
4149  } else if (fMergersByHost) {
4150  // We force mergers at host level to minimize network traffic
4151  if (activeWorkers > 1) {
4152  fMergersCount = 0;
4153  THashList hosts;
4154  TIter nxwk(fSlaves);
4155  TObject *wrk = 0;
4156  while ((wrk = nxwk())) {
4157  if (!hosts.FindObject(wrk->GetName())) {
4158  hosts.Add(new TObjString(wrk->GetName()));
4159  fMergersCount++;
4160  }
4161  }
4162  }
4163  if (fMergersCount > 1)
4164  msg.Form("%s: Number of mergers set to %d (for %d workers), one for each slave host",
4165  prefix, fMergersCount, activeWorkers);
4166  else {
4167  msg.Form("%s: No mergers will be used for %d workers",
4168  prefix, activeWorkers);
4169  fMergersCount = -1;
4170  }
4171  if (gProofServ)
4173  else
4174  Printf("%s",msg.Data());
4175  } else {
4176  msg.Form("%s: Number of mergers set by user to %d (for %d workers)",
4177  prefix, fMergersCount, activeWorkers);
4178  if (gProofServ)
4180  else
4181  Printf("%s",msg.Data());
4182  }
4183 
4184  // We started merging; we call it here because fMergersCount is still the original number
4185  // and can be saved internally
4187 
4188  // Update merger counters (new workers are not yet active)
4190 
4191  if (fMergersCount > 0) {
4192 
4193  fMergers = new TList();
4194  fLastAssignedMerger = 0;
4195  // Total number of workers, which will not act as mergers ('pure workers')
4196  fWorkersToMerge = (activeWorkers - fMergersCount);
4197  // Establish the first merger
4198  if (!CreateMerger(sl, merging_port)) {
4199  // Cannot establish first merger
4200  AskForOutput(sl);
4201  fWorkersToMerge--;
4202  fMergersCount--;
4203  }
4205  } else {
4206  AskForOutput(sl);
4207  }
4208  fMergersSet = kTRUE;
4209  } else {
4210  // Multiple pass
4211  if (fMergersCount == -1) {
4212  // No mergers. Workers send their outputs directly to master
4213  AskForOutput(sl);
4214  } else {
4215  if ((fRedirectNext > 0 ) && (!fMergersByHost)) {
4216  RedirectWorker(s, sl, output_size);
4217  fRedirectNext--;
4218  } else {
4219  Bool_t newMerger = kTRUE;
4220  if (fMergersByHost) {
4221  TIter nxmg(fMergers);
4222  TMergerInfo *mgi = 0;
4223  while ((mgi = (TMergerInfo *) nxmg())) {
4224  if (!strcmp(sl->GetName(), mgi->GetMerger()->GetName())) {
4225  newMerger = kFALSE;
4226  break;
4227  }
4228  }
4229  }
4230  if ((fMergersCount > fMergers->GetSize()) && newMerger) {
4231  // Still not enough mergers established
4232  if (!CreateMerger(sl, merging_port)) {
4233  // Cannot establish a merger
4234  AskForOutput(sl);
4235  fWorkersToMerge--;
4236  fMergersCount--;
4237  }
4238  } else
4239  RedirectWorker(s, sl, output_size);
4240  }
4241  }
4242  }
4243  } else {
4244  Error("HandleSubMerger","kOutputSize received not on endmaster!");
4245  }
4246  }
4247  break;
4248  }
4249 }
4250 
4251 ////////////////////////////////////////////////////////////////////////////////
4252 /// Redirect output of worker sl to some merger
4253 
4254 void TProof::RedirectWorker(TSocket *s, TSlave * sl, Int_t output_size)
4255 {
4256  Int_t merger_id = -1;
4257 
4258  if (fMergersByHost) {
4259  for (Int_t i = 0; i < fMergers->GetSize(); i++) {
4260  TMergerInfo *mgi = (TMergerInfo *)fMergers->At(i);
4261  if (!strcmp(sl->GetName(), mgi->GetMerger()->GetName())) {
4262  merger_id = i;
4263  break;
4264  }
4265  }
4266  } else {
4267  merger_id = FindNextFreeMerger();
4268  }
4269 
4270  if (merger_id == -1) {
4271  // No free merger (probably it had crashed before)
4272  AskForOutput(sl);
4273  } else {
4274  TMessage sendoutput(kPROOF_SUBMERGER);
4275  sendoutput << Int_t(kSendOutput);
4276  PDB(kSubmerger, 2)
4277  Info("RedirectWorker", "redirecting worker %s to merger %d", sl->GetOrdinal(), merger_id);
4278 
4279  PDB(kSubmerger, 2) Info("RedirectWorker", "redirecting output to merger #%d", merger_id);
4280  if (!fMergers || fMergers->GetSize() <= merger_id) {
4281  Error("RedirectWorker", "#%d not in list ", merger_id);
4282  return;
4283  }
4284  TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
4285 
4286  TString hname = (IsLite()) ? "localhost" : mi->GetMerger()->GetName();
4287  sendoutput << merger_id;
4288  sendoutput << hname;
4289  sendoutput << mi->GetPort();
4290  s->Send(sendoutput);
4291  mi->AddMergedObjects(output_size);
4292  mi->AddWorker(sl);
4293  }
4294 }
4295 
4296 ////////////////////////////////////////////////////////////////////////////////
4297 /// Return a merger, which is both active and still accepts some workers to be
4298 /// assigned to it. It works on the 'round-robin' basis.
///
/// The scan starts at fLastAssignedMerger and skips mergers that are inactive
/// or already have all their workers assigned. If the end of fMergers is
/// reached, the cursor is reset to 0 and the list is scanned once more.
///
/// Returns the index in fMergers of the selected merger, and moves
/// fLastAssignedMerger one past it. Returns -1 if no merger qualifies.
/// fMergers is dereferenced unconditionally, so it must not be null.
4299 
// NOTE(review): listing line 4300 (the signature; presumably
// `Int_t TProof::FindNextFreeMerger()`, given its use in RedirectWorker) is
// missing from this copy -- restore it from the original source.
4301 {
 // First pass: from the current cursor to the end of the list
4302  while (fLastAssignedMerger < fMergers->GetSize() &&
4303  (!((TMergerInfo*)fMergers->At(fLastAssignedMerger))->IsActive() ||
4304  ((TMergerInfo*)fMergers->At(fLastAssignedMerger))->AreAllWorkersAssigned())) {
 // NOTE(review): listing line 4305 is missing. As shown, the loop body is
 // empty and never advances fLastAssignedMerger. It presumably held
 // `fLastAssignedMerger++;` -- confirm against the original file.
4306  }
4307 
4308  if (fLastAssignedMerger == fMergers->GetSize()) {
 // Nothing found before the end: wrap around for a second pass
4309  fLastAssignedMerger = 0;
4310  } else {
 // Post-increment: return this merger, start the next search after it
4311  return fLastAssignedMerger++;
4312  }
4313 
 // Second pass: from the start of the list
4314  while (fLastAssignedMerger < fMergers->GetSize() &&
4315  (!((TMergerInfo*)fMergers->At(fLastAssignedMerger))->IsActive() ||
4316  ((TMergerInfo*)fMergers->At(fLastAssignedMerger))->AreAllWorkersAssigned())) {
 // NOTE(review): listing line 4317 (presumably the loop increment) is missing.
4318  }
4319 
4320  if (fLastAssignedMerger == fMergers->GetSize()) {
4321  return -1;
4322  } else {
4323  return fLastAssignedMerger++;
4324  }
4325 }
4326 
4327 ////////////////////////////////////////////////////////////////////////////////
4328 /// Master asks for output from worker sl
///
/// Sends `sl` a kPROOF_SUBMERGER/kSendOutput message. Its target fields follow
/// the same layout as in RedirectWorker (merger index, host, port), but are set
/// to -1, "master", -1, i.e. the output goes to the master, not a sub-merger.
/// In PROOF-Lite mode it additionally calls fMergePrg.IncreaseNWrks()
/// (presumably to count the worker in the merge-progress bookkeeping).
4329 
// NOTE(review): listing line 4330 (the signature; presumably
// `void TProof::AskForOutput(TSlave *sl)`) is missing from this copy.
4331 {
4332  TMessage sendoutput(kPROOF_SUBMERGER);
4333  sendoutput << Int_t(kSendOutput);
4334 
4335  PDB(kSubmerger, 2) Info("AskForOutput",
4336  "worker %s was asked to send its output to master",
4337  sl->GetOrdinal());
4338 
 // Merger index -1, host "master", port -1: no sub-merger involved
4339  sendoutput << -1;
4340  sendoutput << TString("master");
4341  sendoutput << -1;
4342  sl->GetSocket()->Send(sendoutput);
4343  if (IsLite()) fMergePrg.IncreaseNWrks();
4344 }
4345 
4346 ////////////////////////////////////////////////////////////////////////////////
4347 /// Final update of the progress dialog
///
/// Does nothing without a player. For an aborted or stopped query it logs a
/// message in synchronous mode, emits a last Progress() with the number of
/// processed events, and emits StopProcess(kTRUE) for abort or
/// StopProcess(kFALSE) for stop. It then always emits a Progress signal filled
/// with -1 values to close the dialog. The signal signature depends on
/// GetRemoteProtocol(): more than 25 uses the 10-argument form, more than 11
/// the 7-argument form, otherwise the 2-argument form.
4348 
// NOTE(review): this listing is missing lines 4349 (the signature), 4354 and
// 4370 (the abort/stop conditions opening the two blocks below), 4358 and 4374
// (the last Info() arguments), and 4364 and 4382 (the old-protocol Progress
// calls) -- restore them from the original source.
4350 {
4351  if (!fPlayer) return;
4352 
4353  // Handle abort ...
4355  if (fSync)
4356  Info("UpdateDialog",
4357  "processing was aborted - %lld events processed",
4359 
 // NOTE(review): unlike the stop branch below, there is no `> 25` case here,
 // so the 7-argument Progress is used for newer protocols as well -- confirm
 // whether this is intentional.
4360  if (GetRemoteProtocol() > 11) {
4361  // New format
4362  Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1.);
4363  } else {
4365  }
4366  Emit("StopProcess(Bool_t)", kTRUE);
4367  }
4368 
4369  // Handle stop ...
4371  if (fSync)
4372  Info("UpdateDialog",
4373  "processing was stopped - %lld events processed",
4375 
4376  if (GetRemoteProtocol() > 25) {
4377  // New format
4378  Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1., -1, -1, -1.);
4379  } else if (GetRemoteProtocol() > 11) {
4380  Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1.);
4381  } else {
4383  }
4384  Emit("StopProcess(Bool_t)", kFALSE);
4385  }
4386 
4387  // Final update of the dialog box
4388  if (GetRemoteProtocol() > 25) {
4389  // New format
4390  EmitVA("Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t,Int_t,Int_t,Float_t)",
4391  10, (Long64_t)(-1), (Long64_t)(-1), (Long64_t)(-1),(Float_t)(-1.),(Float_t)(-1.),
4392  (Float_t)(-1.),(Float_t)(-1.),(Int_t)(-1),(Int_t)(-1),(Float_t)(-1.));
4393  } else if (GetRemoteProtocol() > 11) {
4394  // New format
4395  EmitVA("Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t)",
4396  7, (Long64_t)(-1), (Long64_t)(-1), (Long64_t)(-1),
4397  (Float_t)(-1.),(Float_t)(-1.),(Float_t)(-1.),(Float_t)(-1.));
4398  } else {
4399  EmitVA("Progress(Long64_t,Long64_t)", 2, (Long64_t)(-1), (Long64_t)(-1));
4400  }
4401 }
4402 
4403 ////////////////////////////////////////////////////////////////////////////////
4404 /// Activate the a-sync input handler.
///
/// Calls Add() on the input handler of every worker in fSlaves that has one.
/// Workers without an input handler are skipped.
4405 
// NOTE(review): listing line 4406 (the signature) is missing from this copy.
4407 {
4408  TIter next(fSlaves);
4409  TSlave *sl;
4410 
4411  while ((sl = (TSlave*) next()))
4412  if (sl->GetInputHandler())
4413  sl->GetInputHandler()->Add();
4414 }
4415 
4416 ////////////////////////////////////////////////////////////////////////////////
4417 /// De-activate a-sync input handler.
///
/// Calls Remove() on the input handler of every worker in fSlaves that has
/// one. Workers without an input handler are skipped.
4418 
// NOTE(review): listing line 4419 (the signature) is missing from this copy.
4420 {
4421  TIter next(fSlaves);
4422  TSlave *sl;
4423 
4424  while ((sl = (TSlave*) next()))
4425  if (sl->GetInputHandler())
4426  sl->GetInputHandler()->Remove();
4427 }
4428 
4429 ////////////////////////////////////////////////////////////////////////////////
4430 /// Get the active mergers count
///
/// Returns the number of entries in fMergers whose IsActive() is true, or 0
/// when no merger list exists.
4431 
// NOTE(review): listing line 4432 (the signature; presumably
// `Int_t TProof::GetActiveMergersCount()`) is missing from this copy.
4433 {
4434  if (!fMergers) return 0;
4435 
4436  Int_t active_mergers = 0;
4437 
4438  TIter mergers(fMergers);
4439  TMergerInfo *mi = 0;
4440  while ((mi = (TMergerInfo *)mergers())) {
4441  if (mi->IsActive()) active_mergers++;
4442  }
4443 
4444  return active_mergers;
4445 }
4446 
4447 ////////////////////////////////////////////////////////////////////////////////
4448 /// Create a new merger
///
/// Turns worker `sl` into a sub-merger listening on `port`:
///  - a TMergerInfo is appended to fMergers;
///  - `sl` is sent a kPROOF_SUBMERGER/kBeMerger message carrying the new
///    merger's index and the number of workers it will merge.
///
/// How many workers the merger gets:
///  - without per-host merging, fWorkersToMerge is divided by the number of
///    mergers still to be created, possibly plus one (see the in-line comment);
///  - with per-host merging, it gets all the other active workers on its host.
///
/// Side effects: fWorkersToMerge is decreased by that number and fRedirectNext
/// is set to half of it.
///
/// Returns kFALSE, creating nothing, if `port` is not positive; kTRUE otherwise.
4449 
// NOTE(review): listing line 4450 (the signature; presumably
// `Bool_t TProof::CreateMerger(TSlave *sl, Int_t port)`) is missing from this copy.
4451 {
4452  PDB(kSubmerger, 2)
4453  Info("CreateMerger", "worker %s will be merger ", sl->GetOrdinal());
4454 
4455  PDB(kSubmerger, 2) Info("CreateMerger","Begin");
4456 
4457  if (port <= 0) {
4458  PDB(kSubmerger,2)
4459  Info("CreateMerger", "cannot create merger on port %d - exit", port);
4460  return kFALSE;
4461  }
4462 
4463  Int_t workers = -1;
4464  if (!fMergersByHost) {
4465  Int_t mergersToCreate = fMergersCount - fMergers->GetSize();
 // NOTE(review): the modulo below divides by zero if mergersToCreate == 0.
 // The callers visible in HandleSubMerger only get here when fMergersCount
 // exceeds fMergers->GetSize(), so they avoid it -- keep that invariant.
4466  // Number of pure workers, which are not simply divisible by mergers
4467  Int_t rest = fWorkersToMerge % mergersToCreate;
4468  // We add one more worker for each of the first 'rest' mergers being established
4469  if (rest > 0 && fMergers->GetSize() < rest) {
4470  rest = 1;
4471  } else {
4472  rest = 0;
4473  }
4474  workers = (fWorkersToMerge / mergersToCreate) + rest;
4475  } else {
 // Count the active workers on the merger's host; the merger itself is
 // excluded by the "- 1" below
4476  Int_t workersOnHost = 0;
4477  for (Int_t i = 0; i < fActiveSlaves->GetSize(); i++) {
4478  if(!strcmp(sl->GetName(), fActiveSlaves->At(i)->GetName())) workersOnHost++;
4479  }
4480  workers = workersOnHost - 1;
4481  }
4482 
4483  TString msg;
4484  msg.Form("worker %s on host %s will be merger for %d additional workers", sl->GetOrdinal(), sl->GetName(), workers);
4485 
4486  if (gProofServ) {
 // NOTE(review): listing line 4487 (the gProofServ branch, presumably
 // forwarding `msg`) is missing here.
4488  } else {
4489  Printf("%s",msg.Data());
4490  }
4491  TMergerInfo * merger = new TMergerInfo(sl, port, workers);
4492 
 // The merger index sent is the list size before Add(), i.e. the position
 // this merger will occupy in fMergers
4493  TMessage bemerger(kPROOF_SUBMERGER);
4494  bemerger << Int_t(kBeMerger);
4495  bemerger << fMergers->GetSize();
4496  bemerger << workers;
4497  sl->GetSocket()->Send(bemerger);
4498 
4499  PDB(kSubmerger,2) Info("CreateMerger",
4500  "merger #%d (port: %d) for %d workers started",
4501  fMergers->GetSize(), port, workers);
4502 
4503  fMergers->Add(merger);
4504  fWorkersToMerge = fWorkersToMerge - workers;
4505 
4506  fRedirectNext = workers / 2;
4507 
4508  PDB(kSubmerger, 2) Info("CreateMerger", "exit");
4509  return kTRUE;
4510 }
4511 
4512 ////////////////////////////////////////////////////////////////////////////////
4513 /// Add a bad slave server to the bad slave list and remove it from
4514 /// the active list and from the two monitor objects. Assume that the work
4515 /// done by this worker was lost and ask the packetizer to reassign it.
///
/// Paths visible below:
///  - Unless `reason` is kPROOF_TerminateWorker or kPROOF_WorkerIdleTO, a
///    "marking ... as bad" diagnostic is built, including hints on how to
///    inspect the session logs, and is logged/printed.
///  - On a master, for any reason other than kPROOF_TerminateWorker: if a
///    packetizer exists, it is asked to resubmit the worker's packets, and
///    missing files are recorded in the "MissingFiles" output list.
///  - On a master, for a planned termination (kPROOF_TerminateWorker): the
///    worker is released via gProofServ->ReleaseWorker(), removed from every
///    list, recorded as a TSlaveInfo, and deleted.
///  - On a master, in all other cases: the worker is moved to fBadSlaves and
///    closed, and the dynamic mergers count is recomputed if the mergers were
///    already set up.
///  - On a client: the worker is removed from fSlaves and the session is
///    discarded from fManager. An idle timeout also invalidates the session.
///
/// \param wrk    worker to mark bad (an error is logged and nothing is done if null)
/// \param reason reason string, may be null; kPROOF_TerminateWorker and
///               kPROOF_WorkerIdleTO select the special paths described above
4516 
4517 void TProof::MarkBad(TSlave *wrk, const char *reason)
4518 {
4519  std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
4520 
4521  // We may have been invalidated in the meanwhile: nothing to do in such a case
4522  if (!IsValid()) return;
4523 
4524  if (!wrk) {
4525  Error("MarkBad", "worker instance undefined: protocol error? ");
4526  return;
4527  }
4528 
4529  // Local URL
 // NOTE(review): function-local static. It is computed on the first call only
 // and then shared by every TProof instance in the process, so a client with
 // sessions to different masters would keep reporting the first URL.
4530  static TString thisurl;
4531  if (thisurl.IsNull()) {
4532  if (IsMaster()) {
4533  Int_t port = gEnv->GetValue("ProofServ.XpdPort",-1);
4534  thisurl = TUrl(gSystem->HostName()).GetHostFQDN();
4535  if (port > 0) thisurl += TString::Format(":%d", port);
4536  } else {
4537  thisurl.Form("%s@%s:%d", fUrl.GetUser(), fUrl.GetHost(), fUrl.GetPort());
4538  }
4539  }
4540 
4541  if (!reason || (strcmp(reason, kPROOF_TerminateWorker) && strcmp(reason, kPROOF_WorkerIdleTO))) {
4542  // Message for notification
4543  const char *mastertype = (gProofServ && gProofServ->IsTopMaster()) ? "top master" : "master";
4544  TString src = IsMaster() ? Form("%s at %s", mastertype, thisurl.Data()) : "local session";
4545  TString msg;
4546  msg.Form("\n +++ Message from %s : marking %s:%d (%s) as bad\n +++ Reason: %s",
4547  src.Data(), wrk->GetName(), wrk->GetPort(), wrk->GetOrdinal(),
4548  (reason && strlen(reason)) ? reason : "unknown");
4549  Info("MarkBad", "%s", msg.Data());
4550  // Notify one level up, if the case
4551  // Add some hint for diagnostics
4552  if (gProofServ) {
4553  msg += TString::Format("\n\n +++ Most likely your code crashed on worker %s at %s:%d.\n",
4554  wrk->GetOrdinal(), wrk->GetName(), wrk->GetPort());
4555  } else {
4556  msg += TString::Format("\n\n +++ Most likely your code crashed\n");
4557  }
4558  msg += TString::Format(" +++ Please check the session logs for error messages either using\n");
4559  msg += TString::Format(" +++ the 'Show logs' button or executing\n");
4560  msg += TString::Format(" +++\n");
4561  if (gProofServ) {
4562  msg += TString::Format(" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->"
4563  "Display(\"%s\",0)\n\n", thisurl.Data(), wrk->GetOrdinal());
 // NOTE(review): listing line 4564 is missing here (presumably the
 // "notify one level up" step announced above) -- confirm.
4565  } else {
4566  msg += TString::Format(" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->"
4567  "Display(\"*\")\n\n", thisurl.Data());
4568  Printf("%s", msg.Data());
4569  }
4570  } else if (reason) {
4571  if (gDebug > 0 && strcmp(reason, kPROOF_WorkerIdleTO)) {
4572  Info("MarkBad", "worker %s at %s:%d asked to terminate",
4573  wrk->GetOrdinal(), wrk->GetName(), wrk->GetPort());
4574  }
4575  }
4576 
4577  if (IsMaster() && reason) {
4578  if (strcmp(reason, kPROOF_TerminateWorker)) {
4579  // if the reason was not a planned termination
4580  TList *listOfMissingFiles = 0;
4581  if (!(listOfMissingFiles = (TList *)GetOutput("MissingFiles"))) {
4582  listOfMissingFiles = new TList();
4583  listOfMissingFiles->SetName("MissingFiles");
 // NOTE(review): if fPlayer is null, this new list is owned by nobody and
 // leaks (the packetizer below is also null in that case).
4584  if (fPlayer)
4585  fPlayer->AddOutputObject(listOfMissingFiles);
4586  }
4587  // If a query is being processed, assume that the work done by
4588  // the worker was lost and needs to be reassigned.
4589  TVirtualPacketizer *packetizer = fPlayer ? fPlayer->GetPacketizer() : 0;
4590  if (packetizer) {
4591  // the worker was lost so do resubmit the packets
4592  packetizer->MarkBad(wrk, 0, &listOfMissingFiles);
4593  }
4594  } else {
4595  // Tell the coordinator that we are gone
 // The ordinal is trimmed to the part after its last '.' before release
4596  if (gProofServ) {
4597  TString ord(wrk->GetOrdinal());
4598  Int_t id = ord.Last('.');
4599  if (id != kNPOS) ord.Remove(0, id+1);
4600  gProofServ->ReleaseWorker(ord.Data());
4601  }
4602  }
4603  } else if (TestBit(TProof::kIsClient) && reason && !strcmp(reason, kPROOF_WorkerIdleTO)) {
4604  // We are invalid after this
4605  fValid = kFALSE;
4606  }
4607 
 // Common to all paths: stop treating the worker as active and stop monitoring it
4608  fActiveSlaves->Remove(wrk);
4609  FindUniqueSlaves();
4610 
4611  fAllMonitor->Remove(wrk->GetSocket());
4612  fActiveMonitor->Remove(wrk->GetSocket());
4613 
 // NOTE(review): listing line 4614 is missing here.
4615 
4616  if (IsMaster()) {
4617  if (reason && !strcmp(reason, kPROOF_TerminateWorker)) {
4618  // if the reason was a planned termination then delete the worker and
4619  // remove it from all the lists
4620  fSlaves->Remove(wrk);
4621  fBadSlaves->Remove(wrk);
4622  fActiveSlaves->Remove(wrk);
4623  fInactiveSlaves->Remove(wrk);
4624  fUniqueSlaves->Remove(wrk);
4625  fAllUniqueSlaves->Remove(wrk);
4626  fNonUniqueMasters->Remove(wrk);
4627 
4628  // we add it to the list of terminated slave infos instead, so that it
4629  // stays available in the .workers persistent file
4630  TSlaveInfo *si = new TSlaveInfo(
4631  wrk->GetOrdinal(),
4632  Form("%s@%s:%d", wrk->GetUser(), wrk->GetName(), wrk->GetPort()),
4633  0, "", wrk->GetWorkDir());
 // NOTE(review): listing line 4634 is missing. It is the `if` that pairs
 // with the `else` below (presumably storing `si` in a list of terminated
 // workers) -- restore it from the original source.
4635  else delete si;
4636 
 // `wrk` is deleted here and must not be used afterwards
4637  delete wrk;
4638  } else {
4639  fBadSlaves->Add(wrk);
4640  fActiveSlaves->Remove(wrk);
4641  fUniqueSlaves->Remove(wrk);
4642  fAllUniqueSlaves->Remove(wrk);
4643  fNonUniqueMasters->Remove(wrk);
 // NOTE(review): listing line 4644 is missing here.
4645  wrk->Close();
4646  // Update the mergers count, if needed
4647  if (fMergersSet) {
4648  Int_t mergersCount = -1;
4649  TParameter<Int_t> *mc = dynamic_cast<TParameter<Int_t> *>(GetParameter("PROOF_UseMergers"));
4650  if (mc) mergersCount = mc->GetVal(); // Value set by user
4651  // Mergers count is set dynamically: recalculate it
4652  if (mergersCount == 0) {
 // NOTE(review): listing line 4653, which declares `activeWorkers`, is missing here.
 // Use round(sqrt(N)) mergers, falling back to floor(sqrt(N)) when the
 // integer ratio N / mergers would be below 2
4654  if (activeWorkers > 1) {
4655  fMergersCount = TMath::Nint(TMath::Sqrt(activeWorkers));
4656  if (activeWorkers / fMergersCount < 2)
4657  fMergersCount = (Int_t) TMath::Sqrt(activeWorkers);
4658  }
4659  }
4660  }
4661  }
4662 
4663  // Update session workers files
4664  SaveWorkerInfo();
4665  } else {
4666  // On clients the proof session should be removed from the lists
4667  // and deleted, since it is not valid anymore
4668  fSlaves->Remove(wrk);
4669  if (fManager)
4670  fManager->DiscardSession(this);
4671  }
4672 }
4673 
4674 ////////////////////////////////////////////////////////////////////////////////
4675 /// Add slave with socket s to the bad slave list and remove if from
4676 /// the active list and from the two monitor objects.
4677 
4678 void TProof::MarkBad(TSocket *s, const char *reason)
4679 {
4680  std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
4681 
4682  // We may have been invalidated in the meanwhile: nothing to do in such a case
4683  if (!IsValid()) return;
4684 
4685  TSlave *wrk = FindSlave(s);
4686  MarkBad(wrk, reason);
4687 }
4688 
4689 ////////////////////////////////////////////////////////////////////////////////
4690 /// Ask an active worker 'wrk' to terminate, i.e. to shutdown
///
/// Sends a kPROOF_STOP message on the worker's socket if that socket exists
/// and is valid. Otherwise only a debug message is issued. A null `wrk` is
/// reported with a warning and nothing else is done.
4691 
// NOTE(review): listing line 4692 (the signature; presumably
// `void TProof::TerminateWorker(TSlave *wrk)`) is missing from this copy.
4693 {
4694  if (!wrk) {
4695  Warning("TerminateWorker", "worker instance undefined: protocol error? ");
4696  return;
4697  }
4698 
4699  // Send stop message
4700  if (wrk->GetSocket() && wrk->GetSocket()->IsValid()) {
4701  TMessage mess(kPROOF_STOP);
4702  wrk->GetSocket()->Send(mess);
4703  } else {
4704  if (gDebug > 0)
4705  Info("TerminateWorker", "connection to worker is already down: cannot"
4706  " send termination message");
4707  }
4708 
4709  // This is a bad worker from now on
 // NOTE(review): listing line 4710 is missing. Per the comment above it
 // presumably calls MarkBad(wrk, kPROOF_TerminateWorker) -- confirm.
4711 }
4712 
4713 ////////////////////////////////////////////////////////////////////////////////
4714 /// Ask an active worker 'ord' to terminate, i.e. to shutdown
4715 
4716 void TProof::TerminateWorker(const char *ord)
4717 {
4718  if (ord && strlen(ord) > 0) {
4719  Bool_t all = (ord[0] == '*') ? kTRUE : kFALSE;
4720  if (IsMaster()) {
4721  TIter nxw(fSlaves);
4722  TSlave *wrk = 0;
4723  while ((wrk = (TSlave *)nxw())) {
4724  if (all || !strcmp(wrk->GetOrdinal(), ord)) {
4725  TerminateWorker(wrk);
4726  if (!all) break;
4727  }
4728  }
4729  } else {
4730  TMessage mess(kPROOF_STOP);
4731  mess << TString(ord);
4732  Broadcast(mess);
4733  }
4734  }
4735 }
4736 
4737 ////////////////////////////////////////////////////////////////////////////////
4738 /// Ping PROOF. Returns 1 if master server responded.
///
/// Implemented as Ping(kActive): the value returned is the number of entries
/// in the active list whose ping did not fail.
/// NOTE(review): this equals 1 only when the active list holds just the
/// master connection (presumably the case on a client) -- confirm.
4739 
// NOTE(review): listing line 4740 (the signature) is missing from this copy.
4741 {
4742  return Ping(kActive);
4743 }
4744 
4745 ////////////////////////////////////////////////////////////////////////////////
4746 /// Ping PROOF slaves. Returns the number of slaves that responded.
///
/// The list to ping is selected by `list`:
///  - kAll pings fSlaves;
///  - kActive pings fActiveSlaves;
///  - kUnique pings fUniqueSlaves;
///  - kAllUnique pings fAllUniqueSlaves.
///
/// Only valid workers are pinged. A worker whose TSlave::Ping() returns -1 is
/// marked bad with reason "ping unsuccessful". The return value counts the
/// remaining pinged workers.
4747 
// NOTE(review): listing line 4748 (the signature, taking the selector named
// `list` used below) is missing from this copy.
4749 {
4750  TList *slaves = 0;
4751  if (list == kAll) slaves = fSlaves;
4752  if (list == kActive) slaves = fActiveSlaves;
4753  if (list == kUnique) slaves = fUniqueSlaves;
4754  if (list == kAllUnique) slaves = fAllUniqueSlaves;
4755 
 // NOTE(review): any other value of `list` leaves `slaves` null and the call
 // below dereferences it.
4756  if (slaves->GetSize() == 0) return 0;
4757 
4758  int nsent = 0;
4759  TIter next(slaves);
4760 
 // NOTE(review): MarkBad() removes the worker from fActiveSlaves, i.e. possibly
 // from the list being iterated; this relies on TIter tolerating removal of
 // the current element.
4761  TSlave *sl;
4762  while ((sl = (TSlave *)next())) {
4763  if (sl->IsValid()) {
4764  if (sl->Ping() == -1) {
4765  MarkBad(sl, "ping unsuccessful");
4766  } else {
4767  nsent++;
4768  }
4769  }
4770  }
4771 
4772  return nsent;
4773 }
4774 
4775 ////////////////////////////////////////////////////////////////////////////////
4776 /// Call TSlave::Touch() on every valid worker in fSlaves. Nothing is returned.
/// NOTE(review): presumably used to keep the worker connections alive --
/// confirm against TSlave::Touch().
4777 
// NOTE(review): listing line 4778 (the signature) is missing from this copy.
4779 {
4780  TList *slaves = fSlaves;
4781 
4782  if (slaves->GetSize() == 0) return;
4783 
4784  TIter next(slaves);
4785 
4786  TSlave *sl;
4787  while ((sl = (TSlave *)next())) {
4788  if (sl->IsValid()) {
4789  sl->Touch();
4790  }
4791  }
4792 
4793  return;
4794 }
4795 
4796 ////////////////////////////////////////////////////////////////////////////////
4797 /// Print status of PROOF cluster.
///
/// On a client (kIsClient bit set) this prints connection, version, security
/// and protocol information. If the session is valid, the request is then
/// forwarded with SendPrint(option).
///
/// Otherwise (master side) it refreshes the statistics with AskStatistics()
/// and prints the master configuration and worker counters. If `option`
/// contains "a" (case-insensitive), each valid worker is printed as well:
/// sub-masters are sent a kPROOF_PRINT request and their replies are
/// collected.
///
/// Although const, this method may call MarkBad() on a sub-master to which the
/// kPROOF_PRINT request cannot be sent.
4798 
4799 void TProof::Print(Option_t *option) const
4800 {
 // NOTE(review): secCont is not used anywhere in this function.
4801  TString secCont;
4802 
4803  if (TestBit(TProof::kIsClient)) {
4804  Printf("Connected to: %s (%s)", GetMaster(),
4805  IsValid() ? "valid" : "invalid");
4806  Printf("Port number: %d", GetPort());
4807  Printf("User: %s", GetUser());
4808  Printf("ROOT version|rev: %s|%s", gROOT->GetVersion(), gROOT->GetGitCommit());
4809  Printf("Architecture-Compiler: %s-%s", gSystem->GetBuildArch(),
 // NOTE(review): listing line 4810 (the remaining Printf argument) is missing here.
 // The first active connection is used for the security/protocol info
4811  TSlave *sl = (TSlave *)fActiveSlaves->First();
4812  if (sl) {
4813  TString sc;
4814  if (sl->GetSocket()->GetSecContext())
4815  Printf("Security context: %s",
4816  sl->GetSocket()->GetSecContext()->AsString(sc));
4817  Printf("Proofd protocol version: %d", sl->GetSocket()->GetRemoteProtocol());
4818  } else {
4819  Printf("Security context: Error - No connection");
4820  Printf("Proofd protocol version: Error - No connection");
4821  }
4822  Printf("Client protocol version: %d", GetClientProtocol());
4823  Printf("Remote protocol version: %d", GetRemoteProtocol());
4824  Printf("Log level: %d", GetLogLevel());
4825  Printf("Session unique tag: %s", IsValid() ? GetSessionTag() : "");
4826  Printf("Default data pool: %s", IsValid() ? GetDataPoolUrl() : "");
4827  if (IsValid())
4828  const_cast<TProof*>(this)->SendPrint(option);
4829  } else {
4830  const_cast<TProof*>(this)->AskStatistics();
4831  if (IsParallel())
4832  Printf("*** Master server %s (parallel mode, %d workers):",
 // NOTE(review): listing line 4833 (the Printf arguments) is missing here.
4834  else
4835  Printf("*** Master server %s (sequential mode):",
4836  gProofServ->GetOrdinal());
4837 
4838  Printf("Master host name: %s", gSystem->HostName());
4839  Printf("Port number: %d", GetPort());
4840  if (strlen(gProofServ->GetGroup()) > 0) {
4841  Printf("User/Group: %s/%s", GetUser(), gProofServ->GetGroup());
4842  } else {
4843  Printf("User: %s", GetUser());
4844  }
 // Prefer the ROOTVERSIONTAG environment variable over the git commit, if set
4845  TString ver;
4846  ver.Form("%s|%s", gROOT->GetVersion(), gROOT->GetGitCommit());
4847  if (gSystem->Getenv("ROOTVERSIONTAG"))
4848  ver.Form("%s|%s", gROOT->GetVersion(), gSystem->Getenv("ROOTVERSIONTAG"));
4849  Printf("ROOT version|rev|tag: %s", ver.Data());
4850  Printf("Architecture-Compiler: %s-%s", gSystem->GetBuildArch(),
 // NOTE(review): listing line 4851 (the remaining Printf argument) is missing here.
4852  Printf("Protocol version: %d", GetClientProtocol());
4853  Printf("Image name: %s", GetImage());
4854  Printf("Working directory: %s", gSystem->WorkingDirectory());
4855  Printf("Config directory: %s", GetConfDir());
4856  Printf("Config file: %s", GetConfFile());
4857  Printf("Log level: %d", GetLogLevel());
4858  Printf("Number of workers: %d", GetNumberOfSlaves());
4859  Printf("Number of active workers: %d", GetNumberOfActiveSlaves());
4860  Printf("Number of unique workers: %d", GetNumberOfUniqueSlaves());
4861  Printf("Number of inactive workers: %d", GetNumberOfInactiveSlaves());
4862  Printf("Number of bad workers: %d", GetNumberOfBadSlaves());
4863  Printf("Total MB's processed: %.2f", float(GetBytesRead())/(1024*1024));
4864  Printf("Total real time used (s): %.3f", GetRealTime());
4865  Printf("Total CPU time used (s): %.3f", GetCpuTime());
4866  if (TString(option).Contains("a", TString::kIgnoreCase) && GetNumberOfSlaves()) {
4867  Printf("List of workers:");
 // Sub-masters that accepted the kPROOF_PRINT request; their replies are collected below
4868  TList masters;
4869  TIter nextslave(fSlaves);
4870  while (TSlave* sl = dynamic_cast<TSlave*>(nextslave())) {
4871  if (!sl->IsValid()) continue;
4872 
4873  if (sl->GetSlaveType() == TSlave::kSlave) {
4874  sl->Print(option);
4875  } else if (sl->GetSlaveType() == TSlave::kMaster) {
4876  TMessage mess(kPROOF_PRINT);
4877  mess.WriteString(option);
4878  if (sl->GetSocket()->Send(mess) == -1)
4879  const_cast<TProof*>(this)->MarkBad(sl, "could not send kPROOF_PRINT request");
4880  else
4881  masters.Add(sl);
4882  } else {
4883  Error("Print", "TSlave is neither Master nor Worker");
4884  R__ASSERT(0);
4885  }
4886  }
4887  const_cast<TProof*>(this)->Collect(&masters, fCollectTimeout);
4888  }
4889  }
4890 }
4891 
4892 ////////////////////////////////////////////////////////////////////////////////
4893 /// Extract from opt information about output handling settings.
4894 /// The understood keywords are:
4895 /// of=<file>, outfile=<file> output file location
4896 /// ds=<dsname>, dataset=<dsname> dataset name ('of' and 'ds' are
4897 /// mutually exclusive, execution stops
4898 /// if both are found)
4899 /// stf[=<opt>], savetofile[=<opt>] control saving to file
4900 ///
4901 /// For 'stf', the <opt> integer has the following meaning:
4902 /// <opt> = <how>*10 + <force>
4903 /// <force> = 0 save to file if memory threshold is reached
4904 /// (the memory threshold is set by the cluster
4905 /// admin); in case an output file is defined, the
4906 /// files are merged at the end;
4907 /// 1 save results to file.
4908 /// <how> = 0 save at the end of the query
4909 /// 1 save results after each packet (to reduce the
4910 /// loss in case of crash).
4911 ///
4912 /// Setting 'ds' automatically sets 'stf=1'; it is still possible to set 'stf=11'
4913 /// to save results after each packet.
4914 ///
4915 /// The separator from the next option is either a ' ' or a ';'
4916 ///
4917 /// All recognized settings are removed from the input string opt.
4918 /// If action == 0, set up the output file accordingly, if action == 1 clean related
4919 /// output file settings.
4920 /// If the final target file is local then 'target' is set to the final local path
4921 /// when action == 0 and used to retrieve the file with TFile::Cp when action == 1.
4922 ///
4923 /// Output file settings are in the form
4924 ///
4925 /// <previous_option>of=name <next_option>
4926 /// <previous_option>outfile=name,...;<next_option>
4927 ///
4928 /// The separator from the next option is either a ' ' or a ';'
4929 /// Called internally by TProof::Process.
4930 ///
4931 /// Returns 0 on success, -1 on error.
4932 
4934 {
4935  TString outfile, dsname, stfopt;
4936  if (action == 0) {
4937  TString tagf, tagd, tags, oo;
4938  Ssiz_t from = 0, iof = kNPOS, iod = kNPOS, ios = kNPOS;
4939  while (opt.Tokenize(oo, from, "[; ]")) {
4940  if (oo.BeginsWith("of=")) {
4941  tagf = "of=";
4942  iof = opt.Index(tagf);
4943  } else if (oo.BeginsWith("outfile=")) {
4944  tagf = "outfile=";
4945  iof = opt.Index(tagf);
4946  } else if (oo.BeginsWith("ds")) {
4947  tagd = "ds";
4948  iod = opt.Index(tagd);
4949  } else if (oo.BeginsWith("dataset")) {
4950  tagd = "dataset";
4951  iod = opt.Index(tagd);
4952  } else if (oo.BeginsWith("stf")) {
4953  tags = "stf";
4954  ios = opt.Index(tags);
4955  } else if (oo.BeginsWith("savetofile")) {
4956  tags = "savetofile";
4957  ios = opt.Index(tags);
4958  }
4959  }
4960  // Check consistency
4961  if (iof != kNPOS && iod != kNPOS) {
4962  Error("HandleOutputOptions", "options 'of'/'outfile' and 'ds'/'dataset' are incompatible!");
4963  return -1;
4964  }
4965 
4966  // Check output file first
4967  if (iof != kNPOS) {
4968  from = iof + tagf.Length();
4969  if (!opt.Tokenize(outfile, from, "[; ]") || outfile.IsNull()) {
4970  Error("HandleOutputOptions", "could not extract output file settings string! (%s)", opt.Data());
4971  return -1;
4972  }
4973  // For removal from original options string
4974  tagf += outfile;
4975  }
4976  // Check dataset
4977  if (iod != kNPOS) {
4978  from = iod + tagd.Length();
4979  if (!opt.Tokenize(dsname, from, "[; ]"))
4980  if (gDebug > 0) Info("HandleOutputOptions", "no dataset name found: use default");
4981  // For removal from original options string
4982  tagd += dsname;
4983  // The name may be empty or beginning with a '='
4984  if (dsname.BeginsWith("=")) dsname.Replace(0, 1, "");
4985  if (dsname.Contains("|V")) {
4986  target = "ds|V";
4987  dsname.ReplaceAll("|V", "");
4988  }
4989  if (dsname.IsNull()) dsname = "dataset_<qtag>";
4990  }
4991  // Check stf
4992  if (ios != kNPOS) {
4993  from = ios + tags.Length();
4994  if (!opt.Tokenize(stfopt, from, "[; ]"))
4995  if (gDebug > 0) Info("HandleOutputOptions", "save-to-file not found: use default");
4996  // For removal from original options string
4997  tags += stfopt;
4998  // It must be digit
4999  if (!stfopt.IsNull()) {
5000  if (stfopt.BeginsWith("=")) stfopt.Replace(0,1,"");
5001  if (!stfopt.IsNull()) {
5002  if (!stfopt.IsDigit()) {
5003  Error("HandleOutputOptions", "save-to-file option must be a digit! (%s)", stfopt.Data());
5004  return -1;
5005  }
5006  } else {
5007  // Default
5008  stfopt = "1";
5009  }
5010  } else {
5011  // Default
5012  stfopt = "1";
5013  }
5014  }
5015  // Remove from original options string
5016  opt.ReplaceAll(tagf, "");
5017  opt.ReplaceAll(tagd, "");
5018  opt.ReplaceAll(tags, "");
5019  }
5020 
5021  // Parse now
5022  if (action == 0) {
5023  // Output file
5024  if (!outfile.IsNull()) {
5025  if (!outfile.BeginsWith("master:")) {
5027  Warning("HandleOutputOptions",
5028  "directory '%s' for the output file does not exists or is not writable:"
5029  " saving to master", gSystem->GetDirName(outfile.Data()).Data());
5030  outfile.Form("master:%s", gSystem->BaseName(outfile.Data()));
5031  } else {
5032  if (!IsLite()) {
5033  // The target file is local, so we need to retrieve it
5034  target = outfile;
5035  if (!stfopt.IsNull()) {
5036  outfile.Form("master:%s", gSystem->BaseName(target.Data()));
5037  } else {
5038  outfile = "";
5039  }
5040  }
5041  }
5042  }
5043  if (outfile.BeginsWith("master:")) {
5044  outfile.ReplaceAll("master:", "");
5045  if (outfile.IsNull() || !gSystem->IsAbsoluteFileName(outfile)) {
5046  // Get the master data dir
5047  TString ddir, emsg;
5048  if (!IsLite()) {
5049  if (Exec("gProofServ->GetDataDir()", "0", kTRUE) == 0) {
5050  TObjString *os = fMacroLog.GetLineWith("const char");
5051  if (os) {
5052  Ssiz_t fst = os->GetString().First('\"');
5053  Ssiz_t lst = os->GetString().Last('\"');
5054  ddir = os->GetString()(fst+1, lst-fst-1);
5055  } else {
5056  emsg = "could not find 'const char *' string in macro log! cannot continue";
5057  }
5058  } else {
5059  emsg = "could not retrieve master data directory info! cannot continue";
5060  }
5061  if (!emsg.IsNull()) {
5062  Error("HandleOutputOptions", "%s", emsg.Data());
5063  return -1;
5064  }
5065  }
5066  if (!ddir.IsNull()) ddir += "/";
5067  if (outfile.IsNull()) {
5068  outfile.