Logo ROOT  
Reference Guide
TProof.cxx
Go to the documentation of this file.
1 // @(#)root/proof:$Id: a2a50e759072c37ccbc65ecbcce735a76de86e95 $
2 // Author: Fons Rademakers 13/02/97
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 /**
12  \defgroup proof PROOF
13 
14  Classes defining the Parallel ROOT Facility, PROOF, a framework for parallel analysis of ROOT TTrees.
15 
16 */
17 
18 /**
19  \defgroup proofkernel PROOF kernel Libraries
20  \ingroup proof
21 
22  The PROOF kernel libraries (libProof, libProofPlayer, libProofDraw) contain the classes defining
23  the kernel of the PROOF facility, i.e. the protocol and the utilities to steer data processing
24  and handling of results.
25 
26 */
27 
28 /** \class TProof
29 \ingroup proofkernel
30 
31 This class controls a Parallel ROOT Facility, PROOF, cluster.
32 It fires the worker servers, it keeps track of how many workers are
33 running, it keeps track of the workers running status, it broadcasts
34 messages to all workers, it collects results, etc.
35 
36 */
37 
38 #include <stdlib.h>
39 #include <fcntl.h>
40 #include <errno.h>
41 #ifdef WIN32
42 # include <io.h>
43 # include <sys/stat.h>
44 # include <sys/types.h>
45 # include "snprintf.h"
46 #else
47 # include <unistd.h>
48 #endif
49 #include <vector>
50 
51 #include "RConfigure.h"
52 #include "Riostream.h"
53 #include "Getline.h"
54 #include "TBrowser.h"
55 #include "TChain.h"
56 #include "TCondor.h"
57 #include "TDSet.h"
58 #include "TError.h"
59 #include "TEnv.h"
60 #include "TEntryList.h"
61 #include "TEventList.h"
62 #include "TFile.h"
63 #include "TFileInfo.h"
64 #include "TFTP.h"
65 #include "THashList.h"
66 #include "TInterpreter.h"
67 #include "TKey.h"
68 #include "TMap.h"
69 #include "TMath.h"
70 #include "TMessage.h"
71 #include "TMonitor.h"
72 #include "TObjArray.h"
73 #include "TObjString.h"
74 #include "TParameter.h"
75 #include "TProof.h"
76 #include "TProofNodeInfo.h"
77 #include "TProofOutputFile.h"
78 #include "TVirtualProofPlayer.h"
79 #include "TVirtualPacketizer.h"
80 #include "TProofServ.h"
81 #include "TPluginManager.h"
82 #include "TQueryResult.h"
83 #include "TRandom.h"
84 #include "TRegexp.h"
85 #include "TROOT.h"
86 #include "TSlave.h"
87 #include "TSocket.h"
88 #include "TSortedList.h"
89 #include "TSystem.h"
90 #include "TTree.h"
91 #include "TUrl.h"
92 #include "TFileCollection.h"
93 #include "TDataSetManager.h"
94 #include "TDataSetManagerFile.h"
95 #include "TMacro.h"
96 #include "TSelector.h"
97 #include "TPRegexp.h"
98 #include "TPackMgr.h"
99 
100 #include <mutex>
101 
103 
104 // Rotating indicator
105 char TProofMergePrg::fgCr[4] = {'-', '\\', '|', '/'};
106 
107 TList *TProof::fgProofEnvList = 0; // List of env vars for proofserv
108 TPluginHandler *TProof::fgLogViewer = 0; // Log viewer handler
109 
111 
112 //----- PROOF Interrupt signal handler -----------------------------------------
113 ////////////////////////////////////////////////////////////////////////////////
114 /// TProof interrupt handler.
115 
117 {
118  if (!fProof->IsTty() || fProof->GetRemoteProtocol() < 22) {
119 
120  // Cannot ask the user : abort any remote processing
122 
123  } else {
124  // Real stop or request to switch to asynchronous?
125  const char *a = 0;
126  if (fProof->GetRemoteProtocol() < 22) {
127  a = Getline("\nSwitch to asynchronous mode not supported remotely:"
128  "\nEnter S/s to stop, Q/q to quit, any other key to continue: ");
129  } else {
130  a = Getline("\nEnter A/a to switch asynchronous, S/s to stop, Q/q to quit,"
131  " any other key to continue: ");
132  }
133  if (a[0] == 'Q' || a[0] == 'S' || a[0] == 'q' || a[0] == 's') {
134 
135  Info("Notify","Processing interrupt signal ... %c", a[0]);
136 
137  // Stop or abort any remote processing
138  Bool_t abort = (a[0] == 'Q' || a[0] == 'q') ? kTRUE : kFALSE;
139  fProof->StopProcess(abort);
140 
141  } else if ((a[0] == 'A' || a[0] == 'a') && fProof->GetRemoteProtocol() >= 22) {
142  // Stop any remote processing
144  }
145  }
146 
147  return kTRUE;
148 }
149 
150 //----- Input handler for messages from TProofServ -----------------------------
151 ////////////////////////////////////////////////////////////////////////////////
152 /// Constructor
153 
155  : TFileHandler(s->GetDescriptor(),1),
156  fSocket(s), fProof(p)
157 {
158 }
159 
160 ////////////////////////////////////////////////////////////////////////////////
161 /// Handle input
162 
164 {
166  return kTRUE;
167 }
168 
169 
170 //------------------------------------------------------------------------------
171 
173 
174 ////////////////////////////////////////////////////////////////////////////////
175 /// Used to sort slaveinfos by ordinal.
176 
178 {
179  if (!obj) return 1;
180 
181  const TSlaveInfo *si = dynamic_cast<const TSlaveInfo*>(obj);
182 
183  if (!si) return fOrdinal.CompareTo(obj->GetName());
184 
185  const char *myord = GetOrdinal();
186  const char *otherord = si->GetOrdinal();
187  while (myord && otherord) {
188  Int_t myval = atoi(myord);
189  Int_t otherval = atoi(otherord);
190  if (myval < otherval) return 1;
191  if (myval > otherval) return -1;
192  myord = strchr(myord, '.');
193  if (myord) myord++;
194  otherord = strchr(otherord, '.');
195  if (otherord) otherord++;
196  }
197  if (myord) return -1;
198  if (otherord) return 1;
199  return 0;
200 }
201 
202 ////////////////////////////////////////////////////////////////////////////////
203 /// Used to compare slaveinfos by ordinal.
204 
206 {
207  if (!obj) return kFALSE;
208  const TSlaveInfo *si = dynamic_cast<const TSlaveInfo*>(obj);
209  if (!si) return kFALSE;
210  return (strcmp(GetOrdinal(), si->GetOrdinal()) == 0);
211 }
212 
213 ////////////////////////////////////////////////////////////////////////////////
214 /// Print slave info. If opt = "active" print only the active
215 /// slaves, if opt="notactive" print only the not active slaves,
216 /// if opt = "bad" print only the bad slaves, else
217 /// print all slaves.
218 
219 void TSlaveInfo::Print(Option_t *opt) const
220 {
221  TString stat = fStatus == kActive ? "active" :
222  fStatus == kBad ? "bad" :
223  "not active";
224 
225  Bool_t newfmt = kFALSE;
226  TString oo(opt);
227  if (oo.Contains("N")) {
228  newfmt = kTRUE;
229  oo.ReplaceAll("N","");
230  }
231  if (oo == "active" && fStatus != kActive) return;
232  if (oo == "notactive" && fStatus != kNotActive) return;
233  if (oo == "bad" && fStatus != kBad) return;
234 
235  if (newfmt) {
236  TString msd, si, datadir;
237  if (!(fMsd.IsNull())) msd.Form("| msd: %s ", fMsd.Data());
238  if (!(fDataDir.IsNull())) datadir.Form("| datadir: %s ", fDataDir.Data());
239  if (fSysInfo.fCpus > 0) {
240  si.Form("| %s, %d cores, %d MB ram", fHostName.Data(),
242  } else {
243  si.Form("| %s", fHostName.Data());
244  }
245  Printf("Worker: %9s %s %s%s| %s", fOrdinal.Data(), si.Data(), msd.Data(), datadir.Data(), stat.Data());
246 
247  } else {
248  TString msd = fMsd.IsNull() ? "<null>" : fMsd.Data();
249 
250  std::cout << "Slave: " << fOrdinal
251  << " hostname: " << fHostName
252  << " msd: " << msd
253  << " perf index: " << fPerfIndex
254  << " " << stat
255  << std::endl;
256  }
257 }
258 
259 ////////////////////////////////////////////////////////////////////////////////
260 /// Setter for fSysInfo
261 
263 {
264  fSysInfo.fOS = si.fOS; // OS
265  fSysInfo.fModel = si.fModel; // computer model
266  fSysInfo.fCpuType = si.fCpuType; // type of cpu
267  fSysInfo.fCpus = si.fCpus; // number of cpus
268  fSysInfo.fCpuSpeed = si.fCpuSpeed; // cpu speed in MHz
269  fSysInfo.fBusSpeed = si.fBusSpeed; // bus speed in MHz
270  fSysInfo.fL2Cache = si.fL2Cache; // level 2 cache size in KB
271  fSysInfo.fPhysRam = si.fPhysRam; // Physical RAM
272 }
273 
275 
276 //------------------------------------------------------------------------------
277 
278 ////////////////////////////////////////////////////////////////////////////////
279 /// Destructor
280 
282 {
283  // Just delete the list, the objects are owned by other list
284  if (fWorkers) {
287  }
288 }
289 ////////////////////////////////////////////////////////////////////////////////
290 /// Increase number of already merged workers by 1
291 
293 {
294  if (AreAllWorkersMerged())
295  Error("SetMergedWorker", "all workers have been already merged before!");
296  else
297  fMergedWorkers++;
298 }
299 
300 ////////////////////////////////////////////////////////////////////////////////
301 /// Add new worker to the list of workers to be merged by this merger
302 
304 {
305  if (!fWorkers)
306  fWorkers = new TList();
307  if (fWorkersToMerge == fWorkers->GetSize()) {
308  Error("AddWorker", "all workers have been already assigned to this merger");
309  return;
310  }
311  fWorkers->Add(sl);
312 }
313 
314 ////////////////////////////////////////////////////////////////////////////////
315 /// Return if merger has already merged all workers, i.e. if it has finished its merging job
316 
318 {
319  return (fWorkersToMerge == fMergedWorkers);
320 }
321 
322 ////////////////////////////////////////////////////////////////////////////////
323 /// Return if the determined number of workers has been already assigned to this merger
324 
326 {
327  if (!fWorkers)
328  return kFALSE;
329 
330  return (fWorkers->GetSize() == fWorkersToMerge);
331 }
332 
333 ////////////////////////////////////////////////////////////////////////////////
334 /// This is a private API function.
335 /// It checks whether the connection string contains a PoD cluster protocol.
336 /// If it does, then the connection string will be changed to reflect
337 /// a real PROOF connection string for a PROOF cluster managed by PoD.
338 /// PoD: http://pod.gsi.de .
339 /// Return -1 if the PoD request failed; return 0 otherwise.
340 
341 static Int_t PoDCheckUrl(TString *_cluster)
342 {
343  if ( !_cluster )
344  return 0;
345 
346  // trim spaces from both sides of the string
347  *_cluster = _cluster->Strip( TString::kBoth );
348  // PoD protocol string
349  const TString pod_prot("pod");
350 
351  // URL test
352  // TODO: The URL test is to support remote PoD servers (not managed by pod-remote)
353  TUrl url( _cluster->Data() );
354  if( pod_prot.CompareTo(url.GetProtocol(), TString::kIgnoreCase) )
355  return 0;
356 
357  // PoD cluster is used
358  // call pod-info in a batch mode (-b).
359  // pod-info will find either a local PoD cluster or
360  // a remote one, manged by pod-remote.
361  *_cluster = gSystem->GetFromPipe("pod-info -c -b");
362  if( 0 == _cluster->Length() ) {
363  Error("PoDCheckUrl", "PoD server is not running");
364  return -1;
365  }
366  return 0;
367 }
368 
369 ////////////////////////////////////////////////////////////////////////////////
370 /// Create a PROOF environment. Starting PROOF involves either connecting
371 /// to a master server, which in turn will start a set of slave servers, or
372 /// directly starting as master server (if master = ""). Masterurl is of
373 /// the form: [proof[s]://]host[:port]. Conffile is the name of the config
374 /// file describing the remote PROOF cluster (this argument alows you to
375 /// describe different cluster configurations).
376 /// The default is proof.conf. Confdir is the directory where the config
377 /// file and other PROOF related files are (like motd and noproof files).
378 /// Loglevel is the log level (default = 1). User specified custom config
379 /// files will be first looked for in $HOME/.conffile.
380 
381 TProof::TProof(const char *masterurl, const char *conffile, const char *confdir,
382  Int_t loglevel, const char *alias, TProofMgr *mgr)
383  : fUrl(masterurl)
384 {
385  // Default initializations
386  InitMembers();
387 
388  // This may be needed during init
389  fManager = mgr;
390 
391  // Default server type
393 
394  // Default query mode
395  fQueryMode = kSync;
396 
397  // Parse the main URL, adjusting the missing fields and setting the relevant
398  // bits
401 
402  // Protocol and Host
403  if (!masterurl || strlen(masterurl) <= 0) {
404  fUrl.SetProtocol("proof");
405  fUrl.SetHost("__master__");
406  } else if (!(strstr(masterurl, "://"))) {
407  fUrl.SetProtocol("proof");
408  }
409  // Port
410  if (fUrl.GetPort() == TUrl(" ").GetPort())
411  fUrl.SetPort(TUrl("proof:// ").GetPort());
412 
413  // Make sure to store the FQDN, so to get a solid reference for subsequent checks
414  if (!strcmp(fUrl.GetHost(), "__master__"))
415  fMaster = fUrl.GetHost();
416  else if (!strlen(fUrl.GetHost()))
418  else
420 
421  // Server type
422  if (strlen(fUrl.GetOptions()) > 0) {
423  TString opts(fUrl.GetOptions());
424  if (!(strncmp(fUrl.GetOptions(),"std",3))) {
426  opts.Remove(0,3);
427  fUrl.SetOptions(opts.Data());
428  } else if (!(strncmp(fUrl.GetOptions(),"lite",4))) {
430  opts.Remove(0,4);
431  fUrl.SetOptions(opts.Data());
432  }
433  }
434 
435  // Instance type
439  if (fMaster == "__master__") {
440  fMasterServ = kTRUE;
443  } else if (fMaster == "prooflite") {
444  // Client and master are merged
445  fMasterServ = kTRUE;
447  }
448  // Flag that we are a client
450  if (!gSystem->Getenv("ROOTPROOFCLIENT")) gSystem->Setenv("ROOTPROOFCLIENT","");
451 
452  Init(masterurl, conffile, confdir, loglevel, alias);
453 
454  // If the user was not set, get it from the master
455  if (strlen(fUrl.GetUser()) <= 0) {
456  TString usr, emsg;
457  if (Exec("gProofServ->GetUser()", "0", kTRUE) == 0) {
458  TObjString *os = fMacroLog.GetLineWith("const char");
459  if (os) {
460  Ssiz_t fst = os->GetString().First('\"');
461  Ssiz_t lst = os->GetString().Last('\"');
462  usr = os->GetString()(fst+1, lst-fst-1);
463  } else {
464  emsg = "could not find 'const char *' string in macro log";
465  }
466  } else {
467  emsg = "could not retrieve user info";
468  }
469  if (!emsg.IsNull()) {
470  // Get user logon name
472  if (pw) {
473  usr = pw->fUser;
474  delete pw;
475  }
476  Warning("TProof", "%s: using local default %s", emsg.Data(), usr.Data());
477  }
478  // Set the user name in the main URL
479  fUrl.SetUser(usr.Data());
480  }
481 
482  // If called by a manager, make sure it stays in last position
483  // for cleaning
484  if (mgr) {
486  gROOT->GetListOfSockets()->Remove(mgr);
487  gROOT->GetListOfSockets()->Add(mgr);
488  }
489 
490  // Old-style server type: we add this to the list and set the global pointer
492  if (!gROOT->GetListOfProofs()->FindObject(this))
493  gROOT->GetListOfProofs()->Add(this);
494 
495  // Still needed by the packetizers: needs to be changed
496  gProof = this;
497 }
498 
499 ////////////////////////////////////////////////////////////////////////////////
500 /// Protected constructor to be used by classes deriving from TProof
501 /// (they have to call Init themselves and override StartSlaves
502 /// appropriately).
503 ///
504 /// This constructor does not close any previous gProof: it registers this
505 /// instance in the global list of PROOF sessions and sets gProof to it.
506 
507 TProof::TProof() : fUrl(""), fServType(TProofMgr::kXProofd)
508 {
509  // Default initializations
510  InitMembers();
511 
512  if (!gROOT->GetListOfProofs()->FindObject(this))
513  gROOT->GetListOfProofs()->Add(this);
514 
515  gProof = this;
516 }
517 
518 ////////////////////////////////////////////////////////////////////////////////
519 /// Default initializations
520 
522 {
523  fValid = kFALSE;
524  fTty = kFALSE;
525  fRecvMessages = 0;
526  fSlaveInfo = 0;
530  fLastPollWorkers_s = -1;
531  fActiveSlaves = 0;
532  fInactiveSlaves = 0;
533  fUniqueSlaves = 0;
534  fAllUniqueSlaves = 0;
535  fNonUniqueMasters = 0;
536  fActiveMonitor = 0;
537  fUniqueMonitor = 0;
538  fAllUniqueMonitor = 0;
539  fCurrentMonitor = 0;
540  fBytesRead = 0;
541  fRealTime = 0;
542  fCpuTime = 0;
543  fIntHandler = 0;
544  fProgressDialog = 0;
547  fPlayer = 0;
548  fFeedback = 0;
549  fChains = 0;
550  fDSet = 0;
551  fNotIdle = 0;
552  fSync = kTRUE;
554  fIsWaiting = kFALSE;
555  fRedirLog = kFALSE;
556  fLogFileW = 0;
557  fLogFileR = 0;
560  fMacroLog.SetName("ProofLogMacro");
561 
562  fWaitingSlaves = 0;
563  fQueries = 0;
564  fOtherQueries = 0;
565  fDrawQueries = 0;
566  fMaxDrawQueries = 1;
567  fSeqNum = 0;
568 
569  fSessionID = -1;
570  fEndMaster = kFALSE;
571 
572  fPackMgr = 0;
574 
575  fInputData = 0;
576 
577  fPrintProgress = 0;
578 
579  fLoadedMacros = 0;
580 
581  fProtocol = -1;
582  fSlaves = 0;
584  fBadSlaves = 0;
585  fAllMonitor = 0;
586  fDataReady = kFALSE;
587  fBytesReady = 0;
588  fTotalBytes = 0;
589  fAvailablePackages = 0;
590  fEnabledPackages = 0;
591  fRunningDSets = 0;
592 
593  fCollectTimeout = -1;
594 
595  fManager = 0;
596  fQueryMode = kSync;
598 
601  fMergers = 0;
602  fMergersCount = -1;
604  fWorkersToMerge = 0;
606 
607  fPerfTree = "";
608 
609  fWrksOutputReady = 0;
610 
611  fSelector = 0;
612 
613  fPrepTime = 0.;
614 
615  // Check if the user defined a list of environment variables to send over:
616  // include them into the dedicated list
617  if (gSystem->Getenv("PROOF_ENVVARS")) {
618  TString envs(gSystem->Getenv("PROOF_ENVVARS")), env, envsfound;
619  Int_t from = 0;
620  while (envs.Tokenize(env, from, ",")) {
621  if (!env.IsNull()) {
622  if (!gSystem->Getenv(env)) {
623  Warning("Init", "request for sending over undefined environemnt variable '%s' - ignoring", env.Data());
624  } else {
625  if (!envsfound.IsNull()) envsfound += ",";
626  envsfound += env;
627  TProof::DelEnvVar(env);
628  TProof::AddEnvVar(env, gSystem->Getenv(env));
629  }
630  }
631  }
632  if (envsfound.IsNull()) {
633  Warning("Init", "none of the requested env variables were found: '%s'", envs.Data());
634  } else {
635  Info("Init", "the following environment variables have been added to the list to be sent to the nodes: '%s'", envsfound.Data());
636  }
637  }
638 
639  // Done
640  return;
641 }
642 
643 ////////////////////////////////////////////////////////////////////////////////
644 /// Clean up PROOF environment.
645 
647 {
648  if (fChains) {
649  while (TChain *chain = dynamic_cast<TChain*> (fChains->First()) ) {
650  // remove "chain" from list
651  chain->SetProof(0);
652  RemoveChain(chain);
653  }
654  }
655 
656  // remove links to packages enabled on the client
657  if (TestBit(TProof::kIsClient)) {
658  // iterate over all packages
659  TList *epl = fPackMgr->GetListOfEnabled();
660  if (epl) {
661  TIter nxp(epl);
662  while (TObjString *pck = (TObjString *)(nxp())) {
663  FileStat_t stat;
664  if (gSystem->GetPathInfo(pck->String(), stat) == 0) {
665  // check if symlink, if so unlink
666  // NOTE: GetPathInfo() returns 1 in case of symlink that does not point to
667  // existing file or to a directory, but if fIsLink is true the symlink exists
668  if (stat.fIsLink)
669  gSystem->Unlink(pck->String());
670  }
671  }
672  epl->Delete();
673  delete epl;
674  }
675  }
676 
677  Close();
704  if (fWrksOutputReady) {
706  delete fWrksOutputReady;
707  }
708  if (fQueries) {
709  fQueries->Delete();
710  delete fQueries;
711  }
712 
713  // remove file with redirected logs
714  if (TestBit(TProof::kIsClient)) {
715  if (fLogFileR)
716  fclose(fLogFileR);
717  if (fLogFileW)
718  fclose(fLogFileW);
719  if (fLogFileName.Length() > 0)
721  }
722 
723  // Remove for the global list
724  gROOT->GetListOfProofs()->Remove(this);
725  // ... and from the manager list
726  if (fManager && fManager->IsValid())
727  fManager->DiscardSession(this);
728 
729  if (gProof && gProof == this) {
730  // Set previous as default
731  TIter pvp(gROOT->GetListOfProofs(), kIterBackward);
732  while ((gProof = (TProof *)pvp())) {
734  break;
735  }
736  }
737 
738  // For those interested in our destruction ...
739  Emit("~TProof()");
740  Emit("CloseWindow()");
741 }
742 
743 ////////////////////////////////////////////////////////////////////////////////
744 /// Start the PROOF environment. Starting PROOF involves either connecting
745 /// to a master server, which in turn will start a set of slave servers, or
746 /// directly starting as master server (if master = ""). For a description
747 /// of the arguments see the TProof ctor. Returns the number of started
748 /// master or slave servers, returns 0 in case of error, in which case
749 /// fValid remains false.
750 
751 Int_t TProof::Init(const char *, const char *conffile,
752  const char *confdir, Int_t loglevel, const char *alias)
753 {
755 
756  fValid = kFALSE;
757 
758  // Connected to terminal?
759  fTty = (isatty(0) == 0 || isatty(1) == 0) ? kFALSE : kTRUE;
760 
761  // If in attach mode, options is filled with additional info
762  Bool_t attach = kFALSE;
763  if (strlen(fUrl.GetOptions()) > 0) {
764  attach = kTRUE;
765  // A flag from the GUI
766  TString opts = fUrl.GetOptions();
767  if (opts.Contains("GUI")) {
769  opts.Remove(opts.Index("GUI"));
770  fUrl.SetOptions(opts);
771  }
772  }
773 
774  if (TestBit(TProof::kIsMaster)) {
775  // Fill default conf file and conf dir
776  if (!conffile || !conffile[0])
778  if (!confdir || !confdir[0])
780  // The group; the client receives it in the kPROOF_SESSIONTAG message
782  } else {
783  fConfDir = confdir;
784  fConfFile = conffile;
785  }
786 
787 // Analyse the conffile field
788  if (fConfFile.Contains("workers=0")) fConfFile.ReplaceAll("workers=0", "masteronly");
790 
792  fLogLevel = loglevel;
795  fImage = fMasterServ ? "" : "<local>";
796  fIntHandler = 0;
797  fStatus = 0;
798  fRecvMessages = new TList;
800  fSlaveInfo = 0;
801  fChains = new TList;
802  fAvailablePackages = 0;
803  fEnabledPackages = 0;
804  fRunningDSets = 0;
806  fInputData = 0;
808  fPrintProgress = 0;
809 
810  // Timeout for some collect actions
811  fCollectTimeout = gEnv->GetValue("Proof.CollectTimeout", -1);
812 
813  // Should the workers be started dynamically; default: no
814  fDynamicStartup = gEnv->GetValue("Proof.DynamicStartup", kFALSE);
815 
816  // Default entry point for the data pool is the master
818  fDataPoolUrl.Form("root://%s", fMaster.Data());
819  else
820  fDataPoolUrl = "";
821 
822  fProgressDialog = 0;
824 
825  // Default alias is the master name
826  TString al = (alias) ? alias : fMaster.Data();
827  SetAlias(al);
828 
829  // Client logging of messages from the master and slaves
830  fRedirLog = kFALSE;
831  if (TestBit(TProof::kIsClient)) {
832  fLogFileName.Form("%s/ProofLog_%d", gSystem->TempDirectory(), gSystem->GetPid());
833  if ((fLogFileW = fopen(fLogFileName, "w")) == 0)
834  Error("Init", "could not create temporary logfile");
835  if ((fLogFileR = fopen(fLogFileName, "r")) == 0)
836  Error("Init", "could not open temp logfile for reading");
837  }
839 
840  // Status of cluster
841  fNotIdle = 0;
842  // Query type
843  fSync = (attach) ? kFALSE : kTRUE;
844  // Not enqueued
845  fIsWaiting = kFALSE;
846 
847  // Counters
848  fBytesRead = 0;
849  fRealTime = 0;
850  fCpuTime = 0;
851 
852  // List of queries
853  fQueries = 0;
854  fOtherQueries = 0;
855  fDrawQueries = 0;
856  fMaxDrawQueries = 1;
857  fSeqNum = 0;
858 
859  // Remote ID of the session
860  fSessionID = -1;
861 
862  // Part of active query
863  fWaitingSlaves = 0;
864 
865  // Make remote PROOF player
866  fPlayer = 0;
867  MakePlayer();
868 
869  fFeedback = new TList;
870  fFeedback->SetOwner();
871  fFeedback->SetName("FeedbackList");
873 
874  // sort slaves by descending performance index
876  fActiveSlaves = new TList;
877  fInactiveSlaves = new TList;
878  fUniqueSlaves = new TList;
879  fAllUniqueSlaves = new TList;
880  fNonUniqueMasters = new TList;
881  fBadSlaves = new TList;
882  fAllMonitor = new TMonitor;
883  fActiveMonitor = new TMonitor;
884  fUniqueMonitor = new TMonitor;
886  fCurrentMonitor = 0;
887 
890 
891  fLoadedMacros = 0;
892  fPackMgr = 0;
893 
894  // Enable optimized sending of streamer infos to use embedded backward/forward
895  // compatibility support between different ROOT versions and different versions of
896  // users classes
897  Bool_t enableSchemaEvolution = gEnv->GetValue("Proof.SchemaEvolution",1);
898  if (enableSchemaEvolution) {
900  } else {
901  Info("TProof", "automatic schema evolution in TMessage explicitly disabled");
902  }
903 
904  if (IsMaster()) {
905  // to make UploadPackage() method work on the master as well.
907  } else {
908 
909  TString sandbox;
910  if (GetSandbox(sandbox, kTRUE) != 0) {
911  Error("Init", "failure asserting sandbox directory %s", sandbox.Data());
912  return 0;
913  }
914 
915  // Package Dir
916  TString packdir = gEnv->GetValue("Proof.PackageDir", "");
917  if (packdir.IsNull())
918  packdir.Form("%s/%s", sandbox.Data(), kPROOF_PackDir);
919  if (AssertPath(packdir, kTRUE) != 0) {
920  Error("Init", "failure asserting directory %s", packdir.Data());
921  return 0;
922  }
923  fPackMgr = new TPackMgr(packdir);
924  if (gDebug > 0)
925  Info("Init", "package directory set to %s", packdir.Data());
926  }
927 
928  if (!IsMaster()) {
929  // List of directories where to look for global packages
930  TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
931  TProofServ::ResolveKeywords(globpack);
932  Int_t nglb = TPackMgr::RegisterGlobalPath(globpack);
933  if (gDebug > 0)
934  Info("Init", " %d global package directories registered", nglb);
935  }
936 
937  // Master may want dynamic startup
938  if (fDynamicStartup) {
939  if (!IsMaster()) {
940  // If on client - start the master
941  if (!StartSlaves(attach))
942  return 0;
943  }
944  } else {
945 
946  // Master Only mode (for operations requiring only the master, e.g. dataset browsing,
947  // result retrieving, ...)
948  Bool_t masterOnly = gEnv->GetValue("Proof.MasterOnly", kFALSE);
949  if (!IsMaster() || !masterOnly) {
950  // Start slaves (the old, static, per-session way)
951  if (!StartSlaves(attach))
952  return 0;
953  // Client: Is Master in dynamic startup mode?
954  if (!IsMaster()) {
955  Int_t dyn = 0;
956  GetRC("Proof.DynamicStartup", dyn);
957  if (dyn != 0) fDynamicStartup = kTRUE;
958  }
959  }
960  }
961  // we are now properly initialized
962  fValid = kTRUE;
963 
964  // De-activate monitor (will be activated in Collect)
966 
967  // By default go into parallel mode
968  Int_t nwrk = GetRemoteProtocol() > 35 ? -1 : 9999;
969  TNamed *n = 0;
970  if (TProof::GetEnvVars() &&
971  (n = (TNamed *) TProof::GetEnvVars()->FindObject("PROOF_NWORKERS"))) {
972  TString s(n->GetTitle());
973  if (s.IsDigit()) nwrk = s.Atoi();
974  }
975  GoParallel(nwrk, attach);
976 
977  // Send relevant initial state to slaves
978  if (!attach)
980  else if (!IsIdle())
981  // redirect log
982  fRedirLog = kTRUE;
983 
984  // Done at this point, the alias will be communicated to the coordinator, if any
986  SetAlias(al);
987 
988  SetActive(kFALSE);
989 
990  if (IsValid()) {
991 
992  // Activate input handler
994 
996  gROOT->GetListOfSockets()->Add(this);
997  }
998 
999  AskParallel();
1000 
1001  return fActiveSlaves->GetSize();
1002 }
1003 
1004 ////////////////////////////////////////////////////////////////////////////////
1005 /// Set the sandbox path from 'Proof.Sandbox' or from the alternative variable 'rc'.
1006 /// Use the existing setting or the default if nothing is found.
1007 /// If 'assert' is kTRUE, make also sure that the path exists.
1008 /// Return 0 on success, -1 on failure
1009 
1010 Int_t TProof::GetSandbox(TString &sb, Bool_t assert, const char *rc)
1011 {
1012  // Get it from 'rc', if defined
1013  if (rc && strlen(rc)) sb = gEnv->GetValue(rc, sb);
1014  // Or use the default 'rc'
1015  if (sb.IsNull()) sb = gEnv->GetValue("Proof.Sandbox", "");
1016  // If nothing found , use the default
1017  if (sb.IsNull()) sb.Form("~/%s", kPROOF_WorkDir);
1018  // Expand special settings
1019  if (sb == ".") {
1020  sb = gSystem->pwd();
1021  } else if (sb == "..") {
1022  sb = gSystem->GetDirName(gSystem->pwd());
1023  }
1024  gSystem->ExpandPathName(sb);
1025 
1026  // Assert the path, if required
1027  if (assert && AssertPath(sb, kTRUE) != 0) return -1;
1028  // Done
1029  return 0;
1030 }
1031 
1032 ////////////////////////////////////////////////////////////////////////////////
1033 /// The config file field may contain special instructions which need to be
1034 /// parsed at the beginning, e.g. for debug runs with valgrind.
1035 /// Several options can be given separated by a ','
1036 
1037 void TProof::ParseConfigField(const char *config)
1038 {
1039  TString sconf(config), opt;
1040  Ssiz_t from = 0;
1041  Bool_t cpuPin = kFALSE;
1042 
1043 // Analyse the field
1044  const char *cq = (IsLite()) ? "\"" : "";
1045  while (sconf.Tokenize(opt, from, ",")) {
1046  if (opt.IsNull()) continue;
1047 
1048  if (opt.BeginsWith("valgrind")) {
1049  // Any existing valgrind setting? User can give full settings, which we fully respect,
1050  // or pass additional options for valgrind by prefixing 'valgrind_opts:'. For example,
1051  // TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", "valgrind_opts:--time-stamp --leak-check=full"
1052  // will add option "--time-stamp --leak-check=full" to our default options
1053  TString mst, top, sub, wrk, all;
1054  TList *envs = fgProofEnvList;
1055  TNamed *n = 0;
1056  if (envs) {
1057  if ((n = (TNamed *) envs->FindObject("PROOF_WRAPPERCMD")))
1058  all = n->GetTitle();
1059  if ((n = (TNamed *) envs->FindObject("PROOF_MASTER_WRAPPERCMD")))
1060  mst = n->GetTitle();
1061  if ((n = (TNamed *) envs->FindObject("PROOF_TOPMASTER_WRAPPERCMD")))
1062  top = n->GetTitle();
1063  if ((n = (TNamed *) envs->FindObject("PROOF_SUBMASTER_WRAPPERCMD")))
1064  sub = n->GetTitle();
1065  if ((n = (TNamed *) envs->FindObject("PROOF_SLAVE_WRAPPERCMD")))
1066  wrk = n->GetTitle();
1067  }
1068  if (all != "" && mst == "") mst = all;
1069  if (all != "" && top == "") top = all;
1070  if (all != "" && sub == "") sub = all;
1071  if (all != "" && wrk == "") wrk = all;
1072  if (all != "" && all.BeginsWith("valgrind_opts:")) {
1073  // The field is used to add an option Reset the setting
1074  Info("ParseConfigField","valgrind run: resetting 'PROOF_WRAPPERCMD':"
1075  " must be set again for next run , if any");
1076  TProof::DelEnvVar("PROOF_WRAPPERCMD");
1077  }
1078  TString var, cmd;
1079  cmd.Form("%svalgrind -v --suppressions=<rootsys>/etc/valgrind-root.supp", cq);
1080  TString mstlab("NO"), wrklab("NO");
1081  Bool_t doMaster = (opt == "valgrind" || (opt.Contains("master") &&
1082  !opt.Contains("topmaster") && !opt.Contains("submaster")))
1083  ? kTRUE : kFALSE;
1084  if (doMaster) {
1085  if (!IsLite()) {
1086  // Check if we have to add a var
1087  if (mst == "" || mst.BeginsWith("valgrind_opts:")) {
1088  mst.ReplaceAll("valgrind_opts:","");
1089  var.Form("%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), mst.Data());
1090  TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", var);
1091  mstlab = "YES";
1092  } else if (mst != "") {
1093  mstlab = "YES";
1094  }
1095  } else {
1096  if (opt.Contains("master")) {
1097  Warning("ParseConfigField",
1098  "master valgrinding does not make sense for PROOF-Lite: ignoring");
1099  opt.ReplaceAll("master", "");
1100  if (!opt.Contains("workers")) return;
1101  }
1102  if (opt == "valgrind" || opt == "valgrind=") opt = "valgrind=workers";
1103  }
1104  }
1105  if (opt.Contains("topmaster")) {
1106  // Check if we have to add a var
1107  if (top == "" || top.BeginsWith("valgrind_opts:")) {
1108  top.ReplaceAll("valgrind_opts:","");
1109  var.Form("%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), top.Data());
1110  TProof::AddEnvVar("PROOF_TOPMASTER_WRAPPERCMD", var);
1111  mstlab = "YES";
1112  } else if (top != "") {
1113  mstlab = "YES";
1114  }
1115  }
1116  if (opt.Contains("submaster")) {
1117  // Check if we have to add a var
1118  if (sub == "" || sub.BeginsWith("valgrind_opts:")) {
1119  sub.ReplaceAll("valgrind_opts:","");
1120  var.Form("%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), sub.Data());
1121  TProof::AddEnvVar("PROOF_SUBMASTER_WRAPPERCMD", var);
1122  mstlab = "YES";
1123  } else if (sub != "") {
1124  mstlab = "YES";
1125  }
1126  }
1127  if (opt.Contains("=workers") || opt.Contains("+workers")) {
1128  // Check if we have to add a var
1129  if (wrk == "" || wrk.BeginsWith("valgrind_opts:")) {
1130  wrk.ReplaceAll("valgrind_opts:","");
1131  var.Form("%s --log-file=<logfilewrk>.__valgrind__.log %s%s", cmd.Data(), wrk.Data(), cq);
1132  TProof::AddEnvVar("PROOF_SLAVE_WRAPPERCMD", var);
1133  TString nwrks("2");
1134  Int_t inw = opt.Index('#');
1135  if (inw != kNPOS) {
1136  nwrks = opt(inw+1, opt.Length());
1137  if (!nwrks.IsDigit()) nwrks = "2";
1138  }
1139  // Set the relevant variables
1140  if (!IsLite()) {
1141  TProof::AddEnvVar("PROOF_NWORKERS", nwrks);
1142  } else {
1143  gEnv->SetValue("ProofLite.Workers", nwrks.Atoi());
1144  }
1145  wrklab = nwrks;
1146  // Register the additional worker log in the session file
1147  // (for the master this is done automatically)
1148  TProof::AddEnvVar("PROOF_ADDITIONALLOG", "__valgrind__.log*");
1149  } else if (wrk != "") {
1150  wrklab = "ALL";
1151  }
1152  }
1153  // Increase the relevant timeouts
1154  if (!IsLite()) {
1155  TProof::AddEnvVar("PROOF_INTWAIT", "5000");
1156  gEnv->SetValue("Proof.SocketActivityTimeout", 6000);
1157  } else {
1158  gEnv->SetValue("ProofLite.StartupTimeOut", 5000);
1159  }
1160  // Warn for slowness
1161  Printf(" ");
1162  if (!IsLite()) {
1163  Printf(" ---> Starting a debug run with valgrind (master:%s, workers:%s)", mstlab.Data(), wrklab.Data());
1164  } else {
1165  Printf(" ---> Starting a debug run with valgrind (workers:%s)", wrklab.Data());
1166  }
1167  Printf(" ---> Please be patient: startup may be VERY slow ...");
1168  Printf(" ---> Logs will be available as special tags in the log window (from the progress dialog or TProof::LogViewer()) ");
1169  Printf(" ---> (Reminder: this debug run makes sense only if you are running a debug version of ROOT)");
1170  Printf(" ");
1171 
1172  } else if (opt.BeginsWith("igprof-pp")) {
1173 
1174  // IgProf profiling on master and worker. PROOF does not set the
1175  // environment for you: proper environment variables (like PATH and
1176  // LD_LIBRARY_PATH) should be set externally
1177 
1178  Printf("*** Requested IgProf performance profiling ***");
1179  TString addLogExt = "__igprof.pp__.log";
1180  TString addLogFmt = "igprof -pk -pp -t proofserv.exe -o %s.%s";
1181  TString tmp;
1182 
1183  if (IsLite()) {
1184  addLogFmt.Append("\"");
1185  addLogFmt.Prepend("\"");
1186  }
1187 
1188  tmp.Form(addLogFmt.Data(), "<logfilemst>", addLogExt.Data());
1189  TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", tmp.Data());
1190 
1191  tmp.Form(addLogFmt.Data(), "<logfilewrk>", addLogExt.Data());
1192  TProof::AddEnvVar("PROOF_SLAVE_WRAPPERCMD", tmp.Data() );
1193 
1194  TProof::AddEnvVar("PROOF_ADDITIONALLOG", addLogExt.Data());
1195 
1196  } else if (opt.BeginsWith("cpupin=")) {
1197  // Enable CPU pinning. Takes as argument the list of processor IDs
1198  // that will be used in order. Processor IDs are numbered from 0,
1199  // use likwid to see how they are organized. A possible parameter
1200  // format would be:
1201  //
1202  // cpupin=3+4+0+9+10+22+7
1203  //
1204  // Only the specified processor IDs will be used in a round-robin
1205  // fashion, dealing with the fact that you can request more workers
1206  // than the number of processor IDs you have specified.
1207  //
1208  // To use all available processors in their order:
1209  //
1210  // cpupin=*
1211 
1212  opt.Remove(0, 7);
1213 
1214  // Remove any char which is neither a number nor a plus '+'
1215  for (Ssiz_t i=0; i<opt.Length(); i++) {
1216  Char_t c = opt[i];
1217  if ((c != '+') && ((c < '0') || (c > '9')))
1218  opt[i] = '_';
1219  }
1220  opt.ReplaceAll("_", "");
1221  TProof::AddEnvVar("PROOF_SLAVE_CPUPIN_ORDER", opt);
1222  cpuPin = kTRUE;
1223  } else if (opt.BeginsWith("workers=")) {
1224 
1225  // Request for a given number of workers (within the max) or worker
1226  // startup combination:
1227  // workers=5 start max 5 workers (or less, if less are assigned)
1228  // workers=2x start max 2 workers per node (or less, if less are assigned)
1229  opt.ReplaceAll("workers=","");
1230  TProof::AddEnvVar("PROOF_NWORKERS", opt);
1231  }
1232  }
1233 
1234  // In case of PROOF-Lite, enable CPU pinning when requested (Linux only)
1235  #ifdef R__LINUX
1236  if (IsLite() && cpuPin) {
1237  Printf("*** Requested CPU pinning ***");
1238  const TList *ev = GetEnvVars();
1239  const char *pinCmd = "taskset -c <cpupin>";
1240  TString val;
1241  TNamed *p;
1242  if (ev && (p = dynamic_cast<TNamed *>(ev->FindObject("PROOF_SLAVE_WRAPPERCMD")))) {
1243  val = p->GetTitle();
1244  val.Insert(val.Length()-1, " ");
1245  val.Insert(val.Length()-1, pinCmd);
1246  }
1247  else {
1248  val.Form("\"%s\"", pinCmd);
1249  }
1250  TProof::AddEnvVar("PROOF_SLAVE_WRAPPERCMD", val.Data());
1251  }
1252  #endif
1253 }
1254 
1255 ////////////////////////////////////////////////////////////////////////////////
1256 /// Make sure that 'path' exists; if 'writable' is kTRUE, make also sure
1257 /// that the path is writable
1258 
1259 Int_t TProof::AssertPath(const char *inpath, Bool_t writable)
1260 {
1261  if (!inpath || strlen(inpath) <= 0) {
1262  Error("AssertPath", "undefined input path");
1263  return -1;
1264  }
1265 
1266  TString path(inpath);
1267  gSystem->ExpandPathName(path);
1268 
1269  if (gSystem->AccessPathName(path, kFileExists)) {
1270  if (gSystem->mkdir(path, kTRUE) != 0) {
1271  Error("AssertPath", "could not create path %s", path.Data());
1272  return -1;
1273  }
1274  }
1275  // It must be writable
1276  if (gSystem->AccessPathName(path, kWritePermission) && writable) {
1277  if (gSystem->Chmod(path, 0666) != 0) {
1278  Error("AssertPath", "could not make path %s writable", path.Data());
1279  return -1;
1280  }
1281  }
1282 
1283  // Done
1284  return 0;
1285 }
1286 
1287 ////////////////////////////////////////////////////////////////////////////////
1288 /// Set manager and schedule its destruction after this for clean
1289 /// operations.
1290 
1292 {
1293  fManager = mgr;
1294 
1295  if (mgr) {
1297  gROOT->GetListOfSockets()->Remove(mgr);
1298  gROOT->GetListOfSockets()->Add(mgr);
1299  }
1300 }
1301 
1302 ////////////////////////////////////////////////////////////////////////////////
1303 /// Works on the master node only.
1304 /// It starts workers on the machines in workerList and sets the paths,
1305 /// packages and macros as on the master.
1306 /// It is a substitute for StartSlaves(...).
1307 /// The code is mostly the master part of StartSlaves,
1308 /// with the parallel startup removed.
1309 
1311 {
1312  if (!IsMaster()) {
1313  Error("AddWorkers", "AddWorkers can only be called on the master!");
1314  return -1;
1315  }
1316 
1317  if (!workerList || !(workerList->GetSize())) {
1318  Error("AddWorkers", "empty list of workers!");
1319  return -2;
1320  }
1321 
1322  // Code taken from master part of StartSlaves with the parllel part removed
1323 
1324  fImage = gProofServ->GetImage();
1325  if (fImage.IsNull())
1327 
1328  // Get all workers
1329  UInt_t nSlaves = workerList->GetSize();
1330  UInt_t nSlavesDone = 0;
1331  Int_t ord = 0;
1332 
1333  // Loop over all new workers and start them (if we had already workers it means we are
1334  // increasing parallelism or that is not the first time we are called)
1335  Bool_t goMoreParallel = (fSlaves->GetEntries() > 0) ? kTRUE : kFALSE;
1336 
1337  // A list of TSlave objects for workers that are being added
1338  TList *addedWorkers = new TList();
1339  if (!addedWorkers) {
1340  // This is needed to silence Coverity ...
1341  Error("AddWorkers", "cannot create new list for the workers to be added");
1342  return -2;
1343  }
1344  addedWorkers->SetOwner(kFALSE);
1345  TListIter next(workerList);
1346  TObject *to;
1347  TProofNodeInfo *worker;
1348  TSlaveInfo *dummysi = new TSlaveInfo();
1349  while ((to = next())) {
1350  // Get the next worker from the list
1351  worker = (TProofNodeInfo *)to;
1352 
1353  // Read back worker node info
1354  const Char_t *image = worker->GetImage().Data();
1355  const Char_t *workdir = worker->GetWorkDir().Data();
1356  Int_t perfidx = worker->GetPerfIndex();
1357  Int_t sport = worker->GetPort();
1358  if (sport == -1)
1359  sport = fUrl.GetPort();
1360 
1361  // Create worker server
1362  TString fullord;
1363  if (worker->GetOrdinal().Length() > 0) {
1364  fullord.Form("%s.%s", gProofServ->GetOrdinal(), worker->GetOrdinal().Data());
1365  } else {
1366  fullord.Form("%s.%d", gProofServ->GetOrdinal(), ord);
1367  }
1368 
1369  // Remove worker from the list of workers terminated gracefully
1370  dummysi->SetOrdinal(fullord);
1371  TSlaveInfo *rmsi = (TSlaveInfo *)fTerminatedSlaveInfos->Remove(dummysi);
1372  SafeDelete(rmsi);
1373 
1374  // Create worker server
1375  TString wn(worker->GetNodeName());
1376  if (wn == "localhost" || wn.BeginsWith("localhost.")) wn = gSystem->HostName();
1377  TUrl u(TString::Format("%s:%d", wn.Data(), sport));
1378  // Add group info in the password firdl, if any
1379  if (strlen(gProofServ->GetGroup()) > 0) {
1380  // Set also the user, otherwise the password is not exported
1381  if (strlen(u.GetUser()) <= 0)
1382  u.SetUser(gProofServ->GetUser());
1384  }
1385  TSlave *slave = 0;
1386  if (worker->IsWorker()) {
1387  slave = CreateSlave(u.GetUrl(), fullord, perfidx, image, workdir);
1388  } else {
1389  slave = CreateSubmaster(u.GetUrl(), fullord,
1390  image, worker->GetMsd(), worker->GetNWrks());
1391  }
1392 
1393  // Add to global list (we will add to the monitor list after
1394  // finalizing the server startup)
1395  Bool_t slaveOk = kTRUE;
1396  fSlaves->Add(slave);
1397  if (slave->IsValid()) {
1398  addedWorkers->Add(slave);
1399  } else {
1400  slaveOk = kFALSE;
1401  fBadSlaves->Add(slave);
1402  Warning("AddWorkers", "worker '%s' is invalid", slave->GetOrdinal());
1403  }
1404 
1405  PDB(kGlobal,3)
1406  Info("AddWorkers", "worker on host %s created"
1407  " and added to list (ord: %s)", worker->GetName(), slave->GetOrdinal());
1408 
1409  // Notify opening of connection
1410  nSlavesDone++;
1412  m << TString("Opening connections to workers") << nSlaves
1413  << nSlavesDone << slaveOk;
1414  gProofServ->GetSocket()->Send(m);
1415 
1416  ord++;
1417  } //end of the worker loop
1418  SafeDelete(dummysi);
1419 
1420  // Cleanup
1421  SafeDelete(workerList);
1422 
1423  nSlavesDone = 0;
1424 
1425  // Here we finalize the server startup: in this way the bulk
1426  // of remote operations are almost parallelized
1427  TIter nxsl(addedWorkers);
1428  TSlave *sl = 0;
1429  while ((sl = (TSlave *) nxsl())) {
1430 
1431  // Finalize setup of the server
1432  if (sl->IsValid())
1433  sl->SetupServ(TSlave::kSlave, 0);
1434 
1435  // Monitor good slaves
1436  Bool_t slaveOk = kTRUE;
1437  if (sl->IsValid()) {
1438  fAllMonitor->Add(sl->GetSocket());
1439  PDB(kGlobal,3)
1440  Info("AddWorkers", "worker on host %s finalized"
1441  " and added to list", sl->GetOrdinal());
1442  } else {
1443  slaveOk = kFALSE;
1444  fBadSlaves->Add(sl);
1445  }
1446 
1447  // Notify end of startup operations
1448  nSlavesDone++;
1450  m << TString("Setting up worker servers") << nSlaves
1451  << nSlavesDone << slaveOk;
1452  gProofServ->GetSocket()->Send(m);
1453  }
1454 
1455  // Now set new state on the added workers (on all workers for simplicity)
1456  // use fEnabledPackages, fLoadedMacros,
1457  // gSystem->GetDynamicPath() and gSystem->GetIncludePath()
1458  // no need to load packages that are only loaded and not enabled (dyn mode)
1459  Int_t nwrk = GetRemoteProtocol() > 35 ? -1 : 9999;
1460  TNamed *n = 0;
1461  if (TProof::GetEnvVars() &&
1462  (n = (TNamed *) TProof::GetEnvVars()->FindObject("PROOF_NWORKERS"))) {
1463  TString s(n->GetTitle());
1464  if (s.IsDigit()) nwrk = s.Atoi();
1465  }
1466 
1467  if (fDynamicStartup && goMoreParallel) {
1468 
1469  PDB(kGlobal, 3)
1470  Info("AddWorkers", "will invoke GoMoreParallel()");
1471  Int_t nw = GoMoreParallel(nwrk);
1472  PDB(kGlobal, 3)
1473  Info("AddWorkers", "GoMoreParallel()=%d", nw);
1474 
1475  }
1476  else {
1477  // Not in Dynamic Workers mode
1478  PDB(kGlobal, 3)
1479  Info("AddWorkers", "will invoke GoParallel()");
1480  GoParallel(nwrk, kFALSE, 0);
1481  }
1482 
1483  // Set worker processing environment
1484  SetupWorkersEnv(addedWorkers, goMoreParallel);
1485 
1486  // Update list of current workers
1487  PDB(kGlobal, 3)
1488  Info("AddWorkers", "will invoke SaveWorkerInfo()");
1489  SaveWorkerInfo();
1490 
1491  // Inform the client that the number of workers has changed
1492  if (fDynamicStartup && gProofServ) {
1493  PDB(kGlobal, 3)
1494  Info("AddWorkers", "will invoke SendParallel()");
1496 
1497  if (goMoreParallel && fPlayer) {
1498  // In case we are adding workers dynamically to an existing process, we
1499  // should invoke a special player's Process() to set only added workers
1500  // to the proper state
1501  PDB(kGlobal, 3)
1502  Info("AddWorkers", "will send the PROCESS message to selected workers");
1503  fPlayer->JoinProcess(addedWorkers);
1504  // Update merger counters (new workers are not yet active)
1505  fMergePrg.SetNWrks(fActiveSlaves->GetSize() + addedWorkers->GetSize());
1506  }
1507  }
1508 
1509  // Cleanup
1510  delete addedWorkers;
1511 
1512  return 0;
1513 }
1514 
1515 ////////////////////////////////////////////////////////////////////////////////
1516 /// Set up packages, loaded macros, include and lib paths ...
1517 
1518 void TProof::SetupWorkersEnv(TList *addedWorkers, Bool_t increasingWorkers)
1519 {
1520  TList *server_packs = gProofServ ? gProofServ->GetEnabledPackages() : nullptr;
1521  // Packages
1522  TList *packs = server_packs ? server_packs : GetEnabledPackages();
1523  if (packs && packs->GetSize() > 0) {
1524  TIter nxp(packs);
1525  TPair *pck = 0;
1526  while ((pck = (TPair *) nxp())) {
1527  // Upload and Enable methods are intelligent and avoid
1528  // re-uploading or re-enabling of a package to a node that has it.
1529  if (fDynamicStartup && increasingWorkers) {
1530  // Upload only on added workers
1531  PDB(kGlobal, 3)
1532  Info("SetupWorkersEnv", "will invoke UploadPackage() and EnablePackage() on added workers");
1533  if (UploadPackage(pck->GetName(), kUntar, addedWorkers) >= 0)
1534  EnablePackage(pck->GetName(), (TList *) pck->Value(), kTRUE, addedWorkers);
1535  } else {
1536  PDB(kGlobal, 3)
1537  Info("SetupWorkersEnv", "will invoke UploadPackage() and EnablePackage() on all workers");
1538  if (UploadPackage(pck->GetName()) >= 0)
1539  EnablePackage(pck->GetName(), (TList *) pck->Value(), kTRUE);
1540  }
1541  }
1542  }
1543 
1544  if (server_packs) {
1545  server_packs->Delete();
1546  delete server_packs;
1547  }
1548 
1549  // Loaded macros
1550  if (fLoadedMacros) {
1551  TIter nxp(fLoadedMacros);
1552  TObjString *os = 0;
1553  while ((os = (TObjString *) nxp())) {
1554  PDB(kGlobal, 3) {
1555  Info("SetupWorkersEnv", "will invoke Load() on selected workers");
1556  Printf("Loading a macro : %s", os->GetName());
1557  }
1558  Load(os->GetName(), kTRUE, kTRUE, addedWorkers);
1559  }
1560  }
1561 
1562  // Dynamic path
1563  TString dyn = gSystem->GetDynamicPath();
1564  dyn.ReplaceAll(":", " ");
1565  dyn.ReplaceAll("\"", " ");
1566  PDB(kGlobal, 3)
1567  Info("SetupWorkersEnv", "will invoke AddDynamicPath() on selected workers");
1568  AddDynamicPath(dyn, kFALSE, addedWorkers, kFALSE); // Do not Collect
1569 
1570  // Include path
1571  TString inc = gSystem->GetIncludePath();
1572  inc.ReplaceAll("-I", " ");
1573  inc.ReplaceAll("\"", " ");
1574  PDB(kGlobal, 3)
1575  Info("SetupWorkersEnv", "will invoke AddIncludePath() on selected workers");
1576  AddIncludePath(inc, kFALSE, addedWorkers, kFALSE); // Do not Collect
1577 
1578  // Done
1579  return;
1580 }
1581 
1582 ////////////////////////////////////////////////////////////////////////////////
1583 /// Used for shutting down the workers after a query is finished.
1584 /// Sends each of the workers in workerList a kPROOF_STOP message.
1585 /// If workerList is null, shut down all the workers.
1586 
1588 {
1589  if (!IsMaster()) {
1590  Error("RemoveWorkers", "RemoveWorkers can only be called on the master!");
1591  return -1;
1592  }
1593 
1594  fFileMap.clear(); // This could be avoided if CopyFromCache was used in SendFile
1595 
1596  if (!workerList) {
1597  // shutdown all the workers
1598  TIter nxsl(fSlaves);
1599  TSlave *sl = 0;
1600  while ((sl = (TSlave *) nxsl())) {
1601  // Shut down the worker assumig that it is not processing
1602  TerminateWorker(sl);
1603  }
1604 
1605  } else {
1606  if (!(workerList->GetSize())) {
1607  Error("RemoveWorkers", "The list of workers should not be empty!");
1608  return -2;
1609  }
1610 
1611  // Loop over all the workers and stop them
1612  TListIter next(workerList);
1613  TObject *to;
1614  TProofNodeInfo *worker;
1615  while ((to = next())) {
1616  TSlave *sl = 0;
1617  if (!strcmp(to->ClassName(), "TProofNodeInfo")) {
1618  // Get the next worker from the list
1619  worker = (TProofNodeInfo *)to;
1620  TIter nxsl(fSlaves);
1621  while ((sl = (TSlave *) nxsl())) {
1622  // Shut down the worker assumig that it is not processing
1623  if (sl->GetName() == worker->GetNodeName())
1624  break;
1625  }
1626  } else if (to->InheritsFrom(TSlave::Class())) {
1627  sl = (TSlave *) to;
1628  } else {
1629  Warning("RemoveWorkers","unknown object type: %s - it should be"
1630  " TProofNodeInfo or inheriting from TSlave", to->ClassName());
1631  }
1632  // Shut down the worker assumig that it is not processing
1633  if (sl) {
1634  if (gDebug > 0)
1635  Info("RemoveWorkers","terminating worker %s", sl->GetOrdinal());
1636  TerminateWorker(sl);
1637  }
1638  }
1639  }
1640 
1641  // Update also the master counter
1642  if (gProofServ && fSlaves->GetSize() <= 0) gProofServ->ReleaseWorker("master");
1643 
1644  return 0;
1645 }
1646 
1647 ////////////////////////////////////////////////////////////////////////////////
1648 /// Start up PROOF slaves.
1649 
1651 {
1652  // If this is a master server, find the config file and start slave
1653  // servers as specified in the config file
1654  if (TestBit(TProof::kIsMaster)) {
1655 
1656  Int_t pc = 0;
1657  TList *workerList = new TList;
1658  // Get list of workers
1659  if (gProofServ->GetWorkers(workerList, pc) == TProofServ::kQueryStop) {
1660  TString emsg("no resource currently available for this session: please retry later");
1661  if (gDebug > 0) Info("StartSlaves", "%s", emsg.Data());
1662  gProofServ->SendAsynMessage(emsg.Data());
1663  return kFALSE;
1664  }
1665  // Setup the workers
1666  if (AddWorkers(workerList) < 0)
1667  return kFALSE;
1668 
1669  } else {
1670 
1671  // create master server
1672  Printf("Starting master: opening connection ...");
1673  TSlave *slave = CreateSubmaster(fUrl.GetUrl(), "0", "master", 0);
1674 
1675  if (slave->IsValid()) {
1676 
1677  // Notify
1678  fprintf(stderr,"Starting master:"
1679  " connection open: setting up server ... \r");
1680  StartupMessage("Connection to master opened", kTRUE, 1, 1);
1681 
1682  if (!attach) {
1683 
1684  // Set worker interrupt handler
1685  slave->SetInterruptHandler(kTRUE);
1686 
1687  // Finalize setup of the server
1689 
1690  if (slave->IsValid()) {
1691 
1692  // Notify
1693  Printf("Starting master: OK ");
1694  StartupMessage("Master started", kTRUE, 1, 1);
1695 
1696  // check protocol compatibility
1697  // protocol 1 is not supported anymore
1698  if (fProtocol == 1) {
1699  Error("StartSlaves",
1700  "client and remote protocols not compatible (%d and %d)",
1702  slave->Close("S");
1703  delete slave;
1704  return kFALSE;
1705  }
1706 
1707  fSlaves->Add(slave);
1708  fAllMonitor->Add(slave->GetSocket());
1709 
1710  // Unset worker interrupt handler
1711  slave->SetInterruptHandler(kFALSE);
1712 
1713  // Set interrupt PROOF handler from now on
1714  fIntHandler = new TProofInterruptHandler(this);
1715 
1716  // Give-up after 5 minutes
1717  Int_t rc = Collect(slave, 300);
1718  Int_t slStatus = slave->GetStatus();
1719  if (slStatus == -99 || slStatus == -98 || rc == 0) {
1720  fSlaves->Remove(slave);
1721  fAllMonitor->Remove(slave->GetSocket());
1722  if (slStatus == -99)
1723  Error("StartSlaves", "no resources available or problems setting up workers (check logs)");
1724  else if (slStatus == -98)
1725  Error("StartSlaves", "could not setup output redirection on master");
1726  else
1727  Error("StartSlaves", "setting up master");
1728  slave->Close("S");
1729  delete slave;
1730  return 0;
1731  }
1732 
1733  if (!slave->IsValid()) {
1734  fSlaves->Remove(slave);
1735  fAllMonitor->Remove(slave->GetSocket());
1736  slave->Close("S");
1737  delete slave;
1738  Error("StartSlaves",
1739  "failed to setup connection with PROOF master server");
1740  return kFALSE;
1741  }
1742 
1743  if (!gROOT->IsBatch() && TestBit(kUseProgressDialog)) {
1744  if ((fProgressDialog =
1745  gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
1746  if (fProgressDialog->LoadPlugin() == -1)
1747  fProgressDialog = 0;
1748  }
1749  } else {
1750  // Notify
1751  Printf("Starting master: failure");
1752  }
1753  } else {
1754 
1755  // Notify
1756  Printf("Starting master: OK ");
1757  StartupMessage("Master attached", kTRUE, 1, 1);
1758 
1759  if (!gROOT->IsBatch() && TestBit(kUseProgressDialog)) {
1760  if ((fProgressDialog =
1761  gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
1762  if (fProgressDialog->LoadPlugin() == -1)
1763  fProgressDialog = 0;
1764  }
1765 
1766  fSlaves->Add(slave);
1767  fIntHandler = new TProofInterruptHandler(this);
1768  }
1769 
1770  } else {
1771  delete slave;
1772  // Notify only if verbosity is on: most likely the failure has already been notified
1773  if (gDebug > 0)
1774  Error("StartSlaves", "failed to create (or connect to) the PROOF master server");
1775  return kFALSE;
1776  }
1777  }
1778 
1779  return kTRUE;
1780 }
1781 
1782 ////////////////////////////////////////////////////////////////////////////////
1783 /// Close all open slave servers.
1784 /// Client can decide to shut down the remote session by passing option 'S'
1785 /// or 's'. Default for clients is detach, if supported. Masters always
1786 /// shutdown the remote counterpart.
1787 
1789 {
1790  { std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
1791 
1792  fValid = kFALSE;
1793  if (fSlaves) {
1794  if (fIntHandler)
1795  fIntHandler->Remove();
1796 
1797  TIter nxs(fSlaves);
1798  TSlave *sl = 0;
1799  while ((sl = (TSlave *)nxs()))
1800  sl->Close(opt);
1801 
1802  fActiveSlaves->Clear("nodelete");
1803  fUniqueSlaves->Clear("nodelete");
1804  fAllUniqueSlaves->Clear("nodelete");
1805  fNonUniqueMasters->Clear("nodelete");
1806  fBadSlaves->Clear("nodelete");
1807  fInactiveSlaves->Clear("nodelete");
1808  fSlaves->Delete();
1809  }
1810  }
1811 
1813  gROOT->GetListOfSockets()->Remove(this);
1814 
1815  if (fChains) {
1816  while (TChain *chain = dynamic_cast<TChain*> (fChains->First()) ) {
1817  // remove "chain" from list
1818  chain->SetProof(0);
1819  RemoveChain(chain);
1820  }
1821  }
1822 
1823  if (IsProofd()) {
1824 
1825  gROOT->GetListOfProofs()->Remove(this);
1826  if (gProof && gProof == this) {
1827  // Set previous proofd-related as default
1828  TIter pvp(gROOT->GetListOfProofs(), kIterBackward);
1829  while ((gProof = (TProof *)pvp())) {
1830  if (gProof->IsProofd())
1831  break;
1832  }
1833  }
1834  }
1835  }
1836 }
1837 
1838 ////////////////////////////////////////////////////////////////////////////////
1839 /// Create a new TSlave of type TSlave::kSlave.
1840 /// Note: creation of TSlave is private with TProof as a friend.
1841 /// Derived classes must use this function to create slaves.
1842 
1843 TSlave *TProof::CreateSlave(const char *url, const char *ord,
1844  Int_t perf, const char *image, const char *workdir)
1845 {
1846  TSlave* sl = TSlave::Create(url, ord, perf, image,
1847  this, TSlave::kSlave, workdir, 0);
1848 
1849  if (sl->IsValid()) {
1850  sl->SetInputHandler(new TProofInputHandler(this, sl->GetSocket()));
1851  // must set fParallel to 1 for slaves since they do not
1852  // report their fParallel with a LOG_DONE message
1853  sl->fParallel = 1;
1854  }
1855 
1856  return sl;
1857 }
1858 
1859 
1860 ////////////////////////////////////////////////////////////////////////////////
1861 /// Create a new TSlave of type TSlave::kMaster.
1862 /// Note: creation of TSlave is private with TProof as a friend.
1863 /// Derived classes must use this function to create slaves.
1864 
1865 TSlave *TProof::CreateSubmaster(const char *url, const char *ord,
1866  const char *image, const char *msd, Int_t nwk)
1867 {
1868  TSlave *sl = TSlave::Create(url, ord, 100, image, this,
1869  TSlave::kMaster, 0, msd, nwk);
1870 
1871  if (sl->IsValid()) {
1872  sl->SetInputHandler(new TProofInputHandler(this, sl->GetSocket()));
1873  }
1874 
1875  return sl;
1876 }
1877 
1878 ////////////////////////////////////////////////////////////////////////////////
1879 /// Find slave that has TSocket s. Returns 0 in case slave is not found.
1880 
1882 {
1883  TSlave *sl;
1884  TIter next(fSlaves);
1885 
1886  while ((sl = (TSlave *)next())) {
1887  if (sl->IsValid() && sl->GetSocket() == s)
1888  return sl;
1889  }
1890  return 0;
1891 }
1892 
1893 ////////////////////////////////////////////////////////////////////////////////
1894 /// Add to the fUniqueSlaves list the active slaves that have a unique
1895 /// (user) file system image. This information is used to transfer files
1896 /// only once to nodes that share a file system (an image). Submasters
1897 /// which are not in fUniqueSlaves are put in the fNonUniqueMasters
1898 /// list. That list is used to trigger the transferring of files to
1899 /// the submaster's unique slaves without the need to transfer the file
1900 /// to the submaster.
1901 
1903 {
1904  fUniqueSlaves->Clear();
1909 
1910  TIter next(fActiveSlaves);
1911 
1912  while (TSlave *sl = dynamic_cast<TSlave*>(next())) {
1913  if (fImage == sl->fImage) {
1914  if (sl->GetSlaveType() == TSlave::kMaster) {
1915  fNonUniqueMasters->Add(sl);
1916  fAllUniqueSlaves->Add(sl);
1917  fAllUniqueMonitor->Add(sl->GetSocket());
1918  }
1919  continue;
1920  }
1921 
1922  TIter next2(fUniqueSlaves);
1923  TSlave *replace_slave = 0;
1924  Bool_t add = kTRUE;
1925  while (TSlave *sl2 = dynamic_cast<TSlave*>(next2())) {
1926  if (sl->fImage == sl2->fImage) {
1927  add = kFALSE;
1928  if (sl->GetSlaveType() == TSlave::kMaster) {
1929  if (sl2->GetSlaveType() == TSlave::kSlave) {
1930  // give preference to master
1931  replace_slave = sl2;
1932  add = kTRUE;
1933  } else if (sl2->GetSlaveType() == TSlave::kMaster) {
1934  fNonUniqueMasters->Add(sl);
1935  fAllUniqueSlaves->Add(sl);
1936  fAllUniqueMonitor->Add(sl->GetSocket());
1937  } else {
1938  Error("FindUniqueSlaves", "TSlave is neither Master nor Slave");
1939  R__ASSERT(0);
1940  }
1941  }
1942  break;
1943  }
1944  }
1945 
1946  if (add) {
1947  fUniqueSlaves->Add(sl);
1948  fAllUniqueSlaves->Add(sl);
1949  fUniqueMonitor->Add(sl->GetSocket());
1950  fAllUniqueMonitor->Add(sl->GetSocket());
1951  if (replace_slave) {
1952  fUniqueSlaves->Remove(replace_slave);
1953  fAllUniqueSlaves->Remove(replace_slave);
1954  fUniqueMonitor->Remove(replace_slave->GetSocket());
1955  fAllUniqueMonitor->Remove(replace_slave->GetSocket());
1956  }
1957  }
1958  }
1959 
1960  // will be actiavted in Collect()
1963 }
1964 
1965 ////////////////////////////////////////////////////////////////////////////////
1966 /// Return number of slaves as described in the config file.
1967 
1969 {
1970  return fSlaves->GetSize();
1971 }
1972 
1973 ////////////////////////////////////////////////////////////////////////////////
1974 /// Return number of active slaves, i.e. slaves that are valid and in
1975 /// the current computing group.
1976 
1978 {
1979  return fActiveSlaves->GetSize();
1980 }
1981 
1982 ////////////////////////////////////////////////////////////////////////////////
1983 /// Return number of inactive slaves, i.e. slaves that are valid but not in
1984 /// the current computing group.
1985 
1987 {
1988  return fInactiveSlaves->GetSize();
1989 }
1990 
1991 ////////////////////////////////////////////////////////////////////////////////
1992 /// Return number of unique slaves, i.e. active slaves that each have a
1993 /// unique (different) user file system.
1994 
1996 {
1997  return fUniqueSlaves->GetSize();
1998 }
1999 
2000 ////////////////////////////////////////////////////////////////////////////////
2001 /// Return number of bad slaves. These are slaves that were in the config
2002 /// file, but refused to start up or that died during the PROOF session.
2003 
2005 {
2006  return fBadSlaves->GetSize();
2007 }
2008 
2009 ////////////////////////////////////////////////////////////////////////////////
2010 /// Ask the slaves for their statistics.
2011 
2013 {
2014  if (!IsValid()) return;
2015 
2018 }
2019 
2020 ////////////////////////////////////////////////////////////////////////////////
2021 /// Get statistics about CPU time, real time and bytes read.
2022 /// If verbose, print the results (always available via GetCpuTime(), GetRealTime()
2023 /// and GetBytesRead()).
2024 
2026 {
2027  if (fProtocol > 27) {
2028  // This returns the correct result
2029  AskStatistics();
2030  } else {
2031  // AskStatistics is buggy: parse the output of Print()
2032  RedirectHandle_t rh;
2033  gSystem->RedirectOutput(fLogFileName, "a", &rh);
2034  Print();
2035  gSystem->RedirectOutput(0, 0, &rh);
2036  TMacro *mp = GetLastLog();
2037  if (mp) {
2038  // Look for global directories
2039  TIter nxl(mp->GetListOfLines());
2040  TObjString *os = 0;
2041  while ((os = (TObjString *) nxl())) {
2042  TString s(os->GetName());
2043  if (s.Contains("Total MB's processed:")) {
2044  s.ReplaceAll("Total MB's processed:", "");
2045  if (s.IsFloat()) fBytesRead = (Long64_t) s.Atof() * (1024*1024);
2046  } else if (s.Contains("Total real time used (s):")) {
2047  s.ReplaceAll("Total real time used (s):", "");
2048  if (s.IsFloat()) fRealTime = s.Atof();
2049  } else if (s.Contains("Total CPU time used (s):")) {
2050  s.ReplaceAll("Total CPU time used (s):", "");
2051  if (s.IsFloat()) fCpuTime = s.Atof();
2052  }
2053  }
2054  delete mp;
2055  }
2056  }
2057 
2058  if (verbose) {
2059  Printf(" Real/CPU time (s): %.3f / %.3f; workers: %d; processed: %.2f MBs",
2060  GetRealTime(), GetCpuTime(), GetParallel(), float(GetBytesRead())/(1024*1024));
2061  }
2062 }
2063 
2064 ////////////////////////////////////////////////////////////////////////////////
2065 /// Ask for the number of parallel slaves.
2066 
2068 {
2069  if (!IsValid()) return;
2070 
2073 }
2074 
2075 ////////////////////////////////////////////////////////////////////////////////
2076 /// Ask the master for the list of queries.
2077 
2079 {
2080  if (!IsValid() || TestBit(TProof::kIsMaster)) return (TList *)0;
2081 
2082  Bool_t all = ((strchr(opt,'A') || strchr(opt,'a'))) ? kTRUE : kFALSE;
2084  m << all;
2085  Broadcast(m, kActive);
2087 
2088  // This should have been filled by now
2089  return fQueries;
2090 }
2091 
2092 ////////////////////////////////////////////////////////////////////////////////
2093 /// Number of queries processed by this session
2094 
2096 {
2097  if (fQueries)
2098  return fQueries->GetSize() - fOtherQueries;
2099  return 0;
2100 }
2101 
2102 ////////////////////////////////////////////////////////////////////////////////
2103 /// Set max number of draw queries whose results are saved.
/// Values <= 0 are ignored. A valid value is stored in fMaxDrawQueries and,
/// if a player exists, forwarded to it as well.
2104 
2106 {
2107  if (max > 0) {
2108  if (fPlayer)
2109  fPlayer->SetMaxDrawQueries(max);
2110  fMaxDrawQueries = max;
2111  }
2112 }
2113 
2114 ////////////////////////////////////////////////////////////////////////////////
2115 /// Get max number of queries whose full results are kept in the
2116 /// remote sandbox.
/// The request is broadcast to the active servers.
2117 
2119 {
 // NOTE(review): the meaning of the kFALSE flag streamed below is not visible
 // here (presumably "query the value, do not set it") -- confirm.
2121  m << kFALSE;
2122  Broadcast(m, kActive);
2124 }
2125 
2126 ////////////////////////////////////////////////////////////////////////////////
2127 /// Return pointer to the list of query results in the player.
/// Returns 0 if no player is available.
2128 
2130 {
2131  return (fPlayer ? fPlayer->GetListOfResults() : (TList *)0);
2132 }
2133 
2134 ////////////////////////////////////////////////////////////////////////////////
2135 /// Return pointer to the full TQueryResult instance owned by the player
2136 /// and referenced by 'ref'. If ref = 0 or "", return the last query result.
/// Returns 0 if no player is available.
2137 
2139 {
2140  return (fPlayer ? fPlayer->GetQueryResult(ref) : (TQueryResult *)0);
2141 }
2142 
2143 ////////////////////////////////////////////////////////////////////////////////
2144 /// Ask the master for the list of queries and print it.
2145 /// Options:
2146 /// "A" show information about all the queries known to the
2147 /// server, i.e. even those processed by other sessions
2148 /// "L" show only information about queries locally available
2149 /// i.e. already retrieved. If "L" is specified, "A" is
2150 /// ignored.
2151 /// "F" show all details available about queries
2152 /// "H" print help menu
2153 /// Default ""
/// Options are case insensitive. The help menu is printed even if the session
/// is not valid; any other listing requires a valid session.
2154 
2156 {
2157  Bool_t help = ((strchr(opt,'H') || strchr(opt,'h'))) ? kTRUE : kFALSE;
2158  if (help) {
2159 
2160  // Help
2161 
2162  Printf("+++");
2163  Printf("+++ Options: \"A\" show all queries known to server");
2164  Printf("+++ \"L\" show retrieved queries");
2165  Printf("+++ \"F\" full listing of query info");
2166  Printf("+++ \"H\" print this menu");
2167  Printf("+++");
2168  Printf("+++ (case insensitive)");
2169  Printf("+++");
2170  Printf("+++ Use Retrieve(<#>) to retrieve the full"
2171  " query results from the master");
2172  Printf("+++ e.g. Retrieve(8)");
2173 
2174  Printf("+++");
2175 
2176  return;
2177  }
2178 
2179  if (!IsValid()) return;
2180 
2181  Bool_t local = ((strchr(opt,'L') || strchr(opt,'l'))) ? kTRUE : kFALSE;
2182 
2183  TObject *pq = 0;
2184  if (!local) {
 // Refresh fQueries from the master ('opt' is passed on, so "A" is honoured)
2185  GetListOfQueries(opt);
2186 
2187  if (!fQueries) return;
2188 
2189  TIter nxq(fQueries);
2190 
2191  // Queries processed by other sessions
 // (the first fOtherQueries entries of fQueries are printed as such; the
 // same iterator then continues with the queries of this session)
2192  if (fOtherQueries > 0) {
2193  Printf("+++");
2194  Printf("+++ Queries processed during other sessions: %d", fOtherQueries);
2195  Int_t nq = 0;
2196  while (nq++ < fOtherQueries && (pq = nxq()))
2197  pq->Print(opt);
2198  }
2199 
2200  // Queries processed by this session
2201  Printf("+++");
2202  Printf("+++ Queries processed during this session: selector: %d, draw: %d",
2204  while ((pq = nxq()))
2205  pq->Print(opt);
2206 
2207  } else {
2208 
2209  // Queries processed by this session
2210  Printf("+++");
2211  Printf("+++ Queries processed during this session: selector: %d, draw: %d",
2213 
2214  // Queries available locally
2215  TList *listlocal = fPlayer ? fPlayer->GetListOfResults() : (TList *)0;
2216  if (listlocal) {
2217  Printf("+++");
2218  Printf("+++ Queries available locally: %d", listlocal->GetSize());
2219  TIter nxlq(listlocal);
2220  while ((pq = nxlq()))
2221  pq->Print(opt);
2222  }
2223  }
2224  Printf("+++");
2225 }
2226 
2227 ////////////////////////////////////////////////////////////////////////////////
2228 /// See if the data is ready to be analyzed.
2229 
2230 Bool_t TProof::IsDataReady(Long64_t &totalbytes, Long64_t &bytesready)
2231 {
2232  if (!IsValid()) return kFALSE;
2233 
2234  TList submasters;
2235  TIter nextSlave(GetListOfActiveSlaves());
2236  while (TSlave *sl = dynamic_cast<TSlave*>(nextSlave())) {
2237  if (sl->GetSlaveType() == TSlave::kMaster) {
2238  submasters.Add(sl);
2239  }
2240  }
2241 
2242  fDataReady = kTRUE; //see if any submasters set it to false
2243  fBytesReady = 0;
2244  fTotalBytes = 0;
2245  //loop over submasters and see if data is ready
2246  if (submasters.GetSize() > 0) {
2247  Broadcast(kPROOF_DATA_READY, &submasters);
2248  Collect(&submasters);
2249  }
2250 
2251  bytesready = fBytesReady;
2252  totalbytes = fTotalBytes;
2253 
2254  EmitVA("IsDataReady(Long64_t,Long64_t)", 2, totalbytes, bytesready);
2255 
2256  PDB(kGlobal,2)
2257  Info("IsDataReady", "%lld / %lld (%s)",
2258  bytesready, totalbytes, fDataReady?"READY":"NOT READY");
2259 
2260  return fDataReady;
2261 }
2262 
2263 ////////////////////////////////////////////////////////////////////////////////
2264 /// Send interrupt to master or slave servers.
/// 'list' selects the servers (kAll, kActive, kUnique or kAllUnique); the
/// interrupt 'type' is forwarded to every valid server via TSlave::Interrupt().
/// Does nothing if this session is not valid or the selected list is empty.
2265 
2267 {
2268  if (!IsValid()) return;
2269 
2270  TList *slaves = 0;
2271  if (list == kAll) slaves = fSlaves;
2272  if (list == kActive) slaves = fActiveSlaves;
2273  if (list == kUnique) slaves = fUniqueSlaves;
2274  if (list == kAllUnique) slaves = fAllUniqueSlaves;
2275 
 // NOTE(review): 'slaves' stays 0 if 'list' matches none of the values above;
 // unlike Broadcast(const TMessage &, TList *) there is no null check here.
2276  if (slaves->GetSize() == 0) return;
2277 
2278  TSlave *sl;
2279  TIter next(slaves);
2280 
2281  while ((sl = (TSlave *)next())) {
2282  if (sl->IsValid()) {
2283 
2284  // Ask slave to propagate the interrupt request
2285  sl->Interrupt((Int_t)type);
2286  }
2287  }
2288 }
2289 
2290 ////////////////////////////////////////////////////////////////////////////////
2291 /// Returns number of slaves active in parallel mode. Returns 0 in case
2292 /// there are no active slaves. Returns -1 if this session is not valid.
/// The result is the sum of GetParallel() over the active slaves; negative
/// per-slave values are skipped.
2293 
2295 {
2296  if (!IsValid()) return -1;
2297 
2298  // iterate over active slaves and return total number of slaves
2299  TIter nextSlave(GetListOfActiveSlaves());
2300  Int_t nparallel = 0;
2301  while (TSlave* sl = dynamic_cast<TSlave*>(nextSlave()))
2302  if (sl->GetParallel() >= 0)
2303  nparallel += sl->GetParallel();
2304 
2305  return nparallel;
2306 }
2307 
2308 ////////////////////////////////////////////////////////////////////////////////
2309 /// Returns list of TSlaveInfo's. Returns 0 if this session is not valid.
/// The list owns its entries and is emptied and refilled at each call.
/// For each worker (TSlave::kSlave) an entry is created locally and flagged
/// active or bad according to the active / bad server lists. All valid
/// servers (workers and sub-masters) are then sent kPROOF_GETSLAVEINFO and
/// their replies are collected before returning.
2310 
2312 {
2313  if (!IsValid()) return 0;
2314 
2315  if (fSlaveInfo == 0) {
2317  fSlaveInfo->SetOwner();
2318  } else {
2319  fSlaveInfo->Delete();
2320  }
2321 
 // Servers successfully asked for their info, to be collected at the end
2322  TList masters;
2323  TIter next(GetListOfSlaves());
2324  TSlave *slave;
2325 
2326  while ((slave = (TSlave *) next()) != 0) {
2327  if (slave->GetSlaveType() == TSlave::kSlave) {
2328  const char *name = IsLite() ? gSystem->HostName() : slave->GetName();
2329  TSlaveInfo *slaveinfo = new TSlaveInfo(slave->GetOrdinal(),
2330  name,
2331  slave->GetPerfIdx());
2332  fSlaveInfo->Add(slaveinfo);
2333 
2334  TIter nextactive(GetListOfActiveSlaves());
2335  TSlave *activeslave;
2336  while ((activeslave = (TSlave *) nextactive())) {
2337  if (TString(slaveinfo->GetOrdinal()) == activeslave->GetOrdinal()) {
2338  slaveinfo->SetStatus(TSlaveInfo::kActive);
2339  break;
2340  }
2341  }
2342 
2343  TIter nextbad(GetListOfBadSlaves());
2344  TSlave *badslave;
2345  while ((badslave = (TSlave *) nextbad())) {
2346  if (TString(slaveinfo->GetOrdinal()) == badslave->GetOrdinal()) {
2347  slaveinfo->SetStatus(TSlaveInfo::kBad);
2348  break;
2349  }
2350  }
2351  // Get system info if supported
2352  if (slave->IsValid()) {
2353  if (slave->GetSocket()->Send(kPROOF_GETSLAVEINFO) == -1)
2354  MarkBad(slave, "could not send kPROOF_GETSLAVEINFO message");
2355  else
2356  masters.Add(slave);
2357  }
2358 
2359  } else if (slave->GetSlaveType() == TSlave::kMaster) {
2360  if (slave->IsValid()) {
2361  if (slave->GetSocket()->Send(kPROOF_GETSLAVEINFO) == -1)
2362  MarkBad(slave, "could not send kPROOF_GETSLAVEINFO message");
2363  else
2364  masters.Add(slave);
2365  }
2366  } else {
2367  Error("GetSlaveInfo", "TSlave is neither Master nor Slave");
2368  R__ASSERT(0);
2369  }
2370  }
2371  if (masters.GetSize() > 0) Collect(&masters);
2372 
2373  return fSlaveInfo;
2374 }
2375 
2376 ////////////////////////////////////////////////////////////////////////////////
2377 /// Activate slave server list.
/// All sockets in fAllMonitor are first deactivated; then the sockets of the
/// valid servers in 'slaves' (the active servers, fActiveSlaves, if 'slaves'
/// is 0) are activated in it.
2378 
2380 {
2381  TMonitor *mon = fAllMonitor;
2382  mon->DeActivateAll();
2383 
2384  slaves = !slaves ? fActiveSlaves : slaves;
2385 
2386  TIter next(slaves);
2387  TSlave *sl;
2388  while ((sl = (TSlave*) next())) {
2389  if (sl->IsValid())
2390  mon->Activate(sl->GetSocket());
2391  }
2392 }
2393 
2394 ////////////////////////////////////////////////////////////////////////////////
2395 /// Activate (on == TRUE) or deactivate (on == FALSE) all sockets
2396 /// monitored by 'mon'.
/// If 'mon' is 0 the monitor currently in use (fCurrentMonitor) is taken;
/// nothing is done if that is 0 too.
2397 
2399 {
2400  TMonitor *m = (mon) ? mon : fCurrentMonitor;
2401  if (m) {
2402  if (on)
2403  m->ActivateAll();
2404  else
2405  m->DeActivateAll();
2406  }
2407 }
2408 
2409 ////////////////////////////////////////////////////////////////////////////////
2410 /// Broadcast the group priority to all workers in the specified list. Returns
2411 /// the number of workers the message was successfully sent to.
2412 /// Returns -1 in case of error.
2413 
2414 Int_t TProof::BroadcastGroupPriority(const char *grp, Int_t priority, TList *workers)
2415 {
2416  if (!IsValid()) return -1;
2417 
2418  if (workers->GetSize() == 0) return 0;
2419 
2420  int nsent = 0;
2421  TIter next(workers);
2422 
2423  TSlave *wrk;
2424  while ((wrk = (TSlave *)next())) {
2425  if (wrk->IsValid()) {
2426  if (wrk->SendGroupPriority(grp, priority) == -1)
2427  MarkBad(wrk, "could not send group priority");
2428  else
2429  nsent++;
2430  }
2431  }
2432 
2433  return nsent;
2434 }
2435 
2436 ////////////////////////////////////////////////////////////////////////////////
2437 /// Broadcast the group priority to all workers in the specified list. Returns
2438 /// the number of workers the message was successfully sent to.
2439 /// Returns -1 in case of error.
2440 
2441 Int_t TProof::BroadcastGroupPriority(const char *grp, Int_t priority, ESlaves list)
2442 {
2443  TList *workers = 0;
2444  if (list == kAll) workers = fSlaves;
2445  if (list == kActive) workers = fActiveSlaves;
2446  if (list == kUnique) workers = fUniqueSlaves;
2447  if (list == kAllUnique) workers = fAllUniqueSlaves;
2448 
2449  return BroadcastGroupPriority(grp, priority, workers);
2450 }
2451 
2452 ////////////////////////////////////////////////////////////////////////////////
2453 /// Reset the merge progress notifier.
2454 
2456 {
2458 }
2459 
2460 ////////////////////////////////////////////////////////////////////////////////
2461 /// Broadcast a message to all slaves in the specified list. Returns
2462 /// the number of slaves the message was successfully sent to.
2463 /// Returns -1 in case of error.
2464 
2465 Int_t TProof::Broadcast(const TMessage &mess, TList *slaves)
2466 {
2467  if (!IsValid()) return -1;
2468 
2469  if (!slaves || slaves->GetSize() == 0) return 0;
2470 
2471  int nsent = 0;
2472  TIter next(slaves);
2473 
2474  TSlave *sl;
2475  while ((sl = (TSlave *)next())) {
2476  if (sl->IsValid()) {
2477  if (sl->GetSocket()->Send(mess) == -1)
2478  MarkBad(sl, "could not broadcast request");
2479  else
2480  nsent++;
2481  }
2482  }
2483 
2484  return nsent;
2485 }
2486 
2487 ////////////////////////////////////////////////////////////////////////////////
2488 /// Broadcast a message to all slaves in the specified list (kAll: all
2489 /// slaves, kActive: active slaves, kUnique / kAllUnique: the unique-slave
/// lists). Returns the number of slaves
2490 /// the message was successfully sent to. Returns -1 in case of error.
/// A list type matching none of the above gives a null list, for which the
/// TList overload returns 0 (or -1 if the session is not valid).
2491 
2493 {
2494  TList *slaves = 0;
2495  if (list == kAll) slaves = fSlaves;
2496  if (list == kActive) slaves = fActiveSlaves;
2497  if (list == kUnique) slaves = fUniqueSlaves;
2498  if (list == kAllUnique) slaves = fAllUniqueSlaves;
2499 
2500  return Broadcast(mess, slaves);
2501 }
2502 
2503 ////////////////////////////////////////////////////////////////////////////////
2504 /// Broadcast a character string buffer to all slaves in the specified
2505 /// list. Use kind to set the TMessage what field. Returns the number of
2506 /// slaves the message was sent to. Returns -1 in case of error.
2507 
2508 Int_t TProof::Broadcast(const char *str, Int_t kind, TList *slaves)
2509 {
2510  TMessage mess(kind);
2511  if (str) mess.WriteString(str);
2512  return Broadcast(mess, slaves);
2513 }
2514 
2515 ////////////////////////////////////////////////////////////////////////////////
2516 /// Broadcast a character string buffer to all slaves in the specified
2517 /// list (either all slaves or only the active slaves). Use kind to
2518 /// set the TMessage what field. Returns the number of slaves the message
2519 /// was sent to. Returns -1 in case of error.
2520 
2521 Int_t TProof::Broadcast(const char *str, Int_t kind, ESlaves list)
2522 {
2523  TMessage mess(kind);
2524  if (str) mess.WriteString(str);
2525  return Broadcast(mess, list);
2526 }
2527 
2528 ////////////////////////////////////////////////////////////////////////////////
2529 /// Broadcast an object to all slaves in the specified list. Use kind to
2530 /// set the TMessage what field. Returns the number of slaves the message
2531 /// was sent to. Returns -1 in case of error.
/// The object is serialized into the message with TMessage::WriteObject().
2532 
2534 {
2535  TMessage mess(kind);
2536  mess.WriteObject(obj);
2537  return Broadcast(mess, slaves);
2538 }
2539 
2540 ////////////////////////////////////////////////////////////////////////////////
2541 /// Broadcast an object to all slaves in the specified list. Use kind to
2542 /// set the TMessage what field. Returns the number of slaves the message
2543 /// was sent to. Returns -1 in case of error.
/// The object is serialized into the message with TMessage::WriteObject();
/// 'list' selects the servers (kAll, kActive, kUnique or kAllUnique).
2544 
2546 {
2547  TMessage mess(kind);
2548  mess.WriteObject(obj);
2549  return Broadcast(mess, list);
2550 }
2551 
2552 ////////////////////////////////////////////////////////////////////////////////
2553 /// Broadcast a raw buffer of specified length to all slaves in the
2554 /// specified list. Returns the number of slaves the buffer was sent to.
2555 /// Returns -1 in case of error.
2556 
2557 Int_t TProof::BroadcastRaw(const void *buffer, Int_t length, TList *slaves)
2558 {
2559  if (!IsValid()) return -1;
2560 
2561  if (slaves->GetSize() == 0) return 0;
2562 
2563  int nsent = 0;
2564  TIter next(slaves);
2565 
2566  TSlave *sl;
2567  while ((sl = (TSlave *)next())) {
2568  if (sl->IsValid()) {
2569  if (sl->GetSocket()->SendRaw(buffer, length) == -1)
2570  MarkBad(sl, "could not send broadcast-raw request");
2571  else
2572  nsent++;
2573  }
2574  }
2575 
2576  return nsent;
2577 }
2578 
2579 ////////////////////////////////////////////////////////////////////////////////
2580 /// Broadcast a raw buffer of specified length to all slaves in the
2581 /// specified list. Returns the number of slaves the buffer was sent to.
2582 /// Returns -1 in case of error.
2583 
2584 Int_t TProof::BroadcastRaw(const void *buffer, Int_t length, ESlaves list)
2585 {
2586  TList *slaves = 0;
2587  if (list == kAll) slaves = fSlaves;
2588  if (list == kActive) slaves = fActiveSlaves;
2589  if (list == kUnique) slaves = fUniqueSlaves;
2590  if (list == kAllUnique) slaves = fAllUniqueSlaves;
2591 
2592  return BroadcastRaw(buffer, length, slaves);
2593 }
2594 
2595 ////////////////////////////////////////////////////////////////////////////////
2596 /// Broadcast file to all workers in the specified list. Returns the number of workers
2597 /// the buffer was sent to.
2598 /// Returns -1 in case of error.
2599 
2600 Int_t TProof::BroadcastFile(const char *file, Int_t opt, const char *rfile, TList *wrks)
2601 {
2602  if (!IsValid()) return -1;
2603 
2604  if (wrks->GetSize() == 0) return 0;
2605 
2606  int nsent = 0;
2607  TIter next(wrks);
2608 
2609  TSlave *wrk;
2610  while ((wrk = (TSlave *)next())) {
2611  if (wrk->IsValid()) {
2612  if (SendFile(file, opt, rfile, wrk) < 0)
2613  Error("BroadcastFile",
2614  "problems sending file to worker %s (%s)",
2615  wrk->GetOrdinal(), wrk->GetName());
2616  else
2617  nsent++;
2618  }
2619  }
2620 
2621  return nsent;
2622 }
2623 
2624 ////////////////////////////////////////////////////////////////////////////////
2625 /// Broadcast file to all workers in the specified list. Returns the number of workers
2626 /// the buffer was sent to.
2627 /// Returns -1 in case of error.
2628 
2629 Int_t TProof::BroadcastFile(const char *file, Int_t opt, const char *rfile, ESlaves list)
2630 {
2631  TList *wrks = 0;
2632  if (list == kAll) wrks = fSlaves;
2633  if (list == kActive) wrks = fActiveSlaves;
2634  if (list == kUnique) wrks = fUniqueSlaves;
2635  if (list == kAllUnique) wrks = fAllUniqueSlaves;
2636 
2637  return BroadcastFile(file, opt, rfile, wrks);
2638 }
2639 
2640 ////////////////////////////////////////////////////////////////////////////////
2641 /// Release a monitor after use: it is deleted only if it is a temporary one,
2642 /// i.e. not null and none of fAllMonitor, fActiveMonitor, fUniqueMonitor
/// or fAllUniqueMonitor (which are kept).
2643 
2645 {
2646  if (mon && (mon != fAllMonitor) && (mon != fActiveMonitor)
2647  && (mon != fUniqueMonitor) && (mon != fAllUniqueMonitor)) {
2648  delete mon;
2649  }
2650 }
2651 
2652 ////////////////////////////////////////////////////////////////////////////////
2653 /// Collect responses from slave sl. Returns the number of slaves that
2654 /// responded (=1).
2655 /// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2656 /// which means wait forever).
2657 /// If defined (>= 0) endtype is the message that stops this collection.
2658 
2659 Int_t TProof::Collect(const TSlave *sl, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2660 {
2661  Int_t rc = 0;
2662 
2663  TMonitor *mon = 0;
2664  if (!sl->IsValid()) return 0;
2665 
2666  if (fCurrentMonitor == fAllMonitor) {
2667  mon = new TMonitor;
2668  } else {
2669  mon = fAllMonitor;
2670  mon->DeActivateAll();
2671  }
2672  mon->Activate(sl->GetSocket());
2673 
2674  rc = Collect(mon, timeout, endtype, deactonfail);
2675  ReleaseMonitor(mon);
2676  return rc;
2677 }
2678 
2679 ////////////////////////////////////////////////////////////////////////////////
2680 /// Collect responses from the slave servers. Returns the number of slaves
2681 /// that responded.
2682 /// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2683 /// which means wait forever).
2684 /// If defined (>= 0) endtype is the message that stops this collection.
2685 
2686 Int_t TProof::Collect(TList *slaves, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2687 {
2688  Int_t rc = 0;
2689 
2690  TMonitor *mon = 0;
2691 
2692  if (fCurrentMonitor == fAllMonitor) {
2693  mon = new TMonitor;
2694  } else {
2695  mon = fAllMonitor;
2696  mon->DeActivateAll();
2697  }
2698  TIter next(slaves);
2699  TSlave *sl;
2700  while ((sl = (TSlave*) next())) {
2701  if (sl->IsValid())
2702  mon->Activate(sl->GetSocket());
2703  }
2704 
2705  rc = Collect(mon, timeout, endtype, deactonfail);
2706  ReleaseMonitor(mon);
2707  return rc;
2708 }
2709 
2710 ////////////////////////////////////////////////////////////////////////////////
2711 /// Collect responses from the slave servers. Returns the number of slaves
2712 /// that responded.
2713 /// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2714 /// which means wait forever).
2715 /// If defined (>= 0) endtype is the message that stops this collection.
2716 
2717 Int_t TProof::Collect(ESlaves list, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2718 {
2719  Int_t rc = 0;
2720  TMonitor *mon = 0;
2721 
2722  if (list == kAll) mon = fAllMonitor;
2723  if (list == kActive) mon = fActiveMonitor;
2724  if (list == kUnique) mon = fUniqueMonitor;
2725  if (list == kAllUnique) mon = fAllUniqueMonitor;
2726  if (fCurrentMonitor == mon) {
2727  // Get a copy
2728  mon = new TMonitor(*mon);
2729  }
2730  mon->ActivateAll();
2731 
2732  rc = Collect(mon, timeout, endtype, deactonfail);
2733  ReleaseMonitor(mon);
2734  return rc;
2735 }
2736 
2737 ////////////////////////////////////////////////////////////////////////////////
2738 /// Collect responses from the slave servers. Returns the number of messages
2739 /// received. Can be 0 if there are no active slaves.
2740 /// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2741 /// which means wait forever).
2742 /// If defined (>= 0) endtype is the message that stops this collection.
2743 /// Collect also stops its execution from time to time to check for new
2744 /// workers in Dynamic Startup mode.
/// Only messages for which CollectInputFrom() did not fail (rc >= 0) are
/// counted. fCurrentMonitor is set to 'mon' for the duration of the call and
/// restored at the end; a previously current monitor is kept in
/// 'savedMonitor', so that end messages meant for it (rc == 2) deactivate
/// the socket there.
2745 
2746 Int_t TProof::Collect(TMonitor *mon, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2747 {
 // Random tag used only to match the debug printouts of this call
2748  Int_t collectId = gRandom->Integer(9999);
2749 
2750  PDB(kCollect, 3)
2751  Info("Collect", ">>>>>> Entering collect responses #%04d", collectId);
2752 
2753  // Reset the status flag and clear the messages in the list, if any
2754  fStatus = 0;
2755  fRecvMessages->Clear();
2756 
 // Socket activity timeout: configured in seconds, used in milliseconds
2757  Long_t actto = (Long_t)(gEnv->GetValue("Proof.SocketActivityTimeout", -1) * 1000);
2758 
2759  if (!mon->GetActive(actto)) return 0;
2760 
2762 
2763  // Used by external code to know what we are monitoring
2764  TMonitor *savedMonitor = 0;
2765  if (fCurrentMonitor) {
2766  savedMonitor = fCurrentMonitor;
2767  fCurrentMonitor = mon;
2768  } else {
2769  fCurrentMonitor = mon;
2770  fBytesRead = 0;
2771  fRealTime = 0.0;
2772  fCpuTime = 0.0;
2773  }
2774 
2775  // We want messages on the main window during synchronous collection,
2776  // but we save the present status to restore it at the end
2777  Bool_t saveRedirLog = fRedirLog;
2778  if (!IsIdle() && !IsSync())
2779  fRedirLog = kFALSE;
2780 
2781  int cnt = 0, rc = 0;
2782 
2783  // Timeout counter
2784  Long_t nto = timeout;
2785  PDB(kCollect, 2)
2786  Info("Collect","#%04d: active: %d", collectId, mon->GetActive());
2787 
2788  // On clients, handle Ctrl-C during collection
2789  if (fIntHandler)
2790  fIntHandler->Add();
2791 
2792  // Sockets w/o activity during the last 'sto' millisecs are deactivated
2793  Int_t nact = 0;
2794  Long_t sto = -1;
2795  Int_t nsto = 60;
2796  Int_t pollint = gEnv->GetValue("Proof.DynamicStartupPollInt", (Int_t) kPROOF_DynWrkPollInt_s);
2797  mon->ResetInterrupt();
2798  while ((nact = mon->GetActive(sto)) && (nto < 0 || nto > 0)) {
2799 
2800  // Dump last waiting sockets, if in debug mode
2801  PDB(kCollect, 2) {
2802  if (nact < 4) {
2803  TList *al = mon->GetListOfActives();
2804  if (al && al->GetSize() > 0) {
2805  Info("Collect"," %d node(s) still active:", al->GetSize());
2806  TIter nxs(al);
2807  TSocket *xs = 0;
2808  while ((xs = (TSocket *)nxs())) {
2809  TSlave *wrk = FindSlave(xs);
2810  if (wrk)
2811  Info("Collect"," %s (%s)", wrk->GetName(), wrk->GetOrdinal());
2812  else
2813  Info("Collect"," %p: %s:%d", xs, xs->GetInetAddress().GetHostName(),
2814  xs->GetInetAddress().GetPort());
2815  }
2816  }
2817  delete al;
2818  }
2819  }
2820 
2821  // Preemptive poll for new workers on the master only in Dynamic Mode and only
2822  // during processing (TODO: should work on Top Master only)
2824  ((fLastPollWorkers_s == -1) || (time(0)-fLastPollWorkers_s >= pollint))) {
2827  fLastPollWorkers_s = time(0);
2829  PDB(kCollect, 1)
2830  Info("Collect","#%04d: now active: %d", collectId, mon->GetActive());
2831  }
2832 
2833  // Wait for a ready socket
2834  PDB(kCollect, 3)
2835  Info("Collect", "Will invoke Select() #%04d", collectId);
 // Returns 0 or (TSocket *)(-1) when no socket is ready within 1000 ms
2836  TSocket *s = mon->Select(1000);
2837 
2838  if (s && s != (TSocket *)(-1)) {
2839  // Get and analyse the info it did receive
2840  rc = CollectInputFrom(s, endtype, deactonfail);
2841  if (rc == 1 || (rc == 2 && !savedMonitor)) {
2842  // Deactivate it if we are done with it
2843  mon->DeActivate(s);
2844  TList *al = mon->GetListOfActives();
2845  PDB(kCollect, 2)
2846  Info("Collect","#%04d: deactivating %p (active: %d, %p)", collectId,
2847  s, mon->GetActive(),
2848  al->First());
2849  delete al;
2850  } else if (rc == 2) {
2851  // This end message was for the saved monitor
2852  // Deactivate it if we are done with it
2853  if (savedMonitor) {
2854  savedMonitor->DeActivate(s);
 // NOTE(review): the printout below refers to savedMonitor but lists the
 // actives of 'mon'; savedMonitor->GetListOfActives() may have been meant.
2855  TList *al = mon->GetListOfActives();
2856  PDB(kCollect, 2)
2857  Info("Collect","save monitor: deactivating %p (active: %d, %p)",
2858  s, savedMonitor->GetActive(),
2859  al->First());
2860  delete al;
2861  }
2862  }
2863 
2864  // Update counter (if no error occurred)
2865  if (rc >= 0)
2866  cnt++;
2867  } else {
2868  // If not timed-out, exit if not stopped or not aborted
2869  // (player exits status is finished in such a case); otherwise,
2870  // we still need to collect the partial output info
2871  if (!s)
2873  mon->DeActivateAll();
2874  // Decrease the timeout counter if requested
2875  if (s == (TSocket *)(-1) && nto > 0)
2876  nto--;
2877  }
2878 
2879  // Check if there are workers with ready output to be sent and ask the first to send it
2880  if (IsMaster() && fWrksOutputReady && fWrksOutputReady->GetSize() > 0) {
2881  // Maximum number of concurrent sendings
2882  Int_t mxws = gEnv->GetValue("Proof.ControlSendOutput", 1);
2883  if (TProof::GetParameter(fPlayer->GetInputList(), "PROOF_ControlSendOutput", mxws) != 0)
2884  mxws = gEnv->GetValue("Proof.ControlSendOutput", 1);
2885  TIter nxwr(fWrksOutputReady);
2886  TSlave *wrk = 0;
2887  while (mxws && (wrk = (TSlave *) nxwr())) {
2888  if (!wrk->TestBit(TSlave::kOutputRequested)) {
2889  // Ask worker for output
2890  TMessage sendoutput(kPROOF_SENDOUTPUT);
2891  PDB(kCollect, 2)
2892  Info("Collect", "worker %s was asked to send its output to master",
2893  wrk->GetOrdinal());
 // NOTE(review): TSocket::Send() returns the number of bytes sent or -1
 // on error, so '!= 1' also holds on failure; '!= -1' may be intended.
2894  if (wrk->GetSocket()->Send(sendoutput) != 1) {
2896  mxws--;
2897  }
2898  } else {
2899  // Count
2900  mxws--;
2901  }
2902  }
2903  }
2904 
2905  // Check if we need to check the socket activity (we do it every 60 cycles; each cycle waits at most ~1 s in Select())
2906  sto = -1;
2907  if (--nsto <= 0) {
2908  sto = (Long_t) actto;
2909  nsto = 60;
2910  }
2911 
2912  } // end loop over active monitors
2913 
2914  // If timed-out, deactivate the remaining sockets
2915  if (nto == 0) {
2916  TList *al = mon->GetListOfActives();
2917  if (al && al->GetSize() > 0) {
2918  // Notify the name of those which did timeout
2919  Info("Collect"," %d node(s) went in timeout:", al->GetSize());
2920  TIter nxs(al);
2921  TSocket *xs = 0;
2922  while ((xs = (TSocket *)nxs())) {
2923  TSlave *wrk = FindSlave(xs);
2924  if (wrk)
2925  Info("Collect"," %s", wrk->GetName());
2926  else
2927  Info("Collect"," %p: %s:%d", xs, xs->GetInetAddress().GetHostName(),
2928  xs->GetInetAddress().GetPort());
2929  }
2930  }
2931  delete al;
2932  mon->DeActivateAll();
2933  }
2934 
2935  // Deactivate Ctrl-C special handler
2936  if (fIntHandler)
2937  fIntHandler->Remove();
2938 
2939  // make sure group view is up to date
2940  SendGroupView();
2941 
2942  // Restore redirection setting
2943  fRedirLog = saveRedirLog;
2944 
2945  // Restore the monitor
2946  fCurrentMonitor = savedMonitor;
2947 
2949 
2950  PDB(kCollect, 3)
2951  Info("Collect", "<<<<<< Exiting collect responses #%04d", collectId);
2952 
2953  return cnt;
2954 }
2955 
2956 ////////////////////////////////////////////////////////////////////////////////
2957 /// Asks the PROOF Serv for new workers in Dynamic Startup mode and activates
2958 /// them. Returns the number of new workers found, or <0 on errors.
/// A worker is considered new if its full ordinal ("<master ordinal>.<node
/// ordinal>") matches none of the servers in fSlaves. Only callable on a
/// master with gProofServ available; returns -1 otherwise or if
/// AddWorkers() fails.
2959 
2961 {
2962  // Requests for worker updates
2963  Int_t dummy = 0;
2964  TList *reqWorkers = new TList();
2965  reqWorkers->SetOwner(kFALSE);
2966 
 // NOTE(review): 'reqWorkers' is not deleted on the two early returns below
 // (leak); allocating it after these checks would avoid that.
2967  if (!TestBit(TProof::kIsMaster)) {
2968  Error("PollForNewWorkers", "Can't invoke: not on a master -- should not happen!");
2969  return -1;
2970  }
2971  if (!gProofServ) {
2972  Error("PollForNewWorkers", "No ProofServ available -- should not happen!");
2973  return -1;
2974  }
2975 
2976  gProofServ->GetWorkers(reqWorkers, dummy, kTRUE); // last 2 are dummy
2977 
2978  // List of new workers only (TProofNodeInfo)
2979  TList *newWorkers = new TList();
2980  newWorkers->SetOwner(kTRUE);
2981 
2982  TIter next(reqWorkers);
2983  TProofNodeInfo *ni;
2984  TString fullOrd;
2985  while (( ni = dynamic_cast<TProofNodeInfo *>(next()) )) {
2986 
2987  // Form the full ordinal
2988  fullOrd.Form("%s.%s", gProofServ->GetOrdinal(), ni->GetOrdinal().Data());
2989 
2990  TIter nextInner(fSlaves);
2991  TSlave *sl;
2992  Bool_t found = kFALSE;
2993  while (( sl = dynamic_cast<TSlave *>(nextInner()) )) {
2994  if ( strcmp(sl->GetOrdinal(), fullOrd.Data()) == 0 ) {
2995  found = kTRUE;
2996  break;
2997  }
2998  }
2999 
 // Known nodes are freed here ('reqWorkers' does not own them); new ones
 // are handed over to 'newWorkers', which owns its entries
3000  if (found) delete ni;
3001  else {
3002  newWorkers->Add(ni);
3003  PDB(kGlobal, 1)
3004  Info("PollForNewWorkers", "New worker found: %s:%s",
3005  ni->GetNodeName().Data(), fullOrd.Data());
3006  }
3007  }
3008 
3009  delete reqWorkers; // not owner
3010 
3011  Int_t nNewWorkers = newWorkers->GetEntries();
3012 
3013  // Add the new workers
3014  if (nNewWorkers > 0) {
3015  PDB(kGlobal, 1)
3016  Info("PollForNewWorkers", "Requesting to add %d new worker(s)", newWorkers->GetEntries());
3017  Int_t rv = AddWorkers(newWorkers);
 // NOTE(review): whether AddWorkers() also deletes 'newWorkers' when it
 // fails is not visible here -- confirm to rule out a leak.
3018  if (rv < 0) {
3019  Error("PollForNewWorkers", "Call to AddWorkers() failed (got %d < 0)", rv);
3020  return -1;
3021  }
3022  // Don't delete newWorkers: AddWorkers() will do that
3023  }
3024  else {
3025  PDB(kGlobal, 2)
3026  Info("PollForNewWorkers", "No new worker found");
3027  delete newWorkers;
3028  }
3029 
3030  return nNewWorkers;
3031 }
3032 
3033 ////////////////////////////////////////////////////////////////////////////////
3034 /// Remove links to objects in list 'ol' from gDirectory.
/// Each object is passed to gDirectory->RecursiveRemove(); the objects are
/// not deleted here. A null 'ol' is ignored.
3035 
3037 {
3038  if (ol) {
3039  TIter nxo(ol);
3040  TObject *o = 0;
3041  while ((o = nxo()))
3042  gDirectory->RecursiveRemove(o);
3043  }
3044 }
3045 
3046 ////////////////////////////////////////////////////////////////////////////////
3047 /// Collect and analyze available input from socket s.
3048 /// Returns -1 if receiving fails (this wake-up is ignored even when the
/// connection could be re-established) or if no message was received.
/// Otherwise returns the code of HandleInputMessage(): 0 on success, 1 if
/// this was the last message from this socket, -1 on failure. A 1 is turned
/// into 2 when endtype >= 0 and the message type differs from endtype, i.e.
/// the end message belongs to an outer (saved) monitor.
3049 
3051 {
3052  TMessage *mess;
3053 
3054  Int_t recvrc = 0;
3055  if ((recvrc = s->Recv(mess)) < 0) {
3056  PDB(kCollect,2)
3057  Info("CollectInputFrom","%p: got %d from Recv()", s, recvrc);
3058  Bool_t bad = kTRUE;
3059  if (recvrc == -5) {
3060  // Broken connection: try reconnection
3062  if (s->Reconnect() == 0) {
3064  bad = kFALSE;
3065  }
3066  }
3067  if (bad)
3068  MarkBad(s, "problems receiving a message in TProof::CollectInputFrom(...)");
3069  // Ignore this wake up
3070  return -1;
3071  }
3072  if (!mess) {
3073  // we get here in case the remote server died
3074  MarkBad(s, "undefined message in TProof::CollectInputFrom(...)");
3075  return -1;
3076  }
3077  Int_t rc = 0;
3078 
 // Read the type now: HandleInputMessage() may delete 'mess'
3079  Int_t what = mess->What();
 // NOTE(review): if no worker matches 's', HandleInputMessage() returns -1
 // early without deleting 'mess' -- possible leak, confirm.
3080  TSlave *sl = FindSlave(s);
3081  rc = HandleInputMessage(sl, mess, deactonfail);
3082  if (rc == 1 && (endtype >= 0) && (what != endtype))
3083  // This message was for the base monitor in recursive case
3084  rc = 2;
3085 
3086  // We are done successfully
3087  return rc;
3088 }
3089 
3090 ////////////////////////////////////////////////////////////////////////////////
3091 /// Analyze the received message.
3092 /// Returns 0 on success (1 if this the last message from this socket), -1 if
3093 /// any failure occurs.
3094 
3096 {
3097  char str[512];
3098  TObject *obj;
3099  Int_t rc = 0;
3100 
3101  if (!mess || !sl) {
3102  Warning("HandleInputMessage", "given an empty message or undefined worker");
3103  return -1;
3104  }
3105  Bool_t delete_mess = kTRUE;
3106  TSocket *s = sl->GetSocket();
3107  if (!s) {
3108  Warning("HandleInputMessage", "worker socket is undefined");
3109  return -1;
3110  }
3111 
3112  // The message type
3113  Int_t what = mess->What();
3114 
3115  PDB(kCollect,3)
3116  Info("HandleInputMessage", "got type %d from '%s'", what, sl->GetOrdinal());
3117 
3118  switch (what) {
3119 
3120  case kMESS_OK:
3121  // Add the message to the list
3122  fRecvMessages->Add(mess);
3123  delete_mess = kFALSE;
3124  break;
3125 
3126  case kMESS_OBJECT:
3127  if (fPlayer) fPlayer->HandleRecvHisto(mess);
3128  break;
3129 
3130  case kPROOF_FATAL:
3131  { TString msg;
3132  if ((mess->BufferSize() > mess->Length()))
3133  (*mess) >> msg;
3134  if (msg.IsNull()) {
3135  MarkBad(s, "received kPROOF_FATAL");
3136  } else {
3137  MarkBad(s, msg);
3138  }
3139  }
3140  if (fProgressDialogStarted) {
3141  // Finalize the progress dialog
3142  Emit("StopProcess(Bool_t)", kTRUE);
3143  }
3144  break;
3145 
3146  case kPROOF_STOP:
3147  // Stop collection from this worker
3148  Info("HandleInputMessage", "received kPROOF_STOP from %s: disabling any further collection this worker",
3149  sl->GetOrdinal());
3150  rc = 1;
3151  break;
3152 
3153  case kPROOF_GETTREEHEADER:
3154  // Add the message to the list
3155  fRecvMessages->Add(mess);
3156  delete_mess = kFALSE;
3157  rc = 1;
3158  break;
3159 
3160  case kPROOF_TOUCH:
3161  // send a request for touching the remote admin file
3162  {
3163  sl->Touch();
3164  }
3165  break;
3166 
3167  case kPROOF_GETOBJECT:
3168  // send slave object it asks for
3169  mess->ReadString(str, sizeof(str));
3170  obj = gDirectory->Get(str);
3171  if (obj)
3172  s->SendObject(obj);
3173  else
3174  s->Send(kMESS_NOTOK);
3175  break;
3176 
3177  case kPROOF_GETPACKET:
3178  {
3179  PDB(kGlobal,2)
3180  Info("HandleInputMessage","%s: kPROOF_GETPACKET", sl->GetOrdinal());
3181  TDSetElement *elem = 0;
3182  elem = fPlayer ? fPlayer->GetNextPacket(sl, mess) : 0;
3183 
3184  if (elem != (TDSetElement*) -1) {
3185  TMessage answ(kPROOF_GETPACKET);
3186  answ << elem;
3187  s->Send(answ);
3188 
3189  while (fWaitingSlaves != 0 && fWaitingSlaves->GetSize()) {
3190  TPair *p = (TPair*) fWaitingSlaves->First();
3191  s = (TSocket*) p->Key();
3192  TMessage *m = (TMessage*) p->Value();
3193 
3194  elem = fPlayer ? fPlayer->GetNextPacket(sl, m) : 0;
3195  if (elem != (TDSetElement*) -1) {
3197  a << elem;
3198  s->Send(a);
3199  // remove has to happen via Links because TPair does not have
3200  // a Compare() function and therefore RemoveFirst() and
3201  // Remove(TObject*) do not work
3203  delete p;
3204  delete m;
3205  } else {
3206  break;
3207  }
3208  }
3209  } else {
3210  if (fWaitingSlaves == 0) fWaitingSlaves = new TList;
3211  fWaitingSlaves->Add(new TPair(s, mess));
3212  delete_mess = kFALSE;
3213  }
3214  }
3215  break;
3216 
3217  case kPROOF_LOGFILE:
3218  {
3219  Int_t size;
3220  (*mess) >> size;
3221  PDB(kGlobal,2)
3222  Info("HandleInputMessage","%s: kPROOF_LOGFILE: size: %d", sl->GetOrdinal(), size);
3223  RecvLogFile(s, size);
3224  }
3225  break;
3226 
3227  case kPROOF_LOGDONE:
3228  (*mess) >> sl->fStatus >> sl->fParallel;
3229  PDB(kCollect,2)
3230  Info("HandleInputMessage","%s: kPROOF_LOGDONE: status %d parallel %d",
3231  sl->GetOrdinal(), sl->fStatus, sl->fParallel);
3232  if (sl->fStatus != 0) {
3233  // Return last nonzero status
3234  fStatus = sl->fStatus;
3235  // Deactivate the worker, if required
3236  if (deactonfail) DeactivateWorker(sl->fOrdinal);
3237  }
3238  // Remove from the workers-ready list
3241  fWrksOutputReady->Remove(sl);
3242  }
3243  rc = 1;
3244  break;
3245 
3246  case kPROOF_GETSTATS:
3247  {
3248  (*mess) >> sl->fBytesRead >> sl->fRealTime >> sl->fCpuTime
3249  >> sl->fWorkDir >> sl->fProofWorkDir;
3250  PDB(kCollect,2)
3251  Info("HandleInputMessage", "kPROOF_GETSTATS: %s", sl->fWorkDir.Data());
3252  TString img;
3253  if ((mess->BufferSize() > mess->Length()))
3254  (*mess) >> img;
3255  // Set image
3256  if (img.IsNull()) {
3257  if (sl->fImage.IsNull())
3258  sl->fImage.Form("%s:%s", TUrl(sl->fName).GetHostFQDN(),
3259  sl->fProofWorkDir.Data());
3260  } else {
3261  sl->fImage = img;
3262  }
3263  PDB(kGlobal,2)
3264  Info("HandleInputMessage",
3265  "kPROOF_GETSTATS:%s image: %s", sl->GetOrdinal(), sl->GetImage());
3266 
3267  fBytesRead += sl->fBytesRead;
3268  fRealTime += sl->fRealTime;
3269  fCpuTime += sl->fCpuTime;
3270  rc = 1;
3271  }
3272  break;
3273 
3274  case kPROOF_GETPARALLEL:
3275  {
3276  Bool_t async = kFALSE;
3277  (*mess) >> sl->fParallel;
3278  if ((mess->BufferSize() > mess->Length()))
3279  (*mess) >> async;
3280  rc = (async) ? 0 : 1;
3281  }
3282  break;
3283 
3284  case kPROOF_CHECKFILE:
3285  { // New servers (>= 5.22) send the status
3286  if ((mess->BufferSize() > mess->Length())) {
3287  (*mess) >> fCheckFileStatus;
3288  } else {
3289  // Form old servers this meant success (failure was signaled with the
3290  // dangerous kPROOF_FATAL)
3291  fCheckFileStatus = 1;
3292  }
3293  rc = 1;
3294  }
3295  break;
3296 
3297  case kPROOF_SENDFILE:
3298  { // New server: signals ending of sendfile operation
3299  rc = 1;
3300  }
3301  break;
3302 
3303  case kPROOF_PACKAGE_LIST:
3304  {
3305  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_PACKAGE_LIST: enter");
3306  Int_t type = 0;
3307  (*mess) >> type;
3308  switch (type) {
3312  if (fEnabledPackages) {
3314  } else {
3315  Error("HandleInputMessage",
3316  "kPROOF_PACKAGE_LIST: kListEnabledPackages: TList not found in message!");
3317  }
3318  break;
3319  case TProof::kListPackages:
3322  if (fAvailablePackages) {
3324  } else {
3325  Error("HandleInputMessage",
3326  "kPROOF_PACKAGE_LIST: kListPackages: TList not found in message!");
3327  }
3328  break;
3329  default:
3330  Error("HandleInputMessage", "kPROOF_PACKAGE_LIST: unknown type: %d", type);
3331  }
3332  }
3333  break;
3334 
3335  case kPROOF_SENDOUTPUT:
3336  {
3337  // We start measuring the merging time
3338  fPlayer->SetMerging();
3339 
3340  // Worker is ready to send output: make sure the relevant bit is reset
3342  PDB(kGlobal,2)
3343  Info("HandleInputMessage","kPROOF_SENDOUTPUT: enter (%s)", sl->GetOrdinal());
3344  // Create the list if not yet done
3345  if (!fWrksOutputReady) {
3346  fWrksOutputReady = new TList;
3348  }
3349  fWrksOutputReady->Add(sl);
3350  }
3351  break;
3352 
3353  case kPROOF_OUTPUTOBJECT:
3354  {
3355  // We start measuring the merging time
3356  fPlayer->SetMerging();
3357 
3358  PDB(kGlobal,2)
3359  Info("HandleInputMessage","kPROOF_OUTPUTOBJECT: enter");
3360  Int_t type = 0;
3361  const char *prefix = gProofServ ? gProofServ->GetPrefix() : "Lite-0";
3363  Info("HandleInputMessage", "finalization on %s started ...", prefix);
3365  }
3366 
3367  while ((mess->BufferSize() > mess->Length())) {
3368  (*mess) >> type;
3369  // If a query result header, add it to the player list
3370  if (fPlayer) {
3371  if (type == 0) {
3372  // Retrieve query result instance (output list not filled)
3373  TQueryResult *pq =
3375  if (pq) {
3376  // Add query to the result list in TProofPlayer
3377  fPlayer->AddQueryResult(pq);
3378  fPlayer->SetCurrentQuery(pq);
3379  // And clear the output list, as we start merging a new set of results
3380  if (fPlayer->GetOutputList())
3381  fPlayer->GetOutputList()->Clear();
3382  // Add the unique query tag as TNamed object to the input list
3383  // so that it is available in TSelectors for monitoring
3384  TString qid = TString::Format("%s:%s",pq->GetTitle(),pq->GetName());
3385  if (fPlayer->GetInputList()->FindObject("PROOF_QueryTag"))
3386  fPlayer->GetInputList()->Remove(fPlayer->GetInputList()->FindObject("PROOF_QueryTag"));
3387  fPlayer->AddInput(new TNamed("PROOF_QueryTag", qid.Data()));
3388  } else {
3389  Warning("HandleInputMessage","kPROOF_OUTPUTOBJECT: query result missing");
3390  }
3391  } else if (type > 0) {
3392  // Read object
3393  TObject *o = mess->ReadObject(TObject::Class());
3394  // Increment counter on the client side
3396  TString msg;
3397  Bool_t changed = kFALSE;
3398  msg.Form("%s: merging output objects ... %s", prefix, fMergePrg.Export(changed));
3399  if (gProofServ) {
3401  } else if (IsTty() || changed) {
3402  fprintf(stderr, "%s\r", msg.Data());
3403  }
3404  // Add or merge it
3405  if ((fPlayer->AddOutputObject(o) == 1)) {
3406  // Remove the object if it has been merged
3407  SafeDelete(o);
3408  }
3409  if (type > 1) {
3410  // Update the merger progress info
3412  if (TestBit(TProof::kIsClient) && !IsLite()) {
3413  // In PROOFLite this has to be done once only in TProofLite::Process
3415  if (pq) {
3417  // Add input objects (do not override remote settings, if any)
3418  TObject *xo = 0;
3419  TIter nxin(fPlayer->GetInputList());
3420  // Servers prior to 5.28/00 do not create the input list in the TQueryResult
3421  if (!pq->GetInputList()) pq->SetInputList(new TList());
3422  while ((xo = nxin()))
3423  if (!pq->GetInputList()->FindObject(xo->GetName()))
3424  pq->AddInput(xo->Clone());
3425  // If the last object, notify the GUI that the result arrived
3426  QueryResultReady(TString::Format("%s:%s", pq->GetTitle(), pq->GetName()));
3427  }
3428  // Processing is over
3429  UpdateDialog();
3430  }
3431  }
3432  }
3433  } else {
3434  Warning("HandleInputMessage", "kPROOF_OUTPUTOBJECT: player undefined!");
3435  }
3436  }
3437  }
3438  break;
3439 
3440  case kPROOF_OUTPUTLIST:
3441  {
3442  // We start measuring the merging time
3443 
3444  PDB(kGlobal,2)
3445  Info("HandleInputMessage","%s: kPROOF_OUTPUTLIST: enter", sl->GetOrdinal());
3446  TList *out = 0;
3447  if (fPlayer) {
3448  fPlayer->SetMerging();
3449  if (TestBit(TProof::kIsMaster) || fProtocol < 7) {
3450  out = (TList *) mess->ReadObject(TList::Class());
3451  } else {
3452  TQueryResult *pq =
3454  if (pq) {
3455  // Add query to the result list in TProofPlayer
3456  fPlayer->AddQueryResult(pq);
3457  fPlayer->SetCurrentQuery(pq);
3458  // To avoid accidental cleanups from anywhere else
3459  // remove objects from gDirectory and clone the list
3460  out = pq->GetOutputList();
3461  CleanGDirectory(out);
3462  out = (TList *) out->Clone();
3463  // Notify the GUI that the result arrived
3464  QueryResultReady(TString::Format("%s:%s", pq->GetTitle(), pq->GetName()));
3465  } else {
3466  PDB(kGlobal,2)
3467  Info("HandleInputMessage",
3468  "%s: kPROOF_OUTPUTLIST: query result missing", sl->GetOrdinal());
3469  }
3470  }
3471  if (out) {
3472  out->SetOwner();
3473  fPlayer->AddOutput(out); // Incorporate the list
3474  SafeDelete(out);
3475  } else {
3476  PDB(kGlobal,2)
3477  Info("HandleInputMessage",
3478  "%s: kPROOF_OUTPUTLIST: outputlist is empty", sl->GetOrdinal());
3479  }
3480  } else {
3481  Warning("HandleInputMessage",
3482  "%s: kPROOF_OUTPUTLIST: player undefined!", sl->GetOrdinal());
3483  }
3484  // On clients at this point processing is over
3485  if (TestBit(TProof::kIsClient) && !IsLite())
3486  UpdateDialog();
3487  }
3488  break;
3489 
3490  case kPROOF_QUERYLIST:
3491  {
3492  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_QUERYLIST: enter");
3493  (*mess) >> fOtherQueries >> fDrawQueries;
3494  if (fQueries) {
3495  fQueries->Delete();
3496  delete fQueries;
3497  fQueries = 0;
3498  }
3499  fQueries = (TList *) mess->ReadObject(TList::Class());
3500  }
3501  break;
3502 
3503  case kPROOF_RETRIEVE:
3504  {
3505  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_RETRIEVE: enter");
3506  TQueryResult *pq =
3508  if (pq && fPlayer) {
3509  fPlayer->AddQueryResult(pq);
3510  // Notify the GUI that the result arrived
3511  QueryResultReady(TString::Format("%s:%s", pq->GetTitle(), pq->GetName()));
3512  } else {
3513  PDB(kGlobal,2)
3514  Info("HandleInputMessage",
3515  "kPROOF_RETRIEVE: query result missing or player undefined");
3516  }
3517  }
3518  break;
3519 
3520  case kPROOF_MAXQUERIES:
3521  {
3522  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_MAXQUERIES: enter");
3523  Int_t max = 0;
3524 
3525  (*mess) >> max;
3526  Printf("Number of queries fully kept remotely: %d", max);
3527  }
3528  break;
3529 
3530  case kPROOF_SERVERSTARTED:
3531  {
3532  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_SERVERSTARTED: enter");
3533 
3534  UInt_t tot = 0, done = 0;
3535  TString action;
3536  Bool_t st = kTRUE;
3537 
3538  (*mess) >> action >> tot >> done >> st;
3539 
3540  if (TestBit(TProof::kIsClient)) {
3541  if (tot) {
3542  TString type = (action.Contains("submas")) ? "submasters"
3543  : "workers";
3544  Int_t frac = (Int_t) (done*100.)/tot;
3545  char msg[512] = {0};
3546  if (frac >= 100) {
3547  snprintf(msg, 512, "%s: OK (%d %s) \n",
3548  action.Data(),tot, type.Data());
3549  } else {
3550  snprintf(msg, 512, "%s: %d out of %d (%d %%)\r",
3551  action.Data(), done, tot, frac);
3552  }
3553  if (fSync)
3554  fprintf(stderr,"%s", msg);
3555  else
3556  NotifyLogMsg(msg, 0);
3557  }
3558  // Notify GUIs
3559  StartupMessage(action.Data(), st, (Int_t)done, (Int_t)tot);
3560  } else {
3561 
3562  // Just send the message one level up
3564  m << action << tot << done << st;
3565  gProofServ->GetSocket()->Send(m);
3566  }
3567  }
3568  break;
3569 
3570  case kPROOF_DATASET_STATUS:
3571  {
3572  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_DATASET_STATUS: enter");
3573 
3574  UInt_t tot = 0, done = 0;
3575  TString action;
3576  Bool_t st = kTRUE;
3577 
3578  (*mess) >> action >> tot >> done >> st;
3579 
3580  if (TestBit(TProof::kIsClient)) {
3581  if (tot) {
3582  TString type = "files";
3583  Int_t frac = (Int_t) (done*100.)/tot;
3584  char msg[512] = {0};
3585  if (frac >= 100) {
3586  snprintf(msg, 512, "%s: OK (%d %s) \n",
3587  action.Data(),tot, type.Data());
3588  } else {
3589  snprintf(msg, 512, "%s: %d out of %d (%d %%)\r",
3590  action.Data(), done, tot, frac);
3591  }
3592  if (fSync)
3593  fprintf(stderr,"%s", msg);
3594  else
3595  NotifyLogMsg(msg, 0);
3596  }
3597  // Notify GUIs
3598  DataSetStatus(action.Data(), st, (Int_t)done, (Int_t)tot);
3599  } else {
3600 
3601  // Just send the message one level up
3603  m << action << tot << done << st;
3604  gProofServ->GetSocket()->Send(m);
3605  }
3606  }
3607  break;
3608 
3609  case kPROOF_STARTPROCESS:
3610  {
3611  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_STARTPROCESS: enter");
3612 
3613  // For Proof-Lite this variable is the number of workers and is set
3614  // by the player
3615  if (!IsLite()) {
3616  fNotIdle = 1;
3617  fIsWaiting = kFALSE;
3618  }
3619 
3620  // Redirect the output, if needed
3621  fRedirLog = (fSync) ? fRedirLog : kTRUE;
3622 
3623  // The signal is used on masters by XrdProofdProtocol to catch
3624  // the start of processing; on clients it allows to update the
3625  // progress dialog
3626  if (!TestBit(TProof::kIsMaster)) {
3627 
3628  // This is the end of preparation
3629  fQuerySTW.Stop();
3631  PDB(kGlobal,2) Info("HandleInputMessage","Preparation time: %f s", fPrepTime);
3632 
3633  TString selec;
3634  Int_t dsz = -1;
3635  Long64_t first = -1, nent = -1;
3636  (*mess) >> selec >> dsz >> first >> nent;
3637  // Start or reset the progress dialog
3638  if (!gROOT->IsBatch()) {
3639  if (fProgressDialog &&
3641  if (!fProgressDialogStarted) {
3642  fProgressDialog->ExecPlugin(5, this,
3643  selec.Data(), dsz, first, nent);
3645  } else {
3646  ResetProgressDialog(selec, dsz, first, nent);
3647  }
3648  }
3650  }
3651  }
3652  }
3653  break;
3654 
3655  case kPROOF_ENDINIT:
3656  {
3657  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_ENDINIT: enter");
3658 
3659  if (TestBit(TProof::kIsMaster)) {
3660  if (fPlayer)
3661  fPlayer->SetInitTime();
3662  }
3663  }
3664  break;
3665 
3666  case kPROOF_SETIDLE:
3667  {
3668  PDB(kGlobal,2)
3669  Info("HandleInputMessage","kPROOF_SETIDLE from '%s': enter (%d)", sl->GetOrdinal(), fNotIdle);
3670 
3671  // The session is idle
3672  if (IsLite()) {
3673  if (fNotIdle > 0) {
3674  fNotIdle--;
3675  PDB(kGlobal,2)
3676  Info("HandleInputMessage", "%s: got kPROOF_SETIDLE", sl->GetOrdinal());
3677  } else {
3678  Warning("HandleInputMessage",
3679  "%s: got kPROOF_SETIDLE but no running workers ! protocol error?",
3680  sl->GetOrdinal());
3681  }
3682  } else {
3683  fNotIdle = 0;
3684  // Check if the query has been enqueued
3685  if ((mess->BufferSize() > mess->Length()))
3686  (*mess) >> fIsWaiting;
3687  }
3688  }
3689  break;
3690 
3691  case kPROOF_QUERYSUBMITTED:
3692  {
3693  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_QUERYSUBMITTED: enter");
3694 
3695  // We have received the sequential number
3696  (*mess) >> fSeqNum;
3697  Bool_t sync = fSync;
3698  if ((mess->BufferSize() > mess->Length()))
3699  (*mess) >> sync;
3700  if (sync != fSync && fSync) {
3701  // The server required to switch to asynchronous mode
3702  Activate();
3703  fSync = kFALSE;
3704  }
3705  DisableGoAsyn();
3706  // Check if the query has been enqueued
3707  fIsWaiting = kTRUE;
3708  // For Proof-Lite this variable is the number of workers and is set by the player
3709  if (!IsLite())
3710  fNotIdle = 1;
3711 
3712  rc = 1;
3713  }
3714  break;
3715 
3716  case kPROOF_SESSIONTAG:
3717  {
3718  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_SESSIONTAG: enter");
3719 
3720  // We have received the unique tag and save it as name of this object
3721  TString stag;
3722  (*mess) >> stag;
3723  SetName(stag);
3724  // In the TSlave object
3725  sl->SetSessionTag(stag);
3726  // Server may have also sent the group
3727  if ((mess->BufferSize() > mess->Length()))
3728  (*mess) >> fGroup;
3729  // Server may have also sent the user
3730  if ((mess->BufferSize() > mess->Length())) {
3731  TString usr;
3732  (*mess) >> usr;
3733  if (!usr.IsNull()) fUrl.SetUser(usr.Data());
3734  }
3735  }
3736  break;
3737 
3738  case kPROOF_FEEDBACK:
3739  {
3740  PDB(kGlobal,2)
3741  Info("HandleInputMessage","kPROOF_FEEDBACK: enter");
3742  TList *out = (TList *) mess->ReadObject(TList::Class());
3743  out->SetOwner();
3744  if (fPlayer)
3745  fPlayer->StoreFeedback(sl, out); // Adopts the list
3746  else
3747  // Not yet ready: stop collect asap
3748  rc = 1;
3749  }
3750  break;
3751 
3752  case kPROOF_AUTOBIN:
3753  {
3754  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_AUTOBIN: enter");
3755 
3756  TString name;
3757  Double_t xmin, xmax, ymin, ymax, zmin, zmax;
3758 
3759  (*mess) >> name >> xmin >> xmax >> ymin >> ymax >> zmin >> zmax;
3760 
3761  if (fPlayer) fPlayer->UpdateAutoBin(name,xmin,xmax,ymin,ymax,zmin,zmax);
3762 
3763  TMessage answ(kPROOF_AUTOBIN);
3764 
3765  answ << name << xmin << xmax << ymin << ymax << zmin << zmax;
3766 
3767  s->Send(answ);
3768  }
3769  break;
3770 
3771  case kPROOF_PROGRESS:
3772  {
3773  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_PROGRESS: enter");
3774 
3775  if (GetRemoteProtocol() > 25) {
3776  // New format
3777  TProofProgressInfo *pi = 0;
3778  (*mess) >> pi;
3779  fPlayer->Progress(sl,pi);
3780  } else if (GetRemoteProtocol() > 11) {
3781  Long64_t total, processed, bytesread;
3782  Float_t initTime, procTime, evtrti, mbrti;
3783  (*mess) >> total >> processed >> bytesread
3784  >> initTime >> procTime
3785  >> evtrti >> mbrti;
3786  if (fPlayer)
3787  fPlayer->Progress(sl, total, processed, bytesread,
3788  initTime, procTime, evtrti, mbrti);
3789 
3790  } else {
3791  // Old format
3792  Long64_t total, processed;
3793  (*mess) >> total >> processed;
3794  if (fPlayer)
3795  fPlayer->Progress(sl, total, processed);
3796  }
3797  }
3798  break;
3799 
3800  case kPROOF_STOPPROCESS:
3801  {
3802  // This message is sent from a worker that finished processing.
3803  // We determine whether it was asked to finish by the
3804  // packetizer or stopped during processing a packet
3805  // (by TProof::RemoveWorkers() or by an external signal).
3806  // In the later case call packetizer->MarkBad.
3807  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_STOPPROCESS: enter");
3808 
3809  Long64_t events = 0;
3810  Bool_t abort = kFALSE;
3811  TProofProgressStatus *status = 0;
3812 
3813  if ((mess->BufferSize() > mess->Length()) && (fProtocol > 18)) {
3814  (*mess) >> status >> abort;
3815  } else if ((mess->BufferSize() > mess->Length()) && (fProtocol > 8)) {
3816  (*mess) >> events >> abort;
3817  } else {
3818  (*mess) >> events;
3819  }
3820  if (fPlayer) {
3821  if (fProtocol > 18) {
3822  TList *listOfMissingFiles = 0;
3823  if (!(listOfMissingFiles = (TList *)GetOutput("MissingFiles"))) {
3824  listOfMissingFiles = new TList();
3825  listOfMissingFiles->SetName("MissingFiles");
3826  if (fPlayer)
3827  fPlayer->AddOutputObject(listOfMissingFiles);
3828  }
3829  if (fPlayer->GetPacketizer()) {
3830  Int_t ret =
3831  fPlayer->GetPacketizer()->AddProcessed(sl, status, 0, &listOfMissingFiles);
3832  if (ret > 0)
3833  fPlayer->GetPacketizer()->MarkBad(sl, status, &listOfMissingFiles);
3834  // This object is now owned by the packetizer
3835  status = 0;
3836  }
3837  if (status) fPlayer->AddEventsProcessed(status->GetEntries());
3838  } else {
3839  fPlayer->AddEventsProcessed(events);
3840  }
3841  }
3842  SafeDelete(status);
3843  if (!TestBit(TProof::kIsMaster))
3844  Emit("StopProcess(Bool_t)", abort);
3845  break;
3846  }
3847 
3848  case kPROOF_SUBMERGER:
3849  {
3850  PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_SUBMERGER: enter");
3851  HandleSubmerger(mess, sl);
3852  }
3853  break;
3854 
3855  case kPROOF_GETSLAVEINFO:
3856  {
3857  PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_GETSLAVEINFO: enter");
3858 
3859  Bool_t active = (GetListOfActiveSlaves()->FindObject(sl) != 0);
3860  Bool_t bad = (GetListOfBadSlaves()->FindObject(sl) != 0);
3861  TList* tmpinfo = 0;
3862  (*mess) >> tmpinfo;
3863  if (tmpinfo == 0) {
3864  Error("HandleInputMessage", "kPROOF_GETSLAVEINFO: no list received!");
3865  } else {
3866  tmpinfo->SetOwner(kFALSE);
3867  Int_t nentries = tmpinfo->GetSize();
3868  for (Int_t i=0; i<nentries; i++) {
3869  TSlaveInfo* slinfo =
3870  dynamic_cast<TSlaveInfo*>(tmpinfo->At(i));
3871  if (slinfo) {
3872  // If PROOF-Lite
3873  if (IsLite()) slinfo->fHostName = gSystem->HostName();
3874  // Check if we have already a instance for this worker
3875  TIter nxw(fSlaveInfo);
3876  TSlaveInfo *ourwi = 0;
3877  while ((ourwi = (TSlaveInfo *)nxw())) {
3878  if (!strcmp(ourwi->GetOrdinal(), slinfo->GetOrdinal())) {
3879  ourwi->SetSysInfo(slinfo->GetSysInfo());
3880  ourwi->fHostName = slinfo->GetName();
3881  if (slinfo->GetDataDir() && (strlen(slinfo->GetDataDir()) > 0))
3882  ourwi->fDataDir = slinfo->GetDataDir();
3883  break;
3884  }
3885  }
3886  if (!ourwi) {
3887  fSlaveInfo->Add(slinfo);
3888  } else {
3889  slinfo = ourwi;
3890  }
3891  if (slinfo->fStatus != TSlaveInfo::kBad) {
3892  if (!active) slinfo->SetStatus(TSlaveInfo::kNotActive);
3893  if (bad) slinfo->SetStatus(TSlaveInfo::kBad);
3894  }
3895  if (sl->GetMsd() && (strlen(sl->GetMsd()) > 0))
3896  slinfo->fMsd = sl->GetMsd();
3897  }
3898  }
3899  delete tmpinfo;
3900  rc = 1;
3901  }
3902  }
3903  break;
3904 
3905  case kPROOF_VALIDATE_DSET:
3906  {
3907  PDB(kGlobal,2)
3908  Info("HandleInputMessage", "kPROOF_VALIDATE_DSET: enter");
3909  TDSet* dset = 0;
3910  (*mess) >> dset;
3911  if (!fDSet)
3912  Error("HandleInputMessage", "kPROOF_VALIDATE_DSET: fDSet not set");
3913  else
3914  fDSet->Validate(dset);
3915  delete dset;
3916  }
3917  break;
3918 
3919  case kPROOF_DATA_READY:
3920  {
3921  PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_DATA_READY: enter");
3922  Bool_t dataready = kFALSE;
3923  Long64_t totalbytes, bytesready;
3924  (*mess) >> dataready >> totalbytes >> bytesready;
3925  fTotalBytes += totalbytes;
3926  fBytesReady += bytesready;
3927  if (dataready == kFALSE) fDataReady = dataready;
3928  }
3929  break;
3930 
3931  case kPROOF_PING:
3932  // do nothing (ping is already acknowledged)
3933  break;
3934 
3935  case kPROOF_MESSAGE:
3936  {
3937  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_MESSAGE: enter");
3938 
3939  // We have received the unique tag and save it as name of this object
3940  TString msg;
3941  (*mess) >> msg;
3942  Bool_t lfeed = kTRUE;
3943  if ((mess->BufferSize() > mess->Length()))
3944  (*mess) >> lfeed;
3945 
3946  if (TestBit(TProof::kIsClient)) {
3947 
3948  if (fSync) {
3949  // Notify locally
3950  fprintf(stderr,"%s%c", msg.Data(), (lfeed ? '\n' : '\r'));
3951  } else {
3952  // Notify locally taking care of redirection, windows logs, ...
3953  NotifyLogMsg(msg, (lfeed ? "\n" : "\r"));
3954  }
3955  } else {
3956 
3957  // The message is logged for debugging purposes.
3958  fprintf(stderr,"%s%c", msg.Data(), (lfeed ? '\n' : '\r'));
3959  if (gProofServ) {
3960  // We hide it during normal operations
3962 
3963  // And send the message one level up
3964  gProofServ->SendAsynMessage(msg, lfeed);
3965  }
3966  }
3967  }
3968  break;
3969 
3970  case kPROOF_VERSARCHCOMP:
3971  {
3972  TString vac;
3973  (*mess) >> vac;
3974  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_VERSARCHCOMP: %s", vac.Data());
3975  Int_t from = 0;
3976  TString vers, archcomp;
3977  if (vac.Tokenize(vers, from, "|"))
3978  vac.Tokenize(archcomp, from, "|");
3979  sl->SetArchCompiler(archcomp);
3980  vers.ReplaceAll(":","|");
3981  sl->SetROOTVersion(vers);
3982  }
3983  break;
3984 
3985  default:
3986  {
3987  Error("HandleInputMessage", "unknown command received from '%s' (what = %d)",
3988  sl->GetOrdinal(), what);
3989  }
3990  break;
3991  }
3992 
3993  // Cleanup
3994  if (delete_mess)
3995  delete mess;
3996 
3997  // We are done successfully
3998  return rc;
3999 }
4000 
4001 ////////////////////////////////////////////////////////////////////////////////
4002 /// Process a message of type kPROOF_SUBMERGER
4003 
4005 {
4006  // Message sub-type
4007  Int_t type = 0;
4008  (*mess) >> type;
4009  TSocket *s = sl->GetSocket();
4010 
4011  switch (type) {
4012  case kOutputSent:
4013  {
4014  if (IsEndMaster()) {
4015  Int_t merger_id = -1;
4016  (*mess) >> merger_id;
4017 
4018  PDB(kSubmerger, 2)
4019  Info("HandleSubmerger", "kOutputSent: Worker %s:%d:%s had sent its output to merger #%d",
4020  sl->GetName(), sl->GetPort(), sl->GetOrdinal(), merger_id);
4021 
4022  if (!fMergers || fMergers->GetSize() <= merger_id) {
4023  Error("HandleSubmerger", "kOutputSize: #%d not in list ", merger_id);
4024  break;
4025  }
4026  TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
4027  mi->SetMergedWorker();
4028  if (mi->AreAllWorkersMerged()) {
4029  mi->Deactivate();
4030  if (GetActiveMergersCount() == 0) {
4031  fMergers->Clear();
4032  delete fMergers;
4033  fMergersSet = kFALSE;
4034  fMergersCount = -1;
4035  fLastAssignedMerger = 0;
4036  PDB(kSubmerger, 2) Info("HandleSubmerger", "all mergers removed ... ");
4037  }
4038  }
4039  } else {
4040  PDB(kSubmerger, 2) Error("HandleSubmerger","kOutputSent: received not on endmaster!");
4041  }
4042  }
4043  break;
4044 
4045  case kMergerDown:
4046  {
4047  Int_t merger_id = -1;
4048  (*mess) >> merger_id;
4049 
4050  PDB(kSubmerger, 2) Info("HandleSubmerger", "kMergerDown: #%d ", merger_id);
4051 
4052  if (!fMergers || fMergers->GetSize() <= merger_id) {
4053  Error("HandleSubmerger", "kMergerDown: #%d not in list ", merger_id);
4054  break;
4055  }
4056 
4057  TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
4058  if (!mi->IsActive()) {
4059  break;
4060  } else {
4061  mi->Deactivate();
4062  }
4063 
4064  // Stop the invalid merger in the case it is still listening
4065  TMessage stop(kPROOF_SUBMERGER);
4066  stop << Int_t(kStopMerging);
4067  stop << 0;
4068  s->Send(stop);
4069 
4070  // Ask for results from merger (only original results from this node as worker are returned)
4071  AskForOutput(mi->GetMerger());
4072 
4073  // Ask for results from all workers assigned to this merger
4074  TIter nxo(mi->GetWorkers());
4075  TObject * o = 0;
4076  while ((o = nxo())) {
4077  AskForOutput((TSlave *)o);
4078  }
4079  PDB(kSubmerger, 2) Info("HandleSubmerger", "kMergerDown:%d: exit", merger_id);
4080  }
4081  break;
4082 
4083  case kOutputSize:
4084  {
4085  if (IsEndMaster()) {
4086  PDB(kSubmerger, 2)
4087  Info("HandleSubmerger", "worker %s reported as finished ", sl->GetOrdinal());
4088 
4089  const char *prefix = gProofServ ? gProofServ->GetPrefix() : "Lite-0";
4090  if (!fFinalizationRunning) {
4091  Info("HandleSubmerger", "finalization on %s started ...", prefix);
4093  }
4094 
4095  Int_t output_size = 0;
4096  Int_t merging_port = 0;
4097  (*mess) >> output_size >> merging_port;
4098 
4099  PDB(kSubmerger, 2) Info("HandleSubmerger",
4100  "kOutputSize: Worker %s:%d:%s reports %d output objects (+ available port %d)",
4101  sl->GetName(), sl->GetPort(), sl->GetOrdinal(), output_size, merging_port);
4102  TString msg;
4103  if (!fMergersSet) {
4104 
4106 
4107  // First pass - setting number of mergers according to user or dynamically
4108  fMergersCount = -1; // No mergers used if not set by user
4109  TParameter<Int_t> *mc = dynamic_cast<TParameter<Int_t> *>(GetParameter("PROOF_UseMergers"));
4110  if (mc) fMergersCount = mc->GetVal(); // Value set by user
4111  TParameter<Int_t> *mh = dynamic_cast<TParameter<Int_t> *>(GetParameter("PROOF_MergersByHost"));
4112  if (mh) fMergersByHost = (mh->GetVal() != 0) ? kTRUE : kFALSE; // Assign submergers by hostname
4113 
4114  // Mergers count specified by user but not valid
4115  if (fMergersCount < 0 || (fMergersCount > (activeWorkers/2) )) {
4116  msg.Form("%s: Invalid request: cannot start %d mergers for %d workers",
4117  prefix, fMergersCount, activeWorkers);
4118  if (gProofServ)
4120  else
4121  Printf("%s",msg.Data());
4122  fMergersCount = 0;
4123  }
4124  // Mergers count will be set dynamically
4125  if ((fMergersCount == 0) && (!fMergersByHost)) {
4126  if (activeWorkers > 1) {
4127  fMergersCount = TMath::Nint(TMath::Sqrt(activeWorkers));
4128  if (activeWorkers / fMergersCount < 2)
4129  fMergersCount = (Int_t) TMath::Sqrt(activeWorkers);
4130  }
4131  if (fMergersCount > 1)
4132  msg.Form("%s: Number of mergers set dynamically to %d (for %d workers)",
4133  prefix, fMergersCount, activeWorkers);
4134  else {
4135  msg.Form("%s: No mergers will be used for %d workers",
4136  prefix, activeWorkers);
4137  fMergersCount = -1;
4138  }
4139  if (gProofServ)
4141  else
4142  Printf("%s",msg.Data());
4143  } else if (fMergersByHost) {
4144  // We force mergers at host level to minimize network traffic
4145  if (activeWorkers > 1) {
4146  fMergersCount = 0;
4147  THashList hosts;
4148  TIter nxwk(fSlaves);
4149  TObject *wrk = 0;
4150  while ((wrk = nxwk())) {
4151  if (!hosts.FindObject(wrk->GetName())) {
4152  hosts.Add(new TObjString(wrk->GetName()));
4153  fMergersCount++;
4154  }
4155  }
4156  }
4157  if (fMergersCount > 1)
4158  msg.Form("%s: Number of mergers set to %d (for %d workers), one for each slave host",
4159  prefix, fMergersCount, activeWorkers);
4160  else {
4161  msg.Form("%s: No mergers will be used for %d workers",
4162  prefix, activeWorkers);
4163  fMergersCount = -1;
4164  }
4165  if (gProofServ)
4167  else
4168  Printf("%s",msg.Data());
4169  } else {
4170  msg.Form("%s: Number of mergers set by user to %d (for %d workers)",
4171  prefix, fMergersCount, activeWorkers);
4172  if (gProofServ)
4174  else
4175  Printf("%s",msg.Data());
4176  }
4177 
4178  // We started merging; we call it here because fMergersCount is still the original number
4179  // and can be saved internally
4181 
4182  // Update merger counters (new workers are not yet active)
4184 
4185  if (fMergersCount > 0) {
4186 
4187  fMergers = new TList();
4188  fLastAssignedMerger = 0;
4189  // Total number of workers, which will not act as mergers ('pure workers')
4190  fWorkersToMerge = (activeWorkers - fMergersCount);
4191  // Establish the first merger
4192  if (!CreateMerger(sl, merging_port)) {
4193  // Cannot establish first merger
4194  AskForOutput(sl);
4195  fWorkersToMerge--;
4196  fMergersCount--;
4197  }
4199  } else {
4200  AskForOutput(sl);
4201  }
4202  fMergersSet = kTRUE;
4203  } else {
4204  // Multiple pass
4205  if (fMergersCount == -1) {
4206  // No mergers. Workers send their outputs directly to master
4207  AskForOutput(sl);
4208  } else {
4209  if ((fRedirectNext > 0 ) && (!fMergersByHost)) {
4210  RedirectWorker(s, sl, output_size);
4211  fRedirectNext--;
4212  } else {
4213  Bool_t newMerger = kTRUE;
4214  if (fMergersByHost) {
4215  TIter nxmg(fMergers);
4216  TMergerInfo *mgi = 0;
4217  while ((mgi = (TMergerInfo *) nxmg())) {
4218  if (!strcmp(sl->GetName(), mgi->GetMerger()->GetName())) {
4219  newMerger = kFALSE;
4220  break;
4221  }
4222  }
4223  }
4224  if ((fMergersCount > fMergers->GetSize()) && newMerger) {
4225  // Still not enough mergers established
4226  if (!CreateMerger(sl, merging_port)) {
4227  // Cannot establish a merger
4228  AskForOutput(sl);
4229  fWorkersToMerge--;
4230  fMergersCount--;
4231  }
4232  } else
4233  RedirectWorker(s, sl, output_size);
4234  }
4235  }
4236  }
4237  } else {
4238  Error("HandleSubMerger","kOutputSize received not on endmaster!");
4239  }
4240  }
4241  break;
4242  }
4243 }
4244 
4245 ////////////////////////////////////////////////////////////////////////////////
4246 /// Redirect output of worker sl to some merger
4247 
4248 void TProof::RedirectWorker(TSocket *s, TSlave * sl, Int_t output_size)
4249 {
4250  Int_t merger_id = -1;
4251 
4252  if (fMergersByHost) {
4253  for (Int_t i = 0; i < fMergers->GetSize(); i++) {
4254  TMergerInfo *mgi = (TMergerInfo *)fMergers->At(i);
4255  if (!strcmp(sl->GetName(), mgi->GetMerger()->GetName())) {
4256  merger_id = i;
4257  break;
4258  }
4259  }
4260  } else {
4261  merger_id = FindNextFreeMerger();
4262  }
4263 
4264  if (merger_id == -1) {
4265  // No free merger (probably it had crashed before)
4266  AskForOutput(sl);
4267  } else {
4268  TMessage sendoutput(kPROOF_SUBMERGER);
4269  sendoutput << Int_t(kSendOutput);
4270  PDB(kSubmerger, 2)
4271  Info("RedirectWorker", "redirecting worker %s to merger %d", sl->GetOrdinal(), merger_id);
4272 
4273  PDB(kSubmerger, 2) Info("RedirectWorker", "redirecting output to merger #%d", merger_id);
4274  if (!fMergers || fMergers->GetSize() <= merger_id) {
4275  Error("RedirectWorker", "#%d not in list ", merger_id);
4276  return;
4277  }
4278  TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
4279 
4280  TString hname = (IsLite()) ? "localhost" : mi->GetMerger()->GetName();
4281  sendoutput << merger_id;
4282  sendoutput << hname;
4283  sendoutput << mi->GetPort();
4284  s->Send(sendoutput);
4285  mi->AddMergedObjects(output_size);
4286  mi->AddWorker(sl);
4287  }
4288 }
4289 
4290 ////////////////////////////////////////////////////////////////////////////////
4291 /// Return a merger, which is both active and still accepts some workers to be
4292 /// assigned to it. It works on the 'round-robin' basis.
4293 
4295 {
4296  while (fLastAssignedMerger < fMergers->GetSize() &&
4297  (!((TMergerInfo*)fMergers->At(fLastAssignedMerger))->IsActive() ||
4298  ((TMergerInfo*)fMergers->At(fLastAssignedMerger))->AreAllWorkersAssigned())) {
4300  }
4301 
4302  if (fLastAssignedMerger == fMergers->GetSize()) {
4303  fLastAssignedMerger = 0;
4304  } else {
4305  return fLastAssignedMerger++;
4306  }
4307 
4308  while (fLastAssignedMerger < fMergers->GetSize() &&
4309  (!((TMergerInfo*)fMergers->At(fLastAssignedMerger))->IsActive() ||
4310  ((TMergerInfo*)fMergers->At(fLastAssignedMerger))->AreAllWorkersAssigned())) {
4312  }
4313 
4314  if (fLastAssignedMerger == fMergers->GetSize()) {
4315  return -1;
4316  } else {
4317  return fLastAssignedMerger++;
4318  }
4319 }
4320 
4321 ////////////////////////////////////////////////////////////////////////////////
4322 /// Master asks for output from worker sl
4323 
4325 {
4326  TMessage sendoutput(kPROOF_SUBMERGER);
4327  sendoutput << Int_t(kSendOutput);
4328 
4329  PDB(kSubmerger, 2) Info("AskForOutput",
4330  "worker %s was asked to send its output to master",
4331  sl->GetOrdinal());
4332 
4333  sendoutput << -1;
4334  sendoutput << TString("master");
4335  sendoutput << -1;
4336  sl->GetSocket()->Send(sendoutput);
4337  if (IsLite()) fMergePrg.IncreaseNWrks();
4338 }
4339 
4340 ////////////////////////////////////////////////////////////////////////////////
4341 /// Final update of the progress dialog
4342 
4344 {
4345  if (!fPlayer) return;
4346 
4347  // Handle abort ...
4349  if (fSync)
4350  Info("UpdateDialog",
4351  "processing was aborted - %lld events processed",
4353 
4354  if (GetRemoteProtocol() > 11) {
4355  // New format
4356  Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1.);
4357  } else {
4359  }
4360  Emit("StopProcess(Bool_t)", kTRUE);
4361  }
4362 
4363  // Handle stop ...
4365  if (fSync)
4366  Info("UpdateDialog",
4367  "processing was stopped - %lld events processed",
4369 
4370  if (GetRemoteProtocol() > 25) {
4371  // New format
4372  Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1., -1, -1, -1.);
4373  } else if (GetRemoteProtocol() > 11) {
4374  Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1.);
4375  } else {
4377  }
4378  Emit("StopProcess(Bool_t)", kFALSE);
4379  }
4380 
4381  // Final update of the dialog box
4382  if (GetRemoteProtocol() > 25) {
4383  // New format
4384  EmitVA("Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t,Int_t,Int_t,Float_t)",
4385  10, (Long64_t)(-1), (Long64_t)(-1), (Long64_t)(-1),(Float_t)(-1.),(Float_t)(-1.),
4386  (Float_t)(-1.),(Float_t)(-1.),(Int_t)(-1),(Int_t)(-1),(Float_t)(-1.));
4387  } else if (GetRemoteProtocol() > 11) {
4388  // New format
4389  EmitVA("Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t)",
4390  7, (Long64_t)(-1), (Long64_t)(-1), (Long64_t)(-1),
4391  (Float_t)(-1.),(Float_t)(-1.),(Float_t)(-1.),(Float_t)(-1.));
4392  } else {
4393  EmitVA("Progress(Long64_t,Long64_t)", 2, (Long64_t)(-1), (Long64_t)(-1));
4394  }
4395 }
4396 
4397 ////////////////////////////////////////////////////////////////////////////////
4398 /// Activate the a-sync input handler.
4399 
4401 {
4402  TIter next(fSlaves);
4403  TSlave *sl;
4404 
4405  while ((sl = (TSlave*) next()))
4406  if (sl->GetInputHandler())
4407  sl->GetInputHandler()->Add();
4408 }
4409 
4410 ////////////////////////////////////////////////////////////////////////////////
4411 /// De-activate a-sync input handler.
4412 
4414 {
4415  TIter next(fSlaves);
4416  TSlave *sl;
4417 
4418  while ((sl = (TSlave*) next()))
4419  if (sl->GetInputHandler())
4420  sl->GetInputHandler()->Remove();
4421 }
4422 
4423 ////////////////////////////////////////////////////////////////////////////////
4424 /// Get the active mergers count
4425 
4427 {
4428  if (!fMergers) return 0;
4429 
4430  Int_t active_mergers = 0;
4431 
4432  TIter mergers(fMergers);
4433  TMergerInfo *mi = 0;
4434  while ((mi = (TMergerInfo *)mergers())) {
4435  if (mi->IsActive()) active_mergers++;
4436  }
4437 
4438  return active_mergers;
4439 }
4440 
4441 ////////////////////////////////////////////////////////////////////////////////
4442 /// Create a new merger
4443 
4445 {
4446  PDB(kSubmerger, 2)
4447  Info("CreateMerger", "worker %s will be merger ", sl->GetOrdinal());
4448 
4449  PDB(kSubmerger, 2) Info("CreateMerger","Begin");
4450 
4451  if (port <= 0) {
4452  PDB(kSubmerger,2)
4453  Info("CreateMerger", "cannot create merger on port %d - exit", port);
4454  return kFALSE;
4455  }
4456 
4457  Int_t workers = -1;
4458  if (!fMergersByHost) {
4459  Int_t mergersToCreate = fMergersCount - fMergers->GetSize();
4460  // Number of pure workers, which are not simply divisible by mergers
4461  Int_t rest = fWorkersToMerge % mergersToCreate;
4462  // We add one more worker for each of the first 'rest' mergers being established
4463  if (rest > 0 && fMergers->GetSize() < rest) {
4464  rest = 1;
4465  } else {
4466  rest = 0;
4467  }
4468  workers = (fWorkersToMerge / mergersToCreate) + rest;
4469  } else {
4470  Int_t workersOnHost = 0;
4471  for (Int_t i = 0; i < fActiveSlaves->GetSize(); i++) {
4472  if(!strcmp(sl->GetName(), fActiveSlaves->At(i)->GetName())) workersOnHost++;
4473  }
4474  workers = workersOnHost - 1;
4475  }
4476 
4477  TString msg;
4478  msg.Form("worker %s on host %s will be merger for %d additional workers", sl->GetOrdinal(), sl->GetName(), workers);
4479 
4480  if (gProofServ) {
4482  } else {
4483  Printf("%s",msg.Data());
4484  }
4485  TMergerInfo * merger = new TMergerInfo(sl, port, workers);
4486 
4487  TMessage bemerger(kPROOF_SUBMERGER);
4488  bemerger << Int_t(kBeMerger);
4489  bemerger << fMergers->GetSize();
4490  bemerger << workers;
4491  sl->GetSocket()->Send(bemerger);
4492 
4493  PDB(kSubmerger,2) Info("CreateMerger",
4494  "merger #%d (port: %d) for %d workers started",
4495  fMergers->GetSize(), port, workers);
4496 
4497  fMergers->Add(merger);
4498  fWorkersToMerge = fWorkersToMerge - workers;
4499 
4500  fRedirectNext = workers / 2;
4501 
4502  PDB(kSubmerger, 2) Info("CreateMerger", "exit");
4503  return kTRUE;
4504 }
4505 
4506 ////////////////////////////////////////////////////////////////////////////////
4507 /// Add a bad slave server to the bad slave list and remove it from
4508 /// the active list and from the two monitor objects. Assume that the work
4509 /// done by this worker was lost and ask the packetizer to reassign it.
4510 
4511 void TProof::MarkBad(TSlave *wrk, const char *reason)
4512 {
4513  std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
4514 
4515  // We may have been invalidated in the meanwhile: nothing to do in such a case
4516  if (!IsValid()) return;
4517 
4518  if (!wrk) {
4519  Error("MarkBad", "worker instance undefined: protocol error? ");
4520  return;
4521  }
4522 
4523  // Local URL
4524  static TString thisurl;
4525  if (thisurl.IsNull()) {
4526  if (IsMaster()) {
4527  Int_t port = gEnv->GetValue("ProofServ.XpdPort",-1);
4528  thisurl = TUrl(gSystem->HostName()).GetHostFQDN();
4529  if (port > 0) thisurl += TString::Format(":%d", port);
4530  } else {
4531  thisurl.Form("%s@%s:%d", fUrl.GetUser(), fUrl.GetHost(), fUrl.GetPort());
4532  }
4533  }
4534 
4535  if (!reason || (strcmp(reason, kPROOF_TerminateWorker) && strcmp(reason, kPROOF_WorkerIdleTO))) {
4536  // Message for notification
4537  const char *mastertype = (gProofServ && gProofServ->IsTopMaster()) ? "top master" : "master";
4538  TString src = IsMaster() ? Form("%s at %s", mastertype, thisurl.Data()) : "local session";
4539  TString msg;
4540  msg.Form("\n +++ Message from %s : marking %s:%d (%s) as bad\n +++ Reason: %s",
4541  src.Data(), wrk->GetName(), wrk->GetPort(), wrk->GetOrdinal(),
4542  (reason && strlen(reason)) ? reason : "unknown");
4543  Info("MarkBad", "%s", msg.Data());
4544  // Notify one level up, if the case
4545  // Add some hint for diagnostics
4546  if (gProofServ) {
4547  msg += TString::Format("\n\n +++ Most likely your code crashed on worker %s at %s:%d.\n",
4548  wrk->GetOrdinal(), wrk->GetName(), wrk->GetPort());
4549  } else {
4550  msg += TString::Format("\n\n +++ Most likely your code crashed\n");
4551  }
4552  msg += TString::Format(" +++ Please check the session logs for error messages either using\n");
4553  msg += TString::Format(" +++ the 'Show logs' button or executing\n");
4554  msg += TString::Format(" +++\n");
4555  if (gProofServ) {
4556  msg += TString::Format(" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->"
4557  "Display(\"%s\",0)\n\n", thisurl.Data(), wrk->GetOrdinal());
4559  } else {
4560  msg += TString::Format(" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->"
4561  "Display(\"*\")\n\n", thisurl.Data());
4562  Printf("%s", msg.Data());
4563  }
4564  } else if (reason) {
4565  if (gDebug > 0 && strcmp(reason, kPROOF_WorkerIdleTO)) {
4566  Info("MarkBad", "worker %s at %s:%d asked to terminate",
4567  wrk->GetOrdinal(), wrk->GetName(), wrk->GetPort());
4568  }
4569  }
4570 
4571  if (IsMaster() && reason) {
4572  if (strcmp(reason, kPROOF_TerminateWorker)) {
4573  // if the reason was not a planned termination
4574  TList *listOfMissingFiles = 0;
4575  if (!(listOfMissingFiles = (TList *)GetOutput("MissingFiles"))) {
4576  listOfMissingFiles = new TList();
4577  listOfMissingFiles->SetName("MissingFiles");
4578  if (fPlayer)
4579  fPlayer->AddOutputObject(listOfMissingFiles);
4580  }
4581  // If a query is being processed, assume that the work done by
4582  // the worker was lost and needs to be reassigned.
4583  TVirtualPacketizer *packetizer = fPlayer ? fPlayer->GetPacketizer() : 0;
4584  if (packetizer) {
4585  // the worker was lost so do resubmit the packets
4586  packetizer->MarkBad(wrk, 0, &listOfMissingFiles);
4587  }
4588  } else {
4589  // Tell the coordinator that we are gone
4590  if (gProofServ) {
4591  TString ord(wrk->GetOrdinal());
4592  Int_t id = ord.Last('.');
4593  if (id != kNPOS) ord.Remove(0, id+1);
4594  gProofServ->ReleaseWorker(ord.Data());
4595  }
4596  }
4597  } else if (TestBit(TProof::kIsClient) && reason && !strcmp(reason, kPROOF_WorkerIdleTO)) {
4598  // We are invalid after this
4599  fValid = kFALSE;
4600  }
4601 
4602  fActiveSlaves->Remove(wrk);
4603  FindUniqueSlaves();
4604 
4605  fAllMonitor->Remove(wrk->GetSocket());
4606  fActiveMonitor->Remove(wrk->GetSocket());
4607 
4609 
4610  if (IsMaster()) {
4611  if (reason && !strcmp(reason, kPROOF_TerminateWorker)) {
4612  // if the reason was a planned termination then delete the worker and
4613  // remove it from all the lists
4614  fSlaves->Remove(wrk);
4615  fBadSlaves->Remove(wrk);
4616  fActiveSlaves->Remove(wrk);
4617  fInactiveSlaves->Remove(wrk);
4618  fUniqueSlaves->Remove(wrk);
4619  fAllUniqueSlaves->Remove(wrk);
4620  fNonUniqueMasters->Remove(wrk);
4621 
4622  // we add it to the list of terminated slave infos instead, so that it
4623  // stays available in the .workers persistent file
4624  TSlaveInfo *si = new TSlaveInfo(
4625  wrk->GetOrdinal(),
4626  Form("%s@%s:%d", wrk->GetUser(), wrk->GetName(), wrk->GetPort()),
4627  0, "", wrk->GetWorkDir());
4629  else delete si;
4630 
4631  delete wrk;
4632  } else {
4633  fBadSlaves->Add(wrk);
4634  fActiveSlaves->Remove(wrk);
4635  fUniqueSlaves->Remove(wrk);
4636  fAllUniqueSlaves->Remove(wrk);
4637  fNonUniqueMasters->Remove(wrk);
4639  wrk->Close();
4640  // Update the mergers count, if needed
4641  if (fMergersSet) {
4642  Int_t mergersCount = -1;
4643  TParameter<Int_t> *mc = dynamic_cast<TParameter<Int_t> *>(GetParameter("PROOF_UseMergers"));
4644  if (mc) mergersCount = mc->GetVal(); // Value set by user
4645  // Mergers count is set dynamically: recalculate it
4646  if (mergersCount == 0) {
4648  if (activeWorkers > 1) {
4649  fMergersCount = TMath::Nint(TMath::Sqrt(activeWorkers));
4650  if (activeWorkers / fMergersCount < 2)
4651  fMergersCount = (Int_t) TMath::Sqrt(activeWorkers);
4652  }
4653  }
4654  }
4655  }
4656 
4657  // Update session workers files
4658  SaveWorkerInfo();
4659  } else {
4660  // On clients the proof session should be removed from the lists
4661  // and deleted, since it is not valid anymore
4662  fSlaves->Remove(wrk);
4663  if (fManager)
4664  fManager->DiscardSession(this);
4665  }
4666 }
4667 
4668 ////////////////////////////////////////////////////////////////////////////////
4669 /// Add slave with socket s to the bad slave list and remove it from
4670 /// the active list and from the two monitor objects.
4671 
4672 void TProof::MarkBad(TSocket *s, const char *reason)
4673 {
4674  std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
4675 
4676  // We may have been invalidated in the meanwhile: nothing to do in such a case
4677  if (!IsValid()) return;
4678 
4679  TSlave *wrk = FindSlave(s);
4680  MarkBad(wrk, reason);
4681 }
4682 
4683 ////////////////////////////////////////////////////////////////////////////////
4684 /// Ask an active worker 'wrk' to terminate, i.e. to shutdown
4685 
4687 {
4688  if (!wrk) {
4689  Warning("TerminateWorker", "worker instance undefined: protocol error? ");
4690  return;
4691  }
4692 
4693  // Send stop message
4694  if (wrk->GetSocket() && wrk->GetSocket()->IsValid()) {
4695  TMessage mess(kPROOF_STOP);
4696  wrk->GetSocket()->Send(mess);
4697  } else {
4698  if (gDebug > 0)
4699  Info("TerminateWorker", "connection to worker is already down: cannot"
4700  " send termination message");
4701  }
4702 
4703  // This is a bad worker from now on
4705 }
4706 
4707 ////////////////////////////////////////////////////////////////////////////////
4708 /// Ask an active worker 'ord' to terminate, i.e. to shutdown
4709 
4710 void TProof::TerminateWorker(const char *ord)
4711 {
4712  if (ord && strlen(ord) > 0) {
4713  Bool_t all = (ord[0] == '*') ? kTRUE : kFALSE;
4714  if (IsMaster()) {
4715  TIter nxw(fSlaves);
4716  TSlave *wrk = 0;
4717  while ((wrk = (TSlave *)nxw())) {
4718  if (all || !strcmp(wrk->GetOrdinal(), ord)) {
4719  TerminateWorker(wrk);
4720  if (!all) break;
4721  }
4722  }
4723  } else {
4724  TMessage mess(kPROOF_STOP);
4725  mess << TString(ord);
4726  Broadcast(mess);
4727  }
4728  }
4729 }
4730 
4731 ////////////////////////////////////////////////////////////////////////////////
4732 /// Ping PROOF. Returns 1 if master server responded.
4733 
4735 {
4736  return Ping(kActive);
4737 }
4738 
4739 ////////////////////////////////////////////////////////////////////////////////
4740 /// Ping PROOF slaves. Returns the number of slaves that responded.
4741 
4743 {
4744  TList *slaves = 0;
4745  if (list == kAll) slaves = fSlaves;
4746  if (list == kActive) slaves = fActiveSlaves;
4747  if (list == kUnique) slaves = fUniqueSlaves;
4748  if (list == kAllUnique) slaves = fAllUniqueSlaves;
4749 
4750  if (slaves->GetSize() == 0) return 0;
4751 
4752  int nsent = 0;
4753  TIter next(slaves);
4754 
4755  TSlave *sl;
4756  while ((sl = (TSlave *)next())) {
4757  if (sl->IsValid()) {
4758  if (sl->Ping() == -1) {
4759  MarkBad(sl, "ping unsuccessful");
4760  } else {
4761  nsent++;
4762  }
4763  }
4764  }
4765 
4766  return nsent;
4767 }
4768 
4769 ////////////////////////////////////////////////////////////////////////////////
4770 /// Touch all PROOF workers: call TSlave::Touch() on every valid worker in fSlaves.
4771 
4773 {
4774  TList *slaves = fSlaves;
4775 
4776  if (slaves->GetSize() == 0) return;
4777 
4778  TIter next(slaves);
4779 
4780  TSlave *sl;
4781  while ((sl = (TSlave *)next())) {
4782  if (sl->IsValid()) {
4783  sl->Touch();
4784  }
4785  }
4786 
4787  return;
4788 }
4789 
4790 ////////////////////////////////////////////////////////////////////////////////
4791 /// Print status of PROOF cluster.
4792 
4793 void TProof::Print(Option_t *option) const
4794 {
4795  TString secCont;
4796 
4797  if (TestBit(TProof::kIsClient)) {
4798  Printf("Connected to: %s (%s)", GetMaster(),
4799  IsValid() ? "valid" : "invalid");
4800  Printf("Port number: %d", GetPort());
4801  Printf("User: %s", GetUser());
4802  Printf("ROOT version|rev: %s|%s", gROOT->GetVersion(), gROOT->GetGitCommit());
4803  Printf("Architecture-Compiler: %s-%s", gSystem->GetBuildArch(),
4805  TSlave *sl = (TSlave *)fActiveSlaves->First();
4806  if (sl) {
4807  TString sc;
4808  if (sl->GetSocket()->GetSecContext())
4809  Printf("Security context: %s",
4810  sl->GetSocket()->GetSecContext()->AsString(sc));
4811  Printf("Proofd protocol version: %d", sl->GetSocket()->GetRemoteProtocol());
4812  } else {
4813  Printf("Security context: Error - No connection");
4814  Printf("Proofd protocol version: Error - No connection");
4815  }
4816  Printf("Client protocol version: %d", GetClientProtocol());
4817  Printf("Remote protocol version: %d", GetRemoteProtocol());
4818  Printf("Log level: %d", GetLogLevel());
4819  Printf("Session unique tag: %s", IsValid() ? GetSessionTag() : "");
4820  Printf("Default data pool: %s", IsValid() ? GetDataPoolUrl() : "");
4821  if (IsValid())
4822  const_cast<TProof*>(this)->SendPrint(option);
4823  } else {
4824  const_cast<TProof*>(this)->AskStatistics();
4825  if (IsParallel())
4826  Printf("*** Master server %s (parallel mode, %d workers):",
4828  else
4829  Printf("*** Master server %s (sequential mode):",
4830  gProofServ->GetOrdinal());
4831 
4832  Printf("Master host name: %s", gSystem->HostName());
4833  Printf("Port number: %d", GetPort());
4834  if (strlen(gProofServ->GetGroup()) > 0) {
4835  Printf("User/Group: %s/%s", GetUser(), gProofServ->GetGroup());
4836  } else {
4837  Printf("User: %s", GetUser());
4838  }
4839  TString ver;
4840  ver.Form("%s|%s", gROOT->GetVersion(), gROOT->GetGitCommit());
4841  if (gSystem->Getenv("ROOTVERSIONTAG"))
4842  ver.Form("%s|%s", gROOT->GetVersion(), gSystem->Getenv("ROOTVERSIONTAG"));
4843  Printf("ROOT version|rev|tag: %s", ver.Data());
4844  Printf("Architecture-Compiler: %s-%s", gSystem->GetBuildArch(),
4846  Printf("Protocol version: %d", GetClientProtocol());
4847  Printf("Image name: %s", GetImage());
4848  Printf("Working directory: %s", gSystem->WorkingDirectory());
4849  Printf("Config directory: %s", GetConfDir());
4850  Printf("Config file: %s", GetConfFile());
4851  Printf("Log level: %d", GetLogLevel());
4852  Printf("Number of workers: %d", GetNumberOfSlaves());
4853  Printf("Number of active workers: %d", GetNumberOfActiveSlaves());
4854  Printf("Number of unique workers: %d", GetNumberOfUniqueSlaves());
4855  Printf("Number of inactive workers: %d", GetNumberOfInactiveSlaves());
4856  Printf("Number of bad workers: %d", GetNumberOfBadSlaves());
4857  Printf("Total MB's processed: %.2f", float(GetBytesRead())/(1024*1024));
4858  Printf("Total real time used (s): %.3f", GetRealTime());
4859  Printf("Total CPU time used (s): %.3f", GetCpuTime());
4860  if (TString(option).Contains("a", TString::kIgnoreCase) && GetNumberOfSlaves()) {
4861  Printf("List of workers:");
4862  TList masters;
4863  TIter nextslave(fSlaves);
4864  while (TSlave* sl = dynamic_cast<TSlave*>(nextslave())) {
4865  if (!sl->IsValid()) continue;
4866 
4867  if (sl->GetSlaveType() == TSlave::kSlave) {
4868  sl->Print(option);
4869  } else if (sl->GetSlaveType() == TSlave::kMaster) {
4870  TMessage mess(kPROOF_PRINT);
4871  mess.WriteString(option);
4872  if (sl->GetSocket()->Send(mess) == -1)
4873  const_cast<TProof*>(this)->MarkBad(sl, "could not send kPROOF_PRINT request");
4874  else
4875  masters.Add(sl);
4876  } else {
4877  Error("Print", "TSlave is neither Master nor Worker");
4878  R__ASSERT(0);
4879  }
4880  }
4881  const_cast<TProof*>(this)->Collect(&masters, fCollectTimeout);
4882  }
4883  }
4884 }
4885 
4886 ////////////////////////////////////////////////////////////////////////////////
4887 /// Extract from opt information about output handling settings.
4888 /// The understood keywords are:
4889 /// of=<file>, outfile=<file> output file location
4890 /// ds=<dsname>, dataset=<dsname> dataset name ('of' and 'ds' are
4891 /// mutually exclusive, execution stops
4892 /// if both are found)
4893 /// stf[=<opt>], savetofile[=<opt>] control saving to file
4894 ///
4895 /// For 'stf'/'savetofile', the <opt> integer has the following meaning:
4896 /// <opt> = <how>*10 + <force>
4897 /// <force> = 0 save to file if memory threshold is reached
4898 /// (the memory threshold is set by the cluster
4899 /// admin); in case an output file is defined, the
4900 /// files are merged at the end;
4901 /// 1 save results to file.
4902 /// <how> = 0 save at the end of the query
4903 /// 1 save results after each packet (to reduce the
4904 /// loss in case of crash).
4905 ///
4906 /// Setting 'ds' automatically sets 'stf=1'; it is still possible to set 'stf=11'
4907 /// to save results after each packet.
4908 ///
4909 /// The separator from the next option is either a ' ' or a ';'
4910 ///
4911 /// All recognized settings are removed from the input string opt.
4912 /// If action == 0, set up the output file accordingly, if action == 1 clean related
4913 /// output file settings.
4914 /// If the final target file is local then 'target' is set to the final local path
4915 /// when action == 0 and used to retrieve the file with TFile::Cp when action == 1.
4916 ///
4917 /// Output file settings are in the form
4918 ///
4919 /// <previous_option>of=name <next_option>
4920 /// <previous_option>outfile=name,...;<next_option>
4921 ///
4922 /// The separator from the next option is either a ' ' or a ';'
4923 /// Called internally by TProof::Process.
4924 ///
4925 /// Returns 0 on success, -1 on error.
4926 
4928 {
4929  TString outfile, dsname, stfopt;
4930  if (action == 0) {
4931  TString tagf, tagd, tags, oo;
4932  Ssiz_t from = 0, iof = kNPOS, iod = kNPOS, ios = kNPOS;
4933  while (opt.Tokenize(oo, from, "[; ]")) {
4934  if (oo.BeginsWith("of=")) {
4935  tagf = "of=";
4936  iof = opt.Index(tagf);
4937  } else if (oo.BeginsWith("outfile=")) {
4938  tagf = "outfile=";
4939  iof = opt.Index(tagf);
4940  } else if (oo.BeginsWith("ds")) {
4941  tagd = "ds";
4942  iod = opt.Index(tagd);
4943  } else if (oo.BeginsWith("dataset")) {
4944  tagd = "dataset";
4945  iod = opt.Index(tagd);
4946  } else if (oo.BeginsWith("stf")) {
4947  tags = "stf";
4948  ios = opt.Index(tags);
4949  } else if (oo.BeginsWith("savetofile")) {
4950  tags = "savetofile";
4951  ios = opt.Index(tags);
4952  }
4953  }
4954  // Check consistency
4955  if (iof != kNPOS && iod != kNPOS) {
4956  Error("HandleOutputOptions", "options 'of'/'outfile' and 'ds'/'dataset' are incompatible!");
4957  return -1;
4958  }
4959 
4960  // Check output file first
4961  if (iof != kNPOS) {
4962  from = iof + tagf.Length();
4963  if (!opt.Tokenize(outfile, from, "[; ]") || outfile.IsNull()) {
4964  Error("HandleOutputOptions", "could not extract output file settings string! (%s)", opt.Data());
4965  return -1;
4966  }
4967  // For removal from original options string
4968  tagf += outfile;
4969  }
4970  // Check dataset
4971  if (iod != kNPOS) {
4972  from = iod + tagd.Length();
4973  if (!opt.Tokenize(dsname, from, "[; ]"))
4974  if (gDebug > 0) Info("HandleOutputOptions", "no dataset name found: use default");
4975  // For removal from original options string
4976  tagd += dsname;
4977  // The name may be empty or beginning with a '='
4978  if (dsname.BeginsWith("=")) dsname.Replace(0, 1, "");
4979  if (dsname.Contains("|V")) {
4980  target = "ds|V";
4981  dsname.ReplaceAll("|V", "");
4982  }
4983  if (dsname.IsNull()) dsname = "dataset_<qtag>";
4984  }
4985  // Check stf
4986  if (ios != kNPOS) {
4987  from = ios + tags.Length();
4988  if (!opt.Tokenize(stfopt, from, "[; ]"))
4989  if (gDebug > 0) Info("HandleOutputOptions", "save-to-file not found: use default");
4990  // For removal from original options string
4991  tags += stfopt;
4992  // It must be digit
4993  if (!stfopt.IsNull()) {
4994  if (stfopt.BeginsWith("=")) stfopt.Replace(0,1,"");
4995  if (!stfopt.IsNull()) {
4996  if (!stfopt.IsDigit()) {
4997  Error("HandleOutputOptions", "save-to-file option must be a digit! (%s)", stfopt.Data());
4998  return -1;
4999  }
5000  } else {
5001  // Default
5002  stfopt = "1";
5003  }
5004  } else {
5005  // Default
5006  stfopt = "1";
5007  }
5008  }
5009  // Remove from original options string
5010  opt.ReplaceAll(tagf, "");
5011  opt.ReplaceAll(tagd, "");
5012  opt.ReplaceAll(tags, "");
5013  }
5014 
5015  // Parse now
5016  if (action == 0) {
5017  // Output file
5018  if (!outfile.IsNull()) {
5019  if (!outfile.BeginsWith("master:")) {
5021  Warning("HandleOutputOptions",
5022  "directory '%s' for the output file does not exists or is not writable:"
5023  " saving to master", gSystem->GetDirName(outfile.Data()).Data());
5024  outfile.Form("master:%s", gSystem->BaseName(outfile.Data()));
5025  } else {
5026  if (!IsLite()) {
5027  // The target file is local, so we need to retrieve it
5028  target = outfile;
5029  if (!stfopt.IsNull()) {
5030  outfile.Form("master:%s", gSystem->BaseName(target.Data()));
5031  } else {
5032  outfile = "";
5033  }
5034  }
5035  }
5036  }
5037  if (outfile.BeginsWith("master:")) {
5038  outfile.ReplaceAll("master:", "");
5039  if (outfile.IsNull() || !gSystem->IsAbsoluteFileName(outfile)) {
5040  // Get the master data dir
5041  TString ddir, emsg;
5042  if (!IsLite()) {
5043  if (Exec("gProofServ->GetDataDir()", "0", kTRUE) == 0) {
5044  TObjString *os = fMacroLog.GetLineWith("const char");
5045  if (os) {
5046  Ssiz_t fst = os->GetString().First('\"');
5047  Ssiz_t lst = os->GetString().Last('\"');
5048  ddir = os->GetString()(fst+1, lst-fst-1);
5049  } else {
5050  emsg = "could not find 'const char *' string in macro log! cannot continue";
5051  }
5052  } else {
5053  emsg = "could not retrieve master data directory info! cannot continue";
5054  }
5055  if (!emsg.IsNull()) {
5056  Error("HandleOutputOptions", "%s", emsg.Data());
5057  return -1;
5058  }
5059  }
5060  if (!ddir.IsNull()) ddir += "/";
5061  if (outfile.IsNull()) {
5062  outfile.Form("%s<file>", ddir.Data());
5063  } else {
5064  outfile.Insert(0, TString::Format("%s", ddir.Data()));
5065  }
5066  }
5067